Compare commits: main...cursor/rem

1 commit

| Author | SHA1 | Date |
|---|---|---|
| | 5fbba586ab | |
@@ -113,7 +113,7 @@ Status ExternalScanContextMgr::clear_scan_context(const std::string& context_id)
         }
     }
     if (context != nullptr) {
-        // cancel pipeline
+        // cancel pipeline fragment
         const auto& fragment_instance_id = context->fragment_instance_id;
         if (auto query_ctx = _exec_env->query_context_mgr()->get(context->query_id); query_ctx != nullptr) {
             if (auto fragment_ctx = query_ctx->fragment_mgr()->get(fragment_instance_id); fragment_ctx != nullptr) {
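The cancellation above chains two C++17 if-with-initializer lookups, so a query or fragment that has already been torn down becomes a silent no-op instead of a null dereference. A minimal standalone sketch of that lookup shape; the registry types here are hypothetical stand-ins, not the actual StarRocks classes:

```cpp
#include <iostream>
#include <memory>
#include <string>
#include <unordered_map>

// Hypothetical query-level registry that owns fragment-level entries.
struct FragmentCtx {
    void cancel(const std::string& reason) { std::cout << "cancelled: " << reason << "\n"; }
};

struct QueryCtx {
    std::unordered_map<std::string, std::shared_ptr<FragmentCtx>> fragments;
    std::shared_ptr<FragmentCtx> get_fragment(const std::string& id) {
        auto it = fragments.find(id);
        return it == fragments.end() ? nullptr : it->second;
    }
};

struct QueryCtxMgr {
    std::unordered_map<std::string, std::shared_ptr<QueryCtx>> queries;
    std::shared_ptr<QueryCtx> get(const std::string& id) {
        auto it = queries.find(id);
        return it == queries.end() ? nullptr : it->second;
    }
};

int main() {
    QueryCtxMgr mgr;
    auto q = std::make_shared<QueryCtx>();
    q->fragments["f1"] = std::make_shared<FragmentCtx>();
    mgr.queries["q1"] = q;

    // The if-with-initializer keeps each lookup's scope tight and skips
    // the body when either level of the registry no longer has the entry.
    if (auto query_ctx = mgr.get("q1"); query_ctx != nullptr) {
        if (auto fragment_ctx = query_ctx->get_fragment("f1"); fragment_ctx != nullptr) {
            fragment_ctx->cancel("clear_scan_context");
        }
    }
}
```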
@@ -165,10 +165,16 @@ void ExternalScanContextMgr::gc_expired_context() {
         }
     }
     for (const auto& expired_context : expired_contexts) {
-        // must cancel the fragment instance, otherwise return thrift transport TTransportException
-        WARN_IF_ERROR(
-                _exec_env->fragment_mgr()->cancel(expired_context->fragment_instance_id),
-                strings::Substitute("Fail to cancel fragment $0", print_id(expired_context->fragment_instance_id)));
+        // must cancel the fragment instance (pipeline)
+        if (auto query_ctx = _exec_env->query_context_mgr()->get(expired_context->query_id); query_ctx != nullptr) {
+            if (auto fragment_ctx = query_ctx->fragment_mgr()->get(expired_context->fragment_instance_id);
+                fragment_ctx != nullptr) {
+                std::stringstream msg;
+                msg << "FragmentContext(id=" << print_id(expired_context->fragment_instance_id)
+                    << ") cancelled by gc_expired_context";
+                fragment_ctx->cancel(Status::Cancelled(msg.str()));
+            }
+        }
         WARN_IF_ERROR(_exec_env->result_queue_mgr()->cancel(expired_context->fragment_instance_id),
                       strings::Substitute("Fail to cancel fragment $0 in result queue mgr",
                                           print_id(expired_context->fragment_instance_id)));
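gc_expired_context iterates a pre-collected `expired_contexts` list, which suggests a collect-then-act sweep: expired entries are gathered under the registry lock, then cancelled outside it so a slow cancellation cannot block registration. A hedged sketch of that pattern under assumed types; `ScanContext`, `ScanContextMgr`, and the TTL handling below are illustrative, not the StarRocks implementation:

```cpp
#include <chrono>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

using Clock = std::chrono::steady_clock;

struct ScanContext {
    std::string id;
    Clock::time_point last_access;
    void cancel(const std::string& reason) { (void)reason; /* tear down the fragment */ }
};

class ScanContextMgr {
public:
    void gc_expired(Clock::duration ttl) {
        std::vector<std::shared_ptr<ScanContext>> expired;
        {
            // Phase 1: collect and unlink expired entries under the lock.
            std::lock_guard<std::mutex> guard(_lock);
            for (auto it = _contexts.begin(); it != _contexts.end();) {
                if (Clock::now() - it->second->last_access > ttl) {
                    expired.push_back(it->second);
                    it = _contexts.erase(it);
                } else {
                    ++it;
                }
            }
        }
        // Phase 2: cancel outside the lock, one context at a time.
        for (const auto& ctx : expired) {
            ctx->cancel("cancelled by gc_expired_context");
        }
    }

private:
    std::mutex _lock;
    std::unordered_map<std::string, std::shared_ptr<ScanContext>> _contexts;
};

int main() {
    ScanContextMgr mgr;
    mgr.gc_expired(std::chrono::minutes(30));
}
```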
@@ -82,75 +82,72 @@ Status StreamLoadExecutor::execute_plan_fragment(StreamLoadContext* ctx) {
     LOG(INFO) << "begin to execute job. label=" << ctx->label << ", txn_id: " << ctx->txn_id
               << ", query_id=" << print_id(ctx->put_result.params.params.query_id);
-    // Once this is added into FragmentMgr, the fragment will be counted during graceful exit.
-    auto st = _exec_env->fragment_mgr()->exec_plan_fragment(
-            ctx->put_result.params,
-            [ctx](PlanFragmentExecutor* executor) {
-                ctx->runtime_profile = executor->runtime_state()->runtime_profile_ptr();
-                ctx->query_mem_tracker = executor->runtime_state()->query_mem_tracker_ptr();
-                ctx->instance_mem_tracker = executor->runtime_state()->instance_mem_tracker_ptr();
-            },
-            [ctx](PlanFragmentExecutor* executor) {
-                ctx->commit_infos = std::move(executor->runtime_state()->tablet_commit_infos());
-                ctx->fail_infos = std::move(executor->runtime_state()->tablet_fail_infos());
-                Status status = executor->status();
-                if (status.ok()) {
-                    ctx->number_total_rows = executor->runtime_state()->num_rows_load_sink() +
-                                             executor->runtime_state()->num_rows_load_filtered() +
-                                             executor->runtime_state()->num_rows_load_unselected();
-                    ctx->number_loaded_rows = executor->runtime_state()->num_rows_load_sink();
-                    ctx->number_filtered_rows = executor->runtime_state()->num_rows_load_filtered();
-                    ctx->number_unselected_rows = executor->runtime_state()->num_rows_load_unselected();
-                    ctx->loaded_bytes = executor->runtime_state()->num_bytes_load_sink();
+    // Execute via pipeline FragmentExecutor and set promise on completion
+    pipeline::FragmentExecutor fragment_executor;
+    auto st = fragment_executor.prepare(_exec_env, ctx->put_result.params, ctx->put_result.params);
+    if (st.ok()) {
+        // Capture runtime_state for metrics after execution; finish is signaled via query pipeline reporter
+        auto exec_st = fragment_executor.execute(_exec_env);
+        // collect results and metrics after execution
+        RuntimeState* rs = _exec_env->query_context_mgr()
+                                   ->get(ctx->put_result.params.params.query_id)
+                                   ->fragment_mgr()
+                                   ->get(ctx->put_result.params.params.fragment_instance_id)
+                                   ->runtime_state();
+        ctx->commit_infos = std::move(rs->tablet_commit_infos());
+        ctx->fail_infos = std::move(rs->tablet_fail_infos());
+        Status status = exec_st;
+        if (status.ok()) {
+            ctx->number_total_rows = rs->num_rows_load_sink() + rs->num_rows_load_filtered() + rs->num_rows_load_unselected();
+            ctx->number_loaded_rows = rs->num_rows_load_sink();
+            ctx->number_filtered_rows = rs->num_rows_load_filtered();
+            ctx->number_unselected_rows = rs->num_rows_load_unselected();
+            ctx->loaded_bytes = rs->num_bytes_load_sink();
-
-                    int64_t num_selected_rows = ctx->number_total_rows - ctx->number_unselected_rows;
-                    if ((double)ctx->number_filtered_rows / num_selected_rows > ctx->max_filter_ratio) {
-                        // NOTE: Do not modify the error message here, for historical reasons,
-                        // some users may rely on this error message.
-                        status = Status::InternalError("too many filtered rows");
-                    }
-
-                    if (status.ok()) {
-                        StarRocksMetrics::instance()->stream_receive_bytes_total.increment(ctx->total_receive_bytes);
-                        StarRocksMetrics::instance()->stream_load_rows_total.increment(ctx->number_loaded_rows);
-                    }
-                } else {
-                    LOG(WARNING) << "fragment execute failed"
-                                 << ", query_id=" << UniqueId(ctx->put_result.params.params.query_id)
-                                 << ", err_msg=" << status.message() << ", " << ctx->brief();
-                    // cancel body_sink, make sender known it
-                    if (ctx->body_sink != nullptr) {
-                        ctx->body_sink->cancel(status);
-                    }
-
-                    switch (ctx->load_src_type) {
-                    // reset the stream load ctx's kafka commit offset
-                    case TLoadSourceType::KAFKA:
-                        ctx->kafka_info->reset_offset();
-                        break;
-                    case TLoadSourceType::PULSAR:
-                        ctx->pulsar_info->clear_backlog();
-                        break;
-                    default:
-                        break;
-                    }
-                }
-                ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos;
-                ctx->promise.set_value(status);
-
-                if (!executor->runtime_state()->get_error_log_file_path().empty()) {
-                    ctx->error_url = to_load_error_http_path(executor->runtime_state()->get_error_log_file_path());
-                }
-
-                if (!executor->runtime_state()->get_rejected_record_file_path().empty()) {
-                    ctx->rejected_record_path = fmt::format("{}:{}", BackendOptions::get_localBackend().host,
-                                                            executor->runtime_state()->get_rejected_record_file_path());
-                }
-
-                if (ctx->unref()) {
-                    delete ctx;
-                }
-            });
+
+            int64_t num_selected_rows = ctx->number_total_rows - ctx->number_unselected_rows;
+            if (num_selected_rows > 0 && (double)ctx->number_filtered_rows / num_selected_rows > ctx->max_filter_ratio) {
+                status = Status::InternalError("too many filtered rows");
+            }
+            if (status.ok()) {
+                StarRocksMetrics::instance()->stream_receive_bytes_total.increment(ctx->total_receive_bytes);
+                StarRocksMetrics::instance()->stream_load_rows_total.increment(ctx->number_loaded_rows);
+            }
+        } else {
+            LOG(WARNING) << "fragment execute failed"
+                         << ", query_id=" << UniqueId(ctx->put_result.params.params.query_id)
+                         << ", err_msg=" << status.message() << ", " << ctx->brief();
+            if (ctx->body_sink != nullptr) {
+                ctx->body_sink->cancel(status);
+            }
+            switch (ctx->load_src_type) {
+            case TLoadSourceType::KAFKA:
+                ctx->kafka_info->reset_offset();
+                break;
+            case TLoadSourceType::PULSAR:
+                ctx->pulsar_info->clear_backlog();
+                break;
+            default:
+                break;
+            }
+        }
+        ctx->write_data_cost_nanos = MonotonicNanos() - ctx->start_write_data_nanos;
+        if (!rs->get_error_log_file_path().empty()) {
+            ctx->error_url = to_load_error_http_path(rs->get_error_log_file_path());
+        }
+        if (!rs->get_rejected_record_file_path().empty()) {
+            ctx->rejected_record_path = fmt::format("{}:{}", BackendOptions::get_localBackend().host,
+                                                    rs->get_rejected_record_file_path());
+        }
+        ctx->promise.set_value(status);
+        if (ctx->unref()) {
+            delete ctx;
+        }
+        return Status::OK();
+    }
+    if (ctx->unref()) {
+        delete ctx;
+    }
+    return st;
-    if (!st.ok()) {
-        if (ctx->unref()) {
-            delete ctx;
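The stream-load rewrite replaces the callback-driven PlanFragmentExecutor with a synchronous prepare/execute pair whose result is published through the context's promise. A compact sketch of that control flow; `Executor` and `LoadContext` are hypothetical stand-ins for pipeline::FragmentExecutor and StreamLoadContext, and only the shape of the flow is meant to match:

```cpp
#include <cstdint>
#include <future>
#include <iostream>
#include <string>

// Stand-in status type so the sketch is self-contained.
struct Status {
    bool ok;
    std::string msg;
};

struct Executor {
    Status prepare() { return {true, ""}; }
    Status execute() { return {true, ""}; }
};

struct LoadContext {
    int64_t loaded_rows = 0;
    std::promise<Status> promise;  // the waiter holds the matching future
};

Status run_load(LoadContext* ctx) {
    Executor executor;
    Status st = executor.prepare();
    if (st.ok) {
        Status exec_st = executor.execute();  // runs to completion, no callback
        ctx->loaded_rows = 42;                // collect stats from runtime state
        ctx->promise.set_value(exec_st);      // wake whoever waits on the future
        return {true, ""};
    }
    return st;  // prepare failed: caller cleans up and propagates st
}

int main() {
    LoadContext ctx;
    std::future<Status> done = ctx.promise.get_future();
    run_load(&ctx);
    std::cout << "load finished ok=" << done.get().ok
              << ", rows=" << ctx.loaded_rows << "\n";
}
```

Note that the rewritten filter-ratio check also guards the division with `num_selected_rows > 0`, which the old callback did not.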
@@ -95,7 +95,13 @@ Status BackendServiceBase::start_plan_fragment_execution(const TExecPlanFragment
     if (!exec_params.fragment.__isset.output_sink) {
         return Status::InternalError("missing sink in plan fragment");
     }
-    return _exec_env->fragment_mgr()->exec_plan_fragment(exec_params);
+    // Always execute with pipeline engine
+    pipeline::FragmentExecutor fragment_executor;
+    auto status = fragment_executor.prepare(_exec_env, exec_params, exec_params);
+    if (status.ok()) {
+        return fragment_executor.execute(_exec_env);
+    }
+    return status.is_duplicate_rpc_invocation() ? Status::OK() : status;
 }
 
 void BackendServiceBase::cancel_plan_fragment(TCancelPlanFragmentResult& return_val,
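start_plan_fragment_execution maps a duplicate-RPC error to Status::OK(), which reads as keeping the Thrift entry point idempotent under retries: a retried request for a fragment that is already running should not fail the load. A sketch of why that collapse is safe, using an illustrative registry rather than the StarRocks FragmentMgr:

```cpp
#include <mutex>
#include <set>
#include <string>

enum class Code { OK, DUPLICATE };

class FragmentRegistry {
public:
    // Registers an instance; a second registration reports DUPLICATE.
    Code register_fragment(const std::string& instance_id) {
        std::lock_guard<std::mutex> guard(_lock);
        return _running.insert(instance_id).second ? Code::OK : Code::DUPLICATE;
    }

    // RPC entry point: a duplicate registration is not an error for the
    // caller, because the fragment is in fact already executing.
    Code start(const std::string& instance_id) {
        Code code = register_fragment(instance_id);
        return code == Code::DUPLICATE ? Code::OK : code;
    }

private:
    std::mutex _lock;
    std::set<std::string> _running;
};

int main() {
    FragmentRegistry registry;
    registry.start("fragment-1");  // first attempt: OK
    registry.start("fragment-1");  // retried RPC: still OK, no double start
}
```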
@@ -470,23 +470,12 @@ Status PInternalServiceImplBase<T>::_exec_plan_fragment(brpc::Controller* cntl,
                                                MAX_CHUNK_SIZE, batch_size));
     }
 
-    bool is_pipeline = t_request.__isset.is_pipeline && t_request.is_pipeline;
+    bool is_pipeline = t_request.__isset.is_pipeline ? t_request.is_pipeline : true;
     VLOG(1) << "exec plan fragment, fragment_instance_id=" << print_id(t_request.params.fragment_instance_id)
             << ", coord=" << t_request.coord << ", backend=" << t_request.backend_num << ", is_pipeline=" << is_pipeline
             << ", chunk_size=" << t_request.query_options.batch_size;
-    if (is_pipeline) {
-        return _exec_plan_fragment_by_pipeline(t_request, t_request);
-    } else {
-        bool has_schema_table_sink = t_request.__isset.fragment && t_request.fragment.__isset.output_sink &&
-                                     t_request.fragment.output_sink.type == TDataSinkType::SCHEMA_TABLE_SINK;
-        // SchemaTableSink is not supported on the Pipeline engine, we have to allow it to be executed on non-pipeline engine temporarily,
-        // this will be removed in the future.
-        if (has_schema_table_sink) {
-            return _exec_plan_fragment_by_non_pipeline(t_request);
-        }
-        return Status::InvalidArgument(
-                "non-pipeline engine is no longer supported since 3.2, please set enable_pipeline_engine=true.");
-    }
+    // Force pipeline execution path
+    return _exec_plan_fragment_by_pipeline(t_request, t_request);
 }
 
 template <typename T>
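The `is_pipeline` change flips the default for requests that never set the flag: absence now means pipeline. A tiny sketch of the before/after semantics against a stand-in for the Thrift-generated request struct (the real `TExecPlanFragmentParams` carries an `__isset` marker like this):

```cpp
#include <iostream>

// Hypothetical stand-in for a Thrift-generated struct with an optional field.
struct Request {
    struct { bool is_pipeline = false; } __isset;
    bool is_pipeline = false;
};

int main() {
    Request legacy_coordinator;  // an old FE that never sets the field

    // Old behavior: unset field => false, i.e. the non-pipeline path.
    bool old_style = legacy_coordinator.__isset.is_pipeline && legacy_coordinator.is_pipeline;

    // New behavior: unset field => default to the pipeline engine.
    bool new_style = legacy_coordinator.__isset.is_pipeline ? legacy_coordinator.is_pipeline : true;

    std::cout << "old=" << old_style << " new=" << new_style << "\n";  // old=0 new=1
}
```

An explicitly set flag still wins in both versions; only unset requests are routed differently, which matches the removal of the non-pipeline fallback in the hunk above.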