[Enhancement] Only clear mv's metadata in schema changing (#57371)

Signed-off-by: crossoverJie <crossoverJie@gmail.com>
This commit is contained in:
crossoverJie 2025-03-31 13:55:43 +08:00 committed by GitHub
parent 4a00cf78c9
commit 24d76e1feb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 117 additions and 2 deletions

View File

@ -223,7 +223,6 @@ public class AlterJobMgr {
List<BaseTableInfo> baseTableInfos =
Lists.newArrayList(MaterializedViewAnalyzer.getBaseTableInfos(mvQueryStatement, !isReplay));
materializedView.setBaseTableInfos(baseTableInfos);
materializedView.getRefreshScheme().getAsyncRefreshContext().clearVisibleVersionMap();
materializedView.onReload();
materializedView.setActive();
} else if (AlterMaterializedViewStatusClause.INACTIVE.equalsIgnoreCase(status)) {

View File

@ -457,7 +457,7 @@ public class AlterMVJobExecutor extends AlterJobExecutor {
// for manual refresh type, do not refresh
if (materializedView.getRefreshScheme().getType() != MaterializedView.RefreshType.MANUAL) {
GlobalStateMgr.getCurrentState().getLocalMetastore()
.refreshMaterializedView(dbName, materializedView.getName(), true, null,
.refreshMaterializedView(dbName, materializedView.getName(), false, null,
Constants.TaskRunPriority.NORMAL.value(), true, false);
}
} else if (AlterMaterializedViewStatusClause.INACTIVE.equalsIgnoreCase(status)) {
@ -611,6 +611,9 @@ public class AlterMVJobExecutor extends AlterJobExecutor {
if (mv.getColumns().stream().anyMatch(x -> modifiedColumns.contains(x.getName()))) {
doInactiveMaterializedView(mv, reason);
}
} finally {
// clear version map to make sure the MV will be refreshed
mv.getRefreshScheme().getAsyncRefreshContext().clearVisibleVersionMap();
}
}
}

View File

@ -832,6 +832,8 @@ public class LakeTableSchemaChangeJob extends LakeTableSchemaChangeJobBase {
mvColumn.getName(), tbl.getName());
mv.setInactiveAndReason(
MaterializedViewExceptions.inactiveReasonForColumnChanged(modifiedColumns));
// clear version map to make sure the MV will be refreshed
mv.getRefreshScheme().getAsyncRefreshContext().clearVisibleVersionMap();
return;
}
}

View File

@ -17,8 +17,12 @@ package com.starrocks.analysis;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.starrocks.alter.AlterJobMgr;
import com.starrocks.alter.AlterMVJobExecutor;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.MaterializedView;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Table;
import com.starrocks.common.AnalysisException;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.qe.ConnectContext;
@ -39,8 +43,11 @@ import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import static com.starrocks.analysis.CreateSyncMaterializedViewTest.executeInsertSql;
public class AlterMaterializedViewTest {
private static ConnectContext connectContext;
private static StarRocksAssert starRocksAssert;
@ -183,6 +190,110 @@ public class AlterMaterializedViewTest {
}
}
@Test
public void testInactiveMV() throws Exception {
starRocksAssert
.withTable("CREATE TABLE IF NOT EXISTS par_tbl1\n" +
"(\n" +
" datekey DATETIME,\n" +
" item_id STRING,\n" +
" v1 INT\n" +
")PRIMARY KEY (`datekey`,`item_id`)\n" +
" PARTITION BY date_trunc('day', `datekey`);");
executeInsertSql(connectContext, "INSERT INTO par_tbl1 values ('2025-01-01', '1', 1);");
executeInsertSql(connectContext, "INSERT INTO par_tbl1 values ('2025-01-02', '1', 1);");
starRocksAssert
.withTable("CREATE TABLE IF NOT EXISTS par_tbl2\n" +
"(\n" +
" datekey DATETIME,\n" +
" item_id STRING,\n" +
" v1 INT\n" +
")PRIMARY KEY (`datekey`,`item_id`)\n" +
" PARTITION BY date_trunc('day', `datekey`);");
executeInsertSql(connectContext, "INSERT INTO par_tbl2 values ('2025-01-01', '1', 2);");
executeInsertSql(connectContext, "INSERT INTO par_tbl2 values ('2025-01-02', '1', 1);");
starRocksAssert
.withTable("CREATE TABLE IF NOT EXISTS dim_data\n" +
"(\n" +
" item_id STRING,\n" +
" v1 INT\n" +
")PRIMARY KEY (`item_id`);");
executeInsertSql(connectContext, "INSERT INTO dim_data values ('1', 4);");
starRocksAssert
.withMaterializedView("CREATE\n" +
"MATERIALIZED VIEW mv_dim_data1\n" +
"REFRESH ASYNC EVERY(INTERVAL 60 MINUTE)\n" +
"AS\n" +
"select *\n" +
"from dim_data;");
starRocksAssert
.withMaterializedView("CREATE\n" +
"MATERIALIZED VIEW mv_test1\n" +
"REFRESH ASYNC EVERY(INTERVAL 60 MINUTE)\n" +
"PARTITION BY p_time\n" +
"PROPERTIES (\n" +
"\"excluded_trigger_tables\" = \"mv_dim_data1\",\n" +
"\"excluded_refresh_tables\" = \"mv_dim_data1\",\n" +
"\"partition_refresh_number\" = \"1\"\n" +
")\n" +
"AS\n" +
"select date_trunc(\"day\", a.datekey) as p_time, sum(a.v1) + sum(b.v1) as v1\n" +
"from par_tbl1 a\n" +
" left join par_tbl2 b on a.datekey = b.datekey and a.item_id = b.item_id\n" +
" left join mv_dim_data1 d on a.item_id = d.item_id\n" +
"group by date_trunc(\"day\", a.datekey), a.item_id;");
starRocksAssert.refreshMV("refresh materialized view mv_test1");
MaterializedView mv = (MaterializedView) starRocksAssert.getTable(connectContext.getDatabase(), "mv_test1");
Assert.assertTrue(starRocksAssert.waitRefreshFinished(mv.getId()));
Map<Long, Map<String, MaterializedView.BasePartitionInfo>> baseTableVisibleVersionMap =
mv.getRefreshScheme().getAsyncRefreshContext().getBaseTableVisibleVersionMap();
Assert.assertTrue(!baseTableVisibleVersionMap.isEmpty());
String alterMvSql = "alter materialized view mv_test1 INACTIVE";
AlterMaterializedViewStmt stmt =
(AlterMaterializedViewStmt) UtFrameUtils.parseStmtWithNewParser(alterMvSql, connectContext);
currentState.getLocalMetastore().alterMaterializedView(stmt);
Assert.assertFalse(mv.isActive());
alterMvSql = "alter materialized view mv_test1 ACTIVE";
stmt = (AlterMaterializedViewStmt) UtFrameUtils.parseStmtWithNewParser(alterMvSql, connectContext);
currentState.getLocalMetastore().alterMaterializedView(stmt);
Assert.assertTrue(starRocksAssert.waitRefreshFinished(mv.getId()));
Assert.assertTrue(mv.isActive());
baseTableVisibleVersionMap =
mv.getRefreshScheme().getAsyncRefreshContext().getBaseTableVisibleVersionMap();
Assert.assertTrue(!baseTableVisibleVersionMap.isEmpty());
alterMvSql = "alter materialized view mv_test1 INACTIVE";
stmt = (AlterMaterializedViewStmt) UtFrameUtils.parseStmtWithNewParser(alterMvSql, connectContext);
currentState.getLocalMetastore().alterMaterializedView(stmt);
Assert.assertFalse(mv.isActive());
alterMvSql = "alter materialized view mv_test1 ACTIVE";
stmt = (AlterMaterializedViewStmt) UtFrameUtils.parseStmtWithNewParser(alterMvSql, connectContext);
currentState.getLocalMetastore().alterMaterializedView(stmt);
Assert.assertTrue(starRocksAssert.waitRefreshFinished(mv.getId()));
Assert.assertTrue(mv.isActive());
// Don't refresh base table version map
baseTableVisibleVersionMap = mv.getRefreshScheme().getAsyncRefreshContext().getBaseTableVisibleVersionMap();
Assert.assertTrue(!baseTableVisibleVersionMap.isEmpty());
// inactive mv when base table's schema changed
Database db = starRocksAssert.getDb(connectContext.getDatabase());
Table parTbl1 = starRocksAssert.getTable(connectContext.getDatabase(), "par_tbl1");
AlterMVJobExecutor.inactiveRelatedMaterializedViews(db, (OlapTable) parTbl1, Set.of("item_id"));
baseTableVisibleVersionMap = mv.getRefreshScheme().getAsyncRefreshContext().getBaseTableVisibleVersionMap();
Assert.assertTrue(baseTableVisibleVersionMap.isEmpty());
}
@Test
public void testAlterMVOnView() throws Exception {
final String mvName = "mv_on_view_1";