[Enhancement] Support automatically choosing the latest automated cluster snapshot to restore (#56546)

Signed-off-by: xiangguangyxg <xiangguangyxg@gmail.com>
This commit is contained in:
xiangguangyxg 2025-03-13 14:23:44 +08:00 committed by GitHub
parent a86031f04d
commit 63de64df43
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 43 additions and 26 deletions

View File

@ -2,7 +2,7 @@
# information about the cluster snapshot to be downloaded and restored
#cluster_snapshot:
# cluster_snapshot_path: s3://defaultbucket/test/f7265e80-631c-44d3-a8ac-cf7cdc7adec811019/meta/image/automated_cluster_snapshot_1704038400000
# cluster_snapshot_path: s3://defaultbucket/test/f7265e80-631c-44d3-a8ac-cf7cdc7adec811019/meta
# storage_volume_name: my_s3_volume #defined in storage_volumes
# do not include leader fe

View File

@ -49,7 +49,7 @@ import java.util.concurrent.ConcurrentSkipListMap;
// only used for AUTOMATED snapshot for now
public class ClusterSnapshotMgr implements GsonPostProcessable {
public static final Logger LOG = LogManager.getLogger(ClusterSnapshotMgr.class);
public static final String AUTOMATED_NAME_PREFIX = "automated_cluster_snapshot";
public static final String AUTOMATED_NAME_PREFIX = "automated_cluster_snapshot_";
@SerializedName(value = "storageVolumeName")
private volatile String storageVolumeName;
@ -130,7 +130,7 @@ public class ClusterSnapshotMgr implements GsonPostProcessable {
public ClusterSnapshotJob createAutomatedSnapshotJob() {
long createTimeMs = System.currentTimeMillis();
long id = GlobalStateMgr.getCurrentState().getNextId();
String snapshotName = AUTOMATED_NAME_PREFIX + '_' + String.valueOf(createTimeMs);
String snapshotName = AUTOMATED_NAME_PREFIX + String.valueOf(createTimeMs);
ClusterSnapshotJob job = new ClusterSnapshotJob(id, snapshotName, storageVolumeName, createTimeMs);
job.logJob();
@ -246,7 +246,7 @@ public class ClusterSnapshotMgr implements GsonPostProcessable {
Entry<Long, ClusterSnapshotJob> entry = automatedSnapshotJobs.lastEntry();
if (entry != null) {
ClusterSnapshotJob job = entry.getValue();
// Last snapshot may in init state, because the last snapshot checkpoint does not include the
// The last snapshot may still be in the init state, because it does not include the
// edit log for the state transition after ClusterSnapshotJobState.INITIALIZING
if (job.getSnapshotName().equals(restoredSnapshotName) && job.isInitializing()) {
job.setJournalIds(feJournalId, starMgrJournalId);

View File

@ -14,6 +14,7 @@
package com.starrocks.lake.snapshot;
import com.starrocks.analysis.BrokerDesc;
import com.starrocks.common.Config;
import com.starrocks.common.StarRocksException;
import com.starrocks.fs.HdfsUtil;
@ -30,11 +31,11 @@ import com.starrocks.system.ComputeNode;
import com.starrocks.system.Frontend;
import com.starrocks.system.SystemInfoService;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.FileStatus;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.File;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
@ -48,13 +49,13 @@ public class RestoreClusterSnapshotMgr {
private boolean oldResetElectionGroup;
private RestoredSnapshotInfo restoredSnapshotInfo;
private RestoreClusterSnapshotMgr(String clusterSnapshotYamlFile) throws StarRocksException, IOException {
private RestoreClusterSnapshotMgr(String clusterSnapshotYamlFile) throws StarRocksException {
config = ClusterSnapshotConfig.load(clusterSnapshotYamlFile);
downloadSnapshot();
updateConfig();
}
public static void init(String clusterSnapshotYamlFile, String[] args) throws StarRocksException, IOException {
public static void init(String clusterSnapshotYamlFile, String[] args) throws StarRocksException {
for (String arg : args) {
if (arg.equalsIgnoreCase("-cluster_snapshot")) {
LOG.info("FE start to restore from a cluster snapshot (-cluster_snapshot)");
@ -123,7 +124,7 @@ public class RestoreClusterSnapshotMgr {
Config.bdbje_reset_election_group = oldResetElectionGroup;
}
private void downloadSnapshot() throws StarRocksException, IOException {
private void downloadSnapshot() throws StarRocksException {
ClusterSnapshotConfig.ClusterSnapshot clusterSnapshot = config.getClusterSnapshot();
if (clusterSnapshot == null) {
return;
@ -140,35 +141,51 @@ public class RestoreClusterSnapshotMgr {
}
String snapshotImagePath = clusterSnapshot.getClusterSnapshotPath();
snapshotImagePath = snapshotImagePath.replaceAll("/+$", "");
if (snapshotImagePath.endsWith("/meta")) {
String pathPattern = snapshotImagePath + "/image/" + ClusterSnapshotMgr.AUTOMATED_NAME_PREFIX + '*';
List<FileStatus> fileStatusList = HdfsUtil.listFileMeta(pathPattern,
new BrokerDesc(clusterSnapshot.getStorageVolume().getProperties()), false);
if (fileStatusList.isEmpty() || fileStatusList.get(0).isFile()) {
throw new StarRocksException("No cluster snapshot found in path " + pathPattern);
}
snapshotImagePath = fileStatusList.get(0).getPath().toString();
}
LOG.info("Download cluster snapshot {} to local dir {}", snapshotImagePath, localImagePath);
HdfsUtil.copyToLocal(snapshotImagePath, localImagePath, clusterSnapshot.getStorageVolume().getProperties());
collectSnapshotInfoAfterDownload(snapshotImagePath, localImagePath);
collectSnapshotInfoAfterDownloaded(snapshotImagePath, localImagePath);
}
private void collectSnapshotInfoAfterDownload(String snapshotImagePath, String localImagePath) throws IOException {
String restoredSnapshotName = null;
private void collectSnapshotInfoAfterDownloaded(String snapshotImagePath, String localImagePath)
throws StarRocksException {
long feImageJournalId = 0L;
long starMgrImageJournalId = 0L;
Storage storageFe = new Storage(localImagePath);
Storage storageStarMgr = new Storage(localImagePath + StarMgrServer.IMAGE_SUBDIR);
// get image version
feImageJournalId = storageFe.getImageJournalId();
starMgrImageJournalId = storageStarMgr.getImageJournalId();
LOG.info("Download cluster snapshot successfully with FE image version: {}, StarMgr image version: {}",
feImageJournalId, starMgrImageJournalId);
String normalizePath = snapshotImagePath.replaceAll("/+$", "");
int lastSlashIndex = normalizePath.lastIndexOf('/');
if (lastSlashIndex != -1) {
restoredSnapshotName = normalizePath.substring(lastSlashIndex + 1);
try {
Storage storageFe = new Storage(localImagePath);
Storage storageStarMgr = new Storage(localImagePath + StarMgrServer.IMAGE_SUBDIR);
// get image version
feImageJournalId = storageFe.getImageJournalId();
starMgrImageJournalId = storageStarMgr.getImageJournalId();
} catch (Exception e) {
throw new StarRocksException("Failed to get local image version", e);
}
if (restoredSnapshotName != null) {
restoredSnapshotInfo = new RestoredSnapshotInfo(restoredSnapshotName, feImageJournalId, starMgrImageJournalId);
int lastSlashIndex = snapshotImagePath.lastIndexOf('/');
if (lastSlashIndex < 0) {
throw new StarRocksException("Failed to get snapshot name from snapshot path " + snapshotImagePath);
}
String restoredSnapshotName = snapshotImagePath.substring(lastSlashIndex + 1);
restoredSnapshotInfo = new RestoredSnapshotInfo(restoredSnapshotName,
feImageJournalId, starMgrImageJournalId);
LOG.info("Downloaded cluster snapshot {} successfully, FE image version: {}, StarMgr image version: {}",
restoredSnapshotName, feImageJournalId, starMgrImageJournalId);
}
private void updateFrontends() throws StarRocksException {