[Refactor] Split the LocalCacheEngine interface into LocalMemCacheEngine and LocalDiskCacheEngine (#63734)
Signed-off-by: trueeyu <lxhhust350@qq.com>
This commit is contained in:
parent
0d5d2c76e9
commit
df0faf1b26
|
|
@ -849,7 +849,7 @@ void* ReportDataCacheMetricsTaskWorkerPool::_worker_thread_callback(void* arg_th
|
|||
|
||||
TDataCacheMetrics t_metrics{};
|
||||
// TODO: mem_metrics + disk_metrics
|
||||
const LocalCacheEngine* cache = DataCache::GetInstance()->local_disk_cache();
|
||||
const LocalDiskCacheEngine* cache = DataCache::GetInstance()->local_disk_cache();
|
||||
if (cache != nullptr && cache->is_initialized()) {
|
||||
const auto metrics = cache->cache_metrics();
|
||||
DataCacheUtils::set_metrics_from_thrift(t_metrics, metrics);
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ BlockCache::~BlockCache() {
|
|||
(void)shutdown();
|
||||
}
|
||||
|
||||
Status BlockCache::init(const BlockCacheOptions& options, std::shared_ptr<LocalCacheEngine> local_cache,
|
||||
Status BlockCache::init(const BlockCacheOptions& options, std::shared_ptr<LocalDiskCacheEngine> local_cache,
|
||||
std::shared_ptr<RemoteCacheEngine> remote_cache) {
|
||||
_block_size = std::min(options.block_size, MAX_BLOCK_SIZE);
|
||||
_local_cache = std::move(local_cache);
|
||||
|
|
|
|||
|
|
@ -16,7 +16,7 @@
|
|||
|
||||
#include <atomic>
|
||||
|
||||
#include "cache/local_cache_engine.h"
|
||||
#include "cache/local_disk_cache_engine.h"
|
||||
#include "cache/remote_cache_engine.h"
|
||||
#include "common/status.h"
|
||||
|
||||
|
|
@ -33,7 +33,7 @@ public:
|
|||
~BlockCache();
|
||||
|
||||
// Init the block cache instance
|
||||
Status init(const BlockCacheOptions& options, std::shared_ptr<LocalCacheEngine> local_cache,
|
||||
Status init(const BlockCacheOptions& options, std::shared_ptr<LocalDiskCacheEngine> local_cache,
|
||||
std::shared_ptr<RemoteCacheEngine> remote_cache);
|
||||
|
||||
// Write data buffer to cache, the `offset` must be aligned by block size
|
||||
|
|
@ -73,15 +73,14 @@ public:
|
|||
bool is_initialized() const { return _initialized.load(std::memory_order_relaxed); }
|
||||
|
||||
bool available() const { return is_initialized() && _local_cache->available(); }
|
||||
bool mem_cache_available() const { return is_initialized() && _local_cache->mem_cache_available(); }
|
||||
|
||||
std::shared_ptr<LocalCacheEngine> local_cache() { return _local_cache; }
|
||||
std::shared_ptr<LocalDiskCacheEngine> local_cache() { return _local_cache; }
|
||||
|
||||
static const size_t MAX_BLOCK_SIZE;
|
||||
|
||||
private:
|
||||
size_t _block_size = 0;
|
||||
std::shared_ptr<LocalCacheEngine> _local_cache;
|
||||
std::shared_ptr<LocalDiskCacheEngine> _local_cache;
|
||||
std::shared_ptr<RemoteCacheEngine> _remote_cache;
|
||||
std::atomic<bool> _initialized = false;
|
||||
};
|
||||
|
|
|
|||
|
|
@ -44,11 +44,6 @@ Status DataCache::init(const std::vector<StorePath>& store_paths) {
|
|||
_block_cache = std::make_shared<BlockCache>();
|
||||
_page_cache = std::make_shared<StoragePageCache>();
|
||||
|
||||
#if defined(WITH_STARCACHE)
|
||||
_local_disk_cache_engine = "starcache";
|
||||
#endif
|
||||
_local_mem_cache_engine = "lrucache";
|
||||
|
||||
if (!config::datacache_enable) {
|
||||
config::disable_storage_page_cache = true;
|
||||
config::block_cache_enable = false;
|
||||
|
|
@ -173,89 +168,89 @@ BlockCacheOptions DataCache::_init_block_cache_options() {
|
|||
return cache_options;
|
||||
}
|
||||
|
||||
#if defined(WITH_STARCACHE)
|
||||
StatusOr<DiskCacheOptions> DataCache::_init_disk_cache_options() {
|
||||
DiskCacheOptions cache_options;
|
||||
|
||||
if (_local_disk_cache_engine == "starcache") {
|
||||
#ifdef USE_STAROS
|
||||
std::vector<string> corresponding_starlet_dirs;
|
||||
if (config::datacache_unified_instance_enable && !config::starlet_cache_dir.empty()) {
|
||||
// in older versions, users might set `starlet_cache_dir` instead of `storage_root_path` for starlet cache,
|
||||
// we need to move starlet cache into storage_root_path/datacache
|
||||
auto s = DataCacheUtils::get_corresponding_starlet_cache_dir(_store_paths, config::starlet_cache_dir);
|
||||
if (!s.ok()) {
|
||||
LOG(WARNING) << s.status().message() << ", change config::datacache_unified_instance_enable to false";
|
||||
config::datacache_unified_instance_enable = false;
|
||||
} else {
|
||||
corresponding_starlet_dirs = *s;
|
||||
}
|
||||
std::vector<string> corresponding_starlet_dirs;
|
||||
if (config::datacache_unified_instance_enable && !config::starlet_cache_dir.empty()) {
|
||||
// in older versions, users might set `starlet_cache_dir` instead of `storage_root_path` for starlet cache,
|
||||
// we need to move starlet cache into storage_root_path/datacache
|
||||
auto s = DataCacheUtils::get_corresponding_starlet_cache_dir(_store_paths, config::starlet_cache_dir);
|
||||
if (!s.ok()) {
|
||||
LOG(WARNING) << s.status().message() << ", change config::datacache_unified_instance_enable to false";
|
||||
config::datacache_unified_instance_enable = false;
|
||||
} else {
|
||||
corresponding_starlet_dirs = *s;
|
||||
}
|
||||
int idx = 0;
|
||||
#endif
|
||||
|
||||
for (auto& root_path : _store_paths) {
|
||||
// Because we have unified the datacache between datalake and starlet, we also need to unify the
|
||||
// cache path and quota.
|
||||
// To reuse the old cache data in `starlet_cache` directory, we try to rename it to the new `datacache`
|
||||
// directory if it exists. To avoid the risk of cross disk renaming of a large amount of cached data,
|
||||
// we do not automatically rename it when the source and destination directories are on different disks.
|
||||
// In this case, users should manually remount the directories and restart them.
|
||||
std::string datacache_path = root_path.path + "/datacache";
|
||||
#ifdef USE_STAROS
|
||||
if (config::datacache_unified_instance_enable) {
|
||||
std::string starlet_cache_path;
|
||||
if (idx < corresponding_starlet_dirs.size()) {
|
||||
starlet_cache_path = corresponding_starlet_dirs[idx++];
|
||||
} else {
|
||||
starlet_cache_path = root_path.path + "/starlet_cache/star_cache";
|
||||
}
|
||||
RETURN_IF_ERROR(DataCacheUtils::change_disk_path(starlet_cache_path, datacache_path));
|
||||
}
|
||||
#endif
|
||||
// Create it if not exist
|
||||
Status st = FileSystem::Default()->create_dir_if_missing(datacache_path);
|
||||
if (!st.ok()) {
|
||||
LOG(ERROR) << "Fail to create datacache directory: " << datacache_path << ", reason: " << st.message();
|
||||
return Status::InternalError("Fail to create datacache directory");
|
||||
}
|
||||
|
||||
ASSIGN_OR_RETURN(int64_t disk_size, DataCacheUtils::parse_conf_datacache_disk_size(
|
||||
datacache_path, config::datacache_disk_size, -1));
|
||||
#ifdef USE_STAROS
|
||||
// If the `datacache_disk_size` is manually set a positive value, we will use the maximum cache quota between
|
||||
// dataleke and starlet cache as the quota of the unified cache. Otherwise, the cache quota will remain zero
|
||||
// and then automatically adjusted based on the current avalible disk space.
|
||||
if (config::datacache_unified_instance_enable &&
|
||||
(!config::enable_datacache_disk_auto_adjust || disk_size > 0)) {
|
||||
ASSIGN_OR_RETURN(
|
||||
int64_t starlet_cache_size,
|
||||
DataCacheUtils::parse_conf_datacache_disk_size(
|
||||
datacache_path, fmt::format("{}%", config::starlet_star_cache_disk_size_percent), -1));
|
||||
disk_size = std::max(disk_size, starlet_cache_size);
|
||||
}
|
||||
#endif
|
||||
cache_options.dir_spaces.push_back({.path = datacache_path, .size = static_cast<size_t>(disk_size)});
|
||||
}
|
||||
|
||||
if (cache_options.dir_spaces.empty()) {
|
||||
config::enable_datacache_disk_auto_adjust = false;
|
||||
}
|
||||
|
||||
cache_options.block_size = config::datacache_block_size;
|
||||
cache_options.max_flying_memory_mb = config::datacache_max_flying_memory_mb;
|
||||
cache_options.max_concurrent_inserts = config::datacache_max_concurrent_inserts;
|
||||
cache_options.enable_checksum = config::datacache_checksum_enable;
|
||||
cache_options.enable_direct_io = config::datacache_direct_io_enable;
|
||||
cache_options.enable_tiered_cache = config::datacache_tiered_cache_enable;
|
||||
cache_options.skip_read_factor = config::datacache_skip_read_factor;
|
||||
cache_options.scheduler_threads_per_cpu = config::datacache_scheduler_threads_per_cpu;
|
||||
cache_options.enable_datacache_persistence = config::datacache_persistence_enable;
|
||||
cache_options.inline_item_count_limit = config::datacache_inline_item_count_limit;
|
||||
cache_options.eviction_policy = config::datacache_eviction_policy;
|
||||
}
|
||||
int idx = 0;
|
||||
#endif
|
||||
|
||||
for (auto& root_path : _store_paths) {
|
||||
// Because we have unified the datacache between datalake and starlet, we also need to unify the
|
||||
// cache path and quota.
|
||||
// To reuse the old cache data in `starlet_cache` directory, we try to rename it to the new `datacache`
|
||||
// directory if it exists. To avoid the risk of cross disk renaming of a large amount of cached data,
|
||||
// we do not automatically rename it when the source and destination directories are on different disks.
|
||||
// In this case, users should manually remount the directories and restart them.
|
||||
std::string datacache_path = root_path.path + "/datacache";
|
||||
#ifdef USE_STAROS
|
||||
if (config::datacache_unified_instance_enable) {
|
||||
std::string starlet_cache_path;
|
||||
if (idx < corresponding_starlet_dirs.size()) {
|
||||
starlet_cache_path = corresponding_starlet_dirs[idx++];
|
||||
} else {
|
||||
starlet_cache_path = root_path.path + "/starlet_cache/star_cache";
|
||||
}
|
||||
RETURN_IF_ERROR(DataCacheUtils::change_disk_path(starlet_cache_path, datacache_path));
|
||||
}
|
||||
#endif
|
||||
// Create it if not exist
|
||||
Status st = FileSystem::Default()->create_dir_if_missing(datacache_path);
|
||||
if (!st.ok()) {
|
||||
LOG(ERROR) << "Fail to create datacache directory: " << datacache_path << ", reason: " << st.message();
|
||||
return Status::InternalError("Fail to create datacache directory");
|
||||
}
|
||||
|
||||
ASSIGN_OR_RETURN(int64_t disk_size, DataCacheUtils::parse_conf_datacache_disk_size(
|
||||
datacache_path, config::datacache_disk_size, -1));
|
||||
#ifdef USE_STAROS
|
||||
// If the `datacache_disk_size` is manually set a positive value, we will use the maximum cache quota between
|
||||
// dataleke and starlet cache as the quota of the unified cache. Otherwise, the cache quota will remain zero
|
||||
// and then automatically adjusted based on the current avalible disk space.
|
||||
if (config::datacache_unified_instance_enable &&
|
||||
(!config::enable_datacache_disk_auto_adjust || disk_size > 0)) {
|
||||
ASSIGN_OR_RETURN(
|
||||
int64_t starlet_cache_size,
|
||||
DataCacheUtils::parse_conf_datacache_disk_size(
|
||||
datacache_path, fmt::format("{}%", config::starlet_star_cache_disk_size_percent), -1));
|
||||
disk_size = std::max(disk_size, starlet_cache_size);
|
||||
}
|
||||
#endif
|
||||
cache_options.dir_spaces.push_back({.path = datacache_path, .size = static_cast<size_t>(disk_size)});
|
||||
}
|
||||
|
||||
if (cache_options.dir_spaces.empty()) {
|
||||
config::enable_datacache_disk_auto_adjust = false;
|
||||
}
|
||||
|
||||
cache_options.block_size = config::datacache_block_size;
|
||||
cache_options.max_flying_memory_mb = config::datacache_max_flying_memory_mb;
|
||||
cache_options.max_concurrent_inserts = config::datacache_max_concurrent_inserts;
|
||||
cache_options.enable_checksum = config::datacache_checksum_enable;
|
||||
cache_options.enable_direct_io = config::datacache_direct_io_enable;
|
||||
cache_options.enable_tiered_cache = config::datacache_tiered_cache_enable;
|
||||
cache_options.skip_read_factor = config::datacache_skip_read_factor;
|
||||
cache_options.scheduler_threads_per_cpu = config::datacache_scheduler_threads_per_cpu;
|
||||
cache_options.enable_datacache_persistence = config::datacache_persistence_enable;
|
||||
cache_options.inline_item_count_limit = config::datacache_inline_item_count_limit;
|
||||
cache_options.eviction_policy = config::datacache_eviction_policy;
|
||||
|
||||
return cache_options;
|
||||
}
|
||||
#endif
|
||||
|
||||
static bool parse_resource_str(const string& str, string* value) {
|
||||
if (!str.empty()) {
|
||||
|
|
|
|||
|
|
@ -15,7 +15,8 @@
|
|||
#pragma once
|
||||
|
||||
#include "cache/block_cache/block_cache.h"
|
||||
#include "cache/local_cache_engine.h"
|
||||
#include "cache/local_disk_cache_engine.h"
|
||||
#include "cache/local_mem_cache_engine.h"
|
||||
#include "common/status.h"
|
||||
|
||||
namespace starrocks {
|
||||
|
|
@ -39,16 +40,13 @@ public:
|
|||
|
||||
void try_release_resource_before_core_dump();
|
||||
|
||||
void set_local_mem_cache(std::shared_ptr<LocalCacheEngine> local_mem_cache) {
|
||||
_local_mem_cache = std::move(local_mem_cache);
|
||||
}
|
||||
void set_local_disk_cache(std::shared_ptr<LocalCacheEngine> local_disk_cache) {
|
||||
void set_local_disk_cache(std::shared_ptr<LocalDiskCacheEngine> local_disk_cache) {
|
||||
_local_disk_cache = std::move(local_disk_cache);
|
||||
}
|
||||
void set_page_cache(std::shared_ptr<StoragePageCache> page_cache) { _page_cache = std::move(page_cache); }
|
||||
|
||||
LocalCacheEngine* local_mem_cache() { return _local_mem_cache.get(); }
|
||||
LocalCacheEngine* local_disk_cache() { return _local_disk_cache.get(); }
|
||||
LocalMemCacheEngine* local_mem_cache() { return _local_mem_cache.get(); }
|
||||
LocalDiskCacheEngine* local_disk_cache() { return _local_disk_cache.get(); }
|
||||
BlockCache* block_cache() const { return _block_cache.get(); }
|
||||
void set_block_cache(std::shared_ptr<BlockCache> block_cache) { _block_cache = std::move(block_cache); }
|
||||
StoragePageCache* page_cache() const { return _page_cache.get(); }
|
||||
|
|
@ -63,11 +61,11 @@ public:
|
|||
|
||||
private:
|
||||
StatusOr<MemCacheOptions> _init_mem_cache_options();
|
||||
StatusOr<DiskCacheOptions> _init_disk_cache_options();
|
||||
RemoteCacheOptions _init_remote_cache_options();
|
||||
BlockCacheOptions _init_block_cache_options();
|
||||
|
||||
#if defined(WITH_STARCACHE)
|
||||
StatusOr<DiskCacheOptions> _init_disk_cache_options();
|
||||
Status _init_starcache_engine(DiskCacheOptions* cache_options);
|
||||
Status _init_peer_cache(const RemoteCacheOptions& cache_options);
|
||||
#endif
|
||||
|
|
@ -78,10 +76,8 @@ private:
|
|||
std::vector<StorePath> _store_paths;
|
||||
|
||||
// cache engine
|
||||
std::string _local_mem_cache_engine;
|
||||
std::string _local_disk_cache_engine;
|
||||
std::shared_ptr<LocalCacheEngine> _local_mem_cache;
|
||||
std::shared_ptr<LocalCacheEngine> _local_disk_cache;
|
||||
std::shared_ptr<LocalMemCacheEngine> _local_mem_cache;
|
||||
std::shared_ptr<LocalDiskCacheEngine> _local_disk_cache;
|
||||
std::shared_ptr<RemoteCacheEngine> _remote_cache;
|
||||
|
||||
std::shared_ptr<BlockCache> _block_cache;
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@
|
|||
#pragma once
|
||||
|
||||
#include "cache/cache_metrics.h"
|
||||
#include "cache/local_cache_engine.h"
|
||||
#include "cache/local_disk_cache_engine.h"
|
||||
#include "gen_cpp/DataCache_types.h"
|
||||
#include "storage/options.h"
|
||||
|
||||
|
|
|
|||
|
|
@ -224,10 +224,10 @@ dev_t DiskSpace::FileSystemWrapper::device_id(const std::string& path) {
|
|||
return DataCacheUtils::disk_device_id(path);
|
||||
}
|
||||
|
||||
DiskSpaceMonitor::DiskSpaceMonitor(LocalCacheEngine* cache)
|
||||
DiskSpaceMonitor::DiskSpaceMonitor(LocalDiskCacheEngine* cache)
|
||||
: _cache(cache), _fs(std::make_shared<DiskSpace::FileSystemWrapper>()) {}
|
||||
|
||||
DiskSpaceMonitor::DiskSpaceMonitor(LocalCacheEngine* cache, std::shared_ptr<DiskSpace::FileSystemWrapper> fs)
|
||||
DiskSpaceMonitor::DiskSpaceMonitor(LocalDiskCacheEngine* cache, std::shared_ptr<DiskSpace::FileSystemWrapper> fs)
|
||||
: _cache(cache), _fs(std::move(fs)) {}
|
||||
|
||||
DiskSpaceMonitor::~DiskSpaceMonitor() {
|
||||
|
|
|
|||
|
|
@ -20,7 +20,7 @@
|
|||
#include <unordered_map>
|
||||
|
||||
#include "cache/cache_options.h"
|
||||
#include "cache/local_cache_engine.h"
|
||||
#include "cache/local_disk_cache_engine.h"
|
||||
#include "common/status.h"
|
||||
#include "fs/fs.h"
|
||||
#include "util/disk_info.h"
|
||||
|
|
@ -118,8 +118,8 @@ private:
|
|||
|
||||
class DiskSpaceMonitor {
|
||||
public:
|
||||
DiskSpaceMonitor(LocalCacheEngine* cache);
|
||||
DiskSpaceMonitor(LocalCacheEngine* cache, std::shared_ptr<DiskSpace::FileSystemWrapper> fs);
|
||||
DiskSpaceMonitor(LocalDiskCacheEngine* cache);
|
||||
DiskSpaceMonitor(LocalDiskCacheEngine* cache, std::shared_ptr<DiskSpace::FileSystemWrapper> fs);
|
||||
~DiskSpaceMonitor();
|
||||
|
||||
Status init(std::vector<DirSpace>* dir_spaces);
|
||||
|
|
@ -152,7 +152,7 @@ private:
|
|||
|
||||
size_t _total_cache_usage = 0;
|
||||
size_t _total_cache_quota = 0;
|
||||
LocalCacheEngine* _cache = nullptr;
|
||||
LocalDiskCacheEngine* _cache = nullptr;
|
||||
std::shared_ptr<DiskSpace::FileSystemWrapper> _fs = nullptr;
|
||||
};
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,74 @@
|
|||
// Copyright 2021-present StarRocks, Inc. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// https://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "cache/block_cache/io_buffer.h"
|
||||
#include "cache/cache_options.h"
|
||||
#include "cache/object_cache/cache_types.h"
|
||||
#include "common/status.h"
|
||||
|
||||
namespace starrocks {
|
||||
|
||||
class LocalDiskCacheEngine {
|
||||
public:
|
||||
virtual ~LocalDiskCacheEngine() = default;
|
||||
|
||||
virtual bool is_initialized() const = 0;
|
||||
|
||||
// Write data to cache
|
||||
virtual Status write(const std::string& key, const IOBuffer& buffer, WriteCacheOptions* options) = 0;
|
||||
|
||||
// Read data from cache, it returns the data size if successful; otherwise the error status
|
||||
// will be returned.
|
||||
virtual Status read(const std::string& key, size_t off, size_t size, IOBuffer* buffer,
|
||||
ReadCacheOptions* options) = 0;
|
||||
|
||||
virtual bool exist(const std::string& key) const = 0;
|
||||
|
||||
// Remove data from cache.
|
||||
virtual Status remove(const std::string& key) = 0;
|
||||
|
||||
// Update the datacache disk space information, such as disk quota or disk path.
|
||||
virtual Status update_disk_spaces(const std::vector<DirSpace>& spaces) = 0;
|
||||
|
||||
// Update the datacache inline cache count limit
|
||||
virtual Status update_inline_cache_count_limit(int32_t limit) = 0;
|
||||
|
||||
virtual const DataCacheMetrics cache_metrics() const = 0;
|
||||
|
||||
virtual void record_read_remote(size_t size, int64_t latency_us) = 0;
|
||||
|
||||
virtual void record_read_cache(size_t size, int64_t latency_us) = 0;
|
||||
|
||||
virtual Status shutdown() = 0;
|
||||
|
||||
virtual bool has_disk_cache() const = 0;
|
||||
virtual bool available() const = 0;
|
||||
virtual void disk_spaces(std::vector<DirSpace>* spaces) const = 0;
|
||||
|
||||
// Get the lookup count, including cache hit count and cache miss count.
|
||||
virtual size_t lookup_count() const = 0;
|
||||
|
||||
// Get the cache hit count.
|
||||
virtual size_t hit_count() const = 0;
|
||||
|
||||
// Get all cache metrics together.
|
||||
virtual const ObjectCacheMetrics metrics() const = 0;
|
||||
|
||||
// Remove all cache entries that are not actively in use.
|
||||
virtual Status prune() = 0;
|
||||
};
|
||||
|
||||
} // namespace starrocks
|
||||
|
|
@ -21,22 +21,12 @@
|
|||
|
||||
namespace starrocks {
|
||||
|
||||
enum class LocalCacheEngineType { STARCACHE, LRUCACHE };
|
||||
|
||||
class LocalCacheEngine {
|
||||
class LocalMemCacheEngine {
|
||||
public:
|
||||
virtual ~LocalCacheEngine() = default;
|
||||
virtual ~LocalMemCacheEngine() = default;
|
||||
|
||||
virtual bool is_initialized() const = 0;
|
||||
|
||||
// Write data to cache
|
||||
virtual Status write(const std::string& key, const IOBuffer& buffer, WriteCacheOptions* options) = 0;
|
||||
|
||||
// Read data from cache, it returns the data size if successful; otherwise the error status
|
||||
// will be returned.
|
||||
virtual Status read(const std::string& key, size_t off, size_t size, IOBuffer* buffer,
|
||||
ReadCacheOptions* options) = 0;
|
||||
|
||||
// Insert object to cache
|
||||
virtual Status insert(const std::string& key, void* value, size_t size, ObjectCacheDeleter deleter,
|
||||
ObjectCacheHandlePtr* handle, const ObjectCacheWriteOptions& options) = 0;
|
||||
|
|
@ -67,27 +57,13 @@ public:
|
|||
// Update the datacache memory quota.
|
||||
virtual Status update_mem_quota(size_t quota_bytes, bool flush_to_disk) = 0;
|
||||
|
||||
// Update the datacache disk space information, such as disk quota or disk path.
|
||||
virtual Status update_disk_spaces(const std::vector<DirSpace>& spaces) = 0;
|
||||
|
||||
// Update the datacache inline cache count limit
|
||||
virtual Status update_inline_cache_count_limit(int32_t limit) = 0;
|
||||
|
||||
virtual const DataCacheMetrics cache_metrics() const = 0;
|
||||
|
||||
virtual void record_read_remote(size_t size, int64_t latency_us) = 0;
|
||||
|
||||
virtual void record_read_cache(size_t size, int64_t latency_us) = 0;
|
||||
|
||||
virtual Status shutdown() = 0;
|
||||
|
||||
virtual LocalCacheEngineType engine_type() = 0;
|
||||
|
||||
virtual bool has_mem_cache() const = 0;
|
||||
virtual bool has_disk_cache() const = 0;
|
||||
virtual bool available() const = 0;
|
||||
virtual bool mem_cache_available() const = 0;
|
||||
virtual void disk_spaces(std::vector<DirSpace>* spaces) const = 0;
|
||||
|
||||
virtual size_t mem_quota() const = 0;
|
||||
virtual size_t mem_usage() const = 0;
|
||||
|
|
@ -105,4 +81,4 @@ public:
|
|||
virtual Status prune() = 0;
|
||||
};
|
||||
|
||||
} // namespace starrocks
|
||||
} // namespace starrocks
|
||||
|
|
@ -23,20 +23,8 @@ Status LRUCacheEngine::init(const MemCacheOptions& options) {
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
Status LRUCacheEngine::write(const std::string& key, const IOBuffer& buffer, WriteCacheOptions* options) {
|
||||
return Status::NotSupported("LRUCache engine don't support write block");
|
||||
}
|
||||
|
||||
Status LRUCacheEngine::read(const std::string& key, size_t off, size_t size, IOBuffer* buffer,
|
||||
ReadCacheOptions* options) {
|
||||
return Status::NotSupported("LRUCache engine don't support read block");
|
||||
}
|
||||
|
||||
Status LRUCacheEngine::insert(const std::string& key, void* value, size_t size, ObjectCacheDeleter deleter,
|
||||
ObjectCacheHandlePtr* handle, const ObjectCacheWriteOptions& options) {
|
||||
if (!_check_write(size, options)) {
|
||||
return Status::InternalError("cache insertion is rejected");
|
||||
}
|
||||
auto* lru_handle = _cache->insert(key, value, size, deleter, static_cast<CachePriority>(options.priority));
|
||||
if (handle) {
|
||||
*handle = reinterpret_cast<ObjectCacheHandlePtr>(lru_handle);
|
||||
|
|
@ -73,14 +61,6 @@ Status LRUCacheEngine::update_mem_quota(size_t quota_bytes, bool flush_to_disk)
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
Status LRUCacheEngine::update_disk_spaces(const std::vector<DirSpace>& spaces) {
|
||||
return Status::NotSupported("LRUCache engine don't support update disk spaces");
|
||||
}
|
||||
|
||||
Status LRUCacheEngine::update_inline_cache_count_limit(int32_t limit) {
|
||||
return Status::NotSupported("LRUCache engine don't support update inline cache count limit");
|
||||
}
|
||||
|
||||
const DataCacheMetrics LRUCacheEngine::cache_metrics() const {
|
||||
return DataCacheMetrics{.status = DataCacheStatus::NORMAL,
|
||||
.mem_quota_bytes = _cache->get_capacity(),
|
||||
|
|
@ -144,25 +124,4 @@ const ObjectCacheMetrics LRUCacheEngine::metrics() const {
|
|||
return m;
|
||||
}
|
||||
|
||||
bool LRUCacheEngine::_check_write(size_t charge, const ObjectCacheWriteOptions& options) const {
|
||||
if (options.evict_probability >= 100) {
|
||||
return true;
|
||||
}
|
||||
if (options.evict_probability <= 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
/*
|
||||
// TODO: The cost of this call may be relatively high, and it needs to be optimized later.
|
||||
if (_cache->get_memory_usage() + charge <= _cache->get_capacity()) {
|
||||
return true;
|
||||
}
|
||||
*/
|
||||
|
||||
if (butil::fast_rand_less_than(100) < options.evict_probability) {
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
} // namespace starrocks
|
||||
|
|
@ -16,11 +16,11 @@
|
|||
|
||||
#include <atomic>
|
||||
|
||||
#include "cache/local_cache_engine.h"
|
||||
#include "cache/local_mem_cache_engine.h"
|
||||
#include "util/lru_cache.h"
|
||||
|
||||
namespace starrocks {
|
||||
class LRUCacheEngine final : public LocalCacheEngine {
|
||||
class LRUCacheEngine final : public LocalMemCacheEngine {
|
||||
public:
|
||||
LRUCacheEngine() = default;
|
||||
~LRUCacheEngine() override = default;
|
||||
|
|
@ -28,9 +28,6 @@ public:
|
|||
Status init(const MemCacheOptions& options);
|
||||
bool is_initialized() const override { return _initialized.load(std::memory_order_relaxed); }
|
||||
|
||||
Status write(const std::string& key, const IOBuffer& buffer, WriteCacheOptions* options) override;
|
||||
Status read(const std::string& key, size_t off, size_t size, IOBuffer* buffer, ReadCacheOptions* options) override;
|
||||
|
||||
Status insert(const std::string& key, void* value, size_t size, ObjectCacheDeleter deleter,
|
||||
ObjectCacheHandlePtr* handle, const ObjectCacheWriteOptions& options) override;
|
||||
Status lookup(const std::string& key, ObjectCacheHandlePtr* handle, ObjectCacheReadOptions* options) override;
|
||||
|
|
@ -39,23 +36,15 @@ public:
|
|||
Status remove(const std::string& key) override;
|
||||
|
||||
Status update_mem_quota(size_t quota_bytes, bool flush_to_disk) override;
|
||||
Status update_disk_spaces(const std::vector<DirSpace>& spaces) override;
|
||||
Status update_inline_cache_count_limit(int32_t limit) override;
|
||||
|
||||
const DataCacheMetrics cache_metrics() const override;
|
||||
void record_read_remote(size_t size, int64_t latency_us) override {}
|
||||
void record_read_cache(size_t size, int64_t latency_us) override {}
|
||||
|
||||
Status shutdown() override;
|
||||
LocalCacheEngineType engine_type() override { return LocalCacheEngineType::LRUCACHE; }
|
||||
bool has_mem_cache() const override { return _cache->get_capacity() > 0; }
|
||||
bool has_disk_cache() const override { return false; }
|
||||
|
||||
bool available() const override { return is_initialized() && has_mem_cache(); }
|
||||
bool mem_cache_available() const override { return is_initialized() && has_mem_cache(); }
|
||||
|
||||
void disk_spaces(std::vector<DirSpace>* spaces) const override {}
|
||||
|
||||
void release(ObjectCacheHandlePtr handle) override;
|
||||
const void* value(ObjectCacheHandlePtr handle) override;
|
||||
|
||||
|
|
@ -73,8 +62,6 @@ public:
|
|||
Status prune() override;
|
||||
|
||||
private:
|
||||
bool _check_write(size_t charge, const ObjectCacheWriteOptions& options) const;
|
||||
|
||||
std::atomic<bool> _initialized = false;
|
||||
std::unique_ptr<ShardedLRUCache> _cache;
|
||||
};
|
||||
|
|
|
|||
|
|
@ -23,19 +23,9 @@
|
|||
|
||||
namespace starrocks {
|
||||
|
||||
enum class ObjectCacheModuleType { LRUCACHE, STARCACHE };
|
||||
|
||||
struct ObjectCacheWriteOptions {
|
||||
// The priority of the cache object, only support 0 and 1 now.
|
||||
int8_t priority = 0;
|
||||
// If ttl_seconds=0 (default), no ttl restriction will be set. If an old one exists, remove it.
|
||||
uint64_t ttl_seconds = 0;
|
||||
// If overwrite=true, the cache value will be replaced if it already exists.
|
||||
bool overwrite = false;
|
||||
// The probability to evict other items if the cache space is full, which can help avoid frequent cache replacement
|
||||
// and improve cache hit rate sometimes.
|
||||
// It is expressed as a percentage. If evict_probability is 10, it means the probability to evict other data is 10%.
|
||||
int32_t evict_probability = 100;
|
||||
};
|
||||
|
||||
struct ObjectCacheReadOptions {};
|
||||
|
|
@ -60,16 +50,4 @@ using ObjectCacheHandlePtr = ObjectCacheHandle*;
|
|||
// cache deleter when using a lru cache module.
|
||||
using ObjectCacheDeleter = void (*)(const CacheKey&, void*);
|
||||
|
||||
inline std::ostream& operator<<(std::ostream& os, const ObjectCacheModuleType& module) {
|
||||
switch (module) {
|
||||
case ObjectCacheModuleType::LRUCACHE:
|
||||
os << "lrucache";
|
||||
break;
|
||||
case ObjectCacheModuleType::STARCACHE:
|
||||
os << "starcache";
|
||||
break;
|
||||
}
|
||||
return os;
|
||||
}
|
||||
|
||||
} // namespace starrocks
|
||||
|
|
@ -66,9 +66,9 @@ public:
|
|||
// Client should call create_global_cache before.
|
||||
static StoragePageCache* instance() { return DataCache::GetInstance()->page_cache(); }
|
||||
|
||||
StoragePageCache(LocalCacheEngine* cache_engine) : _cache(cache_engine), _initialized(true) {}
|
||||
StoragePageCache(LocalMemCacheEngine* cache_engine) : _cache(cache_engine), _initialized(true) {}
|
||||
|
||||
void init(LocalCacheEngine* cache_engine) {
|
||||
void init(LocalMemCacheEngine* cache_engine) {
|
||||
_cache = cache_engine;
|
||||
_initialized.store(true, std::memory_order_relaxed);
|
||||
}
|
||||
|
|
@ -115,7 +115,7 @@ public:
|
|||
size_t get_pinned_count() const;
|
||||
|
||||
private:
|
||||
LocalCacheEngine* _cache = nullptr;
|
||||
LocalMemCacheEngine* _cache = nullptr;
|
||||
std::atomic<bool> _initialized = false;
|
||||
};
|
||||
|
||||
|
|
@ -125,7 +125,12 @@ private:
|
|||
class PageCacheHandle {
|
||||
public:
|
||||
PageCacheHandle() = default;
|
||||
PageCacheHandle(LocalCacheEngine* cache, ObjectCacheHandle* handle) : _cache(cache), _handle(handle) {}
|
||||
PageCacheHandle(LocalMemCacheEngine* cache, ObjectCacheHandle* handle) : _cache(cache), _handle(handle) {}
|
||||
|
||||
// Don't allow copy and assign
|
||||
PageCacheHandle(const PageCacheHandle&) = delete;
|
||||
const PageCacheHandle& operator=(const PageCacheHandle&) = delete;
|
||||
|
||||
~PageCacheHandle() {
|
||||
if (_handle != nullptr) {
|
||||
StoragePageCacheMetrics::released_page_handle_count++;
|
||||
|
|
@ -145,16 +150,12 @@ public:
|
|||
return *this;
|
||||
}
|
||||
|
||||
LocalCacheEngine* cache() const { return _cache; }
|
||||
LocalMemCacheEngine* cache() const { return _cache; }
|
||||
const void* data() const { return _cache->value(_handle); }
|
||||
|
||||
private:
|
||||
LocalCacheEngine* _cache = nullptr;
|
||||
LocalMemCacheEngine* _cache = nullptr;
|
||||
ObjectCacheHandle* _handle = nullptr;
|
||||
|
||||
// Don't allow copy and assign
|
||||
PageCacheHandle(const PageCacheHandle&) = delete;
|
||||
const PageCacheHandle& operator=(const PageCacheHandle&) = delete;
|
||||
};
|
||||
|
||||
} // namespace starrocks
|
||||
|
|
|
|||
|
|
@ -120,52 +120,6 @@ Status StarCacheEngine::read(const std::string& key, size_t off, size_t size, IO
|
|||
return st;
|
||||
}
|
||||
|
||||
Status StarCacheEngine::insert(const std::string& key, void* value, size_t size, ObjectCacheDeleter deleter,
|
||||
ObjectCacheHandlePtr* handle, const ObjectCacheWriteOptions& options) {
|
||||
starcache::ObjectHandle* obj_hdl = new starcache::ObjectHandle;
|
||||
auto obj_deleter = [deleter, key, value] {
|
||||
// For temporary compatibility with old deleters.
|
||||
CacheKey cache_key(key);
|
||||
deleter(cache_key, value);
|
||||
};
|
||||
starcache::WriteOptions opts;
|
||||
opts.priority = options.priority;
|
||||
opts.ttl_seconds = options.ttl_seconds;
|
||||
opts.overwrite = options.overwrite;
|
||||
opts.evict_probability = options.evict_probability;
|
||||
Status st = to_status(_cache->set_object(key, value, size, obj_deleter, obj_hdl, &opts));
|
||||
if (!st.ok()) {
|
||||
delete obj_hdl;
|
||||
} else if (handle) {
|
||||
// Try release the old handle before fill it with a new one.
|
||||
_try_release_obj_handle(*handle);
|
||||
*handle = reinterpret_cast<ObjectCacheHandlePtr>(obj_hdl);
|
||||
}
|
||||
return st;
|
||||
}
|
||||
|
||||
Status StarCacheEngine::lookup(const std::string& key, ObjectCacheHandlePtr* handle, ObjectCacheReadOptions* options) {
|
||||
starcache::ObjectHandle* obj_hdl = new starcache::ObjectHandle;
|
||||
// Skip checking options temporarily because there is no valid members in `ObjectCacheReadOptions` now.
|
||||
Status st = to_status(_cache->get_object(key, obj_hdl, nullptr));
|
||||
if (!st.ok()) {
|
||||
delete obj_hdl;
|
||||
} else if (handle) {
|
||||
_try_release_obj_handle(*handle);
|
||||
*handle = reinterpret_cast<ObjectCacheHandlePtr>(obj_hdl);
|
||||
}
|
||||
return st;
|
||||
}
|
||||
|
||||
void StarCacheEngine::release(ObjectCacheHandlePtr handle) {
|
||||
_try_release_obj_handle(handle);
|
||||
}
|
||||
|
||||
const void* StarCacheEngine::value(ObjectCacheHandlePtr handle) {
|
||||
auto obj_hdl = reinterpret_cast<starcache::ObjectHandle*>(handle);
|
||||
return obj_hdl->ptr();
|
||||
}
|
||||
|
||||
bool StarCacheEngine::exist(const std::string& key) const {
|
||||
return _cache->exist(key);
|
||||
}
|
||||
|
|
@ -174,22 +128,6 @@ Status StarCacheEngine::remove(const std::string& key) {
|
|||
return to_status(_cache->remove(key));
|
||||
}
|
||||
|
||||
Status StarCacheEngine::update_mem_quota(size_t quota_bytes, bool flush_to_disk) {
|
||||
Status st = to_status(_cache->update_mem_quota(quota_bytes, flush_to_disk));
|
||||
_refresh_quota();
|
||||
return st;
|
||||
}
|
||||
|
||||
Status StarCacheEngine::adjust_mem_quota(int64_t delta, size_t min_capacity) {
|
||||
auto starcache_metrics = _cache->metrics();
|
||||
size_t capacity = starcache_metrics.mem_quota_bytes;
|
||||
int64_t new_capacity = capacity + delta;
|
||||
if (new_capacity < (int64_t)min_capacity) {
|
||||
return Status::InvalidArgument("target capacity is less than the minimum capacity");
|
||||
}
|
||||
return to_status(_cache->update_mem_quota(new_capacity, false));
|
||||
}
|
||||
|
||||
Status StarCacheEngine::update_disk_spaces(const std::vector<DirSpace>& spaces) {
|
||||
std::vector<starcache::DirSpace> disk_spaces;
|
||||
disk_spaces.reserve(spaces.size());
|
||||
|
|
@ -239,18 +177,9 @@ Status StarCacheEngine::shutdown() {
|
|||
|
||||
void StarCacheEngine::_refresh_quota() {
|
||||
auto metrics = starcache_metrics(0);
|
||||
_mem_quota.store(metrics.mem_quota_bytes, std::memory_order_relaxed);
|
||||
_disk_quota.store(metrics.disk_quota_bytes, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
void StarCacheEngine::_try_release_obj_handle(ObjectCacheHandlePtr handle) {
|
||||
if (handle) {
|
||||
auto obj_hdl = reinterpret_cast<starcache::ObjectHandle*>(handle);
|
||||
obj_hdl->release();
|
||||
delete obj_hdl;
|
||||
}
|
||||
}
|
||||
|
||||
void StarCacheEngine::disk_spaces(std::vector<DirSpace>* spaces) const {
|
||||
spaces->clear();
|
||||
auto metrics = starcache_metrics(0);
|
||||
|
|
@ -259,18 +188,6 @@ void StarCacheEngine::disk_spaces(std::vector<DirSpace>* spaces) const {
|
|||
}
|
||||
}
|
||||
|
||||
size_t StarCacheEngine::mem_quota() const {
|
||||
starcache::CacheMetrics metrics = _cache->metrics(0);
|
||||
// TODO: optimizer later
|
||||
return metrics.mem_quota_bytes;
|
||||
}
|
||||
|
||||
size_t StarCacheEngine::mem_usage() const {
|
||||
// TODO: add meta size?
|
||||
starcache::CacheMetrics metrics = _cache->metrics(0);
|
||||
return metrics.mem_used_bytes;
|
||||
}
|
||||
|
||||
size_t StarCacheEngine::lookup_count() const {
|
||||
starcache::CacheMetrics metrics = _cache->metrics(1);
|
||||
return metrics.detail_l1->hit_count + metrics.detail_l1->miss_count;
|
||||
|
|
|
|||
|
|
@ -14,17 +14,17 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include "cache/local_cache_engine.h"
|
||||
#include "cache/local_disk_cache_engine.h"
|
||||
#include "common/status.h"
|
||||
#include "starcache/star_cache.h"
|
||||
#include "starcache/time_based_cache_adaptor.h"
|
||||
|
||||
namespace starrocks {
|
||||
|
||||
class StarCacheEngine : public LocalCacheEngine {
|
||||
class StarCacheEngine : public LocalDiskCacheEngine {
|
||||
public:
|
||||
StarCacheEngine() = default;
|
||||
virtual ~StarCacheEngine() override = default;
|
||||
~StarCacheEngine() override = default;
|
||||
|
||||
Status init(const DiskCacheOptions& options);
|
||||
bool is_initialized() const override { return _initialized.load(std::memory_order_relaxed); }
|
||||
|
|
@ -32,23 +32,10 @@ public:
|
|||
Status write(const std::string& key, const IOBuffer& buffer, WriteCacheOptions* options) override;
|
||||
Status read(const std::string& key, size_t off, size_t size, IOBuffer* buffer, ReadCacheOptions* options) override;
|
||||
|
||||
Status insert(const std::string& key, void* value, size_t size, ObjectCacheDeleter deleter,
|
||||
ObjectCacheHandlePtr* handle, const ObjectCacheWriteOptions& options) override;
|
||||
|
||||
Status lookup(const std::string& key, ObjectCacheHandlePtr* handle, ObjectCacheReadOptions* options) override;
|
||||
|
||||
void release(ObjectCacheHandlePtr handle) override;
|
||||
|
||||
const void* value(ObjectCacheHandlePtr handle) override;
|
||||
|
||||
bool exist(const std::string& key) const override;
|
||||
|
||||
Status remove(const std::string& key) override;
|
||||
|
||||
Status adjust_mem_quota(int64_t delta, size_t min_capacity) override;
|
||||
|
||||
Status update_mem_quota(size_t quota_bytes, bool flush_to_disk) override;
|
||||
|
||||
Status update_disk_spaces(const std::vector<DirSpace>& spaces) override;
|
||||
|
||||
Status update_inline_cache_count_limit(int32_t limit) override;
|
||||
|
|
@ -63,19 +50,12 @@ public:
|
|||
|
||||
Status shutdown() override;
|
||||
|
||||
LocalCacheEngineType engine_type() override { return LocalCacheEngineType::STARCACHE; }
|
||||
|
||||
std::shared_ptr<starcache::StarCache> starcache_instance() { return _cache; }
|
||||
bool has_mem_cache() const override { return _mem_quota.load(std::memory_order_relaxed) > 0; }
|
||||
bool has_disk_cache() const override { return _disk_quota.load(std::memory_order_relaxed) > 0; }
|
||||
bool available() const override { return is_initialized() && (has_mem_cache() || has_disk_cache()); }
|
||||
bool mem_cache_available() const override { return is_initialized() && has_mem_cache(); }
|
||||
bool available() const override { return is_initialized() && has_disk_cache(); }
|
||||
|
||||
void disk_spaces(std::vector<DirSpace>* spaces) const override;
|
||||
|
||||
size_t mem_quota() const override;
|
||||
size_t mem_usage() const override;
|
||||
|
||||
size_t lookup_count() const override;
|
||||
|
||||
size_t hit_count() const override;
|
||||
|
|
@ -86,7 +66,6 @@ public:
|
|||
|
||||
private:
|
||||
void _refresh_quota();
|
||||
void _try_release_obj_handle(ObjectCacheHandlePtr handle);
|
||||
|
||||
std::shared_ptr<starcache::StarCache> _cache;
|
||||
std::unique_ptr<starcache::TimeBasedCacheAdaptor> _cache_adaptor;
|
||||
|
|
@ -94,7 +73,6 @@ private:
|
|||
bool _enable_datacache_persistence = false;
|
||||
std::atomic<bool> _initialized = false;
|
||||
|
||||
std::atomic<size_t> _mem_quota = 0;
|
||||
std::atomic<size_t> _disk_quota = 0;
|
||||
};
|
||||
} // namespace starrocks
|
||||
|
|
|
|||
|
|
@ -69,7 +69,7 @@ Status SchemaBeDataCacheMetricsScanner::get_next(ChunkPtr* chunk, bool* eos) {
|
|||
|
||||
// TODO: Support LRUCacheEngine
|
||||
auto* cache = DataCache::GetInstance()->local_disk_cache();
|
||||
if (cache != nullptr && cache->is_initialized() && cache->engine_type() == LocalCacheEngineType::STARCACHE) {
|
||||
if (cache != nullptr && cache->is_initialized()) {
|
||||
auto* starcache = reinterpret_cast<StarCacheEngine*>(cache);
|
||||
// retrieve different priority's used bytes from level = 2 metrics
|
||||
metrics = starcache->starcache_metrics(2);
|
||||
|
|
|
|||
|
|
@ -471,7 +471,6 @@ StatusOr<FileMetaDataPtr> FileMetaDataParser::get_file_metadata() {
|
|||
if (file_metadata_size > 0) {
|
||||
auto deleter = [](const starrocks::CacheKey& key, void* value) { delete (FileMetaDataPtr*)value; };
|
||||
ObjectCacheWriteOptions options;
|
||||
options.evict_probability = _datacache_options->datacache_evict_probability;
|
||||
auto capture = std::make_unique<FileMetaDataPtr>(file_metadata);
|
||||
Status st = _cache->insert(metacache_key, (void*)(capture.get()), file_metadata_size, deleter, options,
|
||||
&cache_handle);
|
||||
|
|
|
|||
|
|
@ -92,7 +92,7 @@ Status PageReader::_deal_page_with_cache() {
|
|||
return Status::OK();
|
||||
}
|
||||
RETURN_IF_ERROR(_read_and_decompress_internal(true));
|
||||
ObjectCacheWriteOptions opts{.evict_probability = _opts.datacache_options->datacache_evict_probability};
|
||||
ObjectCacheWriteOptions opts;
|
||||
auto st = _cache->insert(page_cache_key, _cache_buf, opts, &cache_handle);
|
||||
if (st.ok()) {
|
||||
_page_handle = PageHandle(std::move(cache_handle));
|
||||
|
|
|
|||
|
|
@ -22,7 +22,7 @@
|
|||
#include <string>
|
||||
|
||||
#include "cache/block_cache/block_cache_hit_rate_counter.hpp"
|
||||
#include "cache/local_cache_engine.h"
|
||||
#include "cache/local_disk_cache_engine.h"
|
||||
#include "http/http_channel.h"
|
||||
#include "http/http_headers.h"
|
||||
#include "http/http_request.h"
|
||||
|
|
@ -58,8 +58,6 @@ void DataCacheAction::handle(HttpRequest* req) {
|
|||
}
|
||||
if (!_local_cache || !_local_cache->is_initialized()) {
|
||||
_handle_error(req, strings::Substitute("Cache system is not ready"));
|
||||
} else if (_local_cache->engine_type() != LocalCacheEngineType::STARCACHE) {
|
||||
_handle_error(req, strings::Substitute("No more metrics for current cache engine type"));
|
||||
} else if (req->param(ACTION_KEY) == ACTION_STAT) {
|
||||
_handle_stat(req);
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -28,11 +28,11 @@
|
|||
|
||||
namespace starrocks {
|
||||
|
||||
class LocalCacheEngine;
|
||||
class LocalDiskCacheEngine;
|
||||
// TODO: support mem metrics
|
||||
class DataCacheAction : public HttpHandler {
|
||||
public:
|
||||
explicit DataCacheAction(LocalCacheEngine* local_cache) : _local_cache(local_cache) {}
|
||||
explicit DataCacheAction(LocalDiskCacheEngine* local_cache) : _local_cache(local_cache) {}
|
||||
~DataCacheAction() override = default;
|
||||
|
||||
void handle(HttpRequest* req) override;
|
||||
|
|
@ -44,7 +44,7 @@ private:
|
|||
void _handle_app_stat(HttpRequest* req);
|
||||
void _handle_error(HttpRequest* req, const std::string& error_msg);
|
||||
|
||||
LocalCacheEngine* _local_cache;
|
||||
LocalDiskCacheEngine* _local_cache;
|
||||
};
|
||||
|
||||
} // namespace starrocks
|
||||
|
|
|
|||
|
|
@ -115,7 +115,7 @@ Status UpdateConfigAction::update_config(const std::string& name, const std::str
|
|||
return Status::OK();
|
||||
});
|
||||
_config_callback.emplace("datacache_mem_size", [&]() -> Status {
|
||||
LocalCacheEngine* cache = DataCache::GetInstance()->local_mem_cache();
|
||||
LocalMemCacheEngine* cache = DataCache::GetInstance()->local_mem_cache();
|
||||
if (cache == nullptr || !cache->is_initialized()) {
|
||||
return Status::InternalError("Local cache is not initialized");
|
||||
}
|
||||
|
|
@ -130,7 +130,7 @@ Status UpdateConfigAction::update_config(const std::string& name, const std::str
|
|||
return cache->update_mem_quota(mem_size, true);
|
||||
});
|
||||
_config_callback.emplace("datacache_disk_size", [&]() -> Status {
|
||||
LocalCacheEngine* cache = DataCache::GetInstance()->local_disk_cache();
|
||||
LocalDiskCacheEngine* cache = DataCache::GetInstance()->local_disk_cache();
|
||||
if (cache == nullptr || !cache->is_initialized()) {
|
||||
return Status::InternalError("Local cache is not initialized");
|
||||
}
|
||||
|
|
@ -149,7 +149,7 @@ Status UpdateConfigAction::update_config(const std::string& name, const std::str
|
|||
return cache->update_disk_spaces(spaces);
|
||||
});
|
||||
_config_callback.emplace("datacache_inline_item_count_limit", [&]() -> Status {
|
||||
LocalCacheEngine* cache = DataCache::GetInstance()->local_disk_cache();
|
||||
LocalDiskCacheEngine* cache = DataCache::GetInstance()->local_disk_cache();
|
||||
if (cache == nullptr || !cache->is_initialized()) {
|
||||
return Status::InternalError("Local cache is not initialized");
|
||||
}
|
||||
|
|
|
|||
|
|
@ -527,7 +527,7 @@ void RuntimeState::update_load_datacache_metrics(TReportExecStatusParams* load_p
|
|||
#endif // USE_STAROS
|
||||
} else {
|
||||
// TODO: mem_metrics + disk_metrics
|
||||
const LocalCacheEngine* cache = DataCache::GetInstance()->local_disk_cache();
|
||||
const LocalDiskCacheEngine* cache = DataCache::GetInstance()->local_disk_cache();
|
||||
if (cache != nullptr && cache->is_initialized()) {
|
||||
TDataCacheMetrics t_metrics{};
|
||||
DataCacheUtils::set_metrics_from_thrift(t_metrics, cache->cache_metrics());
|
||||
|
|
|
|||
|
|
@ -301,7 +301,7 @@ void SystemMetrics::_update_datacache_mem_tracker() {
|
|||
int64_t datacache_mem_bytes = 0;
|
||||
auto* datacache_mem_tracker = GlobalEnv::GetInstance()->datacache_mem_tracker();
|
||||
if (datacache_mem_tracker) {
|
||||
LocalCacheEngine* local_cache = DataCache::GetInstance()->local_mem_cache();
|
||||
LocalMemCacheEngine* local_cache = DataCache::GetInstance()->local_mem_cache();
|
||||
if (local_cache != nullptr && local_cache->is_initialized()) {
|
||||
auto datacache_metrics = local_cache->cache_metrics();
|
||||
datacache_mem_bytes = datacache_metrics.mem_used_bytes + datacache_metrics.meta_used_bytes;
|
||||
|
|
|
|||
|
|
@ -232,13 +232,6 @@ TEST_F(BlockCacheTest, update_cache_quota) {
|
|||
ASSERT_EQ(metrics.disk_quota_bytes, quota);
|
||||
}
|
||||
|
||||
{
|
||||
size_t new_mem_quota = 2 * 1024 * 1024;
|
||||
ASSERT_TRUE(local_cache->update_mem_quota(new_mem_quota, false).ok());
|
||||
auto metrics = local_cache->cache_metrics();
|
||||
ASSERT_EQ(metrics.mem_quota_bytes, new_mem_quota);
|
||||
}
|
||||
|
||||
{
|
||||
size_t new_disk_quota = 100 * 1024 * 1024;
|
||||
std::vector<DirSpace> dir_spaces;
|
||||
|
|
|
|||
|
|
@ -191,9 +191,4 @@ TEST_F(LRUCacheEngineTest, test_metrics) {
|
|||
ASSERT_EQ(metrics.capacity, _capacity);
|
||||
ASSERT_EQ(metrics.object_item_count, 0);
|
||||
}
|
||||
|
||||
TEST_F(LRUCacheEngineTest, adjust_inline_cache_count_limit) {
|
||||
ASSERT_TRUE(_cache->update_inline_cache_count_limit(0).is_not_supported());
|
||||
}
|
||||
|
||||
} // namespace starrocks
|
||||
|
|
|
|||
|
|
@ -27,22 +27,13 @@ protected:
|
|||
void SetUp() override;
|
||||
void TearDown() override;
|
||||
|
||||
static void Deleter(const CacheKey& k, void* v) { free(v); }
|
||||
|
||||
void insert_value(int i);
|
||||
|
||||
void _init_local_cache();
|
||||
static std::string _int_to_string(size_t length, int num);
|
||||
void _check_not_found(int value);
|
||||
void _check_found(int value);
|
||||
|
||||
std::string _cache_dir = "./starcache_engine_test";
|
||||
std::shared_ptr<StarCacheEngine> _cache;
|
||||
|
||||
size_t _value_size = 256 * 1024;
|
||||
int64_t _mem_quota = 64 * 1024 * 1024;
|
||||
|
||||
ObjectCacheWriteOptions _write_opt;
|
||||
};
|
||||
|
||||
void StarCacheEngineTest::SetUp() {
|
||||
|
|
@ -55,36 +46,12 @@ void StarCacheEngineTest::TearDown() {
|
|||
ASSERT_OK(fs::remove_all(_cache_dir));
|
||||
}
|
||||
|
||||
void StarCacheEngineTest::insert_value(int i) {
|
||||
std::string key = _int_to_string(6, i);
|
||||
int* ptr = (int*)malloc(_value_size);
|
||||
*ptr = i;
|
||||
ObjectCacheHandlePtr handle = nullptr;
|
||||
ASSERT_OK(_cache->insert(key, (void*)ptr, _value_size, &Deleter, &handle, _write_opt));
|
||||
_cache->release(handle);
|
||||
}
|
||||
|
||||
std::string StarCacheEngineTest::_int_to_string(size_t length, int num) {
|
||||
std::ostringstream oss;
|
||||
oss << std::setw(length) << std::setfill('0') << num;
|
||||
return oss.str();
|
||||
}
|
||||
|
||||
void StarCacheEngineTest::_check_not_found(int value) {
|
||||
std::string key = _int_to_string(6, value);
|
||||
ObjectCacheHandlePtr handle = nullptr;
|
||||
Status st = _cache->lookup(key, &handle, nullptr);
|
||||
ASSERT_TRUE(st.is_not_found());
|
||||
}
|
||||
|
||||
void StarCacheEngineTest::_check_found(int value) {
|
||||
std::string key = _int_to_string(6, value);
|
||||
ObjectCacheHandlePtr handle = nullptr;
|
||||
ASSERT_OK(_cache->lookup(key, &handle, nullptr));
|
||||
ASSERT_EQ(*(int*)(_cache->value(handle)), value);
|
||||
_cache->release(handle);
|
||||
}
|
||||
|
||||
void StarCacheEngineTest::_init_local_cache() {
|
||||
DiskCacheOptions options = TestCacheUtils::create_simple_options(256 * KB, _mem_quota);
|
||||
options.dir_spaces.push_back({.path = _cache_dir, .size = 50 * MB});
|
||||
|
|
@ -93,121 +60,6 @@ void StarCacheEngineTest::_init_local_cache() {
|
|||
ASSERT_OK(_cache->init(options));
|
||||
}
|
||||
|
||||
TEST_F(StarCacheEngineTest, insert_success) {
|
||||
insert_value(0);
|
||||
size_t kv_size = _cache->mem_usage();
|
||||
|
||||
for (int i = 1; i < 20; i++) {
|
||||
insert_value(i);
|
||||
}
|
||||
ASSERT_EQ(_cache->mem_usage(), kv_size * 20);
|
||||
}
|
||||
|
||||
TEST_F(StarCacheEngineTest, test_insert) {
|
||||
size_t mem_size = 4096;
|
||||
void* ptr = malloc(mem_size);
|
||||
int* value = new (ptr) int;
|
||||
*value = 10;
|
||||
ObjectCacheHandlePtr handle = nullptr;
|
||||
ASSERT_OK(_cache->insert("1", (void*)ptr, mem_size, &Deleter, &handle, _write_opt));
|
||||
_cache->release(handle);
|
||||
|
||||
ObjectCacheHandlePtr lookup_handle = nullptr;
|
||||
ASSERT_OK(_cache->lookup("1", &lookup_handle, nullptr));
|
||||
const void* result = _cache->value(lookup_handle);
|
||||
ASSERT_EQ(*(const int*)(result), 10);
|
||||
_cache->release(lookup_handle);
|
||||
}
|
||||
|
||||
TEST_F(StarCacheEngineTest, lookup) {
|
||||
insert_value(0);
|
||||
insert_value(1);
|
||||
|
||||
ObjectCacheHandlePtr handle = nullptr;
|
||||
std::string key = _int_to_string(6, 1);
|
||||
ASSERT_OK(_cache->lookup(key, &handle, nullptr));
|
||||
ASSERT_EQ(*(int*)_cache->value(handle), 1);
|
||||
|
||||
ASSERT_OK(_cache->lookup(key, &handle, nullptr));
|
||||
ASSERT_EQ(*(int*)_cache->value(handle), 1);
|
||||
_cache->release(handle);
|
||||
|
||||
_check_not_found(2);
|
||||
}
|
||||
|
||||
TEST_F(StarCacheEngineTest, insert_and_release_old_handle) {
|
||||
std::string key = _int_to_string(6, 0);
|
||||
int* ptr = (int*)malloc(_value_size);
|
||||
*ptr = 0;
|
||||
ObjectCacheHandlePtr handle = nullptr;
|
||||
ASSERT_OK(_cache->insert(key, (void*)ptr, _value_size, &Deleter, &handle, _write_opt));
|
||||
|
||||
key = _int_to_string(6, 1);
|
||||
ptr = (int*)malloc(_value_size);
|
||||
*ptr = 1;
|
||||
ASSERT_OK(_cache->insert(key, (void*)ptr, _value_size, &Deleter, &handle, _write_opt));
|
||||
_cache->release(handle);
|
||||
}
|
||||
|
||||
TEST_F(StarCacheEngineTest, remove) {
|
||||
insert_value(0);
|
||||
insert_value(1);
|
||||
|
||||
ASSERT_OK(_cache->remove(_int_to_string(6, 1)));
|
||||
_check_not_found(1);
|
||||
}
|
||||
|
||||
TEST_F(StarCacheEngineTest, value) {
|
||||
insert_value(1);
|
||||
|
||||
ObjectCacheHandlePtr handle = nullptr;
|
||||
std::string key = _int_to_string(6, 1);
|
||||
ASSERT_OK(_cache->lookup(key, &handle, nullptr));
|
||||
const void* data = _cache->value(handle);
|
||||
ASSERT_EQ(*(const int*)data, 1);
|
||||
_cache->release(handle);
|
||||
}
|
||||
|
||||
TEST_F(StarCacheEngineTest, set_capacity) {
|
||||
insert_value(0);
|
||||
size_t kv_size = _cache->mem_usage();
|
||||
|
||||
size_t num = _mem_quota / kv_size;
|
||||
|
||||
for (size_t i = 1; i < num; i++) {
|
||||
insert_value(i);
|
||||
}
|
||||
_check_found(0);
|
||||
ASSERT_LE(_cache->mem_usage(), num * kv_size);
|
||||
ASSERT_EQ(_cache->mem_quota(), _mem_quota);
|
||||
|
||||
ASSERT_OK(_cache->update_mem_quota(_mem_quota / 2, false));
|
||||
_check_not_found(1);
|
||||
ASSERT_EQ(_cache->mem_quota(), _mem_quota / 2);
|
||||
ASSERT_LE(_cache->mem_usage(), _mem_quota / 2);
|
||||
}
|
||||
|
||||
TEST_F(StarCacheEngineTest, adjust_capacity) {
|
||||
insert_value(0);
|
||||
size_t kv_size = _cache->mem_usage();
|
||||
|
||||
size_t num = _mem_quota / kv_size;
|
||||
|
||||
for (size_t i = 1; i < num; i++) {
|
||||
insert_value(i);
|
||||
}
|
||||
_check_found(0);
|
||||
ASSERT_LE(_cache->mem_usage(), _mem_quota);
|
||||
ASSERT_EQ(_cache->mem_quota(), _mem_quota);
|
||||
|
||||
ASSERT_OK(_cache->adjust_mem_quota(-1 * _mem_quota / 2, 0));
|
||||
_check_not_found(1);
|
||||
ASSERT_LE(_cache->mem_usage(), _mem_quota / 2);
|
||||
ASSERT_EQ(_cache->mem_quota(), _mem_quota / 2);
|
||||
|
||||
ASSERT_TRUE(_cache->adjust_mem_quota(-1 * _mem_quota / 3, _mem_quota / 2).is_invalid_argument());
|
||||
}
|
||||
|
||||
static void empty_deleter(void*) {}
|
||||
|
||||
TEST_F(StarCacheEngineTest, adjust_inline_cache_count_limit) {
|
||||
|
|
@ -239,40 +91,4 @@ TEST_F(StarCacheEngineTest, adjust_inline_cache_count_limit) {
|
|||
ASSERT_OK(_cache->read(key, 0, val.size(), &buffer, nullptr));
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(StarCacheEngineTest, metrics) {
|
||||
insert_value(0);
|
||||
size_t kv_size = _cache->mem_usage();
|
||||
|
||||
for (size_t i = 1; i < 128; i++) {
|
||||
insert_value(i);
|
||||
}
|
||||
for (size_t i = 0; i < 10; i++) {
|
||||
_check_found(i);
|
||||
}
|
||||
for (size_t i = 200; i < 210; i++) {
|
||||
_check_not_found(i);
|
||||
}
|
||||
|
||||
ASSERT_EQ(_cache->mem_quota(), _mem_quota);
|
||||
ASSERT_EQ(_cache->mem_usage(), kv_size * 128);
|
||||
ASSERT_EQ(_cache->lookup_count(), 20);
|
||||
ASSERT_EQ(_cache->hit_count(), 10);
|
||||
|
||||
auto metrics = _cache->metrics();
|
||||
ASSERT_EQ(metrics.capacity, _mem_quota);
|
||||
ASSERT_EQ(metrics.usage, kv_size * 128);
|
||||
ASSERT_EQ(metrics.lookup_count, 20);
|
||||
ASSERT_EQ(metrics.hit_count, 10);
|
||||
ASSERT_EQ(metrics.object_item_count, 128);
|
||||
}
|
||||
|
||||
TEST_F(StarCacheEngineTest, prune) {
|
||||
for (size_t i = 0; i < 128; i++) {
|
||||
insert_value(i);
|
||||
}
|
||||
ASSERT_OK(_cache->prune());
|
||||
ASSERT_EQ(_cache->mem_usage(), 0);
|
||||
}
|
||||
|
||||
} // namespace starrocks
|
||||
|
|
|
|||
|
|
@ -22,6 +22,7 @@
|
|||
|
||||
#include "cache/block_cache/block_cache.h"
|
||||
#include "cache/block_cache/test_cache_utils.h"
|
||||
#include "cache/lrucache_engine.h"
|
||||
#include "cache/starcache_engine.h"
|
||||
#include "column/column_helper.h"
|
||||
#include "column/fixed_length_column.h"
|
||||
|
|
@ -3347,8 +3348,8 @@ TEST_F(FileReaderTest, TestStructSubfieldNoDecodeNotOutput) {
|
|||
}
|
||||
|
||||
TEST_F(FileReaderTest, TestReadFooterCache) {
|
||||
DiskCacheOptions options = TestCacheUtils::create_simple_options(256 * KB, 100 * MB);
|
||||
auto local_cache = std::make_shared<StarCacheEngine>();
|
||||
MemCacheOptions options{.mem_space_size = 100 * MB};
|
||||
auto local_cache = std::make_shared<LRUCacheEngine>();
|
||||
ASSERT_OK(local_cache->init(options));
|
||||
auto cache = std::make_shared<StoragePageCache>(local_cache.get());
|
||||
|
||||
|
|
|
|||
|
|
@ -62,7 +62,7 @@ public:
|
|||
|
||||
protected:
|
||||
evhttp_request* _evhttp_req = nullptr;
|
||||
std::shared_ptr<LocalCacheEngine> _cache;
|
||||
std::shared_ptr<LocalDiskCacheEngine> _cache;
|
||||
};
|
||||
|
||||
TEST_F(DataCacheActionTest, stat_success) {
|
||||
|
|
|
|||
|
|
@ -181,11 +181,11 @@ TEST_F(InternalServiceTest, test_fetch_datacache_via_brpc) {
|
|||
ASSERT_FALSE(st.ok());
|
||||
}
|
||||
|
||||
std::shared_ptr<BlockCache> cache(new BlockCache);
|
||||
std::shared_ptr<BlockCache> cache;
|
||||
{
|
||||
DiskCacheOptions options = TestCacheUtils::create_simple_options(256 * KB, 20 * MB);
|
||||
DiskCacheOptions options = TestCacheUtils::create_simple_options(256 * KB, 0, 20 * MB);
|
||||
options.inline_item_count_limit = 1000;
|
||||
auto cache = TestCacheUtils::create_cache(options);
|
||||
cache = TestCacheUtils::create_cache(options);
|
||||
|
||||
const size_t cache_size = 1024;
|
||||
const std::string cache_key = "test_file";
|
||||
|
|
|
|||
|
|
@ -1017,7 +1017,6 @@ TEST_P(LakePrimaryKeyPublishTest, test_write_rebuild_persistent_index) {
|
|||
}
|
||||
|
||||
TEST_P(LakePrimaryKeyPublishTest, test_abort_txn) {
|
||||
bool old_val = config::skip_pk_preload;
|
||||
config::skip_pk_preload = false;
|
||||
SyncPoint::GetInstance()->EnableProcessing();
|
||||
SyncPoint::GetInstance()->LoadDependency(
|
||||
|
|
|
|||
Loading…
Reference in New Issue