feat: Assign unique label to MV refresh loads
This change ensures that each materialized view refresh operation is assigned a unique load label in the format "mv_{uuid}". This improves traceability and management of MV refresh tasks.
Co-authored-by: huanmingwong <huanmingwong@gmail.com>
This commit is contained in:
parent
1dbf05f5dd
commit
8e2dd745db
|
|
@ -417,6 +417,12 @@ public abstract class BaseMVRefreshProcessor {
|
||||||
|
|
||||||
// insert overwrite mv must set system = true
|
// insert overwrite mv must set system = true
|
||||||
insertStmt.setSystem(true);
|
insertStmt.setSystem(true);
|
||||||
|
// set a unique load label for MV refresh: mv_{uuid}
|
||||||
|
if (insertStmt.getLabel() == null || insertStmt.getLabel().isEmpty()) {
|
||||||
|
insertStmt.setLabel(
|
||||||
|
com.starrocks.sql.common.MetaUtils.genMVLabel(
|
||||||
|
com.starrocks.common.util.UUIDUtil.toTUniqueId(ctx.getQueryId())));
|
||||||
|
}
|
||||||
// if mv has set sort keys, materialized view's output columns
|
// if mv has set sort keys, materialized view's output columns
|
||||||
// may be different from the defined query's output.
|
// may be different from the defined query's output.
|
||||||
// so set materialized view's defined outputs as target columns.
|
// so set materialized view's defined outputs as target columns.
|
||||||
|
|
|
||||||
|
|
@ -154,6 +154,14 @@ public class MetaUtils {
|
||||||
return "update_" + DebugUtil.printId(executionId);
|
return "update_" + DebugUtil.printId(executionId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generate a transaction label for MV refresh loads.
|
||||||
|
* The label format is: mv_{uuid}
|
||||||
|
*/
|
||||||
|
public static String genMVLabel(TUniqueId executionId) {
|
||||||
|
return "mv_" + DebugUtil.printId(executionId);
|
||||||
|
}
|
||||||
|
|
||||||
public static ExternalOlapTable syncOLAPExternalTableMeta(ExternalOlapTable externalOlapTable) {
|
public static ExternalOlapTable syncOLAPExternalTableMeta(ExternalOlapTable externalOlapTable) {
|
||||||
ExternalOlapTable copiedTable = new ExternalOlapTable();
|
ExternalOlapTable copiedTable = new ExternalOlapTable();
|
||||||
externalOlapTable.copyOnlyForQuery(copiedTable);
|
externalOlapTable.copyOnlyForQuery(copiedTable);
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,60 @@
|
||||||
|
// Copyright 2021-present StarRocks, Inc. All rights reserved.
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// https://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package com.starrocks.scheduler;
|
||||||
|
|
||||||
|
import com.starrocks.catalog.MaterializedView;
|
||||||
|
import com.starrocks.common.util.UUIDUtil;
|
||||||
|
import com.starrocks.scheduler.mv.BaseMVRefreshProcessor;
|
||||||
|
import com.starrocks.server.GlobalStateMgr;
|
||||||
|
import com.starrocks.sql.optimizer.rule.transformation.materialization.MVTestBase;
|
||||||
|
import org.junit.jupiter.api.Assertions;
|
||||||
|
import org.junit.jupiter.api.BeforeAll;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
public class MvRefreshLabelTest extends MVTestBase {
|
||||||
|
|
||||||
|
@BeforeAll
|
||||||
|
public static void setup() throws Exception {
|
||||||
|
MVTestBase.beforeClass();
|
||||||
|
starRocksAssert.withTable("CREATE TABLE base_label_t1 (k1 int) DUPLICATE KEY(k1) DISTRIBUTED BY HASH(k1) BUCKETS 3 PROPERTIES('replication_num'='1')");
|
||||||
|
cluster.runSql(DB_NAME, "insert into base_label_t1 values (1), (2)");
|
||||||
|
starRocksAssert.withMaterializedView("CREATE MATERIALIZED VIEW mv_label_test REFRESH MANUAL AS select k1 from base_label_t1");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMVRefreshAssignsMvLabel() throws Exception {
|
||||||
|
MaterializedView mv = (MaterializedView) starRocksAssert.getTable(DB_NAME, "mv_label_test");
|
||||||
|
|
||||||
|
Task task = TaskBuilder.buildMvTask(mv, DB_NAME);
|
||||||
|
TaskManager taskManager = GlobalStateMgr.getCurrentState().getTaskManager();
|
||||||
|
TaskRun taskRun = taskManager.buildTaskRun(task, new ExecuteOption(Constants.TaskRunPriority.HIGH.value(), true));
|
||||||
|
// init status and build context
|
||||||
|
String queryId = UUIDUtil.genUUID().toString();
|
||||||
|
taskRun.initStatus(queryId, System.currentTimeMillis());
|
||||||
|
TaskRunContext ctx = taskRun.buildTaskRunContext();
|
||||||
|
|
||||||
|
MVTaskRunProcessor proc = (MVTaskRunProcessor) taskRun.getProcessor();
|
||||||
|
proc.prepare(ctx);
|
||||||
|
BaseMVRefreshProcessor.ProcessExecPlan plan = proc.getMVRefreshProcessor().getProcessExecPlan(proc.getMvTaskRunContext());
|
||||||
|
|
||||||
|
if (plan != null && plan.insertStmt() != null) {
|
||||||
|
String label = plan.insertStmt().getLabel();
|
||||||
|
// The label is assigned during planning; it should either be null (skipped) or start with mv_
|
||||||
|
if (label != null) {
|
||||||
|
Assertions.assertTrue(label.startsWith("mv_"), "MV refresh label should start with mv_ but was: " + label);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,39 @@
|
||||||
|
-- name: test_mv_refresh_label
|
||||||
|
create database db_${uuid0};
|
||||||
|
-- result:
|
||||||
|
-- !result
|
||||||
|
use db_${uuid0};
|
||||||
|
-- result:
|
||||||
|
-- !result
|
||||||
|
CREATE TABLE t_base (k1 int) DUPLICATE KEY(k1) DISTRIBUTED BY HASH(k1) BUCKETS 1 PROPERTIES("replication_num" = "1");
|
||||||
|
-- result:
|
||||||
|
-- !result
|
||||||
|
insert into t_base values (1), (2);
|
||||||
|
-- result:
|
||||||
|
-- !result
|
||||||
|
CREATE MATERIALIZED VIEW mv_label
|
||||||
|
REFRESH MANUAL
|
||||||
|
AS SELECT k1 FROM t_base;
|
||||||
|
-- result:
|
||||||
|
-- !result
|
||||||
|
refresh materialized view mv_label with sync mode;
|
||||||
|
-- result:
|
||||||
|
-- !result
|
||||||
|
[UC]label=select label from information_schema.loads where db_name='db_${uuid0}';
|
||||||
|
-- result:
|
||||||
|
[REGEX]mv_.+
|
||||||
|
-- !result
|
||||||
|
select case when substr('${label}', 1, 3) = 'mv_' then 1 else 0 end;
|
||||||
|
-- result:
|
||||||
|
1
|
||||||
|
-- !result
|
||||||
|
|
||||||
|
drop materialized view mv_label;
|
||||||
|
-- result:
|
||||||
|
-- !result
|
||||||
|
drop table t_base;
|
||||||
|
-- result:
|
||||||
|
-- !result
|
||||||
|
drop database db_${uuid0};
|
||||||
|
-- result:
|
||||||
|
-- !result
|
||||||
|
|
@ -0,0 +1,20 @@
|
||||||
|
-- name: test_mv_refresh_label
|
||||||
|
create database db_${uuid0};
|
||||||
|
use db_${uuid0};
|
||||||
|
|
||||||
|
CREATE TABLE t_base (k1 int) DUPLICATE KEY(k1) DISTRIBUTED BY HASH(k1) BUCKETS 1 PROPERTIES("replication_num" = "1");
|
||||||
|
insert into t_base values (1), (2);
|
||||||
|
|
||||||
|
CREATE MATERIALIZED VIEW mv_label
|
||||||
|
REFRESH MANUAL
|
||||||
|
AS SELECT k1 FROM t_base;
|
||||||
|
|
||||||
|
refresh materialized view mv_label with sync mode;
|
||||||
|
|
||||||
|
-- capture a label from information_schema.loads for this db
|
||||||
|
[UC]label=select label from information_schema.loads where db_name='db_${uuid0}';
|
||||||
|
select case when substr('${label}', 1, 3) = 'mv_' then 1 else 0 end;
|
||||||
|
|
||||||
|
drop materialized view mv_label;
|
||||||
|
drop table t_base;
|
||||||
|
drop database db_${uuid0};
|
||||||
Loading…
Reference in New Issue