diff --git a/be/src/storage/sstable/table_builder.cpp b/be/src/storage/sstable/table_builder.cpp index a5b575df3f4..c71d4902aa3 100644 --- a/be/src/storage/sstable/table_builder.cpp +++ b/be/src/storage/sstable/table_builder.cpp @@ -66,7 +66,6 @@ TableBuilder::TableBuilder(const Options& options, WritableFile* file) : rep_(ne } TableBuilder::~TableBuilder() { - assert(rep_->closed); // Catch errors where caller forgot to call Finish() delete rep_->filter_block; delete rep_; } diff --git a/be/test/storage/lake/lake_primary_key_consistency_test.cpp b/be/test/storage/lake/lake_primary_key_consistency_test.cpp index 2482f0ea819..e93204dc0d6 100644 --- a/be/test/storage/lake/lake_primary_key_consistency_test.cpp +++ b/be/test/storage/lake/lake_primary_key_consistency_test.cpp @@ -99,15 +99,15 @@ public: } bool check(const ChunkPtr& chunk) { - std::map> tmp_chunk; + std::map> tmp_chunk; for (int i = 0; i < chunk->num_rows(); i++) { - if (tmp_chunk.count(chunk->columns()[0]->get(i).get_int32()) > 0) { + if (tmp_chunk.count(chunk->columns()[0]->get(i).get_slice().to_string()) > 0) { // duplicate pk - LOG(ERROR) << "duplicate pk: " << chunk->columns()[0]->get(i).get_int32(); + LOG(ERROR) << "duplicate pk: " << chunk->columns()[0]->get(i).get_slice().to_string(); return false; } - tmp_chunk[chunk->columns()[0]->get(i).get_int32()] = {chunk->columns()[1]->get(i).get_int32(), - chunk->columns()[2]->get(i).get_int32()}; + tmp_chunk[chunk->columns()[0]->get(i).get_slice().to_string()] = {chunk->columns()[1]->get(i).get_int32(), + chunk->columns()[2]->get(i).get_int32()}; } if (tmp_chunk.size() != _replayer_index.size()) { LOG(ERROR) << "inconsistency row number, actual : " << tmp_chunk.size() @@ -146,24 +146,24 @@ public: if (log.op == ReplayerOP::UPSERT) { // Upsert for (int i = 0; i < chunk->num_rows(); i++) { - _replayer_index[chunk->columns()[0]->get(i).get_int32()] = { + _replayer_index[chunk->columns()[0]->get(i).get_slice().to_string()] = { chunk->columns()[1]->get(i).get_int32(), chunk->columns()[2]->get(i).get_int32()}; } } else if (log.op == ReplayerOP::ERASE) { // Delete for (int i = 0; i < chunk->num_rows(); i++) { - _replayer_index.erase(chunk->columns()[0]->get(i).get_int32()); + _replayer_index.erase(chunk->columns()[0]->get(i).get_slice().to_string()); } } else if (log.op == ReplayerOP::PARTIAL_UPSERT || log.op == ReplayerOP::PARTIAL_UPDATE) { // Partial update for (int i = 0; i < chunk->num_rows(); i++) { - auto iter = _replayer_index.find(chunk->columns()[0]->get(i).get_int32()); + auto iter = _replayer_index.find(chunk->columns()[0]->get(i).get_slice().to_string()); if (iter != _replayer_index.end()) { - _replayer_index[chunk->columns()[0]->get(i).get_int32()] = { + _replayer_index[chunk->columns()[0]->get(i).get_slice().to_string()] = { chunk->columns()[1]->get(i).get_int32(), iter->second.second}; } else if (log.op == ReplayerOP::PARTIAL_UPSERT) { // insert new record with default val - _replayer_index[chunk->columns()[0]->get(i).get_int32()] = { + _replayer_index[chunk->columns()[0]->get(i).get_slice().to_string()] = { chunk->columns()[1]->get(i).get_int32(), 0}; } else { // do nothing @@ -186,11 +186,11 @@ public: return false; }; for (int i = 0; i < chunk->num_rows(); i++) { - auto iter = _replayer_index.find(chunk->columns()[0]->get(i).get_int32()); + auto iter = _replayer_index.find(chunk->columns()[0]->get(i).get_slice().to_string()); if (iter == _replayer_index.end() || is_condition_meet_fn(iter->second, i)) { // update if condition meet or not found // insert new record - _replayer_index[chunk->columns()[0]->get(i).get_int32()] = { + _replayer_index[chunk->columns()[0]->get(i).get_slice().to_string()] = { chunk->columns()[1]->get(i).get_int32(), chunk->columns()[2]->get(i).get_int32()}; } } @@ -206,7 +206,7 @@ private: // logs for replay. std::vector _redo_logs; // c0 -> - std::map> _replayer_index; + std::map> _replayer_index; }; class LakePrimaryKeyConsistencyTest : public TestBase, testing::WithParamInterface { @@ -216,8 +216,8 @@ public: _tablet_metadata->set_enable_persistent_index(true); _tablet_metadata->set_persistent_index_type(GetParam().persistent_index_type); - _slots.emplace_back(0, "c0", TypeDescriptor{LogicalType::TYPE_INT}); - _partial_slots.emplace_back(0, "c0", TypeDescriptor{LogicalType::TYPE_INT}); + _slots.emplace_back(0, "c0", TypeDescriptor{LogicalType::TYPE_VARCHAR}); + _partial_slots.emplace_back(0, "c0", TypeDescriptor{LogicalType::TYPE_VARCHAR}); _slots.emplace_back(1, "c1", TypeDescriptor{LogicalType::TYPE_INT}); _partial_slots.emplace_back(1, "c1", TypeDescriptor{LogicalType::TYPE_INT}); _slots.emplace_back(2, "c2", TypeDescriptor{LogicalType::TYPE_INT}); @@ -269,6 +269,8 @@ public: config::enable_pindex_minor_compaction = false; _old_enable_pk_strict_memcheck = config::enable_pk_strict_memcheck; config::enable_pk_strict_memcheck = false; + _old_pk_parallel_execution_threshold_bytes = config::pk_parallel_execution_threshold_bytes; + config::pk_parallel_execution_threshold_bytes = 1; } void TearDown() override { @@ -276,6 +278,7 @@ public: config::l0_max_mem_usage = _old_l0_size; config::write_buffer_size = _old_memtable_size; config::enable_pk_strict_memcheck = _old_enable_pk_strict_memcheck; + config::pk_parallel_execution_threshold_bytes = _old_pk_parallel_execution_threshold_bytes; } std::shared_ptr generate_tablet_metadata(KeysType keys_type) { @@ -287,7 +290,7 @@ public: // // | column | type | KEY | NULL | // +--------+------+-----+------+ - // | c0 | INT | YES | NO | + // | c0 | STRING | YES | NO | // | c1 | INT | NO | NO | // | c2 | INT | NO | NO | auto schema = metadata->mutable_schema(); @@ -299,9 +302,10 @@ public: { c0->set_unique_id(next_id()); c0->set_name("c0"); - c0->set_type("INT"); + c0->set_type("VARCHAR"); c0->set_is_key(true); c0->set_is_nullable(false); + c0->set_length(3200); } auto c1 = schema->add_column(); { @@ -327,14 +331,22 @@ public: std::pair> gen_upsert_data(bool is_upsert) { const size_t chunk_size = (size_t)_random_generator->random_n(); std::vector> cols(3); + std::vector key_col_str; + std::vector key_col; std::vector v3(chunk_size, is_upsert ? TOpType::UPSERT : TOpType::DELETE); _random_generator->random_cols(chunk_size, &cols); + for (size_t i = 0; i < chunk_size; i++) { + key_col_str.emplace_back(std::to_string(cols[0][i])); + } + for (const auto& s : key_col_str) { + key_col.emplace_back(Slice(s)); + } - auto c0 = Int32Column::create(); + auto c0 = BinaryColumn::create(); auto c1 = Int32Column::create(); auto c2 = Int32Column::create(); auto c3 = Int8Column::create(); - c0->append_numbers(cols[0].data(), cols[0].size() * sizeof(int)); + c0->append_strings(key_col.data(), key_col.size()); c1->append_numbers(cols[1].data(), cols[1].size() * sizeof(int)); c2->append_numbers(cols[2].data(), cols[2].size() * sizeof(int)); c3->append_numbers(v3.data(), v3.size() * sizeof(uint8_t)); @@ -350,12 +362,20 @@ public: std::pair> gen_partial_update_data() { const size_t chunk_size = (size_t)_random_generator->random_n(); std::vector> cols(2); + std::vector key_col_str; + std::vector key_col; std::vector v3(chunk_size, TOpType::UPSERT); _random_generator->random_cols(chunk_size, &cols); + for (size_t i = 0; i < chunk_size; i++) { + key_col_str.emplace_back(std::to_string(cols[0][i])); + } + for (const auto& s : key_col_str) { + key_col.emplace_back(Slice(s)); + } - auto c0 = Int32Column::create(); + auto c0 = BinaryColumn::create(); auto c1 = Int32Column::create(); - c0->append_numbers(cols[0].data(), cols[0].size() * sizeof(int)); + c0->append_strings(key_col.data(), key_col.size()); c1->append_numbers(cols[1].data(), cols[1].size() * sizeof(int)); auto indexes = std::vector(chunk_size); for (uint32_t i = 0; i < chunk_size; i++) { @@ -364,6 +384,29 @@ public: return {std::make_shared(Columns{std::move(c0), std::move(c1)}, _slot_cid_map), std::move(indexes)}; } + // 5% chance to force index memtable flush + std::unique_ptr> random_force_index_mem_flush() { + std::unique_ptr> force_flush_guard; + uint32_t r = _random_generator->random() % 100; + if (r < 5) { + // 5% chance to force index memtable flush + force_flush_guard = std::make_unique>(&config::l0_max_mem_usage, 1); + } + return force_flush_guard; + } + + // 20% chance to enable pk parallel execution + std::unique_ptr> random_pk_parallel_execution() { + std::unique_ptr> pk_parallel_execution_guard; + uint32_t r = _random_generator->random() % 100; + if (r < 20) { + // 20% chance to enable pk parallel execution + pk_parallel_execution_guard = + std::make_unique>(&config::enable_pk_parallel_execution, true); + } + return pk_parallel_execution_guard; + } + ChunkPtr read(int64_t tablet_id, int64_t version) { ASSIGN_OR_ABORT(auto metadata, _tablet_mgr->get_tablet_metadata(tablet_id, version)); auto reader = std::make_shared(_tablet_mgr.get(), metadata, *_schema); @@ -383,6 +426,8 @@ public: } Status upsert_op() { + std::unique_ptr> force_index_mem_flush_guard = random_force_index_mem_flush(); + std::unique_ptr> pk_parallel_execution_guard = random_pk_parallel_execution(); auto txn_id = next_id(); ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder() .set_tablet_manager(_tablet_mgr.get()) @@ -418,6 +463,8 @@ public: } Status partial_update_op(PartialUpdateMode mode) { + std::unique_ptr> force_index_mem_flush_guard = random_force_index_mem_flush(); + std::unique_ptr> pk_parallel_execution_guard = random_pk_parallel_execution(); auto txn_id = next_id(); ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder() .set_tablet_manager(_tablet_mgr.get()) @@ -453,6 +500,8 @@ public: } Status condition_update() { + std::unique_ptr> force_index_mem_flush_guard = random_force_index_mem_flush(); + std::unique_ptr> pk_parallel_execution_guard = random_pk_parallel_execution(); auto txn_id = next_id(); // c2 as merge_condition std::string merge_condition = "c2"; @@ -484,6 +533,8 @@ public: } Status upsert_with_batch_pub_op() { + std::unique_ptr> force_index_mem_flush_guard = random_force_index_mem_flush(); + std::unique_ptr> pk_parallel_execution_guard = random_pk_parallel_execution(); size_t batch_cnt = std::max(_random_generator->random() % MaxBatchCnt, (size_t)1); std::vector txn_ids; for (int i = 0; i < batch_cnt; i++) { @@ -525,6 +576,8 @@ public: } Status delete_op() { + std::unique_ptr> force_index_mem_flush_guard = random_force_index_mem_flush(); + std::unique_ptr> pk_parallel_execution_guard = random_pk_parallel_execution(); auto chunk_index = gen_upsert_data(false); auto txn_id = next_id(); ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder() @@ -550,6 +603,8 @@ public: } Status compact_op() { + std::unique_ptr> force_index_mem_flush_guard = random_force_index_mem_flush(); + std::unique_ptr> pk_parallel_execution_guard = random_pk_parallel_execution(); auto txn_id = next_id(); auto task_context = std::make_unique(txn_id, _tablet_metadata->id(), _version, false, false, nullptr); @@ -658,6 +713,7 @@ protected: int64_t _old_l0_size = 0; int64_t _old_memtable_size = 0; bool _old_enable_pk_strict_memcheck = false; + int64_t _old_pk_parallel_execution_threshold_bytes = 0; }; TEST_P(LakePrimaryKeyConsistencyTest, test_local_pk_consistency) {