[Enhancement] Refactor balance type (backport #62163) (#62185)

Signed-off-by: wyb <wybb86@gmail.com>
Co-authored-by: wyb <wybb86@gmail.com>
This commit is contained in:
mergify[bot] 2025-08-21 08:43:32 +00:00 committed by GitHub
parent 0c3cdc90ca
commit 9db589ba27
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 49 additions and 47 deletions

View File

@ -22,12 +22,12 @@ import java.util.Set;
public abstract class BalanceStat {
public enum BalanceType {
CLUSTER_DISK("inter-node disk usage"),
CLUSTER_TABLET("inter-node tablet distribution"),
BACKEND_DISK("intra-node disk usage"),
BACKEND_TABLET("intra-node tablet distribution"),
INTER_NODE_DISK_USAGE("inter-node disk usage"),
INTER_NODE_TABLET_DISTRIBUTION("inter-node tablet distribution"),
INTRA_NODE_DISK_USAGE("intra-node disk usage"),
INTRA_NODE_TABLET_DISTRIBUTION("intra-node tablet distribution"),
COLOCATION_GROUP("colocation group"),
LABEL_LOCATION("label-aware location");
LABEL_AWARE_LOCATION("label-aware location");
private final String label;
@ -143,7 +143,7 @@ public abstract class BalanceStat {
private double minUsedPercent;
/**
 * Creates a balance stat tagged with the disk-usage balance type whose label is
 * "inter-node disk usage".
 *
 * @param maxBeId        backend id forwarded to the superclass constructor (first id)
 * @param minBeId        backend id forwarded to the superclass constructor (second id)
 * @param maxUsedPercent value stored in this stat's {@code maxUsedPercent} field
 * @param minUsedPercent value stored in this stat's {@code minUsedPercent} field
 */
public ClusterDiskBalanceStat(long maxBeId, long minBeId, double maxUsedPercent, double minUsedPercent) {
// NOTE(review): two super(...) calls appear back to back because this view is a diff with its
// +/- markers stripped; presumably CLUSTER_DISK is the removed line and INTER_NODE_DISK_USAGE
// the added one -- confirm against the real file.
super(BalanceType.CLUSTER_DISK, maxBeId, minBeId);
super(BalanceType.INTER_NODE_DISK_USAGE, maxBeId, minBeId);
this.maxUsedPercent = maxUsedPercent;
this.minUsedPercent = minUsedPercent;
}
@ -157,7 +157,7 @@ public abstract class BalanceStat {
private long minTabletNum;
/**
 * Creates a balance stat tagged with the tablet-distribution balance type whose label is
 * "inter-node tablet distribution".
 *
 * @param maxBeId      backend id forwarded to the superclass constructor (first id)
 * @param minBeId      backend id forwarded to the superclass constructor (second id)
 * @param maxTabletNum value stored in this stat's {@code maxTabletNum} field
 * @param minTabletNum value stored in this stat's {@code minTabletNum} field
 */
public ClusterTabletBalanceStat(long maxBeId, long minBeId, long maxTabletNum, long minTabletNum) {
// NOTE(review): two super(...) calls appear back to back because this view is a diff with its
// +/- markers stripped; presumably CLUSTER_TABLET is the removed line and
// INTER_NODE_TABLET_DISTRIBUTION the added one -- confirm against the real file.
super(BalanceType.CLUSTER_TABLET, maxBeId, minBeId);
super(BalanceType.INTER_NODE_TABLET_DISTRIBUTION, maxBeId, minBeId);
this.maxTabletNum = maxTabletNum;
this.minTabletNum = minTabletNum;
}
@ -187,7 +187,7 @@ public abstract class BalanceStat {
private double minUsedPercent;
/**
 * Creates a balance stat tagged with the disk-usage balance type whose label is
 * "intra-node disk usage".
 *
 * @param beId           backend id forwarded to the superclass constructor
 * @param maxPath        path forwarded to the superclass constructor (first path)
 * @param minPath        path forwarded to the superclass constructor (second path)
 * @param maxUsedPercent value stored in this stat's {@code maxUsedPercent} field
 * @param minUsedPercent value stored in this stat's {@code minUsedPercent} field
 */
public BackendDiskBalanceStat(long beId, String maxPath, String minPath, double maxUsedPercent, double minUsedPercent) {
// NOTE(review): two super(...) calls appear back to back because this view is a diff with its
// +/- markers stripped; presumably BACKEND_DISK is the removed line and INTRA_NODE_DISK_USAGE
// the added one -- confirm against the real file.
super(BalanceType.BACKEND_DISK, beId, maxPath, minPath);
super(BalanceType.INTRA_NODE_DISK_USAGE, beId, maxPath, minPath);
this.maxUsedPercent = maxUsedPercent;
this.minUsedPercent = minUsedPercent;
}
@ -201,7 +201,7 @@ public abstract class BalanceStat {
private long minTabletNum;
/**
 * Creates a balance stat tagged with the tablet-distribution balance type whose label is
 * "intra-node tablet distribution".
 *
 * @param beId         backend id forwarded to the superclass constructor
 * @param maxPath      path forwarded to the superclass constructor (first path)
 * @param minPath      path forwarded to the superclass constructor (second path)
 * @param maxTabletNum value stored in this stat's {@code maxTabletNum} field
 * @param minTabletNum value stored in this stat's {@code minTabletNum} field
 */
public BackendTabletBalanceStat(long beId, String maxPath, String minPath, long maxTabletNum, long minTabletNum) {
// NOTE(review): two super(...) calls appear back to back because this view is a diff with its
// +/- markers stripped; presumably BACKEND_TABLET is the removed line and
// INTRA_NODE_TABLET_DISTRIBUTION the added one -- confirm against the real file.
super(BalanceType.BACKEND_TABLET, beId, maxPath, minPath);
super(BalanceType.INTRA_NODE_TABLET_DISTRIBUTION, beId, maxPath, minPath);
this.maxTabletNum = maxTabletNum;
this.minTabletNum = minTabletNum;
}
@ -232,7 +232,7 @@ public abstract class BalanceStat {
private Map<String, Collection<String>> expectedLocations;
public LabelLocationBalanceStat(long tabletId, Set<Long> currentBes, Map<String, Collection<String>> expectedLocations) {
super(BalanceType.LABEL_LOCATION);
super(BalanceType.LABEL_AWARE_LOCATION);
this.tabletId = tabletId;
this.currentBes = currentBes;
this.expectedLocations = expectedLocations;

View File

@ -98,10 +98,10 @@ public class DiskAndTabletLoadReBalancer extends Rebalancer {
// balance cluster
if (!isClusterDiskBalanced(clusterStat, medium)) {
alternativeTablets = balanceClusterDisk(clusterStat, medium);
balanceType = BalanceType.CLUSTER_DISK;
balanceType = BalanceType.INTER_NODE_DISK_USAGE;
} else {
alternativeTablets = balanceClusterTablet(clusterStat, medium);
balanceType = BalanceType.CLUSTER_TABLET;
balanceType = BalanceType.INTER_NODE_TABLET_DISTRIBUTION;
}
if (!alternativeTablets.isEmpty()) {
break;
@ -110,10 +110,10 @@ public class DiskAndTabletLoadReBalancer extends Rebalancer {
// balance backend
if (!isBackendDiskBalanced(clusterStat, medium)) {
alternativeTablets = balanceBackendDisk(clusterStat, medium);
balanceType = BalanceType.BACKEND_DISK;
balanceType = BalanceType.INTRA_NODE_DISK_USAGE;
} else {
alternativeTablets = balanceBackendTablet(clusterStat, medium);
balanceType = BalanceType.BACKEND_TABLET;
balanceType = BalanceType.INTRA_NODE_TABLET_DISTRIBUTION;
}
} while (false);
@ -137,7 +137,8 @@ public class DiskAndTabletLoadReBalancer extends Rebalancer {
long replicaSize = tabletCtx.getSrcReplica().getDataSize();
boolean isLocalBalance = (tabletCtx.getDestBackendId() == tabletCtx.getSrcBackendId());
// tabletCtx may wait a long time from the pending state to the running state, so we must double-check the task
if (tabletCtx.getBalanceType() == BalanceType.CLUSTER_DISK || tabletCtx.getBalanceType() == BalanceType.BACKEND_DISK) {
if (tabletCtx.getBalanceType() == BalanceType.INTER_NODE_DISK_USAGE ||
tabletCtx.getBalanceType() == BalanceType.INTRA_NODE_DISK_USAGE) {
BackendLoadStatistic srcBeStat = clusterStat.getBackendLoadStatistic(tabletCtx.getSrcBackendId());
BackendLoadStatistic destBeStat = clusterStat.getBackendLoadStatistic(tabletCtx.getDestBackendId());
if (srcBeStat == null || destBeStat == null) {
@ -623,7 +624,7 @@ public class DiskAndTabletLoadReBalancer extends Rebalancer {
schedCtx.setOrigPriority(TabletSchedCtx.Priority.LOW);
schedCtx.setSrc(replica);
schedCtx.setDest(lBackend.getId(), destPathHash);
schedCtx.setBalanceType(BalanceType.CLUSTER_DISK);
schedCtx.setBalanceType(BalanceType.INTER_NODE_DISK_USAGE);
selectedTablets.add(tabletId);
alternativeTablets.add(schedCtx);
@ -856,7 +857,7 @@ public class DiskAndTabletLoadReBalancer extends Rebalancer {
schedCtx.setOrigPriority(TabletSchedCtx.Priority.LOW);
schedCtx.setSrc(replica);
schedCtx.setDest(beId, destPathHash);
schedCtx.setBalanceType(BalanceType.BACKEND_DISK);
schedCtx.setBalanceType(BalanceType.INTRA_NODE_DISK_USAGE);
alternativeTablets.add(schedCtx);
if (alternativeTablets.size() >= Config.tablet_sched_max_balancing_tablets) {
@ -1458,7 +1459,8 @@ public class DiskAndTabletLoadReBalancer extends Rebalancer {
tabletMeta.getPhysicalPartitionId(),
tabletMeta.getIndexId(), tabletId, System.currentTimeMillis());
schedCtx.setOrigPriority(TabletSchedCtx.Priority.LOW);
schedCtx.setBalanceType(isLocalBalance ? BalanceType.BACKEND_TABLET : BalanceType.CLUSTER_TABLET);
schedCtx.setBalanceType(isLocalBalance ?
BalanceType.INTRA_NODE_TABLET_DISTRIBUTION : BalanceType.INTER_NODE_TABLET_DISTRIBUTION);
schedCtx.setSrc(replica);
// update state

View File

@ -2180,7 +2180,7 @@ public class TabletScheduler extends FrontendDaemon {
private synchronized long getPendingBalanceTabletNum(TStorageMedium medium, BalanceType balanceType) {
if (balanceType == BalanceType.COLOCATION_GROUP) {
return getPendingRepairTabletNum(medium, TabletHealthStatus.COLOCATE_MISMATCH);
} else if (balanceType == BalanceType.LABEL_LOCATION) {
} else if (balanceType == BalanceType.LABEL_AWARE_LOCATION) {
return getPendingRepairTabletNum(medium, TabletHealthStatus.LOCATION_MISMATCH);
}
@ -2198,7 +2198,7 @@ public class TabletScheduler extends FrontendDaemon {
private synchronized long getRunningBalanceTabletNum(TStorageMedium medium, BalanceType balanceType) {
if (balanceType == BalanceType.COLOCATION_GROUP) {
return getRunningRepairTabletNum(medium, TabletHealthStatus.COLOCATE_MISMATCH);
} else if (balanceType == BalanceType.LABEL_LOCATION) {
} else if (balanceType == BalanceType.LABEL_AWARE_LOCATION) {
return getRunningRepairTabletNum(medium, TabletHealthStatus.LOCATION_MISMATCH);
}

View File

@ -39,44 +39,44 @@ public class BalanceStatTest {
{
BalanceStat stat = BalanceStat.createClusterDiskBalanceStat(1L, 2L, 0.9, 0.1);
Assertions.assertFalse(stat.isBalanced());
Assertions.assertEquals(BalanceType.CLUSTER_DISK, stat.getBalanceType());
Assertions.assertEquals(BalanceType.INTER_NODE_DISK_USAGE, stat.getBalanceType());
Assertions.assertEquals("inter-node disk usage", stat.getBalanceType().label());
Assertions.assertEquals(
"{\"maxUsedPercent\":0.9,\"minUsedPercent\":0.1,\"maxBeId\":1,\"minBeId\":2,\"type\":\"CLUSTER_DISK\"," +
"\"balanced\":false}",
"{\"maxUsedPercent\":0.9,\"minUsedPercent\":0.1,\"maxBeId\":1,\"minBeId\":2," +
"\"type\":\"INTER_NODE_DISK_USAGE\",\"balanced\":false}",
stat.toString());
}
{
BalanceStat stat = BalanceStat.createClusterTabletBalanceStat(1L, 2L, 9L, 1L);
Assertions.assertFalse(stat.isBalanced());
Assertions.assertEquals(BalanceType.CLUSTER_TABLET, stat.getBalanceType());
Assertions.assertEquals(BalanceType.INTER_NODE_TABLET_DISTRIBUTION, stat.getBalanceType());
Assertions.assertEquals("inter-node tablet distribution", stat.getBalanceType().label());
Assertions.assertEquals(
"{\"maxTabletNum\":9,\"minTabletNum\":1,\"maxBeId\":1,\"minBeId\":2,\"type\":\"CLUSTER_TABLET\"," +
"\"balanced\":false}",
"{\"maxTabletNum\":9,\"minTabletNum\":1,\"maxBeId\":1,\"minBeId\":2," +
"\"type\":\"INTER_NODE_TABLET_DISTRIBUTION\",\"balanced\":false}",
stat.toString());
}
{
BalanceStat stat = BalanceStat.createBackendDiskBalanceStat(1L, "disk1", "disk2", 0.9, 0.1);
Assertions.assertFalse(stat.isBalanced());
Assertions.assertEquals(BalanceType.BACKEND_DISK, stat.getBalanceType());
Assertions.assertEquals(BalanceType.INTRA_NODE_DISK_USAGE, stat.getBalanceType());
Assertions.assertEquals("intra-node disk usage", stat.getBalanceType().label());
Assertions.assertEquals(
"{\"maxUsedPercent\":0.9,\"minUsedPercent\":0.1,\"beId\":1,\"maxPath\":\"disk1\",\"minPath\":\"disk2\"," +
"\"type\":\"BACKEND_DISK\",\"balanced\":false}",
"\"type\":\"INTRA_NODE_DISK_USAGE\",\"balanced\":false}",
stat.toString());
}
{
BalanceStat stat = BalanceStat.createBackendTabletBalanceStat(1L, "disk1", "disk2", 9L, 1L);
Assertions.assertFalse(stat.isBalanced());
Assertions.assertEquals(BalanceType.BACKEND_TABLET, stat.getBalanceType());
Assertions.assertEquals(BalanceType.INTRA_NODE_TABLET_DISTRIBUTION, stat.getBalanceType());
Assertions.assertEquals("intra-node tablet distribution", stat.getBalanceType().label());
Assertions.assertEquals(
"{\"maxTabletNum\":9,\"minTabletNum\":1,\"beId\":1,\"maxPath\":\"disk1\",\"minPath\":\"disk2\"," +
"\"type\":\"BACKEND_TABLET\",\"balanced\":false}",
"\"type\":\"INTRA_NODE_TABLET_DISTRIBUTION\",\"balanced\":false}",
stat.toString());
}
@ -99,11 +99,11 @@ public class BalanceStatTest {
expectedLocations.put("rack", Arrays.asList("rack1", "rack2"));
BalanceStat stat = BalanceStat.createLabelLocationBalanceStat(1L, currentBes, expectedLocations);
Assertions.assertFalse(stat.isBalanced());
Assertions.assertEquals(BalanceType.LABEL_LOCATION, stat.getBalanceType());
Assertions.assertEquals(BalanceType.LABEL_AWARE_LOCATION, stat.getBalanceType());
Assertions.assertEquals("label-aware location", stat.getBalanceType().label());
Assertions.assertEquals(
"{\"tabletId\":1,\"currentBes\":[1,2],\"expectedLocations\":{\"rack\":[\"rack1\",\"rack2\"]}," +
"\"type\":\"LABEL_LOCATION\",\"balanced\":false}",
"\"type\":\"LABEL_AWARE_LOCATION\",\"balanced\":false}",
stat.toString());
}
}

View File

@ -205,7 +205,7 @@ public class DiskAndTabletLoadReBalancerTest {
// check balance stat
Assertions.assertFalse(materializedIndex.isTabletBalanced());
Assertions.assertEquals(BalanceType.CLUSTER_TABLET, materializedIndex.getBalanceType());
Assertions.assertEquals(BalanceType.INTER_NODE_TABLET_DISTRIBUTION, materializedIndex.getBalanceType());
// set table state to schema_change, balance should be ignored
table.setState(OlapTable.OlapTableState.SCHEMA_CHANGE);
@ -578,10 +578,10 @@ public class DiskAndTabletLoadReBalancerTest {
// check balance stat
BalanceStat stat0 = clusterLoadStatistic.getBackendDiskBalanceStat(TStorageMedium.HDD, beId1);
Assertions.assertFalse(stat0.isBalanced());
Assertions.assertEquals(BalanceType.BACKEND_DISK, stat0.getBalanceType());
Assertions.assertEquals(BalanceType.INTRA_NODE_DISK_USAGE, stat0.getBalanceType());
BalanceStat stat1 = clusterLoadStatistic.getBackendDiskBalanceStat(TStorageMedium.SSD, beId1);
Assertions.assertFalse(stat1.isBalanced());
Assertions.assertEquals(BalanceType.BACKEND_DISK, stat1.getBalanceType());
Assertions.assertEquals(BalanceType.INTRA_NODE_DISK_USAGE, stat1.getBalanceType());
// set Config.balance_load_disk_safe_threshold to 0.9 to trigger backend tablet distribution balance
Config.tablet_sched_balance_load_disk_safe_threshold = 0.9;
@ -597,7 +597,7 @@ public class DiskAndTabletLoadReBalancerTest {
// check balance stat
BalanceStat stat2 = materializedIndex.getBalanceStat();
Assertions.assertFalse(stat2.isBalanced());
Assertions.assertEquals(BalanceType.BACKEND_TABLET, stat2.getBalanceType());
Assertions.assertEquals(BalanceType.INTRA_NODE_TABLET_DISTRIBUTION, stat2.getBalanceType());
// set table state to schema_change, balance should be ignored
table.setState(OlapTable.OlapTableState.SCHEMA_CHANGE);
@ -783,7 +783,7 @@ public class DiskAndTabletLoadReBalancerTest {
// check balance stat
BalanceStat stat = clusterLoadStatistic.getClusterDiskBalanceStat(TStorageMedium.HDD);
Assertions.assertFalse(stat.isBalanced());
Assertions.assertEquals(BalanceType.CLUSTER_DISK, stat.getBalanceType());
Assertions.assertEquals(BalanceType.INTER_NODE_DISK_USAGE, stat.getBalanceType());
}
@Test

View File

@ -297,11 +297,11 @@ public class TabletSchedCtxTest {
ctx = new TabletSchedCtx(Type.BALANCE, 1, 2, 3, 4, 1001, System.currentTimeMillis());
ctx.setOrigPriority(Priority.NORMAL);
ctx.setBalanceType(BalanceStat.BalanceType.CLUSTER_TABLET);
ctx.setBalanceType(BalanceStat.BalanceType.INTER_NODE_TABLET_DISTRIBUTION);
results = ctx.getBrief();
Assertions.assertEquals("1001", results.get(0));
Assertions.assertEquals("BALANCE", results.get(1));
Assertions.assertEquals("CLUSTER_TABLET", results.get(3));
Assertions.assertEquals("INTER_NODE_TABLET_DISTRIBUTION", results.get(3));
}
@Test

View File

@ -137,14 +137,14 @@ public class BalanceStatProcNodeTest {
TabletSchedCtx ctx1 =
new TabletSchedCtx(TabletSchedCtx.Type.BALANCE, 1L, 2L, 3L, 4L, 1001L, System.currentTimeMillis());
ctx1.setOrigPriority(TabletSchedCtx.Priority.NORMAL);
ctx1.setBalanceType(BalanceType.BACKEND_DISK);
ctx1.setBalanceType(BalanceType.INTRA_NODE_DISK_USAGE);
ctx1.setStorageMedium(TStorageMedium.HDD);
Deencapsulation.invoke(tabletScheduler, "addToPendingTablets", ctx1);
TabletSchedCtx ctx2 =
new TabletSchedCtx(TabletSchedCtx.Type.BALANCE, 1L, 2L, 3L, 4L, 1002L, System.currentTimeMillis());
ctx2.setOrigPriority(TabletSchedCtx.Priority.NORMAL);
ctx2.setBalanceType(BalanceType.BACKEND_DISK);
ctx2.setBalanceType(BalanceType.INTRA_NODE_DISK_USAGE);
ctx2.setStorageMedium(TStorageMedium.HDD);
Deencapsulation.invoke(tabletScheduler, "addToPendingTablets", ctx2);
}
@ -180,7 +180,7 @@ public class BalanceStatProcNodeTest {
TabletSchedCtx ctx = new TabletSchedCtx(TabletSchedCtx.Type.BALANCE, db.getId(), olapTable.getId(), partitionId,
index.getId(), tablet1Id, System.currentTimeMillis());
ctx.setOrigPriority(TabletSchedCtx.Priority.NORMAL);
ctx.setBalanceType(BalanceType.CLUSTER_TABLET);
ctx.setBalanceType(BalanceType.INTER_NODE_TABLET_DISTRIBUTION);
ctx.setStorageMedium(TStorageMedium.HDD);
Deencapsulation.invoke(tabletScheduler, "addToRunningTablets", ctx);
}

View File

@ -92,8 +92,8 @@ public class ClusterLoadStatByMediumTest {
String balanceStat = row.get(1);
if (medium.equals("HDD")) {
Assertions.assertEquals(
"{\"maxUsedPercent\":0.9,\"minUsedPercent\":0.1,\"maxBeId\":1,\"minBeId\":2,\"type\":\"CLUSTER_DISK\"," +
"\"balanced\":false}",
"{\"maxUsedPercent\":0.9,\"minUsedPercent\":0.1,\"maxBeId\":1,\"minBeId\":2," +
"\"type\":\"INTER_NODE_DISK_USAGE\",\"balanced\":false}",
balanceStat);
} else if (medium.equals("SSD")) {
Assertions.assertEquals("{\"balanced\":true}", balanceStat);

View File

@ -116,7 +116,7 @@ public class ClusterLoadStatisticProcDirTest {
if (beId.equals("10001")) {
Assertions.assertEquals(
"{\"maxUsedPercent\":0.9,\"minUsedPercent\":0.1,\"beId\":10001,\"maxPath\":\"/path1\"," +
"\"minPath\":\"/path2\",\"type\":\"BACKEND_DISK\",\"balanced\":false}",
"\"minPath\":\"/path2\",\"type\":\"INTRA_NODE_DISK_USAGE\",\"balanced\":false}",
row.get(11));
} else if (beId.equals("10002")) {
Assertions.assertEquals("{\"balanced\":true}", row.get(11));

View File

@ -73,8 +73,8 @@ public class IndicesProcDirTest {
Assertions.assertEquals("NORMAL", row.get(2));
Assertions.assertEquals("\\N", row.get(3)); // last consistency check time
Assertions.assertEquals(
"{\"maxTabletNum\":9,\"minTabletNum\":1,\"maxBeId\":1,\"minBeId\":2,\"type\":\"CLUSTER_TABLET\"," +
"\"balanced\":false}",
"{\"maxTabletNum\":9,\"minTabletNum\":1,\"maxBeId\":1,\"minBeId\":2," +
"\"type\":\"INTER_NODE_TABLET_DISTRIBUTION\",\"balanced\":false}",
row.get(4)); // tablet balance stat
Assertions.assertEquals("2", row.get(5)); // virtual buckets
Assertions.assertEquals("2", row.get(6)); // tablets