[BugFix] Fixed phased scheduler always waiting for profile collection in sync profile collection (backport #62140) (#62177)

Signed-off-by: stdpain <drfeng08@gmail.com>
Co-authored-by: stdpain <34912776+stdpain@users.noreply.github.com>
This commit is contained in:
mergify[bot] 2025-08-21 16:43:48 +08:00 committed by GitHub
parent 9db589ba27
commit 72c4842aa6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 31 additions and 5 deletions

View File

@ -73,6 +73,8 @@ public class FeConstants {
"Compute node not found. Check if any compute node is down.";
public static final String QUERY_FINISHED_ERROR = "QueryFinished";
public static final String LIMIT_REACH_ERROR = "LimitReach";
public static final String SCHEDULE_FRAGMENT_ERROR =
"Schedule Fragment error. caused by:";
public static boolean USE_MOCK_DICT_MANAGER = false;

View File

@ -587,7 +587,7 @@ public class DefaultCoordinator extends Coordinator {
scheduler.tryScheduleNextTurn(fragmentInstanceId);
} catch (Exception e) {
LOG.warn("schedule fragment:{} next internal error:", DebugUtil.printId(fragmentInstanceId), e);
cancel(PPlanFragmentCancelReason.INTERNAL_ERROR, e.getMessage());
cancel(PPlanFragmentCancelReason.INTERNAL_ERROR, FeConstants.SCHEDULE_FRAGMENT_ERROR + e.getMessage());
return Status.internalError(e.getMessage());
}
return Status.OK;
@ -1007,6 +1007,11 @@ public class DefaultCoordinator extends Coordinator {
public void cancel(PPlanFragmentCancelReason reason, String message) {
lock();
try {
// All results have been obtained. The query has ended. Ignore this error.
if (returnedAllResults) {
cancelInternal(PPlanFragmentCancelReason.QUERY_FINISHED);
return;
}
if (!queryStatus.ok()) {
// we can't cancel twice
return;
@ -1020,10 +1025,10 @@ public class DefaultCoordinator extends Coordinator {
try {
// Disable count down profileDoneSignal for collect all backend's profile
// but if backend has crashed, we need count down profileDoneSignal since it will not report by itself
if (message.equals(FeConstants.BACKEND_NODE_NOT_FOUND_ERROR)) {
if (message.equals(FeConstants.BACKEND_NODE_NOT_FOUND_ERROR) ||
message.startsWith(FeConstants.SCHEDULE_FRAGMENT_ERROR)) {
queryProfile.finishAllInstances(Status.OK);
LOG.info("count down profileDoneSignal since backend has crashed, query id: {}",
DebugUtil.printId(jobSpec.getQueryId()));
}
} finally {
unlock();

View File

@ -211,4 +211,17 @@ with agged_table as ( select distinct c0, c1 from t0) select /*+SET_VAR(cbo_cte_
with agged_table as ( select distinct c0, c1 from t0) select /*+SET_VAR(cbo_cte_reuse_rate=-1) */ count(*), sum(c0),sum(c1) from (select l.c1, l.c0 from agged_table l join t0 r on l.c0=r.c2 union all select * from agged_table) tb;
-- result:
81921 1677762560.0 1677762560.0
-- !result
set enable_profile=true;
-- result:
-- !result
set enable_async_profile=true;
-- result:
-- !result
set profile_timeout=300;
-- result:
-- !result
select count(*) from small_table1 s1 join small_table2 s2 on s1.c0 = s2.c0 join small_table3 s3 on s1.c0 = s3.c0 where s3.c3 = 0;
-- result:
0
-- !result

View File

@ -118,4 +118,10 @@ with agged_table as ( select distinct c0,c1 from (select l.c0, r.c1 from t0 l jo
select count(*) from small_table1 s1 join small_table2 s2 on s1.c0 = s2.c0 join small_table3 s3 on s1.c0 = s3.c0 join small_table1 s4 on s1.c0 = s4.c0 join small_table2 s5 on s1.c0 = s5.c0 join small_table3 s6 on s1.c0 = s6.c0 join small_table1 s7 on s1.c0 = s7.c0 join small_table2 s8 on s1.c0 = s8.c0 join small_table3 s9 on s1.c0 = s9.c0 join small_table1 s10 on s1.c0 = s10.c0;
-- union all with cte
with agged_table as ( select distinct c0, c1 from t0) select /*+SET_VAR(cbo_cte_reuse_rate=-1) */ count(*), sum(c0),sum(c1) from (select * from agged_table union all select l.c1, l.c0 from agged_table l join t0 r on l.c0=r.c2) tb;
with agged_table as ( select distinct c0, c1 from t0) select /*+SET_VAR(cbo_cte_reuse_rate=-1) */ count(*), sum(c0),sum(c1) from (select l.c1, l.c0 from agged_table l join t0 r on l.c0=r.c2 union all select * from agged_table) tb;
with agged_table as ( select distinct c0, c1 from t0) select /*+SET_VAR(cbo_cte_reuse_rate=-1) */ count(*), sum(c0),sum(c1) from (select l.c1, l.c0 from agged_table l join t0 r on l.c0=r.c2 union all select * from agged_table) tb;
-- test with sync collect profile
set enable_profile=true;
set enable_async_profile=true;
set profile_timeout=300;
select count(*) from small_table1 s1 join small_table2 s2 on s1.c0 = s2.c0 join small_table3 s3 on s1.c0 = s3.c0 where s3.c3 = 0;