Remove unused operator's mem_tracker (#2878)

This commit is contained in:
liuyehcf 2022-01-18 12:51:03 +08:00 committed by GitHub
parent 8a93143929
commit b42dc63da5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 31 additions and 26 deletions

1
.gitignore vendored
View File

@ -36,6 +36,7 @@ fs_brokers/apache_hdfs_broker/src/main/thrift/
dependency-reduced-pom.xml
tags
.tags
.cache
*.vim
compile_commands.json

View File

@ -8,7 +8,7 @@ namespace starrocks::pipeline {
Status AggregateBlockingSinkOperator::prepare(RuntimeState* state) {
RETURN_IF_ERROR(Operator::prepare(state));
RETURN_IF_ERROR(_aggregator->prepare(state, state->obj_pool(), get_runtime_profile(), _mem_tracker.get()));
RETURN_IF_ERROR(_aggregator->prepare(state, state->obj_pool(), get_runtime_profile(), _mem_tracker));
return _aggregator->open(state);
}

View File

@ -8,7 +8,7 @@ namespace starrocks::pipeline {
Status AggregateDistinctBlockingSinkOperator::prepare(RuntimeState* state) {
RETURN_IF_ERROR(Operator::prepare(state));
RETURN_IF_ERROR(_aggregator->prepare(state, state->obj_pool(), get_runtime_profile(), _mem_tracker.get()));
RETURN_IF_ERROR(_aggregator->prepare(state, state->obj_pool(), get_runtime_profile(), _mem_tracker));
return _aggregator->open(state);
}

View File

@ -8,7 +8,7 @@ namespace starrocks::pipeline {
Status AggregateDistinctStreamingSinkOperator::prepare(RuntimeState* state) {
RETURN_IF_ERROR(Operator::prepare(state));
RETURN_IF_ERROR(_aggregator->prepare(state, state->obj_pool(), get_runtime_profile(), _mem_tracker.get()));
RETURN_IF_ERROR(_aggregator->prepare(state, state->obj_pool(), get_runtime_profile(), _mem_tracker));
return _aggregator->open(state);
}
@ -145,4 +145,4 @@ Status AggregateDistinctStreamingSinkOperator::_push_chunk_by_auto(const size_t
return Status::OK();
}
} // namespace starrocks::pipeline
} // namespace starrocks::pipeline

View File

@ -8,7 +8,7 @@ namespace starrocks::pipeline {
Status AggregateStreamingSinkOperator::prepare(RuntimeState* state) {
RETURN_IF_ERROR(Operator::prepare(state));
RETURN_IF_ERROR(_aggregator->prepare(state, state->obj_pool(), get_runtime_profile(), _mem_tracker.get()));
RETURN_IF_ERROR(_aggregator->prepare(state, state->obj_pool(), get_runtime_profile(), _mem_tracker));
return _aggregator->open(state);
}

View File

@ -107,8 +107,23 @@ Status FragmentExecutor::prepare(ExecEnv* exec_env, const TExecPlanFragmentParam
LOG(INFO) << "Prepare(): query_id=" << print_id(query_id)
<< " fragment_instance_id=" << print_id(params.fragment_instance_id) << " backend_num=" << backend_num;
_fragment_ctx->set_runtime_state(
std::make_unique<RuntimeState>(query_id, fragment_instance_id, query_options, query_globals, exec_env));
int32_t degree_of_parallelism = 1;
if (request.__isset.pipeline_dop && request.pipeline_dop > 0) {
degree_of_parallelism = request.pipeline_dop;
} else {
// default dop is a half of the number of hardware threads.
degree_of_parallelism = std::max<int32_t>(1, std::thread::hardware_concurrency() / 2);
}
if (query_options.__isset.mem_limit) {
auto copy_query_options = query_options;
copy_query_options.mem_limit *= degree_of_parallelism;
_fragment_ctx->set_runtime_state(std::make_unique<RuntimeState>(query_id, fragment_instance_id,
copy_query_options, query_globals, exec_env));
} else {
_fragment_ctx->set_runtime_state(
std::make_unique<RuntimeState>(query_id, fragment_instance_id, query_options, query_globals, exec_env));
}
auto* runtime_state = _fragment_ctx->runtime_state();
runtime_state->init_mem_trackers(query_id);
runtime_state->set_be_number(backend_num);
@ -148,14 +163,6 @@ Status FragmentExecutor::prepare(ExecEnv* exec_env, const TExecPlanFragmentParam
static_cast<ExchangeNode*>(exch_node)->set_num_senders(num_senders);
}
int32_t degree_of_parallelism = 1;
if (request.__isset.pipeline_dop && request.pipeline_dop > 0) {
degree_of_parallelism = request.pipeline_dop;
} else {
// default dop is a half of the number of hardware threads.
degree_of_parallelism = std::max<int32_t>(1, std::thread::hardware_concurrency() / 2);
}
// set scan ranges
std::vector<ExecNode*> scan_nodes;
std::vector<TScanRangeParams> no_scan_ranges;
@ -166,6 +173,7 @@ Status FragmentExecutor::prepare(ExecEnv* exec_env, const TExecPlanFragmentParam
ScanNode* scan_node = down_cast<ScanNode*>(i);
const std::vector<TScanRangeParams>& scan_ranges =
FindWithDefault(params.per_node_scan_ranges, scan_node->id(), no_scan_ranges);
DCHECK_GT(morsel_queues.count(scan_node->id()), 0);
Morsels morsels = convert_scan_range_to_morsel(scan_ranges, scan_node->id());
morsel_queues.emplace(scan_node->id(), std::make_unique<MorselQueue>(std::move(morsels)));
}

View File

@ -28,10 +28,10 @@ Operator::Operator(OperatorFactory* factory, int32_t id, const std::string& name
}
_runtime_profile = std::make_shared<RuntimeProfile>(profile_name);
_runtime_profile->set_metadata(_id);
_mem_tracker = std::make_unique<MemTracker>(_runtime_profile.get(), -1, _runtime_profile->name(), nullptr);
}
Status Operator::prepare(RuntimeState* state) {
_mem_tracker = state->instance_mem_tracker();
_total_timer = ADD_TIMER(_runtime_profile, "OperatorTotalTime");
_push_timer = ADD_TIMER(_runtime_profile, "PushTotalTime");
_pull_timer = ADD_TIMER(_runtime_profile, "PullTotalTime");

View File

@ -141,7 +141,7 @@ protected:
// Which plan node this operator belongs to
const int32_t _plan_node_id;
std::shared_ptr<RuntimeProfile> _runtime_profile;
std::unique_ptr<MemTracker> _mem_tracker;
MemTracker* _mem_tracker = nullptr;
bool _conjuncts_and_in_filters_is_cached = false;
std::vector<ExprContext*> _cached_conjuncts_and_in_filters;

View File

@ -100,7 +100,7 @@ StatusOr<DriverState> PipelineDriver::process(RuntimeState* runtime_state) {
size_t num_chunk_moved = 0;
bool should_yield = false;
size_t num_operators = _operators.size();
size_t _new_first_unfinished = _first_unfinished;
size_t new_first_unfinished = _first_unfinished;
for (size_t i = _first_unfinished; i < num_operators - 1; ++i) {
{
SCOPED_RAW_TIMER(&time_spent);
@ -114,7 +114,7 @@ StatusOr<DriverState> PipelineDriver::process(RuntimeState* runtime_state) {
_mark_operator_finishing(curr_op, runtime_state);
}
_mark_operator_finishing(next_op, runtime_state);
_new_first_unfinished = i + 1;
new_first_unfinished = i + 1;
continue;
}
@ -174,7 +174,7 @@ StatusOr<DriverState> PipelineDriver::process(RuntimeState* runtime_state) {
_mark_operator_finishing(curr_op, runtime_state);
}
_mark_operator_finishing(next_op, runtime_state);
_new_first_unfinished = i + 1;
new_first_unfinished = i + 1;
continue;
}
}
@ -186,10 +186,10 @@ StatusOr<DriverState> PipelineDriver::process(RuntimeState* runtime_state) {
}
}
// close finished operators and update _first_unfinished index
for (auto i = _first_unfinished; i < _new_first_unfinished; ++i) {
for (auto i = _first_unfinished; i < new_first_unfinished; ++i) {
_mark_operator_finished(_operators[i], runtime_state);
}
_first_unfinished = _new_first_unfinished;
_first_unfinished = new_first_unfinished;
if (sink_operator()->is_finished()) {
finish_operators(runtime_state);

View File

@ -557,10 +557,6 @@ pipeline::OpFactories OlapScanNode::decompose_to_pipeline(pipeline::PipelineBuil
std::move(_conjunct_ctxs), limit());
// Initialize OperatorFactory's fields involving runtime filters.
this->init_runtime_filter_for_operator(scan_operator.get(), context, rc_rf_probe_collector);
auto& morsel_queues = context->fragment_context()->morsel_queues();
auto source_id = scan_operator->plan_node_id();
DCHECK(morsel_queues.count(source_id));
auto& morsel_queue = morsel_queues[source_id];
scan_operator->set_degree_of_parallelism(context->get_dop_of_scan_node(this->id()));
operators.emplace_back(std::move(scan_operator));
if (limit() != -1) {