[Enhancement] Emit routine load lag time (backport #62048) (#63239)

Shawn 2025-09-17 12:37:20 -04:00 committed by GitHub
parent 7a765d9f1a
commit a999634dc7
9 changed files with 810 additions and 0 deletions

View File

@@ -1722,6 +1722,11 @@ For more information on how to build a monitoring service for your StarRocks clu
- Unit: -
- Description: The maximum Kafka partition offset lag for each Routine Load job. It is collected only when the FE configuration `enable_routine_load_lag_metrics` is set to `true` and the offset lag is greater than or equal to the FE configuration `min_routine_load_lag_for_metrics`. By default, `enable_routine_load_lag_metrics` is `false`, and `min_routine_load_lag_for_metrics` is `10000`.
### starrocks_fe_routine_load_max_lag_time_of_partition
- Unit: Seconds
- Description: The maximum Kafka partition timestamp lag for each Routine Load job, that is, the number of seconds between the current time and the timestamp of the latest consumed message. It is collected only when the FE configuration `enable_routine_load_lag_time_metrics` is set to `true`. By default, `enable_routine_load_lag_time_metrics` is `false`.
### starrocks_fe_sql_block_hit_count
- Unit: Count
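
Both lag-related settings above are mutable FE configurations, so they can be toggled at runtime. A minimal example, assuming a MySQL client connected to the FE with admin privileges:

```sql
-- Enable timestamp-based lag metrics without restarting the FE
ADMIN SET FRONTEND CONFIG ("enable_routine_load_lag_time_metrics" = "true");
```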

View File

@@ -2616,6 +2616,12 @@ public class Config extends ConfigBase {
@ConfField(mutable = true)
public static boolean enable_routine_load_lag_metrics = false;
/**
* Whether to collect routine load lag time metrics.
*/
@ConfField(mutable = true)
public static boolean enable_routine_load_lag_time_metrics = false;
@ConfField(mutable = true)
public static boolean enable_collect_query_detail_info = false;

View File

@@ -68,6 +68,7 @@ import com.starrocks.common.util.concurrent.lock.LockType;
import com.starrocks.common.util.concurrent.lock.Locker;
import com.starrocks.load.Load;
import com.starrocks.load.RoutineLoadDesc;
import com.starrocks.metric.RoutineLoadLagTimeMetricMgr;
import com.starrocks.qe.OriginStatement;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
@@ -84,6 +85,7 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -501,6 +503,7 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
Long.valueOf((totalRows - errorRows - unselectedRows) * 1000 / totalTaskExcutionTimeMs));
summary.put("committedTaskNum", Long.valueOf(committedTaskNum));
summary.put("abortedTaskNum", Long.valueOf(abortedTaskNum));
summary.put("partitionLagTime", new HashMap<>(getRoutineLoadLagTime()));
Gson gson = new GsonBuilder().disableHtmlEscaping().create();
return gson.toJson(summary);
}
@@ -853,4 +856,64 @@ public class KafkaRoutineLoadJob extends RoutineLoadJob {
}
updateSubstate(JobSubstate.STABLE, null);
}
@Override
public void afterVisible(TransactionState txnState, boolean txnOperated) {
super.afterVisible(txnState, txnOperated);
// Update lag time metrics when a Kafka transaction becomes visible
if (Config.enable_routine_load_lag_time_metrics) {
updateLagTimeMetricsFromProgress();
}
}
/**
* Update lag time metrics using the current Kafka timestamp progress.
*/
private void updateLagTimeMetricsFromProgress() {
try {
KafkaProgress progress = (KafkaProgress) getTimestampProgress();
if (progress == null) {
LOG.warn("Progres is null for Kafka job {}:{}", id, name);
return;
}
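// Note: the timestamp progress reuses KafkaProgress's partition-to-offset map to hold
// per-partition message timestamps in epoch milliseconds (hence the comparison against `now` below).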
Map<Integer, Long> partitionTimestamps = progress.getPartitionIdToOffset();
Map<Integer, Long> partitionLagTimes = Maps.newHashMap();
long now = System.currentTimeMillis();
for (Map.Entry<Integer, Long> entry : partitionTimestamps.entrySet()) {
int partition = entry.getKey();
Long timestampValue = entry.getValue();
long lag = 0L;
// Check for clock drift (future timestamps)
if (timestampValue > now) {
long clockDrift = timestampValue - now;
LOG.warn("Clock drift detected for job {} ({}) partition {}: " +
"timestamp {}ms is {}ms ahead of current time {}ms. ",
id, name, partition, timestampValue, clockDrift, now);
} else {
lag = (now - timestampValue) / 1000; // convert to seconds
}
partitionLagTimes.put(partition, lag);
}
if (!partitionLagTimes.isEmpty()) {
RoutineLoadLagTimeMetricMgr.getInstance()
.updateRoutineLoadLagTimeMetric(this.getDbId(), this.getName(), partitionLagTimes);
}
} catch (Exception e) {
LOG.warn("Failed to update lag time metrics for Kafka job {} ({}): {}", id, name, e.getMessage(), e);
}
}
private Map<Integer, Long> getRoutineLoadLagTime() {
try {
RoutineLoadLagTimeMetricMgr metricMgr = RoutineLoadLagTimeMetricMgr.getInstance();
Map<Integer, Long> lagTimes = metricMgr.getPartitionLagTimes(this.getDbId(), this.getName());
return lagTimes != null ? lagTimes : Maps.newHashMap();
} catch (Exception e) {
LOG.warn("Failed to get routine load lag time for job {} ({}): {}", id, name, e.getMessage(), e);
// Return empty map as fallback
return Maps.newHashMap();
}
}
}
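
Per the comment in `RoutineLoadLagTimeMetricMgr` below, partition-level lag is surfaced through `SHOW ROUTINE LOAD` rather than the metrics endpoint. A rough sketch of how the `partitionLagTime` field could look inside the job statistics JSON, showing only the fields visible in this hunk and with made-up values:

```json
{
  "committedTaskNum": 128,
  "abortedTaskNum": 2,
  "partitionLagTime": {"0": 12, "1": 5}
}
```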

View File

@@ -166,6 +166,9 @@ public class MetricCalculator extends TimerTask {
MetricRepo.updateMemoryUsageMetrics();
}
// Clean up stale routine load lag time metrics
RoutineLoadLagTimeMetricMgr.getInstance().cleanupStaleMetrics();
MetricRepo.GAUGE_SAFE_MODE.setValue(GlobalStateMgr.getCurrentState().isSafeMode() ? 1 : 0);
}
}

View File

@@ -1010,6 +1010,9 @@ public final class MetricRepo {
collectRoutineLoadProcessMetrics(visitor);
}
// Collect Kafka routine load lag time metrics
RoutineLoadLagTimeMetricMgr.getInstance().collectRoutineLoadLagTimeMetrics(visitor);
if (Config.memory_tracker_enable) {
collectMemoryUsageMetrics(visitor);
}
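
Once enabled on the leader FE, the new gauge should appear in the FE `/metrics` output roughly as below (an illustrative sample: the `starrocks_fe_` prefix comes from the docs hunk above, the `job_name` label from the metric manager, and the value is made up):

```
starrocks_fe_routine_load_max_lag_time_of_partition{job_name="my_kafka_job"} 42
```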

View File

@@ -0,0 +1,221 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.metric;
import com.google.common.collect.Maps;
import com.starrocks.common.Config;
import com.starrocks.metric.Metric.MetricUnit;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
/**
* Manager for routine load lag time metrics.
* Handles collection and emission of lag time metrics for Kafka Routine Load jobs.
*/
public class RoutineLoadLagTimeMetricMgr {
private static final Logger LOG = LogManager.getLogger(RoutineLoadLagTimeMetricMgr.class);
private static final long STALE_THRESHOLD_MS = 5L * 60 * 1000; // 5 minutes; to prevent memory leaks
private static final RoutineLoadLagTimeMetricMgr INSTANCE = new RoutineLoadLagTimeMetricMgr();
// Use composite key (dbId.jobName) to uniquely identify jobs
private final Map<String, RoutineLoadLagTimeMetric> jobLagTimeMap = Maps.newConcurrentMap();
private RoutineLoadLagTimeMetricMgr() {}
public static RoutineLoadLagTimeMetricMgr getInstance() {
return INSTANCE;
}
private String getJobKey(long dbId, String jobName) {
return dbId + "." + jobName;
}
private static class RoutineLoadLagTimeMetric {
private long updateTimestamp;
private final String jobName;
private final Map<Integer, Long> partitionLagTimes;
private final LeaderAwareGaugeMetricLong maxLagTimeMetric;
public RoutineLoadLagTimeMetric(String jobName) {
this.updateTimestamp = -1;
this.jobName = jobName;
this.partitionLagTimes = Maps.newHashMap();
this.maxLagTimeMetric = new LeaderAwareGaugeMetricLong(
"routine_load_max_lag_time_of_partition",
MetricUnit.SECONDS,
"Maximum lag time across all partitions for routine load job"
) {
@Override
public Long getValueLeader() {
return partitionLagTimes.values().stream()
.mapToLong(Long::longValue)
.max()
.orElse(0L);
}
};
this.maxLagTimeMetric.addLabel(new MetricLabel("job_name", jobName));
LOG.debug("Initialized empty lag time metric structure for job {}", jobName);
}
public void updateMetrics(Map<Integer, Long> newPartitionLagTimes, long newUpdateTimestamp) {
this.updateTimestamp = newUpdateTimestamp;
this.partitionLagTimes.clear();
this.partitionLagTimes.putAll(newPartitionLagTimes);
LOG.debug("Updated metrics for job {}: partitions={}",
jobName, newPartitionLagTimes.size());
}
public Map<Integer, Long> getPartitionLagTimes() {
return Maps.newHashMap(partitionLagTimes);
}
public LeaderAwareGaugeMetricLong getMaxLagTimeMetric() {
return maxLagTimeMetric;
}
public boolean isStale(long currentTime, long staleThresholdMs) {
return (currentTime - updateTimestamp) > staleThresholdMs;
}
public boolean hasData() {
return !partitionLagTimes.isEmpty() && updateTimestamp > 0;
}
}
public void updateRoutineLoadLagTimeMetric(long dbId, String jobName, Map<Integer, Long> partitionLagTimes) {
try {
if (partitionLagTimes.isEmpty()) {
LOG.debug("No partition lag times available for job {}", jobName);
return;
}
// Get or create the metric structure using composite key
String jobKey = getJobKey(dbId, jobName);
RoutineLoadLagTimeMetric lagTimeMetric =
jobLagTimeMap.computeIfAbsent(jobKey, RoutineLoadLagTimeMetric::new);
// Update the metric with new values
long now = System.currentTimeMillis();
lagTimeMetric.updateMetrics(partitionLagTimes, now);
LOG.debug("Updated lag time data for Kafka job {}: partitions={}",
jobKey, partitionLagTimes.size());
} catch (Exception e) {
LOG.warn("Failed to update lag time data for Kafka job {}: {}", getJobKey(dbId, jobName), e.getMessage());
}
}
/**
* Clean up stale lag time metrics to prevent memory leaks
*/
public void cleanupStaleMetrics() {
if (!Config.enable_routine_load_lag_time_metrics) {
return;
}
try {
long now = System.currentTimeMillis();
long staleThresholdMs = STALE_THRESHOLD_MS;
// Clean up stale data
Iterator<Map.Entry<String, RoutineLoadLagTimeMetric>> jobLagTimeIterator = jobLagTimeMap.entrySet().iterator();
while (jobLagTimeIterator.hasNext()) {
Map.Entry<String, RoutineLoadLagTimeMetric> entry = jobLagTimeIterator.next();
String jobKey = entry.getKey();
RoutineLoadLagTimeMetric lagTimeMetric = entry.getValue();
// Remove stale data
if (lagTimeMetric.isStale(now, staleThresholdMs)) {
LOG.debug("Removing stale lag time data for job {}", jobKey);
jobLagTimeIterator.remove();
}
}
} catch (Exception e) {
LOG.warn("Failed to cleanup stale routine load lag time metrics", e);
}
}
/**
* Collect routine load lag time metrics. Uses data stored when transactions become visible instead of calculating on demand.
*/
public void collectRoutineLoadLagTimeMetrics(MetricVisitor visitor) {
if (!Config.enable_routine_load_lag_time_metrics) {
return;
}
try {
// Emit metrics for all jobs with data
for (Map.Entry<String, RoutineLoadLagTimeMetric> entry : jobLagTimeMap.entrySet()) {
String jobKey = entry.getKey();
RoutineLoadLagTimeMetric lagTimeMetric = entry.getValue();
// Skip metrics that don't have data yet
if (!lagTimeMetric.hasData()) {
LOG.debug("Skipping job {} - no data available yet", jobKey);
continue;
}
// Emit metrics for this job
emitJobMetrics(jobKey, lagTimeMetric, visitor);
}
} catch (Exception e) {
LOG.warn("Failed to collect routine load lag time metrics", e);
}
}
private void emitJobMetrics(String jobKey, RoutineLoadLagTimeMetric lagTimeMetric, MetricVisitor visitor) {
try {
if (lagTimeMetric.hasData()) {
// Only the max lag is emitted; partition-level lag is available only through `SHOW ROUTINE LOAD`
visitor.visit(lagTimeMetric.getMaxLagTimeMetric());
}
} catch (Exception e) {
LOG.warn("Failed to emit metrics for job {}: {}", jobKey, e.getMessage());
}
}
public Map<Integer, Long> getPartitionLagTimes(long dbId, String jobName) {
try {
String jobKey = getJobKey(dbId, jobName);
RoutineLoadLagTimeMetric lagTimeMetric = jobLagTimeMap.get(jobKey);
if (lagTimeMetric == null || !lagTimeMetric.hasData()) {
LOG.debug("No lag time metric found for job {} ({})", jobKey, jobName);
return Collections.emptyMap();
}
Map<Integer, Long> lagTimes = lagTimeMetric.getPartitionLagTimes();
LOG.debug("Retrieved {} partition lag times for job {} ({})",
lagTimes.size(), jobKey, jobName);
return lagTimes;
} catch (Exception e) {
LOG.warn("Failed to get partition lag times for job {} ({}): {}",
getJobKey(dbId, jobName), jobName, e.getMessage());
return Collections.emptyMap();
}
}
}
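
Tying the pieces together, here is a minimal sketch of the manager's lifecycle as this patch wires it up: update on transaction visibility, read-back for `SHOW ROUTINE LOAD`, emission during metric collection, and periodic cleanup. It assumes the FE classpath, that `Config.enable_routine_load_lag_time_metrics` is `true`, and some `MetricVisitor` implementation; the db id and job name are made up:

```java
import com.google.common.collect.Maps;
import com.starrocks.metric.MetricVisitor;
import com.starrocks.metric.RoutineLoadLagTimeMetricMgr;

import java.util.Map;

public class LagTimeMetricLifecycleSketch {
    public static void demo(MetricVisitor visitor) {
        RoutineLoadLagTimeMetricMgr mgr = RoutineLoadLagTimeMetricMgr.getInstance();

        // 1. afterVisible() path: a job reports per-partition lag in seconds.
        Map<Integer, Long> lagSeconds = Maps.newHashMap();
        lagSeconds.put(0, 12L);
        lagSeconds.put(1, 5L);
        mgr.updateRoutineLoadLagTimeMetric(1001L, "demo_job", lagSeconds);

        // 2. SHOW ROUTINE LOAD path: read back the stored per-partition lag.
        Map<Integer, Long> stored = mgr.getPartitionLagTimes(1001L, "demo_job");
        System.out.println("partition lag (s): " + stored);

        // 3. Metric collection path: emits only the per-job max as a gauge.
        mgr.collectRoutineLoadLagTimeMetrics(visitor);

        // 4. MetricCalculator path: drops entries not updated for 5+ minutes.
        mgr.cleanupStaleMetrics();
    }
}
```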

View File

@@ -49,6 +49,7 @@ import com.starrocks.common.StarRocksException;
import com.starrocks.common.jmockit.Deencapsulation;
import com.starrocks.common.util.KafkaUtil;
import com.starrocks.load.RoutineLoadDesc;
import com.starrocks.metric.RoutineLoadLagTimeMetricMgr;
import com.starrocks.persist.gson.GsonUtils;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.OriginStatement;
@@ -629,5 +630,124 @@ public class KafkaRoutineLoadJobTest {
}
@Test
public void testUpdateLagTimeMetricsFromProgress() {
KafkaRoutineLoadJob job = new KafkaRoutineLoadJob(1L, "test_job", 1L, 1L, "127.0.0.1:9020", "topic1");
// Create timestamp progress with future timestamp (clock drift scenario)
Map<Integer, Long> partitionTimestamps = Maps.newHashMap();
long currentTime = System.currentTimeMillis();
partitionTimestamps.put(0, currentTime + 60000); // 1 minute in the future (clock drift)
partitionTimestamps.put(1, currentTime - 5000); // Normal timestamp
KafkaProgress timestampProgress = new KafkaProgress(partitionTimestamps);
Deencapsulation.setField(job, "timestampProgress", timestampProgress);
new MockUp<RoutineLoadLagTimeMetricMgr>() {
@Mock
public void updateRoutineLoadLagTimeMetric(long dbId, String jobName, Map<Integer, Long> partitionLagTimes) {
// Verify clock drift handling: partition 0 should have lag 0, partition 1 should have positive lag
Assertions.assertTrue(partitionLagTimes.containsKey(0));
Assertions.assertTrue(partitionLagTimes.containsKey(1));
Assertions.assertEquals(Long.valueOf(0L), partitionLagTimes.get(0)); // Clock drift case
Assertions.assertTrue(partitionLagTimes.get(1) > 0); // Normal case
}
};
// Execute: Call the private method
Deencapsulation.invoke(job, "updateLagTimeMetricsFromProgress");
}
@Test
public void testUpdateLagTimeMetricsFromProgressWithException() {
KafkaRoutineLoadJob job = new KafkaRoutineLoadJob(1L, "test_job", 1L, 1L, "127.0.0.1:9020", "topic1");
// Set null timestamp progress; the method should log a warning and return without throwing
Deencapsulation.setField(job, "timestampProgress", null);
Deencapsulation.invoke(job, "updateLagTimeMetricsFromProgress");
}
@Test
public void testGetRoutineLoadLagTimeSuccess() {
KafkaRoutineLoadJob job = new KafkaRoutineLoadJob(1L, "test_job", 1L, 1L, "127.0.0.1:9020", "topic1");
// Mock successful retrieval from RoutineLoadLagTimeMetricMgr
Map<Integer, Long> expectedLagTimes = Maps.newHashMap();
expectedLagTimes.put(0, 10L);
expectedLagTimes.put(1, 15L);
new MockUp<RoutineLoadLagTimeMetricMgr>() {
@Mock
public Map<Integer, Long> getPartitionLagTimes(long dbId, String jobName) {
return expectedLagTimes;
}
};
// Execute: Call the private method
Map<Integer, Long> result = Deencapsulation.invoke(job, "getRoutineLoadLagTime");
// Verify: Should return the expected lag times
Assertions.assertEquals(expectedLagTimes, result);
Assertions.assertEquals(2, result.size());
Assertions.assertEquals(Long.valueOf(10L), result.get(0));
Assertions.assertEquals(Long.valueOf(15L), result.get(1));
}
@Test
public void testGetRoutineLoadLagTimeEmpty() {
KafkaRoutineLoadJob job = new KafkaRoutineLoadJob(1L, "test_job", 1L, 1L, "127.0.0.1:9020", "topic1");
// Mock empty retrieval from RoutineLoadLagTimeMetricMgr
new MockUp<RoutineLoadLagTimeMetricMgr>() {
@Mock
public Map<Integer, Long> getPartitionLagTimes(long dbId, String jobName) {
return Maps.newHashMap(); // Empty map
}
};
// Execute: Call the private method
Map<Integer, Long> result = Deencapsulation.invoke(job, "getRoutineLoadLagTime");
// Verify: Should return empty map
Assertions.assertTrue(result.isEmpty());
}
@Test
public void testGetRoutineLoadLagTimeWithNull() {
KafkaRoutineLoadJob job = new KafkaRoutineLoadJob(1L, "test_job", 1L, 1L, "127.0.0.1:9020", "topic1");
// Mock null retrieval from RoutineLoadLagTimeMetricMgr
new MockUp<RoutineLoadLagTimeMetricMgr>() {
@Mock
public Map<Integer, Long> getPartitionLagTimes(long dbId, String jobName) {
return null; // Null return
}
};
// Execute: Call the private method
Map<Integer, Long> result = Deencapsulation.invoke(job, "getRoutineLoadLagTime");
// Verify: Should return empty map as fallback
Assertions.assertTrue(result.isEmpty());
}
@Test
public void testGetRoutineLoadLagTimeWithException() {
KafkaRoutineLoadJob job = new KafkaRoutineLoadJob(1L, "test_job", 1L, 1L, "127.0.0.1:9020", "topic1");
// Mock exception from RoutineLoadLagTimeMetricMgr
new MockUp<RoutineLoadLagTimeMetricMgr>() {
@Mock
public Map<Integer, Long> getPartitionLagTimes(long dbId, String jobName) {
throw new RuntimeException("Test exception");
}
};
// Execute: Call the private method
Map<Integer, Long> result = Deencapsulation.invoke(job, "getRoutineLoadLagTime");
// Verify: Should return empty map as fallback
Assertions.assertTrue(result.isEmpty());
}
}

View File

@@ -18,6 +18,7 @@ import com.starrocks.catalog.Table;
import com.starrocks.clone.TabletSchedCtx;
import com.starrocks.clone.TabletScheduler;
import com.starrocks.clone.TabletSchedulerStat;
import com.starrocks.common.Config;
import com.starrocks.common.jmockit.Deencapsulation;
import com.starrocks.http.rest.MetricsAction;
import com.starrocks.rpc.BrpcProxy;
@@ -252,4 +253,36 @@ public class MetricRepoTest extends PlanTestBase {
}
}
}
@Test
public void testRoutineLoadLagTimeMetricsCollection() {
// Test that routine load lag time metrics are collected when config is enabled
boolean originalConfigValue = Config.enable_routine_load_lag_time_metrics;
try {
// Test case 1: Config enabled - should collect metrics
Config.enable_routine_load_lag_time_metrics = true;
JsonMetricVisitor visitor = new JsonMetricVisitor("test");
MetricsAction.RequestParams params = new MetricsAction.RequestParams(true, true, true, true);
// This should reach the collection hook that calls RoutineLoadLagTimeMetricMgr.getInstance().collectRoutineLoadLagTimeMetrics(visitor)
String result = MetricRepo.getMetric(visitor, params);
// Verify that the method completed successfully (no exceptions thrown)
Assertions.assertNotNull(result);
// Test case 2: Config disabled - should skip metrics collection
Config.enable_routine_load_lag_time_metrics = false;
JsonMetricVisitor visitor2 = new JsonMetricVisitor("test2");
String result2 = MetricRepo.getMetric(visitor2, params);
// Verify that the method completed successfully even when config is disabled
Assertions.assertNotNull(result2);
} finally {
// Restore original config value
Config.enable_routine_load_lag_time_metrics = originalConfigValue;
}
}
}

View File

@@ -0,0 +1,356 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.metric;
import com.google.common.collect.Maps;
import com.starrocks.common.Config;
import com.starrocks.common.jmockit.Deencapsulation;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import java.util.Map;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.verify;
public class RoutineLoadLagTimeMetricMgrTest {
private RoutineLoadLagTimeMetricMgr metricMgr;
private AutoCloseable closeable;
@Mock
private MetricVisitor mockVisitor;
@BeforeEach
public void setUp() {
closeable = MockitoAnnotations.openMocks(this);
metricMgr = RoutineLoadLagTimeMetricMgr.getInstance();
// Enable routine load lag time metrics for testing
Config.enable_routine_load_lag_time_metrics = true;
// Clear any existing metrics from previous tests to ensure test isolation
Map<String, Object> jobLagTimeMap = Deencapsulation.getField(metricMgr, "jobLagTimeMap");
jobLagTimeMap.clear();
}
@AfterEach
public void tearDown() throws Exception {
if (closeable != null) {
closeable.close();
}
}
@Test
public void testMultipleJobsMetrics() {
// Setup: Different lag times for each job
Map<Integer, Long> job1LagTimes = Maps.newHashMap();
job1LagTimes.put(0, 30L);
job1LagTimes.put(1, 45L);
Map<Integer, Long> job2LagTimes = Maps.newHashMap();
job2LagTimes.put(0, 60L);
job2LagTimes.put(1, 75L);
job2LagTimes.put(2, 90L);
// Execute: Update metrics for both jobs
metricMgr.updateRoutineLoadLagTimeMetric(1L, "test_multiple_job1", job1LagTimes);
metricMgr.updateRoutineLoadLagTimeMetric(2L, "test_multiple_job2", job2LagTimes);
// Verify: Each job has its own metrics
Map<Integer, Long> retrieved1 = metricMgr.getPartitionLagTimes(1L, "test_multiple_job1");
Map<Integer, Long> retrieved2 = metricMgr.getPartitionLagTimes(2L, "test_multiple_job2");
Assertions.assertEquals(2, retrieved1.size(), "Job 1 should have 2 partitions");
Assertions.assertEquals(3, retrieved2.size(), "Job 2 should have 3 partitions");
Assertions.assertEquals(Long.valueOf(30L), retrieved1.get(0), "Job 1 partition 0");
Assertions.assertEquals(Long.valueOf(60L), retrieved2.get(0), "Job 2 partition 0");
}
@Test
public void testUpdateRoutineLoadLagTimeMetricWithEmptyPartitions() {
// Setup: Empty partition lag times
Map<Integer, Long> partitionLagTimes = Maps.newHashMap();
// Execute: Update metrics with empty map
metricMgr.updateRoutineLoadLagTimeMetric(1L, "test_empty_job", partitionLagTimes);
// Verify: No metrics stored
Map<Integer, Long> retrievedLagTimes = metricMgr.getPartitionLagTimes(1L, "test_empty_job");
Assertions.assertTrue(retrievedLagTimes.isEmpty(), "Should return empty map for empty input");
}
@Test
public void testUpdateRoutineLoadLagTimeMetricOverwrite() {
// Setup: Initial partition lag times
Map<Integer, Long> initialLagTimes = Maps.newHashMap();
initialLagTimes.put(0, 30L);
initialLagTimes.put(1, 45L);
// Execute: First update
metricMgr.updateRoutineLoadLagTimeMetric(1L, "test_overwrite_job", initialLagTimes);
// Setup: Updated partition lag times
Map<Integer, Long> updatedLagTimes = Maps.newHashMap();
updatedLagTimes.put(0, 35L); // Updated lag for partition 0
updatedLagTimes.put(1, 50L); // Updated lag for partition 1
updatedLagTimes.put(2, 25L); // New partition 2
// Execute: Second update
metricMgr.updateRoutineLoadLagTimeMetric(1L, "test_overwrite_job", updatedLagTimes);
// Verify: Metrics were updated, old partitions removed
Map<Integer, Long> retrievedLagTimes = metricMgr.getPartitionLagTimes(1L, "test_overwrite_job");
Assertions.assertEquals(3, retrievedLagTimes.size(), "Should have 3 partitions after update");
Assertions.assertEquals(Long.valueOf(35L), retrievedLagTimes.get(0), "Partition 0 lag time updated");
Assertions.assertEquals(Long.valueOf(50L), retrievedLagTimes.get(1), "Partition 1 lag time updated");
Assertions.assertEquals(Long.valueOf(25L), retrievedLagTimes.get(2), "Partition 2 lag time added");
}
@Test
public void testGetPartitionLagTimesForNonExistentJob() {
// Execute: Try to get lag times for non-existent job
Map<Integer, Long> retrievedLagTimes = metricMgr.getPartitionLagTimes(999L, "test_nonexistent_job");
// Verify: Returns empty map
Assertions.assertTrue(retrievedLagTimes.isEmpty(), "Should return empty map for non-existent job");
}
@Test
public void testCollectMetrics() throws Exception {
// Setup: Add metrics
Map<Integer, Long> partitionLagTimes = Maps.newHashMap();
partitionLagTimes.put(0, 30L);
partitionLagTimes.put(1, 45L);
metricMgr.updateRoutineLoadLagTimeMetric(1L, "test_collect_job", partitionLagTimes);
// Verify: Check that metrics were actually stored
Map<Integer, Long> retrievedLagTimes = metricMgr.getPartitionLagTimes(1L, "test_collect_job");
Assertions.assertFalse(retrievedLagTimes.isEmpty(), "Metrics should be stored before collection");
// Execute: Collect metrics
metricMgr.collectRoutineLoadLagTimeMetrics(mockVisitor);
// Verify: Visitor was called at least once, for the per-job max lag gauge
verify(mockVisitor, atLeast(1)).visit(any());
}
@Test
public void testMaxLagTimeCalculation() {
// Setup: Partition lag times with different values
Map<Integer, Long> partitionLagTimes = Maps.newHashMap();
partitionLagTimes.put(0, 30L); // min
partitionLagTimes.put(1, 75L); // max
partitionLagTimes.put(2, 45L); // middle
// Execute: Update metrics
metricMgr.updateRoutineLoadLagTimeMetric(1L, "test_maxlag_job", partitionLagTimes);
// Verify: Max lag time is calculated correctly
Map<Integer, Long> retrieved = metricMgr.getPartitionLagTimes(1L, "test_maxlag_job");
Assertions.assertEquals(3, retrieved.size(), "Should maintain all partition data");
// Verify max value is among the retrieved values
Long maxValue = retrieved.values().stream().max(Long::compareTo).orElse(0L);
Assertions.assertEquals(Long.valueOf(75L), maxValue, "Max lag time should be 75");
}
@Test
public void testIsStaleMethodWhenStale() {
// Setup: Add a job with lag time metrics
Map<Integer, Long> partitionLagTimes = Maps.newHashMap();
partitionLagTimes.put(0, 30L);
metricMgr.updateRoutineLoadLagTimeMetric(1L, "test_stale_job", partitionLagTimes);
// Get the internal RoutineLoadLagTimeMetric object
Map<String, Object> jobLagTimeMap = Deencapsulation.getField(metricMgr, "jobLagTimeMap");
Object lagTimeMetric = jobLagTimeMap.get("1.test_stale_job");
Assertions.assertNotNull(lagTimeMetric, "Lag time metric should exist");
// Test: Check if metric is stale (6 minutes ago)
long currentTime = System.currentTimeMillis();
long staleThresholdMs = 5L * 60 * 1000; // 5 minutes
long staleUpdateTime = currentTime - (6L * 60 * 1000); // 6 minutes ago
// Set the update timestamp to 6 minutes ago
Deencapsulation.setField(lagTimeMetric, "updateTimestamp", staleUpdateTime);
// Verify: Metric should be stale
boolean isStale = Deencapsulation.invoke(lagTimeMetric, "isStale", currentTime, staleThresholdMs);
Assertions.assertTrue(isStale, "Metric should be stale after 6 minutes");
}
@Test
public void testIsStaleMethodWhenFresh() {
// Setup: Add a job with lag time metrics
Map<Integer, Long> partitionLagTimes = Maps.newHashMap();
partitionLagTimes.put(0, 30L);
metricMgr.updateRoutineLoadLagTimeMetric(1L, "test_fresh_job", partitionLagTimes);
// Get the internal RoutineLoadLagTimeMetric object
Map<String, Object> jobLagTimeMap = Deencapsulation.getField(metricMgr, "jobLagTimeMap");
Object lagTimeMetric = jobLagTimeMap.get("1.test_fresh_job");
Assertions.assertNotNull(lagTimeMetric, "Lag time metric should exist");
// Test: Check if metric is fresh (2 minutes ago)
long currentTime = System.currentTimeMillis();
long staleThresholdMs = 5L * 60 * 1000; // 5 minutes
long freshUpdateTime = currentTime - (2L * 60 * 1000); // 2 minutes ago
// Set the update timestamp to 2 minutes ago
Deencapsulation.setField(lagTimeMetric, "updateTimestamp", freshUpdateTime);
// Verify: Metric should not be stale
boolean isStale = Deencapsulation.invoke(lagTimeMetric, "isStale", currentTime, staleThresholdMs);
Assertions.assertFalse(isStale, "Metric should not be stale after 2 minutes");
}
@Test
public void testCollectRoutineLoadLagTimeMetricsExceptionHandling() {
// Setup: Add a job with metrics
Map<Integer, Long> partitionLagTimes = Maps.newHashMap();
partitionLagTimes.put(0, 30L);
metricMgr.updateRoutineLoadLagTimeMetric(1L, "test_collect_exception_job", partitionLagTimes);
// Mock visitor to throw exception
doThrow(new RuntimeException("Simulated visitor exception")).when(mockVisitor).visit(any());
// Execute: This should handle the exception gracefully
Assertions.assertDoesNotThrow(() -> {
metricMgr.collectRoutineLoadLagTimeMetrics(mockVisitor);
}, "collectRoutineLoadLagTimeMetrics should handle exceptions gracefully");
}
@Test
public void testGetPartitionLagTimesExceptionHandling() {
// Setup: First add a job to have some data
Map<Integer, Long> partitionLagTimes = Maps.newHashMap();
partitionLagTimes.put(0, 30L);
metricMgr.updateRoutineLoadLagTimeMetric(1L, "test_get_exception_job", partitionLagTimes);
// Get the internal jobLagTimeMap and replace it with a map that throws exception
Map<String, Object> originalJobLagTimeMap = Deencapsulation.getField(metricMgr, "jobLagTimeMap");
// Create a custom map that throws exception when get() is called
Map<String, Object> faultyMap = new java.util.HashMap<String, Object>() {
@Override
public Object get(Object key) {
throw new RuntimeException("Simulated map access exception");
}
};
// Replace the internal map with our faulty map
Deencapsulation.setField(metricMgr, "jobLagTimeMap", faultyMap);
// Execute: This should handle the exception gracefully
Map<Integer, Long> result = Assertions.assertDoesNotThrow(() -> {
return metricMgr.getPartitionLagTimes(1L, "test_get_exception_job");
}, "getPartitionLagTimes should handle exceptions gracefully");
// Verify: Should return empty map on exception
Assertions.assertTrue(result.isEmpty(), "Should return empty map when exception occurs");
// Restore the original map for other tests
Deencapsulation.setField(metricMgr, "jobLagTimeMap", originalJobLagTimeMap);
}
@Test
public void testEmitJobMetricsExceptionHandling() {
// Setup: Add a job with metrics
Map<Integer, Long> partitionLagTimes = Maps.newHashMap();
partitionLagTimes.put(0, 30L);
metricMgr.updateRoutineLoadLagTimeMetric(1L, "test_emit_exception_job", partitionLagTimes);
// Get the internal objects
Map<String, Object> jobLagTimeMap = Deencapsulation.getField(metricMgr, "jobLagTimeMap");
Object lagTimeMetric = jobLagTimeMap.get("1.test_emit_exception_job");
Assertions.assertNotNull(lagTimeMetric, "Lag time metric should exist");
// Mock visitor to throw exception
doThrow(new RuntimeException("Simulated emit exception")).when(mockVisitor).visit(any());
// Execute: This should handle the exception gracefully
Assertions.assertDoesNotThrow(() -> {
Deencapsulation.invoke(metricMgr, "emitJobMetrics", "1.test_emit_exception_job", lagTimeMetric, mockVisitor);
}, "emitJobMetrics should handle exceptions gracefully");
}
@Test
public void testGetValueLeaderCalculatesMaxLagTimeCorrectly() {
// Setup: Partition lag times with different values
Map<Integer, Long> partitionLagTimes = Maps.newHashMap();
partitionLagTimes.put(0, 30L); // min
partitionLagTimes.put(1, 75L); // max
partitionLagTimes.put(2, 45L); // middle
partitionLagTimes.put(3, 60L); // another value
// Execute: Update metrics
metricMgr.updateRoutineLoadLagTimeMetric(1L, "test_getValueLeader_job", partitionLagTimes);
// Get the internal RoutineLoadLagTimeMetric object
Map<String, Object> jobLagTimeMap = Deencapsulation.getField(metricMgr, "jobLagTimeMap");
Object lagTimeMetric = jobLagTimeMap.get("1.test_getValueLeader_job");
Assertions.assertNotNull(lagTimeMetric, "Lag time metric should exist");
// Get the maxLagTimeMetric field
Object maxLagTimeMetric = Deencapsulation.getField(lagTimeMetric, "maxLagTimeMetric");
Assertions.assertNotNull(maxLagTimeMetric, "Max lag time metric should exist");
// Verify: Call getValueLeader() directly to test the max calculation logic
Long maxValue = Deencapsulation.invoke(maxLagTimeMetric, "getValueLeader");
Assertions.assertEquals(Long.valueOf(75L), maxValue, "getValueLeader should return the maximum lag time (75L)");
}
@Test
public void testCleanupStaleMetrics() {
// Setup: Add multiple jobs with different timestamps
Map<Integer, Long> partitionLagTimes = Maps.newHashMap();
partitionLagTimes.put(0, 30L);
partitionLagTimes.put(1, 45L);
// Add fresh job (should not be removed)
metricMgr.updateRoutineLoadLagTimeMetric(1L, "fresh_job", partitionLagTimes);
// Add stale job (should be removed)
metricMgr.updateRoutineLoadLagTimeMetric(2L, "stale_job", partitionLagTimes);
// Get the internal jobLagTimeMap
Map<String, Object> jobLagTimeMap = Deencapsulation.getField(metricMgr, "jobLagTimeMap");
// Verify both jobs exist initially
Assertions.assertEquals(2, jobLagTimeMap.size(), "Should have 2 jobs initially");
Assertions.assertTrue(jobLagTimeMap.containsKey("1.fresh_job"), "Fresh job should exist");
Assertions.assertTrue(jobLagTimeMap.containsKey("2.stale_job"), "Stale job should exist");
// Manually set the stale job's timestamp to be older than 5 minutes
Object staleJobMetric = jobLagTimeMap.get("2.stale_job");
long staleTimestamp = System.currentTimeMillis() - (6L * 60 * 1000); // 6 minutes ago
Deencapsulation.setField(staleJobMetric, "updateTimestamp", staleTimestamp);
// Execute: Clean up stale metrics
metricMgr.cleanupStaleMetrics();
// Verify: Only fresh job should remain
Assertions.assertEquals(1, jobLagTimeMap.size(), "Should have 1 job after cleanup");
Assertions.assertTrue(jobLagTimeMap.containsKey("1.fresh_job"), "Fresh job should still exist");
Assertions.assertFalse(jobLagTimeMap.containsKey("2.stale_job"), "Stale job should be removed");
}
}