[BugFix] Fix fail to get splits from morsel queue introduces crash of be (#62753)

Signed-off-by: sunny.xl <wxl24life@gmail.com>
This commit is contained in:
Drake Wang 2025-10-11 14:27:34 +08:00 committed by GitHub
parent 674145d7c8
commit 1378c139ae
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 21 additions and 8 deletions

View File

@ -94,17 +94,19 @@ Status ScanNode::prepare(RuntimeState* state) {
}
// Distribute morsels from a single queue to multiple queues
static std::map<int, pipeline::MorselQueuePtr> uniform_distribute_morsels(pipeline::MorselQueuePtr morsel_queue,
int dop) {
static StatusOr<std::map<int, pipeline::MorselQueuePtr>> uniform_distribute_morsels(
pipeline::MorselQueuePtr morsel_queue, int dop) {
std::map<int, pipeline::MorselQueuePtr> queue_per_driver;
std::map<int, pipeline::Morsels> morsels_per_driver;
int driver_seq = 0;
while (!morsel_queue->empty()) {
auto maybe_morsel = morsel_queue->try_get();
DCHECK(maybe_morsel.ok());
morsels_per_driver[driver_seq].push_back(std::move(maybe_morsel.value()));
auto maybe_morsel_status_or = morsel_queue->try_get();
if (UNLIKELY(!maybe_morsel_status_or.ok())) {
return maybe_morsel_status_or.status();
}
morsels_per_driver[driver_seq].push_back(std::move(maybe_morsel_status_or.value()));
driver_seq = (driver_seq + 1) % dop;
}
std::map<int, pipeline::MorselQueuePtr> queue_per_driver;
auto morsel_queue_type = morsel_queue->type();
DCHECK(morsel_queue_type == pipeline::MorselQueue::Type::FIXED ||
@ -144,7 +146,7 @@ StatusOr<pipeline::MorselQueueFactoryPtr> ScanNode::convert_scan_range_to_morsel
// If not so much morsels, try to assign morsel uniformly among operators to avoid data skew
if (!always_shared_scan() && scan_dop > 1 && is_fixed_or_dynamic_morsel_queue &&
morsel_queue->num_original_morsels() <= io_parallelism) {
auto morsel_queue_map = uniform_distribute_morsels(std::move(morsel_queue), scan_dop);
ASSIGN_OR_RETURN(auto morsel_queue_map, uniform_distribute_morsels(std::move(morsel_queue), scan_dop));
return std::make_unique<pipeline::IndividualMorselQueueFactory>(std::move(morsel_queue_map),
/*could_local_shuffle*/ true);
} else {

View File

@ -171,7 +171,18 @@ Status TabletReader::open(const TabletReaderParams& read_params) {
split_morsel_queue->set_tablet_schema(_tablet_schema);
while (true) {
auto split = split_morsel_queue->try_get().value();
auto split_status_or = split_morsel_queue->try_get();
if (UNLIKELY(!split_status_or.ok())) {
LOG(WARNING) << "failed to get split morsel: " << split_status_or.status()
<< ", query_id: " << print_id(read_params.runtime_state->query_id())
<< ", tablet_id: " << tablet_shared_ptr->tablet_id();
// clear split tasks, and fallback to non-split mode
_split_tasks.clear();
_need_split = false;
return init_collector(read_params);
}
auto split = std::move(split_status_or.value());
if (split != nullptr) {
auto ctx = std::make_unique<pipeline::LakeSplitContext>();