[BugFix] fix shared-data cluster MV does not support colocation (backport #62941) (#63033)

Signed-off-by: starrocks-xupeng <xupeng@starrocks.com>
Co-authored-by: starrocks-xupeng <xupeng@starrocks.com>
This commit is contained in:
mergify[bot] 2025-09-12 14:37:12 +08:00 committed by GitHub
parent 70280d3da6
commit c8e85a33ae
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 85 additions and 27 deletions

View File

@ -57,6 +57,7 @@ import com.starrocks.scheduler.Task;
import com.starrocks.scheduler.TaskBuilder;
import com.starrocks.scheduler.TaskManager;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
import com.starrocks.sql.analyzer.MaterializedViewAnalyzer;
import com.starrocks.sql.analyzer.SemanticException;
import com.starrocks.sql.analyzer.SetStmtAnalyzer;
@ -274,11 +275,22 @@ public class AlterMVJobExecutor extends AlterJobExecutor {
properties.remove(PropertyAnalyzer.PROPERTIES_BF_COLUMNS);
}
if (!properties.isEmpty()) {
if (propClone.containsKey(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH)) {
if (properties.containsKey(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH)) {
// TODO: when support shared-nothing mode, must check PROPERTIES_LABELS_LOCATION
if (RunMode.isSharedNothingMode()) {
throw new SemanticException("Modify failed because unsupported properties: " +
"colocate group is not supported for materialized view");
"colocate_with is not supported for materialized view in shared-nothing cluster.");
}
try {
String colocateGroup = PropertyAnalyzer.analyzeColocate(properties);
GlobalStateMgr.getCurrentState().getColocateTableIndex()
.modifyTableColocate(db, materializedView, colocateGroup, false, null);
} catch (DdlException e) {
throw new AlterJobException(e.getMessage(), e);
}
}
if (!properties.isEmpty()) {
// analyze properties
List<SetListItem> setListItems = Lists.newArrayList();
for (Map.Entry<String, String> entry : properties.entrySet()) {

View File

@ -50,7 +50,6 @@ import com.starrocks.common.util.PropertyAnalyzer;
import com.starrocks.common.util.concurrent.lock.AutoCloseableLock;
import com.starrocks.common.util.concurrent.lock.LockType;
import com.starrocks.common.util.concurrent.lock.Locker;
import com.starrocks.lake.LakeTable;
import com.starrocks.persist.ColocatePersistInfo;
import com.starrocks.persist.ImageWriter;
import com.starrocks.persist.TablePropertyInfo;
@ -241,10 +240,9 @@ public class ColocateTableIndex implements Writable {
group2Schema.put(groupId, groupSchema);
}
if (tbl.isCloudNativeTable()) {
if (tbl.isCloudNativeTableOrMaterializedView()) {
if (!isReplay) { // leader create or update meta group
LakeTable ltbl = (LakeTable) tbl;
List<Long> shardGroupIds = ltbl.getShardGroupIds();
List<Long> shardGroupIds = tbl.getShardGroupIds();
if (!groupAlreadyExist) {
GlobalStateMgr.getCurrentState().getStarOSAgent().createMetaGroup(groupId.grpId, shardGroupIds);
} else {
@ -321,9 +319,8 @@ public class ColocateTableIndex implements Writable {
GroupId groupId = table2Group.remove(tableId);
if (tbl != null && tbl.isCloudNativeTable() && !isReplay) {
LakeTable ltbl = (LakeTable) tbl;
List<Long> shardGroupIds = ltbl.getShardGroupIds();
if (tbl != null && tbl.isCloudNativeTableOrMaterializedView() && !isReplay) {
List<Long> shardGroupIds = tbl.getShardGroupIds();
try {
GlobalStateMgr.getCurrentState().getStarOSAgent().updateMetaGroup(groupId.grpId, shardGroupIds,
false /* isJoin */);
@ -948,7 +945,7 @@ public class ColocateTableIndex implements Writable {
public void updateLakeTableColocationInfo(OlapTable olapTable, boolean isJoin,
GroupId expectGroupId) throws DdlException {
if (olapTable == null || !olapTable.isCloudNativeTable()) { // skip non-lake table
if (olapTable == null || !olapTable.isCloudNativeTableOrMaterializedView()) { // skip non-lake table
return;
}
@ -962,8 +959,7 @@ public class ColocateTableIndex implements Writable {
groupId = table2Group.get(olapTable.getId());
}
LakeTable ltbl = (LakeTable) olapTable;
List<Long> shardGroupIds = ltbl.getShardGroupIds();
List<Long> shardGroupIds = olapTable.getShardGroupIds();
LOG.info("update meta group id {}, table {}, shard groups: {}, join: {}",
groupId.grpId, olapTable.getId(), shardGroupIds, isJoin);
GlobalStateMgr.getCurrentState().getStarOSAgent().updateMetaGroup(groupId.grpId, shardGroupIds, isJoin);
@ -983,7 +979,7 @@ public class ColocateTableIndex implements Writable {
// database and table should be valid if reach here
Database database = globalStateMgr.getLocalMetastore().getDbIncludeRecycleBin(dbId);
Table table = globalStateMgr.getLocalMetastore().getTableIncludeRecycleBin(database, tableId);
if (table.isCloudNativeTable()) {
if (table.isCloudNativeTableOrMaterializedView()) {
lakeGroups.add(entry.getValue());
}
}

View File

@ -3969,4 +3969,19 @@ public class OlapTable extends Table {
}
return true;
}
// only used for LakeTable and LakeMaterializedView
public List<Long> getShardGroupIds() {
if (RunMode.isSharedNothingMode()) {
return Lists.newArrayList();
}
List<Long> shardGroupIds = new ArrayList<>();
for (Partition p : getAllPartitions()) {
for (MaterializedIndex index : p.getDefaultPhysicalPartition()
.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
shardGroupIds.add(index.getShardGroupId());
}
}
return shardGroupIds;
}
}

View File

@ -1824,7 +1824,7 @@ public class PropertyAnalyzer {
throw new AnalysisException(": random distribution does not support 'colocate_with'");
}
GlobalStateMgr.getCurrentState().getColocateTableIndex().addTableToGroup(
db, materializedView, colocateGroup, materializedView.isCloudNativeMaterializedView());
db, materializedView, colocateGroup, false /* expectLakeTable */);
}
// enable_query_rewrite

View File

@ -16,6 +16,7 @@ package com.starrocks.lake;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Range;
import com.staros.proto.FileCacheInfo;
import com.staros.proto.FilePathInfo;
@ -35,6 +36,7 @@ import com.starrocks.catalog.PartitionKey;
import com.starrocks.catalog.RangePartitionInfo;
import com.starrocks.catalog.RecyclePartitionInfo;
import com.starrocks.catalog.TableProperty;
import com.starrocks.common.DdlException;
import com.starrocks.common.io.DeepCopy;
import com.starrocks.common.util.PropertyAnalyzer;
import com.starrocks.server.GlobalStateMgr;
@ -220,4 +222,10 @@ public class LakeMaterializedView extends MaterializedView {
LakeTableHelper.restoreColumnUniqueIdIfNeeded(this);
super.gsonPostProcess();
}
// used in colocate table index, return an empty list for LakeMaterializedView
@Override
public List<List<Long>> getArbitraryTabletBucketsSeq() throws DdlException {
return Lists.newArrayList();
}
}

View File

@ -208,17 +208,6 @@ public class LakeTable extends OlapTable {
return Lists.newArrayList();
}
public List<Long> getShardGroupIds() {
List<Long> shardGroupIds = new ArrayList<>();
for (Partition p : getAllPartitions()) {
for (MaterializedIndex index : p.getDefaultPhysicalPartition()
.getMaterializedIndices(MaterializedIndex.IndexExtState.ALL)) {
shardGroupIds.add(index.getShardGroupId());
}
}
return shardGroupIds;
}
@Override
public String getComment() {
if (!Strings.isNullOrEmpty(comment)) {

View File

@ -746,6 +746,10 @@ public class StarOSAgent {
}
public void updateMetaGroup(long metaGroupId, List<Long> shardGroupIds, boolean isJoin) throws DdlException {
if (shardGroupIds == null || shardGroupIds.isEmpty()) {
return;
}
prepare();
try {

View File

@ -3087,6 +3087,7 @@ public class LocalMetastore implements ConnectorMetadata, MVRepairHandler, Memor
boolean isNonPartitioned = partitionInfo.isUnPartitioned();
DataProperty dataProperty = PropertyAnalyzer.analyzeMVDataProperty(materializedView, properties);
String colocateGroup = properties.get(PropertyAnalyzer.PROPERTIES_COLOCATE_WITH);
PropertyAnalyzer.analyzeMVProperties(db, materializedView, properties, isNonPartitioned,
stmt.getPartitionByExprToAdjustExprMap());
final long warehouseId = materializedView.getWarehouseId();
@ -3123,6 +3124,11 @@ public class LocalMetastore implements ConnectorMetadata, MVRepairHandler, Memor
materializedView.setPartitionExprMaps(partitionExprMaps);
}
// shared-data mv's colocation info must be updated after tablet creation
if (StringUtils.isNotEmpty(colocateGroup)) {
colocateTableIndex.addTableToGroup(db, materializedView, colocateGroup, true /* expectLakeTable */);
}
GlobalStateMgr.getCurrentState().getMaterializedViewMgr().prepareMaintenanceWork(stmt, materializedView);
String storageVolumeId = "";

View File

@ -281,7 +281,7 @@ public class ColocateTableIndexTest {
}
@Mock
public boolean isCloudNativeTable() {
public boolean isCloudNativeTableOrMaterializedView() {
return true;
}

View File

@ -23,6 +23,7 @@ import com.staros.proto.FileStoreInfo;
import com.staros.proto.FileStoreType;
import com.staros.proto.S3FileStoreInfo;
import com.starrocks.catalog.AggregateType;
import com.starrocks.catalog.ColocateTableIndex;
import com.starrocks.catalog.Column;
import com.starrocks.catalog.DataProperty;
import com.starrocks.catalog.Database;
@ -495,4 +496,31 @@ public class LakeMaterializedViewTest {
recyclePartitionInfo = mv3.buildRecyclePartitionInfo(dbId, partition);
Assertions.assertTrue(recyclePartitionInfo instanceof RecycleLakeListPartitionInfo);
}
@Test
public void testMaterializedViewColocation() throws Exception {
starRocksAssert.withMaterializedView("create materialized view mv6\n" +
"distributed by hash(k2) buckets 3\n" +
"PROPERTIES(\n" +
" 'colocate_with' = 'aaa'\n" +
")\n" +
"refresh async\n" +
"as select k2, sum(k3) as total from base_table group by k2;");
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(DB);
MaterializedView mv =
(MaterializedView) GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getFullName(), "mv6");
Assertions.assertTrue(mv.isCloudNativeMaterializedView());
ColocateTableIndex index = GlobalStateMgr.getCurrentState().getColocateTableIndex();
Assertions.assertTrue(index.isLakeColocateTable(mv.getId()));
String alterMvSql = "alter materialized view mv6 set ('colocate_with'='');";
StatementBase statement = SqlParser.parseSingleStatement(alterMvSql, connectContext.getSessionVariable().getSqlMode());
StmtExecutor stmtExecutor = new StmtExecutor(connectContext, statement);
stmtExecutor.execute();
Assertions.assertFalse(index.isLakeColocateTable(mv.getId()));
starRocksAssert.dropMaterializedView("mv6");
Assertions.assertNull(GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getFullName(), "mv6"));
}
}