Skip to content

Commit f52ec4b

Browse files
committed
修复http client连接池释放的bug
1 parent f166965 commit f52ec4b

14 files changed

+135
-75
lines changed

src/main/java/love/wangqi/GatewayServerDemo.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ protected List<Route> locateRouteList(Set<Long> ids) {
4040
routeList.add(new Route(7L, HttpMethod.GET, "/css/main.css", new URL("https://blog.wangqi.love/css/main.css")));
4141
routeList.add(new Route(8L, HttpMethod.GET, "/path", new URL("http://127.0.0.1:9990/path")));
4242
routeList.add(new Route(9L, HttpMethod.GET, "/html", new URL("http://10.100.64.71/html/user.json")));
43+
routeList.add(new Route(10L, HttpMethod.GET, "/css", new URL("https://ss0.bdstatic.com/5aV1bjqh_Q23odCf/static/mantpl/css/news/init_7637f86c.css")));
4344
} catch (MalformedURLException e) {
4445
}
4546
return routeList;

src/main/java/love/wangqi/context/Attributes.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package love.wangqi.context;
22

33
import io.netty.channel.Channel;
4+
import io.netty.channel.pool.SimpleChannelPool;
45
import io.netty.handler.codec.http.FullHttpRequest;
56
import io.netty.handler.codec.http.FullHttpResponse;
67
import io.netty.util.AttributeKey;
@@ -16,4 +17,5 @@ public interface Attributes {
1617
AttributeKey<FullHttpResponse> RESPONSE = AttributeKey.newInstance("response");
1718
AttributeKey<Boolean> KEEPALIVE = AttributeKey.newInstance("keepAlive");
1819
AttributeKey<Channel> SERVER_CHANNEL = AttributeKey.newInstance("serverChannel");
20+
AttributeKey<SimpleChannelPool> CLIENT_POOL = AttributeKey.newInstance("clientPool");
1921
}

src/main/java/love/wangqi/core/DefaultChannelWriteFinishListener.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,17 @@ public class DefaultChannelWriteFinishListener implements ChannelFutureListener
1717

1818
@Override
1919
public void operationComplete(ChannelFuture future) throws Exception {
20-
Channel channel = future.channel();
21-
Boolean keepAlive = ContextUtil.getKeepAlive(channel);
22-
logger.debug("keepAlive {}", keepAlive);
23-
logger.debug("======= serverChannelId: {}", channel.id());
20+
if (future.isSuccess()) {
21+
Channel channel = future.channel();
22+
Boolean keepAlive = ContextUtil.getKeepAlive(channel);
2423

25-
ContextUtil.getRequest(channel).release();
26-
ContextUtil.clear(channel);
24+
// if (ContextUtil.getRequest(channel) != null) {
25+
// ContextUtil.getRequest(channel).release();
26+
// }
2727

28-
if (!keepAlive) {
29-
channel.close();
28+
if (keepAlive == null || !keepAlive) {
29+
channel.close();
30+
}
3031
}
3132
}
3233
}

src/main/java/love/wangqi/core/ResponseHandler.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import io.netty.channel.ChannelFuture;
55
import io.netty.handler.codec.http.FullHttpResponse;
66
import io.netty.handler.codec.http.HttpHeaderNames;
7-
import io.netty.handler.codec.http.HttpUtil;
7+
import io.netty.handler.codec.http.HttpHeaderValues;
88
import love.wangqi.config.GatewayConfig;
99
import love.wangqi.context.ContextUtil;
1010
import org.slf4j.Logger;
@@ -20,12 +20,13 @@ public class ResponseHandler {
2020

2121
private GatewayConfig config = GatewayConfig.getInstance();
2222

23-
public void send(Channel channel, FullHttpResponse response) {
24-
logger.debug("======= serverChannelId: {}", channel.id());
25-
logger.debug("readableBytes {}", response.content().readableBytes());
23+
public synchronized void send(Channel channel, FullHttpResponse response) {
2624
response.headers().set(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
2725
Boolean keepAlive = ContextUtil.getKeepAlive(channel);
28-
HttpUtil.setKeepAlive(response, keepAlive == null ? false : keepAlive);
26+
if (keepAlive) {
27+
response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
28+
}
29+
2930
ChannelFuture future = channel.writeAndFlush(response);
3031
if (config.getChannelWriteFinishListener() != null) {
3132
future.addListener(config.getChannelWriteFinishListener());

src/main/java/love/wangqi/filter/SendErrorFilter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public int filterOrder() {
3030
public void filter(Channel channel) throws Exception {
3131
Exception e = ContextUtil.getException(channel);
3232
if (e != null) {
33-
logger.error(e.toString());
33+
logger.error(e.getMessage(), e);
3434
config.getExceptionHandler().handle(channel, e);
3535
}
3636
}

src/main/java/love/wangqi/filter/SendResponseFilter.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@
88
import org.slf4j.Logger;
99
import org.slf4j.LoggerFactory;
1010

11-
import java.nio.charset.Charset;
12-
1311
/**
1412
* @author: wangqi
1513
* @description:
@@ -35,9 +33,14 @@ public int filterOrder() {
3533
public void filter(Channel channel) throws Exception {
3634
Exception e = ContextUtil.getException(channel);
3735
if (e == null) {
38-
FullHttpResponse response = ContextUtil.getResponse(channel);
39-
// logger.info("*** content {}", response.content().toString(Charset.defaultCharset()));
40-
config.getResponseHandler().send(channel, response);
36+
try {
37+
FullHttpResponse response = ContextUtil.getResponse(channel);
38+
config.getResponseHandler().send(channel, response);
39+
} catch (Exception sendException) {
40+
logger.error(sendException.getMessage(), sendException);
41+
throw sendException;
42+
}
43+
4144
} else {
4245
GatewayRunner.getInstance().errorAction(channel);
4346
}

src/main/java/love/wangqi/filter/command/HttpClientPool.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
public class HttpClientPool {
2929
private final static Logger logger = LoggerFactory.getLogger(HttpClientPool.class);
3030

31-
private final EventLoopGroup group = new NioEventLoopGroup();
31+
private final EventLoopGroup group = new NioEventLoopGroup(8 * 4);
3232
private final Bootstrap bootstrap = new Bootstrap();
3333
private ChannelPoolMap<RequestHolder, SimpleChannelPool> poolMap;
3434

@@ -51,24 +51,25 @@ protected SimpleChannelPool newPool(RequestHolder requestHolder) {
5151
};
5252
}
5353

54-
public void request(RequestHolder requestHolder, Channel serverChannel) {
55-
logger.debug("requestHolder.hashCode: {}", requestHolder.hashCode());
54+
public synchronized void request(RequestHolder requestHolder, Channel serverChannel) throws InterruptedException {
5655
final SimpleChannelPool pool = poolMap.get(requestHolder);
57-
Future<Channel> f = pool.acquire();
56+
Future<Channel> f = pool.acquire().sync();
5857
f.addListener((FutureListener<Channel>) future -> {
5958
if (future.isSuccess()) {
6059
HttpRequest request = requestHolder.request;
6160
HttpPostRequestEncoder bodyRequestEncoder = requestHolder.bodyRequestEncoder;
6261

6362
Channel clientChannel = future.getNow();
6463
clientChannel.attr(Attributes.SERVER_CHANNEL).set(serverChannel);
64+
clientChannel.attr(Attributes.CLIENT_POOL).set(pool);
65+
6566
clientChannel.write(request);
6667
if (bodyRequestEncoder != null && bodyRequestEncoder.isChunked()) {
6768
clientChannel.write(bodyRequestEncoder);
6869
}
6970
clientChannel.flush();
7071

71-
pool.release(clientChannel);
72+
// pool.release(clientChannel);
7273
}
7374
});
7475
}

src/main/java/love/wangqi/filter/command/HttpHandler.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,19 @@ public HttpHandler() {
2525
}
2626

2727
@Override
28-
protected void channelRead0(ChannelHandlerContext ctx, FullHttpResponse response) throws Exception {
29-
logger.debug("handler hashCode: {}", this.hashCode());
30-
logger.debug("clientChannelId: {}", ctx.channel().id());
31-
Channel serverChannel = ctx.channel().attr(Attributes.SERVER_CHANNEL).get();
32-
logger.debug("serverChannelId: {}", serverChannel.id());
28+
public void channelReadComplete(ChannelHandlerContext ctx) {
29+
ctx.flush();
30+
}
31+
32+
@Override
33+
public void channelRead0(ChannelHandlerContext ctx, FullHttpResponse response) throws Exception {
34+
Channel clientChannel = ctx.channel();
35+
Channel serverChannel = clientChannel.attr(Attributes.SERVER_CHANNEL).get();
3336

3437
ContextUtil.setResponse(serverChannel, response);
35-
// ctx.channel().close();
3638
GatewayRunner.getInstance().postRoutAction(serverChannel);
39+
40+
clientChannel.attr(Attributes.CLIENT_POOL).get().release(clientChannel);
3741
}
3842

3943
@Override
@@ -45,7 +49,6 @@ public void channelActive(ChannelHandlerContext ctx) throws Exception {
4549
@Override
4650
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
4751
Channel serverChannel = ctx.channel().attr(Attributes.SERVER_CHANNEL).get();
48-
logger.debug("serverChannelId: {}", serverChannel.id());
4952
if (cause instanceof ReadTimeoutException) {
5053
logger.error("read time out");
5154
Exception exception = new GatewayTimeoutException();

src/main/java/love/wangqi/filter/command/HttpPoolHandler.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,9 @@
2727
public class HttpPoolHandler implements ChannelPoolHandler {
2828
private static final Logger logger = LoggerFactory.getLogger(HttpPoolHandler.class);
2929

30-
private RequestHolder requestHolder;
3130
private SslContext sslCtx = null;
3231

3332
public HttpPoolHandler(RequestHolder requestHolder) {
34-
this.requestHolder = requestHolder;
3533
if (requestHolder.getProtocol().equalsIgnoreCase(HTTPS)) {
3634
try {
3735
sslCtx = SslContextBuilder.forClient()
Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package love.wangqi.handler;
2+
3+
import io.netty.channel.ChannelHandlerContext;
4+
import io.netty.channel.ChannelInboundHandlerAdapter;
5+
import io.netty.handler.codec.http.HttpResponseStatus;
6+
import love.wangqi.context.ContextUtil;
7+
import love.wangqi.exception.GatewayException;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
11+
/**
12+
* @author: wangqi
13+
* @description:
14+
* @date: Created in 2018-11-30 09:14
15+
*/
16+
public class ExceptionHandler extends ChannelInboundHandlerAdapter {
17+
private static final Logger logger = LoggerFactory.getLogger(ExceptionHandler.class);
18+
19+
@Override
20+
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
21+
logger.error(cause.getMessage(), cause);
22+
GatewayRunner runner = GatewayRunner.getInstance();
23+
Exception exception = new GatewayException(HttpResponseStatus.INTERNAL_SERVER_ERROR, "UNHANDLED_EXCEPTION_" + cause.getClass().getName());
24+
ContextUtil.setException(ctx.channel(), exception);
25+
runner.errorAction(ctx.channel());
26+
}
27+
}

0 commit comments

Comments
 (0)