[Enhancement] Add task run execute timeout checker (#63842)

Signed-off-by: shuming.li <ming.moriarty@gmail.com>
This commit is contained in:
shuming.li 2025-10-10 15:16:48 +08:00 committed by GitHub
parent 707b3c4a7a
commit 2c23586802
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 314 additions and 63 deletions

View File

@ -414,6 +414,9 @@ public class Config extends ConfigBase {
@ConfField(mutable = true, comment = "task run ttl")
public static int task_runs_ttl_second = 7 * 24 * 3600; // 7 day
@ConfField(mutable = true, comment = "task run execute timeout, default 4 hours")
public static int task_runs_timeout_second = 4 * 3600; // 4 hour
@ConfField(mutable = true, comment = "max number of task run history. ")
public static int task_runs_max_history_number = 10000;

View File

@ -965,7 +965,30 @@ public class TaskManager implements MemoryTrackable {
}
public void removeExpiredTaskRuns(boolean archiveHistory) {
// remove expired task run records
taskRunManager.getTaskRunHistory().vacuum(archiveHistory);
// cancel long-running task runs to avoid resource waste
long currentTimeMs = System.currentTimeMillis();
Set<TaskRun> runningTaskRuns = taskRunManager.getTaskRunScheduler().getCopiedRunningTaskRuns();
for (TaskRun taskRun : runningTaskRuns) {
int taskRunTimeout = taskRun.getExecuteTimeoutS();
if (taskRunTimeout <= 0) {
continue;
}
if (taskRun.getStatus() == null) {
continue;
}
TaskRunStatus taskRunStatus = taskRun.getStatus();
long taskRunCreatedTime = taskRunStatus.getCreateTime();
if (currentTimeMs - taskRunCreatedTime < taskRunTimeout * 1000L) {
continue;
}
// if the task run has been running for a long time, cancel it directly
LOG.warn("task run [{}] has been running for a long time, cancel it," +
" created(ms):{}, timeout(s):{}", taskRun, taskRunCreatedTime, taskRunTimeout);
taskRunManager.killRunningTaskRun(taskRun, true);
}
}
@Override

View File

@ -39,6 +39,7 @@ import com.starrocks.common.util.UUIDUtil;
import com.starrocks.load.loadv2.InsertLoadJob;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.QueryState;
import com.starrocks.qe.SessionVariable;
import com.starrocks.qe.StmtExecutor;
import com.starrocks.scheduler.persist.TaskRunStatus;
import com.starrocks.server.GlobalStateMgr;
@ -196,6 +197,31 @@ public class TaskRun implements Comparable<TaskRun> {
return isKilled;
}
/**
* Get the execute timeout in seconds.
*/
public int getExecuteTimeoutS() {
// if `query_timeout`/`insert_timeout` is set in the execute option, use it
int defaultTimeoutS = Config.task_runs_timeout_second;
if (properties != null) {
for (Map.Entry<String, String> entry : properties.entrySet()) {
if (entry.getKey().equalsIgnoreCase(SessionVariable.QUERY_TIMEOUT)
|| entry.getKey().equalsIgnoreCase(SessionVariable.INSERT_TIMEOUT)) {
try {
int timeout = Integer.parseInt(entry.getValue());
if (timeout > 0) {
defaultTimeoutS = Math.max(timeout, defaultTimeoutS);
}
} catch (NumberFormatException e) {
LOG.warn("invalid timeout value: {}, task run:{}", entry.getValue(), this);
}
}
}
}
// The timeout of task run should not be longer than the ttl of task runs and task
return Math.min(Math.min(defaultTimeoutS, Config.task_runs_ttl_second), Config.task_ttl_second);
}
public Map<String, String> refreshTaskProperties(ConnectContext ctx) {
Map<String, String> newProperties = Maps.newHashMap();
if (task.getSource() != Constants.TaskSource.MV) {

View File

@ -87,8 +87,11 @@ public class TaskRunManager implements MemoryTrackable {
return new SubmitResult(queryId, SubmitResult.SubmitStatus.SUBMITTED, taskRun.getFuture());
}
public boolean killTaskRun(Long taskId, boolean force) {
TaskRun taskRun = taskRunScheduler.getRunningTaskRun(taskId);
/**
* Kill the running task run. If force is true, it will always clear the task run from task run scheduler whether it's
* canceled or not so can trigger the pending task runs as soon as possible.
*/
public boolean killRunningTaskRun(TaskRun taskRun, boolean force) {
if (taskRun == null) {
return false;
}
@ -101,15 +104,6 @@ public class TaskRunManager implements MemoryTrackable {
if (future != null && !future.completeExceptionally(new RuntimeException("TaskRun killed"))) {
LOG.warn("failed to complete future for task run: {}", taskRun);
}
// mark pending tasks as failed
Set<TaskRun> pendingTaskRuns = taskRunScheduler.getPendingTaskRunsByTaskId(taskId);
if (CollectionUtils.isNotEmpty(pendingTaskRuns)) {
for (TaskRun pendingTaskRun : pendingTaskRuns) {
taskRunScheduler.removePendingTaskRun(pendingTaskRun, Constants.TaskRunState.FAILED);
}
}
// kill the task run
ConnectContext runCtx = taskRun.getRunCtx();
if (runCtx != null) {
@ -126,6 +120,30 @@ public class TaskRunManager implements MemoryTrackable {
}
}
/**
* Kill all pending task runs of the input task id.
*/
public void killPendingTaskRuns(Long taskId) {
// mark pending tasks as failed
Set<TaskRun> pendingTaskRuns = taskRunScheduler.getPendingTaskRunsByTaskId(taskId);
if (CollectionUtils.isNotEmpty(pendingTaskRuns)) {
for (TaskRun pendingTaskRun : pendingTaskRuns) {
taskRunScheduler.removePendingTaskRun(pendingTaskRun, Constants.TaskRunState.FAILED);
}
}
}
public boolean killTaskRun(Long taskId, boolean force) {
// kill all pending task runs of the task id
killPendingTaskRuns(taskId);
// kill the running task run
TaskRun taskRun = taskRunScheduler.getRunningTaskRun(taskId);
if (taskRun == null) {
return false;
}
return killRunningTaskRun(taskRun, force);
}
// At present, only the manual and automatic tasks of the materialized view have different priorities.
// The manual priority is higher. For manual tasks, we do not merge operations.
// For automatic tasks, we will compare the definition, and if they are the same,

View File

@ -60,6 +60,8 @@ import java.util.Map;
import java.util.Queue;
import java.util.Set;
import static org.junit.jupiter.api.Assertions.assertEquals;
@TestMethodOrder(MethodName.class)
public class TaskManagerTest {
@ -70,6 +72,7 @@ public class TaskManagerTest {
private static final ExecuteOption DEFAULT_MERGE_OPTION = makeExecuteOption(true, false);
private static final ExecuteOption DEFAULT_NO_MERGE_OPTION = makeExecuteOption(false, false);
private final TaskRunScheduler taskRunScheduler = new TaskRunScheduler();
private TaskRun taskRun;
@BeforeEach
public void setUp() {
@ -86,6 +89,7 @@ public class TaskManagerTest {
}
};
taskRun = new TaskRun();
}
@BeforeAll
@ -165,7 +169,7 @@ public class TaskManagerTest {
}
LOG.info("SubmitTaskRegularTest is waiting for TaskRunState retryCount:" + retryCount);
}
Assertions.assertEquals(Constants.TaskRunState.SUCCESS, state);
assertEquals(Constants.TaskRunState.SUCCESS, state);
}
@Test
@ -196,15 +200,15 @@ public class TaskManagerTest {
queue.offer(taskRun4);
TaskRunStatus get1 = queue.poll().getStatus();
Assertions.assertEquals(10, get1.getPriority());
assertEquals(10, get1.getPriority());
TaskRunStatus get2 = queue.poll().getStatus();
Assertions.assertEquals(5, get2.getPriority());
Assertions.assertEquals(now, get2.getCreateTime());
assertEquals(5, get2.getPriority());
assertEquals(now, get2.getCreateTime());
TaskRunStatus get3 = queue.poll().getStatus();
Assertions.assertEquals(5, get3.getPriority());
Assertions.assertEquals(now + 100, get3.getCreateTime());
assertEquals(5, get3.getPriority());
assertEquals(now + 100, get3.getCreateTime());
TaskRunStatus get4 = queue.poll().getStatus();
Assertions.assertEquals(0, get4.getPriority());
assertEquals(0, get4.getPriority());
}
@ -240,8 +244,8 @@ public class TaskManagerTest {
TaskRunScheduler taskRunScheduler = taskRunManager.getTaskRunScheduler();
List<TaskRun> taskRuns = Lists.newArrayList(taskRunScheduler.getPendingTaskRunsByTaskId(taskId));
Assertions.assertTrue(taskRuns != null);
Assertions.assertEquals(1, taskRuns.size());
Assertions.assertEquals(10, taskRuns.get(0).getStatus().getPriority());
assertEquals(1, taskRuns.size());
assertEquals(10, taskRuns.get(0).getStatus().getPriority());
}
@Test
@ -276,8 +280,8 @@ public class TaskManagerTest {
TaskRunScheduler taskRunScheduler = taskRunManager.getTaskRunScheduler();
List<TaskRun> taskRuns = Lists.newArrayList(taskRunScheduler.getPendingTaskRunsByTaskId(taskId));
Assertions.assertTrue(taskRuns != null);
Assertions.assertEquals(1, taskRuns.size());
Assertions.assertEquals(10, taskRuns.get(0).getStatus().getPriority());
assertEquals(1, taskRuns.size());
assertEquals(10, taskRuns.get(0).getStatus().getPriority());
}
@ -313,9 +317,9 @@ public class TaskManagerTest {
TaskRunScheduler taskRunScheduler = taskRunManager.getTaskRunScheduler();
List<TaskRun> taskRuns = Lists.newArrayList(taskRunScheduler.getPendingTaskRunsByTaskId(taskId));
Assertions.assertTrue(taskRuns != null);
Assertions.assertEquals(1, taskRuns.size());
assertEquals(1, taskRuns.size());
TaskRun taskRun = taskRuns.get(0);
Assertions.assertEquals(now, taskRun.getStatus().getCreateTime());
assertEquals(now, taskRun.getStatus().getCreateTime());
}
@Test
@ -350,9 +354,9 @@ public class TaskManagerTest {
TaskRunScheduler taskRunScheduler = taskRunManager.getTaskRunScheduler();
List<TaskRun> taskRuns = Lists.newArrayList(taskRunScheduler.getPendingTaskRunsByTaskId(taskId));
Assertions.assertTrue(taskRuns != null);
Assertions.assertEquals(1, taskRuns.size());
assertEquals(1, taskRuns.size());
TaskRun taskRun = taskRuns.get(0);
Assertions.assertEquals(now, taskRun.getStatus().getCreateTime());
assertEquals(now, taskRun.getStatus().getCreateTime());
}
@Test
@ -396,7 +400,7 @@ public class TaskManagerTest {
TaskRunScheduler taskRunScheduler = taskRunManager.getTaskRunScheduler();
Collection<TaskRun> taskRuns = taskRunScheduler.getPendingTaskRunsByTaskId(taskId);
Assertions.assertTrue(taskRuns != null);
Assertions.assertEquals(3, taskRuns.size());
assertEquals(3, taskRuns.size());
}
@Test
@ -423,7 +427,7 @@ public class TaskManagerTest {
taskManager.replayUpdateTaskRun(change1);
TaskRunScheduler taskRunScheduler = taskManager.getTaskRunScheduler();
Assertions.assertEquals(1, taskRunScheduler.getRunningTaskCount());
assertEquals(1, taskRunScheduler.getRunningTaskCount());
}
@Test
@ -451,8 +455,8 @@ public class TaskManagerTest {
TaskRunStatusChange change1 = new TaskRunStatusChange(task.getId(), taskRun2.getStatus(),
Constants.TaskRunState.PENDING, Constants.TaskRunState.RUNNING);
taskManager.replayUpdateTaskRun(change1);
Assertions.assertEquals(1, taskRunScheduler.getRunningTaskCount());
Assertions.assertEquals(1, taskRunScheduler.getPendingQueueCount());
assertEquals(1, taskRunScheduler.getRunningTaskCount());
assertEquals(1, taskRunScheduler.getPendingQueueCount());
}
{
@ -460,8 +464,8 @@ public class TaskManagerTest {
TaskRunStatusChange change = new TaskRunStatusChange(task.getId(), taskRun2.getStatus(),
Constants.TaskRunState.RUNNING, Constants.TaskRunState.FAILED);
taskManager.replayUpdateTaskRun(change);
Assertions.assertEquals(0, taskRunScheduler.getRunningTaskCount());
Assertions.assertEquals(1, taskRunScheduler.getPendingQueueCount());
assertEquals(0, taskRunScheduler.getRunningTaskCount());
assertEquals(1, taskRunScheduler.getPendingQueueCount());
}
{
@ -469,8 +473,8 @@ public class TaskManagerTest {
TaskRunStatusChange change = new TaskRunStatusChange(task.getId(), taskRun1.getStatus(),
Constants.TaskRunState.PENDING, Constants.TaskRunState.FAILED);
taskManager.replayUpdateTaskRun(change);
Assertions.assertEquals(0, taskRunScheduler.getRunningTaskCount());
Assertions.assertEquals(0, taskRunScheduler.getPendingQueueCount());
assertEquals(0, taskRunScheduler.getRunningTaskCount());
assertEquals(0, taskRunScheduler.getPendingQueueCount());
}
}
@ -486,7 +490,7 @@ public class TaskManagerTest {
}
Config.task_runs_max_history_number = 20;
taskRunManager.getTaskRunHistory().forceGC();
Assertions.assertEquals(20, taskRunManager.getTaskRunHistory().getInMemoryHistory().size());
assertEquals(20, taskRunManager.getTaskRunHistory().getInMemoryHistory().size());
Config.task_runs_max_history_number = 10000;
Config.enable_task_history_archive = true;
}
@ -502,7 +506,7 @@ public class TaskManagerTest {
}
Config.task_runs_max_history_number = 20;
taskRunManager.getTaskRunHistory().forceGC();
Assertions.assertEquals(10, taskRunManager.getTaskRunHistory().getInMemoryHistory().size());
assertEquals(10, taskRunManager.getTaskRunHistory().getInMemoryHistory().size());
Config.task_runs_max_history_number = 10000;
}
@ -513,21 +517,21 @@ public class TaskManagerTest {
@Test
public void testGetInitialDelayTime1() throws Exception {
Assertions.assertEquals(50, TaskManager.getInitialDelayTime(60, parseLocalDateTime("2023-04-18 19:08:50"),
assertEquals(50, TaskManager.getInitialDelayTime(60, parseLocalDateTime("2023-04-18 19:08:50"),
parseLocalDateTime("2023-04-18 20:00:00")));
Assertions.assertEquals(30, TaskManager.getInitialDelayTime(60, parseLocalDateTime("2023-04-18 19:08:30"),
assertEquals(30, TaskManager.getInitialDelayTime(60, parseLocalDateTime("2023-04-18 19:08:30"),
parseLocalDateTime("2023-04-18 20:00:00")));
Assertions.assertEquals(20, TaskManager.getInitialDelayTime(60, parseLocalDateTime("2023-04-18 19:08:30"),
assertEquals(20, TaskManager.getInitialDelayTime(60, parseLocalDateTime("2023-04-18 19:08:30"),
parseLocalDateTime("2023-04-18 20:00:10")));
Assertions.assertEquals(0, TaskManager.getInitialDelayTime(20, parseLocalDateTime("2023-04-18 19:08:30"),
assertEquals(0, TaskManager.getInitialDelayTime(20, parseLocalDateTime("2023-04-18 19:08:30"),
parseLocalDateTime("2023-04-18 21:00:10")));
}
@Test
public void testGetInitialDelayTime2() throws Exception {
Assertions.assertEquals(23, TaskManager.getInitialDelayTime(60, parseLocalDateTime("2023-12-29 19:50:00"),
assertEquals(23, TaskManager.getInitialDelayTime(60, parseLocalDateTime("2023-12-29 19:50:00"),
LocalDateTime.parse("2024-01-30T15:27:37.342356010")));
Assertions.assertEquals(50, TaskManager.getInitialDelayTime(60, parseLocalDateTime("2023-12-29 19:50:00"),
assertEquals(50, TaskManager.getInitialDelayTime(60, parseLocalDateTime("2023-12-29 19:50:00"),
LocalDateTime.parse("2024-01-30T15:27:10.342356010")));
}
@ -566,50 +570,50 @@ public class TaskManagerTest {
TaskRunScheduler taskRunScheduler = taskRunManager.getTaskRunScheduler();
Collection<TaskRun> taskRuns = taskRunScheduler.getPendingTaskRunsByTaskId(taskId);
Assertions.assertTrue(taskRuns != null);
Assertions.assertEquals(2, taskRunScheduler.getPendingQueueCount());
Assertions.assertEquals(2, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size());
assertEquals(2, taskRunScheduler.getPendingQueueCount());
assertEquals(2, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size());
// If it's a sync refresh, no merge redundant anyway
TaskRun taskRun3 = makeTaskRun(taskId, task, makeExecuteOption(false, true));
result = taskRunManager.submitTaskRun(taskRun3, taskRun3.getExecuteOption());
Assertions.assertTrue(result.getStatus() == SubmitResult.SubmitStatus.SUBMITTED);
Assertions.assertEquals(3, taskRunScheduler.getPendingQueueCount());
Assertions.assertEquals(3, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size());
assertEquals(3, taskRunScheduler.getPendingQueueCount());
assertEquals(3, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size());
// merge it
TaskRun taskRun4 = makeTaskRun(taskId, task, makeExecuteOption(true, false));
result = taskRunManager.submitTaskRun(taskRun4, taskRun4.getExecuteOption());
Assertions.assertTrue(result.getStatus() == SubmitResult.SubmitStatus.SUBMITTED);
Assertions.assertEquals(3, taskRunScheduler.getPendingQueueCount());
Assertions.assertEquals(3, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size());
assertEquals(3, taskRunScheduler.getPendingQueueCount());
assertEquals(3, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size());
// no merge it
TaskRun taskRun5 = makeTaskRun(taskId, task, makeExecuteOption(false, false));
result = taskRunManager.submitTaskRun(taskRun5, taskRun5.getExecuteOption());
Assertions.assertTrue(result.getStatus() == SubmitResult.SubmitStatus.SUBMITTED);
Assertions.assertEquals(4, taskRunScheduler.getPendingQueueCount());
Assertions.assertEquals(4, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size());
assertEquals(4, taskRunScheduler.getPendingQueueCount());
assertEquals(4, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size());
for (int i = 4; i < Config.task_runs_queue_length; i++) {
TaskRun taskRun = makeTaskRun(taskId, task, makeExecuteOption(false, false));
result = taskRunManager.submitTaskRun(taskRun, taskRun.getExecuteOption());
Assertions.assertTrue(result.getStatus() == SubmitResult.SubmitStatus.SUBMITTED);
Assertions.assertEquals(i + 1, taskRunScheduler.getPendingQueueCount());
Assertions.assertEquals(i + 1, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size());
assertEquals(i + 1, taskRunScheduler.getPendingQueueCount());
assertEquals(i + 1, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size());
}
// no assign it: exceed queue's size
TaskRun taskRun6 = makeTaskRun(taskId, task, makeExecuteOption(false, false));
result = taskRunManager.submitTaskRun(taskRun6, taskRun6.getExecuteOption());
Assertions.assertTrue(result.getStatus() == SubmitResult.SubmitStatus.REJECTED);
Assertions.assertEquals(Config.task_runs_queue_length, taskRunScheduler.getPendingQueueCount());
Assertions.assertEquals(Config.task_runs_queue_length, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size());
assertEquals(Config.task_runs_queue_length, taskRunScheduler.getPendingQueueCount());
assertEquals(Config.task_runs_queue_length, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size());
// no assign it: exceed queue's size
TaskRun taskRun7 = makeTaskRun(taskId, task, makeExecuteOption(false, false));
result = taskRunManager.submitTaskRun(taskRun7, taskRun7.getExecuteOption());
Assertions.assertTrue(result.getStatus() == SubmitResult.SubmitStatus.REJECTED);
Assertions.assertEquals(Config.task_runs_queue_length, taskRunScheduler.getPendingQueueCount());
Assertions.assertEquals(Config.task_runs_queue_length, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size());
assertEquals(Config.task_runs_queue_length, taskRunScheduler.getPendingQueueCount());
assertEquals(Config.task_runs_queue_length, taskRunScheduler.getPendingTaskRunsByTaskId(taskId).size());
}
@ -707,7 +711,7 @@ public class TaskManagerTest {
tm.getTaskRunManager().submitTaskRun(taskRun, taskRun.getExecuteOption());
}
long pendingTaskRunsCount = taskRunScheduler.getPendingQueueCount();
Assertions.assertEquals(pendingTaskRunsCount, 10);
assertEquals(pendingTaskRunsCount, 10);
}
@Test
@ -770,7 +774,7 @@ public class TaskManagerTest {
}
});
long runningTaskRunsCount = taskRunScheduler.getRunningTaskCount();
Assertions.assertEquals(1, runningTaskRunsCount);
assertEquals(1, runningTaskRunsCount);
new MockUp<TaskRun>() {
@Mock
@ -781,9 +785,9 @@ public class TaskManagerTest {
// running task run will not be removed if force kill is false
TaskRunManager taskRunManager = tm.getTaskRunManager();
taskRunManager.killTaskRun(1L, false);
Assertions.assertEquals(1, taskRunScheduler.getRunningTaskCount());
assertEquals(1, taskRunScheduler.getRunningTaskCount());
taskRunManager.killTaskRun(1L, true);
Assertions.assertEquals(0, taskRunScheduler.getRunningTaskCount());
assertEquals(0, taskRunScheduler.getRunningTaskCount());
}
@Test
@ -800,7 +804,7 @@ public class TaskManagerTest {
taskRun.initStatus("1", now + 10);
taskRun.getStatus().setPriority(0);
TaskRunStatus taskRunStatus = taskRun.getStatus();
Assertions.assertEquals(taskRunStatus.getDefinition(), "select 1");
assertEquals(taskRunStatus.getDefinition(), "select 1");
}
@Test
@ -897,7 +901,7 @@ public class TaskManagerTest {
taskManager.replayCreateTaskRun(validStatus);
TaskRunHistory taskRunHistory = taskManager.getTaskRunHistory();
Assertions.assertEquals(2, taskRunHistory.getTaskRunCount());
assertEquals(2, taskRunHistory.getTaskRunCount());
taskManager.saveTasksV2(imageWriter);
}
@ -907,7 +911,7 @@ public class TaskManagerTest {
TaskManager taskManager = new TaskManager();
taskManager.loadTasksV2(imageReader);
TaskRunHistory taskRunHistory = taskManager.getTaskRunHistory();
Assertions.assertEquals(2, taskRunHistory.getTaskRunCount());
assertEquals(2, taskRunHistory.getTaskRunCount());
Set<Constants.TaskRunState> expectedStates = ImmutableSet.of(
Constants.TaskRunState.SUCCESS, Constants.TaskRunState.SKIPPED);
@ -957,4 +961,107 @@ public class TaskManagerTest {
taskManager.replayCreateTaskRun(validStatus);
// The valid task run should be processed without errors.
}
@Test
public void removeExpiredTaskRunsShouldCancelLongRunningTasks() {
TaskRunManager taskRunManager = new TaskRunManager(taskRunScheduler);
TaskRun taskRun = new TaskRun();
TaskRunStatus status = new TaskRunStatus();
status.setCreateTime(System.currentTimeMillis() - 5000);
new MockUp<TaskRun>() {
@Mock
public TaskRunStatus getStatus() {
return status;
}
@Mock
public int getExecuteTimeoutS() {
return 3;
}
};
new MockUp<TaskRunScheduler>() {
@Mock
public Set<TaskRun> getCopiedRunningTaskRuns() {
return ImmutableSet.of(taskRun);
}
};
new MockUp<TaskRunManager>() {
@Mock
void killRunningTaskRun(TaskRun taskRun, boolean force) {
assertEquals(status, taskRun.getStatus());
}
};
TaskManager taskManager = new TaskManager();
taskManager.removeExpiredTaskRuns(false);
}
@Test
public void removeExpiredTaskRunsShouldNotCancelTasksWithoutTimeout() {
TaskRunManager taskRunManager = new TaskRunManager(taskRunScheduler);
TaskRun taskRun = new TaskRun();
TaskRunStatus status = new TaskRunStatus();
status.setCreateTime(System.currentTimeMillis() - 5000);
new MockUp<TaskRun>() {
@Mock
public TaskRunStatus getStatus() {
return status;
}
@Mock
public int getExecuteTimeoutS() {
return 0;
}
};
new MockUp<TaskRunScheduler>() {
@Mock
public Set<TaskRun> getCopiedRunningTaskRuns() {
return ImmutableSet.of(taskRun);
}
};
new MockUp<TaskRunManager>() {
@Mock
void killRunningTaskRun(TaskRun taskRun, boolean force) {
Assertions.fail("Task without timeout should not be canceled");
}
};
TaskManager taskManager = new TaskManager();
taskManager.removeExpiredTaskRuns(false);
}
@Test
public void removeExpiredTaskRunsShouldNotCancelNonExpiredTasks() {
TaskRunManager taskRunManager = new TaskRunManager(taskRunScheduler);
TaskRun taskRun = new TaskRun();
TaskRunStatus status = new TaskRunStatus();
status.setCreateTime(System.currentTimeMillis() - 1000);
new MockUp<TaskRun>() {
@Mock
public TaskRunStatus getStatus() {
return status;
}
@Mock
public int getExecuteTimeoutS() {
return 5;
}
};
new MockUp<TaskRunScheduler>() {
@Mock
public Set<TaskRun> getCopiedRunningTaskRuns() {
return ImmutableSet.of(taskRun);
}
};
new MockUp<TaskRunManager>() {
@Mock
void killRunningTaskRun(TaskRun taskRun, boolean force) {
Assertions.fail("Non-expired task should not be canceled");
}
};
TaskManager taskManager = new TaskManager();
taskManager.removeExpiredTaskRuns(false);
}
}

View File

@ -0,0 +1,74 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.scheduler;
import com.starrocks.common.Config;
import com.starrocks.qe.SessionVariable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
public class TaskRunTest {
private TaskRun taskRun;
@BeforeEach
public void setUp() {
taskRun = new TaskRun();
}
@Test
public void testDefaultTimeout() {
taskRun.setProperties(null);
assertEquals(Config.task_runs_timeout_second, taskRun.getExecuteTimeoutS());
}
@Test
public void testQueryTimeoutProperty() {
Map<String, String> props = new HashMap<>();
props.put(SessionVariable.QUERY_TIMEOUT, "120");
taskRun.setProperties(props);
assertEquals(Math.max(120, Config.task_runs_timeout_second), taskRun.getExecuteTimeoutS());
}
@Test
public void testInsertTimeoutProperty() {
Map<String, String> props = new HashMap<>();
props.put(SessionVariable.INSERT_TIMEOUT, "200");
taskRun.setProperties(props);
assertEquals(Math.max(200, Config.task_runs_timeout_second), taskRun.getExecuteTimeoutS());
}
@Test
public void testInvalidTimeoutValue() {
Map<String, String> props = new HashMap<>();
props.put(SessionVariable.QUERY_TIMEOUT, "invalid");
taskRun.setProperties(props);
assertEquals(Config.task_runs_timeout_second, taskRun.getExecuteTimeoutS());
}
@Test
public void testNegativeTimeoutValue() {
Map<String, String> props = new HashMap<>();
props.put(SessionVariable.QUERY_TIMEOUT, "-10");
taskRun.setProperties(props);
assertEquals(Config.task_runs_timeout_second, taskRun.getExecuteTimeoutS());
}
}