[Enhancement] publish sst files generated during data loading and compaction (#63005)

Signed-off-by: luohaha <18810541851@163.com>
This commit is contained in:
Yixin Luo 2025-09-16 19:13:30 +08:00 committed by GitHub
parent 0925f39e61
commit a5e2e085dc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 588 additions and 27 deletions

View File

@ -49,6 +49,18 @@ Status KeyValueMerger::merge(const sstable::Iterator* iter_ptr) {
if (index_value_ver.values_size() == 0) {
return Status::OK();
}
// filter out rows that have already been deleted in this sst
if (iter_ptr->delvec() != nullptr && iter_ptr->delvec()->roaring()->contains(index_value_ver.values(0).rowid())) {
// this row has been deleted in this sst, skip it
return Status::OK();
}
// fill in the shared version & rssid if this sst carries them
if (iter_ptr->shared_version() > 0) {
for (size_t i = 0; i < index_value_ver.values_size(); ++i) {
index_value_ver.mutable_values(i)->set_version(iter_ptr->shared_version());
index_value_ver.mutable_values(i)->set_rssid(iter_ptr->shared_rssid());
}
}
/*
* Do not distinguish between base compaction and cumulative compaction here.
@ -163,6 +175,10 @@ LakePersistentIndex::~LakePersistentIndex() {
}
Status LakePersistentIndex::init(const PersistentIndexSstableMetaPB& sstable_meta) {
auto* block_cache = _tablet_mgr->update_mgr()->block_cache();
if (block_cache == nullptr) {
return Status::InternalError("Block cache is null.");
}
uint64_t max_rss_rowid = 0;
for (auto& sstable_pb : sstable_meta.sstables()) {
RandomAccessFileOptions opts;
@ -172,12 +188,8 @@ Status LakePersistentIndex::init(const PersistentIndexSstableMetaPB& sstable_met
}
ASSIGN_OR_RETURN(auto rf, fs::new_random_access_file(
opts, _tablet_mgr->sst_location(_tablet_id, sstable_pb.filename())));
auto* block_cache = _tablet_mgr->update_mgr()->block_cache();
if (block_cache == nullptr) {
return Status::InternalError("Block cache is null.");
}
auto sstable = std::make_unique<PersistentIndexSstable>();
RETURN_IF_ERROR(sstable->init(std::move(rf), sstable_pb, block_cache->cache()));
RETURN_IF_ERROR(sstable->init(std::move(rf), sstable_pb, block_cache->cache(), _tablet_mgr, _tablet_id));
_sstables.emplace_back(std::move(sstable));
max_rss_rowid = std::max(max_rss_rowid, sstable_pb.max_rss_rowid());
}
@ -212,6 +224,10 @@ bool LakePersistentIndex::too_many_rebuild_files() const {
Status LakePersistentIndex::minor_compact() {
TRACE_COUNTER_SCOPE_LATENCY_US("minor_compact_latency_us");
auto* block_cache = _tablet_mgr->update_mgr()->block_cache();
if (block_cache == nullptr) {
return Status::InternalError("Block cache is null.");
}
auto filename = gen_sst_filename();
auto location = _tablet_mgr->sst_location(_tablet_id, filename);
WritableFileOptions wopts;
@ -237,14 +253,44 @@ Status LakePersistentIndex::minor_compact() {
sstable_pb.set_filesize(filesize);
sstable_pb.set_max_rss_rowid(_memtable->max_rss_rowid());
sstable_pb.set_encryption_meta(encryption_meta);
TEST_SYNC_POINT_CALLBACK("LakePersistentIndex::minor_compact:inject_predicate", &sstable_pb);
RETURN_IF_ERROR(sstable->init(std::move(rf), sstable_pb, block_cache->cache(), _tablet_mgr, _tablet_id));
_sstables.emplace_back(std::move(sstable));
TRACE_COUNTER_INCREMENT("minor_compact_times", 1);
return Status::OK();
}
Status LakePersistentIndex::ingest_sst(const FileMetaPB& sst_meta, uint32_t rssid, int64_t version, bool is_compaction,
DelVectorPtr delvec) {
auto* block_cache = _tablet_mgr->update_mgr()->block_cache();
if (block_cache == nullptr) {
return Status::InternalError("Block cache is null.");
}
TEST_SYNC_POINT_CALLBACK("LakePersistentIndex::minor_compact:inject_predicate", &sstable_pb);
RETURN_IF_ERROR(sstable->init(std::move(rf), sstable_pb, block_cache->cache()));
if (!_memtable->empty()) {
RETURN_IF_ERROR(flush_memtable());
}
TRACE_COUNTER_SCOPE_LATENCY_US("ingest_sst_latency_us");
auto sstable = std::make_unique<PersistentIndexSstable>();
RandomAccessFileOptions opts;
if (!sst_meta.encryption_meta().empty()) {
ASSIGN_OR_RETURN(opts.encryption_info, KeyCache::instance().unwrap_encryption_meta(sst_meta.encryption_meta()));
}
auto location = _tablet_mgr->sst_location(_tablet_id, sst_meta.name());
ASSIGN_OR_RETURN(auto rf, fs::new_random_access_file(opts, location));
PersistentIndexSstablePB sstable_pb;
sstable_pb.set_filename(sst_meta.name());
sstable_pb.set_filesize(sst_meta.size());
sstable_pb.set_shared_rssid(rssid);
sstable_pb.set_shared_version(version);
sstable_pb.set_encryption_meta(sst_meta.encryption_meta());
// an sstable generated by compaction needs a delvec to resolve conflicts.
sstable_pb.set_has_delvec(is_compaction);
// use UINT32_MAX as max rowid here to indicate all rows of this segment are already contained in this sst
sstable_pb.set_max_rss_rowid((static_cast<uint64_t>(rssid) << 32) | UINT32_MAX);
RETURN_IF_ERROR(sstable->init(std::move(rf), sstable_pb, block_cache->cache(), _tablet_mgr, _tablet_id,
true /* need filter */, std::move(delvec)));
_sstables.emplace_back(std::move(sstable));
TRACE_COUNTER_INCREMENT("minor_compact_times", 1);
TRACE_COUNTER_INCREMENT("ingest_sst_times", 1);
return Status::OK();
}
@ -437,7 +483,8 @@ Status LakePersistentIndex::prepare_merging_iterator(
ASSIGN_OR_RETURN(auto rf, fs::new_random_access_file(
opts, tablet_mgr->sst_location(metadata.id(), sstable_pb.filename())));
auto merging_sstable = std::make_shared<PersistentIndexSstable>();
RETURN_IF_ERROR(merging_sstable->init(std::move(rf), sstable_pb, nullptr, false /** no filter **/));
RETURN_IF_ERROR(merging_sstable->init(std::move(rf), sstable_pb, nullptr, tablet_mgr, metadata.id(),
false /** no filter **/));
merging_sstables->push_back(merging_sstable);
// Pass `max_rss_rowid` to iterator, will be used when compaction.
read_options.max_rss_rowid = sstable_pb.max_rss_rowid();
@ -445,6 +492,9 @@ Status LakePersistentIndex::prepare_merging_iterator(
ASSIGN_OR_RETURN(read_options.predicate,
sstable::SstablePredicate::create(metadata.schema(), sstable_pb.predicate()));
}
read_options.shared_rssid = sstable_pb.shared_rssid();
read_options.shared_version = sstable_pb.shared_version();
read_options.delvec = merging_sstable->delvec();
sstable::Iterator* iter = merging_sstable->new_iterator(read_options);
iters.emplace_back(iter);
// add input sstable.
@ -521,6 +571,10 @@ Status LakePersistentIndex::apply_opcompaction(const TxnLogPB_OpCompaction& op_c
if (op_compaction.input_sstables().empty() || !op_compaction.has_output_sstable()) {
return Status::OK();
}
auto* block_cache = _tablet_mgr->update_mgr()->block_cache();
if (block_cache == nullptr) {
return Status::InternalError("Block cache is null.");
}
PersistentIndexSstablePB sstable_pb;
sstable_pb.CopyFrom(op_compaction.output_sstable());
@ -534,11 +588,7 @@ Status LakePersistentIndex::apply_opcompaction(const TxnLogPB_OpCompaction& op_c
}
ASSIGN_OR_RETURN(auto rf,
fs::new_random_access_file(opts, _tablet_mgr->sst_location(_tablet_id, sstable_pb.filename())));
auto* block_cache = _tablet_mgr->update_mgr()->block_cache();
if (block_cache == nullptr) {
return Status::InternalError("Block cache is null.");
}
RETURN_IF_ERROR(sstable->init(std::move(rf), sstable_pb, block_cache->cache()));
RETURN_IF_ERROR(sstable->init(std::move(rf), sstable_pb, block_cache->cache(), _tablet_mgr, _tablet_id));
std::unordered_set<std::string> filenames;
for (const auto& input_sstable : op_compaction.input_sstables()) {

View File

@ -137,6 +137,9 @@ public:
Status minor_compact();
Status ingest_sst(const FileMetaPB& sst_meta, uint32_t rssid, int64_t version, bool is_compaction,
DelVectorPtr delvec);
static Status major_compact(TabletManager* tablet_mgr, const TabletMetadata& metadata, TxnLogPB* txn_log);
Status apply_opcompaction(const TxnLogPB_OpCompaction& op_compaction);

View File

@ -201,6 +201,20 @@ Status LakePrimaryIndex::apply_opcompaction(const TabletMetadata& metadata,
return Status::OK();
}
// Hands an already-built sst file over to the underlying LakePersistentIndex.
//
// Returns OK without doing anything when the persistent index is disabled.
// Returns InternalError when `_persistent_index` is not a LakePersistentIndex.
// Otherwise returns whatever LakePersistentIndex::ingest_sst returns; `delvec`
// is moved into that call.
Status LakePrimaryIndex::ingest_sst(const FileMetaPB& sst_meta, uint32_t rssid, int64_t version, bool is_compaction,
                                    DelVectorPtr delvec) {
    // No persistent index means there is nothing to ingest into.
    if (!_enable_persistent_index) {
        return Status::OK();
    }
    auto* const target_index = dynamic_cast<LakePersistentIndex*>(_persistent_index.get());
    if (target_index == nullptr) {
        return Status::InternalError("Persistent index is not a LakePersistentIndex.");
    }
    return target_index->ingest_sst(sst_meta, rssid, version, is_compaction, std::move(delvec));
}
Status LakePrimaryIndex::commit(const TabletMetadataPtr& metadata, MetaFileBuilder* builder) {
TRACE_COUNTER_SCOPE_LATENCY_US("primary_index_commit_latency_us");
if (!_enable_persistent_index) {

View File

@ -68,6 +68,9 @@ public:
Status commit(const TabletMetadataPtr& metadata, MetaFileBuilder* builder);
Status ingest_sst(const FileMetaPB& sst_meta, uint32_t rssid, int64_t version, bool is_compaction,
DelVectorPtr delvec);
double get_local_pk_index_write_amp_score();
void set_local_pk_index_write_amp_score(double score);

View File

@ -48,10 +48,10 @@ Status LakePrimaryKeyCompactionConflictResolver::segment_iterator(
ASSIGN_OR_RETURN(auto segment_iters, _rowset->get_each_segment_iterator(pkey_schema, false, &stats));
RETURN_ERROR_IF_FALSE(segment_iters.size() == _rowset->num_segments());
// init delvec loader
SegmentReadOptions seg_options;
LakeIOOptions lake_io_opts{.fill_data_cache = true, .skip_disk_cache = false};
auto delvec_loader =
std::make_unique<LakeDelvecLoader>(_tablet_mgr, _builder, false /* fill cache */, seg_options.lake_io_opts);
std::make_unique<LakeDelvecLoader>(_tablet_mgr, _builder, false /* fill cache */, lake_io_opts);
// init params
CompactConflictResolveParams params;
params.tablet_id = _rowset->tablet_id();
@ -66,4 +66,30 @@ Status LakePrimaryKeyCompactionConflictResolver::segment_iterator(
});
}
// Segment-based variant of segment_iterator: loads the output rowset's segments
// (not chunk iterators), builds the conflict-resolve parameters and invokes
// `handler` once with them.
//
// The third argument handed to `handler` is a callback that accumulates
// `num_dels` into `_segment_id_to_add_dels[rssid]` and appends `(rssid, dv)` to
// `_delvecs`.
//
// Returns the status produced by `handler`, or an earlier error if loading the
// segments fails or the loaded segment count differs from the rowset's.
Status LakePrimaryKeyCompactionConflictResolver::segment_iterator(
        const std::function<Status(const CompactConflictResolveParams&, const std::vector<SegmentPtr>&,
                                   const std::function<void(uint32_t, const DelVectorPtr&, uint32_t)>&)>& handler) {
    // Load every segment of the rowset and make sure none is missing.
    std::vector<SegmentPtr> segments;
    RETURN_IF_ERROR(_rowset->load_segments(&segments, true /* file cache*/));
    RETURN_ERROR_IF_FALSE(segments.size() == _rowset->num_segments());

    // Delvec loader used by the handler; it must outlive the handler call below.
    LakeIOOptions io_options{.fill_data_cache = true, .skip_disk_cache = false};
    auto loader = std::make_unique<LakeDelvecLoader>(_tablet_mgr, _builder, false /* fill cache */, io_options);

    // Parameters describing this conflict-resolve pass.
    CompactConflictResolveParams resolve_params;
    resolve_params.tablet_id = _rowset->tablet_id();
    resolve_params.rowset_id = _metadata->next_rowset_id();
    resolve_params.base_version = _base_version;
    resolve_params.new_version = _metadata->version();
    resolve_params.delvec_loader = loader.get();
    resolve_params.index = _index;

    // Collects the per-segment delete counts and delvecs reported by the handler.
    auto record_delvec = [this](uint32_t rssid, const DelVectorPtr& dv, uint32_t num_dels) {
        (*_segment_id_to_add_dels)[rssid] += num_dels;
        _delvecs->emplace_back(rssid, dv);
    };
    return handler(resolve_params, segments, record_delvec);
}
} // namespace starrocks::lake

View File

@ -15,6 +15,7 @@
#pragma once
#include "storage/lake/tablet_metadata.h"
#include "storage/lake/types_fwd.h"
#include "storage/primary_key_compaction_conflict_resolver.h"
#include "storage/tablet_manager.h"
@ -50,6 +51,11 @@ public:
const std::function<void(uint32_t, const DelVectorPtr&, uint32_t)>&)>& handler)
override;
Status segment_iterator(
const std::function<Status(const CompactConflictResolveParams&, const std::vector<SegmentPtr>&,
const std::function<void(uint32_t, const DelVectorPtr&, uint32_t)>&)>& handler)
override;
private:
// input
const TabletMetadata* _metadata = nullptr;

View File

@ -27,7 +27,8 @@
namespace starrocks::lake {
Status PersistentIndexSstable::init(std::unique_ptr<RandomAccessFile> rf, const PersistentIndexSstablePB& sstable_pb,
Cache* cache, bool need_filter) {
Cache* cache, TabletManager* tablet_mgr, int64_t tablet_id, bool need_filter,
DelVectorPtr delvec) {
sstable::Options options;
if (need_filter) {
_filter_policy.reset(const_cast<sstable::FilterPolicy*>(sstable::NewBloomFilterPolicy(10)));
@ -39,6 +40,20 @@ Status PersistentIndexSstable::init(std::unique_ptr<RandomAccessFile> rf, const
_sst.reset(table);
_rf = std::move(rf);
_sstable_pb.CopyFrom(sstable_pb);
// load delvec
if (_sstable_pb.has_delvec()) {
if (delvec) {
// If delvec is already provided, use it directly.
_delvec = std::move(delvec);
} else {
// otherwise, load delvec from file
LakeIOOptions lake_io_opts{.fill_data_cache = true, .skip_disk_cache = false};
auto delvec_loader =
std::make_unique<LakeDelvecLoader>(tablet_mgr, nullptr, true /* fill cache */, lake_io_opts);
RETURN_IF_ERROR(delvec_loader->load(TabletSegmentId(tablet_id, _sstable_pb.shared_rssid()),
_sstable_pb.shared_version(), &_delvec));
}
}
return Status::OK();
}
@ -90,6 +105,22 @@ Status PersistentIndexSstable::multi_get(const Slice* keys, const KeyIndexSet& k
if (!index_value_with_ver_pb.ParseFromString(index_value_with_vers[i])) {
return Status::InternalError("parse index value info failed");
}
// Check if this rowid is already filtered by delvec
if (_delvec) {
if (_delvec->roaring()->contains(index_value_with_ver_pb.values(0).rowid())) {
++i;
continue;
}
}
// fill in the shared rssid & version if this sst carries them
if (_sstable_pb.has_shared_version() && _sstable_pb.shared_version() > 0) {
DCHECK(_sstable_pb.has_shared_rssid());
for (size_t j = 0; j < index_value_with_ver_pb.values_size(); ++j) {
index_value_with_ver_pb.mutable_values(j)->set_rssid(_sstable_pb.shared_rssid());
index_value_with_ver_pb.mutable_values(j)->set_version(_sstable_pb.shared_version());
}
}
if (index_value_with_ver_pb.values_size() > 0) {
if (version < 0) {
values[key_index] = build_index_value(index_value_with_ver_pb.values(0));

View File

@ -47,7 +47,7 @@ public:
~PersistentIndexSstable() = default;
Status init(std::unique_ptr<RandomAccessFile> rf, const PersistentIndexSstablePB& sstable_pb, Cache* cache,
bool need_filter = true);
TabletManager* tablet_mgr, int64_t tablet_id, bool need_filter = true, DelVectorPtr delvec = nullptr);
static Status build_sstable(const phmap::btree_map<std::string, IndexValueWithVer, std::less<>>& map,
WritableFile* wf, uint64_t* filesz);
@ -67,11 +67,16 @@ public:
size_t memory_usage() const;
// `_delvec` should only be modified in `init()` via publish version thread
// which is thread-safe. And after that, it should be immutable.
DelVectorPtr delvec() const { return _delvec; }
private:
std::unique_ptr<sstable::Table> _sst{nullptr};
std::unique_ptr<sstable::FilterPolicy> _filter_policy{nullptr};
std::unique_ptr<RandomAccessFile> _rf{nullptr};
PersistentIndexSstablePB _sstable_pb;
DelVectorPtr _delvec;
};
class PersistentIndexSstableStreamBuilder {

View File

@ -162,7 +162,6 @@ public:
_schema->column(0).type() == LogicalType::TYPE_CHAR) {
_enable_pk_parallel_execution = true;
}
return;
}
bool enable_pk_parallel_execution() const { return _enable_pk_parallel_execution; }

View File

@ -271,7 +271,8 @@ Status UpdateManager::publish_primary_key_tablet(const TxnLogPB_OpWrite& op_writ
TRACE_COUNTER_SCOPE_LATENCY_US("update_index_latency_us");
DCHECK(state.upserts(segment_id) != nullptr);
if (condition_column < 0) {
RETURN_IF_ERROR(_do_update(rowset_id, segment_id, state.upserts(segment_id), index, &new_deletes));
RETURN_IF_ERROR(_do_update(rowset_id, segment_id, state.upserts(segment_id), index, &new_deletes,
op_write.ssts_size() > 0 /* read pk index only when ingest sst */));
} else {
RETURN_IF_ERROR(_do_update_with_condition(params, rowset_id, segment_id, condition_column,
state.upserts(segment_id)->pk_column, index, &new_deletes));
@ -284,6 +285,12 @@ Status UpdateManager::publish_primary_key_tablet(const TxnLogPB_OpWrite& op_writ
_index_cache.update_object_size(index_entry, index.memory_usage());
state.release_segment(segment_id);
_update_state_cache.update_object_size(state_entry, state.memory_usage());
if (op_write.ssts_size() > 0 && condition_column < 0) {
// TODO support condition column with sst ingestion.
// rowset_id + segment_id is the rssid of this segment
RETURN_IF_ERROR(index.ingest_sst(op_write.ssts(segment_id), rowset_id + segment_id, metadata->version(),
false /* no compaction */, nullptr));
}
}
// 3. Handle del files one by one.
@ -410,11 +417,22 @@ Status UpdateManager::publish_column_mode_partial_update(const TxnLogPB_OpWrite&
}
Status UpdateManager::_do_update(uint32_t rowset_id, int32_t upsert_idx, const SegmentPKEncodeResultPtr& upsert,
PrimaryIndex& index, DeletesMap* new_deletes) {
PrimaryIndex& index, DeletesMap* new_deletes, bool skip_pk_index_update) {
TRACE_COUNTER_SCOPE_LATENCY_US("do_update_latency_us");
for (; !upsert->done(); upsert->next()) {
auto current = upsert->current();
RETURN_IF_ERROR(index.upsert(rowset_id + upsert_idx, current.second, *current.first, new_deletes));
if (skip_pk_index_update) {
// use get instead of upsert
std::vector<uint64_t> old_values(current.first->size(), NullIndexValue);
RETURN_IF_ERROR(index.get(*current.first, &old_values));
for (unsigned long old : old_values) {
if (old != NullIndexValue) {
(*new_deletes)[(uint32_t)(old >> 32)].push_back((uint32_t)(old & ROWID_MASK));
}
}
} else {
RETURN_IF_ERROR(index.upsert(rowset_id + upsert_idx, current.second, *current.first, new_deletes));
}
}
return upsert->status();
}
@ -799,9 +817,20 @@ Status UpdateManager::light_publish_primary_compaction(const TxnLogPB_OpCompacti
auto resolver = std::make_unique<LakePrimaryKeyCompactionConflictResolver>(&metadata, &output_rowset, _tablet_mgr,
builder, &index, txn_id, base_version,
&segment_id_to_add_dels, &delvecs);
RETURN_IF_ERROR(resolver->execute());
if (op_compaction.ssts_size() > 0) {
RETURN_IF_ERROR(resolver->execute_without_update_index());
} else {
RETURN_IF_ERROR(resolver->execute());
}
// 3. ingest ssts to index
DCHECK(delvecs.size() == op_compaction.ssts_size());
for (int i = 0; i < op_compaction.ssts_size(); i++) {
// metadata.next_rowset_id() + i is the rssid of output rowset's i-th segment
RETURN_IF_ERROR(index.ingest_sst(op_compaction.ssts(i), metadata.next_rowset_id() + i, metadata.version(),
true /* is compaction */, delvecs[i].second));
}
_index_cache.update_object_size(index_entry, index.memory_usage());
// 3. update TabletMeta and write to meta file
// 4. update TabletMeta and write to meta file
for (auto&& each : delvecs) {
builder->append_delvec(each.second, each.first);
}

View File

@ -211,7 +211,7 @@ private:
// print memory tracker state
void _print_memory_stats();
Status _do_update(uint32_t rowset_id, int32_t upsert_idx, const SegmentPKEncodeResultPtr& upsert,
PrimaryIndex& index, DeletesMap* new_deletes);
PrimaryIndex& index, DeletesMap* new_deletes, bool skip_pk_index_update);
Status _do_update_with_condition(const RowsetUpdateStateParams& params, uint32_t rowset_id, int32_t upsert_idx,
int32_t condition_column, const MutableColumnPtr& upsert, PrimaryIndex& index,

View File

@ -62,6 +62,13 @@ Status LocalPrimaryKeyCompactionConflictResolver::segment_iterator(
});
}
// Segment-based overload of segment_iterator for the local resolver.
// It is a stub: `handler` is never invoked and the call always returns
// Status::NotSupported.
Status LocalPrimaryKeyCompactionConflictResolver::segment_iterator(
const std::function<Status(const CompactConflictResolveParams&, const std::vector<std::shared_ptr<Segment>>&,
const std::function<void(uint32_t, const DelVectorPtr&, uint32_t)>&)>& handler) {
return Status::NotSupported(
"LocalPrimaryKeyCompactionConflictResolver::segment_iterator without read data not supported");
}
// Forwards the breakpoint check to `_tablet->updates()` and returns its status unchanged.
Status LocalPrimaryKeyCompactionConflictResolver::breakpoint_check() {
return _tablet->updates()->breakpoint_check();
}

View File

@ -43,6 +43,10 @@ public:
const std::function<Status(const CompactConflictResolveParams&, const std::vector<ChunkIteratorPtr>&,
const std::function<void(uint32_t, const DelVectorPtr&, uint32_t)>&)>& handler)
override;
Status segment_iterator(
const std::function<
Status(const CompactConflictResolveParams&, const std::vector<std::shared_ptr<Segment>>&,
const std::function<void(uint32_t, const DelVectorPtr&, uint32_t)>&)>& handler) override;
Status breakpoint_check() override;
private:

View File

@ -122,4 +122,55 @@ Status PrimaryKeyCompactionConflictResolver::execute() {
return mapper_iter.status();
}
// Builds the delvecs of the compaction output rowset without reading or
// writing `params.index`.
//
// For each output segment, the rows mapper file yields one (rssid, rowid) pair
// per output row, identifying the input row it was copied from. An output row
// is marked deleted when its input row is present in the input segment's
// delvec loaded at `params.base_version`. Exactly one delvec (possibly empty)
// is reported per output segment through `handle_delvec_result_func`.
//
// Returns the first error hit while opening/reading the mapper file, loading a
// delvec, or in breakpoint_check(); otherwise the mapper iterator's final status.
Status PrimaryKeyCompactionConflictResolver::execute_without_update_index() {
    // init rows mapper iter
    ASSIGN_OR_RETURN(auto filename, filename());
    RowsMapperIterator mapper_iter;
    RETURN_IF_ERROR(mapper_iter.open(filename));
    // 1. iterate all segment in output rowset
    RETURN_IF_ERROR(segment_iterator(
            [&](const CompactConflictResolveParams& params, const std::vector<std::shared_ptr<Segment>>& segments,
                const std::function<void(uint32_t, const DelVectorPtr&, uint32_t)>& handle_delvec_result_func) {
                // Input delvecs are cached per rssid so each one is loaded at most once.
                std::map<uint32_t, DelVectorPtr> rssid_to_delvec;
                for (size_t segment_id = 0; segment_id < segments.size(); segment_id++) {
                    RETURN_IF_ERROR(breakpoint_check());
                    // 2. get input rssid & rowids, so we can generate delvec
                    std::vector<uint32_t> tmp_deletes;
                    std::vector<uint64_t> rssid_rowids;
                    RETURN_IF_ERROR(mapper_iter.next_values(segments[segment_id]->num_rows(), &rssid_rowids));
                    DCHECK(segments[segment_id]->num_rows() == rssid_rowids.size());
                    for (size_t i = 0; i < rssid_rowids.size(); ++i) {
                        // High 32 bits: input segment id (rssid); low 32 bits: row id inside it.
                        const auto rssid = static_cast<uint32_t>(rssid_rowids[i] >> 32);
                        const auto rowid = static_cast<uint32_t>(rssid_rowids[i] & 0xffffffff);
                        // Single lookup per row; only a cache miss triggers a load + insert.
                        auto cached = rssid_to_delvec.find(rssid);
                        if (cached == rssid_to_delvec.end()) {
                            // get delvec by loader
                            DelVectorPtr delvec_ptr;
                            {
                                TRACE_COUNTER_SCOPE_LATENCY_US("compaction_delvec_loader_latency_us");
                                RETURN_IF_ERROR(params.delvec_loader->load({params.tablet_id, rssid},
                                                                           params.base_version, &delvec_ptr));
                            }
                            cached = rssid_to_delvec.emplace(rssid, std::move(delvec_ptr)).first;
                        }
                        const DelVectorPtr& input_delvec = cached->second;
                        if (!input_delvec->empty() && input_delvec->roaring()->contains(rowid)) {
                            // Input row had been deleted, so we need to delete it from output rowset
                            tmp_deletes.push_back(static_cast<uint32_t>(i));
                        }
                    }
                    // 3. generate final delvec
                    DelVectorPtr dv = std::make_shared<DelVector>();
                    if (tmp_deletes.empty()) {
                        dv->init(params.new_version, nullptr, 0);
                    } else {
                        dv->init(params.new_version, tmp_deletes.data(), tmp_deletes.size());
                    }
                    handle_delvec_result_func(params.rowset_id + segment_id, dv, tmp_deletes.size());
                }
                return Status::OK();
            }));
    return mapper_iter.status();
}
} // namespace starrocks

View File

@ -25,6 +25,7 @@ namespace starrocks {
class DelvecLoader;
class Schema;
class PrimaryIndex;
class Segment;
struct CompactConflictResolveParams {
int64_t tablet_id = 0;
@ -45,8 +46,15 @@ public:
const std::function<Status(const CompactConflictResolveParams&, const std::vector<ChunkIteratorPtr>&,
const std::function<void(uint32_t, const DelVectorPtr&, uint32_t)>&)>&
handler) = 0;
// This function won't read data from each segment files. Only need to get segment's row count.
virtual Status segment_iterator(
const std::function<
Status(const CompactConflictResolveParams&, const std::vector<std::shared_ptr<Segment>>&,
const std::function<void(uint32_t, const DelVectorPtr&, uint32_t)>&)>& handler) = 0;
Status execute();
Status execute_without_update_index();
};
} // namespace starrocks

View File

@ -5,6 +5,7 @@
#pragma once
#include "common/status.h"
#include "storage/del_vector.h"
#include "storage/sstable/sstable_predicate_fwd.h"
#include "util/slice.h"
@ -64,6 +65,11 @@ public:
// Return the max rss_rowid the iterator contains.
virtual uint64_t max_rss_rowid() const { return 0; };
// Return the shared rssid & version the iterator contains.
// The base-class defaults are 0 for both.
virtual uint32_t shared_rssid() const { return 0; };
virtual int64_t shared_version() const { return 0; };
// Return the delete vector attached to the iterator; nullptr by default.
virtual DelVectorPtr delvec() const { return nullptr; };
/*
* Return predicate the iterator contains.
* Currently, predicate is available for TwoLevelIterator and MergingIterator, because such

View File

@ -50,6 +50,18 @@ public:
assert(iter_);
return iter_->max_rss_rowid();
}
// Forwards to the wrapped iterator's shared_rssid(); `iter_` must be set (asserted).
uint32_t shared_rssid() const {
assert(iter_);
return iter_->shared_rssid();
}
// Forwards to the wrapped iterator's shared_version(); `iter_` must be set (asserted).
int64_t shared_version() const {
assert(iter_);
return iter_->shared_version();
}
// Forwards to the wrapped iterator's delvec(); `iter_` must be set (asserted).
DelVectorPtr delvec() const {
assert(iter_);
return iter_->delvec();
}
SstablePredicateSPtr predicate() const {
assert(iter_);
return iter_->predicate();

View File

@ -133,6 +133,21 @@ public:
return current_->predicate();
}
// Returns shared_rssid() of `current_`; the iterator must be Valid() (asserted).
uint32_t shared_rssid() const override {
assert(Valid());
return current_->shared_rssid();
}
// Returns shared_version() of `current_`; the iterator must be Valid() (asserted).
int64_t shared_version() const override {
assert(Valid());
return current_->shared_version();
}
// Returns delvec() of `current_`; the iterator must be Valid() (asserted).
DelVectorPtr delvec() const override {
assert(Valid());
return current_->delvec();
}
private:
// Which direction is the iterator moving?
enum Direction { kForward, kReverse };

View File

@ -8,6 +8,7 @@
#include <memory>
#include <string>
#include "storage/del_vector.h"
#include "storage/sstable/sstable_predicate_fwd.h"
namespace starrocks {
@ -131,6 +132,14 @@ struct ReadOptions {
ReadIOStat* stat = nullptr;
SstablePredicateSPtr predicate = nullptr;
// When sst was generated during data write & compaction process,
// these two fields are used to indicate the shared rssid & version
uint32_t shared_rssid = 0;
int64_t shared_version = 0;
// Mark rows that have been deleted in this sst
DelVectorPtr delvec = nullptr;
};
// Options that control write operations

View File

@ -124,6 +124,10 @@ public:
uint64_t max_rss_rowid() const override { return options_.max_rss_rowid; }
SstablePredicateSPtr predicate() const override { return options_.predicate; }
// The three accessors below return the corresponding fields of `options_` unchanged.
uint32_t shared_rssid() const override { return options_.shared_rssid; };
int64_t shared_version() const override { return options_.shared_version; };
DelVectorPtr delvec() const override { return options_.delvec; };
private:
void SaveError(const Status& s) {
if (status_.ok() && !s.ok()) status_ = s;

View File

@ -226,7 +226,7 @@ TEST_F(PersistentIndexSstableTest, test_persistent_index_sstable) {
PersistentIndexSstablePB sstable_pb;
sstable_pb.set_filename(filename);
sstable_pb.set_filesize(filesize);
ASSERT_OK(sst->init(std::move(read_file), sstable_pb, cache_ptr.get()));
ASSERT_OK(sst->init(std::move(read_file), sstable_pb, cache_ptr.get(), nullptr, 0));
// check memory usage
ASSERT_TRUE(sst->memory_usage() > 0);
@ -519,7 +519,7 @@ TEST_F(PersistentIndexSstableTest, test_persistent_index_sstable_stream_builder)
PersistentIndexSstablePB sstable_pb;
sstable_pb.set_filename(filename);
sstable_pb.set_filesize(file_size);
ASSERT_OK(sst->init(std::move(read_file), sstable_pb, cache_ptr.get()));
ASSERT_OK(sst->init(std::move(read_file), sstable_pb, cache_ptr.get(), nullptr, 0));
// 4. Iterate and verify all keys
sstable::ReadIOStat stat;

View File

@ -33,6 +33,7 @@
#include "storage/lake/fixed_location_provider.h"
#include "storage/lake/join_path.h"
#include "storage/lake/tablet_manager.h"
#include "storage/lake/tablet_reader.h"
#include "storage/lake/txn_log.h"
#include "storage/rowset/segment.h"
#include "storage/rowset/segment_options.h"
@ -52,6 +53,8 @@ inline std::shared_ptr<TabletMetadataPB> generate_tablet_metadata(KeysType keys_
metadata->set_version(1);
metadata->set_cumulative_point(0);
metadata->set_next_rowset_id(1);
metadata->set_enable_persistent_index(true);
metadata->set_persistent_index_type(PersistentIndexTypePB::CLOUD_NATIVE);
//
// | column | type | KEY | NULL |
// +--------+------+-----+------+
@ -115,6 +118,29 @@ protected:
return Chunk({std::move(c0), std::move(c1)}, _schema);
}
// Test helper: reads every row of `tablet_id` at `version` through a
// TabletReader and returns them concatenated into a single chunk.
// Aborts the test process (ASSIGN_OR_ABORT / CHECK_OK) on any error other
// than end-of-file.
ChunkPtr read(int64_t tablet_id, int64_t version) {
    ASSIGN_OR_ABORT(auto metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version));
    auto reader = std::make_shared<TabletReader>(_tablet_mgr.get(), metadata, *_schema);
    CHECK_OK(reader->prepare());
    CHECK_OK(reader->open(TabletReaderParams()));

    auto all_rows = ChunkHelper::new_chunk(*_schema, 128);
    for (;;) {
        // A fresh batch chunk is used for every get_next() call.
        auto batch = ChunkHelper::new_chunk(*_schema, 128);
        auto st = reader->get_next(batch.get());
        if (st.is_end_of_file()) {
            return all_rows;
        }
        CHECK_OK(st);
        all_rows->append(*batch);
    }
}
// Test helper: number of rows returned by read(tablet_id, version).
int64_t read_rows(int64_t tablet_id, int64_t version) {
    return read(tablet_id, version)->num_rows();
}
constexpr static const char* const kTestDirectory = "test_pk_tablet_sst_writer";
std::shared_ptr<TabletMetadata> _tablet_metadata;
@ -314,4 +340,257 @@ TEST_F(PkTabletSSTWriterTest, test_publish_multi_segments_with_sst) {
config::enable_pk_parallel_execution = old_enable_pk_parallel_execution;
}
// Imports three disjoint 100-row chunks in a single transaction with pk
// parallel execution enabled, checks that the txn log carries sst files, then
// publishes and verifies that all 300 rows are readable.
TEST_F(PkTabletSSTWriterTest, test_parallel_execution_data_import) {
    // Snapshot the global config knobs this test changes and restore them on
    // every exit path. ASSERT_* returns from the test body on failure, so a
    // restore placed at the end of the test would be skipped and the modified
    // values would leak into later tests.
    struct ConfigGuard {
        const int64_t write_buffer_size = config::write_buffer_size;
        const bool enable_pk_parallel_execution = config::enable_pk_parallel_execution;
        ~ConfigGuard() {
            config::write_buffer_size = write_buffer_size;
            config::enable_pk_parallel_execution = enable_pk_parallel_execution;
        }
    } config_guard;

    const int64_t tablet_id = _tablet_metadata->id();
    config::write_buffer_size = 1024;
    config::enable_pk_parallel_execution = true;

    auto chunk0 = generate_data(100, 0);
    auto chunk1 = generate_data(100, 100);
    auto chunk2 = generate_data(100, 200);
    auto indexes = std::vector<uint32_t>(chunk0.num_rows());
    for (uint32_t i = 0, n = chunk0.num_rows(); i < n; i++) {
        indexes[i] = i;
    }

    int version = 1;
    int64_t txn_id = next_id();
    ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
                                               .set_tablet_manager(_tablet_mgr.get())
                                               .set_tablet_id(tablet_id)
                                               .set_txn_id(txn_id)
                                               .set_partition_id(_partition_id)
                                               .set_mem_tracker(_mem_tracker.get())
                                               .set_schema_id(_tablet_schema->id())
                                               .set_profile(&_dummy_runtime_profile)
                                               .build());
    ASSERT_OK(delta_writer->open());
    ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size()));
    ASSERT_OK(delta_writer->write(chunk1, indexes.data(), indexes.size()));
    ASSERT_OK(delta_writer->write(chunk2, indexes.data(), indexes.size()));
    ASSERT_OK(delta_writer->finish_with_txnlog());
    delta_writer->close();

    // The write must have produced sst files for the publish step to ingest.
    ASSIGN_OR_ABORT(auto txn_log, _tablet_mgr->get_txn_log(tablet_id, txn_id));
    EXPECT_GT(txn_log->op_write().ssts_size(), 0);

    ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status());
    EXPECT_TRUE(_update_mgr->TEST_check_update_state_cache_absent(tablet_id, txn_id));

    // Verify the imported data by reading rows
    int64_t expected_rows = 300; // chunk0(100) + chunk1(100) + chunk2(100)
    int64_t actual_rows = read_rows(tablet_id, version + 1);
    EXPECT_EQ(expected_rows, actual_rows);
}
// Publishes three consecutive 50-row writes whose key ranges overlap, each
// carrying sst files, and verifies that the final version holds 100 rows.
TEST_F(PkTabletSSTWriterTest, test_parallel_execution_with_update_operations) {
    // Snapshot the global config knobs this test changes and restore them on
    // every exit path. ASSERT_* returns from the test body on failure, so a
    // restore placed at the end of the test would be skipped and the modified
    // values would leak into later tests.
    struct ConfigGuard {
        const int64_t write_buffer_size = config::write_buffer_size;
        const bool enable_pk_parallel_execution = config::enable_pk_parallel_execution;
        ~ConfigGuard() {
            config::write_buffer_size = write_buffer_size;
            config::enable_pk_parallel_execution = enable_pk_parallel_execution;
        }
    } config_guard;

    const int64_t tablet_id = _tablet_metadata->id();
    config::write_buffer_size = 1024;
    config::enable_pk_parallel_execution = true;

    // chunk0 is only used to size the row-index array shared by every write below.
    auto chunk0 = generate_data(50, 0);
    auto indexes = std::vector<uint32_t>(chunk0.num_rows());
    for (uint32_t i = 0, n = chunk0.num_rows(); i < n; i++) {
        indexes[i] = i;
    }

    int version = 1;
    for (int i = 0; i < 3; i++) {
        int64_t txn_id = next_id();
        ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
                                                   .set_tablet_manager(_tablet_mgr.get())
                                                   .set_tablet_id(tablet_id)
                                                   .set_txn_id(txn_id)
                                                   .set_partition_id(_partition_id)
                                                   .set_mem_tracker(_mem_tracker.get())
                                                   .set_schema_id(_tablet_schema->id())
                                                   .set_profile(&_dummy_runtime_profile)
                                                   .build());
        ASSERT_OK(delta_writer->open());
        auto chunk_update = generate_data(50, i * 25);
        ASSERT_OK(delta_writer->write(chunk_update, indexes.data(), indexes.size()));
        ASSERT_OK(delta_writer->finish_with_txnlog());
        delta_writer->close();

        // Each write must have produced sst files for the publish step to ingest.
        ASSIGN_OR_ABORT(auto txn_log, _tablet_mgr->get_txn_log(tablet_id, txn_id));
        EXPECT_GT(txn_log->op_write().ssts_size(), 0);

        ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status());
        version++;
    }

    // Verify the final result after multiple updates
    // With overlapping keys, the final count should be based on unique primary keys
    int64_t final_rows = read_rows(tablet_id, version);
    EXPECT_EQ(final_rows, 100);
}
// Loads the same three chunks in four separate transactions with PK parallel
// execution enabled, compacts the published rowsets, and checks that both the
// write txn logs and the compaction txn log carry sst files and that the row
// count read back after compaction is the expected one.
TEST_F(PkTabletSSTWriterTest, test_parallel_execution_compaction_consistency) {
    // Snapshot the config globals touched below and restore them on every exit
    // path. ASSERT_* macros return from the test body on a fatal failure, so
    // restoring only at the end of the body would leak the overrides into the
    // tests that run afterwards.
    struct ConfigRestorer {
        const int64_t write_buffer_size = config::write_buffer_size;
        const bool enable_pk_parallel_execution = config::enable_pk_parallel_execution;
        const decltype(config::lake_pk_compaction_min_input_segments) min_input_segments =
                config::lake_pk_compaction_min_input_segments;
        ~ConfigRestorer() {
            config::write_buffer_size = write_buffer_size;
            config::enable_pk_parallel_execution = enable_pk_parallel_execution;
            config::lake_pk_compaction_min_input_segments = min_input_segments;
        }
    } config_restorer;

    const int64_t tablet_id = _tablet_metadata->id();
    config::write_buffer_size = 512;
    config::enable_pk_parallel_execution = true;
    config::lake_pk_compaction_min_input_segments = 2;

    auto chunk0 = generate_data(80, 0);
    auto chunk1 = generate_data(80, 80);
    auto chunk2 = generate_data(80, 160);
    // Identity selection: every row of a chunk is written, in order.
    auto indexes = std::vector<uint32_t>(chunk0.num_rows());
    for (uint32_t i = 0, n = chunk0.num_rows(); i < n; i++) {
        indexes[i] = i;
    }

    int version = 1;
    // Four load transactions, each publishing one new version.
    for (int i = 0; i < 4; i++) {
        int64_t txn_id = next_id();
        ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
                                                   .set_tablet_manager(_tablet_mgr.get())
                                                   .set_tablet_id(tablet_id)
                                                   .set_txn_id(txn_id)
                                                   .set_partition_id(_partition_id)
                                                   .set_mem_tracker(_mem_tracker.get())
                                                   .set_schema_id(_tablet_schema->id())
                                                   .set_profile(&_dummy_runtime_profile)
                                                   .build());
        ASSERT_OK(delta_writer->open());
        ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size()));
        ASSERT_OK(delta_writer->write(chunk1, indexes.data(), indexes.size()));
        ASSERT_OK(delta_writer->write(chunk2, indexes.data(), indexes.size()));
        ASSERT_OK(delta_writer->finish_with_txnlog());
        delta_writer->close();

        // The write txn log is expected to reference at least one sst file.
        ASSIGN_OR_ABORT(auto txn_log, _tablet_mgr->get_txn_log(tablet_id, txn_id));
        EXPECT_GT(txn_log->op_write().ssts_size(), 0);
        ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status());
        version++;
    }

    // Compact on top of the latest published version.
    int64_t compaction_txn_id = next_id();
    auto task_context =
            std::make_unique<CompactionTaskContext>(compaction_txn_id, tablet_id, version, false, false, nullptr);
    ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get()));
    ASSERT_OK(task->execute(CompactionTask::kNoCancelFn));
    EXPECT_EQ(100, task_context->progress.value());

    // The compaction must consume all four rowsets and emit sst files as well.
    ASSIGN_OR_ABORT(auto compaction_txn_log, _tablet_mgr->get_txn_log(tablet_id, compaction_txn_id));
    EXPECT_EQ(compaction_txn_log->op_compaction().input_rowsets_size(), 4);
    EXPECT_GT(compaction_txn_log->op_compaction().ssts_size(), 0);
    ASSERT_OK(publish_single_version(tablet_id, version + 1, compaction_txn_id).status());

    // Verify data consistency after compaction with parallel execution
    int64_t rows_after_compaction = read_rows(tablet_id, version + 1);
    EXPECT_EQ(rows_after_compaction, 240);
}
// Writes the same chunk twice: first with PK parallel execution enabled
// (published as version 2), then with it disabled (published as version 3).
// Checks that only the parallel write records sst files in its txn log and
// that both versions read back the same number of rows.
TEST_F(PkTabletSSTWriterTest, test_parallel_execution_vs_serial_execution_results) {
    // Snapshot the config globals touched below and restore them on every exit
    // path. ASSERT_* macros return from the test body on a fatal failure, so
    // restoring only at the end of the body would leak the overrides into the
    // tests that run afterwards.
    struct ConfigRestorer {
        const int64_t write_buffer_size = config::write_buffer_size;
        const bool enable_pk_parallel_execution = config::enable_pk_parallel_execution;
        ~ConfigRestorer() {
            config::write_buffer_size = write_buffer_size;
            config::enable_pk_parallel_execution = enable_pk_parallel_execution;
        }
    } config_restorer;

    const int64_t tablet_id = _tablet_metadata->id();
    config::write_buffer_size = 1024;

    auto chunk0 = generate_data(60, 0);
    // Identity selection: every row of the chunk is written, in order.
    auto indexes = std::vector<uint32_t>(chunk0.num_rows());
    for (uint32_t i = 0, n = chunk0.num_rows(); i < n; i++) {
        indexes[i] = i;
    }

    std::vector<FileMetaPB> parallel_ssts;
    std::vector<FileMetaPB> serial_ssts;

    // Pass 1: parallel execution enabled, published as version 2.
    config::enable_pk_parallel_execution = true;
    {
        int64_t txn_id = next_id();
        ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
                                                   .set_tablet_manager(_tablet_mgr.get())
                                                   .set_tablet_id(tablet_id)
                                                   .set_txn_id(txn_id)
                                                   .set_partition_id(_partition_id)
                                                   .set_mem_tracker(_mem_tracker.get())
                                                   .set_schema_id(_tablet_schema->id())
                                                   .set_profile(&_dummy_runtime_profile)
                                                   .build());
        ASSERT_OK(delta_writer->open());
        ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size()));
        ASSERT_OK(delta_writer->finish_with_txnlog());
        delta_writer->close();
        ASSIGN_OR_ABORT(auto txn_log, _tablet_mgr->get_txn_log(tablet_id, txn_id));
        for (const auto& sst : txn_log->op_write().ssts()) {
            parallel_ssts.push_back(sst);
        }
        ASSERT_OK(publish_single_version(tablet_id, 2, txn_id).status());
    }

    // Pass 2: parallel execution disabled, same data, published as version 3.
    config::enable_pk_parallel_execution = false;
    {
        int64_t txn_id = next_id();
        ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
                                                   .set_tablet_manager(_tablet_mgr.get())
                                                   .set_tablet_id(tablet_id)
                                                   .set_txn_id(txn_id)
                                                   .set_partition_id(_partition_id)
                                                   .set_mem_tracker(_mem_tracker.get())
                                                   .set_schema_id(_tablet_schema->id())
                                                   .set_profile(&_dummy_runtime_profile)
                                                   .build());
        ASSERT_OK(delta_writer->open());
        ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size()));
        ASSERT_OK(delta_writer->finish_with_txnlog());
        delta_writer->close();
        ASSIGN_OR_ABORT(auto txn_log, _tablet_mgr->get_txn_log(tablet_id, txn_id));
        for (const auto& sst : txn_log->op_write().ssts()) {
            serial_ssts.push_back(sst);
        }
        ASSERT_OK(publish_single_version(tablet_id, 3, txn_id).status());
    }

    // Only the parallel write is expected to record sst files in its txn log.
    EXPECT_FALSE(parallel_ssts.empty());
    EXPECT_TRUE(serial_ssts.empty());

    // Verify both parallel and serial execution produce correct results
    int64_t parallel_rows = read_rows(tablet_id, 2);
    int64_t serial_rows = read_rows(tablet_id, 3);
    EXPECT_EQ(60, parallel_rows);          // Should have 60 unique rows
    EXPECT_EQ(60, serial_rows);            // Should have same 60 unique rows
    EXPECT_EQ(parallel_rows, serial_rows); // Both should produce same result
}
} // namespace starrocks::lake