[BugFix] support lazy delta column compact for size tiered compaction in pk table to reduce cost (backport #61930) (#62244)

Signed-off-by: luohaha <18810541851@163.com>
Co-authored-by: Yixin Luo <18810541851@163.com>
mergify[bot] 2025-08-23 09:36:42 +08:00 committed by GitHub
parent cf71a82f85
commit d14a733a70
2 changed files with 105 additions and 37 deletions
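
In short, the patch adds a fast path to size-tiered compaction for primary-key tables: when the pending candidates are essentially a pile of empty rowsets left behind by column-mode (delta column) partial updates, only those empty rowsets are merged, and the delta columns are not yet rewritten back into the base segments. The following stand-alone sketch illustrates that decision under simplified assumptions; the type and function names here are illustrative stand-ins, not the real `TabletUpdates` API shown in the diffs below.

#include <cstdint>
#include <map>
#include <vector>

// Illustrative stand-in for a compaction candidate; the real code keeps richer
// per-rowset stats (rows, bytes, segments, score) in CompactionEntry.
struct Candidate {
    uint32_t rowset_id;
    bool partial_update_by_column; // written by a column-mode partial update
};

// Pick compaction inputs. Level -1 holds empty rowsets, which cost no IO to merge.
// If lazy delta column compaction is enabled and several such rowsets exist alongside
// column-mode updates, merge only them and skip the heavy level-by-level merge.
std::vector<uint32_t> pick_compaction_inputs(
        const std::map<int, std::vector<Candidate>>& candidates_by_level,
        bool enable_lazy_delta_column_compaction) {
    std::vector<uint32_t> inputs;
    auto it = candidates_by_level.find(-1);
    if (enable_lazy_delta_column_compaction && it != candidates_by_level.end() && it->second.size() > 1) {
        // The real code tracks this flag while scanning all rowsets of the tablet.
        bool has_partial_update_by_column = false;
        for (const auto& c : it->second) {
            has_partial_update_by_column |= c.partial_update_by_column;
        }
        if (has_partial_update_by_column) {
            for (const auto& c : it->second) {
                inputs.push_back(c.rowset_id); // merge only the empty rowsets
            }
            return inputs; // lazy path taken; leave delta columns where they are
        }
    }
    // ... otherwise the regular size-tiered, level-by-level selection runs ...
    return inputs;
}

The saving comes from what is not selected: the large base rowsets stay out of the compaction, so their segments are not rewritten just to fold delta columns back in.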

View File

@@ -2181,7 +2181,9 @@ Status TabletUpdates::_commit_compaction(std::unique_ptr<CompactionInfo>* pinfo,
EditVersionMetaPB edit;
auto lastv = _edit_version_infos.back().get();
// handle conflict between column mode partial update
-     RETURN_IF_ERROR(_check_conflict_with_partial_update((*pinfo).get()));
+     if (rowset->num_rows() > 0) {
+         RETURN_IF_ERROR(_check_conflict_with_partial_update((*pinfo).get()));
+     }
auto edit_version_pb = edit.mutable_version();
edit_version_pb->set_major_number(lastv->version.major_number());
edit_version_pb->set_minor_number(lastv->version.minor_number() + 1);
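
One note on the hunk above: the lazy strategy introduced below can produce a compaction output rowset that carries no rows at all (it merges only empty rowsets), and the guard presumably exists because such an output has no row data for a concurrent column-mode partial update to conflict with. A minimal stand-alone rendering of that guard, with a stub in place of the real `_check_conflict_with_partial_update`:

#include <cstdint>

// Stub standing in for the real conflict check against in-flight column-mode
// partial updates; the real method lives on TabletUpdates and returns a Status.
void check_conflict_with_partial_update() {}

// Commit-side guard: only a compaction output that actually carries rows can clash
// with a concurrent column-mode partial update, so empty outputs skip the check.
void commit_compaction(uint64_t output_num_rows) {
    if (output_num_rows > 0) {
        check_conflict_with_partial_update();
    }
    // ... bump the edit version and record the compaction as before ...
}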
@@ -3080,6 +3082,7 @@ Status TabletUpdates::compaction_for_size_tiered(MemTracker* mem_tracker) {
size_t total_valid_rowsets = 0;
size_t total_valid_segments = 0;
+ bool has_partial_update_by_column = false;
// level -1 keep empty rowsets and have no IO overhead, so we can merge them with any level
std::map<int, vector<CompactionEntry>> candidates_by_level;
{
@@ -3106,6 +3109,7 @@ Status TabletUpdates::compaction_for_size_tiered(MemTracker* mem_tracker) {
e.num_dels = stat.num_dels;
e.bytes = stat.byte_size;
e.num_segments = stat.num_segments;
+ has_partial_update_by_column |= stat.partial_update_by_column;
}
}
}
@@ -3116,7 +3120,21 @@ Status TabletUpdates::compaction_for_size_tiered(MemTracker* mem_tracker) {
int64_t max_score = 0;
for (auto& [level, candidates] : candidates_by_level) {
if (level == -1) {
-             continue;
+             // When lazy delta column compaction is enabled, we prefer not to merge delta
+             // columns back into the main segment files too soon, in order to save compaction IO.
+             // Keeping the delta columns separate does not hurt query performance.
+             // If some rowset was written by a column-mode update and there is more than one empty rowset, trigger the lazy compaction strategy.
+             if (has_partial_update_by_column && candidates.size() > 1 && config::enable_lazy_delta_column_compaction) {
+                 for (auto& e : candidates) {
+                     info->inputs.emplace_back(e.rowsetid);
+                 }
+                 VLOG(1) << "trigger lazy compaction strategy for tablet:" << _tablet.tablet_id()
+                         << " because of column update rowset count:" << candidates.size();
+                 // only merge empty rowsets, so there is no need to consider other levels
+                 break;
+             } else {
+                 continue;
+             }
}
int64_t total_segments = 0;
int64_t del_rows = 0;
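
Why merging only level -1 is cheap: a column-mode partial update stores its new column values in delta column group (.cols) files attached to existing rowsets, so the rowset it commits typically carries no rows of its own and is classified into level -1. Compacting those rowsets rewrites essentially no segment data, whereas a regular merge would fold every delta column back into the base segments. The toy model below walks through the scenario the new test at the bottom exercises (one full rowset plus ten column-mode updates); the struct, the level rule, and the row counts are assumptions for illustration only.

#include <cstddef>
#include <vector>

// Toy model of a rowset for illustration only; the real RowsetStats carries
// byte sizes, segment counts, delete counts and a compaction score.
struct ToyRowset {
    size_t num_rows;         // rows in the rowset's own segments
    bool column_mode_update; // true if the data lives in delta column group files
};

// Level classification as assumed here: empty rowsets go to level -1
// (the real _calc_compaction_level derives a level from rowset stats).
int toy_level(const ToyRowset& r) {
    return r.num_rows == 0 ? -1 : 0;
}

int main() {
    std::vector<ToyRowset> rowsets = {{100, false}};           // the original full rowset
    for (int i = 0; i < 10; ++i) rowsets.push_back({0, true}); // ten column-mode updates
    size_t lazy_inputs = 0;
    for (const auto& r : rowsets) {
        if (toy_level(r) == -1) ++lazy_inputs;
    }
    // Lazy path: the 10 empty rowsets collapse into one, the full rowset is untouched,
    // so 2 rowsets remain afterwards, which is what the new test below asserts.
    return lazy_inputs == 10 ? 0 : 1;
}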
@@ -3135,46 +3153,50 @@ Status TabletUpdates::compaction_for_size_tiered(MemTracker* mem_tracker) {
int64_t total_merged_segments = 0;
RowsetStats stat;
std::set<int32_t> compaction_level_candidate;
- max_score = 0;
- do {
-     auto iter = candidates_by_level.find(compaction_level);
-     if (iter == candidates_by_level.end()) {
-         break;
-     }
-     for (auto& e : iter->second) {
-         size_t new_rows = stat.num_rows + e.num_rows - e.num_dels;
-         size_t new_bytes = stat.byte_size;
-         if (e.num_rows != 0) {
-             new_bytes += e.bytes * (e.num_rows - e.num_dels) / e.num_rows;
-         }
-         if ((stat.byte_size > 0 && new_bytes > config::update_compaction_result_bytes * 2) ||
-             info->inputs.size() >= config::max_update_compaction_num_singleton_deltas) {
-             break;
-         }
-         max_score += e.score_per_row * (e.num_rows - e.num_dels);
-         info->inputs.emplace_back(e.rowsetid);
-         stat.num_rows = new_rows;
-         stat.byte_size = new_bytes;
-         total_rows += e.num_rows;
-         total_bytes += e.bytes;
-         total_merged_segments += e.num_segments;
-     }
-     compaction_level_candidate.insert(compaction_level);
-     compaction_level = _calc_compaction_level(&stat);
-     stat.num_segments = stat.byte_size > 0 ? (stat.byte_size - 1) / config::max_segment_file_size + 1 : 0;
-     _calc_compaction_score(&stat);
- } while (stat.byte_size <= config::update_compaction_result_bytes * 2 &&
-          info->inputs.size() < config::max_update_compaction_num_singleton_deltas &&
-          compaction_level_candidate.find(compaction_level) == compaction_level_candidate.end() &&
-          candidates_by_level.find(compaction_level) != candidates_by_level.end() && stat.compaction_score > 0);
- if (compaction_level_candidate.find(-1) == compaction_level_candidate.end()) {
-     if (candidates_by_level[-1].size() > 0) {
-         for (auto& e : candidates_by_level[-1]) {
-             info->inputs.emplace_back(e.rowsetid);
-             total_merged_segments += e.num_segments;
-         }
-         compaction_level_candidate.insert(-1);
-     }
- }
+ if (info->inputs.empty()) {
+     // lazy compaction strategy not triggered, try to merge level by level
+     max_score = 0;
+     do {
+         auto iter = candidates_by_level.find(compaction_level);
+         if (iter == candidates_by_level.end()) {
+             break;
+         }
+         for (auto& e : iter->second) {
+             size_t new_rows = stat.num_rows + e.num_rows - e.num_dels;
+             size_t new_bytes = stat.byte_size;
+             if (e.num_rows != 0) {
+                 new_bytes += e.bytes * (e.num_rows - e.num_dels) / e.num_rows;
+             }
+             if ((stat.byte_size > 0 && new_bytes > config::update_compaction_result_bytes * 2) ||
+                 info->inputs.size() >= config::max_update_compaction_num_singleton_deltas) {
+                 break;
+             }
+             max_score += e.score_per_row * (e.num_rows - e.num_dels);
+             info->inputs.emplace_back(e.rowsetid);
+             stat.num_rows = new_rows;
+             stat.byte_size = new_bytes;
+             total_rows += e.num_rows;
+             total_bytes += e.bytes;
+             total_merged_segments += e.num_segments;
+         }
+         compaction_level_candidate.insert(compaction_level);
+         compaction_level = _calc_compaction_level(&stat);
+         stat.num_segments = stat.byte_size > 0 ? (stat.byte_size - 1) / config::max_segment_file_size + 1 : 0;
+         _calc_compaction_score(&stat);
+     } while (stat.byte_size <= config::update_compaction_result_bytes * 2 &&
+              info->inputs.size() < config::max_update_compaction_num_singleton_deltas &&
+              compaction_level_candidate.find(compaction_level) == compaction_level_candidate.end() &&
+              candidates_by_level.find(compaction_level) != candidates_by_level.end() && stat.compaction_score > 0);
+     if (compaction_level_candidate.find(-1) == compaction_level_candidate.end()) {
+         if (candidates_by_level[-1].size() > 0) {
+             for (auto& e : candidates_by_level[-1]) {
+                 info->inputs.emplace_back(e.rowsetid);
+                 total_merged_segments += e.num_segments;
+             }
+             compaction_level_candidate.insert(-1);
+         }
+     }
+ }
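
Putting the last hunk together: the pre-existing level-by-level selection now runs only when the lazy branch did not already pick inputs, and, as before, any empty rowsets that were not visited are appended at the end because merging them costs no IO. A condensed, stand-alone sketch of that control flow, with simplified names rather than the real method:

#include <cstdint>
#include <vector>

// Simplified control flow of input selection after this patch (names are stand-ins).
// lazy_inputs: rowsets already chosen by the lazy branch, possibly empty.
std::vector<uint32_t> finalize_inputs(std::vector<uint32_t> lazy_inputs,
                                      const std::vector<uint32_t>& level_by_level_pick,
                                      const std::vector<uint32_t>& leftover_empty_rowsets) {
    if (!lazy_inputs.empty()) {
        return lazy_inputs; // lazy branch already decided; skip the regular merge entirely
    }
    std::vector<uint32_t> inputs = level_by_level_pick; // the pre-existing size-tiered pick
    // As before, empty rowsets that were not visited get appended, since merging them adds no IO cost.
    inputs.insert(inputs.end(), leftover_empty_rowsets.begin(), leftover_empty_rowsets.end());
    return inputs;
}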

View File

@@ -1444,6 +1444,52 @@ TEST_P(RowsetColumnPartialUpdateTest, test_dcg_file_size) {
ASSERT_GT(dcg_file_size, 0) << "dcg file size should be greater than 0";
}
+ TEST_P(RowsetColumnPartialUpdateTest, partial_update_with_size_tier_compaction) {
+     const int N = 100;
+     auto tablet = create_tablet(rand(), rand());
+     ASSERT_EQ(1, tablet->updates()->version_history_count());
+     // create full rowsets first
+     std::vector<int64_t> keys(N);
+     for (int i = 0; i < N; i++) {
+         keys[i] = i;
+     }
+     std::vector<RowsetSharedPtr> rowsets;
+     rowsets.emplace_back(create_rowset(tablet, keys));
+     int64_t version = 1;
+     commit_rowsets(tablet, rowsets, version);
+     // check data
+     ASSERT_TRUE(check_tablet(tablet, version, N, [](int64_t k1, int64_t v1, int32_t v2, int32_t v3) {
+         return (int16_t)(k1 % 100 + 1) == v1 && (int32_t)(k1 % 1000 + 2) == v2;
+     }));
+     std::vector<int32_t> column_indexes = {0, 1};
+     auto v1_func = [](int64_t k1) { return (int16_t)(k1 % 100 + 3); };
+     auto v2_func = [](int64_t k1) { return (int32_t)(k1 % 1000 + 4); };
+     std::shared_ptr<TabletSchema> partial_schema = TabletSchema::create(tablet->tablet_schema(), column_indexes);
+     for (int i = 0; i < 10; i++) {
+         // create partial rowset
+         RowsetSharedPtr partial_rowset =
+                 create_partial_rowset(tablet, keys, column_indexes, v1_func, v2_func, partial_schema, 1);
+         // commit partial update
+         auto st = tablet->rowset_commit(++version, partial_rowset, 10000);
+         ASSERT_TRUE(st.ok()) << st.to_string();
+     }
+     // check data
+     ASSERT_TRUE(check_tablet(tablet, version, N, [](int64_t k1, int64_t v1, int32_t v2, int32_t v3) {
+         return (int16_t)(k1 % 100 + 3) == v1 && (int32_t)(k1 % 1000 + 2) == v2;
+     }));
+     // trigger size tiered compaction
+     config::enable_pk_size_tiered_compaction_strategy = true;
+     ASSERT_TRUE(tablet->updates()->compaction(_compaction_mem_tracker.get()).ok());
+     // check data
+     ASSERT_TRUE(check_tablet(tablet, version, N, [](int64_t k1, int64_t v1, int32_t v2, int32_t v3) {
+         return (int16_t)(k1 % 100 + 3) == v1 && (int32_t)(k1 % 1000 + 2) == v2;
+     }));
+     // there will be two rowsets
+     ASSERT_TRUE(tablet->updates()->num_rowsets() == 2);
+ }
INSTANTIATE_TEST_SUITE_P(RowsetColumnPartialUpdateTest, RowsetColumnPartialUpdateTest,
::testing::Values(RowsetColumnPartialUpdateParam{1, false},
RowsetColumnPartialUpdateParam{1024, true},