From ee93eaef3987f3d67ecf5843447936a0cd85b771 Mon Sep 17 00:00:00 2001 From: Yixin Luo <18810541851@163.com> Date: Fri, 26 Sep 2025 20:13:16 +0800 Subject: [PATCH] [Enhancement] Improve delvec file cleanup logic to properly handle pk index references (#63384) Signed-off-by: luohaha <18810541851@163.com> --- be/src/storage/lake/compaction_task.cpp | 2 +- be/src/storage/lake/lake_delvec_loader.cpp | 5 + be/src/storage/lake/lake_delvec_loader.h | 1 + be/src/storage/lake/lake_persistent_index.cpp | 42 ++++--- be/src/storage/lake/lake_persistent_index.h | 9 +- be/src/storage/lake/lake_primary_index.cpp | 8 +- be/src/storage/lake/lake_primary_index.h | 2 +- be/src/storage/lake/meta_file.cpp | 91 +++++++------- be/src/storage/lake/meta_file.h | 11 ++ .../storage/lake/persistent_index_sstable.cpp | 15 ++- .../storage/lake/persistent_index_sstable.h | 4 +- be/src/storage/lake/update_manager.cpp | 19 +-- be/src/storage/lake/update_manager.h | 2 +- .../lake/lake_persistent_index_test.cpp | 20 ++-- be/test/storage/lake/meta_file_test.cpp | 113 ++++++++++++++++++ .../lake/persistent_index_sstable_test.cpp | 4 +- gensrc/proto/lake_types.proto | 24 ++++ gensrc/proto/types.proto | 24 ---- 18 files changed, 276 insertions(+), 120 deletions(-) diff --git a/be/src/storage/lake/compaction_task.cpp b/be/src/storage/lake/compaction_task.cpp index 7e00e709e30..99f515b0d1d 100644 --- a/be/src/storage/lake/compaction_task.cpp +++ b/be/src/storage/lake/compaction_task.cpp @@ -39,7 +39,7 @@ Status CompactionTask::execute_index_major_compaction(TxnLogPB* txn_log) { auto metadata = _tablet.metadata(); if (metadata->enable_persistent_index() && metadata->persistent_index_type() == PersistentIndexTypePB::CLOUD_NATIVE) { - RETURN_IF_ERROR(_tablet.tablet_manager()->update_mgr()->execute_index_major_compaction(*metadata, txn_log)); + RETURN_IF_ERROR(_tablet.tablet_manager()->update_mgr()->execute_index_major_compaction(metadata, txn_log)); if (txn_log->has_op_compaction() && 
!txn_log->op_compaction().input_sstables().empty()) { size_t total_input_sstable_file_size = 0; for (const auto& input_sstable : txn_log->op_compaction().input_sstables()) { diff --git a/be/src/storage/lake/lake_delvec_loader.cpp b/be/src/storage/lake/lake_delvec_loader.cpp index 4825c4c4660..7e360057f4b 100644 --- a/be/src/storage/lake/lake_delvec_loader.cpp +++ b/be/src/storage/lake/lake_delvec_loader.cpp @@ -34,6 +34,11 @@ Status LakeDelvecLoader::load(const TabletSegmentId& tsid, int64_t version, DelV return load_from_file(tsid, version, pdelvec); } +Status LakeDelvecLoader::load_from_meta(const TabletMetadataPtr& metadata, const DelvecPagePB& delvec_page, + DelVectorPtr* pdelvec) { + return lake::get_del_vec(_tablet_manager, *metadata, delvec_page, _fill_cache, _lake_io_opts, pdelvec->get()); +} + Status LakeDelvecLoader::load_from_file(const TabletSegmentId& tsid, int64_t version, DelVectorPtr* pdelvec) { (*pdelvec).reset(new DelVector()); // 2. find in delvec file diff --git a/be/src/storage/lake/lake_delvec_loader.h b/be/src/storage/lake/lake_delvec_loader.h index b6d2c386cd6..f3942620651 100644 --- a/be/src/storage/lake/lake_delvec_loader.h +++ b/be/src/storage/lake/lake_delvec_loader.h @@ -32,6 +32,7 @@ public: _fill_cache(fill_cache), _lake_io_opts(std::move(lake_io_opts)) {} Status load(const TabletSegmentId& tsid, int64_t version, DelVectorPtr* pdelvec); + Status load_from_meta(const TabletMetadataPtr& metadata, const DelvecPagePB& delvec_page, DelVectorPtr* pdelvec); Status load_from_file(const TabletSegmentId& tsid, int64_t version, DelVectorPtr* pdelvec); private: diff --git a/be/src/storage/lake/lake_persistent_index.cpp b/be/src/storage/lake/lake_persistent_index.cpp index 8811b013f7f..c012c8d1952 100644 --- a/be/src/storage/lake/lake_persistent_index.cpp +++ b/be/src/storage/lake/lake_persistent_index.cpp @@ -177,11 +177,12 @@ LakePersistentIndex::~LakePersistentIndex() { _sstables.clear(); } -Status LakePersistentIndex::init(const 
PersistentIndexSstableMetaPB& sstable_meta) { +Status LakePersistentIndex::init(const TabletMetadataPtr& metadata) { auto* block_cache = _tablet_mgr->update_mgr()->block_cache(); if (block_cache == nullptr) { return Status::InternalError("Block cache is null."); } + const PersistentIndexSstableMetaPB& sstable_meta = metadata->sstable_meta(); uint64_t max_rss_rowid = 0; for (auto& sstable_pb : sstable_meta.sstables()) { RandomAccessFileOptions opts; @@ -192,7 +193,8 @@ Status LakePersistentIndex::init(const PersistentIndexSstableMetaPB& sstable_met ASSIGN_OR_RETURN(auto rf, fs::new_random_access_file( opts, _tablet_mgr->sst_location(_tablet_id, sstable_pb.filename()))); auto sstable = std::make_unique(); - RETURN_IF_ERROR(sstable->init(std::move(rf), sstable_pb, block_cache->cache(), _tablet_mgr, _tablet_id)); + RETURN_IF_ERROR(sstable->init(std::move(rf), sstable_pb, block_cache->cache(), true /* need filter */, + nullptr /* delvec */, metadata, _tablet_mgr)); _sstables.emplace_back(std::move(sstable)); max_rss_rowid = std::max(max_rss_rowid, sstable_pb.max_rss_rowid()); } @@ -257,14 +259,14 @@ Status LakePersistentIndex::minor_compact() { sstable_pb.set_max_rss_rowid(_memtable->max_rss_rowid()); sstable_pb.set_encryption_meta(encryption_meta); TEST_SYNC_POINT_CALLBACK("LakePersistentIndex::minor_compact:inject_predicate", &sstable_pb); - RETURN_IF_ERROR(sstable->init(std::move(rf), sstable_pb, block_cache->cache(), _tablet_mgr, _tablet_id)); + RETURN_IF_ERROR(sstable->init(std::move(rf), sstable_pb, block_cache->cache())); _sstables.emplace_back(std::move(sstable)); TRACE_COUNTER_INCREMENT("minor_compact_times", 1); return Status::OK(); } -Status LakePersistentIndex::ingest_sst(const FileMetaPB& sst_meta, uint32_t rssid, int64_t version, bool is_compaction, - DelVectorPtr delvec) { +Status LakePersistentIndex::ingest_sst(const FileMetaPB& sst_meta, uint32_t rssid, int64_t version, + const DelvecPagePB& delvec_page, DelVectorPtr delvec) { auto* block_cache = 
_tablet_mgr->update_mgr()->block_cache(); if (block_cache == nullptr) { return Status::InternalError("Block cache is null."); @@ -287,12 +289,14 @@ Status LakePersistentIndex::ingest_sst(const FileMetaPB& sst_meta, uint32_t rssi sstable_pb.set_shared_version(version); sstable_pb.set_encryption_meta(sst_meta.encryption_meta()); // sstable generated by compaction need delvec to resolve conflict. - sstable_pb.set_has_delvec(is_compaction); + if (delvec_page.size() > 0) { + sstable_pb.mutable_delvec()->CopyFrom(delvec_page); + } // use UINT32_MAX - 1 as max rowid here to indicate all rows of this segment are already contained in this sst. // We don't use UINT32_MAX as max rowid because it is reserved for delete rows. sstable_pb.set_max_rss_rowid((static_cast(rssid) << 32) | (UINT32_MAX - 1)); - RETURN_IF_ERROR(sstable->init(std::move(rf), sstable_pb, block_cache->cache(), _tablet_mgr, _tablet_id, - true /* need filter */, std::move(delvec))); + RETURN_IF_ERROR( + sstable->init(std::move(rf), sstable_pb, block_cache->cache(), true /* need filter */, std::move(delvec))); _sstables.emplace_back(std::move(sstable)); TRACE_COUNTER_INCREMENT("ingest_sst_times", 1); return Status::OK(); @@ -471,7 +475,7 @@ void LakePersistentIndex::pick_sstables_for_merge(const PersistentIndexSstableMe } Status LakePersistentIndex::prepare_merging_iterator( - TabletManager* tablet_mgr, const TabletMetadata& metadata, TxnLogPB* txn_log, + TabletManager* tablet_mgr, const TabletMetadataPtr& metadata, TxnLogPB* txn_log, std::vector>* merging_sstables, std::unique_ptr* merging_iter_ptr, bool* merge_base_level) { sstable::ReadOptions read_options; @@ -484,11 +488,11 @@ Status LakePersistentIndex::prepare_merging_iterator( } }); - iters.reserve(metadata.sstable_meta().sstables().size()); + iters.reserve(metadata->sstable_meta().sstables().size()); std::stringstream ss_debug; std::vector sstables_to_merge; // Pick sstable for merge, decide to use base merge or cumulative merge. 
- pick_sstables_for_merge(metadata.sstable_meta(), &sstables_to_merge, merge_base_level); + pick_sstables_for_merge(metadata->sstable_meta(), &sstables_to_merge, merge_base_level); if (sstables_to_merge.size() <= 1) { // no need to do merge return Status::OK(); @@ -501,16 +505,16 @@ Status LakePersistentIndex::prepare_merging_iterator( opts.encryption_info = std::move(info); } ASSIGN_OR_RETURN(auto rf, fs::new_random_access_file( - opts, tablet_mgr->sst_location(metadata.id(), sstable_pb.filename()))); + opts, tablet_mgr->sst_location(metadata->id(), sstable_pb.filename()))); auto merging_sstable = std::make_shared(); - RETURN_IF_ERROR(merging_sstable->init(std::move(rf), sstable_pb, nullptr, tablet_mgr, metadata.id(), - false /** no filter **/)); + RETURN_IF_ERROR(merging_sstable->init(std::move(rf), sstable_pb, nullptr, false /** no filter **/, + nullptr /* delvec */, metadata, tablet_mgr)); merging_sstables->push_back(merging_sstable); // Pass `max_rss_rowid` to iterator, will be used when compaction. 
read_options.max_rss_rowid = sstable_pb.max_rss_rowid(); if (sstable_pb.has_predicate()) { ASSIGN_OR_RETURN(read_options.predicate, - sstable::SstablePredicate::create(metadata.schema(), sstable_pb.predicate())); + sstable::SstablePredicate::create(metadata->schema(), sstable_pb.predicate())); } read_options.shared_rssid = sstable_pb.shared_rssid(); read_options.shared_version = sstable_pb.shared_version(); @@ -542,9 +546,9 @@ Status LakePersistentIndex::merge_sstables(std::unique_ptr it return builder->Finish(); } -Status LakePersistentIndex::major_compact(TabletManager* tablet_mgr, const TabletMetadata& metadata, +Status LakePersistentIndex::major_compact(TabletManager* tablet_mgr, const TabletMetadataPtr& metadata, TxnLogPB* txn_log) { - if (metadata.sstable_meta().sstables_size() < config::lake_pk_index_sst_min_compaction_versions) { + if (metadata->sstable_meta().sstables_size() < config::lake_pk_index_sst_min_compaction_versions) { return Status::OK(); } @@ -563,7 +567,7 @@ Status LakePersistentIndex::major_compact(TabletManager* tablet_mgr, const Table } auto filename = gen_sst_filename(); - auto location = tablet_mgr->sst_location(metadata.id(), filename); + auto location = tablet_mgr->sst_location(metadata->id(), filename); WritableFileOptions wopts; std::string encryption_meta; if (config::enable_transparent_data_encryption) { @@ -608,7 +612,7 @@ Status LakePersistentIndex::apply_opcompaction(const TxnLogPB_OpCompaction& op_c } ASSIGN_OR_RETURN(auto rf, fs::new_random_access_file(opts, _tablet_mgr->sst_location(_tablet_id, sstable_pb.filename()))); - RETURN_IF_ERROR(sstable->init(std::move(rf), sstable_pb, block_cache->cache(), _tablet_mgr, _tablet_id)); + RETURN_IF_ERROR(sstable->init(std::move(rf), sstable_pb, block_cache->cache())); std::unordered_set filenames; for (const auto& input_sstable : op_compaction.input_sstables()) { diff --git a/be/src/storage/lake/lake_persistent_index.h b/be/src/storage/lake/lake_persistent_index.h index 
67fb8b9b649..0149854c1ce 100644 --- a/be/src/storage/lake/lake_persistent_index.h +++ b/be/src/storage/lake/lake_persistent_index.h @@ -73,7 +73,7 @@ public: DISALLOW_COPY(LakePersistentIndex); - Status init(const PersistentIndexSstableMetaPB& sstable_meta); + Status init(const TabletMetadataPtr& metadata); // batch get // |n|: size of key/value array @@ -137,10 +137,10 @@ public: Status minor_compact(); - Status ingest_sst(const FileMetaPB& sst_meta, uint32_t rssid, int64_t version, bool is_compaction, + Status ingest_sst(const FileMetaPB& sst_meta, uint32_t rssid, int64_t version, const DelvecPagePB& delvec_page, DelVectorPtr delvec); - static Status major_compact(TabletManager* tablet_mgr, const TabletMetadata& metadata, TxnLogPB* txn_log); + static Status major_compact(TabletManager* tablet_mgr, const TabletMetadataPtr& metadata, TxnLogPB* txn_log); Status apply_opcompaction(const TxnLogPB_OpCompaction& op_compaction); @@ -183,7 +183,8 @@ private: static void set_difference(KeyIndexSet* key_indexes, const KeyIndexSet& found_key_indexes); // get sstable's iterator that need to compact and modify txn_log - static Status prepare_merging_iterator(TabletManager* tablet_mgr, const TabletMetadata& metadata, TxnLogPB* txn_log, + static Status prepare_merging_iterator(TabletManager* tablet_mgr, const TabletMetadataPtr& metadata, + TxnLogPB* txn_log, std::vector>* merging_sstables, std::unique_ptr* merging_iter_ptr, bool* merge_base_level); diff --git a/be/src/storage/lake/lake_primary_index.cpp b/be/src/storage/lake/lake_primary_index.cpp index 5899f5e2b85..9965b664aba 100644 --- a/be/src/storage/lake/lake_primary_index.cpp +++ b/be/src/storage/lake/lake_primary_index.cpp @@ -111,7 +111,7 @@ Status LakePrimaryIndex::_do_lake_load(TabletManager* tablet_mgr, const TabletMe _persistent_index = std::make_shared(tablet_mgr, metadata->id()); set_enable_persistent_index(true); auto* lake_persistent_index = dynamic_cast(_persistent_index.get()); - 
RETURN_IF_ERROR(lake_persistent_index->init(metadata->sstable_meta())); + RETURN_IF_ERROR(lake_persistent_index->init(metadata)); return lake_persistent_index->load_from_lake_tablet(tablet_mgr, metadata, base_version, builder); } default: @@ -201,15 +201,15 @@ Status LakePrimaryIndex::apply_opcompaction(const TabletMetadata& metadata, return Status::OK(); } -Status LakePrimaryIndex::ingest_sst(const FileMetaPB& sst_meta, uint32_t rssid, int64_t version, bool is_compaction, - DelVectorPtr delvec) { +Status LakePrimaryIndex::ingest_sst(const FileMetaPB& sst_meta, uint32_t rssid, int64_t version, + const DelvecPagePB& delvec_page, DelVectorPtr delvec) { if (!_enable_persistent_index) { return Status::OK(); } auto* lake_persistent_index = dynamic_cast(_persistent_index.get()); if (lake_persistent_index != nullptr) { - return lake_persistent_index->ingest_sst(sst_meta, rssid, version, is_compaction, std::move(delvec)); + return lake_persistent_index->ingest_sst(sst_meta, rssid, version, delvec_page, std::move(delvec)); } else { return Status::InternalError("Persistent index is not a LakePersistentIndex."); } diff --git a/be/src/storage/lake/lake_primary_index.h b/be/src/storage/lake/lake_primary_index.h index f331da76b7e..1ece8261bbe 100644 --- a/be/src/storage/lake/lake_primary_index.h +++ b/be/src/storage/lake/lake_primary_index.h @@ -68,7 +68,7 @@ public: Status commit(const TabletMetadataPtr& metadata, MetaFileBuilder* builder); - Status ingest_sst(const FileMetaPB& sst_meta, uint32_t rssid, int64_t version, bool is_compaction, + Status ingest_sst(const FileMetaPB& sst_meta, uint32_t rssid, int64_t version, const DelvecPagePB& delvec_page, DelVectorPtr delvec); double get_local_pk_index_write_amp_score(); diff --git a/be/src/storage/lake/meta_file.cpp b/be/src/storage/lake/meta_file.cpp index 07791cf89ee..926d532738b 100644 --- a/be/src/storage/lake/meta_file.cpp +++ b/be/src/storage/lake/meta_file.cpp @@ -509,6 +509,12 @@ Status 
MetaFileBuilder::_finalize_delvec(int64_t version, int64_t txn_id) { for (const auto& item : _tablet_meta->delvec_meta().delvecs()) { refered_versions.insert(item.second.version()); } + // collect version from sstable delvecs + for (const auto& sst : _tablet_meta->sstable_meta().sstables()) { + if (sst.has_delvec() && sst.delvec().size() > 0) { + refered_versions.insert(sst.delvec().version()); + } + } auto itr = _tablet_meta->mutable_delvec_meta()->mutable_version_to_file()->begin(); for (; itr != _tablet_meta->mutable_delvec_meta()->mutable_version_to_file()->end();) { @@ -599,51 +605,56 @@ void MetaFileBuilder::finalize_sstable_meta(const PersistentIndexSstableMetaPB& _tablet_meta->mutable_sstable_meta()->CopyFrom(sstable_meta); } +// get delvec by DelvecPagePB +Status get_del_vec(TabletManager* tablet_mgr, const TabletMetadata& metadata, const DelvecPagePB& delvec_page, + bool fill_cache, const LakeIOOptions& lake_io_opts, DelVector* delvec) { + VLOG(2) << fmt::format("get_del_vec {} tabletid {}", delvec_page.ShortDebugString(), metadata.id()); + std::string buf; + raw::stl_string_resize_uninitialized(&buf, delvec_page.size()); + // find in cache + std::string cache_key = delvec_cache_key(metadata.id(), delvec_page); + auto cached_delvec = tablet_mgr->metacache()->lookup_delvec(cache_key); + if (cached_delvec != nullptr) { + delvec->copy_from(*cached_delvec); + return Status::OK(); + } + + // lookup delvec file name and then read it + auto iter = metadata.delvec_meta().version_to_file().find(delvec_page.version()); + if (iter == metadata.delvec_meta().version_to_file().end()) { + LOG(ERROR) << "Can't find delvec file name for tablet: " << metadata.id() + << ", version: " << delvec_page.version(); + return Status::InternalError("Can't find delvec file name"); + } + const auto& delvec_name = iter->second.name(); + RandomAccessFileOptions opts{.skip_fill_local_cache = !lake_io_opts.fill_data_cache}; + std::unique_ptr rf; + if (lake_io_opts.fs && 
lake_io_opts.location_provider) { + ASSIGN_OR_RETURN(rf, + lake_io_opts.fs->new_random_access_file( + opts, lake_io_opts.location_provider->delvec_location(metadata.id(), delvec_name))); + } else { + ASSIGN_OR_RETURN(rf, fs::new_random_access_file(opts, tablet_mgr->delvec_location(metadata.id(), delvec_name))); + } + RETURN_IF_ERROR(rf->read_at_fully(delvec_page.offset(), buf.data(), delvec_page.size())); + // parse delvec + RETURN_IF_ERROR(delvec->load(delvec_page.version(), buf.data(), delvec_page.size())); + // put in cache + if (fill_cache) { + auto delvec_cache_ptr = std::make_shared(); + delvec_cache_ptr->copy_from(*delvec); + tablet_mgr->metacache()->cache_delvec(cache_key, delvec_cache_ptr); + } + TRACE("end load delvec"); + return Status::OK(); +} + Status get_del_vec(TabletManager* tablet_mgr, const TabletMetadata& metadata, uint32_t segment_id, bool fill_cache, const LakeIOOptions& lake_io_opts, DelVector* delvec) { // find delvec by segment id auto iter = metadata.delvec_meta().delvecs().find(segment_id); if (iter != metadata.delvec_meta().delvecs().end()) { - VLOG(2) << fmt::format("get_del_vec {} segid {}", metadata.delvec_meta().ShortDebugString(), segment_id); - std::string buf; - raw::stl_string_resize_uninitialized(&buf, iter->second.size()); - // find in cache - std::string cache_key = delvec_cache_key(metadata.id(), iter->second); - auto cached_delvec = tablet_mgr->metacache()->lookup_delvec(cache_key); - if (cached_delvec != nullptr) { - delvec->copy_from(*cached_delvec); - return Status::OK(); - } - - // lookup delvec file name and then read it - auto iter2 = metadata.delvec_meta().version_to_file().find(iter->second.version()); - if (iter2 == metadata.delvec_meta().version_to_file().end()) { - LOG(ERROR) << "Can't find delvec file name for tablet: " << metadata.id() - << ", version: " << iter->second.version(); - return Status::InternalError("Can't find delvec file name"); - } - const auto& delvec_name = iter2->second.name(); - 
RandomAccessFileOptions opts{.skip_fill_local_cache = !lake_io_opts.fill_data_cache}; - std::unique_ptr rf; - if (lake_io_opts.fs && lake_io_opts.location_provider) { - ASSIGN_OR_RETURN( - rf, lake_io_opts.fs->new_random_access_file( - opts, lake_io_opts.location_provider->delvec_location(metadata.id(), delvec_name))); - } else { - ASSIGN_OR_RETURN(rf, - fs::new_random_access_file(opts, tablet_mgr->delvec_location(metadata.id(), delvec_name))); - } - RETURN_IF_ERROR(rf->read_at_fully(iter->second.offset(), buf.data(), iter->second.size())); - // parse delvec - RETURN_IF_ERROR(delvec->load(iter->second.version(), buf.data(), iter->second.size())); - // put in cache - if (fill_cache) { - auto delvec_cache_ptr = std::make_shared(); - delvec_cache_ptr->copy_from(*delvec); - tablet_mgr->metacache()->cache_delvec(cache_key, delvec_cache_ptr); - } - TRACE("end load delvec"); - return Status::OK(); + return get_del_vec(tablet_mgr, metadata, iter->second, fill_cache, lake_io_opts, delvec); } VLOG(2) << fmt::format("get_del_vec not found, segmentid {} tablet_meta {}", segment_id, metadata.delvec_meta().ShortDebugString()); diff --git a/be/src/storage/lake/meta_file.h b/be/src/storage/lake/meta_file.h index f971046ed4c..cd7874caec1 100644 --- a/be/src/storage/lake/meta_file.h +++ b/be/src/storage/lake/meta_file.h @@ -78,6 +78,15 @@ public: const TabletMetadata* tablet_meta() const { return _tablet_meta.get(); } + const DelvecPagePB& delvec_page(uint32_t segment_id) const { + static DelvecPagePB empty; + auto it = _delvecs.find(segment_id); + if (it != _delvecs.end()) { + return it->second; + } + return empty; + } + private: // update delvec in tablet meta Status _finalize_delvec(int64_t version, int64_t txn_id); @@ -113,6 +122,8 @@ private: PendingRowsetData _pending_rowset_data; }; +Status get_del_vec(TabletManager* tablet_mgr, const TabletMetadata& metadata, const DelvecPagePB& delvec_page, + bool fill_cache, const LakeIOOptions& lake_io_opts, DelVector* delvec); Status 
get_del_vec(TabletManager* tablet_mgr, const TabletMetadata& metadata, uint32_t segment_id, bool fill_cache, const LakeIOOptions& lake_io_opts, DelVector* delvec); bool is_primary_key(TabletMetadata* metadata); diff --git a/be/src/storage/lake/persistent_index_sstable.cpp b/be/src/storage/lake/persistent_index_sstable.cpp index 65dd918e35a..f8e0b53adbc 100644 --- a/be/src/storage/lake/persistent_index_sstable.cpp +++ b/be/src/storage/lake/persistent_index_sstable.cpp @@ -27,8 +27,8 @@ namespace starrocks::lake { Status PersistentIndexSstable::init(std::unique_ptr rf, const PersistentIndexSstablePB& sstable_pb, - Cache* cache, TabletManager* tablet_mgr, int64_t tablet_id, bool need_filter, - DelVectorPtr delvec) { + Cache* cache, bool need_filter, DelVectorPtr delvec, + const TabletMetadataPtr& metadata, TabletManager* tablet_mgr) { sstable::Options options; if (need_filter) { _filter_policy.reset(const_cast(sstable::NewBloomFilterPolicy(10))); @@ -41,17 +41,22 @@ Status PersistentIndexSstable::init(std::unique_ptr rf, const _rf = std::move(rf); _sstable_pb.CopyFrom(sstable_pb); // load delvec - if (_sstable_pb.has_delvec()) { + if (_sstable_pb.has_delvec() && _sstable_pb.delvec().size() > 0) { if (delvec) { // If delvec is already provided, use it directly. 
_delvec = std::move(delvec); } else { + if (metadata == nullptr) { + return Status::InvalidArgument("metadata is null when loading delvec from file"); + } + if (tablet_mgr == nullptr) { + return Status::InvalidArgument("tablet_mgr is null when loading delvec from file"); + } // otherwise, load delvec from file LakeIOOptions lake_io_opts{.fill_data_cache = true, .skip_disk_cache = false}; auto delvec_loader = std::make_unique(tablet_mgr, nullptr, true /* fill cache */, lake_io_opts); - RETURN_IF_ERROR(delvec_loader->load(TabletSegmentId(tablet_id, _sstable_pb.shared_rssid()), - _sstable_pb.shared_version(), &_delvec)); + RETURN_IF_ERROR(delvec_loader->load_from_meta(metadata, _sstable_pb.delvec(), &_delvec)); } } return Status::OK(); diff --git a/be/src/storage/lake/persistent_index_sstable.h b/be/src/storage/lake/persistent_index_sstable.h index e19fd6c8b6c..f9fa43f1f7b 100644 --- a/be/src/storage/lake/persistent_index_sstable.h +++ b/be/src/storage/lake/persistent_index_sstable.h @@ -19,6 +19,7 @@ #include #include "gen_cpp/lake_types.pb.h" +#include "storage/lake/tablet_metadata.h" #include "storage/persistent_index.h" #include "storage/sstable/filter_policy.h" #include "storage/sstable/table.h" @@ -47,7 +48,8 @@ public: ~PersistentIndexSstable() = default; Status init(std::unique_ptr rf, const PersistentIndexSstablePB& sstable_pb, Cache* cache, - TabletManager* tablet_mgr, int64_t tablet_id, bool need_filter = true, DelVectorPtr delvec = nullptr); + bool need_filter = true, DelVectorPtr delvec = nullptr, const TabletMetadataPtr& metadata = nullptr, + TabletManager* tablet_mgr = nullptr); static Status build_sstable(const phmap::btree_map>& map, WritableFile* wf, uint64_t* filesz); diff --git a/be/src/storage/lake/update_manager.cpp b/be/src/storage/lake/update_manager.cpp index 457ea2bac58..1a61ae17ecc 100644 --- a/be/src/storage/lake/update_manager.cpp +++ b/be/src/storage/lake/update_manager.cpp @@ -297,7 +297,7 @@ Status 
UpdateManager::publish_primary_key_tablet(const TxnLogPB_OpWrite& op_writ // TODO support condition column with sst ingestion. // rowset_id + segment_id is the rssid of this segment RETURN_IF_ERROR(index.ingest_sst(op_write.ssts(segment_id), rowset_id + segment_id, metadata->version(), - false /* no compaction */, nullptr)); + DelvecPagePB() /* empty */, nullptr)); } } @@ -1197,19 +1197,22 @@ Status UpdateManager::light_publish_primary_compaction(const TxnLogPB_OpCompacti } else { RETURN_IF_ERROR(resolver->execute()); } - // 3. ingest ssts to index + // 3. add delvec to builder + for (auto&& each : delvecs) { + builder->append_delvec(each.second, each.first); + } + // 4. ingest ssts to index DCHECK(op_compaction.ssts_size() == 0 || delvecs.size() == op_compaction.ssts_size()) << "delvecs.size(): " << delvecs.size() << ", op_compaction.ssts_size(): " << op_compaction.ssts_size(); for (int i = 0; i < op_compaction.ssts_size(); i++) { // metadata.next_rowset_id() + i is the rssid of output rowset's i-th segment + DelvecPagePB delvec_page_pb = builder->delvec_page(metadata.next_rowset_id() + i); + delvec_page_pb.set_version(metadata.version()); RETURN_IF_ERROR(index.ingest_sst(op_compaction.ssts(i), metadata.next_rowset_id() + i, metadata.version(), - true /* is compaction */, delvecs[i].second)); + delvec_page_pb, delvecs[i].second)); } _index_cache.update_object_size(index_entry, index.memory_usage()); - // 4. update TabletMeta and write to meta file - for (auto&& each : delvecs) { - builder->append_delvec(each.second, each.first); - } + // 5. 
update TabletMeta builder->apply_opcompaction(op_compaction, max_rowset_id, tablet_schema->id()); RETURN_IF_ERROR(builder->update_num_del_stat(segment_id_to_add_dels)); RETURN_IF_ERROR(index.apply_opcompaction(metadata, op_compaction)); @@ -1552,7 +1555,7 @@ void UpdateManager::set_enable_persistent_index(int64_t tablet_id, bool enable_p } } -Status UpdateManager::execute_index_major_compaction(const TabletMetadata& metadata, TxnLogPB* txn_log) { +Status UpdateManager::execute_index_major_compaction(const TabletMetadataPtr& metadata, TxnLogPB* txn_log) { return LakePersistentIndex::major_compact(_tablet_mgr, metadata, txn_log); } diff --git a/be/src/storage/lake/update_manager.h b/be/src/storage/lake/update_manager.h index bf3431c6d8e..de0c8131de3 100644 --- a/be/src/storage/lake/update_manager.h +++ b/be/src/storage/lake/update_manager.h @@ -219,7 +219,7 @@ public: void set_enable_persistent_index(int64_t tablet_id, bool enable_persistent_index); - Status execute_index_major_compaction(const TabletMetadata& metadata, TxnLogPB* txn_log); + Status execute_index_major_compaction(const TabletMetadataPtr& metadata, TxnLogPB* txn_log); PersistentIndexBlockCache* block_cache() { return _block_cache.get(); } diff --git a/be/test/storage/lake/lake_persistent_index_test.cpp b/be/test/storage/lake/lake_persistent_index_test.cpp index 7eb8cc772c8..5d066d5756f 100644 --- a/be/test/storage/lake/lake_persistent_index_test.cpp +++ b/be/test/storage/lake/lake_persistent_index_test.cpp @@ -27,7 +27,7 @@ namespace starrocks::lake { class LakePersistentIndexTest : public TestBase { public: LakePersistentIndexTest() : TestBase(kTestDirectory) { - _tablet_metadata = std::make_unique(); + _tablet_metadata = std::make_shared(); _tablet_metadata->set_id(next_id()); _tablet_metadata->set_version(1); _tablet_metadata->set_enable_persistent_index(true); @@ -70,7 +70,7 @@ protected: constexpr static const char* const kTestDirectory = "test_lake_persistent_index"; - std::unique_ptr 
_tablet_metadata; + std::shared_ptr _tablet_metadata; }; TEST_F(LakePersistentIndexTest, test_basic_api) { @@ -91,7 +91,7 @@ TEST_F(LakePersistentIndexTest, test_basic_api) { } auto tablet_id = _tablet_metadata->id(); auto index = std::make_unique(_tablet_mgr.get(), tablet_id); - ASSERT_OK(index->init(_tablet_metadata->sstable_meta())); + ASSERT_OK(index->init(_tablet_metadata)); ASSERT_OK(index->insert(N, key_slices.data(), values.data(), 0)); ASSERT_TRUE(index->memory_usage() > 0); @@ -172,7 +172,7 @@ TEST_F(LakePersistentIndexTest, test_replace) { auto tablet_id = _tablet_metadata->id(); auto index = std::make_unique(_tablet_mgr.get(), tablet_id); - ASSERT_OK(index->init(_tablet_metadata->sstable_meta())); + ASSERT_OK(index->init(_tablet_metadata)); ASSERT_OK(index->insert(N, key_slices.data(), values.data(), false)); //replace @@ -202,7 +202,7 @@ TEST_F(LakePersistentIndexTest, test_major_compaction) { total_keys.reserve(M * N); auto tablet_id = _tablet_metadata->id(); auto index = std::make_unique(_tablet_mgr.get(), tablet_id); - ASSERT_OK(index->init(_tablet_metadata->sstable_meta())); + ASSERT_OK(index->init(_tablet_metadata)); int k = 0; for (int i = 0; i < M; ++i) { vector keys; @@ -241,7 +241,7 @@ TEST_F(LakePersistentIndexTest, test_major_compaction) { get_values.reserve(M * N); auto txn_log = std::make_shared(); // try to compact sst files. 
- ASSERT_OK(LakePersistentIndex::major_compact(_tablet_mgr.get(), *tablet_metadata_ptr, txn_log.get())); + ASSERT_OK(LakePersistentIndex::major_compact(_tablet_mgr.get(), tablet_metadata_ptr, txn_log.get())); ASSERT_TRUE(txn_log->op_compaction().input_sstables_size() > 0); ASSERT_TRUE(txn_log->op_compaction().has_output_sstable()); ASSERT_OK(index->apply_opcompaction(txn_log->op_compaction())); @@ -304,7 +304,7 @@ TEST_F(LakePersistentIndexTest, test_compaction_strategy) { TEST_F(LakePersistentIndexTest, test_insert_delete) { auto tablet_id = _tablet_metadata->id(); auto index = std::make_unique(_tablet_mgr.get(), tablet_id); - ASSERT_OK(index->init(_tablet_metadata->sstable_meta())); + ASSERT_OK(index->init(_tablet_metadata)); auto l0_max_mem_usage = config::l0_max_mem_usage; config::l0_max_mem_usage = 10; @@ -355,7 +355,7 @@ TEST_F(LakePersistentIndexTest, test_insert_delete) { TEST_F(LakePersistentIndexTest, test_memtable_full) { auto tablet_id = _tablet_metadata->id(); auto index = std::make_unique(_tablet_mgr.get(), tablet_id); - ASSERT_OK(index->init(_tablet_metadata->sstable_meta())); + ASSERT_OK(index->init(_tablet_metadata)); size_t old_l0_max_mem_usage = config::l0_max_mem_usage; config::l0_max_mem_usage = 1073741824; @@ -493,7 +493,7 @@ TEST_F(LakePersistentIndexTest, test_major_compaction_with_predicate) { total_keys.reserve(M * N); auto tablet_id = _tablet_metadata->id(); auto index = std::make_unique(_tablet_mgr.get(), tablet_id); - ASSERT_OK(index->init(_tablet_metadata->sstable_meta())); + ASSERT_OK(index->init(_tablet_metadata)); int k = 0; for (int i = 0; i < M; ++i) { vector keys; @@ -540,7 +540,7 @@ TEST_F(LakePersistentIndexTest, test_major_compaction_with_predicate) { auto hit_count = SIMD::count_nonzero(hits.data(), hits.size()); auto txn_log = std::make_shared(); // try to compact sst files. 
- ASSERT_OK(LakePersistentIndex::major_compact(_tablet_mgr.get(), *tablet_metadata_ptr, txn_log.get())); + ASSERT_OK(LakePersistentIndex::major_compact(_tablet_mgr.get(), tablet_metadata_ptr, txn_log.get())); ASSERT_TRUE(txn_log->op_compaction().input_sstables_size() == M); ASSERT_TRUE(txn_log->op_compaction().has_output_sstable() || hit_count == 0); ASSERT_OK(index->apply_opcompaction(txn_log->op_compaction())); diff --git a/be/test/storage/lake/meta_file_test.cpp b/be/test/storage/lake/meta_file_test.cpp index fe590ce4ea6..1d02609d6c8 100644 --- a/be/test/storage/lake/meta_file_test.cpp +++ b/be/test/storage/lake/meta_file_test.cpp @@ -705,4 +705,117 @@ TEST_F(MetaFileTest, test_batch_apply_opwrite_merge_dels) { ASSERT_EQ(1, persisted->rowsets_size()); ASSERT_EQ(3, persisted->rowsets(0).del_files_size()); } + +TEST_F(MetaFileTest, test_sstable_delvec_integration) { + // Test SSTable delvec integration: test new get_del_vec(DelvecPagePB) function and + // version reference collection from SSTable delvecs during finalization + const int64_t tablet_id = 40001; + const uint32_t segment_id = 1001; + const int64_t version1 = 11; + const int64_t version2 = 12; + auto tablet = std::make_shared<Tablet>(_tablet_manager.get(), tablet_id); + auto metadata = std::make_shared<TabletMetadata>(); + metadata->set_id(tablet_id); + metadata->set_version(version1); + metadata->set_next_rowset_id(110); + metadata->mutable_schema()->set_keys_type(PRIMARY_KEYS); + metadata->set_enable_persistent_index(true); + metadata->set_persistent_index_type(PersistentIndexTypePB::CLOUD_NATIVE); + + // 1. Create and write delvec first + MetaFileBuilder builder1(*tablet, metadata); + DelVector dv1; + dv1.set_empty(); + std::shared_ptr<DelVector> ndv1; + std::vector<uint32_t> dels1 = {1, 3, 5, 7, 100}; + dv1.add_dels_as_new_version(dels1, version1, &ndv1); + std::string original_delvec = ndv1->save(); + builder1.append_delvec(ndv1, segment_id); + Status st = builder1.finalize(next_id()); + ASSERT_TRUE(st.ok()); // fatal: nothing below is meaningful if finalize failed + + // 2.
Get the delvec page info for creating SSTable delvec + ASSIGN_OR_ABORT(auto metadata1, _tablet_manager->get_tablet_metadata(tablet_id, version1)); + auto iter = metadata1->delvec_meta().delvecs().find(segment_id); + ASSERT_TRUE(iter != metadata1->delvec_meta().delvecs().end()); // fatal: iter is dereferenced next + DelvecPagePB delvec_page = iter->second; + + // 3. Test new get_del_vec function with DelvecPagePB + DelVector read_delvec1; + LakeIOOptions lake_io_opts; + EXPECT_TRUE(get_del_vec(_tablet_manager.get(), *metadata1, delvec_page, true, lake_io_opts, &read_delvec1).ok()); + EXPECT_EQ(original_delvec, read_delvec1.save()); + + // 4. Create SSTable with delvec and write to version2 + metadata->set_version(version2); + MetaFileBuilder builder2(*tablet, metadata); + + PersistentIndexSstableMetaPB sstable_meta; + PersistentIndexSstablePB* sstable = sstable_meta.add_sstables(); + sstable->set_filename("test_sstable.sst"); + sstable->set_filesize(1024); + sstable->set_max_rss_rowid(100); + sstable->mutable_delvec()->CopyFrom(delvec_page); // Use DelvecPagePB instead of has_delvec + + builder2.finalize_sstable_meta(sstable_meta); + st = builder2.finalize(next_id()); + ASSERT_TRUE(st.ok()); + + // 5. Verify SSTable contains delvec information + ASSIGN_OR_ABORT(auto metadata2, _tablet_manager->get_tablet_metadata(tablet_id, version2)); + ASSERT_EQ(1, metadata2->sstable_meta().sstables_size()); // fatal: sstables(0) is read next + const auto& saved_sstable = metadata2->sstable_meta().sstables(0); + EXPECT_TRUE(saved_sstable.has_delvec()); + EXPECT_EQ(delvec_page.version(), saved_sstable.delvec().version()); + EXPECT_EQ(delvec_page.offset(), saved_sstable.delvec().offset()); + EXPECT_EQ(delvec_page.size(), saved_sstable.delvec().size()); + + // 6. Test reading delvec via SSTable's delvec page + DelVector read_delvec2; + EXPECT_TRUE( + get_del_vec(_tablet_manager.get(), *metadata2, saved_sstable.delvec(), true, lake_io_opts, &read_delvec2) + .ok()); + EXPECT_EQ(original_delvec, read_delvec2.save()); + + // 7.
Test version reference collection: create new metadata without regular delvec but with SSTable delvec + auto metadata3 = std::make_shared<TabletMetadata>(*metadata2); + metadata3->set_version(version2 + 1); + metadata3->mutable_delvec_meta()->mutable_delvecs()->clear(); // Clear regular delvecs + + MetaFileBuilder builder3(*tablet, metadata3); + st = builder3.finalize(next_id()); + ASSERT_TRUE(st.ok()); + + // 8. Verify that delvec file version is preserved due to SSTable reference + ASSIGN_OR_ABORT(auto metadata4, _tablet_manager->get_tablet_metadata(tablet_id, version2 + 1)); + auto version_to_file_map = metadata4->delvec_meta().version_to_file(); + + // The delvec file with version1 should still exist because SSTable references it + auto version_iter = version_to_file_map.find(version1); + EXPECT_TRUE(version_iter != version_to_file_map.end()); + + // 9. Verify we can still read delvec from SSTable after cleanup + DelVector read_delvec3; + EXPECT_TRUE( + get_del_vec(_tablet_manager.get(), *metadata4, saved_sstable.delvec(), true, lake_io_opts, &read_delvec3) + .ok()); + EXPECT_EQ(original_delvec, read_delvec3.save()); + + // 10. Remove SSTable delvec reference and verify delvec file cleanup + auto metadata5 = std::make_shared<TabletMetadata>(*metadata4); + metadata5->set_version(version2 + 2); + metadata5->mutable_sstable_meta()->clear_sstables(); // Remove SSTable that references delvec + + MetaFileBuilder builder4(*tablet, metadata5); + st = builder4.finalize(next_id()); + ASSERT_TRUE(st.ok()); + + // 11.
Verify that delvec file version is now removed since no SSTable references it + ASSIGN_OR_ABORT(auto metadata6, _tablet_manager->get_tablet_metadata(tablet_id, version2 + 2)); + auto final_version_to_file_map = metadata6->delvec_meta().version_to_file(); + + // The delvec file with version1 should be removed because no SSTable references it anymore + auto final_version_iter = final_version_to_file_map.find(version1); + EXPECT_TRUE(final_version_iter == final_version_to_file_map.end()); +} } // namespace starrocks::lake diff --git a/be/test/storage/lake/persistent_index_sstable_test.cpp b/be/test/storage/lake/persistent_index_sstable_test.cpp index 0bbf691c609..b5f2a192672 100644 --- a/be/test/storage/lake/persistent_index_sstable_test.cpp +++ b/be/test/storage/lake/persistent_index_sstable_test.cpp @@ -226,7 +226,7 @@ TEST_F(PersistentIndexSstableTest, test_persistent_index_sstable) { PersistentIndexSstablePB sstable_pb; sstable_pb.set_filename(filename); sstable_pb.set_filesize(filesize); - ASSERT_OK(sst->init(std::move(read_file), sstable_pb, cache_ptr.get(), nullptr, 0)); + ASSERT_OK(sst->init(std::move(read_file), sstable_pb, cache_ptr.get())); // check memory usage ASSERT_TRUE(sst->memory_usage() > 0); @@ -517,7 +517,7 @@ TEST_F(PersistentIndexSstableTest, test_persistent_index_sstable_stream_builder) PersistentIndexSstablePB sstable_pb; sstable_pb.set_filename(filename); sstable_pb.set_filesize(file_size); - ASSERT_OK(sst->init(std::move(read_file), sstable_pb, cache_ptr.get(), nullptr, 0)); + ASSERT_OK(sst->init(std::move(read_file), sstable_pb, cache_ptr.get())); // 4.
Iterate and verify all keys sstable::ReadIOStat stat; diff --git a/gensrc/proto/lake_types.proto b/gensrc/proto/lake_types.proto index 510a87dae4c..493ab1e8ec2 100644 --- a/gensrc/proto/lake_types.proto +++ b/gensrc/proto/lake_types.proto @@ -35,6 +35,30 @@ message DelvecPagePB { optional uint64 size = 3; } +message PersistentIndexSstablePB { + optional int64 version = 1; // Deprecated + optional string filename = 2; + optional int64 filesize = 3; + // Used for the rebuild point of the persistent index + optional uint64 max_rss_rowid = 4; + optional bytes encryption_meta = 5; + // Whether this file is shared by multiple tablets + optional bool shared = 6 [default=false]; + // Filter out rows which do not satisfy the predicate + // when reading this specific sstable + optional PersistentIndexSstablePredicatePB predicate = 7; + // For sstable generated during data write & compaction process. + optional uint32 shared_rssid = 8; + optional int64 shared_version = 9; + // For sstable generated during compaction process. + optional DelvecPagePB delvec = 10; // NOTE(review): tag 10 was `bool has_delvec` in the definition removed from types.proto; confirm no persisted metadata still carries the old field +} + +message PersistentIndexSstableMetaPB { + // sstables are ordered with the smaller version on the left.
+ repeated PersistentIndexSstablePB sstables = 1; +} + message DelvecCacheKeyPB { optional int64 id = 1; optional DelvecPagePB delvec_page = 2; diff --git a/gensrc/proto/types.proto b/gensrc/proto/types.proto index 6f99d1eccfd..dfb6952a29d 100644 --- a/gensrc/proto/types.proto +++ b/gensrc/proto/types.proto @@ -217,30 +217,6 @@ message IndexValuesWithVerPB { repeated IndexValueWithVerPB values = 1; } -message PersistentIndexSstablePB { - optional int64 version = 1; // Deprecated - optional string filename = 2; - optional int64 filesize = 3; - // used for rebuild point of persistent index - optional uint64 max_rss_rowid = 4; - optional bytes encryption_meta = 5; - // Whether this file shared by multiple tablets - optional bool shared = 6 [default=false]; - // Filter out rows which do not satisfy the predicate - // when reading this specific sstable - optional PersistentIndexSstablePredicatePB predicate = 7; - // For sstable generated during data write & compaction process. - optional uint32 shared_rssid = 8; - optional int64 shared_version = 9; - // For sstable generated during compaction process. - optional bool has_delvec = 10; -} - -message PersistentIndexSstableMetaPB { - // sstables are ordered with the smaller version on the left. - repeated PersistentIndexSstablePB sstables = 1; -} - enum TransactionStatusPB { TRANS_UNKNOWN = 0; TRANS_PREPARE = 1;