[Enhancement] Add colocate group balance statistic (backport #61736) (#61876)

Signed-off-by: wyb <wybb86@gmail.com>
Co-authored-by: wyb <wybb86@gmail.com>
Committed by GitHub on behalf of mergify[bot], 2025-08-13 06:13:14 +00:00
parent fb3c1fbeb3
commit 1c0ffd7f4c
6 changed files with 252 additions and 138 deletions

File: BalanceStat.java

@@ -16,12 +16,15 @@ package com.starrocks.clone;
 import com.google.gson.Gson;
+import java.util.Set;
 public abstract class BalanceStat {
     public enum BalanceType {
         CLUSTER_DISK("inter-node disk usage"),
         CLUSTER_TABLET("inter-node tablet distribution"),
         BACKEND_DISK("intra-node disk usage"),
-        BACKEND_TABLET("intra-node tablet distribution");
+        BACKEND_TABLET("intra-node tablet distribution"),
+        COLOCATION_GROUP("colocation group");
         private final String label;
@@ -76,6 +79,10 @@ public abstract class BalanceStat {
         return new BackendTabletBalanceStat(beId, maxPath, minPath, maxTabletCount, minTabletCount);
     }
+    public static BalanceStat createColocationGroupBalanceStat(long tabletId, Set<Long> currentBes, Set<Long> expectedBes) {
+        return new ColocationGroupBalanceStat(tabletId, currentBes, expectedBes);
+    }
/**
* Represents balanced stat
*/
@@ -191,4 +198,20 @@ public abstract class BalanceStat {
             this.minTabletNum = minTabletNum;
         }
     }
+    /**
+     * Balance stat for a colocate group whose tablets sit on backends that do not match the expected bucket sequence
+     */
+    private static class ColocationGroupBalanceStat extends UnbalancedStat {
+        private final long tabletId;
+        private final Set<Long> currentBes;
+        private final Set<Long> expectedBes;
+        public ColocationGroupBalanceStat(long tabletId, Set<Long> currentBes, Set<Long> expectedBes) {
+            super(BalanceType.COLOCATION_GROUP);
+            this.tabletId = tabletId;
+            this.currentBes = currentBes;
+            this.expectedBes = expectedBes;
+        }
+    }
 }
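The new stat serializes through Gson just like the existing ones, so an unbalanced colocation entry reports the offending tablet plus the current and expected backend sets. Below is a minimal, self-contained sketch of that JSON shape; `ColocationStatSketch` is a hypothetical flattened stand-in for the real `BalanceStat` hierarchy, with field order chosen to match the string asserted in `BalanceStatTest` further down.

```java
import com.google.gson.Gson;
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical stand-in for ColocationGroupBalanceStat, kept just close enough
// to reproduce the JSON shape the new stat serializes to.
class ColocationStatSketch {
    long tabletId;
    Set<Long> currentBes;
    Set<Long> expectedBes;
    String type = "COLOCATION_GROUP"; // in the real class this is the BalanceType enum
    boolean balanced = false;         // colocation stats are only created when unbalanced

    ColocationStatSketch(long tabletId, Set<Long> currentBes, Set<Long> expectedBes) {
        this.tabletId = tabletId;
        this.currentBes = currentBes;
        this.expectedBes = expectedBes;
    }

    public static void main(String[] args) {
        Set<Long> current = new LinkedHashSet<>();
        current.add(1L);
        current.add(2L);
        Set<Long> expected = new LinkedHashSet<>();
        expected.add(2L);
        expected.add(3L);
        // Prints: {"tabletId":1,"currentBes":[1,2],"expectedBes":[2,3],"type":"COLOCATION_GROUP","balanced":false}
        System.out.println(new Gson().toJson(new ColocationStatSketch(1L, current, expected)));
    }
}
```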

File: ColocateTableBalancer.java

@@ -797,17 +797,26 @@ public class ColocateTableBalancer extends FrontendDaemon {
 for (MaterializedIndex index : physicalPartition.getMaterializedIndices(IndexExtState.VISIBLE)) {
     Preconditions.checkState(backendBucketsSeq.size() == index.getTablets().size(),
             backendBucketsSeq.size() + " v.s. " + index.getTablets().size());
+    BalanceStat balanceStat = BalanceStat.BALANCED_STAT;
     int idx = 0;
     for (Long tabletId : index.getTabletIds()) {
         LocalTablet tablet = (LocalTablet) index.getTablet(tabletId);
-        Set<Long> bucketsSeq = backendBucketsSeq.get(idx);
+        Set<Long> bucketSeq = backendBucketsSeq.get(idx);
+        // check tablet colocate status
+        TabletHealthStatus st = TabletChecker.getColocateTabletHealthStatus(tablet, visibleVersion,
+                replicationNum, bucketSeq);
+        if (st == TabletHealthStatus.COLOCATE_MISMATCH && balanceStat.isBalanced()) {
+            balanceStat =
+                    BalanceStat.createColocationGroupBalanceStat(tabletId, tablet.getBackendIds(), bucketSeq);
+        }
         // Tablet has already been scheduled, no need to schedule again
         if (!tabletScheduler.containsTablet(tablet.getId())) {
-            Preconditions.checkState(bucketsSeq.size() == replicationNum,
-                    bucketsSeq.size() + " vs. " + replicationNum);
-            TabletHealthStatus
-                    st = TabletChecker.getColocateTabletHealthStatus(tablet, visibleVersion,
-                    replicationNum, bucketsSeq);
+            Preconditions.checkState(bucketSeq.size() == replicationNum,
+                    bucketSeq.size() + " vs. " + replicationNum);
             if (st != TabletHealthStatus.HEALTHY) {
                 isGroupStable = false;
                 Priority colocateUnhealthyPrio = Priority.HIGH;
@@ -844,10 +853,10 @@ public class ColocateTableBalancer extends FrontendDaemon {
     Pair<Boolean, Long> result =
             tabletScheduler.blockingAddTabletCtxToScheduler(db, tabletCtx,
                     needToForceRepair(st, tablet,
-                            bucketsSeq) || isPartitionUrgent /* forcefully add or not */);
+                            bucketSeq) || isPartitionUrgent /* forcefully add or not */);
     if (LOG.isDebugEnabled() && result.first &&
             st == TabletHealthStatus.COLOCATE_MISMATCH) {
-        logDebugInfoForColocateMismatch(bucketsSeq, tablet);
+        logDebugInfoForColocateMismatch(bucketSeq, tablet);
     }
     waitTotalTimeMs += result.second;
@@ -857,7 +866,7 @@ public class ColocateTableBalancer extends FrontendDaemon {
             tableId,
             info != null ? info.getLastBackendsPerBucketSeq().get(idx) :
                     Lists.newArrayList(),
-            bucketsSeq);
+            bucketSeq);
     }
 }
 } else {
@@ -866,13 +875,15 @@ public class ColocateTableBalancer extends FrontendDaemon {
 } else {
     // tablet maybe added to scheduler because of balance between local disks,
     // in this case we shouldn't mark the group unstable
-    if (TabletChecker.getColocateTabletHealthStatus(
-            tablet, visibleVersion, replicationNum, bucketsSeq) != TabletHealthStatus.HEALTHY) {
+    if (st != TabletHealthStatus.HEALTHY) {
         isGroupStable = false;
     }
 }
 idx++;
 } // end for tablets
+// set balance stat in materialized index
+index.setBalanceStat(balanceStat);
 } // end for materialized indexes
 if (isUrgentPartitionHealthy && isPartitionUrgent) {
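The reshuffled loop above computes each tablet's colocate health exactly once, reuses the result in both the scheduling branch and the group-stability branch, and records the first COLOCATE_MISMATCH it encounters as the balance stat of the whole materialized index. Below is a compact sketch of that "remember the first violation" pattern; the types are hypothetical stand-ins, not the real `LocalTablet`/`TabletChecker` API.

```java
import java.util.List;
import java.util.Set;

// Minimal sketch of the pattern introduced above (hypothetical types): check each
// tablet once, and keep only the first colocate mismatch as the stat for the index.
class FirstMismatchSketch {
    record Tablet(long id, Set<Long> backendIds) {}

    static String balanceStatFor(List<Tablet> tablets, List<Set<Long>> expectedBackendsPerBucket) {
        String stat = "BALANCED";
        for (int i = 0; i < tablets.size(); i++) {
            Tablet t = tablets.get(i);
            Set<Long> expected = expectedBackendsPerBucket.get(i);
            boolean mismatch = !t.backendIds().equals(expected); // stands in for COLOCATE_MISMATCH
            if (mismatch && stat.equals("BALANCED")) {
                // keep only the first offending tablet, like createColocationGroupBalanceStat
                stat = "COLOCATION_GROUP mismatch at tablet " + t.id()
                        + ": current=" + t.backendIds() + ", expected=" + expected;
            }
        }
        return stat;
    }

    public static void main(String[] args) {
        System.out.println(balanceStatFor(
                List.of(new Tablet(1001L, Set.of(1L, 2L)), new Tablet(1002L, Set.of(2L, 3L))),
                List.of(Set.of(1L, 2L), Set.of(3L, 4L))));
    }
}
```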

File: TabletScheduler.java

@@ -2100,8 +2100,8 @@ public class TabletScheduler extends FrontendDaemon {
 for (TStorageMedium medium : storageMediums) {
     for (BalanceType type : BalanceType.values()) {
         boolean isBalanced = !mediumBalanceTypes.contains(Pair.create(medium, type));
-        long pendingTabletNum = getBalancePendingTabletNum(medium, type);
-        long runningTabletNum = getBalanceRunningTabletNum(medium, type);
+        long pendingTabletNum = getPendingBalanceTabletNum(medium, type);
+        long runningTabletNum = getRunningBalanceTabletNum(medium, type);
         stats.add(Lists.newArrayList(medium.name(), type.label(), String.valueOf(isBalanced),
                 String.valueOf(pendingTabletNum), String.valueOf(runningTabletNum)));
     }
@@ -2171,13 +2171,33 @@ public class TabletScheduler extends FrontendDaemon {
             + runningTablets.values().stream().filter(t -> t.getType() == Type.BALANCE).count();
 }
-private synchronized long getBalancePendingTabletNum(TStorageMedium medium, BalanceStat.BalanceType balanceType) {
+private synchronized long getPendingRepairTabletNum(TStorageMedium medium, TabletHealthStatus status) {
+    return pendingTablets.stream()
+            .filter(t -> t.getStorageMedium() == medium && t.getType() == Type.REPAIR && t.getTabletHealthStatus() == status)
+            .count();
+}
+private synchronized long getPendingBalanceTabletNum(TStorageMedium medium, BalanceType balanceType) {
+    if (balanceType == BalanceType.COLOCATION_GROUP) {
+        return getPendingRepairTabletNum(medium, TabletHealthStatus.COLOCATE_MISMATCH);
+    }
     return pendingTablets.stream()
             .filter(t -> t.getStorageMedium() == medium && t.getType() == Type.BALANCE && t.getBalanceType() == balanceType)
             .count();
 }
-private synchronized long getBalanceRunningTabletNum(TStorageMedium medium, BalanceStat.BalanceType balanceType) {
+private synchronized long getRunningRepairTabletNum(TStorageMedium medium, TabletHealthStatus status) {
+    return runningTablets.values().stream()
+            .filter(t -> t.getStorageMedium() == medium && t.getType() == Type.REPAIR && t.getTabletHealthStatus() == status)
+            .count();
+}
+private synchronized long getRunningBalanceTabletNum(TStorageMedium medium, BalanceType balanceType) {
+    if (balanceType == BalanceType.COLOCATION_GROUP) {
+        return getRunningRepairTabletNum(medium, TabletHealthStatus.COLOCATE_MISMATCH);
+    }
     return runningTablets.values().stream()
             .filter(t -> t.getStorageMedium() == medium && t.getType() == Type.BALANCE && t.getBalanceType() == balanceType)
             .count();
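Colocate tablets are scheduled as repairs rather than balance tasks, so the new COLOCATION_GROUP row cannot be counted from the balance queues like the other types; the methods above divert it to REPAIR tasks in COLOCATE_MISMATCH status instead. A self-contained sketch of that dispatch, with plain-data stand-ins for `TabletSchedCtx` and its enums:

```java
import java.util.List;

// Sketch of the counting rule above (hypothetical types): colocation-group rows are
// counted from REPAIR tasks in COLOCATE_MISMATCH status, everything else from
// BALANCE tasks tagged with the matching balance type.
class BalanceCountSketch {
    enum TaskType { BALANCE, REPAIR }
    enum Health { HEALTHY, COLOCATE_MISMATCH }
    record Task(TaskType type, Health health, String balanceType) {}

    static long count(List<Task> tasks, String balanceType) {
        if (balanceType.equals("COLOCATION_GROUP")) {
            // colocate tablets are scheduled as repairs, so count those instead
            return tasks.stream()
                    .filter(t -> t.type() == TaskType.REPAIR && t.health() == Health.COLOCATE_MISMATCH)
                    .count();
        }
        return tasks.stream()
                .filter(t -> t.type() == TaskType.BALANCE && balanceType.equals(t.balanceType()))
                .count();
    }

    public static void main(String[] args) {
        List<Task> tasks = List.of(
                new Task(TaskType.REPAIR, Health.COLOCATE_MISMATCH, null),
                new Task(TaskType.BALANCE, Health.HEALTHY, "BACKEND_DISK"));
        System.out.println(count(tasks, "COLOCATION_GROUP")); // 1
        System.out.println(count(tasks, "BACKEND_DISK"));     // 1
    }
}
```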

File: BalanceStatTest.java

@@ -14,10 +14,13 @@
 package com.starrocks.clone;
+import com.google.common.collect.Sets;
+import com.starrocks.clone.BalanceStat.BalanceType;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
+import java.util.Set;
 public class BalanceStatTest {
     @Test
@@ -72,5 +75,18 @@ public class BalanceStatTest {
                 "\"type\":\"BACKEND_TABLET\",\"balanced\":false}",
                 stat.toString());
     }
+    {
+        Set<Long> currentBes = Sets.newHashSet(1L, 2L);
+        Set<Long> expectedBes = Sets.newHashSet(2L, 3L);
+        BalanceStat stat = BalanceStat.createColocationGroupBalanceStat(1L, currentBes, expectedBes);
+        Assertions.assertFalse(stat.isBalanced());
+        Assertions.assertEquals(BalanceType.COLOCATION_GROUP, stat.getBalanceType());
+        Assertions.assertEquals("colocation group", stat.getBalanceType().label());
+        Assertions.assertEquals(
+                "{\"tabletId\":1,\"currentBes\":[1,2],\"expectedBes\":[2,3],\"type\":\"COLOCATION_GROUP\"," +
+                        "\"balanced\":false}",
+                stat.toString());
+    }
 }
 }

File: ColocateTableBalancerTest.java

@@ -48,15 +48,17 @@ import com.starrocks.catalog.LocalTablet;
 import com.starrocks.catalog.MaterializedIndex;
 import com.starrocks.catalog.OlapTable;
 import com.starrocks.catalog.Partition;
-import com.starrocks.catalog.Tablet;
+import com.starrocks.catalog.PhysicalPartition;
 import com.starrocks.catalog.TabletInvertedIndex;
 import com.starrocks.catalog.Type;
+import com.starrocks.clone.BalanceStat.BalanceType;
 import com.starrocks.common.Config;
 import com.starrocks.common.FeConstants;
 import com.starrocks.common.jmockit.Deencapsulation;
 import com.starrocks.leader.TabletCollector;
 import com.starrocks.qe.ConnectContext;
 import com.starrocks.server.GlobalStateMgr;
+import com.starrocks.server.LocalMetastore;
 import com.starrocks.system.Backend;
 import com.starrocks.system.SystemInfoService;
 import com.starrocks.utframe.StarRocksAssert;
@@ -146,32 +148,6 @@ public class ColocateTableBalancerTest {
         return colocateTableIndex;
     }
-    private void addTabletsToScheduler(String dbName, String tableName, boolean setGroupId) {
-        Database database = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb(dbName);
-        OlapTable table =
-                (OlapTable) GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(database.getFullName(), tableName);
-        // add its tablet to TabletScheduler
-        TabletScheduler tabletScheduler = GlobalStateMgr.getCurrentState().getTabletScheduler();
-        for (Partition partition : table.getPartitions()) {
-            MaterializedIndex materializedIndex = partition.getDefaultPhysicalPartition().getBaseIndex();
-            for (Tablet tablet : materializedIndex.getTablets()) {
-                TabletSchedCtx ctx = new TabletSchedCtx(TabletSchedCtx.Type.REPAIR,
-                        database.getId(),
-                        table.getId(),
-                        partition.getId(),
-                        materializedIndex.getId(),
-                        tablet.getId(),
-                        System.currentTimeMillis());
-                ctx.setOrigPriority(TabletSchedCtx.Priority.LOW);
-                if (setGroupId) {
-                    ctx.setColocateGroupId(
-                            GlobalStateMgr.getCurrentState().getColocateTableIndex().getGroup(table.getId()));
-                }
-                tabletScheduler.addTablet(ctx, false);
-            }
-        }
-    }
@Test
public void test1MatchGroup() throws Exception {
starRocksAssert.withDatabase("db1").useDatabase("db1")
@@ -179,24 +155,34 @@ public class ColocateTableBalancerTest {
                 "distributed by hash(`id`) buckets 3 " +
                 "properties('replication_num' = '1', 'colocate_with' = 'group1');");
-        Database database = GlobalStateMgr.getCurrentState().getLocalMetastore().getDb("db1");
-        OlapTable table =
-                (OlapTable) GlobalStateMgr.getCurrentState().getLocalMetastore().getTable(database.getFullName(), "tbl");
-        addTabletsToScheduler("db1", "tbl", false);
+        GlobalStateMgr globalStateMgr = GlobalStateMgr.getCurrentState();
+        LocalMetastore metastore = globalStateMgr.getLocalMetastore();
+        Database database = metastore.getDb("db1");
+        OlapTable table = (OlapTable) metastore.getTable(database.getFullName(), "tbl");
-        ColocateTableIndex colocateIndex = GlobalStateMgr.getCurrentState().getColocateTableIndex();
+        ColocateTableIndex colocateIndex = globalStateMgr.getColocateTableIndex();
         List<List<Long>> bl = Lists.newArrayList();
-        bl.add(new ArrayList<>(Arrays.asList(1L, 2L, 3L)));
-        bl.add(new ArrayList<>(Arrays.asList(1L, 2L, 3L)));
-        bl.add(new ArrayList<>(Arrays.asList(1L, 2L, 3L)));
+        // current backend of the tablets is 10001
+        bl.add(Lists.newArrayList(100000000L));
+        bl.add(Lists.newArrayList(100000000L));
+        bl.add(Lists.newArrayList(100000000L));
         colocateIndex.addBackendsPerBucketSeq(colocateIndex.getGroup(table.getId()), Lists.newArrayList(bl));
-        // test if group is unstable when all its tablets are in TabletScheduler
         long tableId = table.getId();
         ColocateTableBalancer colocateTableBalancer = ColocateTableBalancer.getInstance();
         colocateTableBalancer.runAfterCatalogReady();
-        GroupId groupId = GlobalStateMgr.getCurrentState().getColocateTableIndex().getGroup(tableId);
-        Assertions.assertTrue(GlobalStateMgr.getCurrentState().getColocateTableIndex().isGroupUnstable(groupId));
+        GroupId groupId = globalStateMgr.getColocateTableIndex().getGroup(tableId);
+        Assertions.assertTrue(globalStateMgr.getColocateTableIndex().isGroupUnstable(groupId));
+        // check balance stat
+        Partition partition = table.getPartition("tbl");
+        PhysicalPartition physicalPartition = partition.getDefaultPhysicalPartition();
+        Assertions.assertFalse(physicalPartition.isTabletBalanced());
+        MaterializedIndex index = physicalPartition.getBaseIndex();
+        BalanceStat balanceStat = index.getBalanceStat();
+        Assertions.assertFalse(balanceStat.isBalanced());
+        Assertions.assertEquals(BalanceType.COLOCATION_GROUP, balanceStat.getBalanceType());
         // clean
         colocateIndex.removeTable(table.getId(), table, false);
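The rewritten test forces an imbalance by construction: every bucket's expected backend is the nonexistent id 100000000L while the tablets' replicas actually sit on backend 10001, so each tablet is a colocate mismatch, the group is marked unstable, and the base index receives a COLOCATION_GROUP balance stat. A one-file sketch of why this setup can never be balanced (ids taken from the test; the set-equality check stands in for the real health check):

```java
import java.util.Set;

// Why the test's bucket sequence of 100000000L forces an unbalanced stat: the
// tablets actually live on backend 10001, so every bucket's current backend set
// differs from its expected set.
class MismatchForcingSketch {
    public static void main(String[] args) {
        Set<Long> currentBes = Set.of(10001L);      // where the replicas really are
        Set<Long> expectedBes = Set.of(100000000L); // nonexistent backend from the test
        boolean colocateMismatch = !currentBes.equals(expectedBes);
        System.out.println("mismatch=" + colocateMismatch); // true -> group unstable, stat unbalanced
    }
}
```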

File: BalanceStatProcNodeTest.java

@@ -17,11 +17,13 @@ package com.starrocks.common.proc;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.starrocks.catalog.CatalogRecycleBin;
 import com.starrocks.catalog.Column;
 import com.starrocks.catalog.DataProperty;
 import com.starrocks.catalog.Database;
 import com.starrocks.catalog.DiskInfo;
+import com.starrocks.catalog.HashDistributionInfo;
 import com.starrocks.catalog.ListPartitionInfo;
 import com.starrocks.catalog.LocalTablet;
 import com.starrocks.catalog.MaterializedIndex;
@@ -60,87 +62,91 @@ public class BalanceStatProcNodeTest {
     @Test
     public void testFetchResult(@Mocked GlobalStateMgr globalStateMgr) throws AnalysisException {
-        // 0. backend disk balance
-        // system info service
-        // be1
-        Backend be1 = new Backend(10001L, "192.168.0.1", 9051);
-        Map<String, DiskInfo> disks = Maps.newHashMap();
-        DiskInfo diskInfo1 = new DiskInfo("/path1");
-        diskInfo1.setTotalCapacityB(1000000L);
-        diskInfo1.setAvailableCapacityB(100000L);
-        diskInfo1.setDataUsedCapacityB(880000L);
-        diskInfo1.setStorageMedium(TStorageMedium.HDD);
-        disks.put(diskInfo1.getRootPath(), diskInfo1);
-        DiskInfo diskInfo2 = new DiskInfo("/path2");
-        diskInfo2.setTotalCapacityB(1000000L);
-        diskInfo2.setAvailableCapacityB(900000L);
-        diskInfo2.setDataUsedCapacityB(80000L);
-        diskInfo2.setStorageMedium(TStorageMedium.HDD);
-        disks.put(diskInfo2.getRootPath(), diskInfo2);
-        be1.setDisks(ImmutableMap.copyOf(disks));
-        be1.setAlive(true);
-        // be2
-        Backend be2 = new Backend(10002L, "192.168.0.2", 9051);
-        disks = Maps.newHashMap();
-        diskInfo1 = new DiskInfo("/path1");
-        diskInfo1.setTotalCapacityB(1000000L);
-        diskInfo1.setAvailableCapacityB(500000L);
-        diskInfo1.setDataUsedCapacityB(480000L);
-        diskInfo1.setStorageMedium(TStorageMedium.HDD);
-        disks.put(diskInfo1.getRootPath(), diskInfo1);
-        diskInfo2 = new DiskInfo("/path2");
-        diskInfo2.setTotalCapacityB(1000000L);
-        diskInfo2.setAvailableCapacityB(500000L);
-        diskInfo2.setDataUsedCapacityB(480000L);
-        diskInfo2.setStorageMedium(TStorageMedium.HDD);
-        disks.put(diskInfo2.getRootPath(), diskInfo2);
-        be2.setDisks(ImmutableMap.copyOf(disks));
-        be2.setAlive(true);
-        SystemInfoService systemInfoService = new SystemInfoService();
-        systemInfoService.addBackend(be1);
-        systemInfoService.addBackend(be2);
-        // tablet inverted index
-        TabletInvertedIndex invertedIndex = new TabletInvertedIndex();
-        invertedIndex.addTablet(50000L, new TabletMeta(1L, 2L, 3L, 4L, TStorageMedium.HDD));
-        invertedIndex.addReplica(50000L, new Replica(50001L, be1.getId(), 0, Replica.ReplicaState.NORMAL));
-        invertedIndex.addTablet(60000L, new TabletMeta(1L, 2L, 3L, 4L, TStorageMedium.HDD));
-        invertedIndex.addReplica(60000L, new Replica(60002L, be2.getId(), 0, Replica.ReplicaState.NORMAL));
-        // cluster load statistic
-        ClusterLoadStatistic clusterLoadStat = new ClusterLoadStatistic(systemInfoService, invertedIndex);
-        clusterLoadStat.init();
-        clusterLoadStat.updateBackendDiskBalanceStat(Pair.create(TStorageMedium.HDD, be1.getId()),
-                BalanceStat.createBackendDiskBalanceStat(be1.getId(), "/path1", "/path2", 0.9, 0.1));
         // tablet scheduler
         TabletScheduler tabletScheduler = new TabletScheduler(new TabletSchedulerStat());
-        tabletScheduler.setClusterLoadStatistic(clusterLoadStat);
-        // 2 pending tablet
-        TabletSchedCtx ctx1 = new TabletSchedCtx(TabletSchedCtx.Type.BALANCE, 1L, 2L, 3L, 4L, 1001L, System.currentTimeMillis());
-        ctx1.setOrigPriority(TabletSchedCtx.Priority.NORMAL);
-        ctx1.setBalanceType(BalanceType.BACKEND_DISK);
-        ctx1.setStorageMedium(TStorageMedium.HDD);
-        Deencapsulation.invoke(tabletScheduler, "addToPendingTablets", ctx1);
-        TabletSchedCtx ctx2 = new TabletSchedCtx(TabletSchedCtx.Type.BALANCE, 1L, 2L, 3L, 4L, 1002L, System.currentTimeMillis());
-        ctx2.setOrigPriority(TabletSchedCtx.Priority.NORMAL);
-        ctx2.setBalanceType(BalanceType.BACKEND_DISK);
-        ctx2.setStorageMedium(TStorageMedium.HDD);
-        Deencapsulation.invoke(tabletScheduler, "addToPendingTablets", ctx2);
-        // 1. cluster tablet balance
         // local meta store
         LocalMetastore localMetastore = new LocalMetastore(GlobalStateMgr.getCurrentState(), new CatalogRecycleBin(), null);
         Database db = new Database(10000L, "BalanceStatProcTestDB");
         localMetastore.unprotectCreateDb(db);
+        Backend be1 = new Backend(10001L, "192.168.0.1", 9051);
+        Backend be2 = new Backend(10002L, "192.168.0.2", 9051);
+        // 0. backend disk balance
+        {
+            // system info service
+            // be1
+            Map<String, DiskInfo> disks = Maps.newHashMap();
+            DiskInfo diskInfo1 = new DiskInfo("/path1");
+            diskInfo1.setTotalCapacityB(1000000L);
+            diskInfo1.setAvailableCapacityB(100000L);
+            diskInfo1.setDataUsedCapacityB(880000L);
+            diskInfo1.setStorageMedium(TStorageMedium.HDD);
+            disks.put(diskInfo1.getRootPath(), diskInfo1);
+            DiskInfo diskInfo2 = new DiskInfo("/path2");
+            diskInfo2.setTotalCapacityB(1000000L);
+            diskInfo2.setAvailableCapacityB(900000L);
+            diskInfo2.setDataUsedCapacityB(80000L);
+            diskInfo2.setStorageMedium(TStorageMedium.HDD);
+            disks.put(diskInfo2.getRootPath(), diskInfo2);
+            be1.setDisks(ImmutableMap.copyOf(disks));
+            be1.setAlive(true);
+            // be2
+            disks = Maps.newHashMap();
+            diskInfo1 = new DiskInfo("/path1");
+            diskInfo1.setTotalCapacityB(1000000L);
+            diskInfo1.setAvailableCapacityB(500000L);
+            diskInfo1.setDataUsedCapacityB(480000L);
+            diskInfo1.setStorageMedium(TStorageMedium.HDD);
+            disks.put(diskInfo1.getRootPath(), diskInfo1);
+            diskInfo2 = new DiskInfo("/path2");
+            diskInfo2.setTotalCapacityB(1000000L);
+            diskInfo2.setAvailableCapacityB(500000L);
+            diskInfo2.setDataUsedCapacityB(480000L);
+            diskInfo2.setStorageMedium(TStorageMedium.HDD);
+            disks.put(diskInfo2.getRootPath(), diskInfo2);
+            be2.setDisks(ImmutableMap.copyOf(disks));
+            be2.setAlive(true);
+            SystemInfoService systemInfoService = new SystemInfoService();
+            systemInfoService.addBackend(be1);
+            systemInfoService.addBackend(be2);
+            // tablet inverted index
+            TabletInvertedIndex invertedIndex = new TabletInvertedIndex();
+            invertedIndex.addTablet(50000L, new TabletMeta(1L, 2L, 3L, 4L, TStorageMedium.HDD));
+            invertedIndex.addReplica(50000L, new Replica(50001L, be1.getId(), 0, Replica.ReplicaState.NORMAL));
+            invertedIndex.addTablet(60000L, new TabletMeta(1L, 2L, 3L, 4L, TStorageMedium.HDD));
+            invertedIndex.addReplica(60000L, new Replica(60002L, be2.getId(), 0, Replica.ReplicaState.NORMAL));
+            // cluster load statistic
+            ClusterLoadStatistic clusterLoadStat = new ClusterLoadStatistic(systemInfoService, invertedIndex);
+            clusterLoadStat.init();
+            clusterLoadStat.updateBackendDiskBalanceStat(Pair.create(TStorageMedium.HDD, be1.getId()),
+                    BalanceStat.createBackendDiskBalanceStat(be1.getId(), "/path1", "/path2", 0.9, 0.1));
+            tabletScheduler.setClusterLoadStatistic(clusterLoadStat);
+            // 2 pending tablet
+            TabletSchedCtx ctx1 =
+                    new TabletSchedCtx(TabletSchedCtx.Type.BALANCE, 1L, 2L, 3L, 4L, 1001L, System.currentTimeMillis());
+            ctx1.setOrigPriority(TabletSchedCtx.Priority.NORMAL);
+            ctx1.setBalanceType(BalanceType.BACKEND_DISK);
+            ctx1.setStorageMedium(TStorageMedium.HDD);
+            Deencapsulation.invoke(tabletScheduler, "addToPendingTablets", ctx1);
+            TabletSchedCtx ctx2 =
+                    new TabletSchedCtx(TabletSchedCtx.Type.BALANCE, 1L, 2L, 3L, 4L, 1002L, System.currentTimeMillis());
+            ctx2.setOrigPriority(TabletSchedCtx.Priority.NORMAL);
+            ctx2.setBalanceType(BalanceType.BACKEND_DISK);
+            ctx2.setStorageMedium(TStorageMedium.HDD);
+            Deencapsulation.invoke(tabletScheduler, "addToPendingTablets", ctx2);
+        }
+        // 1. cluster tablet balance
+        {
             List<Column> cols = Lists.newArrayList(new Column("province", Type.VARCHAR));
             PartitionInfo listPartition = new ListPartitionInfo(PartitionType.LIST, cols);
@@ -149,25 +155,75 @@ public class BalanceStatProcNodeTest {
             listPartition.setIsInMemory(partitionId, false);
             listPartition.setReplicationNum(partitionId, (short) 1);
             OlapTable olapTable = new OlapTable(1024L, "olap_table", cols, null, listPartition, null);
             MaterializedIndex index = new MaterializedIndex(1000L, MaterializedIndex.IndexState.NORMAL);
-            index.setBalanceStat(BalanceStat.createClusterTabletBalanceStat(10001L, 10002L, 9L, 1L));
+            TabletMeta tabletMeta = new TabletMeta(db.getId(), olapTable.getId(), partitionId, index.getId(), TStorageMedium.HDD);
+            long tablet1Id = 1010L;
+            index.addTablet(new LocalTablet(tablet1Id), tabletMeta);
+            long tablet2Id = 1011L;
+            index.addTablet(new LocalTablet(tablet2Id), tabletMeta);
             Map<String, Long> indexNameToId = olapTable.getIndexNameToId();
             indexNameToId.put("index1", index.getId());
-            TabletMeta tabletMeta = new TabletMeta(db.getId(), olapTable.getId(), partitionId, index.getId(), TStorageMedium.HDD);
-            index.addTablet(new LocalTablet(1010L), tabletMeta);
-            index.addTablet(new LocalTablet(1011L), tabletMeta);
+            // balance stat
+            index.setBalanceStat(BalanceStat.createClusterTabletBalanceStat(be1.getId(), be2.getId(), 9L, 1L));
             Partition partition = new Partition(partitionId, partitionId, "p1", index, new RandomDistributionInfo(2));
             olapTable.addPartition(partition);
             db.registerTableUnlocked(olapTable);
             // 1 running tablet
-            TabletSchedCtx ctx3 = new TabletSchedCtx(TabletSchedCtx.Type.BALANCE, db.getId(), olapTable.getId(), partitionId,
-                    index.getId(), 1010L, System.currentTimeMillis());
-            ctx3.setOrigPriority(TabletSchedCtx.Priority.NORMAL);
-            ctx3.setBalanceType(BalanceType.CLUSTER_TABLET);
-            ctx3.setStorageMedium(TStorageMedium.HDD);
-            Deencapsulation.invoke(tabletScheduler, "addToRunningTablets", ctx3);
+            TabletSchedCtx ctx = new TabletSchedCtx(TabletSchedCtx.Type.BALANCE, db.getId(), olapTable.getId(), partitionId,
+                    index.getId(), tablet1Id, System.currentTimeMillis());
+            ctx.setOrigPriority(TabletSchedCtx.Priority.NORMAL);
+            ctx.setBalanceType(BalanceType.CLUSTER_TABLET);
+            ctx.setStorageMedium(TStorageMedium.HDD);
+            Deencapsulation.invoke(tabletScheduler, "addToRunningTablets", ctx);
+        }
+        // 2. colocate mismatch balance
+        {
+            List<Column> cols = Lists.newArrayList(new Column("province", Type.VARCHAR));
+            PartitionInfo listPartition = new ListPartitionInfo(PartitionType.LIST, cols);
+            long partitionId = 1125L;
+            listPartition.setDataProperty(partitionId, DataProperty.DEFAULT_DATA_PROPERTY);
+            listPartition.setIsInMemory(partitionId, false);
+            listPartition.setReplicationNum(partitionId, (short) 1);
+            OlapTable olapTable = new OlapTable(1124L, "colocate_table", cols, null, listPartition, null);
+            MaterializedIndex index = new MaterializedIndex(1100L, MaterializedIndex.IndexState.NORMAL);
+            TabletMeta tabletMeta = new TabletMeta(db.getId(), olapTable.getId(), partitionId, index.getId(), TStorageMedium.HDD);
+            long tablet1Id = 1110L;
+            index.addTablet(new LocalTablet(tablet1Id), tabletMeta);
+            long tablet2Id = 1111L;
+            index.addTablet(new LocalTablet(tablet2Id), tabletMeta);
+            Map<String, Long> indexNameToId = olapTable.getIndexNameToId();
+            indexNameToId.put("index1", index.getId());
+            // balance stat
+            index.setBalanceStat(BalanceStat.createColocationGroupBalanceStat(
+                    tablet1Id, Sets.newHashSet(be1.getId()), Sets.newHashSet(be2.getId())));
+            Partition partition = new Partition(partitionId, partitionId, "p1", index, new HashDistributionInfo(2, cols));
+            olapTable.addPartition(partition);
+            db.registerTableUnlocked(olapTable);
+            // 1 pending tablet, 1 running tablet
+            TabletSchedCtx ctx1 = new TabletSchedCtx(TabletSchedCtx.Type.REPAIR, db.getId(), olapTable.getId(), partitionId,
+                    index.getId(), tablet1Id, System.currentTimeMillis());
+            ctx1.setOrigPriority(TabletSchedCtx.Priority.NORMAL);
+            ctx1.setTabletStatus(LocalTablet.TabletHealthStatus.COLOCATE_MISMATCH);
+            ctx1.setStorageMedium(TStorageMedium.HDD);
+            Deencapsulation.invoke(tabletScheduler, "addToPendingTablets", ctx1);
+            TabletSchedCtx ctx2 = new TabletSchedCtx(TabletSchedCtx.Type.REPAIR, db.getId(), olapTable.getId(), partitionId,
+                    index.getId(), tablet2Id, System.currentTimeMillis());
+            ctx2.setOrigPriority(TabletSchedCtx.Priority.NORMAL);
+            ctx2.setTabletStatus(LocalTablet.TabletHealthStatus.COLOCATE_MISMATCH);
+            ctx2.setStorageMedium(TStorageMedium.HDD);
+            Deencapsulation.invoke(tabletScheduler, "addToRunningTablets", ctx2);
+        }
new Expectations() {
@@ -181,7 +237,7 @@ public class BalanceStatProcNodeTest {
         BalanceStatProcNode proc = new BalanceStatProcNode(tabletScheduler);
         BaseProcResult result = (BaseProcResult) proc.fetchResult();
         List<List<String>> rows = result.getRows();
-        Assertions.assertEquals(4, rows.size());
+        Assertions.assertEquals(5, rows.size());
         // cluster disk balanced
         Assertions.assertEquals("[HDD, inter-node disk usage, true, 0, 0]", rows.get(0).toString());
@@ -191,5 +247,7 @@ public class BalanceStatProcNodeTest {
         Assertions.assertEquals("[HDD, intra-node disk usage, false, 2, 0]", rows.get(2).toString());
         // backend tablet balanced
         Assertions.assertEquals("[HDD, intra-node tablet distribution, true, 0, 0]", rows.get(3).toString());
+        // colocation group not balanced
+        Assertions.assertEquals("[HDD, colocation group, false, 1, 1]", rows.get(4).toString());
     }
}
}
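Taken together, the proc node now reports one row per storage medium and balance type, five per medium, and the new colocation row's pending/running counts are fed from the repair queue. The sketch below rebuilds rows of the same shape from the assertions above; the "inter-node tablet distribution" values are placeholders, since that row's assertion is not visible in this hunk.

```java
import java.util.List;

// Rebuilds the five HDD rows asserted in BalanceStatProcNodeTest
// (row shape: medium, type label, balanced, pending, running).
class ExpectedRowsSketch {
    public static void main(String[] args) {
        List<List<String>> rows = List.of(
                List.of("HDD", "inter-node disk usage", "true", "0", "0"),
                List.of("HDD", "inter-node tablet distribution", "?", "?", "?"), // not asserted in the hunk
                List.of("HDD", "intra-node disk usage", "false", "2", "0"),      // 2 pending backend-disk tablets
                List.of("HDD", "intra-node tablet distribution", "true", "0", "0"),
                List.of("HDD", "colocation group", "false", "1", "1"));          // 1 pending + 1 running repair
        rows.forEach(System.out::println);
    }
}
```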