[Enhancement] to avoid memory stats inaccurate case (backport #63788) (#63810)

Signed-off-by: yan zhang <dirtysalt1987@gmail.com>
Co-authored-by: yan zhang <dirtysalt1987@gmail.com>
This commit is contained in:
mergify[bot] 2025-10-08 11:19:32 +08:00 committed by GitHub
parent c42e55c57b
commit 34bba8b645
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 9 additions and 3 deletions

View File

@ -80,12 +80,19 @@ Status ChunkSource::buffer_next_batch_chunks_blocking(RuntimeState* state, size_
if (_status.is_end_of_file()) {
chunk->owner_info().set_owner_id(owner_id, true);
_chunk_buffer.put(_scan_operator_seq, std::move(chunk), std::move(_chunk_token));
break;
} else if (_status.is_time_out()) {
chunk->owner_info().set_owner_id(owner_id, false);
_chunk_buffer.put(_scan_operator_seq, std::move(chunk), std::move(_chunk_token));
_status = Status::OK();
break;
} else if (_status.is_eagain()) {
// EAGAIN is normal case, but sleep a while to avoid busy loop
SleepFor(MonoDelta::FromNanoseconds(workgroup::WorkGroup::YIELD_PREEMPT_MAX_TIME_SPENT));
_status = Status::OK();
} else {
break;
}
break;
}
// schema won't be used by the computing layer, here we just reset it.

View File

@ -17,7 +17,6 @@
#include "exec/connector_scan_node.h"
#include "exec/pipeline/pipeline_driver.h"
#include "exec/pipeline/scan/balanced_chunk_buffer.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
namespace starrocks::pipeline {
@ -763,7 +762,7 @@ Status ConnectorChunkSource::_read_chunk(RuntimeState* state, ChunkPtr* chunk) {
RETURN_IF_ERROR(_open_data_source(state, &mem_alloc_failed));
if (mem_alloc_failed) {
_mem_alloc_failed_count += 1;
return Status::TimedOut("");
return Status::EAgain("");
}
if (state->is_cancelled()) {
return Status::Cancelled("canceled state");