Signed-off-by: zombee0 <ewang2027@gmail.com>
parent 85b141ca97
commit 843806e61e
@@ -71,16 +71,14 @@ public class BucketProperty {
         List<BucketProperty> bp0 = bucketProperties.get(0);
         for (int i = 1; i < bucketProperties.size(); i++) {
             List<BucketProperty> bp = bucketProperties.get(i);
-            boolean pass = bp.size() == bp0.size();
+            if (bp.size() != bp0.size()) {
+                throw new StarRocksPlannerException("Error when using bucket-aware execution", INTERNAL_ERROR);
+            }
             for (int j = 0; j < bp0.size(); j++) {
                 if (!bp.get(j).satisfy(bp0.get(j))) {
-                    pass = false;
-                    break;
+                    throw new StarRocksPlannerException("Error when using bucket-aware execution", INTERNAL_ERROR);
                 }
             }
-            if (!pass) {
-                throw new StarRocksPlannerException("Error when using bucket-aware execution", INTERNAL_ERROR);
-            }
         }
         return Optional.of(bucketProperties.get(0));
     }
@@ -257,8 +257,14 @@ public class IcebergScanNode extends ScanNode {
             throw new StarRocksConnectorException("Error when using bucket-aware execution for table: "
                     + icebergTable.getName());
         }
-        return this.bucketProperties.get().stream().map(
-                BucketProperty::getBucketNum).reduce(1, (a, b) -> (a + 1) * (b + 1));
+        List<Integer> bucketNums = this.bucketProperties.get().stream().map(
+                BucketProperty::getBucketNum).toList();
+        // Result is the product of (bucketNum + 1) over all bucket properties.
+        int res = bucketNums.get(0) + 1;
+        for (int i = 1; i < bucketNums.size(); i++) {
+            res = res * (bucketNums.get(i) + 1);
+        }
+        return res;
     }

     @Override
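
Note on the getBucketNums() fix above: the old reduce(1, (a, b) -> (a + 1) * (b + 1)) does
not compute the product of (bucketNum + 1) terms, because the running accumulator itself
gets an extra +1 on every step. A minimal standalone sketch of the difference for bucket
numbers 2, 3, 4, 5 (the class name and main method here are hypothetical, not part of the
patch):

    import java.util.stream.Stream;

    public class ReduceBugDemo {
        public static void main(String[] args) {
            // Buggy fold: (1+1)*(2+1)=6, (6+1)*(3+1)=28, (28+1)*(4+1)=145, (145+1)*(5+1)=876
            int buggy = Stream.of(2, 3, 4, 5).reduce(1, (a, b) -> (a + 1) * (b + 1));
            // Intended product: (2+1)*(3+1)*(4+1)*(5+1) = 360
            int wanted = Stream.of(2, 3, 4, 5).reduce(1, (a, b) -> a * (b + 1));
            System.out.println(buggy + " vs " + wanted); // prints "876 vs 360"
        }
    }

The new testGetBucketNums test below pins both values: 360 for the fixed loop and 876 for
the old reduce.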
@@ -105,7 +105,6 @@ public class BucketAwareBackendSelector implements BackendSelector {
                 scanRanges.put(scanNode.getId().asInt(), entry.getValue());
                 seqToWorkerId.put(entry.getKey(), bucketSeqToWorkerId.get(entry.getKey()));
             }
-            recordScanRangeStatistic();

             if (useIncrementalScanRanges) {
                 boolean hasMore = scanNode.hasMoreScanRanges();
@@ -139,6 +138,7 @@ public class BucketAwareBackendSelector implements BackendSelector {
                 }
             }
         }
+        recordScanRangeStatistic();
     }

     private void recordScanRangeStatistic() {
@@ -156,7 +156,7 @@ public class BucketAwareBackendSelector implements BackendSelector {
                 String host = workerProvider.getWorkerById(entry.getKey()).getAddress().hostname.replace('.', '_');
                 long value = entry.getValue();
                 String key = String.format("Bucket Placement.%s.assign.%s", scanNode.getTableName(), host);
-                Tracers.count(Tracers.Module.EXTERNAL, key, value);
+                Tracers.record(Tracers.Module.EXTERNAL, key, String.valueOf(value));
             }
         }
     }
@@ -19,6 +19,16 @@ import com.starrocks.sql.optimizer.OptExpressionVisitor;
 import com.starrocks.sql.optimizer.base.PhysicalPropertySet;
 import com.starrocks.sql.optimizer.task.TaskContext;

+/*
+ * This rule removes scan node output properties that were requested by the
+ * RequiredPropertyDriver but are ultimately unused.
+ * If no node between an Exchange and a Scan Node requests these properties,
+ * the optional output properties on the Scan Node are unused.
+ * For example, if both the left and right tables are partitioned on their
+ * bucket columns but their bucket numbers differ, a bucket shuffle join is
+ * required. The node above the right table's Scan Node is then an Exchange,
+ * so the output property on the right table is unused and should be removed.
+ */
 public class RemoveUselessScanOutputPropertyRule implements TreeRewriteRule {
     @Override
     public OptExpression rewrite(OptExpression root, TaskContext taskContext) {
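
The body of rewrite() is truncated in this hunk. As a rough sketch only, inferred from how
accept(...) is invoked in the visitor below and not taken from this diff, the rule
presumably starts the traversal at the root with the flag cleared:

    // Assumed driver, not part of this patch. The Boolean flag means "the node sits
    // below an Exchange with nothing in between that needs the scan's output
    // property"; it starts as false, flips to true below a PhysicalDistribution,
    // and resets to false below aggregate/analytic nodes.
    root.getOp().accept(new RemoveUselessOutputPropertyVisitor(), root, false);
    return root;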
@@ -29,7 +39,7 @@ public class RemoveUselessScanOutputPropertyRule implements TreeRewriteRule {

     public static class RemoveUselessOutputPropertyVisitor extends OptExpressionVisitor<Void, Boolean> {
         @Override
-        public Void visit(OptExpression optExpression, Boolean context) {
+        public Void visit(OptExpression optExpression, Boolean removeOutputProperty) {
             // set operation except/intersect/union
             // join operation
             if (optExpression.getInputs().size() > 1) {
@@ -38,36 +48,36 @@ public class RemoveUselessScanOutputPropertyRule implements TreeRewriteRule {
                 }
             } else {
                 for (OptExpression opt : optExpression.getInputs()) {
-                    opt.getOp().accept(this, opt, context);
+                    opt.getOp().accept(this, opt, removeOutputProperty);
                 }
             }
             return null;
         }

         @Override
-        public Void visitPhysicalHashAggregate(OptExpression optExpression, Boolean context) {
+        public Void visitPhysicalHashAggregate(OptExpression optExpression, Boolean removeOutputProperty) {
             OptExpression opt = optExpression.getInputs().get(0);
             opt.getOp().accept(this, opt, false);
             return null;
         }

         @Override
-        public Void visitPhysicalAnalytic(OptExpression optExpression, Boolean context) {
+        public Void visitPhysicalAnalytic(OptExpression optExpression, Boolean removeOutputProperty) {
             OptExpression opt = optExpression.getInputs().get(0);
             opt.getOp().accept(this, opt, false);
             return null;
         }

         @Override
-        public Void visitPhysicalDistribution(OptExpression optExpression, Boolean context) {
+        public Void visitPhysicalDistribution(OptExpression optExpression, Boolean removeOutputProperty) {
             OptExpression opt = optExpression.getInputs().get(0);
             opt.getOp().accept(this, opt, true);
             return null;
         }

         @Override
-        public Void visitPhysicalIcebergScan(OptExpression optExpression, Boolean context) {
-            if (context && optExpression.getOutputProperty().getDistributionProperty().isShuffle()) {
+        public Void visitPhysicalIcebergScan(OptExpression optExpression, Boolean removeOutputProperty) {
+            if (removeOutputProperty && optExpression.getOutputProperty().getDistributionProperty().isShuffle()) {
                 optExpression.setOutputProperty(PhysicalPropertySet.EMPTY);
             }
             return null;
@@ -56,7 +56,6 @@ import com.starrocks.sql.ast.AlterTableStmt;
 import com.starrocks.sql.ast.DmlStmt;
 import com.starrocks.sql.ast.IcebergRewriteStmt;
 import com.starrocks.sql.ast.InsertStmt;
 import com.starrocks.sql.ast.QueryStatement;
 import com.starrocks.sql.ast.StatementBase;
 import com.starrocks.sql.optimizer.base.ColumnRefFactory;
 import com.starrocks.sql.optimizer.operator.scalar.ConstantOperator;
@@ -64,8 +63,8 @@ import com.starrocks.sql.optimizer.operator.scalar.ScalarOperator;
 import com.starrocks.sql.optimizer.transformer.ExpressionMapping;
 import com.starrocks.sql.optimizer.transformer.SqlToScalarOperatorTranslator;
 import com.starrocks.sql.parser.NodePosition;
 import com.starrocks.sql.parser.SqlParser;
 import com.starrocks.sql.plan.ExecPlan;
 import com.starrocks.thrift.TBucketFunction;
 import com.starrocks.thrift.TIcebergTable;
 import com.starrocks.thrift.TScanRangeLocations;
 import com.starrocks.thrift.TSinkCommitInfo;
@@ -109,6 +108,7 @@ import java.util.Optional;
 import java.util.Set;
 import javax.swing.text.html.Option;
 import static com.starrocks.common.util.Util.executeCommand;
+import java.util.stream.Stream;

 public class IcebergScanNodeTest {
     public static final HdfsEnvironment HDFS_ENVIRONMENT = new HdfsEnvironment();
@@ -1220,4 +1220,39 @@ public class IcebergScanNodeTest {
Assertions.assertTrue(ex.getMessage().contains("db.table"));
|
||||
Assertions.assertTrue(ex.getMessage().contains("boom"));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetBucketNums(@Mocked IcebergTable table) {
|
||||
TupleDescriptor desc = new TupleDescriptor(new TupleId(0));
|
||||
desc.setTable(table);
|
||||
|
||||
IcebergScanNode scanNode = new IcebergScanNode(
|
||||
new PlanNodeId(0), desc, "IcebergScanNode",
|
||||
IcebergTableMORParams.EMPTY, IcebergMORParams.DATA_FILE_WITHOUT_EQ_DELETE);
|
||||
|
||||
// Create three bucket properties
|
||||
List<BucketProperty> bucketProperties = new ArrayList<>();
|
||||
Column column1 = new Column("test_col1", ScalarType.INT);
|
||||
Column column2 = new Column("test_col2", ScalarType.INT);
|
||||
Column column3 = new Column("test_col3", ScalarType.INT);
|
||||
Column column4 = new Column("test_col4", ScalarType.INT);
|
||||
BucketProperty bucketProperty1 = new BucketProperty(TBucketFunction.MURMUR3_X86_32, 2, column1);
|
||||
BucketProperty bucketProperty2 = new BucketProperty(TBucketFunction.MURMUR3_X86_32, 3, column2);
|
||||
BucketProperty bucketProperty3 = new BucketProperty(TBucketFunction.MURMUR3_X86_32, 4, column3);
|
||||
BucketProperty bucketProperty4 = new BucketProperty(TBucketFunction.MURMUR3_X86_32, 5, column4);
|
||||
bucketProperties.add(bucketProperty1);
|
||||
bucketProperties.add(bucketProperty2);
|
||||
bucketProperties.add(bucketProperty3);
|
||||
bucketProperties.add(bucketProperty4);
|
||||
|
||||
scanNode.setBucketProperties(bucketProperties);
|
||||
|
||||
// Test
|
||||
int result = scanNode.getBucketNums();
|
||||
|
||||
// Verify: (2 + 1) * (3 + 1) * (4 + 1) * (5 + 1) = 3 * 4 * 5 * 6 = 360
|
||||
Assertions.assertEquals(360, result);
|
||||
// wrong method
|
||||
Assertions.assertEquals(876, Stream.of(2, 3, 4, 5).reduce(1, (a, b) -> (a + 1) * (b + 1)));
|
||||
}
|
||||
}
|
||||