diff --git a/be/src/runtime/external_scan_context_mgr.cpp b/be/src/runtime/external_scan_context_mgr.cpp
index 017141b2571..6cd9d6f469d 100644
--- a/be/src/runtime/external_scan_context_mgr.cpp
+++ b/be/src/runtime/external_scan_context_mgr.cpp
@@ -113,7 +113,7 @@ Status ExternalScanContextMgr::clear_scan_context(const std::string& context_id)
         }
     }
     if (context != nullptr) {
-        // cancel pipeline
+        // cancel pipeline fragment
         const auto& fragment_instance_id = context->fragment_instance_id;
         if (auto query_ctx = _exec_env->query_context_mgr()->get(context->query_id); query_ctx != nullptr) {
             if (auto fragment_ctx = query_ctx->fragment_mgr()->get(fragment_instance_id); fragment_ctx != nullptr) {
@@ -165,10 +165,16 @@ void ExternalScanContextMgr::gc_expired_context() {
         }
     }
     for (const auto& expired_context : expired_contexts) {
-        // must cancel the fragment instance, otherwise return thrift transport TTransportException
-        WARN_IF_ERROR(
-                _exec_env->fragment_mgr()->cancel(expired_context->fragment_instance_id),
-                strings::Substitute("Fail to cancel fragment $0", print_id(expired_context->fragment_instance_id)));
+        // must cancel the fragment instance (pipeline)
+        if (auto query_ctx = _exec_env->query_context_mgr()->get(expired_context->query_id); query_ctx != nullptr) {
+            if (auto fragment_ctx = query_ctx->fragment_mgr()->get(expired_context->fragment_instance_id);
+                fragment_ctx != nullptr) {
+                std::stringstream msg;
+                msg << "FragmentContext(id=" << print_id(expired_context->fragment_instance_id)
+                    << ") cancelled by gc_expired_context";
+                fragment_ctx->cancel(Status::Cancelled(msg.str()));
+            }
+        }
         WARN_IF_ERROR(_exec_env->result_queue_mgr()->cancel(expired_context->fragment_instance_id),
                       strings::Substitute("Fail to cancel fragment $0 in result queue mgr",
                                           print_id(expired_context->fragment_instance_id)));
diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp
index b684ab4f7f7..b11cd0634c8 100644
--- a/be/src/runtime/stream_load/stream_load_executor.cpp
+++ b/be/src/runtime/stream_load/stream_load_executor.cpp
@@ -82,75 +82,72 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) {
     LOG(INFO) << "begin to execute job. label=" << ctx->label << ", txn_id: " << ctx->txn_id
               << ", query_id=" << print_id(ctx->put_result.params.params.query_id);
     // Once this is added into FragmentMgr, the fragment will be counted during graceful exit.
-    auto st = _exec_env->fragment_mgr()->exec_plan_fragment(
-            ctx->put_result.params,
-            [ctx](PlanFragmentExecutor* executor) {
-                ctx->runtime_profile = executor->runtime_state()->runtime_profile_ptr();
-                ctx->query_mem_tracker = executor->runtime_state()->query_mem_tracker_ptr();
-                ctx->instance_mem_tracker = executor->runtime_state()->instance_mem_tracker_ptr();
-            },
-            [ctx](PlanFragmentExecutor* executor) {
-                ctx->commit_infos = std::move(executor->runtime_state()->tablet_commit_infos());
-                ctx->fail_infos = std::move(executor->runtime_state()->tablet_fail_infos());
-                Status status = executor->status();
-                if (status.ok()) {
-                    ctx->number_total_rows = executor->runtime_state()->num_rows_load_sink() +
-                                             executor->runtime_state()->num_rows_load_filtered() +
-                                             executor->runtime_state()->num_rows_load_unselected();
-                    ctx->number_loaded_rows = executor->runtime_state()->num_rows_load_sink();
-                    ctx->number_filtered_rows = executor->runtime_state()->num_rows_load_filtered();
-                    ctx->number_unselected_rows = executor->runtime_state()->num_rows_load_unselected();
-                    ctx->loaded_bytes = executor->runtime_state()->num_bytes_load_sink();
+    // Execute via pipeline FragmentExecutor and set promise on completion
+    pipeline::FragmentExecutor fragment_executor;
+    auto st = fragment_executor.prepare(_exec_env, ctx->put_result.params, ctx->put_result.params);
+    if (st.ok()) {
+        // Capture runtime_state for metrics after execution; finish is signaled via query pipeline reporter
+        auto exec_st = fragment_executor.execute(_exec_env);
+        // collect results and metrics after execution
+        RuntimeState* rs = _exec_env->query_context_mgr()
+                                   ->get(ctx->put_result.params.params.query_id)
+                                   ->fragment_mgr()
+                                   ->get(ctx->put_result.params.params.fragment_instance_id)
+                                   ->runtime_state();
+        ctx->commit_infos = std::move(rs->tablet_commit_infos());
+        ctx->fail_infos = std::move(rs->tablet_fail_infos());
+        Status status = exec_st;
+        if (status.ok()) {
+            ctx->number_total_rows = rs->num_rows_load_sink() + rs->num_rows_load_filtered() + rs->num_rows_load_unselected();
+            ctx->number_loaded_rows = rs->num_rows_load_sink();
+            ctx->number_filtered_rows = rs->num_rows_load_filtered();
+            ctx->number_unselected_rows = rs->num_rows_load_unselected();
+            ctx->loaded_bytes = rs->num_bytes_load_sink();
 
-                    int64_t num_selected_rows = ctx->number_total_rows - ctx->number_unselected_rows;
-                    if ((double)ctx->number_filtered_rows / num_selected_rows > ctx->max_filter_ratio) {
-                        // NOTE: Do not modify the error message here, for historical
-                        // reasons,
-                        // some users may rely on this error message.
-                        status = Status::InternalError("too many filtered rows");
-                    }
-
-                    if (status.ok()) {
-                        StarRocksMetrics::instance()->stream_receive_bytes_total.increment(ctx->total_receive_bytes);
-                        StarRocksMetrics::instance()->stream_load_rows_total.increment(ctx->number_loaded_rows);
-                    }
-                } else {
-                    LOG(WARNING) << "fragment execute failed"
-                                 << ", query_id=" << UniqueId(ctx->put_result.params.params.query_id)
-                                 << ", err_msg=" << status.message() << ", " << ctx->brief();
-                    // cancel body_sink, make sender known it
-                    if (ctx->body_sink != nullptr) {
-                        ctx->body_sink->cancel(status);
-                    }
-
-                    switch (ctx->load_src_type) {
-                    // reset the stream load ctx's kafka commit offset
-                    case TLoadSourceType::KAFKA:
-                        ctx->kafka_info->reset_offset();
-                        break;
-                    case TLoadSourceType::PULSAR:
-                        ctx->pulsar_info->clear_backlog();
-                        break;
-                    default:
-                        break;
-                    }
-                }
-                ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos;
-                ctx->promise.set_value(status);
-
-                if (!executor->runtime_state()->get_error_log_file_path().empty()) {
-                    ctx->error_url = to_load_error_http_path(executor->runtime_state()->get_error_log_file_path());
-                }
-
-                if (!executor->runtime_state()->get_rejected_record_file_path().empty()) {
-                    ctx->rejected_record_path = fmt::format("{}:{}", BackendOptions::get_localBackend().host,
-                                                            executor->runtime_state()->get_rejected_record_file_path());
-                }
-
-                if (ctx->unref()) {
-                    delete ctx;
-                }
-            });
+            int64_t num_selected_rows = ctx->number_total_rows - ctx->number_unselected_rows;
+            if (num_selected_rows > 0 && (double)ctx->number_filtered_rows / num_selected_rows > ctx->max_filter_ratio) {
+                status = Status::InternalError("too many filtered rows");
+            }
+            if (status.ok()) {
+                StarRocksMetrics::instance()->stream_receive_bytes_total.increment(ctx->total_receive_bytes);
+                StarRocksMetrics::instance()->stream_load_rows_total.increment(ctx->number_loaded_rows);
+            }
+        } else {
+            LOG(WARNING) << "fragment execute failed"
+                         << ", query_id=" << UniqueId(ctx->put_result.params.params.query_id)
+                         << ", err_msg=" << status.message() << ", " << ctx->brief();
+            if (ctx->body_sink != nullptr) {
+                ctx->body_sink->cancel(status);
+            }
+            switch (ctx->load_src_type) {
+            case TLoadSourceType::KAFKA:
+                ctx->kafka_info->reset_offset();
+                break;
+            case TLoadSourceType::PULSAR:
+                ctx->pulsar_info->clear_backlog();
+                break;
+            default:
+                break;
+            }
+        }
+        ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos;
+        if (!rs->get_error_log_file_path().empty()) {
+            ctx->error_url = to_load_error_http_path(rs->get_error_log_file_path());
+        }
+        if (!rs->get_rejected_record_file_path().empty()) {
+            ctx->rejected_record_path = fmt::format("{}:{}", BackendOptions::get_localBackend().host,
+                                                    rs->get_rejected_record_file_path());
+        }
+        ctx->promise.set_value(status);
+        if (ctx->unref()) {
+            delete ctx;
+        }
+        return Status::OK();
+    }
+    if (ctx->unref()) {
+        delete ctx;
+    }
+    return st;
     if (!st.ok()) {
         if (ctx->unref()) {
             delete ctx;
diff --git a/be/src/service/backend_base.cpp b/be/src/service/backend_base.cpp
index a65e82298be..c2b99367329 100644
--- a/be/src/service/backend_base.cpp
+++ b/be/src/service/backend_base.cpp
@@ -95,7 +95,13 @@ Status BackendServiceBase::start_plan_fragment_execution(const TExecPlanFragment
     if (!exec_params.fragment.__isset.output_sink) {
         return Status::InternalError("missing sink in plan fragment");
    }
-    return _exec_env->fragment_mgr()->exec_plan_fragment(exec_params);
+    // Always execute with pipeline engine
+    pipeline::FragmentExecutor fragment_executor;
+    auto status = fragment_executor.prepare(_exec_env, exec_params, exec_params);
+    if (status.ok()) {
+        return fragment_executor.execute(_exec_env);
+    }
+    return status.is_duplicate_rpc_invocation() ? Status::OK() : status;
 }
 
 void BackendServiceBase::cancel_plan_fragment(TCancelPlanFragmentResult& return_val,
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 0a4822a90d3..5186a37b6cb 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -470,23 +470,12 @@ Status PInternalServiceImplBase::_exec_plan_fragment(brpc::Controller* cntl,
                                              MAX_CHUNK_SIZE, batch_size));
     }
 
-    bool is_pipeline = t_request.__isset.is_pipeline && t_request.is_pipeline;
+    bool is_pipeline = t_request.__isset.is_pipeline ? t_request.is_pipeline : true;
     VLOG(1) << "exec plan fragment, fragment_instance_id=" << print_id(t_request.params.fragment_instance_id)
             << ", coord=" << t_request.coord << ", backend=" << t_request.backend_num
             << ", is_pipeline=" << is_pipeline << ", chunk_size=" << t_request.query_options.batch_size;
-    if (is_pipeline) {
-        return _exec_plan_fragment_by_pipeline(t_request, t_request);
-    } else {
-        bool has_schema_table_sink = t_request.__isset.fragment && t_request.fragment.__isset.output_sink &&
-                                     t_request.fragment.output_sink.type == TDataSinkType::SCHEMA_TABLE_SINK;
-        // SchemaTableSink is not supported on the Pipeline engine, we have to allow it to be executed on non-pipeline engine temporarily,
-        // this will be removed in the future.
-        if (has_schema_table_sink) {
-            return _exec_plan_fragment_by_non_pipeline(t_request);
-        }
-        return Status::InvalidArgument(
-                "non-pipeline engine is no longer supported since 3.2, please set enable_pipeline_engine=true.");
-    }
+    // Force pipeline execution path
+    return _exec_plan_fragment_by_pipeline(t_request, t_request);
 }
 
 template