Skip to content
This repository was archived by the owner on Jun 29, 2022. It is now read-only.

Commit dc79d27

Browse files
author
Borja Gorriz
authored
Merge pull request #1 from metrolab/BOD-1602
[BOD-1602] Added proxy handler in the netty websocket channel pipeline
2 parents 62ac3fc + e202a21 commit dc79d27

File tree

5 files changed

+655
-205
lines changed

5 files changed

+655
-205
lines changed

pom.xml

Lines changed: 7 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
<groupId>com.google.firebase</groupId>
2121
<artifactId>firebase-admin</artifactId>
22-
<version>6.8.2-SNAPSHOT</version>
22+
<version>6.8.metropolis</version>
2323
<packaging>jar</packaging>
2424

2525
<name>firebase-admin</name>
@@ -59,7 +59,7 @@
5959
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
6060
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
6161
<skipUTs>${skipTests}</skipUTs>
62-
<netty.version>4.1.34.Final</netty.version>
62+
<netty.version>4.1.37.Final</netty.version>
6363
</properties>
6464

6565
<scm>
@@ -295,26 +295,6 @@
295295
</resource>
296296
</resources>
297297
<plugins>
298-
<plugin>
299-
<artifactId>maven-checkstyle-plugin</artifactId>
300-
<version>2.17</version>
301-
<executions>
302-
<execution>
303-
<id>validate</id>
304-
<phase>validate</phase>
305-
<configuration>
306-
<configLocation>checkstyle.xml</configLocation>
307-
<encoding>UTF-8</encoding>
308-
<consoleOutput>true</consoleOutput>
309-
<failsOnError>true</failsOnError>
310-
<includeTestSourceDirectory>true</includeTestSourceDirectory>
311-
</configuration>
312-
<goals>
313-
<goal>check</goal>
314-
</goals>
315-
</execution>
316-
</executions>
317-
</plugin>
318298
<plugin>
319299
<artifactId>maven-compiler-plugin</artifactId>
320300
<version>3.6.1</version>
@@ -330,18 +310,6 @@
330310
<skipTests>${skipUTs}</skipTests>
331311
</configuration>
332312
</plugin>
333-
<plugin>
334-
<artifactId>maven-failsafe-plugin</artifactId>
335-
<version>2.19.1</version>
336-
<executions>
337-
<execution>
338-
<goals>
339-
<goal>integration-test</goal>
340-
<goal>verify</goal>
341-
</goals>
342-
</execution>
343-
</executions>
344-
</plugin>
345313
<plugin>
346314
<artifactId>maven-javadoc-plugin</artifactId>
347315
<version>2.10.4</version>
@@ -451,6 +419,11 @@
451419
<artifactId>netty-transport</artifactId>
452420
<version>${netty.version}</version>
453421
</dependency>
422+
<dependency>
423+
<groupId>io.netty</groupId>
424+
<artifactId>netty-handler-proxy</artifactId>
425+
<version>${netty.version}</version>
426+
</dependency>
454427

455428
<!-- Test Dependencies -->
456429
<dependency>
Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,288 @@
1+
package com.google.firebase.database.connection;
2+
3+
4+
/*
5+
* Copyright 2012 The Netty Project
6+
*
7+
* The Netty Project licenses this file to you under the Apache License,
8+
* version 2.0 (the "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at:
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
15+
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
16+
* License for the specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
21+
import io.netty.buffer.ByteBuf;
22+
import io.netty.channel.ChannelHandlerContext;
23+
import io.netty.channel.ChannelPipeline;
24+
import io.netty.channel.CombinedChannelDuplexHandler;
25+
import io.netty.handler.codec.PrematureChannelClosureException;
26+
import io.netty.handler.codec.http.*;
27+
import io.netty.util.ReferenceCountUtil;
28+
29+
import java.util.ArrayDeque;
30+
import java.util.List;
31+
import java.util.Queue;
32+
import java.util.concurrent.atomic.AtomicLong;
33+
34+
35+
public final class CustomHttpClientCodec extends CombinedChannelDuplexHandler<HttpResponseDecoder, HttpRequestEncoder>
36+
implements HttpClientUpgradeHandler.SourceCodec {
37+
38+
/**
39+
* A queue that is used for correlating a request and a response.
40+
*/
41+
private final Queue<HttpMethod> queue = new ArrayDeque<HttpMethod>();
42+
private final boolean parseHttpAfterConnectRequest;
43+
44+
/**
45+
* If true, decoding stops (i.e. pass-through)
46+
*/
47+
private boolean done;
48+
49+
private final AtomicLong requestResponseCounter = new AtomicLong();
50+
private final boolean failOnMissingResponse;
51+
52+
/**
53+
* Creates a new instance with the default decoder options
54+
* ({@code maxInitialLineLength (4096}}, {@code maxHeaderSize (8192)}, and
55+
* {@code maxChunkSize (8192)}).
56+
*/
57+
public CustomHttpClientCodec() {
58+
this(4096, 8192, 8192, false);
59+
}
60+
61+
/**
62+
* Creates a new instance with the specified decoder options.
63+
*/
64+
public CustomHttpClientCodec(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize) {
65+
this(maxInitialLineLength, maxHeaderSize, maxChunkSize, false);
66+
}
67+
68+
/**
69+
* Creates a new instance with the specified decoder options.
70+
*/
71+
public CustomHttpClientCodec(
72+
int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse) {
73+
this(maxInitialLineLength, maxHeaderSize, maxChunkSize, failOnMissingResponse, true);
74+
}
75+
76+
/**
77+
* Creates a new instance with the specified decoder options.
78+
*/
79+
public CustomHttpClientCodec(
80+
int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse,
81+
boolean validateHeaders) {
82+
this(maxInitialLineLength, maxHeaderSize, maxChunkSize, failOnMissingResponse, validateHeaders, false);
83+
}
84+
85+
/**
86+
* Creates a new instance with the specified decoder options.
87+
*/
88+
public CustomHttpClientCodec(
89+
int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse,
90+
boolean validateHeaders, boolean parseHttpAfterConnectRequest) {
91+
init(new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize, validateHeaders), new Encoder());
92+
this.failOnMissingResponse = failOnMissingResponse;
93+
this.parseHttpAfterConnectRequest = parseHttpAfterConnectRequest;
94+
}
95+
96+
/**
97+
* Creates a new instance with the specified decoder options.
98+
*/
99+
public CustomHttpClientCodec(
100+
int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse,
101+
boolean validateHeaders, int initialBufferSize) {
102+
this(maxInitialLineLength, maxHeaderSize, maxChunkSize, failOnMissingResponse, validateHeaders,
103+
initialBufferSize, false);
104+
}
105+
106+
/**
107+
* Creates a new instance with the specified decoder options.
108+
*/
109+
public CustomHttpClientCodec(
110+
int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean failOnMissingResponse,
111+
boolean validateHeaders, int initialBufferSize, boolean parseHttpAfterConnectRequest) {
112+
init(new Decoder(maxInitialLineLength, maxHeaderSize, maxChunkSize, validateHeaders, initialBufferSize),
113+
new Encoder());
114+
this.parseHttpAfterConnectRequest = parseHttpAfterConnectRequest;
115+
this.failOnMissingResponse = failOnMissingResponse;
116+
}
117+
118+
/**
119+
* Prepares to upgrade to another protocol from HTTP. Disables the {@link Encoder}.
120+
*/
121+
@Override
122+
public void prepareUpgradeFrom(ChannelHandlerContext ctx) {
123+
((Encoder) outboundHandler()).upgraded = true;
124+
}
125+
126+
/**
127+
* Upgrades to another protocol from HTTP. Removes the {@link Decoder} and {@link Encoder} from
128+
* the pipeline.
129+
*/
130+
@Override
131+
public void upgradeFrom(ChannelHandlerContext ctx) {
132+
final ChannelPipeline p = ctx.pipeline();
133+
p.remove(this);
134+
}
135+
136+
public void setSingleDecode(boolean singleDecode) {
137+
inboundHandler().setSingleDecode(singleDecode);
138+
}
139+
140+
public boolean isSingleDecode() {
141+
return inboundHandler().isSingleDecode();
142+
}
143+
144+
private final class Encoder extends HttpRequestEncoder {
145+
146+
boolean upgraded;
147+
148+
@Override
149+
protected void encode(
150+
ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
151+
152+
if (upgraded) {
153+
out.add(ReferenceCountUtil.retain(msg));
154+
return;
155+
}
156+
157+
if (msg instanceof HttpRequest && !done) {
158+
queue.offer(((HttpRequest) msg).method());
159+
}
160+
161+
super.encode(ctx, msg, out);
162+
163+
if (failOnMissingResponse && !done) {
164+
// check if the request is chunked if so do not increment
165+
if (msg instanceof LastHttpContent) {
166+
// increment as its the last chunk
167+
requestResponseCounter.incrementAndGet();
168+
}
169+
}
170+
}
171+
}
172+
173+
private final class Decoder extends HttpResponseDecoder {
174+
Decoder(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean validateHeaders) {
175+
super(maxInitialLineLength, maxHeaderSize, maxChunkSize, validateHeaders);
176+
}
177+
178+
Decoder(int maxInitialLineLength, int maxHeaderSize, int maxChunkSize, boolean validateHeaders,
179+
int initialBufferSize) {
180+
super(maxInitialLineLength, maxHeaderSize, maxChunkSize, validateHeaders, initialBufferSize);
181+
}
182+
183+
@Override
184+
protected void decode(
185+
ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
186+
if (done) {
187+
int readable = actualReadableBytes();
188+
if (readable == 0) {
189+
// if non is readable just return null
190+
// https://github.com/netty/netty/issues/1159
191+
return;
192+
}
193+
out.add(buffer.readBytes(readable));
194+
} else {
195+
int oldSize = out.size();
196+
super.decode(ctx, buffer, out);
197+
if (failOnMissingResponse) {
198+
int size = out.size();
199+
for (int i = oldSize; i < size; i++) {
200+
decrement(out.get(i));
201+
}
202+
}
203+
}
204+
}
205+
206+
private void decrement(Object msg) {
207+
if (msg == null) {
208+
return;
209+
}
210+
211+
// check if it's an Header and its transfer encoding is not chunked.
212+
if (msg instanceof LastHttpContent) {
213+
requestResponseCounter.decrementAndGet();
214+
}
215+
}
216+
217+
@Override
218+
protected boolean isContentAlwaysEmpty(HttpMessage msg) {
219+
final int statusCode = ((HttpResponse) msg).status().code();
220+
if (statusCode == 100 || statusCode == 101) {
221+
// 100-continue and 101 switching protocols response should be excluded from paired comparison.
222+
// Just delegate to super method which has all the needed handling.
223+
return super.isContentAlwaysEmpty(msg);
224+
}
225+
226+
// Get the getMethod of the HTTP request that corresponds to the
227+
// current response.
228+
HttpMethod method = queue.poll();
229+
230+
char firstChar = method.name().charAt(0);
231+
switch (firstChar) {
232+
case 'H':
233+
// According to 4.3, RFC2616:
234+
// All responses to the HEAD request method MUST NOT include a
235+
// message-body, even though the presence of entity-header fields
236+
// might lead one to believe they do.
237+
if (HttpMethod.HEAD.equals(method)) {
238+
return true;
239+
240+
// The following code was inserted to work around the servers
241+
// that behave incorrectly. It has been commented out
242+
// because it does not work with well behaving servers.
243+
// Please note, even if the 'Transfer-Encoding: chunked'
244+
// header exists in the HEAD response, the response should
245+
// have absolutely no content.
246+
//
247+
//// Interesting edge case:
248+
//// Some poorly implemented servers will send a zero-byte
249+
//// chunk if Transfer-Encoding of the response is 'chunked'.
250+
////
251+
//// return !msg.isChunked();
252+
}
253+
break;
254+
case 'C':
255+
// Successful CONNECT request results in a response with empty body.
256+
if (statusCode == 200) {
257+
if (HttpMethod.CONNECT.equals(method)) {
258+
// Proxy connection established - Parse HTTP only if configured by parseHttpAfterConnectRequest,
259+
// else pass through.
260+
if (!parseHttpAfterConnectRequest) {
261+
done = true;
262+
queue.clear();
263+
}
264+
return true;
265+
}
266+
}
267+
break;
268+
}
269+
270+
return super.isContentAlwaysEmpty(msg);
271+
}
272+
273+
@Override
274+
public void channelInactive(ChannelHandlerContext ctx)
275+
throws Exception {
276+
super.channelInactive(ctx);
277+
278+
if (failOnMissingResponse) {
279+
long missingResponses = requestResponseCounter.get();
280+
if (missingResponses > 0) {
281+
ctx.fireExceptionCaught(new PrematureChannelClosureException(
282+
"channel gone inactive with " + missingResponses +
283+
" missing response(s)"));
284+
}
285+
}
286+
}
287+
}
288+
}

0 commit comments

Comments
 (0)