[BugFix] fix query hang because of incorrect scan range delivery (#61562)
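
As the diffs below show, the hang came from delivery paths that failed silently: on the BE, append_incremental_scan_ranges returned Status::OK() when the query or fragment context was missing and skipped (continue) any node whose MorselQueueFactory was not found, so incremental scan ranges could be dropped while the scan operators kept waiting for ranges that would never arrive; on the FE, a failed assignment threw without cancelling the query. The commit makes each of these paths fail loudly instead. A minimal sketch of the failure mode, in plain Java with hypothetical names (none of this is StarRocks API):

    import java.util.Map;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;

    class ScanRangeDeliverySketch {
        private final Map<Integer, Queue<String>> queuesByNode = new ConcurrentHashMap<>();

        void registerNode(int nodeId) {
            queuesByNode.put(nodeId, new ConcurrentLinkedQueue<>());
        }

        // Old behavior: unknown node id -> pretend success. The ranges are
        // silently dropped and the consumer waits forever: the query hangs.
        void deliverLossy(int nodeId, String range) {
            Queue<String> q = queuesByNode.get(nodeId);
            if (q == null) {
                return;
            }
            q.add(range);
        }

        // Fixed behavior: unknown node id -> fail the delivery, so the caller
        // can cancel the query instead of waiting on ranges that never arrive.
        void deliverStrict(int nodeId, String range) {
            Queue<String> q = queuesByNode.get(nodeId);
            if (q == null) {
                throw new IllegalStateException("morsel queue not found for node " + nodeId);
            }
            q.add(range);
        }
    }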

Signed-off-by: yan zhang <dirtysalt1987@gmail.com>
yan zhang 2025-08-05 18:35:14 +08:00 committed by GitHub
parent a1921ff837
commit d8ff13b53b
3 changed files with 39 additions and 20 deletions

be/src/exec/pipeline/fragment_executor.cpp

@@ -981,9 +981,14 @@ Status FragmentExecutor::append_incremental_scan_ranges(ExecEnv* exec_env, const
     const TUniqueId& instance_id = params.fragment_instance_id;
     QueryContextPtr query_ctx = exec_env->query_context_mgr()->get(query_id);
-    if (query_ctx == nullptr) return Status::OK();
+    if (query_ctx == nullptr) {
+        return Status::InternalError(fmt::format("QueryContext not found for query_id: {}", print_id(query_id)));
+    }
     FragmentContextPtr fragment_ctx = query_ctx->fragment_mgr()->get(instance_id);
-    if (fragment_ctx == nullptr) return Status::OK();
+    if (fragment_ctx == nullptr) {
+        return Status::InternalError(fmt::format("FragmentContext not found for query_id: {}, instance_id: {}",
+                                                 print_id(query_id), print_id(instance_id)));
+    }
     RuntimeState* runtime_state = fragment_ctx->runtime_state();
     std::unordered_set<int> notify_ids;
@@ -992,11 +997,15 @@ Status FragmentExecutor::append_incremental_scan_ranges(ExecEnv* exec_env, const
         if (scan_ranges.size() == 0) continue;
         auto iter = fragment_ctx->morsel_queue_factories().find(node_id);
         if (iter == fragment_ctx->morsel_queue_factories().end()) {
-            continue;
+            return Status::InternalError(
+                    fmt::format("MorselQueueFactory not found for node_id: {}, query_id: {}, instance_id: {}", node_id,
+                                print_id(query_id), print_id(instance_id)));
         }
         MorselQueueFactory* morsel_queue_factory = iter->second.get();
         if (morsel_queue_factory == nullptr) {
-            continue;
+            return Status::InternalError(
+                    fmt::format("MorselQueueFactory is null for node_id: {}, query_id: {}, instance_id: {}", node_id,
+                                print_id(query_id), print_id(instance_id)));
         }
         RETURN_IF_ERROR(add_scan_ranges_partition_values(runtime_state, scan_ranges));
@@ -1012,11 +1021,15 @@ Status FragmentExecutor::append_incremental_scan_ranges(ExecEnv* exec_env, const
         for (const auto& [node_id, per_driver_scan_ranges] : params.node_to_per_driver_seq_scan_ranges) {
             auto iter = fragment_ctx->morsel_queue_factories().find(node_id);
             if (iter == fragment_ctx->morsel_queue_factories().end()) {
-                continue;
+                return Status::InternalError(
+                        fmt::format("MorselQueueFactory not found for node_id: {}, query_id: {}, instance_id: {}",
+                                    node_id, print_id(query_id), print_id(instance_id)));
             }
             MorselQueueFactory* morsel_queue_factory = iter->second.get();
             if (morsel_queue_factory == nullptr) {
-                continue;
+                return Status::InternalError(
+                        fmt::format("MorselQueueFactory is null for node_id: {}, query_id: {}, instance_id: {}",
+                                    node_id, print_id(query_id), print_id(instance_id)));
             }
             bool has_more_morsel = has_more_per_driver_seq_scan_ranges(per_driver_scan_ranges);

fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/AllAtOnceExecutionSchedule.java

@@ -15,12 +15,15 @@
 package com.starrocks.qe.scheduler.dag;
 
 import com.starrocks.common.StarRocksException;
+import com.starrocks.proto.PPlanFragmentCancelReason;
 import com.starrocks.qe.scheduler.Coordinator;
 import com.starrocks.qe.scheduler.Deployer;
 import com.starrocks.qe.scheduler.slot.DeployState;
 import com.starrocks.rpc.RpcException;
 import com.starrocks.server.GlobalStateMgr;
 import com.starrocks.thrift.TUniqueId;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -28,6 +31,7 @@ import java.util.concurrent.ExecutorService;
 // all at once execution schedule only schedule once.
 public class AllAtOnceExecutionSchedule implements ExecutionSchedule {
+    private static final Logger LOG = LogManager.getLogger(AllAtOnceExecutionSchedule.class);
     private Coordinator coordinator;
     private Deployer deployer;
     private ExecutionDAG dag;
@@ -44,22 +48,21 @@ public class AllAtOnceExecutionSchedule implements ExecutionSchedule {
     @Override
     public void run() {
-        if (cancelled) {
-            return;
-        }
-        try {
-            states = coordinator.assignIncrementalScanRangesToDeployStates(deployer, states);
-            if (states.isEmpty()) {
-                return;
+        while (!cancelled && !states.isEmpty()) {
+            try {
+                states = coordinator.assignIncrementalScanRangesToDeployStates(deployer, states);
+                if (states.isEmpty()) {
+                    return;
+                }
+                for (DeployState state : states) {
+                    deployer.deployFragments(state);
+                }
+            } catch (StarRocksException | RpcException e) {
+                LOG.warn("Failed to assign incremental scan ranges to deploy states", e);
+                coordinator.cancel(PPlanFragmentCancelReason.INTERNAL_ERROR, e.getMessage());
+                throw new RuntimeException(e);
             }
-            for (DeployState state : states) {
-                deployer.deployFragments(state);
-            }
-        } catch (StarRocksException | RpcException e) {
-            throw new RuntimeException(e);
         }
-        // jvm should use tail optimization.
-        start();
     }
 
     public void start() {
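
The rewrite above replaces the self-scheduling tail call with a loop. The JVM does not perform tail-call optimization, so the old start() tail call grew the stack by one frame per scheduling round (the removed comment hoped otherwise), and the old catch rethrew without cancelling the query, which could leave already-deployed fragments running. A minimal sketch of the recursion-to-loop transformation, with a hypothetical Step interface standing in for one scheduling round:

    // Minimal sketch (Step is hypothetical): tail recursion adds a stack
    // frame per round on the JVM; the loop form is equivalent with bounded
    // stack depth.
    final class TailToLoopSketch {
        interface Step {
            boolean runOnce() throws Exception; // true = more work remains
        }

        static void runRecursive(Step step) throws Exception {
            if (!step.runOnce()) {
                return;
            }
            runRecursive(step); // one new frame per round; no TCO on the JVM
        }

        static void runLoop(Step step) throws Exception {
            while (step.runOnce()) {
                // same behavior, constant stack depth
            }
        }
    }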

fe/fe-core/src/main/java/com/starrocks/qe/scheduler/dag/FragmentInstanceExecState.java

@@ -228,6 +228,9 @@ public class FragmentInstanceExecState {
                     return get();
                 }
             };
+        } finally {
+            serializedRequest = null; // clear serializedRequest after deploy
+            requestToDeploy = null; // clear requestToDeploy after deploy
         }
     }
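
The finally block above releases the serialized deploy payload once the RPC has been issued: the exec state object outlives the deploy call, so keeping those references would hold the serialized request in memory for as long as the state lives. A minimal sketch of the pattern, with hypothetical names:

    import java.util.function.Consumer;

    // Minimal sketch (names hypothetical): drop the reference to a one-shot
    // payload in a finally block so the GC can reclaim it even though the
    // owning object outlives the deploy call.
    class DeployOnceSketch {
        private byte[] serializedRequest; // e.g. a serialized plan fragment

        DeployOnceSketch(byte[] payload) {
            this.serializedRequest = payload;
        }

        void deploy(Consumer<byte[]> rpc) {
            try {
                rpc.accept(serializedRequest);
            } finally {
                serializedRequest = null; // no longer needed after deploy
            }
        }
    }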