[BugFix] Fix the issue that creating the spill directory fails when writing data to an Iceberg table. (backport #63278) (#63393)

Signed-off-by: GavinMar <yangguansuo@starrocks.com>
Co-authored-by: Gavin <yangguansuo@starrocks.com>
mergify[bot] 2025-09-23 10:58:33 +08:00 committed by GitHub
parent 953555c49c
commit 0aa7cfb99e
20 changed files with 191 additions and 87 deletions
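At a high level, the fix threads a single shared FileSystem (created with the connector's cloud configuration) from the sink provider both into the file writer factory and into LoadSpillBlockManager, so the spill directory is created through the same filesystem as the target table. A minimal, self-contained sketch of that ownership flow; FileSystem, SpillBlockManager, and the paths below are simplified stand-ins, not the real StarRocks classes:

#include <iostream>
#include <memory>
#include <string>

// Simplified stand-ins for FileSystem / LoadSpillBlockManager; not the real StarRocks API.
struct FileSystem {
    explicit FileSystem(std::string conf) : conf_(std::move(conf)) {}
    void create_dir_if_missing(const std::string& path) const {
        std::cout << "create " << path << " using conf [" << conf_ << "]\n";
    }
    std::string conf_;
};

struct SpillBlockManager {
    // The manager keeps the injected filesystem; without injection it would
    // have to build its own from the path alone (the old, failure-prone behavior).
    std::shared_ptr<FileSystem> fs;
    std::string spill_path;
    void init() const { fs->create_dir_if_missing(spill_path); }
};

int main() {
    // One filesystem, created once with the connector's cloud configuration...
    auto fs = std::make_shared<FileSystem>("iceberg-cloud-conf");

    // ...shared by both the file writer factory (here just a callback)
    // and the spill block manager, instead of being moved into one of them.
    auto writer_factory = [fs](const std::string& dir) { fs->create_dir_if_missing(dir); };
    SpillBlockManager spill_mgr{fs, "s3://bucket/table/load_spill"};

    writer_factory("s3://bucket/table/data");
    spill_mgr.init();  // spill dir is created on the same filesystem
}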

View File

@ -85,7 +85,14 @@ Status ConnectorChunkSink::finish() {
return Status::OK();
}
void ConnectorChunkSink::push_rollback_action(const std::function<void()>& action) {
// Not a very frequent operation, so using a unique_lock here is ok.
std::unique_lock<std::shared_mutex> wlck(_mutex);
_rollback_actions.push_back(std::move(action));
}
void ConnectorChunkSink::rollback() {
std::shared_lock<std::shared_mutex> rlck(_mutex);
for (auto& action : _rollback_actions) {
action();
}
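The hunk above takes an exclusive lock while registering a rollback action and a shared lock while replaying them. A standalone sketch of that locking pattern, using a generic RollbackRegistry rather than the actual ConnectorChunkSink:

#include <functional>
#include <iostream>
#include <mutex>
#include <shared_mutex>
#include <vector>

class RollbackRegistry {
public:
    // Registration is rare, so taking the exclusive lock here is cheap enough.
    void push(std::function<void()> action) {
        std::unique_lock<std::shared_mutex> wlck(_mutex);
        _actions.push_back(std::move(action));
    }

    // Replay only reads the list, so a shared lock lets it run
    // concurrently with other readers.
    void rollback() const {
        std::shared_lock<std::shared_mutex> rlck(_mutex);
        for (const auto& action : _actions) {
            action();
        }
    }

private:
    mutable std::shared_mutex _mutex;
    std::vector<std::function<void()>> _actions;
};

int main() {
    RollbackRegistry registry;
    registry.push([] { std::cout << "delete data file\n"; });
    registry.push([] { std::cout << "delete manifest\n"; });
    registry.rollback();
}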

View File

@ -65,6 +65,8 @@ public:
void set_status(const Status& status);
protected:
void push_rollback_action(const std::function<void()>& action);
AsyncFlushStreamPoller* _io_poller = nullptr;
SinkOperatorMemoryManager* _op_mem_mgr = nullptr;

View File

@ -47,7 +47,7 @@ StatusOr<std::unique_ptr<ConnectorChunkSink>> FileChunkSinkProvider::create_chun
std::shared_ptr<ConnectorChunkSinkContext> context, int32_t driver_id) {
auto ctx = std::dynamic_pointer_cast<FileChunkSinkContext>(context);
auto runtime_state = ctx->fragment_context->runtime_state();
auto fs = FileSystem::CreateUniqueFromString(ctx->path, FSOptions(&ctx->cloud_conf)).value();
std::shared_ptr<FileSystem> fs = FileSystem::CreateUniqueFromString(ctx->path, FSOptions(&ctx->cloud_conf)).value();
auto column_evaluators = ColumnEvaluator::clone(ctx->column_evaluators);
auto location_provider = std::make_shared<connector::LocationProvider>(
ctx->path, print_id(ctx->fragment_context->query_id()), runtime_state->be_number(), driver_id,
@ -56,16 +56,16 @@ StatusOr<std::unique_ptr<ConnectorChunkSink>> FileChunkSinkProvider::create_chun
std::shared_ptr<formats::FileWriterFactory> file_writer_factory;
if (boost::iequals(ctx->format, formats::PARQUET)) {
file_writer_factory = std::make_shared<formats::ParquetFileWriterFactory>(
std::move(fs), ctx->compression_type, ctx->options, ctx->column_names, std::move(column_evaluators),
std::nullopt, ctx->executor, runtime_state);
fs, ctx->compression_type, ctx->options, ctx->column_names, std::move(column_evaluators), std::nullopt,
ctx->executor, runtime_state);
} else if (boost::iequals(ctx->format, formats::ORC)) {
file_writer_factory = std::make_shared<formats::ORCFileWriterFactory>(
std::move(fs), ctx->compression_type, ctx->options, ctx->column_names, std::move(column_evaluators),
ctx->executor, runtime_state);
fs, ctx->compression_type, ctx->options, ctx->column_names, std::move(column_evaluators), ctx->executor,
runtime_state);
} else if (boost::iequals(ctx->format, formats::CSV)) {
file_writer_factory = std::make_shared<formats::CSVFileWriterFactory>(
std::move(fs), ctx->compression_type, ctx->options, ctx->column_names, std::move(column_evaluators),
ctx->executor, runtime_state);
fs, ctx->compression_type, ctx->options, ctx->column_names, std::move(column_evaluators), ctx->executor,
runtime_state);
} else {
file_writer_factory = std::make_shared<formats::UnknownFileWriterFactory>(ctx->format);
}
@ -83,6 +83,7 @@ StatusOr<std::unique_ptr<ConnectorChunkSink>> FileChunkSinkProvider::create_chun
auto partition_chunk_writer_ctx =
std::make_shared<SpillPartitionChunkWriterContext>(SpillPartitionChunkWriterContext{
{file_writer_factory, location_provider, ctx->max_file_size, partition_columns.empty()},
fs,
ctx->fragment_context,
nullptr,
nullptr});
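Changing auto fs = ... (a unique_ptr that used to be std::move'd into the writer factory) to an explicit std::shared_ptr<FileSystem> is what allows the same filesystem to also be stored in the spill writer context a few lines below. A small sketch of that ownership change, with placeholder types standing in for the real ones:

#include <iostream>
#include <memory>
#include <string>

struct FileSystem { std::string scheme; };                      // placeholder
struct WriterFactory { std::shared_ptr<FileSystem> fs; };       // placeholder
struct SpillWriterContext { std::shared_ptr<FileSystem> fs; };  // placeholder

int main() {
    std::unique_ptr<FileSystem> owned = std::make_unique<FileSystem>();
    owned->scheme = "s3";

    // A unique_ptr converts implicitly into a shared_ptr, so changing the
    // declared type at the call site is enough; after that, no std::move into
    // a single consumer is needed and both consumers hold the same instance.
    std::shared_ptr<FileSystem> fs = std::move(owned);

    WriterFactory factory{fs};         // copy of the shared handle
    SpillWriterContext spill_ctx{fs};  // second copy, same filesystem

    std::cout << (factory.fs.get() == spill_ctx.fs.get()) << "\n";  // prints 1
}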

View File

@ -53,7 +53,8 @@ StatusOr<std::unique_ptr<ConnectorChunkSink>> HiveChunkSinkProvider::create_chun
std::shared_ptr<ConnectorChunkSinkContext> context, int32_t driver_id) {
auto ctx = std::dynamic_pointer_cast<HiveChunkSinkContext>(context);
auto runtime_state = ctx->fragment_context->runtime_state();
auto fs = FileSystem::CreateUniqueFromString(ctx->path, FSOptions(&ctx->cloud_conf)).value(); // must succeed
std::shared_ptr<FileSystem> fs =
FileSystem::CreateUniqueFromString(ctx->path, FSOptions(&ctx->cloud_conf)).value(); // must succeed
auto data_column_evaluators = ColumnEvaluator::clone(ctx->data_column_evaluators);
auto location_provider = std::make_shared<connector::LocationProvider>(
ctx->path, print_id(ctx->fragment_context->query_id()), runtime_state->be_number(), driver_id,
@ -65,16 +66,16 @@ StatusOr<std::unique_ptr<ConnectorChunkSink>> HiveChunkSinkProvider::create_chun
ctx->options[formats::ParquetWriterOptions::USE_LEGACY_DECIMAL_ENCODING] = "true";
ctx->options[formats::ParquetWriterOptions::USE_INT96_TIMESTAMP_ENCODING] = "true";
file_writer_factory = std::make_shared<formats::ParquetFileWriterFactory>(
std::move(fs), ctx->compression_type, ctx->options, ctx->data_column_names,
std::move(data_column_evaluators), std::nullopt, ctx->executor, runtime_state);
fs, ctx->compression_type, ctx->options, ctx->data_column_names, std::move(data_column_evaluators),
std::nullopt, ctx->executor, runtime_state);
} else if (boost::iequals(ctx->format, formats::ORC)) {
file_writer_factory = std::make_shared<formats::ORCFileWriterFactory>(
std::move(fs), ctx->compression_type, ctx->options, ctx->data_column_names,
std::move(data_column_evaluators), ctx->executor, runtime_state);
fs, ctx->compression_type, ctx->options, ctx->data_column_names, std::move(data_column_evaluators),
ctx->executor, runtime_state);
} else if (boost::iequals(ctx->format, formats::TEXTFILE)) {
file_writer_factory = std::make_shared<formats::CSVFileWriterFactory>(
std::move(fs), ctx->compression_type, ctx->options, ctx->data_column_names,
std::move(data_column_evaluators), ctx->executor, runtime_state);
fs, ctx->compression_type, ctx->options, ctx->data_column_names, std::move(data_column_evaluators),
ctx->executor, runtime_state);
} else {
file_writer_factory = std::make_shared<formats::UnknownFileWriterFactory>(ctx->format);
}
@ -85,6 +86,7 @@ StatusOr<std::unique_ptr<ConnectorChunkSink>> HiveChunkSinkProvider::create_chun
auto partition_chunk_writer_ctx = std::make_shared<SpillPartitionChunkWriterContext>(
SpillPartitionChunkWriterContext{{file_writer_factory, location_provider, ctx->max_file_size,
ctx->partition_column_names.empty()},
fs,
ctx->fragment_context,
nullptr,
nullptr});

View File

@ -37,7 +37,7 @@ IcebergChunkSink::IcebergChunkSink(std::vector<std::string> partition_columns, s
_transform_exprs(std::move(transform_exprs)) {}
void IcebergChunkSink::callback_on_commit(const CommitResult& result) {
_rollback_actions.push_back(std::move(result.rollback_action));
push_rollback_action(std::move(result.rollback_action));
if (result.io_status.ok()) {
_state->update_num_rows_load_sink(result.file_statistics.record_count);
@ -81,7 +81,7 @@ StatusOr<std::unique_ptr<ConnectorChunkSink>> IcebergChunkSinkProvider::create_c
std::shared_ptr<ConnectorChunkSinkContext> context, int32_t driver_id) {
auto ctx = std::dynamic_pointer_cast<IcebergChunkSinkContext>(context);
auto runtime_state = ctx->fragment_context->runtime_state();
auto fs = FileSystem::CreateUniqueFromString(ctx->path, FSOptions(&ctx->cloud_conf)).value();
std::shared_ptr<FileSystem> fs = FileSystem::CreateUniqueFromString(ctx->path, FSOptions(&ctx->cloud_conf)).value();
auto column_evaluators = ColumnEvaluator::clone(ctx->column_evaluators);
auto location_provider = std::make_shared<connector::LocationProvider>(
ctx->path, print_id(ctx->fragment_context->query_id()), runtime_state->be_number(), driver_id,
@ -93,7 +93,7 @@ StatusOr<std::unique_ptr<ConnectorChunkSink>> IcebergChunkSinkProvider::create_c
std::shared_ptr<formats::FileWriterFactory> file_writer_factory;
if (boost::iequals(ctx->format, formats::PARQUET)) {
file_writer_factory = std::make_shared<formats::ParquetFileWriterFactory>(
std::move(fs), ctx->compression_type, ctx->options, ctx->column_names, std::move(column_evaluators),
fs, ctx->compression_type, ctx->options, ctx->column_names, std::move(column_evaluators),
ctx->parquet_field_ids, ctx->executor, runtime_state);
} else {
file_writer_factory = std::make_shared<formats::UnknownFileWriterFactory>(ctx->format);
@ -104,8 +104,10 @@ StatusOr<std::unique_ptr<ConnectorChunkSink>> IcebergChunkSinkProvider::create_c
auto partition_chunk_writer_ctx =
std::make_shared<SpillPartitionChunkWriterContext>(SpillPartitionChunkWriterContext{
{file_writer_factory, location_provider, ctx->max_file_size, partition_columns.empty()},
fs,
ctx->fragment_context,
runtime_state->desc_tbl().get_tuple_descriptor(ctx->tuple_desc_id),
&ctx->column_evaluators,
ctx->sort_ordering});
partition_chunk_writer_factory = std::make_unique<SpillPartitionChunkWriterFactory>(partition_chunk_writer_ctx);
} else {

View File

@ -23,6 +23,7 @@
#include "formats/file_writer.h"
#include "runtime/runtime_state.h"
#include "storage/chunk_helper.h"
#include "storage/convert_helper.h"
#include "storage/load_spill_block_manager.h"
#include "storage/storage_engine.h"
#include "storage/types.h"
@ -64,6 +65,7 @@ void PartitionChunkWriter::commit_file() {
_file_writer = nullptr;
VLOG(3) << "commit to remote file, filename: " << _out_stream->filename()
<< ", size: " << result.file_statistics.file_size;
_out_stream = nullptr;
}
Status BufferPartitionChunkWriter::init() {
@ -92,11 +94,14 @@ SpillPartitionChunkWriter::SpillPartitionChunkWriter(std::string partition,
std::vector<int8_t> partition_field_null_list,
const std::shared_ptr<SpillPartitionChunkWriterContext>& ctx)
: PartitionChunkWriter(std::move(partition), std::move(partition_field_null_list), ctx),
_fs(ctx->fs),
_fragment_context(ctx->fragment_context),
_column_evaluators(ctx->column_evaluators),
_sort_ordering(ctx->sort_ordering) {
_chunk_spill_token = ExecEnv::GetInstance()->connector_sink_spill_executor()->create_token();
_block_merge_token = StorageEngine::instance()->load_spill_block_merge_executor()->create_token();
_tuple_desc = ctx->tuple_desc;
_writer_id = generate_uuid();
}
SpillPartitionChunkWriter::~SpillPartitionChunkWriter() {
@ -109,18 +114,21 @@ SpillPartitionChunkWriter::~SpillPartitionChunkWriter() {
}
Status SpillPartitionChunkWriter::init() {
std::string root_location =
_is_default_partition ? _location_provider->root_location() : _location_provider->root_location(_partition);
_load_spill_block_mgr = std::make_unique<LoadSpillBlockManager>(
_fragment_context->query_id(), _fragment_context->fragment_instance_id(), root_location);
std::string root_location = _location_provider->root_location();
_load_spill_block_mgr =
std::make_unique<LoadSpillBlockManager>(_fragment_context->query_id(), _writer_id, root_location, _fs);
RETURN_IF_ERROR(_load_spill_block_mgr->init());
_load_chunk_spiller = std::make_unique<LoadChunkSpiller>(_load_spill_block_mgr.get(),
_fragment_context->runtime_state()->runtime_profile());
if (_column_evaluators) {
RETURN_IF_ERROR(ColumnEvaluator::init(*_column_evaluators));
}
return Status::OK();
}
Status SpillPartitionChunkWriter::write(Chunk* chunk) {
RETURN_IF_ERROR(create_file_writer_if_needed());
_chunks.push_back(chunk->clone_unique());
_chunk_bytes_usage += chunk->bytes_usage();
if (!_base_chunk) {
@ -149,7 +157,7 @@ Status SpillPartitionChunkWriter::finish() {
// If no chunks have been spilled, flush data to remote file directly.
if (_load_chunk_spiller->empty()) {
VLOG(2) << "flush to remote directly when finish, query_id: " << print_id(_fragment_context->query_id())
<< ", fragment_instance_id: " << print_id(_fragment_context->fragment_instance_id());
<< ", writer_id: " << print_id(_writer_id);
RETURN_IF_ERROR(_flush_to_file());
commit_file();
return Status::OK();
@ -157,7 +165,7 @@ Status SpillPartitionChunkWriter::finish() {
auto cb = [this](const Status& st) {
LOG_IF(ERROR, !st.ok()) << "fail to merge spill blocks, query_id: " << print_id(_fragment_context->query_id())
<< ", fragment_instance_id: " << print_id(_fragment_context->fragment_instance_id());
<< ", writer_id: " << print_id(_writer_id);
_handle_err(st);
commit_file();
};
@ -178,8 +186,12 @@ Status SpillPartitionChunkWriter::merge_blocks() {
_chunk_spill_token->wait();
auto write_func = [this](Chunk* chunk) { return _flush_chunk(chunk, false); };
auto flush_func = []() {
// do nothing because we check and commit when writing chunk.
auto flush_func = [this]() {
// Commit the file after each merge call to ensure the data written to one file is ordered,
// because data generated by different merge calls may be unordered.
if (_sort_ordering) {
commit_file();
}
return Status::OK();
};
Status st = _load_chunk_spiller->merge_write(_max_file_size, _sort_ordering != nullptr, false /* do_agg */,
@ -209,7 +221,7 @@ Status SpillPartitionChunkWriter::_sort() {
Status SpillPartitionChunkWriter::_spill() {
RETURN_IF(_chunks.empty(), Status::OK());
_merge_chunks();
RETURN_IF_ERROR(_merge_chunks());
if (_sort_ordering) {
RETURN_IF_ERROR(_sort());
}
@ -221,7 +233,9 @@ Status SpillPartitionChunkWriter::_spill() {
Status st = _flush_chunk(chunk.get(), true);
_handle_err(st);
} else {
VLOG(3) << "spill chunk data, filename: " << out_stream()->filename() << ", size: " << chunk->bytes_usage();
VLOG(3) << "spill chunk data, filename: " << out_stream()->filename() << ", size: " << chunk->bytes_usage()
<< ", rows: " << chunk->num_rows() << ", partition: " << _partition
<< ", writer_id: " << _writer_id;
}
_spilling_bytes_usage.fetch_sub(chunk->bytes_usage(), std::memory_order_relaxed);
};
@ -240,9 +254,10 @@ Status SpillPartitionChunkWriter::_flush_to_file() {
RETURN_IF_ERROR(_flush_chunk(chunk.get(), false));
}
} else {
_merge_chunks();
RETURN_IF_ERROR(_merge_chunks());
RETURN_IF_ERROR(_sort());
RETURN_IF_ERROR(_flush_chunk(_result_chunk.get(), true));
commit_file();
}
_chunks.clear();
_chunk_bytes_usage = 0;
@ -254,7 +269,7 @@ Status SpillPartitionChunkWriter::_flush_chunk(Chunk* chunk, bool split) {
if (chunk->get_slot_id_to_index_map().empty()) {
auto& slot_map = _base_chunk->get_slot_id_to_index_map();
for (auto& it : slot_map) {
chunk->set_slot_id_to_index(it.first, it.second);
chunk->set_slot_id_to_index(it.first, _col_index_map[it.second]);
}
}
@ -272,7 +287,7 @@ Status SpillPartitionChunkWriter::_flush_chunk(Chunk* chunk, bool split) {
}
Status SpillPartitionChunkWriter::_write_chunk(Chunk* chunk) {
if (_file_writer->get_written_bytes() >= _max_file_size) {
if (!_sort_ordering && _file_writer->get_written_bytes() >= _max_file_size) {
commit_file();
}
RETURN_IF_ERROR(create_file_writer_if_needed());
@ -280,9 +295,9 @@ Status SpillPartitionChunkWriter::_write_chunk(Chunk* chunk) {
return Status::OK();
}
void SpillPartitionChunkWriter::_merge_chunks() {
Status SpillPartitionChunkWriter::_merge_chunks() {
if (_chunks.empty()) {
return;
return Status::OK();
}
// Create a target chunk with schema to make it can use some
@ -291,11 +306,36 @@ void SpillPartitionChunkWriter::_merge_chunks() {
[](int sum, const ChunkPtr& chunk) { return sum + chunk->num_rows(); });
_result_chunk = _create_schema_chunk(_chunks.front(), num_rows);
std::unordered_map<Column*, size_t> col_ptr_index_map;
auto& columns = _chunks.front()->columns();
for (size_t i = 0; i < columns.size(); ++i) {
col_ptr_index_map[columns[i]->get_ptr()] = i;
}
for (auto& chunk : _chunks) {
_result_chunk->append(*chunk, 0, chunk->num_rows());
for (size_t i = 0; i < _result_chunk->num_columns(); ++i) {
auto* dst_col = _result_chunk->get_column_by_index(i).get();
ColumnPtr src_col;
if (_column_evaluators) {
ASSIGN_OR_RETURN(src_col, (*_column_evaluators)[i]->evaluate(chunk.get()));
} else {
src_col = chunk->get_column_by_index(i);
}
dst_col->append(*src_col);
if (chunk == _chunks.front()) {
auto it = col_ptr_index_map.find(src_col.get());
if (it != col_ptr_index_map.end()) {
_col_index_map[it->second] = i;
} else {
return Status::InternalError("unknown column index: " + std::to_string(i));
}
}
}
chunk.reset();
}
_chunks.clear();
return Status::OK();
}
bool SpillPartitionChunkWriter::_mem_insufficent() {
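The rewritten _merge_chunks evaluates each output column, remembers which source column it came from, and records the pair in _col_index_map; _flush_chunk then uses that map to translate the base chunk's slot-id layout into the merged chunk's layout. A simplified sketch of the index-mapping idea, using plain strings and vectors instead of Chunk and Column:

#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

int main() {
    // Source chunk layout: column index -> name (stand-in for Column*).
    std::vector<std::string> source_cols = {"id", "dt", "payload"};

    // Pretend the evaluators reorder columns when building the result chunk.
    std::vector<std::string> result_cols = {"dt", "id", "payload"};

    // Step 1: source column -> index in the source chunk
    // (mirrors col_ptr_index_map keyed by Column*).
    std::unordered_map<std::string, size_t> src_index;
    for (size_t i = 0; i < source_cols.size(); ++i) src_index[source_cols[i]] = i;

    // Step 2: for every result column, record source index -> result index
    // (mirrors _col_index_map).
    std::unordered_map<size_t, size_t> col_index_map;
    for (size_t i = 0; i < result_cols.size(); ++i) {
        col_index_map[src_index.at(result_cols[i])] = i;
    }

    // Step 3: remap a slot layout expressed against the source chunk
    // (mirrors _flush_chunk applying _col_index_map to the base chunk's slot map).
    std::unordered_map<int, size_t> slot_to_src_index = {{10, 0}, {11, 1}, {12, 2}};
    for (const auto& [slot_id, src_idx] : slot_to_src_index) {
        std::cout << "slot " << slot_id << " -> result column " << col_index_map[src_idx] << "\n";
    }
}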

View File

@ -14,19 +14,15 @@
#pragma once
#include <fmt/format.h>
#include <map>
#include "column/chunk.h"
#include "common/status.h"
#include "connector/utils.h"
#include "formats/file_writer.h"
#include "fs/fs.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "storage/load_chunk_spiller.h"
#include "util/threadpool.h"
#include "util/uid_util.h"
namespace starrocks::connector {
@ -51,8 +47,10 @@ struct PartitionChunkWriterContext {
struct BufferPartitionChunkWriterContext : public PartitionChunkWriterContext {};
struct SpillPartitionChunkWriterContext : public PartitionChunkWriterContext {
std::shared_ptr<FileSystem> fs;
pipeline::FragmentContext* fragment_context = nullptr;
TupleDescriptor* tuple_desc = nullptr;
std::vector<std::unique_ptr<ColumnEvaluator>>* column_evaluators;
std::shared_ptr<SortOrdering> sort_ordering;
};
@ -173,7 +171,7 @@ private:
Status _write_chunk(Chunk* chunk);
void _merge_chunks();
Status _merge_chunks();
SchemaPtr _make_schema();
@ -184,13 +182,17 @@ private:
void _handle_err(const Status& st);
private:
std::shared_ptr<FileSystem> _fs = nullptr;
pipeline::FragmentContext* _fragment_context = nullptr;
TupleDescriptor* _tuple_desc = nullptr;
std::vector<std::unique_ptr<ColumnEvaluator>>* _column_evaluators;
std::shared_ptr<SortOrdering> _sort_ordering;
std::unique_ptr<ThreadPoolToken> _chunk_spill_token;
std::unique_ptr<ThreadPoolToken> _block_merge_token;
std::unique_ptr<LoadSpillBlockManager> _load_spill_block_mgr;
std::shared_ptr<LoadChunkSpiller> _load_chunk_spiller;
//std::function<StatusOr<ColumnPtr>(Chunk*, size_t)> _column_eval_func;
TUniqueId _writer_id;
std::list<ChunkPtr> _chunks;
int64_t _chunk_bytes_usage = 0;
@ -198,6 +200,7 @@ private:
ChunkPtr _result_chunk;
ChunkPtr _base_chunk;
SchemaPtr _schema;
std::unordered_map<int, int> _col_index_map; // base chunk column index -> result chunk column index
static const int64_t kWaitMilliseconds;
};

View File

@ -50,8 +50,9 @@ bool SinkOperatorMemoryManager::kill_victim() {
// The flush will decrease the writer's flushable memory bytes, so it usually
// will not be chosen again in a short time.
auto result = victim->flush();
LOG(INFO) << "kill victim: " << victim->out_stream()->filename() << ", result: " << result;
const auto filename = victim->out_stream()->filename();
const auto result = victim->flush();
LOG(INFO) << "kill victim: " << filename << ", result: " << result;
return true;
}
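Because commit_file() now resets _out_stream (see the PartitionChunkWriter hunk above), victim->flush() can drop the stream before the log statement runs, which is presumably why the hunk captures the filename first. A tiny sketch of the hazard and the reordering, with a stand-in Writer type:

#include <iostream>
#include <memory>
#include <string>

struct OutStream { std::string filename; };

struct Writer {
    std::shared_ptr<OutStream> out = std::make_shared<OutStream>(OutStream{"spill_0001.parquet"});
    // flush() may finish and commit the current file, dropping the stream.
    bool flush() { out.reset(); return true; }
};

int main() {
    Writer victim;

    // Unsafe ordering (old code): victim.out may be null after flush().
    //   bool result = victim.flush();
    //   std::cout << victim.out->filename;  // potential null dereference

    // Safe ordering (new code): read the filename before flushing.
    const std::string filename = victim.out->filename;
    const bool result = victim.flush();
    std::cout << "kill victim: " << filename << ", result: " << result << "\n";
}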
@ -121,9 +122,9 @@ bool SinkMemoryManager::_apply_on_mem_tracker(SinkOperatorMemoryManager* child_m
while (available_memory() <= low_watermark) {
child_manager->update_writer_occupied_memory();
int64_t total_occupied_memory = _total_writer_occupied_memory();
LOG_EVERY_SECOND(WARNING) << "consumption: " << mem_tracker->consumption()
<< ", writer_allocated_memory: " << total_occupied_memory
<< ", flush_watermark: " << flush_watermark;
LOG_EVERY_SECOND(INFO) << "consumption: " << mem_tracker->consumption()
<< ", total_occupied_memory: " << total_occupied_memory
<< ", flush_watermark: " << flush_watermark;
if (total_occupied_memory < flush_watermark) {
break;
}
@ -133,7 +134,14 @@ bool SinkMemoryManager::_apply_on_mem_tracker(SinkOperatorMemoryManager* child_m
}
}
return available_memory() > low_watermark;
child_manager->update_releasable_memory();
if (available_memory() <= low_watermark && _total_releasable_memory() > 0) {
LOG_EVERY_SECOND(WARNING) << "memory usage is still high after flush, : available_memory" << available_memory()
<< ", memory_low_watermark: " << low_watermark
<< ", total_releasable_memory: " << _total_releasable_memory();
return false;
}
return true;
}
} // namespace starrocks::connector

View File

@ -28,6 +28,7 @@
#include <ostream>
#include <utility>
#include "column/column_helper.h"
#include "formats/file_writer.h"
#include "formats/parquet/arrow_memory_pool.h"
#include "formats/parquet/chunk_writer.h"
@ -42,6 +43,7 @@
namespace starrocks {
class Chunk;
class ColumnHelper;
} // namespace starrocks
namespace starrocks::formats {

View File

@ -313,7 +313,7 @@ Status DeltaWriterImpl::build_schema_and_writer() {
UniqueId(_load_id).to_thrift(),
UniqueId(_tablet_id, _txn_id)
.to_thrift(), // use tablet id + txn id to generate fragment instance id
_tablet_manager->tablet_root_location(_tablet_id));
_tablet_manager->tablet_root_location(_tablet_id), nullptr);
RETURN_IF_ERROR(_load_spill_block_mgr->init());
}
// Init SpillMemTableSink

View File

@ -91,13 +91,16 @@ Status LoadSpillBlockManager::init() {
// init remote block manager
std::vector<std::shared_ptr<spill::Dir>> remote_dirs;
// Remote FS can also use data cache to speed up.
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(_remote_spill_path));
if (fs->type() != FileSystem::Type::STARLET) {
if (!_fs) {
ASSIGN_OR_RETURN(_fs, FileSystem::CreateSharedFromString(_remote_spill_path));
}
if (_fs->type() != FileSystem::Type::STARLET) {
// In starlet fs, the create_missing_parent option creates the parent dir if it does not exist,
// but in other fs, we need to create the parent dir manually.
RETURN_IF_ERROR(fs->create_dir_if_missing(_remote_spill_path));
RETURN_IF_ERROR(_fs->create_dir_if_missing(_remote_spill_path));
}
auto dir = std::make_shared<spill::RemoteDir>(_remote_spill_path, std::move(fs), nullptr, INT64_MAX);
auto dir = std::make_shared<spill::RemoteDir>(_remote_spill_path, std::move(_fs), nullptr, INT64_MAX);
remote_dirs.emplace_back(dir);
_remote_dir_manager = std::make_unique<spill::DirManager>(remote_dirs);

View File

@ -59,8 +59,8 @@ class LoadSpillBlockManager {
public:
// Constructor that initializes the LoadSpillBlockManager with a query ID and remote spill path.
LoadSpillBlockManager(const TUniqueId& load_id, const TUniqueId& fragment_instance_id,
const std::string& remote_spill_path)
: _load_id(load_id), _fragment_instance_id(fragment_instance_id) {
const std::string& remote_spill_path, std::shared_ptr<FileSystem> fs)
: _load_id(load_id), _fragment_instance_id(fragment_instance_id), _fs(fs) {
_remote_spill_path = remote_spill_path + "/load_spill";
}
@ -90,6 +90,7 @@ private:
TUniqueId _load_id; // Unique ID for the load.
TUniqueId _fragment_instance_id; // Unique ID for the fragment instance.
std::string _remote_spill_path; // Path for remote spill storage.
std::shared_ptr<FileSystem> _fs; // File system for remote storage.
std::unique_ptr<spill::DirManager> _remote_dir_manager; // Manager for remote directories.
std::unique_ptr<spill::BlockManager> _block_manager; // Manager for blocks.
std::unique_ptr<LoadSpillBlockContainer> _block_container; // Container for blocks.
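The constructor now accepts an optional shared FileSystem; callers that pass nullptr (the tests and the lake DeltaWriter) keep the old behavior, because init() falls back to deriving a filesystem from the spill path. A usage sketch of that contract, with simplified stand-in classes rather than the real LoadSpillBlockManager:

#include <iostream>
#include <memory>
#include <string>

// Simplified stand-ins; not the real StarRocks classes.
struct FileSystem {
    static std::shared_ptr<FileSystem> create_from_path(const std::string& path) {
        std::cout << "derive filesystem from path: " << path << "\n";
        return std::make_shared<FileSystem>();
    }
    void create_dir_if_missing(const std::string& path) { std::cout << "mkdir " << path << "\n"; }
};

class SpillBlockManager {
public:
    SpillBlockManager(std::string remote_spill_path, std::shared_ptr<FileSystem> fs)
            : _remote_spill_path(std::move(remote_spill_path) + "/load_spill"), _fs(std::move(fs)) {}

    void init() {
        // Fall back to deriving a filesystem from the path only when the
        // caller did not inject one (keeps the old behavior for nullptr callers).
        if (!_fs) {
            _fs = FileSystem::create_from_path(_remote_spill_path);
        }
        _fs->create_dir_if_missing(_remote_spill_path);
    }

private:
    std::string _remote_spill_path;
    std::shared_ptr<FileSystem> _fs;
};

int main() {
    auto table_fs = std::make_shared<FileSystem>();
    SpillBlockManager with_fs("s3://bucket/table", table_fs);  // sink path: fs injected
    with_fs.init();

    SpillBlockManager without_fs("/tmp/test_dir", nullptr);    // test/lake path: fallback
    without_fs.init();
}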

View File

@ -248,9 +248,9 @@ TEST_F(PartitionChunkWriterTest, spill_partition_chunk_writer) {
return ws;
});
auto partition_chunk_writer_ctx =
std::make_shared<SpillPartitionChunkWriterContext>(SpillPartitionChunkWriterContext{
mock_writer_factory, location_provider, 100, false, _fragment_context.get(), tuple_desc, nullptr});
auto partition_chunk_writer_ctx = std::make_shared<SpillPartitionChunkWriterContext>(
SpillPartitionChunkWriterContext{mock_writer_factory, location_provider, 100, false, nullptr,
_fragment_context.get(), tuple_desc, nullptr, nullptr});
auto partition_chunk_writer_factory =
std::make_unique<SpillPartitionChunkWriterFactory>(partition_chunk_writer_ctx);
std::vector<int8_t> partition_field_null_list;
@ -361,8 +361,8 @@ TEST_F(PartitionChunkWriterTest, sort_column_asc) {
sort_ordering->sort_descs.descs.emplace_back(true, false);
const size_t max_file_size = 1073741824; // 1GB
auto partition_chunk_writer_ctx = std::make_shared<SpillPartitionChunkWriterContext>(
SpillPartitionChunkWriterContext{mock_writer_factory, location_provider, max_file_size, false,
_fragment_context.get(), tuple_desc, sort_ordering});
SpillPartitionChunkWriterContext{mock_writer_factory, location_provider, max_file_size, false, nullptr,
_fragment_context.get(), tuple_desc, nullptr, sort_ordering});
auto partition_chunk_writer_factory =
std::make_unique<SpillPartitionChunkWriterFactory>(partition_chunk_writer_ctx);
std::vector<int8_t> partition_field_null_list;
@ -508,8 +508,8 @@ TEST_F(PartitionChunkWriterTest, sort_column_desc) {
sort_ordering->sort_descs.descs.emplace_back(false, false);
const size_t max_file_size = 1073741824; // 1GB
auto partition_chunk_writer_ctx = std::make_shared<SpillPartitionChunkWriterContext>(
SpillPartitionChunkWriterContext{mock_writer_factory, location_provider, max_file_size, false,
_fragment_context.get(), tuple_desc, sort_ordering});
SpillPartitionChunkWriterContext{mock_writer_factory, location_provider, max_file_size, false, nullptr,
_fragment_context.get(), tuple_desc, nullptr, sort_ordering});
auto partition_chunk_writer_factory =
std::make_unique<SpillPartitionChunkWriterFactory>(partition_chunk_writer_ctx);
std::vector<int8_t> partition_field_null_list;
@ -656,8 +656,8 @@ TEST_F(PartitionChunkWriterTest, sort_multiple_columns) {
sort_ordering->sort_descs.descs.emplace_back(false, false);
const size_t max_file_size = 1073741824; // 1GB
auto partition_chunk_writer_ctx = std::make_shared<SpillPartitionChunkWriterContext>(
SpillPartitionChunkWriterContext{mock_writer_factory, location_provider, max_file_size, false,
_fragment_context.get(), tuple_desc, sort_ordering});
SpillPartitionChunkWriterContext{mock_writer_factory, location_provider, max_file_size, false, nullptr,
_fragment_context.get(), tuple_desc, nullptr, sort_ordering});
auto partition_chunk_writer_factory =
std::make_unique<SpillPartitionChunkWriterFactory>(partition_chunk_writer_ctx);
std::vector<int8_t> partition_field_null_list;
@ -783,10 +783,10 @@ TEST_F(PartitionChunkWriterTest, sort_column_with_schema_chunk) {
auto sort_ordering = std::make_shared<SortOrdering>();
sort_ordering->sort_key_idxes = {0};
sort_ordering->sort_descs.descs.emplace_back(true, false);
const size_t max_file_size = 1073741824; // 1GB
const size_t max_file_size = 1073741823; // 1GB
auto partition_chunk_writer_ctx = std::make_shared<SpillPartitionChunkWriterContext>(
SpillPartitionChunkWriterContext{mock_writer_factory, location_provider, max_file_size, false,
_fragment_context.get(), tuple_desc, sort_ordering});
SpillPartitionChunkWriterContext{mock_writer_factory, location_provider, max_file_size, false, nullptr,
_fragment_context.get(), tuple_desc, nullptr, sort_ordering});
auto partition_chunk_writer_factory =
std::make_unique<SpillPartitionChunkWriterFactory>(partition_chunk_writer_ctx);
std::vector<int8_t> partition_field_null_list;

View File

@ -101,7 +101,7 @@ TEST_F(SinkMemoryManagerTest, kill_victim) {
auto partition_chunk_writer_ctx =
std::make_shared<SpillPartitionChunkWriterContext>(SpillPartitionChunkWriterContext{
nullptr, nullptr, 1024, false, _fragment_context.get(), tuple_desc, nullptr});
nullptr, nullptr, 1024, false, nullptr, _fragment_context.get(), tuple_desc, nullptr});
auto sink_mem_mgr = std::make_shared<SinkOperatorMemoryManager>();
std::map<PartitionKey, PartitionChunkWriterPtr> partition_chunk_writers;

View File

@ -87,8 +87,8 @@ protected:
TEST_F(SpillMemTableSinkTest, test_flush_chunk) {
int64_t tablet_id = 1;
int64_t txn_id = 1;
std::unique_ptr<LoadSpillBlockManager> block_manager =
std::make_unique<LoadSpillBlockManager>(TUniqueId(), UniqueId(tablet_id, txn_id).to_thrift(), kTestDir);
std::unique_ptr<LoadSpillBlockManager> block_manager = std::make_unique<LoadSpillBlockManager>(
TUniqueId(), UniqueId(tablet_id, txn_id).to_thrift(), kTestDir, nullptr);
ASSERT_OK(block_manager->init());
std::unique_ptr<TabletWriter> tablet_writer = std::make_unique<HorizontalGeneralTabletWriter>(
_tablet_mgr.get(), tablet_id, _tablet_schema, txn_id, false);
@ -127,8 +127,8 @@ TEST_F(SpillMemTableSinkTest, test_flush_chunk) {
TEST_F(SpillMemTableSinkTest, test_flush_chunk_with_deletes) {
int64_t tablet_id = 1;
int64_t txn_id = 1;
std::unique_ptr<LoadSpillBlockManager> block_manager =
std::make_unique<LoadSpillBlockManager>(TUniqueId(), UniqueId(tablet_id, txn_id).to_thrift(), kTestDir);
std::unique_ptr<LoadSpillBlockManager> block_manager = std::make_unique<LoadSpillBlockManager>(
TUniqueId(), UniqueId(tablet_id, txn_id).to_thrift(), kTestDir, nullptr);
ASSERT_OK(block_manager->init());
std::unique_ptr<TabletWriter> tablet_writer = std::make_unique<HorizontalPkTabletWriter>(
_tablet_mgr.get(), tablet_id, _tablet_schema, txn_id, nullptr, false);
@ -165,8 +165,8 @@ TEST_F(SpillMemTableSinkTest, test_flush_chunk_with_deletes) {
TEST_F(SpillMemTableSinkTest, test_flush_chunk2) {
int64_t tablet_id = 1;
int64_t txn_id = 1;
std::unique_ptr<LoadSpillBlockManager> block_manager =
std::make_unique<LoadSpillBlockManager>(TUniqueId(), UniqueId(tablet_id, txn_id).to_thrift(), kTestDir);
std::unique_ptr<LoadSpillBlockManager> block_manager = std::make_unique<LoadSpillBlockManager>(
TUniqueId(), UniqueId(tablet_id, txn_id).to_thrift(), kTestDir, nullptr);
ASSERT_OK(block_manager->init());
std::unique_ptr<TabletWriter> tablet_writer = std::make_unique<HorizontalGeneralTabletWriter>(
_tablet_mgr.get(), tablet_id, _tablet_schema, txn_id, false);
@ -180,8 +180,8 @@ TEST_F(SpillMemTableSinkTest, test_flush_chunk2) {
TEST_F(SpillMemTableSinkTest, test_flush_chunk_with_delete2) {
int64_t tablet_id = 1;
int64_t txn_id = 1;
std::unique_ptr<LoadSpillBlockManager> block_manager =
std::make_unique<LoadSpillBlockManager>(TUniqueId(), UniqueId(tablet_id, txn_id).to_thrift(), kTestDir);
std::unique_ptr<LoadSpillBlockManager> block_manager = std::make_unique<LoadSpillBlockManager>(
TUniqueId(), UniqueId(tablet_id, txn_id).to_thrift(), kTestDir, nullptr);
ASSERT_OK(block_manager->init());
std::unique_ptr<TabletWriter> tablet_writer = std::make_unique<HorizontalPkTabletWriter>(
_tablet_mgr.get(), tablet_id, _tablet_schema, txn_id, nullptr, false);
@ -195,8 +195,8 @@ TEST_F(SpillMemTableSinkTest, test_flush_chunk_with_delete2) {
TEST_F(SpillMemTableSinkTest, test_flush_chunk_with_limit) {
int64_t tablet_id = 1;
int64_t txn_id = 1;
std::unique_ptr<LoadSpillBlockManager> block_manager =
std::make_unique<LoadSpillBlockManager>(TUniqueId(), UniqueId(tablet_id, txn_id).to_thrift(), kTestDir);
std::unique_ptr<LoadSpillBlockManager> block_manager = std::make_unique<LoadSpillBlockManager>(
TUniqueId(), UniqueId(tablet_id, txn_id).to_thrift(), kTestDir, nullptr);
ASSERT_OK(block_manager->init());
std::unique_ptr<TabletWriter> tablet_writer = std::make_unique<HorizontalGeneralTabletWriter>(
_tablet_mgr.get(), tablet_id, _tablet_schema, txn_id, false);
@ -238,8 +238,8 @@ TEST_F(SpillMemTableSinkTest, test_flush_chunk_with_limit) {
TEST_F(SpillMemTableSinkTest, test_merge) {
int64_t tablet_id = 1;
int64_t txn_id = 1;
std::unique_ptr<LoadSpillBlockManager> block_manager =
std::make_unique<LoadSpillBlockManager>(TUniqueId(), UniqueId(tablet_id, txn_id).to_thrift(), kTestDir);
std::unique_ptr<LoadSpillBlockManager> block_manager = std::make_unique<LoadSpillBlockManager>(
TUniqueId(), UniqueId(tablet_id, txn_id).to_thrift(), kTestDir, nullptr);
ASSERT_OK(block_manager->init());
std::unique_ptr<TabletWriter> tablet_writer = std::make_unique<HorizontalGeneralTabletWriter>(
_tablet_mgr.get(), tablet_id, _tablet_schema, txn_id, false);
@ -263,8 +263,8 @@ TEST_F(SpillMemTableSinkTest, test_out_of_disk_space) {
});
int64_t tablet_id = 1;
int64_t txn_id = 1;
std::unique_ptr<LoadSpillBlockManager> block_manager =
std::make_unique<LoadSpillBlockManager>(TUniqueId(), UniqueId(tablet_id, txn_id).to_thrift(), kTestDir);
std::unique_ptr<LoadSpillBlockManager> block_manager = std::make_unique<LoadSpillBlockManager>(
TUniqueId(), UniqueId(tablet_id, txn_id).to_thrift(), kTestDir, nullptr);
ASSERT_OK(block_manager->init());
std::unique_ptr<TabletWriter> tablet_writer = std::make_unique<HorizontalGeneralTabletWriter>(
_tablet_mgr.get(), tablet_id, _tablet_schema, txn_id, false);

View File

@ -35,7 +35,7 @@ protected:
TEST_F(LoadSpillBlockManagerTest, test_basic) {
std::unique_ptr<LoadSpillBlockManager> block_manager =
std::make_unique<LoadSpillBlockManager>(TUniqueId(), TUniqueId(), kTestDir);
std::make_unique<LoadSpillBlockManager>(TUniqueId(), TUniqueId(), kTestDir, nullptr);
ASSERT_OK(block_manager->init());
ASSIGN_OR_ABORT(auto block, block_manager->acquire_block(1024));
ASSERT_OK(block_manager->release_block(block));
@ -43,7 +43,7 @@ TEST_F(LoadSpillBlockManagerTest, test_basic) {
TEST_F(LoadSpillBlockManagerTest, test_write_read) {
std::unique_ptr<LoadSpillBlockManager> block_manager =
std::make_unique<LoadSpillBlockManager>(TUniqueId(), TUniqueId(), kTestDir);
std::make_unique<LoadSpillBlockManager>(TUniqueId(), TUniqueId(), kTestDir, nullptr);
ASSERT_OK(block_manager->init());
ASSIGN_OR_ABORT(auto block, block_manager->acquire_block(1024));
ASSERT_OK(block->append({Slice("hello"), Slice("world")}));

View File

@ -32,6 +32,7 @@ import com.starrocks.catalog.ScalarType;
import com.starrocks.catalog.StructField;
import com.starrocks.catalog.StructType;
import com.starrocks.catalog.Type;
import com.starrocks.common.DdlException;
import com.starrocks.common.FeConstants;
import com.starrocks.common.Pair;
import com.starrocks.connector.ConnectorViewDefinition;
@ -75,11 +76,13 @@ import org.apache.logging.log4j.Logger;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -146,11 +149,12 @@ public class IcebergApiConverter {
return new Schema(icebergSchema.asStructType().fields());
}
public static SortOrder toIcebergSortOrder(Schema schema, List<OrderByElement> orderByElements) {
public static SortOrder toIcebergSortOrder(Schema schema, List<OrderByElement> orderByElements) throws DdlException {
if (orderByElements == null) {
return null;
}
Set<String> addedSortKey = new HashSet<>();
SortOrder.Builder builder = SortOrder.builderFor(schema);
for (OrderByElement orderByElement : orderByElements) {
String columnName = orderByElement.castAsSlotRef();
@ -161,6 +165,9 @@ public class IcebergApiConverter {
} else {
builder.desc(columnName, nullOrder);
}
if (!addedSortKey.add(columnName)) {
throw new DdlException("Duplicate sort key column " + columnName + " is not allowed.");
}
}
SortOrder sortOrder = null;

View File

@ -71,7 +71,9 @@ public class HiveTableSink extends DataSink {
} else {
this.compressionType = sessionVariable.getConnectorSinkCompressionCodec();
}
this.targetMaxFileSize = sessionVariable.getConnectorSinkTargetMaxFileSize();
this.targetMaxFileSize = sessionVariable.getConnectorSinkTargetMaxFileSize() > 0 ?
sessionVariable.getConnectorSinkTargetMaxFileSize() : 1024L * 1024 * 1024;
String catalogName = hiveTable.getCatalogName();
Connector connector = GlobalStateMgr.getCurrentState().getConnectorMgr().getConnector(catalogName);
Preconditions.checkState(connector != null,

View File

@ -59,7 +59,8 @@ public class IcebergTableSink extends DataSink {
this.fileFormat = nativeTable.properties().getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT)
.toLowerCase();
this.compressionType = sessionVariable.getConnectorSinkCompressionCodec();
this.targetMaxFileSize = sessionVariable.getConnectorSinkTargetMaxFileSize();
this.targetMaxFileSize = sessionVariable.getConnectorSinkTargetMaxFileSize() > 0 ?
sessionVariable.getConnectorSinkTargetMaxFileSize() : 1024L * 1024 * 1024;
this.targetBranch = targetBranch;
String catalogName = icebergTable.getCatalogName();
@ -100,6 +101,7 @@ public class IcebergTableSink extends DataSink {
TDataSink tDataSink = new TDataSink(TDataSinkType.ICEBERG_TABLE_SINK);
TIcebergTableSink tIcebergTableSink = new TIcebergTableSink();
tIcebergTableSink.setTarget_table_id(targetTableId);
tIcebergTableSink.setTuple_id(desc.getId().asInt());
tIcebergTableSink.setLocation(location);
tIcebergTableSink.setFile_format(fileFormat);
tIcebergTableSink.setIs_static_partition_sink(isStaticPartitionSink);

View File

@ -27,6 +27,7 @@ import com.starrocks.catalog.PrimitiveType;
import com.starrocks.catalog.ScalarType;
import com.starrocks.catalog.StructType;
import com.starrocks.catalog.Type;
import com.starrocks.common.DdlException;
import com.starrocks.connector.exception.StarRocksConnectorException;
import com.starrocks.connector.hive.RemoteFileInputFormat;
import com.starrocks.persist.ColumnIdExpr;
@ -522,8 +523,12 @@ public class IcebergApiConverterTest {
fields.add(Types.NestedField.optional(3, "data", new Types.StringType()));
Schema schema = new Schema(fields);
SortOrder nullSortOrder = IcebergApiConverter.toIcebergSortOrder(schema, null);
assertEquals(nullSortOrder, null);
try {
SortOrder nullSortOrder = IcebergApiConverter.toIcebergSortOrder(schema, null);
assertEquals(nullSortOrder, null);
} catch (DdlException e) {
assertTrue(false);
}
List<OrderByElement> orderByElements = new ArrayList<>();
Expr expr1 = ColumnIdExpr.fromSql("id").getExpr();
@ -531,9 +536,26 @@ public class IcebergApiConverterTest {
Expr expr2 = ColumnIdExpr.fromSql("dt").getExpr();
orderByElements.add(new OrderByElement(expr2, false, false));
SortOrder sortOrder = IcebergApiConverter.toIcebergSortOrder(schema, orderByElements);
SortOrder sortOrder = null;
try {
sortOrder = IcebergApiConverter.toIcebergSortOrder(schema, orderByElements);
} catch (DdlException e) {
assertTrue(false);
}
List<SortField> sortFields = sortOrder.fields();
assertEquals(sortFields.size(), 2);
// duplicate sorted columns
Expr expr3 = ColumnIdExpr.fromSql("id").getExpr();
orderByElements.add(new OrderByElement(expr3, true, true));
sortOrder = null;
try {
sortOrder = IcebergApiConverter.toIcebergSortOrder(schema, orderByElements);
} catch (DdlException e) {
assertTrue(true);
}
assertEquals(sortOrder, null);
}
}