[Enhancement] Ensure mv force refresh will refresh target partitions (backport #62627) (backport #63844) (#63887)
Signed-off-by: shuming.li <ming.moriarty@gmail.com>
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
Co-authored-by: shuming.li <ming.moriarty@gmail.com>
This commit is contained in:
parent 2d6c893e5d
commit 86a8a9de4d
@@ -433,7 +433,7 @@ public class DynamicPartitionUtil {
         PartitionInfo partitionInfo = olapTable.getPartitionInfo();
         PartitionType partitionType = partitionInfo.getType();
         if (!isDynamicPartitionTable(olapTable, partitionInfo, tableProperty)) {
-            LOG.info("unable to schedule olap table {}-{} because dynamic partition is not enabled, " +
+            LOG.debug("unable to schedule olap table {}-{} because dynamic partition is not enabled, " +
                     "partition type: {}, partition column size: {}",
                     table.getName(), table.getId(), partitionType, partitionInfo.getPartitionColumnsSize());
             return false;
@@ -479,7 +479,7 @@ public class DynamicPartitionUtil {
         PartitionInfo partitionInfo = olapTable.getPartitionInfo();
         PartitionType partitionType = partitionInfo.getType();
         if (!isTTLPartitionTable(olapTable, partitionInfo, tableProperty)) {
-            LOG.info("unable to schedule olap table {}-{} because partition ttl is not enabled, " +
+            LOG.debug("unable to schedule olap table {}-{} because partition ttl is not enabled, " +
                     "partition type: {}, partition column size: {}",
                     table.getName(), table.getId(), partitionType, partitionInfo.getPartitionColumnsSize());
             return false;
@@ -24,8 +24,9 @@ import org.apache.commons.lang3.StringUtils;

 public abstract class BaseTaskRunProcessor implements TaskRunProcessor {
     @Override
-    public void prepare(TaskRunContext context) throws Exception {
+    public TaskRunContext prepare(TaskRunContext context) throws Exception {
         // do nothing
+        return context;
     }

     @Override
@@ -27,6 +27,7 @@ import com.google.common.util.concurrent.Uninterruptibles;
 import com.starrocks.analysis.Expr;
 import com.starrocks.catalog.BaseTableInfo;
 import com.starrocks.catalog.Column;
+import com.starrocks.catalog.DataProperty;
 import com.starrocks.catalog.Database;
 import com.starrocks.catalog.MaterializedView;
 import com.starrocks.catalog.OlapTable;
@@ -72,6 +73,7 @@ import com.starrocks.scheduler.mv.MVVersionManager;
 import com.starrocks.scheduler.persist.MVTaskRunExtraMessage;
 import com.starrocks.scheduler.persist.TaskRunStatus;
 import com.starrocks.server.GlobalStateMgr;
+import com.starrocks.server.LocalMetastore;
 import com.starrocks.sql.StatementPlanner;
 import com.starrocks.sql.analyzer.MaterializedViewAnalyzer;
 import com.starrocks.sql.analyzer.PlannerMetaLocker;
@@ -249,7 +251,7 @@ public class PartitionBasedMvRefreshProcessor extends BaseTaskRunProcessor {
         try (Timer ignored = Tracers.watchScope("MVRefreshSyncPartitions")) {
             // sync partitions between mv and base tables out of lock
             // do it outside lock because it is a time-cost operation
-            if (!syncPartitions()) {
+            if (!syncPartitions(context, false)) {
                 logger.warn("Sync partitions failed.");
                 return false;
             }
@@ -431,7 +433,7 @@ public class PartitionBasedMvRefreshProcessor extends BaseTaskRunProcessor {
         Map<TableSnapshotInfo, Set<String>> baseTableCandidatePartitions = Maps.newHashMap();
         if (Config.enable_materialized_view_external_table_precise_refresh) {
             try (Timer ignored = Tracers.watchScope("MVRefreshComputeCandidatePartitions")) {
-                if (!syncPartitions()) {
+                if (!syncPartitions(context, false)) {
                     throw new DmlException(String.format("materialized view %s refresh task failed: sync partition failed",
                             mv.getName()));
                 }
@@ -875,7 +877,7 @@ public class PartitionBasedMvRefreshProcessor extends BaseTaskRunProcessor {

     @VisibleForTesting
     @Override
-    public void prepare(TaskRunContext context) throws Exception {
+    public TaskRunContext prepare(TaskRunContext context) throws Exception {
         Map<String, String> properties = context.getProperties();
         // NOTE: mvId is set in Task's properties when creating
         long mvId = Long.parseLong(properties.get(MV_ID));
@@ -929,6 +931,7 @@ public class PartitionBasedMvRefreshProcessor extends BaseTaskRunProcessor {
         this.mvRefreshPartitioner = buildMvRefreshPartitioner(mv, context);

         logger.info("finish prepare refresh mv:{}, jobId: {}", mvId, jobId);
+        return context;
     }

     /**
@@ -951,11 +954,59 @@ public class PartitionBasedMvRefreshProcessor extends BaseTaskRunProcessor {
     /**
      * Sync base table's partition infos to be used later.
      */
-    private boolean syncPartitions() throws AnalysisException, LockTimeoutException {
+    private boolean syncPartitions(TaskRunContext taskRunContext,
+                                   boolean tentative) throws AnalysisException, LockTimeoutException {
         Stopwatch stopwatch = Stopwatch.createStarted();
         // collect base table snapshot infos
         snapshotBaseTables = collectBaseTableSnapshotInfos(mv);
+
+        final MVRefreshParams mvRefreshParams = new MVRefreshParams(mv.getPartitionInfo(),
+                taskRunContext.getProperties(), tentative);
+        if (mvRefreshParams.isForce() && !tentative) {
+            // drop existing partitions for force refresh
+            Set<String> toRefreshPartitions = mvRefreshPartitioner.getMVPartitionsToRefreshByParams(mvRefreshParams);
+            if (toRefreshPartitions != null && !toRefreshPartitions.isEmpty()) {
+                logger.info("force refresh, drop partitions: [{}]",
+                        Joiner.on(",").join(toRefreshPartitions));
+
+                // first lock and drop partitions from a visible map
+                Locker locker = new Locker();
+                if (!locker.lockTableAndCheckDbExist(db, mv.getId(), LockType.WRITE)) {
+                    logger.warn("failed to lock database: {} in syncPartitions for force refresh", db.getFullName());
+                    throw new DmlException("Force refresh failed, database:" + db.getFullName() + " not exist");
+                }
+                try {
+                    PartitionInfo partitionInfo = mv.getPartitionInfo();
+                    DataProperty dataProperty = null;
+                    if (!mv.isPartitionedTable()) {
+                        String partitionName = toRefreshPartitions.iterator().next();
+                        Partition partition = mv.getPartition(partitionName);
+                        dataProperty = partitionInfo.getDataProperty(partition.getId());
+                        mv.dropPartition(db.getId(), partitionName, false);
+                    } else {
+                        for (String partName : toRefreshPartitions) {
+                            mvRefreshPartitioner.dropPartition(db, mv, partName);
+                        }
+                    }
+
+                    // for non-partitioned table, we need to build the partition here
+                    if (!mv.isPartitionedTable()) {
+                        LocalMetastore localMetastore = GlobalStateMgr.getCurrentState().getLocalMetastore();
+                        ConnectContext connectContext = mvContext.getCtx();
+                        localMetastore.buildNonPartitionOlapTable(db, mv, partitionInfo, dataProperty,
+                                connectContext.getCurrentWarehouseId());
+                    }
+                } catch (Exception e) {
+                    logger.warn("failed to drop partitions {} for force refresh",
+                            Joiner.on(",").join(toRefreshPartitions),
+                            DebugUtil.getRootStackTrace(e));
+                    throw new AnalysisException("failed to drop partitions for force refresh: " + e.getMessage());
+                } finally {
+                    locker.unLockTableWithIntensiveDbLock(db.getId(), this.mv.getId(), LockType.WRITE);
+                }
+            }
+        }
+
         // do sync partitions (add or drop partitions) for materialized view
         boolean result = mvRefreshPartitioner.syncAddOrDropPartitions();
         logger.info("finish sync partitions, cost(ms): {}", stopwatch.elapsed(TimeUnit.MILLISECONDS));
@@ -983,9 +1034,9 @@ public class PartitionBasedMvRefreshProcessor extends BaseTaskRunProcessor {
                                                       MVRefreshParams mvRefreshParams,
                                                       Set<String> mvPotentialPartitionNames)
             throws AnalysisException {
-        if (mvRefreshParams.isForceCompleteRefresh()) {
+        if (mvRefreshParams.isForce()) {
             // Force refresh
-            return mvRefreshPartitioner.getMVPartitionsToRefreshWithForce();
+            return mvRefreshPartitioner.getMVPartitionsToRefreshWithForce(mvRefreshParams);
         } else {
             return mvRefreshPartitioner.getMVPartitionsToRefresh(mvPartitionInfo, snapshotBaseTables,
                     mvRefreshParams, mvPotentialPartitionNames);
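Taken together with the partitioner changes further down, the force branch now resolves its target partitions from the refresh parameters instead of always expanding to every MV partition. A condensed sketch of the resulting call chain; the wrapper method and its name are hypothetical glue, only getMVPartitionsToRefreshWithForce and getMVPartitionsToRefreshByParams are real methods from this commit:

    // Illustrative sketch (not part of the diff): how the force path narrows its scope after this change.
    Set<String> forceRefreshTargets(MVPCTRefreshPartitioner partitioner, MVRefreshParams params)
            throws AnalysisException {
        // Only reached when params.isForce() is true (see the branch above).
        // Internally this delegates to getMVPartitionsToRefreshByParams(params), so a partial force
        // refresh (PARTITION START/END or PARTITION VALUES) only targets the matching partitions,
        // then drops names outside the MV's current partitions and applies partition-TTL filtering.
        return partitioner.getMVPartitionsToRefreshWithForce(params);
    }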
@@ -370,7 +370,7 @@ public class TaskRun implements Comparable<TaskRun> {
         taskRunContext.setTaskRun(this);

         // prepare to execute task run, move it here so that we can catch the exception and set the status
-        processor.prepare(taskRunContext);
+        taskRunContext = processor.prepare(taskRunContext);

         // process task run
         Constants.TaskRunState taskRunState;
@@ -20,7 +20,7 @@ public interface TaskRunProcessor {
      * Before task run, prepare the context.
      * @param context: task run context
      */
-    void prepare(TaskRunContext context) throws Exception;
+    TaskRunContext prepare(TaskRunContext context) throws Exception;

     /**
      * Process a task run with the given context.
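With prepare now returning a TaskRunContext, an implementation may hand back the original context or a replacement, and TaskRun reassigns its local reference before processing the run. A minimal sketch of a conforming implementation; the class name, the property key and the body are hypothetical, and the remaining interface methods are omitted for brevity:

    // Illustrative sketch (not part of the diff); other interface methods omitted.
    public class ExampleTaskRunProcessor extends BaseTaskRunProcessor {
        @Override
        public TaskRunContext prepare(TaskRunContext context) throws Exception {
            // A processor may adjust or replace the incoming context here; returning it
            // unchanged mirrors the default behavior in BaseTaskRunProcessor.
            context.getProperties().putIfAbsent("example_property", "value"); // hypothetical property
            return context;
        }
        // processTaskRun(...) and the rest of the contract are unchanged by this commit.
    }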
@@ -87,6 +87,30 @@ public final class MVPCTRefreshListPartitioner extends MVPCTRefreshPartitioner {
         this.logger = MVTraceUtils.getLogger(mv, MVPCTRefreshListPartitioner.class);
     }

+    @Override
+    public Set<String> getMVPartitionsToRefreshByParams(MVRefreshParams mvRefreshParams) {
+        if (mvRefreshParams.isCompleteRefresh()) {
+            return mv.getListPartitionItems().keySet();
+        } else {
+            Set<PListCell> pListCells = mvRefreshParams.getListValues();
+            Map<String, PListCell> mvPartitions = mv.getListPartitionItems();
+            Map<String, PListCell> mvFilteredPartitions = mvPartitions.entrySet().stream()
+                    .filter(e -> {
+                        PListCell mvListCell = e.getValue();
+                        if (mvListCell.getItemSize() == 1) {
+                            // if list value is a single value, check it directly
+                            return pListCells.contains(e.getValue());
+                        } else {
+                            // if list values is multi values, split it into single values and check it then.
+                            return mvListCell.toSingleValueCells().stream().anyMatch(i -> pListCells.contains(i));
+                        }
+                    })
+                    .map(Map.Entry::getKey)
+                    .collect(Collectors.toMap(k -> k, mvPartitions::get));
+            return mvFilteredPartitions.keySet();
+        }
+    }
+
     @Override
     public boolean syncAddOrDropPartitions() throws LockTimeoutException {
         // collect mv partition items with lock
@@ -337,13 +361,6 @@ public final class MVPCTRefreshListPartitioner extends MVPCTRefreshPartitioner {
         }
     }

-    @Override
-    public Set<String> getMVPartitionsToRefreshWithForce() {
-        Map<String, PCell> mvValidListPartitionMapMap = mv.getPartitionCells(Optional.empty());
-        filterPartitionsByTTL(mvValidListPartitionMapMap, false);
-        return mvValidListPartitionMapMap.keySet();
-    }
-
     @Override
     public Set<String> getMVPartitionsToRefresh(PartitionInfo mvPartitionInfo,
                                                 Map<Long, TableSnapshotInfo> snapshotBaseTables,
@@ -39,6 +39,11 @@ public final class MVPCTRefreshNonPartitioner extends MVPCTRefreshPartitioner {
         super(mvContext, context, db, mv);
     }

+    @Override
+    public Set<String> getMVPartitionsToRefreshByParams(MVRefreshParams mvRefreshParams) {
+        return mv.getVisiblePartitionNames();
+    }
+
     @Override
     public boolean syncAddOrDropPartitions() {
         // do nothing
@@ -59,11 +64,6 @@ public final class MVPCTRefreshNonPartitioner extends MVPCTRefreshPartitioner {
         return null;
     }

-    @Override
-    public Set<String> getMVPartitionsToRefreshWithForce() {
-        return mv.getVisiblePartitionNames();
-    }
-
     @Override
     public Set<String> getMVPartitionsToRefresh(PartitionInfo mvPartitionInfo,
                                                 Map<Long, TableSnapshotInfo> snapshotBaseTables,
@@ -27,6 +27,7 @@ import com.starrocks.catalog.PartitionInfo;
 import com.starrocks.catalog.Table;
 import com.starrocks.common.AnalysisException;
 import com.starrocks.common.Config;
+import com.starrocks.common.Pair;
 import com.starrocks.common.util.concurrent.lock.LockTimeoutException;
 import com.starrocks.common.util.concurrent.lock.LockType;
 import com.starrocks.common.util.concurrent.lock.Locker;
@@ -47,8 +48,10 @@ import org.apache.logging.log4j.Logger;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;

 import static com.starrocks.catalog.MvRefreshArbiter.getMvBaseTableUpdateInfo;
 import static com.starrocks.catalog.MvRefreshArbiter.needsToRefreshTable;
@@ -107,6 +110,11 @@ public abstract class MVPCTRefreshPartitioner {
     public abstract Expr generateMVPartitionPredicate(TableName tableName,
                                                       Set<String> mvPartitionNames) throws AnalysisException;

+    /**
+     * Get mv partition names to refresh based on the mv refresh params.
+     */
+    public abstract Set<String> getMVPartitionsToRefreshByParams(MVRefreshParams mvRefreshParams) throws AnalysisException;
+
     /**
      * Get mv partitions to refresh based on the ref base table partitions.
      *
@@ -121,7 +129,19 @@ public abstract class MVPCTRefreshPartitioner {
                                                          MVRefreshParams mvRefreshParams,
                                                          Set<String> mvPotentialPartitionNames) throws AnalysisException;

-    public abstract Set<String> getMVPartitionsToRefreshWithForce() throws AnalysisException;
+    public Set<String> getMVPartitionsToRefreshWithForce(MVRefreshParams mvRefreshParams) throws AnalysisException {
+        Set<String> toRefreshPartitions = getMVPartitionsToRefreshByParams(mvRefreshParams);
+        if (CollectionUtils.isEmpty(toRefreshPartitions) || !mv.isPartitionedTable()) {
+            return toRefreshPartitions;
+        }
+        Map<String, PCell> mvListPartitionMap = mv.getPartitionCells(Optional.empty());
+        Map<String, PCell> validToRefreshPartitions = toRefreshPartitions.stream()
+                .filter(mvListPartitionMap::containsKey)
+                .map(name -> Pair.create(name, mvListPartitionMap.get(name)))
+                .collect(Collectors.toMap(x -> x.first, x -> x.second));
+        filterPartitionsByTTL(validToRefreshPartitions, true);
+        return validToRefreshPartitions.keySet();
+    }

     /**
      * Get mv partition names with TTL based on the ref base table partitions.
@@ -19,6 +19,7 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Range;
+import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.starrocks.analysis.BoolLiteral;
 import com.starrocks.analysis.Expr;
@@ -62,6 +63,7 @@ import com.starrocks.sql.common.RangePartitionDiffer;
 import com.starrocks.sql.common.SyncPartitionUtils;
 import com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils;
 import org.apache.commons.collections4.ListUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.logging.log4j.Logger;

 import java.util.Collections;
@@ -90,6 +92,38 @@ public final class MVPCTRefreshRangePartitioner extends MVPCTRefreshPartitioner
         this.logger = MVTraceUtils.getLogger(mv, MVPCTRefreshRangePartitioner.class);
     }

+    @Override
+    public Set<String> getMVPartitionsToRefreshByParams(MVRefreshParams mvRefreshParams) throws AnalysisException {
+        if (mvRefreshParams.isCompleteRefresh()) {
+            Map<String, Range<PartitionKey>> rangePartitionMap = mv.getRangePartitionMap();
+            if (rangePartitionMap.isEmpty()) {
+                return Sets.newHashSet();
+            }
+            return rangePartitionMap.keySet();
+        } else {
+            String start = mvRefreshParams.getRangeStart();
+            String end = mvRefreshParams.getRangeEnd();
+            if (StringUtils.isEmpty(start) && StringUtils.isEmpty(end)) {
+                return Sets.newHashSet();
+            }
+            // range partition must have partition column, and its column size must be 1
+            Column partitionColumn = mv.getRangePartitionFirstColumn().get();
+            Range<PartitionKey> rangeToInclude = createRange(start, end, partitionColumn);
+            Map<String, Range<PartitionKey>> rangePartitionMap = mv.getRangePartitionMap();
+            if (rangePartitionMap.isEmpty()) {
+                return Sets.newHashSet();
+            }
+            Set<String> result = Sets.newHashSet();
+            for (Map.Entry<String, Range<PartitionKey>> entry : rangePartitionMap.entrySet()) {
+                Range<PartitionKey> rangeToCheck = entry.getValue();
+                if (RangePartitionDiffer.isRangeIncluded(rangeToInclude, rangeToCheck)) {
+                    result.add(entry.getKey());
+                }
+            }
+            return result;
+        }
+    }
+
     @Override
     public boolean syncAddOrDropPartitions() throws AnalysisException {
         String start = context == null ? null : context.getProperties().get(TaskRun.PARTITION_START);
@@ -327,14 +361,6 @@ public final class MVPCTRefreshRangePartitioner extends MVPCTRefreshPartitioner
         return mvToRefreshPartitionNames;
     }

-    @Override
-    public Set<String> getMVPartitionsToRefreshWithForce() throws AnalysisException {
-        int partitionTTLNumber = mvContext.getPartitionTTLNumber();
-        Map<String, Range<PartitionKey>> inputRanges = mv.getValidRangePartitionMap(partitionTTLNumber);
-        filterPartitionsByTTL(PRangeCell.toCellMap(inputRanges), false);
-        return inputRanges.keySet();
-    }
-
     @Override
     public Set<String> getMVPartitionNamesWithTTL(MaterializedView mv, MVRefreshParams mvRefreshParams,
                                                   boolean isAutoRefresh) throws AnalysisException {
@@ -46,10 +46,6 @@ public class MVRefreshParams {
         this.mvPartitionInfo = partitionInfo;
     }

-    public boolean isForceCompleteRefresh() {
-        return isForce && isCompleteRefresh();
-    }
-
     public boolean isForce() {
         return isForce;
     }
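Removing isForceCompleteRefresh() is the heart of the fix: before this commit the force branch was only taken when the refresh was both forced and complete, so FORCE combined with PARTITION START/END or PARTITION VALUES fell through to the ordinary staleness check and could skip the targeted partitions. A rough sketch of the distinction; the helper name and simplified parameters are hypothetical and only illustrate the old versus new condition:

    // Illustrative sketch (not part of the diff).
    static boolean takesForcePath(boolean isForce, boolean isCompleteRefresh) {
        // before: isForce && isCompleteRefresh  -> a partial FORCE refresh lost its force semantics
        // after:  isForce                       -> FORCE always drops and rebuilds the targeted partitions
        return isForce;
    }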
@@ -3062,24 +3062,9 @@ public class LocalMetastore implements ConnectorMetadata, MVRepairHandler, Memor
         PropertyAnalyzer.analyzeMVProperties(db, materializedView, properties, isNonPartitioned,
                 stmt.getPartitionByExprToAdjustExprMap());
         try {
             Set<Long> tabletIdSet = new HashSet<>();
             // process single partition info
             if (isNonPartitioned) {
-                long partitionId = GlobalStateMgr.getCurrentState().getNextId();
-                Preconditions.checkNotNull(dataProperty);
-                partitionInfo.setDataProperty(partitionId, dataProperty);
-                partitionInfo.setReplicationNum(partitionId, materializedView.getDefaultReplicationNum());
-                partitionInfo.setIsInMemory(partitionId, false);
-                partitionInfo.setTabletType(partitionId, TTabletType.TABLET_TYPE_DISK);
-                StorageInfo storageInfo = materializedView.getTableProperty().getStorageInfo();
-                partitionInfo.setDataCacheInfo(partitionId,
-                        storageInfo == null ? null : storageInfo.getDataCacheInfo());
-                Long version = Partition.PARTITION_INIT_VERSION;
-                Partition partition = createPartition(db, materializedView, partitionId, mvName, version, tabletIdSet,
-                        materializedView.getWarehouseId());
-                buildPartitions(db, materializedView, new ArrayList<>(partition.getSubPartitions()),
-                        materializedView.getWarehouseId());
-                materializedView.addPartition(partition);
+                buildNonPartitionOlapTable(db, materializedView, partitionInfo, dataProperty, materializedView.getWarehouseId());
             } else {
                 List<Expr> mvPartitionExprs = stmt.getPartitionByExprs();
                 LinkedHashMap<Expr, SlotRef> partitionExprMaps = MVPartitionExprResolver.getMVPartitionExprsChecked(
@@ -3114,6 +3099,35 @@ public class LocalMetastore implements ConnectorMetadata, MVRepairHandler, Memor
         DynamicPartitionUtil.registerOrRemovePartitionTTLTable(db.getId(), materializedView);
     }

+    /**
+     * Initialize a non-partitioned table with one partition.
+     */
+    public void buildNonPartitionOlapTable(Database db,
+                                           OlapTable olapTable,
+                                           PartitionInfo partitionInfo,
+                                           DataProperty dataProperty,
+                                           Long warehouseId) throws DdlException {
+        if (olapTable.isPartitionedTable()) {
+            throw new DdlException("Table " + olapTable.getName() + " is a partitioned table, not a non-partitioned table");
+        }
+
+        long partitionId = GlobalStateMgr.getCurrentState().getNextId();
+        Preconditions.checkNotNull(dataProperty);
+        partitionInfo.setDataProperty(partitionId, dataProperty);
+        partitionInfo.setReplicationNum(partitionId, olapTable.getDefaultReplicationNum());
+        partitionInfo.setIsInMemory(partitionId, false);
+        partitionInfo.setTabletType(partitionId, TTabletType.TABLET_TYPE_DISK);
+        StorageInfo storageInfo = olapTable.getTableProperty().getStorageInfo();
+        partitionInfo.setDataCacheInfo(partitionId,
+                storageInfo == null ? null : storageInfo.getDataCacheInfo());
+        Set<Long> tabletIdSet = new HashSet<>();
+        Long version = Partition.PARTITION_INIT_VERSION;
+        Partition partition = createPartition(db, olapTable, partitionId, olapTable.getName(), version, tabletIdSet,
+                warehouseId);
+        buildPartitions(db, olapTable, new ArrayList<>(partition.getSubPartitions()), warehouseId);
+        olapTable.addPartition(partition);
+    }
+
     private long getRandomStart(IntervalLiteral interval, long randomizeStart) throws DdlException {
         if (interval == null || randomizeStart == -1) {
             return 0;
@@ -199,7 +199,7 @@ public final class RangePartitionDiffer extends PartitionDiffer {
      * @param rangeToInclude range to check whether the to be checked range is in
      * @return true if included, else false
      */
-    private static boolean isRangeIncluded(Range<PartitionKey> rangeToCheck, Range<PartitionKey> rangeToInclude) {
+    public static boolean isRangeIncluded(Range<PartitionKey> rangeToCheck, Range<PartitionKey> rangeToInclude) {
         if (rangeToInclude == null) {
             return true;
         }
@@ -33,8 +33,8 @@ public class MockTaskRunProcessor implements TaskRunProcessor {
     }

     @Override
-    public void prepare(TaskRunContext context) throws Exception {
-        // do nothing
+    public TaskRunContext prepare(TaskRunContext context) throws Exception {
+        return context;
     }

     @Override
@@ -1652,4 +1652,149 @@ public class PCTRefreshListPartitionOlapTest extends MVTestBase {
         }
         Assertions.assertEquals(0, mv.getPartitions().size());
     }
+
+    @Test
+    public void testMVForceRefresh() throws Exception {
+        String partitionTable = "CREATE TABLE list_t1 (dt1 date, int1 int)\n" +
+                "PARTITION BY list(dt1) (\n" +
+                "     PARTITION p1 VALUES IN (\"2025-05-16\") ,\n" +
+                "     PARTITION p2 VALUES IN (\"2025-05-17\") \n" +
+                ")\n";
+        starRocksAssert.withTable(partitionTable);
+        String[] sqls = {
+                "insert into list_t1 partition(p1) values('2025-05-16', 1);",
+                "insert into list_t1 partition(p2) values('2025-05-17', 1);"
+        };
+        for (String sql : sqls) {
+            executeInsertSql(sql);
+        }
+
+        String mvQuery = "CREATE MATERIALIZED VIEW test_mv1 " +
+                "PARTITION BY (dt1) " +
+                "REFRESH DEFERRED MANUAL PROPERTIES (\"partition_refresh_number\"=\"-1\")\n" +
+                "AS SELECT dt1,sum(int1) from list_t1 group by dt1 \n";
+        starRocksAssert.withMaterializedView(mvQuery);
+        MaterializedView mv = getMv("test_mv1");
+
+        TaskRun taskRun = buildMVTaskRun(mv, "test");
+        ExecPlan execPlan;
+        // explain without force
+        {
+            execPlan = getMVRefreshExecPlan(taskRun);
+            Assertions.assertNotNull(execPlan);
+
+            refreshMV("test", mv);
+            execPlan = getMVRefreshExecPlan(taskRun);
+            Assertions.assertNull(execPlan);
+        }
+
+        // refresh with force
+        Map<String, String> props = taskRun.getProperties();
+        props.put(TaskRun.FORCE, "true");
+        // explain with refresh
+        {
+            ExecuteOption executeOption = new ExecuteOption(taskRun.getTask());
+            Map<String, String> explainProps = executeOption.getTaskRunProperties();
+            explainProps.put(TaskRun.FORCE, "true");
+
+            execPlan = getMVRefreshExecPlan(taskRun, true);
+            Assertions.assertNotNull(execPlan);
+
+            execPlan = getMVRefreshExecPlan(taskRun, true);
+            Assertions.assertNotNull(execPlan);
+
+            // after refresh, still can refresh with force
+            execPlan = getMVRefreshExecPlan(taskRun, true);
+            String plan = execPlan.getExplainString(TExplainLevel.NORMAL);
+            PlanTestBase.assertContains(plan, " TABLE: list_t1\n" +
+                    " PREAGGREGATION: ON\n" +
+                    " partitions=2/2");
+            Assertions.assertNotNull(execPlan);
+
+            refreshMV("test", mv);
+
+            // after refresh, still can refresh with force
+            execPlan = getMVRefreshExecPlan(taskRun, true);
+            plan = execPlan.getExplainString(TExplainLevel.NORMAL);
+            PlanTestBase.assertContains(plan, " TABLE: list_t1\n" +
+                    " PREAGGREGATION: ON\n" +
+                    " partitions=2/2");
+            Assertions.assertNotNull(execPlan);
+        }
+    }
+
+    @Test
+    public void testMVForcePartialRefresh() throws Exception {
+        String partitionTable = "CREATE TABLE list_t1 (dt1 date, int1 int)\n" +
+                "PARTITION BY list(dt1) (\n" +
+                "     PARTITION p1 VALUES IN (\"2025-05-16\") ,\n" +
+                "     PARTITION p2 VALUES IN (\"2025-05-17\") \n" +
+                ")\n";
+        starRocksAssert.withTable(partitionTable);
+        String[] sqls = {
+                "insert into list_t1 partition(p1) values('2025-05-16', 1);",
+                "insert into list_t1 partition(p2) values('2025-05-17', 1);"
+        };
+        for (String sql : sqls) {
+            executeInsertSql(sql);
+        }
+
+        String mvQuery = "CREATE MATERIALIZED VIEW test_mv1 " +
+                "PARTITION BY (dt1) " +
+                "REFRESH DEFERRED MANUAL PROPERTIES (\"partition_refresh_number\"=\"-1\")\n" +
+                "AS SELECT dt1,sum(int1) from list_t1 group by dt1 \n";
+        starRocksAssert.withMaterializedView(mvQuery);
+        MaterializedView mv = getMv("test_mv1");
+
+        TaskRun taskRun = buildMVTaskRun(mv, "test");
+        // partial refresh with force
+        Map<String, String> props = taskRun.getProperties();
+        props.put(TaskRun.PARTITION_VALUES,
+                PListCell.batchSerialize(ImmutableSet.of(new PListCell("2025-05-16"))));
+
+        ExecPlan execPlan;
+        // explain without force
+        {
+            execPlan = getMVRefreshExecPlan(taskRun);
+            Assertions.assertNotNull(execPlan);
+
+            refreshMV("test", mv);
+            execPlan = getMVRefreshExecPlan(taskRun);
+            Assertions.assertNull(execPlan);
+        }
+
+        // explain with partial refresh
+        {
+            ExecuteOption executeOption = new ExecuteOption(taskRun.getTask());
+            Map<String, String> explainProps = executeOption.getTaskRunProperties();
+            explainProps.put(TaskRun.PARTITION_VALUES,
+                    PListCell.batchSerialize(ImmutableSet.of(new PListCell("2025-05-16"))));
+            explainProps.put(TaskRun.FORCE, "true");
+
+            execPlan = getMVRefreshExecPlan(taskRun, true);
+            Assertions.assertNotNull(execPlan);
+
+            execPlan = getMVRefreshExecPlan(taskRun, true);
+            Assertions.assertNotNull(execPlan);
+
+            // after refresh, still can refresh with force
+            execPlan = getMVRefreshExecPlan(taskRun, true);
+            String plan = execPlan.getExplainString(TExplainLevel.NORMAL);
+            PlanTestBase.assertContains(plan, " TABLE: list_t1\n" +
+                    " PREAGGREGATION: ON\n" +
+                    " partitions=1/2");
+            Assertions.assertNotNull(execPlan);
+
+            refreshMV("test", mv);
+
+            // after refresh, still can refresh with force
+            execPlan = getMVRefreshExecPlan(taskRun, true);
+            plan = execPlan.getExplainString(TExplainLevel.NORMAL);
+            PlanTestBase.assertContains(plan, " TABLE: list_t1\n" +
+                    " PREAGGREGATION: ON\n" +
+                    " partitions=1/2");
+            Assertions.assertNotNull(execPlan);
+        }
+    }
 }
@@ -0,0 +1,106 @@ (new file)
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.scheduler;

import com.starrocks.catalog.MaterializedView;
import com.starrocks.sql.optimizer.rule.transformation.materialization.MVTestBase;
import com.starrocks.sql.plan.ExecPlan;
import com.starrocks.sql.plan.PlanTestBase;
import com.starrocks.thrift.TExplainLevel;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer.MethodName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;

import java.util.Map;

@TestMethodOrder(MethodName.class)
public class PCTRefreshNonPartitionOlapTest extends MVTestBase {

    @BeforeAll
    public static void beforeClass() throws Exception {
        MVTestBase.beforeClass();
    }

    @Test
    public void testMVForceRefresh() throws Exception {
        String partitionTable = "CREATE TABLE range_t1 (dt1 date, int1 int)\n" +
                "PARTITION BY date_trunc('day', dt1)";
        starRocksAssert.withTable(partitionTable);
        addRangePartition("range_t1", "p1", "2024-01-04", "2024-01-05");
        addRangePartition("range_t1", "p2", "2024-01-05", "2024-01-06");
        String[] sqls = {
                "INSERT INTO range_t1 partition(p1) VALUES (\"2024-01-04\",1);",
                "INSERT INTO range_t1 partition(p2) VALUES (\"2024-01-05\",1);"
        };
        for (String sql : sqls) {
            executeInsertSql(sql);
        }

        String mvQuery = "CREATE MATERIALIZED VIEW test_mv1 " +
                "REFRESH DEFERRED MANUAL\n" +
                "AS SELECT dt1,sum(int1) from range_t1 group by dt1";
        starRocksAssert.withMaterializedView(mvQuery);

        MaterializedView mv = getMv("test_mv1");

        TaskRun taskRun = buildMVTaskRun(mv, "test");
        ExecPlan execPlan;
        // explain without force
        {
            execPlan = getMVRefreshExecPlan(taskRun);
            Assertions.assertNotNull(execPlan);

            refreshMV("test", mv);
            execPlan = getMVRefreshExecPlan(taskRun);
            Assertions.assertNull(execPlan);
        }

        // refresh with force
        Map<String, String> props = taskRun.getProperties();
        props.put(TaskRun.FORCE, "true");
        // explain with refresh
        {
            ExecuteOption executeOption = new ExecuteOption(taskRun.getTask());
            Map<String, String> explainProps = executeOption.getTaskRunProperties();
            explainProps.put(TaskRun.FORCE, "true");

            execPlan = getMVRefreshExecPlan(taskRun, true);
            Assertions.assertNotNull(execPlan);

            execPlan = getMVRefreshExecPlan(taskRun, true);
            Assertions.assertNotNull(execPlan);

            // after refresh, still can refresh with force
            execPlan = getMVRefreshExecPlan(taskRun, true);
            String plan = execPlan.getExplainString(TExplainLevel.NORMAL);
            PlanTestBase.assertContains(plan, " TABLE: range_t1\n" +
                    " PREAGGREGATION: ON\n" +
                    " partitions=2/2");
            Assertions.assertNotNull(execPlan);

            refreshMV("test", mv);

            // after refresh, still can refresh with force
            execPlan = getMVRefreshExecPlan(taskRun, true);
            plan = execPlan.getExplainString(TExplainLevel.NORMAL);
            PlanTestBase.assertContains(plan, " TABLE: range_t1\n" +
                    " PREAGGREGATION: ON\n" +
                    " partitions=2/2");
            Assertions.assertNotNull(execPlan);
        }
    }
}
@@ -0,0 +1,181 @@ (new file)
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.starrocks.scheduler;

import com.starrocks.catalog.MaterializedView;
import com.starrocks.sql.optimizer.rule.transformation.materialization.MVTestBase;
import com.starrocks.sql.plan.ExecPlan;
import com.starrocks.sql.plan.PlanTestBase;
import com.starrocks.thrift.TExplainLevel;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer.MethodName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;

import java.util.Map;

@TestMethodOrder(MethodName.class)
public class PCTRefreshRangePartitionOlapTest extends MVTestBase {

    @BeforeAll
    public static void beforeClass() throws Exception {
        MVTestBase.beforeClass();
    }

    @Test
    public void testMVForceRefresh() throws Exception {
        String partitionTable = "CREATE TABLE range_t1 (dt1 date, int1 int)\n" +
                "PARTITION BY date_trunc('day', dt1)";
        starRocksAssert.withTable(partitionTable);
        addRangePartition("range_t1", "p1", "2024-01-04", "2024-01-05");
        addRangePartition("range_t1", "p2", "2024-01-05", "2024-01-06");
        String[] sqls = {
                "INSERT INTO range_t1 partition(p1) VALUES (\"2024-01-04\",1);",
                "INSERT INTO range_t1 partition(p2) VALUES (\"2024-01-05\",1);"
        };
        for (String sql : sqls) {
            executeInsertSql(sql);
        }

        String mvQuery = "CREATE MATERIALIZED VIEW test_mv1 " +
                "PARTITION BY date_trunc('day', dt1) " +
                "REFRESH DEFERRED MANUAL PROPERTIES (\"partition_refresh_number\"=\"-1\")\n" +
                "AS SELECT dt1,sum(int1) from range_t1 group by dt1";
        starRocksAssert.withMaterializedView(mvQuery);

        MaterializedView mv = getMv("test_mv1");

        TaskRun taskRun = buildMVTaskRun(mv, "test");
        ExecPlan execPlan;
        // explain without force
        {
            execPlan = getMVRefreshExecPlan(taskRun);
            Assertions.assertNotNull(execPlan);

            refreshMV("test", mv);
            execPlan = getMVRefreshExecPlan(taskRun);
            Assertions.assertNull(execPlan);
        }

        // refresh with force
        Map<String, String> props = taskRun.getProperties();
        props.put(TaskRun.FORCE, "true");
        // explain with refresh
        {
            ExecuteOption executeOption = new ExecuteOption(taskRun.getTask());
            Map<String, String> explainProps = executeOption.getTaskRunProperties();
            explainProps.put(TaskRun.FORCE, "true");

            execPlan = getMVRefreshExecPlan(taskRun, true);
            Assertions.assertNotNull(execPlan);

            execPlan = getMVRefreshExecPlan(taskRun, true);
            Assertions.assertNotNull(execPlan);

            // after refresh, still can refresh with force
            execPlan = getMVRefreshExecPlan(taskRun, true);
            String plan = execPlan.getExplainString(TExplainLevel.NORMAL);
            PlanTestBase.assertContains(plan, " TABLE: range_t1\n" +
                    " PREAGGREGATION: ON\n" +
                    " partitions=2/2");
            Assertions.assertNotNull(execPlan);

            refreshMV("test", mv);

            // after refresh, still can refresh with force
            execPlan = getMVRefreshExecPlan(taskRun, true);
            plan = execPlan.getExplainString(TExplainLevel.NORMAL);
            PlanTestBase.assertContains(plan, " TABLE: range_t1\n" +
                    " PREAGGREGATION: ON\n" +
                    " partitions=2/2");
            Assertions.assertNotNull(execPlan);
        }
    }

    @Test
    public void testMVForcePartialRefresh() throws Exception {
        String partitionTable = "CREATE TABLE range_t1 (dt1 date, int1 int)\n" +
                "PARTITION BY date_trunc('day', dt1)";
        starRocksAssert.withTable(partitionTable);
        addRangePartition("range_t1", "p1", "2024-01-04", "2024-01-05");
        addRangePartition("range_t1", "p2", "2024-01-05", "2024-01-06");
        String[] sqls = {
                "INSERT INTO range_t1 partition(p1) VALUES (\"2024-01-04\",1);",
                "INSERT INTO range_t1 partition(p2) VALUES (\"2024-01-05\",1);"
        };
        for (String sql : sqls) {
            executeInsertSql(sql);
        }

        String mvQuery = "CREATE MATERIALIZED VIEW test_mv1 " +
                "PARTITION BY date_trunc('day', dt1) " +
                "REFRESH DEFERRED MANUAL PROPERTIES (\"partition_refresh_number\"=\"-1\")\n" +
                "AS SELECT dt1,sum(int1) from range_t1 group by dt1";
        starRocksAssert.withMaterializedView(mvQuery);

        MaterializedView mv = getMv("test_mv1");

        TaskRun taskRun = buildMVTaskRun(mv, "test");
        Map<String, String> props = taskRun.getProperties();
        props.put(TaskRun.PARTITION_START, "2024-01-04");
        props.put(TaskRun.PARTITION_END, "2024-01-05");

        ExecPlan execPlan;
        // explain without force
        {
            execPlan = getMVRefreshExecPlan(taskRun);
            Assertions.assertNotNull(execPlan);

            refreshMV("test", mv);
            execPlan = getMVRefreshExecPlan(taskRun);
            Assertions.assertNull(execPlan);
        }

        // refresh with force
        props.put(TaskRun.FORCE, "true");
        // explain with refresh
        {
            ExecuteOption executeOption = new ExecuteOption(taskRun.getTask());
            Map<String, String> explainProps = executeOption.getTaskRunProperties();
            explainProps.put(TaskRun.FORCE, "true");
            explainProps.put(TaskRun.PARTITION_START, "2024-01-04");
            explainProps.put(TaskRun.PARTITION_END, "2024-01-05");
            execPlan = getMVRefreshExecPlan(taskRun, true);
            Assertions.assertNotNull(execPlan);

            execPlan = getMVRefreshExecPlan(taskRun, true);
            Assertions.assertNotNull(execPlan);

            // after refresh, still can refresh with force
            execPlan = getMVRefreshExecPlan(taskRun, true);
            String plan = execPlan.getExplainString(TExplainLevel.NORMAL);
            PlanTestBase.assertContains(plan, " TABLE: range_t1\n" +
                    " PREAGGREGATION: ON\n" +
                    " partitions=1/2");
            Assertions.assertNotNull(execPlan);

            refreshMV("test", mv);

            // after refresh, still can refresh with force
            execPlan = getMVRefreshExecPlan(taskRun, true);
            plan = execPlan.getExplainString(TExplainLevel.NORMAL);
            PlanTestBase.assertContains(plan, " TABLE: range_t1\n" +
                    " PREAGGREGATION: ON\n" +
                    " partitions=1/2");
            Assertions.assertNotNull(execPlan);
        }
    }
}
@@ -239,7 +239,7 @@ public class PartitionBasedMvRefreshProcessorJdbcTest extends MVTestBase {
                 .collect(Collectors.toList());
         Assertions.assertEquals(Arrays.asList("p000101_202308", "p202308_202309"), partitions);
         // mv partition p202308_202309 is force refreshed and visible version is increased
-        Assertions.assertEquals(partitionVersionMap.get("p202308_202309") + 1,
+        Assertions.assertEquals(partitionVersionMap.get("p202308_202309"),
                 materializedView.getPartition("p202308_202309")
                         .getDefaultPhysicalPartition().getVisibleVersion());
     }
@@ -421,15 +421,15 @@ public class PartitionBasedMvRefreshProcessorOlapTest extends MVTestBase {
                 .getDefaultPhysicalPartition().getVisibleVersion());

         refreshMVRange(materializedView.getName(), "2021-12-03", "2022-05-06", true);
-        Assertions.assertEquals(3, materializedView.getPartition("p202112_202201")
+        Assertions.assertEquals(2, materializedView.getPartition("p202112_202201")
                 .getDefaultPhysicalPartition().getVisibleVersion());
-        Assertions.assertEquals(3, materializedView.getPartition("p202201_202202")
+        Assertions.assertEquals(2, materializedView.getPartition("p202201_202202")
                 .getDefaultPhysicalPartition().getVisibleVersion());
         Assertions.assertEquals(2, materializedView.getPartition("p202202_202203")
                 .getDefaultPhysicalPartition().getVisibleVersion());
         Assertions.assertEquals(2, materializedView.getPartition("p202203_202204")
                 .getDefaultPhysicalPartition().getVisibleVersion());
-        Assertions.assertEquals(3, materializedView.getPartition("p202204_202205")
+        Assertions.assertEquals(2, materializedView.getPartition("p202204_202205")
                 .getDefaultPhysicalPartition().getVisibleVersion());
     }
@@ -410,6 +410,13 @@ public class MVTestBase extends StarRocksTestBase {
     }

     protected static ExecPlan getMVRefreshExecPlan(TaskRun taskRun) throws Exception {
+        return getMVRefreshExecPlan(taskRun, false);
+    }
+
+    protected static ExecPlan getMVRefreshExecPlan(TaskRun taskRun, boolean isForce) throws Exception {
+        if (isForce) {
+            taskRun.getProperties().put(TaskRun.FORCE, "true");
+        }
         initAndExecuteTaskRun(taskRun);
         PartitionBasedMvRefreshProcessor processor = (PartitionBasedMvRefreshProcessor)
                 taskRun.getProcessor();
@@ -0,0 +1,178 @@ (new file)
-- name: test_mv_force_refresh
create database db_${uuid0};
-- result:
-- !result
use db_${uuid0};
-- result:
-- !result
CREATE TABLE `t1` (
    `k1` date,
    `k2` int,
    `k3` int
)
DUPLICATE KEY(`k1`)
PARTITION BY RANGE (k1) (
    START ("2020-10-01") END ("2022-03-04") EVERY (INTERVAL 15 day)
)
DISTRIBUTED BY HASH(`k1`) BUCKETS 3;
-- result:
-- !result
INSERT INTO t1 VALUES ("2020-11-10",1,1),("2020-11-11",2,2),("2020-11-12",3,3);
-- result:
-- !result
CREATE MATERIALIZED VIEW test_mv1
PARTITION BY k1
DISTRIBUTED BY HASH(k1) BUCKETS 10
REFRESH DEFERRED MANUAL
PROPERTIES ("partition_refresh_number"="-1")
AS SELECT * FROM t1;
-- result:
-- !result
REFRESH MATERIALIZED VIEW test_mv1 WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
-- result:
2020-11-10	1	1
2020-11-11	2	2
2020-11-12	3	3
-- !result
REFRESH MATERIALIZED VIEW test_mv1 FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
-- result:
2020-11-10	1	1
2020-11-11	2	2
2020-11-12	3	3
-- !result
REFRESH MATERIALIZED VIEW test_mv1 PARTITION START("2020-11-10") END("2020-11-11") FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
-- result:
2020-11-10	1	1
2020-11-11	2	2
2020-11-12	3	3
-- !result
INSERT INTO t1 VALUES ("2020-11-10",4,4);
-- result:
-- !result
REFRESH MATERIALIZED VIEW test_mv1 FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
-- result:
2020-11-10	1	1
2020-11-10	4	4
2020-11-11	2	2
2020-11-12	3	3
-- !result
DROP MATERIALIZED VIEW test_mv1;
-- result:
-- !result
CREATE TABLE `t2` (
    `k1` date,
    `k2` int,
    `k3` int
)
DUPLICATE KEY(`k1`)
DISTRIBUTED BY HASH(`k1`) BUCKETS 3;
-- result:
-- !result
INSERT INTO t2 VALUES ("2020-11-10",1,1),("2020-11-11",2,2),("2020-11-12",3,3);
-- result:
-- !result
CREATE MATERIALIZED VIEW test_mv1
DISTRIBUTED BY HASH(k1) BUCKETS 10
REFRESH DEFERRED MANUAL
AS SELECT * FROM t2;
-- result:
-- !result
REFRESH MATERIALIZED VIEW test_mv1 WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
-- result:
2020-11-10	1	1
2020-11-11	2	2
2020-11-12	3	3
-- !result
REFRESH MATERIALIZED VIEW test_mv1 FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
-- result:
2020-11-10	1	1
2020-11-11	2	2
2020-11-12	3	3
-- !result
INSERT INTO t2 VALUES ("2020-11-10",4,4);
-- result:
-- !result
REFRESH MATERIALIZED VIEW test_mv1 FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
-- result:
2020-11-10	1	1
2020-11-10	4	4
2020-11-11	2	2
2020-11-12	3	3
-- !result
DROP MATERIALIZED VIEW test_mv1;
-- result:
-- !result
CREATE TABLE t3 (
    k1 date,
    k2 int,
    k3 int
)
DUPLICATE KEY(k1)
PARTITION BY date_trunc("day", k1), k2;
-- result:
-- !result
INSERT INTO t3 VALUES ("2020-11-10",1,1),("2020-11-11",2,2),("2020-11-12",3,3);
-- result:
-- !result
CREATE MATERIALIZED VIEW test_mv1
partition by (date_trunc("day", k1), k2)
REFRESH DEFERRED MANUAL
PROPERTIES ("partition_refresh_number"="-1")
AS select k1, k2, sum(k3) from t3 group by k1, k2;
-- result:
-- !result
REFRESH MATERIALIZED VIEW test_mv1 WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
-- result:
2020-11-10	1	1
2020-11-11	2	2
2020-11-12	3	3
-- !result
REFRESH MATERIALIZED VIEW test_mv1 FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
-- result:
2020-11-10	1	1
2020-11-11	2	2
2020-11-12	3	3
-- !result
INSERT INTO t3 VALUES ("2020-11-10",4,4);
-- result:
-- !result
REFRESH MATERIALIZED VIEW test_mv1 PARTITION (("2020-11-10", "4")) FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
-- result:
2020-11-10	1	1
2020-11-10	4	4
2020-11-11	2	2
2020-11-12	3	3
-- !result
REFRESH MATERIALIZED VIEW test_mv1 FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
-- result:
2020-11-10	1	1
2020-11-10	4	4
2020-11-11	2	2
2020-11-12	3	3
-- !result
DROP MATERIALIZED VIEW test_mv1;
-- result:
-- !result
drop table t1;
-- result:
-- !result
drop table t2;
-- result:
-- !result
drop table t3;
-- result:
-- !result
drop database db_${uuid0} force;
-- result:
-- !result
@@ -0,0 +1,104 @@ (new file)
-- name: test_mv_force_refresh

create database db_${uuid0};
use db_${uuid0};

-- range partition table
CREATE TABLE `t1` (
    `k1` date,
    `k2` int,
    `k3` int
)
DUPLICATE KEY(`k1`)
PARTITION BY RANGE (k1) (
    START ("2020-10-01") END ("2022-03-04") EVERY (INTERVAL 15 day)
)
DISTRIBUTED BY HASH(`k1`) BUCKETS 3;

INSERT INTO t1 VALUES ("2020-11-10",1,1),("2020-11-11",2,2),("2020-11-12",3,3);

CREATE MATERIALIZED VIEW test_mv1
PARTITION BY k1
DISTRIBUTED BY HASH(k1) BUCKETS 10
REFRESH DEFERRED MANUAL
PROPERTIES ("partition_refresh_number"="-1")
AS SELECT * FROM t1;

REFRESH MATERIALIZED VIEW test_mv1 WITH SYNC MODE;
select * from test_mv1 order by k1, k2;

REFRESH MATERIALIZED VIEW test_mv1 FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;

REFRESH MATERIALIZED VIEW test_mv1 PARTITION START("2020-11-10") END("2020-11-11") FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;

INSERT INTO t1 VALUES ("2020-11-10",4,4);
REFRESH MATERIALIZED VIEW test_mv1 FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;

DROP MATERIALIZED VIEW test_mv1;

-- non partition table
CREATE TABLE `t2` (
    `k1` date,
    `k2` int,
    `k3` int
)
DUPLICATE KEY(`k1`)
DISTRIBUTED BY HASH(`k1`) BUCKETS 3;
INSERT INTO t2 VALUES ("2020-11-10",1,1),("2020-11-11",2,2),("2020-11-12",3,3);

CREATE MATERIALIZED VIEW test_mv1
DISTRIBUTED BY HASH(k1) BUCKETS 10
REFRESH DEFERRED MANUAL
AS SELECT * FROM t2;

REFRESH MATERIALIZED VIEW test_mv1 WITH SYNC MODE;
select * from test_mv1 order by k1, k2;

REFRESH MATERIALIZED VIEW test_mv1 FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;

INSERT INTO t2 VALUES ("2020-11-10",4,4);
REFRESH MATERIALIZED VIEW test_mv1 FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;

DROP MATERIALIZED VIEW test_mv1;

-- list partition table
CREATE TABLE t3 (
    k1 date,
    k2 int,
    k3 int
)
DUPLICATE KEY(k1)
PARTITION BY date_trunc("day", k1), k2;

INSERT INTO t3 VALUES ("2020-11-10",1,1),("2020-11-11",2,2),("2020-11-12",3,3);

CREATE MATERIALIZED VIEW test_mv1
partition by (date_trunc("day", k1), k2)
REFRESH DEFERRED MANUAL
PROPERTIES ("partition_refresh_number"="-1")
AS select k1, k2, sum(k3) from t3 group by k1, k2;

REFRESH MATERIALIZED VIEW test_mv1 WITH SYNC MODE;
select * from test_mv1 order by k1, k2;

REFRESH MATERIALIZED VIEW test_mv1 FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;

INSERT INTO t3 VALUES ("2020-11-10",4,4);
REFRESH MATERIALIZED VIEW test_mv1 PARTITION (("2020-11-10", "4")) FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;

REFRESH MATERIALIZED VIEW test_mv1 FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;

DROP MATERIALIZED VIEW test_mv1;

drop table t1;
drop table t2;
drop table t3;
drop database db_${uuid0} force;