[BugFix] Revert PR #59009 (backport #59815) (#59827)

Signed-off-by: sevev <qiangzh95@gmail.com>
Co-authored-by: zhangqiang <qiangzh95@gmail.com>
This commit is contained in:
mergify[bot] 2025-06-11 23:51:30 +08:00 committed by GitHub
parent bcc26d08dc
commit 10d7323077
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 19 additions and 326 deletions

View File

@ -1689,6 +1689,4 @@ CONF_mInt64(split_exchanger_buffer_chunk_num, "1000");
// Threshold for splitting a hashmap/hashset into a two-level hashmap/hashset; a negative value means the default is used.
CONF_mInt64(two_level_memory_threshold, "-1");
CONF_mInt32(max_update_tablet_version_internal_ms, "5000");
} // namespace starrocks::config

View File

@ -15,15 +15,12 @@
#include "publish_version_manager.h"
#include "agent/finish_task.h"
#include "agent/master_info.h"
#include "agent/task_signatures_manager.h"
#include "common/config.h"
#include "runtime/client_cache.h"
#include "storage/storage_engine.h"
#include "storage/tablet.h"
#include "storage/tablet_manager.h"
#include "util/cpu_info.h"
#include "util/thrift_rpc_helper.h"
namespace starrocks {
const int MIN_FINISH_PUBLISH_WORKER_COUNT = 8;
@ -85,8 +82,8 @@ bool PublishVersionManager::_all_task_applied(const TFinishTaskRequest& finish_t
return all_task_applied;
}
size_t PublishVersionManager::_left_task_applied(const TFinishTaskRequest& finish_task_request) {
size_t unapplied_tablet_num = 0;
bool PublishVersionManager::_left_task_applied(const TFinishTaskRequest& finish_task_request) {
bool applied = true;
int64_t signature = finish_task_request.signature;
std::set<std::pair<int64_t, int64_t>> unapplied_tablet;
auto iter = _unapplied_tablet_by_txn.find(signature);
@ -103,19 +100,19 @@ size_t PublishVersionManager::_left_task_applied(const TFinishTaskRequest& finis
continue;
}
if (tablet->max_readable_version() < request_version) {
unapplied_tablet_num++;
applied = false;
unapplied_tablet.insert(std::make_pair(tablet_id, request_version));
}
VLOG(2) << "tablet: " << tablet->tablet_id() << " max_readable_version is "
<< tablet->max_readable_version() << ", request_version is " << request_version;
}
}
if (unapplied_tablet_num > 0) {
if (!applied) {
iter->second.swap(unapplied_tablet);
} else {
_unapplied_tablet_by_txn.erase(signature);
}
return unapplied_tablet_num;
return applied;
}
void PublishVersionManager::wait_publish_task_apply_finish(std::vector<TFinishTaskRequest> finish_task_requests) {
@ -124,17 +121,14 @@ void PublishVersionManager::wait_publish_task_apply_finish(std::vector<TFinishTa
if (_all_task_applied(finish_task_requests[i])) {
_finish_task_requests[finish_task_requests[i].signature] = std::move(finish_task_requests[i]);
} else {
FinishTaskInfo info;
info.last_report_time = MonotonicMillis();
info.not_report_tablet_num = finish_task_requests[i].tablet_publish_versions.size();
info.request = std::move(finish_task_requests[i]);
_waitting_finish_task_requests[finish_task_requests[i].signature] = std::move(info);
_waitting_finish_task_requests[finish_task_requests[i].signature] = std::move(finish_task_requests[i]);
}
}
DCHECK(has_pending_task());
}
void PublishVersionManager::update_tablet_version(std::vector<TTabletVersionPair>& tablet_versions) {
void PublishVersionManager::update_tablet_version(TFinishTaskRequest& finish_task_request) {
auto& tablet_versions = finish_task_request.tablet_versions;
for (int32_t i = 0; i < tablet_versions.size(); i++) {
int64_t tablet_id = tablet_versions[i].tablet_id;
TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(tablet_id);
@ -154,7 +148,7 @@ void PublishVersionManager::finish_publish_version_task() {
// submit finish task
st = _finish_publish_version_thread_pool->submit_func(
[this, finish_request = std::move(finish_task_request)]() mutable {
update_tablet_version(finish_request.tablet_versions);
update_tablet_version(finish_request);
#ifndef BE_TEST
finish_task(finish_request);
#endif
@ -166,12 +160,11 @@ void PublishVersionManager::finish_publish_version_task() {
}
std::vector<int64_t> clear_txn;
for (auto& [signature, finish_task_info] : _waitting_finish_task_requests) {
size_t unapplied_tablet_num = _left_task_applied(finish_task_info.request);
if (unapplied_tablet_num == 0) {
for (auto& [signature, finish_task_request] : _waitting_finish_task_requests) {
if (_left_task_applied(finish_task_request)) {
st = _finish_publish_version_thread_pool->submit_func(
[this, finish_request = std::move(finish_task_info.request)]() mutable {
update_tablet_version(finish_request.tablet_versions);
[this, finish_request = std::move(finish_task_request)]() mutable {
update_tablet_version(finish_request);
#ifndef BE_TEST
finish_task(finish_request);
#endif
@ -180,41 +173,6 @@ void PublishVersionManager::finish_publish_version_task() {
if (st.ok()) {
erase_waitting_finish_task_signature.emplace_back(signature);
}
} else {
size_t not_report_tablet_num = finish_task_info.not_report_tablet_num;
if (unapplied_tablet_num < not_report_tablet_num &&
MonotonicMillis() - finish_task_info.last_report_time >
config::max_update_tablet_version_internal_ms) {
VLOG(2) << "unapplied_tablet_num: " << unapplied_tablet_num
<< ", not_report_tablet_num: " << not_report_tablet_num
<< ", report_internal_ms: " << MonotonicMillis() - finish_task_info.last_report_time
<< ", allow_internla_ms: " << config::max_update_tablet_version_internal_ms;
finish_task_info.not_report_tablet_num = unapplied_tablet_num;
finish_task_info.last_report_time = MonotonicMillis();
TUpdateTabletVersionRequest update_request;
update_request.__set_backend(finish_task_info.request.backend);
update_request.__set_signature(signature);
update_request.__set_tablet_versions(finish_task_info.request.tablet_versions);
st = _finish_publish_version_thread_pool->submit_func(
[this, request = std::move(update_request)]() mutable {
update_tablet_version(request.tablet_versions);
TNetworkAddress master_addr = get_master_address();
TUpdateTabletVersionResult result;
auto st = ThriftRpcHelper::rpc<FrontendServiceClient>(
master_addr.hostname, master_addr.port,
[&request, &result](FrontendServiceConnection& client) {
client->updateTabletVersion(result, request);
});
if (!st.ok()) {
LOG(WARNING) << "updateTabletVersion failed: " << st
<< ", signature: " << request.signature;
}
});
if (!st.ok()) {
LOG(WARNING) << "submit report tablet version task failed";
}
}
}
}
for (auto& signature : erase_finish_task_signature) {

View File

@ -27,12 +27,6 @@ namespace starrocks {
using FinishTaskRequestPtr = std::shared_ptr<TFinishTaskRequest>;
// Bookkeeping for a publish-version finish task that is still waiting for
// some of its tablets to apply the requested version.
struct FinishTaskInfo {
    // The finish-task request that will be submitted once the task completes.
    TFinishTaskRequest request;
    // Timestamp of the last report for this task; callers store
    // MonotonicMillis() values here. Zero-initialised so a default-constructed
    // instance never carries an indeterminate value.
    int64_t last_report_time = 0;
    // Number of tablets not yet reported for this task. Zero-initialised for
    // the same reason as above.
    size_t not_report_tablet_num = 0;
};
class PublishVersionManager {
public:
Status init();
@ -40,20 +34,20 @@ public:
void wait_publish_task_apply_finish(std::vector<TFinishTaskRequest> finish_task_requests);
bool has_pending_task() { return !_finish_task_requests.empty() || !_waitting_finish_task_requests.empty(); }
void finish_publish_version_task();
void update_tablet_version(std::vector<TTabletVersionPair>& tablet_versions);
void update_tablet_version(TFinishTaskRequest& finish_task_request);
size_t finish_task_requests_size() { return _finish_task_requests.size(); }
size_t waitting_finish_task_requests_size() { return _waitting_finish_task_requests.size(); }
private:
bool _all_task_applied(const TFinishTaskRequest& finish_task_request);
size_t _left_task_applied(const TFinishTaskRequest& finish_task_request);
bool _left_task_applied(const TFinishTaskRequest& finish_task_request);
private:
mutable std::mutex _lock;
std::map<int64_t, TFinishTaskRequest> _finish_task_requests;
std::map<int64_t, FinishTaskInfo> _waitting_finish_task_requests;
std::map<int64_t, TFinishTaskRequest> _waitting_finish_task_requests;
std::map<int64_t, std::set<std::pair<int64_t, int64_t>>> _unapplied_tablet_by_txn;
std::unique_ptr<ThreadPool> _finish_publish_version_thread_pool;
};

View File

@ -237,34 +237,16 @@ TEST_F(PublishVersionManagerTest, test_publish_task) {
_tablet->updates()->stop_apply(true);
auto rs1 = create_rowset(_tablet, keys);
ASSERT_TRUE(_tablet->rowset_commit(3, rs1).ok());
auto tablet1 = create_tablet(rand(), rand());
{
auto rs0 = create_rowset(tablet1, keys);
ASSERT_TRUE(tablet1->rowset_commit(2, rs0).ok());
auto rs1 = create_rowset(tablet1, keys);
ASSERT_TRUE(tablet1->rowset_commit(3, rs1).ok());
}
std::vector<TFinishTaskRequest> finish_task_requests;
auto& finish_task_request = finish_task_requests.emplace_back();
finish_task_request.signature = 2222;
auto& tablet_publish_versions = finish_task_request.tablet_publish_versions;
{
auto& pair1 = tablet_publish_versions.emplace_back();
pair1.__set_tablet_id(_tablet->tablet_id());
pair1.__set_version(3);
auto& pair2 = tablet_publish_versions.emplace_back();
pair2.__set_tablet_id(tablet1->tablet_id());
pair2.__set_version(3);
}
config::max_update_tablet_version_internal_ms = 1000;
auto& pair = tablet_publish_versions.emplace_back();
pair.__set_tablet_id(_tablet->tablet_id());
pair.__set_version(3);
_publish_version_manager->wait_publish_task_apply_finish(std::move(finish_task_requests));
_finish_publish_version_cv.notify_one();
std::this_thread::sleep_for(std::chrono::seconds(2));
ASSERT_EQ(0, _publish_version_manager->finish_task_requests_size());
ASSERT_EQ(1, _publish_version_manager->waitting_finish_task_requests_size());
_tablet->updates()->stop_apply(false);

View File

@ -147,10 +147,7 @@ import com.starrocks.thrift.TTableReplicationRequest;
import com.starrocks.thrift.TTableReplicationResponse;
import com.starrocks.thrift.TTabletInfo;
import com.starrocks.thrift.TTabletMeta;
import com.starrocks.thrift.TTabletVersionPair;
import com.starrocks.thrift.TTaskType;
import com.starrocks.thrift.TUpdateTabletVersionRequest;
import com.starrocks.thrift.TUpdateTabletVersionResult;
import com.starrocks.transaction.GlobalTransactionMgr;
import com.starrocks.transaction.PartitionCommitInfo;
import com.starrocks.transaction.TabletCommitInfo;
@ -1441,95 +1438,4 @@ public class LeaderImpl {
return response;
}
}
/**
 * Applies the tablet versions reported by a backend to the matching replicas.
 *
 * <p>The request is rejected with {@code TStatusCode.CANCELLED} (and a single error message)
 * when this FE is not the leader, when the request carries no backend or no tablet versions,
 * when the backend cannot be resolved, when no replicas or tablet metas are found, or when the
 * tablets belong to more than one database/table. On success the returned status is
 * {@code TStatusCode.OK} and each non-null replica has had {@code updateVersion} called with
 * the version of the tablet/version pair at the same list index.
 *
 * @param request backend identity, task signature and tablet/version pairs
 * @return a result whose status is OK or CANCELLED as described above
 */
public TUpdateTabletVersionResult updateTabletVersion(TUpdateTabletVersionRequest request) {
    TUpdateTabletVersionResult result = new TUpdateTabletVersionResult();
    TStatus tStatus = new TStatus(TStatusCode.OK);
    result.setStatus(tStatus);

    if (!GlobalStateMgr.getCurrentState().isLeader()) {
        LOG.warn("current node is not leader, update tablet version failed, signature: {}",
                request.getSignature());
        tStatus.setStatus_code(TStatusCode.CANCELLED);
        tStatus.setError_msgs(Lists.newArrayList("current fe is not leader"));
        result.setStatus(tStatus);
        return result;
    }

    // `backend` is an optional Thrift field, so it may be absent from the request.
    TBackend tBackend = request.getBackend();
    if (tBackend == null) {
        LOG.warn("backend is not set in update tablet version request, signature: {}",
                request.getSignature());
        tStatus.setStatus_code(TStatusCode.CANCELLED);
        tStatus.setError_msgs(Lists.newArrayList("backend is not set in request"));
        result.setStatus(tStatus);
        return result;
    }
    String host = tBackend.getHost();
    int bePort = tBackend.getBe_port();
    long backendId;

    // Resolve the reporting node: try backends first, then compute nodes in shared-data mode.
    ComputeNode cn = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getBackendWithBePort(host, bePort);
    if (cn == null) {
        if (RunMode.isSharedDataMode()) {
            cn = GlobalStateMgr.getCurrentState().getNodeMgr().getClusterInfo().getComputeNodeWithBePort(host, bePort);
        }
        if (cn == null) {
            tStatus.setStatus_code(TStatusCode.CANCELLED);
            tStatus.setError_msgs(Lists.newArrayList("backend not exist."));
            LOG.warn("backend does not found. host: {}, be port: {}.", host, bePort);
            result.setStatus(tStatus);
            return result;
        }
    }
    backendId = cn.getId();

    TabletInvertedIndex tablets = GlobalStateMgr.getCurrentState().getTabletInvertedIndex();
    // `tablet_versions` is an optional Thrift field as well; streaming over null would throw.
    List<TTabletVersionPair> tabletVersions = request.getTablet_versions();
    if (tabletVersions == null) {
        LOG.warn("tablet versions are not set in update tablet version request, signature: {}",
                request.getSignature());
        tStatus.setStatus_code(TStatusCode.CANCELLED);
        tStatus.setError_msgs(Lists.newArrayList("tablet versions are not set in request"));
        result.setStatus(tStatus);
        return result;
    }
    List<Long> tabletIds = tabletVersions.stream().map(tv -> tv.tablet_id).collect(Collectors.toList());
    List<Replica> replicas = tablets.getReplicasOnBackendByTabletIds(tabletIds, backendId);
    if (replicas == null) {
        LOG.warn("backend not found or no replicas on backend, backendid={}", backendId);
        tStatus.setStatus_code(TStatusCode.CANCELLED);
        tStatus.setError_msgs(Lists.newArrayList("no replicas on backend"));
        result.setStatus(tStatus);
        return result;
    }

    List<TabletMeta> tabletMetaList = tablets.getTabletMetaList(tabletIds);
    Long dbId = null;
    Long tableId = null;
    if (tabletMetaList.isEmpty()) {
        tStatus.setStatus_code(TStatusCode.CANCELLED);
        tStatus.setError_msgs(Lists.newArrayList("no tabletMeta found"));
        result.setStatus(tStatus);
        return result;
    }
    // All tablets must belong to one database and one table, because a single table lock
    // is taken below. Missing metas are ignored.
    for (TabletMeta tabletMeta : tabletMetaList) {
        if (tabletMeta == null || tabletMeta == TabletInvertedIndex.NOT_EXIST_TABLET_META) {
            continue;
        }
        if (dbId == null) {
            dbId = tabletMeta.getDbId();
        }
        if (tableId == null) {
            tableId = tabletMeta.getTableId();
        }
        if (dbId != tabletMeta.getDbId() || tableId != tabletMeta.getTableId()) {
            LOG.warn("Tablets in UpdateTabletVersionRequest from different databases or table");
            tStatus.setStatus_code(TStatusCode.CANCELLED);
            tStatus.setError_msgs(Lists.newArrayList("tablets in request from different db or table"));
            result.setStatus(tStatus);
            return result;
        }
    }
    // Every meta was skipped above, so there is no database/table to lock.
    if (dbId == null || tableId == null) {
        LOG.warn("no valid tablet meta found for update tablet version request, signature: {}",
                request.getSignature());
        tStatus.setStatus_code(TStatusCode.CANCELLED);
        tStatus.setError_msgs(Lists.newArrayList("no tabletMeta found"));
        result.setStatus(tStatus);
        return result;
    }

    Locker locker = new Locker();
    locker.lockTableWithIntensiveDbLock(dbId, tableId, LockType.WRITE);
    try {
        // NOTE(review): this assumes `replicas` is index-aligned with `tabletVersions`
        // (one entry per requested tablet id, possibly null) - confirm against
        // getReplicasOnBackendByTabletIds.
        for (int i = 0; i < tabletVersions.size(); i++) {
            TTabletVersionPair tabletVersion = tabletVersions.get(i);
            Replica replica = replicas.get(i);
            if (replica == null) {
                continue;
            }
            replica.updateVersion(tabletVersion.version);
        }
    } finally {
        locker.unLockTableWithIntensiveDbLock(dbId, tableId, LockType.WRITE);
    }
    return result;
}
}

View File

@ -352,8 +352,6 @@ import com.starrocks.thrift.TUniqueId;
import com.starrocks.thrift.TUpdateExportTaskStatusRequest;
import com.starrocks.thrift.TUpdateResourceUsageRequest;
import com.starrocks.thrift.TUpdateResourceUsageResponse;
import com.starrocks.thrift.TUpdateTabletVersionRequest;
import com.starrocks.thrift.TUpdateTabletVersionResult;
import com.starrocks.thrift.TUserPrivDesc;
import com.starrocks.thrift.TVerboseVariableRecord;
import com.starrocks.thrift.TWarehouseInfo;
@ -3156,11 +3154,6 @@ public class FrontendServiceImpl implements FrontendService.Iface {
return response;
}
/**
 * Thrift service entry point for tablet version updates; the work is done entirely by
 * {@code leaderImpl.updateTabletVersion}.
 */
@Override
public TUpdateTabletVersionResult updateTabletVersion(TUpdateTabletVersionRequest request) {
    final TUpdateTabletVersionResult leaderResult = leaderImpl.updateTabletVersion(request);
    return leaderResult;
}
@NotNull
private static TConnectionInfo getTConnectionInfo(List<String> row) {
TConnectionInfo tConnectionInfo = new TConnectionInfo();

View File

@ -21,23 +21,9 @@ import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.Replica;
import com.starrocks.catalog.Replica.ReplicaState;
import com.starrocks.catalog.TabletInvertedIndex;
import com.starrocks.catalog.TabletMeta;
import com.starrocks.common.jmockit.Deencapsulation;
import com.starrocks.common.util.concurrent.lock.LockType;
import com.starrocks.common.util.concurrent.lock.Locker;
import com.starrocks.lake.LakeTable;
import com.starrocks.lake.LakeTablet;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.system.Backend;
import com.starrocks.system.SystemInfoService;
import com.starrocks.thrift.TBackend;
import com.starrocks.thrift.TStatusCode;
import com.starrocks.thrift.TStorageMedium;
import com.starrocks.thrift.TTabletVersionPair;
import com.starrocks.thrift.TUpdateTabletVersionRequest;
import com.starrocks.thrift.TUpdateTabletVersionResult;
import mockit.Expectations;
import mockit.Mock;
import mockit.MockUp;
@ -46,8 +32,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import static com.starrocks.catalog.Replica.ReplicaState.NORMAL;
@ -116,114 +100,4 @@ public class LeaderImplTest {
Assert.assertEquals(new Replica(tabletId, backendId, -1, NORMAL), Deencapsulation.invoke(leader, "findRelatedReplica",
olapTable, physicalPartition, backendId, tabletId, indexId));
}
/**
 * Walks updateTabletVersion through each rejection path in order (not leader, unknown backend,
 * no replicas, no tablet meta), then the success path, and finally the mixed db/table rejection.
 * Each step adds one more mock or request field, so the order of the steps matters.
 */
@Test
public void testUpdateTabletVersion() throws Exception {
    TUpdateTabletVersionRequest request = new TUpdateTabletVersionRequest();

    // Step 1: this FE is not the leader.
    TUpdateTabletVersionResult result = leader.updateTabletVersion(request);
    Assert.assertEquals(TStatusCode.CANCELLED, result.status.getStatus_code());
    Assert.assertEquals("current fe is not leader", result.status.getError_msgs().get(0));

    new MockUp<GlobalStateMgr>() {
        @Mock
        boolean isLeader() {
            return true;
        }
    };

    // Step 2: leader, but the backend in the request cannot be resolved.
    TBackend tBackend = new TBackend("host2", 8000, 1);
    request.setBackend(tBackend);
    result = leader.updateTabletVersion(request);
    Assert.assertEquals(TStatusCode.CANCELLED, result.status.getStatus_code());
    Assert.assertEquals("backend not exist.", result.status.getError_msgs().get(0));

    Backend cn = new Backend(10002, "host2", 8000);
    new MockUp<SystemInfoService>() {
        @Mock
        public Backend getBackendWithBePort(String host, int bePort) {
            return cn;
        }
    };

    List<TTabletVersionPair> tabletVersions = new ArrayList<>();
    TTabletVersionPair pair1 = new TTabletVersionPair();
    pair1.setTablet_id(10001L);
    pair1.setVersion(4L);
    tabletVersions.add(pair1);
    TTabletVersionPair pair2 = new TTabletVersionPair();
    pair2.setTablet_id(10002L);
    pair2.setVersion(5L);
    tabletVersions.add(pair2);
    request.setTablet_versions(tabletVersions);

    // Step 3: backend resolved, but no replicas are found for it.
    result = leader.updateTabletVersion(request);
    Assert.assertEquals(TStatusCode.CANCELLED, result.status.getStatus_code());
    Assert.assertEquals("no replicas on backend", result.status.getError_msgs().get(0));

    List<TabletMeta> metaList = new ArrayList<>();
    List<Replica> replicas = new ArrayList<>();
    Replica replica1 = new Replica(1L, cn.getId(), ReplicaState.NORMAL, 3, 0);
    Replica replica2 = new Replica(2L, cn.getId(), ReplicaState.NORMAL, 3, 0);
    replicas.add(replica1);
    replicas.add(replica2);
    new MockUp<TabletInvertedIndex>() {
        @Mock
        public List<Replica> getReplicasOnBackendByTabletIds(List<Long> tabletIds, long backendId) {
            return replicas;
        }

        @Mock
        public List<TabletMeta> getTabletMetaList(List<Long> tabletIds) {
            return metaList;
        }
    };

    // Step 4: replicas exist, but the tablet meta list is still empty.
    result = leader.updateTabletVersion(request);
    Assert.assertEquals(TStatusCode.CANCELLED, result.status.getStatus_code());
    Assert.assertEquals("no tabletMeta found", result.status.getError_msgs().get(0));

    TabletMeta meta1 = new TabletMeta(1L, 1L, 1L, 1L, 1, TStorageMedium.HDD, false);
    TabletMeta meta2 = new TabletMeta(1L, 1L, 1L, 2L, 1, TStorageMedium.HDD, false);
    metaList.add(meta1);
    metaList.add(meta2);

    // Locking is mocked out so the success path does not need real db/table objects.
    new MockUp<Locker>() {
        @Mock
        public void lockTableWithIntensiveDbLock(Long dbId, Long tabletId, LockType lockType) {
            return;
        }

        @Mock
        public void unLockTableWithIntensiveDbLock(Long dbId, Long tabletId, LockType lockType) {
            return;
        }
    };

    // Step 5: success - each replica takes the version from the pair at the same index.
    result = leader.updateTabletVersion(request);
    Assert.assertEquals(TStatusCode.OK, result.status.getStatus_code());
    Assert.assertEquals(4L, replica1.getVersion());
    Assert.assertEquals(5L, replica2.getVersion());

    TabletMeta wrongMeta = new TabletMeta(2L, 1L, 1L, 1L, 1, TStorageMedium.HDD, false);
    List<TabletMeta> wrongMetaList = new ArrayList<>();
    wrongMetaList.add(meta1);
    wrongMetaList.add(wrongMeta);
    new MockUp<TabletInvertedIndex>() {
        @Mock
        public List<TabletMeta> getTabletMetaList(List<Long> tabletIds) {
            return wrongMetaList;
        }
    };

    // Step 6: the metas now disagree on db/table, so the request is rejected.
    result = leader.updateTabletVersion(request);
    Assert.assertEquals(TStatusCode.CANCELLED, result.status.getStatus_code());
    Assert.assertEquals("tablets in request from different db or table",
            result.status.getError_msgs().get(0));
}
}

View File

@ -2115,16 +2115,6 @@ struct TGetApplicableRolesResponse {
2: optional i64 next_table_id_offset;
}
// Request for FrontendService.updateTabletVersion.
// All fields are optional, so a receiver must be prepared for any of them to be unset.
struct TUpdateTabletVersionRequest {
// Backend the tablet versions belong to.
1: optional Types.TBackend backend;
// Signature of the task this update relates to.
2: optional i64 signature;
// Tablet/version pairs to apply.
3: optional list<MasterService.TTabletVersionPair> tablet_versions;
}
// Response for FrontendService.updateTabletVersion.
struct TUpdateTabletVersionResult {
// Outcome of the update; optional, so it may be unset.
1: optional Status.TStatus status;
}
service FrontendService {
TGetDbsResult getDbNames(1:TGetDbsParams params)
TGetTablesResult getTableNames(1:TGetTablesParams params)
@ -2264,7 +2254,5 @@ service FrontendService {
TGetWarehouseMetricsRespone getWarehouseMetrics(1: TGetWarehouseMetricsRequest request)
TGetWarehouseQueriesResponse getWarehouseQueries(1: TGetWarehouseQueriesRequest request)
TUpdateTabletVersionResult updateTabletVersion(1: TUpdateTabletVersionRequest request)
}