[Enhancement] Introduce a connector partition chunk writer to support spilling chunk data for the Iceberg table sink. (backport #61963) (#62062)

Signed-off-by: GavinMar <yangguansuo@starrocks.com>
Co-authored-by: Gavin <yangguansuo@starrocks.com>
Committed by mergify[bot] via GitHub on 2025-08-19 14:41:42 +08:00
parent 3e09498f8f
commit 24d26c33ac
37 changed files with 1503 additions and 152 deletions

View File

@ -1589,6 +1589,8 @@ CONF_mBool(apply_del_vec_after_all_index_filter, "true");
CONF_mDouble(connector_sink_mem_high_watermark_ratio, "0.3");
CONF_mDouble(connector_sink_mem_low_watermark_ratio, "0.1");
CONF_mDouble(connector_sink_mem_urgent_space_ratio, "0.1");
// Whether to enable spilling intermediate data for the connector sink.
CONF_mBool(enable_connector_sink_spill, "true");
// .crm files can be removed after 1 day.
CONF_mInt32(unused_crm_file_threshold_second, "86400" /** 1 day **/);
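The new `enable_connector_sink_spill` flag above is consulted on the sink-creation path. A minimal sketch of the gating pattern, condensed from the Iceberg provider changes later in this commit (the two context variables stand in for the structs built there):

std::unique_ptr<PartitionChunkWriterFactory> factory;
if (config::enable_connector_sink_spill) {
    // Spillable writers buffer chunks and spill them to local disk under memory pressure.
    factory = std::make_unique<SpillPartitionChunkWriterFactory>(spill_ctx);
} else {
    // Buffered writers keep the pre-existing write-through behavior.
    factory = std::make_unique<BufferPartitionChunkWriterFactory>(buffer_ctx);
}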

View File

@ -31,6 +31,8 @@ add_library(Connector STATIC
utils.cpp
async_flush_stream_poller.cpp
sink_memory_manager.cpp
partition_chunk_writer.cpp
connector_sink_executor.cpp
deletion_vector/deletion_vector.cpp
deletion_vector/deletion_bitmap.cpp
)

View File

@ -16,7 +16,7 @@
namespace starrocks::connector {
void AsyncFlushStreamPoller::enqueue(std::unique_ptr<Stream> stream) {
void AsyncFlushStreamPoller::enqueue(std::shared_ptr<Stream> stream) {
auto async_status = stream->io_status();
_queue.push_back({
.stream = std::move(stream),

View File

@ -34,7 +34,7 @@ public:
virtual ~AsyncFlushStreamPoller() = default;
virtual void enqueue(std::unique_ptr<Stream> stream);
virtual void enqueue(std::shared_ptr<Stream> stream);
// return a pair of
// 1. io status
@ -45,7 +45,7 @@ public:
private:
struct StreamWithStatus {
std::unique_ptr<Stream> stream;
std::shared_ptr<Stream> stream;
std::future<Status> async_status;
};

View File

@ -24,21 +24,18 @@ namespace starrocks::connector {
ConnectorChunkSink::ConnectorChunkSink(std::vector<std::string> partition_columns,
std::vector<std::unique_ptr<ColumnEvaluator>>&& partition_column_evaluators,
std::unique_ptr<LocationProvider> location_provider,
std::unique_ptr<formats::FileWriterFactory> file_writer_factory,
int64_t max_file_size, RuntimeState* state, bool support_null_partition)
std::unique_ptr<PartitionChunkWriterFactory> partition_chunk_writer_factory,
RuntimeState* state, bool support_null_partition)
: _partition_column_names(std::move(partition_columns)),
_partition_column_evaluators(std::move(partition_column_evaluators)),
_location_provider(std::move(location_provider)),
_file_writer_factory(std::move(file_writer_factory)),
_max_file_size(max_file_size),
_partition_chunk_writer_factory(std::move(partition_chunk_writer_factory)),
_state(state),
_support_null_partition(support_null_partition) {}
Status ConnectorChunkSink::init() {
RETURN_IF_ERROR(ColumnEvaluator::init(_partition_column_evaluators));
RETURN_IF_ERROR(_file_writer_factory->init());
_op_mem_mgr->init(&_writer_stream_pairs, _io_poller,
RETURN_IF_ERROR(_partition_chunk_writer_factory->init());
_op_mem_mgr->init(&_partition_chunk_writers, _io_poller,
[this](const CommitResult& r) { this->callback_on_commit(r); });
return Status::OK();
}
@ -49,38 +46,20 @@ Status ConnectorChunkSink::write_partition_chunk(const std::string& partition,
// They are under the same dir path, but should not be in the same data file.
// We should record them in different files so that each data file can have its own meta info;
// otherwise, the scanFileTask may filter data incorrectly.
auto it = _writer_stream_pairs.find(std::make_pair(partition, partition_field_null_list));
if (it != _writer_stream_pairs.end()) {
Writer* writer = it->second.first.get();
if (writer->get_written_bytes() >= _max_file_size) {
string null_fingerprint(partition_field_null_list.size(), '0');
std::transform(partition_field_null_list.begin(), partition_field_null_list.end(), null_fingerprint.begin(),
[](int8_t b) { return b + '0'; });
callback_on_commit(writer->commit().set_extra_data(null_fingerprint));
_writer_stream_pairs.erase(it);
auto path =
!_partition_column_names.empty() ? _location_provider->get(partition) : _location_provider->get();
ASSIGN_OR_RETURN(auto new_writer_and_stream, _file_writer_factory->create(path));
std::unique_ptr<Writer> new_writer = std::move(new_writer_and_stream.writer);
std::unique_ptr<Stream> new_stream = std::move(new_writer_and_stream.stream);
RETURN_IF_ERROR(new_writer->init());
RETURN_IF_ERROR(new_writer->write(chunk));
_writer_stream_pairs[std::make_pair(partition, partition_field_null_list)] =
std::make_pair(std::move(new_writer), new_stream.get());
_io_poller->enqueue(std::move(new_stream));
} else {
RETURN_IF_ERROR(writer->write(chunk));
}
PartitionKey partition_key = std::make_pair(partition, partition_field_null_list);
auto it = _partition_chunk_writers.find(partition_key);
if (it != _partition_chunk_writers.end()) {
return it->second->write(chunk);
} else {
auto path = !_partition_column_names.empty() ? _location_provider->get(partition) : _location_provider->get();
ASSIGN_OR_RETURN(auto new_writer_and_stream, _file_writer_factory->create(path));
std::unique_ptr<Writer> new_writer = std::move(new_writer_and_stream.writer);
std::unique_ptr<Stream> new_stream = std::move(new_writer_and_stream.stream);
RETURN_IF_ERROR(new_writer->init());
RETURN_IF_ERROR(new_writer->write(chunk));
_writer_stream_pairs[std::make_pair(partition, partition_field_null_list)] =
std::make_pair(std::move(new_writer), new_stream.get());
_io_poller->enqueue(std::move(new_stream));
auto writer = _partition_chunk_writer_factory->create(partition, partition_field_null_list);
auto commit_callback = [this](const CommitResult& r) { this->callback_on_commit(r); };
auto error_handler = [this](const Status& s) { this->set_status(s); };
writer->set_commit_callback(commit_callback);
writer->set_error_handler(error_handler);
writer->set_io_poller(_io_poller);
RETURN_IF_ERROR(writer->init());
RETURN_IF_ERROR(writer->write(chunk));
_partition_chunk_writers[partition_key] = writer;
}
return Status::OK();
}
@ -100,11 +79,8 @@ Status ConnectorChunkSink::add(Chunk* chunk) {
}
Status ConnectorChunkSink::finish() {
for (auto& [partition_key, writer_and_stream] : _writer_stream_pairs) {
string extra_data(partition_key.second.size(), '0');
std::transform(partition_key.second.begin(), partition_key.second.end(), extra_data.begin(),
[](int8_t b) { return b + '0'; });
callback_on_commit(writer_and_stream.first->commit().set_extra_data(extra_data));
for (auto& [partition_key, writer] : _partition_chunk_writers) {
RETURN_IF_ERROR(writer->finish());
}
return Status::OK();
}
@ -115,4 +91,23 @@ void ConnectorChunkSink::rollback() {
}
}
void ConnectorChunkSink::set_status(const Status& status) {
std::unique_lock<std::shared_mutex> wlck(_mutex);
_status = status;
}
Status ConnectorChunkSink::status() {
std::shared_lock<std::shared_mutex> rlck(_mutex);
return _status;
}
bool ConnectorChunkSink::is_finished() {
for (auto& [partition_key, writer] : _partition_chunk_writers) {
if (!writer->is_finished()) {
return false;
}
}
return true;
}
} // namespace starrocks::connector
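For reference, the per-partition key above pairs the partition string with a per-field null list; the writer (see `PartitionChunkWriter` below) encodes that list into the commit's extra data. A worked example, assuming two partition fields where only the second is NULL:

std::vector<int8_t> nulls = {0, 1};           // second partition field is NULL
std::string fingerprint(nulls.size(), '0');
std::transform(nulls.begin(), nulls.end(), fingerprint.begin(),
               [](int8_t b) { return b + '0'; });
// fingerprint == "01"; it is attached via CommitResult::set_extra_data so files
// holding NULL partition values remain distinguishable after commit.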

View File

@ -20,8 +20,8 @@
#include "column/chunk.h"
#include "common/status.h"
#include "connector/partition_chunk_writer.h"
#include "connector/utils.h"
#include "formats/file_writer.h"
#include "fs/fs.h"
#include "runtime/runtime_state.h"
@ -30,20 +30,14 @@ namespace starrocks::connector {
class AsyncFlushStreamPoller;
class SinkOperatorMemoryManager;
using Writer = formats::FileWriter;
using Stream = io::AsyncFlushOutputStream;
using WriterStreamPair = std::pair<std::unique_ptr<Writer>, Stream*>;
using PartitionKey = std::pair<std::string, std::vector<int8_t>>;
using CommitResult = formats::FileWriter::CommitResult;
using CommitFunc = std::function<void(const CommitResult& result)>;
class ConnectorChunkSink {
public:
ConnectorChunkSink(std::vector<std::string> partition_columns,
std::vector<std::unique_ptr<ColumnEvaluator>>&& partition_column_evaluators,
std::unique_ptr<LocationProvider> location_provider,
std::unique_ptr<formats::FileWriterFactory> file_writer_factory, int64_t max_file_size,
RuntimeState* state, bool support_null_partition);
std::unique_ptr<PartitionChunkWriterFactory> partition_chunk_writer_factory, RuntimeState* state,
bool support_null_partition);
void set_io_poller(AsyncFlushStreamPoller* poller) { _io_poller = poller; }
@ -59,26 +53,33 @@ public:
void rollback();
bool is_finished();
virtual void callback_on_commit(const CommitResult& result) = 0;
Status write_partition_chunk(const std::string& partition, const vector<int8_t>& partition_field_null_list,
Chunk* chunk);
Status status();
void set_status(const Status& status);
protected:
AsyncFlushStreamPoller* _io_poller = nullptr;
SinkOperatorMemoryManager* _op_mem_mgr = nullptr;
std::vector<std::string> _partition_column_names;
std::vector<std::unique_ptr<ColumnEvaluator>> _partition_column_evaluators;
std::unique_ptr<LocationProvider> _location_provider;
std::unique_ptr<formats::FileWriterFactory> _file_writer_factory;
int64_t _max_file_size = 1024L * 1024 * 1024;
std::unique_ptr<PartitionChunkWriterFactory> _partition_chunk_writer_factory;
RuntimeState* _state = nullptr;
bool _support_null_partition{false};
std::vector<std::function<void()>> _rollback_actions;
std::map<PartitionKey, WriterStreamPair> _writer_stream_pairs;
std::map<PartitionKey, PartitionChunkWriterPtr> _partition_chunk_writers;
inline static std::string DEFAULT_PARTITION = "__DEFAULT_PARTITION__";
std::shared_mutex _mutex;
Status _status;
};
struct ConnectorChunkSinkContext {

View File

@ -0,0 +1,66 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "connector/connector_sink_executor.h"
#include "column/chunk.h"
#include "common/status.h"
#include "connector/partition_chunk_writer.h"
#include "storage/load_chunk_spiller.h"
namespace starrocks::connector {
Status ConnectorSinkSpillExecutor::init() {
return ThreadPoolBuilder(_executor_name)
.set_min_threads(0)
.set_max_threads(calc_max_thread_num())
.build(&_thread_pool);
}
int ConnectorSinkSpillExecutor::calc_max_thread_num() {
int dir_count = 0;
std::vector<starrocks::StorePath> spill_local_storage_paths;
Status st = parse_conf_store_paths(config::spill_local_storage_dir, &spill_local_storage_paths);
if (st.ok()) {
dir_count = spill_local_storage_paths.size();
}
int threads = config::lake_flush_thread_num_per_store;
if (threads == 0) {
threads = -2;
}
if (threads <= 0) {
threads = -threads;
threads *= CpuInfo::num_cores();
}
dir_count = std::max(1, dir_count);
dir_count = std::min(8, dir_count);
return dir_count * threads;
}
void ChunkSpillTask::run() {
auto res = _load_chunk_spiller->spill(*_chunk);
if (_cb) {
_cb(_chunk, res);
}
}
void MergeBlockTask::run() {
auto st = _writer->merge_blocks();
if (_cb) {
_cb(st);
}
}
} // namespace starrocks::connector
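A worked example of `calc_max_thread_num`, assuming 4 configured spill directories, `lake_flush_thread_num_per_store = 0`, and a 16-core machine:

// threads   = 0  -> falls back to -2
// threads   = -(-2) * 16 = 32   (non-positive values act as a per-core multiplier)
// dir_count = min(8, max(1, 4)) = 4
// pool max  = 4 * 32 = 128 threads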

View File

@ -0,0 +1,100 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <fmt/format.h>
#include <map>
#include "column/chunk.h"
#include "common/status.h"
#include "connector/utils.h"
#include "util/threadpool.h"
namespace starrocks {
class LoadChunkSpiller;
}
namespace starrocks::connector {
class SpillPartitionChunkWriter;
class ConnectorSinkExecutor {
public:
ConnectorSinkExecutor(const std::string& executor_name) : _executor_name(executor_name) {}
virtual ~ConnectorSinkExecutor() {}
virtual Status init() = 0;
ThreadPool* get_thread_pool() { return _thread_pool.get(); }
std::unique_ptr<ThreadPoolToken> create_token() {
return _thread_pool->new_token(ThreadPool::ExecutionMode::SERIAL);
}
Status refresh_max_thread_num() {
if (_thread_pool != nullptr) {
return _thread_pool->update_max_threads(calc_max_thread_num());
}
return Status::OK();
}
protected:
virtual int calc_max_thread_num() = 0;
protected:
std::string _executor_name;
std::unique_ptr<ThreadPool> _thread_pool;
};
class ConnectorSinkSpillExecutor : public ConnectorSinkExecutor {
public:
ConnectorSinkSpillExecutor() : ConnectorSinkExecutor("conn_sink_spill") {}
Status init() override;
protected:
int calc_max_thread_num() override;
};
class ChunkSpillTask final : public Runnable {
public:
ChunkSpillTask(LoadChunkSpiller* load_chunk_spiller, ChunkPtr chunk,
std::function<void(ChunkPtr chunk, const StatusOr<size_t>&)> cb)
: _load_chunk_spiller(load_chunk_spiller), _chunk(chunk), _cb(std::move(cb)) {}
~ChunkSpillTask() override = default;
void run() override;
private:
LoadChunkSpiller* _load_chunk_spiller;
ChunkPtr _chunk;
std::function<void(ChunkPtr, const StatusOr<size_t>&)> _cb;
};
class MergeBlockTask : public Runnable {
public:
MergeBlockTask(SpillPartitionChunkWriter* writer, std::function<void(const Status&)> cb)
: _writer(writer), _cb(std::move(cb)) {}
void run() override;
private:
SpillPartitionChunkWriter* _writer;
std::function<void(const Status&)> _cb;
};
} // namespace starrocks::connector
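`create_token()` returns a `ThreadPool::ExecutionMode::SERIAL` token, so tasks submitted through the same token run one at a time in submission order, while different tokens still execute concurrently on the shared pool. A minimal usage sketch (`spiller`, `chunk`, and `cb` are placeholders):

auto token = ExecEnv::GetInstance()->connector_sink_spill_executor()->create_token();
// Tasks submitted via this token are serialized relative to each other.
RETURN_IF_ERROR(token->submit(std::make_shared<ChunkSpillTask>(spiller, chunk, cb)));
token->wait();      // block until all submitted tasks have run
token->shutdown();  // stop accepting new tasks before destruction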

View File

@ -31,12 +31,10 @@ namespace starrocks::connector {
FileChunkSink::FileChunkSink(std::vector<std::string> partition_columns,
std::vector<std::unique_ptr<ColumnEvaluator>>&& partition_column_evaluators,
std::unique_ptr<LocationProvider> location_provider,
std::unique_ptr<formats::FileWriterFactory> file_writer_factory, int64_t max_file_size,
std::unique_ptr<PartitionChunkWriterFactory> partition_chunk_writer_factory,
RuntimeState* state)
: ConnectorChunkSink(std::move(partition_columns), std::move(partition_column_evaluators),
std::move(location_provider), std::move(file_writer_factory), max_file_size, state, true) {
}
std::move(partition_chunk_writer_factory), state, true) {}
void FileChunkSink::callback_on_commit(const CommitResult& result) {
_rollback_actions.push_back(std::move(result.rollback_action));
@ -51,25 +49,25 @@ StatusOr<std::unique_ptr<ConnectorChunkSink>> FileChunkSinkProvider::create_chun
auto runtime_state = ctx->fragment_context->runtime_state();
auto fs = FileSystem::CreateUniqueFromString(ctx->path, FSOptions(&ctx->cloud_conf)).value();
auto column_evaluators = ColumnEvaluator::clone(ctx->column_evaluators);
auto location_provider = std::make_unique<connector::LocationProvider>(
auto location_provider = std::make_shared<connector::LocationProvider>(
ctx->path, print_id(ctx->fragment_context->query_id()), runtime_state->be_number(), driver_id,
boost::to_lower_copy(ctx->format));
std::unique_ptr<formats::FileWriterFactory> file_writer_factory;
std::shared_ptr<formats::FileWriterFactory> file_writer_factory;
if (boost::iequals(ctx->format, formats::PARQUET)) {
file_writer_factory = std::make_unique<formats::ParquetFileWriterFactory>(
file_writer_factory = std::make_shared<formats::ParquetFileWriterFactory>(
std::move(fs), ctx->compression_type, ctx->options, ctx->column_names, std::move(column_evaluators),
std::nullopt, ctx->executor, runtime_state);
} else if (boost::iequals(ctx->format, formats::ORC)) {
file_writer_factory = std::make_unique<formats::ORCFileWriterFactory>(
file_writer_factory = std::make_shared<formats::ORCFileWriterFactory>(
std::move(fs), ctx->compression_type, ctx->options, ctx->column_names, std::move(column_evaluators),
ctx->executor, runtime_state);
} else if (boost::iequals(ctx->format, formats::CSV)) {
file_writer_factory = std::make_unique<formats::CSVFileWriterFactory>(
file_writer_factory = std::make_shared<formats::CSVFileWriterFactory>(
std::move(fs), ctx->compression_type, ctx->options, ctx->column_names, std::move(column_evaluators),
ctx->executor, runtime_state);
} else {
file_writer_factory = std::make_unique<formats::UnknownFileWriterFactory>(ctx->format);
file_writer_factory = std::make_shared<formats::UnknownFileWriterFactory>(ctx->format);
}
std::vector<std::string> partition_columns;
@ -78,9 +76,27 @@ StatusOr<std::unique_ptr<ConnectorChunkSink>> FileChunkSinkProvider::create_chun
partition_columns.push_back(ctx->column_names[idx]);
partition_column_evaluators.push_back(ctx->column_evaluators[idx]->clone());
}
std::unique_ptr<PartitionChunkWriterFactory> partition_chunk_writer_factory;
// Disable load spill for the file sink temporarily
if (/* config::enable_connector_sink_spill */ false) {
auto partition_chunk_writer_ctx =
std::make_shared<SpillPartitionChunkWriterContext>(SpillPartitionChunkWriterContext{
{file_writer_factory, location_provider, ctx->max_file_size, partition_columns.empty()},
ctx->fragment_context,
nullptr,
nullptr});
partition_chunk_writer_factory = std::make_unique<SpillPartitionChunkWriterFactory>(partition_chunk_writer_ctx);
} else {
auto partition_chunk_writer_ctx =
std::make_shared<BufferPartitionChunkWriterContext>(BufferPartitionChunkWriterContext{
{file_writer_factory, location_provider, ctx->max_file_size, partition_columns.empty()}});
partition_chunk_writer_factory =
std::make_unique<BufferPartitionChunkWriterFactory>(partition_chunk_writer_ctx);
}
return std::make_unique<connector::FileChunkSink>(partition_columns, std::move(partition_column_evaluators),
std::move(location_provider), std::move(file_writer_factory),
ctx->max_file_size, runtime_state);
std::move(partition_chunk_writer_factory), runtime_state);
}
} // namespace starrocks::connector

View File

@ -36,9 +36,7 @@ class FileChunkSink : public ConnectorChunkSink {
public:
FileChunkSink(std::vector<std::string> partition_columns,
std::vector<std::unique_ptr<ColumnEvaluator>>&& partition_column_evaluators,
std::unique_ptr<LocationProvider> location_provider,
std::unique_ptr<formats::FileWriterFactory> file_writer_factory, int64_t max_file_size,
RuntimeState* state);
std::unique_ptr<PartitionChunkWriterFactory> partition_chunk_writer_factory, RuntimeState* state);
~FileChunkSink() override = default;

View File

@ -29,12 +29,10 @@ namespace starrocks::connector {
HiveChunkSink::HiveChunkSink(std::vector<std::string> partition_columns,
std::vector<std::unique_ptr<ColumnEvaluator>>&& partition_column_evaluators,
std::unique_ptr<LocationProvider> location_provider,
std::unique_ptr<formats::FileWriterFactory> file_writer_factory, int64_t max_file_size,
std::unique_ptr<PartitionChunkWriterFactory> partition_chunk_writer_factory,
RuntimeState* state)
: ConnectorChunkSink(std::move(partition_columns), std::move(partition_column_evaluators),
std::move(location_provider), std::move(file_writer_factory), max_file_size, state,
false) {}
std::move(partition_chunk_writer_factory), state, false) {}
void HiveChunkSink::callback_on_commit(const CommitResult& result) {
_rollback_actions.push_back(std::move(result.rollback_action));
@ -57,34 +55,52 @@ StatusOr<std::unique_ptr<ConnectorChunkSink>> HiveChunkSinkProvider::create_chun
auto runtime_state = ctx->fragment_context->runtime_state();
auto fs = FileSystem::CreateUniqueFromString(ctx->path, FSOptions(&ctx->cloud_conf)).value(); // must succeed
auto data_column_evaluators = ColumnEvaluator::clone(ctx->data_column_evaluators);
auto location_provider = std::make_unique<connector::LocationProvider>(
auto location_provider = std::make_shared<connector::LocationProvider>(
ctx->path, print_id(ctx->fragment_context->query_id()), runtime_state->be_number(), driver_id,
boost::to_lower_copy(ctx->format));
std::unique_ptr<formats::FileWriterFactory> file_writer_factory;
std::shared_ptr<formats::FileWriterFactory> file_writer_factory;
if (boost::iequals(ctx->format, formats::PARQUET)) {
// ensure hive compatibility, since hive 3 and lower versions accept specific encodings
ctx->options[formats::ParquetWriterOptions::USE_LEGACY_DECIMAL_ENCODING] = "true";
ctx->options[formats::ParquetWriterOptions::USE_INT96_TIMESTAMP_ENCODING] = "true";
file_writer_factory = std::make_unique<formats::ParquetFileWriterFactory>(
file_writer_factory = std::make_shared<formats::ParquetFileWriterFactory>(
std::move(fs), ctx->compression_type, ctx->options, ctx->data_column_names,
std::move(data_column_evaluators), std::nullopt, ctx->executor, runtime_state);
} else if (boost::iequals(ctx->format, formats::ORC)) {
file_writer_factory = std::make_unique<formats::ORCFileWriterFactory>(
file_writer_factory = std::make_shared<formats::ORCFileWriterFactory>(
std::move(fs), ctx->compression_type, ctx->options, ctx->data_column_names,
std::move(data_column_evaluators), ctx->executor, runtime_state);
} else if (boost::iequals(ctx->format, formats::TEXTFILE)) {
file_writer_factory = std::make_unique<formats::CSVFileWriterFactory>(
file_writer_factory = std::make_shared<formats::CSVFileWriterFactory>(
std::move(fs), ctx->compression_type, ctx->options, ctx->data_column_names,
std::move(data_column_evaluators), ctx->executor, runtime_state);
} else {
file_writer_factory = std::make_unique<formats::UnknownFileWriterFactory>(ctx->format);
file_writer_factory = std::make_shared<formats::UnknownFileWriterFactory>(ctx->format);
}
std::unique_ptr<PartitionChunkWriterFactory> partition_chunk_writer_factory;
// Disable load spill for the hive sink temporarily
if (/* config::enable_connector_sink_spill */ false) {
auto partition_chunk_writer_ctx = std::make_shared<SpillPartitionChunkWriterContext>(
SpillPartitionChunkWriterContext{{file_writer_factory, location_provider, ctx->max_file_size,
ctx->partition_column_names.empty()},
ctx->fragment_context,
nullptr,
nullptr});
partition_chunk_writer_factory = std::make_unique<SpillPartitionChunkWriterFactory>(partition_chunk_writer_ctx);
} else {
auto partition_chunk_writer_ctx = std::make_shared<BufferPartitionChunkWriterContext>(
BufferPartitionChunkWriterContext{{file_writer_factory, location_provider, ctx->max_file_size,
ctx->partition_column_names.empty()}});
partition_chunk_writer_factory =
std::make_unique<BufferPartitionChunkWriterFactory>(partition_chunk_writer_ctx);
}
auto partition_column_evaluators = ColumnEvaluator::clone(ctx->partition_column_evaluators);
return std::make_unique<connector::HiveChunkSink>(
ctx->partition_column_names, std::move(partition_column_evaluators), std::move(location_provider),
std::move(file_writer_factory), ctx->max_file_size, runtime_state);
return std::make_unique<connector::HiveChunkSink>(ctx->partition_column_names,
std::move(partition_column_evaluators),
std::move(partition_chunk_writer_factory), runtime_state);
}
} // namespace starrocks::connector

View File

@ -38,9 +38,7 @@ class HiveChunkSink : public ConnectorChunkSink {
public:
HiveChunkSink(std::vector<std::string> partition_columns,
std::vector<std::unique_ptr<ColumnEvaluator>>&& partition_column_evaluators,
std::unique_ptr<LocationProvider> location_provider,
std::unique_ptr<formats::FileWriterFactory> file_writer_factory, int64_t max_file_size,
RuntimeState* state);
std::unique_ptr<PartitionChunkWriterFactory> partition_chunk_writer_factory, RuntimeState* state);
~HiveChunkSink() override = default;

View File

@ -30,11 +30,10 @@ namespace starrocks::connector {
IcebergChunkSink::IcebergChunkSink(std::vector<std::string> partition_columns, std::vector<std::string> transform_exprs,
std::vector<std::unique_ptr<ColumnEvaluator>>&& partition_column_evaluators,
std::unique_ptr<LocationProvider> location_provider,
std::unique_ptr<formats::FileWriterFactory> file_writer_factory,
int64_t max_file_size, RuntimeState* state)
std::unique_ptr<PartitionChunkWriterFactory> partition_chunk_writer_factory,
RuntimeState* state)
: ConnectorChunkSink(std::move(partition_columns), std::move(partition_column_evaluators),
std::move(location_provider), std::move(file_writer_factory), max_file_size, state, true),
std::move(partition_chunk_writer_factory), state, true),
_transform_exprs(std::move(transform_exprs)) {}
void IcebergChunkSink::callback_on_commit(const CommitResult& result) {
@ -84,25 +83,42 @@ StatusOr<std::unique_ptr<ConnectorChunkSink>> IcebergChunkSinkProvider::create_c
auto runtime_state = ctx->fragment_context->runtime_state();
auto fs = FileSystem::CreateUniqueFromString(ctx->path, FSOptions(&ctx->cloud_conf)).value();
auto column_evaluators = ColumnEvaluator::clone(ctx->column_evaluators);
auto location_provider = std::make_unique<connector::LocationProvider>(
auto location_provider = std::make_shared<connector::LocationProvider>(
ctx->path, print_id(ctx->fragment_context->query_id()), runtime_state->be_number(), driver_id,
boost::to_lower_copy(ctx->format));
std::unique_ptr<formats::FileWriterFactory> file_writer_factory;
if (boost::iequals(ctx->format, formats::PARQUET)) {
file_writer_factory = std::make_unique<formats::ParquetFileWriterFactory>(
std::move(fs), ctx->compression_type, ctx->options, ctx->column_names, std::move(column_evaluators),
ctx->parquet_field_ids, ctx->executor, runtime_state);
} else {
file_writer_factory = std::make_unique<formats::UnknownFileWriterFactory>(ctx->format);
}
std::vector<std::string>& partition_columns = ctx->partition_column_names;
std::vector<std::string>& transform_exprs = ctx->transform_exprs;
auto partition_evaluators = ColumnEvaluator::clone(ctx->partition_evaluators);
return std::make_unique<connector::IcebergChunkSink>(
partition_columns, transform_exprs, std::move(partition_evaluators), std::move(location_provider),
std::move(file_writer_factory), ctx->max_file_size, runtime_state);
std::shared_ptr<formats::FileWriterFactory> file_writer_factory;
if (boost::iequals(ctx->format, formats::PARQUET)) {
file_writer_factory = std::make_shared<formats::ParquetFileWriterFactory>(
std::move(fs), ctx->compression_type, ctx->options, ctx->column_names, std::move(column_evaluators),
ctx->parquet_field_ids, ctx->executor, runtime_state);
} else {
file_writer_factory = std::make_shared<formats::UnknownFileWriterFactory>(ctx->format);
}
std::unique_ptr<PartitionChunkWriterFactory> partition_chunk_writer_factory;
if (config::enable_connector_sink_spill) {
auto partition_chunk_writer_ctx =
std::make_shared<SpillPartitionChunkWriterContext>(SpillPartitionChunkWriterContext{
{file_writer_factory, location_provider, ctx->max_file_size, partition_columns.empty()},
ctx->fragment_context,
runtime_state->desc_tbl().get_tuple_descriptor(ctx->tuple_desc_id),
ctx->sort_ordering});
partition_chunk_writer_factory = std::make_unique<SpillPartitionChunkWriterFactory>(partition_chunk_writer_ctx);
} else {
auto partition_chunk_writer_ctx =
std::make_shared<BufferPartitionChunkWriterContext>(BufferPartitionChunkWriterContext{
{file_writer_factory, location_provider, ctx->max_file_size, partition_columns.empty()}});
partition_chunk_writer_factory =
std::make_unique<BufferPartitionChunkWriterFactory>(partition_chunk_writer_ctx);
}
return std::make_unique<connector::IcebergChunkSink>(partition_columns, transform_exprs,
std::move(partition_evaluators),
std::move(partition_chunk_writer_factory), runtime_state);
}
Status IcebergChunkSink::add(Chunk* chunk) {
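When spill is enabled, the Iceberg context additionally carries the tuple descriptor (used to rebuild a `Schema` for spilled chunks) and an optional `SortOrdering` so that merged output honors the table's sort keys. A hedged sketch of building an ordering for two ascending sort keys (assuming a `SortDescs::asc_null_first` helper with this signature):

auto ordering = std::make_shared<SortOrdering>();
ordering->sort_key_idxes = {0, 2};                    // chunk column indexes of the sort keys
ordering->sort_descs = SortDescs::asc_null_first(2);  // both ascending, NULLs first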

View File

@ -37,9 +37,7 @@ class IcebergChunkSink : public ConnectorChunkSink {
public:
IcebergChunkSink(std::vector<std::string> partition_columns, std::vector<std::string> transform_exprs,
std::vector<std::unique_ptr<ColumnEvaluator>>&& partition_column_evaluators,
std::unique_ptr<LocationProvider> location_provider,
std::unique_ptr<formats::FileWriterFactory> file_writer_factory, int64_t max_file_size,
RuntimeState* state);
std::unique_ptr<PartitionChunkWriterFactory> partition_chunk_writer_factory, RuntimeState* state);
~IcebergChunkSink() override = default;
@ -70,6 +68,8 @@ struct IcebergChunkSinkContext : public ConnectorChunkSinkContext {
PriorityThreadPool* executor = nullptr;
TCloudConfiguration cloud_conf;
pipeline::FragmentContext* fragment_context = nullptr;
int tuple_desc_id = -1;
std::shared_ptr<SortOrdering> sort_ordering;
};
class IcebergChunkSinkProvider : public ConnectorChunkSinkProvider {

View File

@ -0,0 +1,339 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "connector/partition_chunk_writer.h"
#include "column/chunk.h"
#include "common/status.h"
#include "connector/async_flush_stream_poller.h"
#include "connector/connector_sink_executor.h"
#include "connector/sink_memory_manager.h"
#include "exec/pipeline/fragment_context.h"
#include "formats/file_writer.h"
#include "runtime/runtime_state.h"
#include "storage/chunk_helper.h"
#include "storage/load_spill_block_manager.h"
#include "storage/storage_engine.h"
#include "storage/types.h"
#include "util/monotime.h"
namespace starrocks::connector {
PartitionChunkWriter::PartitionChunkWriter(std::string partition, std::vector<int8_t> partition_field_null_list,
const std::shared_ptr<PartitionChunkWriterContext>& ctx)
: _partition(std::move(partition)),
_partition_field_null_list(std::move(partition_field_null_list)),
_file_writer_factory(ctx->file_writer_factory),
_location_provider(ctx->location_provider),
_max_file_size(ctx->max_file_size),
_is_default_partition(ctx->is_default_partition) {
_commit_extra_data.resize(_partition_field_null_list.size(), '0');
std::transform(_partition_field_null_list.begin(), _partition_field_null_list.end(), _commit_extra_data.begin(),
[](int8_t b) { return b + '0'; });
}
Status PartitionChunkWriter::create_file_writer_if_needed() {
if (!_file_writer) {
std::string path = _is_default_partition ? _location_provider->get() : _location_provider->get(_partition);
ASSIGN_OR_RETURN(auto new_writer_and_stream, _file_writer_factory->create(path));
_file_writer = std::move(new_writer_and_stream.writer);
_out_stream = std::move(new_writer_and_stream.stream);
RETURN_IF_ERROR(_file_writer->init());
_io_poller->enqueue(_out_stream);
}
return Status::OK();
}
void PartitionChunkWriter::commit_file() {
if (!_file_writer) {
return;
}
auto result = _file_writer->commit();
_commit_callback(result.set_extra_data(_commit_extra_data));
_file_writer = nullptr;
VLOG(3) << "commit to remote file, filename: " << _out_stream->filename()
<< ", size: " << result.file_statistics.file_size;
}
Status BufferPartitionChunkWriter::init() {
return Status::OK();
}
Status BufferPartitionChunkWriter::write(Chunk* chunk) {
RETURN_IF_ERROR(create_file_writer_if_needed());
if (_file_writer->get_written_bytes() >= _max_file_size) {
commit_file();
}
return _file_writer->write(chunk);
}
Status BufferPartitionChunkWriter::flush() {
commit_file();
return Status::OK();
}
Status BufferPartitionChunkWriter::finish() {
commit_file();
return Status::OK();
}
SpillPartitionChunkWriter::SpillPartitionChunkWriter(std::string partition,
std::vector<int8_t> partition_field_null_list,
const std::shared_ptr<SpillPartitionChunkWriterContext>& ctx)
: PartitionChunkWriter(std::move(partition), std::move(partition_field_null_list), ctx),
_fragment_context(ctx->fragment_context),
_sort_ordering(ctx->sort_ordering) {
_chunk_spill_token = ExecEnv::GetInstance()->connector_sink_spill_executor()->create_token();
_block_merge_token = StorageEngine::instance()->load_spill_block_merge_executor()->create_token();
_tuple_desc = ctx->tuple_desc;
}
SpillPartitionChunkWriter::~SpillPartitionChunkWriter() {
if (_chunk_spill_token) {
_chunk_spill_token->shutdown();
}
if (_block_merge_token) {
_block_merge_token->shutdown();
}
}
Status SpillPartitionChunkWriter::init() {
std::string root_location =
_is_default_partition ? _location_provider->root_location() : _location_provider->root_location(_partition);
_load_spill_block_mgr = std::make_unique<LoadSpillBlockManager>(
_fragment_context->query_id(), _fragment_context->fragment_instance_id(), root_location);
RETURN_IF_ERROR(_load_spill_block_mgr->init());
_load_chunk_spiller = std::make_unique<LoadChunkSpiller>(_load_spill_block_mgr.get(),
_fragment_context->runtime_state()->runtime_profile());
return Status::OK();
}
Status SpillPartitionChunkWriter::write(Chunk* chunk) {
RETURN_IF_ERROR(create_file_writer_if_needed());
_chunks.push_back(chunk->clone_unique());
_chunk_bytes_usage += chunk->bytes_usage();
if (!_base_chunk) {
_base_chunk = _chunks.back();
}
int64_t max_flush_batch_size = _file_writer->get_flush_batch_size();
if (_sort_ordering || max_flush_batch_size == 0) {
max_flush_batch_size = _max_file_size;
}
if (_chunk_bytes_usage >= max_flush_batch_size) {
return _flush_to_file();
} else if (_mem_insufficent()) {
return _spill();
}
return Status::OK();
}
Status SpillPartitionChunkWriter::flush() {
RETURN_IF(!_file_writer, Status::OK());
return _spill();
}
Status SpillPartitionChunkWriter::finish() {
_chunk_spill_token->wait();
// If no chunks have been spilled, flush data to remote file directly.
if (_load_chunk_spiller->empty()) {
VLOG(2) << "flush to remote directly when finish, query_id: " << print_id(_fragment_context->query_id())
<< ", fragment_instance_id: " << print_id(_fragment_context->fragment_instance_id());
RETURN_IF_ERROR(_flush_to_file());
commit_file();
return Status::OK();
}
auto cb = [this](const Status& st) {
LOG_IF(ERROR, !st.ok()) << "fail to merge spill blocks, query_id: " << print_id(_fragment_context->query_id())
<< ", fragment_instance_id: " << print_id(_fragment_context->fragment_instance_id());
_handle_err(st);
commit_file();
};
auto merge_task = std::make_shared<MergeBlockTask>(this, cb);
return _block_merge_token->submit(merge_task);
}
const int64_t SpillPartitionChunkWriter::kWaitMilliseconds = 10;
bool SpillPartitionChunkWriter::is_finished() {
bool finished = _chunk_spill_token->wait_for(MonoDelta::FromMilliseconds(kWaitMilliseconds)) &&
_block_merge_token->wait_for(MonoDelta::FromMilliseconds(kWaitMilliseconds));
return finished;
}
Status SpillPartitionChunkWriter::merge_blocks() {
RETURN_IF_ERROR(flush());
_chunk_spill_token->wait();
auto write_func = [this](Chunk* chunk) { return _flush_chunk(chunk, false); };
auto flush_func = []() {
// Do nothing because we check and commit while writing each chunk.
return Status::OK();
};
Status st = _load_chunk_spiller->merge_write(_max_file_size, _sort_ordering != nullptr, false /* do_agg */,
write_func, flush_func);
VLOG(2) << "finish merge blocks, query_id: " << print_id(_fragment_context->query_id())
return st;
}
Status SpillPartitionChunkWriter::_sort() {
RETURN_IF(!_result_chunk, Status::OK());
auto chunk = _result_chunk->clone_empty_with_schema(0);
_result_chunk->swap_chunk(*chunk);
SmallPermutation perm = create_small_permutation(static_cast<uint32_t>(chunk->num_rows()));
Columns columns;
for (auto sort_key_idx : _sort_ordering->sort_key_idxes) {
columns.push_back(chunk->get_column_by_index(sort_key_idx));
}
RETURN_IF_ERROR(stable_sort_and_tie_columns(false, columns, _sort_ordering->sort_descs, &perm));
std::vector<uint32_t> selective;
permutate_to_selective(perm, &selective);
_result_chunk->rolling_append_selective(*chunk, selective.data(), 0, chunk->num_rows());
return Status::OK();
}
Status SpillPartitionChunkWriter::_spill() {
RETURN_IF(_chunks.empty(), Status::OK());
_merge_chunks();
if (_sort_ordering) {
RETURN_IF_ERROR(_sort());
}
auto callback = [this](const ChunkPtr& chunk, const StatusOr<size_t>& res) {
if (!res.ok()) {
LOG(ERROR) << "fail to spill connector partition chunk sink, write it to remote file directly. msg: "
<< res.status().message();
Status st = _flush_chunk(chunk.get(), true);
_handle_err(st);
} else {
VLOG(3) << "spill chunk data, filename: " << out_stream()->filename() << ", size: " << chunk->bytes_usage();
}
_spilling_bytes_usage.fetch_sub(chunk->bytes_usage(), std::memory_order_relaxed);
};
auto spill_task = std::make_shared<ChunkSpillTask>(_load_chunk_spiller.get(), _result_chunk, callback);
RETURN_IF_ERROR(_chunk_spill_token->submit(spill_task));
_spilling_bytes_usage.fetch_add(_result_chunk->bytes_usage(), std::memory_order_relaxed);
_chunk_bytes_usage = 0;
return Status::OK();
}
Status SpillPartitionChunkWriter::_flush_to_file() {
RETURN_IF(_chunks.empty(), Status::OK());
if (!_sort_ordering) {
for (auto& chunk : _chunks) {
RETURN_IF_ERROR(_flush_chunk(chunk.get(), false));
}
} else {
_merge_chunks();
RETURN_IF_ERROR(_sort());
RETURN_IF_ERROR(_flush_chunk(_result_chunk.get(), true));
}
_chunks.clear();
_chunk_bytes_usage = 0;
return Status::OK();
}
Status SpillPartitionChunkWriter::_flush_chunk(Chunk* chunk, bool split) {
if (chunk->get_slot_id_to_index_map().empty()) {
auto slot_map = _base_chunk->get_slot_id_to_index_map();
for (auto& it : slot_map) {
chunk->set_slot_id_to_index(it.first, it.second);
}
}
if (!split) {
return _write_chunk(chunk);
}
size_t chunk_size = config::vector_chunk_size;
for (size_t offset = 0; offset < chunk->num_rows(); offset += chunk_size) {
auto sub_chunk = chunk->clone_empty(chunk_size);
size_t num_rows = std::min(chunk_size, chunk->num_rows() - offset);
sub_chunk->append(*chunk, offset, num_rows);
RETURN_IF_ERROR(_write_chunk(sub_chunk.get()));
}
return Status::OK();
}
Status SpillPartitionChunkWriter::_write_chunk(Chunk* chunk) {
if (_file_writer->get_written_bytes() >= _max_file_size) {
commit_file();
}
RETURN_IF_ERROR(create_file_writer_if_needed());
RETURN_IF_ERROR(_file_writer->write(chunk));
return Status::OK();
}
void SpillPartitionChunkWriter::_merge_chunks() {
if (_chunks.empty()) {
return;
}
// Create a target chunk with a schema so that it can directly reuse some
// module functions of the native table.
size_t num_rows = std::accumulate(_chunks.begin(), _chunks.end(), 0,
[](int sum, const ChunkPtr& chunk) { return sum + chunk->num_rows(); });
_result_chunk = _create_schema_chunk(_chunks.front(), num_rows);
for (auto& chunk : _chunks) {
_result_chunk->append(*chunk, 0, chunk->num_rows());
chunk.reset();
}
_chunks.clear();
}
bool SpillPartitionChunkWriter::_mem_insufficent() {
// Return false because spilling is triggered by the sink memory manager.
return false;
}
void SpillPartitionChunkWriter::_handle_err(const Status& st) {
if (!st.ok()) {
_error_handler(st);
}
}
SchemaPtr SpillPartitionChunkWriter::_make_schema() {
Fields fields;
for (auto& slot : _tuple_desc->slots()) {
TypeDescriptor type_desc = slot->type();
TypeInfoPtr type_info = get_type_info(type_desc.type, type_desc.precision, type_desc.scale);
auto field = std::make_shared<Field>(slot->id(), slot->col_name(), type_info, slot->is_nullable());
fields.push_back(field);
}
SchemaPtr schema =
std::make_shared<Schema>(std::move(fields), KeysType::DUP_KEYS,
_sort_ordering ? _sort_ordering->sort_key_idxes : std::vector<uint32_t>());
return schema;
}
ChunkPtr SpillPartitionChunkWriter::_create_schema_chunk(const ChunkPtr& base_chunk, size_t num_rows) {
if (!_schema) {
auto schema = base_chunk->schema();
if (schema) {
_schema = schema;
} else {
_schema = _make_schema();
}
}
auto chunk = ChunkHelper::new_chunk(*_schema, num_rows);
return chunk;
}
} // namespace starrocks::connector
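Putting the pieces together, a spillable writer's lifecycle looks roughly like this (a summary of the flow implemented above, not new behavior):

// write(chunk): clone and buffer the chunk, then either
//   - buffered bytes >= flush batch size -> _flush_to_file() (straight to the remote file), or
//   - the sink memory manager calls flush() -> _spill() (sort if required, then spill locally).
// finish():
//   - nothing was spilled -> _flush_to_file() + commit_file();
//   - otherwise submit a MergeBlockTask -> merge_blocks(), which replays spilled
//     blocks through _flush_chunk(), rolling remote files at _max_file_size.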

View File

@ -0,0 +1,253 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <fmt/format.h>
#include <map>
#include "column/chunk.h"
#include "common/status.h"
#include "connector/utils.h"
#include "formats/file_writer.h"
#include "fs/fs.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "storage/load_chunk_spiller.h"
#include "util/threadpool.h"
namespace starrocks::connector {
using CommitResult = formats::FileWriter::CommitResult;
using CommitFunc = std::function<void(const CommitResult& result)>;
using ErrorHandleFunc = std::function<void(const Status& status)>;
class AsyncFlushStreamPoller;
struct SortOrdering {
std::vector<uint32_t> sort_key_idxes;
SortDescs sort_descs;
};
struct PartitionChunkWriterContext {
std::shared_ptr<formats::FileWriterFactory> file_writer_factory;
std::shared_ptr<LocationProvider> location_provider;
int64_t max_file_size = 0;
bool is_default_partition = false;
};
struct BufferPartitionChunkWriterContext : public PartitionChunkWriterContext {};
struct SpillPartitionChunkWriterContext : public PartitionChunkWriterContext {
pipeline::FragmentContext* fragment_context = nullptr;
TupleDescriptor* tuple_desc = nullptr;
std::shared_ptr<SortOrdering> sort_ordering;
};
class PartitionChunkWriter {
public:
PartitionChunkWriter(std::string partition, std::vector<int8_t> partition_field_null_list,
const std::shared_ptr<PartitionChunkWriterContext>& ctx);
virtual ~PartitionChunkWriter() = default;
virtual Status init() = 0;
virtual Status write(Chunk* chunk) = 0;
virtual Status flush() = 0;
virtual Status finish() = 0;
virtual bool is_finished() = 0;
virtual int64_t get_written_bytes() = 0;
virtual int64_t get_flushable_bytes() = 0;
const std::string& partition() const { return _partition; }
const std::vector<int8_t>& partition_field_null_list() const { return _partition_field_null_list; }
std::shared_ptr<formats::FileWriter> file_writer() { return _file_writer; }
std::shared_ptr<io::AsyncFlushOutputStream> out_stream() { return _out_stream; }
void set_io_poller(AsyncFlushStreamPoller* io_poller) { _io_poller = io_poller; }
void set_commit_callback(const CommitFunc& commit_callback) { _commit_callback = commit_callback; }
void set_error_handler(const ErrorHandleFunc& error_handler) { _error_handler = error_handler; }
protected:
Status create_file_writer_if_needed();
void commit_file();
protected:
std::string _partition;
std::vector<int8_t> _partition_field_null_list;
std::shared_ptr<formats::FileWriterFactory> _file_writer_factory;
std::shared_ptr<LocationProvider> _location_provider;
int64_t _max_file_size = 0;
bool _is_default_partition = false;
AsyncFlushStreamPoller* _io_poller = nullptr;
std::shared_ptr<formats::FileWriter> _file_writer;
std::shared_ptr<io::AsyncFlushOutputStream> _out_stream;
CommitFunc _commit_callback;
std::string _commit_extra_data;
ErrorHandleFunc _error_handler = nullptr;
};
class BufferPartitionChunkWriter : public PartitionChunkWriter {
public:
BufferPartitionChunkWriter(std::string partition, std::vector<int8_t> partition_field_null_list,
const std::shared_ptr<BufferPartitionChunkWriterContext>& ctx)
: PartitionChunkWriter(std::move(partition), std::move(partition_field_null_list), ctx) {}
Status init() override;
Status write(Chunk* chunk) override;
Status flush() override;
Status finish() override;
bool is_finished() override { return true; }
int64_t get_written_bytes() override { return _file_writer ? _file_writer->get_written_bytes() : 0; }
int64_t get_flushable_bytes() override { return _file_writer ? _file_writer->get_written_bytes() : 0; }
};
class SpillPartitionChunkWriter : public PartitionChunkWriter {
public:
SpillPartitionChunkWriter(std::string partition, std::vector<int8_t> partition_field_null_list,
const std::shared_ptr<SpillPartitionChunkWriterContext>& ctx);
~SpillPartitionChunkWriter();
Status init() override;
Status write(Chunk* chunk) override;
Status flush() override;
Status finish() override;
bool is_finished() override;
int64_t get_written_bytes() override {
if (!_file_writer) {
return 0;
}
return _chunk_bytes_usage + _spilling_bytes_usage.load(std::memory_order_relaxed) +
_file_writer->get_written_bytes();
}
int64_t get_flushable_bytes() override { return _chunk_bytes_usage; }
Status merge_blocks();
private:
Status _sort();
Status _spill();
Status _flush_to_file();
Status _flush_chunk(Chunk* chunk, bool split);
Status _write_chunk(Chunk* chunk);
void _merge_chunks();
SchemaPtr _make_schema();
ChunkPtr _create_schema_chunk(const ChunkPtr& base_chunk, size_t row_nums);
bool _mem_insufficent();
void _handle_err(const Status& st);
private:
pipeline::FragmentContext* _fragment_context = nullptr;
TupleDescriptor* _tuple_desc = nullptr;
std::shared_ptr<SortOrdering> _sort_ordering;
std::unique_ptr<ThreadPoolToken> _chunk_spill_token;
std::unique_ptr<ThreadPoolToken> _block_merge_token;
std::unique_ptr<LoadSpillBlockManager> _load_spill_block_mgr;
std::shared_ptr<LoadChunkSpiller> _load_chunk_spiller;
std::list<ChunkPtr> _chunks;
int64_t _chunk_bytes_usage = 0;
std::atomic<int64_t> _spilling_bytes_usage = 0;
ChunkPtr _result_chunk;
ChunkPtr _base_chunk;
SchemaPtr _schema;
static const int64_t kWaitMilliseconds;
};
using PartitionChunkWriterPtr = std::shared_ptr<PartitionChunkWriter>;
class PartitionChunkWriterFactory {
public:
virtual ~PartitionChunkWriterFactory() = default;
virtual Status init() = 0;
virtual PartitionChunkWriterPtr create(std::string partition,
std::vector<int8_t> partition_field_null_list) const = 0;
};
class BufferPartitionChunkWriterFactory : public PartitionChunkWriterFactory {
public:
BufferPartitionChunkWriterFactory(std::shared_ptr<BufferPartitionChunkWriterContext> ctx) : _ctx(ctx) {}
~BufferPartitionChunkWriterFactory() = default;
Status init() override { return _ctx->file_writer_factory->init(); }
PartitionChunkWriterPtr create(std::string partition,
std::vector<int8_t> partition_field_null_list) const override {
return std::make_shared<BufferPartitionChunkWriter>(std::move(partition), std::move(partition_field_null_list),
_ctx);
}
private:
std::shared_ptr<BufferPartitionChunkWriterContext> _ctx;
};
class SpillPartitionChunkWriterFactory : public PartitionChunkWriterFactory {
public:
SpillPartitionChunkWriterFactory(std::shared_ptr<SpillPartitionChunkWriterContext> ctx) : _ctx(ctx) {}
~SpillPartitionChunkWriterFactory() = default;
Status init() override { return _ctx->file_writer_factory->init(); }
PartitionChunkWriterPtr create(std::string partition,
std::vector<int8_t> partition_field_null_list) const override {
return std::make_shared<SpillPartitionChunkWriter>(std::move(partition), std::move(partition_field_null_list),
_ctx);
}
private:
std::shared_ptr<SpillPartitionChunkWriterContext> _ctx;
};
} // namespace starrocks::connector
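A minimal sketch of wiring a writer through the factory, mirroring how `ConnectorChunkSink::write_partition_chunk` uses it (`io_poller` and `chunk` are placeholders, and the snippet assumes a surrounding function that returns `Status`):

auto ctx = std::make_shared<BufferPartitionChunkWriterContext>();
ctx->file_writer_factory = file_writer_factory;  // std::shared_ptr<formats::FileWriterFactory>
ctx->location_provider = location_provider;      // std::shared_ptr<LocationProvider>
ctx->max_file_size = 1024L * 1024 * 1024;        // roll files at ~1 GB
auto factory = std::make_unique<BufferPartitionChunkWriterFactory>(ctx);
RETURN_IF_ERROR(factory->init());

auto writer = factory->create("dt=2024-01-01", /*partition_field_null_list=*/{0});
writer->set_io_poller(io_poller);
writer->set_commit_callback([](const CommitResult& r) { /* record the committed file */ });
writer->set_error_handler([](const Status& s) { /* propagate the failure */ });
RETURN_IF_ERROR(writer->init());
RETURN_IF_ERROR(writer->write(chunk));
RETURN_IF_ERROR(writer->finish());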

View File

@ -18,9 +18,9 @@
namespace starrocks::connector {
void SinkOperatorMemoryManager::init(std::map<PartitionKey, WriterStreamPair>* writer_stream_pairs,
void SinkOperatorMemoryManager::init(std::map<PartitionKey, PartitionChunkWriterPtr>* partition_chunk_writers,
AsyncFlushStreamPoller* io_poller, CommitFunc commit_func) {
_candidates = writer_stream_pairs;
_candidates = partition_chunk_writers;
_commit_func = std::move(commit_func);
_io_poller = io_poller;
}
@ -30,24 +30,24 @@ bool SinkOperatorMemoryManager::kill_victim() {
return false;
}
// find file writer with the largest file size
PartitionKey partition;
WriterStreamPair* victim = nullptr;
for (auto& [key, writer_and_stream] : *_candidates) {
if (victim && victim->first->get_written_bytes() > writer_and_stream.first->get_written_bytes()) {
// Find a target file writer to flush.
// For a buffered partition writer, choose the writer with the largest file size.
// For a spillable partition writer, choose the writer with the largest memory size that can be spilled.
PartitionChunkWriterPtr victim = nullptr;
for (auto& [key, writer] : *_candidates) {
if (victim && victim->get_flushable_bytes() > writer->get_flushable_bytes()) {
continue;
}
partition = key;
victim = &writer_and_stream;
victim = writer;
}
if (victim == nullptr) {
return false;
}
auto result = victim->first->commit();
_commit_func(result);
LOG(INFO) << "kill victim: " << victim->second->filename() << " size: " << result.file_statistics.file_size;
_candidates->erase(partition);
// The flush decreases the writer's flushable memory bytes, so it usually
// will not be chosen again in a short time.
auto result = victim->flush();
LOG(INFO) << "kill victim: " << victim->out_stream()->filename() << ", result: " << result;
return true;
}
@ -59,8 +59,8 @@ int64_t SinkOperatorMemoryManager::update_releasable_memory() {
int64_t SinkOperatorMemoryManager::update_writer_occupied_memory() {
int64_t writer_occupied_memory = 0;
for (auto& [_, writer_and_stream] : *_candidates) {
writer_occupied_memory += writer_and_stream.first->get_written_bytes();
for (auto& [_, writer] : *_candidates) {
writer_occupied_memory += writer->get_written_bytes();
}
_writer_occupied_memory.store(writer_occupied_memory);
return _writer_occupied_memory;
@ -113,7 +113,6 @@ bool SinkMemoryManager::_apply_on_mem_tracker(SinkOperatorMemoryManager* child_m
auto available_memory = [&]() { return mem_tracker->limit() - mem_tracker->consumption(); };
auto low_watermark = static_cast<int64_t>(mem_tracker->limit() * _low_watermark_ratio);
auto high_watermark = static_cast<int64_t>(mem_tracker->limit() * _high_watermark_ratio);
auto exceed_urgent_space = [&]() {
return _total_writer_occupied_memory() > _query_tracker->limit() * _urgent_space_ratio;
};
@ -125,7 +124,7 @@ bool SinkMemoryManager::_apply_on_mem_tracker(SinkOperatorMemoryManager* child_m
<< " releasable_memory: " << _total_releasable_memory()
<< " writer_allocated_memory: " << _total_writer_occupied_memory();
// trigger early close
while (exceed_urgent_space() && available_memory() + _total_releasable_memory() < high_watermark) {
while (exceed_urgent_space() && available_memory() <= low_watermark) {
bool found = child_manager->kill_victim();
if (!found) {
break;
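With the default ratios from the config hunk at the top of this commit (high 0.3, low 0.1, urgent 0.1) and a hypothetical 10 GB query memory limit, the early-close loop behaves as follows:

// low_watermark = 10 GB * 0.1 = 1 GB; urgent space = 10 GB * 0.1 = 1 GB
// kill_victim() keeps flushing the writer with the most flushable bytes
// while writer-occupied memory > 1 GB AND available memory <= 1 GB,
// stopping early once no flushable victim remains.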

View File

@ -28,8 +28,8 @@ class SinkOperatorMemoryManager {
public:
SinkOperatorMemoryManager() = default;
void init(std::map<PartitionKey, WriterStreamPair>* writer_stream_pairs, AsyncFlushStreamPoller* io_poller,
CommitFunc commit_func);
void init(std::map<PartitionKey, PartitionChunkWriterPtr>* partition_chunk_writers,
AsyncFlushStreamPoller* io_poller, CommitFunc commit_func);
// return true if a victim is found and killed, otherwise return false
bool kill_victim();
@ -45,7 +45,7 @@ public:
int64_t writer_occupied_memory() { return _writer_occupied_memory.load(); }
private:
std::map<PartitionKey, WriterStreamPair>* _candidates = nullptr; // reference, owned by sink operator
std::map<PartitionKey, PartitionChunkWriterPtr>* _candidates = nullptr; // reference, owned by sink operator
CommitFunc _commit_func;
AsyncFlushStreamPoller* _io_poller;
std::atomic_int64_t _releasable_memory{0};

View File

@ -104,6 +104,12 @@ public:
// location = base_path/{query_id}_{be_number}_{driver_id}_index.file_suffix
std::string get() { return fmt::format("{}/{}_{}.{}", _base_path, _file_name_prefix, _index++, _file_name_suffix); }
std::string root_location(const std::string& partition) {
return fmt::format("{}/{}", _base_path, PathUtils::remove_trailing_slash(partition));
}
std::string root_location() { return fmt::format("{}", PathUtils::remove_trailing_slash(_base_path)); }
private:
const std::string _base_path;
const std::string _file_name_prefix;

View File

@ -60,6 +60,9 @@ bool ConnectorSinkOperator::need_input() const {
}
auto [status, _] = _io_poller->poll();
if (status.ok()) {
status = _connector_chunk_sink->status();
}
if (!status.ok()) {
LOG(WARNING) << "cancel fragment: " << status;
_fragment_context->cancel(status);
@ -74,12 +77,16 @@ bool ConnectorSinkOperator::is_finished() const {
}
auto [status, finished] = _io_poller->poll();
if (status.ok()) {
status = _connector_chunk_sink->status();
}
if (!status.ok()) {
LOG(WARNING) << "cancel fragment: " << status;
_fragment_context->cancel(status);
}
return finished;
bool ret = finished && _connector_chunk_sink->is_finished();
return ret;
}
Status ConnectorSinkOperator::set_finishing(RuntimeState* state) {

View File

@ -67,6 +67,10 @@ int64_t CSVFileWriter::get_allocated_bytes() {
return _output_stream->buffer_size();
}
int64_t CSVFileWriter::get_flush_batch_size() {
return 0;
}
Status CSVFileWriter::write(Chunk* chunk) {
_num_rows += chunk->num_rows();

View File

@ -46,6 +46,8 @@ public:
int64_t get_allocated_bytes() override;
int64_t get_flush_batch_size() override;
Status write(Chunk* chunk) override;
CommitResult commit() override;

View File

@ -60,6 +60,7 @@ public:
virtual Status init() = 0;
virtual int64_t get_written_bytes() = 0;
virtual int64_t get_allocated_bytes() = 0;
virtual int64_t get_flush_batch_size() = 0;
virtual Status write(Chunk* chunk) = 0;
virtual CommitResult commit() = 0;
};

View File

@ -143,6 +143,10 @@ int64_t ORCFileWriter::get_allocated_bytes() {
return _memory_pool.bytes_allocated();
}
int64_t ORCFileWriter::get_flush_batch_size() {
return 0;
}
Status ORCFileWriter::write(Chunk* chunk) {
ASSIGN_OR_RETURN(auto cvb, _convert(chunk));
_writer->add(*cvb);

View File

@ -83,6 +83,8 @@ public:
int64_t get_allocated_bytes() override;
int64_t get_flush_batch_size() override;
Status write(Chunk* chunk) override;
CommitResult commit() override;

View File

@ -96,6 +96,10 @@ int64_t ParquetFileWriter::get_allocated_bytes() {
return _memory_pool.bytes_allocated();
}
int64_t ParquetFileWriter::get_flush_batch_size() {
return _writer_options->rowgroup_size;
}
Status ParquetFileWriter::_flush_row_group() {
DCHECK(_rowgroup_writer != nullptr);
try {
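`get_flush_batch_size()` tells the spillable writer how much data is worth accumulating before a direct flush: Parquet reports its row-group size so flushes align with row-group boundaries, while CSV and ORC report 0, which `SpillPartitionChunkWriter::write` replaces with `_max_file_size`. A worked example, assuming a 128 MB row-group setting:

// Parquet: max_flush_batch_size = rowgroup_size (e.g. 128 MB)
//   -> buffered chunks flush to the remote file once ~128 MB accumulates.
// CSV/ORC: max_flush_batch_size = 0 -> write() substitutes _max_file_size,
//   so data is buffered (or spilled) until a whole file's worth is ready.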

View File

@ -118,6 +118,8 @@ public:
int64_t get_allocated_bytes() override;
int64_t get_flush_batch_size() override;
Status write(Chunk* chunk) override;
CommitResult commit() override;

View File

@ -44,6 +44,7 @@
#include "common/configbase.h"
#include "common/logging.h"
#include "common/process_exit.h"
#include "connector/connector_sink_executor.h"
#include "exec/pipeline/driver_limiter.h"
#include "exec/pipeline/pipeline_driver_executor.h"
#include "exec/pipeline/query_context.h"
@ -517,6 +518,9 @@ Status ExecEnv::init(const std::vector<StorePath>& store_paths, bool as_cn) {
_routine_load_task_executor = new RoutineLoadTaskExecutor(this);
RETURN_IF_ERROR(_routine_load_task_executor->init());
_connector_sink_spill_executor = new connector::ConnectorSinkSpillExecutor();
RETURN_IF_ERROR(_connector_sink_spill_executor->init());
_small_file_mgr = new SmallFileMgr(this, config::small_file_dir);
_runtime_filter_worker = new RuntimeFilterWorker(this);
_runtime_filter_cache = new RuntimeFilterCache(8);
@ -732,6 +736,7 @@ void ExecEnv::destroy() {
SAFE_DELETE(_stream_context_mgr);
SAFE_DELETE(_routine_load_task_executor);
SAFE_DELETE(_stream_load_executor);
SAFE_DELETE(_connector_sink_spill_executor);
SAFE_DELETE(_fragment_mgr);
SAFE_DELETE(_load_stream_mgr);
SAFE_DELETE(_load_channel_mgr);

View File

@ -107,6 +107,10 @@ namespace spill {
class DirManager;
}
namespace connector {
class ConnectorSinkSpillExecutor;
}
class GlobalEnv {
public:
static GlobalEnv* GetInstance() {
@ -311,6 +315,8 @@ public:
RoutineLoadTaskExecutor* routine_load_task_executor() { return _routine_load_task_executor; }
HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; }
connector::ConnectorSinkSpillExecutor* connector_sink_spill_executor() { return _connector_sink_spill_executor; }
ThreadPool* automatic_partition_pool() { return _automatic_partition_pool.get(); }
RuntimeFilterWorker* runtime_filter_worker() { return _runtime_filter_worker; }
@ -408,6 +414,8 @@ private:
SmallFileMgr* _small_file_mgr = nullptr;
HeartbeatFlags* _heartbeat_flags = nullptr;
connector::ConnectorSinkSpillExecutor* _connector_sink_spill_executor = nullptr;
std::unique_ptr<ThreadPool> _automatic_partition_pool;
RuntimeFilterWorker* _runtime_filter_worker = nullptr;

View File

@ -80,6 +80,7 @@ Status IcebergTableSink::decompose_to_pipeline(pipeline::OpFactories prev_operat
sink_ctx->column_evaluators = ColumnExprEvaluator::from_exprs(this->get_output_expr(), runtime_state);
sink_ctx->transform_exprs = iceberg_table_desc->get_transform_exprs();
sink_ctx->fragment_context = fragment_ctx;
sink_ctx->tuple_desc_id = t_iceberg_sink.tuple_id;
auto connector = connector::ConnectorManager::default_instance()->get(connector::Connector::ICEBERG);
auto sink_provider = connector->create_data_sink_provider();

View File

@ -33,6 +33,7 @@ set(EXEC_FILES
./connector_sink/hive_chunk_sink_test.cpp
./connector_sink/iceberg_chunk_sink_test.cpp
./connector_sink/file_chunk_sink_test.cpp
./connector_sink/partition_chunk_writer_test.cpp
./connector_sink/async_flush_output_stream_test.cpp
./fs/azure/azblob_uri_test.cpp
./fs/azure/fs_azblob_test.cpp
@ -604,4 +605,4 @@ ADD_BE_TEST(storage/lake/compaction_task_test)
# ADD_BE_TEST(exec/sorting_test)
# Standalone executable for builtin_functions_fuzzy_test
ADD_BE_TEST(fuzzy/builtin_functions_fuzzy_test)

View File

@ -64,11 +64,14 @@ TEST_F(FileChunkSinkTest, test_callback) {
std::vector<std::string> partition_column_names = {"k1"};
std::vector<std::unique_ptr<ColumnEvaluator>> partition_column_evaluators =
ColumnSlotIdEvaluator::from_types({TypeDescriptor::from_logical_type(TYPE_VARCHAR)});
- auto mock_writer_factory = std::make_unique<MockFileWriterFactory>();
- auto location_provider = std::make_unique<LocationProvider>("base_path", "ffffff", 0, 0, "parquet");
+ auto mock_writer_factory = std::make_shared<MockFileWriterFactory>();
+ auto location_provider = std::make_shared<LocationProvider>("base_path", "ffffff", 0, 0, "parquet");
+ auto partition_chunk_writer_ctx = std::make_shared<BufferPartitionChunkWriterContext>(
+         BufferPartitionChunkWriterContext{mock_writer_factory, location_provider, 100, false});
+ auto partition_chunk_writer_factory =
+         std::make_unique<BufferPartitionChunkWriterFactory>(partition_chunk_writer_ctx);
auto sink = std::make_unique<FileChunkSink>(partition_column_names, std::move(partition_column_evaluators),
-                                            std::move(location_provider), std::move(mock_writer_factory), 100,
-                                            _runtime_state);
+                                            std::move(partition_chunk_writer_factory), _runtime_state);
sink->callback_on_commit(CommitResult{
.io_status = Status::OK(),
.format = formats::PARQUET,

View File

@ -64,11 +64,14 @@ TEST_F(HiveChunkSinkTest, test_callback) {
std::vector<std::string> partition_column_names = {"k1"};
std::vector<std::unique_ptr<ColumnEvaluator>> partition_column_evaluators =
ColumnSlotIdEvaluator::from_types({TypeDescriptor::from_logical_type(TYPE_VARCHAR)});
- auto mock_writer_factory = std::make_unique<MockFileWriterFactory>();
- auto location_provider = std::make_unique<LocationProvider>("base_path", "ffffff", 0, 0, "parquet");
+ auto mock_writer_factory = std::make_shared<MockFileWriterFactory>();
+ auto location_provider = std::make_shared<LocationProvider>("base_path", "ffffff", 0, 0, "parquet");
+ auto partition_chunk_writer_ctx = std::make_shared<BufferPartitionChunkWriterContext>(
+         BufferPartitionChunkWriterContext{mock_writer_factory, location_provider, 100, false});
+ auto partition_chunk_writer_factory =
+         std::make_unique<BufferPartitionChunkWriterFactory>(partition_chunk_writer_ctx);
auto sink = std::make_unique<HiveChunkSink>(partition_column_names, std::move(partition_column_evaluators),
-                                            std::move(location_provider), std::move(mock_writer_factory), 100,
-                                            _runtime_state);
+                                            std::move(partition_chunk_writer_factory), _runtime_state);
sink->callback_on_commit(CommitResult{
.io_status = Status::OK(),
.format = formats::PARQUET,

View File

@ -27,6 +27,7 @@
#include "formats/file_writer.h"
#include "formats/utils.h"
#include "testutil/assert.h"
#include "testutil/scoped_updater.h"
#include "util/defer_op.h"
#include "util/integer_util.h"
@ -35,6 +36,7 @@ namespace {
using CommitResult = formats::FileWriter::CommitResult;
using WriterAndStream = formats::WriterAndStream;
using Stream = io::AsyncFlushOutputStream;
using ::testing::Return;
using ::testing::ByMove;
using ::testing::_;
@ -65,6 +67,7 @@ public:
MOCK_METHOD(Status, init, (), (override));
MOCK_METHOD(int64_t, get_written_bytes, (), (override));
MOCK_METHOD(int64_t, get_allocated_bytes, (), (override));
MOCK_METHOD(int64_t, get_flush_batch_size, (), (override));
MOCK_METHOD(Status, write, (Chunk * chunk), (override));
MOCK_METHOD(CommitResult, commit, (), (override));
};
@ -83,7 +86,7 @@ public:
class MockPoller : public AsyncFlushStreamPoller {
public:
- MOCK_METHOD(void, enqueue, (std::unique_ptr<Stream> stream), (override));
+ MOCK_METHOD(void, enqueue, (std::shared_ptr<Stream> stream), (override));
};
TEST_F(IcebergChunkSinkTest, test_callback) {
@ -92,18 +95,24 @@ TEST_F(IcebergChunkSinkTest, test_callback) {
std::vector<std::string> transform = {"identity"};
std::vector<std::unique_ptr<ColumnEvaluator>> partition_column_evaluators =
ColumnSlotIdEvaluator::from_types({TypeDescriptor::from_logical_type(TYPE_VARCHAR)});
- auto mock_writer_factory = std::make_unique<MockFileWriterFactory>();
- auto location_provider = std::make_unique<LocationProvider>("base_path", "ffffff", 0, 0, "parquet");
+ auto mock_writer_factory = std::make_shared<MockFileWriterFactory>();
+ auto location_provider = std::make_shared<LocationProvider>("base_path", "ffffff", 0, 0, "parquet");
WriterAndStream ws;
ws.writer = std::make_unique<MockWriter>();
- ws.stream = std::make_unique<io::AsyncFlushOutputStream>(std::make_unique<MockFile>(), nullptr, nullptr);
+ ws.stream = std::make_unique<Stream>(std::make_unique<MockFile>(), nullptr, nullptr);
EXPECT_CALL(*mock_writer_factory, create(::testing::_))
.WillRepeatedly(::testing::Return(ByMove(StatusOr<WriterAndStream>(std::move(ws)))));
- auto sink = std::make_unique<IcebergChunkSink>(
-         partition_column_names, transform, std::move(partition_column_evaluators), std::move(location_provider),
-         std::move(mock_writer_factory), 100, _runtime_state);
+ auto partition_chunk_writer_ctx = std::make_shared<BufferPartitionChunkWriterContext>(
+         BufferPartitionChunkWriterContext{mock_writer_factory, location_provider, 100, false});
+ auto partition_chunk_writer_factory =
+         std::make_unique<BufferPartitionChunkWriterFactory>(partition_chunk_writer_ctx);
+ auto sink = std::make_unique<connector::IcebergChunkSink>(
+         partition_column_names, transform, std::move(partition_column_evaluators),
+         std::move(partition_chunk_writer_factory), _runtime_state);
auto poller = MockPoller();
sink->set_io_poller(&poller);
Columns partition_key_columns;
ChunkPtr chunk = std::make_shared<Chunk>();
std::vector<ChunkExtraColumnsMeta> extra_metas;
@ -132,12 +141,16 @@ TEST_F(IcebergChunkSinkTest, test_callback) {
.location = "path/to/directory/data.parquet",
}
.set_extra_data("0"));
sink->set_status(Status::OK());
EXPECT_EQ(sink->is_finished(), true);
EXPECT_EQ(sink->status().ok(), true);
EXPECT_EQ(_runtime_state->num_rows_load_sink(), 100);
}
}
TEST_F(IcebergChunkSinkTest, test_factory) {
SCOPED_UPDATE(bool, config::enable_connector_sink_spill, false);
IcebergChunkSinkProvider provider;
{

View File

@ -0,0 +1,483 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "connector/partition_chunk_writer.h"
#include <gmock/gmock.h>
#include <gtest/gtest-param-test.h>
#include <gtest/gtest.h>
#include <future>
#include <thread>
#include "connector/connector_chunk_sink.h"
#include "connector/iceberg_chunk_sink.h"
#include "connector/sink_memory_manager.h"
#include "exec/pipeline/fragment_context.h"
#include "formats/file_writer.h"
#include "formats/parquet/parquet_test_util/util.h"
#include "formats/utils.h"
#include "testutil/assert.h"
#include "util/await.h"
#include "util/defer_op.h"
#include "util/integer_util.h"
namespace starrocks::connector {
namespace {
using CommitResult = formats::FileWriter::CommitResult;
using WriterAndStream = formats::WriterAndStream;
using Stream = io::AsyncFlushOutputStream;
using ::testing::Return;
using ::testing::ByMove;
using ::testing::_;
class PartitionChunkWriterTest : public ::testing::Test {
protected:
void SetUp() override {
_fragment_context = std::make_shared<pipeline::FragmentContext>();
_fragment_context->set_runtime_state(std::make_shared<RuntimeState>());
_runtime_state = _fragment_context->runtime_state();
}
void TearDown() override {}
ObjectPool _pool;
std::shared_ptr<pipeline::FragmentContext> _fragment_context;
RuntimeState* _runtime_state;
};
class MockFileWriterFactory : public formats::FileWriterFactory {
public:
MOCK_METHOD(Status, init, (), (override));
MOCK_METHOD(StatusOr<WriterAndStream>, create, (const std::string&), (const override));
};
class WriterHelper {
public:
static WriterHelper* instance() {
static WriterHelper helper;
return &helper;
}
Status write(Chunk* chunk) {
if (!_tmp_chunk) {
_tmp_chunk = chunk->clone_empty();
}
_tmp_chunk->append(*chunk, 0, chunk->num_rows());
return Status::OK();
}
int64_t commit() {
if (!_tmp_chunk) {
return 0;
}
ChunkPtr result_chunk = _tmp_chunk->clone_empty(_tmp_chunk->num_rows());
result_chunk->append(*_tmp_chunk, 0, _tmp_chunk->num_rows());
_result_chunks.push_back(result_chunk);
size_t num_rows = result_chunk->num_rows();
_tmp_chunk.reset();
return num_rows;
}
void reset() {
if (_tmp_chunk) {
_tmp_chunk.reset();
}
_result_chunks.clear();
}
int64_t written_bytes() { return _tmp_chunk ? _tmp_chunk->bytes_usage() : 0; }
int64_t written_rows() { return _tmp_chunk != nullptr ? _tmp_chunk->num_rows() : 0; }
int64_t result_rows() {
int64_t num_rows = 0;
for (auto& chunk : _result_chunks) {
num_rows += chunk->num_rows();
}
return num_rows;
}
std::vector<ChunkPtr>& result_chunks() { return _result_chunks; }
private:
WriterHelper() {}
ChunkPtr _tmp_chunk = nullptr;
std::vector<ChunkPtr> _result_chunks;
};
class MockWriter : public formats::FileWriter {
public:
MockWriter() {}
MOCK_METHOD(Status, init, (), (override));
MOCK_METHOD(int64_t, get_allocated_bytes, (), (override));
int64_t get_written_bytes() override { return WriterHelper::instance()->written_bytes(); }
Status write(Chunk* chunk) override { return WriterHelper::instance()->write(chunk); }
int64_t get_flush_batch_size() override { return _flush_batch_size; }
void set_flush_batch_size(int64_t flush_batch_size) { _flush_batch_size = flush_batch_size; }
CommitResult commit() override {
size_t num_rows = WriterHelper::instance()->commit();
CommitResult commit_result = {
.io_status = Status::OK(),
.format = formats::PARQUET,
.file_statistics =
{
.record_count = num_rows,
},
.location = "path/to/directory/data.parquet",
};
return commit_result;
}
private:
int64_t _flush_batch_size = 128L * 1024 * 1024; // 128MB
};
class MockFile : public WritableFile {
public:
MOCK_METHOD(Status, append, (const Slice& data), (override));
MOCK_METHOD(Status, appendv, (const Slice* data, size_t cnt), (override));
MOCK_METHOD(Status, pre_allocate, (uint64_t size), (override));
MOCK_METHOD(Status, close, (), (override));
MOCK_METHOD(Status, flush, (FlushMode mode), (override));
MOCK_METHOD(Status, sync, (), (override));
MOCK_METHOD(uint64_t, size, (), (const, override));
MOCK_METHOD(const std::string&, filename, (), (const, override));
};
class MockPoller : public AsyncFlushStreamPoller {
public:
MOCK_METHOD(void, enqueue, (std::shared_ptr<Stream> stream), (override));
};
TEST_F(PartitionChunkWriterTest, buffer_partition_chunk_writer) {
std::string fs_base_path = "base_path";
std::filesystem::create_directories(fs_base_path + "/c1");
parquet::Utils::SlotDesc slot_descs[] = {{"c1", TYPE_VARCHAR_DESC}, {""}};
TupleDescriptor* tuple_desc =
parquet::Utils::create_tuple_descriptor(_fragment_context->runtime_state(), &_pool, slot_descs);
auto writer_helper = WriterHelper::instance();
{
writer_helper->reset();
// Create partition writer
auto mock_writer_factory = std::make_shared<MockFileWriterFactory>();
auto location_provider = std::make_shared<LocationProvider>(fs_base_path, "ffffff", 0, 0, "parquet");
EXPECT_CALL(*mock_writer_factory, create(::testing::_)).WillRepeatedly([](const std::string&) {
WriterAndStream ws;
ws.writer = std::make_unique<MockWriter>();
ws.stream = std::make_unique<Stream>(std::make_unique<MockFile>(), nullptr, nullptr);
return ws;
});
auto partition_chunk_writer_ctx = std::make_shared<BufferPartitionChunkWriterContext>(
BufferPartitionChunkWriterContext{mock_writer_factory, location_provider, 100, false});
auto partition_chunk_writer_factory =
std::make_unique<BufferPartitionChunkWriterFactory>(partition_chunk_writer_ctx);
std::vector<int8_t> partition_field_null_list;
auto partition_writer = partition_chunk_writer_factory->create("c1", partition_field_null_list);
bool committed = false;
auto commit_callback = [&committed](const CommitResult& r) { committed = true; };
auto poller = MockPoller();
partition_writer->set_io_poller(&poller);
partition_writer->set_commit_callback(commit_callback);
EXPECT_OK(partition_writer->init());
// Create a chunk
ChunkPtr chunk = ChunkHelper::new_chunk(*tuple_desc, 1);
chunk->get_column_by_index(0)->append_datum(Slice("aaa"));
// Write chunk
auto ret = partition_writer->write(chunk.get());
EXPECT_EQ(ret.ok(), true);
EXPECT_EQ(writer_helper->written_rows(), 1);
EXPECT_EQ(writer_helper->result_rows(), 0);
// Flush chunk
ret = partition_writer->flush();
EXPECT_EQ(ret.ok(), true);
EXPECT_EQ(committed, true);
EXPECT_EQ(partition_writer->is_finished(), true);
EXPECT_EQ(partition_writer->get_written_bytes(), 0);
EXPECT_EQ(partition_writer->get_flushable_bytes(), 0);
EXPECT_EQ(writer_helper->written_rows(), 0);
EXPECT_EQ(writer_helper->result_rows(), 1);
}
std::filesystem::remove_all(fs_base_path);
}
TEST_F(PartitionChunkWriterTest, spill_partition_chunk_writer) {
std::string fs_base_path = "base_path";
std::filesystem::create_directories(fs_base_path + "/c1");
auto writer_helper = WriterHelper::instance();
bool committed = false;
Status status;
parquet::Utils::SlotDesc slot_descs[] = {{"c1", TYPE_VARCHAR_DESC}, {""}};
TupleDescriptor* tuple_desc =
parquet::Utils::create_tuple_descriptor(_fragment_context->runtime_state(), &_pool, slot_descs);
// Create partition writer
auto mock_writer_factory = std::make_shared<MockFileWriterFactory>();
auto location_provider = std::make_shared<LocationProvider>(fs_base_path, "ffffff", 0, 0, "parquet");
EXPECT_CALL(*mock_writer_factory, create(::testing::_)).WillRepeatedly([](const std::string&) {
WriterAndStream ws;
ws.writer = std::make_unique<MockWriter>();
ws.stream = std::make_unique<Stream>(std::make_unique<MockFile>(), nullptr, nullptr);
return ws;
});
auto partition_chunk_writer_ctx =
std::make_shared<SpillPartitionChunkWriterContext>(SpillPartitionChunkWriterContext{
mock_writer_factory, location_provider, 100, false, _fragment_context.get(), tuple_desc, nullptr});
auto partition_chunk_writer_factory =
std::make_unique<SpillPartitionChunkWriterFactory>(partition_chunk_writer_ctx);
std::vector<int8_t> partition_field_null_list;
auto partition_writer = std::dynamic_pointer_cast<SpillPartitionChunkWriter>(
partition_chunk_writer_factory->create("c1", partition_field_null_list));
auto commit_callback = [&committed](const CommitResult& r) { committed = true; };
auto error_handler = [&status](const Status& s) { status = s; };
auto poller = MockPoller();
partition_writer->set_io_poller(&poller);
partition_writer->set_commit_callback(commit_callback);
partition_writer->set_error_handler(error_handler);
EXPECT_OK(partition_writer->init());
// Normal write and flush to file
{
writer_helper->reset();
// Create a chunk
ChunkPtr chunk = ChunkHelper::new_chunk(*tuple_desc, 1);
chunk->get_column_by_index(0)->append_datum(Slice("aaa"));
// Write chunk
auto ret = partition_writer->write(chunk.get());
EXPECT_EQ(ret.ok(), true);
EXPECT_GT(partition_writer->get_written_bytes(), 0);
EXPECT_EQ(partition_writer->get_flushable_bytes(), chunk->bytes_usage());
// Finish the writer, flushing the buffered chunk
ret = partition_writer->finish();
EXPECT_EQ(ret.ok(), true);
EXPECT_EQ(committed, true);
EXPECT_EQ(writer_helper->written_rows(), 0);
EXPECT_EQ(writer_helper->result_rows(), 1);
}
// Write and spill
{
// Reset states
writer_helper->reset();
committed = false;
status = Status::OK();
// Create a chunk
ChunkPtr chunk = ChunkHelper::new_chunk(*tuple_desc, 1);
chunk->get_column_by_index(0)->append_datum(Slice("aaa"));
for (size_t i = 0; i < 3; ++i) {
// Write chunk
auto ret = partition_writer->write(chunk.get());
EXPECT_EQ(ret.ok(), true);
EXPECT_GT(partition_writer->get_written_bytes(), 0);
// Spill the buffered chunk
ret = partition_writer->_spill();
EXPECT_EQ(ret.ok(), true);
Awaitility()
.timeout(3 * 1000 * 1000) // 3s
.interval(300 * 1000) // 300ms
.until([partition_writer]() {
return partition_writer->_spilling_bytes_usage.load(std::memory_order_relaxed) == 0;
});
EXPECT_EQ(partition_writer->_spilling_bytes_usage.load(std::memory_order_relaxed), 0);
EXPECT_EQ(status.ok(), true);
}
// Merge spill blocks
auto ret = partition_writer->finish();
EXPECT_EQ(ret.ok(), true);
Awaitility()
.timeout(3 * 1000 * 1000) // 3s
.interval(300 * 1000) // 300ms
.until([&committed]() { return committed; });
EXPECT_EQ(committed, true);
EXPECT_EQ(status.ok(), true);
EXPECT_EQ(partition_writer->is_finished(), true);
EXPECT_EQ(writer_helper->written_rows(), 0);
EXPECT_EQ(writer_helper->result_rows(), 3);
}
std::filesystem::remove_all(fs_base_path);
}
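The test above drives the full write → _spill() → finish() path: each _spill() moves the buffered chunk into an on-disk block and releases the memory once the block is durable, and finish() reads the blocks back and hands the merged stream to the file writer for commit. A toy model of that flow, with strings standing in for chunks and synchronous I/O standing in for the async spill executor; all names here are illustrative, not the real SpillPartitionChunkWriter API:

#include <filesystem>
#include <fstream>
#include <string>
#include <vector>

// Toy spill-then-merge writer: spill() persists the in-memory buffer as a
// numbered block file; finish() concatenates the blocks back into the row
// stream that would be handed to the format writer for commit.
class ToySpillWriter {
public:
    explicit ToySpillWriter(std::string dir) : _dir(std::move(dir)) {
        std::filesystem::create_directories(_dir);
    }
    void write(const std::string& row) { _buffer.push_back(row); }
    void spill() {
        std::ofstream out(_dir + "/block_" + std::to_string(_next_block++));
        for (const auto& row : _buffer) {
            out << row << '\n';
        }
        _buffer.clear(); // memory is released once the block is durable
    }
    std::vector<std::string> finish() {
        spill(); // flush any tail rows still buffered
        std::vector<std::string> rows;
        for (int i = 0; i < _next_block; ++i) {
            std::ifstream in(_dir + "/block_" + std::to_string(i));
            for (std::string row; std::getline(in, row);) {
                rows.push_back(row);
            }
        }
        // A sorted sink (see sort_column_asc below) would k-way merge the
        // blocks here instead of concatenating them.
        return rows;
    }

private:
    std::string _dir;
    std::vector<std::string> _buffer;
    int _next_block = 0;
};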
TEST_F(PartitionChunkWriterTest, sort_column_asc) {
std::string fs_base_path = "base_path";
std::filesystem::create_directories(fs_base_path + "/c1");
parquet::Utils::SlotDesc slot_descs[] = {{"c1", TYPE_VARCHAR_DESC}, {""}};
TupleDescriptor* tuple_desc =
parquet::Utils::create_tuple_descriptor(_fragment_context->runtime_state(), &_pool, slot_descs);
auto writer_helper = WriterHelper::instance();
bool committed = false;
Status status;
// Create partition writer
auto mock_writer_factory = std::make_shared<MockFileWriterFactory>();
auto location_provider = std::make_shared<LocationProvider>(fs_base_path, "ffffff", 0, 0, "parquet");
EXPECT_CALL(*mock_writer_factory, create(::testing::_)).WillRepeatedly([](const std::string&) {
WriterAndStream ws;
ws.writer = std::make_unique<MockWriter>();
ws.stream = std::make_unique<Stream>(std::make_unique<MockFile>(), nullptr, nullptr);
return ws;
});
auto sort_ordering = std::make_shared<SortOrdering>();
sort_ordering->sort_key_idxes = {0};
sort_ordering->sort_descs.descs.emplace_back(true, false);
const size_t max_file_size = 1073741824; // 1GB
auto partition_chunk_writer_ctx = std::make_shared<SpillPartitionChunkWriterContext>(
SpillPartitionChunkWriterContext{mock_writer_factory, location_provider, max_file_size, false,
_fragment_context.get(), tuple_desc, sort_ordering});
auto partition_chunk_writer_factory =
std::make_unique<SpillPartitionChunkWriterFactory>(partition_chunk_writer_ctx);
std::vector<int8_t> partition_field_null_list;
auto partition_writer = std::dynamic_pointer_cast<SpillPartitionChunkWriter>(
partition_chunk_writer_factory->create("c1", partition_field_null_list));
auto commit_callback = [&committed](const CommitResult& r) { committed = true; };
auto error_handler = [&status](const Status& s) { status = s; };
auto poller = MockPoller();
partition_writer->set_io_poller(&poller);
partition_writer->set_commit_callback(commit_callback);
partition_writer->set_error_handler(error_handler);
EXPECT_OK(partition_writer->init());
// Normal write and flush to file
{
writer_helper->reset();
for (size_t i = 0; i < 3; ++i) {
// Create a chunk
ChunkPtr chunk = ChunkHelper::new_chunk(*tuple_desc, 3);
std::string suffix = std::to_string(3 - i);
chunk->get_column_by_index(0)->append_datum(Slice("ccc" + suffix));
chunk->get_column_by_index(0)->append_datum(Slice("bbb" + suffix));
chunk->get_column_by_index(0)->append_datum(Slice("aaa" + suffix));
// Write chunk
auto ret = partition_writer->write(chunk.get());
EXPECT_EQ(ret.ok(), true);
EXPECT_GT(partition_writer->get_written_bytes(), 0);
}
// Flush chunks directly
auto ret = partition_writer->finish();
EXPECT_EQ(ret.ok(), true);
EXPECT_EQ(committed, true);
EXPECT_EQ(status.ok(), true);
EXPECT_EQ(writer_helper->written_rows(), 0);
EXPECT_EQ(writer_helper->result_rows(), 9);
EXPECT_EQ(writer_helper->result_chunks().size(), 1);
// Check the result order
auto result_chunk = writer_helper->result_chunks()[0];
auto column = result_chunk->get_column_by_index(0);
std::string last_row;
for (size_t i = 0; i < column->size(); ++i) {
std::string cur_row = column->get(i).get_slice().to_string();
LOG(INFO) << "(" << i << "): " << cur_row;
if (!last_row.empty()) {
EXPECT_GT(cur_row, last_row);
}
last_row = cur_row;
}
}
// Write and spill multiple chunks
{
writer_helper->reset();
committed = false;
status = Status::OK();
for (size_t i = 0; i < 3; ++i) {
// Create a chunk
ChunkPtr chunk = ChunkHelper::new_chunk(*tuple_desc, 3);
std::string suffix = std::to_string(3 - i);
chunk->get_column_by_index(0)->append_datum(Slice("ccc" + suffix));
chunk->get_column_by_index(0)->append_datum(Slice("bbb" + suffix));
chunk->get_column_by_index(0)->append_datum(Slice("aaa" + suffix));
// Write chunk
auto ret = partition_writer->write(chunk.get());
EXPECT_EQ(ret.ok(), true);
EXPECT_GT(partition_writer->get_written_bytes(), 0);
// Spill the buffered chunk
ret = partition_writer->_spill();
EXPECT_EQ(ret.ok(), true);
Awaitility()
.timeout(3 * 1000 * 1000) // 3s
.interval(300 * 1000) // 300ms
.until([partition_writer]() {
return partition_writer->_spilling_bytes_usage.load(std::memory_order_relaxed) == 0;
});
EXPECT_EQ(partition_writer->_spilling_bytes_usage.load(std::memory_order_relaxed), 0);
EXPECT_EQ(status.ok(), true);
}
// Merge spill blocks
auto ret = partition_writer->finish();
EXPECT_EQ(ret.ok(), true);
Awaitility()
.timeout(3 * 1000 * 1000) // 3s
.interval(300 * 1000) // 300ms
.until([&committed]() { return committed; });
EXPECT_EQ(committed, true);
EXPECT_EQ(status.ok(), true);
EXPECT_EQ(writer_helper->written_rows(), 0);
EXPECT_EQ(writer_helper->result_rows(), 9);
EXPECT_EQ(writer_helper->result_chunks().size(), 1);
// Check the result order
auto result_chunk = writer_helper->result_chunks()[0];
auto column = result_chunk->get_column_by_index(0);
std::string last_row;
for (size_t i = 0; i < column->size(); ++i) {
std::string cur_row = column->get(i).get_slice().to_string();
LOG(INFO) << "(" << i << "): " << cur_row;
if (!last_row.empty()) {
EXPECT_GT(cur_row, last_row);
}
last_row = cur_row;
}
}
}
} // namespace
} // namespace starrocks::connector

View File

@ -248,6 +248,7 @@ struct TIcebergTableSink {
5: optional bool is_static_partition_sink
6: optional CloudConfiguration.TCloudConfiguration cloud_configuration
7: optional i64 target_max_file_size
8: required i32 tuple_id
}
struct THiveTableSink {

View File

@ -17,14 +17,14 @@ insert into iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} selec
insert into iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} select 3;
-- result:
-- !result
- select count(1) from files ("path"="oss://starrocks-ci-test/iceberg_db_${uuid0}/ice_tbl_${uuid0}/data","list_files_only" = "true");
+ select count(1) from files ("path"="oss://starrocks-ci-test/iceberg_db_${uuid0}/ice_tbl_${uuid0}/data","list_files_only" = "true") where IS_DIR=0;
-- result:
3
-- !result
insert overwrite iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} SELECT 4;
-- result:
-- !result
- select count(1) from files ("path"="oss://starrocks-ci-test/iceberg_db_${uuid0}/ice_tbl_${uuid0}/data","list_files_only" = "true");
+ select count(1) from files ("path"="oss://starrocks-ci-test/iceberg_db_${uuid0}/ice_tbl_${uuid0}/data","list_files_only" = "true") where IS_DIR=0;
-- result:
4
-- !result
@ -40,7 +40,7 @@ alter table iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} execu
alter table iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} execute remove_orphan_files(now());
-- result:
-- !result
- select count(1) from files ("path"="oss://starrocks-ci-test/iceberg_db_${uuid0}/ice_tbl_${uuid0}/data","list_files_only" = "true");
+ select count(1) from files ("path"="oss://starrocks-ci-test/iceberg_db_${uuid0}/ice_tbl_${uuid0}/data","list_files_only" = "true") where IS_DIR=0;
-- result:
1
-- !result
@ -52,4 +52,4 @@ drop database iceberg_sql_test_${uuid0}.iceberg_db_${uuid0};
-- !result
drop catalog iceberg_sql_test_${uuid0};
-- result:
-- !result