[BugFix] Fix Warehouse Idle Check bug when switching warehouse (backport #56552) (backport #56604) (#56778)
Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
This commit is contained in:
parent
1189206caf
commit
2f78e09f3b
|
|
@ -525,8 +525,10 @@ public class StmtExecutor {
|
|||
httpResultSender = new HttpResultSender((HttpConnectContext) context);
|
||||
}
|
||||
|
||||
if (shouldMarkIdleCheck(parsedStmt)) {
|
||||
WarehouseIdleChecker.increaseRunningSQL(context.getCurrentWarehouseId());
|
||||
final boolean shouldMarkIdleCheck = shouldMarkIdleCheck(parsedStmt);
|
||||
final long originWarehouseId = context.getCurrentWarehouseId();
|
||||
if (shouldMarkIdleCheck) {
|
||||
WarehouseIdleChecker.increaseRunningSQL(originWarehouseId);
|
||||
}
|
||||
try {
|
||||
context.getState().setIsQuery(parsedStmt instanceof QueryStatement);
|
||||
|
|
@ -833,8 +835,8 @@ public class StmtExecutor {
|
|||
// restore session variable in connect context
|
||||
context.setSessionVariable(sessionVariableBackup);
|
||||
|
||||
if (shouldMarkIdleCheck(parsedStmt)) {
|
||||
WarehouseIdleChecker.decreaseRunningSQL(context.getCurrentWarehouseId());
|
||||
if (shouldMarkIdleCheck) {
|
||||
WarehouseIdleChecker.decreaseRunningSQL(originWarehouseId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -285,6 +285,17 @@ public class WarehouseManager implements Writable {
|
|||
return workerGroupId;
|
||||
}
|
||||
|
||||
public long getWarehouseResumeTime(long warehouseId) {
|
||||
try (LockCloseable ignored = new LockCloseable(rwLock.readLock())) {
|
||||
Warehouse warehouse = idToWh.get(warehouseId);
|
||||
if (warehouse == null) {
|
||||
return -1;
|
||||
} else {
|
||||
return warehouse.getResumeTime();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private Optional<Long> selectWorkerGroupInternal(long warehouseId) {
|
||||
Warehouse warehouse = getWarehouse(warehouseId);
|
||||
List<Long> ids = warehouse.getWorkerGroupIds();
|
||||
|
|
|
|||
|
|
@ -72,4 +72,9 @@ public class DefaultWarehouse extends Warehouse {
|
|||
public ProcResult fetchResult() {
|
||||
return new BaseProcResult();
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getResumeTime() {
|
||||
return -1L;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -56,6 +56,8 @@ public abstract class Warehouse implements Writable {
|
|||
Text.writeString(out, json);
|
||||
}
|
||||
|
||||
public abstract long getResumeTime();
|
||||
|
||||
public abstract Long getAnyWorkerGroupId();
|
||||
|
||||
public abstract List<Long> getWorkerGroupIds();
|
||||
|
|
|
|||
|
|
@ -76,7 +76,16 @@ public class WarehouseIdleChecker extends FrontendDaemon {
|
|||
if (runningJobCnt == 0
|
||||
&& lastFinishedJobTime <
|
||||
System.currentTimeMillis() - Config.warehouse_idle_check_interval_seconds * 2000) {
|
||||
warehouseIdleTime.putIfAbsent(wId, System.currentTimeMillis());
|
||||
long resumeTime = GlobalStateMgr.getCurrentState().getWarehouseMgr().getWarehouseResumeTime(wId);
|
||||
warehouseIdleTime.compute(wId, (k, v) -> {
|
||||
// If this is the first time to become idle, set idleTime to now.
|
||||
// If resumed during an idle period, change idleTime to resumeTime.
|
||||
if (v == null) {
|
||||
return System.currentTimeMillis();
|
||||
} else {
|
||||
return v < resumeTime ? resumeTime : v;
|
||||
}
|
||||
});
|
||||
LOG.info("warehouse: {} is idle, idle start time: {}",
|
||||
wId, TimeUtils.longToTimeString(warehouseIdleTime.get(wId)));
|
||||
} else {
|
||||
|
|
|
|||
Loading…
Reference in New Issue