[Enhancement] Support parallel merge (2) (#21205)

* [Enhancement] Support parallel merge (2)

Signed-off-by: liuyehcf <1559500551@qq.com>
This commit is contained in:
liuyehcf 2023-04-18 19:19:29 +08:00 committed by GitHub
parent 28e868e9ee
commit bd6cca27bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 616 additions and 216 deletions

1
.gitignore vendored
View File

@ -59,6 +59,7 @@ StarRocksLex.tokens
.project
.settings/
.idea/
.gradle/
/Default/
be/cmake-build*
be/.vscode

View File

@ -59,7 +59,6 @@ Status ExchangeParallelMergeSourceOperator::set_finishing(RuntimeState* state) {
}
StatusOr<ChunkPtr> ExchangeParallelMergeSourceOperator::pull_chunk(RuntimeState* state) {
// TODO process limit
auto chunk = _merger->try_get_next(_driver_sequence);
if (_merger->is_finished()) {
@ -107,8 +106,8 @@ merge_path::MergePathCascadeMerger* ExchangeParallelMergeSourceOperatorFactory::
auto chunk_providers = _stream_recvr->create_merge_path_chunk_providers();
SortDescs sort_descs(_is_asc_order, _nulls_first);
_merger = std::make_unique<merge_path::MergePathCascadeMerger>(
degree_of_parallelism(), _sort_exec_exprs->lhs_ordering_expr_ctxs(), sort_descs, chunk_providers,
state->chunk_size());
state->chunk_size(), degree_of_parallelism(), _sort_exec_exprs->lhs_ordering_expr_ctxs(), sort_descs,
_row_desc.tuple_descriptors()[0], _offset, _limit, chunk_providers);
}
return _merger.get();
}

View File

@ -93,8 +93,8 @@ OperatorPtr LocalParallelMergeSortSourceOperatorFactory::create(int32_t degree_o
});
}
_merger = std::make_unique<merge_path::MergePathCascadeMerger>(
degree_of_parallelism, sort_context->sort_exprs(), sort_context->sort_descs(), chunk_providers,
_state->chunk_size());
_state->chunk_size(), degree_of_parallelism, sort_context->sort_exprs(), sort_context->sort_descs(),
_tuple_desc, sort_context->offset(), sort_context->limit(), chunk_providers);
}
return std::make_shared<LocalParallelMergeSortSourceOperator>(this, _id, _plan_node_id, driver_sequence,

View File

@ -87,7 +87,10 @@ public:
Status prepare(RuntimeState* state) override;
OperatorPtr create(int32_t degree_of_parallelism, int32_t driver_sequence) override;
void set_tuple_desc(const TupleDescriptor* tuple_desc) { _tuple_desc = tuple_desc; }
private:
const TupleDescriptor* _tuple_desc;
RuntimeState* _state;
// share data with multiple partition sort sink opeartor through _sort_context.

View File

@ -57,6 +57,8 @@ public:
DCHECK_LT(driver_sequence, _chunks_sorter_partitions.size());
return _chunks_sorter_partitions[driver_sequence].get();
}
int64_t offset() const { return _offset; }
int64_t limit() const { return _limit; }
const std::vector<ExprContext*>& sort_exprs() const { return _sort_exprs; }
const SortDescs& sort_descs() const { return _sort_desc; }

File diff suppressed because it is too large Load Diff

View File

@ -14,12 +14,15 @@
#pragma once
#include <mutex>
#include <unordered_map>
#include <utility>
#include "column/vectorized_fwd.h"
#include "common/status.h"
#include "exec/sorting/merge.h"
#include "exec/sorting/sorting.h"
#include "runtime/descriptors.h"
#include "util/runtime_profile.h"
namespace starrocks::merge_path {
@ -132,7 +135,7 @@ void _is_intersection(const SortDescs& descs, const InputSegment& left, const si
* @param length The step for this parallelism can forward along merge path.
*/
void _do_merge_along_merge_path(const SortDescs& descs, const InputSegment& left, size_t& li, const InputSegment& right,
size_t& ri, OutputSegment& dest, size_t start_di, const size_t length);
size_t& ri, OutputSegment& dest, const size_t start_di, const size_t length);
} // namespace detail
class MergePathCascadeMerger;
@ -149,6 +152,8 @@ using MergePathChunkProvider = std::function<bool(bool only_check_if_has_data, C
namespace detail {
class MergeNode;
// A merge tree is composed by a group of node, node can be either LeafNode or MergeNode
// LeafNode gets data from the specific providers, and has no child
// MergeNode has two children, and combine two streams of data into one.
@ -156,26 +161,32 @@ class Node {
public:
Node(MergePathCascadeMerger* merger) : _merger(merger) {}
virtual ~Node() = default;
virtual void process_input(int32_t parallel_idx) = 0;
virtual void process_input(const int32_t parallel_idx) = 0;
virtual void process_input_done(){};
virtual bool is_leaf() { return false; }
// Means the previous node reaches eos, no more data will flow to current node,
// but there still may be some data in the input buffer to be consumed.
virtual bool dependency_finished() = 0;
// Means based on dependency_finished state, all the data in the input buffer has been consumed.
virtual bool input_finished() = 0;
// Return true if current node is active but cannot provide more data at this moment.
virtual bool is_pending() { return false; }
// Current node can produce more output if one of the following conditions are met.
// 1. It's dependency is not reaching eos. For LeafNode, dependency can be MergePathChunkProvider;
// For MergeNode, dependencies can be its left and right children.
// 2. Input buffer has not exhausted yet.
virtual bool has_more_output() = 0;
// Return true if current node cannot produce any more data, and the output segments of the latest
// processing has been fetched.
bool eos() { return !has_more_output() && _output_segments.empty(); }
std::vector<OutputSegmentPtr>& output_segments() { return _output_segments; }
void bind_parent(MergeNode* parent) { _parent = parent; }
bool parent_input_full();
size_t degree_of_parallelism() { return _global_2_local_parallel_idx.size(); }
void bind_parallel_idxs(std::unordered_map<int32_t, int32_t>&& global_2_local_parallel_idx) {
_global_2_local_parallel_idx = std::move(global_2_local_parallel_idx);
}
bool output_empty() { return _output_segments.empty(); }
bool eos() { return dependency_finished() && input_finished() && output_empty(); }
std::vector<OutputSegmentPtr>&& output_segments() { return std::move(_output_segments); }
protected:
MergePathCascadeMerger* _merger;
MergeNode* _parent = nullptr;
// There may be multiply workers work on the same Node. So wee need to know
// the local parallel_idx which the global parallel_idx corresponds to.
@ -187,7 +198,7 @@ protected:
// pair.first store global parallel_idx, pair.second store local parallel_idx
std::unordered_map<int32_t, int32_t> _global_2_local_parallel_idx;
// Every time traversing, current node must all the output of its children
// Output segments of current processing, it will be all fetched by its parent
std::vector<OutputSegmentPtr> _output_segments;
std::mutex _m;
@ -196,15 +207,13 @@ using NodePtr = std::unique_ptr<Node>;
class MergeNode final : public Node {
public:
MergeNode(MergePathCascadeMerger* merger, Node* left, Node* right) : Node(merger), _left(left), _right(right) {}
MergeNode(MergePathCascadeMerger* merger, Node* left, Node* right)
: Node(merger), _left(left), _right(right), _left_buffer({}, 0, 0), _right_buffer({}, 0, 0) {}
void process_input(int32_t parallel_idx) override;
void process_input(const int32_t parallel_idx) override;
void process_input_done() override;
bool dependency_finished() override { return _left->eos() && _right->eos(); }
bool input_finished() override {
return dependency_finished() && _left_input != nullptr && _left_input->len == 0 && _right_input != nullptr &&
_right_input->len == 0;
}
bool has_more_output() override;
bool input_full(Node* child);
private:
void _setup_input();
@ -212,28 +221,31 @@ private:
Node* _left;
Node* _right;
InputSegmentPtr _left_input;
InputSegmentPtr _right_input;
InputSegment _left_buffer;
InputSegment _right_buffer;
bool _input_ready = false;
size_t _merge_length;
size_t _merge_length = 0;
};
class LeafNode final : public Node {
public:
LeafNode(MergePathCascadeMerger* merger) : Node(merger) {}
LeafNode(MergePathCascadeMerger* merger, bool late_materialization)
: Node(merger), _late_materialization(late_materialization) {}
void process_input(int32_t parallel_idx) override;
bool dependency_finished() override { return _provider_eos; }
void process_input(const int32_t parallel_idx) override;
bool is_leaf() override { return true; }
bool input_finished() override { return dependency_finished(); }
bool is_pending() override { return !_provider_eos && !_provider(true, nullptr, nullptr); }
bool has_more_output() override { return !_provider_eos; }
bool provider_pending() { return has_more_output() && !_provider(true, nullptr, nullptr); }
void set_provider(MergePathChunkProvider&& provider) { _provider = std::move(provider); }
void set_provider(MergePathChunkProvider provider) { _provider = std::move(provider); }
private:
bool _provider_eos = false;
ChunkPtr _generate_ordinal(const size_t chunk_id, const size_t num_rows);
const bool _late_materialization;
MergePathChunkProvider _provider;
bool _provider_eos = false;
};
// Transition graph is as follows:
@ -335,14 +347,20 @@ struct Metrics {
// 6 -> Stage::FINISHED
std::vector<RuntimeProfile::Counter*> stage_timers;
std::vector<RuntimeProfile::Counter*> stage_counters;
RuntimeProfile::Counter* _sorted_run_provider_timer;
RuntimeProfile::Counter* _late_materialization_generate_ordinal_timer;
RuntimeProfile::Counter* _late_materialization_restore_according_to_ordinal_timer;
RuntimeProfile::Counter* _late_materialization_max_buffer_chunk_num;
};
} // namespace detail
class MergePathCascadeMerger {
public:
MergePathCascadeMerger(const int32_t degree_of_parallelism, std::vector<ExprContext*> sort_exprs,
const SortDescs& sort_descs, std::vector<MergePathChunkProvider> chunk_providers,
const size_t chunk_size);
MergePathCascadeMerger(const size_t chunk_size, const int32_t degree_of_parallelism,
std::vector<ExprContext*> sort_exprs, const SortDescs& sort_descs,
const TupleDescriptor* tuple_desc, const int64_t offset, const int64_t limit,
std::vector<MergePathChunkProvider> chunk_providers);
const std::vector<ExprContext*>& sort_exprs() const { return _sort_exprs; }
const SortDescs& sort_descs() const { return _sort_descs; }
@ -350,12 +368,12 @@ public:
// There may be several parallelism working on the same stage
// Return true if the current stage's work is done for the particular parallel_idx
bool is_current_stage_finished(int32_t parallel_idx);
bool is_current_stage_finished(const int32_t parallel_idx);
// All the data are coming from chunk_providers, which passes through ctor.
// If one of the providers cannot provider new data at the moment, maybe waiting for network,
// but the provider still alive(not reach eos), then merger will enter to pending stage.
bool is_pending(int32_t parallel_idx);
bool is_pending(const int32_t parallel_idx);
// Return true if merge process is done
bool is_finished();
@ -363,7 +381,10 @@ public:
// All the merge process are triggered by this method
ChunkPtr try_get_next(const int32_t parallel_idx);
void bind_profile(int32_t parallel_idx, RuntimeProfile* profile);
void bind_profile(const int32_t parallel_idx, RuntimeProfile* profile);
size_t add_original_chunk(ChunkPtr&& chunk);
detail::Metrics& get_metrics(const int32_t parallel_idx) { return _metrics[parallel_idx]; }
private:
bool _is_current_stage_done();
@ -371,42 +392,63 @@ private:
void _init();
void _prepare();
void _process(int32_t parallel_idx);
void _split_chunk(int32_t parallel_idx);
void _fetch_chunk(int32_t parallel_idx, ChunkPtr& chunk);
void _process(const int32_t parallel_idx);
void _split_chunk(const int32_t parallel_idx);
void _fetch_chunk(const int32_t parallel_idx, ChunkPtr& chunk);
void _finishing();
void _init_late_materialization();
ChunkPtr _restore_according_to_ordinal(const int32_t parallel_idx, const ChunkPtr& chunk);
void _process_limit(ChunkPtr& chunk);
void _find_unfinished_level();
using Action = std::function<void()>;
void _finish_current_stage(
int32_t parallel_idx, const Action& stage_done_action = []() {});
const int32_t parallel_idx, const Action& stage_done_action = []() {});
bool _has_pending_node();
void _reset_output();
public:
// The ordinal value is comprising the following two parts
// 1. chunk_id, which is assigned by `add_original_chunk
// 2. offset in chunk
// And for sake of performance, we use 64-bit to store these two parts, chunk_id takes first 44-bits
// and offset takes 20-bits.
constexpr static size_t MAX_CHUNK_SIZE = 0xfffff;
constexpr static size_t OFFSET_BITS = 20;
private:
// For each MergeNode, there may be multiply threads working on the same merge processing.
// And here we need to guarantee that each parallelism can process a certain amount of data. Assuming it
// equals to chunk_size right now (can be later optimized), so the total size of the merge should be
// chunk_size * degree_of_parallelism, which is called _streaming_batch_size here.
// And each MergeNode can hold left/right buffer of size within 2 * _streaming_batch_size.
// So the total amount size the whole merge tree will hold is around _degree_of_parallelism * 2 * _streaming_batch_size
// (The number of MergeNode approximately equals to _degree_of_parallelism)
const size_t _chunk_size;
const size_t _streaming_batch_size;
const int32_t _degree_of_parallelism;
const std::vector<ExprContext*> _sort_exprs;
const SortDescs _sort_descs;
std::vector<MergePathChunkProvider> _chunk_providers;
const TupleDescriptor* _tuple_desc;
const int64_t _offset;
const int64_t _limit;
const std::vector<MergePathChunkProvider> _chunk_providers;
Action _finish_merge_action;
// In order to get high performance, all the condition methods like `is_current_stage_finished/is_finished/is_pending`
// don't use global mutex. But here is one critical section of forwarding stage, during which we need to mute all these
// methods to avoid undefined concurrent behaviors.
// The atomicity is provided by the following three atomic fields. The write-read order of these atomic fields must be
// the same among all threads, so the `std::memory_order_seq_cst` is requied when manipulating them.
// 1. _is_forwarding_stage is used for creating a critical section because all the condition methods, like
// `is_current_stage_finished/is_finished/is_pending`, are not protected by global mutex.
// 2. _process_cnts[i] represents how many times for parallel_idx=<i> that method `try_get_next` can be executed. Besides,
// it provides the final consistency of all the following non-atomic fields through
// 1. Read _process_cnts at the begining of try_get_next
// 2. Write _process_cnts at the end of try_get_next
std::atomic<bool> _is_forwarding_stage = false;
std::atomic<detail::Stage> _stage;
// _process_cnts[i] represents how many times for parallel_idx=<i> that method `try_get_next` can be executed
// The final consistency of all the following non-atomic fields are protected by _process_cnts
// Through
// 1. Read _process_cnts at the begining of try_get_next
// 2. Write _process_cnts at tne end of try_get_next
std::vector<std::atomic<size_t>> _process_cnts;
// Merge nodes
@ -423,10 +465,19 @@ private:
// _working_nodes[i] represents the node list for parallel_idx=<i>
std::vector<std::vector<detail::Node*>> _working_nodes;
// Fields used for late materialization
bool _late_materialization = false;
size_t _chunk_id_generator = 0;
size_t _dequeued_chunk_num = 0;
size_t _max_buffer_chunk_num = 0;
std::deque<std::pair<ChunkPtr, size_t>> _original_chunk_buffer;
// Output chunks for each parallelism
std::vector<std::vector<ChunkPtr>> _output_chunks;
std::vector<ChunkPtr> _flat_output_chunks;
size_t _output_idx = 0;
size_t _output_row_num = 0;
bool _short_circuit = false;
std::vector<detail::Metrics> _metrics;
std::chrono::steady_clock::time_point _pending_start;

View File

@ -330,6 +330,10 @@ std::vector<std::shared_ptr<pipeline::OperatorFactory>> TopNNode::_decompose_to_
SourceOperatorFactoryPtr source_operator;
source_operator = std::make_shared<SourceFactory>(context->next_operator_id(), id(), context_factory);
if (enable_parallel_merge) {
down_cast<LocalParallelMergeSortSourceOperatorFactory*>(source_operator.get())
->set_tuple_desc(_materialized_tuple_desc);
}
ops_sink_with_sort.emplace_back(std::move(sink_operator));
context->add_pipeline(ops_sink_with_sort);
@ -364,8 +368,8 @@ pipeline::OpFactories TopNNode::decompose_to_pipeline(pipeline::PipelineBuilderC
// need_merge = true means gather is needed for multiple streams of data
// need_merge = false means gather is no longer needed
bool need_merge = _analytic_partition_exprs.empty();
bool enable_parallel_merge =
_tnode.sort_node.__isset.enable_parallel_merge && _tnode.sort_node.enable_parallel_merge;
bool enable_parallel_merge = !is_partition_topn && need_merge && _tnode.sort_node.__isset.enable_parallel_merge &&
_tnode.sort_node.enable_parallel_merge;
OpFactories operators_source_with_sort;