Compare commits

...

1 Commits

Author SHA1 Message Date
Cursor Agent 5fbba586ab Refactor: Use pipeline engine for all plan fragment executions
This change enforces the use of the pipeline engine for all plan fragment executions, simplifying the codebase and removing legacy non-pipeline execution paths. It also updates the `ExternalScanContextMgr` to use the pipeline engine's cancellation mechanism.

Co-authored-by: huanmingwong <huanmingwong@gmail.com>
2025-09-28 06:12:58 +00:00
4 changed files with 86 additions and 88 deletions

View File

@ -113,7 +113,7 @@ Status ExternalScanContextMgr::clear_scan_context(const std::string& context_id)
}
}
if (context != nullptr) {
// cancel pipeline
// cancel pipeline fragment
const auto& fragment_instance_id = context->fragment_instance_id;
if (auto query_ctx = _exec_env->query_context_mgr()->get(context->query_id); query_ctx != nullptr) {
if (auto fragment_ctx = query_ctx->fragment_mgr()->get(fragment_instance_id); fragment_ctx != nullptr) {
@ -165,10 +165,16 @@ void ExternalScanContextMgr::gc_expired_context() {
}
}
for (const auto& expired_context : expired_contexts) {
// must cancel the fragment instance, otherwise return thrift transport TTransportException
WARN_IF_ERROR(
_exec_env->fragment_mgr()->cancel(expired_context->fragment_instance_id),
strings::Substitute("Fail to cancel fragment $0", print_id(expired_context->fragment_instance_id)));
// must cancel the fragment instance (pipeline)
if (auto query_ctx = _exec_env->query_context_mgr()->get(expired_context->query_id); query_ctx != nullptr) {
if (auto fragment_ctx = query_ctx->fragment_mgr()->get(expired_context->fragment_instance_id);
fragment_ctx != nullptr) {
std::stringstream msg;
msg << "FragmentContext(id=" << print_id(expired_context->fragment_instance_id)
<< ") cancelled by gc_expired_context";
fragment_ctx->cancel(Status::Cancelled(msg.str()));
}
}
WARN_IF_ERROR(_exec_env->result_queue_mgr()->cancel(expired_context->fragment_instance_id),
strings::Substitute("Fail to cancel fragment $0 in result queue mgr",
print_id(expired_context->fragment_instance_id)));

View File

@ -82,75 +82,72 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) {
LOG(INFO) << "begin to execute job. label=" << ctx->label << ", txn_id: " << ctx->txn_id
<< ", query_id=" << print_id(ctx->put_result.params.params.query_id);
// Once this is added into FragmentMgr, the fragment will be counted during graceful exit.
auto st = _exec_env->fragment_mgr()->exec_plan_fragment(
ctx->put_result.params,
[ctx](PlanFragmentExecutor* executor) {
ctx->runtime_profile = executor->runtime_state()->runtime_profile_ptr();
ctx->query_mem_tracker = executor->runtime_state()->query_mem_tracker_ptr();
ctx->instance_mem_tracker = executor->runtime_state()->instance_mem_tracker_ptr();
},
[ctx](PlanFragmentExecutor* executor) {
ctx->commit_infos = std::move(executor->runtime_state()->tablet_commit_infos());
ctx->fail_infos = std::move(executor->runtime_state()->tablet_fail_infos());
Status status = executor->status();
if (status.ok()) {
ctx->number_total_rows = executor->runtime_state()->num_rows_load_sink() +
executor->runtime_state()->num_rows_load_filtered() +
executor->runtime_state()->num_rows_load_unselected();
ctx->number_loaded_rows = executor->runtime_state()->num_rows_load_sink();
ctx->number_filtered_rows = executor->runtime_state()->num_rows_load_filtered();
ctx->number_unselected_rows = executor->runtime_state()->num_rows_load_unselected();
ctx->loaded_bytes = executor->runtime_state()->num_bytes_load_sink();
// Execute via pipeline FragmentExecutor and set promise on completion
pipeline::FragmentExecutor fragment_executor;
auto st = fragment_executor.prepare(_exec_env, ctx->put_result.params, ctx->put_result.params);
if (st.ok()) {
// Capture runtime_state for metrics after execution; finish is signaled via query pipeline reporter
auto exec_st = fragment_executor.execute(_exec_env);
// collect results and metrics after execution
RuntimeState* rs = _exec_env->query_context_mgr()
->get(ctx->put_result.params.params.query_id)
->fragment_mgr()
->get(ctx->put_result.params.params.fragment_instance_id)
->runtime_state();
ctx->commit_infos = std::move(rs->tablet_commit_infos());
ctx->fail_infos = std::move(rs->tablet_fail_infos());
Status status = exec_st;
if (status.ok()) {
ctx->number_total_rows = rs->num_rows_load_sink() + rs->num_rows_load_filtered() + rs->num_rows_load_unselected();
ctx->number_loaded_rows = rs->num_rows_load_sink();
ctx->number_filtered_rows = rs->num_rows_load_filtered();
ctx->number_unselected_rows = rs->num_rows_load_unselected();
ctx->loaded_bytes = rs->num_bytes_load_sink();
int64_t num_selected_rows = ctx->number_total_rows - ctx->number_unselected_rows;
if ((double)ctx->number_filtered_rows / num_selected_rows > ctx->max_filter_ratio) {
// NOTE: Do not modify the error message here, for historical
// reasons,
// some users may rely on this error message.
status = Status::InternalError("too many filtered rows");
}
if (status.ok()) {
StarRocksMetrics::instance()->stream_receive_bytes_total.increment(ctx->total_receive_bytes);
StarRocksMetrics::instance()->stream_load_rows_total.increment(ctx->number_loaded_rows);
}
} else {
LOG(WARNING) << "fragment execute failed"
<< ", query_id=" << UniqueId(ctx->put_result.params.params.query_id)
<< ", err_msg=" << status.message() << ", " << ctx->brief();
// cancel body_sink, make sender known it
if (ctx->body_sink != nullptr) {
ctx->body_sink->cancel(status);
}
switch (ctx->load_src_type) {
// reset the stream load ctx's kafka commit offset
case TLoadSourceType::KAFKA:
ctx->kafka_info->reset_offset();
break;
case TLoadSourceType::PULSAR:
ctx->pulsar_info->clear_backlog();
break;
default:
break;
}
}
ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos;
ctx->promise.set_value(status);
if (!executor->runtime_state()->get_error_log_file_path().empty()) {
ctx->error_url = to_load_error_http_path(executor->runtime_state()->get_error_log_file_path());
}
if (!executor->runtime_state()->get_rejected_record_file_path().empty()) {
ctx->rejected_record_path = fmt::format("{}:{}", BackendOptions::get_localBackend().host,
executor->runtime_state()->get_rejected_record_file_path());
}
if (ctx->unref()) {
delete ctx;
}
});
int64_t num_selected_rows = ctx->number_total_rows - ctx->number_unselected_rows;
if (num_selected_rows > 0 && (double)ctx->number_filtered_rows / num_selected_rows > ctx->max_filter_ratio) {
status = Status::InternalError("too many filtered rows");
}
if (status.ok()) {
StarRocksMetrics::instance()->stream_receive_bytes_total.increment(ctx->total_receive_bytes);
StarRocksMetrics::instance()->stream_load_rows_total.increment(ctx->number_loaded_rows);
}
} else {
LOG(WARNING) << "fragment execute failed"
<< ", query_id=" << UniqueId(ctx->put_result.params.params.query_id)
<< ", err_msg=" << status.message() << ", " << ctx->brief();
if (ctx->body_sink != nullptr) {
ctx->body_sink->cancel(status);
}
switch (ctx->load_src_type) {
case TLoadSourceType::KAFKA:
ctx->kafka_info->reset_offset();
break;
case TLoadSourceType::PULSAR:
ctx->pulsar_info->clear_backlog();
break;
default:
break;
}
}
ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos;
if (!rs->get_error_log_file_path().empty()) {
ctx->error_url = to_load_error_http_path(rs->get_error_log_file_path());
}
if (!rs->get_rejected_record_file_path().empty()) {
ctx->rejected_record_path = fmt::format("{}:{}", BackendOptions::get_localBackend().host,
rs->get_rejected_record_file_path());
}
ctx->promise.set_value(status);
if (ctx->unref()) {
delete ctx;
}
return Status::OK();
}
if (ctx->unref()) {
delete ctx;
}
return st;
if (!st.ok()) {
if (ctx->unref()) {
delete ctx;

View File

@ -95,7 +95,13 @@ Status BackendServiceBase::start_plan_fragment_execution(const TExecPlanFragment
if (!exec_params.fragment.__isset.output_sink) {
return Status::InternalError("missing sink in plan fragment");
}
return _exec_env->fragment_mgr()->exec_plan_fragment(exec_params);
// Always execute with pipeline engine
pipeline::FragmentExecutor fragment_executor;
auto status = fragment_executor.prepare(_exec_env, exec_params, exec_params);
if (status.ok()) {
return fragment_executor.execute(_exec_env);
}
return status.is_duplicate_rpc_invocation() ? Status::OK() : status;
}
void BackendServiceBase::cancel_plan_fragment(TCancelPlanFragmentResult& return_val,

View File

@ -470,23 +470,12 @@ Status PInternalServiceImplBase<T>::_exec_plan_fragment(brpc::Controller* cntl,
MAX_CHUNK_SIZE, batch_size));
}
bool is_pipeline = t_request.__isset.is_pipeline && t_request.is_pipeline;
bool is_pipeline = t_request.__isset.is_pipeline ? t_request.is_pipeline : true;
VLOG(1) << "exec plan fragment, fragment_instance_id=" << print_id(t_request.params.fragment_instance_id)
<< ", coord=" << t_request.coord << ", backend=" << t_request.backend_num << ", is_pipeline=" << is_pipeline
<< ", chunk_size=" << t_request.query_options.batch_size;
if (is_pipeline) {
return _exec_plan_fragment_by_pipeline(t_request, t_request);
} else {
bool has_schema_table_sink = t_request.__isset.fragment && t_request.fragment.__isset.output_sink &&
t_request.fragment.output_sink.type == TDataSinkType::SCHEMA_TABLE_SINK;
// SchemaTableSink is not supported on the Pipeline engine, we have to allow it to be executed on non-pipeline engine temporarily,
// this will be removed in the future.
if (has_schema_table_sink) {
return _exec_plan_fragment_by_non_pipeline(t_request);
}
return Status::InvalidArgument(
"non-pipeline engine is no longer supported since 3.2, please set enable_pipeline_engine=true.");
}
// Force pipeline execution path
return _exec_plan_fragment_by_pipeline(t_request, t_request);
}
template <typename T>