[Enhancement] add ExecState into /current_queries cmd result to distinguish running/pending query (backport #62261) (#62372)
Signed-off-by: MatthewH00 <1639097204@qq.com> Co-authored-by: hmx <1639097204@qq.com>
This commit is contained in:
parent
77291b7c49
commit
5f192a6f28
|
|
@ -66,6 +66,7 @@ public class CurrentQueryStatisticsProcDir implements ProcDirInterface {
|
|||
.add("CPUTime")
|
||||
.add("ExecTime")
|
||||
.add("ExecProgress")
|
||||
.add("ExecState")
|
||||
.add("Warehouse")
|
||||
.add("CustomQueryId")
|
||||
.add("ResourceGroup")
|
||||
|
|
|
|||
|
|
@ -44,6 +44,8 @@ import com.starrocks.common.Config;
|
|||
import com.starrocks.common.proc.CurrentQueryInfoProvider;
|
||||
import com.starrocks.common.util.QueryStatisticsFormatter;
|
||||
import com.starrocks.common.util.TimeUtils;
|
||||
import com.starrocks.qe.scheduler.slot.SlotManager;
|
||||
import com.starrocks.server.GlobalStateMgr;
|
||||
import com.starrocks.service.FrontendOptions;
|
||||
import com.starrocks.thrift.TQueryStatisticsInfo;
|
||||
import org.apache.http.HttpStatus;
|
||||
|
|
@ -77,6 +79,7 @@ public class QueryStatisticsInfo {
|
|||
private long spillBytes;
|
||||
private long execTime;
|
||||
private String execProgress;
|
||||
private String execState;
|
||||
private String wareHouseName;
|
||||
private String customQueryId;
|
||||
private String resourceGroupName;
|
||||
|
|
@ -86,8 +89,8 @@ public class QueryStatisticsInfo {
|
|||
|
||||
public QueryStatisticsInfo(long queryStartTime, String feIp, String queryId, String connId, String db, String user,
|
||||
long cpuCostNs, long scanBytes, long scanRows, long memUsageBytes, long spillBytes,
|
||||
long execTime, String execProgress, String wareHouseName, String customQueryId,
|
||||
String resourceGroupName) {
|
||||
long execTime, String execProgress, String execState, String wareHouseName,
|
||||
String customQueryId, String resourceGroupName) {
|
||||
this.queryStartTime = queryStartTime;
|
||||
this.feIp = feIp;
|
||||
this.queryId = queryId;
|
||||
|
|
@ -101,6 +104,7 @@ public class QueryStatisticsInfo {
|
|||
this.spillBytes = spillBytes;
|
||||
this.execTime = execTime;
|
||||
this.execProgress = execProgress;
|
||||
this.execState = execState;
|
||||
this.wareHouseName = wareHouseName;
|
||||
this.customQueryId = customQueryId;
|
||||
this.resourceGroupName = resourceGroupName;
|
||||
|
|
@ -158,6 +162,10 @@ public class QueryStatisticsInfo {
|
|||
return execProgress;
|
||||
}
|
||||
|
||||
public String getExecState() {
|
||||
return execState;
|
||||
}
|
||||
|
||||
public String getWareHouseName() {
|
||||
return wareHouseName;
|
||||
}
|
||||
|
|
@ -235,6 +243,11 @@ public class QueryStatisticsInfo {
|
|||
return this;
|
||||
}
|
||||
|
||||
public QueryStatisticsInfo withExecState(String execState) {
|
||||
this.execState = execState;
|
||||
return this;
|
||||
}
|
||||
|
||||
public QueryStatisticsInfo withWareHouseName(String warehouseName) {
|
||||
this.wareHouseName = warehouseName;
|
||||
return this;
|
||||
|
|
@ -265,6 +278,7 @@ public class QueryStatisticsInfo {
|
|||
.setSpillBytes(spillBytes)
|
||||
.setExecTime(execTime)
|
||||
.setExecProgress(execProgress)
|
||||
.setExecState(execState)
|
||||
.setWareHouseName(wareHouseName)
|
||||
.setCustomQueryId(customQueryId)
|
||||
.setResourceGroupName(resourceGroupName);
|
||||
|
|
@ -285,6 +299,7 @@ public class QueryStatisticsInfo {
|
|||
.withCpuCostNs(tinfo.getCpuCostNs())
|
||||
.withExecTime(tinfo.getExecTime())
|
||||
.withExecProgress(tinfo.getExecProgress())
|
||||
.withExecState(tinfo.getExecState())
|
||||
.withWareHouseName(tinfo.getWareHouseName())
|
||||
.withCustomQueryId(tinfo.getCustomQueryId())
|
||||
.withResourceGroupName(tinfo.getResourceGroupName());
|
||||
|
|
@ -305,6 +320,7 @@ public class QueryStatisticsInfo {
|
|||
values.add(QueryStatisticsFormatter.getSecondsFromNano(this.getCpuCostNs()));
|
||||
values.add(QueryStatisticsFormatter.getSecondsFromMilli(this.getExecTime()));
|
||||
values.add(this.getExecProgress());
|
||||
values.add(this.getExecState());
|
||||
values.add(this.getWareHouseName());
|
||||
values.add(this.getCustomQueryId());
|
||||
values.add(this.getResourceGroupName());
|
||||
|
|
@ -325,14 +341,15 @@ public class QueryStatisticsInfo {
|
|||
Objects.equals(db, that.db) && Objects.equals(user, that.user) && cpuCostNs == that.cpuCostNs &&
|
||||
scanBytes == that.scanBytes && scanRows == that.scanRows && memUsageBytes == that.memUsageBytes &&
|
||||
spillBytes == that.spillBytes && execTime == that.execTime && execProgress == that.execProgress &&
|
||||
Objects.equals(wareHouseName, that.wareHouseName) && Objects.equals(customQueryId, that.customQueryId) &&
|
||||
execState == that.execState && Objects.equals(wareHouseName, that.wareHouseName) &&
|
||||
Objects.equals(customQueryId, that.customQueryId) &&
|
||||
Objects.equals(resourceGroupName, that.resourceGroupName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return Objects.hash(queryStartTime, feIp, queryId, connId, db, user, cpuCostNs, scanBytes, scanRows, memUsageBytes,
|
||||
spillBytes, execTime, execProgress, wareHouseName, customQueryId, resourceGroupName);
|
||||
spillBytes, execTime, execProgress, execState, wareHouseName, customQueryId, resourceGroupName);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
@ -350,6 +367,7 @@ public class QueryStatisticsInfo {
|
|||
", spillBytes=" + spillBytes +
|
||||
", execTime=" + execTime +
|
||||
", execProgress=" + execProgress +
|
||||
", execState=" + execState +
|
||||
", wareHouseName=" + wareHouseName +
|
||||
", customQueryId=" + customQueryId +
|
||||
", resourceGroupName=" + resourceGroupName +
|
||||
|
|
@ -369,6 +387,7 @@ public class QueryStatisticsInfo {
|
|||
.sorted(Comparator.comparingLong(QueryStatisticsItem::getQueryStartTime))
|
||||
.collect(Collectors.toList());
|
||||
final HttpClient httpClient = HttpClient.newHttpClient();
|
||||
final SlotManager slotManager = (SlotManager) GlobalStateMgr.getCurrentState().getSlotManager();
|
||||
for (QueryStatisticsItem item : sorted) {
|
||||
final CurrentQueryInfoProvider.QueryStatistics statistics = statisticsMap.get(item.getQueryId());
|
||||
|
||||
|
|
@ -382,6 +401,7 @@ public class QueryStatisticsInfo {
|
|||
.withExecTime(item.getQueryExecTime())
|
||||
.withExecProgress(getExecProgress(FrontendOptions.getLocalHostAddress(),
|
||||
item.getQueryId(), httpClient))
|
||||
.withExecState(slotManager.getExecStateByQueryId(item.getQueryId()))
|
||||
.withWareHouseName(item.getWarehouseName())
|
||||
.withCustomQueryId(item.getCustomQueryId())
|
||||
.withResourceGroupName(item.getResourceGroupName());
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@
|
|||
|
||||
package com.starrocks.qe.scheduler.slot;
|
||||
|
||||
import com.starrocks.common.util.DebugUtil;
|
||||
import com.starrocks.metric.MetricVisitor;
|
||||
import com.starrocks.qe.ConnectContext;
|
||||
import com.starrocks.thrift.TStatus;
|
||||
|
|
@ -63,6 +64,14 @@ public class SlotManager extends BaseSlotManager {
|
|||
// do nothing
|
||||
}
|
||||
|
||||
public String getExecStateByQueryId(String queryId) {
|
||||
return getSlots().stream()
|
||||
.filter(slot -> queryId.equals(DebugUtil.printId(slot.getSlotId())))
|
||||
.map(slot -> slot.getState().toQueryStateString())
|
||||
.findFirst()
|
||||
.orElse("");
|
||||
}
|
||||
|
||||
private class RequestWorker extends Thread {
|
||||
public RequestWorker() {
|
||||
super("slot-mgr-req");
|
||||
|
|
|
|||
|
|
@ -48,6 +48,7 @@ public class CurrentGlobalQueryStatisticsProcDirTest {
|
|||
.withCpuCostNs(97323000)
|
||||
.withExecTime(3533000)
|
||||
.withExecProgress("100%")
|
||||
.withExecState("FINISHED")
|
||||
.withWareHouseName("default_warehouse")
|
||||
.withCustomQueryId("")
|
||||
.withResourceGroupName("wg1");
|
||||
|
|
@ -67,6 +68,7 @@ public class CurrentGlobalQueryStatisticsProcDirTest {
|
|||
.withCpuCostNs(96576000)
|
||||
.withExecTime(2086000)
|
||||
.withExecProgress("100%")
|
||||
.withExecState("FINISHED")
|
||||
.withWareHouseName("default_warehouse")
|
||||
.withCustomQueryId("")
|
||||
.withResourceGroupName("wg2");
|
||||
|
|
@ -85,6 +87,7 @@ public class CurrentGlobalQueryStatisticsProcDirTest {
|
|||
.withCpuCostNs(97456000)
|
||||
.withExecTime(3687000)
|
||||
.withExecProgress("100%")
|
||||
.withExecState("FINISHED")
|
||||
.withWareHouseName("default_warehouse")
|
||||
.withCustomQueryId("")
|
||||
.withResourceGroupName("wg3");
|
||||
|
|
@ -104,6 +107,7 @@ public class CurrentGlobalQueryStatisticsProcDirTest {
|
|||
.withCpuCostNs(96686000)
|
||||
.withExecTime(2196000)
|
||||
.withExecProgress("100%")
|
||||
.withExecState("FINISHED")
|
||||
.withWareHouseName("default_warehouse")
|
||||
.withCustomQueryId("")
|
||||
.withResourceGroupName("wg");
|
||||
|
|
|
|||
|
|
@ -79,22 +79,22 @@ public class CurrentQueryStatisticsProcDirTest {
|
|||
// QueryId
|
||||
Assertions.assertEquals("queryId1", list1.get(2));
|
||||
// Warehouse
|
||||
Assertions.assertEquals("wh1", list1.get(13));
|
||||
Assertions.assertEquals("wh1", list1.get(14));
|
||||
// CustomQueryId
|
||||
Assertions.assertEquals("abc1", list1.get(14));
|
||||
Assertions.assertEquals("abc1", list1.get(15));
|
||||
// ResourceGroupName
|
||||
Assertions.assertEquals("wg1", list1.get(15));
|
||||
Assertions.assertEquals("wg1", list1.get(16));
|
||||
|
||||
List<String> list2 = rows.get(1);
|
||||
Assertions.assertEquals(list2.size(), CurrentQueryStatisticsProcDir.TITLE_NAMES.size());
|
||||
// QueryId
|
||||
Assertions.assertEquals("queryId2", list2.get(2));
|
||||
// Warehouse
|
||||
Assertions.assertEquals("wh1", list2.get(13));
|
||||
Assertions.assertEquals("wh1", list2.get(14));
|
||||
// CustomQueryId
|
||||
Assertions.assertEquals("abc2", list2.get(14));
|
||||
Assertions.assertEquals("abc2", list2.get(15));
|
||||
// ResourceGroupName
|
||||
Assertions.assertEquals("wg2", list2.get(15));
|
||||
Assertions.assertEquals("wg2", list2.get(16));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
|||
|
|
@ -46,6 +46,7 @@ public class QueryStatisticsInfoTest {
|
|||
firstQuery.getSpillBytes(),
|
||||
firstQuery.getExecTime(),
|
||||
firstQuery.getExecProgress(),
|
||||
firstQuery.getExecState(),
|
||||
firstQuery.getWareHouseName(),
|
||||
firstQuery.getCustomQueryId(),
|
||||
firstQuery.getResourceGroupName()
|
||||
|
|
|
|||
|
|
@ -16,11 +16,13 @@ package com.starrocks.qe.scheduler;
|
|||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.starrocks.catalog.ResourceGroup;
|
||||
import com.starrocks.common.Config;
|
||||
import com.starrocks.common.ExceptionChecker;
|
||||
import com.starrocks.common.Pair;
|
||||
import com.starrocks.common.StarRocksException;
|
||||
import com.starrocks.common.util.DebugUtil;
|
||||
import com.starrocks.metric.MetricRepo;
|
||||
import com.starrocks.qe.ConnectContext;
|
||||
import com.starrocks.qe.DefaultCoordinator;
|
||||
|
|
@ -1667,4 +1669,67 @@ public class QueryQueueManagerTest extends SchedulerTestBase {
|
|||
coord.onFinished();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testShowProcGlobalCurrentQueries() throws Exception {
|
||||
final int concurrencyLimit = 2;
|
||||
|
||||
GlobalVariable.setEnableQueryQueueSelect(true);
|
||||
GlobalVariable.setQueryQueueConcurrencyLimit(concurrencyLimit);
|
||||
|
||||
TWorkGroup group0 = new TWorkGroup().setId(0L).setConcurrency_limit(concurrencyLimit - 1);
|
||||
List<TWorkGroup> groups = ImmutableList.of(group0);
|
||||
|
||||
final int numPendingCoords = groups.size() * concurrencyLimit;
|
||||
|
||||
// 1. Run `concurrencyLimit` queries.
|
||||
List<DefaultCoordinator> runningCoords = new ArrayList<>();
|
||||
mockResourceGroup(null);
|
||||
runningCoords.add(runNoPendingQuery());
|
||||
mockResourceGroup(group0);
|
||||
runningCoords.add(runNoPendingQuery());
|
||||
|
||||
// 2. Set group has `concurrencyLimit` pending queries.
|
||||
List<DefaultCoordinator> coords = new ArrayList<>();
|
||||
List<Thread> threads = new ArrayList<>();
|
||||
for (int i = 0; i < concurrencyLimit; i++) {
|
||||
for (TWorkGroup group : groups) {
|
||||
if (group.getId() == LogicalSlot.ABSENT_GROUP_ID) {
|
||||
mockResourceGroup(null);
|
||||
} else {
|
||||
mockResourceGroup(group);
|
||||
}
|
||||
DefaultCoordinator coord = getSchedulerWithQueryId("select count(1) from lineitem");
|
||||
coords.add(coord);
|
||||
|
||||
threads.add(new Thread(() -> Assertions.assertThrows(StarRocksException.class,
|
||||
() -> manager.maybeWait(connectContext, coord),
|
||||
"Cancelled")));
|
||||
}
|
||||
}
|
||||
threads.forEach(Thread::start);
|
||||
Awaitility.await().atMost(5, TimeUnit.SECONDS)
|
||||
.until(() -> numPendingCoords == MetricRepo.COUNTER_QUERY_QUEUE_PENDING.getValue());
|
||||
coords.forEach(coord -> Assertions.assertEquals(LogicalSlot.State.REQUIRING, coord.getSlot().getState()));
|
||||
Awaitility.await().atMost(5, TimeUnit.SECONDS)
|
||||
.until(() -> GlobalStateMgr.getCurrentState().getSlotManager().getSlots().size() ==
|
||||
numPendingCoords + concurrencyLimit);
|
||||
|
||||
// 3. Get ExecState(RUNNING/PENDING) via SlotManager.getExecStateByQueryId().
|
||||
List<LogicalSlot> slots = GlobalStateMgr.getCurrentState().getSlotManager().getSlots();
|
||||
Map<String, String> queryStateMap = Maps.newHashMap();
|
||||
SlotManager slotManager = (SlotManager) GlobalStateMgr.getCurrentState().getSlotManager();
|
||||
for (LogicalSlot slot : slots) {
|
||||
String queryId = DebugUtil.printId(slot.getSlotId());
|
||||
String state = slotManager.getExecStateByQueryId(queryId);
|
||||
queryStateMap.put(queryId, state);
|
||||
}
|
||||
long runningCnt = queryStateMap.values().stream().filter("RUNNING"::equals).count();
|
||||
long pendingCnt = queryStateMap.values().stream().filter("PENDING"::equals).count();
|
||||
Assertions.assertEquals(runningCnt, 2L);
|
||||
Assertions.assertEquals(pendingCnt, 2L);
|
||||
|
||||
coords.forEach(coor -> coor.cancel("Cancel by test"));
|
||||
runningCoords.forEach(DefaultCoordinator::onFinished);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1648,6 +1648,7 @@ struct TQueryStatisticsInfo {
|
|||
14: optional string customQueryId
|
||||
15: optional string resourceGroupName
|
||||
16: optional string execProgress
|
||||
17: optional string execState
|
||||
}
|
||||
|
||||
struct TGetQueryStatisticsResponse {
|
||||
|
|
|
|||
Loading…
Reference in New Issue