[Enhancement] collect global dict for flatjson (#61680)
Signed-off-by: Murphy <mofei@starrocks.com>
This commit is contained in:
parent
4878d7a233
commit
cc7c240861
|
|
@ -223,6 +223,8 @@ public:
|
|||
|
||||
bool is_not_authorized() const { return code() == TStatusCode::NOT_AUTHORIZED; }
|
||||
|
||||
bool is_global_dict_error() const { return code() == TStatusCode::GLOBAL_DICT_ERROR; }
|
||||
|
||||
// Convert into TStatus. Call this if 'status_container' contains an optional
|
||||
// TStatus field named 'status'. This also sets __isset.status.
|
||||
template <typename T>
|
||||
|
|
|
|||
|
|
@ -29,6 +29,19 @@ MetaScanNode::~MetaScanNode() {
|
|||
}
|
||||
}
|
||||
|
||||
Status MetaScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
|
||||
RETURN_IF_ERROR(ScanNode::init(tnode, state));
|
||||
|
||||
if (_meta_scan_node.__isset.column_access_paths) {
|
||||
for (int i = 0; i < _meta_scan_node.column_access_paths.size(); ++i) {
|
||||
ASSIGN_OR_RETURN(auto path, ColumnAccessPath::create(_meta_scan_node.column_access_paths[i], state, _pool));
|
||||
_column_access_paths.emplace_back(std::move(path));
|
||||
}
|
||||
}
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
void MetaScanNode::_init_counter(RuntimeState* state) {
|
||||
_scan_timer = ADD_TIMER(_runtime_profile, "ScanTime");
|
||||
_meta_scan_profile = _runtime_profile->create_child("META_SCAN", true, false);
|
||||
|
|
|
|||
|
|
@ -28,6 +28,7 @@ public:
|
|||
MetaScanNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs);
|
||||
~MetaScanNode() override;
|
||||
|
||||
Status init(const TPlanNode& tnode, RuntimeState* state) override;
|
||||
Status prepare(RuntimeState* state) override;
|
||||
|
||||
void close(RuntimeState* state) override;
|
||||
|
|
|
|||
|
|
@ -14,6 +14,8 @@
|
|||
|
||||
#include "exec/olap_meta_scanner.h"
|
||||
|
||||
#include <memory>
|
||||
|
||||
#include "exec/olap_meta_scan_node.h"
|
||||
#include "storage/storage_engine.h"
|
||||
#include "storage/tablet.h"
|
||||
|
|
@ -53,8 +55,34 @@ Status OlapMetaScanner::_init_meta_reader_params() {
|
|||
} else {
|
||||
_reader_params.tablet_schema = _tablet->tablet_schema();
|
||||
}
|
||||
if (_parent->_meta_scan_node.__isset.column_access_paths && !_parent->_column_access_paths.empty()) {
|
||||
_reader_params.column_access_paths = &_parent->_column_access_paths;
|
||||
}
|
||||
// add the extended column access paths into tablet_schema
|
||||
{
|
||||
TabletSchemaSPtr tmp_schema = TabletSchema::copy(*_reader_params.tablet_schema);
|
||||
int field_number = tmp_schema->num_columns();
|
||||
for (auto& path : _parent->_column_access_paths) {
|
||||
int root_column_index = tmp_schema->field_index(path->path());
|
||||
RETURN_IF(root_column_index < 0, Status::RuntimeError("unknown access path: " + path->path()));
|
||||
|
||||
TabletColumn column;
|
||||
column.set_name(path->linear_path());
|
||||
column.set_unique_id(++field_number);
|
||||
column.set_type(path->value_type().type);
|
||||
column.set_length(path->value_type().len);
|
||||
column.set_is_nullable(true);
|
||||
column.set_extended_info(std::make_unique<ExtendedColumnInfo>(path.get(), root_column_index));
|
||||
|
||||
tmp_schema->append_column(column);
|
||||
VLOG(2) << "extend the tablet-schema: " << column.debug_string();
|
||||
}
|
||||
_reader_params.tablet_schema = tmp_schema;
|
||||
}
|
||||
_reader_params.desc_tbl = &_parent->_desc_tbl;
|
||||
|
||||
VLOG(2) << "init_meta_reader schema: " << _reader_params.tablet_schema->debug_string();
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -302,12 +302,25 @@ Status SegmentMetaCollecter::_collect_dict(ColumnId cid, Column* column, Logical
|
|||
if (!_column_iterators[cid]) {
|
||||
return Status::InvalidArgument("Invalid Collect Params.");
|
||||
}
|
||||
auto& schema = _params->tablet_schema;
|
||||
RETURN_IF(cid < 0 || cid >= schema->num_columns(), Status::InvalidArgument("Invalid cid: " + std::to_string(cid)));
|
||||
auto& tablet_column = schema->column(cid);
|
||||
if (tablet_column.type() == TYPE_VARCHAR || tablet_column.type() == TYPE_ARRAY) {
|
||||
RETURN_IF_ERROR(_collect_dict_for_column(_column_iterators[cid].get(), cid, column));
|
||||
} else if (tablet_column.type() == TYPE_JSON) {
|
||||
RETURN_IF_ERROR(_collect_dict_for_flatjson(cid, column));
|
||||
} else {
|
||||
return Status::InvalidArgument("unsupported column type: " + type_to_string(tablet_column.type()));
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
Status SegmentMetaCollecter::_collect_dict_for_column(ColumnIterator* column_iter, ColumnId cid, Column* column) {
|
||||
std::vector<Slice> words;
|
||||
if (!_column_iterators[cid]->all_page_dict_encoded()) {
|
||||
if (!column_iter->all_page_dict_encoded()) {
|
||||
return Status::GlobalDictError("no global dict");
|
||||
} else {
|
||||
RETURN_IF_ERROR(_column_iterators[cid]->fetch_all_dict_words(&words));
|
||||
RETURN_IF_ERROR(column_iter->fetch_all_dict_words(&words));
|
||||
}
|
||||
|
||||
if (words.size() > _params->low_cardinality_threshold) {
|
||||
|
|
@ -347,6 +360,37 @@ Status SegmentMetaCollecter::_collect_dict(ColumnId cid, Column* column, Logical
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
Status SegmentMetaCollecter::_collect_dict_for_flatjson(ColumnId cid, Column* column) {
|
||||
auto& tablet_column = _segment->tablet_schema().column(cid);
|
||||
if (tablet_column.type() != TYPE_JSON) {
|
||||
return Status::InvalidArgument("not a flat json column");
|
||||
}
|
||||
auto column_reader = _segment->column(cid);
|
||||
RETURN_IF(column_reader == nullptr, Status::NotFound(fmt::format("column not found: {}", tablet_column.name())));
|
||||
|
||||
auto& sub_readers = *column_reader->sub_readers();
|
||||
for (auto& sub_reader : sub_readers) {
|
||||
if (sub_reader->column_type() == TYPE_VARCHAR) {
|
||||
ASSIGN_OR_RETURN(auto source_iter, sub_reader->new_iterator());
|
||||
ColumnIteratorOptions iter_opts;
|
||||
iter_opts.check_dict_encoding = true;
|
||||
iter_opts.read_file = _read_file.get();
|
||||
iter_opts.stats = &_stats;
|
||||
RETURN_IF_ERROR(source_iter->init(iter_opts));
|
||||
|
||||
Status st = (_collect_dict_for_column(source_iter.get(), cid, column));
|
||||
if (st.is_global_dict_error()) {
|
||||
// ignore
|
||||
} else if (!st.ok()) {
|
||||
return st;
|
||||
}
|
||||
VLOG(2) << "collect_dict_for_flatjson: " << sub_reader->name();
|
||||
}
|
||||
}
|
||||
|
||||
return {};
|
||||
}
|
||||
|
||||
Status SegmentMetaCollecter::_collect_max(ColumnId cid, Column* column, LogicalType type) {
|
||||
return __collect_max_or_min<true>(cid, column, type);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -121,10 +121,15 @@ public:
|
|||
using CollectFunc = std::function<Status(ColumnId, Column*, LogicalType)>;
|
||||
std::unordered_map<std::string, CollectFunc> support_collect_func;
|
||||
|
||||
// Friend class for testing
|
||||
friend class SegmentMetaCollecterTest;
|
||||
|
||||
private:
|
||||
Status _init_return_column_iterators();
|
||||
Status _collect(const std::string& name, ColumnId cid, Column* column, LogicalType type);
|
||||
Status _collect_dict(ColumnId cid, Column* column, LogicalType type);
|
||||
Status _collect_dict_for_flatjson(ColumnId cid, Column* column);
|
||||
Status _collect_dict_for_column(ColumnIterator* column_iter, ColumnId cid, Column* column);
|
||||
Status _collect_max(ColumnId cid, Column* column, LogicalType type);
|
||||
Status _collect_min(ColumnId cid, Column* column, LogicalType type);
|
||||
Status _collect_count(ColumnId cid, Column* column, LogicalType type);
|
||||
|
|
|
|||
|
|
@ -34,6 +34,7 @@ struct OlapMetaReaderParams : MetaReaderParams {
|
|||
|
||||
TabletSharedPtr tablet;
|
||||
TabletSchemaCSPtr tablet_schema;
|
||||
std::vector<ColumnAccessPathPtr>* column_access_paths = nullptr;
|
||||
};
|
||||
|
||||
class OlapMetaReader final : public MetaReader {
|
||||
|
|
|
|||
|
|
@ -43,6 +43,7 @@
|
|||
#include "gen_cpp/segment.pb.h" // for EncodingTypePB
|
||||
#include "gutil/strings/substitute.h"
|
||||
#include "runtime/global_dict/types.h"
|
||||
#include "runtime/global_dict/types_fwd_decl.h"
|
||||
#include "storage/index/inverted/inverted_writer.h"
|
||||
#include "storage/rowset/binary_dict_page.h"
|
||||
#include "storage/rowset/common.h"
|
||||
|
|
@ -86,7 +87,9 @@ struct ColumnWriterOptions {
|
|||
|
||||
// when column data is encoding by dict
|
||||
// if global_dict is not nullptr, will checkout whether global_dict can cover all data
|
||||
GlobalDictMap* global_dict = nullptr;
|
||||
const GlobalDictMap* global_dict = nullptr;
|
||||
// map<sub_column_name, dict> for FlatJSON
|
||||
std::unordered_map<std::string, const GlobalDictMap> flat_json_dicts;
|
||||
|
||||
bool is_compaction = false;
|
||||
bool need_flat = false;
|
||||
|
|
|
|||
|
|
@ -186,10 +186,22 @@ Status FlatJsonColumnCompactor::_flatten_columns(Columns& json_datas) {
|
|||
Status FlatJsonColumnCompactor::finish() {
|
||||
RETURN_IF_ERROR(_compact_columns(_json_datas));
|
||||
_json_datas.clear(); // release after write
|
||||
for (auto& iter : _flat_writers) {
|
||||
RETURN_IF_ERROR(iter->finish());
|
||||
|
||||
RETURN_IF_ERROR(_json_writer->finish());
|
||||
|
||||
// Check global dict validity for flat writers
|
||||
_subcolumn_dict_valid.clear();
|
||||
|
||||
for (size_t i = 0; i < _flat_writers.size(); i++) {
|
||||
RETURN_IF_ERROR(_flat_writers[i]->finish());
|
||||
|
||||
// Record dict validity for each sub-column
|
||||
bool sub_dict_valid = _flat_writers[i]->is_global_dict_valid();
|
||||
std::string sub_column_key = _column_name + "." + _flat_paths[i];
|
||||
_subcolumn_dict_valid[sub_column_key] = sub_dict_valid;
|
||||
}
|
||||
return _json_writer->finish();
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
Status JsonColumnCompactor::append(const Column& column) {
|
||||
|
|
@ -222,7 +234,12 @@ Status JsonColumnCompactor::finish() {
|
|||
_json_meta->mutable_json_meta()->set_format_version(kJsonMetaDefaultFormatVersion);
|
||||
_json_meta->mutable_json_meta()->set_has_remain(false);
|
||||
_json_meta->mutable_json_meta()->set_is_flat(false);
|
||||
return _json_writer->finish();
|
||||
|
||||
// Check global dict validity
|
||||
RETURN_IF_ERROR(_json_writer->finish());
|
||||
_is_global_dict_valid = _json_writer->is_global_dict_valid();
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
} // namespace starrocks
|
||||
|
|
|
|||
|
|
@ -64,11 +64,14 @@ public:
|
|||
ordinal_t get_next_rowid() const override { return _json_writer->get_next_rowid(); }
|
||||
uint64_t total_mem_footprint() const override { return _json_writer->total_mem_footprint(); }
|
||||
|
||||
bool is_global_dict_valid() override { return _is_global_dict_valid; }
|
||||
|
||||
private:
|
||||
void _flat_column(Columns& json_datas);
|
||||
|
||||
private:
|
||||
ColumnMetaPB* _json_meta;
|
||||
std::unique_ptr<ScalarColumnWriter> _json_writer;
|
||||
bool _is_global_dict_valid = true;
|
||||
};
|
||||
} // namespace starrocks
|
||||
|
|
|
|||
|
|
@ -52,7 +52,9 @@ FlatJsonColumnWriter::FlatJsonColumnWriter(const ColumnWriterOptions& opts, Type
|
|||
_json_meta(opts.meta),
|
||||
_wfile(wfile),
|
||||
_json_writer(std::move(json_writer)),
|
||||
_flat_json_config(opts.flat_json_config) {}
|
||||
_flat_json_config(opts.flat_json_config),
|
||||
_global_dict(opts.flat_json_dicts),
|
||||
_column_name(opts.field_name) {}
|
||||
|
||||
Status FlatJsonColumnWriter::init() {
|
||||
_json_meta->mutable_json_meta()->set_format_version(kJsonMetaDefaultFormatVersion);
|
||||
|
|
@ -193,6 +195,19 @@ Status FlatJsonColumnWriter::_init_flat_writers() {
|
|||
opts.need_flat = false;
|
||||
opts.need_zone_map = config::json_flat_create_zonemap && is_zone_map_key_type(_flat_types[i]);
|
||||
|
||||
// Set global dict for sub-columns that support it
|
||||
if (is_string_type(_flat_types[i])) {
|
||||
std::string sub_column_key = _column_name + "." + _flat_paths[i];
|
||||
auto it = _global_dict.find(sub_column_key);
|
||||
if (it != _global_dict.end()) {
|
||||
const GlobalDictMap& sub_map = it->second;
|
||||
opts.global_dict = &sub_map;
|
||||
_subcolumn_dict_valid[sub_column_key] = true;
|
||||
} else {
|
||||
_subcolumn_dict_valid[sub_column_key] = false;
|
||||
}
|
||||
}
|
||||
|
||||
TabletColumn col(StorageAggregateType::STORAGE_AGGREGATE_NONE, _flat_types[i], true);
|
||||
ASSIGN_OR_RETURN(auto fw, ColumnWriter::create(opts, &col, _wfile));
|
||||
_flat_writers.emplace_back(std::move(fw));
|
||||
|
|
@ -213,6 +228,23 @@ Status FlatJsonColumnWriter::_write_flat_column() {
|
|||
return Status::OK();
|
||||
}
|
||||
|
||||
const std::map<std::string, bool>& FlatJsonColumnWriter::get_subcolumn_dict_valid() const {
|
||||
return _subcolumn_dict_valid;
|
||||
}
|
||||
|
||||
bool FlatJsonColumnWriter::is_global_dict_valid() {
|
||||
// If any value in _subcolumn_dict_valid is true, return true
|
||||
if (_subcolumn_dict_valid.empty()) {
|
||||
return false;
|
||||
}
|
||||
for (const auto& kv : _subcolumn_dict_valid) {
|
||||
if (kv.second) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
Status FlatJsonColumnWriter::finish() {
|
||||
auto st = _flat_column(_json_datas);
|
||||
_is_flat = st.ok();
|
||||
|
|
@ -222,11 +254,21 @@ Status FlatJsonColumnWriter::finish() {
|
|||
}
|
||||
}
|
||||
_json_datas.clear(); // release after write
|
||||
// flat datas
|
||||
|
||||
RETURN_IF_ERROR(_json_writer->finish());
|
||||
|
||||
for (size_t i = 0; i < _flat_writers.size(); i++) {
|
||||
RETURN_IF_ERROR(_flat_writers[i]->finish());
|
||||
std::string sub_column_key = _column_name + "." + _flat_paths[i];
|
||||
|
||||
// Record dict validity for each sub-column
|
||||
if (_subcolumn_dict_valid[sub_column_key]) {
|
||||
bool sub_dict_valid = _flat_writers[i]->is_global_dict_valid();
|
||||
_subcolumn_dict_valid[sub_column_key] = sub_dict_valid;
|
||||
}
|
||||
}
|
||||
return _json_writer->finish();
|
||||
|
||||
return Status::OK();
|
||||
}
|
||||
|
||||
ordinal_t FlatJsonColumnWriter::get_next_rowid() const {
|
||||
|
|
|
|||
|
|
@ -14,6 +14,8 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include <map>
|
||||
|
||||
#include "storage/rowset/column_writer.h"
|
||||
|
||||
namespace starrocks {
|
||||
|
|
@ -47,6 +49,11 @@ public:
|
|||
Status write_bloom_filter_index() override;
|
||||
ordinal_t get_next_rowid() const override;
|
||||
|
||||
bool is_global_dict_valid() override;
|
||||
|
||||
// Get global dict validity status for each sub-column
|
||||
const std::map<std::string, bool>& get_subcolumn_dict_valid() const;
|
||||
|
||||
uint64_t total_mem_footprint() const override;
|
||||
|
||||
protected:
|
||||
|
|
@ -73,5 +80,13 @@ protected:
|
|||
std::shared_ptr<BloomFilter> _remain_filter;
|
||||
bool _is_flat = false;
|
||||
const FlatJsonConfig* _flat_json_config = nullptr;
|
||||
|
||||
// Store original options for sub-column global dict setup
|
||||
// FIXME: avoid copy the map
|
||||
const std::unordered_map<std::string, const GlobalDictMap> _global_dict;
|
||||
std::string _column_name;
|
||||
|
||||
// Track global dict validity for each sub-column
|
||||
std::map<std::string, bool> _subcolumn_dict_valid;
|
||||
};
|
||||
} // namespace starrocks
|
||||
|
|
|
|||
|
|
@ -48,6 +48,7 @@
|
|||
#include "storage/index/index_descriptor.h"
|
||||
#include "storage/row_store_encoder.h"
|
||||
#include "storage/rowset/column_writer.h" // ColumnWriter
|
||||
#include "storage/rowset/json_column_writer.h"
|
||||
#include "storage/rowset/page_io.h"
|
||||
#include "storage/seek_tuple.h"
|
||||
#include "storage/short_key_index.h"
|
||||
|
|
@ -206,6 +207,20 @@ Status SegmentWriter::init(const std::vector<uint32_t>& column_indexes, bool has
|
|||
_global_dict_columns_valid_info[iter->first] = true;
|
||||
}
|
||||
}
|
||||
if (column.type() == LogicalType::TYPE_JSON && _opts.global_dicts != nullptr) {
|
||||
opts.field_name = column.name();
|
||||
std::string_view col_name = column.name();
|
||||
for (auto& [k, dict_v] : *_opts.global_dicts) {
|
||||
// k can be a.b.c, we must check the first token matches column.name()
|
||||
size_t dot_pos = k.find('.');
|
||||
std::string first_token = (dot_pos == std::string::npos) ? k : k.substr(0, dot_pos);
|
||||
if (first_token == col_name) {
|
||||
opts.flat_json_dicts.emplace(k, dict_v.dict);
|
||||
_global_dict_columns_valid_info[k] = true;
|
||||
VLOG(2) << "set global dict for json column: " << k;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
opts.need_flat = config::enable_json_flat;
|
||||
opts.is_compaction = _opts.is_compaction;
|
||||
|
|
@ -317,11 +332,7 @@ Status SegmentWriter::finalize_columns(uint64_t* index_size) {
|
|||
*index_size += _wfile->size() - index_offset + standalone_index_size;
|
||||
|
||||
// check global dict valid
|
||||
const auto& column = _tablet_schema->column(column_index);
|
||||
if (!column_writer->is_global_dict_valid() && is_string_type(column.type())) {
|
||||
std::string col_name(column.name());
|
||||
_global_dict_columns_valid_info[col_name] = false;
|
||||
}
|
||||
_check_column_global_dict_valid(column_writer.get(), column_index);
|
||||
|
||||
// reset to release memory
|
||||
column_writer.reset();
|
||||
|
|
@ -449,4 +460,26 @@ StatusOr<std::unique_ptr<io::NumericStatistics>> SegmentWriter::get_numeric_stat
|
|||
return _wfile->get_numeric_statistics();
|
||||
}
|
||||
|
||||
void SegmentWriter::_check_column_global_dict_valid(ColumnWriter* column_writer, uint32_t column_index) {
|
||||
const auto& column = _tablet_schema->column(column_index);
|
||||
|
||||
// Check global dict valid for string types
|
||||
if (!column_writer->is_global_dict_valid() && is_string_type(column.type())) {
|
||||
std::string col_name(column.name());
|
||||
_global_dict_columns_valid_info[col_name] = false;
|
||||
}
|
||||
|
||||
// Check global dict valid for JSON type and collect sub-column dict info
|
||||
if (column.type() == LogicalType::TYPE_JSON) {
|
||||
auto* flat_json_writer = dynamic_cast<FlatJsonColumnWriter*>(column_writer);
|
||||
if (flat_json_writer != nullptr) {
|
||||
// Collect dict validity for each sub-column
|
||||
const auto& subcolumn_dict_valid = flat_json_writer->get_subcolumn_dict_valid();
|
||||
for (const auto& kv : subcolumn_dict_valid) {
|
||||
_global_dict_columns_valid_info[kv.first] = kv.second;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace starrocks
|
||||
|
|
|
|||
|
|
@ -46,6 +46,7 @@
|
|||
#include "gutil/macros.h"
|
||||
#include "io/input_stream.h"
|
||||
#include "runtime/global_dict/types.h"
|
||||
#include "runtime/global_dict/types_fwd_decl.h"
|
||||
#include "storage/row_store_encoder_factory.h"
|
||||
#include "storage/tablet_schema.h"
|
||||
|
||||
|
|
@ -162,6 +163,9 @@ private:
|
|||
void _init_column_meta(ColumnMetaPB* meta, uint32_t column_id, const TabletColumn& column);
|
||||
void _verify_footer();
|
||||
|
||||
// Check global dictionary validity for a single column writer
|
||||
void _check_column_global_dict_valid(ColumnWriter* column_writer, uint32_t column_index);
|
||||
|
||||
uint32_t _segment_id;
|
||||
TabletSchemaCSPtr _tablet_schema;
|
||||
SegmentWriterOptions _opts;
|
||||
|
|
|
|||
|
|
@ -18,13 +18,18 @@
|
|||
|
||||
#include <memory>
|
||||
|
||||
#include "column/array_column.h"
|
||||
#include "column/fixed_length_column.h"
|
||||
#include "column/json_column.h"
|
||||
#include "column/nullable_column.h"
|
||||
#include "fs/fs_util.h"
|
||||
#include "fs/key_cache.h"
|
||||
#include "storage/rowset/column_iterator.h"
|
||||
#include "storage/rowset/segment_writer.h"
|
||||
#include "testutil/assert.h"
|
||||
|
||||
namespace starrocks {
|
||||
using fs::delete_file;
|
||||
|
||||
class SegmentMetaCollecterTest : public ::testing::Test {
|
||||
public:
|
||||
|
|
@ -52,13 +57,20 @@ public:
|
|||
ASSIGN_OR_ABORT(_segment, Segment::open(fs, file_info, 0, _tablet_schema));
|
||||
}
|
||||
|
||||
~SegmentMetaCollecterTest() { fs::delete_file(_segment_name); }
|
||||
~SegmentMetaCollecterTest() { delete_file(_segment_name); }
|
||||
|
||||
protected:
|
||||
std::string _segment_name;
|
||||
TabletSchemaCSPtr _tablet_schema;
|
||||
SegmentSharedPtr _segment;
|
||||
std::string _segment_encryption_meta;
|
||||
|
||||
// Helper method to create array column
|
||||
ColumnPtr create_array_column() {
|
||||
auto elements = NullableColumn::create(BinaryColumn::create(), NullColumn::create());
|
||||
auto offsets = UInt32Column::create();
|
||||
return ArrayColumn::create(std::move(elements), std::move(offsets));
|
||||
}
|
||||
};
|
||||
|
||||
TEST_F(SegmentMetaCollecterTest, test_init) {
|
||||
|
|
@ -110,4 +122,76 @@ TEST_F(SegmentMetaCollecterTest, test_open_and_collect) {
|
|||
EXPECT_EQ(0, col->get(0).get_int64());
|
||||
}
|
||||
|
||||
TEST_F(SegmentMetaCollecterTest, test_collect_dict_json_column_success) {
|
||||
// Create a proper JSON segment with actual data
|
||||
TabletSchemaPB json_schema_pb;
|
||||
auto json_col = json_schema_pb.add_column();
|
||||
json_col->set_name("json_col");
|
||||
json_col->set_type("JSON");
|
||||
json_col->set_is_key(false);
|
||||
json_col->set_is_nullable(true);
|
||||
auto json_tablet_schema = TabletSchema::create(json_schema_pb);
|
||||
|
||||
// Create JSON segment with dictionary-encoded string data
|
||||
std::string json_segment_name = "json_meta_collector_test.dat";
|
||||
DeferOp defer_op([&] { delete_file(json_segment_name); });
|
||||
ASSIGN_OR_ABORT(auto fs, FileSystem::CreateSharedFromString(json_segment_name));
|
||||
auto encryption_pair = KeyCache::instance().create_plain_random_encryption_meta_pair().value();
|
||||
WritableFileOptions options{.mode = FileSystem::CREATE_OR_OPEN_WITH_TRUNCATE,
|
||||
.encryption_info = encryption_pair.info};
|
||||
ASSIGN_OR_ABORT(auto wf, fs->new_writable_file(options, json_segment_name));
|
||||
|
||||
SegmentWriterOptions seg_opts;
|
||||
seg_opts.flat_json_config = std::make_shared<FlatJsonConfig>();
|
||||
seg_opts.flat_json_config->set_flat_json_enabled(true);
|
||||
SegmentWriter json_writer(std::move(wf), 0, json_tablet_schema, seg_opts);
|
||||
EXPECT_OK(json_writer.init());
|
||||
|
||||
// Create JSON data with repeated string values for dictionary encoding
|
||||
auto json_column = JsonColumn::create();
|
||||
std::vector<std::string> json_strings = {R"({"name": "Alice", "city": "Beijing"})",
|
||||
R"({"name": "Bob", "city": "Beijing"})",
|
||||
R"({"name": "Alice", "city": "Shanghai"})"};
|
||||
|
||||
for (const auto& json_str : json_strings) {
|
||||
JsonValue json_value;
|
||||
ASSERT_OK(JsonValue::parse(json_str, &json_value));
|
||||
json_column->append(&json_value);
|
||||
}
|
||||
|
||||
SchemaPtr chunk_schema = std::make_shared<Schema>(json_tablet_schema->schema());
|
||||
auto chunk = std::make_shared<Chunk>(Columns{json_column}, chunk_schema);
|
||||
for (int i = 0; i < 100; i++) {
|
||||
ASSERT_OK(json_writer.append_chunk(*chunk));
|
||||
}
|
||||
|
||||
uint64_t file_size, index_size, footer_pos;
|
||||
EXPECT_OK(json_writer.finalize(&file_size, &index_size, &footer_pos));
|
||||
|
||||
// Open the JSON segment
|
||||
FileInfo json_file_info{.path = json_segment_name, .encryption_meta = encryption_pair.encryption_meta};
|
||||
ASSIGN_OR_ABORT(auto json_segment, Segment::open(fs, json_file_info, 0, json_tablet_schema));
|
||||
|
||||
// Test dictionary collection from JSON column
|
||||
SegmentMetaCollecter json_collecter(json_segment);
|
||||
SegmentMetaCollecterParams params;
|
||||
params.fields.emplace_back("dict_merge");
|
||||
params.field_type.emplace_back(LogicalType::TYPE_ARRAY);
|
||||
params.cids.emplace_back(0);
|
||||
params.read_page.emplace_back(true); // Need to read page for JSON
|
||||
params.tablet_schema = json_tablet_schema;
|
||||
params.low_cardinality_threshold = 1000;
|
||||
EXPECT_OK(json_collecter.init(¶ms));
|
||||
EXPECT_OK(json_collecter.open());
|
||||
|
||||
auto array_col = create_array_column();
|
||||
|
||||
// Test successful dictionary collection for JSON column
|
||||
Status status = json_collecter._collect_dict(0, array_col.get(), LogicalType::TYPE_ARRAY);
|
||||
|
||||
ASSERT_OK(status);
|
||||
EXPECT_GT(array_col->size(), 0);
|
||||
EXPECT_EQ("[['Beijing','Shanghai'], ['Alice','Bob']]", array_col->debug_string());
|
||||
}
|
||||
|
||||
} // namespace starrocks
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@
|
|||
#include "column/vectorized_fwd.h"
|
||||
#include "common/config.h"
|
||||
#include "common/statusor.h"
|
||||
#include "fs/fs.h"
|
||||
#include "fs/fs_memory.h"
|
||||
#include "gen_cpp/PlanNodes_types.h"
|
||||
#include "gutil/casts.h"
|
||||
|
|
@ -63,7 +64,7 @@ public:
|
|||
protected:
|
||||
void SetUp() override {
|
||||
config::enable_json_flat_complex_type = true;
|
||||
_meta.reset(new ColumnMetaPB());
|
||||
_meta = std::make_shared<ColumnMetaPB>();
|
||||
}
|
||||
|
||||
void TearDown() override {
|
||||
|
|
@ -158,7 +159,7 @@ protected:
|
|||
return json_col;
|
||||
}
|
||||
|
||||
private:
|
||||
protected:
|
||||
std::shared_ptr<TabletSchema> _dummy_segment_schema;
|
||||
std::shared_ptr<ColumnMetaPB> _meta;
|
||||
};
|
||||
|
|
@ -2499,4 +2500,249 @@ TEST_F(FlatJsonColumnRWTest, testSegmentWriterIteratorWithMixedDataTypes) {
|
|||
}
|
||||
}
|
||||
|
||||
TEST_F(FlatJsonColumnRWTest, test_json_global_dict) {
|
||||
// Test JSON global dictionary functionality
|
||||
auto fs = std::make_shared<MemoryFileSystem>();
|
||||
const std::string file_name = "/tmp/test_json_global_dict.dat";
|
||||
ASSERT_TRUE(fs->create_dir("/tmp/").ok());
|
||||
|
||||
// Prepare test data with repeated JSON values to trigger dictionary encoding
|
||||
std::vector<std::string> json_strings = {
|
||||
R"({"name": "Alice", "age": 30, "city": "Beijing"})",
|
||||
R"({"name": "Bob", "age": 25, "city": "Shanghai"})",
|
||||
R"({"name": "Alice", "age": 35, "city": "Beijing"})", // repeated name and city
|
||||
R"({"name": "Charlie", "age": 28, "city": "Guangzhou"})",
|
||||
R"({"name": "Bob", "age": 30, "city": "Shanghai"})", // repeated name and city
|
||||
R"({"name": "Alice", "age": 40, "city": "Beijing"})", // repeated name and city
|
||||
};
|
||||
TabletSchemaPB tablet_schema_pb;
|
||||
auto* column_pb = tablet_schema_pb.add_column();
|
||||
column_pb->set_name("test_json");
|
||||
column_pb->set_type("JSON");
|
||||
column_pb->set_is_key(false);
|
||||
column_pb->set_is_nullable(true);
|
||||
column_pb->set_unique_id(0);
|
||||
auto tablet_schema = TabletSchema::create(tablet_schema_pb);
|
||||
SchemaPtr chunk_schema = std::make_shared<Schema>(tablet_schema->schema());
|
||||
TabletColumn json_tablet_column = create_with_default_value<TYPE_JSON>("");
|
||||
|
||||
auto write_data = [tablet_schema](std::unique_ptr<WritableFile> wfile, const std::vector<std::string>& json_strings,
|
||||
const SegmentWriterOptions& opts, std::unique_ptr<SegmentWriter>* out_writer) {
|
||||
auto writer = std::make_unique<SegmentWriter>(std::move(wfile), 0, tablet_schema, opts);
|
||||
ASSERT_OK(writer->init());
|
||||
|
||||
auto json_column = ColumnHelper::create_column(TypeDescriptor::create_json_type(), true);
|
||||
SchemaPtr chunk_schema = std::make_shared<Schema>(tablet_schema->schema());
|
||||
auto chunk = std::make_shared<Chunk>(Columns{json_column}, chunk_schema);
|
||||
|
||||
// Add JSON data to chunk
|
||||
for (const auto& json_str : json_strings) {
|
||||
JsonValue json_value;
|
||||
ASSERT_OK(JsonValue::parse(json_str, &json_value));
|
||||
json_column->append_datum(Datum(&json_value));
|
||||
}
|
||||
|
||||
ASSERT_OK(writer->append_chunk(*chunk));
|
||||
|
||||
uint64_t index_size = 0;
|
||||
uint64_t footer_position = 0;
|
||||
ASSERT_OK(writer->finalize_columns(&index_size));
|
||||
ASSERT_OK(writer->finalize_footer(&footer_position));
|
||||
*out_writer = std::move(writer);
|
||||
};
|
||||
|
||||
// Write data with global dictionary
|
||||
{
|
||||
// Create global dictionary for sub-columns
|
||||
GlobalDictByNameMaps global_dicts;
|
||||
GlobalDictMap name_dict = {{"Alice", 0}, {"Bob", 1}, {"Charlie", 2}};
|
||||
GlobalDictMap city_dict = {{"Beijing", 0}, {"Shanghai", 1}, {"Guangzhou", 2}};
|
||||
|
||||
// Store dictionaries with column names
|
||||
global_dicts["test_json.name"] = GlobalDictsWithVersion<GlobalDictMap>{name_dict, 1};
|
||||
global_dicts["test_json.city"] = GlobalDictsWithVersion<GlobalDictMap>{city_dict, 1};
|
||||
|
||||
SegmentWriterOptions opts;
|
||||
opts.global_dicts = &global_dicts;
|
||||
opts.flat_json_config = std::make_shared<FlatJsonConfig>();
|
||||
opts.flat_json_config->set_flat_json_enabled(true);
|
||||
ASSIGN_OR_ABORT(auto wfile, fs->new_writable_file(file_name));
|
||||
|
||||
auto json_column = ColumnHelper::create_column(TypeDescriptor::create_json_type(), true);
|
||||
auto chunk = std::make_shared<Chunk>(Columns{json_column}, chunk_schema);
|
||||
|
||||
// Add JSON data to chunk
|
||||
for (const auto& json_str : json_strings) {
|
||||
JsonValue json_value;
|
||||
ASSERT_OK(JsonValue::parse(json_str, &json_value));
|
||||
json_column->append_datum(Datum(&json_value));
|
||||
}
|
||||
|
||||
std::unique_ptr<SegmentWriter> writer;
|
||||
write_data(std::move(wfile), json_strings, opts, &writer);
|
||||
|
||||
// Check global dictionary validity
|
||||
const auto& dict_valid_info = writer->global_dict_columns_valid_info();
|
||||
ASSERT_TRUE(dict_valid_info.count("test_json.name") > 0);
|
||||
ASSERT_TRUE(dict_valid_info.at("test_json.name")); // Should be valid
|
||||
ASSERT_TRUE(dict_valid_info.count("test_json.city") > 0);
|
||||
ASSERT_TRUE(dict_valid_info.at("test_json.city")); // Should be valid
|
||||
}
|
||||
|
||||
// Another batch of JSON strings with different values
|
||||
std::vector<std::string> json_strings2 = {
|
||||
R"({"name": "David", "age": 22, "city": "Shenzhen"})", R"({"name": "Eve", "age": 29, "city": "Hangzhou"})",
|
||||
R"({"name": "Frank", "age": 33, "city": "Chengdu"})", R"({"name": "Grace", "age": 27, "city": "Wuhan"})",
|
||||
R"({"name": "Heidi", "age": 31, "city": "Nanjing"})", R"({"name": "Ivan", "age": 26, "city": "Suzhou"})"};
|
||||
|
||||
// Test with invalid global dictionary (missing values)
|
||||
{
|
||||
ASSIGN_OR_ABORT(auto wfile2, fs->new_writable_file(file_name + "_invalid"));
|
||||
|
||||
// Create incomplete dictionary that doesn't contain all values
|
||||
GlobalDictByNameMaps invalid_global_dicts;
|
||||
|
||||
GlobalDictMap incomplete_name_dict;
|
||||
incomplete_name_dict["Alice"] = 0;
|
||||
incomplete_name_dict["Bob"] = 1;
|
||||
|
||||
invalid_global_dicts["test_json.name"] = GlobalDictsWithVersion<GlobalDictMap>{incomplete_name_dict, 1};
|
||||
|
||||
SegmentWriterOptions seg_opts;
|
||||
seg_opts.global_dicts = &invalid_global_dicts;
|
||||
seg_opts.flat_json_config = std::make_shared<FlatJsonConfig>();
|
||||
seg_opts.flat_json_config->set_flat_json_enabled(true);
|
||||
|
||||
auto json_column = ColumnHelper::create_column(TypeDescriptor::create_json_type(), true);
|
||||
auto chunk = std::make_shared<Chunk>(Columns{json_column}, chunk_schema);
|
||||
|
||||
// Add JSON data to chunk
|
||||
for (const auto& json_str : json_strings2) {
|
||||
JsonValue json_value;
|
||||
ASSERT_OK(JsonValue::parse(json_str, &json_value));
|
||||
json_column->append_datum(Datum(&json_value));
|
||||
}
|
||||
|
||||
std::unique_ptr<SegmentWriter> writer;
|
||||
write_data(std::move(wfile2), json_strings2, seg_opts, &writer);
|
||||
|
||||
const auto& dict_valid_info = writer->global_dict_columns_valid_info();
|
||||
ASSERT_TRUE(dict_valid_info.count("test_json.city") > 0);
|
||||
ASSERT_FALSE(dict_valid_info.at("test_json.city")); // Should be invalid
|
||||
}
|
||||
// Clean up test files
|
||||
ASSERT_OK(fs->delete_file(file_name));
|
||||
ASSERT_OK(fs->delete_file(file_name + "_invalid"));
|
||||
|
||||
// Test case 1: Generate JSON data with same fields but different data types
|
||||
std::vector<std::string> json_strings_different_types = {
|
||||
R"({"name": "Alice", "age": 30, "city": "Beijing"})",
|
||||
R"({"name": "Bob", "age": "25", "city": "Shanghai"})", // age is string instead of number
|
||||
R"({"name": "Charlie", "age": 28, "city": "Guangzhou"})",
|
||||
R"({"name": "David", "age": 35.5, "city": "Shenzhen"})", // age is float instead of integer
|
||||
R"({"name": "Eve", "age": 29, "city": 12345})", // city is number instead of string
|
||||
R"({"name": "Frank", "age": 33, "city": "Chengdu"})"};
|
||||
|
||||
// Test with global dictionary that expects consistent data types
|
||||
{
|
||||
ASSIGN_OR_ABORT(auto wfile3, fs->new_writable_file(file_name + "_different_types"));
|
||||
|
||||
// Create global dictionary expecting consistent data types
|
||||
GlobalDictByNameMaps type_consistent_global_dicts;
|
||||
GlobalDictMap name_dict = {{"Alice", 0}, {"Bob", 1}, {"Charlie", 2}, {"David", 3}, {"Eve", 4}, {"Frank", 5}};
|
||||
GlobalDictMap age_dict = {{"30", 0}, {"25", 1}, {"28", 2}, {"35", 3}, {"29", 4}, {"33", 5}}; // All as strings
|
||||
GlobalDictMap city_dict = {{"Beijing", 0}, {"Shanghai", 1}, {"Guangzhou", 2}, {"Shenzhen", 3}, {"Chengdu", 4}};
|
||||
|
||||
type_consistent_global_dicts["test_json.name"] = GlobalDictsWithVersion<GlobalDictMap>{name_dict, 1};
|
||||
type_consistent_global_dicts["test_json.age"] = GlobalDictsWithVersion<GlobalDictMap>{age_dict, 1};
|
||||
type_consistent_global_dicts["test_json.city"] = GlobalDictsWithVersion<GlobalDictMap>{city_dict, 1};
|
||||
|
||||
SegmentWriterOptions seg_opts3;
|
||||
seg_opts3.global_dicts = &type_consistent_global_dicts;
|
||||
seg_opts3.flat_json_config = std::make_shared<FlatJsonConfig>();
|
||||
seg_opts3.flat_json_config->set_flat_json_enabled(true);
|
||||
|
||||
auto json_column = ColumnHelper::create_column(TypeDescriptor::create_json_type(), true);
|
||||
auto chunk = std::make_shared<Chunk>(Columns{json_column}, chunk_schema);
|
||||
|
||||
// Add JSON data with different data types to chunk
|
||||
for (const auto& json_str : json_strings_different_types) {
|
||||
JsonValue json_value;
|
||||
ASSERT_OK(JsonValue::parse(json_str, &json_value));
|
||||
json_column->append_datum(Datum(&json_value));
|
||||
}
|
||||
|
||||
std::unique_ptr<SegmentWriter> writer;
|
||||
write_data(std::move(wfile3), json_strings_different_types, seg_opts3, &writer);
|
||||
|
||||
// Check that dictionaries are invalidated due to data type inconsistencies
|
||||
const auto& dict_valid_info = writer->global_dict_columns_valid_info();
|
||||
ASSERT_TRUE(dict_valid_info.count("test_json.name") > 0);
|
||||
ASSERT_TRUE(dict_valid_info.at("test_json.name")); // Name should still be valid (all strings)
|
||||
ASSERT_TRUE(dict_valid_info.count("test_json.age") > 0);
|
||||
ASSERT_FALSE(dict_valid_info.at("test_json.age")); // Age should be invalid (mixed types)
|
||||
ASSERT_TRUE(dict_valid_info.count("test_json.city") > 0);
|
||||
ASSERT_FALSE(dict_valid_info.at("test_json.city")); // City should be invalid (mixed types)
|
||||
}
|
||||
|
||||
// Test case 2: Verify dict invalidation with more complex type mismatches
|
||||
std::vector<std::string> json_strings_complex_types = {
|
||||
R"({"user": {"id": 1001, "name": "Alice", "active": true, "score": 95.5}})",
|
||||
R"({"user": {"id": [1,2,3], "name": "Bob", "active": "yes", "score": 88}})", // id is array, active is string, score is int
|
||||
R"({"user": {"id": 1003, "name": "Charlie", "active": false, "score": 92.0}})",
|
||||
R"({"user": {"id": 1004, "name": "David", "active": 1, "score": "85.5"}})", // active is number, score is string
|
||||
R"({"user": {"id": 1005, "name": {"nick": "murphy"}, "active": true, "score": 90}})" // score is int, name is object
|
||||
};
|
||||
|
||||
{
|
||||
ASSIGN_OR_ABORT(auto wfile4, fs->new_writable_file(file_name + "_complex_types"));
|
||||
|
||||
// Create global dictionary for nested fields
|
||||
GlobalDictByNameMaps complex_global_dicts;
|
||||
GlobalDictMap user_id_dict = {{"1001", 0}, {"1002", 1}, {"1003", 2}, {"1004", 3}, {"1005", 4}};
|
||||
GlobalDictMap user_name_dict = {{"Alice", 0}, {"Bob", 1}, {"Charlie", 2}, {"David", 3}, {"Eve", 4}};
|
||||
GlobalDictMap user_active_dict = {{"true", 0}, {"false", 1}};
|
||||
GlobalDictMap user_score_dict = {{"95.5", 0}, {"88", 1}, {"92.0", 2}, {"85.5", 3}, {"90", 4}};
|
||||
|
||||
complex_global_dicts["test_json.user.id"] = GlobalDictsWithVersion<GlobalDictMap>{user_id_dict, 1};
|
||||
complex_global_dicts["test_json.user.name"] = GlobalDictsWithVersion<GlobalDictMap>{user_name_dict, 1};
|
||||
complex_global_dicts["test_json.user.active"] = GlobalDictsWithVersion<GlobalDictMap>{user_active_dict, 1};
|
||||
complex_global_dicts["test_json.user.score"] = GlobalDictsWithVersion<GlobalDictMap>{user_score_dict, 1};
|
||||
|
||||
SegmentWriterOptions seg_opts4;
|
||||
seg_opts4.global_dicts = &complex_global_dicts;
|
||||
seg_opts4.flat_json_config = std::make_shared<FlatJsonConfig>();
|
||||
seg_opts4.flat_json_config->set_flat_json_enabled(true);
|
||||
|
||||
auto json_column = ColumnHelper::create_column(TypeDescriptor::create_json_type(), true);
|
||||
auto chunk = std::make_shared<Chunk>(Columns{json_column}, chunk_schema);
|
||||
|
||||
// Add JSON data with complex type mismatches to chunk
|
||||
for (const auto& json_str : json_strings_complex_types) {
|
||||
JsonValue json_value;
|
||||
ASSERT_OK(JsonValue::parse(json_str, &json_value));
|
||||
json_column->append_datum(Datum(&json_value));
|
||||
}
|
||||
|
||||
std::unique_ptr<SegmentWriter> writer;
|
||||
write_data(std::move(wfile4), json_strings_complex_types, seg_opts4, &writer);
|
||||
|
||||
// Check that dictionaries are invalidated due to complex type mismatches
|
||||
const auto& dict_valid_info = writer->global_dict_columns_valid_info();
|
||||
ASSERT_TRUE(dict_valid_info.count("test_json.user.name") > 0);
|
||||
ASSERT_FALSE(dict_valid_info.at("test_json.user.name")); // Name should be invalid(mixed object/string)
|
||||
ASSERT_TRUE(dict_valid_info.count("test_json.user.id") > 0);
|
||||
ASSERT_FALSE(dict_valid_info.at("test_json.user.id")); // ID should be invalid (mixed int/string)
|
||||
ASSERT_TRUE(dict_valid_info.count("test_json.user.active") > 0);
|
||||
ASSERT_FALSE(
|
||||
dict_valid_info.at("test_json.user.active")); // Active should be invalid (mixed bool/string/number)
|
||||
ASSERT_TRUE(dict_valid_info.count("test_json.user.score") > 0);
|
||||
ASSERT_FALSE(dict_valid_info.at("test_json.user.score")); // Score should be invalid (mixed float/int/string)
|
||||
}
|
||||
|
||||
// Clean up additional test files
|
||||
ASSERT_OK(fs->delete_file(file_name + "_different_types"));
|
||||
ASSERT_OK(fs->delete_file(file_name + "_complex_types"));
|
||||
}
|
||||
|
||||
} // namespace starrocks
|
||||
|
|
|
|||
|
|
@ -1250,7 +1250,8 @@ struct TMetaScanNode {
|
|||
// column id to column name
|
||||
1: optional map<i32, string> id_to_names
|
||||
2: optional list<Descriptors.TColumn> columns
|
||||
3: optional i32 low_cardinality_threshold;
|
||||
3: optional i32 low_cardinality_threshold
|
||||
4: optional list<TColumnAccessPath> column_access_paths
|
||||
}
|
||||
|
||||
struct TDecodeNode {
|
||||
|
|
|
|||
Loading…
Reference in New Issue