[Enhancement] Collect FE's machine info including cpu and mac address (#61154)

Signed-off-by: gengjun-git <gengjun@starrocks.com>
This commit is contained in:
gengjun-git 2025-07-24 15:19:11 +08:00 committed by GitHub
parent 5bc153445d
commit 167adca471
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 294 additions and 16 deletions

View File

@ -0,0 +1,209 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.common.util;
import com.google.common.base.Strings;
import com.starrocks.service.FrontendOptions;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.HashSet;
import java.util.Set;
public class MachineInfo {
private static final Logger LOG = LogManager.getLogger(MachineInfo.class);
private MachineInfo() {}
private static class Holder {
private static final MachineInfo INSTANCE = new MachineInfo();
}
public static MachineInfo getInstance() {
return Holder.INSTANCE;
}
private volatile Integer cpuCores = null;
private volatile String macAddress = null;
public int getCpuCores() {
if (cpuCores == null) {
synchronized (this) {
if (cpuCores == null) {
cpuCores = computeCpuCores();
}
}
}
return cpuCores;
}
public String getMacAddress() {
if (macAddress == null) {
synchronized (this) {
if (macAddress == null) {
macAddress = computeMacAddress();
}
}
}
return macAddress;
}
private int computeCpuCores() {
int hostCores = 0;
try (BufferedReader reader = new BufferedReader(new FileReader("/proc/cpuinfo"))) {
String line;
while ((line = reader.readLine()) != null) {
int colon = line.indexOf(':');
if (colon != -1) {
String name = line.substring(0, colon).trim();
if (name.equals("processor")) {
hostCores++;
}
}
}
} catch (IOException ignored) {
}
return getCgroupCpuLimit(hostCores);
}
private int getCgroupCpuLimit(int defaultCores) {
// If not in a Docker container, return the default cores
if (!Files.exists(Paths.get("/.dockerenv"))) {
return defaultCores;
}
int cfsNumCores = defaultCores;
int cpusetNumCores = defaultCores;
try {
String fsType = getFsType("/sys/fs/cgroup");
String cfsPeriodStr = null;
String cfsQuotaStr = null;
String cpusetStr = null;
// Determine the cgroup filesystem type and read the appropriate files
if ("tmpfs".equals(fsType)) {
cfsPeriodStr = readFileTrim("/sys/fs/cgroup/cpu/cpu.cfs_period_us");
cfsQuotaStr = readFileTrim("/sys/fs/cgroup/cpu/cpu.cfs_quota_us");
cpusetStr = readFileTrim("/sys/fs/cgroup/cpuset/cpuset.cpus");
} else if ("cgroup2".equals(fsType)) {
String cpuMax = readFileTrim("/sys/fs/cgroup/cpu.max");
if (!Strings.isNullOrEmpty(cpuMax)) {
String[] parts = cpuMax.split("\\s+");
if (parts.length >= 2) {
cfsQuotaStr = parts[0];
cfsPeriodStr = parts[1];
}
}
cpusetStr = readFileTrim("/sys/fs/cgroup/cpuset.cpus");
}
// Parse the cgroup files to determine CPU limits
if (cfsPeriodStr != null && cfsQuotaStr != null) {
try {
int cfsPeriod = Integer.parseInt(cfsPeriodStr.trim());
int cfsQuota = Integer.parseInt(cfsQuotaStr.trim());
if (cfsQuota > 0 && cfsPeriod > 0) {
cfsNumCores = cfsQuota / cfsPeriod;
}
} catch (NumberFormatException ignored) {
}
}
// Parse the cpuset.cpus file to determine available CPU cores
if (!Strings.isNullOrEmpty(cpusetStr)) {
Set<Integer> cpusetCores = parseCpusetCpus(cpusetStr);
cpusetCores.removeAll(getOfflineCores());
cpusetNumCores = cpusetCores.size();
}
} catch (Exception ignored) {
}
// Ensure the number of cores is at least 1 and get the minimum of the two limits
return Math.max(1, Math.min(cfsNumCores, cpusetNumCores));
}
private String getFsType(String path) throws Exception {
Path cgroupPath = Paths.get(path);
FileStore store = Files.getFileStore(cgroupPath);
return store.type();
}
private String readFileTrim(String path) {
try {
return Files.readString(Paths.get(path)).trim();
} catch (Exception e) {
return null;
}
}
private Set<Integer> parseCpusetCpus(String cpusStr) {
Set<Integer> cpuids = new HashSet<>();
for (String part : cpusStr.split(",")) {
part = part.trim();
if (part.contains("-")) {
String[] range = part.split("-");
if (range.length == 2) {
int start = Integer.parseInt(range[0].trim());
int end = Integer.parseInt(range[1].trim());
for (int i = start; i <= end; i++) {
cpuids.add(i);
}
}
} else if (!part.isEmpty()) {
cpuids.add(Integer.parseInt(part));
}
}
return cpuids;
}
private Set<Integer> getOfflineCores() {
Set<Integer> offline = new HashSet<>();
String offlineStr = readFileTrim("/sys/devices/system/cpu/offline");
if (offlineStr != null && !offlineStr.isEmpty()) {
offline.addAll(parseCpusetCpus(offlineStr));
}
return offline;
}
private String computeMacAddress() {
if (Files.exists(Paths.get("/.dockerenv"))) {
return "unknown";
}
InetAddress address = FrontendOptions.getLocalAddr();
try {
NetworkInterface networkInterface = NetworkInterface.getByInetAddress(address);
if (networkInterface == null) {
LOG.warn("cannot find network interface by host {}", address.getHostAddress());
return "unknown";
}
byte[] macAddressBytes = networkInterface.getHardwareAddress();
if (macAddressBytes == null) {
LOG.warn("cannot get mac address of network interface {}", networkInterface.getName());
return "unknown";
}
StringBuilder macAddressBuilder = new StringBuilder();
for (int i = 0; i < macAddressBytes.length; i++) {
macAddressBuilder.append(String.format("%02X%s", macAddressBytes[i],
(i < macAddressBytes.length - 1) ? "-" : ""));
}
return macAddressBuilder.toString();
} catch (Exception e) {
LOG.warn("get mac address failed", e);
return "unknown";
}
}
}

View File

@ -40,6 +40,7 @@ import com.google.gson.annotations.SerializedName;
import com.starrocks.common.Config;
import com.starrocks.common.DdlException;
import com.starrocks.common.Version;
import com.starrocks.common.util.MachineInfo;
import com.starrocks.http.ActionController;
import com.starrocks.http.BaseRequest;
import com.starrocks.http.BaseResponse;
@ -79,15 +80,11 @@ public class BootstrapFinishAction extends RestBaseAction {
result = new BootstrapResult();
String token = request.getSingleParameter(TOKEN);
if (!Strings.isNullOrEmpty(token)) {
if (result.status == ActionStatus.OK) {
if (!token.equals(GlobalStateMgr.getCurrentState().getNodeMgr().getToken())) {
result.status = ActionStatus.FAILED;
LOG.info("invalid token: {}", token);
result.msg = "invalid parameter";
}
}
if (result.status == ActionStatus.OK) {
if (!token.equals(GlobalStateMgr.getCurrentState().getNodeMgr().getToken())) {
result.status = ActionStatus.FAILED;
LOG.info("invalid token: {}", token);
result.msg = "invalid parameter";
} else {
// cluster id and token are valid, return replayed journal id
long replayedJournalId = GlobalStateMgr.getCurrentState().getReplayedJournalId();
long feStartTime = GlobalStateMgr.getCurrentState().getFeStartTime();
@ -97,13 +94,15 @@ public class BootstrapFinishAction extends RestBaseAction {
result.setFeStartTime(feStartTime);
result.setFeVersion(Version.STARROCKS_VERSION + "-" + Version.STARROCKS_COMMIT_HASH);
result.setHeapUsedPercent(JvmStats.getJvmHeapUsedPercent());
result.setCpuCores(MachineInfo.getInstance().getCpuCores());
result.setMacAddress(MachineInfo.getInstance().getMacAddress());
}
}
} else {
result = new BootstrapResult("not ready");
}
// send result
// send the result
response.setContentType("application/json");
response.getContent().append(result.toJson());
sendResult(request, response);
@ -122,6 +121,10 @@ public class BootstrapFinishAction extends RestBaseAction {
private String feVersion;
@SerializedName("heapUsedPercent")
private float heapUsedPercent;
@SerializedName("cpuCores")
private int cpuCores;
@SerializedName("macAddress")
private String macAddress;
public BootstrapResult() {
super();
@ -179,6 +182,22 @@ public class BootstrapFinishAction extends RestBaseAction {
this.heapUsedPercent = heapUsedPercent;
}
public int getCpuCores() {
return cpuCores;
}
public void setCpuCores(int cpuCores) {
this.cpuCores = cpuCores;
}
public String getMacAddress() {
return macAddress;
}
public void setMacAddress(String macAddress) {
this.macAddress = macAddress;
}
@Override
public String toJson() {
Gson gson = new Gson();

View File

@ -1279,4 +1279,11 @@ public class NodeMgr {
public static boolean isFeNodeNameValid(String nodeName, String host, int port) {
return nodeName.startsWith(host + "_" + port);
}
public long getTotalCpuCores() {
return frontends.values()
.stream()
.mapToLong(Frontend::getCpuCores)
.sum() + systemInfo.getTotalCpuCores();
}
}

View File

@ -305,6 +305,10 @@ public class FrontendOptions {
return InetAddresses.toAddrString(localAddr);
}
public static InetAddress getLocalAddr() {
return localAddr;
}
public static String getHostname() {
return localAddr.getHostName();
}

View File

@ -60,6 +60,8 @@ public class Frontend extends JsonWriter {
private int editLogPort;
@SerializedName(value = "fid")
private int fid = 0;
@SerializedName("macAddress")
private String macAddress;
private int queryPort;
private int rpcPort;
@ -75,6 +77,7 @@ public class Frontend extends JsonWriter {
private int heartbeatRetryTimes = 0;
private float heapUsedPercent;
private int cpuCores;
public Frontend() {
}
@ -169,6 +172,14 @@ public class Frontend extends JsonWriter {
this.fid = fid;
}
public String getMacAddress() {
return macAddress;
}
public int getCpuCores() {
return cpuCores;
}
/**
* handle Frontend's heartbeat response.
*/
@ -186,6 +197,8 @@ public class Frontend extends JsonWriter {
heartbeatErrMsg = "";
heartbeatRetryTimes = 0;
heapUsedPercent = hbResponse.getHeapUsedPercent();
cpuCores = hbResponse.getCpuCores();
macAddress = hbResponse.getMacAddress();
} else {
if (this.heartbeatRetryTimes < Config.heartbeat_retry_times) {
this.heartbeatRetryTimes++;

View File

@ -61,6 +61,10 @@ public class FrontendHbResponse extends HeartbeatResponse implements Writable {
private long feStartTime;
@SerializedName(value = "feVersion")
private String feVersion;
@SerializedName("cpuCores")
private int cpuCores;
@SerializedName("macAddress")
private String macAddress;
private float heapUsedPercent;
@ -70,7 +74,7 @@ public class FrontendHbResponse extends HeartbeatResponse implements Writable {
public FrontendHbResponse(String name, int queryPort, int rpcPort,
long replayedJournalId, long hbTime, long feStartTime,
String feVersion, float heapUsedPercent) {
String feVersion, float heapUsedPercent, int cpuCores, String macAddress) {
super(HeartbeatResponse.Type.FRONTEND);
this.status = HbStatus.OK;
this.name = name;
@ -81,6 +85,8 @@ public class FrontendHbResponse extends HeartbeatResponse implements Writable {
this.feStartTime = feStartTime;
this.feVersion = feVersion;
this.heapUsedPercent = heapUsedPercent;
this.cpuCores = cpuCores;
this.macAddress = macAddress;
}
public FrontendHbResponse(String name, String errMsg) {
@ -122,6 +128,14 @@ public class FrontendHbResponse extends HeartbeatResponse implements Writable {
this.heapUsedPercent = heapUsedPercent;
}
public int getCpuCores() {
return cpuCores;
}
public String getMacAddress() {
return macAddress;
}
public static FrontendHbResponse read(DataInput in) throws IOException {
FrontendHbResponse result = new FrontendHbResponse();
result.readFields(in);

View File

@ -43,6 +43,7 @@ import com.starrocks.common.Config;
import com.starrocks.common.ThreadPoolManager;
import com.starrocks.common.Version;
import com.starrocks.common.util.FrontendDaemon;
import com.starrocks.common.util.MachineInfo;
import com.starrocks.common.util.NetUtils;
import com.starrocks.common.util.Util;
import com.starrocks.encryption.KeyMgr;
@ -359,7 +360,8 @@ public class HeartbeatMgr extends FrontendDaemon {
GlobalStateMgr.getCurrentState().getMaxJournalId(), System.currentTimeMillis(),
GlobalStateMgr.getCurrentState().getFeStartTime(),
Version.STARROCKS_VERSION + "-" + Version.STARROCKS_COMMIT_HASH,
JvmStats.getJvmHeapUsedPercent());
JvmStats.getJvmHeapUsedPercent(), MachineInfo.getInstance().getCpuCores(),
MachineInfo.getInstance().getMacAddress());
} else {
return new FrontendHbResponse(fe.getNodeName(), "not ready");
}
@ -382,7 +384,8 @@ public class HeartbeatMgr extends FrontendDaemon {
} else {
return new FrontendHbResponse(fe.getNodeName(), result.getQueryPort(), result.getRpcPort(),
result.getReplayedJournal(), System.currentTimeMillis(),
result.getFeStartTime(), result.getFeVersion(), result.getHeapUsedPercent());
result.getFeStartTime(), result.getFeVersion(), result.getHeapUsedPercent(),
result.getCpuCores(), result.getMacAddress());
}
} catch (Exception e) {
return new FrontendHbResponse(fe.getNodeName(),

View File

@ -1357,6 +1357,15 @@ public class SystemInfoService implements GsonPostProcessable {
LOG.debug("update path infos: {}", newPathInfos);
}
public long getTotalCpuCores() {
return Stream.concat(
idToBackendRef.values().stream(),
idToComputeNodeRef.values().stream()
)
.mapToLong(ComputeNode::getCpuCores)
.sum();
}
@Override
public void gsonPostProcess() throws IOException {
Map<Long, AtomicLong> idToReportVersion = new HashMap<>();

View File

@ -76,7 +76,7 @@ public class BDBHATest {
// one joined successfully
new Frontend(FrontendNodeType.FOLLOWER, "node1", "192.168.2.4", 9010)
.handleHbResponse(new FrontendHbResponse("n1", 8030, 9050,
1000, System.currentTimeMillis(), System.currentTimeMillis(), "v1", 0.5f),
1000, System.currentTimeMillis(), System.currentTimeMillis(), "v1", 0.5f, 1, null),
false);
Assertions.assertEquals(2,
environment.getReplicatedEnvironment().getRepMutableConfig().getElectableGroupSizeOverride());

View File

@ -393,7 +393,7 @@ public class SchedulerTestBase extends SchedulerTestNoneDBBase {
FrontendHbResponse hbResponse;
if (isAlive) {
hbResponse = new FrontendHbResponse(fe.getNodeName(), fe.getQueryPort(), fe.getRpcPort(),
fe.getReplayedJournalId(), fe.getLastUpdateTime(), startTimeMs, fe.getFeVersion(), 0.5f);
fe.getReplayedJournalId(), fe.getLastUpdateTime(), startTimeMs, fe.getFeVersion(), 0.5f, 1, null);
} else {
hbResponse = new FrontendHbResponse(fe.getNodeName(), "mock-dead-frontend");
}

View File

@ -59,7 +59,7 @@ public class NodeMgrTest {
NodeMgr nodeMgr = new NodeMgr();
Frontend fe = new Frontend(FrontendNodeType.FOLLOWER, "node1", "10.0.0.3", 9010);
fe.handleHbResponse(new FrontendHbResponse("node1", 9030, 9020, 1,
System.currentTimeMillis(), System.currentTimeMillis(), "v1", 0.5f), true);
System.currentTimeMillis(), System.currentTimeMillis(), "v1", 0.5f, 1, null), true);
nodeMgr.replayAddFrontend(fe);
Assertions.assertTrue(nodeMgr.checkFeExistByRPCPort("10.0.0.3", 9020));