[Enhancement] VacuumFull Implementation (backport #61602) (#62016)

Signed-off-by: srlch <linzichao@starrocks.com>
Co-authored-by: srlch <111035020+srlch@users.noreply.github.com>
Co-authored-by: Connor Brennan <cbrennan@pinterest.com>
mergify[bot] 2025-08-18 03:51:37 +00:00 committed by GitHub
parent dbb3e1d5f8
commit ea8c32a0d8
22 changed files with 1226 additions and 187 deletions

View File

@ -35,6 +35,7 @@
#include "storage/lake/tablet.h"
#include "storage/lake/transactions.h"
#include "storage/lake/vacuum.h"
#include "storage/lake/vacuum_full.h"
#include "testutil/sync_point.h"
#include "util/brpc_stub_cache.h"
#include "util/countdown_latch.h"

View File

@ -191,6 +191,7 @@ set(STORAGE_FILES
lake/horizontal_compaction_task.cpp
lake/delta_writer.cpp
lake/vacuum.cpp
lake/vacuum_full.cpp
lake/general_tablet_writer.cpp
lake/pk_tablet_writer.cpp
lake/primary_key_compaction_policy.cpp

View File

@ -0,0 +1,129 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <future>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include "common/config.h"
#include "common/status.h"
namespace starrocks::lake {
std::future<Status> delete_files_callable(std::vector<std::string> files_to_delete);
// AsyncFileDeleter
// A class for asynchronously deleting files in batches.
//
// The AsyncFileDeleter class provides a mechanism to delete files in batches in an asynchronous manner.
// It allows specifying the batch size, which determines the number of files to be deleted in each batch.
class AsyncFileDeleter {
public:
using DeleteCallback = std::function<void(const std::vector<std::string>&)>;
explicit AsyncFileDeleter(int64_t batch_size) : _batch_size(batch_size) {}
explicit AsyncFileDeleter(int64_t batch_size, DeleteCallback cb) : _batch_size(batch_size), _cb(std::move(cb)) {}
virtual ~AsyncFileDeleter() = default;
virtual Status delete_file(std::string path) {
_batch.emplace_back(std::move(path));
if (_batch.size() < _batch_size) {
return Status::OK();
}
return submit(&_batch);
}
virtual Status finish() {
if (!_batch.empty()) {
RETURN_IF_ERROR(submit(&_batch));
}
return wait();
}
int64_t delete_count() const { return _delete_count; }
private:
// Wait for all submitted deletion tasks to finish and return task execution results.
Status wait() {
if (_prev_task_status.valid()) {
try {
return _prev_task_status.get();
} catch (const std::exception& e) {
return Status::InternalError(e.what());
}
} else {
return Status::OK();
}
}
Status submit(std::vector<std::string>* files_to_delete) {
// Await previous task completion before submitting a new deletion.
RETURN_IF_ERROR(wait());
_delete_count += files_to_delete->size();
if (_cb) {
_cb(*files_to_delete);
}
_prev_task_status = delete_files_callable(std::move(*files_to_delete));
files_to_delete->clear();
DCHECK(_prev_task_status.valid());
return Status::OK();
}
int64_t _batch_size;
int64_t _delete_count = 0;
std::vector<std::string> _batch;
std::future<Status> _prev_task_status;
DeleteCallback _cb;
};
// Used to delete files that are shared by multiple tablets, so we cannot delete them immediately.
// We need to wait until all tablets have been processed and then decide whether to delete them.
class AsyncBundleFileDeleter : public AsyncFileDeleter {
public:
explicit AsyncBundleFileDeleter(int64_t batch_size) : AsyncFileDeleter(batch_size) {}
Status delete_file(std::string path) override {
_pending_files[path]++;
return Status::OK();
}
Status delay_delete(std::string path) {
_delay_delete_files.insert(std::move(path));
return Status::OK();
}
Status finish() override {
for (auto& [path, count] : _pending_files) {
if (_delay_delete_files.count(path) == 0) {
if (config::lake_print_delete_log) {
LOG(INFO) << "Deleting bundle file: " << path << " ref count: " << count;
}
RETURN_IF_ERROR(AsyncFileDeleter::delete_file(path));
}
}
return AsyncFileDeleter::finish();
}
bool is_empty() const { return _pending_files.empty(); }
private:
// Map from file path to its reference (share) count.
std::unordered_map<std::string, uint32_t> _pending_files;
std::unordered_set<std::string> _delay_delete_files;
};
} // namespace starrocks::lake
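A minimal usage sketch of the two deleters above (the batch size, paths, and helper function name are illustrative, not part of this change):

// Hedged sketch: drives AsyncFileDeleter / AsyncBundleFileDeleter as declared above.
Status delete_orphans_sketch(const std::vector<std::string>& orphan_paths,
                             const std::vector<std::string>& shared_paths) {
    AsyncFileDeleter deleter(/*batch_size=*/128);
    for (const auto& p : orphan_paths) {
        RETURN_IF_ERROR(deleter.delete_file(p)); // a batch is submitted once 128 paths are queued
    }
    RETURN_IF_ERROR(deleter.finish()); // flush the partial batch and wait for the async task

    AsyncBundleFileDeleter bundle_deleter(/*batch_size=*/128);
    for (const auto& p : shared_paths) {
        RETURN_IF_ERROR(bundle_deleter.delete_file(p)); // only records a reference, deletes nothing yet
    }
    if (!shared_paths.empty()) {
        RETURN_IF_ERROR(bundle_deleter.delay_delete(shared_paths.front())); // keep this shared file
    }
    return bundle_deleter.finish(); // deletes every pending file not marked for delayed deletion
}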

View File

@ -502,6 +502,45 @@ StatusOr<BundleTabletMetadataPtr> TabletManager::parse_bundle_tablet_metadata(co
return bundle_metadata;
}
StatusOr<TabletMetadataPtrs> TabletManager::get_metas_from_bundle_tablet_metadata(const std::string& location,
FileSystem* input_fs) {
std::unique_ptr<RandomAccessFile> input_file;
RandomAccessFileOptions opts{.skip_fill_local_cache = true};
if (input_fs == nullptr) {
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(location));
ASSIGN_OR_RETURN(input_file, fs->new_random_access_file(opts, location));
} else {
ASSIGN_OR_RETURN(input_file, input_fs->new_random_access_file(opts, location));
}
ASSIGN_OR_RETURN(auto serialized_string, input_file->read_all());
auto file_size = serialized_string.size();
ASSIGN_OR_RETURN(auto bundle_metadata, TabletManager::parse_bundle_tablet_metadata(location, serialized_string));
TabletMetadataPtrs metadatas;
metadatas.reserve(bundle_metadata->tablet_meta_pages().size());
for (const auto& tablet_page : bundle_metadata->tablet_meta_pages()) {
const PagePointerPB& page_pointer = tablet_page.second;
auto offset = page_pointer.offset();
auto size = page_pointer.size();
RETURN_IF(offset + size > file_size,
Status::InternalError(
fmt::format("Invalid page pointer for tablet {}, offset: {}, size: {}, file size: {}",
tablet_page.first, offset, size, file_size)));
auto metadata = std::make_shared<starrocks::TabletMetadataPB>();
std::string_view metadata_str = std::string_view(serialized_string.data() + offset);
RETURN_IF(
!metadata->ParseFromArray(metadata_str.data(), size),
Status::InternalError(fmt::format("Failed to parse tablet metadata for tablet {}, offset: {}, size: {}",
tablet_page.first, offset, size)));
RETURN_IF(metadata->id() != tablet_page.first,
Status::InternalError(fmt::format("Tablet ID mismatch in bundle metadata, expected: {}, found: {}",
tablet_page.first, metadata->id())));
metadatas.push_back(std::move(metadata));
}
return metadatas;
}
DEFINE_FAIL_POINT(tablet_schema_not_found_in_bundle_metadata);
StatusOr<TabletMetadataPtr> TabletManager::get_single_tablet_metadata(int64_t tablet_id, int64_t version,
bool fill_cache, int64_t expected_gtid,

View File

@ -102,6 +102,9 @@ public:
static StatusOr<BundleTabletMetadataPtr> parse_bundle_tablet_metadata(const std::string& path,
const std::string& serialized_string);
static StatusOr<TabletMetadataPtrs> get_metas_from_bundle_tablet_metadata(const std::string& location,
FileSystem* input_fs = nullptr);
TabletMetadataPtr get_latest_cached_tablet_metadata(int64_t tablet_id);
StatusOr<TabletMetadataIter> list_tablet_metadata(int64_t tablet_id);

View File

@ -24,5 +24,6 @@ using TabletMetadata = TabletMetadataPB;
using TabletMetadataPtr = std::shared_ptr<const TabletMetadata>;
using MutableTabletMetadataPtr = std::shared_ptr<TabletMetadata>;
using BundleTabletMetadataPtr = std::shared_ptr<BundleTabletMetadataPB>;
using TabletMetadataPtrs = std::vector<TabletMetadataPtr>;
} // namespace starrocks

View File

@ -21,7 +21,6 @@
#include <string_view>
#include <unordered_map>
#include "common/config.h"
#include "common/status.h"
#include "fs/fs.h"
#include "gutil/stl_util.h"
@ -134,6 +133,8 @@ Status delete_files_with_retry(FileSystem* fs, std::span<const std::string> path
}
}
} // namespace
// Batch delete files with specified FileSystem object |fs|
Status do_delete_files(FileSystem* fs, const std::vector<std::string>& paths) {
if (UNLIKELY(paths.empty())) {
@ -176,108 +177,6 @@ Status do_delete_files(FileSystem* fs, const std::vector<std::string>& paths) {
return Status::OK();
}
// AsyncFileDeleter
// A class for asynchronously deleting files in batches.
//
// The AsyncFileDeleter class provides a mechanism to delete files in batches in an asynchronous manner.
// It allows specifying the batch size, which determines the number of files to be deleted in each batch.
class AsyncFileDeleter {
public:
using DeleteCallback = std::function<void(const std::vector<std::string>&)>;
explicit AsyncFileDeleter(int64_t batch_size) : _batch_size(batch_size) {}
explicit AsyncFileDeleter(int64_t batch_size, DeleteCallback cb) : _batch_size(batch_size), _cb(std::move(cb)) {}
virtual ~AsyncFileDeleter() = default;
virtual Status delete_file(std::string path) {
_batch.emplace_back(std::move(path));
if (_batch.size() < _batch_size) {
return Status::OK();
}
return submit(&_batch);
}
virtual Status finish() {
if (!_batch.empty()) {
RETURN_IF_ERROR(submit(&_batch));
}
return wait();
}
int64_t delete_count() const { return _delete_count; }
private:
// Wait for all submitted deletion tasks to finish and return task execution results.
Status wait() {
if (_prev_task_status.valid()) {
try {
return _prev_task_status.get();
} catch (const std::exception& e) {
return Status::InternalError(e.what());
}
} else {
return Status::OK();
}
}
Status submit(std::vector<std::string>* files_to_delete) {
// Await previous task completion before submitting a new deletion.
RETURN_IF_ERROR(wait());
_delete_count += files_to_delete->size();
if (_cb) {
_cb(*files_to_delete);
}
_prev_task_status = delete_files_callable(std::move(*files_to_delete));
files_to_delete->clear();
DCHECK(_prev_task_status.valid());
return Status::OK();
}
int64_t _batch_size;
int64_t _delete_count = 0;
std::vector<std::string> _batch;
std::future<Status> _prev_task_status;
DeleteCallback _cb;
};
// Used for delete files which are shared by multiple tablets, so we can't delete them at first.
// We need to wait for all tablets to finish and decide whether to delete them.
class AsyncBundleFileDeleter : public AsyncFileDeleter {
public:
explicit AsyncBundleFileDeleter(int64_t batch_size) : AsyncFileDeleter(batch_size) {}
Status delete_file(std::string path) override {
_pending_files[path]++;
return Status::OK();
}
Status delay_delete(std::string path) {
_delay_delete_files.insert(std::move(path));
return Status::OK();
}
Status finish() override {
for (auto& [path, count] : _pending_files) {
if (_delay_delete_files.count(path) == 0) {
if (config::lake_print_delete_log) {
LOG(INFO) << "Deleting bundle file: " << path << " ref count: " << count;
}
RETURN_IF_ERROR(AsyncFileDeleter::delete_file(path));
}
}
return AsyncFileDeleter::finish();
}
bool is_empty() const { return _pending_files.empty(); }
private:
// file to shared count.
std::unordered_map<std::string, uint32_t> _pending_files;
std::unordered_set<std::string> _delay_delete_files;
};
} // namespace
// Batch delete files with automatically derived FileSystems.
// REQUIRE: All files in |paths| have the same file system scheme.
Status delete_files(const std::vector<std::string>& paths) {
@ -601,8 +500,8 @@ static Status vacuum_tablet_metadata(TabletManager* tablet_mgr, std::string_view
return Status::OK();
}
static Status vacuum_txn_log(std::string_view root_location, int64_t min_active_txn_id, int64_t* vacuumed_files,
int64_t* vacuumed_file_size) {
Status vacuum_txn_log(std::string_view root_location, int64_t min_active_txn_id, int64_t* vacuumed_files,
int64_t* vacuumed_file_size) {
DCHECK(vacuumed_files != nullptr);
DCHECK(vacuumed_file_size != nullptr);
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(root_location));
@ -715,15 +614,6 @@ void vacuum(TabletManager* tablet_mgr, const VacuumRequest& request, VacuumRespo
st.to_protobuf(response->mutable_status());
}
Status vacuum_full_impl(TabletManager* tablet_mgr, const VacuumFullRequest& request, VacuumFullResponse* response) {
return Status::NotSupported("vacuum_full not implemented yet");
}
void vacuum_full(TabletManager* tablet_mgr, const VacuumFullRequest& request, VacuumFullResponse* response) {
auto st = vacuum_full_impl(tablet_mgr, request, response);
st.to_protobuf(response->mutable_status());
}
// The state of the bundle tablet meta, used to determine whether the bundle file can be deleted.
// ALL_TABLETS_TO_BE_DELETED means all tablets in this bundle tablet meta are going to be deleted,
// SOME_TABLETS_NOT_TO_BE_DELETED means some tablets in this bundle tablet meta aren't going to be deleted,
@ -1110,7 +1000,7 @@ static StatusOr<TabletMetadataPtr> get_tablet_metadata(const string& metadata_lo
return metadata;
}
static StatusOr<std::pair<std::list<std::string>, std::list<std::string>>> list_meta_files(
StatusOr<std::pair<std::list<std::string>, std::list<std::string>>> list_meta_files(
FileSystem* fs, const std::string& metadata_root_location) {
LOG(INFO) << "Start to list " << metadata_root_location;
std::list<std::string> meta_files;
@ -1150,8 +1040,9 @@ static StatusOr<std::map<std::string, DirEntry>> list_data_files(FileSystem* fs,
total_files++;
total_bytes += entry.size.value_or(0);
if (!is_segment(entry.name) &&
!is_sst(entry.name)) { // Only segment files and sst
// Consider segment files, sst files, del files, and delvec files.
if (!is_segment(entry.name) && !is_sst(entry.name) &&
!is_delvec(entry.name) && !is_del(entry.name)) {
return true;
}
if (!entry.mtime.has_value()) {
@ -1165,14 +1056,16 @@ static StatusOr<std::map<std::string, DirEntry>> list_data_files(FileSystem* fs,
return true;
})),
"Failed to list " + segment_root_location);
LOG(INFO) << "Listed all data files, total files: " << total_files << ", total bytes: " << total_bytes
<< ", candidate files: " << data_files.size();
LOG(INFO) << segment_root_location << ": Listed all data files, total files: " << total_files
<< ", total bytes: " << total_bytes << ", candidate files: " << data_files.size();
return data_files;
}
static StatusOr<std::map<std::string, DirEntry>> find_orphan_data_files(FileSystem* fs, std::string_view root_location,
int64_t expired_seconds,
std::ostream& audit_ostream) {
StatusOr<std::map<std::string, DirEntry>> find_orphan_data_files(FileSystem* fs, std::string_view root_location,
int64_t expired_seconds,
const std::list<std::string>& meta_files,
const std::list<std::string>& bundle_meta_files,
std::ostream* audit_ostream) {
const auto metadata_root_location = join_path(root_location, kMetadataDirectoryName);
const auto segment_root_location = join_path(root_location, kSegmentDirectoryName);
@ -1182,27 +1075,35 @@ static StatusOr<std::map<std::string, DirEntry>> find_orphan_data_files(FileSyst
return data_files;
}
ASSIGN_OR_RETURN(auto meta_files_and_bundle_files, list_meta_files(fs, metadata_root_location));
const auto& meta_files = std::move(meta_files_and_bundle_files.first);
const auto& bundle_meta_files = std::move(meta_files_and_bundle_files.second);
std::set<std::string> data_files_in_metadatas;
auto check_rowset = [&](const RowsetMetadata& rowset) {
for (const auto& segment : rowset.segments()) {
data_files.erase(segment);
data_files_in_metadatas.emplace(segment);
auto check_reference_files = [&](const TabletMetadataPtr& check_meta) {
for (const auto& rowset : check_meta->rowsets()) {
for (const auto& segment : rowset.segments()) {
data_files.erase(segment);
data_files_in_metadatas.emplace(segment);
}
for (const auto& del_file : rowset.del_files()) {
data_files.erase(del_file.name());
data_files_in_metadatas.emplace(del_file.name());
}
}
};
auto check_sst_meta = [&](const PersistentIndexSstableMetaPB& sst_meta) {
for (const auto& sst : sst_meta.sstables()) {
const auto& delvector_meta = check_meta->delvec_meta();
for (const auto& [_, file_meta_pb] : delvector_meta.version_to_file()) {
data_files.erase(file_meta_pb.name());
data_files_in_metadatas.emplace(file_meta_pb.name());
}
const auto& sstable_meta = check_meta->sstable_meta();
for (const auto& sst : sstable_meta.sstables()) {
data_files.erase(sst.filename());
data_files_in_metadatas.emplace(sst.filename());
}
};
if (audit_ostream) {
audit_ostream << "Total meta files: " << meta_files.size() << " bundle meta files" << bundle_meta_files.size()
<< std::endl;
(*audit_ostream) << "Total meta files: " << meta_files.size() << " bundle meta files"
<< bundle_meta_files.size() << std::endl;
}
LOG(INFO) << "Start to filter with metadatas, count: " << meta_files.size()
<< " bundle meta files: " << bundle_meta_files.size();
@ -1220,14 +1121,11 @@ static StatusOr<std::map<std::string, DirEntry>> find_orphan_data_files(FileSyst
return res.status();
}
const auto& metadata = res.value();
for (const auto& rowset : metadata->rowsets()) {
check_rowset(rowset);
}
check_sst_meta(metadata->sstable_meta());
check_reference_files(metadata);
++progress;
if (audit_ostream) {
audit_ostream << '(' << progress << '/' << meta_files.size() << ") " << name << '\n'
<< proto_to_json(*metadata) << std::endl;
(*audit_ostream) << '(' << progress << '/' << meta_files.size() << ") " << name << '\n'
<< proto_to_json(*metadata) << std::endl;
}
LOG(INFO) << "Filtered with meta file: " << name << " (" << progress << '/' << meta_files.size() << ')';
}
@ -1236,40 +1134,13 @@ static StatusOr<std::map<std::string, DirEntry>> find_orphan_data_files(FileSyst
progress = 0;
for (const auto& name : bundle_meta_files) {
auto location = join_path(metadata_root_location, name);
RandomAccessFileOptions opts{.skip_fill_local_cache = true};
ASSIGN_OR_RETURN(auto input_file, fs->new_random_access_file(opts, location));
ASSIGN_OR_RETURN(auto serialized_string, input_file->read_all());
auto file_size = serialized_string.size();
ASSIGN_OR_RETURN(auto bundle_metadata,
TabletManager::parse_bundle_tablet_metadata(location, serialized_string));
for (const auto& tablet_page : bundle_metadata->tablet_meta_pages()) {
const PagePointerPB& page_pointer = tablet_page.second;
auto offset = page_pointer.offset();
auto size = page_pointer.size();
RETURN_IF(offset + size > file_size,
Status::InternalError(
fmt::format("Invalid page pointer for tablet {}, offset: {}, size: {}, file size: {}",
tablet_page.first, offset, size, file_size)));
auto metadata = std::make_shared<starrocks::TabletMetadataPB>();
std::string_view metadata_str = std::string_view(serialized_string.data() + offset);
RETURN_IF(!metadata->ParseFromArray(metadata_str.data(), size),
Status::InternalError(
fmt::format("Failed to parse tablet metadata for tablet {}, offset: {}, size: {}",
tablet_page.first, offset, size)));
RETURN_IF(
metadata->id() != tablet_page.first,
Status::InternalError(fmt::format("Tablet ID mismatch in bundle metadata, expected: {}, found: {}",
tablet_page.first, metadata->id())));
for (const auto& rowset : metadata->rowsets()) {
check_rowset(rowset);
}
check_sst_meta(metadata->sstable_meta());
ASSIGN_OR_RETURN(auto metadatas, TabletManager::get_metas_from_bundle_tablet_metadata(location, fs));
for (const auto& metadata : metadatas) {
check_reference_files(metadata);
}
++progress;
if (audit_ostream) {
audit_ostream << '(' << progress << '/' << bundle_meta_files.size() << ") " << name << std::endl;
(*audit_ostream) << '(' << progress << '/' << bundle_meta_files.size() << ") " << name << std::endl;
}
LOG(INFO) << "Filtered with bundle meta file: " << name << " (" << progress << '/' << bundle_meta_files.size()
<< ')';
@ -1290,6 +1161,16 @@ static StatusOr<std::map<std::string, DirEntry>> find_orphan_data_files(FileSyst
return data_files;
}
static StatusOr<std::map<std::string, DirEntry>> find_orphan_data_files(FileSystem* fs, std::string_view root_location,
int64_t expired_seconds,
std::ostream& audit_ostream) {
ASSIGN_OR_RETURN(auto meta_files_and_bundle_files,
list_meta_files(fs, join_path(root_location, kMetadataDirectoryName)));
const auto& meta_files = std::move(meta_files_and_bundle_files.first);
const auto& bundle_meta_files = std::move(meta_files_and_bundle_files.second);
return find_orphan_data_files(fs, root_location, expired_seconds, meta_files, bundle_meta_files, &audit_ostream);
}
// root_location is a partition dir in s3
static StatusOr<std::pair<int64_t, int64_t>> partition_datafile_gc(std::string_view root_location,
std::string_view audit_file_path,

View File

@ -14,16 +14,18 @@
#pragma once
#include <future>
#include <string>
#include <vector>
#include <list>
#include "common/config.h"
#include "common/statusor.h"
#include "gen_cpp/lake_service.pb.h"
#include "storage/lake/async_file_deleter.h"
namespace starrocks {
class Status;
}
class FileSystem;
class DirEntry;
} // namespace starrocks
namespace starrocks::lake {
@ -31,8 +33,6 @@ class TabletManager;
void vacuum(TabletManager* tablet_mgr, const VacuumRequest& request, VacuumResponse* response);
void vacuum_full(TabletManager* tablet_mgr, const VacuumFullRequest& request, VacuumFullResponse* response);
// REQUIRES:
// - tablet_mgr != NULL
// - request.tablet_ids_size() > 0
@ -62,4 +62,18 @@ StatusOr<int64_t> datafile_gc(std::string_view root_location, std::string_view a
// Returns the number of garbage files found.
StatusOr<int64_t> garbage_file_check(std::string_view root_location);
Status vacuum_txn_log(std::string_view root_location, int64_t min_active_txn_id, int64_t* vacuumed_files,
int64_t* vacuumed_file_size);
StatusOr<std::pair<std::list<std::string>, std::list<std::string>>> list_meta_files(
FileSystem* fs, const std::string& metadata_root_location);
StatusOr<std::map<std::string, DirEntry>> find_orphan_data_files(FileSystem* fs, std::string_view root_location,
int64_t expired_seconds,
const std::list<std::string>& meta_files,
const std::list<std::string>& bundle_meta_files,
std::ostream* audit_ostream);
Status do_delete_files(FileSystem* fs, const std::vector<std::string>& paths);
} // namespace starrocks::lake

View File

@ -0,0 +1,217 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "storage/lake/vacuum_full.h"
#include <set>
#include <string_view>
#include "common/status.h"
#include "fs/fs.h"
#include "storage/lake/join_path.h"
#include "storage/lake/metacache.h"
#include "storage/lake/tablet_manager.h"
#include "storage/lake/tablet_metadata.h"
#include "storage/lake/vacuum.h"
#include "storage/protobuf_file.h"
namespace starrocks::lake {
static Status vacuum_expired_tablet_metadata(TabletManager* tablet_mgr, std::string_view root_loc,
int64_t grace_timestamp, int64_t* vacuumed_files,
std::list<std::string>* meta_files,
std::list<std::string>* bundle_meta_files,
const std::unordered_set<int64_t>& retain_versions,
int64_t min_check_version, int64_t max_check_version) {
DCHECK(tablet_mgr != nullptr);
DCHECK(meta_files != nullptr);
DCHECK(bundle_meta_files != nullptr);
if (grace_timestamp == 0) {
// no expired metadata to be vacuumed
return Status::OK();
}
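// Only versions inside [min_check_version, max_check_version) that are not explicitly retained
// are candidates for deletion.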
auto meta_ver_checker = [&](const auto& version) {
return version >= min_check_version && version < max_check_version && !retain_versions.contains(version);
};
auto metafile_delete_cb = [=](const std::vector<std::string>& files) {
auto cache = tablet_mgr->metacache();
DCHECK(cache != nullptr);
for (const auto& path : files) {
cache->erase(path);
}
};
AsyncFileDeleter metafile_deleter(INT64_MAX, metafile_delete_cb);
std::vector<std::string> expired_metas;
std::vector<std::string> bundle_expired_metas;
const auto metadata_root_location = join_path(root_loc, kMetadataDirectoryName);
bool has_expired = false;
for (const auto& name : *meta_files) {
auto [tablet_id, version] = parse_tablet_metadata_filename(name);
if (!meta_ver_checker(version)) {
continue;
}
const string path = join_path(metadata_root_location, name);
ASSIGN_OR_RETURN(auto metadata, tablet_mgr->get_tablet_metadata(tablet_id, version, false));
if ((metadata->has_commit_time() && metadata->commit_time() < grace_timestamp) ||
/* The init version does not have a commit time; delete it if any other metadata has already expired. */
(has_expired && !metadata->has_commit_time())) {
LOG(INFO) << "Try delete for full vacuum: " << path;
RETURN_IF_ERROR(metafile_deleter.delete_file(path));
expired_metas.push_back(name);
has_expired = true;
}
}
for (const auto& expired_meta : expired_metas) {
meta_files->remove(expired_meta);
}
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(root_loc));
for (const auto& name : *bundle_meta_files) {
auto [tablet_id, version] = parse_tablet_metadata_filename(name);
if (!meta_ver_checker(version)) {
continue;
}
auto path = join_path(metadata_root_location, name);
bool need_clear = true;
ASSIGN_OR_RETURN(auto metadatas, TabletManager::get_metas_from_bundle_tablet_metadata(path, fs.get()));
// metadata parsed from bundle files should always have version >= 2
for (const auto& metadata : metadatas) {
if (metadata->commit_time() >= grace_timestamp) {
need_clear = false;
break;
}
}
if (need_clear) {
LOG(INFO) << "Try delete for full vacuum: " << path;
RETURN_IF_ERROR(metafile_deleter.delete_file(path));
bundle_expired_metas.push_back(name);
}
}
for (const auto& bundle_expired_meta : bundle_expired_metas) {
bundle_meta_files->remove(bundle_expired_meta);
}
RETURN_IF_ERROR(metafile_deleter.finish());
(*vacuumed_files) += metafile_deleter.delete_count();
return Status::OK();
}
// Deletes orphaned data files (data files that are not referenced by any "relevant" metadata file).
// Data files written by transactions with txn_id >= min_active_txn_id will NOT be vacuumed.
// Returns the number of files vacuumed and their total size.
static StatusOr<std::pair<int64_t, int64_t>> vacuum_orphaned_datafiles(
TabletManager* tablet_mgr, std::string_view root_loc, int64_t min_active_txn_id,
const std::list<std::string>& meta_files, const std::list<std::string>& bundle_meta_files) {
DCHECK(tablet_mgr != nullptr);
const auto segment_root_location = join_path(root_loc, kSegmentDirectoryName);
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(root_loc));
ASSIGN_OR_RETURN(auto data_files_to_vacuum,
find_orphan_data_files(fs.get(), root_loc, 0, meta_files, bundle_meta_files, nullptr));
const auto original_size = data_files_to_vacuum.size();
std::erase_if(data_files_to_vacuum, [&](const auto& elem) {
const auto& name = elem.first;
const auto txn_id = extract_txn_id_prefix(name).value_or(0);
return txn_id == 0 || txn_id >= min_active_txn_id;
});
LOG(INFO) << segment_root_location << ": "
<< "Removed " << original_size - data_files_to_vacuum.size()
<< " data files from consideration as orphans based on txn id";
int64_t bytes_to_delete = 0;
std::vector<std::string> files_to_delete;
files_to_delete.reserve(data_files_to_vacuum.size());
for (const auto& [name, dir_entry] : data_files_to_vacuum) {
bytes_to_delete += dir_entry.size.value_or(0);
files_to_delete.push_back(join_path(segment_root_location, name));
}
LOG(INFO) << segment_root_location << ": "
<< "Start to delete orphan data files: " << data_files_to_vacuum.size()
<< ", total size: " << bytes_to_delete;
RETURN_IF_ERROR(do_delete_files(fs.get(), files_to_delete));
return std::pair<int64_t, int64_t>(files_to_delete.size(), bytes_to_delete);
}
Status vacuum_full_impl(TabletManager* tablet_mgr, const VacuumFullRequest& request, VacuumFullResponse* response) {
if (UNLIKELY(tablet_mgr == nullptr)) {
return Status::InvalidArgument("tablet_mgr is null");
}
if (UNLIKELY(request.partition_id() == 0)) {
return Status::InvalidArgument("partition_id is unset");
}
if (UNLIKELY(request.tablet_id() == 0)) {
return Status::InvalidArgument("tablet_id is unset");
}
if (UNLIKELY(request.min_active_txn_id() <= 0)) {
return Status::InvalidArgument("value of min_active_txn_id is unset or negative");
}
if (UNLIKELY(request.grace_timestamp() < 0)) {
return Status::InvalidArgument("value of grace_timestamp is unset or negative");
}
if (UNLIKELY(request.min_check_version() < 0)) {
return Status::InvalidArgument("value of min_check_version is unset or negative");
}
if (UNLIKELY(request.max_check_version() < 0)) {
return Status::InvalidArgument("value of max_check_version is unset or negative");
}
const auto grace_timestamp = request.grace_timestamp();
const auto min_active_txn_id = request.min_active_txn_id();
std::unordered_set<int64_t> retain_versions;
if (request.retain_versions_size() > 0) {
retain_versions.insert(request.retain_versions().begin(), request.retain_versions().end());
}
int64_t min_check_version = request.min_check_version();
int64_t max_check_version = request.max_check_version();
int64_t vacuumed_files = 0;
int64_t vacuumed_file_size = 0;
const std::string root_loc = tablet_mgr->tablet_root_location(request.tablet_id());
const auto metadata_root_location = join_path(root_loc, kMetadataDirectoryName);
ASSIGN_OR_RETURN(auto fs, FileSystem::CreateSharedFromString(root_loc));
ASSIGN_OR_RETURN(auto meta_files_and_bundle_files, list_meta_files(fs.get(), metadata_root_location));
auto& meta_files = meta_files_and_bundle_files.first;
auto& bundle_meta_files = meta_files_and_bundle_files.second;
// 1. Delete all metadata files associated with the given partition whose tablet meta commit_time < grace_timestamp,
// whose version is not listed in retain_versions, and whose version lies in [min_check_version, max_check_version).
RETURN_IF_ERROR(vacuum_expired_tablet_metadata(tablet_mgr, root_loc, grace_timestamp, &vacuumed_files, &meta_files,
&bundle_meta_files, retain_versions, min_check_version,
max_check_version));
// 2. Determine and delete orphaned data files. We use min_active_txn_id to filter out new data files, then open the
// remaining metadata files and mark the data files they reference as non-orphans.
ASSIGN_OR_RETURN(auto count_and_size,
vacuum_orphaned_datafiles(tablet_mgr, root_loc, min_active_txn_id, meta_files, bundle_meta_files));
vacuumed_files += count_and_size.first;
vacuumed_file_size += count_and_size.second;
// 3. Delete txn logs whose txn id < min_active_txn_id.
RETURN_IF_ERROR(vacuum_txn_log(root_loc, min_active_txn_id, &vacuumed_files, &vacuumed_file_size));
response->set_vacuumed_files(vacuumed_files);
response->set_vacuumed_file_size(vacuumed_file_size);
return Status::OK();
}
void vacuum_full(TabletManager* tablet_mgr, const VacuumFullRequest& request, VacuumFullResponse* response) {
auto st = vacuum_full_impl(tablet_mgr, request, response);
st.to_protobuf(response->mutable_status());
}
} // namespace starrocks::lake

View File

@ -0,0 +1,25 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "gen_cpp/lake_service.pb.h"
namespace starrocks::lake {
class TabletManager;
void vacuum_full(TabletManager* tablet_mgr, const VacuumFullRequest& request, VacuumFullResponse* response);
} // namespace starrocks::lake
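For orientation, a hedged call-site sketch of this entry point; all request values below are illustrative and simply mirror the fields validated by vacuum_full_impl:

// Hedged sketch: builds a VacuumFullRequest and invokes vacuum_full synchronously.
void run_vacuum_full_sketch(lake::TabletManager* tablet_mgr) {
    VacuumFullRequest request;
    request.set_partition_id(1);        // required, must be non-zero
    request.set_tablet_id(66600);       // any tablet of the partition; used to locate its root directory
    request.set_min_active_txn_id(10);  // data files from txns >= this id are never vacuumed
    request.set_grace_timestamp(100);   // metadata committed before this timestamp may be vacuumed
    request.set_min_check_version(0);
    request.set_max_check_version(5);   // only versions in [min, max) are examined
    request.add_retain_versions(3);     // versions kept regardless of commit time

    VacuumFullResponse response;
    lake::vacuum_full(tablet_mgr, request, &response);
    // response.status(), response.vacuumed_files(), and response.vacuumed_file_size() carry the result.
}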

View File

@ -1970,7 +1970,7 @@ TEST_F(LakeServiceTest, test_vacuum_full_null_thread_pool) {
brpc::Controller cntl;
VacuumFullRequest request;
VacuumFullResponse response;
request.add_tablet_ids(_tablet_id);
request.set_tablet_id(_tablet_id);
_lake_service.vacuum_full(&cntl, &request, &response, nullptr);
ASSERT_EQ("full vacuum thread pool is null", cntl.ErrorText());
}
@ -1986,7 +1986,7 @@ TEST_F(LakeServiceTest, test_vacuum_full_thread_pool_full) {
brpc::Controller cntl;
VacuumFullRequest request;
VacuumFullResponse response;
request.add_tablet_ids(_tablet_id);
request.set_tablet_id(_tablet_id);
_lake_service.vacuum_full(&cntl, &request, &response, nullptr);
EXPECT_FALSE(cntl.Failed()) << cntl.ErrorText();
EXPECT_EQ(TStatusCode::SERVICE_UNAVAILABLE, response.status().status_code()) << response.status().status_code();

View File

@ -28,6 +28,7 @@
#include "storage/lake/metacache.h"
#include "storage/lake/tablet_metadata.h"
#include "storage/lake/txn_log.h"
#include "storage/lake/vacuum_full.h"
#include "test_util.h"
#include "testutil/assert.h"
#include "testutil/sync_point.h"
@ -151,6 +152,248 @@ TEST_P(LakeVacuumTest, test_vacuum_1) {
}
}
// Check that vacuum_full cleans up the expected metadata and data files
// NOLINTNEXTLINE
TEST_P(LakeVacuumTest, test_vacuum_full) {
create_data_file("0000000000000001_27dc159f-6bfc-4a3a-9d9c-c97c10bb2e1d.dat");
create_data_file("0000000000000001_a542395a-bff5-48a7-a3a7-2ed05691b58c.dat");
create_data_file("000000000000FFFF_a542f95a-bff5-48a7-a3a7-2ed05691b58c.dat");
create_data_file("0000000000000002_a542ff5a-bff5-48a7-a3a7-2ed05691b58c.dat");
VacuumFullRequest request;
request.set_partition_id(1);
request.set_tablet_id(66600);
request.set_min_active_txn_id(10);
request.set_grace_timestamp(100);
request.add_retain_versions(3);
request.set_min_check_version(0);
request.set_max_check_version(5);
ASSERT_OK(_tablet_mgr->put_tablet_metadata(json_to_pb<TabletMetadataPB>(R"DEL(
{
"id": 66601,
"version": 5,
"rowsets": [],
"commit_time": 1
}
)DEL")));
ASSERT_OK(_tablet_mgr->put_tablet_metadata(json_to_pb<TabletMetadataPB>(R"DEL(
{
"id": 66600,
"version": 6,
"rowsets": [
{
"segments": [
"0000000000000001_27dc159f-6bfc-4a3a-9d9c-c97c10bb2e1d.dat"
],
"data_size": 4096
}
],
"commit_time": 99,
"prev_garbage_version": 3
}
)DEL")));
ASSERT_OK(_tablet_mgr->put_tablet_metadata(json_to_pb<TabletMetadataPB>(R"DEL(
{
"id": 66600,
"version": 5,
"rowsets": [],
"commit_time": 99,
"prev_garbage_version": 3
}
)DEL")));
ASSERT_OK(_tablet_mgr->put_tablet_metadata(json_to_pb<TabletMetadataPB>(R"DEL(
{
"id": 66600,
"version": 4,
"rowsets": [
{
"segments": [
"0000000000000001_a542395a-bff5-48a7-a3a7-2ed05691b58c.dat"
],
"data_size": 4096
}
],
"commit_time": 98,
"prev_garbage_version": 3
}
)DEL")));
ASSERT_OK(_tablet_mgr->put_tablet_metadata(json_to_pb<TabletMetadataPB>(R"DEL(
{
"id": 66600,
"version": 3,
"rowsets": [],
"commit_time": 97,
"prev_garbage_version": 3
}
)DEL")));
EXPECT_TRUE(file_exist(tablet_metadata_filename(66601, 5)));
EXPECT_TRUE(file_exist(tablet_metadata_filename(66600, 6)));
EXPECT_TRUE(file_exist(tablet_metadata_filename(66600, 5)));
EXPECT_TRUE(file_exist(tablet_metadata_filename(66600, 4)));
EXPECT_TRUE(file_exist(tablet_metadata_filename(66600, 3)));
VacuumFullResponse response;
vacuum_full(_tablet_mgr.get(), request, &response);
ASSERT_TRUE(response.has_status());
EXPECT_EQ(0, response.status().status_code()) << response.status().error_msgs(0);
EXPECT_EQ(1 + 2, response.vacuumed_files()); // 1 metadata, 2 data
EXPECT_TRUE(file_exist(tablet_metadata_filename(66601, 5)));
EXPECT_TRUE(file_exist(tablet_metadata_filename(66600, 6)));
EXPECT_TRUE(file_exist(tablet_metadata_filename(66600, 5)));
EXPECT_FALSE(file_exist(tablet_metadata_filename(66600, 4)));
EXPECT_TRUE(file_exist(tablet_metadata_filename(66600, 3)));
EXPECT_TRUE(file_exist("0000000000000001_27dc159f-6bfc-4a3a-9d9c-c97c10bb2e1d.dat"));
EXPECT_FALSE(file_exist("0000000000000001_a542395a-bff5-48a7-a3a7-2ed05691b58c.dat"));
EXPECT_TRUE(file_exist("000000000000FFFF_a542f95a-bff5-48a7-a3a7-2ed05691b58c.dat"));
EXPECT_FALSE(file_exist("0000000000000002_a542ff5a-bff5-48a7-a3a7-2ed05691b58c.dat"));
}
// NOLINTNEXTLINE
TEST_P(LakeVacuumTest, test_vacuum_full_with_bundle) {
create_data_file("0000000000000005_27dc159f-6bfc-4a3a-9d9c-c97c10bb2e1d.dat");
create_data_file("0000000000000005_a542395a-bff5-48a7-a3a7-2ed05691b58c.dat");
create_data_file("0000000000000004_a542f95a-bff5-48a7-a3a7-2ed05691b58c.dat");
create_data_file("0000000000000004_a542ff5a-bff5-48a7-a3a7-2ed05691b58c.dat");
VacuumFullRequest request;
request.set_partition_id(1);
request.set_tablet_id(66600);
request.set_min_active_txn_id(10);
request.set_grace_timestamp(100);
request.set_min_check_version(0);
request.set_max_check_version(7);
auto tablet_66601_v8 = json_to_pb<TabletMetadataPB>(R"DEL(
{
"id": 66601,
"version": 8,
"rowsets": [],
"commit_time": 10010
}
)DEL");
auto tablet_66600_v8 = json_to_pb<TabletMetadataPB>(R"DEL(
{
"id": 66600,
"version": 8,
"rowsets": [],
"commit_time": 10010
}
)DEL");
std::map<int64_t, TabletMetadataPB> tablet_metas_v8;
tablet_metas_v8[66601] = *tablet_66601_v8;
tablet_metas_v8[66600] = *tablet_66600_v8;
ASSERT_OK(_tablet_mgr->put_bundle_tablet_metadata(tablet_metas_v8));
auto tablet_66601_v7 = json_to_pb<TabletMetadataPB>(R"DEL(
{
"id": 66601,
"version": 7,
"rowsets": [
{
"segments": [
"0000000000000005_27dc159f-6bfc-4a3a-9d9c-c97c10bb2e1d.dat"
],
"data_size": 4096
}
],
"commit_time": 11,
"prev_garbage_version": 3
}
)DEL");
auto tablet_66600_v7 = json_to_pb<TabletMetadataPB>(R"DEL(
{
"id": 66600,
"version": 7,
"rowsets": [
{
"segments": [
"0000000000000005_a542395a-bff5-48a7-a3a7-2ed05691b58c.dat"
],
"data_size": 4096
}
],
"commit_time": 11,
"prev_garbage_version": 3
}
)DEL");
std::map<int64_t, TabletMetadataPB> tablet_metas_v7;
tablet_metas_v7[66601] = *tablet_66601_v7;
tablet_metas_v7[66600] = *tablet_66600_v7;
ASSERT_OK(_tablet_mgr->put_bundle_tablet_metadata(tablet_metas_v7));
auto tablet_66601_v6 = json_to_pb<TabletMetadataPB>(R"DEL(
{
"id": 66601,
"version": 6,
"rowsets": [
{
"segments": [
"0000000000000004_a542f95a-bff5-48a7-a3a7-2ed05691b58c.dat"
],
"data_size": 4096
}
],
"commit_time": 10,
"prev_garbage_version": 3
}
)DEL");
auto tablet_66600_v6 = json_to_pb<TabletMetadataPB>(R"DEL(
{
"id": 66600,
"version": 6,
"rowsets": [
{
"segments": [
"0000000000000004_a542ff5a-bff5-48a7-a3a7-2ed05691b58c.dat"
],
"data_size": 4096
}
],
"commit_time": 10,
"prev_garbage_version": 3
}
)DEL");
std::map<int64_t, TabletMetadataPB> tablet_metas_v6;
tablet_metas_v6[66601] = *tablet_66601_v6;
tablet_metas_v6[66600] = *tablet_66600_v6;
ASSERT_OK(_tablet_mgr->put_bundle_tablet_metadata(tablet_metas_v6));
EXPECT_TRUE(file_exist(tablet_metadata_filename(0, 6)));
EXPECT_TRUE(file_exist(tablet_metadata_filename(0, 7)));
EXPECT_TRUE(file_exist(tablet_metadata_filename(0, 8)));
VacuumFullResponse response;
vacuum_full(_tablet_mgr.get(), request, &response);
ASSERT_TRUE(response.has_status());
EXPECT_EQ(0, response.status().status_code()) << response.status().error_msgs(0);
EXPECT_EQ(3, response.vacuumed_files()); // 1 metadata, 2 data
EXPECT_FALSE(file_exist(tablet_metadata_filename(0, 6)));
EXPECT_TRUE(file_exist(tablet_metadata_filename(0, 7)));
EXPECT_TRUE(file_exist(tablet_metadata_filename(0, 8)));
EXPECT_TRUE(file_exist("0000000000000005_27dc159f-6bfc-4a3a-9d9c-c97c10bb2e1d.dat"));
EXPECT_TRUE(file_exist("0000000000000005_a542395a-bff5-48a7-a3a7-2ed05691b58c.dat"));
EXPECT_FALSE(file_exist("0000000000000004_a542f95a-bff5-48a7-a3a7-2ed05691b58c.dat"));
EXPECT_FALSE(file_exist("0000000000000004_a542ff5a-bff5-48a7-a3a7-2ed05691b58c.dat"));
}
// NOLINTNEXTLINE
TEST_P(LakeVacuumTest, test_vacuum_2) {
create_data_file("00000000000259e4_27dc159f-6bfc-4a3a-9d9c-c97c10bb2e1d.dat");

View File

@ -131,8 +131,12 @@ public class PhysicalPartition extends MetaObject implements GsonPostProcessable
*/
private long visibleTxnId = -1;
// Autovacuum
private final AtomicLong lastVacuumTime = new AtomicLong(0);
// Full vacuum (orphan data files and redundant db/table/partition)
private volatile long lastFullVacuumTime;
private final AtomicLong minRetainVersion = new AtomicLong(0);
private final AtomicLong lastSuccVacuumVersion = new AtomicLong(0);
@ -227,6 +231,14 @@ public class PhysicalPartition extends MetaObject implements GsonPostProcessable
this.lastVacuumTime.set(lastVacuumTime);
}
public long getLastFullVacuumTime() {
return lastFullVacuumTime;
}
public void setLastFullVacuumTime(long lastVacuumTime) {
this.lastFullVacuumTime = lastVacuumTime;
}
public long getMinRetainVersion() {
long retainVersion = minRetainVersion.get();
if (metadataSwitchVersion != 0) {

View File

@ -2989,6 +2989,9 @@ public class Config extends ConfigBase {
@ConfField(comment = "how many partitions can autovacuum be executed simultaneously at most")
public static int lake_autovacuum_parallel_partitions = 8;
@ConfField(comment = "how many partitions can fullvacuum execute simultaneously at most")
public static int lake_fullvacuum_parallel_partitions = 16;
@ConfField(mutable = true, comment = "the minimum delay between autovacuum runs on any given partition")
public static long lake_autovacuum_partition_naptime_seconds = 180;
@ -3009,6 +3012,12 @@ public class Config extends ConfigBase {
"Determine whether a vacuum operation needs to be initiated based on the vacuum version.\n")
public static boolean lake_autovacuum_detect_vaccumed_version = true;
@ConfField(mutable = true, comment = "the minimum delay between full vacuum runs on any given partition")
public static long lake_fullvacuum_partition_naptime_seconds = 3600L * 24L;
@ConfField(mutable = true, comment = "metadata expired time from full vacuum begin running")
public static long lake_fullvacuum_meta_expired_seconds = 3600L * 24L * 2L;
@ConfField(mutable = true, comment =
"Whether enable throttling ingestion speed when compaction score exceeds the threshold.\n" +
"Only takes effect for tables in clusters with run_mode=shared_data.")

View File

@ -0,0 +1,271 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.lake.vacuum;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.Tablet;
import com.starrocks.common.Config;
import com.starrocks.common.FeConstants;
import com.starrocks.common.io.Writable;
import com.starrocks.common.util.FrontendDaemon;
import com.starrocks.common.util.concurrent.lock.LockType;
import com.starrocks.common.util.concurrent.lock.Locker;
import com.starrocks.lake.snapshot.ClusterSnapshotMgr;
import com.starrocks.proto.VacuumFullRequest;
import com.starrocks.proto.VacuumFullResponse;
import com.starrocks.rpc.BrpcProxy;
import com.starrocks.rpc.LakeService;
import com.starrocks.rpc.RpcException;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.WarehouseManager;
import com.starrocks.system.ComputeNode;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.starrocks.rpc.LakeService.TIMEOUT_VACUUM_FULL;
public class FullVacuumDaemon extends FrontendDaemon implements Writable {
private static final Logger LOG = LogManager.getLogger(FullVacuumDaemon.class);
private static final long MILLISECONDS_PER_SECOND = 1000;
private final Set<Long> vacuumingPartitions = Sets.newConcurrentHashSet();
private final BlockingThreadPoolExecutorService executorService =
BlockingThreadPoolExecutorService.newInstance(Config.lake_fullvacuum_parallel_partitions, 0, TIMEOUT_VACUUM_FULL,
TimeUnit.MILLISECONDS, "fullvacuum");
public FullVacuumDaemon() {
// Check every minute if we should run a full vacuum
super("FullVacuumDaemon", 1000 * 60);
}
@Override
protected void runAfterCatalogReady() {
if (FeConstants.runningUnitTest) {
return;
}
List<Long> dbIds = GlobalStateMgr.getCurrentState().getLocalMetastore().getDbIds();
for (Long dbId : dbIds) {
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbId);
if (db == null) {
continue;
}
List<Table> tables = new ArrayList<>();
for (Table table : GlobalStateMgr.getCurrentState().getLocalMetastore().getTables(db.getId())) {
if (table.isCloudNativeTableOrMaterializedView()) {
tables.add(table);
}
}
// Full vacuum cleans up two types of data (orphan data files and redundant db/table/partition)
// 1. Cleanup Orphan Data Files: Data files not referenced by tablet metadata.
for (Table table : tables) {
vacuumTable(db, table);
}
// 2. Redundant db/table/partition: Dbs, tables, or partitions that have been deleted in FE but not in the object storage.
}
}
public boolean shouldVacuum(PhysicalPartition partition) {
long current = System.currentTimeMillis();
if (partition.getLastFullVacuumTime() == 0) {
// init LastFullVacuumTime
partition.setLastFullVacuumTime(current);
return false;
}
// prevent vacuuming too frequently
return current >= partition.getLastFullVacuumTime() + Config.lake_fullvacuum_partition_naptime_seconds * 1000;
}
private void vacuumTable(Database db, Table baseTable) {
OlapTable table = (OlapTable) baseTable;
List<PhysicalPartition> partitions;
Locker locker = new Locker();
locker.lockTablesWithIntensiveDbLock(db.getId(), Lists.newArrayList(baseTable.getId()), LockType.READ);
try {
partitions = table.getPhysicalPartitions().stream().filter(this::shouldVacuum).toList();
} finally {
locker.unLockTablesWithIntensiveDbLock(db.getId(), Lists.newArrayList(baseTable.getId()), LockType.READ);
}
for (PhysicalPartition partition : partitions) {
if (vacuumingPartitions.add(partition.getId())) {
executorService.execute(() -> vacuumPartition(db, table, partition));
}
}
}
private void vacuumPartition(Database db, OlapTable table, PhysicalPartition partition) {
try {
vacuumPartitionImpl(db, table, partition);
} finally {
vacuumingPartitions.remove(partition.getId());
}
}
private void vacuumPartitionImpl(Database db, OlapTable table, PhysicalPartition partition) {
LOG.info("Running orphan file deletion task for table={}, partition={}.", table.getName(), partition.getId());
List<Tablet> tablets = new ArrayList<>();
long visibleVersion;
long startTime = System.currentTimeMillis();
long minActiveTxnId = computeMinActiveTxnId(db, table);
Locker locker = new Locker();
locker.lockTablesWithIntensiveDbLock(db.getId(), Lists.newArrayList(table.getId()), LockType.READ);
try {
for (MaterializedIndex index : partition.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
tablets.addAll(index.getTablets());
}
visibleVersion = partition.getVisibleVersion();
} finally {
locker.unLockTablesWithIntensiveDbLock(db.getId(), Lists.newArrayList(table.getId()), LockType.READ);
}
if (visibleVersion <= 1) {
LOG.info("skipping full vacuum of partition={} because its visible version is {}", partition.getId(), visibleVersion);
partition.setLastFullVacuumTime(startTime);
return;
}
ClusterSnapshotMgr clusterSnapshotMgr = GlobalStateMgr.getCurrentState().getClusterSnapshotMgr();
WarehouseManager warehouseManager = GlobalStateMgr.getCurrentState().getWarehouseMgr();
Set<ComputeNode> involvedNodes = new HashSet<>();
Map<ComputeNode, Tablet> nodeToTablet = new HashMap<>();
for (Tablet tablet : tablets) {
ComputeNode node = warehouseManager.getComputeNodeAssignedToTablet(WarehouseManager.DEFAULT_RESOURCE, tablet.getId());
if (node == null) {
LOG.error("Could not get CN for tablet={}, returning early.", tablet.getId());
return;
}
involvedNodes.add(node);
nodeToTablet.put(node, tablet); // remember one tablet served by this node
}
if (involvedNodes.isEmpty()) {
LOG.error("Could not find any CN node for full vacuum, returning early.");
return;
}
// choose a node for full vacuum at random
List<ComputeNode> involvedNodesList = involvedNodes.stream().collect(Collectors.toList());
Random random = new Random();
ComputeNode chosenNode = involvedNodesList.get(random.nextInt(involvedNodesList.size()));
VacuumFullRequest vacuumFullRequest = new VacuumFullRequest();
vacuumFullRequest.setPartitionId(partition.getId());
vacuumFullRequest.setMinActiveTxnId(minActiveTxnId);
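// Grace timestamp: metadata committed before (full-vacuum start time - lake_fullvacuum_meta_expired_seconds)
// is eligible for deletion, but never past the cluster snapshot's safe deletion time (floored at 1 second).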
long graceTimestamp = startTime / MILLISECONDS_PER_SECOND - Config.lake_fullvacuum_meta_expired_seconds;
graceTimestamp = Math.min(graceTimestamp,
Math.max(clusterSnapshotMgr.getSafeDeletionTimeMs() / MILLISECONDS_PER_SECOND, 1));
vacuumFullRequest.setGraceTimestamp(graceTimestamp);
List<Long> retainVersions = new ArrayList<>();
retainVersions.addAll(clusterSnapshotMgr.getVacuumRetainVersions(
db.getId(), table.getId(), partition.getParentId(), partition.getId()));
if (!retainVersions.contains(visibleVersion)) {
retainVersions.add(visibleVersion); // current visibleVersion should be retained
}
long minCheckVersion = 0;
long maxCheckVersion = visibleVersion; // should always be initialized to the current visibleVersion
vacuumFullRequest.setMinCheckVersion(minCheckVersion);
vacuumFullRequest.setMaxCheckVersion(maxCheckVersion);
vacuumFullRequest.setRetainVersions(retainVersions);
vacuumFullRequest.setTabletId(nodeToTablet.get(chosenNode).getId());
LOG.info(
"Sending full vacuum request to cn={}: table={}, partition={}, max_check_version={}, " + "min_active_txn_id={}",
chosenNode.getHost(), table.getName(), vacuumFullRequest.getPartitionId(), vacuumFullRequest.maxCheckVersion,
vacuumFullRequest.minActiveTxnId);
boolean hasError = false;
long vacuumedFiles = 0;
long vacuumedFileSize = 0;
long vacuumedVersion = Long.MAX_VALUE;
Future<VacuumFullResponse> responseFuture = null;
try {
LakeService service = BrpcProxy.getLakeService(chosenNode.getHost(), chosenNode.getBrpcPort());
responseFuture = service.vacuumFull(vacuumFullRequest);
} catch (RpcException e) {
LOG.error("failed to send full vacuum request for partition {}.{}.{}", db.getFullName(), table.getName(),
partition.getId(), e);
hasError = true;
}
try {
if (responseFuture != null) {
VacuumFullResponse response = responseFuture.get();
if (response.status.statusCode != 0) {
hasError = true;
LOG.warn("Vacuumed {}.{}.{} with error: {}", db.getFullName(), table.getName(), partition.getId(),
response.status.errorMsgs != null && !response.status.errorMsgs.isEmpty() ?
response.status.errorMsgs.get(0) : "");
} else {
vacuumedFiles += response.vacuumedFiles;
vacuumedFileSize += response.vacuumedFileSize;
}
}
} catch (InterruptedException e) {
LOG.warn("thread interrupted");
Thread.currentThread().interrupt();
hasError = true;
} catch (ExecutionException e) {
LOG.error("failed to full vacuum {}.{}.{}: {}", db.getFullName(), table.getName(), partition.getId(),
e.getMessage());
hasError = true;
}
partition.setLastFullVacuumTime(startTime);
LOG.info("Full vacuumed {}.{}.{} hasError={} vacuumedFiles={} vacuumedFileSize={} " +
"visibleVersion={} minActiveTxnId={} vacuumVersion={} cost={}ms", db.getFullName(), table.getName(),
partition.getId(), hasError, vacuumedFiles, vacuumedFileSize, visibleVersion, minActiveTxnId, vacuumedVersion,
System.currentTimeMillis() - startTime);
}
@VisibleForTesting
public static long computeMinActiveTxnId(Database db, Table table) {
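// The smallest txn id still considered active: the database's oldest active txn id, or an in-flight
// schema-change txn on this table, whichever is smaller.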
long a = GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().getMinActiveTxnIdOfDatabase(db.getId());
Optional<Long> b = GlobalStateMgr.getCurrentState().getSchemaChangeHandler().getActiveTxnIdOfTable(table.getId());
return Math.min(a, b.orElse(Long.MAX_VALUE));
}
}

View File

@ -47,6 +47,8 @@ import com.starrocks.proto.UnlockTabletMetadataRequest;
import com.starrocks.proto.UnlockTabletMetadataResponse;
import com.starrocks.proto.UploadSnapshotsRequest;
import com.starrocks.proto.UploadSnapshotsResponse;
import com.starrocks.proto.VacuumFullRequest;
import com.starrocks.proto.VacuumFullResponse;
import com.starrocks.proto.VacuumRequest;
import com.starrocks.proto.VacuumResponse;
@ -69,6 +71,7 @@ public interface LakeService {
long TIMEOUT_PUBLISH_LOG_VERSION_BATCH = MILLIS_PER_MINUTE;
long TIMEOUT_ABORT_COMPACTION = 5 * MILLIS_PER_SECOND;
long TIMEOUT_VACUUM = MILLIS_PER_HOUR;
long TIMEOUT_VACUUM_FULL = MILLIS_PER_HOUR * 24;
@ProtobufRPC(serviceName = "LakeService", methodName = "publish_version", onceTalkTimeout = TIMEOUT_PUBLISH_VERSION)
Future<PublishVersionResponse> publishVersion(PublishVersionRequest request);
@ -124,5 +127,8 @@ public interface LakeService {
@ProtobufRPC(serviceName = "LakeService", methodName = "aggregate_publish_version", onceTalkTimeout = TIMEOUT_PUBLISH_VERSION)
Future<PublishVersionResponse> aggregatePublishVersion(AggregatePublishVersionRequest request);
@ProtobufRPC(serviceName = "LakeService", methodName = "vacuum_full", onceTalkTimeout = TIMEOUT_VACUUM_FULL)
Future<VacuumFullResponse> vacuumFull(VacuumFullRequest request);
}

View File

@ -45,6 +45,8 @@ import com.starrocks.proto.UnlockTabletMetadataRequest;
import com.starrocks.proto.UnlockTabletMetadataResponse;
import com.starrocks.proto.UploadSnapshotsRequest;
import com.starrocks.proto.UploadSnapshotsResponse;
import com.starrocks.proto.VacuumFullRequest;
import com.starrocks.proto.VacuumFullResponse;
import com.starrocks.proto.VacuumRequest;
import com.starrocks.proto.VacuumResponse;
@ -169,4 +171,10 @@ public class LakeServiceWithMetrics implements LakeService {
increaseMetrics();
return lakeService.aggregatePublishVersion(request);
}
@Override
public Future<VacuumFullResponse> vacuumFull(VacuumFullRequest request) {
increaseMetrics();
return lakeService.vacuumFull(request);
}
}

View File

@ -143,6 +143,7 @@ import com.starrocks.lake.compaction.CompactionControlScheduler;
import com.starrocks.lake.compaction.CompactionMgr;
import com.starrocks.lake.snapshot.ClusterSnapshotMgr;
import com.starrocks.lake.vacuum.AutovacuumDaemon;
import com.starrocks.lake.vacuum.FullVacuumDaemon;
import com.starrocks.leader.CheckpointController;
import com.starrocks.leader.ReportHandler;
import com.starrocks.leader.TabletCollector;
@ -481,6 +482,7 @@ public class GlobalStateMgr {
private final StorageVolumeMgr storageVolumeMgr;
private AutovacuumDaemon autovacuumDaemon;
private FullVacuumDaemon fullVacuumDaemon;
private final PipeManager pipeManager;
private final PipeListener pipeListener;
@ -781,6 +783,7 @@ public class GlobalStateMgr {
this.storageVolumeMgr = new SharedDataStorageVolumeMgr();
this.autovacuumDaemon = new AutovacuumDaemon();
this.slotManager = new SlotManager(resourceUsageMonitor);
this.fullVacuumDaemon = new FullVacuumDaemon();
} else {
this.storageVolumeMgr = new SharedNothingStorageVolumeMgr();
this.slotManager = new SlotManager(resourceUsageMonitor);
@ -1461,6 +1464,7 @@ public class GlobalStateMgr {
starMgrMetaSyncer.start();
autovacuumDaemon.start();
fullVacuumDaemon.start();
}
if (Config.enable_safe_mode) {

View File

@ -16,18 +16,28 @@ package com.starrocks.lake;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.GlobalStateMgrTestUtil;
import com.starrocks.catalog.MaterializedIndex;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.PhysicalPartition;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.Tablet;
import com.starrocks.common.Config;
import com.starrocks.common.FeConstants;
import com.starrocks.common.jmockit.Deencapsulation;
import com.starrocks.lake.snapshot.ClusterSnapshotMgr;
import com.starrocks.lake.vacuum.AutovacuumDaemon;
import com.starrocks.lake.vacuum.FullVacuumDaemon;
import com.starrocks.proto.StatusPB;
import com.starrocks.proto.VacuumFullRequest;
import com.starrocks.proto.VacuumFullResponse;
import com.starrocks.proto.VacuumRequest;
import com.starrocks.proto.VacuumResponse;
import com.starrocks.qe.ConnectContext;
import com.starrocks.rpc.BrpcProxy;
import com.starrocks.rpc.LakeService;
import com.starrocks.rpc.LakeServiceWithMetrics;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.LocalMetastore;
import com.starrocks.server.RunMode;
import com.starrocks.server.WarehouseManager;
import com.starrocks.system.ComputeNode;
@ -35,6 +45,8 @@ import com.starrocks.utframe.StarRocksAssert;
import com.starrocks.utframe.UtFrameUtils;
import com.starrocks.warehouse.Warehouse;
import com.starrocks.warehouse.cngroup.ComputeResource;
import mockit.Mock;
import mockit.MockUp;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
@ -43,6 +55,10 @@ import org.mockito.MockedStatic;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import static org.mockito.ArgumentMatchers.anyLong;
@ -116,7 +132,7 @@ public class VacuumTest {
warehouseManager = mock(WarehouseManager.class);
computeNode = mock(ComputeNode.class);
when(warehouseManager.getBackgroundWarehouse()).thenReturn(mock(Warehouse.class));
when(warehouseManager.getComputeNodeAssignedToTablet((ComputeResource) any(), anyLong()))
@ -124,7 +140,7 @@ public class VacuumTest {
when(computeNode.getHost()).thenReturn("localhost");
when(computeNode.getBrpcPort()).thenReturn(8080);
}
@AfterAll
@ -203,7 +219,7 @@ public class VacuumTest {
mockBrpcProxyStatic.when(() -> BrpcProxy.getLakeService(anyString(), anyInt())).thenReturn(lakeService);
autovacuumDaemon.testVacuumPartitionImpl(db, olapTable2, partition);
}
Assertions.assertEquals(5L, partition.getLastSuccVacuumVersion());
mockResponse.vacuumedVersion = 7L;
@ -256,6 +272,132 @@ public class VacuumTest {
Assertions.assertEquals(0L, partition.getMetadataSwitchVersion());
}
@Test
public void testFullVacuumBasic() {
GlobalStateMgr currentState = GlobalStateMgr.getCurrentState();
FullVacuumDaemon fullVacuumDaemon = new FullVacuumDaemon();
Set<Long> allTabletIds = new HashSet<>();
long testDbId = 0;
List<Long> dbIds = currentState.getLocalMetastore().getDbIds();
for (Long dbId : dbIds) {
Database currDb = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbId);
if (currDb != null && !currDb.isSystemDatabase()) {
testDbId = dbId;
break;
}
}
final Database sourceDb = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(testDbId);
for (Table tbl : sourceDb.getTables()) {
if (tbl.isOlapTable()) {
OlapTable olapTbl = (OlapTable) tbl;
for (PhysicalPartition part : olapTbl.getPhysicalPartitions()) {
part.setLastFullVacuumTime(1L);
for (MaterializedIndex index : part.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
allTabletIds.addAll(index.getTablets().stream().map(Tablet::getId).toList());
}
}
}
}
new MockUp<PhysicalPartition>() {
@Mock
public long getVisibleVersion() {
return 10L;
}
};
new MockUp<Table>() {
@Mock
public boolean isCloudNativeTableOrMaterializedView() {
return true;
}
};
new MockUp<LocalMetastore>() {
@Mock
public Database getDb(long dbId) {
return sourceDb;
}
@Mock
public List<Table> getTables(Long dbId) {
return sourceDb.getTables();
}
};
new MockUp<WarehouseManager>() {
@Mock
public ComputeNode getComputeNodeAssignedToTablet(ComputeResource computeResource, long tabletId) {
return new ComputeNode();
}
};
new MockUp<BrpcProxy>() {
@Mock
public LakeService getLakeService(String host, int port) {
return new LakeServiceWithMetrics(null);
}
};
new MockUp<LakeServiceWithMetrics>() {
@Mock
public Future<VacuumFullResponse> vacuumFull(VacuumFullRequest request) {
VacuumFullResponse resp = new VacuumFullResponse();
resp.status = new StatusPB();
resp.status.statusCode = 0;
resp.vacuumedFiles = 1L;
resp.vacuumedFileSize = 1L;
return CompletableFuture.completedFuture(resp);
}
};
final StarOSAgent starOSAgent = new StarOSAgent();
final ClusterSnapshotMgr clusterSnapshotMgr = new ClusterSnapshotMgr();
final WarehouseManager curWarehouseManager = new WarehouseManager();
new MockUp<GlobalStateMgr>() {
@Mock
public ClusterSnapshotMgr getClusterSnapshotMgr() {
return clusterSnapshotMgr;
}
@Mock
public WarehouseManager getWarehouseMgr() {
return curWarehouseManager;
}
@Mock
public StarOSAgent getStarOSAgent() {
return starOSAgent;
}
};
new MockUp<VacuumFullRequest>() {
@Mock
public void setRetainVersions(List<Long> retainVersions) {
Assertions.assertEquals(1, retainVersions.size());
return;
}
};
new MockUp<ClusterSnapshotMgr>() {
@Mock
public long getSafeDeletionTimeMs() {
return 454545L;
}
};
FeConstants.runningUnitTest = false;
int oldValue1 = Config.lake_fullvacuum_parallel_partitions;
long oldValue2 = Config.lake_fullvacuum_partition_naptime_seconds;
Config.lake_fullvacuum_parallel_partitions = 1;
Config.lake_fullvacuum_partition_naptime_seconds = 0;
Deencapsulation.invoke(fullVacuumDaemon, "runAfterCatalogReady");
Config.lake_fullvacuum_partition_naptime_seconds = oldValue2;
Config.lake_fullvacuum_parallel_partitions = oldValue1;
FeConstants.runningUnitTest = true;
}
@Test
public void testLastSuccVacuumVersionUpdateFailed() throws Exception {
GlobalStateMgr currentState = GlobalStateMgr.getCurrentState();

View File

@ -92,6 +92,8 @@ import com.starrocks.proto.UnlockTabletMetadataRequest;
import com.starrocks.proto.UnlockTabletMetadataResponse;
import com.starrocks.proto.UploadSnapshotsRequest;
import com.starrocks.proto.UploadSnapshotsResponse;
import com.starrocks.proto.VacuumFullRequest;
import com.starrocks.proto.VacuumFullResponse;
import com.starrocks.proto.VacuumRequest;
import com.starrocks.proto.VacuumResponse;
import com.starrocks.rpc.LakeService;
@ -1206,6 +1208,11 @@ public class PseudoBackend {
public Future<PublishVersionResponse> aggregatePublishVersion(AggregatePublishVersionRequest request) {
return CompletableFuture.completedFuture(null);
}
@Override
public Future<VacuumFullResponse> vacuumFull(VacuumFullRequest request) {
return CompletableFuture.completedFuture(null);
}
}
static TStatus status(TStatusCode code, String msg) {

View File

@ -78,6 +78,8 @@ import com.starrocks.proto.UnlockTabletMetadataRequest;
import com.starrocks.proto.UnlockTabletMetadataResponse;
import com.starrocks.proto.UploadSnapshotsRequest;
import com.starrocks.proto.UploadSnapshotsResponse;
import com.starrocks.proto.VacuumFullRequest;
import com.starrocks.proto.VacuumFullResponse;
import com.starrocks.proto.VacuumRequest;
import com.starrocks.proto.VacuumResponse;
import com.starrocks.rpc.BrpcProxy;
@ -686,5 +688,10 @@ public class MockedBackend {
public Future<PublishVersionResponse> aggregatePublishVersion(AggregatePublishVersionRequest request) {
return CompletableFuture.completedFuture(null);
}
@Override
public Future<VacuumFullResponse> vacuumFull(VacuumFullRequest request) {
return CompletableFuture.completedFuture(null);
}
}
}

View File

@ -327,11 +327,30 @@ message VacuumResponse {
optional int64 extra_file_size = 6;
}
// 1. Before the orphaned data file cleanup starts, VacuumFull deletes metadata files that satisfy
//    commit_time < grace_timestamp.
// 2. Then orphaned data files are determined: min_active_txn_id is used to filter out new data files,
//    and the remaining metadata files are opened so the data files they reference can be recorded as non-orphans.
// 3. All tablet metadata with versions in retain_versions must be retained when vacuuming metadata with
//    commit_time < grace_timestamp. retain_versions must contain the latest visible version.
// 4. All tablet metadata with versions NOT in [min_check_version, max_check_version) must not be vacuumed.
message VacuumFullRequest {
repeated int64 tablet_ids = 1;
optional int64 min_check_version = 2;
optional int64 max_check_version = 3;
optional int64 min_active_txn_id = 4;
// The id of the partition to vacuum
optional int64 partition_id = 1;
// tablet_id can be the id of any tablet that belongs to the table being vacuumed.
optional int64 tablet_id = 2;
optional int64 min_check_version = 3;
optional int64 max_check_version = 4;
// Don't delete files with txn_id >= min_active_txn_id
optional int64 min_active_txn_id = 5;
// Delete metadata files with commit_time < grace_timestamp (set by FE)
optional int64 grace_timestamp = 6;
// Versions of tablet metadata to retain.
// The version here means the version of the tablet metadata, i.e. the physical partition version
// (because the vacuum request is sent per <node, physical partition>).
// All tablet metadata of these versions, together with their data files, must be retained and
// cannot be vacuumed.
repeated int64 retain_versions = 7;
}
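To make the field comments above concrete, here is a hedged sketch of how the FE side might populate a VacuumFullRequest for one physical partition. It follows the camelCase public-field/setter style seen in VacuumTest (resp.vacuumedFiles, setRetainVersions); the exact generated accessors, the check-version bounds, and the grace-timestamp arithmetic are assumptions for illustration only.

import java.util.List;

import com.starrocks.proto.VacuumFullRequest;

public class VacuumFullRequestSketch {
    // Builds a request for one physical partition; all values are caller-supplied.
    // Field access below assumes the same public camelCase fields that VacuumTest
    // uses on VacuumFullResponse; adjust if the generated accessors differ.
    public static VacuumFullRequest build(long partitionId, long anyTabletId,
                                          long latestVisibleVersion, long minActiveTxnId,
                                          long graceTimestamp) {
        VacuumFullRequest request = new VacuumFullRequest();
        request.partitionId = partitionId;                   // partition to vacuum (field 1)
        request.tabletId = anyTabletId;                      // any tablet of that table (field 2)
        request.minCheckVersion = 1L;                        // illustrative lower bound (field 3)
        request.maxCheckVersion = latestVisibleVersion + 1;  // exclusive upper bound (field 4)
        request.minActiveTxnId = minActiveTxnId;             // protect files of in-flight txns (field 5)
        request.graceTimestamp = graceTimestamp;             // only older metadata may be removed (field 6)
        // Rule 3 above: retain_versions must contain the latest visible version.
        request.setRetainVersions(List.of(latestVisibleVersion));
        return request;
    }
}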
message VacuumFullResponse {