diff --git a/be/src/exec/scan_node.cpp b/be/src/exec/scan_node.cpp index 1fa95ccf0e6..ee02b23b159 100644 --- a/be/src/exec/scan_node.cpp +++ b/be/src/exec/scan_node.cpp @@ -94,17 +94,19 @@ Status ScanNode::prepare(RuntimeState* state) { } // Distribute morsels from a single queue to multiple queues -static std::map uniform_distribute_morsels(pipeline::MorselQueuePtr morsel_queue, - int dop) { +static StatusOr> uniform_distribute_morsels( + pipeline::MorselQueuePtr morsel_queue, int dop) { + std::map queue_per_driver; std::map morsels_per_driver; int driver_seq = 0; while (!morsel_queue->empty()) { - auto maybe_morsel = morsel_queue->try_get(); - DCHECK(maybe_morsel.ok()); - morsels_per_driver[driver_seq].push_back(std::move(maybe_morsel.value())); + auto maybe_morsel_status_or = morsel_queue->try_get(); + if (UNLIKELY(!maybe_morsel_status_or.ok())) { + return maybe_morsel_status_or.status(); + } + morsels_per_driver[driver_seq].push_back(std::move(maybe_morsel_status_or.value())); driver_seq = (driver_seq + 1) % dop; } - std::map queue_per_driver; auto morsel_queue_type = morsel_queue->type(); DCHECK(morsel_queue_type == pipeline::MorselQueue::Type::FIXED || @@ -144,7 +146,7 @@ StatusOr ScanNode::convert_scan_range_to_morsel // If not so much morsels, try to assign morsel uniformly among operators to avoid data skew if (!always_shared_scan() && scan_dop > 1 && is_fixed_or_dynamic_morsel_queue && morsel_queue->num_original_morsels() <= io_parallelism) { - auto morsel_queue_map = uniform_distribute_morsels(std::move(morsel_queue), scan_dop); + ASSIGN_OR_RETURN(auto morsel_queue_map, uniform_distribute_morsels(std::move(morsel_queue), scan_dop)); return std::make_unique(std::move(morsel_queue_map), /*could_local_shuffle*/ true); } else { diff --git a/be/src/storage/lake/tablet_reader.cpp b/be/src/storage/lake/tablet_reader.cpp index e7e7e8c8f8f..330ae04006a 100644 --- a/be/src/storage/lake/tablet_reader.cpp +++ b/be/src/storage/lake/tablet_reader.cpp @@ -171,7 +171,18 @@ Status TabletReader::open(const TabletReaderParams& read_params) { split_morsel_queue->set_tablet_schema(_tablet_schema); while (true) { - auto split = split_morsel_queue->try_get().value(); + auto split_status_or = split_morsel_queue->try_get(); + if (UNLIKELY(!split_status_or.ok())) { + LOG(WARNING) << "failed to get split morsel: " << split_status_or.status() + << ", query_id: " << print_id(read_params.runtime_state->query_id()) + << ", tablet_id: " << tablet_shared_ptr->tablet_id(); + // clear split tasks, and fallback to non-split mode + _split_tasks.clear(); + _need_split = false; + return init_collector(read_params); + } + + auto split = std::move(split_status_or.value()); if (split != nullptr) { auto ctx = std::make_unique();