[BugFix] Fix dcg data not read correctly when switching from column mode to row mode in partial update (#61529)
Signed-off-by: predator4ann <yunlong.sun@hotmail.com>
This commit is contained in:
parent
b9176a4696
commit
7f5710459a
|
|
@ -19,6 +19,7 @@
|
|||
#include "runtime/current_thread.h"
|
||||
#include "storage/chunk_helper.h"
|
||||
#include "storage/del_vector.h"
|
||||
#include "storage/delta_column_group.h"
|
||||
#include "storage/lake/column_mode_partial_update_handler.h"
|
||||
#include "storage/lake/lake_local_persistent_index.h"
|
||||
#include "storage/lake/lake_persistent_index.h"
|
||||
|
|
@ -32,6 +33,9 @@
|
|||
#include "storage/rowset/column_iterator.h"
|
||||
#include "storage/rowset/default_value_column_iterator.h"
|
||||
#include "storage/tablet_manager.h"
|
||||
#include "storage/tablet_schema.h"
|
||||
#include "storage/tablet_updates.h"
|
||||
#include "storage/utils.h"
|
||||
#include "testutil/sync_point.h"
|
||||
#include "util/failpoint/fail_point.h"
|
||||
#include "util/pretty_printer.h"
|
||||
|
|
@ -562,6 +566,60 @@ Status UpdateManager::get_rowids_from_pkindex(int64_t tablet_id, int64_t base_ve
|
|||
return st;
|
||||
}
|
||||
|
||||
// Locates the delta-column-group (DCG) segment that carries column `ucid`.
//
// The groups in `ctx.dcgs` are visited in list order (newest version first, per the
// loader's ordering) and the first group whose get_column_idx(ucid) reports a
// non-negative file index wins. The segment for that group's column file is opened
// through ctx.segment->new_dcg_segment() the first time it is requested and is then
// memoized in ctx.dcg_segments, keyed by the column file path.
//
// @param ctx                 DCG lookup state. ctx.segment is dereferenced here, so the
//                            caller must have set it.
// @param ucid                unique id of the column being looked up.
// @param col_index           optional out-parameter; when non-null it receives the second
//                            member of the pair returned by get_column_idx().
// @param read_tablet_schema  schema forwarded to new_dcg_segment().
// @return the cached or newly opened segment; NotFound when no group contains the
//         column; InternalError when the column file cannot be resolved or the segment
//         cannot be created.
static StatusOr<std::shared_ptr<Segment>> get_lake_dcg_segment(GetDeltaColumnContext& ctx, uint32_t ucid,
                                                               int32_t* col_index,
                                                               const TabletSchemaCSPtr& read_tablet_schema) {
    for (const auto& group : ctx.dcgs) {
        const std::pair<int32_t, int32_t> position = group->get_column_idx(ucid);
        if (position.first >= 0) {
            // This group owns the column: resolve the file that stores it.
            auto file_or = group->column_file_by_idx(parent_name(ctx.segment->file_name()), position.first);
            if (!file_or.ok()) {
                return Status::InternalError(
                        fmt::format("DCG file not found for column {}: {}", ucid, file_or.status().to_string()));
            }
            const std::string& file_path = file_or.value();

            // Open the DCG segment only once per file; later columns stored in the same
            // file reuse the cached instance.
            auto cached = ctx.dcg_segments.find(file_path);
            if (cached == ctx.dcg_segments.end()) {
                auto segment_or = ctx.segment->new_dcg_segment(*group, position.first, read_tablet_schema);
                if (!segment_or.ok()) {
                    return Status::InternalError(fmt::format("Failed to create DCG segment for column {}: {}", ucid,
                                                             segment_or.status().to_string()));
                }
                cached = ctx.dcg_segments.emplace(file_path, segment_or.value()).first;
            }

            if (col_index != nullptr) {
                *col_index = position.second;
            }
            return cached->second;
        }
        // A negative index means this group does not hold the column; keep scanning.
    }
    return Status::NotFound(fmt::format("Column {} not found in any DCG", ucid));
}
|
||||
|
||||
// Builds a column iterator that reads `column` from its DCG file instead of the
// original segment.
//
// The DCG segment is resolved through get_lake_dcg_segment(); any error from that
// lookup (including NotFound when no DCG holds the column) is returned unchanged.
// The random-access file for the DCG segment is opened at most once per file name and
// kept in ctx.dcg_read_files.
//
// Side effect: iter_opts.read_file is redirected to the DCG file handle owned by
// `ctx`, so a caller that later reads from the original segment must restore it.
//
// @param ctx                 DCG lookup state and caches (segments, read files).
// @param fs                  file system used to open the DCG file.
// @param iter_opts           iterator options; only read_file is modified here.
// @param column              column to read.
// @param read_tablet_schema  schema forwarded to the DCG segment lookup.
// @return the iterator produced by the DCG segment, or the first error encountered.
static StatusOr<std::unique_ptr<ColumnIterator>> new_lake_dcg_column_iterator(
        GetDeltaColumnContext& ctx, const std::shared_ptr<FileSystem>& fs, ColumnIteratorOptions& iter_opts,
        const TabletColumn& column, const TabletSchemaCSPtr& read_tablet_schema) {
    // The lookup reports the column's position inside the DCG file; it is not consulted
    // afterwards.
    int32_t position_in_file = 0;
    ASSIGN_OR_RETURN(auto dcg_segment,
                     get_lake_dcg_segment(ctx, column.unique_id(), &position_in_file, read_tablet_schema));

    // Reuse an already opened handle for this DCG file when there is one.
    const auto& dcg_file_name = dcg_segment->file_name();
    auto opened = ctx.dcg_read_files.find(dcg_file_name);
    if (opened == ctx.dcg_read_files.end()) {
        ASSIGN_OR_RETURN(auto file_handle, fs->new_random_access_file(dcg_segment->file_info()));
        opened = ctx.dcg_read_files.emplace(dcg_file_name, std::move(file_handle)).first;
    }

    iter_opts.read_file = opened->second.get();
    return dcg_segment->new_column_iterator(column, nullptr);
}
|
||||
|
||||
Status UpdateManager::get_column_values(const RowsetUpdateStateParams& params, std::vector<uint32_t>& column_ids,
|
||||
bool with_default, std::map<uint32_t, std::vector<uint32_t>>& rowids_by_rssid,
|
||||
vector<MutableColumnPtr>* columns,
|
||||
|
|
@ -601,6 +659,31 @@ Status UpdateManager::get_column_values(const RowsetUpdateStateParams& params, s
|
|||
watch.reset();
|
||||
|
||||
std::shared_ptr<FileSystem> fs;
|
||||
|
||||
bool need_dcg_check = false;
|
||||
std::unordered_map<uint32_t, GetDeltaColumnContext> dcg_contexts;
|
||||
bool has_any_dcg = false;
|
||||
if (params.op_write.has_txn_meta()) {
|
||||
// only enable dcg logic when switching from column mode to row mode
|
||||
need_dcg_check = !params.metadata->dcg_meta().dcgs().empty();
|
||||
}
|
||||
|
||||
if (need_dcg_check) {
|
||||
LakeDeltaColumnGroupLoader dcg_loader(params.metadata);
|
||||
for (const auto& [rssid, rowids] : rowids_by_rssid) {
|
||||
TabletSegmentId tsid;
|
||||
tsid.tablet_id = params.tablet->id();
|
||||
tsid.segment_id = rssid;
|
||||
|
||||
DeltaColumnGroupList dcgs;
|
||||
Status dcg_status = dcg_loader.load(tsid, params.metadata->version(), &dcgs);
|
||||
if (dcg_status.ok() && !dcgs.empty()) {
|
||||
dcg_contexts[rssid].dcgs = std::move(dcgs);
|
||||
has_any_dcg = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
auto fetch_values_from_segment = [&](const FileInfo& segment_info, uint32_t segment_id,
|
||||
const TabletSchemaCSPtr& tablet_schema, const std::vector<uint32_t>& rowids,
|
||||
const std::vector<uint32_t>& read_column_ids) -> Status {
|
||||
|
|
@ -632,9 +715,36 @@ Status UpdateManager::get_column_values(const RowsetUpdateStateParams& params, s
|
|||
iter_opts.stats = &stats;
|
||||
ASSIGN_OR_RETURN(auto read_file, fs->new_random_access_file_with_bundling(opts, file_info));
|
||||
iter_opts.read_file = read_file.get();
|
||||
|
||||
GetDeltaColumnContext* dcg_ctx = nullptr;
|
||||
if (has_any_dcg && dcg_contexts.count(segment_id) > 0 && !dcg_contexts[segment_id].dcgs.empty()) {
|
||||
dcg_ctx = &dcg_contexts[segment_id];
|
||||
dcg_ctx->segment = *segment;
|
||||
}
|
||||
|
||||
for (auto i = 0; i < read_column_ids.size(); ++i) {
|
||||
const TabletColumn& col = tablet_schema->column(read_column_ids[i]);
|
||||
ASSIGN_OR_RETURN(auto col_iter, (*segment)->new_column_iterator_or_default(col, nullptr));
|
||||
std::unique_ptr<ColumnIterator> col_iter = nullptr;
|
||||
|
||||
// try dcg read only if dcg context exists
|
||||
if (dcg_ctx != nullptr) {
|
||||
auto dcg_col_iter_result = new_lake_dcg_column_iterator(*dcg_ctx, fs, iter_opts, col, tablet_schema);
|
||||
if (dcg_col_iter_result.ok()) {
|
||||
col_iter = std::move(dcg_col_iter_result.value());
|
||||
} else if (!dcg_col_iter_result.status().is_not_found()) {
|
||||
// NotFound is expected when column doesn't exist in DCG, other errors are real issues
|
||||
return Status::InternalError(fmt::format("Failed to create DCG column iterator for column {}: {}",
|
||||
col.name(), dcg_col_iter_result.status().to_string()));
|
||||
}
|
||||
// If status is NotFound, col_iter remains nullptr and we'll read from original segment
|
||||
}
|
||||
|
||||
// read from original segment if no dcg data available
|
||||
if (col_iter == nullptr) {
|
||||
ASSIGN_OR_RETURN(col_iter, (*segment)->new_column_iterator_or_default(col, nullptr));
|
||||
iter_opts.read_file = read_file.get();
|
||||
}
|
||||
|
||||
RETURN_IF_ERROR(col_iter->init(iter_opts));
|
||||
RETURN_IF_ERROR(col_iter->fetch_values_by_rowid(rowids.data(), rowids.size(), (*columns)[i].get()));
|
||||
// padding char columns
|
||||
|
|
@ -657,9 +767,9 @@ Status UpdateManager::get_column_values(const RowsetUpdateStateParams& params, s
|
|||
return Status::Cancelled(fmt::format("tablet id {} version {} rowset_segment_id {} no exist",
|
||||
params.metadata->id(), params.metadata->version(), rssid));
|
||||
}
|
||||
// use 0 segment_id is safe, because we need not get either delvector or dcg here
|
||||
RETURN_IF_ERROR(fetch_values_from_segment(params.container.rssid_to_file().at(rssid), 0, params.tablet_schema,
|
||||
rowids, column_ids));
|
||||
// pass the actual segment_id to properly handle DCG reading
|
||||
RETURN_IF_ERROR(fetch_values_from_segment(params.container.rssid_to_file().at(rssid), rssid,
|
||||
params.tablet_schema, rowids, column_ids));
|
||||
}
|
||||
if (auto_increment_state != nullptr && with_default) {
|
||||
if (fs == nullptr) {
|
||||
|
|
|
|||
|
|
@ -273,6 +273,133 @@ TEST_P(LakePartialUpdateTest, test_write) {
|
|||
}
|
||||
}
|
||||
|
||||
// This test case covers the following logic:
|
||||
// - with_default branch in get_column_values() (default values and column_to_expr_value override)
|
||||
// - Column mode generates DCG then switches to row mode, triggering need_dcg_check and DCG loading paths
|
||||
TEST_P(LakePartialUpdateTest, test_dcg_then_row_mode_with_default_and_expr_override) {
|
||||
auto chunk0 = generate_data(kChunkSize, 0, false, 3);
|
||||
auto chunk_partial_same_keys = generate_data(kChunkSize, 0, true, 3);
|
||||
|
||||
// Construct a batch of "new primary key" partial update data, containing only (c0, c1)
|
||||
// c0 = i + kChunkSize, c1 = i * 3 (values don't matter much, keeping consistent ratio with generate_data)
|
||||
std::vector<int> new_keys(kChunkSize);
|
||||
std::vector<int> new_vals(kChunkSize);
|
||||
for (int i = 0; i < kChunkSize; i++) {
|
||||
new_keys[i] = i + kChunkSize;
|
||||
new_vals[i] = new_keys[i] * 3;
|
||||
}
|
||||
auto c0_new = Int32Column::create();
|
||||
auto c1_new = Int32Column::create();
|
||||
c0_new->append_numbers(new_keys.data(), new_keys.size() * sizeof(int));
|
||||
c1_new->append_numbers(new_vals.data(), new_vals.size() * sizeof(int));
|
||||
Chunk chunk_partial_new_keys({c0_new, c1_new}, _slot_cid_map);
|
||||
|
||||
auto indexes = std::vector<uint32_t>(kChunkSize);
|
||||
for (int i = 0; i < kChunkSize; i++) indexes[i] = i;
|
||||
|
||||
auto version = 1;
|
||||
auto tablet_id = _tablet_metadata->id();
|
||||
|
||||
// 1) Basic full writes (3 versions)
|
||||
for (int i = 0; i < 3; i++) {
|
||||
auto txn_id = next_id();
|
||||
ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
|
||||
.set_tablet_manager(_tablet_mgr.get())
|
||||
.set_tablet_id(tablet_id)
|
||||
.set_txn_id(txn_id)
|
||||
.set_partition_id(_partition_id)
|
||||
.set_mem_tracker(_mem_tracker.get())
|
||||
.set_schema_id(_tablet_schema->id())
|
||||
.build());
|
||||
ASSERT_OK(delta_writer->open());
|
||||
ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size()));
|
||||
ASSERT_OK(delta_writer->finish_with_txnlog());
|
||||
delta_writer->close();
|
||||
ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status());
|
||||
version++;
|
||||
}
|
||||
|
||||
// 2) Column mode (COLUMN_UPDATE_MODE) partial update, generating DCG
|
||||
{
|
||||
auto txn_id = next_id();
|
||||
ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
|
||||
.set_tablet_manager(_tablet_mgr.get())
|
||||
.set_tablet_id(tablet_id)
|
||||
.set_txn_id(txn_id)
|
||||
.set_partition_id(_partition_id)
|
||||
.set_mem_tracker(_mem_tracker.get())
|
||||
.set_schema_id(_tablet_schema->id())
|
||||
.set_slot_descriptors(&_slot_pointers)
|
||||
.set_partial_update_mode(PartialUpdateMode::COLUMN_UPDATE_MODE)
|
||||
.build());
|
||||
ASSERT_OK(delta_writer->open());
|
||||
ASSERT_OK(delta_writer->write(chunk_partial_same_keys, indexes.data(), indexes.size()));
|
||||
ASSERT_OK(delta_writer->finish_with_txnlog());
|
||||
delta_writer->close();
|
||||
ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status());
|
||||
version++;
|
||||
}
|
||||
|
||||
// 3) Row mode (ROW_MODE) partial update (same primary keys), only providing (c0, c1), triggering need_dcg_check and DCG loading
|
||||
{
|
||||
auto txn_id = next_id();
|
||||
ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
|
||||
.set_tablet_manager(_tablet_mgr.get())
|
||||
.set_tablet_id(tablet_id)
|
||||
.set_txn_id(txn_id)
|
||||
.set_partition_id(_partition_id)
|
||||
.set_mem_tracker(_mem_tracker.get())
|
||||
.set_schema_id(_tablet_schema->id())
|
||||
.set_slot_descriptors(&_slot_pointers)
|
||||
.set_partial_update_mode(PartialUpdateMode::ROW_MODE)
|
||||
.build());
|
||||
ASSERT_OK(delta_writer->open());
|
||||
ASSERT_OK(delta_writer->write(chunk_partial_same_keys, indexes.data(), indexes.size()));
|
||||
ASSERT_OK(delta_writer->finish_with_txnlog());
|
||||
delta_writer->close();
|
||||
ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status());
|
||||
version++;
|
||||
}
|
||||
|
||||
// 4) Row mode (ROW_MODE) partial update (new primary keys), covering with_default branch, and overriding default values via column_to_expr_value
|
||||
{
|
||||
std::map<std::string, std::string> expr_overrides;
|
||||
// Override the default value of unprovided column c2 from schema default (10) to 77
|
||||
expr_overrides.emplace("c2", "77");
|
||||
|
||||
auto txn_id = next_id();
|
||||
ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
|
||||
.set_tablet_manager(_tablet_mgr.get())
|
||||
.set_tablet_id(tablet_id)
|
||||
.set_txn_id(txn_id)
|
||||
.set_partition_id(_partition_id)
|
||||
.set_mem_tracker(_mem_tracker.get())
|
||||
.set_schema_id(_tablet_schema->id())
|
||||
.set_slot_descriptors(&_slot_pointers)
|
||||
.set_partial_update_mode(PartialUpdateMode::ROW_MODE)
|
||||
.set_column_to_expr_value(&expr_overrides)
|
||||
.build());
|
||||
ASSERT_OK(delta_writer->open());
|
||||
ASSERT_OK(delta_writer->write(chunk_partial_new_keys, indexes.data(), indexes.size()));
|
||||
ASSERT_OK(delta_writer->finish_with_txnlog());
|
||||
delta_writer->close();
|
||||
ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status());
|
||||
version++;
|
||||
}
|
||||
|
||||
// Verification:
|
||||
// - Old primary keys [0, kChunkSize) still satisfy c1 = c0*3, c2 = c0*4
|
||||
// - New primary keys [kChunkSize, 2*kChunkSize) satisfy c1 = c0*3, and c2 is overridden by expr to 77
|
||||
ASSERT_EQ(kChunkSize * 2, check(version, [&](int c0, int c1, int c2) {
|
||||
if (c0 < kChunkSize) {
|
||||
return (c1 == c0 * 3) && (c2 == c0 * 4);
|
||||
} else if (c0 < kChunkSize * 2) {
|
||||
return (c1 == c0 * 3) && (c2 == 77);
|
||||
}
|
||||
return false;
|
||||
}));
|
||||
}
|
||||
|
||||
TEST_P(LakePartialUpdateTest, test_partial_update_with_condition) {
|
||||
if (GetParam().partial_update_mode == PartialUpdateMode::COLUMN_UPDATE_MODE) {
|
||||
return;
|
||||
|
|
@ -350,6 +477,196 @@ TEST_P(LakePartialUpdateTest, test_partial_update_with_condition) {
|
|||
}
|
||||
}
|
||||
|
||||
// Scenario: the DCG only contains c1, so a later row-mode update that has to read the
// untouched c2 gets NotFound from the DCG lookup and must fall back to the original
// segment for that column.
TEST_P(LakePartialUpdateTest, test_dcg_not_found_and_fallback_to_segment) {
    // Base data written with every column present.
    auto full_chunk = generate_data(kChunkSize, 0, false, 3);
    std::vector<uint32_t> row_ids(kChunkSize);
    for (uint32_t pos = 0; pos < row_ids.size(); ++pos) {
        row_ids[pos] = pos;
    }

    int version = 1;
    const auto tablet_id = _tablet_metadata->id();
    for (int round = 0; round < 2; ++round) {
        const auto txn_id = next_id();
        ASSIGN_OR_ABORT(auto writer, DeltaWriterBuilder()
                                             .set_tablet_manager(_tablet_mgr.get())
                                             .set_tablet_id(tablet_id)
                                             .set_txn_id(txn_id)
                                             .set_partition_id(_partition_id)
                                             .set_mem_tracker(_mem_tracker.get())
                                             .set_schema_id(_tablet_schema->id())
                                             .build());
        ASSERT_OK(writer->open());
        ASSERT_OK(writer->write(full_chunk, row_ids.data(), row_ids.size()));
        ASSERT_OK(writer->finish_with_txnlog());
        writer->close();
        ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status());
        ++version;
    }

    // Column-mode update carrying (c0, c1) only, so the resulting DCG covers just c1.
    auto c1_update_chunk = generate_data(kChunkSize, 0, true, 7);
    {
        const auto txn_id = next_id();
        ASSIGN_OR_ABORT(auto writer, DeltaWriterBuilder()
                                             .set_tablet_manager(_tablet_mgr.get())
                                             .set_tablet_id(tablet_id)
                                             .set_txn_id(txn_id)
                                             .set_partition_id(_partition_id)
                                             .set_mem_tracker(_mem_tracker.get())
                                             .set_schema_id(_tablet_schema->id())
                                             .set_slot_descriptors(&_slot_pointers)
                                             .set_partial_update_mode(PartialUpdateMode::COLUMN_UPDATE_MODE)
                                             .build());
        ASSERT_OK(writer->open());
        ASSERT_OK(writer->write(c1_update_chunk, row_ids.data(), row_ids.size()));
        ASSERT_OK(writer->finish_with_txnlog());
        writer->close();
        ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status());
        ++version;
    }

    // Row-mode input. It supplies (c0, c1) so the chunk matches the two-column slot
    // descriptors; c2 is therefore "unmodified" and get_column_values() has to read it.
    // Because the DCG has no c2, that read goes through the NotFound fallback.
    std::vector<int> key_values(kChunkSize);
    std::vector<int> c1_values(kChunkSize);
    for (int i = 0; i < kChunkSize; i++) {
        key_values[i] = i;
        c1_values[i] = i * 7; // same ratio as c1_update_chunk
    }
    auto key_column = Int32Column::create();
    auto c1_column = Int32Column::create();
    key_column->append_numbers(key_values.data(), key_values.size() * sizeof(int));
    c1_column->append_numbers(c1_values.data(), c1_values.size() * sizeof(int));
    Chunk::SlotHashMap slot_to_index;
    slot_to_index[0] = 0; // c0
    slot_to_index[1] = 1; // c1
    Chunk row_mode_chunk({std::move(key_column), std::move(c1_column)}, slot_to_index);

    {
        const auto txn_id = next_id();
        ASSIGN_OR_ABORT(auto writer, DeltaWriterBuilder()
                                             .set_tablet_manager(_tablet_mgr.get())
                                             .set_tablet_id(tablet_id)
                                             .set_txn_id(txn_id)
                                             .set_partition_id(_partition_id)
                                             .set_mem_tracker(_mem_tracker.get())
                                             .set_schema_id(_tablet_schema->id())
                                             .set_slot_descriptors(&_slot_pointers)
                                             .set_partial_update_mode(PartialUpdateMode::ROW_MODE)
                                             .build());
        ASSERT_OK(writer->open());
        ASSERT_OK(writer->write(row_mode_chunk, row_ids.data(), row_ids.size()));
        ASSERT_OK(writer->finish_with_txnlog());
        writer->close();
        ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status());
        ++version;
    }

    // c1 must carry the column-mode value (c0 * 7), while c2 must still be the value
    // from the original segment (c0 * 4).
    const auto row_is_expected = [](int c0, int c1, int c2) { return (c1 == c0 * 7) && (c2 == c0 * 4); };
    ASSERT_EQ(kChunkSize, check(version, row_is_expected));
}
|
||||
|
||||
// Explicitly test DCG column file missing: column_file_by_idx returns a path but the file is removed,
|
||||
// so new_dcg_segment fails and get_column_values should surface an InternalError during publish.
|
||||
TEST_P(LakePartialUpdateTest, test_dcg_segment_missing_files_returns_error) {
|
||||
auto chunk0 = generate_data(kChunkSize, 0, false, 3);
|
||||
auto partial_c1 = generate_data(kChunkSize, 0, true, 7); // (c0, c1) to generate DCG for c1
|
||||
auto indexes = std::vector<uint32_t>(kChunkSize);
|
||||
for (int i = 0; i < kChunkSize; i++) indexes[i] = i;
|
||||
|
||||
auto version = 1;
|
||||
auto tablet_id = _tablet_metadata->id();
|
||||
|
||||
// Base full write
|
||||
{
|
||||
auto txn_id = next_id();
|
||||
ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
|
||||
.set_tablet_manager(_tablet_mgr.get())
|
||||
.set_tablet_id(tablet_id)
|
||||
.set_txn_id(txn_id)
|
||||
.set_partition_id(_partition_id)
|
||||
.set_mem_tracker(_mem_tracker.get())
|
||||
.set_schema_id(_tablet_schema->id())
|
||||
.build());
|
||||
ASSERT_OK(delta_writer->open());
|
||||
ASSERT_OK(delta_writer->write(chunk0, indexes.data(), indexes.size()));
|
||||
ASSERT_OK(delta_writer->finish_with_txnlog());
|
||||
delta_writer->close();
|
||||
ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status());
|
||||
version++;
|
||||
}
|
||||
|
||||
// Column mode partial update to create DCG
|
||||
{
|
||||
auto txn_id = next_id();
|
||||
ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
|
||||
.set_tablet_manager(_tablet_mgr.get())
|
||||
.set_tablet_id(tablet_id)
|
||||
.set_txn_id(txn_id)
|
||||
.set_partition_id(_partition_id)
|
||||
.set_mem_tracker(_mem_tracker.get())
|
||||
.set_schema_id(_tablet_schema->id())
|
||||
.set_slot_descriptors(&_slot_pointers)
|
||||
.set_partial_update_mode(PartialUpdateMode::COLUMN_UPDATE_MODE)
|
||||
.build());
|
||||
ASSERT_OK(delta_writer->open());
|
||||
ASSERT_OK(delta_writer->write(partial_c1, indexes.data(), indexes.size()));
|
||||
ASSERT_OK(delta_writer->finish_with_txnlog());
|
||||
delta_writer->close();
|
||||
ASSERT_OK(publish_single_version(tablet_id, version + 1, txn_id).status());
|
||||
version++;
|
||||
}
|
||||
|
||||
// Remove generated DCG column files with absolute path to force Segment::open failure inside new_dcg_segment
|
||||
{
|
||||
ASSIGN_OR_ABORT(auto md, _tablet_mgr->get_tablet_metadata(tablet_id, version));
|
||||
for (const auto& kv : md->dcg_meta().dcgs()) {
|
||||
const auto& dcg_ver = kv.second;
|
||||
for (const auto& rel : dcg_ver.column_files()) {
|
||||
auto abs = _tablet_mgr->segment_location(tablet_id, rel);
|
||||
(void)fs::remove(abs);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Row mode partial update providing only primary keys with single-column slots,
|
||||
// so c1,c2 are unmodified and need to be read; c1 prefers DCG -> error
|
||||
std::vector<int> keys_only(kChunkSize);
|
||||
for (int i = 0; i < kChunkSize; i++) keys_only[i] = i;
|
||||
auto c0_only = Int32Column::create();
|
||||
c0_only->append_numbers(keys_only.data(), keys_only.size() * sizeof(int));
|
||||
Chunk::SlotHashMap slot_only;
|
||||
slot_only[0] = 0; // only c0
|
||||
Chunk c0_only_chunk({std::move(c0_only)}, slot_only);
|
||||
// Build local slot descriptors with single column (c0) to match chunk schema
|
||||
std::vector<SlotDescriptor> local_slots;
|
||||
local_slots.emplace_back(0, "c0", TypeDescriptor{LogicalType::TYPE_INT});
|
||||
std::vector<SlotDescriptor*> local_slot_ptrs;
|
||||
local_slot_ptrs.emplace_back(&local_slots[0]);
|
||||
|
||||
StatusOr<TabletMetadataPtr> pub_st = Status::OK();
|
||||
{
|
||||
auto txn_id = next_id();
|
||||
ASSIGN_OR_ABORT(auto delta_writer, DeltaWriterBuilder()
|
||||
.set_tablet_manager(_tablet_mgr.get())
|
||||
.set_tablet_id(tablet_id)
|
||||
.set_txn_id(txn_id)
|
||||
.set_partition_id(_partition_id)
|
||||
.set_mem_tracker(_mem_tracker.get())
|
||||
.set_schema_id(_tablet_schema->id())
|
||||
.set_slot_descriptors(&local_slot_ptrs)
|
||||
.set_partial_update_mode(PartialUpdateMode::ROW_MODE)
|
||||
.build());
|
||||
ASSERT_OK(delta_writer->open());
|
||||
ASSERT_OK(delta_writer->write(c0_only_chunk, indexes.data(), indexes.size()));
|
||||
ASSERT_OK(delta_writer->finish_with_txnlog());
|
||||
delta_writer->close();
|
||||
pub_st = publish_single_version(tablet_id, version + 1, txn_id);
|
||||
}
|
||||
// Expect publish failed due to DCG segment open failure
|
||||
ASSERT_FALSE(pub_st.status().ok());
|
||||
}
|
||||
|
||||
TEST_P(LakePartialUpdateTest, test_write_multi_segment) {
|
||||
auto chunk0 = generate_data(kChunkSize, 0, false, 3);
|
||||
auto chunk1 = generate_data(kChunkSize, 0, true, 3);
|
||||
|
|
|
|||
|
|
@ -0,0 +1,50 @@
|
|||
-- name: test_dcg_column_mode_update_data_accuracy
|
||||
-- Test DCG mode partial update followed by row mode insert
|
||||
-- This verifies that DCG data is not lost during row mode operations
|
||||
-- Covers the basic path: a column-mode UPDATE followed by a row-mode partial INSERT on the same key
|
||||
|
||||
-- Test 1: Basic DCG functionality
|
||||
CREATE TABLE t1(id int, os string, version string) PRIMARY KEY (id) PROPERTIES ("replication_num" = "1");
|
||||
-- result:
|
||||
-- !result
|
||||
|
||||
INSERT INTO t1(id, os, version) VALUES (1, '1', '1');
|
||||
-- result:
|
||||
-- !result
|
||||
|
||||
SELECT * FROM t1;
|
||||
-- result:
|
||||
1 1 1
|
||||
-- !result
|
||||
|
||||
SET partial_update_mode = 'column';
|
||||
-- result:
|
||||
-- !result
|
||||
|
||||
UPDATE t1 SET os = '2' WHERE id = 1;
|
||||
-- result:
|
||||
-- !result
|
||||
|
||||
SELECT * FROM t1;
|
||||
-- result:
|
||||
1 2 1
|
||||
-- !result
|
||||
|
||||
-- Switch to row mode - triggers get_column_values with DCG loading
|
||||
SET partial_update_mode = 'auto';
|
||||
-- result:
|
||||
-- !result
|
||||
|
||||
INSERT INTO t1 (id, version) VALUES (1, '2');
|
||||
-- result:
|
||||
-- !result
|
||||
|
||||
SELECT * FROM t1;
|
||||
-- result:
|
||||
1 2 2
|
||||
-- !result
|
||||
|
||||
-- Clean up
|
||||
DROP TABLE t1;
|
||||
-- result:
|
||||
-- !result
|
||||
|
|
@ -0,0 +1,21 @@
|
|||
-- name: test_dcg_column_mode_update_data_accuracy
|
||||
-- Test DCG mode partial update followed by row mode insert
|
||||
-- This verifies that DCG data is not lost during row mode operations
|
||||
|
||||
-- Test 1: Basic DCG functionality
|
||||
CREATE TABLE t1(id int, os string, version string) PRIMARY KEY (id) PROPERTIES ("replication_num" = "1");
|
||||
|
||||
INSERT INTO t1(id, os, version) VALUES (1, '1', '1');
|
||||
SELECT * FROM t1;
|
||||
|
||||
SET partial_update_mode = 'column';
|
||||
UPDATE t1 SET os = '2' WHERE id = 1;
|
||||
SELECT * FROM t1;
|
||||
|
||||
-- Switch to row mode - triggers get_column_values with DCG loading
|
||||
SET partial_update_mode = 'auto';
|
||||
INSERT INTO t1 (id, version) VALUES (1, '2');
|
||||
SELECT * FROM t1;
|
||||
|
||||
-- Clean up
|
||||
DROP TABLE t1;
|
||||
Loading…
Reference in New Issue