[Enhancement] Optimize the iceberg sink local sorting based on the spill partition writer (backport #62096) (#62252)

Signed-off-by: GavinMar <yangguansuo@starrocks.com>
Gavin 2025-08-25 13:57:40 +08:00 committed by GitHub
parent bdb0b0e467
commit 66b0f412c4
18 changed files with 897 additions and 51 deletions
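At a glance, the patch threads the Iceberg table's declared sort order (sort key indexes plus per-key ASC/DESC and NULLS FIRST/LAST flags) from the FE's table descriptor (IcebergTable.toThrift fills a new TSortOrder) down through IcebergTableSink, Schema, and the heap-merge iterator used by the spill partition writer, so spilled blocks are merged back in the declared order instead of the previous implicit ascending, nulls-first order. Below is a minimal, self-consistent sketch of the per-key descriptor the diff relies on; the real SortDesc/SortDescs live in exec/sorting/sorting.h, and the integer encodings here are assumptions for illustration only.

// Hedged sketch only: shape of the per-key sort descriptor this patch threads
// through Schema and the merge iterator. Real definitions live in
// exec/sorting/sorting.h; the encodings below are illustrative assumptions.
#include <vector>

struct SortDesc {
    int sort_order;  // 1 = ascending, -1 = descending
    int null_first;  // -1 = nulls first, 1 = nulls last (assumed encoding)

    SortDesc(bool is_asc, bool is_null_first)
            : sort_order(is_asc ? 1 : -1), null_first(is_null_first ? -1 : 1) {}

    // Direction handed to Column::compare_at, chosen so that nulls land where
    // null_first says even after the result is multiplied by sort_order.
    int nan_direction() const { return null_first * sort_order; }
};

struct SortDescs {
    std::vector<SortDesc> descs;  // one entry per sort key index
};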

View File

@ -17,6 +17,8 @@
#include <algorithm>
#include <utility>
#include "exec/sorting/sorting.h"
namespace starrocks {
#ifdef BE_TEST
@ -28,8 +30,13 @@ Schema::Schema(Fields fields) : Schema(fields, KeysType::DUP_KEYS, {}) {
#endif
Schema::Schema(Fields fields, KeysType keys_type, std::vector<ColumnId> sort_key_idxes)
: Schema(std::move(fields), keys_type, std::move(sort_key_idxes), nullptr) {}
Schema::Schema(Fields fields, KeysType keys_type, std::vector<ColumnId> sort_key_idxes,
std::shared_ptr<SortDescs> sort_descs)
: _fields(std::move(fields)),
_sort_key_idxes(std::move(sort_key_idxes)),
_sort_descs(std::move(sort_descs)),
_name_to_index_append_buffer(nullptr),
_keys_type(static_cast<uint8_t>(keys_type)) {
@ -52,9 +59,16 @@ Schema::Schema(Schema* schema, const std::vector<ColumnId>& cids)
_fields[i] = schema->_fields[cids[i]];
cids_to_field_id[cids[i]] = i;
}
for (auto idx : ori_sort_idxes) {
if (schema->sort_descs()) {
_sort_descs = std::make_shared<SortDescs>();
}
for (size_t pos = 0; pos < ori_sort_idxes.size(); ++pos) {
auto idx = ori_sort_idxes[pos];
if (cids_to_field_id.count(idx) > 0) {
_sort_key_idxes.emplace_back(cids_to_field_id[idx]);
if (_sort_descs && pos < schema->sort_descs()->descs.size()) {
_sort_descs->descs.emplace_back(schema->sort_descs()->descs[pos]);
}
}
}
auto is_key = [](const FieldPtr& f) { return f->is_key(); };
@ -88,6 +102,7 @@ Schema::Schema(Schema* schema)
_fields[i] = schema->_fields[i];
}
_sort_key_idxes = schema->sort_key_idxes();
_sort_descs = schema->sort_descs();
if (schema->_name_to_index_append_buffer == nullptr) {
// share the name_to_index with schema, later append fields will be added to _name_to_index_append_buffer
schema->_share_name_to_index = true;
@ -109,6 +124,7 @@ Schema::Schema(const Schema& schema)
_fields[i] = schema._fields[i];
}
_sort_key_idxes = schema.sort_key_idxes();
_sort_descs = schema.sort_descs();
if (schema._name_to_index_append_buffer == nullptr) {
// share the name_to_index with schema&, later append fields will be added to _name_to_index_append_buffer
schema._share_name_to_index = true;
@ -132,6 +148,7 @@ Schema& Schema::operator=(const Schema& other) {
this->_fields[i] = other._fields[i];
}
this->_sort_key_idxes = other.sort_key_idxes();
this->_sort_descs = other.sort_descs();
if (other._name_to_index_append_buffer == nullptr) {
// share the name_to_index with schema&, later append fields will be added to _name_to_index_append_buffer
other._share_name_to_index = true;
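Worth spelling out about the projection constructor in the hunks above: it keeps _sort_key_idxes and _sort_descs->descs index-aligned. When a sort key column is not in the projected cids, both the key and its descriptor at the same position are dropped. A standalone illustration of that alignment rule, using chars in place of SortDesc (hypothetical helper, not part of the patch):

// Standalone illustration (hypothetical, not in the patch): project a parent
// schema's sort keys onto a column subset, carrying each key's descriptor along.
#include <cstdint>
#include <unordered_map>
#include <vector>

struct ProjectedKeys {
    std::vector<uint32_t> sort_key_idxes;
    std::vector<char> sort_descs;  // 'A' = ASC, 'D' = DESC, stand-in for SortDesc
};

ProjectedKeys project_sort_keys(const std::vector<uint32_t>& parent_keys,
                                const std::vector<char>& parent_descs,
                                const std::vector<uint32_t>& cids) {
    std::unordered_map<uint32_t, uint32_t> cid_to_field;
    for (uint32_t i = 0; i < cids.size(); ++i) {
        cid_to_field[cids[i]] = i;
    }
    ProjectedKeys out;
    for (size_t pos = 0; pos < parent_keys.size(); ++pos) {
        auto it = cid_to_field.find(parent_keys[pos]);
        if (it == cid_to_field.end()) {
            continue;  // key column not projected: drop its descriptor too
        }
        out.sort_key_idxes.push_back(it->second);
        if (pos < parent_descs.size()) {
            out.sort_descs.push_back(parent_descs[pos]);
        }
    }
    return out;
}

// project_sort_keys({2, 5}, {'A', 'D'}, {5, 7}) yields keys {0} and descs {'D'}:
// column 2 and its ASC descriptor are dropped together.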

View File

@ -24,6 +24,8 @@
namespace starrocks {
struct SortDescs;
// TODO: move constructor and move assignment
class Schema {
public:
@ -39,6 +41,9 @@ public:
explicit Schema(Fields fields, KeysType keys_type, std::vector<ColumnId> sort_key_idxes);
explicit Schema(Fields fields, KeysType keys_type, std::vector<ColumnId> sort_key_idxes,
std::shared_ptr<SortDescs> sort_descs);
// if we use this constructor and share the name_to_index with another schema,
// we must make sure the other schema is read only!!!
explicit Schema(Schema* schema);
@ -61,6 +66,10 @@ public:
const std::vector<ColumnId> sort_key_idxes() const { return _sort_key_idxes; }
void append_sort_key_idx(ColumnId idx) { _sort_key_idxes.emplace_back(idx); }
void set_sort_key_idxes(const std::vector<ColumnId>& sort_key_idxes) { _sort_key_idxes = sort_key_idxes; }
std::shared_ptr<SortDescs> sort_descs() const { return _sort_descs; }
void set_sort_descs(const std::shared_ptr<SortDescs>& sort_descs) { _sort_descs = sort_descs; }
void reserve(size_t size) { _fields.reserve(size); }
@ -133,6 +142,7 @@ private:
Fields _fields;
size_t _num_keys = 0;
std::vector<ColumnId> _sort_key_idxes;
std::shared_ptr<SortDescs> _sort_descs;
std::shared_ptr<std::unordered_map<std::string_view, size_t>> _name_to_index;
// If we share the same _name_to_index with another vectorized schema,
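For reference, a hedged usage sketch of the new Schema surface (type names are taken from this diff; include paths are approximate and should be checked against the tree). It mirrors what SpillPartitionChunkWriter::_make_schema does in the next file:

#include "column/schema.h"         // Schema, Fields (path approximate)
#include "exec/sorting/sorting.h"  // SortDescs

namespace starrocks {

// Build a DUP_KEYS schema sorted on field 0, ascending with nulls last.
SchemaPtr make_sorted_schema(Fields fields) {
    auto descs = std::make_shared<SortDescs>();
    descs->descs.emplace_back(/*is_asc=*/true, /*is_null_first=*/false);
    return std::make_shared<Schema>(std::move(fields), KeysType::DUP_KEYS,
                                    std::vector<ColumnId>{0}, descs);
}

} // namespace starrocks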

View File

@ -252,7 +252,7 @@ Status SpillPartitionChunkWriter::_flush_to_file() {
Status SpillPartitionChunkWriter::_flush_chunk(Chunk* chunk, bool split) {
if (chunk->get_slot_id_to_index_map().empty()) {
auto slot_map = _base_chunk->get_slot_id_to_index_map();
auto& slot_map = _base_chunk->get_slot_id_to_index_map();
for (auto& it : slot_map) {
chunk->set_slot_id_to_index(it.first, it.second);
}
@ -317,17 +317,26 @@ SchemaPtr SpillPartitionChunkWriter::_make_schema() {
auto field = std::make_shared<Field>(slot->id(), slot->col_name(), type_info, slot->is_nullable());
fields.push_back(field);
}
SchemaPtr schema =
std::make_shared<Schema>(std::move(fields), KeysType::DUP_KEYS,
_sort_ordering ? _sort_ordering->sort_key_idxes : std::vector<uint32_t>());
SchemaPtr schema;
if (_sort_ordering) {
schema = std::make_shared<Schema>(std::move(fields), KeysType::DUP_KEYS, _sort_ordering->sort_key_idxes,
std::make_shared<SortDescs>(_sort_ordering->sort_descs));
} else {
schema = std::make_shared<Schema>(std::move(fields), KeysType::DUP_KEYS, std::vector<uint32_t>(), nullptr);
}
return schema;
}
ChunkPtr SpillPartitionChunkWriter::_create_schema_chunk(const ChunkPtr& base_chunk, size_t num_rows) {
if (!_schema) {
auto schema = base_chunk->schema();
const SchemaPtr& schema = base_chunk->schema();
if (schema) {
_schema = schema;
if (_sort_ordering) {
_schema->set_sort_key_idxes(_sort_ordering->sort_key_idxes);
_schema->set_sort_descs(std::make_shared<SortDescs>(_sort_ordering->sort_descs));
}
} else {
_schema = _make_schema();
}

View File

@ -35,7 +35,11 @@ bool SinkOperatorMemoryManager::kill_victim() {
// For spillable partition writers, choose the writer with the largest amount of memory that can be spilled.
PartitionChunkWriterPtr victim = nullptr;
for (auto& [key, writer] : *_candidates) {
if (victim && victim->get_flushable_bytes() > writer->get_flushable_bytes()) {
int64_t flushable_bytes = writer->get_flushable_bytes();
if (flushable_bytes == 0) {
continue;
}
if (victim && flushable_bytes < victim->get_flushable_bytes()) {
continue;
}
victim = writer;
@ -60,7 +64,7 @@ int64_t SinkOperatorMemoryManager::update_releasable_memory() {
int64_t SinkOperatorMemoryManager::update_writer_occupied_memory() {
int64_t writer_occupied_memory = 0;
for (auto& [_, writer] : *_candidates) {
writer_occupied_memory += writer->get_written_bytes();
writer_occupied_memory += writer->get_flushable_bytes();
}
_writer_occupied_memory.store(writer_occupied_memory);
return _writer_occupied_memory;
@ -113,33 +117,23 @@ bool SinkMemoryManager::_apply_on_mem_tracker(SinkOperatorMemoryManager* child_m
auto available_memory = [&]() { return mem_tracker->limit() - mem_tracker->consumption(); };
auto low_watermark = static_cast<int64_t>(mem_tracker->limit() * _low_watermark_ratio);
auto exceed_urgent_space = [&]() {
return _total_writer_occupied_memory() > _query_tracker->limit() * _urgent_space_ratio;
};
if (available_memory() <= low_watermark) {
child_manager->update_releasable_memory();
int64_t flush_watermark = _query_tracker->limit() * _urgent_space_ratio;
while (available_memory() <= low_watermark) {
child_manager->update_writer_occupied_memory();
int64_t total_occupied_memory = _total_writer_occupied_memory();
LOG_EVERY_SECOND(WARNING) << "consumption: " << mem_tracker->consumption()
<< " releasable_memory: " << _total_releasable_memory()
<< " writer_allocated_memory: " << _total_writer_occupied_memory();
// trigger early close
while (exceed_urgent_space() && available_memory() <= low_watermark) {
bool found = child_manager->kill_victim();
if (!found) {
break;
}
child_manager->update_releasable_memory();
child_manager->update_writer_occupied_memory();
<< ", writer_allocated_memory: " << total_occupied_memory
<< ", flush_watermark: " << flush_watermark;
if (total_occupied_memory < flush_watermark) {
break;
}
bool found = child_manager->kill_victim();
if (!found) {
break;
}
}
child_manager->update_releasable_memory();
if (available_memory() <= low_watermark && _total_releasable_memory() > 0) {
return false;
}
return true;
return available_memory() > low_watermark;
}
} // namespace starrocks::connector
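Reading the reworked hunk as a whole: _apply_on_mem_tracker now computes one flush_watermark (query_limit * urgent_space_ratio) up front and, while the tracker sits at or below the low watermark, keeps killing the victim with the most flushable bytes until either total writer-occupied memory drops under that watermark or no victim remains; it then reports success only if memory actually recovered. A simplified, illustrative skeleton of that control flow with the trackers replaced by plain integers (the freed-bytes stand-in is hypothetical):

#include <cstdint>

// Illustrative skeleton only; the real code consults MemTracker and the writer map.
bool apply_on_mem_tracker_sketch(int64_t limit, int64_t& consumption, int64_t& occupied,
                                 double low_watermark_ratio, double urgent_space_ratio,
                                 int64_t query_limit) {
    const auto low_watermark = static_cast<int64_t>(limit * low_watermark_ratio);
    const auto flush_watermark = static_cast<int64_t>(query_limit * urgent_space_ratio);
    while (limit - consumption <= low_watermark) {
        if (occupied < flush_watermark) {
            break;  // writers hold too little flushable data to be worth killing
        }
        // kill_victim(): flush the writer with the largest flushable byte count.
        const int64_t freed = occupied / 2;  // hypothetical stand-in for the victim's bytes
        if (freed == 0) {
            break;  // no victim found
        }
        occupied -= freed;
        consumption -= freed;
    }
    return limit - consumption > low_watermark;  // false -> caller must back off
}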

View File

@ -240,6 +240,9 @@ IcebergTableDescriptor::IcebergTableDescriptor(const TTableDescriptor& tdesc, Ob
_source_column_names = tdesc.icebergTable.partition_column_names; //to compat with lower fe, set this also
_partition_column_names = tdesc.icebergTable.partition_column_names;
}
if (tdesc.icebergTable.__isset.sort_order) {
_t_sort_order = tdesc.icebergTable.sort_order;
}
}
std::vector<int32_t> IcebergTableDescriptor::partition_source_index_in_schema() {

View File

@ -251,6 +251,7 @@ public:
const std::vector<std::string> full_column_names();
std::vector<int32_t> partition_source_index_in_schema();
bool has_base_path() const override { return true; }
const TSortOrder& sort_order() const { return _t_sort_order; }
Status set_partition_desc_map(const TIcebergTable& thrift_table, ObjectPool* pool);
@ -260,6 +261,7 @@ private:
std::vector<std::string> _partition_column_names;
std::vector<std::string> _transform_exprs;
std::vector<TExpr> _partition_exprs;
TSortOrder _t_sort_order;
};
class FileTableDescriptor : public HiveTableDescriptor {

View File

@ -81,6 +81,25 @@ Status IcebergTableSink::decompose_to_pipeline(pipeline::OpFactories prev_operat
sink_ctx->transform_exprs = iceberg_table_desc->get_transform_exprs();
sink_ctx->fragment_context = fragment_ctx;
sink_ctx->tuple_desc_id = t_iceberg_sink.tuple_id;
auto& sort_order = iceberg_table_desc->sort_order();
if (!sort_order.sort_key_idxes.empty()) {
sink_ctx->sort_ordering = std::make_shared<connector::SortOrdering>();
sink_ctx->sort_ordering->sort_key_idxes.assign(sort_order.sort_key_idxes.begin(),
sort_order.sort_key_idxes.end());
sink_ctx->sort_ordering->sort_descs.descs.reserve(sort_order.sort_key_idxes.size());
for (size_t idx = 0; idx < sort_order.sort_key_idxes.size(); ++idx) {
bool is_asc = idx < sort_order.is_ascs.size() ? sort_order.is_ascs[idx] : true;
bool is_null_first = false;
if (idx < sort_order.is_null_firsts.size()) {
is_null_first = sort_order.is_null_firsts[idx];
} else if (is_asc) {
// If ascending, nulls are first by default
// If descending, nulls are last by default
is_null_first = true;
}
sink_ctx->sort_ordering->sort_descs.descs.emplace_back(is_asc, is_null_first);
}
}
auto connector = connector::ConnectorManager::default_instance()->get(connector::Connector::ICEBERG);
auto sink_provider = connector->create_data_sink_provider();
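The fallback branch above encodes Iceberg's documented defaults: when the FE sends no explicit null ordering for a key, an ascending field defaults to NULLS FIRST and a descending field to NULLS LAST. As a one-line statement of the rule (hypothetical helper, not in the patch):

// Iceberg default null ordering: ASC -> NULLS FIRST, DESC -> NULLS LAST.
inline bool default_null_first(bool is_asc) { return is_asc; }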

View File

@ -21,18 +21,36 @@
#include "column/chunk.h"
#include "common/config.h"
#include "exec/sorting/sorting.h"
#include "gutil/strings/substitute.h"
#include "storage/chunk_helper.h"
namespace starrocks {
// Compare the row of index |m| in |lhs|, with the row of index |n| in |rhs|.
inline int compare_chunk(size_t key_columns, const std::vector<uint32_t>& sort_key_idxes, const Chunk& lhs, size_t m,
const Chunk& rhs, size_t n, const std::string& merge_condition) {
for (unsigned int sort_key_idx : sort_key_idxes) {
inline int compare_column(const ColumnPtr& lc, size_t m, const ColumnPtr& rc, size_t n, const SortDesc* sort_desc) {
int sort_order = 1;
int nan_direction = -1;
if (sort_desc) {
sort_order = sort_desc->sort_order;
nan_direction = sort_desc->nan_direction();
}
return lc->compare_at(m, n, *rc, nan_direction) * sort_order;
}
// Compare the row of index |m| in |lhs|, with the row of index |n| in |rhs|.
inline int compare_chunk(size_t key_columns, const std::vector<uint32_t>& sort_key_idxes,
const std::shared_ptr<SortDescs>& sort_descs, const Chunk& lhs, size_t m, const Chunk& rhs,
size_t n, const std::string& merge_condition) {
for (size_t pos = 0; pos < sort_key_idxes.size(); ++pos) {
uint32_t sort_key_idx = sort_key_idxes[pos];
const ColumnPtr& lc = lhs.get_column_by_index(sort_key_idx);
const ColumnPtr& rc = rhs.get_column_by_index(sort_key_idx);
if (int r = lc->compare_at(m, n, *rc, -1); r != 0) {
SortDesc* sort_desc = nullptr;
if (sort_descs && pos < sort_descs->descs.size()) {
sort_desc = &sort_descs->descs[pos];
}
if (int r = compare_column(lc, m, rc, n, sort_desc); r != 0) {
return r;
}
}
@ -42,7 +60,7 @@ inline int compare_chunk(size_t key_columns, const std::vector<uint32_t>& sort_k
if (!merge_condition.empty() && lhs.columns().size() > key_columns) {
const ColumnPtr& lc = lhs.get_column_by_index(key_columns);
const ColumnPtr& rc = rhs.get_column_by_index(key_columns);
if (int r = lc->compare_at(m, n, *rc, -1); r != 0) {
if (int r = compare_column(lc, m, rc, n, nullptr); r != 0) {
return r;
}
}
@ -74,23 +92,26 @@ protected:
class ComparableChunk : public MergingChunk {
public:
explicit ComparableChunk(Chunk* chunk, size_t order, size_t key_columns, std::vector<uint32_t> sort_key_idxes,
std::string merge_condition)
std::shared_ptr<SortDescs> sort_descs, std::string merge_condition)
: MergingChunk(chunk),
_order(order),
_key_columns(key_columns),
_sort_key_idxes(std::move(sort_key_idxes)),
_sort_descs(std::move(sort_descs)),
_merge_condition(std::move(merge_condition)) {}
explicit ComparableChunk(Chunk* chunk, size_t order, size_t key_columns, std::vector<uint32_t> sort_key_idxes,
std::string merge_condition, std::shared_ptr<std::vector<uint64_t>> rssid_rowids)
: ComparableChunk(chunk, order, key_columns, std::move(sort_key_idxes), std::move(merge_condition)) {
std::shared_ptr<SortDescs> sort_descs, std::string merge_condition,
std::shared_ptr<std::vector<uint64_t>> rssid_rowids)
: ComparableChunk(chunk, order, key_columns, std::move(sort_key_idxes), std::move(sort_descs),
std::move(merge_condition)) {
_rssid_rowids = std::move(rssid_rowids);
}
bool operator>(const ComparableChunk& rhs) const {
DCHECK_EQ(_key_columns, rhs._key_columns);
int r = compare_chunk(_key_columns, _sort_key_idxes, *_chunk, _compared_row, *rhs._chunk, rhs._compared_row,
_merge_condition);
int r = compare_chunk(_key_columns, _sort_key_idxes, _sort_descs, *_chunk, _compared_row, *rhs._chunk,
rhs._compared_row, _merge_condition);
return (r > 0) | ((r == 0) & (_order > rhs._order));
}
@ -115,8 +136,8 @@ public:
}
bool less_than(size_t lhs_row, const ComparableChunk& rhs) {
int r = compare_chunk(_key_columns, _sort_key_idxes, *_chunk, lhs_row, *rhs._chunk, rhs._compared_row,
_merge_condition);
int r = compare_chunk(_key_columns, _sort_key_idxes, _sort_descs, *_chunk, lhs_row, *rhs._chunk,
rhs._compared_row, _merge_condition);
return (r < 0) | ((r == 0) & (_order < rhs._order));
}
@ -127,6 +148,7 @@ private:
uint16_t _order;
uint16_t _key_columns;
std::vector<uint32_t> _sort_key_idxes;
std::shared_ptr<SortDescs> _sort_descs;
std::string _merge_condition;
std::shared_ptr<std::vector<uint64_t>> _rssid_rowids;
};
@ -354,10 +376,10 @@ inline Status HeapMergeIterator::fill(size_t child) {
}
if (need_rssid_rowids) {
_heap.push(ComparableChunk{chunk, child, _schema.num_key_fields(), _schema.sort_key_idxes(),
merge_condition, std::move(rssid_rowids)});
_schema.sort_descs(), merge_condition, std::move(rssid_rowids)});
} else {
_heap.push(
ComparableChunk{chunk, child, _schema.num_key_fields(), _schema.sort_key_idxes(), merge_condition});
_heap.push(ComparableChunk{chunk, child, _schema.num_key_fields(), _schema.sort_key_idxes(),
_schema.sort_descs(), merge_condition});
}
} else if (st.is_end_of_file()) {
// ignore Status::EndOfFile.
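The sort_order multiplier in compare_column is the entire descending mechanism: compare once in ascending terms, then flip the sign of the result. A standalone demonstration of the idiom on plain integers (no StarRocks types involved):

#include <cassert>

// One three-way comparison serves both directions once multiplied by sort_order.
inline int three_way(int a, int b) { return (a > b) - (a < b); }

inline int compare_with_order(int a, int b, int sort_order /* 1 = ASC, -1 = DESC */) {
    return three_way(a, b) * sort_order;
}

int main() {
    assert(compare_with_order(1, 2, 1) < 0);   // ascending: 1 sorts before 2
    assert(compare_with_order(1, 2, -1) > 0);  // descending: 1 sorts after 2
    return 0;
}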

View File

@ -35,6 +35,8 @@ set(EXEC_FILES
./connector_sink/file_chunk_sink_test.cpp
./connector_sink/partition_chunk_writer_test.cpp
./connector_sink/async_flush_output_stream_test.cpp
./connector_sink/iceberg_table_sink_test.cpp
./connector_sink/sink_memory_manager_test.cpp
./fs/azure/azblob_uri_test.cpp
./fs/azure/fs_azblob_test.cpp
./fs/azure/utils_test.cpp

View File

@ -0,0 +1,98 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "runtime/iceberg_table_sink.h"
#include <gmock/gmock.h>
#include <gtest/gtest-param-test.h>
#include <gtest/gtest.h>
#include <future>
#include <thread>
#include "exec/pipeline/empty_set_operator.h"
#include "exec/pipeline/fragment_context.h"
#include "runtime/descriptor_helper.h"
#include "testutil/assert.h"
namespace starrocks {
class IcebergTableSinkTest : public ::testing::Test {
protected:
void SetUp() override {
_fragment_context = std::make_shared<pipeline::FragmentContext>();
_fragment_context->set_runtime_state(std::make_shared<RuntimeState>());
_runtime_state = _fragment_context->runtime_state();
}
void TearDown() override {}
ObjectPool _pool;
std::shared_ptr<pipeline::FragmentContext> _fragment_context;
RuntimeState* _runtime_state;
};
TEST_F(IcebergTableSinkTest, decompose_to_pipeline) {
TDescriptorTableBuilder table_desc_builder;
TSlotDescriptorBuilder slot_desc_builder;
auto slot1 = slot_desc_builder.type(LogicalType::TYPE_INT).column_name("c1").column_pos(0).nullable(true).build();
TTupleDescriptorBuilder tuple_desc_builder;
tuple_desc_builder.add_slot(slot1);
tuple_desc_builder.build(&table_desc_builder);
DescriptorTbl* tbl = nullptr;
EXPECT_OK(DescriptorTbl::create(_runtime_state, &_pool, table_desc_builder.desc_tbl(), &tbl,
config::vector_chunk_size));
_runtime_state->set_desc_tbl(tbl);
TIcebergTable t_iceberg_table;
TColumn t_column;
t_column.__set_column_name("c1");
t_iceberg_table.__set_columns({t_column});
TSortOrder sort_order;
sort_order.__set_sort_key_idxes({0});
sort_order.__set_is_ascs({true});
sort_order.__set_is_null_firsts({true});
t_iceberg_table.__set_sort_order(sort_order);
TTableDescriptor tdesc;
tdesc.__set_icebergTable(t_iceberg_table);
IcebergTableDescriptor* ice_table_desc = _pool.add(new IcebergTableDescriptor(tdesc, &_pool));
tbl->get_tuple_descriptor(0)->set_table_desc(ice_table_desc);
tbl->_tbl_desc_map[0] = ice_table_desc;
auto context = std::make_shared<pipeline::PipelineBuilderContext>(_fragment_context.get(), 1, 1, false);
TDataSink data_sink;
TIcebergTableSink iceberg_table_sink;
data_sink.iceberg_table_sink = iceberg_table_sink;
std::vector<starrocks::TExpr> exprs = {};
IcebergTableSink sink(&_pool, exprs);
auto connector = connector::ConnectorManager::default_instance()->get(connector::Connector::ICEBERG);
auto sink_provider = connector->create_data_sink_provider();
pipeline::OpFactories prev_operators{std::make_shared<pipeline::EmptySetOperatorFactory>(1, 1)};
EXPECT_OK(sink.decompose_to_pipeline(prev_operators, data_sink, context.get()));
pipeline::Pipeline* pl = const_cast<pipeline::Pipeline*>(context->last_pipeline());
pipeline::OperatorFactory* op_factory = pl->sink_operator_factory();
auto connector_sink_factory = dynamic_cast<pipeline::ConnectorSinkOperatorFactory*>(op_factory);
auto sink_ctx = dynamic_cast<connector::IcebergChunkSinkContext*>(connector_sink_factory->_sink_context.get());
EXPECT_EQ(sink_ctx->sort_ordering->sort_key_idxes.size(), 1);
EXPECT_EQ(sink_ctx->sort_ordering->sort_descs.descs.size(), 1);
}
} // namespace starrocks

View File

@ -477,6 +477,389 @@ TEST_F(PartitionChunkWriterTest, sort_column_asc) {
last_row = cur_row;
}
}
std::filesystem::remove_all(fs_base_path);
}
TEST_F(PartitionChunkWriterTest, sort_column_desc) {
std::string fs_base_path = "base_path";
std::filesystem::create_directories(fs_base_path + "/c1");
parquet::Utils::SlotDesc slot_descs[] = {{"c1", TYPE_VARCHAR_DESC}, {""}};
TupleDescriptor* tuple_desc =
parquet::Utils::create_tuple_descriptor(_fragment_context->runtime_state(), &_pool, slot_descs);
auto writer_helper = WriterHelper::instance();
bool commited = false;
Status status;
// Create partition writer
auto mock_writer_factory = std::make_shared<MockFileWriterFactory>();
auto location_provider = std::make_shared<LocationProvider>(fs_base_path, "ffffff", 0, 0, "parquet");
EXPECT_CALL(*mock_writer_factory, create(::testing::_)).WillRepeatedly([](const std::string&) {
WriterAndStream ws;
ws.writer = std::make_unique<MockWriter>();
ws.stream = std::make_unique<Stream>(std::make_unique<MockFile>(), nullptr, nullptr);
return ws;
});
auto sort_ordering = std::make_shared<SortOrdering>();
sort_ordering->sort_key_idxes = {0};
sort_ordering->sort_descs.descs.emplace_back(false, false);
const size_t max_file_size = 1073741824; // 1GB
auto partition_chunk_writer_ctx = std::make_shared<SpillPartitionChunkWriterContext>(
SpillPartitionChunkWriterContext{mock_writer_factory, location_provider, max_file_size, false,
_fragment_context.get(), tuple_desc, sort_ordering});
auto partition_chunk_writer_factory =
std::make_unique<SpillPartitionChunkWriterFactory>(partition_chunk_writer_ctx);
std::vector<int8_t> partition_field_null_list;
auto partition_writer = std::dynamic_pointer_cast<SpillPartitionChunkWriter>(
partition_chunk_writer_factory->create("c1", partition_field_null_list));
auto commit_callback = [&commited](const CommitResult& r) { commited = true; };
auto error_handler = [&status](const Status& s) { status = s; };
auto poller = MockPoller();
partition_writer->set_io_poller(&poller);
partition_writer->set_commit_callback(commit_callback);
partition_writer->set_error_handler(error_handler);
EXPECT_OK(partition_writer->init());
// Normal write and flush to file
{
writer_helper->reset();
for (size_t i = 0; i < 3; ++i) {
// Create a chunk
ChunkPtr chunk = ChunkHelper::new_chunk(*tuple_desc, 3);
std::string suffix = std::to_string(3 - i);
chunk->get_column_by_index(0)->append_datum(Slice("ccc" + suffix));
chunk->get_column_by_index(0)->append_datum(Slice("bbb" + suffix));
chunk->get_column_by_index(0)->append_datum(Slice("aaa" + suffix));
// Write chunk
auto ret = partition_writer->write(chunk.get());
EXPECT_EQ(ret.ok(), true);
EXPECT_GT(partition_writer->get_written_bytes(), 0);
}
// Flush chunks directly
auto ret = partition_writer->finish();
EXPECT_EQ(ret.ok(), true);
EXPECT_EQ(commited, true);
EXPECT_EQ(status.ok(), true);
EXPECT_EQ(writer_helper->written_rows(), 0);
EXPECT_EQ(writer_helper->result_rows(), 9);
EXPECT_EQ(writer_helper->result_chunks().size(), 1);
// Check the result order
auto result_chunk = writer_helper->result_chunks()[0];
auto column = result_chunk->get_column_by_index(0);
std::string last_row;
for (size_t i = 0; i < column->size(); ++i) {
std::string cur_row = column->get(i).get_slice().to_string();
LOG(INFO) << "(" << i << "): " << cur_row;
if (!last_row.empty()) {
EXPECT_LT(cur_row, last_row);
}
last_row = cur_row;
}
}
// Write and spill multiple chunks
{
writer_helper->reset();
commited = false;
status = Status::OK();
for (size_t i = 0; i < 3; ++i) {
// Create a chunk
ChunkPtr chunk = ChunkHelper::new_chunk(*tuple_desc, 3);
std::string suffix = std::to_string(3 - i);
chunk->get_column_by_index(0)->append_datum(Slice("ccc" + suffix));
chunk->get_column_by_index(0)->append_datum(Slice("bbb" + suffix));
chunk->get_column_by_index(0)->append_datum(Slice("aaa" + suffix));
// Write chunk
auto ret = partition_writer->write(chunk.get());
EXPECT_EQ(ret.ok(), true);
EXPECT_GT(partition_writer->get_written_bytes(), 0);
// Flush chunk
ret = partition_writer->_spill();
EXPECT_EQ(ret.ok(), true);
Awaitility()
.timeout(3 * 1000 * 1000) // 3s
.interval(300 * 1000) // 300ms
.until([partition_writer]() {
return partition_writer->_spilling_bytes_usage.load(std::memory_order_relaxed) == 0;
});
EXPECT_EQ(partition_writer->_spilling_bytes_usage.load(std::memory_order_relaxed), 0);
EXPECT_EQ(status.ok(), true);
}
// Merge spill blocks
auto ret = partition_writer->finish();
EXPECT_EQ(ret.ok(), true);
Awaitility()
.timeout(3 * 1000 * 1000) // 3s
.interval(300 * 1000) // 300ms
.until([&commited]() { return commited; });
EXPECT_EQ(commited, true);
EXPECT_EQ(status.ok(), true);
EXPECT_EQ(writer_helper->written_rows(), 0);
EXPECT_EQ(writer_helper->result_rows(), 9);
EXPECT_EQ(writer_helper->result_chunks().size(), 1);
// Check the result order
auto result_chunk = writer_helper->result_chunks()[0];
auto column = result_chunk->get_column_by_index(0);
std::string last_row;
for (size_t i = 0; i < column->size(); ++i) {
std::string cur_row = column->get(i).get_slice().to_string();
LOG(INFO) << "(" << i << "): " << cur_row;
if (!last_row.empty()) {
EXPECT_LT(cur_row, last_row);
}
last_row = cur_row;
}
}
std::filesystem::remove_all(fs_base_path);
}
TEST_F(PartitionChunkWriterTest, sort_multiple_columns) {
std::string fs_base_path = "base_path";
std::filesystem::create_directories(fs_base_path + "/c1");
parquet::Utils::SlotDesc slot_descs[] = {{"c1", TYPE_VARCHAR_DESC}, {"c2", TYPE_VARCHAR_DESC}, {""}};
TupleDescriptor* tuple_desc =
parquet::Utils::create_tuple_descriptor(_fragment_context->runtime_state(), &_pool, slot_descs);
auto writer_helper = WriterHelper::instance();
bool commited = false;
Status status;
// Create partition writer
auto mock_writer_factory = std::make_shared<MockFileWriterFactory>();
auto location_provider = std::make_shared<LocationProvider>(fs_base_path, "ffffff", 0, 0, "parquet");
EXPECT_CALL(*mock_writer_factory, create(::testing::_)).WillRepeatedly([](const std::string&) {
WriterAndStream ws;
ws.writer = std::make_unique<MockWriter>();
ws.stream = std::make_unique<Stream>(std::make_unique<MockFile>(), nullptr, nullptr);
return ws;
});
auto sort_ordering = std::make_shared<SortOrdering>();
sort_ordering->sort_key_idxes = {0, 1};
sort_ordering->sort_descs.descs.emplace_back(true, false);
sort_ordering->sort_descs.descs.emplace_back(false, false);
const size_t max_file_size = 1073741824; // 1GB
auto partition_chunk_writer_ctx = std::make_shared<SpillPartitionChunkWriterContext>(
SpillPartitionChunkWriterContext{mock_writer_factory, location_provider, max_file_size, false,
_fragment_context.get(), tuple_desc, sort_ordering});
auto partition_chunk_writer_factory =
std::make_unique<SpillPartitionChunkWriterFactory>(partition_chunk_writer_ctx);
std::vector<int8_t> partition_field_null_list;
auto partition_writer = std::dynamic_pointer_cast<SpillPartitionChunkWriter>(
partition_chunk_writer_factory->create("c1", partition_field_null_list));
auto commit_callback = [&commited](const CommitResult& r) { commited = true; };
auto error_handler = [&status](const Status& s) { status = s; };
auto poller = MockPoller();
partition_writer->set_io_poller(&poller);
partition_writer->set_commit_callback(commit_callback);
partition_writer->set_error_handler(error_handler);
EXPECT_OK(partition_writer->init());
// Write and spill multiple chunks
{
writer_helper->reset();
for (size_t i = 0; i < 3; ++i) {
// Create a chunk
ChunkPtr chunk = ChunkHelper::new_chunk(*tuple_desc, 3);
std::string suffix = std::to_string(3 - i);
chunk->get_column_by_index(0)->append_datum(Slice("ccc" + suffix));
chunk->get_column_by_index(1)->append_datum(Slice("222" + suffix));
chunk->get_column_by_index(0)->append_datum(Slice("ccc" + suffix));
chunk->get_column_by_index(1)->append_datum(Slice("111" + suffix));
chunk->get_column_by_index(0)->append_datum(Slice("bbb" + suffix));
chunk->get_column_by_index(1)->append_datum(Slice("222" + suffix));
chunk->get_column_by_index(0)->append_datum(Slice("bbb" + suffix));
chunk->get_column_by_index(1)->append_datum(Slice("111" + suffix));
chunk->get_column_by_index(0)->append_datum(Slice("aaa" + suffix));
chunk->get_column_by_index(1)->append_datum(Slice("222" + suffix));
chunk->get_column_by_index(0)->append_datum(Slice("aaa" + suffix));
chunk->get_column_by_index(1)->append_datum(Slice("111" + suffix));
// Write chunk
auto ret = partition_writer->write(chunk.get());
EXPECT_EQ(ret.ok(), true);
EXPECT_GT(partition_writer->get_written_bytes(), 0);
// Flush chunk
ret = partition_writer->_spill();
EXPECT_EQ(ret.ok(), true);
Awaitility()
.timeout(3 * 1000 * 1000) // 3s
.interval(300 * 1000) // 300ms
.until([partition_writer]() {
return partition_writer->_spilling_bytes_usage.load(std::memory_order_relaxed) == 0;
});
EXPECT_EQ(partition_writer->_spilling_bytes_usage.load(std::memory_order_relaxed), 0);
EXPECT_EQ(status.ok(), true);
}
// Merge spill blocks
auto ret = partition_writer->finish();
EXPECT_EQ(ret.ok(), true);
Awaitility()
.timeout(3 * 1000 * 1000) // 3s
.interval(300 * 1000) // 300ms
.until([&commited]() { return commited; });
EXPECT_EQ(commited, true);
EXPECT_EQ(status.ok(), true);
EXPECT_EQ(writer_helper->written_rows(), 0);
EXPECT_EQ(writer_helper->result_rows(), 18);
EXPECT_EQ(writer_helper->result_chunks().size(), 1);
// Check the result order
auto result_chunk = writer_helper->result_chunks()[0];
auto c1 = result_chunk->get_column_by_index(0);
auto c2 = result_chunk->get_column_by_index(1);
std::string c1_last_row;
std::string c2_last_row;
for (size_t i = 0; i < c1->size(); ++i) {
std::string c1_cur_row = c1->get(i).get_slice().to_string();
std::string c2_cur_row = c2->get(i).get_slice().to_string();
LOG(INFO) << "(" << i << "): " << c1_cur_row << ", " << c2_cur_row;
if (!c1_last_row.empty()) {
EXPECT_GE(c1_cur_row, c1_last_row);
}
if (!c2_last_row.empty() && c1_cur_row == c1_last_row) {
EXPECT_LE(c2_cur_row, c2_last_row);
}
c1_last_row = c1_cur_row;
c2_last_row = c2_cur_row;
}
}
std::filesystem::remove_all(fs_base_path);
}
TEST_F(PartitionChunkWriterTest, sort_column_with_schema_chunk) {
std::string fs_base_path = "base_path";
std::filesystem::create_directories(fs_base_path + "/c1");
parquet::Utils::SlotDesc slot_descs[] = {{"c1", TYPE_VARCHAR_DESC}, {""}};
TupleDescriptor* tuple_desc =
parquet::Utils::create_tuple_descriptor(_fragment_context->runtime_state(), &_pool, slot_descs);
Fields fields;
for (auto& slot : tuple_desc->slots()) {
TypeDescriptor type_desc = slot->type();
TypeInfoPtr type_info = get_type_info(type_desc.type, type_desc.precision, type_desc.scale);
auto field = std::make_shared<Field>(slot->id(), slot->col_name(), type_info, slot->is_nullable());
fields.push_back(field);
}
auto schema = std::make_shared<Schema>(std::move(fields), KeysType::DUP_KEYS, std::vector<uint32_t>(), nullptr);
auto writer_helper = WriterHelper::instance();
bool commited = false;
Status status;
// Create partition writer
auto mock_writer_factory = std::make_shared<MockFileWriterFactory>();
auto location_provider = std::make_shared<LocationProvider>(fs_base_path, "ffffff", 0, 0, "parquet");
EXPECT_CALL(*mock_writer_factory, create(::testing::_)).WillRepeatedly([](const std::string&) {
WriterAndStream ws;
ws.writer = std::make_unique<MockWriter>();
ws.stream = std::make_unique<Stream>(std::make_unique<MockFile>(), nullptr, nullptr);
return ws;
});
auto sort_ordering = std::make_shared<SortOrdering>();
sort_ordering->sort_key_idxes = {0};
sort_ordering->sort_descs.descs.emplace_back(true, false);
const size_t max_file_size = 1073741824; // 1GB
auto partition_chunk_writer_ctx = std::make_shared<SpillPartitionChunkWriterContext>(
SpillPartitionChunkWriterContext{mock_writer_factory, location_provider, max_file_size, false,
_fragment_context.get(), tuple_desc, sort_ordering});
auto partition_chunk_writer_factory =
std::make_unique<SpillPartitionChunkWriterFactory>(partition_chunk_writer_ctx);
std::vector<int8_t> partition_field_null_list;
auto partition_writer = std::dynamic_pointer_cast<SpillPartitionChunkWriter>(
partition_chunk_writer_factory->create("c1", partition_field_null_list));
auto commit_callback = [&commited](const CommitResult& r) { commited = true; };
auto error_handler = [&status](const Status& s) { status = s; };
auto poller = MockPoller();
partition_writer->set_io_poller(&poller);
partition_writer->set_commit_callback(commit_callback);
partition_writer->set_error_handler(error_handler);
EXPECT_OK(partition_writer->init());
// Write and spill multiple chunks
{
writer_helper->reset();
for (size_t i = 0; i < 3; ++i) {
// Create a chunk
ChunkPtr chunk = ChunkHelper::new_chunk(*schema, 3);
std::string suffix = std::to_string(3 - i);
chunk->get_column_by_index(0)->append_datum(Slice("ccc" + suffix));
chunk->get_column_by_index(0)->append_datum(Slice("bbb" + suffix));
chunk->get_column_by_index(0)->append_datum(Slice("aaa" + suffix));
// Write chunk
auto ret = partition_writer->write(chunk.get());
EXPECT_EQ(ret.ok(), true);
EXPECT_GT(partition_writer->get_written_bytes(), 0);
// Flush chunk
ret = partition_writer->_spill();
EXPECT_EQ(ret.ok(), true);
Awaitility()
.timeout(3 * 1000 * 1000) // 3s
.interval(300 * 1000) // 300ms
.until([partition_writer]() {
return partition_writer->_spilling_bytes_usage.load(std::memory_order_relaxed) == 0;
});
EXPECT_EQ(partition_writer->_spilling_bytes_usage.load(std::memory_order_relaxed), 0);
EXPECT_EQ(status.ok(), true);
}
// Merge spill blocks
auto ret = partition_writer->finish();
EXPECT_EQ(ret.ok(), true);
Awaitility()
.timeout(3 * 1000 * 1000) // 3s
.interval(300 * 1000) // 300ms
.until([&commited]() { return commited; });
EXPECT_EQ(commited, true);
EXPECT_EQ(status.ok(), true);
EXPECT_EQ(writer_helper->written_rows(), 0);
EXPECT_EQ(writer_helper->result_rows(), 9);
EXPECT_EQ(writer_helper->result_chunks().size(), 1);
// Check the result order
auto result_chunk = writer_helper->result_chunks()[0];
auto column = result_chunk->get_column_by_index(0);
std::string last_row;
for (size_t i = 0; i < column->size(); ++i) {
std::string cur_row = column->get(i).get_slice().to_string();
LOG(INFO) << "(" << i << "): " << cur_row;
if (!last_row.empty()) {
EXPECT_GT(cur_row, last_row);
}
last_row = cur_row;
}
}
std::filesystem::remove_all(fs_base_path);
}
} // namespace

View File

@ -0,0 +1,145 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "connector/sink_memory_manager.h"
#include <gmock/gmock.h>
#include <gtest/gtest-param-test.h>
#include <gtest/gtest.h>
#include <future>
#include <thread>
#include "connector/connector_chunk_sink.h"
#include "connector/partition_chunk_writer.h"
#include "exec/pipeline/fragment_context.h"
#include "formats/file_writer.h"
#include "formats/parquet/parquet_test_util/util.h"
#include "formats/utils.h"
#include "testutil/assert.h"
#include "util/integer_util.h"
namespace starrocks::connector {
namespace {
using Stream = io::AsyncFlushOutputStream;
class SinkMemoryManagerTest : public ::testing::Test {
protected:
void SetUp() override {
_fragment_context = std::make_shared<pipeline::FragmentContext>();
_fragment_context->set_runtime_state(std::make_shared<RuntimeState>());
_runtime_state = _fragment_context->runtime_state();
}
void TearDown() override {}
ObjectPool _pool;
std::shared_ptr<pipeline::FragmentContext> _fragment_context;
RuntimeState* _runtime_state;
};
class MockPartitionChunkWriter : public PartitionChunkWriter {
public:
MockPartitionChunkWriter(std::string partition, std::vector<int8_t> partition_field_null_list,
const std::shared_ptr<PartitionChunkWriterContext>& ctx)
: PartitionChunkWriter(partition, partition_field_null_list, ctx) {}
MOCK_METHOD(Status, init, (), (override));
MOCK_METHOD(Status, write, (Chunk * chunk), (override));
MOCK_METHOD(Status, finish, (), (override));
MOCK_METHOD(bool, is_finished, (), (override));
MOCK_METHOD(int64_t, get_written_bytes, (), (override));
int64_t get_flushable_bytes() override { return _flushable_bytes; }
Status flush() override {
_flushed = true;
return Status::OK();
}
bool is_flushed() { return _flushed; }
void set_flushable_bytes(int64_t bytes) { _flushable_bytes = bytes; }
private:
bool _flushed = false;
int64_t _flushable_bytes = 0;
};
class MockFile : public WritableFile {
public:
MOCK_METHOD(Status, append, (const Slice& data), (override));
MOCK_METHOD(Status, appendv, (const Slice* data, size_t cnt), (override));
MOCK_METHOD(Status, pre_allocate, (uint64_t size), (override));
MOCK_METHOD(Status, close, (), (override));
MOCK_METHOD(Status, flush, (FlushMode mode), (override));
MOCK_METHOD(Status, sync, (), (override));
MOCK_METHOD(uint64_t, size, (), (const, override));
const std::string& filename() const override { return _filename; }
private:
std::string _filename = "mock_filename";
};
TEST_F(SinkMemoryManagerTest, kill_victim) {
parquet::Utils::SlotDesc slot_descs[] = {{"c1", TYPE_VARCHAR_DESC}, {"c2", TYPE_VARCHAR_DESC}, {""}};
TupleDescriptor* tuple_desc =
parquet::Utils::create_tuple_descriptor(_fragment_context->runtime_state(), &_pool, slot_descs);
auto partition_chunk_writer_ctx =
std::make_shared<SpillPartitionChunkWriterContext>(SpillPartitionChunkWriterContext{
nullptr, nullptr, 1024, false, _fragment_context.get(), tuple_desc, nullptr});
auto sink_mem_mgr = std::make_shared<SinkOperatorMemoryManager>();
std::map<PartitionKey, PartitionChunkWriterPtr> partition_chunk_writers;
std::vector<int8_t> partition_field_null_list = {};
const int64_t max_flush_bytes = 9;
for (size_t i = 0; i < 5; ++i) {
std::string partition = std::to_string(i);
auto writer = std::make_shared<MockPartitionChunkWriter>(partition, partition_field_null_list,
partition_chunk_writer_ctx);
writer->set_flushable_bytes(i);
auto out_stream = std::make_unique<Stream>(std::make_unique<MockFile>(), nullptr, nullptr);
writer->_out_stream = std::move(out_stream);
partition_chunk_writers[std::make_pair(partition, partition_field_null_list)] = writer;
}
for (size_t i = max_flush_bytes; i >= 5; --i) {
std::string partition = std::to_string(i);
auto writer = std::make_shared<MockPartitionChunkWriter>(partition, partition_field_null_list,
partition_chunk_writer_ctx);
writer->set_flushable_bytes(i);
auto out_stream = std::make_unique<Stream>(std::make_unique<MockFile>(), nullptr, nullptr);
writer->_out_stream = std::move(out_stream);
partition_chunk_writers[std::make_pair(partition, partition_field_null_list)] = writer;
}
auto commit_callback = [this](const CommitResult& r) {};
sink_mem_mgr->init(&partition_chunk_writers, nullptr, commit_callback);
EXPECT_TRUE(sink_mem_mgr->kill_victim());
for (auto& [key, writer] : partition_chunk_writers) {
auto mock_writer = std::dynamic_pointer_cast<MockPartitionChunkWriter>(writer);
if (key.first == std::to_string(max_flush_bytes)) {
EXPECT_TRUE(mock_writer->is_flushed());
} else {
EXPECT_FALSE(mock_writer->is_flushed());
}
}
}
} // namespace
} // namespace starrocks::connector

View File

@ -47,14 +47,18 @@ import com.starrocks.thrift.THdfsPartition;
import com.starrocks.thrift.TIcebergPartitionInfo;
import com.starrocks.thrift.TIcebergTable;
import com.starrocks.thrift.TPartitionMap;
import com.starrocks.thrift.TSortOrder;
import com.starrocks.thrift.TTableDescriptor;
import com.starrocks.thrift.TTableType;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.PartitionField;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortField;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.types.Types;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -472,6 +476,23 @@ public class IcebergTable extends Table {
}
}
SortOrder sortOrder = nativeTable.sortOrder();
if (sortOrder != null && sortOrder.isSorted()) {
TSortOrder tSortOrder = new TSortOrder();
List<Integer> sortKeyIndexes = getSortKeyIndexes();
for (int idx = 0; idx < sortKeyIndexes.size(); ++idx) {
int sortKeyIndex = sortKeyIndexes.get(idx);
SortField sortField = sortOrder.fields().get(idx);
if (!sortField.transform().isIdentity()) {
continue;
}
tSortOrder.addToSort_key_idxes(sortKeyIndex);
tSortOrder.addToIs_ascs(sortField.direction() == SortDirection.ASC);
tSortOrder.addToIs_null_firsts(sortField.nullOrder() == NullOrder.NULLS_FIRST);
}
tIcebergTable.setSort_order(tSortOrder);
}
TTableDescriptor tTableDescriptor = new TTableDescriptor(id, TTableType.ICEBERG_TABLE,
fullSchema.size(), 0, catalogTableName, catalogDBName);
tTableDescriptor.setIcebergTable(tIcebergTable);

View File

@ -150,6 +150,9 @@ import com.starrocks.storagevolume.StorageVolume;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.SortDirection;
import org.apache.iceberg.SortField;
import org.apache.iceberg.SortOrder;
import java.util.ArrayList;
import java.util.Collections;
@ -1951,6 +1954,29 @@ public class AstToStringBuilder {
createTableSql.append("\nCOMMENT (\"").append(table.getComment()).append("\")");
}
// Order by
if (table.isIcebergTable()) {
IcebergTable icebergTable = (IcebergTable) table;
SortOrder sortOrder = icebergTable.getNativeTable().sortOrder();
if (sortOrder != null && sortOrder.isSorted()) {
List<String> columnNames = table.getFullSchema().stream().map(Column::getName).collect(toList());
List<String> sortColumns = new ArrayList<>();
List<Integer> sortKeyIndexes = icebergTable.getSortKeyIndexes();
for (int idx = 0; idx < sortKeyIndexes.size(); ++idx) {
int sortKeyIndex = sortKeyIndexes.get(idx);
SortField sortField = sortOrder.fields().get(idx);
if (!sortField.transform().isIdentity()) {
continue;
}
String sortColumnName = columnNames.get(sortKeyIndex);
String sortDirection = sortField.direction() == SortDirection.ASC ? "ASC" : "DESC";
String sortNullsOrder = sortField.nullOrder().toString();
sortColumns.add(String.format("%s %s %s", sortColumnName, sortDirection, sortNullsOrder));
}
createTableSql.append("\nORDER BY (").append(String.join(",", sortColumns)).append(")");
}
}
// Properties
Map<String, String> properties = new HashMap<>();
try {

View File

@ -65,6 +65,7 @@ import com.starrocks.server.LocalMetastore;
import com.starrocks.server.MetadataMgr;
import com.starrocks.server.TemporaryTableMgr;
import com.starrocks.sql.analyzer.AnalyzeTestUtil;
import com.starrocks.sql.analyzer.AstToStringBuilder;
import com.starrocks.sql.ast.AddColumnClause;
import com.starrocks.sql.ast.AddColumnsClause;
import com.starrocks.sql.ast.AlterClause;
@ -108,7 +109,10 @@ import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetricsModes;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.TableScan;
@ -285,6 +289,78 @@ public class IcebergMetadataTest extends TableTestBase {
Assertions.assertEquals(ICEBERG, icebergTable.getType());
}
@Test
public void testShowCreateTableWithSortOrder(@Mocked IcebergHiveCatalog icebergHiveCatalog,
@Mocked HiveTableOperations hiveTableOperations) {
new Expectations() {
{
icebergHiveCatalog.getIcebergCatalogType();
result = IcebergCatalogType.HIVE_CATALOG;
minTimes = 0;
icebergHiveCatalog.getTable(connectContext, "DB", "TBL");
result = new BaseTable(hiveTableOperations, "tbl");
minTimes = 0;
}
};
new MockUp<Table>() {
@Mock
public List<Column> getFullSchema() {
return ImmutableList.of(new Column("c1", Type.INT), new Column("c2", STRING));
}
@Mock
public Table.TableType getType() {
return ICEBERG;
}
};
new MockUp<IcebergTable>() {
@Mock
public boolean isUnPartitioned() {
return true;
}
@Mock
public List<Integer> getSortKeyIndexes() {
return ImmutableList.of(0, 1);
}
};
IcebergMetadata metadata = new IcebergMetadata(CATALOG_NAME, HDFS_ENVIRONMENT, icebergHiveCatalog,
Executors.newSingleThreadExecutor(), Executors.newSingleThreadExecutor(), null);
Table actual = metadata.getTable(new ConnectContext(), "DB", "TBL");
Assertions.assertTrue(actual instanceof IcebergTable);
IcebergTable icebergTable = (IcebergTable) actual;
Schema schema = IcebergApiConverter.toIcebergApiSchema(actual.getFullSchema());
SortOrder.Builder builder = SortOrder.builderFor(schema);
builder.asc("c1", NullOrder.NULLS_FIRST);
builder.desc("c2", NullOrder.NULLS_LAST);
SortOrder sortOrder = builder.build();
org.apache.iceberg.Table nativeTable = icebergTable.getNativeTable();
new Expectations() {
{
nativeTable.sortOrder();
result = sortOrder;
minTimes = 0;
}
};
Assertions.assertEquals("db", icebergTable.getCatalogDBName());
Assertions.assertEquals("tbl", icebergTable.getCatalogTableName());
String createSql = AstToStringBuilder.getExternalCatalogTableDdlStmt(actual);
Assertions.assertEquals("CREATE TABLE `tbl` (\n" +
" `c1` int(11) DEFAULT NULL,\n" +
" `c2` varchar(1048576) DEFAULT NULL\n" +
")\n" +
"ORDER BY (c1 ASC NULLS FIRST,c2 DESC NULLS LAST);",
createSql);
}
@Test
public void testIcebergHiveCatalogTableExists(@Mocked IcebergHiveCatalog icebergHiveCatalog) {
new Expectations() {
@ -1958,4 +2034,4 @@ public class IcebergMetadataTest extends TableTestBase {
HDFS_ENVIRONMENT);
executor.execute();
}
}

View File

@ -25,6 +25,7 @@ import com.starrocks.connector.RemoteFileInfo;
import com.starrocks.connector.RemoteFileInfoSource;
import com.starrocks.connector.exception.StarRocksConnectorException;
import com.starrocks.connector.iceberg.IcebergAlterTableExecutor;
import com.starrocks.connector.iceberg.IcebergApiConverter;
import com.starrocks.connector.iceberg.IcebergConnectorScanRangeSource;
import com.starrocks.connector.iceberg.IcebergMORParams;
import com.starrocks.connector.iceberg.IcebergMetadata;
@ -82,10 +83,12 @@ import org.apache.iceberg.FileContent;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.FileMetadata;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.NullOrder;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.ReplacePartitions;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Schema;
import org.apache.iceberg.SortOrder;
import org.apache.iceberg.TableMetadata;
import org.apache.iceberg.TableOperations;
import org.apache.iceberg.Transaction;
@ -618,10 +621,16 @@ public class IcebergScanNodeTest {
.setFullSchema(schemaColumns)
.setNativeTable(mockNativeTable)
.setIcebergProperties(Collections.singletonMap("iceberg.catalog.type", "hive"))
.build();
icebergTable.setComment("some normal comment");
Schema icebergApiSchema = IcebergApiConverter.toIcebergApiSchema(schemaColumns);
SortOrder.Builder builder = SortOrder.builderFor(icebergApiSchema);
builder.asc("col1", NullOrder.NULLS_FIRST);
SortOrder sortOrder = builder.build();
Mockito.when(mockNativeTable.sortOrder()).thenReturn(sortOrder);
List<DescriptorTable.ReferencedPartitionInfo> partitions = new ArrayList<>();
TTableDescriptor tdesc = icebergTable.toThrift(partitions);
@ -631,6 +640,7 @@ public class IcebergScanNodeTest {
Assertions.assertEquals("file:///tmp/test", tIcebergTable.getLocation());
Assertions.assertFalse(tIcebergTable.getPartition_info().isEmpty());
Assertions.assertEquals("col1_trunc", tIcebergTable.getPartition_info().get(0).getPartition_column_name());
Assertions.assertFalse(tIcebergTable.getSort_order().getSort_key_idxes().isEmpty());
}
private Schema _schema() {
@ -1255,4 +1265,4 @@ public class IcebergScanNodeTest {
// wrong method
Assertions.assertEquals(876, Stream.of(2, 3, 4, 5).reduce(1, (a, b) -> (a + 1) * (b + 1)));
}
}

View File

@ -528,6 +528,12 @@ struct TIcebergPartitionInfo {
4: optional Exprs.TExpr partition_expr
}
struct TSortOrder {
1: optional list<i32> sort_key_idxes
2: optional list<bool> is_ascs;
3: optional list<bool> is_null_firsts;
}
struct TIcebergTable {
// table location
1: optional string location
@ -551,6 +557,9 @@ struct TIcebergTable {
7: optional TIcebergSchema iceberg_equal_delete_schema
8: optional list<TIcebergPartitionInfo> partition_info
// Iceberg sort order, used to sort data before writing to Iceberg
9: optional TSortOrder sort_order
}
struct THudiTable {

View File

@ -7,16 +7,16 @@ insert into iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} selec
insert into iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} select 2;
insert into iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} select 3;
select count(1) from files ("path"="oss://starrocks-ci-test/iceberg_db_${uuid0}/ice_tbl_${uuid0}/data","list_files_only" = "true");
select count(1) from files ("path"="oss://starrocks-ci-test/iceberg_db_${uuid0}/ice_tbl_${uuid0}/data","list_files_only" = "true") where IS_DIR=0;
insert overwrite iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} SELECT 4;
select count(1) from files ("path"="oss://starrocks-ci-test/iceberg_db_${uuid0}/ice_tbl_${uuid0}/data","list_files_only" = "true");
select count(1) from files ("path"="oss://starrocks-ci-test/iceberg_db_${uuid0}/ice_tbl_${uuid0}/data","list_files_only" = "true") where IS_DIR=0;
alter table iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} execute expire_snapshots(now());
alter table iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} execute remove_orphan_files(now());
alter table iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} execute expire_snapshots(now());
alter table iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} execute remove_orphan_files(now());
select count(1) from files ("path"="oss://starrocks-ci-test/iceberg_db_${uuid0}/ice_tbl_${uuid0}/data","list_files_only" = "true");
select count(1) from files ("path"="oss://starrocks-ci-test/iceberg_db_${uuid0}/ice_tbl_${uuid0}/data","list_files_only" = "true") where IS_DIR=0;
drop table iceberg_sql_test_${uuid0}.iceberg_db_${uuid0}.ice_tbl_${uuid0} force;
drop database iceberg_sql_test_${uuid0}.iceberg_db_${uuid0};
drop catalog iceberg_sql_test_${uuid0};