[BugFix] fix deploy serialize pool block (#61150)

Signed-off-by: Seaven <seaven_7@qq.com>
Seaven 2025-07-25 10:53:40 +08:00 committed by GitHub
parent 72488b08c9
commit 8a4ef5c2f1
GPG Key ID: B5690EEEBB952194
4 changed files with 59 additions and 12 deletions

Config.java

@@ -3792,4 +3792,18 @@ public class Config extends ConfigBase {
*/
@ConfField(mutable = true)
public static boolean enable_trace_historical_node = false;
/**
* The size of the thread pool for deploy serialization.
* If set to -1, it means same as cpu core number.
*/
@ConfField
public static int deploy_serialization_thread_pool_size = -1;
/**
* The size of the queue for deploy serialization thread pool.
* If set to -1, it means same as cpu core number * 2.
*/
@ConfField
public static int deploy_serialization_queue_size = -1;
}
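
The two configs above default to -1. As wired up in Deployer below, each is combined with the CPU-based default via Math.max, so -1 (or any smaller value) simply falls back to that default. A minimal, illustrative sketch of the resolution follows; it is not part of this commit, and availableCpuCores() is a stand-in for ThreadPoolManager.cpuCores():

// Illustrative sketch of how the -1 defaults resolve; not part of this commit.
public final class DeploySerializationDefaults {
    // Stand-in for ThreadPoolManager.cpuCores().
    static int availableCpuCores() {
        return Runtime.getRuntime().availableProcessors();
    }

    // -1 (or anything below the CPU count) falls back to the CPU count.
    static int effectivePoolSize(int deploySerializationThreadPoolSize) {
        return Math.max(availableCpuCores(), deploySerializationThreadPoolSize);
    }

    // -1 (or anything below poolSize * 2) falls back to poolSize * 2.
    static int effectiveQueueSize(int deploySerializationQueueSize, int poolSize) {
        return Math.max(poolSize * 2, deploySerializationQueueSize);
    }
}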

SessionVariable.java

@@ -4787,6 +4787,10 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
return enablePlanSerializeConcurrently;
}
public void setEnablePlanSerializeConcurrently(boolean enablePlanSerializeConcurrently) {
this.enablePlanSerializeConcurrently = enablePlanSerializeConcurrently;
}
public long getCrossJoinCostPenalty() {
return crossJoinCostPenalty;
}
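
The new setter gives internal callers a per-session switch; the StatisticUtils change at the end of this commit uses it to turn concurrent plan serialization off for background statistics jobs. A minimal usage sketch, illustrative only and assuming an existing ConnectContext:

import com.starrocks.qe.ConnectContext;

// Illustrative only: opt a session out of concurrent plan serialization so the
// Deployer skips the shared serialization pool for this session's queries.
final class DisableConcurrentSerializeExample {
    static void disableFor(ConnectContext context) {
        context.getSessionVariable().setEnablePlanSerializeConcurrently(false);
    }
}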

Deployer.java

@@ -17,11 +17,13 @@ package com.starrocks.qe.scheduler;
import com.google.api.client.util.Sets;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.starrocks.common.Config;
import com.starrocks.common.StarRocksException;
import com.starrocks.common.Status;
import com.starrocks.common.ThreadPoolManager;
import com.starrocks.common.profile.Timer;
import com.starrocks.common.profile.Tracers;
import com.starrocks.common.util.DebugUtil;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.ExecuteExceptionHandler;
import com.starrocks.qe.scheduler.dag.ExecutionDAG;
@@ -42,13 +44,16 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import static com.starrocks.qe.scheduler.dag.FragmentInstanceExecState.DeploymentResult;
@@ -58,17 +63,24 @@ import static com.starrocks.qe.scheduler.dag.FragmentInstanceExecState.DeploymentResult;
*/
public class Deployer {
private static final Logger LOG = LogManager.getLogger(Deployer.class);
- private static final ThreadPoolExecutor EXECUTOR =
- ThreadPoolManager.newDaemonCacheThreadPool(ThreadPoolManager.cpuCores(),
- Integer.MAX_VALUE, "deployer", true);
+ private static final ThreadPoolExecutor EXECUTOR;
+ static {
+ int threadPoolSize = Math.max(ThreadPoolManager.cpuCores(), Config.deploy_serialization_thread_pool_size);
+ int threadPoolQueueSize = Math.max(threadPoolSize * 2, Config.deploy_serialization_queue_size);
+ EXECUTOR = ThreadPoolManager.newDaemonThreadPool(1, threadPoolSize, 60, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(threadPoolQueueSize), new ThreadPoolExecutor.AbortPolicy(),
+ "deployer", true);
+ }
private final ConnectContext context;
private final JobSpec jobSpec;
private final ExecutionDAG executionDAG;
private final TFragmentInstanceFactory tFragmentInstanceFactory;
private final TDescriptorTable emptyDescTable;
private final long deliveryTimeoutMs;
- private boolean enablePlanSerializeConcurrently;
+ private final boolean enablePlanSerializeConcurrently;
private final FailureHandler failureHandler;
private final boolean needDeploy;
@@ -81,6 +93,7 @@ public class Deployer {
TNetworkAddress coordAddress,
FailureHandler failureHandler,
boolean needDeploy) {
this.context = context;
this.jobSpec = jobSpec;
this.executionDAG = executionDAG;
@@ -116,14 +129,29 @@
if (enablePlanSerializeConcurrently) {
try (Timer ignored = Tracers.watchScope(Tracers.Module.SCHEDULER, "DeploySerializeConcurrencyTime")) {
- List<Future<?>> futures = new LinkedList<>();
- threeStageExecutionsToDeploy.forEach(
- executions -> executions.forEach(e ->
- futures.add(EXECUTOR.submit(e::serializeRequest))
- )
- );
+ int count = threeStageExecutionsToDeploy.stream().mapToInt(List::size).sum();
+ List<Future<?>> futures = new ArrayList<>(count + 1);
+ for (List<FragmentInstanceExecState> execStates : threeStageExecutionsToDeploy) {
+ for (FragmentInstanceExecState execState : execStates) {
+ try {
+ Future<?> f = EXECUTOR.submit(execState::serializeRequest);
+ futures.add(f);
+ } catch (RejectedExecutionException e) {
+ // If the thread pool is full, we will serialize the request in the current thread.
+ }
+ }
+ }
for (Future<?> future : futures) {
- future.get();
+ try {
+ future.get(2, TimeUnit.SECONDS);
+ } catch (TimeoutException e) {
+ LOG.warn("Slow serialize request, query: {}", DebugUtil.printId(context.getQueryId()));
+ }
}
+ for (Future<?> future : futures) {
+ if (!future.isDone()) {
+ future.get();
+ }
+ }
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
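
Taken together, this hunk swaps the cached "deployer" pool for an explicitly bounded pool with an AbortPolicy, treats a rejected submit as "serialize on the current thread instead", and waits with a short timeout first so slow serializations get logged before the final blocking wait. Below is a self-contained sketch of that pattern, not the StarRocks code: the raw ThreadPoolExecutor stands in for ThreadPoolManager.newDaemonThreadPool, a no-op Runnable stands in for FragmentInstanceExecState::serializeRequest, and the sketch runs a rejected task inline in the catch block, whereas the commit leaves the catch empty and relies on the request being serialized later on the deploying thread.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Self-contained sketch of the pattern above: a bounded pool that rejects
// excess work, callers that fall back to running rejected tasks inline, and
// a short timed wait that flags slow tasks before blocking for completion.
public class BoundedSerializePoolSketch {

    // Bounded pool: at most poolSize threads and queueSize queued tasks;
    // AbortPolicy makes submit() throw RejectedExecutionException once both are full.
    static ThreadPoolExecutor newBoundedPool(int poolSize, int queueSize) {
        return new ThreadPoolExecutor(1, poolSize, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(queueSize), new ThreadPoolExecutor.AbortPolicy());
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        int cpu = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor executor = newBoundedPool(cpu, cpu * 2);

        // Stand-ins for FragmentInstanceExecState::serializeRequest.
        List<Runnable> tasks = new ArrayList<>();
        for (int i = 0; i < 100; i++) {
            tasks.add(() -> { /* serialize one fragment instance request */ });
        }

        List<Future<?>> futures = new ArrayList<>(tasks.size());
        for (Runnable task : tasks) {
            try {
                futures.add(executor.submit(task));
            } catch (RejectedExecutionException e) {
                // Pool and queue are full: run the task on the submitting thread
                // instead of blocking on, or silently dropping, the work.
                task.run();
            }
        }

        // First pass: wait briefly for each submitted task and flag slow ones.
        for (Future<?> future : futures) {
            try {
                future.get(2, TimeUnit.SECONDS);
            } catch (TimeoutException e) {
                System.err.println("slow serialize task");
            }
        }

        // Second pass: block until every submitted task has finished.
        for (Future<?> future : futures) {
            if (!future.isDone()) {
                future.get();
            }
        }
        executor.shutdown();
    }
}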

StatisticUtils.java

@@ -118,6 +118,7 @@ public class StatisticUtils {
context.getSessionVariable().setEnablePipelineEngine(true);
context.getSessionVariable().setCboCteReuse(true);
context.getSessionVariable().setCboCTERuseRatio(0);
context.getSessionVariable().setEnablePlanSerializeConcurrently(false);
WarehouseManager manager = GlobalStateMgr.getCurrentState().getWarehouseMgr();
Warehouse warehouse = manager.getBackgroundWarehouse();