Skip to content

Commit 976532e

Browse files
fix ssl/alpn on pooled connections
1 parent 6855d45 commit 976532e

File tree

6 files changed

+189
-145
lines changed

6 files changed

+189
-145
lines changed

sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/NettyHttpClient.java

Lines changed: 54 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -60,16 +60,20 @@
6060
import static io.clientcore.core.utils.ServerSentEventUtils.attemptRetry;
6161
import static io.clientcore.core.utils.ServerSentEventUtils.processTextEventStream;
6262
import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.ALPN;
63+
import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.HTTP2_GOAWAY;
6364
import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.HTTP_CODEC;
6465
import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.HTTP_RESPONSE;
6566
import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.PIPELINE_CLEANUP;
67+
import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.POOL_CONNECTION_HEALTH;
6668
import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.PROGRESS_AND_TIMEOUT;
6769
import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.PROXY;
70+
import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.PROXY_EXCEPTION_WARNING_SUPPRESSION;
6871
import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.SSL;
6972
import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.SSL_INITIALIZER;
7073
import static io.clientcore.http.netty4.implementation.Netty4Utility.awaitLatch;
7174
import static io.clientcore.http.netty4.implementation.Netty4Utility.buildSslContext;
7275
import static io.clientcore.http.netty4.implementation.Netty4Utility.createCodec;
76+
import static io.clientcore.http.netty4.implementation.Netty4Utility.createHttp2Codec;
7377
import static io.clientcore.http.netty4.implementation.Netty4Utility.sendHttp11Request;
7478
import static io.clientcore.http.netty4.implementation.Netty4Utility.sendHttp2Request;
7579
import static io.clientcore.http.netty4.implementation.Netty4Utility.setOrSuppressError;
@@ -79,8 +83,6 @@
7983
*/
8084
class NettyHttpClient implements HttpClient {
8185
private static final ClientLogger LOGGER = new ClientLogger(NettyHttpClient.class);
82-
private static final Netty4ConnectionPool.Http2GoAwayHandler HTTP_2_GO_AWAY_HANDLER
83-
= new Netty4ConnectionPool.Http2GoAwayHandler();
8486

8587
/**
8688
* Error message for when no {@link ServerSentEventListener} is attached to the {@link HttpRequest}.
@@ -149,7 +151,7 @@ private Response<BinaryData> sendWithConnectionPool(HttpRequest request) {
149151

150152
final Channel channel = future.getNow();
151153
try {
152-
configurePooledRequestPipeline(channel, request, responseReference, errorReference, latch, isHttps, uri.getHost(), port);
154+
configurePooledRequestPipeline(channel, request, responseReference, errorReference, latch, isHttps);
153155
} catch (Exception e) {
154156
// An exception occurred during the pipeline setup.
155157
// We fire the exception through the pipeline to trigger the cleanup handler,
@@ -173,7 +175,7 @@ private Response<BinaryData> sendWithConnectionPool(HttpRequest request) {
173175
throw LOGGER.throwableAtError().log(errorReference.get(), CoreException::from);
174176
} else {
175177
throw LOGGER.throwableAtError()
176-
.log("The request latch was released without a response or an error being set.",
178+
.log("The request pipeline completed without producing a response or an error.",
177179
IllegalStateException::new);
178180
}
179181
}
@@ -209,7 +211,10 @@ protected void initChannel(Channel channel) throws SSLException {
209211
}
210212
});
211213

212-
channel.pipeline().addFirst(PROXY, proxyHandler);
214+
ChannelPipeline pipeline = channel.pipeline();
215+
pipeline.addFirst(PROXY, proxyHandler);
216+
pipeline.addAfter(PROXY, PROXY_EXCEPTION_WARNING_SUPPRESSION,
217+
Netty4ConnectionPool.SuppressProxyConnectExceptionWarningHandler.INSTANCE);
213218
}
214219

215220
// Add SSL handling if the request is HTTPS.
@@ -315,7 +320,7 @@ protected void initChannel(Channel channel) throws SSLException {
315320

316321
private void configurePooledRequestPipeline(Channel channel, HttpRequest request,
317322
AtomicReference<ResponseStateInfo> responseReference, AtomicReference<Throwable> errorReference,
318-
CountDownLatch latch, boolean isHttps, String host, int port) {
323+
CountDownLatch latch, boolean isHttps) {
319324

320325
ReentrantLock lock = channel.attr(Netty4ConnectionPool.CHANNEL_LOCK).get();
321326
lock.lock();
@@ -336,64 +341,64 @@ private void configurePooledRequestPipeline(Channel channel, HttpRequest request
336341

337342
final Object pipelineOwnerToken = new Object();
338343
channel.attr(Netty4ConnectionPool.PIPELINE_OWNER_TOKEN).set(pipelineOwnerToken);
344+
ChannelPipeline pipeline = channel.pipeline();
345+
346+
HttpProtocolVersion protocol = channel.attr(Netty4AlpnHandler.HTTP_PROTOCOL_VERSION_KEY).get();
347+
boolean isHttp2 = protocol == HttpProtocolVersion.HTTP_2;
348+
349+
if (protocol == null) {
350+
// Ideally, this should never happen, but as a safeguard.
351+
setOrSuppressError(errorReference, new IllegalStateException("Channel from pool is missing protocol."));
352+
latch.countDown();
353+
return;
354+
}
339355

340-
ProgressReporter progressReporter = (request.getContext() == null)
356+
if (isHttp2) {
357+
// For H2 (which is always HTTPS), the codec is persistent.
358+
// Add it only if it's not already there (first request).
359+
if (pipeline.get(HTTP_CODEC) == null) {
360+
pipeline.addAfter(SSL, HTTP_CODEC, createHttp2Codec());
361+
pipeline.addAfter(HTTP_CODEC, HTTP2_GOAWAY, new Netty4ConnectionPool.Http2GoAwayHandler());
362+
}
363+
} else { // HTTP/1.1 (can be HTTP or HTTPS)
364+
// For H1, the codec is transient and must be added for every request.
365+
// The cleanup handler is responsible for removing it.
366+
String after = isHttps ? SSL : POOL_CONNECTION_HEALTH;
367+
pipeline.addAfter(after, HTTP_CODEC, createCodec());
368+
}
369+
370+
ProgressReporter progressReporter = request.getContext() == null
341371
? null
342372
: (ProgressReporter) request.getContext().getMetadata("progressReporter");
373+
343374
boolean addProgressAndTimeoutHandler = progressReporter != null
344375
|| writeTimeoutMillis > 0
345376
|| responseTimeoutMillis > 0
346377
|| readTimeoutMillis > 0;
347378

348-
ChannelPipeline pipeline = channel.pipeline();
349-
350-
pipeline.addLast(PIPELINE_CLEANUP,
351-
new Netty4PipelineCleanupHandler(connectionPool, errorReference, pipelineOwnerToken));
352-
353-
HttpProtocolVersion protocol = channel.attr(Netty4AlpnHandler.HTTP_PROTOCOL_VERSION_KEY).get();
354-
355-
if (protocol != null) {
356-
if (addProgressAndTimeoutHandler) {
357-
pipeline.addLast(PROGRESS_AND_TIMEOUT, new Netty4ProgressAndTimeoutHandler(progressReporter, writeTimeoutMillis, responseTimeoutMillis, readTimeoutMillis));
358-
}
359-
pipeline.addLast(HTTP_RESPONSE, new Netty4ResponseHandler(request, responseReference, errorReference, latch));
379+
Netty4ResponseHandler responseHandler
380+
= new Netty4ResponseHandler(request, responseReference, errorReference, latch);
360381

361-
if (protocol == HttpProtocolVersion.HTTP_1_1) {
362-
String addBefore = addProgressAndTimeoutHandler ? PROGRESS_AND_TIMEOUT : HTTP_RESPONSE;
363-
pipeline.addBefore(addBefore, HTTP_CODEC, createCodec());
364-
}
382+
if (addProgressAndTimeoutHandler) {
383+
Netty4ProgressAndTimeoutHandler progressAndTimeoutHandler = new Netty4ProgressAndTimeoutHandler(
384+
progressReporter, writeTimeoutMillis, responseTimeoutMillis, readTimeoutMillis);
365385

366-
channel.eventLoop().execute(() -> {
367-
if (protocol == HttpProtocolVersion.HTTP_2) {
368-
sendHttp2Request(request, channel, errorReference, latch);
369-
} else {
370-
send(request, channel, errorReference, latch);
371-
}
372-
});
386+
pipeline.addAfter(HTTP_CODEC, PROGRESS_AND_TIMEOUT, progressAndTimeoutHandler);
387+
pipeline.addAfter(PROGRESS_AND_TIMEOUT, HTTP_RESPONSE, responseHandler);
373388
} else {
374-
if (addProgressAndTimeoutHandler) {
375-
pipeline.addLast(PROGRESS_AND_TIMEOUT,
376-
new Netty4ProgressAndTimeoutHandler(progressReporter, writeTimeoutMillis, responseTimeoutMillis, readTimeoutMillis));
377-
}
389+
pipeline.addAfter(HTTP_CODEC, HTTP_RESPONSE, responseHandler);
390+
}
378391

379-
if (isHttps) {
380-
SslContext sslContext = buildSslContext(maximumHttpVersion, sslContextModifier);
381-
SslHandler sslHandler = sslContext.newHandler(channel.alloc(), host, port);
382-
pipeline.addFirst(SSL, sslHandler);
383-
pipeline.addAfter(SSL, ALPN, new Netty4AlpnHandler(request, responseReference, errorReference, latch));
384-
385-
channel.writeAndFlush(Unpooled.EMPTY_BUFFER);
386-
} else {
387-
Netty4ResponseHandler responseHandler = new Netty4ResponseHandler(request, responseReference, errorReference, latch);
388-
pipeline.addLast(HTTP_RESPONSE, responseHandler);
389-
String addBefore = addProgressAndTimeoutHandler ? PROGRESS_AND_TIMEOUT : HTTP_RESPONSE;
390-
pipeline.addBefore(addBefore, HTTP_CODEC, createCodec());
392+
pipeline.addLast(PIPELINE_CLEANUP,
393+
new Netty4PipelineCleanupHandler(connectionPool, errorReference, pipelineOwnerToken));
391394

395+
channel.eventLoop().execute(() -> {
396+
if (isHttp2) {
397+
sendHttp2Request(request, channel, errorReference, latch);
398+
} else { // HTTP/1.1
392399
send(request, channel, errorReference, latch);
393400
}
394-
}
395-
} catch (SSLException e) {
396-
throw new RuntimeException(e);
401+
});
397402
} finally {
398403
lock.unlock();
399404
}

sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/NettyHttpClientBuilder.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,7 @@ public NettyHttpClientBuilder pendingAcquireTimeout(Duration pendingAcquireTimeo
355355
* Sets the maximum number of requests that can be queued waiting for a connection.
356356
* <p>
357357
* This limit is applied on a per-route (per-host) basis.
358+
* If not set, a default value of 10_000 is used.
358359
*
359360
* @param maxPendingAcquires The maximum number of pending acquires.
360361
* @return The updated builder.
@@ -397,7 +398,11 @@ public HttpClient build() {
397398
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) getTimeoutMillis(connectTimeout, 10_000));
398399
// Disable auto-read as we want to control when and how data is read from the channel.
399400
bootstrap.option(ChannelOption.AUTO_READ, false);
401+
// Enable TCP keep-alive to proactively detect and clean up stale connections in the pool. This helps evict
402+
// connections that have been silently dropped by network intermediaries.
400403
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
404+
// Allow the channel to remain open for writing even after the server has closed its sending side.
405+
// This helps detect half-closures with a ChannelInputShutdownEvent in the PoolConnectionHealthHandler.
401406
bootstrap.option(ChannelOption.ALLOW_HALF_CLOSURE, true);
402407

403408
Configuration buildConfiguration

0 commit comments

Comments
 (0)