[Enhancement] Make some fe metrics leader awareness (backport #63004) (#63038)

Signed-off-by: wyb <wybb86@gmail.com>
Co-authored-by: wyb <wybb86@gmail.com>
This commit is contained in:
mergify[bot] 2025-09-12 04:01:17 +00:00 committed by GitHub
parent df795c442f
commit 2fe00e59f3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 490 additions and 218 deletions

View File

@ -0,0 +1,24 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.metric;
/**
 * Counter flavour of {@link LeaderAwareMetric}: it registers itself with
 * {@code MetricType.COUNTER} and leaves the way the count is accumulated to subclasses.
 *
 * @param <T> type of the counter value
 */
public abstract class LeaderAwareCounterMetric<T> extends LeaderAwareMetric<T> {

    /**
     * @param name        metric name
     * @param unit        unit of the counted quantity
     * @param description human readable description of the metric
     */
    public LeaderAwareCounterMetric(String name, MetricUnit unit, String description) {
        super(name, MetricType.COUNTER, unit, description);
    }

    /**
     * Adds {@code delta} to the counter.
     *
     * @param delta amount to add
     */
    public abstract void increase(T delta);
}

View File

@ -0,0 +1,47 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.metric;
import com.starrocks.sql.common.UnsupportedException;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Leader-aware counter metric whose value lives in an {@link AtomicLong} supplied by the caller.
 *
 * <p>This class only reads the external counter: the leader-side value is the counter's current
 * value, the non-leader value is always {@code 0}, and {@link #increase(Long)} is rejected.
 */
public class LeaderAwareCounterMetricExternalLong extends LeaderAwareCounterMetric<Long> {

    /** Counter supplied by the caller; this class never writes to it. */
    private final AtomicLong counter;

    /**
     * @param name        metric name
     * @param unit        unit of the counted quantity
     * @param description human readable description of the metric
     * @param counter     external counter that holds the value to report
     */
    public LeaderAwareCounterMetricExternalLong(String name, MetricUnit unit, String description, AtomicLong counter) {
        super(name, unit, description);
        this.counter = counter;
    }

    /**
     * Always fails: the value is maintained outside of this metric.
     */
    @Override
    public void increase(Long delta) {
        throw UnsupportedException.unsupportedException("Not support increase() because it uses external counter");
    }

    /** @return {@code 0}, the value reported on the non-leader path */
    @Override
    public Long getValueNonLeader() {
        return Long.valueOf(0L);
    }

    /** @return the current value of the external counter */
    @Override
    public Long getValueLeader() {
        return Long.valueOf(counter.get());
    }
}

View File

@ -0,0 +1,47 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.metric;
import java.util.concurrent.atomic.LongAdder;
/**
 * LeaderAwareCounterMetricLong represents a counter metric that is aware of the current fe's role
 * (leader or follower) and internally maintains a long counter value.
 *
 * <p>The leader-side value is the accumulated sum; the non-leader value is always {@code 0}.
 * Note that {@link #increase(Long)} accumulates regardless of the current role; only the value
 * that is reported differs.
 */
public class LeaderAwareCounterMetricLong extends LeaderAwareCounterMetric<Long> {

    // LongAdder is used for purposes such as collecting statistics, not for fine-grained synchronization control.
    // Under high contention, expected throughput of LongAdder is significantly higher than AtomicLong.
    // The field is final: it is never reassigned, and a final field is guaranteed to be seen fully
    // initialized by any thread that obtains a reference to this metric.
    private final LongAdder value = new LongAdder();

    /**
     * @param name        metric name
     * @param unit        unit of the counted quantity
     * @param description human readable description of the metric
     */
    public LeaderAwareCounterMetricLong(String name, MetricUnit unit, String description) {
        super(name, unit, description);
    }

    /**
     * Adds {@code delta} to the internal adder.
     *
     * @param delta amount to add; must not be {@code null} (it is unboxed)
     */
    @Override
    public void increase(Long delta) {
        value.add(delta);
    }

    /** @return the current sum of all increments */
    @Override
    public Long getValueLeader() {
        return value.longValue();
    }

    /** @return {@code 0}, the value reported on the non-leader path */
    @Override
    public Long getValueNonLeader() {
        return 0L;
    }
}

View File

@ -14,38 +14,10 @@
package com.starrocks.metric;
import com.starrocks.common.FeConstants;
import com.starrocks.server.GlobalStateMgr;
/**
* Gauge metric is updated every time it is visited. Additionally, label {is_leader="true|false"} will be added automatically
* based on current node role.
*/
public abstract class LeaderAwareGaugeMetric<T> extends GaugeMetric<T> {
private boolean isLeader;
public abstract class LeaderAwareGaugeMetric<T> extends LeaderAwareMetric<T> {
public LeaderAwareGaugeMetric(String name, MetricUnit unit, String description) {
super(name, unit, description);
isLeader = GlobalStateMgr.getCurrentState().isLeader();
addLabel(getLabel(isLeader));
}
@Override
public T getValue() {
boolean leader = GlobalStateMgr.getCurrentState().isLeader();
if (isLeader != leader) {
addLabel(getLabel(leader));
isLeader = leader;
}
return isLeader ? getValueLeader() : getValueNonLeader();
}
public abstract T getValueNonLeader();
public abstract T getValueLeader();
private static MetricLabel getLabel(boolean isLeader) {
return new MetricLabel(FeConstants.METRIC_LABEL_IS_LEADER, String.valueOf(isLeader));
super(name, MetricType.GAUGE, unit, description);
}
}

View File

@ -0,0 +1,26 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.metric;
/**
 * Integer-valued {@link LeaderAwareGaugeMetric} with a fixed non-leader value of {@code 0}.
 * Subclasses only have to provide the leader-side value.
 */
public abstract class LeaderAwareGaugeMetricInteger extends LeaderAwareGaugeMetric<Integer> {

    /**
     * @param name        metric name
     * @param unit        unit of the measured quantity
     * @param description human readable description of the metric
     */
    public LeaderAwareGaugeMetricInteger(String name, MetricUnit unit, String description) {
        super(name, unit, description);
    }

    /** @return {@code 0}, the value reported on the non-leader path */
    @Override
    public Integer getValueNonLeader() {
        return Integer.valueOf(0);
    }
}

View File

@ -0,0 +1,53 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.metric;
import com.starrocks.common.FeConstants;
import com.starrocks.server.GlobalStateMgr;
/**
 * Base class for metrics that report a different value depending on whether the current FE
 * is the leader.
 *
 * <p>An {@code is_leader="true|false"} label is added when the metric is constructed. On every
 * {@link #getValue()} call the current role is re-read; if it differs from the last observed
 * role, the label is added again with the new role before the value is produced.
 *
 * @param <T> type of the metric value
 */
public abstract class LeaderAwareMetric<T> extends Metric<T> {

    /** Role observed the last time the label was written. */
    private boolean isLeader;

    /**
     * @param name        metric name
     * @param type        metric type passed through to {@link Metric}
     * @param unit        unit of the measured quantity
     * @param description human readable description of the metric
     */
    public LeaderAwareMetric(String name, MetricType type, MetricUnit unit, String description) {
        super(name, type, unit, description);
        this.isLeader = GlobalStateMgr.getCurrentState().isLeader();
        addLabel(leaderLabel(this.isLeader));
    }

    /**
     * Re-checks the FE role, updates the label if the role changed, and returns the value
     * for the current role.
     *
     * @return {@link #getValueLeader()} on the leader, {@link #getValueNonLeader()} otherwise
     */
    @Override
    public T getValue() {
        final boolean currentRole = GlobalStateMgr.getCurrentState().isLeader();
        if (currentRole != isLeader) {
            // Role changed since the previous read: write the label first, then remember the role.
            addLabel(leaderLabel(currentRole));
            isLeader = currentRole;
        }
        if (isLeader) {
            return getValueLeader();
        }
        return getValueNonLeader();
    }

    /** @return the value to report while this FE is the leader */
    public abstract T getValueLeader();

    /** @return the value to report while this FE is not the leader */
    public abstract T getValueNonLeader();

    /** Builds the {@code is_leader} label for the given role. */
    private static MetricLabel leaderLabel(boolean leader) {
        final String labelValue = String.valueOf(leader);
        return new MetricLabel(FeConstants.METRIC_LABEL_IS_LEADER, labelValue);
    }
}

View File

@ -38,7 +38,6 @@ import com.starrocks.common.Config;
import com.starrocks.qe.QueryDetail;
import com.starrocks.qe.QueryDetailQueue;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
import java.util.ArrayList;
import java.util.Comparator;
@ -119,21 +118,6 @@ public class MetricCalculator extends TimerTask {
lastTs = currentTs;
// max tablet compaction score of all backends
if (RunMode.isSharedDataMode()) {
MetricRepo.GAUGE_MAX_TABLET_COMPACTION_SCORE.setValue(
(long) GlobalStateMgr.getCurrentState().getCompactionMgr().getMaxCompactionScore());
} else {
long maxCompactionScore = 0;
List<Metric> compactionScoreMetrics = MetricRepo.getMetricsByName(MetricRepo.TABLET_MAX_COMPACTION_SCORE);
for (Metric metric : compactionScoreMetrics) {
if (((GaugeMetric<Long>) metric).getValue() > maxCompactionScore) {
maxCompactionScore = ((GaugeMetric<Long>) metric).getValue();
}
}
MetricRepo.GAUGE_MAX_TABLET_COMPACTION_SCORE.setValue(maxCompactionScore);
}
// query latency
List<QueryDetail> queryList = QueryDetailQueue.getQueryDetailsAfterTime(lastQueryEventTime);
List<Long> latencyList = new ArrayList<>();

View File

@ -50,6 +50,7 @@ import com.starrocks.catalog.Table;
import com.starrocks.catalog.TabletInvertedIndex;
import com.starrocks.clone.BalanceStat;
import com.starrocks.clone.TabletSchedCtx;
import com.starrocks.clone.TabletSchedulerStat;
import com.starrocks.common.Config;
import com.starrocks.common.DdlException;
import com.starrocks.common.StarRocksException;
@ -76,6 +77,7 @@ import com.starrocks.proto.PKafkaOffsetProxyResult;
import com.starrocks.qe.ConnectContext;
import com.starrocks.qe.scheduler.slot.BaseSlotManager;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.server.RunMode;
import com.starrocks.server.WarehouseManager;
import com.starrocks.service.ExecuteEnv;
import com.starrocks.staros.StarMgrServer;
@ -182,10 +184,10 @@ public final class MetricRepo {
public static LongCounterMetric COUNTER_EDIT_LOG_SIZE_BYTES;
public static LongCounterMetric COUNTER_IMAGE_WRITE;
public static LongCounterMetric COUNTER_IMAGE_PUSH;
public static LongCounterMetric COUNTER_TXN_REJECT;
public static LongCounterMetric COUNTER_TXN_BEGIN;
public static LongCounterMetric COUNTER_TXN_FAILED;
public static LongCounterMetric COUNTER_TXN_SUCCESS;
public static LeaderAwareCounterMetricLong COUNTER_TXN_REJECT;
public static LeaderAwareCounterMetricLong COUNTER_TXN_BEGIN;
public static LeaderAwareCounterMetricLong COUNTER_TXN_FAILED;
public static LeaderAwareCounterMetricLong COUNTER_TXN_SUCCESS;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_ROWS;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_RECEIVED_BYTES;
public static LongCounterMetric COUNTER_ROUTINE_LOAD_ERROR_ROWS;
@ -225,12 +227,12 @@ public final class MetricRepo {
public static GaugeMetricImpl<Double> GAUGE_QUERY_LATENCY_P95;
public static GaugeMetricImpl<Double> GAUGE_QUERY_LATENCY_P99;
public static GaugeMetricImpl<Double> GAUGE_QUERY_LATENCY_P999;
public static GaugeMetricImpl<Long> GAUGE_MAX_TABLET_COMPACTION_SCORE;
public static LeaderAwareGaugeMetric<Long> GAUGE_MAX_TABLET_COMPACTION_SCORE;
public static GaugeMetricImpl<Long> GAUGE_STACKED_JOURNAL_NUM;
public static GaugeMetricImpl<Long> GAUGE_ENCRYPTION_KEY_NUM;
public static List<GaugeMetricImpl<Long>> GAUGE_ROUTINE_LOAD_LAGS;
public static List<LeaderAwareGaugeMetric<Long>> GAUGE_ROUTINE_LOAD_LAGS;
public static List<GaugeMetricImpl<Long>> GAUGE_MEMORY_USAGE_STATS;
public static List<GaugeMetricImpl<Long>> GAUGE_OBJECT_COUNT_STATS;
@ -387,9 +389,26 @@ public final class MetricRepo {
GAUGE_QUERY_TIMEOUT_RATE.setValue(0.0);
STARROCKS_METRIC_REGISTER.addMetric(GAUGE_QUERY_TIMEOUT_RATE);
GAUGE_MAX_TABLET_COMPACTION_SCORE = new GaugeMetricImpl<>("max_tablet_compaction_score",
MetricUnit.NOUNIT, "max tablet compaction score of all backends");
GAUGE_MAX_TABLET_COMPACTION_SCORE.setValue(0L);
// max tablet compaction score of all backends
GAUGE_MAX_TABLET_COMPACTION_SCORE = new LeaderAwareGaugeMetricLong("max_tablet_compaction_score", MetricUnit.NOUNIT,
"max tablet compaction score of all backends") {
@Override
public Long getValueLeader() {
if (RunMode.isSharedDataMode()) {
return (long) GlobalStateMgr.getCurrentState().getCompactionMgr().getMaxCompactionScore();
} else {
long maxCompactionScore = 0;
List<Metric> compactionScoreMetrics = MetricRepo.getMetricsByName(MetricRepo.TABLET_MAX_COMPACTION_SCORE);
for (Metric metric : compactionScoreMetrics) {
long compactionScore = ((LeaderAwareGaugeMetric<Long>) metric).getValue();
if (compactionScore > maxCompactionScore) {
maxCompactionScore = compactionScore;
}
}
return maxCompactionScore;
}
}
};
STARROCKS_METRIC_REGISTER.addMetric(GAUGE_MAX_TABLET_COMPACTION_SCORE);
GAUGE_STACKED_JOURNAL_NUM = new GaugeMetricImpl<>(
@ -546,19 +565,21 @@ public final class MetricRepo {
"total analysis error query");
STARROCKS_METRIC_REGISTER.addMetric(COUNTER_QUERY_ANALYSIS_ERR);
COUNTER_QUERY_INTERNAL_ERR = new LongCounterMetric("query_internal_err", MetricUnit.REQUESTS,
COUNTER_QUERY_INTERNAL_ERR = new LongCounterMetric("query_internal_err", MetricUnit.REQUESTS,
"total internal error query");
STARROCKS_METRIC_REGISTER.addMetric(COUNTER_QUERY_INTERNAL_ERR);
COUNTER_TXN_REJECT =
new LongCounterMetric("txn_reject", MetricUnit.REQUESTS, "counter of rejected transactions");
new LeaderAwareCounterMetricLong("txn_reject", MetricUnit.REQUESTS, "counter of rejected transactions");
STARROCKS_METRIC_REGISTER.addMetric(COUNTER_TXN_REJECT);
COUNTER_TXN_BEGIN = new LongCounterMetric("txn_begin", MetricUnit.REQUESTS, "counter of beginning transactions");
COUNTER_TXN_BEGIN =
new LeaderAwareCounterMetricLong("txn_begin", MetricUnit.REQUESTS, "counter of beginning transactions");
STARROCKS_METRIC_REGISTER.addMetric(COUNTER_TXN_BEGIN);
COUNTER_TXN_SUCCESS =
new LongCounterMetric("txn_success", MetricUnit.REQUESTS, "counter of success transactions");
new LeaderAwareCounterMetricLong("txn_success", MetricUnit.REQUESTS, "counter of success transactions");
STARROCKS_METRIC_REGISTER.addMetric(COUNTER_TXN_SUCCESS);
COUNTER_TXN_FAILED = new LongCounterMetric("txn_failed", MetricUnit.REQUESTS, "counter of failed transactions");
COUNTER_TXN_FAILED =
new LeaderAwareCounterMetricLong("txn_failed", MetricUnit.REQUESTS, "counter of failed transactions");
STARROCKS_METRIC_REGISTER.addMetric(COUNTER_TXN_FAILED);
COUNTER_ROUTINE_LOAD_ROWS =
@ -720,67 +741,37 @@ public final class MetricRepo {
// counter metrics
// clone task total
LongCounterMetric cloneTaskTotal = new LongCounterMetric("clone_task_total", MetricUnit.REQUESTS, "total clone task") {
@Override
public Long getValue() {
return GlobalStateMgr.getCurrentState().getTabletScheduler().getStat().counterCloneTask.longValue();
}
};
TabletSchedulerStat stat = GlobalStateMgr.getCurrentState().getTabletScheduler().getStat();
LeaderAwareCounterMetric cloneTaskTotal = new LeaderAwareCounterMetricExternalLong(
"clone_task_total", MetricUnit.REQUESTS, "total clone task", stat.counterCloneTask);
STARROCKS_METRIC_REGISTER.addMetric(cloneTaskTotal);
// clone task success
LongCounterMetric cloneTaskSuccess = new LongCounterMetric(
"clone_task_success", MetricUnit.REQUESTS, "success clone task") {
@Override
public Long getValue() {
return GlobalStateMgr.getCurrentState().getTabletScheduler().getStat().counterCloneTaskSucceeded.longValue();
}
};
LeaderAwareCounterMetric cloneTaskSuccess = new LeaderAwareCounterMetricExternalLong(
"clone_task_success", MetricUnit.REQUESTS, "success clone task", stat.counterCloneTaskSucceeded);
STARROCKS_METRIC_REGISTER.addMetric(cloneTaskSuccess);
// clone task copy bytes
LongCounterMetric cloneTaskInterNodeCopyBytes = new LongCounterMetric(
"clone_task_copy_bytes", MetricUnit.BYTES, "clone task copy bytes") {
@Override
public Long getValue() {
return GlobalStateMgr.getCurrentState().getTabletScheduler().getStat().counterCloneTaskInterNodeCopyBytes
.longValue();
}
};
LeaderAwareCounterMetric cloneTaskInterNodeCopyBytes = new LeaderAwareCounterMetricExternalLong(
"clone_task_copy_bytes", MetricUnit.BYTES, "clone task copy bytes", stat.counterCloneTaskInterNodeCopyBytes);
cloneTaskInterNodeCopyBytes.addLabel(new MetricLabel("type", BalanceStat.INTER_NODE));
STARROCKS_METRIC_REGISTER.addMetric(cloneTaskInterNodeCopyBytes);
LongCounterMetric cloneTaskIntraNodeCopyBytes = new LongCounterMetric(
"clone_task_copy_bytes", MetricUnit.BYTES, "clone task copy bytes") {
@Override
public Long getValue() {
return GlobalStateMgr.getCurrentState().getTabletScheduler().getStat().counterCloneTaskIntraNodeCopyBytes
.longValue();
}
};
LeaderAwareCounterMetric cloneTaskIntraNodeCopyBytes = new LeaderAwareCounterMetricExternalLong(
"clone_task_copy_bytes", MetricUnit.BYTES, "clone task copy bytes", stat.counterCloneTaskIntraNodeCopyBytes);
cloneTaskIntraNodeCopyBytes.addLabel(new MetricLabel("type", BalanceStat.INTRA_NODE));
STARROCKS_METRIC_REGISTER.addMetric(cloneTaskIntraNodeCopyBytes);
// clone task copy duration
LongCounterMetric cloneTaskInterNodeCopyDurationMs = new LongCounterMetric(
"clone_task_copy_duration_ms", MetricUnit.MILLISECONDS, "clone task copy duration ms") {
@Override
public Long getValue() {
return GlobalStateMgr.getCurrentState().getTabletScheduler().getStat().counterCloneTaskInterNodeCopyDurationMs
.longValue();
}
};
LeaderAwareCounterMetric cloneTaskInterNodeCopyDurationMs = new LeaderAwareCounterMetricExternalLong(
"clone_task_copy_duration_ms", MetricUnit.MILLISECONDS, "clone task copy duration ms",
stat.counterCloneTaskInterNodeCopyDurationMs);
cloneTaskInterNodeCopyDurationMs.addLabel(new MetricLabel("type", BalanceStat.INTER_NODE));
STARROCKS_METRIC_REGISTER.addMetric(cloneTaskInterNodeCopyDurationMs);
LongCounterMetric cloneTaskIntraNodeCopyDurationMs = new LongCounterMetric(
"clone_task_copy_duration_ms", MetricUnit.MILLISECONDS, "clone task copy duration ms") {
@Override
public Long getValue() {
return GlobalStateMgr.getCurrentState().getTabletScheduler().getStat().counterCloneTaskIntraNodeCopyDurationMs
.longValue();
}
};
LeaderAwareCounterMetric cloneTaskIntraNodeCopyDurationMs = new LeaderAwareCounterMetricExternalLong(
"clone_task_copy_duration_ms", MetricUnit.MILLISECONDS, "clone task copy duration ms",
stat.counterCloneTaskIntraNodeCopyDurationMs);
cloneTaskIntraNodeCopyDurationMs.addLabel(new MetricLabel("type", BalanceStat.INTRA_NODE));
STARROCKS_METRIC_REGISTER.addMetric(cloneTaskIntraNodeCopyDurationMs);
}
@ -887,7 +878,7 @@ public final class MetricRepo {
Collectors.groupingBy(RoutineLoadJob::getWarehouseId)
);
List<GaugeMetricImpl<Long>> routineLoadLags = new ArrayList<>();
List<LeaderAwareGaugeMetric<Long>> routineLoadLags = new ArrayList<>();
// get all partitions offset in a batch api
final WarehouseManager warehouseManager = GlobalStateMgr.getCurrentState().getWarehouseMgr();
@ -954,11 +945,15 @@ public final class MetricRepo {
maxLag = Math.max(latestOffsets.get(j) - progress, maxLag);
}
if (maxLag >= Config.min_routine_load_lag_for_metrics) {
GaugeMetricImpl<Long> metric =
new GaugeMetricImpl<>("routine_load_max_lag_of_partition", MetricUnit.NOUNIT,
"routine load kafka lag");
long finalMaxLag = maxLag;
LeaderAwareGaugeMetric<Long> metric = new LeaderAwareGaugeMetricLong("routine_load_max_lag_of_partition",
MetricUnit.NOUNIT, "routine load kafka lag") {
@Override
public Long getValueLeader() {
return finalMaxLag;
}
};
metric.addLabel(new MetricLabel("job_name", kJob.getName()));
metric.setValue(maxLag);
routineLoadLags.add(metric);
}
}
@ -1176,7 +1171,7 @@ public final class MetricRepo {
}
private static void collectRoutineLoadProcessMetrics(MetricVisitor visitor) {
for (GaugeMetricImpl<Long> metric : GAUGE_ROUTINE_LOAD_LAGS) {
for (Metric<Long> metric : GAUGE_ROUTINE_LOAD_LAGS) {
visitor.visit(metric);
}
}
@ -1205,7 +1200,7 @@ public final class MetricRepo {
});
}
// collect runnning txns of per db
// collect running txns of per db
private static void collectDbRunningTxnMetrics(MetricVisitor visitor) {
Map<Long, DatabaseTransactionMgr> dbIdToDatabaseTransactionMgrs =
GlobalStateMgr.getCurrentState().getGlobalTransactionMgr().getAllDatabaseTransactionMgrs();
@ -1214,10 +1209,14 @@ public final class MetricRepo {
if (null == db) {
continue;
}
GaugeMetricImpl<Integer> txnNum = new GaugeMetricImpl<>("txn_running", MetricUnit.NOUNIT,
"number of running transactions");
LeaderAwareGaugeMetric<Integer> txnNum = new LeaderAwareGaugeMetricInteger("txn_running", MetricUnit.NOUNIT,
"number of running transactions") {
@Override
public Integer getValueLeader() {
return mgr.getRunningTxnNums();
}
};
txnNum.addLabel(new MetricLabel("db", db.getFullName()));
txnNum.setValue(mgr.getRunningTxnNums());
visitor.visit(txnNum);
}
}

View File

@ -57,6 +57,8 @@ import com.starrocks.common.AnalysisException;
import com.starrocks.common.Config;
import com.starrocks.common.DdlException;
import com.starrocks.common.ExceptionChecker;
import com.starrocks.metric.LongCounterMetric;
import com.starrocks.metric.Metric;
import com.starrocks.metric.MetricRepo;
import com.starrocks.persist.EditLog;
import com.starrocks.persist.ImageWriter;
@ -120,12 +122,19 @@ public class BackupHandlerTest {
private TabletInvertedIndex invertedIndex = new TabletInvertedIndex();
public void setUpMocker(GlobalStateMgr globalStateMgr, BrokerMgr brokerMgr, EditLog editLog) {
/**
 * Assigns fresh counter instances to the two backup/restore job counters on MetricRepo.
 */
private void initMetrics() {
    LongCounterMetric unfinishedBackup = new LongCounterMetric(
            "unfinished_backup_job", Metric.MetricUnit.REQUESTS, "current unfinished backup job");
    MetricRepo.COUNTER_UNFINISHED_BACKUP_JOB = unfinishedBackup;

    LongCounterMetric unfinishedRestore = new LongCounterMetric(
            "unfinished_restore_job", Metric.MetricUnit.REQUESTS, "current unfinished restore job");
    MetricRepo.COUNTER_UNFINISHED_RESTORE_JOB = unfinishedRestore;
}
private void setUpMocker(GlobalStateMgr globalStateMgr, BrokerMgr brokerMgr, EditLog editLog) {
Config.tmp_dir = tmpPath;
rootDir = new File(Config.tmp_dir);
rootDir.mkdirs();
MetricRepo.init();
initMetrics();
try {
db = CatalogMocker.mockDb();

View File

@ -0,0 +1,100 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.metric;
import com.starrocks.server.GlobalStateMgr;
import mockit.Expectations;
import mockit.Mocked;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.concurrent.atomic.AtomicLong;
/**
 * Checks the Prometheus rendering of the leader-aware counter metrics for both FE roles:
 * on the leader the accumulated value is shown, otherwise {@code 0}.
 */
public class LeaderAwareCounterMetricTest {
    private LeaderAwareCounterMetricLong metricLong;
    private AtomicLong metricExternalLongValue;
    private Metric<Long> metricExternalLong;

    /**
     * Creates the metrics under test. The metric constructors query GlobalStateMgr, so the
     * mock expectations have to be recorded before this is called.
     */
    void createMetrics() {
        metricLong = new LeaderAwareCounterMetricLong("metricLong", Metric.MetricUnit.SECONDS, "metric long");
        metricExternalLongValue = new AtomicLong(0L);
        metricExternalLong = new LeaderAwareCounterMetricExternalLong(
                "metricExternalLong", Metric.MetricUnit.SECONDS, "metric external long", metricExternalLongValue);
    }

    /** Visits a single metric with a fresh Prometheus visitor and returns the built text. */
    private static String renderPrometheus(Metric<Long> metric) {
        MetricVisitor promVisitor = new PrometheusMetricVisitor("");
        promVisitor.visit(metric);
        return promVisitor.build();
    }

    @Test
    public void testLeaderAwareMetricLeaderOutputLeader(@Mocked GlobalStateMgr globalStateMgr) {
        new Expectations() {
            {
                GlobalStateMgr.getCurrentState();
                result = globalStateMgr;
                globalStateMgr.isLeader();
                result = true;
            }
        };

        // setup Expectations before creating the metrics
        createMetrics();

        // _metricLong{is_leader="true"} 10
        metricLong.increase(10L);
        String longOutput = renderPrometheus(metricLong);
        Assertions.assertTrue(longOutput.contains("_metricLong{is_leader=\"true\"} 10"), longOutput);

        // _metricExternalLong{is_leader="true"} 11
        metricExternalLongValue.addAndGet(11L);
        String externalOutput = renderPrometheus(metricExternalLong);
        Assertions.assertTrue(externalOutput.contains("_metricExternalLong{is_leader=\"true\"} 11"), externalOutput);
    }

    @Test
    public void testLeaderAwareMetricLeaderOutputNonLeader(@Mocked GlobalStateMgr globalStateMgr) {
        new Expectations() {
            {
                GlobalStateMgr.getCurrentState();
                result = globalStateMgr;
                globalStateMgr.isLeader();
                result = false;
            }
        };

        // setup Expectations before creating the metrics
        createMetrics();

        // _metricLong{is_leader="false"} 0
        metricLong.increase(10L);
        String longOutput = renderPrometheus(metricLong);
        Assertions.assertTrue(longOutput.contains("_metricLong{is_leader=\"false\"} 0"), longOutput);

        // _metricExternalLong{is_leader="false"} 0
        metricExternalLongValue.addAndGet(11L);
        String externalOutput = renderPrometheus(metricExternalLong);
        Assertions.assertTrue(externalOutput.contains("_metricExternalLong{is_leader=\"false\"} 0"), externalOutput);
    }
}

View File

@ -15,6 +15,10 @@
package com.starrocks.metric;
import com.starrocks.catalog.Table;
import com.starrocks.clone.TabletSchedCtx;
import com.starrocks.clone.TabletScheduler;
import com.starrocks.clone.TabletSchedulerStat;
import com.starrocks.common.jmockit.Deencapsulation;
import com.starrocks.http.rest.MetricsAction;
import com.starrocks.rpc.BrpcProxy;
import com.starrocks.server.GlobalStateMgr;
@ -27,6 +31,7 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.List;
import java.util.Set;
public class MetricRepoTest extends PlanTestBase {
@ -138,6 +143,113 @@ public class MetricRepoTest extends PlanTestBase {
}
Assertions.assertTrue(line.contains("is_leader=\"true\""), line);
}
}
/**
 * Bumps the tablet scheduler statistics and queues, then checks the values and labels of the
 * scheduler/clone metrics looked up by name from MetricRepo while this FE is the leader.
 */
@Test
public void testCloneMetrics() {
    GlobalStateMgr stateMgr = GlobalStateMgr.getCurrentState();
    Assertions.assertTrue(stateMgr.isLeader());

    TabletScheduler scheduler = stateMgr.getTabletScheduler();
    TabletSchedulerStat schedStat = scheduler.getStat();
    schedStat.counterCloneTask.incrementAndGet(); // 1
    schedStat.counterCloneTaskSucceeded.incrementAndGet(); // 1
    schedStat.counterCloneTaskInterNodeCopyBytes.addAndGet(100L);
    schedStat.counterCloneTaskInterNodeCopyDurationMs.addAndGet(10L);
    schedStat.counterCloneTaskIntraNodeCopyBytes.addAndGet(101L);
    schedStat.counterCloneTaskIntraNodeCopyDurationMs.addAndGet(11L);

    // one BALANCE tablet pending, one REPAIR tablet running
    TabletSchedCtx pendingCtx =
            new TabletSchedCtx(TabletSchedCtx.Type.BALANCE, 1L, 2L, 3L, 4L, 1001L, System.currentTimeMillis());
    Deencapsulation.invoke(scheduler, "addToPendingTablets", pendingCtx);
    TabletSchedCtx runningCtx =
            new TabletSchedCtx(TabletSchedCtx.Type.REPAIR, 1L, 2L, 3L, 4L, 1002L, System.currentTimeMillis());
    Deencapsulation.invoke(scheduler, "addToRunningTablets", runningCtx);
    Set<Long> trackedTabletIds = Deencapsulation.getField(scheduler, "allTabletIds");
    trackedTabletIds.add(1001L);
    trackedTabletIds.add(1002L);

    // check
    List<Metric> scheduledTabletNum = MetricRepo.getMetricsByName("scheduled_tablet_num");
    Assertions.assertEquals(1, scheduledTabletNum.size());
    Assertions.assertEquals(2L, (Long) scheduledTabletNum.get(0).getValue());

    List<Metric> scheduledPendingTabletNums = MetricRepo.getMetricsByName("scheduled_pending_tablet_num");
    Assertions.assertEquals(2, scheduledPendingTabletNums.size());
    for (Metric pendingMetric : scheduledPendingTabletNums) {
        // label 0 is is_leader
        MetricLabel leaderLabel = (MetricLabel) pendingMetric.getLabels().get(0);
        Assertions.assertEquals("is_leader", leaderLabel.getKey());
        Assertions.assertEquals("true", leaderLabel.getValue());

        MetricLabel typeLabel = (MetricLabel) pendingMetric.getLabels().get(1);
        String type = typeLabel.getValue();
        switch (type) {
            case "REPAIR":
                Assertions.assertEquals(0L, pendingMetric.getValue());
                break;
            case "BALANCE":
                Assertions.assertEquals(1L, pendingMetric.getValue());
                break;
            default:
                Assertions.fail("Unknown type: " + type);
        }
    }

    List<Metric> scheduledRunningTabletNums = MetricRepo.getMetricsByName("scheduled_running_tablet_num");
    Assertions.assertEquals(2, scheduledRunningTabletNums.size());
    for (Metric runningMetric : scheduledRunningTabletNums) {
        // label 0 is is_leader
        MetricLabel typeLabel = (MetricLabel) runningMetric.getLabels().get(1);
        String type = typeLabel.getValue();
        switch (type) {
            case "REPAIR":
                Assertions.assertEquals(1L, runningMetric.getValue());
                break;
            case "BALANCE":
                Assertions.assertEquals(0L, runningMetric.getValue());
                break;
            default:
                Assertions.fail("Unknown type: " + type);
        }
    }

    List<Metric> cloneTaskTotal = MetricRepo.getMetricsByName("clone_task_total");
    Assertions.assertEquals(1, cloneTaskTotal.size());
    Assertions.assertEquals(1L, (Long) cloneTaskTotal.get(0).getValue());

    List<Metric> cloneTaskSuccess = MetricRepo.getMetricsByName("clone_task_success");
    Assertions.assertEquals(1, cloneTaskSuccess.size());
    Assertions.assertEquals(1L, (Long) cloneTaskSuccess.get(0).getValue());

    List<Metric> cloneTaskCopyBytes = MetricRepo.getMetricsByName("clone_task_copy_bytes");
    Assertions.assertEquals(2, cloneTaskCopyBytes.size());
    for (Metric bytesMetric : cloneTaskCopyBytes) {
        // label 0 is is_leader
        MetricLabel leaderLabel = (MetricLabel) bytesMetric.getLabels().get(0);
        Assertions.assertEquals("is_leader", leaderLabel.getKey());
        Assertions.assertEquals("true", leaderLabel.getValue());

        MetricLabel typeLabel = (MetricLabel) bytesMetric.getLabels().get(1);
        String type = typeLabel.getValue();
        switch (type) {
            case "INTER_NODE":
                Assertions.assertEquals(100L, bytesMetric.getValue());
                break;
            case "INTRA_NODE":
                Assertions.assertEquals(101L, bytesMetric.getValue());
                break;
            default:
                Assertions.fail("Unknown type: " + type);
        }
    }

    List<Metric> cloneTaskCopyDurationMs = MetricRepo.getMetricsByName("clone_task_copy_duration_ms");
    Assertions.assertEquals(2, cloneTaskCopyDurationMs.size());
    for (Metric durationMetric : cloneTaskCopyDurationMs) {
        // label 0 is is_leader
        MetricLabel leaderLabel = (MetricLabel) durationMetric.getLabels().get(0);
        Assertions.assertEquals("is_leader", leaderLabel.getKey());
        Assertions.assertEquals("true", leaderLabel.getValue());

        MetricLabel typeLabel = (MetricLabel) durationMetric.getLabels().get(1);
        String type = typeLabel.getValue();
        switch (type) {
            case "INTER_NODE":
                Assertions.assertEquals(10L, durationMetric.getValue());
                break;
            case "INTRA_NODE":
                Assertions.assertEquals(11L, durationMetric.getValue());
                break;
            default:
                Assertions.fail("Unknown type: " + type);
        }
    }
}
}

View File

@ -17,24 +17,17 @@
package com.starrocks.metric;
import com.starrocks.clone.TabletSchedCtx;
import com.starrocks.clone.TabletScheduler;
import com.starrocks.clone.TabletSchedulerStat;
import com.starrocks.common.AnalysisException;
import com.starrocks.common.FeConstants;
import com.starrocks.common.jmockit.Deencapsulation;
import com.starrocks.common.proc.JvmMonitorProcDir;
import com.starrocks.ha.FrontendNodeType;
import com.starrocks.monitor.jvm.JvmStatCollector;
import com.starrocks.monitor.jvm.JvmStats;
import com.starrocks.server.GlobalStateMgr;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
public class MetricsTest {
@ -225,98 +218,4 @@ public class MetricsTest {
Assertions.assertTrue(output.contains(metricName));
}
}
/**
 * Sets the FE role to LEADER, bumps the tablet scheduler statistics and queues, and checks the
 * values of the scheduler/clone metrics looked up by name from MetricRepo.
 */
@Test
public void testCloneMetrics() {
GlobalStateMgr globalStateMgr = GlobalStateMgr.getCurrentState();
// switch this FE to the leader role before reading any metric
globalStateMgr.setFrontendNodeType(FrontendNodeType.LEADER);
TabletScheduler tabletScheduler = globalStateMgr.getTabletScheduler();
TabletSchedulerStat stat = tabletScheduler.getStat();
stat.counterCloneTask.incrementAndGet(); // 1
stat.counterCloneTaskSucceeded.incrementAndGet(); // 1
stat.counterCloneTaskInterNodeCopyBytes.addAndGet(100L);
stat.counterCloneTaskInterNodeCopyDurationMs.addAndGet(10L);
stat.counterCloneTaskIntraNodeCopyBytes.addAndGet(101L);
stat.counterCloneTaskIntraNodeCopyDurationMs.addAndGet(11L);
// one BALANCE context goes to the pending queue, one REPAIR context to the running queue
TabletSchedCtx ctx1 = new TabletSchedCtx(TabletSchedCtx.Type.BALANCE, 1L, 2L, 3L, 4L, 1001L, System.currentTimeMillis());
Deencapsulation.invoke(tabletScheduler, "addToPendingTablets", ctx1);
TabletSchedCtx ctx2 = new TabletSchedCtx(TabletSchedCtx.Type.REPAIR, 1L, 2L, 3L, 4L, 1002L, System.currentTimeMillis());
Deencapsulation.invoke(tabletScheduler, "addToRunningTablets", ctx2);
// both tablet ids are also added to the scheduler's private allTabletIds set
Set<Long> allTabletIds = Deencapsulation.getField(tabletScheduler, "allTabletIds");
allTabletIds.add(1001L);
allTabletIds.add(1002L);
// check
List<Metric> scheduledTabletNum = MetricRepo.getMetricsByName("scheduled_tablet_num");
Assertions.assertEquals(1, scheduledTabletNum.size());
Assertions.assertEquals(2L, (Long) scheduledTabletNum.get(0).getValue());
List<Metric> scheduledPendingTabletNums = MetricRepo.getMetricsByName("scheduled_pending_tablet_num");
Assertions.assertEquals(2, scheduledPendingTabletNums.size());
for (Metric metric : scheduledPendingTabletNums) {
// label 0 is is_leader
MetricLabel label = (MetricLabel) metric.getLabels().get(1);
String type = label.getValue();
if (type.equals("REPAIR")) {
Assertions.assertEquals(0L, metric.getValue());
} else if (type.equals("BALANCE")) {
Assertions.assertEquals(1L, metric.getValue());
} else {
Assertions.fail("Unknown type: " + type);
}
}
List<Metric> scheduledRunningTabletNums = MetricRepo.getMetricsByName("scheduled_running_tablet_num");
Assertions.assertEquals(2, scheduledRunningTabletNums.size());
for (Metric metric : scheduledRunningTabletNums) {
// label 0 is is_leader
MetricLabel label = (MetricLabel) metric.getLabels().get(1);
String type = label.getValue();
if (type.equals("REPAIR")) {
Assertions.assertEquals(1L, metric.getValue());
} else if (type.equals("BALANCE")) {
Assertions.assertEquals(0L, metric.getValue());
} else {
Assertions.fail("Unknown type: " + type);
}
}
List<Metric> cloneTaskTotal = MetricRepo.getMetricsByName("clone_task_total");
Assertions.assertEquals(1, cloneTaskTotal.size());
Assertions.assertEquals(1L, (Long) cloneTaskTotal.get(0).getValue());
List<Metric> cloneTaskSuccess = MetricRepo.getMetricsByName("clone_task_success");
Assertions.assertEquals(1, cloneTaskSuccess.size());
Assertions.assertEquals(1L, (Long) cloneTaskSuccess.get(0).getValue());
List<Metric> cloneTaskCopyBytes = MetricRepo.getMetricsByName("clone_task_copy_bytes");
Assertions.assertEquals(2, cloneTaskCopyBytes.size());
for (Metric metric : cloneTaskCopyBytes) {
// here the type label is read from index 0
MetricLabel label = (MetricLabel) metric.getLabels().get(0);
String type = label.getValue();
if (type.equals("INTER_NODE")) {
Assertions.assertEquals(100L, metric.getValue());
} else if (type.equals("INTRA_NODE")) {
Assertions.assertEquals(101L, metric.getValue());
} else {
Assertions.fail("Unknown type: " + type);
}
}
List<Metric> cloneTaskCopyDurationMs = MetricRepo.getMetricsByName("clone_task_copy_duration_ms");
Assertions.assertEquals(2, cloneTaskCopyDurationMs.size());
for (Metric metric : cloneTaskCopyDurationMs) {
// here the type label is read from index 0
MetricLabel label = (MetricLabel) metric.getLabels().get(0);
String type = label.getValue();
if (type.equals("INTER_NODE")) {
Assertions.assertEquals(10L, metric.getValue());
} else if (type.equals("INTRA_NODE")) {
Assertions.assertEquals(11L, metric.getValue());
} else {
Assertions.fail("Unknown type: " + type);
}
}
}
}