Signed-off-by: sunny.xl <wxl24life@gmail.com> Co-authored-by: Drake Wang <wxl24life@gmail.com>
This commit is contained in:
parent
7ecb5836a4
commit
7c8e2d1734
|
|
@ -94,17 +94,19 @@ Status ScanNode::prepare(RuntimeState* state) {
|
|||
}
|
||||
|
||||
// Distribute morsels from a single queue to multiple queues
|
||||
static std::map<int, pipeline::MorselQueuePtr> uniform_distribute_morsels(pipeline::MorselQueuePtr morsel_queue,
|
||||
int dop) {
|
||||
static StatusOr<std::map<int, pipeline::MorselQueuePtr>> uniform_distribute_morsels(
|
||||
pipeline::MorselQueuePtr morsel_queue, int dop) {
|
||||
std::map<int, pipeline::MorselQueuePtr> queue_per_driver;
|
||||
std::map<int, pipeline::Morsels> morsels_per_driver;
|
||||
int driver_seq = 0;
|
||||
while (!morsel_queue->empty()) {
|
||||
auto maybe_morsel = morsel_queue->try_get();
|
||||
DCHECK(maybe_morsel.ok());
|
||||
morsels_per_driver[driver_seq].push_back(std::move(maybe_morsel.value()));
|
||||
auto maybe_morsel_status_or = morsel_queue->try_get();
|
||||
if (UNLIKELY(!maybe_morsel_status_or.ok())) {
|
||||
return maybe_morsel_status_or.status();
|
||||
}
|
||||
morsels_per_driver[driver_seq].push_back(std::move(maybe_morsel_status_or.value()));
|
||||
driver_seq = (driver_seq + 1) % dop;
|
||||
}
|
||||
std::map<int, pipeline::MorselQueuePtr> queue_per_driver;
|
||||
|
||||
auto morsel_queue_type = morsel_queue->type();
|
||||
DCHECK(morsel_queue_type == pipeline::MorselQueue::Type::FIXED ||
|
||||
|
|
@ -144,7 +146,7 @@ StatusOr<pipeline::MorselQueueFactoryPtr> ScanNode::convert_scan_range_to_morsel
|
|||
// If not so much morsels, try to assign morsel uniformly among operators to avoid data skew
|
||||
if (!always_shared_scan() && scan_dop > 1 && is_fixed_or_dynamic_morsel_queue &&
|
||||
morsel_queue->num_original_morsels() <= io_parallelism) {
|
||||
auto morsel_queue_map = uniform_distribute_morsels(std::move(morsel_queue), scan_dop);
|
||||
ASSIGN_OR_RETURN(auto morsel_queue_map, uniform_distribute_morsels(std::move(morsel_queue), scan_dop));
|
||||
return std::make_unique<pipeline::IndividualMorselQueueFactory>(std::move(morsel_queue_map),
|
||||
/*could_local_shuffle*/ true);
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -170,7 +170,18 @@ Status TabletReader::open(const TabletReaderParams& read_params) {
|
|||
split_morsel_queue->set_tablet_schema(_tablet_schema);
|
||||
|
||||
while (true) {
|
||||
auto split = split_morsel_queue->try_get().value();
|
||||
auto split_status_or = split_morsel_queue->try_get();
|
||||
if (UNLIKELY(!split_status_or.ok())) {
|
||||
LOG(WARNING) << "failed to get split morsel: " << split_status_or.status()
|
||||
<< ", query_id: " << print_id(read_params.runtime_state->query_id())
|
||||
<< ", tablet_id: " << tablet_shared_ptr->tablet_id();
|
||||
// clear split tasks, and fallback to non-split mode
|
||||
_split_tasks.clear();
|
||||
_need_split = false;
|
||||
return init_collector(read_params);
|
||||
}
|
||||
|
||||
auto split = std::move(split_status_or.value());
|
||||
if (split != nullptr) {
|
||||
auto ctx = std::make_unique<pipeline::LakeSplitContext>();
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue