[BugFix] make runtime filter partitions bucket-aware (backport #62191) (#62208)

Signed-off-by: zombee0 <ewang2027@gmail.com>
Co-authored-by: zombee0 <ewang2027@gmail.com>
Authored by mergify[bot] on 2025-08-22 14:03:02 +08:00, committed by GitHub
parent 4fe0b95635
commit b44663ddfc
19 changed files with 272 additions and 52 deletions

View File

@ -57,6 +57,7 @@ set(EXEC_FILES
aggregate/distinct_blocking_node.cpp
aggregate/aggregate_streaming_node.cpp
aggregate/distinct_streaming_node.cpp
partition/bucket_aware_partition.cpp
partition/chunks_partitioner.cpp
partition/partition_hash_variant.cpp
analytic_node.cpp

View File

@ -0,0 +1,63 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "exec/partition/bucket_aware_partition.h"
#include "column/nullable_column.h"
#include "gutil/casts.h"
namespace starrocks {
void calc_hash_values_and_bucket_ids(const std::vector<const Column*>& partitions_columns,
BucketAwarePartitionCtx ctx) {
size_t num_rows = partitions_columns[0]->size();
const auto& bucket_properties = ctx.bucket_properties;
auto& hash_values = ctx.hash_values;
auto& bucket_ids = ctx.bucket_ids;
auto& round_hashes = ctx.round_hashes;
auto& round_ids = ctx.round_ids;
hash_values.assign(num_rows, 0);
bucket_ids.assign(num_rows, 0);
for (int i = 0; i < partitions_columns.size(); ++i) {
// TODO, enhance it if we try to support more bucket functions.
DCHECK(bucket_properties[i].bucket_func == TBucketFunction::MURMUR3_X86_32);
round_hashes.assign(num_rows, 0);
round_ids.assign(num_rows, 0);
partitions_columns[i]->murmur_hash3_x86_32(&round_hashes[0], 0, num_rows);
for (int j = 0; j < num_rows; j++) {
hash_values[j] ^= round_hashes[j];
round_ids[j] = (round_hashes[j] & std::numeric_limits<int>::max()) % bucket_properties[i].bucket_num;
}
if (partitions_columns[i]->has_null()) {
const auto& null_data = down_cast<const NullableColumn*>(partitions_columns[i])->null_column()->get_data();
for (int j = 0; j < num_rows; j++) {
round_ids[j] = null_data[j] ? bucket_properties[i].bucket_num : round_ids[j];
}
}
if (i == partitions_columns.size() - 1) {
for (int j = 0; j < num_rows; j++) {
bucket_ids[j] += round_ids[j];
}
} else {
for (int j = 0; j < num_rows; j++) {
// bucket mapping, same behavior as FE
bucket_ids[j] = (round_ids[j] + bucket_ids[j]) * (bucket_properties[i + 1].bucket_num + 1);
}
}
}
}
} // namespace starrocks
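
Editor's note: to make the composition above concrete, here is a minimal standalone sketch (not StarRocks code) with two assumed bucket columns of bucket_num 4 and 8. Each column yields a round id in [0, bucket_num], where the extra slot bucket_num is reserved for NULL, so the next column's bucket_num + 1 acts as the radix when folding ids together, matching the FE mapping.

// Standalone sketch: fold per-column round ids into one bucket id.
#include <cstdint>
#include <iostream>
#include <vector>

int main() {
    std::vector<uint32_t> bucket_nums = {4, 8}; // assumed per-column bucket counts
    std::vector<uint32_t> round_ids = {3, 5};   // per-row ids: hash % bucket_num, or bucket_num for NULL

    uint32_t bucket_id = 0;
    for (size_t i = 0; i < bucket_nums.size(); ++i) {
        if (i + 1 == bucket_nums.size()) {
            bucket_id += round_ids[i]; // last column: plain add
        } else {
            // same composition as the FE: reserve bucket_num + 1 slots for the next column
            bucket_id = (bucket_id + round_ids[i]) * (bucket_nums[i + 1] + 1);
        }
    }
    std::cout << bucket_id << std::endl; // (0 + 3) * (8 + 1) + 5 = 32
    return 0;
}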

View File

@ -0,0 +1,42 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <vector>
#include "column/column.h"
#include "gen_cpp/Partitions_types.h"
namespace starrocks {
struct BucketAwarePartitionCtx {
BucketAwarePartitionCtx(const std::vector<TBucketProperty>& bucket_properties, std::vector<uint32_t>& hash_values,
std::vector<uint32_t>& round_hashes, std::vector<uint32_t>& bucket_ids,
std::vector<uint32_t>& round_ids)
: bucket_properties(bucket_properties),
hash_values(hash_values),
round_hashes(round_hashes),
bucket_ids(bucket_ids),
round_ids(round_ids) {}
const std::vector<TBucketProperty>& bucket_properties;
std::vector<uint32_t>& hash_values;
std::vector<uint32_t>& round_hashes;
std::vector<uint32_t>& bucket_ids;
std::vector<uint32_t>& round_ids;
};
void calc_hash_values_and_bucket_ids(const std::vector<const Column*>& partitions_columns, BucketAwarePartitionCtx ctx);
} // namespace starrocks
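
Editor's note: a hedged usage sketch of this header, mirroring the ExchangeSinkOperator call site further down. The context holds only references, so the caller owns (and can reuse) the scratch vectors across chunks; the function name and parameter names below are illustrative, not part of the API.

// Illustrative call site, assuming ColumnPtr (shared_ptr<Column>) is visible via the includes above.
#include "exec/partition/bucket_aware_partition.h"

namespace starrocks {
void example_partition_chunk(const std::vector<ColumnPtr>& partition_columns, // materialized partition exprs
                             const std::vector<TBucketProperty>& bucket_properties,
                             std::vector<uint32_t>& hash_values, // reusable scratch buffers owned by the caller
                             std::vector<uint32_t>& round_hashes, std::vector<uint32_t>& bucket_ids,
                             std::vector<uint32_t>& round_ids) {
    std::vector<const Column*> raw_columns;
    raw_columns.reserve(partition_columns.size());
    for (const auto& col : partition_columns) {
        raw_columns.emplace_back(col.get());
    }
    BucketAwarePartitionCtx ctx(bucket_properties, hash_values, round_hashes, bucket_ids, round_ids);
    calc_hash_values_and_bucket_ids(raw_columns, ctx);
    // hash_values[j] now holds the xor of per-column murmur hashes for row j,
    // bucket_ids[j] the FE-compatible bucket sequence for row j.
}
} // namespace starrocks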

View File

@ -23,6 +23,7 @@
#include <utility>
#include "common/config.h"
#include "exec/partition/bucket_aware_partition.h"
#include "exec/pipeline/exchange/shuffler.h"
#include "exec/pipeline/exchange/sink_buffer.h"
#include "exprs/expr.h"
@ -640,38 +641,13 @@ Status ExchangeSinkOperator::push_chunk(RuntimeState* state, const ChunkPtr& chu
}
void ExchangeSinkOperator::_calc_hash_values_and_bucket_ids() {
size_t num_rows = _partitions_columns[0]->size();
_hash_values.assign(num_rows, 0);
_bucket_ids.assign(num_rows, 0);
for (int i = 0; i < _partitions_columns.size(); ++i) {
// TODO, enhance it if we try to support more bucket functions.
DCHECK(_bucket_properties[i].bucket_func == TBucketFunction::MURMUR3_X86_32);
_round_hashes.assign(num_rows, 0);
_round_ids.assign(num_rows, 0);
_partitions_columns[i]->murmur_hash3_x86_32(&_round_hashes[0], 0, num_rows);
for (int j = 0; j < num_rows; j++) {
_hash_values[j] ^= _round_hashes[j];
_round_ids[j] = (_round_hashes[j] & std::numeric_limits<int>::max()) % _bucket_properties[i].bucket_num;
}
if (_partitions_columns[i]->has_null()) {
const auto& null_data =
down_cast<const NullableColumn*>(_partitions_columns[i].get())->null_column()->get_data();
for (int j = 0; j < num_rows; j++) {
_round_ids[j] = null_data[j] ? _bucket_properties[i].bucket_num : _round_ids[j];
}
}
if (i == _partitions_columns.size() - 1) {
for (int j = 0; j < num_rows; j++) {
_bucket_ids[j] += _round_ids[j];
}
} else {
for (int j = 0; j < num_rows; j++) {
// bucket mapping, same behavior as FE
_bucket_ids[j] = (_round_ids[j] + _bucket_ids[j]) * (_bucket_properties[i + 1].bucket_num + 1);
}
}
std::vector<const Column*> partitions_columns;
for (size_t i = 0; i < _partitions_columns.size(); i++) {
partitions_columns.emplace_back(_partitions_columns[i].get());
}
BucketAwarePartitionCtx bctx(_bucket_properties, _hash_values, _round_hashes, _bucket_ids, _round_ids);
calc_hash_values_and_bucket_ids(partitions_columns, bctx);
}
void ExchangeSinkOperator::update_metrics(RuntimeState* state) {

View File

@ -26,6 +26,7 @@
#include "column/vectorized_fwd.h"
#include "common/global_types.h"
#include "common/object_pool.h"
#include "exec/partition/bucket_aware_partition.h"
#include "exec/pipeline/exchange/shuffler.h"
#include "exprs/runtime_filter_layout.h"
#include "gen_cpp/PlanNodes_types.h"
@ -246,6 +247,9 @@ public:
bool use_merged_selection;
bool compatibility = true;
std::vector<uint32_t> hash_values;
std::vector<uint32_t> round_hashes;
std::vector<uint32_t> bucket_ids;
std::vector<uint32_t> round_ids;
};
virtual RuntimeFilterSerializeType type() const { return RuntimeFilterSerializeType::NONE; }
@ -359,6 +363,12 @@ concept HashValueForEachFunc = requires(F f, size_t index, uint32_t& hash_value)
->std::same_as<void>;
};
template <typename F>
concept HashValueAndBucketIdForEachFunc = requires(F f, size_t index, uint32_t& hash_value, uint32_t& bucket_id) {
{ f(index, hash_value, bucket_id) }
->std::same_as<void>;
};
struct FullScanIterator {
typedef void (Column::*HashFuncType)(uint32_t*, uint32_t, uint32_t) const;
static constexpr HashFuncType FNV_HASH = &Column::fnv_hash;
@ -384,6 +394,22 @@ struct FullScanIterator {
size_t num_rows;
};
struct BucketAwareFullScanIterator {
BucketAwareFullScanIterator(BucketAwarePartitionCtx& ctx, size_t num_rows) : ctx(ctx), num_rows(num_rows) {}
template <HashValueAndBucketIdForEachFunc ForEachFuncType>
void for_each(ForEachFuncType func) {
for (size_t i = 0; i < num_rows; i++) {
func(i, ctx.hash_values[i], ctx.bucket_ids[i]);
}
}
void compute_hash(const std::vector<const Column*>& columns) { calc_hash_values_and_bucket_ids(columns, ctx); }
BucketAwarePartitionCtx& ctx;
size_t num_rows;
};
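
Editor's note: a hedged sketch of how this iterator is meant to be driven (the real consumer is the WithModuloArg specialization below): compute_hash fills the shared context once per chunk, then for_each hands each row's hash value and bucket id to any callable satisfying HashValueAndBucketIdForEachFunc. The function and variable names here are assumptions.

// Illustrative driver for BucketAwareFullScanIterator.
inline void example_drive_iterator(BucketAwarePartitionCtx& ctx, size_t num_rows,
                                   const std::vector<const Column*>& partition_columns,
                                   const std::vector<int32_t>& bucketseq_to_instance) {
    BucketAwareFullScanIterator iterator(ctx, num_rows);
    iterator.compute_hash(partition_columns); // delegates to calc_hash_values_and_bucket_ids
    iterator.for_each([&](size_t i, uint32_t& hash_value, uint32_t& bucket_id) {
        // e.g. a single-level bucket layout: route the row to the bucket's instance
        hash_value = bucketseq_to_instance[bucket_id];
    });
}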
struct SelectionIterator {
typedef void (Column::*HashFuncType)(uint32_t*, uint8_t*, uint16_t, uint16_t) const;
static constexpr HashFuncType FNV_HASH = &Column::fnv_hash_with_selection;
@ -505,6 +531,41 @@ struct WithModuloArg {
};
};
template <>
struct WithModuloArg<ModuloOp, BucketAwareFullScanIterator> {
template <TRuntimeFilterLayoutMode::type M>
struct HashValueCompute {
void operator()(const RuntimeFilterLayout& layout, const std::vector<const Column*>& columns,
size_t real_num_partitions, BucketAwareFullScanIterator iterator) const {
if constexpr (layout_is_singleton<M>) {
iterator.for_each([&](size_t i, uint32_t& hash_value, uint32_t& bucket_id) { hash_value = 0; });
return;
}
if constexpr (layout_is_bucket<M>) {
[[maybe_unused]] const auto& bucketseq_to_instance = layout.bucketseq_to_instance();
[[maybe_unused]] const auto num_drivers_per_instance = layout.num_drivers_per_instance();
iterator.compute_hash(columns);
iterator.for_each([&](size_t i, uint32_t& hash_value, uint32_t& bucket_id) {
if constexpr (layout_is_pipeline_bucket_lx<M>) {
hash_value = ModuloOp()(HashUtil::xorshift32(hash_value), num_drivers_per_instance);
} else if constexpr (layout_is_global_bucket_1l<M>) {
hash_value = bucketseq_to_instance[bucket_id];
} else if constexpr (layout_is_global_bucket_2l_lx<M>) {
const auto instance = bucketseq_to_instance[bucket_id];
const auto driverseq = ModuloOp()(HashUtil::xorshift32(hash_value), num_drivers_per_instance);
hash_value = (instance == BUCKET_ABSENT) ? BUCKET_ABSENT
: instance * num_drivers_per_instance + driverseq;
} else {
DCHECK(false);
}
});
} else {
DCHECK(false);
}
}
};
};
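
Editor's note: a standalone arithmetic sketch (assumed values) of the two-level GLOBAL_BUCKET_2L_LX branch above: the final partition index is instance * num_drivers_per_instance + driverseq. The xorshift32 here is the standard xorshift step and is assumed to mirror HashUtil::xorshift32.

#include <cstdint>
#include <iostream>
#include <vector>

// Assumed to mirror HashUtil::xorshift32 (standard xorshift step).
static uint32_t xorshift32(uint32_t x) {
    x ^= x << 13;
    x ^= x >> 17;
    x ^= x << 5;
    return x;
}

int main() {
    const std::vector<int32_t> bucketseq_to_instance = {0, 0, 1, 1}; // 4 buckets over 2 instances (assumed)
    const uint32_t num_drivers_per_instance = 2;

    const uint32_t bucket_id = 2;        // from calc_hash_values_and_bucket_ids
    const uint32_t hash_value = 123456u; // xor of the per-column murmur hashes

    const int32_t instance = bucketseq_to_instance[bucket_id]; // -> 1
    const uint32_t driverseq = xorshift32(hash_value) % num_drivers_per_instance;
    // 1 * 2 + driverseq, i.e. 2 or 3 (the real code also maps BUCKET_ABSENT instances through unchanged)
    std::cout << instance * num_drivers_per_instance + driverseq << std::endl;
    return 0;
}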
template <LogicalType Type>
class MinMaxRuntimeFilter final : public RuntimeFilter {
public:
@ -1275,8 +1336,16 @@ public:
dispatch_layout<WithModuloArg<ReduceOp, FullScanIterator>::HashValueCompute>(
_global, layout, columns, _hash_partition_bf.size(), FullScanIterator(_hash_values, num_rows));
} else {
dispatch_layout<WithModuloArg<ModuloOp, FullScanIterator>::HashValueCompute>(
_global, layout, columns, _hash_partition_bf.size(), FullScanIterator(_hash_values, num_rows));
if (layout.bucket_properties().empty()) {
dispatch_layout<WithModuloArg<ModuloOp, FullScanIterator>::HashValueCompute>(
_global, layout, columns, _hash_partition_bf.size(), FullScanIterator(_hash_values, num_rows));
} else {
BucketAwarePartitionCtx bctx(layout.bucket_properties(), ctx->hash_values, ctx->round_hashes,
ctx->bucket_ids, ctx->round_ids);
dispatch_layout<WithModuloArg<ModuloOp, BucketAwareFullScanIterator>::HashValueCompute>(
_global, layout, columns, _hash_partition_bf.size(),
BucketAwareFullScanIterator(bctx, num_rows));
}
}
}
void compute_partition_index(const RuntimeFilterLayout& layout, const std::vector<const Column*>& columns,

View File

@ -33,6 +33,9 @@ void RuntimeFilterLayout::init(const TRuntimeFilterLayout& layout) {
if (layout.__isset.bucketseq_to_partition) {
this->_bucketseq_to_partition = layout.bucketseq_to_partition;
}
if (layout.__isset.bucket_properties) {
this->_bucket_properties = layout.bucket_properties;
}
}
void RuntimeFilterLayout::init(int filter_id, const std::vector<int32_t>& bucketseq_to_instance) {

View File

@ -37,6 +37,7 @@ public:
const std::vector<int32_t>& bucketseq_to_driverseq() const { return _bucketseq_to_driverseq; }
const std::vector<int32_t>& bucketseq_to_partition() const { return _bucketseq_to_partition; }
const std::vector<TBucketProperty>& bucket_properties() const { return _bucket_properties; }
protected:
int _filter_id;
@ -48,6 +49,7 @@ protected:
std::vector<int32_t> _bucketseq_to_instance;
std::vector<int32_t> _bucketseq_to_driverseq;
std::vector<int32_t> _bucketseq_to_partition;
std::vector<TBucketProperty> _bucket_properties;
};
class WithLayoutMixin {

View File

@ -247,6 +247,7 @@ public class IcebergScanNode extends ScanNode {
this.bucketProperties = Optional.of(bucketProperties);
}
@Override
public Optional<List<BucketProperty>> getBucketProperties() {
return this.bucketProperties;
}

View File

@ -80,6 +80,7 @@ import com.starrocks.common.FeConstants;
import com.starrocks.common.Pair;
import com.starrocks.common.StarRocksException;
import com.starrocks.common.VectorSearchOptions;
import com.starrocks.connector.BucketProperty;
import com.starrocks.lake.LakeTablet;
import com.starrocks.persist.ColumnIdExpr;
import com.starrocks.qe.ConnectContext;
@ -326,6 +327,11 @@ public class OlapScanNode extends ScanNode {
return bucketNum;
}
@Override
public Optional<List<BucketProperty>> getBucketProperties() throws StarRocksException {
return Optional.empty();
}
public void setBucketExprs(List<Expr> bucketExprs) {
this.bucketExprs = bucketExprs;
}

View File

@ -18,8 +18,10 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.starrocks.analysis.Expr;
import com.starrocks.analysis.SortInfo;
import com.starrocks.connector.BucketProperty;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.SessionVariable;
import com.starrocks.thrift.TBucketProperty;
import com.starrocks.thrift.TNetworkAddress;
import com.starrocks.thrift.TRuntimeFilterBuildJoinMode;
import com.starrocks.thrift.TRuntimeFilterBuildType;
@ -105,6 +107,7 @@ public class RuntimeFilterDescription {
private List<Integer> bucketSeqToInstance = Lists.newArrayList();
private List<Integer> bucketSeqToDriverSeq = Lists.newArrayList();
private List<Integer> bucketSeqToPartition = Lists.newArrayList();
private List<BucketProperty> bucketProperties = Lists.newArrayList();
// partitionByExprs are used for computing partition ids in probe side when
// join's equal conjuncts size > 1.
private final Map<Integer, List<Expr>> nodeIdToParitionByExprs = Maps.newHashMap();
@ -393,6 +396,10 @@ public class RuntimeFilterDescription {
this.bucketSeqToPartition = bucketSeqToPartition;
}
public void setBucketProperties(List<BucketProperty> bucketProperties) {
this.bucketProperties = bucketProperties;
}
public List<Integer> getBucketSeqToInstance() {
return this.bucketSeqToInstance;
}
@ -566,6 +573,13 @@ public class RuntimeFilterDescription {
if (bucketSeqToPartition != null && !bucketSeqToPartition.isEmpty()) {
layout.setBucketseq_to_partition(bucketSeqToPartition);
}
if (bucketProperties != null && !bucketProperties.isEmpty()) {
List<TBucketProperty> tBucketProperties = new ArrayList<>();
for (BucketProperty bucketProperty : bucketProperties) {
tBucketProperties.add(bucketProperty.toThrift());
}
layout.setBucket_properties(tBucketProperties);
}
return layout;
}

View File

@ -41,6 +41,7 @@ import com.starrocks.analysis.SlotDescriptor;
import com.starrocks.analysis.TupleDescriptor;
import com.starrocks.catalog.ColumnAccessPath;
import com.starrocks.common.StarRocksException;
import com.starrocks.connector.BucketProperty;
import com.starrocks.connector.RemoteFilesSampleStrategy;
import com.starrocks.datacache.DataCacheOptions;
import com.starrocks.server.WarehouseManager;
@ -55,6 +56,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
@ -130,6 +132,10 @@ public abstract class ScanNode extends PlanNode {
throw new StarRocksException("Error when using bucket-aware execution");
}
public Optional<List<BucketProperty>> getBucketProperties() throws StarRocksException {
throw new StarRocksException("Error when using bucket-aware execution");
}
public boolean isLocalNativeTable() {
return false;
}

View File

@ -19,6 +19,7 @@ import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.starrocks.common.StarRocksException;
import com.starrocks.connector.BucketProperty;
import com.starrocks.planner.OlapScanNode;
import com.starrocks.planner.PlanNodeId;
import com.starrocks.planner.ScanNode;
@ -38,6 +39,7 @@ import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Function;
@ -178,14 +180,14 @@ public class ColocatedBackendSelector implements BackendSelector {
new ColocatedBackendSelector.BucketSeqToScanRange();
private final Map<Long, Integer> backendIdToBucketCount = Maps.newHashMap();
private final int bucketNum;
private final Optional<List<BucketProperty>> bucketProperties;
private final int numScanNodes;
private final ScanRangeType type;
private final Set<PlanNodeId> assignedScanNodeIds = Sets.newHashSet();
public Assignment(int bucketNum, int numScanNodes, ScanRangeType type) {
public Assignment(int bucketNum, int numScanNodes, Optional<List<BucketProperty>> bucketProperties) {
this.numScanNodes = numScanNodes;
this.bucketNum = bucketNum;
this.type = type;
this.bucketProperties = bucketProperties;
}
public Map<Integer, Long> getSeqToWorkerId() {
@ -200,8 +202,12 @@ public class ColocatedBackendSelector implements BackendSelector {
return bucketNum;
}
public Optional<List<BucketProperty>> getBucketProperties() {
return bucketProperties;
}
public boolean isNative() {
return type.equals(ScanRangeType.NATIVE);
return bucketProperties.isEmpty();
}
public void recordAssignedScanNode(ScanNode scanNode) {

View File

@ -19,9 +19,9 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.starrocks.common.StarRocksException;
import com.starrocks.common.util.DebugUtil;
import com.starrocks.connector.BucketProperty;
import com.starrocks.planner.ExchangeNode;
import com.starrocks.planner.JoinNode;
import com.starrocks.planner.OlapScanNode;
import com.starrocks.planner.PlanFragment;
import com.starrocks.planner.PlanFragmentId;
import com.starrocks.planner.PlanNode;
@ -46,6 +46,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.stream.Collectors;
@ -77,6 +78,7 @@ public class ExecutionFragment {
public List<Integer> bucketSeqToInstance;
public List<Integer> bucketSeqToDriverSeq;
public List<Integer> bucketSeqToPartition;
public Optional<List<BucketProperty>> bucketProperties;
}
private BucketSeqAssignment cachedBucketSeqAssignment = null;
private boolean bucketSeqToInstanceForFilterIsSet = false;
@ -157,6 +159,7 @@ public class ExecutionFragment {
rf.setBucketSeqToInstance(bucketSeqAssignment.bucketSeqToInstance);
rf.setBucketSeqToDriverSeq(bucketSeqAssignment.bucketSeqToDriverSeq);
rf.setBucketSeqToPartition(bucketSeqAssignment.bucketSeqToPartition);
bucketSeqAssignment.bucketProperties.ifPresent(rf::setBucketProperties);
}
}
@ -172,15 +175,10 @@ public class ExecutionFragment {
throws StarRocksException {
if (colocatedAssignment == null) {
final int numScanNodes = scanNodes.size();
ColocatedBackendSelector.Assignment.ScanRangeType type;
if (scanNode instanceof OlapScanNode olapScanNode) {
type = ColocatedBackendSelector.Assignment.ScanRangeType.NATIVE;
} else {
type = ColocatedBackendSelector.Assignment.ScanRangeType.NONNATIVE;
}
int bucketNum = scanNode.getBucketNums();
colocatedAssignment = new ColocatedBackendSelector.Assignment(bucketNum, numScanNodes, type);
colocatedAssignment = new ColocatedBackendSelector.Assignment(bucketNum, numScanNodes,
scanNode.getBucketProperties());
}
return colocatedAssignment;
}
@ -244,6 +242,7 @@ public class ExecutionFragment {
cachedBucketSeqAssignment.bucketSeqToDriverSeq = Arrays.asList(bucketSeqToDriverSeq);
cachedBucketSeqAssignment.bucketSeqToPartition = Arrays.asList(bucketSeqToPartition);
}
cachedBucketSeqAssignment.bucketProperties = colocatedAssignment.getBucketProperties();
return cachedBucketSeqAssignment;
}

View File

@ -624,7 +624,7 @@ public class DefaultSharedDataWorkerProviderTest {
{ // normal case
FragmentScanRangeAssignment assignment = new FragmentScanRangeAssignment();
ColocatedBackendSelector.Assignment colAssignment = new ColocatedBackendSelector.Assignment(
scanNode.getBucketNums(), 1, ColocatedBackendSelector.Assignment.ScanRangeType.NATIVE);
scanNode.getBucketNums(), 1, Optional.empty());
ColocatedBackendSelector selector =
new ColocatedBackendSelector(scanNode, assignment, colAssignment, false, provider, 1);
// the computation will not fail even though there are non-available locations
@ -643,7 +643,7 @@ public class DefaultSharedDataWorkerProviderTest {
FragmentScanRangeAssignment assignment = new FragmentScanRangeAssignment();
ColocatedBackendSelector.Assignment colAssignment = new ColocatedBackendSelector.Assignment(
scanNode.getBucketNums(), 1, ColocatedBackendSelector.Assignment.ScanRangeType.NATIVE);
scanNode.getBucketNums(), 1, Optional.empty());
ColocatedBackendSelector selector =
new ColocatedBackendSelector(scanNode, assignment, colAssignment, false, provider1, 1);
// the computation will not fail even though there are non-available locations
@ -661,7 +661,7 @@ public class DefaultSharedDataWorkerProviderTest {
ImmutableMap.of(), WarehouseManager.DEFAULT_RESOURCE);
FragmentScanRangeAssignment assignment = new FragmentScanRangeAssignment();
ColocatedBackendSelector.Assignment colAssignment = new ColocatedBackendSelector.Assignment(
scanNode.getBucketNums(), 1, ColocatedBackendSelector.Assignment.ScanRangeType.NATIVE);
scanNode.getBucketNums(), 1, Optional.empty());
ColocatedBackendSelector selector =
new ColocatedBackendSelector(scanNode, assignment, colAssignment, false, providerNoAvailNode, 1);
Assertions.assertThrows(NonRecoverableException.class, selector::computeScanRangeAssignment);

View File

@ -15,13 +15,17 @@
package com.starrocks.qe;
import com.google.common.collect.ImmutableMap;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.Type;
import com.starrocks.common.StarRocksException;
import com.starrocks.connector.BucketProperty;
import com.starrocks.planner.PlanNodeId;
import com.starrocks.planner.ScanNode;
import com.starrocks.qe.scheduler.DefaultWorkerProvider;
import com.starrocks.qe.scheduler.WorkerProvider;
import com.starrocks.server.WarehouseManager;
import com.starrocks.system.ComputeNode;
import com.starrocks.thrift.TBucketFunction;
import com.starrocks.thrift.THdfsScanRange;
import com.starrocks.thrift.TNetworkAddress;
import com.starrocks.thrift.TScanRange;
@ -36,6 +40,7 @@ import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -79,8 +84,8 @@ public class BucketAwareBackendSelectorTest {
}
private ColocatedBackendSelector.Assignment genColocatedAssignment(int bucketNum, int numScanNodes) {
return new ColocatedBackendSelector.Assignment(bucketNum, numScanNodes,
ColocatedBackendSelector.Assignment.ScanRangeType.NONNATIVE);
BucketProperty bucketProperty = new BucketProperty(TBucketFunction.MURMUR3_X86_32, 10, new Column("c1", Type.INT));
return new ColocatedBackendSelector.Assignment(bucketNum, numScanNodes, Optional.of(List.of(bucketProperty)));
}
@Test

View File

@ -40,6 +40,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
@ -274,7 +275,7 @@ public class ColocatedBackendSelectorTest {
FragmentScanRangeAssignment assignment = new FragmentScanRangeAssignment();
ColocatedBackendSelector.Assignment colocatedAssignemnt =
new ColocatedBackendSelector.Assignment(scanNodes.get(0).getBucketNums(), scanNodes.size(),
ColocatedBackendSelector.Assignment.ScanRangeType.NATIVE);
Optional.empty());
for (OlapScanNode scanNode : scanNodes) {
ColocatedBackendSelector backendSelector =

View File

@ -36,6 +36,7 @@ namespace cpp starrocks
namespace java com.starrocks.thrift
include "Exprs.thrift"
include "Partitions.thrift"
include "Types.thrift"
enum TRuntimeFilterBuildJoinMode {
@ -93,6 +94,7 @@ struct TRuntimeFilterLayout {
7: optional list<i32> bucketseq_to_instance;
8: optional list<i32> bucketseq_to_driverseq;
9: optional list<i32> bucketseq_to_partition;
10: optional list<Partitions.TBucketProperty> bucket_properties;
}
struct TRuntimeFilterDescription {

View File

@ -153,6 +153,25 @@ select count(a.trip_distance), count(*), sum(a.trip_distance) from iceberg_sql_t
-- result:
4002 4002 1032700
-- !result
select count(a.trip_distance), count(*), sum(a.trip_distance) from iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_1bucket_big a join [BUCKET] iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_no_bucket b join [BUCKET] iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_bucket_evo c where a.trip_id = b.trip_id and b.trip_id = c.trip_id;
-- result:
7842 7842 2007666
-- !result
set global_runtime_filter_wait_timeout = 500;
-- result:
-- !result
select count(a.trip_distance), count(*), sum(a.trip_distance) from iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_1bucket_big a join [BUCKET] iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_no_bucket b join [BUCKET] iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_bucket_evo c where a.trip_id = b.trip_id and b.trip_id = c.trip_id;
-- result:
7842 7842 2007666
-- !result
select count(a.trip_distance), count(*), sum(a.trip_distance) from iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_1bucket_big a join [BUCKET] iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_no_bucket b join [BUCKET] iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_bucket_evo c where a.trip_id = b.trip_id and b.trip_id = c.trip_id;
-- result:
7842 7842 2007666
-- !result
select count(a.trip_distance), count(*), sum(a.trip_distance) from iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_1bucket_big a join [BUCKET] iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_no_bucket b join [BUCKET] iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_bucket_evo c where a.trip_id = b.trip_id and b.trip_id = c.trip_id;
-- result:
7842 7842 2007666
-- !result
select count(a.trip_distance), count(*), sum(a.trip_distance) from iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_2bucket_1 a join iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_2bucket_2 b on a.trip_id = b.trip_id;
-- result:
2969 2969 757258

View File

@ -70,7 +70,12 @@ select count(a.trip_distance), count(*), sum(a.trip_distance) from iceberg_sql_t
-- #### bucket shuffle join
select count(a.trip_distance), count(*), sum(a.trip_distance) from iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_1bucket_big a join [BUCKET] iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_no_bucket b on a.trip_id = b.trip_id;
-- select count(a.trip_distance), count(*), sum(a.trip_distance) from iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_1bucket_big a join [BUCKET] iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_no_bucket b join [BUCKET] iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_bucket_evo c where a.trip_id = b.trip_id and b.trip_id = c.trip_id;
select count(a.trip_distance), count(*), sum(a.trip_distance) from iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_1bucket_big a join [BUCKET] iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_no_bucket b join [BUCKET] iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_bucket_evo c where a.trip_id = b.trip_id and b.trip_id = c.trip_id;
-- wait for global runtime filter
set global_runtime_filter_wait_timeout = 500;
select count(a.trip_distance), count(*), sum(a.trip_distance) from iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_1bucket_big a join [BUCKET] iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_no_bucket b join [BUCKET] iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_bucket_evo c where a.trip_id = b.trip_id and b.trip_id = c.trip_id;
select count(a.trip_distance), count(*), sum(a.trip_distance) from iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_1bucket_big a join [BUCKET] iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_no_bucket b join [BUCKET] iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_bucket_evo c where a.trip_id = b.trip_id and b.trip_id = c.trip_id;
select count(a.trip_distance), count(*), sum(a.trip_distance) from iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_1bucket_big a join [BUCKET] iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_no_bucket b join [BUCKET] iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_bucket_evo c where a.trip_id = b.trip_id and b.trip_id = c.trip_id;
-- #### colocate/bucket shuffle join for multi bucket columns
select count(a.trip_distance), count(*), sum(a.trip_distance) from iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_2bucket_1 a join iceberg_sql_test_${uuid0}.iceberg_bucket_db.taxis_2bucket_2 b on a.trip_id = b.trip_id;