[Enhancement] Add transaction error message to loads internal table (backport #61364) (#62851)

Signed-off-by: meegoo <meegoo.sr@gmail.com>
Co-authored-by: meegoo <meegoo.sr@gmail.com>
This commit is contained in:
mergify[bot] 2025-09-09 04:31:21 +00:00 committed by GitHub
parent 1cd78def58
commit 183bae2ff1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 40 additions and 2 deletions

View File

@ -1221,7 +1221,7 @@ public class Config extends ConfigBase {
* Online optimize table allows to optimize a table without blocking write operations.
*/
@ConfField(mutable = true)
public static boolean enable_online_optimize_table = true;
public static boolean enable_online_optimize_table = false;
/**
* If set to true, FE will check backend available capacity by storage medium when create table

View File

@ -29,6 +29,7 @@ public class LoadConstants {
public static final String RUNTIME_DETAILS_PLAN_TIME_MS = "plan_time_ms";
public static final String RUNTIME_DETAILS_RECEIVE_DATA_TIME_MS = "receive_data_time_ms";
public static final String RUNTIME_DETAILS_BEGIN_TXN_TIME_MS = "begin_txn_time_ms";
public static final String RUNTIME_DETAILS_TXN_ERROR_MSG = "txn_error_msg";
public static final String PROPERTIES_TIMEOUT = "timeout";
public static final String PROPERTIES_MAX_FILTER_RATIO = "max_filter_ratio";

View File

@ -927,6 +927,16 @@ public abstract class LoadJob extends AbstractTxnStateChangeCallback implements
}
runtimeDetails.put(LoadConstants.RUNTIME_DETAILS_LOAD_ID, Joiner.on(", ").join(loadIds));
runtimeDetails.put(LoadConstants.RUNTIME_DETAILS_TXN_ID, transactionId);
TransactionState txnState = GlobalStateMgr.getCurrentState()
.getGlobalTransactionMgr().getTransactionState(dbId, transactionId);
if (txnState != null) {
txnState.writeLock();
try {
runtimeDetails.put(LoadConstants.RUNTIME_DETAILS_TXN_ERROR_MSG, txnState.getErrMsg());
} finally {
txnState.writeUnlock();
}
}
runtimeDetails.putAll(loadingStatus.getLoadStatistic().toRuntimeDetails());
Gson gson = new Gson();
return gson.toJson(runtimeDetails);

View File

@ -1534,6 +1534,15 @@ public class StreamLoadTask extends AbstractTxnStateChangeCallback
runtimeDetails.put(LoadConstants.RUNTIME_DETAILS_BEGIN_TXN_TIME_MS, beginTxnTimeMs);
runtimeDetails.put(LoadConstants.RUNTIME_DETAILS_RECEIVE_DATA_TIME_MS, receiveDataTimeMs);
runtimeDetails.put(LoadConstants.RUNTIME_DETAILS_PLAN_TIME_MS, planTimeMs);
TransactionState txnState = GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().getTransactionState(dbId, txnId);
if (txnState != null) {
txnState.writeLock();
try {
runtimeDetails.put(LoadConstants.RUNTIME_DETAILS_TXN_ERROR_MSG, txnState.getErrMsg());
} finally {
txnState.writeUnlock();
}
}
Gson gson = new Gson();
return gson.toJson(runtimeDetails);
}

View File

@ -613,6 +613,15 @@ public class PublishVersionDaemon extends FrontendDaemon {
stateBatch.putBeTablets(partitionId, nodeToTablets);
}
} catch (Exception e) {
for (int i = 0; i < transactionStates.size(); i++) {
TransactionState txnState = transactionStates.get(i);
txnState.writeLock();
try {
txnState.setErrorMsg("Fail to publish partition " + partitionId + " error " + e.getMessage());
} finally {
txnState.writeUnlock();
}
}
LOG.error("Fail to publish partition {} of txnIds {}:", partitionId,
txnInfos.stream().map(i -> i.txnId).collect(Collectors.toList()), e);
return false;
@ -892,6 +901,13 @@ public class PublishVersionDaemon extends FrontendDaemon {
}
return true;
} catch (Throwable e) {
txnState.writeLock();
try {
txnState.setErrorMsg("Fail to publish partition " + partitionCommitInfo.getPhysicalPartitionId()
+ " error " + e.getMessage());
} finally {
txnState.writeUnlock();
}
// prevent excessive logging
if (partitionCommitInfo.getVersionTime() < 0 &&
Math.abs(partitionCommitInfo.getVersionTime()) + 10000 < System.currentTimeMillis()) {

View File

@ -15,6 +15,7 @@
package com.starrocks.alter;
import com.starrocks.catalog.OlapTable;
import com.starrocks.common.Config;
import com.starrocks.common.StarRocksException;
import com.starrocks.sql.ast.KeysDesc;
import com.starrocks.sql.ast.OptimizeClause;
@ -86,6 +87,7 @@ public class OptimizeJobV2BuilderTest {
@Test
public void testBuildWithoutOptimizeClause() throws StarRocksException {
Config.enable_online_optimize_table = true;
// Create a mock OlapTable
OlapTable table = Mockito.mock(OlapTable.class);
Mockito.when(table.getId()).thenReturn(123L);
@ -108,4 +110,4 @@ public class OptimizeJobV2BuilderTest {
Assertions.assertEquals(123L, onlineOptimizeJob.getTableId());
Assertions.assertEquals("myTable", onlineOptimizeJob.getTableName());
}
}
}