[BugFix] fix nullptr delta writer in local tablet channel (backport #62861) (#62875)

Signed-off-by: Kevin Cai <kevin.cai@celerdata.com>
Co-authored-by: Kevin Cai <caixiaohua@starrocks.com>
This commit is contained in:
mergify[bot] 2025-09-09 07:26:54 +00:00 committed by GitHub
parent 5c87a1899f
commit 573454f4f8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 36 additions and 1 deletions

View File

@ -390,7 +390,17 @@ void LocalTabletsChannel::add_chunk(Chunk* chunk, const PTabletWriterAddChunkReq
std::set<long> immutable_tablet_ids;
for (auto tablet_id : request.tablet_ids()) {
auto& writer = _delta_writers[tablet_id];
auto it = _delta_writers.find(tablet_id);
if (it == _delta_writers.end()) {
LOG(WARNING) << "LocalTabletsChannel txn_id: " << _txn_id << " load_id: " << print_id(request.id())
<< " not found tablet_id: " << tablet_id;
response->mutable_status()->set_status_code(TStatusCode::INTERNAL_ERROR);
response->mutable_status()->add_error_msgs(
fmt::format("Failed to add_chunk since tablet_id {} does not exist, txn_id: {}, load_id: {}",
tablet_id, _txn_id, print_id(request.id())));
return;
}
auto& writer = it->second;
if (writer->is_immutable() && immutable_tablet_ids.count(tablet_id) == 0) {
response->add_immutable_tablet_ids(tablet_id);
response->add_immutable_partition_ids(writer->partition_id());

View File

@ -232,6 +232,31 @@ protected:
using RpcLoadDisagnosePair = std::pair<PLoadDiagnoseRequest*, ReusableClosure<PLoadDiagnoseResult>*>;
TEST_F(LocalTabletsChannelTest, test_add_chunk_not_exist_tablet) {
_create_tablets(1);
// open as a secondary replica of 3 replicas
ReplicaInfo replica_info{_tablets[0]->tablet_id(), _nodes};
_open_channel(_nodes[1].node_id(), {replica_info});
PTabletWriterAddChunkRequest add_chunk_request;
add_chunk_request.mutable_id()->CopyFrom(_load_id);
add_chunk_request.set_index_id(_index_id);
add_chunk_request.set_sink_id(_sink_id);
add_chunk_request.set_sender_id(0);
add_chunk_request.set_eos(true);
add_chunk_request.set_packet_seq(0);
auto non_exist_tablet_id = _tablets[0]->tablet_id() + 1;
add_chunk_request.add_tablet_ids(non_exist_tablet_id);
bool close_channel = false;
PTabletWriterAddBatchResult add_chunk_response;
_tablets_channel->add_chunk(nullptr, add_chunk_request, &add_chunk_response, &close_channel);
ASSERT_EQ(TStatusCode::INTERNAL_ERROR, add_chunk_response.status().status_code()) << add_chunk_response.status();
ASSERT_TRUE(close_channel); // set_eos(true)
_tablets_channel->abort();
}
TEST_F(LocalTabletsChannelTest, diagnose_stack_trace) {
_create_tablets(1);
// open as a secondary replica of 3 replicas