[Enhancement] to avoid memory stats inaccurate case (#63788)
Signed-off-by: yan zhang <dirtysalt1987@gmail.com>
This commit is contained in:
parent
3e1b09d470
commit
ece0f0d560
|
|
@ -80,12 +80,19 @@ Status ChunkSource::buffer_next_batch_chunks_blocking(RuntimeState* state, size_
|
|||
if (_status.is_end_of_file()) {
|
||||
chunk->owner_info().set_owner_id(owner_id, true);
|
||||
_chunk_buffer.put(_scan_operator_seq, std::move(chunk), std::move(_chunk_token));
|
||||
break;
|
||||
} else if (_status.is_time_out()) {
|
||||
chunk->owner_info().set_owner_id(owner_id, false);
|
||||
_chunk_buffer.put(_scan_operator_seq, std::move(chunk), std::move(_chunk_token));
|
||||
_status = Status::OK();
|
||||
break;
|
||||
} else if (_status.is_eagain()) {
|
||||
// EAGAIN is normal case, but sleep a while to avoid busy loop
|
||||
SleepFor(MonoDelta::FromNanoseconds(workgroup::WorkGroup::YIELD_PREEMPT_MAX_TIME_SPENT));
|
||||
_status = Status::OK();
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
// schema won't be used by the computing layer, here we just reset it.
|
||||
|
|
|
|||
|
|
@ -17,7 +17,6 @@
|
|||
#include "exec/connector_scan_node.h"
|
||||
#include "exec/pipeline/pipeline_driver.h"
|
||||
#include "exec/pipeline/scan/balanced_chunk_buffer.h"
|
||||
#include "runtime/exec_env.h"
|
||||
#include "runtime/runtime_state.h"
|
||||
|
||||
namespace starrocks::pipeline {
|
||||
|
|
@ -763,7 +762,7 @@ Status ConnectorChunkSource::_read_chunk(RuntimeState* state, ChunkPtr* chunk) {
|
|||
RETURN_IF_ERROR(_open_data_source(state, &mem_alloc_failed));
|
||||
if (mem_alloc_failed) {
|
||||
_mem_alloc_failed_count += 1;
|
||||
return Status::TimedOut("");
|
||||
return Status::EAgain("");
|
||||
}
|
||||
if (state->is_cancelled()) {
|
||||
return Status::Cancelled("canceled state");
|
||||
|
|
|
|||
Loading…
Reference in New Issue