[Enhancement] Write iceberg tables without accumulating chunks when memory is sufficient to improve the sink performance for small number of partitions. (#63802)

Signed-off-by: GavinMar <yangguansuo@starrocks.com>
This commit is contained in:
Gavin 2025-10-07 23:42:22 -07:00 committed by GitHub
parent 5f982fa9e6
commit 1dbf05f5dd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 109 additions and 48 deletions

View File

@ -1604,7 +1604,7 @@ CONF_mBool(apply_del_vec_after_all_index_filter, "true");
// connector sink memory watermark
CONF_mDouble(connector_sink_mem_high_watermark_ratio, "0.3");
CONF_mDouble(connector_sink_mem_low_watermark_ratio, "0.1");
CONF_mDouble(connector_sink_mem_urgent_space_ratio, "0.1");
CONF_mDouble(connector_sink_mem_urgent_space_ratio, "0.05");
// Whether enable spill intermediate data for connector sink.
CONF_mBool(enable_connector_sink_spill, "true");

View File

@ -41,7 +41,8 @@ Status ConnectorChunkSink::init() {
}
Status ConnectorChunkSink::write_partition_chunk(const std::string& partition,
const std::vector<int8_t>& partition_field_null_list, Chunk* chunk) {
const std::vector<int8_t>& partition_field_null_list,
const ChunkPtr& chunk) {
// partition_field_null_list is used to distinguish between scenarios like NULL and the string "null".
// They share the same directory path, but should not be in the same data file.
// We should record them in different files so that each data file can have its own meta info.
@ -64,13 +65,13 @@ Status ConnectorChunkSink::write_partition_chunk(const std::string& partition,
return Status::OK();
}
Status ConnectorChunkSink::add(Chunk* chunk) {
Status ConnectorChunkSink::add(const ChunkPtr& chunk) {
std::string partition = DEFAULT_PARTITION;
bool partitioned = !_partition_column_names.empty();
if (partitioned) {
ASSIGN_OR_RETURN(partition,
HiveUtils::make_partition_name(_partition_column_names, _partition_column_evaluators, chunk,
_support_null_partition));
HiveUtils::make_partition_name(_partition_column_names, _partition_column_evaluators,
chunk.get(), _support_null_partition));
}
RETURN_IF_ERROR(
@ -79,6 +80,13 @@ Status ConnectorChunkSink::add(Chunk* chunk) {
}
Status ConnectorChunkSink::finish() {
// Flush data to disk to free memory for subsequent merge operations.
for (auto& [partition_key, writer] : _partition_chunk_writers) {
RETURN_IF_ERROR(writer->flush());
}
for (auto& [partition_key, writer] : _partition_chunk_writers) {
RETURN_IF_ERROR(writer->wait_flush());
}
for (auto& [partition_key, writer] : _partition_chunk_writers) {
RETURN_IF_ERROR(writer->finish());
}

View File

@ -47,7 +47,7 @@ public:
Status init();
virtual Status add(Chunk* chunk);
virtual Status add(const ChunkPtr& chunk);
Status finish();
@ -58,7 +58,7 @@ public:
virtual void callback_on_commit(const CommitResult& result) = 0;
Status write_partition_chunk(const std::string& partition, const vector<int8_t>& partition_field_null_list,
Chunk* chunk);
const ChunkPtr& chunk);
Status status();

View File

@ -50,13 +50,16 @@ int ConnectorSinkSpillExecutor::calc_max_thread_num() {
}
// Executes one asynchronous spill of the buffered chunk on the executor thread.
void ChunkSpillTask::run() {
    // Attribute all memory allocated by this task to the owning query's tracker
    // (the tracker is captured at task-creation time, not the executor's own).
    SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(_mem_tracker);
    auto res = _load_chunk_spiller->spill(*_chunk);
    // Hand the chunk and the spill result back to the caller; the callback is
    // responsible for error handling and for accounting the spilled bytes.
    if (_cb) {
        _cb(_chunk, res);
    }
    // Release our reference eagerly so chunk memory is freed as soon as the
    // spill completes, rather than when the task object is destroyed.
    _chunk.reset();
}
void MergeBlockTask::run() {
SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(_mem_tracker);
auto st = _writer->merge_blocks();
if (_cb) {
_cb(st);

View File

@ -71,9 +71,12 @@ protected:
class ChunkSpillTask final : public Runnable {
public:
ChunkSpillTask(LoadChunkSpiller* load_chunk_spiller, ChunkPtr chunk,
ChunkSpillTask(LoadChunkSpiller* load_chunk_spiller, ChunkPtr chunk, MemTracker* mem_tracker,
std::function<void(ChunkPtr chunk, const StatusOr<size_t>&)> cb)
: _load_chunk_spiller(load_chunk_spiller), _chunk(chunk), _cb(std::move(cb)) {}
: _load_chunk_spiller(load_chunk_spiller),
_chunk(std::move(chunk)),
_mem_tracker(mem_tracker),
_cb(std::move(cb)) {}
~ChunkSpillTask() override = default;
@ -82,18 +85,20 @@ public:
private:
LoadChunkSpiller* _load_chunk_spiller;
ChunkPtr _chunk;
MemTracker* _mem_tracker;
std::function<void(ChunkPtr, const StatusOr<size_t>&)> _cb;
};
class MergeBlockTask : public Runnable {
public:
MergeBlockTask(SpillPartitionChunkWriter* writer, std::function<void(const Status&)> cb)
: _writer(writer), _cb(std::move(cb)) {}
MergeBlockTask(SpillPartitionChunkWriter* writer, MemTracker* mem_tracker, std::function<void(const Status&)> cb)
: _writer(writer), _mem_tracker(mem_tracker), _cb(std::move(cb)) {}
void run() override;
private:
SpillPartitionChunkWriter* _writer;
MemTracker* _mem_tracker;
std::function<void(const Status&)> _cb;
};

View File

@ -124,14 +124,14 @@ StatusOr<std::unique_ptr<ConnectorChunkSink>> IcebergChunkSinkProvider::create_c
std::move(partition_chunk_writer_factory), runtime_state);
}
Status IcebergChunkSink::add(Chunk* chunk) {
Status IcebergChunkSink::add(const ChunkPtr& chunk) {
std::string partition = DEFAULT_PARTITION;
bool partitioned = !_partition_column_names.empty();
std::vector<int8_t> partition_field_null_list;
if (partitioned) {
ASSIGN_OR_RETURN(partition, HiveUtils::iceberg_make_partition_name(
_partition_column_names, _partition_column_evaluators,
dynamic_cast<IcebergChunkSink*>(this)->transform_expr(), chunk,
dynamic_cast<IcebergChunkSink*>(this)->transform_expr(), chunk.get(),
_support_null_partition, partition_field_null_list));
}

View File

@ -45,7 +45,7 @@ public:
const std::vector<std::string>& transform_expr() const { return _transform_exprs; }
Status add(Chunk* chunk) override;
Status add(const ChunkPtr& chunk) override;
private:
std::vector<std::string> _transform_exprs;

View File

@ -72,12 +72,12 @@ Status BufferPartitionChunkWriter::init() {
return Status::OK();
}
Status BufferPartitionChunkWriter::write(Chunk* chunk) {
Status BufferPartitionChunkWriter::write(const ChunkPtr& chunk) {
if (_file_writer && _file_writer->get_written_bytes() >= _max_file_size) {
commit_file();
}
RETURN_IF_ERROR(create_file_writer_if_needed());
return _file_writer->write(chunk);
return _file_writer->write(chunk.get());
}
Status BufferPartitionChunkWriter::flush() {
@ -85,6 +85,10 @@ Status BufferPartitionChunkWriter::flush() {
return Status::OK();
}
// No-op: BufferPartitionChunkWriter appears to have no asynchronous flush work
// to wait on (unlike SpillPartitionChunkWriter, which waits on its spill token).
Status BufferPartitionChunkWriter::wait_flush() {
    return Status::OK();
}
Status BufferPartitionChunkWriter::finish() {
commit_file();
return Status::OK();
@ -102,6 +106,7 @@ SpillPartitionChunkWriter::SpillPartitionChunkWriter(std::string partition,
_block_merge_token = StorageEngine::instance()->load_spill_block_merge_executor()->create_token();
_tuple_desc = ctx->tuple_desc;
_writer_id = generate_uuid();
_spill_mode = _sort_ordering != nullptr;
}
SpillPartitionChunkWriter::~SpillPartitionChunkWriter() {
@ -123,10 +128,13 @@ Status SpillPartitionChunkWriter::init() {
return Status::OK();
}
Status SpillPartitionChunkWriter::write(Chunk* chunk) {
Status SpillPartitionChunkWriter::write(const ChunkPtr& chunk) {
RETURN_IF_ERROR(create_file_writer_if_needed());
if (!_spill_mode) {
return _write_chunk(chunk.get());
}
_chunks.push_back(chunk->clone_unique());
_chunks.push_back(chunk);
_chunk_bytes_usage += chunk->bytes_usage();
if (!_base_chunk) {
_base_chunk = _chunks.back();
@ -146,11 +154,21 @@ Status SpillPartitionChunkWriter::write(Chunk* chunk) {
// Flushes in-memory data to release memory. Called by the sink memory manager
// when a victim writer is chosen under memory pressure, and before merging.
Status SpillPartitionChunkWriter::flush() {
    // Nothing to flush before the first file writer is created.
    RETURN_IF(!_file_writer, Status::OK());
    // Change to spill mode if memory is insufficient.
    if (!_spill_mode) {
        // Up to now data was written straight to the output file (no chunk
        // accumulation), so committing the current file is all that is needed;
        // subsequent writes will accumulate chunks and spill instead.
        _spill_mode = true;
        commit_file();
        return Status::OK();
    }
    // Already in spill mode: spill the accumulated chunks to local disk.
    return _spill();
}
Status SpillPartitionChunkWriter::finish() {
// Blocks until all chunk-spill tasks submitted via _chunk_spill_token have
// completed; returns OK unconditionally (spill errors are reported elsewhere,
// through the spill task's callback).
Status SpillPartitionChunkWriter::wait_flush() {
    _chunk_spill_token->wait();
    return Status::OK();
}
Status SpillPartitionChunkWriter::finish() {
// If no chunks have been spilled, flush data to remote file directly.
if (_load_chunk_spiller->empty()) {
VLOG(2) << "flush to remote directly when finish, query_id: " << print_id(_fragment_context->query_id())
@ -166,7 +184,8 @@ Status SpillPartitionChunkWriter::finish() {
_handle_err(st);
commit_file();
};
auto merge_task = std::make_shared<MergeBlockTask>(this, cb);
auto merge_task = std::make_shared<MergeBlockTask>(this, _fragment_context->runtime_state()->instance_mem_tracker(),
std::move(cb));
return _block_merge_token->submit(merge_task);
}
@ -179,9 +198,7 @@ bool SpillPartitionChunkWriter::is_finished() {
}
Status SpillPartitionChunkWriter::merge_blocks() {
RETURN_IF_ERROR(flush());
_chunk_spill_token->wait();
auto write_func = [this](Chunk* chunk) { return _flush_chunk(chunk, false); };
auto flush_func = [this]() {
// Commit file after each merge function to ensure the data written to one file is ordered,
@ -236,10 +253,13 @@ Status SpillPartitionChunkWriter::_spill() {
}
_spilling_bytes_usage.fetch_sub(chunk->bytes_usage(), std::memory_order_relaxed);
};
auto spill_task = std::make_shared<ChunkSpillTask>(_load_chunk_spiller.get(), _result_chunk, callback);
auto spill_task = std::make_shared<ChunkSpillTask>(_load_chunk_spiller.get(), _result_chunk,
_fragment_context->runtime_state()->instance_mem_tracker(),
std::move(callback));
RETURN_IF_ERROR(_chunk_spill_token->submit(spill_task));
_spilling_bytes_usage.fetch_add(_result_chunk->bytes_usage(), std::memory_order_relaxed);
_chunk_bytes_usage = 0;
_result_chunk.reset();
return Status::OK();
}
@ -327,7 +347,6 @@ Status SpillPartitionChunkWriter::_merge_chunks() {
}
}
}
chunk.reset();
}

View File

@ -63,10 +63,12 @@ public:
virtual Status init() = 0;
virtual Status write(Chunk* chunk) = 0;
virtual Status write(const ChunkPtr& chunk) = 0;
virtual Status flush() = 0;
virtual Status wait_flush() = 0;
virtual Status finish() = 0;
virtual bool is_finished() = 0;
@ -118,10 +120,12 @@ public:
Status init() override;
Status write(Chunk* chunk) override;
Status write(const ChunkPtr& chunk) override;
Status flush() override;
Status wait_flush() override;
Status finish() override;
bool is_finished() override { return true; }
@ -140,10 +144,12 @@ public:
Status init() override;
Status write(Chunk* chunk) override;
Status write(const ChunkPtr& chunk) override;
Status flush() override;
Status wait_flush() override;
Status finish() override;
bool is_finished() override;
@ -156,7 +162,12 @@ public:
_file_writer->get_written_bytes();
}
int64_t get_flushable_bytes() override { return _chunk_bytes_usage; }
int64_t get_flushable_bytes() override {
if (!_spill_mode) {
return _file_writer ? _file_writer->get_written_bytes() : 0;
}
return _chunk_bytes_usage;
}
Status merge_blocks();
@ -200,6 +211,7 @@ private:
ChunkPtr _base_chunk;
SchemaPtr _schema;
std::unordered_map<int, int> _col_index_map; // result chunk index -> chunk index
bool _spill_mode = false;
static const int64_t kWaitMilliseconds;
};

View File

@ -51,8 +51,9 @@ bool SinkOperatorMemoryManager::kill_victim() {
// The flush will decrease the writer's flushable memory bytes, so it usually
// will not be chosen again in a short time.
const auto filename = victim->out_stream()->filename();
size_t flush_bytes = victim->get_flushable_bytes();
const auto result = victim->flush();
LOG(INFO) << "kill victim: " << filename << ", result: " << result;
LOG(INFO) << "kill victim: " << filename << ", result: " << result << ", flushable_bytes: " << flush_bytes;
return true;
}
@ -118,7 +119,7 @@ bool SinkMemoryManager::_apply_on_mem_tracker(SinkOperatorMemoryManager* child_m
auto available_memory = [&]() { return mem_tracker->limit() - mem_tracker->consumption(); };
auto low_watermark = static_cast<int64_t>(mem_tracker->limit() * _low_watermark_ratio);
int64_t flush_watermark = _query_tracker->limit() * _urgent_space_ratio;
int64_t flush_watermark = mem_tracker->limit() * _urgent_space_ratio;
while (available_memory() <= low_watermark) {
child_manager->update_writer_occupied_memory();
int64_t total_occupied_memory = _total_writer_occupied_memory();

View File

@ -109,7 +109,7 @@ StatusOr<ChunkPtr> ConnectorSinkOperator::pull_chunk(RuntimeState* state) {
}
Status ConnectorSinkOperator::push_chunk(RuntimeState* state, const ChunkPtr& chunk) {
RETURN_IF_ERROR(_connector_chunk_sink->add(chunk.get()));
RETURN_IF_ERROR(_connector_chunk_sink->add(chunk));
return Status::OK();
}

View File

@ -128,8 +128,7 @@ TEST_F(IcebergChunkSinkTest, test_callback) {
auto chunk_extra_data = std::make_shared<ChunkExtraColumnsData>(extra_metas, std::move(partition_key_columns));
// Unlock during merging partition chunks into a full chunk.
chunk->set_extra_data(chunk_extra_data);
Chunk* raw_chunk_ptr = chunk.get();
auto ret = sink->add(raw_chunk_ptr);
auto ret = sink->add(chunk);
EXPECT_EQ(ret.ok(), true);
sink->callback_on_commit(CommitResult{
.io_status = Status::OK(),

View File

@ -208,7 +208,7 @@ TEST_F(PartitionChunkWriterTest, buffer_partition_chunk_writer) {
chunk->get_column_by_index(0)->append_datum(Slice("aaa"));
// Write chunk
auto ret = partition_writer->write(chunk.get());
auto ret = partition_writer->write(chunk);
EXPECT_EQ(ret.ok(), true);
EXPECT_EQ(writer_helper->written_rows(), 1);
EXPECT_EQ(writer_helper->result_rows(), 0);
@ -216,6 +216,8 @@ TEST_F(PartitionChunkWriterTest, buffer_partition_chunk_writer) {
// Flush chunk
ret = partition_writer->flush();
EXPECT_EQ(ret.ok(), true);
ret = partition_writer->wait_flush();
EXPECT_EQ(ret.ok(), true);
EXPECT_EQ(commited, true);
EXPECT_EQ(partition_writer->is_finished(), true);
EXPECT_EQ(partition_writer->get_written_bytes(), 0);
@ -272,7 +274,7 @@ TEST_F(PartitionChunkWriterTest, spill_partition_chunk_writer) {
chunk->get_column_by_index(0)->append_datum(Slice("aaa"));
// Write chunk
auto ret = partition_writer->write(chunk.get());
auto ret = partition_writer->write(chunk);
EXPECT_EQ(ret.ok(), true);
EXPECT_GT(partition_writer->get_written_bytes(), 0);
EXPECT_EQ(partition_writer->get_flushable_bytes(), chunk->bytes_usage());
@ -298,12 +300,15 @@ TEST_F(PartitionChunkWriterTest, spill_partition_chunk_writer) {
for (size_t i = 0; i < 3; ++i) {
// Write chunk
auto ret = partition_writer->write(chunk.get());
auto ret = partition_writer->write(chunk);
EXPECT_EQ(ret.ok(), true);
EXPECT_GT(partition_writer->get_written_bytes(), 0);
// Flush chunk
ret = partition_writer->_spill();
EXPECT_GT(partition_writer->get_flushable_bytes(), 0);
ret = partition_writer->flush();
EXPECT_EQ(ret.ok(), true);
ret = partition_writer->wait_flush();
EXPECT_EQ(ret.ok(), true);
Awaitility()
.timeout(3 * 1000 * 1000) // 3s
@ -389,7 +394,7 @@ TEST_F(PartitionChunkWriterTest, sort_column_asc) {
chunk->get_column_by_index(0)->append_datum(Slice("aaa" + suffix));
// Write chunk
auto ret = partition_writer->write(chunk.get());
auto ret = partition_writer->write(chunk);
EXPECT_EQ(ret.ok(), true);
EXPECT_GT(partition_writer->get_written_bytes(), 0);
}
@ -432,12 +437,14 @@ TEST_F(PartitionChunkWriterTest, sort_column_asc) {
chunk->get_column_by_index(0)->append_datum(Slice("aaa" + suffix));
// Write chunk
auto ret = partition_writer->write(chunk.get());
auto ret = partition_writer->write(chunk);
EXPECT_EQ(ret.ok(), true);
EXPECT_GT(partition_writer->get_written_bytes(), 0);
// Flush chunk
ret = partition_writer->_spill();
ret = partition_writer->flush();
EXPECT_EQ(ret.ok(), true);
ret = partition_writer->wait_flush();
EXPECT_EQ(ret.ok(), true);
Awaitility()
.timeout(3 * 1000 * 1000) // 3s
@ -536,7 +543,7 @@ TEST_F(PartitionChunkWriterTest, sort_column_desc) {
chunk->get_column_by_index(0)->append_datum(Slice("aaa" + suffix));
// Write chunk
auto ret = partition_writer->write(chunk.get());
auto ret = partition_writer->write(chunk);
EXPECT_EQ(ret.ok(), true);
EXPECT_GT(partition_writer->get_written_bytes(), 0);
}
@ -579,12 +586,14 @@ TEST_F(PartitionChunkWriterTest, sort_column_desc) {
chunk->get_column_by_index(0)->append_datum(Slice("aaa" + suffix));
// Write chunk
auto ret = partition_writer->write(chunk.get());
auto ret = partition_writer->write(chunk);
EXPECT_EQ(ret.ok(), true);
EXPECT_GT(partition_writer->get_written_bytes(), 0);
// Flush chunk
ret = partition_writer->_spill();
ret = partition_writer->flush();
EXPECT_EQ(ret.ok(), true);
ret = partition_writer->wait_flush();
EXPECT_EQ(ret.ok(), true);
Awaitility()
.timeout(3 * 1000 * 1000) // 3s
@ -693,12 +702,14 @@ TEST_F(PartitionChunkWriterTest, sort_multiple_columns) {
chunk->get_column_by_index(1)->append_datum(Slice("111" + suffix));
// Write chunk
auto ret = partition_writer->write(chunk.get());
auto ret = partition_writer->write(chunk);
EXPECT_EQ(ret.ok(), true);
EXPECT_GT(partition_writer->get_written_bytes(), 0);
// Flush chunk
ret = partition_writer->_spill();
ret = partition_writer->flush();
EXPECT_EQ(ret.ok(), true);
ret = partition_writer->wait_flush();
EXPECT_EQ(ret.ok(), true);
Awaitility()
.timeout(3 * 1000 * 1000) // 3s
@ -813,12 +824,14 @@ TEST_F(PartitionChunkWriterTest, sort_column_with_schema_chunk) {
chunk->get_column_by_index(0)->append_datum(Slice("aaa" + suffix));
// Write chunk
auto ret = partition_writer->write(chunk.get());
auto ret = partition_writer->write(chunk);
EXPECT_EQ(ret.ok(), true);
EXPECT_GT(partition_writer->get_written_bytes(), 0);
// Flush chunk
ret = partition_writer->_spill();
ret = partition_writer->flush();
EXPECT_EQ(ret.ok(), true);
ret = partition_writer->wait_flush();
EXPECT_EQ(ret.ok(), true);
Awaitility()
.timeout(3 * 1000 * 1000) // 3s

View File

@ -57,8 +57,9 @@ public:
: PartitionChunkWriter(partition, partition_field_null_list, ctx) {}
MOCK_METHOD(Status, init, (), (override));
MOCK_METHOD(Status, write, (Chunk * chunk), (override));
MOCK_METHOD(Status, write, (const starrocks::ChunkPtr&), (override));
MOCK_METHOD(Status, finish, (), (override));
MOCK_METHOD(Status, wait_flush, (), (override));
MOCK_METHOD(bool, is_finished, (), (override));
MOCK_METHOD(int64_t, get_written_bytes, (), (override));