[BugFix] Fix multi statement stream load due to invalid source type (backport #63044) (#63154)

Signed-off-by: meegoo <meegoo.sr@gmail.com>
Co-authored-by: meegoo <meegoo.sr@gmail.com>
This commit is contained in:
mergify[bot] 2025-09-16 11:24:43 -07:00 committed by GitHub
parent d675248863
commit 0d8ffc659e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 21 additions and 5 deletions

View File

@ -38,7 +38,7 @@ public class TxnInfoHelper {
}
infoPB.setGtid(state.getGlobalTransactionId());
// set load ids
if (state.getLoadIds() != null) {
if (state.getLoadIds() != null && state.getSourceType() == TransactionState.LoadJobSourceType.INSERT_STREAMING) {
infoPB.setLoadIds(state.getLoadIds().stream()
.map(tUniqueId -> {
PUniqueId pUniqueId = new PUniqueId();

View File

@ -159,7 +159,8 @@ public class StreamLoadMultiStmtTask extends AbstractStreamLoadTask {
}
public void beginTxn(TransactionResult resp) {
TransactionStmtExecutor.beginStmt(context, new BeginStmt(NodePosition.ZERO));
TransactionStmtExecutor.beginStmt(context, new BeginStmt(NodePosition.ZERO),
TransactionState.LoadJobSourceType.MULTI_STATEMENT_STREAMING);
this.txnId = context.getTxnId();
LOG.info("start transaction id {}", txnId);
}

View File

@ -15,6 +15,7 @@
package com.starrocks.transaction;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.starrocks.sql.ast.DmlStmt;
import com.starrocks.thrift.TUniqueId;
@ -126,5 +127,12 @@ public class ExplicitTxnState {
public void setDmlStmt(DmlStmt dmlStmt) {
this.dmlStmt = dmlStmt;
}
@Override
public String toString() {
Gson gson = new Gson();
return gson.toJson(this);
}
}
}

View File

@ -51,6 +51,7 @@ import com.starrocks.common.Config;
import com.starrocks.common.StarRocksException;
import com.starrocks.common.TraceManager;
import com.starrocks.common.io.Writable;
import com.starrocks.common.util.DebugUtil;
import com.starrocks.metric.MetricRepo;
import com.starrocks.persist.gson.GsonPreProcessable;
import com.starrocks.server.GlobalStateMgr;
@ -895,6 +896,8 @@ public class TransactionState implements Writable, GsonPreProcessable {
sb.append(", label: ").append(label);
sb.append(", db id: ").append(dbId);
sb.append(", table id list: ").append(StringUtils.join(tableIdList, ","));
sb.append(", load id list: ").append(loadIds != null ?
loadIds.stream().map(DebugUtil::printId).collect(java.util.stream.Collectors.joining(",")) : "null");
sb.append(", callback id: ").append(getCallbackId());
sb.append(", coordinator: ").append(txnCoordinator.toString());
sb.append(", transaction status: ").append(transactionStatus);

View File

@ -78,6 +78,10 @@ public class TransactionStmtExecutor {
private static final Logger LOG = LogManager.getLogger(TransactionStmtExecutor.class);
public static void beginStmt(ConnectContext context, BeginStmt stmt) {
beginStmt(context, stmt, TransactionState.LoadJobSourceType.INSERT_STREAMING);
}
public static void beginStmt(ConnectContext context, BeginStmt stmt, TransactionState.LoadJobSourceType sourceType) {
GlobalTransactionMgr globalTransactionMgr = GlobalStateMgr.getCurrentState().getGlobalTransactionMgr();
if (context.getTxnId() != 0) {
//Repeated begin does not create a new transaction
@ -92,7 +96,7 @@ public class TransactionStmtExecutor {
.getTransactionIDGenerator().getNextTransactionId();
String label = DebugUtil.printId(context.getExecutionId());
TransactionState transactionState = new TransactionState(transactionId, label, null,
TransactionState.LoadJobSourceType.INSERT_STREAMING,
sourceType,
new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()),
context.getExecTimeout() * 1000L);

View File

@ -38,7 +38,7 @@ public class TxnInfoHelperTest {
Mockito.when(state.getTransactionId()).thenReturn(42L);
Mockito.when(state.isUseCombinedTxnLog()).thenReturn(true);
Mockito.when(state.getCommitTime()).thenReturn(5000L); // ms
Mockito.when(state.getSourceType()).thenReturn(TransactionState.LoadJobSourceType.LAKE_COMPACTION);
Mockito.when(state.getSourceType()).thenReturn(TransactionState.LoadJobSourceType.INSERT_STREAMING);
CompactionTxnCommitAttachment attachment = Mockito.mock(CompactionTxnCommitAttachment.class);
Mockito.when(attachment.getForceCommit()).thenReturn(true);
@ -56,7 +56,7 @@ public class TxnInfoHelperTest {
assertEquals(42L, info.txnId.longValue());
assertTrue(info.combinedTxnLog);
assertEquals(5L, info.commitTime.longValue()); // seconds
assertTrue(info.forcePublish);
assertFalse(info.forcePublish);
// Compare gtid as long
assertEquals(1001L, info.getGtid().longValue());