Signed-off-by: Murphy <mofei@starrocks.com> Co-authored-by: Murphy <96611012+murphyatwork@users.noreply.github.com> Co-authored-by: Murphy <mofei@starrocks.com>
This commit is contained in:
parent
3b1e377c6e
commit
c844fce1fb
|
|
@ -100,6 +100,7 @@ public class FunctionSet {
|
|||
public static final String TIMESTAMPADD = "timestampadd";
|
||||
public static final String TIMESTAMPDIFF = "timestampdiff";
|
||||
public static final String TO_DATE = "to_date";
|
||||
public static final String TO_DATETIME = "to_datetime";
|
||||
public static final String DATE = "date";
|
||||
public static final String LAST_DAY = "last_day";
|
||||
public static final String MAKEDATE = "makedate";
|
||||
|
|
|
|||
|
|
@ -415,6 +415,7 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
|
|||
public static final String CBO_USE_DB_LOCK = "cbo_use_lock_db";
|
||||
public static final String CBO_PREDICATE_SUBFIELD_PATH = "cbo_enable_predicate_subfield_path";
|
||||
public static final String CBO_PREPARE_METADATA_THREAD_POOL_SIZE = "cbo_prepare_metadata_thread_pool_size";
|
||||
public static final String CBO_REWRITE_MONOTONIC_MINMAX = "cbo_rewrite_monotonic_minmax_aggregation";
|
||||
|
||||
public static final String CBO_ENABLE_PARALLEL_PREPARE_METADATA = "enable_parallel_prepare_metadata";
|
||||
|
||||
|
|
@ -2521,6 +2522,9 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
|
|||
@VarAttr(name = CBO_PREDICATE_SUBFIELD_PATH, flag = VariableMgr.INVISIBLE)
|
||||
private boolean cboPredicateSubfieldPath = true;
|
||||
|
||||
@VarAttr(name = CBO_REWRITE_MONOTONIC_MINMAX, flag = VariableMgr.INVISIBLE)
|
||||
private boolean cboRewriteMonotonicMinMaxAggregation = true;
|
||||
|
||||
@VarAttr(name = CROSS_JOIN_COST_PENALTY, flag = VariableMgr.INVISIBLE)
|
||||
private long crossJoinCostPenalty = 1000000;
|
||||
|
||||
|
|
@ -4832,6 +4836,14 @@ public class SessionVariable implements Serializable, Writable, Cloneable {
|
|||
this.cboEqBaseType = cboEqBaseType;
|
||||
}
|
||||
|
||||
public void setCboRewriteMonotonicMinmax(boolean value) {
|
||||
this.cboRewriteMonotonicMinMaxAggregation = value;
|
||||
}
|
||||
|
||||
public boolean isCboRewriteMonotonicMinMaxAggregation() {
|
||||
return cboRewriteMonotonicMinMaxAggregation;
|
||||
}
|
||||
|
||||
public boolean isAuditExecuteStmt() {
|
||||
return auditExecuteStmt;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -76,6 +76,7 @@ import com.starrocks.sql.optimizer.rule.transformation.PushDownTopNBelowUnionRul
|
|||
import com.starrocks.sql.optimizer.rule.transformation.PushLimitAndFilterToCTEProduceRule;
|
||||
import com.starrocks.sql.optimizer.rule.transformation.RemoveAggregationFromAggTable;
|
||||
import com.starrocks.sql.optimizer.rule.transformation.RewriteGroupingSetsByCTERule;
|
||||
import com.starrocks.sql.optimizer.rule.transformation.RewriteMinMaxByMonotonicFunctionRule;
|
||||
import com.starrocks.sql.optimizer.rule.transformation.RewriteMultiDistinctRule;
|
||||
import com.starrocks.sql.optimizer.rule.transformation.RewriteSimpleAggToHDFSScanRule;
|
||||
import com.starrocks.sql.optimizer.rule.transformation.RewriteUnnestBitmapRule;
|
||||
|
|
@ -685,6 +686,7 @@ public class QueryOptimizer extends Optimizer {
|
|||
scheduler.rewriteIterative(tree, rootTaskContext, new MergeProjectWithChildRule());
|
||||
|
||||
scheduler.rewriteOnce(tree, rootTaskContext, JsonPathRewriteRule.createForOlapScan());
|
||||
scheduler.rewriteIterative(tree, rootTaskContext, new RewriteMinMaxByMonotonicFunctionRule());
|
||||
|
||||
scheduler.rewriteOnce(tree, rootTaskContext, new EliminateSortColumnWithEqualityPredicateRule());
|
||||
scheduler.rewriteOnce(tree, rootTaskContext, new PushDownTopNBelowOuterJoinRule());
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ package com.starrocks.sql.optimizer.rewrite;
|
|||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.starrocks.analysis.FunctionName;
|
||||
|
|
@ -251,11 +252,16 @@ public enum ScalarOperatorEvaluator {
|
|||
return false;
|
||||
}
|
||||
|
||||
if ((FunctionSet.DATE_FORMAT.equalsIgnoreCase(invoker.getSignature().getName())
|
||||
|| FunctionSet.STR_TO_DATE.equalsIgnoreCase(invoker.getSignature().getName())
|
||||
|| FunctionSet.STR2DATE.equalsIgnoreCase(invoker.getSignature().getName())
|
||||
|| FunctionSet.FROM_UNIXTIME.equalsIgnoreCase(invoker.getSignature().getName()))
|
||||
&& operator.getChildren().size() == 2) {
|
||||
final ImmutableSet<String> SUPPORTED = ImmutableSet.of(
|
||||
FunctionSet.DATE_FORMAT,
|
||||
FunctionSet.STR_TO_DATE,
|
||||
FunctionSet.STR2DATE,
|
||||
FunctionSet.FROM_UNIXTIME,
|
||||
FunctionSet.FROM_UNIXTIME_MS,
|
||||
FunctionSet.TO_DATETIME
|
||||
);
|
||||
|
||||
if (SUPPORTED.contains(invoker.getSignature().getName().toLowerCase()) && operator.getChildren().size() == 2) {
|
||||
String pattern = operator.getChild(1).toString();
|
||||
if (pattern.isEmpty()) {
|
||||
return true;
|
||||
|
|
|
|||
|
|
@ -795,6 +795,53 @@ public class ScalarOperatorFunctions {
|
|||
return dateFormat(dl, fmtLiteral);
|
||||
}
|
||||
|
||||
@ConstantFunction(name = "to_datetime", argTypes = {BIGINT, INT}, returnType = DATETIME, isMonotonic = true)
|
||||
public static ConstantOperator toDatetime(ConstantOperator unixtime, ConstantOperator scale) {
|
||||
long seconds = unixtime.getBigint();
|
||||
long nanos = 0;
|
||||
int scaleValue = 0;
|
||||
if (scale != null && scale.getInt() > 0) {
|
||||
scaleValue = scale.getInt();
|
||||
}
|
||||
switch (scaleValue) {
|
||||
case 0:
|
||||
break;
|
||||
case 3:
|
||||
nanos = (seconds % 1000) * 1000_000;
|
||||
seconds /= 1000;
|
||||
break;
|
||||
case 6:
|
||||
nanos = (seconds % 1000_000) * 1000;
|
||||
seconds /= 1000_000;
|
||||
break;
|
||||
default:
|
||||
return ConstantOperator.NULL;
|
||||
}
|
||||
|
||||
if (seconds < 0 || seconds > TimeUtils.MAX_UNIX_TIMESTAMP || nanos < 0) {
|
||||
return ConstantOperator.NULL;
|
||||
}
|
||||
|
||||
if (scaleValue == 0) {
|
||||
return ConstantOperator.createDatetime(
|
||||
LocalDateTime.ofInstant(Instant.ofEpochSecond(seconds), TimeUtils.getTimeZone().toZoneId()));
|
||||
} else {
|
||||
return ConstantOperator.createDatetime(
|
||||
LocalDateTime.ofInstant(Instant.ofEpochSecond(seconds).plusNanos(nanos),
|
||||
TimeUtils.getTimeZone().toZoneId()));
|
||||
}
|
||||
}
|
||||
|
||||
@ConstantFunction(name = "to_datetime", argTypes = {BIGINT}, returnType = DATETIME, isMonotonic = true)
|
||||
public static ConstantOperator toDatetime(ConstantOperator unixtime) {
|
||||
long seconds = unixtime.getBigint();
|
||||
if (seconds < 0 || seconds > TimeUtils.MAX_UNIX_TIMESTAMP) {
|
||||
return ConstantOperator.NULL;
|
||||
}
|
||||
return ConstantOperator.createDatetime(
|
||||
LocalDateTime.ofInstant(Instant.ofEpochSecond(seconds), TimeUtils.getTimeZone().toZoneId()));
|
||||
}
|
||||
|
||||
@ConstantFunction.List(list = {
|
||||
@ConstantFunction(name = "curtime", argTypes = {}, returnType = TIME),
|
||||
@ConstantFunction(name = "current_time", argTypes = {}, returnType = TIME)
|
||||
|
|
|
|||
|
|
@ -152,6 +152,7 @@ import com.starrocks.sql.optimizer.rule.transformation.RewriteBitmapCountDistinc
|
|||
import com.starrocks.sql.optimizer.rule.transformation.RewriteCountIfFunction;
|
||||
import com.starrocks.sql.optimizer.rule.transformation.RewriteDuplicateAggregateFnRule;
|
||||
import com.starrocks.sql.optimizer.rule.transformation.RewriteHllCountDistinctRule;
|
||||
import com.starrocks.sql.optimizer.rule.transformation.RewriteMinMaxByMonotonicFunctionRule;
|
||||
import com.starrocks.sql.optimizer.rule.transformation.RewriteSimpleAggToHDFSScanRule;
|
||||
import com.starrocks.sql.optimizer.rule.transformation.RewriteSimpleAggToMetaScanRule;
|
||||
import com.starrocks.sql.optimizer.rule.transformation.RewriteSumByAssociativeRule;
|
||||
|
|
@ -322,7 +323,8 @@ public class RuleSet {
|
|||
new RewriteDuplicateAggregateFnRule(),
|
||||
new RewriteSimpleAggToMetaScanRule(),
|
||||
new RewriteSumByAssociativeRule(),
|
||||
new RewriteCountIfFunction()
|
||||
new RewriteCountIfFunction(),
|
||||
new RewriteMinMaxByMonotonicFunctionRule()
|
||||
));
|
||||
|
||||
public static final Rule PRUNE_PROJECT_RULES = new CombinationRule(RuleType.GP_PRUNE_PROJECT, ImmutableList.of(
|
||||
|
|
|
|||
|
|
@ -149,6 +149,7 @@ public enum RuleType {
|
|||
TF_REWRITE_PARTITION_COLUMN_ONLY_AGG,
|
||||
TF_REWRITE_SUM_BY_ASSOCIATIVE_RULE,
|
||||
TF_REWRITE_COUNT_IF_RULE,
|
||||
TF_REWRITE_MINMAX_BY_MONOTONIC_FUNCTION,
|
||||
TF_ICEBERG_PARTITIONS_TABLE_REWRITE_RULE,
|
||||
TF_ICEBERG_EQUALITY_REWRITE_RULE,
|
||||
TF_REWRITE_UNNEST_BITMAP_RULE,
|
||||
|
|
|
|||
|
|
@ -0,0 +1,302 @@
|
|||
// Copyright 2021-present StarRocks, Inc. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// https://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package com.starrocks.sql.optimizer.rule.transformation;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.starrocks.catalog.AggregateFunction;
|
||||
import com.starrocks.catalog.Column;
|
||||
import com.starrocks.catalog.Function;
|
||||
import com.starrocks.catalog.FunctionSet;
|
||||
import com.starrocks.catalog.OlapTable;
|
||||
import com.starrocks.catalog.Type;
|
||||
import com.starrocks.server.GlobalStateMgr;
|
||||
import com.starrocks.sql.optimizer.OptExpression;
|
||||
import com.starrocks.sql.optimizer.OptimizerContext;
|
||||
import com.starrocks.sql.optimizer.Utils;
|
||||
import com.starrocks.sql.optimizer.base.ColumnIdentifier;
|
||||
import com.starrocks.sql.optimizer.base.ColumnRefFactory;
|
||||
import com.starrocks.sql.optimizer.operator.OperatorType;
|
||||
import com.starrocks.sql.optimizer.operator.Projection;
|
||||
import com.starrocks.sql.optimizer.operator.logical.LogicalAggregationOperator;
|
||||
import com.starrocks.sql.optimizer.operator.logical.LogicalOlapScanOperator;
|
||||
import com.starrocks.sql.optimizer.operator.logical.LogicalScanOperator;
|
||||
import com.starrocks.sql.optimizer.operator.pattern.Pattern;
|
||||
import com.starrocks.sql.optimizer.operator.scalar.CallOperator;
|
||||
import com.starrocks.sql.optimizer.operator.scalar.ColumnRefOperator;
|
||||
import com.starrocks.sql.optimizer.operator.scalar.ConstantOperator;
|
||||
import com.starrocks.sql.optimizer.operator.scalar.OperatorFunctionChecker;
|
||||
import com.starrocks.sql.optimizer.operator.scalar.ScalarOperator;
|
||||
import com.starrocks.sql.optimizer.rewrite.ScalarOperatorFunctions;
|
||||
import com.starrocks.sql.optimizer.rule.RuleType;
|
||||
import com.starrocks.sql.optimizer.statistics.IMinMaxStatsMgr;
|
||||
import com.starrocks.sql.optimizer.statistics.StatsVersion;
|
||||
import com.starrocks.statistic.StatisticUtils;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Objects;
|
||||
import java.util.Optional;
|
||||
|
||||
/**
|
||||
* Rewrite MIN(f(col)) -> f(MIN(col)) and MAX(f(col)) -> f(MAX(col)) when f is monotonic and safe.
|
||||
* Correctness:
|
||||
* 1. Aggregation must be MIN or MAX.
|
||||
* 2. Function must be deterministic and monotonic.
|
||||
* 3. NULLs are handled consistently: both forms ignore NULL values.
|
||||
* 4. Domain must be valid: inputs must not produce overflow/invalid casts.
|
||||
*
|
||||
* Initial scope: only supports
|
||||
* - to_datetime(BIGINT)
|
||||
* - from_unixtime(BIGINT)
|
||||
*/
|
||||
public class RewriteMinMaxByMonotonicFunctionRule extends TransformationRule {
|
||||
|
||||
private static final ImmutableSet<String> SUPPORTED_FUNCTION_SET = ImmutableSet.of(
|
||||
FunctionSet.TO_DATETIME,
|
||||
FunctionSet.FROM_UNIXTIME
|
||||
);
|
||||
|
||||
public RewriteMinMaxByMonotonicFunctionRule() {
|
||||
super(RuleType.TF_REWRITE_MINMAX_BY_MONOTONIC_FUNCTION,
|
||||
Pattern.create(OperatorType.LOGICAL_AGGR)
|
||||
.addChildren(Pattern.create(OperatorType.LOGICAL_OLAP_SCAN)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean check(OptExpression input, OptimizerContext context) {
|
||||
LogicalAggregationOperator agg = input.getOp().cast();
|
||||
LogicalScanOperator scanOperator = input.inputAt(0).getOp().cast();
|
||||
if (!context.getSessionVariable().isCboRewriteMonotonicMinMaxAggregation()) {
|
||||
return false;
|
||||
}
|
||||
// 1. No aggregation
|
||||
// 2. No projection
|
||||
// 3. HAVING
|
||||
if (agg.getAggregations().isEmpty() || agg.getPredicate() != null || scanOperator.getProjection() == null) {
|
||||
return false;
|
||||
}
|
||||
// Fast check: at least one MIN/MAX over a projected monotonic function of a single column
|
||||
for (Map.Entry<ColumnRefOperator, CallOperator> entry : agg.getAggregations().entrySet()) {
|
||||
CallOperator call = entry.getValue();
|
||||
// 1. Min/Max aggregation
|
||||
if (!isMinOrMax(call)) {
|
||||
continue;
|
||||
}
|
||||
if (call.getArguments().size() != 1 || !call.getArguments().get(0).isColumnRef()) {
|
||||
continue;
|
||||
}
|
||||
ColumnRefOperator argRef = (ColumnRefOperator) call.getArguments().get(0);
|
||||
ScalarOperator projected = scanOperator.getProjection().getColumnRefMap().get(argRef);
|
||||
if (projected == null) {
|
||||
continue;
|
||||
}
|
||||
// 2. Monotonic function
|
||||
if (!isAllowedMonotonicFunction(projected)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// 3. The MinMaxStats exists
|
||||
List<ColumnRefOperator> columnRefList = Utils.extractColumnRef(projected);
|
||||
if (columnRefList.size() != 1) {
|
||||
continue;
|
||||
}
|
||||
ColumnRefOperator ref = columnRefList.get(0);
|
||||
OlapTable table = (OlapTable) scanOperator.getTable();
|
||||
Column column = scanOperator.getColRefToColumnMetaMap().get(ref);
|
||||
if (column == null) {
|
||||
continue;
|
||||
}
|
||||
final Long lastUpdateTime = StatisticUtils.getTableLastUpdateTimestamp(table);
|
||||
Optional<IMinMaxStatsMgr.ColumnMinMax> minMax = IMinMaxStatsMgr.internalInstance()
|
||||
.getStats(new ColumnIdentifier(table.getId(), column.getColumnId()),
|
||||
new StatsVersion(-1, lastUpdateTime));
|
||||
if (minMax.isPresent()) {
|
||||
return validateArgumentDomain(projected, minMax.get());
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<OptExpression> transform(OptExpression input, OptimizerContext context) {
|
||||
LogicalAggregationOperator agg = (LogicalAggregationOperator) input.getOp();
|
||||
LogicalOlapScanOperator scan = input.inputAt(0).getOp().cast();
|
||||
ColumnRefFactory columnRefFactory = context.getColumnRefFactory();
|
||||
|
||||
Map<ColumnRefOperator, ScalarOperator> oldPreProj = scan.getProjection().getColumnRefMap();
|
||||
Map<ColumnRefOperator, ScalarOperator> newPreProj = Maps.newHashMap(oldPreProj);
|
||||
Map<ColumnRefOperator, CallOperator> newAggs = Maps.newHashMap();
|
||||
// Collect rewritten outputs and their reapplied expressions first.
|
||||
Map<ColumnRefOperator, ScalarOperator> rewrittenPostProj = Maps.newHashMap();
|
||||
Map<ColumnRefOperator, ScalarOperator> postProj = Maps.newHashMap();
|
||||
|
||||
boolean rewroteAny = false;
|
||||
|
||||
for (Map.Entry<ColumnRefOperator, CallOperator> entry : agg.getAggregations().entrySet()) {
|
||||
ColumnRefOperator outAggRef = entry.getKey();
|
||||
CallOperator outerAgg = entry.getValue();
|
||||
|
||||
if (!isMinOrMax(outerAgg) || outerAgg.getArguments().size() != 1 || !outerAgg.getArguments().get(0).isColumnRef()) {
|
||||
// Preserve as-is
|
||||
newAggs.put(outAggRef, outerAgg);
|
||||
continue;
|
||||
}
|
||||
|
||||
ColumnRefOperator projectedRef = (ColumnRefOperator) outerAgg.getArguments().get(0);
|
||||
ScalarOperator projectedExpr = oldPreProj.get(projectedRef);
|
||||
if (projectedExpr == null || !isAllowedMonotonicFunction(projectedExpr)) {
|
||||
// Not eligible, keep as-is
|
||||
newAggs.put(outAggRef, outerAgg);
|
||||
continue;
|
||||
}
|
||||
|
||||
ScalarOperator innerArg;
|
||||
FunctionWithChild fnWithChild = extractFunctionAndChild(projectedExpr);
|
||||
if (fnWithChild == null) {
|
||||
newAggs.put(outAggRef, outerAgg);
|
||||
continue;
|
||||
}
|
||||
innerArg = fnWithChild.child;
|
||||
if (!(innerArg instanceof ColumnRefOperator)) {
|
||||
newAggs.put(outAggRef, outerAgg);
|
||||
continue;
|
||||
}
|
||||
ColumnRefOperator innerColRef = (ColumnRefOperator) innerArg;
|
||||
|
||||
// Ensure inner column is projected pre-agg
|
||||
if (!newPreProj.containsKey(innerColRef)) {
|
||||
newPreProj.put(innerColRef, innerColRef);
|
||||
}
|
||||
|
||||
// Build MIN/MAX over innerColRef
|
||||
String aggName = outerAgg.getFnName();
|
||||
Type innerType = innerColRef.getType();
|
||||
AggregateFunction aggFn = AggregateFunction.createBuiltin(aggName,
|
||||
Lists.newArrayList(innerType), innerType, innerType, false, true, false);
|
||||
Function resolvedAggFn = GlobalStateMgr.getCurrentState().getFunction(aggFn, Function.CompareMode.IS_IDENTICAL);
|
||||
Preconditions.checkState(resolvedAggFn != null, "cannot find aggregate function %s(%s)", aggName, innerType);
|
||||
|
||||
CallOperator newInnerAggCall = new CallOperator(aggName, innerType, List.of(innerColRef), resolvedAggFn);
|
||||
ColumnRefOperator newInnerAggRef = columnRefFactory.create(newInnerAggCall, newInnerAggCall.getType(), true);
|
||||
newAggs.put(newInnerAggRef, newInnerAggCall);
|
||||
|
||||
// Post-agg: outAggRef := f(newInnerAggRef)
|
||||
ScalarOperator reapplied;
|
||||
if (fnWithChild.call != null) {
|
||||
CallOperator origCall = fnWithChild.call;
|
||||
Function fn = origCall.getFunction();
|
||||
// rebuild with new child
|
||||
reapplied = new CallOperator(origCall.getFnName(), origCall.getType(), List.of(newInnerAggRef), fn);
|
||||
} else {
|
||||
// should not happen
|
||||
newAggs.put(outAggRef, outerAgg);
|
||||
continue;
|
||||
}
|
||||
rewrittenPostProj.put(outAggRef, reapplied);
|
||||
rewroteAny = true;
|
||||
}
|
||||
|
||||
if (!rewroteAny) {
|
||||
return Lists.newArrayList();
|
||||
}
|
||||
|
||||
// Preserve existing projection entries for outputs that were not rewritten
|
||||
if (agg.getProjection() != null && agg.getProjection().getColumnRefMap() != null) {
|
||||
for (Map.Entry<ColumnRefOperator, ScalarOperator> e : agg.getProjection().getColumnRefMap().entrySet()) {
|
||||
if (!rewrittenPostProj.containsKey(e.getKey())) {
|
||||
postProj.put(e.getKey(), e.getValue());
|
||||
}
|
||||
}
|
||||
}
|
||||
// Add rewritten outputs
|
||||
postProj.putAll(rewrittenPostProj);
|
||||
|
||||
// Build the new expression
|
||||
LogicalOlapScanOperator newScan =
|
||||
new LogicalOlapScanOperator.Builder()
|
||||
.withOperator(scan).setProjection(new Projection(newPreProj))
|
||||
.build();
|
||||
LogicalAggregationOperator newAgg =
|
||||
new LogicalAggregationOperator.Builder()
|
||||
.withOperator(agg)
|
||||
.setAggregations(newAggs)
|
||||
.setProjection(new Projection(postProj)).build();
|
||||
|
||||
OptExpression newScanExpr =
|
||||
OptExpression.builder().with(input.inputAt(0)).setOp(newScan).build();
|
||||
OptExpression newAggOpt =
|
||||
OptExpression.builder().with(input).setOp(newAgg).setInputs(List.of(newScanExpr)).build();
|
||||
return Lists.newArrayList(newAggOpt);
|
||||
}
|
||||
|
||||
private static boolean isMinOrMax(CallOperator call) {
|
||||
String name = call.getFnName();
|
||||
return Objects.equals(name, FunctionSet.MIN) || Objects.equals(name, FunctionSet.MAX);
|
||||
}
|
||||
|
||||
private static boolean isAllowedMonotonicFunction(ScalarOperator op) {
|
||||
if (op instanceof CallOperator call) {
|
||||
if (!OperatorFunctionChecker.onlyContainMonotonicFunctions(call).first) {
|
||||
return false;
|
||||
}
|
||||
if (!SUPPORTED_FUNCTION_SET.contains(call.getFnName().toLowerCase())) {
|
||||
return false;
|
||||
}
|
||||
// require one child which is column
|
||||
return !call.getChildren().isEmpty() && call.getChild(0).isColumnRef();
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private static boolean validateArgumentDomain(ScalarOperator projected, IMinMaxStatsMgr.ColumnMinMax minMax) {
|
||||
FunctionWithChild fnWithChild = extractFunctionAndChild(projected);
|
||||
String functionName = fnWithChild.call.getFunction().functionName();
|
||||
if (functionName.equalsIgnoreCase(FunctionSet.FROM_UNIXTIME) ||
|
||||
functionName.equalsIgnoreCase(FunctionSet.TO_DATETIME)) {
|
||||
long minValue = Long.parseLong(minMax.minValue());
|
||||
long maxValue = Long.parseLong(minMax.maxValue());
|
||||
ConstantOperator scale = ConstantOperator.createInt(0);
|
||||
if (fnWithChild.call.getArguments().size() > 1) {
|
||||
ScalarOperator arg0 = fnWithChild.call.getArguments().get(1);
|
||||
if (!(arg0 instanceof ConstantOperator)) {
|
||||
return false;
|
||||
}
|
||||
scale = (ConstantOperator) arg0;
|
||||
}
|
||||
if (ScalarOperatorFunctions.toDatetime(ConstantOperator.createBigint(minValue), scale).isNull()) {
|
||||
return false;
|
||||
}
|
||||
if (ScalarOperatorFunctions.toDatetime(ConstantOperator.createBigint(maxValue), scale).isNull()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private static FunctionWithChild extractFunctionAndChild(ScalarOperator op) {
|
||||
if (op instanceof CallOperator call && !call.getChildren().isEmpty()) {
|
||||
return new FunctionWithChild(call, call.getChild(0));
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private record FunctionWithChild(CallOperator call, ScalarOperator child) {
|
||||
}
|
||||
}
|
||||
|
|
@ -54,7 +54,6 @@ public class PruneSubfieldRule extends TransformationRule {
|
|||
.add(FunctionSet.JSON_QUERY)
|
||||
.add(FunctionSet.JSON_EXISTS)
|
||||
.add(FunctionSet.JSON_LENGTH)
|
||||
.add(FunctionSet.JSON_REMOVE)
|
||||
.build();
|
||||
|
||||
public static final List<String> PRUNE_FUNCTIONS = ImmutableList.<String>builder()
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ package com.starrocks.sql.optimizer.statistics;
|
|||
import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
|
||||
import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
|
||||
import com.github.benmanes.caffeine.cache.Caffeine;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.gson.JsonArray;
|
||||
|
|
@ -28,7 +29,9 @@ import com.starrocks.catalog.InternalCatalog;
|
|||
import com.starrocks.catalog.OlapTable;
|
||||
import com.starrocks.catalog.PhysicalPartition;
|
||||
import com.starrocks.catalog.Table;
|
||||
import com.starrocks.catalog.Type;
|
||||
import com.starrocks.common.Config;
|
||||
import com.starrocks.common.FeConstants;
|
||||
import com.starrocks.common.Pair;
|
||||
import com.starrocks.common.ThreadPoolManager;
|
||||
import com.starrocks.memory.MemoryTrackable;
|
||||
|
|
@ -37,6 +40,7 @@ import com.starrocks.qe.SimpleExecutor;
|
|||
import com.starrocks.server.GlobalStateMgr;
|
||||
import com.starrocks.sql.analyzer.SemanticException;
|
||||
import com.starrocks.sql.optimizer.base.ColumnIdentifier;
|
||||
import com.starrocks.sql.optimizer.rule.tree.prunesubfield.SubfieldAccessPathNormalizer;
|
||||
import com.starrocks.statistic.StatisticUtils;
|
||||
import com.starrocks.thrift.TResultBatch;
|
||||
import com.starrocks.thrift.TResultSinkType;
|
||||
|
|
@ -53,6 +57,7 @@ import java.util.Map;
|
|||
import java.util.Optional;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class ColumnMinMaxMgr implements IMinMaxStatsMgr, MemoryTrackable {
|
||||
/** Cache entry pairing a column's min/max statistics with a {@code StatsVersion}. */
private record CacheValue(ColumnMinMax minMax, StatsVersion version) {}
|
||||
|
|
@ -76,7 +81,7 @@ public class ColumnMinMaxMgr implements IMinMaxStatsMgr, MemoryTrackable {
|
|||
@Override
|
||||
public Optional<ColumnMinMax> getStats(ColumnIdentifier identifier, StatsVersion version) {
|
||||
CompletableFuture<Optional<CacheValue>> future = cache.get(identifier);
|
||||
if (future.isDone()) {
|
||||
if (future.isDone() || FeConstants.runningUnitTest) {
|
||||
try {
|
||||
Optional<CacheValue> cacheValue = future.get();
|
||||
if (cacheValue.isPresent()) {
|
||||
|
|
@ -119,7 +124,36 @@ public class ColumnMinMaxMgr implements IMinMaxStatsMgr, MemoryTrackable {
|
|||
return List.of(Pair.create(List.of(new ColumnMinMax("1", "10000")), (long) cache.asMap().size()));
|
||||
}
|
||||
|
||||
private static final class CacheLoader implements AsyncCacheLoader<ColumnIdentifier, Optional<CacheValue>> {
|
||||
@VisibleForTesting
|
||||
public static String genMinMaxSql(String catalogName, Database db, OlapTable olapTable, Column column) {
|
||||
List<String> pieces = SubfieldAccessPathNormalizer.parseSimpleJsonPath(column.getColumnId().getId());
|
||||
String columnName;
|
||||
if (pieces.size() == 1) {
|
||||
columnName = column.getName();
|
||||
} else {
|
||||
String path = pieces.stream().skip(1).collect(Collectors.joining("."));
|
||||
String jsonFunc;
|
||||
Type type = column.getType();
|
||||
if (type.equals(Type.BIGINT)) {
|
||||
jsonFunc = "get_json_int";
|
||||
} else if (type.isStringType()) {
|
||||
jsonFunc = "get_json_string";
|
||||
} else if (type.equals(Type.DOUBLE)) {
|
||||
jsonFunc = "get_json_double";
|
||||
} else {
|
||||
throw new IllegalStateException("unsupported json field type: " + column.getType());
|
||||
}
|
||||
columnName = String.format("%s(%s, '%s')", jsonFunc, StatisticUtils.quoting(column.getName()), path);
|
||||
}
|
||||
String sql = "select min(" + columnName + ") as min, max(" + columnName + ") as max"
|
||||
+ " from " +
|
||||
StatisticUtils.quoting(catalogName, db.getOriginName(), olapTable.getName())
|
||||
+ "[_META_];";
|
||||
return sql;
|
||||
}
|
||||
|
||||
protected static final class CacheLoader implements AsyncCacheLoader<ColumnIdentifier, Optional<CacheValue>> {
|
||||
|
||||
@Override
|
||||
public @NonNull CompletableFuture<Optional<CacheValue>> asyncLoad(@NonNull ColumnIdentifier key,
|
||||
@NonNull Executor executor) {
|
||||
|
|
@ -140,9 +174,7 @@ public class ColumnMinMaxMgr implements IMinMaxStatsMgr, MemoryTrackable {
|
|||
long version = olapTable.getPartitions().stream().flatMap(p -> p.getSubPartitions().stream()).map(
|
||||
PhysicalPartition::getVisibleVersionTime).max(Long::compareTo).orElse(0L);
|
||||
String catalogName = InternalCatalog.DEFAULT_INTERNAL_CATALOG_NAME;
|
||||
String sql = "select min(" + column.getName() + ") as min, max(" + column.getName() + ") as max"
|
||||
+ " from " + StatisticUtils.quoting(catalogName, db.getOriginName(), olapTable.getName())
|
||||
+ "[_META_];";
|
||||
String sql = genMinMaxSql(catalogName, db, olapTable, column);
|
||||
|
||||
ConnectContext context = STMT_EXECUTOR.createConnectContext();
|
||||
context.getSessionVariable().setPipelineDop(1);
|
||||
|
|
|
|||
|
|
@ -0,0 +1,208 @@
|
|||
// Copyright 2021-present StarRocks, Inc. All rights reserved.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// https://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package com.starrocks.sql.plan;
|
||||
|
||||
import com.starrocks.catalog.Column;
|
||||
import com.starrocks.catalog.ColumnId;
|
||||
import com.starrocks.catalog.Database;
|
||||
import com.starrocks.catalog.OlapTable;
|
||||
import com.starrocks.catalog.Type;
|
||||
import com.starrocks.sql.optimizer.base.ColumnIdentifier;
|
||||
import com.starrocks.sql.optimizer.operator.scalar.ConstantOperator;
|
||||
import com.starrocks.sql.optimizer.rewrite.ScalarOperatorFunctions;
|
||||
import com.starrocks.sql.optimizer.statistics.ColumnMinMaxMgr;
|
||||
import com.starrocks.sql.optimizer.statistics.IMinMaxStatsMgr;
|
||||
import com.starrocks.sql.optimizer.statistics.StatsVersion;
|
||||
import mockit.Mock;
|
||||
import mockit.MockUp;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.CsvSource;
|
||||
|
||||
import java.util.Optional;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class MinMaxMonotonicRewriteTest extends PlanTestBase {
|
||||
|
||||
@BeforeAll
|
||||
public static void beforeAll() {
|
||||
starRocksAssert.getCtx().getSessionVariable().setCboRewriteMonotonicMinmax(true);
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
public static void afterAll() {
|
||||
starRocksAssert.getCtx().getSessionVariable().setCboRewriteMonotonicMinmax(false);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@CsvSource(delimiter = '|',
|
||||
value = {
|
||||
"1756099237 | 0 | '2025-08-25 13:20:37'",
|
||||
"1756099237 | 0 | '2025-08-25 13:20:37'",
|
||||
"1756099237001 | 3 | '2025-08-25 13:20:37.001000'",
|
||||
"1756099237001000 | 6 | '2025-08-25 13:20:37.001000'",
|
||||
|
||||
"-1 | 0 |null",
|
||||
"1756099237000000000 | 4 |null",
|
||||
})
|
||||
public void testToDatetime(long ts, int scale, String expect) throws Exception {
|
||||
ConstantOperator datetime = ScalarOperatorFunctions.toDatetime(ConstantOperator.createBigint(ts),
|
||||
ConstantOperator.createInt(scale));
|
||||
assertEquals(expect, datetime.toString());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@CsvSource(
|
||||
delimiter = '|',
|
||||
value = {
|
||||
"select min(to_datetime(t1d)) from test_all_type " +
|
||||
"| to_datetime(13: min) ",
|
||||
"select max(to_datetime(t1d)) from test_all_type " +
|
||||
"| to_datetime(13: max) ",
|
||||
"select min(to_datetime(t1d, 6)) from test_all_type " +
|
||||
"| to_datetime(13: min) ",
|
||||
|
||||
"select max(from_unixtime(t1d)) from test_all_type " +
|
||||
"| from_unixtime(13: max) ",
|
||||
"select max(from_unixtime(get_json_int(v_json, 'ts'))) from tjson " +
|
||||
"| from_unixtime(6: max)",
|
||||
"select date_diff('millisecond', " +
|
||||
" min(from_unixtime(t1d)), " +
|
||||
" max(from_unixtime(t1d))) from test_all_type " +
|
||||
"| from_unixtime(15: min)",
|
||||
"select date_diff('millisecond', " +
|
||||
" min(to_datetime(t1d)), " +
|
||||
" max(to_datetime(t1d))) from test_all_type " +
|
||||
"| to_datetime(15: min)",
|
||||
"select date_diff('millisecond', " +
|
||||
" min(to_datetime(get_json_int(v_json, 'ts'), 6)), " +
|
||||
" max(to_datetime(get_json_int(v_json, 'ts'), 6))) from tjson" +
|
||||
"| to_datetime(8: min)",
|
||||
})
|
||||
public void testRewriteMinMaxMonotonic(String sql, String expectedAggregation)
|
||||
throws Exception {
|
||||
{
|
||||
// with MinMaxStats
|
||||
new MockUp<ColumnMinMaxMgr>() {
|
||||
@Mock
|
||||
public Optional<IMinMaxStatsMgr.ColumnMinMax> getStats(ColumnIdentifier identifier,
|
||||
StatsVersion version) {
|
||||
return Optional.of(new IMinMaxStatsMgr.ColumnMinMax("1", "100"));
|
||||
}
|
||||
};
|
||||
String plan = getFragmentPlan(sql);
|
||||
assertContains(plan, expectedAggregation);
|
||||
}
|
||||
|
||||
{
|
||||
// without MinMaxStats
|
||||
new MockUp<ColumnMinMaxMgr>() {
|
||||
@Mock
|
||||
public Optional<IMinMaxStatsMgr.ColumnMinMax> getStats(ColumnIdentifier identifier,
|
||||
StatsVersion version) {
|
||||
return Optional.empty();
|
||||
}
|
||||
};
|
||||
String plan = getFragmentPlan(sql);
|
||||
assertNotContains(plan, expectedAggregation);
|
||||
}
|
||||
|
||||
{
|
||||
// invalid MinMaxStats
|
||||
new MockUp<ColumnMinMaxMgr>() {
|
||||
@Mock
|
||||
public Optional<IMinMaxStatsMgr.ColumnMinMax> getStats(ColumnIdentifier identifier,
|
||||
StatsVersion version) {
|
||||
return Optional.of(new IMinMaxStatsMgr.ColumnMinMax("-1", "100"));
|
||||
}
|
||||
};
|
||||
String plan = getFragmentPlan(sql);
|
||||
assertNotContains(plan, expectedAggregation);
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@CsvSource(
|
||||
delimiter = '|',
|
||||
value = {
|
||||
"select min(t1d + 1) from test_all_type " +
|
||||
"| output: min(4: t1d + 1)",
|
||||
"select max(t1d * 2) from test_all_type " +
|
||||
"| output: max(4: t1d * 2)",
|
||||
"select min(concat(t1a, 'suffix')) from test_all_type " +
|
||||
"| output: min(concat(1: t1a, 'suffix'))",
|
||||
"select max(abs(t1d)) from test_all_type " +
|
||||
"| output: max(abs(4: t1d))",
|
||||
"select min(if(t1d > 0, t1d, 0)) from test_all_type " +
|
||||
"| output: min(if(4: t1d > 0, 4: t1d, 0))",
|
||||
"select min(cast(t1d as date)) from test_all_type " +
|
||||
"| output: min(CAST(4: t1d AS DATE))",
|
||||
"select min(t1d) from test_all_type having min(t1d) > 100 " +
|
||||
"| output: min(4: t1d)",
|
||||
})
|
||||
public void testNoRewriteForNonMonotonicExpressions(String sql, String expectedAggregation)
|
||||
throws Exception {
|
||||
String plan = getFragmentPlan(sql);
|
||||
assertContains(plan, expectedAggregation);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testColumnMinMaxMgrGenSql() throws Exception {
|
||||
// Test regular column
|
||||
Database mockDb = mock(Database.class);
|
||||
when(mockDb.getOriginName()).thenReturn("test_db");
|
||||
|
||||
OlapTable mockTable = mock(OlapTable.class);
|
||||
when(mockTable.getName()).thenReturn("test_table");
|
||||
|
||||
Column regularColumn = mock(Column.class);
|
||||
when(regularColumn.getName()).thenReturn("id");
|
||||
when(regularColumn.getType()).thenReturn(Type.BIGINT);
|
||||
when(regularColumn.getColumnId()).thenReturn(ColumnId.create("id"));
|
||||
|
||||
String sql1 = ColumnMinMaxMgr.genMinMaxSql("default_catalog", mockDb, mockTable, regularColumn);
|
||||
String expectedSql1 =
|
||||
"select min(id) as min, max(id) as max from `default_catalog`.`test_db`.`test_table`[_META_];";
|
||||
assertEquals(expectedSql1, sql1);
|
||||
|
||||
// Test JSON column with BIGINT type
|
||||
Column jsonColumn = mock(Column.class);
|
||||
when(jsonColumn.getName()).thenReturn("json_col");
|
||||
when(jsonColumn.getType()).thenReturn(Type.BIGINT);
|
||||
when(jsonColumn.getColumnId()).thenReturn(ColumnId.create("json_col.field1"));
|
||||
|
||||
String sql2 = ColumnMinMaxMgr.genMinMaxSql("default_catalog", mockDb, mockTable, jsonColumn);
|
||||
String expectedSql2 = "select min(get_json_int(`json_col`, 'field1')) as min, " +
|
||||
"max(get_json_int(`json_col`, 'field1')) as max " +
|
||||
"from `default_catalog`.`test_db`.`test_table`[_META_];";
|
||||
assertEquals(expectedSql2, sql2);
|
||||
|
||||
// Test JSON column with STRING type
|
||||
when(jsonColumn.getName()).thenReturn("json_col");
|
||||
when(jsonColumn.getType()).thenReturn(Type.VARCHAR);
|
||||
when(jsonColumn.getColumnId()).thenReturn(ColumnId.create("json_col.field2"));
|
||||
|
||||
String sql3 = ColumnMinMaxMgr.genMinMaxSql("default_catalog", mockDb, mockTable, jsonColumn);
|
||||
String expectedSql3 = "select min(get_json_string(`json_col`, 'field2')) as min, " +
|
||||
"max(get_json_string(`json_col`, 'field2')) as max " +
|
||||
"from `default_catalog`.`test_db`.`test_table`[_META_];";
|
||||
assertEquals(expectedSql3, sql3);
|
||||
}
|
||||
}
|
||||
|
|
@ -984,6 +984,7 @@ public class PlanTestBase extends PlanTestNoneDBBase {
|
|||
connectContext.getSessionVariable().setEnableShortCircuit(true);
|
||||
connectContext.getSessionVariable().setCboPushDownGroupingSet(false);
|
||||
connectContext.getSessionVariable().setCboEnableSingleNodePreferTwoStageAggregate(false);
|
||||
connectContext.getSessionVariable().setCboRewriteMonotonicMinmax(false);
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
|
|
|
|||
Loading…
Reference in New Issue