[Enhancement] Change default_mv_partition_refresh_strategy to adaptive by default (#63594)

Signed-off-by: shuming.li <ming.moriarty@gmail.com>
This commit is contained in:
shuming.li 2025-10-11 11:33:11 +08:00 committed by GitHub
parent 78945fde78
commit ee66eb3b3f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
18 changed files with 260 additions and 250 deletions

View File

@ -159,7 +159,16 @@ public class MaterializedView extends OlapTable implements GsonPreProcessable, G
ADAPTIVE;
public static PartitionRefreshStrategy defaultValue() {
return STRICT;
return ADAPTIVE;
}
/**
 * Parses a partition refresh strategy from its textual name.
 *
 * <p>Surrounding whitespace is ignored and matching is case-insensitive. A {@code null} or
 * unrecognized value is logged and replaced by {@link #defaultValue()}.
 *
 * @param val the strategy name, may be {@code null}
 * @return the matching strategy, or the default strategy when {@code val} cannot be parsed
 */
public static PartitionRefreshStrategy of(String val) {
    // Handle null explicitly instead of relying on a caught NullPointerException.
    if (val == null) {
        LOG.warn("Failed to get partition refresh strategy, use default value");
        return PartitionRefreshStrategy.defaultValue();
    }
    try {
        // Upper-case with Locale.ROOT so the result does not depend on the JVM default locale
        // (with a Turkish default locale, "strict" would otherwise never match "STRICT").
        return PartitionRefreshStrategy.valueOf(val.trim().toUpperCase(java.util.Locale.ROOT));
    } catch (IllegalArgumentException e) {
        // valueOf rejects any name that is not an enum constant (including the empty string).
        LOG.warn("Failed to get partition refresh strategy, use default value", e);
        return PartitionRefreshStrategy.defaultValue();
    }
}
}
@ -1396,15 +1405,25 @@ public class MaterializedView extends OlapTable implements GsonPreProcessable, G
* Return the partition refresh strategy of the materialized view.
*/
public PartitionRefreshStrategy getPartitionRefreshStrategy() {
if (tableProperty == null || tableProperty.getPartitionRefreshStrategy() == null) {
TableProperty tableProperty = getTableProperty();
if (tableProperty == null) {
return PartitionRefreshStrategy.defaultValue();
}
try {
return PartitionRefreshStrategy.valueOf(tableProperty.getPartitionRefreshStrategy().trim().toUpperCase());
} catch (Exception e) {
LOG.warn("Failed to get partition refresh strategy for materialized view: {}, use default value",
this.getName(), e);
return PartitionRefreshStrategy.defaultValue();
// otherwise, use `partition_refresh_strategy` property.
PartitionRefreshStrategy partitionRefreshStrategy =
PartitionRefreshStrategy.of(tableProperty.getPartitionRefreshStrategy());
if (tableProperty.isSetPartitionRefreshStrategy()
&& !PartitionRefreshStrategy.STRICT.equals(partitionRefreshStrategy)) {
return partitionRefreshStrategy;
} else {
// here the strategy property is either unset or STRICT.
// for compatibility: if the mv has `partition_refresh_number` explicitly set, use the STRICT strategy;
// otherwise fall back to the default strategy.
if (tableProperty.isSetPartitionRefreshNumber()) {
return PartitionRefreshStrategy.STRICT;
} else {
return PartitionRefreshStrategy.defaultValue();
}
}
}

View File

@ -209,15 +209,15 @@ public class TableProperty implements Writable, GsonPostProcessable {
// This property only applies to materialized views
// It represents the maximum number of partitions that will be refreshed by a TaskRun refresh
private int partitionRefreshNumber = Config.default_mv_partition_refresh_number;
private int partitionRefreshNumber = INVALID;
// This property only applies to materialized views
// It represents the mode selected to determine the number of partitions to refresh
private String partitionRefreshStrategy = Config.default_mv_partition_refresh_strategy;
private String partitionRefreshStrategy = "";
// This property only applies to materialized views
// It represents the mode selected to determine how to refresh the materialized view
private String mvRefreshMode = Config.default_mv_refresh_mode;
private String mvRefreshMode = "";
// This property only applies to materialized views
// When using the system to automatically refresh, the maximum range of the most recent partitions will be refreshed.
@ -994,12 +994,21 @@ public class TableProperty implements Writable, GsonPostProcessable {
this.autoRefreshPartitionsLimit = autoRefreshPartitionsLimit;
}
/**
 * Tells whether {@code partitionRefreshNumber} holds something other than the {@code INVALID} sentinel.
 *
 * @return {@code true} when the stored number differs from {@code INVALID}
 */
public boolean isSetPartitionRefreshNumber() {
    final boolean stillInvalid = partitionRefreshNumber == INVALID;
    return !stillInvalid;
}
public int getPartitionRefreshNumber() {
return partitionRefreshNumber;
return partitionRefreshNumber == INVALID ? Config.default_mv_partition_refresh_number : partitionRefreshNumber;
}
/**
 * Tells whether {@code partitionRefreshStrategy} holds a non-null, non-empty value.
 *
 * @return {@code true} when the stored strategy string is neither {@code null} nor empty
 */
public boolean isSetPartitionRefreshStrategy() {
    final boolean missing = Strings.isNullOrEmpty(partitionRefreshStrategy);
    return !missing;
}
public String getPartitionRefreshStrategy() {
return partitionRefreshStrategy;
return Strings.isNullOrEmpty(partitionRefreshStrategy) ? Config.default_mv_partition_refresh_strategy
: partitionRefreshStrategy;
}
public void setPartitionRefreshNumber(int partitionRefreshNumber) {
@ -1015,7 +1024,7 @@ public class TableProperty implements Writable, GsonPostProcessable {
}
public String getMvRefreshMode() {
return mvRefreshMode;
return Strings.isNullOrEmpty(mvRefreshMode) ? Config.default_mv_refresh_mode : mvRefreshMode;
}
public void setResourceGroup(String resourceGroup) {

View File

@ -3396,8 +3396,8 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, comment = "The default try lock timeout for mv refresh to try base table/mv dbs' lock")
public static int mv_refresh_try_lock_timeout_ms = 30 * 1000;
@ConfField(mutable = true, comment = "materialized view can refresh at most 10 partition at a time")
public static int mv_max_partitions_num_per_refresh = 10;
@ConfField(mutable = true, comment = "materialized view can refresh at most 64 partition at a time")
public static int mv_max_partitions_num_per_refresh = 64;
@ConfField(mutable = true, comment = "materialized view can refresh at most 100_000_000 rows of data at a time")
public static long mv_max_rows_per_refresh = 100_000_000L;
@ -3428,8 +3428,9 @@ public class Config extends ConfigBase {
@ConfField(mutable = true)
public static int default_mv_partition_refresh_number = 1;
@ConfField(mutable = true)
public static String default_mv_partition_refresh_strategy = "strict";
@ConfField(mutable = true, comment = "The default refresh strategy for materialized view partition refresh, " +
"adaptive by default")
public static String default_mv_partition_refresh_strategy = "adaptive";
@ConfField(mutable = true)
public static String default_mv_refresh_mode = "pct";

View File

@ -222,6 +222,11 @@ public class TaskRun implements Comparable<TaskRun> {
return Math.min(Math.min(defaultTimeoutS, Config.task_runs_ttl_second), Config.task_ttl_second);
}
/**
 * Replaces the status object held by this task run; annotated as visible for testing.
 *
 * @param status the value assigned to {@code this.status}
 */
@VisibleForTesting
public void setStatus(TaskRunStatus status) {
this.status = status;
}
public Map<String, String> refreshTaskProperties(ConnectContext ctx) {
Map<String, String> newProperties = Maps.newHashMap();
if (task.getSource() != Constants.TaskSource.MV) {

View File

@ -28,6 +28,7 @@ import com.starrocks.scheduler.history.TaskRunHistory;
import com.starrocks.scheduler.persist.TaskRunStatus;
import com.starrocks.scheduler.persist.TaskRunStatusChange;
import com.starrocks.server.GlobalStateMgr;
import org.apache.arrow.util.VisibleForTesting;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@ -58,6 +59,11 @@ public class TaskRunManager implements MemoryTrackable {
this.taskRunScheduler = taskRunScheduler;
}
/**
 * Submits a task run using the execute option the task run itself carries.
 *
 * @param taskRun the task run to submit; its {@code getExecuteOption()} result is forwarded
 * @return the result of {@code submitTaskRun(taskRun, option)}
 */
public SubmitResult submitTaskRun(TaskRun taskRun) {
    final ExecuteOption ownOption = taskRun.getExecuteOption();
    return submitTaskRun(taskRun, ownOption);
}
@VisibleForTesting
public SubmitResult submitTaskRun(TaskRun taskRun, ExecuteOption option) {
LOG.info("submit task run:{}", taskRun);

View File

@ -423,40 +423,40 @@ public final class MVPCTRefreshListPartitioner extends MVPCTRefreshPartitioner {
}
@Override
public void filterPartitionByRefreshNumber(PCellSortedSet mvPartitionsToRefresh) {
filterPartitionByRefreshNumberInternal(mvPartitionsToRefresh, MaterializedView.PartitionRefreshStrategy.STRICT);
}
@Override
public void filterPartitionByAdaptiveRefreshNumber(PCellSortedSet mvPartitionsToRefresh) {
filterPartitionByRefreshNumberInternal(mvPartitionsToRefresh, MaterializedView.PartitionRefreshStrategy.ADAPTIVE);
}
public void filterPartitionByRefreshNumberInternal(PCellSortedSet toRefreshPartitions,
MaterializedView.PartitionRefreshStrategy refreshStrategy) {
public void filterPartitionByRefreshNumber(PCellSortedSet toRefreshPartitions,
MaterializedView.PartitionRefreshStrategy refreshStrategy) {
// filter by partition ttl
filterPartitionsByTTL(toRefreshPartitions, false);
if (toRefreshPartitions == null || toRefreshPartitions.isEmpty()) {
return;
}
Iterator<PCellWithName> iterator = toRefreshPartitions.iterator();
// filter invalid cells from input
toRefreshPartitions.stream()
.filter(cell -> !mv.getListPartitionItems().containsKey(cell.name()))
.forEach(toRefreshPartitions::remove);
// dynamically get the number of partitions to be refreshed this time
int refreshNumber = getRefreshNumberByMode(iterator, refreshStrategy);
if (toRefreshPartitions.size() <= refreshNumber) {
int partitionRefreshNumber = getPartitionRefreshNumberAdaptive(toRefreshPartitions, refreshStrategy);
if (partitionRefreshNumber <= 0 || partitionRefreshNumber >= toRefreshPartitions.size()) {
return;
}
// iterate refreshNumber times
Set<PListCell> nextPartitionValues = Sets.newHashSet();
int i = 0;
while (iterator.hasNext() && i++ < refreshNumber) {
// refresh the recent partitions first
Iterator<PCellWithName> iterator = getToRefreshPartitionsIterator(toRefreshPartitions, false);
while (i++ < partitionRefreshNumber && iterator.hasNext()) {
PCellWithName pCell = iterator.next();
logger.debug("Materialized view [{}] to refresh partition name {}, value {}",
mv.getName(), pCell.name(), pCell.cell());
}
// get next partition values for next refresh
Set<PListCell> nextPartitionValues = Sets.newHashSet();
while (iterator.hasNext()) {
PCellWithName pCell = iterator.next();
nextPartitionValues.add((PListCell) pCell.cell());
toRefreshPartitions.remove(pCell);
iterator.remove();
}
logger.info("Filter partitions by refresh number, ttl_number:{}, result:{}, remains:{}",
refreshNumber, toRefreshPartitions, nextPartitionValues);
partitionRefreshNumber, toRefreshPartitions, nextPartitionValues);
if (CollectionUtils.isEmpty(nextPartitionValues)) {
return;
}
@ -467,26 +467,6 @@ public final class MVPCTRefreshListPartitioner extends MVPCTRefreshPartitioner {
}
}
public int getAdaptivePartitionRefreshNumber(Iterator<PCellWithName> iterator) throws MVAdaptiveRefreshException {
Map<String, Map<Table, Set<String>>> mvToBaseNameRefs = mvContext.getMvRefBaseTableIntersectedPartitions();
MVRefreshPartitionSelector mvRefreshPartitionSelector =
new MVRefreshPartitionSelector(Config.mv_max_rows_per_refresh, Config.mv_max_bytes_per_refresh,
Config.mv_max_partitions_num_per_refresh, mvContext.getExternalRefBaseTableMVPartitionMap());
int adaptiveRefreshNumber = 0;
while (iterator.hasNext()) {
PCellWithName cellWithName = iterator.next();
String partitionName = cellWithName.name();
Map<Table, Set<String>> refPartitionInfos = mvToBaseNameRefs.get(partitionName);
if (mvRefreshPartitionSelector.canAddPartition(refPartitionInfos)) {
mvRefreshPartitionSelector.addPartition(refPartitionInfos);
adaptiveRefreshNumber++;
} else {
break;
}
}
return adaptiveRefreshNumber;
}
private void addListPartitions(Database database, MaterializedView materializedView,
Map<String, PCell> adds, Map<String, String> partitionProperties,
DistributionDesc distributionDesc) {

View File

@ -27,7 +27,6 @@ import com.starrocks.sql.common.PCellNone;
import com.starrocks.sql.common.PCellSortedSet;
import com.starrocks.sql.common.PCellWithName;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -94,20 +93,11 @@ public final class MVPCTRefreshNonPartitioner extends MVPCTRefreshPartitioner {
}
@Override
public void filterPartitionByRefreshNumber(PCellSortedSet mvPartitionsToRefresh) {
public void filterPartitionByRefreshNumber(PCellSortedSet mvPartitionsToRefresh,
MaterializedView.PartitionRefreshStrategy partitionRefreshStrategy) {
// do nothing
}
@Override
public void filterPartitionByAdaptiveRefreshNumber(PCellSortedSet mvPartitionsToRefresh) {
// do nothing
}
@Override
protected int getAdaptivePartitionRefreshNumber(Iterator<PCellWithName> partitionNameIter) throws MVAdaptiveRefreshException {
return 0;
}
@Override
public boolean isCalcPotentialRefreshPartition(Map<Table, PCellSortedSet> baseChangedPartitionNames,
PCellSortedSet mvPartitions) {

View File

@ -172,18 +172,14 @@ public abstract class MVPCTRefreshPartitioner {
*/
public abstract PCellSortedSet getMVPartitionNamesWithTTL(boolean isAutoRefresh) throws AnalysisException;
/**
* Filter to refresh partitions by adaptive refresh number.
* @param mvPartitionsToRefresh: mv partitions to refresh.
*/
public abstract void filterPartitionByAdaptiveRefreshNumber(PCellSortedSet mvPartitionsToRefresh);
/**
* Filter to refresh partitions by partition refresh number.
* @param partitionsToRefresh: partitions to refresh.
*/
@VisibleForTesting
public abstract void filterPartitionByRefreshNumber(PCellSortedSet partitionsToRefresh);
public abstract void filterPartitionByRefreshNumber(
PCellSortedSet partitionsToRefresh,
MaterializedView.PartitionRefreshStrategy refreshStrategy);
/**
* Check whether to calculate the potential partitions to refresh or not. When the base table changed partitions
@ -308,65 +304,29 @@ public abstract class MVPCTRefreshPartitioner {
if (mvToRefreshedPartitions.isEmpty() || mvToRefreshedPartitions.size() <= 1) {
return;
}
// refresh all partition when it's a sync refresh, otherwise updated partitions may be lost.
ExecuteOption executeOption = mvContext.getExecuteOption();
if (executeOption != null && executeOption.getIsSync()) {
return;
}
// ignore if mv is not partitioned.
if (!mv.isPartitionedTable()) {
return;
}
// filter partitions by partition refresh strategy
final MaterializedView.PartitionRefreshStrategy partitionRefreshStrategy = mv.getPartitionRefreshStrategy();
boolean hasUnsupportedTableType = mv.getBaseTableTypes().stream()
.anyMatch(type -> !SUPPORTED_TABLE_TYPES_FOR_ADAPTIVE_MV_REFRESH.contains(type));
if (hasUnsupportedTableType) {
logger.warn("Materialized view {} contains unsupported external tables. Using default refresh strategy.",
mv.getId());
filterPartitionWithStrict(mvToRefreshedPartitions);
filterPartitionByRefreshNumber(mvToRefreshedPartitions, MaterializedView.PartitionRefreshStrategy.STRICT);
} else {
switch (partitionRefreshStrategy) {
case ADAPTIVE:
filterPartitionWithAdaptive(mvToRefreshedPartitions);
break;
case STRICT:
default:
// Only refresh the first partition refresh number partitions, other partitions will generate new tasks
filterPartitionWithStrict(mvToRefreshedPartitions);
}
MaterializedView.PartitionRefreshStrategy partitionRefreshStrategy = mv.getPartitionRefreshStrategy();
filterPartitionByRefreshNumber(mvToRefreshedPartitions, partitionRefreshStrategy);
}
logger.info("after filterPartitionByAdaptive, partitionsToRefresh: {}",
mvToRefreshedPartitions);
}
public void filterPartitionWithStrict(PCellSortedSet partitionsToRefresh) {
// refresh all partition when it's a sync refresh, otherwise updated partitions may be lost.
ExecuteOption executeOption = mvContext.getExecuteOption();
if (executeOption != null && executeOption.getIsSync()) {
return;
}
// ignore if mv is not partitioned.
if (!mv.isPartitionedTable()) {
return;
}
// ignore if partition_fresh_limit is not set
int partitionRefreshNumber = mv.getTableProperty().getPartitionRefreshNumber();
if (partitionRefreshNumber <= 0) {
return;
}
// do filter actions
filterPartitionByRefreshNumber(partitionsToRefresh);
}
public void filterPartitionWithAdaptive(PCellSortedSet partitionsToRefresh) {
// refresh all partition when it's a sync refresh, otherwise updated partitions may be lost.
ExecuteOption executeOption = mvContext.getExecuteOption();
if (executeOption != null && executeOption.getIsSync()) {
return;
}
// ignore if mv is not partitioned.
if (!mv.isPartitionedTable()) {
return;
}
// do filter actions
filterPartitionByAdaptiveRefreshNumber(partitionsToRefresh);
}
/**
* Determines the number of partitions to refresh based on the given refresh strategy.
*
@ -386,29 +346,86 @@ public abstract class MVPCTRefreshPartitioner {
* may fail due to missing or invalid metadata. In such cases, the method automatically
* falls back to the STRICT strategy to ensure the MV can still be refreshed correctly.
*
* @param sortedPartitionIterator Iterator over sorted partition names to be refreshed.
* @param refreshStrategy The refresh strategy: either ADAPTIVE or STRICT.
* @param toRefreshPartitions sorted partition names to be refreshed.
* @param refreshStrategy The refresh strategy: either ADAPTIVE or STRICT.
* @return The number of partitions to refresh.
*/
public int getRefreshNumberByMode(Iterator<PCellWithName> sortedPartitionIterator,
MaterializedView.PartitionRefreshStrategy refreshStrategy) {
public int getPartitionRefreshNumberAdaptive(PCellSortedSet toRefreshPartitions,
MaterializedView.PartitionRefreshStrategy refreshStrategy) {
try {
switch (refreshStrategy) {
case ADAPTIVE:
return getAdaptivePartitionRefreshNumber(sortedPartitionIterator);
return getAdaptivePartitionRefreshNumber(toRefreshPartitions);
case STRICT:
default:
return mv.getTableProperty().getPartitionRefreshNumber();
return getRefreshNumberByDefaultMode(toRefreshPartitions);
}
} catch (MVAdaptiveRefreshException e) {
} catch (Exception e) {
logger.warn("Adaptive refresh failed for mode '{}', falling back to STRICT mode. Reason: {}",
refreshStrategy, e.getMessage(), e);
return mv.getTableProperty().getPartitionRefreshNumber();
return getRefreshNumberByDefaultMode(toRefreshPartitions);
}
}
protected abstract int getAdaptivePartitionRefreshNumber(Iterator<PCellWithName> partitionNameIter)
throws MVAdaptiveRefreshException;
/**
 * Computes the number of partitions to refresh per task run for the non-adaptive path.
 *
 * <p>The configured {@code partition_refresh_number} is returned unchanged when any of the following holds:
 * <ul>
 *   <li>the user has set {@code partition_refresh_number}, or its effective value differs from
 *       {@code Config.default_mv_partition_refresh_number};</li>
 *   <li>the user has explicitly set {@code partition_refresh_strategy} to STRICT;</li>
 *   <li>the number of partitions to refresh does not exceed
 *       {@code Config.mv_max_partitions_num_per_refresh}.</li>
 * </ul>
 * Otherwise the partitions are spread over {@code Config.task_runs_concurrency} task runs, capped at
 * {@code Config.mv_max_partitions_num_per_refresh} partitions per task run.
 *
 * @param toRefreshPartitions the partitions that still need to be refreshed
 * @return the number of partitions to refresh in one task run
 */
private int getRefreshNumberByDefaultMode(PCellSortedSet toRefreshPartitions) {
    TableProperty tableProperty = mv.getTableProperty();
    // Check for a missing table property BEFORE dereferencing it; without any table property the
    // effective refresh number is the configured default.
    if (tableProperty == null) {
        return Config.default_mv_partition_refresh_number;
    }
    int defaultPartitionRefreshNumber = tableProperty.getPartitionRefreshNumber();
    // if user has set partition_refresh_number, use it directly
    if (tableProperty.isSetPartitionRefreshNumber()
            || defaultPartitionRefreshNumber != Config.default_mv_partition_refresh_number) {
        return defaultPartitionRefreshNumber;
    }
    MaterializedView.PartitionRefreshStrategy partitionRefreshStrategy =
            MaterializedView.PartitionRefreshStrategy.of(tableProperty.getPartitionRefreshStrategy());
    // if partition_refresh_strategy is explicitly strict, use partition_refresh_number directly
    if (tableProperty.isSetPartitionRefreshStrategy()
            && MaterializedView.PartitionRefreshStrategy.STRICT.equals(partitionRefreshStrategy)) {
        return defaultPartitionRefreshNumber;
    }
    // if there are not too many partitions to refresh, use partition_refresh_number directly
    int toRefreshPartitionNum = toRefreshPartitions.size();
    if (toRefreshPartitionNum <= Config.mv_max_partitions_num_per_refresh) {
        return defaultPartitionRefreshNumber;
    }
    // otherwise spread the partitions evenly over the concurrently running task runs
    // (Math.max guards against a non-positive concurrency setting) ...
    int toRefreshPartitionNumPerTaskRun =
            (int) Math.ceil((double) toRefreshPartitionNum / Math.max(1, Config.task_runs_concurrency));
    // ... and cap the batch so a single task run never refreshes too many partitions
    return Math.min(toRefreshPartitionNumPerTaskRun, Config.mv_max_partitions_num_per_refresh);
}
/**
 * Counts how many partitions, taken in the iteration order of {@code toRefreshPartitions.partitions()},
 * a {@link MVRefreshPartitionSelector} accepts before it rejects one.
 *
 * <p>For a non-partitioned mv the size of the input is returned as is. The selector is built from
 * {@code Config.mv_max_rows_per_refresh}, {@code Config.mv_max_bytes_per_refresh} and
 * {@code Config.mv_max_partitions_num_per_refresh}.
 *
 * <p>NOTE(review): counting always walks {@code partitions()} in its natural order, while callers may
 * pick the partitions to refresh through a descending iterator — confirm the counted partitions are
 * the ones actually refreshed.
 *
 * @param toRefreshPartitions the mv partitions that are candidates for this refresh
 * @return the number of leading partitions accepted by the selector
 * @throws MVAdaptiveRefreshException declared for failures of the adaptive selection
 */
public int getAdaptivePartitionRefreshNumber(PCellSortedSet toRefreshPartitions)
        throws MVAdaptiveRefreshException {
    if (!mv.isPartitionedTable()) {
        return toRefreshPartitions.size();
    }
    final Map<String, Map<Table, Set<String>>> basePartitionsByMvPartition =
            mvContext.getMvRefBaseTableIntersectedPartitions();
    final MVRefreshPartitionSelector selector = new MVRefreshPartitionSelector(
            Config.mv_max_rows_per_refresh,
            Config.mv_max_bytes_per_refresh,
            Config.mv_max_partitions_num_per_refresh,
            mvContext.getExternalRefBaseTableMVPartitionMap());
    int accepted = 0;
    for (PCellWithName candidate : toRefreshPartitions.partitions()) {
        final Map<Table, Set<String>> basePartitions = basePartitionsByMvPartition.get(candidate.name());
        // stop at the first partition the selector refuses
        if (!selector.canAddPartition(basePartitions)) {
            return accepted;
        }
        selector.addPartition(basePartitions);
        accepted++;
    }
    return accepted;
}
/**
* Check whether the base table is supported partition refresh or not.
@ -657,7 +674,9 @@ public abstract class MVPCTRefreshPartitioner {
getExpiredPartitionsWithRetention(ttlCondition, toRefreshPartitionMap, isMockPartitionIds);
if (CollectionUtils.isNotEmpty(toRemovePartitions)) {
toRemovePartitions.stream()
.forEach(p -> toRefreshPartitions.remove(PCellWithName.of(p, toRefreshPartitionMap.get(p))));
.filter(p -> toRefreshPartitionMap.containsKey(p))
.map(p -> PCellWithName.of(p, toRefreshPartitionMap.get(p)))
.forEach(toRefreshPartitions::remove);
}
}
@ -722,4 +741,15 @@ public abstract class MVPCTRefreshPartitioner {
return result;
}
/**
 * Returns an iterator over the partitions to refresh in the requested direction.
 *
 * @param toRefreshPartitions the partitions to iterate over
 * @param isAscending {@code true} to use {@code iterator()}, {@code false} to use
 *                    {@code descendingIterator()}
 * @return the iterator obtained from {@code toRefreshPartitions}
 */
public Iterator<PCellWithName> getToRefreshPartitionsIterator(PCellSortedSet toRefreshPartitions,
                                                              boolean isAscending) {
    return isAscending
            ? toRefreshPartitions.iterator()
            : toRefreshPartitions.descendingIterator();
}
}

View File

@ -64,7 +64,6 @@ import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@ -413,51 +412,33 @@ public final class MVPCTRefreshRangePartitioner extends MVPCTRefreshPartitioner
}
@Override
public void filterPartitionByRefreshNumber(PCellSortedSet mvPartitionsToRefresh) {
filterPartitionByRefreshNumberInternal(mvPartitionsToRefresh,
MaterializedView.PartitionRefreshStrategy.STRICT);
}
@Override
public void filterPartitionByAdaptiveRefreshNumber(PCellSortedSet mvPartitionsToRefresh) {
filterPartitionByRefreshNumberInternal(mvPartitionsToRefresh, MaterializedView.PartitionRefreshStrategy.ADAPTIVE);
}
public void filterPartitionByRefreshNumberInternal(PCellSortedSet mvPartitionsToRefresh,
MaterializedView.PartitionRefreshStrategy refreshStrategy) {
public void filterPartitionByRefreshNumber(PCellSortedSet mvPartitionsToRefresh,
MaterializedView.PartitionRefreshStrategy refreshStrategy) {
int partitionRefreshNumber = mv.getTableProperty().getPartitionRefreshNumber();
Map<String, Range<PartitionKey>> mvRangePartitionMap = mv.getRangePartitionMap();
if (partitionRefreshNumber <= 0 || partitionRefreshNumber >= mvRangePartitionMap.size()) {
return;
}
Iterator<PCellWithName> inputIter = mvPartitionsToRefresh.iterator();
while (inputIter.hasNext()) {
PCellWithName pCellWithName = inputIter.next();
String mvPartitionName = pCellWithName.name();
// skip if partition is not in the mv's partition range map
if (!mvRangePartitionMap.containsKey(mvPartitionName)) {
logger.warn("Partition {} is not in the materialized view's partition range map, " +
"remove it from refresh list", mvPartitionName);
mvPartitionsToRefresh.remove(pCellWithName);
}
// remove invalid cells from the input to-refresh partitions
mvPartitionsToRefresh
.stream()
.filter(pCell -> !mvRangePartitionMap.containsKey(pCell.name()))
.forEach(mvPartitionsToRefresh::remove);
Iterator<PCellWithName> iterator = getToRefreshPartitionsIterator(mvPartitionsToRefresh,
Config.materialized_view_refresh_ascending);
// dynamically get the number of partitions to be refreshed this time
partitionRefreshNumber = getPartitionRefreshNumberAdaptive(mvPartitionsToRefresh, refreshStrategy);
if (partitionRefreshNumber <= 0 || mvPartitionsToRefresh.size() <= partitionRefreshNumber) {
return;
}
LinkedList<PCellWithName> sortedPartition = mvPartitionsToRefresh
.stream()
.collect(Collectors.toCollection(LinkedList::new));
Iterator<PCellWithName> sortedIterator = Config.materialized_view_refresh_ascending
? sortedPartition.iterator() : sortedPartition.descendingIterator();
// dynamically get the number of partitions to be refreshed this time
partitionRefreshNumber = getRefreshNumberByMode(sortedIterator, refreshStrategy);
PCellWithName tmpPCellWithName = null;
for (int i = 0; i < partitionRefreshNumber; i++) {
if (sortedIterator.hasNext()) {
tmpPCellWithName = sortedIterator.next();
sortedIterator.remove();
}
int i = 0;
while (i++ < partitionRefreshNumber && iterator.hasNext()) {
PCellWithName pCell = iterator.next();
logger.debug("Materialized view [{}] to refresh partition name {}, value {}",
mv.getName(), pCell.name(), pCell.cell());
// NOTE: if the mv's to-refresh partitions are involved in many-to-many mappings, do not filter them,
// to avoid data loss.
// eg:
@ -477,41 +458,11 @@ public final class MVPCTRefreshRangePartitioner extends MVPCTRefreshPartitioner
// BTW, since the refresh has already scanned the needed base tables' data, it's better to update
// as many of the mv's partitions as possible.
// TODO: But refreshing many partitions may consume a lot of memory; support fine-grained partition refresh later.
if (!mvToRefreshPotentialPartitions.isEmpty() && mvToRefreshPotentialPartitions.contains(tmpPCellWithName.name())) {
if (!mvToRefreshPotentialPartitions.isEmpty() && mvToRefreshPotentialPartitions.contains(pCell.name())) {
return;
}
}
if (!Config.materialized_view_refresh_ascending) {
sortedIterator = sortedPartition.iterator();
}
setNextPartitionStartAndEnd(mvPartitionsToRefresh, sortedIterator);
}
public int getAdaptivePartitionRefreshNumber(
Iterator<PCellWithName> iterator) throws MVAdaptiveRefreshException {
Map<String, Map<Table, Set<String>>> mvToBaseNameRefs = mvContext.getMvRefBaseTableIntersectedPartitions();
MVRefreshPartitionSelector mvRefreshPartitionSelector =
new MVRefreshPartitionSelector(Config.mv_max_rows_per_refresh, Config.mv_max_bytes_per_refresh,
Config.mv_max_partitions_num_per_refresh, mvContext.getExternalRefBaseTableMVPartitionMap());
int adaptiveRefreshNumber = 0;
while (iterator.hasNext()) {
PCellWithName pCellWithName = iterator.next();
String mvRefreshPartition = pCellWithName.name();
Map<Table, Set<String>> refBaseTablesPartitions = mvToBaseNameRefs.get(mvRefreshPartition);
if (mvRefreshPartitionSelector.canAddPartition(refBaseTablesPartitions)) {
mvRefreshPartitionSelector.addPartition(refBaseTablesPartitions);
iterator.remove();
adaptiveRefreshNumber++;
} else {
break;
}
}
return adaptiveRefreshNumber;
}
private void setNextPartitionStartAndEnd(PCellSortedSet partitionsToRefresh,
Iterator<PCellWithName> iterator) {
String nextPartitionStart = null;
PCellWithName end = null;
if (iterator.hasNext()) {
@ -520,12 +471,11 @@ public final class MVPCTRefreshRangePartitioner extends MVPCTRefreshPartitioner
Range<PartitionKey> partitionKeyRange = pRangeCell.getRange();
nextPartitionStart = AnalyzerUtils.parseLiteralExprToDateString(partitionKeyRange.lowerEndpoint(), 0);
end = start;
partitionsToRefresh.remove(end);
iterator.remove();
}
while (iterator.hasNext()) {
end = iterator.next();
iterator.remove();
partitionsToRefresh.remove(end);
}
if (!mvRefreshParams.isTentative()) {

View File

@ -19,6 +19,7 @@ import com.starrocks.common.Config;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.SortedSet;
@ -92,6 +93,10 @@ public record PCellSortedSet(SortedSet<PCellWithName> partitions) {
partitions.forEach(action);
}
/**
 * Returns an iterator over {@code partitions} in descending order.
 *
 * <p>The backing set is cast to {@link NavigableSet}; a {@link ClassCastException} is thrown if this
 * record was created with a {@code SortedSet} that is not navigable.
 *
 * @return the descending iterator of the backing set
 */
public Iterator<PCellWithName> descendingIterator() {
    final NavigableSet<PCellWithName> navigable = (NavigableSet<PCellWithName>) partitions;
    return navigable.descendingIterator();
}
public Iterator<PCellWithName> iterator() {
return partitions.iterator();
}

View File

@ -117,9 +117,14 @@ public final class PRangeCell extends PCell {
@Override
public String toString() {
return "PRangeCell{" +
"range=" + range +
'}';
return "[%s-%s]".formatted(toString(range.lowerEndpoint()), toString(range.upperEndpoint()));
}
/**
 * Renders a single range endpoint for {@link #toString()}.
 *
 * @param partitionKey the endpoint to render, may be {@code null}
 * @return the literal {@code "null"} for a missing key, otherwise {@code partitionKey.toSql()}
 */
private String toString(PartitionKey partitionKey) {
    return partitionKey == null ? "null" : partitionKey.toSql();
}
public static Map<String, Range<PartitionKey>> toRangeMap(Map<String, PCell> input) {

View File

@ -1221,8 +1221,6 @@ public class MaterializedViewTest extends StarRocksTestBase {
@Test
public void testPartitionRefreshStrategy() {
Assertions.assertEquals(MaterializedView.PartitionRefreshStrategy.defaultValue(),
MaterializedView.PartitionRefreshStrategy.valueOf("STRICT"));
String sql = "create materialized view test_mv1 " +
"DISTRIBUTED BY HASH(`k2`) BUCKETS 3 \n" +
"REFRESH MANUAL\n" +
@ -1236,7 +1234,7 @@ public class MaterializedViewTest extends StarRocksTestBase {
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb("test");
MaterializedView mv = ((MaterializedView) GlobalStateMgr.getCurrentState().getLocalMetastore()
.getTable(db.getFullName(), "test_mv1"));
Assertions.assertEquals(mv.getPartitionRefreshStrategy(), MaterializedView.PartitionRefreshStrategy.STRICT);
Assertions.assertEquals(mv.getPartitionRefreshStrategy(), MaterializedView.PartitionRefreshStrategy.ADAPTIVE);
});
}
}

View File

@ -177,8 +177,7 @@ public class QueryCacheAndMVTest extends MVTestBase {
"DISTRIBUTED BY HASH(`id`) BUCKETS 10\n" +
"REFRESH DEFERRED MANUAL\n" +
"PROPERTIES (\n" +
"\"replication_num\" = \"1\",\n" +
"\"storage_medium\" = \"HDD\"\n" +
"\"replication_num\" = \"1\"" +
")\n" +
"AS SELECT id, data, ts FROM `iceberg0`.`partitioned_transforms_db`.`t0_year` as a;");
Database testDb = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb("test");

View File

@ -418,8 +418,6 @@ public class PartitionBasedMvRefreshProcessorOlapPart2Test extends MVTestBase {
Database testDb = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb("test");
MaterializedView mv = ((MaterializedView) testDb.getTable(mvName));
TaskManager tm = GlobalStateMgr.getCurrentState().getTaskManager();
long taskId = tm.getTask(TaskBuilder.getMvTaskName(mv.getId())).getId();
TaskRunScheduler taskRunScheduler = tm.getTaskRunScheduler();
executeInsertSql(connectContext, "insert into tbl6 partition(p1) values('2022-01-02',2,10);");
executeInsertSql(connectContext, "insert into tbl6 partition(p2) values('2022-02-02',2,10);");
@ -428,15 +426,12 @@ public class PartitionBasedMvRefreshProcessorOlapPart2Test extends MVTestBase {
taskRunProperties.put(TaskRun.FORCE, Boolean.toString(true));
Task task = TaskBuilder.buildMvTask(mv, testDb.getFullName());
TaskRun taskRun = TaskRunBuilder.newBuilder(task).build();
initAndExecuteTaskRun(taskRun);
executeTaskRun(taskRun);
TGetTasksParams params = new TGetTasksParams();
params.setTask_name(task.getName());
List<TaskRunStatus> statuses = tm.getMatchedTaskRunStatus(params);
while (statuses.size() != 1) {
statuses = tm.getMatchedTaskRunStatus(params);
Thread.sleep(100);
}
Assertions.assertEquals(1, statuses.size());
Assertions.assertEquals(2, statuses.size());
TaskRunStatus status = statuses.get(0);
// default priority for next refresh batch is Constants.TaskRunPriority.HIGHER.value()
Assertions.assertEquals(Constants.TaskRunPriority.HIGHER.value(), status.getPriority());
@ -487,16 +482,13 @@ public class PartitionBasedMvRefreshProcessorOlapPart2Test extends MVTestBase {
Task task = TaskBuilder.buildMvTask(mv, testDb.getFullName());
ExecuteOption executeOption = new ExecuteOption(70, false, new HashMap<>());
TaskRun taskRun = TaskRunBuilder.newBuilder(task).setExecuteOption(executeOption).build();
initAndExecuteTaskRun(taskRun);
executeTaskRun(taskRun);
TGetTasksParams params = new TGetTasksParams();
params.setTask_name(task.getName());
TaskManager tm = GlobalStateMgr.getCurrentState().getTaskManager();
List<TaskRunStatus> statuses = tm.getMatchedTaskRunStatus(params);
while (statuses.size() != 1) {
statuses = tm.getMatchedTaskRunStatus(params);
Thread.sleep(100);
}
Assertions.assertEquals(1, statuses.size());
Assertions.assertEquals(2, statuses.size());
TaskRunStatus status = statuses.get(0);
// the priority for next refresh batch is 70 which is specified in executeOption
Assertions.assertEquals(70, status.getPriority());

View File

@ -988,7 +988,8 @@ public class PartitionBasedMvRefreshProcessorOlapTest extends MVTestBase {
MVPCTBasedRefreshProcessor processor = getPartitionBasedRefreshProcessor(taskRun);
MVPCTRefreshPartitioner partitioner = processor.getMvRefreshPartitioner();
PCellSortedSet mvToRefreshPartitionNames = getMVPCellWithNames(mv, mv.getPartitionNames());
partitioner.filterPartitionByAdaptiveRefreshNumber(mvToRefreshPartitionNames);
partitioner.filterPartitionByRefreshNumber(mvToRefreshPartitionNames,
MaterializedView.PartitionRefreshStrategy.ADAPTIVE);
MvTaskRunContext mvContext = processor.getMvContext();
Assertions.assertNull(mvContext.getNextPartitionStart());
Assertions.assertNull(mvContext.getNextPartitionEnd());
@ -1018,7 +1019,8 @@ public class PartitionBasedMvRefreshProcessorOlapTest extends MVTestBase {
MVPCTBasedRefreshProcessor processor = getPartitionBasedRefreshProcessor(taskRun);
MVPCTRefreshPartitioner partitioner = processor.getMvRefreshPartitioner();
partitioner.filterPartitionByAdaptiveRefreshNumber(getMVPCellWithNames(mv, mv.getPartitionNames()));
partitioner.filterPartitionByRefreshNumber(getMVPCellWithNames(mv, mv.getPartitionNames()),
MaterializedView.PartitionRefreshStrategy.ADAPTIVE);
MvTaskRunContext mvContext = processor.getMvContext();
Assertions.assertNull(mvContext.getNextPartitionStart());
Assertions.assertNull(mvContext.getNextPartitionEnd());
@ -1061,7 +1063,8 @@ public class PartitionBasedMvRefreshProcessorOlapTest extends MVTestBase {
MVPCTBasedRefreshProcessor processor = getPartitionBasedRefreshProcessor(taskRun);
MVPCTRefreshPartitioner partitioner = processor.getMvRefreshPartitioner();
partitioner.filterPartitionByAdaptiveRefreshNumber(getMVPCellWithNames(mv, mv.getPartitionNames()));
partitioner.filterPartitionByRefreshNumber(getMVPCellWithNames(mv, mv.getPartitionNames()),
MaterializedView.PartitionRefreshStrategy.ADAPTIVE);
MvTaskRunContext mvContext = processor.getMvContext();
Assertions.assertNull(mvContext.getNextPartitionStart());
Assertions.assertNull(mvContext.getNextPartitionEnd());
@ -1120,19 +1123,22 @@ public class PartitionBasedMvRefreshProcessorOlapTest extends MVTestBase {
// round 1
PCellSortedSet toRefresh = getMVPCellWithNames(mv, allPartitions);
partitioner.filterPartitionByRefreshNumber(toRefresh);
partitioner.filterPartitionByRefreshNumber(toRefresh,
MaterializedView.PartitionRefreshStrategy.STRICT);
assertPCellNameEquals("p0", toRefresh);
refreshedPartitions.addAll(getPartitionNames(toRefresh));
// round 2
toRefresh = getMVPCellWithNames(mv, SetUtils.disjunction(allPartitions, refreshedPartitions));
partitioner.filterPartitionByRefreshNumber(toRefresh);
partitioner.filterPartitionByRefreshNumber(toRefresh,
MaterializedView.PartitionRefreshStrategy.STRICT);
assertPCellNameEquals("p1", toRefresh);
refreshedPartitions.addAll(getPartitionNames(toRefresh));
// round 3
toRefresh = getMVPCellWithNames(mv, SetUtils.disjunction(allPartitions, refreshedPartitions));
partitioner.filterPartitionByRefreshNumber(toRefresh);
partitioner.filterPartitionByRefreshNumber(toRefresh,
MaterializedView.PartitionRefreshStrategy.STRICT);
assertPCellNameEquals("p2", toRefresh);
refreshedPartitions.addAll(getPartitionNames(toRefresh));
}
@ -1144,31 +1150,36 @@ public class PartitionBasedMvRefreshProcessorOlapTest extends MVTestBase {
// round 1
PCellSortedSet toRefresh = getMVPCellWithNames(mv, allPartitions);
partitioner.filterPartitionByRefreshNumber(toRefresh);
partitioner.filterPartitionByRefreshNumber(toRefresh,
MaterializedView.PartitionRefreshStrategy.STRICT);
assertPCellNameEquals("p4", toRefresh);
refreshedPartitions.addAll(getPartitionNames(toRefresh));
// round 2
toRefresh = getMVPCellWithNames(mv, SetUtils.disjunction(allPartitions, refreshedPartitions));
partitioner.filterPartitionByRefreshNumber(toRefresh);
partitioner.filterPartitionByRefreshNumber(toRefresh,
MaterializedView.PartitionRefreshStrategy.STRICT);
assertPCellNameEquals("p3", toRefresh);
refreshedPartitions.addAll(getPartitionNames(toRefresh));
// round 3
toRefresh = getMVPCellWithNames(mv, SetUtils.disjunction(allPartitions, refreshedPartitions));
partitioner.filterPartitionByRefreshNumber(toRefresh);
partitioner.filterPartitionByRefreshNumber(toRefresh,
MaterializedView.PartitionRefreshStrategy.STRICT);
assertPCellNameEquals("p2", toRefresh);
refreshedPartitions.addAll(getPartitionNames(toRefresh));
// round 4
toRefresh = getMVPCellWithNames(mv, SetUtils.disjunction(allPartitions, refreshedPartitions));
partitioner.filterPartitionByRefreshNumber(toRefresh);
partitioner.filterPartitionByRefreshNumber(toRefresh,
MaterializedView.PartitionRefreshStrategy.STRICT);
assertPCellNameEquals("p1", toRefresh);
refreshedPartitions.addAll(getPartitionNames(toRefresh));
// round 5
toRefresh = getMVPCellWithNames(mv, SetUtils.disjunction(allPartitions, refreshedPartitions));
partitioner.filterPartitionByRefreshNumber(toRefresh);
partitioner.filterPartitionByRefreshNumber(toRefresh,
MaterializedView.PartitionRefreshStrategy.STRICT);
assertPCellNameEquals("p0", toRefresh);
refreshedPartitions.addAll(getPartitionNames(toRefresh));
Config.materialized_view_refresh_ascending = true;

View File

@ -18,14 +18,11 @@ import com.starrocks.catalog.Database;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.scheduler.MvTaskRunContext;
import com.starrocks.scheduler.TaskRunContext;
import com.starrocks.sql.common.PCellWithName;
import com.starrocks.sql.common.PCellSortedSet;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.util.Collections;
import java.util.Iterator;
public class MVPCTRefreshNonPartitionerTest {
@Test
@ -37,8 +34,7 @@ public class MVPCTRefreshNonPartitionerTest {
MVRefreshParams mvRefreshParams = Mockito.mock(MVRefreshParams.class);
MVPCTRefreshNonPartitioner job = new MVPCTRefreshNonPartitioner(mvTaskRunContext, taskRunContext,
database, mv, mvRefreshParams);
Iterator<PCellWithName> dummyIter = Collections.emptyIterator();
int result = job.getAdaptivePartitionRefreshNumber(dummyIter);
int result = job.getAdaptivePartitionRefreshNumber(PCellSortedSet.of());
Assertions.assertEquals(0, result);
}
}

View File

@ -31,7 +31,6 @@ import org.mockito.Mockito;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -47,6 +46,7 @@ public class MVPCTRefreshRangePartitionerTest {
MaterializedView mv = mock(MaterializedView.class);
when(mv.getTableProperty()).thenReturn(mock(TableProperty.class));
when(mv.getPartitionInfo()).thenReturn(mock(PartitionInfo.class));
when(mv.isPartitionedTable()).thenReturn(true);
OlapTable refTable1 = Mockito.mock(OlapTable.class);
Set<String> refTablePartition1 = Set.of("partition1", "partition2");
@ -67,12 +67,11 @@ public class MVPCTRefreshRangePartitionerTest {
// TODO: make range cells
List<PCellWithName> partitions = Arrays.asList(PCellWithName.of("mv_p1", new PCellNone()),
PCellWithName.of("mv_p2", new PCellNone()));
Iterator<PCellWithName> iter = partitions.iterator();
MVRefreshParams mvRefreshParams = new MVRefreshParams(mv, new HashMap<>());
MVPCTRefreshRangePartitioner partitioner = new MVPCTRefreshRangePartitioner(mvContext, null,
null, mv, mvRefreshParams);
MVAdaptiveRefreshException exception = Assertions.assertThrows(MVAdaptiveRefreshException.class,
() -> partitioner.getAdaptivePartitionRefreshNumber(iter));
() -> partitioner.getAdaptivePartitionRefreshNumber(PCellSortedSet.of(partitions)));
Assertions.assertTrue(exception.getMessage().contains("Missing too many partition stats"));
}

View File

@ -38,14 +38,17 @@ import com.starrocks.persist.gson.GsonUtils;
import com.starrocks.pseudocluster.PseudoCluster;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.StmtExecutor;
import com.starrocks.scheduler.Constants;
import com.starrocks.scheduler.ExecuteOption;
import com.starrocks.scheduler.MVTaskRunProcessor;
import com.starrocks.scheduler.MvTaskRunContext;
import com.starrocks.scheduler.SubmitResult;
import com.starrocks.scheduler.Task;
import com.starrocks.scheduler.TaskBuilder;
import com.starrocks.scheduler.TaskManager;
import com.starrocks.scheduler.TaskRun;
import com.starrocks.scheduler.TaskRunBuilder;
import com.starrocks.scheduler.TaskRunManager;
import com.starrocks.scheduler.TaskRunProcessor;
import com.starrocks.scheduler.mv.BaseTableSnapshotInfo;
import com.starrocks.scheduler.mv.MVPCTBasedRefreshProcessor;
@ -101,6 +104,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@ -455,6 +459,17 @@ public abstract class MVTestBase extends StarRocksTestBase {
return result;
}
/**
 * Submits {@code taskRun} through the global {@code TaskRunManager} and blocks until the
 * returned future completes, asserting that the submission was accepted and that the run
 * ended in a finish state.
 *
 * @param taskRun the task run to submit; its status is cleared before submission
 * @throws Exception if waiting on the submit future fails (presumably timeout, interruption
 *                   or execution failure -- confirm against the future type returned by
 *                   {@code SubmitResult#getFuture()})
 */
protected static void executeTaskRun(TaskRun taskRun) throws Exception {
    // Upper bound for waiting on a submitted task run, in milliseconds (5 minutes).
    final long waitTimeoutMs = 300000;

    // Clear any existing status before submitting.
    // NOTE(review): presumably submitTaskRun initializes a fresh status when none is set -- confirm.
    taskRun.setStatus(null);

    TaskManager taskManager = GlobalStateMgr.getCurrentState().getTaskManager();
    TaskRunManager taskRunManager = taskManager.getTaskRunManager();
    SubmitResult result = taskRunManager.submitTaskRun(taskRun);

    // assertEquals is null-safe and reports the actual status on failure,
    // unlike assertTrue(actual.equals(expected)).
    Assertions.assertEquals(SubmitResult.SubmitStatus.SUBMITTED, result.getStatus(),
            "task run was not accepted by the task run manager");

    Constants.TaskRunState state = result.getFuture().get(waitTimeoutMs, TimeUnit.MILLISECONDS);
    Assertions.assertTrue(state.isFinishState(),
            "task run did not reach a finish state, actual state: " + state);
}
protected static void initAndExecuteTaskRun(TaskRun taskRun) throws Exception {
taskRun.initStatus(UUIDUtil.genUUID().toString(), System.currentTimeMillis());
taskRun.executeTaskRun();