[BugFix] Fix ApplyCommitTask loss in concurrent scenarios. (#60633)

Signed-off-by: edwinhzhang <edwinhzhang@tencent.com>
Signed-off-by: sevev <qiangzh95@gmail.com>
Signed-off-by: zhangqiang <qiangzh95@gmail.com>
Co-authored-by: sevev <qiangzh95@gmail.com>
This commit is contained in:
zhanghe 2025-09-19 10:10:25 +08:00 committed by GitHub
parent 2fc227d19a
commit 5c2aaaf717
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed file with 21 additions and 8 deletions

View File

@@ -986,9 +986,9 @@ void TabletUpdates::do_apply() {
SCOPED_THREAD_LOCAL_SINGLETON_CHECK_MEM_TRACKER_SETTER(
config::enable_pk_strict_memcheck ? StorageEngine::instance()->update_manager()->mem_tracker() : nullptr);
// only 1 thread at max is running this method
bool first = true;
bool apply_operation_performed = false;
Status apply_st;
while (!_apply_stopped) {
Status apply_st;
const EditVersionInfo* version_info_apply = nullptr;
{
std::lock_guard rl(_lock);
@@ -997,7 +997,7 @@ void TabletUpdates::do_apply() {
break;
}
if (_apply_version_idx + 1 >= _edit_version_infos.size()) {
if (first) {
if (!apply_operation_performed) {
LOG(WARNING) << "illegal state: do_apply should not be called when there is "
"nothing to apply: "
<< _debug_string(false);
@@ -1015,16 +1015,18 @@ void TabletUpdates::do_apply() {
apply_st = _apply_rowset_commit(*version_info_apply);
}
StarRocksMetrics::instance()->update_rowset_commit_apply_duration_us.increment(duration_ns / 1000);
apply_operation_performed = true;
} else if (version_info_apply->compaction) {
// _compaction_running may be false after BE restart, reset it to true
_compaction_running = true;
apply_st = _apply_compaction_commit(*version_info_apply);
apply_operation_performed = true;
} else {
std::string msg = strings::Substitute("bad EditVersionInfo tablet: $0 ", _tablet.tablet_id());
LOG(ERROR) << msg;
_set_error(msg);
}
first = false;
// submit a delay apply task to storage_engine
if (config::enable_retry_apply && _is_retryable(apply_st) && !apply_st.ok()) {
//reset pk index, reset rowset_update_states, reset compaction_state
@@ -1057,10 +1059,21 @@ void TabletUpdates::do_apply() {
}
}
}
std::lock_guard<std::mutex> lg(_apply_running_lock);
DCHECK(_apply_running) << "illegal state: _apply_running should be true";
_apply_running = false;
_apply_stopped_cond.notify_all();
{
std::lock_guard<std::mutex> lg(_apply_running_lock);
DCHECK(_apply_running) << "illegal state: _apply_running should be true";
_apply_running = false;
_apply_stopped_cond.notify_all();
}
{
std::lock_guard rl(_lock);
// If apply_st is not ok, the apply task has failed, so we should not submit a new apply task.
if (apply_st.ok() && apply_operation_performed) {
_check_for_apply();
}
}
}
void TabletUpdates::_wait_apply_done() {