[Enhancement] support parquet data page v2 (no encoding) (#57621)

Signed-off-by: yan zhang <dirtysalt1987@gmail.com>
yan zhang 2025-04-04 10:11:39 +08:00 committed by GitHub
parent 90a7e74bc6
commit 76548e2cb1
9 changed files with 124 additions and 40 deletions
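
Background for reviewers: a Parquet Data Page V2 stores the repetition and definition levels ahead of the values, always RLE-encoded without the 4-byte length prefix and never compressed; the header's is_compressed flag covers only the values section. A minimal sketch of how a V2 page body splits, using this repo's Slice type (the struct and helper name are hypothetical, not part of this commit):

// 'body' points just past the Thrift page header.
struct DataPageV2Sections {
    Slice rep_levels; // repetition_levels_byte_length bytes, RLE, uncompressed
    Slice def_levels; // definition_levels_byte_length bytes, RLE, uncompressed
    Slice values;     // compressed iff is_compressed and a codec is configured
};

static DataPageV2Sections split_v2_body(const char* body, uint32_t compressed_page_size,
                                        const tparquet::DataPageHeaderV2& h) {
    uint32_t rep = h.repetition_levels_byte_length;
    uint32_t def = h.definition_levels_byte_length;
    return {Slice(body, rep), Slice(body + rep, def),
            Slice(body + rep + def, compressed_page_size - rep - def)};
}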

@@ -19,11 +19,9 @@
#include <memory>
#include <string>
#include <string_view>
#include <utility>
#include "common/compiler_util.h"
#include "common/status.h"
#include "common/statusor.h"
#include "formats/parquet/encoding.h"
#include "formats/parquet/types.h"
#include "formats/parquet/utils.h"
@@ -126,20 +124,27 @@ Status ColumnChunkReader::_parse_page_header() {
// so we set _num_values when parsing header.
auto& page_type = _page_reader->current_header()->type;
// TODO: support DATA_PAGE_V2, now common writers use DATA_PAGE as default
if (UNLIKELY(page_type != tparquet::PageType::DICTIONARY_PAGE && page_type != tparquet::PageType::DATA_PAGE)) {
if (UNLIKELY(page_type != tparquet::PageType::DICTIONARY_PAGE && page_type != tparquet::PageType::DATA_PAGE &&
page_type != tparquet::PageType::DATA_PAGE_V2)) {
return Status::NotSupported(strings::Substitute("Not supported page type: $0", page_type));
}
if (page_type == tparquet::PageType::DATA_PAGE) {
const auto& header = *_page_reader->current_header();
_num_values = header.data_page_header.num_values;
const auto& page_header = _page_reader->current_header()->data_page_header;
_num_values = page_header.num_values;
_opts.stats->has_page_statistics |=
(header.data_page_header.__isset.statistics && (header.data_page_header.statistics.__isset.min_value ||
header.data_page_header.statistics.__isset.min));
_current_page_no_null =
(header.data_page_header.__isset.statistics && header.data_page_header.statistics.__isset.null_count &&
header.data_page_header.statistics.null_count == 0)
? true
: false;
(page_header.__isset.statistics &&
(page_header.statistics.__isset.min_value || page_header.statistics.__isset.min));
_current_page_no_null = (page_header.__isset.statistics && page_header.statistics.__isset.null_count &&
page_header.statistics.null_count == 0)
? true
: false;
} else if (page_type == tparquet::PageType::DATA_PAGE_V2) {
const auto& page_header = _page_reader->current_header()->data_page_header_v2;
_num_values = page_header.num_values;
_opts.stats->has_page_statistics |=
(page_header.__isset.statistics &&
(page_header.statistics.__isset.min_value || page_header.statistics.__isset.min));
_current_page_no_null = (page_header.num_nulls == 0) ? true : false;
}
return Status::OK();
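// For reference, DataPageHeaderV2 paraphrased from parquet-format's parquet.thrift
// (abridged; field ids and exact defaults omitted):
//   required i32 num_values
//   required i32 num_nulls          <- required, hence the statistics-free null check above
//   required i32 num_rows
//   required Encoding encoding      <- values only; levels are always RLE in v2
//   required i32 definition_levels_byte_length
//   required i32 repetition_levels_byte_length
//   optional bool is_compressed     <- defaults to true, applies to the values section only
//   optional Statistics statistics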
@@ -149,7 +154,8 @@ Status ColumnChunkReader::_parse_page_data() {
SCOPED_RAW_TIMER(&_opts.stats->page_read_ns);
switch (_page_reader->current_header()->type) {
case tparquet::PageType::DATA_PAGE:
RETURN_IF_ERROR(_parse_data_page());
case tparquet::PageType::DATA_PAGE_V2:
RETURN_IF_ERROR(_parse_data_page(_page_reader->current_header()->type));
break;
case tparquet::PageType::DICTIONARY_PAGE:
RETURN_IF_ERROR(_parse_dict_page());
@@ -163,7 +169,7 @@ Status ColumnChunkReader::_parse_page_data() {
}
Status ColumnChunkReader::_read_and_decompress_page_data(uint32_t compressed_size, uint32_t uncompressed_size,
bool is_compressed) {
bool is_compressed, uint32_t bytes_level_size) {
RETURN_IF_ERROR(CurrentThread::mem_tracker()->check_mem_limit("read and decompress page"));
is_compressed = is_compressed && (_compress_codec != nullptr);
@@ -191,14 +197,23 @@ Status ColumnChunkReader::_read_and_decompress_page_data(uint32_t compressed_siz
if (is_compressed) {
_uncompressed_buf.reserve(uncompressed_size);
_data = Slice(_uncompressed_buf.data(), uncompressed_size);
if (bytes_level_size > 0) {
memcpy(_data.data, read_data.data, bytes_level_size);
read_data.remove_prefix(bytes_level_size);
_data.remove_prefix(bytes_level_size);
}
RETURN_IF_ERROR(_compress_codec->decompress(read_data, &_data));
if (bytes_level_size > 0) {
// reconstruct the full uncompressed buffer: [levels][decompressed data].
_data = Slice(_uncompressed_buf.data(), uncompressed_size);
}
} else {
_data = read_data;
}
return Status::OK();
}
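// Worked example of the bytes_level_size path above (numbers are hypothetical):
// a v2 page with repetition_levels_byte_length = 8 and
// definition_levels_byte_length = 16 gives bytes_level_size = 24; with
// compressed_page_size = 1024 and uncompressed_page_size = 4096, the first
// 24 bytes of read_data (the levels, never compressed in v2) are memcpy'd into
// _uncompressed_buf verbatim, the remaining 1000 compressed bytes are
// decompressed into bytes [24, 4096), and _data is rebuilt to span the whole
// 4096-byte buffer so level and value parsing see one contiguous page.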
Status ColumnChunkReader::_parse_data_page() {
Status ColumnChunkReader::_parse_data_page(tparquet::PageType::type page_type) {
if (_page_parse_state == PAGE_DATA_PARSED) {
return Status::OK();
}
@@ -207,22 +222,48 @@ Status ColumnChunkReader::_parse_data_page() {
}
const auto& header = *_page_reader->current_header();
tparquet::Encoding::type encoding = tparquet::Encoding::PLAIN;
uint32_t compressed_size = header.compressed_page_size;
uint32_t uncompressed_size = header.uncompressed_page_size;
RETURN_IF_ERROR(_read_and_decompress_page_data(compressed_size, uncompressed_size, true));
bool is_compressed = true;
uint32_t bytes_level_size = 0;
// parse levels
if (_max_rep_level > 0) {
RETURN_IF_ERROR(_rep_level_decoder.parse(header.data_page_header.repetition_level_encoding, _max_rep_level,
header.data_page_header.num_values, &_data));
if (page_type == tparquet::PageType::DATA_PAGE_V2) {
const auto& page_header = header.data_page_header_v2;
if (page_header.__isset.is_compressed) {
is_compressed = page_header.is_compressed;
}
bytes_level_size = page_header.repetition_levels_byte_length + page_header.definition_levels_byte_length;
}
if (_max_def_level > 0) {
RETURN_IF_ERROR(_def_level_decoder.parse(header.data_page_header.definition_level_encoding, _max_def_level,
header.data_page_header.num_values, &_data));
RETURN_IF_ERROR(
_read_and_decompress_page_data(compressed_size, uncompressed_size, is_compressed, bytes_level_size));
if (page_type == tparquet::PageType::DATA_PAGE) {
const auto& page_header = header.data_page_header;
encoding = page_header.encoding;
// parse levels
if (_max_rep_level > 0) {
RETURN_IF_ERROR(_rep_level_decoder.parse(page_header.repetition_level_encoding, _max_rep_level,
page_header.num_values, &_data));
}
if (_max_def_level > 0) {
RETURN_IF_ERROR(_def_level_decoder.parse(page_header.definition_level_encoding, _max_def_level,
page_header.num_values, &_data));
}
} else if (page_type == tparquet::PageType::DATA_PAGE_V2) {
const auto& page_header = header.data_page_header_v2;
encoding = page_header.encoding;
// parse levels
if (_max_rep_level > 0) {
RETURN_IF_ERROR(_rep_level_decoder.parse_v2(page_header.repetition_levels_byte_length, _max_rep_level,
page_header.num_values, &_data));
}
if (_max_def_level > 0) {
RETURN_IF_ERROR(_def_level_decoder.parse_v2(page_header.definition_levels_byte_length, _max_def_level,
page_header.num_values, &_data));
}
}
auto encoding = header.data_page_header.encoding;
// change the deprecated encoding to RLE_DICTIONARY
if (encoding == tparquet::Encoding::PLAIN_DICTIONARY) {
encoding = tparquet::Encoding::RLE_DICTIONARY;
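// Note: PLAIN_DICTIONARY was deprecated in Parquet 2.0 in favor of
// RLE_DICTIONARY; for data pages both names describe the same on-disk form
// (RLE/bit-packed dictionary indices), so the reader treats one as an alias
// of the other regardless of page version.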

@@ -142,13 +142,12 @@ private:
Status _parse_page_header();
Status _parse_page_data();
Status _read_and_decompress_page_data();
Status _parse_data_page();
Status _parse_data_page(tparquet::PageType::type page_type);
Status _parse_dict_page();
Status _try_load_dictionary();
Status _read_and_decompress_page_data(uint32_t compressed_size, uint32_t uncompressed_size, bool is_compressed);
Status _read_and_decompress_page_data(uint32_t compressed_size, uint32_t uncompressed_size, bool is_compressed,
uint32_t bytes_level_size = 0);
private:
enum PageParseState {

@@ -209,7 +209,7 @@ private:
} else {
auto ret = _rle_batch_reader.GetBatchWithDict(_dict.data(), _dict.size(), data, count);
if (UNLIKELY(ret <= 0)) {
return Status::InternalError("DictDecoder GetBatchWithDict failed");
return Status::InternalError("DictDecoder<> GetBatchWithDict failed");
}
}
@@ -361,7 +361,7 @@ private:
raw::stl_vector_resize_uninitialized(&_slices, count);
auto ret = _rle_batch_reader.GetBatchWithDict(_dict.data(), _dict.size(), _slices.data(), count);
if (UNLIKELY(ret <= 0)) {
return Status::InternalError("DictDecoder GetBatchWithDict failed");
return Status::InternalError("DictDecoder<Slice> GetBatchWithDict failed");
}
ret = dst->append_strings_overflow(_slices, _max_value_length);
if (UNLIKELY(!ret)) {

@@ -30,18 +30,16 @@ Status LevelDecoder::parse(tparquet::Encoding::type encoding, level_t max_level,
switch (encoding) {
case tparquet::Encoding::RLE: {
if (slice->size < 4) {
return Status::InternalError("");
return Status::Corruption("");
}
auto* data = (uint8_t*)slice->data;
uint32_t num_bytes = decode_fixed32_le(data);
if (num_bytes > slice->size - 4) {
return Status::InternalError("");
return Status::Corruption("");
}
_rle_decoder = RleDecoder<level_t>(data + 4, num_bytes, _bit_width);
slice->data += 4 + num_bytes;
slice->size -= 4 + num_bytes;
slice->remove_prefix(4 + num_bytes);
break;
}
case tparquet::Encoding::BIT_PACKED: {
@@ -51,9 +49,7 @@ Status LevelDecoder::parse(tparquet::Encoding::type encoding, level_t max_level,
return Status::Corruption("");
}
_bit_packed_decoder = BitReader((uint8_t*)slice->data, num_bytes);
slice->data += num_bytes;
slice->size -= num_bytes;
slice->remove_prefix(num_bytes);
break;
}
default:
@@ -62,6 +58,21 @@ Status LevelDecoder::parse(tparquet::Encoding::type encoding, level_t max_level,
return Status::OK();
}
Status LevelDecoder::parse_v2(uint32_t num_bytes, level_t max_level, uint32_t num_levels, Slice* slice) {
_encoding = tparquet::Encoding::RLE;
_bit_width = BitUtil::log2(max_level + 1);
_num_levels = num_levels;
// new page, invalidate cached decoded levels
_levels_decoded = _levels_parsed;
auto* data = (uint8_t*)slice->data;
if (num_bytes > slice->size) {
return Status::Corruption("");
}
_rle_decoder = RleDecoder<level_t>(data, num_bytes, _bit_width);
slice->remove_prefix(num_bytes);
return Status::OK();
}
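// Framing difference handled by parse() vs parse_v2(); the RLE payload itself is identical:
//   v1: [uint32 len, little-endian][len bytes of RLE/bit-packed runs]   (len read from the page data)
//   v2: [num_bytes bytes of RLE/bit-packed runs]                        (length taken from the page header)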
size_t LevelDecoder::_get_level_to_decode_batch_size(size_t row_num) {
constexpr size_t min_level_batch_size = 4096;
size_t levels_remaining = _levels_decoded - _levels_parsed;

@@ -46,6 +46,7 @@ public:
// e.g. given 1000 bytes of input, if the decoder digests 100 bytes, slice will be set to
// the last 900.
Status parse(tparquet::Encoding::type encoding, level_t max_level, uint32_t num_levels, Slice* slice);
Status parse_v2(uint32_t num_bytes, level_t max_level, uint32_t num_levels, Slice* slice);
size_t next_repeated_count() {
DCHECK_EQ(_encoding, tparquet::Encoding::RLE);

@@ -4294,4 +4294,34 @@ TEST_F(FileReaderTest, test_filter_to_dict_decoder) {
EXPECT_EQ(2, total_row_nums);
}
TEST_F(FileReaderTest, test_data_page_v2) {
auto chunk = std::make_shared<Chunk>();
chunk->append_column(ColumnHelper::create_column(TYPE_INT_DESC, true), chunk->num_columns());
chunk->append_column(ColumnHelper::create_column(TYPE_VARCHAR_DESC, true), chunk->num_columns());
const std::string file_path = "./be/test/formats/parquet/test_data/data_page_v2_test.parquet";
Utils::SlotDesc slot_descs[] = {{"id", TYPE_INT_DESC}, {"name", TYPE_VARCHAR_DESC}, {""}};
auto ctx = _create_file_random_read_context(file_path, slot_descs);
auto file_reader = _create_file_reader(file_path);
Status status = file_reader->init(ctx);
ASSERT_TRUE(status.ok());
size_t total_row_nums = 0;
while (!status.is_end_of_file()) {
chunk->reset();
status = file_reader->get_next(&chunk);
chunk->check_or_die();
total_row_nums += chunk->num_rows();
// for (int i = 0; i < chunk->num_rows(); i++) {
// std::cout << chunk->debug_row(i) << std::endl;
// }
if (chunk->num_rows() == 4) {
ASSERT_EQ(chunk->debug_row(0), "[1, 'a']");
ASSERT_EQ(chunk->debug_row(1), "[2, 'b']");
ASSERT_EQ(chunk->debug_row(2), "[3, 'c']");
ASSERT_EQ(chunk->debug_row(3), "[4, 'd']");
}
}
EXPECT_EQ(4, total_row_nums);
}
} // namespace starrocks::parquet
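
The fixture data_page_v2_test.parquet is added with this commit, and its provenance is not shown here. For anyone regenerating it: Apache Arrow's parquet-cpp writer can produce a file of the same shape, since it exposes the page version via WriterProperties. A minimal sketch, assuming an Arrow installation (not part of this commit; verify the API against your Arrow version):

#include <arrow/api.h>
#include <arrow/io/file.h>
#include <parquet/arrow/writer.h>
#include <parquet/exception.h>
#include <parquet/properties.h>

int main() {
    // Build the two columns the test expects: id = 1..4, name = 'a'..'d'.
    arrow::Int32Builder ids;
    arrow::StringBuilder names;
    PARQUET_THROW_NOT_OK(ids.AppendValues({1, 2, 3, 4}));
    PARQUET_THROW_NOT_OK(names.AppendValues({"a", "b", "c", "d"}));
    std::shared_ptr<arrow::Array> id_arr, name_arr;
    PARQUET_THROW_NOT_OK(ids.Finish(&id_arr));
    PARQUET_THROW_NOT_OK(names.Finish(&name_arr));
    auto table = arrow::Table::Make(
            arrow::schema({arrow::field("id", arrow::int32()), arrow::field("name", arrow::utf8())}),
            {id_arr, name_arr});
    // Ask the writer for Data Page V2 pages.
    auto props = parquet::WriterProperties::Builder()
                         .data_page_version(parquet::ParquetDataPageVersion::V2)
                         ->build();
    PARQUET_ASSIGN_OR_THROW(auto out, arrow::io::FileOutputStream::Open("data_page_v2_test.parquet"));
    PARQUET_THROW_NOT_OK(parquet::arrow::WriteTable(*table, arrow::default_memory_pool(), out,
                                                    /*chunk_size=*/1024, props));
    return 0;
}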

@@ -35,7 +35,7 @@ static uint64_t hash(uint64_t value) {
}
// keep the logic in sync with the java version in fe; when you change hll_test.cpp, see HllTest.java
TEST_F(TestHll, Normal) {
uint8_t buf[HLL_REGISTERS_COUNT + 1];
uint8_t buf[HLL_REGISTERS_COUNT + 1] = {0};
// empty
{

@@ -13,6 +13,8 @@
# INFO, WARNING, ERROR, FATAL
sys_log_level = INFO
# sys_log_verbose_modules = *
# sys_log_verbose_level = 3
# ports for admin, web, heartbeat service
be_port = 9060
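
# Note (assumption, not verified in this commit): sys_log_verbose_modules and
# sys_log_verbose_level presumably map to glog's vmodule/v switches in the BE.
# A hypothetical usage, enabling verbose logs for one module at level 3:
# sys_log_verbose_modules = column_chunk_reader
# sys_log_verbose_level = 3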