[BugFix] Fix secondary replicas continue waiting because of wrong timestamp (backport #62805) (#63162)
Signed-off-by: PengFei Li <lpengfei2016@gmail.com> Co-authored-by: PengFei Li <lpengfei2016@gmail.com>
This commit is contained in:
parent
e33b9bc64d
commit
af71661762
|
|
@ -396,6 +396,17 @@ void LakeTabletsChannel::add_chunk(Chunk* chunk, const PTabletWriterAddChunkRequ
|
|||
response->mutable_status()->add_error_msgs("no packet_seq in PTabletWriterAddChunkRequest");
|
||||
return;
|
||||
}
|
||||
if (UNLIKELY(!request.has_timeout_ms())) {
|
||||
response->mutable_status()->set_status_code(TStatusCode::INVALID_ARGUMENT);
|
||||
response->mutable_status()->add_error_msgs("missing timeout_ms in PTabletWriterAddChunkRequest");
|
||||
return;
|
||||
}
|
||||
if (UNLIKELY(request.timeout_ms() < 0)) {
|
||||
response->mutable_status()->set_status_code(TStatusCode::INVALID_ARGUMENT);
|
||||
response->mutable_status()->add_error_msgs(
|
||||
fmt::format("negtive timeout_ms {} in PTabletWriterAddChunkRequest", request.timeout_ms()));
|
||||
return;
|
||||
}
|
||||
|
||||
auto& sender = _senders[request.sender_id()];
|
||||
|
||||
|
|
|
|||
|
|
@ -198,6 +198,17 @@ void LocalTabletsChannel::add_chunk(Chunk* chunk, const PTabletWriterAddChunkReq
|
|||
response->mutable_status()->add_error_msgs("no packet_seq in PTabletWriterAddChunkRequest");
|
||||
return;
|
||||
}
|
||||
if (UNLIKELY(!request.has_timeout_ms())) {
|
||||
response->mutable_status()->set_status_code(TStatusCode::INVALID_ARGUMENT);
|
||||
response->mutable_status()->add_error_msgs("missing timeout_ms in PTabletWriterAddChunkRequest");
|
||||
return;
|
||||
}
|
||||
if (UNLIKELY(request.timeout_ms() < 0)) {
|
||||
response->mutable_status()->set_status_code(TStatusCode::INVALID_ARGUMENT);
|
||||
response->mutable_status()->add_error_msgs(
|
||||
fmt::format("negtive timeout_ms {} in PTabletWriterAddChunkRequest", request.timeout_ms()));
|
||||
return;
|
||||
}
|
||||
|
||||
{
|
||||
std::lock_guard lock(_senders[request.sender_id()].lock);
|
||||
|
|
@ -361,7 +372,8 @@ void LocalTabletsChannel::add_chunk(Chunk* chunk, const PTabletWriterAddChunkReq
|
|||
}
|
||||
int64_t start_wait_time = MonotonicMillis();
|
||||
for (auto& [node_id, delta_writers] : unfinished_replicas_grouped_by_primary_node) {
|
||||
int64_t left_timeout_ms = std::max((uint64_t)0, request.timeout_ms() - watch.elapsed_time() / 1000000);
|
||||
int64_t elapsed_ms = static_cast<int64_t>(watch.elapsed_time() / NANOSECS_PER_MILLIS);
|
||||
int64_t left_timeout_ms = std::max<int64_t>(0, request.timeout_ms() - elapsed_ms);
|
||||
SecondaryReplicasWaiter waiter(request.id(), _txn_id, request.sink_id(), left_timeout_ms, start_wait_time,
|
||||
delta_writers);
|
||||
Status status = waiter.wait();
|
||||
|
|
@ -1246,7 +1258,7 @@ SecondaryReplicasWaiter::SecondaryReplicasWaiter(PUniqueId load_id, int64_t txn_
|
|||
: _load_id(std::move(load_id)),
|
||||
_txn_id(txn_id),
|
||||
_sink_id(sink_id),
|
||||
_timeout_ns(timeout_ms * NANOSECS_PER_MILLIS),
|
||||
_timeout_ns(std::max((int64_t)0, timeout_ms) * NANOSECS_PER_MILLIS),
|
||||
_delta_writers(std::move(delta_writers)),
|
||||
_eos_time_ms(eos_time_ms),
|
||||
_last_get_replica_status_time_ms(eos_time_ms) {}
|
||||
|
|
@ -1441,7 +1453,7 @@ void SecondaryReplicasWaiter::_try_diagnose_stack_strace_on_primary(int unfinish
|
|||
SET_IGNORE_OVERCROWDED(closure->cntl, load);
|
||||
PLoadDiagnoseRequest request;
|
||||
request.mutable_id()->set_hi(_load_id.hi());
|
||||
request.mutable_id()->set_hi(_load_id.lo());
|
||||
request.mutable_id()->set_lo(_load_id.lo());
|
||||
request.set_txn_id(_txn_id);
|
||||
request.set_stack_trace(true);
|
||||
closure->ref();
|
||||
|
|
|
|||
|
|
@ -281,6 +281,7 @@ TEST_F(LakeTabletsChannelTest, test_simple_write) {
|
|||
add_chunk_request.set_sender_id(0);
|
||||
add_chunk_request.set_eos(false);
|
||||
add_chunk_request.set_packet_seq(0);
|
||||
add_chunk_request.set_timeout_ms(60000);
|
||||
|
||||
for (int i = 0; i < kChunkSize; i++) {
|
||||
int64_t tablet_id = 10086 + (i / kChunkSizePerTablet);
|
||||
|
|
@ -345,6 +346,7 @@ TEST_F(LakeTabletsChannelTest, test_simple_write) {
|
|||
finish_request.set_sender_id(0);
|
||||
finish_request.set_eos(true);
|
||||
finish_request.set_packet_seq(1);
|
||||
finish_request.set_timeout_ms(60000);
|
||||
finish_request.add_partition_ids(10);
|
||||
finish_request.add_partition_ids(11);
|
||||
|
||||
|
|
@ -395,6 +397,7 @@ TEST_F(LakeTabletsChannelTest, test_write_partial_partition) {
|
|||
add_chunk_request.set_sender_id(0);
|
||||
add_chunk_request.set_eos(false);
|
||||
add_chunk_request.set_packet_seq(0);
|
||||
add_chunk_request.set_timeout_ms(60000);
|
||||
|
||||
for (int i = 0; i < kChunkSize; i++) {
|
||||
int64_t tablet_id = 10086 + (i / kChunkSizePerTablet);
|
||||
|
|
@ -416,6 +419,7 @@ TEST_F(LakeTabletsChannelTest, test_write_partial_partition) {
|
|||
finish_request.set_sender_id(0);
|
||||
finish_request.set_eos(true);
|
||||
finish_request.set_packet_seq(1);
|
||||
finish_request.set_timeout_ms(60000);
|
||||
// Does not contain partition 11
|
||||
finish_request.add_partition_ids(10);
|
||||
|
||||
|
|
@ -459,6 +463,7 @@ TEST_F(LakeTabletsChannelTest, test_write_bundling_file) {
|
|||
add_chunk_request.set_sender_id(0);
|
||||
add_chunk_request.set_eos(false);
|
||||
add_chunk_request.set_packet_seq(0);
|
||||
add_chunk_request.set_timeout_ms(60000);
|
||||
|
||||
for (int i = 0; i < kChunkSize; i++) {
|
||||
int64_t tablet_id = 10086 + (i / kChunkSizePerTablet);
|
||||
|
|
@ -480,6 +485,7 @@ TEST_F(LakeTabletsChannelTest, test_write_bundling_file) {
|
|||
finish_request.set_sender_id(0);
|
||||
finish_request.set_eos(true);
|
||||
finish_request.set_packet_seq(1);
|
||||
finish_request.set_timeout_ms(60000);
|
||||
finish_request.add_partition_ids(10);
|
||||
finish_request.add_partition_ids(11);
|
||||
|
||||
|
|
@ -530,6 +536,7 @@ TEST_F(LakeTabletsChannelTest, test_write_concurrently) {
|
|||
add_chunk_request.set_sender_id(sender_id);
|
||||
add_chunk_request.set_eos(false);
|
||||
add_chunk_request.set_packet_seq(i);
|
||||
add_chunk_request.set_timeout_ms(60000);
|
||||
|
||||
for (int j = 0; j < kChunkSize; j++) {
|
||||
int64_t tablet_id = 10086 + (j / kChunkSizePerTablet);
|
||||
|
|
@ -551,6 +558,7 @@ TEST_F(LakeTabletsChannelTest, test_write_concurrently) {
|
|||
finish_request.set_sender_id(sender_id);
|
||||
finish_request.set_eos(true);
|
||||
finish_request.set_packet_seq(kLookCount);
|
||||
finish_request.set_timeout_ms(60000);
|
||||
finish_request.add_partition_ids(10);
|
||||
finish_request.add_partition_ids(11);
|
||||
|
||||
|
|
@ -598,6 +606,7 @@ TEST_F(LakeTabletsChannelTest, DISABLED_test_abort) {
|
|||
add_chunk_request.set_sender_id(0);
|
||||
add_chunk_request.set_eos(false);
|
||||
add_chunk_request.set_packet_seq(packet_seq++);
|
||||
add_chunk_request.set_timeout_ms(60000);
|
||||
|
||||
for (int i = 0; i < kChunkSize; i++) {
|
||||
int64_t tablet_id = 10086 + (i / kChunkSizePerTablet);
|
||||
|
|
@ -623,6 +632,7 @@ TEST_F(LakeTabletsChannelTest, DISABLED_test_abort) {
|
|||
finish_request.set_packet_seq(packet_seq++);
|
||||
finish_request.add_partition_ids(10);
|
||||
finish_request.add_partition_ids(11);
|
||||
finish_request.set_timeout_ms(60000);
|
||||
_tablets_channel->add_chunk(nullptr, finish_request, &finish_response, &close_channel);
|
||||
ASSERT_NE(TStatusCode::OK, finish_response.status().status_code());
|
||||
ASSERT_TRUE(close_channel);
|
||||
|
|
@ -659,7 +669,7 @@ TEST_F(LakeTabletsChannelTest, test_write_failed) {
|
|||
add_chunk_request.set_sender_id(0);
|
||||
add_chunk_request.set_eos(false);
|
||||
add_chunk_request.set_packet_seq(0);
|
||||
|
||||
add_chunk_request.set_timeout_ms(60000);
|
||||
for (int i = 0; i < kChunkSize; i++) {
|
||||
int64_t tablet_id = 10086 + (i / kChunkSizePerTablet);
|
||||
add_chunk_request.add_tablet_ids(tablet_id);
|
||||
|
|
@ -705,6 +715,7 @@ TEST_F(LakeTabletsChannelTest, test_tablet_not_existed) {
|
|||
add_chunk_request.set_sender_id(0);
|
||||
add_chunk_request.set_eos(false);
|
||||
add_chunk_request.set_packet_seq(0);
|
||||
add_chunk_request.set_timeout_ms(60000);
|
||||
|
||||
for (int i = 0; i < kChunkSize; i++) {
|
||||
int64_t tablet_id = 10086 + (i / kChunkSizePerTablet);
|
||||
|
|
@ -744,6 +755,7 @@ TEST_F(LakeTabletsChannelTest, test_empty_tablet) {
|
|||
add_chunk_request.set_sender_id(0);
|
||||
add_chunk_request.set_eos(false);
|
||||
add_chunk_request.set_packet_seq(0);
|
||||
add_chunk_request.set_timeout_ms(60000);
|
||||
|
||||
// Only tablet 10086 has data
|
||||
for (int i = 0; i < kChunkSize; i++) {
|
||||
|
|
@ -767,6 +779,7 @@ TEST_F(LakeTabletsChannelTest, test_empty_tablet) {
|
|||
finish_request.set_packet_seq(1);
|
||||
finish_request.add_partition_ids(10);
|
||||
finish_request.add_partition_ids(11);
|
||||
finish_request.set_timeout_ms(60000);
|
||||
|
||||
_tablets_channel->add_chunk(nullptr, finish_request, &finish_response, &close_channel);
|
||||
ASSERT_TRUE(finish_response.status().status_code() == TStatusCode::OK);
|
||||
|
|
@ -812,6 +825,7 @@ TEST_F(LakeTabletsChannelTest, test_finish_failed) {
|
|||
add_chunk_request.set_sender_id(0);
|
||||
add_chunk_request.set_eos(false);
|
||||
add_chunk_request.set_packet_seq(0);
|
||||
add_chunk_request.set_timeout_ms(60000);
|
||||
|
||||
// Only tablet 10086 has data
|
||||
for (int i = 0; i < kChunkSize; i++) {
|
||||
|
|
@ -838,6 +852,7 @@ TEST_F(LakeTabletsChannelTest, test_finish_failed) {
|
|||
finish_request.set_packet_seq(1);
|
||||
finish_request.add_partition_ids(10);
|
||||
finish_request.add_partition_ids(11);
|
||||
finish_request.set_timeout_ms(60000);
|
||||
|
||||
_tablets_channel->add_chunk(nullptr, finish_request, &finish_response, &close_channel);
|
||||
ASSERT_NE(TStatusCode::OK, finish_response.status().status_code());
|
||||
|
|
@ -861,6 +876,7 @@ TEST_F(LakeTabletsChannelTest, test_finish_after_abort) {
|
|||
add_chunk_request.set_sender_id(0);
|
||||
add_chunk_request.set_eos(true);
|
||||
add_chunk_request.set_packet_seq(0);
|
||||
add_chunk_request.set_timeout_ms(60000);
|
||||
|
||||
for (int i = 0; i < kChunkSize; i++) {
|
||||
int64_t tablet_id = 10086 + (i / kChunkSizePerTablet);
|
||||
|
|
@ -889,6 +905,7 @@ TEST_F(LakeTabletsChannelTest, test_finish_after_abort) {
|
|||
finish_request.set_sender_id(1);
|
||||
finish_request.set_eos(true);
|
||||
finish_request.set_packet_seq(0);
|
||||
finish_request.set_timeout_ms(60000);
|
||||
|
||||
bool close_channel;
|
||||
_tablets_channel->add_chunk(nullptr, finish_request, &finish_response, &close_channel);
|
||||
|
|
@ -921,6 +938,7 @@ TEST_F(LakeTabletsChannelTest, test_profile) {
|
|||
add_chunk_request.set_sender_id(0);
|
||||
add_chunk_request.set_eos(true);
|
||||
add_chunk_request.set_packet_seq(0);
|
||||
add_chunk_request.set_timeout_ms(60000);
|
||||
|
||||
for (int i = 0; i < kChunkSize; i++) {
|
||||
int64_t tablet_id = 10086 + (i / kChunkSizePerTablet);
|
||||
|
|
@ -949,6 +967,81 @@ TEST_F(LakeTabletsChannelTest, test_profile) {
|
|||
ASSERT_EQ(4, replicas_profile->get_counter("TabletsNum")->value());
|
||||
}
|
||||
|
||||
TEST_F(LakeTabletsChannelTest, test_missing_timeout_ms) {
|
||||
auto open_request = _open_request;
|
||||
open_request.set_num_senders(1);
|
||||
|
||||
ASSERT_OK(_tablets_channel->open(open_request, &_open_response, _schema_param, false));
|
||||
|
||||
constexpr int kChunkSize = 32;
|
||||
constexpr int kChunkSizePerTablet = kChunkSize / 4;
|
||||
auto chunk = generate_data(kChunkSize);
|
||||
|
||||
PTabletWriterAddChunkRequest add_chunk_request;
|
||||
add_chunk_request.set_index_id(kIndexId);
|
||||
add_chunk_request.set_sender_id(0);
|
||||
add_chunk_request.set_eos(false);
|
||||
add_chunk_request.set_packet_seq(0);
|
||||
|
||||
for (int i = 0; i < kChunkSize; i++) {
|
||||
int64_t tablet_id = 10086 + (i / kChunkSizePerTablet);
|
||||
add_chunk_request.add_tablet_ids(tablet_id);
|
||||
add_chunk_request.add_partition_ids(tablet_id < 10088 ? 10 : 11);
|
||||
}
|
||||
|
||||
ASSIGN_OR_ABORT(auto chunk_pb, serde::ProtobufChunkSerde::serialize(chunk));
|
||||
add_chunk_request.mutable_chunk()->Swap(&chunk_pb);
|
||||
|
||||
bool close_channel;
|
||||
PTabletWriterAddBatchResult resp;
|
||||
_tablets_channel->add_chunk(&chunk, add_chunk_request, &resp, &close_channel);
|
||||
ASSERT_EQ(TStatusCode::INVALID_ARGUMENT, resp.status().status_code());
|
||||
ASSERT_GE(resp.status().error_msgs_size(), 1);
|
||||
{
|
||||
const auto& msg = resp.status().error_msgs(0);
|
||||
ASSERT_TRUE(msg.find("missing timeout_ms") != std::string::npos) << msg;
|
||||
}
|
||||
ASSERT_FALSE(close_channel);
|
||||
}
|
||||
|
||||
TEST_F(LakeTabletsChannelTest, test_negative_timeout_ms) {
|
||||
auto open_request = _open_request;
|
||||
open_request.set_num_senders(1);
|
||||
|
||||
ASSERT_OK(_tablets_channel->open(open_request, &_open_response, _schema_param, false));
|
||||
|
||||
constexpr int kChunkSize = 32;
|
||||
constexpr int kChunkSizePerTablet = kChunkSize / 4;
|
||||
auto chunk = generate_data(kChunkSize);
|
||||
|
||||
PTabletWriterAddChunkRequest add_chunk_request;
|
||||
add_chunk_request.set_index_id(kIndexId);
|
||||
add_chunk_request.set_sender_id(0);
|
||||
add_chunk_request.set_eos(false);
|
||||
add_chunk_request.set_packet_seq(0);
|
||||
add_chunk_request.set_timeout_ms(-1);
|
||||
|
||||
for (int i = 0; i < kChunkSize; i++) {
|
||||
int64_t tablet_id = 10086 + (i / kChunkSizePerTablet);
|
||||
add_chunk_request.add_tablet_ids(tablet_id);
|
||||
add_chunk_request.add_partition_ids(tablet_id < 10088 ? 10 : 11);
|
||||
}
|
||||
|
||||
ASSIGN_OR_ABORT(auto chunk_pb, serde::ProtobufChunkSerde::serialize(chunk));
|
||||
add_chunk_request.mutable_chunk()->Swap(&chunk_pb);
|
||||
|
||||
bool close_channel;
|
||||
PTabletWriterAddBatchResult resp;
|
||||
_tablets_channel->add_chunk(&chunk, add_chunk_request, &resp, &close_channel);
|
||||
ASSERT_EQ(TStatusCode::INVALID_ARGUMENT, resp.status().status_code());
|
||||
ASSERT_GE(resp.status().error_msgs_size(), 1);
|
||||
{
|
||||
const auto& msg = resp.status().error_msgs(0);
|
||||
ASSERT_TRUE(msg.find("negtive timeout_ms") != std::string::npos) << msg;
|
||||
}
|
||||
ASSERT_FALSE(close_channel);
|
||||
}
|
||||
|
||||
struct Param {
|
||||
int num_sender;
|
||||
int fail_tablet;
|
||||
|
|
|
|||
|
|
@ -280,6 +280,7 @@ TEST_F(LoadChannelTestForLakeTablet, test_simple_write) {
|
|||
add_chunk_request.mutable_id()->set_hi(0);
|
||||
add_chunk_request.mutable_id()->set_lo(0);
|
||||
add_chunk_request.set_sink_id(0);
|
||||
add_chunk_request.set_timeout_ms(60000);
|
||||
|
||||
ASSIGN_OR_ABORT(auto chunk_pb, serde::ProtobufChunkSerde::serialize(chunk));
|
||||
add_chunk_request.mutable_chunk()->Swap(&chunk_pb);
|
||||
|
|
@ -308,6 +309,7 @@ TEST_F(LoadChannelTestForLakeTablet, test_simple_write) {
|
|||
finish_request.set_packet_seq(1);
|
||||
finish_request.add_partition_ids(10);
|
||||
finish_request.add_partition_ids(11);
|
||||
finish_request.set_timeout_ms(60000);
|
||||
|
||||
_load_channel->add_chunk(finish_request, &finish_response);
|
||||
ASSERT_EQ(TStatusCode::OK, finish_response.status().status_code());
|
||||
|
|
@ -360,6 +362,7 @@ TEST_F(LoadChannelTestForLakeTablet, test_write_concurrently) {
|
|||
add_chunk_request.mutable_id()->set_hi(0);
|
||||
add_chunk_request.mutable_id()->set_lo(0);
|
||||
add_chunk_request.set_sink_id(0);
|
||||
add_chunk_request.set_timeout_ms(60000);
|
||||
|
||||
for (int j = 0; j < kChunkSize; j++) {
|
||||
int64_t tablet_id = 10086 + (j / kChunkSizePerTablet);
|
||||
|
|
@ -384,6 +387,7 @@ TEST_F(LoadChannelTestForLakeTablet, test_write_concurrently) {
|
|||
finish_request.set_packet_seq(kLookCount);
|
||||
finish_request.add_partition_ids(10);
|
||||
finish_request.add_partition_ids(11);
|
||||
finish_request.set_timeout_ms(60000);
|
||||
|
||||
_load_channel->add_chunk(finish_request, &finish_response);
|
||||
ASSERT_EQ(TStatusCode::OK, finish_response.status().status_code()) << finish_response.status().error_msgs()[0];
|
||||
|
|
@ -428,6 +432,7 @@ TEST_F(LoadChannelTestForLakeTablet, test_abort) {
|
|||
add_chunk_request.mutable_id()->set_hi(0);
|
||||
add_chunk_request.mutable_id()->set_lo(0);
|
||||
add_chunk_request.set_sink_id(0);
|
||||
add_chunk_request.set_timeout_ms(60000);
|
||||
|
||||
for (int i = 0; i < kChunkSize; i++) {
|
||||
int64_t tablet_id = 10086 + (i / kChunkSizePerTablet);
|
||||
|
|
@ -452,6 +457,7 @@ TEST_F(LoadChannelTestForLakeTablet, test_abort) {
|
|||
finish_request.set_packet_seq(packet_seq++);
|
||||
finish_request.add_partition_ids(10);
|
||||
finish_request.add_partition_ids(11);
|
||||
finish_request.set_timeout_ms(60000);
|
||||
_load_channel->add_chunk(finish_request, &finish_response);
|
||||
ASSERT_NE(TStatusCode::OK, finish_response.status().status_code());
|
||||
stopped.store(true);
|
||||
|
|
@ -624,6 +630,7 @@ TEST_F(LoadChannelTestForLakeTablet, test_final_profile) {
|
|||
add_chunk_request.mutable_id()->set_hi(0);
|
||||
add_chunk_request.mutable_id()->set_lo(0);
|
||||
add_chunk_request.set_sink_id(0);
|
||||
add_chunk_request.set_timeout_ms(60000);
|
||||
|
||||
ASSIGN_OR_ABORT(auto chunk_pb, serde::ProtobufChunkSerde::serialize(chunk));
|
||||
add_chunk_request.mutable_chunk()->Swap(&chunk_pb);
|
||||
|
|
@ -773,6 +780,7 @@ TEST_F(LoadChannelTestForLakeTablet, test_load_diagnose) {
|
|||
add_chunk_request.mutable_id()->set_hi(0);
|
||||
add_chunk_request.mutable_id()->set_lo(0);
|
||||
add_chunk_request.set_sink_id(0);
|
||||
add_chunk_request.set_timeout_ms(60000);
|
||||
|
||||
ASSIGN_OR_ABORT(auto chunk_pb, serde::ProtobufChunkSerde::serialize(chunk));
|
||||
add_chunk_request.mutable_chunk()->Swap(&chunk_pb);
|
||||
|
|
|
|||
|
|
@ -245,6 +245,7 @@ TEST_F(LocalTabletsChannelTest, test_add_chunk_not_exist_tablet) {
|
|||
add_chunk_request.set_sender_id(0);
|
||||
add_chunk_request.set_eos(true);
|
||||
add_chunk_request.set_packet_seq(0);
|
||||
add_chunk_request.set_timeout_ms(60000);
|
||||
|
||||
auto non_exist_tablet_id = _tablets[0]->tablet_id() + 1;
|
||||
add_chunk_request.add_tablet_ids(non_exist_tablet_id);
|
||||
|
|
@ -270,6 +271,7 @@ TEST_F(LocalTabletsChannelTest, diagnose_stack_trace) {
|
|||
add_chunk_request.set_sender_id(0);
|
||||
add_chunk_request.set_eos(true);
|
||||
add_chunk_request.set_packet_seq(0);
|
||||
add_chunk_request.set_timeout_ms(100);
|
||||
|
||||
auto old_threshold = config::load_diagnose_rpc_timeout_stack_trace_threshold_ms;
|
||||
DeferOp defer([&]() {
|
||||
|
|
@ -301,6 +303,56 @@ TEST_F(LocalTabletsChannelTest, diagnose_stack_trace) {
|
|||
ASSERT_EQ(1, num_diagnose);
|
||||
}
|
||||
|
||||
TEST_F(LocalTabletsChannelTest, test_add_chunk_missing_timeout_ms) {
|
||||
_create_tablets(1);
|
||||
// open as a secondary replica of 3 replicas
|
||||
ReplicaInfo replica_info{_tablets[0]->tablet_id(), _nodes};
|
||||
_open_channel(_nodes[1].node_id(), {replica_info});
|
||||
|
||||
PTabletWriterAddChunkRequest add_chunk_request;
|
||||
add_chunk_request.mutable_id()->CopyFrom(_load_id);
|
||||
add_chunk_request.set_index_id(_index_id);
|
||||
add_chunk_request.set_sink_id(_sink_id);
|
||||
add_chunk_request.set_sender_id(0);
|
||||
add_chunk_request.set_eos(true);
|
||||
add_chunk_request.set_packet_seq(0);
|
||||
// intentionally do NOT set timeout_ms to trigger validation error
|
||||
|
||||
bool close_channel = true; // will be reset to false inside add_chunk
|
||||
PTabletWriterAddBatchResult add_chunk_response;
|
||||
_tablets_channel->add_chunk(nullptr, add_chunk_request, &add_chunk_response, &close_channel);
|
||||
ASSERT_EQ(TStatusCode::INVALID_ARGUMENT, add_chunk_response.status().status_code()) << add_chunk_response.status();
|
||||
ASSERT_TRUE(add_chunk_response.status().error_msgs_size() > 0);
|
||||
ASSERT_TRUE(add_chunk_response.status().error_msgs(0).find("missing timeout_ms") != std::string::npos)
|
||||
<< add_chunk_response.status().error_msgs(0);
|
||||
ASSERT_FALSE(close_channel);
|
||||
}
|
||||
|
||||
TEST_F(LocalTabletsChannelTest, test_add_chunk_negative_timeout_ms) {
|
||||
_create_tablets(1);
|
||||
// open as a secondary replica of 3 replicas
|
||||
ReplicaInfo replica_info{_tablets[0]->tablet_id(), _nodes};
|
||||
_open_channel(_nodes[1].node_id(), {replica_info});
|
||||
|
||||
PTabletWriterAddChunkRequest add_chunk_request;
|
||||
add_chunk_request.mutable_id()->CopyFrom(_load_id);
|
||||
add_chunk_request.set_index_id(_index_id);
|
||||
add_chunk_request.set_sink_id(_sink_id);
|
||||
add_chunk_request.set_sender_id(0);
|
||||
add_chunk_request.set_eos(true);
|
||||
add_chunk_request.set_packet_seq(0);
|
||||
add_chunk_request.set_timeout_ms(-1); // negative to trigger validation error
|
||||
|
||||
bool close_channel = true; // will be reset to false inside add_chunk
|
||||
PTabletWriterAddBatchResult add_chunk_response;
|
||||
_tablets_channel->add_chunk(nullptr, add_chunk_request, &add_chunk_response, &close_channel);
|
||||
ASSERT_EQ(TStatusCode::INVALID_ARGUMENT, add_chunk_response.status().status_code()) << add_chunk_response.status();
|
||||
ASSERT_TRUE(add_chunk_response.status().error_msgs_size() > 0);
|
||||
ASSERT_TRUE(add_chunk_response.status().error_msgs(0).find("negtive timeout_ms") != std::string::npos)
|
||||
<< add_chunk_response.status().error_msgs(0);
|
||||
ASSERT_FALSE(close_channel);
|
||||
}
|
||||
|
||||
TEST_F(LocalTabletsChannelTest, test_primary_replica_profile) {
|
||||
_create_tablets(1);
|
||||
auto& tablet = _tablets[0];
|
||||
|
|
@ -314,6 +366,7 @@ TEST_F(LocalTabletsChannelTest, test_primary_replica_profile) {
|
|||
add_chunk_request.set_sender_id(0);
|
||||
add_chunk_request.set_eos(true);
|
||||
add_chunk_request.set_packet_seq(0);
|
||||
add_chunk_request.set_timeout_ms(60000);
|
||||
|
||||
int chunk_size = 16;
|
||||
auto chunk = _generate_data(chunk_size, tablet->tablet_schema());
|
||||
|
|
@ -382,6 +435,7 @@ void LocalTabletsChannelTest::test_cancel_secondary_replica_base(bool is_empty_t
|
|||
add_chunk_request.set_eos(true);
|
||||
add_chunk_request.set_packet_seq(0);
|
||||
add_chunk_request.set_wait_all_sender_close(true);
|
||||
add_chunk_request.set_timeout_ms(60000);
|
||||
|
||||
auto chunk = _generate_data(1, tablet->tablet_schema());
|
||||
ASSIGN_OR_ABORT(auto chunk_pb, serde::ProtobufChunkSerde::serialize(chunk));
|
||||
|
|
@ -447,6 +501,7 @@ TEST_F(LocalTabletsChannelTest, test_cancel_secondary_replica_rpc_fail) {
|
|||
add_chunk_request.set_eos(true);
|
||||
add_chunk_request.set_packet_seq(0);
|
||||
add_chunk_request.set_wait_all_sender_close(true);
|
||||
add_chunk_request.set_timeout_ms(60000);
|
||||
|
||||
DeferOp defer([&]() {
|
||||
SyncPoint::GetInstance()->ClearCallBack("LocalTabletsChannel::rpc::tablet_writer_cancel");
|
||||
|
|
@ -744,6 +799,7 @@ TEST_F(LocalTabletsChannelTest, test_get_replica_status) {
|
|||
add_chunk_request.set_eos(true);
|
||||
add_chunk_request.set_packet_seq(0);
|
||||
add_chunk_request.set_wait_all_sender_close(true);
|
||||
add_chunk_request.set_timeout_ms(60000);
|
||||
auto chunk = _generate_data(2, _tablets[0]->tablet_schema());
|
||||
ASSIGN_OR_ABORT(auto chunk_pb, serde::ProtobufChunkSerde::serialize(chunk));
|
||||
add_chunk_request.mutable_chunk()->Swap(&chunk_pb);
|
||||
|
|
|
|||
|
|
@ -2142,6 +2142,7 @@ TEST_F(LakeServiceTest, test_abort_txn2) {
|
|||
add_chunk_request.set_sender_id(0);
|
||||
add_chunk_request.set_eos(false);
|
||||
add_chunk_request.set_packet_seq(i);
|
||||
add_chunk_request.set_timeout_ms(60000);
|
||||
|
||||
for (int j = 0; j < chunk_size; j++) {
|
||||
add_chunk_request.add_tablet_ids(_tablet_id);
|
||||
|
|
|
|||
Loading…
Reference in New Issue