[Enhancement] Health Action need to be handled synchronously (backport #62490) (#62762)

Signed-off-by: crossoverJie <crossoverJie@gmail.com>
Co-authored-by: crossoverJie <crossoverJie@gmail.com>
This commit is contained in:
mergify[bot] 2025-09-09 09:20:53 +08:00 committed by GitHub
parent 1710f4d901
commit 08e4100bd6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 192 additions and 90 deletions

View File

@ -68,4 +68,10 @@ public class HealthAction extends RestBaseAction {
sendResult(request, response, result);
}
}
@Override
public boolean supportAsyncHandler() {
// Health Action need to be handled synchronously
return false;
}
}

View File

@ -0,0 +1,69 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.http;
import com.starrocks.http.rest.HealthAction;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.ReferenceCountUtil;
import org.junit.jupiter.api.Test;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
public class HealthActionTest extends HttpServerTestUtils {
@Test
public void testAsyncHandleHealth() throws Exception {
String uri = "/api/health";
ActionController controller = new ActionController();
MockHealthAction action = new MockHealthAction(controller);
controller.registerHandler(HttpMethod.GET, uri, action);
HttpServerTestUtils.MockExecutor executor = new HttpServerTestUtils.MockExecutor();
executor.setRejectExecute(false);
MockChannelHandlerContext context = createChannelHandlerContext();
DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri);
HttpServerHandler handler = new HttpServerHandler(controller, executor);
assertEquals(1, ReferenceCountUtil.refCnt(request));
assertEquals(0, action.executeCount());
handler.channelRead(context, request);
assertEquals(1, action.executeCount());
assertEquals(0, ReferenceCountUtil.refCnt(request));
assertEquals(0, executor.pendingTaskCount());
assertEquals(0, context.numResponses());
assertFalse(context.isFlushed());
}
private static class MockHealthAction extends HealthAction {
private final AtomicInteger executeCount = new AtomicInteger(0);
public MockHealthAction(ActionController controller) {
super(controller);
}
@Override
public void handleRequest(BaseRequest request) {
executeCount.incrementAndGet();
}
int executeCount() {
return executeCount.get();
}
}
}

View File

@ -14,37 +14,23 @@
package com.starrocks.http;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.util.Attribute;
import io.netty.util.ReferenceCountUtil;
import org.junit.jupiter.api.Test;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import static com.starrocks.http.HttpServerHandler.HTTP_CONNECT_CONTEXT_ATTRIBUTE_KEY;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;
public class HttpServerHandlerTest {
public class HttpServerHandlerTest extends HttpServerTestUtils {
@Test
public void testSyncHandle() throws Exception {
@ -176,12 +162,7 @@ public class HttpServerHandlerTest {
assertTrue(httpResponse.content().toString(StandardCharsets.UTF_8).contains(expectContent));
}
private MockChannelHandlerContext createChannelHandlerContext() {
return mock(MockChannelHandlerContext.class,
withSettings()
.useConstructor()
.defaultAnswer(CALLS_REAL_METHODS));
}
private static class MockAction extends BaseAction {
@ -221,76 +202,7 @@ public class HttpServerHandlerTest {
}
}
private static class MockExecutor implements Executor {
private boolean rejectExecute = false;
private final LinkedList<Runnable> pendingTasks = new LinkedList<>();
public void setRejectExecute(boolean rejectExecute) {
this.rejectExecute = rejectExecute;
}
int pendingTaskCount() {
return pendingTasks.size();
}
@Override
public void execute(Runnable runnable) {
if (rejectExecute) {
throw new RejectedExecutionException("mock reject");
}
pendingTasks.add(runnable);
}
public void runOneTask() {
if (!pendingTasks.isEmpty()) {
pendingTasks.pollFirst().run();
}
}
}
private abstract static class MockChannelHandlerContext implements ChannelHandlerContext {
private final Channel channel;
private final ChannelFuture channelFuture;
private final ConcurrentLinkedQueue<Object> responses = new ConcurrentLinkedQueue<>();
private volatile boolean flushed = false;
public MockChannelHandlerContext() {
Attribute attribute = mock(Attribute.class);
when(attribute.get()).thenReturn(null);
this.channel = mock(Channel.class);
when(channel.attr(same(HTTP_CONNECT_CONTEXT_ATTRIBUTE_KEY))).thenReturn(attribute);
this.channelFuture = mock(ChannelFuture.class);
}
@Override
public ChannelHandlerContext flush() {
this.flushed = true;
return this;
}
public boolean isFlushed() {
return flushed;
}
@Override
public ChannelFuture writeAndFlush(Object object) {
responses.add(object);
return channelFuture;
}
@Override
public Channel channel() {
return channel;
}
public int numResponses() {
return responses.size();
}
public Object pollResponse() {
return responses.poll();
}
}
}

View File

@ -0,0 +1,115 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.starrocks.http;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.Attribute;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import static com.starrocks.http.HttpServerHandler.HTTP_CONNECT_CONTEXT_ATTRIBUTE_KEY;
import static org.mockito.ArgumentMatchers.same;
import static org.mockito.Mockito.CALLS_REAL_METHODS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.withSettings;
public class HttpServerTestUtils {
protected static class MockExecutor implements Executor {
private boolean rejectExecute = false;
private final LinkedList<Runnable> pendingTasks = new LinkedList<>();
public void setRejectExecute(boolean rejectExecute) {
this.rejectExecute = rejectExecute;
}
int pendingTaskCount() {
return pendingTasks.size();
}
@Override
public void execute(Runnable runnable) {
if (rejectExecute) {
throw new RejectedExecutionException("mock reject");
}
pendingTasks.add(runnable);
}
public void runOneTask() {
if (!pendingTasks.isEmpty()) {
pendingTasks.pollFirst().run();
}
}
}
protected MockChannelHandlerContext createChannelHandlerContext() {
return mock(MockChannelHandlerContext.class,
withSettings()
.useConstructor()
.defaultAnswer(CALLS_REAL_METHODS));
}
protected abstract static class MockChannelHandlerContext implements ChannelHandlerContext {
private final Channel channel;
private final ChannelFuture channelFuture;
private final ConcurrentLinkedQueue<Object> responses = new ConcurrentLinkedQueue<>();
private volatile boolean flushed = false;
public MockChannelHandlerContext() {
Attribute attribute = mock(Attribute.class);
when(attribute.get()).thenReturn(null);
this.channel = mock(Channel.class);
when(channel.attr(same(HTTP_CONNECT_CONTEXT_ATTRIBUTE_KEY))).thenReturn(attribute);
this.channelFuture = mock(ChannelFuture.class);
}
@Override
public ChannelHandlerContext flush() {
this.flushed = true;
return this;
}
public boolean isFlushed() {
return flushed;
}
@Override
public ChannelFuture writeAndFlush(Object object) {
responses.add(object);
return channelFuture;
}
@Override
public Channel channel() {
return channel;
}
public int numResponses() {
return responses.size();
}
public Object pollResponse() {
return responses.poll();
}
}
}