Co-authored-by: Murphy <96611012+murphyatwork@users.noreply.github.com>
Co-authored-by: Cursor Agent <cursoragent@cursor.com>

parent 5cfe433d5b
commit 98477be39a
@@ -35,7 +35,8 @@
 namespace starrocks {
 
 std::vector<std::string> SegmentMetaCollecter::support_collect_fields = {
-        META_FLAT_JSON_META, META_DICT_MERGE, META_MAX, META_MIN, META_COUNT_ROWS, META_COUNT_COL};
+        META_FLAT_JSON_META, META_DICT_MERGE, META_MAX, META_MIN,
+        META_COUNT_ROWS, META_COUNT_COL, META_COLUMN_SIZE, META_COLUMN_COMPRESSED_SIZE};
 
 Status SegmentMetaCollecter::parse_field_and_colname(const std::string& item, std::string* field,
                                                      std::string* col_name) {
@@ -140,6 +141,11 @@ Status MetaReader::_fill_result_chunk(Chunk* chunk) {
             desc.children.emplace_back(item_desc);
             MutableColumnPtr column = ColumnHelper::create_column(desc, false);
             chunk->append_column(std::move(column), slot->id());
+        } else if (field == META_COLUMN_SIZE || field == META_COLUMN_COMPRESSED_SIZE) {
+            TypeDescriptor desc;
+            desc.type = TYPE_BIGINT;
+            MutableColumnPtr column = ColumnHelper::create_column(desc, true);
+            chunk->append_column(std::move(column), slot->id());
         } else {
             MutableColumnPtr column = ColumnHelper::create_column(slot->type(), true);
             chunk->append_column(std::move(column), slot->id());
@@ -236,6 +242,10 @@ Status SegmentMetaCollecter::_collect(const std::string& name, ColumnId cid, Col
         return _collect_flat_json(cid, column);
     } else if (name == META_COUNT_COL) {
        return _collect_count(cid, column, type);
+    } else if (name == META_COLUMN_SIZE) {
+        return _collect_column_size(cid, column, type);
+    } else if (name == META_COLUMN_COMPRESSED_SIZE) {
+        return _collect_column_compressed_size(cid, column, type);
     }
     return Status::NotSupported("Not Support Collect Meta: " + name);
 }
@@ -411,7 +421,7 @@ Status SegmentMetaCollecter::__collect_max_or_min(ColumnId cid, Column* column,
     if (cid >= _segment->num_columns()) {
         return Status::NotFound("");
     }
-    const ColumnReader* col_reader = _segment->column(cid);
+    ColumnReader* col_reader = const_cast<ColumnReader*>(_segment->column(cid));
     if (col_reader == nullptr || col_reader->segment_zone_map() == nullptr) {
         return Status::NotFound("");
     }
@@ -460,4 +470,58 @@ Status SegmentMetaCollecter::_collect_count(ColumnId cid, Column* column, Logica
     return Status::OK();
 }
 
+Status SegmentMetaCollecter::_collect_column_size(ColumnId cid, Column* column, LogicalType type) {
+    ColumnReader* col_reader = const_cast<ColumnReader*>(_segment->column(cid));
+    RETURN_IF(col_reader == nullptr, Status::NotFound("column not found: " + std::to_string(cid)));
+
+    size_t total_mem_footprint = _collect_column_size_recursive(col_reader);
+    column->append_datum(int64_t(total_mem_footprint));
+    return Status::OK();
+}
+
+Status SegmentMetaCollecter::_collect_column_compressed_size(ColumnId cid, Column* column, LogicalType type) {
+    // Compressed size estimation: sum of data page sizes via ordinal index ranges
+    ColumnReader* col_reader = const_cast<ColumnReader*>(_segment->column(cid));
+    RETURN_IF(col_reader == nullptr, Status::NotFound("column not found: " + std::to_string(cid)));
+
+    int64_t total = _collect_column_compressed_size_recursive(col_reader);
+    column->append_datum(total);
+    return Status::OK();
+}
+
+size_t SegmentMetaCollecter::_collect_column_size_recursive(ColumnReader* col_reader) {
+    size_t total_mem_footprint = col_reader->total_mem_footprint();
+
+    if (col_reader->sub_readers() != nullptr) {
+        for (const auto& sub_reader : *col_reader->sub_readers()) {
+            total_mem_footprint += _collect_column_size_recursive(sub_reader.get());
+        }
+    }
+
+    return total_mem_footprint;
+}
+
+int64_t SegmentMetaCollecter::_collect_column_compressed_size_recursive(ColumnReader* col_reader) {
+    OlapReaderStatistics stats;
+    IndexReadOptions opts;
+    opts.use_page_cache = false;
+    opts.read_file = _read_file.get();
+    opts.stats = &stats;
+
+    Status status = col_reader->load_ordinal_index(opts);
+    if (!status.ok()) {
+        return 0; // return 0 on error; callers get a best-effort estimate
+    }
+
+    int64_t total = col_reader->data_page_footprint();
+
+    if (col_reader->sub_readers() != nullptr) {
+        for (const auto& sub_reader : *col_reader->sub_readers()) {
+            total += _collect_column_compressed_size_recursive(sub_reader.get());
+        }
+    }
+
+    return total;
+}
+
 } // namespace starrocks
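A note on the traversal in the hunk above: for nested types (array, map, struct, flat JSON) a ColumnReader owns sub-readers, one per child column, so both metrics are summed over the whole reader tree. A minimal, self-contained sketch of the same pattern with toy types (illustration only, not StarRocks code):

#include <cstdint>
#include <iostream>
#include <memory>
#include <vector>

// Toy stand-in for ColumnReader: a node owns its own bytes plus child
// readers, the way a map column reader holds key/value sub-readers.
struct ToyReader {
    int64_t own_bytes = 0;
    std::vector<std::unique_ptr<ToyReader>> children;
};

// Mirrors _collect_column_size_recursive: this reader plus all descendants.
int64_t total_bytes(const ToyReader& r) {
    int64_t total = r.own_bytes;
    for (const auto& child : r.children) {
        total += total_bytes(*child);
    }
    return total;
}

int main() {
    // map<int, varchar>: parent (offsets/nulls) + key reader + value reader.
    ToyReader map_reader;
    map_reader.own_bytes = 128;
    map_reader.children.push_back(std::make_unique<ToyReader>());
    map_reader.children.back()->own_bytes = 512;  // keys
    map_reader.children.push_back(std::make_unique<ToyReader>());
    map_reader.children.back()->own_bytes = 2048; // values
    std::cout << total_bytes(map_reader) << "\n"; // prints 2688
    return 0;
}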
@@ -105,6 +105,8 @@ static const std::string META_MAX = "max";
 static const std::string META_DICT_MERGE = "dict_merge";
 static const std::string META_FLAT_JSON_META = "flat_json_meta";
 static const std::string META_COUNT_COL = "count";
+static const std::string META_COLUMN_SIZE = "column_size";
+static const std::string META_COLUMN_COMPRESSED_SIZE = "column_compressed_size";
 
 class SegmentMetaCollecter {
 public:
@@ -135,8 +137,14 @@ private:
     Status _collect_count(ColumnId cid, Column* column, LogicalType type);
     Status _collect_rows(Column* column, LogicalType type);
     Status _collect_flat_json(ColumnId cid, Column* column);
+    Status _collect_column_size(ColumnId cid, Column* column, LogicalType type);
+    Status _collect_column_compressed_size(ColumnId cid, Column* column, LogicalType type);
     template <bool is_max>
     Status __collect_max_or_min(ColumnId cid, Column* column, LogicalType type);
+
+    // Recursive helper methods for collecting column sizes
+    size_t _collect_column_size_recursive(ColumnReader* col_reader);
+    int64_t _collect_column_compressed_size_recursive(ColumnReader* col_reader);
     SegmentSharedPtr _segment;
     std::vector<std::unique_ptr<ColumnIterator>> _column_iterators;
     const SegmentMetaCollecterParams* _params = nullptr;
@@ -87,7 +87,8 @@ Status OlapMetaReader::_build_collect_context(const OlapMetaReaderParams& read_p
 
         // only collect the field of dict need read data page
         // others just depend on footer
-        if (collect_field == META_DICT_MERGE || collect_field == META_COUNT_COL) {
+        if (collect_field == META_DICT_MERGE || collect_field == META_COUNT_COL ||
+            collect_field == META_COLUMN_COMPRESSED_SIZE) {
             _collect_context.seg_collecter_params.read_page.emplace_back(true);
         } else {
             _collect_context.seg_collecter_params.read_page.emplace_back(false);
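Why column_compressed_size joins the read-page bucket while column_size does not: the compressed-size collector calls load_ordinal_index(), which needs the segment file open (opts.read_file above), even though no page contents are decoded; column_size is served entirely from footer metadata. A hedged condensation of that decision, with a hypothetical helper name:

#include <string>

// Hypothetical simplification of the branch above (not StarRocks code):
// fields that must open the segment file get read_page = true.
bool needs_segment_file(const std::string& collect_field) {
    return collect_field == "dict_merge" || collect_field == "count" ||
           collect_field == "column_compressed_size";
}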
@@ -563,6 +563,18 @@ std::pair<ordinal_t, ordinal_t> ColumnReader::get_page_range(size_t page_index)
     return std::make_pair(_ordinal_index->get_first_ordinal(page_index), _ordinal_index->get_last_ordinal(page_index));
 }
 
+// Iterate the ordinal index to get the total size of all data pages
+int64_t ColumnReader::data_page_footprint() const {
+    RETURN_IF(_ordinal_index == nullptr, 0);
+    int64_t total_size = 0;
+    auto iter = _ordinal_index->begin();
+    while (iter.valid()) {
+        total_size += iter.page().size;
+        iter.next();
+    }
+    return total_size;
+}
+
 Status ColumnReader::zone_map_filter(const std::vector<const ColumnPredicate*>& predicates,
                                      const ColumnPredicate* del_predicate,
                                      std::unordered_set<uint32_t>* del_partial_filtered_pages,
@@ -148,7 +148,9 @@ public:
 
     uint64_t total_mem_footprint() const { return _total_mem_footprint; }
 
-    int32_t num_data_pages() { return _ordinal_index ? _ordinal_index->num_data_pages() : 0; }
+    int32_t num_data_pages() const { return _ordinal_index ? _ordinal_index->num_data_pages() : 0; }
+    // Return the total size of all data pages
+    int64_t data_page_footprint() const;
 
     // Return the ordinal range of a page
     std::pair<ordinal_t, ordinal_t> get_page_range(size_t page_index);
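Taken together, total_mem_footprint() backs column_size (decoded, in-memory bytes) and data_page_footprint() backs column_compressed_size (on-disk data page bytes), so the pair can be used to estimate a column's compression ratio. A minimal sketch with assumed numbers (not output produced by this patch):

#include <cstdint>
#include <cstdio>

int main() {
    // Assumed values, as column_size(c) and column_compressed_size(c)
    // might report them for a single column.
    int64_t uncompressed_bytes = 4206592; // summed total_mem_footprint()
    int64_t compressed_bytes = 1048576;   // summed data_page_footprint()
    double ratio = static_cast<double>(uncompressed_bytes) / compressed_bytes;
    std::printf("compression ratio: %.2fx\n", ratio); // ~4.01x
    return 0;
}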
@@ -308,6 +308,8 @@ public class FunctionSet {
     public static final String DISTINCT_PCSA = "distinct_pcsa";
     public static final String HISTOGRAM = "histogram";
     public static final String FLAT_JSON_META = "flat_json_meta";
+    public static final String COLUMN_SIZE = "column_size";
+    public static final String COLUMN_COMPRESSED_SIZE = "column_compressed_size";
     public static final String MANN_WHITNEY_U_TEST = "mann_whitney_u_test";
 
     // Bitmap functions:
@@ -1346,6 +1348,12 @@ public class FunctionSet {
         addBuiltin(AggregateFunction.createBuiltin(FLAT_JSON_META, Lists.newArrayList(Type.ANY_ARRAY),
                 Type.ARRAY_VARCHAR, Type.ARRAY_VARCHAR, false, false, false));
 
+        // column meta size inspectors (used only in META_SCAN)
+        addBuiltin(AggregateFunction.createBuiltin(COLUMN_SIZE, Lists.newArrayList(Type.ANY_ELEMENT),
+                Type.BIGINT, Type.BIGINT, false, false, false));
+        addBuiltin(AggregateFunction.createBuiltin(COLUMN_COMPRESSED_SIZE, Lists.newArrayList(Type.ANY_ELEMENT),
+                Type.BIGINT, Type.BIGINT, false, false, false));
+
         for (Type t : Type.getSupportedTypes()) {
             // null/char/time is handled through type promotion
             // TODO: array/json/pseudo is not supported yet
@@ -78,7 +78,9 @@ public class PushDownAggToMetaScanRule extends TransformationRule {
             if (!aggFuncName.equalsIgnoreCase(FunctionSet.DICT_MERGE)
                     && !aggFuncName.equalsIgnoreCase(FunctionSet.MAX)
                     && !aggFuncName.equalsIgnoreCase(FunctionSet.MIN)
-                    && !aggFuncName.equalsIgnoreCase(FunctionSet.COUNT)) {
+                    && !aggFuncName.equalsIgnoreCase(FunctionSet.COUNT)
+                    && !aggFuncName.equalsIgnoreCase(FunctionSet.COLUMN_SIZE)
+                    && !aggFuncName.equalsIgnoreCase(FunctionSet.COLUMN_COMPRESSED_SIZE)) {
                 return false;
             }
         }
@@ -135,7 +137,9 @@ public class PushDownAggToMetaScanRule extends TransformationRule {
 
             Column c = metaScan.getColRefToColumnMetaMap().get(usedColumn);
             Column copiedColumn = c.deepCopy();
-            if (aggCall.getFnName().equals(FunctionSet.COUNT)) {
+            if (aggCall.getFnName().equals(FunctionSet.COUNT)
+                    || aggCall.getFnName().equals(FunctionSet.COLUMN_SIZE)
+                    || aggCall.getFnName().equals(FunctionSet.COLUMN_COMPRESSED_SIZE)) {
                 // this variable is introduced to solve compatibility issues,
                 // see more details in the description of https://github.com/StarRocks/starrocks/pull/17619
                 copiedColumn.setType(Type.BIGINT);
@@ -152,7 +156,9 @@ public class PushDownAggToMetaScanRule extends TransformationRule {
                 newAggCalls.put(kv.getKey(),
                         new CallOperator(aggCall.getFnName(), aggCall.getType(),
                                 List.of(metaColumn, aggCall.getChild(1)), aggFunction));
-            } else if (aggCall.getFnName().equals(FunctionSet.COUNT)) {
+            } else if (aggCall.getFnName().equals(FunctionSet.COUNT)
+                    || aggCall.getFnName().equals(FunctionSet.COLUMN_SIZE)
+                    || aggCall.getFnName().equals(FunctionSet.COLUMN_COMPRESSED_SIZE)) {
                 // rewrite count to sum
                 Function aggFunction = Expr.getBuiltinFunction(FunctionSet.SUM, new Type[] {Type.BIGINT},
                         Function.CompareMode.IS_IDENTICAL);
@@ -97,7 +97,9 @@ public class RewriteSimpleAggToMetaScanRule extends TransformationRule {
             Type columnType = aggCall.getType();
 
             ColumnRefOperator metaColumn;
-            if (aggCall.getFnName().equals(FunctionSet.COUNT)) {
+            if (aggCall.getFnName().equals(FunctionSet.COUNT)
+                    || aggCall.getFnName().equals(FunctionSet.COLUMN_SIZE)
+                    || aggCall.getFnName().equals(FunctionSet.COLUMN_COMPRESSED_SIZE)) {
                 if (countPlaceHolderColumn != null) {
                     metaColumn = countPlaceHolderColumn;
                 } else {
@@ -121,7 +123,9 @@ public class RewriteSimpleAggToMetaScanRule extends TransformationRule {
             Function aggFunction = aggCall.getFunction();
             String newAggFnName = aggCall.getFnName();
             Type newAggReturnType = aggCall.getType();
-            if (aggCall.getFnName().equals(FunctionSet.COUNT)) {
+            if (aggCall.getFnName().equals(FunctionSet.COUNT)
+                    || aggCall.getFnName().equals(FunctionSet.COLUMN_SIZE)
+                    || aggCall.getFnName().equals(FunctionSet.COLUMN_COMPRESSED_SIZE)) {
                 aggFunction = Expr.getBuiltinFunction(FunctionSet.SUM,
                         new Type[] {Type.BIGINT}, Function.CompareMode.IS_IDENTICAL);
                 newAggFnName = FunctionSet.SUM;
@@ -205,7 +209,9 @@ public class RewriteSimpleAggToMetaScanRule extends TransformationRule {
             // min/max column should have zonemap index
             Type type = aggregator.getType();
             return !(type.isStringType() || type.isComplexType());
-        } else if (functionName.equals(FunctionSet.COUNT) && !aggregator.isDistinct()) {
+        } else if ((functionName.equals(FunctionSet.COUNT) ||
+                functionName.equals(FunctionSet.COLUMN_SIZE) ||
+                functionName.equals(FunctionSet.COLUMN_COMPRESSED_SIZE)) && !aggregator.isDistinct()) {
             if (usedColumns.size() == 1) {
                 ColumnRefOperator usedColumn =
                         context.getColumnRefFactory().getColumnRef(usedColumns.getFirstId());
@@ -201,6 +201,41 @@ SELECT count(*), max(k13), min(k13), count(k13) FROM duplicate_table_with_null_p
 -- result:
 0	None	None	0
 -- !result
+-- name: test_meta_column_size_functions
+CREATE TABLE meta_nested_types (
+    k1 int,
+    j json,
+    a array<int>,
+    m map<int, varchar(20)>,
+    s struct<c0 int, c1 varchar(20)>
+)
+DUPLICATE KEY(k1)
+DISTRIBUTED BY HASH(k1) BUCKETS 1
+PROPERTIES ("replication_num" = "1", "compression"="lz4");
+-- result:
+-- !result
+INSERT INTO meta_nested_types VALUES
+    (1, json_object('k','v'), [1,2,3], map{1:'a',2:'b'}, row(1,'x')),
+    (2, json_object('k','w'), [], map{3:'c'}, row(2,'y')),
+    (3, NULL, [4], map{}, row(NULL,NULL));
+-- result:
+-- !result
+SELECT column_size(j) > 0, column_compressed_size(j) > 0 FROM meta_nested_types [_META_];
+-- result:
+1	1
+-- !result
+SELECT column_size(a) > 0, column_compressed_size(a) > 0 FROM meta_nested_types [_META_];
+-- result:
+1	1
+-- !result
+SELECT column_size(m) > 0, column_compressed_size(m) > 0 FROM meta_nested_types [_META_];
+-- result:
+1	1
+-- !result
+SELECT column_size(s) > 0, column_compressed_size(s) > 0 FROM meta_nested_types [_META_];
+-- result:
+1	1
+-- !result
 -- name: test_update_table_on_meta_scan
 CREATE TABLE `update_table_with_null_partition` (
     `k1` date,
@@ -535,4 +570,4 @@ SELECT count(*), max(k12), min(k12), count(k12) FROM update_table_with_null_part
 SELECT count(*), max(k13), min(k13), count(k13) FROM update_table_with_null_partition PARTITION (p202009);
 -- result:
 12	51.000000000	40.000000000	11
--- !result
+-- !result
@@ -84,6 +84,29 @@ SELECT count(*), max(k11), min(k11), count(k11) FROM duplicate_table_with_null_p
 SELECT count(*), max(k12), min(k12), count(k12) FROM duplicate_table_with_null_partition PARTITION (p202008);
 SELECT count(*), max(k13), min(k13), count(k13) FROM duplicate_table_with_null_partition PARTITION (p202008);
 
+-- name: test_meta_column_size_functions
+CREATE TABLE meta_nested_types (
+    k1 int,
+    j json,
+    a array<int>,
+    m map<int, varchar(20)>,
+    s struct<c0 int, c1 varchar(20)>
+)
+DUPLICATE KEY(k1)
+DISTRIBUTED BY HASH(k1) BUCKETS 1
+PROPERTIES ("replication_num" = "1", "compression"="lz4");
+
+INSERT INTO meta_nested_types VALUES
+    (1, json_object('k','v'), [1,2,3], map{1:'a',2:'b'}, row(1,'x')),
+    (2, json_object('k','w'), [], map{3:'c'}, row(2,'y')),
+    (3, NULL, [4], map{}, row(NULL,NULL));
+
+-- aggregated sizes across segments
+SELECT column_size(j) > 0, column_compressed_size(j) > 0 FROM meta_nested_types [_META_];
+SELECT column_size(a) > 0, column_compressed_size(a) > 0 FROM meta_nested_types [_META_];
+SELECT column_size(m) > 0, column_compressed_size(m) > 0 FROM meta_nested_types [_META_];
+SELECT column_size(s) > 0, column_compressed_size(s) > 0 FROM meta_nested_types [_META_];
+
 -- name: test_update_table_on_meta_scan
 CREATE TABLE `update_table_with_null_partition` (
     `k1` date,