[BugFix] fix persistent index compatibility issue when migrate between different cpu arch (#59219)

Signed-off-by: luohaha <18810541851@163.com>
This commit is contained in:
Yixin Luo 2025-05-27 13:58:33 +08:00 committed by GitHub
parent 3c61005e17
commit 2f1ec5e920
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 485 additions and 72 deletions

View File

@ -72,7 +72,7 @@ struct DistinctAggregateState<LT, SumLT, FixedLengthLTGuard<LT>> {
void serialize(uint8_t* dst) const {
phmap::InMemoryOutput output(reinterpret_cast<char*>(dst));
set.dump(output);
[[maybe_unused]] auto err = set.dump(output);
DCHECK(output.length() == set.dump_bound());
}
@ -80,10 +80,10 @@ struct DistinctAggregateState<LT, SumLT, FixedLengthLTGuard<LT>> {
phmap::InMemoryInput input(reinterpret_cast<const char*>(src));
auto old_size = set.size();
if (old_size == 0) {
set.load(input);
[[maybe_unused]] auto err = set.load(input);
} else {
MyHashSet set_src;
set_src.load(input);
[[maybe_unused]] auto err = set_src.load(input);
set.merge(set_src);
}
}

View File

@ -60,6 +60,12 @@ constexpr size_t kPackSize = 16;
constexpr size_t kBucketSizeMax = 256;
constexpr size_t kFixedMaxKeySize = 128;
constexpr size_t kBatchBloomFilterReadSize = 4ULL << 20;
// Encodings of a mutable index snapshot.
// Version 1: the fixed-key index loads the snapshot through the hash map's own `load`.
// Version 2: the fixed-key index reads a uint64 entry count followed by raw (key, value) pairs.
constexpr uint32_t kMutableIndexFormatVersion1 = 1;
constexpr uint32_t kMutableIndexFormatVersion2 = 2;
// The introduction of this magic number serves two purposes:
// 1. To detect endianness mismatches in cross-platform scenarios
// 2. To identify the new snapshot encoding format
// It is written as the first uint32 of a snapshot file and compared on load; a mismatch
// makes the loader fall back to the version 1 path after validating the file.
constexpr uint32_t kSnapshotMagicNum = 0xF2345678;
const char* const kIndexFileMagic = "IDX1";
@ -773,7 +779,7 @@ Status ImmutableIndexWriter::finish() {
_meta.compression_type());
_version.to_pb(_meta.mutable_version());
_meta.set_size(_total);
_meta.set_format_version(PERSISTENT_INDEX_VERSION_6);
_meta.set_format_version(PERSISTENT_INDEX_VERSION_7);
for (const auto& [key_size, shard_info] : _shard_info_by_length) {
const auto [shard_offset, shard_num] = shard_info;
auto info = _meta.add_shard_info();
@ -963,9 +969,31 @@ public:
}
Status load_snapshot(phmap::BinaryInputArchive& ar) override {
bool succ = false;
TRY_CATCH_BAD_ALLOC(succ = _map.load(ar));
RETURN_IF(!succ, Status::InternalError("load snapshot failed"));
if (_mutable_index_format_version == kMutableIndexFormatVersion1) {
TRY_CATCH_BAD_ALLOC(RETURN_IF_ERROR(_map.load(ar)));
} else if (_mutable_index_format_version == kMutableIndexFormatVersion2) {
// We introduced the new format specifically to address cross-platform compatibility issues with snapshot files.
// With the previous format, we hit an issue when migrating from x86 to arm64.
// https://github.com/StarRocks/starrocks/issues/57952
uint64_t size = 0;
RETURN_IF(!ar.load(&size), Status::Corruption("FixedMutableIndex load snapshot size failed"));
RETURN_IF(size == 0, Status::OK());
TRY_CATCH_BAD_ALLOC(reserve(size));
for (auto i = 0; i < size; ++i) {
KeyType key;
IndexValue value;
RETURN_IF((!ar.load(reinterpret_cast<char*>(&key), sizeof(KeyType))),
Status::Corruption("FixedMutableIndex load snapshot failed because load key failed"));
RETURN_IF((!ar.load(reinterpret_cast<char*>(&value), sizeof(IndexValue))),
Status::Corruption("FixedMutableIndex load snapshot failed because load value failed"));
uint64_t hash = FixedKeyHash<KeySize>()(key);
if (auto [it, inserted] = _map.emplace_with_hash(hash, key, value); !inserted) {
it->second = value;
}
}
} else {
return Status::Corruption("FixedMutableIndex load snapshot failed because format version is not supported");
}
return Status::OK();
}
@ -1008,7 +1036,33 @@ public:
// will use `sizeof(size_t)` as return value.
size_t dump_bound() override { return _map.empty() ? sizeof(size_t) : _map.dump_bound(); }
bool dump(phmap::BinaryOutputArchive& ar) override { return _map.dump(ar); }
// Forwards to the underlying hash map's completeness_check on `ar` and returns its status.
Status completeness_check(phmap::BinaryInputArchive& ar) override { return _map.completeness_check(ar); }
// Writes this index to `ar`.
// Layout: a uint64 entry count, then for every entry sizeof(KeyType) raw key bytes followed by
// sizeof(IndexValue) raw value bytes. An empty index writes only the count.
// Returns InternalError as soon as any write to the archive fails.
// The "FixedMutableIndex::dump::1" sync point lets unit tests switch to the hash map's own dump.
Status dump(phmap::BinaryOutputArchive& ar) override {
    bool legacy_layout = false;
    TEST_SYNC_POINT_CALLBACK("FixedMutableIndex::dump::1", &legacy_layout);
    if (UNLIKELY(legacy_layout)) {
        // For UT only.
        RETURN_IF_ERROR(_map.dump(ar));
        return Status::OK();
    }

    const auto entry_count = static_cast<uint64_t>(size());
    if (!ar.dump(entry_count)) {
        return Status::InternalError("FixedMutableIndex dump size failed");
    }
    if (entry_count == 0) {
        return Status::OK();
    }

    for (const auto& [key, value] : _map) {
        const bool key_written = ar.dump(reinterpret_cast<const char*>(key.data), sizeof(KeyType));
        if (!key_written) {
            return Status::InternalError("FixedMutableIndex dump key failed");
        }
        const bool value_written = ar.dump(reinterpret_cast<const char*>(&value), sizeof(IndexValue));
        if (!value_written) {
            return Status::InternalError("FixedMutableIndex dump value failed");
        }
    }
    return Status::OK();
}
Status pk_dump(PrimaryKeyDump* dump, PrimaryIndexDumpPB* dump_pb) override {
for (const auto& each : _map) {
@ -1060,8 +1114,11 @@ public:
// Estimated bytes held by the map: capacity times a per-slot cost of
// 1 + (KeySize rounded up to a multiple of 4) + kIndexValueSize.
// NOTE(review): the leading 1 is presumably the hash map's per-slot control byte — confirm.
size_t memory_usage() override { return _map.capacity() * (1 + (KeySize + 3) / 4 * 4 + kIndexValueSize); }
// Records the snapshot format version, stored in _mutable_index_format_version.
void set_mutable_index_format_version(uint32_t ver) override { _mutable_index_format_version = ver; }
private:
phmap::flat_hash_map<KeyType, IndexValue, FixedKeyHash<KeySize>> _map;
uint32_t _mutable_index_format_version = kMutableIndexFormatVersion2;
};
std::tuple<size_t, size_t, size_t> MutableIndex::estimate_nshard_and_npage(const size_t total_kv_pairs_usage,
@ -1348,34 +1405,50 @@ public:
// |total num|| data size || data | ... | data size || data |
size_t dump_bound() override { return sizeof(size_t) * (1 + size()) + _total_kv_pairs_usage; }
bool dump(phmap::BinaryOutputArchive& ar) override {
if (!ar.dump(size())) {
LOG(ERROR) << "Pindex dump snapshot size failed";
return false;
Status dump(phmap::BinaryOutputArchive& ar) override {
if (!ar.dump(static_cast<uint64_t>(size()))) {
return Status::Corruption("SliceMutableIndex dump size failed");
}
if (size() == 0) {
return true;
return Status::OK();
}
for (const auto& composite_key : _set) {
if (!ar.dump(static_cast<size_t>(composite_key.size()))) {
LOG(ERROR) << "Pindex dump compose_key_size failed";
return false;
if (!ar.dump(static_cast<uint64_t>(composite_key.size()))) {
return Status::Corruption("SliceMutableIndex dump composite_key size failed");
}
if (composite_key.size() == 0) {
continue;
}
if (!ar.dump(composite_key.data(), composite_key.size())) {
LOG(ERROR) << "Pindex dump composite_key failed.";
return false;
return Status::Corruption("SliceMutableIndex dump composite_key failed");
}
}
return true;
return Status::OK();
// TODO: construct a large buffer and write instead of one by one.
// TODO: dive in phmap internal detail and implement dump of std::string type inside, use ctrl_&slot_ directly to improve performance
// return _set.dump(ar);
}
// Reads through one snapshot section in `ar` without inserting anything into the index.
// Expected layout: a uint64 entry count, then for each entry a uint64 byte length followed by
// that many bytes of composite key (zero-length entries carry no payload).
// Returns Corruption as soon as a read fails; a failed buffer allocation is handled by
// TRY_CATCH_BAD_ALLOC. Returns OK once every entry has been consumed.
Status completeness_check(phmap::BinaryInputArchive& ar) override {
    uint64_t size = 0;
    RETURN_IF(!ar.load(&size), Status::Corruption("Pindex load snapshot size failed"));
    RETURN_IF(size == 0, Status::OK());
    // The counter must have the same unsigned 64-bit type as `size`: the count comes from a file
    // that may be corrupt, and an `int` counter would overflow for counts above INT_MAX.
    for (uint64_t i = 0; i < size; ++i) {
        uint64_t compose_key_size = 0;
        RETURN_IF(!ar.load(&compose_key_size),
                  Status::Corruption("Pindex load snapshot failed because load compose_key_size failed"));
        if (compose_key_size == 0) {
            continue;
        }
        // The key bytes are read only to advance the archive; the content is discarded.
        std::string composite_key;
        TRY_CATCH_BAD_ALLOC(raw::stl_string_resize_uninitialized(&composite_key, compose_key_size));
        RETURN_IF((!ar.load(composite_key.data(), composite_key.size())),
                  Status::Corruption("Pindex load snapshot failed because load composite_key failed"));
    }
    return Status::OK();
}
Status pk_dump(PrimaryKeyDump* dump, PrimaryIndexDumpPB* dump_pb) override {
for (const auto& composite_key : _set) {
auto value = UNALIGNED_LOAD64(composite_key.data() + composite_key.size() - kIndexValueSize);
@ -1386,7 +1459,7 @@ public:
}
Status load_snapshot(phmap::BinaryInputArchive& ar) override {
size_t size = 0;
uint64_t size = 0;
RETURN_IF(!ar.load(&size), Status::Corruption("Pindex load snapshot size failed"));
RETURN_IF(size == 0, Status::OK());
TRY_CATCH_BAD_ALLOC(reserve(size));
@ -1395,7 +1468,7 @@ public:
return Status::MemoryLimitExceeded("error phmap size");
});
for (auto i = 0; i < size; ++i) {
size_t compose_key_size = 0;
uint64_t compose_key_size = 0;
RETURN_IF(!ar.load(&compose_key_size),
Status::Corruption("Pindex load snapshot failed because load compose_key_size failed"));
if (compose_key_size == 0) {
@ -1508,6 +1581,8 @@ public:
return ret;
}
// Intentionally a no-op: this implementation does not use the format version.
void set_mutable_index_format_version(uint32_t ver) override {}
private:
friend ShardByLengthMutableIndex;
friend PersistentIndex;
@ -1965,7 +2040,40 @@ Status ShardByLengthMutableIndex::append_wal(const Slice* keys, const IndexValue
return Status::OK();
}
// Decides whether `ar` holds a complete snapshot written by an old version of SR.
// Old files exist in two variants, depending on whether SSE was supported:
// https://github.com/StarRocks/starrocks/blob/0d19cb4f9bc58d0cab5237a469b9e4bd30c0eb31/be/src/util/phmap/phmap.h#L447
// The archive is rewound, each shard listed in `idxes` runs its `completeness_check` in order,
// and afterwards the archive must be exactly at end of file.
// A failure from `completeness_check` or leftover bytes means the file is corrupted or was
// generated on a different CPU architecture; the error is returned so that compatibility
// loading is skipped and the snapshot is rebuilt.
Status ShardByLengthMutableIndex::check_snapshot_file(phmap::BinaryInputArchive& ar, const std::set<uint32_t>& idxes) {
    ar.reset();
    for (const auto idx : idxes) {
        RETURN_IF_ERROR(_shards[idx]->completeness_check(ar));
    }

    // Every shard section has been consumed; nothing may remain in the file.
    if (ar.eof()) {
        return Status::OK();
    }
    return Status::Corruption(fmt::format(
            "ShardByLengthMutableIndex snapshot file {} is generated by different arch or corrupt, will rebuild.",
            _path));
}
Status ShardByLengthMutableIndex::load_snapshot(phmap::BinaryInputArchive& ar, const std::set<uint32_t>& idxes) {
uint32_t magic_num = 0;
RETURN_IF(!ar.load(&magic_num), Status::Corruption("ShardByLengthMutableIndex load snapshot magic num failed"));
if (magic_num != kSnapshotMagicNum) {
// There are three possible reasons:
// 1. This file is corrupted.
// 2. This file was generated on a different CPU architecture.
// 3. This file was generated by an old version of SR.
RETURN_IF_ERROR(check_snapshot_file(ar, idxes));
// keep load snapshot using old format.
for (const auto idx : idxes) {
_shards[idx]->set_mutable_index_format_version(kMutableIndexFormatVersion1);
}
ar.reset();
}
for (const auto idx : idxes) {
RETURN_IF_ERROR(_shards[idx]->load_snapshot(ar));
}
@ -1979,17 +2087,25 @@ size_t ShardByLengthMutableIndex::dump_bound() {
[](size_t s, const auto& e) { return e->size() > 0 ? s + e->dump_bound() : s; });
}
bool ShardByLengthMutableIndex::dump(phmap::BinaryOutputArchive& ar_out, std::set<uint32_t>& dumped_shard_idxes) {
Status ShardByLengthMutableIndex::dump(phmap::BinaryOutputArchive& ar_out, std::set<uint32_t>& dumped_shard_idxes) {
bool use_old_format = false;
TEST_SYNC_POINT_CALLBACK("ShardByLengthMutableIndex::dump::1", &use_old_format);
// We introduced the new format specifically to address cross-platform compatibility issues with snapshot files.
// With the previous format, we hit an issue when migrating from x86 to arm64.
// https://github.com/StarRocks/starrocks/issues/57952
if (LIKELY(!use_old_format)) {
if (!ar_out.dump(kSnapshotMagicNum)) {
return Status::InternalError("ShardByLengthMutableIndex dump snapshot magic num failed");
}
}
for (uint32_t i = 0; i < _shards.size(); ++i) {
const auto& shard = _shards[i];
if (shard->size() > 0) {
if (!shard->dump(ar_out)) {
return false;
}
RETURN_IF_ERROR(shard->dump(ar_out));
dumped_shard_idxes.insert(i);
}
}
return true;
return Status::OK();
}
Status ShardByLengthMutableIndex::pk_dump(PrimaryKeyDump* dump, PrimaryIndexDumpPB* dump_pb) {
@ -2031,7 +2147,7 @@ Status ShardByLengthMutableIndex::commit(MutableIndexMetaPB* meta, const EditVer
// create a new empty _l0 file, set _offset to 0
data->set_offset(0);
data->set_size(0);
meta->set_format_version(PERSISTENT_INDEX_VERSION_6);
meta->set_format_version(PERSISTENT_INDEX_VERSION_7);
_offset = 0;
_page_size = 0;
_checksum = 0;
@ -2048,11 +2164,7 @@ Status ShardByLengthMutableIndex::commit(MutableIndexMetaPB* meta, const EditVer
// closed. So the archive object needed to be destroyed before reopen the file and assigned it
// to _index_file. Otherwise some data of file maybe overwrite in future append.
phmap::BinaryOutputArchive ar_out(file_name.data());
if (!dump(ar_out, dumped_shard_idxes)) {
std::string err_msg = strings::Substitute("failed to dump snapshot to file $0", file_name);
LOG(WARNING) << err_msg;
return Status::InternalError(err_msg);
}
RETURN_IF_ERROR(dump(ar_out, dumped_shard_idxes));
if (!ar_out.close()) {
std::string err_msg =
strings::Substitute("failed to dump snapshot to file $0, because of close", file_name);
@ -2083,7 +2195,7 @@ Status ShardByLengthMutableIndex::commit(MutableIndexMetaPB* meta, const EditVer
snapshot->mutable_dumped_shard_idxes()->Add(dumped_shard_idxes.begin(), dumped_shard_idxes.end());
RETURN_IF_ERROR(checksum_of_file(l0_rfile.get(), 0, snapshot_size, &_checksum));
snapshot->set_checksum(_checksum);
meta->set_format_version(PERSISTENT_INDEX_VERSION_6);
meta->set_format_version(PERSISTENT_INDEX_VERSION_7);
_offset = snapshot_size;
_page_size = 0;
_checksum = 0;
@ -2096,7 +2208,7 @@ Status ShardByLengthMutableIndex::commit(MutableIndexMetaPB* meta, const EditVer
data->set_offset(_offset);
data->set_size(_page_size);
wal_pb->set_checksum(_checksum);
meta->set_format_version(PERSISTENT_INDEX_VERSION_6);
meta->set_format_version(PERSISTENT_INDEX_VERSION_7);
_offset += _page_size;
_page_size = 0;
_checksum = 0;
@ -2113,7 +2225,7 @@ Status ShardByLengthMutableIndex::load(const MutableIndexMetaPB& meta) {
auto format_version = meta.format_version();
if (format_version != PERSISTENT_INDEX_VERSION_2 && format_version != PERSISTENT_INDEX_VERSION_3 &&
format_version != PERSISTENT_INDEX_VERSION_4 && format_version != PERSISTENT_INDEX_VERSION_5 &&
format_version != PERSISTENT_INDEX_VERSION_6) {
format_version != PERSISTENT_INDEX_VERSION_6 && format_version != PERSISTENT_INDEX_VERSION_7) {
std::string msg = strings::Substitute("different l0 format, should rebuid index. actual:$0, expect:$1",
format_version, PERSISTENT_INDEX_VERSION_5);
LOG(WARNING) << msg;
@ -3017,10 +3129,10 @@ StatusOr<std::unique_ptr<ImmutableIndex>> ImmutableIndex::load(std::unique_ptr<R
auto format_version = meta.format_version();
if (format_version != PERSISTENT_INDEX_VERSION_2 && format_version != PERSISTENT_INDEX_VERSION_3 &&
format_version != PERSISTENT_INDEX_VERSION_4 && format_version != PERSISTENT_INDEX_VERSION_5 &&
format_version != PERSISTENT_INDEX_VERSION_6) {
format_version != PERSISTENT_INDEX_VERSION_6 && format_version != PERSISTENT_INDEX_VERSION_7) {
std::string msg =
strings::Substitute("different immutable index format, should rebuid index. actual:$0, expect:$1",
format_version, PERSISTENT_INDEX_VERSION_6);
format_version, PERSISTENT_INDEX_VERSION_7);
LOG(WARNING) << msg;
return Status::InternalError(msg);
}
@ -3570,7 +3682,7 @@ Status PersistentIndex::commit(PersistentIndexMetaPB* index_meta, IOStat* stat)
// update PersistentIndexMetaPB
index_meta->set_size(_size);
index_meta->set_usage(_usage);
index_meta->set_format_version(PERSISTENT_INDEX_VERSION_6);
index_meta->set_format_version(PERSISTENT_INDEX_VERSION_7);
_version.to_pb(index_meta->mutable_version());
_version.to_pb(index_meta->mutable_l1_version());
MutableIndexMetaPB* l0_meta = index_meta->mutable_l0_meta();
@ -3580,14 +3692,14 @@ Status PersistentIndex::commit(PersistentIndexMetaPB* index_meta, IOStat* stat)
} else if (_dump_snapshot) {
index_meta->set_size(_size);
index_meta->set_usage(_usage);
index_meta->set_format_version(PERSISTENT_INDEX_VERSION_6);
index_meta->set_format_version(PERSISTENT_INDEX_VERSION_7);
_version.to_pb(index_meta->mutable_version());
MutableIndexMetaPB* l0_meta = index_meta->mutable_l0_meta();
RETURN_IF_ERROR(_l0->commit(l0_meta, _version, kSnapshot));
} else {
index_meta->set_size(_size);
index_meta->set_usage(_usage);
index_meta->set_format_version(PERSISTENT_INDEX_VERSION_6);
index_meta->set_format_version(PERSISTENT_INDEX_VERSION_7);
_version.to_pb(index_meta->mutable_version());
MutableIndexMetaPB* l0_meta = index_meta->mutable_l0_meta();
RETURN_IF_ERROR(_l0->commit(l0_meta, _version, kAppendWAL));
@ -4659,7 +4771,7 @@ Status PersistentIndex::_minor_compaction(PersistentIndexMetaPB* index_meta) {
// 3. modify meta
index_meta->set_size(_size);
index_meta->set_usage(_usage);
index_meta->set_format_version(PERSISTENT_INDEX_VERSION_6);
index_meta->set_format_version(PERSISTENT_INDEX_VERSION_7);
_version.to_pb(index_meta->mutable_version());
_version.to_pb(index_meta->mutable_l1_version());
MutableIndexMetaPB* l0_meta = index_meta->mutable_l0_meta();
@ -5102,7 +5214,7 @@ Status PersistentIndex::reset(Tablet* tablet, EditVersion version, PersistentInd
index_meta->clear_l2_version_merged();
index_meta->set_key_size(_key_size);
index_meta->set_size(0);
index_meta->set_format_version(PERSISTENT_INDEX_VERSION_6);
index_meta->set_format_version(PERSISTENT_INDEX_VERSION_7);
version.to_pb(index_meta->mutable_version());
MutableIndexMetaPB* l0_meta = index_meta->mutable_l0_meta();
l0_meta->clear_wals();
@ -5270,7 +5382,7 @@ Status PersistentIndex::_load_by_loader(TabletLoader* loader) {
index_meta.clear_l2_version_merged();
index_meta.set_key_size(_key_size);
index_meta.set_size(0);
index_meta.set_format_version(PERSISTENT_INDEX_VERSION_6);
index_meta.set_format_version(PERSISTENT_INDEX_VERSION_7);
applied_version.to_pb(index_meta.mutable_version());
MutableIndexMetaPB* l0_meta = index_meta.mutable_l0_meta();
l0_meta->clear_wals();

View File

@ -74,7 +74,8 @@ enum PersistentIndexFileVersion {
PERSISTENT_INDEX_VERSION_3,
PERSISTENT_INDEX_VERSION_4,
PERSISTENT_INDEX_VERSION_5,
PERSISTENT_INDEX_VERSION_6
PERSISTENT_INDEX_VERSION_6,
PERSISTENT_INDEX_VERSION_7
};
static constexpr uint64_t NullIndexValue = -1;
@ -250,7 +251,7 @@ public:
// get dump total size of hashmaps of shards
virtual size_t dump_bound() = 0;
virtual bool dump(phmap::BinaryOutputArchive& ar) = 0;
virtual Status dump(phmap::BinaryOutputArchive& ar) = 0;
// get all key-values pair references by shard, the result will remain valid until next modification
// |nshard|: number of shard
@ -281,6 +282,10 @@ public:
virtual Status pk_dump(PrimaryKeyDump* dump, PrimaryIndexDumpPB* dump_pb) = 0;
virtual void set_mutable_index_format_version(uint32_t ver) = 0;
virtual Status completeness_check(phmap::BinaryInputArchive& ar) = 0;
static StatusOr<std::unique_ptr<MutableIndex>> create(size_t key_size);
static std::tuple<size_t, size_t, size_t> estimate_nshard_and_npage(const size_t total_kv_pairs_usage,
@ -373,7 +378,7 @@ public:
size_t dump_bound();
bool dump(phmap::BinaryOutputArchive& ar, std::set<uint32_t>& dumped_shard_idxes);
Status dump(phmap::BinaryOutputArchive& ar, std::set<uint32_t>& dumped_shard_idxes);
Status commit(MutableIndexMetaPB* meta, const EditVersion& version, const CommitType& type);
@ -408,6 +413,8 @@ public:
Status pk_dump(PrimaryKeyDump* dump, PrimaryIndexDumpPB* dump_pb);
Status check_snapshot_file(phmap::BinaryInputArchive& ar, const std::set<uint32_t>& idxes);
private:
friend class PersistentIndex;
friend class starrocks::lake::LakeLocalPersistentIndex;

View File

@ -82,6 +82,7 @@
#include <utility>
#include <vector>
#include "common/status.h"
#include "phmap_base.h"
#include "phmap_fwd_decl.h"
#include "phmap_hash.h"
@ -1537,10 +1538,13 @@ public:
size_t dump_bound() const;
template <typename OutputArchive>
bool dump(OutputArchive&) const;
starrocks::Status dump(OutputArchive&) const;
template <typename InputArchive>
bool load(InputArchive&);
starrocks::Status load(InputArchive&);
template <typename InputArchive>
starrocks::Status completeness_check(InputArchive& ar);
#endif
void rehash(size_t n) {

View File

@ -82,65 +82,108 @@ size_t raw_hash_set<Policy, Hash, Eq, Alloc>::dump_bound() const {
template <class Policy, class Hash, class Eq, class Alloc>
template <typename OutputArchive>
bool raw_hash_set<Policy, Hash, Eq, Alloc>::dump(OutputArchive& ar) const {
starrocks::Status raw_hash_set<Policy, Hash, Eq, Alloc>::dump(OutputArchive& ar) const {
static_assert(type_traits_internal::IsTriviallyCopyable<value_type>::value,
"value_type should be trivially copyable");
if (!ar.dump(size_)) {
std::cerr << "Failed to dump size_" << std::endl;
return false;
if (!ar.dump(static_cast<uint64_t>(size_))) {
return starrocks::Status::InternalError("Failed to dump size_");
}
if (size_ == 0) {
return true;
return starrocks::Status::OK();
}
if (!ar.dump(capacity_)) {
std::cerr << "Failed to dump capacity_" << std::endl;
return false;
if (!ar.dump(static_cast<uint64_t>(capacity_))) {
return starrocks::Status::InternalError("Failed to dump capacity_");
}
SanitizerUnpoisonMemoryRegion(ctrl_, sizeof(ctrl_t) * (capacity_ + Group::kWidth + 1));
if (!ar.dump(reinterpret_cast<char*>(ctrl_), sizeof(ctrl_t) * (capacity_ + Group::kWidth + 1))) {
std::cerr << "Failed to dump ctrl_" << std::endl;
return false;
return starrocks::Status::InternalError("Failed to dump ctrl_");
}
SanitizerUnpoisonMemoryRegion(slots_, sizeof(slot_type) * capacity_);
if (!ar.dump(reinterpret_cast<char*>(slots_), sizeof(slot_type) * capacity_)) {
std::cerr << "Failed to dump slot_" << std::endl;
return starrocks::Status::InternalError("Failed to dump slots_");
}
return starrocks::Status::OK();
}
// Narrows a 64-bit value into the platform's size_t.
// Returns true and stores the value in `result` when it fits; returns false and leaves
// `result` untouched when the value exceeds the largest representable size_t.
static inline bool safe_convert_from_uint64(uint64_t value, size_t& result) {
    const bool fits = value <= std::numeric_limits<size_t>::max();
    if (fits) {
        result = static_cast<size_t>(value);
    }
    return fits;
}
template <class Policy, class Hash, class Eq, class Alloc>
template <typename InputArchive>
bool raw_hash_set<Policy, Hash, Eq, Alloc>::load(InputArchive& ar) {
starrocks::Status raw_hash_set<Policy, Hash, Eq, Alloc>::load(InputArchive& ar) {
static_assert(type_traits_internal::IsTriviallyCopyable<value_type>::value,
"value_type should be trivially copyable");
raw_hash_set<Policy, Hash, Eq, Alloc>().swap(*this); // clear any existing content
if (!ar.load(&size_)) {
std::cerr << "Failed to load size_" << std::endl;
return false;
uint64_t size_64;
if (!ar.load(&size_64)) {
return starrocks::Status::InternalError("Failed to load size_");
}
if (!safe_convert_from_uint64(size_64, size_)) {
// If size_64 is larger than the maximum value of size_t, we cannot safely convert it.
return starrocks::Status::InternalError("Loaded size too large for current platform's size_t");
}
if (size_ == 0) {
return true;
return starrocks::Status::OK();
}
if (!ar.load(&capacity_)) {
std::cerr << "Failed to load capacity_" << std::endl;
return false;
uint64_t capacity_64;
if (!ar.load(&capacity_64)) {
return starrocks::Status::InternalError("Failed to load capacity_");
}
if (!safe_convert_from_uint64(capacity_64, capacity_)) {
// If capacity_64 is larger than the maximum value of size_t, we cannot safely convert it.
return starrocks::Status::InternalError("Loaded capacity too large for current platform's size_t");
}
// allocate memory for ctrl_ and slots_
initialize_slots();
SanitizerUnpoisonMemoryRegion(ctrl_, sizeof(ctrl_t) * (capacity_ + Group::kWidth + 1));
if (!ar.load(reinterpret_cast<char*>(ctrl_), sizeof(ctrl_t) * (capacity_ + Group::kWidth + 1))) {
std::cerr << "Failed to load ctrl" << std::endl;
return false;
return starrocks::Status::InternalError("Failed to load ctrl_");
}
SanitizerUnpoisonMemoryRegion(slots_, sizeof(slot_type) * capacity_);
if (!ar.load(reinterpret_cast<char*>(slots_), sizeof(slot_type) * capacity_)) {
std::cerr << "Failed to load slot" << std::endl;
return false;
return starrocks::Status::InternalError("Failed to load slots_");
}
return true;
return starrocks::Status::OK();
}
// Walks one serialized hash set in `ar` without materializing it.
// Reads the uint64 element count and, unless it is zero, the uint64 capacity, then seeks past
// the control bytes (capacity + Group::kWidth + 1 entries of ctrl_t) and the slot array
// (capacity entries of slot_type). Any failed read or seek yields InternalError.
// Note that, like load(), this first resets *this to an empty set.
template <class Policy, class Hash, class Eq, class Alloc>
template <typename InputArchive>
starrocks::Status raw_hash_set<Policy, Hash, Eq, Alloc>::completeness_check(InputArchive& ar) {
    static_assert(type_traits_internal::IsTriviallyCopyable<value_type>::value,
                  "value_type should be trivially copyable");
    raw_hash_set<Policy, Hash, Eq, Alloc>().swap(*this); // clear any existing content

    uint64_t element_count = 0;
    if (!ar.load(&element_count)) {
        return starrocks::Status::InternalError("Failed to load size");
    }
    // An empty set is serialized as the count alone.
    if (element_count == 0) {
        return starrocks::Status::OK();
    }

    uint64_t slot_count = 0;
    if (!ar.load(&slot_count)) {
        return starrocks::Status::InternalError("Failed to load capacity");
    }

    const auto ctrl_bytes = sizeof(ctrl_t) * (slot_count + Group::kWidth + 1);
    const auto slot_bytes = sizeof(slot_type) * slot_count;

    // skip ctrl
    if (!ar.skip(ctrl_bytes)) {
        return starrocks::Status::InternalError("Failed to skip ctrl");
    }
    // skip slot
    if (!ar.skip(slot_bytes)) {
        return starrocks::Status::InternalError("Failed to skip slot");
    }
    return starrocks::Status::OK();
}
// ------------------------------------------------------------------------
@ -296,6 +339,22 @@ public:
return !ifs_.fail();
}
// Rewinds the archive so the next read starts at the beginning of the file.
void reset() {
// Clear the stream state flags first (e.g. those left behind by a failed read or an eof() probe).
ifs_.clear();
ifs_.seekg(0, std::ios_base::beg);
}
// Moves the read position forward by `sz` bytes relative to the current position.
// Returns false when the stream reports failure after the seek.
// NOTE(review): whether seeking beyond the end of the file is reported as a failure is not
// established here — confirm before relying on this to detect truncated files.
bool skip(size_t sz) {
    return !ifs_.seekg(sz, std::ios_base::cur).fail();
}
// Reports whether the read position is at the end of the file.
// The probe uses peek(), which looks at the next character without extracting it, so a call
// made before the end of the file does not advance the read position.
// When the end has been reached the stream's eof flag becomes set; call reset() before
// reading from the archive again.
bool eof() {
    (void)ifs_.peek();
    return ifs_.eof();
}
private:
std::ifstream ifs_;
};

View File

@ -263,6 +263,7 @@ set(EXEC_FILES
./storage/page_cache_test.cpp
./storage/persistent_index_test.cpp
./storage/persistent_index_load_executor_test.cpp
./storage/persistent_index_snapshot_load_test.cpp
./storage/primary_index_test.cpp
./storage/primary_key_encoder_test.cpp
./storage/roaring2range_test.cpp

View File

@ -0,0 +1,230 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <gtest/gtest.h>
#include <cstdlib>
#include "fs/fs_memory.h"
#include "fs/fs_util.h"
#include "storage/chunk_helper.h"
#include "storage/persistent_index.h"
#include "storage/persistent_index_compaction_manager.h"
#include "storage/rowset/rowset.h"
#include "storage/rowset/rowset_factory.h"
#include "storage/rowset/rowset_writer.h"
#include "storage/rowset/rowset_writer_context.h"
#include "storage/rowset_update_state.h"
#include "storage/storage_engine.h"
#include "storage/tablet_manager.h"
#include "storage/update_manager.h"
#include "testutil/assert.h"
#include "testutil/parallel_test.h"
#include "util/coding.h"
#include "util/failpoint/fail_point.h"
#include "util/faststring.h"
namespace starrocks {
// Parameter for the snapshot load tests.
struct PersistentIndexSnapshotLoadTestParam {
    // true: keys have a fixed size; false: keys have a variable size.
    bool fixed_key_size{false};
};
// Parameterized fixture for mutable index snapshot dump/load tests. The test parameter selects
// fixed-size keys (uint32_t) or variable-size keys (std::string); each helper asserts that the
// KeyType it is instantiated with matches the parameter.
class PersistentIndexSnapshotLoadTest : public testing::TestWithParam<PersistentIndexSnapshotLoadTestParam> {
public:
    ~PersistentIndexSnapshotLoadTest() override = default;
    void SetUp() override {}

    // Dumps a MutableIndex holding N entries to a file, loads that file into a fresh MutableIndex
    // and checks that every key maps to its original value.
    // |load_only|: when true, the insert and dump steps are skipped and a snapshot file that
    // already exists at the expected path is loaded and checked instead.
    template <typename KeyType>
    void test_index_snapshot_load_impl(bool load_only) {
        using Key = KeyType;
        const int N = 1000;
        vector<Key> keys(N);
        vector<Slice> key_slices;
        vector<IndexValue> values;
        vector<size_t> idxes;
        key_slices.reserve(N);
        values.reserve(N);
        idxes.reserve(N);
        for (int i = 0; i < N; i++) {
            if constexpr (std::is_same_v<Key, uint32_t>) {
                ASSERT_TRUE(GetParam().fixed_key_size)
                        << "KeyType is uint32_t, but GetParam().fixed_key_size is false.";
                keys[i] = i;
                values.emplace_back(i * 2);
                key_slices.emplace_back(reinterpret_cast<const uint8_t*>(&keys[i]), sizeof(Key));
                idxes.push_back(i);
            } else {
                ASSERT_FALSE(GetParam().fixed_key_size)
                        << "KeyType is std::string, but GetParam().fixed_key_size is true.";
                keys[i] = "test_varlen_" + std::to_string(i);
                values.emplace_back(i * 2);
                key_slices.emplace_back(keys[i]);
                idxes.push_back(i);
            }
        }

        // Looks up every key in `idx` and expects all of them to be found with the inserted value.
        auto check_fn = [&](MutableIndex* idx) {
            vector<IndexValue> get_values(keys.size());
            KeysInfo get_not_found;
            size_t get_num_found = 0;
            ASSERT_OK(idx->get(key_slices.data(), get_values.data(), &get_not_found, &get_num_found, idxes));
            ASSERT_EQ(keys.size(), get_num_found);
            ASSERT_EQ(get_not_found.size(), 0);
            for (size_t i = 0; i < values.size(); i++) {
                ASSERT_EQ(values[i], get_values[i]);
            }
        };

        ASSIGN_OR_ABORT(auto idx, MutableIndex::create(GetParam().fixed_key_size ? sizeof(Key) : 0));
        if (!load_only) {
            ASSERT_OK(idx->insert(key_slices.data(), values.data(), idxes));
            // check the index after insert
            check_fn(idx.get());
        }

        // dump to file
        const std::string file_name = GetParam().fixed_key_size ? "./test_fixed_index_snapshot_load"
                                                                : "./test_var_index_snapshot_load";
        if (!load_only) {
            (void)FileSystem::Default()->delete_file(file_name);
            phmap::BinaryOutputArchive ar(file_name.data());
            ASSERT_OK(idx->dump(ar));
            ASSERT_TRUE(ar.close());
        }

        ASSIGN_OR_ABORT(auto new_idx, MutableIndex::create(GetParam().fixed_key_size ? sizeof(Key) : 0));
        phmap::BinaryInputArchive ar_in(file_name.data());
        ASSERT_OK(new_idx->load_snapshot(ar_in));
        // check the index after load
        check_fn(new_idx.get());

        // clean files
        (void)FileSystem::Default()->delete_file(file_name);
    }

    // Commits a PersistentIndex snapshot while the dump sync points force the old snapshot
    // format, then loads it with a new PersistentIndex and checks every key/value pair.
    template <typename KeyType>
    void test_dump_load_snapshot_impl() {
        FileSystem* file_system = FileSystem::Default();
        const std::string kPersistentIndexDir = "./PersistentIndexSnapshotLoadTest_test_dump_load_snapshot";
        const std::string kIndexFile = "./PersistentIndexSnapshotLoadTest_test_dump_load_snapshot/index.l0.0.0";
        bool created = false;
        ASSERT_OK(file_system->create_dir_if_missing(kPersistentIndexDir, &created));

        using Key = KeyType;
        PersistentIndexMetaPB index_meta;
        const int N = 1000;
        vector<Key> keys;
        vector<Slice> key_slices;
        vector<IndexValue> values;
        // `keys` is sized up front so the slices created below keep pointing at stable storage.
        keys.resize(N);
        key_slices.reserve(N);
        values.reserve(N);
        for (int i = 0; i < N; i++) {
            if constexpr (std::is_same_v<Key, uint32_t>) {
                ASSERT_TRUE(GetParam().fixed_key_size)
                        << "KeyType is uint32_t, but GetParam().fixed_key_size is false.";
                keys[i] = i;
                values.emplace_back(i * 2);
                key_slices.emplace_back(reinterpret_cast<const uint8_t*>(&keys[i]), sizeof(Key));
            } else {
                ASSERT_FALSE(GetParam().fixed_key_size)
                        << "KeyType is std::string, but GetParam().fixed_key_size is true.";
                std::string prefix(64, 'A');
                keys[i] = prefix + "test_varlen_" + std::to_string(i);
                values.emplace_back(i * 2);
                key_slices.emplace_back(keys[i]);
            }
        }

        {
            // Create an empty l0 file for the index to load.
            ASSIGN_OR_ABORT(auto wfile, FileSystem::Default()->new_writable_file(kIndexFile));
            ASSERT_OK(wfile->close());
        }

        EditVersion version(0, 0);
        index_meta.set_key_size(GetParam().fixed_key_size ? sizeof(Key) : 0);
        index_meta.set_size(0);
        version.to_pb(index_meta.mutable_version());
        MutableIndexMetaPB* l0_meta = index_meta.mutable_l0_meta();
        l0_meta->set_format_version(PERSISTENT_INDEX_VERSION_5);
        IndexSnapshotMetaPB* snapshot_meta = l0_meta->mutable_snapshot();
        version.to_pb(snapshot_meta->mutable_version());

        {
            std::vector<IndexValue> old_values(N, IndexValue(NullIndexValue));
            PersistentIndex index(kPersistentIndexDir);
            ASSERT_OK(index.load(index_meta));
            // 1. dump snapshot using version 1
            SyncPoint::GetInstance()->SetCallBack("FixedMutableIndex::dump::1", [](void* arg) { *(bool*)arg = true; });
            SyncPoint::GetInstance()->SetCallBack("ShardByLengthMutableIndex::dump::1",
                                                  [](void* arg) { *(bool*)arg = true; });
            SyncPoint::GetInstance()->EnableProcessing();
            // Restore the sync point state when this scope exits, including on assertion failure.
            DeferOp defer([]() {
                SyncPoint::GetInstance()->ClearCallBack("FixedMutableIndex::dump::1");
                SyncPoint::GetInstance()->ClearCallBack("ShardByLengthMutableIndex::dump::1");
                SyncPoint::GetInstance()->DisableProcessing();
            });
            index.test_force_dump();
            ASSERT_OK(index.prepare(EditVersion(1, 0), N));
            ASSERT_OK(index.upsert(N, key_slices.data(), values.data(), old_values.data()));
            ASSERT_OK(index.commit(&index_meta));
            ASSERT_OK(index.on_commited());
        }

        {
            // 2. load snapshot using version 2
            PersistentIndex index2(kPersistentIndexDir);
            ASSERT_OK(index2.load(index_meta));
            // check the index after load
            std::vector<IndexValue> get_values(keys.size());
            ASSERT_OK(index2.get(keys.size(), key_slices.data(), get_values.data()));
            ASSERT_EQ(keys.size(), get_values.size());
            ASSERT_EQ(values.size(), get_values.size());
            for (size_t i = 0; i < values.size(); i++) {
                ASSERT_EQ(values[i], get_values[i]);
            }
        }

        ASSERT_OK(fs::remove_all(kPersistentIndexDir));
    }
};
// Dump/load round trip of a MutableIndex, with the key type chosen by the test parameter.
TEST_P(PersistentIndexSnapshotLoadTest, test_index_snapshot_load) {
    if (!GetParam().fixed_key_size) {
        // variable key size
        test_index_snapshot_load_impl<std::string>(false);
        return;
    }
    // fixed key size
    test_index_snapshot_load_impl<uint32_t>(false);
}
// Old-format snapshot dump followed by a load through PersistentIndex, with the key type
// chosen by the test parameter.
TEST_P(PersistentIndexSnapshotLoadTest, test_dump_load_snapshot) {
    if (!GetParam().fixed_key_size) {
        // variable key size
        test_dump_load_snapshot_impl<std::string>();
        return;
    }
    // fixed key size
    test_dump_load_snapshot_impl<uint32_t>();
}
// Instantiates every TEST_P of the fixture twice: once for fixed-size keys and once for
// variable-size keys.
INSTANTIATE_TEST_SUITE_P(PersistentIndexSnapshotLoadTest, PersistentIndexSnapshotLoadTest,
::testing::Values(PersistentIndexSnapshotLoadTestParam{true}, // fixed key size
PersistentIndexSnapshotLoadTestParam{false} // variable key size
));
} // namespace starrocks