[BugFix] Fix optimize table task submit reject (backport #62300) (#62555)

Signed-off-by: meegoo <meegoo.sr@gmail.com>
Co-authored-by: meegoo <meegoo.sr@gmail.com>
This commit is contained in:
mergify[bot] 2025-09-01 19:16:33 -07:00 committed by GitHub
parent d3dedf1051
commit fd9b3e3c2f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 138 additions and 4 deletions

View File

@ -44,6 +44,7 @@ import com.starrocks.persist.ReplacePartitionOperationLog;
import com.starrocks.persist.gson.GsonPostProcessable;
import com.starrocks.qe.SessionVariable;
import com.starrocks.scheduler.Constants;
import com.starrocks.scheduler.SubmitResult;
import com.starrocks.scheduler.TaskBuilder;
import com.starrocks.scheduler.TaskManager;
import com.starrocks.scheduler.TaskRun;
@ -286,7 +287,12 @@ public class OptimizeJobV2 extends AlterJobV2 implements GsonPostProcessable {
for (OptimizeTask rewriteTask : rewriteTasks) {
try {
taskManager.createTask(rewriteTask, false);
taskManager.executeTask(rewriteTask.getName());
SubmitResult r = taskManager.executeTask(rewriteTask.getName());
if (r.getStatus() == SubmitResult.SubmitStatus.SUBMITTED) {
rewriteTask.setOptimizeTaskState(Constants.TaskRunState.RUNNING);
} else if (r.getStatus() == SubmitResult.SubmitStatus.FAILED) {
rewriteTask.setOptimizeTaskState(Constants.TaskRunState.FAILED);
}
LOG.debug("create rewrite task {}", rewriteTask.toString());
} catch (DdlException e) {
rewriteTask.setOptimizeTaskState(Constants.TaskRunState.FAILED);
@ -329,6 +335,7 @@ public class OptimizeJobV2 extends AlterJobV2 implements GsonPostProcessable {
int progress = 0;
TaskRunManager taskRunManager = GlobalStateMgr.getCurrentState().getTaskManager().getTaskRunManager();
TaskRunScheduler taskRunScheduler = taskRunManager.getTaskRunScheduler();
TaskManager taskManager = GlobalStateMgr.getCurrentState().getTaskManager(); // add: define taskManager
// prepare for the history task info
Set<String> taskNames = Sets.newHashSet();
@ -339,6 +346,14 @@ public class OptimizeJobV2 extends AlterJobV2 implements GsonPostProcessable {
.getTaskRunManager().getTaskRunHistory().lookupHistoryByTaskNames(dbName, taskNames);
for (OptimizeTask rewriteTask : rewriteTasks) {
if (rewriteTask.getOptimizeTaskState() == Constants.TaskRunState.PENDING) {
SubmitResult r = taskManager.executeTask(rewriteTask.getName());
if (r.getStatus() == SubmitResult.SubmitStatus.SUBMITTED) {
rewriteTask.setOptimizeTaskState(Constants.TaskRunState.RUNNING);
} else if (r.getStatus() == SubmitResult.SubmitStatus.FAILED) {
rewriteTask.setOptimizeTaskState(Constants.TaskRunState.FAILED);
}
}
if (rewriteTask.getOptimizeTaskState() == Constants.TaskRunState.FAILED
|| rewriteTask.getOptimizeTaskState() == Constants.TaskRunState.SUCCESS) {
progress += 100 / rewriteTasks.size();

View File

@ -26,6 +26,7 @@ import com.starrocks.catalog.Replica;
import com.starrocks.common.Config;
import com.starrocks.common.util.ThreadUtil;
import com.starrocks.scheduler.Constants;
import com.starrocks.scheduler.TaskBuilder;
import com.starrocks.scheduler.persist.TaskRunStatus;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.analyzer.DDLTestBase;
@ -451,9 +452,7 @@ public class OptimizeJobV2Test extends DDLTestBase {
// runWaitingTxnJob
optimizeJob.runWaitingTxnJob();
if (optimizeJob.getJobState() != JobState.RUNNING) {
return;
}
Assertions.assertEquals(JobState.RUNNING, optimizeJob.getJobState());
// runRunningJob
List<OptimizeTask> optimizeTasks = optimizeJob.getOptimizeTasks();
@ -476,4 +475,124 @@ public class OptimizeJobV2Test extends DDLTestBase {
Assertions.assertEquals(JobState.CANCELLED, optimizeJob.getJobState());
}
@Test
public void testOptimizeDistributionTypeSuccess() throws Exception {
SchemaChangeHandler schemaChangeHandler = GlobalStateMgr.getCurrentState().getSchemaChangeHandler();
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(GlobalStateMgrTestUtil.testDb1);
OlapTable olapTable =
(OlapTable) GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(db.getFullName(), "testTable2");
String stmt = "alter table testTable2 distributed by random";
AlterTableStmt alterStmt = (AlterTableStmt) UtFrameUtils.parseStmtWithNewParser(stmt, starRocksAssert.getCtx());
schemaChangeHandler.process(alterStmt.getAlterClauseList(), db, olapTable);
Map<Long, AlterJobV2> alterJobsV2 = schemaChangeHandler.getAlterJobsV2();
Assertions.assertEquals(1, alterJobsV2.size());
OptimizeJobV2 optimizeJob = (OptimizeJobV2) alterJobsV2.values().stream().findAny().get();
// runPendingJob
optimizeJob.runPendingJob();
Assertions.assertEquals(JobState.WAITING_TXN, optimizeJob.getJobState());
// runWaitingTxnJob
optimizeJob.runWaitingTxnJob();
Assertions.assertEquals(JobState.RUNNING, optimizeJob.getJobState());
// Make all tasks SUCCESS to cover allPartitionOptimized branch
List<OptimizeTask> optimizeTasks = optimizeJob.getOptimizeTasks();
// Expect 2 tasks (2 partitions in test env)
Assertions.assertEquals(2, optimizeTasks.size());
for (OptimizeTask t : optimizeTasks) {
t.setOptimizeTaskState(Constants.TaskRunState.SUCCESS);
}
try {
optimizeJob.runRunningJob();
} catch (Exception e) {
LOG.info(e.getMessage());
}
// Verify job finished and default distribution updated
Assertions.assertEquals(JobState.FINISHED, optimizeJob.getJobState());
Assertions.assertEquals(OlapTable.OlapTableState.NORMAL, olapTable.getState());
Assertions.assertEquals(
com.starrocks.catalog.DistributionInfo.DistributionInfoType.RANDOM,
olapTable.getDefaultDistributionInfo().getType()
);
}
@Test
public void testRunRunningJobSubmitPendingTasks() throws Exception {
SchemaChangeHandler schemaChangeHandler = GlobalStateMgr.getCurrentState().getSchemaChangeHandler();
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(GlobalStateMgrTestUtil.testDb1);
OlapTable olapTable = (OlapTable) GlobalStateMgr.getCurrentState().getLocalMetastore()
.getTable(db.getFullName(), GlobalStateMgrTestUtil.testTable7);
// Drive job to PENDING -> WAITING_TXN -> RUNNING
schemaChangeHandler.process(alterTableStmt.getAlterClauseList(), db, olapTable);
Map<Long, AlterJobV2> alterJobsV2 = schemaChangeHandler.getAlterJobsV2();
Assertions.assertEquals(1, alterJobsV2.size());
OptimizeJobV2 optimizeJob = (OptimizeJobV2) alterJobsV2.values().stream().findAny().get();
optimizeJob.runPendingJob();
Assertions.assertEquals(JobState.WAITING_TXN, optimizeJob.getJobState());
optimizeJob.runWaitingTxnJob();
Assertions.assertEquals(JobState.RUNNING, optimizeJob.getJobState());
// Set all tasks to PENDING and clear scheduler state to trigger executeTask path in runRunningJob
List<OptimizeTask> optimizeTasks = optimizeJob.getOptimizeTasks();
for (OptimizeTask t : optimizeTasks) {
t.setOptimizeTaskState(Constants.TaskRunState.PENDING);
GlobalStateMgr.getCurrentState().getTaskManager().getTaskRunManager()
.getTaskRunScheduler().removeRunningTask(t.getId());
GlobalStateMgr.getCurrentState().getTaskManager().getTaskRunManager()
.getTaskRunScheduler().removePendingTask(t);
}
// Trigger path: executeTask for PENDING tasks should set state to RUNNING or FAILED
optimizeJob.runRunningJob();
// Assert: all tasks should not be PENDING
for (OptimizeTask t : optimizeTasks) {
Assertions.assertNotEquals(Constants.TaskRunState.PENDING, t.getOptimizeTaskState());
}
// Job should remain RUNNING because tasks are not finished
Assertions.assertEquals(JobState.RUNNING, optimizeJob.getJobState());
}
@Test
public void testRunRunningJobSubmitPendingTasksFailed() throws Exception {
SchemaChangeHandler schemaChangeHandler = GlobalStateMgr.getCurrentState().getSchemaChangeHandler();
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(GlobalStateMgrTestUtil.testDb1);
OlapTable olapTable = (OlapTable) GlobalStateMgr.getCurrentState().getLocalMetastore()
.getTable(db.getFullName(), GlobalStateMgrTestUtil.testTable7);
// Drive job to PENDING -> WAITING_TXN -> RUNNING
schemaChangeHandler.process(alterTableStmt.getAlterClauseList(), db, olapTable);
Map<Long, AlterJobV2> alterJobsV2 = schemaChangeHandler.getAlterJobsV2();
Assertions.assertEquals(1, alterJobsV2.size());
OptimizeJobV2 optimizeJob = (OptimizeJobV2) alterJobsV2.values().stream().findAny().get();
optimizeJob.runPendingJob();
Assertions.assertEquals(JobState.WAITING_TXN, optimizeJob.getJobState());
optimizeJob.runWaitingTxnJob();
Assertions.assertEquals(JobState.RUNNING, optimizeJob.getJobState());
// Create a fake PENDING task that is not registered in TaskManager to force executeTask -> FAILED
String fakeTaskName = optimizeJob.getName() + "_fake_pending";
OptimizeTask fakeTask = TaskBuilder.buildOptimizeTask(fakeTaskName, optimizeJob.getProperties(),
"select 1", db.getFullName(), 0L);
fakeTask.setOptimizeTaskState(Constants.TaskRunState.PENDING);
optimizeJob.getOptimizeTasks().add(fakeTask);
// Trigger runRunningJob: PENDING task should try to execute and become FAILED
optimizeJob.runRunningJob();
// Verify the fake task failed due to executeTask returning FAILED
Assertions.assertEquals(Constants.TaskRunState.FAILED, fakeTask.getOptimizeTaskState());
// Job should remain RUNNING because other tasks are not finished
Assertions.assertEquals(JobState.RUNNING, optimizeJob.getJobState());
}
}