[Enhancement] extend query trace to support POV of thread (#12982)

yan.zhang 2022-11-07 11:59:04 +08:00 committed by GitHub
parent dcdaccc4db
commit 3abd77e495
7 changed files with 169 additions and 27 deletions

View File

@ -173,7 +173,7 @@ static inline bool is_multilane(pipeline::OperatorPtr& op) {
StatusOr<DriverState> PipelineDriver::process(RuntimeState* runtime_state, int worker_id) {
COUNTER_UPDATE(_schedule_counter, 1);
SCOPED_TIMER(_active_timer);
QUERY_TRACE_SCOPED("process", "");
QUERY_TRACE_SCOPED("process", _driver_name);
set_driver_state(DriverState::RUNNING);
size_t total_chunks_moved = 0;
size_t total_rows_moved = 0;
@ -387,7 +387,7 @@ void PipelineDriver::_close_operators(RuntimeState* runtime_state) {
void PipelineDriver::finalize(RuntimeState* runtime_state, DriverState state) {
VLOG_ROW << "[Driver] finalize, driver=" << this;
DCHECK(state == DriverState::FINISH || state == DriverState::CANCELED || state == DriverState::INTERNAL_ERROR);
QUERY_TRACE_BEGIN("finalize", "");
QUERY_TRACE_BEGIN("finalize", _driver_name);
_close_operators(runtime_state);
set_driver_state(state);
@ -433,7 +433,7 @@ void PipelineDriver::finalize(RuntimeState* runtime_state, DriverState state) {
wg->decr_num_queries();
}
}
QUERY_TRACE_END("finalize", "");
QUERY_TRACE_END("finalize", _driver_name);
// @TODO(silverbullet233): if necessary, remove the dump from the execution thread
// considering that this feature is generally used for debugging,
// I think it should not have a big impact now
@ -441,7 +441,7 @@ void PipelineDriver::finalize(RuntimeState* runtime_state, DriverState state) {
return;
}
}
QUERY_TRACE_END("finalize", "");
QUERY_TRACE_END("finalize", _driver_name);
}
void PipelineDriver::_update_overhead_timer() {

View File

@ -17,6 +17,7 @@
#include "exec/pipeline/scan/scan_operator.h"
#include "exec/pipeline/source_operator.h"
#include "exec/workgroup/work_group_fwd.h"
#include "fmt/printf.h"
#include "util/phmap/phmap.h"
namespace starrocks {
@ -158,6 +159,7 @@ public:
for (auto& op : _operators) {
_operator_stages[op->get_id()] = OperatorStage::INIT;
}
_driver_name = fmt::sprintf("driver_%d_%d", _source_node_id, _driver_id);
}
PipelineDriver(const PipelineDriver& driver)
@ -403,6 +405,7 @@ private:
// The default value -1 means no source
int32_t _source_node_id = -1;
int32_t _driver_id;
std::string _driver_name;
DriverAcct _driver_acct;
// The first one is source operator
MorselQueue* _morsel_queue = nullptr;
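
For orientation, a minimal sketch (not part of the diff, values assumed) of the name the constructor now builds: the C++ side formats it with fmt::sprintf("driver_%d_%d", _source_node_id, _driver_id), and this string becomes the category of the driver's "process" trace events, which the new tool later selects with cat.startswith('driver').

# Sketch with assumed values, not part of the commit.
source_node_id, driver_id = 3, 0                        # hypothetical driver
driver_name = "driver_%d_%d" % (source_node_id, driver_id)
print(driver_name)                                      # -> "driver_3_0"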

View File

@ -2,11 +2,12 @@
#include "exec/pipeline/scan/chunk_source.h"
#include <random>
#include "common/statusor.h"
#include "exec/pipeline/scan/balanced_chunk_buffer.h"
#include "exec/workgroup/work_group.h"
#include "runtime/runtime_state.h"
namespace starrocks::pipeline {
ChunkSource::ChunkSource(int32_t scan_operator_id, RuntimeProfile* runtime_profile, MorselPtr&& morsel,

View File

@ -333,10 +333,10 @@ Status ScanOperator::_trigger_next_scan(RuntimeState* state, int chunk_source_in
SCOPED_SET_TRACE_INFO(driver_id, state->query_id(), state->fragment_instance_id());
SCOPED_THREAD_LOCAL_MEM_TRACKER_SETTER(state->instance_mem_tracker());
[[maybe_unused]] std::string category = "chunk_source_" + std::to_string(chunk_source_index);
QUERY_TRACE_ASYNC_START("io_task", category, query_trace_ctx);
auto& chunk_source = _chunk_sources[chunk_source_index];
[[maybe_unused]] std::string category;
category = fmt::sprintf("chunk_source_%d_0x%x", get_plan_node_id(), query_trace_ctx.id);
QUERY_TRACE_ASYNC_START("io_task", category, query_trace_ctx);
DeferOp timer_defer([chunk_source]() {
COUNTER_SET(chunk_source->scan_timer(),

View File

@ -27,13 +27,14 @@ QueryTraceEvent QueryTraceEvent::create(const std::string& name, const std::stri
event.instance_id = instance_id;
event.driver = driver;
event.args = std::move(args);
event.thread_id = std::this_thread::get_id();
return event;
}
QueryTraceEvent QueryTraceEvent::create_with_ctx(const std::string& name, const std::string& category, int64_t id,
char phase, const QueryTraceContext& ctx) {
return create(name, category, id, phase, MonotonicMicros() - ctx.start_ts, -1, ctx.fragment_instance_id, ctx.driver,
{});
return create(name, category, id, phase, MonotonicMicros() - ctx.start_ts, QueryTraceContext::DEFAULT_EVENT_ID,
ctx.fragment_instance_id, ctx.driver, {});
}
QueryTraceEvent QueryTraceEvent::create_with_ctx(const std::string& name, const std::string& category, int64_t id,
@ -43,18 +44,20 @@ QueryTraceEvent QueryTraceEvent::create_with_ctx(const std::string& name, const
}
static const char* kSimpleEventFormat =
R"({"cat":"%s","name":"%s","pid":"%ld","tid":"%ld","id":"%ld","ts":%ld,"ph":"%c","args":%s})";
R"({"cat":"%s","name":"%s","pid":"0x%x","tid":"0x%x","id":"0x%x","ts":%ld,"ph":"%c","args":%s,"tidx":"0x%x"})";
static const char* kCompleteEventFormat =
R"({"cat":"%s","name":"%s","pid":"%ld","tid":"%ld","id":"%ld","ts":%ld,"dur":%ld,"ph":"%c","args":%s})";
R"({"cat":"%s","name":"%s","pid":"0x%x","tid":"0x%x","id":"0x%x","ts":%ld,"dur":%ld,"ph":"%c","args":%s,"tidx":"0x%x"})";
std::string QueryTraceEvent::to_string() {
std::string args_str = args_to_string();
size_t tidx = std::hash<std::thread::id>{}(thread_id);
if (phase == 'X') {
return fmt::sprintf(kCompleteEventFormat, category.c_str(), name.c_str(), instance_id, (int64_t)driver, id,
timestamp, duration, phase, args_str.c_str());
return fmt::sprintf(kCompleteEventFormat, category.c_str(), name.c_str(), (uint64_t)instance_id,
(uint64_t)driver, id, timestamp, duration, phase, args_str.c_str(), tidx);
} else {
return fmt::sprintf(kSimpleEventFormat, category.c_str(), name.c_str(), instance_id, (int64_t)driver, id,
timestamp, phase, args_str.c_str());
return fmt::sprintf(kSimpleEventFormat, category.c_str(), name.c_str(), (uint64_t)instance_id, (uint64_t)driver,
id, timestamp, phase, args_str.c_str(), tidx);
}
}
@ -112,25 +115,25 @@ Status QueryTrace::dump() {
return Status::OK();
}
static const char* kProcessNameMetaEventFormat =
"{\"name\":\"process_name\",\"ph\":\"M\",\"pid\":\"%ld\",\"args\":{\"name\":\"%s\"}}";
"{\"name\":\"process_name\",\"ph\":\"M\",\"pid\":\"0x%x\",\"args\":{\"name\":\"%s\"}}";
static const char* kThreadNameMetaEventFormat =
"{\"name\":\"thread_name\",\"ph\":\"M\",\"pid\":\"%ld\",\"tid\":\"%ld\",\"args\":{\"name\":\"%s\"}}";
"{\"name\":\"thread_name\",\"ph\":\"M\",\"pid\":\"0x%x\",\"tid\":\"0x%x\",\"args\":{\"name\":\"%s\"}}";
try {
std::filesystem::create_directory(starrocks::config::query_debug_trace_dir);
std::string file_name =
fmt::format("{}/{}.json", starrocks::config::query_debug_trace_dir, print_id(_query_id));
std::ofstream oss(file_name.c_str(), std::ios::out | std::ios::binary);
oss << "{\"traceEvents\":[";
oss << "{\"traceEvents\":[\n";
bool is_first = true;
for (auto& [fragment_id, driver_set] : _fragment_drivers) {
std::string fragment_id_str = print_id(fragment_id);
oss << (is_first ? "" : ",\n");
oss << fmt::sprintf(kProcessNameMetaEventFormat, fragment_id.lo, fragment_id_str.c_str());
oss << fmt::sprintf(kProcessNameMetaEventFormat, (uint64_t)fragment_id.lo, fragment_id_str.c_str());
is_first = false;
for (auto& driver : *driver_set) {
starrocks::pipeline::DriverRawPtr ptr = reinterpret_cast<starrocks::pipeline::DriverRawPtr>(driver);
oss << (is_first ? "" : ",\n");
oss << fmt::sprintf(kThreadNameMetaEventFormat, fragment_id.lo, (int64_t)driver,
oss << fmt::sprintf(kThreadNameMetaEventFormat, (uint64_t)fragment_id.lo, (uint64_t)driver,
ptr->get_name().c_str());
}
}
@ -142,7 +145,7 @@ Status QueryTrace::dump() {
oss << iter.to_string();
}
}
oss << "]}";
oss << "\n]}";
oss.close();
} catch (std::exception& e) {
@ -179,8 +182,9 @@ ScopedTracer::ScopedTracer(std::string name, std::string category)
ScopedTracer::~ScopedTracer() {
if (tls_trace_ctx.event_buffer != nullptr) {
_duration = MonotonicMicros() - _start_ts;
tls_trace_ctx.event_buffer->add(QueryTraceEvent::create_with_ctx(
_name, _category, -1, 'X', _start_ts - tls_trace_ctx.start_ts, _duration, tls_trace_ctx));
tls_trace_ctx.event_buffer->add(
QueryTraceEvent::create_with_ctx(_name, _category, QueryTraceContext::DEFAULT_EVENT_ID, 'X',
_start_ts - tls_trace_ctx.start_ts, _duration, tls_trace_ctx));
}
}
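
For reference, a sketch (values made up) of a complete-phase event as the updated kCompleteEventFormat renders it: pid, tid and id are now hex-encoded, and the new tidx field carries std::hash of the emitting std::thread::id.

import json

# Sketch with assumed values; real lines come from fmt::sprintf(kCompleteEventFormat, ...).
# pid = fragment instance id (low bits), tid = pipeline driver pointer,
# id  = async event id (DEFAULT_EVENT_ID for synchronous spans), tidx = hashed thread id.
sample = ('{"cat":"driver_3_0","name":"process","pid":"0x52d9e3a1","tid":"0x7f2b4c001200",'
          '"id":"0x0","ts":1523,"dur":842,"ph":"X","args":{},"tidx":"0x9c1f27aa55e3"}')
print(json.loads(sample)["tidx"])                       # the field the new tool pivots on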

View File

@ -32,6 +32,7 @@ struct QueryTraceEvent {
int64_t instance_id;
// driver pointer
std::uintptr_t driver;
std::thread::id thread_id;
std::vector<std::pair<std::string, std::string>> args;
std::string to_string();
@ -105,17 +106,19 @@ private:
};
struct QueryTraceContext {
static constexpr int64_t DEFAULT_EVENT_ID = 0;
int64_t start_ts = -1;
int64_t fragment_instance_id = -1;
std::uintptr_t driver = 0;
int64_t id = -1; // used for async event
int64_t id = DEFAULT_EVENT_ID; // used for async event.
EventBuffer* event_buffer = nullptr;
void reset() {
start_ts = -1;
fragment_instance_id = -1;
driver = 0;
id = -1;
id = DEFAULT_EVENT_ID;
event_buffer = nullptr;
}
};
@ -123,7 +126,7 @@ struct QueryTraceContext {
inline thread_local QueryTraceContext tls_trace_ctx;
#define INTERNAL_CREATE_EVENT_WITH_CTX(name, category, phase, ctx) \
starrocks::debug::QueryTraceEvent::create_with_ctx(name, category, -1, phase, ctx)
starrocks::debug::QueryTraceEvent::create_with_ctx(name, category, ctx.DEFAULT_EVENT_ID, phase, ctx)
#define INTERNAL_CREATE_ASYNC_EVENT_WITH_CTX(name, category, id, phase, ctx) \
starrocks::debug::QueryTraceEvent::create_with_ctx(name, category, id, phase, ctx)

tools/query-trace-thread-pov.py (new executable file, 131 lines)
View File

@ -0,0 +1,131 @@
#!/usr/bin/env python
# This file is licensed under the Elastic License 2.0. Copyright 2021-present, StarRocks Inc.

# This tool is to change POV of trace graph, from pipeline driver to thread.
# usage:
#   ./query-trace-thread-pov.py < pipeline-driver.trace > thread.trace                 // use pipe
#   ./query-trace-thread-pov.py --input pipeline-driver.trace --output thread.trace    // use file
#   ./query-trace-thread-pov.py < pipeline-driver.trace | gzip > thread.trace.gz       // use pipe and compress

import json
import sys


def load_events(fh):
    for s in fh:
        s = s.strip()
        if not s: continue
        if s.startswith('{'):
            if s.startswith('{"traceEvents"'): continue
            if s.startswith('{}'): continue
            if s.endswith(','):
                s = s[:-1]
            yield json.loads(s)


def remove_fields(d):
    for f in ('id', 'tidx', 'args'):
        if f in d:
            del d[f]


def fix_events(fh, events, prefilters, postfilters):
    io_tid_set = set()
    exec_tid_set = set()
    fh.write('{"traceEvents":[\n')

    # pid: fragment instance id low bits.
    # tid: pipeline driver pointer.
    def fix_io_task(ev):
        d = ev.copy()
        d.update({'pid': '0', 'tid': d['tidx'], 'name': d['cat']})
        d['ph'] = d['ph'].upper()
        d['cat'] = d['tid']
        # id: chunk source pointer.
        return d

    def fix_driver(ev):
        d = ev.copy()
        d.update({'pid': 0, 'tid': d['tidx'], 'name': d['cat'] + '_' + d['tid']})
        d['cat'] = d['tid']
        return d

    def fix_operator(ev):
        d = ev.copy()
        d.update({'pid': 0, 'tid': d['tidx']})
        return d

    for ev in events:
        name = ev.get('name')
        cat = ev.get('cat')
        d = None
        if any(f(ev) for f in prefilters): continue
        if name == 'io_task':
            d = fix_io_task(ev)
            io_tid_set.add(d['tid'])
        else:
            if name == 'process' and cat.startswith('driver'):
                d = fix_driver(ev)
            elif cat in ('pull_chunk', 'push_chunk'):
                d = fix_operator(ev)
            if d:
                exec_tid_set.add(d['tid'])
        if not d:
            continue
        remove_fields(d)
        if any(f(ev) for f in postfilters): continue
        fh.write(json.dumps(d) + ',\n')

    headers = []
    headers.append(dict(name='process_name', ph='M', pid=0, args={'name': 'starrocks_be'}))
    for tid in io_tid_set:
        d = dict(name='thread_name', ph="M", pid=0, tid=tid, args={"name": "IOThread-%s" % tid})
        headers.append(d)
    for tid in exec_tid_set:
        d = dict(name='thread_name', ph="M", pid=0, tid=tid, args={"name": "ExecThread-%s" % tid})
        headers.append(d)
    for d in headers:
        fh.write(json.dumps(d) + ',\n')
    fh.write('{}]}')


def drill_events(fh, events, text):
    fh.write('{"traceEvents":[\n')
    for ev in events:
        if ev['ph'] == 'M' or ev['name'].find(text) != -1:
            fh.write(json.dumps(ev) + ',\n')
    fh.write('{}]}')


def main():
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', help='input file')
    parser.add_argument('--output', help='output file')
    parser.add_argument('-v', action='store_true', help='output detailed version')
    parser.add_argument('-c', action='store_true', help='compress output file')
    parser.add_argument('--drill', help='to drill down a single operator by name')
    args = parser.parse_args()

    fin = sys.stdin
    if args.input:
        fin = open(args.input)
    fout = sys.stdout
    if args.output:
        fout = open(args.output, 'w')

    prefilters = []
    postfilters = []
    if not args.v:
        prefilters.append(lambda x: x.get('cat') in ('push_chunk', 'pull_chunk'))

    events = load_events(fin)
    if args.drill:
        drill_events(fout, events, args.drill)
    else:
        fix_events(fout, events, prefilters, postfilters)


if __name__ == '__main__':
    main()
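
As a rough illustration of the pivot fix_events performs (all values assumed), a driver-POV "process" event keyed by fragment instance (pid) and driver pointer (tid) is re-keyed by the hashed thread id, so the Chrome trace viewer groups spans by executing thread; the thread_name meta events appended at the end then label each tid as ExecThread-<tid> or IOThread-<tid>.

# Sketch with assumed values, mirroring fix_driver() and remove_fields() above.
before = {"cat": "driver_3_0", "name": "process", "pid": "0x52d9e3a1",
          "tid": "0x7f2b4c001200", "ts": 1523, "dur": 842, "ph": "X",
          "tidx": "0x9c1f27aa55e3"}
after = before.copy()
after.update({"pid": 0, "tid": before["tidx"],
              "name": before["cat"] + "_" + before["tid"]})
after["cat"] = after["tid"]
for f in ("id", "tidx", "args"):
    after.pop(f, None)
# after == {"cat": "0x9c1f27aa55e3", "name": "driver_3_0_0x7f2b4c001200", "pid": 0,
#           "tid": "0x9c1f27aa55e3", "ts": 1523, "dur": 842, "ph": "X"}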