Signed-off-by: meegoo <meegoo.sr@gmail.com> Co-authored-by: meegoo <meegoo.sr@gmail.com>
This commit is contained in:
parent
9d3a2e50df
commit
d675248863
|
|
@ -29,6 +29,7 @@ public class LoadConstants {
|
|||
public static final String RUNTIME_DETAILS_PLAN_TIME_MS = "plan_time_ms";
|
||||
public static final String RUNTIME_DETAILS_RECEIVE_DATA_TIME_MS = "receive_data_time_ms";
|
||||
public static final String RUNTIME_DETAILS_BEGIN_TXN_TIME_MS = "begin_txn_time_ms";
|
||||
public static final String RUNTIME_DETAILS_TXN_ERROR_MSG = "txn_error_msg";
|
||||
|
||||
public static final String PROPERTIES_TIMEOUT = "timeout";
|
||||
public static final String PROPERTIES_MAX_FILTER_RATIO = "max_filter_ratio";
|
||||
|
|
|
|||
|
|
@ -927,6 +927,15 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
|
|||
}
|
||||
runtimeDetails.put(LoadConstants.RUNTIME_DETAILS_LOAD_ID, Joiner.on(", ").join(loadIds));
|
||||
runtimeDetails.put(LoadConstants.RUNTIME_DETAILS_TXN_ID, transactionId);
|
||||
TransactionState txnState = GlobalStateMgr.getCurrentState()
|
||||
.getGlobalTransactionMgr().getTransactionState(dbId, transactionId);
|
||||
if (txnState != null) {
|
||||
// NOTE: Do NOT acquire txnState write lock here to avoid lock inversion with
|
||||
// DatabaseTransactionMgr.finishTransaction() where txnState write lock is held
|
||||
// and LoadJob lock is requested in callbacks. Reading errMsg without locking is acceptable
|
||||
// for diagnostics, even if slightly stale.
|
||||
runtimeDetails.put(LoadConstants.RUNTIME_DETAILS_TXN_ERROR_MSG, txnState.getErrMsg());
|
||||
}
|
||||
runtimeDetails.putAll(loadingStatus.getLoadStatistic().toRuntimeDetails());
|
||||
Gson gson = new Gson();
|
||||
return gson.toJson(runtimeDetails);
|
||||
|
|
|
|||
|
|
@ -1631,6 +1631,12 @@ public class StreamLoadTask extends AbstractStreamLoadTask {
|
|||
runtimeDetails.put(LoadConstants.RUNTIME_DETAILS_BEGIN_TXN_TIME_MS, beginTxnTimeMs);
|
||||
runtimeDetails.put(LoadConstants.RUNTIME_DETAILS_RECEIVE_DATA_TIME_MS, receiveDataTimeMs);
|
||||
runtimeDetails.put(LoadConstants.RUNTIME_DETAILS_PLAN_TIME_MS, planTimeMs);
|
||||
TransactionState txnState = GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().getTransactionState(dbId, txnId);
|
||||
if (txnState != null) {
|
||||
// Avoid acquiring txnState write lock here to prevent deadlocks with callbacks holding txn lock
|
||||
// and attempting to take StreamLoadTask/LoadJob locks. A slightly stale errMsg is acceptable.
|
||||
runtimeDetails.put(LoadConstants.RUNTIME_DETAILS_TXN_ERROR_MSG, txnState.getErrMsg());
|
||||
}
|
||||
Gson gson = new Gson();
|
||||
return gson.toJson(runtimeDetails);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -613,6 +613,11 @@ public class PublishVersionDaemon extends FrontendDaemon {
|
|||
stateBatch.putBeTablets(partitionId, nodeToTablets);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
for (int i = 0; i < transactionStates.size(); i++) {
|
||||
TransactionState txnState = transactionStates.get(i);
|
||||
// Avoid holding txn write lock here; setting errMsg is best-effort for diagnostics
|
||||
txnState.setErrorMsg("Fail to publish partition " + partitionId + " error " + e.getMessage());
|
||||
}
|
||||
LOG.error("Fail to publish partition {} of txnIds {}:", partitionId,
|
||||
txnInfos.stream().map(i -> i.txnId).collect(Collectors.toList()), e);
|
||||
return false;
|
||||
|
|
@ -892,6 +897,9 @@ public class PublishVersionDaemon extends FrontendDaemon {
|
|||
}
|
||||
return true;
|
||||
} catch (Throwable e) {
|
||||
// Avoid holding txn write lock here; setting errMsg is best-effort for diagnostics
|
||||
txnState.setErrorMsg("Fail to publish partition " + partitionCommitInfo.getPhysicalPartitionId()
|
||||
+ " error " + e.getMessage());
|
||||
// prevent excessive logging
|
||||
if (partitionCommitInfo.getVersionTime() < 0 &&
|
||||
Math.abs(partitionCommitInfo.getVersionTime()) + 10000 < System.currentTimeMillis()) {
|
||||
|
|
|
|||
Loading…
Reference in New Issue