starrocks/tools/query-trace-thread-pov.py

145 lines
4.4 KiB
Python

#!/usr/bin/env python
# Copyright 2021-present StarRocks, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This tool changes the point of view (POV) of a query trace graph from
# pipeline drivers to threads.
# usage:
#   ./query-trace-thread-pov.py < pipeline-driver.trace > thread.trace                # use pipe
#   ./query-trace-thread-pov.py --input pipeline-driver.trace --output thread.trace   # use file
#   ./query-trace-thread-pov.py < pipeline-driver.trace | gzip > thread.trace.gz      # use pipe and compress
import json
import sys
def load_events(fh):
    """Yield trace events parsed from a one-event-per-line trace stream.

    Only lines that (after stripping whitespace) start with ``{`` are
    considered. The opening ``{"traceEvents"...`` line and the ``{}``
    sentinel line are skipped; a single trailing comma is removed before the
    remaining line is parsed with ``json.loads``. Blank lines and all other
    lines (e.g. a closing ``]}``) are ignored.

    Args:
        fh: any iterable of text lines, such as an open text file.

    Yields:
        The decoded JSON object of each event line.
    """
    for raw_line in fh:
        line = raw_line.strip()
        # An empty line does not start with '{', so this also drops blanks.
        if not line.startswith('{'):
            continue
        # Document header and trailing sentinel object are not events.
        if line.startswith(('{"traceEvents"', '{}')):
            continue
        if line.endswith(','):
            line = line[:-1]
        yield json.loads(line)
def remove_fields(d):
    """Delete the ``id``, ``tidx`` and ``args`` keys from *d* in place.

    Keys that are absent are ignored. Returns None.
    """
    for key in ('id', 'tidx', 'args'):
        d.pop(key, None)
def fix_events(fh, events, prefilters, postfilters):
    """Write *events* to *fh* as a trace seen from the thread point of view.

    Output is a JSON document ``{"traceEvents":[ ... ]}`` with one event per
    line. Three kinds of input events are kept and rewritten so that every
    kept event has ``pid`` 0 and ``tid`` taken from the event's ``tidx``:

    * ``io_task`` events (phase letter upper-cased, named after their
      category);
    * ``process`` events whose category starts with ``driver``;
    * events whose category is ``pull_chunk`` or ``push_chunk``.

    All other events are dropped. ``id``, ``tidx`` and ``args`` are stripped
    via ``remove_fields``. After the events, ``M`` (metadata) records name the
    process ``starrocks_be`` and each emitted thread ``IOThread-<tid>`` or
    ``ExecThread-<tid>``. A final ``{}`` absorbs the trailing comma.

    Args:
        fh: writable text stream receiving the output.
        events: iterable of event dicts (e.g. from ``load_events``).
        prefilters: predicates called on the ORIGINAL event; the event is
            dropped if any returns a truthy value.
        postfilters: predicates called on the REWRITTEN event; the event is
            dropped if any returns a truthy value.
    """
    io_tid_set = set()
    exec_tid_set = set()

    # Input events: pid is the fragment instance id low bits and tid is the
    # pipeline driver pointer; tidx is used as the output thread id.
    def fix_io_task(ev):
        d = ev.copy()
        # pid is an int, matching every other event and the process_name
        # metadata record (the original used the string '0' here).
        d.update({'pid': 0, 'tid': d['tidx'], 'name': d['cat']})
        d['ph'] = d['ph'].upper()
        # NOTE(review): 'tid' has already been replaced by 'tidx' above, so
        # 'cat' ends up equal to the thread id rather than the original
        # driver pointer. Confirm whether that is intended.
        d['cat'] = d['tid']
        # id: chunk source pointer (removed later by remove_fields).
        return d

    def fix_driver(ev):
        d = ev.copy()
        # The right-hand side is evaluated before update(), so the name keeps
        # the ORIGINAL tid (driver pointer); '%s' also tolerates non-str tids.
        d.update({'pid': 0, 'tid': d['tidx'],
                  'name': '%s_%s' % (d['cat'], d['tid'])})
        d['cat'] = d['tid']
        return d

    def fix_operator(ev):
        d = ev.copy()
        d.update({'pid': 0, 'tid': d['tidx']})
        return d

    fh.write('{"traceEvents":[\n')
    for ev in events:
        if any(f(ev) for f in prefilters):
            continue
        name = ev.get('name')
        cat = ev.get('cat')
        if name == 'io_task':
            d = fix_io_task(ev)
            tid_set = io_tid_set
        elif name == 'process' and isinstance(cat, str) and cat.startswith('driver'):
            d = fix_driver(ev)
            tid_set = exec_tid_set
        elif cat in ('pull_chunk', 'push_chunk'):
            d = fix_operator(ev)
            tid_set = exec_tid_set
        else:
            continue
        remove_fields(d)
        # Post-filters inspect the rewritten event, hence "post".
        if any(f(d) for f in postfilters):
            continue
        # Record the thread only once an event for it is actually emitted.
        tid_set.add(d['tid'])
        fh.write(json.dumps(d) + ',\n')

    headers = [dict(name='process_name', ph='M', pid=0, args={'name': 'starrocks_be'})]
    # Sorted so the output is deterministic across runs.
    for tid in sorted(io_tid_set, key=str):
        headers.append(dict(name='thread_name', ph='M', pid=0, tid=tid,
                            args={'name': 'IOThread-%s' % tid}))
    for tid in sorted(exec_tid_set, key=str):
        headers.append(dict(name='thread_name', ph='M', pid=0, tid=tid,
                            args={'name': 'ExecThread-%s' % tid}))
    for d in headers:
        fh.write(json.dumps(d) + ',\n')
    # The empty object swallows the trailing comma of the last record.
    fh.write('{}]}')
def drill_events(fh, events, text):
    """Write a trace keeping only metadata events and events matching *text*.

    An event is kept when its ``ph`` is ``'M'`` or its ``name`` contains
    *text* as a substring. Output uses the same framing as ``fix_events``:
    a ``{"traceEvents":[`` header, one JSON object per line followed by a
    comma, and a closing ``{}]}``.
    """
    fh.write('{"traceEvents":[\n')
    # Lazily filtered, so events are still consumed and written one by one.
    kept = (ev for ev in events if ev['ph'] == 'M' or text in ev['name'])
    for event in kept:
        fh.write(json.dumps(event) + ',\n')
    fh.write('{}]}')
def main():
    """Command-line entry point.

    Reads a pipeline-driver trace from ``--input`` (default: stdin) and
    writes to ``--output`` (default: stdout) either the thread-POV trace or,
    with ``--drill TEXT``, only the metadata events and events whose name
    contains TEXT. Without ``-v``, ``push_chunk``/``pull_chunk`` events are
    dropped from the thread-POV output. With ``-c`` the output is
    gzip-compressed. Files opened here are closed before returning.
    """
    import argparse
    import contextlib
    import gzip

    parser = argparse.ArgumentParser()
    parser.add_argument('--input', help='input file')
    parser.add_argument('--output', help='output file')
    parser.add_argument('-v', action='store_true', help='output detailed version')
    parser.add_argument('-c', action='store_true', help='compress output file')
    parser.add_argument('--drill', help='to drill down a single operator by name')
    args = parser.parse_args()

    with contextlib.ExitStack() as stack:
        fin = sys.stdin
        if args.input:
            # Trace files are JSON, which is UTF-8 by definition.
            fin = stack.enter_context(open(args.input, encoding='utf-8'))

        if args.output and args.c:
            fout = stack.enter_context(gzip.open(args.output, 'wt', encoding='utf-8'))
        elif args.output:
            fout = stack.enter_context(open(args.output, 'w', encoding='utf-8'))
        elif args.c:
            # Closing this wrapper writes the gzip trailer but does not close
            # the underlying sys.stdout.buffer.
            fout = stack.enter_context(gzip.open(sys.stdout.buffer, 'wt', encoding='utf-8'))
        else:
            fout = sys.stdout

        prefilters = []
        postfilters = []
        if not args.v:
            # The non-detailed view omits per-operator chunk events.
            prefilters.append(lambda x: x.get('cat') in ('push_chunk', 'pull_chunk'))

        events = load_events(fin)
        if args.drill:
            drill_events(fout, events, args.drill)
        else:
            fix_events(fout, events, prefilters, postfilters)
if __name__ == '__main__':
    # Run the command-line tool only when executed as a script, not on import.
    main()