[BugFix] Fix multi-statement transaction failures caused by mixed file bundle writes (backport #63823) (backport #63871) (#63873)

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Co-authored-by: meegoo <meegoo.sr@gmail.com>
mergify[bot] 2025-10-10 05:20:55 +00:00 committed by GitHub
parent d928ccc269
commit 23a222e669
5 changed files with 64 additions and 26 deletions

@@ -748,13 +748,30 @@ Status LakeTabletsChannel::_create_delta_writers(const PTabletWriterOpenRequest&
std::vector<int64_t> tablet_ids;
tablet_ids.reserve(params.tablets_size());
bool multi_stmt = _is_multi_statements_txn(params);
for (const PTabletWithPartition& tablet : params.tablets()) {
BundleWritableFileContext* bundle_writable_file_context = nullptr;
if (_is_data_file_bundle_enabled(params)) {
// Do NOT enable bundle write for a multi-statement transaction.
// Rationale:
// A multi-statement txn may invoke multiple open/add_chunk cycles and flush segments in batches.
// If some early segments are appended into a bundle file while later segments are flushed as
// standalone segment files (because a subsequent writer/open does not attach to the previous
// bundle context), the final Rowset metadata will contain a mixed set: a subset of segments with
// bundle offsets recorded and the rest without any offsets. The downstream rowset load logic
// assumes a 1:1 correspondence between the 'segments' list and 'bundle_file_offsets' when the
// latter is present, so a partial offsets list triggers size mismatch errors when loading.
// Mitigation:
// Disable bundling entirely for multi-statement txns so that every segment is materialized as an
// independent file, guaranteeing consistent metadata and eliminating the offset mismatch risk.
// NOTE: If a future optimization wants to combine bundling with multi-statement txns, it must
// introduce an atomic merge mechanism that guarantees the offsets array is complete before publish.
if (!multi_stmt && _is_data_file_bundle_enabled(params)) {
if (_bundle_wfile_ctx_by_partition.count(tablet.partition_id()) == 0) {
_bundle_wfile_ctx_by_partition[tablet.partition_id()] = std::make_unique<BundleWritableFileContext>();
}
bundle_writable_file_context = _bundle_wfile_ctx_by_partition[tablet.partition_id()].get();
} else if (multi_stmt && _is_data_file_bundle_enabled(params)) {
VLOG(1) << "disable bundle write for multi statements txn partition=" << tablet.partition_id();
}
if (_delta_writers.count(tablet.tablet_id()) != 0) {
// already created for the tablet, usually in the incremental open case
@@ -778,7 +795,7 @@ Status LakeTabletsChannel::_create_delta_writers(const PTabletWriterOpenRequest&
.set_profile(_profile)
.set_bundle_writable_file_context(bundle_writable_file_context)
.set_global_dicts(&_global_dicts)
.set_is_multi_statements_txn(_is_multi_statements_txn(params))
.set_is_multi_statements_txn(multi_stmt)
.build());
_delta_writers.emplace(tablet.tablet_id(), std::move(writer));
tablet_ids.emplace_back(tablet.tablet_id());
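
For context on the mismatch described in the rationale above: when `bundle_file_offsets` is present it must pair 1:1 with `segments`, so a rowset whose early segments were bundled while later ones were flushed standalone fails that check on load. A minimal standalone sketch of the invariant (`RowsetMeta` is a hypothetical stand-in for the real RowsetMetadataPB):

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

// Hypothetical stand-in for the fields of RowsetMetadataPB referenced above.
struct RowsetMeta {
    std::vector<std::string> segments;        // segment file names
    std::vector<int64_t> bundle_file_offsets; // empty => no bundling
};

// Mirrors the 1:1 assumption in the rowset load path: offsets, when present,
// must cover every segment. A partial list is exactly the mixed state that
// the comment above warns about.
bool offsets_consistent(const RowsetMeta& rs) {
    return rs.bundle_file_offsets.empty() || rs.bundle_file_offsets.size() == rs.segments.size();
}

int main() {
    // An early flush bundled two segments; a later open flushed one standalone.
    RowsetMeta mixed{{"s0.dat", "s1.dat", "s2.dat"}, {0, 4096}};
    std::cout << (offsets_consistent(mixed) ? "ok" : "size mismatch on load") << '\n';
    // prints "size mismatch on load"
}
```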

@@ -696,6 +696,9 @@ void MetaFileBuilder::add_rowset(const RowsetMetadataPB& rowset_pb, const std::m
_pending_rowset_data.dels.insert(_pending_rowset_data.dels.end(), dels.begin(), dels.end());
_pending_rowset_data.del_encryption_metas.insert(_pending_rowset_data.del_encryption_metas.end(),
del_encryption_metas.begin(), del_encryption_metas.end());
// Track the cumulative number of segments already added when batch-applying multiple op_writes.
_pending_rowset_data.assigned_segment_id += rowset_pb.segments_size();
}
void MetaFileBuilder::set_final_rowset() {

@@ -72,6 +72,9 @@ public:
void set_recover_flag(RecoverFlag flag) { _recover_flag = flag; }
RecoverFlag recover_flag() const { return _recover_flag; }
// Number of segments already assigned (accumulated) in the current pending batch rowset build.
uint32_t assigned_segment_id() const { return _pending_rowset_data.assigned_segment_id; }
void finalize_sstable_meta(const PersistentIndexSstableMetaPB& sstable_meta);
void remove_compacted_sst(const TxnLogPB_OpCompaction& op_compaction);
@@ -96,6 +99,7 @@ private:
std::vector<FileMetaPB> orphan_files;
std::vector<std::string> dels;
std::vector<std::string> del_encryption_metas;
uint32_t assigned_segment_id = 0;
};
Tablet _tablet;
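
How the new counter composes across a batched publish: the offset is read before each op_write's rowset is added, and `add_rowset` accumulates afterwards. A simplified stand-in for the builder's bookkeeping (names modeled on the diff, not the real class):

```cpp
#include <cstdint>
#include <iostream>

// Simplified stand-in for MetaFileBuilder's pending-rowset bookkeeping.
struct Builder {
    uint32_t assigned = 0; // mirrors _pending_rowset_data.assigned_segment_id

    uint32_t assigned_segment_id() const { return assigned; }
    void add_rowset(uint32_t segments_size) { assigned += segments_size; }
};

int main() {
    Builder b;
    // Two op_writes batch-applied in one publish: 2 segments, then 3.
    for (uint32_t segs : {2u, 3u}) {
        uint32_t offset = b.assigned_segment_id(); // read before adding
        for (uint32_t local = 0; local < segs; ++local) {
            std::cout << "global segment id = " << offset + local << '\n';
        }
        b.add_rowset(segs); // accumulate for the next op_write
    }
    // Prints 0,1 for the first op_write and 2,3,4 for the second:
    // segment ids stay globally unique within the batched rowset build.
}
```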

@@ -224,8 +224,6 @@ public:
&_tablet, _index_entry, &_builder,
_base_version, true));
}
_metadata->set_next_rowset_id(_metadata->next_rowset_id() +
std::max(1, op_write.rowset().segments_size()));
}
}
_builder.set_final_rowset();

@@ -247,41 +247,55 @@ Status UpdateManager::publish_primary_key_tablet(const TxnLogPB_OpWrite& op_writ
};
state.init(params);
// Init delvec state.
// Map from rssid (rowset id + segment offset) to the list of deleted rowids collected during this publish.
PrimaryIndex::DeletesMap new_deletes;
for (uint32_t segment_id = 0; segment_id < op_write.rowset().segments_size(); segment_id++) {
new_deletes[rowset_id + segment_id] = {};
// Global segment id offset assigned by the builder when batch-applying multiple op_writes in a single publish.
uint32_t assigned_global_segments = batch_apply ? builder->assigned_segment_id() : 0;
// Number of segments in the incoming rowset of this op_write.
uint32_t local_segments = op_write.rowset().segments_size();
for (uint32_t local_id = 0; local_id < local_segments; ++local_id) {
uint32_t global_segment_id = assigned_global_segments + local_id;
new_deletes[rowset_id + global_segment_id] = {};
}
// Rssid of delete files is equal to `rowset_id + op_offset`, and delete is always after upsert now,
// so we use max segment id as `op_offset`.
// TODO : support real order of mix upsert and delete in one transaction.
// The rssid for delete files equals `rowset_id + op_offset`. Since delete currently happens after upsert,
// we use the max segment id as the `op_offset` for rebuild (e.g., with 3 segments, op_offset is 2, so
// del_rebuild_rssid = rowset_id + 2). This is a simplification until mixed upsert+delete order in a
// single transaction is supported.
// TODO: Support the actual interleaving order of upsert and delete within one transaction.
const uint32_t del_rebuild_rssid = rowset_id + std::max(op_write.rowset().segments_size(), 1) - 1;
// 2. Handle segments one by one to save memory usage.
for (uint32_t segment_id = 0; segment_id < op_write.rowset().segments_size(); segment_id++) {
RETURN_IF_ERROR(state.load_segment(segment_id, params, base_version, true /*resolve conflict*/,
false /*no need lock*/));
for (uint32_t local_id = 0; local_id < local_segments; ++local_id) {
uint32_t global_segment_id = assigned_global_segments + local_id;
// Load update state of the current segment, resolving conflicts but without taking index lock.
RETURN_IF_ERROR(
state.load_segment(local_id, params, base_version, true /*resolve conflict*/, false /*no need lock*/));
_update_state_cache.update_object_size(state_entry, state.memory_usage());
// 2.1 rewrite segment file if it is partial update
RETURN_IF_ERROR(state.rewrite_segment(segment_id, txn_id, params, &replace_segments, &orphan_files));
rssid_fileinfo_container.add_rssid_to_file(op_write.rowset(), metadata->next_rowset_id(), segment_id,
// 2.1 For partial update, rewrite the segment file to generate replace segments and orphan files.
RETURN_IF_ERROR(state.rewrite_segment(local_id, txn_id, params, &replace_segments, &orphan_files));
rssid_fileinfo_container.add_rssid_to_file(op_write.rowset(), metadata->next_rowset_id(), local_id,
replace_segments);
// handle merge condition, skip update row which's merge condition column value is smaller than current row
VLOG(2) << strings::Substitute(
"[publish_pk_tablet][segment_loop] tablet:$0 txn:$1 assigned:$2 local_id:$3 global_id:$4 "
"segments_local:$5",
tablet->id(), txn_id, assigned_global_segments, local_id, global_segment_id, local_segments);
// If a merge condition is configured, only update rows that satisfy the condition.
int32_t condition_column = _get_condition_column(op_write, *tablet_schema);
// 2.2 update primary index, and generate delete info.
// 2.2 Update primary index and collect delete information caused by key replacement.
TRACE_COUNTER_SCOPE_LATENCY_US("update_index_latency_us");
DCHECK(state.upserts(segment_id) != nullptr);
DCHECK(state.upserts(local_id) != nullptr);
if (condition_column < 0) {
RETURN_IF_ERROR(_do_update(rowset_id, segment_id, state.upserts(segment_id), index, &new_deletes));
RETURN_IF_ERROR(_do_update(rowset_id, global_segment_id, state.upserts(local_id), index, &new_deletes));
} else {
RETURN_IF_ERROR(_do_update_with_condition(params, rowset_id, segment_id, condition_column,
state.upserts(segment_id)->pk_column, index, &new_deletes));
RETURN_IF_ERROR(_do_update_with_condition(params, rowset_id, global_segment_id, condition_column,
state.upserts(local_id)->pk_column, index, &new_deletes));
}
// 2.3 handle auto increment deletes
if (state.auto_increment_deletes(segment_id) != nullptr) {
// 2.3 Apply deletes generated by auto-increment conflict handling (if any).
if (state.auto_increment_deletes(local_id) != nullptr) {
RETURN_IF_ERROR(
index.erase(metadata, *state.auto_increment_deletes(segment_id), &new_deletes, del_rebuild_rssid));
index.erase(metadata, *state.auto_increment_deletes(local_id), &new_deletes, del_rebuild_rssid));
}
// Refresh memory accounting after index/state changes.
_index_cache.update_object_size(index_entry, index.memory_usage());
state.release_segment(segment_id);
state.release_segment(local_id);
_update_state_cache.update_object_size(state_entry, state.memory_usage());
}
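
The pre-seeding loop at the top of this hunk keys `new_deletes` by the global rssid so that every new segment gets a delvec entry even if nothing in it is deleted. A minimal sketch of that shape (`DeletesMap` here is a hypothetical stand-in for `PrimaryIndex::DeletesMap`):

```cpp
#include <cstdint>
#include <iostream>
#include <map>
#include <vector>

// Hypothetical stand-in for PrimaryIndex::DeletesMap: rssid -> deleted rowids.
using DeletesMap = std::map<uint32_t, std::vector<uint64_t>>;

int main() {
    const uint32_t rowset_id = 100; // illustrative base rowset id
    const uint32_t assigned = 2;    // segments assigned by earlier op_writes in this batch
    const uint32_t segments = 3;    // segments in the current op_write

    DeletesMap new_deletes;
    // Mirror the pre-seeding loop: one (initially empty) entry per new segment,
    // keyed by the *global* rssid rather than the op_write-local one.
    for (uint32_t local = 0; local < segments; ++local) {
        new_deletes[rowset_id + assigned + local] = {};
    }
    for (const auto& [rssid, rowids] : new_deletes) {
        std::cout << "rssid " << rssid << " pending deletes = " << rowids.size() << '\n';
    } // prints rssid 102, 103, 104 with 0 pending deletes each
}
```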
@@ -304,7 +318,9 @@ Status UpdateManager::publish_primary_key_tablet(const TxnLogPB_OpWrit
std::map<uint32_t, size_t> segment_id_to_add_dels;
for (auto& new_delete : new_deletes) {
uint32_t rssid = new_delete.first;
if (rssid >= rowset_id && rssid < rowset_id + op_write.rowset().segments_size()) {
uint32_t assigned_segment_id = batch_apply ? builder->assigned_segment_id() : 0;
if (rssid >= rowset_id + assigned_segment_id &&
rssid < rowset_id + assigned_segment_id + op_write.rowset().segments_size()) {
// it's a segment of the newly added rowset, which does not have a delvec yet
new_del_vecs[idx].first = rssid;
new_del_vecs[idx].second = std::make_shared<DelVector>();
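
A quick check of the adjusted window arithmetic, assuming two op_writes batched under one `rowset_id` base (values are illustrative):

```cpp
#include <cstdint>
#include <iostream>

// True if rssid falls inside the window of segments added by the op_write
// whose global offset is `assigned` (i.e., a new segment with no delvec yet).
bool in_new_segment_window(uint32_t rssid, uint32_t rowset_id, uint32_t assigned, uint32_t segments) {
    return rssid >= rowset_id + assigned && rssid < rowset_id + assigned + segments;
}

int main() {
    std::cout << std::boolalpha;
    const uint32_t rowset_id = 100;
    // First op_write: 2 segments at offset 0 -> rssids 100..101.
    // Second op_write: 3 segments at offset 2 -> rssids 102..104.
    for (uint32_t rssid = 99; rssid <= 105; ++rssid) {
        std::cout << "rssid " << rssid
                  << " first=" << in_new_segment_window(rssid, rowset_id, /*assigned=*/0, /*segments=*/2)
                  << " second=" << in_new_segment_window(rssid, rowset_id, /*assigned=*/2, /*segments=*/3)
                  << '\n';
    }
}
```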