[Enhancement] Use ColumnView to reduce CopyRightTableChunkTable time(backport #58043)

Signed-off-by: satanson <ranpanf@gmail.com>
This commit is contained in:
satanson 2025-04-10 15:03:53 +08:00
parent 9036fec7d1
commit ed3e7203f9
No known key found for this signature in database
33 changed files with 1071 additions and 30 deletions

View File

@ -42,4 +42,6 @@ add_library(Column STATIC
map_column.cpp
struct_column.cpp
stream_chunk.cpp
column_view/column_view_base.cpp
column_view/column_view_helper.cpp
)

View File

@ -17,6 +17,7 @@
#include <cstdint>
#include "column/column_helper.h"
#include "column/column_view/column_view.h"
#include "column/fixed_length_column.h"
#include "column/nullable_column.h"
#include "column/vectorized_fwd.h"
@ -27,7 +28,6 @@
#include "util/mysql_row_buffer.h"
namespace starrocks {
void ArrayColumn::check_or_die() const {
CHECK_EQ(_offsets->get_data().back(), _elements->size());
DCHECK(_elements->is_nullable());
@ -122,6 +122,10 @@ void ArrayColumn::append(const Column& src, size_t offset, size_t count) {
}
void ArrayColumn::append_selective(const Column& src, const uint32_t* indexes, uint32_t from, uint32_t size) {
if (src.is_array_view()) {
down_cast<const ColumnView*>(&src)->append_to(*this, indexes, from, size);
return;
}
for (uint32_t i = 0; i < size; i++) {
append(src, indexes[from + i], 1);
}
@ -697,5 +701,4 @@ template bool ArrayColumn::is_all_array_lengths_equal<true>(const ColumnPtr& v1,
const NullColumnPtr& null_data);
template bool ArrayColumn::is_all_array_lengths_equal<false>(const ColumnPtr& v1, const ColumnPtr& v2,
const NullColumnPtr& null_data);
} // namespace starrocks
} // namespace starrocks

View File

@ -14,6 +14,8 @@
#include "column/binary_column.h"
#include "column/column_view/column_view.h"
#ifdef __x86_64__
#include <immintrin.h>
#endif
@ -29,7 +31,6 @@
#include "util/raw_container.h"
namespace starrocks {
template <typename T>
void BinaryColumnBase<T>::check_or_die() const {
CHECK_EQ(_bytes.size(), _offsets.back());
@ -62,6 +63,10 @@ void BinaryColumnBase<T>::append(const Column& src, size_t offset, size_t count)
template <typename T>
void BinaryColumnBase<T>::append_selective(const Column& src, const uint32_t* indexes, uint32_t from, uint32_t size) {
if (src.is_binary_view()) {
down_cast<const ColumnView*>(&src)->append_to(*this, indexes, from, size);
return;
}
const auto& src_column = down_cast<const BinaryColumnBase<T>&>(src);
const auto& src_offsets = src_column.get_offset();
const auto& src_bytes = src_column.get_bytes();
@ -791,5 +796,4 @@ bool BinaryColumnBase<T>::capacity_limit_reached(std::string* msg) const {
template class BinaryColumnBase<uint32_t>;
template class BinaryColumnBase<uint64_t>;
} // namespace starrocks

View File

@ -36,7 +36,7 @@ struct TypeDescriptor;
// Forward declaration
class Datum;
class Column {
class Column : public std::enable_shared_from_this<Column> {
public:
// we use append fixed size to achieve faster memory copy.
// We copy 350M rows, which total length is 2GB, max length is 15.
@ -97,7 +97,13 @@ public:
virtual bool is_array() const { return false; }
virtual bool is_view() const { return false; }
virtual bool is_nullable_view() const { return false; }
virtual bool is_array_view() const { return false; }
virtual bool is_json_view() const { return false; }
virtual bool is_binary_view() const { return false; }
virtual bool is_struct_view() const { return false; }
virtual bool is_map_view() const { return false; }
virtual bool is_map() const { return false; }
@ -412,6 +418,8 @@ public:
// current only used by adaptive_nullable_column
virtual void materialized_nullable() const {}
ColumnPtr get_ptr() const { return const_cast<Column*>(this)->shared_from_this(); }
protected:
static StatusOr<ColumnPtr> downgrade_helper_func(ColumnPtr* col);
static StatusOr<ColumnPtr> upgrade_helper_func(ColumnPtr* col);

View File

@ -19,6 +19,7 @@
#include "column/adaptive_nullable_column.h"
#include "column/array_column.h"
#include "column/chunk.h"
#include "column/column_view/column_view_helper.h"
#include "column/json_column.h"
#include "column/map_column.h"
#include "column/struct_column.h"
@ -32,10 +33,6 @@
namespace starrocks {
NullColumnPtr ColumnHelper::one_size_not_null_column = NullColumn::create(1, 0);
NullColumnPtr ColumnHelper::one_size_null_column = NullColumn::create(1, 1);
Filter& ColumnHelper::merge_nullable_filter(Column* column) {
if (column->is_nullable()) {
auto* nullable_column = down_cast<NullableColumn*>(column);
@ -277,6 +274,18 @@ ColumnPtr ColumnHelper::create_column(const TypeDescriptor& type_desc, bool null
return create_column(type_desc, nullable, false, 0);
}
MutableColumnPtr ColumnHelper::create_column(const TypeDescriptor& type_desc, bool nullable, bool use_view_if_needed,
long column_view_concat_rows_limit, long column_view_concat_bytes_limit) {
if (use_view_if_needed) {
auto opt_column = ColumnViewHelper::create_column_view(type_desc, nullable, column_view_concat_rows_limit,
column_view_concat_bytes_limit);
if (opt_column.has_value()) {
return std::move(opt_column.value());
}
}
return create_column(type_desc, nullable);
}
struct ColumnBuilder {
template <LogicalType ltype>
ColumnPtr operator()(const TypeDescriptor& type_desc, size_t size) {
@ -502,5 +511,4 @@ Ptr ChunkSliceTemplate<Ptr>::cutoff(size_t required_rows) {
template struct ChunkSliceTemplate<ChunkPtr>;
template struct ChunkSliceTemplate<ChunkUniquePtr>;
} // namespace starrocks

View File

@ -233,6 +233,9 @@ public:
// Create an empty column
static ColumnPtr create_column(const TypeDescriptor& type_desc, bool nullable);
static MutableColumnPtr create_column(const TypeDescriptor& type_desc, bool nullable, bool use_view_if_needed,
long column_view_concat_rows_limit, long column_view_concat_bytes_limit);
// expression trees' return column should align return type when some return columns maybe diff from the required
// return type, as well the null flag. e.g., concat_ws returns col from create_const_null_column(), it's type is
// Nullable(int8), but required return type is nullable(string), so col need align return type to nullable(string).

View File

@ -0,0 +1,36 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include "column/column.h"
#include "column/column_helper.h"
#include "column/column_view/column_view_base.h"
namespace starrocks {
class ColumnView final : public ColumnFactory<ColumnViewBase, ColumnView, Column> {
public:
using Super = ColumnFactory<ColumnViewBase, ColumnView, Column>;
explicit ColumnView(ColumnPtr&& default_column, long concat_rows_limit, long concat_bytes_limit)
: Super(std::move(default_column), concat_rows_limit, concat_bytes_limit) {}
ColumnView(const ColumnView& column_view) : Super(column_view) {}
ColumnView(ColumnView&& column_view) = delete;
bool is_view() const override { return true; }
bool is_json_view() const override { return ColumnHelper::get_data_column(_default_column.get())->is_json(); }
bool is_map_view() const override { return ColumnHelper::get_data_column(_default_column.get())->is_map(); }
bool is_array_view() const override { return ColumnHelper::get_data_column(_default_column.get())->is_array(); }
bool is_binary_view() const override { return ColumnHelper::get_data_column(_default_column.get())->is_binary(); }
bool is_struct_view() const override { return ColumnHelper::get_data_column(_default_column.get())->is_struct(); }
};
} // namespace starrocks

View File

@ -0,0 +1,219 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <column/column.h>
#include <column/column_helper.h>
#include <column/column_view/column_view_base.h>
#include <simd/gather.h>
namespace starrocks {
size_t ColumnViewBase::container_memory_usage() const {
if (_concat_column) {
return _concat_column->container_memory_usage();
}
size_t usage = 0;
for (const auto& column : _habitats) {
usage += column->container_memory_usage();
}
usage += sizeof(ColumnViewBase::LocationType) * 2 * _num_rows;
return usage;
}
size_t ColumnViewBase::reference_memory_usage(size_t from, size_t size) const {
_to_view();
if (_concat_column) {
return _concat_column->reference_memory_usage(from, size);
}
size_t usage = 0;
for (auto i = from; i < from + size; i++) {
const auto& column = _habitats[_habitat_idx[i]];
if (column->is_constant() || column->only_null()) {
continue;
}
usage += column->reference_memory_usage(i, 1);
}
for (const auto& column : _habitats) {
if (column->is_constant() && !column->empty()) {
usage += column->reference_memory_usage(0, 1);
}
}
return usage;
}
size_t ColumnViewBase::memory_usage() const {
if (_concat_column) {
return _concat_column->memory_usage();
}
size_t usage = 0;
for (const auto& column : _habitats) {
usage += column->memory_usage();
}
usage += sizeof(ColumnViewBase::LocationType) * 2 * _num_rows;
return usage;
}
static bool is_continuous(const std::vector<uint32_t>& selection) {
const auto first = selection[0];
for (auto i = 0; i < selection.size(); i++) {
if (selection[i] != first + i) {
return false;
}
}
return true;
}
void ColumnViewBase::append(const Column& src, size_t offset, size_t count) {
if (src.is_view()) {
const ColumnViewBase& src_view = static_cast<const ColumnViewBase&>(src);
src_view._to_view();
if (src_view._concat_column) {
append(*src_view._concat_column, offset, count);
return;
}
std::vector<std::vector<uint32_t> > selections(src_view._habitats.size());
for (auto hi = 0; hi < src_view._habitats.size(); ++hi) {
selections[hi].reserve(src_view._habitats[hi]->size());
}
for (auto i = offset; i < offset + count; i++) {
const auto habitat_idx = src_view._habitat_idx[i];
const auto row_idx = src_view._row_idx[i];
selections[habitat_idx].push_back(row_idx);
}
for (auto hi = 0; hi < selections.size(); ++hi) {
auto& selection = selections[hi];
auto& habitat_column = src_view._habitats[hi];
if (selection.empty()) {
continue;
}
if (selection.back() + 1 - selection.front() == selection.size() && is_continuous(selection)) {
append(*habitat_column.get(), selection.front(), selection.size());
} else {
_append_selective(*habitat_column.get(), std::move(selection));
}
}
} else {
int habitat_idx = _habitats.size();
ColumnPtr column_ptr = src.get_ptr();
_habitats.push_back(column_ptr);
_num_rows += count;
_tasks.emplace_back([=]() { _append(habitat_idx, column_ptr, offset, count); });
}
}
void ColumnViewBase::append_selective(const Column& src, const uint32_t* indexes, uint32_t from, uint32_t size) {
std::vector<uint32_t> index_container(size);
std::copy_n(indexes + from, size, index_container.begin());
_append_selective(src, std::move(index_container));
}
void ColumnViewBase::_append_selective(const Column& src, std::vector<uint32_t>&& indexes) {
int habitat_idx = _habitats.size();
ColumnPtr column_ptr = src.get_ptr();
_habitats.push_back(column_ptr);
_num_rows += indexes.size();
_tasks.emplace_back([=, indexes2 = std::move(indexes)] { _append_selective(habitat_idx, column_ptr, indexes2); });
}
void ColumnViewBase::append_default() {
const auto habitat_idx = _habitats.size();
_habitats.emplace_back(_default_column->clone());
_num_rows += 1;
_tasks.emplace_back([=]() {
_habitat_idx.push_back(habitat_idx);
_row_idx.push_back(0);
});
}
void ColumnViewBase::_append(int habitat_idx, const ColumnPtr& src, size_t offset, size_t count) {
_habitat_idx.resize(_habitat_idx.size() + count);
_row_idx.resize(_row_idx.size() + count);
const auto off = _habitat_idx.size() - count;
std::fill(_habitat_idx.begin() + off, _habitat_idx.end(), habitat_idx);
std::iota(_row_idx.begin() + off, _row_idx.end(), offset);
}
void ColumnViewBase::_append_selective(int habitat_idx, const ColumnPtr& src,
const std::vector<uint32_t>& index_container) {
const auto count = index_container.size();
_habitat_idx.resize(_habitat_idx.size() + count);
_row_idx.resize(_row_idx.size() + count);
const auto off = _habitat_idx.size() - count;
std::fill(_habitat_idx.begin() + off, _habitat_idx.end(), habitat_idx);
std::ranges::copy(index_container, _row_idx.begin() + off);
}
void ColumnViewBase::append_to(Column& dest_column, const uint32_t* indexes, uint32_t from, uint32_t count) const {
_to_view();
DCHECK(from + count <= _num_rows);
if (_concat_column) {
dest_column.append_selective(*_concat_column, indexes, from, count);
return;
}
for (auto i = from; i < from + count; ++i) {
const auto n = indexes[i];
const auto& habitat_column = _habitats[_habitat_idx[n]];
const auto ordinal = habitat_column->is_constant() ? 0 : _row_idx[n];
if (habitat_column->is_null(ordinal)) {
DCHECK(dest_column.is_nullable());
dest_column.append_nulls(1);
} else {
dest_column.append(*ColumnHelper::get_data_column(habitat_column.get()), ordinal, 1);
}
}
}
void ColumnViewBase::_concat_if_need() const {
if (_concat_rows_limit != -1 && _num_rows >= _concat_rows_limit) {
return;
}
if (_concat_bytes_limit != -1 && memory_usage() >= _concat_bytes_limit) {
return;
}
auto dst_column = clone_empty();
for (auto i = 0; i < _num_rows; ++i) {
const auto& habitat_column = _habitats[_habitat_idx[i]];
const auto ordinal = habitat_column->is_constant() ? 0 : _row_idx[i];
if (habitat_column->is_null(ordinal)) {
DCHECK(dst_column->is_nullable());
dst_column->append_nulls(1);
} else {
dst_column->append(*ColumnHelper::get_data_column(habitat_column.get()), ordinal, 1);
}
}
_concat_column = std::move(dst_column);
_habitat_idx.clear();
_row_idx.clear();
_habitats.clear();
}
void ColumnViewBase::_to_view() const {
std::call_once(_to_view_flag, [this]() {
if (_tasks.empty()) {
return;
}
DCHECK(_habitat_idx.size() == _row_idx.size());
_habitat_idx.reserve(_num_rows);
_row_idx.reserve(_num_rows);
for (const auto& task : _tasks) {
task();
}
_tasks.clear();
_concat_if_need();
});
}
} // namespace starrocks

View File

@ -0,0 +1,151 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <column/column.h>
#include <column/datum.h>
#if defined(__GNUC__) || defined(__clang__)
#define NOT_SUPPORT() \
do { \
throw std::runtime_error(std::string("ColumnView not support method '") + __PRETTY_FUNCTION__ + "'"); \
} while (0);
#elif defined(_MSC_VER)
#define NOT_SUPPORT() \
do { \
throw std::runtime_error(std::string("ColumnView not support method '") + __FUNCSIG__ + "'"); \
} while (0);
#else
#define NOT_SUPPORT() \
do { \
throw std::runtime_error(std::string("ColumnView not support method '") + __func__ + "'"); \
} while (0);
#endif
namespace starrocks {
class ColumnViewBase : public Column {
public:
using LocationType = uint32_t;
using Locations = std::vector<LocationType>;
StatusOr<ColumnPtr> upgrade_if_overflow() override { return nullptr; };
StatusOr<ColumnPtr> downgrade() override { return nullptr; }
bool has_large_column() const override { return false; }
size_t size() const override { return _num_rows; }
size_t container_memory_usage() const override;
size_t reference_memory_usage(size_t from, size_t size) const override;
size_t memory_usage() const override;
void append(const Column& src) override { append(src, 0, src.size()); }
void append(const Column& src, size_t offset, size_t count) override;
void append_selective(const Column& src, const uint32_t* indexes, uint32_t from, uint32_t size) override;
ColumnViewBase(ColumnPtr&& default_column, long concat_rows_limit, long concat_bytes_limit)
: _default_column(std::move(default_column)),
_concat_rows_limit(concat_rows_limit),
_concat_bytes_limit(concat_bytes_limit) {}
ColumnViewBase(const ColumnViewBase& that)
: _default_column(that._default_column->clone()),
_concat_rows_limit(that._concat_rows_limit),
_concat_bytes_limit(that._concat_bytes_limit),
_habitats(that._habitats),
_num_rows(that._num_rows),
_tasks(that._tasks),
_habitat_idx(that._habitat_idx),
_row_idx(that._row_idx),
_concat_column(that._concat_column) {}
ColumnViewBase(ColumnViewBase&&) = delete;
void append_default() override;
MutableColumnPtr clone_empty() const override { return _default_column->clone_empty(); }
virtual void append_to(Column& dest_column, const uint32_t* indexes, uint32_t from, uint32_t count) const;
const uint8_t* raw_data() const override { NOT_SUPPORT(); }
uint8_t* mutable_raw_data() override { NOT_SUPPORT(); }
size_t capacity() const override { NOT_SUPPORT(); }
size_t byte_size() const override { NOT_SUPPORT(); }
size_t type_size() const override { NOT_SUPPORT(); }
size_t byte_size(size_t from, size_t size) const override { NOT_SUPPORT(); }
size_t byte_size(size_t idx) const override { NOT_SUPPORT(); };
void reserve(size_t n) override { NOT_SUPPORT(); }
void resize(size_t n) override { NOT_SUPPORT(); }
void assign(size_t n, size_t idx) override { NOT_SUPPORT(); }
void append_datum(const Datum& datum) override { NOT_SUPPORT(); }
void remove_first_n_values(size_t count) override { NOT_SUPPORT(); }
void fill_default(const Filter& filter) override { NOT_SUPPORT(); }
void update_rows(const Column& src, const uint32_t* indexes) override { NOT_SUPPORT(); }
void append_value_multiple_times(const Column& src, uint32_t index, uint32_t size) override { NOT_SUPPORT(); }
bool append_nulls(size_t count) override { NOT_SUPPORT(); }
[[nodiscard]] size_t append_numbers(const void* buff, size_t length) override { NOT_SUPPORT(); }
void append_value_multiple_times(const void* value, size_t count) override { NOT_SUPPORT(); }
void append_default(size_t count) override { NOT_SUPPORT(); }
uint32_t max_one_element_serialize_size() const override { NOT_SUPPORT(); }
uint32_t serialize(size_t idx, uint8_t* pos) override { NOT_SUPPORT(); }
uint32_t serialize_default(uint8_t* pos) override { NOT_SUPPORT(); }
void serialize_batch(uint8_t*, starrocks::Buffer<unsigned int>&, size_t, uint32_t) override { NOT_SUPPORT(); }
const uint8_t* deserialize_and_append(const uint8_t* pos) override { NOT_SUPPORT(); }
void deserialize_and_append_batch(Buffer<Slice>& srcs, size_t chunk_size) override { NOT_SUPPORT(); }
MutablePtr clone() const override { NOT_SUPPORT(); }
uint32_t serialize_size(size_t idx) const override { NOT_SUPPORT(); }
size_t filter_range(const Filter& filter, size_t from, size_t to) override { NOT_SUPPORT(); }
int compare_at(size_t left, size_t right, const Column& rhs, int nan_direction_hint) const override {
NOT_SUPPORT();
}
void fnv_hash(uint32_t* seed, uint32_t from, uint32_t to) const override { NOT_SUPPORT(); }
void crc32_hash(uint32_t* seed, uint32_t from, uint32_t to) const override { NOT_SUPPORT(); }
int64_t xor_checksum(uint32_t from, uint32_t to) const override { NOT_SUPPORT(); }
void put_mysql_row_buffer(MysqlRowBuffer* buf, size_t idx, bool is_binary_protocol = false) const override {
NOT_SUPPORT();
}
std::string get_name() const override { NOT_SUPPORT(); }
Datum get(size_t n) const override { NOT_SUPPORT(); }
void swap_column(Column& rhs) override { NOT_SUPPORT(); }
Status capacity_limit_reached() const override { NOT_SUPPORT(); }
void check_or_die() const override {}
Status accept(ColumnVisitor* visitor) const override { NOT_SUPPORT(); }
Status accept_mutable(ColumnVisitorMutable* visitor) override { NOT_SUPPORT(); }
bool is_nullable_view() const override { return _default_column->is_nullable(); }
protected:
void _append_selective(const Column& src, std::vector<uint32_t>&& indexes);
virtual void _append(int habitat_idx, const ColumnPtr& src, size_t offset, size_t count);
virtual void _append_selective(int habitat_idx, const ColumnPtr& src, const std::vector<uint32_t>& index_container);
virtual void _to_view() const;
void _concat_if_need() const;
ColumnPtr _default_column;
const long _concat_rows_limit;
const long _concat_bytes_limit;
mutable std::vector<ColumnPtr> _habitats;
size_t _num_rows{0};
mutable std::once_flag _to_view_flag;
mutable std::vector<std::function<void()> > _tasks;
mutable Locations _habitat_idx;
mutable Locations _row_idx;
mutable ColumnPtr _concat_column;
};
} // namespace starrocks

View File

@ -0,0 +1,52 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "column/column_view/column_view_helper.h"
#include "column/column_helper.h"
#include "column/column_view/column_view.h"
#include "runtime/types.h"
#include "types/logical_type.h"
namespace starrocks {
static bool should_use_view(LogicalType ltype) {
switch (ltype) {
case TYPE_ARRAY:
case TYPE_STRUCT:
case TYPE_MAP:
case TYPE_JSON:
case TYPE_VARCHAR:
case TYPE_CHAR:
case TYPE_BINARY:
case TYPE_VARBINARY:
return true;
default:
return false;
}
return false;
}
std::optional<MutableColumnPtr> ColumnViewHelper::create_column_view(const TypeDescriptor& type_desc, bool nullable,
long concat_rows_limit, long concat_bytes_limit) {
if (!should_use_view(type_desc.type)) {
return {};
}
ColumnPtr default_column = ColumnHelper::create_column(type_desc, nullable);
if (default_column->is_nullable()) {
down_cast<NullableColumn*>(default_column.get())->append_default_not_null_value();
} else {
default_column->append_default();
}
return ColumnView::create(std::move(default_column), concat_rows_limit, concat_bytes_limit);
}
} // namespace starrocks

View File

@ -0,0 +1,25 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <optional>
#include "column/column.h"
#include "runtime/types.h"
namespace starrocks {
struct ColumnViewHelper {
static std::optional<MutableColumnPtr> create_column_view(const TypeDescriptor& type_desc, bool nullable,
long concat_rows_limit, long concat_bytes_limit);
};
} // namespace starrocks

View File

@ -83,6 +83,8 @@ public:
virtual Status visit(const ArrayViewColumn& column) {
return Status::NotSupported("ArrayViewColumn is not supported");
}
virtual Status visit(const ColumnView& column) { return Status::NotSupported("ColumnView is not supported"); }
};
} // namespace starrocks

View File

@ -22,7 +22,6 @@
#include "util/int96.h"
namespace starrocks {
class ColumnVisitorMutable {
public:
virtual ~ColumnVisitorMutable() = default;
@ -81,6 +80,6 @@ public:
virtual Status visit(FixedLengthColumnBase<decimal12_t>* column);
virtual Status visit(ObjectColumn<JsonValue>* column);
virtual Status visit(ArrayViewColumn* column) { return Status::NotSupported("ArrayViewColumn is not supported"); }
virtual Status visit(ColumnView* column) { return Status::NotSupported("ColumnView is not supported"); }
};
} // namespace starrocks

View File

@ -19,6 +19,7 @@
#include "column/bytes.h"
#include "column/column_helper.h"
#include "column/column_view/column_view.h"
#include "column/nullable_column.h"
#include "column/vectorized_fwd.h"
#include "common/compiler_util.h"
@ -273,6 +274,10 @@ void JsonColumn::append_default() {
}
void JsonColumn::append_selective(const Column& src, const uint32_t* indexes, uint32_t from, uint32_t size) {
if (src.is_json_view()) {
down_cast<const ColumnView*>(&src)->append_to(*this, indexes, from, size);
return;
}
const auto* other_json = down_cast<const JsonColumn*>(&src);
if (other_json->is_flat_json() && !is_flat_json()) {
// only hit in AggregateIterator (Aggregate mode in storage)

View File

@ -18,6 +18,7 @@
#include <set>
#include "column/column_helper.h"
#include "column/column_view/column_view.h"
#include "column/datum.h"
#include "column/fixed_length_column.h"
#include "column/nullable_column.h"
@ -29,7 +30,6 @@
#include "util/mysql_row_buffer.h"
namespace starrocks {
void MapColumn::check_or_die() const {
CHECK_EQ(_offsets->get_data().back(), _keys->size());
CHECK_EQ(_offsets->get_data().back(), _values->size());
@ -129,6 +129,10 @@ void MapColumn::append(const Column& src, size_t offset, size_t count) {
}
void MapColumn::append_selective(const Column& src, const uint32_t* indexes, uint32_t from, uint32_t size) {
if (src.is_map_view()) {
down_cast<const ColumnView*>(&src)->append_to(*this, indexes, from, size);
return;
}
for (uint32_t i = 0; i < size; i++) {
append(src, indexes[from + i], 1);
}
@ -736,5 +740,4 @@ void MapColumn::remove_duplicated_keys(bool need_recursive) {
_offsets.swap(new_offsets);
}
}
} // namespace starrocks
} // namespace starrocks

View File

@ -15,6 +15,7 @@
#include "column/nullable_column.h"
#include "column/column_helper.h"
#include "column/column_view/column_view.h"
#include "column/vectorized_fwd.h"
#include "gutil/casts.h"
#include "gutil/strings/fastmem.h"
@ -95,6 +96,10 @@ void NullableColumn::append(const Column& src, size_t offset, size_t count) {
}
void NullableColumn::append_selective(const Column& src, const uint32_t* indexes, uint32_t from, uint32_t size) {
if (src.is_view()) {
down_cast<const ColumnView*>(&src)->append_to(*this, indexes, from, size);
return;
}
DCHECK_EQ(_null_column->size(), _data_column->size());
size_t orig_size = _null_column->size();
if (src.only_null()) {

View File

@ -15,6 +15,7 @@
#include "column/struct_column.h"
#include "column/column_helper.h"
#include "column/column_view/column_view.h"
#include "util/mysql_row_buffer.h"
namespace starrocks {
@ -161,6 +162,10 @@ void StructColumn::update_rows(const Column& src, const uint32_t* indexes) {
}
void StructColumn::append_selective(const Column& src, const uint32_t* indexes, uint32_t from, uint32_t size) {
if (src.is_struct_view()) {
down_cast<const ColumnView*>(&src)->append_to(*this, indexes, from, size);
return;
}
DCHECK(src.is_struct());
const auto& src_column = down_cast<const StructColumn&>(src);
DCHECK_EQ(_fields.size(), src_column._fields.size());

View File

@ -104,6 +104,8 @@ class JsonColumn;
class MapColumn;
class StructColumn;
class ColumnView;
using ChunkPtr = std::shared_ptr<Chunk>;
using ChunkUniquePtr = std::unique_ptr<Chunk>;
using Chunks = std::vector<ChunkPtr>;

View File

@ -181,7 +181,7 @@ Status HashJoinNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(Expr::prepare(_other_join_conjunct_ctxs, state));
HashTableParam param;
_init_hash_table_param(&param);
_init_hash_table_param(&param, state);
_ht.create(param);
_output_probe_column_count = _ht.get_output_probe_column_count();
@ -190,7 +190,7 @@ Status HashJoinNode::prepare(RuntimeState* state) {
return Status::OK();
}
void HashJoinNode::_init_hash_table_param(HashTableParam* param) {
void HashJoinNode::_init_hash_table_param(HashTableParam* param, RuntimeState* runtime_state) {
param->with_other_conjunct = !_other_join_conjunct_ctxs.empty();
param->join_type = _join_type;
param->build_row_desc = &child(1)->row_desc();
@ -201,6 +201,8 @@ void HashJoinNode::_init_hash_table_param(HashTableParam* param) {
param->build_output_slots = _output_slots;
param->probe_output_slots = _output_slots;
param->enable_late_materialization = _enable_late_materialization;
param->column_view_concat_rows_limit = runtime_state->column_view_concat_rows_limit();
param->column_view_concat_bytes_limit = runtime_state->column_view_concat_bytes_limit();
std::set<SlotId> predicate_slots;
for (ExprContext* expr_context : _conjunct_ctxs) {

View File

@ -59,7 +59,7 @@ private:
static bool _has_null(const ColumnPtr& column);
void _init_hash_table_param(HashTableParam* param);
void _init_hash_table_param(HashTableParam* param, RuntimeState* runtime_state);
// local join includes: broadcast join and colocate join.
Status _create_implicit_local_join_runtime_filters(RuntimeState* state);
void _final_update_profile() {

View File

@ -105,7 +105,7 @@ Status HashJoiner::prepare_builder(RuntimeState* state, RuntimeProfile* runtime_
_build_metrics->prepare(runtime_profile);
_init_hash_table_param(&_hash_table_param);
_init_hash_table_param(&_hash_table_param, state);
_hash_join_builder->create(hash_table_param());
auto& ht = _hash_join_builder->hash_table();
@ -136,7 +136,7 @@ Status HashJoiner::prepare_prober(RuntimeState* state, RuntimeProfile* runtime_p
return Status::OK();
}
void HashJoiner::_init_hash_table_param(HashTableParam* param) {
void HashJoiner::_init_hash_table_param(HashTableParam* param, RuntimeState* state) {
param->with_other_conjunct = !_other_join_conjunct_ctxs.empty();
param->join_type = _join_type;
param->build_row_desc = &_build_row_descriptor;
@ -145,7 +145,8 @@ void HashJoiner::_init_hash_table_param(HashTableParam* param) {
param->probe_output_slots = _probe_output_slots;
param->mor_reader_mode = _mor_reader_mode;
param->enable_late_materialization = _enable_late_materialization;
param->column_view_concat_rows_limit = state->column_view_concat_rows_limit();
param->column_view_concat_bytes_limit = state->column_view_concat_bytes_limit();
std::set<SlotId> predicate_slots;
for (ExprContext* expr_context : _conjunct_ctxs) {
std::vector<SlotId> expr_slots;

View File

@ -317,7 +317,7 @@ public:
private:
static bool _has_null(const ColumnPtr& column);
void _init_hash_table_param(HashTableParam* param);
void _init_hash_table_param(HashTableParam* param, RuntimeState* state);
[[nodiscard]] Status _prepare_key_columns(Columns& key_columns, const ChunkPtr& chunk,
const vector<ExprContext*>& expr_ctxs) {

View File

@ -26,7 +26,6 @@
#include "simd/simd.h"
namespace starrocks {
// if the same hash values are clustered, after the first probe, all related hash buckets are cached, without too many
// misses. So check time locality of probe keys here.
void HashTableProbeState::consider_probe_time_locality() {
@ -423,6 +422,12 @@ void JoinHashTable::_init_probe_column(const HashTableParam& param) {
void JoinHashTable::_init_build_column(const HashTableParam& param) {
const auto& build_desc = *param.build_row_desc;
std::unordered_set<SlotId> join_key_col_refs;
for (const auto& join_key : param.join_keys) {
if (join_key.col_ref != nullptr) {
join_key_col_refs.insert(join_key.col_ref->slot_id());
}
}
for (const auto& tuple_desc : build_desc.tuple_descriptors()) {
for (const auto& slot : tuple_desc->slots()) {
HashTableSlotDescriptor hash_table_slot;
@ -471,10 +476,15 @@ void JoinHashTable::_init_build_column(const HashTableParam& param) {
hash_table_slot.need_output = false;
}
}
_table_items->build_slots.emplace_back(hash_table_slot);
ColumnPtr column = ColumnHelper::create_column(slot->type(), slot->is_nullable());
if (slot->is_nullable()) {
const auto use_view =
(join_key_col_refs.find(slot->id()) == join_key_col_refs.end()) &&
(param.column_view_concat_rows_limit >= 0 || param.column_view_concat_bytes_limit >= 0);
MutableColumnPtr column = ColumnHelper::create_column(slot->type(), slot->is_nullable(), use_view,
param.column_view_concat_rows_limit,
param.column_view_concat_bytes_limit);
if (column->is_nullable()) {
auto* nullable_column = ColumnHelper::as_raw_column<NullableColumn>(column);
nullable_column->append_default_not_null_value();
} else {
@ -630,7 +640,7 @@ void JoinHashTable::append_chunk(const ChunkPtr& chunk, const Columns& key_colum
SlotDescriptor* slot = _table_items->build_slots[i].slot;
ColumnPtr& column = chunk->get_column_by_slot_id(slot->id());
if (!columns[i]->is_nullable() && column->is_nullable()) {
if (!columns[i]->is_nullable() && !columns[i]->is_view() && column->is_nullable()) {
// upgrade to nullable column
columns[i] = NullableColumn::create(columns[i], NullColumn::create(columns[i]->size(), 0));
}
@ -965,5 +975,4 @@ template class JoinHashMapForSerializedKey(TYPE_VARCHAR);
template class JoinHashMapForFixedSizeKey(TYPE_INT);
template class JoinHashMapForFixedSizeKey(TYPE_BIGINT);
template class JoinHashMapForFixedSizeKey(TYPE_LARGEINT);
} // namespace starrocks

View File

@ -277,6 +277,9 @@ struct HashTableProbeState {
struct HashTableParam {
bool with_other_conjunct = false;
bool enable_late_materialization = false;
long column_view_concat_rows_limit = -1L;
long column_view_concat_bytes_limit = -1L;
TJoinOp::type join_type = TJoinOp::INNER_JOIN;
const RowDescriptor* build_row_desc = nullptr;
const RowDescriptor* probe_row_desc = nullptr;

View File

@ -619,7 +619,7 @@ void JoinHashMap<LT, BuildFunc, ProbeFunc>::_build_output(ChunkPtr* chunk) {
bool need_output = is_lazy ? hash_table_slot.need_lazy_materialize : hash_table_slot.need_output;
if (need_output) {
ColumnPtr& column = _table_items->build_chunk->columns()[i];
if (!column->is_nullable()) {
if (!column->is_nullable() && !column->is_nullable_view()) {
_copy_build_column(column, chunk, slot, to_nullable);
} else {
_copy_build_nullable_column(column, chunk, slot);

View File

@ -420,6 +420,20 @@ public:
return _query_options.__isset.enable_hyperscan_vec && _query_options.enable_hyperscan_vec;
}
long column_view_concat_rows_limit() const {
return _query_options.__isset.column_view_concat_rows_limit ? _query_options.column_view_concat_rows_limit
: -1L;
}
long column_view_concat_bytes_limit() const {
return _query_options.__isset.column_view_concat_bytes_limit ? _query_options.column_view_concat_bytes_limit
: -1L;
}
bool enable_column_view() const {
return column_view_concat_bytes_limit() > 0 || column_view_concat_rows_limit() > 0;
}
const std::vector<TTabletCommitInfo>& tablet_commit_infos() const { return _tablet_commit_infos; }
std::vector<TTabletCommitInfo>& tablet_commit_infos() { return _tablet_commit_infos; }

View File

@ -3,6 +3,7 @@ set(EXEC_FILES
./agent/heartbeat_server_test.cpp
./agent/master_info_test.cpp
./column/array_column_test.cpp
./column/column_view_test.cpp
./column/array_view_column_test.cpp
./column/binary_column_test.cpp
./column/chunk_test.cpp

View File

@ -0,0 +1,166 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "column/column_view/column_view.h"
#include <gtest/gtest.h>
#include <numeric>
#include "column/array_column.h"
#include "column/binary_column.h"
#include "column/column_view/column_view_helper.h"
#include "runtime/types.h"
#include "testutil/parallel_test.h"
#include "types/logical_type.h"
namespace starrocks {
static ColumnPtr create_int32_array_column(const std::vector<std::vector<int32_t>>& values, bool is_nullable) {
UInt32Column::Ptr offsets = UInt32Column::create();
NullableColumn::Ptr elements = NullableColumn::create(Int32Column::create(), NullColumn::create());
offsets->append(0);
for (const auto& value : values) {
for (auto v : value) {
elements->append_datum(v);
}
offsets->append(elements->size());
}
auto array_column = ArrayColumn::create(elements, offsets);
if (is_nullable) {
auto null_column = NullColumn::create();
null_column->resize(values.size());
return NullableColumn::create(array_column, null_column);
} else {
return array_column;
}
}
static void test_array_column_view_helper(bool nullable, bool append_default, long concat_row_limit,
long concat_bytes_limit, std::vector<uint32_t> selection,
std::string expect_result) {
auto child_type_desc = TypeDescriptor(LogicalType::TYPE_INT);
auto type_desc = TypeDescriptor::create_array_type(child_type_desc);
auto opt_array_column_view =
ColumnViewHelper::create_column_view(type_desc, nullable, concat_row_limit, concat_bytes_limit);
DCHECK(opt_array_column_view.has_value());
auto array_column_view = down_cast<ColumnView*>(opt_array_column_view.value().get());
auto num_rows = 0;
if (append_default) {
array_column_view->append_default();
num_rows += 1;
}
DCHECK_EQ(array_column_view->size(), num_rows);
const auto array_col0 = create_int32_array_column({{}, {1}, {1, 2}, {1, 2, 3}, {1, 2, 3, 4}}, nullable);
num_rows += 5;
array_column_view->append(*array_col0);
DCHECK_EQ(array_column_view->size(), num_rows);
const auto array_col1 = create_int32_array_column({{5}, {6, 7}, {8}, {9, 10}}, nullable);
array_column_view->append(*array_col1, 2, 2);
num_rows += 2;
DCHECK_EQ(array_column_view->size(), num_rows);
const auto array_col2 = create_int32_array_column({{11}, {12, 13}, {14}, {15, 16}}, nullable);
auto indexes = std::vector<uint32_t>({1, 3});
array_column_view->append_selective(*array_col2, indexes.data(), 0, 2);
num_rows += 2;
DCHECK_EQ(array_column_view->size(), num_rows);
const auto final_array_column = array_column_view->clone_empty();
array_column_view->append_to(*final_array_column, selection.data(), 0, selection.size());
ASSERT_EQ(final_array_column->debug_string(), expect_result);
}
PARALLEL_TEST(ColumnViewTest, test_not_nullable_with_append_default) {
std::vector<uint32_t> selection(10);
std::iota(selection.begin(), selection.end(), 0);
test_array_column_view_helper(false, true, 0, 0, selection,
"[[], [], [1], [1,2], [1,2,3], [1,2,3,4], [8], [9,10], [12,13], [15,16]]");
test_array_column_view_helper(false, true, 1L << 63, 1L << 63, selection,
"[[], [], [1], [1,2], [1,2,3], [1,2,3,4], [8], [9,10], [12,13], [15,16]]");
}
PARALLEL_TEST(ColumnViewTest, test_not_nullable_without_append_default) {
std::vector<uint32_t> selection(5);
std::iota(selection.begin(), selection.end(), 4);
test_array_column_view_helper(false, false, 0, 0, selection, "[[1,2,3,4], [8], [9,10], [12,13], [15,16]]");
test_array_column_view_helper(false, false, 1L << 63, 1L << 63, selection,
"[[1,2,3,4], [8], [9,10], [12,13], [15,16]]");
}
PARALLEL_TEST(ColumnViewTest, test_nullable_with_append_default) {
std::vector<uint32_t> selection(10);
std::iota(selection.begin(), selection.end(), 0);
test_array_column_view_helper(false, true, 0, 0, selection,
"[[], [], [1], [1,2], [1,2,3], [1,2,3,4], [8], [9,10], [12,13], [15,16]]");
test_array_column_view_helper(false, true, 1L << 63, 1L << 63, selection,
"[[], [], [1], [1,2], [1,2,3], [1,2,3,4], [8], [9,10], [12,13], [15,16]]");
}
PARALLEL_TEST(ColumnViewTest, test_nullable_without_append_default) {
std::vector<uint32_t> selection(5);
std::iota(selection.begin(), selection.end(), 4);
test_array_column_view_helper(false, false, 0, 0, selection, "[[1,2,3,4], [8], [9,10], [12,13], [15,16]]");
test_array_column_view_helper(false, false, 1L << 63, 1L << 63, selection,
"[[1,2,3,4], [8], [9,10], [12,13], [15,16]]");
}
PARALLEL_TEST(ColumnViewTest, test_create_struct_column_view) {
TypeDescriptor type_desc = TypeDescriptor(LogicalType::TYPE_STRUCT);
type_desc.field_names.push_back("field1");
type_desc.field_names.push_back("field2");
TypeDescriptor field1_type_desc = TypeDescriptor::create_varchar_type(20);
TypeDescriptor field2_type_desc = TypeDescriptor::create_varchar_type(20);
type_desc.children.push_back(field1_type_desc);
type_desc.children.push_back(field2_type_desc);
for (auto nullable : {true, false}) {
auto opt_struct_column_view = ColumnViewHelper::create_column_view(type_desc, nullable, 0, 0);
DCHECK(opt_struct_column_view.has_value());
auto struct_column_view = std::move(opt_struct_column_view.value());
DCHECK(struct_column_view->is_struct_view());
}
}
PARALLEL_TEST(ColumnViewTest, test_create_json_column_view) {
TypeDescriptor type_desc = TypeDescriptor(LogicalType::TYPE_JSON);
for (auto nullable : {true, false}) {
auto opt_json_column_view = ColumnViewHelper::create_column_view(type_desc, nullable, 0, 0);
DCHECK(opt_json_column_view.has_value());
auto json_column_view = std::move(opt_json_column_view.value());
DCHECK(json_column_view->is_json_view());
}
}
PARALLEL_TEST(ColumnViewTest, test_create_binary_column_view) {
for (auto ltype : {LogicalType::TYPE_VARBINARY, LogicalType::TYPE_VARCHAR, LogicalType::TYPE_CHAR}) {
for (auto nullable : {true, false}) {
TypeDescriptor type_desc = TypeDescriptor(ltype);
auto opt_binary_column_view = ColumnViewHelper::create_column_view(type_desc, nullable, 0, 0);
DCHECK(opt_binary_column_view.has_value());
auto binary_column_view = std::move(opt_binary_column_view.value());
DCHECK(binary_column_view->is_binary_view());
}
}
}
PARALLEL_TEST(ColumnViewTest, test_create_map_column_view) {
TypeDescriptor type_desc = TypeDescriptor(LogicalType::TYPE_MAP);
TypeDescriptor field1_type_desc = TypeDescriptor::create_varchar_type(20);
TypeDescriptor field2_type_desc = TypeDescriptor::create_varchar_type(20);
type_desc.children.push_back(field1_type_desc);
type_desc.children.push_back(field2_type_desc);
for (auto nullable : {true, false}) {
auto opt_map_column_view = ColumnViewHelper::create_column_view(type_desc, nullable, 0, 0);
DCHECK(opt_map_column_view.has_value());
auto map_column_view = std::move(opt_map_column_view.value());
DCHECK(map_column_view->is_map_view());
}
}
} // namespace starrocks

View File

@ -808,6 +808,9 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
public static final String BACK_PRESSURE_MAX_ROUNDS = "back_pressure_back_rounds";
public static final String BACK_PRESSURE_THROTTLE_TIME_UPPER_BOUND = "back_pressure_throttle_time_upper_bound";
public static final String COLUMN_VIEW_CONCAT_ROWS_LIMIT = "column_view_concat_rows_limit";
public static final String COLUMN_VIEW_CONCAT_BYTES_LIMIT = "column_view_concat_bytes_limit";
public static final List<String> DEPRECATED_VARIABLES = ImmutableList.<String>builder()
.add(CODEGEN_LEVEL)
.add(MAX_EXECUTION_TIME)
@ -1601,6 +1604,18 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
@VarAttr(name = BACK_PRESSURE_THROTTLE_TIME_UPPER_BOUND)
private long backPressureThrottleTimeUpperBound = 300;
// when rows_num of ColumnView is less than column_view_concat_rows_limit or
// bytes size of ColumnView is less than column_view concat_bytes_limit, underlying columns of
// column view is concatenated together.
// 1. when both these variables are -1, ColumnView is disabled;
// 2. when either of these variables are 0, ColumnView is always enabled;
// 3. otherwise, ColumnView concatenation depends on its rows_num and bytes_size
@VarAttr(name = COLUMN_VIEW_CONCAT_ROWS_LIMIT)
private long columnViewConcatRowsLimit = -1;
@VarAttr(name = COLUMN_VIEW_CONCAT_BYTES_LIMIT)
private long columnViewConcatBytesLimit = 4294967296L;
public int getCboPruneJsonSubfieldDepth() {
return cboPruneJsonSubfieldDepth;
}
@ -4297,6 +4312,22 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
this.backPressureThrottleTimeUpperBound = value;
}
public long getColumnViewConcatRowsLimit() {
return columnViewConcatRowsLimit;
}
public void setColumnViewConcatRowsLimit(long value) {
this.columnViewConcatRowsLimit = value;
}
public long getColumnViewConcatBytesLimit() {
return columnViewConcatBytesLimit;
}
public void setColumnViewConcatBytesLimit(long value) {
this.columnViewConcatBytesLimit = value;
}
// Serialize to thrift object
// used for rest api
public TQueryOptions toThrift() {
@ -4441,6 +4472,8 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
tResult.setEnable_wait_dependent_event(enableWaitDependentEvent);
tResult.setConnector_max_split_size(connectorMaxSplitSize);
tResult.setOrc_use_column_names(orcUseColumnNames);
tResult.setColumn_view_concat_rows_limit(columnViewConcatRowsLimit);
tResult.setColumn_view_concat_bytes_limit(columnViewConcatRowsLimit);
return tResult;
}

View File

@ -320,6 +320,9 @@ struct TQueryOptions {
140: optional string catalog;
141: optional i32 datacache_evict_probability;
190: optional i64 column_view_concat_rows_limit;
191: optional i64 column_view_concat_bytes_limit;
}

View File

@ -0,0 +1,35 @@
-- name: test_async_recommend
function: prepare_data("ssb", "${db[0]}")
create tunespace ts0;
function: wait_partitions_available("ts0", 4)
alter tunespace ts0 append "Q1.1" select sum(lo_revenue) as revenue
from lineorder join dates on lo_orderdate = d_datekey
where d_year = 1993 and lo_discount between 1 and 3 and lo_quantity < 25;
alter tunespace ts0 append "Q1.2" select sum(lo_revenue) as revenue
from lineorder
join dates on lo_orderdate = d_datekey
where d_yearmonthnum = 199401
and lo_discount between 4 and 6
and lo_quantity between 26 and 35;
alter tunespace ts0 append "Q1.3" select sum(lo_revenue) as revenue
from lineorder
join dates on lo_orderdate = d_datekey
where d_weeknuminyear = 6 and d_year = 1994
and lo_discount between 5 and 7
and lo_quantity between 26 and 35;
set automv_card_rowcount_ratio_hwm = 1.0;
set automv_card_rowcount_ratio_lwm = 1.0;
set global automv_per_lattice_mv_selectivity_ratio = 1.0;
submit recommendations task 'task1' from ts0;
function: check_automv_async_recommend("task1")

View File

@ -0,0 +1,131 @@
-- name: test_column_view
function: prepare_data("ssb", "${db[0]}")
-- result:
None
-- !result
with cte as(
select lo_partkey,
array_agg(concat_ws(":",lo_partkey,c_custkey,c_name,c_city,c_nation,c_region,c_phone,c_mktsegment)) custkeys
from lineorder inner join customer on lo_custkey = c_custkey
where lo_orderdate between 19920101 and 19921231
group by lo_partkey
)
select /*+SET_VAR(column_view_concat_rows_limit=1,column_view_concat_bytes_limit=1)*/ count(1), 0!=sum(array_sum(array_map(x->murmur_hash3_32(x, p_name), custkeys)))
from part inner join[broadcast] cte on cte.lo_partkey = part.p_partkey;
-- result:
197929 1
-- !result
with cte as(
select lo_partkey,
group_concat(concat_ws(":",lo_partkey,c_custkey,c_name,c_city,c_nation,c_region,c_phone,c_mktsegment)) custkeys
from lineorder inner join customer on lo_custkey = c_custkey
where lo_orderdate between 19920101 and 19921231
group by lo_partkey
)
select /*+SET_VAR(column_view_concat_rows_limit=1,column_view_concat_bytes_limit=1)*/ count(1), 0!=sum(murmur_hash3_32(p_name, custkeys))
from part inner join[broadcast] cte on cte.lo_partkey = part.p_partkey;
-- result:
197929 1
-- !result
with cte as(
select lo_partkey,
json_object("value",group_concat(concat_ws(":",lo_partkey,c_custkey,c_name,c_city,c_nation,c_region,c_phone,c_mktsegment))) custkeys
from lineorder inner join customer on lo_custkey = c_custkey
where lo_orderdate between 19920101 and 19921231
group by lo_partkey
)
select /*+SET_VAR(column_view_concat_rows_limit=1,column_view_concat_bytes_limit=1)*/ count(1), 0!=sum(murmur_hash3_32(p_name, custkeys->'$.value'))
from part inner join[broadcast] cte on cte.lo_partkey = part.p_partkey;
-- result:
197929 1
-- !result
with cte as(
select lo_partkey,
named_struct("value",group_concat(concat_ws(":",lo_partkey,c_custkey,c_name,c_city,c_nation,c_region,c_phone,c_mktsegment))) custkeys
from lineorder inner join customer on lo_custkey = c_custkey
where lo_orderdate between 19920101 and 19921231
group by lo_partkey
)
select /*+SET_VAR(column_view_concat_rows_limit=1,column_view_concat_bytes_limit=1)*/ count(1), 0!=sum(murmur_hash3_32(p_name, custkeys.value))
from part inner join[broadcast] cte on cte.lo_partkey = part.p_partkey;
-- result:
197929 1
-- !result
with cte0 as(
select lo_partkey,
group_concat(concat_ws(":",lo_partkey,c_custkey,c_name,c_city,c_nation,c_region,c_phone,c_mktsegment)) custkeys
from lineorder inner join customer on lo_custkey = c_custkey
where lo_orderdate between 19920101 and 19921231
group by lo_partkey
),
cte as (
select lo_partkey, map{"value":custkeys} custkeys from cte0
)
select /*+SET_VAR(column_view_concat_rows_limit=1,column_view_concat_bytes_limit=1)*/ count(1), 0!=sum(murmur_hash3_32(p_name, custkeys["value"]))
from part inner join[broadcast] cte on cte.lo_partkey = part.p_partkey;
-- result:
197929 1
-- !result
with cte as(
select lo_partkey,
array_agg(concat_ws(":",lo_partkey,c_custkey,c_name,c_city,c_nation,c_region,c_phone,c_mktsegment)) custkeys
from lineorder inner join customer on lo_custkey = c_custkey
where lo_orderdate between 19920101 and 19921231
group by lo_partkey
)
select /*+SET_VAR(column_view_concat_rows_limit=10000000000000,column_view_concat_bytes_limit=1000000000000000)*/ count(1), 0!=sum(array_sum(array_map(x->murmur_hash3_32(x, p_name), custkeys)))
from part inner join[broadcast] cte on cte.lo_partkey = part.p_partkey;
-- result:
197929 1
-- !result
with cte as(
select lo_partkey,
group_concat(concat_ws(":",lo_partkey,c_custkey,c_name,c_city,c_nation,c_region,c_phone,c_mktsegment)) custkeys
from lineorder inner join customer on lo_custkey = c_custkey
where lo_orderdate between 19920101 and 19921231
group by lo_partkey
)
select /*+SET_VAR(column_view_concat_rows_limit=10000000000000,column_view_concat_bytes_limit=1000000000000000)*/ count(1), 0!=sum(murmur_hash3_32(p_name, custkeys))
from part inner join[broadcast] cte on cte.lo_partkey = part.p_partkey;
-- result:
197929 1
-- !result
with cte as(
select lo_partkey,
json_object("value",group_concat(concat_ws(":",lo_partkey,c_custkey,c_name,c_city,c_nation,c_region,c_phone,c_mktsegment))) custkeys
from lineorder inner join customer on lo_custkey = c_custkey
where lo_orderdate between 19920101 and 19921231
group by lo_partkey
)
select /*+SET_VAR(column_view_concat_rows_limit=10000000000000,column_view_concat_bytes_limit=1000000000000000)*/ count(1), 0!=sum(murmur_hash3_32(p_name, custkeys->'$.value'))
from part inner join[broadcast] cte on cte.lo_partkey = part.p_partkey;
-- result:
197929 1
-- !result
with cte as(
select lo_partkey,
named_struct("value",group_concat(concat_ws(":",lo_partkey,c_custkey,c_name,c_city,c_nation,c_region,c_phone,c_mktsegment))) custkeys
from lineorder inner join customer on lo_custkey = c_custkey
where lo_orderdate between 19920101 and 19921231
group by lo_partkey
)
select /*+SET_VAR(column_view_concat_rows_limit=10000000000000,column_view_concat_bytes_limit=1000000000000000)*/ count(1), 0!=sum(murmur_hash3_32(p_name, custkeys.value))
from part inner join[broadcast] cte on cte.lo_partkey = part.p_partkey;
-- result:
197929 1
-- !result
with cte0 as(
select lo_partkey,
group_concat(concat_ws(":",lo_partkey,c_custkey,c_name,c_city,c_nation,c_region,c_phone,c_mktsegment)) custkeys
from lineorder inner join customer on lo_custkey = c_custkey
where lo_orderdate between 19920101 and 19921231
group by lo_partkey
),
cte as (
select lo_partkey, map{"value":custkeys} custkeys from cte0
)
select /*+SET_VAR(column_view_concat_rows_limit=10000000000000,column_view_concat_bytes_limit=1000000000000000)*/ count(1), 0!=sum(murmur_hash3_32(p_name, custkeys["value"]))
from part inner join[broadcast] cte on cte.lo_partkey = part.p_partkey;
-- result:
197929 1
-- !result

View File

@ -0,0 +1,111 @@
-- name: test_column_view
function: prepare_data("ssb", "${db[0]}")
with cte as(
select lo_partkey,
array_agg(concat_ws(":",lo_partkey,c_custkey,c_name,c_city,c_nation,c_region,c_phone,c_mktsegment)) custkeys
from lineorder inner join customer on lo_custkey = c_custkey
where lo_orderdate between 19920101 and 19921231
group by lo_partkey
)
select /*+SET_VAR(column_view_concat_rows_limit=1,column_view_concat_bytes_limit=1)*/ count(1), 0!=sum(array_sum(array_map(x->murmur_hash3_32(x, p_name), custkeys)))
from part inner join[broadcast] cte on cte.lo_partkey = part.p_partkey;
with cte as(
select lo_partkey,
group_concat(concat_ws(":",lo_partkey,c_custkey,c_name,c_city,c_nation,c_region,c_phone,c_mktsegment)) custkeys
from lineorder inner join customer on lo_custkey = c_custkey
where lo_orderdate between 19920101 and 19921231
group by lo_partkey
)
select /*+SET_VAR(column_view_concat_rows_limit=1,column_view_concat_bytes_limit=1)*/ count(1), 0!=sum(murmur_hash3_32(p_name, custkeys))
from part inner join[broadcast] cte on cte.lo_partkey = part.p_partkey;
with cte as(
select lo_partkey,
json_object("value",group_concat(concat_ws(":",lo_partkey,c_custkey,c_name,c_city,c_nation,c_region,c_phone,c_mktsegment))) custkeys
from lineorder inner join customer on lo_custkey = c_custkey
where lo_orderdate between 19920101 and 19921231
group by lo_partkey
)
select /*+SET_VAR(column_view_concat_rows_limit=1,column_view_concat_bytes_limit=1)*/ count(1), 0!=sum(murmur_hash3_32(p_name, custkeys->'$.value'))
from part inner join[broadcast] cte on cte.lo_partkey = part.p_partkey;
with cte as(
select lo_partkey,
named_struct("value",group_concat(concat_ws(":",lo_partkey,c_custkey,c_name,c_city,c_nation,c_region,c_phone,c_mktsegment))) custkeys
from lineorder inner join customer on lo_custkey = c_custkey
where lo_orderdate between 19920101 and 19921231
group by lo_partkey
)
select /*+SET_VAR(column_view_concat_rows_limit=1,column_view_concat_bytes_limit=1)*/ count(1), 0!=sum(murmur_hash3_32(p_name, custkeys.value))
from part inner join[broadcast] cte on cte.lo_partkey = part.p_partkey;
with cte0 as(
select lo_partkey,
group_concat(concat_ws(":",lo_partkey,c_custkey,c_name,c_city,c_nation,c_region,c_phone,c_mktsegment)) custkeys
from lineorder inner join customer on lo_custkey = c_custkey
where lo_orderdate between 19920101 and 19921231
group by lo_partkey
),
cte as (
select lo_partkey, map{"value":custkeys} custkeys from cte0
)
select /*+SET_VAR(column_view_concat_rows_limit=1,column_view_concat_bytes_limit=1)*/ count(1), 0!=sum(murmur_hash3_32(p_name, custkeys["value"]))
from part inner join[broadcast] cte on cte.lo_partkey = part.p_partkey;
with cte as(
select lo_partkey,
array_agg(concat_ws(":",lo_partkey,c_custkey,c_name,c_city,c_nation,c_region,c_phone,c_mktsegment)) custkeys
from lineorder inner join customer on lo_custkey = c_custkey
where lo_orderdate between 19920101 and 19921231
group by lo_partkey
)
select /*+SET_VAR(column_view_concat_rows_limit=10000000000000,column_view_concat_bytes_limit=1000000000000000)*/ count(1), 0!=sum(array_sum(array_map(x->murmur_hash3_32(x, p_name), custkeys)))
from part inner join[broadcast] cte on cte.lo_partkey = part.p_partkey;
with cte as(
select lo_partkey,
group_concat(concat_ws(":",lo_partkey,c_custkey,c_name,c_city,c_nation,c_region,c_phone,c_mktsegment)) custkeys
from lineorder inner join customer on lo_custkey = c_custkey
where lo_orderdate between 19920101 and 19921231
group by lo_partkey
)
select /*+SET_VAR(column_view_concat_rows_limit=10000000000000,column_view_concat_bytes_limit=1000000000000000)*/ count(1), 0!=sum(murmur_hash3_32(p_name, custkeys))
from part inner join[broadcast] cte on cte.lo_partkey = part.p_partkey;
with cte as(
select lo_partkey,
json_object("value",group_concat(concat_ws(":",lo_partkey,c_custkey,c_name,c_city,c_nation,c_region,c_phone,c_mktsegment))) custkeys
from lineorder inner join customer on lo_custkey = c_custkey
where lo_orderdate between 19920101 and 19921231
group by lo_partkey
)
select /*+SET_VAR(column_view_concat_rows_limit=10000000000000,column_view_concat_bytes_limit=1000000000000000)*/ count(1), 0!=sum(murmur_hash3_32(p_name, custkeys->'$.value'))
from part inner join[broadcast] cte on cte.lo_partkey = part.p_partkey;
with cte as(
select lo_partkey,
named_struct("value",group_concat(concat_ws(":",lo_partkey,c_custkey,c_name,c_city,c_nation,c_region,c_phone,c_mktsegment))) custkeys
from lineorder inner join customer on lo_custkey = c_custkey
where lo_orderdate between 19920101 and 19921231
group by lo_partkey
)
select /*+SET_VAR(column_view_concat_rows_limit=10000000000000,column_view_concat_bytes_limit=1000000000000000)*/ count(1), 0!=sum(murmur_hash3_32(p_name, custkeys.value))
from part inner join[broadcast] cte on cte.lo_partkey = part.p_partkey;
with cte0 as(
select lo_partkey,
group_concat(concat_ws(":",lo_partkey,c_custkey,c_name,c_city,c_nation,c_region,c_phone,c_mktsegment)) custkeys
from lineorder inner join customer on lo_custkey = c_custkey
where lo_orderdate between 19920101 and 19921231
group by lo_partkey
),
cte as (
select lo_partkey, map{"value":custkeys} custkeys from cte0
)
select /*+SET_VAR(column_view_concat_rows_limit=10000000000000,column_view_concat_bytes_limit=1000000000000000)*/ count(1), 0!=sum(murmur_hash3_32(p_name, custkeys["value"]))
from part inner join[broadcast] cte on cte.lo_partkey = part.p_partkey;