Signed-off-by: wyb <wybb86@gmail.com> Co-authored-by: wyb <wybb86@gmail.com>
This commit is contained in:
parent
c6da99c2bb
commit
6b4f0cbef5
|
|
@ -16,6 +16,8 @@ package com.starrocks.clone;
|
|||
|
||||
import com.google.gson.Gson;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public abstract class BalanceStat {
|
||||
|
|
@ -24,7 +26,8 @@ public abstract class BalanceStat {
|
|||
CLUSTER_TABLET("inter-node tablet distribution"),
|
||||
BACKEND_DISK("intra-node disk usage"),
|
||||
BACKEND_TABLET("intra-node tablet distribution"),
|
||||
COLOCATION_GROUP("colocation group");
|
||||
COLOCATION_GROUP("colocation group"),
|
||||
LABEL_LOCATION("label-aware location");
|
||||
|
||||
private final String label;
|
||||
|
||||
|
|
@ -83,6 +86,11 @@ public abstract class BalanceStat {
|
|||
return new ColocationGroupBalanceStat(tabletId, currentBes, bucketSeq);
|
||||
}
|
||||
|
||||
public static BalanceStat createLabelLocationBalanceStat(long tabletId, Set<Long> currentBes,
|
||||
Map<String, Collection<String>> expectedLocations) {
|
||||
return new LabelLocationBalanceStat(tabletId, currentBes, expectedLocations);
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents balanced stat
|
||||
*/
|
||||
|
|
@ -214,4 +222,20 @@ public abstract class BalanceStat {
|
|||
this.expectedBes = expectedBes;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Balance stat for label-aware location mismatch
|
||||
*/
|
||||
private static class LabelLocationBalanceStat extends UnbalancedStat {
|
||||
private long tabletId;
|
||||
private Set<Long> currentBes;
|
||||
private Map<String, Collection<String>> expectedLocations;
|
||||
|
||||
public LabelLocationBalanceStat(long tabletId, Set<Long> currentBes, Map<String, Collection<String>> expectedLocations) {
|
||||
super(BalanceType.LABEL_LOCATION);
|
||||
this.tabletId = tabletId;
|
||||
this.currentBes = currentBes;
|
||||
this.expectedLocations = expectedLocations;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1692,6 +1692,8 @@ public class DiskAndTabletLoadReBalancer extends Rebalancer {
|
|||
continue;
|
||||
}
|
||||
|
||||
boolean isLabelLocationTable = olapTbl.getLocation() != null;
|
||||
|
||||
for (Partition partition : globalStateMgr.getLocalMetastore().getAllPartitionsIncludeRecycleBin(olapTbl)) {
|
||||
partitionChecked++;
|
||||
if (partitionChecked % partitionBatchNum == 0) {
|
||||
|
|
@ -1794,11 +1796,15 @@ public class DiskAndTabletLoadReBalancer extends Rebalancer {
|
|||
|
||||
boolean isTabletBalanced = pStat.skew >= 0 && pStat.skew <= 1;
|
||||
if (isTabletBalanced) {
|
||||
idx.setBalanceStat(BalanceStat.BALANCED_STAT);
|
||||
if (isLocalBalance || !isLabelLocationTable) {
|
||||
idx.setBalanceStat(BalanceStat.BALANCED_STAT);
|
||||
}
|
||||
} else if (isLocalBalance) {
|
||||
// tablet not balanced && is local balance
|
||||
idx.setBalanceStat(BalanceStat.createBackendTabletBalanceStat(
|
||||
bePaths.first, getPath(maxKey), getPath(minKey), maxNum, minNum));
|
||||
} else {
|
||||
} else if (!isLabelLocationTable) {
|
||||
// tablet not balanced && not local balance && table not use label location
|
||||
idx.setBalanceStat(
|
||||
BalanceStat.createClusterTabletBalanceStat(maxKey, minKey, maxNum, minNum));
|
||||
}
|
||||
|
|
|
|||
|
|
@ -410,16 +410,25 @@ public class TabletChecker extends FrontendDaemon {
|
|||
int replicaNum, List<Long> aliveBeIdsInCluster,
|
||||
boolean isPartitionUrgent) {
|
||||
TabletCheckerStat partitionTabletCheckerStat = new TabletCheckerStat();
|
||||
Multimap<String, String> locations = olapTbl.getLocation();
|
||||
boolean isLabelLocationTable = locations != null;
|
||||
boolean enoughLocationMatchedBackends = preCheckEnoughLocationMatchedBackends(locations, replicaNum);
|
||||
|
||||
// Tablet in SHADOW index can not be repaired or balanced
|
||||
if (physicalPartition != null) {
|
||||
for (MaterializedIndex idx : physicalPartition.getMaterializedIndices(
|
||||
IndexExtState.VISIBLE)) {
|
||||
BalanceStat balanceStat = BalanceStat.BALANCED_STAT;
|
||||
boolean allTabletsChecked = true;
|
||||
|
||||
for (Tablet tablet : idx.getTablets()) {
|
||||
LocalTablet localTablet = (LocalTablet) tablet;
|
||||
partitionTabletCheckerStat.totalTabletNum++;
|
||||
|
||||
if (tabletScheduler.containsTablet(tablet.getId())) {
|
||||
long tabletId = tablet.getId();
|
||||
if (tabletScheduler.containsTablet(tabletId)) {
|
||||
partitionTabletCheckerStat.tabletInScheduler++;
|
||||
allTabletsChecked = false;
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
@ -431,13 +440,18 @@ public class TabletChecker extends FrontendDaemon {
|
|||
physicalPartition.getVisibleVersion(),
|
||||
replicaNum,
|
||||
aliveBeIdsInCluster,
|
||||
olapTbl.getLocation());
|
||||
locations);
|
||||
|
||||
if (statusWithPrio.first == TabletHealthStatus.HEALTHY) {
|
||||
// Only set last status check time when status is healthy.
|
||||
localTablet.setLastStatusCheckTime(System.currentTimeMillis());
|
||||
continue;
|
||||
} else if (isPartitionUrgent) {
|
||||
} else if (statusWithPrio.first == TabletHealthStatus.LOCATION_MISMATCH && balanceStat.isBalanced()) {
|
||||
balanceStat = BalanceStat.createLabelLocationBalanceStat(
|
||||
tabletId, localTablet.getBackendIds(), locations.asMap());
|
||||
}
|
||||
|
||||
if (isPartitionUrgent) {
|
||||
statusWithPrio.second = TabletSchedCtx.Priority.VERY_HIGH;
|
||||
partitionTabletCheckerStat.isUrgentPartitionHealthy = false;
|
||||
}
|
||||
|
|
@ -449,21 +463,20 @@ public class TabletChecker extends FrontendDaemon {
|
|||
continue;
|
||||
}
|
||||
|
||||
if (statusWithPrio.first == TabletHealthStatus.LOCATION_MISMATCH &&
|
||||
!preCheckEnoughLocationMatchedBackends(olapTbl.getLocation(), replicaNum)) {
|
||||
if (statusWithPrio.first == TabletHealthStatus.LOCATION_MISMATCH && !enoughLocationMatchedBackends) {
|
||||
continue;
|
||||
}
|
||||
|
||||
TabletSchedCtx tabletSchedCtx = new TabletSchedCtx(
|
||||
TabletSchedCtx.Type.REPAIR,
|
||||
db.getId(), olapTbl.getId(),
|
||||
physicalPartition.getId(), idx.getId(), tablet.getId(),
|
||||
physicalPartition.getId(), idx.getId(), tabletId,
|
||||
System.currentTimeMillis());
|
||||
// the tablet status will be set again when being scheduled
|
||||
tabletSchedCtx.setTabletStatus(statusWithPrio.first);
|
||||
tabletSchedCtx.setOrigPriority(statusWithPrio.second);
|
||||
tabletSchedCtx.setTablet(localTablet);
|
||||
tabletSchedCtx.setRequiredLocation(olapTbl.getLocation());
|
||||
tabletSchedCtx.setRequiredLocation(locations);
|
||||
tabletSchedCtx.setReplicaNum(replicaNum);
|
||||
if (!tryChooseSrcBeforeSchedule(tabletSchedCtx)) {
|
||||
continue;
|
||||
|
|
@ -477,6 +490,13 @@ public class TabletChecker extends FrontendDaemon {
|
|||
partitionTabletCheckerStat.addToSchedulerTabletNum++;
|
||||
}
|
||||
}
|
||||
|
||||
if (isLabelLocationTable) {
|
||||
// set label location balance stat in materialized index if not balanced or all tablets check balanced.
|
||||
if (!balanceStat.isBalanced() || (balanceStat.isBalanced() && allTabletsChecked)) {
|
||||
idx.setBalanceStat(balanceStat);
|
||||
}
|
||||
}
|
||||
} // indices
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -2180,6 +2180,8 @@ public class TabletScheduler extends FrontendDaemon {
|
|||
private synchronized long getPendingBalanceTabletNum(TStorageMedium medium, BalanceType balanceType) {
|
||||
if (balanceType == BalanceType.COLOCATION_GROUP) {
|
||||
return getPendingRepairTabletNum(medium, TabletHealthStatus.COLOCATE_MISMATCH);
|
||||
} else if (balanceType == BalanceType.LABEL_LOCATION) {
|
||||
return getPendingRepairTabletNum(medium, TabletHealthStatus.LOCATION_MISMATCH);
|
||||
}
|
||||
|
||||
return pendingTablets.stream()
|
||||
|
|
@ -2196,6 +2198,8 @@ public class TabletScheduler extends FrontendDaemon {
|
|||
private synchronized long getRunningBalanceTabletNum(TStorageMedium medium, BalanceType balanceType) {
|
||||
if (balanceType == BalanceType.COLOCATION_GROUP) {
|
||||
return getRunningRepairTabletNum(medium, TabletHealthStatus.COLOCATE_MISMATCH);
|
||||
} else if (balanceType == BalanceType.LABEL_LOCATION) {
|
||||
return getRunningRepairTabletNum(medium, TabletHealthStatus.LOCATION_MISMATCH);
|
||||
}
|
||||
|
||||
return runningTablets.values().stream()
|
||||
|
|
|
|||
|
|
@ -14,11 +14,15 @@
|
|||
|
||||
package com.starrocks.clone;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import com.starrocks.clone.BalanceStat.BalanceType;
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public class BalanceStatTest {
|
||||
|
|
@ -88,5 +92,19 @@ public class BalanceStatTest {
|
|||
"\"balanced\":false}",
|
||||
stat.toString());
|
||||
}
|
||||
|
||||
{
|
||||
Set<Long> currentBes = Sets.newHashSet(1L, 2L);
|
||||
Map<String, Collection<String>> expectedLocations = Maps.newHashMap();
|
||||
expectedLocations.put("rack", Arrays.asList("rack1", "rack2"));
|
||||
BalanceStat stat = BalanceStat.createLabelLocationBalanceStat(1L, currentBes, expectedLocations);
|
||||
Assertions.assertFalse(stat.isBalanced());
|
||||
Assertions.assertEquals(BalanceType.LABEL_LOCATION, stat.getBalanceType());
|
||||
Assertions.assertEquals("label-aware location", stat.getBalanceType().label());
|
||||
Assertions.assertEquals(
|
||||
"{\"tabletId\":1,\"currentBes\":[1,2],\"expectedLocations\":{\"rack\":[\"rack1\",\"rack2\"]}," +
|
||||
"\"type\":\"LABEL_LOCATION\",\"balanced\":false}",
|
||||
stat.toString());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -55,8 +55,11 @@ import mockit.Mocked;
|
|||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
public class BalanceStatProcNodeTest {
|
||||
|
||||
|
|
@ -226,6 +229,45 @@ public class BalanceStatProcNodeTest {
|
|||
Deencapsulation.invoke(tabletScheduler, "addToRunningTablets", ctx2);
|
||||
}
|
||||
|
||||
// 3. label location mismatch balance
|
||||
{
|
||||
List<Column> cols = Lists.newArrayList(new Column("province", Type.VARCHAR));
|
||||
PartitionInfo listPartition = new ListPartitionInfo(PartitionType.LIST, cols);
|
||||
long partitionId = 1225L;
|
||||
listPartition.setDataProperty(partitionId, DataProperty.DEFAULT_DATA_PROPERTY);
|
||||
listPartition.setIsInMemory(partitionId, false);
|
||||
listPartition.setReplicationNum(partitionId, (short) 1);
|
||||
OlapTable olapTable = new OlapTable(1224L, "location_table", cols, null, listPartition, null);
|
||||
|
||||
MaterializedIndex index = new MaterializedIndex(1200L, MaterializedIndex.IndexState.NORMAL);
|
||||
TabletMeta tabletMeta = new TabletMeta(db.getId(), olapTable.getId(), partitionId, index.getId(), TStorageMedium.HDD);
|
||||
long tablet1Id = 1210L;
|
||||
index.addTablet(new LocalTablet(tablet1Id), tabletMeta);
|
||||
long tablet2Id = 1211L;
|
||||
index.addTablet(new LocalTablet(tablet2Id), tabletMeta);
|
||||
Map<String, Long> indexNameToId = olapTable.getIndexNameToId();
|
||||
indexNameToId.put("index1", index.getId());
|
||||
|
||||
// balance stat
|
||||
Set<Long> currentBes = Sets.newHashSet(be1.getId(), be2.getId());
|
||||
Map<String, Collection<String>> expectedLocations = Maps.newHashMap();
|
||||
expectedLocations.put("rack", Arrays.asList("rack1", "rack2"));
|
||||
index.setBalanceStat(BalanceStat.createLabelLocationBalanceStat(tablet1Id, currentBes, expectedLocations));
|
||||
|
||||
Partition partition = new Partition(partitionId, partitionId, "p1", index, new HashDistributionInfo(2, cols));
|
||||
olapTable.addPartition(partition);
|
||||
|
||||
db.registerTableUnlocked(olapTable);
|
||||
|
||||
// 1 running tablet
|
||||
TabletSchedCtx ctx = new TabletSchedCtx(TabletSchedCtx.Type.REPAIR, db.getId(), olapTable.getId(), partitionId,
|
||||
index.getId(), tablet1Id, System.currentTimeMillis());
|
||||
ctx.setOrigPriority(TabletSchedCtx.Priority.NORMAL);
|
||||
ctx.setTabletStatus(LocalTablet.TabletHealthStatus.LOCATION_MISMATCH);
|
||||
ctx.setStorageMedium(TStorageMedium.HDD);
|
||||
Deencapsulation.invoke(tabletScheduler, "addToRunningTablets", ctx);
|
||||
}
|
||||
|
||||
new Expectations() {
|
||||
{
|
||||
GlobalStateMgr.getCurrentState().getLocalMetastore();
|
||||
|
|
@ -237,7 +279,7 @@ public class BalanceStatProcNodeTest {
|
|||
BalanceStatProcNode proc = new BalanceStatProcNode(tabletScheduler);
|
||||
BaseProcResult result = (BaseProcResult) proc.fetchResult();
|
||||
List<List<String>> rows = result.getRows();
|
||||
Assertions.assertEquals(5, rows.size());
|
||||
Assertions.assertEquals(6, rows.size());
|
||||
|
||||
// cluster disk balanced
|
||||
Assertions.assertEquals("[HDD, inter-node disk usage, true, 0, 0]", rows.get(0).toString());
|
||||
|
|
@ -249,5 +291,7 @@ public class BalanceStatProcNodeTest {
|
|||
Assertions.assertEquals("[HDD, intra-node tablet distribution, true, 0, 0]", rows.get(3).toString());
|
||||
// colocation group not balanced
|
||||
Assertions.assertEquals("[HDD, colocation group, false, 1, 1]", rows.get(4).toString());
|
||||
// label-aware location table not balanced
|
||||
Assertions.assertEquals("[HDD, label-aware location, false, 0, 1]", rows.get(5).toString());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue