Signed-off-by: starrocks-xupeng <xupeng@starrocks.com> Co-authored-by: starrocks-xupeng <xupeng@starrocks.com>
This commit is contained in:
parent
d2e01def4b
commit
9dd848f23c
|
|
@ -451,22 +451,35 @@ void AgentServer::Impl::submit_tasks(TAgentResult& agent_result, const std::vect
|
|||
}
|
||||
}
|
||||
|
||||
#define HANDLE_TASK(t_task_type, all_tasks, do_func, AGENT_REQ, request, env) \
|
||||
for (auto* task : all_tasks) { \
|
||||
auto pool = get_thread_pool(t_task_type); \
|
||||
auto signature = task->signature; \
|
||||
std::pair<bool, size_t> register_pair = register_task_info(task_type, signature); \
|
||||
if (register_pair.first) { \
|
||||
LOG(INFO) << "Submit task success. type=" << t_task_type << ", signature=" << signature \
|
||||
<< ", task_count_in_queue=" << register_pair.second; \
|
||||
ret_st = pool->submit_func( \
|
||||
std::bind(do_func, std::make_shared<AGENT_REQ>(*task, task->request, time(nullptr)), env)); \
|
||||
if (!ret_st.ok()) { \
|
||||
LOG(WARNING) << "fail to submit task. reason: " << ret_st.message() << ", task: " << task; \
|
||||
} \
|
||||
} else { \
|
||||
LOG(INFO) << "Submit task failed, already exists type=" << t_task_type << ", signature=" << signature; \
|
||||
} \
|
||||
#define HANDLE_TASK(t_task_type, all_tasks, do_func, AGENT_REQ, request, env) \
|
||||
{ \
|
||||
std::string submit_log = "Submit task success. type=" + to_string(t_task_type) + ", signatures="; \
|
||||
size_t log_count = 0; \
|
||||
size_t queue_len = 0; \
|
||||
for (auto* task : all_tasks) { \
|
||||
auto pool = get_thread_pool(t_task_type); \
|
||||
auto signature = task->signature; \
|
||||
std::pair<bool, size_t> register_pair = register_task_info(task_type, signature); \
|
||||
if (register_pair.first) { \
|
||||
if (log_count++ < 100) { \
|
||||
submit_log += std::to_string(signature) + ","; \
|
||||
} \
|
||||
queue_len = register_pair.second; \
|
||||
ret_st = pool->submit_func( \
|
||||
std::bind(do_func, std::make_shared<AGENT_REQ>(*task, task->request, time(nullptr)), env)); \
|
||||
if (!ret_st.ok()) { \
|
||||
LOG(WARNING) << "fail to submit task. reason: " << ret_st.message() << ", task: " << task; \
|
||||
} \
|
||||
} else { \
|
||||
LOG(INFO) << "Submit task failed, already exists type=" << t_task_type << ", signature=" << signature; \
|
||||
} \
|
||||
} \
|
||||
if (queue_len > 0) { \
|
||||
if (log_count >= 100) { \
|
||||
submit_log += "...,"; \
|
||||
} \
|
||||
LOG(INFO) << submit_log << " task_count_in_queue=" << queue_len; \
|
||||
} \
|
||||
}
|
||||
|
||||
// batch submit tasks
|
||||
|
|
|
|||
|
|
@ -119,7 +119,7 @@ static void alter_tablet(const TAlterTabletReqV2& agent_task_req, int64_t signat
|
|||
if (status == STARROCKS_SUCCESS) {
|
||||
swap(finish_tablet_infos, finish_task_request->finish_tablet_infos);
|
||||
finish_task_request->__isset.finish_tablet_infos = true;
|
||||
LOG(INFO) << alter_msg_head << "alter success. signature: " << signature;
|
||||
VLOG(2) << alter_msg_head << "alter success. signature: " << signature;
|
||||
error_msgs.emplace_back("alter success");
|
||||
task_status.__set_status_code(TStatusCode::OK);
|
||||
} else if (status == STARROCKS_TASK_REQUEST_ERROR) {
|
||||
|
|
@ -156,8 +156,8 @@ static void unify_finish_agent_task(TStatusCode::type status_code, const std::ve
|
|||
|
||||
finish_task(finish_task_request);
|
||||
size_t task_queue_size = remove_task_info(task_type, signature);
|
||||
LOG(INFO) << "Remove task success. type=" << task_type << ", signature=" << signature
|
||||
<< ", task_count_in_queue=" << task_queue_size;
|
||||
VLOG(1) << "Remove task success. type=" << task_type << ", signature=" << signature
|
||||
<< ", task_count_in_queue=" << task_queue_size;
|
||||
}
|
||||
|
||||
void run_drop_tablet_task(const std::shared_ptr<DropTabletAgentTaskRequest>& agent_task_req, ExecEnv* exec_env) {
|
||||
|
|
@ -710,8 +710,7 @@ void run_upload_task(const std::shared_ptr<UploadAgentTaskRequest>& agent_task_r
|
|||
finish_task(finish_task_request);
|
||||
remove_task_info(agent_task_req->task_type, agent_task_req->signature);
|
||||
|
||||
LOG(INFO) << "Finished uploaded task signature=" << agent_task_req->signature
|
||||
<< " job id=" << upload_request.job_id;
|
||||
VLOG(1) << "Finished uploaded task signature=" << agent_task_req->signature << " job id=" << upload_request.job_id;
|
||||
}
|
||||
|
||||
void run_download_task(const std::shared_ptr<DownloadAgentTaskRequest>& agent_task_req, ExecEnv* exec_env) {
|
||||
|
|
@ -746,8 +745,8 @@ void run_download_task(const std::shared_ptr<DownloadAgentTaskRequest>& agent_ta
|
|||
finish_task(finish_task_request);
|
||||
remove_task_info(agent_task_req->task_type, agent_task_req->signature);
|
||||
|
||||
LOG(INFO) << "Finished downloaded task signature=" << agent_task_req->signature
|
||||
<< " job id=" << download_request.job_id;
|
||||
VLOG(1) << "Finished downloaded task signature=" << agent_task_req->signature
|
||||
<< " job id=" << download_request.job_id;
|
||||
}
|
||||
|
||||
void run_make_snapshot_task(const std::shared_ptr<SnapshotAgentTaskRequest>& agent_task_req, ExecEnv* exec_env) {
|
||||
|
|
@ -768,9 +767,9 @@ void run_make_snapshot_task(const std::shared_ptr<SnapshotAgentTaskRequest>& age
|
|||
<< " status=" << st.to_string();
|
||||
error_msgs.push_back("make_snapshot failed. status: " + st.to_string());
|
||||
} else {
|
||||
LOG(INFO) << "Created snapshot tablet_id=" << snapshot_request.tablet_id
|
||||
<< " schema_hash=" << snapshot_request.schema_hash << " version=" << snapshot_request.version
|
||||
<< " snapshot_path=" << snapshot_path;
|
||||
VLOG(1) << "Created snapshot tablet_id=" << snapshot_request.tablet_id
|
||||
<< " schema_hash=" << snapshot_request.schema_hash << " version=" << snapshot_request.version
|
||||
<< " snapshot_path=" << snapshot_path;
|
||||
if (snapshot_request.__isset.list_files) {
|
||||
// list and save all snapshot files
|
||||
// snapshot_path like: data/snapshot/20180417205230.1.86400
|
||||
|
|
@ -820,7 +819,7 @@ void run_release_snapshot_task(const std::shared_ptr<ReleaseSnapshotAgentTaskReq
|
|||
error_msgs.push_back("release_snapshot failed. status: " +
|
||||
boost::lexical_cast<std::string>(release_snapshot_status));
|
||||
} else {
|
||||
LOG(INFO) << "Released snapshot path=" << snapshot_path << " status=" << release_snapshot_status;
|
||||
VLOG(1) << "Released snapshot path=" << snapshot_path << " status=" << release_snapshot_status;
|
||||
}
|
||||
|
||||
unify_finish_agent_task(status_code, error_msgs, agent_task_req->task_type, agent_task_req->signature);
|
||||
|
|
@ -1047,8 +1046,8 @@ void run_remote_snapshot_task(const std::shared_ptr<RemoteSnapshotAgentTaskReque
|
|||
finish_task(finish_task_request);
|
||||
#endif
|
||||
auto task_queue_size = remove_task_info(agent_task_req->task_type, agent_task_req->signature);
|
||||
LOG(INFO) << "Remove task success. type=" << agent_task_req->task_type
|
||||
<< ", signature=" << agent_task_req->signature << ", task_count_in_queue=" << task_queue_size;
|
||||
VLOG(1) << "Remove task success. type=" << agent_task_req->task_type << ", signature=" << agent_task_req->signature
|
||||
<< ", task_count_in_queue=" << task_queue_size;
|
||||
}
|
||||
|
||||
void run_replicate_snapshot_task(const std::shared_ptr<ReplicateSnapshotAgentTaskRequest>& agent_task_req,
|
||||
|
|
@ -1092,8 +1091,8 @@ void run_replicate_snapshot_task(const std::shared_ptr<ReplicateSnapshotAgentTas
|
|||
finish_task(finish_task_request);
|
||||
#endif
|
||||
auto task_queue_size = remove_task_info(agent_task_req->task_type, agent_task_req->signature);
|
||||
LOG(INFO) << "Remove task success. type=" << agent_task_req->task_type
|
||||
<< ", signature=" << agent_task_req->signature << ", task_count_in_queue=" << task_queue_size;
|
||||
VLOG(1) << "Remove task success. type=" << agent_task_req->task_type << ", signature=" << agent_task_req->signature
|
||||
<< ", task_count_in_queue=" << task_queue_size;
|
||||
}
|
||||
|
||||
} // namespace starrocks
|
||||
|
|
|
|||
|
|
@ -401,8 +401,8 @@ void* DeleteTaskWorkerPool::_worker_thread_callback(void* arg_this) {
|
|||
|
||||
int num_of_remove_task = 0;
|
||||
if (push_req.push_type == TPushType::CANCEL_DELETE) {
|
||||
LOG(INFO) << "get delete push task. remove delete task txn_id: " << push_req.transaction_id
|
||||
<< " priority: " << priority << " push_type: " << push_req.push_type;
|
||||
VLOG(3) << "get delete push task. remove delete task txn_id: " << push_req.transaction_id
|
||||
<< " priority: " << priority << " push_type: " << push_req.push_type;
|
||||
|
||||
std::lock_guard l(worker_pool_this->_worker_thread_lock);
|
||||
auto& tasks = worker_pool_this->_tasks;
|
||||
|
|
@ -435,8 +435,8 @@ void* DeleteTaskWorkerPool::_worker_thread_callback(void* arg_this) {
|
|||
}
|
||||
auto& push_req = agent_task_req->task_req;
|
||||
|
||||
LOG(INFO) << "get delete push task. signature: " << agent_task_req->signature << " priority: " << priority
|
||||
<< " push_type: " << push_req.push_type;
|
||||
VLOG(3) << "get delete push task. signature: " << agent_task_req->signature << " priority: " << priority
|
||||
<< " push_type: " << push_req.push_type;
|
||||
std::vector<TTabletInfo> tablet_infos;
|
||||
EngineBatchLoadTask engine_task(push_req, &tablet_infos, agent_task_req->signature, &status,
|
||||
GlobalEnv::GetInstance()->load_mem_tracker());
|
||||
|
|
|
|||
|
|
@ -57,7 +57,7 @@ Status LakeSnapshotLoader::_get_existing_files_from_remote(BrokerServiceConnecti
|
|||
LOG(WARNING) << ss.str();
|
||||
return Status::InternalError(ss.str());
|
||||
}
|
||||
LOG(INFO) << "finished to list files from remote path. file num: " << list_rep.files.size();
|
||||
VLOG(2) << "finished to list files from remote path. file num: " << list_rep.files.size();
|
||||
|
||||
// split file name and checksum
|
||||
for (const auto& file : list_rep.files) {
|
||||
|
|
@ -79,7 +79,7 @@ Status LakeSnapshotLoader::_get_existing_files_from_remote(BrokerServiceConnecti
|
|||
<< ", checksum: " << std::string(file_name, pos + 1);
|
||||
}
|
||||
|
||||
LOG(INFO) << "finished to split files. valid file num: " << files->size();
|
||||
VLOG(2) << "finished to split files. valid file num: " << files->size();
|
||||
|
||||
} catch (apache::thrift::TException& e) {
|
||||
(void)client.reopen(config::thrift_rpc_timeout_ms);
|
||||
|
|
@ -124,7 +124,7 @@ Status LakeSnapshotLoader::_rename_remote_file(BrokerServiceConnection& client,
|
|||
return Status::ThriftRpcError(ss.str());
|
||||
}
|
||||
|
||||
LOG(INFO) << "finished to rename file. orig: " << orig_name << ", new: " << new_name;
|
||||
VLOG(2) << "finished to rename file. orig: " << orig_name << ", new: " << new_name;
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
|
@ -167,9 +167,10 @@ Status LakeSnapshotLoader::upload(const ::starrocks::UploadSnapshotsRequest* req
|
|||
LOG(WARNING) << ss.str();
|
||||
return Status::InternalError(ss.str());
|
||||
}
|
||||
LOG(INFO) << "begin to upload snapshot files. num: " << request->snapshots().size()
|
||||
<< ", broker addr: " << request->broker();
|
||||
VLOG(2) << "begin to upload snapshot files. num: " << request->snapshots().size()
|
||||
<< ", broker addr: " << request->broker();
|
||||
|
||||
int64_t start_ms = MonotonicMillis();
|
||||
for (auto& [tablet_id, snapshot] : request->snapshots()) {
|
||||
// TODO: support report logic
|
||||
|
||||
|
|
@ -234,13 +235,15 @@ Status LakeSnapshotLoader::upload(const ::starrocks::UploadSnapshotsRequest* req
|
|||
if (!res.ok()) {
|
||||
return res.status();
|
||||
}
|
||||
LOG(INFO) << "finished to write file via broker. file: " << file << ", length: " << *res;
|
||||
VLOG(2) << "finished to write file via broker. file: " << file << ", length: " << *res;
|
||||
RETURN_IF_ERROR(remote_writable_file->close());
|
||||
// rename file to end with ".md5sum"
|
||||
RETURN_IF_ERROR(_rename_remote_file(*client, full_remote_file + ".part", full_remote_file + "." + md5sum,
|
||||
broker_prop));
|
||||
}
|
||||
}
|
||||
LOG(INFO) << "finished to upload snapshots. num: " << request->snapshots().size()
|
||||
<< ", broker addr: " << request->broker() << ", cost: " << (MonotonicMillis() - start_ms) << "ms.";
|
||||
return status;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -87,13 +87,14 @@ SnapshotLoader::SnapshotLoader(ExecEnv* env, int64_t job_id, int64_t task_id)
|
|||
Status SnapshotLoader::upload(const std::map<std::string, std::string>& src_to_dest_path, const TUploadReq& upload,
|
||||
std::map<int64_t, std::vector<std::string>>* tablet_files) {
|
||||
if (!upload.__isset.use_broker || upload.use_broker) {
|
||||
LOG(INFO) << "begin to upload snapshot files. num: " << src_to_dest_path.size()
|
||||
<< ", broker addr: " << upload.broker_addr << ", job id: " << _job_id << ", task id: " << _task_id;
|
||||
VLOG(2) << "begin to upload snapshot files. num: " << src_to_dest_path.size()
|
||||
<< ", broker addr: " << upload.broker_addr << ", job id: " << _job_id << ", task id: " << _task_id;
|
||||
} else {
|
||||
LOG(INFO) << "begin to upload snapshot files. num: " << src_to_dest_path.size() << ", job id: " << _job_id
|
||||
<< ", task id: " << _task_id;
|
||||
VLOG(2) << "begin to upload snapshot files. num: " << src_to_dest_path.size() << ", job id: " << _job_id
|
||||
<< ", task id: " << _task_id;
|
||||
}
|
||||
|
||||
int64_t start_ms = MonotonicMillis();
|
||||
Status status = Status::OK();
|
||||
// 1. validate local tablet snapshot paths
|
||||
RETURN_IF_ERROR(_check_local_snapshot_paths(src_to_dest_path, true));
|
||||
|
|
@ -202,7 +203,7 @@ Status SnapshotLoader::upload(const std::map<std::string, std::string>& src_to_d
|
|||
ASSIGN_OR_RETURN(auto file_size,
|
||||
fs::copy(input_file.get(), remote_writable_file.get(), config::upload_buffer_size));
|
||||
RETURN_IF_ERROR(remote_writable_file->close());
|
||||
LOG(INFO) << "finished to upload file: " << local_file_path << ", length: " << file_size;
|
||||
VLOG(2) << "finished to upload file: " << local_file_path << ", length: " << file_size;
|
||||
|
||||
// rename file to end with ".md5sum"
|
||||
if (!upload.__isset.use_broker || upload.use_broker) {
|
||||
|
|
@ -218,10 +219,11 @@ Status SnapshotLoader::upload(const std::map<std::string, std::string>& src_to_d
|
|||
|
||||
tablet_files->emplace(tablet_id, local_files_with_checksum);
|
||||
finished_num++;
|
||||
LOG(INFO) << "finished to write tablet to remote. local path: " << src_path << ", remote path: " << dest_path;
|
||||
VLOG(2) << "finished to write tablet to remote. local path: " << src_path << ", remote path: " << dest_path;
|
||||
} // end for each tablet path
|
||||
|
||||
LOG(INFO) << "finished to upload snapshots. job id: " << _job_id << ", task id: " << _task_id;
|
||||
LOG(INFO) << "finished to upload snapshots. num: " << src_to_dest_path.size() << ", job id: " << _job_id
|
||||
<< ", task id: " << _task_id << ", cost: " << (MonotonicMillis() - start_ms) << "ms.";
|
||||
return status;
|
||||
}
|
||||
|
||||
|
|
@ -800,7 +802,7 @@ Status SnapshotLoader::_check_local_snapshot_paths(const std::map<std::string, s
|
|||
return Status::RuntimeError(ss.str());
|
||||
}
|
||||
}
|
||||
LOG(INFO) << "all local snapshot paths are existing. num: " << src_to_dest_path.size();
|
||||
VLOG(2) << "all local snapshot paths are existing. num: " << src_to_dest_path.size();
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
|
@ -833,7 +835,7 @@ Status SnapshotLoader::_get_existing_files_from_remote(BrokerServiceConnection&
|
|||
LOG(WARNING) << ss.str();
|
||||
return Status::InternalError(ss.str());
|
||||
}
|
||||
LOG(INFO) << "finished to list files from remote path. file num: " << list_rep.files.size();
|
||||
VLOG(2) << "finished to list files from remote path. file num: " << list_rep.files.size();
|
||||
|
||||
// split file name and checksum
|
||||
for (const auto& file : list_rep.files) {
|
||||
|
|
@ -855,7 +857,7 @@ Status SnapshotLoader::_get_existing_files_from_remote(BrokerServiceConnection&
|
|||
<< ", checksum: " << std::string(file_name, pos + 1);
|
||||
}
|
||||
|
||||
LOG(INFO) << "finished to split files. valid file num: " << files->size();
|
||||
VLOG(2) << "finished to split files. valid file num: " << files->size();
|
||||
|
||||
} catch (apache::thrift::TException& e) {
|
||||
(void)client.reopen(config::thrift_rpc_timeout_ms);
|
||||
|
|
@ -904,7 +906,7 @@ Status SnapshotLoader::_get_existing_files_from_remote_without_broker(const std:
|
|||
return st;
|
||||
}
|
||||
|
||||
LOG(INFO) << "finished to split files. total file num: " << file_num << " valid file num: " << files->size();
|
||||
VLOG(2) << "finished to split files. total file num: " << file_num << " valid file num: " << files->size();
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
|
@ -918,7 +920,7 @@ Status SnapshotLoader::_get_existing_files_from_local(const std::string& local_p
|
|||
LOG(WARNING) << ss.str();
|
||||
return status;
|
||||
}
|
||||
LOG(INFO) << "finished to list files in local path: " << local_path << ", file num: " << local_files->size();
|
||||
VLOG(2) << "finished to list files in local path: " << local_path << ", file num: " << local_files->size();
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
|
@ -954,7 +956,7 @@ Status SnapshotLoader::_rename_remote_file(BrokerServiceConnection& client, cons
|
|||
return Status::ThriftRpcError(ss.str());
|
||||
}
|
||||
|
||||
LOG(INFO) << "finished to rename file. orig: " << orig_name << ", new: " << new_name;
|
||||
VLOG(2) << "finished to rename file. orig: " << orig_name << ", new: " << new_name;
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
|
@ -968,7 +970,7 @@ Status SnapshotLoader::_rename_remote_file_without_broker(const std::unique_ptr<
|
|||
LOG(WARNING) << ss.str();
|
||||
return Status::InternalError(ss.str());
|
||||
}
|
||||
LOG(INFO) << "finished to rename file. orig: " << orig_name << ", new: " << new_name;
|
||||
VLOG(2) << "finished to rename file. orig: " << orig_name << ", new: " << new_name;
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -90,11 +90,11 @@ void CompactionManager::submit_compaction_task(const CompactionCandidate& compac
|
|||
StarRocksMetrics::instance()->tablet_cumulative_max_compaction_score.set_value(compaction_candidate.score);
|
||||
}
|
||||
auto task_id = next_compaction_task_id();
|
||||
LOG(INFO) << "submit task to compaction pool"
|
||||
<< ", task_id:" << task_id << ", tablet_id:" << compaction_candidate.tablet->tablet_id()
|
||||
<< ", compaction_type:" << starrocks::to_string(compaction_candidate.type)
|
||||
<< ", compaction_score:" << compaction_candidate.score << " for round:" << _round
|
||||
<< ", candidates_size:" << candidates_size();
|
||||
VLOG(2) << "submit task to compaction pool"
|
||||
<< ", task_id:" << task_id << ", tablet_id:" << compaction_candidate.tablet->tablet_id()
|
||||
<< ", compaction_type:" << starrocks::to_string(compaction_candidate.type)
|
||||
<< ", compaction_score:" << compaction_candidate.score << " for round:" << _round
|
||||
<< ", candidates_size:" << candidates_size();
|
||||
auto manager = this;
|
||||
auto tablet = std::move(compaction_candidate.tablet);
|
||||
auto type = compaction_candidate.type;
|
||||
|
|
|
|||
|
|
@ -90,12 +90,12 @@ Status DeleteConditionHandler::generate_delete_predicate(const TabletSchema& sch
|
|||
in_pred->add_values(condition_value);
|
||||
}
|
||||
|
||||
LOG(INFO) << "store one sub-delete condition. condition name=" << in_pred->column_name()
|
||||
<< ",condition size=" << in_pred->values().size();
|
||||
VLOG(3) << "store one sub-delete condition. condition name=" << in_pred->column_name()
|
||||
<< ",condition size=" << in_pred->values().size();
|
||||
} else {
|
||||
string condition_str = construct_sub_predicates(condition);
|
||||
del_pred->add_sub_predicates(condition_str);
|
||||
LOG(INFO) << "store one sub-delete condition. condition=" << condition_str;
|
||||
VLOG(3) << "store one sub-delete condition. condition=" << condition_str;
|
||||
}
|
||||
}
|
||||
del_pred->set_version(-1);
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ using PushBrokerReader = starrocks::PushBrokerReader;
|
|||
|
||||
Status SparkLoadHandler::process_streaming_ingestion(VersionedTablet& tablet, const TPushReq& request,
|
||||
PushType push_type, std::vector<TTabletInfo>* tablet_info_vec) {
|
||||
LOG(INFO) << "begin to realtime vectorized push. tablet=" << tablet.id() << ", txn_id=" << request.transaction_id;
|
||||
VLOG(3) << "begin to realtime vectorized push. tablet=" << tablet.id() << ", txn_id=" << request.transaction_id;
|
||||
DCHECK_EQ(push_type, PUSH_NORMAL_V2);
|
||||
|
||||
_request = request;
|
||||
|
|
|
|||
|
|
@ -40,8 +40,8 @@ namespace starrocks {
|
|||
// very useful in rollup action.
|
||||
Status PushHandler::process_streaming_ingestion(const TabletSharedPtr& tablet, const TPushReq& request,
|
||||
PushType push_type, std::vector<TTabletInfo>* tablet_info_vec) {
|
||||
LOG(INFO) << "begin to realtime vectorized push. tablet=" << tablet->full_name()
|
||||
<< ", txn_id: " << request.transaction_id;
|
||||
VLOG(3) << "begin to realtime vectorized push. tablet=" << tablet->full_name()
|
||||
<< ", txn_id: " << request.transaction_id;
|
||||
|
||||
_request = request;
|
||||
std::vector<TabletVars> tablet_vars(1);
|
||||
|
|
@ -52,9 +52,9 @@ Status PushHandler::process_streaming_ingestion(const TabletSharedPtr& tablet, c
|
|||
if (tablet_info_vec != nullptr) {
|
||||
_get_tablet_infos(tablet_vars, tablet_info_vec);
|
||||
}
|
||||
LOG(INFO) << "process realtime vectorized push successfully. "
|
||||
<< "tablet=" << tablet->full_name() << ", partition_id=" << request.partition_id
|
||||
<< ", txn_id: " << request.transaction_id;
|
||||
VLOG(3) << "process realtime vectorized push successfully. "
|
||||
<< "tablet=" << tablet->full_name() << ", partition_id=" << request.partition_id
|
||||
<< ", txn_id: " << request.transaction_id;
|
||||
}
|
||||
|
||||
return st;
|
||||
|
|
|
|||
|
|
@ -155,7 +155,7 @@ Status SnapshotManager::release_snapshot(const string& snapshot_path) {
|
|||
if (snapshot_path.compare(0, abs_path.size(), abs_path) == 0 &&
|
||||
snapshot_path.compare(abs_path.size(), SNAPSHOT_PREFIX.size(), SNAPSHOT_PREFIX) == 0) {
|
||||
(void)fs::remove_all(snapshot_path);
|
||||
LOG(INFO) << "success to release snapshot path. [path='" << snapshot_path << "']";
|
||||
VLOG(2) << "success to release snapshot path. [path='" << snapshot_path << "']";
|
||||
return Status::OK();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -188,8 +188,9 @@ Status TabletManager::create_tablet(const TCreateTabletReq& request, std::vector
|
|||
|
||||
int64_t tablet_id = request.tablet_id;
|
||||
int32_t schema_hash = request.tablet_schema.schema_hash;
|
||||
LOG(INFO) << "Creating tablet " << tablet_id;
|
||||
VLOG(3) << "Creating tablet " << tablet_id;
|
||||
|
||||
int64_t start_ms = MonotonicMillis();
|
||||
std::unique_lock wlock(_get_tablets_shard_lock(tablet_id), std::defer_lock);
|
||||
std::shared_lock<std::shared_mutex> base_rlock;
|
||||
|
||||
|
|
@ -257,7 +258,7 @@ Status TabletManager::create_tablet(const TCreateTabletReq& request, std::vector
|
|||
return Status::InternalError("fail to create tablet");
|
||||
}
|
||||
|
||||
LOG(INFO) << "Created tablet " << tablet_id;
|
||||
LOG(INFO) << "Created tablet " << tablet_id << ", cost " << (MonotonicMillis() - start_ms) << "ms.";
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
|
@ -414,7 +415,7 @@ Status TabletManager::drop_tablet(TTabletId tablet_id, TabletDropFlag flag) {
|
|||
return Status::NotFound(strings::Substitute("tablet $0 not fount", tablet_id));
|
||||
}
|
||||
|
||||
LOG(INFO) << "Start to drop tablet " << tablet_id;
|
||||
VLOG(3) << "Start to drop tablet " << tablet_id;
|
||||
dropped_tablet = it->second;
|
||||
dropped_tablet->set_is_dropping(true);
|
||||
// we can not erase tablet from tablet map here.
|
||||
|
|
@ -422,6 +423,7 @@ Status TabletManager::drop_tablet(TTabletId tablet_id, TabletDropFlag flag) {
|
|||
// to complete, which can take a while. If a clone occurs in the meantime, a new tablet will be created, and
|
||||
// the new tablet and the old tablet may apply at the same time, modifying the primary key index at the same time.
|
||||
}
|
||||
int64_t start_ms = MonotonicMillis();
|
||||
if (config::enable_event_based_compaction_framework) {
|
||||
dropped_tablet->stop_compaction();
|
||||
StorageEngine::instance()->compaction_manager()->remove_candidate(dropped_tablet->tablet_id());
|
||||
|
|
@ -489,7 +491,7 @@ Status TabletManager::drop_tablet(TTabletId tablet_id, TabletDropFlag flag) {
|
|||
_remove_tablet_from_partition(*dropped_tablet);
|
||||
}
|
||||
dropped_tablet->deregister_tablet_from_dir();
|
||||
LOG(INFO) << "Succeed to drop tablet " << tablet_id;
|
||||
LOG(INFO) << "Succeed to drop tablet " << tablet_id << ", cost " << (MonotonicMillis() - start_ms) << "ms.";
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
|
@ -867,9 +869,9 @@ TabletSharedPtr TabletManager::find_best_tablet_to_do_update_compaction(DataDir*
|
|||
}
|
||||
|
||||
if (best_tablet != nullptr) {
|
||||
LOG(INFO) << "Found the best tablet to compact. "
|
||||
<< "compaction_type=update"
|
||||
<< " tablet_id=" << best_tablet->tablet_id() << " highest_score=" << highest_score;
|
||||
VLOG(2) << "Found the best tablet to compact. "
|
||||
<< "compaction_type=update"
|
||||
<< " tablet_id=" << best_tablet->tablet_id() << " highest_score=" << highest_score;
|
||||
StarRocksMetrics::instance()->tablet_update_max_compaction_score.set_value(highest_score);
|
||||
}
|
||||
return best_tablet;
|
||||
|
|
@ -1106,7 +1108,7 @@ void TabletManager::sweep_shutdown_tablet(const DroppedTabletInfo& info,
|
|||
}
|
||||
|
||||
if (st.ok() || st.is_not_found()) {
|
||||
LOG(INFO) << ((info.flag == kMoveFilesToTrash) ? "Moved " : " Removed ") << tablet->tablet_id_path();
|
||||
VLOG(3) << ((info.flag == kMoveFilesToTrash) ? "Moved " : "Removed ") << tablet->tablet_id_path();
|
||||
} else {
|
||||
LOG(WARNING) << "Fail to remove or move " << tablet->tablet_id_path() << " :" << st;
|
||||
return;
|
||||
|
|
@ -1215,15 +1217,14 @@ Status TabletManager::delete_shutdown_tablet(int64_t tablet_id) {
|
|||
return Status::NotFound(fmt::format("invalid flag: {}", to_delete.flag));
|
||||
}
|
||||
|
||||
if (st.ok() || st.is_not_found()) {
|
||||
LOG(INFO) << ((to_delete.flag == kMoveFilesToTrash) ? "Moved " : " Removed ") << tablet->tablet_id_path();
|
||||
} else {
|
||||
if (!st.ok() && !st.is_not_found()) {
|
||||
LOG(WARNING) << "Fail to remove or move " << tablet->tablet_id_path() << " :" << st;
|
||||
return st;
|
||||
}
|
||||
st = _remove_tablet_meta(tablet);
|
||||
if (st.ok() || st.is_not_found()) {
|
||||
LOG(INFO) << "Removed tablet meta of tablet " << tablet->tablet_id();
|
||||
VLOG(3) << "Removed tablet meta of tablet " << tablet->tablet_id() << ", "
|
||||
<< ((to_delete.flag == kMoveFilesToTrash) ? "Moved " : "Removed ") << tablet->tablet_id_path();
|
||||
} else {
|
||||
LOG(ERROR) << "Fail to remove tablet meta of tablet " << tablet->tablet_id() << ", status:" << st;
|
||||
return st;
|
||||
|
|
@ -1484,7 +1485,7 @@ Status TabletManager::_create_tablet_meta_unlocked(const TCreateTabletReq& reque
|
|||
for (uint32_t col_idx = 0; col_idx < next_unique_id; ++col_idx) {
|
||||
col_idx_to_unique_id[col_idx] = col_idx;
|
||||
}
|
||||
LOG(INFO) << "creating tablet meta. next_unique_id:" << next_unique_id;
|
||||
VLOG(3) << "creating tablet meta. next_unique_id:" << next_unique_id;
|
||||
} else {
|
||||
auto base_tablet_schema = base_tablet->tablet_schema();
|
||||
next_unique_id = base_tablet_schema->next_column_unique_id();
|
||||
|
|
@ -1559,6 +1560,7 @@ Status TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, TabletDropFlag
|
|||
return Status::InvalidArgument(fmt::format("invalid TabletDropFlag {}", (int)flag));
|
||||
}
|
||||
|
||||
int64_t start_ms = MonotonicMillis();
|
||||
TabletMap& tablet_map = _get_tablet_map(tablet_id);
|
||||
auto it = tablet_map.find(tablet_id);
|
||||
if (it == tablet_map.end()) {
|
||||
|
|
@ -1566,7 +1568,7 @@ Status TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, TabletDropFlag
|
|||
return Status::NotFound(strings::Substitute("tablet $0 not fount", tablet_id));
|
||||
}
|
||||
|
||||
LOG(INFO) << "Start to drop tablet " << tablet_id;
|
||||
VLOG(3) << "Start to drop tablet " << tablet_id;
|
||||
TabletSharedPtr dropped_tablet = it->second;
|
||||
tablet_map.erase(it);
|
||||
_remove_tablet_from_partition(*dropped_tablet);
|
||||
|
|
@ -1598,7 +1600,7 @@ Status TabletManager::_drop_tablet_unlocked(TTabletId tablet_id, TabletDropFlag
|
|||
DCHECK_EQ(kKeepMetaAndFiles, flag);
|
||||
}
|
||||
dropped_tablet->deregister_tablet_from_dir();
|
||||
LOG(INFO) << "Succeed to drop tablet " << tablet_id;
|
||||
LOG(INFO) << "Succeed to drop tablet " << tablet_id << ", cost " << (MonotonicMillis() - start_ms) << "ms.";
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -595,7 +595,7 @@ void TabletMeta::remove_delete_predicate_by_version(const Version& version) {
|
|||
for (const auto& it : temp.sub_predicates()) {
|
||||
del_cond_str += it + ";";
|
||||
}
|
||||
LOG(INFO) << "remove one del_pred. version=" << temp.version() << ", condition=" << del_cond_str;
|
||||
VLOG(3) << "remove one del_pred. version=" << temp.version() << ", condition=" << del_cond_str;
|
||||
|
||||
// remove delete condition from PB
|
||||
_del_pred_array.SwapElements(ordinal, _del_pred_array.size() - 1);
|
||||
|
|
|
|||
|
|
@ -80,7 +80,7 @@ Status EngineAlterTabletTask::execute() {
|
|||
return res;
|
||||
}
|
||||
|
||||
LOG(INFO) << alter_msg_header << "success to do alter task. base_tablet_id=" << _alter_tablet_req.base_tablet_id;
|
||||
VLOG(2) << alter_msg_header << "success to do alter task. base_tablet_id=" << _alter_tablet_req.base_tablet_id;
|
||||
return Status::OK();
|
||||
} // execute
|
||||
|
||||
|
|
|
|||
|
|
@ -272,7 +272,7 @@ Status EngineBatchLoadTask::_push(const TPushReq& request, std::vector<TTabletIn
|
|||
}
|
||||
|
||||
Status EngineBatchLoadTask::_delete_data(const TPushReq& request, std::vector<TTabletInfo>* tablet_info_vec) {
|
||||
LOG(INFO) << "begin to process delete data. request=" << ThriftDebugString(request);
|
||||
VLOG(3) << "begin to process delete data. request=" << ThriftDebugString(request);
|
||||
StarRocksMetrics::instance()->delete_requests_total.increment(1);
|
||||
|
||||
if (tablet_info_vec == nullptr) {
|
||||
|
|
@ -297,7 +297,7 @@ Status EngineBatchLoadTask::_delete_data(const TPushReq& request, std::vector<TT
|
|||
return res;
|
||||
}
|
||||
|
||||
LOG(INFO) << "Finish to delete data. tablet:" << tablet->full_name();
|
||||
VLOG(3) << "Finish to delete data. tablet:" << tablet->full_name();
|
||||
return res;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -342,7 +342,7 @@ public class CheckConsistencyJob {
|
|||
}
|
||||
|
||||
if (isConsistent) {
|
||||
LOG.info("tablet[{}] is consistent: {}", tabletId, checksumMap.keySet());
|
||||
LOG.debug("tablet[{}] is consistent: {}", tabletId, checksumMap.keySet());
|
||||
} else {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("tablet[").append(tabletId).append("] is not consistent: ");
|
||||
|
|
|
|||
|
|
@ -408,7 +408,7 @@ public class ConsistencyChecker extends FrontendDaemon {
|
|||
if (jobs.containsKey(job.getTabletId())) {
|
||||
return false;
|
||||
} else {
|
||||
LOG.info("add tablet[{}] to check consistency", job.getTabletId());
|
||||
LOG.debug("add tablet[{}] to check consistency", job.getTabletId());
|
||||
jobs.put(job.getTabletId(), job);
|
||||
return true;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1299,12 +1299,10 @@ public class ReportHandler extends Daemon implements MemoryTrackable {
|
|||
}
|
||||
|
||||
private static void addDropReplicaTask(AgentBatchTask batchTask, long backendId,
|
||||
long tabletId, int schemaHash, String reason, boolean force) {
|
||||
long tabletId, int schemaHash, boolean force) {
|
||||
DropReplicaTask task =
|
||||
new DropReplicaTask(backendId, tabletId, schemaHash, force);
|
||||
batchTask.addTask(task);
|
||||
LOG.info("delete tablet[{}] from backend[{}] because {}",
|
||||
tabletId, backendId, reason);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
|
|
@ -1344,7 +1342,8 @@ public class ReportHandler extends Daemon implements MemoryTrackable {
|
|||
// continue to report them to FE forever and add some processing overhead(the tablet report
|
||||
// process is protected with DB S lock).
|
||||
addDropReplicaTask(batchTask, backendId, tabletId,
|
||||
-1 /* Unknown schema hash */, "not found in meta", invertedIndex.tabletForceDelete(tabletId, backendId));
|
||||
-1 /* Unknown schema hash */, invertedIndex.tabletForceDelete(tabletId, backendId));
|
||||
LOG.debug("delete tablet[{}] from backend[{}] because not found in meta", tabletId, backendId);
|
||||
if (!FeConstants.runningUnitTest) {
|
||||
invertedIndex.eraseTabletForceDelete(tabletId, backendId);
|
||||
}
|
||||
|
|
@ -1389,10 +1388,11 @@ public class ReportHandler extends Daemon implements MemoryTrackable {
|
|||
|
||||
if (needDelete && maxTaskSendPerBe > 0) {
|
||||
// drop replica
|
||||
String reason = "invalid meta, " +
|
||||
(errorMsgAddingReplica != null ? errorMsgAddingReplica : "replica unhealthy");
|
||||
addDropReplicaTask(batchTask, backendId, tabletId, backendTabletInfo.getSchema_hash(),
|
||||
"invalid meta, " +
|
||||
(errorMsgAddingReplica != null ? errorMsgAddingReplica : "replica unhealthy"),
|
||||
true);
|
||||
LOG.info("delete tablet[{}] from backend[{}] because {}", tabletId, backendId, reason);
|
||||
++deleteFromBackendCounter;
|
||||
--maxTaskSendPerBe;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -473,11 +473,11 @@ public class LoadMgr implements MemoryTrackable {
|
|||
// remove the ones that finished earlier
|
||||
int numJobsToRemove = idToLoadJob.size() - Config.label_keep_max_num;
|
||||
if (numJobsToRemove > 0) {
|
||||
LOG.info("remove {} jobs from {}", numJobsToRemove, jobs.size());
|
||||
LOG.debug("remove {} jobs from {}", numJobsToRemove, jobs.size());
|
||||
Iterator<LoadJob> iterator = jobs.iterator();
|
||||
for (int i = 0; i != numJobsToRemove && iterator.hasNext(); ++i) {
|
||||
LoadJob job = iterator.next();
|
||||
LOG.info("remove redundant job: {}", job.getLabel());
|
||||
LOG.debug("remove redundant job: {}", job.getLabel());
|
||||
unprotectedRemoveJobReleatedMeta(job);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -223,7 +223,7 @@ public class PushTask extends AgentTask {
|
|||
public void countDownLatch(long backendId, long tabletId) {
|
||||
if (this.latch != null) {
|
||||
if (latch.markedCountDown(backendId, tabletId)) {
|
||||
LOG.info("pushTask current latch count: {}. backend: {}, tablet:{}",
|
||||
LOG.debug("pushTask current latch count: {}. backend: {}, tablet:{}",
|
||||
latch.getCount(), backendId, tabletId);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -462,7 +462,7 @@ public class TabletTaskExecutor {
|
|||
batchTaskMap.put(backendId, batchTask);
|
||||
}
|
||||
batchTask.addTask(dropTask);
|
||||
LOG.info("delete tablet[{}] from backend[{}] because table {}-{} is dropped",
|
||||
LOG.debug("delete tablet[{}] from backend[{}] because table {}-{} is dropped",
|
||||
tabletId, backendId, olapTable.getId(), olapTable.getName());
|
||||
} // end for replicas
|
||||
} // end for tablets
|
||||
|
|
|
|||
|
|
@ -37,14 +37,14 @@ public class TxnStateCallbackFactory {
|
|||
return false;
|
||||
}
|
||||
callbacks.put(callback.getId(), callback);
|
||||
LOG.info("add callback of txn state : {}. current callback size: {}",
|
||||
LOG.debug("add callback of txn state : {}. current callback size: {}",
|
||||
callback.getId(), callbacks.size());
|
||||
return true;
|
||||
}
|
||||
|
||||
public synchronized void removeCallback(long id) {
|
||||
if (callbacks.remove(id) != null) {
|
||||
LOG.info("remove callback of txn state : {}. current callback size: {}",
|
||||
LOG.debug("remove callback of txn state : {}. current callback size: {}",
|
||||
id, callbacks.size());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue