[SR-4075] Refactor report_exec_status in pipeline engine (#343)
This commit is contained in:
parent
a8cb05c616
commit
b2a528a861
|
|
@ -156,28 +156,8 @@ ExecStateReporter::ExecStateReporter() {
|
|||
}
|
||||
}
|
||||
|
||||
void ExecStateReporter::submit(FragmentContext* fragment_ctx, const Status& status, bool done, bool clean) {
|
||||
auto report_func = [=]() {
|
||||
auto params = create_report_exec_status_params(fragment_ctx, status, done);
|
||||
auto status = report_exec_status(params, fragment_ctx->runtime_state()->exec_env(), fragment_ctx->fe_addr());
|
||||
if (!status.ok()) {
|
||||
LOG(WARNING) << "[Driver] Fail to report exec state: fragment_instance_id="
|
||||
<< fragment_ctx->fragment_instance_id();
|
||||
} else {
|
||||
LOG(INFO) << "[Driver] Succeed to report exec state: fragment_instance_id="
|
||||
<< fragment_ctx->fragment_instance_id();
|
||||
}
|
||||
if (clean) {
|
||||
auto query_id = fragment_ctx->query_id();
|
||||
auto&& query_ctx = QueryContextManager::instance()->get(query_id);
|
||||
DCHECK(query_ctx);
|
||||
query_ctx->fragment_mgr()->unregister(fragment_ctx->fragment_instance_id());
|
||||
if (query_ctx->count_down_fragment()) {
|
||||
QueryContextManager::instance()->unregister(query_id);
|
||||
}
|
||||
}
|
||||
};
|
||||
_thread_pool->submit_func(report_func);
|
||||
void ExecStateReporter::submit(std::function<void()>&& report_task) {
|
||||
_thread_pool->submit_func(std::move(report_task));
|
||||
}
|
||||
|
||||
} // namespace pipeline
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@ public:
|
|||
bool done);
|
||||
static Status report_exec_status(const TReportExecStatusParams& params, ExecEnv* exec_env, TNetworkAddress fe_addr);
|
||||
ExecStateReporter();
|
||||
void submit(FragmentContext* fragment_ctx, const Status& status, bool done, bool clean);
|
||||
void submit(std::function<void()>&& report_task);
|
||||
|
||||
private:
|
||||
std::unique_ptr<ThreadPool> _thread_pool;
|
||||
|
|
|
|||
|
|
@ -91,6 +91,8 @@ private:
|
|||
TUniqueId _fragment_instance_id;
|
||||
TNetworkAddress _fe_addr;
|
||||
|
||||
// never adjust the order of _mem_tracker, _runtime_state, _plan, _pipelines and _drivers, since
|
||||
// _plan depends on _runtime_state and _drivers depends on _mem_tracker and _runtime_state.
|
||||
std::unique_ptr<MemTracker> _mem_tracker = nullptr;
|
||||
std::shared_ptr<RuntimeState> _runtime_state = nullptr;
|
||||
ExecNode* _plan = nullptr; // lives in _runtime_state->obj_pool()
|
||||
|
|
|
|||
|
|
@ -161,16 +161,16 @@ void PipelineDriver::finalize(RuntimeState* runtime_state, DriverState state) {
|
|||
_fragment_ctx->finish();
|
||||
auto status = _fragment_ctx->final_status();
|
||||
_fragment_ctx->runtime_state()->exec_env()->driver_dispatcher()->report_exec_state(_fragment_ctx, status,
|
||||
true, false);
|
||||
true);
|
||||
}
|
||||
}
|
||||
// last finished driver notify FE the fragment's completion again and
|
||||
// unregister the FragmentContext.
|
||||
if (_fragment_ctx->count_down_drivers()) {
|
||||
auto status = _fragment_ctx->final_status();
|
||||
auto fragment_id = _fragment_ctx->fragment_instance_id();
|
||||
VLOG_ROW << "[Driver] Last driver finished: final_status=" << status.to_string();
|
||||
_fragment_ctx->runtime_state()->exec_env()->driver_dispatcher()->report_exec_state(_fragment_ctx, status, true,
|
||||
true);
|
||||
_query_ctx->count_down_fragments();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -29,6 +29,16 @@ void GlobalDriverDispatcher::change_num_threads(int32_t num_threads) {
|
|||
}
|
||||
}
|
||||
|
||||
void GlobalDriverDispatcher::finalize_driver(DriverPtr& driver, RuntimeState* runtime_state, DriverState state) {
|
||||
DCHECK(driver);
|
||||
driver->finalize(runtime_state, state);
|
||||
if (driver->query_ctx()->is_finished()) {
|
||||
auto query_id = driver->query_ctx()->query_id();
|
||||
driver.reset();
|
||||
QueryContextManager::instance()->remove(query_id);
|
||||
}
|
||||
}
|
||||
|
||||
void GlobalDriverDispatcher::run() {
|
||||
while (true) {
|
||||
if (_num_threads_setter.should_shrink()) {
|
||||
|
|
@ -48,13 +58,13 @@ void GlobalDriverDispatcher::run() {
|
|||
driver->set_driver_state(DriverState::PENDING_FINISH);
|
||||
_blocked_driver_poller->add_blocked_driver(driver);
|
||||
} else {
|
||||
driver->finalize(runtime_state, DriverState::CANCELED);
|
||||
finalize_driver(driver, runtime_state, DriverState::CANCELED);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
// a blocked driver is canceled because of fragment cancellation or query expiration.
|
||||
if (driver->is_finished()) {
|
||||
driver->finalize(runtime_state, driver->driver_state());
|
||||
finalize_driver(driver, runtime_state, driver->driver_state());
|
||||
continue;
|
||||
}
|
||||
// query context has ready drivers to run, so extend its lifetime.
|
||||
|
|
@ -69,7 +79,7 @@ void GlobalDriverDispatcher::run() {
|
|||
driver->set_driver_state(DriverState::PENDING_FINISH);
|
||||
_blocked_driver_poller->add_blocked_driver(driver);
|
||||
} else {
|
||||
driver->finalize(runtime_state, DriverState::INTERNAL_ERROR);
|
||||
finalize_driver(driver, runtime_state, DriverState::INTERNAL_ERROR);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
|
@ -88,7 +98,7 @@ void GlobalDriverDispatcher::run() {
|
|||
VLOG_ROW << strings::Substitute("[Driver] Finished, source=$0, state=$1, status=$2",
|
||||
driver->source_operator()->get_name(), ds_to_string(driver_state),
|
||||
fragment_ctx->final_status().to_string());
|
||||
driver->finalize(runtime_state, driver_state);
|
||||
finalize_driver(driver, runtime_state, driver_state);
|
||||
break;
|
||||
}
|
||||
case INPUT_EMPTY:
|
||||
|
|
@ -109,9 +119,22 @@ void GlobalDriverDispatcher::dispatch(DriverPtr driver) {
|
|||
this->_driver_queue->put_back(driver);
|
||||
}
|
||||
|
||||
void GlobalDriverDispatcher::report_exec_state(FragmentContext* fragment_ctx, const Status& status, bool done,
|
||||
bool clean) {
|
||||
this->_exec_state_reporter->submit(fragment_ctx, status, done, clean);
|
||||
void GlobalDriverDispatcher::report_exec_state(FragmentContext* fragment_ctx, const Status& status, bool done) {
|
||||
auto params = ExecStateReporter::create_report_exec_status_params(fragment_ctx, status, done);
|
||||
auto fe_addr = fragment_ctx->fe_addr();
|
||||
auto exec_env = fragment_ctx->runtime_state()->exec_env();
|
||||
auto fragment_id = fragment_ctx->fragment_instance_id();
|
||||
|
||||
auto report_task = [=]() {
|
||||
auto status = ExecStateReporter::report_exec_status(params, exec_env, fe_addr);
|
||||
if (!status.ok()) {
|
||||
LOG(WARNING) << "[Driver] Fail to report exec state: fragment_instance_id=" << print_id(fragment_id);
|
||||
} else {
|
||||
LOG(INFO) << "[Driver] Succeed to report exec state: fragment_instance_id=" << print_id(fragment_id);
|
||||
}
|
||||
};
|
||||
|
||||
this->_exec_state_reporter->submit(std::move(report_task));
|
||||
}
|
||||
} // namespace pipeline
|
||||
} // namespace starrocks
|
||||
|
|
|
|||
|
|
@ -33,13 +33,8 @@ public:
|
|||
// just notify FE timely the completeness of fragment via invocation of report_exec_state, but
|
||||
// the FragmentContext is not unregistered until all the drivers has finished, because some
|
||||
// non-root drivers maybe has pending io task executed in io threads asynchronously has reference
|
||||
// to objects owned by FragmentContext. so here:
|
||||
// 1. for the first time report_exec_state, clean is false, means not unregister FragmentContext
|
||||
// when all root drivers has finished.
|
||||
//
|
||||
// 2. for the second time report_exec_state, clean is true means that all the drivers has finished,
|
||||
// so now FragmentContext can be unregistered safely.
|
||||
virtual void report_exec_state(FragmentContext* fragment_ctx, const Status& status, bool done, bool clean) = 0;
|
||||
// to objects owned by FragmentContext.
|
||||
virtual void report_exec_state(FragmentContext* fragment_ctx, const Status& status, bool done) = 0;
|
||||
};
|
||||
|
||||
class GlobalDriverDispatcher final : public FactoryMethod<DriverDispatcher, GlobalDriverDispatcher> {
|
||||
|
|
@ -49,10 +44,11 @@ public:
|
|||
void initialize(int32_t num_threads) override;
|
||||
void change_num_threads(int32_t num_threads) override;
|
||||
void dispatch(DriverPtr driver) override;
|
||||
void report_exec_state(FragmentContext* fragment_ctx, const Status& status, bool done, bool clean) override;
|
||||
void report_exec_state(FragmentContext* fragment_ctx, const Status& status, bool done) override;
|
||||
|
||||
private:
|
||||
void run();
|
||||
void finalize_driver(DriverPtr& driver, RuntimeState* runtime_state, DriverState state);
|
||||
|
||||
private:
|
||||
LimitSetter _num_threads_setter;
|
||||
|
|
|
|||
|
|
@ -26,8 +26,9 @@ QueryContext* QueryContextManager::get_or_register(const TUniqueId& query_id) {
|
|||
return iter->second.get();
|
||||
}
|
||||
|
||||
auto&& ctx = std::make_unique<QueryContext>();
|
||||
auto&& ctx = std::make_shared<QueryContext>();
|
||||
auto* ctx_raw_ptr = ctx.get();
|
||||
ctx_raw_ptr->set_query_id(query_id);
|
||||
ctx_raw_ptr->increment_num_fragments();
|
||||
_contexts.emplace(query_id, std::move(ctx));
|
||||
return ctx_raw_ptr;
|
||||
|
|
@ -43,9 +44,16 @@ QueryContextPtr QueryContextManager::get(const TUniqueId& query_id) {
|
|||
}
|
||||
}
|
||||
|
||||
void QueryContextManager::unregister(const TUniqueId& query_id) {
|
||||
QueryContextPtr QueryContextManager::remove(const TUniqueId& query_id) {
|
||||
std::lock_guard lock(_lock);
|
||||
_contexts.erase(query_id);
|
||||
auto it = _contexts.find(query_id);
|
||||
if (it != _contexts.end()) {
|
||||
auto ctx = std::move(it->second);
|
||||
_contexts.erase(it);
|
||||
return ctx;
|
||||
} else {
|
||||
return nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace pipeline
|
||||
|
|
|
|||
|
|
@ -27,7 +27,9 @@ using std::chrono::duration_cast;
|
|||
class QueryContext {
|
||||
public:
|
||||
QueryContext();
|
||||
RuntimeState* get_runtime_state() { return _runtime_state.get(); }
|
||||
void set_query_id(const TUniqueId& query_id) { _query_id = query_id; }
|
||||
TUniqueId query_id() { return _query_id; }
|
||||
RuntimeState* runtime_state() { return _runtime_state.get(); }
|
||||
void set_total_fragments(size_t total_fragments) { _total_fragments = total_fragments; }
|
||||
|
||||
void increment_num_fragments() {
|
||||
|
|
@ -35,7 +37,9 @@ public:
|
|||
_num_active_fragments.fetch_add(1);
|
||||
}
|
||||
|
||||
bool count_down_fragment() { return _num_active_fragments.fetch_sub(1) == 1; }
|
||||
bool count_down_fragments() { return _num_active_fragments.fetch_sub(1) == 1; }
|
||||
|
||||
bool is_finished() { return _num_active_fragments.load() == 0; }
|
||||
|
||||
void set_expire_seconds(int expire_seconds) { _expire_seconds = seconds(expire_seconds); }
|
||||
|
||||
|
|
@ -74,7 +78,7 @@ class QueryContextManager {
|
|||
public:
|
||||
QueryContext* get_or_register(const TUniqueId& query_id);
|
||||
QueryContextPtr get(const TUniqueId& query_id);
|
||||
void unregister(const TUniqueId& query_id);
|
||||
QueryContextPtr remove(const TUniqueId& query_id);
|
||||
|
||||
private:
|
||||
//TODO(by satanson)
|
||||
|
|
@ -82,5 +86,6 @@ private:
|
|||
std::mutex _lock;
|
||||
std::unordered_map<TUniqueId, QueryContextPtr> _contexts;
|
||||
};
|
||||
|
||||
} // namespace pipeline
|
||||
} // namespace starrocks
|
||||
Loading…
Reference in New Issue