[BugFix] Avoid getting file sizes in the report tablet stat thread (#61901)

Signed-off-by: luohaha <18810541851@163.com>
This commit is contained in:
Yixin Luo 2025-08-18 15:35:39 +08:00 committed by GitHub
parent c2c16f36d9
commit 463b29f8bd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 99 additions and 86 deletions

View File

@ -24,11 +24,12 @@ namespace starrocks {
// Initializes this delta column group from caller-supplied values.
// Stores the version, the per-file column unique ids, the column file names,
// the encryption metas and the file size into the corresponding members, then
// calls _calc_memory_usage().
void DeltaColumnGroup::init(int64_t version, const std::vector<std::vector<ColumnUID>>& column_ids,
const std::vector<std::string>& column_files,
const std::vector<std::string>& encryption_metas) {
const std::vector<std::string>& encryption_metas, int64_t file_size) {
_version = version;
_column_uids = column_ids;
_column_files = column_files;
_encryption_metas = encryption_metas;
_file_size = file_size;
_calc_memory_usage();
}
@ -128,6 +129,7 @@ Status DeltaColumnGroup::load(int64_t version, const char* data, size_t length)
_column_uids.back().push_back(cid);
}
}
_file_size = dcg_pb.file_size();
_calc_memory_usage();
return Status::OK();
}
@ -164,6 +166,7 @@ std::string DeltaColumnGroup::save() const {
dcg_col_pb->add_column_ids(cid);
}
}
dcg_pb.set_file_size(_file_size);
std::string result;
dcg_pb.SerializeToString(&result);
return result;
@ -187,6 +190,7 @@ std::string DeltaColumnGroupListSerializer::serialize_delta_column_group_list(co
dcg_col_pb->add_column_ids(cid);
}
}
dcg_pb.set_file_size(dcg->file_size());
dcgs_pb.add_dcgs()->CopyFrom(dcg_pb);
}
@ -232,7 +236,7 @@ Status DeltaColumnGroupListSerializer::_deserialize_delta_column_group_list(cons
encryption_metas.push_back(dcgs_pb.dcgs(i).encryption_metas(j));
}
}
dcg->init(dcgs_pb.versions(i), column_ids, column_files, encryption_metas);
dcg->init(dcgs_pb.versions(i), column_ids, column_files, encryption_metas, dcgs_pb.dcgs(i).file_size());
dcgs->push_back(dcg);
}
return Status::OK();

View File

@ -37,7 +37,8 @@ public:
DeltaColumnGroup() {}
~DeltaColumnGroup() {}
void init(int64_t version, const std::vector<std::vector<ColumnUID>>& column_ids,
const std::vector<std::string>& column_files, const std::vector<std::string>& encryption_metas = {});
const std::vector<std::string>& column_files, const std::vector<std::string>& encryption_metas = {},
int64_t file_size = 0);
Status load(int64_t version, const char* data, size_t length);
Status load(int64_t version, const DeltaColumnGroupVerPB& dcg_ver_pb);
std::string save() const;
@ -104,6 +105,8 @@ public:
const std::vector<std::string>& encryption_metas() const { return _encryption_metas; }
int64_t file_size() const { return _file_size; }
private:
void _calc_memory_usage();
@ -113,6 +116,7 @@ private:
std::vector<std::string> _column_files;
std::vector<std::string> _encryption_metas;
size_t _memory_usage = 0;
int64_t _file_size = 0; // file size of all column files
};
class DeltaColumnGroupLoader {

View File

@ -3706,6 +3706,7 @@ Status PersistentIndex::commit(PersistentIndexMetaPB* index_meta, IOStat* stat)
}
if (stat != nullptr) {
stat->reload_meta_cost += watch.elapsed_time();
stat->total_file_size = (_l0 ? _l0->file_size() : 0) + _l1_l2_file_size();
}
_calc_memory_usage();

View File

@ -100,14 +100,16 @@ struct IOStat {
uint64_t flush_or_wal_cost = 0;
uint64_t compaction_cost = 0;
uint64_t reload_meta_cost = 0;
uint64_t total_file_size = 0;
// Renders the counters of this IOStat as a single line of "name: value" pairs
// using fmt::format. The order of the arguments must match the order of the
// "{}" placeholders in the format string.
std::string print_str() {
return fmt::format(
"IOStat read_iops: {} filtered_kv_cnt: {} get_in_shard_cost: {} read_io_bytes: {} "
"l0_write_cost: {} "
"l1_l2_read_cost: {} flush_or_wal_cost: {} compaction_cost: {} reload_meta_cost: {}",
"l1_l2_read_cost: {} flush_or_wal_cost: {} compaction_cost: {} reload_meta_cost: {} total_file_size: "
"{}",
read_iops, filtered_kv_cnt, get_in_shard_cost, read_io_bytes, l0_write_cost, l1_l2_read_cost,
flush_or_wal_cost, compaction_cost, reload_meta_cost);
flush_or_wal_cost, compaction_cost, reload_meta_cost, total_file_size);
}
};

View File

@ -1130,10 +1130,10 @@ Status PrimaryIndex::prepare(const EditVersion& version, size_t n) {
return Status::OK();
}
Status PrimaryIndex::commit(PersistentIndexMetaPB* index_meta) {
Status PrimaryIndex::commit(PersistentIndexMetaPB* index_meta, IOStat* stat) {
auto scope = IOProfiler::scope(IOProfiler::TAG_PKINDEX, _tablet_id);
if (_persistent_index != nullptr) {
return _persistent_index->commit(index_meta);
return _persistent_index->commit(index_meta, stat);
}
_calc_memory_usage();
return Status::OK();

View File

@ -127,7 +127,7 @@ public:
Status prepare(const EditVersion& version, size_t n);
Status commit(PersistentIndexMetaPB* index_meta);
Status commit(PersistentIndexMetaPB* index_meta, IOStat* stat = nullptr);
Status on_commited();

View File

@ -761,6 +761,7 @@ Status RowsetColumnUpdateState::finalize(Tablet* tablet, Rowset* rowset, uint32_
// It means column_1 and column_2 are stored in aaa.cols, and column_3 and column_4 are stored in bbb.cols
std::map<uint32_t, std::vector<std::vector<ColumnUID>>> dcg_column_ids;
std::map<uint32_t, std::vector<std::string>> dcg_column_files;
std::map<uint32_t, int64_t> rssid_to_segment_file_size;
// 3. read from raw segment file and update file, and generate `.col` files one by one
int idx = 0; // It is used for generate different .cols filename
for (uint32_t col_index = 0; col_index < update_column_ids.size(); col_index += BATCH_HANDLE_COLUMN_CNT) {
@ -814,6 +815,7 @@ Status RowsetColumnUpdateState::finalize(Tablet* tablet, Rowset* rowset, uint32_
dcg_column_ids[each.first].push_back(selective_unique_update_column_ids);
dcg_column_files[each.first].push_back(file_name(delta_column_group_writer->segment_path()));
handle_cnt++;
rssid_to_segment_file_size[each.first] += segment_file_size;
}
idx++;
}
@ -821,7 +823,8 @@ Status RowsetColumnUpdateState::finalize(Tablet* tablet, Rowset* rowset, uint32_
for (const auto& each : rss_upt_id_to_rowid_pairs) {
_rssid_to_delta_column_group[each.first] = std::make_shared<DeltaColumnGroup>();
_rssid_to_delta_column_group[each.first]->init(latest_applied_version.major_number() + 1,
dcg_column_ids[each.first], dcg_column_files[each.first]);
dcg_column_ids[each.first], dcg_column_files[each.first], {},
rssid_to_segment_file_size[each.first]);
}
cost_str << " [generate delta column group] " << watch.elapsed_time();
watch.reset();

View File

@ -277,6 +277,15 @@ Status TabletUpdates::_load_from_pb(const TabletUpdatesPB& tablet_updates_pb) {
}
}
// Load the cached extra file sizes, which cover persistent index files and delta column files.
if (tablet_updates_pb.has_extra_file_size()) {
_extra_file_size_cache.pindex_size = tablet_updates_pb.extra_file_size().pindex_size();
_extra_file_size_cache.col_size = tablet_updates_pb.extra_file_size().col_size();
} else {
_extra_file_size_cache.pindex_size = 0;
_extra_file_size_cache.col_size = 0;
}
RETURN_IF_ERROR(_load_meta_and_log(tablet_updates_pb));
{
@ -412,16 +421,7 @@ size_t TabletUpdates::data_size() const {
LOG_EVERY_N(WARNING, 10) << "data_size() some rowset stats not found tablet=" << _tablet.tablet_id()
<< " rowset=" << err_rowsets;
}
auto size_st = _get_extra_file_size();
if (!size_st.ok()) {
// Ignore error status here, because we don't to break up tablet report because of get extra file size failure.
// So just print error log and keep going.
VLOG(2) << "get extra file size in primary table fail, tablet_id: " << _tablet.tablet_id()
<< " status: " << size_st.status();
return total_size;
} else {
return total_size + (*size_st).pindex_size + (*size_st).col_size;
}
return total_size + _extra_file_size_cache.pindex_size + _extra_file_size_cache.col_size;
}
size_t TabletUpdates::num_rows() const {
@ -477,16 +477,7 @@ std::pair<int64_t, int64_t> TabletUpdates::num_rows_and_data_size() const {
LOG_EVERY_N(WARNING, 10) << "data_size() some rowset stats not found tablet=" << _tablet.tablet_id()
<< " rowset=" << err_rowsets;
}
auto size_st = _get_extra_file_size();
if (!size_st.ok()) {
// Ignore error status here, because we don't to break up tablet report because of get extra file size failure.
// So just print error log and keep going.
VLOG(2) << "get extra file size in primary table fail, tablet_id: " << _tablet.tablet_id()
<< " status: " << size_st.status();
return {total_row, total_size};
} else {
return {total_row, total_size + (*size_st).pindex_size + (*size_st).col_size};
}
return {total_row, total_size + _extra_file_size_cache.pindex_size + _extra_file_size_cache.col_size};
}
size_t TabletUpdates::num_rowsets() const {
@ -1637,7 +1628,8 @@ Status TabletUpdates::_apply_normal_rowset_commit(const EditVersionInfo& version
}
}
span->AddEvent("commit_index");
st = index.commit(index_meta);
IOStat stat;
st = index.commit(index_meta, &stat);
FAIL_POINT_TRIGGER_EXECUTE(tablet_apply_index_commit_failed,
{ st = Status::InternalError("inject tablet_apply_index_commit_failed"); });
if (!st.ok()) {
@ -1646,6 +1638,7 @@ Status TabletUpdates::_apply_normal_rowset_commit(const EditVersionInfo& version
return apply_st;
}
_extra_file_size_cache.pindex_size.store(stat.total_file_size);
manager->index_cache().update_object_size(index_entry, index.memory_usage());
// release resource
// update state only used once, so delete it
@ -2507,7 +2500,8 @@ Status TabletUpdates::_apply_compaction_commit(const EditVersionInfo& version_in
}
int64_t t_index_delvec = MonotonicMillis();
st = index.commit(index_meta);
IOStat stat;
st = index.commit(index_meta, &stat);
FAIL_POINT_TRIGGER_EXECUTE(tablet_apply_index_commit_failed,
{ st = Status::InternalError("inject tablet_apply_index_commit_failed"); });
if (!st.ok()) {
@ -2516,6 +2510,8 @@ Status TabletUpdates::_apply_compaction_commit(const EditVersionInfo& version_in
failure_handler(msg, st.code());
return apply_st;
}
_extra_file_size_cache.pindex_size.store(stat.total_file_size);
manager->index_cache().update_object_size(index_entry, index.memory_usage());
{
@ -2766,6 +2762,10 @@ void TabletUpdates::remove_expired_versions(int64_t expire_time) {
std::unique_lock wrlock(_tablet.get_header_lock());
rewrite_rs_meta(false);
}
// get delta column group file size
_extra_file_size_cache.col_size.store(
StorageEngine::instance()->update_manager()->get_delta_column_group_file_size_by_tablet_id(
_tablet.tablet_id()));
// GC works that can be done outside of lock
if (num_version_removed > 0) {
@ -3571,38 +3571,6 @@ size_t TabletUpdates::_get_rowset_num_deletes(const Rowset& rowset) {
return num_dels;
}
// Walks the tablet's schema-hash directory and totals the sizes of regular
// files whose name starts with "index.l" (into pindex_size) or ends with
// ".cols" (into col_size). Any exception raised during the walk is converted
// into an InternalError status. When ADDRESS_SANITIZER is defined the walk is
// compiled out and a default-initialized ExtraFileSize is returned.
StatusOr<ExtraFileSize> TabletUpdates::_get_extra_file_size() const {
    ExtraFileSize sizes;
#if !defined(ADDRESS_SANITIZER)
    const std::string dir_str = _tablet.schema_hash_path();
    const std::filesystem::path dir(dir_str.c_str());
    // All failure messages share the same prefix; only the detail suffix differs.
    const auto make_error = [&dir](const std::string& detail) {
        std::string err_msg = "Iterate dir " + dir.string() + detail;
        return Status::InternalError(err_msg);
    };
    try {
        for (const auto& item : std::filesystem::directory_iterator(dir)) {
            if (!item.is_regular_file()) {
                continue;
            }
            const std::string name = item.path().filename().string();
            if (name.starts_with("index.l")) {
                sizes.pindex_size += item.file_size();
            } else if (name.ends_with(".cols")) {
                // TODO skip the expired cols file
                sizes.col_size += item.file_size();
            }
        }
    } catch (const std::filesystem::filesystem_error& ex) {
        return make_error(std::string(" Filesystem error: ") + ex.what());
    } catch (const std::exception& ex) {
        return make_error(std::string(" Standard error: ") + ex.what());
    } catch (...) {
        return make_error(" Unknown exception occurred.");
    }
#endif
    return sizes;
}
void TabletUpdates::get_tablet_info_extra(TTabletInfo* info) {
int64_t min_readable_version = 0;
int64_t max_readable_version = 0;
@ -3646,16 +3614,7 @@ void TabletUpdates::get_tablet_info_extra(TTabletInfo* info) {
LOG_EVERY_N(WARNING, 10) << "get_tablet_info_extra() some rowset stats not found tablet=" << _tablet.tablet_id()
<< " rowset=" << err_rowsets;
}
auto size_st = _get_extra_file_size();
if (!size_st.ok()) {
// Ignore error status here, because we don't to break up tablet report because of get extra file size failure.
// So just print error log and keep going.
VLOG(2) << "get extra file size in primary table fail, tablet_id: " << _tablet.tablet_id()
<< " status: " << size_st.status();
} else {
total_size += (*size_st).pindex_size + (*size_st).col_size;
}
total_size += _extra_file_size_cache.pindex_size + _extra_file_size_cache.col_size;
info->__set_version(version);
info->__set_min_readable_version(min_readable_version);
info->__set_max_readable_version(max_readable_version);
@ -4785,15 +4744,7 @@ void TabletUpdates::get_basic_info_extra(TabletBasicInfo& info) {
info.index_mem = index_entry->size();
index_cache.release(index_entry);
}
auto size_st = _get_extra_file_size();
if (!size_st.ok()) {
// Ignore error status here, because we don't to break up get basic info because of get pk index disk usage failure.
// So just print error log and keep going.
VLOG(2) << "get persistent index disk usage fail, tablet_id: " << _tablet.tablet_id()
<< ", error: " << size_st.status();
} else {
info.index_disk_usage = (*size_st).pindex_size;
}
info.index_disk_usage = _extra_file_size_cache.pindex_size;
}
Status TabletUpdates::pk_index_major_compaction() {
@ -4860,6 +4811,9 @@ void TabletUpdates::_to_updates_pb_unlocked(TabletUpdatesPB* updates_pb) const {
}
updates_pb->set_next_rowset_id(_next_rowset_id);
updates_pb->set_next_log_id(_next_log_id);
// set extra file size
updates_pb->mutable_extra_file_size()->set_pindex_size(_extra_file_size_cache.pindex_size);
updates_pb->mutable_extra_file_size()->set_col_size(_extra_file_size_cache.col_size);
if (_apply_version_idx < _edit_version_infos.size()) {
const EditVersion& apply_version = _edit_version_infos[_apply_version_idx]->version;
updates_pb->mutable_apply_version()->set_major_number(apply_version.major_number());

View File

@ -73,8 +73,8 @@ struct CompactionInfo {
};
// Sizes of the extra files of a tablet:
// pindex_size covers persistent index files, col_size covers delta column files.
struct ExtraFileSize {
int64_t pindex_size = 0;
int64_t col_size = 0;
std::atomic<int64_t> pindex_size = 0;
std::atomic<int64_t> col_size = 0;
};
struct EditVersionInfo {
@ -520,8 +520,6 @@ private:
std::shared_timed_mutex* get_index_lock() { return &_index_lock; }
StatusOr<ExtraFileSize> _get_extra_file_size() const;
bool _use_light_apply_compaction(Rowset* rowset);
Status _light_apply_compaction_commit(const EditVersion& version, Rowset* output_rowset, PrimaryIndex* index,
@ -598,6 +596,9 @@ private:
std::atomic<bool> _apply_schedule{false};
size_t _apply_failed_time = 0;
// cache of latest ExtraFileSize
ExtraFileSize _extra_file_size_cache;
};
} // namespace starrocks

View File

@ -325,6 +325,19 @@ void UpdateManager::clear_cached_delta_column_group_by_tablet_id(int64_t tablet_
}
}
// Sums file_size() of the first cached DeltaColumnGroup of every cache entry
// that belongs to `tablet_id`. The scan starts at the cache's lower bound for
// TabletSegmentId(tablet_id, 0) and stops at the first key with a different
// tablet id. Entries whose cached list is empty contribute nothing.
// _delta_column_group_cache_lock is held for the whole scan.
int64_t UpdateManager::get_delta_column_group_file_size_by_tablet_id(int64_t tablet_id) {
    std::lock_guard<std::mutex> guard(_delta_column_group_cache_lock);
    int64_t total = 0;
    for (auto pos = _delta_column_group_cache.lower_bound(TabletSegmentId(tablet_id, 0));
         pos != _delta_column_group_cache.end() && pos->first.tablet_id == tablet_id; ++pos) {
        const auto& dcgs = pos->second;
        if (dcgs.empty()) {
            continue;
        }
        total += dcgs[0]->file_size(); // only latest dcg file size.
    }
    return total;
}
void UpdateManager::clear_cached_delta_column_group(const std::vector<TabletSegmentId>& tsids) {
std::lock_guard<std::mutex> lg(_delta_column_group_cache_lock);
for (const auto& tsid : tsids) {

View File

@ -124,6 +124,7 @@ public:
void clear_cached_delta_column_group_by_tablet_id(int64_t tablet_id);
void clear_cached_delta_column_group(const std::vector<TabletSegmentId>& tsids);
int64_t get_delta_column_group_file_size_by_tablet_id(int64_t tablet_id);
StatusOr<size_t> clear_delta_column_group_before_version(KVStore* meta, const std::string& tablet_path,
int64_t tablet_id, int64_t min_readable_version);

View File

@ -1431,6 +1431,19 @@ TEST_P(RowsetColumnPartialUpdateTest, partial_update_with_compaction_conflict_ch
config::enable_light_pk_compaction_publish = true;
}
// Checks that, once prepare_tablet() has run, the update manager reports a
// positive delta column group file size for the tablet.
TEST_P(RowsetColumnPartialUpdateTest, test_dcg_file_size) {
    const int row_count = 100;
    auto tablet = create_tablet(rand(), rand());
    ASSERT_EQ(1, tablet->updates()->version_history_count());

    int64_t current_version = 1;
    int64_t version_before_partial_update = 1;
    prepare_tablet(this, tablet, current_version, version_before_partial_update, row_count);

    // Ask the update manager for the tablet's total dcg file size.
    const int64_t reported_size =
            StorageEngine::instance()->update_manager()->get_delta_column_group_file_size_by_tablet_id(
                    tablet->tablet_id());
    ASSERT_GT(reported_size, 0) << "dcg file size should be greater than 0";
}
INSTANTIATE_TEST_SUITE_P(RowsetColumnPartialUpdateTest, RowsetColumnPartialUpdateTest,
::testing::Values(RowsetColumnPartialUpdateParam{1, false},
RowsetColumnPartialUpdateParam{1024, true},

View File

@ -910,6 +910,16 @@ void TabletUpdatesTest::test_apply(bool enable_persistent_index, bool has_merge_
ASSERT_EQ(N, read_tablet(_tablet, i));
}
test_pk_dump(rowsets.size());
// test extra file size;
// get TabletUpdatesPB
TabletUpdatesPB updates_pb;
_tablet->updates()->to_updates_pb(&updates_pb);
// check extra file size exist.
if (enable_persistent_index) {
ASSERT_TRUE(updates_pb.extra_file_size().pindex_size() > 0);
} else {
ASSERT_TRUE(updates_pb.extra_file_size().pindex_size() == 0);
}
}
TEST_F(TabletUpdatesTest, apply) {

View File

@ -61,6 +61,7 @@ message DeltaColumnGroupPB {
repeated DeltaColumnGroupColumnIdsPB column_ids = 1;
repeated string column_files = 2;
repeated bytes encryption_metas = 3;
optional int64 file_size = 4;
}
message DeltaColumnGroupListPB {

View File

@ -257,11 +257,17 @@ message TabletMetaLogPB {
repeated TabletMetaOpPB ops = 1;
}
// File sizes of a tablet's persistent index files and delta column files.
// Both fields are optional.
message TabletUpdatesExtraFileSize {
optional int64 pindex_size = 1; // total file size of persistent index.
optional int64 col_size = 2; // total file size of delta column files.
}
// Serialized form of a tablet's update metadata: the edit versions, the apply
// version, the next rowset/log ids and the optional extra file sizes.
message TabletUpdatesPB {
repeated EditVersionMetaPB versions = 1;
optional EditVersionPB apply_version = 2;
optional uint32 next_rowset_id = 3;
optional uint64 next_log_id = 4;
// Persistent index and delta column file sizes; may be absent.
optional TabletUpdatesExtraFileSize extra_file_size = 5;
}
message BinlogConfigPB {