[Enhancement] add ExecState into /current_queries cmd result to distinguish running/pending query (#62261)

Signed-off-by: MatthewH00 <1639097204@qq.com>
This commit is contained in:
hmx 2025-08-27 14:02:07 +08:00 committed by GitHub
parent 44c95430d3
commit 7a3e5e2cef
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 111 additions and 10 deletions

View File

@ -66,6 +66,7 @@ public class CurrentQueryStatisticsProcDir implements ProcDirInterface {
.add("CPUTime")
.add("ExecTime")
.add("ExecProgress")
.add("ExecState")
.add("Warehouse")
.add("CustomQueryId")
.add("ResourceGroup")

View File

@ -44,6 +44,8 @@ import com.starrocks.common.Config;
import com.starrocks.common.proc.CurrentQueryInfoProvider;
import com.starrocks.common.util.QueryStatisticsFormatter;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.qe.scheduler.slot.SlotManager;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.service.FrontendOptions;
import com.starrocks.thrift.TQueryStatisticsInfo;
import org.apache.http.HttpStatus;
@ -77,6 +79,7 @@ public class QueryStatisticsInfo {
private long spillBytes;
private long execTime;
private String execProgress;
private String execState;
private String wareHouseName;
private String customQueryId;
private String resourceGroupName;
@ -86,8 +89,8 @@ public class QueryStatisticsInfo {
public QueryStatisticsInfo(long queryStartTime, String feIp, String queryId, String connId, String db, String user,
long cpuCostNs, long scanBytes, long scanRows, long memUsageBytes, long spillBytes,
long execTime, String execProgress, String wareHouseName, String customQueryId,
String resourceGroupName) {
long execTime, String execProgress, String execState, String wareHouseName,
String customQueryId, String resourceGroupName) {
this.queryStartTime = queryStartTime;
this.feIp = feIp;
this.queryId = queryId;
@ -101,6 +104,7 @@ public class QueryStatisticsInfo {
this.spillBytes = spillBytes;
this.execTime = execTime;
this.execProgress = execProgress;
this.execState = execState;
this.wareHouseName = wareHouseName;
this.customQueryId = customQueryId;
this.resourceGroupName = resourceGroupName;
@ -158,6 +162,10 @@ public class QueryStatisticsInfo {
return execProgress;
}
public String getExecState() {
return execState;
}
public String getWareHouseName() {
return wareHouseName;
}
@ -235,6 +243,11 @@ public class QueryStatisticsInfo {
return this;
}
public QueryStatisticsInfo withExecState(String execState) {
this.execState = execState;
return this;
}
public QueryStatisticsInfo withWareHouseName(String warehouseName) {
this.wareHouseName = warehouseName;
return this;
@ -265,6 +278,7 @@ public class QueryStatisticsInfo {
.setSpillBytes(spillBytes)
.setExecTime(execTime)
.setExecProgress(execProgress)
.setExecState(execState)
.setWareHouseName(wareHouseName)
.setCustomQueryId(customQueryId)
.setResourceGroupName(resourceGroupName);
@ -285,6 +299,7 @@ public class QueryStatisticsInfo {
.withCpuCostNs(tinfo.getCpuCostNs())
.withExecTime(tinfo.getExecTime())
.withExecProgress(tinfo.getExecProgress())
.withExecState(tinfo.getExecState())
.withWareHouseName(tinfo.getWareHouseName())
.withCustomQueryId(tinfo.getCustomQueryId())
.withResourceGroupName(tinfo.getResourceGroupName());
@ -305,6 +320,7 @@ public class QueryStatisticsInfo {
values.add(QueryStatisticsFormatter.getSecondsFromNano(this.getCpuCostNs()));
values.add(QueryStatisticsFormatter.getSecondsFromMilli(this.getExecTime()));
values.add(this.getExecProgress());
values.add(this.getExecState());
values.add(this.getWareHouseName());
values.add(this.getCustomQueryId());
values.add(this.getResourceGroupName());
@ -325,14 +341,15 @@ public class QueryStatisticsInfo {
Objects.equals(db, that.db) && Objects.equals(user, that.user) && cpuCostNs == that.cpuCostNs &&
scanBytes == that.scanBytes && scanRows == that.scanRows && memUsageBytes == that.memUsageBytes &&
spillBytes == that.spillBytes && execTime == that.execTime && execProgress == that.execProgress &&
Objects.equals(wareHouseName, that.wareHouseName) && Objects.equals(customQueryId, that.customQueryId) &&
execState == that.execState && Objects.equals(wareHouseName, that.wareHouseName) &&
Objects.equals(customQueryId, that.customQueryId) &&
Objects.equals(resourceGroupName, that.resourceGroupName);
}
@Override
public int hashCode() {
return Objects.hash(queryStartTime, feIp, queryId, connId, db, user, cpuCostNs, scanBytes, scanRows, memUsageBytes,
spillBytes, execTime, execProgress, wareHouseName, customQueryId, resourceGroupName);
spillBytes, execTime, execProgress, execState, wareHouseName, customQueryId, resourceGroupName);
}
@Override
@ -350,6 +367,7 @@ public class QueryStatisticsInfo {
", spillBytes=" + spillBytes +
", execTime=" + execTime +
", execProgress=" + execProgress +
", execState=" + execState +
", wareHouseName=" + wareHouseName +
", customQueryId=" + customQueryId +
", resourceGroupName=" + resourceGroupName +
@ -369,6 +387,7 @@ public class QueryStatisticsInfo {
.sorted(Comparator.comparingLong(QueryStatisticsItem::getQueryStartTime))
.collect(Collectors.toList());
final HttpClient httpClient = HttpClient.newHttpClient();
final SlotManager slotManager = (SlotManager) GlobalStateMgr.getCurrentState().getSlotManager();
for (QueryStatisticsItem item : sorted) {
final CurrentQueryInfoProvider.QueryStatistics statistics = statisticsMap.get(item.getQueryId());
@ -382,6 +401,7 @@ public class QueryStatisticsInfo {
.withExecTime(item.getQueryExecTime())
.withExecProgress(getExecProgress(FrontendOptions.getLocalHostAddress(),
item.getQueryId(), httpClient))
.withExecState(slotManager.getExecStateByQueryId(item.getQueryId()))
.withWareHouseName(item.getWarehouseName())
.withCustomQueryId(item.getCustomQueryId())
.withResourceGroupName(item.getResourceGroupName());

View File

@ -14,6 +14,7 @@
package com.starrocks.qe.scheduler.slot;
import com.starrocks.common.util.DebugUtil;
import com.starrocks.metric.MetricVisitor;
import com.starrocks.qe.ConnectContext;
import com.starrocks.thrift.TStatus;
@ -63,6 +64,14 @@ public class SlotManager extends BaseSlotManager {
// do nothing
}
public String getExecStateByQueryId(String queryId) {
return getSlots().stream()
.filter(slot -> queryId.equals(DebugUtil.printId(slot.getSlotId())))
.map(slot -> slot.getState().toQueryStateString())
.findFirst()
.orElse("");
}
private class RequestWorker extends Thread {
public RequestWorker() {
super("slot-mgr-req");

View File

@ -48,6 +48,7 @@ public class CurrentGlobalQueryStatisticsProcDirTest {
.withCpuCostNs(97323000)
.withExecTime(3533000)
.withExecProgress("100%")
.withExecState("FINISHED")
.withWareHouseName("default_warehouse")
.withCustomQueryId("")
.withResourceGroupName("wg1");
@ -67,6 +68,7 @@ public class CurrentGlobalQueryStatisticsProcDirTest {
.withCpuCostNs(96576000)
.withExecTime(2086000)
.withExecProgress("100%")
.withExecState("FINISHED")
.withWareHouseName("default_warehouse")
.withCustomQueryId("")
.withResourceGroupName("wg2");
@ -85,6 +87,7 @@ public class CurrentGlobalQueryStatisticsProcDirTest {
.withCpuCostNs(97456000)
.withExecTime(3687000)
.withExecProgress("100%")
.withExecState("FINISHED")
.withWareHouseName("default_warehouse")
.withCustomQueryId("")
.withResourceGroupName("wg3");
@ -104,6 +107,7 @@ public class CurrentGlobalQueryStatisticsProcDirTest {
.withCpuCostNs(96686000)
.withExecTime(2196000)
.withExecProgress("100%")
.withExecState("FINISHED")
.withWareHouseName("default_warehouse")
.withCustomQueryId("")
.withResourceGroupName("wg");

View File

@ -79,22 +79,22 @@ public class CurrentQueryStatisticsProcDirTest {
// QueryId
Assertions.assertEquals("queryId1", list1.get(2));
// Warehouse
Assertions.assertEquals("wh1", list1.get(13));
Assertions.assertEquals("wh1", list1.get(14));
// CustomQueryId
Assertions.assertEquals("abc1", list1.get(14));
Assertions.assertEquals("abc1", list1.get(15));
// ResourceGroupName
Assertions.assertEquals("wg1", list1.get(15));
Assertions.assertEquals("wg1", list1.get(16));
List<String> list2 = rows.get(1);
Assertions.assertEquals(list2.size(), CurrentQueryStatisticsProcDir.TITLE_NAMES.size());
// QueryId
Assertions.assertEquals("queryId2", list2.get(2));
// Warehouse
Assertions.assertEquals("wh1", list2.get(13));
Assertions.assertEquals("wh1", list2.get(14));
// CustomQueryId
Assertions.assertEquals("abc2", list2.get(14));
Assertions.assertEquals("abc2", list2.get(15));
// ResourceGroupName
Assertions.assertEquals("wg2", list2.get(15));
Assertions.assertEquals("wg2", list2.get(16));
}
@Test

View File

@ -46,6 +46,7 @@ public class QueryStatisticsInfoTest {
firstQuery.getSpillBytes(),
firstQuery.getExecTime(),
firstQuery.getExecProgress(),
firstQuery.getExecState(),
firstQuery.getWareHouseName(),
firstQuery.getCustomQueryId(),
firstQuery.getResourceGroupName()

View File

@ -16,11 +16,13 @@ package com.starrocks.qe.scheduler;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.starrocks.catalog.ResourceGroup;
import com.starrocks.common.Config;
import com.starrocks.common.ExceptionChecker;
import com.starrocks.common.Pair;
import com.starrocks.common.StarRocksException;
import com.starrocks.common.util.DebugUtil;
import com.starrocks.metric.MetricRepo;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.DefaultCoordinator;
@ -1667,4 +1669,67 @@ public class QueryQueueManagerTest extends SchedulerTestBase {
coord.onFinished();
}
}
@Test
public void testShowProcGlobalCurrentQueries() throws Exception {
final int concurrencyLimit = 2;
GlobalVariable.setEnableQueryQueueSelect(true);
GlobalVariable.setQueryQueueConcurrencyLimit(concurrencyLimit);
TWorkGroup group0 = new TWorkGroup().setId(0L).setConcurrency_limit(concurrencyLimit - 1);
List<TWorkGroup> groups = ImmutableList.of(group0);
final int numPendingCoords = groups.size() * concurrencyLimit;
// 1. Run `concurrencyLimit` queries.
List<DefaultCoordinator> runningCoords = new ArrayList<>();
mockResourceGroup(null);
runningCoords.add(runNoPendingQuery());
mockResourceGroup(group0);
runningCoords.add(runNoPendingQuery());
// 2. Set group has `concurrencyLimit` pending queries.
List<DefaultCoordinator> coords = new ArrayList<>();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < concurrencyLimit; i++) {
for (TWorkGroup group : groups) {
if (group.getId() == LogicalSlot.ABSENT_GROUP_ID) {
mockResourceGroup(null);
} else {
mockResourceGroup(group);
}
DefaultCoordinator coord = getSchedulerWithQueryId("select count(1) from lineitem");
coords.add(coord);
threads.add(new Thread(() -> Assertions.assertThrows(StarRocksException.class,
() -> manager.maybeWait(connectContext, coord),
"Cancelled")));
}
}
threads.forEach(Thread::start);
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> numPendingCoords == MetricRepo.COUNTER_QUERY_QUEUE_PENDING.getValue());
coords.forEach(coord -> Assertions.assertEquals(LogicalSlot.State.REQUIRING, coord.getSlot().getState()));
Awaitility.await().atMost(5, TimeUnit.SECONDS)
.until(() -> GlobalStateMgr.getCurrentState().getSlotManager().getSlots().size() ==
numPendingCoords + concurrencyLimit);
// 3. Get ExecState(RUNNING/PENDING) via SlotManager.getExecStateByQueryId().
List<LogicalSlot> slots = GlobalStateMgr.getCurrentState().getSlotManager().getSlots();
Map<String, String> queryStateMap = Maps.newHashMap();
SlotManager slotManager = (SlotManager) GlobalStateMgr.getCurrentState().getSlotManager();
for (LogicalSlot slot : slots) {
String queryId = DebugUtil.printId(slot.getSlotId());
String state = slotManager.getExecStateByQueryId(queryId);
queryStateMap.put(queryId, state);
}
long runningCnt = queryStateMap.values().stream().filter("RUNNING"::equals).count();
long pendingCnt = queryStateMap.values().stream().filter("PENDING"::equals).count();
Assertions.assertEquals(runningCnt, 2L);
Assertions.assertEquals(pendingCnt, 2L);
coords.forEach(coor -> coor.cancel("Cancel by test"));
runningCoords.forEach(DefaultCoordinator::onFinished);
}
}

View File

@ -1648,6 +1648,7 @@ struct TQueryStatisticsInfo {
14: optional string customQueryId
15: optional string resourceGroupName
16: optional string execProgress
17: optional string execState
}
struct TGetQueryStatisticsResponse {