[BugFix] Fix mv refresh with potential to-refresh partitions (#62422)

Signed-off-by: shuming.li <ming.moriarty@gmail.com>
This commit is contained in:
shuming.li 2025-08-29 10:39:21 +08:00 committed by GitHub
parent dded9e4d2e
commit ced00834c2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 491 additions and 233 deletions

2
.gitignore vendored
View File

@ -21,7 +21,7 @@ thirdparty/installed
core.*
extension/spark-doris-connector/.classpath
extension/spark-doris-connector/target
contrib/.*/target/
contrib/*/target/
fe/log
**/ut_ports
venv/

View File

@ -20,7 +20,6 @@ import com.google.common.util.concurrent.Uninterruptibles;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.catalog.Table;
import com.starrocks.common.AnalysisException;
import com.starrocks.common.Config;
import com.starrocks.common.FeConstants;
import com.starrocks.common.StarRocksException;
@ -37,14 +36,18 @@ import com.starrocks.scheduler.mv.BaseMVRefreshProcessor;
import com.starrocks.scheduler.mv.MVRefreshExecutor;
import com.starrocks.scheduler.mv.MVRefreshProcessorFactory;
import com.starrocks.scheduler.mv.MVTraceUtils;
import com.starrocks.scheduler.persist.MVTaskRunExtraMessage;
import com.starrocks.scheduler.persist.TaskRunStatus;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.ast.InsertStmt;
import com.starrocks.sql.ast.StatementBase;
import com.starrocks.sql.common.DmlException;
import com.starrocks.sql.common.QueryDebugOptions;
import com.starrocks.sql.optimizer.QueryMaterializationContext;
import com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils;
import com.starrocks.sql.plan.ExecPlan;
import com.starrocks.thrift.TUniqueId;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Logger;
@ -301,8 +304,60 @@ public class MVTaskRunProcessor extends BaseTaskRunProcessor implements MVRefres
return this.mvRefreshProcessor;
}
public Set<String> getPCTMVToRefreshedPartitions(TaskRunContext context) throws AnalysisException, LockTimeoutException {
return this.mvRefreshProcessor.getPCTMVToRefreshedPartitions(context, true);
/**
* Get extra explain info for the mv refresh task run which is used for explain task run.
*/
public String getExtraExplainInfo(StatementBase statement) {
StringBuilder sb = new StringBuilder();
if (mvRefreshProcessor != null && statement.isExplain()) {
if (statement.isExplainTrace() || statement.isExplainAnalyze()) {
return "";
}
try {
TaskRunStatus status = mvTaskRunContext.getStatus();
if (status != null && status.getMvTaskRunExtraMessage() != null) {
MVTaskRunExtraMessage extraMessage = status.getMvTaskRunExtraMessage();
// mv partitions to refresh
Set<String> mvPartitionsToRefresh = extraMessage.getMvPartitionsToRefresh();
if (!CollectionUtils.isEmpty(mvPartitionsToRefresh)) {
sb.append("\n");
sb.append("MVToRefreshedPartitions: " + mvPartitionsToRefresh);
}
// base partitions to refresh
Map<String, Set<String>> basePartitionsToRefreshMap = extraMessage.getBasePartitionsToRefreshMap();
if (!basePartitionsToRefreshMap.isEmpty()) {
sb.append("\n");
sb.append("BasePartitionsToRefreshed: " + basePartitionsToRefreshMap);
}
// plan builder message
Map<String, String> planBuilderMessage = extraMessage.getPlanBuilderMessage();
if (!planBuilderMessage.isEmpty()) {
sb.append("\n");
sb.append("PlanBuilderMessage: " + extraMessage.getPlanBuilderMessage());
}
// next start partition
String nextPartition = extraMessage.getNextPartitionStart();
if (!StringUtils.isEmpty(nextPartition)) {
sb.append("\n");
sb.append("NextPartition: " + nextPartition);
}
// next end partition
if (!StringUtils.isEmpty(extraMessage.getNextPartitionEnd())) {
sb.append("\n");
sb.append("NextPartitionEnd: " + extraMessage.getNextPartitionEnd());
}
// next partition values
if (!StringUtils.isEmpty(extraMessage.getNextPartitionValues())) {
sb.append("\n");
sb.append("NextPartitionValues: " + extraMessage.getNextPartitionValues());
}
}
} catch (Exception e) {
logger.warn("failed to get pct mv to refreshed partitions", e);
}
}
return sb.toString();
}
@VisibleForTesting

View File

@ -66,7 +66,6 @@ import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
@ -368,18 +367,14 @@ public class TaskManager implements MemoryTrackable {
ExecPlan execPlan = getMVRefreshExecPlan(taskRun, task, option, statement);
String explainString = StmtExecutor.buildExplainString(execPlan, statement,
ConnectContext.get(), ResourceGroupClassifier.QueryType.MV, statement.getExplainLevel());
// append pctmvToRefreshedPartitions
if (statement.getExplainLevel() == StatementBase.ExplainLevel.VERBOSE) {
Optional<Set<String>> pctmvToRefreshedPartitions = getPCTMVToRefreshedPartitions(taskRun);
if (pctmvToRefreshedPartitions.isPresent()) {
explainString += "\n" + "MVToRefreshedPartitions: " + pctmvToRefreshedPartitions.get();
}
// add extra info
String extraInfo = getExtraExplainInfo(taskRun, statement);
if (!Strings.isNullOrEmpty(extraInfo)) {
explainString += "\n" + extraInfo;
}
return explainString;
}
public TaskRun buildTaskRun(Task task, ExecuteOption option) {
return TaskRunBuilder.newBuilder(task)
.properties(option.getTaskRunProperties())
@ -387,13 +382,22 @@ public class TaskManager implements MemoryTrackable {
.setConnectContext(ConnectContext.get()).build();
}
private Optional<Set<String>> getPCTMVToRefreshedPartitions(TaskRun taskRun) {
MVTaskRunProcessor mvRefreshProcessor = (MVTaskRunProcessor) taskRun.getProcessor();
private String getExtraExplainInfo(TaskRun taskRun,
StatementBase statement) {
TaskRunProcessor taskRunProcessor = taskRun.getProcessor();
if (taskRunProcessor == null) {
return "";
}
try {
return Optional.of(mvRefreshProcessor.getPCTMVToRefreshedPartitions(taskRun.buildTaskRunContext()));
if (taskRunProcessor instanceof MVTaskRunProcessor) {
MVTaskRunProcessor mvRefreshProcessor = (MVTaskRunProcessor) taskRun.getProcessor();
return mvRefreshProcessor.getExtraExplainInfo(statement);
} else {
return "";
}
} catch (Exception e) {
LOG.error("Failed to get PCTMVToRefreshedPartitions", e);
return Optional.empty();
LOG.error("Failed to get getExtraExplainInfo:", e);
return "";
}
}

View File

@ -372,6 +372,12 @@ public class TaskRun implements Comparable<TaskRun> {
// post process task run
try (Timer ignored = Tracers.watchScope("TaskRunPostProcess")) {
// record the final status of task run
if (taskRunContext != null && taskRunContext.getStatus() != null) {
Tracers.record("TaskRunStatus", taskRunContext.getStatus().toJSON());
}
// post process the task run
processor.postTaskRun(taskRunContext);
} catch (Exception e) {
LOG.warn("Failed to post task run, task_id: {}, task_run_id: {}, error: {}",

View File

@ -96,6 +96,7 @@ public abstract class BaseMVRefreshProcessor {
// Collect all bases tables of the mv to be updated meta after mv refresh success.
// format : table id -> <base table info, snapshot table>
protected final MVPCTRefreshPartitioner mvRefreshPartitioner;
protected final MVRefreshParams mvRefreshParams;
// Collect all base table snapshot infos for the mv which the snapshot infos are kept
// and used in the final update meta.
@ -128,8 +129,12 @@ public abstract class BaseMVRefreshProcessor {
this.mvContext = mvContext;
this.mvEntity = mvEntity;
this.logger = MVTraceUtils.getLogger(mv, clazz);
// init mv refresh params
MaterializedView.PartitionRefreshStrategy partitionRefreshStrategy = mv.getPartitionRefreshStrategy();
boolean isForce = partitionRefreshStrategy == MaterializedView.PartitionRefreshStrategy.FORCE;
this.mvRefreshParams = new MVRefreshParams(mv.getPartitionInfo(), mvContext.getProperties(), isForce);
// prepare mv refresh partitioner
this.mvRefreshPartitioner = buildMvRefreshPartitioner(mv, mvContext);
this.mvRefreshPartitioner = buildMvRefreshPartitioner(mv, mvContext, mvRefreshParams);
}
/**
@ -177,14 +182,15 @@ public abstract class BaseMVRefreshProcessor {
* Create a mv refresh partitioner by the mv's partition info.
*/
private MVPCTRefreshPartitioner buildMvRefreshPartitioner(MaterializedView mv,
TaskRunContext context) {
TaskRunContext context,
MVRefreshParams mvRefreshParams) {
PartitionInfo partitionInfo = mv.getPartitionInfo();
if (partitionInfo.isUnPartitioned()) {
return new MVPCTRefreshNonPartitioner(mvContext, context, db, mv);
return new MVPCTRefreshNonPartitioner(mvContext, context, db, mv, mvRefreshParams);
} else if (partitionInfo.isRangePartition()) {
return new MVPCTRefreshRangePartitioner(mvContext, context, db, mv);
return new MVPCTRefreshRangePartitioner(mvContext, context, db, mv, mvRefreshParams);
} else if (partitionInfo.isListPartition()) {
return new MVPCTRefreshListPartitioner(mvContext, context, db, mv);
return new MVPCTRefreshListPartitioner(mvContext, context, db, mv, mvRefreshParams);
} else {
throw new DmlException(String.format("materialized view:%s in database:%s refresh failed: partition info %s not " +
"supported", mv.getName(), db.getFullName(), partitionInfo));
@ -297,7 +303,7 @@ public abstract class BaseMVRefreshProcessor {
throw new DmlException(String.format("materialized view %s refresh task failed: sync partition failed",
mv.getName()));
}
Set<String> mvCandidatePartition = getPCTMVToRefreshedPartitions(taskRunContext, true);
Set<String> mvCandidatePartition = getPCTMVToRefreshedPartitions(true);
baseTableCandidatePartitions = getPCTRefTableRefreshPartitions(mvCandidatePartition);
} catch (Exception e) {
logger.warn("failed to compute candidate partitions in sync partitions",
@ -333,7 +339,7 @@ public abstract class BaseMVRefreshProcessor {
}
protected void updatePCTToRefreshMetas(TaskRunContext taskRunContext) throws Exception {
this.pctMVToRefreshedPartitions = getPCTMVToRefreshedPartitions(taskRunContext, false);
this.pctMVToRefreshedPartitions = getPCTMVToRefreshedPartitions(false);
// ref table of mv : refreshed partition names
this.pctRefTableRefreshPartitions = getPCTRefTableRefreshPartitions(pctMVToRefreshedPartitions);
// ref table of mv : refreshed partition names
@ -562,18 +568,20 @@ public abstract class BaseMVRefreshProcessor {
return tables;
}
/**
* @param tentative: if true, it means this is called in the first phase to compute candidate partitions and not the
* standard phase to get final partitions to refresh.
*/
@VisibleForTesting
public Set<String> getPCTMVToRefreshedPartitions(TaskRunContext context,
boolean tentative) throws AnalysisException, LockTimeoutException {
final MaterializedView.PartitionRefreshStrategy partitionRefreshStrategy = mv.getPartitionRefreshStrategy();
boolean isForce = partitionRefreshStrategy == MaterializedView.PartitionRefreshStrategy.FORCE || tentative;
final MVRefreshParams mvRefreshParams = new MVRefreshParams(mv.getPartitionInfo(), context.getProperties(), isForce);
public Set<String> getPCTMVToRefreshedPartitions(boolean tentative) throws AnalysisException, LockTimeoutException {
// change mv refresh params if needed
mvRefreshParams.setIsTentative(tentative);
final Set<String> mvPotentialPartitionNames = Sets.newHashSet();
final PCellSortedSet mvToRefreshedPartitions = mvRefreshPartitioner.getMVToRefreshedPartitions(
snapshotBaseTables, mvRefreshParams, partitionRefreshStrategy, mvPotentialPartitionNames, tentative);
snapshotBaseTables, mvPotentialPartitionNames);
// update mv extra message
if (!tentative) {
if (!mvRefreshParams.isTentative()) {
updateTaskRunStatus(status -> {
MVTaskRunExtraMessage extraMessage = status.getMvTaskRunExtraMessage();
extraMessage.setForceRefresh(mvRefreshParams.isForce());

View File

@ -80,8 +80,9 @@ public final class MVPCTRefreshListPartitioner extends MVPCTRefreshPartitioner {
public MVPCTRefreshListPartitioner(MvTaskRunContext mvContext,
TaskRunContext context,
Database db,
MaterializedView mv) {
super(mvContext, context, db, mv);
MaterializedView mv,
MVRefreshParams mvRefreshParams) {
super(mvContext, context, db, mv, mvRefreshParams);
this.differ = new ListPartitionDiffer(mv, false);
this.logger = MVTraceUtils.getLogger(mv, MVPCTRefreshListPartitioner.class);
}
@ -345,11 +346,10 @@ public final class MVPCTRefreshListPartitioner extends MVPCTRefreshPartitioner {
@Override
public PCellSortedSet getMVPartitionsToRefresh(PartitionInfo mvPartitionInfo,
Map<Long, BaseTableSnapshotInfo> snapshotBaseTables,
MVRefreshParams mvRefreshParams,
Set<String> mvPotentialPartitionNames) {
// list partitioned materialized view
boolean isAutoRefresh = mvContext.getTaskType().isAutoRefresh();
PCellSortedSet mvListPartitionNames = getMVPartitionNamesWithTTL(mv, mvRefreshParams, isAutoRefresh);
PCellSortedSet mvListPartitionNames = getMVPartitionNamesWithTTL(isAutoRefresh);
// check non-ref base tables
if (mvRefreshParams.isForce() || needsRefreshBasedOnNonRefTables(snapshotBaseTables)) {
@ -363,40 +363,45 @@ public final class MVPCTRefreshListPartitioner extends MVPCTRefreshPartitioner {
}
} else {
// check the ref base table
PCellSortedSet result = getMvPartitionNamesToRefresh(mvListPartitionNames);
Map<Table, Set<String>> baseChangedPartitionNames =
getBasePartitionNamesByMVPartitionNames(result);
if (baseChangedPartitionNames.isEmpty()) {
logger.info("Cannot get associated base table change partitions from mv's refresh partitions {}",
result);
return result;
}
// because the relation of partitions between materialized view and base partition table is n : m,
// should calculate the candidate partitions recursively.
if (isCalcPotentialRefreshPartition()) {
logger.info("Start calcPotentialRefreshPartition, result: {}," +
" baseChangedPartitionNames: {}", result, baseChangedPartitionNames);
Set<String> mvToRefreshPartitionNames = result.getPartitionNames();
Set<String> tmpMvPotentialPartitionNames = Sets.newHashSet(mvToRefreshPartitionNames);
SyncPartitionUtils.calcPotentialRefreshPartition(tmpMvPotentialPartitionNames, baseChangedPartitionNames,
mvContext.getRefBaseTableMVIntersectedPartitions(),
mvContext.getMvRefBaseTableIntersectedPartitions(),
mvPotentialPartitionNames);
Map<String, PCell> mvCellMap = mvContext.getMVToCellMap();
Set<String> addedPartitions = Sets.difference(tmpMvPotentialPartitionNames, mvToRefreshPartitionNames);
for (String partition : addedPartitions) {
PCell pCell = mvCellMap.get(partition);
if (pCell == null) {
logger.warn("Cannot find mv partition name range cell:{}", partition);
continue;
}
result.add(PCellWithName.of(partition, pCell));
}
logger.info("Finish calcPotentialRefreshPartition, result: {}," +
" baseChangedPartitionNames: {}", result, baseChangedPartitionNames);
}
return getMvPartitionNamesToRefresh(mvListPartitionNames);
}
}
@Override
public PCellSortedSet calcPotentialMVRefreshPartitions(Set<String> mvPotentialPartitionNames,
PCellSortedSet result) {
Map<Table, Set<String>> baseChangedPartitionNames =
getBasePartitionNamesByMVPartitionNames(result);
if (baseChangedPartitionNames.isEmpty()) {
logger.info("Cannot get associated base table change partitions from mv's refresh partitions {}",
result);
return result;
}
// because the relation of partitions between materialized view and base partition table is n : m,
// should calculate the candidate partitions recursively.
if (isCalcPotentialRefreshPartition()) {
logger.info("Start calcPotentialRefreshPartition, result: {}," +
" baseChangedPartitionNames: {}", result, baseChangedPartitionNames);
Set<String> mvToRefreshPartitionNames = result.getPartitionNames();
Set<String> tmpMvPotentialPartitionNames = Sets.newHashSet(mvToRefreshPartitionNames);
SyncPartitionUtils.calcPotentialRefreshPartition(tmpMvPotentialPartitionNames, baseChangedPartitionNames,
mvContext.getRefBaseTableMVIntersectedPartitions(),
mvContext.getMvRefBaseTableIntersectedPartitions(),
mvPotentialPartitionNames);
Map<String, PCell> mvCellMap = mvContext.getMVToCellMap();
Set<String> addedPartitions = Sets.difference(tmpMvPotentialPartitionNames, mvToRefreshPartitionNames);
for (String partition : addedPartitions) {
PCell pCell = mvCellMap.get(partition);
if (pCell == null) {
logger.warn("Cannot find mv partition name range cell:{}", partition);
continue;
}
result.add(PCellWithName.of(partition, pCell));
}
logger.info("Finish calcPotentialRefreshPartition, result: {}," +
" baseChangedPartitionNames: {}", result, baseChangedPartitionNames);
}
return result;
}
public boolean isCalcPotentialRefreshPartition() {
@ -409,9 +414,7 @@ public final class MVPCTRefreshListPartitioner extends MVPCTRefreshPartitioner {
}
@Override
public PCellSortedSet getMVPartitionNamesWithTTL(MaterializedView mv,
MVRefreshParams mvRefreshParams,
boolean isAutoRefresh) {
public PCellSortedSet getMVPartitionNamesWithTTL(boolean isAutoRefresh) {
// if the user specifies the start and end ranges, only refresh the specified partitions
boolean isCompleteRefresh = mvRefreshParams.isCompleteRefresh();
Map<String, PCell> mvListPartitionMap = Maps.newHashMap();
@ -442,23 +445,20 @@ public final class MVPCTRefreshListPartitioner extends MVPCTRefreshPartitioner {
@Override
public void filterPartitionByRefreshNumber(PCellSortedSet mvPartitionsToRefresh,
Set<String> mvPotentialPartitionNames,
boolean tentative) {
filterPartitionByRefreshNumberInternal(mvPartitionsToRefresh, mvPotentialPartitionNames, tentative,
Set<String> mvPotentialPartitionNames) {
filterPartitionByRefreshNumberInternal(mvPartitionsToRefresh, mvPotentialPartitionNames,
MaterializedView.PartitionRefreshStrategy.STRICT);
}
@Override
public void filterPartitionByAdaptiveRefreshNumber(PCellSortedSet mvPartitionsToRefresh,
Set<String> mvPotentialPartitionNames,
boolean tentative) {
filterPartitionByRefreshNumberInternal(mvPartitionsToRefresh, mvPotentialPartitionNames, tentative,
Set<String> mvPotentialPartitionNames) {
filterPartitionByRefreshNumberInternal(mvPartitionsToRefresh, mvPotentialPartitionNames,
MaterializedView.PartitionRefreshStrategy.ADAPTIVE);
}
public void filterPartitionByRefreshNumberInternal(PCellSortedSet toRefreshPartitions,
Set<String> mvPotentialPartitionNames,
boolean tentative,
MaterializedView.PartitionRefreshStrategy refreshStrategy) {
// filter by partition ttl
@ -486,7 +486,7 @@ public final class MVPCTRefreshListPartitioner extends MVPCTRefreshPartitioner {
if (CollectionUtils.isEmpty(nextPartitionValues)) {
return;
}
if (!tentative) {
if (!mvRefreshParams.isTentative()) {
// partitionNameIter has just been traversed, and endPartitionName is not updated
// will cause endPartitionName == null
mvContext.setNextPartitionValues(PListCell.batchSerialize(nextPartitionValues));

View File

@ -38,8 +38,9 @@ public final class MVPCTRefreshNonPartitioner extends MVPCTRefreshPartitioner {
public MVPCTRefreshNonPartitioner(MvTaskRunContext mvContext,
TaskRunContext context,
Database db,
MaterializedView mv) {
super(mvContext, context, db, mv);
MaterializedView mv,
MVRefreshParams mvRefreshParams) {
super(mvContext, context, db, mv, mvRefreshParams);
}
@Override
@ -78,7 +79,6 @@ public final class MVPCTRefreshNonPartitioner extends MVPCTRefreshPartitioner {
@Override
public PCellSortedSet getMVPartitionsToRefresh(PartitionInfo mvPartitionInfo,
Map<Long, BaseTableSnapshotInfo> snapshotBaseTables,
MVRefreshParams mvRefreshParams,
Set<String> mvPotentialPartitionNames) {
// non-partitioned materialized view
if (mvRefreshParams.isForce() || isNonPartitionedMVNeedToRefresh(snapshotBaseTables, mv)) {
@ -88,20 +88,25 @@ public final class MVPCTRefreshNonPartitioner extends MVPCTRefreshPartitioner {
}
@Override
public PCellSortedSet getMVPartitionNamesWithTTL(MaterializedView materializedView,
MVRefreshParams mvRefreshParams,
boolean isAutoRefresh) {
public PCellSortedSet calcPotentialMVRefreshPartitions(Set<String> mvPotentialPartitionNames,
PCellSortedSet result) {
return result;
}
@Override
public PCellSortedSet getMVPartitionNamesWithTTL(boolean isAutoRefresh) {
return PCellSortedSet.of();
}
@Override
public void filterPartitionByRefreshNumber(PCellSortedSet mvPartitionsToRefresh,
Set<String> mvPotentialPartitionNames, boolean tentative) {
Set<String> mvPotentialPartitionNames) {
// do nothing
}
@Override
public void filterPartitionByAdaptiveRefreshNumber(PCellSortedSet mvPartitionsToRefresh,
Set<String> mvPotentialPartitionNames, boolean tentative) {
Set<String> mvPotentialPartitionNames) {
// do nothing
}

View File

@ -91,16 +91,19 @@ public abstract class MVPCTRefreshPartitioner {
protected final TaskRunContext context;
protected final Database db;
protected final MaterializedView mv;
protected final MVRefreshParams mvRefreshParams;
private final Logger logger;
public MVPCTRefreshPartitioner(MvTaskRunContext mvContext,
TaskRunContext context,
Database db,
MaterializedView mv) {
MaterializedView mv,
MVRefreshParams mvRefreshParams) {
this.mvContext = mvContext;
this.context = context;
this.db = db;
this.mv = mv;
this.mvRefreshParams = mvRefreshParams;
this.logger = MVTraceUtils.getLogger(mv, MVPCTRefreshPartitioner.class);
}
@ -144,46 +147,50 @@ public abstract class MVPCTRefreshPartitioner {
*/
public abstract PCellSortedSet getMVPartitionsToRefresh(PartitionInfo mvPartitionInfo,
Map<Long, BaseTableSnapshotInfo> snapshotBaseTables,
MVRefreshParams mvRefreshParams,
Set<String> mvPotentialPartitionNames) throws AnalysisException;
public abstract PCellSortedSet getMVPartitionsToRefreshWithForce() throws AnalysisException;
/**
* Get mv partition names with TTL based on the ref base table partitions.
*
* @param materializedView: materialized view to check.
* @param isAutoRefresh: is auto refresh or not.
* @throws AnalysisException
* @return: mv to refresh partition names with TTL based on the ref base table partitions.
*/
public abstract PCellSortedSet getMVPartitionNamesWithTTL(MaterializedView materializedView,
MVRefreshParams mvRefreshParams,
boolean isAutoRefresh) throws AnalysisException;
public abstract PCellSortedSet getMVPartitionNamesWithTTL(boolean isAutoRefresh) throws AnalysisException;
/**
* Filter to refresh partitions by refresh number.
*
* @param mvPartitionsToRefresh : mv partitions to refresh.
* @param mvPotentialPartitionNames : mv potential partition names to check.
* @param tentative see {@link MVPCTBasedRefreshProcessor}
* Filter to refresh partitions by adaptive refresh number.
* @param mvPartitionsToRefresh: mv partitions to refresh.
* @param mvPotentialPartitionNames: mv potential partition names to check.
*/
public abstract void filterPartitionByRefreshNumber(PCellSortedSet mvPartitionsToRefresh,
Set<String> mvPotentialPartitionNames,
boolean tentative);
public abstract void filterPartitionByAdaptiveRefreshNumber(PCellSortedSet mvPartitionsToRefresh,
Set<String> mvPotentialPartitionNames,
boolean tentative);
Set<String> mvPotentialPartitionNames);
@VisibleForTesting
public abstract void filterPartitionByRefreshNumber(PCellSortedSet partitionsToRefresh,
Set<String> mvPotentialPartitionNames);
/**
* Calculate the associated potential partitions to refresh according to the partitions to refresh.
* NOTE: This must be called after filterMVToRefreshPartitions, otherwise it may lose some potential to-refresh mv partitions
* which will cause filtered insert load.
* @param mvPotentialPartitionNames: mv potential partition names to check.
* @param result: partitions to refresh for materialized view which will be changed in this method.
*/
public abstract PCellSortedSet calcPotentialMVRefreshPartitions(Set<String> mvPotentialPartitionNames,
PCellSortedSet result);
public void filterMVToRefreshPartitionsByProperty(PCellSortedSet mvToRefreshedPartitions) {
// do nothing by default
}
/**
* @param mvPartitionInfo : materialized view's partition info
* @param mvRefreshParams : materialized view's refresh params
* @return the partitions to refresh for materialized view
* @throws AnalysisException
*/
private PCellSortedSet getPartitionsToRefreshForMaterializedView(PartitionInfo mvPartitionInfo,
MVRefreshParams mvRefreshParams,
Map<Long, BaseTableSnapshotInfo> snapshotBaseTables,
Set<String> mvPotentialPartitionNames)
throws AnalysisException {
@ -191,8 +198,7 @@ public abstract class MVPCTRefreshPartitioner {
// Force refresh
return getMVPartitionsToRefreshWithForce();
} else {
return getMVPartitionsToRefresh(mvPartitionInfo, snapshotBaseTables,
mvRefreshParams, mvPotentialPartitionNames);
return getMVPartitionsToRefresh(mvPartitionInfo, snapshotBaseTables, mvPotentialPartitionNames);
}
}
@ -202,10 +208,7 @@ public abstract class MVPCTRefreshPartitioner {
* IF it's not, it would modify the context state, like `NEXT_PARTITION_START`
*/
public PCellSortedSet getMVToRefreshedPartitions(Map<Long, BaseTableSnapshotInfo> snapshotBaseTables,
MVRefreshParams mvRefreshParams,
MaterializedView.PartitionRefreshStrategy partitionRefreshStrategy,
Set<String> mvPotentialPartitionNames,
boolean tentative)
Set<String> mvPotentialPartitionNames)
throws AnalysisException, LockTimeoutException {
PCellSortedSet mvToRefreshedPartitions = null;
Locker locker = new Locker();
@ -214,36 +217,21 @@ public abstract class MVPCTRefreshPartitioner {
logger.warn("failed to lock database: {} in checkMvToRefreshedPartitions", db.getFullName());
throw new LockTimeoutException("Failed to lock database: " + db.getFullName());
}
final PartitionInfo partitionInfo = mv.getPartitionInfo();
try {
final PartitionInfo partitionInfo = mv.getPartitionInfo();
mvToRefreshedPartitions = getPartitionsToRefreshForMaterializedView(partitionInfo,
mvRefreshParams, snapshotBaseTables, mvPotentialPartitionNames);
snapshotBaseTables, mvPotentialPartitionNames);
if (mvToRefreshedPartitions == null || mvToRefreshedPartitions.isEmpty()) {
logger.info("no partitions to refresh for materialized view");
return mvToRefreshedPartitions;
}
// filter partitions to avoid refreshing too many partitions
filterMVToRefreshPartitions(mvToRefreshedPartitions, mvPotentialPartitionNames);
boolean hasUnsupportedTableType = mv.getBaseTableTypes().stream()
.anyMatch(type -> !SUPPORTED_TABLE_TYPES_FOR_ADAPTIVE_MV_REFRESH.contains(type));
if (hasUnsupportedTableType) {
logger.warn("Materialized view {} contains unsupported external tables. Using default refresh strategy.",
mv.getId());
filterPartitionByRefreshNumber(mvToRefreshedPartitions, mvPotentialPartitionNames, mv,
tentative);
} else {
switch (partitionRefreshStrategy) {
case ADAPTIVE:
filterPartitionByAdaptiveRefreshNumber(mvToRefreshedPartitions, mvPotentialPartitionNames, mv,
tentative);
break;
case STRICT:
default:
// Only refresh the first partition refresh number partitions, other partitions will generate new tasks
filterPartitionByRefreshNumber(mvToRefreshedPartitions, mvPotentialPartitionNames, mv,
tentative);
}
}
// calculate the associated potential partitions to refresh
mvToRefreshedPartitions = calcPotentialMVRefreshPartitions(mvPotentialPartitionNames,
mvToRefreshedPartitions);
int partitionRefreshNumber = mv.getTableProperty().getPartitionRefreshNumber();
logger.info("filter partitions to refresh partitionRefreshNumber={}, partitionsToRefresh:{}, " +
@ -256,17 +244,45 @@ public abstract class MVPCTRefreshPartitioner {
return mvToRefreshedPartitions;
}
@VisibleForTesting
public void filterPartitionByRefreshNumber(PCellSortedSet partitionsToRefresh,
Set<String> mvPotentialPartitionNames,
MaterializedView mv) {
filterPartitionByRefreshNumber(partitionsToRefresh, mvPotentialPartitionNames, mv, false);
private void filterMVToRefreshPartitions(PCellSortedSet mvToRefreshedPartitions,
Set<String> mvPotentialPartitionNames) {
if (mvToRefreshedPartitions == null || mvToRefreshedPartitions.isEmpty()) {
return;
}
// first filter partitions by user's config(eg: auto_refresh_partition_number)
filterMVToRefreshPartitionsByProperty(mvToRefreshedPartitions);
logger.info("after filterMVToRefreshPartitionsByProperty, partitionsToRefresh: {}",
mvToRefreshedPartitions);
if (mvToRefreshedPartitions.isEmpty() || mvToRefreshedPartitions.size() <= 1) {
return;
}
// filter partitions by partition refresh strategy
final MaterializedView.PartitionRefreshStrategy partitionRefreshStrategy = mv.getPartitionRefreshStrategy();
boolean hasUnsupportedTableType = mv.getBaseTableTypes().stream()
.anyMatch(type -> !SUPPORTED_TABLE_TYPES_FOR_ADAPTIVE_MV_REFRESH.contains(type));
if (hasUnsupportedTableType) {
logger.warn("Materialized view {} contains unsupported external tables. Using default refresh strategy.",
mv.getId());
filterPartitionWithStrict(mvToRefreshedPartitions, mvPotentialPartitionNames);
} else {
switch (partitionRefreshStrategy) {
case ADAPTIVE:
filterPartitionWithAdaptive(mvToRefreshedPartitions, mvPotentialPartitionNames);
break;
case STRICT:
default:
// Only refresh the first partition refresh number partitions, other partitions will generate new tasks
filterPartitionWithStrict(mvToRefreshedPartitions, mvPotentialPartitionNames);
}
}
logger.info("after filterPartitionByAdaptive, partitionsToRefresh: {}",
mvToRefreshedPartitions);
}
public void filterPartitionByRefreshNumber(PCellSortedSet partitionsToRefresh,
Set<String> mvPotentialPartitionNames,
MaterializedView mv,
boolean tentative) {
public void filterPartitionWithStrict(PCellSortedSet partitionsToRefresh,
Set<String> mvPotentialPartitionNames) {
// refresh all partition when it's a sync refresh, otherwise updated partitions may be lost.
ExecuteOption executeOption = mvContext.getExecuteOption();
if (executeOption != null && executeOption.getIsSync()) {
@ -283,20 +299,11 @@ public abstract class MVPCTRefreshPartitioner {
}
// do filter actions
filterPartitionByRefreshNumber(partitionsToRefresh, mvPotentialPartitionNames, tentative);
filterPartitionByRefreshNumber(partitionsToRefresh, mvPotentialPartitionNames);
}
@VisibleForTesting
public void filterPartitionByAdaptiveRefreshNumber(PCellSortedSet partitionsToRefresh,
Set<String> mvPotentialPartitionNames,
MaterializedView mv) {
filterPartitionByAdaptiveRefreshNumber(partitionsToRefresh, mvPotentialPartitionNames, mv, false);
}
public void filterPartitionByAdaptiveRefreshNumber(PCellSortedSet partitionsToRefresh,
Set<String> mvPotentialPartitionNames,
MaterializedView mv,
boolean tentative) {
public void filterPartitionWithAdaptive(PCellSortedSet partitionsToRefresh,
Set<String> mvPotentialPartitionNames) {
// refresh all partition when it's a sync refresh, otherwise updated partitions may be lost.
ExecuteOption executeOption = mvContext.getExecuteOption();
if (executeOption != null && executeOption.getIsSync()) {
@ -308,7 +315,7 @@ public abstract class MVPCTRefreshPartitioner {
}
// do filter actions
filterPartitionByAdaptiveRefreshNumber(partitionsToRefresh, mvPotentialPartitionNames, tentative);
filterPartitionByAdaptiveRefreshNumber(partitionsToRefresh, mvPotentialPartitionNames);
}
/**
@ -440,8 +447,8 @@ public abstract class MVPCTRefreshPartitioner {
toReservePartitionName, toRefreshPartitions);
}
}
logger.info("The ref base table {} has updated partitions: {}, the corresponding " +
"mv partitions to refresh: {}, " + "mvRangePartitionNames: {}", baseTable.getName(),
logger.info("base table {} updated partitions: {}, mv partitions to refresh: {}, "
+ "toRefreshPartitions: {}", baseTable.getName(),
refBaseTablePartitionNames, ans, toRefreshPartitions);
}
return result;

View File

@ -83,8 +83,9 @@ public final class MVPCTRefreshRangePartitioner extends MVPCTRefreshPartitioner
public MVPCTRefreshRangePartitioner(MvTaskRunContext mvContext,
TaskRunContext context,
Database db,
MaterializedView mv) {
super(mvContext, context, db, mv);
MaterializedView mv,
MVRefreshParams mvRefreshParams) {
super(mvContext, context, db, mv, mvRefreshParams);
this.differ = new RangePartitionDiffer(mv, false, null);
this.logger = MVTraceUtils.getLogger(mv, MVPCTRefreshRangePartitioner.class);
}
@ -243,45 +244,21 @@ public final class MVPCTRefreshRangePartitioner extends MVPCTRefreshPartitioner
@Override
public PCellSortedSet getMVPartitionsToRefresh(PartitionInfo mvPartitionInfo,
Map<Long, BaseTableSnapshotInfo> snapshotBaseTables,
MVRefreshParams mvRefreshParams,
Set<String> mvPotentialPartitionNames) throws AnalysisException {
// range partitioned materialized views
boolean isAutoRefresh = mvContext.getTaskType().isAutoRefresh();
int partitionTTLNumber = mvContext.getPartitionTTLNumber();
boolean isRefreshMvBaseOnNonRefTables = needsRefreshBasedOnNonRefTables(snapshotBaseTables);
String start = mvRefreshParams.getRangeStart();
String end = mvRefreshParams.getRangeEnd();
boolean force = mvRefreshParams.isForce();
PCellSortedSet mvRangePartitionNames = getMVPartitionNamesWithTTL(mv, mvRefreshParams, isAutoRefresh);
logger.info("Get partition names by range with partition limit, mv name: {}, start: {}, end: {}, force:{}, " +
"partitionTTLNumber: {}, isAutoRefresh: {}, mvRangePartitionNames: {}, isRefreshMvBaseOnNonRefTables:{}",
mv.getName(), start, end, force, partitionTTLNumber, isAutoRefresh, mvRangePartitionNames,
isRefreshMvBaseOnNonRefTables);
PCellSortedSet toRefreshPartitions = getMVPartitionsToRefreshImpl(mvRefreshParams,
mvPotentialPartitionNames, mvRangePartitionNames, isRefreshMvBaseOnNonRefTables, isAutoRefresh, force);
if (toRefreshPartitions.isEmpty()) {
return PCellSortedSet.of();
}
// filter partitions by refresh partition limit
int refreshPartitionLimit = getRefreshPartitionLimit();
if (mvRefreshParams.isCompleteRefresh() &&
refreshPartitionLimit > 0 && toRefreshPartitions.size() > refreshPartitionLimit) {
// remove the oldest partitions
int toRemoveNum = toRefreshPartitions.size() - refreshPartitionLimit;
return toRefreshPartitions.skip(toRemoveNum);
}
return toRefreshPartitions;
PCellSortedSet result = getMVPartitionsToRefreshImpl(isRefreshMvBaseOnNonRefTables, isAutoRefresh);
logger.info("get mv partition to refresh, refresh params:{}, " +
"isAutoRefresh: {}, isRefreshMvBaseOnNonRefTables:{}, result:{}",
mvRefreshParams, isAutoRefresh, isRefreshMvBaseOnNonRefTables, result);
return result;
}
public PCellSortedSet getMVPartitionsToRefreshImpl(MVRefreshParams mvRefreshParams,
Set<String> mvPotentialPartitionNames,
PCellSortedSet mvPCellWithNames,
boolean isRefreshMvBaseOnNonRefTables,
boolean isAutoRefresh,
boolean force) {
// check non-ref base tables or force refresh
if (force || isRefreshMvBaseOnNonRefTables) {
public PCellSortedSet getMVPartitionsToRefreshImpl(boolean isRefreshMvBaseOnNonRefTables,
boolean isAutoRefresh) throws AnalysisException {
PCellSortedSet mvPCellWithNames = getMVPartitionNamesWithTTL(isAutoRefresh);
if (mvRefreshParams.isForce() || isRefreshMvBaseOnNonRefTables) {
if (mvRefreshParams.isCompleteRefresh()) {
// if non-partition table changed, should refresh all partitions of materialized view
return mvPCellWithNames;
@ -301,7 +278,7 @@ public final class MVPCTRefreshRangePartitioner extends MVPCTRefreshPartitioner
// if use method below, it will break in the 2th TaskRun because ref-table has not updated in the
// specific start and end ranges.
return mvPCellWithNames;
} else if (force) {
} else if (mvRefreshParams.isForce()) {
// should refresh all related partitions if user want to do force refresh
return mvPCellWithNames;
} else {
@ -311,14 +288,26 @@ public final class MVPCTRefreshRangePartitioner extends MVPCTRefreshPartitioner
}
}
}
// check the related partition table
PCellSortedSet result = getMvPartitionNamesToRefresh(mvPCellWithNames);
if (result.isEmpty()) {
logger.info("No need to refresh materialized view partitions");
return result;
}
return getMvPartitionNamesToRefresh(mvPCellWithNames);
}
@Override
public void filterMVToRefreshPartitionsByProperty(PCellSortedSet mvToRefreshedPartitions) {
// filter partitions by refresh partition limit
int refreshPartitionLimit = getRefreshPartitionLimit();
if (mvRefreshParams.isCompleteRefresh() &&
refreshPartitionLimit > 0 && mvToRefreshedPartitions.size() > refreshPartitionLimit) {
// remove the oldest partitions
int toRemoveNum = mvToRefreshedPartitions.size() - refreshPartitionLimit;
mvToRefreshedPartitions.removeWithSize(toRemoveNum);
}
}
@Override
public PCellSortedSet calcPotentialMVRefreshPartitions(Set<String> mvPotentialPartitionNames,
PCellSortedSet result) {
// check non-ref base tables or force refresh
Map<Table, Set<String>> baseChangedPartitionNames = getBasePartitionNamesByMVPartitionNames(result);
if (baseChangedPartitionNames.isEmpty()) {
logger.info("Cannot get associated base table change partitions from mv's refresh partitions {}",
@ -355,7 +344,6 @@ public final class MVPCTRefreshRangePartitioner extends MVPCTRefreshPartitioner
}
result.add(PCellWithName.of(partitionName, pCell));
}
logger.info("Finish calcPotentialRefreshPartition, needRefreshMvPartitionNames: {}," +
" baseChangedPartitionNames: {}", result, baseChangedPartitionNames);
}
@ -371,8 +359,7 @@ public final class MVPCTRefreshRangePartitioner extends MVPCTRefreshPartitioner
}
@Override
public PCellSortedSet getMVPartitionNamesWithTTL(MaterializedView mv, MVRefreshParams mvRefreshParams,
boolean isAutoRefresh) throws AnalysisException {
public PCellSortedSet getMVPartitionNamesWithTTL(boolean isAutoRefresh) throws AnalysisException {
PCellSortedSet mvPartitionCells = PCellSortedSet.of();
int partitionTTLNumber = getPartitionTTLLimit();
boolean hasPartitionRange = !mvRefreshParams.isCompleteRefresh();
@ -443,23 +430,20 @@ public final class MVPCTRefreshRangePartitioner extends MVPCTRefreshPartitioner
@Override
public void filterPartitionByRefreshNumber(PCellSortedSet mvPartitionsToRefresh,
Set<String> mvPotentialPartitionNames,
boolean tentative) {
filterPartitionByRefreshNumberInternal(mvPartitionsToRefresh, mvPotentialPartitionNames, tentative,
Set<String> mvPotentialPartitionNames) {
filterPartitionByRefreshNumberInternal(mvPartitionsToRefresh, mvPotentialPartitionNames,
MaterializedView.PartitionRefreshStrategy.STRICT);
}
@Override
public void filterPartitionByAdaptiveRefreshNumber(PCellSortedSet mvPartitionsToRefresh,
Set<String> mvPotentialPartitionNames,
boolean tentative) {
filterPartitionByRefreshNumberInternal(mvPartitionsToRefresh, mvPotentialPartitionNames, tentative,
Set<String> mvPotentialPartitionNames) {
filterPartitionByRefreshNumberInternal(mvPartitionsToRefresh, mvPotentialPartitionNames,
MaterializedView.PartitionRefreshStrategy.ADAPTIVE);
}
public void filterPartitionByRefreshNumberInternal(PCellSortedSet mvPartitionsToRefresh,
Set<String> mvPotentialPartitionNames,
boolean tentative,
MaterializedView.PartitionRefreshStrategy refreshStrategy) {
int partitionRefreshNumber = mv.getTableProperty().getPartitionRefreshNumber();
Map<String, Range<PartitionKey>> mvRangePartitionMap = mv.getRangePartitionMap();
@ -520,7 +504,7 @@ public final class MVPCTRefreshRangePartitioner extends MVPCTRefreshPartitioner
if (!Config.materialized_view_refresh_ascending) {
sortedIterator = sortedPartition.iterator();
}
setNextPartitionStartAndEnd(mvPartitionsToRefresh, sortedIterator, tentative);
setNextPartitionStartAndEnd(mvPartitionsToRefresh, sortedIterator);
}
public int getAdaptivePartitionRefreshNumber(
@ -547,8 +531,7 @@ public final class MVPCTRefreshRangePartitioner extends MVPCTRefreshPartitioner
}
private void setNextPartitionStartAndEnd(PCellSortedSet partitionsToRefresh,
Iterator<PCellWithName> iterator,
boolean tentative) {
Iterator<PCellWithName> iterator) {
String nextPartitionStart = null;
PCellWithName end = null;
if (iterator.hasNext()) {
@ -565,7 +548,7 @@ public final class MVPCTRefreshRangePartitioner extends MVPCTRefreshPartitioner
partitionsToRefresh.remove(end);
}
if (!tentative) {
if (!mvRefreshParams.isTentative()) {
mvContext.setNextPartitionStart(nextPartitionStart);
if (end != null) {

View File

@ -34,6 +34,10 @@ public class MVRefreshParams {
private final Set<PListCell> listValues;
private final PartitionInfo mvPartitionInfo;
// whether the refresh is tentative, when it's true, it means the refresh is triggered temporarily and used for
// find candidate partitions to refresh.
private boolean isTentative = false;
public MVRefreshParams(PartitionInfo partitionInfo,
Map<String, String> properties,
boolean tentative) {
@ -47,11 +51,23 @@ public class MVRefreshParams {
}
public boolean isForceCompleteRefresh() {
return isForce && isCompleteRefresh();
return isForce() && isCompleteRefresh();
}
public boolean isNonTentativeForce() {
return isForce && !isTentative;
}
public boolean isForce() {
return isForce;
return isForce || isTentative;
}
public void setIsTentative(boolean tentative) {
this.isTentative = tentative;
}
public boolean isTentative() {
return isTentative;
}
public boolean isCompleteRefresh() {
@ -83,4 +99,14 @@ public class MVRefreshParams {
public Set<PListCell> getListValues() {
return listValues;
}
@Override
public String toString() {
return "{rangeStart='" + rangeStart + '\'' +
", rangeEnd='" + rangeEnd + '\'' +
", isForce=" + isForce +
", isTentative=" + isTentative +
", listValues=" + listValues +
'}';
}
}

View File

@ -179,6 +179,10 @@ public class MVTaskRunExtraMessage implements Writable {
Config.max_mv_task_run_meta_message_values_length);
}
public Map<String, String> getPlanBuilderMessage() {
return planBuilderMessage;
}
@Override
public String toString() {
return GsonUtils.GSON.toJson(this);

View File

@ -989,7 +989,7 @@ public class PartitionBasedMvRefreshProcessorOlapTest extends MVTestBase {
MVPCTRefreshPartitioner partitioner = processor.getMvRefreshPartitioner();
PCellSortedSet mvToRefreshPartitionNames = getMVPCellWithNames(mv, mv.getPartitionNames());
partitioner.filterPartitionByAdaptiveRefreshNumber(mvToRefreshPartitionNames,
Sets.newHashSet(), mv);
Sets.newHashSet());
MvTaskRunContext mvContext = processor.getMvContext();
Assertions.assertNull(mvContext.getNextPartitionStart());
Assertions.assertNull(mvContext.getNextPartitionEnd());
@ -1020,7 +1020,7 @@ public class PartitionBasedMvRefreshProcessorOlapTest extends MVTestBase {
MVPCTBasedRefreshProcessor processor = getPartitionBasedRefreshProcessor(taskRun);
MVPCTRefreshPartitioner partitioner = processor.getMvRefreshPartitioner();
partitioner.filterPartitionByAdaptiveRefreshNumber(getMVPCellWithNames(mv, mv.getPartitionNames()),
Sets.newHashSet(), mv);
Sets.newHashSet());
MvTaskRunContext mvContext = processor.getMvContext();
Assertions.assertNull(mvContext.getNextPartitionStart());
Assertions.assertNull(mvContext.getNextPartitionEnd());
@ -1064,7 +1064,7 @@ public class PartitionBasedMvRefreshProcessorOlapTest extends MVTestBase {
MVPCTBasedRefreshProcessor processor = getPartitionBasedRefreshProcessor(taskRun);
MVPCTRefreshPartitioner partitioner = processor.getMvRefreshPartitioner();
partitioner.filterPartitionByAdaptiveRefreshNumber(getMVPCellWithNames(mv, mv.getPartitionNames()),
Sets.newHashSet(), mv);
Sets.newHashSet());
MvTaskRunContext mvContext = processor.getMvContext();
Assertions.assertNull(mvContext.getNextPartitionStart());
Assertions.assertNull(mvContext.getNextPartitionEnd());
@ -1123,19 +1123,19 @@ public class PartitionBasedMvRefreshProcessorOlapTest extends MVTestBase {
// round 1
PCellSortedSet toRefresh = getMVPCellWithNames(mv, allPartitions);
partitioner.filterPartitionByRefreshNumber(toRefresh, Sets.newHashSet(), mv);
partitioner.filterPartitionByRefreshNumber(toRefresh, Sets.newHashSet());
assertPCellNameEquals("p0", toRefresh);
refreshedPartitions.addAll(getPartitionNames(toRefresh));
// round 2
toRefresh = getMVPCellWithNames(mv, SetUtils.disjunction(allPartitions, refreshedPartitions));
partitioner.filterPartitionByRefreshNumber(toRefresh, Sets.newHashSet(), mv);
partitioner.filterPartitionByRefreshNumber(toRefresh, Sets.newHashSet());
assertPCellNameEquals("p1", toRefresh);
refreshedPartitions.addAll(getPartitionNames(toRefresh));
// round 3
toRefresh = getMVPCellWithNames(mv, SetUtils.disjunction(allPartitions, refreshedPartitions));
partitioner.filterPartitionByRefreshNumber(toRefresh, Sets.newHashSet(), mv);
partitioner.filterPartitionByRefreshNumber(toRefresh, Sets.newHashSet());
assertPCellNameEquals("p2", toRefresh);
refreshedPartitions.addAll(getPartitionNames(toRefresh));
}
@ -1147,31 +1147,31 @@ public class PartitionBasedMvRefreshProcessorOlapTest extends MVTestBase {
// round 1
PCellSortedSet toRefresh = getMVPCellWithNames(mv, allPartitions);
partitioner.filterPartitionByRefreshNumber(toRefresh, Sets.newHashSet(), mv);
partitioner.filterPartitionByRefreshNumber(toRefresh, Sets.newHashSet());
assertPCellNameEquals("p4", toRefresh);
refreshedPartitions.addAll(getPartitionNames(toRefresh));
// round 2
toRefresh = getMVPCellWithNames(mv, SetUtils.disjunction(allPartitions, refreshedPartitions));
partitioner.filterPartitionByRefreshNumber(toRefresh, Sets.newHashSet(), mv);
partitioner.filterPartitionByRefreshNumber(toRefresh, Sets.newHashSet());
assertPCellNameEquals("p3", toRefresh);
refreshedPartitions.addAll(getPartitionNames(toRefresh));
// round 3
toRefresh = getMVPCellWithNames(mv, SetUtils.disjunction(allPartitions, refreshedPartitions));
partitioner.filterPartitionByRefreshNumber(toRefresh, Sets.newHashSet(), mv);
partitioner.filterPartitionByRefreshNumber(toRefresh, Sets.newHashSet());
assertPCellNameEquals("p2", toRefresh);
refreshedPartitions.addAll(getPartitionNames(toRefresh));
// round 4
toRefresh = getMVPCellWithNames(mv, SetUtils.disjunction(allPartitions, refreshedPartitions));
partitioner.filterPartitionByRefreshNumber(toRefresh, Sets.newHashSet(), mv);
partitioner.filterPartitionByRefreshNumber(toRefresh, Sets.newHashSet());
assertPCellNameEquals("p1", toRefresh);
refreshedPartitions.addAll(getPartitionNames(toRefresh));
// round 5
toRefresh = getMVPCellWithNames(mv, SetUtils.disjunction(allPartitions, refreshedPartitions));
partitioner.filterPartitionByRefreshNumber(toRefresh, Sets.newHashSet(), mv);
partitioner.filterPartitionByRefreshNumber(toRefresh, Sets.newHashSet());
assertPCellNameEquals("p0", toRefresh);
refreshedPartitions.addAll(getPartitionNames(toRefresh));
Config.materialized_view_refresh_ascending = true;
@ -2816,7 +2816,7 @@ public class PartitionBasedMvRefreshProcessorOlapTest extends MVTestBase {
MVPCTBasedRefreshProcessor processor =
(MVPCTBasedRefreshProcessor) mvTaskRunProcessor.getMVRefreshProcessor();
Set<String> result = processor.getPCTMVToRefreshedPartitions(mvTaskRunContext, false);
Set<String> result = processor.getPCTMVToRefreshedPartitions(false);
Assertions.assertTrue(result.isEmpty());
});
}
@ -2859,7 +2859,7 @@ public class PartitionBasedMvRefreshProcessorOlapTest extends MVTestBase {
MVPCTBasedRefreshProcessor mvRefreshProcessor =
(MVPCTBasedRefreshProcessor) processor.getMVRefreshProcessor();
MvTaskRunContext mvTaskRunContext = new MvTaskRunContext(taskRunContext);
Set<String> result = mvRefreshProcessor.getPCTMVToRefreshedPartitions(mvTaskRunContext, false);
Set<String> result = mvRefreshProcessor.getPCTMVToRefreshedPartitions(false);
Assertions.assertFalse(result.isEmpty());
Set<String> expect = ImmutableSet.of("p0", "p1", "p2", "p3", "p4");
Assertions.assertEquals(expect, result);

View File

@ -34,8 +34,9 @@ public class MVPCTRefreshNonPartitionerTest {
TaskRunContext taskRunContext = Mockito.mock(TaskRunContext.class);
Database database = Mockito.mock(Database.class);
MaterializedView mv = Mockito.mock(MaterializedView.class);
MVRefreshParams mvRefreshParams = Mockito.mock(MVRefreshParams.class);
MVPCTRefreshNonPartitioner job = new MVPCTRefreshNonPartitioner(mvTaskRunContext, taskRunContext,
database, mv);
database, mv, mvRefreshParams);
Iterator<PCellWithName> dummyIter = Collections.emptyIterator();
int result = job.getAdaptivePartitionRefreshNumber(dummyIter);
Assertions.assertEquals(0, result);

View File

@ -68,7 +68,9 @@ public class MVPCTRefreshRangePartitionerTest {
List<PCellWithName> partitions = Arrays.asList(PCellWithName.of("mv_p1", new PCellNone()),
PCellWithName.of("mv_p2", new PCellNone()));
Iterator<PCellWithName> iter = partitions.iterator();
MVPCTRefreshRangePartitioner partitioner = new MVPCTRefreshRangePartitioner(mvContext, null, null, mv);
MVRefreshParams mvRefreshParams = new MVRefreshParams(mv.getPartitionInfo(), new HashMap<>(), false);
MVPCTRefreshRangePartitioner partitioner = new MVPCTRefreshRangePartitioner(mvContext, null,
null, mv, mvRefreshParams);
MVAdaptiveRefreshException exception = Assertions.assertThrows(MVAdaptiveRefreshException.class,
() -> partitioner.getAdaptivePartitionRefreshNumber(iter));
Assertions.assertTrue(exception.getMessage().contains("Missing too many partition stats"));
@ -84,7 +86,9 @@ public class MVPCTRefreshRangePartitionerTest {
when(mv.getPartitionInfo()).thenReturn(mock(PartitionInfo.class));
when(mv.getTableProperty().getPartitionTTLNumber()).thenReturn(2);
MVPCTRefreshRangePartitioner partitioner = new MVPCTRefreshRangePartitioner(mvContext, null, null, mv);
MVRefreshParams mvRefreshParams = new MVRefreshParams(mv.getPartitionInfo(), new HashMap<>(), false);
MVPCTRefreshRangePartitioner partitioner = new MVPCTRefreshRangePartitioner(mvContext, null, null, mv,
mvRefreshParams);
PCellSortedSet toRefreshPartitions = PCellSortedSet.of();
toRefreshPartitions.add(PCellWithName.of("partition1", new PCellNone()));

View File

@ -0,0 +1,91 @@
-- name: test_mv_refresh_with_potential_partitions
create database db_${uuid0};
-- result:
-- !result
use db_${uuid0};
-- result:
-- !result
CREATE TABLE `t1` (
`k1` date,
`k2` datetime,
`k3` char(20),
`k4` varchar(20),
`k5` boolean,
`k6` tinyint,
`k7` smallint,
`k8` int,
`k9` bigint,
`k10` largeint,
`k11` float,
`k12` double,
`k13` decimal(27,9)
)
DUPLICATE KEY(`k1`, `k2`, `k3`, `k4`, `k5`)
COMMENT "OLAP"
PARTITION BY RANGE (k1) (
START ("2020-06-01") END ("2020-09-01") EVERY (INTERVAL 3 day)
)
DISTRIBUTED BY HASH(`k1`, `k2`, `k3`, `k4`, `k5`) BUCKETS 3
PROPERTIES (
"replication_num" = "1"
);
-- result:
-- !result
INSERT INTO t1
SELECT
date_add('2020-06-01', INTERVAL (series_id * 2) DAY) as k1,
date_add('2020-06-01 12:12:12', INTERVAL (series_id * 2) DAY) as k2,
'k3' as k3,
'k4' as k4,
(series_id % 2 = 0) as k5,
(series_id % 3 + 1) as k6,
2 as k7,
3 as k8,
4 as k9,
5 as k10,
1.1 as k11,
1.12 as k12,
2.889 as k13
FROM TABLE(generate_series(0, 37)) t(series_id)
WHERE date_add('2020-06-01', INTERVAL (series_id * 2) DAY) <= '2020-08-15';
-- result:
-- !result
CREATE MATERIALIZED VIEW test_mv1
PARTITION BY date_trunc('month',k1)
DISTRIBUTED BY HASH(k1) BUCKETS 10
REFRESH DEFERRED ASYNC
AS SELECT k1,k6,k7,k8,k9,k10,k11,k12,k13 FROM t1;
-- result:
-- !result
REFRESH MATERIALIZED VIEW test_mv1 WITH SYNC MODE;
function: check_hit_materialized_view("SELECT k1,k6,k7,k8,k9,k10,k11,k12,k13 FROM t1", "test_mv1")
-- result:
None
-- !result
SELECT COUNT(1) FROM test_mv1;
-- result:
38
-- !result
SELECT COUNT(1) FROM t1;
-- result:
38
-- !result
SELECT * FROM test_mv1 ORDER BY 1,2,3,4,5,6 LIMIT 5;
-- result:
2020-06-01 1 2 3 4 5 1.1 1.12 2.889000000
2020-06-03 2 2 3 4 5 1.1 1.12 2.889000000
2020-06-05 3 2 3 4 5 1.1 1.12 2.889000000
2020-06-07 1 2 3 4 5 1.1 1.12 2.889000000
2020-06-09 2 2 3 4 5 1.1 1.12 2.889000000
-- !result
SELECT * FROM t1 ORDER BY 1,2,3,4,5,6 LIMIT 5;
-- result:
2020-06-01 2020-06-01 12:12:12 k3 k4 1 1 2 3 4 5 1.1 1.12 2.889000000
2020-06-03 2020-06-03 12:12:12 k3 k4 0 2 2 3 4 5 1.1 1.12 2.889000000
2020-06-05 2020-06-05 12:12:12 k3 k4 1 3 2 3 4 5 1.1 1.12 2.889000000
2020-06-07 2020-06-07 12:12:12 k3 k4 0 1 2 3 4 5 1.1 1.12 2.889000000
2020-06-09 2020-06-09 12:12:12 k3 k4 1 2 2 3 4 5 1.1 1.12 2.889000000
-- !result
drop database db_${uuid0};
-- result:
-- !result

View File

@ -0,0 +1,64 @@
-- name: test_mv_refresh_with_potential_partitions
create database db_${uuid0};
use db_${uuid0};
CREATE TABLE `t1` (
`k1` date,
`k2` datetime,
`k3` char(20),
`k4` varchar(20),
`k5` boolean,
`k6` tinyint,
`k7` smallint,
`k8` int,
`k9` bigint,
`k10` largeint,
`k11` float,
`k12` double,
`k13` decimal(27,9)
)
DUPLICATE KEY(`k1`, `k2`, `k3`, `k4`, `k5`)
COMMENT "OLAP"
PARTITION BY RANGE (k1) (
START ("2020-06-01") END ("2020-09-01") EVERY (INTERVAL 3 day)
)
DISTRIBUTED BY HASH(`k1`, `k2`, `k3`, `k4`, `k5`) BUCKETS 3
PROPERTIES (
"replication_num" = "1"
);
INSERT INTO t1
SELECT
date_add('2020-06-01', INTERVAL (series_id * 2) DAY) as k1,
date_add('2020-06-01 12:12:12', INTERVAL (series_id * 2) DAY) as k2,
'k3' as k3,
'k4' as k4,
(series_id % 2 = 0) as k5,
(series_id % 3 + 1) as k6,
2 as k7,
3 as k8,
4 as k9,
5 as k10,
1.1 as k11,
1.12 as k12,
2.889 as k13
FROM TABLE(generate_series(0, 37)) t(series_id)
WHERE date_add('2020-06-01', INTERVAL (series_id * 2) DAY) <= '2020-08-15';
CREATE MATERIALIZED VIEW test_mv1
PARTITION BY date_trunc('month',k1)
DISTRIBUTED BY HASH(k1) BUCKETS 10
REFRESH DEFERRED ASYNC
AS SELECT k1,k6,k7,k8,k9,k10,k11,k12,k13 FROM t1;
REFRESH MATERIALIZED VIEW test_mv1 WITH SYNC MODE;
function: check_hit_materialized_view("SELECT k1,k6,k7,k8,k9,k10,k11,k12,k13 FROM t1", "test_mv1")
SELECT COUNT(1) FROM test_mv1;
SELECT COUNT(1) FROM t1;
SELECT * FROM test_mv1 ORDER BY 1,2,3,4,5,6 LIMIT 5;
SELECT * FROM t1 ORDER BY 1,2,3,4,5,6 LIMIT 5;
drop database db_${uuid0};