[Refactor] Introduce a load chunk spiller and refactor the load spill memtable sink based on it. (backport #61867) (#61964)

Signed-off-by: GavinMar <yangguansuo@starrocks.com>
This commit is contained in:
Gavin 2025-08-15 14:15:31 +08:00 committed by GitHub
parent f5a74aa16d
commit 9df260eee1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 445 additions and 339 deletions

View File

@ -58,9 +58,9 @@
#include "runtime/load_channel_mgr.h"
#include "storage/compaction_manager.h"
#include "storage/lake/compaction_scheduler.h"
#include "storage/lake/load_spill_block_manager.h"
#include "storage/lake/tablet_manager.h"
#include "storage/lake/update_manager.h"
#include "storage/load_spill_block_manager.h"
#include "storage/memtable_flush_executor.h"
#include "storage/persistent_index_compaction_manager.h"
#include "storage/persistent_index_load_executor.h"

View File

@ -213,9 +213,10 @@ set(STORAGE_FILES
lake/cloud_native_index_compaction_task.cpp
lake/metacache.cpp
lake/lake_primary_key_recover.cpp
lake/load_spill_block_manager.cpp
lake/spill_mem_table_sink.cpp
lake/tablet_retain_info.cpp
load_spill_block_manager.cpp
load_chunk_spiller.cpp
sstable/block_builder.cpp
sstable/block.cpp
sstable/coding.cpp

View File

@ -22,7 +22,7 @@
#include "common/compiler_util.h"
#include "storage/lake/delta_writer.h"
#include "storage/lake/load_spill_block_manager.h"
#include "storage/load_spill_block_manager.h"
#include "storage/storage_engine.h"
#include "testutil/sync_point.h"
#include "util/stack_trace_mutex.h"

View File

@ -29,7 +29,6 @@
#include "runtime/mem_tracker.h"
#include "storage/delta_writer.h"
#include "storage/lake/filenames.h"
#include "storage/lake/load_spill_block_manager.h"
#include "storage/lake/meta_file.h"
#include "storage/lake/metacache.h"
#include "storage/lake/pk_tablet_writer.h"
@ -38,6 +37,7 @@
#include "storage/lake/tablet_manager.h"
#include "storage/lake/tablet_writer.h"
#include "storage/lake/update_manager.h"
#include "storage/load_spill_block_manager.h"
#include "storage/memtable.h"
#include "storage/memtable_sink.h"
#include "storage/primary_key_encoder.h"
@ -307,9 +307,11 @@ Status DeltaWriterImpl::build_schema_and_writer() {
!(_tablet_schema->keys_type() == KeysType::PRIMARY_KEYS &&
(!_merge_condition.empty() || is_partial_update() || _tablet_schema->has_separate_sort_key()))) {
if (_load_spill_block_mgr == nullptr || !_load_spill_block_mgr->is_initialized()) {
_load_spill_block_mgr =
std::make_unique<LoadSpillBlockManager>(UniqueId(_load_id).to_thrift(), _tablet_id, _txn_id,
_tablet_manager->tablet_root_location(_tablet_id));
_load_spill_block_mgr = std::make_unique<LoadSpillBlockManager>(
UniqueId(_load_id).to_thrift(),
UniqueId(_tablet_id, _txn_id)
.to_thrift(), // use tablet id + txn id to generate fragment instance id
_tablet_manager->tablet_root_location(_tablet_id));
RETURN_IF_ERROR(_load_spill_block_mgr->init());
}
// Init SpillMemTableSink

View File

@ -21,163 +21,42 @@
#include "runtime/runtime_state.h"
#include "storage/aggregate_iterator.h"
#include "storage/chunk_helper.h"
#include "storage/lake/load_spill_block_manager.h"
#include "storage/lake/tablet_writer.h"
#include "storage/load_spill_block_manager.h"
#include "storage/merge_iterator.h"
namespace starrocks::lake {
Status LoadSpillOutputDataStream::append(RuntimeState* state, const std::vector<Slice>& data, size_t total_write_size,
size_t write_num_rows) {
_append_rows += write_num_rows;
size_t total_size = 0;
// calculate total size
std::for_each(data.begin(), data.end(), [&](const Slice& slice) { total_size += slice.size; });
// preallocate block
RETURN_IF_ERROR(_preallocate(total_size));
// append data
auto st = _block->append(data);
if (st.is_capacity_limit_exceeded()) {
// No space left on device
// Try to acquire a new block from remote storage.
RETURN_IF_ERROR(_switch_to_remote_block(total_size));
st = _block->append(data);
}
if (st.ok()) {
_append_bytes += total_size;
}
return st;
}
Status LoadSpillOutputDataStream::flush() {
RETURN_IF_ERROR(_freeze_current_block());
return Status::OK();
}
bool LoadSpillOutputDataStream::is_remote() const {
return _block ? _block->is_remote() : false;
}
// this function will be called when local disk is full
Status LoadSpillOutputDataStream::_switch_to_remote_block(size_t block_size) {
if (_block->size() > 0) {
// Freeze current block firstly.
RETURN_IF_ERROR(_freeze_current_block());
} else {
// Release empty block.
RETURN_IF_ERROR(_block_manager->release_block(_block));
_block = nullptr;
}
// Acquire new block.
ASSIGN_OR_RETURN(_block, _block_manager->acquire_block(block_size, true /* force remote */));
return Status::OK();
}
Status LoadSpillOutputDataStream::_freeze_current_block() {
if (_block == nullptr) {
return Status::OK();
}
RETURN_IF_ERROR(_block->flush());
RETURN_IF_ERROR(_block_manager->release_block(_block));
// Save this block into block container.
_block_manager->block_container()->append_block(_block);
_block = nullptr;
return Status::OK();
}
Status LoadSpillOutputDataStream::_preallocate(size_t block_size) {
// Try to preallocate from current block first.
if (_block == nullptr || !_block->try_acquire_sizes(block_size)) {
// Freeze current block firstly.
RETURN_IF_ERROR(_freeze_current_block());
// Acquire new block.
ASSIGN_OR_RETURN(_block, _block_manager->acquire_block(block_size));
}
return Status::OK();
}
SpillMemTableSink::SpillMemTableSink(LoadSpillBlockManager* block_manager, TabletWriter* writer,
RuntimeProfile* profile) {
_block_manager = block_manager;
_load_chunk_spiller = std::make_unique<LoadChunkSpiller>(block_manager, profile);
_writer = writer;
_profile = profile;
if (_profile == nullptr) {
// use dummy profile
_dummy_profile = std::make_unique<RuntimeProfile>("dummy");
_profile = _dummy_profile.get();
}
_runtime_state = std::make_shared<RuntimeState>();
_spiller_factory = spill::make_spilled_factory();
std::string tracker_label = "LoadSpillMerge-" + std::to_string(_block_manager->tablet_id()) + "-" +
std::to_string(_block_manager->txn_id());
std::string tracker_label =
"LoadSpillMerge-" + std::to_string(writer->tablet_id()) + "-" + std::to_string(writer->txn_id());
_merge_mem_tracker = std::make_unique<MemTracker>(MemTrackerType::COMPACTION_TASK, -1, std::move(tracker_label),
GlobalEnv::GetInstance()->compaction_mem_tracker());
}
Status SpillMemTableSink::_prepare(const ChunkPtr& chunk_ptr) {
if (_spiller == nullptr) {
// 1. alloc & prepare spiller
spill::SpilledOptions options;
options.encode_level = 7;
_spiller = _spiller_factory->create(options);
RETURN_IF_ERROR(_spiller->prepare(_runtime_state.get()));
DCHECK(_profile != nullptr) << "SpillMemTableSink profile is null";
spill::SpillProcessMetrics metrics(_profile, _runtime_state->mutable_total_spill_bytes());
_spiller->set_metrics(metrics);
// 2. prepare serde
if (const_cast<spill::ChunkBuilder*>(&_spiller->chunk_builder())->chunk_schema()->empty()) {
const_cast<spill::ChunkBuilder*>(&_spiller->chunk_builder())->chunk_schema()->set_schema(chunk_ptr);
RETURN_IF_ERROR(_spiller->serde()->prepare());
}
}
return Status::OK();
}
Status SpillMemTableSink::_do_spill(const Chunk& chunk, const spill::SpillOutputDataStreamPtr& output) {
// 1. caclulate per row memory usage
const int64_t per_row_memory_usage = chunk.memory_usage() / chunk.num_rows();
const int64_t spill_rows = std::min(config::load_spill_max_chunk_bytes / (per_row_memory_usage + 1) + 1,
(int64_t)max_merge_chunk_size);
// 2. serialize chunk
for (int64_t rowid = 0; rowid < chunk.num_rows(); rowid += spill_rows) {
int64_t rows = std::min(spill_rows, (int64_t)chunk.num_rows() - rowid);
ChunkPtr each_chunk = chunk.clone_empty();
each_chunk->append(chunk, rowid, rows);
RETURN_IF_ERROR(_prepare(each_chunk));
spill::SerdeContext ctx;
RETURN_IF_ERROR(_spiller->serde()->serialize(_runtime_state.get(), ctx, each_chunk, output, true));
}
if (!_schema) {
_schema = chunk.schema();
}
return Status::OK();
}
Status SpillMemTableSink::flush_chunk(const Chunk& chunk, starrocks::SegmentPB* segment, bool eos,
int64_t* flush_data_size) {
if (eos && _block_manager->block_container()->empty()) {
if (eos && _load_chunk_spiller->empty()) {
// If there is only one flush, flush it to segment directly
RETURN_IF_ERROR(_writer->write(chunk, segment, eos));
return _writer->flush(segment);
}
if (chunk.num_rows() == 0) return Status::OK();
// 1. create new block group
_block_manager->block_container()->create_block_group();
auto output = std::make_shared<LoadSpillOutputDataStream>(_block_manager);
// 2. spill
RETURN_IF_ERROR(_do_spill(chunk, output));
// 3. flush
RETURN_IF_ERROR(output->flush());
auto res = _load_chunk_spiller->spill(chunk);
RETURN_IF_ERROR(res.status());
// record append bytes to `flush_data_size`
if (flush_data_size != nullptr) {
*flush_data_size = output->append_bytes();
*flush_data_size = res.value();
}
return Status::OK();
}
Status SpillMemTableSink::flush_chunk_with_deletes(const Chunk& upserts, const Column& deletes,
starrocks::SegmentPB* segment, bool eos, int64_t* flush_data_size) {
if (eos && _block_manager->block_container()->empty()) {
if (eos && _load_chunk_spiller->empty()) {
// If there is only one flush, flush it to segment directly
RETURN_IF_ERROR(_writer->flush_del_file(deletes));
RETURN_IF_ERROR(_writer->write(upserts, segment, eos));
@ -190,127 +69,28 @@ Status SpillMemTableSink::flush_chunk_with_deletes(const Chunk& upserts, const C
return Status::OK();
}
class BlockGroupIterator : public ChunkIterator {
public:
BlockGroupIterator(Schema schema, spill::Serde& serde, const std::vector<spill::BlockPtr>& blocks)
: ChunkIterator(std::move(schema)), _serde(serde), _blocks(blocks) {
auto& metrics = _serde.parent()->metrics();
_options.read_io_timer = metrics.read_io_timer;
_options.read_io_count = metrics.read_io_count;
_options.read_io_bytes = metrics.restore_bytes;
}
Status do_get_next(Chunk* chunk) override {
while (_block_idx < _blocks.size()) {
if (!_reader) {
_reader = _blocks[_block_idx]->get_reader(_options);
RETURN_IF_UNLIKELY(!_reader, Status::InternalError("Failed to get reader"));
}
auto st = _serde.deserialize(_ctx, _reader.get());
if (st.ok()) {
st.value()->swap_chunk(*chunk);
return Status::OK();
} else if (st.status().is_end_of_file()) {
_block_idx++;
_reader.reset();
} else {
return st.status();
}
}
return Status::EndOfFile("End of block group");
}
void close() override {}
private:
spill::Serde& _serde;
spill::SerdeContext _ctx;
spill::BlockReaderOptions _options;
std::shared_ptr<spill::BlockReader> _reader;
const std::vector<spill::BlockPtr>& _blocks;
size_t _block_idx = 0;
};
Status SpillMemTableSink::merge_blocks_to_segments() {
TEST_SYNC_POINT_CALLBACK("SpillMemTableSink::merge_blocks_to_segments", this);
SCOPED_THREAD_LOCAL_MEM_SETTER(_merge_mem_tracker.get(), false);
auto& groups = _block_manager->block_container()->block_groups();
RETURN_IF(groups.empty(), Status::OK());
MonotonicStopWatch timer;
timer.start();
RETURN_IF(_load_chunk_spiller->empty(), Status::OK());
// merge process needs to control _writer's flush behavior manually
_writer->set_auto_flush(false);
auto char_field_indexes = ChunkHelper::get_char_field_indexes(*_schema);
size_t total_blocks = 0;
size_t total_block_bytes = 0;
size_t total_merges = 0;
size_t total_rows = 0;
size_t total_chunk = 0;
std::vector<ChunkIteratorPtr> merge_inputs;
size_t current_input_bytes = 0;
auto merge_func = [&] {
total_merges++;
// PK shouldn't do agg because pk support order key different from primary key,
// in that case, data is sorted by order key and cannot be aggregated by primary key
bool do_agg = _schema->keys_type() == KeysType::AGG_KEYS || _schema->keys_type() == KeysType::UNIQUE_KEYS;
auto tmp_itr = new_heap_merge_iterator(merge_inputs);
auto merge_itr = do_agg ? new_aggregate_iterator(tmp_itr) : tmp_itr;
RETURN_IF_ERROR(merge_itr->init_encoded_schema(EMPTY_GLOBAL_DICTMAPS));
auto chunk_shared_ptr = ChunkHelper::new_chunk(*_schema, config::vector_chunk_size);
auto chunk = chunk_shared_ptr.get();
while (true) {
chunk->reset();
auto st = merge_itr->get_next(chunk);
if (st.is_end_of_file()) {
break;
} else if (st.ok()) {
ChunkHelper::padding_char_columns(char_field_indexes, *_schema, _writer->tablet_schema(), chunk);
total_rows += chunk->num_rows();
total_chunk++;
RETURN_IF_ERROR(_writer->write(*chunk, nullptr));
} else {
return st;
}
}
merge_itr->close();
return _writer->flush();
SchemaPtr schema = _load_chunk_spiller->schema();
bool do_agg = schema->keys_type() == KeysType::AGG_KEYS || schema->keys_type() == KeysType::UNIQUE_KEYS;
auto char_field_indexes = ChunkHelper::get_char_field_indexes(*schema);
auto write_func = [&char_field_indexes, schema, this](Chunk* chunk) {
ChunkHelper::padding_char_columns(char_field_indexes, *schema, _writer->tablet_schema(), chunk);
return _writer->write(*chunk, nullptr);
};
for (size_t i = 0; i < groups.size(); ++i) {
auto& group = groups[i];
// We need to stop merging if:
// 1. The current input block group size exceed the load_spill_max_merge_bytes,
// because we don't want to generate too large segment file.
// 2. The input chunks memory usage exceed the load_spill_max_merge_bytes,
// because we don't want each thread cost too much memory.
if (merge_inputs.size() > 0 &&
(current_input_bytes + group.data_size() >= config::load_spill_max_merge_bytes ||
merge_inputs.size() * config::load_spill_max_chunk_bytes >= config::load_spill_max_merge_bytes)) {
RETURN_IF_ERROR(merge_func());
merge_inputs.clear();
current_input_bytes = 0;
}
merge_inputs.push_back(std::make_shared<BlockGroupIterator>(*_schema, *_spiller->serde(), group.blocks()));
current_input_bytes += group.data_size();
total_block_bytes += group.data_size();
total_blocks += group.blocks().size();
}
if (merge_inputs.size() > 0) {
RETURN_IF_ERROR(merge_func());
}
timer.stop();
auto duration_ms = timer.elapsed_time() / 1000000;
LOG(INFO) << fmt::format(
"SpillMemTableSink merge finished, txn:{} tablet:{} blockgroups:{} blocks:{} input_bytes:{} merges:{} "
"rows:{} chunks:{} duration:{}ms",
_block_manager->txn_id(), _block_manager->tablet_id(), groups.size(), total_blocks, total_block_bytes,
total_merges, total_rows, total_chunk, duration_ms);
ADD_COUNTER(_profile, "SpillMergeInputGroups", TUnit::UNIT)->update(groups.size());
ADD_COUNTER(_profile, "SpillMergeInputBytes", TUnit::BYTES)->update(total_block_bytes);
ADD_COUNTER(_profile, "SpillMergeCount", TUnit::UNIT)->update(total_merges);
ADD_COUNTER(_profile, "SpillMergeDurationNs", TUnit::TIME_NS)->update(duration_ms * 1000000);
return Status::OK();
auto flush_func = [this]() { return _writer->flush(); };
Status st = _load_chunk_spiller->merge_write(config::load_spill_max_merge_bytes, true /* do_sort */, do_agg,
write_func, flush_func);
LOG_IF(WARNING, !st.ok()) << fmt::format(
"SpillMemTableSink merge blocks to segment failed, txn:{} tablet:{} msg:{}", _writer->txn_id(),
_writer->tablet_id(), st.message());
return st;
}
int64_t SpillMemTableSink::txn_id() {
@ -321,4 +101,4 @@ int64_t SpillMemTableSink::tablet_id() {
return _writer->tablet_id();
}
} // namespace starrocks::lake
} // namespace starrocks::lake

View File

@ -14,49 +14,19 @@
#pragma once
#include "exec/spill/block_manager.h"
#include "exec/spill/data_stream.h"
#include "exec/spill/spiller_factory.h"
#include "storage/load_chunk_spiller.h"
#include "storage/memtable_sink.h"
#include "util/runtime_profile.h"
namespace starrocks {
class RuntimeState;
class LoadSpillBlockManager;
namespace lake {
class LoadSpillBlockManager;
class TabletWriter;
class LoadSpillOutputDataStream : public spill::SpillOutputDataStream {
public:
LoadSpillOutputDataStream(LoadSpillBlockManager* block_manager) : _block_manager(block_manager) {}
Status append(RuntimeState* state, const std::vector<Slice>& data, size_t total_write_size,
size_t write_num_rows) override;
Status flush() override;
bool is_remote() const override;
int64_t append_bytes() const { return _append_bytes; }
private:
Status _preallocate(size_t block_size);
// Freeze current block and append it to block container
Status _freeze_current_block();
// Switch to remote block when local disk is full
Status _switch_to_remote_block(size_t block_size);
private:
LoadSpillBlockManager* _block_manager = nullptr;
spill::BlockPtr _block;
int64_t _append_bytes = 0;
};
class SpillMemTableSink : public MemTableSink {
public:
SpillMemTableSink(LoadSpillBlockManager* block_manager, TabletWriter* writer, RuntimeProfile* profile);
@ -71,29 +41,17 @@ public:
Status merge_blocks_to_segments();
spill::Spiller* get_spiller() { return _spiller.get(); }
spill::Spiller* get_spiller() { return _load_chunk_spiller->spiller().get(); }
int64_t txn_id() override;
int64_t tablet_id() override;
private:
Status _prepare(const ChunkPtr& chunk_ptr);
Status _do_spill(const Chunk& chunk, const spill::SpillOutputDataStreamPtr& output);
private:
LoadSpillBlockManager* _block_manager = nullptr;
TabletWriter* _writer;
// destroy spiller before runtime_state
std::shared_ptr<RuntimeState> _runtime_state;
// used when input profile is nullptr
std::unique_ptr<RuntimeProfile> _dummy_profile;
RuntimeProfile* _profile = nullptr;
spill::SpillerFactoryPtr _spiller_factory;
std::shared_ptr<spill::Spiller> _spiller;
SchemaPtr _schema;
// used for spill merge, parent trakcer is compaction tracker
std::unique_ptr<MemTracker> _merge_mem_tracker = nullptr;
std::unique_ptr<LoadChunkSpiller> _load_chunk_spiller = nullptr;
};
} // namespace lake
} // namespace starrocks
} // namespace starrocks

View File

@ -0,0 +1,281 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "storage/load_chunk_spiller.h"
#include "exec/spill/options.h"
#include "exec/spill/serde.h"
#include "exec/spill/spiller.h"
#include "exec/spill/spiller_factory.h"
#include "runtime/runtime_state.h"
#include "storage/aggregate_iterator.h"
#include "storage/chunk_helper.h"
#include "storage/load_spill_block_manager.h"
#include "storage/merge_iterator.h"
#include "storage/union_iterator.h"
namespace starrocks {
Status LoadSpillOutputDataStream::append(RuntimeState* state, const std::vector<Slice>& data, size_t total_write_size,
size_t write_num_rows) {
_append_rows += write_num_rows;
size_t total_size = 0;
// calculate total size
std::for_each(data.begin(), data.end(), [&](const Slice& slice) { total_size += slice.size; });
// preallocate block
RETURN_IF_ERROR(_preallocate(total_size));
// append data
auto st = _block->append(data);
if (st.is_capacity_limit_exceeded()) {
// No space left on device
// Try to acquire a new block from remote storage.
RETURN_IF_ERROR(_switch_to_remote_block(total_size));
st = _block->append(data);
}
if (st.ok()) {
_append_bytes += total_size;
}
return st;
}
Status LoadSpillOutputDataStream::flush() {
RETURN_IF_ERROR(_freeze_current_block());
return Status::OK();
}
bool LoadSpillOutputDataStream::is_remote() const {
return _block ? _block->is_remote() : false;
}
// this function will be called when local disk is full
Status LoadSpillOutputDataStream::_switch_to_remote_block(size_t block_size) {
if (_block->size() > 0) {
// Freeze current block firstly.
RETURN_IF_ERROR(_freeze_current_block());
} else {
// Release empty block.
RETURN_IF_ERROR(_block_manager->release_block(_block));
_block = nullptr;
}
// Acquire new block.
ASSIGN_OR_RETURN(_block, _block_manager->acquire_block(block_size, true /* force remote */));
return Status::OK();
}
Status LoadSpillOutputDataStream::_freeze_current_block() {
if (_block == nullptr) {
return Status::OK();
}
RETURN_IF_ERROR(_block->flush());
RETURN_IF_ERROR(_block_manager->release_block(_block));
// Save this block into block container.
_block_manager->block_container()->append_block(_block);
_block = nullptr;
return Status::OK();
}
Status LoadSpillOutputDataStream::_preallocate(size_t block_size) {
// Try to preallocate from current block first.
if (_block == nullptr || !_block->try_acquire_sizes(block_size)) {
// Freeze current block firstly.
RETURN_IF_ERROR(_freeze_current_block());
// Acquire new block.
ASSIGN_OR_RETURN(_block, _block_manager->acquire_block(block_size));
}
return Status::OK();
}
LoadChunkSpiller::LoadChunkSpiller(LoadSpillBlockManager* block_manager, RuntimeProfile* profile)
: _block_manager(block_manager), _profile(profile) {
if (_profile == nullptr) {
// use dummy profile
_dummy_profile = std::make_unique<RuntimeProfile>("dummy");
_profile = _dummy_profile.get();
}
_runtime_state = std::make_shared<RuntimeState>();
_spiller_factory = spill::make_spilled_factory();
}
StatusOr<size_t> LoadChunkSpiller::spill(const Chunk& chunk) {
if (chunk.num_rows() == 0) return 0;
// 1. create new block group
_block_manager->block_container()->create_block_group();
auto output = std::make_shared<LoadSpillOutputDataStream>(_block_manager);
// 2. spill
RETURN_IF_ERROR(_do_spill(chunk, output));
// 3. flush
RETURN_IF_ERROR(output->flush());
return output->append_bytes();
}
Status LoadChunkSpiller::_prepare(const ChunkPtr& chunk_ptr) {
if (_spiller == nullptr) {
// 1. alloc & prepare spiller
spill::SpilledOptions options;
options.encode_level = 7;
_spiller = _spiller_factory->create(options);
RETURN_IF_ERROR(_spiller->prepare(_runtime_state.get()));
DCHECK(_profile != nullptr) << "LoadChunkSpiller profile is null";
spill::SpillProcessMetrics metrics(_profile, _runtime_state->mutable_total_spill_bytes());
_spiller->set_metrics(metrics);
// 2. prepare serde
if (const_cast<spill::ChunkBuilder*>(&_spiller->chunk_builder())->chunk_schema()->empty()) {
const_cast<spill::ChunkBuilder*>(&_spiller->chunk_builder())->chunk_schema()->set_schema(chunk_ptr);
RETURN_IF_ERROR(_spiller->serde()->prepare());
}
}
return Status::OK();
}
Status LoadChunkSpiller::_do_spill(const Chunk& chunk, const spill::SpillOutputDataStreamPtr& output) {
// 1. caclulate per row memory usage
const int64_t per_row_memory_usage = chunk.memory_usage() / chunk.num_rows();
const int64_t spill_rows = std::min(config::load_spill_max_chunk_bytes / (per_row_memory_usage + 1) + 1,
(int64_t)max_merge_chunk_size);
// 2. serialize chunk
for (int64_t rowid = 0; rowid < chunk.num_rows(); rowid += spill_rows) {
int64_t rows = std::min(spill_rows, (int64_t)chunk.num_rows() - rowid);
ChunkPtr each_chunk = chunk.clone_empty();
each_chunk->append(chunk, rowid, rows);
RETURN_IF_ERROR(_prepare(each_chunk));
spill::SerdeContext ctx;
RETURN_IF_ERROR(_spiller->serde()->serialize(_runtime_state.get(), ctx, each_chunk, output, true));
}
if (!_schema) {
_schema = chunk.schema();
}
return Status::OK();
}
class BlockGroupIterator : public ChunkIterator {
public:
BlockGroupIterator(Schema schema, spill::Serde& serde, const std::vector<spill::BlockPtr>& blocks)
: ChunkIterator(std::move(schema)), _serde(serde), _blocks(blocks) {
auto& metrics = _serde.parent()->metrics();
_options.read_io_timer = metrics.read_io_timer;
_options.read_io_count = metrics.read_io_count;
_options.read_io_bytes = metrics.restore_bytes;
}
Status do_get_next(Chunk* chunk) override {
while (_block_idx < _blocks.size()) {
if (!_reader) {
_reader = _blocks[_block_idx]->get_reader(_options);
RETURN_IF_UNLIKELY(!_reader, Status::InternalError("Failed to get reader"));
}
auto st = _serde.deserialize(_ctx, _reader.get());
if (st.ok()) {
st.value()->swap_chunk(*chunk);
return Status::OK();
} else if (st.status().is_end_of_file()) {
_block_idx++;
_reader.reset();
} else {
return st.status();
}
}
return Status::EndOfFile("End of block group");
}
void close() override {}
private:
spill::Serde& _serde;
spill::SerdeContext _ctx;
spill::BlockReaderOptions _options;
std::shared_ptr<spill::BlockReader> _reader;
const std::vector<spill::BlockPtr>& _blocks;
size_t _block_idx = 0;
};
Status LoadChunkSpiller::merge_write(size_t target_size, bool do_sort, bool do_agg,
std::function<Status(Chunk*)> write_func, std::function<Status()> flush_func) {
auto& groups = _block_manager->block_container()->block_groups();
RETURN_IF(groups.empty(), Status::OK());
MonotonicStopWatch timer;
timer.start();
size_t total_blocks = 0;
size_t total_block_bytes = 0;
size_t total_merges = 0;
size_t total_rows = 0;
size_t total_chunk = 0;
std::vector<ChunkIteratorPtr> merge_inputs;
size_t current_input_bytes = 0;
auto merge_func = [&] {
total_merges++;
auto tmp_itr = do_sort ? new_heap_merge_iterator(merge_inputs) : new_union_iterator(merge_inputs);
auto merge_itr = do_agg ? new_aggregate_iterator(tmp_itr) : tmp_itr;
RETURN_IF_ERROR(merge_itr->init_encoded_schema(EMPTY_GLOBAL_DICTMAPS));
auto chunk_shared_ptr = ChunkHelper::new_chunk(*_schema, config::vector_chunk_size);
auto chunk = chunk_shared_ptr.get();
while (true) {
chunk->reset();
auto st = merge_itr->get_next(chunk);
if (st.is_end_of_file()) {
break;
} else if (st.ok()) {
total_rows += chunk->num_rows();
total_chunk++;
RETURN_IF_ERROR(write_func(chunk));
} else {
return st;
}
}
merge_itr->close();
return flush_func();
};
for (size_t i = 0; i < groups.size(); ++i) {
auto& group = groups[i];
// We need to stop merging if:
// 1. The current input block group size exceed the target_size,
// because we don't want to generate too large segment file.
// 2. The input chunks memory usage exceed the load_spill_max_merge_bytes,
// because we don't want each thread cost too much memory.
if (merge_inputs.size() > 0 && (current_input_bytes + group.data_size() >= target_size ||
merge_inputs.size() * config::load_spill_max_chunk_bytes >= target_size)) {
RETURN_IF_ERROR(merge_func());
merge_inputs.clear();
current_input_bytes = 0;
}
merge_inputs.push_back(std::make_shared<BlockGroupIterator>(*_schema, *_spiller->serde(), group.blocks()));
current_input_bytes += group.data_size();
total_block_bytes += group.data_size();
total_blocks += group.blocks().size();
}
if (merge_inputs.size() > 0) {
RETURN_IF_ERROR(merge_func());
}
timer.stop();
auto duration_ms = timer.elapsed_time() / 1000000;
LOG(INFO) << fmt::format(
"LoadChunkSpiller merge finished, load_id:{} fragment_instance_id:{} blockgroups:{} blocks:{} "
"input_bytes:{} merges:{} rows:{} chunks:{} duration:{}ms",
(std::ostringstream() << _block_manager->load_id()).str(),
(std::ostringstream() << _block_manager->fragment_instance_id()).str(), groups.size(), total_blocks,
total_block_bytes, total_merges, total_rows, total_chunk, duration_ms);
ADD_COUNTER(_profile, "SpillMergeInputGroups", TUnit::UNIT)->update(groups.size());
ADD_COUNTER(_profile, "SpillMergeInputBytes", TUnit::BYTES)->update(total_block_bytes);
ADD_COUNTER(_profile, "SpillMergeCount", TUnit::UNIT)->update(total_merges);
ADD_COUNTER(_profile, "SpillMergeDurationNs", TUnit::TIME_NS)->update(duration_ms * 1000000);
return Status::OK();
}
bool LoadChunkSpiller::empty() {
return _block_manager->block_container()->empty();
}
} // namespace starrocks

View File

@ -0,0 +1,90 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "exec/spill/block_manager.h"
#include "exec/spill/data_stream.h"
#include "exec/spill/spiller_factory.h"
#include "util/runtime_profile.h"
namespace starrocks {
class RuntimeState;
class LoadSpillBlockManager;
class LoadSpillOutputDataStream : public spill::SpillOutputDataStream {
public:
LoadSpillOutputDataStream(LoadSpillBlockManager* block_manager) : _block_manager(block_manager) {}
Status append(RuntimeState* state, const std::vector<Slice>& data, size_t total_write_size,
size_t write_num_rows) override;
Status flush() override;
bool is_remote() const override;
int64_t append_bytes() const { return _append_bytes; }
private:
Status _preallocate(size_t block_size);
// Freeze current block and append it to block container
Status _freeze_current_block();
// Switch to remote block when local disk is full
Status _switch_to_remote_block(size_t block_size);
private:
LoadSpillBlockManager* _block_manager = nullptr;
spill::BlockPtr _block;
int64_t _append_bytes = 0;
};
class LoadChunkSpiller {
public:
LoadChunkSpiller(LoadSpillBlockManager* block_manager, RuntimeProfile* profile);
~LoadChunkSpiller() = default;
StatusOr<size_t> spill(const Chunk& chunk);
Status merge_write(size_t target_size, bool do_sort, bool do_agg, std::function<Status(Chunk*)> write_func,
std::function<Status()> flush_func);
bool empty();
std::shared_ptr<spill::Spiller> spiller() { return _spiller; }
SchemaPtr schema() { return _schema; }
private:
Status _prepare(const ChunkPtr& chunk_ptr);
Status _do_spill(const Chunk& chunk, const spill::SpillOutputDataStreamPtr& output);
private:
LoadSpillBlockManager* _block_manager = nullptr;
// destroy spiller before runtime_state
std::shared_ptr<RuntimeState> _runtime_state;
// used when input profile is nullptr
std::unique_ptr<RuntimeProfile> _dummy_profile;
RuntimeProfile* _profile = nullptr;
spill::SpillerFactoryPtr _spiller_factory;
std::shared_ptr<spill::Spiller> _spiller;
SchemaPtr _schema;
// used for spill merge, parent trakcer is compaction tracker
std::unique_ptr<MemTracker> _merge_mem_tracker = nullptr;
};
} // namespace starrocks

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "storage/lake/load_spill_block_manager.h"
#include "storage/load_spill_block_manager.h"
#include <vector>
@ -24,7 +24,7 @@
#include "runtime/exec_env.h"
#include "util/threadpool.h"
namespace starrocks::lake {
namespace starrocks {
static int calc_max_merge_blocks_thread() {
#ifndef BE_TEST
@ -117,8 +117,7 @@ Status LoadSpillBlockManager::init() {
StatusOr<spill::BlockPtr> LoadSpillBlockManager::acquire_block(size_t block_size, bool force_remote) {
spill::AcquireBlockOptions opts;
opts.query_id = _load_id; // load id as query id
opts.fragment_instance_id =
UniqueId(_tablet_id, _txn_id).to_thrift(); // use tablet id + txn id to generate fragment instance id
opts.fragment_instance_id = _fragment_instance_id;
opts.plan_node_id = 0;
opts.name = "load_spill";
opts.block_size = block_size;
@ -131,4 +130,4 @@ Status LoadSpillBlockManager::release_block(spill::BlockPtr block) {
return _block_manager->release_block(std::move(block));
}
} // namespace starrocks::lake
} // namespace starrocks

View File

@ -23,8 +23,6 @@ namespace starrocks {
class ThreadPoolToken;
namespace lake {
class LoadSpillBlockMergeExecutor {
public:
LoadSpillBlockMergeExecutor() {}
@ -60,18 +58,15 @@ private:
class LoadSpillBlockManager {
public:
// Constructor that initializes the LoadSpillBlockManager with a query ID and remote spill path.
LoadSpillBlockManager(const TUniqueId& load_id, int64_t tablet_id, int64_t txn_id,
LoadSpillBlockManager(const TUniqueId& load_id, const TUniqueId& fragment_instance_id,
const std::string& remote_spill_path)
: _load_id(load_id), _tablet_id(tablet_id), _txn_id(txn_id) {
: _load_id(load_id), _fragment_instance_id(fragment_instance_id) {
_remote_spill_path = remote_spill_path + "/load_spill";
}
// Default destructor.
~LoadSpillBlockManager();
int64_t tablet_id() const { return _tablet_id; }
int64_t txn_id() const { return _txn_id; }
// Initializes the LoadSpillBlockManager.
Status init();
@ -87,10 +82,13 @@ public:
bool has_spill_block() const { return _block_container != nullptr && !_block_container->empty(); }
const TUniqueId& load_id() const { return _load_id; }
const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; }
private:
TUniqueId _load_id; // Unique ID for the load.
int64_t _tablet_id; // ID for the tablet.
int64_t _txn_id; // ID for the transaction.
TUniqueId _fragment_instance_id; // Unique ID for the fragment instance.
std::string _remote_spill_path; // Path for remote spill storage.
std::unique_ptr<spill::DirManager> _remote_dir_manager; // Manager for remote directories.
std::unique_ptr<spill::BlockManager> _block_manager; // Manager for blocks.
@ -98,5 +96,4 @@ private:
bool _initialized = false; // Whether the manager is initialized.
};
} // namespace lake
} // namespace starrocks
} // namespace starrocks

View File

@ -59,8 +59,8 @@
#include "storage/compaction_manager.h"
#include "storage/data_dir.h"
#include "storage/dictionary_cache_manager.h"
#include "storage/lake/load_spill_block_manager.h"
#include "storage/lake/local_pk_index_manager.h"
#include "storage/load_spill_block_manager.h"
#include "storage/memtable_flush_executor.h"
#include "storage/publish_version_manager.h"
#include "storage/replication_txn_manager.h"
@ -235,7 +235,7 @@ Status StorageEngine::_open(const EngineOptions& options) {
async_delta_writer,
static_cast<bthreads::ThreadPoolExecutor*>(_async_delta_writer_executor.get())->get_thread_pool());
_load_spill_block_merge_executor = std::make_unique<lake::LoadSpillBlockMergeExecutor>();
_load_spill_block_merge_executor = std::make_unique<LoadSpillBlockMergeExecutor>();
RETURN_IF_ERROR(_load_spill_block_merge_executor->init());
REGISTER_THREAD_POOL_METRICS(load_spill_block_merge, _load_spill_block_merge_executor->get_thread_pool());

View File

@ -71,7 +71,6 @@ class Executor;
namespace starrocks::lake {
class LocalPkIndexManager;
class LoadSpillBlockMergeExecutor;
} // namespace starrocks::lake
namespace starrocks {
@ -85,6 +84,7 @@ class UpdateManager;
class CompactionManager;
class PublishVersionManager;
class DictionaryCacheManager;
class LoadSpillBlockMergeExecutor;
class SegmentFlushExecutor;
class SegmentReplicateExecutor;
@ -236,9 +236,7 @@ public:
bthread::Executor* async_delta_writer_executor() { return _async_delta_writer_executor.get(); }
lake::LoadSpillBlockMergeExecutor* load_spill_block_merge_executor() {
return _load_spill_block_merge_executor.get();
}
LoadSpillBlockMergeExecutor* load_spill_block_merge_executor() { return _load_spill_block_merge_executor.get(); }
MemTableFlushExecutor* memtable_flush_executor() { return _memtable_flush_executor.get(); }
@ -496,7 +494,7 @@ private:
std::unique_ptr<bthread::Executor> _async_delta_writer_executor;
std::unique_ptr<lake::LoadSpillBlockMergeExecutor> _load_spill_block_merge_executor;
std::unique_ptr<LoadSpillBlockMergeExecutor> _load_spill_block_merge_executor;
std::unique_ptr<MemTableFlushExecutor> _memtable_flush_executor;

View File

@ -304,8 +304,8 @@ set(EXEC_FILES
./storage/lake/compaction_scheduler_test.cpp
./storage/lake/compaction_task_context_test.cpp
./storage/lake/lake_primary_key_consistency_test.cpp
./storage/lake/load_spill_block_manager_test.cpp
./storage/lake/spill_mem_table_sink_test.cpp
./storage/load_spill_block_manager_test.cpp
./storage/rowset_update_state_test.cpp
./storage/rowset_column_update_state_test.cpp
./storage/rowset_column_partial_update_test.cpp

View File

@ -28,9 +28,9 @@
#include "storage/chunk_helper.h"
#include "storage/lake/fixed_location_provider.h"
#include "storage/lake/join_path.h"
#include "storage/lake/load_spill_block_manager.h"
#include "storage/lake/tablet_manager.h"
#include "storage/lake/txn_log.h"
#include "storage/load_spill_block_manager.h"
#include "storage/rowset/segment.h"
#include "storage/rowset/segment_options.h"
#include "storage/tablet_schema.h"

View File

@ -27,10 +27,10 @@
#include "exec/spill/spiller.h"
#include "fs/fs.h"
#include "storage/lake/general_tablet_writer.h"
#include "storage/lake/load_spill_block_manager.h"
#include "storage/lake/pk_tablet_writer.h"
#include "storage/lake/tablet_metadata.h"
#include "storage/lake/test_util.h"
#include "storage/load_spill_block_manager.h"
#include "storage/tablet_schema.h"
#include "testutil/assert.h"
#include "util/raw_container.h"
@ -88,7 +88,7 @@ TEST_F(SpillMemTableSinkTest, test_flush_chunk) {
int64_t tablet_id = 1;
int64_t txn_id = 1;
std::unique_ptr<LoadSpillBlockManager> block_manager =
std::make_unique<LoadSpillBlockManager>(TUniqueId(), tablet_id, txn_id, kTestDir);
std::make_unique<LoadSpillBlockManager>(TUniqueId(), UniqueId(tablet_id, txn_id).to_thrift(), kTestDir);
ASSERT_OK(block_manager->init());
std::unique_ptr<TabletWriter> tablet_writer = std::make_unique<HorizontalGeneralTabletWriter>(
_tablet_mgr.get(), tablet_id, _tablet_schema, txn_id, false);
@ -128,7 +128,7 @@ TEST_F(SpillMemTableSinkTest, test_flush_chunk_with_deletes) {
int64_t tablet_id = 1;
int64_t txn_id = 1;
std::unique_ptr<LoadSpillBlockManager> block_manager =
std::make_unique<LoadSpillBlockManager>(TUniqueId(), tablet_id, txn_id, kTestDir);
std::make_unique<LoadSpillBlockManager>(TUniqueId(), UniqueId(tablet_id, txn_id).to_thrift(), kTestDir);
ASSERT_OK(block_manager->init());
std::unique_ptr<TabletWriter> tablet_writer = std::make_unique<HorizontalPkTabletWriter>(
_tablet_mgr.get(), tablet_id, _tablet_schema, txn_id, nullptr, false);
@ -166,7 +166,7 @@ TEST_F(SpillMemTableSinkTest, test_flush_chunk2) {
int64_t tablet_id = 1;
int64_t txn_id = 1;
std::unique_ptr<LoadSpillBlockManager> block_manager =
std::make_unique<LoadSpillBlockManager>(TUniqueId(), tablet_id, txn_id, kTestDir);
std::make_unique<LoadSpillBlockManager>(TUniqueId(), UniqueId(tablet_id, txn_id).to_thrift(), kTestDir);
ASSERT_OK(block_manager->init());
std::unique_ptr<TabletWriter> tablet_writer = std::make_unique<HorizontalGeneralTabletWriter>(
_tablet_mgr.get(), tablet_id, _tablet_schema, txn_id, false);
@ -181,7 +181,7 @@ TEST_F(SpillMemTableSinkTest, test_flush_chunk_with_delete2) {
int64_t tablet_id = 1;
int64_t txn_id = 1;
std::unique_ptr<LoadSpillBlockManager> block_manager =
std::make_unique<LoadSpillBlockManager>(TUniqueId(), tablet_id, txn_id, kTestDir);
std::make_unique<LoadSpillBlockManager>(TUniqueId(), UniqueId(tablet_id, txn_id).to_thrift(), kTestDir);
ASSERT_OK(block_manager->init());
std::unique_ptr<TabletWriter> tablet_writer = std::make_unique<HorizontalPkTabletWriter>(
_tablet_mgr.get(), tablet_id, _tablet_schema, txn_id, nullptr, false);
@ -196,7 +196,7 @@ TEST_F(SpillMemTableSinkTest, test_flush_chunk_with_limit) {
int64_t tablet_id = 1;
int64_t txn_id = 1;
std::unique_ptr<LoadSpillBlockManager> block_manager =
std::make_unique<LoadSpillBlockManager>(TUniqueId(), tablet_id, txn_id, kTestDir);
std::make_unique<LoadSpillBlockManager>(TUniqueId(), UniqueId(tablet_id, txn_id).to_thrift(), kTestDir);
ASSERT_OK(block_manager->init());
std::unique_ptr<TabletWriter> tablet_writer = std::make_unique<HorizontalGeneralTabletWriter>(
_tablet_mgr.get(), tablet_id, _tablet_schema, txn_id, false);
@ -239,7 +239,7 @@ TEST_F(SpillMemTableSinkTest, test_merge) {
int64_t tablet_id = 1;
int64_t txn_id = 1;
std::unique_ptr<LoadSpillBlockManager> block_manager =
std::make_unique<LoadSpillBlockManager>(TUniqueId(), tablet_id, txn_id, kTestDir);
std::make_unique<LoadSpillBlockManager>(TUniqueId(), UniqueId(tablet_id, txn_id).to_thrift(), kTestDir);
ASSERT_OK(block_manager->init());
std::unique_ptr<TabletWriter> tablet_writer = std::make_unique<HorizontalGeneralTabletWriter>(
_tablet_mgr.get(), tablet_id, _tablet_schema, txn_id, false);
@ -264,7 +264,7 @@ TEST_F(SpillMemTableSinkTest, test_out_of_disk_space) {
int64_t tablet_id = 1;
int64_t txn_id = 1;
std::unique_ptr<LoadSpillBlockManager> block_manager =
std::make_unique<LoadSpillBlockManager>(TUniqueId(), tablet_id, txn_id, kTestDir);
std::make_unique<LoadSpillBlockManager>(TUniqueId(), UniqueId(tablet_id, txn_id).to_thrift(), kTestDir);
ASSERT_OK(block_manager->init());
std::unique_ptr<TabletWriter> tablet_writer = std::make_unique<HorizontalGeneralTabletWriter>(
_tablet_mgr.get(), tablet_id, _tablet_schema, txn_id, false);
@ -278,4 +278,4 @@ TEST_F(SpillMemTableSinkTest, test_out_of_disk_space) {
ASSERT_EQ(1, tablet_writer->files().size());
}
} // namespace starrocks::lake
} // namespace starrocks::lake

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
#include "storage/lake/load_spill_block_manager.h"
#include "storage/load_spill_block_manager.h"
#include <gtest/gtest.h>
@ -21,7 +21,7 @@
#include "util/raw_container.h"
#include "util/runtime_profile.h"
namespace starrocks::lake {
namespace starrocks {
class LoadSpillBlockManagerTest : public ::testing::Test {
public:
@ -35,7 +35,7 @@ protected:
TEST_F(LoadSpillBlockManagerTest, test_basic) {
std::unique_ptr<LoadSpillBlockManager> block_manager =
std::make_unique<LoadSpillBlockManager>(TUniqueId(), 1, 1, kTestDir);
std::make_unique<LoadSpillBlockManager>(TUniqueId(), TUniqueId(), kTestDir);
ASSERT_OK(block_manager->init());
ASSIGN_OR_ABORT(auto block, block_manager->acquire_block(1024));
ASSERT_OK(block_manager->release_block(block));
@ -43,7 +43,7 @@ TEST_F(LoadSpillBlockManagerTest, test_basic) {
TEST_F(LoadSpillBlockManagerTest, test_write_read) {
std::unique_ptr<LoadSpillBlockManager> block_manager =
std::make_unique<LoadSpillBlockManager>(TUniqueId(), 1, 1, kTestDir);
std::make_unique<LoadSpillBlockManager>(TUniqueId(), TUniqueId(), kTestDir);
ASSERT_OK(block_manager->init());
ASSIGN_OR_ABORT(auto block, block_manager->acquire_block(1024));
ASSERT_OK(block->append({Slice("hello"), Slice("world")}));
@ -57,4 +57,4 @@ TEST_F(LoadSpillBlockManagerTest, test_write_read) {
ASSERT_OK(block_manager->release_block(block));
}
} // namespace starrocks::lake
} // namespace starrocks