From a9ed6d35e4e1b32065ce59445d28e6bed48aff0e Mon Sep 17 00:00:00 2001 From: Gavin Date: Fri, 26 Sep 2025 20:36:35 -0700 Subject: [PATCH] [BugFix] Fix the thread safety issue caused by concurrent initialization of column evaluator by multiple iceberg partition writers. (#63598) Why I'm doing: Now we initialize the column evaluator in the partition writer, which cause the same evaluator instance may be modified by different threads. What I'm doing: Remove the unnecessary evaluator initialization in the iceberg partition writer because it has been initialized in partition chunk writer factory. Also, we check the file writer and create it if needed before writing file data, because it may has been reset by the previous commit operation. Fixes #issue Signed-off-by: GavinMar --- be/src/connector/partition_chunk_writer.cpp | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/be/src/connector/partition_chunk_writer.cpp b/be/src/connector/partition_chunk_writer.cpp index ff8df511908..67e8b8ba0da 100644 --- a/be/src/connector/partition_chunk_writer.cpp +++ b/be/src/connector/partition_chunk_writer.cpp @@ -73,10 +73,10 @@ Status BufferPartitionChunkWriter::init() { } Status BufferPartitionChunkWriter::write(Chunk* chunk) { - RETURN_IF_ERROR(create_file_writer_if_needed()); - if (_file_writer->get_written_bytes() >= _max_file_size) { + if (_file_writer && _file_writer->get_written_bytes() >= _max_file_size) { commit_file(); } + RETURN_IF_ERROR(create_file_writer_if_needed()); return _file_writer->write(chunk); } @@ -120,9 +120,6 @@ Status SpillPartitionChunkWriter::init() { RETURN_IF_ERROR(_load_spill_block_mgr->init()); _load_chunk_spiller = std::make_unique(_load_spill_block_mgr.get(), _fragment_context->runtime_state()->runtime_profile()); - if (_column_evaluators) { - RETURN_IF_ERROR(ColumnEvaluator::init(*_column_evaluators)); - } return Status::OK(); }