[Enhancement] Enable pk parallel execution when handling large imports and compactions (#63219)

Signed-off-by: luohaha <18810541851@163.com>
Yixin Luo 2025-09-23 20:20:19 +08:00 committed by GitHub
parent cff0ceb1e9
commit 5ae19c32f4
26 changed files with 325 additions and 120 deletions

View File

@ -413,6 +413,9 @@ CONF_Bool(enable_size_tiered_compaction_strategy, "true");
CONF_mBool(enable_pk_size_tiered_compaction_strategy, "true");
// Enable parallel execution within tablet for primary key tables.
CONF_mBool(enable_pk_parallel_execution, "false");
// The minimum threshold of data size for enabling pk parallel execution.
// Default is 300MB.
CONF_mInt64(pk_parallel_execution_threshold_bytes, "314572800");
// We support real-time compaction strategy for primary key tables in shared-data mode.
// This real-time compaction strategy enables compacting rowsets across multiple levels simultaneously.
// The parameter `size_tiered_max_compaction_level` defines the maximum compaction level allowed in a single compaction task.
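
Taken together, the two new knobs gate the feature as sketched below. This mirrors the compaction hunks later in the diff and is only an illustration, not additional patch code:

```cpp
// Callers compare the input size against the threshold; the writer-side
// try_enable_pk_parallel_execution() additionally checks the feature flag
// and the schema (primary-key table, no separate sort key).
if (input_bytes >= config::pk_parallel_execution_threshold_bytes) {
    writer->try_enable_pk_parallel_execution(); // no-op unless enable_pk_parallel_execution is true
}
```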

View File

@ -35,9 +35,11 @@ Status HorizontalCompactionTask::execute(CancelFunc cancel_func, ThreadPool* flu
SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(_mem_tracker.get());
int64_t total_num_rows = 0;
int64_t input_bytes = 0;
for (auto& rowset : _input_rowsets) {
total_num_rows += rowset->num_rows();
_context->stats->read_segment_count += rowset->num_segments();
input_bytes += rowset->data_size_after_deletion();
}
ASSIGN_OR_RETURN(auto chunk_size, calculate_chunk_size());
@ -63,6 +65,10 @@ Status HorizontalCompactionTask::execute(CancelFunc cancel_func, ThreadPool* flu
RETURN_IF_ERROR(writer->open());
DeferOp defer([&]() { writer->close(); });
if (input_bytes >= config::pk_parallel_execution_threshold_bytes) {
writer->try_enable_pk_parallel_execution();
}
auto chunk = ChunkHelper::new_chunk(schema, chunk_size);
auto char_field_indexes = ChunkHelper::get_char_field_indexes(schema);
std::vector<uint64_t> rssid_rowids;

View File

@ -50,7 +50,8 @@ Status KeyValueMerger::merge(const sstable::Iterator* iter_ptr) {
return Status::OK();
}
// filter rows which have already been deleted in this sst
if (iter_ptr->delvec() != nullptr && iter_ptr->delvec()->roaring()->contains(index_value_ver.values(0).rowid())) {
if (iter_ptr->delvec() != nullptr && !iter_ptr->delvec()->empty() &&
iter_ptr->delvec()->roaring()->contains(index_value_ver.values(0).rowid())) {
// this row has been deleted in this sst, skip it
return Status::OK();
}
@ -134,7 +135,7 @@ Status KeyValueMerger::merge(const sstable::Iterator* iter_ptr) {
_index_value_vers.swap(t);
}
} else {
flush();
RETURN_IF_ERROR(flush());
_key = key;
_max_rss_rowid = max_rss_rowid;
_index_value_vers.emplace_front(version, index_value);
@ -142,9 +143,9 @@ Status KeyValueMerger::merge(const sstable::Iterator* iter_ptr) {
return Status::OK();
}
void KeyValueMerger::flush() {
Status KeyValueMerger::flush() {
if (_index_value_vers.empty()) {
return;
return Status::OK();
}
IndexValuesWithVerPB index_value_pb;
@ -159,9 +160,11 @@ void KeyValueMerger::flush() {
value->set_rowid(index_value_with_ver.second.get_rowid());
}
if (index_value_pb.values_size() > 0) {
_builder->Add(Slice(_key), Slice(index_value_pb.SerializeAsString()));
RETURN_IF_ERROR(_builder->Add(Slice(_key), Slice(index_value_pb.SerializeAsString())));
}
_index_value_vers.clear();
return Status::OK();
}
LakePersistentIndex::LakePersistentIndex(TabletManager* tablet_mgr, int64_t tablet_id)
@ -285,8 +288,9 @@ Status LakePersistentIndex::ingest_sst(const FileMetaPB& sst_meta, uint32_t rssi
sstable_pb.set_encryption_meta(sst_meta.encryption_meta());
// sstable generated by compaction needs delvec to resolve conflicts.
sstable_pb.set_has_delvec(is_compaction);
// use UINT32_MAX as max rowid here to indicate all rows of this segment are already contained in this sst
sstable_pb.set_max_rss_rowid((static_cast<uint64_t>(rssid) << 32) | UINT32_MAX);
// use UINT32_MAX - 1 as max rowid here to indicate all rows of this segment are already contained in this sst.
// We don't use UINT32_MAX as max rowid because it is reserved for delete rows.
sstable_pb.set_max_rss_rowid((static_cast<uint64_t>(rssid) << 32) | (UINT32_MAX - 1));
RETURN_IF_ERROR(sstable->init(std::move(rf), sstable_pb, block_cache->cache(), _tablet_mgr, _tablet_id,
true /* need filter */, std::move(delvec)));
_sstables.emplace_back(std::move(sstable));
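
For reference, the 64-bit rss_rowid above packs the rssid into the upper 32 bits and the rowid into the lower 32 bits. The helpers below are a hypothetical illustration of that encoding, not part of the patch:

```cpp
#include <cstdint>

// Hypothetical helpers illustrating the rss_rowid packing used by ingest_sst().
constexpr uint64_t pack_rss_rowid(uint32_t rssid, uint32_t rowid) {
    return (static_cast<uint64_t>(rssid) << 32) | rowid;
}
constexpr uint32_t rssid_of(uint64_t rss_rowid) { return static_cast<uint32_t>(rss_rowid >> 32); }
constexpr uint32_t rowid_of(uint64_t rss_rowid) { return static_cast<uint32_t>(rss_rowid); }

// UINT32_MAX is reserved for delete rows, so an sst produced by compaction uses
// UINT32_MAX - 1 to mean "all rows of this segment are already contained in this sst".
static_assert(rssid_of(pack_rss_rowid(7, UINT32_MAX - 1)) == 7);
static_assert(rowid_of(pack_rss_rowid(7, UINT32_MAX - 1)) == UINT32_MAX - 1);
```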
@ -534,7 +538,7 @@ Status LakePersistentIndex::merge_sstables(std::unique_ptr<sstable::Iterator> it
iter_ptr->Next();
}
RETURN_IF_ERROR(iter_ptr->status());
merger->finish();
RETURN_IF_ERROR(merger->finish());
return builder->Finish();
}

View File

@ -48,10 +48,10 @@ public:
Status merge(const sstable::Iterator* iter_ptr);
void finish() { flush(); }
Status finish() { return flush(); }
private:
void flush();
Status flush();
private:
std::string _key;

View File

@ -70,7 +70,7 @@ Status PersistentIndexSstable::build_sstable(const phmap::btree_map<std::string,
value->set_version(v.first);
value->set_rssid(v.second.get_rssid());
value->set_rowid(v.second.get_rowid());
builder.Add(Slice(k), Slice(index_value_pb.SerializeAsString()));
RETURN_IF_ERROR(builder.Add(Slice(k), Slice(index_value_pb.SerializeAsString())));
}
RETURN_IF_ERROR(builder.Finish());
*filesz = builder.FileSize();
@ -158,17 +158,12 @@ Status PersistentIndexSstableStreamBuilder::add(const Slice& key) {
return Status::InvalidArgument("Builder already finished");
}
if (!_status.ok()) {
return _status;
}
IndexValuesWithVerPB index_value_pb;
auto* val = index_value_pb.add_values();
val->set_rowid(_sst_rowid++);
_table_builder->Add(key, Slice(index_value_pb.SerializeAsString()));
_status = _table_builder->status();
return _status;
RETURN_IF_ERROR(_table_builder->Add(key, Slice(index_value_pb.SerializeAsString())));
return _table_builder->status();
}
Status PersistentIndexSstableStreamBuilder::finish(uint64_t* file_size) {
@ -176,18 +171,12 @@ Status PersistentIndexSstableStreamBuilder::finish(uint64_t* file_size) {
return Status::InvalidArgument("Builder already finished");
}
if (!_status.ok()) {
return _status;
RETURN_IF_ERROR(_table_builder->Finish());
_finished = true;
if (file_size != nullptr) {
*file_size = _table_builder->FileSize();
}
_status = _table_builder->Finish();
if (_status.ok()) {
_finished = true;
if (file_size != nullptr) {
*file_size = _table_builder->FileSize();
}
}
return _status;
return _table_builder->status();
}
uint64_t PersistentIndexSstableStreamBuilder::num_entries() const {
@ -202,8 +191,4 @@ FileInfo PersistentIndexSstableStreamBuilder::file_info() const {
return file_info;
}
Status PersistentIndexSstableStreamBuilder::status() const {
return _status;
}
} // namespace starrocks::lake

View File

@ -88,14 +88,12 @@ public:
uint64_t num_entries() const;
FileInfo file_info() const;
Status status() const;
std::string file_path() const { return _wf->filename(); }
private:
std::unique_ptr<sstable::TableBuilder> _table_builder;
std::unique_ptr<sstable::FilterPolicy> _filter_policy;
std::unique_ptr<WritableFile> _wf;
Status _status;
bool _finished;
std::string _encryption_meta;
uint32_t _sst_rowid = 0;

View File

@ -29,15 +29,28 @@ namespace starrocks::lake {
class PersistentIndexSstableStreamBuilder;
class PkTabletSSTWriter {
class DefaultSSTWriter {
public:
virtual ~DefaultSSTWriter() = default;
virtual Status append_sst_record(const Chunk& data) { return Status::OK(); }
virtual Status reset_sst_writer(const std::shared_ptr<LocationProvider>& location_provider,
const std::shared_ptr<FileSystem>& fs) {
return Status::OK();
}
virtual StatusOr<FileInfo> flush_sst_writer() { return Status::OK(); }
virtual bool has_file_info() const { return false; }
};
class PkTabletSSTWriter : public DefaultSSTWriter {
public:
PkTabletSSTWriter(const TabletSchemaCSPtr& tablet_schema_ptr, TabletManager* tablet_mgr, int64_t tablet_id)
: _tablet_schema_ptr(tablet_schema_ptr), _tablet_mgr(tablet_mgr), _tablet_id(tablet_id) {}
virtual ~PkTabletSSTWriter() = default;
Status append_sst_record(const Chunk& data);
~PkTabletSSTWriter() override = default;
Status append_sst_record(const Chunk& data) override;
Status reset_sst_writer(const std::shared_ptr<LocationProvider>& location_provider,
const std::shared_ptr<FileSystem>& fs);
StatusOr<FileInfo> flush_sst_writer();
const std::shared_ptr<FileSystem>& fs) override;
StatusOr<FileInfo> flush_sst_writer() override;
bool has_file_info() const override { return _pk_sst_builder != nullptr; }
private:
std::unique_ptr<PersistentIndexSstableStreamBuilder> _pk_sst_builder;
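
The no-op DefaultSSTWriter acts as a null object: the tablet writers always hold a DefaultSSTWriter and only swap in a PkTabletSSTWriter when pk parallel execution is enabled, which removes the nullptr checks at the call sites (see the tablet writer hunks below). A minimal sketch of that selection, using a hypothetical helper and the same constructor arguments as above:

```cpp
std::unique_ptr<DefaultSSTWriter> make_sst_writer(bool parallel, const TabletSchemaCSPtr& schema,
                                                  TabletManager* tablet_mgr, int64_t tablet_id) {
    if (parallel) {
        // real writer: produces PK index sst files alongside segment files
        return std::make_unique<PkTabletSSTWriter>(schema, tablet_mgr, tablet_id);
    }
    // no-op writer: every call returns Status::OK() and has_file_info() stays false
    return std::make_unique<DefaultSSTWriter>();
}
```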

View File

@ -51,9 +51,7 @@ HorizontalPkTabletWriter::~HorizontalPkTabletWriter() = default;
Status HorizontalPkTabletWriter::write(const Chunk& data, const std::vector<uint64_t>& rssid_rowids,
SegmentPB* segment) {
RETURN_IF_ERROR(HorizontalGeneralTabletWriter::write(data, segment));
if (_pk_sst_writer != nullptr) {
RETURN_IF_ERROR(_pk_sst_writer->append_sst_record(data));
}
RETURN_IF_ERROR(_pk_sst_writer->append_sst_record(data));
if (_rows_mapper_builder != nullptr) {
RETURN_IF_ERROR(_rows_mapper_builder->append(rssid_rowids));
}
@ -62,9 +60,7 @@ Status HorizontalPkTabletWriter::write(const Chunk& data, const std::vector<uint
Status HorizontalPkTabletWriter::write(const Chunk& data, SegmentPB* segment, bool eos) {
RETURN_IF_ERROR(HorizontalGeneralTabletWriter::write(data, segment, eos));
if (_pk_sst_writer != nullptr) {
RETURN_IF_ERROR(_pk_sst_writer->append_sst_record(data));
}
RETURN_IF_ERROR(_pk_sst_writer->append_sst_record(data));
return Status::OK();
}
@ -97,12 +93,14 @@ Status HorizontalPkTabletWriter::flush_del_file(const Column& deletes) {
Status HorizontalPkTabletWriter::reset_segment_writer(bool eos) {
RETURN_IF_ERROR(HorizontalGeneralTabletWriter::reset_segment_writer(eos));
// reset sst file writer
if (_pk_sst_writer == nullptr && enable_pk_parallel_execution()) {
_pk_sst_writer = std::make_unique<PkTabletSSTWriter>(tablet_schema(), _tablet_mgr, _tablet_id);
}
if (_pk_sst_writer != nullptr) {
RETURN_IF_ERROR(_pk_sst_writer->reset_sst_writer(_location_provider, _fs));
if (_pk_sst_writer == nullptr) {
if (enable_pk_parallel_execution()) {
_pk_sst_writer = std::make_unique<PkTabletSSTWriter>(tablet_schema(), _tablet_mgr, _tablet_id);
} else {
_pk_sst_writer = std::make_unique<DefaultSSTWriter>();
}
}
RETURN_IF_ERROR(_pk_sst_writer->reset_sst_writer(_location_provider, _fs));
return Status::OK();
}
@ -135,10 +133,9 @@ Status HorizontalPkTabletWriter::flush_segment_writer(SegmentPB* segment) {
}
_seg_writer.reset();
}
if (_pk_sst_writer != nullptr) {
if (_pk_sst_writer && _pk_sst_writer->has_file_info()) {
ASSIGN_OR_RETURN(auto sst_file_info, _pk_sst_writer->flush_sst_writer());
_ssts.emplace_back(sst_file_info);
_pk_sst_writer.reset();
}
return Status::OK();
}
@ -171,14 +168,17 @@ Status VerticalPkTabletWriter::write_columns(const Chunk& data, const std::vecto
// Save rssid_rowids only when writing key columns
DCHECK(is_key);
RETURN_IF_ERROR(VerticalGeneralTabletWriter::write_columns(data, column_indexes, is_key));
if (_pk_sst_writers.size() <= _current_writer_index && enable_pk_parallel_execution()) {
auto sst_writer = std::make_unique<PkTabletSSTWriter>(tablet_schema(), _tablet_mgr, _tablet_id);
if (_pk_sst_writers.size() <= _current_writer_index) {
std::unique_ptr<DefaultSSTWriter> sst_writer;
if (enable_pk_parallel_execution()) {
sst_writer = std::make_unique<PkTabletSSTWriter>(tablet_schema(), _tablet_mgr, _tablet_id);
} else {
sst_writer = std::make_unique<DefaultSSTWriter>();
}
RETURN_IF_ERROR(sst_writer->reset_sst_writer(_location_provider, _fs));
_pk_sst_writers.emplace_back(std::move(sst_writer));
}
if (_pk_sst_writers.size() > _current_writer_index) {
RETURN_IF_ERROR(_pk_sst_writers[_current_writer_index]->append_sst_record(data));
}
RETURN_IF_ERROR(_pk_sst_writers[_current_writer_index]->append_sst_record(data));
if (_rows_mapper_builder != nullptr) {
RETURN_IF_ERROR(_rows_mapper_builder->append(rssid_rowids));
}
@ -187,8 +187,10 @@ Status VerticalPkTabletWriter::write_columns(const Chunk& data, const std::vecto
Status VerticalPkTabletWriter::finish(SegmentPB* segment) {
for (auto& sst_writer : _pk_sst_writers) {
ASSIGN_OR_RETURN(auto sst_file_info, sst_writer->flush_sst_writer());
_ssts.emplace_back(sst_file_info);
if (sst_writer->has_file_info()) {
ASSIGN_OR_RETURN(auto sst_file_info, sst_writer->flush_sst_writer());
_ssts.emplace_back(sst_file_info);
}
}
_pk_sst_writers.clear();
if (_rows_mapper_builder != nullptr) {

View File

@ -29,7 +29,7 @@ class BundleWritableFileContext;
namespace starrocks::lake {
class PkTabletSSTWriter;
class DefaultSSTWriter;
class HorizontalPkTabletWriter : public HorizontalGeneralTabletWriter {
public:
@ -64,7 +64,7 @@ protected:
private:
std::unique_ptr<RowsetTxnMetaPB> _rowset_txn_meta;
std::unique_ptr<RowsMapperBuilder> _rows_mapper_builder;
std::unique_ptr<PkTabletSSTWriter> _pk_sst_writer;
std::unique_ptr<DefaultSSTWriter> _pk_sst_writer;
};
class VerticalPkTabletWriter : public VerticalGeneralTabletWriter {
@ -93,7 +93,7 @@ public:
private:
std::unique_ptr<RowsMapperBuilder> _rows_mapper_builder;
std::vector<std::unique_ptr<PkTabletSSTWriter>> _pk_sst_writers;
std::vector<std::unique_ptr<DefaultSSTWriter>> _pk_sst_writers;
};
} // namespace starrocks::lake

View File

@ -567,4 +567,13 @@ Status Rowset::load_segments(std::vector<SegmentPtr>* segments, SegmentReadOptio
return Status::OK();
}
int64_t Rowset::data_size_after_deletion() const {
// get data size after delete vectors are applied
if (num_rows() == 0 || data_size() == 0) {
return 0;
}
// data size * (num_rows - num_deleted_rows) / num_rows
return (int64_t)(data_size() * ((double)(num_rows() - num_dels()) / num_rows()));
}
} // namespace starrocks::lake
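
As a worked example of the scaling above (values are illustrative only), a 100 MB rowset with 1000 rows, 250 of them deleted, reports roughly 75 MB:

```cpp
#include <cstdint>

int64_t example_data_size_after_deletion() {
    int64_t data_size = 100LL * 1024 * 1024; // 100 MB
    int64_t num_rows = 1000;
    int64_t num_dels = 250;
    // same formula as Rowset::data_size_after_deletion()
    return (int64_t)(data_size * ((double)(num_rows - num_dels) / num_rows)); // 78643200 bytes (75 MB)
}
```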

View File

@ -100,6 +100,8 @@ public:
[[nodiscard]] int64_t data_size() const { return metadata().data_size(); }
[[nodiscard]] int64_t data_size_after_deletion() const;
[[nodiscard]] uint32_t id() const { return metadata().id(); }
[[nodiscard]] RowsetId rowset_id() const override;

View File

@ -79,6 +79,18 @@ Status SpillMemTableSink::merge_blocks_to_segments() {
SchemaPtr schema = _load_chunk_spiller->schema();
bool do_agg = schema->keys_type() == KeysType::AGG_KEYS || schema->keys_type() == KeysType::UNIQUE_KEYS;
if (_load_chunk_spiller->total_bytes() >= config::pk_parallel_execution_threshold_bytes) {
// When bulk load happens, try to enable pk parallel execution
_writer->try_enable_pk_parallel_execution();
// When pk parallel execution is enabled, sst files are generated while data is being loaded,
// so we need to make sure no duplicate keys exist in segment files and sst files.
// That means we need to do aggregation during the spill merge.
if (_writer->enable_pk_parallel_execution()) {
do_agg = true;
}
}
auto char_field_indexes = ChunkHelper::get_char_field_indexes(*schema);
auto write_func = [&char_field_indexes, schema, this](Chunk* chunk) {
ChunkHelper::padding_char_columns(char_field_indexes, *schema, _writer->tablet_schema(), chunk);

View File

@ -49,9 +49,7 @@ public:
_schema(std::move(schema)),
_txn_id(txn_id),
_flush_pool(flush_pool),
_is_compaction(is_compaction) {
decide_pk_parallel_execution();
}
_is_compaction(is_compaction) {}
virtual ~TabletWriter() = default;
@ -147,7 +145,10 @@ public:
const DictColumnsValidMap& global_dict_columns_valid_info() const { return _global_dict_columns_valid_info; }
void decide_pk_parallel_execution() {
// When the system determines that pk parallel execution can be enabled
// (for example, during large imports or major compaction tasks), it will invoke this function.
// However, whether pk parallel execution is actually enabled still depends on the schema.
void try_enable_pk_parallel_execution() {
if (!config::enable_pk_parallel_execution || _schema->keys_type() != KeysType::PRIMARY_KEYS ||
_schema->has_separate_sort_key()) {
return;

View File

@ -35,12 +35,15 @@ namespace starrocks::lake {
Status VerticalCompactionTask::execute(CancelFunc cancel_func, ThreadPool* flush_pool) {
SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(_mem_tracker.get());
int64_t input_bytes = 0;
for (auto& rowset : _input_rowsets) {
_total_num_rows += rowset->num_rows();
_total_data_size += rowset->data_size();
_total_input_segs += rowset->is_overlapped() ? rowset->num_segments() : 1;
// do not check `is_overlapped`, we want actual segment count here
_context->stats->read_segment_count += rowset->num_segments();
// real input bytes, with deleted rows excluded
input_bytes += rowset->data_size_after_deletion();
}
const auto& store_paths = ExecEnv::GetInstance()->store_paths();
@ -55,6 +58,10 @@ Status VerticalCompactionTask::execute(CancelFunc cancel_func, ThreadPool* flush
RETURN_IF_ERROR(writer->open());
DeferOp defer([&]() { writer->close(); });
if (input_bytes >= config::pk_parallel_execution_threshold_bytes) {
writer->try_enable_pk_parallel_execution();
}
std::vector<std::vector<uint32_t>> column_groups;
CompactionUtils::split_column_into_groups(_tablet_schema->num_columns(), _tablet_schema->sort_key_idxes(),
config::vertical_compaction_max_columns_per_group, &column_groups);

View File

@ -199,6 +199,10 @@ private:
size_t _block_idx = 0;
};
size_t LoadChunkSpiller::total_bytes() const {
return _block_manager ? _block_manager->total_bytes() : 0;
}
Status LoadChunkSpiller::merge_write(size_t target_size, bool do_sort, bool do_agg,
std::function<Status(Chunk*)> write_func, std::function<Status()> flush_func) {
auto& groups = _block_manager->block_container()->block_groups();

View File

@ -68,6 +68,8 @@ public:
SchemaPtr schema() { return _schema; }
size_t total_bytes() const;
private:
Status _prepare(const ChunkPtr& chunk_ptr);

View File

@ -66,6 +66,7 @@ std::unique_ptr<ThreadPoolToken> LoadSpillBlockMergeExecutor::create_token() {
void LoadSpillBlockContainer::append_block(const spill::BlockPtr& block) {
std::lock_guard guard(_mutex);
_block_groups.back().append(block);
_total_bytes += block->size();
}
void LoadSpillBlockContainer::create_block_group() {

View File

@ -47,12 +47,15 @@ public:
// Not thread safe, UT only
spill::BlockPtr get_block(size_t gid, size_t bid);
std::vector<spill::BlockGroup>& block_groups() { return _block_groups; }
size_t total_bytes() const { return _total_bytes; }
private:
// Mutex for the container.
std::mutex _mutex;
// Blocks generated when loading. Each block group contains multiple blocks which are ordered.
std::vector<spill::BlockGroup> _block_groups;
// total bytes across all block groups
size_t _total_bytes = 0;
};
class LoadSpillBlockManager {
@ -86,6 +89,8 @@ public:
const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; }
size_t total_bytes() const { return _block_container ? _block_container->total_bytes() : 0; }
private:
TUniqueId _load_id; // Unique ID for the load.
TUniqueId _fragment_instance_id; // Unique ID for the fragment instance.

View File

@ -87,16 +87,17 @@ Status TableBuilder::ChangeOptions(const Options& options) {
return Status::OK();
}
void TableBuilder::Add(const Slice& key, const Slice& value) {
Status TableBuilder::Add(const Slice& key, const Slice& value) {
Rep* r = rep_;
assert(!r->closed);
if (!ok()) return;
RETURN_ERROR_IF_FALSE(!r->closed);
if (!ok()) return Status::InternalError("TableBuilder has encountered a previous error");
if (r->num_entries > 0) {
assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0);
RETURN_ERROR_IF_FALSE(r->options.comparator->Compare(key, Slice(r->last_key)) > 0,
"Key must be greater than the previously added key according to comparator");
}
if (r->pending_index_entry) {
assert(r->data_block.empty());
RETURN_ERROR_IF_FALSE(r->data_block.empty(), "Data block must be empty when pending index entry exists");
r->options.comparator->FindShortestSeparator(&r->last_key, key);
std::string handle_encoding;
r->pending_handle.EncodeTo(&handle_encoding);
@ -116,6 +117,7 @@ void TableBuilder::Add(const Slice& key, const Slice& value) {
if (estimated_block_size >= r->options.block_size) {
Flush();
}
return Status::OK();
}
void TableBuilder::Flush() {

View File

@ -44,7 +44,7 @@ public:
// Add key,value to the table being constructed.
// REQUIRES: key is after any previously added key according to comparator.
// REQUIRES: Finish(), Abandon() have not been called
void Add(const Slice& key, const Slice& value);
Status Add(const Slice& key, const Slice& value);
// Advanced operation: flush any buffered key/value pairs to file.
// Can be used to ensure that two adjacent entries never live in

View File

@ -480,7 +480,6 @@ TEST_F(PersistentIndexSstableTest, test_persistent_index_sstable_stream_builder)
// Test initial state
ASSERT_EQ(0, builder->num_entries());
ASSERT_OK(builder->status());
ASSERT_FALSE(builder->file_path().empty());
// Add keys in order
@ -493,7 +492,6 @@ TEST_F(PersistentIndexSstableTest, test_persistent_index_sstable_stream_builder)
// Check state after adding keys
ASSERT_EQ(N, builder->num_entries());
ASSERT_OK(builder->status());
// 2. Finish building
uint64_t file_size = 0;
@ -582,4 +580,38 @@ TEST_F(PersistentIndexSstableTest, test_stream_builder_error_handling) {
ASSERT_ERROR(builder->add(Slice("key3")));
}
TEST_F(PersistentIndexSstableTest, test_table_builder_out_of_order_keys) {
sstable::Options options;
const std::string filename = "test_out_of_order.sst";
ASSIGN_OR_ABORT(auto file, fs::new_writable_file(lake::join_path(kTestDir, filename)));
sstable::TableBuilder builder(options, file.get());
// Add first key
std::string key1 = "test_key_0000000000000002";
IndexValue val1(2);
ASSERT_OK(builder.Add(Slice(key1), Slice(val1.v, 8)));
// Try to add a key that is lexicographically smaller than the previous key
// This should fail due to out-of-order constraint
std::string key2 = "test_key_0000000000000001";
IndexValue val2(1);
ASSERT_ERROR(builder.Add(Slice(key2), Slice(val2.v, 8)));
// Add another key in correct order should work
std::string key3 = "test_key_0000000000000003";
IndexValue val3(3);
ASSERT_OK(builder.Add(Slice(key3), Slice(val3.v, 8)));
// Try to add same key again - should fail
ASSERT_ERROR(builder.Add(Slice(key3), Slice(val3.v, 8)));
// Add key with same prefix but lexicographically smaller - should fail
std::string key4 = "test_key_0000000000000002A";
IndexValue val4(4);
ASSERT_ERROR(builder.Add(Slice(key4), Slice(val4.v, 8)));
// Builder should still be able to clean up (Abandon) properly after the error
builder.Abandon();
}
} // namespace starrocks::lake

View File

@ -290,10 +290,9 @@ TEST_F(PkTabletSSTWriterTest, test_publish_multi_segments_with_sst) {
}
auto version = 1;
auto tablet_id = _tablet_metadata->id();
const int64_t old_size = config::write_buffer_size;
config::write_buffer_size = 1;
const bool old_enable_pk_parallel_execution = config::enable_pk_parallel_execution;
config::enable_pk_parallel_execution = true;
ConfigResetGuard<int64_t> guard(&config::write_buffer_size, 1);
ConfigResetGuard<bool> guard2(&config::enable_pk_parallel_execution, true);
ConfigResetGuard<int64_t> guard3(&config::pk_parallel_execution_threshold_bytes, 1);
for (int i = 0; i < 5; i++) {
int64_t txn_id = next_id();
ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
@ -321,8 +320,7 @@ TEST_F(PkTabletSSTWriterTest, test_publish_multi_segments_with_sst) {
}
// Compaction
{
auto old_val = config::lake_pk_compaction_min_input_segments;
config::lake_pk_compaction_min_input_segments = 1;
ConfigResetGuard<int64_t> guard(&config::lake_pk_compaction_min_input_segments, 1);
int64_t txn_id = next_id();
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, tablet_id, version, false, false, nullptr);
ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get()));
@ -333,20 +331,14 @@ TEST_F(PkTabletSSTWriterTest, test_publish_multi_segments_with_sst) {
EXPECT_EQ(txn_log->op_compaction().ssts_size(), 1);
ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status());
version++;
config::lake_pk_compaction_min_input_segments = old_val;
}
// update memory usage, should be larger than zero
config::write_buffer_size = old_size;
config::enable_pk_parallel_execution = old_enable_pk_parallel_execution;
}
TEST_F(PkTabletSSTWriterTest, test_parallel_execution_data_import) {
const int64_t tablet_id = _tablet_metadata->id();
const int64_t old_size = config::write_buffer_size;
const bool old_enable_pk_parallel_execution = config::enable_pk_parallel_execution;
config::write_buffer_size = 1024;
config::enable_pk_parallel_execution = true;
ConfigResetGuard<int64_t> guard(&config::write_buffer_size, 1024);
ConfigResetGuard<bool> guard2(&config::enable_pk_parallel_execution, true);
ConfigResetGuard<int64_t> guard3(&config::pk_parallel_execution_threshold_bytes, 1);
auto chunk0 = generate_data(100, 0);
auto chunk1 = generate_data(100, 100);
@ -386,18 +378,86 @@ TEST_F(PkTabletSSTWriterTest, test_parallel_execution_data_import) {
int64_t expected_rows = 300; // chunk0(100) + chunk1(100) + chunk2(100)
int64_t actual_rows = read_rows(tablet_id, version + 1);
EXPECT_EQ(expected_rows, actual_rows);
}
config::write_buffer_size = old_size;
config::enable_pk_parallel_execution = old_enable_pk_parallel_execution;
TEST_F(PkTabletSSTWriterTest, test_no_parallel_execution_with_update_operations) {
const int64_t tablet_id = _tablet_metadata->id();
ConfigResetGuard<int64_t> guard(&config::write_buffer_size, 1);
auto chunk0 = generate_data(50, 0);
auto indexes = std::vector<uint32_t>(chunk0.num_rows());
for (uint32_t i = 0, n = chunk0.num_rows(); i < n; i++) {
indexes[i] = i;
}
int version = 1;
for (int i = 0; i < 3; i++) {
// flush sst files
ConfigResetGuard<int64_t> guard2(&config::l0_max_mem_usage, 1);
int64_t txn_id = next_id();
ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
.set_tablet_manager(_tablet_mgr.get())
.set_tablet_id(tablet_id)
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_schema_id(_tablet_schema->id())
.set_profile(&_dummy_runtime_profile)
.build());
ASSERT_OK(delta_writer->open());
auto chunk_update = generate_data(50, i * 25);
ASSERT_OK(delta_writer->write(chunk_update, indexes.data(), indexes.size()));
ASSERT_OK(delta_writer->write(chunk_update, indexes.data(), indexes.size()));
ASSERT_OK(delta_writer->finish_with_txnlog());
delta_writer->close();
ASSIGN_OR_ABORT(auto txn_log, _tablet_mgr->get_txn_log(tablet_id, txn_id));
EXPECT_EQ(txn_log->op_write().ssts_size(), 0);
ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status());
version++;
}
{
int64_t txn_id = next_id();
ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
.set_tablet_manager(_tablet_mgr.get())
.set_tablet_id(tablet_id)
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_schema_id(_tablet_schema->id())
.set_profile(&_dummy_runtime_profile)
.build());
ASSERT_OK(delta_writer->open());
auto chunk_update = generate_data(50, 0);
ASSERT_OK(delta_writer->write(chunk_update, indexes.data(), indexes.size()));
ASSERT_OK(delta_writer->write(chunk_update, indexes.data(), indexes.size()));
ASSERT_OK(delta_writer->finish_with_txnlog());
delta_writer->close();
ASSIGN_OR_ABORT(auto txn_log, _tablet_mgr->get_txn_log(tablet_id, txn_id));
EXPECT_EQ(txn_log->op_write().ssts_size(), 0);
ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status());
version++;
}
// Verify the final result after multiple updates
// With overlapping keys, the final count should be based on unique primary keys
int64_t final_rows = read_rows(tablet_id, version);
EXPECT_EQ(final_rows, 100);
}
TEST_F(PkTabletSSTWriterTest, test_parallel_execution_with_update_operations) {
const int64_t tablet_id = _tablet_metadata->id();
const int64_t old_size = config::write_buffer_size;
const bool old_enable_pk_parallel_execution = config::enable_pk_parallel_execution;
config::write_buffer_size = 1024;
config::enable_pk_parallel_execution = true;
ConfigResetGuard<int64_t> guard(&config::write_buffer_size, 1);
ConfigResetGuard<bool> guard2(&config::enable_pk_parallel_execution, true);
ConfigResetGuard<int64_t> guard3(&config::pk_parallel_execution_threshold_bytes, 1);
auto chunk0 = generate_data(50, 0);
auto indexes = std::vector<uint32_t>(chunk0.num_rows());
@ -423,6 +483,7 @@ TEST_F(PkTabletSSTWriterTest, test_parallel_execution_with_update_operations) {
auto chunk_update = generate_data(50, i * 25);
ASSERT_OK(delta_writer->write(chunk_update, indexes.data(), indexes.size()));
ASSERT_OK(delta_writer->write(chunk_update, indexes.data(), indexes.size()));
ASSERT_OK(delta_writer->finish_with_txnlog());
delta_writer->close();
@ -437,20 +498,14 @@ TEST_F(PkTabletSSTWriterTest, test_parallel_execution_with_update_operations) {
// With overlapping keys, the final count should be based on unique primary keys
int64_t final_rows = read_rows(tablet_id, version);
EXPECT_EQ(final_rows, 100);
config::write_buffer_size = old_size;
config::enable_pk_parallel_execution = old_enable_pk_parallel_execution;
}
TEST_F(PkTabletSSTWriterTest, test_parallel_execution_compaction_consistency) {
const int64_t tablet_id = _tablet_metadata->id();
const int64_t old_size = config::write_buffer_size;
const bool old_enable_pk_parallel_execution = config::enable_pk_parallel_execution;
const auto old_min_segments = config::lake_pk_compaction_min_input_segments;
config::write_buffer_size = 512;
config::enable_pk_parallel_execution = true;
config::lake_pk_compaction_min_input_segments = 2;
ConfigResetGuard<int64_t> guard(&config::lake_pk_compaction_min_input_segments, 2);
ConfigResetGuard<int64_t> guard2(&config::write_buffer_size, 512);
ConfigResetGuard<bool> guard3(&config::enable_pk_parallel_execution, true);
ConfigResetGuard<int64_t> guard4(&config::pk_parallel_execution_threshold_bytes, 1);
auto chunk0 = generate_data(80, 0);
auto chunk1 = generate_data(80, 80);
@ -505,18 +560,13 @@ TEST_F(PkTabletSSTWriterTest, test_parallel_execution_compaction_consistency) {
// Verify data consistency after compaction with parallel execution
int64_t rows_after_compaction = read_rows(tablet_id, version + 1);
EXPECT_EQ(rows_after_compaction, 240);
config::write_buffer_size = old_size;
config::enable_pk_parallel_execution = old_enable_pk_parallel_execution;
config::lake_pk_compaction_min_input_segments = old_min_segments;
}
TEST_F(PkTabletSSTWriterTest, test_parallel_execution_vs_serial_execution_results) {
const int64_t tablet_id = _tablet_metadata->id();
const int64_t old_size = config::write_buffer_size;
const bool old_enable_pk_parallel_execution = config::enable_pk_parallel_execution;
config::write_buffer_size = 1024;
ConfigResetGuard<int64_t> guard(&config::write_buffer_size, 1);
ConfigResetGuard<bool> guard2(&config::enable_pk_parallel_execution, true);
ConfigResetGuard<int64_t> guard3(&config::pk_parallel_execution_threshold_bytes, 1);
auto chunk0 = generate_data(60, 0);
auto indexes = std::vector<uint32_t>(chunk0.num_rows());
@ -527,7 +577,6 @@ TEST_F(PkTabletSSTWriterTest, test_parallel_execution_vs_serial_execution_result
std::vector<FileMetaPB> parallel_ssts;
std::vector<FileMetaPB> serial_ssts;
config::enable_pk_parallel_execution = true;
{
int64_t txn_id = next_id();
ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
@ -542,6 +591,7 @@ TEST_F(PkTabletSSTWriterTest, test_parallel_execution_vs_serial_execution_result
ASSERT_OK(delta_writer->open());
ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size()));
ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size()));
ASSERT_OK(delta_writer->finish_with_txnlog());
delta_writer->close();
@ -553,7 +603,7 @@ TEST_F(PkTabletSSTWriterTest, test_parallel_execution_vs_serial_execution_result
ASSERT_OK(publish_single_version(tablet_id, 2, txn_id).status());
}
config::enable_pk_parallel_execution = false;
ConfigResetGuard<bool> guard4(&config::enable_pk_parallel_execution, false);
{
int64_t txn_id = next_id();
ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
@ -568,6 +618,7 @@ TEST_F(PkTabletSSTWriterTest, test_parallel_execution_vs_serial_execution_result
ASSERT_OK(delta_writer->open());
ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size()));
ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size()));
ASSERT_OK(delta_writer->finish_with_txnlog());
delta_writer->close();
@ -588,9 +639,6 @@ TEST_F(PkTabletSSTWriterTest, test_parallel_execution_vs_serial_execution_result
EXPECT_EQ(60, parallel_rows); // Should have 60 unique rows
EXPECT_EQ(60, serial_rows); // Should have same 60 unique rows
EXPECT_EQ(parallel_rows, serial_rows); // Both should produce same result
config::write_buffer_size = old_size;
config::enable_pk_parallel_execution = old_enable_pk_parallel_execution;
}
} // namespace starrocks::lake

View File

@ -123,6 +123,21 @@ struct PrimaryKeyParam {
bool enable_transparent_data_encryption = false;
};
template <typename T>
class ConfigResetGuard {
public:
ConfigResetGuard(T* val, T new_val) {
_val = val;
_old_val = *val;
*val = new_val;
}
~ConfigResetGuard() { *_val = _old_val; }
private:
T* _val;
T _old_val;
};
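
Usage sketch (mirroring the test changes above): the old value is captured in the constructor and restored in the destructor, so tests no longer need to reset configs by hand.

```cpp
{
    ConfigResetGuard<bool> guard(&config::enable_pk_parallel_execution, true);
    ConfigResetGuard<int64_t> guard2(&config::pk_parallel_execution_threshold_bytes, 1);
    // ... test body runs with the overridden configs ...
} // both configs are restored here when the guards go out of scope
```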
inline StatusOr<TabletMetadataPtr> TEST_publish_single_version(TabletManager* tablet_mgr, int64_t tablet_id,
int64_t new_version, int64_t txn_id,
bool rebuild_pindex) {

View File

@ -1269,6 +1269,24 @@ curl http://<BE_IP>:<BE_HTTP_PORT>/varz
- Description: The number of levels for the Size-tiered Compaction policy. At most one rowset is reserved for each level. Therefore, under a stable condition, there are, at most, as many rowsets as the level number specified in this configuration item.
- Introduced in: -
##### enable_pk_parallel_execution
- Default: false
- Type: Boolean
- Unit: -
- Is mutable: Yes
- Description: Determines whether the Primary Key table parallel execution strategy is enabled. When enabled, PK index files will be generated during the import and compaction phases.
- Introduced in: -
##### pk_parallel_execution_threshold_bytes
- Default: 314572800
- Type: Int
- Unit: Bytes
- Is mutable: Yes
- Description: When enable_pk_parallel_execution is set to true, the Primary Key table parallel execution strategy will be enabled if the data generated during import or compaction exceeds this threshold.
- Introduced in: -
##### enable_check_string_lengths
- Default: true

View File

@ -984,6 +984,24 @@ curl http://<BE_IP>:<BE_HTTP_PORT>/varz
- Description: The number of levels for the size-tiered compaction policy. Each level keeps at most one rowset, so in a stable state there are at most as many rowsets as the number of levels specified by this item.
- Introduced in: -
##### enable_pk_parallel_execution
- Default: false
- Type: Boolean
- Unit: -
- Is mutable: Yes
- Description: Determines whether the parallel execution strategy for Primary Key tables is enabled. When enabled, PK index files are generated during the import and compaction phases.
- Introduced in: -
##### pk_parallel_execution_threshold_bytes
- Default: 314572800
- Type: Int
- Unit: Bytes
- Is mutable: Yes
- Description: When enable_pk_parallel_execution is set to true, the parallel execution strategy for Primary Key tables is enabled once the data produced by an import or compaction exceeds this threshold.
- Introduced in: -
##### enable_check_string_lengths
- Default: true

View File

@ -1226,6 +1226,24 @@ curl http://<BE_IP>:<BE_HTTP_PORT>/varz
- Description: The number of levels for the Size-tiered Compaction policy. Each level keeps at most one rowset, so in a stable state there are at most as many rowsets as the number of levels.
- Introduced in: -
##### enable_pk_parallel_execution
- Default: false
- Type: Boolean
- Unit: -
- Is mutable: Yes
- Description: Whether to enable the parallel execution strategy for Primary Key tables. When the strategy is enabled, PK index files are generated during the import and compaction phases.
- Introduced in: -
##### pk_parallel_execution_threshold_bytes
- Default: 314572800
- Type: Int
- Unit: Bytes
- Is mutable: Yes
- Description: When enable_pk_parallel_execution is set to true, the parallel execution strategy for Primary Key tables is enabled once the data produced by an import or compaction exceeds this threshold.
- Introduced in: -
##### enable_check_string_lengths
- Default: true