[Enhancement] Enhance cluster snapshot restore to support warehouse (#63023)

Signed-off-by: xiangguangyxg <xiangguangyxg@gmail.com>
This commit is contained in:
xiangguangyxg 2025-09-15 11:18:53 +08:00 committed by GitHub
parent f7b21e6eb8
commit ec460c2c34
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 266 additions and 34 deletions

View File

@ -29,6 +29,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.base.Preconditions;
import com.starrocks.server.WarehouseManager;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -145,6 +146,12 @@ public class ClusterSnapshotConfig {
@JsonProperty("heartbeat_service_port")
private int heartbeatServicePort;
@JsonProperty("warehouse")
private String warehouse = WarehouseManager.DEFAULT_WAREHOUSE_NAME;
@JsonProperty("cngroup")
private String cngroup;
public String getHost() {
return host;
}
@ -161,9 +168,26 @@ public class ClusterSnapshotConfig {
this.heartbeatServicePort = heartbeatServicePort;
}
public String getWarehouse() {
return warehouse;
}
public void setWarehouse(String warehouse) {
this.warehouse = warehouse;
}
public String getCNGroup() {
return cngroup;
}
public void setCNGroup(String cngroup) {
this.cngroup = cngroup;
}
@Override
public String toString() {
return "ComputeNode [host=" + host + ", heartbeatServicePort=" + heartbeatServicePort + "]";
return "ComputeNode [host=" + host + ", heartbeatServicePort=" + heartbeatServicePort +
", warehouse=" + warehouse + ", cngroup=" + cngroup + "]";
}
}

View File

@ -23,7 +23,6 @@ import com.starrocks.persist.Storage;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.NodeMgr;
import com.starrocks.server.StorageVolumeMgr;
import com.starrocks.server.WarehouseManager;
import com.starrocks.sql.ast.BrokerDesc;
import com.starrocks.staros.StarMgrServer;
import com.starrocks.system.Backend;
@ -214,24 +213,22 @@ public class RestoreClusterSnapshotMgr {
}
SystemInfoService systemInfoService = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo();
// Drop old backend nodes
for (Backend be : systemInfoService.getIdToBackend().values()) {
LOG.info("Drop old backend {}", be);
systemInfoService.dropBackend(be.getHost(), be.getHeartbeatPort(),
WarehouseManager.DEFAULT_WAREHOUSE_NAME, "", false);
systemInfoService.dropBackend(be.getHost(), be.getHeartbeatPort(), null, null, false);
}
// Drop old compute nodes
for (ComputeNode cn : systemInfoService.getIdComputeNode().values()) {
LOG.info("Drop old compute node {}", cn);
systemInfoService.dropComputeNode(cn.getHost(), cn.getHeartbeatPort(),
WarehouseManager.DEFAULT_WAREHOUSE_NAME, "");
systemInfoService.dropComputeNode(cn.getHost(), cn.getHeartbeatPort(), null, null);
}
// Add new compute nodes
for (ClusterSnapshotConfig.ComputeNode cn : computeNodes) {
LOG.info("Add new compute node {}", cn);
systemInfoService.addComputeNode(cn.getHost(), cn.getHeartbeatServicePort(),
WarehouseManager.DEFAULT_WAREHOUSE_NAME, "");
systemInfoService.addComputeNode(cn.getHost(), cn.getHeartbeatServicePort(), cn.getWarehouse(), cn.getCNGroup());
}
}

View File

@ -464,6 +464,12 @@ public class SystemInfoService implements GsonPostProcessable {
}
}
/*
* The arg warehouse and cnGroupName can be null or empty,
* which means ignore warehouse and cngroup when dropping compute node,
* otherwise they will be checked and an exception will be thrown if they are not matched.
* If the warehouse is null or empty, the cnGroupName will be ignored.
*/
public void dropComputeNode(String host, int heartbeatPort, String warehouse, String cnGroupName)
throws DdlException {
ComputeNode dropComputeNode = getComputeNodeWithHeartbeatPort(host, heartbeatPort);
@ -472,19 +478,22 @@ public class SystemInfoService implements GsonPostProcessable {
NetUtils.getHostPortInAccessibleFormat(host, heartbeatPort) + "]");
}
// check if warehouseName is right
Warehouse wh = GlobalStateMgr.getCurrentState().getWarehouseMgr().getWarehouseAllowNull(dropComputeNode.getWarehouseId());
if (wh != null) {
if (!warehouse.equalsIgnoreCase(wh.getName())) {
throw new DdlException("compute node [" + host + ":" + heartbeatPort +
"] does not exist in warehouse " + warehouse);
}
if (!Strings.isNullOrEmpty(cnGroupName)) {
// validate cnGroupName if provided
wh.validateRemoveNodeFromCNGroup(dropComputeNode, cnGroupName);
if (!Strings.isNullOrEmpty(warehouse)) {
// check if warehouseName is right
Warehouse wh = GlobalStateMgr.getCurrentState().getWarehouseMgr()
.getWarehouseAllowNull(dropComputeNode.getWarehouseId());
if (wh != null) {
if (!warehouse.equalsIgnoreCase(wh.getName())) {
throw new DdlException("compute node [" + host + ":" + heartbeatPort +
"] does not exist in warehouse " + warehouse);
}
if (!Strings.isNullOrEmpty(cnGroupName)) {
// validate cnGroupName if provided
wh.validateRemoveNodeFromCNGroup(dropComputeNode, cnGroupName);
}
}
// Allow drop compute node if `wh` is null for whatever reason
}
// Allow drop compute node if `wh` is null for whatever reason
// try to record the historical backend nodes
tryUpdateHistoricalComputeNodes(dropComputeNode.getWarehouseId(), dropComputeNode.getWorkerGroupId());
@ -575,7 +584,13 @@ public class SystemInfoService implements GsonPostProcessable {
});
}
// final entry of dropping backend
/*
* Final entry of dropping backend.
* The arg warehouse and cnGroupName can be null or empty,
* which means ignore warehouse and cngroup when dropping compute node,
* otherwise they will be checked and an exception will be thrown if they are not matched.
* If the warehouse is null or empty, the cnGroupName will be ignored.
*/
public void dropBackend(String host, int heartbeatPort, String warehouse, String cnGroupName,
boolean needCheckWithoutForce) throws DdlException {
Backend droppedBackend = getBackendWithHeartbeatPort(host, heartbeatPort);
@ -585,22 +600,25 @@ public class SystemInfoService implements GsonPostProcessable {
NetUtils.getHostPortInAccessibleFormat(host, heartbeatPort) + "]");
}
// check if warehouseName is right
Warehouse wh = GlobalStateMgr.getCurrentState().getWarehouseMgr().getWarehouseAllowNull(droppedBackend.getWarehouseId());
if (wh != null) {
if (!warehouse.equalsIgnoreCase(wh.getName())) {
LOG.warn("warehouseName in dropBackends is not equal, " +
"warehouseName from dropBackendClause is {}, while actual one is {}",
warehouse, wh.getName());
throw new DdlException("backend [" + host + ":" + heartbeatPort +
"] does not exist in warehouse " + warehouse);
}
if (!Strings.isNullOrEmpty(cnGroupName)) {
// validate cnGroupName if provided
wh.validateRemoveNodeFromCNGroup(droppedBackend, cnGroupName);
if (!Strings.isNullOrEmpty(warehouse)) {
// check if warehouseName is right
Warehouse wh = GlobalStateMgr.getCurrentState().getWarehouseMgr()
.getWarehouseAllowNull(droppedBackend.getWarehouseId());
if (wh != null) {
if (!warehouse.equalsIgnoreCase(wh.getName())) {
LOG.warn("warehouseName in dropBackends is not equal, " +
"warehouseName from dropBackendClause is {}, while actual one is {}",
warehouse, wh.getName());
throw new DdlException("backend [" + host + ":" + heartbeatPort +
"] does not exist in warehouse " + warehouse);
}
if (!Strings.isNullOrEmpty(cnGroupName)) {
// validate cnGroupName if provided
wh.validateRemoveNodeFromCNGroup(droppedBackend, cnGroupName);
}
}
// allow dropping the node if `wh` is null for whatever reason
}
// allow dropping the node if `wh` is null for whatever reason
if (needCheckWithoutForce) {
try {

View File

@ -63,6 +63,8 @@ public class ClusterSnapshotConfigTest {
ClusterSnapshotConfig.ComputeNode computeNode1 = config.getComputeNodes().get(0);
Assertions.assertEquals("172.26.92.11", computeNode1.getHost());
Assertions.assertEquals(9050, computeNode1.getHeartbeatServicePort());
Assertions.assertEquals("default_warehouse", computeNode1.getWarehouse());
Assertions.assertEquals("default_cngroup", computeNode1.getCNGroup());
ClusterSnapshotConfig.ComputeNode computeNode2 = config.getComputeNodes().get(1);
Assertions.assertEquals("172.26.92.12", computeNode2.getHost());
@ -71,6 +73,8 @@ public class ClusterSnapshotConfigTest {
computeNode1.toString();
computeNode1.setHost(computeNode2.getHost());
computeNode1.setHeartbeatServicePort(computeNode2.getHeartbeatServicePort());
computeNode1.setWarehouse(computeNode2.getWarehouse());
computeNode1.setCNGroup(computeNode2.getCNGroup());
ClusterSnapshotConfig.StorageVolume storageVolume1 = config.getStorageVolumes().get(0);
Assertions.assertEquals("my_s3_volume", storageVolume1.getName());

View File

@ -134,6 +134,7 @@ public class RestoreClusterSnapshotMgrTest {
Assertions.assertTrue(RestoreClusterSnapshotMgr.getRestoredSnapshotInfo().getStarMgrJournalId() == 10L);
Assertions.assertTrue(RestoreClusterSnapshotMgr.isRestoring());
RestoreClusterSnapshotMgr.getConfig().getComputeNodes().get(0).setCNGroup(null);
ClusterSnapshotConfig.StorageVolume sv1 = RestoreClusterSnapshotMgr.getConfig().getStorageVolumes().get(0);
ClusterSnapshotConfig.StorageVolume sv2 = RestoreClusterSnapshotMgr.getConfig().getStorageVolumes().get(1);
RestoreClusterSnapshotMgr.getConfig().getStorageVolumes().remove(0);

View File

@ -279,6 +279,192 @@ public class SystemInfoServiceTest {
Config.enable_trace_historical_node = savedConfig;
}
@Test
public void testDropBackendWithoutWarehouse() throws Exception {
new MockUp<RunMode>() {
@Mock
public RunMode getCurrentRunMode() {
return RunMode.SHARED_DATA;
}
};
Boolean savedConfig = Config.enable_trace_historical_node;
Config.enable_trace_historical_node = true;
Backend be = new Backend(10001, "newHost", 1000);
service.addBackend(be);
LocalMetastore localMetastore = new LocalMetastore(globalStateMgr, null, null);
WarehouseManager warehouseManager = new WarehouseManager();
warehouseManager.initDefaultWarehouse();
new MockUp<GlobalStateMgr>() {
@Mock
public LocalMetastore getLocalMetastore() {
return localMetastore;
}
@Mock
public WarehouseManager getWarehouseMgr() {
return warehouseManager;
}
};
new MockUp<WarehouseManager>() {
@Mock
public ComputeResource acquireComputeResource(CRAcquireContext acquireContext) {
return WarehouseManager.DEFAULT_RESOURCE;
}
};
service.addBackend(be);
be.setStarletPort(1001);
service.dropBackend("newHost", 1000, null, null, false);
Backend beIP = service.getBackendWithHeartbeatPort("newHost", 1000);
Assertions.assertNull(beIP);
Config.enable_trace_historical_node = savedConfig;
}
@Test
public void testDropComputeNodeWithoutWarehouse() throws Exception {
new MockUp<RunMode>() {
@Mock
public RunMode getCurrentRunMode() {
return RunMode.SHARED_DATA;
}
};
Boolean savedConfig = Config.enable_trace_historical_node;
Config.enable_trace_historical_node = true;
ComputeNode cn = new ComputeNode(10001, "newHost", 1000);
service.addComputeNode(cn);
LocalMetastore localMetastore = new LocalMetastore(globalStateMgr, null, null);
WarehouseManager warehouseManager = new WarehouseManager();
warehouseManager.initDefaultWarehouse();
new MockUp<GlobalStateMgr>() {
@Mock
public LocalMetastore getLocalMetastore() {
return localMetastore;
}
@Mock
public WarehouseManager getWarehouseMgr() {
return warehouseManager;
}
};
new MockUp<WarehouseManager>() {
@Mock
public ComputeResource acquireComputeResource(CRAcquireContext acquireContext) {
return WarehouseManager.DEFAULT_RESOURCE;
}
};
service.addComputeNode(cn);
cn.setStarletPort(1001);
service.dropComputeNode("newHost", 1000, null, null);
ComputeNode cnIP = service.getComputeNodeWithHeartbeatPort("newHost", 1000);
Assertions.assertNull(cnIP);
Config.enable_trace_historical_node = savedConfig;
}
@Test
public void testDropBackendWithInvalidWarehouse() throws Exception {
new MockUp<RunMode>() {
@Mock
public RunMode getCurrentRunMode() {
return RunMode.SHARED_DATA;
}
};
Boolean savedConfig = Config.enable_trace_historical_node;
Config.enable_trace_historical_node = true;
Backend be = new Backend(10001, "newHost", 1000);
service.addBackend(be);
LocalMetastore localMetastore = new LocalMetastore(globalStateMgr, null, null);
WarehouseManager warehouseManager = new WarehouseManager();
warehouseManager.initDefaultWarehouse();
new MockUp<GlobalStateMgr>() {
@Mock
public LocalMetastore getLocalMetastore() {
return localMetastore;
}
@Mock
public WarehouseManager getWarehouseMgr() {
return warehouseManager;
}
};
new MockUp<WarehouseManager>() {
@Mock
public ComputeResource acquireComputeResource(CRAcquireContext acquireContext) {
return WarehouseManager.DEFAULT_RESOURCE;
}
};
service.addBackend(be);
be.setStarletPort(1001);
Assertions.assertThrows(DdlException.class,
() -> service.dropBackend("newHost", 1000, "not_existed_warehouse", null, false));
Config.enable_trace_historical_node = savedConfig;
}
@Test
public void testDropComputeNodeWithInvalidWarehouse() throws Exception {
new MockUp<RunMode>() {
@Mock
public RunMode getCurrentRunMode() {
return RunMode.SHARED_DATA;
}
};
Boolean savedConfig = Config.enable_trace_historical_node;
Config.enable_trace_historical_node = true;
ComputeNode cn = new ComputeNode(10001, "newHost", 1000);
service.addComputeNode(cn);
LocalMetastore localMetastore = new LocalMetastore(globalStateMgr, null, null);
WarehouseManager warehouseManager = new WarehouseManager();
warehouseManager.initDefaultWarehouse();
new MockUp<GlobalStateMgr>() {
@Mock
public LocalMetastore getLocalMetastore() {
return localMetastore;
}
@Mock
public WarehouseManager getWarehouseMgr() {
return warehouseManager;
}
};
new MockUp<WarehouseManager>() {
@Mock
public ComputeResource acquireComputeResource(CRAcquireContext acquireContext) {
return WarehouseManager.DEFAULT_RESOURCE;
}
};
service.addComputeNode(cn);
cn.setStarletPort(1001);
Assertions.assertThrows(DdlException.class,
() -> service.dropComputeNode("newHost", 1000, "not_existed_warehouse", null));
Config.enable_trace_historical_node = savedConfig;
}
@Test
public void testReplayUpdateHistoricalNode() throws Exception {
new MockUp<RunMode>() {

View File

@ -17,6 +17,8 @@ frontends:
compute_nodes:
- host: 172.26.92.11
heartbeat_service_port: 9050
warehouse: default_warehouse
cngroup: default_cngroup
- host: 172.26.92.12
heartbeat_service_port: 9050