[Enhancement] Ensure mv force refresh will refresh target partitions (backport #62627) (#63844)

Signed-off-by: shuming.li <ming.moriarty@gmail.com>
Co-authored-by: shuming.li <ming.moriarty@gmail.com>
This commit is contained in:
mergify[bot] 2025-10-10 16:41:08 +08:00 committed by GitHub
parent 057803bbaa
commit 0c2e5c39f9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 906 additions and 62 deletions

View File

@ -1021,8 +1021,6 @@ public class MaterializedView extends OlapTable implements GsonPreProcessable, G
}
}
@Override
public void dropPartition(long dbId, String partitionName, boolean isForceDrop) {
super.dropPartition(dbId, partitionName, isForceDrop);

View File

@ -433,7 +433,7 @@ public class DynamicPartitionUtil {
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
PartitionType partitionType = partitionInfo.getType();
if (!isDynamicPartitionTable(olapTable, partitionInfo, tableProperty)) {
LOG.info("unable to schedule olap table {}-{} because dynamic partition is not enabled, " +
LOG.debug("unable to schedule olap table {}-{} because dynamic partition is not enabled, " +
"partition type: {}, partition column size: {}",
table.getName(), table.getId(), partitionType, partitionInfo.getPartitionColumnsSize());
return false;
@ -479,7 +479,7 @@ public class DynamicPartitionUtil {
PartitionInfo partitionInfo = olapTable.getPartitionInfo();
PartitionType partitionType = partitionInfo.getType();
if (!isTTLPartitionTable(olapTable, partitionInfo, tableProperty)) {
LOG.info("unable to schedule olap table {}-{} because partition ttl is not enabled, " +
LOG.debug("unable to schedule olap table {}-{} because partition ttl is not enabled, " +
"partition type: {}, partition column size: {}",
table.getName(), table.getId(), partitionType, partitionInfo.getPartitionColumnsSize());
return false;

View File

@ -24,8 +24,9 @@ import org.apache.commons.lang3.StringUtils;
public abstract class BaseTaskRunProcessor implements TaskRunProcessor {
@Override
public void prepare(TaskRunContext context) throws Exception {
public TaskRunContext prepare(TaskRunContext context) throws Exception {
// do nothing
return context;
}
@Override

View File

@ -27,6 +27,7 @@ import com.google.common.util.concurrent.Uninterruptibles;
import com.starrocks.analysis.Expr;
import com.starrocks.catalog.BaseTableInfo;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.DataProperty;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.catalog.MaterializedView.PartitionRefreshStrategy;
@ -73,6 +74,7 @@ import com.starrocks.scheduler.mv.MVVersionManager;
import com.starrocks.scheduler.persist.MVTaskRunExtraMessage;
import com.starrocks.scheduler.persist.TaskRunStatus;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.LocalMetastore;
import com.starrocks.sql.StatementPlanner;
import com.starrocks.sql.analyzer.MaterializedViewAnalyzer;
import com.starrocks.sql.analyzer.PlannerMetaLocker;
@ -269,7 +271,7 @@ public class PartitionBasedMvRefreshProcessor extends BaseTaskRunProcessor {
try (Timer ignored = Tracers.watchScope("MVRefreshSyncPartitions")) {
// sync partitions between mv and base tables out of lock
// do it outside lock because it is a time-cost operation
if (!syncPartitions()) {
if (!syncPartitions(context, false)) {
logger.warn("Sync partitions failed.");
return false;
}
@ -471,7 +473,7 @@ public class PartitionBasedMvRefreshProcessor extends BaseTaskRunProcessor {
Map<TableSnapshotInfo, Set<String>> baseTableCandidatePartitions = Maps.newHashMap();
if (Config.enable_materialized_view_external_table_precise_refresh) {
try (Timer ignored = Tracers.watchScope("MVRefreshComputeCandidatePartitions")) {
if (!syncPartitions()) {
if (!syncPartitions(context, false)) {
throw new DmlException(String.format("materialized view %s refresh task failed: sync partition failed",
mv.getName()));
}
@ -940,7 +942,7 @@ public class PartitionBasedMvRefreshProcessor extends BaseTaskRunProcessor {
@VisibleForTesting
@Override
public void prepare(TaskRunContext context) throws Exception {
public TaskRunContext prepare(TaskRunContext context) throws Exception {
Map<String, String> properties = context.getProperties();
// NOTE: mvId is set in Task's properties when creating
long mvId = Long.parseLong(properties.get(MV_ID));
@ -994,6 +996,7 @@ public class PartitionBasedMvRefreshProcessor extends BaseTaskRunProcessor {
this.mvRefreshPartitioner = buildMvRefreshPartitioner(mv, context);
logger.info("finish prepare refresh mv:{}, jobId: {}", mvId, jobId);
return context;
}
/**
@ -1016,11 +1019,59 @@ public class PartitionBasedMvRefreshProcessor extends BaseTaskRunProcessor {
/**
* Sync base table's partition infos to be used later.
*/
private boolean syncPartitions() throws AnalysisException, LockTimeoutException {
private boolean syncPartitions(TaskRunContext taskRunContext,
boolean tentative) throws AnalysisException, LockTimeoutException {
Stopwatch stopwatch = Stopwatch.createStarted();
// collect base table snapshot infos
snapshotBaseTables = collectBaseTableSnapshotInfos(mv);
final MVRefreshParams mvRefreshParams = new MVRefreshParams(mv.getPartitionInfo(),
taskRunContext.getProperties(), tentative);
if (mvRefreshParams.isForce() && !tentative) {
// drop existing partitions for force refresh
Set<String> toRefreshPartitions = mvRefreshPartitioner.getMVPartitionsToRefreshByParams(mvRefreshParams);
if (toRefreshPartitions != null && !toRefreshPartitions.isEmpty()) {
logger.info("force refresh, drop partitions: [{}]",
Joiner.on(",").join(toRefreshPartitions));
// first lock and drop partitions from a visible map
Locker locker = new Locker();
if (!locker.lockTableAndCheckDbExist(db, mv.getId(), LockType.WRITE)) {
logger.warn("failed to lock database: {} in syncPartitions for force refresh", db.getFullName());
throw new DmlException("Force refresh failed, database:" + db.getFullName() + " not exist");
}
try {
PartitionInfo partitionInfo = mv.getPartitionInfo();
DataProperty dataProperty = null;
if (!mv.isPartitionedTable()) {
String partitionName = toRefreshPartitions.iterator().next();
Partition partition = mv.getPartition(partitionName);
dataProperty = partitionInfo.getDataProperty(partition.getId());
mv.dropPartition(db.getId(), partitionName, false);
} else {
for (String partName : toRefreshPartitions) {
mvRefreshPartitioner.dropPartition(db, mv, partName);
}
}
// for non-partitioned table, we need to build the partition here
if (!mv.isPartitionedTable()) {
LocalMetastore localMetastore = GlobalStateMgr.getCurrentState().getLocalMetastore();
ConnectContext connectContext = mvContext.getCtx();
localMetastore.buildNonPartitionOlapTable(db, mv, partitionInfo, dataProperty,
connectContext.getCurrentComputeResource());
}
} catch (Exception e) {
logger.warn("failed to drop partitions {} for force refresh",
Joiner.on(",").join(toRefreshPartitions),
DebugUtil.getRootStackTrace(e));
throw new AnalysisException("failed to drop partitions for force refresh: " + e.getMessage());
} finally {
locker.unLockTableWithIntensiveDbLock(db.getId(), this.mv.getId(), LockType.WRITE);
}
}
}
// do sync partitions (add or drop partitions) for materialized view
boolean result = mvRefreshPartitioner.syncAddOrDropPartitions();
logger.info("finish sync partitions, cost(ms): {}", stopwatch.elapsed(TimeUnit.MILLISECONDS));
@ -1048,9 +1099,9 @@ public class PartitionBasedMvRefreshProcessor extends BaseTaskRunProcessor {
MVRefreshParams mvRefreshParams,
Set<String> mvPotentialPartitionNames)
throws AnalysisException {
if (mvRefreshParams.isForceCompleteRefresh()) {
if (mvRefreshParams.isForce()) {
// Force refresh
return mvRefreshPartitioner.getMVPartitionsToRefreshWithForce();
return mvRefreshPartitioner.getMVPartitionsToRefreshWithForce(mvRefreshParams);
} else {
return mvRefreshPartitioner.getMVPartitionsToRefresh(mvPartitionInfo, snapshotBaseTables,
mvRefreshParams, mvPotentialPartitionNames);

View File

@ -344,7 +344,7 @@ public class TaskRun implements Comparable<TaskRun> {
taskRunContext.setTaskRun(this);
// prepare to execute task run, move it here so that we can catch the exception and set the status
processor.prepare(taskRunContext);
taskRunContext = processor.prepare(taskRunContext);
// process task run
Constants.TaskRunState taskRunState;

View File

@ -20,7 +20,7 @@ public interface TaskRunProcessor {
* Before task run, prepare the context.
* @param context: task run context
*/
void prepare(TaskRunContext context) throws Exception;
TaskRunContext prepare(TaskRunContext context) throws Exception;
/**
* Process a task run with the given context.

View File

@ -87,6 +87,30 @@ public final class MVPCTRefreshListPartitioner extends MVPCTRefreshPartitioner {
this.logger = MVTraceUtils.getLogger(mv, MVPCTRefreshListPartitioner.class);
}
@Override
public Set<String> getMVPartitionsToRefreshByParams(MVRefreshParams mvRefreshParams) {
if (mvRefreshParams.isCompleteRefresh()) {
return mv.getListPartitionItems().keySet();
} else {
Set<PListCell> pListCells = mvRefreshParams.getListValues();
Map<String, PListCell> mvPartitions = mv.getListPartitionItems();
Map<String, PListCell> mvFilteredPartitions = mvPartitions.entrySet().stream()
.filter(e -> {
PListCell mvListCell = e.getValue();
if (mvListCell.getItemSize() == 1) {
// if list value is a single value, check it directly
return pListCells.contains(e.getValue());
} else {
// if list values is multi values, split it into single values and check it then.
return mvListCell.toSingleValueCells().stream().anyMatch(i -> pListCells.contains(i));
}
})
.map(Map.Entry::getKey)
.collect(Collectors.toMap(k -> k, mvPartitions::get));
return mvFilteredPartitions.keySet();
}
}
@Override
public boolean syncAddOrDropPartitions() throws LockTimeoutException {
// collect mv partition items with lock
@ -337,13 +361,6 @@ public final class MVPCTRefreshListPartitioner extends MVPCTRefreshPartitioner {
}
}
@Override
public Set<String> getMVPartitionsToRefreshWithForce() {
Map<String, PCell> mvValidListPartitionMapMap = mv.getPartitionCells(Optional.empty());
filterPartitionsByTTL(mvValidListPartitionMapMap, false);
return mvValidListPartitionMapMap.keySet();
}
@Override
public Set<String> getMVPartitionsToRefresh(PartitionInfo mvPartitionInfo,
Map<Long, TableSnapshotInfo> snapshotBaseTables,

View File

@ -40,6 +40,11 @@ public final class MVPCTRefreshNonPartitioner extends MVPCTRefreshPartitioner {
super(mvContext, context, db, mv);
}
@Override
public Set<String> getMVPartitionsToRefreshByParams(MVRefreshParams mvRefreshParams) {
return mv.getVisiblePartitionNames();
}
@Override
public boolean syncAddOrDropPartitions() {
// do nothing
@ -60,11 +65,6 @@ public final class MVPCTRefreshNonPartitioner extends MVPCTRefreshPartitioner {
return null;
}
@Override
public Set<String> getMVPartitionsToRefreshWithForce() {
return mv.getVisiblePartitionNames();
}
@Override
public Set<String> getMVPartitionsToRefresh(PartitionInfo mvPartitionInfo,
Map<Long, TableSnapshotInfo> snapshotBaseTables,

View File

@ -27,6 +27,7 @@ import com.starrocks.catalog.PartitionInfo;
import com.starrocks.catalog.Table;
import com.starrocks.common.AnalysisException;
import com.starrocks.common.Config;
import com.starrocks.common.Pair;
import com.starrocks.common.util.concurrent.lock.LockTimeoutException;
import com.starrocks.common.util.concurrent.lock.LockType;
import com.starrocks.common.util.concurrent.lock.Locker;
@ -48,8 +49,10 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import static com.starrocks.catalog.MvRefreshArbiter.getMvBaseTableUpdateInfo;
import static com.starrocks.catalog.MvRefreshArbiter.needsToRefreshTable;
@ -108,6 +111,11 @@ public abstract class MVPCTRefreshPartitioner {
public abstract Expr generateMVPartitionPredicate(TableName tableName,
Set<String> mvPartitionNames) throws AnalysisException;
/**
* Get mv partition names to refresh based on the mv refresh params.
*/
public abstract Set<String> getMVPartitionsToRefreshByParams(MVRefreshParams mvRefreshParams) throws AnalysisException;
/**
* Get mv partitions to refresh based on the ref base table partitions.
*
@ -122,7 +130,19 @@ public abstract class MVPCTRefreshPartitioner {
MVRefreshParams mvRefreshParams,
Set<String> mvPotentialPartitionNames) throws AnalysisException;
public abstract Set<String> getMVPartitionsToRefreshWithForce() throws AnalysisException;
public Set<String> getMVPartitionsToRefreshWithForce(MVRefreshParams mvRefreshParams) throws AnalysisException {
Set<String> toRefreshPartitions = getMVPartitionsToRefreshByParams(mvRefreshParams);
if (CollectionUtils.isEmpty(toRefreshPartitions) || !mv.isPartitionedTable()) {
return toRefreshPartitions;
}
Map<String, PCell> mvListPartitionMap = mv.getPartitionCells(Optional.empty());
Map<String, PCell> validToRefreshPartitions = toRefreshPartitions.stream()
.filter(mvListPartitionMap::containsKey)
.map(name -> Pair.create(name, mvListPartitionMap.get(name)))
.collect(Collectors.toMap(x -> x.first, x -> x.second));
filterPartitionsByTTL(validToRefreshPartitions, true);
return validToRefreshPartitions.keySet();
}
/**
* Get mv partition names with TTL based on the ref base table partitions.

View File

@ -19,6 +19,7 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.Uninterruptibles;
import com.starrocks.analysis.BoolLiteral;
import com.starrocks.analysis.Expr;
@ -62,6 +63,7 @@ import com.starrocks.sql.common.RangePartitionDiffer;
import com.starrocks.sql.common.SyncPartitionUtils;
import com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.Logger;
import java.util.Collections;
@ -90,6 +92,38 @@ public final class MVPCTRefreshRangePartitioner extends MVPCTRefreshPartitioner
this.logger = MVTraceUtils.getLogger(mv, MVPCTRefreshRangePartitioner.class);
}
@Override
public Set<String> getMVPartitionsToRefreshByParams(MVRefreshParams mvRefreshParams) throws AnalysisException {
if (mvRefreshParams.isCompleteRefresh()) {
Map<String, Range<PartitionKey>> rangePartitionMap = mv.getRangePartitionMap();
if (rangePartitionMap.isEmpty()) {
return Sets.newHashSet();
}
return rangePartitionMap.keySet();
} else {
String start = mvRefreshParams.getRangeStart();
String end = mvRefreshParams.getRangeEnd();
if (StringUtils.isEmpty(start) && StringUtils.isEmpty(end)) {
return Sets.newHashSet();
}
// range partition must have partition column, and its column size must be 1
Column partitionColumn = mv.getRangePartitionFirstColumn().get();
Range<PartitionKey> rangeToInclude = createRange(start, end, partitionColumn);
Map<String, Range<PartitionKey>> rangePartitionMap = mv.getRangePartitionMap();
if (rangePartitionMap.isEmpty()) {
return Sets.newHashSet();
}
Set<String> result = Sets.newHashSet();
for (Map.Entry<String, Range<PartitionKey>> entry : rangePartitionMap.entrySet()) {
Range<PartitionKey> rangeToCheck = entry.getValue();
if (RangePartitionDiffer.isRangeIncluded(rangeToInclude, rangeToCheck)) {
result.add(entry.getKey());
}
}
return result;
}
}
@Override
public boolean syncAddOrDropPartitions() throws AnalysisException {
String start = context == null ? null : context.getProperties().get(TaskRun.PARTITION_START);
@ -327,14 +361,6 @@ public final class MVPCTRefreshRangePartitioner extends MVPCTRefreshPartitioner
return mvToRefreshPartitionNames;
}
@Override
public Set<String> getMVPartitionsToRefreshWithForce() throws AnalysisException {
int partitionTTLNumber = mvContext.getPartitionTTLNumber();
Map<String, Range<PartitionKey>> inputRanges = mv.getValidRangePartitionMap(partitionTTLNumber);
filterPartitionsByTTL(PRangeCell.toCellMap(inputRanges), false);
return inputRanges.keySet();
}
@Override
public Set<String> getMVPartitionNamesWithTTL(MaterializedView mv, MVRefreshParams mvRefreshParams,
boolean isAutoRefresh) throws AnalysisException {

View File

@ -46,10 +46,6 @@ public class MVRefreshParams {
this.mvPartitionInfo = partitionInfo;
}
public boolean isForceCompleteRefresh() {
return isForce && isCompleteRefresh();
}
public boolean isForce() {
return isForce;
}

View File

@ -3089,30 +3089,15 @@ public class LocalMetastore implements ConnectorMetadata, MVRepairHandler, Memor
stmt.getPartitionByExprToAdjustExprMap());
final long warehouseId = materializedView.getWarehouseId();
try {
Set<Long> tabletIdSet = new HashSet<>();
// process single partition info
if (isNonPartitioned) {
long partitionId = GlobalStateMgr.getCurrentState().getNextId();
Preconditions.checkNotNull(dataProperty);
partitionInfo.setDataProperty(partitionId, dataProperty);
partitionInfo.setReplicationNum(partitionId, materializedView.getDefaultReplicationNum());
partitionInfo.setIsInMemory(partitionId, false);
partitionInfo.setTabletType(partitionId, TTabletType.TABLET_TYPE_DISK);
StorageInfo storageInfo = materializedView.getTableProperty().getStorageInfo();
partitionInfo.setDataCacheInfo(partitionId,
storageInfo == null ? null : storageInfo.getDataCacheInfo());
Long version = Partition.PARTITION_INIT_VERSION;
final WarehouseManager warehouseManager = GlobalStateMgr.getCurrentState().getWarehouseMgr();
final CRAcquireContext acquireContext = CRAcquireContext.of(warehouseId);
final ComputeResource computeResource = warehouseManager.acquireComputeResource(acquireContext);
if (!warehouseManager.isResourceAvailable(computeResource)) {
throw new DdlException("No available resource for warehouse " + warehouseId);
}
Partition partition = createPartition(db, materializedView, partitionId, mvName, version, tabletIdSet,
computeResource);
buildPartitions(db, materializedView, new ArrayList<>(partition.getSubPartitions()), computeResource);
materializedView.addPartition(partition);
buildNonPartitionOlapTable(db, materializedView, partitionInfo, dataProperty, computeResource);
} else {
List<Expr> mvPartitionExprs = stmt.getPartitionByExprs();
LinkedHashMap<Expr, SlotRef> partitionExprMaps = MVPartitionExprResolver.getMVPartitionExprsChecked(
@ -3147,6 +3132,35 @@ public class LocalMetastore implements ConnectorMetadata, MVRepairHandler, Memor
DynamicPartitionUtil.registerOrRemovePartitionTTLTable(db.getId(), materializedView);
}
/**
* Initialize a non-partitioned table with one partition.
*/
public void buildNonPartitionOlapTable(Database db,
OlapTable olapTable,
PartitionInfo partitionInfo,
DataProperty dataProperty,
ComputeResource computeResource) throws DdlException {
if (olapTable.isPartitionedTable()) {
throw new DdlException("Table " + olapTable.getName() + " is a partitioned table, not a non-partitioned table");
}
long partitionId = GlobalStateMgr.getCurrentState().getNextId();
Preconditions.checkNotNull(dataProperty);
partitionInfo.setDataProperty(partitionId, dataProperty);
partitionInfo.setReplicationNum(partitionId, olapTable.getDefaultReplicationNum());
partitionInfo.setIsInMemory(partitionId, false);
partitionInfo.setTabletType(partitionId, TTabletType.TABLET_TYPE_DISK);
StorageInfo storageInfo = olapTable.getTableProperty().getStorageInfo();
partitionInfo.setDataCacheInfo(partitionId,
storageInfo == null ? null : storageInfo.getDataCacheInfo());
Set<Long> tabletIdSet = new HashSet<>();
Long version = Partition.PARTITION_INIT_VERSION;
Partition partition = createPartition(db, olapTable, partitionId, olapTable.getName(), version, tabletIdSet,
computeResource);
buildPartitions(db, olapTable, new ArrayList<>(partition.getSubPartitions()), computeResource);
olapTable.addPartition(partition);
}
private long getRandomStart(IntervalLiteral interval, long randomizeStart) throws DdlException {
if (interval == null || randomizeStart == -1) {
return 0;

View File

@ -199,7 +199,7 @@ public final class RangePartitionDiffer extends PartitionDiffer {
* @param rangeToInclude range to check whether the to be checked range is in
* @return true if included, else false
*/
private static boolean isRangeIncluded(Range<PartitionKey> rangeToCheck, Range<PartitionKey> rangeToInclude) {
public static boolean isRangeIncluded(Range<PartitionKey> rangeToCheck, Range<PartitionKey> rangeToInclude) {
if (rangeToInclude == null) {
return true;
}

View File

@ -33,8 +33,8 @@ public class MockTaskRunProcessor implements TaskRunProcessor {
}
@Override
public void prepare(TaskRunContext context) throws Exception {
// do nothing
public TaskRunContext prepare(TaskRunContext context) throws Exception {
return context;
}
@Override

View File

@ -1650,4 +1650,149 @@ public class PCTRefreshListPartitionOlapTest extends MVTestBase {
}
Assertions.assertEquals(0, mv.getPartitions().size());
}
@Test
public void testMVForceRefresh() throws Exception {
String partitionTable = "CREATE TABLE list_t1 (dt1 date, int1 int)\n" +
"PARTITION BY list(dt1) (\n" +
" PARTITION p1 VALUES IN (\"2025-05-16\") ,\n" +
" PARTITION p2 VALUES IN (\"2025-05-17\") \n" +
")\n";
starRocksAssert.withTable(partitionTable);
String[] sqls = {
"insert into list_t1 partition(p1) values('2025-05-16', 1);",
"insert into list_t1 partition(p2) values('2025-05-17', 1);"
};
for (String sql : sqls) {
executeInsertSql(sql);
}
String mvQuery = "CREATE MATERIALIZED VIEW test_mv1 " +
"PARTITION BY (dt1) " +
"REFRESH DEFERRED MANUAL PROPERTIES (\"partition_refresh_number\"=\"-1\")\n" +
"AS SELECT dt1,sum(int1) from list_t1 group by dt1 \n";
starRocksAssert.withMaterializedView(mvQuery);
MaterializedView mv = getMv("test_mv1");
TaskRun taskRun = buildMVTaskRun(mv, "test");
ExecPlan execPlan;
// explain without force
{
execPlan = getMVRefreshExecPlan(taskRun);
Assertions.assertNotNull(execPlan);
refreshMV("test", mv);
execPlan = getMVRefreshExecPlan(taskRun);
Assertions.assertNull(execPlan);
}
// refresh with force
Map<String, String> props = taskRun.getProperties();
props.put(TaskRun.FORCE, "true");
// explain with refresh
{
ExecuteOption executeOption = new ExecuteOption(taskRun.getTask());
Map<String, String> explainProps = executeOption.getTaskRunProperties();
explainProps.put(TaskRun.FORCE, "true");
execPlan = getMVRefreshExecPlan(taskRun, true);
Assertions.assertNotNull(execPlan);
execPlan = getMVRefreshExecPlan(taskRun, true);
Assertions.assertNotNull(execPlan);
// after refresh, still can refresh with force
execPlan = getMVRefreshExecPlan(taskRun, true);
String plan = execPlan.getExplainString(TExplainLevel.NORMAL);
PlanTestBase.assertContains(plan, " TABLE: list_t1\n" +
" PREAGGREGATION: ON\n" +
" partitions=2/2");
Assertions.assertNotNull(execPlan);
refreshMV("test", mv);
// after refresh, still can refresh with force
execPlan = getMVRefreshExecPlan(taskRun, true);
plan = execPlan.getExplainString(TExplainLevel.NORMAL);
PlanTestBase.assertContains(plan, " TABLE: list_t1\n" +
" PREAGGREGATION: ON\n" +
" partitions=2/2");
Assertions.assertNotNull(execPlan);
}
}
@Test
public void testMVForcePartialRefresh() throws Exception {
String partitionTable = "CREATE TABLE list_t1 (dt1 date, int1 int)\n" +
"PARTITION BY list(dt1) (\n" +
" PARTITION p1 VALUES IN (\"2025-05-16\") ,\n" +
" PARTITION p2 VALUES IN (\"2025-05-17\") \n" +
")\n";
starRocksAssert.withTable(partitionTable);
String[] sqls = {
"insert into list_t1 partition(p1) values('2025-05-16', 1);",
"insert into list_t1 partition(p2) values('2025-05-17', 1);"
};
for (String sql : sqls) {
executeInsertSql(sql);
}
String mvQuery = "CREATE MATERIALIZED VIEW test_mv1 " +
"PARTITION BY (dt1) " +
"REFRESH DEFERRED MANUAL PROPERTIES (\"partition_refresh_number\"=\"-1\")\n" +
"AS SELECT dt1,sum(int1) from list_t1 group by dt1 \n";
starRocksAssert.withMaterializedView(mvQuery);
MaterializedView mv = getMv("test_mv1");
TaskRun taskRun = buildMVTaskRun(mv, "test");
// partial refresh with force
Map<String, String> props = taskRun.getProperties();
props.put(TaskRun.PARTITION_VALUES,
PListCell.batchSerialize(ImmutableSet.of(new PListCell("2025-05-16"))));
ExecPlan execPlan;
// explain without force
{
execPlan = getMVRefreshExecPlan(taskRun);
Assertions.assertNotNull(execPlan);
refreshMV("test", mv);
execPlan = getMVRefreshExecPlan(taskRun);
Assertions.assertNull(execPlan);
}
// explain with partial refresh
{
ExecuteOption executeOption = new ExecuteOption(taskRun.getTask());
Map<String, String> explainProps = executeOption.getTaskRunProperties();
explainProps.put(TaskRun.PARTITION_VALUES,
PListCell.batchSerialize(ImmutableSet.of(new PListCell("2025-05-16"))));
explainProps.put(TaskRun.FORCE, "true");
execPlan = getMVRefreshExecPlan(taskRun, true);
Assertions.assertNotNull(execPlan);
execPlan = getMVRefreshExecPlan(taskRun, true);
Assertions.assertNotNull(execPlan);
// after refresh, still can refresh with force
execPlan = getMVRefreshExecPlan(taskRun, true);
String plan = execPlan.getExplainString(TExplainLevel.NORMAL);
PlanTestBase.assertContains(plan, " TABLE: list_t1\n" +
" PREAGGREGATION: ON\n" +
" partitions=1/2");
Assertions.assertNotNull(execPlan);
refreshMV("test", mv);
// after refresh, still can refresh with force
execPlan = getMVRefreshExecPlan(taskRun, true);
plan = execPlan.getExplainString(TExplainLevel.NORMAL);
PlanTestBase.assertContains(plan, " TABLE: list_t1\n" +
" PREAGGREGATION: ON\n" +
" partitions=1/2");
Assertions.assertNotNull(execPlan);
}
}
}

View File

@ -0,0 +1,106 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.scheduler;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.sql.optimizer.rule.transformation.materialization.MVTestBase;
import com.starrocks.sql.plan.ExecPlan;
import com.starrocks.sql.plan.PlanTestBase;
import com.starrocks.thrift.TExplainLevel;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer.MethodName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import java.util.Map;
@TestMethodOrder(MethodName.class)
public class PCTRefreshNonPartitionOlapTest extends MVTestBase {
@BeforeAll
public static void beforeClass() throws Exception {
MVTestBase.beforeClass();
}
@Test
public void testMVForceRefresh() throws Exception {
String partitionTable = "CREATE TABLE range_t1 (dt1 date, int1 int)\n" +
"PARTITION BY date_trunc('day', dt1)";
starRocksAssert.withTable(partitionTable);
addRangePartition("range_t1", "p1", "2024-01-04", "2024-01-05");
addRangePartition("range_t1", "p2", "2024-01-05", "2024-01-06");
String[] sqls = {
"INSERT INTO range_t1 partition(p1) VALUES (\"2024-01-04\",1);",
"INSERT INTO range_t1 partition(p2) VALUES (\"2024-01-05\",1);"
};
for (String sql : sqls) {
executeInsertSql(sql);
}
String mvQuery = "CREATE MATERIALIZED VIEW test_mv1 " +
"REFRESH DEFERRED MANUAL\n" +
"AS SELECT dt1,sum(int1) from range_t1 group by dt1";
starRocksAssert.withMaterializedView(mvQuery);
MaterializedView mv = getMv("test_mv1");
TaskRun taskRun = buildMVTaskRun(mv, "test");
ExecPlan execPlan;
// explain without force
{
execPlan = getMVRefreshExecPlan(taskRun);
Assertions.assertNotNull(execPlan);
refreshMV("test", mv);
execPlan = getMVRefreshExecPlan(taskRun);
Assertions.assertNull(execPlan);
}
// refresh with force
Map<String, String> props = taskRun.getProperties();
props.put(TaskRun.FORCE, "true");
// explain with refresh
{
ExecuteOption executeOption = new ExecuteOption(taskRun.getTask());
Map<String, String> explainProps = executeOption.getTaskRunProperties();
explainProps.put(TaskRun.FORCE, "true");
execPlan = getMVRefreshExecPlan(taskRun, true);
Assertions.assertNotNull(execPlan);
execPlan = getMVRefreshExecPlan(taskRun, true);
Assertions.assertNotNull(execPlan);
// after refresh, still can refresh with force
execPlan = getMVRefreshExecPlan(taskRun, true);
String plan = execPlan.getExplainString(TExplainLevel.NORMAL);
PlanTestBase.assertContains(plan, " TABLE: range_t1\n" +
" PREAGGREGATION: ON\n" +
" partitions=2/2");
Assertions.assertNotNull(execPlan);
refreshMV("test", mv);
// after refresh, still can refresh with force
execPlan = getMVRefreshExecPlan(taskRun, true);
plan = execPlan.getExplainString(TExplainLevel.NORMAL);
PlanTestBase.assertContains(plan, " TABLE: range_t1\n" +
" PREAGGREGATION: ON\n" +
" partitions=2/2");
Assertions.assertNotNull(execPlan);
}
}
}

View File

@ -0,0 +1,181 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.scheduler;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.sql.optimizer.rule.transformation.materialization.MVTestBase;
import com.starrocks.sql.plan.ExecPlan;
import com.starrocks.sql.plan.PlanTestBase;
import com.starrocks.thrift.TExplainLevel;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer.MethodName;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import java.util.Map;
@TestMethodOrder(MethodName.class)
public class PCTRefreshRangePartitionOlapTest extends MVTestBase {
@BeforeAll
public static void beforeClass() throws Exception {
MVTestBase.beforeClass();
}
@Test
public void testMVForceRefresh() throws Exception {
String partitionTable = "CREATE TABLE range_t1 (dt1 date, int1 int)\n" +
"PARTITION BY date_trunc('day', dt1)";
starRocksAssert.withTable(partitionTable);
addRangePartition("range_t1", "p1", "2024-01-04", "2024-01-05");
addRangePartition("range_t1", "p2", "2024-01-05", "2024-01-06");
String[] sqls = {
"INSERT INTO range_t1 partition(p1) VALUES (\"2024-01-04\",1);",
"INSERT INTO range_t1 partition(p2) VALUES (\"2024-01-05\",1);"
};
for (String sql : sqls) {
executeInsertSql(sql);
}
String mvQuery = "CREATE MATERIALIZED VIEW test_mv1 " +
"PARTITION BY date_trunc('day', dt1) " +
"REFRESH DEFERRED MANUAL PROPERTIES (\"partition_refresh_number\"=\"-1\")\n" +
"AS SELECT dt1,sum(int1) from range_t1 group by dt1";
starRocksAssert.withMaterializedView(mvQuery);
MaterializedView mv = getMv("test_mv1");
TaskRun taskRun = buildMVTaskRun(mv, "test");
ExecPlan execPlan;
// explain without force
{
execPlan = getMVRefreshExecPlan(taskRun);
Assertions.assertNotNull(execPlan);
refreshMV("test", mv);
execPlan = getMVRefreshExecPlan(taskRun);
Assertions.assertNull(execPlan);
}
// refresh with force
Map<String, String> props = taskRun.getProperties();
props.put(TaskRun.FORCE, "true");
// explain with refresh
{
ExecuteOption executeOption = new ExecuteOption(taskRun.getTask());
Map<String, String> explainProps = executeOption.getTaskRunProperties();
explainProps.put(TaskRun.FORCE, "true");
execPlan = getMVRefreshExecPlan(taskRun, true);
Assertions.assertNotNull(execPlan);
execPlan = getMVRefreshExecPlan(taskRun, true);
Assertions.assertNotNull(execPlan);
// after refresh, still can refresh with force
execPlan = getMVRefreshExecPlan(taskRun, true);
String plan = execPlan.getExplainString(TExplainLevel.NORMAL);
PlanTestBase.assertContains(plan, " TABLE: range_t1\n" +
" PREAGGREGATION: ON\n" +
" partitions=2/2");
Assertions.assertNotNull(execPlan);
refreshMV("test", mv);
// after refresh, still can refresh with force
execPlan = getMVRefreshExecPlan(taskRun, true);
plan = execPlan.getExplainString(TExplainLevel.NORMAL);
PlanTestBase.assertContains(plan, " TABLE: range_t1\n" +
" PREAGGREGATION: ON\n" +
" partitions=2/2");
Assertions.assertNotNull(execPlan);
}
}
@Test
public void testMVForcePartialRefresh() throws Exception {
String partitionTable = "CREATE TABLE range_t1 (dt1 date, int1 int)\n" +
"PARTITION BY date_trunc('day', dt1)";
starRocksAssert.withTable(partitionTable);
addRangePartition("range_t1", "p1", "2024-01-04", "2024-01-05");
addRangePartition("range_t1", "p2", "2024-01-05", "2024-01-06");
String[] sqls = {
"INSERT INTO range_t1 partition(p1) VALUES (\"2024-01-04\",1);",
"INSERT INTO range_t1 partition(p2) VALUES (\"2024-01-05\",1);"
};
for (String sql : sqls) {
executeInsertSql(sql);
}
String mvQuery = "CREATE MATERIALIZED VIEW test_mv1 " +
"PARTITION BY date_trunc('day', dt1) " +
"REFRESH DEFERRED MANUAL PROPERTIES (\"partition_refresh_number\"=\"-1\")\n" +
"AS SELECT dt1,sum(int1) from range_t1 group by dt1";
starRocksAssert.withMaterializedView(mvQuery);
MaterializedView mv = getMv("test_mv1");
TaskRun taskRun = buildMVTaskRun(mv, "test");
Map<String, String> props = taskRun.getProperties();
props.put(TaskRun.PARTITION_START, "2024-01-04");
props.put(TaskRun.PARTITION_END, "2024-01-05");
ExecPlan execPlan;
// explain without force
{
execPlan = getMVRefreshExecPlan(taskRun);
Assertions.assertNotNull(execPlan);
refreshMV("test", mv);
execPlan = getMVRefreshExecPlan(taskRun);
Assertions.assertNull(execPlan);
}
// refresh with force
props.put(TaskRun.FORCE, "true");
// explain with refresh
{
ExecuteOption executeOption = new ExecuteOption(taskRun.getTask());
Map<String, String> explainProps = executeOption.getTaskRunProperties();
explainProps.put(TaskRun.FORCE, "true");
explainProps.put(TaskRun.PARTITION_START, "2024-01-04");
explainProps.put(TaskRun.PARTITION_END, "2024-01-05");
execPlan = getMVRefreshExecPlan(taskRun, true);
Assertions.assertNotNull(execPlan);
execPlan = getMVRefreshExecPlan(taskRun, true);
Assertions.assertNotNull(execPlan);
// after refresh, still can refresh with force
execPlan = getMVRefreshExecPlan(taskRun, true);
String plan = execPlan.getExplainString(TExplainLevel.NORMAL);
PlanTestBase.assertContains(plan, " TABLE: range_t1\n" +
" PREAGGREGATION: ON\n" +
" partitions=1/2");
Assertions.assertNotNull(execPlan);
refreshMV("test", mv);
// after refresh, still can refresh with force
execPlan = getMVRefreshExecPlan(taskRun, true);
plan = execPlan.getExplainString(TExplainLevel.NORMAL);
PlanTestBase.assertContains(plan, " TABLE: range_t1\n" +
" PREAGGREGATION: ON\n" +
" partitions=1/2");
Assertions.assertNotNull(execPlan);
}
}
}

View File

@ -239,7 +239,7 @@ public class PartitionBasedMvRefreshProcessorJdbcTest extends MVTestBase {
.collect(Collectors.toList());
Assertions.assertEquals(Arrays.asList("p000101_202308", "p202308_202309"), partitions);
// mv partition p202308_202309 is force refreshed and visible version is increased
Assertions.assertEquals(partitionVersionMap.get("p202308_202309") + 1,
Assertions.assertEquals(partitionVersionMap.get("p202308_202309"),
materializedView.getPartition("p202308_202309")
.getDefaultPhysicalPartition().getVisibleVersion());
}

View File

@ -421,15 +421,15 @@ public class PartitionBasedMvRefreshProcessorOlapTest extends MVTestBase {
.getDefaultPhysicalPartition().getVisibleVersion());
refreshMVRange(materializedView.getName(), "2021-12-03", "2022-05-06", true);
Assertions.assertEquals(3, materializedView.getPartition("p202112_202201")
Assertions.assertEquals(2, materializedView.getPartition("p202112_202201")
.getDefaultPhysicalPartition().getVisibleVersion());
Assertions.assertEquals(3, materializedView.getPartition("p202201_202202")
Assertions.assertEquals(2, materializedView.getPartition("p202201_202202")
.getDefaultPhysicalPartition().getVisibleVersion());
Assertions.assertEquals(2, materializedView.getPartition("p202202_202203")
.getDefaultPhysicalPartition().getVisibleVersion());
Assertions.assertEquals(2, materializedView.getPartition("p202203_202204")
.getDefaultPhysicalPartition().getVisibleVersion());
Assertions.assertEquals(3, materializedView.getPartition("p202204_202205")
Assertions.assertEquals(2, materializedView.getPartition("p202204_202205")
.getDefaultPhysicalPartition().getVisibleVersion());
}

View File

@ -411,6 +411,13 @@ public class MVTestBase extends StarRocksTestBase {
}
protected static ExecPlan getMVRefreshExecPlan(TaskRun taskRun) throws Exception {
return getMVRefreshExecPlan(taskRun, false);
}
protected static ExecPlan getMVRefreshExecPlan(TaskRun taskRun, boolean isForce) throws Exception {
if (isForce) {
taskRun.getProperties().put(TaskRun.FORCE, "true");
}
initAndExecuteTaskRun(taskRun);
PartitionBasedMvRefreshProcessor processor = (PartitionBasedMvRefreshProcessor)
taskRun.getProcessor();
@ -610,7 +617,7 @@ public class MVTestBase extends StarRocksTestBase {
}
});
}
public static List<String> extractColumnValues(String sql, int columnIndex) {
List<String> result = new ArrayList<>();

View File

@ -0,0 +1,178 @@
-- name: test_mv_force_refresh
create database db_${uuid0};
-- result:
-- !result
use db_${uuid0};
-- result:
-- !result
CREATE TABLE `t1` (
`k1` date,
`k2` int,
`k3` int
)
DUPLICATE KEY(`k1`)
PARTITION BY RANGE (k1) (
START ("2020-10-01") END ("2022-03-04") EVERY (INTERVAL 15 day)
)
DISTRIBUTED BY HASH(`k1`) BUCKETS 3;
-- result:
-- !result
INSERT INTO t1 VALUES ("2020-11-10",1,1),("2020-11-11",2,2),("2020-11-12",3,3);
-- result:
-- !result
CREATE MATERIALIZED VIEW test_mv1
PARTITION BY k1
DISTRIBUTED BY HASH(k1) BUCKETS 10
REFRESH DEFERRED MANUAL
PROPERTIES ("partition_refresh_number"="-1")
AS SELECT * FROM t1;
-- result:
-- !result
REFRESH MATERIALIZED VIEW test_mv1 WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
-- result:
2020-11-10 1 1
2020-11-11 2 2
2020-11-12 3 3
-- !result
REFRESH MATERIALIZED VIEW test_mv1 FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
-- result:
2020-11-10 1 1
2020-11-11 2 2
2020-11-12 3 3
-- !result
REFRESH MATERIALIZED VIEW test_mv1 PARTITION START("2020-11-10") END("2020-11-11") FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
-- result:
2020-11-10 1 1
2020-11-11 2 2
2020-11-12 3 3
-- !result
INSERT INTO t1 VALUES ("2020-11-10",4,4);
-- result:
-- !result
REFRESH MATERIALIZED VIEW test_mv1 FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
-- result:
2020-11-10 1 1
2020-11-10 4 4
2020-11-11 2 2
2020-11-12 3 3
-- !result
DROP MATERIALIZED VIEW test_mv1;
-- result:
-- !result
CREATE TABLE `t2` (
`k1` date,
`k2` int,
`k3` int
)
DUPLICATE KEY(`k1`)
DISTRIBUTED BY HASH(`k1`) BUCKETS 3;
-- result:
-- !result
INSERT INTO t2 VALUES ("2020-11-10",1,1),("2020-11-11",2,2),("2020-11-12",3,3);
-- result:
-- !result
CREATE MATERIALIZED VIEW test_mv1
DISTRIBUTED BY HASH(k1) BUCKETS 10
REFRESH DEFERRED MANUAL
AS SELECT * FROM t2;
-- result:
-- !result
REFRESH MATERIALIZED VIEW test_mv1 WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
-- result:
2020-11-10 1 1
2020-11-11 2 2
2020-11-12 3 3
-- !result
REFRESH MATERIALIZED VIEW test_mv1 FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
-- result:
2020-11-10 1 1
2020-11-11 2 2
2020-11-12 3 3
-- !result
INSERT INTO t2 VALUES ("2020-11-10",4,4);
-- result:
-- !result
REFRESH MATERIALIZED VIEW test_mv1 FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
-- result:
2020-11-10 1 1
2020-11-10 4 4
2020-11-11 2 2
2020-11-12 3 3
-- !result
DROP MATERIALIZED VIEW test_mv1;
-- result:
-- !result
CREATE TABLE t3 (
k1 date,
k2 int,
k3 int
)
DUPLICATE KEY(k1)
PARTITION BY date_trunc("day", k1), k2;
-- result:
-- !result
INSERT INTO t3 VALUES ("2020-11-10",1,1),("2020-11-11",2,2),("2020-11-12",3,3);
-- result:
-- !result
CREATE MATERIALIZED VIEW test_mv1
partition by (date_trunc("day", k1), k2)
REFRESH DEFERRED MANUAL
PROPERTIES ("partition_refresh_number"="-1")
AS select k1, k2, sum(k3) from t3 group by k1, k2;
-- result:
-- !result
REFRESH MATERIALIZED VIEW test_mv1 WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
-- result:
2020-11-10 1 1
2020-11-11 2 2
2020-11-12 3 3
-- !result
REFRESH MATERIALIZED VIEW test_mv1 FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
-- result:
2020-11-10 1 1
2020-11-11 2 2
2020-11-12 3 3
-- !result
INSERT INTO t3 VALUES ("2020-11-10",4,4);
-- result:
-- !result
REFRESH MATERIALIZED VIEW test_mv1 PARTITION (("2020-11-10", "4")) FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
-- result:
2020-11-10 1 1
2020-11-10 4 4
2020-11-11 2 2
2020-11-12 3 3
-- !result
REFRESH MATERIALIZED VIEW test_mv1 FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
-- result:
2020-11-10 1 1
2020-11-10 4 4
2020-11-11 2 2
2020-11-12 3 3
-- !result
DROP MATERIALIZED VIEW test_mv1;
-- result:
-- !result
drop table t1;
-- result:
-- !result
drop table t2;
-- result:
-- !result
drop table t3;
-- result:
-- !result
drop database db_${uuid0} force;
-- result:
-- !result

View File

@ -0,0 +1,104 @@
-- name: test_mv_force_refresh
create database db_${uuid0};
use db_${uuid0};
-- range partition table
CREATE TABLE `t1` (
`k1` date,
`k2` int,
`k3` int
)
DUPLICATE KEY(`k1`)
PARTITION BY RANGE (k1) (
START ("2020-10-01") END ("2022-03-04") EVERY (INTERVAL 15 day)
)
DISTRIBUTED BY HASH(`k1`) BUCKETS 3;
INSERT INTO t1 VALUES ("2020-11-10",1,1),("2020-11-11",2,2),("2020-11-12",3,3);
CREATE MATERIALIZED VIEW test_mv1
PARTITION BY k1
DISTRIBUTED BY HASH(k1) BUCKETS 10
REFRESH DEFERRED MANUAL
PROPERTIES ("partition_refresh_number"="-1")
AS SELECT * FROM t1;
REFRESH MATERIALIZED VIEW test_mv1 WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
REFRESH MATERIALIZED VIEW test_mv1 FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
REFRESH MATERIALIZED VIEW test_mv1 PARTITION START("2020-11-10") END("2020-11-11") FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
INSERT INTO t1 VALUES ("2020-11-10",4,4);
REFRESH MATERIALIZED VIEW test_mv1 FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
DROP MATERIALIZED VIEW test_mv1;
-- non partition table
CREATE TABLE `t2` (
`k1` date,
`k2` int,
`k3` int
)
DUPLICATE KEY(`k1`)
DISTRIBUTED BY HASH(`k1`) BUCKETS 3;
INSERT INTO t2 VALUES ("2020-11-10",1,1),("2020-11-11",2,2),("2020-11-12",3,3);
CREATE MATERIALIZED VIEW test_mv1
DISTRIBUTED BY HASH(k1) BUCKETS 10
REFRESH DEFERRED MANUAL
AS SELECT * FROM t2;
REFRESH MATERIALIZED VIEW test_mv1 WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
REFRESH MATERIALIZED VIEW test_mv1 FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
INSERT INTO t2 VALUES ("2020-11-10",4,4);
REFRESH MATERIALIZED VIEW test_mv1 FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
DROP MATERIALIZED VIEW test_mv1;
-- list partition table
CREATE TABLE t3 (
k1 date,
k2 int,
k3 int
)
DUPLICATE KEY(k1)
PARTITION BY date_trunc("day", k1), k2;
INSERT INTO t3 VALUES ("2020-11-10",1,1),("2020-11-11",2,2),("2020-11-12",3,3);
CREATE MATERIALIZED VIEW test_mv1
partition by (date_trunc("day", k1), k2)
REFRESH DEFERRED MANUAL
PROPERTIES ("partition_refresh_number"="-1")
AS select k1, k2, sum(k3) from t3 group by k1, k2;
REFRESH MATERIALIZED VIEW test_mv1 WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
REFRESH MATERIALIZED VIEW test_mv1 FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
INSERT INTO t3 VALUES ("2020-11-10",4,4);
REFRESH MATERIALIZED VIEW test_mv1 PARTITION (("2020-11-10", "4")) FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
REFRESH MATERIALIZED VIEW test_mv1 FORCE WITH SYNC MODE;
select * from test_mv1 order by k1, k2;
DROP MATERIALIZED VIEW test_mv1;
drop table t1;
drop table t2;
drop table t3;
drop database db_${uuid0} force;