From ee66eb3b3f47f7a25d6f5c71ab1fc9ecfb7022d2 Mon Sep 17 00:00:00 2001 From: "shuming.li" Date: Sat, 11 Oct 2025 11:33:11 +0800 Subject: [PATCH] [Enhancement] Change default_mv_partition_refresh_strategy to adaptive by default (#63594) Signed-off-by: shuming.li --- .../starrocks/catalog/MaterializedView.java | 35 +++- .../com/starrocks/catalog/TableProperty.java | 21 ++- .../java/com/starrocks/common/Config.java | 9 +- .../java/com/starrocks/scheduler/TaskRun.java | 5 + .../starrocks/scheduler/TaskRunManager.java | 6 + .../mv/MVPCTRefreshListPartitioner.java | 62 +++---- .../mv/MVPCTRefreshNonPartitioner.java | 14 +- .../scheduler/mv/MVPCTRefreshPartitioner.java | 162 +++++++++++------- .../mv/MVPCTRefreshRangePartitioner.java | 92 +++------- .../starrocks/sql/common/PCellSortedSet.java | 5 + .../com/starrocks/sql/common/PRangeCell.java | 11 +- .../catalog/MaterializedViewTest.java | 4 +- .../planner/QueryCacheAndMVTest.java | 3 +- ...nBasedMvRefreshProcessorOlapPart2Test.java | 20 +-- ...titionBasedMvRefreshProcessorOlapTest.java | 33 ++-- .../mv/MVPCTRefreshNonPartitionerTest.java | 8 +- .../mv/MVPCTRefreshRangePartitionerTest.java | 5 +- .../materialization/MVTestBase.java | 15 ++ 18 files changed, 260 insertions(+), 250 deletions(-) diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/MaterializedView.java b/fe/fe-core/src/main/java/com/starrocks/catalog/MaterializedView.java index b7792fe9f07..8d94465db10 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/MaterializedView.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/MaterializedView.java @@ -159,7 +159,16 @@ public class MaterializedView extends OlapTable implements GsonPreProcessable, G ADAPTIVE; public static PartitionRefreshStrategy defaultValue() { - return STRICT; + return ADAPTIVE; + } + + public static PartitionRefreshStrategy of(String val) { + try { + return PartitionRefreshStrategy.valueOf(val.trim().toUpperCase()); + } catch (Exception e) { + LOG.warn("Failed to get partition refresh strategy, use default value", e); + return PartitionRefreshStrategy.defaultValue(); + } } } @@ -1396,15 +1405,25 @@ public class MaterializedView extends OlapTable implements GsonPreProcessable, G * Return the partition refresh strategy of the materialized view. */ public PartitionRefreshStrategy getPartitionRefreshStrategy() { - if (tableProperty == null || tableProperty.getPartitionRefreshStrategy() == null) { + TableProperty tableProperty = getTableProperty(); + if (tableProperty == null) { return PartitionRefreshStrategy.defaultValue(); } - try { - return PartitionRefreshStrategy.valueOf(tableProperty.getPartitionRefreshStrategy().trim().toUpperCase()); - } catch (Exception e) { - LOG.warn("Failed to get partition refresh strategy for materialized view: {}, use default value", - this.getName(), e); - return PartitionRefreshStrategy.defaultValue(); + + // otherwise, use `partition_refresh_strategy` property. + PartitionRefreshStrategy partitionRefreshStrategy = + PartitionRefreshStrategy.of(tableProperty.getPartitionRefreshStrategy()); + if (tableProperty.isSetPartitionRefreshStrategy() + && !PartitionRefreshStrategy.STRICT.equals(partitionRefreshStrategy)) { + return partitionRefreshStrategy; + } else { + // if mv has `partition_refresh_number` property, use it first. + // for compatibility, if `partition_refresh_number` is set and not equal to 1, + if (tableProperty.isSetPartitionRefreshNumber()) { + return PartitionRefreshStrategy.STRICT; + } else { + return PartitionRefreshStrategy.defaultValue(); + } } } diff --git a/fe/fe-core/src/main/java/com/starrocks/catalog/TableProperty.java b/fe/fe-core/src/main/java/com/starrocks/catalog/TableProperty.java index 0931035aaef..2718c8156bd 100644 --- a/fe/fe-core/src/main/java/com/starrocks/catalog/TableProperty.java +++ b/fe/fe-core/src/main/java/com/starrocks/catalog/TableProperty.java @@ -209,15 +209,15 @@ public class TableProperty implements Writable, GsonPostProcessable { // This property only applies to materialized views // It represents the maximum number of partitions that will be refreshed by a TaskRun refresh - private int partitionRefreshNumber = Config.default_mv_partition_refresh_number; + private int partitionRefreshNumber = INVALID; // This property only applies to materialized views // It represents the mode selected to determine the number of partitions to refresh - private String partitionRefreshStrategy = Config.default_mv_partition_refresh_strategy; + private String partitionRefreshStrategy = ""; // This property only applies to materialized views/ // It represents the mode selected to determine how to refresh the materialized view - private String mvRefreshMode = Config.default_mv_refresh_mode; + private String mvRefreshMode = ""; // This property only applies to materialized views // When using the system to automatically refresh, the maximum range of the most recent partitions will be refreshed. @@ -994,12 +994,21 @@ public class TableProperty implements Writable, GsonPostProcessable { this.autoRefreshPartitionsLimit = autoRefreshPartitionsLimit; } + public boolean isSetPartitionRefreshNumber() { + return partitionRefreshNumber != INVALID; + } + public int getPartitionRefreshNumber() { - return partitionRefreshNumber; + return partitionRefreshNumber == INVALID ? Config.default_mv_partition_refresh_number : partitionRefreshNumber; + } + + public boolean isSetPartitionRefreshStrategy() { + return !Strings.isNullOrEmpty(partitionRefreshStrategy); } public String getPartitionRefreshStrategy() { - return partitionRefreshStrategy; + return Strings.isNullOrEmpty(partitionRefreshStrategy) ? Config.default_mv_partition_refresh_strategy + : partitionRefreshStrategy; } public void setPartitionRefreshNumber(int partitionRefreshNumber) { @@ -1015,7 +1024,7 @@ public class TableProperty implements Writable, GsonPostProcessable { } public String getMvRefreshMode() { - return mvRefreshMode; + return Strings.isNullOrEmpty(mvRefreshMode) ? Config.default_mv_refresh_mode : mvRefreshMode; } public void setResourceGroup(String resourceGroup) { diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java index dbf8d6ac646..184f1a73fbd 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java @@ -3396,8 +3396,8 @@ public class Config extends ConfigBase { @ConfField(mutable = true, comment = "The default try lock timeout for mv refresh to try base table/mv dbs' lock") public static int mv_refresh_try_lock_timeout_ms = 30 * 1000; - @ConfField(mutable = true, comment = "materialized view can refresh at most 10 partition at a time") - public static int mv_max_partitions_num_per_refresh = 10; + @ConfField(mutable = true, comment = "materialized view can refresh at most 64 partition at a time") + public static int mv_max_partitions_num_per_refresh = 64; @ConfField(mutable = true, comment = "materialized view can refresh at most 100_000_000 rows of data at a time") public static long mv_max_rows_per_refresh = 100_000_000L; @@ -3428,8 +3428,9 @@ public class Config extends ConfigBase { @ConfField(mutable = true) public static int default_mv_partition_refresh_number = 1; - @ConfField(mutable = true) - public static String default_mv_partition_refresh_strategy = "strict"; + @ConfField(mutable = true, comment = "The default refresh strategy for materialized view partition refresh, " + + "adaptive by default") + public static String default_mv_partition_refresh_strategy = "adaptive"; @ConfField(mutable = true) public static String default_mv_refresh_mode = "pct"; diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRun.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRun.java index 9986c6990b2..877ca35f87f 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRun.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRun.java @@ -222,6 +222,11 @@ public class TaskRun implements Comparable { return Math.min(Math.min(defaultTimeoutS, Config.task_runs_ttl_second), Config.task_ttl_second); } + @VisibleForTesting + public void setStatus(TaskRunStatus status) { + this.status = status; + } + public Map refreshTaskProperties(ConnectContext ctx) { Map newProperties = Maps.newHashMap(); if (task.getSource() != Constants.TaskSource.MV) { diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunManager.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunManager.java index fae55f8b165..8c9306cef68 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunManager.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunManager.java @@ -28,6 +28,7 @@ import com.starrocks.scheduler.history.TaskRunHistory; import com.starrocks.scheduler.persist.TaskRunStatus; import com.starrocks.scheduler.persist.TaskRunStatusChange; import com.starrocks.server.GlobalStateMgr; +import org.apache.arrow.util.VisibleForTesting; import org.apache.commons.collections4.CollectionUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -58,6 +59,11 @@ public class TaskRunManager implements MemoryTrackable { this.taskRunScheduler = taskRunScheduler; } + public SubmitResult submitTaskRun(TaskRun taskRun) { + return submitTaskRun(taskRun, taskRun.getExecuteOption()); + } + + @VisibleForTesting public SubmitResult submitTaskRun(TaskRun taskRun, ExecuteOption option) { LOG.info("submit task run:{}", taskRun); diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVPCTRefreshListPartitioner.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVPCTRefreshListPartitioner.java index b0e6f4fe3d4..ec525f5c857 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVPCTRefreshListPartitioner.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVPCTRefreshListPartitioner.java @@ -423,40 +423,40 @@ public final class MVPCTRefreshListPartitioner extends MVPCTRefreshPartitioner { } @Override - public void filterPartitionByRefreshNumber(PCellSortedSet mvPartitionsToRefresh) { - filterPartitionByRefreshNumberInternal(mvPartitionsToRefresh, MaterializedView.PartitionRefreshStrategy.STRICT); - } - - @Override - public void filterPartitionByAdaptiveRefreshNumber(PCellSortedSet mvPartitionsToRefresh) { - filterPartitionByRefreshNumberInternal(mvPartitionsToRefresh, MaterializedView.PartitionRefreshStrategy.ADAPTIVE); - } - - public void filterPartitionByRefreshNumberInternal(PCellSortedSet toRefreshPartitions, - MaterializedView.PartitionRefreshStrategy refreshStrategy) { - + public void filterPartitionByRefreshNumber(PCellSortedSet toRefreshPartitions, + MaterializedView.PartitionRefreshStrategy refreshStrategy) { // filter by partition ttl filterPartitionsByTTL(toRefreshPartitions, false); if (toRefreshPartitions == null || toRefreshPartitions.isEmpty()) { return; } - - Iterator iterator = toRefreshPartitions.iterator(); + // filter invalid cells from input + toRefreshPartitions.stream() + .filter(cell -> !mv.getListPartitionItems().containsKey(cell.name())) + .forEach(toRefreshPartitions::remove); // dynamically get the number of partitions to be refreshed this time - int refreshNumber = getRefreshNumberByMode(iterator, refreshStrategy); - if (toRefreshPartitions.size() <= refreshNumber) { + int partitionRefreshNumber = getPartitionRefreshNumberAdaptive(toRefreshPartitions, refreshStrategy); + if (partitionRefreshNumber <= 0 || partitionRefreshNumber >= toRefreshPartitions.size()) { return; } - // iterate refreshNumber times - Set nextPartitionValues = Sets.newHashSet(); int i = 0; - while (iterator.hasNext() && i++ < refreshNumber) { + // refresh the recent partitions first + Iterator iterator = getToRefreshPartitionsIterator(toRefreshPartitions, false); + while (i++ < partitionRefreshNumber && iterator.hasNext()) { + PCellWithName pCell = iterator.next(); + logger.debug("Materialized view [{}] to refresh partition name {}, value {}", + mv.getName(), pCell.name(), pCell.cell()); + } + + // get next partition values for next refresh + Set nextPartitionValues = Sets.newHashSet(); + while (iterator.hasNext()) { PCellWithName pCell = iterator.next(); nextPartitionValues.add((PListCell) pCell.cell()); - toRefreshPartitions.remove(pCell); + iterator.remove(); } logger.info("Filter partitions by refresh number, ttl_number:{}, result:{}, remains:{}", - refreshNumber, toRefreshPartitions, nextPartitionValues); + partitionRefreshNumber, toRefreshPartitions, nextPartitionValues); if (CollectionUtils.isEmpty(nextPartitionValues)) { return; } @@ -467,26 +467,6 @@ public final class MVPCTRefreshListPartitioner extends MVPCTRefreshPartitioner { } } - public int getAdaptivePartitionRefreshNumber(Iterator iterator) throws MVAdaptiveRefreshException { - Map>> mvToBaseNameRefs = mvContext.getMvRefBaseTableIntersectedPartitions(); - MVRefreshPartitionSelector mvRefreshPartitionSelector = - new MVRefreshPartitionSelector(Config.mv_max_rows_per_refresh, Config.mv_max_bytes_per_refresh, - Config.mv_max_partitions_num_per_refresh, mvContext.getExternalRefBaseTableMVPartitionMap()); - int adaptiveRefreshNumber = 0; - while (iterator.hasNext()) { - PCellWithName cellWithName = iterator.next(); - String partitionName = cellWithName.name(); - Map> refPartitionInfos = mvToBaseNameRefs.get(partitionName); - if (mvRefreshPartitionSelector.canAddPartition(refPartitionInfos)) { - mvRefreshPartitionSelector.addPartition(refPartitionInfos); - adaptiveRefreshNumber++; - } else { - break; - } - } - return adaptiveRefreshNumber; - } - private void addListPartitions(Database database, MaterializedView materializedView, Map adds, Map partitionProperties, DistributionDesc distributionDesc) { diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVPCTRefreshNonPartitioner.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVPCTRefreshNonPartitioner.java index fc89eabe2f4..c2171fb5acc 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVPCTRefreshNonPartitioner.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVPCTRefreshNonPartitioner.java @@ -27,7 +27,6 @@ import com.starrocks.sql.common.PCellNone; import com.starrocks.sql.common.PCellSortedSet; import com.starrocks.sql.common.PCellWithName; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -94,20 +93,11 @@ public final class MVPCTRefreshNonPartitioner extends MVPCTRefreshPartitioner { } @Override - public void filterPartitionByRefreshNumber(PCellSortedSet mvPartitionsToRefresh) { + public void filterPartitionByRefreshNumber(PCellSortedSet mvPartitionsToRefresh, + MaterializedView.PartitionRefreshStrategy partitionRefreshStrategy) { // do nothing } - @Override - public void filterPartitionByAdaptiveRefreshNumber(PCellSortedSet mvPartitionsToRefresh) { - // do nothing - } - - @Override - protected int getAdaptivePartitionRefreshNumber(Iterator partitionNameIter) throws MVAdaptiveRefreshException { - return 0; - } - @Override public boolean isCalcPotentialRefreshPartition(Map baseChangedPartitionNames, PCellSortedSet mvPartitions) { diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVPCTRefreshPartitioner.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVPCTRefreshPartitioner.java index 838660f103e..77136abf93b 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVPCTRefreshPartitioner.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVPCTRefreshPartitioner.java @@ -172,18 +172,14 @@ public abstract class MVPCTRefreshPartitioner { */ public abstract PCellSortedSet getMVPartitionNamesWithTTL(boolean isAutoRefresh) throws AnalysisException; - /** - * Filter to refresh partitions by adaptive refresh number. - * @param mvPartitionsToRefresh: mv partitions to refresh. - */ - public abstract void filterPartitionByAdaptiveRefreshNumber(PCellSortedSet mvPartitionsToRefresh); - /** * Filter to refresh partitions by partition refresh number. * @param partitionsToRefresh: partitions to refresh. */ @VisibleForTesting - public abstract void filterPartitionByRefreshNumber(PCellSortedSet partitionsToRefresh); + public abstract void filterPartitionByRefreshNumber( + PCellSortedSet partitionsToRefresh, + MaterializedView.PartitionRefreshStrategy refreshStrategy); /** * Check whether to calculate the potential partitions to refresh or not. When the base table changed partitions @@ -308,65 +304,29 @@ public abstract class MVPCTRefreshPartitioner { if (mvToRefreshedPartitions.isEmpty() || mvToRefreshedPartitions.size() <= 1) { return; } + // refresh all partition when it's a sync refresh, otherwise updated partitions may be lost. + ExecuteOption executeOption = mvContext.getExecuteOption(); + if (executeOption != null && executeOption.getIsSync()) { + return; + } + // ignore if mv is not partitioned. + if (!mv.isPartitionedTable()) { + return; + } // filter partitions by partition refresh strategy - final MaterializedView.PartitionRefreshStrategy partitionRefreshStrategy = mv.getPartitionRefreshStrategy(); boolean hasUnsupportedTableType = mv.getBaseTableTypes().stream() .anyMatch(type -> !SUPPORTED_TABLE_TYPES_FOR_ADAPTIVE_MV_REFRESH.contains(type)); if (hasUnsupportedTableType) { - logger.warn("Materialized view {} contains unsupported external tables. Using default refresh strategy.", - mv.getId()); - filterPartitionWithStrict(mvToRefreshedPartitions); + filterPartitionByRefreshNumber(mvToRefreshedPartitions, MaterializedView.PartitionRefreshStrategy.STRICT); } else { - switch (partitionRefreshStrategy) { - case ADAPTIVE: - filterPartitionWithAdaptive(mvToRefreshedPartitions); - break; - case STRICT: - default: - // Only refresh the first partition refresh number partitions, other partitions will generate new tasks - filterPartitionWithStrict(mvToRefreshedPartitions); - } + MaterializedView.PartitionRefreshStrategy partitionRefreshStrategy = mv.getPartitionRefreshStrategy(); + filterPartitionByRefreshNumber(mvToRefreshedPartitions, partitionRefreshStrategy); } logger.info("after filterPartitionByAdaptive, partitionsToRefresh: {}", mvToRefreshedPartitions); } - public void filterPartitionWithStrict(PCellSortedSet partitionsToRefresh) { - // refresh all partition when it's a sync refresh, otherwise updated partitions may be lost. - ExecuteOption executeOption = mvContext.getExecuteOption(); - if (executeOption != null && executeOption.getIsSync()) { - return; - } - // ignore if mv is not partitioned. - if (!mv.isPartitionedTable()) { - return; - } - // ignore if partition_fresh_limit is not set - int partitionRefreshNumber = mv.getTableProperty().getPartitionRefreshNumber(); - if (partitionRefreshNumber <= 0) { - return; - } - - // do filter actions - filterPartitionByRefreshNumber(partitionsToRefresh); - } - - public void filterPartitionWithAdaptive(PCellSortedSet partitionsToRefresh) { - // refresh all partition when it's a sync refresh, otherwise updated partitions may be lost. - ExecuteOption executeOption = mvContext.getExecuteOption(); - if (executeOption != null && executeOption.getIsSync()) { - return; - } - // ignore if mv is not partitioned. - if (!mv.isPartitionedTable()) { - return; - } - - // do filter actions - filterPartitionByAdaptiveRefreshNumber(partitionsToRefresh); - } - /** * Determines the number of partitions to refresh based on the given refresh strategy. * @@ -386,29 +346,86 @@ public abstract class MVPCTRefreshPartitioner { * may fail due to missing or invalid metadata. In such cases, the method automatically * falls back to the STRICT strategy to ensure the MV can still be refreshed correctly. * - * @param sortedPartitionIterator Iterator over sorted partition names to be refreshed. - * @param refreshStrategy The refresh strategy: either ADAPTIVE or STRICT. + * @param toRefreshPartitions sorted partition names to be refreshed. + * @param refreshStrategy The refresh strategy: either ADAPTIVE or STRICT. * @return The number of partitions to refresh. */ - public int getRefreshNumberByMode(Iterator sortedPartitionIterator, - MaterializedView.PartitionRefreshStrategy refreshStrategy) { + public int getPartitionRefreshNumberAdaptive(PCellSortedSet toRefreshPartitions, + MaterializedView.PartitionRefreshStrategy refreshStrategy) { try { switch (refreshStrategy) { case ADAPTIVE: - return getAdaptivePartitionRefreshNumber(sortedPartitionIterator); + return getAdaptivePartitionRefreshNumber(toRefreshPartitions); case STRICT: default: - return mv.getTableProperty().getPartitionRefreshNumber(); + return getRefreshNumberByDefaultMode(toRefreshPartitions); } - } catch (MVAdaptiveRefreshException e) { + } catch (Exception e) { logger.warn("Adaptive refresh failed for mode '{}', falling back to STRICT mode. Reason: {}", refreshStrategy, e.getMessage(), e); - return mv.getTableProperty().getPartitionRefreshNumber(); + return getRefreshNumberByDefaultMode(toRefreshPartitions); } } - protected abstract int getAdaptivePartitionRefreshNumber(Iterator partitionNameIter) - throws MVAdaptiveRefreshException; + /** + * Get partition refresh number by default mode, which is used to be compatible with old versions and the following + * conditions are met: + * - partition_refresh_strategy is not set strict by user + * - partition_refresh_number is not set by user + */ + private int getRefreshNumberByDefaultMode(PCellSortedSet torRefreshPartitions) { + int defaultPartitionRefreshNumber = mv.getTableProperty().getPartitionRefreshNumber(); + TableProperty tableProperty = mv.getTableProperty(); + // if user has set partition_refresh_number, use it directly + if (tableProperty == null || tableProperty.isSetPartitionRefreshNumber() + || defaultPartitionRefreshNumber != Config.default_mv_partition_refresh_number) { + return defaultPartitionRefreshNumber; + } + MaterializedView.PartitionRefreshStrategy partitionRefreshStrategy = + MaterializedView.PartitionRefreshStrategy.of(tableProperty.getPartitionRefreshStrategy()); + // if partition_refresh_strategy is strict, use partition_refresh_number directly + if (tableProperty.isSetPartitionRefreshStrategy() + && MaterializedView.PartitionRefreshStrategy.STRICT.equals(partitionRefreshStrategy)) { + return defaultPartitionRefreshNumber; + } + // if the number of partitions to refresh is not too many, use it directly + int toRefreshPartitionNum = torRefreshPartitions.size(); + if (toRefreshPartitionNum <= Config.mv_max_partitions_num_per_refresh) { + return defaultPartitionRefreshNumber; + } + // to be compatible with old version, use adaptive mode if partition_refresh_strategy is not set + int toRefreshPartitionNumPerTaskRun = + (int) Math.ceil((double) toRefreshPartitionNum / Math.max(1, Config.task_runs_concurrency)); + + // if there are too many partitions to refresh, limit the number of partitions to refresh per task run + int finalToRefreshPartitionNumPerTaskRun = Math.min(toRefreshPartitionNumPerTaskRun, + Config.mv_max_partitions_num_per_refresh); + return finalToRefreshPartitionNumPerTaskRun; + } + + public int getAdaptivePartitionRefreshNumber(PCellSortedSet toRefreshPartitions) + throws MVAdaptiveRefreshException { + if (!mv.isPartitionedTable()) { + return toRefreshPartitions.size(); + } + + Map>> mvToBaseNameRefs = mvContext.getMvRefBaseTableIntersectedPartitions(); + MVRefreshPartitionSelector mvRefreshPartitionSelector = + new MVRefreshPartitionSelector(Config.mv_max_rows_per_refresh, Config.mv_max_bytes_per_refresh, + Config.mv_max_partitions_num_per_refresh, mvContext.getExternalRefBaseTableMVPartitionMap()); + int adaptiveRefreshNumber = 0; + for (PCellWithName pCellWithName : toRefreshPartitions.partitions()) { + String mvRefreshPartition = pCellWithName.name(); + Map> refBaseTablesPartitions = mvToBaseNameRefs.get(mvRefreshPartition); + if (mvRefreshPartitionSelector.canAddPartition(refBaseTablesPartitions)) { + mvRefreshPartitionSelector.addPartition(refBaseTablesPartitions); + adaptiveRefreshNumber++; + } else { + break; + } + } + return adaptiveRefreshNumber; + } /** * Check whether the base table is supported partition refresh or not. @@ -657,7 +674,9 @@ public abstract class MVPCTRefreshPartitioner { getExpiredPartitionsWithRetention(ttlCondition, toRefreshPartitionMap, isMockPartitionIds); if (CollectionUtils.isNotEmpty(toRemovePartitions)) { toRemovePartitions.stream() - .forEach(p -> toRefreshPartitions.remove(PCellWithName.of(p, toRefreshPartitionMap.get(p)))); + .filter(p -> toRefreshPartitionMap.containsKey(p)) + .map(p -> PCellWithName.of(p, toRefreshPartitionMap.get(p))) + .forEach(toRefreshPartitions::remove); } } @@ -722,4 +741,15 @@ public abstract class MVPCTRefreshPartitioner { return result; } + /** + * Get the iterator of to refresh partitions according to config. + */ + public Iterator getToRefreshPartitionsIterator(PCellSortedSet toRefreshPartitions, + boolean isAscending) { + if (isAscending) { + return toRefreshPartitions.iterator(); + } else { + return toRefreshPartitions.descendingIterator(); + } + } } diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVPCTRefreshRangePartitioner.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVPCTRefreshRangePartitioner.java index 8c964db2412..af7f6c6d137 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVPCTRefreshRangePartitioner.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/mv/MVPCTRefreshRangePartitioner.java @@ -64,7 +64,6 @@ import org.apache.logging.log4j.Logger; import java.util.Collections; import java.util.Iterator; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Optional; @@ -413,51 +412,33 @@ public final class MVPCTRefreshRangePartitioner extends MVPCTRefreshPartitioner } @Override - public void filterPartitionByRefreshNumber(PCellSortedSet mvPartitionsToRefresh) { - filterPartitionByRefreshNumberInternal(mvPartitionsToRefresh, - MaterializedView.PartitionRefreshStrategy.STRICT); - } - - @Override - public void filterPartitionByAdaptiveRefreshNumber(PCellSortedSet mvPartitionsToRefresh) { - filterPartitionByRefreshNumberInternal(mvPartitionsToRefresh, MaterializedView.PartitionRefreshStrategy.ADAPTIVE); - } - - public void filterPartitionByRefreshNumberInternal(PCellSortedSet mvPartitionsToRefresh, - MaterializedView.PartitionRefreshStrategy refreshStrategy) { + public void filterPartitionByRefreshNumber(PCellSortedSet mvPartitionsToRefresh, + MaterializedView.PartitionRefreshStrategy refreshStrategy) { int partitionRefreshNumber = mv.getTableProperty().getPartitionRefreshNumber(); Map> mvRangePartitionMap = mv.getRangePartitionMap(); if (partitionRefreshNumber <= 0 || partitionRefreshNumber >= mvRangePartitionMap.size()) { return; } - Iterator inputIter = mvPartitionsToRefresh.iterator(); - while (inputIter.hasNext()) { - PCellWithName pCellWithName = inputIter.next(); - String mvPartitionName = pCellWithName.name(); - // skip if partition is not in the mv's partition range map - if (!mvRangePartitionMap.containsKey(mvPartitionName)) { - logger.warn("Partition {} is not in the materialized view's partition range map, " + - "remove it from refresh list", mvPartitionName); - mvPartitionsToRefresh.remove(pCellWithName); - } + // remove invalid cells from the input to-refresh partitions + mvPartitionsToRefresh + .stream() + .filter(pCell -> !mvRangePartitionMap.containsKey(pCell.name())) + .forEach(mvPartitionsToRefresh::remove); + + Iterator iterator = getToRefreshPartitionsIterator(mvPartitionsToRefresh, + Config.materialized_view_refresh_ascending); + // dynamically get the number of partitions to be refreshed this time + partitionRefreshNumber = getPartitionRefreshNumberAdaptive(mvPartitionsToRefresh, refreshStrategy); + if (partitionRefreshNumber <= 0 || mvPartitionsToRefresh.size() <= partitionRefreshNumber) { + return; } - LinkedList sortedPartition = mvPartitionsToRefresh - .stream() - .collect(Collectors.toCollection(LinkedList::new)); - Iterator sortedIterator = Config.materialized_view_refresh_ascending - ? sortedPartition.iterator() : sortedPartition.descendingIterator(); - // dynamically get the number of partitions to be refreshed this time - partitionRefreshNumber = getRefreshNumberByMode(sortedIterator, refreshStrategy); - - PCellWithName tmpPCellWithName = null; - for (int i = 0; i < partitionRefreshNumber; i++) { - if (sortedIterator.hasNext()) { - tmpPCellWithName = sortedIterator.next(); - sortedIterator.remove(); - } - + int i = 0; + while (i++ < partitionRefreshNumber && iterator.hasNext()) { + PCellWithName pCell = iterator.next(); + logger.debug("Materialized view [{}] to refresh partition name {}, value {}", + mv.getName(), pCell.name(), pCell.cell()); // NOTE: if mv's need to refresh partitions in the many-to-many mappings, no need to filter to // avoid data lose. // eg: @@ -477,41 +458,11 @@ public final class MVPCTRefreshRangePartitioner extends MVPCTRefreshPartitioner // BTW, since the refresh has already scanned the needed base tables' data, it's better to update // more mv's partitions as more as possible. // TODO: But it may cause much memory to refresh many partitions, support fine-grained partition refresh later. - if (!mvToRefreshPotentialPartitions.isEmpty() && mvToRefreshPotentialPartitions.contains(tmpPCellWithName.name())) { + if (!mvToRefreshPotentialPartitions.isEmpty() && mvToRefreshPotentialPartitions.contains(pCell.name())) { return; } } - if (!Config.materialized_view_refresh_ascending) { - sortedIterator = sortedPartition.iterator(); - } - setNextPartitionStartAndEnd(mvPartitionsToRefresh, sortedIterator); - } - public int getAdaptivePartitionRefreshNumber( - Iterator iterator) throws MVAdaptiveRefreshException { - Map>> mvToBaseNameRefs = mvContext.getMvRefBaseTableIntersectedPartitions(); - MVRefreshPartitionSelector mvRefreshPartitionSelector = - new MVRefreshPartitionSelector(Config.mv_max_rows_per_refresh, Config.mv_max_bytes_per_refresh, - Config.mv_max_partitions_num_per_refresh, mvContext.getExternalRefBaseTableMVPartitionMap()); - - int adaptiveRefreshNumber = 0; - while (iterator.hasNext()) { - PCellWithName pCellWithName = iterator.next(); - String mvRefreshPartition = pCellWithName.name(); - Map> refBaseTablesPartitions = mvToBaseNameRefs.get(mvRefreshPartition); - if (mvRefreshPartitionSelector.canAddPartition(refBaseTablesPartitions)) { - mvRefreshPartitionSelector.addPartition(refBaseTablesPartitions); - iterator.remove(); - adaptiveRefreshNumber++; - } else { - break; - } - } - return adaptiveRefreshNumber; - } - - private void setNextPartitionStartAndEnd(PCellSortedSet partitionsToRefresh, - Iterator iterator) { String nextPartitionStart = null; PCellWithName end = null; if (iterator.hasNext()) { @@ -520,12 +471,11 @@ public final class MVPCTRefreshRangePartitioner extends MVPCTRefreshPartitioner Range partitionKeyRange = pRangeCell.getRange(); nextPartitionStart = AnalyzerUtils.parseLiteralExprToDateString(partitionKeyRange.lowerEndpoint(), 0); end = start; - partitionsToRefresh.remove(end); + iterator.remove(); } while (iterator.hasNext()) { end = iterator.next(); iterator.remove(); - partitionsToRefresh.remove(end); } if (!mvRefreshParams.isTentative()) { diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/common/PCellSortedSet.java b/fe/fe-core/src/main/java/com/starrocks/sql/common/PCellSortedSet.java index abe3d07c8ce..609b2976b3c 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/common/PCellSortedSet.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/common/PCellSortedSet.java @@ -19,6 +19,7 @@ import com.starrocks.common.Config; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.NavigableSet; import java.util.Objects; import java.util.Set; import java.util.SortedSet; @@ -92,6 +93,10 @@ public record PCellSortedSet(SortedSet partitions) { partitions.forEach(action); } + public Iterator descendingIterator() { + return ((NavigableSet) partitions).descendingIterator(); + } + public Iterator iterator() { return partitions.iterator(); } diff --git a/fe/fe-core/src/main/java/com/starrocks/sql/common/PRangeCell.java b/fe/fe-core/src/main/java/com/starrocks/sql/common/PRangeCell.java index 7248866abce..2c4132fb322 100644 --- a/fe/fe-core/src/main/java/com/starrocks/sql/common/PRangeCell.java +++ b/fe/fe-core/src/main/java/com/starrocks/sql/common/PRangeCell.java @@ -117,9 +117,14 @@ public final class PRangeCell extends PCell { @Override public String toString() { - return "PRangeCell{" + - "range=" + range + - '}'; + return "[%s-%s]".formatted(toString(range.lowerEndpoint()), toString(range.upperEndpoint())); + } + + private String toString(PartitionKey partitionKey) { + if (partitionKey == null) { + return "null"; + } + return partitionKey.toSql(); } public static Map> toRangeMap(Map input) { diff --git a/fe/fe-core/src/test/java/com/starrocks/catalog/MaterializedViewTest.java b/fe/fe-core/src/test/java/com/starrocks/catalog/MaterializedViewTest.java index 08d73fba647..36128dc9a03 100644 --- a/fe/fe-core/src/test/java/com/starrocks/catalog/MaterializedViewTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/catalog/MaterializedViewTest.java @@ -1221,8 +1221,6 @@ public class MaterializedViewTest extends StarRocksTestBase { @Test public void testPartitionRefreshStrategy() { - Assertions.assertEquals(MaterializedView.PartitionRefreshStrategy.defaultValue(), - MaterializedView.PartitionRefreshStrategy.valueOf("STRICT")); String sql = "create materialized view test_mv1 " + "DISTRIBUTED BY HASH(`k2`) BUCKETS 3 \n" + "REFRESH MANUAL\n" + @@ -1236,7 +1234,7 @@ public class MaterializedViewTest extends StarRocksTestBase { Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb("test"); MaterializedView mv = ((MaterializedView) GlobalStateMgr.getCurrentState().getLocalMetastore() .getTable(db.getFullName(), "test_mv1")); - Assertions.assertEquals(mv.getPartitionRefreshStrategy(), MaterializedView.PartitionRefreshStrategy.STRICT); + Assertions.assertEquals(mv.getPartitionRefreshStrategy(), MaterializedView.PartitionRefreshStrategy.ADAPTIVE); }); } } diff --git a/fe/fe-core/src/test/java/com/starrocks/planner/QueryCacheAndMVTest.java b/fe/fe-core/src/test/java/com/starrocks/planner/QueryCacheAndMVTest.java index f542cce74fb..12c3144de06 100644 --- a/fe/fe-core/src/test/java/com/starrocks/planner/QueryCacheAndMVTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/planner/QueryCacheAndMVTest.java @@ -177,8 +177,7 @@ public class QueryCacheAndMVTest extends MVTestBase { "DISTRIBUTED BY HASH(`id`) BUCKETS 10\n" + "REFRESH DEFERRED MANUAL\n" + "PROPERTIES (\n" + - "\"replication_num\" = \"1\",\n" + - "\"storage_medium\" = \"HDD\"\n" + + "\"replication_num\" = \"1\"" + ")\n" + "AS SELECT id, data, ts FROM `iceberg0`.`partitioned_transforms_db`.`t0_year` as a;"); Database testDb = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb("test"); diff --git a/fe/fe-core/src/test/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessorOlapPart2Test.java b/fe/fe-core/src/test/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessorOlapPart2Test.java index c811bf53062..8a6a9adb151 100644 --- a/fe/fe-core/src/test/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessorOlapPart2Test.java +++ b/fe/fe-core/src/test/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessorOlapPart2Test.java @@ -418,8 +418,6 @@ public class PartitionBasedMvRefreshProcessorOlapPart2Test extends MVTestBase { Database testDb = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb("test"); MaterializedView mv = ((MaterializedView) testDb.getTable(mvName)); TaskManager tm = GlobalStateMgr.getCurrentState().getTaskManager(); - long taskId = tm.getTask(TaskBuilder.getMvTaskName(mv.getId())).getId(); - TaskRunScheduler taskRunScheduler = tm.getTaskRunScheduler(); executeInsertSql(connectContext, "insert into tbl6 partition(p1) values('2022-01-02',2,10);"); executeInsertSql(connectContext, "insert into tbl6 partition(p2) values('2022-02-02',2,10);"); @@ -428,15 +426,12 @@ public class PartitionBasedMvRefreshProcessorOlapPart2Test extends MVTestBase { taskRunProperties.put(TaskRun.FORCE, Boolean.toString(true)); Task task = TaskBuilder.buildMvTask(mv, testDb.getFullName()); TaskRun taskRun = TaskRunBuilder.newBuilder(task).build(); - initAndExecuteTaskRun(taskRun); + executeTaskRun(taskRun); + TGetTasksParams params = new TGetTasksParams(); params.setTask_name(task.getName()); List statuses = tm.getMatchedTaskRunStatus(params); - while (statuses.size() != 1) { - statuses = tm.getMatchedTaskRunStatus(params); - Thread.sleep(100); - } - Assertions.assertEquals(1, statuses.size()); + Assertions.assertEquals(2, statuses.size()); TaskRunStatus status = statuses.get(0); // default priority for next refresh batch is Constants.TaskRunPriority.HIGHER.value() Assertions.assertEquals(Constants.TaskRunPriority.HIGHER.value(), status.getPriority()); @@ -487,16 +482,13 @@ public class PartitionBasedMvRefreshProcessorOlapPart2Test extends MVTestBase { Task task = TaskBuilder.buildMvTask(mv, testDb.getFullName()); ExecuteOption executeOption = new ExecuteOption(70, false, new HashMap<>()); TaskRun taskRun = TaskRunBuilder.newBuilder(task).setExecuteOption(executeOption).build(); - initAndExecuteTaskRun(taskRun); + executeTaskRun(taskRun); + TGetTasksParams params = new TGetTasksParams(); params.setTask_name(task.getName()); TaskManager tm = GlobalStateMgr.getCurrentState().getTaskManager(); List statuses = tm.getMatchedTaskRunStatus(params); - while (statuses.size() != 1) { - statuses = tm.getMatchedTaskRunStatus(params); - Thread.sleep(100); - } - Assertions.assertEquals(1, statuses.size()); + Assertions.assertEquals(2, statuses.size()); TaskRunStatus status = statuses.get(0); // the priority for next refresh batch is 70 which is specified in executeOption Assertions.assertEquals(70, status.getPriority()); diff --git a/fe/fe-core/src/test/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessorOlapTest.java b/fe/fe-core/src/test/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessorOlapTest.java index 6f58ee106e9..ef9b6a96d88 100644 --- a/fe/fe-core/src/test/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessorOlapTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/scheduler/PartitionBasedMvRefreshProcessorOlapTest.java @@ -988,7 +988,8 @@ public class PartitionBasedMvRefreshProcessorOlapTest extends MVTestBase { MVPCTBasedRefreshProcessor processor = getPartitionBasedRefreshProcessor(taskRun); MVPCTRefreshPartitioner partitioner = processor.getMvRefreshPartitioner(); PCellSortedSet mvToRefreshPartitionNames = getMVPCellWithNames(mv, mv.getPartitionNames()); - partitioner.filterPartitionByAdaptiveRefreshNumber(mvToRefreshPartitionNames); + partitioner.filterPartitionByRefreshNumber(mvToRefreshPartitionNames, + MaterializedView.PartitionRefreshStrategy.ADAPTIVE); MvTaskRunContext mvContext = processor.getMvContext(); Assertions.assertNull(mvContext.getNextPartitionStart()); Assertions.assertNull(mvContext.getNextPartitionEnd()); @@ -1018,7 +1019,8 @@ public class PartitionBasedMvRefreshProcessorOlapTest extends MVTestBase { MVPCTBasedRefreshProcessor processor = getPartitionBasedRefreshProcessor(taskRun); MVPCTRefreshPartitioner partitioner = processor.getMvRefreshPartitioner(); - partitioner.filterPartitionByAdaptiveRefreshNumber(getMVPCellWithNames(mv, mv.getPartitionNames())); + partitioner.filterPartitionByRefreshNumber(getMVPCellWithNames(mv, mv.getPartitionNames()), + MaterializedView.PartitionRefreshStrategy.ADAPTIVE); MvTaskRunContext mvContext = processor.getMvContext(); Assertions.assertNull(mvContext.getNextPartitionStart()); Assertions.assertNull(mvContext.getNextPartitionEnd()); @@ -1061,7 +1063,8 @@ public class PartitionBasedMvRefreshProcessorOlapTest extends MVTestBase { MVPCTBasedRefreshProcessor processor = getPartitionBasedRefreshProcessor(taskRun); MVPCTRefreshPartitioner partitioner = processor.getMvRefreshPartitioner(); - partitioner.filterPartitionByAdaptiveRefreshNumber(getMVPCellWithNames(mv, mv.getPartitionNames())); + partitioner.filterPartitionByRefreshNumber(getMVPCellWithNames(mv, mv.getPartitionNames()), + MaterializedView.PartitionRefreshStrategy.ADAPTIVE); MvTaskRunContext mvContext = processor.getMvContext(); Assertions.assertNull(mvContext.getNextPartitionStart()); Assertions.assertNull(mvContext.getNextPartitionEnd()); @@ -1120,19 +1123,22 @@ public class PartitionBasedMvRefreshProcessorOlapTest extends MVTestBase { // round 1 PCellSortedSet toRefresh = getMVPCellWithNames(mv, allPartitions); - partitioner.filterPartitionByRefreshNumber(toRefresh); + partitioner.filterPartitionByRefreshNumber(toRefresh, + MaterializedView.PartitionRefreshStrategy.STRICT); assertPCellNameEquals("p0", toRefresh); refreshedPartitions.addAll(getPartitionNames(toRefresh)); // round 2 toRefresh = getMVPCellWithNames(mv, SetUtils.disjunction(allPartitions, refreshedPartitions)); - partitioner.filterPartitionByRefreshNumber(toRefresh); + partitioner.filterPartitionByRefreshNumber(toRefresh, + MaterializedView.PartitionRefreshStrategy.STRICT); assertPCellNameEquals("p1", toRefresh); refreshedPartitions.addAll(getPartitionNames(toRefresh)); // round 3 toRefresh = getMVPCellWithNames(mv, SetUtils.disjunction(allPartitions, refreshedPartitions)); - partitioner.filterPartitionByRefreshNumber(toRefresh); + partitioner.filterPartitionByRefreshNumber(toRefresh, + MaterializedView.PartitionRefreshStrategy.STRICT); assertPCellNameEquals("p2", toRefresh); refreshedPartitions.addAll(getPartitionNames(toRefresh)); } @@ -1144,31 +1150,36 @@ public class PartitionBasedMvRefreshProcessorOlapTest extends MVTestBase { // round 1 PCellSortedSet toRefresh = getMVPCellWithNames(mv, allPartitions); - partitioner.filterPartitionByRefreshNumber(toRefresh); + partitioner.filterPartitionByRefreshNumber(toRefresh, + MaterializedView.PartitionRefreshStrategy.STRICT); assertPCellNameEquals("p4", toRefresh); refreshedPartitions.addAll(getPartitionNames(toRefresh)); // round 2 toRefresh = getMVPCellWithNames(mv, SetUtils.disjunction(allPartitions, refreshedPartitions)); - partitioner.filterPartitionByRefreshNumber(toRefresh); + partitioner.filterPartitionByRefreshNumber(toRefresh, + MaterializedView.PartitionRefreshStrategy.STRICT); assertPCellNameEquals("p3", toRefresh); refreshedPartitions.addAll(getPartitionNames(toRefresh)); // round 3 toRefresh = getMVPCellWithNames(mv, SetUtils.disjunction(allPartitions, refreshedPartitions)); - partitioner.filterPartitionByRefreshNumber(toRefresh); + partitioner.filterPartitionByRefreshNumber(toRefresh, + MaterializedView.PartitionRefreshStrategy.STRICT); assertPCellNameEquals("p2", toRefresh); refreshedPartitions.addAll(getPartitionNames(toRefresh)); // round 4 toRefresh = getMVPCellWithNames(mv, SetUtils.disjunction(allPartitions, refreshedPartitions)); - partitioner.filterPartitionByRefreshNumber(toRefresh); + partitioner.filterPartitionByRefreshNumber(toRefresh, + MaterializedView.PartitionRefreshStrategy.STRICT); assertPCellNameEquals("p1", toRefresh); refreshedPartitions.addAll(getPartitionNames(toRefresh)); // round 5 toRefresh = getMVPCellWithNames(mv, SetUtils.disjunction(allPartitions, refreshedPartitions)); - partitioner.filterPartitionByRefreshNumber(toRefresh); + partitioner.filterPartitionByRefreshNumber(toRefresh, + MaterializedView.PartitionRefreshStrategy.STRICT); assertPCellNameEquals("p0", toRefresh); refreshedPartitions.addAll(getPartitionNames(toRefresh)); Config.materialized_view_refresh_ascending = true; diff --git a/fe/fe-core/src/test/java/com/starrocks/scheduler/mv/MVPCTRefreshNonPartitionerTest.java b/fe/fe-core/src/test/java/com/starrocks/scheduler/mv/MVPCTRefreshNonPartitionerTest.java index 2e2c4abd1dd..450469df2c8 100644 --- a/fe/fe-core/src/test/java/com/starrocks/scheduler/mv/MVPCTRefreshNonPartitionerTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/scheduler/mv/MVPCTRefreshNonPartitionerTest.java @@ -18,14 +18,11 @@ import com.starrocks.catalog.Database; import com.starrocks.catalog.MaterializedView; import com.starrocks.scheduler.MvTaskRunContext; import com.starrocks.scheduler.TaskRunContext; -import com.starrocks.sql.common.PCellWithName; +import com.starrocks.sql.common.PCellSortedSet; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.mockito.Mockito; -import java.util.Collections; -import java.util.Iterator; - public class MVPCTRefreshNonPartitionerTest { @Test @@ -37,8 +34,7 @@ public class MVPCTRefreshNonPartitionerTest { MVRefreshParams mvRefreshParams = Mockito.mock(MVRefreshParams.class); MVPCTRefreshNonPartitioner job = new MVPCTRefreshNonPartitioner(mvTaskRunContext, taskRunContext, database, mv, mvRefreshParams); - Iterator dummyIter = Collections.emptyIterator(); - int result = job.getAdaptivePartitionRefreshNumber(dummyIter); + int result = job.getAdaptivePartitionRefreshNumber(PCellSortedSet.of()); Assertions.assertEquals(0, result); } } diff --git a/fe/fe-core/src/test/java/com/starrocks/scheduler/mv/MVPCTRefreshRangePartitionerTest.java b/fe/fe-core/src/test/java/com/starrocks/scheduler/mv/MVPCTRefreshRangePartitionerTest.java index cc2fbacf566..46bdc5235a5 100644 --- a/fe/fe-core/src/test/java/com/starrocks/scheduler/mv/MVPCTRefreshRangePartitionerTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/scheduler/mv/MVPCTRefreshRangePartitionerTest.java @@ -31,7 +31,6 @@ import org.mockito.Mockito; import java.util.Arrays; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -47,6 +46,7 @@ public class MVPCTRefreshRangePartitionerTest { MaterializedView mv = mock(MaterializedView.class); when(mv.getTableProperty()).thenReturn(mock(TableProperty.class)); when(mv.getPartitionInfo()).thenReturn(mock(PartitionInfo.class)); + when(mv.isPartitionedTable()).thenReturn(true); OlapTable refTable1 = Mockito.mock(OlapTable.class); Set refTablePartition1 = Set.of("partition1", "partition2"); @@ -67,12 +67,11 @@ public class MVPCTRefreshRangePartitionerTest { // TODO: make range cells List partitions = Arrays.asList(PCellWithName.of("mv_p1", new PCellNone()), PCellWithName.of("mv_p2", new PCellNone())); - Iterator iter = partitions.iterator(); MVRefreshParams mvRefreshParams = new MVRefreshParams(mv, new HashMap<>()); MVPCTRefreshRangePartitioner partitioner = new MVPCTRefreshRangePartitioner(mvContext, null, null, mv, mvRefreshParams); MVAdaptiveRefreshException exception = Assertions.assertThrows(MVAdaptiveRefreshException.class, - () -> partitioner.getAdaptivePartitionRefreshNumber(iter)); + () -> partitioner.getAdaptivePartitionRefreshNumber(PCellSortedSet.of(partitions))); Assertions.assertTrue(exception.getMessage().contains("Missing too many partition stats")); } diff --git a/fe/fe-core/src/test/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MVTestBase.java b/fe/fe-core/src/test/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MVTestBase.java index fbcfa1f2b87..543bd6dfa95 100644 --- a/fe/fe-core/src/test/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MVTestBase.java +++ b/fe/fe-core/src/test/java/com/starrocks/sql/optimizer/rule/transformation/materialization/MVTestBase.java @@ -38,14 +38,17 @@ import com.starrocks.persist.gson.GsonUtils; import com.starrocks.pseudocluster.PseudoCluster; import com.starrocks.qe.ConnectContext; import com.starrocks.qe.StmtExecutor; +import com.starrocks.scheduler.Constants; import com.starrocks.scheduler.ExecuteOption; import com.starrocks.scheduler.MVTaskRunProcessor; import com.starrocks.scheduler.MvTaskRunContext; +import com.starrocks.scheduler.SubmitResult; import com.starrocks.scheduler.Task; import com.starrocks.scheduler.TaskBuilder; import com.starrocks.scheduler.TaskManager; import com.starrocks.scheduler.TaskRun; import com.starrocks.scheduler.TaskRunBuilder; +import com.starrocks.scheduler.TaskRunManager; import com.starrocks.scheduler.TaskRunProcessor; import com.starrocks.scheduler.mv.BaseTableSnapshotInfo; import com.starrocks.scheduler.mv.MVPCTBasedRefreshProcessor; @@ -101,6 +104,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -455,6 +459,17 @@ public abstract class MVTestBase extends StarRocksTestBase { return result; } + protected static void executeTaskRun(TaskRun taskRun) throws Exception { + // ensure taskRun is initialized + taskRun.setStatus(null); + TaskManager taskManager = GlobalStateMgr.getCurrentState().getTaskManager(); + TaskRunManager taskRunManager = taskManager.getTaskRunManager(); + SubmitResult result = taskRunManager.submitTaskRun(taskRun); + Assertions.assertTrue(result.getStatus().equals(SubmitResult.SubmitStatus.SUBMITTED)); + Constants.TaskRunState state = result.getFuture().get(300000, TimeUnit.MILLISECONDS); + Assertions.assertTrue(state.isFinishState()); + } + protected static void initAndExecuteTaskRun(TaskRun taskRun) throws Exception { taskRun.initStatus(UUIDUtil.genUUID().toString(), System.currentTimeMillis()); taskRun.executeTaskRun();