[Feature] Support multi statement transaction (part1) - stream load (backport #61362) (#61917)

Signed-off-by: meegoo <meegoo.sr@gmail.com>
Co-authored-by: meegoo <meegoo.sr@gmail.com>
This commit is contained in:
mergify[bot] 2025-09-09 00:33:25 -07:00 committed by GitHub
parent 573454f4f8
commit 3be4653b77
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
32 changed files with 2163 additions and 217 deletions

View File

@ -676,7 +676,8 @@ Status FragmentExecutor::_prepare_stream_load_pipe(ExecEnv* exec_env, const Unif
delete ctx;
}
});
RETURN_IF_ERROR(exec_env->stream_context_mgr()->put_channel_context(label, channel_id, ctx));
RETURN_IF_ERROR(
exec_env->stream_context_mgr()->put_channel_context(label, table_name, channel_id, ctx));
}
stream_load_contexts.push_back(ctx);
}

View File

@ -236,12 +236,17 @@ int TransactionStreamLoadAction::on_header(HttpRequest* req) {
return -1;
}
const auto& table_name = req->header(HTTP_TABLE_KEY);
StreamLoadContext* ctx = nullptr;
if (!req->header(HTTP_CHANNEL_ID).empty()) {
int channel_id = std::stoi(req->header(HTTP_CHANNEL_ID));
ctx = _exec_env->stream_context_mgr()->get_channel_context(label, channel_id);
ctx = _exec_env->stream_context_mgr()->get_channel_context(label, table_name, channel_id);
} else {
ctx = _exec_env->stream_context_mgr()->get(label);
if (ctx == nullptr) {
ctx = _exec_env->stream_context_mgr()->get_channel_context(label, table_name, 0);
}
}
if (ctx == nullptr) {
_send_error_reply(req, Status::TransactionNotExists(fmt::format("Transaction with label {} not exists",

View File

@ -119,34 +119,45 @@ public:
return Status::OK();
}
Status put_channel_context(const string& label, int channel_id, StreamLoadContext* ctx) {
Status put_channel_context(const string& label, const string& table_name, int channel_id, StreamLoadContext* ctx) {
std::lock_guard<std::mutex> l(_lock);
auto it = _channel_stream_map.find(label);
if (it == std::end(_channel_stream_map)) {
std::unordered_map<int, StreamLoadContext*> empty_map;
std::unordered_map<std::string, std::unordered_map<int, StreamLoadContext*>> empty_map;
it = _channel_stream_map.emplace(label, std::move(empty_map)).first;
}
auto& label_channel_map = it->second;
auto it2 = label_channel_map.find(channel_id);
if (it2 != std::end(label_channel_map)) {
auto& label_map = it->second;
auto it_table = label_map.find(table_name);
if (it_table == std::end(label_map)) {
std::unordered_map<int, StreamLoadContext*> empty_channel_map;
it_table = label_map.emplace(table_name, std::move(empty_channel_map)).first;
}
auto& channel_map = it_table->second;
auto it2 = channel_map.find(channel_id);
if (it2 != std::end(channel_map)) {
return Status::InternalError("channel id " + std::to_string(channel_id) + " for label " + label +
" already exist");
}
ctx->ref();
label_channel_map.emplace(channel_id, ctx);
channel_map.emplace(channel_id, ctx);
LOG(INFO) << "stream load: " << label << ", channel_id: " << std::to_string(channel_id) << " start pipe";
return Status::OK();
}
StreamLoadContext* get_channel_context(const string& label, int channel_id) {
StreamLoadContext* get_channel_context(const string& label, const string& table_name, int channel_id) {
std::lock_guard<std::mutex> l(_lock);
auto it = _channel_stream_map.find(label);
if (it == std::end(_channel_stream_map)) {
return nullptr;
}
auto& label_channel_map = it->second;
auto it2 = label_channel_map.find(channel_id);
if (it2 == std::end(label_channel_map)) {
auto& label_map = it->second;
auto it_table = label_map.find(table_name);
if (it_table == std::end(label_map)) {
return nullptr;
}
auto& channel_map = it_table->second;
auto it2 = channel_map.find(channel_id);
if (it2 == std::end(channel_map)) {
return nullptr;
}
auto stream = it2->second;
@ -156,50 +167,63 @@ public:
void remove_channel_context(StreamLoadContext* ctx) {
const string& label = ctx->label;
const string& table_name = ctx->table;
int channel_id = ctx->channel_id;
std::lock_guard<std::mutex> l(_lock);
auto it = _channel_stream_map.find(label);
if (it != std::end(_channel_stream_map)) {
auto& label_channel_map = it->second;
auto it2 = label_channel_map.find(channel_id);
if (it2 != std::end(label_channel_map)) {
if (it2->second->unref()) {
delete it2->second;
auto& label_map = it->second;
auto it_table = label_map.find(table_name);
if (it_table != std::end(label_map)) {
auto& channel_map = it_table->second;
auto it2 = channel_map.find(channel_id);
if (it2 != std::end(channel_map)) {
if (it2->second->unref()) {
delete it2->second;
}
channel_map.erase(it2);
VLOG(3) << "remove channel stream load context: " << label << ", table: " << table_name
<< ", channel_id: " << std::to_string(channel_id);
}
if (channel_map.size() == 0) {
label_map.erase(it_table);
}
label_channel_map.erase(it2);
VLOG(3) << "remove channel stream load context: " << label
<< ", channel_id: " << std::to_string(channel_id);
}
if (label_channel_map.size() == 0) {
if (label_map.size() == 0) {
_channel_stream_map.erase(it);
}
}
};
Status finish_body_sink(const string& label, int channel_id) {
Status finish_body_sink(const string& label, const string& table_name, int channel_id) {
std::lock_guard<std::mutex> l(_lock);
auto it = _channel_stream_map.find(label);
if (it != std::end(_channel_stream_map)) {
auto& label_channel_map = it->second;
auto it2 = label_channel_map.find(channel_id);
if (it2 != std::end(label_channel_map)) {
if (it2->second->body_sink != nullptr) {
StreamLoadContext* ctx = it2->second;
// 1. finish stream pipe & wait it done
if (ctx->buffer != nullptr && ctx->buffer->pos > 0) {
ctx->buffer->flip();
RETURN_IF_ERROR(ctx->body_sink->append(std::move(ctx->buffer)));
ctx->buffer = nullptr;
auto& label_map = it->second;
auto it_table = label_map.find(table_name);
if (it_table != std::end(label_map)) {
auto& channel_map = it_table->second;
auto it2 = channel_map.find(channel_id);
if (it2 != std::end(channel_map)) {
if (it2->second->body_sink != nullptr) {
StreamLoadContext* ctx = it2->second;
// 1. finish stream pipe & wait it done
if (ctx->buffer != nullptr && ctx->buffer->pos > 0) {
ctx->buffer->flip();
RETURN_IF_ERROR(ctx->body_sink->append(std::move(ctx->buffer)));
ctx->buffer = nullptr;
}
RETURN_IF_ERROR(ctx->body_sink->finish());
} else {
std::string error_msg =
fmt::format("stream load {} table {} channel_id {}'s pipe doesn't exist", label,
table_name, std::to_string(channel_id));
LOG(WARNING) << error_msg;
return Status::InternalError(error_msg);
}
RETURN_IF_ERROR(ctx->body_sink->finish());
} else {
std::string error_msg = fmt::format("stream load {} channel_id {}'s pipe doesn't exist", label,
std::to_string(channel_id));
LOG(WARNING) << error_msg;
return Status::InternalError(error_msg);
LOG(INFO) << "stream load: " << label << ", table: " << table_name
<< ", channel_id: " << std::to_string(channel_id) << " finish pipe";
}
LOG(INFO) << "stream load: " << label << ", channel_id: " << std::to_string(channel_id)
<< " finish pipe";
}
}
return Status::OK();
@ -208,7 +232,8 @@ public:
private:
std::mutex _lock;
std::unordered_map<std::string, StreamLoadContext*> _stream_map;
std::unordered_map<std::string, std::unordered_map<int, StreamLoadContext*>> _channel_stream_map;
std::unordered_map<std::string, std::unordered_map<std::string, std::unordered_map<int, StreamLoadContext*>>>
_channel_stream_map;
};
class TransactionMgr {

View File

@ -135,10 +135,11 @@ void BackendServiceBase::submit_routine_load_task(TStatus& t_status, const std::
}
void BackendServiceBase::finish_stream_load_channel(TStatus& t_status, const TStreamLoadChannel& stream_load_channel) {
Status st = _exec_env->stream_context_mgr()->finish_body_sink(stream_load_channel.label,
stream_load_channel.channel_id);
Status st = _exec_env->stream_context_mgr()->finish_body_sink(
stream_load_channel.label, stream_load_channel.table_name, stream_load_channel.channel_id);
if (!st.ok()) {
LOG(WARNING) << "failed to finish stream load channel. label: " << stream_load_channel.label
<< " table name: " << stream_load_channel.table_name
<< " channel id: " << stream_load_channel.channel_id;
return st.to_thrift(&t_status);
}

View File

@ -16,8 +16,8 @@
package com.starrocks.common.proc;
import com.starrocks.common.AnalysisException;
import com.starrocks.load.streamload.AbstractStreamLoadTask;
import com.starrocks.load.streamload.StreamLoadMgr;
import com.starrocks.load.streamload.StreamLoadTask;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.ast.ShowStreamLoadStmt;
@ -41,14 +41,16 @@ public class StreamLoadsLabelProcDir implements ProcNodeInterface {
@Override
public ProcResult fetchResult() throws AnalysisException {
StreamLoadMgr streamLoadManager = GlobalStateMgr.getCurrentState().getStreamLoadMgr();
List<StreamLoadTask> streamLoadTaskList = streamLoadManager.getTaskByName(label);
List<AbstractStreamLoadTask> streamLoadTaskList = streamLoadManager.getTaskByName(label);
if (streamLoadTaskList.isEmpty()) {
throw new AnalysisException("stream load label[" + label + "] does not exist");
}
BaseProcResult baseProcResult = new BaseProcResult();
baseProcResult.setNames(ShowStreamLoadStmt.getTitleNames());
for (StreamLoadTask streamLoadTask : streamLoadTaskList) {
baseProcResult.addRow(streamLoadTask.getShowInfo());
for (AbstractStreamLoadTask streamLoadTask : streamLoadTaskList) {
for (List<String> row : streamLoadTask.getShowInfo()) {
baseProcResult.addRow(row);
}
}
return baseProcResult;
}

View File

@ -19,8 +19,8 @@ import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.starrocks.common.AnalysisException;
import com.starrocks.common.MetaNotFoundException;
import com.starrocks.load.streamload.AbstractStreamLoadTask;
import com.starrocks.load.streamload.StreamLoadMgr;
import com.starrocks.load.streamload.StreamLoadTask;
import com.starrocks.server.GlobalStateMgr;
import java.util.List;
@ -62,9 +62,11 @@ public class StreamLoadsProcDir implements ProcDirInterface {
baseProcResult.setNames(TITLE_NAMES);
StreamLoadMgr streamLoadManager = GlobalStateMgr.getCurrentState().getStreamLoadMgr();
try {
List<StreamLoadTask> streamLoadTaskList = streamLoadManager.getTask(null, null, true);
for (StreamLoadTask streamLoadTask : streamLoadTaskList) {
baseProcResult.addRow(streamLoadTask.getShowBriefInfo());
List<AbstractStreamLoadTask> streamLoadTaskList = streamLoadManager.getTask(null, null, true);
for (AbstractStreamLoadTask streamLoadTask : streamLoadTaskList) {
for (List<String> row : streamLoadTask.getShowBriefInfo()) {
baseProcResult.addRow(row);
}
}
} catch (MetaNotFoundException e) {
throw new AnalysisException("failed to get all of stream load task");

View File

@ -50,6 +50,7 @@ import com.starrocks.http.BaseResponse;
import com.starrocks.http.HttpMetricRegistry;
import com.starrocks.http.IllegalArgException;
import com.starrocks.http.rest.transaction.BypassWriteTransactionHandler;
import com.starrocks.http.rest.transaction.MultiStatementTransactionHandler;
import com.starrocks.http.rest.transaction.TransactionOperation;
import com.starrocks.http.rest.transaction.TransactionOperationHandler;
import com.starrocks.http.rest.transaction.TransactionOperationHandler.ResultWrapper;
@ -109,6 +110,9 @@ public class TransactionLoadAction extends RestBaseAction {
private static final String CHANNEL_NUM_STR = "channel_num";
private static final String CHANNEL_ID_STR = "channel_id";
private static final String SOURCE_TYPE = "source_type";
private static final String TRANSACTION_TYPE = "transaction_type";
private static final String MULTI_STATEMENTS_TRANSACTION_TYPE = "multi";
private static TransactionLoadAction ac;
@ -278,6 +282,9 @@ public class TransactionLoadAction extends RestBaseAction {
}
LoadJobSourceType sourceType = params.getSourceType();
if (null != sourceType && sourceType.equals(LoadJobSourceType.MULTI_STATEMENT_STREAMING)) {
return new MultiStatementTransactionHandler(params);
}
// There can be several cases where sourceType is not specified (null) in the request:
// 1. The operation is BEGIN or LOAD. This is only allowed for transaction stream load for backward compatibility.
// 2. The operation is COMMIT, PREPARE, or ROLLBACK. It can be transaction stream load or bypass writer. Need to
@ -346,6 +353,14 @@ public class TransactionLoadAction extends RestBaseAction {
.map(sec -> sec * 1000L)
.orElse(-1L);
LoadJobSourceType sourceType = parseSourceType(request.getSingleParameter(SOURCE_TYPE));
if (sourceType == null) {
sourceType = parseSourceType(request.getRequest().headers().get(SOURCE_TYPE));
}
String transactionType = request.getRequest().headers().get(TRANSACTION_TYPE);
if (transactionType != null && transactionType.equalsIgnoreCase(MULTI_STATEMENTS_TRANSACTION_TYPE)) {
sourceType = LoadJobSourceType.MULTI_STATEMENT_STREAMING;
}
Integer channelId = Optional
.ofNullable(request.getRequest().headers().get(CHANNEL_ID_STR))

View File

@ -0,0 +1,81 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.http.rest.transaction;
import com.starrocks.common.StarRocksException;
import com.starrocks.http.BaseRequest;
import com.starrocks.http.BaseResponse;
import com.starrocks.http.rest.TransactionResult;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.thrift.TNetworkAddress;
import com.starrocks.warehouse.Warehouse;
import com.starrocks.warehouse.cngroup.ComputeResource;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
/**
*
*/
public class MultiStatementTransactionHandler implements TransactionOperationHandler {
private static final Logger LOG = LogManager.getLogger(MultiStatementTransactionHandler.class);
private final TransactionOperationParams txnOperationParams;
public MultiStatementTransactionHandler(TransactionOperationParams txnOperationParams) {
this.txnOperationParams = txnOperationParams;
}
@Override
public ResultWrapper handle(BaseRequest request, BaseResponse response) throws StarRocksException {
TransactionOperation txnOperation = txnOperationParams.getTxnOperation();
String dbName = txnOperationParams.getDbName();
String tableName = txnOperationParams.getTableName();
Long timeoutMillis = txnOperationParams.getTimeoutMillis();
String label = txnOperationParams.getLabel();
LOG.info("Handle multi statement stream load transaction label: {} db: {} table: {} op: {}",
label, dbName, tableName, txnOperation.getValue());
TransactionResult result = new TransactionResult();
switch (txnOperation) {
case TXN_BEGIN:
String warehouseName = txnOperationParams.getWarehouseName();
Warehouse warehouse =
GlobalStateMgr.getCurrentState().getWarehouseMgr().getWarehouse(warehouseName);
ComputeResource computeResource =
GlobalStateMgr.getCurrentState().getWarehouseMgr().acquireComputeResource(warehouse.getId());
GlobalStateMgr.getCurrentState().getStreamLoadMgr().beginMultiStatementLoadTask(
dbName, label, "", "", timeoutMillis, result, computeResource);
return new ResultWrapper(result);
case TXN_COMMIT:
GlobalStateMgr.getCurrentState().getStreamLoadMgr().commitLoadTask(
label, request.getRequest().headers(), result);
return new ResultWrapper(result);
case TXN_ROLLBACK:
GlobalStateMgr.getCurrentState().getStreamLoadMgr().rollbackLoadTask(label, result);
return new ResultWrapper(result);
case TXN_LOAD:
TNetworkAddress redirectAddr = GlobalStateMgr.getCurrentState()
.getStreamLoadMgr().executeLoadTask(
label, 0, request.getRequest().headers(), result, dbName, tableName);
if (!result.stateOK() || result.containMsg()) {
return new ResultWrapper(result);
}
return new ResultWrapper(redirectAddr);
default:
throw new StarRocksException("Unsupported operation: " + txnOperation);
}
}
}

View File

@ -70,7 +70,7 @@ public class TransactionWithChannelHandler implements TransactionOperationHandle
return new ResultWrapper(result);
case TXN_PREPARE:
GlobalStateMgr.getCurrentState().getStreamLoadMgr().prepareLoadTask(
label, channel.getId(), request.getRequest().headers(), result);
label, tableName, channel.getId(), request.getRequest().headers(), result);
if (!result.stateOK() || result.containMsg()) {
return new ResultWrapper(result);
}
@ -78,7 +78,8 @@ public class TransactionWithChannelHandler implements TransactionOperationHandle
txnOperationParams.getPreparedTimeoutMillis(), result);
return new ResultWrapper(result);
case TXN_COMMIT:
GlobalStateMgr.getCurrentState().getStreamLoadMgr().commitLoadTask(label, result);
GlobalStateMgr.getCurrentState().getStreamLoadMgr().commitLoadTask(
label, request.getRequest().headers(), result);
return new ResultWrapper(result);
case TXN_ROLLBACK:
GlobalStateMgr.getCurrentState().getStreamLoadMgr().rollbackLoadTask(label, result);

View File

@ -242,7 +242,7 @@ public class KafkaTaskInfo extends RoutineLoadTaskInfo {
// plan for each task, in case table has change(rollup or schema change)
TExecPlanFragmentParams tExecPlanFragmentParams = routineLoadJob.plan(loadId, txnId, label);
if (tExecPlanFragmentParams.query_options.enable_profile) {
StreamLoadTask streamLoadTask = GlobalStateMgr.getCurrentState().
StreamLoadTask streamLoadTask = (StreamLoadTask) GlobalStateMgr.getCurrentState().
getStreamLoadMgr().getTaskByLabel(label);
setStreamLoadTask(streamLoadTask);
}

View File

@ -0,0 +1,64 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
package com.starrocks.load.streamload;
import com.starrocks.common.StarRocksException;
import com.starrocks.common.io.Writable;
import com.starrocks.http.rest.TransactionResult;
import com.starrocks.persist.gson.GsonPostProcessable;
import com.starrocks.persist.gson.GsonPreProcessable;
import com.starrocks.thrift.TLoadInfo;
import com.starrocks.thrift.TNetworkAddress;
import com.starrocks.thrift.TStreamLoadInfo;
import com.starrocks.thrift.TUniqueId;
import com.starrocks.transaction.AbstractTxnStateChangeCallback;
import com.starrocks.warehouse.LoadJobWithWarehouse;
import io.netty.handler.codec.http.HttpHeaders;
import java.util.List;
/**
* Abstract base class for stream load tasks
*/
public abstract class AbstractStreamLoadTask extends AbstractTxnStateChangeCallback
implements Writable, GsonPostProcessable, GsonPreProcessable, LoadJobWithWarehouse {
public abstract void beginTxnFromFrontend(TransactionResult resp);
public abstract void beginTxnFromFrontend(int channelId, int channelNum, TransactionResult resp);
public abstract void beginTxnFromBackend(TUniqueId requestId, String clientIp, long backendId, TransactionResult resp);
public abstract TNetworkAddress tryLoad(int channelId, String tableName, TransactionResult resp) throws StarRocksException;
public abstract TNetworkAddress executeTask(int channelId, String tableName, HttpHeaders headers, TransactionResult resp);
public abstract void prepareChannel(int channelId, String tableName, HttpHeaders headers, TransactionResult resp);
public abstract void waitCoordFinishAndPrepareTxn(long preparedTimeoutMs, TransactionResult resp);
public abstract void commitTxn(HttpHeaders headers, TransactionResult resp) throws StarRocksException;
public abstract void manualCancelTask(TransactionResult resp) throws StarRocksException;
public abstract boolean checkNeedRemove(long currentMs, boolean isForce);
public abstract boolean checkNeedPrepareTxn();
public abstract boolean isDurableLoadState();
public abstract void cancelAfterRestart();
public abstract List<TStreamLoadInfo> toStreamLoadThrift();
public abstract List<TLoadInfo> toThrift();
public abstract void init();
// Common getters used by StreamLoadMgr
public abstract long getId();
public abstract String getLabel();
public abstract String getDBName();
public abstract long getDBId();
public abstract long getTxnId();
public abstract String getTableName();
public abstract String getStateName();
public abstract boolean isFinalState();
public abstract long createTimeMs();
public abstract long endTimeMs();
public abstract long getFinishTimestampMs();
public abstract List<List<String>> getShowInfo();
public abstract List<List<String>> getShowBriefInfo();
public abstract String getStringByType();
}

View File

@ -26,16 +26,16 @@ import org.apache.logging.log4j.Logger;
/**
* Provide the predicate chain and comparator chain
* which would be used in `List<StreamLoadTask>.stream().filter(predicateChain).sorted(comparatorChain).skip().limit()`
* which would be used in `List<AbstractStreamLoadTask>.stream().filter(predicateChain).sorted(comparatorChain).skip().limit()`
* with a group of pre-defined ColumnValueSuppliers.
*/
public class StreamLoadFunctionalExprProvider extends FunctionalExprProvider<StreamLoadTask> {
public class StreamLoadFunctionalExprProvider extends FunctionalExprProvider<AbstractStreamLoadTask> {
private static final Logger LOG = LogManager.getLogger(StreamLoadFunctionalExprProvider.class);
private static final ColumnValueSupplier<StreamLoadTask> TASK_NAME_SUPPLIER =
new ColumnValueSupplier<StreamLoadTask>() {
private static final ColumnValueSupplier<AbstractStreamLoadTask> TASK_NAME_SUPPLIER =
new ColumnValueSupplier<AbstractStreamLoadTask>() {
@Override
public String getColumnName() {
return "Label";
@ -48,12 +48,12 @@ public class StreamLoadFunctionalExprProvider extends FunctionalExprProvider<Str
@Override
@SuppressWarnings("unchecked")
public String getColumnValue(StreamLoadTask task) {
public String getColumnValue(AbstractStreamLoadTask task) {
return task.getLabel();
}
};
private static final ColumnValueSupplier<StreamLoadTask> TASK_ID_SUPPLIER =
new ColumnValueSupplier<StreamLoadTask>() {
private static final ColumnValueSupplier<AbstractStreamLoadTask> TASK_ID_SUPPLIER =
new ColumnValueSupplier<AbstractStreamLoadTask>() {
@Override
public String getColumnName() {
return "Id";
@ -66,12 +66,12 @@ public class StreamLoadFunctionalExprProvider extends FunctionalExprProvider<Str
@Override
@SuppressWarnings("unchecked")
public Long getColumnValue(StreamLoadTask task) {
public Long getColumnValue(AbstractStreamLoadTask task) {
return task.getId();
}
};
private static final ColumnValueSupplier<StreamLoadTask> TASK_CREATE_TIME_SUPPLIER =
new ColumnValueSupplier<StreamLoadTask>() {
private static final ColumnValueSupplier<AbstractStreamLoadTask> TASK_CREATE_TIME_SUPPLIER =
new ColumnValueSupplier<AbstractStreamLoadTask>() {
@Override
public String getColumnName() {
return "CreateTimeMs";
@ -84,12 +84,12 @@ public class StreamLoadFunctionalExprProvider extends FunctionalExprProvider<Str
@Override
@SuppressWarnings("unchecked")
public Long getColumnValue(StreamLoadTask task) {
public Long getColumnValue(AbstractStreamLoadTask task) {
return task.createTimeMs() / 1000 * 1000;
}
};
private static final ColumnValueSupplier<StreamLoadTask> TASK_DB_NAME_SUPPLIER =
new ColumnValueSupplier<StreamLoadTask>() {
private static final ColumnValueSupplier<AbstractStreamLoadTask> TASK_DB_NAME_SUPPLIER =
new ColumnValueSupplier<AbstractStreamLoadTask>() {
@Override
public String getColumnName() {
return "DbName";
@ -102,12 +102,12 @@ public class StreamLoadFunctionalExprProvider extends FunctionalExprProvider<Str
@Override
@SuppressWarnings("unchecked")
public String getColumnValue(StreamLoadTask task) {
public String getColumnValue(AbstractStreamLoadTask task) {
return task.getDBName();
}
};
private static final ColumnValueSupplier<StreamLoadTask> TASK_TABLE_NAME_SUPPLIER =
new ColumnValueSupplier<StreamLoadTask>() {
private static final ColumnValueSupplier<AbstractStreamLoadTask> TASK_TABLE_NAME_SUPPLIER =
new ColumnValueSupplier<AbstractStreamLoadTask>() {
@Override
public String getColumnName() {
return "TableName";
@ -120,12 +120,12 @@ public class StreamLoadFunctionalExprProvider extends FunctionalExprProvider<Str
@Override
@SuppressWarnings("unchecked")
public String getColumnValue(StreamLoadTask task) {
public String getColumnValue(AbstractStreamLoadTask task) {
return task.getTableName();
}
};
private static final ColumnValueSupplier<StreamLoadTask> TASK_STATE_SUPPLIER =
new ColumnValueSupplier<StreamLoadTask>() {
private static final ColumnValueSupplier<AbstractStreamLoadTask> TASK_STATE_SUPPLIER =
new ColumnValueSupplier<AbstractStreamLoadTask>() {
@Override
public String getColumnName() {
return "State";
@ -138,13 +138,13 @@ public class StreamLoadFunctionalExprProvider extends FunctionalExprProvider<Str
@Override
@SuppressWarnings("unchecked")
public String getColumnValue(StreamLoadTask task) {
public String getColumnValue(AbstractStreamLoadTask task) {
return task.getStateName();
}
};
private static final ColumnValueSupplier<StreamLoadTask> TASK_TYPE_SUPPLIER =
new ColumnValueSupplier<StreamLoadTask>() {
private static final ColumnValueSupplier<AbstractStreamLoadTask> TASK_TYPE_SUPPLIER =
new ColumnValueSupplier<AbstractStreamLoadTask>() {
@Override
public String getColumnName() {
return "Type";
@ -157,15 +157,15 @@ public class StreamLoadFunctionalExprProvider extends FunctionalExprProvider<Str
@Override
@SuppressWarnings("unchecked")
public String getColumnValue(StreamLoadTask task) {
public String getColumnValue(AbstractStreamLoadTask task) {
return task.getStringByType();
}
};
@Override
protected ImmutableList<ColumnValueSupplier<StreamLoadTask>> delegateWhereSuppliers() {
protected ImmutableList<ColumnValueSupplier<AbstractStreamLoadTask>> delegateWhereSuppliers() {
// return a group of ColumnValueSuppliers which are abled to be filtered and ordered.
return new ImmutableList.Builder<ColumnValueSupplier<StreamLoadTask>>()
return new ImmutableList.Builder<ColumnValueSupplier<AbstractStreamLoadTask>>()
.add(TASK_NAME_SUPPLIER)
.add(TASK_ID_SUPPLIER)
.add(TASK_CREATE_TIME_SUPPLIER)
@ -177,7 +177,7 @@ public class StreamLoadFunctionalExprProvider extends FunctionalExprProvider<Str
}
@Override
protected boolean delegatePostRowFilter(ConnectContext cxt, StreamLoadTask task) {
protected boolean delegatePostRowFilter(ConnectContext cxt, AbstractStreamLoadTask task) {
// validate table privilege at the end of a predicateChain in the `stream().filter()`
try {
Authorizer.checkTableAction(

View File

@ -64,14 +64,14 @@ public class StreamLoadMgr implements MemoryTrackable {
private static final Logger LOG = LogManager.getLogger(StreamLoadMgr.class);
private static final int MEMORY_JOB_SAMPLES = 10;
// label -> streamLoadTask
private Map<String, StreamLoadTask> idToStreamLoadTask;
// label -> AbstractStreamLoadTask (unified management)
private Map<String, AbstractStreamLoadTask> idToStreamLoadTask;
// Only used for sync stream load
// txnId -> streamLoadTask
// txnId -> StreamLoadTask (only StreamLoadTask can be sync)
private Map<Long, StreamLoadTask> txnIdToSyncStreamLoadTasks;
private Map<Long, Map<String, StreamLoadTask>> dbToLabelToStreamLoadTask;
private Map<Long, Map<String, AbstractStreamLoadTask>> dbToLabelToStreamLoadTask;
protected final WarehouseLoadInfoBuilder warehouseLoadStatusInfoBuilder =
new WarehouseLoadInfoBuilder();
@ -106,6 +106,51 @@ public class StreamLoadMgr implements MemoryTrackable {
lock = new ReentrantReadWriteLock(true);
}
public void beginMultiStatementLoadTask(String dbName, String label, String user,
String clientIp, long timeoutMillis,
TransactionResult resp, ComputeResource computeResource) throws StarRocksException {
AbstractStreamLoadTask task = null;
Database db = checkDbName(dbName);
long dbId = db.getId();
readLock();
try {
task = idToStreamLoadTask.get(label);
if (task != null) {
task.beginTxnFromFrontend(resp);
return;
}
} finally {
readUnlock();
}
boolean createTask = false;
writeLock();
try {
task = idToStreamLoadTask.get(label);
if (task != null) {
task.beginTxnFromFrontend(resp);
return;
}
task = createMultiStatementLoadTask(db, label, user, clientIp, timeoutMillis, computeResource);
addLoadTask(task);
LOG.info("create multi statment task {}", task);
task.beginTxnFromFrontend(resp);
createTask = true;
} finally {
writeUnlock();
}
if (createTask) {
GlobalStateMgr.getCurrentState().getEditLog().logCreateMultiStmtStreamLoadJob((StreamLoadMultiStmtTask) task);
LOG.info("create multi statement task success");
}
}
public void prepareMultiStatementLoadTask(String label, String tableName, HttpHeaders headers, TransactionResult resp)
throws StarRocksException {
}
public void beginLoadTaskFromFrontend(String dbName, String tableName, String label, String user,
String clientIp, long timeoutMillis, int channelNum,
int channelId, TransactionResult resp) throws StarRocksException {
@ -116,7 +161,7 @@ public class StreamLoadMgr implements MemoryTrackable {
public void beginLoadTaskFromFrontend(String dbName, String tableName, String label, String user,
String clientIp, long timeoutMillis, int channelNum, int channelId,
TransactionResult resp, ComputeResource computeResource) throws StarRocksException {
StreamLoadTask task = null;
AbstractStreamLoadTask task = null;
Database db = checkDbName(dbName);
long dbId = db.getId();
// if task is already created, return directly
@ -153,7 +198,7 @@ public class StreamLoadMgr implements MemoryTrackable {
writeUnlock();
}
if (createTask) {
GlobalStateMgr.getCurrentState().getEditLog().logCreateStreamLoadJob(task);
GlobalStateMgr.getCurrentState().getEditLog().logCreateStreamLoadJob((StreamLoadTask) task);
}
}
@ -163,7 +208,7 @@ public class StreamLoadMgr implements MemoryTrackable {
TransactionResult resp, boolean isRoutineLoad,
ComputeResource computeResource, long backendId)
throws StarRocksException {
StreamLoadTask task = null;
AbstractStreamLoadTask task = null;
Database db = checkDbName(dbName);
long dbId = db.getId();
Table table = checkMeta(db, tableName);
@ -182,6 +227,14 @@ public class StreamLoadMgr implements MemoryTrackable {
}
}
public StreamLoadMultiStmtTask createMultiStatementLoadTask(Database db, String label, String user, String clientIp,
long timeoutMillis, ComputeResource computeResource) {
long id = GlobalStateMgr.getCurrentState().getNextId();
StreamLoadMultiStmtTask streamLoadTask = new StreamLoadMultiStmtTask(id, db, label, user, clientIp,
timeoutMillis, System.currentTimeMillis(), computeResource);
return streamLoadTask;
}
public StreamLoadTask createLoadTaskWithoutLock(Database db, Table table, String label, String user, String clientIp,
long timeoutMillis, boolean isRoutineLoad, ComputeResource computeResource) {
// init stream load task
@ -224,7 +277,7 @@ public class StreamLoadMgr implements MemoryTrackable {
return table;
}
public void replayCreateLoadTask(StreamLoadTask loadJob) {
public void replayCreateLoadTask(AbstractStreamLoadTask loadJob) {
addLoadTask(loadJob);
LOG.info(new LogBuilder(LogKey.STREAM_LOAD_TASK, loadJob.getId())
.add("msg", "replay create load job")
@ -241,20 +294,15 @@ public class StreamLoadMgr implements MemoryTrackable {
}
// add load tasks and also add callback factory
public void addLoadTask(StreamLoadTask task) {
if (task.isSyncStreamLoad()) {
txnIdToSyncStreamLoadTasks.put(task.getTxnId(), task);
public void addLoadTask(AbstractStreamLoadTask task) {
if (task instanceof StreamLoadTask && ((StreamLoadTask) task).isSyncStreamLoad()) {
txnIdToSyncStreamLoadTasks.put(task.getTxnId(), (StreamLoadTask) task);
}
// Clear the stream load tasks manually
if (idToStreamLoadTask.size() > Config.stream_load_task_keep_max_num) {
// If enable_load_profile = true,
// most stream load tasks are generated through flink-cdc and routine load generally,
// so clearing the syncStreamLoadTask is preferred.
LOG.info("trigger cleanSyncStreamLoadTasks when add load task label:{}", task.getLabel());
cleanSyncStreamLoadTasks();
// The size of idToStreamLoadTask is still huge, indicates that the type of most tasks is PARALLEL,
// so clean all the streamLoadTasks manaully not waitting for Config.stream_load_task_keep_max_second.
if (idToStreamLoadTask.size() > Config.stream_load_task_keep_max_num / 2) {
LOG.info("trigger cleanOldStreamLoadTasks when add load task label{}", task.getLabel());
cleanOldStreamLoadTasks(true);
@ -263,7 +311,7 @@ public class StreamLoadMgr implements MemoryTrackable {
long dbId = task.getDBId();
String label = task.getLabel();
Map<String, StreamLoadTask> labelToStreamLoadTask = null;
Map<String, AbstractStreamLoadTask> labelToStreamLoadTask = null;
if (dbToLabelToStreamLoadTask.containsKey(dbId)) {
labelToStreamLoadTask = dbToLabelToStreamLoadTask.get(dbId);
} else {
@ -273,7 +321,6 @@ public class StreamLoadMgr implements MemoryTrackable {
labelToStreamLoadTask.put(label, task);
idToStreamLoadTask.put(label, task);
// add callback before txn created, because callback will be performed on replay without txn begin
// register txn state listener
GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().getCallbackFactory().addCallback(task);
}
@ -287,26 +334,21 @@ public class StreamLoadMgr implements MemoryTrackable {
if (!idToStreamLoadTask.containsKey(label)) {
throw new StarRocksException("stream load task " + label + " does not exist");
}
StreamLoadTask task = idToStreamLoadTask.get(label);
AbstractStreamLoadTask task = idToStreamLoadTask.get(label);
// check whether the database and table are consistent with the transaction,
// for single database and single table are supported so far
// check whether the database is consistent with the transaction
if (!task.getDBName().equals(dbName)) {
throw new StarRocksException(
String.format("Request table %s not equal transaction table %s", dbName, task.getDBName()));
}
if (!task.getTableName().equals(tableName)) {
throw new StarRocksException(
String.format("Request table %s not equal transaction table %s", tableName, task.getTableName()));
String.format("Request database %s not equal transaction database %s", dbName, task.getDBName()));
}
readUnlock();
needUnLock = false;
TNetworkAddress redirectAddress = task.tryLoad(channelId, resp);
TNetworkAddress redirectAddress = task.tryLoad(channelId, tableName, resp);
if (redirectAddress != null || !resp.stateOK() || resp.containMsg()) {
return redirectAddress;
}
return task.executeTask(channelId, headers, resp);
return task.executeTask(channelId, tableName, headers, resp);
} finally {
if (needUnLock) {
readUnlock();
@ -314,7 +356,7 @@ public class StreamLoadMgr implements MemoryTrackable {
}
}
public void prepareLoadTask(String label, int channelId, HttpHeaders headers, TransactionResult resp)
public void prepareLoadTask(String label, String tableName, int channelId, HttpHeaders headers, TransactionResult resp)
throws StarRocksException {
boolean needUnLock = true;
readLock();
@ -322,10 +364,10 @@ public class StreamLoadMgr implements MemoryTrackable {
if (!idToStreamLoadTask.containsKey(label)) {
throw new StarRocksException("stream load task " + label + " does not exist");
}
StreamLoadTask task = idToStreamLoadTask.get(label);
AbstractStreamLoadTask task = idToStreamLoadTask.get(label);
readUnlock();
needUnLock = false;
task.prepareChannel(channelId, headers, resp);
task.prepareChannel(channelId, tableName, headers, resp);
} finally {
if (needUnLock) {
readUnlock();
@ -341,7 +383,7 @@ public class StreamLoadMgr implements MemoryTrackable {
if (!idToStreamLoadTask.containsKey(label)) {
throw new StarRocksException("stream load task " + label + " does not exist");
}
StreamLoadTask task = idToStreamLoadTask.get(label);
AbstractStreamLoadTask task = idToStreamLoadTask.get(label);
readUnlock();
needUnLock = false;
if (task.checkNeedPrepareTxn()) {
@ -354,7 +396,7 @@ public class StreamLoadMgr implements MemoryTrackable {
}
}
public void commitLoadTask(String label, TransactionResult resp)
public void commitLoadTask(String label, HttpHeaders headers, TransactionResult resp)
throws StarRocksException {
boolean needUnLock = true;
readLock();
@ -362,10 +404,10 @@ public class StreamLoadMgr implements MemoryTrackable {
if (!idToStreamLoadTask.containsKey(label)) {
throw new StarRocksException("stream load task " + label + " does not exist");
}
StreamLoadTask task = idToStreamLoadTask.get(label);
AbstractStreamLoadTask task = idToStreamLoadTask.get(label);
readUnlock();
needUnLock = false;
task.commitTxn(resp);
task.commitTxn(headers, resp);
} finally {
if (needUnLock) {
readUnlock();
@ -381,7 +423,7 @@ public class StreamLoadMgr implements MemoryTrackable {
if (!idToStreamLoadTask.containsKey(label)) {
throw new StarRocksException("stream load task" + label + "does not exist");
}
StreamLoadTask task = idToStreamLoadTask.get(label);
AbstractStreamLoadTask task = idToStreamLoadTask.get(label);
readUnlock();
needUnLock = false;
task.manualCancelTask(resp);
@ -399,14 +441,14 @@ public class StreamLoadMgr implements MemoryTrackable {
LOG.debug("begin to clean old stream load tasks");
writeLock();
try {
Iterator<Map.Entry<String, StreamLoadTask>> iterator = idToStreamLoadTask.entrySet().iterator();
Iterator<Map.Entry<String, AbstractStreamLoadTask>> iterator = idToStreamLoadTask.entrySet().iterator();
long currentMs = System.currentTimeMillis();
while (iterator.hasNext()) {
StreamLoadTask streamLoadTask = iterator.next().getValue();
AbstractStreamLoadTask streamLoadTask = iterator.next().getValue();
if (streamLoadTask.checkNeedRemove(currentMs, isForce)) {
unprotectedRemoveTaskFromDb(streamLoadTask);
iterator.remove();
if (streamLoadTask.isSyncStreamLoad()) {
if (streamLoadTask instanceof StreamLoadTask && ((StreamLoadTask) streamLoadTask).isSyncStreamLoad()) {
txnIdToSyncStreamLoadTasks.remove(streamLoadTask.getTxnId());
}
LOG.info(new LogBuilder(LogKey.STREAM_LOAD_TASK, streamLoadTask.getId())
@ -418,7 +460,6 @@ public class StreamLoadMgr implements MemoryTrackable {
);
}
}
} finally {
writeUnlock();
}
@ -429,11 +470,12 @@ public class StreamLoadMgr implements MemoryTrackable {
public void cleanSyncStreamLoadTasks() {
writeLock();
try {
Iterator<Map.Entry<String, StreamLoadTask>> iterator = idToStreamLoadTask.entrySet().iterator();
Iterator<Map.Entry<String, AbstractStreamLoadTask>> iterator = idToStreamLoadTask.entrySet().iterator();
long currentMs = System.currentTimeMillis();
while (iterator.hasNext()) {
StreamLoadTask streamLoadTask = iterator.next().getValue();
if (streamLoadTask.isSyncStreamLoad() && streamLoadTask.isFinalState()) {
AbstractStreamLoadTask streamLoadTask = iterator.next().getValue();
if (streamLoadTask instanceof StreamLoadTask && ((StreamLoadTask) streamLoadTask).isSyncStreamLoad()
&& streamLoadTask.isFinalState()) {
unprotectedRemoveTaskFromDb(streamLoadTask);
iterator.remove();
txnIdToSyncStreamLoadTasks.remove(streamLoadTask.getTxnId());
@ -452,7 +494,7 @@ public class StreamLoadMgr implements MemoryTrackable {
}
}
private void unprotectedRemoveTaskFromDb(StreamLoadTask streamLoadTask) {
private void unprotectedRemoveTaskFromDb(AbstractStreamLoadTask streamLoadTask) {
long dbId = streamLoadTask.getDBId();
String label = streamLoadTask.getLabel();
@ -463,7 +505,9 @@ public class StreamLoadMgr implements MemoryTrackable {
}
}
warehouseLoadStatusInfoBuilder.withRemovedJob(streamLoadTask);
if (streamLoadTask instanceof StreamLoadTask) {
warehouseLoadStatusInfoBuilder.withRemovedJob((StreamLoadTask) streamLoadTask);
}
}
/*
@ -473,12 +517,12 @@ public class StreamLoadMgr implements MemoryTrackable {
if includeHistory is false, filter not running load task in result
else return all of result
*/
public List<StreamLoadTask> getTask(String dbFullName, String label, boolean includeHistory)
public List<AbstractStreamLoadTask> getTask(String dbFullName, String label, boolean includeHistory)
throws MetaNotFoundException {
readLock();
try {
// return all of stream load task
List<StreamLoadTask> result;
List<AbstractStreamLoadTask> result;
RESULT:
{
if (dbFullName == null) {
@ -523,7 +567,11 @@ public class StreamLoadMgr implements MemoryTrackable {
public Map<Long, WarehouseLoadStatusInfo> getWarehouseLoadInfo() {
readLock();
try {
return warehouseLoadStatusInfoBuilder.buildFromJobs(idToStreamLoadTask.values());
List<StreamLoadTask> streamLoadTasks = idToStreamLoadTask.values().stream()
.filter(task -> task instanceof StreamLoadTask)
.map(task -> (StreamLoadTask) task)
.collect(Collectors.toList());
return warehouseLoadStatusInfoBuilder.buildFromJobs(streamLoadTasks);
} finally {
readUnlock();
}
@ -534,30 +582,30 @@ public class StreamLoadMgr implements MemoryTrackable {
}
// put history task in the end
private void sortStreamLoadTask(List<StreamLoadTask> streamLoadTaskList) {
private void sortStreamLoadTask(List<AbstractStreamLoadTask> streamLoadTaskList) {
if (streamLoadTaskList == null) {
return;
}
Collections.sort(streamLoadTaskList, new Comparator<StreamLoadTask>() {
Collections.sort(streamLoadTaskList, new Comparator<AbstractStreamLoadTask>() {
@Override
public int compare(StreamLoadTask t1, StreamLoadTask t2) {
public int compare(AbstractStreamLoadTask t1, AbstractStreamLoadTask t2) {
return (int) (t1.createTimeMs() - t2.createTimeMs());
}
});
}
// for each label, we can have only one task
public StreamLoadTask getTaskByLabel(String label) {
public AbstractStreamLoadTask getTaskByLabel(String label) {
return idToStreamLoadTask.get(label);
}
public StreamLoadTask getTaskById(long id) {
readLock();
try {
List<StreamLoadTask> taskList =
List<AbstractStreamLoadTask> taskList =
idToStreamLoadTask.values().stream().filter(streamLoadTask -> id == streamLoadTask.getId())
.collect(Collectors.toList());
return taskList.isEmpty() ? null : taskList.get(0);
return taskList.isEmpty() ? null : (StreamLoadTask) taskList.get(0);
} finally {
readUnlock();
}
@ -565,12 +613,12 @@ public class StreamLoadMgr implements MemoryTrackable {
// return all of stream load task named label in all of db
// return all tasks if label is null
public List<StreamLoadTask> getTaskByName(String label) {
List<StreamLoadTask> result = Lists.newArrayList();
public List<AbstractStreamLoadTask> getTaskByName(String label) {
List<AbstractStreamLoadTask> result = Lists.newArrayList();
readLock();
try {
if (label != null) {
StreamLoadTask task = idToStreamLoadTask.get(label);
AbstractStreamLoadTask task = idToStreamLoadTask.get(label);
if (task != null) {
result.add(task);
}
@ -585,7 +633,7 @@ public class StreamLoadMgr implements MemoryTrackable {
}
public void cancelUnDurableTaskAfterRestart() {
for (StreamLoadTask streamLoadTask : idToStreamLoadTask.values()) {
for (AbstractStreamLoadTask streamLoadTask : idToStreamLoadTask.values()) {
if (!streamLoadTask.isDurableLoadState()) {
streamLoadTask.cancelAfterRestart();
}
@ -600,12 +648,30 @@ public class StreamLoadMgr implements MemoryTrackable {
return idToStreamLoadTask.size();
}
public int numOfStreamLoadTask() {
int count = 0;
for (AbstractStreamLoadTask streamLoadTask : idToStreamLoadTask.values()) {
if (streamLoadTask instanceof StreamLoadTask) {
++count;
}
}
return count;
}
public void save(ImageWriter imageWriter) throws IOException, SRMetaBlockException {
int numJson = 1 + idToStreamLoadTask.size();
int numJson = 2 + idToStreamLoadTask.size();
SRMetaBlockWriter writer = imageWriter.getBlockWriter(SRMetaBlockID.STREAM_LOAD_MGR, numJson);
writer.writeInt(idToStreamLoadTask.size());
for (StreamLoadTask streamLoadTask : idToStreamLoadTask.values()) {
writer.writeJson(streamLoadTask);
writer.writeInt(numOfStreamLoadTask());
for (AbstractStreamLoadTask streamLoadTask : idToStreamLoadTask.values()) {
if (streamLoadTask instanceof StreamLoadTask) {
writer.writeJson(streamLoadTask);
}
}
writer.writeInt(idToStreamLoadTask.size() - numOfStreamLoadTask());
for (AbstractStreamLoadTask streamLoadTask : idToStreamLoadTask.values()) {
if (!(streamLoadTask instanceof StreamLoadTask)) {
writer.writeJson(streamLoadTask);
}
}
writer.close();
@ -621,6 +687,16 @@ public class StreamLoadMgr implements MemoryTrackable {
return;
}
addLoadTask(loadTask);
});
reader.readCollection(StreamLoadMultiStmtTask.class, loadTask -> {
loadTask.init();
// discard expired task right away
if (loadTask.checkNeedRemove(currentMs, false)) {
LOG.info("discard expired task: {}", loadTask.getLabel());
return;
}
addLoadTask(loadTask);
});
}
@ -643,9 +719,9 @@ public class StreamLoadMgr implements MemoryTrackable {
long latestTime = -1L;
readLock();
try {
for (StreamLoadTask task : idToStreamLoadTask.values()) {
if (task.isFinal()) {
latestTime = Math.max(latestTime, task.getFinishTimestampMs());
for (AbstractStreamLoadTask task : idToStreamLoadTask.values()) {
if (task instanceof StreamLoadTask && ((StreamLoadTask) task).isFinal()) {
latestTime = Math.max(latestTime, ((StreamLoadTask) task).getFinishTimestampMs());
}
}
} finally {
@ -658,7 +734,7 @@ public class StreamLoadMgr implements MemoryTrackable {
readLock();
try {
Map<Long, Long> result = new HashMap<>();
for (StreamLoadTask task : idToStreamLoadTask.values()) {
for (AbstractStreamLoadTask task : idToStreamLoadTask.values()) {
if (!task.isFinalState()) {
result.compute(task.getCurrentWarehouseId(), (key, value) -> value == null ? 1L : value + 1);
}
@ -668,4 +744,4 @@ public class StreamLoadMgr implements MemoryTrackable {
readUnlock();
}
}
}
}

View File

@ -0,0 +1,490 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.load.streamload;
import com.google.common.collect.Lists;
import com.google.gson.annotations.SerializedName;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.OlapTable;
import com.starrocks.common.Config;
import com.starrocks.common.MetaNotFoundException;
import com.starrocks.common.StarRocksException;
import com.starrocks.common.io.Text;
import com.starrocks.common.util.UUIDUtil;
import com.starrocks.http.rest.TransactionResult;
import com.starrocks.persist.gson.GsonUtils;
import com.starrocks.qe.ConnectContext;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.WarehouseManager;
import com.starrocks.sql.ast.txn.BeginStmt;
import com.starrocks.sql.ast.txn.CommitStmt;
import com.starrocks.sql.parser.NodePosition;
import com.starrocks.thrift.TLoadInfo;
import com.starrocks.thrift.TNetworkAddress;
import com.starrocks.thrift.TStreamLoadInfo;
import com.starrocks.thrift.TUniqueId;
import com.starrocks.transaction.TransactionException;
import com.starrocks.transaction.TransactionState;
import com.starrocks.transaction.TransactionStmtExecutor;
import com.starrocks.warehouse.cngroup.ComputeResource;
import io.netty.handler.codec.http.HttpHeaders;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class StreamLoadMultiStmtTask extends AbstractStreamLoadTask {
private static final Logger LOG = LogManager.getLogger(StreamLoadMultiStmtTask.class);
public enum State {
BEGIN,
BEFORE_LOAD,
LOADING,
PREPARING,
PREPARED,
COMMITED,
CANCELLED,
FINISHED
}
@SerializedName(value = "id")
private long id;
private TUniqueId loadId;
@SerializedName("loadIdHi")
private long loadIdHi;
@SerializedName("loadIdLo")
private long loadIdLo;
@SerializedName(value = "label")
private String label;
@SerializedName(value = "dbId")
private long dbId;
@SerializedName(value = "dbName")
private String dbName;
@SerializedName(value = "txnId")
private long txnId;
@SerializedName(value = "createTimeMs")
private long createTimeMs;
@SerializedName(value = "state")
private State state;
@SerializedName(value = "warehouseId")
private long warehouseId;
@SerializedName(value = "computeResource")
private ComputeResource computeResource = WarehouseManager.DEFAULT_RESOURCE;
@SerializedName(value = "type")
private StreamLoadTask.Type type = StreamLoadTask.Type.MULTI_STATEMENT_STREAM_LOAD;
private ConnectContext context = new ConnectContext();
@SerializedName(value = "taskMaps")
private Map<String, StreamLoadTask> taskMaps = new ConcurrentHashMap<>();
private ReentrantReadWriteLock lock;
@SerializedName(value = "timeoutMs")
private long timeoutMs;
@SerializedName(value = "user")
private String user = "";
@SerializedName(value = "clientIp")
private String clientIp = "";
@SerializedName(value = "endTimeMs")
private long endTimeMs = -1;
// Add missing fields
private String errorMsg = "";
private int channelNum = 0;
private List<Object> channels = Lists.newArrayList();
public StreamLoadMultiStmtTask(long id, Database db, String label, String user, String clientIp,
long timeoutMs, long createTimeMs, ComputeResource computeResource) {
this.id = id;
this.dbId = db.getId();
this.dbName = db.getFullName();
this.label = label;
this.user = user;
this.clientIp = clientIp;
this.timeoutMs = timeoutMs;
this.createTimeMs = createTimeMs;
this.computeResource = computeResource;
this.warehouseId = computeResource.getWarehouseId();
this.state = State.BEGIN;
this.type = StreamLoadTask.Type.MULTI_STATEMENT_STREAM_LOAD;
this.loadId = UUIDUtil.genTUniqueId();
init();
}
private void writeLock() {
lock.writeLock().lock();
}
private void writeUnlock() {
lock.writeLock().unlock();
}
public void init() {
this.lock = new ReentrantReadWriteLock(true);
}
@Override
public void beginTxnFromFrontend(TransactionResult resp) {
beginTxn(resp);
}
@Override
public long getCurrentWarehouseId() {
return warehouseId;
}
@Override
public void beginTxnFromFrontend(int channelId, int channelNum, TransactionResult resp) {
beginTxn(resp);
}
@Override
public void beginTxnFromBackend(TUniqueId requestId, String clientIp, long backendId, TransactionResult resp) {
}
public void beginTxn(TransactionResult resp) {
TransactionStmtExecutor.beginStmt(context, new BeginStmt(NodePosition.ZERO));
this.txnId = context.getTxnId();
LOG.info("start transaction id {}", txnId);
}
@Override
public void waitCoordFinishAndPrepareTxn(long preparedTimeoutMs, TransactionResult resp) {
// Prepare all sub-tasks
for (StreamLoadTask task : taskMaps.values()) {
task.waitCoordFinishAndPrepareTxn(preparedTimeoutMs, resp);
if (!resp.stateOK()) {
return;
}
}
}
@Override
public void commitTxn(HttpHeaders headers, TransactionResult resp) throws StarRocksException {
// Commit all sub-tasks
LOG.info("commit {} sub tasks", taskMaps.size());
for (StreamLoadTask task : taskMaps.values()) {
task.prepareChannel(0, task.getTableName(), headers, resp);
if (!resp.stateOK()) {
return;
}
if (task.checkNeedPrepareTxn()) {
task.waitCoordFinish(resp);
}
if (!resp.stateOK()) {
return;
}
TransactionStmtExecutor.loadData(dbId, task.getTable().getId(), task.getTxnStateItem(), context);
}
TransactionStmtExecutor.commitStmt(context, new CommitStmt(NodePosition.ZERO));
this.state = State.COMMITED;
}
@Override
public void manualCancelTask(TransactionResult resp) throws StarRocksException {
// Cancel all sub-tasks
for (StreamLoadTask task : taskMaps.values()) {
task.manualCancelTask(resp);
}
this.state = State.CANCELLED;
this.endTimeMs = System.currentTimeMillis();
}
@Override
public boolean checkNeedRemove(long currentMs, boolean isForce) {
if (!isFinalState()) {
return false;
}
if (endTimeMs == -1) {
return false;
}
return isForce || ((currentMs - endTimeMs) > Config.stream_load_task_keep_max_second * 1000L);
}
@Override
public boolean checkNeedPrepareTxn() {
return taskMaps.values().stream().allMatch(task -> task.checkNeedPrepareTxn());
}
@Override
public boolean isDurableLoadState() {
return state == State.PREPARED || state == State.CANCELLED || state == State.COMMITED || state == State.FINISHED;
}
@Override
public long endTimeMs() {
return endTimeMs;
}
@Override
public long createTimeMs() {
return createTimeMs;
}
@Override
public List<List<String>> getShowInfo() {
List<List<String>> result = Lists.newArrayList();
for (StreamLoadTask task : taskMaps.values()) {
result.addAll(task.getShowInfo());
}
return result;
}
@Override
public List<List<String>> getShowBriefInfo() {
List<List<String>> result = Lists.newArrayList();
for (StreamLoadTask task : taskMaps.values()) {
result.addAll(task.getShowBriefInfo());
}
return result;
}
// Implement abstract methods from parent class
@Override
public long getId() {
return id;
}
@Override
public String getLabel() {
return label;
}
@Override
public long getDBId() {
return dbId;
}
@Override
public String getDBName() {
return dbName;
}
@Override
public long getTxnId() {
return txnId;
}
@Override
public String getStateName() {
return state.name();
}
@Override
public boolean isFinal() {
return isFinalState();
}
@Override
public boolean isFinalState() {
for (StreamLoadTask task : taskMaps.values()) {
if (!task.isFinalState()) {
return false;
}
}
return true;
}
@Override
public List<TLoadInfo> toThrift() {
// loop taskMaps
List<TLoadInfo> result = Lists.newArrayList();
for (StreamLoadTask task : taskMaps.values()) {
result.addAll(task.toThrift());
}
return result;
}
@Override
public List<TStreamLoadInfo> toStreamLoadThrift() {
// loop taskMaps
List<TStreamLoadInfo> result = Lists.newArrayList();
for (StreamLoadTask task : taskMaps.values()) {
result.addAll(task.toStreamLoadThrift());
}
return result;
}
// Implement remaining abstract methods from parent classes
@Override
public void write(DataOutput out) throws IOException {
Text.writeString(out, GsonUtils.GSON.toJson(this));
}
@Override
public void gsonPostProcess() throws IOException {
loadId = new TUniqueId(loadIdHi, loadIdLo);
}
@Override
public void gsonPreProcess() throws IOException {
loadIdHi = loadId.getHi();
loadIdLo = loadId.getLo();
}
@Override
public TNetworkAddress tryLoad(int channelId, String tableName, TransactionResult resp) throws StarRocksException {
// For multi-statement tasks, delegate to specific table task if exists
StreamLoadTask task = taskMaps.get(tableName);
if (task != null) {
return task.tryLoad(0, tableName, resp);
} else {
Database db = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbId);
if (db == null) {
throw new MetaNotFoundException("Database " + dbId + "has been deleted");
}
OlapTable table = (OlapTable) GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(dbName, tableName);
if (table == null) {
throw new MetaNotFoundException("Failed to find table " + tableName + " in db " + dbName);
}
long id = GlobalStateMgr.getCurrentState().getNextId();
task = new StreamLoadTask(id, db, table, label, user, clientIp, timeoutMs, 1, 0, createTimeMs, computeResource);
taskMaps.put(table.getName(), task);
LOG.info("Add stream load task {}", task.getShowInfo());
task.tryBegin(0, 1, txnId);
return task.tryLoad(0, tableName, resp);
}
}
@Override
public TNetworkAddress executeTask(int channelId, String tableName, HttpHeaders headers, TransactionResult resp) {
// For multi-statement tasks, delegate to specific table task if exists
StreamLoadTask task = taskMaps.get(tableName);
if (task != null) {
return task.executeTask(0, tableName, headers, resp);
}
// For multi-statement tasks, we don't strictly validate table names
// as they can handle multiple tables dynamically
resp.setErrorMsg("Table " + tableName + " not found in multi-statement task or not initialized yet");
return null;
}
@Override
public void prepareChannel(int channelId, String tableName, HttpHeaders headers, TransactionResult resp) {
// For multi-statement tasks, delegate to specific table task if exists
StreamLoadTask task = taskMaps.get(tableName);
if (task != null) {
task.prepareChannel(0, tableName, headers, resp);
} else {
// For multi-statement tasks, we don't strictly validate table names
// as they can handle multiple tables dynamically
resp.setErrorMsg("Table " + tableName + " not found in multi-statement task or not initialized yet");
}
}
@Override
public void cancelAfterRestart() {
// loop taskMaps and execute cancelAfterRestart
for (StreamLoadTask task : taskMaps.values()) {
task.cancelAfterRestart();
}
state = State.CANCELLED;
}
@Override
public long getFinishTimestampMs() {
// loop taskMaps get max finish timestamp
return taskMaps.values().stream()
.mapToLong(StreamLoadTask::getFinishTimestampMs)
.max()
.orElse(-1L);
}
@Override
public String getStringByType() {
return "MULTI_STATEMENT_STREAM_LOAD";
}
@Override
public String getTableName() {
// loop taskMaps and get table names
return String.join(",", taskMaps.keySet());
}
@Override
public void beforePrepared(TransactionState txnState) throws TransactionException {
for (StreamLoadTask task : taskMaps.values()) {
task.beforePrepared(txnState);
}
}
@Override
public void afterPrepared(TransactionState txnState, boolean txnOperated) throws StarRocksException {
for (StreamLoadTask task : taskMaps.values()) {
task.afterPrepared(txnState, txnOperated);
}
}
@Override
public void replayOnPrepared(TransactionState txnState) {
for (StreamLoadTask task : taskMaps.values()) {
task.replayOnPrepared(txnState);
}
}
@Override
public void beforeCommitted(TransactionState txnState) throws TransactionException {
for (StreamLoadTask task : taskMaps.values()) {
task.beforeCommitted(txnState);
}
}
@Override
public void afterCommitted(TransactionState txnState, boolean txnOperated) throws StarRocksException {
for (StreamLoadTask task : taskMaps.values()) {
task.afterCommitted(txnState, txnOperated);
}
}
@Override
public void replayOnCommitted(TransactionState txnState) {
for (StreamLoadTask task : taskMaps.values()) {
task.replayOnCommitted(txnState);
}
}
@Override
public void afterAborted(TransactionState txnState, boolean txnOperated, String txnStatusChangeReason)
throws StarRocksException {
for (StreamLoadTask task : taskMaps.values()) {
task.afterAborted(txnState, txnOperated, txnStatusChangeReason);
}
}
@Override
public void replayOnAborted(TransactionState txnState) {
for (StreamLoadTask task : taskMaps.values()) {
task.replayOnAborted(txnState);
}
}
@Override
public void afterVisible(TransactionState txnState, boolean txnOperated) {
for (StreamLoadTask task : taskMaps.values()) {
task.afterVisible(txnState, txnOperated);
}
}
@Override
public void replayOnVisible(TransactionState txnState) {
for (StreamLoadTask task : taskMaps.values()) {
task.replayOnVisible(txnState);
}
}
}

View File

@ -14,11 +14,11 @@
package com.starrocks.load.streamload;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import com.google.gson.annotations.SerializedName;
import com.starrocks.analysis.TableName;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.OlapTable;
import com.starrocks.common.Config;
@ -29,7 +29,6 @@ import com.starrocks.common.MetaNotFoundException;
import com.starrocks.common.StarRocksException;
import com.starrocks.common.Status;
import com.starrocks.common.Version;
import com.starrocks.common.io.Writable;
import com.starrocks.common.util.DebugUtil;
import com.starrocks.common.util.LoadPriority;
import com.starrocks.common.util.LogBuilder;
@ -47,8 +46,7 @@ import com.starrocks.load.LoadConstants;
import com.starrocks.load.loadv2.LoadJob;
import com.starrocks.load.loadv2.ManualLoadTxnCommitAttachment;
import com.starrocks.load.routineload.RLTaskTxnCommitAttachment;
import com.starrocks.persist.gson.GsonPostProcessable;
import com.starrocks.persist.gson.GsonPreProcessable;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.DefaultCoordinator;
import com.starrocks.qe.QeProcessorImpl;
import com.starrocks.qe.scheduler.Coordinator;
@ -58,6 +56,8 @@ import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.WarehouseManager;
import com.starrocks.service.FrontendOptions;
import com.starrocks.sql.LoadPlanner;
import com.starrocks.sql.ast.StreamLoadStmt;
import com.starrocks.sql.parser.NodePosition;
import com.starrocks.task.LoadEtlTask;
import com.starrocks.thrift.TLoadInfo;
import com.starrocks.thrift.TNetworkAddress;
@ -66,14 +66,13 @@ import com.starrocks.thrift.TStatusCode;
import com.starrocks.thrift.TStreamLoadChannel;
import com.starrocks.thrift.TStreamLoadInfo;
import com.starrocks.thrift.TUniqueId;
import com.starrocks.transaction.AbstractTxnStateChangeCallback;
import com.starrocks.transaction.ExplicitTxnState;
import com.starrocks.transaction.TabletCommitInfo;
import com.starrocks.transaction.TabletFailInfo;
import com.starrocks.transaction.TransactionException;
import com.starrocks.transaction.TransactionState;
import com.starrocks.transaction.TransactionState.TxnCoordinator;
import com.starrocks.transaction.TxnCommitAttachment;
import com.starrocks.warehouse.LoadJobWithWarehouse;
import com.starrocks.warehouse.WarehouseIdleChecker;
import com.starrocks.warehouse.cngroup.ComputeResource;
import io.netty.handler.codec.http.HttpHeaders;
@ -89,8 +88,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import static com.starrocks.common.ErrorCode.ERR_NO_PARTITIONS_HAVE_DATA_LOAD;
public class StreamLoadTask extends AbstractTxnStateChangeCallback
implements Writable, GsonPostProcessable, GsonPreProcessable, LoadJobWithWarehouse {
public class StreamLoadTask extends AbstractStreamLoadTask {
private static final Logger LOG = LogManager.getLogger(StreamLoadTask.class);
public enum State {
@ -107,7 +105,8 @@ public class StreamLoadTask extends AbstractTxnStateChangeCallback
public enum Type {
STREAM_LOAD,
ROUTINE_LOAD,
PARALLEL_STREAM_LOAD // default
PARALLEL_STREAM_LOAD,
MULTI_STATEMENT_STREAM_LOAD
}
private static final double DEFAULT_MAX_FILTER_RATIO = 0.0;
@ -195,6 +194,8 @@ public class StreamLoadTask extends AbstractTxnStateChangeCallback
private OlapTable table;
private long taskDeadlineMs;
private boolean isCommitting;
private ConnectContext context;
private ExplicitTxnState.ExplicitTxnStateItem txnStateItem = new ExplicitTxnState.ExplicitTxnStateItem();
private ReentrantReadWriteLock lock;
@ -272,6 +273,8 @@ public class StreamLoadTask extends AbstractTxnStateChangeCallback
this.streamLoadParams = null;
this.streamLoadInfo = null;
this.isCommitting = false;
this.context = new ConnectContext();
this.context.setQualifiedUser(user);
}
@Override
@ -293,6 +296,11 @@ public class StreamLoadTask extends AbstractTxnStateChangeCallback
beginTxn(0, 1, requestId, TxnCoordinator.fromBackend(clientIp, backendId), resp);
}
@Override
public void beginTxnFromFrontend(TransactionResult resp) {
beginTxnFromFrontend(0, 1, resp);
}
public void beginTxnFromFrontend(int channelId, int channelNum, TransactionResult resp) {
beginTxn(channelId, channelNum, null,
new TxnCoordinator(TransactionState.TxnSourceType.FE, FrontendOptions.getLocalHostAddress()), resp);
@ -383,6 +391,36 @@ public class StreamLoadTask extends AbstractTxnStateChangeCallback
}
}
public void tryBegin(int channelId, int channelNum, long txnId) throws StarRocksException {
writeLock();
try {
if (channelNum != this.channelNum) {
throw new StarRocksException("channel num " + channelNum
+ " does not equal to original channel num " + this.channelNum);
}
if (channelId >= this.channelNum || channelId < 0) {
throw new StarRocksException("channel id should be between [0, " + String.valueOf(this.channelNum - 1) + "].");
}
switch (this.state) {
case BEGIN: {
this.txnId = txnId;
this.state = State.BEFORE_LOAD;
this.channels.set(channelId, State.BEFORE_LOAD);
this.beforeLoadTimeMs = System.currentTimeMillis();
LOG.info("stream load {} channel_id {} begin. db: {}, tbl: {}, txn_id: {}",
label, channelId, dbName, tableName, txnId);
break;
}
default: {
break;
}
}
} finally {
writeUnlock();
}
}
private void updateTransactionResultWithException(Exception e, TransactionResult resp) {
ActionStatus status = ActionStatus.FAILED;
if (e instanceof LabelAlreadyUsedException) {
@ -396,7 +434,18 @@ public class StreamLoadTask extends AbstractTxnStateChangeCallback
resp.setError(status, this.errorMsg, e);
}
public TNetworkAddress tryLoad(int channelId, TransactionResult resp) {
public TNetworkAddress tryLoad(int channelId, String tableName, TransactionResult resp) throws StarRocksException {
// Validate table name first
try {
if (!this.tableName.equals(tableName)) {
throw new StarRocksException(
String.format("Request table %s not equal transaction table %s", tableName, this.tableName));
}
} catch (StarRocksException e) {
resp.setErrorMsg(e.getMessage());
return null;
}
long startTimeMs = System.currentTimeMillis();
boolean needUnLock = true;
boolean exception = false;
@ -468,7 +517,18 @@ public class StreamLoadTask extends AbstractTxnStateChangeCallback
return null;
}
public TNetworkAddress executeTask(int channelId, HttpHeaders headers, TransactionResult resp) {
public TNetworkAddress executeTask(int channelId, String tableName, HttpHeaders headers, TransactionResult resp) {
// Validate table name first
try {
if (!this.tableName.equals(tableName)) {
throw new StarRocksException(
String.format("Request table %s not equal transaction table %s", tableName, this.tableName));
}
} catch (StarRocksException e) {
resp.setErrorMsg(e.getMessage());
return null;
}
long startTimeMs = System.currentTimeMillis();
boolean exception = false;
writeLock();
@ -532,6 +592,7 @@ public class StreamLoadTask extends AbstractTxnStateChangeCallback
}
}
} catch (Exception e) {
LOG.warn("execute task " + label + " exception ", e);
this.errorMsg = new LogBuilder(LogKey.STREAM_LOAD_TASK, id, ':').add("label", label)
.add("error_msg", "cancel stream task for exception: " + e.getMessage()).build_http_log();
exception = true;
@ -548,7 +609,18 @@ public class StreamLoadTask extends AbstractTxnStateChangeCallback
return null;
}
public void prepareChannel(int channelId, HttpHeaders headers, TransactionResult resp) {
public void prepareChannel(int channelId, String tableName, HttpHeaders headers, TransactionResult resp) {
// Validate table name first
try {
if (!this.tableName.equals(tableName)) {
throw new StarRocksException(
String.format("Request table %s not equal transaction table %s", tableName, this.tableName));
}
} catch (StarRocksException e) {
resp.setErrorMsg(e.getMessage());
return;
}
long startTimeMs = System.currentTimeMillis();
boolean needUnLock = true;
boolean exception = false;
@ -668,7 +740,7 @@ public class StreamLoadTask extends AbstractTxnStateChangeCallback
return;
}
public void waitCoordFinishAndPrepareTxn(long preparedTimeoutMs, TransactionResult resp) {
public void waitCoordFinish(TransactionResult resp) {
long startTimeMs = System.currentTimeMillis();
boolean exception = false;
writeLock();
@ -697,6 +769,7 @@ public class StreamLoadTask extends AbstractTxnStateChangeCallback
this.trackingUrl);
}
} catch (Exception e) {
LOG.info("waitCoordFinish", e);
this.errorMsg = new LogBuilder(LogKey.STREAM_LOAD_TASK, id, ':').add("label", label)
.add("error_msg", "cancel stream task for exception: " + e.getMessage()).build_http_log();
exception = true;
@ -711,10 +784,21 @@ public class StreamLoadTask extends AbstractTxnStateChangeCallback
resp.setErrorMsg(this.errorMsg);
return;
}
}
public void waitCoordFinishAndPrepareTxn(long preparedTimeoutMs, TransactionResult resp) {
long startTimeMs = System.currentTimeMillis();
boolean exception = false;
waitCoordFinish(resp);
if (!resp.stateOK()) {
return;
}
try {
unprotectedPrepareTxn(preparedTimeoutMs);
} catch (Exception e) {
LOG.info("unprotectedPrepareTxn", e);
this.errorMsg = new LogBuilder(LogKey.STREAM_LOAD_TASK, id, ':').add("label", label)
.add("error_msg", "cancel stream task for exception: " + e.getMessage()).build_http_log();
exception = true;
@ -740,7 +824,7 @@ public class StreamLoadTask extends AbstractTxnStateChangeCallback
label, dbName, tableName, txnId);
}
public void commitTxn(TransactionResult resp) throws StarRocksException {
public void commitTxn(HttpHeaders headers, TransactionResult resp) throws StarRocksException {
long startTimeMs = System.currentTimeMillis();
boolean exception = false;
readLock();
@ -827,7 +911,7 @@ public class StreamLoadTask extends AbstractTxnStateChangeCallback
}
LoadPlanner loadPlanner = new LoadPlanner(id, loadId, txnId, dbId, dbName, table,
streamLoadInfo.isStrictMode(), streamLoadInfo.getTimezone(), streamLoadInfo.isPartialUpdate(),
null, null, streamLoadInfo.getLoadMemLimit(), streamLoadInfo.getExecMemLimit(),
context, null, streamLoadInfo.getLoadMemLimit(), streamLoadInfo.getExecMemLimit(),
streamLoadInfo.getNegative(), channelNum, streamLoadInfo.getColumnExprDescs(), streamLoadInfo, label,
streamLoadInfo.getTimeout());
@ -858,6 +942,7 @@ public class StreamLoadTask extends AbstractTxnStateChangeCallback
TStreamLoadChannel streamLoadChannel = new TStreamLoadChannel();
streamLoadChannel.setLabel(label);
streamLoadChannel.setChannel_id(channelId);
streamLoadChannel.setTable_name(tableName);
TStatus tStatus = ThriftRPCRequestExecutor.callNoRetry(
ThriftConnectionPool.backendPool,
@ -902,6 +987,12 @@ public class StreamLoadTask extends AbstractTxnStateChangeCallback
if (!status.ok()) {
throw new LoadException(status.getErrorMsg());
}
txnStateItem.setDmlStmt(new StreamLoadStmt(new TableName(dbName, tableName), NodePosition.ZERO));
txnStateItem.setTabletCommitInfos(TabletCommitInfo.fromThrift(coord.getCommitInfos()));
txnStateItem.setTabletFailInfos(TabletFailInfo.fromThrift(coord.getFailInfos()));
txnStateItem.addLoadedRows(numRowsNormal);
txnStateItem.addFilteredRows(numRowsUnselected);
txnStateItem.addLoadedBytes(numLoadBytesTotal);
} else {
throw new LoadException("coordinator could not finished before job timeout");
}
@ -995,7 +1086,9 @@ public class StreamLoadTask extends AbstractTxnStateChangeCallback
} finally {
readUnlock();
}
Preconditions.checkState(endTimeMs != -1, endTimeMs);
if (endTimeMs == -1) {
return false;
}
if (isForce || ((currentMs - endTimeMs) > Config.stream_load_task_keep_max_second * 1000L)) {
return true;
}
@ -1445,7 +1538,8 @@ public class StreamLoadTask extends AbstractTxnStateChangeCallback
}
}
public List<String> getShowInfo() {
@Override
public List<List<String>> getShowInfo() {
readLock();
try {
List<String> row = Lists.newArrayList();
@ -1489,13 +1583,13 @@ public class StreamLoadTask extends AbstractTxnStateChangeCallback
} else {
row.add("");
}
return row;
return List.of(row);
} finally {
readUnlock();
}
}
public List<String> getShowBriefInfo() {
public List<List<String>> getShowBriefInfo() {
readLock();
try {
List<String> row = Lists.newArrayList();
@ -1504,7 +1598,7 @@ public class StreamLoadTask extends AbstractTxnStateChangeCallback
row.add(dbName);
row.add(tableName);
row.add(state.name());
return row;
return List.of(row);
} finally {
readUnlock();
}
@ -1554,7 +1648,7 @@ public class StreamLoadTask extends AbstractTxnStateChangeCallback
return gson.toJson(properties);
}
public TLoadInfo toThrift() {
public List<TLoadInfo> toThrift() {
readLock();
try {
TLoadInfo info = new TLoadInfo();
@ -1595,14 +1689,15 @@ public class StreamLoadTask extends AbstractTxnStateChangeCallback
info.setLoad_finish_time(TimeUtils.longToTimeString(endTimeMs));
info.setType(getStringByType());
return info;
return Lists.newArrayList(info);
} finally {
readUnlock();
}
}
public TStreamLoadInfo toStreamLoadThrift() {
@Override
public List<TStreamLoadInfo> toStreamLoadThrift() {
readLock();
try {
TStreamLoadInfo info = new TStreamLoadInfo();
@ -1646,7 +1741,7 @@ public class StreamLoadTask extends AbstractTxnStateChangeCallback
}
info.setChannel_state(channelStateBuilder.toString());
info.setType(getStringByType());
return info;
return Lists.newArrayList(info);
} finally {
readUnlock();
}
@ -1655,4 +1750,8 @@ public class StreamLoadTask extends AbstractTxnStateChangeCallback
protected void setState(State state) {
this.state = state;
}
public ExplicitTxnState.ExplicitTxnStateItem getTxnStateItem() {
return txnStateItem;
}
}

View File

@ -76,6 +76,7 @@ import com.starrocks.load.MultiDeleteInfo;
import com.starrocks.load.loadv2.LoadJob.LoadJobStateUpdateInfo;
import com.starrocks.load.loadv2.LoadJobFinalOperation;
import com.starrocks.load.routineload.RoutineLoadJob;
import com.starrocks.load.streamload.StreamLoadMultiStmtTask;
import com.starrocks.load.streamload.StreamLoadTask;
import com.starrocks.metric.MetricRepo;
import com.starrocks.persist.gson.GsonUtils;
@ -614,6 +615,11 @@ public class EditLog {
globalStateMgr.getStreamLoadMgr().replayCreateLoadTask(streamLoadTask);
break;
}
case OperationType.OP_CREATE_MULTI_STMT_STREAM_LOAD_TASK: {
StreamLoadMultiStmtTask streamLoadTask = (StreamLoadMultiStmtTask) journal.data();
globalStateMgr.getStreamLoadMgr().replayCreateLoadTask(streamLoadTask);
break;
}
case OperationType.OP_CREATE_LOAD_JOB_V2: {
com.starrocks.load.loadv2.LoadJob loadJob =
(com.starrocks.load.loadv2.LoadJob) journal.data();
@ -1714,6 +1720,10 @@ public class EditLog {
logJsonObject(OperationType.OP_CREATE_STREAM_LOAD_TASK_V2, streamLoadTask);
}
public void logCreateMultiStmtStreamLoadJob(StreamLoadMultiStmtTask streamLoadTask) {
logJsonObject(OperationType.OP_CREATE_MULTI_STMT_STREAM_LOAD_TASK, streamLoadTask);
}
public void logCreateLoadJob(com.starrocks.load.loadv2.LoadJob loadJob) {
logJsonObject(OperationType.OP_CREATE_LOAD_JOB_V2, loadJob);
}

View File

@ -37,6 +37,7 @@ import com.starrocks.load.MultiDeleteInfo;
import com.starrocks.load.loadv2.LoadJob;
import com.starrocks.load.loadv2.LoadJobFinalOperation;
import com.starrocks.load.routineload.RoutineLoadJob;
import com.starrocks.load.streamload.StreamLoadMultiStmtTask;
import com.starrocks.load.streamload.StreamLoadTask;
import com.starrocks.persist.gson.GsonUtils;
import com.starrocks.plugin.PluginInfo;
@ -153,6 +154,7 @@ public class EditLogDeserializer {
.put(OperationType.OP_CREATE_ROUTINE_LOAD_JOB_V2, RoutineLoadJob.class)
.put(OperationType.OP_CHANGE_ROUTINE_LOAD_JOB_V2, RoutineLoadOperation.class)
.put(OperationType.OP_CREATE_STREAM_LOAD_TASK_V2, StreamLoadTask.class)
.put(OperationType.OP_CREATE_MULTI_STMT_STREAM_LOAD_TASK, StreamLoadMultiStmtTask.class)
.put(OperationType.OP_CREATE_LOAD_JOB_V2, LoadJob.class)
.put(OperationType.OP_END_LOAD_JOB_V2, LoadJobFinalOperation.class)
.put(OperationType.OP_UPDATE_LOAD_JOB, LoadJob.LoadJobStateUpdateInfo.class)

View File

@ -538,6 +538,9 @@ public class OperationType {
@IgnorableOnReplayFailed
public static final short OP_CREATE_STREAM_LOAD_TASK_V2 = 13070;
@IgnorableOnReplayFailed
public static final short OP_CREATE_MULTI_STMT_STREAM_LOAD_TASK = 13071;
@IgnorableOnReplayFailed
public static final short OP_MODIFY_COLUMN_COMMENT = 13080;

View File

@ -17,6 +17,7 @@ package com.starrocks.qe;
import com.starrocks.sql.ast.DeleteStmt;
import com.starrocks.sql.ast.DmlStmt;
import com.starrocks.sql.ast.InsertStmt;
import com.starrocks.sql.ast.StreamLoadStmt;
import com.starrocks.sql.ast.UpdateStmt;
/**
@ -27,7 +28,8 @@ public enum DmlType {
INSERT_INTO,
INSERT_OVERWRITE,
UPDATE,
DELETE;
DELETE,
STREAM_LOAD;
public static DmlType fromStmt(DmlStmt stmt) {
if (stmt instanceof InsertStmt) {
@ -41,6 +43,8 @@ public enum DmlType {
return UPDATE;
} else if (stmt instanceof DeleteStmt) {
return DELETE;
} else if (stmt instanceof StreamLoadStmt) {
return STREAM_LOAD;
} else {
throw new UnsupportedOperationException("unsupported");
}

View File

@ -133,8 +133,8 @@ import com.starrocks.load.pipe.Pipe;
import com.starrocks.load.pipe.PipeManager;
import com.starrocks.load.routineload.RoutineLoadFunctionalExprProvider;
import com.starrocks.load.routineload.RoutineLoadJob;
import com.starrocks.load.streamload.AbstractStreamLoadTask;
import com.starrocks.load.streamload.StreamLoadFunctionalExprProvider;
import com.starrocks.load.streamload.StreamLoadTask;
import com.starrocks.meta.BlackListSql;
import com.starrocks.proto.FailPointTriggerModeType;
import com.starrocks.proto.PFailPointInfo;
@ -1374,7 +1374,7 @@ public class ShowExecutor {
public ShowResultSet visitShowStreamLoadStatement(ShowStreamLoadStmt statement, ConnectContext context) {
List<List<String>> rows = Lists.newArrayList();
// if task exists
List<StreamLoadTask> streamLoadTaskList;
List<AbstractStreamLoadTask> streamLoadTaskList;
try {
streamLoadTaskList = GlobalStateMgr.getCurrentState().getStreamLoadMgr()
.getTask(statement.getDbFullName(),
@ -1388,13 +1388,15 @@ public class ShowExecutor {
if (streamLoadTaskList != null) {
StreamLoadFunctionalExprProvider fProvider =
statement.getFunctionalExprProvider(context);
rows = streamLoadTaskList.parallelStream()
List<AbstractStreamLoadTask> tasks = streamLoadTaskList.parallelStream()
.filter(fProvider.getPredicateChain())
.sorted(fProvider.getOrderComparator())
.skip(fProvider.getSkipCount())
.limit(fProvider.getLimitCount())
.map(StreamLoadTask::getShowInfo)
.collect(Collectors.toList());
for (AbstractStreamLoadTask task : tasks) {
rows.addAll(task.getShowInfo());
}
}
if (!Strings.isNullOrEmpty(statement.getName()) && rows.isEmpty()) {

View File

@ -131,6 +131,7 @@ import com.starrocks.load.pipe.filelist.RepoAccessor;
import com.starrocks.load.routineload.RLTaskTxnCommitAttachment;
import com.starrocks.load.routineload.RoutineLoadJob;
import com.starrocks.load.routineload.RoutineLoadMgr;
import com.starrocks.load.streamload.AbstractStreamLoadTask;
import com.starrocks.load.streamload.StreamLoadInfo;
import com.starrocks.load.streamload.StreamLoadKvParams;
import com.starrocks.load.streamload.StreamLoadMgr;
@ -1181,7 +1182,7 @@ public class FrontendServiceImpl implements FrontendService.Iface {
throw resp.getException();
}
StreamLoadTask task = streamLoadManager.getTaskByLabel(request.getLabel());
AbstractStreamLoadTask task = streamLoadManager.getTaskByLabel(request.getLabel());
// this should't open
if (task == null || task.getTxnId() == -1) {
throw new StarRocksException(String.format("Load label: {} begin transacton failed", request.getLabel()));
@ -2571,16 +2572,18 @@ public class FrontendServiceImpl implements FrontendService.Iface {
}
if (request.isSetJob_id()) {
StreamLoadTask task = GlobalStateMgr.getCurrentState().getStreamLoadMgr().getTaskById(request.getJob_id());
AbstractStreamLoadTask task = GlobalStateMgr.getCurrentState()
.getStreamLoadMgr().getTaskById(request.getJob_id());
if (task != null) {
loads.add(task.toThrift());
loads.addAll(task.toThrift());
}
} else {
List<StreamLoadTask> streamLoadTaskList = GlobalStateMgr.getCurrentState().getStreamLoadMgr()
List<AbstractStreamLoadTask> streamLoadTaskList = GlobalStateMgr.getCurrentState().getStreamLoadMgr()
.getTaskByName(request.getLabel());
if (streamLoadTaskList != null) {
loads.addAll(
streamLoadTaskList.stream().map(StreamLoadTask::toThrift).collect(Collectors.toList()));
for (AbstractStreamLoadTask streamLoadTask : streamLoadTaskList) {
loads.addAll(streamLoadTask.toThrift());
}
}
}
@ -2731,15 +2734,16 @@ public class FrontendServiceImpl implements FrontendService.Iface {
List<TStreamLoadInfo> loads = Lists.newArrayList();
try {
if (request.isSetJob_id()) {
StreamLoadTask task = loadManager.getTaskById(request.getJob_id());
AbstractStreamLoadTask task = loadManager.getTaskById(request.getJob_id());
if (task != null) {
loads.add(task.toStreamLoadThrift());
loads.addAll(task.toStreamLoadThrift());
}
} else {
List<StreamLoadTask> streamLoadTaskList = loadManager.getTaskByName(request.getLabel());
List<AbstractStreamLoadTask> streamLoadTaskList = loadManager.getTaskByName(request.getLabel());
if (streamLoadTaskList != null) {
loads.addAll(
streamLoadTaskList.stream().map(StreamLoadTask::toStreamLoadThrift).collect(Collectors.toList()));
for (AbstractStreamLoadTask task : streamLoadTaskList) {
loads.addAll(task.toStreamLoadThrift());
}
}
}
result.setLoads(loads);

View File

@ -0,0 +1,32 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.sql.ast;
import com.starrocks.analysis.TableName;
import com.starrocks.sql.parser.NodePosition;
public class StreamLoadStmt extends DmlStmt {
private final TableName tblName;
public StreamLoadStmt(TableName tableName, NodePosition pos) {
super(pos);
this.tblName = tableName;
}
@Override
public TableName getTableName() {
return tblName;
}
}

View File

@ -108,7 +108,8 @@ public class TransactionState implements Writable, GsonPreProcessable {
FRONTEND_STREAMING(8), // FE streaming load use this type
MV_REFRESH(9), // Refresh MV
REPLICATION(10), // Replication
BYPASS_WRITE(11); // Bypass BE, and write data file directly
BYPASS_WRITE(11), // Bypass BE, and write data file directly
MULTI_STATEMENT_STREAMING(12); // multi statement streaming load
private final int flag;

View File

@ -166,6 +166,29 @@ public class TransactionStmtExecutor {
}
}
public static void loadData(long dbId, long tableId, ExplicitTxnState.ExplicitTxnStateItem item,
ConnectContext context) throws StarRocksException {
GlobalTransactionMgr globalTransactionMgr = GlobalStateMgr.getCurrentState().getGlobalTransactionMgr();
ExplicitTxnState explicitTxnState = globalTransactionMgr.getExplicitTxnState(context.getTxnId());
TransactionState transactionState = explicitTxnState.getTransactionState();
if (transactionState.getDbId() == 0) {
transactionState.setDbId(dbId);
DatabaseTransactionMgr databaseTransactionMgr = GlobalStateMgr.getCurrentState().getGlobalTransactionMgr()
.getDatabaseTransactionMgr(dbId);
databaseTransactionMgr.upsertTransactionState(transactionState);
}
transactionState.addTableIdList(tableId);
explicitTxnState.addTransactionItem(item);
context.getState().setOk(item.getLoadedRows(), Ints.saturatedCast(item.getFilteredRows()),
buildMessage(transactionState.getLabel(), TransactionStatus.PREPARE,
transactionState.getTransactionId(), dbId));
LOG.info("load database {} table {} item {} txn {}", dbId, tableId, item, context.getTxnId());
}
public static void commitStmt(ConnectContext context, CommitStmt stmt) {
GlobalTransactionMgr globalTransactionMgr = GlobalStateMgr.getCurrentState().getGlobalTransactionMgr();
ExplicitTxnState explicitTxnState = globalTransactionMgr.getExplicitTxnState(context.getTxnId());
@ -230,30 +253,32 @@ public class TransactionStmtExecutor {
List<ExplicitTxnState.ExplicitTxnStateItem> explicitTxnStateItems
= explicitTxnState.getTransactionStateItems();
List<Long> callbackIds = transactionState.getCallbackId();
Preconditions.checkArgument(explicitTxnStateItems.size() == callbackIds.size());
for (int i = 0; i < explicitTxnStateItems.size(); i++) {
ExplicitTxnState.ExplicitTxnStateItem item = explicitTxnStateItems.get(i);
DmlStmt dmlStmt = item.getDmlStmt();
Table targetTable = GlobalStateMgr.getCurrentState().getLocalMetastore()
.getTable(database.getFullName(), dmlStmt.getTableName().getTbl());
if (dmlStmt instanceof InsertStmt) {
Preconditions.checkArgument(explicitTxnStateItems.size() == callbackIds.size());
Table targetTable = GlobalStateMgr.getCurrentState().getLocalMetastore()
.getTable(database.getFullName(), dmlStmt.getTableName().getTbl());
// collect table-level metrics
TableMetricsEntity entity = TableMetricsRegistry.getInstance().getMetricsEntity(targetTable.getId());
entity.counterInsertLoadFinishedTotal.increase(1L);
entity.counterInsertLoadRowsTotal.increase(item.getLoadedRows());
entity.counterInsertLoadBytesTotal.increase(item.getLoadedBytes());
GlobalStateMgr.getCurrentState().getOperationListenerBus()
.onDMLStmtJobTransactionFinish(transactionState, database, targetTable, DmlType.fromStmt(dmlStmt));
context.getGlobalStateMgr().getLoadMgr().recordFinishedOrCancelledLoadJob(
callbackIds.get(i),
EtlJobType.INSERT,
"",
"");
}
MetricRepo.COUNTER_LOAD_FINISHED.increase(1L);
// collect table-level metrics
TableMetricsEntity entity = TableMetricsRegistry.getInstance().getMetricsEntity(targetTable.getId());
entity.counterInsertLoadFinishedTotal.increase(1L);
entity.counterInsertLoadRowsTotal.increase(item.getLoadedRows());
entity.counterInsertLoadBytesTotal.increase(item.getLoadedBytes());
GlobalStateMgr.getCurrentState().getOperationListenerBus()
.onDMLStmtJobTransactionFinish(transactionState, database, targetTable, DmlType.fromStmt(dmlStmt));
context.getGlobalStateMgr().getLoadMgr().recordFinishedOrCancelledLoadJob(
callbackIds.get(i),
EtlJobType.INSERT,
"",
"");
}
context.getState().setOk(0, 0,
@ -302,13 +327,16 @@ public class TransactionStmtExecutor {
List<ExplicitTxnState.ExplicitTxnStateItem> explicitTxnStateItems
= explicitTxnState.getTransactionStateItems();
List<Long> callbackIds = transactionState.getCallbackId();
Preconditions.checkArgument(explicitTxnStateItems.size() == callbackIds.size());
for (int i = 0; i < explicitTxnStateItems.size(); i++) {
ExplicitTxnState.ExplicitTxnStateItem item = explicitTxnStateItems.get(i);
DmlStmt dmlStmt = item.getDmlStmt();
commitInfos.addAll(item.getTabletCommitInfos());
failInfos.addAll(item.getTabletFailInfos());
loadMgr.recordFinishedOrCancelledLoadJob(callbackIds.get(i), EtlJobType.INSERT, "", "");
if (dmlStmt instanceof InsertStmt) {
Preconditions.checkArgument(explicitTxnStateItems.size() == callbackIds.size());
loadMgr.recordFinishedOrCancelledLoadJob(callbackIds.get(i), EtlJobType.INSERT, "", "");
}
}
transactionMgr.abortTransaction(

View File

@ -604,11 +604,12 @@ public class TransactionLoadActionTest extends StarRocksHttpTestCase {
{
new Expectations() {
{
streamLoadMgr.prepareLoadTask(anyString, anyInt, (HttpHeaders) any, (TransactionResult) any);
streamLoadMgr.prepareLoadTask(anyString, anyString, anyInt, (HttpHeaders) any, (TransactionResult) any);
times = 1;
result = new Delegate<Void>() {
public void prepareLoadTask(String label,
String tableName,
int channelId,
HttpHeaders headers,
TransactionResult resp) throws StarRocksException {
@ -636,11 +637,12 @@ public class TransactionLoadActionTest extends StarRocksHttpTestCase {
{
new Expectations() {
{
streamLoadMgr.prepareLoadTask(anyString, anyInt, (HttpHeaders) any, (TransactionResult) any);
streamLoadMgr.prepareLoadTask(anyString, anyString, anyInt, (HttpHeaders) any, (TransactionResult) any);
times = 1;
result = new Delegate<Void>() {
public void prepareLoadTask(String label,
String tableName,
int channelId,
HttpHeaders headers,
TransactionResult resp) throws StarRocksException {
@ -672,11 +674,12 @@ public class TransactionLoadActionTest extends StarRocksHttpTestCase {
{
new Expectations() {
{
streamLoadMgr.prepareLoadTask(anyString, anyInt, (HttpHeaders) any, (TransactionResult) any);
streamLoadMgr.prepareLoadTask(anyString, anyString, anyInt, (HttpHeaders) any, (TransactionResult) any);
times = 1;
result = new Delegate<Void>() {
public void prepareLoadTask(String label,
String tableName,
int channelId,
HttpHeaders headers,
TransactionResult resp) throws StarRocksException {
@ -864,7 +867,7 @@ public class TransactionLoadActionTest extends StarRocksHttpTestCase {
{
new Expectations() {
{
streamLoadMgr.commitLoadTask(anyString, (TransactionResult) any);
streamLoadMgr.commitLoadTask(anyString, (HttpHeaders) any, (TransactionResult) any);
times = 1;
result = new StarRocksException("commit load task error");
}
@ -887,11 +890,12 @@ public class TransactionLoadActionTest extends StarRocksHttpTestCase {
{
new Expectations() {
{
streamLoadMgr.commitLoadTask(anyString, (TransactionResult) any);
streamLoadMgr.commitLoadTask(anyString, (HttpHeaders) any, (TransactionResult) any);
times = 1;
result = new Delegate<Void>() {
public void commitLoadTask(String label, TransactionResult resp) throws StarRocksException {
public void commitLoadTask(String label, HttpHeaders h, TransactionResult resp)
throws StarRocksException {
resp.addResultEntry(TransactionResult.LABEL_KEY, label);
}
@ -1793,6 +1797,251 @@ public class TransactionLoadActionTest extends StarRocksHttpTestCase {
}
}
@Test
public void multiStatementTransactionBeginTest() throws Exception {
{
new Expectations() {
{
streamLoadMgr.beginMultiStatementLoadTask(
anyString, anyString, anyString, anyString, anyLong,
(TransactionResult) any, (ComputeResource) any);
times = 1;
result = new Delegate<Void>() {
public void beginMultiStatementLoadTask(String dbName, String label,
String user, String clientIp,
long timeoutMillis,
TransactionResult resp,
ComputeResource computeResource) {
resp.addResultEntry(TransactionResult.LABEL_KEY, label);
}
};
}
};
String label = RandomStringUtils.randomAlphanumeric(32);
Request request = newRequest(TransactionOperation.TXN_BEGIN, (uriBuilder, reqBuilder) -> {
reqBuilder.addHeader(DB_KEY, DB_NAME);
reqBuilder.addHeader(TABLE_KEY, TABLE_NAME);
reqBuilder.addHeader(LABEL_KEY, label);
reqBuilder.addHeader(WAREHOUSE_KEY, "default_warehouse");
reqBuilder.addHeader(
SOURCE_TYPE, Objects.toString(LoadJobSourceType.MULTI_STATEMENT_STREAMING.getFlag()));
});
try (Response response = networkClient.newCall(request).execute()) {
Map<String, Object> body = parseResponseBody(response);
assertEquals(OK, body.get(TransactionResult.STATUS_KEY));
assertEquals(label, body.get(TransactionResult.LABEL_KEY));
}
}
{
new Expectations() {
{
streamLoadMgr.beginMultiStatementLoadTask(
anyString, anyString, anyString, anyString, anyLong,
(TransactionResult) any, (ComputeResource) any);
times = 1;
result = new StarRocksException("begin multi statement load task error");
}
};
String label = RandomStringUtils.randomAlphanumeric(32);
Request request = newRequest(TransactionOperation.TXN_BEGIN, (uriBuilder, reqBuilder) -> {
reqBuilder.addHeader(DB_KEY, DB_NAME);
reqBuilder.addHeader(TABLE_KEY, TABLE_NAME);
reqBuilder.addHeader(LABEL_KEY, label);
reqBuilder.addHeader(WAREHOUSE_KEY, "default_warehouse");
reqBuilder.addHeader(
SOURCE_TYPE, Objects.toString(LoadJobSourceType.MULTI_STATEMENT_STREAMING.getFlag()));
});
try (Response response = networkClient.newCall(request).execute()) {
Map<String, Object> body = parseResponseBody(response);
assertEquals(FAILED, body.get(TransactionResult.STATUS_KEY));
assertTrue(Objects.toString(body.get(TransactionResult.MESSAGE_KEY))
.contains("begin multi statement load task error"));
}
}
}
@Test
public void multiStatementTransactionCommitTest() throws Exception {
{
new Expectations() {
{
streamLoadMgr.commitLoadTask(anyString, (HttpHeaders) any, (TransactionResult) any);
times = 1;
result = new StarRocksException("commit multi statement load task error");
}
};
String label = RandomStringUtils.randomAlphanumeric(32);
Request request = newRequest(TransactionOperation.TXN_COMMIT, (uriBuilder, reqBuilder) -> {
reqBuilder.addHeader(DB_KEY, DB_NAME);
reqBuilder.addHeader(TABLE_KEY, TABLE_NAME);
reqBuilder.addHeader(LABEL_KEY, label);
uriBuilder.addParameter(
SOURCE_TYPE, Objects.toString(LoadJobSourceType.MULTI_STATEMENT_STREAMING.getFlag()));
});
try (Response response = networkClient.newCall(request).execute()) {
Map<String, Object> body = parseResponseBody(response);
assertEquals(FAILED, body.get(TransactionResult.STATUS_KEY));
assertTrue(Objects.toString(body.get(TransactionResult.MESSAGE_KEY))
.contains("commit multi statement load task error"));
}
}
{
new Expectations() {
{
streamLoadMgr.commitLoadTask(anyString, (HttpHeaders) any, (TransactionResult) any);
times = 1;
result = new Delegate<Void>() {
public void commitLoadTask(String label, HttpHeaders h, TransactionResult resp) {
resp.addResultEntry(TransactionResult.LABEL_KEY, label);
}
};
}
};
String label = RandomStringUtils.randomAlphanumeric(32);
Request request = newRequest(TransactionOperation.TXN_COMMIT, (uriBuilder, reqBuilder) -> {
reqBuilder.addHeader(DB_KEY, DB_NAME);
reqBuilder.addHeader(TABLE_KEY, TABLE_NAME);
reqBuilder.addHeader(LABEL_KEY, label);
reqBuilder.addHeader(
SOURCE_TYPE, Objects.toString(LoadJobSourceType.MULTI_STATEMENT_STREAMING.getFlag()));
});
try (Response response = networkClient.newCall(request).execute()) {
Map<String, Object> body = parseResponseBody(response);
assertEquals(OK, body.get(TransactionResult.STATUS_KEY));
assertEquals(label, body.get(TransactionResult.LABEL_KEY));
}
}
}
@Test
public void multiStatementTransactionRollbackTest() throws Exception {
{
new Expectations() {
{
streamLoadMgr.rollbackLoadTask(anyString, (TransactionResult) any);
times = 1;
result = new StarRocksException("rollback multi statement load task error");
}
};
String label = RandomStringUtils.randomAlphanumeric(32);
Request request = newRequest(TransactionOperation.TXN_ROLLBACK, (uriBuilder, reqBuilder) -> {
reqBuilder.addHeader(DB_KEY, DB_NAME);
reqBuilder.addHeader(TABLE_KEY, TABLE_NAME);
reqBuilder.addHeader(LABEL_KEY, label);
reqBuilder.addHeader(
SOURCE_TYPE, Objects.toString(LoadJobSourceType.MULTI_STATEMENT_STREAMING.getFlag()));
});
try (Response response = networkClient.newCall(request).execute()) {
Map<String, Object> body = parseResponseBody(response);
assertEquals(FAILED, body.get(TransactionResult.STATUS_KEY));
assertTrue(Objects.toString(body.get(TransactionResult.MESSAGE_KEY))
.contains("rollback multi statement load task error"));
}
}
{
new Expectations() {
{
streamLoadMgr.rollbackLoadTask(anyString, (TransactionResult) any);
times = 1;
result = new Delegate<Void>() {
public void rollbackLoadTask(String label, TransactionResult resp) {
resp.addResultEntry(TransactionResult.LABEL_KEY, label);
}
};
}
};
String label = RandomStringUtils.randomAlphanumeric(32);
Request request = newRequest(TransactionOperation.TXN_ROLLBACK, (uriBuilder, reqBuilder) -> {
reqBuilder.addHeader(DB_KEY, DB_NAME);
reqBuilder.addHeader(TABLE_KEY, TABLE_NAME);
reqBuilder.addHeader(LABEL_KEY, label);
reqBuilder.addHeader(
SOURCE_TYPE, Objects.toString(LoadJobSourceType.MULTI_STATEMENT_STREAMING.getFlag()));
});
try (Response response = networkClient.newCall(request).execute()) {
Map<String, Object> body = parseResponseBody(response);
assertEquals(OK, body.get(TransactionResult.STATUS_KEY));
assertEquals(label, body.get(TransactionResult.LABEL_KEY));
}
}
}
@Test
public void multiStatementTransactionLoadTest() throws Exception {
{
new Expectations() {
{
streamLoadMgr.executeLoadTask(
anyString, anyInt, (HttpHeaders) any, (TransactionResult) any,
anyString, anyString);
times = 1;
result = new Delegate<TNetworkAddress>() {
public TNetworkAddress executeLoadTask(String label,
int channelId,
HttpHeaders headers,
TransactionResult resp,
String dbName,
String tableName) {
resp.setErrorMsg("execute multi statement load task error");
return null;
}
};
}
};
String label = RandomStringUtils.randomAlphanumeric(32);
Request request = newRequest(TransactionOperation.TXN_LOAD, (uriBuilder, reqBuilder) -> {
reqBuilder.addHeader(DB_KEY, DB_NAME);
reqBuilder.addHeader(TABLE_KEY, TABLE_NAME);
reqBuilder.addHeader(LABEL_KEY, label);
reqBuilder.addHeader(
SOURCE_TYPE, Objects.toString(LoadJobSourceType.MULTI_STATEMENT_STREAMING.getFlag()));
});
try (Response response = networkClient.newCall(request).execute()) {
Map<String, Object> body = parseResponseBody(response);
assertEquals(FAILED, body.get(TransactionResult.STATUS_KEY));
assertTrue(Objects.toString(body.get(TransactionResult.MESSAGE_KEY))
.contains("execute multi statement load task error"));
}
}
{
new Expectations() {
{
streamLoadMgr.executeLoadTask(
anyString, anyInt, (HttpHeaders) any, (TransactionResult) any,
anyString, anyString);
times = 1;
result = new TNetworkAddress("localhost", 8040);
}
};
String label = RandomStringUtils.randomAlphanumeric(32);
Request request = newRequest(TransactionOperation.TXN_LOAD, (uriBuilder, reqBuilder) -> {
reqBuilder.addHeader(DB_KEY, DB_NAME);
reqBuilder.addHeader(TABLE_KEY, TABLE_NAME);
reqBuilder.addHeader(LABEL_KEY, label);
reqBuilder.addHeader(
SOURCE_TYPE, Objects.toString(LoadJobSourceType.MULTI_STATEMENT_STREAMING.getFlag()));
});
try (Response response = networkClient.newCall(request).execute()) {
Map<String, Object> body = parseResponseBody(response);
assertEquals(OK, body.get(TransactionResult.STATUS_KEY));
assertTrue(Objects.toString(body.get(TransactionResult.MESSAGE_KEY))
.contains("mock redirect to BE"));
}
}
}
private Request newRequest(TransactionOperation operation) throws Exception {
return newRequest(operation, (uriBuilder, reqBuilder) -> {
});

View File

@ -210,7 +210,7 @@ public class StreamLoadManagerTest {
streamLoadManager.beginLoadTaskFromFrontend(
dbName, tableName, labelName, "", "", timeoutMillis, channelNum, channelId, resp);
List<StreamLoadTask> tasks = streamLoadManager.getTaskByName(labelName);
List<AbstractStreamLoadTask> tasks = streamLoadManager.getTaskByName(labelName);
Assertions.assertEquals(1, tasks.size());
Assertions.assertEquals("label1", tasks.get(0).getLabel());
Assertions.assertEquals("test_db", tasks.get(0).getDBName());
@ -236,7 +236,7 @@ public class StreamLoadManagerTest {
streamLoadManager.beginLoadTaskFromFrontend(
dbName, tableName, labelName2, "", "", timeoutMillis, channelNum, channelId, resp);
List<StreamLoadTask> tasks = streamLoadManager.getTaskByName(null);
List<AbstractStreamLoadTask> tasks = streamLoadManager.getTaskByName(null);
Assertions.assertEquals(2, tasks.size());
Assertions.assertEquals("label1", tasks.get(0).getLabel());
Assertions.assertEquals("label2", tasks.get(1).getLabel());

View File

@ -0,0 +1,138 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.load.streamload;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.OlapTable;
import com.starrocks.common.Config;
import com.starrocks.common.StarRocksException;
import com.starrocks.common.jmockit.Deencapsulation;
import com.starrocks.http.rest.TransactionResult;
import com.starrocks.server.WarehouseManager;
import com.starrocks.thrift.TNetworkAddress;
import com.starrocks.transaction.TransactionState;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.List;
public class StreamLoadMultiStmtTaskTest {
private Database db;
private StreamLoadMultiStmtTask multiTask;
@BeforeEach
public void setUp() {
db = new Database(20000, "test_db");
multiTask = new StreamLoadMultiStmtTask(1L, db, "label_multi", "userA", "127.0.0.1", 10000L,
System.currentTimeMillis(), WarehouseManager.DEFAULT_RESOURCE);
}
@Test
public void testTryLoadExistingSubTask() throws Exception {
// prepare a sub task manually and put into taskMaps
StreamLoadTask sub = new StreamLoadTask(10L, new Database(), new OlapTable(), "label_multi", "userA",
"127.0.0.1", 5000, 1, 0, System.currentTimeMillis(), WarehouseManager.DEFAULT_RESOURCE);
Deencapsulation.setField(sub, "tableName", "t1");
Deencapsulation.invoke(sub, "setState", StreamLoadTask.State.LOADING);
java.util.Map<Integer, TNetworkAddress> addrMap = com.google.common.collect.Maps.newHashMap();
addrMap.put(0, new TNetworkAddress("beHost", 8040));
Deencapsulation.setField(sub, "channelIdToBEHTTPAddress", addrMap);
@SuppressWarnings("unchecked") java.util.Map<String, StreamLoadTask> map =
(java.util.Map<String, StreamLoadTask>) Deencapsulation.getField(multiTask, "taskMaps");
map.put("t1", sub);
TransactionResult resp = new TransactionResult();
TNetworkAddress addr = multiTask.tryLoad(0, "t1", resp);
Assertions.assertTrue(resp.stateOK());
Assertions.assertNotNull(addr);
Assertions.assertEquals("t1", multiTask.getTableName());
}
@Test
public void testExecuteTaskNoSubTask() {
TransactionResult resp = new TransactionResult();
Assertions.assertNull(multiTask.executeTask(0, "unknown", null, resp));
Assertions.assertFalse(resp.stateOK());
}
@Test
public void testPrepareChannelNoSubTask() {
TransactionResult resp = new TransactionResult();
multiTask.prepareChannel(0, "unknown", null, resp);
Assertions.assertFalse(resp.stateOK());
}
@Test
public void testCommitTxnEmpty() throws StarRocksException {
TransactionResult resp = new TransactionResult();
multiTask.commitTxn(null, resp);
Assertions.assertTrue(resp.stateOK());
Assertions.assertEquals("COMMITED", multiTask.getStateName());
}
@Test
public void testManualCancelTask() throws StarRocksException {
TransactionResult resp = new TransactionResult();
multiTask.manualCancelTask(resp);
Assertions.assertEquals("CANCELLED", multiTask.getStateName());
Assertions.assertTrue(multiTask.endTimeMs() > 0);
}
@Test
public void testCheckNeedRemoveAndDurable() throws Exception {
Assertions.assertFalse(multiTask.checkNeedRemove(System.currentTimeMillis(), false));
StreamLoadTask sub = new StreamLoadTask(2L, db, new OlapTable(), "label_sub", "u", "127.0.0.1", 1000, 1, 0,
System.currentTimeMillis(), WarehouseManager.DEFAULT_RESOURCE);
Deencapsulation.setField(sub, "state", StreamLoadTask.State.FINISHED);
@SuppressWarnings("unchecked") java.util.Map<String, StreamLoadTask> map =
(java.util.Map<String, StreamLoadTask>) Deencapsulation.getField(multiTask, "taskMaps");
map.put("tbl", sub);
Deencapsulation.setField(multiTask, "endTimeMs",
System.currentTimeMillis() - (Config.stream_load_task_keep_max_second * 1000L + 10));
Assertions.assertTrue(multiTask.isFinalState());
Assertions.assertTrue(multiTask.checkNeedRemove(System.currentTimeMillis(), false));
}
@Test
public void testToThriftAndStreamLoadThriftEmpty() {
Assertions.assertTrue(multiTask.toThrift().isEmpty());
Assertions.assertTrue(multiTask.toStreamLoadThrift().isEmpty());
}
@Test
public void testCallbackDelegations() throws Exception {
StreamLoadTask sub1 = new StreamLoadTask(3L, db, new OlapTable(), "l1", "u", "127.0.0.1", 1000, 1, 0,
System.currentTimeMillis(), WarehouseManager.DEFAULT_RESOURCE);
StreamLoadTask sub2 = new StreamLoadTask(4L, db, new OlapTable(), "l2", "u", "127.0.0.1", 1000, 1, 0,
System.currentTimeMillis(), WarehouseManager.DEFAULT_RESOURCE);
java.util.Map<String, StreamLoadTask> map =
(java.util.Map<String, StreamLoadTask>) Deencapsulation.getField(multiTask, "taskMaps");
map.put("t1", sub1);
map.put("t2", sub2);
TransactionState txnState = new TransactionState();
multiTask.beforePrepared(txnState);
multiTask.afterPrepared(txnState, true);
multiTask.replayOnPrepared(txnState);
multiTask.beforeCommitted(txnState);
multiTask.afterCommitted(txnState, true);
multiTask.replayOnCommitted(txnState);
multiTask.afterAborted(txnState, true, "reason");
multiTask.replayOnAborted(txnState);
multiTask.afterVisible(txnState, true);
multiTask.replayOnVisible(txnState);
List<List<String>> show = multiTask.getShowInfo();
Assertions.assertTrue(show.isEmpty() || show.size() >= 0);
}
}

View File

@ -17,9 +17,11 @@ package com.starrocks.load.streamload;
import com.google.common.collect.Maps;
import com.starrocks.catalog.Database;
import com.starrocks.catalog.OlapTable;
import com.starrocks.common.Config;
import com.starrocks.common.DuplicatedRequestException;
import com.starrocks.common.ExceptionChecker;
import com.starrocks.common.StarRocksException;
import com.starrocks.common.Status;
import com.starrocks.common.jmockit.Deencapsulation;
import com.starrocks.http.rest.TransactionResult;
import com.starrocks.load.loadv2.LoadJob;
@ -27,9 +29,11 @@ import com.starrocks.load.loadv2.ManualLoadTxnCommitAttachment;
import com.starrocks.load.routineload.RLTaskTxnCommitAttachment;
import com.starrocks.qe.DefaultCoordinator;
import com.starrocks.qe.QeProcessorImpl;
import com.starrocks.qe.scheduler.Coordinator;
import com.starrocks.server.WarehouseManager;
import com.starrocks.task.LoadEtlTask;
import com.starrocks.thrift.TLoadInfo;
import com.starrocks.thrift.TNetworkAddress;
import com.starrocks.thrift.TUniqueId;
import com.starrocks.transaction.TransactionState;
import com.starrocks.warehouse.WarehouseIdleChecker;
@ -43,7 +47,9 @@ import org.mockito.Mockito;
import java.util.Map;
import static com.starrocks.common.ErrorCode.ERR_NO_PARTITIONS_HAVE_DATA_LOAD;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -157,7 +163,7 @@ public class StreamLoadTaskTest {
streamLoadTask.setLoadState(attachment, "Error message");
TLoadInfo loadInfo = streamLoadTask.toThrift();
TLoadInfo loadInfo = streamLoadTask.toThrift().get(0);
Assertions.assertEquals(100L, loadInfo.getNum_sink_rows());
Assertions.assertEquals(10L, loadInfo.getNum_filtered_rows());
@ -178,7 +184,7 @@ public class StreamLoadTaskTest {
streamLoadTask.setLoadState(attachment, "Another error message");
TLoadInfo loadInfo = streamLoadTask.toThrift();
TLoadInfo loadInfo = streamLoadTask.toThrift().get(0);
Assertions.assertEquals(200L, loadInfo.getNum_sink_rows());
Assertions.assertEquals(20L, loadInfo.getNum_filtered_rows());
@ -226,4 +232,224 @@ public class StreamLoadTaskTest {
streamLoadTask1.beginTxn(0, 1, requestId, coordinator, resp);
Assertions.assertTrue(resp.stateOK());
}
@Test
public void testBeginTxnInVariousStates() throws StarRocksException {
StreamLoadTask taskReal = new StreamLoadTask(999, new Database(), new OlapTable(), "t_label", "u", "127.0.0.1",
10000, 2, 0, System.currentTimeMillis(), WarehouseManager.DEFAULT_RESOURCE);
StreamLoadTask task = Mockito.spy(taskReal);
doNothing().when(task).unprotectedBeginTxn(any(), any());
TransactionResult resp = new TransactionResult();
task.beginTxn(0, 2, null,
new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "fe"), resp);
Assertions.assertTrue(resp.stateOK());
for (StreamLoadTask.State st : java.util.List.of(StreamLoadTask.State.PREPARED,
StreamLoadTask.State.COMMITED,
StreamLoadTask.State.CANCELLED,
StreamLoadTask.State.FINISHED)) {
Deencapsulation.invoke(task, "setState", st);
TransactionResult r = new TransactionResult();
task.beginTxn(0, 2, null,
new TransactionState.TxnCoordinator(TransactionState.TxnSourceType.FE, "fe"), r);
Assertions.assertTrue(r.stateOK());
}
}
@Test
public void testTryLoadRedirectSuccessAndFailure() throws StarRocksException {
StreamLoadTask task = new StreamLoadTask(1000, new Database(), new OlapTable(), "t_label", "u", "127.0.0.1",
10000, 1, 0, System.currentTimeMillis(), WarehouseManager.DEFAULT_RESOURCE);
Deencapsulation.setField(task, "tableName", "tbl");
Deencapsulation.invoke(task, "setState", StreamLoadTask.State.LOADING);
java.util.Map<Integer, TNetworkAddress> addrMap = com.google.common.collect.Maps.newHashMap();
addrMap.put(0, new TNetworkAddress("beHost", 8040));
Deencapsulation.setField(task, "channelIdToBEHTTPAddress", addrMap);
TransactionResult respOk = new TransactionResult();
TNetworkAddress ret = task.tryLoad(0, task.getTableName(), respOk);
Assertions.assertNotNull(ret);
StreamLoadTask task2 = new StreamLoadTask(1001, new Database(), new OlapTable(), "t_label2", "u", "127.0.0.1",
10000, 1, 0, System.currentTimeMillis(), WarehouseManager.DEFAULT_RESOURCE);
Deencapsulation.setField(task2, "tableName", "tbl2");
Deencapsulation.invoke(task2, "setState", StreamLoadTask.State.LOADING);
Deencapsulation.setField(task2, "channelIdToBEHTTPAddress", com.google.common.collect.Maps.newHashMap());
TransactionResult respErr = new TransactionResult();
task2.tryLoad(0, task2.getTableName(), respErr);
Assertions.assertFalse(respErr.stateOK());
}
@Test
public void testExecuteTaskLoadingBranches() {
StreamLoadTask task = new StreamLoadTask(1002, new Database(), new OlapTable(), "t_label3", "u", "127.0.0.1",
10000, 1, 0, System.currentTimeMillis(), WarehouseManager.DEFAULT_RESOURCE);
Deencapsulation.setField(task, "tableName", "tbl3");
Deencapsulation.invoke(task, "setState", StreamLoadTask.State.LOADING);
java.util.Map<Integer, TNetworkAddress> addrMap = com.google.common.collect.Maps.newHashMap();
addrMap.put(0, new TNetworkAddress("beHost", 8040));
Deencapsulation.setField(task, "channelIdToBEHTTPAddress", addrMap);
TransactionResult resp = new TransactionResult();
TNetworkAddress addr = task.executeTask(0, task.getTableName(), null, resp);
Assertions.assertNotNull(addr);
StreamLoadTask task2 = new StreamLoadTask(1003, new Database(), new OlapTable(), "t_label4", "u", "127.0.0.1",
10000, 1, 0, System.currentTimeMillis(), WarehouseManager.DEFAULT_RESOURCE);
Deencapsulation.setField(task2, "tableName", "tbl4");
Deencapsulation.invoke(task2, "setState", StreamLoadTask.State.LOADING);
Deencapsulation.setField(task2, "channelIdToBEHTTPAddress", com.google.common.collect.Maps.newHashMap());
TransactionResult resp2 = new TransactionResult();
task2.executeTask(0, task2.getTableName(), null, resp2);
Assertions.assertFalse(resp2.stateOK());
}
@Test
public void testWaitCoordFinishDataQualityFail(@Mocked Coordinator c) {
StreamLoadTask task = new StreamLoadTask(1005, new Database(), new OlapTable(), "t_label6", "u", "127.0.0.1",
10000, 1, 0, System.currentTimeMillis(), WarehouseManager.DEFAULT_RESOURCE);
Deencapsulation.invoke(task, "setState", StreamLoadTask.State.PREPARING);
Deencapsulation.setField(task, "coord", c);
StreamLoadKvParams params = new StreamLoadKvParams(java.util.Map.of("max_filter_ratio", "0.1"));
Deencapsulation.setField(task, "streamLoadParams", params);
new Expectations() {
{
c.join(anyInt);
result = true;
c.getExecStatus();
result = Status.OK;
c.getLoadCounters();
result = java.util.Map.of("dpp.norm.ALL", "100", "dpp.abnorm.ALL", "50",
"unselected.rows", "0", "loaded.bytes", "10");
c.isEnableLoadProfile();
result = false;
c.getTrackingUrl();
result = "url";
}
};
TransactionResult resp = new TransactionResult();
task.waitCoordFinish(resp);
Assertions.assertFalse(resp.stateOK());
}
@Test
public void testWaitCoordFinishSuccess(@Mocked Coordinator c) {
StreamLoadTask task = new StreamLoadTask(1006, new Database(), new OlapTable(), "t_label7", "u", "127.0.0.1",
10000, 1, 0, System.currentTimeMillis(), WarehouseManager.DEFAULT_RESOURCE);
Deencapsulation.invoke(task, "setState", StreamLoadTask.State.PREPARING);
Deencapsulation.setField(task, "coord", c);
StreamLoadKvParams params = new StreamLoadKvParams(java.util.Map.of("max_filter_ratio", "0.5"));
Deencapsulation.setField(task, "streamLoadParams", params);
new Expectations() {
{
c.join(anyInt);
result = true;
c.getExecStatus();
result = Status.OK;
c.getLoadCounters();
result = java.util.Map.of("dpp.norm.ALL", "100", "dpp.abnorm.ALL", "10",
"unselected.rows", "0", "loaded.bytes", "10");
c.isEnableLoadProfile();
result = false;
c.getTrackingUrl();
result = "url";
}
};
TransactionResult resp = new TransactionResult();
task.waitCoordFinish(resp);
Assertions.assertTrue(resp.stateOK());
}
@Test
public void testCommitTxnNotPrepared() throws StarRocksException {
StreamLoadTask task = new StreamLoadTask(1007, new Database(), new OlapTable(), "t_label8", "u", "127.0.0.1",
10000, 1, 0, System.currentTimeMillis(), WarehouseManager.DEFAULT_RESOURCE);
Deencapsulation.invoke(task, "setState", StreamLoadTask.State.BEFORE_LOAD);
TransactionResult resp = new TransactionResult();
task.commitTxn(null, resp);
Assertions.assertTrue(resp.stateOK());
}
@Test
public void testManualCancelWhileCommitting() throws StarRocksException {
StreamLoadTask task = new StreamLoadTask(1008, new Database(), new OlapTable(), "t_label9", "u", "127.0.0.1",
10000, 1, 0, System.currentTimeMillis(), WarehouseManager.DEFAULT_RESOURCE);
Deencapsulation.setField(task, "isCommitting", true);
TransactionResult resp = new TransactionResult();
task.manualCancelTask(resp);
Assertions.assertTrue(resp.stateOK());
}
@Test
public void testCancelTaskBranches() {
StreamLoadTask task = new StreamLoadTask(1009, new Database(), new OlapTable(), "t_label10", "u", "127.0.0.1",
10000, 1, 0, System.currentTimeMillis(), WarehouseManager.DEFAULT_RESOURCE);
Deencapsulation.invoke(task, "setState", StreamLoadTask.State.CANCELLED);
Deencapsulation.setField(task, "errorMsg", "err");
String r = task.cancelTask("manual");
Assertions.assertTrue(r.contains("CANCELLED"));
StreamLoadTask task2 = new StreamLoadTask(1010, new Database(), new OlapTable(), "t_label11", "u", "127.0.0.1",
10000, 2, 0, System.currentTimeMillis(), WarehouseManager.DEFAULT_RESOURCE);
String r2 = task2.cancelTask("manual");
Assertions.assertNull(r2);
}
@Test
public void testCheckNeedRemoveAndDataQuality() {
StreamLoadTask task = new StreamLoadTask(1011, new Database(), new OlapTable(), "t_label12", "u", "127.0.0.1",
10000, 1, 0, System.currentTimeMillis(), WarehouseManager.DEFAULT_RESOURCE);
Deencapsulation.invoke(task, "setState", StreamLoadTask.State.FINISHED);
Deencapsulation.setField(task, "endTimeMs", System.currentTimeMillis());
Assertions.assertFalse(task.checkNeedRemove(System.currentTimeMillis(), false));
long end = (long) Deencapsulation.getField(task, "endTimeMs");
long later = end + Config.stream_load_task_keep_max_second * 1000L + 10;
Assertions.assertTrue(task.checkNeedRemove(later, false));
Assertions.assertTrue((Boolean) Deencapsulation.invoke(task, "checkDataQuality"));
}
@Test
public void testStateHelpersAndStringByType() {
StreamLoadTask task = new StreamLoadTask(1012, new Database(), new OlapTable(), "t_label13", "u", "127.0.0.1",
10000, 1, 0, System.currentTimeMillis(), WarehouseManager.DEFAULT_RESOURCE);
Assertions.assertEquals("PARALLEL_STREAM_LOAD", task.getStringByType());
task.setType(StreamLoadTask.Type.ROUTINE_LOAD);
Assertions.assertEquals("ROUTINE_LOAD", task.getStringByType());
task.setType(StreamLoadTask.Type.STREAM_LOAD);
Assertions.assertEquals("STREAM_LOAD", task.getStringByType());
task.setType(StreamLoadTask.Type.MULTI_STATEMENT_STREAM_LOAD);
Assertions.assertEquals("UNKNOWN", task.getStringByType());
}
@Test
public void testShowInfoTrackingUrl() {
StreamLoadTask task = new StreamLoadTask(1013, new Database(), new OlapTable(), "t_label14", "u", "127.0.0.1",
10000, 1, 0, System.currentTimeMillis(), WarehouseManager.DEFAULT_RESOURCE);
Deencapsulation.setField(task, "trackingUrl", "url");
java.util.List<java.util.List<String>> rows = task.getShowInfo();
Assertions.assertEquals(1, rows.size());
int lastIdx = rows.get(0).size() - 1;
Assertions.assertTrue(rows.get(0).get(lastIdx).startsWith("select tracking_log"));
}
@Test
public void testToThriftProgress() {
StreamLoadTask task = new StreamLoadTask(1014, new Database(), new OlapTable(), "t_label15", "u", "127.0.0.1",
10000, 1, 0, System.currentTimeMillis(), WarehouseManager.DEFAULT_RESOURCE);
Deencapsulation.invoke(task, "setState", StreamLoadTask.State.FINISHED);
var info = task.toThrift().get(0);
Assertions.assertEquals("100%", info.getProgress());
StreamLoadTask task2 = new StreamLoadTask(1015, new Database(), new OlapTable(), "t_label16", "u", "127.0.0.1",
10000, 1, 0, System.currentTimeMillis(), WarehouseManager.DEFAULT_RESOURCE);
var info2 = task2.toThrift().get(0);
Assertions.assertEquals("0%", info2.getProgress());
}
@Test
public void testGsonPreAndPostProcess() throws Exception {
StreamLoadTask task = new StreamLoadTask(1016, new Database(), new OlapTable(), "t_label17", "u", "127.0.0.1",
10000, 1, 0, System.currentTimeMillis(), WarehouseManager.DEFAULT_RESOURCE);
TUniqueId id = new TUniqueId(1, 2);
task.setTUniqueId(id);
task.gsonPreProcess();
Deencapsulation.setField(task, "loadId", null);
task.gsonPostProcess();
Assertions.assertEquals(id.getHi(), task.getTUniqueId().getHi());
}
}

View File

@ -137,6 +137,7 @@ public class OperationTypeTest {
Assertions.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_DROP_FUNCTION_V2));
Assertions.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_UPDATE_USER_PROP_V3));
Assertions.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_CREATE_STREAM_LOAD_TASK_V2));
Assertions.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_CREATE_MULTI_STMT_STREAM_LOAD_TASK));
Assertions.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_SET_DEFAULT_STORAGE_VOLUME));
Assertions.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_CREATE_STORAGE_VOLUME));
Assertions.assertTrue(OperationType.IGNORABLE_OPERATIONS.contains(OperationType.OP_UPDATE_STORAGE_VOLUME));

View File

@ -115,6 +115,7 @@ struct TProxyResult {
struct TStreamLoadChannel {
1: optional string label
2: optional i32 channel_id
3: optional string table_name
}
struct TGetTabletsInfoRequest {

View File

@ -0,0 +1,311 @@
-- name: test_multi_stream_load_single_table_stmt
create database db_${uuid0};
-- result:
-- !result
use db_${uuid0};
-- result:
-- !result
CREATE TABLE `test` (
`id` int
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
"replication_num" = "1"
);
-- result:
-- !result
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "db:db_${uuid0}" -H "source_type:12" -XPOST ${url}/api/transaction/begin
-- result:
0
{
"Status": "OK",
"Message": ""
}
-- !result
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "Expect:100-continue" -H "db:db_${uuid0}" -H "table:test" -H "source_type:12" -d '1' -X PUT ${url}/api/transaction/load
-- result:
0
{
"Status": "OK",
"Message": "",
"Label": "llal_fda89880f9794af4a7be32229f1f526d",
"TxnId": 3005,
"LoadBytes": 1,
"StreamLoadPlanTimeMs": 0,
"ReceivedDataTimeMs": 0
}
-- !result
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "db:db_${uuid0}" -H "source_type:12" -XPOST ${url}/api/transaction/commit
-- result:
0
{
"Label": "llal_fda89880f9794af4a7be32229f1f526d",
"Status": "OK",
"TxnId": 3005,
"ChannelId": 0,
"Message": "",
"Prepared Channel Num": 1
}
-- !result
select * from test;
-- result:
1
-- !result
-- name: test_multi_stream_load_single_table_multi_stmt
create database db_${uuid0};
-- result:
-- !result
use db_${uuid0};
-- result:
-- !result
CREATE TABLE `test` (
`id` int
)
PROPERTIES (
"replication_num" = "1"
);
-- result:
-- !result
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "db:db_${uuid0}" -H "source_type:12" -XPOST ${url}/api/transaction/begin
-- result:
0
{
"Status": "OK",
"Message": ""
}
-- !result
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "Expect:100-continue" -H "db:db_${uuid0}" -H "table:test" -H "source_type:12" -H "format:json" -d '{"id":"1"}' -X PUT ${url}/api/transaction/load
-- result:
0
{
"Status": "OK",
"Message": "",
"Label": "llal_f119223b1ca143b38cb6dd5a27f94ce8",
"TxnId": 3004,
"LoadBytes": 10,
"StreamLoadPlanTimeMs": 0,
"ReceivedDataTimeMs": 0
}
-- !result
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "Expect:100-continue" -H "db:db_${uuid0}" -H "table:test" -H "source_type:12" -H "format:json" -d '{"id":"2"}' -X PUT ${url}/api/transaction/load
-- result:
0
{
"Status": "OK",
"Message": "",
"Label": "llal_f119223b1ca143b38cb6dd5a27f94ce8",
"TxnId": 3004,
"LoadBytes": 10,
"StreamLoadPlanTimeMs": 0,
"ReceivedDataTimeMs": 0
}
-- !result
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "Expect:100-continue" -H "db:db_${uuid0}" -H "table:test" -H "source_type:12" -H "format:json" -d '{"id":"3"}' -X PUT ${url}/api/transaction/load
-- result:
0
{
"Status": "OK",
"Message": "",
"Label": "llal_f119223b1ca143b38cb6dd5a27f94ce8",
"TxnId": 3004,
"LoadBytes": 10,
"StreamLoadPlanTimeMs": 0,
"ReceivedDataTimeMs": 0
}
-- !result
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "db:db_${uuid0}" -H "source_type:12" -XPOST ${url}/api/transaction/commit
-- result:
0
{
"Label": "llal_f119223b1ca143b38cb6dd5a27f94ce8",
"Status": "OK",
"TxnId": 3004,
"ChannelId": 0,
"Message": "",
"Prepared Channel Num": 1
}
-- !result
select * from test;
-- result:
1
3
2
-- !result
-- name: test_multi_stream_load_multi_table_multi_stmt
create database db_${uuid0};
-- result:
-- !result
use db_${uuid0};
-- result:
-- !result
CREATE TABLE `test1` (
`id` int
);
-- result:
-- !result
CREATE TABLE `test2` (
`id` int
);
-- result:
-- !result
CREATE TABLE `test3` (
`id` int
);
-- result:
-- !result
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "db:db_${uuid0}" -H "source_type:12" -XPOST ${url}/api/transaction/begin
-- result:
0
{
"Status": "OK",
"Message": ""
}
-- !result
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "Expect:100-continue" -H "db:db_${uuid0}" -H "table:test1" -H "source_type:12" -H "format:json" -d '{"id":"1"}' -X PUT ${url}/api/transaction/load
-- result:
0
{
"Status": "OK",
"Message": "",
"Label": "llal_14fbd896490246ae86e8d3027d1e19d9",
"TxnId": 3006,
"LoadBytes": 10,
"StreamLoadPlanTimeMs": 0,
"ReceivedDataTimeMs": 0
}
-- !result
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "Expect:100-continue" -H "db:db_${uuid0}" -H "table:test1" -H "source_type:12" -H "format:json" -d '{"id":"2"}' -X PUT ${url}/api/transaction/load
-- result:
0
{
"Status": "OK",
"Message": "",
"Label": "llal_14fbd896490246ae86e8d3027d1e19d9",
"TxnId": 3006,
"LoadBytes": 10,
"StreamLoadPlanTimeMs": 0,
"ReceivedDataTimeMs": 0
}
-- !result
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "Expect:100-continue" -H "db:db_${uuid0}" -H "table:test1" -H "source_type:12" -H "format:json" -d '{"id":"3"}' -X PUT ${url}/api/transaction/load
-- result:
0
{
"Status": "OK",
"Message": "",
"Label": "llal_14fbd896490246ae86e8d3027d1e19d9",
"TxnId": 3006,
"LoadBytes": 10,
"StreamLoadPlanTimeMs": 0,
"ReceivedDataTimeMs": 0
}
-- !result
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "Expect:100-continue" -H "db:db_${uuid0}" -H "table:test2" -H "source_type:12" -H "format:json" -d '{"id":"1"}' -X PUT ${url}/api/transaction/load
-- result:
0
{
"Status": "OK",
"Message": "",
"Label": "llal_14fbd896490246ae86e8d3027d1e19d9",
"TxnId": 3006,
"LoadBytes": 10,
"StreamLoadPlanTimeMs": 0,
"ReceivedDataTimeMs": 0
}
-- !result
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "Expect:100-continue" -H "db:db_${uuid0}" -H "table:test2" -H "source_type:12" -H "format:json" -d '{"id":"2"}' -X PUT ${url}/api/transaction/load
-- result:
0
{
"Status": "OK",
"Message": "",
"Label": "llal_14fbd896490246ae86e8d3027d1e19d9",
"TxnId": 3006,
"LoadBytes": 10,
"StreamLoadPlanTimeMs": 0,
"ReceivedDataTimeMs": 0
}
-- !result
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "Expect:100-continue" -H "db:db_${uuid0}" -H "table:test2" -H "source_type:12" -H "format:json" -d '{"id":"3"}' -X PUT ${url}/api/transaction/load
-- result:
0
{
"Status": "OK",
"Message": "",
"Label": "llal_14fbd896490246ae86e8d3027d1e19d9",
"TxnId": 3006,
"LoadBytes": 10,
"StreamLoadPlanTimeMs": 0,
"ReceivedDataTimeMs": 0
}
-- !result
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "Expect:100-continue" -H "db:db_${uuid0}" -H "table:test3" -H "source_type:12" -H "format:json" -d '{"id":"1"}' -X PUT ${url}/api/transaction/load
-- result:
0
{
"Status": "OK",
"Message": "",
"Label": "llal_14fbd896490246ae86e8d3027d1e19d9",
"TxnId": 3006,
"LoadBytes": 10,
"StreamLoadPlanTimeMs": 0,
"ReceivedDataTimeMs": 0
}
-- !result
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "Expect:100-continue" -H "db:db_${uuid0}" -H "table:test3" -H "source_type:12" -H "format:json" -d '{"id":"2"}' -X PUT ${url}/api/transaction/load
-- result:
0
{
"Status": "OK",
"Message": "",
"Label": "llal_14fbd896490246ae86e8d3027d1e19d9",
"TxnId": 3006,
"LoadBytes": 10,
"StreamLoadPlanTimeMs": 0,
"ReceivedDataTimeMs": 0
}
-- !result
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "Expect:100-continue" -H "db:db_${uuid0}" -H "table:test3" -H "source_type:12" -H "format:json" -d '{"id":"3"}' -X PUT ${url}/api/transaction/load
-- result:
0
{
"Status": "OK",
"Message": "",
"Label": "llal_14fbd896490246ae86e8d3027d1e19d9",
"TxnId": 3006,
"LoadBytes": 10,
"StreamLoadPlanTimeMs": 0,
"ReceivedDataTimeMs": 0
}
-- !result
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "db:db_${uuid0}" -H "source_type:12" -XPOST ${url}/api/transaction/commit
-- result:
0
{
"Label": "llal_14fbd896490246ae86e8d3027d1e19d9",
"Status": "OK",
"TxnId": 3006,
"ChannelId": 0,
"Message": "",
"Prepared Channel Num": 1
}
-- !result
select * from test1;
-- result:
1
3
2
-- !result
select * from test2;
-- result:
2
3
1
-- !result
select * from test3;
-- result:
1
3
2
-- !result

View File

@ -0,0 +1,72 @@
-- name: test_multi_stream_load_single_table_stmt
create database db_${uuid0};
use db_${uuid0};
CREATE TABLE `test` (
`id` int
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 10
PROPERTIES (
"replication_num" = "1"
);
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "db:db_${uuid0}" -H "source_type:12" -XPOST ${url}/api/transaction/begin
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "Expect:100-continue" -H "db:db_${uuid0}" -H "table:test" -H "source_type:12" -d '1' -X PUT ${url}/api/transaction/load
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "db:db_${uuid0}" -H "source_type:12" -XPOST ${url}/api/transaction/commit
select * from test;
-- name: test_multi_stream_load_single_table_multi_stmt
create database db_${uuid0};
use db_${uuid0};
CREATE TABLE `test` (
`id` int
)
PROPERTIES (
"replication_num" = "1"
);
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "db:db_${uuid0}" -H "source_type:12" -XPOST ${url}/api/transaction/begin
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "Expect:100-continue" -H "db:db_${uuid0}" -H "table:test" -H "source_type:12" -H "format:json" -d '{"id":"1"}' -X PUT ${url}/api/transaction/load
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "Expect:100-continue" -H "db:db_${uuid0}" -H "table:test" -H "source_type:12" -H "format:json" -d '{"id":"2"}' -X PUT ${url}/api/transaction/load
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "Expect:100-continue" -H "db:db_${uuid0}" -H "table:test" -H "source_type:12" -H "format:json" -d '{"id":"3"}' -X PUT ${url}/api/transaction/load
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "db:db_${uuid0}" -H "source_type:12" -XPOST ${url}/api/transaction/commit
select * from test;
-- name: test_multi_stream_load_multi_table_multi_stmt
create database db_${uuid0};
use db_${uuid0};
CREATE TABLE `test1` (
`id` int
);
CREATE TABLE `test2` (
`id` int
);
CREATE TABLE `test3` (
`id` int
);
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "db:db_${uuid0}" -H "source_type:12" -XPOST ${url}/api/transaction/begin
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "Expect:100-continue" -H "db:db_${uuid0}" -H "table:test1" -H "source_type:12" -H "format:json" -d '{"id":"1"}' -X PUT ${url}/api/transaction/load
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "Expect:100-continue" -H "db:db_${uuid0}" -H "table:test1" -H "source_type:12" -H "format:json" -d '{"id":"2"}' -X PUT ${url}/api/transaction/load
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "Expect:100-continue" -H "db:db_${uuid0}" -H "table:test1" -H "source_type:12" -H "format:json" -d '{"id":"3"}' -X PUT ${url}/api/transaction/load
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "Expect:100-continue" -H "db:db_${uuid0}" -H "table:test2" -H "source_type:12" -H "format:json" -d '{"id":"1"}' -X PUT ${url}/api/transaction/load
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "Expect:100-continue" -H "db:db_${uuid0}" -H "table:test2" -H "source_type:12" -H "format:json" -d '{"id":"2"}' -X PUT ${url}/api/transaction/load
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "Expect:100-continue" -H "db:db_${uuid0}" -H "table:test2" -H "source_type:12" -H "format:json" -d '{"id":"3"}' -X PUT ${url}/api/transaction/load
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "Expect:100-continue" -H "db:db_${uuid0}" -H "table:test3" -H "source_type:12" -H "format:json" -d '{"id":"1"}' -X PUT ${url}/api/transaction/load
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "Expect:100-continue" -H "db:db_${uuid0}" -H "table:test3" -H "source_type:12" -H "format:json" -d '{"id":"2"}' -X PUT ${url}/api/transaction/load
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "Expect:100-continue" -H "db:db_${uuid0}" -H "table:test3" -H "source_type:12" -H "format:json" -d '{"id":"3"}' -X PUT ${url}/api/transaction/load
[UC]shell: curl --location-trusted -u root: -H "label:llal_${uuid0}" -H "db:db_${uuid0}" -H "source_type:12" -XPOST ${url}/api/transaction/commit
select * from test1;
select * from test2;
select * from test3;