[BugFix] Fix data cache bugs with invalid compute resources (#63537)

Signed-off-by: shuming.li <ming.moriarty@gmail.com>
This commit is contained in:
shuming.li 2025-10-11 14:30:30 +08:00 committed by GitHub
parent 1378c139ae
commit fe3bef00d1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 59 additions and 7 deletions

View File

@ -32,6 +32,7 @@ import com.starrocks.system.SystemInfoService;
import com.starrocks.thrift.TUniqueId;
import com.starrocks.warehouse.Warehouse;
import com.starrocks.warehouse.cngroup.ComputeResource;
import com.starrocks.warehouse.cngroup.ComputeResourceProvider;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -42,19 +43,27 @@ public class DataCacheSelectExecutor {
private static final Logger LOG = LogManager.getLogger(DataCacheSelectExecutor.class);
public static DataCacheSelectMetrics cacheSelect(DataCacheSelectStatement statement,
ConnectContext connectContext) throws Exception {
ConnectContext connectContext) throws Exception {
InsertStmt insertStmt = statement.getInsertStmt();
final WarehouseManager warehouseManager = GlobalStateMgr.getCurrentState().getWarehouseMgr();
final Warehouse wh = warehouseManager.getWarehouse(connectContext.getCurrentWarehouseName());
final ComputeResourceProvider computeResourceProvider = warehouseManager.getComputeResourceProvider();
final List<ComputeResource> computeResources = computeResourceProvider.getComputeResources(wh);
List<StmtExecutor> subStmtExecutors = Lists.newArrayList();
for (int workerGroupIdx = 0; workerGroupIdx < wh.getWorkerGroupIds().size(); workerGroupIdx++) {
long workerGroupId = wh.getWorkerGroupIds().get(workerGroupIdx);
ConnectContext subContext = buildCacheSelectConnectContext(statement, connectContext, workerGroupIdx == 0);
ComputeResource computeResource = warehouseManager.getComputeResourceProvider().ofComputeResource(
wh.getId(), workerGroupId);
boolean isFirstSubContext = true;
for (ComputeResource computeResource : computeResources) {
if (!computeResourceProvider.isResourceAvailable(computeResource)) {
// skip if this compute resource is not available
LOG.warn("skip cache select for compute resource {} because it is not available", computeResource);
continue;
}
ConnectContext subContext = buildCacheSelectConnectContext(statement, connectContext, isFirstSubContext);
subContext.setCurrentComputeResource(computeResource);
StmtExecutor subStmtExecutor = StmtExecutor.newInternalExecutor(subContext, insertStmt);
isFirstSubContext = false;
// Register new StmtExecutor into current ConnectContext's StmtExecutor, so we can handle ctrl+c command
// If DataCacheSelect is forward to leader, connectContext's Executor is null
if (connectContext.getExecutor() != null) {

View File

@ -33,6 +33,13 @@ public interface ComputeResourceProvider {
*/
ComputeResource ofComputeResource(long warehouseId, long workGroupId);
/**
* Get all ComputeResources by warehouse.
* @param warehouse the warehouse to get the ComputeResources from
* @return a list of ComputeResources that can be used for compute
*/
List<ComputeResource> getComputeResources(Warehouse warehouse);
/**
* NOTE: prefer to call this infrequently, as it can come to dominate the execution time of a query in the
* frontend if there are many calls per request (e.g. one per partition when there are many partitions).

View File

@ -14,11 +14,12 @@
package com.starrocks.warehouse.cngroup;
import com.google.api.client.util.Lists;
import com.google.common.collect.Lists;
import com.starrocks.common.ErrorCode;
import com.starrocks.common.ErrorReportException;
import com.starrocks.common.StarRocksException;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
import com.starrocks.system.ComputeNode;
import com.starrocks.system.SystemInfoService;
import com.starrocks.warehouse.Warehouse;
@ -47,6 +48,14 @@ public final class WarehouseComputeResourceProvider implements ComputeResourcePr
return WarehouseComputeResource.of(warehouseId);
}
@Override
public List<ComputeResource> getComputeResources(Warehouse warehouse) {
if (warehouse == null) {
throw ErrorReportException.report(ErrorCode.ERR_UNKNOWN_WAREHOUSE, "warehouse is null");
}
return Lists.newArrayList(WarehouseComputeResource.of(warehouse.getId()));
}
@Override
public Optional<ComputeResource> acquireComputeResource(Warehouse warehouse, CRAcquireContext acquireContext) {
final long warehouseId = acquireContext.getWarehouseId();
@ -67,6 +76,9 @@ public final class WarehouseComputeResourceProvider implements ComputeResourcePr
*/
@Override
public boolean isResourceAvailable(ComputeResource computeResource) {
if (!RunMode.isSharedDataMode()) {
return true;
}
try {
final long availableWorkerGroupIdSize =
Optional.ofNullable(getAliveComputeNodes(computeResource)).map(List::size).orElse(0);

View File

@ -43,6 +43,7 @@ import mockit.Mock;
import mockit.MockUp;
import mockit.Mocked;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
@ -65,6 +66,16 @@ public class WarehouseManagerTest {
@Mocked
StarOSAgent starOSAgent;
@BeforeAll
public static void setUp() {
new MockUp<RunMode>() {
@Mock
boolean isSharedDataMode() {
return true;
}
};
}
@Test
public void testWarehouseNotExist() {
WarehouseManager mgr = new WarehouseManager();
@ -399,6 +410,12 @@ public class WarehouseManagerTest {
@Test
public void testBackgroundWarehouse() {
new MockUp<WarehouseComputeResourceProvider>() {
@Mock
public boolean isResourceAvailable(ComputeResource computeResource) {
return true;
}
};
WarehouseManager mgr = new WarehouseManager();
mgr.initDefaultWarehouse();
Assertions.assertEquals(WarehouseManager.DEFAULT_WAREHOUSE_ID, mgr.getBackgroundWarehouse(123).getId());

View File

@ -20,6 +20,7 @@ import com.starrocks.persist.ImageWriter;
import com.starrocks.persist.metablock.SRMetaBlockReader;
import com.starrocks.persist.metablock.SRMetaBlockReaderV2;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
import com.starrocks.server.WarehouseManager;
import mockit.Mock;
import mockit.MockUp;
@ -43,6 +44,12 @@ public class HistoricalNodeMgrTest {
@BeforeEach
public void setUp() throws IOException {
new MockUp<RunMode>() {
@Mock
boolean isSharedDataMode() {
return true;
}
};
WarehouseManager warehouseManager = GlobalStateMgr.getCurrentState().getWarehouseMgr();
warehouseManager.initDefaultWarehouse();
}