From 4a3092a3063c099f7c457c179cdbfa5d31777606 Mon Sep 17 00:00:00 2001
From: meegoo
Date: Thu, 9 Oct 2025 20:36:11 -0700
Subject: [PATCH] [BugFix] Fix multi statements transaction fail due to mix file bundle write (#63823)

Signed-off-by: meegoo
---
 be/src/runtime/lake_tablets_channel.cpp | 21 +++++++-
 be/src/storage/lake/meta_file.cpp       |  3 ++
 be/src/storage/lake/meta_file.h         |  4 ++
 be/src/storage/lake/txn_log_applier.cpp |  2 -
 be/src/storage/lake/update_manager.cpp  | 64 +++++++++++++++----------
 5 files changed, 66 insertions(+), 28 deletions(-)

diff --git a/be/src/runtime/lake_tablets_channel.cpp b/be/src/runtime/lake_tablets_channel.cpp
index 474411765b3..6056e14eba0 100644
--- a/be/src/runtime/lake_tablets_channel.cpp
+++ b/be/src/runtime/lake_tablets_channel.cpp
@@ -748,13 +748,30 @@ Status LakeTabletsChannel::_create_delta_writers(const PTabletWriterOpenRequest&
     std::vector<int64_t> tablet_ids;
     tablet_ids.reserve(params.tablets_size());
+    bool multi_stmt = _is_multi_statements_txn(params);
     for (const PTabletWithPartition& tablet : params.tablets()) {
         BundleWritableFileContext* bundle_writable_file_context = nullptr;
-        if (_is_data_file_bundle_enabled(params)) {
+        // Do NOT enable bundle write for a multi-statements transaction.
+        // Rationale:
+        // A multi-statements txn may invoke multiple open/add_chunk cycles and flush segments in batches.
+        // If some early segments are appended into a bundle file while later segments are flushed as
+        // standalone segment files (because a subsequent writer/open does not attach to the previous
+        // bundle context), the final rowset metadata will contain a mixed set: some segments with
+        // bundle offsets recorded and the rest without any offsets. Downstream rowset load logic
+        // assumes a 1:1 correspondence between the 'segments' list and 'bundle_file_offsets' when the
+        // latter is present, so a partial offsets list triggers size-mismatch errors at load time.
+        // Mitigation Strategy:
+        // Disable bundling entirely for multi-statements txns so that every segment is materialized as an
+        // independent file, guaranteeing consistent metadata and eliminating the offset mismatch risk.
+        // NOTE: If a future optimization wants to combine bundling with multi-statements txns, it must
+        // introduce an atomic merge mechanism that guarantees the offsets array is complete before publish.
+        if (!multi_stmt && _is_data_file_bundle_enabled(params)) {
             if (_bundle_wfile_ctx_by_partition.count(tablet.partition_id()) == 0) {
                 _bundle_wfile_ctx_by_partition[tablet.partition_id()] = std::make_unique<BundleWritableFileContext>();
             }
             bundle_writable_file_context = _bundle_wfile_ctx_by_partition[tablet.partition_id()].get();
+        } else if (multi_stmt && _is_data_file_bundle_enabled(params)) {
+            VLOG(1) << "disable bundle write for multi statements txn partition=" << tablet.partition_id();
         }
         if (_delta_writers.count(tablet.tablet_id()) != 0) {
             // already created for the tablet, usually in incremental open case
@@ -778,7 +795,7 @@ Status LakeTabletsChannel::_create_delta_writers(const PTabletWriterOpenRequest&
                                   .set_profile(_profile)
                                   .set_bundle_writable_file_context(bundle_writable_file_context)
                                   .set_global_dicts(&_global_dicts)
-                                  .set_is_multi_statements_txn(_is_multi_statements_txn(params))
+                                  .set_is_multi_statements_txn(multi_stmt)
                                   .build());
         _delta_writers.emplace(tablet.tablet_id(), std::move(writer));
         tablet_ids.emplace_back(tablet.tablet_id());
diff --git a/be/src/storage/lake/meta_file.cpp b/be/src/storage/lake/meta_file.cpp
index 926d532738b..0cab4efcb2f 100644
--- a/be/src/storage/lake/meta_file.cpp
+++ b/be/src/storage/lake/meta_file.cpp
@@ -707,6 +707,9 @@ void MetaFileBuilder::add_rowset(const RowsetMetadataPB& rowset_pb, const std::m
     _pending_rowset_data.dels.insert(_pending_rowset_data.dels.end(), dels.begin(), dels.end());
     _pending_rowset_data.del_encryption_metas.insert(_pending_rowset_data.del_encryption_metas.end(),
                                                      del_encryption_metas.begin(), del_encryption_metas.end());
+
+    // Track the cumulative number of segments already added when batch applying multiple op_writes.
+    _pending_rowset_data.assigned_segment_id += rowset_pb.segments_size();
 }
 
 void MetaFileBuilder::set_final_rowset() {
diff --git a/be/src/storage/lake/meta_file.h b/be/src/storage/lake/meta_file.h
index cd7874caec1..6c2efce9585 100644
--- a/be/src/storage/lake/meta_file.h
+++ b/be/src/storage/lake/meta_file.h
@@ -72,6 +72,9 @@ public:
     void set_recover_flag(RecoverFlag flag) { _recover_flag = flag; }
     RecoverFlag recover_flag() const { return _recover_flag; }
+    // Number of segments already assigned (accumulated) in the current pending batch rowset build.
+    uint32_t assigned_segment_id() const { return _pending_rowset_data.assigned_segment_id; }
+
     void finalize_sstable_meta(const PersistentIndexSstableMetaPB& sstable_meta);
 
     void remove_compacted_sst(const TxnLogPB_OpCompaction& op_compaction);
 
@@ -105,6 +108,7 @@ private:
         std::vector orphan_files;
         std::vector dels;
        std::vector del_encryption_metas;
+        uint32_t assigned_segment_id = 0;
     };
 
     Tablet _tablet;
diff --git a/be/src/storage/lake/txn_log_applier.cpp b/be/src/storage/lake/txn_log_applier.cpp
index bce03ff224f..ca6d3fecf77 100644
--- a/be/src/storage/lake/txn_log_applier.cpp
+++ b/be/src/storage/lake/txn_log_applier.cpp
@@ -225,8 +225,6 @@ public:
                                                           &_tablet, _index_entry, &_builder, _base_version, true));
             }
-            _metadata->set_next_rowset_id(_metadata->next_rowset_id() +
-                                          std::max(1, op_write.rowset().segments_size()));
         }
     }
     _builder.set_final_rowset();
 
diff --git a/be/src/storage/lake/update_manager.cpp b/be/src/storage/lake/update_manager.cpp
index 056cc95c099..bfcfb3ba3f4 100644
--- a/be/src/storage/lake/update_manager.cpp
+++ b/be/src/storage/lake/update_manager.cpp
@@ -261,50 +261,64 @@ Status UpdateManager::publish_primary_key_tablet(const TxnLogPB_OpWrite& op_writ
     };
     state.init(params);
     // Init delvec state.
+    // Map from rssid (rowset id + segment offset) to the list of deleted rowids collected during this publish.
     PrimaryIndex::DeletesMap new_deletes;
-    for (uint32_t segment_id = 0; segment_id < op_write.rowset().segments_size(); segment_id++) {
-        new_deletes[rowset_id + segment_id] = {};
+    // Global segment id offset assigned by the builder when batch applying multiple op_writes in a single publish.
+    uint32_t assigned_global_segments = batch_apply ? builder->assigned_segment_id() : 0;
+    // Number of segments in the incoming rowset of this op_write.
+    uint32_t local_segments = op_write.rowset().segments_size();
+    for (uint32_t local_id = 0; local_id < local_segments; ++local_id) {
+        uint32_t global_segment_id = assigned_global_segments + local_id;
+        new_deletes[rowset_id + global_segment_id] = {};
     }
-    // Rssid of delete files is equal to `rowset_id + op_offset`, and delete is always after upsert now,
-    // so we use max segment id as `op_offset`.
-    // TODO : support real order of mix upsert and delete in one transaction.
+    // The rssid for delete files equals `rowset_id + op_offset`. Since delete currently happens after upsert,
+    // we use the max segment id as the `op_offset` for rebuild. This is a simplification until mixed
+    // upsert+delete order in a single transaction is supported.
+    // TODO: Support the actual interleaving order of upsert and delete within one transaction.
     const uint32_t del_rebuild_rssid = rowset_id + std::max(op_write.rowset().segments_size(), 1) - 1;
     // 2. Handle segment one by one to save memory usage.
-    for (uint32_t segment_id = 0; segment_id < op_write.rowset().segments_size(); segment_id++) {
-        RETURN_IF_ERROR(state.load_segment(segment_id, params, base_version, true /*reslove conflict*/,
-                                           false /*no need lock*/));
+    for (uint32_t local_id = 0; local_id < local_segments; ++local_id) {
+        uint32_t global_segment_id = assigned_global_segments + local_id;
+        // Load the update state of the current segment, resolving conflicts but without taking the index lock.
+        RETURN_IF_ERROR(
+                state.load_segment(local_id, params, base_version, true /*resolve conflict*/, false /*no need lock*/));
         _update_state_cache.update_object_size(state_entry, state.memory_usage());
-        // 2.1 rewrite segment file if it is partial update
-        RETURN_IF_ERROR(state.rewrite_segment(segment_id, txn_id, params, &replace_segments, &orphan_files));
-        rssid_fileinfo_container.add_rssid_to_file(op_write.rowset(), metadata->next_rowset_id(), segment_id,
+        // 2.1 For partial update, rewrite the segment file to generate replace segments and orphan files.
+        RETURN_IF_ERROR(state.rewrite_segment(local_id, txn_id, params, &replace_segments, &orphan_files));
+        rssid_fileinfo_container.add_rssid_to_file(op_write.rowset(), metadata->next_rowset_id(), local_id,
                                                    replace_segments);
-        // handle merge condition, skip update row which's merge condition column value is smaller than current row
+        VLOG(2) << strings::Substitute(
+                "[publish_pk_tablet][segment_loop] tablet:$0 txn:$1 assigned:$2 local_id:$3 global_id:$4 "
+                "segments_local:$5",
+                tablet->id(), txn_id, assigned_global_segments, local_id, global_segment_id, local_segments);
+        // If a merge condition is configured, only update rows that satisfy the condition.
         int32_t condition_column = _get_condition_column(op_write, *tablet_schema);
-        // 2.2 update primary index, and generate delete info.
+        // 2.2 Update primary index and collect delete information caused by key replacement.
         TRACE_COUNTER_SCOPE_LATENCY_US("update_index_latency_us");
-        DCHECK(state.upserts(segment_id) != nullptr);
+        DCHECK(state.upserts(local_id) != nullptr);
         if (condition_column < 0) {
             RETURN_IF_ERROR(
-                    _do_update(rowset_id, segment_id, state.upserts(segment_id), index, &new_deletes,
+                    _do_update(rowset_id, global_segment_id, state.upserts(local_id), index, &new_deletes,
                                op_write.ssts_size() > 0 && use_cloud_native_pk_index(*metadata)
                                /* read pk index only when ingest sst */));
         } else {
-            RETURN_IF_ERROR(_do_update_with_condition(params, rowset_id, segment_id, condition_column,
-                                                      state.upserts(segment_id)->pk_column, index, &new_deletes));
+            RETURN_IF_ERROR(_do_update_with_condition(params, rowset_id, global_segment_id, condition_column,
+                                                      state.upserts(local_id)->pk_column, index, &new_deletes));
         }
-        // 2.3 handle auto increment deletes
-        if (state.auto_increment_deletes(segment_id) != nullptr) {
+        // 2.3 Apply deletes generated by auto-increment conflict handling (if any).
+        if (state.auto_increment_deletes(local_id) != nullptr) {
             RETURN_IF_ERROR(
-                    index.erase(metadata, *state.auto_increment_deletes(segment_id), &new_deletes, del_rebuild_rssid));
+                    index.erase(metadata, *state.auto_increment_deletes(local_id), &new_deletes, del_rebuild_rssid));
         }
+        // Refresh memory accounting after index/state changes.
         _index_cache.update_object_size(index_entry, index.memory_usage());
-        state.release_segment(segment_id);
+        state.release_segment(local_id);
         _update_state_cache.update_object_size(state_entry, state.memory_usage());
         if (op_write.ssts_size() > 0 && condition_column < 0 && use_cloud_native_pk_index(*metadata)) {
             // TODO support condition column with sst ingestion.
             // rowset_id + segment_id is the rssid of this segment
-            RETURN_IF_ERROR(index.ingest_sst(op_write.ssts(segment_id), rowset_id + segment_id, metadata->version(),
-                                             DelvecPagePB() /* empty */, nullptr));
+            RETURN_IF_ERROR(index.ingest_sst(op_write.ssts(local_id), rowset_id + global_segment_id,
+                                             metadata->version(), DelvecPagePB() /* empty */, nullptr));
         }
     }
 
@@ -327,7 +341,9 @@ Status UpdateManager::publish_primary_key_tablet(const TxnLogPB_OpWrite& op_writ
     std::map<uint32_t, size_t> segment_id_to_add_dels;
     for (auto& new_delete : new_deletes) {
         uint32_t rssid = new_delete.first;
-        if (rssid >= rowset_id && rssid < rowset_id + op_write.rowset().segments_size()) {
+        uint32_t assigned_segment_id = batch_apply ? builder->assigned_segment_id() : 0;
+        if (rssid >= rowset_id + assigned_segment_id &&
+            rssid < rowset_id + assigned_segment_id + op_write.rowset().segments_size()) {
             // it's newly added rowset's segment, do not have latest delvec yet
             new_del_vecs[idx].first = rssid;
             new_del_vecs[idx].second = std::make_shared<DelVector>();
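
Illustrative sketch (not part of the patch): the fix hinges on giving every segment of a multi-statements transaction a globally unique rssid when several op_write logs are published in one batch. Each op_write offsets its local segment index by the cumulative count of segments applied by earlier op_writes, which is what MetaFileBuilder::assigned_segment_id() now exposes, so that rssid = rowset_id + assigned_segment_id + local_id. The standalone C++ below reproduces only that arithmetic; the BuilderCounter struct, the assign_rssids helper, and the hard-coded segment counts are made up for the demonstration and are not StarRocks code.

#include <cstdint>
#include <initializer_list>
#include <iostream>
#include <vector>

// Hypothetical stand-in for the builder's cumulative segment counter; the real
// patch keeps this counter inside MetaFileBuilder's pending rowset data.
struct BuilderCounter {
    uint32_t assigned_segment_id = 0; // segments added by earlier op_writes in this publish batch
};

// Mirror of the patched loop's id math: rssid = rowset_id + assigned offset + local segment index.
std::vector<uint32_t> assign_rssids(uint32_t rowset_id, const BuilderCounter& builder, uint32_t local_segments) {
    std::vector<uint32_t> rssids;
    rssids.reserve(local_segments);
    for (uint32_t local_id = 0; local_id < local_segments; ++local_id) {
        rssids.push_back(rowset_id + builder.assigned_segment_id + local_id);
    }
    return rssids;
}

int main() {
    const uint32_t rowset_id = 1000; // example base id, chosen only for the demo
    BuilderCounter builder;

    // Two op_writes from one multi-statements txn, with 3 and 2 segments respectively.
    for (uint32_t segments : {3u, 2u}) {
        for (uint32_t rssid : assign_rssids(rowset_id, builder, segments)) {
            std::cout << rssid << ' '; // prints "1000 1001 1002" then "1003 1004"
        }
        std::cout << '\n';
        builder.assigned_segment_id += segments; // what MetaFileBuilder::add_rowset now accumulates
    }
    return 0;
}

Without the assigned_segment_id offset, the second op_write in this example would reuse rssids 1000-1002, and its delete vectors would be attributed to the first op_write's segments, which is the same class of mismatch the patch prevents during batch publish.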