[BugFix] Fix query detail lost audit items (#63237)
Signed-off-by: before-Sunrise <unclejyj@gmail.com>
This commit is contained in:
parent
5ae19c32f4
commit
86fdf35bcf
|
|
@ -316,7 +316,6 @@ public class StmtExecutor {
|
|||
private List<ByteBuffer> proxyResultBuffer = null;
|
||||
private ShowResultSet proxyResultSet = null;
|
||||
private PQueryStatistics statisticsForAuditLog;
|
||||
private boolean statisticsConsumed = false;
|
||||
private List<StmtExecutor> subStmtExecutors;
|
||||
private Optional<Boolean> isForwardToLeaderOpt = Optional.empty();
|
||||
private HttpResultSender httpResultSender;
|
||||
|
|
@ -2383,46 +2382,28 @@ public class StmtExecutor {
|
|||
}
|
||||
|
||||
public PQueryStatistics getQueryStatisticsForAuditLog() {
|
||||
// for one StmtExecutor, only consume PQueryStatistics once
|
||||
// so calling getQueryStatisticsForAuditLog returns an empty PQueryStatistics if this is not the first call
|
||||
if (statisticsConsumed) {
|
||||
// create an empty PQueryStatistics
|
||||
PQueryStatistics stats = normalizeQueryStatistics(null);
|
||||
statisticsForAuditLog = stats;
|
||||
return stats;
|
||||
if (statisticsForAuditLog == null && coord != null) {
|
||||
statisticsForAuditLog = coord.getAuditStatistics();
|
||||
}
|
||||
|
||||
PQueryStatistics stats = statisticsForAuditLog;
|
||||
if (stats == null && coord != null) {
|
||||
// for insert stmt
|
||||
stats = coord.getAuditStatistics();
|
||||
if (statisticsForAuditLog == null) {
|
||||
statisticsForAuditLog = new PQueryStatistics();
|
||||
}
|
||||
|
||||
stats = normalizeQueryStatistics(stats);
|
||||
|
||||
statisticsForAuditLog = stats;
|
||||
statisticsConsumed = true;
|
||||
return stats;
|
||||
}
|
||||
|
||||
private PQueryStatistics normalizeQueryStatistics(PQueryStatistics stats) {
|
||||
PQueryStatistics normalized = (stats != null) ? stats : new PQueryStatistics();
|
||||
if (normalized.scanBytes == null) {
|
||||
normalized.scanBytes = 0L;
|
||||
if (statisticsForAuditLog.scanBytes == null) {
|
||||
statisticsForAuditLog.scanBytes = 0L;
|
||||
}
|
||||
if (normalized.scanRows == null) {
|
||||
normalized.scanRows = 0L;
|
||||
if (statisticsForAuditLog.scanRows == null) {
|
||||
statisticsForAuditLog.scanRows = 0L;
|
||||
}
|
||||
if (normalized.cpuCostNs == null) {
|
||||
normalized.cpuCostNs = 0L;
|
||||
if (statisticsForAuditLog.cpuCostNs == null) {
|
||||
statisticsForAuditLog.cpuCostNs = 0L;
|
||||
}
|
||||
if (normalized.memCostBytes == null) {
|
||||
normalized.memCostBytes = 0L;
|
||||
if (statisticsForAuditLog.memCostBytes == null) {
|
||||
statisticsForAuditLog.memCostBytes = 0L;
|
||||
}
|
||||
if (normalized.spillBytes == null) {
|
||||
normalized.spillBytes = 0L;
|
||||
if (statisticsForAuditLog.spillBytes == null) {
|
||||
statisticsForAuditLog.spillBytes = 0L;
|
||||
}
|
||||
return normalized;
|
||||
return statisticsForAuditLog;
|
||||
}
|
||||
|
||||
public void handleInsertOverwrite(InsertStmt insertStmt) throws Exception {
|
||||
|
|
@ -2985,7 +2966,6 @@ public class StmtExecutor {
|
|||
GlobalStateMgr.getCurrentState().getOperationListenerBus()
|
||||
.onDMLStmtJobTransactionFinish(txnState, database, targetTable, dmlType);
|
||||
}
|
||||
recordExecStatsIntoContext();
|
||||
}
|
||||
|
||||
String errMsg = "";
|
||||
|
|
|
|||
|
|
@ -3247,3 +3247,239 @@ out.append("${{dictMgr.NO_DICT_STRING_COLUMNS.contains(cid)}}")
|
|||
result_str = column_separator.join(column_values)
|
||||
log.info(f"Final result: {result_str}")
|
||||
return result_str
|
||||
|
||||
def get_timestamp_ms(self):
    """
    Return the current wall-clock time as an integer number of
    milliseconds since the Unix epoch.
    """
    import time

    seconds_now = time.time()
    return int(seconds_now * 1000)
|
||||
|
||||
def get_last_query_id(self):
    """
    Look up the id of the most recently executed query.

    Issues ``select last_query_id()`` through ``self.execute_sql`` and
    returns the first column of the first returned row. Returns ``None``
    when the statement reports a falsy ``status``, when the response has
    no ``result`` key, or when the result set is empty.
    """
    response = self.execute_sql("select last_query_id()", True)

    # The statement itself failed.
    if not response["status"]:
        return None

    # No result set was attached to the response.
    if "result" not in response:
        return None

    rows = response["result"]
    if len(rows) == 0:
        return None

    return rows[0][0]
|
||||
|
||||
def get_query_detail_by_api(self, timestamp_ms, query_id):
    """
    Fetch the query-detail record for ``query_id`` over the HTTP API.

    Runs ``curl`` against
    ``http://<self.mysql_host>:<self.http_port>/api/query_detail?event_time=<timestamp_ms>``
    with ``self.mysql_user`` / ``self.mysql_password`` as basic-auth
    credentials, parses the response as JSON and scans it for entries whose
    ``queryId`` equals ``query_id``.

    Args:
        timestamp_ms: value sent as the ``event_time`` query parameter.
        query_id: the ``queryId`` to look for in the response.

    Returns:
        The matching entry (a dict). An entry whose ``state`` is
        ``"FINISHED"`` is preferred; otherwise the last matching entry in
        any other state is returned. ``None`` is returned when curl cannot
        be run or exits non-zero, the response is empty or not valid JSON,
        the payload is not a list, or nothing matches.
    """
    import json
    import subprocess

    api_url = f"http://{self.mysql_host}:{self.http_port}/api/query_detail?event_time={timestamp_ms}"

    # Pass an argv list instead of a shell string: credentials or URLs that
    # contain quotes, spaces or shell metacharacters are then handed to curl
    # verbatim rather than being re-parsed (or executed) by a shell.
    cmd = [
        "curl",
        "-s",
        "--location-trusted",
        "-u",
        f"{self.mysql_user}:{self.mysql_password}",
        api_url,
    ]

    try:
        result = subprocess.run(
            cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding="utf-8", timeout=30
        )
    except (OSError, subprocess.SubprocessError, ValueError):
        # curl missing, timed out, or produced output that cannot be decoded:
        # this lookup is best-effort, so report "no detail" to the caller.
        return None

    if result.returncode != 0:
        return None

    if not result.stdout.strip():
        return None

    try:
        query_details = json.loads(result.stdout)
    except json.JSONDecodeError:
        return None

    # Anything other than a list of records (e.g. an error object) cannot
    # contain a usable match.
    if not isinstance(query_details, list):
        return None

    # Find matching query_id, prioritize records with FINISHED state
    finished_detail = None
    other_detail = None

    for detail in query_details:
        if not isinstance(detail, dict):
            continue
        if detail.get("queryId") != query_id:
            continue
        if detail.get("state", "") == "FINISHED":
            finished_detail = detail
        else:
            other_detail = detail

    # Return FINISHED state record first, otherwise return other state record
    if finished_detail:
        return finished_detail
    if other_detail:
        return other_detail
    return None
|
||||
|
||||
def assert_query_detail_field(self, query_detail, field_name, expected_value=None):
    """
    Check a single field of a query-detail record.

    Returns ``False`` when ``query_detail`` is ``None``, when
    ``field_name`` is absent from it, or when ``expected_value`` is given
    (not ``None``) and differs from the stored value. Returns ``True``
    otherwise; with ``expected_value`` left as ``None`` only the presence
    of the field is checked.
    """
    if query_detail is None or field_name not in query_detail:
        return False

    observed = query_detail[field_name]

    # Presence-only check when the caller supplied no expectation.
    if expected_value is None:
        return True

    return not (observed != expected_value)
|
||||
|
||||
def query_detail_check(self, sql=None, expected_scan_rows=None, expected_return_rows=None):
    """
    Run a statement and validate its record from the query detail API.

    Steps: take a timestamp, execute ``sql``, read the last query id,
    fetch the matching query detail (up to 3 attempts, one second apart,
    the first after an initial one-second wait) and assert on its fields.

    Args:
        sql: SQL statement to execute
        expected_scan_rows: Expected scanRows value; not checked when None
        expected_return_rows: Expected returnRows value; not checked when None

    Returns:
        dict: ``{"success": True}`` when every assertion passed. Failures
        are reported through the ``tools`` assertion helpers.
    """
    import time

    max_attempts = 3

    # 1. Timestamp taken before execution; it is sent as the event_time
    #    parameter of the query detail API.
    timestamp_ms = self.get_timestamp_ms()

    # 2. Execute SQL statement
    result = self.execute_sql(sql, True)
    tools.assert_true(result["status"], f"SQL execution failed: {result}")

    # 3. Get query_id
    query_id = self.get_last_query_id()
    tools.assert_true(query_id is not None, "Failed to get query_id")

    # 4./5. Wait one second before every attempt so the query detail has
    #       time to be collected, then fetch it; stop at the first hit.
    query_detail = None
    for _ in range(max_attempts):
        time.sleep(1)
        query_detail = self.get_query_detail_by_api(timestamp_ms, query_id)
        if query_detail is not None:
            break

    tools.assert_true(
        query_detail is not None, "Failed to get query detail after 3 retries")

    # 6. Validate each field and assert

    # Validate scanRows
    if expected_scan_rows is not None:
        actual_scan_rows = query_detail.get("scanRows")
        tools.assert_equal(
            expected_scan_rows, actual_scan_rows,
            f"scanRows mismatch: expected {expected_scan_rows}, got {actual_scan_rows}"
        )

    # Validate returnRows
    if expected_return_rows is not None:
        actual_return_rows = query_detail.get("returnRows")
        tools.assert_equal(
            expected_return_rows, actual_return_rows,
            f"returnRows mismatch: expected {expected_return_rows}, got {actual_return_rows}"
        )

    # Validate cpuCostNs, scanBytes and memCostBytes: each must be present
    # and strictly positive. The explicit None guard turns a missing field
    # into a readable assertion failure instead of a TypeError from
    # comparing None with an int.
    for field_name in ("cpuCostNs", "scanBytes", "memCostBytes"):
        actual_value = query_detail.get(field_name)
        tools.assert_true(
            actual_value is not None and actual_value > 0,
            f"{field_name} is missing or not positive: {actual_value}"
        )

    return {
        "success": True
    }
|
||||
|
|
|
|||
|
|
@ -0,0 +1,28 @@
|
|||
-- name: test_query_detail_api
|
||||
create database db_${uuid0};
|
||||
-- result:
|
||||
-- !result
|
||||
use db_${uuid0};
|
||||
-- result:
|
||||
-- !result
|
||||
admin set frontend config ("enable_collect_query_detail_info" = "true");
|
||||
-- result:
|
||||
-- !result
|
||||
create table test_table (
|
||||
id int,
|
||||
name varchar(100),
|
||||
value double
|
||||
) distributed by hash(id) properties("replication_num" = "1");
|
||||
-- result:
|
||||
-- !result
|
||||
insert into test_table values (1, 'test1', 10.5), (2, 'test2', 20.5), (3, 'test3', 30.5);
|
||||
-- result:
|
||||
-- !result
|
||||
function: query_detail_check("select * from test_table", 3, 3)
|
||||
-- result:
|
||||
{'success': True}
|
||||
-- !result
|
||||
function: query_detail_check("insert into test_table select * from test_table", 3, 0)
|
||||
-- result:
|
||||
{'success': True}
|
||||
-- !result
|
||||
|
|
@ -0,0 +1,25 @@
|
|||
-- name: test_query_detail_api
|
||||
|
||||
create database db_${uuid0};
|
||||
use db_${uuid0};
|
||||
|
||||
|
||||
admin set frontend config ("enable_collect_query_detail_info" = "true");
|
||||
|
||||
|
||||
create table test_table (
|
||||
id int,
|
||||
name varchar(100),
|
||||
value double
|
||||
) distributed by hash(id) properties("replication_num" = "1");
|
||||
|
||||
|
||||
insert into test_table values (1, 'test1', 10.5), (2, 'test2', 20.5), (3, 'test3', 30.5);
|
||||
|
||||
function: query_detail_check("select * from test_table", 3, 3)
|
||||
|
||||
function: query_detail_check("insert into test_table select * from test_table", 3, 0)
|
||||
|
||||
-- admin set frontend config ("enable_collect_query_detail_info" = "false");
|
||||
|
||||
|
||||
Loading…
Reference in New Issue