[Enhancement] Enforce request consistency for channel stream load (#63347)

Signed-off-by: meegoo <meegoo.sr@gmail.com>
This commit is contained in:
meegoo 2025-09-22 19:55:21 -07:00 committed by GitHub
parent eb306cde2d
commit 9dad29ab51
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 11 additions and 3 deletions

View File

@ -499,11 +499,14 @@ Status TransactionStreamLoadAction::_parse_request(HttpRequest* http_req, Stream
}
Status TransactionStreamLoadAction::_exec_plan_fragment(HttpRequest* http_req, StreamLoadContext* ctx) {
if (ctx->is_channel_stream_load_context()) {
return Status::OK();
}
TStreamLoadPutRequest request;
RETURN_IF_ERROR(_parse_request(http_req, ctx, request));
// Enforce request parameter consistency across multiple HTTP calls of the same transaction.
// A streaming load may arrive in several requests (e.g., chunked uploads). We cache the first
// TStreamLoadPutRequest in ctx->request and require subsequent requests to be identical
// (headers like columns, format, separators, partitions, etc.). This prevents parameter drift
// that could lead to undefined behavior or loading into an unexpected schema.
// Note: For channel stream load, this check still applies; planning is skipped later.
if (ctx->request.db != "") {
if (ctx->request != request) {
return Status::InternalError("load request not equal last.");
@ -511,6 +514,11 @@ Status TransactionStreamLoadAction::_exec_plan_fragment(HttpRequest* http_req, S
} else {
ctx->request = request;
}
if (ctx->is_channel_stream_load_context()) {
// Channel stream load is planned elsewhere; here we only validate request equality above
// and return. The data path will proceed without re-planning.
return Status::OK();
}
// setup stream pipe
auto pipe = _exec_env->load_stream_mgr()->get(ctx->id);
if (pipe == nullptr) {