Signed-off-by: stdpain <drfeng08@gmail.com> Co-authored-by: stdpain <34912776+stdpain@users.noreply.github.com>
This commit is contained in:
parent
9837153661
commit
1cf54d7670
|
|
@ -18,6 +18,7 @@
|
|||
#include <vector>
|
||||
|
||||
#include "agent/master_info.h"
|
||||
#include "common/status.h"
|
||||
#include "exec/pipeline/fragment_context.h"
|
||||
#include "exec/pipeline/pipeline_fwd.h"
|
||||
#include "exec/pipeline/scan/connector_scan_operator.h"
|
||||
|
|
@ -29,6 +30,7 @@
|
|||
#include "runtime/exec_env.h"
|
||||
#include "runtime/query_statistics.h"
|
||||
#include "runtime/runtime_filter_cache.h"
|
||||
#include "util/defer_op.h"
|
||||
#include "util/thread.h"
|
||||
#include "util/thrift_rpc_helper.h"
|
||||
|
||||
|
|
@ -417,10 +419,20 @@ StatusOr<QueryContext*> QueryContextManager::get_or_register(const TUniqueId& qu
|
|||
// lookup query context for the second chance in sc_map
|
||||
if (sc_it != sc_map.end()) {
|
||||
auto ctx = std::move(sc_it->second);
|
||||
sc_map.erase(sc_it);
|
||||
RETURN_CANCELLED_STATUS_IF_CTX_CANCELLED(ctx);
|
||||
auto* raw_ctx_ptr = ctx.get();
|
||||
context_map.emplace(query_id, std::move(ctx));
|
||||
sc_map.erase(sc_it);
|
||||
auto cancel_status = [ctx]() -> Status {
|
||||
RETURN_CANCELLED_STATUS_IF_CTX_CANCELLED(ctx);
|
||||
return Status::OK();
|
||||
}();
|
||||
// If there are still active fragments, we cannot directly remove the query context
|
||||
// because the operator is still executing.
|
||||
// We need to wait until the fragment execution is complete,
|
||||
// then call QueryContextManager::remove to safely remove this query context.
|
||||
if (cancel_status.ok() || ctx->has_no_active_instances()) {
|
||||
context_map.emplace(query_id, std::move(ctx));
|
||||
}
|
||||
RETURN_IF_ERROR(cancel_status);
|
||||
return raw_ctx_ptr;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -171,6 +171,31 @@ TEST(QueryContextManagerTest, testSingleThreadOperations) {
|
|||
sleep(2);
|
||||
ASSERT_TRUE(query_ctx_mgr->get(query_id) == nullptr);
|
||||
}
|
||||
|
||||
{
|
||||
auto query_ctx_mgr = std::make_shared<QueryContextManager>(6);
|
||||
ASSERT_TRUE(query_ctx_mgr->init().ok());
|
||||
TUniqueId query_id;
|
||||
query_id.hi = 100;
|
||||
query_id.lo = 3;
|
||||
ASSIGN_OR_ASSERT_FAIL(auto* query_ctx, query_ctx_mgr->get_or_register(query_id));
|
||||
query_ctx->set_total_fragments(8);
|
||||
query_ctx->set_delivery_expire_seconds(60);
|
||||
query_ctx->set_query_expire_seconds(300);
|
||||
query_ctx->extend_delivery_lifetime();
|
||||
query_ctx->extend_query_lifetime();
|
||||
query_ctx->init_mem_tracker(parent_mem_tracker->limit(), parent_mem_tracker.get());
|
||||
// port query_ctx to second map
|
||||
query_ctx->count_down_fragments();
|
||||
|
||||
ASSIGN_OR_ASSERT_FAIL(auto* query_ctx1, query_ctx_mgr->get_or_register(query_id));
|
||||
// cancel
|
||||
query_ctx1->cancel(Status::Cancelled("cannelled"), true);
|
||||
|
||||
ASSERT_TRUE(query_ctx_mgr->get_or_register(query_id).status().is_cancelled());
|
||||
|
||||
ASSERT_TRUE(query_ctx_mgr->get(query_id) != nullptr);
|
||||
}
|
||||
}
|
||||
|
||||
TEST(QueryContextManagerTest, testMulitiThreadOperations) {
|
||||
|
|
|
|||
Loading…
Reference in New Issue