[Enhancement] Add clone metrics in frontend (backport #62421) (#62515)

Signed-off-by: wyb <wybb86@gmail.com>
Co-authored-by: wyb <wybb86@gmail.com>
This commit is contained in:
mergify[bot] 2025-08-29 08:08:05 +00:00 committed by GitHub
parent ea63796289
commit ede876e277
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 259 additions and 15 deletions

View File

@ -21,6 +21,9 @@ import java.util.Map;
import java.util.Set;
public abstract class BalanceStat {
public static final String INTER_NODE = "INTER_NODE";
public static final String INTRA_NODE = "INTRA_NODE";
public enum BalanceType {
INTER_NODE_DISK_USAGE("inter-node disk usage"),
INTER_NODE_TABLET_DISTRIBUTION("inter-node tablet distribution"),

View File

@ -539,6 +539,11 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
this.balanceType = balanceType;
}
/**
 * Tells whether the balance type recorded on this context is one of the
 * intra-node kinds.
 *
 * @return {@code true} when {@code balanceType} is
 *         {@code INTRA_NODE_DISK_USAGE} or {@code INTRA_NODE_TABLET_DISTRIBUTION};
 *         {@code false} for every other value, including when it is unset
 */
public boolean isIntraNodeBalance() {
    // Reference comparison against an enum constant is already false for null,
    // so the unset case needs no separate guard.
    return balanceType == BalanceType.INTRA_NODE_DISK_USAGE
            || balanceType == BalanceType.INTRA_NODE_TABLET_DISTRIBUTION;
}
/**
 * Raises the {@code srcPathResourceHold} flag on this context.
 * The flag is only ever set to {@code true} here; this method does not clear it.
 */
public void setSrcPathResourceHold() {
    srcPathResourceHold = true;
}
@ -1011,7 +1016,7 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
* 1. SCHEDULE_FAILED: will keep the tablet RUNNING.
* 2. UNRECOVERABLE: will remove the tablet from runningTablets.
*/
public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request)
public void finishCloneTask(CloneTask cloneTask, TFinishTaskRequest request, TabletSchedulerStat stat)
throws SchedException {
Preconditions.checkState(state == State.RUNNING, state);
Preconditions.checkArgument(cloneTask.getTaskVersion() == CloneTask.VERSION_2);
@ -1105,10 +1110,20 @@ public class TabletSchedCtx implements Comparable<TabletSchedCtx> {
if (request.isSetCopy_size()) {
this.copySize = request.getCopy_size();
if (isIntraNodeBalance()) {
stat.counterCloneTaskIntraNodeCopyBytes.addAndGet(copySize);
} else {
stat.counterCloneTaskInterNodeCopyBytes.addAndGet(copySize);
}
}
if (request.isSetCopy_time_ms()) {
this.copyTimeMs = request.getCopy_time_ms();
if (isIntraNodeBalance()) {
stat.counterCloneTaskIntraNodeCopyDurationMs.addAndGet(copyTimeMs);
} else {
stat.counterCloneTaskInterNodeCopyDurationMs.addAndGet(copyTimeMs);
}
}
}

View File

@ -1839,7 +1839,7 @@ public class TabletScheduler extends FrontendDaemon {
Preconditions.checkState(tabletCtx.getState() == TabletSchedCtx.State.RUNNING, tabletCtx.getState());
try {
tabletCtx.finishCloneTask(cloneTask, request);
tabletCtx.finishCloneTask(cloneTask, request, stat);
} catch (SchedException e) {
tabletCtx.increaseFailedRunningCounter();
tabletCtx.setErrMsg(e.getMessage());
@ -2160,10 +2160,18 @@ public class TabletScheduler extends FrontendDaemon {
return pendingTablets.size();
}
/**
 * Counts the pending tablets whose scheduling type is {@code type}.
 *
 * @param type the scheduling type to match (compared by reference)
 * @return the number of entries in {@code pendingTablets} with that type
 */
public synchronized long getPendingNum(Type type) {
    long matched = 0L;
    for (TabletSchedCtx pending : pendingTablets) {
        if (pending.getType() == type) {
            matched++;
        }
    }
    return matched;
}
/**
 * Reports how many tablets are currently held in {@code runningTablets}.
 *
 * @return the size of the running-tablet collection
 */
public synchronized int getRunningNum() {
    final int runningCount = runningTablets.size();
    return runningCount;
}
/**
 * Counts the running tablets whose scheduling type is {@code type}.
 *
 * @param type the scheduling type to match (compared by reference)
 * @return the number of values in {@code runningTablets} with that type
 */
public synchronized long getRunningNum(Type type) {
    long matched = 0L;
    for (TabletSchedCtx running : runningTablets.values()) {
        if (running.getType() == type) {
            matched++;
        }
    }
    return matched;
}
/**
 * Reports how many entries are currently kept in {@code schedHistory}.
 *
 * @return the size of the scheduling-history collection
 */
public synchronized int getHistoryNum() {
    final int historyCount = schedHistory.size();
    return historyCount;
}

View File

@ -110,6 +110,14 @@ public class TabletSchedulerStat {
public AtomicLong counterCloneTaskFailed = new AtomicLong(0L);
@StatField("num of clone task timeout")
public AtomicLong counterCloneTaskTimeout = new AtomicLong(0L);
@StatField("copy bytes of inter-node clone task")
public AtomicLong counterCloneTaskInterNodeCopyBytes = new AtomicLong(0L);
@StatField("copy bytes of intra-node clone task")
public AtomicLong counterCloneTaskIntraNodeCopyBytes = new AtomicLong(0L);
@StatField("copy duration of inter-node clone task(ms)")
public AtomicLong counterCloneTaskInterNodeCopyDurationMs = new AtomicLong(0L);
@StatField("copy duration of intra-node clone task(ms)")
public AtomicLong counterCloneTaskIntraNodeCopyDurationMs = new AtomicLong(0L);
/*
* replica unhealthy type
@ -124,8 +132,6 @@ public class TabletSchedulerStat {
public AtomicLong counterReplicaLocMismatchErr = new AtomicLong(0L);
@StatField("num of replica redundant error")
public AtomicLong counterReplicaRedundantErr = new AtomicLong(0L);
@StatField("num of replica missing in cluster error")
public AtomicLong counterReplicaMissingInClusterErr = new AtomicLong(0L);
@StatField("num of balance scheduled")
public AtomicLong counterBalanceSchedule = new AtomicLong(0L);
@StatField("num of colocate replica mismatch")

View File

@ -48,6 +48,8 @@ import com.starrocks.catalog.Database;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.TabletInvertedIndex;
import com.starrocks.clone.BalanceStat;
import com.starrocks.clone.TabletSchedCtx;
import com.starrocks.common.Config;
import com.starrocks.common.DdlException;
import com.starrocks.common.StarRocksException;
@ -324,16 +326,6 @@ public final class MetricRepo {
};
STARROCKS_METRIC_REGISTER.addMetric(metaLogCount);
// scheduled tablet num
Metric<Long> scheduledTabletNum = new LeaderAwareGaugeMetricLong(
"scheduled_tablet_num", MetricUnit.NOUNIT, "number of tablets being scheduled") {
@Override
public Long getValueLeader() {
return (long) GlobalStateMgr.getCurrentState().getTabletScheduler().getTotalNum();
}
};
STARROCKS_METRIC_REGISTER.addMetric(scheduledTabletNum);
// routine load jobs
for (RoutineLoadJob.JobState state : RoutineLoadJob.JobState.values()) {
Metric<Long> gauge = new LeaderAwareGaugeMetricLong("routine_load_jobs",
@ -631,6 +623,9 @@ public final class MetricRepo {
// init system metrics
initSystemMetrics();
// init clone metrics
initCloneMetrics();
updateMetrics();
hasInit = true;
@ -685,6 +680,111 @@ public final class MetricRepo {
STARROCKS_METRIC_REGISTER.addMetric(tpcOutSegs);
}
/**
 * Registers the tablet-scheduler / clone metrics with {@code STARROCKS_METRIC_REGISTER}.
 *
 * <p>Gauges, in registration order:
 * <ul>
 *   <li>{@code scheduled_tablet_num} - value of {@code getTotalNum()}</li>
 *   <li>{@code scheduled_pending_tablet_num} - one per {@code TabletSchedCtx.Type}, labelled {@code type}</li>
 *   <li>{@code scheduled_running_tablet_num} - one per {@code TabletSchedCtx.Type}, labelled {@code type}</li>
 * </ul>
 *
 * <p>Counters, in registration order:
 * <ul>
 *   <li>{@code clone_task_total}, {@code clone_task_success}</li>
 *   <li>{@code clone_task_copy_bytes} - INTER_NODE then INTRA_NODE, labelled {@code type}</li>
 *   <li>{@code clone_task_copy_duration_ms} - INTER_NODE then INTRA_NODE, labelled {@code type}</li>
 * </ul>
 *
 * <p>Every metric reads its value lazily from the current {@code GlobalStateMgr}'s
 * tablet scheduler each time it is queried; nothing is cached here.
 */
private static void initCloneMetrics() {
    // ---------- gauge metrics ----------

    // all tablets known to the scheduler
    Metric<Long> totalScheduledGauge = new LeaderAwareGaugeMetricLong(
            "scheduled_tablet_num", MetricUnit.NOUNIT, "number of tablets being scheduled") {
        @Override
        public Long getValueLeader() {
            return (long) GlobalStateMgr.getCurrentState().getTabletScheduler().getTotalNum();
        }
    };
    STARROCKS_METRIC_REGISTER.addMetric(totalScheduledGauge);

    final TabletSchedCtx.Type[] allSchedTypes = TabletSchedCtx.Type.values();

    // pending tablets, one gauge per scheduling type
    for (TabletSchedCtx.Type pendingType : allSchedTypes) {
        Metric<Long> pendingGauge = new LeaderAwareGaugeMetricLong(
                "scheduled_pending_tablet_num", MetricUnit.NOUNIT, "number of pending tablets being scheduled") {
            @Override
            public Long getValueLeader() {
                return GlobalStateMgr.getCurrentState().getTabletScheduler().getPendingNum(pendingType);
            }
        };
        pendingGauge.addLabel(new MetricLabel("type", pendingType.name()));
        STARROCKS_METRIC_REGISTER.addMetric(pendingGauge);
    }

    // running tablets, one gauge per scheduling type
    for (TabletSchedCtx.Type runningType : allSchedTypes) {
        Metric<Long> runningGauge = new LeaderAwareGaugeMetricLong(
                "scheduled_running_tablet_num", MetricUnit.NOUNIT, "number of running tablets being scheduled") {
            @Override
            public Long getValueLeader() {
                return GlobalStateMgr.getCurrentState().getTabletScheduler().getRunningNum(runningType);
            }
        };
        runningGauge.addLabel(new MetricLabel("type", runningType.name()));
        STARROCKS_METRIC_REGISTER.addMetric(runningGauge);
    }

    // ---------- counter metrics ----------

    // number of clone tasks
    LongCounterMetric totalCounter = new LongCounterMetric(
            "clone_task_total", MetricUnit.REQUESTS, "total clone task") {
        @Override
        public Long getValue() {
            return GlobalStateMgr.getCurrentState().getTabletScheduler().getStat()
                    .counterCloneTask.get();
        }
    };
    STARROCKS_METRIC_REGISTER.addMetric(totalCounter);

    // number of succeeded clone tasks
    LongCounterMetric successCounter = new LongCounterMetric(
            "clone_task_success", MetricUnit.REQUESTS, "success clone task") {
        @Override
        public Long getValue() {
            return GlobalStateMgr.getCurrentState().getTabletScheduler().getStat()
                    .counterCloneTaskSucceeded.get();
        }
    };
    STARROCKS_METRIC_REGISTER.addMetric(successCounter);

    // copied bytes: inter-node first, then intra-node (same metric name, different label)
    LongCounterMetric interNodeBytes = new LongCounterMetric(
            "clone_task_copy_bytes", MetricUnit.BYTES, "clone task copy bytes") {
        @Override
        public Long getValue() {
            return GlobalStateMgr.getCurrentState().getTabletScheduler().getStat()
                    .counterCloneTaskInterNodeCopyBytes.get();
        }
    };
    interNodeBytes.addLabel(new MetricLabel("type", BalanceStat.INTER_NODE));
    STARROCKS_METRIC_REGISTER.addMetric(interNodeBytes);

    LongCounterMetric intraNodeBytes = new LongCounterMetric(
            "clone_task_copy_bytes", MetricUnit.BYTES, "clone task copy bytes") {
        @Override
        public Long getValue() {
            return GlobalStateMgr.getCurrentState().getTabletScheduler().getStat()
                    .counterCloneTaskIntraNodeCopyBytes.get();
        }
    };
    intraNodeBytes.addLabel(new MetricLabel("type", BalanceStat.INTRA_NODE));
    STARROCKS_METRIC_REGISTER.addMetric(intraNodeBytes);

    // copy duration: inter-node first, then intra-node (same metric name, different label)
    LongCounterMetric interNodeDurationMs = new LongCounterMetric(
            "clone_task_copy_duration_ms", MetricUnit.MILLISECONDS, "clone task copy duration ms") {
        @Override
        public Long getValue() {
            return GlobalStateMgr.getCurrentState().getTabletScheduler().getStat()
                    .counterCloneTaskInterNodeCopyDurationMs.get();
        }
    };
    interNodeDurationMs.addLabel(new MetricLabel("type", BalanceStat.INTER_NODE));
    STARROCKS_METRIC_REGISTER.addMetric(interNodeDurationMs);

    LongCounterMetric intraNodeDurationMs = new LongCounterMetric(
            "clone_task_copy_duration_ms", MetricUnit.MILLISECONDS, "clone task copy duration ms") {
        @Override
        public Long getValue() {
            return GlobalStateMgr.getCurrentState().getTabletScheduler().getStat()
                    .counterCloneTaskIntraNodeCopyDurationMs.get();
        }
    };
    intraNodeDurationMs.addLabel(new MetricLabel("type", BalanceStat.INTRA_NODE));
    STARROCKS_METRIC_REGISTER.addMetric(intraNodeDurationMs);
}
// to generate the metrics related to tablets of each backend
// this metric is reentrant, so that we can add or remove metric along with the backend add or remove
// at runtime.

View File

@ -303,6 +303,7 @@ public class TabletSchedCtxTest {
Assertions.assertEquals("1001", results.get(0));
Assertions.assertEquals("BALANCE", results.get(1));
Assertions.assertEquals("INTER_NODE_TABLET_DISTRIBUTION", results.get(3));
Assertions.assertFalse(ctx.isIntraNodeBalance());
}
@Test

View File

@ -31,9 +31,17 @@ public class TabletSchedulerStatTest {
// update
stat.counterCloneTask.incrementAndGet();
stat.counterCloneTaskSucceeded.incrementAndGet();
stat.counterCloneTaskInterNodeCopyBytes.addAndGet(100L);
stat.counterCloneTaskIntraNodeCopyBytes.addAndGet(101L);
stat.counterCloneTaskInterNodeCopyDurationMs.addAndGet(10L);
stat.counterCloneTaskIntraNodeCopyDurationMs.addAndGet(11L);
Assertions.assertEquals(1L, stat.counterCloneTask.get());
Assertions.assertEquals(1L, stat.counterCloneTaskSucceeded.get());
Assertions.assertEquals(0L, stat.counterCloneTaskFailed.get());
Assertions.assertEquals(100L, stat.counterCloneTaskInterNodeCopyBytes.get());
Assertions.assertEquals(101L, stat.counterCloneTaskIntraNodeCopyBytes.get());
Assertions.assertEquals(10L, stat.counterCloneTaskInterNodeCopyDurationMs.get());
Assertions.assertEquals(11L, stat.counterCloneTaskIntraNodeCopyDurationMs.get());
Assertions.assertNull(stat.getLastSnapshot());
long lastSnapShotTime = ((AtomicLong) Deencapsulation.getField(stat, "lastSnapshotTime")).get();
Assertions.assertEquals(-1L, lastSnapShotTime);
@ -53,7 +61,7 @@ public class TabletSchedulerStatTest {
// test getBrief
List<List<String>> infos = stat.getBrief();
Assertions.assertEquals(27, infos.size());
Assertions.assertEquals(30, infos.size());
for (List<String> info : infos) {
if (info.get(0).equals("num of clone task")) {
Assertions.assertEquals("1", info.get(1));

View File

@ -544,6 +544,7 @@ public class TabletSchedulerTest {
TabletScheduler tabletScheduler = new TabletScheduler(new TabletSchedulerStat());
Deencapsulation.invoke(tabletScheduler, "addToPendingTablets", ctx);
Assertions.assertEquals(1L, tabletScheduler.getPendingNum(TabletSchedCtx.Type.REPAIR));
Deencapsulation.invoke(tabletScheduler, "schedulePendingTablets");
Assertions.assertEquals(TabletSchedCtx.State.UNEXPECTED, ctx.getState());
@ -573,6 +574,7 @@ public class TabletSchedulerTest {
TabletScheduler tabletScheduler = new TabletScheduler(new TabletSchedulerStat());
Deencapsulation.invoke(tabletScheduler, "addToRunningTablets", ctx);
Assertions.assertEquals(1L, tabletScheduler.getRunningNum(TabletSchedCtx.Type.REPAIR));
tabletScheduler.finishCloneTask(task, new TFinishTaskRequest());
Assertions.assertEquals(TabletSchedCtx.State.UNEXPECTED, ctx.getState());

View File

@ -17,17 +17,24 @@
package com.starrocks.metric;
import com.starrocks.clone.TabletSchedCtx;
import com.starrocks.clone.TabletScheduler;
import com.starrocks.clone.TabletSchedulerStat;
import com.starrocks.common.AnalysisException;
import com.starrocks.common.FeConstants;
import com.starrocks.common.jmockit.Deencapsulation;
import com.starrocks.common.proc.JvmMonitorProcDir;
import com.starrocks.ha.FrontendNodeType;
import com.starrocks.monitor.jvm.JvmStatCollector;
import com.starrocks.monitor.jvm.JvmStats;
import com.starrocks.server.GlobalStateMgr;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
public class MetricsTest {
@ -218,4 +225,98 @@ public class MetricsTest {
Assertions.assertTrue(output.contains(metricName));
}
}
/**
 * Seeds the tablet scheduler's counters and pending/running collections, then
 * checks that each clone metric registered in {@code MetricRepo} reports the
 * seeded value under the expected {@code type} label.
 */
@Test
public void testCloneMetrics() {
    GlobalStateMgr stateMgr = GlobalStateMgr.getCurrentState();
    stateMgr.setFrontendNodeType(FrontendNodeType.LEADER);
    TabletScheduler scheduler = stateMgr.getTabletScheduler();
    TabletSchedulerStat schedulerStat = scheduler.getStat();

    // seed counters
    schedulerStat.counterCloneTask.incrementAndGet(); // 1
    schedulerStat.counterCloneTaskSucceeded.incrementAndGet(); // 1
    schedulerStat.counterCloneTaskInterNodeCopyBytes.addAndGet(100L);
    schedulerStat.counterCloneTaskInterNodeCopyDurationMs.addAndGet(10L);
    schedulerStat.counterCloneTaskIntraNodeCopyBytes.addAndGet(101L);
    schedulerStat.counterCloneTaskIntraNodeCopyDurationMs.addAndGet(11L);

    // one pending BALANCE tablet and one running REPAIR tablet
    TabletSchedCtx pendingBalanceCtx =
            new TabletSchedCtx(TabletSchedCtx.Type.BALANCE, 1L, 2L, 3L, 4L, 1001L, System.currentTimeMillis());
    Deencapsulation.invoke(scheduler, "addToPendingTablets", pendingBalanceCtx);
    TabletSchedCtx runningRepairCtx =
            new TabletSchedCtx(TabletSchedCtx.Type.REPAIR, 1L, 2L, 3L, 4L, 1002L, System.currentTimeMillis());
    Deencapsulation.invoke(scheduler, "addToRunningTablets", runningRepairCtx);
    Set<Long> trackedTabletIds = Deencapsulation.getField(scheduler, "allTabletIds");
    trackedTabletIds.add(1001L);
    trackedTabletIds.add(1002L);

    // check
    List<Metric> totalGauges = MetricRepo.getMetricsByName("scheduled_tablet_num");
    Assertions.assertEquals(1, totalGauges.size());
    Assertions.assertEquals(2L, (Long) totalGauges.get(0).getValue());

    List<Metric> pendingGauges = MetricRepo.getMetricsByName("scheduled_pending_tablet_num");
    Assertions.assertEquals(2, pendingGauges.size());
    for (Metric gauge : pendingGauges) {
        // index 0 holds the is_leader label; the type label follows it
        MetricLabel typeLabel = (MetricLabel) gauge.getLabels().get(1);
        String type = typeLabel.getValue();
        switch (type) {
            case "REPAIR":
                Assertions.assertEquals(0L, gauge.getValue());
                break;
            case "BALANCE":
                Assertions.assertEquals(1L, gauge.getValue());
                break;
            default:
                Assertions.fail("Unknown type: " + type);
                break;
        }
    }

    List<Metric> runningGauges = MetricRepo.getMetricsByName("scheduled_running_tablet_num");
    Assertions.assertEquals(2, runningGauges.size());
    for (Metric gauge : runningGauges) {
        // index 0 holds the is_leader label; the type label follows it
        MetricLabel typeLabel = (MetricLabel) gauge.getLabels().get(1);
        String type = typeLabel.getValue();
        switch (type) {
            case "REPAIR":
                Assertions.assertEquals(1L, gauge.getValue());
                break;
            case "BALANCE":
                Assertions.assertEquals(0L, gauge.getValue());
                break;
            default:
                Assertions.fail("Unknown type: " + type);
                break;
        }
    }

    List<Metric> totalCounters = MetricRepo.getMetricsByName("clone_task_total");
    Assertions.assertEquals(1, totalCounters.size());
    Assertions.assertEquals(1L, (Long) totalCounters.get(0).getValue());

    List<Metric> successCounters = MetricRepo.getMetricsByName("clone_task_success");
    Assertions.assertEquals(1, successCounters.size());
    Assertions.assertEquals(1L, (Long) successCounters.get(0).getValue());

    List<Metric> copyBytesCounters = MetricRepo.getMetricsByName("clone_task_copy_bytes");
    Assertions.assertEquals(2, copyBytesCounters.size());
    for (Metric counter : copyBytesCounters) {
        // this test reads the type label at index 0 for the counter metrics
        MetricLabel typeLabel = (MetricLabel) counter.getLabels().get(0);
        String type = typeLabel.getValue();
        switch (type) {
            case "INTER_NODE":
                Assertions.assertEquals(100L, counter.getValue());
                break;
            case "INTRA_NODE":
                Assertions.assertEquals(101L, counter.getValue());
                break;
            default:
                Assertions.fail("Unknown type: " + type);
                break;
        }
    }

    List<Metric> copyDurationCounters = MetricRepo.getMetricsByName("clone_task_copy_duration_ms");
    Assertions.assertEquals(2, copyDurationCounters.size());
    for (Metric counter : copyDurationCounters) {
        MetricLabel typeLabel = (MetricLabel) counter.getLabels().get(0);
        String type = typeLabel.getValue();
        switch (type) {
            case "INTER_NODE":
                Assertions.assertEquals(10L, counter.getValue());
                break;
            case "INTRA_NODE":
                Assertions.assertEquals(11L, counter.getValue());
                break;
            default:
                Assertions.fail("Unknown type: " + type);
                break;
        }
    }
}
}