256 lines
9.1 KiB
Python
256 lines
9.1 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
|
|
# Licensed to the Apache Software Foundation (ASF) under one
|
|
# or more contributor license agreements. See the NOTICE file
|
|
# distributed with this work for additional information
|
|
# regarding copyright ownership. The ASF licenses this file
|
|
# to you under the Apache License, Version 2.0 (the
|
|
# "License"); you may not use this file except in compliance
|
|
# with the License. You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing,
|
|
# software distributed under the License is distributed on an
|
|
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
|
# KIND, either express or implied. See the License for the
|
|
# specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import MySQLdb
|
|
|
|
# NOTE: The default organization of meta info is cascading, we flatten its structure
|
|
# NOTE: We get schema-hash from proc '/dbs/db_id/tbl_id/'index_schema'
|
|
class FeMetaResolver:
|
|
def __init__(self, fe_host, query_port, user, query_pwd, db_candidates, tbl_candidates):
|
|
self.fe_host = fe_host
|
|
self.query_port = query_port
|
|
self.user = user
|
|
self.query_pwd = query_pwd
|
|
self.db_candidates = db_candidates
|
|
self.tbl_candidates = tbl_candidates
|
|
|
|
self.db = None
|
|
self.cur = None
|
|
|
|
self.be_list = []
|
|
self.db_list = []
|
|
# Only base tables, excluding rollups
|
|
self.table_list = []
|
|
# All rollups, including base tables
|
|
self.rollup_map = {}
|
|
self.partition_list = []
|
|
self.index_list = []
|
|
self.tablet_map = {}
|
|
|
|
def init(self):
|
|
self.connect_mysql()
|
|
self.fetch_be_list();
|
|
self.fetch_db_list();
|
|
self.fetch_table_list();
|
|
self.fetch_rollup_map();
|
|
self.fetch_partition_list();
|
|
self.fetch_idx_list();
|
|
self._merge_schema_hash_to_idx_list()
|
|
self.fetch_tablet_list();
|
|
self.close()
|
|
|
|
def connect_mysql(self):
|
|
try:
|
|
self.db = MySQLdb.connect(host=self.fe_host, port=self.query_port,
|
|
user=self.user,
|
|
passwd=self.query_pwd)
|
|
self.cur = self.db.cursor()
|
|
except MySQLdb.Error as e:
|
|
print ("Failed to connect fe server. error %s:%s" % (str(e.args[0]), e.args[1]))
|
|
exit(-1);
|
|
|
|
def exec_sql(self, sql):
|
|
try:
|
|
self.cur.execute(sql)
|
|
except MySQLdb.Error as e:
|
|
print ("exec sql error %s:%s" % (str(e.args[0]), e.args[1]))
|
|
exit(-1);
|
|
|
|
def close(self):
|
|
if self.db.open:
|
|
self.cur.close()
|
|
self.db.close()
|
|
|
|
def fetch_be_list(self):
|
|
show_be_sql = "show backends"
|
|
self.exec_sql(show_be_sql);
|
|
be_list = self.cur.fetchall()
|
|
for be_tuple in be_list :
|
|
be = {}
|
|
be['be_id'] = long(be_tuple[0])
|
|
be['ip'] = be_tuple[2]
|
|
be['http_port'] = be_tuple[5]
|
|
self.be_list.append(be)
|
|
|
|
return
|
|
|
|
def fetch_db_list(self):
|
|
show_database_sql = "show proc \"/dbs\" "
|
|
self.exec_sql(show_database_sql);
|
|
db_list = self.cur.fetchall()
|
|
for db_tuple in db_list :
|
|
db = {}
|
|
if long(db_tuple[0]) <= 0:
|
|
continue
|
|
|
|
db_name = db_tuple[1].split(':')[1].strip()
|
|
if self.db_candidates and (not db_name in self.db_candidates):
|
|
continue
|
|
|
|
db['db_id'] = long(db_tuple[0])
|
|
db['db_name'] = db_tuple[1]
|
|
self.db_list.append(db)
|
|
|
|
def fetch_table_list(self):
|
|
for db in self.db_list:
|
|
self._fetch_tables_by_db(db)
|
|
|
|
def _fetch_tables_by_db(self, db):
|
|
sql = "show proc \"/dbs/%s\" " % db['db_id']
|
|
self.exec_sql(sql);
|
|
table_list = self.cur.fetchall()
|
|
for table_tuple in table_list :
|
|
if self.tbl_candidates and (not table_tuple[1] in self.tbl_candidates):
|
|
continue
|
|
if table_tuple[6] == "OLAP":
|
|
table = {}
|
|
table['db_id'] = db['db_id']
|
|
table['db_name'] = db['db_name']
|
|
table['tbl_id'] = long(table_tuple[0])
|
|
table['tbl_name'] = table_tuple[1]
|
|
self.table_list.append(table)
|
|
return
|
|
|
|
def fetch_rollup_map(self):
|
|
for table in self.table_list:
|
|
self._fetch_rollups_by_table(table);
|
|
|
|
def _fetch_rollups_by_table(self, table):
|
|
sql = "show proc \"/dbs/%s/%s/index_schema\" " % (table['db_id'], table['tbl_id'])
|
|
self.exec_sql(sql);
|
|
index_list = self.cur.fetchall()
|
|
for index_tuple in index_list :
|
|
index = {}
|
|
index['tbl_id'] = table['tbl_id']
|
|
index['tbl_name'] = table['tbl_name']
|
|
index['idx_id'] = long(index_tuple[0])
|
|
index['schema_hash'] = long(index_tuple[3])
|
|
self.rollup_map[index['idx_id']] = index
|
|
return
|
|
|
|
def fetch_partition_list(self):
|
|
for table in self.table_list:
|
|
self._fetch_partitions_by_table(table);
|
|
|
|
def _fetch_partitions_by_table(self, table):
|
|
sql = "show proc \"/dbs/%s/%s/partitions\" " % (table['db_id'], table['tbl_id'])
|
|
self.exec_sql(sql);
|
|
partition_list = self.cur.fetchall()
|
|
for partition_tuple in partition_list :
|
|
partition = {}
|
|
partition['db_id'] = table['db_id']
|
|
partition['db_name'] = table['db_name']
|
|
partition['tbl_id'] = table['tbl_id']
|
|
partition['tbl_name'] = table['tbl_name']
|
|
partition['partition_id'] = long(partition_tuple[0])
|
|
partition['partition_name'] = partition_tuple[1]
|
|
self.partition_list.append(partition)
|
|
return
|
|
|
|
def fetch_idx_list(self):
|
|
for partition in self.partition_list:
|
|
self._fetch_idxes_by_partition(partition);
|
|
|
|
def _fetch_idxes_by_partition(self, partition):
|
|
sql = "show proc \"/dbs/%s/%s/partitions/%s\" " % \
|
|
(partition['db_id'], partition['tbl_id'], partition['partition_id'])
|
|
self.exec_sql(sql);
|
|
index_list = self.cur.fetchall()
|
|
for idx_tuple in index_list :
|
|
idx = {}
|
|
idx['db_id'] = partition['db_id']
|
|
idx['db_name'] = partition['db_name']
|
|
idx['tbl_id'] = partition['tbl_id']
|
|
idx['tbl_name'] = partition['tbl_name']
|
|
idx['partition_id'] = partition['partition_id']
|
|
idx['partition_name'] = partition['partition_name']
|
|
idx['idx_id'] = long(idx_tuple[0])
|
|
idx['idx_name'] = idx_tuple[1]
|
|
idx['idx_state'] = idx_tuple[2]
|
|
self.index_list.append(idx)
|
|
return
|
|
|
|
def _merge_schema_hash_to_idx_list(self):
|
|
for index in self.index_list:
|
|
idx_id = index['idx_id']
|
|
rollup = self.rollup_map.get(idx_id)
|
|
index['schema_hash'] = rollup['schema_hash']
|
|
|
|
def fetch_tablet_list(self):
|
|
for index in self.index_list:
|
|
self._fetch_tablets_by_index(index);
|
|
|
|
def _fetch_tablets_by_index(self, index):
|
|
sql = "show proc \"/dbs/%s/%s/partitions/%s/%s\" " % \
|
|
(index['db_id'], index['tbl_id'], index['partition_id'], index['idx_id'])
|
|
self.exec_sql(sql);
|
|
tablet_list = self.cur.fetchall()
|
|
for tablet_tuple in tablet_list :
|
|
tablet = {}
|
|
tablet['db_id'] = index['db_id']
|
|
tablet['db_name'] = index['db_name']
|
|
tablet['tbl_id'] = index['tbl_id']
|
|
tablet['tbl_name'] = index['tbl_name']
|
|
tablet['partition_id'] = index['partition_id']
|
|
tablet['partition_name'] = index['partition_name']
|
|
tablet['idx_id'] = index['idx_id']
|
|
tablet['idx_name'] = index['idx_name']
|
|
tablet['idx_state'] = index['idx_state']
|
|
tablet['tablet_id'] = long(tablet_tuple[0])
|
|
tablet['replica_id'] = long(tablet_tuple[1])
|
|
tablet['be_id'] = long(tablet_tuple[2])
|
|
tablet['schema_hash'] = index["schema_hash"]
|
|
self.tablet_map[tablet['tablet_id']] = tablet
|
|
return
|
|
|
|
def debug_output(self):
|
|
print "be_list:"
|
|
self._print_list(self.be_list)
|
|
print
|
|
print "database_list:"
|
|
self._print_list(self.db_list)
|
|
print
|
|
print "table_list:"
|
|
self._print_list(self.table_list)
|
|
print
|
|
print "rollup_list:"
|
|
self._print_list(self.rollup_map.values())
|
|
print
|
|
print "partition_list:"
|
|
self._print_list(self.partition_list)
|
|
print
|
|
print "index_list:"
|
|
self._print_list(self.index_list)
|
|
print
|
|
print "tablet_map:(%s), print up to ten here:" % len(self.tablet_map)
|
|
self._print_list(self.tablet_map.values()[0:10])
|
|
print
|
|
|
|
def _print_list(self, one_list):
|
|
for item in one_list:
|
|
print item
|
|
|
|
def get_tablet_by_id(self, tablet_id):
|
|
return self.tablet_map.get(tablet_id)
|
|
|
|
def get_all_tablets(self):
|
|
return self.tablet_map.values()
|
|
|