[Tool] Add cluster status checker before SQL case (#61577)

Signed-off-by: andyziye <yeziyu@starrocks.com>
This commit is contained in:
andyziye 2025-08-05 19:10:24 +08:00 committed by GitHub
parent d8ff13b53b
commit fb44105882
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 224 additions and 142 deletions

View File

@ -52,6 +52,7 @@ import pymysql as _mysql
import requests
from cup import shell
from nose import tools
from nose.plugins.skip import SkipTest
from cup import log
from cup.util import conf as configparser
from requests.auth import HTTPBasicAuth
@ -171,6 +172,8 @@ class StarrocksSQLApiLib(object):
self.arrow_sql_lib = ArrowSqlLib()
self.arrow_port = ""
self.check_status = os.environ.get("check_status", "False") == "True"
# connection pool
self.connection_pool = None
# thread
@ -277,14 +280,14 @@ class StarrocksSQLApiLib(object):
title = f"[{self.run_info}] SQL-Tester crash"
run_link = os.environ.get("WORKFLOW_URL", "")
body = (
"""```\nTest Case:\n %s\n```\n\n ```\nCrash Log: \n%s\n```\n\n```\nSR Version: %s\nBE: %s\nURL: %s\n\n```"""
% (
be_crash_case,
be_crash_log,
cluster_status_dict["version"],
cluster_status_dict["ip"][0],
run_link,
)
"""```\nTest Case:\n %s\n```\n\n ```\nCrash Log: \n%s\n```\n\n```\nSR Version: %s\nBE: %s\nURL: %s\n\n```"""
% (
be_crash_case,
be_crash_log,
cluster_status_dict["version"],
cluster_status_dict["ip"][0],
run_link,
)
)
assignee = os.environ.get("ISSUE_AUTHOR")
repo = os.environ.get("GITHUB_REPOSITORY")
@ -455,7 +458,7 @@ class StarrocksSQLApiLib(object):
_now_conf_key = ""
for arg in args:
_now_conf_key = ".".join(args[:_index + 1])
_now_conf_key = ".".join(args[: _index + 1])
if arg not in _tmp_conf:
if not StarrocksSQLApiLib._instance:
self_print(f"[Miss config] {_now_conf_key}", color=ColorEnum.YELLOW)
@ -483,7 +486,7 @@ class StarrocksSQLApiLib(object):
for var_str in var_strs:
var_str_path = "['" + var_str.replace(".", "']['") + "']"
try:
var_value = eval(f'config_parser{var_str_path}')
var_value = eval(f"config_parser{var_str_path}")
except Exception as e:
self_print(f"[ERROR] config: {var_str} is incorrect!", color=ColorEnum.RED, bold=True)
sys.exit(1)
@ -507,10 +510,7 @@ class StarrocksSQLApiLib(object):
else:
comp_conf_dict = _get_value(config_parser, "client", each_comp)
self.component_status[each_comp] = {
"status": None,
"keys": list(comp_conf_dict.keys())
}
self.component_status[each_comp] = {"status": None, "keys": list(comp_conf_dict.keys())}
for com_k, com_v in comp_conf_dict.items():
if str(com_v).strip() == "" and "passwd" not in com_k.lower() and "passwd" not in com_k.lower():
@ -564,7 +564,7 @@ class StarrocksSQLApiLib(object):
each_env_value = os.environ.get(each_env_key, "")
else:
# save secrets info
if 'aws' in each_env_key or 'oss_' in each_env_key:
if "aws" in each_env_key or "oss_" in each_env_key:
SECRET_INFOS[each_env_key] = each_env_value
self.__setattr__(each_env_key, each_env_value)
@ -609,19 +609,11 @@ class StarrocksSQLApiLib(object):
self.trino_lib.connect(trino_dict)
def connect_spark(self):
    """Open a Spark connection via the spark client lib using configured host/port/user."""
    self.spark_lib.connect({"host": self.spark_host, "port": self.spark_port, "user": self.spark_user})
def connect_hive(self):
    """Open a Hive connection via the hive client lib using configured host/port/user."""
    self.hive_lib.connect({"host": self.hive_host, "port": self.hive_port, "user": self.hive_user})
def close_starrocks(self):
@ -812,7 +804,7 @@ class StarrocksSQLApiLib(object):
@timeout(
QUERY_TIMEOUT,
timeout_exception=AssertionError,
exception_message=f"Query TimeoutException(TRINO/SPARK/HIVE): {QUERY_TIMEOUT}s!"
exception_message=f"Query TimeoutException(TRINO/SPARK/HIVE): {QUERY_TIMEOUT}s!",
)
def conn_execute_sql(self, conn, sql):
try:
@ -996,7 +988,7 @@ class StarrocksSQLApiLib(object):
if regex.match(cmd):
# set variable
var = regex.match(cmd).group()
cmd = cmd[len(var):]
cmd = cmd[len(var) :]
var = var[:-1]
match_words: list = re.compile("\\${([^}]*)}").findall(cmd)
@ -1014,8 +1006,9 @@ class StarrocksSQLApiLib(object):
# only replace the first keywords
if each_keyword in self.thread_var[thread_key]:
each_keyword_value = each_word.replace(each_keyword,
f"self.thread_var[{thread_key}][{each_keyword}]")
each_keyword_value = each_word.replace(
each_keyword, f"self.thread_var[{thread_key}][{each_keyword}]"
)
if unfold:
each_keyword_value = str(eval(each_keyword_value))
@ -1023,7 +1016,7 @@ class StarrocksSQLApiLib(object):
pass
cmd = cmd.replace(f"${{{each_word}}}", each_keyword_value)
log.info(f'Replace {each_keyword}{each_keyword_value}, {cmd}')
log.info(f"Replace {each_keyword}{each_keyword_value}, {cmd}")
match_words.remove(each_word)
@ -1045,7 +1038,7 @@ class StarrocksSQLApiLib(object):
pass
cmd = cmd.replace("${%s}" % each_word, each_keyword_value)
log.info(f'Replace {each_keyword}{each_keyword_value}, {cmd}')
log.info(f"Replace {each_keyword}{each_keyword_value}, {cmd}")
match_words.remove(each_word)
@ -1098,7 +1091,7 @@ class StarrocksSQLApiLib(object):
this_res = []
self.thread_var[exec_id] = {}
_t_info = exec_id[:str(exec_id).rindex("-")]
_t_info = exec_id[: str(exec_id).rindex("-")]
_t_exec_id = exec_id.split("-")[-1]
if init_db:
@ -1107,18 +1100,19 @@ class StarrocksSQLApiLib(object):
for _cmd_id, _each_cmd in enumerate(cmd_list):
uncheck = False
_cmd_id_str = f'Thread-{exec_id}.{_cmd_id}'
_cmd_id_str = f"Thread-{exec_id}.{_cmd_id}"
this_res.append(ori_cmd_list[_cmd_id])
prefix_s_count = len(ori_cmd_list[_cmd_id]) - len(ori_cmd_list[_cmd_id].lstrip())
# uncheck flag, owns the highest priority
if _each_cmd.startswith(UNCHECK_FLAG):
uncheck = True
_each_cmd = _each_cmd[len(UNCHECK_FLAG):]
_each_cmd = _each_cmd[len(UNCHECK_FLAG) :]
old_this_res_len = len(this_res)
actual_res, actual_res_log, var, order = self.execute_single_statement(_each_cmd, _cmd_id_str, record_mode,
this_res, var_key=exec_id, conn=conn)
actual_res, actual_res_log, var, order = self.execute_single_statement(
_each_cmd, _cmd_id_str, record_mode, this_res, var_key=exec_id, conn=conn
)
if record_mode:
for new_res_lino in range(old_this_res_len, len(this_res)):
@ -1153,8 +1147,9 @@ class StarrocksSQLApiLib(object):
self.thread_res_log.setdefault(_t_info, [])
self.thread_res_log[_t_info].append(this_res)
def execute_single_statement(self, statement, sql_id, record_mode, res_container: list = None, var_key: str = None,
conn: any = None):
def execute_single_statement(
self, statement, sql_id, record_mode, res_container: list = None, var_key: str = None, conn: any = None
):
"""
execute single statement and return result
"""
@ -1162,7 +1157,7 @@ class StarrocksSQLApiLib(object):
res_container = res_container if res_container is not None else self.res_log
if statement.startswith(TRINO_FLAG):
statement = statement[len(TRINO_FLAG):]
statement = statement[len(TRINO_FLAG) :]
# analyse var set
var, statement = self.analyse_var(statement, thread_key=var_key)
@ -1179,7 +1174,7 @@ class StarrocksSQLApiLib(object):
actual_res, actual_res_log = self.pretreatment_res(actual_res)
elif statement.startswith(SPARK_FLAG):
statement = statement[len(SPARK_FLAG):]
statement = statement[len(SPARK_FLAG) :]
# analyse var set
var, statement = self.analyse_var(statement, thread_key=var_key)
@ -1196,7 +1191,7 @@ class StarrocksSQLApiLib(object):
actual_res, actual_res_log = self.pretreatment_res(actual_res)
elif statement.startswith(HIVE_FLAG):
statement = statement[len(HIVE_FLAG):]
statement = statement[len(HIVE_FLAG) :]
# analyse var set
var, statement = self.analyse_var(statement, thread_key=var_key)
@ -1214,7 +1209,7 @@ class StarrocksSQLApiLib(object):
# execute command in files
elif statement.startswith(SHELL_FLAG):
statement = statement[len(SHELL_FLAG):]
statement = statement[len(SHELL_FLAG) :]
# analyse var set
var, statement = self.analyse_var(statement, thread_key=var_key)
@ -1229,7 +1224,7 @@ class StarrocksSQLApiLib(object):
elif statement.startswith(FUNCTION_FLAG):
# function invoke
sql = statement[len(FUNCTION_FLAG):]
sql = statement[len(FUNCTION_FLAG) :]
# analyse var set
var, sql = self.analyse_var(sql, thread_key=var_key)
@ -1242,7 +1237,7 @@ class StarrocksSQLApiLib(object):
actual_res_log = ""
elif statement.startswith(ARROW_FLAG):
statement = statement[len(ARROW_FLAG):]
statement = statement[len(ARROW_FLAG) :]
# analyse var set
var, statement = self.analyse_var(statement, thread_key=var_key)
@ -1266,7 +1261,7 @@ class StarrocksSQLApiLib(object):
# order flag
if statement.startswith(ORDER_FLAG):
order = True
statement = statement[len(ORDER_FLAG):]
statement = statement[len(ORDER_FLAG) :]
# analyse var set
var, statement = self.analyse_var(statement, thread_key=var_key)
@ -1319,7 +1314,7 @@ class StarrocksSQLApiLib(object):
# check statement
if each_statement.startswith(CHECK_FLAG):
each_statement = each_statement[len(CHECK_FLAG):].strip()
each_statement = each_statement[len(CHECK_FLAG) :].strip()
# analyse var set
var, each_statement_new = self.analyse_var(each_statement, unfold=False)
@ -1328,9 +1323,10 @@ class StarrocksSQLApiLib(object):
check_result = eval(each_statement_new)
except Exception as e:
self_print(f"[LOOP] Exception: {each_statement}, {e}!", ColorEnum.YELLOW, logout=True)
for self_k in re.findall(r'self.([0-9a-zA-Z_-]+)', each_statement_new):
for self_k in re.findall(r"self.([0-9a-zA-Z_-]+)", each_statement_new):
self_print(
f"{self_k}: %s" % eval(f'self.{self_k}') if self_k in self.__dict__ else "")
f"{self_k}: %s" % eval(f"self.{self_k}") if self_k in self.__dict__ else ""
)
loop_check_res = False
break
@ -1338,19 +1334,24 @@ class StarrocksSQLApiLib(object):
self_print(f"[LOOP CHECK] SUCCESS: {each_statement}!", color=ColorEnum.GREEN, logout=True)
loop_check_res = True
else:
self_print(f"[LOOP CHECK] FAILURE: {each_statement}, result: {check_result}!",
color=ColorEnum.YELLOW, logout=True)
self_print(
f"[LOOP CHECK] FAILURE: {each_statement}, result: {check_result}!",
color=ColorEnum.YELLOW,
logout=True,
)
# print variables in each_statement_new
for self_k in re.findall(r'self.([0-9a-zA-Z_-]+)', each_statement_new):
for self_k in re.findall(r"self.([0-9a-zA-Z_-]+)", each_statement_new):
self_print(
f"{self_k}: %s" % eval(f'self.{self_k}') if self_k in self.__dict__ else "")
f"{self_k}: %s" % eval(f"self.{self_k}") if self_k in self.__dict__ else ""
)
loop_check_res = False
break
else:
# normal statement, loop statement no need to record the result
actual_res, actual_res_log, var, _ = self.execute_single_statement(each_statement, each_stat_id,
False)
actual_res, actual_res_log, var, _ = self.execute_single_statement(
each_statement, each_stat_id, False
)
log.info("[%s.result]: %s" % (sql_id_str, actual_res))
if loop_check_res:
@ -1379,15 +1380,18 @@ class StarrocksSQLApiLib(object):
return
if any(re.compile(condition).search(sql) is not None for condition in skip.skip_res_cmd) or any(
condition in sql for condition in skip.skip_res_cmd
condition in sql for condition in skip.skip_res_cmd
):
log.info("[%s.check] skip check" % sql_id)
return
tmp_ori_sql = ori_sql[len(UNCHECK_FLAG):] if ori_sql.startswith(UNCHECK_FLAG) else ori_sql
tmp_ori_sql = ori_sql[len(UNCHECK_FLAG) :] if ori_sql.startswith(UNCHECK_FLAG) else ori_sql
if tmp_ori_sql.startswith(SHELL_FLAG):
tools.assert_equal(int(exp.split("\n")[0]), act[0],
f"[SHELL ERROR]\n\t- cmd : {sql}\n\t- code: {act[0]}\n\t- msg : {act[1]}")
tools.assert_equal(
int(exp.split("\n")[0]),
act[0],
f"[SHELL ERROR]\n\t- cmd : {sql}\n\t- code: {act[0]}\n\t- msg : {act[1]}",
)
exp_code = exp.split("\n")[0]
exp_std = "\n".join(exp.split("\n")[1:])
@ -1437,12 +1441,12 @@ class StarrocksSQLApiLib(object):
tools.assert_equal(str(exp), str(act))
else:
if exp.startswith(REGEX_FLAG):
log.info("[check regex]: %s" % exp[len(REGEX_FLAG):])
log.info("[check regex]: %s" % exp[len(REGEX_FLAG) :])
tools.assert_regexp_matches(
r"%s" % str(act),
exp[len(REGEX_FLAG):],
exp[len(REGEX_FLAG) :],
"sql result not match regex:\n- [SQL]: %s\n- [exp]: %s\n- [act]: %s\n---"
% (self_print(sql, need_print=False), exp[len(REGEX_FLAG):], act),
% (self_print(sql, need_print=False), exp[len(REGEX_FLAG) :], act),
)
return
@ -1577,7 +1581,7 @@ class StarrocksSQLApiLib(object):
insert_round = 1
while len(new_log) > 0:
current_log = new_log[: min(len(new_log), 65533)]
new_log = new_log[len(current_log):]
new_log = new_log[len(current_log) :]
arg_dict = {
"database_name": T_R_DB,
@ -1783,15 +1787,19 @@ class StarrocksSQLApiLib(object):
break
time.sleep(interval)
times += 1
tools.assert_true(row_count == 0, f"wait db transaction finish error, timeout {timeout_sec}s, row_count={row_count}")
tools.assert_true(
row_count == 0, f"wait db transaction finish error, timeout {timeout_sec}s, row_count={row_count}"
)
def show_routine_load(self, routine_load_task_name):
    """Execute `SHOW ROUTINE LOAD FOR <task>` and return the raw result dict."""
    return self.execute_sql("show routine load for %s" % routine_load_task_name, True)
def running_load_count(self, db_name, table_name, load_type):
    """Return the number of loads of `load_type` on db/table that are not yet FINISHED/CANCELLED."""
    query = (
        "select count(*) from information_schema.loads where db_name='%s' and table_name='%s' and type='%s' and state not in ('FINISHED','CANCELLED')"
        % (db_name, table_name, load_type)
    )
    result = self.execute_sql(query, True)
    tools.assert_true(result["status"])
    # first row, first column of the count(*) result
    return int(result["result"][0][0])
@ -1821,8 +1829,10 @@ class StarrocksSQLApiLib(object):
if running_load_count == 0:
load_finished = True
break
log.info("routine load running loads %d, db: %s, table: %s, job: %s" % (
running_load_count, db_name, table_name, task_name))
log.info(
"routine load running loads %d, db: %s, table: %s, job: %s"
% (running_load_count, db_name, table_name, task_name)
)
time.sleep(3)
count += 1
tools.assert_true(load_finished)
@ -1922,8 +1932,8 @@ class StarrocksSQLApiLib(object):
break
count += 1
tools.assert_equal("CANCELLED", status, "wait alter table cancel error")
def retry_execute_sql(self, sql:str, ori:bool, max_retry_times:int=3, pending_time_ms:int=100):
def retry_execute_sql(self, sql: str, ori: bool, max_retry_times: int = 3, pending_time_ms: int = 100):
"""
execute sql with retry
:param sql: sql to execute
@ -1960,7 +1970,10 @@ class StarrocksSQLApiLib(object):
for _res in results:
last_refresh_state = _res[12]
if last_refresh_state not in TASK_RUN_SUCCESS_STATES:
print("mv %s last refresh state is %s, not in %s" % (mv_name, last_refresh_state, TASK_RUN_SUCCESS_STATES))
print(
"mv %s last refresh state is %s, not in %s"
% (mv_name, last_refresh_state, TASK_RUN_SUCCESS_STATES)
)
return False
return True
@ -2149,13 +2162,13 @@ class StarrocksSQLApiLib(object):
# could be faster if making this loop parallel
for sql in sqls:
if sql.startswith(TRINO_FLAG):
sql = sql[len(TRINO_FLAG):]
sql = sql[len(TRINO_FLAG) :]
res = self.trino_execute_sql(sql)
elif sql.startswith(SPARK_FLAG):
sql = sql[len(SPARK_FLAG):]
sql = sql[len(SPARK_FLAG) :]
res = self.spark_execute_sql(sql)
elif sql.startswith(HIVE_FLAG):
sql = sql[len(HIVE_FLAG):]
sql = sql[len(HIVE_FLAG) :]
res = self.hive_execute_sql(sql)
else:
res = self.execute_sql(sql)
@ -2179,8 +2192,9 @@ class StarrocksSQLApiLib(object):
tools.assert_true(res["status"])
plan = str(res["result"])
for expect in expects:
tools.assert_false(plan.find(expect) > 0,
"assert expect %s should not be found in plan: %s" % (expect, plan))
tools.assert_false(
plan.find(expect) > 0, "assert expect %s should not be found in plan: %s" % (expect, plan)
)
def wait_alter_table_finish(self, alter_type="COLUMN", off=9):
"""
@ -2338,8 +2352,8 @@ out.append("${{dictMgr.NO_DICT_STRING_COLUMNS.contains(cid)}}")
return
else:
if (
res["msg"][1].find("EsTable metadata has not been synced, Try it later") == -1
and res["msg"][1].find("metadata failure: null") == -1
res["msg"][1].find("EsTable metadata has not been synced, Try it later") == -1
and res["msg"][1].find("metadata failure: null") == -1
):
log.info("==========check success: es table metadata is ready==========")
return
@ -2351,15 +2365,15 @@ out.append("${{dictMgr.NO_DICT_STRING_COLUMNS.contains(cid)}}")
def _stream_load(self, label, database_name, table_name, filepath, headers=None, meta_sync=True):
""" """
url = (
"http://"
+ self.mysql_host
+ ":"
+ self.http_port
+ "/api/"
+ database_name
+ "/"
+ table_name
+ "/_stream_load"
"http://"
+ self.mysql_host
+ ":"
+ self.http_port
+ "/api/"
+ database_name
+ "/"
+ table_name
+ "/_stream_load"
)
params = [
"curl",
@ -2623,6 +2637,24 @@ out.append("${{dictMgr.NO_DICT_STRING_COLUMNS.contains(cid)}}")
return backends
def check_cluster_status(self):
    """Check the cluster status.

    When status checking is enabled (`check_status`), run `show backends;`
    and skip the current test (via nose SkipTest) if any backend reports
    not-alive. No-op when checking is disabled.
    """
    if not self.check_status:
        return
    res = self.execute_sql("show backends;", ori=True)
    tools.assert_true(res["status"], res["msg"])
    # column 8 is the Alive flag; columns 1/4 look like host/port — TODO confirm against SHOW BACKENDS layout
    dead_msgs = [
        f"BE({row[1]}:{row[4]}) is not alive.\n"
        for row in res["result"]
        if str(row[8]) != "true"
    ]
    if dead_msgs:
        raise SkipTest("cluster status check failed, %s" % "".join(dead_msgs))
def update_be_config(self, key, value):
"""Update the config to all the backends."""
backends = self._get_backend_http_endpoints()
@ -2716,8 +2748,9 @@ out.append("${{dictMgr.NO_DICT_STRING_COLUMNS.contains(cid)}}")
sql2 = "explain %s" % query2
res1 = self.execute_sql(sql1, True)
res2 = self.execute_sql(sql2, True)
tools.assert_true(res1 == res2,
"assert two plans are different, plan1: {}, plan2: {}".format(res1["result"], res2["result"]))
tools.assert_true(
res1 == res2, "assert two plans are different, plan1: {}, plan2: {}".format(res1["result"], res2["result"])
)
def assert_explain_contains(self, query, *expects):
"""
@ -2746,12 +2779,13 @@ out.append("${{dictMgr.NO_DICT_STRING_COLUMNS.contains(cid)}}")
"""
sql = "explain verbose %s" % (query)
res = self.execute_sql(sql, True)
tools.assert_true(res["status"], res['msg'])
tools.assert_true(res["status"], res["msg"])
for expect in expects:
plan_string = "\n".join(item[0] for item in res["result"])
tools.assert_true(plan_string.find(expect) > 0,
"verbose plan of sql (%s) assert expect %s is not found in plan: %s" % (
query, expect, plan_string))
tools.assert_true(
plan_string.find(expect) > 0,
"verbose plan of sql (%s) assert expect %s is not found in plan: %s" % (query, expect, plan_string),
)
def assert_explain_costs_contains(self, query, *expects):
"""
@ -2761,8 +2795,10 @@ out.append("${{dictMgr.NO_DICT_STRING_COLUMNS.contains(cid)}}")
res = self.execute_sql(sql, True)
for expect in expects:
plan_string = "\n".join(item[0] for item in res["result"])
tools.assert_true(str(res["result"]).find(expect) > 0,
"assert expect %s is not found in plan:\n %s" % (expect, plan_string))
tools.assert_true(
str(res["result"]).find(expect) > 0,
"assert expect %s is not found in plan:\n %s" % (expect, plan_string),
)
def assert_show_stats_meta_contains(self, predicate, *expects):
"""
@ -2773,8 +2809,10 @@ out.append("${{dictMgr.NO_DICT_STRING_COLUMNS.contains(cid)}}")
for expect in expects:
# Concatenate all tuples in res['result'] into a single string
meta_string = "\n".join("\t".join(item) for item in res["result"])
tools.assert_true(str(res["result"]).find(expect) > 0,
"assert expect %s is not found in show stats meta:\n %s" % (expect, meta_string))
tools.assert_true(
str(res["result"]).find(expect) > 0,
"assert expect %s is not found in show stats meta:\n %s" % (expect, meta_string),
)
def assert_trace_values_contains(self, query, *expects):
"""
@ -2844,17 +2882,17 @@ out.append("${{dictMgr.NO_DICT_STRING_COLUMNS.contains(cid)}}")
plan = res["result"]
tools.assert_true(len(plan) > 0, "explain sql result is empty")
lines = str(plan).split('\n')
lines = str(plan).split("\n")
# Search for the table scan node for the specified table
for line in lines:
if f'TABLE: {table_name}'.lower() in line.lower():
if f"TABLE: {table_name}".lower() in line.lower():
# Now look forward in the following lines for the partitions information
idx = lines.index(line)
for i in range(idx, min(idx + 10, len(lines))): # Check next 10 lines max
if 'partitions=' in lines[i]:
parts = lines[i].split('partitions=')
if "partitions=" in lines[i]:
parts = lines[i].split("partitions=")
if len(parts) > 1:
partition_info = parts[1].split(',')[0].replace('\'', '').strip()
partition_info = parts[1].split(",")[0].replace("'", "").strip()
return partition_info
def assert_table_partitions_num(self, table_name, expect_num):
@ -2866,7 +2904,7 @@ out.append("${{dictMgr.NO_DICT_STRING_COLUMNS.contains(cid)}}")
def wait_table_rowcount_not_empty(self, table, max_times=300):
times = 0
rc = 0
sql = 'show partitions from ' + table
sql = "show partitions from " + table
while times < max_times:
result = self.execute_sql(sql, True)
if len(result["result"]) > 0:
@ -2882,13 +2920,18 @@ out.append("${{dictMgr.NO_DICT_STRING_COLUMNS.contains(cid)}}")
"""
Check cache select is success, make sure that read_cache_size + write_cache_size > 0
"""
res = self.execute_sql(query, True, )
res = self.execute_sql(
query,
True,
)
result = res["result"][0]
# remove unit
read_cache_size = int(result[0].replace("B", "").replace("KB", ""))
write_cache_size = int(result[1].replace("B", "").replace("KB", ""))
tools.assert_true(read_cache_size + write_cache_size > 0,
"cache select is failed, read_cache_size + write_cache_size must larger than 0 bytes")
tools.assert_true(
read_cache_size + write_cache_size > 0,
"cache select is failed, read_cache_size + write_cache_size must larger than 0 bytes",
)
def assert_show_stats_meta(self, sql=None, exp=None):
if sql is None:
@ -2914,12 +2957,12 @@ out.append("${{dictMgr.NO_DICT_STRING_COLUMNS.contains(cid)}}")
processed_row.append(value)
act.append(processed_row)
log.info("[check regex]: %s" % exp[len(REGEX_FLAG):])
log.info("[check regex]: %s" % exp[len(REGEX_FLAG) :])
tools.assert_regexp_matches(
r"%s" % str(act),
exp[len(REGEX_FLAG):],
exp[len(REGEX_FLAG) :],
"sql result not match regex:\n- [SQL]: %s\n- [exp]: %s\n- [act]: %s\n---"
% (self_print(sql, need_print=True), exp[len(REGEX_FLAG):], act),
% (self_print(sql, need_print=True), exp[len(REGEX_FLAG) :], act),
)
@staticmethod

View File

@ -43,7 +43,7 @@ def print_help():
"""help"""
print(
"""
python run.py [-d dirname/file] [-r] [-l] [-c ${concurrency}] [-t ${time}] [-a ${attr}] [-C ${cluster_type}] [--file_filter=${regex}] [--case_filter=${regex}]
python run.py [-d dirname/file] [-r] [-l] [-c ${concurrency}] [-t ${time}] [-a ${attr}] [-C ${cluster_type}] [--file_filter=${regex}] [--case_filter=${regex}] [--check-status]
-d|--dir Case dirname|filename, default "./sql"
-r|--record SQL record mode, run cases in T and generate R
-v|--validate [DEFAULT] SQL validate mode, run cases in R, and check the results
@ -60,6 +60,7 @@ python run.py [-d dirname/file] [-r] [-l] [-c ${concurrency}] [-t ${time}] [-a $
--keep_alive Check cluster status before each case, only works with sequential mode(-c=1)
--run_info Extra info
--arrow Only run the arrow protocol
--check-status Check cluster status before run cases
"""
)
@ -80,6 +81,7 @@ if __name__ == "__main__":
config = "conf/sr.conf"
keep_alive = False
run_info = ""
check_status = False
args = "hld:rvc:t:x:y:pa:C:"
detail_args = [
@ -100,7 +102,8 @@ if __name__ == "__main__":
"keep_alive",
"run_info=",
"log_filtered",
"arrow"
"arrow",
"check-status",
]
case_dir = None
@ -178,11 +181,14 @@ if __name__ == "__main__":
if opt == "--arrow":
arrow_mode = True
if opt == "--check-status":
check_status = True
# merge cluster info to attr
cluster_attr = "!cloud" if cluster == "native" else "!native"
attr = f"{attr},{cluster_attr}".strip(",")
# check sequential mode with concurrency=1
if 'sequential' in attr and concurrency != 1:
if "sequential" in attr and concurrency != 1:
print("In sequential mode, set concurrency=1 in default!")
concurrency = 1
# check alive mode with concurrency=1
@ -191,16 +197,20 @@ if __name__ == "__main__":
concurrency = 1
# set environment
os.environ["record_mode"] = "true" if record else "false"
os.environ["sql_dir"] = str(dirname)
os.environ["file_filter"] = file_filter
os.environ["case_filter"] = case_filter
os.environ["attr"] = attr
os.environ["config_path"] = config
os.environ["keep_alive"] = str(keep_alive)
os.environ['run_info'] = run_info
os.environ['log_filtered'] = str(log_filtered)
os.environ["arrow_mode"] = str(arrow_mode)
os.environ.update(
{
"record_mode": "true" if record else "false",
"sql_dir": str(dirname),
"file_filter": file_filter,
"case_filter": case_filter,
"attr": attr,
"config_path": config,
"keep_alive": str(keep_alive),
"run_info": run_info,
"log_filtered": str(log_filtered),
"check_status": str(check_status),
}
)
argv = [
"nosetests",
@ -211,12 +221,14 @@ if __name__ == "__main__":
]
if not skip_reruns and not record:
argv.extend([
"--with-flaky",
"--force-flaky",
"--max-runs=3",
"--min-passes=3",
])
argv.extend(
[
"--with-flaky",
"--force-flaky",
"--max-runs=3",
"--min-passes=3",
]
)
# concurrency
if concurrency <= 0:

View File

@ -88,6 +88,7 @@ class TestSQLCases(sr_sql_lib.StarrocksSQLApiLib):
super().setUp()
self.connect_starrocks()
self.create_starrocks_conn_pool()
self.check_cluster_status()
self._init_global_configs()
def _init_global_configs(self):
@ -232,7 +233,9 @@ class TestSQLCases(sr_sql_lib.StarrocksSQLApiLib):
tools.ok_(False, "Check db uniqueness error!")
error_info_dict = {db: list(cases) for db, cases in all_db_dict.items() if len(cases) > 1}
tools.assert_true(len(error_info_dict) <= 0, "Pre Check Failed, Duplicate DBs: \n%s" % json.dumps(error_info_dict, indent=2))
tools.assert_true(
len(error_info_dict) <= 0, "Pre Check Failed, Duplicate DBs: \n%s" % json.dumps(error_info_dict, indent=2)
)
@staticmethod
def _replace_uuid_variables(sql_list: List) -> List:
@ -308,7 +311,7 @@ class TestSQLCases(sr_sql_lib.StarrocksSQLApiLib):
def _get_resource_name(sql: str) -> str:
matches = list()
if "CREATE EXTERNAL RESOURCE" in sql.upper():
matches = re.findall(r'CREATE EXTERNAL RESOURCE \"?([a-zA-Z0-9_-]+)\"?', sql, flags=re.IGNORECASE)
matches = re.findall(r"CREATE EXTERNAL RESOURCE \"?([a-zA-Z0-9_-]+)\"?", sql, flags=re.IGNORECASE)
return matches[0] if len(matches) > 0 else ""
# -------------------------------------------
@ -356,8 +359,20 @@ Start to run: %s
for sql_id, sql in enumerate(sql_list):
if arrow_mode and isinstance(sql, str):
sql = sql.strip()
if sql.startswith(("mysql:", "shell:", "--", "function:", "CHECK:", "PROPERTY:", "LOOP", "END LOOP",
"CONCURRENCY", "END CONCURRENCY")):
if sql.startswith(
(
"mysql:",
"shell:",
"--",
"function:",
"CHECK:",
"PROPERTY:",
"LOOP",
"END LOOP",
"CONCURRENCY",
"END CONCURRENCY",
)
):
self_print(f"[arrow_mode] Skip non-arrow SQL: {sql}", ColorEnum.YELLOW)
continue
if not sql.startswith("arrow:"):
@ -375,7 +390,7 @@ Start to run: %s
# uncheck flag, owns the highest priority
if sql.startswith(sr_sql_lib.UNCHECK_FLAG):
uncheck = True
sql = sql[len(sr_sql_lib.UNCHECK_FLAG):]
sql = sql[len(sr_sql_lib.UNCHECK_FLAG) :]
actual_res, actual_res_log, var, order = self.execute_single_statement(sql, sql_id, record_mode)
@ -385,9 +400,11 @@ Start to run: %s
expect_res = case_info.result[sql_id]
expect_res_for_log = expect_res if len(expect_res) < 1000 else expect_res[:1000] + "..."
log.info(f"""[{sql_id}.result]:
log.info(
f"""[{sql_id}.result]:
- [exp]: {expect_res_for_log}
- [act]: {actual_res}""")
- [act]: {actual_res}"""
)
# -------------------------------------------
# [CHECKER]
@ -436,15 +453,17 @@ Start to run: %s
# thread exec, set count in (*)
for _t_exec_id in range(_t_count):
this_t_id = f'{_t_name}-{_t_info_id}-{_t_exec_id}'
this_t_id = f"{_t_name}-{_t_info_id}-{_t_exec_id}"
# init a conn for thread
this_conn = self.connection_pool.connection()
_t_conn_list.append([this_conn, this_t_id])
t = threading.Thread(name=f"Thread-{this_t_id}",
target=self.execute_thread,
args=(this_t_id, _t_cmd, _t_res, _t_ori_cmd, record_mode, _outer_db, this_conn))
t = threading.Thread(
name=f"Thread-{this_t_id}",
target=self.execute_thread,
args=(this_t_id, _t_cmd, _t_res, _t_ori_cmd, record_mode, _outer_db, this_conn),
)
thread_list.append(t)
threading.excepthook = self.custom_except_hook
@ -476,19 +495,28 @@ Start to run: %s
for _t_info_id, _thread in enumerate(t_info_list):
_t_name = _thread["name"]
_t_count = _thread["count"]
_t_uid = f'{_t_name}-{_t_info_id}'
_t_uid = f"{_t_name}-{_t_info_id}"
_t_info_line = _thread["info"]
self.res_log.append(_t_info_line)
# check thread result info
tools.assert_in(_t_uid, self.thread_res_log, f"Thread log of {_t_uid} is not found!")
tools.eq_(len(self.thread_res_log[_t_uid]), _t_count, f"Thread log size: {len(self.thread_res_log[_t_uid])} error, maybe you used the same thread name?")
tools.eq_(
len(self.thread_res_log[_t_uid]),
_t_count,
f"Thread log size: {len(self.thread_res_log[_t_uid])} error, maybe you used the same thread name?",
)
s_thread_log = self.thread_res_log[_t_uid][0]
for exec_res_log in self.thread_res_log[_t_uid]:
if exec_res_log != s_thread_log:
self_print("Thread result of exec not equal: \n - %s\n - %s" % (exec_res_log, s_thread_log), color=ColorEnum.RED, logout=True, bold=True)
self_print(
"Thread result of exec not equal: \n - %s\n - %s" % (exec_res_log, s_thread_log),
color=ColorEnum.RED,
logout=True,
bold=True,
)
self.res_log.extend(s_thread_log)
self.res_log.append("")
@ -496,4 +524,3 @@ Start to run: %s
self_print(f"[CONCURRENCY] SUCCESS!", color=ColorEnum.CYAN, logout=True)
if record_mode:
self.res_log.append("} " + END_CONCURRENCY_FLAG + "\n")