[BugFix] fix parquet array write when split null string (backport #61999) (#62012)

Signed-off-by: yan zhang <dirtysalt1987@gmail.com>
Co-authored-by: yan zhang <dirtysalt1987@gmail.com>
mergify[bot] 2025-08-18 03:37:13 +00:00 committed by GitHub
parent 982f2ebd3e
commit dbb3e1d5f8
5 changed files with 105 additions and 2 deletions
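
What the fix does: split() previously emitted array elements even for NULL input rows, so downstream the Parquet writer could see a row that is NULL at the array level yet still owns elements. The split() change skips NULL haystacks, and the writer change rejects such corrupt columns with a DataQualityError. A minimal sketch of the invariant involved (NullableArray here is a hypothetical stand-in, not the StarRocks column classes):

    // Minimal sketch of the invariant: a row flagged null at the array level
    // must cover an empty offset range, i.e. offsets[row + 1] == offsets[row].
    #include <cstddef>
    #include <cstdint>
    #include <vector>

    struct NullableArray {
        std::vector<uint8_t> nulls;    // 1 = the row's array value is NULL
        std::vector<uint32_t> offsets; // offsets.size() == nulls.size() + 1
    };

    bool holds_null_invariant(const NullableArray& col) {
        for (std::size_t row = 0; row < col.nulls.size(); ++row) {
            if (col.nulls[row] && col.offsets[row + 1] != col.offsets[row]) {
                return false; // null row still owns elements: corrupt data
            }
        }
        return true;
    }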


@@ -155,6 +155,9 @@ StatusOr<ColumnPtr> StringFunctions::split(FunctionContext* context, const starr
     array_binary_column->reserve(row_nums * 5, haystack_columns->get_bytes().size());
     for (int row = 0; row < row_nums; ++row) {
         array_offsets->append(offset);
+        if (string_viewer.is_null(row)) {
+            continue;
+        }
         Slice haystack = string_viewer.value(row);
         int32_t haystack_offset = 0;
         int splits_size = 0;
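
The fix appends the row's start offset before the null check and only then skips NULL haystacks, so a NULL row contributes an empty element range instead of spurious splits. A hedged walk-through of that offset bookkeeping, with simplified stand-ins for the column types:

    // Walk-through of the fixed loop shape for rows {NULL, "aaa"}: the NULL
    // row gets only its start offset, leaving offsets {0, 0, 1}.
    #include <cstdint>
    #include <optional>
    #include <string>
    #include <vector>

    int main() {
        std::vector<std::optional<std::string>> rows = {std::nullopt, "aaa"};
        std::vector<std::string> elements;   // stands in for array_binary_column
        std::vector<uint32_t> offsets;       // stands in for array_offsets
        uint32_t offset = 0;
        for (const auto& row : rows) {
            offsets.push_back(offset);       // mirrors array_offsets->append(offset)
            if (!row) continue;              // mirrors string_viewer.is_null(row)
            elements.push_back(*row);        // the real code splits on the separator
            ++offset;
        }
        offsets.push_back(offset);           // closing offset: {0, 0, 1}
        return 0;
    }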


@@ -457,8 +457,16 @@ Status LevelBuilder::_write_array_column_chunk(const LevelBuilderContext& ctx, c
             continue;
         }
+        auto array_size = offsets[offset + 1] - offsets[offset];
+        auto array_is_null = (def_level < ctx._max_def_level || (null_col != nullptr && null_col[offset]));
         // null in current array_column
-        if (def_level < ctx._max_def_level || (null_col != nullptr && null_col[offset])) {
+        if (array_is_null) {
+            if (array_size > 0) {
+                return Status::DataQualityError(
+                        fmt::format("Array column ({}) has null element at offset {}, but array size is {}",
+                                    type_desc.debug_string(), offset, array_size));
+            }
             (*def_levels)[num_levels] = def_level;
             (*rep_levels)[num_levels] = rep_level;
@@ -467,7 +475,6 @@ Status LevelBuilder::_write_array_column_chunk(const LevelBuilderContext& ctx, c
             continue;
         }
-        auto array_size = offsets[offset + 1] - offsets[offset];
         // not null but empty array
         if (array_size == 0) {
             (*def_levels)[num_levels] = def_level + node->is_optional();
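
With array_size computed before the null branch, the writer can now fail fast on corrupt input instead of silently emitting definition/repetition levels for a null row that still owns elements. A simplified stand-in for the added guard (check_row is hypothetical; the real code returns Status::DataQualityError rather than throwing):

    // Simplified guard logic: a row that is null at the array level must have
    // array_size == 0; anything else is corrupt data and aborts the write.
    #include <cstdint>
    #include <stdexcept>
    #include <string>

    void check_row(bool array_is_null, int64_t array_size) {
        if (array_is_null && array_size > 0) {
            // the real code returns Status::DataQualityError(fmt::format(...))
            throw std::runtime_error("null array row owns " +
                                     std::to_string(array_size) + " element(s)");
        }
    }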


@@ -466,6 +466,52 @@ TEST_F(FileWriterTest, TestWriteArray) {
     Utils::assert_equal_chunk(chunk.get(), read_chunk.get());
 }
+
+TEST_F(FileWriterTest, TestWriteArrayNullWithOffset) {
+    // type_descs
+    std::vector<TypeDescriptor> type_descs;
+    auto type_int = TypeDescriptor::from_logical_type(TYPE_INT);
+    auto type_int_array = TypeDescriptor::from_logical_type(TYPE_ARRAY);
+    type_int_array.children.push_back(type_int);
+    type_descs.push_back(type_int_array);
+
+    // NULL, [10]
+    auto chunk = std::make_shared<Chunk>();
+    {
+        auto elements_data_col = Int32Column::create();
+        std::vector<int32_t> nums{-99, 10};
+        elements_data_col->append_numbers(nums.data(), sizeof(int32_t) * nums.size());
+        auto elements_null_col = UInt8Column::create();
+        std::vector<uint8_t> nulls{1, 0};
+        elements_null_col->append_numbers(nulls.data(), sizeof(uint8_t) * nulls.size());
+        auto elements_col = NullableColumn::create(std::move(elements_data_col), std::move(elements_null_col));
+        auto offsets_col = UInt32Column::create();
+        std::vector<uint32_t> offsets{0, 1, 2};
+        offsets_col->append_numbers(offsets.data(), sizeof(uint32_t) * offsets.size());
+        auto array_col = ArrayColumn::create(std::move(elements_col), std::move(offsets_col));
+        std::vector<uint8_t> _nulls{1, 0};
+        auto null_col = UInt8Column::create();
+        null_col->append_numbers(_nulls.data(), sizeof(uint8_t) * _nulls.size());
+        auto nullable_col = NullableColumn::create(std::move(array_col), std::move(null_col));
+        std::cout << "nullable_col: " << nullable_col->debug_string() << std::endl;
+        chunk->append_column(std::move(nullable_col), chunk->num_columns());
+    }
+
+    // write chunk
+    auto schema = _make_schema(type_descs);
+    ASSERT_TRUE(schema != nullptr);
+    auto st = _write_chunk(chunk, type_descs, schema);
+    ASSERT_ERROR(st);
+
+    // // read chunk and assert equality
+    // auto read_chunk = _read_chunk(type_descs);
+    // ASSERT_TRUE(read_chunk != nullptr);
+    // Utils::assert_equal_chunk(chunk.get(), read_chunk.get());
+}
+
 TEST_F(FileWriterTest, TestWriteStruct) {
     // type_descs
     std::vector<TypeDescriptor> type_descs;
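
The new test builds exactly the corrupt shape the guard targets: the row-level null flags mark row 0 as NULL, yet the element offsets {0, 1, 2} give that row one element (-99). The write is now expected to fail (ASSERT_ERROR), which is why the read-back assertions are left commented out.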


@@ -0,0 +1,28 @@
+-- name: test_parquet_array_check
+shell: ossutil64 mkdir oss://${oss_bucket}/test_parquet_array_check/${uuid0}/ >/dev/null || echo "exit 0" >/dev/null
+-- result:
+0
+-- !result
+insert into files
+(
+    "path" = "oss://${oss_bucket}/test_parquet_array_check/${uuid0}/000/",
+    "format" = "parquet"
+)
+with tt (tme_mv_id, fid, fsinger_id_max) as ((select 10, "10", null) union all (select 20, "20", "aaa")),
+tt2 (tme_mv_id, fid, fsinger_ids, fsinger_id_max) as (select tme_mv_id, fid, split(fsinger_id_max, '|') as fsinger_ids, fsinger_id_max from tt)
+select * from tt2;
+-- result:
+-- !result
+select * from
+files("path" = "oss://${oss_bucket}/test_parquet_array_check/${uuid0}/000/*",
+      "format" = "parquet") order by tme_mv_id;
+-- result:
+10 10 None None
+20 20 ["aaa"] aaa
+-- !result
+shell: ossutil64 rm -rf oss://${oss_bucket}/test_parquet_array_check/${uuid0}/ >/dev/null || echo "exit 0" >/dev/null
+-- result:
+0
+-- !result
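
The same test appears twice because the SQL tester keeps each case as a pair: the recorded result (R) file above and the plain test source (T) file that follows. Row 10 shows the fixed behavior end to end: split(NULL, '|') now round-trips through Parquet as a NULL array (None) rather than producing garbage elements.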


@@ -0,0 +1,19 @@
+-- name: test_parquet_array_check
+shell: ossutil64 mkdir oss://${oss_bucket}/test_parquet_array_check/${uuid0}/ >/dev/null || echo "exit 0" >/dev/null
+insert into files
+(
+    "path" = "oss://${oss_bucket}/test_parquet_array_check/${uuid0}/000/",
+    "format" = "parquet"
+)
+with tt (tme_mv_id, fid, fsinger_id_max) as ((select 10, "10", null) union all (select 20, "20", "aaa")),
+tt2 (tme_mv_id, fid, fsinger_ids, fsinger_id_max) as (select tme_mv_id, fid, split(fsinger_id_max, '|') as fsinger_ids, fsinger_id_max from tt)
+select * from tt2;
+-- expect [10, null], [20, ["aaa"]]
+select * from
+files("path" = "oss://${oss_bucket}/test_parquet_array_check/${uuid0}/000/*",
+      "format" = "parquet") order by tme_mv_id;
+shell: ossutil64 rm -rf oss://${oss_bucket}/test_parquet_array_check/${uuid0}/ >/dev/null || echo "exit 0" >/dev/null