[Enhancement] add a session variable to deploy scan ranges back/foreground (backport #62291) (#62320)

Signed-off-by: yan zhang <dirtysalt1987@gmail.com>
Co-authored-by: yan zhang <dirtysalt1987@gmail.com>
This commit is contained in:
mergify[bot] 2025-08-26 06:59:50 +00:00 committed by GitHub
parent 4efe5dbf02
commit ee253d78df
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 17 additions and 18 deletions

View File

@ -902,6 +902,8 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
public static final String ENABLE_CONNECTOR_INCREMENTAL_SCAN_RANGES = "enable_connector_incremental_scan_ranges";
public static final String CONNECTOR_INCREMENTAL_SCAN_RANGE_SIZE = "connector_incremental_scan_ranges_size";
public static final String ENABLE_CONNECTOR_ASYNC_LIST_PARTITIONS = "enable_connector_async_list_partitions";
public static final String ENABLE_CONNECTOR_DEPLOY_SCAN_RANGES_BACKGROUND =
"enable_connector_deploy_scan_ranges_background";
public static final String ENABLE_PLAN_ANALYZER = "enable_plan_analyzer";
public static final String ENABLE_PLAN_ADVISOR = "enable_plan_advisor";
@ -1264,7 +1266,6 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
@VarAttr(name = CBO_JSON_V2_DICT_OPT)
private boolean cboJSONV2DictOpt = true;
/*
* the parallel exec instance num for one Fragment in one BE
* 1 means disable this feature
@ -2660,6 +2661,9 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
@VarAttr(name = ENABLE_CONNECTOR_INCREMENTAL_SCAN_RANGES)
private boolean enableConnectorIncrementalScanRanges = true;
@VarAttr(name = ENABLE_CONNECTOR_DEPLOY_SCAN_RANGES_BACKGROUND)
private boolean enableConnectorDeployScanRangesBackground = true;
@VarAttr(name = CONNECTOR_INCREMENTAL_SCAN_RANGE_SIZE)
private int connectorIncrementalScanRangeSize = 500;
@ -5079,6 +5083,10 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
return enableConnectorIncrementalScanRanges;
}
public boolean isEnableConnectorDeployScanRangesBackground() {
return enableConnectorDeployScanRangesBackground;
}
public boolean isEnableConnectorAsyncListPartitions() {
return enableConnectorAsyncListPartitions;
}
@ -5254,7 +5262,7 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
public void setEnableJSONV2Rewrite(boolean enableJSONV2Rewrite) {
this.cboJSONV2Rewrite = enableJSONV2Rewrite;
}
public boolean isEnableDropTableCheckMvDependency() {
return enableDropTableCheckMvDependency;
}
@ -5464,9 +5472,6 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
return root.toString();
}
private void readFromJson(DataInput in) throws IOException {
String json = Text.readString(in);
replayFromJson(json);

View File

@ -1401,7 +1401,7 @@ public class StmtExecutor {
return;
}
coord.execWithQueryDeployExecutor();
coord.execWithQueryDeployExecutor(context);
coord.setTopProfileSupplier(this::buildTopLevelProfile);
coord.setExecPlan(execPlan);

View File

@ -107,9 +107,9 @@ public abstract class Coordinator {
startScheduling(option);
}
public void execWithQueryDeployExecutor() throws Exception {
public void execWithQueryDeployExecutor(ConnectContext context) throws Exception {
ScheduleOption option = new ScheduleOption();
option.useQueryDeployExecutor = true;
option.useQueryDeployExecutor = context.getSessionVariable().isEnableConnectorDeployScanRangesBackground();
startScheduling(option);
}

View File

@ -72,9 +72,7 @@ public class AllAtOnceExecutionSchedule implements ExecutionSchedule {
return;
}
try (TracerContext tracerContext = new TracerContext(currentTracers)) {
try (Timer timer = Tracers.watchScope(Tracers.Module.SCHEDULER, "DeployScanRanges")) {
runOnce();
}
runOnce();
}
// If run in the executor service, we need to start the next turn.
// To submit this task again so all queries could get the same opportunity to run by queueing up
@ -82,7 +80,7 @@ public class AllAtOnceExecutionSchedule implements ExecutionSchedule {
}
public void runOnce() {
try {
try (Timer timer = Tracers.watchScope(Tracers.Module.SCHEDULER, "DeployScanRanges")) {
states = coordinator.assignIncrementalScanRangesToDeployStates(deployer, states);
if (states.isEmpty()) {
return;
@ -119,7 +117,7 @@ public class AllAtOnceExecutionSchedule implements ExecutionSchedule {
@Override
public void schedule(Coordinator.ScheduleOption option) throws RpcException, StarRocksException {
List<DeployState> states = new ArrayList<>();
List<DeployState> states = new ArrayList<>();
for (List<ExecutionFragment> executionFragments : dag.getFragmentsInTopologicalOrderFromRoot()) {
final DeployState deployState = deployer.createFragmentExecStates(executionFragments);
deployer.deployFragments(deployState);
@ -130,16 +128,12 @@ public class AllAtOnceExecutionSchedule implements ExecutionSchedule {
if (option.useQueryDeployExecutor) {
deployScanRangesTask.executorService = GlobalStateMgr.getCurrentState().getQueryDeployExecutor();
deployScanRangesTask.currentTracers = Tracers.get();
} else {
deployScanRangesTask.start();
}
}
@Override
public void continueSchedule(Coordinator.ScheduleOption option) throws RpcException, StarRocksException {
if (option.useQueryDeployExecutor) {
deployScanRangesTask.start();
}
deployScanRangesTask.start();
}
@Override