Signed-off-by: yan zhang <dirtysalt1987@gmail.com> Co-authored-by: yan zhang <dirtysalt1987@gmail.com>
This commit is contained in:
parent
6d4fdae2f1
commit
6af0301a52
|
|
@ -80,6 +80,10 @@ public class Tracers {
|
|||
return THREAD_LOCAL.get();
|
||||
}
|
||||
|
||||
public static void set(Tracers tracers) {
|
||||
THREAD_LOCAL.set(tracers);
|
||||
}
|
||||
|
||||
/**
|
||||
* Init tracer with context and mode.
|
||||
* @param context connect context
|
||||
|
|
|
|||
|
|
@ -15,6 +15,8 @@
|
|||
package com.starrocks.qe.scheduler.dag;
|
||||
|
||||
import com.starrocks.common.StarRocksException;
|
||||
import com.starrocks.common.profile.Timer;
|
||||
import com.starrocks.common.profile.Tracers;
|
||||
import com.starrocks.proto.PPlanFragmentCancelReason;
|
||||
import com.starrocks.qe.scheduler.Coordinator;
|
||||
import com.starrocks.qe.scheduler.Deployer;
|
||||
|
|
@ -37,9 +39,28 @@ public class AllAtOnceExecutionSchedule implements ExecutionSchedule {
|
|||
private ExecutionDAG dag;
|
||||
private volatile boolean cancelled = false;
|
||||
|
||||
class TracerContext implements AutoCloseable {
|
||||
Tracers savedTracers;
|
||||
|
||||
TracerContext(Tracers currentTracers) {
|
||||
if (currentTracers != null) {
|
||||
savedTracers = Tracers.get();
|
||||
Tracers.set(currentTracers);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (savedTracers != null) {
|
||||
Tracers.set(savedTracers);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class DeployMoreScanRangesTask implements Runnable {
|
||||
List<DeployState> states;
|
||||
private ExecutorService executorService;
|
||||
Tracers currentTracers;
|
||||
|
||||
DeployMoreScanRangesTask(List<DeployState> states, ExecutorService executorService) {
|
||||
this.states = states;
|
||||
|
|
@ -48,19 +69,23 @@ public class AllAtOnceExecutionSchedule implements ExecutionSchedule {
|
|||
|
||||
@Override
|
||||
public void run() {
|
||||
while (!cancelled && !states.isEmpty()) {
|
||||
try {
|
||||
states = coordinator.assignIncrementalScanRangesToDeployStates(deployer, states);
|
||||
if (states.isEmpty()) {
|
||||
return;
|
||||
try (TracerContext tracerContext = new TracerContext(currentTracers)) {
|
||||
try (Timer timer = Tracers.watchScope(Tracers.Module.SCHEDULER, "DeployMore")) {
|
||||
while (!cancelled && !states.isEmpty()) {
|
||||
try {
|
||||
states = coordinator.assignIncrementalScanRangesToDeployStates(deployer, states);
|
||||
if (states.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
for (DeployState state : states) {
|
||||
deployer.deployFragments(state);
|
||||
}
|
||||
} catch (StarRocksException | RpcException e) {
|
||||
LOG.warn("Failed to assign incremental scan ranges to deploy states", e);
|
||||
coordinator.cancel(PPlanFragmentCancelReason.INTERNAL_ERROR, e.getMessage());
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
for (DeployState state : states) {
|
||||
deployer.deployFragments(state);
|
||||
}
|
||||
} catch (StarRocksException | RpcException e) {
|
||||
LOG.warn("Failed to assign incremental scan ranges to deploy states", e);
|
||||
coordinator.cancel(PPlanFragmentCancelReason.INTERNAL_ERROR, e.getMessage());
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -96,6 +121,10 @@ public class AllAtOnceExecutionSchedule implements ExecutionSchedule {
|
|||
}
|
||||
|
||||
DeployMoreScanRangesTask task = new DeployMoreScanRangesTask(states, executorService);
|
||||
|
||||
if (option.useQueryDeployExecutor) {
|
||||
task.currentTracers = Tracers.get();
|
||||
}
|
||||
task.start();
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue