diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java index 60941911190..dbf8d6ac646 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java @@ -414,6 +414,9 @@ public class Config extends ConfigBase { @ConfField(mutable = true, comment = "task run ttl") public static int task_runs_ttl_second = 7 * 24 * 3600; // 7 day + @ConfField(mutable = true, comment = "task run execute timeout, default 4 hours") + public static int task_runs_timeout_second = 4 * 3600; // 4 hour + @ConfField(mutable = true, comment = "max number of task run history. ") public static int task_runs_max_history_number = 10000; diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java index 2882c5d6a45..35c01f4ba0a 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskManager.java @@ -965,7 +965,30 @@ public class TaskManager implements MemoryTrackable { } public void removeExpiredTaskRuns(boolean archiveHistory) { + // remove expired task run records taskRunManager.getTaskRunHistory().vacuum(archiveHistory); + + // cancel long-running task runs to avoid resource waste + long currentTimeMs = System.currentTimeMillis(); + Set runningTaskRuns = taskRunManager.getTaskRunScheduler().getCopiedRunningTaskRuns(); + for (TaskRun taskRun : runningTaskRuns) { + int taskRunTimeout = taskRun.getExecuteTimeoutS(); + if (taskRunTimeout <= 0) { + continue; + } + if (taskRun.getStatus() == null) { + continue; + } + TaskRunStatus taskRunStatus = taskRun.getStatus(); + long taskRunCreatedTime = taskRunStatus.getCreateTime(); + if (currentTimeMs - taskRunCreatedTime < taskRunTimeout * 1000L) { + continue; + } + // if the task run has been running for a long time, cancel it directly + LOG.warn("task run [{}] has been running for a long time, cancel it," + + " created(ms):{}, timeout(s):{}", taskRun, taskRunCreatedTime, taskRunTimeout); + taskRunManager.killRunningTaskRun(taskRun, true); + } } @Override diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRun.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRun.java index dac288ede4e..9986c6990b2 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRun.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRun.java @@ -39,6 +39,7 @@ import com.starrocks.common.util.UUIDUtil; import com.starrocks.load.loadv2.InsertLoadJob; import com.starrocks.qe.ConnectContext; import com.starrocks.qe.QueryState; +import com.starrocks.qe.SessionVariable; import com.starrocks.qe.StmtExecutor; import com.starrocks.scheduler.persist.TaskRunStatus; import com.starrocks.server.GlobalStateMgr; @@ -196,6 +197,31 @@ public class TaskRun implements Comparable { return isKilled; } + /** + * Get the execute timeout in seconds. + */ + public int getExecuteTimeoutS() { + // if `query_timeout`/`insert_timeout` is set in the execute option, use it + int defaultTimeoutS = Config.task_runs_timeout_second; + if (properties != null) { + for (Map.Entry entry : properties.entrySet()) { + if (entry.getKey().equalsIgnoreCase(SessionVariable.QUERY_TIMEOUT) + || entry.getKey().equalsIgnoreCase(SessionVariable.INSERT_TIMEOUT)) { + try { + int timeout = Integer.parseInt(entry.getValue()); + if (timeout > 0) { + defaultTimeoutS = Math.max(timeout, defaultTimeoutS); + } + } catch (NumberFormatException e) { + LOG.warn("invalid timeout value: {}, task run:{}", entry.getValue(), this); + } + } + } + } + // The timeout of task run should not be longer than the ttl of task runs and task + return Math.min(Math.min(defaultTimeoutS, Config.task_runs_ttl_second), Config.task_ttl_second); + } + public Map refreshTaskProperties(ConnectContext ctx) { Map newProperties = Maps.newHashMap(); if (task.getSource() != Constants.TaskSource.MV) { diff --git a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunManager.java b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunManager.java index 5ba9be06453..fae55f8b165 100644 --- a/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunManager.java +++ b/fe/fe-core/src/main/java/com/starrocks/scheduler/TaskRunManager.java @@ -87,8 +87,11 @@ public class TaskRunManager implements MemoryTrackable { return new SubmitResult(queryId, SubmitResult.SubmitStatus.SUBMITTED, taskRun.getFuture()); } - public boolean killTaskRun(Long taskId, boolean force) { - TaskRun taskRun = taskRunScheduler.getRunningTaskRun(taskId); + /** + * Kill the running task run. If force is true, it will always clear the task run from task run scheduler whether it's + * canceled or not so can trigger the pending task runs as soon as possible. + */ + public boolean killRunningTaskRun(TaskRun taskRun, boolean force) { if (taskRun == null) { return false; } @@ -101,15 +104,6 @@ public class TaskRunManager implements MemoryTrackable { if (future != null && !future.completeExceptionally(new RuntimeException("TaskRun killed"))) { LOG.warn("failed to complete future for task run: {}", taskRun); } - - // mark pending tasks as failed - Set pendingTaskRuns = taskRunScheduler.getPendingTaskRunsByTaskId(taskId); - if (CollectionUtils.isNotEmpty(pendingTaskRuns)) { - for (TaskRun pendingTaskRun : pendingTaskRuns) { - taskRunScheduler.removePendingTaskRun(pendingTaskRun, Constants.TaskRunState.FAILED); - } - } - // kill the task run ConnectContext runCtx = taskRun.getRunCtx(); if (runCtx != null) { @@ -126,6 +120,30 @@ public class TaskRunManager implements MemoryTrackable { } } + /** + * Kill all pending task runs of the input task id. + */ + public void killPendingTaskRuns(Long taskId) { + // mark pending tasks as failed + Set pendingTaskRuns = taskRunScheduler.getPendingTaskRunsByTaskId(taskId); + if (CollectionUtils.isNotEmpty(pendingTaskRuns)) { + for (TaskRun pendingTaskRun : pendingTaskRuns) { + taskRunScheduler.removePendingTaskRun(pendingTaskRun, Constants.TaskRunState.FAILED); + } + } + } + + public boolean killTaskRun(Long taskId, boolean force) { + // kill all pending task runs of the task id + killPendingTaskRuns(taskId); + // kill the running task run + TaskRun taskRun = taskRunScheduler.getRunningTaskRun(taskId); + if (taskRun == null) { + return false; + } + return killRunningTaskRun(taskRun, force); + } + // At present, only the manual and automatic tasks of the materialized view have different priorities. // The manual priority is higher. For manual tasks, we do not merge operations. // For automatic tasks, we will compare the definition, and if they are the same, diff --git a/fe/fe-core/src/test/java/com/starrocks/scheduler/TaskManagerTest.java b/fe/fe-core/src/test/java/com/starrocks/scheduler/TaskManagerTest.java index 90d671ed8b6..93c0468b774 100644 --- a/fe/fe-core/src/test/java/com/starrocks/scheduler/TaskManagerTest.java +++ b/fe/fe-core/src/test/java/com/starrocks/scheduler/TaskManagerTest.java @@ -60,6 +60,8 @@ import java.util.Map; import java.util.Queue; import java.util.Set; +import static org.junit.jupiter.api.Assertions.assertEquals; + @TestMethodOrder(MethodName.class) public class TaskManagerTest { @@ -70,6 +72,7 @@ public class TaskManagerTest { private static final ExecuteOption DEFAULT_MERGE_OPTION = makeExecuteOption(true, false); private static final ExecuteOption DEFAULT_NO_MERGE_OPTION = makeExecuteOption(false, false); private final TaskRunScheduler taskRunScheduler = new TaskRunScheduler(); + private TaskRun taskRun; @BeforeEach public void setUp() { @@ -86,6 +89,7 @@ public class TaskManagerTest { } }; + taskRun = new TaskRun(); } @BeforeAll @@ -165,7 +169,7 @@ public class TaskManagerTest { } LOG.info("SubmitTaskRegularTest is waiting for TaskRunState retryCount:" + retryCount); } - Assertions.assertEquals(Constants.TaskRunState.SUCCESS, state); + assertEquals(Constants.TaskRunState.SUCCESS, state); } @Test @@ -196,15 +200,15 @@ public class TaskManagerTest { queue.offer(taskRun4); TaskRunStatus get1 = queue.poll().getStatus(); - Assertions.assertEquals(10, get1.getPriority()); + assertEquals(10, get1.getPriority()); TaskRunStatus get2 = queue.poll().getStatus(); - Assertions.assertEquals(5, get2.getPriority()); - Assertions.assertEquals(now, get2.getCreateTime()); + assertEquals(5, get2.getPriority()); + assertEquals(now, get2.getCreateTime()); TaskRunStatus get3 = queue.poll().getStatus(); - Assertions.assertEquals(5, get3.getPriority()); - Assertions.assertEquals(now + 100, get3.getCreateTime()); + assertEquals(5, get3.getPriority()); + assertEquals(now + 100, get3.getCreateTime()); TaskRunStatus get4 = queue.poll().getStatus(); - Assertions.assertEquals(0, get4.getPriority()); + assertEquals(0, get4.getPriority()); } @@ -240,8 +244,8 @@ public class TaskManagerTest { TaskRunScheduler taskRunScheduler = taskRunManager.getTaskRunScheduler(); List taskRuns = Lists.newArrayList(taskRunScheduler.getPendingTaskRunsByTaskId(taskId)); Assertions.assertTrue(taskRuns != null); - Assertions.assertEquals(1, taskRuns.size()); - Assertions.assertEquals(10, taskRuns.get(0).getStatus().getPriority()); + assertEquals(1, taskRuns.size()); + assertEquals(10, taskRuns.get(0).getStatus().getPriority()); } @Test @@ -276,8 +280,8 @@ public class TaskManagerTest { TaskRunScheduler taskRunScheduler = taskRunManager.getTaskRunScheduler(); List taskRuns = Lists.newArrayList(taskRunScheduler.getPendingTaskRunsByTaskId(taskId)); Assertions.assertTrue(taskRuns != null); - Assertions.assertEquals(1, taskRuns.size()); - Assertions.assertEquals(10, taskRuns.get(0).getStatus().getPriority()); + assertEquals(1, taskRuns.size()); + assertEquals(10, taskRuns.get(0).getStatus().getPriority()); } @@ -313,9 +317,9 @@ public class TaskManagerTest { TaskRunScheduler taskRunScheduler = taskRunManager.getTaskRunScheduler(); List taskRuns = Lists.newArrayList(taskRunScheduler.getPendingTaskRunsByTaskId(taskId)); Assertions.assertTrue(taskRuns != null); - Assertions.assertEquals(1, taskRuns.size()); + assertEquals(1, taskRuns.size()); TaskRun taskRun = taskRuns.get(0); - Assertions.assertEquals(now, taskRun.getStatus().getCreateTime()); + assertEquals(now, taskRun.getStatus().getCreateTime()); } @Test @@ -350,9 +354,9 @@ public class TaskManagerTest { TaskRunScheduler taskRunScheduler = taskRunManager.getTaskRunScheduler(); List taskRuns = Lists.newArrayList(taskRunScheduler.getPendingTaskRunsByTaskId(taskId)); Assertions.assertTrue(taskRuns != null); - Assertions.assertEquals(1, taskRuns.size()); + assertEquals(1, taskRuns.size()); TaskRun taskRun = taskRuns.get(0); - Assertions.assertEquals(now, taskRun.getStatus().getCreateTime()); + assertEquals(now, taskRun.getStatus().getCreateTime()); } @Test @@ -396,7 +400,7 @@ public class TaskManagerTest { TaskRunScheduler taskRunScheduler = taskRunManager.getTaskRunScheduler(); Collection taskRuns = taskRunScheduler.getPendingTaskRunsByTaskId(taskId); Assertions.assertTrue(taskRuns != null); - Assertions.assertEquals(3, taskRuns.size()); + assertEquals(3, taskRuns.size()); } @Test @@ -423,7 +427,7 @@ public class TaskManagerTest { taskManager.replayUpdateTaskRun(change1); TaskRunScheduler taskRunScheduler = taskManager.getTaskRunScheduler(); - Assertions.assertEquals(1, taskRunScheduler.getRunningTaskCount()); + assertEquals(1, taskRunScheduler.getRunningTaskCount()); } @Test @@ -451,8 +455,8 @@ public class TaskManagerTest { TaskRunStatusChange change1 = new TaskRunStatusChange(task.getId(), taskRun2.getStatus(), Constants.TaskRunState.PENDING, Constants.TaskRunState.RUNNING); taskManager.replayUpdateTaskRun(change1); - Assertions.assertEquals(1, taskRunScheduler.getRunningTaskCount()); - Assertions.assertEquals(1, taskRunScheduler.getPendingQueueCount()); + assertEquals(1, taskRunScheduler.getRunningTaskCount()); + assertEquals(1, taskRunScheduler.getPendingQueueCount()); } { @@ -460,8 +464,8 @@ public class TaskManagerTest { TaskRunStatusChange change = new TaskRunStatusChange(task.getId(), taskRun2.getStatus(), Constants.TaskRunState.RUNNING, Constants.TaskRunState.FAILED); taskManager.replayUpdateTaskRun(change); - Assertions.assertEquals(0, taskRunScheduler.getRunningTaskCount()); - Assertions.assertEquals(1, taskRunScheduler.getPendingQueueCount()); + assertEquals(0, taskRunScheduler.getRunningTaskCount()); + assertEquals(1, taskRunScheduler.getPendingQueueCount()); } { @@ -469,8 +473,8 @@ public class TaskManagerTest { TaskRunStatusChange change = new TaskRunStatusChange(task.getId(), taskRun1.getStatus(), Constants.TaskRunState.PENDING, Constants.TaskRunState.FAILED); taskManager.replayUpdateTaskRun(change); - Assertions.assertEquals(0, taskRunScheduler.getRunningTaskCount()); - Assertions.assertEquals(0, taskRunScheduler.getPendingQueueCount()); + assertEquals(0, taskRunScheduler.getRunningTaskCount()); + assertEquals(0, taskRunScheduler.getPendingQueueCount()); } } @@ -486,7 +490,7 @@ public class TaskManagerTest { } Config.task_runs_max_history_number = 20; taskRunManager.getTaskRunHistory().forceGC(); - Assertions.assertEquals(20, taskRunManager.getTaskRunHistory().getInMemoryHistory().size()); + assertEquals(20, taskRunManager.getTaskRunHistory().getInMemoryHistory().size()); Config.task_runs_max_history_number = 10000; Config.enable_task_history_archive = true; } @@ -502,7 +506,7 @@ public class TaskManagerTest { } Config.task_runs_max_history_number = 20; taskRunManager.getTaskRunHistory().forceGC(); - Assertions.assertEquals(10, taskRunManager.getTaskRunHistory().getInMemoryHistory().size()); + assertEquals(10, taskRunManager.getTaskRunHistory().getInMemoryHistory().size()); Config.task_runs_max_history_number = 10000; } @@ -513,21 +517,21 @@ public class TaskManagerTest { @Test public void testGetInitialDelayTime1() throws Exception { - Assertions.assertEquals(50, TaskManager.getInitialDelayTime(60, parseLocalDateTime("2023-04-18 19:08:50"), + assertEquals(50, TaskManager.getInitialDelayTime(60, parseLocalDateTime("2023-04-18 19:08:50"), parseLocalDateTime("2023-04-18 20:00:00"))); - Assertions.assertEquals(30, TaskManager.getInitialDelayTime(60, parseLocalDateTime("2023-04-18 19:08:30"), + assertEquals(30, TaskManager.getInitialDelayTime(60, parseLocalDateTime("2023-04-18 19:08:30"), parseLocalDateTime("2023-04-18 20:00:00"))); - Assertions.assertEquals(20, TaskManager.getInitialDelayTime(60, parseLocalDateTime("2023-04-18 19:08:30"), + assertEquals(20, TaskManager.getInitialDelayTime(60, parseLocalDateTime("2023-04-18 19:08:30"), parseLocalDateTime("2023-04-18 20:00:10"))); - Assertions.assertEquals(0, TaskManager.getInitialDelayTime(20, parseLocalDateTime("2023-04-18 19:08:30"), + assertEquals(0, TaskManager.getInitialDelayTime(20, parseLocalDateTime("2023-04-18 19:08:30"), parseLocalDateTime("2023-04-18 21:00:10"))); } @Test public void testGetInitialDelayTime2() throws Exception { - Assertions.assertEquals(23, TaskManager.getInitialDelayTime(60, parseLocalDateTime("2023-12-29 19:50:00"), + assertEquals(23, TaskManager.getInitialDelayTime(60, parseLocalDateTime("2023-12-29 19:50:00"), LocalDateTime.parse("2024-01-30T15:27:37.342356010"))); - Assertions.assertEquals(50, TaskManager.getInitialDelayTime(60, parseLocalDateTime("2023-12-29 19:50:00"), + assertEquals(50, TaskManager.getInitialDelayTime(60, parseLocalDateTime("2023-12-29 19:50:00"), LocalDateTime.parse("2024-01-30T15:27:10.342356010"))); } @@ -566,50 +570,50 @@ public class TaskManagerTest { TaskRunScheduler taskRunScheduler = taskRunManager.getTaskRunScheduler(); Collection taskRuns = taskRunScheduler.getPendingTaskRunsByTaskId(taskId); Assertions.assertTrue(taskRuns != null); - Assertions.assertEquals(2, taskRunScheduler.getPendingQueueCount()); - Assertions.assertEquals(2, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size()); + assertEquals(2, taskRunScheduler.getPendingQueueCount()); + assertEquals(2, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size()); // If it's a sync refresh, no merge redundant anyway TaskRun taskRun3 = makeTaskRun(taskId, task, makeExecuteOption(false, true)); result = taskRunManager.submitTaskRun(taskRun3, taskRun3.getExecuteOption()); Assertions.assertTrue(result.getStatus() == SubmitResult.SubmitStatus.SUBMITTED); - Assertions.assertEquals(3, taskRunScheduler.getPendingQueueCount()); - Assertions.assertEquals(3, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size()); + assertEquals(3, taskRunScheduler.getPendingQueueCount()); + assertEquals(3, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size()); // merge it TaskRun taskRun4 = makeTaskRun(taskId, task, makeExecuteOption(true, false)); result = taskRunManager.submitTaskRun(taskRun4, taskRun4.getExecuteOption()); Assertions.assertTrue(result.getStatus() == SubmitResult.SubmitStatus.SUBMITTED); - Assertions.assertEquals(3, taskRunScheduler.getPendingQueueCount()); - Assertions.assertEquals(3, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size()); + assertEquals(3, taskRunScheduler.getPendingQueueCount()); + assertEquals(3, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size()); // no merge it TaskRun taskRun5 = makeTaskRun(taskId, task, makeExecuteOption(false, false)); result = taskRunManager.submitTaskRun(taskRun5, taskRun5.getExecuteOption()); Assertions.assertTrue(result.getStatus() == SubmitResult.SubmitStatus.SUBMITTED); - Assertions.assertEquals(4, taskRunScheduler.getPendingQueueCount()); - Assertions.assertEquals(4, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size()); + assertEquals(4, taskRunScheduler.getPendingQueueCount()); + assertEquals(4, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size()); for (int i = 4; i < Config.task_runs_queue_length; i++) { TaskRun taskRun = makeTaskRun(taskId, task, makeExecuteOption(false, false)); result = taskRunManager.submitTaskRun(taskRun, taskRun.getExecuteOption()); Assertions.assertTrue(result.getStatus() == SubmitResult.SubmitStatus.SUBMITTED); - Assertions.assertEquals(i + 1, taskRunScheduler.getPendingQueueCount()); - Assertions.assertEquals(i + 1, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size()); + assertEquals(i + 1, taskRunScheduler.getPendingQueueCount()); + assertEquals(i + 1, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size()); } // no assign it: exceed queue's size TaskRun taskRun6 = makeTaskRun(taskId, task, makeExecuteOption(false, false)); result = taskRunManager.submitTaskRun(taskRun6, taskRun6.getExecuteOption()); Assertions.assertTrue(result.getStatus() == SubmitResult.SubmitStatus.REJECTED); - Assertions.assertEquals(Config.task_runs_queue_length, taskRunScheduler.getPendingQueueCount()); - Assertions.assertEquals(Config.task_runs_queue_length, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size()); + assertEquals(Config.task_runs_queue_length, taskRunScheduler.getPendingQueueCount()); + assertEquals(Config.task_runs_queue_length, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size()); // no assign it: exceed queue's size TaskRun taskRun7 = makeTaskRun(taskId, task, makeExecuteOption(false, false)); result = taskRunManager.submitTaskRun(taskRun7, taskRun7.getExecuteOption()); Assertions.assertTrue(result.getStatus() == SubmitResult.SubmitStatus.REJECTED); - Assertions.assertEquals(Config.task_runs_queue_length, taskRunScheduler.getPendingQueueCount()); - Assertions.assertEquals(Config.task_runs_queue_length, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size()); + assertEquals(Config.task_runs_queue_length, taskRunScheduler.getPendingQueueCount()); + assertEquals(Config.task_runs_queue_length, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size()); } @@ -707,7 +711,7 @@ public class TaskManagerTest { tm.getTaskRunManager().submitTaskRun(taskRun, taskRun.getExecuteOption()); } long pendingTaskRunsCount = taskRunScheduler.getPendingQueueCount(); - Assertions.assertEquals(pendingTaskRunsCount, 10); + assertEquals(pendingTaskRunsCount, 10); } @Test @@ -770,7 +774,7 @@ public class TaskManagerTest { } }); long runningTaskRunsCount = taskRunScheduler.getRunningTaskCount(); - Assertions.assertEquals(1, runningTaskRunsCount); + assertEquals(1, runningTaskRunsCount); new MockUp() { @Mock @@ -781,9 +785,9 @@ public class TaskManagerTest { // running task run will not be removed if force kill is false TaskRunManager taskRunManager = tm.getTaskRunManager(); taskRunManager.killTaskRun(1L, false); - Assertions.assertEquals(1, taskRunScheduler.getRunningTaskCount()); + assertEquals(1, taskRunScheduler.getRunningTaskCount()); taskRunManager.killTaskRun(1L, true); - Assertions.assertEquals(0, taskRunScheduler.getRunningTaskCount()); + assertEquals(0, taskRunScheduler.getRunningTaskCount()); } @Test @@ -800,7 +804,7 @@ public class TaskManagerTest { taskRun.initStatus("1", now + 10); taskRun.getStatus().setPriority(0); TaskRunStatus taskRunStatus = taskRun.getStatus(); - Assertions.assertEquals(taskRunStatus.getDefinition(), "select 1"); + assertEquals(taskRunStatus.getDefinition(), "select 1"); } @Test @@ -897,7 +901,7 @@ public class TaskManagerTest { taskManager.replayCreateTaskRun(validStatus); TaskRunHistory taskRunHistory = taskManager.getTaskRunHistory(); - Assertions.assertEquals(2, taskRunHistory.getTaskRunCount()); + assertEquals(2, taskRunHistory.getTaskRunCount()); taskManager.saveTasksV2(imageWriter); } @@ -907,7 +911,7 @@ public class TaskManagerTest { TaskManager taskManager = new TaskManager(); taskManager.loadTasksV2(imageReader); TaskRunHistory taskRunHistory = taskManager.getTaskRunHistory(); - Assertions.assertEquals(2, taskRunHistory.getTaskRunCount()); + assertEquals(2, taskRunHistory.getTaskRunCount()); Set expectedStates = ImmutableSet.of( Constants.TaskRunState.SUCCESS, Constants.TaskRunState.SKIPPED); @@ -957,4 +961,107 @@ public class TaskManagerTest { taskManager.replayCreateTaskRun(validStatus); // The valid task run should be processed without errors. } + + @Test + public void removeExpiredTaskRunsShouldCancelLongRunningTasks() { + TaskRunManager taskRunManager = new TaskRunManager(taskRunScheduler); + TaskRun taskRun = new TaskRun(); + TaskRunStatus status = new TaskRunStatus(); + status.setCreateTime(System.currentTimeMillis() - 5000); + new MockUp() { + @Mock + public TaskRunStatus getStatus() { + return status; + } + + @Mock + public int getExecuteTimeoutS() { + return 3; + } + }; + new MockUp() { + @Mock + public Set getCopiedRunningTaskRuns() { + return ImmutableSet.of(taskRun); + } + }; + new MockUp() { + @Mock + void killRunningTaskRun(TaskRun taskRun, boolean force) { + assertEquals(status, taskRun.getStatus()); + } + }; + + TaskManager taskManager = new TaskManager(); + taskManager.removeExpiredTaskRuns(false); + } + + @Test + public void removeExpiredTaskRunsShouldNotCancelTasksWithoutTimeout() { + TaskRunManager taskRunManager = new TaskRunManager(taskRunScheduler); + TaskRun taskRun = new TaskRun(); + TaskRunStatus status = new TaskRunStatus(); + status.setCreateTime(System.currentTimeMillis() - 5000); + + new MockUp() { + @Mock + public TaskRunStatus getStatus() { + return status; + } + + @Mock + public int getExecuteTimeoutS() { + return 0; + } + }; + new MockUp() { + @Mock + public Set getCopiedRunningTaskRuns() { + return ImmutableSet.of(taskRun); + } + }; + new MockUp() { + @Mock + void killRunningTaskRun(TaskRun taskRun, boolean force) { + Assertions.fail("Task without timeout should not be canceled"); + } + }; + + TaskManager taskManager = new TaskManager(); + taskManager.removeExpiredTaskRuns(false); + } + + @Test + public void removeExpiredTaskRunsShouldNotCancelNonExpiredTasks() { + TaskRunManager taskRunManager = new TaskRunManager(taskRunScheduler); + TaskRun taskRun = new TaskRun(); + TaskRunStatus status = new TaskRunStatus(); + status.setCreateTime(System.currentTimeMillis() - 1000); + new MockUp() { + @Mock + public TaskRunStatus getStatus() { + return status; + } + + @Mock + public int getExecuteTimeoutS() { + return 5; + } + }; + new MockUp() { + @Mock + public Set getCopiedRunningTaskRuns() { + return ImmutableSet.of(taskRun); + } + }; + new MockUp() { + @Mock + void killRunningTaskRun(TaskRun taskRun, boolean force) { + Assertions.fail("Non-expired task should not be canceled"); + } + }; + + TaskManager taskManager = new TaskManager(); + taskManager.removeExpiredTaskRuns(false); + } } \ No newline at end of file diff --git a/fe/fe-core/src/test/java/com/starrocks/scheduler/TaskRunTest.java b/fe/fe-core/src/test/java/com/starrocks/scheduler/TaskRunTest.java new file mode 100644 index 00000000000..ef0c9226ab4 --- /dev/null +++ b/fe/fe-core/src/test/java/com/starrocks/scheduler/TaskRunTest.java @@ -0,0 +1,74 @@ +// Copyright 2021-present StarRocks, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package com.starrocks.scheduler; + +import com.starrocks.common.Config; +import com.starrocks.qe.SessionVariable; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertEquals; + + +public class TaskRunTest { + + private TaskRun taskRun; + + @BeforeEach + public void setUp() { + taskRun = new TaskRun(); + } + + @Test + public void testDefaultTimeout() { + taskRun.setProperties(null); + assertEquals(Config.task_runs_timeout_second, taskRun.getExecuteTimeoutS()); + } + + @Test + public void testQueryTimeoutProperty() { + Map props = new HashMap<>(); + props.put(SessionVariable.QUERY_TIMEOUT, "120"); + taskRun.setProperties(props); + assertEquals(Math.max(120, Config.task_runs_timeout_second), taskRun.getExecuteTimeoutS()); + } + + @Test + public void testInsertTimeoutProperty() { + Map props = new HashMap<>(); + props.put(SessionVariable.INSERT_TIMEOUT, "200"); + taskRun.setProperties(props); + assertEquals(Math.max(200, Config.task_runs_timeout_second), taskRun.getExecuteTimeoutS()); + } + + @Test + public void testInvalidTimeoutValue() { + Map props = new HashMap<>(); + props.put(SessionVariable.QUERY_TIMEOUT, "invalid"); + taskRun.setProperties(props); + assertEquals(Config.task_runs_timeout_second, taskRun.getExecuteTimeoutS()); + } + + @Test + public void testNegativeTimeoutValue() { + Map props = new HashMap<>(); + props.put(SessionVariable.QUERY_TIMEOUT, "-10"); + taskRun.setProperties(props); + assertEquals(Config.task_runs_timeout_second, taskRun.getExecuteTimeoutS()); + } +} \ No newline at end of file