[BugFix] Fix bugs with partition_condition_retention (#58867)
Signed-off-by: shuming.li <ming.moriarty@gmail.com>
This commit is contained in:
parent
fa812f2baa
commit
b6d0bc5471
|
|
@ -3598,6 +3598,12 @@ public class OlapTable extends Table {
|
||||||
properties.put(PropertyAnalyzer.PROPERTIES_FLAT_JSON_COLUMN_MAX, flatJsonColumnMax);
|
properties.put(PropertyAnalyzer.PROPERTIES_FLAT_JSON_COLUMN_MAX, flatJsonColumnMax);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// partition_retention_condition
|
||||||
|
String partitionRetentionCondition = tableProperties.get(PropertyAnalyzer.PROPERTIES_PARTITION_RETENTION_CONDITION);
|
||||||
|
if (!Strings.isNullOrEmpty(partitionRetentionCondition)) {
|
||||||
|
properties.put(PropertyAnalyzer.PROPERTIES_PARTITION_RETENTION_CONDITION, partitionRetentionCondition);
|
||||||
|
}
|
||||||
|
|
||||||
return properties;
|
return properties;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -500,12 +500,13 @@ public class DynamicPartitionScheduler extends FrontendDaemon {
|
||||||
protected void runAfterCatalogReady() {
|
protected void runAfterCatalogReady() {
|
||||||
// Find all tables that need to be scheduled.
|
// Find all tables that need to be scheduled.
|
||||||
long now = System.currentTimeMillis();
|
long now = System.currentTimeMillis();
|
||||||
if ((now - lastFindingTime) > Math.max(300000, Config.dynamic_partition_check_interval_seconds)) {
|
long checkIntervalMs = Config.dynamic_partition_check_interval_seconds * 1000L;
|
||||||
|
if ((now - lastFindingTime) > Math.max(60000, checkIntervalMs)) {
|
||||||
findSchedulableTables();
|
findSchedulableTables();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update scheduler interval.
|
// Update scheduler interval.
|
||||||
setInterval(Config.dynamic_partition_check_interval_seconds * 1000L);
|
setInterval(checkIntervalMs);
|
||||||
|
|
||||||
// Schedule tables with dynamic partition enabled (only works for base table).
|
// Schedule tables with dynamic partition enabled (only works for base table).
|
||||||
if (Config.dynamic_partition_enable) {
|
if (Config.dynamic_partition_enable) {
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,6 @@ import com.starrocks.catalog.RangePartitionInfo;
|
||||||
import com.starrocks.catalog.Table;
|
import com.starrocks.catalog.Table;
|
||||||
import com.starrocks.catalog.Type;
|
import com.starrocks.catalog.Type;
|
||||||
import com.starrocks.common.AnalysisException;
|
import com.starrocks.common.AnalysisException;
|
||||||
import com.starrocks.common.DdlException;
|
|
||||||
import com.starrocks.common.Pair;
|
import com.starrocks.common.Pair;
|
||||||
import com.starrocks.common.util.concurrent.lock.AutoCloseableLock;
|
import com.starrocks.common.util.concurrent.lock.AutoCloseableLock;
|
||||||
import com.starrocks.common.util.concurrent.lock.LockType;
|
import com.starrocks.common.util.concurrent.lock.LockType;
|
||||||
|
|
@ -84,6 +83,16 @@ public class PartitionTTLScheduler {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void scheduleTTLPartition() {
|
public void scheduleTTLPartition() {
|
||||||
|
// connect context is needed sometime, otherwise ConnectContext.get() is null.
|
||||||
|
ConnectContext connectContext = ConnectContext.buildInner();
|
||||||
|
try (var guard = connectContext.bindScope()) {
|
||||||
|
doScheduleTTLPartition();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("Failed to schedule ttl partition", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void doScheduleTTLPartition() {
|
||||||
Iterator<Pair<Long, Long>> iterator = ttlPartitionInfo.iterator();
|
Iterator<Pair<Long, Long>> iterator = ttlPartitionInfo.iterator();
|
||||||
while (iterator.hasNext()) {
|
while (iterator.hasNext()) {
|
||||||
Pair<Long, Long> tableInfo = iterator.next();
|
Pair<Long, Long> tableInfo = iterator.next();
|
||||||
|
|
@ -114,27 +123,39 @@ public class PartitionTTLScheduler {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// get expired partition names
|
try {
|
||||||
List<String> dropPartitionNames = getExpiredPartitionNames(db, olapTable, partitionInfo);
|
// schedule drop expired partitions
|
||||||
if (CollectionUtils.isEmpty(dropPartitionNames)) {
|
doScheduleTableTTLPartition(db, olapTable);
|
||||||
continue;
|
} catch (Exception e) {
|
||||||
|
LOG.warn("database={}-{}, table={}-{} failed to schedule drop expired partitions",
|
||||||
|
db.getFullName(), dbId, olapTable.getName(), tableId, e);
|
||||||
}
|
}
|
||||||
LOG.info("database={}, table={} has ttl partitions to drop: {}", db.getOriginName(), olapTable.getName(),
|
}
|
||||||
dropPartitionNames);
|
}
|
||||||
|
|
||||||
// do drop partitions
|
private void doScheduleTableTTLPartition(Database db, OlapTable olapTable) {
|
||||||
String tableName = olapTable.getName();
|
// get expired partition names
|
||||||
List<DropPartitionClause> dropPartitionClauses = buildDropPartitionClauses(dropPartitionNames);
|
final PartitionInfo partitionInfo = olapTable.getPartitionInfo();
|
||||||
for (DropPartitionClause dropPartitionClause : dropPartitionClauses) {
|
List<String> dropPartitionNames = getExpiredPartitionNames(db, olapTable, partitionInfo);
|
||||||
try (AutoCloseableLock ignore = new AutoCloseableLock(
|
if (CollectionUtils.isEmpty(dropPartitionNames)) {
|
||||||
new Locker(), db.getId(), Lists.newArrayList(olapTable.getId()), LockType.WRITE)) {
|
LOG.info("database={}, table={} has no expired partitions to drop", db.getOriginName(), olapTable.getName());
|
||||||
AlterTableClauseAnalyzer analyzer = new AlterTableClauseAnalyzer(olapTable);
|
return;
|
||||||
analyzer.analyze(new ConnectContext(), dropPartitionClause);
|
}
|
||||||
GlobalStateMgr.getCurrentState().getLocalMetastore().dropPartition(db, olapTable, dropPartitionClause);
|
LOG.info("database={}, table={} has ttl partitions to drop: {}", db.getOriginName(), olapTable.getName(),
|
||||||
runtimeInfoCollector.clearDropPartitionFailedMsg(tableName);
|
dropPartitionNames);
|
||||||
} catch (DdlException e) {
|
|
||||||
runtimeInfoCollector.recordDropPartitionFailedMsg(db.getOriginName(), tableName, e.getMessage());
|
// do drop partitions
|
||||||
}
|
String tableName = olapTable.getName();
|
||||||
|
List<DropPartitionClause> dropPartitionClauses = buildDropPartitionClauses(dropPartitionNames);
|
||||||
|
for (DropPartitionClause dropPartitionClause : dropPartitionClauses) {
|
||||||
|
try (AutoCloseableLock ignore = new AutoCloseableLock(
|
||||||
|
new Locker(), db.getId(), Lists.newArrayList(olapTable.getId()), LockType.WRITE)) {
|
||||||
|
AlterTableClauseAnalyzer analyzer = new AlterTableClauseAnalyzer(olapTable);
|
||||||
|
analyzer.analyze(new ConnectContext(), dropPartitionClause);
|
||||||
|
GlobalStateMgr.getCurrentState().getLocalMetastore().dropPartition(db, olapTable, dropPartitionClause);
|
||||||
|
runtimeInfoCollector.clearDropPartitionFailedMsg(tableName);
|
||||||
|
} catch (Exception e) {
|
||||||
|
runtimeInfoCollector.recordDropPartitionFailedMsg(db.getOriginName(), tableName, e.getMessage());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -192,7 +213,9 @@ public class PartitionTTLScheduler {
|
||||||
dropPartitionNames = PartitionSelector.getExpiredPartitionsByRetentionCondition(db, olapTable, ttlCondition);
|
dropPartitionNames = PartitionSelector.getExpiredPartitionsByRetentionCondition(db, olapTable, ttlCondition);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (AnalysisException e) {
|
LOG.info("database={}-{}, table={}-{} drop partitions by ttl: {}",
|
||||||
|
db.getFullName(), dbId, olapTable.getName(), tableId, dropPartitionNames);
|
||||||
|
} catch (Exception e) {
|
||||||
LOG.warn("database={}-{}, table={}-{} failed to build drop partition statement.",
|
LOG.warn("database={}-{}, table={}-{} failed to build drop partition statement.",
|
||||||
db.getFullName(), dbId, olapTable.getName(), tableId, e);
|
db.getFullName(), dbId, olapTable.getName(), tableId, e);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -443,7 +443,10 @@ public class PropertyAnalyzer {
|
||||||
if (properties != null && properties.containsKey(PROPERTIES_PARTITION_RETENTION_CONDITION)) {
|
if (properties != null && properties.containsKey(PROPERTIES_PARTITION_RETENTION_CONDITION)) {
|
||||||
partitionRetentionCondition = properties.get(PROPERTIES_PARTITION_RETENTION_CONDITION);
|
partitionRetentionCondition = properties.get(PROPERTIES_PARTITION_RETENTION_CONDITION);
|
||||||
if (Strings.isNullOrEmpty(partitionRetentionCondition)) {
|
if (Strings.isNullOrEmpty(partitionRetentionCondition)) {
|
||||||
throw new SemanticException("Illegal partition retention condition: " + partitionRetentionCondition);
|
if (removeProperties) {
|
||||||
|
properties.remove(PROPERTIES_PARTITION_RETENTION_CONDITION);
|
||||||
|
}
|
||||||
|
return partitionRetentionCondition;
|
||||||
}
|
}
|
||||||
// parse retention condition
|
// parse retention condition
|
||||||
Expr whereExpr = null;
|
Expr whereExpr = null;
|
||||||
|
|
|
||||||
|
|
@ -3734,9 +3734,6 @@ public class LocalMetastore implements ConnectorMetadata, MVRepairHandler, Memor
|
||||||
}
|
}
|
||||||
String partitionRetentionCondition = PropertyAnalyzer.analyzePartitionRetentionCondition(
|
String partitionRetentionCondition = PropertyAnalyzer.analyzePartitionRetentionCondition(
|
||||||
db, table, properties, true, null);
|
db, table, properties, true, null);
|
||||||
if (Strings.isNullOrEmpty(partitionRetentionCondition)) {
|
|
||||||
throw new DdlException("Invalid partition retention condition");
|
|
||||||
}
|
|
||||||
results.put(PropertyAnalyzer.PROPERTIES_PARTITION_RETENTION_CONDITION, partitionRetentionCondition);
|
results.put(PropertyAnalyzer.PROPERTIES_PARTITION_RETENTION_CONDITION, partitionRetentionCondition);
|
||||||
}
|
}
|
||||||
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_DRIFT_CONSTRAINT)) {
|
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_TIME_DRIFT_CONSTRAINT)) {
|
||||||
|
|
|
||||||
|
|
@ -83,6 +83,7 @@ import java.nio.ByteBuffer;
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
|
@ -236,7 +237,7 @@ public class PartitionSelector {
|
||||||
} else if (partitionInfo.isListPartition()) {
|
} else if (partitionInfo.isListPartition()) {
|
||||||
ListPartitionInfo listPartitionInfo = (ListPartitionInfo) partitionInfo;
|
ListPartitionInfo listPartitionInfo = (ListPartitionInfo) partitionInfo;
|
||||||
selectedPartitionIds = getListPartitionIdsByExpr(tableName.getDb(), olapTable, listPartitionInfo,
|
selectedPartitionIds = getListPartitionIdsByExpr(tableName.getDb(), olapTable, listPartitionInfo,
|
||||||
whereExpr, scalarOperator, exprToColumnIdxes, inputCells);
|
whereExpr, scalarOperator, exprToColumnIdxes, isRecyclingCondition, inputCells);
|
||||||
} else {
|
} else {
|
||||||
throw new SemanticException("Unsupported partition type: " + partitionInfo.getType());
|
throw new SemanticException("Unsupported partition type: " + partitionInfo.getType());
|
||||||
}
|
}
|
||||||
|
|
@ -453,12 +454,14 @@ public class PartitionSelector {
|
||||||
Expr whereExpr,
|
Expr whereExpr,
|
||||||
ScalarOperator scalarOperator,
|
ScalarOperator scalarOperator,
|
||||||
Map<Expr, Integer> exprToColumnIdxes,
|
Map<Expr, Integer> exprToColumnIdxes,
|
||||||
|
boolean isRecyclingCondition,
|
||||||
Map<Long, PCell> inputCells) {
|
Map<Long, PCell> inputCells) {
|
||||||
|
|
||||||
List<Long> result = null;
|
List<Long> result = null;
|
||||||
// try to prune partitions by FE's constant evaluation ability
|
// try to prune partitions by FE's constant evaluation ability
|
||||||
try {
|
try {
|
||||||
result = getListPartitionIdsByExprV1(olapTable, listPartitionInfo, scalarOperator, inputCells);
|
result = getListPartitionIdsByExprV1(olapTable, listPartitionInfo, scalarOperator,
|
||||||
|
isRecyclingCondition, inputCells);
|
||||||
if (result != null) {
|
if (result != null) {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
@ -475,12 +478,79 @@ public class PartitionSelector {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static class Recorder {
|
||||||
|
// used to record the state of eval result
|
||||||
|
private boolean isConstTrue = false;
|
||||||
|
|
||||||
|
public Recorder() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isConstTrue() {
|
||||||
|
return isConstTrue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the eval result is satisfied and short-circuit the evaluation.
|
||||||
|
// TODO: refactor and move it to a common place
|
||||||
|
* @param rewriter : rewriter to rewrite the partition condition scalar operator by constant fold and so on
|
||||||
|
* @param replaceColumnRefRewriter: replace columnRef with literal
|
||||||
|
* @param scalarOperator : partition condition scalar operator
|
||||||
|
* @param isRecyclingCondition : true for recycling/dropping condition, false for retention condition.
|
||||||
|
* @param record : used to record the state of eval result
|
||||||
|
* @return : true if the eval result is satisfied, false if not satisfied, null if the eval result is not set.
|
||||||
|
*/
|
||||||
|
private static Optional<Boolean> isEvalResultSatisfied(ScalarOperatorRewriter rewriter,
|
||||||
|
ReplaceColumnRefRewriter replaceColumnRefRewriter,
|
||||||
|
ScalarOperator scalarOperator,
|
||||||
|
boolean isRecyclingCondition,
|
||||||
|
Recorder record) {
|
||||||
|
ScalarOperator result = replaceColumnRefRewriter.rewrite(scalarOperator);
|
||||||
|
result = rewriter.rewrite(result, ScalarOperatorRewriter.DEFAULT_REWRITE_RULES);
|
||||||
|
if (!result.isConstant()) {
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
if (isRecyclingCondition) {
|
||||||
|
// for recycling condition, keep partitions as less as possible
|
||||||
|
// T1, partitions:
|
||||||
|
// p1: [1, 2]
|
||||||
|
// p2: [2, 3]
|
||||||
|
// eg: alter table T1 drop partitions where p > 1, only choose p1 when all values are satisfied.
|
||||||
|
// p1: [1, 2] is not satisfied, so we need to return p2: [2, 3]
|
||||||
|
if (result.isConstantFalse()) {
|
||||||
|
record.isConstTrue = false;
|
||||||
|
return Optional.of(true);
|
||||||
|
} else if (result.isConstantTrue()) {
|
||||||
|
record.isConstTrue = true;
|
||||||
|
} else {
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// for retention condition, keep partitions as much as possible
|
||||||
|
// T1, partitions:
|
||||||
|
// p1: [1, 2]
|
||||||
|
// p2: [2, 3]
|
||||||
|
// eg: partition_retention_condition = 'p > 1', choose partition if it once is satisfied.
|
||||||
|
// p1: [1, 2]/[2, 3] are all satisfied, so we need to return p1/p2 both
|
||||||
|
if (result.isConstantFalse()) {
|
||||||
|
record.isConstTrue = false;
|
||||||
|
} else if (result.isConstantTrue()) {
|
||||||
|
record.isConstTrue = true;
|
||||||
|
return Optional.of(true);
|
||||||
|
} else {
|
||||||
|
return Optional.empty();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Optional.of(false);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fetch selected partition ids by using FE's constant evaluation ability.
|
* Fetch selected partition ids by using FE's constant evaluation ability.
|
||||||
*/
|
*/
|
||||||
private static List<Long> getListPartitionIdsByExprV1(OlapTable olapTable,
|
private static List<Long> getListPartitionIdsByExprV1(OlapTable olapTable,
|
||||||
ListPartitionInfo listPartitionInfo,
|
ListPartitionInfo listPartitionInfo,
|
||||||
ScalarOperator scalarOperator,
|
ScalarOperator scalarOperator,
|
||||||
|
boolean isRecyclingCondition,
|
||||||
Map<Long, PCell> inputCells) {
|
Map<Long, PCell> inputCells) {
|
||||||
// eval for each conjunct
|
// eval for each conjunct
|
||||||
Map<ColumnRefOperator, Integer> colRefIdxMap = Maps.newHashMap();
|
Map<ColumnRefOperator, Integer> colRefIdxMap = Maps.newHashMap();
|
||||||
|
|
@ -497,84 +567,77 @@ public class PartitionSelector {
|
||||||
List<Long> selectedPartitionIds = Lists.newArrayList();
|
List<Long> selectedPartitionIds = Lists.newArrayList();
|
||||||
// single partition column
|
// single partition column
|
||||||
Map<Long, List<LiteralExpr>> listPartitions = listPartitionInfo.getLiteralExprValues();
|
Map<Long, List<LiteralExpr>> listPartitions = listPartitionInfo.getLiteralExprValues();
|
||||||
|
final Recorder recorder = new Recorder();
|
||||||
for (Map.Entry<Long, List<LiteralExpr>> e : listPartitions.entrySet()) {
|
for (Map.Entry<Long, List<LiteralExpr>> e : listPartitions.entrySet()) {
|
||||||
boolean isConstTrue = false;
|
|
||||||
for (LiteralExpr literalExpr : e.getValue()) {
|
for (LiteralExpr literalExpr : e.getValue()) {
|
||||||
Map<ColumnRefOperator, ScalarOperator> replaceMap = Maps.newHashMap();
|
Map<ColumnRefOperator, ScalarOperator> replaceMap = Maps.newHashMap();
|
||||||
ConstantOperator replace = (ConstantOperator) SqlToScalarOperatorTranslator.translate(literalExpr);
|
ConstantOperator replace = (ConstantOperator) SqlToScalarOperatorTranslator.translate(literalExpr);
|
||||||
replaceMap.put(usedPartitionColumnRefs.get(0), replace);
|
replaceMap.put(usedPartitionColumnRefs.get(0), replace);
|
||||||
|
|
||||||
// replace columnRef with literal
|
// replace columnRef with literal
|
||||||
ReplaceColumnRefRewriter replaceColumnRefRewriter = new ReplaceColumnRefRewriter(replaceMap);
|
final ReplaceColumnRefRewriter replaceColumnRefRewriter = new ReplaceColumnRefRewriter(replaceMap);
|
||||||
ScalarOperator result = replaceColumnRefRewriter.rewrite(scalarOperator);
|
// check for each partition value
|
||||||
result = rewriter.rewrite(result, ScalarOperatorRewriter.FOLD_CONSTANT_RULES);
|
final Optional<Boolean> evalResultSatisfied = isEvalResultSatisfied(rewriter, replaceColumnRefRewriter,
|
||||||
|
scalarOperator, isRecyclingCondition, recorder);
|
||||||
if (!result.isConstant()) {
|
// if eval result is not set, return it directly
|
||||||
|
if (evalResultSatisfied == null || evalResultSatisfied.isEmpty()) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
if (result.isConstantFalse()) {
|
// if eval result is satisfied, break the loop
|
||||||
isConstTrue = false;
|
if (evalResultSatisfied.get()) {
|
||||||
break;
|
break;
|
||||||
} else if (result.isConstantTrue()) {
|
|
||||||
isConstTrue = true;
|
|
||||||
} else {
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (isConstTrue) {
|
if (recorder.isConstTrue()) {
|
||||||
selectedPartitionIds.add(e.getKey());
|
selectedPartitionIds.add(e.getKey());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// multi partition columns
|
// multi partition columns
|
||||||
Map<Long, List<List<LiteralExpr>>> multiListPartitions = listPartitionInfo.getMultiLiteralExprValues();
|
Map<Long, List<List<LiteralExpr>>> multiListPartitions = listPartitionInfo.getMultiLiteralExprValues();
|
||||||
for (Map.Entry<Long, List<List<LiteralExpr>>> e : multiListPartitions.entrySet()) {
|
for (Map.Entry<Long, List<List<LiteralExpr>>> e : multiListPartitions.entrySet()) {
|
||||||
boolean isConstTrue = false;
|
|
||||||
for (List<LiteralExpr> values : e.getValue()) {
|
for (List<LiteralExpr> values : e.getValue()) {
|
||||||
Map<ColumnRefOperator, ScalarOperator> replaceMap = buildReplaceMap(colRefIdxMap, values);
|
final Map<ColumnRefOperator, ScalarOperator> replaceMap = buildReplaceMap(colRefIdxMap, values);
|
||||||
ReplaceColumnRefRewriter replaceColumnRefRewriter = new ReplaceColumnRefRewriter(replaceMap);
|
final ReplaceColumnRefRewriter replaceColumnRefRewriter = new ReplaceColumnRefRewriter(replaceMap);
|
||||||
ScalarOperator result = replaceColumnRefRewriter.rewrite(scalarOperator);
|
|
||||||
result = rewriter.rewrite(result, ScalarOperatorRewriter.DEFAULT_REWRITE_RULES);
|
// check for each partition value
|
||||||
if (!result.isConstant()) {
|
final Optional<Boolean> evalResultSatisfied = isEvalResultSatisfied(rewriter, replaceColumnRefRewriter,
|
||||||
|
scalarOperator, isRecyclingCondition, recorder);
|
||||||
|
// if eval result is not set, return it directly
|
||||||
|
if (evalResultSatisfied == null || evalResultSatisfied.isEmpty()) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
if (result.isConstantFalse()) {
|
// if eval result is satisfied, break the loop
|
||||||
isConstTrue = false;
|
if (evalResultSatisfied.get()) {
|
||||||
break;
|
break;
|
||||||
} else if (result.isConstantTrue()) {
|
|
||||||
isConstTrue = true;
|
|
||||||
} else {
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (isConstTrue) {
|
if (recorder.isConstTrue()) {
|
||||||
selectedPartitionIds.add(e.getKey());
|
selectedPartitionIds.add(e.getKey());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!CollectionUtils.sizeIsEmpty(inputCells)) {
|
if (!CollectionUtils.sizeIsEmpty(inputCells)) {
|
||||||
for (Map.Entry<Long, PCell> e : inputCells.entrySet()) {
|
for (Map.Entry<Long, PCell> e : inputCells.entrySet()) {
|
||||||
boolean isConstTrue = false;
|
|
||||||
PListCell pListCell = (PListCell) e.getValue();
|
PListCell pListCell = (PListCell) e.getValue();
|
||||||
for (List<String> values : pListCell.getPartitionItems()) {
|
for (List<String> values : pListCell.getPartitionItems()) {
|
||||||
Map<ColumnRefOperator, ScalarOperator> replaceMap = buildReplaceMapWithCell(colRefIdxMap, values);
|
final Map<ColumnRefOperator, ScalarOperator> replaceMap = buildReplaceMapWithCell(colRefIdxMap, values);
|
||||||
if (replaceMap == null) {
|
if (replaceMap == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
ReplaceColumnRefRewriter replaceColumnRefRewriter = new ReplaceColumnRefRewriter(replaceMap);
|
final ReplaceColumnRefRewriter replaceColumnRefRewriter = new ReplaceColumnRefRewriter(replaceMap);
|
||||||
ScalarOperator result = replaceColumnRefRewriter.rewrite(scalarOperator);
|
|
||||||
result = rewriter.rewrite(result, ScalarOperatorRewriter.DEFAULT_REWRITE_RULES);
|
// check for each partition value
|
||||||
if (!result.isConstant()) {
|
final Optional<Boolean> evalResultSatisfied = isEvalResultSatisfied(rewriter, replaceColumnRefRewriter,
|
||||||
|
scalarOperator, isRecyclingCondition, recorder);
|
||||||
|
// if eval result is not set, return it directly
|
||||||
|
if (evalResultSatisfied == null || evalResultSatisfied.isEmpty()) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
if (result.isConstantFalse()) {
|
// if eval result is satisfied, break the loop
|
||||||
isConstTrue = false;
|
if (evalResultSatisfied.get()) {
|
||||||
break;
|
break;
|
||||||
} else if (result.isConstantTrue()) {
|
|
||||||
isConstTrue = true;
|
|
||||||
} else {
|
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (isConstTrue) {
|
if (recorder.isConstTrue()) {
|
||||||
selectedPartitionIds.add(e.getKey());
|
selectedPartitionIds.add(e.getKey());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -22,15 +22,16 @@ import com.starrocks.qe.ConnectContext;
|
||||||
import com.starrocks.sql.ast.CreateTableStmt;
|
import com.starrocks.sql.ast.CreateTableStmt;
|
||||||
import com.starrocks.sql.ast.PartitionDesc;
|
import com.starrocks.sql.ast.PartitionDesc;
|
||||||
import com.starrocks.utframe.StarRocksAssert;
|
import com.starrocks.utframe.StarRocksAssert;
|
||||||
|
import com.starrocks.utframe.StarRocksTestBase;
|
||||||
import com.starrocks.utframe.UtFrameUtils;
|
import com.starrocks.utframe.UtFrameUtils;
|
||||||
|
import org.apache.logging.log4j.util.Strings;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Rule;
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.rules.ExpectedException;
|
import org.junit.rules.ExpectedException;
|
||||||
|
|
||||||
public class CreateTableWithPartitionTest {
|
public class CreateTableWithPartitionTest extends StarRocksTestBase {
|
||||||
private static StarRocksAssert starRocksAssert;
|
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public ExpectedException expectedEx = ExpectedException.none();
|
public ExpectedException expectedEx = ExpectedException.none();
|
||||||
|
|
@ -1034,7 +1035,7 @@ public class CreateTableWithPartitionTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRangeTableWithRetentionCondition() throws Exception {
|
public void testRangeTableWithRetentionCondition1() throws Exception {
|
||||||
starRocksAssert.withTable("CREATE TABLE r1 \n" +
|
starRocksAssert.withTable("CREATE TABLE r1 \n" +
|
||||||
"(\n" +
|
"(\n" +
|
||||||
" dt date,\n" +
|
" dt date,\n" +
|
||||||
|
|
@ -1096,5 +1097,83 @@ public class CreateTableWithPartitionTest {
|
||||||
Assert.assertEquals("dt > current_date() - interval 1 month", retentionCondition);
|
Assert.assertEquals("dt > current_date() - interval 1 month", retentionCondition);
|
||||||
starRocksAssert.dropTable("r1");
|
starRocksAssert.dropTable("r1");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRangeTableWithRetentionCondition2() throws Exception {
|
||||||
|
starRocksAssert.withTable("CREATE TABLE r1 \n" +
|
||||||
|
"(\n" +
|
||||||
|
" dt date,\n" +
|
||||||
|
" k2 int,\n" +
|
||||||
|
" v1 int \n" +
|
||||||
|
")\n" +
|
||||||
|
"PARTITION BY RANGE(dt)\n" +
|
||||||
|
"(\n" +
|
||||||
|
" PARTITION p0 values [('2024-01-29'),('2024-01-30')),\n" +
|
||||||
|
" PARTITION p1 values [('2024-01-30'),('2024-01-31')),\n" +
|
||||||
|
" PARTITION p2 values [('2024-01-31'),('2024-02-01')),\n" +
|
||||||
|
" PARTITION p3 values [('2024-02-01'),('2024-02-02')) \n" +
|
||||||
|
")\n" +
|
||||||
|
"DISTRIBUTED BY HASH(k2) BUCKETS 3\n" +
|
||||||
|
"PROPERTIES (\n" +
|
||||||
|
"'replication_num' = '1',\n" +
|
||||||
|
"'partition_retention_condition' = 'dt > current_date() - interval 1 month'\n" +
|
||||||
|
")");
|
||||||
|
OlapTable r1 = (OlapTable) starRocksAssert.getTable("db1", "r1");
|
||||||
|
String retentionCondition = r1.getTableProperty().getPartitionRetentionCondition();
|
||||||
|
Assert.assertEquals("dt > current_date() - interval 1 month", retentionCondition);
|
||||||
|
|
||||||
|
String result = starRocksAssert.showCreateTable("show create table r1");
|
||||||
|
final String expect = "CREATE TABLE `r1` (\n" +
|
||||||
|
" `dt` date NULL COMMENT \"\",\n" +
|
||||||
|
" `k2` int(11) NULL COMMENT \"\",\n" +
|
||||||
|
" `v1` int(11) NULL COMMENT \"\"\n" +
|
||||||
|
") ENGINE=OLAP \n" +
|
||||||
|
"DUPLICATE KEY(`dt`, `k2`, `v1`)\n" +
|
||||||
|
"PARTITION BY RANGE(`dt`)\n" +
|
||||||
|
"(PARTITION p0 VALUES [(\"2024-01-29\"), (\"2024-01-30\")),\n" +
|
||||||
|
"PARTITION p1 VALUES [(\"2024-01-30\"), (\"2024-01-31\")),\n" +
|
||||||
|
"PARTITION p2 VALUES [(\"2024-01-31\"), (\"2024-02-01\")),\n" +
|
||||||
|
"PARTITION p3 VALUES [(\"2024-02-01\"), (\"2024-02-02\")))\n" +
|
||||||
|
"DISTRIBUTED BY HASH(`k2`) BUCKETS 3 \n" +
|
||||||
|
"PROPERTIES (\n" +
|
||||||
|
"\"compression\" = \"LZ4\",\n" +
|
||||||
|
"\"fast_schema_evolution\" = \"true\",\n" +
|
||||||
|
"\"partition_retention_condition\" = \"dt > current_date() - interval 1 month\",\n" +
|
||||||
|
"\"replicated_storage\" = \"true\",\n" +
|
||||||
|
"\"replication_num\" = \"1\"\n" +
|
||||||
|
");";
|
||||||
|
Assert.assertEquals(expect, result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRangeTableWithRetentionCondition3() throws Exception {
|
||||||
|
starRocksAssert.withTable("CREATE TABLE r1 \n" +
|
||||||
|
"(\n" +
|
||||||
|
" dt date,\n" +
|
||||||
|
" k2 int,\n" +
|
||||||
|
" v1 int \n" +
|
||||||
|
")\n" +
|
||||||
|
"PARTITION BY RANGE(dt)\n" +
|
||||||
|
"(\n" +
|
||||||
|
" PARTITION p0 values [('2024-01-29'),('2024-01-30')),\n" +
|
||||||
|
" PARTITION p1 values [('2024-01-30'),('2024-01-31')),\n" +
|
||||||
|
" PARTITION p2 values [('2024-01-31'),('2024-02-01')),\n" +
|
||||||
|
" PARTITION p3 values [('2024-02-01'),('2024-02-02')) \n" +
|
||||||
|
")\n" +
|
||||||
|
"DISTRIBUTED BY HASH(k2) BUCKETS 3\n" +
|
||||||
|
"PROPERTIES (\n" +
|
||||||
|
"'replication_num' = '1',\n" +
|
||||||
|
"'partition_retention_condition' = 'dt > current_date() - interval 1 month'\n" +
|
||||||
|
")");
|
||||||
|
OlapTable r1 = (OlapTable) starRocksAssert.getTable("db1", "r1");
|
||||||
|
String retentionCondition = r1.getTableProperty().getPartitionRetentionCondition();
|
||||||
|
Assert.assertEquals("dt > current_date() - interval 1 month", retentionCondition);
|
||||||
|
|
||||||
|
String alterPartitionSql = "alter table r1 set ('partition_retention_condition' = '');";
|
||||||
|
starRocksAssert.alterTable(alterPartitionSql);
|
||||||
|
|
||||||
|
retentionCondition = r1.getTableProperty().getPartitionRetentionCondition();
|
||||||
|
Assert.assertTrue(Strings.isEmpty(retentionCondition));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -686,4 +686,32 @@ public class DropPartitionWithExprListTest extends MVTestBase {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPartitionConditionTTL1() throws Exception {
|
||||||
|
starRocksAssert.withTable("create table list_par_int(\n" +
|
||||||
|
" k1 int,\n" +
|
||||||
|
" k2 string)\n" +
|
||||||
|
" partition by list(k1)\n" +
|
||||||
|
" (partition p1 values in('1','2'),\n" +
|
||||||
|
" partition p2 values in('3','4'),\n" +
|
||||||
|
" partition p3 values in('5','6'),\n" +
|
||||||
|
" partition p4 values in('7','8'),\n" +
|
||||||
|
" partition p5 values in('9','10'),\n" +
|
||||||
|
" partition p6 values in('11','12'),\n" +
|
||||||
|
" partition p7 values in('13','14'),\n" +
|
||||||
|
" partition p8 values in('15','16'),\n" +
|
||||||
|
" partition p9 values in('17','18'),\n" +
|
||||||
|
" partition p10 values in('19','20'))\n" +
|
||||||
|
" distributed by hash(k1)\n");
|
||||||
|
|
||||||
|
OlapTable olapTable = (OlapTable) starRocksAssert.getTable("test", "list_par_int");
|
||||||
|
Assert.assertEquals(10, olapTable.getVisiblePartitions().size());
|
||||||
|
|
||||||
|
String sql = "alter table list_par_int drop partitions where k1 > 5";
|
||||||
|
starRocksAssert.alterTable(sql);
|
||||||
|
Assert.assertEquals(3, olapTable.getVisiblePartitions().size());
|
||||||
|
String[] expectedPartitions = {"p1", "p2", "p3"};
|
||||||
|
Assert.assertArrayEquals(expectedPartitions, olapTable.getVisiblePartitionNames().toArray());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -16,6 +16,7 @@ package com.starrocks.clone;
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableList;
|
import com.google.common.collect.ImmutableList;
|
||||||
import com.google.common.collect.Range;
|
import com.google.common.collect.Range;
|
||||||
|
import com.google.common.collect.Sets;
|
||||||
import com.starrocks.catalog.Database;
|
import com.starrocks.catalog.Database;
|
||||||
import com.starrocks.catalog.DistributionInfo;
|
import com.starrocks.catalog.DistributionInfo;
|
||||||
import com.starrocks.catalog.DynamicPartitionProperty;
|
import com.starrocks.catalog.DynamicPartitionProperty;
|
||||||
|
|
@ -49,6 +50,7 @@ import java.time.format.DateTimeFormatter;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
import java.util.TimeZone;
|
import java.util.TimeZone;
|
||||||
|
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
|
@ -723,4 +725,76 @@ public class DynamicPartitionSchedulerTest {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPartitionConditionTTL1() throws Exception {
|
||||||
|
starRocksAssert.withTable("create table list_par_int(\n" +
|
||||||
|
" k1 int,\n" +
|
||||||
|
" k2 string)\n" +
|
||||||
|
" partition by list(k1)\n" +
|
||||||
|
" (partition p1 values in('1','2'),\n" +
|
||||||
|
" partition p2 values in('3','4'),\n" +
|
||||||
|
" partition p3 values in('5','6'),\n" +
|
||||||
|
" partition p4 values in('7','8'),\n" +
|
||||||
|
" partition p5 values in('9','10'),\n" +
|
||||||
|
" partition p6 values in('11','12'),\n" +
|
||||||
|
" partition p7 values in('13','14'),\n" +
|
||||||
|
" partition p8 values in('15','16'),\n" +
|
||||||
|
" partition p9 values in('17','18'),\n" +
|
||||||
|
" partition p10 values in('19','20'))\n" +
|
||||||
|
" distributed by hash(k1)\n" +
|
||||||
|
" PROPERTIES (\"partition_retention_condition\" = \"k1 > 5\");");
|
||||||
|
|
||||||
|
final String tableName = "list_par_int";
|
||||||
|
OlapTable olapTable = (OlapTable) starRocksAssert.getTable("test", tableName);
|
||||||
|
Assert.assertEquals(10, olapTable.getVisiblePartitions().size());
|
||||||
|
Assert.assertFalse(DynamicPartitionUtil.isDynamicPartitionTable(olapTable));
|
||||||
|
Assert.assertTrue(DynamicPartitionUtil.isTTLPartitionTable(olapTable));
|
||||||
|
|
||||||
|
DynamicPartitionScheduler scheduler = GlobalStateMgr.getCurrentState()
|
||||||
|
.getDynamicPartitionScheduler();
|
||||||
|
scheduler.runOnceForTest();
|
||||||
|
Set<String> visiblePartitionNames = olapTable.getVisiblePartitionNames();
|
||||||
|
Assert.assertEquals(8, visiblePartitionNames.size());
|
||||||
|
Set<String> expectedPartitionNames = Sets.newHashSet("p3", "p4", "p5", "p6", "p7", "p8", "p9", "p10");
|
||||||
|
System.out.println(visiblePartitionNames);
|
||||||
|
Assert.assertEquals(expectedPartitionNames, visiblePartitionNames);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testPartitionConditionTTL2() throws Exception {
|
||||||
|
starRocksAssert.withTable("create table range_par_hour(\n" +
|
||||||
|
" k1 datetime,\n" +
|
||||||
|
" k2 string)\n" +
|
||||||
|
" partition by range(k1)\n" +
|
||||||
|
" (partition p1 values less than(\"2025-05-11 16:00:00\"),\n" +
|
||||||
|
" partition p2 values less than(\"2025-05-11 17:00:00\"),\n" +
|
||||||
|
" partition p3 values less than(\"2025-05-11 18:00:00\"),\n" +
|
||||||
|
" partition p4 values less than(\"2025-05-11 19:00:00\"),\n" +
|
||||||
|
" partition p5 values less than(\"2025-05-11 20:00:00\"),\n" +
|
||||||
|
" partition p6 values less than(\"2025-05-12 21:00:00\"),\n" +
|
||||||
|
" partition p7 values less than(\"2025-05-12 22:00:00\"),\n" +
|
||||||
|
" partition p8 values less than(\"2025-05-12 23:00:00\"),\n" +
|
||||||
|
" partition p9 values less than(\"2025-05-13 00:00:00\"),\n" +
|
||||||
|
" partition p10 values less than(\"2025-05-13 01:00:00\"),\n" +
|
||||||
|
" partition p11 values less than(\"2025-05-13 02:00:00\"))\n" +
|
||||||
|
" distributed by hash(k1)\n" +
|
||||||
|
" PROPERTIES (\"partition_retention_condition\" = \"k1 >= '2025-05-12 00:00:00'\");");
|
||||||
|
|
||||||
|
final String tableName = "range_par_hour";
|
||||||
|
OlapTable olapTable = (OlapTable) starRocksAssert.getTable("test", tableName);
|
||||||
|
Assert.assertEquals(11, olapTable.getVisiblePartitions().size());
|
||||||
|
Assert.assertFalse(DynamicPartitionUtil.isDynamicPartitionTable(olapTable));
|
||||||
|
Assert.assertTrue(DynamicPartitionUtil.isTTLPartitionTable(olapTable));
|
||||||
|
|
||||||
|
DynamicPartitionScheduler scheduler = GlobalStateMgr.getCurrentState()
|
||||||
|
.getDynamicPartitionScheduler();
|
||||||
|
scheduler.runOnceForTest();
|
||||||
|
Set<String> visiblePartitionNames = olapTable.getVisiblePartitionNames();
|
||||||
|
System.out.println(visiblePartitionNames);
|
||||||
|
Assert.assertEquals(6, visiblePartitionNames.size());
|
||||||
|
Set<String> expectedPartitionNames = Sets.newHashSet("p6", "p7", "p8", "p9", "p10", "p11");
|
||||||
|
System.out.println(visiblePartitionNames);
|
||||||
|
Assert.assertEquals(expectedPartitionNames, visiblePartitionNames);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue