[BugFix] Correct add query context to context conditions (backport #61929) (#61945)

Signed-off-by: stdpain <drfeng08@gmail.com>
Co-authored-by: stdpain <34912776+stdpain@users.noreply.github.com>
This commit is contained in:
mergify[bot] 2025-08-15 14:00:14 +08:00 committed by GitHub
parent f571bb1ac0
commit 3708c97461
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 14 additions and 6 deletions

View File

@ -85,7 +85,7 @@ QueryContext::~QueryContext() noexcept {
}
}
void QueryContext::count_down_fragments() {
void QueryContext::count_down_fragments(QueryContextManager* query_context_mgr) {
size_t old = _num_active_fragments.fetch_sub(1);
DCHECK_GE(old, 1);
bool all_fragments_finished = old == 1;
@ -95,7 +95,7 @@ void QueryContext::count_down_fragments() {
// Acquire the pointer to avoid be released when removing query
auto query_trace = shared_query_trace();
ExecEnv::GetInstance()->query_context_mgr()->remove(_query_id);
query_context_mgr->remove(_query_id);
// @TODO(silverbullet233): if necessary, remove the dump from the execution thread
// considering that this feature is generally used for debugging,
// I think it should not have a big impact now
@ -104,6 +104,10 @@ void QueryContext::count_down_fragments() {
}
}
void QueryContext::count_down_fragments() {
return this->count_down_fragments(ExecEnv::GetInstance()->query_context_mgr());
}
FragmentContextManager* QueryContext::fragment_mgr() {
return _fragment_mgr.get();
}
@ -429,7 +433,7 @@ StatusOr<QueryContext*> QueryContextManager::get_or_register(const TUniqueId& qu
// because the operator is still executing.
// We need to wait until the fragment execution is complete,
// then call QueryContextManager::remove to safely remove this query context.
if (cancel_status.ok() || ctx->has_no_active_instances()) {
if (cancel_status.ok() || !ctx->has_no_active_instances()) {
context_map.emplace(query_id, std::move(ctx));
}
RETURN_IF_ERROR(cancel_status);

View File

@ -71,6 +71,7 @@ public:
}
void count_down_fragments();
void count_down_fragments(QueryContextManager* query_context_mgr);
int num_active_fragments() const { return _num_active_fragments.load(); }
bool has_no_active_instances() { return _num_active_fragments.load() == 0; }

View File

@ -186,11 +186,14 @@ TEST(QueryContextManagerTest, testSingleThreadOperations) {
query_ctx->extend_query_lifetime();
query_ctx->init_mem_tracker(parent_mem_tracker->limit(), parent_mem_tracker.get());
// port query_ctx to second map
query_ctx->count_down_fragments();
query_ctx->count_down_fragments(query_ctx_mgr.get());
// Single thread cannot reproduce query context registration success,
// while this query context is in the second case. So let's simulate it here.
query_ctx->increment_num_fragments();
ASSIGN_OR_ASSERT_FAIL(auto* query_ctx1, query_ctx_mgr->get_or_register(query_id));
// cancel
query_ctx1->cancel(Status::Cancelled("cannelled"), true);
query_ctx->cancel(Status::Cancelled("cannelled"), true);
ASSERT_TRUE(query_ctx_mgr->get_or_register(query_id).status().is_cancelled());