[Enhancement] Improved the logic for determining when to enable PK parallel execution (#63651)

Signed-off-by: luohaha <18810541851@163.com>
This commit is contained in:
Yixin Luo 2025-09-28 19:13:06 +08:00 committed by GitHub
parent 50d0a2aa79
commit d4032438df
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 128 additions and 23 deletions

View File

@ -217,6 +217,7 @@ set(STORAGE_FILES
lake/lake_primary_key_recover.cpp
lake/spill_mem_table_sink.cpp
lake/tablet_retain_info.cpp
lake/tablet_writer.cpp
load_spill_block_manager.cpp
load_chunk_spiller.cpp
sstable/block_builder.cpp

View File

@ -90,4 +90,21 @@ Status CompactionTask::fill_compaction_segment_info(TxnLogPB_OpCompaction* op_co
return Status::OK();
}
bool CompactionTask::should_enable_pk_parallel_execution(int64_t input_bytes) {
if (_tablet.get_schema()->keys_type() != KeysType::PRIMARY_KEYS) {
return false;
}
// pk parallel execution is only work when all conditions are met:
// 1. whether use cloud native index
// 2. whether use light compaction publish
// 3. whether input_bytes is large enough
auto metadata = _tablet.metadata();
bool use_cloud_native_index = metadata->enable_persistent_index() &&
metadata->persistent_index_type() == PersistentIndexTypePB::CLOUD_NATIVE;
bool use_light_compaction_publish = config::enable_light_pk_compaction_publish &&
StorageEngine::instance()->get_persistent_index_store(_tablet.id()) != nullptr;
return use_cloud_native_index && use_light_compaction_publish &&
input_bytes >= config::pk_parallel_execution_threshold_bytes;
}
} // namespace starrocks::lake

View File

@ -53,6 +53,8 @@ public:
Status fill_compaction_segment_info(TxnLogPB_OpCompaction* op_compaction, TabletWriter* writer);
bool should_enable_pk_parallel_execution(int64_t input_bytes);
protected:
int64_t _txn_id;
VersionedTablet _tablet;

View File

@ -65,7 +65,7 @@ Status HorizontalCompactionTask::execute(CancelFunc cancel_func, ThreadPool* flu
RETURN_IF_ERROR(writer->open());
DeferOp defer([&]() { writer->close(); });
if (input_bytes >= config::pk_parallel_execution_threshold_bytes) {
if (should_enable_pk_parallel_execution(input_bytes)) {
writer->try_enable_pk_parallel_execution();
}

View File

@ -0,0 +1,46 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "storage/lake/tablet_writer.h"
#include "storage/lake/tablet_manager.h"
namespace starrocks::lake {
void TabletWriter::try_enable_pk_parallel_execution() {
if (!config::enable_pk_parallel_execution || _schema->keys_type() != KeysType::PRIMARY_KEYS ||
_schema->has_separate_sort_key()) {
return;
}
auto metadata = _tablet_mgr->get_latest_cached_tablet_metadata(_tablet_id);
if (metadata != nullptr) {
// Pk parallel execution only support cloud native pk index.
if (!metadata->enable_persistent_index() ||
metadata->persistent_index_type() != PersistentIndexTypePB::CLOUD_NATIVE) {
return;
}
}
// For primary key table with single key column and the type is not VARCHAR/CHAR,
// we can't enable pk parrallel execution. The reason is that, in the current implementation,
// when encoding a single-key column of a non-binary type, big-endian encoding is not used,
// which may result in incorrect ordering between sst and segment files.
// This is a legacy bug, but for compatibility reasons, it will not be supported in the first phase.
// Will fix it later.
if (_schema->num_key_columns() > 1 || _schema->column(0).type() == LogicalType::TYPE_VARCHAR ||
_schema->column(0).type() == LogicalType::TYPE_CHAR) {
_enable_pk_parallel_execution = true;
}
}
} // namespace starrocks::lake

View File

@ -148,22 +148,7 @@ public:
// When the system determines that pk parallel execution can be enabled
// (for example, during large imports or major compaction tasks), it will invoke this function.
// However, whether pk parallel execution is actually enabled still depends on the schema.
void try_enable_pk_parallel_execution() {
if (!config::enable_pk_parallel_execution || _schema->keys_type() != KeysType::PRIMARY_KEYS ||
_schema->has_separate_sort_key()) {
return;
}
// For primary key table with single key column and the type is not VARCHAR/CHAR,
// we can't enable pk parrallel execution. The reason is that, in the current implementation,
// when encoding a single-key column of a non-binary type, big-endian encoding is not used,
// which may result in incorrect ordering between sst and segment files.
// This is a legacy bug, but for compatibility reasons, it will not be supported in the first phase.
// Will fix it later.
if (_schema->num_key_columns() > 1 || _schema->column(0).type() == LogicalType::TYPE_VARCHAR ||
_schema->column(0).type() == LogicalType::TYPE_CHAR) {
_enable_pk_parallel_execution = true;
}
}
void try_enable_pk_parallel_execution();
bool enable_pk_parallel_execution() const { return _enable_pk_parallel_execution; }

View File

@ -47,6 +47,11 @@
namespace starrocks::lake {
static bool use_cloud_native_pk_index(const TabletMetadata& metadata) {
return metadata.enable_persistent_index() &&
metadata.persistent_index_type() == PersistentIndexTypePB::CLOUD_NATIVE;
}
UpdateManager::UpdateManager(std::shared_ptr<LocationProvider> location_provider, MemTracker* mem_tracker)
: _index_cache(std::numeric_limits<size_t>::max()),
_update_state_cache(std::numeric_limits<size_t>::max()),
@ -279,8 +284,10 @@ Status UpdateManager::publish_primary_key_tablet(const TxnLogPB_OpWrite& op_writ
TRACE_COUNTER_SCOPE_LATENCY_US("update_index_latency_us");
DCHECK(state.upserts(segment_id) != nullptr);
if (condition_column < 0) {
RETURN_IF_ERROR(_do_update(rowset_id, segment_id, state.upserts(segment_id), index, &new_deletes,
op_write.ssts_size() > 0 /* read pk index only when ingest sst */));
RETURN_IF_ERROR(
_do_update(rowset_id, segment_id, state.upserts(segment_id), index, &new_deletes,
op_write.ssts_size() > 0 &&
use_cloud_native_pk_index(*metadata) /* read pk index only when ingest sst */));
} else {
RETURN_IF_ERROR(_do_update_with_condition(params, rowset_id, segment_id, condition_column,
state.upserts(segment_id)->pk_column, index, &new_deletes));
@ -293,7 +300,7 @@ Status UpdateManager::publish_primary_key_tablet(const TxnLogPB_OpWrite& op_writ
_index_cache.update_object_size(index_entry, index.memory_usage());
state.release_segment(segment_id);
_update_state_cache.update_object_size(state_entry, state.memory_usage());
if (op_write.ssts_size() > 0 && condition_column < 0) {
if (op_write.ssts_size() > 0 && condition_column < 0 && use_cloud_native_pk_index(*metadata)) {
// TODO support condition column with sst ingestion.
// rowset_id + segment_id is the rssid of this segment
RETURN_IF_ERROR(index.ingest_sst(op_write.ssts(segment_id), rowset_id + segment_id, metadata->version(),
@ -1192,7 +1199,7 @@ Status UpdateManager::light_publish_primary_compaction(const TxnLogPB_OpCompacti
auto resolver = std::make_unique<LakePrimaryKeyCompactionConflictResolver>(&metadata, &output_rowset, _tablet_mgr,
builder, &index, txn_id, base_version,
&segment_id_to_add_dels, &delvecs);
if (op_compaction.ssts_size() > 0) {
if (op_compaction.ssts_size() > 0 && use_cloud_native_pk_index(metadata)) {
RETURN_IF_ERROR(resolver->execute_without_update_index());
} else {
RETURN_IF_ERROR(resolver->execute());
@ -1204,7 +1211,7 @@ Status UpdateManager::light_publish_primary_compaction(const TxnLogPB_OpCompacti
// 4. ingest ssts to index
DCHECK(op_compaction.ssts_size() == 0 || delvecs.size() == op_compaction.ssts_size())
<< "delvecs.size(): " << delvecs.size() << ", op_compaction.ssts_size(): " << op_compaction.ssts_size();
for (int i = 0; i < op_compaction.ssts_size(); i++) {
for (int i = 0; i < op_compaction.ssts_size() && use_cloud_native_pk_index(metadata); i++) {
// metadata.next_rowset_id() + i is the rssid of output rowset's i-th segment
DelvecPagePB delvec_page_pb = builder->delvec_page(metadata.next_rowset_id() + i);
delvec_page_pb.set_version(metadata.version());

View File

@ -58,7 +58,7 @@ Status VerticalCompactionTask::execute(CancelFunc cancel_func, ThreadPool* flush
RETURN_IF_ERROR(writer->open());
DeferOp defer([&]() { writer->close(); });
if (input_bytes >= config::pk_parallel_execution_threshold_bytes) {
if (should_enable_pk_parallel_execution(input_bytes)) {
writer->try_enable_pk_parallel_execution();
}

View File

@ -1602,6 +1602,53 @@ TEST_P(LakePrimaryKeyCompactionTest, test_major_compaction_thread_safe) {
config::l0_max_mem_usage = l0_max_mem_usage;
}
TEST_P(LakePrimaryKeyCompactionTest, test_should_enable_pk_parallel_execution) {
// Prepare data for writing
auto chunk0 = generate_data(kChunkSize, 0);
auto indexes = std::vector<uint32_t>(kChunkSize);
for (int i = 0; i < kChunkSize; i++) {
indexes[i] = i;
}
auto version = 1;
auto tablet_id = _tablet_metadata->id();
for (int i = 0; i < 3; i++) {
auto txn_id = next_id();
ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
.set_tablet_manager(_tablet_mgr.get())
.set_tablet_id(tablet_id)
.set_txn_id(txn_id)
.set_partition_id(_partition_id)
.set_mem_tracker(_mem_tracker.get())
.set_schema_id(_tablet_schema->id())
.set_profile(&_dummy_runtime_profile)
.build());
ASSERT_OK(delta_writer->open());
ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size()));
ASSERT_OK(delta_writer->finish_with_txnlog());
delta_writer->close();
// Publish version
ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status());
version++;
}
auto txn_id = next_id();
auto task_context = std::make_unique<CompactionTaskContext>(txn_id, tablet_id, version, false, false, nullptr);
ASSIGN_OR_ABORT(auto task, _tablet_mgr->compact(task_context.get()));
// check should_enable_pk_parallel_execution
if (!GetParam().enable_persistent_index || GetParam().persistent_index_type == PersistentIndexTypePB::LOCAL) {
EXPECT_FALSE(task->should_enable_pk_parallel_execution(0));
EXPECT_FALSE(task->should_enable_pk_parallel_execution(config::pk_parallel_execution_threshold_bytes - 1));
EXPECT_FALSE(task->should_enable_pk_parallel_execution(config::pk_parallel_execution_threshold_bytes));
EXPECT_FALSE(task->should_enable_pk_parallel_execution(config::pk_parallel_execution_threshold_bytes + 1));
} else {
EXPECT_FALSE(task->should_enable_pk_parallel_execution(0));
EXPECT_FALSE(task->should_enable_pk_parallel_execution(config::pk_parallel_execution_threshold_bytes - 1));
EXPECT_TRUE(task->should_enable_pk_parallel_execution(config::pk_parallel_execution_threshold_bytes));
EXPECT_TRUE(task->should_enable_pk_parallel_execution(config::pk_parallel_execution_threshold_bytes + 1));
}
}
INSTANTIATE_TEST_SUITE_P(
LakePrimaryKeyCompactionTest, LakePrimaryKeyCompactionTest,
::testing::Values(CompactionParam{HORIZONTAL_COMPACTION, 5, false},