From 3b454d6ec9a024778457b4fd8d835463b17b8746 Mon Sep 17 00:00:00 2001 From: Gavin Date: Sun, 5 Oct 2025 20:15:06 -0700 Subject: [PATCH] [BugFix] Fix the crash issue caused by using an unintialized column evaluator in iceberg partition writer. (#63782) Signed-off-by: GavinMar --- be/src/connector/file_chunk_sink.cpp | 5 +++-- be/src/connector/hive_chunk_sink.cpp | 3 ++- be/src/connector/iceberg_chunk_sink.cpp | 9 +++++---- be/src/connector/partition_chunk_writer.h | 5 ++--- .../formats/parquet/parquet_file_writer.cpp | 19 +++++++++---------- be/src/formats/parquet/parquet_file_writer.h | 4 ++-- .../parquet/parquet_file_writer_test.cpp | 5 +++-- 7 files changed, 26 insertions(+), 24 deletions(-) diff --git a/be/src/connector/file_chunk_sink.cpp b/be/src/connector/file_chunk_sink.cpp index e6ac546f21f..2487f5a14b5 100644 --- a/be/src/connector/file_chunk_sink.cpp +++ b/be/src/connector/file_chunk_sink.cpp @@ -56,8 +56,9 @@ StatusOr> FileChunkSinkProvider::create_chun std::shared_ptr file_writer_factory; if (boost::iequals(ctx->format, formats::PARQUET)) { file_writer_factory = std::make_shared( - fs, ctx->compression_type, ctx->options, ctx->column_names, std::move(column_evaluators), std::nullopt, - ctx->executor, runtime_state); + fs, ctx->compression_type, ctx->options, ctx->column_names, + std::make_shared>>(std::move(column_evaluators)), + std::nullopt, ctx->executor, runtime_state); } else if (boost::iequals(ctx->format, formats::ORC)) { file_writer_factory = std::make_shared( fs, ctx->compression_type, ctx->options, ctx->column_names, std::move(column_evaluators), ctx->executor, diff --git a/be/src/connector/hive_chunk_sink.cpp b/be/src/connector/hive_chunk_sink.cpp index fc4480dd10f..a16c6ea8a27 100644 --- a/be/src/connector/hive_chunk_sink.cpp +++ b/be/src/connector/hive_chunk_sink.cpp @@ -66,7 +66,8 @@ StatusOr> HiveChunkSinkProvider::create_chun ctx->options[formats::ParquetWriterOptions::USE_LEGACY_DECIMAL_ENCODING] = "true"; ctx->options[formats::ParquetWriterOptions::USE_INT96_TIMESTAMP_ENCODING] = "true"; file_writer_factory = std::make_shared( - fs, ctx->compression_type, ctx->options, ctx->data_column_names, std::move(data_column_evaluators), + fs, ctx->compression_type, ctx->options, ctx->data_column_names, + std::make_shared>>(std::move(data_column_evaluators)), std::nullopt, ctx->executor, runtime_state); } else if (boost::iequals(ctx->format, formats::ORC)) { file_writer_factory = std::make_shared( diff --git a/be/src/connector/iceberg_chunk_sink.cpp b/be/src/connector/iceberg_chunk_sink.cpp index e53a08d9f24..bc27ef6af8d 100644 --- a/be/src/connector/iceberg_chunk_sink.cpp +++ b/be/src/connector/iceberg_chunk_sink.cpp @@ -82,7 +82,8 @@ StatusOr> IcebergChunkSinkProvider::create_c auto ctx = std::dynamic_pointer_cast(context); auto runtime_state = ctx->fragment_context->runtime_state(); std::shared_ptr fs = FileSystem::CreateUniqueFromString(ctx->path, FSOptions(&ctx->cloud_conf)).value(); - auto column_evaluators = ColumnEvaluator::clone(ctx->column_evaluators); + auto column_evaluators = std::make_shared>>( + ColumnEvaluator::clone(ctx->column_evaluators)); auto location_provider = std::make_shared( ctx->path, print_id(ctx->fragment_context->query_id()), runtime_state->be_number(), driver_id, boost::to_lower_copy(ctx->format)); @@ -93,8 +94,8 @@ StatusOr> IcebergChunkSinkProvider::create_c std::shared_ptr file_writer_factory; if (boost::iequals(ctx->format, formats::PARQUET)) { file_writer_factory = std::make_shared( - fs, ctx->compression_type, ctx->options, ctx->column_names, std::move(column_evaluators), - ctx->parquet_field_ids, ctx->executor, runtime_state); + fs, ctx->compression_type, ctx->options, ctx->column_names, column_evaluators, ctx->parquet_field_ids, + ctx->executor, runtime_state); } else { file_writer_factory = std::make_shared(ctx->format); } @@ -107,7 +108,7 @@ StatusOr> IcebergChunkSinkProvider::create_c fs, ctx->fragment_context, runtime_state->desc_tbl().get_tuple_descriptor(ctx->tuple_desc_id), - &ctx->column_evaluators, + column_evaluators, ctx->sort_ordering}); partition_chunk_writer_factory = std::make_unique(partition_chunk_writer_ctx); } else { diff --git a/be/src/connector/partition_chunk_writer.h b/be/src/connector/partition_chunk_writer.h index 50f870e22ff..8a8140987d6 100644 --- a/be/src/connector/partition_chunk_writer.h +++ b/be/src/connector/partition_chunk_writer.h @@ -50,7 +50,7 @@ struct SpillPartitionChunkWriterContext : public PartitionChunkWriterContext { std::shared_ptr fs; pipeline::FragmentContext* fragment_context = nullptr; TupleDescriptor* tuple_desc = nullptr; - std::vector>* column_evaluators; + std::shared_ptr>> column_evaluators; std::shared_ptr sort_ordering; }; @@ -185,13 +185,12 @@ private: std::shared_ptr _fs = nullptr; pipeline::FragmentContext* _fragment_context = nullptr; TupleDescriptor* _tuple_desc = nullptr; - std::vector>* _column_evaluators; + std::shared_ptr>> _column_evaluators; std::shared_ptr _sort_ordering; std::unique_ptr _chunk_spill_token; std::unique_ptr _block_merge_token; std::unique_ptr _load_spill_block_mgr; std::shared_ptr _load_chunk_spiller; - //std::function(Chunk*, size_t)> _column_eval_func; TUniqueId _writer_id; std::list _chunks; diff --git a/be/src/formats/parquet/parquet_file_writer.cpp b/be/src/formats/parquet/parquet_file_writer.cpp index f87cf762c36..e3d754d5c6a 100644 --- a/be/src/formats/parquet/parquet_file_writer.cpp +++ b/be/src/formats/parquet/parquet_file_writer.cpp @@ -455,13 +455,12 @@ Status ParquetFileWriter::init() { ParquetFileWriter::~ParquetFileWriter() = default; -ParquetFileWriterFactory::ParquetFileWriterFactory(std::shared_ptr fs, - TCompressionType::type compression_type, - std::map options, - std::vector column_names, - std::vector>&& column_evaluators, - std::optional> field_ids, - PriorityThreadPool* executors, RuntimeState* runtime_state) +ParquetFileWriterFactory::ParquetFileWriterFactory( + std::shared_ptr fs, TCompressionType::type compression_type, + std::map options, std::vector column_names, + std::shared_ptr>> column_evaluators, + std::optional> field_ids, PriorityThreadPool* executors, + RuntimeState* runtime_state) : _fs(std::move(fs)), _compression_type(compression_type), _field_ids(std::move(field_ids)), @@ -472,7 +471,7 @@ ParquetFileWriterFactory::ParquetFileWriterFactory(std::shared_ptr f _runtime_state(runtime_state) {} Status ParquetFileWriterFactory::init() { - RETURN_IF_ERROR(ColumnEvaluator::init(_column_evaluators)); + RETURN_IF_ERROR(ColumnEvaluator::init(*_column_evaluators)); _parsed_options = std::make_shared(); _parsed_options->column_ids = _field_ids; if (_options.contains(ParquetWriterOptions::USE_LEGACY_DECIMAL_ENCODING)) { @@ -506,8 +505,8 @@ StatusOr ParquetFileWriterFactory::create(const std::string& pa auto rollback_action = [fs = _fs, path = path]() { WARN_IF_ERROR(ignore_not_found(fs->delete_file(path)), "fail to delete file"); }; - auto column_evaluators = ColumnEvaluator::clone(_column_evaluators); - auto types = ColumnEvaluator::types(_column_evaluators); + auto column_evaluators = ColumnEvaluator::clone(*_column_evaluators); + auto types = ColumnEvaluator::types(*_column_evaluators); auto async_output_stream = std::make_unique(std::move(file), _executors, _runtime_state); auto parquet_output_stream = std::make_shared(async_output_stream.get()); diff --git a/be/src/formats/parquet/parquet_file_writer.h b/be/src/formats/parquet/parquet_file_writer.h index d0b385a6ded..f14e6288236 100644 --- a/be/src/formats/parquet/parquet_file_writer.h +++ b/be/src/formats/parquet/parquet_file_writer.h @@ -162,7 +162,7 @@ class ParquetFileWriterFactory : public FileWriterFactory { public: ParquetFileWriterFactory(std::shared_ptr fs, TCompressionType::type compression_type, std::map options, std::vector column_names, - std::vector>&& column_evaluators, + std::shared_ptr>> column_evaluators, std::optional> field_ids, PriorityThreadPool* executors, RuntimeState* runtime_state); @@ -178,7 +178,7 @@ private: std::shared_ptr _parsed_options; std::vector _column_names; - std::vector> _column_evaluators; + std::shared_ptr>> _column_evaluators; PriorityThreadPool* _executors = nullptr; RuntimeState* _runtime_state = nullptr; }; diff --git a/be/test/formats/parquet/parquet_file_writer_test.cpp b/be/test/formats/parquet/parquet_file_writer_test.cpp index 4c26001725f..5b4ff12520f 100644 --- a/be/test/formats/parquet/parquet_file_writer_test.cpp +++ b/be/test/formats/parquet/parquet_file_writer_test.cpp @@ -1123,10 +1123,11 @@ TEST_F(ParquetFileWriterTest, TestFactory) { std::vector type_descs{type_bool}; auto column_names = _make_type_names(type_descs); - auto column_evaluators = ColumnSlotIdEvaluator::from_types(type_descs); + auto column_evaluators = std::make_shared>>( + ColumnSlotIdEvaluator::from_types(type_descs)); auto fs = std::make_shared(); auto factory = formats::ParquetFileWriterFactory(fs, TCompressionType::NO_COMPRESSION, {}, column_names, - std::move(column_evaluators), std::nullopt, nullptr, nullptr); + column_evaluators, std::nullopt, nullptr, nullptr); ASSERT_OK(factory.init()); auto maybe_writer = factory.create(_file_path); ASSERT_OK(maybe_writer.status());