Skip to content

Commit 6928c01

Browse files
committed
fix ForwardCommand
1 parent 7d1b5cc commit 6928c01

File tree

9 files changed

+69
-77
lines changed

9 files changed

+69
-77
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
<dependency>
2929
<groupId>io.netty</groupId>
3030
<artifactId>netty-all</artifactId>
31-
<version>4.1.25.Final</version>
31+
<version>4.1.28.Final</version>
3232
</dependency>
3333
<dependency>
3434
<groupId>com.netflix.hystrix</groupId>

src/main/java/love/wangqi/codec/DefaultHttpRequestBuilder.java

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -28,28 +28,13 @@ public class DefaultHttpRequestBuilder implements HttpRequestBuilder {
2828
private static final Logger logger = LoggerFactory.getLogger(DefaultHttpRequestBuilder.class);
2929

3030
private RouteMapper routeMapper;
31-
private FullHttpRequest originRequest;
3231
private HttpRequest newRequest;
3332
private HttpPostRequestEncoder newBodyRequestEncoder;
3433

3534
protected HttpRequestDecomposer httpRequestDecomposer;
3635

3736
HttpDataFactory factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE);
3837

39-
public class RequestHolder {
40-
public Route route;
41-
public URL url;
42-
public HttpRequest request;
43-
public HttpPostRequestEncoder bodyRequestEncoder;
44-
45-
public RequestHolder(Route route, URL url, HttpRequest request, HttpPostRequestEncoder bodyRequestEncoder) {
46-
this.route = route;
47-
this.url = url;
48-
this.request = request;
49-
this.bodyRequestEncoder = bodyRequestEncoder;
50-
}
51-
}
52-
5338
public DefaultHttpRequestBuilder() {
5439
}
5540

@@ -60,25 +45,19 @@ public DefaultHttpRequestBuilder setRouteMapper(RouteMapper routeMapper) {
6045
}
6146

6247
@Override
63-
public DefaultHttpRequestBuilder setOriginRequest(FullHttpRequest originRequest) {
64-
this.originRequest = originRequest;
65-
this.httpRequestDecomposer = new HttpRequestDecomposer(originRequest);
66-
return this;
67-
}
68-
69-
@Override
70-
public Route getRoute() {
71-
return routeMapper.getRoute(this.originRequest);
48+
public Route getRoute(FullHttpRequest originRequest) {
49+
return routeMapper.getRoute(originRequest);
7250
}
7351

7452
@Override
75-
public RequestHolder build() throws Exception {
76-
Route route = getRoute();
53+
public RequestHolder build(FullHttpRequest originRequest) throws Exception {
54+
httpRequestDecomposer = new HttpRequestDecomposer(originRequest);
55+
Route route = getRoute(originRequest);
7756
if (route == null) {
7857
throw new NoRouteException();
7958
}
8059
URL url = route.getMapUrl();
81-
logger.info(url.toString());
60+
// logger.info(url.toString());
8261

8362
// 请求路径
8463
QueryStringEncoder queryStringEncoder = new QueryStringEncoder(url.getPath());

src/main/java/love/wangqi/codec/HttpRequestBuilder.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,16 @@ public interface HttpRequestBuilder {
1818
*/
1919
DefaultHttpRequestBuilder setRouteMapper(RouteMapper routeMapper);
2020

21-
/**
22-
* 设置原始请求
23-
* @param originRequest
24-
* @return
25-
*/
26-
DefaultHttpRequestBuilder setOriginRequest(FullHttpRequest originRequest);
27-
2821
/**
2922
* 生成新的请求
3023
* @return
3124
* @throws Exception
3225
*/
33-
DefaultHttpRequestBuilder.RequestHolder build() throws Exception;
26+
RequestHolder build(FullHttpRequest originRequest) throws Exception;
3427

3528
/**
3629
* 获取路由
3730
* @return
3831
*/
39-
Route getRoute();
32+
Route getRoute(FullHttpRequest originRequest);
4033
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package love.wangqi.codec;
2+
3+
import io.netty.handler.codec.http.HttpRequest;
4+
import io.netty.handler.codec.http.multipart.HttpPostRequestEncoder;
5+
import love.wangqi.route.Route;
6+
7+
import java.net.URL;
8+
9+
/**
10+
* @author: wangqi
11+
* @description:
12+
* @date: Created in 2018/7/28 22:37
13+
*/
14+
public class RequestHolder {
15+
public Route route;
16+
public URL url;
17+
public HttpRequest request;
18+
public HttpPostRequestEncoder bodyRequestEncoder;
19+
20+
public RequestHolder(Route route, URL url, HttpRequest request, HttpPostRequestEncoder bodyRequestEncoder) {
21+
this.route = route;
22+
this.url = url;
23+
this.request = request;
24+
this.bodyRequestEncoder = bodyRequestEncoder;
25+
}
26+
}

src/main/java/love/wangqi/exception/handler/DefaultExceptionHandler.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import love.wangqi.exception.TimeoutException;
1111

1212
import java.net.ConnectException;
13+
import java.util.concurrent.RejectedExecutionException;
1314

1415
/**
1516
* @author: wangqi
@@ -32,6 +33,10 @@ public ExceptionResponse getExceptionResponse(Exception exception) {
3233
exceptionResponse.setStatus(HttpResponseStatus.REQUEST_TIMEOUT);
3334
exceptionResponse.setContentType("text/plain");
3435
exceptionResponse.setContent("request timeout");
36+
} else if (exception instanceof RejectedExecutionException) {
37+
exceptionResponse.setStatus(HttpResponseStatus.TOO_MANY_REQUESTS);
38+
exceptionResponse.setContentType("text/plain");
39+
exceptionResponse.setContent("too many requests");
3540
} else {
3641
exceptionResponse.setStatus(HttpResponseStatus.INTERNAL_SERVER_ERROR);
3742
exceptionResponse.setContentType("text/plain");
@@ -43,11 +48,11 @@ public ExceptionResponse getExceptionResponse(Exception exception) {
4348
@Override
4449
public void send(ChannelHandlerContext ctx, ExceptionResponse exceptionResponse) {
4550
String content = exceptionResponse.getContent();
46-
FullHttpResponse response;
47-
response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, exceptionResponse.getStatus());
51+
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, exceptionResponse.getStatus());
4852
if (content != null) {
4953
response.headers().set("X-Ca-Error-Message", content);
5054
}
51-
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
55+
ctx.channel().writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
56+
ctx.channel().close();
5257
}
5358
}
Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package love.wangqi.handler;
22

3-
import io.netty.channel.Channel;
3+
import io.netty.channel.ChannelHandlerContext;
44
import io.netty.channel.ChannelInitializer;
55
import io.netty.channel.ChannelPipeline;
66
import io.netty.channel.socket.SocketChannel;
@@ -17,29 +17,23 @@
1717
*/
1818
public class BackendFilter extends ChannelInitializer<SocketChannel> {
1919
private final SslContext sslCtx;
20-
private final Channel inboundChannel;
20+
private final ChannelHandlerContext ctx;
2121

22-
public BackendFilter(SslContext sslCtx, Channel inboundChannel) {
22+
public BackendFilter(SslContext sslCtx, ChannelHandlerContext ctx) {
2323
this.sslCtx = sslCtx;
24-
this.inboundChannel = inboundChannel;
24+
this.ctx = ctx;
2525
}
2626

2727
@Override
2828
protected void initChannel(SocketChannel ch) throws Exception {
2929
ChannelPipeline pipeline = ch.pipeline();
30-
3130
if (sslCtx != null) {
3231
pipeline.addLast("ssl", sslCtx.newHandler(ch.alloc()));
3332
}
34-
3533
pipeline.addLast("codec", new HttpClientCodec());
36-
3734
pipeline.addLast("inflater", new HttpContentDecompressor());
38-
39-
pipeline.addLast(new HttpObjectAggregator(1048576));
40-
35+
pipeline.addLast(new HttpObjectAggregator(1024 * 1024 * 64));
4136
pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
42-
43-
pipeline.addLast("handler", new BackendHandler(inboundChannel));
37+
pipeline.addLast("handler", new BackendHandler(ctx));
4438
}
4539
}
Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package love.wangqi.handler;
22

3-
import io.netty.channel.Channel;
43
import io.netty.channel.ChannelFutureListener;
54
import io.netty.channel.ChannelHandlerContext;
65
import io.netty.channel.ChannelInboundHandlerAdapter;
6+
import io.netty.handler.codec.http.FullHttpResponse;
77
import org.slf4j.Logger;
88
import org.slf4j.LoggerFactory;
99

@@ -15,15 +15,15 @@
1515
public class BackendHandler extends ChannelInboundHandlerAdapter {
1616
private Logger logger = LoggerFactory.getLogger(BackendHandler.class);
1717

18-
private Channel inboundChannel;
18+
private ChannelHandlerContext ctx;
1919

20-
BackendHandler(Channel inboundChannel) {
21-
this.inboundChannel = inboundChannel;
20+
BackendHandler(ChannelHandlerContext ctx) {
21+
this.ctx = ctx;
2222
}
2323

2424
@Override
2525
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
26-
inboundChannel.writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);
26+
this.ctx.channel().writeAndFlush(msg).addListener(ChannelFutureListener.CLOSE);
2727
ctx.channel().close();
2828
}
2929
}

src/main/java/love/wangqi/handler/FrontHandler.java

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,10 @@
22

33
import io.netty.channel.ChannelHandlerContext;
44
import io.netty.channel.ChannelInboundHandlerAdapter;
5-
import io.netty.handler.codec.http.FullHttpRequest;
5+
import io.netty.handler.codec.http.*;
66
import love.wangqi.codec.DefaultHttpRequestBuilder;
77
import love.wangqi.codec.HttpRequestBuilder;
8+
import love.wangqi.codec.RequestHolder;
89
import love.wangqi.filter.HttpRequestFilter;
910
import love.wangqi.handler.command.ForwardCommand;
1011
import love.wangqi.server.GatewayServer;
@@ -33,24 +34,19 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
3334
httpRequestFilter.filter(GatewayServer.config, ctx, httpRequest);
3435
}
3536

36-
HttpRequestBuilder httpRequestBuilder = GatewayServer.config.getHttpRequestBuilder()
37-
.setRouteMapper(GatewayServer.config.getRouteMapper())
38-
.setOriginRequest(httpRequest);
37+
HttpRequestBuilder httpRequestBuilder = new DefaultHttpRequestBuilder()
38+
.setRouteMapper(GatewayServer.config.getRouteMapper());
39+
40+
RequestHolder requestHolder = httpRequestBuilder.build(httpRequest);
3941

40-
DefaultHttpRequestBuilder.RequestHolder requestHolder = httpRequestBuilder.build();
4142
ForwardCommand forwardCommand = new ForwardCommand(ctx, requestHolder);
4243
forwardCommand.queue();
4344
} catch (Exception e) {
4445
logger.error(e.toString());
4546
GatewayServer.config.getExceptionHandler().handle(ctx, e);
46-
} finally {
47+
}
48+
finally {
4749
httpRequest.release();
4850
}
4951
}
50-
51-
@Override
52-
public void channelActive(ChannelHandlerContext ctx) throws Exception {
53-
logger.info("连接的客户端地址:{}", ctx.channel().remoteAddress());
54-
super.channelActive(ctx);
55-
}
5652
}

src/main/java/love/wangqi/handler/command/ForwardCommand.java

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import io.netty.handler.ssl.SslContext;
1414
import io.netty.handler.ssl.SslContextBuilder;
1515
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
16-
import love.wangqi.codec.DefaultHttpRequestBuilder;
16+
import love.wangqi.codec.RequestHolder;
1717
import love.wangqi.exception.TimeoutException;
1818
import love.wangqi.handler.BackendFilter;
1919
import love.wangqi.server.GatewayServer;
@@ -27,9 +27,9 @@
2727
*/
2828
public class ForwardCommand extends HystrixCommand<Void> {
2929
private ChannelHandlerContext ctx;
30-
private DefaultHttpRequestBuilder.RequestHolder requestHolder;
30+
private RequestHolder requestHolder;
3131

32-
public ForwardCommand(ChannelHandlerContext ctx, DefaultHttpRequestBuilder.RequestHolder requestHolder) {
32+
public ForwardCommand(ChannelHandlerContext ctx, RequestHolder requestHolder) {
3333
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ForwardCommandGroup"))
3434
.andCommandKey(HystrixCommandKey.Factory.asKey("ForwardCommand"))
3535
.andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("threadPool-" + requestHolder.route.getId()))
@@ -46,15 +46,14 @@ public ForwardCommand(ChannelHandlerContext ctx, DefaultHttpRequestBuilder.Reque
4646
.withFallbackIsolationSemaphoreMaxConcurrentRequests(100)
4747
)
4848
);
49-
5049
this.ctx = ctx;
5150
this.requestHolder = requestHolder;
5251
}
5352

5453

5554
@Override
5655
protected Void run() throws Exception {
57-
forward(ctx.channel(), requestHolder.url, requestHolder.request, requestHolder.bodyRequestEncoder);
56+
forward(ctx, requestHolder.url, requestHolder.request, requestHolder.bodyRequestEncoder);
5857
return null;
5958
}
6059

@@ -99,7 +98,7 @@ private UrlMetadata getProtocol(URL url) {
9998
return new UrlMetadata(protocol, host, port);
10099
}
101100

102-
private void forward(Channel inboundChannel, URL url, HttpRequest request, HttpPostRequestEncoder bodyRequestEncoder) throws Exception {
101+
private void forward(ChannelHandlerContext ctx, URL url, HttpRequest request, HttpPostRequestEncoder bodyRequestEncoder) throws Exception {
103102
UrlMetadata urlMetadata = getProtocol(url);
104103
// Configure SSL context if necessary.
105104
final boolean ssl = "https".equalsIgnoreCase(urlMetadata.protocol);
@@ -111,12 +110,12 @@ private void forward(Channel inboundChannel, URL url, HttpRequest request, HttpP
111110
sslCtx = null;
112111
}
113112

114-
EventLoopGroup group = new NioEventLoopGroup();
113+
EventLoopGroup group = new NioEventLoopGroup(1);
115114
try {
116115
Bootstrap b = new Bootstrap();
117116
b.group(group)
118117
.channel(NioSocketChannel.class)
119-
.handler(new BackendFilter(sslCtx, inboundChannel));
118+
.handler(new BackendFilter(sslCtx, ctx));
120119

121120
Channel ch = b.connect(urlMetadata.host, urlMetadata.port).sync().channel();
122121

0 commit comments

Comments
 (0)