[BugFix] Disable sync_publish for shadow tablet (backport #61887) (#61941)

Signed-off-by: sevev <qiangzh95@gmail.com>
Co-authored-by: zhangqiang <qiangzh95@gmail.com>
This commit is contained in:
mergify[bot] 2025-08-15 09:51:39 +08:00 committed by GitHub
parent f96b93e208
commit 3635b317d8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 50 additions and 34 deletions

View File

@ -49,6 +49,7 @@ struct TabletPublishVersionTask {
// or 0 which means tablet not found or publish task cannot be submitted
int64_t max_continuous_version{0};
bool is_double_write{false};
bool is_shadow{false};
};
void run_publish_version_task(ThreadPoolToken* token, const TPublishVersionRequest& publish_version_req,
@ -91,7 +92,7 @@ void run_publish_version_task(ThreadPoolToken* token, const TPublishVersionReque
}
}
} else {
std::vector<std::map<TabletInfo, RowsetSharedPtr>> partitions(num_partition);
std::vector<std::map<TabletInfo, std::pair<RowsetSharedPtr, bool>>> partitions(num_partition);
for (size_t i = 0; i < publish_version_req.partition_version_infos.size(); i++) {
StorageEngine::instance()->txn_manager()->get_txn_related_tablets(
transaction_id, publish_version_req.partition_version_infos[i].partition_id, &partitions[i]);
@ -108,7 +109,8 @@ void run_publish_version_task(ThreadPoolToken* token, const TPublishVersionReque
task.partition_id = publish_version_req.partition_version_infos[i].partition_id;
task.tablet_id = itr.first.tablet_id;
task.version = publish_version_req.partition_version_infos[i].version;
task.rowset = std::move(itr.second);
task.rowset = std::move(itr.second.first);
task.is_shadow = itr.second.second;
// rowset can be nullptr if it just prepared but not committed
if (task.rowset != nullptr) {
task.rowset->rowset_meta()->set_gtid(publish_version_req.gtid);
@ -235,10 +237,13 @@ void run_publish_version_task(ThreadPoolToken* token, const TPublishVersionReque
if (st.ok()) {
st = task.st;
}
} else {
} else if (!task.is_shadow) {
auto& pair = tablet_publish_versions.emplace_back();
pair.__set_tablet_id(task.tablet_id);
pair.__set_version(task.version);
} else {
VLOG(1) << "publish_version success tablet:" << task.tablet_id << " version:" << task.version
<< " is_shadow:" << task.is_shadow;
}
}
// return tablet and its version which has already finished.

View File

@ -404,7 +404,8 @@ void DataDir::load() {
}
Status commit_txn_status = _txn_manager->commit_txn(
_kv_store, rowset_meta->partition_id(), rowset_meta->txn_id(), rowset_meta->tablet_id(),
rowset_meta->tablet_schema_hash(), rowset_meta->tablet_uid(), rowset_meta->load_id(), rowset, true);
rowset_meta->tablet_schema_hash(), rowset_meta->tablet_uid(), rowset_meta->load_id(), rowset, true,
tablet->tablet_state() != TABLET_RUNNING);
if (!commit_txn_status.ok() && !commit_txn_status.is_already_exist()) {
LOG(WARNING) << "Fail to add committed rowset=" << rowset_meta->rowset_id()
<< " tablet=" << rowset_meta->tablet_id() << " txn_id: " << rowset_meta->txn_id();

View File

@ -664,6 +664,7 @@ Status DeltaWriter::_build_current_tablet_schema(int64_t index_id, const POlapTa
if (ptable_schema_param->indexes(i).id() == index_id) break;
}
if (i < ptable_schema_param->indexes_size()) {
_is_shadow = ptable_schema_param->indexes(i).is_shadow();
if (ptable_schema_param->indexes_size() > 0 && ptable_schema_param->indexes(i).has_column_param() &&
ptable_schema_param->indexes(i).column_param().columns_desc_size() != 0 &&
ptable_schema_param->indexes(i).column_param().columns_desc(0).unique_id() >= 0 &&
@ -774,7 +775,7 @@ Status DeltaWriter::commit() {
FAIL_POINT_TRIGGER_ASSIGN_STATUS_OR_DEFAULT(
load_commit_txn, res, COMMIT_TXN_FP_ACTION(_opt.txn_id, _opt.tablet_id),
_storage_engine->txn_manager()->commit_txn(_opt.partition_id, _tablet, _opt.txn_id, _opt.load_id,
_cur_rowset, false));
_cur_rowset, false, _is_shadow));
auto commit_txn_ts = watch.elapsed_time();
if (!res.ok()) {

View File

@ -288,6 +288,7 @@ private:
int64_t _write_buffer_size = 0;
DeltaWriterStat _stats;
bool _is_shadow = false;
};
} // namespace starrocks

View File

@ -167,7 +167,7 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
}
Status commit_status = StorageEngine::instance()->txn_manager()->commit_txn(
request.partition_id, tablet_var.tablet, request.transaction_id, load_id, tablet_var.rowset_to_add,
false);
false, false);
if (!commit_status.ok() && !commit_status.is_already_exist()) {
LOG(WARNING) << "fail to commit txn. res=" << commit_status << ", table=" << tablet->full_name()
<< ", txn_id: " << request.transaction_id;

View File

@ -687,7 +687,7 @@ void StorageEngine::clear_transaction_task(const TTransactionId transaction_id,
LOG(INFO) << "Clearing transaction task txn_id: " << transaction_id;
for (const TPartitionId& partition_id : partition_ids) {
std::map<TabletInfo, RowsetSharedPtr> tablet_infos;
std::map<TabletInfo, std::pair<RowsetSharedPtr, bool>> tablet_infos;
StorageEngine::instance()->txn_manager()->get_txn_related_tablets(transaction_id, partition_id, &tablet_infos);
// each tablet

View File

@ -146,11 +146,12 @@ Status TxnManager::prepare_txn(TPartitionId partition_id, const TabletSharedPtr&
}
Status TxnManager::commit_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, TTransactionId transaction_id,
const PUniqueId& load_id, const RowsetSharedPtr& rowset_ptr, bool is_recovery) {
const PUniqueId& load_id, const RowsetSharedPtr& rowset_ptr, bool is_recovery,
bool is_shadow) {
auto scoped =
trace::Scope(Tracer::Instance().start_trace_txn_tablet("txn_commit", transaction_id, tablet->tablet_id()));
return commit_txn(tablet->data_dir()->get_meta(), partition_id, transaction_id, tablet->tablet_id(),
tablet->schema_hash(), tablet->tablet_uid(), load_id, rowset_ptr, is_recovery);
tablet->schema_hash(), tablet->tablet_uid(), load_id, rowset_ptr, is_recovery, is_shadow);
}
// delete the txn from manager if it is not committed(not have a valid rowset)
@ -206,7 +207,7 @@ Status TxnManager::prepare_txn(TPartitionId partition_id, TTransactionId transac
// not found load id
// case 1: user start a new txn, rowset_ptr = null
// case 2: loading txn from meta env
TabletTxnInfo load_info(load_id, nullptr);
TabletTxnInfo load_info(load_id, nullptr, false);
txn_tablet_map[key][tablet_info] = load_info;
_insert_txn_partition_map_unlocked(transaction_id, partition_id);
@ -217,7 +218,8 @@ Status TxnManager::prepare_txn(TPartitionId partition_id, TTransactionId transac
Status TxnManager::commit_txn(KVStore* meta, TPartitionId partition_id, TTransactionId transaction_id,
TTabletId tablet_id, SchemaHash schema_hash, const TabletUid& tablet_uid,
const PUniqueId& load_id, const RowsetSharedPtr& rowset_ptr, bool is_recovery) {
const PUniqueId& load_id, const RowsetSharedPtr& rowset_ptr, bool is_recovery,
bool is_shadow) {
if (partition_id < 1 || transaction_id < 1 || tablet_id < 1) {
LOG(FATAL) << "Invalid commit req "
<< " partition_id=" << partition_id << " txn_id: " << transaction_id << " tablet_id=" << tablet_id;
@ -242,6 +244,7 @@ Status TxnManager::commit_txn(KVStore* meta, TPartitionId partition_id, TTransac
// found load for txn,tablet
// case 1: user commit rowset, then the load id must be equal
TabletTxnInfo& load_info = load_itr->second;
load_info.is_shadow = is_shadow;
// check if load id is equal
if (load_info.load_id.hi() == load_id.hi() && load_info.load_id.lo() == load_id.lo() &&
load_info.rowset != nullptr && load_info.rowset->rowset_id() == rowset_ptr->rowset_id()) {
@ -312,13 +315,14 @@ Status TxnManager::commit_txn(KVStore* meta, TPartitionId partition_id, TTransac
auto& tablet_txn_infos = txn_tablet_map[key];
auto itr = tablet_txn_infos.find(tablet_info);
if (itr == tablet_txn_infos.end()) {
TabletTxnInfo info(load_id, rowset_ptr);
TabletTxnInfo info(load_id, rowset_ptr, is_shadow);
info.commit_time = UnixSeconds();
tablet_txn_infos[tablet_info] = info;
} else {
itr->second.load_id = load_id;
itr->second.rowset = rowset_ptr;
itr->second.commit_time = UnixSeconds();
itr->second.is_shadow = is_shadow;
}
// [tablet_info] = load_info;
_insert_txn_partition_map_unlocked(transaction_id, partition_id);
@ -647,7 +651,7 @@ void TxnManager::force_rollback_tablet_related_txns(KVStore* meta, TTabletId tab
}
void TxnManager::get_txn_related_tablets(const TTransactionId transaction_id, TPartitionId partition_id,
std::map<TabletInfo, RowsetSharedPtr>* tablet_infos) {
std::map<TabletInfo, std::pair<RowsetSharedPtr, bool>>* tablet_infos) {
// get tablets in this transaction
pair<int64_t, int64_t> key(partition_id, transaction_id);
std::shared_lock txn_rdlock(_get_txn_map_lock(transaction_id));
@ -665,7 +669,7 @@ void TxnManager::get_txn_related_tablets(const TTransactionId transaction_id, TP
const TabletInfo& tablet_info = load_info.first;
// must not check rowset == null here, because if rowset == null
// publish version should failed
tablet_infos->emplace(tablet_info, load_info.second.rowset);
tablet_infos->emplace(tablet_info, std::make_pair(load_info.second.rowset, load_info.second.is_shadow));
}
}

View File

@ -76,9 +76,13 @@ struct TabletTxnInfo {
RowsetSharedPtr rowset;
int64_t creation_time{0};
int64_t commit_time{0};
bool is_shadow{false};
TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset)
: load_id(std::move(load_id)), rowset(std::move(rowset)), creation_time(UnixSeconds()) {}
TabletTxnInfo(PUniqueId load_id, RowsetSharedPtr rowset, bool is_shadow)
: load_id(std::move(load_id)),
rowset(std::move(rowset)),
creation_time(UnixSeconds()),
is_shadow(is_shadow) {}
TabletTxnInfo() = default;
};
@ -94,7 +98,7 @@ public:
const PUniqueId& load_id);
Status commit_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, TTransactionId transaction_id,
const PUniqueId& load_id, const RowsetSharedPtr& rowset_ptr, bool is_recovery);
const PUniqueId& load_id, const RowsetSharedPtr& rowset_ptr, bool is_recovery, bool is_shadow);
Status publish_txn(TPartitionId partition_id, const TabletSharedPtr& tablet, TTransactionId transaction_id,
int64_t version, const RowsetSharedPtr& rowset, uint32_t wait_time = 0,
@ -123,7 +127,7 @@ public:
Status commit_txn(KVStore* meta, TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id,
SchemaHash schema_hash, const TabletUid& tablet_uid, const PUniqueId& load_id,
const RowsetSharedPtr& rowset_ptr, bool is_recovery);
const RowsetSharedPtr& rowset_ptr, bool is_recovery, bool is_shadow);
// delete the txn from manager if it is not committed(not have a valid rowset)
Status rollback_txn(TPartitionId partition_id, TTransactionId transaction_id, TTabletId tablet_id,
@ -139,7 +143,7 @@ public:
int64_t* partition_id, std::set<int64_t>* transaction_ids);
void get_txn_related_tablets(const TTransactionId transaction_id, TPartitionId partition_ids,
std::map<TabletInfo, RowsetSharedPtr>* tablet_infos);
std::map<TabletInfo, std::pair<RowsetSharedPtr, bool>>* tablet_infos);
void get_all_related_tablets(std::set<TabletInfo>* tablet_infos);

View File

@ -236,14 +236,14 @@ TEST_F(PublishVersionTaskTest, test_publish_version) {
ASSERT_TRUE(st.ok()) << st.to_string();
}
std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
std::map<TabletInfo, std::pair<RowsetSharedPtr, bool>> tablet_related_rs;
StorageEngine::instance()->txn_manager()->get_txn_related_tablets(2222, 10, &tablet_related_rs);
ASSERT_EQ(1, tablet_related_rs.size());
TVersion version = 3;
// publish version for txn
auto tablet = tablet_manager->get_tablet(12345);
for (auto& tablet_rs : tablet_related_rs) {
const RowsetSharedPtr& rowset = tablet_rs.second;
const RowsetSharedPtr& rowset = tablet_rs.second.first;
auto st = StorageEngine::instance()->txn_manager()->publish_txn(10, tablet, 2222, version, rowset);
// success because the related transaction is GCed
ASSERT_TRUE(st.ok()) << st.to_string();

View File

@ -841,11 +841,11 @@ TEST_F(SchemaChangeTest, overlapping_direct_schema_change) {
auto base_tablet = _tablet_mgr->get_tablet(base_tablet_id);
{
std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
std::map<TabletInfo, std::pair<RowsetSharedPtr, bool>> tablet_related_rs;
_txn_mgr->get_txn_related_tablets(txn_id, partition_id, &tablet_related_rs);
for (auto& tablet_rs : tablet_related_rs) {
ASSERT_OK(
_txn_mgr->publish_txn(partition_id, base_tablet, txn_id, version.second, tablet_rs.second, 10000));
ASSERT_OK(_txn_mgr->publish_txn(partition_id, base_tablet, txn_id, version.second, tablet_rs.second.first,
10000));
}
}

View File

@ -180,11 +180,11 @@ public:
}
Status get_prepared_rowset(int64_t tablet_id, int64_t txn_id, int64_t partition_id, RowsetSharedPtr* rowset) {
std::map<TabletInfo, RowsetSharedPtr> tablet_infos;
std::map<TabletInfo, std::pair<RowsetSharedPtr, bool>> tablet_infos;
StorageEngine::instance()->txn_manager()->get_txn_related_tablets(txn_id, partition_id, &tablet_infos);
for (auto& [tablet_info, rs] : tablet_infos) {
if (tablet_info.tablet_id == tablet_id) {
(*rowset) = rs;
(*rowset) = rs.first;
return Status::OK();
}
}

View File

@ -3912,7 +3912,7 @@ TEST_F(TabletUpdatesTest, test_skip_schema) {
ASSERT_TRUE(StorageEngine::instance()
->txn_manager()
->commit_txn(_tablet->data_dir()->get_meta(), 100, 100, _tablet->tablet_id(),
_tablet->schema_hash(), _tablet->tablet_uid(), load_id, rs1, false)
_tablet->schema_hash(), _tablet->tablet_uid(), load_id, rs1, false, false)
.ok());
ASSERT_EQ(true, rs1->rowset_meta()->skip_tablet_schema());
ASSERT_EQ(1, _tablet->committed_rowset_size());
@ -3951,7 +3951,7 @@ TEST_F(TabletUpdatesTest, test_skip_schema) {
ASSERT_TRUE(StorageEngine::instance()
->txn_manager()
->commit_txn(_tablet->data_dir()->get_meta(), 101, 101, _tablet->tablet_id(),
_tablet->schema_hash(), _tablet->tablet_uid(), load_id, rs2, false)
_tablet->schema_hash(), _tablet->tablet_uid(), load_id, rs2, false, false)
.ok());
ASSERT_EQ(true, rs2->rowset_meta()->skip_tablet_schema());
ASSERT_EQ(1, _tablet->committed_rowset_size());
@ -4000,7 +4000,7 @@ TEST_F(TabletUpdatesTest, test_skip_schema) {
ASSERT_TRUE(StorageEngine::instance()
->txn_manager()
->commit_txn(_tablet->data_dir()->get_meta(), 102, 102, _tablet->tablet_id(),
_tablet->schema_hash(), _tablet->tablet_uid(), load_id, rs3, false)
_tablet->schema_hash(), _tablet->tablet_uid(), load_id, rs3, false, false)
.ok());
ASSERT_EQ(false, rs3->rowset_meta()->skip_tablet_schema());
ASSERT_EQ(0, _tablet->committed_rowset_size());
@ -4025,7 +4025,7 @@ TEST_F(TabletUpdatesTest, test_skip_schema) {
ASSERT_TRUE(StorageEngine::instance()
->txn_manager()
->commit_txn(_tablet->data_dir()->get_meta(), 103, 103, _tablet->tablet_id(),
_tablet->schema_hash(), _tablet->tablet_uid(), load_id, rs4, false)
_tablet->schema_hash(), _tablet->tablet_uid(), load_id, rs4, false, false)
.ok());
ASSERT_EQ(true, rs4->rowset_meta()->skip_tablet_schema());
ASSERT_EQ(1, _tablet->committed_rowset_size());

View File

@ -489,14 +489,14 @@ TEST_F(EngineStorageMigrationTaskTest, test_concurrent_ingestion_and_migration)
tablet_manager->start_trash_sweep();
starrocks::StorageEngine::instance()->_clean_unused_txns();
std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
std::map<TabletInfo, std::pair<RowsetSharedPtr, bool>> tablet_related_rs;
StorageEngine::instance()->txn_manager()->get_txn_related_tablets(2222, 10, &tablet_related_rs);
ASSERT_EQ(1, tablet_related_rs.size());
TVersion version = 3;
// publish version for txn
auto tablet = tablet_manager->get_tablet(12345);
for (auto& tablet_rs : tablet_related_rs) {
const RowsetSharedPtr& rowset = tablet_rs.second;
const RowsetSharedPtr& rowset = tablet_rs.second.first;
auto st = StorageEngine::instance()->txn_manager()->publish_txn(10, tablet, 2222, version, rowset);
// success because the related transaction is GCed
ASSERT_TRUE(st.ok());
@ -566,14 +566,14 @@ TEST_F(EngineStorageMigrationTaskTest, test_concurrent_ingestion_and_migration_p
tablet_manager->start_trash_sweep();
starrocks::StorageEngine::instance()->_clean_unused_txns();
std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
std::map<TabletInfo, std::pair<RowsetSharedPtr, bool>> tablet_related_rs;
StorageEngine::instance()->txn_manager()->get_txn_related_tablets(4444, 90, &tablet_related_rs);
ASSERT_EQ(1, tablet_related_rs.size());
TVersion version = 5;
// publish version for txn
auto tablet = tablet_manager->get_tablet(99999);
for (auto& tablet_rs : tablet_related_rs) {
const RowsetSharedPtr& rowset = tablet_rs.second;
const RowsetSharedPtr& rowset = tablet_rs.second.first;
auto st = StorageEngine::instance()->txn_manager()->publish_txn(90, tablet, 4444, version, rowset);
// success because the related transaction is GCed
ASSERT_TRUE(st.ok());