[Enhancement] Improve delvec file cleanup logic to properly handle pk index references (#63384)

Signed-off-by: luohaha <18810541851@163.com>
This commit is contained in:
Yixin Luo 2025-09-26 20:13:16 +08:00 committed by GitHub
parent ee28a4b9ac
commit ee93eaef39
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 276 additions and 120 deletions

View File

@ -39,7 +39,7 @@ Status CompactionTask::execute_index_major_compaction(TxnLogPB* txn_log) {
auto metadata = _tablet.metadata();
if (metadata->enable_persistent_index() &&
metadata->persistent_index_type() == PersistentIndexTypePB::CLOUD_NATIVE) {
RETURN_IF_ERROR(_tablet.tablet_manager()->update_mgr()->execute_index_major_compaction(*metadata, txn_log));
RETURN_IF_ERROR(_tablet.tablet_manager()->update_mgr()->execute_index_major_compaction(metadata, txn_log));
if (txn_log->has_op_compaction() && !txn_log->op_compaction().input_sstables().empty()) {
size_t total_input_sstable_file_size = 0;
for (const auto& input_sstable : txn_log->op_compaction().input_sstables()) {

View File

@ -34,6 +34,11 @@ Status LakeDelvecLoader::load(const TabletSegmentId& tsid, int64_t version, DelV
return load_from_file(tsid, version, pdelvec);
}
Status LakeDelvecLoader::load_from_meta(const TabletMetadataPtr& metadata, const DelvecPagePB& delvec_page,
DelVectorPtr* pdelvec) {
return lake::get_del_vec(_tablet_manager, *metadata, delvec_page, _fill_cache, _lake_io_opts, pdelvec->get());
}
Status LakeDelvecLoader::load_from_file(const TabletSegmentId& tsid, int64_t version, DelVectorPtr* pdelvec) {
(*pdelvec).reset(new DelVector());
// 2. find in delvec file

View File

@ -32,6 +32,7 @@ public:
_fill_cache(fill_cache),
_lake_io_opts(std::move(lake_io_opts)) {}
Status load(const TabletSegmentId& tsid, int64_t version, DelVectorPtr* pdelvec);
Status load_from_meta(const TabletMetadataPtr& metadata, const DelvecPagePB& delvec_page, DelVectorPtr* pdelvec);
Status load_from_file(const TabletSegmentId& tsid, int64_t version, DelVectorPtr* pdelvec);
private:

View File

@ -177,11 +177,12 @@ LakePersistentIndex::~LakePersistentIndex() {
_sstables.clear();
}
Status LakePersistentIndex::init(const PersistentIndexSstableMetaPB& sstable_meta) {
Status LakePersistentIndex::init(const TabletMetadataPtr& metadata) {
auto* block_cache = _tablet_mgr->update_mgr()->block_cache();
if (block_cache == nullptr) {
return Status::InternalError("Block cache is null.");
}
const PersistentIndexSstableMetaPB& sstable_meta = metadata->sstable_meta();
uint64_t max_rss_rowid = 0;
for (auto& sstable_pb : sstable_meta.sstables()) {
RandomAccessFileOptions opts;
@ -192,7 +193,8 @@ Status LakePersistentIndex::init(const PersistentIndexSstableMetaPB& sstable_met
ASSIGN_OR_RETURN(auto rf, fs::new_random_access_file(
opts, _tablet_mgr->sst_location(_tablet_id, sstable_pb.filename())));
auto sstable = std::make_unique<PersistentIndexSstable>();
RETURN_IF_ERROR(sstable->init(std::move(rf), sstable_pb, block_cache->cache(), _tablet_mgr, _tablet_id));
RETURN_IF_ERROR(sstable->init(std::move(rf), sstable_pb, block_cache->cache(), true /* need filter */,
nullptr /* delvec */, metadata, _tablet_mgr));
_sstables.emplace_back(std::move(sstable));
max_rss_rowid = std::max(max_rss_rowid, sstable_pb.max_rss_rowid());
}
@ -257,14 +259,14 @@ Status LakePersistentIndex::minor_compact() {
sstable_pb.set_max_rss_rowid(_memtable->max_rss_rowid());
sstable_pb.set_encryption_meta(encryption_meta);
TEST_SYNC_POINT_CALLBACK("LakePersistentIndex::minor_compact:inject_predicate", &sstable_pb);
RETURN_IF_ERROR(sstable->init(std::move(rf), sstable_pb, block_cache->cache(), _tablet_mgr, _tablet_id));
RETURN_IF_ERROR(sstable->init(std::move(rf), sstable_pb, block_cache->cache()));
_sstables.emplace_back(std::move(sstable));
TRACE_COUNTER_INCREMENT("minor_compact_times", 1);
return Status::OK();
}
Status LakePersistentIndex::ingest_sst(const FileMetaPB& sst_meta, uint32_t rssid, int64_t version, bool is_compaction,
DelVectorPtr delvec) {
Status LakePersistentIndex::ingest_sst(const FileMetaPB& sst_meta, uint32_t rssid, int64_t version,
const DelvecPagePB& delvec_page, DelVectorPtr delvec) {
auto* block_cache = _tablet_mgr->update_mgr()->block_cache();
if (block_cache == nullptr) {
return Status::InternalError("Block cache is null.");
@ -287,12 +289,14 @@ Status LakePersistentIndex::ingest_sst(const FileMetaPB& sst_meta, uint32_t rssi
sstable_pb.set_shared_version(version);
sstable_pb.set_encryption_meta(sst_meta.encryption_meta());
// sstable generated by compaction need delvec to resolve conflict.
sstable_pb.set_has_delvec(is_compaction);
if (delvec_page.size() > 0) {
sstable_pb.mutable_delvec()->CopyFrom(delvec_page);
}
// use UINT32_MAX - 1 as max rowid here to indicate all rows of this segment are already contained in this sst.
// We don't use UINT32_MAX as max rowid because it is reserved for delete rows.
sstable_pb.set_max_rss_rowid((static_cast<uint64_t>(rssid) << 32) | (UINT32_MAX - 1));
RETURN_IF_ERROR(sstable->init(std::move(rf), sstable_pb, block_cache->cache(), _tablet_mgr, _tablet_id,
true /* need filter */, std::move(delvec)));
RETURN_IF_ERROR(
sstable->init(std::move(rf), sstable_pb, block_cache->cache(), true /* need filter */, std::move(delvec)));
_sstables.emplace_back(std::move(sstable));
TRACE_COUNTER_INCREMENT("ingest_sst_times", 1);
return Status::OK();
@ -471,7 +475,7 @@ void LakePersistentIndex::pick_sstables_for_merge(const PersistentIndexSstableMe
}
Status LakePersistentIndex::prepare_merging_iterator(
TabletManager* tablet_mgr, const TabletMetadata& metadata, TxnLogPB* txn_log,
TabletManager* tablet_mgr, const TabletMetadataPtr& metadata, TxnLogPB* txn_log,
std::vector<std::shared_ptr<PersistentIndexSstable>>* merging_sstables,
std::unique_ptr<sstable::Iterator>* merging_iter_ptr, bool* merge_base_level) {
sstable::ReadOptions read_options;
@ -484,11 +488,11 @@ Status LakePersistentIndex::prepare_merging_iterator(
}
});
iters.reserve(metadata.sstable_meta().sstables().size());
iters.reserve(metadata->sstable_meta().sstables().size());
std::stringstream ss_debug;
std::vector<PersistentIndexSstablePB> sstables_to_merge;
// Pick sstable for merge, decide to use base merge or cumulative merge.
pick_sstables_for_merge(metadata.sstable_meta(), &sstables_to_merge, merge_base_level);
pick_sstables_for_merge(metadata->sstable_meta(), &sstables_to_merge, merge_base_level);
if (sstables_to_merge.size() <= 1) {
// no need to do merge
return Status::OK();
@ -501,16 +505,16 @@ Status LakePersistentIndex::prepare_merging_iterator(
opts.encryption_info = std::move(info);
}
ASSIGN_OR_RETURN(auto rf, fs::new_random_access_file(
opts, tablet_mgr->sst_location(metadata.id(), sstable_pb.filename())));
opts, tablet_mgr->sst_location(metadata->id(), sstable_pb.filename())));
auto merging_sstable = std::make_shared<PersistentIndexSstable>();
RETURN_IF_ERROR(merging_sstable->init(std::move(rf), sstable_pb, nullptr, tablet_mgr, metadata.id(),
false /** no filter **/));
RETURN_IF_ERROR(merging_sstable->init(std::move(rf), sstable_pb, nullptr, false /** no filter **/,
nullptr /* delvec */, metadata, tablet_mgr));
merging_sstables->push_back(merging_sstable);
// Pass `max_rss_rowid` to iterator, will be used when compaction.
read_options.max_rss_rowid = sstable_pb.max_rss_rowid();
if (sstable_pb.has_predicate()) {
ASSIGN_OR_RETURN(read_options.predicate,
sstable::SstablePredicate::create(metadata.schema(), sstable_pb.predicate()));
sstable::SstablePredicate::create(metadata->schema(), sstable_pb.predicate()));
}
read_options.shared_rssid = sstable_pb.shared_rssid();
read_options.shared_version = sstable_pb.shared_version();
@ -542,9 +546,9 @@ Status LakePersistentIndex::merge_sstables(std::unique_ptr<sstable::Iterator> it
return builder->Finish();
}
Status LakePersistentIndex::major_compact(TabletManager* tablet_mgr, const TabletMetadata& metadata,
Status LakePersistentIndex::major_compact(TabletManager* tablet_mgr, const TabletMetadataPtr& metadata,
TxnLogPB* txn_log) {
if (metadata.sstable_meta().sstables_size() < config::lake_pk_index_sst_min_compaction_versions) {
if (metadata->sstable_meta().sstables_size() < config::lake_pk_index_sst_min_compaction_versions) {
return Status::OK();
}
@ -563,7 +567,7 @@ Status LakePersistentIndex::major_compact(TabletManager* tablet_mgr, const Table
}
auto filename = gen_sst_filename();
auto location = tablet_mgr->sst_location(metadata.id(), filename);
auto location = tablet_mgr->sst_location(metadata->id(), filename);
WritableFileOptions wopts;
std::string encryption_meta;
if (config::enable_transparent_data_encryption) {
@ -608,7 +612,7 @@ Status LakePersistentIndex::apply_opcompaction(const TxnLogPB_OpCompaction& op_c
}
ASSIGN_OR_RETURN(auto rf,
fs::new_random_access_file(opts, _tablet_mgr->sst_location(_tablet_id, sstable_pb.filename())));
RETURN_IF_ERROR(sstable->init(std::move(rf), sstable_pb, block_cache->cache(), _tablet_mgr, _tablet_id));
RETURN_IF_ERROR(sstable->init(std::move(rf), sstable_pb, block_cache->cache()));
std::unordered_set<std::string> filenames;
for (const auto& input_sstable : op_compaction.input_sstables()) {

View File

@ -73,7 +73,7 @@ public:
DISALLOW_COPY(LakePersistentIndex);
Status init(const PersistentIndexSstableMetaPB& sstable_meta);
Status init(const TabletMetadataPtr& metadata);
// batch get
// |n|: size of key/value array
@ -137,10 +137,10 @@ public:
Status minor_compact();
Status ingest_sst(const FileMetaPB& sst_meta, uint32_t rssid, int64_t version, bool is_compaction,
Status ingest_sst(const FileMetaPB& sst_meta, uint32_t rssid, int64_t version, const DelvecPagePB& delvec_page,
DelVectorPtr delvec);
static Status major_compact(TabletManager* tablet_mgr, const TabletMetadata& metadata, TxnLogPB* txn_log);
static Status major_compact(TabletManager* tablet_mgr, const TabletMetadataPtr& metadata, TxnLogPB* txn_log);
Status apply_opcompaction(const TxnLogPB_OpCompaction& op_compaction);
@ -183,7 +183,8 @@ private:
static void set_difference(KeyIndexSet* key_indexes, const KeyIndexSet& found_key_indexes);
// get sstable's iterator that need to compact and modify txn_log
static Status prepare_merging_iterator(TabletManager* tablet_mgr, const TabletMetadata& metadata, TxnLogPB* txn_log,
static Status prepare_merging_iterator(TabletManager* tablet_mgr, const TabletMetadataPtr& metadata,
TxnLogPB* txn_log,
std::vector<std::shared_ptr<PersistentIndexSstable>>* merging_sstables,
std::unique_ptr<sstable::Iterator>* merging_iter_ptr,
bool* merge_base_level);

View File

@ -111,7 +111,7 @@ Status LakePrimaryIndex::_do_lake_load(TabletManager* tablet_mgr, const TabletMe
_persistent_index = std::make_shared<LakePersistentIndex>(tablet_mgr, metadata->id());
set_enable_persistent_index(true);
auto* lake_persistent_index = dynamic_cast<LakePersistentIndex*>(_persistent_index.get());
RETURN_IF_ERROR(lake_persistent_index->init(metadata->sstable_meta()));
RETURN_IF_ERROR(lake_persistent_index->init(metadata));
return lake_persistent_index->load_from_lake_tablet(tablet_mgr, metadata, base_version, builder);
}
default:
@ -201,15 +201,15 @@ Status LakePrimaryIndex::apply_opcompaction(const TabletMetadata& metadata,
return Status::OK();
}
Status LakePrimaryIndex::ingest_sst(const FileMetaPB& sst_meta, uint32_t rssid, int64_t version, bool is_compaction,
DelVectorPtr delvec) {
Status LakePrimaryIndex::ingest_sst(const FileMetaPB& sst_meta, uint32_t rssid, int64_t version,
const DelvecPagePB& delvec_page, DelVectorPtr delvec) {
if (!_enable_persistent_index) {
return Status::OK();
}
auto* lake_persistent_index = dynamic_cast<LakePersistentIndex*>(_persistent_index.get());
if (lake_persistent_index != nullptr) {
return lake_persistent_index->ingest_sst(sst_meta, rssid, version, is_compaction, std::move(delvec));
return lake_persistent_index->ingest_sst(sst_meta, rssid, version, delvec_page, std::move(delvec));
} else {
return Status::InternalError("Persistent index is not a LakePersistentIndex.");
}

View File

@ -68,7 +68,7 @@ public:
Status commit(const TabletMetadataPtr& metadata, MetaFileBuilder* builder);
Status ingest_sst(const FileMetaPB& sst_meta, uint32_t rssid, int64_t version, bool is_compaction,
Status ingest_sst(const FileMetaPB& sst_meta, uint32_t rssid, int64_t version, const DelvecPagePB& delvec_page,
DelVectorPtr delvec);
double get_local_pk_index_write_amp_score();

View File

@ -509,6 +509,12 @@ Status MetaFileBuilder::_finalize_delvec(int64_t version, int64_t txn_id) {
for (const auto& item : _tablet_meta->delvec_meta().delvecs()) {
refered_versions.insert(item.second.version());
}
// collect version from sstable delvecs
for (const auto& sst : _tablet_meta->sstable_meta().sstables()) {
if (sst.has_delvec() && sst.delvec().size() > 0) {
refered_versions.insert(sst.delvec().version());
}
}
auto itr = _tablet_meta->mutable_delvec_meta()->mutable_version_to_file()->begin();
for (; itr != _tablet_meta->mutable_delvec_meta()->mutable_version_to_file()->end();) {
@ -599,51 +605,56 @@ void MetaFileBuilder::finalize_sstable_meta(const PersistentIndexSstableMetaPB&
_tablet_meta->mutable_sstable_meta()->CopyFrom(sstable_meta);
}
// get delvec by DelvecPagePB
Status get_del_vec(TabletManager* tablet_mgr, const TabletMetadata& metadata, const DelvecPagePB& delvec_page,
bool fill_cache, const LakeIOOptions& lake_io_opts, DelVector* delvec) {
VLOG(2) << fmt::format("get_del_vec {} tabletid {}", delvec_page.ShortDebugString(), metadata.id());
std::string buf;
raw::stl_string_resize_uninitialized(&buf, delvec_page.size());
// find in cache
std::string cache_key = delvec_cache_key(metadata.id(), delvec_page);
auto cached_delvec = tablet_mgr->metacache()->lookup_delvec(cache_key);
if (cached_delvec != nullptr) {
delvec->copy_from(*cached_delvec);
return Status::OK();
}
// lookup delvec file name and then read it
auto iter = metadata.delvec_meta().version_to_file().find(delvec_page.version());
if (iter == metadata.delvec_meta().version_to_file().end()) {
LOG(ERROR) << "Can't find delvec file name for tablet: " << metadata.id()
<< ", version: " << delvec_page.version();
return Status::InternalError("Can't find delvec file name");
}
const auto& delvec_name = iter->second.name();
RandomAccessFileOptions opts{.skip_fill_local_cache = !lake_io_opts.fill_data_cache};
std::unique_ptr<RandomAccessFile> rf;
if (lake_io_opts.fs && lake_io_opts.location_provider) {
ASSIGN_OR_RETURN(rf,
lake_io_opts.fs->new_random_access_file(
opts, lake_io_opts.location_provider->delvec_location(metadata.id(), delvec_name)));
} else {
ASSIGN_OR_RETURN(rf, fs::new_random_access_file(opts, tablet_mgr->delvec_location(metadata.id(), delvec_name)));
}
RETURN_IF_ERROR(rf->read_at_fully(delvec_page.offset(), buf.data(), delvec_page.size()));
// parse delvec
RETURN_IF_ERROR(delvec->load(delvec_page.version(), buf.data(), delvec_page.size()));
// put in cache
if (fill_cache) {
auto delvec_cache_ptr = std::make_shared<DelVector>();
delvec_cache_ptr->copy_from(*delvec);
tablet_mgr->metacache()->cache_delvec(cache_key, delvec_cache_ptr);
}
TRACE("end load delvec");
return Status::OK();
}
Status get_del_vec(TabletManager* tablet_mgr, const TabletMetadata& metadata, uint32_t segment_id, bool fill_cache,
const LakeIOOptions& lake_io_opts, DelVector* delvec) {
// find delvec by segment id
auto iter = metadata.delvec_meta().delvecs().find(segment_id);
if (iter != metadata.delvec_meta().delvecs().end()) {
VLOG(2) << fmt::format("get_del_vec {} segid {}", metadata.delvec_meta().ShortDebugString(), segment_id);
std::string buf;
raw::stl_string_resize_uninitialized(&buf, iter->second.size());
// find in cache
std::string cache_key = delvec_cache_key(metadata.id(), iter->second);
auto cached_delvec = tablet_mgr->metacache()->lookup_delvec(cache_key);
if (cached_delvec != nullptr) {
delvec->copy_from(*cached_delvec);
return Status::OK();
}
// lookup delvec file name and then read it
auto iter2 = metadata.delvec_meta().version_to_file().find(iter->second.version());
if (iter2 == metadata.delvec_meta().version_to_file().end()) {
LOG(ERROR) << "Can't find delvec file name for tablet: " << metadata.id()
<< ", version: " << iter->second.version();
return Status::InternalError("Can't find delvec file name");
}
const auto& delvec_name = iter2->second.name();
RandomAccessFileOptions opts{.skip_fill_local_cache = !lake_io_opts.fill_data_cache};
std::unique_ptr<RandomAccessFile> rf;
if (lake_io_opts.fs && lake_io_opts.location_provider) {
ASSIGN_OR_RETURN(
rf, lake_io_opts.fs->new_random_access_file(
opts, lake_io_opts.location_provider->delvec_location(metadata.id(), delvec_name)));
} else {
ASSIGN_OR_RETURN(rf,
fs::new_random_access_file(opts, tablet_mgr->delvec_location(metadata.id(), delvec_name)));
}
RETURN_IF_ERROR(rf->read_at_fully(iter->second.offset(), buf.data(), iter->second.size()));
// parse delvec
RETURN_IF_ERROR(delvec->load(iter->second.version(), buf.data(), iter->second.size()));
// put in cache
if (fill_cache) {
auto delvec_cache_ptr = std::make_shared<DelVector>();
delvec_cache_ptr->copy_from(*delvec);
tablet_mgr->metacache()->cache_delvec(cache_key, delvec_cache_ptr);
}
TRACE("end load delvec");
return Status::OK();
return get_del_vec(tablet_mgr, metadata, iter->second, fill_cache, lake_io_opts, delvec);
}
VLOG(2) << fmt::format("get_del_vec not found, segmentid {} tablet_meta {}", segment_id,
metadata.delvec_meta().ShortDebugString());

View File

@ -78,6 +78,15 @@ public:
const TabletMetadata* tablet_meta() const { return _tablet_meta.get(); }
const DelvecPagePB& delvec_page(uint32_t segment_id) const {
static DelvecPagePB empty;
auto it = _delvecs.find(segment_id);
if (it != _delvecs.end()) {
return it->second;
}
return empty;
}
private:
// update delvec in tablet meta
Status _finalize_delvec(int64_t version, int64_t txn_id);
@ -113,6 +122,8 @@ private:
PendingRowsetData _pending_rowset_data;
};
Status get_del_vec(TabletManager* tablet_mgr, const TabletMetadata& metadata, const DelvecPagePB& delvec_page,
bool fill_cache, const LakeIOOptions& lake_io_opts, DelVector* delvec);
Status get_del_vec(TabletManager* tablet_mgr, const TabletMetadata& metadata, uint32_t segment_id, bool fill_cache,
const LakeIOOptions& lake_io_opts, DelVector* delvec);
bool is_primary_key(TabletMetadata* metadata);

View File

@ -27,8 +27,8 @@
namespace starrocks::lake {
Status PersistentIndexSstable::init(std::unique_ptr<RandomAccessFile> rf, const PersistentIndexSstablePB& sstable_pb,
Cache* cache, TabletManager* tablet_mgr, int64_t tablet_id, bool need_filter,
DelVectorPtr delvec) {
Cache* cache, bool need_filter, DelVectorPtr delvec,
const TabletMetadataPtr& metadata, TabletManager* tablet_mgr) {
sstable::Options options;
if (need_filter) {
_filter_policy.reset(const_cast<sstable::FilterPolicy*>(sstable::NewBloomFilterPolicy(10)));
@ -41,17 +41,22 @@ Status PersistentIndexSstable::init(std::unique_ptr<RandomAccessFile> rf, const
_rf = std::move(rf);
_sstable_pb.CopyFrom(sstable_pb);
// load delvec
if (_sstable_pb.has_delvec()) {
if (_sstable_pb.has_delvec() && _sstable_pb.delvec().size() > 0) {
if (delvec) {
// If delvec is already provided, use it directly.
_delvec = std::move(delvec);
} else {
if (metadata == nullptr) {
return Status::InvalidArgument("metadata is null when loading delvec from file");
}
if (tablet_mgr == nullptr) {
return Status::InvalidArgument("tablet_mgr is null when loading delvec from file");
}
// otherwise, load delvec from file
LakeIOOptions lake_io_opts{.fill_data_cache = true, .skip_disk_cache = false};
auto delvec_loader =
std::make_unique<LakeDelvecLoader>(tablet_mgr, nullptr, true /* fill cache */, lake_io_opts);
RETURN_IF_ERROR(delvec_loader->load(TabletSegmentId(tablet_id, _sstable_pb.shared_rssid()),
_sstable_pb.shared_version(), &_delvec));
RETURN_IF_ERROR(delvec_loader->load_from_meta(metadata, _sstable_pb.delvec(), &_delvec));
}
}
return Status::OK();

View File

@ -19,6 +19,7 @@
#include <vector>
#include "gen_cpp/lake_types.pb.h"
#include "storage/lake/tablet_metadata.h"
#include "storage/persistent_index.h"
#include "storage/sstable/filter_policy.h"
#include "storage/sstable/table.h"
@ -47,7 +48,8 @@ public:
~PersistentIndexSstable() = default;
Status init(std::unique_ptr<RandomAccessFile> rf, const PersistentIndexSstablePB& sstable_pb, Cache* cache,
TabletManager* tablet_mgr, int64_t tablet_id, bool need_filter = true, DelVectorPtr delvec = nullptr);
bool need_filter = true, DelVectorPtr delvec = nullptr, const TabletMetadataPtr& metadata = nullptr,
TabletManager* tablet_mgr = nullptr);
static Status build_sstable(const phmap::btree_map<std::string, IndexValueWithVer, std::less<>>& map,
WritableFile* wf, uint64_t* filesz);

View File

@ -297,7 +297,7 @@ Status UpdateManager::publish_primary_key_tablet(const TxnLogPB_OpWrite& op_writ
// TODO support condition column with sst ingestion.
// rowset_id + segment_id is the rssid of this segment
RETURN_IF_ERROR(index.ingest_sst(op_write.ssts(segment_id), rowset_id + segment_id, metadata->version(),
false /* no compaction */, nullptr));
DelvecPagePB() /* empty */, nullptr));
}
}
@ -1197,19 +1197,22 @@ Status UpdateManager::light_publish_primary_compaction(const TxnLogPB_OpCompacti
} else {
RETURN_IF_ERROR(resolver->execute());
}
// 3. ingest ssts to index
// 3. add delvec to builder
for (auto&& each : delvecs) {
builder->append_delvec(each.second, each.first);
}
// 4. ingest ssts to index
DCHECK(op_compaction.ssts_size() == 0 || delvecs.size() == op_compaction.ssts_size())
<< "delvecs.size(): " << delvecs.size() << ", op_compaction.ssts_size(): " << op_compaction.ssts_size();
for (int i = 0; i < op_compaction.ssts_size(); i++) {
// metadata.next_rowset_id() + i is the rssid of output rowset's i-th segment
DelvecPagePB delvec_page_pb = builder->delvec_page(metadata.next_rowset_id() + i);
delvec_page_pb.set_version(metadata.version());
RETURN_IF_ERROR(index.ingest_sst(op_compaction.ssts(i), metadata.next_rowset_id() + i, metadata.version(),
true /* is compaction */, delvecs[i].second));
delvec_page_pb, delvecs[i].second));
}
_index_cache.update_object_size(index_entry, index.memory_usage());
// 4. update TabletMeta and write to meta file
for (auto&& each : delvecs) {
builder->append_delvec(each.second, each.first);
}
// 5. update TabletMeta
builder->apply_opcompaction(op_compaction, max_rowset_id, tablet_schema->id());
RETURN_IF_ERROR(builder->update_num_del_stat(segment_id_to_add_dels));
RETURN_IF_ERROR(index.apply_opcompaction(metadata, op_compaction));
@ -1552,7 +1555,7 @@ void UpdateManager::set_enable_persistent_index(int64_t tablet_id, bool enable_p
}
}
Status UpdateManager::execute_index_major_compaction(const TabletMetadata& metadata, TxnLogPB* txn_log) {
Status UpdateManager::execute_index_major_compaction(const TabletMetadataPtr& metadata, TxnLogPB* txn_log) {
return LakePersistentIndex::major_compact(_tablet_mgr, metadata, txn_log);
}

View File

@ -219,7 +219,7 @@ public:
void set_enable_persistent_index(int64_t tablet_id, bool enable_persistent_index);
Status execute_index_major_compaction(const TabletMetadata& metadata, TxnLogPB* txn_log);
Status execute_index_major_compaction(const TabletMetadataPtr& metadata, TxnLogPB* txn_log);
PersistentIndexBlockCache* block_cache() { return _block_cache.get(); }

View File

@ -27,7 +27,7 @@ namespace starrocks::lake {
class LakePersistentIndexTest : public TestBase {
public:
LakePersistentIndexTest() : TestBase(kTestDirectory) {
_tablet_metadata = std::make_unique<TabletMetadata>();
_tablet_metadata = std::make_shared<TabletMetadata>();
_tablet_metadata->set_id(next_id());
_tablet_metadata->set_version(1);
_tablet_metadata->set_enable_persistent_index(true);
@ -70,7 +70,7 @@ protected:
constexpr static const char* const kTestDirectory = "test_lake_persistent_index";
std::unique_ptr<TabletMetadata> _tablet_metadata;
std::shared_ptr<TabletMetadata> _tablet_metadata;
};
TEST_F(LakePersistentIndexTest, test_basic_api) {
@ -91,7 +91,7 @@ TEST_F(LakePersistentIndexTest, test_basic_api) {
}
auto tablet_id = _tablet_metadata->id();
auto index = std::make_unique<LakePersistentIndex>(_tablet_mgr.get(), tablet_id);
ASSERT_OK(index->init(_tablet_metadata->sstable_meta()));
ASSERT_OK(index->init(_tablet_metadata));
ASSERT_OK(index->insert(N, key_slices.data(), values.data(), 0));
ASSERT_TRUE(index->memory_usage() > 0);
@ -172,7 +172,7 @@ TEST_F(LakePersistentIndexTest, test_replace) {
auto tablet_id = _tablet_metadata->id();
auto index = std::make_unique<LakePersistentIndex>(_tablet_mgr.get(), tablet_id);
ASSERT_OK(index->init(_tablet_metadata->sstable_meta()));
ASSERT_OK(index->init(_tablet_metadata));
ASSERT_OK(index->insert(N, key_slices.data(), values.data(), false));
//replace
@ -202,7 +202,7 @@ TEST_F(LakePersistentIndexTest, test_major_compaction) {
total_keys.reserve(M * N);
auto tablet_id = _tablet_metadata->id();
auto index = std::make_unique<LakePersistentIndex>(_tablet_mgr.get(), tablet_id);
ASSERT_OK(index->init(_tablet_metadata->sstable_meta()));
ASSERT_OK(index->init(_tablet_metadata));
int k = 0;
for (int i = 0; i < M; ++i) {
vector<Key> keys;
@ -241,7 +241,7 @@ TEST_F(LakePersistentIndexTest, test_major_compaction) {
get_values.reserve(M * N);
auto txn_log = std::make_shared<TxnLogPB>();
// try to compact sst files.
ASSERT_OK(LakePersistentIndex::major_compact(_tablet_mgr.get(), *tablet_metadata_ptr, txn_log.get()));
ASSERT_OK(LakePersistentIndex::major_compact(_tablet_mgr.get(), tablet_metadata_ptr, txn_log.get()));
ASSERT_TRUE(txn_log->op_compaction().input_sstables_size() > 0);
ASSERT_TRUE(txn_log->op_compaction().has_output_sstable());
ASSERT_OK(index->apply_opcompaction(txn_log->op_compaction()));
@ -304,7 +304,7 @@ TEST_F(LakePersistentIndexTest, test_compaction_strategy) {
TEST_F(LakePersistentIndexTest, test_insert_delete) {
auto tablet_id = _tablet_metadata->id();
auto index = std::make_unique<LakePersistentIndex>(_tablet_mgr.get(), tablet_id);
ASSERT_OK(index->init(_tablet_metadata->sstable_meta()));
ASSERT_OK(index->init(_tablet_metadata));
auto l0_max_mem_usage = config::l0_max_mem_usage;
config::l0_max_mem_usage = 10;
@ -355,7 +355,7 @@ TEST_F(LakePersistentIndexTest, test_insert_delete) {
TEST_F(LakePersistentIndexTest, test_memtable_full) {
auto tablet_id = _tablet_metadata->id();
auto index = std::make_unique<LakePersistentIndex>(_tablet_mgr.get(), tablet_id);
ASSERT_OK(index->init(_tablet_metadata->sstable_meta()));
ASSERT_OK(index->init(_tablet_metadata));
size_t old_l0_max_mem_usage = config::l0_max_mem_usage;
config::l0_max_mem_usage = 1073741824;
@ -493,7 +493,7 @@ TEST_F(LakePersistentIndexTest, test_major_compaction_with_predicate) {
total_keys.reserve(M * N);
auto tablet_id = _tablet_metadata->id();
auto index = std::make_unique<LakePersistentIndex>(_tablet_mgr.get(), tablet_id);
ASSERT_OK(index->init(_tablet_metadata->sstable_meta()));
ASSERT_OK(index->init(_tablet_metadata));
int k = 0;
for (int i = 0; i < M; ++i) {
vector<Key> keys;
@ -540,7 +540,7 @@ TEST_F(LakePersistentIndexTest, test_major_compaction_with_predicate) {
auto hit_count = SIMD::count_nonzero(hits.data(), hits.size());
auto txn_log = std::make_shared<TxnLogPB>();
// try to compact sst files.
ASSERT_OK(LakePersistentIndex::major_compact(_tablet_mgr.get(), *tablet_metadata_ptr, txn_log.get()));
ASSERT_OK(LakePersistentIndex::major_compact(_tablet_mgr.get(), tablet_metadata_ptr, txn_log.get()));
ASSERT_TRUE(txn_log->op_compaction().input_sstables_size() == M);
ASSERT_TRUE(txn_log->op_compaction().has_output_sstable() || hit_count == 0);
ASSERT_OK(index->apply_opcompaction(txn_log->op_compaction()));

View File

@ -705,4 +705,117 @@ TEST_F(MetaFileTest, test_batch_apply_opwrite_merge_dels) {
ASSERT_EQ(1, persisted->rowsets_size());
ASSERT_EQ(3, persisted->rowsets(0).del_files_size());
}
TEST_F(MetaFileTest, test_sstable_delvec_integration) {
// Test SSTable delvec integration: test new get_del_vec(DelvecPagePB) function and
// version reference collection from SSTable delvecs during finalization
const int64_t tablet_id = 40001;
const uint32_t segment_id = 1001;
const int64_t version1 = 11;
const int64_t version2 = 12;
auto tablet = std::make_shared<Tablet>(_tablet_manager.get(), tablet_id);
auto metadata = std::make_shared<TabletMetadata>();
metadata->set_id(tablet_id);
metadata->set_version(version1);
metadata->set_next_rowset_id(110);
metadata->mutable_schema()->set_keys_type(PRIMARY_KEYS);
metadata->set_enable_persistent_index(true);
metadata->set_persistent_index_type(PersistentIndexTypePB::CLOUD_NATIVE);
// 1. Create and write delvec first
MetaFileBuilder builder1(*tablet, metadata);
DelVector dv1;
dv1.set_empty();
std::shared_ptr<DelVector> ndv1;
std::vector<uint32_t> dels1 = {1, 3, 5, 7, 100};
dv1.add_dels_as_new_version(dels1, version1, &ndv1);
std::string original_delvec = ndv1->save();
builder1.append_delvec(ndv1, segment_id);
Status st = builder1.finalize(next_id());
EXPECT_TRUE(st.ok());
// 2. Get the delvec page info for creating SSTable delvec
ASSIGN_OR_ABORT(auto metadata1, _tablet_manager->get_tablet_metadata(tablet_id, version1));
auto iter = metadata1->delvec_meta().delvecs().find(segment_id);
EXPECT_TRUE(iter != metadata1->delvec_meta().delvecs().end());
DelvecPagePB delvec_page = iter->second;
// 3. Test new get_del_vec function with DelvecPagePB
DelVector read_delvec1;
LakeIOOptions lake_io_opts;
EXPECT_TRUE(get_del_vec(_tablet_manager.get(), *metadata1, delvec_page, true, lake_io_opts, &read_delvec1).ok());
EXPECT_EQ(original_delvec, read_delvec1.save());
// 4. Create SSTable with delvec and write to version2
metadata->set_version(version2);
MetaFileBuilder builder2(*tablet, metadata);
PersistentIndexSstableMetaPB sstable_meta;
PersistentIndexSstablePB* sstable = sstable_meta.add_sstables();
sstable->set_filename("test_sstable.sst");
sstable->set_filesize(1024);
sstable->set_max_rss_rowid(100);
sstable->mutable_delvec()->CopyFrom(delvec_page); // Use DelvecPagePB instead of has_delvec
builder2.finalize_sstable_meta(sstable_meta);
st = builder2.finalize(next_id());
EXPECT_TRUE(st.ok());
// 5. Verify SSTable contains delvec information
ASSIGN_OR_ABORT(auto metadata2, _tablet_manager->get_tablet_metadata(tablet_id, version2));
EXPECT_EQ(1, metadata2->sstable_meta().sstables_size());
const auto& saved_sstable = metadata2->sstable_meta().sstables(0);
EXPECT_TRUE(saved_sstable.has_delvec());
EXPECT_EQ(delvec_page.version(), saved_sstable.delvec().version());
EXPECT_EQ(delvec_page.offset(), saved_sstable.delvec().offset());
EXPECT_EQ(delvec_page.size(), saved_sstable.delvec().size());
// 6. Test reading delvec via SSTable's delvec page
DelVector read_delvec2;
EXPECT_TRUE(
get_del_vec(_tablet_manager.get(), *metadata2, saved_sstable.delvec(), true, lake_io_opts, &read_delvec2)
.ok());
EXPECT_EQ(original_delvec, read_delvec2.save());
// 7. Test version reference collection: create new metadata without regular delvec but with SSTable delvec
auto metadata3 = std::make_shared<TabletMetadataPB>(*metadata2);
metadata3->set_version(version2 + 1);
metadata3->mutable_delvec_meta()->mutable_delvecs()->clear(); // Clear regular delvecs
MetaFileBuilder builder3(*tablet, metadata3);
st = builder3.finalize(next_id());
EXPECT_TRUE(st.ok());
// 8. Verify that delvec file version is preserved due to SSTable reference
ASSIGN_OR_ABORT(auto metadata4, _tablet_manager->get_tablet_metadata(tablet_id, version2 + 1));
auto version_to_file_map = metadata4->delvec_meta().version_to_file();
// The delvec file with version1 should still exist because SSTable references it
auto version_iter = version_to_file_map.find(version1);
EXPECT_TRUE(version_iter != version_to_file_map.end());
// 9. Verify we can still read delvec from SSTable after cleanup
DelVector read_delvec3;
EXPECT_TRUE(
get_del_vec(_tablet_manager.get(), *metadata4, saved_sstable.delvec(), true, lake_io_opts, &read_delvec3)
.ok());
EXPECT_EQ(original_delvec, read_delvec3.save());
// 10. Remove SSTable delvec reference and verify delvec file cleanup
auto metadata5 = std::make_shared<TabletMetadataPB>(*metadata4);
metadata5->set_version(version2 + 2);
metadata5->mutable_sstable_meta()->clear_sstables(); // Remove SSTable that references delvec
MetaFileBuilder builder4(*tablet, metadata5);
st = builder4.finalize(next_id());
EXPECT_TRUE(st.ok());
// 11. Verify that delvec file version is now removed since no SSTable references it
ASSIGN_OR_ABORT(auto metadata6, _tablet_manager->get_tablet_metadata(tablet_id, version2 + 2));
auto final_version_to_file_map = metadata6->delvec_meta().version_to_file();
// The delvec file with version1 should be removed because no SSTable references it anymore
auto final_version_iter = final_version_to_file_map.find(version1);
EXPECT_TRUE(final_version_iter == final_version_to_file_map.end());
}
} // namespace starrocks::lake

View File

@ -226,7 +226,7 @@ TEST_F(PersistentIndexSstableTest, test_persistent_index_sstable) {
PersistentIndexSstablePB sstable_pb;
sstable_pb.set_filename(filename);
sstable_pb.set_filesize(filesize);
ASSERT_OK(sst->init(std::move(read_file), sstable_pb, cache_ptr.get(), nullptr, 0));
ASSERT_OK(sst->init(std::move(read_file), sstable_pb, cache_ptr.get()));
// check memory usage
ASSERT_TRUE(sst->memory_usage() > 0);
@ -517,7 +517,7 @@ TEST_F(PersistentIndexSstableTest, test_persistent_index_sstable_stream_builder)
PersistentIndexSstablePB sstable_pb;
sstable_pb.set_filename(filename);
sstable_pb.set_filesize(file_size);
ASSERT_OK(sst->init(std::move(read_file), sstable_pb, cache_ptr.get(), nullptr, 0));
ASSERT_OK(sst->init(std::move(read_file), sstable_pb, cache_ptr.get()));
// 4. Iterate and verify all keys
sstable::ReadIOStat stat;

View File

@ -35,6 +35,30 @@ message DelvecPagePB {
optional uint64 size = 3;
}
message PersistentIndexSstablePB {
optional int64 version = 1; // Deprecated
optional string filename = 2;
optional int64 filesize = 3;
// used for rebuild point of persistent index
optional uint64 max_rss_rowid = 4;
optional bytes encryption_meta = 5;
// Whether this file shared by multiple tablets
optional bool shared = 6 [default=false];
// Filter out rows which do not satisfy the predicate
// when reading this specific sstable
optional PersistentIndexSstablePredicatePB predicate = 7;
// For sstable generated during data write & compaction process.
optional uint32 shared_rssid = 8;
optional int64 shared_version = 9;
// For sstable generated during compaction process.
optional DelvecPagePB delvec = 10;
}
message PersistentIndexSstableMetaPB {
// sstables are ordered with the smaller version on the left.
repeated PersistentIndexSstablePB sstables = 1;
}
message DelvecCacheKeyPB {
optional int64 id = 1;
optional DelvecPagePB delvec_page = 2;

View File

@ -217,30 +217,6 @@ message IndexValuesWithVerPB {
repeated IndexValueWithVerPB values = 1;
}
message PersistentIndexSstablePB {
optional int64 version = 1; // Deprecated
optional string filename = 2;
optional int64 filesize = 3;
// used for rebuild point of persistent index
optional uint64 max_rss_rowid = 4;
optional bytes encryption_meta = 5;
// Whether this file shared by multiple tablets
optional bool shared = 6 [default=false];
// Filter out rows which do not satisfy the predicate
// when reading this specific sstable
optional PersistentIndexSstablePredicatePB predicate = 7;
// For sstable generated during data write & compaction process.
optional uint32 shared_rssid = 8;
optional int64 shared_version = 9;
// For sstable generated during compaction process.
optional bool has_delvec = 10;
}
message PersistentIndexSstableMetaPB {
// sstables are ordered with the smaller version on the left.
repeated PersistentIndexSstablePB sstables = 1;
}
enum TransactionStatusPB {
TRANS_UNKNOWN = 0;
TRANS_PREPARE = 1;