[Enhancement] Add clone metrics in backend (backport #62479) (#62607)

Signed-off-by: wyb <wybb86@gmail.com>
Co-authored-by: wyb <wybb86@gmail.com>
This commit is contained in:
mergify[bot] 2025-09-02 02:15:40 +00:00 committed by GitHub
parent 4d0d380972
commit d3dedf1051
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 108 additions and 34 deletions

View File

@ -161,8 +161,6 @@ static void unify_finish_agent_task(TStatusCode::type status_code, const std::ve
}
void run_drop_tablet_task(const std::shared_ptr<DropTabletAgentTaskRequest>& agent_task_req, ExecEnv* exec_env) {
StarRocksMetrics::instance()->clone_requests_total.increment(1);
const TDropTabletReq& drop_tablet_req = agent_task_req->task_req;
bool force_drop = drop_tablet_req.__isset.force && drop_tablet_req.force;
@ -348,6 +346,7 @@ void run_clear_transaction_task(const std::shared_ptr<ClearTransactionAgentTaskR
}
void run_clone_task(const std::shared_ptr<CloneAgentTaskRequest>& agent_task_req, ExecEnv* exec_env) {
StarRocksMetrics::instance()->clone_requests_total.increment(1);
const TCloneReq& clone_req = agent_task_req->task_req;
AgentStatus status = STARROCKS_SUCCESS;
@ -366,6 +365,7 @@ void run_clone_task(const std::shared_ptr<CloneAgentTaskRequest>& agent_task_req
if (clone_req.__isset.is_local && clone_req.is_local) {
DataDir* dest_store = StorageEngine::instance()->get_store(clone_req.dest_path_hash);
if (dest_store == nullptr) {
StarRocksMetrics::instance()->clone_requests_failed.increment(1);
LOG(WARNING) << "fail to get dest store. path_hash:" << clone_req.dest_path_hash;
status_code = TStatusCode::RUNTIME_ERROR;
} else {
@ -374,6 +374,7 @@ void run_clone_task(const std::shared_ptr<CloneAgentTaskRequest>& agent_task_req
need_rebuild_pk_index);
Status res = StorageEngine::instance()->execute_task(&engine_task);
if (!res.ok()) {
StarRocksMetrics::instance()->clone_requests_failed.increment(1);
status_code = TStatusCode::RUNTIME_ERROR;
LOG(WARNING) << "local tablet migration failed. status: " << res
<< ", signature: " << agent_task_req->signature;
@ -392,6 +393,14 @@ void run_clone_task(const std::shared_ptr<CloneAgentTaskRequest>& agent_task_req
tablet_infos.push_back(tablet_info);
}
finish_task_request.__set_finish_tablet_infos(tablet_infos);
int64_t copy_size = engine_task.get_copy_size();
finish_task_request.__set_copy_size(copy_size);
StarRocksMetrics::instance()->clone_task_intra_node_copy_bytes.increment(copy_size);
int64_t copy_time_ms = engine_task.get_copy_time_ms();
finish_task_request.__set_copy_time_ms(copy_time_ms);
StarRocksMetrics::instance()->clone_task_intra_node_copy_duration_ms.increment(copy_time_ms);
}
}
} else {
@ -399,6 +408,7 @@ void run_clone_task(const std::shared_ptr<CloneAgentTaskRequest>& agent_task_req
&error_msgs, &tablet_infos, &status);
Status res = StorageEngine::instance()->execute_task(&engine_task);
if (!res.ok()) {
StarRocksMetrics::instance()->clone_requests_failed.increment(1);
status_code = TStatusCode::RUNTIME_ERROR;
LOG(WARNING) << "clone failed. status:" << res << ", signature:" << agent_task_req->signature;
error_msgs.emplace_back("clone failed.");
@ -412,8 +422,14 @@ void run_clone_task(const std::shared_ptr<CloneAgentTaskRequest>& agent_task_req
LOG(INFO) << "clone success, set tablet infos. status:" << status
<< ", signature:" << agent_task_req->signature;
finish_task_request.__set_finish_tablet_infos(tablet_infos);
finish_task_request.__set_copy_size(engine_task.get_copy_size());
finish_task_request.__set_copy_time_ms(engine_task.get_copy_time_ms());
int64_t copy_size = engine_task.get_copy_size();
finish_task_request.__set_copy_size(copy_size);
StarRocksMetrics::instance()->clone_task_inter_node_copy_bytes.increment(copy_size);
int64_t copy_time_ms = engine_task.get_copy_time_ms();
finish_task_request.__set_copy_time_ms(copy_time_ms);
StarRocksMetrics::instance()->clone_task_inter_node_copy_duration_ms.increment(copy_time_ms);
}
}
}

View File

@ -155,16 +155,17 @@ inline StatusOr<int64_t> copy_by_range(RandomAccessFile* src, WritableFile* dest
}
// copy the file from src path to dest path, it will overwrite the existing files
inline Status copy_file(const std::string& src_path, const std::string& dst_path, size_t buffer_size = 8192) {
inline StatusOr<int64_t> copy_file(const std::string& src_path, const std::string& dst_path,
size_t buffer_size = 8192) {
TEST_ERROR_POINT("fs::copy_file");
WritableFileOptions opts{.sync_on_close = true, .mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE};
ASSIGN_OR_RETURN(auto src_fs, FileSystem::CreateSharedFromString(src_path));
ASSIGN_OR_RETURN(auto dst_fs, FileSystem::CreateSharedFromString(dst_path));
ASSIGN_OR_RETURN(auto src_file, src_fs->new_sequential_file(src_path));
ASSIGN_OR_RETURN(auto dst_file, dst_fs->new_writable_file(opts, dst_path));
RETURN_IF_ERROR(copy(src_file.get(), dst_file.get(), buffer_size));
ASSIGN_OR_RETURN(auto ncopy, copy(src_file.get(), dst_file.get(), buffer_size));
RETURN_IF_ERROR(dst_file->close());
return Status::OK();
return ncopy;
}
// copy the file range [offset, offset + size] from src path to dest path, it will overwrite the existing files

View File

@ -468,7 +468,7 @@ Status publish_log_version(TabletManager* tablet_mgr, int64_t tablet_id, std::sp
auto txn_log_path = tablet_mgr->txn_log_location(tablet_id, txn_id);
auto txn_vlog_path = tablet_mgr->txn_vlog_location(tablet_id, log_version);
// TODO: use rename() API if supported by the underlying filesystem.
auto st = fs::copy_file(txn_log_path, txn_vlog_path);
auto st = fs::copy_file(txn_log_path, txn_vlog_path).status();
if (st.is_not_found()) {
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(txn_vlog_path));
auto check_st = fs->path_exists(txn_vlog_path);

View File

@ -550,7 +550,8 @@ Status Rowset::_link_delta_column_group_files(KVStore* kvstore, const std::strin
return Status::OK();
}
Status Rowset::copy_files_to(KVStore* kvstore, const std::string& dir) {
StatusOr<int64_t> Rowset::copy_files_to(KVStore* kvstore, const std::string& dir) {
int64_t ncopy = 0;
for (int i = 0; i < num_segments(); ++i) {
std::string dst_path = segment_file_path(dir, rowset_id(), i);
if (fs::path_exist(dst_path)) {
@ -558,12 +559,16 @@ Status Rowset::copy_files_to(KVStore* kvstore, const std::string& dir) {
return Status::AlreadyExist(fmt::format("Path already exist: {}", dst_path));
}
std::string src_path = segment_file_path(_rowset_path, rowset_id(), i);
if (!fs::copy_file(src_path, dst_path).ok()) {
auto copy_st = fs::copy_file(src_path, dst_path);
if (!copy_st.ok()) {
LOG(WARNING) << "Error to copy file. src:" << src_path << ", dst:" << dst_path
<< ", errno=" << std::strerror(Errno::no());
return Status::IOError(fmt::format("Error to copy file. src: {}, dst: {}, error:{} ", src_path, dst_path,
std::strerror(Errno::no())));
} else {
ncopy += copy_st.value();
}
// copy index
const auto& indexes = *_schema->indexes();
if (!indexes.empty()) {
@ -586,12 +591,15 @@ Status Rowset::copy_files_to(KVStore* kvstore, const std::string& dir) {
auto dst_absolute_path =
fmt::format("{}/{}_{}_{}_{}", dir, rowset_id().to_string(), i, index.index_id(), file);
if (!fs::copy_file(src_absolute_path, dst_absolute_path).ok()) {
copy_st = fs::copy_file(src_absolute_path, dst_absolute_path);
if (!copy_st.ok()) {
LOG(WARNING) << "Error to copy index. src:" << src_absolute_path
<< ", dst:" << dst_absolute_path << ", errno=" << std::strerror(Errno::no());
return Status::IOError(fmt::format("Error to copy file. src: {}, dst: {}, error:{} ",
src_absolute_path, dst_absolute_path,
std::strerror(Errno::no())));
} else {
ncopy += copy_st.value();
}
}
}
@ -606,11 +614,14 @@ Status Rowset::copy_files_to(KVStore* kvstore, const std::string& dir) {
LOG(WARNING) << "Path already exist: " << dst_path;
return Status::AlreadyExist(fmt::format("Path already exist: {}", dst_path));
}
if (!fs::copy_file(src_path, dst_path).ok()) {
auto copy_st = fs::copy_file(src_path, dst_path);
if (!copy_st.ok()) {
LOG(WARNING) << "Error to copy file. src:" << src_path << ", dst:" << dst_path
<< ", errno=" << std::strerror(Errno::no());
return Status::IOError(fmt::format("Error to copy file. src: {}, dst: {}, error:{} ", src_path,
dst_path, std::strerror(Errno::no())));
} else {
ncopy += copy_st.value();
}
}
}
@ -622,19 +633,26 @@ Status Rowset::copy_files_to(KVStore* kvstore, const std::string& dir) {
LOG(WARNING) << "Path already exist: " << dst_path;
return Status::AlreadyExist(fmt::format("Path already exist: {}", dst_path));
}
if (!fs::copy_file(src_path, dst_path).ok()) {
auto copy_st = fs::copy_file(src_path, dst_path);
if (!copy_st.ok()) {
LOG(WARNING) << "Error to copy file. src:" << src_path << ", dst:" << dst_path
<< ", errno=" << std::strerror(Errno::no());
return Status::IOError(fmt::format("Error to copy file. src: {}, dst: {}, error:{} ", src_path,
dst_path, std::strerror(Errno::no())));
} else {
ncopy += copy_st.value();
}
}
}
RETURN_IF_ERROR(_copy_delta_column_group_files(kvstore, dir, INT64_MAX));
return Status::OK();
ASSIGN_OR_RETURN(auto delta_file_size, _copy_delta_column_group_files(kvstore, dir, INT64_MAX));
ncopy += delta_file_size;
return ncopy;
}
Status Rowset::_copy_delta_column_group_files(KVStore* kvstore, const std::string& dir, int64_t version) {
StatusOr<int64_t> Rowset::_copy_delta_column_group_files(KVStore* kvstore, const std::string& dir, int64_t version) {
int64_t ncopy = 0;
if (num_segments() > 0 && kvstore != nullptr && _rowset_path != dir) {
// copy dcg files
for (int i = 0; i < num_segments(); i++) {
@ -661,18 +679,20 @@ Status Rowset::_copy_delta_column_group_files(KVStore* kvstore, const std::strin
return Status::AlreadyExist(fmt::format("Path already exist: {}", dst_copy_path));
}
if (!fs::copy_file(src_file_path.c_str(), dst_copy_path.c_str()).ok()) {
auto copy_st = fs::copy_file(src_file_path.c_str(), dst_copy_path.c_str());
if (!copy_st.ok()) {
LOG(WARNING) << "Fail to copy " << src_file_path << " to " << dst_copy_path;
return Status::RuntimeError(fmt::format("Fail to copy segment cols file, src: {}, dst {}",
src_file_path, dst_copy_path));
} else {
ncopy += copy_st.value();
VLOG(2) << "success to copy " << src_file_path << " to " << dst_copy_path;
}
}
}
}
}
return Status::OK();
return ncopy;
}
void Rowset::do_close() {

View File

@ -292,8 +292,8 @@ public:
// `version` is used for linking col files; the default INT64_MAX means link all col files
Status link_files_to(KVStore* kvstore, const std::string& dir, RowsetId new_rowset_id, int64_t version = INT64_MAX);
// copy all files to `dir`
Status copy_files_to(KVStore* kvstore, const std::string& dir);
// copy all files to `dir`, returns total copy bytes
StatusOr<int64_t> copy_files_to(KVStore* kvstore, const std::string& dir);
static std::string segment_file_path(const std::string& segment_dir, const RowsetId& rowset_id, int segment_id);
static std::string segment_temp_file_path(const std::string& dir, const RowsetId& rowset_id, int segment_id);
@ -430,7 +430,7 @@ private:
Status _link_delta_column_group_files(KVStore* kvstore, const std::string& dir, int64_t version);
Status _copy_delta_column_group_files(KVStore* kvstore, const std::string& dir, int64_t version);
StatusOr<int64_t> _copy_delta_column_group_files(KVStore* kvstore, const std::string& dir, int64_t version);
StatusOr<std::shared_ptr<Segment>> _load_segment(int32_t idx, const TabletSchemaCSPtr& schema,
std::shared_ptr<FileSystem>& fs,

View File

@ -30,7 +30,7 @@ Status SegmentRewriter::rewrite_partial_update(const FileInfo& src, FileInfo* de
if (UNLIKELY(column_ids.empty())) {
// In shared-nothing mode, this size can be null, and we don't need it, so it's ok to return zero.
dest->size = src.size.value_or(0);
return fs::copy_file(src.path, dest->path, kBufferSize);
return fs::copy_file(src.path, dest->path, kBufferSize).status();
}
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(dest->path));
RandomAccessFileOptions ropts;

View File

@ -611,22 +611,23 @@ void EngineStorageMigrationTask::_generate_new_header(DataDir* store, const uint
// remove old meta after the new tablet is loaded successfully
}
Status EngineStorageMigrationTask::_copy_index_and_data_files(
const string& schema_hash_path, const TabletSharedPtr& ref_tablet,
const std::vector<RowsetSharedPtr>& consistent_rowsets) const {
Status status = Status::OK();
Status EngineStorageMigrationTask::_copy_index_and_data_files(const string& schema_hash_path,
const TabletSharedPtr& ref_tablet,
const std::vector<RowsetSharedPtr>& consistent_rowsets) {
MonotonicStopWatch watch;
watch.start();
for (const auto& rs : consistent_rowsets) {
bool bg_worker_stopped = StorageEngine::instance()->bg_worker_stopped();
if (bg_worker_stopped) {
status = Status::InternalError("Process is going to quit.");
break;
}
status = rs->copy_files_to(ref_tablet->data_dir()->get_meta(), schema_hash_path);
if (!status.ok()) {
break;
return Status::InternalError("Process is going to quit.");
}
ASSIGN_OR_RETURN(auto copy_size, rs->copy_files_to(ref_tablet->data_dir()->get_meta(), schema_hash_path));
_copy_size += copy_size;
}
return status;
uint64_t total_time_ms = watch.elapsed_time() / 1000 / 1000;
_copy_time_ms = (int64_t)total_time_ms;
return Status::OK();
}
} // namespace starrocks

View File

@ -54,6 +54,9 @@ public:
bool need_rebuild_pk_index);
~EngineStorageMigrationTask() override = default;
int64_t get_copy_size() const { return _copy_size; }
int64_t get_copy_time_ms() const { return _copy_time_ms; }
private:
Status _storage_migrate(TabletSharedPtr tablet);
@ -62,7 +65,7 @@ private:
const TabletMetaSharedPtr& new_tablet_meta);
Status _copy_index_and_data_files(const std::string& header_path, const TabletSharedPtr& ref_tablet,
const std::vector<RowsetSharedPtr>& consistent_rowsets) const;
const std::vector<RowsetSharedPtr>& consistent_rowsets);
Status _finish_migration(const TabletSharedPtr& tablet, int64_t end_version, uint64_t shard,
const std::vector<RowsetSharedPtr>& consistent_rowsets,
@ -79,5 +82,7 @@ private:
TSchemaHash _schema_hash;
DataDir* _dest_store;
bool _need_rebuild_pk_index{false};
int64_t _copy_size{0};
int64_t _copy_time_ms{0};
}; // EngineTask
} // namespace starrocks

View File

@ -113,6 +113,16 @@ StarRocksMetrics::StarRocksMetrics() : _metrics(_s_registry_name) {
REGISTER_STARROCKS_METRIC(primary_key_wait_apply_done_duration_ms);
REGISTER_STARROCKS_METRIC(primary_key_wait_apply_done_total);
// clone
_metrics.register_metric("clone_task_copy_bytes", MetricLabels().add("type", "INTER_NODE"),
&clone_task_inter_node_copy_bytes);
_metrics.register_metric("clone_task_copy_bytes", MetricLabels().add("type", "INTRA_NODE"),
&clone_task_intra_node_copy_bytes);
_metrics.register_metric("clone_task_copy_duration_ms", MetricLabels().add("type", "INTER_NODE"),
&clone_task_inter_node_copy_duration_ms);
_metrics.register_metric("clone_task_copy_duration_ms", MetricLabels().add("type", "INTRA_NODE"),
&clone_task_intra_node_copy_duration_ms);
// push request
_metrics.register_metric("push_requests_total", MetricLabels().add("status", "SUCCESS"),
&push_requests_success_total);

View File

@ -151,6 +151,12 @@ public:
METRIC_DEFINE_INT_COUNTER(finish_task_requests_total, MetricUnit::REQUESTS);
METRIC_DEFINE_INT_COUNTER(finish_task_requests_failed, MetricUnit::REQUESTS);
// clone
METRIC_DEFINE_INT_COUNTER(clone_task_inter_node_copy_bytes, MetricUnit::BYTES);
METRIC_DEFINE_INT_COUNTER(clone_task_intra_node_copy_bytes, MetricUnit::BYTES);
METRIC_DEFINE_INT_COUNTER(clone_task_inter_node_copy_duration_ms, MetricUnit::MILLISECONDS);
METRIC_DEFINE_INT_COUNTER(clone_task_intra_node_copy_duration_ms, MetricUnit::MILLISECONDS);
// Compaction Task Metric
// compaction task num, including all finished tasks and failed tasks
METRIC_DEFINE_INT_COUNTER(base_compaction_request_total, MetricUnit::REQUESTS);

View File

@ -347,10 +347,14 @@ public:
}
EngineStorageMigrationTask migration_task(tablet_id, schema_hash, dest_path, false);
ASSERT_OK(migration_task.execute());
ASSERT_GT(migration_task.get_copy_size(), 0);
ASSERT_GE(migration_task.get_copy_time_ms(), 0);
// sleep 2 seconds to add latency for load
sleep(2);
EngineStorageMigrationTask migration_task_2(tablet_id, schema_hash, source_path, false);
ASSERT_OK(migration_task_2.execute());
ASSERT_GT(migration_task_2.get_copy_size(), 0);
ASSERT_GE(migration_task_2.get_copy_time_ms(), 0);
}
void do_chain_path_migration(int64_t tablet_id, int32_t schema_hash) {

View File

@ -239,6 +239,13 @@ TEST_F(StarRocksMetricsTest, Normal) {
ASSERT_TRUE(metric != nullptr);
ASSERT_STREQ("22", metric->to_string().c_str());
}
{
instance->clone_requests_total.increment(23);
auto metric = metrics->get_metric("engine_requests_total",
MetricLabels().add("type", "clone").add("status", "total"));
ASSERT_TRUE(metric != nullptr);
ASSERT_STREQ("23", metric->to_string().c_str());
}
// compaction
{
instance->base_compaction_deltas_total.increment(30);
@ -374,6 +381,10 @@ TEST_F(StarRocksMetricsTest, test_metrics_register) {
ASSERT_NE(nullptr, instance->get_metric("delta_writer_wait_replica_duration_us"));
ASSERT_NE(nullptr, instance->get_metric("delta_writer_txn_commit_duration_us"));
ASSERT_NE(nullptr, instance->get_metric("memtable_finalize_duration_us"));
ASSERT_NE(nullptr, instance->get_metric("clone_task_copy_bytes", MetricLabels().add("type", "INTER_NODE")));
ASSERT_NE(nullptr, instance->get_metric("clone_task_copy_bytes", MetricLabels().add("type", "INTRA_NODE")));
ASSERT_NE(nullptr, instance->get_metric("clone_task_copy_duration_ms", MetricLabels().add("type", "INTER_NODE")));
ASSERT_NE(nullptr, instance->get_metric("clone_task_copy_duration_ms", MetricLabels().add("type", "INTRA_NODE")));
}
} // namespace starrocks