[Feature] Introduce asof join (FE Part) (#63070)

Signed-off-by: stephen <stephen5217@163.com>
This commit is contained in:
stephen 2025-09-16 19:57:33 +08:00 committed by GitHub
parent 6e32a078e3
commit 9e641d9eaf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
33 changed files with 1570 additions and 76 deletions

View File

@ -43,6 +43,7 @@ import com.starrocks.sql.ast.expression.Expr;
import com.starrocks.sql.ast.expression.JoinOperator;
import com.starrocks.sql.ast.expression.SlotRef;
import com.starrocks.sql.ast.expression.TableRef;
import com.starrocks.thrift.TAsofJoinCondition;
import com.starrocks.thrift.TEqJoinCondition;
import com.starrocks.thrift.THashJoinNode;
import com.starrocks.thrift.TNormalHashJoinNode;
@ -129,6 +130,17 @@ public class HashJoinNode extends JoinNode {
}
sqlJoinPredicatesBuilder.append(eqJoinPredicate.toSql());
}
if (joinOp.isAsofJoin() && asofJoinConjunct != null) {
TAsofJoinCondition asofJoinCondition = new TAsofJoinCondition(asofJoinConjunct.getChild(0).treeToThrift(),
asofJoinConjunct.getChild(1).treeToThrift(), asofJoinConjunct.getOpcode());
msg.hash_join_node.setAsof_join_condition(asofJoinCondition);
if (!sqlJoinPredicatesBuilder.isEmpty()) {
sqlJoinPredicatesBuilder.append(", ");
}
sqlJoinPredicatesBuilder.append(asofJoinConjunct.toSql());
}
for (Expr e : otherJoinConjuncts) {
msg.hash_join_node.addToOther_join_conjuncts(e.treeToThrift());
if (sqlJoinPredicatesBuilder.length() > 0) {

View File

@ -74,6 +74,8 @@ public abstract class JoinNode extends PlanNode implements RuntimeFilterBuildNod
protected List<BinaryPredicate> eqJoinConjuncts = Lists.newArrayList();
// join conjuncts from the JOIN clause that aren't equi-join predicates
protected List<Expr> otherJoinConjuncts;
// ASOF JOIN temporal inequality condition for finding closest match (only one per ASOF JOIN)
protected BinaryPredicate asofJoinConjunct;
protected boolean isPushDown;
protected DistributionMode distrMode;
protected String colocateReason = ""; // if can not do colocate join, set reason here
@ -187,7 +189,7 @@ public abstract class JoinNode extends PlanNode implements RuntimeFilterBuildNod
SessionVariable sessionVariable = ConnectContext.get().getSessionVariable();
JoinOperator joinOp = getJoinOp();
PlanNode inner = getChild(1);
if (!joinOp.isInnerJoin() && !joinOp.isLeftSemiJoin() && !joinOp.isRightJoin() && !joinOp.isCrossJoin()) {
if (!joinOp.isAnyInnerJoin() && !joinOp.isLeftSemiJoin() && !joinOp.isRightJoin() && !joinOp.isCrossJoin()) {
return;
}
@ -357,11 +359,11 @@ public abstract class JoinNode extends PlanNode implements RuntimeFilterBuildNod
Expr probeExpr,
List<Expr> partitionByExprs) {
List<Integer> sides = ImmutableList.of();
if (joinOp.isLeftAntiJoin() || joinOp.isLeftOuterJoin()) {
if (joinOp.isLeftAntiJoin() || joinOp.isAnyLeftOuterJoin()) {
sides = ImmutableList.of(0);
} else if (joinOp.isRightAntiJoin() || joinOp.isRightOuterJoin()) {
sides = ImmutableList.of(1);
} else if (joinOp.isInnerJoin() || joinOp.isSemiJoin() || joinOp.isCrossJoin()) {
} else if (joinOp.isAnyInnerJoin() || joinOp.isSemiJoin() || joinOp.isCrossJoin()) {
sides = ImmutableList.of(0, 1);
}
@ -457,6 +459,11 @@ public abstract class JoinNode extends PlanNode implements RuntimeFilterBuildNod
partitionExprs = exprs;
}
public void setAsofJoinConjunct(Expr asofJoinConjunct) {
Preconditions.checkArgument(asofJoinConjunct instanceof BinaryPredicate);
this.asofJoinConjunct = (BinaryPredicate) asofJoinConjunct;
}
@Override
public void computeStats() {
}
@ -520,6 +527,12 @@ public abstract class JoinNode extends PlanNode implements RuntimeFilterBuildNod
}
output.append("\n");
}
if (asofJoinConjunct != null) {
output.append(detailPrefix).append("asof join conjunct: ");
output.append(explainExpr(detailLevel, List.of(asofJoinConjunct))).append("\n");
}
if (!otherJoinConjuncts.isEmpty()) {
output.append(detailPrefix).append("other join predicates: ")
.append(explainExpr(detailLevel, otherJoinConjuncts)).append("\n");

View File

@ -802,6 +802,9 @@ public class QueryAnalyzer {
Scope leftScope = process(join.getLeft(), parentScope);
Scope rightScope;
if (join.getRight() instanceof TableFunctionRelation || join.isLateral()) {
if (join.getJoinOp().isAsofJoin()) {
throw new SemanticException("ASOF join is not supported with lateral join");
}
if (!(join.getRight() instanceof TableFunctionRelation)) {
throw new SemanticException("Only support lateral join with UDTF");
} else if (!join.getJoinOp().isInnerJoin() && !join.getJoinOp().isCrossJoin() &&
@ -860,6 +863,12 @@ public class QueryAnalyzer {
throw new SemanticException("WHERE clause must evaluate to a boolean: actual type %s",
joinEqual.getType());
}
// Validate ASOF JOIN conditions
if (join.getJoinOp().isAsofJoin()) {
validateAsofJoinConditions(joinEqual);
}
// check the join on predicate, example:
// we have col_json, we can't join on table_a.col_json = table_b.col_json,
// but we can join on cast(table_a.col_json->"a" as int) = cast(table_b.col_json->"a" as int)
@ -867,7 +876,7 @@ public class QueryAnalyzer {
// and table_a.col_struct.a = table_b.col_struct.a
checkJoinEqual(joinEqual);
} else {
if (join.getJoinOp().isOuterJoin() || join.getJoinOp().isSemiAntiJoin()) {
if (join.getJoinOp().isOuterJoin() || join.getJoinOp().isSemiAntiJoin() || join.getJoinOp().isAsofJoin()) {
throw new SemanticException(join.getJoinOp() + " requires an ON or USING clause.");
}
}
@ -880,7 +889,7 @@ public class QueryAnalyzer {
scope = new Scope(RelationId.of(join), leftScope.getRelationFields());
} else if (join.getJoinOp().isRightSemiAntiJoin()) {
scope = new Scope(RelationId.of(join), rightScope.getRelationFields());
} else if (join.getJoinOp().isLeftOuterJoin()) {
} else if (join.getJoinOp().isAnyLeftOuterJoin()) {
List<Field> rightFields = getFieldsWithNullable(rightScope);
scope = new Scope(RelationId.of(join),
leftScope.getRelationFields().joinWith(new RelationFields(rightFields)));
@ -915,6 +924,15 @@ public class QueryAnalyzer {
return newFields;
}
private void validateAsofJoinConditions(Expr joinPredicate) {
if (joinPredicate == null) {
throw new SemanticException("ASOF JOIN requires ON clause with join conditions");
}
AsofJoinConditionValidator validator = new AsofJoinConditionValidator();
validator.validate(joinPredicate);
}
private Expr analyzeJoinUsing(List<String> usingColNames, Scope left, Scope right) {
Expr joinEqual = null;
for (String colName : usingColNames) {
@ -1666,4 +1684,67 @@ public class QueryAnalyzer {
return null;
}
}
private static class AsofJoinConditionValidator {
private int equalityPredicateCount = 0;
private int inequalityPredicateCount = 0;
private boolean containsOrOperator = false;
public void validate(Expr joinPredicate) {
visit(joinPredicate);
if (equalityPredicateCount == 0) {
throw new SemanticException("ASOF JOIN requires at least one equality condition in join ON clause");
}
if (inequalityPredicateCount == 0) {
throw new SemanticException("ASOF JOIN requires exactly one temporal inequality condition in join ON clause");
}
if (inequalityPredicateCount > 1) {
throw new SemanticException("ASOF JOIN supports only one inequality condition in join ON clause, found: " +
inequalityPredicateCount);
}
}
private void visit(Expr expr) {
if (expr instanceof CompoundPredicate compound) {
if (compound.getOp() == CompoundPredicate.Operator.OR) {
containsOrOperator = true;
}
visit(compound.getChild(0));
visit(compound.getChild(1));
} else if (expr instanceof BinaryPredicate binary) {
if (binary.getOp() == BinaryType.EQ) {
equalityPredicateCount++;
} else if (binary.getOp().isRange()) {
inequalityPredicateCount++;
validateTemporalConditionTypes(binary);
} else {
throw new SemanticException("ASOF JOIN does not support '" + binary.getOp() + "' operator " +
"in join ON clause");
}
}
if (containsOrOperator) {
throw new SemanticException("ASOF JOIN conditions do not support OR operators in join ON clause");
}
}
private void validateTemporalConditionTypes(BinaryPredicate predicate) {
Type leftType = predicate.getChild(0).getType();
Type rightType = predicate.getChild(1).getType();
if (!isTemporalOrderingType(leftType) || !isTemporalOrderingType(rightType)) {
throw new SemanticException(
"ASOF JOIN temporal condition supports only BIGINT, DATE, or DATETIME types in join ON clause, found: " +
leftType.toSql() + " and " + rightType.toSql() + ". predicate: " + predicate.toMySql()
);
}
}
private boolean isTemporalOrderingType(Type type) {
return type.isBigint() || type.isDate() || type.isDatetime();
}
}
}

View File

@ -53,7 +53,10 @@ public enum JoinOperator {
// NOT IN subqueries. It can have a single equality join conjunct
// that returns TRUE when the rhs is NULL.
NULL_AWARE_LEFT_ANTI_JOIN("NULL AWARE LEFT ANTI JOIN", "▷*",
TJoinOp.NULL_AWARE_LEFT_ANTI_JOIN);
TJoinOp.NULL_AWARE_LEFT_ANTI_JOIN),
ASOF_INNER_JOIN("ASOF INNER JOIN", "⋈ₐ", TJoinOp.ASOF_INNER_JOIN),
ASOF_LEFT_OUTER_JOIN("ASOF LEFT OUTER JOIN", "⟕ₐ", TJoinOp.ASOF_LEFT_OUTER_JOIN);
private final String description;
private final String algebra;
@ -79,7 +82,7 @@ public enum JoinOperator {
}
public boolean isOuterJoin() {
return this == LEFT_OUTER_JOIN || this == RIGHT_OUTER_JOIN || this == FULL_OUTER_JOIN;
return this == LEFT_OUTER_JOIN || this == RIGHT_OUTER_JOIN || this == FULL_OUTER_JOIN || this == ASOF_LEFT_OUTER_JOIN;
}
public boolean isSemiAntiJoin() {
@ -167,8 +170,24 @@ public enum JoinOperator {
return Sets.newHashSet(INNER_JOIN, CROSS_JOIN);
}
public boolean canGenerateRuntimeFilter() {
return !(isLeftOuterJoin() || isFullOuterJoin() || isLeftAntiJoin());
public boolean isAsofJoin() {
return this == ASOF_INNER_JOIN || this == ASOF_LEFT_OUTER_JOIN;
}
public boolean isAsofInnerJoin() {
return this == ASOF_INNER_JOIN;
}
public boolean isAsofLeftOuterJoin() {
return this == ASOF_LEFT_OUTER_JOIN;
}
public boolean isAnyInnerJoin() {
return this == INNER_JOIN || this == ASOF_INNER_JOIN;
}
public boolean isAnyLeftOuterJoin() {
return this == LEFT_OUTER_JOIN || this == ASOF_LEFT_OUTER_JOIN;
}
}

View File

@ -87,7 +87,7 @@ public class JoinHelper {
leftOnCols = Lists.newArrayList();
rightOnCols = Lists.newArrayList();
boolean leftTableAggStrict = type.isLeftOuterJoin() || type.isFullOuterJoin();
boolean leftTableAggStrict = type.isAnyLeftOuterJoin() || type.isFullOuterJoin();
boolean rightTableAggStrict = type.isRightOuterJoin() || type.isFullOuterJoin();
for (BinaryPredicateOperator binaryPredicate : equalsPredicate) {
@ -220,4 +220,35 @@ public class JoinHelper {
return type.isCrossJoin() || JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN.equals(type) ||
(type.isInnerJoin() && equalOnPredicate.isEmpty()) || HintNode.HINT_JOIN_BROADCAST.equals(hint);
}
/**
* Apply commutative transformation to a binary predicate
* For comparison operators (>, <, >=, <=), swap operands and transform operators
* For AsOf join scenarios where we need: left_table.column OP right_table.column
*
* @param predicate The predicate to transform
* @param leftColumns Columns from left table
* @param rightColumns Columns from right table
* @return Transformed predicate with proper left-right operand order
*/
public static ScalarOperator applyCommutativeToPredicates(ScalarOperator predicate,
ColumnRefSet leftColumns,
ColumnRefSet rightColumns) {
if (predicate instanceof BinaryPredicateOperator binaryPred) {
// Only apply to comparison operators (>, <, >=, <=)
if (binaryPred.getBinaryType().isRange()) {
if (!leftColumns.containsAll(binaryPred.getChild(0).getUsedColumns()) &&
rightColumns.containsAll(binaryPred.getChild(0).getUsedColumns())) {
return binaryPred.commutative();
} else {
return predicate;
}
} else {
return predicate;
}
} else {
return predicate;
}
}
}

View File

@ -194,13 +194,13 @@ public class OutputPropertyDeriver extends PropertyDeriverBase<PhysicalPropertyS
EquivalentDescriptor equivDesc = distributionSpec.getEquivDesc();
JoinOperator joinOperator = node.getJoinType();
if (joinOperator.isInnerJoin()) {
if (joinOperator.isAnyInnerJoin()) {
for (int i = 0; i < leftOnPredicateColumns.size(); i++) {
DistributionCol leftCol = leftOnPredicateColumns.get(i);
DistributionCol rightCol = rightOnPredicateColumns.get(i);
equivDesc.unionDistributionCols(leftCol, rightCol);
}
} else if (joinOperator.isLeftOuterJoin() || joinOperator.isRightOuterJoin()) {
} else if (joinOperator.isAnyLeftOuterJoin() || joinOperator.isRightOuterJoin()) {
for (int i = 0; i < leftOnPredicateColumns.size(); i++) {
DistributionCol leftCol = leftOnPredicateColumns.get(i);
DistributionCol rightCol = rightOnPredicateColumns.get(i);

View File

@ -83,7 +83,7 @@ public class JoinPredicatePushdown {
LogicalJoinOperator join = (LogicalJoinOperator) joinOptExpression.getOp();
ColumnRefSet leftColumns = joinOptExpression.inputAt(0).getOutputColumns();
ColumnRefSet rightColumns = joinOptExpression.inputAt(1).getOutputColumns();
if (join.isInnerOrCrossJoin()) {
if (join.isInnerOrCrossJoin() || join.getJoinType().isAsofInnerJoin()) {
for (ScalarOperator predicate : conjunctList) {
ColumnRefSet usedColumns = predicate.getUsedColumns();
if (usedColumns.isEmpty()) {
@ -100,7 +100,7 @@ public class JoinPredicatePushdown {
ColumnRefSet usedColumns = predicate.getUsedColumns();
if (leftColumns.containsAll(usedColumns) && join.getJoinType().isRightOuterJoin()) {
leftPushDown.add(predicate);
} else if (rightColumns.containsAll(usedColumns) && join.getJoinType().isLeftOuterJoin()) {
} else if (rightColumns.containsAll(usedColumns) && join.getJoinType().isAnyLeftOuterJoin()) {
rightPushDown.add(predicate);
}
}
@ -142,7 +142,7 @@ public class JoinPredicatePushdown {
private OptExpression pushdownFilterPredicate(ScalarOperator predicateToPush) {
LogicalJoinOperator join = joinOptExpression.getOp().cast();
if (join.isInnerOrCrossJoin()) {
if (join.isInnerOrCrossJoin() || join.getJoinType().isAsofInnerJoin()) {
return pushdownOnPredicate(predicateToPush);
}
@ -151,7 +151,7 @@ public class JoinPredicatePushdown {
ColumnRefSet rightColumns = joinOptExpression.inputAt(1).getOutputColumns();
List<ScalarOperator> remainingFilter = new ArrayList<>();
if (join.getJoinType().isLeftOuterJoin()) {
if (join.getJoinType().isAnyLeftOuterJoin()) {
for (ScalarOperator e : Utils.extractConjuncts(predicateToPush)) {
ColumnRefSet usedColumns = e.getUsedColumns();
if (leftColumns.containsAll(usedColumns)) {
@ -203,7 +203,7 @@ public class JoinPredicatePushdown {
LogicalJoinOperator newJoinOperator;
if (!remainingFilter.isEmpty()) {
if (join.getJoinType().isInnerJoin()) {
if (join.getJoinType().isAnyInnerJoin()) {
newJoinOperator = new LogicalJoinOperator.Builder().withOperator(join)
.setOnPredicate(Utils.compoundAnd(join.getOnPredicate(), Utils.compoundAnd(remainingFilter)))
.build();
@ -341,7 +341,7 @@ public class JoinPredicatePushdown {
JoinOperator joinType = joinOp.getJoinType();
boolean isLeftEmpty = leftPushDown.isEmpty();
if (joinType.isInnerJoin() || joinType.isRightSemiJoin()) {
if (joinType.isAnyInnerJoin() || joinType.isRightSemiJoin()) {
leftEQ.stream().map(c -> new IsNullPredicateOperator(true, c.clone(), true)).forEach(notNull -> {
optimizerContext.addPushdownNotNullPredicates(notNull);
if (isLeftEmpty) {
@ -350,7 +350,7 @@ public class JoinPredicatePushdown {
});
}
boolean isRightEmpty = rightPushDown.isEmpty();
if (joinType.isInnerJoin() || joinType.isLeftSemiJoin()) {
if (joinType.isAnyInnerJoin() || joinType.isLeftSemiJoin()) {
rightEQ.stream().map(c -> new IsNullPredicateOperator(true, c.clone(), true)).forEach(notNull -> {
optimizerContext.addPushdownNotNullPredicates(notNull);
if (isRightEmpty) {
@ -386,7 +386,7 @@ public class JoinPredicatePushdown {
joinOptExpression = convertOuterToInner(joinOptExpression, predicateToPush);
join = (LogicalJoinOperator) joinOptExpression.getOp();
}
if (join.isInnerOrCrossJoin()) {
if (join.isInnerOrCrossJoin() || join.getJoinType().isAsofInnerJoin()) {
ScalarOperator predicate =
rangePredicateDerive(Utils.compoundAnd(join.getOnPredicate(), predicateToPush, join.getPredicate()));
return equivalenceDerive(predicate, true);
@ -394,7 +394,7 @@ public class JoinPredicatePushdown {
ScalarOperator predicate = rangePredicateDerive(predicateToPush);
JoinOperator joinType = join.getJoinType();
if (optimizerContext.isEnableJoinEquivalenceDerive() ||
(!joinType.isLeftOuterJoin() && !joinType.isRightOuterJoin())) {
(!joinType.isAnyLeftOuterJoin() && !joinType.isRightOuterJoin())) {
getPushdownPredicatesFromEquivalenceDerive(
Utils.compoundAnd(join.getOnPredicate(), predicate), joinOptExpression, join);
}
@ -428,7 +428,7 @@ public class JoinPredicatePushdown {
rightPushDown.add(p);
}
}
} else if (join.getJoinType().isLeftOuterJoin()) {
} else if (join.getJoinType().isAnyLeftOuterJoin()) {
for (ScalarOperator p : derivedPredicates) {
if (rightOutputColumns.containsAll(p.getUsedColumns()) &&
Utils.canEliminateNull(rightOutputColumnOps, p.clone())) {
@ -464,7 +464,7 @@ public class JoinPredicatePushdown {
ScalarOperator equivalenceDeriveOnPredicate(ScalarOperator on, OptExpression joinOpt, LogicalJoinOperator join) {
// For SQl: select * from t1 left join t2 on t1.id = t2.id and t1.id > 1
// Infer t2.id > 1 and Push down it to right child
if (!join.getJoinType().isInnerJoin() && !join.getJoinType().isSemiJoin() &&
if (!join.getJoinType().isAnyInnerJoin() && !join.getJoinType().isSemiJoin() &&
!join.getJoinType().isOuterJoin()) {
return on;
}
@ -476,9 +476,9 @@ public class JoinPredicatePushdown {
ScalarOperator derivedPredicate = equivalenceDerive(on, false);
List<ScalarOperator> derivedPredicates = Utils.extractConjuncts(derivedPredicate);
if (join.getJoinType().isInnerJoin() || join.getJoinType().isSemiJoin()) {
if (join.getJoinType().isAnyInnerJoin() || join.getJoinType().isSemiJoin()) {
return Utils.compoundAnd(on, derivedPredicate);
} else if (join.getJoinType().isLeftOuterJoin()) {
} else if (join.getJoinType().isAnyLeftOuterJoin()) {
for (ScalarOperator p : derivedPredicates) {
if (rightOutputColumns.containsAll(p.getUsedColumns())) {
p.setIsPushdown(true);
@ -544,11 +544,13 @@ public class JoinPredicatePushdown {
Set<ColumnRefOperator> leftOutputColumnOps = columnRefFactory.getColumnRefs(leftColumns);
Set<ColumnRefOperator> rightOutputColumnOps = columnRefFactory.getColumnRefs(rightColumns);
if (join.getJoinType().isLeftOuterJoin()) {
if (join.getJoinType().isAnyLeftOuterJoin()) {
if (Utils.canEliminateNull(rightOutputColumnOps, predicateToPush)
|| hasPushdownNotNull(rightOutputColumnOps, optimizerContext.getPushdownNotNullPredicates())) {
JoinOperator newJoinType = join.getJoinType().isAsofLeftOuterJoin() ? JoinOperator.ASOF_INNER_JOIN :
JoinOperator.INNER_JOIN;
OptExpression newOpt = OptExpression.create(new LogicalJoinOperator.Builder().withOperator(join)
.setJoinType(JoinOperator.INNER_JOIN)
.setJoinType(newJoinType)
.build(),
joinOpt.getInputs());
return newOpt;

View File

@ -14,6 +14,7 @@
package com.starrocks.sql.optimizer.rule.join;
import com.starrocks.sql.ast.expression.JoinOperator;
/**
@ -25,12 +26,12 @@ public class JoinReorderProperty {
//not called
}
private static final int[][] INNER_ASSOCIATIVITY_PROPERTY = new int[10][10];
private static final int[][] INNER_ASSOCIATIVITY_PROPERTY = new int[12][12];
private static final int[][] OUTER_ASSOCIATIVITY_PROPERTY = new int[10][10];
private static final int[][] OUTER_ASSOCIATIVITY_PROPERTY = new int[12][12];
private static final int[][] INNER_LEFT_ASSCOM_PROPERTY = new int[10][10];
private static final int[][] OUTER_LEFT_ASSCOM_PROPERTY = new int[10][10];
private static final int[][] INNER_LEFT_ASSCOM_PROPERTY = new int[12][12];
private static final int[][] OUTER_LEFT_ASSCOM_PROPERTY = new int[12][12];
public static final int UNSUPPORTED = 0;
@ -61,8 +62,8 @@ public class JoinReorderProperty {
* 1 0 0 0 0 0 0 0 0 0
* × 1 0 0 0 0 0 0 0 0 0
*/
INNER_ASSOCIATIVITY_PROPERTY[JoinOperator.INNER_JOIN.ordinal()] = new int[] {1, 0, 0, 0, 0, 0, 0, 0, 0, 0};
INNER_ASSOCIATIVITY_PROPERTY[JoinOperator.CROSS_JOIN.ordinal()] = new int[] {1, 0, 0, 0, 0, 0, 0, 0, 0, 0};
INNER_ASSOCIATIVITY_PROPERTY[JoinOperator.INNER_JOIN.ordinal()] = new int[] {1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
INNER_ASSOCIATIVITY_PROPERTY[JoinOperator.CROSS_JOIN.ordinal()] = new int[] {1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
/*
* bottom join operator within the row and top join operator with in the column.
@ -74,10 +75,10 @@ public class JoinReorderProperty {
* 0 2 0 0 0 0 0 2 0 0
* × 0 1 0 0 0 0 0 0 0 0
*/
OUTER_ASSOCIATIVITY_PROPERTY[JoinOperator.INNER_JOIN.ordinal()] = new int[] {0, 1, 0, 0, 0, 0, 0, 0, 0, 0};
OUTER_ASSOCIATIVITY_PROPERTY[JoinOperator.LEFT_OUTER_JOIN.ordinal()] = new int[] {0, 2, 0, 0, 0, 0, 0, 0, 0, 0};
OUTER_ASSOCIATIVITY_PROPERTY[JoinOperator.FULL_OUTER_JOIN.ordinal()] = new int[] {0, 2, 0, 0, 0, 0, 0, 2, 0, 0};
OUTER_ASSOCIATIVITY_PROPERTY[JoinOperator.CROSS_JOIN.ordinal()] = new int[] {0, 1, 0, 0, 0, 0, 0, 0, 0, 0};
OUTER_ASSOCIATIVITY_PROPERTY[JoinOperator.INNER_JOIN.ordinal()] = new int[] {0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
OUTER_ASSOCIATIVITY_PROPERTY[JoinOperator.LEFT_OUTER_JOIN.ordinal()] = new int[] {0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
OUTER_ASSOCIATIVITY_PROPERTY[JoinOperator.FULL_OUTER_JOIN.ordinal()] = new int[] {0, 2, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0};
OUTER_ASSOCIATIVITY_PROPERTY[JoinOperator.CROSS_JOIN.ordinal()] = new int[] {0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
/*
* bottom join operator within the row and top join operator with in the column
@ -87,8 +88,8 @@ public class JoinReorderProperty {
* 0 0 1 1 0 0 0 0 0 0
* × 0 0 1 1 0 0 0 0 0 0
*/
INNER_LEFT_ASSCOM_PROPERTY[JoinOperator.INNER_JOIN.ordinal()] = new int[] {0, 0, 1, 1, 0, 0, 0, 0, 0, 0};
INNER_LEFT_ASSCOM_PROPERTY[JoinOperator.CROSS_JOIN.ordinal()] = new int[] {0, 0, 1, 1, 0, 0, 0, 0, 0, 0};
INNER_LEFT_ASSCOM_PROPERTY[JoinOperator.INNER_JOIN.ordinal()] = new int[] {0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0};
INNER_LEFT_ASSCOM_PROPERTY[JoinOperator.CROSS_JOIN.ordinal()] = new int[] {0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0};
/*
* bottom join operator within the row and top join operator with in the column
@ -100,10 +101,10 @@ public class JoinReorderProperty {
* 0 1 1 1 0 0 0 0 0 0
* 0 2 0 0 0 0 0 2 0 0
*/
OUTER_LEFT_ASSCOM_PROPERTY[JoinOperator.LEFT_OUTER_JOIN.ordinal()] = new int[] {1, 1, 1, 1, 0, 0, 0, 2, 0, 0};
OUTER_LEFT_ASSCOM_PROPERTY[JoinOperator.LEFT_SEMI_JOIN.ordinal()] = new int[] {0, 1, 1, 1, 0, 0, 0, 0, 0, 0};
OUTER_LEFT_ASSCOM_PROPERTY[JoinOperator.LEFT_ANTI_JOIN.ordinal()] = new int[] {0, 1, 1, 1, 0, 0, 0, 0, 0, 0};
OUTER_LEFT_ASSCOM_PROPERTY[JoinOperator.FULL_OUTER_JOIN.ordinal()] = new int[] {0, 2, 0, 0, 0, 0, 0, 2, 0, 0};
OUTER_LEFT_ASSCOM_PROPERTY[JoinOperator.LEFT_OUTER_JOIN.ordinal()] = new int[] {1, 1, 1, 1, 0, 0, 0, 2, 0, 0, 0, 0};
OUTER_LEFT_ASSCOM_PROPERTY[JoinOperator.LEFT_SEMI_JOIN.ordinal()] = new int[] {0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0};
OUTER_LEFT_ASSCOM_PROPERTY[JoinOperator.LEFT_ANTI_JOIN.ordinal()] = new int[] {0, 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0};
OUTER_LEFT_ASSCOM_PROPERTY[JoinOperator.FULL_OUTER_JOIN.ordinal()] = new int[] {0, 2, 0, 0, 0, 0, 0, 2, 0, 0, 0, 0};
}
public static int getAssociativityProperty(JoinOperator bottomJoinType, JoinOperator topJoinType, boolean isInnerMode) {
@ -125,4 +126,4 @@ public class JoinReorderProperty {
}
}
}

View File

@ -94,7 +94,8 @@ public class EliminateJoinWithConstantRule extends TransformationRule {
JoinOperator joinType = joinOperator.getJoinType();
// anti/full outer join cannot be eliminated.
// semi join needs to distinct output which cannot be eliminated.
if (joinType.isAntiJoin() || joinType.isFullOuterJoin() || joinType.isSemiJoin()) {
// ASOF join has temporal ordering semantics and cannot be eliminated with constants.
if (joinType.isAntiJoin() || joinType.isFullOuterJoin() || joinType.isSemiJoin() || joinType.isAsofJoin()) {
return false;
}
if (constantIndex == 0) {

View File

@ -84,7 +84,7 @@ public class OnPredicateMoveAroundRule extends TransformationRule {
DomainProperty rightDomainProperty = rightChild.getDomainProperty();
OptExpression result = null;
if (joinOperator.getJoinType().isInnerJoin() || joinOperator.getJoinType().isSemiJoin()) {
if (joinOperator.getJoinType().isAnyInnerJoin() || joinOperator.getJoinType().isSemiJoin()) {
List<ScalarOperator> toLeftPredicates = binaryPredicates.stream()
.map(e -> derivePredicate(e, rightDomainProperty, leftDomainProperty, true))
.filter(Objects::nonNull)
@ -129,7 +129,7 @@ public class OnPredicateMoveAroundRule extends TransformationRule {
Lists.newArrayList(input.inputAt(0), OptExpression.create(filter, input.inputAt(1)))
);
}
} else if (joinOperator.getJoinType().isLeftOuterJoin()) {
} else if (joinOperator.getJoinType().isAnyLeftOuterJoin()) {
List<ScalarOperator> toRightPredicates = binaryPredicates.stream()
.map(e -> derivePredicate(e, leftDomainProperty, rightDomainProperty, false))
.collect(Collectors.toList());

View File

@ -67,14 +67,14 @@ public class PruneEmptyJoinRule extends TransformationRule {
JoinOperator type = join.getJoinType();
int joinIndex; // 0 left, 1 right
if (type.isInnerJoin() || type.isCrossJoin() || type.isSemiJoin()) {
if (type.isAnyInnerJoin() || type.isCrossJoin() || type.isSemiJoin()) {
/* inner join, cross join, semi join
* join
* / \ -> Empty
* Empty B
*/
return transToEmpty(input, context);
} else if (type.isRightOuterJoin() || type.isLeftOuterJoin()) {
} else if (type.isRightOuterJoin() || type.isAnyLeftOuterJoin()) {
/* left outer join Project (remain B columns(null))
* / \ -> |
* A Empty A

View File

@ -59,7 +59,7 @@ public class PushDownLimitJoinRule extends TransformationRule {
// TODO: Push down the limit to the full outer join if BE can output
// the matched rows first.
if (joinType.isSemiAntiJoin() || joinType.isFullOuterJoin()) {
if (joinType.isSemiAntiJoin() || joinType.isFullOuterJoin() || joinType.isAsofInnerJoin()) {
return Lists.newArrayList(result);
} else if (joinType.isInnerJoin() && newJoin.getOnPredicate() != null) {
return Lists.newArrayList(result);
@ -71,7 +71,7 @@ public class PushDownLimitJoinRule extends TransformationRule {
int[] pushDownChildIdx = {0, 1};
// push down all child
if (joinType.isLeftOuterJoin()) {
if (joinType.isAnyLeftOuterJoin()) {
pushDownChildIdx = new int[] {0};
} else if (joinType.isRightOuterJoin()) {
pushDownChildIdx = new int[] {1};

View File

@ -55,7 +55,7 @@ public class PushDownTopNBelowOuterJoinRule extends TransformationRule {
LogicalJoinOperator joinOperator = childExpr.getOp().cast();
JoinOperator joinType = joinOperator.getJoinType();
if (!joinType.isLeftOuterJoin() && !joinType.isRightOuterJoin()) {
if (!joinType.isAnyLeftOuterJoin() && !joinType.isRightOuterJoin()) {
return false;
}
@ -68,7 +68,7 @@ public class PushDownTopNBelowOuterJoinRule extends TransformationRule {
}
OptExpression joinChild = null;
if (joinType.isLeftOuterJoin()) {
if (joinType.isAnyLeftOuterJoin()) {
joinChild = childExpr.inputAt(0);
} else if (joinType.isRightJoin()) {
joinChild = childExpr.inputAt(1);
@ -94,7 +94,7 @@ public class PushDownTopNBelowOuterJoinRule extends TransformationRule {
LogicalJoinOperator joinOperator = childExpr.getOp().cast();
OptExpression joinChildWithSort;
if (joinOperator.getJoinType().isLeftOuterJoin()) {
if (joinOperator.getJoinType().isAnyLeftOuterJoin()) {
joinChildWithSort = childExpr.inputAt(0);
} else {
joinChildWithSort = childExpr.inputAt(1);
@ -109,7 +109,7 @@ public class PushDownTopNBelowOuterJoinRule extends TransformationRule {
.build(), joinChildWithSort);
OptExpression newJoinOperator;
if (joinOperator.getJoinType().isLeftOuterJoin()) {
if (joinOperator.getJoinType().isAnyLeftOuterJoin()) {
newJoinOperator = OptExpression.create(joinOperator,
Lists.newArrayList(newTopNOperator, childExpr.inputAt(1)));
} else {

View File

@ -426,7 +426,7 @@ public class SkewShuffleJoinEliminationRule implements TreeRewriteRule {
// currently only support inner join and left outer join
private boolean isValidJoinType(PhysicalHashJoinOperator joinOp) {
return joinOp.getJoinType().isInnerJoin() || joinOp.getJoinType().isLeftOuterJoin();
return joinOp.getJoinType().isAnyInnerJoin() || joinOp.getJoinType().isAnyLeftOuterJoin();
}
private boolean isShuffleJoin(OptExpression opt) {

View File

@ -1214,6 +1214,7 @@ public class StatisticsCalculator extends OperatorVisitor<Void, ExpressionContex
joinStatsBuilder = Statistics.buildFrom(crossJoinStats);
break;
case INNER_JOIN:
case ASOF_INNER_JOIN:
if (eqOnPredicates.isEmpty()) {
joinStatsBuilder = Statistics.buildFrom(crossJoinStats);
break;
@ -1225,6 +1226,11 @@ public class StatisticsCalculator extends OperatorVisitor<Void, ExpressionContex
joinStatsBuilder.setOutputRowCount(max(innerRowCount, leftRowCount));
computeNullFractionForOuterJoin(leftRowCount, innerRowCount, rightStatistics, joinStatsBuilder);
break;
case ASOF_LEFT_OUTER_JOIN:
joinStatsBuilder = Statistics.buildFrom(innerJoinStats);
joinStatsBuilder.setOutputRowCount(leftRowCount);
computeNullFractionForOuterJoin(leftRowCount, innerRowCount, rightStatistics, joinStatsBuilder);
break;
case LEFT_SEMI_JOIN:
joinStatsBuilder = Statistics.buildFrom(StatisticsEstimateUtils.adjustStatisticsByRowCount(
innerJoinStats, Math.min(leftRowCount, innerRowCount)));
@ -1279,6 +1285,14 @@ public class StatisticsCalculator extends OperatorVisitor<Void, ExpressionContex
estimateStatistics = Statistics.buildFrom(estimateStatistics)
.setOutputRowCount(Math.max(estimateStatistics.getOutputRowCount(), joinStats.getOutputRowCount()))
.build();
} else if (joinType.isAsofInnerJoin()) {
estimateStatistics = Statistics.buildFrom(estimateStatistics)
.setOutputRowCount(Math.max(Math.min(estimateStatistics.getOutputRowCount(), leftRowCount), 1))
.build();
} else if (joinType.isAsofLeftOuterJoin()) {
estimateStatistics = Statistics.buildFrom(estimateStatistics)
.setOutputRowCount(Math.max(1, leftRowCount))
.build();
}
context.setStatistics(estimateStatistics);

View File

@ -1221,14 +1221,14 @@ public class RelationTransformer implements AstVisitorExtendInterface<LogicalPla
// We need to extract the equivalence conditions to meet query analysis and
// avoid hash joins without equivalence conditions
if (scalarOperator.isConstant() && scalarOperator.getType().isBoolean()
&& !node.getJoinOp().isCrossJoin() && !node.getJoinOp().isInnerJoin()) {
&& !node.getJoinOp().isCrossJoin() && !node.getJoinOp().isAnyInnerJoin()) {
ScalarOperator scalarOperatorWithoutRewrite = Utils.compoundAnd(scalarConjuncts);
List<BinaryPredicateOperator> eqPredicate = JoinHelper.getEqualsPredicate(
new ColumnRefSet(leftOutputColumns),
new ColumnRefSet(rightOutputColumns),
Utils.extractConjuncts(scalarOperatorWithoutRewrite));
if (eqPredicate.size() > 0) {
if (!eqPredicate.isEmpty()) {
scalarOperator = Utils.compoundAnd(eqPredicate.get(0), scalarOperator);
}
}
@ -1390,7 +1390,7 @@ public class RelationTransformer implements AstVisitorExtendInterface<LogicalPla
Map<ColumnRefOperator, ScalarOperator> leftConstMap, Map<ColumnRefOperator, ScalarOperator> rightConstMap,
JoinOperator joinOperator) {
// outJoin may generate null values, so it may not safe to use const value
if (joinOperator.isLeftOuterJoin()) {
if (joinOperator.isAnyLeftOuterJoin()) {
rightConstMap = new HashMap<>();
} else if (joinOperator.isRightJoin()) {
leftConstMap = new HashMap<>();

View File

@ -6256,7 +6256,13 @@ public class AstBuilder extends com.starrocks.sql.parser.StarRocksBaseVisitor<Pa
Relation right = (Relation) visit(context.rightRelation);
JoinOperator joinType = JoinOperator.INNER_JOIN;
if (context.crossOrInnerJoinType() != null) {
if (context.asofJoinType() != null) {
if (context.asofJoinType().LEFT() != null) {
joinType = JoinOperator.ASOF_LEFT_OUTER_JOIN;
} else {
joinType = JoinOperator.ASOF_INNER_JOIN;
}
} else if (context.crossOrInnerJoinType() != null) {
if (context.crossOrInnerJoinType().CROSS() != null) {
joinType = JoinOperator.CROSS_JOIN;
} else {

View File

@ -2669,7 +2669,7 @@ public class PlanFragmentBuilder {
// Push down the predicates constructed by the right child when the
// join op is inner join or left semi join or right join(semi, outer, anti)
node.setIsPushDown(ConnectContext.get().getSessionVariable().isHashJoinPushDownRightTable()
&& (node.getJoinOp().isInnerJoin() || node.getJoinOp().isLeftSemiJoin() ||
&& (node.getJoinOp().isAnyInnerJoin() || node.getJoinOp().isLeftSemiJoin() ||
node.getJoinOp().isRightJoin()));
}
@ -2708,7 +2708,7 @@ public class PlanFragmentBuilder {
Set<TupleId> nullableTupleIds = new HashSet<>();
nullableTupleIds.addAll(leftFragment.getPlanRoot().getNullableTupleIds());
nullableTupleIds.addAll(rightFragment.getPlanRoot().getNullableTupleIds());
if (joinOperator.isLeftOuterJoin()) {
if (joinOperator.isAnyLeftOuterJoin()) {
nullableTupleIds.addAll(rightFragment.getPlanRoot().getTupleIds());
} else if (joinOperator.isRightOuterJoin()) {
nullableTupleIds.addAll(leftFragment.getPlanRoot().getTupleIds());
@ -2877,6 +2877,12 @@ public class PlanFragmentBuilder {
context.getNextNodeId(),
leftFragment.getPlanRoot(), rightFragment.getPlanRoot(),
joinOperator, eqJoinConjuncts, otherJoinConjuncts);
// Set ASOF join conjunct if present
if (joinExpr.asofJoinConjunct != null) {
joinNode.setAsofJoinConjunct(joinExpr.asofJoinConjunct);
}
UKFKConstraints constraints = optExpr.getConstraints();
if (constraints != null) {
UKFKConstraints.JoinProperty joinProperty = constraints.getJoinProperty();
@ -3639,13 +3645,15 @@ public class PlanFragmentBuilder {
public final List<Expr> eqJoinConjuncts;
public final List<Expr> otherJoin;
public final List<Expr> conjuncts;
public final Expr asofJoinConjunct;
public final Map<SlotId, Expr> commonSubOperatorMap;
public JoinExprInfo(List<Expr> eqJoinConjuncts, List<Expr> otherJoin, List<Expr> conjuncts,
Map<SlotId, Expr> commonSubOperatorMap) {
Expr asofJoinConjunct, Map<SlotId, Expr> commonSubOperatorMap) {
this.eqJoinConjuncts = eqJoinConjuncts;
this.otherJoin = otherJoin;
this.conjuncts = conjuncts;
this.asofJoinConjunct = asofJoinConjunct;
this.commonSubOperatorMap = commonSubOperatorMap;
}
@ -3676,10 +3684,13 @@ public class PlanFragmentBuilder {
private JoinExprInfo buildJoinExpr(OptExpression optExpr, ExecPlan context) {
ScalarOperator predicate = optExpr.getOp().getPredicate();
ScalarOperator onPredicate;
if (optExpr.getOp() instanceof PhysicalJoinOperator) {
onPredicate = ((PhysicalJoinOperator) optExpr.getOp()).getOnPredicate();
} else if (optExpr.getOp() instanceof PhysicalStreamJoinOperator) {
JoinOperator joinType;
if (optExpr.getOp() instanceof PhysicalJoinOperator op) {
onPredicate = op.getOnPredicate();
joinType = op.getJoinType();
} else if (optExpr.getOp() instanceof PhysicalStreamJoinOperator op) {
onPredicate = ((PhysicalStreamJoinOperator) optExpr.getOp()).getOnPredicate();
joinType = op.getJoinType();
} else {
throw new IllegalStateException("not supported join " + optExpr.getOp());
}
@ -3713,6 +3724,24 @@ public class PlanFragmentBuilder {
List<ScalarOperator> otherJoin = Utils.extractConjuncts(onPredicate);
otherJoin.removeAll(eqOnPredicates);
Expr asofJoinConjunct = null;
if (joinType.isAsofJoin()) {
if (eqJoinConjuncts.isEmpty()) {
throw new IllegalStateException("ASOF JOIN requires at least one equality condition");
}
ColumnRefSet leftColumns = optExpr.inputAt(0).getLogicalProperty().getOutputColumns();
ColumnRefSet rightColumns = optExpr.inputAt(1).getLogicalProperty().getOutputColumns();
ScalarOperator asofJoinPredicate = extractAndValidateAsofTemporalPredicate(otherJoin, leftColumns, rightColumns);
ScalarOperator transformedAsofPredicate = JoinHelper.applyCommutativeToPredicates(
asofJoinPredicate, leftColumns, rightColumns);
asofJoinConjunct = ScalarOperatorToExpr.buildExecExpression(transformedAsofPredicate,
new ScalarOperatorToExpr.FormatterContext(context.getColRefToExpr()));
otherJoin.remove(asofJoinPredicate);
}
List<Expr> otherJoinConjuncts = otherJoin.stream().map(e -> ScalarOperatorToExpr.buildExecExpression(e,
new ScalarOperatorToExpr.FormatterContext(context.getColRefToExpr())))
.collect(Collectors.toList());
@ -3727,7 +3756,7 @@ public class PlanFragmentBuilder {
new ScalarOperatorToExpr.FormatterContext(context.getColRefToExpr())))
.collect(Collectors.toList());
return new JoinExprInfo(eqJoinConjuncts, otherJoinConjuncts, conjuncts, commonSubExprMap);
return new JoinExprInfo(eqJoinConjuncts, otherJoinConjuncts, conjuncts, asofJoinConjunct, commonSubExprMap);
}
// TODO(murphy) consider state distribution
@ -4229,5 +4258,65 @@ public class PlanFragmentBuilder {
parentFragment.mergeQueryDictExprs(fragment.getQueryGlobalDictExprs());
}
}
private ScalarOperator extractAndValidateAsofTemporalPredicate(List<ScalarOperator> otherJoin,
ColumnRefSet leftColumns,
ColumnRefSet rightColumns) {
List<ScalarOperator> candidates = new ArrayList<>();
for (ScalarOperator predicate : otherJoin) {
if (isValidAsofTemporalPredicate(predicate, leftColumns, rightColumns)) {
candidates.add(predicate);
}
}
if (candidates.isEmpty()) {
throw new IllegalStateException("ASOF JOIN requires exactly one temporal inequality condition. found: 0");
}
if (candidates.size() > 1) {
throw new IllegalStateException(String.format(
"ASOF JOIN requires exactly one temporal inequality condition, found %d: %s",
candidates.size(), candidates));
}
ScalarOperator temporalPredicate = candidates.get(0);
for (ScalarOperator child : temporalPredicate.getChildren()) {
Type operandType = child.getType();
if (!operandType.isBigint() && !operandType.isDate() && !operandType.isDatetime()) {
throw new IllegalStateException(String.format(
"ASOF JOIN temporal condition operand must be BIGINT, DATE, or DATETIME in join ON clause, " +
"found: %s. Predicate: %s", operandType, temporalPredicate));
}
}
return candidates.get(0);
}
private boolean isValidAsofTemporalPredicate(ScalarOperator predicate,
ColumnRefSet leftColumns,
ColumnRefSet rightColumns) {
if (!(predicate instanceof BinaryPredicateOperator binaryPredicate)) {
return false;
}
if (!binaryPredicate.getBinaryType().isRange()) {
return false;
}
ColumnRefSet leftOperandColumns = binaryPredicate.getChild(0).getUsedColumns();
ColumnRefSet rightOperandColumns = binaryPredicate.getChild(1).getUsedColumns();
if (leftOperandColumns.isIntersect(leftColumns) && leftOperandColumns.isIntersect(rightColumns)) {
return false;
}
if (rightOperandColumns.isIntersect(leftColumns) && rightOperandColumns.isIntersect(rightColumns)) {
return false;
}
return true;
}
}
}

View File

@ -19,6 +19,7 @@ import com.starrocks.common.FeConstants;
import com.starrocks.sql.plan.PlanTestBase;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
@ -225,4 +226,98 @@ public class OnPredicateMoveAroundRuleTest extends PlanTestBase {
return argumentsList.stream();
}
@Test
public void asofJoinComplexPredicateCases() throws Exception {
String sql1 = "select * from \n" +
"(select * from test_all_type " +
"where id_datetime between '2021-01-01' and '2021-02-01') t1\n " +
"asof join test_all_type_not_null t2\n " +
"on t1.t1b = t2.t1b and t1.id_datetime < t2.id_datetime;";
String plan = getFragmentPlan(sql1);
assertContains(plan, "1:OlapScanNode\n" +
" TABLE: test_all_type_not_null\n" +
" PREAGGREGATION: ON\n" +
" PREDICATES: 18: id_datetime >= '2021-01-01 00:00:00'");
String sql2 = "select * from \n" +
"(select * from test_all_type where (t1d in (1, 2, 3) and id_date = '2021-01-01') " +
"or (id_date >'2021-04-01')) t1 asof join test_all_type_not_null t2\n" +
"on t1.t1d > t2.t1d and t1.id_date = t2.id_date;";
String plan2 = getFragmentPlan(sql2);
assertContains(plan2, "1:OlapScanNode\n" +
" TABLE: test_all_type_not_null\n" +
" PREAGGREGATION: ON\n" +
" PREDICATES: (19: id_date = '2021-01-01') OR (19: id_date > '2021-04-01'), 19: id_date >= '2021-01-01'");
String sql3 = "select * from \n" +
"(select * from test_all_type where abs(t1b + t1d) > 20 and t1a in ('abc', '中文')) t1\n" +
" asof join test_all_type_not_null t2\n" +
"on abs(t1.t1b + t1.t1d) = t2.t1b and t1.id_date > t2.id_date;";
String plan3 = getFragmentPlan(sql3);
assertContains(plan3, "PREDICATES: 12: t1b > 20, CAST(12: t1b AS LARGEINT) IS NOT NULL");
String sql4 = "select * from \n" +
"(select max(t1d) t1d, t1a, id_date from test_all_type group by t1a, id_date " +
"having max(t1d) > 10 and t1a in ('abc', '中文')) t1\n" +
"asof left join test_all_type_not_null t2\n" +
"on t1.t1d = t2.t1d and t1.t1a = t2.t1a and t1.id_date > t2.id_date;";
String plan4 = getFragmentPlan(sql4);
assertContains(plan4, "2:OlapScanNode\n" +
" TABLE: test_all_type_not_null\n" +
" PREAGGREGATION: ON\n" +
" PREDICATES: 15: t1d > 10, 12: t1a IN ('abc', '中文')");
String sql5 = "select * from t0 asof join t1 on v1 = v4 and v2 > v5 where all_match( x-> x > 1, [v1]) or v1 < 0;";
String plan5 = getFragmentPlan(sql5);
assertContains(plan5, "3:SELECT\n" +
" | predicates: (all_match(array_map(<slot 7> -> <slot 7> > 1, [4: v4]))) OR (4: v4 < 0)\n" +
" | \n" +
" 2:OlapScanNode\n" +
" TABLE: t1");
}
@Test
public void asofJoinRedundantPredicateCases() throws Exception {
String sql1 = "select * from \n" +
"(select * from test_all_type where (t1d in (1, 2, 3) and id_date = '2021-01-01') " +
"or (id_date >'2021-04-01')) t1 asof left join test_all_type_not_null t2\n" +
"on t1.t1d > t2.t1d and t1.id_date = t2.id_date;";
String plan1 = getFragmentPlan(sql1);
assertContains(plan1, "1:OlapScanNode\n" +
" TABLE: test_all_type_not_null\n" +
" PREAGGREGATION: ON\n" +
" PREDICATES: (19: id_date = '2021-01-01') OR (19: id_date > '2021-04-01'), 19: id_date >= '2021-01-01'");
String sql2 = "select * from \n" +
"(select * from t0 where v1 > 1 and v1 < 4) t0\n" +
"join\n" +
"(select * from t1 where v4 = 2) t1\n" +
"on t0.v1 > t1.v4 and t0.v3 = t1.v6;";
String plan2 = getFragmentPlan(sql2);
assertContains(plan2, "PREDICATES: 1: v1 >= 2, 3: v3 IS NOT NULL, 1: v1 > 1, 1: v1 < 4");
}
@ParameterizedTest(name = "sql_{index}: {0}.")
@MethodSource("asofJoinComplexJoinCases")
void testAsofComplexJoinCases(String sql, int expect) throws Exception {
String plan = getFragmentPlan(sql);
int numOfPredicate = (int) Arrays.stream(plan.split("\n"))
.filter(ln -> Pattern.compile("PREDICATES").matcher(ln).find()).count();
Assertions.assertEquals(expect, numOfPredicate, plan);
}
public static Stream<Arguments> asofJoinComplexJoinCases() {
List<Arguments> argumentsList = Lists.newArrayList();
String templateSql = "select * from (select * from t0 where v1 < 10) t0 " +
"%s join t1 on v1 = v4 and v2 > v5 %s join (select * from t2 where v7 > 1) t2\n" +
"on v4 = v7 and v6 > v9;";
argumentsList.add(Arguments.of(String.format(templateSql, "asof inner", "asof inner"), 3));
argumentsList.add(Arguments.of(String.format(templateSql, "asof left", "asof inner"), 3));
argumentsList.add(Arguments.of(String.format(templateSql, "asof inner", "asof left"), 3));
argumentsList.add(Arguments.of(String.format(templateSql, "asof left", "asof left"), 3));
return argumentsList.stream();
}
}

View File

@ -0,0 +1,311 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.sql.analyzer;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import static com.starrocks.sql.analyzer.AnalyzeTestUtil.analyzeFail;
import static com.starrocks.sql.analyzer.AnalyzeTestUtil.analyzeSuccess;
public class AnalyzeAsofJoinTest {
@BeforeAll
public static void beforeAll() throws Exception {
AnalyzeTestUtil.init();
}
@Test
public void testRequireAtLeastOneEquality() {
analyzeFail("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON t0.v2 <= t1.v5",
"at least one equality condition in join ON clause");
}
@Test
public void testRequireExactlyOneTemporalInequality() {
analyzeFail("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 <= t1.v5 AND t0.v2 >= t1.v5",
"ASOF JOIN supports only one inequality condition in join ON clause");
}
@Test
public void testUnsupportedOperators() {
analyzeFail("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 != t1.v5",
"does not support '!=' operator in join ON clause");
analyzeFail("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 <=> t1.v5",
"does not support '<=>' operator in join ON clause");
}
@Test
public void testOrNotSupported() {
analyzeFail("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON (t0.v1 = t1.v4) OR (t0.v2 <= t1.v5)",
"do not support OR operators in join ON clause");
}
@Test
public void testValidAsofJoin() {
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 <= t1.v5");
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF LEFT JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 >= t1.v5");
}
@Test
public void testMissingOnClause() {
analyzeFail("SELECT t0.v1 FROM t0 ASOF JOIN t1");
}
@Test
public void testMissingInequalityCondition() {
analyzeFail("SELECT t0.v1 FROM t0 ASOF JOIN t1 on t0.v1 = t1.v4",
"ASOF JOIN requires exactly one temporal inequality condition in join ON clause");
analyzeFail("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON t0.v1 = t1.v4 WHERE t0.v2 > t1.v5",
"ASOF JOIN requires exactly one temporal inequality condition in join ON clause");
}
@Test
public void testTemporalTypeValidationInvalids() {
analyzeFail("SELECT t0.v1 FROM t0 ASOF JOIN tall ON t0.v1 = tall.td AND t0.v2 <= tall.te",
"supports only BIGINT, DATE, or DATETIME types in join ON clause");
analyzeFail("SELECT t0.v1 FROM t0 ASOF JOIN tall ON t0.v1 = tall.td AND t0.v2 <= tall.tj",
"supports only BIGINT, DATE, or DATETIME types in join ON clause");
analyzeFail("SELECT t0.v1 FROM t0 ASOF JOIN tall ON t0.v1 = tall.td AND t0.v2 <= tall.ta",
"supports only BIGINT, DATE, or DATETIME types in join ON clause");
analyzeFail("SELECT t0.v1 FROM t0 ASOF JOIN tall ON t0.v1 = tall.td AND t0.v2 <= tall.tb",
"supports only BIGINT, DATE, or DATETIME types in join ON clause");
analyzeFail("SELECT t0.v1 FROM t0 ASOF JOIN tjson ON t0.v1 = tjson.v_int AND t0.v2 <= tjson.v_json",
"supports only BIGINT, DATE, or DATETIME types in join ON clause");
}
@Test
public void testTemporalTypeBigintVsBigint() {
analyzeSuccess("SELECT t0.v1 FROM t0 asof JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 <= t1.v5");
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 < t1.v5");
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 >= t1.v5");
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 > t1.v5");
}
@Test
public void testSupportedAsofJoinType() {
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 > t1.v5");
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF INNER JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 > t1.v5");
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF LEFT JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 > t1.v5");
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF LEFT OUTER JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 > t1.v5");
}
@Test
public void testUnsupportedAsofJoinType() {
analyzeFail("SELECT t0.v1 FROM t0 ASOF RIGHT JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 > t1.v5");
analyzeFail("SELECT t0.v1 FROM t0 ASOF RIGHT OUTER JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 > t1.v5");
analyzeFail("SELECT t0.v1 FROM t0 ASOF FULL JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 > t1.v5");
analyzeFail("SELECT t0.v1 FROM t0 ASOF FULL OUTER JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 > t1.v5");
analyzeFail("SELECT t0.v1 FROM t0 ASOF SEMI JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 > t1.v5");
analyzeFail("SELECT t0.v1 FROM t0 ASOF ANTI JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 > t1.v5");
analyzeFail("SELECT t0.v1 FROM t0 ASOF CROSS JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 > t1.v5");
}
// datetime vs datetime, date vs date, and mixed date<->datetime on the same table (self-join)
@Test
public void testTemporalTypeDateTimeAndDateSelfJoins() {
// datetime vs datetime
analyzeSuccess("SELECT a.td FROM tall a ASOF JOIN tall b ON a.td = b.td AND a.th <= b.th");
analyzeSuccess("SELECT a.td FROM tall a ASOF LEFT JOIN tall b ON a.td = b.td AND a.th >= b.th");
// date vs date
analyzeSuccess("SELECT a.td FROM tall a ASOF JOIN tall b ON a.td = b.td AND a.ti < b.ti");
analyzeSuccess("SELECT a.td FROM tall a ASOF LEFT JOIN tall b ON a.td = b.td AND a.ti >= b.ti");
// mixed: datetime vs date and date vs datetime
analyzeSuccess("SELECT a.td FROM tall a ASOF JOIN tall b ON a.td = b.td AND a.th <= b.ti");
analyzeSuccess("SELECT a.td FROM tall a ASOF JOIN tall b ON a.td = b.td AND a.ti <= b.th");
}
@Test
public void testMultipleEqualityConditions() {
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON t0.v1 = t1.v4 AND t0.v3 = t1.v6 AND t0.v2 <= t1.v5");
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF LEFT JOIN t1 ON t0.v1 = t1.v4 AND t0.v3 = t1.v6 AND t0.v2 >= t1.v5");
analyzeSuccess("SELECT a.td FROM tall a ASOF JOIN tall b ON a.td = b.td AND a.tc = b.tc AND a.tg = b.tg" +
" AND a.th <= b.th");
}
@Test
public void testAsofJoinWithWhereClause() {
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 <= t1.v5 WHERE t0.v3 > 100");
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF LEFT JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 >= t1.v5 WHERE t1.v6 IS NOT NULL");
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 <= t1.v5 " +
"WHERE t0.v3 > 100 AND t1.v6 < 200");
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 <= t1.v5 " +
"WHERE t0.v1 IN (SELECT v7 FROM t2)");
}
@Test
public void testAsofJoinWithUsing() {
analyzeFail("SELECT l.v1 FROM t0 l ASOF JOIN t0 r USING (v1)");
analyzeFail("SELECT l.v1 FROM t0 l ASOF JOIN t0 r USING (v1, v2)");
}
@Test
public void testFunctionInTemporalCondition() {
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF LEFT JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 + 10 >= t1.v5 - 5");
analyzeSuccess("SELECT a.td FROM tall a ASOF LEFT JOIN tall b ON a.tc = b.tc AND a.td + 100 >= b.tg - 50");
analyzeSuccess("SELECT a.td FROM tall a ASOF JOIN tall b ON a.td = b.td AND DATE_ADD(a.th, INTERVAL 1 DAY) <= b.th");
analyzeSuccess("SELECT a.td FROM tall a ASOF LEFT JOIN tall b ON a.td = b.td" +
" AND DATE_SUB(a.th, INTERVAL 1 HOUR) >= b.th");
analyzeSuccess("SELECT a.td FROM tall a ASOF JOIN tall b ON a.td = b.td AND DATE_ADD(a.ti, INTERVAL 1 DAY) <= b.ti");
analyzeSuccess("SELECT a.td FROM tall a ASOF LEFT JOIN tall b ON a.td = b.td AND " +
"a.ti >= DATE_SUB(b.ti, INTERVAL 7 DAY)");
analyzeSuccess("SELECT a.td FROM tall a ASOF LEFT JOIN tall b ON a.td = b.td AND" +
" UNIX_TIMESTAMP(a.th) >= UNIX_TIMESTAMP(b.th)");
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 <= t1.v5");
analyzeSuccess("SELECT a.td FROM tall a ASOF JOIN tall b ON a.tc = b.tc AND a.td <= b.tg");
}
@Test
public void testFunctionInEqualityCondition() {
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON t0.v1 + 0 = t1.v4 + 0 AND t0.v2 <= t1.v5");
analyzeSuccess("SELECT a.td FROM tall a ASOF LEFT JOIN tall b ON a.td + 1 = b.td + 1 AND a.tg >= b.tg");
analyzeSuccess("SELECT a.td FROM tall a ASOF JOIN tall b ON DATE(a.th) = DATE(b.th) AND a.th <= b.th");
analyzeSuccess("SELECT a.td FROM tall a ASOF LEFT JOIN tall b ON YEAR(a.ti) = YEAR(b.ti) AND a.ti >= b.ti");
analyzeSuccess("SELECT a.td FROM tall a ASOF JOIN tall b ON a.tc = b.tc AND a.th <= b.th");
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 <= t1.v5");
analyzeSuccess("SELECT a.td FROM tall a ASOF JOIN tall b ON a.td = b.td AND a.tg <= b.tg");
}
@Test
public void testComplexAsofJoinScenarios() {
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON " +
"t0.v1 = t1.v4 AND t0.v3 + 1 = t1.v6 + 1 AND " +
"t0.v2 + 10 <= t1.v5 + 10");
analyzeSuccess("SELECT a.td FROM tall a ASOF JOIN tall b ON " +
"a.tc = b.tc AND a.td + 1 = b.td + 1 AND " +
"a.tg + 100 <= b.tg + 200");
analyzeSuccess("SELECT a.td FROM tall a ASOF JOIN tall b ON " +
"a.td = b.td AND " +
"CONVERT_TZ(a.th, '+00:00', '+08:00') <= CONVERT_TZ(b.th, '+00:00', '+08:00')");
analyzeSuccess("SELECT a.td FROM tall a ASOF JOIN tall b ON " +
"YEAR(a.ti) = YEAR(b.ti) AND MONTH(a.ti) = MONTH(b.ti) AND " +
"a.ti <= DATE_ADD(b.ti, INTERVAL 1 DAY)");
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF LEFT JOIN t1 ON " +
"CASE WHEN t0.v1 > 0 THEN t0.v1 ELSE 0 END = CASE WHEN t1.v4 > 0 THEN t1.v4 ELSE 0 END AND " +
"t0.v2 >= t1.v5");
analyzeSuccess("SELECT a.td FROM tall a ASOF LEFT JOIN tall b ON " +
"a.tc = b.tc AND a.td = b.td AND " +
"DATE_ADD(a.th, INTERVAL 1 HOUR) >= DATE_SUB(b.th, INTERVAL 1 DAY)");
}
@Test
public void testInvalidFunctionInTemporalCondition() {
analyzeFail("SELECT t0.v1 FROM t0 ASOF JOIN tall ON t0.v1 = tall.td AND t0.v2 <= UPPER(tall.ta)",
"supports only BIGINT, DATE, or DATETIME types in join ON clause");
analyzeFail("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON t0.v1 = t1.v4 AND ABS(t0.v2) <= ABS(t1.v5)",
"supports only BIGINT, DATE, or DATETIME types in join ON clause");
analyzeFail("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 <= SQRT(t1.v5)",
"supports only BIGINT, DATE, or DATETIME types in join ON clause");
analyzeFail("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 <= t1.v5 AND t0.v3 <= 'invalid'",
"ASOF JOIN temporal condition supports only BIGINT, DATE, or DATETIME types in join ON clause");
analyzeFail("SELECT t0.v1 FROM t0 ASOF JOIN tall ON t0.v1 = tall.td AND t0.v2 <= CONCAT(tall.ta, 'suffix')",
"ASOF JOIN temporal condition supports only BIGINT, DATE, or DATETIME types in join ON clause");
}
@Test
public void testAsofJoinWithLateralJoin() {
analyzeFail("SELECT tarray.v1 FROM tarray ASOF JOIN unnest(v3) AS u " +
"ON tarray.v1 = u.unnest AND tarray.v2 <= u.unnest",
"ASOF join is not supported with lateral join");
}
@Test
public void testAsofJoinWithSubquery() {
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF JOIN (SELECT * FROM t1) sub " +
"ON t0.v1 = sub.v4 AND t0.v2 <= sub.v5");
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF LEFT JOIN " +
"(SELECT v4, v5, v6 FROM t1 WHERE v4 > 10) sub " +
"ON t0.v1 = sub.v4 AND t0.v2 >= sub.v5");
}
@Test
public void testAsofJoinWithNullCondition() {
analyzeFail("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON NULL",
"ASOF JOIN requires at least one equality condition in join ON clause");
}
@Test
public void testAsofJoinWithConstantConditions() {
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON t0.v1 = t1.v4 AND 1 = 1 AND t0.v2 <= t1.v5");
analyzeFail("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON t0.v1 = t1.v4 AND 1 = 1",
"ASOF JOIN requires exactly one temporal inequality condition in join ON clause");
}
@Test
public void testAsofJoinWithNestedConditions() {
analyzeFail("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON " +
"t0.v1 = t1.v4 AND (t0.v2 <= t1.v5 OR t0.v3 >= t1.v6)",
"ASOF JOIN conditions do not support OR operators in join ON clause");
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON " +
"((t0.v1 = t1.v4) AND (t0.v3 = t1.v6)) AND (t0.v2 <= t1.v5)");
}
@Test
public void testAsofJoinWithDifferentOperators() {
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 < t1.v5");
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 <= t1.v5");
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 > t1.v5");
analyzeSuccess("SELECT t0.v1 FROM t0 ASOF JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 >= t1.v5");
}
@Test
public void testAsofJoinWithMixedTypes() {
analyzeSuccess("SELECT a.td FROM tall a ASOF JOIN tall b ON a.tc = b.tc AND a.td <= b.th");
analyzeSuccess("SELECT a.td FROM tall a ASOF JOIN tall b ON a.tc = b.tc AND a.th >= b.td");
analyzeFail("SELECT a.td FROM tall a ASOF JOIN tall b ON a.tc = b.tc AND a.td <= b.ta",
"supports only BIGINT, DATE, or DATETIME types in join ON clause");
}
@Test
public void testOtherPredicates() {
analyzeSuccess("select * from t0 asof join t1 on t0.v1 = t1.v4 and t1.v5 <= t0.v2 where (v1 > 4 and v5 < 2)");
analyzeFail("select * from t0 asof join t1 on t0.v1 = t1.v4 and t1.v5 <= t0.v2 and (v1 > 4 and v5 < 2)",
"ASOF JOIN temporal condition supports only BIGINT, DATE, or DATETIME types in join ON clause");
analyzeSuccess("select * from t0 asof join t1 on t0.v1 = t1.v4 and t1.v5 <= t0.v2 where (v1 > 4 or v5 < 2)");
analyzeFail("select * from t0 asof join t1 on t0.v1 = t1.v4 and t1.v5 <= t0.v2 and (v1 > 4 or v5 < 2)",
"ASOF JOIN temporal condition supports only BIGINT, DATE, or DATETIME types in join ON clause");
analyzeSuccess("select * from t0 asof join t1 on t0.v1 = t1.v4 and t1.v5 <= t0.v2 where t0.v1 != 3");
analyzeFail("select * from t0 asof join t1 on t0.v1 = t1.v4 and t1.v5 <= t0.v2 and v1 != 3",
"ASOF JOIN does not support '!=' operator in join ON clause");
}
}

View File

@ -85,14 +85,23 @@ public class JoinPredicatePushdownTest extends PlanTestBase {
}
@Test
public void testMultiLeftOuterJoin2() throws Exception {
connectContext.getSessionVariable().setOptimizerExecuteTimeout(3000000);
connectContext.getSessionVariable().disableJoinReorder();
String query = "select x.v1 v11, x.v2 v21, x.v3 v31, sub2.v1 v12, sub2.v2 v22 from test.t0 x inner join" +
" (select v1, v2, v3, v4, v5, v8 " +
"from test.t0 left outer join (select * from test.t1 " +
"inner join test.t2 on v5 = v7) sub on v1 = v4) sub2 on x.v1 = sub2.v1";
public void testAsofJoinPushdownCTE() throws Exception {
connectContext.getSessionVariable().setCboCteReuse(true);
String query = "with xxx1 as (\n" +
"with x as (select * from t1 join t2 where t1.v4 = t2.v7 )\n" +
"select x1.v5, x2.v8, x1.v6, x2.v7 \n" +
"from (select * from x where x.v5 = 1 ) x1 asof left outer join" +
" (select * from x where x.v8 = 2) x2 on x1.v4 = x2.v7 and x1.v4 >= x2.v7)\n" +
"select * from xxx1 where xxx1.v6 = 2\n" +
"union \n" +
"select * from xxx1 where xxx1.v7 = 3";
String plan = getFragmentPlan(query);
PlanTestBase.assertContains(plan, "11:HASH JOIN\n" +
" | join op: ASOF LEFT OUTER JOIN (BROADCAST)\n" +
" | colocate: false, reason: \n" +
" | equal join conjunct: 7: v4 = 16: v7\n" +
" | asof join conjunct: 7: v4 >= 16: v7\n" +
" | other predicates: (16: v7 = 3) OR (9: v6 = 2)");
}
@Test
@ -125,6 +134,40 @@ public class JoinPredicatePushdownTest extends PlanTestBase {
PlanTestBase.assertNotContains(plan2, "LEFT OUTER JOIN");
}
@Test
public void testMultiAsofLeftOuterJoin() throws Exception {
String query = "select v1, v2, v5, v8 " +
"from test.t0 left outer join test.t1 on v1 = v4 " +
"asof left outer join test.t2 on v5 = v7 and v6 >= v8 where v9 = 10";
String plan = getFragmentPlan(query);
PlanTestBase.assertContains(plan, "8:HASH JOIN\n" +
" | join op: ASOF INNER JOIN (BROADCAST)\n" +
" | colocate: false, reason: \n" +
" | equal join conjunct: 5: v5 = 7: v7\n" +
" | asof join conjunct: 6: v6 >= 8: v8");
PlanTestBase.assertContains(plan, "5:OlapScanNode\n" +
" TABLE: t2\n" +
" PREAGGREGATION: ON\n" +
" PREDICATES: 9: v9 = 10");
String query2 = "select v1, v2, v5, v8 " +
"from test.t0 asof left outer join test.t1 on v1 = v4 and v2 > v5 " +
"left outer join test.t2 on v5 = v7 where v9 = 10 and v3 = 1";
String plan2 = getFragmentPlan(query2);
PlanTestBase.assertContains(plan2, "9:HASH JOIN\n" +
" | join op: INNER JOIN (BROADCAST)\n" +
" | colocate: false, reason: \n" +
" | equal join conjunct: 5: v5 = 7: v7\n" +
" | \n" +
" |----8:EXCHANGE");
PlanTestBase.assertContains(plan2, "4:HASH JOIN\n" +
" | join op: ASOF INNER JOIN (BROADCAST)\n" +
" | colocate: false, reason: \n" +
" | equal join conjunct: 1: v1 = 4: v4\n" +
" | asof join conjunct: 2: v2 > 5: v5");
PlanTestBase.assertNotContains(plan2, "LEFT OUTER JOIN");
}
@Test
public void testMultiRightOuterJoin() throws Exception {
String query = "select v1, v2, v5, v8 " +
@ -181,4 +224,28 @@ public class JoinPredicatePushdownTest extends PlanTestBase {
" PREAGGREGATION: ON\n" +
" PREDICATES: 4: v4 > 2");
}
@Test
public void testFunctionDerivedForAsof() throws Exception {
String sql = "select * from t0 asof join t1 on v1 = v4 and v2 > v5 where all_match(x -> x > 1, [v1]) and v1 > 2";
String plan = getFragmentPlan(sql);
assertContains(plan, "3:SELECT\n" +
" | predicates: all_match(array_map(<slot 7> -> <slot 7> > 1, [4: v4]))\n" +
" | \n" +
" 2:OlapScanNode\n" +
" TABLE: t1\n" +
" PREAGGREGATION: ON\n" +
" PREDICATES: 4: v4 > 2");
sql = "select * from t0 asof join t1 on v1 = v4 and v2 > v5 join t2 on v4 = v7 " +
"where all_match(x -> x > 1, [v1]) and v7 > 2";
plan = getFragmentPlan(sql);
assertContains(plan, " 4:SELECT\n" +
" | predicates: all_match(array_map(<slot 10> -> <slot 10> > 1, [4: v4]))\n" +
" | \n" +
" 3:OlapScanNode\n" +
" TABLE: t1\n" +
" PREAGGREGATION: ON\n" +
" PREDICATES: 4: v4 > 2");
}
}

View File

@ -0,0 +1,196 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.sql.plan;
import com.starrocks.catalog.OlapTable;
import com.starrocks.common.FeConstants;
import com.starrocks.server.GlobalStateMgr;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
public class AsofJoinReorderTest extends PlanTestBase {
@BeforeAll
public static void beforeClass() throws Exception {
PlanTestBase.beforeClass();
GlobalStateMgr globalStateMgr = connectContext.getGlobalStateMgr();
OlapTable t0 = (OlapTable) globalStateMgr.getLocalMetastore().getDb("test").getTable("t0");
setTableStatistics(t0, 1);
OlapTable t1 = (OlapTable) globalStateMgr.getLocalMetastore().getDb("test").getTable("t1");
setTableStatistics(t1, 10);
OlapTable t2 = (OlapTable) globalStateMgr.getLocalMetastore().getDb("test").getTable("t2");
setTableStatistics(t2, 100000);
OlapTable t3 = (OlapTable) globalStateMgr.getLocalMetastore().getDb("test").getTable("t3");
setTableStatistics(t3, 1000000000);
connectContext.getSessionVariable().setMaxTransformReorderJoins(2);
FeConstants.runningUnitTest = true;
}
@Test
public void testAsofLeftJoin() throws Exception {
String sql = "select t1.* from t0 asof join t1 on v1 > v4 and v2 = v5 left join t2 on v1 < v7 ";
String plan = getFragmentPlan(sql);
assertContains(plan, "6:HASH JOIN\n" +
" | join op: ASOF INNER JOIN (PARTITIONED)\n" +
" | colocate: false, reason: \n" +
" | equal join conjunct: 2: v2 = 5: v5\n" +
" | asof join conjunct: 1: v1 > 4: v4\n" +
" | \n" +
" |----5:EXCHANGE\n" +
" | \n" +
" 3:EXCHANGE");
assertContains(plan, "EXCHANGE ID: 03\n" +
" HASH_PARTITIONED: 2: v2\n" +
"\n" +
" 2:OlapScanNode\n" +
" TABLE: t0");
assertContains(plan, "EXCHANGE ID: 05\n" +
" HASH_PARTITIONED: 5: v5\n" +
"\n" +
" 4:OlapScanNode\n" +
" TABLE: t1");
assertContains(plan, "9:NESTLOOP JOIN\n" +
" | join op: RIGHT OUTER JOIN\n" +
" | colocate: false, reason: \n" +
" | other join predicates: 1: v1 < 7: v7\n" +
" | \n" +
" |----8:EXCHANGE\n" +
" | \n" +
" 1:EXCHANGE");
}
@Test
void testAsofLeftJoinReorderGreedy() throws Exception {
connectContext.getSessionVariable().disableDPJoinReorder();
String sql = "select v6 from t1 " +
"left join (select t1.v5 from t1 asof join t3 on t1.v4 = t3.v10 and t1.v5 > t3.v11 join t0 join t2) a " +
"on t1.v6 = a.v5";
String planFragment = getFragmentPlan(sql);
Assertions.assertTrue(planFragment.contains("5:HASH JOIN\n" +
" | join op: ASOF INNER JOIN (BUCKET_SHUFFLE)\n" +
" | colocate: false, reason: \n" +
" | equal join conjunct: 4: v4 = 7: v10\n" +
" | asof join conjunct: 5: v5 > 8: v11\n" +
" | \n" +
" |----4:EXCHANGE\n" +
" | \n" +
" 2:OlapScanNode\n" +
" TABLE: t1"));
}
@Test
void testAsofInnerJoinReorderDP() throws Exception {
connectContext.getSessionVariable().enableDPJoinReorder();
String sql = "select * from t1 " +
"asof join t3 on t1.v4 = t3.v10 and t1.v5 > t3.v11 " +
"asof join t0 on t1.v4 = t0.v2 and t1.v5 > t0.v2 " +
"join t2 on t1.v5 = t2.v8 ";
String plan = getFragmentPlan(sql);
assertContains(plan, "7:HASH JOIN\n" +
" | join op: ASOF INNER JOIN (BUCKET_SHUFFLE)\n" +
" | colocate: false, reason: \n" +
" | equal join conjunct: 1: v4 = 8: v2\n" +
" | asof join conjunct: 2: v5 > 8: v2\n" +
" | \n" +
" |----6:EXCHANGE\n" +
" | \n" +
" 4:HASH JOIN\n" +
" | join op: ASOF INNER JOIN (BUCKET_SHUFFLE)\n" +
" | colocate: false, reason: \n" +
" | equal join conjunct: 1: v4 = 4: v10\n" +
" | asof join conjunct: 2: v5 > 5: v11\n" +
" | \n" +
" |----3:EXCHANGE\n" +
" | \n" +
" 1:OlapScanNode\n" +
" TABLE: t1");
}
@Test
void testTwoJoinRootGreedy() throws Exception {
connectContext.getSessionVariable().enableGreedyJoinReorder();
String sql = "select t0.v1 from t1 " +
"asof join t3 on t1.v4 = t3.v10 and t1.v5 > t3.v11 " +
"asof join t0 on t1.v4 = t0.v2 and t0.v2 > t3.v12 " +
"join (select * from t1 join t3 on t1.v4 = t3.v10 join t0 on t1.v4 = t0.v2 join t2 on t1.v5 = t2.v8) as a " +
"on t1.v5 = a.v8 ";
String planFragment = getFragmentPlan(sql);
assertContains(planFragment, "17:HASH JOIN\n" +
" | join op: ASOF INNER JOIN (BUCKET_SHUFFLE)\n" +
" | colocate: false, reason: \n" +
" | equal join conjunct: 1: v4 = 8: v2\n" +
" | asof join conjunct: 6: v12 < 8: v2\n" +
" | \n" +
" |----16:EXCHANGE\n" +
" | \n" +
" 14:Project\n" +
" | <slot 1> : 1: v4\n" +
" | <slot 2> : 2: v5\n" +
" | <slot 6> : 6: v12\n" +
" | \n" +
" 13:HASH JOIN\n" +
" | join op: ASOF INNER JOIN (BUCKET_SHUFFLE)\n" +
" | colocate: false, reason: \n" +
" | equal join conjunct: 1: v4 = 4: v10\n" +
" | asof join conjunct: 2: v5 > 5: v11\n" +
" | \n" +
" |----12:EXCHANGE\n" +
" | \n" +
" 10:OlapScanNode\n" +
" TABLE: t1");
}
@Test
void testTwoJoinRootDP() throws Exception {
connectContext.getSessionVariable().enableDPJoinReorder();
connectContext.getSessionVariable().disableGreedyJoinReorder();
String sql = "select t0.v1 from t1 " +
"join t3 on t1.v4 = t3.v10 " +
"asof join t0 on t3.v11 = t0.v2 and t0.v2 > t3.v12 " +
"join (select * from t1 join t3 on t1.v4 = t3.v10 join t0 on t1.v4 = t0.v2 join t2 on t1.v5 = t2.v8) as a " +
"on t1.v5 = a.v8 ";
String plan = getFragmentPlan(sql);
assertContains(plan, "17:HASH JOIN\n" +
" | join op: ASOF INNER JOIN (BROADCAST)\n" +
" | colocate: false, reason: \n" +
" | equal join conjunct: 5: v11 = 8: v2\n" +
" | asof join conjunct: 6: v12 < 8: v2\n" +
" | \n" +
" |----16:EXCHANGE\n" +
" | \n" +
" 14:Project\n" +
" | <slot 2> : 2: v5\n" +
" | <slot 5> : 5: v11\n" +
" | <slot 6> : 6: v12\n" +
" | \n" +
" 13:HASH JOIN\n" +
" | join op: INNER JOIN (BUCKET_SHUFFLE)\n" +
" | colocate: false, reason: \n" +
" | equal join conjunct: 4: v10 = 1: v4\n" +
" | \n" +
" |----12:EXCHANGE\n" +
" | \n" +
" 10:OlapScanNode\n" +
" TABLE: t3");
}
}

View File

@ -0,0 +1,69 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.sql.plan;
import com.starrocks.catalog.OlapTable;
import com.starrocks.common.FeConstants;
import com.starrocks.server.GlobalStateMgr;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
public class AsofJoinStatisticsTest extends PlanWithCostTestBase {
@BeforeAll
public static void beforeClass() throws Exception {
PlanWithCostTestBase.beforeClass();
FeConstants.runningUnitTest = true;
GlobalStateMgr globalStateMgr = connectContext.getGlobalStateMgr();
OlapTable t0 = (OlapTable) globalStateMgr.getLocalMetastore().getDb("test").getTable("t0");
setTableStatistics(t0, 1000);
OlapTable t1 = (OlapTable) globalStateMgr.getLocalMetastore().getDb("test").getTable("t1");
setTableStatistics(t1, 200000);
}
@Test
public void testAsofInnerJoinStatistics() throws Exception {
// Test ASOF INNER JOIN statistics calculation
// Expected: output row count should be <= probe table row count (1000)
String sql = "SELECT t0.v1 FROM t0 ASOF JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 <= t1.v5";
String plan = getCostExplain(sql);
assertContains(plan, "3:HASH JOIN\n" +
" | join op: ASOF INNER JOIN (BUCKET_SHUFFLE)\n" +
" | equal join conjunct: [1: v1, BIGINT, true] = [4: v4, BIGINT, true]\n" +
" | asof join conjunct: [2: v2, BIGINT, true] <= [5: v5, BIGINT, true]\n" +
" | build runtime filters:\n" +
" | - filter_id = 0, build_expr = (4: v4), remote = false\n" +
" | output columns: 1\n" +
" | cardinality: 900");
}
@Test
public void testAsofLeftJoinStatistics() throws Exception {
// Expected: output row count should equal probe table row count (1000)
String sql = "SELECT t0.v1 FROM t0 ASOF LEFT JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 >= t1.v5";
String plan = getCostExplain(sql);
assertContains(plan, "3:HASH JOIN\n" +
" | join op: ASOF LEFT OUTER JOIN (BUCKET_SHUFFLE)\n" +
" | equal join conjunct: [1: v1, BIGINT, true] = [4: v4, BIGINT, true]\n" +
" | asof join conjunct: [2: v2, BIGINT, true] >= [5: v5, BIGINT, true]\n" +
" | output columns: 1\n" +
" | cardinality: 1000");
}
}

View File

@ -115,6 +115,42 @@ public class EmptyValueTest extends PlanTestBase {
assertCContains(plan, "other predicates: (43: L_RETURNFLAG != 'R') OR (58: L_DISCOUNT != 0.05)");
}
@Test
public void testPruneAsofJoinWithEmptyNode() throws Exception {
String sql = "select L_PARTKEY, test_all_type.t1d from lineitem_partition p " +
"asof left outer join test_all_type on p.L_ORDERKEY = test_all_type.t1d and " +
"p.L_COMMITDATE >= test_all_type.id_date where L_SHIPDATE = '2000-01-01' ";
String plan = getFragmentPlan(sql);
assertContains(plan, "RESULT SINK\n" +
"\n 0:EMPTYSET");
sql = "select L_PARTKEY, t0.t1d from test_all_type t0 asof left outer join " +
"(select * from lineitem_partition p where L_SHIPDATE = '2000-01-01') x " +
"on x.L_ORDERKEY = t0.t1d and x.L_COMMITDATE >= t0.id_date";
plan = getFragmentPlan(sql);
assertContains(plan, "1:Project\n" +
" | <slot 4> : 4: t1d\n" +
" | <slot 12> : NULL\n" +
" | \n" +
" 0:OlapScanNode\n" +
" TABLE: test_all_type");
sql = "select L_PARTKEY, test_all_type.t1d from lineitem_partition p " +
"asof join test_all_type on p.L_ORDERKEY = test_all_type.t1d and " +
"p.L_COMMITDATE >= test_all_type.id_date where L_SHIPDATE = '2000-01-01' ";
plan = getFragmentPlan(sql);
assertContains(plan, "RESULT SINK\n" +
"\n 0:EMPTYSET");
sql = "select L_PARTKEY, t0.t1d from test_all_type t0 asof join " +
"(select * from lineitem_partition p where L_SHIPDATE = '2000-01-01') x " +
"on x.L_ORDERKEY = t0.t1d and x.L_COMMITDATE >= t0.id_date";
plan = getFragmentPlan(sql);
assertContains(plan, "RESULT SINK\n" +
"\n 0:EMPTYSET");
}
@Test
public void testPartitionOtherJoin() throws Exception {
String sql = "select L_PARTKEY, t0.v2 from lineitem_partition p " +

View File

@ -17,6 +17,7 @@ package com.starrocks.sql.plan;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Table;
import com.starrocks.common.ExceptionChecker;
import com.starrocks.common.FeConstants;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.optimizer.OptimizerContext;
@ -2744,7 +2745,36 @@ public class JoinTest extends PlanTestBase {
}
}
@Test
public void testPushDownTopNWithAsofLeftOuterJoin() throws Exception {
String sql = "SELECT\n" +
" *\n" +
"FROM\n" +
" (\n" +
" SELECT\n" +
" t0.*,\n" +
" t1.v5,\n" +
" t1.v6\n" +
" FROM\n" +
" t0\n" +
" ASOF LEFT JOIN t1 ON t0.v1 = t1.v4 and t0.v2 > t1.v5\n" +
" ) AS mocktable\n" +
"ORDER BY\n" +
" mocktable.v2 DESC\n" +
"LIMIT\n" +
" 20";
String plan = getFragmentPlan(sql);
assertContains(plan, "5:HASH JOIN\n" +
" | join op: ASOF LEFT OUTER JOIN (BROADCAST)");
assertContains(plan, "1:TOP-N\n" +
" | order by: <slot 2> 2: v2 DESC\n" +
" | offset: 0\n" +
" | limit: 20\n" +
" | \n" +
" 0:OlapScanNode");
}
@Test
public void testPushDownTopWithOuterJoin() throws Exception {
@ -3333,4 +3363,114 @@ public class JoinTest extends PlanTestBase {
" | <slot 31> : 30: if = 'b'");
}
@Test
public void testAsofJoinConditionNormalizeAllOperators() throws Exception {
String sql1 = "select t0.v1 from t0 asof join t1 on t0.v1 = t1.v4 and t1.v5 > t0.v2";
String plan1 = getFragmentPlan(sql1);
assertContains(plan1, "asof join conjunct: 2: v2 < 5: v5");
String sql2 = "select t0.v1 from t0 asof join t1 on t0.v1 = t1.v4 and t1.v5 >= t0.v2";
String plan2 = getFragmentPlan(sql2);
assertContains(plan2, "asof join conjunct: 2: v2 <= 5: v5");
String sql3 = "select t0.v1 from t0 asof join t1 on t0.v1 = t1.v4 and t1.v5 < t0.v2";
String plan3 = getFragmentPlan(sql3);
assertContains(plan3, "asof join conjunct: 2: v2 > 5: v5");
String sql4 = "select t0.v1 from t0 asof join t1 on t0.v1 = t1.v4 and t1.v5 <= t0.v2";
String plan4 = getFragmentPlan(sql4);
assertContains(plan4, "asof join conjunct: 2: v2 >= 5: v5");
}
@Test
public void testAsofJoinConditionNormalizeWithFunctions() throws Exception {
String sql1 = "select t0.v1 from t0 asof join t1 on t0.v1 = t1.v4 and t1.v5 + 1 > t0.v2";
String plan1 = getFragmentPlan(sql1);
assertContains(plan1, "asof join conjunct: 2: v2 < 5: v5 + 1");
String sql2 = "select t0.v1 from t0 asof join t1 on t0.v1 = t1.v4 and t0.v2 * 2 >= t1.v5";
String plan2 = getFragmentPlan(sql2);
assertContains(plan2, "asof join conjunct: 2: v2 * 2 >= 5: v5");
String sql3 = "select t0.v1 from t0 asof join t1 on t0.v1 = t1.v4 and date_add(t1.v5, 1) > t0.v2";
ExceptionChecker.expectThrowsWithMsg(IllegalStateException.class,
"ASOF JOIN temporal condition operand must be BIGINT, DATE, or DATETIME in join ON clause",
() -> getFragmentPlan(sql3));
String sql4 = "select t0.v1 from t0 asof join t1 on t0.v1 = t1.v4 and unix_timestamp(t1.v5) < t0.v2";
String plan4 = getFragmentPlan(sql4);
assertContains(plan4, "asof join conjunct: 2: v2 > unix_timestamp(CAST(5: v5 AS DATETIME))");
}
@Test
public void testAsofJoinConditionNormalizeWithComplexExpressions() throws Exception {
String sql1 = "select t0.v1 from t0 asof join t1 on t0.v1 = t1.v4 and (t1.v5 + t1.v6) > (t0.v2 - t0.v3)";
String plan1 = getFragmentPlan(sql1);
assertContains(plan1, "asof join conjunct: 2: v2 - 3: v3 < 5: v5 + 6: v6");
String sql2 = "select t0.v1 from t0 asof join t1 on t0.v1 = t1.v4 and CASE WHEN t1.v5 > 0 THEN t1.v5 ELSE 0 END > t0.v2";
String plan2 = getFragmentPlan(sql2);
assertContains(plan2, "asof join conjunct: 2: v2 < if(5: v5 > 0, 5: v5, 0)");
String sql3 = "select t0.v1 from t0 asof join t1 on t0.v1 = t1.v4 and COALESCE(t1.v5, 0) >= t0.v2";
String plan3 = getFragmentPlan(sql3);
assertContains(plan3, "asof join conjunct: 2: v2 <= coalesce(5: v5, 0)");
}
@Test
public void testAsofJoinConditionNormalizeWithMultipleJoins() throws Exception {
String sql1 = "select t0.v1 from t0 asof join t1 on t0.v1 = t1.v4 and t0.v2 <= t1.v5 " +
"asof join t2 on t1.v4 = t2.v7 and t2.v8 > t1.v5";
String plan1 = getFragmentPlan(sql1);
assertContains(plan1, "asof join conjunct: 5: v5 < 8: v8");
String sql3 = "select t0.v1 from t0 join t1 on t0.v1 = t1.v4 " +
"asof join t2 on t1.v4 = t2.v7 and t2.v8 >= t1.v5";
String plan3 = getFragmentPlan(sql3);
assertContains(plan3, "asof join conjunct: 5: v5 <= 8: v8");
}
@Test
public void testAsofJoinConditionNormalizeWithSubqueries() {
String sql1 = "select t0.v1 from t0 asof join t1 on t0.v1 = t1.v4 and (SELECT MAX(v5) FROM t1) > t0.v2";
ExceptionChecker.expectThrowsWithMsg(IllegalStateException.class,
"ASOF JOIN requires exactly one temporal inequality condition",
() -> getFragmentPlan(sql1));
}
@Test
public void testAsofJoinOtherPredicates() throws Exception {
String sql1 = "SELECT t0.v1 FROM t0 asof JOIN t1 ON t0.v1 = t1.v4 and t0.v2 <= t1.v5 " +
"WHERE t0.v2 = t0.v3 + t1.v4 and t0.v1 =1 ;";
String plan1 = getFragmentPlan(sql1);
assertContains(plan1, "3:HASH JOIN\n" +
" | join op: ASOF INNER JOIN (BROADCAST)\n" +
" | colocate: false, reason: \n" +
" | equal join conjunct: 1: v1 = 4: v4\n" +
" | asof join conjunct: 2: v2 <= 5: v5\n" +
" | other join predicates: 2: v2 = 3: v3 + 4: v4");
String sql2 = "SELECT t0.v1 FROM t0 asof JOIN t1 ON t0.v1 = t1.v4 and t0.v2 < t0.v3 + t1.v4";
ExceptionChecker.expectThrowsWithMsg(IllegalStateException.class,
"ASOF JOIN requires exactly one temporal inequality condition",
() -> getFragmentPlan(sql2));
String sql3 = "select * from t0 asof join t1 on t0.v1 = t1.v4 and t1.v5 <= t0.v2 where (v1 > 4 and v5 < 2)";
String plan3 = getFragmentPlan(sql3);
assertContains(plan3, "3:HASH JOIN\n" +
" | join op: ASOF INNER JOIN (BROADCAST)\n" +
" | colocate: false, reason: \n" +
" | equal join conjunct: 1: v1 = 4: v4\n" +
" | asof join conjunct: 2: v2 >= 5: v5");
String sql4 = "SELECT t0.v1 FROM t0 ASOF JOIN t1 ON t0.v1 = t1.v4 AND t0.v2 <= t1.v5 " +
"where (t0.v3 = t1.v6 OR t0.v3 = t1.v4)";
String plan4 = getFragmentPlan(sql4);
assertContains(plan4, "3:HASH JOIN\n" +
" | join op: ASOF INNER JOIN (BROADCAST)\n" +
" | colocate: false, reason: \n" +
" | equal join conjunct: 1: v1 = 4: v4\n" +
" | asof join conjunct: 2: v2 <= 5: v5\n" +
" | other join predicates: (3: v3 = 6: v6) OR (3: v3 = 4: v4)");
}
}

View File

@ -686,6 +686,98 @@ public class LimitTest extends PlanTestBase {
+ " 1:OlapScanNode"));
}
@Test
public void testAsofJoinLimitPushDown() throws Exception {
String sql = "select t0.v1 from t0 asof join t1 on t0.v1 = t1.v4 and t0.v2 <= t1.v5 limit 10";
String plan = getFragmentPlan(sql);
assertContains(plan, "3:HASH JOIN\n" +
" | join op: ASOF INNER JOIN (BROADCAST)\n" +
" | colocate: false, reason: \n" +
" | equal join conjunct: 1: v1 = 4: v4\n" +
" | asof join conjunct: 2: v2 <= 5: v5\n" +
" | limit: 10\n" +
" | \n" +
" |----2:EXCHANGE\n" +
" | \n" +
" 0:OlapScanNode\n" +
" TABLE: t0\n" +
" PREAGGREGATION: ON\n" +
" PREDICATES: 1: v1 IS NOT NULL\n" +
" partitions=0/1\n" +
" rollup: t0\n" +
" tabletRatio=0/0\n" +
" tabletList=\n" +
" cardinality=1\n" +
" avgRowSize=2.0");
assertContains(plan, "1:OlapScanNode\n" +
" TABLE: t1\n" +
" PREAGGREGATION: ON\n" +
" PREDICATES: 4: v4 IS NOT NULL\n" +
" partitions=0/1\n" +
" rollup: t1\n" +
" tabletRatio=0/0\n" +
" tabletList=\n" +
" cardinality=1\n" +
" avgRowSize=2.0");
}
@Test
public void testAsofLeftJoinLimitPushDown() throws Exception {
String sql = "select t0.v1 from t0 asof left join t1 on t0.v1 = t1.v4 and t0.v2 >= t1.v5 limit 15";
String plan = getFragmentPlan(sql);
assertContains(plan, "1:OlapScanNode\n" +
" TABLE: t1\n" +
" PREAGGREGATION: ON\n" +
" partitions=0/1\n" +
" rollup: t1\n" +
" tabletRatio=0/0\n" +
" tabletList=\n" +
" cardinality=1\n" +
" avgRowSize=2.0");
assertContains(plan, "3:HASH JOIN\n" +
" | join op: ASOF LEFT OUTER JOIN (BROADCAST)\n" +
" | colocate: false, reason: \n" +
" | equal join conjunct: 1: v1 = 4: v4\n" +
" | asof join conjunct: 2: v2 >= 5: v5\n" +
" | limit: 15\n" +
" | \n" +
" |----2:EXCHANGE\n" +
" | \n" +
" 0:OlapScanNode\n" +
" TABLE: t0\n" +
" PREAGGREGATION: ON\n" +
" partitions=0/1\n" +
" rollup: t0\n" +
" tabletRatio=0/0\n" +
" tabletList=\n" +
" cardinality=1\n" +
" avgRowSize=2.0\n" +
" limit: 15");
}
@Test
public void testAsofJoinLimitWithDatetime() throws Exception {
// Test ASOF JOIN with datetime temporal condition
String sql = "select a.ta from tall a asof join tall b on a.tc = b.tc and a.th <= b.th limit 5";
String plan = getFragmentPlan(sql);
assertContains(plan, "join op: ASOF INNER JOIN");
assertContains(plan, "limit: 5");
}
@Test
public void testAsofJoinLimitWithDate() throws Exception {
// Test ASOF LEFT JOIN with date temporal condition
String sql = "select a.ta from tall a asof left join tall b on a.tc = b.tc and a.ti >= b.ti limit 8";
String plan = getFragmentPlan(sql);
assertContains(plan, "join op: ASOF LEFT OUTER JOIN");
assertContains(plan, "limit: 8");
}
@Test
public void testMergeLimitForFilterNode() throws Exception {
String sql =
@ -1059,4 +1151,5 @@ public class LimitTest extends PlanTestBase {
Assertions.assertThrows(SemanticException.class, () -> getFragmentPlan(tql));
}
}
}

View File

@ -71,6 +71,20 @@ class PushDownPredicateJoinTypeTest extends PlanTestBase {
assertContains(plan, "OUTER JOIN");
}
@ParameterizedTest
@MethodSource("asofInnerJoinStream")
void testAsofInnerJoin(String sql) throws Exception {
String plan = getFragmentPlan(sql);
assertContains(plan, "ASOF INNER JOIN");
}
@ParameterizedTest
@MethodSource("asofLeftJoinStream")
void testAsofLeftJoin(String sql) throws Exception {
String plan = getFragmentPlan(sql);
assertContains(plan, "ASOF LEFT OUTER JOIN");
}
private static Stream<Arguments> innerJoinStream() {
List<String> sqls = Lists.newArrayList();
sqls.add("select * from t0, t1 where v1 = v4 and v1 = 1 and v5 = 2");
@ -83,6 +97,16 @@ class PushDownPredicateJoinTypeTest extends PlanTestBase {
return sqls.stream().map(e -> Arguments.of(e));
}
private static Stream<Arguments> asofInnerJoinStream() {
List<String> sqls = Lists.newArrayList();
sqls.add("select * from t0 asof join t1 on t0.v1 = t1.v4 and v1 = 1 and v5 = 2 and t0.v2 <= t1.v5");
sqls.add("select * from t0 asof inner join t1 on t0.v1 = t1.v4 and t0.v2 >= t1.v5 where v1 > 4 or v5 < 2");
sqls.add("select * from t0 asof join t1 on t0.v1 = t1.v4 and t0.v2 < t1.v5 where t0.v1 > 1");
sqls.add("select * from t0 asof join t1 on t0.v1 = t1.v4 and t0.v2 > t1.v5 and t1.v6 = 2");
sqls.add("select * from t0 asof join t1 on t0.v1 = t1.v4 and t0.v2 <= t1.v5 join t2 on t1.v4 = t2.v7");
return sqls.stream().map(e -> Arguments.of(e));
}
private static Stream<Arguments> crossJoinStream() {
List<String> sqls = Lists.newArrayList();
sqls.add("select * from t0, t1 where v1 = 1 and v4 = 4");
@ -122,5 +146,15 @@ class PushDownPredicateJoinTypeTest extends PlanTestBase {
return sqls.stream().map(e -> Arguments.of(e));
}
private static Stream<Arguments> asofLeftJoinStream() {
List<String> sqls = Lists.newArrayList();
sqls.add("select * from t0 asof left join t1 on t0.v1 = t1.v4 and t0.v2 <= t1.v5");
sqls.add("select * from t0 asof left outer join t1 on t0.v1 = t1.v4 and t0.v2 >= t1.v5");
sqls.add("select * from t0 asof left join t1 on t0.v1 = t1.v4 and t0.v2 < t1.v5 where t0.v1 > 1");
sqls.add("select * from t0 asof left join t1 on t0.v1 = t1.v4 and t0.v2 > t1.v5 and t1.v6 = 2");
sqls.add("select * from t0 asof left join t1 on t0.v1 = t1.v4 and t0.v2 <= t1.v5 left join t2 on t1.v4 = t2.v7");
return sqls.stream().map(e -> Arguments.of(e));
}
}

View File

@ -219,4 +219,64 @@ public class SkewJoinV2Test extends PlanTestBase {
" | \n" +
" 4:HASH JOIN");
}
@Test
public void testSkewJoinV2WithAsofInnerJoin() throws Exception {
String sql = "select v2, v5 from t0 asof join[skew|t0.v1(1,2)] t1 on v1 = v4 and v2 <= v5";
String sqlPlan = getVerboseExplain(sql);
assertCContains(sqlPlan, "4:HASH JOIN\n" +
" | join op: ASOF INNER JOIN (PARTITIONED)\n" +
" | equal join conjunct: [1: v1, BIGINT, true] = [4: v4, BIGINT, true]\n" +
" | asof join conjunct: [2: v2, BIGINT, true] <= [5: v5, BIGINT, true]");
assertCContains(sqlPlan, "8:HASH JOIN\n" +
" | | join op: ASOF INNER JOIN (BROADCAST)\n" +
" | | equal join conjunct: [1: v1, BIGINT, true] = [4: v4, BIGINT, true]\n" +
" | | asof join conjunct: [2: v2, BIGINT, true] <= [5: v5, BIGINT, true]");
// Verify UNION structure is maintained
assertCContains(sqlPlan, "10:UNION\n" +
" | child exprs:\n" +
" | [2: v2, BIGINT, true] | [5: v5, BIGINT, true]\n" +
" | [2: v2, BIGINT, true] | [5: v5, BIGINT, true]\n" +
" | pass-through-operands: all");
// Verify split data sink for left table (t0)
assertCContains(sqlPlan, "PLAN FRAGMENT 2(F00)\n" +
"\n" +
" Input Partition: RANDOM\n" +
" SplitCastDataSink:\n" +
" OutPut Partition: HASH_PARTITIONED: 1: v1\n" +
" OutPut Exchange Id: 02\n" +
" Split expr: ([1: v1, BIGINT, true] NOT IN (1, 2)) OR ([1: v1, BIGINT, true] IS NULL)\n" +
" OutPut Partition: RANDOM\n" +
" OutPut Exchange Id: 06\n" +
" Split expr: [1: v1, BIGINT, true] IN (1, 2)\n" +
"\n" +
" 0:OlapScanNode\n" +
" table: t0, rollup: t0");
// Verify split data sink for right table (t1)
assertCContains(sqlPlan, "PLAN FRAGMENT 1(F01)\n" +
"\n" +
" Input Partition: RANDOM\n" +
" SplitCastDataSink:\n" +
" OutPut Partition: HASH_PARTITIONED: 4: v4\n" +
" OutPut Exchange Id: 03\n" +
" Split expr: ([4: v4, BIGINT, true] NOT IN (1, 2)) OR ([4: v4, BIGINT, true] IS NULL)\n" +
" OutPut Partition: UNPARTITIONED\n" +
" OutPut Exchange Id: 07\n" +
" Split expr: [4: v4, BIGINT, true] IN (1, 2)\n" +
"\n" +
" 1:OlapScanNode\n" +
" table: t1, rollup: t1");
}
@Test
public void testSkewJoinV2WithAsofLeftJoin() throws Exception {
String sql = "select v2, v5 from t0 asof left join[skew|t0.v1(1,2)] t1 on v1 = v4 and v2 >= v5";
String sqlPlan = getVerboseExplain(sql);
assertCContains(sqlPlan, "join op: ASOF LEFT OUTER JOIN (PARTITIONED)");
}
}

View File

@ -2445,12 +2445,21 @@ namedArgument
;
joinRelation
: crossOrInnerJoinType bracketHint?
: asofJoinType bracketHint?
rightRelation=relationPrimary joinCriteria
| crossOrInnerJoinType bracketHint?
LATERAL? rightRelation=relationPrimary joinCriteria?
| outerAndSemiJoinType bracketHint?
LATERAL? rightRelation=relationPrimary joinCriteria
;
asofJoinType
: ASOF JOIN
| ASOF INNER JOIN
| ASOF LEFT JOIN
| ASOF LEFT OUTER JOIN
;
crossOrInnerJoinType
: JOIN | INNER JOIN
| CROSS | CROSS JOIN

View File

@ -42,6 +42,7 @@ ARRAY_AGG: 'ARRAY_AGG';
ARRAY_AGG_DISTINCT: 'ARRAY_AGG_DISTINCT';
AS: 'AS';
ASC: 'ASC';
ASOF: 'ASOF';
ASYNC: 'ASYNC';
ASSERT_ROWS: 'ASSERT_ROWS';
AUTHORS: 'AUTHORS';

View File

@ -693,6 +693,15 @@ struct TEqJoinCondition {
3: optional Opcodes.TExprOpcode opcode;
}
struct TAsofJoinCondition {
// left-hand side of the asof condition (probe side)
1: required Exprs.TExpr left;
// right-hand side of the asof condition (build side)
2: required Exprs.TExpr right;
// operator for asof join: LT, LE, GT, GE
3: required Opcodes.TExprOpcode opcode;
}
enum TStreamingPreaggregationMode {
AUTO,
FORCE_STREAMING,
@ -718,7 +727,9 @@ enum TJoinOp {
// on the build side. Those NULLs are considered candidate matches, and therefore could
// be rejected (ANTI-join), based on the other join conjuncts. This is in contrast
// to LEFT_ANTI_JOIN where NULLs are not matches and therefore always returned.
NULL_AWARE_LEFT_ANTI_JOIN
NULL_AWARE_LEFT_ANTI_JOIN,
ASOF_INNER_JOIN,
ASOF_LEFT_OUTER_JOIN
}
enum TJoinDistributionMode {
@ -768,6 +779,8 @@ struct THashJoinNode {
58: optional bool is_skew_join = false
59: optional map<Types.TSlotId, Exprs.TExpr> common_slot_map
70: optional TAsofJoinCondition asof_join_condition
}
struct TMergeJoinNode {

View File

@ -0,0 +1,55 @@
-- name: test_asof_join
CREATE DATABASE test_asof_join;
-- result:
-- !result
use test_asof_join;
-- result:
-- !result
CREATE TABLE orders (
`order_id` int(11) NOT NULL COMMENT "订单ID",
`user_id` int(11) NOT NULL COMMENT "用户ID",
`order_time` datetime NOT NULL COMMENT "订单时间",
`amount` decimal(10,2) NOT NULL COMMENT "订单金额"
) ENGINE=OLAP
COMMENT "订单表"
DISTRIBUTED BY HASH(`order_id`)
PROPERTIES (
"replication_num" = "1"
);
-- result:
-- !result
CREATE TABLE user_status (
`user_id` int(11) NOT NULL COMMENT "用户ID",
`status_time` datetime NOT NULL COMMENT "状态变更时间",
`status` varchar(20) NOT NULL COMMENT "用户状态",
`credit_score` int(11) NOT NULL COMMENT "信用评分"
) ENGINE=OLAP
COMMENT "用户状态表"
DISTRIBUTED BY HASH(`user_id`)
PROPERTIES (
"replication_num" = "1"
);
-- result:
-- !result
INSERT INTO orders VALUES
(1, 101, '2024-01-01 10:00:00', 100.00),
(2, 101, '2024-01-01 15:30:00', 200.00),
(3, 102, '2024-01-01 11:00:00', 150.00),
(4, 102, '2024-01-01 16:00:00', 300.00),
(5, 101, '2024-01-02 09:00:00', 250.00),
(6, 102, '2024-01-02 14:00:00', 180.00);
-- result:
-- !result
INSERT INTO user_status VALUES
(101, '2024-01-01 08:00:00', 'NORMAL', 750),
(101, '2024-01-01 14:00:00', 'VIP', 850),
(101, '2024-01-02 08:00:00', 'PREMIUM', 900),
(102, '2024-01-01 09:00:00', 'NORMAL', 700),
(102, '2024-01-01 13:00:00', 'VIP', 800),
(102, '2024-01-02 12:00:00', 'PREMIUM', 950);
-- result:
-- !result
function: assert_explain_contains('SELECT * FROM orders o ASOF LEFT JOIN user_status us ON o.user_id = us.user_id AND o.order_time >= us.status_time', 'ASOF LEFT OUTER JOIN')
-- result:
None
-- !result

View File

@ -0,0 +1,46 @@
-- name: test_asof_join
CREATE DATABASE test_asof_join;
use test_asof_join;
CREATE TABLE orders (
`order_id` int(11) NOT NULL COMMENT "订单ID",
`user_id` int(11) NOT NULL COMMENT "用户ID",
`order_time` datetime NOT NULL COMMENT "订单时间",
`amount` decimal(10,2) NOT NULL COMMENT "订单金额"
) ENGINE=OLAP
COMMENT "订单表"
DISTRIBUTED BY HASH(`order_id`)
PROPERTIES (
"replication_num" = "1"
);
CREATE TABLE user_status (
`user_id` int(11) NOT NULL COMMENT "用户ID",
`status_time` datetime NOT NULL COMMENT "状态变更时间",
`status` varchar(20) NOT NULL COMMENT "用户状态",
`credit_score` int(11) NOT NULL COMMENT "信用评分"
) ENGINE=OLAP
COMMENT "用户状态表"
DISTRIBUTED BY HASH(`user_id`)
PROPERTIES (
"replication_num" = "1"
);
INSERT INTO orders VALUES
(1, 101, '2024-01-01 10:00:00', 100.00),
(2, 101, '2024-01-01 15:30:00', 200.00),
(3, 102, '2024-01-01 11:00:00', 150.00),
(4, 102, '2024-01-01 16:00:00', 300.00),
(5, 101, '2024-01-02 09:00:00', 250.00),
(6, 102, '2024-01-02 14:00:00', 180.00);
INSERT INTO user_status VALUES
(101, '2024-01-01 08:00:00', 'NORMAL', 750),
(101, '2024-01-01 14:00:00', 'VIP', 850),
(101, '2024-01-02 08:00:00', 'PREMIUM', 900),
(102, '2024-01-01 09:00:00', 'NORMAL', 700),
(102, '2024-01-01 13:00:00', 'VIP', 800),
(102, '2024-01-02 12:00:00', 'PREMIUM', 950);
function: assert_explain_contains('SELECT * FROM orders o ASOF LEFT JOIN user_status us ON o.user_id = us.user_id AND o.order_time >= us.status_time', 'ASOF LEFT OUTER JOIN')