[UT] fix ut ASAN leak, wait for all rpc requests done before exit (backport #62986) (#62990)

Signed-off-by: Kevin Cai <kevin.cai@celerdata.com>
Co-authored-by: Kevin Cai <caixiaohua@starrocks.com>
This commit is contained in:
mergify[bot] 2025-09-11 06:39:55 +00:00 committed by GitHub
parent 7bfb6c6076
commit 395e12e3ba
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 24 additions and 6 deletions

View File

@ -46,17 +46,35 @@ TEST_F(PInternalService_RecoverableStub_ParallelTest, test_parallel_reset_execut
}
});
int num_threads = 20;
constexpr int num_threads = 20;
constexpr int repeats_in_single_round = 1000;
std::vector<std::thread> send_threads;
for (int i = 0; i < num_threads; ++i) {
send_threads.emplace_back([&] {
std::vector<RefCountClosure<PTabletWriterAddBatchResult>*> closures;
closures.reserve(repeats_in_single_round);
while (running.load()) {
PTabletWriterAddChunkRequest request;
auto* closure = new starrocks::RefCountClosure<PTabletWriterAddBatchResult>();
closure->ref();
stub->tablet_writer_add_chunk(&closure->cntl, &request, &closure->result, closure);
std::this_thread::sleep_for(std::chrono::microseconds(1));
while (closures.size() < repeats_in_single_round && running.load()) {
PTabletWriterAddChunkRequest request;
auto* closure = new starrocks::RefCountClosure<PTabletWriterAddBatchResult>();
closure->ref(); // retain itself to be managed in the vector
closures.push_back(closure);
closure->ref(); // retain it to be used when closure->Run() is invoked.
stub->tablet_writer_add_chunk(&closure->cntl, &request, &closure->result, closure);
std::this_thread::sleep_for(std::chrono::microseconds(1));
}
// wait for closures done and release the resources
for (auto* closure : closures) {
// make sure the closure is done
closure->join();
// dereference it to release the memory
if (closure->unref()) {
delete closure;
}
}
closures.clear();
}
EXPECT_TRUE(closures.empty());
});
}