[Enhancement] Improve fragment instance exec state report (backport #63132) (#63190)

Signed-off-by: wyb <wybb86@gmail.com>
Co-authored-by: wyb <wybb86@gmail.com>
This commit is contained in:
mergify[bot] 2025-09-16 10:17:10 +00:00 committed by GitHub
parent c462f0779d
commit 00d76ad660
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 24 additions and 14 deletions

View File

@ -356,9 +356,10 @@ void GlobalDriverExecutor::report_exec_state(QueryContext* query_ctx, FragmentCo
}
auto exec_env = fragment_ctx->runtime_state()->exec_env();
auto fragment_id = fragment_ctx->fragment_instance_id();
auto instance_id = fragment_ctx->fragment_instance_id();
auto query_id = fragment_ctx->query_id();
auto report_task = [params, exec_env, fe_addr, fragment_id]() {
auto report_task = [params, exec_env, fe_addr, instance_id, query_id]() {
int retry_times = 0;
int max_retry_times = config::report_exec_rpc_request_retry_num;
while (retry_times++ < max_retry_times) {
@ -372,20 +373,20 @@ void GlobalDriverExecutor::report_exec_state(QueryContext* query_ctx, FragmentCo
if (!status.ok()) {
if (status.is_not_found()) {
VLOG(1) << "[Driver] Fail to report exec state due to query not found: fragment_instance_id="
<< print_id(fragment_id);
VLOG(1) << "[Driver] Fail to report exec state due to query not found. fragment_instance_id="
<< print_id(instance_id) << ", query_id=" << print_id(query_id);
} else {
LOG(WARNING) << "[Driver] Fail to report exec state: fragment_instance_id=" << print_id(fragment_id)
<< ", status: " << status.to_string() << ", retry_times=" << retry_times
<< ", max_retry_times=" << max_retry_times;
LOG(WARNING) << "[Driver] Fail to report exec state. fragment_instance_id=" << print_id(instance_id)
<< ", query_id=" << print_id(query_id) << ", status: " << status.to_string()
<< ", retry_times=" << retry_times << ", max_retry_times=" << max_retry_times;
// if it is done exec state report, we should retry
if (params->__isset.done && params->done) {
continue;
}
}
} else {
VLOG(1) << "[Driver] Succeed to report exec state: fragment_instance_id=" << print_id(fragment_id)
<< ", is_done=" << params->done;
VLOG(1) << "[Driver] Succeed to report exec state. fragment_instance_id=" << print_id(instance_id)
<< ", query_id=" << print_id(query_id) << ", is_done=" << params->done;
}
break;
}
@ -393,10 +394,11 @@ void GlobalDriverExecutor::report_exec_state(QueryContext* query_ctx, FragmentCo
// if it is done exec state report, We need to ensure that this report is executed with priority
// and is retried as much as possible to ensure success.
// Otherwise, it may result in the query or ingestion status getting stuck.
this->_exec_state_reporter->submit(std::move(report_task), done);
VLOG(2) << "[Driver] Submit exec state report task: fragment_instance_id=" << print_id(fragment_id)
<< ", is_done=" << done;
// Otherwise, it may result in the ingestion status getting stuck.
bool priority = done && fragment_ctx->runtime_state()->query_options().query_type == TQueryType::LOAD;
this->_exec_state_reporter->submit(std::move(report_task), priority);
VLOG(2) << "[Driver] Submit exec state report task. fragment_instance_id=" << print_id(instance_id)
<< ", query_id=" << print_id(query_id) << ", is_done=" << done;
}
void GlobalDriverExecutor::report_audit_statistics(QueryContext* query_ctx, FragmentContext* fragment_ctx) {

View File

@ -1060,6 +1060,10 @@ public class DefaultCoordinator extends Coordinator {
// count down to zero to notify all objects waiting for this
if (!connectContext.isProfileEnabled()) {
queryProfile.finishAllInstances(Status.OK);
List<String> unFinishedInstanceIds = queryProfile.getUnfinishedInstanceIds();
if (!unFinishedInstanceIds.isEmpty()) {
LOG.info("query: {} has unfinished instances: {}", connectContext.queryId, unFinishedInstanceIds);
}
}
}
}

View File

@ -684,7 +684,11 @@ public class QueryRuntimeProfile {
return matcher.matches() ? Optional.of(matcher.group(1)) : Optional.empty();
}
private List<String> getUnfinishedInstanceIds() {
public List<String> getUnfinishedInstanceIds() {
if (profileDoneSignal == null) {
return Lists.newArrayList();
}
return profileDoneSignal.getLeftMarks().stream()
.map(Map.Entry::getKey)
.map(DebugUtil::printId)