Signed-off-by: sevev <qiangzh95@gmail.com> Co-authored-by: zhangqiang <qiangzh95@gmail.com>
This commit is contained in:
parent
e7459daeac
commit
4d6242ccf3
|
|
@ -114,6 +114,97 @@ protected:
|
|||
return log;
|
||||
}
|
||||
|
||||
void run_aggregate_compact(::google::protobuf::RpcController* cntl, const AggregateCompactRequest* request,
|
||||
CompactResponse* response) {
|
||||
CountDownLatch latch(1);
|
||||
auto cb = ::google::protobuf::NewCallback(&latch, &CountDownLatch::count_down);
|
||||
_lake_service.aggregate_compact(cntl, request, response, cb);
|
||||
latch.wait();
|
||||
}
|
||||
|
||||
void init_server_with_mock(MockLakeServiceImpl* mock_service, brpc::Server* server, int* port_out) {
|
||||
brpc::ServerOptions options;
|
||||
options.num_threads = 1;
|
||||
ASSERT_EQ(server->AddService(mock_service, brpc::SERVER_DOESNT_OWN_SERVICE), 0);
|
||||
ASSERT_EQ(server->Start(0, &options), 0);
|
||||
butil::EndPoint server_addr = server->listen_address();
|
||||
*port_out = server_addr.port;
|
||||
}
|
||||
|
||||
AggregatePublishVersionRequest build_default_agg_request(int port) {
|
||||
AggregatePublishVersionRequest request;
|
||||
auto* compute_node = request.add_compute_nodes();
|
||||
compute_node->set_host("127.0.0.1");
|
||||
compute_node->set_brpc_port(port);
|
||||
auto* publish_req = request.add_publish_reqs();
|
||||
publish_req->set_timeout_ms(5000);
|
||||
return request;
|
||||
}
|
||||
|
||||
void build_schemas_and_metadata(TabletSchemaPB* schema_pb1, TabletSchemaPB* schema_pb2, TabletSchemaPB* schema_pb3,
|
||||
starrocks::TabletMetadataPB* metadata1, starrocks::TabletMetadataPB* metadata2) {
|
||||
// schema 1
|
||||
schema_pb1->set_id(10);
|
||||
schema_pb1->set_num_short_key_columns(1);
|
||||
schema_pb1->set_keys_type(DUP_KEYS);
|
||||
schema_pb1->set_num_rows_per_row_block(65535);
|
||||
auto c0 = schema_pb1->add_column();
|
||||
c0->set_unique_id(0);
|
||||
c0->set_name("c0");
|
||||
c0->set_type("INT");
|
||||
c0->set_is_key(true);
|
||||
c0->set_is_nullable(false);
|
||||
|
||||
// schema 2
|
||||
schema_pb2->set_id(11);
|
||||
schema_pb2->set_num_short_key_columns(1);
|
||||
schema_pb2->set_keys_type(DUP_KEYS);
|
||||
schema_pb2->set_num_rows_per_row_block(65535);
|
||||
auto c1 = schema_pb2->add_column();
|
||||
c1->set_unique_id(1);
|
||||
c1->set_name("c1");
|
||||
c1->set_type("INT");
|
||||
c1->set_is_key(false);
|
||||
c1->set_is_nullable(false);
|
||||
|
||||
// schema 3
|
||||
if (schema_pb3 != nullptr) {
|
||||
schema_pb3->set_id(12);
|
||||
schema_pb3->set_num_short_key_columns(1);
|
||||
schema_pb3->set_keys_type(DUP_KEYS);
|
||||
schema_pb3->set_num_rows_per_row_block(65535);
|
||||
auto c2 = schema_pb3->add_column();
|
||||
c2->set_unique_id(2);
|
||||
c2->set_name("c2");
|
||||
c2->set_type("INT");
|
||||
c2->set_is_key(false);
|
||||
c2->set_is_nullable(false);
|
||||
}
|
||||
|
||||
// metadata 1
|
||||
metadata1->set_id(1);
|
||||
metadata1->set_version(2);
|
||||
metadata1->mutable_schema()->CopyFrom(*schema_pb1);
|
||||
auto& item1 = (*metadata1->mutable_historical_schemas())[10];
|
||||
item1.CopyFrom(*schema_pb1);
|
||||
auto& item2 = (*metadata1->mutable_historical_schemas())[11];
|
||||
item2.CopyFrom(*schema_pb2);
|
||||
(*metadata1->mutable_rowset_to_schema())[3] = 11;
|
||||
|
||||
// metadata 2
|
||||
if (metadata2 != nullptr) {
|
||||
metadata2->set_id(2);
|
||||
metadata2->set_version(2);
|
||||
metadata2->mutable_schema()->CopyFrom(*schema_pb1);
|
||||
auto& item1_b = (*metadata1->mutable_historical_schemas())[10];
|
||||
item1_b.CopyFrom(*schema_pb1);
|
||||
if (schema_pb3 != nullptr) {
|
||||
auto& item2_b = (*metadata1->mutable_historical_schemas())[12];
|
||||
item2_b.CopyFrom(*schema_pb3);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
constexpr static const char* const kRootLocation = "./lake_service_test";
|
||||
int64_t _tablet_id;
|
||||
int64_t _partition_id;
|
||||
|
|
@ -982,25 +1073,19 @@ TEST_F(LakeServiceTest, test_compact) {
|
|||
}
|
||||
}
|
||||
|
||||
TEST_F(LakeServiceTest, test_aggregate_compact) {
|
||||
auto agg_compact = [this](::google::protobuf::RpcController* cntl, const AggregateCompactRequest* request,
|
||||
CompactResponse* response) {
|
||||
CountDownLatch latch(1);
|
||||
auto cb = ::google::protobuf::NewCallback(&latch, &CountDownLatch::count_down);
|
||||
_lake_service.aggregate_compact(cntl, request, response, cb);
|
||||
latch.wait();
|
||||
};
|
||||
|
||||
TEST_F(LakeServiceTest, test_aggregate_compact_failed) {
|
||||
auto txn_id = next_id();
|
||||
|
||||
// empty requests
|
||||
{
|
||||
brpc::Controller cntl;
|
||||
AggregateCompactRequest agg_request;
|
||||
CompactResponse response;
|
||||
agg_compact(&cntl, &agg_request, &response);
|
||||
run_aggregate_compact(&cntl, &agg_request, &response);
|
||||
ASSERT_TRUE(cntl.Failed());
|
||||
ASSERT_EQ("empty requests", cntl.ErrorText());
|
||||
}
|
||||
|
||||
// compute nodes size not equal to requests size
|
||||
{
|
||||
brpc::Controller cntl;
|
||||
|
|
@ -1010,12 +1095,12 @@ TEST_F(LakeServiceTest, test_aggregate_compact) {
|
|||
request.add_tablet_ids(_tablet_id);
|
||||
request.set_txn_id(txn_id);
|
||||
request.set_version(1);
|
||||
// add request to agg_request
|
||||
agg_request.add_requests()->CopyFrom(request);
|
||||
agg_compact(&cntl, &agg_request, &response);
|
||||
run_aggregate_compact(&cntl, &agg_request, &response);
|
||||
ASSERT_TRUE(cntl.Failed());
|
||||
ASSERT_EQ("compute nodes size not equal to requests size", cntl.ErrorText());
|
||||
}
|
||||
|
||||
// compute node missing host/port
|
||||
{
|
||||
brpc::Controller cntl;
|
||||
|
|
@ -1027,10 +1112,9 @@ TEST_F(LakeServiceTest, test_aggregate_compact) {
|
|||
request.add_tablet_ids(_tablet_id);
|
||||
request.set_txn_id(txn_id);
|
||||
request.set_version(1);
|
||||
// add request to agg_request
|
||||
agg_request.add_requests()->CopyFrom(request);
|
||||
agg_request.add_compute_nodes()->CopyFrom(cn);
|
||||
agg_compact(&cntl, &agg_request, &response);
|
||||
run_aggregate_compact(&cntl, &agg_request, &response);
|
||||
ASSERT_EQ("compute node missing host/port", response.status().error_msgs(0));
|
||||
}
|
||||
|
||||
|
|
@ -1047,13 +1131,14 @@ TEST_F(LakeServiceTest, test_aggregate_compact) {
|
|||
request.add_tablet_ids(_tablet_id);
|
||||
request.set_txn_id(txn_id);
|
||||
request.set_version(1);
|
||||
// add request to agg_request
|
||||
agg_request.add_requests()->CopyFrom(request);
|
||||
agg_request.add_compute_nodes()->CopyFrom(cn);
|
||||
agg_compact(&cntl, &agg_request, &response);
|
||||
run_aggregate_compact(&cntl, &agg_request, &response);
|
||||
ASSERT_TRUE(response.status().status_code() != 0);
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(LakeServiceTest, test_aggregate_compact_success) {
|
||||
brpc::ServerOptions options;
|
||||
options.num_threads = 1;
|
||||
brpc::Server server;
|
||||
|
|
@ -1078,6 +1163,9 @@ TEST_F(LakeServiceTest, test_aggregate_compact) {
|
|||
resp->mutable_status()->set_status_code(0);
|
||||
done->Run();
|
||||
}));
|
||||
|
||||
auto txn_id = next_id();
|
||||
|
||||
// compact success - single cn
|
||||
{
|
||||
brpc::Controller cntl;
|
||||
|
|
@ -1092,14 +1180,14 @@ TEST_F(LakeServiceTest, test_aggregate_compact) {
|
|||
request.set_txn_id(txn_id);
|
||||
request.set_version(1);
|
||||
request.set_timeout_ms(3000);
|
||||
// add request to agg_request
|
||||
agg_request.add_requests()->CopyFrom(request);
|
||||
agg_request.add_compute_nodes()->CopyFrom(cn);
|
||||
agg_request.set_partition_id(99);
|
||||
agg_compact(&cntl, &agg_request, &response);
|
||||
run_aggregate_compact(&cntl, &agg_request, &response);
|
||||
ASSERT_FALSE(cntl.Failed());
|
||||
ASSERT_EQ(0, response.failed_tablets_size());
|
||||
}
|
||||
|
||||
// compact success - 3 cn
|
||||
{
|
||||
brpc::Controller cntl;
|
||||
|
|
@ -1114,36 +1202,26 @@ TEST_F(LakeServiceTest, test_aggregate_compact) {
|
|||
request.set_txn_id(txn_id);
|
||||
request.set_version(1);
|
||||
request.set_timeout_ms(3000);
|
||||
// add request to agg_request
|
||||
agg_request.add_requests()->CopyFrom(request);
|
||||
agg_request.add_compute_nodes()->CopyFrom(cn);
|
||||
agg_request.set_partition_id(99);
|
||||
}
|
||||
CompactResponse response;
|
||||
agg_compact(&cntl, &agg_request, &response);
|
||||
run_aggregate_compact(&cntl, &agg_request, &response);
|
||||
ASSERT_FALSE(cntl.Failed());
|
||||
ASSERT_EQ(0, response.failed_tablets_size());
|
||||
}
|
||||
|
||||
server.Stop(0);
|
||||
server.Join();
|
||||
}
|
||||
|
||||
TEST_F(LakeServiceTest, test_aggregate_compact_with_error) {
|
||||
auto agg_compact = [this](::google::protobuf::RpcController* cntl, const AggregateCompactRequest* request,
|
||||
CompactResponse* response) {
|
||||
CountDownLatch latch(1);
|
||||
auto cb = ::google::protobuf::NewCallback(&latch, &CountDownLatch::count_down);
|
||||
_lake_service.aggregate_compact(cntl, request, response, cb);
|
||||
latch.wait();
|
||||
};
|
||||
|
||||
brpc::ServerOptions options;
|
||||
options.num_threads = 1;
|
||||
brpc::Server server;
|
||||
MockLakeServiceImpl mock_service;
|
||||
ASSERT_EQ(server.AddService(&mock_service, brpc::SERVER_DOESNT_OWN_SERVICE), 0);
|
||||
ASSERT_EQ(server.Start(0, &options), 0);
|
||||
int port = 0;
|
||||
init_server_with_mock(&mock_service, &server, &port);
|
||||
|
||||
butil::EndPoint server_addr = server.listen_address();
|
||||
const int port = server_addr.port;
|
||||
EXPECT_CALL(mock_service, compact(_, _, _, _))
|
||||
.WillRepeatedly(Invoke([&](::google::protobuf::RpcController*, const CompactRequest*, CompactResponse* resp,
|
||||
::google::protobuf::Closure* done) {
|
||||
|
|
@ -1153,32 +1231,30 @@ TEST_F(LakeServiceTest, test_aggregate_compact_with_error) {
|
|||
}));
|
||||
|
||||
auto txn_id = next_id();
|
||||
// compact failed - single cn
|
||||
{
|
||||
brpc::Controller cntl;
|
||||
AggregateCompactRequest agg_request;
|
||||
CompactRequest request;
|
||||
ComputeNodePB cn;
|
||||
cn.set_host("127.0.0.1");
|
||||
cn.set_brpc_port(port);
|
||||
cn.set_id(1);
|
||||
CompactResponse response;
|
||||
request.add_tablet_ids(_tablet_id);
|
||||
request.set_txn_id(txn_id);
|
||||
request.set_version(1);
|
||||
request.set_timeout_ms(3000);
|
||||
// add request to agg_request
|
||||
agg_request.add_requests()->CopyFrom(request);
|
||||
agg_request.add_compute_nodes()->CopyFrom(cn);
|
||||
agg_compact(&cntl, &agg_request, &response);
|
||||
ASSERT_FALSE(cntl.Failed());
|
||||
// check status
|
||||
ASSERT_EQ(TStatusCode::INTERNAL_ERROR, response.status().status_code());
|
||||
// check error messages
|
||||
ASSERT_EQ(1, response.status().error_msgs_size());
|
||||
// check error msge
|
||||
ASSERT_EQ("injected error", response.status().error_msgs(0));
|
||||
}
|
||||
brpc::Controller cntl;
|
||||
AggregateCompactRequest agg_request;
|
||||
CompactRequest request;
|
||||
ComputeNodePB cn;
|
||||
cn.set_host("127.0.0.1");
|
||||
cn.set_brpc_port(port);
|
||||
cn.set_id(1);
|
||||
CompactResponse response;
|
||||
request.add_tablet_ids(_tablet_id);
|
||||
request.set_txn_id(txn_id);
|
||||
request.set_version(1);
|
||||
request.set_timeout_ms(3000);
|
||||
agg_request.add_requests()->CopyFrom(request);
|
||||
agg_request.add_compute_nodes()->CopyFrom(cn);
|
||||
|
||||
run_aggregate_compact(&cntl, &agg_request, &response);
|
||||
|
||||
ASSERT_FALSE(cntl.Failed());
|
||||
ASSERT_EQ(TStatusCode::INTERNAL_ERROR, response.status().status_code());
|
||||
ASSERT_EQ(1, response.status().error_msgs_size());
|
||||
ASSERT_EQ("injected error", response.status().error_msgs(0));
|
||||
|
||||
server.Stop(0);
|
||||
server.Join();
|
||||
}
|
||||
|
||||
TEST_F(LakeServiceTest, test_drop_table) {
|
||||
|
|
@ -2497,159 +2573,80 @@ TEST_F(LakeServiceTest, test_delete_data_ok) {
|
|||
EXPECT_EQ(0L, response.failed_tablets().size());
|
||||
}
|
||||
|
||||
TEST_F(LakeServiceTest, test_aggregate_publish_version) {
|
||||
brpc::ServerOptions options;
|
||||
options.num_threads = 1;
|
||||
TEST_F(LakeServiceTest, test_aggregate_publish_version_normal) {
|
||||
brpc::Server server;
|
||||
MockLakeServiceImpl mock_service;
|
||||
ASSERT_EQ(server.AddService(&mock_service, brpc::SERVER_DOESNT_OWN_SERVICE), 0);
|
||||
ASSERT_EQ(server.Start(0, &options), 0);
|
||||
int port = 0;
|
||||
init_server_with_mock(&mock_service, &server, &port);
|
||||
|
||||
butil::EndPoint server_addr = server.listen_address();
|
||||
const int port = server_addr.port;
|
||||
AggregatePublishVersionRequest request;
|
||||
auto* compute_node = request.add_compute_nodes();
|
||||
compute_node->set_host("127.0.0.1");
|
||||
compute_node->set_brpc_port(port);
|
||||
auto* publish_req = request.add_publish_reqs();
|
||||
publish_req->set_timeout_ms(5000);
|
||||
auto request = build_default_agg_request(port);
|
||||
|
||||
TabletSchemaPB schema_pb1;
|
||||
{
|
||||
schema_pb1.set_id(10);
|
||||
schema_pb1.set_num_short_key_columns(1);
|
||||
schema_pb1.set_keys_type(DUP_KEYS);
|
||||
schema_pb1.set_num_rows_per_row_block(65535);
|
||||
auto c0 = schema_pb1.add_column();
|
||||
c0->set_unique_id(0);
|
||||
c0->set_name("c0");
|
||||
c0->set_type("INT");
|
||||
c0->set_is_key(true);
|
||||
c0->set_is_nullable(false);
|
||||
}
|
||||
|
||||
TabletSchemaPB schema_pb2;
|
||||
{
|
||||
schema_pb2.set_id(11);
|
||||
schema_pb2.set_num_short_key_columns(1);
|
||||
schema_pb2.set_keys_type(DUP_KEYS);
|
||||
schema_pb2.set_num_rows_per_row_block(65535);
|
||||
auto c1 = schema_pb2.add_column();
|
||||
c1->set_unique_id(1);
|
||||
c1->set_name("c1");
|
||||
c1->set_type("INT");
|
||||
c1->set_is_key(false);
|
||||
c1->set_is_nullable(false);
|
||||
}
|
||||
|
||||
TabletSchemaPB schema_pb3;
|
||||
{
|
||||
schema_pb3.set_id(12);
|
||||
schema_pb3.set_num_short_key_columns(1);
|
||||
schema_pb3.set_keys_type(DUP_KEYS);
|
||||
schema_pb3.set_num_rows_per_row_block(65535);
|
||||
auto c2 = schema_pb3.add_column();
|
||||
c2->set_unique_id(2);
|
||||
c2->set_name("c2");
|
||||
c2->set_type("INT");
|
||||
c2->set_is_key(false);
|
||||
c2->set_is_nullable(false);
|
||||
}
|
||||
|
||||
starrocks::TabletMetadataPB metadata1;
|
||||
{
|
||||
metadata1.set_id(1);
|
||||
metadata1.set_version(2);
|
||||
metadata1.mutable_schema()->CopyFrom(schema_pb1);
|
||||
auto& item1 = (*metadata1.mutable_historical_schemas())[10];
|
||||
item1.CopyFrom(schema_pb1);
|
||||
auto& item2 = (*metadata1.mutable_historical_schemas())[11];
|
||||
item2.CopyFrom(schema_pb2);
|
||||
(*metadata1.mutable_rowset_to_schema())[3] = 11;
|
||||
}
|
||||
|
||||
starrocks::TabletMetadataPB metadata2;
|
||||
{
|
||||
metadata2.set_id(2);
|
||||
metadata2.set_version(2);
|
||||
metadata2.mutable_schema()->CopyFrom(schema_pb1);
|
||||
auto& item1 = (*metadata1.mutable_historical_schemas())[10];
|
||||
item1.CopyFrom(schema_pb1);
|
||||
auto& item2 = (*metadata1.mutable_historical_schemas())[12];
|
||||
item2.CopyFrom(schema_pb3);
|
||||
}
|
||||
build_schemas_and_metadata(&schema_pb1, &schema_pb2, &schema_pb3, &metadata1, &metadata2);
|
||||
|
||||
{
|
||||
// invalid stub
|
||||
AggregatePublishVersionRequest request;
|
||||
auto* compute_node1 = request.add_compute_nodes();
|
||||
compute_node1->set_host("invalid.host");
|
||||
compute_node1->set_brpc_port(port);
|
||||
auto* compute_node2 = request.add_compute_nodes();
|
||||
compute_node2->set_host("127.0.0.1");
|
||||
compute_node2->set_brpc_port(port);
|
||||
auto* publish_req = request.add_publish_reqs();
|
||||
publish_req->set_timeout_ms(5000);
|
||||
EXPECT_CALL(mock_service, publish_version(_, _, _, _))
|
||||
.WillOnce(Invoke([&](::google::protobuf::RpcController*, const PublishVersionRequest*,
|
||||
PublishVersionResponse* resp, ::google::protobuf::Closure* done) {
|
||||
resp->mutable_status()->set_status_code(0);
|
||||
auto& item1 = (*resp->mutable_tablet_metas())[1];
|
||||
item1.CopyFrom(metadata1);
|
||||
auto& item2 = (*resp->mutable_tablet_metas())[2];
|
||||
item2.CopyFrom(metadata2);
|
||||
done->Run();
|
||||
}));
|
||||
|
||||
PublishVersionResponse response;
|
||||
brpc::Controller cntl;
|
||||
google::protobuf::Closure* done = brpc::NewCallback([]() {});
|
||||
_lake_service.aggregate_publish_version(&cntl, &request, &response, done);
|
||||
EXPECT_FALSE(response.status().status_code() == 0);
|
||||
}
|
||||
PublishVersionResponse response;
|
||||
brpc::Controller cntl;
|
||||
google::protobuf::Closure* done = brpc::NewCallback([]() {});
|
||||
_lake_service.aggregate_publish_version(&cntl, &request, &response, done);
|
||||
|
||||
// normal response
|
||||
{
|
||||
EXPECT_CALL(mock_service, publish_version(_, _, _, _))
|
||||
.WillOnce(Invoke([&](::google::protobuf::RpcController*, const PublishVersionRequest*,
|
||||
PublishVersionResponse* resp, ::google::protobuf::Closure* done) {
|
||||
resp->mutable_status()->set_status_code(0);
|
||||
auto& item1 = (*resp->mutable_tablet_metas())[1];
|
||||
item1.CopyFrom(metadata1);
|
||||
auto& item2 = (*resp->mutable_tablet_metas())[2];
|
||||
item2.CopyFrom(metadata2);
|
||||
done->Run();
|
||||
}));
|
||||
EXPECT_EQ(response.status().status_code(), 0);
|
||||
auto res = _tablet_mgr->get_single_tablet_metadata(1, 2);
|
||||
ASSERT_TRUE(res.ok());
|
||||
ASSERT_EQ(res.value()->id(), 1);
|
||||
TabletMetadataPtr metadata3 = std::move(res).value();
|
||||
ASSERT_EQ(metadata3->schema().id(), 10);
|
||||
ASSERT_EQ(metadata3->historical_schemas_size(), 2);
|
||||
auto res2 = _tablet_mgr->get_single_tablet_metadata(2, 2);
|
||||
ASSERT_TRUE(res2.ok());
|
||||
ASSERT_EQ(res2.value()->id(), 2);
|
||||
|
||||
PublishVersionResponse response;
|
||||
brpc::Controller cntl;
|
||||
google::protobuf::Closure* done = brpc::NewCallback([]() {});
|
||||
_lake_service.aggregate_publish_version(&cntl, &request, &response, done);
|
||||
server.Stop(0);
|
||||
server.Join();
|
||||
}
|
||||
|
||||
EXPECT_EQ(response.status().status_code(), 0);
|
||||
// read tablet id - 1
|
||||
auto res = _tablet_mgr->get_single_tablet_metadata(1, 2);
|
||||
ASSERT_TRUE(res.ok());
|
||||
// check tablet id
|
||||
ASSERT_EQ(res.value()->id(), 1);
|
||||
TabletMetadataPtr metadata3 = std::move(res).value();
|
||||
ASSERT_EQ(metadata3->schema().id(), 10);
|
||||
ASSERT_EQ(metadata3->historical_schemas_size(), 2);
|
||||
// read tablet id - 2
|
||||
auto res2 = _tablet_mgr->get_single_tablet_metadata(2, 2);
|
||||
ASSERT_TRUE(res2.ok());
|
||||
// check tablet id
|
||||
ASSERT_EQ(res2.value()->id(), 2);
|
||||
}
|
||||
TEST_F(LakeServiceTest, test_aggregate_publish_version_failed) {
|
||||
brpc::Server server;
|
||||
MockLakeServiceImpl mock_service;
|
||||
int port = 0;
|
||||
init_server_with_mock(&mock_service, &server, &port);
|
||||
|
||||
// publish version failed
|
||||
{
|
||||
EXPECT_CALL(mock_service, publish_version(_, _, _, _))
|
||||
.WillOnce(Invoke([&](::google::protobuf::RpcController*, const PublishVersionRequest*,
|
||||
PublishVersionResponse* resp, ::google::protobuf::Closure* done) {
|
||||
resp->mutable_status()->set_status_code(1);
|
||||
auto& item1 = (*resp->mutable_tablet_metas())[1];
|
||||
item1.CopyFrom(metadata1);
|
||||
done->Run();
|
||||
}));
|
||||
auto request = build_default_agg_request(port);
|
||||
|
||||
PublishVersionResponse response;
|
||||
brpc::Controller cntl;
|
||||
google::protobuf::Closure* done = brpc::NewCallback([]() {});
|
||||
_lake_service.aggregate_publish_version(&cntl, &request, &response, done);
|
||||
TabletSchemaPB schema_pb1;
|
||||
TabletSchemaPB schema_pb2;
|
||||
starrocks::TabletMetadataPB metadata1;
|
||||
build_schemas_and_metadata(&schema_pb1, &schema_pb2, /*schema_pb3=*/nullptr, &metadata1, /*metadata2=*/nullptr);
|
||||
|
||||
EXPECT_EQ(response.status().status_code(), 6);
|
||||
}
|
||||
EXPECT_CALL(mock_service, publish_version(_, _, _, _))
|
||||
.WillOnce(Invoke([&](::google::protobuf::RpcController*, const PublishVersionRequest*,
|
||||
PublishVersionResponse* resp, ::google::protobuf::Closure* done) {
|
||||
resp->mutable_status()->set_status_code(1);
|
||||
auto& item1 = (*resp->mutable_tablet_metas())[1];
|
||||
item1.CopyFrom(metadata1);
|
||||
done->Run();
|
||||
}));
|
||||
|
||||
PublishVersionResponse response;
|
||||
brpc::Controller cntl;
|
||||
google::protobuf::Closure* done = brpc::NewCallback([]() {});
|
||||
_lake_service.aggregate_publish_version(&cntl, &request, &response, done);
|
||||
|
||||
EXPECT_EQ(response.status().status_code(), 6);
|
||||
|
||||
server.Stop(0);
|
||||
server.Join();
|
||||
|
|
|
|||
Loading…
Reference in New Issue