[Enhancement] Speedup some UT (#61502)

Signed-off-by: Binglin Chang <decstery@gmail.com>
This commit is contained in:
Binglin Chang 2025-08-01 12:59:00 +08:00 committed by GitHub
parent 84a7a0132b
commit 0dfbef56e5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 77 additions and 28 deletions

View File

@ -1236,6 +1236,7 @@ public class GlobalStateMgr {
// wait until FE is ready.
public void waitForReady() throws InterruptedException {
long lastLoggingTimeMs = System.currentTimeMillis();
long lastInfoLogTimeMs = System.currentTimeMillis();
while (true) {
if (isReady()) {
LOG.info("globalStateMgr is ready. FE type: {}", feType);
@ -1243,8 +1244,13 @@ public class GlobalStateMgr {
break;
}
Thread.sleep(2000);
LOG.info("wait globalStateMgr to be ready. FE type: {}. is ready: {}", feType, isReady.get());
Thread.sleep(20);
long currentTimeMs = System.currentTimeMillis();
if (currentTimeMs - lastInfoLogTimeMs > 2000L) {
lastInfoLogTimeMs = currentTimeMs;
LOG.info("wait globalStateMgr to be ready. FE type: {}. is ready: {}", feType, isReady.get());
}
if (System.currentTimeMillis() - lastLoggingTimeMs > 60000L) {
lastLoggingTimeMs = System.currentTimeMillis();

View File

@ -51,6 +51,7 @@ import com.starrocks.qe.DDLStmtExecutor;
import com.starrocks.server.GlobalStateMgr;
import com.starrocks.sql.ast.AlterTableStmt;
import com.starrocks.utframe.TestWithFeService;
import com.starrocks.utframe.UtFrameUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.junit.jupiter.api.Assertions;
@ -69,6 +70,11 @@ public class SchemaChangeHandlerTest extends TestWithFeService {
private static final Logger LOG = LogManager.getLogger(SchemaChangeHandlerTest.class);
private int jobSize = 0;
@Override
protected void createStarrocksCluster() {
UtFrameUtils.createMinStarRocksCluster(false, runMode);
}
@Override
protected void runBeforeAll() throws Exception {
// set some parameters to speedup test

View File

@ -73,6 +73,11 @@ public class SchemaChangeHandlerWithMVTest extends TestWithFeService {
}
}
@Override
protected void createStarrocksCluster() {
UtFrameUtils.createMinStarRocksCluster(false, runMode);
}
private void waitAlterJobDone(Map<Long, AlterJobV2> alterJobs) throws Exception {
for (AlterJobV2 alterJobV2 : alterJobs.values()) {
while (!alterJobV2.getJobState().isFinalState()) {

View File

@ -293,36 +293,60 @@ public class MockedBackend {
}
private static class MockBeThriftClient extends BackendService.Client {
// task queue to save all agent tasks coming from Frontend
private final BlockingQueue<TAgentTaskRequest> taskQueue = Queues.newLinkedBlockingQueue();
// Shared static resources across all MockBeThriftClient instances
private static BlockingQueue<TaskWrapper> sharedTaskQueue = Queues.newLinkedBlockingQueue();
private static LeaderImpl sharedMaster = new LeaderImpl();
private static volatile boolean workerThreadStarted = false;
private static Object lock = new Object();
// Wrapper class to include backend reference with task
private static class TaskWrapper {
final TAgentTaskRequest request;
final TBackend backend;
final long reportVersion;
TaskWrapper(TAgentTaskRequest request, TBackend backend, long reportVersion) {
this.request = request;
this.backend = backend;
this.reportVersion = reportVersion;
}
}
private final TBackend tBackend;
private long reportVersion = 0;
private final LeaderImpl master = new LeaderImpl();
public MockBeThriftClient(MockedBackend backend) {
super(null);
tBackend = new TBackend(backend.getHost(), backend.getBeThriftPort(), backend.getHttpPort());
new Thread(() -> {
while (true) {
try {
TAgentTaskRequest request = taskQueue.take();
TFinishTaskRequest finishTaskRequest = new TFinishTaskRequest(tBackend,
request.getTask_type(), request.getSignature(), new TStatus(TStatusCode.OK));
finishTaskRequest.setReport_version(++reportVersion);
if (request.getTask_type() == CREATE) {
TTabletInfo tabletInfo = new TTabletInfo();
tabletInfo.setPath_hash(PATH_HASH);
tabletInfo.setData_size(0);
tabletInfo.setTablet_id(request.getSignature());
finishTaskRequest.setFinish_tablet_infos(Lists.newArrayList(tabletInfo));
// Start the shared worker thread only once
synchronized (lock) {
if (!workerThreadStarted) {
workerThreadStarted = true;
new Thread(() -> {
while (true) {
try {
TaskWrapper taskWrapper = sharedTaskQueue.take();
TFinishTaskRequest finishTaskRequest = new TFinishTaskRequest(taskWrapper.backend,
taskWrapper.request.getTask_type(), taskWrapper.request.getSignature(),
new TStatus(TStatusCode.OK));
finishTaskRequest.setReport_version(taskWrapper.reportVersion);
if (taskWrapper.request.getTask_type() == CREATE) {
TTabletInfo tabletInfo = new TTabletInfo();
tabletInfo.setPath_hash(PATH_HASH);
tabletInfo.setData_size(0);
tabletInfo.setTablet_id(taskWrapper.request.getSignature());
finishTaskRequest.setFinish_tablet_infos(Lists.newArrayList(tabletInfo));
}
sharedMaster.finishTask(finishTaskRequest);
} catch (Exception e) {
e.printStackTrace();
}
}
master.finishTask(finishTaskRequest);
} catch (Exception e) {
e.printStackTrace();
}
}, "shared-mock-be-thrift-worker").start();
}
}, "mock-be-thrift-client-" + backend.getBackendId()).start();
}
}
@Override
@ -347,7 +371,9 @@ public class MockedBackend {
@Override
public TAgentResult submit_tasks(List<TAgentTaskRequest> tasks) {
taskQueue.addAll(tasks);
for (TAgentTaskRequest task : tasks) {
sharedTaskQueue.add(new TaskWrapper(task, tBackend, ++reportVersion));
}
return new TAgentResult(new TStatus(TStatusCode.OK));
}

View File

@ -311,11 +311,13 @@ public class MockedFrontend {
private void waitForCatalogReady(FERunnable fe) throws FeStartException {
int tryCount = 0;
while (!fe.isReady() && tryCount < 600) {
while (!fe.isReady() && tryCount < 6000) {
try {
tryCount++;
Thread.sleep(1000);
System.out.println("globalStateMgr is not ready, wait for 1 second");
Thread.sleep(100);
if (tryCount % 10 == 0) {
System.out.println("globalStateMgr is not ready, wait for 1 second");
}
} catch (InterruptedException e) {
e.printStackTrace();
}

View File

@ -302,6 +302,10 @@ public class UtFrameUtils {
}
public static synchronized void createMinStarRocksCluster(boolean startBDB, RunMode runMode) {
createMinStarRocksCluster(startBDB, runMode, "fe/mocked/test/" + UUID.randomUUID().toString() + "/");
}
public static synchronized void createMinStarRocksCluster(boolean startBDB, RunMode runMode, String runningDir) {
// to avoid call createMinStarRocksCluster multiple times
if (CREATED_MIN_CLUSTER.get()) {
return;
@ -310,7 +314,7 @@ public class UtFrameUtils {
ThriftConnectionPool.beHeartbeatPool = new MockGenericPool.HeatBeatPool("heartbeat");
ThriftConnectionPool.backendPool = new MockGenericPool.BackendThriftPool("backend");
startFEServer("fe/mocked/test/" + UUID.randomUUID().toString() + "/", startBDB, runMode);
startFEServer(runningDir, startBDB, runMode);
addMockBackend(10001);