Revert "[Enhancement] Import starcache submodule to support DLA block cache. (#20164)" (#20422)

This reverts commit ed55db7d09.
Gavin 2023-03-28 15:11:06 +08:00 committed by GitHub
parent 9f954815a0
commit a05e8bde34
20 changed files with 91 additions and 705 deletions

.gitmodules (3 lines changed)

@ -1,3 +0,0 @@
[submodule "be/src/submodules/starcache"]
path = be/src/submodules/starcache
url = https://github.com/StarRocks/starcache.git


@ -73,7 +73,7 @@ option(WITH_GCOV "Build binary with gcov to get code coverage" OFF)
option(WITH_COMPRESS "Build binary with compressed debug section" ON)
option(WITH_CACHELIB "Build binary with cachelib library" OFF)
option(WITH_BLOCK_CACHE "Build binary with block cache feature" OFF)
option(WITH_BENCH "Build binary with bench" OFF)
@ -336,10 +336,6 @@ set_target_properties(serdes PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib/
add_library(opentelemetry_exporter_jaeger_trace STATIC IMPORTED GLOBAL)
set_target_properties(opentelemetry_exporter_jaeger_trace PROPERTIES IMPORTED_LOCATION ${THIRDPARTY_DIR}/lib64/libopentelemetry_exporter_jaeger_trace.a)
add_library(starcache STATIC IMPORTED)
set_target_properties(starcache PROPERTIES IMPORTED_LOCATION ${starcache_DIR}/lib64/libstarcache.a)
include_directories(${starcache_DIR}/include)
# cachelib
set(CACHELIB_DEPENDENCIES
cachelib_common
@ -365,7 +361,7 @@ set(CACHELIB_DEPENDENCIES
wangle
)
if (${WITH_CACHELIB} STREQUAL "ON")
if (${WITH_BLOCK_CACHE} STREQUAL "ON")
LINK_LIBRARIES(${THIRDPARTY_DIR}/cachelib/deps/lib64/libunwind.so)
LINK_LIBRARIES(${THIRDPARTY_DIR}/cachelib/deps/lib64/liblzma.so)
foreach(dep ${CACHELIB_DEPENDENCIES})
@ -467,8 +463,8 @@ set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Werror=return-type -Werror=switch")
if (${USE_STAROS} STREQUAL "ON")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DUSE_STAROS")
endif()
if (${WITH_CACHELIB} STREQUAL "ON")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DWITH_CACHELIB")
if (${WITH_BLOCK_CACHE} STREQUAL "ON")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -DWITH_BLOCK_CACHE")
endif()
# When LLVM is used, should give GCC_HOME to get c++11 header to use new string and list
@ -623,10 +619,13 @@ set(STARROCKS_LINK_LIBS
TestUtil
Tools
Geo
BlockCache
${WL_END_GROUP}
)
if (${WITH_BLOCK_CACHE} STREQUAL "ON")
set(STARROCKS_LINK_LIBS ${STARROCKS_LINK_LIBS} BlockCache)
endif()
set(STARROCKS_DEPENDENCIES ${STARROCKS_DEPENDENCIES}
mysql
)
@ -704,11 +703,10 @@ set(STARROCKS_DEPENDENCIES
jansson
avro
serdes
starcache
${WL_END_GROUP}
)
if (${WITH_CACHELIB} STREQUAL "ON")
if (${WITH_BLOCK_CACHE} STREQUAL "ON")
set(STARROCKS_DEPENDENCIES
${STARROCKS_DEPENDENCIES}
${WL_START_GROUP}
@ -860,7 +858,6 @@ add_subdirectory(${SRC_DIR}/service)
add_subdirectory(${SRC_DIR}/testutil)
add_subdirectory(${SRC_DIR}/types)
add_subdirectory(${SRC_DIR}/udf)
add_subdirectory(${SRC_DIR}/block_cache)
add_subdirectory(${SRC_DIR}/tools)
add_subdirectory(${SRC_DIR}/util)
@ -870,6 +867,10 @@ if (WITH_BENCH STREQUAL "ON")
add_subdirectory(${SRC_DIR}/bench)
endif()
if (${WITH_BLOCK_CACHE} STREQUAL "ON")
add_subdirectory(${SRC_DIR}/block_cache)
endif()
if (${MAKE_TEST} STREQUAL "ON")
add_subdirectory(test)
endif ()
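For reference on how the WITH_BLOCK_CACHE option above is consumed on the C++ side: when it is ON, the build adds -DWITH_BLOCK_CACHE (see the CXX_COMMON_FLAGS hunk) and links the BlockCache target, and call sites then guard cache access on that macro. A minimal sketch of the pattern, using only names that appear elsewhere in this diff; the header path and the helper name are illustrative, and the real guarded code is in cache_input_stream.cpp further down:

#include <cstdint>
#ifdef WITH_BLOCK_CACHE
#include "block_cache/block_cache.h" // assumed path; only available when the BlockCache target is built
#endif

// Mirrors the get_align_size() pattern in cache_input_stream.cpp below:
// cache-enabled builds align reads to the cache block size, other builds do not.
inline int64_t preferred_io_alignment() {
#ifdef WITH_BLOCK_CACHE
    return starrocks::BlockCache::instance()->block_size();
#else
    return 0;
#endif
}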


@ -16,4 +16,3 @@ ADD_BE_BENCH(${SRC_DIR}/bench/chunks_sorter_bench)
ADD_BE_BENCH(${SRC_DIR}/bench/runtime_filter_bench)
ADD_BE_BENCH(${SRC_DIR}/bench/csv_reader_bench)
ADD_BE_BENCH(${SRC_DIR}/bench/shuffle_chunk_bench)
ADD_BE_BENCH(${SRC_DIR}/bench/block_cache_bench)


@ -1,420 +0,0 @@
// Copyright 2023-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <benchmark/benchmark.h>
#include <brpc/server.h>
#include <bvar/bvar.h>
#include <bvar/detail/agent_group.h>
#include <bvar/passive_status.h>
#include <bvar/reducer.h>
#include <gtest/gtest.h>
#include <testutil/assert.h>
#include <filesystem>
#include <memory>
#include <numeric>
#include "block_cache/star_cachelib.h"
#include "block_cache/starcache/common/config.h"
#include "common/config.h"
#include "common/statusor.h"
#include "util/logging.h"
#include "util/random.h"
#include "util/time.h"
#ifdef WITH_CACHELIB
#include "block_cache/fb_cachelib.h"
#endif
namespace starrocks {
constexpr size_t KB = 1024;
constexpr size_t MB = KB * 1024;
constexpr size_t GB = MB * 1024;
const std::string DISK_CACHE_PATH = "./bench_dir/block_disk_cache";
void delete_dir_content(const std::string& dir_path) {
for (const auto& entry : std::filesystem::directory_iterator(dir_path)) {
std::filesystem::remove_all(entry.path());
}
}
enum class CacheEngine { CACHELIB, STARCACHE };
class BlockCacheBenchSuite {
public:
struct BenchParams {
CacheEngine cache_engine;
size_t obj_count = 0;
size_t obj_key_size = 0;
size_t obj_value_size = 0;
size_t read_size = 0;
bool pre_populate = true;
bool check_value = false;
bool random_read_offset = false;
bool random_obj_size = false;
size_t total_op_count = 0;
// remove operation ratio, expressed as a percentage in the range [0, 100]
size_t remove_op_ratio = 0;
};
struct BenchContext {
std::vector<std::string> obj_keys;
std::vector<size_t> obj_sizes;
std::atomic<size_t> finish_op_count = 0;
std::unique_ptr<bvar::LatencyRecorder> write_latency;
std::unique_ptr<bvar::LatencyRecorder> read_latency;
std::unique_ptr<bvar::LatencyRecorder> remove_latency;
std::unique_ptr<bvar::Adder<size_t>> write_op_count;
std::unique_ptr<bvar::Adder<size_t>> read_op_count;
std::unique_ptr<bvar::Adder<size_t>> remove_op_count;
std::unique_ptr<bvar::Adder<size_t>> write_bytes;
std::unique_ptr<bvar::PerSecond<bvar::Adder<size_t>>> write_throught;
std::unique_ptr<bvar::Adder<size_t>> read_bytes;
std::unique_ptr<bvar::PerSecond<bvar::Adder<size_t>>> read_throught;
std::unique_ptr<bvar::IntRecorder> avg_read_size;
std::unique_ptr<Random> rnd;
BenchContext() {
write_latency = std::make_unique<bvar::LatencyRecorder>("star_bench", "write_latency_us");
read_latency = std::make_unique<bvar::LatencyRecorder>("star_bench", "read_latency_us");
remove_latency = std::make_unique<bvar::LatencyRecorder>("star_bench", "remove_latency_us");
write_op_count = std::make_unique<bvar::Adder<size_t>>("star_bench", "write_op_count");
read_op_count = std::make_unique<bvar::Adder<size_t>>("star_bench", "read_op_count");
remove_op_count = std::make_unique<bvar::Adder<size_t>>("star_bench", "remove_op_count");
write_bytes = std::make_unique<bvar::Adder<size_t>>("star_bench", "write_bytes");
write_throught = std::make_unique<bvar::PerSecond<bvar::Adder<size_t>>>("star_bench", "write_throught",
write_bytes.get());
read_bytes = std::make_unique<bvar::Adder<size_t>>("star_bench", "read_bytes");
read_throught = std::make_unique<bvar::PerSecond<bvar::Adder<size_t>>>("star_bench", "read_throught",
read_bytes.get());
avg_read_size = std::make_unique<bvar::IntRecorder>("star_bench", "avg_read_size");
rnd = std::make_unique<Random>(0);
}
};
BlockCacheBenchSuite(const CacheOptions& options, const BenchParams& params) {
_params = new BlockCacheBenchSuite::BenchParams(params);
if (params.cache_engine == CacheEngine::STARCACHE) {
_cache = new StarCacheLib;
#ifdef WITH_CACHELIB
} else if (params.cache_engine == CacheEngine::CACHELIB) {
_cache = new FbCacheLib;
#endif
} else {
DCHECK(false) << "Unsupported cache engine: " << static_cast<int>(params.cache_engine);
}
Status st = _cache->init(options);
DCHECK(st.ok()) << st.get_error_msg();
_ctx = new BenchContext();
}
~BlockCacheBenchSuite() {
delete _params;
delete _ctx;
delete _cache;
}
static void Setup(const benchmark::State& state) { brpc::StartDummyServerAt(8080); }
static void Teardown(const benchmark::State& state) { delete_dir_content(DISK_CACHE_PATH); }
static char index2ch(size_t index) { return 'a' + index % 26; }
static std::string gen_obj_key(size_t obj_index, size_t key_size, BenchContext* ctx) {
const int range = 26;
std::string key;
key.push_back(index2ch(obj_index));
for (size_t i = 0; i < key_size; ++i) {
int32_t n = ctx->rnd->Uniform(range - 1);
key.push_back('a' + n);
}
return key;
}
static std::string gen_obj_value(size_t obj_index, size_t value_size, BenchContext* ctx) {
std::string value;
value.assign(value_size, index2ch(obj_index));
return value;
}
static bool check_buffer(const char* data, size_t length, char ch) {
for (size_t i = 0; i < length; ++i) {
if (data[i] != ch) {
LOG(ERROR) << "check buffer failed, "
<< "real: " << data[i] << ", expect: " << ch << ", data: " << (uint64_t)data << ", i: " << i;
return false;
}
}
return true;
}
void do_bench(benchmark::State& state) {
for (size_t i = 0; i < _params->total_op_count; ++i) {
int index = _ctx->rnd->Uniform(_params->obj_count);
// remove
if (_params->remove_op_ratio > 0 && _ctx->rnd->Uniform(100) < _params->remove_op_ratio) {
int64_t start_us = MonotonicMicros();
Status st = _cache->remove_cache(_ctx->obj_keys[index]);
if (st.ok()) {
*(_ctx->remove_latency) << MonotonicMicros() - start_us;
*(_ctx->remove_op_count) << 1;
}
continue;
}
const size_t obj_value_size = _ctx->obj_sizes[index];
// read
char* value = new char[_params->read_size];
off_t delta = obj_value_size - _params->read_size;
off_t offset = 0;
if (_params->random_read_offset && delta > 0) {
offset = _ctx->rnd->Uniform(delta);
}
int64_t start_us = MonotonicMicros();
auto res = _cache->read_cache(_ctx->obj_keys[index], value, offset, _params->read_size);
if (res.ok()) {
*(_ctx->read_latency) << MonotonicMicros() - start_us;
*(_ctx->read_bytes) << res.value();
*(_ctx->avg_read_size) << res.value();
*(_ctx->read_op_count) << 1;
if (_params->check_value && !check_buffer(value, res.value(), index2ch(index))) {
ASSERT_TRUE(false) << "check read data correctness failed"
<< ", index: " << index << ", key: " << _ctx->obj_keys[index];
}
} else if (res.status().is_not_found()) {
std::string v = gen_obj_value(index, obj_value_size, _ctx);
start_us = MonotonicMicros();
Status st = _cache->write_cache(_ctx->obj_keys[index], v.data(), obj_value_size, 0);
ASSERT_TRUE(st.ok()) << "write cache failed: " << st.get_error_msg();
*(_ctx->write_latency) << MonotonicMicros() - start_us;
*(_ctx->write_bytes) << v.size();
*(_ctx->write_op_count) << 1;
char* read_value = new char[obj_value_size];
auto res = _cache->read_cache(_ctx->obj_keys[index], read_value, 0, _params->read_size);
delete[] read_value;
} else {
ASSERT_TRUE(false) << "read cache failed: " << res.status().get_error_msg();
}
delete[] value;
}
}
void do_prepare(bool pre_populate, bool random_obj_size) {
_ctx->obj_keys.reserve(_params->obj_count);
for (size_t i = 0; i < _params->obj_count; ++i) {
std::string key = gen_obj_key(i, _params->obj_key_size, _ctx);
size_t size = _params->obj_value_size;
if (random_obj_size) {
size = _ctx->rnd->Uniform(size);
}
if (pre_populate) {
std::string value = gen_obj_value(i, size, _ctx);
int64_t start_us = MonotonicMicros();
Status st = _cache->write_cache(key, value.data(), size, 0);
ASSERT_OK(st);
char* read_value = new char[_params->obj_value_size];
std::string read_key = i == 0 ? key : _ctx->obj_keys[0];
auto res = _cache->read_cache(read_key, read_value, 0, _params->read_size);
delete[] read_value;
*(_ctx->write_latency) << MonotonicMicros() - start_us;
*(_ctx->write_bytes) << value.size();
*(_ctx->write_op_count) << 1;
}
_ctx->obj_keys.push_back(key);
_ctx->obj_sizes.push_back(size);
}
}
BenchParams* params() { return _params; }
BenchContext* ctx() { return _ctx; }
private:
BenchParams* _params = nullptr;
BenchContext* _ctx = nullptr;
KvCache* _cache = nullptr;
};
static void do_bench_cache(benchmark::State& state, const CacheOptions& options,
const BlockCacheBenchSuite::BenchParams& params) {
static std::shared_ptr<BlockCacheBenchSuite> suite;
if (state.thread_index == 0) {
suite = std::make_shared<BlockCacheBenchSuite>(options, params);
suite->Setup(state);
suite->do_prepare(suite->params()->pre_populate, suite->params()->random_obj_size);
}
for (auto _ : state) {
suite->do_bench(state);
}
if (state.thread_index == 0) {
suite->Teardown(state);
}
state.counters["write_bytes"] = suite->ctx()->write_bytes->get_value();
state.counters["read_bytes"] = suite->ctx()->read_bytes->get_value();
//state.counters["write_throught"] = suite->ctx()->write_throught->get_value();
//state.counters["read_throught"] = suite->ctx()->read_throught->get_value();
}
template <class... Args>
static void BM_bench_cachelib(benchmark::State& state, Args&&... args) {
auto args_tuple = std::make_tuple(std::move(args)...);
std::get<0>(args_tuple).second.cache_engine = CacheEngine::CACHELIB;
do_bench_cache(state, std::get<0>(args_tuple).first, std::get<0>(args_tuple).second);
}
template <class... Args>
static void BM_bench_starcache(benchmark::State& state, Args&&... args) {
auto args_tuple = std::make_tuple(std::move(args)...);
std::get<0>(args_tuple).second.cache_engine = CacheEngine::STARCACHE;
do_bench_cache(state, std::get<0>(args_tuple).first, std::get<0>(args_tuple).second);
}
[[maybe_unused]] static std::pair<CacheOptions, BlockCacheBenchSuite::BenchParams> read_mem_suite() {
CacheOptions options;
options.mem_space_size = 5 * GB;
options.meta_path = DISK_CACHE_PATH;
options.disk_spaces.push_back({.path = DISK_CACHE_PATH, .size = 1 * GB});
options.block_size = 4 * MB;
options.checksum = false;
starcache::config::FLAGS_enable_disk_checksum = false;
BlockCacheBenchSuite::BenchParams params;
params.obj_count = 1000;
params.obj_key_size = 20;
params.obj_value_size = 1 * MB;
params.read_size = 1 * MB;
params.total_op_count = 300000;
params.check_value = false;
return std::make_pair(options, params);
}
[[maybe_unused]] static std::pair<CacheOptions, BlockCacheBenchSuite::BenchParams> read_disk_suite() {
CacheOptions options;
options.mem_space_size = 300 * MB;
options.meta_path = DISK_CACHE_PATH;
options.disk_spaces.push_back({.path = DISK_CACHE_PATH, .size = 10 * GB});
options.block_size = 4 * MB;
options.checksum = true;
starcache::config::FLAGS_enable_disk_checksum = true;
BlockCacheBenchSuite::BenchParams params;
params.obj_count = 1000;
params.obj_key_size = 20;
params.obj_value_size = 1 * MB;
params.read_size = 1 * MB;
params.total_op_count = 1000;
params.check_value = true;
return std::make_pair(options, params);
}
[[maybe_unused]] static std::pair<CacheOptions, BlockCacheBenchSuite::BenchParams> read_write_remove_disk_suite() {
CacheOptions options;
options.mem_space_size = 300 * MB;
options.meta_path = DISK_CACHE_PATH;
options.disk_spaces.push_back({.path = DISK_CACHE_PATH, .size = 10 * GB});
options.block_size = 4 * MB;
options.checksum = true;
starcache::config::FLAGS_enable_disk_checksum = true;
BlockCacheBenchSuite::BenchParams params;
params.obj_count = 1000;
params.obj_key_size = 20;
params.obj_value_size = 1 * MB;
params.read_size = 1 * MB;
params.total_op_count = 500;
params.check_value = true;
params.pre_populate = false;
//params.remove_op_ratio = 30;
return std::make_pair(options, params);
}
[[maybe_unused]] static std::pair<CacheOptions, BlockCacheBenchSuite::BenchParams> random_offset_read_suite() {
CacheOptions options;
options.mem_space_size = 300 * MB;
//options.mem_space_size = 8000 * MB;
options.meta_path = DISK_CACHE_PATH;
options.disk_spaces.push_back({.path = DISK_CACHE_PATH, .size = 10 * GB});
options.block_size = 1 * MB;
options.checksum = true;
starcache::config::FLAGS_enable_disk_checksum = true;
BlockCacheBenchSuite::BenchParams params;
params.obj_count = 1000;
params.obj_key_size = 20;
//params.obj_value_size = 1 * MB;
params.obj_value_size = 2 * MB + 123;
params.read_size = 257 * KB;
params.random_obj_size = true;
params.random_read_offset = true;
params.total_op_count = 10000;
params.check_value = true;
return std::make_pair(options, params);
}
// Read Mem
BENCHMARK_CAPTURE(BM_bench_starcache, bench_read_mem, read_mem_suite())->Threads(16);
// Read Disk
BENCHMARK_CAPTURE(BM_bench_starcache, bench_read_disk, read_disk_suite())->Threads(16);
// Read+Write+Remove Disk
BENCHMARK_CAPTURE(BM_bench_starcache, bench_read_write_remove_disk, read_write_remove_disk_suite())->Threads(16);
// Random offset for Read+Write+Remove Disk
BENCHMARK_CAPTURE(BM_bench_starcache, bench_random_offset_read, random_offset_read_suite())->Threads(16);
#ifdef WITH_CACHELIB
BENCHMARK_CAPTURE(BM_bench_cachelib, bench_read_mem, read_mem_suite())->Threads(16);
BENCHMARK_CAPTURE(BM_bench_cachelib, bench_read_disk, read_disk_suite())->Threads(16);
BENCHMARK_CAPTURE(BM_bench_cachelib, bench_read_write_remove_disk, read_write_remove_disk_suite())->Threads(16);
BENCHMARK_CAPTURE(BM_bench_cachelib, bench_random_offset_read, random_offset_read_suite())->Threads(16);
#endif
} // namespace starrocks
//BENCHMARK_MAIN();
int main(int argc, char** argv) {
starrocks::init_glog("be_bench", true);
std::filesystem::create_directories(starrocks::DISK_CACHE_PATH);
starrocks::delete_dir_content(starrocks::DISK_CACHE_PATH);
::benchmark::Initialize(&argc, argv);
if (::benchmark::ReportUnrecognizedArguments(argc, argv)) return 1;
::benchmark::RunSpecifiedBenchmarks();
::benchmark::Shutdown();
//std::filesystem::remove_all("./bench_dir");
return 0;
}


@ -1,4 +1,4 @@
# Copyright 2023-present StarRocks, Inc. All rights reserved.
# Copyright 2021-present StarRocks, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -20,22 +20,15 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/block_cache")
set(CACHE_FILES
block_cache.cpp
star_cachelib.cpp
fb_cachelib.cpp
)
if (${WITH_CACHELIB} STREQUAL "ON")
list(APPEND CACHE_FILES fb_cachelib.cpp)
endif()
add_library(BlockCache STATIC
${CACHE_FILES}
)
if (${WITH_CACHELIB} STREQUAL "ON")
set(CACHELIB_DIR ${THIRDPARTY_DIR}/cachelib)
link_directories(${CACHELIB_DIR}/deps/lib64)
include_directories(AFTER ${CACHELIB_DIR}/include)
include_directories(AFTER ${CACHELIB_DIR}/deps/include)
endif()
set(CACHELIB_DIR ${THIRDPARTY_DIR}/cachelib)
link_directories(${CACHELIB_DIR}/deps/lib64)
include_directories(AFTER ${CACHELIB_DIR}/include)
include_directories(AFTER ${CACHELIB_DIR}/deps/include)


@ -1,4 +1,4 @@
// Copyright 2023-present StarRocks, Inc. All rights reserved.
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -16,11 +16,7 @@
#include <fmt/format.h>
#ifdef WITH_CACHELIB
#include "block_cache/fb_cachelib.h"
#endif
#include "block_cache/star_cachelib.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/statusor.h"
@ -28,6 +24,10 @@
namespace starrocks {
BlockCache::BlockCache() {
_kv_cache = std::make_unique<FbCacheLib>();
}
BlockCache* BlockCache::instance() {
static BlockCache cache;
return &cache;
@ -36,18 +36,6 @@ BlockCache* BlockCache::instance() {
Status BlockCache::init(const CacheOptions& options) {
// TODO: check block size limit
_block_size = options.block_size;
if (options.engine == "starcache") {
_kv_cache = std::make_unique<StarCacheLib>();
LOG(INFO) << "init starcache block engine";
#ifdef WITH_CACHELIB
} else if (options.engine == "cachelib") {
_kv_cache = std::make_unique<FbCacheLib>();
LOG(INFO) << "init cachelib block engine";
#endif
} else {
LOG(ERROR) << "unsupported block cache engine: " << options.engine;
return Status::NotSupported("unsupported block cache engine");
}
return _kv_cache->init(options);
}
@ -96,9 +84,7 @@ Status BlockCache::remove_cache(const CacheKey& cache_key, off_t offset, size_t
}
Status BlockCache::shutdown() {
Status st = _kv_cache->shutdown();
_kv_cache = nullptr;
return st;
return _kv_cache->shutdown();
}
} // namespace starrocks
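As a usage reference for the singleton above, a minimal lifecycle sketch following the pattern of the block_cache_test.cpp hunk later in this diff; the header path, quota values, and key are placeholders, and only calls that appear elsewhere in this patch are used:

#include "block_cache/block_cache.h" // assumed path

void block_cache_lifecycle_sketch() {
    starrocks::BlockCache* cache = starrocks::BlockCache::instance();

    starrocks::CacheOptions options;
    options.mem_space_size = 20 * 1024 * 1024; // placeholder memory quota
    options.disk_spaces.push_back({.path = "./ut_dir/block_disk_cache", .size = 500 * 1024 * 1024});
    options.block_size = 1 * 1024 * 1024;
    if (!cache->init(options).ok()) {
        return; // init failed; nothing more to show in this sketch
    }

    // Reading a key that was never written reports NotFound, i.e. a cache miss.
    char buffer[4096];
    auto res = cache->read_cache("test_file", /*offset=*/0, sizeof(buffer), buffer);
    if (!res.ok() && res.status().is_not_found()) {
        // miss: the caller falls back to the underlying storage
    }

    cache->shutdown();
}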


@ -1,4 +1,4 @@
// Copyright 2023-present StarRocks, Inc. All rights reserved.
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@ -50,10 +50,10 @@ public:
size_t block_size() const { return _block_size; }
private:
BlockCache() = default;
BlockCache();
size_t _block_size = 0;
std::unique_ptr<KvCache> _kv_cache = nullptr;
std::unique_ptr<KvCache> _kv_cache;
};
} // namespace starrocks


@ -35,7 +35,6 @@ struct CacheOptions {
bool checksum;
size_t max_parcel_memory_mb;
size_t max_concurrent_inserts;
std::string engine;
};
} // namespace starrocks


@ -1,82 +0,0 @@
// Copyright 2023-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "block_cache/star_cachelib.h"
#include "common/logging.h"
#include "common/statusor.h"
#include "gutil/strings/fastmem.h"
#include "util/filesystem_util.h"
namespace starrocks {
Status StarCacheLib::init(const CacheOptions& options) {
starcache::CacheOptions opt;
opt.mem_quota_bytes = options.mem_space_size;
for (auto& dir : options.disk_spaces) {
opt.disk_dir_spaces.push_back({.path = dir.path, .quota_bytes = dir.size});
}
opt.checksum = options.checksum;
opt.block_size = options.block_size;
_cache = std::make_unique<starcache::StarCache>();
return to_status(_cache->init(opt));
}
static void empty_deleter(void* buf) {}
Status StarCacheLib::write_cache(const std::string& key, const char* value, size_t size, size_t ttl_seconds) {
butil::IOBuf buf;
// Don't free the buffer passed by users
buf.append_user_data((void*)value, size, empty_deleter);
return to_status(_cache->set(key, buf));
}
StatusOr<size_t> StarCacheLib::read_cache(const std::string& key, char* value, size_t off, size_t size) {
butil::IOBuf buf;
RETURN_IF_ERROR(to_status(_cache->read(key, off, size, &buf)));
// A null destination means the caller only wants to check whether the key is cached.
if (value == nullptr) {
return 0;
}
copy_iobuf(buf, value);
return buf.size();
}
Status StarCacheLib::remove_cache(const std::string& key) {
_cache->remove(key);
return Status::OK();
}
std::unordered_map<std::string, double> StarCacheLib::cache_stats() {
// TODO: fill some statistics information
std::unordered_map<std::string, double> stats;
return stats;
}
Status StarCacheLib::shutdown() {
return Status::OK();
}
void copy_iobuf(const butil::IOBuf& buf, char* value) {
off_t off = 0;
for (size_t i = 0; i < buf.backing_block_num(); ++i) {
auto sp = buf.backing_block(i);
if (!sp.empty()) {
strings::memcpy_inlined(value + off, (void*)sp.data(), sp.size());
off += sp.size();
}
}
}
} // namespace starrocks
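Two behaviours of the wrapper above are easy to miss: write_cache hands the caller's buffer to an IOBuf with an empty deleter, so the bytes are not copied and must outlive the call, and read_cache with a null destination only probes whether the key is cached. A small usage sketch under those assumptions (the key and sizes are placeholders):

#include <string>

#include "block_cache/star_cachelib.h"

void star_cachelib_usage_sketch(starrocks::StarCacheLib& cache_lib) {
    std::string payload(4096, 'x');

    // write_cache wraps `payload` directly (see empty_deleter above), so the buffer
    // must stay alive for the duration of the call; no ownership is transferred.
    starrocks::Status st = cache_lib.write_cache("demo_key", payload.data(), payload.size(), /*ttl_seconds=*/0);

    // A null destination returns 0 on a hit and NotFound on a miss, letting a
    // caller test for presence without copying the cached bytes out.
    auto probe = cache_lib.read_cache("demo_key", /*value=*/nullptr, /*off=*/0, /*size=*/payload.size());
    bool cached = probe.ok();
    (void)st;
    (void)cached;
}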


@ -1,67 +0,0 @@
// Copyright 2023-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <starcache/star_cache.h>
#include "block_cache/kv_cache.h"
#include "common/status.h"
namespace starrocks {
class StarCacheLib : public KvCache {
public:
StarCacheLib() = default;
~StarCacheLib() override = default;
Status init(const CacheOptions& options) override;
Status write_cache(const std::string& key, const char* value, size_t size, size_t ttl_seconds) override;
StatusOr<size_t> read_cache(const std::string& key, char* value, size_t off, size_t size) override;
Status remove_cache(const std::string& key) override;
std::unordered_map<std::string, double> cache_stats() override;
Status shutdown() override;
private:
std::unique_ptr<starcache::StarCache> _cache = nullptr;
};
// In order to split the starcache library into a separate repository for other users such as the cloud team,
// we decouple it from starrocks as much as possible, so starcache uses butil::Status instead of starrocks::Status.
// This function is used to convert the butil::Status from starcache to starrocks::Status.
inline Status to_status(const butil::Status& st) {
switch (st.error_code()) {
case 0:
return Status::OK();
case ENOENT:
return Status::NotFound(st.error_str());
case EEXIST:
return Status::AlreadyExist(st.error_str());
case EINVAL:
return Status::InvalidArgument(st.error_str());
case EIO:
return Status::IOError(st.error_str());
default:
return Status::InternalError(st.error_str());
}
}
void copy_iobuf(const butil::IOBuf& buf, char* value);
} // namespace starrocks
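A quick illustration of the to_status mapping above (a sketch, assuming butil::Status can be built from an errno-style code and a message string):

#include <cerrno>

#include "block_cache/star_cachelib.h"

void to_status_mapping_sketch() {
    // ENOENT from starcache is surfaced as NotFound, which the block cache read
    // path in this patch treats as a cache miss rather than a hard error.
    starrocks::Status miss = starrocks::to_status(butil::Status(ENOENT, "cache item not found"));
    // miss.is_not_found() == true

    // Codes without an explicit mapping fall through to InternalError.
    starrocks::Status other = starrocks::to_status(butil::Status(EBUSY, "resource busy"));
    // !other.ok()
    (void)miss;
    (void)other;
}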


@ -880,8 +880,6 @@ CONF_Int64(block_cache_max_concurrent_inserts, "1000000");
// Once this is reached, requests will be rejected until the parcel memory usage gets under the limit.
CONF_Int64(block_cache_max_parcel_memory_mb, "256");
CONF_Bool(block_cache_report_stats, "false");
// cachelib, starcache
CONF_String(block_cache_engine, "starcache");
CONF_mInt64(l0_l1_merge_ratio, "10");
CONF_mInt64(l0_max_file_size, "209715200"); // 200MB


@ -33,6 +33,7 @@ CacheInputStream::CacheInputStream(std::shared_ptr<SeekableInputStream> stream,
_stream(stream),
_offset(0),
_size(size) {
#ifdef WITH_BLOCK_CACHE
// _cache_key = _filename;
// use hash(filename) as cache key.
_cache_key.resize(16);
@ -42,8 +43,10 @@ CacheInputStream::CacheInputStream(std::shared_ptr<SeekableInputStream> stream,
int64_t file_size = _size;
memcpy(data + 8, &file_size, sizeof(file_size));
_buffer.reserve(BlockCache::instance()->block_size());
#endif
}
#ifdef WITH_BLOCK_CACHE
Status CacheInputStream::read_at_fully(int64_t offset, void* out, int64_t count) {
BlockCache* cache = BlockCache::instance();
count = std::min(_size - offset, count);
@ -121,6 +124,13 @@ Status CacheInputStream::read_at_fully(int64_t offset, void* out, int64_t count)
DCHECK(p == pe);
return Status::OK();
}
#else
Status CacheInputStream::read_at_fully(int64_t offset, void* out, int64_t count) {
int64_t load_size = std::min(count, _size - offset);
RETURN_IF_ERROR(_stream->read_at_fully(offset, out, load_size));
return Status::OK();
}
#endif
StatusOr<int64_t> CacheInputStream::read(void* data, int64_t count) {
RETURN_IF_ERROR(read_at_fully(_offset, data, count));
@ -144,8 +154,12 @@ StatusOr<int64_t> CacheInputStream::get_size() {
}
int64_t CacheInputStream::get_align_size() const {
#ifdef WITH_BLOCK_CACHE
BlockCache* cache = BlockCache::instance();
return cache->block_size();
#else
return 0;
#endif
}
StatusOr<std::string_view> CacheInputStream::peek(int64_t count) {
@ -158,6 +172,7 @@ StatusOr<std::string_view> CacheInputStream::peek(int64_t count) {
return s;
}
#ifdef WITH_BLOCK_CACHE
void CacheInputStream::_populate_cache_from_zero_copy_buffer(const char* p, int64_t offset, int64_t count) {
BlockCache* cache = BlockCache::instance();
const int64_t BLOCK_SIZE = cache->block_size();
@ -193,5 +208,10 @@ void CacheInputStream::_populate_cache_from_zero_copy_buffer(const char* p, int6
}
return;
}
#else
void CacheInputStream::_populate_cache_from_zero_copy_buffer(const char* p, int64_t offset, int64_t size) {
return;
}
#endif
} // namespace starrocks::io
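For reference, the constructor hunk above assembles a 16-byte cache key: bytes 0-7 hold a hash of the file name (the exact hash call falls in the elided part of the hunk) and bytes 8-15 hold the file size. A sketch of that layout, with the hash left as an explicit placeholder:

#include <cstdint>
#include <cstring>
#include <functional>
#include <string>

std::string make_block_cache_key_sketch(const std::string& filename, int64_t size) {
    std::string cache_key;
    cache_key.resize(16);
    char* data = cache_key.data();
    // Placeholder: the real code hashes `filename` into the first 8 bytes; the
    // specific hash function is not visible in this diff.
    uint64_t name_hash = std::hash<std::string>{}(filename);
    std::memcpy(data, &name_hash, sizeof(name_hash));
    // The next 8 bytes are the file size, exactly as in the hunk above.
    int64_t file_size = size;
    std::memcpy(data + 8, &file_size, sizeof(file_size));
    return cache_key;
}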


@ -207,6 +207,7 @@ int main(int argc, char** argv) {
}
#endif
#ifdef WITH_BLOCK_CACHE
if (starrocks::config::block_cache_enable) {
starrocks::BlockCache* cache = starrocks::BlockCache::instance();
starrocks::CacheOptions cache_options;
@ -230,9 +231,9 @@ int main(int argc, char** argv) {
cache_options.checksum = starrocks::config::block_cache_checksum_enable;
cache_options.max_parcel_memory_mb = starrocks::config::block_cache_max_parcel_memory_mb;
cache_options.max_concurrent_inserts = starrocks::config::block_cache_max_concurrent_inserts;
cache_options.engine = starrocks::config::block_cache_engine;
cache->init(cache_options);
}
#endif
Aws::SDKOptions aws_sdk_options;
if (starrocks::config::aws_sdk_logging_trace_enabled) {
@ -349,9 +350,9 @@ int main(int argc, char** argv) {
start_be();
}
if (starrocks::config::block_cache_enable) {
starrocks::BlockCache::instance()->shutdown();
}
#ifdef WITH_BLOCK_CACHE
starrocks::BlockCache::instance()->shutdown();
#endif
daemon->stop();
daemon.reset();

@ -1 +0,0 @@
Subproject commit 67b769a368b14308f6dbd2bae8c10ef32111dc9e


@ -345,13 +345,15 @@ set(EXEC_FILES
./util/cpu_usage_info_test.cpp
./gutil/sysinfo-test.cc
./service/lake_service_test.cpp
./block_cache/block_cache_test.cpp
./io/cache_input_stream_test.cpp
)
if ("${USE_STAROS}" STREQUAL "ON")
list(APPEND EXEC_FILES ./fs/fs_starlet_test.cpp)
endif ()
if ("${WITH_BLOCK_CACHE}" STREQUAL "ON")
list(APPEND EXEC_FILES ./block_cache/block_cache_test.cpp)
list(APPEND EXEC_FILES ./io/cache_input_stream_test.cpp)
endif ()
if (USE_AVX2)
set(EXEC_FILES ${EXEC_FILES} ./column/avx_numeric_column_test.cpp)


@ -32,18 +32,17 @@ protected:
TEST_F(BlockCacheTest, hybrid_cache) {
BlockCache* cache = BlockCache::instance();
const size_t block_size = 1 * 1024 * 1024;
const size_t block_size = 4 * 1024;
CacheOptions options;
options.mem_space_size = 20 * 1024 * 1024;
size_t quota = 500 * 1024 * 1024;
options.disk_spaces.push_back({.path = "./ut_dir/block_disk_cache", .size = quota});
options.block_size = block_size;
options.engine = "starcache";
Status status = cache->init(options);
ASSERT_TRUE(status.ok());
const size_t batch_size = block_size - 1234;
const size_t batch_size = 2 * block_size + 1234;
const size_t rounds = 20;
const std::string cache_key = "test_file";
@ -77,11 +76,13 @@ TEST_F(BlockCacheTest, hybrid_cache) {
auto res = cache->read_cache(cache_key, 0, batch_size, value);
ASSERT_TRUE(res.status().is_not_found());
// invalid offset
res = cache->read_cache(cache_key, 1000000, batch_size, value);
ASSERT_TRUE(res.status().is_invalid_argument());
// not found
res = cache->read_cache(cache_key, block_size * 1000, batch_size, value);
ASSERT_TRUE(res.status().is_not_found());
cache->shutdown();
}
} // namespace starrocks


@ -51,15 +51,16 @@ private:
class CacheInputStreamTest : public ::testing::Test {
public:
static void SetUpTestCase() {
ASSERT_TRUE(fs::create_directories("./ut_dir/block_disk_cache").ok());
auto cache = BlockCache::instance();
CacheOptions options;
options.mem_space_size = 100 * 1024 * 1024;
options.engine = "starcache";
options.mem_space_size = 20 * 1024 * 1024;
options.disk_spaces.push_back({.path = "./ut_dir/block_disk_cache", .size = 50 * 1024 * 1024});
options.block_size = block_size;
ASSERT_OK(cache->init(options));
}
static void TearDownTestCase() { BlockCache::instance()->shutdown(); }
static void TearDownTestCase() { ASSERT_TRUE(fs::remove_all("./ut_dir").ok()); }
void SetUp() override {}
void TearDown() override {}
@ -93,15 +94,14 @@ public:
const int64_t CacheInputStreamTest::block_size = 1024 * 1024;
TEST_F(CacheInputStreamTest, test_aligned_read) {
const int64_t block_count = 3;
const int64_t block_count = 4;
int64_t data_size = block_size * block_count;
char data[data_size + 1];
gen_test_data(data, data_size, block_size);
std::shared_ptr<io::SeekableInputStream> stream(new MockSeekableInputStream(data, data_size));
io::CacheInputStream cache_stream(stream, "test_file1", data_size);
cache_stream.set_enable_populate_cache(true);
io::CacheInputStream cache_stream("test_file1", stream);
auto& stats = cache_stream.stats();
// first read from backend
@ -123,15 +123,14 @@ TEST_F(CacheInputStreamTest, test_aligned_read) {
}
TEST_F(CacheInputStreamTest, test_random_read) {
const int64_t block_count = 3;
const int64_t block_count = 4;
const int64_t data_size = block_size * block_count;
char data[data_size + 1];
gen_test_data(data, data_size, block_size);
std::shared_ptr<io::SeekableInputStream> stream(new MockSeekableInputStream(data, data_size));
io::CacheInputStream cache_stream(stream, "test_file2", data_size);
cache_stream.set_enable_populate_cache(true);
io::CacheInputStream cache_stream("test_file2", stream);
auto& stats = cache_stream.stats();
// first read from backend
@ -154,8 +153,9 @@ TEST_F(CacheInputStreamTest, test_random_read) {
ASSERT_TRUE(check_data_content(buffer, block_size - off_in_block, 'a' + 1));
ASSERT_TRUE(check_data_content(buffer + block_size - off_in_block, block_size, 'a' + 2));
ASSERT_TRUE(check_data_content(buffer + 2 * block_size - off_in_block, off_in_block, 'a' + 3));
ASSERT_EQ(stats.read_cache_count, 2);
ASSERT_EQ(stats.read_cache_count, 3);
}
} // namespace starrocks::io


@ -170,34 +170,3 @@ export_cachelib_lib_path() {
CACHELIB_DIR=$STARROCKS_HOME/lib/cachelib
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$CACHELIB_DIR/lib64
}
build_submodules()
{
local build_type=$1
local clean=$2
pushd ${STARROCKS_HOME} &>/dev/null
git submodule init
git submodule update
# build starcache
STARCACHE_HOME=${STARROCKS_HOME}/be/src/submodules/starcache
cd ${STARCACHE_HOME}
if [ ${clean} -eq 1 ]; then
STARCACHE_GCC_HOME=${STARROCKS_GCC_HOME} \
STARCACHE_CMAKE_CMD=${CMAKE_CMD} \
STARCACHE_THIRDPARTY=${STARROCKS_THIRDPARTY}/installed \
STARCACHE_INSTALL_DIR=${STARCACHE_HOME}/installed \
BUILD_TYPE=${build_type} ./build-scripts/cmake-build.sh --clean
else
if test -f "${STARCACHE_HOME}/installed/lib64/libstarcache.a"; then
return
fi
STARCACHE_GCC_HOME=${STARROCKS_GCC_HOME} \
STARCACHE_CMAKE_CMD=${CMAKE_CMD} \
STARCACHE_THIRDPARTY=${STARROCKS_THIRDPARTY}/installed \
STARCACHE_INSTALL_DIR=${STARCACHE_HOME}/installed \
BUILD_TYPE=${build_type} ./build-scripts/cmake-build.sh
fi
popd
}


@ -42,6 +42,7 @@ export STARROCKS_HOME=${ROOT}
. ${STARROCKS_HOME}/env.sh
if [[ $OSTYPE == darwin* ]] ; then
PARALLEL=$(sysctl -n hw.ncpu)
# We know for sure that build-thirdparty.sh will fail on the darwin platform, so just skip this step.
@ -138,14 +139,14 @@ if [ -e /proc/cpuinfo ] ; then
fi
if [[ "${MACHINE_TYPE}" == "aarch64" ]]; then
# force turn off cachelib on arm platform
WITH_CACHELIB=OFF
elif [[ -z ${WITH_CACHELIB} ]]; then
WITH_CACHELIB=OFF
# force turn off block cache on arm platform
WITH_BLOCK_CACHE=OFF
elif [[ -z ${WITH_BLOCK_CACHE} ]]; then
WITH_BLOCK_CACHE=OFF
fi
if [[ "${WITH_CACHELIB}" == "ON" && ! -f ${STARROCKS_THIRDPARTY}/installed/cachelib/lib/libcachelib_allocator.a ]]; then
echo "WITH_CACHELIB=ON but missing depdency libraries(cachelib)"
if [[ "${WITH_BLOCK_CACHE}" == "ON" && ! -f ${STARROCKS_THIRDPARTY}/installed/cachelib/lib/libcachelib_allocator.a ]]; then
echo "WITH_BLOCK_CACHE=ON but missing depdency libraries(cachelib)"
exit 1
fi
@ -223,7 +224,7 @@ echo "Get params:
USE_AVX512 -- $USE_AVX512
PARALLEL -- $PARALLEL
ENABLE_QUERY_DEBUG_TRACE -- $ENABLE_QUERY_DEBUG_TRACE
WITH_CACHELIB -- $WITH_CACHELIB
WITH_BLOCK_CACHE -- $WITH_BLOCK_CACHE
USE_JEMALLOC -- $USE_JEMALLOC
"
@ -278,17 +279,11 @@ if [ ${BUILD_BE} -eq 1 ] ; then
if [ "${WITH_GCOV}" = "ON" ]; then
CMAKE_BUILD_DIR=${STARROCKS_HOME}/be/build_${CMAKE_BUILD_TYPE}_gcov
fi
if [ ${CLEAN} -eq 1 ]; then
rm -rf $CMAKE_BUILD_DIR
rm -rf ${STARROCKS_HOME}/be/output/
fi
mkdir -p ${CMAKE_BUILD_DIR}
. ${STARROCKS_HOME}/bin/common.sh
build_submodules ${CMAKE_BUILD_TYPE} ${CLEAN}
cd ${CMAKE_BUILD_DIR}
if [ "${USE_STAROS}" == "ON" ]; then
if [ -z "$STARLET_INSTALL_DIR" ] ; then
@ -307,8 +302,7 @@ if [ ${BUILD_BE} -eq 1 ] ; then
-DCMAKE_EXPORT_COMPILE_COMMANDS=ON \
-DUSE_STAROS=${USE_STAROS} \
-DWITH_BENCH=${WITH_BENCH} \
-DWITH_CACHELIB=${WITH_CACHELIB} \
-Dstarcache_DIR=${STARROCKS_HOME}/be/src/submodules/starcache/installed \
-DWITH_BLOCK_CACHE=${WITH_BLOCK_CACHE} \
-Dabsl_DIR=${STARLET_INSTALL_DIR}/third_party/lib/cmake/absl \
-DgRPC_DIR=${STARLET_INSTALL_DIR}/third_party/lib/cmake/grpc \
-Dprometheus-cpp_DIR=${STARLET_INSTALL_DIR}/third_party/lib/cmake/prometheus-cpp \
@ -324,8 +318,7 @@ if [ ${BUILD_BE} -eq 1 ] ; then
-DENABLE_QUERY_DEBUG_TRACE=$ENABLE_QUERY_DEBUG_TRACE \
-DUSE_JEMALLOC=$USE_JEMALLOC \
-DWITH_BENCH=${WITH_BENCH} \
-DWITH_CACHELIB=${WITH_CACHELIB} \
-Dstarcache_DIR=${STARROCKS_HOME}/be/src/submodules/starcache/installed \
-DWITH_BLOCK_CACHE=${WITH_BLOCK_CACHE} \
-DCMAKE_EXPORT_COMPILE_COMMANDS=ON ..
fi
time ${BUILD_SYSTEM} -j${PARALLEL}
@ -436,7 +429,7 @@ if [ ${BUILD_BE} -eq 1 ]; then
rm -f ${STARROCKS_OUTPUT}/be/lib/hadoop/common/lib/log4j-1.2.17.jar
rm -f ${STARROCKS_OUTPUT}/be/lib/hadoop/hdfs/lib/log4j-1.2.17.jar
if [ "${WITH_CACHELIB}" == "ON" ]; then
if [ "${WITH_BLOCK_CACHE}" == "ON" ]; then
mkdir -p ${STARROCKS_OUTPUT}/be/lib/cachelib
cp -r -p ${CACHELIB_DIR}/deps/lib64 ${STARROCKS_OUTPUT}/be/lib/cachelib/
fi


@ -83,7 +83,7 @@ TEST_MODULE=".*"
HELP=0
WITH_AWS=OFF
USE_STAROS=OFF
WITH_CACHELIB=OFF
WITH_BLOCK_CACHE=OFF
WITH_GCOV=OFF
while true; do
case "$1" in
@ -132,11 +132,8 @@ if [ ! -d ${CMAKE_BUILD_DIR} ]; then
mkdir -p ${CMAKE_BUILD_DIR}
fi
. ${STARROCKS_HOME}/bin/common.sh
build_submodules ${CMAKE_BUILD_TYPE} ${CLEAN}
cd ${CMAKE_BUILD_DIR}
if [ "${USE_STAROS}" == "ON" ]; then
if [ -z "$STARLET_INSTALL_DIR" ] ; then
# assume starlet_thirdparty is installed to ${STARROCKS_THIRDPARTY}/installed/starlet/
@ -150,8 +147,7 @@ if [ "${USE_STAROS}" == "ON" ]; then
-DUSE_AVX2=$USE_AVX2 -DUSE_AVX512=$USE_AVX512 -DUSE_SSE4_2=$USE_SSE4_2 \
-DCMAKE_EXPORT_COMPILE_COMMANDS=ON \
-DUSE_STAROS=${USE_STAROS} -DWITH_GCOV=${WITH_GCOV} \
-DWITH_CACHELIB=${WITH_CACHELIB} \
-Dstarcache_DIR=${STARROCKS_HOME}/be/src/submodules/starcache/installed \
-DWITH_BLOCK_CACHE=${WITH_BLOCK_CACHE} \
-Dabsl_DIR=${STARLET_INSTALL_DIR}/third_party/lib/cmake/absl \
-DgRPC_DIR=${STARLET_INSTALL_DIR}/third_party/lib/cmake/grpc \
-Dprometheus-cpp_DIR=${STARLET_INSTALL_DIR}/third_party/lib/cmake/prometheus-cpp \
@ -164,8 +160,7 @@ else
-DMAKE_TEST=ON -DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE} \
-DUSE_AVX2=$USE_AVX2 -DUSE_AVX512=$USE_AVX512 -DUSE_SSE4_2=$USE_SSE4_2 \
-DWITH_GCOV=${WITH_GCOV} \
-DWITH_CACHELIB=${WITH_CACHELIB} \
-Dstarcache_DIR=${STARROCKS_HOME}/be/src/submodules/starcache/installed \
-DWITH_BLOCK_CACHE=${WITH_BLOCK_CACHE} \
-DCMAKE_EXPORT_COMPILE_COMMANDS=ON ../
fi
${BUILD_SYSTEM} -j${PARALLEL}
@ -188,6 +183,8 @@ mkdir -p $LOG_DIR
mkdir -p ${UDF_RUNTIME_DIR}
rm -f ${UDF_RUNTIME_DIR}/*
. ${STARROCKS_HOME}/bin/common.sh
# ====================== configure JAVA/JVM ====================
# NOTE: JAVA_HOME must be configed if using hdfs scan, like hive external table
# this is only for starting be
@ -212,7 +209,7 @@ else
fi
export LD_LIBRARY_PATH=$STARROCKS_HOME/lib/hadoop/native:$LD_LIBRARY_PATH
if [ "${WITH_CACHELIB}" == "ON" ]; then
if [ "${WITH_BLOCK_CACHE}" == "ON" ]; then
CACHELIB_DIR=${STARROCKS_THIRDPARTY}/installed/cachelib
export LD_LIBRARY_PATH=$CACHELIB_DIR/lib:$CACHELIB_DIR/lib64:$CACHELIB_DIR/deps/lib:$CACHELIB_DIR/deps/lib64:$LD_LIBRARY_PATH
fi