Signed-off-by: zihe.liu <ziheliu1024@gmail.com>
Co-authored-by: zihe.liu <ziheliu1024@gmail.com>
parent 7d45558eb5
commit 0eb1a93ca0
@@ -120,6 +120,16 @@ public:
     const ChunkPtr& back() { return _chunks.back(); }
 
+    void append_selective_to_back(const Chunk& src, const uint32_t* indexes, uint32_t from, uint32_t size) {
+        auto& chunk = _chunks.back();
+        const size_t prev_bytes = chunk->memory_usage();
+
+        chunk->append_selective(src, indexes, from, size);
+        const size_t new_bytes = chunk->memory_usage();
+
+        _tracker->consume(new_bytes - prev_bytes);
+    }
+
     bool is_full() const {
         return _chunks.size() >= 4 || _tracker->consumption() > config::partition_hash_join_probe_limit_size;
     }
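Note: `append_selective_to_back` charges the tracker with the delta of the chunk's reported memory usage rather than the raw appended byte count, so variable-length columns are accounted accurately. A minimal standalone sketch of the same delta-accounting pattern (the `MemTracker`/`Chunk` types here are simplified stand-ins, not the StarRocks classes):

    #include <cstddef>
    #include <string>
    #include <vector>

    // Simplified stand-ins for illustration only.
    struct MemTracker {
        long consumed = 0;
        void consume(long bytes) { consumed += bytes; }
    };

    struct Chunk {
        std::vector<std::string> rows;
        size_t memory_usage() const {
            size_t bytes = 0;
            for (const auto& r : rows) bytes += r.capacity();
            return bytes;
        }
        void append(const std::string& row) { rows.push_back(row); }
    };

    // Charge the tracker with the *delta* in reported memory, mirroring
    // PartitionChunkChannel::append_selective_to_back above.
    void append_tracked(Chunk& chunk, MemTracker& tracker, const std::string& row) {
        const size_t prev_bytes = chunk.memory_usage();
        chunk.append(row);
        const size_t new_bytes = chunk.memory_usage();
        tracker.consume(static_cast<long>(new_bytes - prev_bytes));
    }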
@@ -213,10 +223,10 @@ Status PartitionedHashJoinProberImpl::push_probe_chunk(RuntimeState* state, Chun
     }
     std::vector<uint32_t> hash_values;
     {
-        hash_values.assign(num_rows, HashUtil::FNV_SEED);
+        hash_values.assign(num_rows, 0);
         for (const ColumnPtr& column : partition_columns) {
-            column->fnv_hash(hash_values.data(), 0, num_rows);
+            column->crc32_hash(hash_values.data(), 0, num_rows);
         }
         // find partition id
         for (size_t i = 0; i < hash_values.size(); ++i) {
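Note: the switch from FNV (seeded with `HashUtil::FNV_SEED`) to CRC32 (seeded with 0) only changes the hash family; the partition id is still derived from the per-row hash value. The hunk truncates before the reduction step, so the mask below is an illustrative assumption rather than the exact expression in the commit:

    #include <cstdint>
    #include <vector>

    // Hypothetical reduction from per-row hash values to partition ids.
    // With a power-of-two partition count (16 in this commit), a mask is
    // equivalent to `% partition_num`.
    std::vector<uint32_t> to_partition_ids(const std::vector<uint32_t>& hash_values, uint32_t partition_num) {
        std::vector<uint32_t> partition_ids(hash_values.size());
        for (size_t i = 0; i < hash_values.size(); ++i) {
            partition_ids[i] = hash_values[i] & (partition_num - 1);
        }
        return partition_ids;
    }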
@@ -364,7 +374,7 @@ bool SingleHashJoinBuilder::anti_join_key_column_has_null() const {
     return false;
 }
 
-Status SingleHashJoinBuilder::do_append_chunk(const ChunkPtr& chunk) {
+Status SingleHashJoinBuilder::do_append_chunk(RuntimeState* state, const ChunkPtr& chunk) {
     if (UNLIKELY(_ht.get_row_count() + chunk->num_rows() >= max_hash_table_element_size)) {
         return Status::NotSupported(strings::Substitute("row count of right table in hash join > $0", UINT32_MAX));
     }
@@ -406,7 +416,7 @@ enum class CacheLevel { L2, L3, MEMORY };
 
 class AdaptivePartitionHashJoinBuilder final : public HashJoinBuilder {
 public:
-    AdaptivePartitionHashJoinBuilder(HashJoiner& hash_joiner);
+    explicit AdaptivePartitionHashJoinBuilder(HashJoiner& hash_joiner);
     ~AdaptivePartitionHashJoinBuilder() override = default;
 
     void create(const HashTableParam& param) override;
@@ -415,7 +425,7 @@ public:
 
     void reset(const HashTableParam& param) override;
 
-    Status do_append_chunk(const ChunkPtr& chunk) override;
+    Status do_append_chunk(RuntimeState* state, const ChunkPtr& chunk) override;
 
     Status build(RuntimeState* state) override;
 
@@ -434,27 +444,53 @@ public:
 
     void clone_readable(HashJoinBuilder* builder) override;
 
+    Status prepare_for_spill_start(RuntimeState* state) override;
+    ChunkPtr convert_to_spill_schema(const ChunkPtr& chunk) const override;
+
 private:
-    size_t _estimated_row_size(const HashTableParam& param) const;
-    size_t _estimated_probe_cost(const HashTableParam& param) const;
+    static double _calculate_cache_miss_factor(const HashJoiner& hash_joiner);
+
+    size_t _estimate_hash_table_probing_bytes_per_row(const HashTableParam& param) const;
+    size_t _estimate_probe_row_bytes(const HashTableParam& param) const;
     template <CacheLevel T>
-    size_t _estimated_build_cost(size_t build_row_size) const;
-    void _adjust_partition_rows(size_t build_row_size);
+    size_t _estimate_cost_by_bytes(size_t row_bytes) const;
 
     void _init_partition_nums(const HashTableParam& param);
-    Status _convert_to_single_partition();
-    Status _append_chunk_to_partitions(const ChunkPtr& chunk);
+    void _adjust_partition_rows(size_t hash_table_bytes_per_row, size_t hash_table_probing_bytes_per_row);
+
+    Status _do_append_chunk(RuntimeState* state, const ChunkPtr& chunk);
+    Status _append_chunk_to_partitions(RuntimeState* state, const ChunkPtr& chunk);
+    Status _transfer_to_appending_stage(RuntimeState* state);
+    Status _convert_to_single_partition(RuntimeState* state);
+    Status _flush_buffer_chunks(RuntimeState* state);
+
+    bool _need_partition_join_for_build(size_t ht_num_rows) const;
+    bool _need_partition_join_for_append(size_t ht_num_rows) const;
 
 private:
     std::vector<std::unique_ptr<SingleHashJoinBuilder>> _builders;
 
-    size_t _partition_num = 0;
-    size_t _partition_join_min_rows = 0;
-    size_t _partition_join_max_rows = 0;
+    // Split append chunk into two stages:
+    // - BUFFERING: buffers chunks without partitioning until the number of rows exceeds _partition_join_l2_min_rows or _partition_join_l3_min_rows.
+    // - APPENDING: partitions all incoming chunks.
+    enum class Stage { BUFFERING, APPENDING };
+    Stage _stage = Stage::BUFFERING;
+    MemTracker _mem_tracker;
+    std::vector<PartitionChunkChannel> _partition_input_channels;
+    std::vector<ChunkPtr> _unpartition_chunks;
 
-    size_t _probe_estimated_costs = 0;
+    size_t _partition_num = 0;
 
+    size_t _hash_table_probing_bytes_per_row = 0;
+    size_t _hash_table_bytes_per_row = 0;
+    size_t _partition_join_l2_min_rows = 0;
+    size_t _partition_join_l2_max_rows = 0;
+    size_t _partition_join_l3_min_rows = 0;
+    size_t _partition_join_l3_max_rows = 0;
+
+    size_t _probe_row_shuffle_cost = 0;
+    size_t _l2_benefit = 0;
+    size_t _l3_benefit = 0;
+
     size_t _fit_L2_cache_max_rows = 0;
     size_t _fit_L3_cache_max_rows = 0;
@@ -463,10 +499,15 @@ private:
     size_t _L3_cache_size = 0;
 
     size_t _pushed_chunks = 0;
 
+    // Shared read-only data accessed concurrently by threads can lead to better cache performance.
+    // Therefore, for broadcast joins, this parameter is used to reduce the benefit of partitioned hash joins as the number
+    // of prober threads (DOP) increases.
+    const double _cache_miss_factor;
 };
 
 AdaptivePartitionHashJoinBuilder::AdaptivePartitionHashJoinBuilder(HashJoiner& hash_joiner)
-        : HashJoinBuilder(hash_joiner) {
+        : HashJoinBuilder(hash_joiner), _cache_miss_factor(_calculate_cache_miss_factor(hash_joiner)) {
     static constexpr size_t DEFAULT_L2_CACHE_SIZE = 1 * 1024 * 1024;
     static constexpr size_t DEFAULT_L3_CACHE_SIZE = 32 * 1024 * 1024;
     const auto& cache_sizes = CpuInfo::get_cache_sizes();
@@ -476,100 +517,171 @@ AdaptivePartitionHashJoinBuilder::AdaptivePartitionHashJoinBuilder(HashJoiner& h
     _L3_cache_size = _L3_cache_size ? _L3_cache_size : DEFAULT_L3_CACHE_SIZE;
 }
 
-size_t AdaptivePartitionHashJoinBuilder::_estimated_row_size(const HashTableParam& param) const {
+double AdaptivePartitionHashJoinBuilder::_calculate_cache_miss_factor(const HashJoiner& hash_joiner) {
+    if (hash_joiner.distribution_mode() != TJoinDistributionMode::BROADCAST) {
+        return 1.0; // No broadcast join, no cache reuse between different probers.
+    }
+
+    const size_t max_prober_dop = hash_joiner.max_dop();
+    if (max_prober_dop <= 1) {
+        return 1.0;
+    }
+    if (max_prober_dop > 8) {
+        return 0.1;
+    }
+    return 1 - (max_prober_dop - 1) * 0.1;
+}
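Note: the factor above is fully determined by the prober DOP, so it is easy to sanity-check. A minimal sketch mirroring the function for the broadcast case (not the StarRocks class, just the arithmetic):

    #include <cstdio>

    // Mirrors _calculate_cache_miss_factor for BROADCAST joins.
    static double cache_miss_factor(size_t max_prober_dop) {
        if (max_prober_dop <= 1) return 1.0;
        if (max_prober_dop > 8) return 0.1;
        return 1 - (max_prober_dop - 1) * 0.1;
    }

    int main() {
        for (size_t dop : {1, 2, 4, 8, 16}) {
            // dop=1 -> 1.0, dop=2 -> 0.9, dop=4 -> 0.7, dop=8 -> 0.3, dop=16 -> 0.1
            std::printf("dop=%zu factor=%.1f\n", dop, cache_miss_factor(dop));
        }
    }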
+size_t AdaptivePartitionHashJoinBuilder::_estimate_hash_table_probing_bytes_per_row(const HashTableParam& param) const {
     size_t estimated_each_row = 0;
 
     // Probing a row needs to:
     // 1. touch the `first` and `next` vectors,
     // 2. compare join keys between builder and prober,
     // 3. output columns from the build side.
 
     // 1. `first` and `next` bytes
     estimated_each_row += 8;
 
     // 2. key bytes
     for (const auto& join_key : param.join_keys) {
         if (join_key.type != nullptr) {
             estimated_each_row += get_size_of_fixed_length_type(join_key.type->type);
+            // The benefit from non-fixed key columns is less than that from fixed key columns, so the penalty (/4) is applied here.
+            estimated_each_row += type_estimated_overhead_bytes(join_key.type->type) / 4;
         }
     }
 
     // 3. output bytes
     for (auto* tuple : param.build_row_desc->tuple_descriptors()) {
-        for (auto slot : tuple->slots()) {
-            if (param.build_output_slots.contains(slot->id())) {
+        for (const auto* slot : tuple->slots()) {
+            if (param.build_output_slots.empty() || param.build_output_slots.contains(slot->id())) {
                 estimated_each_row += get_size_of_fixed_length_type(slot->type().type);
                 estimated_each_row += type_estimated_overhead_bytes(slot->type().type);
             }
         }
     }
 
     // for hash table bucket
     estimated_each_row += 4;
 
-    return estimated_each_row;
+    return std::max<size_t>(estimated_each_row * _cache_miss_factor, 1);
 }
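As a worked example under assumed inputs (hypothetical schema: one BIGINT join key, two BIGINT build-output slots, no variable-length overhead, and DOP 1 so the cache-miss factor is 1.0), the estimate above comes out to 36 bytes per probed row:

    #include <cstdio>

    int main() {
        // Hypothetical schema: one BIGINT join key, two BIGINT build-output slots.
        constexpr size_t first_next_bytes = 8;    // `first` and `next` vector entries
        constexpr size_t key_bytes = 8;           // fixed-length BIGINT key comparison
        constexpr size_t output_bytes = 2 * 8;    // two BIGINT output slots from the build side
        constexpr size_t bucket_bytes = 4;        // hash table bucket
        constexpr double cache_miss_factor = 1.0; // DOP 1, or a non-broadcast join

        const size_t per_row = static_cast<size_t>(
                (first_next_bytes + key_bytes + output_bytes + bucket_bytes) * cache_miss_factor);
        std::printf("estimated probing bytes per row: %zu\n", per_row); // prints 36
    }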
 // We could use a better estimation model.
-size_t AdaptivePartitionHashJoinBuilder::_estimated_probe_cost(const HashTableParam& param) const {
+size_t AdaptivePartitionHashJoinBuilder::_estimate_probe_row_bytes(const HashTableParam& param) const {
     size_t size = 0;
 
     // shuffling probe bytes
     for (auto* tuple : param.probe_row_desc->tuple_descriptors()) {
-        for (auto slot : tuple->slots()) {
-            if (param.probe_output_slots.contains(slot->id())) {
-                size += get_size_of_fixed_length_type(slot->type().type);
-                size += type_estimated_overhead_bytes(slot->type().type);
-            }
+        for (const auto* slot : tuple->slots()) {
+            size += get_size_of_fixed_length_type(slot->type().type);
+            size += type_estimated_overhead_bytes(slot->type().type);
         }
     }
-    // we define probe cost is bytes size * 6
-    return size * 6;
+
+    return std::max<size_t>(size, 1);
 }
 template <>
-size_t AdaptivePartitionHashJoinBuilder::_estimated_build_cost<CacheLevel::L2>(size_t build_row_size) const {
-    return build_row_size / 2;
+size_t AdaptivePartitionHashJoinBuilder::_estimate_cost_by_bytes<CacheLevel::L2>(size_t row_bytes) const {
+    return row_bytes / 2;
 }
 
 template <>
-size_t AdaptivePartitionHashJoinBuilder::_estimated_build_cost<CacheLevel::L3>(size_t build_row_size) const {
-    return build_row_size;
+size_t AdaptivePartitionHashJoinBuilder::_estimate_cost_by_bytes<CacheLevel::L3>(size_t row_bytes) const {
+    return row_bytes;
 }
 
 template <>
-size_t AdaptivePartitionHashJoinBuilder::_estimated_build_cost<CacheLevel::MEMORY>(size_t build_row_size) const {
-    return build_row_size * 2;
+size_t AdaptivePartitionHashJoinBuilder::_estimate_cost_by_bytes<CacheLevel::MEMORY>(size_t row_bytes) const {
+    return row_bytes * 2;
 }
-void AdaptivePartitionHashJoinBuilder::_adjust_partition_rows(size_t build_row_size) {
-    build_row_size = std::max(build_row_size, 4UL);
-    _fit_L2_cache_max_rows = _L2_cache_size / build_row_size;
-    _fit_L3_cache_max_rows = _L3_cache_size / build_row_size;
+bool AdaptivePartitionHashJoinBuilder::_need_partition_join_for_build(size_t ht_num_rows) const {
+    return (_partition_join_l2_min_rows < ht_num_rows && ht_num_rows <= _partition_join_l2_max_rows) ||
+           (_partition_join_l3_min_rows < ht_num_rows && ht_num_rows <= _partition_join_l3_max_rows);
+}
 
-    // If the hash table is smaller than the L2 cache, we don't think partition hash join is needed.
-    _partition_join_min_rows = _fit_L2_cache_max_rows;
-    // If the hash table after partitioning can't be loaded into L3, we don't think partition hash join is needed.
-    _partition_join_max_rows = _fit_L3_cache_max_rows * _partition_num;
+bool AdaptivePartitionHashJoinBuilder::_need_partition_join_for_append(size_t ht_num_rows) const {
+    return ht_num_rows <= _partition_join_l2_max_rows || ht_num_rows <= _partition_join_l3_max_rows;
+}
 
-    if (_probe_estimated_costs + _estimated_build_cost<CacheLevel::L2>(build_row_size) <
-        _estimated_build_cost<CacheLevel::L3>(build_row_size)) {
-        // overhead after hash table partitioning + probe extra cost < cost before partitioning
-        // nothing to do
-    } else if (_probe_estimated_costs + _estimated_build_cost<CacheLevel::L3>(build_row_size) <
-               _estimated_build_cost<CacheLevel::MEMORY>(build_row_size)) {
-        // It is only after this that performance gains can be realized beyond the L3 cache.
-        _partition_join_min_rows = _fit_L3_cache_max_rows;
+void AdaptivePartitionHashJoinBuilder::_adjust_partition_rows(size_t hash_table_bytes_per_row,
+                                                              size_t hash_table_probing_bytes_per_row) {
+    if (hash_table_bytes_per_row == _hash_table_bytes_per_row &&
+        hash_table_probing_bytes_per_row == _hash_table_probing_bytes_per_row) {
+        return; // No need to adjust partition rows.
+    }
+
+    _hash_table_bytes_per_row = hash_table_bytes_per_row;
+    _hash_table_probing_bytes_per_row = hash_table_probing_bytes_per_row;
+
+    _fit_L2_cache_max_rows = _L2_cache_size / hash_table_bytes_per_row;
+    _fit_L3_cache_max_rows = _L3_cache_size / hash_table_bytes_per_row;
+
+    _partition_join_l2_min_rows = -1;
+    _partition_join_l2_max_rows = 0;
+    _partition_join_l3_min_rows = -1;
+    _partition_join_l3_max_rows = 0;
+
+    const auto l2_benefit = _estimate_cost_by_bytes<CacheLevel::L3>(hash_table_probing_bytes_per_row) -
+                            _estimate_cost_by_bytes<CacheLevel::L2>(hash_table_probing_bytes_per_row);
+    const auto l3_benefit = _estimate_cost_by_bytes<CacheLevel::MEMORY>(hash_table_probing_bytes_per_row) -
+                            _estimate_cost_by_bytes<CacheLevel::L3>(hash_table_probing_bytes_per_row);
+
+    if (_probe_row_shuffle_cost < l3_benefit) { // Partitioned joins benefit from L3 cache.
+        // Partitioned joins benefit from the L3 cache when probing a row misses the cache in a non-partitioned join but not in a partitioned join.
+        // 1. min_rows > (l3_cache_size/hash_table_bytes_per_row)*(l3_benefit/(l3_benefit-_probe_row_shuffle_cost)), because:
+        //    - l3_benefit * non_partition_cache_miss_rate > _probe_row_shuffle_cost
+        //    - non_partition_cache_miss_rate = 1 - l3_cache_size/(min_rows*hash_table_bytes_per_row)
+        // 2. max_rows < (l3_cache_size/hash_table_bytes_per_row)*(l3_benefit/_probe_row_shuffle_cost)*num_partitions, because:
+        //    - l3_benefit * partition_cache_hit_rate > _probe_row_shuffle_cost
+        //    - partition_cache_hit_rate = l3_cache_size/(max_rows_per_partition*hash_table_bytes_per_row)
+        _partition_join_l3_min_rows = _fit_L3_cache_max_rows * l3_benefit / (l3_benefit - _probe_row_shuffle_cost);
+        _partition_join_l3_max_rows = _fit_L3_cache_max_rows * _partition_num * l3_benefit / _probe_row_shuffle_cost;
+        _partition_join_l3_max_rows *= 2; // Relax the restriction.
+
+        if (_probe_row_shuffle_cost < l2_benefit) { // Partitioned joins benefit from L2 cache.
+            _partition_join_l2_min_rows = _fit_L2_cache_max_rows * l2_benefit / (l2_benefit - _probe_row_shuffle_cost);
+            _partition_join_l2_min_rows *= 2; // Make the restriction more stringent.
+            _partition_join_l2_max_rows =
+                    (_fit_L2_cache_max_rows * _partition_num) * l2_benefit / _probe_row_shuffle_cost;
+        }
     } else {
         // Partitioned joins don't have performance gains. Not using partition hash join.
         _partition_num = 1;
     }
 
-    VLOG_OPERATOR << "TRACE:"
-                  << "partition_num=" << _partition_num << " partition_join_min_rows=" << _partition_join_min_rows
-                  << " partition_join_max_rows=" << _partition_join_max_rows << " probe cost=" << _probe_estimated_costs
-                  << " build cost L2=" << _estimated_build_cost<CacheLevel::L2>(build_row_size)
-                  << " build cost L3=" << _estimated_build_cost<CacheLevel::L3>(build_row_size)
-                  << " build cost Mem=" << _estimated_build_cost<CacheLevel::MEMORY>(build_row_size);
+    _l2_benefit = l2_benefit;
+    _l3_benefit = l3_benefit;
+
+    VLOG_OPERATOR << "TRACE: _adjust_partition_rows "
+                  << "[partition_num=" << _partition_num << "] "
+                  << "[partition_join_l2_min_rows=" << _partition_join_l2_min_rows << "] "
+                  << "[partition_join_l2_max_rows=" << _partition_join_l2_max_rows << "] "
+                  << "[partition_join_l3_min_rows=" << _partition_join_l3_min_rows << "] "
+                  << "[partition_join_l3_max_rows=" << _partition_join_l3_max_rows << "] "
+                  << "[hash_table_probing_bytes_per_row=" << hash_table_probing_bytes_per_row << "] "
+                  << "[hash_table_bytes_per_row=" << hash_table_bytes_per_row << "] "
+                  << "[l2_benefit=" << l2_benefit << "] "
+                  << "[l3_benefit=" << l3_benefit << "] "
+                  << "[probe_shuffle_cost=" << _probe_row_shuffle_cost << "] ";
 }
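The L3 thresholds in the comment above can be restated compactly. A sketch of the derivation (my notation, not text from the commit): let $B$ be hash_table_bytes_per_row, $C$ the L3 cache size, $g$ the l3_benefit, $s$ the probe-row shuffle cost, $P$ the partition count, and $n$ the build-side row count.

Let $R = C/B$ (this is \_fit\_L3\_cache\_max\_rows). Partitioning must recover more than the shuffle cost from the cache misses it eliminates:
\[
  g\left(1 - \frac{C}{nB}\right) > s \;\Longrightarrow\; n > R\,\frac{g}{g-s} \quad (\texttt{partition\_join\_l3\_min\_rows}),
\]
and each of the $P$ partitions must still fit the cache often enough to keep a benefit:
\[
  g\,\frac{C}{(n/P)\,B} > s \;\Longrightarrow\; n < R\,P\,\frac{g}{s} \quad (\texttt{partition\_join\_l3\_max\_rows}).
\]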
 void AdaptivePartitionHashJoinBuilder::_init_partition_nums(const HashTableParam& param) {
     _partition_num = 16;
 
-    size_t estimated_bytes_each_row = _estimated_row_size(param);
+    _probe_row_shuffle_cost =
+            std::max<size_t>(_estimate_cost_by_bytes<CacheLevel::L3>(_estimate_probe_row_bytes(param)), 1);
 
-    _probe_estimated_costs = _estimated_probe_cost(param);
-
-    _adjust_partition_rows(estimated_bytes_each_row);
+    const size_t hash_table_probing_bytes_per_row = _estimate_hash_table_probing_bytes_per_row(param);
+    _adjust_partition_rows(1, hash_table_probing_bytes_per_row);
 
-    COUNTER_SET(_hash_joiner.build_metrics().partition_nums, (int64_t)_partition_num);
+    COUNTER_SET(_hash_joiner.build_metrics().partition_nums, static_cast<int64_t>(_partition_num));
 }
 void AdaptivePartitionHashJoinBuilder::create(const HashTableParam& param) {
     _init_partition_nums(param);
 
+    if (_partition_num > 1) {
+        _partition_input_channels.resize(_partition_num, PartitionChunkChannel(&_mem_tracker));
+    }
     for (size_t i = 0; i < _partition_num; ++i) {
         _builders.emplace_back(std::make_unique<SingleHashJoinBuilder>(_hash_joiner));
         _builders.back()->create(param);
@@ -581,10 +693,14 @@ void AdaptivePartitionHashJoinBuilder::close() {
         builder->close();
     }
     _builders.clear();
+    _partition_input_channels.clear();
     _partition_num = 0;
-    _partition_join_min_rows = 0;
-    _partition_join_max_rows = 0;
-    _probe_estimated_costs = 0;
+    _partition_join_l2_min_rows = 0;
+    _partition_join_l2_max_rows = 0;
+    _partition_join_l3_min_rows = 0;
+    _partition_join_l3_max_rows = 0;
+    _probe_row_shuffle_cost = 0;
+    _hash_table_probing_bytes_per_row = 0;
     _fit_L2_cache_max_rows = 0;
     _fit_L3_cache_max_rows = 0;
     _pushed_chunks = 0;
@@ -639,17 +755,70 @@ int64_t AdaptivePartitionHashJoinBuilder::ht_mem_usage() const {
                            [](int64_t sum, const auto& builder) { return sum + builder->ht_mem_usage(); });
 }
 
-Status AdaptivePartitionHashJoinBuilder::_convert_to_single_partition() {
+Status AdaptivePartitionHashJoinBuilder::_convert_to_single_partition(RuntimeState* state) {
+    VLOG_OPERATOR << "TRACE: convert_to_single_partition "
+                  << "[partition_num=" << _partition_num << "] "
+                  << "[partition_join_l2_min_rows=" << _partition_join_l2_min_rows << "] "
+                  << "[partition_join_l2_max_rows=" << _partition_join_l2_max_rows << "] "
+                  << "[partition_join_l3_min_rows=" << _partition_join_l3_min_rows << "] "
+                  << "[partition_join_l3_max_rows=" << _partition_join_l3_max_rows << "] "
+                  << "[hash_table_row_count=" << hash_table_row_count() << "] ";
+
     // merge all partition data to the first partition
-    for (size_t i = 1; i < _builders.size(); ++i) {
-        _builders[0]->hash_table().merge_ht(_builders[i]->hash_table());
+    if (_stage == Stage::BUFFERING) {
+        _mem_tracker.set(0);
+        for (const auto& unpartition_chunk : _unpartition_chunks) {
+            RETURN_IF_ERROR(_builders[0]->do_append_chunk(state, unpartition_chunk));
+        }
+        _unpartition_chunks.clear();
+    } else {
+        for (size_t i = 0; i < _builders.size(); ++i) {
+            if (i != 0) {
+                _builders[0]->hash_table().merge_ht(_builders[i]->hash_table());
+            }
+            auto& channel = _partition_input_channels[i];
+            while (!channel.is_empty()) {
+                RETURN_IF_ERROR(_builders[0]->do_append_chunk(state, channel.pull()));
+            }
+        }
+        _partition_input_channels.clear();
     }
     _builders.resize(1);
 
     _partition_num = 1;
+    COUNTER_SET(_hash_joiner.build_metrics().partition_nums, static_cast<int64_t>(1));
 
     return Status::OK();
 }
-Status AdaptivePartitionHashJoinBuilder::_append_chunk_to_partitions(const ChunkPtr& chunk) {
+Status AdaptivePartitionHashJoinBuilder::_transfer_to_appending_stage(RuntimeState* state) {
+    _stage = Stage::APPENDING;
+    _mem_tracker.set(0); // All the buffered chunks are moved to the partition builders, so clear the memory tracker.
+    for (const auto& unpartition_chunk : _unpartition_chunks) {
+        RETURN_IF_ERROR(_append_chunk_to_partitions(state, unpartition_chunk));
+    }
+    _unpartition_chunks.clear();
+
+    return Status::OK();
+}
+Status AdaptivePartitionHashJoinBuilder::_do_append_chunk(RuntimeState* state, const ChunkPtr& chunk) {
+    if (_stage == Stage::BUFFERING) {
+        _mem_tracker.consume(chunk->memory_usage());
+        _unpartition_chunks.push_back(chunk);
+
+        const size_t num_rows = hash_table_row_count();
+        if (num_rows >= _partition_join_l2_min_rows || num_rows >= _partition_join_l3_min_rows) {
+            RETURN_IF_ERROR(_transfer_to_appending_stage(state));
+        }
+
+        return Status::OK();
+    } else {
+        return _append_chunk_to_partitions(state, chunk);
+    }
+}
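Note: `_do_append_chunk` is effectively a two-state machine: chunks are buffered whole until the hash table is large enough for partitioning to pay off, then the backlog and all later chunks go through the partitioner. A minimal sketch of the same control flow (simplified generic types, not the StarRocks classes):

    #include <cstddef>
    #include <utility>
    #include <vector>

    enum class Stage { BUFFERING, APPENDING };

    template <typename Chunk, typename PartitionFn>
    class TwoStageAppender {
    public:
        TwoStageAppender(size_t transfer_threshold_rows, PartitionFn partition)
                : _threshold(transfer_threshold_rows), _partition(std::move(partition)) {}

        void append(const Chunk& chunk, size_t total_rows_so_far) {
            if (_stage == Stage::BUFFERING) {
                _buffered.push_back(chunk);
                if (total_rows_so_far >= _threshold) {
                    // Transfer: replay the backlog through the partitioner, then switch stage.
                    for (const auto& buffered : _buffered) _partition(buffered);
                    _buffered.clear();
                    _stage = Stage::APPENDING;
                }
            } else {
                _partition(chunk);
            }
        }

    private:
        Stage _stage = Stage::BUFFERING;
        size_t _threshold;
        PartitionFn _partition;
        std::vector<Chunk> _buffered;
    };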
+Status AdaptivePartitionHashJoinBuilder::_append_chunk_to_partitions(RuntimeState* state, const ChunkPtr& chunk) {
     const std::vector<ExprContext*>& build_partition_keys = _hash_joiner.build_expr_ctxs();
 
     size_t num_rows = chunk->num_rows();
@@ -662,10 +831,10 @@ Status AdaptivePartitionHashJoinBuilder::_append_chunk_to_partitions(const Chunk
     }
     std::vector<uint32_t> hash_values;
     {
-        hash_values.assign(num_rows, HashUtil::FNV_SEED);
+        hash_values.assign(num_rows, 0);
         for (const ColumnPtr& column : partition_columns) {
-            column->fnv_hash(hash_values.data(), 0, num_rows);
+            column->crc32_hash(hash_values.data(), 0, num_rows);
         }
         // find partition id
         for (size_t i = 0; i < hash_values.size(); ++i) {
@@ -700,45 +869,83 @@ Status AdaptivePartitionHashJoinBuilder::_append_chunk_to_partitions(const Chunk
         if (size == 0) {
             continue;
         }
-        // TODO: make builder implement append with selective
-        auto partition_chunk = chunk->clone_empty();
-        partition_chunk->append_selective(*chunk, selection.data(), from, size);
-        RETURN_IF_ERROR(_builders[i]->append_chunk(std::move(partition_chunk)));
+
+        auto& channel = _partition_input_channels[i];
+
+        if (channel.is_empty()) {
+            channel.push(chunk->clone_empty());
+        }
+
+        if (channel.back()->num_rows() + size <= state->chunk_size()) {
+            channel.append_selective_to_back(*chunk, selection.data(), from, size);
+        } else {
+            channel.push(chunk->clone_empty());
+            channel.append_selective_to_back(*chunk, selection.data(), from, size);
+        }
+
+        while (channel.is_full()) {
+            RETURN_IF_ERROR(_builders[i]->append_chunk(state, channel.pull()));
+        }
     }
     return Status::OK();
 }
 
-Status AdaptivePartitionHashJoinBuilder::do_append_chunk(const ChunkPtr& chunk) {
-    if (_partition_num > 1 && hash_table_row_count() > _partition_join_max_rows) {
-        RETURN_IF_ERROR(_convert_to_single_partition());
+Status AdaptivePartitionHashJoinBuilder::do_append_chunk(RuntimeState* state, const ChunkPtr& chunk) {
+    if (_partition_num > 1 && !_need_partition_join_for_append(hash_table_row_count())) {
+        RETURN_IF_ERROR(_convert_to_single_partition(state));
     }
 
     if (_partition_num > 1 && ++_pushed_chunks % 8 == 0) {
-        size_t build_row_size = ht_mem_usage() / hash_table_row_count();
-        _adjust_partition_rows(build_row_size);
+        const size_t build_row_size = (ht_mem_usage() + _mem_tracker.consumption()) / hash_table_row_count();
+        _adjust_partition_rows(build_row_size, _hash_table_probing_bytes_per_row);
         if (_partition_num == 1) {
-            RETURN_IF_ERROR(_convert_to_single_partition());
+            RETURN_IF_ERROR(_convert_to_single_partition(state));
         }
     }
 
     if (_partition_num > 1) {
-        RETURN_IF_ERROR(_append_chunk_to_partitions(chunk));
+        RETURN_IF_ERROR(_do_append_chunk(state, chunk));
     } else {
-        RETURN_IF_ERROR(_builders[0]->do_append_chunk(chunk));
+        RETURN_IF_ERROR(_builders[0]->do_append_chunk(state, chunk));
     }
 
     return Status::OK();
 }
 
+Status AdaptivePartitionHashJoinBuilder::prepare_for_spill_start(RuntimeState* state) {
+    if (_partition_num > 1) {
+        return _flush_buffer_chunks(state);
+    }
+    return Status::OK();
+}
+
+ChunkPtr AdaptivePartitionHashJoinBuilder::convert_to_spill_schema(const ChunkPtr& chunk) const {
+    return _builders[0]->convert_to_spill_schema(chunk);
+}
+
+Status AdaptivePartitionHashJoinBuilder::_flush_buffer_chunks(RuntimeState* state) {
+    if (_stage == Stage::BUFFERING) {
+        RETURN_IF_ERROR(_transfer_to_appending_stage(state));
+    }
+    for (size_t i = 0; i < _partition_input_channels.size(); ++i) {
+        auto& channel = _partition_input_channels[i];
+        while (!channel.is_empty()) {
+            RETURN_IF_ERROR(_builders[i]->do_append_chunk(state, channel.pull()));
+        }
+    }
+
+    return Status::OK();
+}
+
 Status AdaptivePartitionHashJoinBuilder::build(RuntimeState* state) {
     DCHECK_EQ(_partition_num, _builders.size());
 
-    if (_partition_num > 1 && hash_table_row_count() < _partition_join_min_rows) {
-        RETURN_IF_ERROR(_convert_to_single_partition());
+    if (_partition_num > 1) {
+        if (!_need_partition_join_for_build(hash_table_row_count())) {
+            RETURN_IF_ERROR(_convert_to_single_partition(state));
+        } else {
+            RETURN_IF_ERROR(_flush_buffer_chunks(state));
+        }
     }
 
     for (auto& builder : _builders) {
@@ -771,17 +978,20 @@ std::unique_ptr<HashJoinProberImpl> AdaptivePartitionHashJoinBuilder::create_pro
     }
 }
 
-void AdaptivePartitionHashJoinBuilder::clone_readable(HashJoinBuilder* builder) {
+void AdaptivePartitionHashJoinBuilder::clone_readable(HashJoinBuilder* other_builder) {
     for (auto& builder : _builders) {
         DCHECK(builder->ready());
     }
     DCHECK(_ready);
     DCHECK_EQ(_partition_num, _builders.size());
-    auto other = down_cast<AdaptivePartitionHashJoinBuilder*>(builder);
+    auto other = down_cast<AdaptivePartitionHashJoinBuilder*>(other_builder);
     other->_builders.clear();
     other->_partition_num = _partition_num;
-    other->_partition_join_max_rows = _partition_join_max_rows;
-    other->_partition_join_min_rows = _partition_join_min_rows;
+    other->_partition_join_l2_min_rows = _partition_join_l2_min_rows;
+    other->_partition_join_l2_max_rows = _partition_join_l2_max_rows;
+    other->_partition_join_l3_min_rows = _partition_join_l3_min_rows;
+    other->_partition_join_l3_max_rows = _partition_join_l3_max_rows;
     other->_ready = _ready;
     for (size_t i = 0; i < _partition_num; ++i) {
         other->_builders.emplace_back(std::make_unique<SingleHashJoinBuilder>(_hash_joiner));
@@ -92,11 +92,11 @@ public:
     virtual void create(const HashTableParam& param) = 0;
 
     // append chunk to hash table
-    Status append_chunk(const ChunkPtr& chunk) {
+    Status append_chunk(RuntimeState* state, const ChunkPtr& chunk) {
         _inc_row_count(chunk->num_rows());
-        return do_append_chunk(chunk);
+        return do_append_chunk(state, chunk);
     }
-    virtual Status do_append_chunk(const ChunkPtr& chunk) = 0;
+    virtual Status do_append_chunk(RuntimeState* state, const ChunkPtr& chunk) = 0;
 
     virtual Status build(RuntimeState* state) = 0;
 
@@ -125,6 +125,7 @@ public:
     // clone readable to the builder
     virtual void clone_readable(HashJoinBuilder* builder) = 0;
 
+    virtual Status prepare_for_spill_start(RuntimeState* state) { return Status::OK(); }
     virtual ChunkPtr convert_to_spill_schema(const ChunkPtr& chunk) const = 0;
 
 protected:
@@ -149,7 +150,7 @@ public:
 
     void reset(const HashTableParam& param) override;
 
-    Status do_append_chunk(const ChunkPtr& chunk) override;
+    Status do_append_chunk(RuntimeState* state, const ChunkPtr& chunk) override;
 
     Status build(RuntimeState* state) override;
 
@@ -483,8 +483,8 @@ pipeline::OpFactories HashJoinNode::_decompose_to_pipeline(pipeline::PipelineBui
     HashJoinerParam param(pool, _hash_join_node, _is_null_safes, _build_expr_ctxs, _probe_expr_ctxs,
                           _other_join_conjunct_ctxs, _conjunct_ctxs, child(1)->row_desc(), child(0)->row_desc(),
                           child(1)->type(), child(0)->type(), child(1)->conjunct_ctxs().empty(), _build_runtime_filters,
-                          _output_slots, _output_slots, _distribution_mode, _enable_late_materialization,
-                          _enable_partition_hash_join, _is_skew_join);
+                          _output_slots, _output_slots, context->degree_of_parallelism(), _distribution_mode,
+                          _enable_late_materialization, _enable_partition_hash_join, _is_skew_join);
     auto hash_joiner_factory = std::make_shared<starrocks::pipeline::HashJoinerFactory>(param);
 
     // Create a shared RefCountedRuntimeFilterCollector
@@ -82,6 +82,7 @@ HashJoiner::HashJoiner(const HashJoinerParam& param)
           _probe_output_slots(param._probe_output_slots),
           _build_runtime_filters(param._build_runtime_filters.begin(), param._build_runtime_filters.end()),
           _enable_late_materialization(param._enable_late_materialization),
+          _max_dop(param._max_dop),
           _is_skew_join(param._is_skew_join) {
     _is_push_down = param._hash_join_node.is_push_down;
     if (_join_type == TJoinOp::LEFT_ANTI_JOIN && param._hash_join_node.is_rewritten_from_not_in) {
@@ -178,7 +179,7 @@ void HashJoiner::_init_hash_table_param(HashTableParam* param, RuntimeState* sta
         }
     }
 }
-Status HashJoiner::append_chunk_to_ht(const ChunkPtr& chunk) {
+Status HashJoiner::append_chunk_to_ht(RuntimeState* state, const ChunkPtr& chunk) {
     if (_phase != HashJoinPhase::BUILD) {
         return Status::OK();
     }
@@ -187,7 +188,7 @@ Status HashJoiner::append_chunk_to_ht(const ChunkPtr& chunk) {
     }
 
     update_build_rows(chunk->num_rows());
-    return _hash_join_builder->append_chunk(chunk);
+    return _hash_join_builder->append_chunk(state, chunk);
 }
 
 Status HashJoiner::append_chunk_to_spill_buffer(RuntimeState* state, const ChunkPtr& chunk) {
@@ -70,7 +70,7 @@ struct HashJoinerParam {
                     const RowDescriptor& build_row_descriptor, const RowDescriptor& probe_row_descriptor,
                     TPlanNodeType::type build_node_type, TPlanNodeType::type probe_node_type,
                     bool build_conjunct_ctxs_is_empty, std::list<RuntimeFilterBuildDescriptor*> build_runtime_filters,
-                    std::set<SlotId> build_output_slots, std::set<SlotId> probe_output_slots,
+                    std::set<SlotId> build_output_slots, std::set<SlotId> probe_output_slots, size_t max_dop,
                     const TJoinDistributionMode::type distribution_mode, bool enable_late_materialization,
                     bool enable_partition_hash_join, bool is_skew_join)
             : _pool(pool),
@@ -88,6 +88,7 @@ struct HashJoinerParam {
               _build_runtime_filters(std::move(build_runtime_filters)),
               _build_output_slots(std::move(build_output_slots)),
               _probe_output_slots(std::move(probe_output_slots)),
+              _max_dop(max_dop),
               _distribution_mode(distribution_mode),
               _enable_late_materialization(enable_late_materialization),
               _enable_partition_hash_join(enable_partition_hash_join),
@@ -113,6 +114,8 @@ struct HashJoinerParam {
     std::set<SlotId> _build_output_slots;
     std::set<SlotId> _probe_output_slots;
 
+    size_t _max_dop;
+
     const TJoinDistributionMode::type _distribution_mode;
     const bool _enable_late_materialization;
     const bool _enable_partition_hash_join;
@@ -205,7 +208,7 @@ public:
 
     void enter_eos_phase() { _phase = HashJoinPhase::EOS; }
     // build phase
-    Status append_chunk_to_ht(const ChunkPtr& chunk);
+    Status append_chunk_to_ht(RuntimeState* state, const ChunkPtr& chunk);
 
     Status append_chunk_to_spill_buffer(RuntimeState* state, const ChunkPtr& chunk);
 
@@ -343,6 +346,9 @@ public:
         return DeferOp([this]() { _probe_observable.notify_source_observers(); });
     }
 
+    size_t max_dop() const { return _max_dop; }
+    TJoinDistributionMode::type distribution_mode() const { return _hash_join_node.distribution_mode; }
+
 private:
     static bool _has_null(const ColumnPtr& column);
 
@@ -361,7 +367,7 @@ private:
             const_column->data_column()->assign(chunk->num_rows(), 0);
             key_columns.emplace_back(const_column->data_column());
         } else {
-            key_columns.emplace_back(column_ptr);
+            key_columns.emplace_back(std::move(column_ptr));
         }
     }
     return Status::OK();
@@ -483,6 +489,8 @@ private:
     pipeline::Observable _builder_observable;
     pipeline::Observable _probe_observable;
 
+    size_t _max_dop = 0;
+
     bool _is_skew_join = false;
 };
 
@@ -657,6 +657,21 @@ void JoinHashTable::merge_ht(const JoinHashTable& ht) {
         }
         columns[i]->append(*other_columns[i], 1, other_columns[i]->size() - 1);
     }
+
+    auto& key_columns = _table_items->key_columns;
+    auto& other_key_columns = ht._table_items->key_columns;
+    for (size_t i = 0; i < key_columns.size(); i++) {
+        // If the join key is a slot ref, it is read from the build chunk directly;
+        // otherwise it is appended from the key_column of the input.
+        if (_table_items->join_keys[i].col_ref == nullptr) {
+            // upgrade to nullable column
+            if (!key_columns[i]->is_nullable() && other_key_columns[i]->is_nullable()) {
+                const size_t row_count = key_columns[i]->size();
+                key_columns[i] = NullableColumn::create(key_columns[i], NullColumn::create(row_count, 0));
+            }
+            key_columns[i]->append(*other_key_columns[i]);
+        }
+    }
 }
 
 ChunkPtr JoinHashTable::convert_to_spill_schema(const ChunkPtr& chunk) const {
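Note on the nullable upgrade in merge_ht: if one side's key column is nullable and the other's is not, a plain append would fail, so the non-nullable column is first wrapped with an all-zero (everything non-null) null map. A rough standalone sketch of the same idea (simplified column model, not the StarRocks column classes):

    #include <cstdint>
    #include <vector>

    // Simplified column model for illustration.
    struct Column {
        std::vector<int64_t> data;
        std::vector<uint8_t> null_map; // empty => column is not nullable
        bool is_nullable() const { return !null_map.empty(); }
        size_t size() const { return data.size(); }
    };

    // Wrap a non-nullable column so it can accept rows from a nullable one:
    // every existing row gets a 0 ("not null") entry in the null map.
    void upgrade_to_nullable(Column& column) {
        if (!column.is_nullable()) {
            column.null_map.assign(column.size(), 0);
        }
    }

    void append_all(Column& dst, const Column& src) {
        if (!dst.is_nullable() && src.is_nullable()) {
            upgrade_to_nullable(dst);
        }
        dst.data.insert(dst.data.end(), src.data.begin(), src.data.end());
        if (dst.is_nullable()) {
            std::vector<uint8_t> src_nulls =
                    src.is_nullable() ? src.null_map : std::vector<uint8_t>(src.size(), 0);
            dst.null_map.insert(dst.null_map.end(), src_nulls.begin(), src_nulls.end());
        }
    }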
@@ -38,7 +38,7 @@ HashJoinBuildOperator::HashJoinBuildOperator(OperatorFactory* factory, int32_t i
           _distribution_mode(distribution_mode) {}
 
 Status HashJoinBuildOperator::push_chunk(RuntimeState* state, const ChunkPtr& chunk) {
-    return _join_builder->append_chunk_to_ht(chunk);
+    return _join_builder->append_chunk_to_ht(state, chunk);
 }
 
 Status HashJoinBuildOperator::prepare(RuntimeState* state) {
@@ -85,6 +85,7 @@ Status SpillableHashJoinBuildOperator::set_finishing(RuntimeState* state) {
     if (!_join_builder->spiller()->spilled()) {
         DCHECK(_is_first_time_spill);
         _is_first_time_spill = false;
+        RETURN_IF_ERROR(_join_builder->hash_join_builder()->prepare_for_spill_start(runtime_state()));
         RETURN_IF_ERROR(init_spiller_partitions(state, _join_builder->hash_join_builder()));
         ASSIGN_OR_RETURN(_hash_table_slice_iterator, _convert_hash_map_to_chunk());
         RETURN_IF_ERROR(_join_builder->append_spill_task(state, _hash_table_slice_iterator));
@@ -201,6 +202,7 @@ Status SpillableHashJoinBuildOperator::push_chunk(RuntimeState* state, const Chu
 
     // Estimate the appropriate number of partitions
     if (_is_first_time_spill) {
+        RETURN_IF_ERROR(_join_builder->hash_join_builder()->prepare_for_spill_start(runtime_state()));
         RETURN_IF_ERROR(init_spiller_partitions(state, _join_builder->hash_join_builder()));
     }
 
@@ -280,7 +280,7 @@ Status SpillableHashJoinProbeOperator::_load_partition_build_side(workgroup::Yie
 
         if (chunk_st.ok() && chunk_st.value() != nullptr && !chunk_st.value()->is_empty()) {
             int64_t old_mem_usage = hash_table_mem_usage;
-            RETURN_IF_ERROR(builder->append_chunk(std::move(chunk_st.value())));
+            RETURN_IF_ERROR(builder->append_chunk(state, std::move(chunk_st.value())));
             hash_table_mem_usage = builder->ht_mem_usage();
             COUNTER_ADD(metrics.build_partition_peak_memory_usage, hash_table_mem_usage - old_mem_usage);
         } else if (chunk_st.status().is_end_of_file()) {
@@ -0,0 +1,133 @@
+-- name: test_partition_hash_join
+CREATE TABLE __row_util_base (
+    k1 bigint NULL
+) ENGINE=OLAP
+DUPLICATE KEY(`k1`)
+DISTRIBUTED BY HASH(`k1`) BUCKETS 32
+PROPERTIES (
+    "replication_num" = "1"
+);
+-- result:
+-- !result
+insert into __row_util_base select generate_series from TABLE(generate_series(0, 10000 - 1));
+-- result:
+-- !result
+insert into __row_util_base select * from __row_util_base; -- 20000
+insert into __row_util_base select * from __row_util_base; -- 40000
+insert into __row_util_base select * from __row_util_base; -- 80000
+insert into __row_util_base select * from __row_util_base; -- 160000
+insert into __row_util_base select * from __row_util_base; -- 320000
+insert into __row_util_base select * from __row_util_base; -- 640000
+insert into __row_util_base select * from __row_util_base; -- 1280000
+insert into __row_util_base select * from __row_util_base; -- 2560000
+insert into __row_util_base select * from __row_util_base; -- 5120000
+CREATE TABLE __row_util (
+    idx bigint NULL
+) ENGINE=OLAP
+DUPLICATE KEY(`idx`)
+DISTRIBUTED BY HASH(`idx`) BUCKETS 32
+PROPERTIES (
+    "replication_num" = "1"
+);
+-- result:
+-- !result
+insert into __row_util select row_number() over() as idx from __row_util_base;
+-- result:
+-- !result
+CREATE TABLE t1 (
+    k1 bigint NULL,
+
+    c_bigint_null bigint NULL,
+    c_string STRING
+) ENGINE=OLAP
+DUPLICATE KEY(`k1`)
+DISTRIBUTED BY HASH(`k1`) BUCKETS 96
+PROPERTIES (
+    "replication_num" = "1"
+);
+-- result:
+-- !result
+insert into t1
+select
+    idx,
+    idx, -- c_bigint_null
+    uuid() -- c_string
+from __row_util;
+-- result:
+-- !result
+select count(tt1.k1) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null);
+-- result:
+5120000
+-- !result
+select count(tt1.k1) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 2 = 0;
+-- result:
+2560000
+-- !result
+select count(tt1.k1) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 3 = 0;
+-- result:
+1706666
+-- !result
+select count(tt1.k1) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 5 = 0;
+-- result:
+1024000
+-- !result
+select count(tt1.k1) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 10 = 0;
+-- result:
+512000
+-- !result
+select count(tt1.k1) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 100 = 0;
+-- result:
+51200
+-- !result
+select count(tt1.c_string) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null);
+-- result:
+5120000
+-- !result
+select count(tt1.c_string) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 2 = 0;
+-- result:
+2560000
+-- !result
+select count(tt1.c_string) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 3 = 0;
+-- result:
+1706666
+-- !result
+select count(tt1.c_string) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 5 = 0;
+-- result:
+1024000
+-- !result
+select count(tt1.c_string) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 10 = 0;
+-- result:
+512000
+-- !result
+select count(tt1.c_string) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 100 = 0;
+-- result:
+51200
+-- !result
+select count(tt2.c_string) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null);
+-- result:
+5120000
+-- !result
+select count(tt2.c_string) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 2 = 0;
+-- result:
+2560000
+-- !result
+select count(tt2.c_string) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 3 = 0;
+-- result:
+1706666
+-- !result
+select count(tt2.c_string) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 5 = 0;
+-- result:
+1024000
+-- !result
+select count(tt2.c_string) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 10 = 0;
+-- result:
+512000
+-- !result
+select count(tt2.c_string) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 100 = 0;
+-- result:
+51200
+-- !result
+select count(tt2.c_string) from t1 tt1 join [broadcast] t1 tt2 using(c_string);
+-- result:
+5120000
+-- !result
@@ -0,0 +1,74 @@
+-- name: test_partition_hash_join
+
+CREATE TABLE __row_util_base (
+    k1 bigint NULL
+) ENGINE=OLAP
+DUPLICATE KEY(`k1`)
+DISTRIBUTED BY HASH(`k1`) BUCKETS 32
+PROPERTIES (
+    "replication_num" = "1"
+);
+insert into __row_util_base select generate_series from TABLE(generate_series(0, 10000 - 1));
+insert into __row_util_base select * from __row_util_base; -- 20000
+insert into __row_util_base select * from __row_util_base; -- 40000
+insert into __row_util_base select * from __row_util_base; -- 80000
+insert into __row_util_base select * from __row_util_base; -- 160000
+insert into __row_util_base select * from __row_util_base; -- 320000
+insert into __row_util_base select * from __row_util_base; -- 640000
+insert into __row_util_base select * from __row_util_base; -- 1280000
+insert into __row_util_base select * from __row_util_base; -- 2560000
+insert into __row_util_base select * from __row_util_base; -- 5120000
+CREATE TABLE __row_util (
+    idx bigint NULL
+) ENGINE=OLAP
+DUPLICATE KEY(`idx`)
+DISTRIBUTED BY HASH(`idx`) BUCKETS 32
+PROPERTIES (
+    "replication_num" = "1"
+);
+insert into __row_util select row_number() over() as idx from __row_util_base;
+
+
+CREATE TABLE t1 (
+    k1 bigint NULL,
+
+    c_bigint_null bigint NULL,
+    c_string STRING
+) ENGINE=OLAP
+DUPLICATE KEY(`k1`)
+DISTRIBUTED BY HASH(`k1`) BUCKETS 96
+PROPERTIES (
+    "replication_num" = "1"
+);
+
+insert into t1
+select
+    idx,
+    idx, -- c_bigint_null
+    uuid() -- c_string
+from __row_util;
+
+select count(tt1.k1) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null);
+select count(tt1.k1) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 2 = 0;
+select count(tt1.k1) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 3 = 0;
+select count(tt1.k1) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 5 = 0;
+select count(tt1.k1) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 10 = 0;
+select count(tt1.k1) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 100 = 0;
+
+
+select count(tt1.c_string) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null);
+select count(tt1.c_string) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 2 = 0;
+select count(tt1.c_string) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 3 = 0;
+select count(tt1.c_string) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 5 = 0;
+select count(tt1.c_string) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 10 = 0;
+select count(tt1.c_string) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 100 = 0;
+
+select count(tt2.c_string) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null);
+select count(tt2.c_string) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 2 = 0;
+select count(tt2.c_string) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 3 = 0;
+select count(tt2.c_string) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 5 = 0;
+select count(tt2.c_string) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 10 = 0;
+select count(tt2.c_string) from t1 tt1 join [broadcast] t1 tt2 using(c_bigint_null) where tt1.k1 % 100 = 0;
+
+select count(tt2.c_string) from t1 tt1 join [broadcast] t1 tt2 using(c_string);