diff --git a/sdk/clientcore/http-netty4/pom.xml b/sdk/clientcore/http-netty4/pom.xml
index 6ba17179a4ba..d3306077c811 100644
--- a/sdk/clientcore/http-netty4/pom.xml
+++ b/sdk/clientcore/http-netty4/pom.xml
@@ -205,6 +205,12 @@
2.5.2test
+
+ org.mockito
+ mockito-core
+ 4.11.0
+ test
+
diff --git a/sdk/clientcore/http-netty4/spotbugs-exclude.xml b/sdk/clientcore/http-netty4/spotbugs-exclude.xml
index 9b8d71f16243..5eee4b5d1967 100644
--- a/sdk/clientcore/http-netty4/spotbugs-exclude.xml
+++ b/sdk/clientcore/http-netty4/spotbugs-exclude.xml
@@ -9,6 +9,7 @@
+
@@ -61,4 +62,18 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/NettyHttpClient.java b/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/NettyHttpClient.java
index 90fbafd96b16..66dea873d2c1 100644
--- a/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/NettyHttpClient.java
+++ b/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/NettyHttpClient.java
@@ -7,6 +7,7 @@
import io.clientcore.core.http.client.HttpProtocolVersion;
import io.clientcore.core.http.models.HttpHeaderName;
import io.clientcore.core.http.models.HttpRequest;
+import io.clientcore.core.http.models.ProxyOptions;
import io.clientcore.core.http.models.Response;
import io.clientcore.core.http.models.ServerSentEventListener;
import io.clientcore.core.instrumentation.logging.ClientLogger;
@@ -20,8 +21,10 @@
import io.clientcore.http.netty4.implementation.ChannelInitializationProxyHandler;
import io.clientcore.http.netty4.implementation.Netty4AlpnHandler;
import io.clientcore.http.netty4.implementation.Netty4ChannelBinaryData;
-import io.clientcore.http.netty4.implementation.Netty4EagerConsumeChannelHandler;
-import io.clientcore.http.netty4.implementation.Netty4HandlerNames;
+import io.clientcore.http.netty4.implementation.Netty4ConnectionPool;
+import io.clientcore.http.netty4.implementation.Netty4ConnectionPoolKey;
+import io.clientcore.http.netty4.implementation.Netty4PipelineCleanupEvent;
+import io.clientcore.http.netty4.implementation.Netty4PipelineCleanupHandler;
import io.clientcore.http.netty4.implementation.Netty4ProgressAndTimeoutHandler;
import io.clientcore.http.netty4.implementation.Netty4ResponseHandler;
import io.clientcore.http.netty4.implementation.Netty4SslInitializationHandler;
@@ -32,33 +35,47 @@
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
-import io.netty.handler.codec.http2.Http2SecurityUtil;
import io.netty.handler.proxy.ProxyHandler;
-import io.netty.handler.ssl.ApplicationProtocolConfig;
-import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
-import io.netty.handler.ssl.SslProvider;
-import io.netty.handler.ssl.SupportedCipherSuiteFilter;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import javax.net.ssl.SSLException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
import java.net.URI;
+import java.nio.channels.ClosedChannelException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import static io.clientcore.core.utils.ServerSentEventUtils.attemptRetry;
import static io.clientcore.core.utils.ServerSentEventUtils.processTextEventStream;
+import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.ALPN;
+import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.HTTP2_GOAWAY;
+import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.HTTP_CODEC;
import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.HTTP_RESPONSE;
+import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.PIPELINE_CLEANUP;
+import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.POOL_CONNECTION_HEALTH;
import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.PROGRESS_AND_TIMEOUT;
+import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.PROXY;
+import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.PROXY_EXCEPTION_WARNING_SUPPRESSION;
+import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.SSL;
+import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.SSL_INITIALIZER;
import static io.clientcore.http.netty4.implementation.Netty4Utility.awaitLatch;
+import static io.clientcore.http.netty4.implementation.Netty4Utility.buildSslContext;
import static io.clientcore.http.netty4.implementation.Netty4Utility.createCodec;
+import static io.clientcore.http.netty4.implementation.Netty4Utility.createHttp2Codec;
import static io.clientcore.http.netty4.implementation.Netty4Utility.sendHttp11Request;
+import static io.clientcore.http.netty4.implementation.Netty4Utility.sendHttp2Request;
import static io.clientcore.http.netty4.implementation.Netty4Utility.setOrSuppressError;
/**
@@ -73,34 +90,97 @@ class NettyHttpClient implements HttpClient {
private static final String NO_LISTENER_ERROR_MESSAGE
= "No ServerSentEventListener attached to HttpRequest to handle the text/event-stream response";
- private final Bootstrap bootstrap;
- private final Consumer sslContextModifier;
+ private final EventLoopGroup eventLoopGroup;
+ private final Netty4ConnectionPool connectionPool;
+ private final ProxyOptions proxyOptions;
private final ChannelInitializationProxyHandler channelInitializationProxyHandler;
- private final AtomicReference> proxyChallenges;
private final long readTimeoutMillis;
private final long responseTimeoutMillis;
private final long writeTimeoutMillis;
+
+ private final Bootstrap bootstrap;
+ private final Consumer sslContextModifier;
private final HttpProtocolVersion maximumHttpVersion;
- NettyHttpClient(Bootstrap bootstrap, Consumer sslContextModifier,
- HttpProtocolVersion maximumHttpVersion, ChannelInitializationProxyHandler channelInitializationProxyHandler,
- long readTimeoutMillis, long responseTimeoutMillis, long writeTimeoutMillis) {
+ NettyHttpClient(Bootstrap bootstrap, EventLoopGroup eventLoopGroup, Netty4ConnectionPool connectionPool,
+ ProxyOptions proxyOptions, ChannelInitializationProxyHandler channelInitializationProxyHandler,
+ Consumer sslContextModifier, HttpProtocolVersion maximumHttpVersion, long readTimeoutMillis,
+ long responseTimeoutMillis, long writeTimeoutMillis) {
this.bootstrap = bootstrap;
+ this.eventLoopGroup = eventLoopGroup;
+ this.connectionPool = connectionPool;
+ this.proxyOptions = proxyOptions;
+ this.channelInitializationProxyHandler = channelInitializationProxyHandler;
this.sslContextModifier = sslContextModifier;
this.maximumHttpVersion = maximumHttpVersion;
- this.channelInitializationProxyHandler = channelInitializationProxyHandler;
- this.proxyChallenges = new AtomicReference<>();
this.readTimeoutMillis = readTimeoutMillis;
this.responseTimeoutMillis = responseTimeoutMillis;
this.writeTimeoutMillis = writeTimeoutMillis;
}
Bootstrap getBootstrap() {
- return bootstrap;
+ return connectionPool != null ? connectionPool.getBootstrap() : bootstrap;
}
@Override
public Response send(HttpRequest request) {
+ return connectionPool != null ? sendWithConnectionPool(request) : sendWithoutConnectionPool(request);
+ }
+
+ private Response sendWithConnectionPool(HttpRequest request) {
+ final URI uri = request.getUri();
+ final boolean isHttps = "https".equalsIgnoreCase(uri.getScheme());
+ final int port = uri.getPort() == -1 ? (isHttps ? 443 : 80) : uri.getPort();
+ final SocketAddress finalDestination = new InetSocketAddress(uri.getHost(), port);
+
+ final Netty4ConnectionPoolKey connectionPoolKey = constructConnectionPoolKey(finalDestination, isHttps);
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicReference responseReference = new AtomicReference<>();
+ final AtomicReference errorReference = new AtomicReference<>();
+
+ Future channelFuture = connectionPool.acquire(connectionPoolKey, isHttps);
+
+ channelFuture.addListener((GenericFutureListener>) future -> {
+ if (!future.isSuccess()) {
+ LOGGER.atError().setThrowable(future.cause()).log("Failed connection.");
+ errorReference.set(future.cause());
+ latch.countDown();
+ return;
+ }
+
+ final Channel channel = future.getNow();
+ try {
+ configurePooledRequestPipeline(channel, request, responseReference, errorReference, latch, isHttps);
+ } catch (Exception e) {
+ // An exception occurred during the pipeline setup.
+ // We fire the exception through the pipeline to trigger the cleanup handler,
+ // which will ensure the channel is properly closed and not returned to the pool.
+ setOrSuppressError(errorReference, e);
+ if (channel.isActive()) {
+ channel.pipeline().fireExceptionCaught(e);
+ }
+ latch.countDown();
+ }
+ });
+
+ awaitLatch(latch);
+
+ ResponseStateInfo info = responseReference.get();
+ if (info != null) {
+ return createResponse(request, info);
+ }
+
+ if (errorReference.get() != null) {
+ throw LOGGER.throwableAtError().log(errorReference.get(), CoreException::from);
+ } else {
+ throw LOGGER.throwableAtError()
+ .log("The request pipeline completed without producing a response or an error.",
+ IllegalStateException::new);
+ }
+ }
+
+ private Response sendWithoutConnectionPool(HttpRequest request) {
URI uri = request.getUri();
String host = uri.getHost();
int port = uri.getPort() == -1 ? ("https".equalsIgnoreCase(uri.getScheme()) ? 443 : 80) : uri.getPort();
@@ -113,6 +193,7 @@ public Response send(HttpRequest request) {
AtomicReference responseReference = new AtomicReference<>();
AtomicReference errorReference = new AtomicReference<>();
+ AtomicReference> proxyChallenges = new AtomicReference<>();
CountDownLatch latch = new CountDownLatch(1);
// Configure an immutable ChannelInitializer in the builder with everything that can be added on a non-per
@@ -130,47 +211,21 @@ protected void initChannel(Channel channel) throws SSLException {
}
});
- channel.pipeline().addFirst(Netty4HandlerNames.PROXY, proxyHandler);
+ ChannelPipeline pipeline = channel.pipeline();
+ pipeline.addFirst(PROXY, proxyHandler);
+ pipeline.addAfter(PROXY, PROXY_EXCEPTION_WARNING_SUPPRESSION,
+ Netty4ConnectionPool.SuppressProxyConnectExceptionWarningHandler.INSTANCE);
}
// Add SSL handling if the request is HTTPS.
if (isHttps) {
- SslContextBuilder sslContextBuilder
- = SslContextBuilder.forClient().endpointIdentificationAlgorithm("HTTPS");
- if (maximumHttpVersion == HttpProtocolVersion.HTTP_2) {
- // If HTTP/2 is the maximum version, we need to ensure that ALPN is enabled.
- SslProvider sslProvider = SslContext.defaultClientProvider();
- ApplicationProtocolConfig.SelectorFailureBehavior selectorBehavior;
- ApplicationProtocolConfig.SelectedListenerFailureBehavior selectedBehavior;
- if (sslProvider == SslProvider.JDK) {
- selectorBehavior = ApplicationProtocolConfig.SelectorFailureBehavior.FATAL_ALERT;
- selectedBehavior = ApplicationProtocolConfig.SelectedListenerFailureBehavior.FATAL_ALERT;
- } else {
- // Netty OpenSslContext doesn't support FATAL_ALERT, use NO_ADVERTISE and ACCEPT
- // instead.
- selectorBehavior = ApplicationProtocolConfig.SelectorFailureBehavior.NO_ADVERTISE;
- selectedBehavior = ApplicationProtocolConfig.SelectedListenerFailureBehavior.ACCEPT;
- }
-
- sslContextBuilder.ciphers(Http2SecurityUtil.CIPHERS, SupportedCipherSuiteFilter.INSTANCE)
- .applicationProtocolConfig(new ApplicationProtocolConfig(
- ApplicationProtocolConfig.Protocol.ALPN, selectorBehavior, selectedBehavior,
- ApplicationProtocolNames.HTTP_2, ApplicationProtocolNames.HTTP_1_1));
- }
- if (sslContextModifier != null) {
- // Allow the caller to modify the SslContextBuilder before it is built.
- sslContextModifier.accept(sslContextBuilder);
- }
-
- SslContext ssl = sslContextBuilder.build();
+ SslContext ssl = buildSslContext(maximumHttpVersion, sslContextModifier);
// SSL handling is added last here. This is done as proxying could require SSL handling too.
- channel.pipeline().addLast(Netty4HandlerNames.SSL, ssl.newHandler(channel.alloc(), host, port));
- channel.pipeline()
- .addLast(Netty4HandlerNames.SSL_INITIALIZER, new Netty4SslInitializationHandler());
+ channel.pipeline().addLast(SSL, ssl.newHandler(channel.alloc(), host, port));
+ channel.pipeline().addLast(SSL_INITIALIZER, new Netty4SslInitializationHandler());
channel.pipeline()
- .addLast(Netty4HandlerNames.ALPN,
- new Netty4AlpnHandler(request, responseReference, errorReference, latch));
+ .addLast(ALPN, new Netty4AlpnHandler(request, responseReference, errorReference, latch));
}
}
});
@@ -194,8 +249,8 @@ protected void initChannel(Channel channel) throws SSLException {
// Only add CoreProgressAndTimeoutHandler if it will do anything, ex it is reporting progress or is
// applying timeouts.
- // This is done to keep the ChannelPipeline shorter, therefore more performant, if this would
- // effectively be a no-op.
+ // This is done to keep the ChannelPipeline shorter, therefore more performant if this
+ // effectively is a no-op.
if (addProgressAndTimeoutHandler) {
channel.pipeline()
.addLast(PROGRESS_AND_TIMEOUT, new Netty4ProgressAndTimeoutHandler(progressReporter,
@@ -204,7 +259,7 @@ protected void initChannel(Channel channel) throws SSLException {
Throwable earlyError = errorReference.get();
if (earlyError != null) {
- // If an error occurred between the connect and the request being sent, don't proceed with sending
+ // If an error occurred between the connecting and the request being sent, don't proceed with sending
// the request.
latch.countDown();
return;
@@ -238,7 +293,7 @@ protected void initChannel(Channel channel) throws SSLException {
channel.pipeline().addLast(HTTP_RESPONSE, responseHandler);
String addBefore = addProgressAndTimeoutHandler ? PROGRESS_AND_TIMEOUT : HTTP_RESPONSE;
- channel.pipeline().addBefore(addBefore, Netty4HandlerNames.HTTP_CODEC, createCodec());
+ channel.pipeline().addBefore(addBefore, HTTP_CODEC, createCodec());
sendHttp11Request(request, channel, errorReference)
.addListener((ChannelFutureListener) sendListener -> {
@@ -260,67 +315,151 @@ protected void initChannel(Channel channel) throws SSLException {
throw LOGGER.throwableAtError().log(errorReference.get(), CoreException::from);
}
- Response response;
- if (info.isChannelConsumptionComplete()) {
- // The network response is already complete, handle creating our Response based on the request method and
- // response headers.
- BinaryData body = BinaryData.empty();
- ByteArrayOutputStream eagerContent = info.getEagerContent();
- if (info.getResponseBodyHandling() != ResponseBodyHandling.IGNORE && eagerContent.size() > 0) {
- // Set the response body as the first HttpContent received if the request wasn't a HEAD request and
- // there was body content.
- body = BinaryData.fromBytes(eagerContent.toByteArray());
+ return createResponse(request, info);
+ }
+
+ private void configurePooledRequestPipeline(Channel channel, HttpRequest request,
+ AtomicReference responseReference, AtomicReference errorReference,
+ CountDownLatch latch, boolean isHttps) {
+
+ ReentrantLock lock = channel.attr(Netty4ConnectionPool.CHANNEL_LOCK).get();
+ lock.lock();
+ try {
+ channel.config().setAutoRead(false);
+
+ // It's possible that the channel was closed between the time it was acquired and now.
+ // This check ensures that we don't try to add handlers to a closed channel.
+ // Read handlers are responsible after this check for not being added in a closed channel.
+ if (!channel.isActive()) {
+ LOGGER.atWarning().log("Channel acquired from the pool is inactive, failing the request.");
+ setOrSuppressError(errorReference, new ClosedChannelException());
+ latch.countDown();
+ return;
}
- response = new Response<>(request, info.getStatusCode(), info.getHeaders(), body);
- } else {
- // Otherwise we aren't finished, handle the remaining content according to the documentation in
- // 'channelRead()'.
- BinaryData body = BinaryData.empty();
- ResponseBodyHandling bodyHandling = info.getResponseBodyHandling();
- Channel channel = info.getResponseChannel();
- if (bodyHandling == ResponseBodyHandling.IGNORE) {
- // We're ignoring the response content.
- CountDownLatch drainLatch = new CountDownLatch(1);
- channel.pipeline().addLast(new Netty4EagerConsumeChannelHandler(drainLatch, ignored -> {
- }, info.isHttp2()));
- channel.config().setAutoRead(true);
- awaitLatch(drainLatch);
- } else if (bodyHandling == ResponseBodyHandling.STREAM) {
- // Body streaming uses a special BinaryData that tracks the firstContent read and the Channel it came
- // from so it can be consumed when the BinaryData is being used.
- // autoRead should have been disabled already but lets make sure that it is.
- channel.config().setAutoRead(false);
- String contentLength = info.getHeaders().getValue(HttpHeaderName.CONTENT_LENGTH);
- Long length = null;
- if (!CoreUtils.isNullOrEmpty(contentLength)) {
- try {
- length = Long.parseLong(contentLength);
- } catch (NumberFormatException ignored) {
- // Ignore, we'll just read until the channel is closed.
- }
+ final Object pipelineOwnerToken = new Object();
+ channel.attr(Netty4ConnectionPool.PIPELINE_OWNER_TOKEN).set(pipelineOwnerToken);
+ ChannelPipeline pipeline = channel.pipeline();
+
+ HttpProtocolVersion protocol = channel.attr(Netty4AlpnHandler.HTTP_PROTOCOL_VERSION_KEY).get();
+ boolean isHttp2 = protocol == HttpProtocolVersion.HTTP_2;
+
+ if (protocol == null) {
+ // Ideally, this should never happen, but as a safeguard.
+ setOrSuppressError(errorReference, new IllegalStateException("Channel from pool is missing protocol."));
+ latch.countDown();
+ return;
+ }
+
+ if (isHttp2) {
+ // For H2 (which is always HTTPS), the codec is persistent.
+ // Add it only if it's not already there (first request).
+ if (pipeline.get(HTTP_CODEC) == null) {
+ pipeline.addAfter(SSL, HTTP_CODEC, createHttp2Codec());
+ pipeline.addAfter(HTTP_CODEC, HTTP2_GOAWAY, new Netty4ConnectionPool.Http2GoAwayHandler());
}
+ } else { // HTTP/1.1 (can be HTTP or HTTPS)
+ // For H1, the codec is transient and must be added for every request.
+ // The cleanup handler is responsible for removing it.
+ String after = isHttps ? SSL : POOL_CONNECTION_HEALTH;
+ pipeline.addAfter(after, HTTP_CODEC, createCodec());
+ }
+
+ ProgressReporter progressReporter = request.getContext() == null
+ ? null
+ : (ProgressReporter) request.getContext().getMetadata("progressReporter");
+
+ boolean addProgressAndTimeoutHandler = progressReporter != null
+ || writeTimeoutMillis > 0
+ || responseTimeoutMillis > 0
+ || readTimeoutMillis > 0;
- body = new Netty4ChannelBinaryData(info.getEagerContent(), channel, length, info.isHttp2());
+ Netty4ResponseHandler responseHandler
+ = new Netty4ResponseHandler(request, responseReference, errorReference, latch);
+
+ if (addProgressAndTimeoutHandler) {
+ Netty4ProgressAndTimeoutHandler progressAndTimeoutHandler = new Netty4ProgressAndTimeoutHandler(
+ progressReporter, writeTimeoutMillis, responseTimeoutMillis, readTimeoutMillis);
+
+ pipeline.addAfter(HTTP_CODEC, PROGRESS_AND_TIMEOUT, progressAndTimeoutHandler);
+ pipeline.addAfter(PROGRESS_AND_TIMEOUT, HTTP_RESPONSE, responseHandler);
} else {
- // All cases otherwise assume BUFFER.
- CountDownLatch drainLatch = new CountDownLatch(1);
- channel.pipeline().addLast(new Netty4EagerConsumeChannelHandler(drainLatch, buf -> {
- try {
- buf.readBytes(info.getEagerContent(), buf.readableBytes());
- } catch (IOException ex) {
- throw LOGGER.throwableAtError().log(ex, CoreException::from);
- }
- }, info.isHttp2()));
- channel.config().setAutoRead(true);
- awaitLatch(drainLatch);
+ pipeline.addAfter(HTTP_CODEC, HTTP_RESPONSE, responseHandler);
+ }
+
+ pipeline.addLast(PIPELINE_CLEANUP,
+ new Netty4PipelineCleanupHandler(connectionPool, errorReference, pipelineOwnerToken));
+
+ channel.eventLoop().execute(() -> {
+ if (isHttp2) {
+ sendHttp2Request(request, channel, errorReference, latch);
+ } else { // HTTP/1.1
+ send(request, channel, errorReference, latch);
+ }
+ });
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private void send(HttpRequest request, Channel channel, AtomicReference errorReference,
+ CountDownLatch latch) {
+ sendHttp11Request(request, channel, errorReference).addListener(f -> {
+ if (f.isSuccess()) {
+ channel.read();
+ } else {
+ setOrSuppressError(errorReference, f.cause());
+ channel.pipeline().fireExceptionCaught(f.cause());
+ latch.countDown();
+ }
+ });
+ }
+
+ private Response createResponse(HttpRequest request, ResponseStateInfo info) {
+ BinaryData body;
+ Response response;
+ Channel channelToCleanup = info.getResponseChannel();
- body = BinaryData.fromBytes(info.getEagerContent().toByteArray());
+ channelToCleanup.eventLoop().execute(() -> {
+ if (channelToCleanup.pipeline().get(Netty4ResponseHandler.class) != null) {
+ channelToCleanup.pipeline().remove(Netty4ResponseHandler.class);
}
+ });
- response = new Response<>(request, info.getStatusCode(), info.getHeaders(), body);
+ final Runnable cleanupTask = () -> {
+ if (connectionPool != null) {
+ channelToCleanup.pipeline().fireUserEventTriggered(Netty4PipelineCleanupEvent.CLEANUP_PIPELINE);
+ } else {
+ channelToCleanup.close();
+ }
+ };
+
+ if (info.isChannelConsumptionComplete()) {
+ ByteArrayOutputStream eagerContent = info.getEagerContent();
+
+ body = (info.getResponseBodyHandling() != ResponseBodyHandling.IGNORE
+ && eagerContent != null
+ && eagerContent.size() > 0) ? BinaryData.fromBytes(eagerContent.toByteArray()) : BinaryData.empty();
+
+ channelToCleanup.eventLoop().execute(cleanupTask);
+ } else {
+ // For all other cases, create a streaming response body.
+ // This delegates all body consumption and cleanup logic to Netty4ChannelBinaryData.
+ String contentLength = info.getHeaders().getValue(HttpHeaderName.CONTENT_LENGTH);
+ Long length = null;
+ if (!CoreUtils.isNullOrEmpty(contentLength)) {
+ try {
+ length = Long.parseLong(contentLength);
+ } catch (NumberFormatException ignored) {
+ // Ignore, we'll just read until the channel is closed.
+ }
+ }
+ body = new Netty4ChannelBinaryData(info.getEagerContent(), info.getResponseChannel(), length,
+ info.isHttp2(), cleanupTask);
}
+ response = new Response<>(request, info.getStatusCode(), info.getHeaders(), body);
+
if (response.getValue() != BinaryData.empty()
&& ServerSentEventUtils
.isTextEventStreamContentType(response.getHeaders().getValue(HttpHeaderName.CONTENT_TYPE))) {
@@ -338,6 +477,7 @@ protected void initChannel(Channel channel) throws SSLException {
// If an error occurred or we want to reconnect
if (!Thread.currentThread().isInterrupted() && attemptRetry(serverSentResult, request)) {
+ response.close();
return this.send(request);
}
@@ -347,17 +487,23 @@ protected void initChannel(Channel channel) throws SSLException {
throw LOGGER.throwableAtError().log(ex, CoreException::from);
}
} else {
+ response.close();
throw LOGGER.throwableAtError().log(NO_LISTENER_ERROR_MESSAGE, IllegalStateException::new);
}
}
-
return response;
}
public void close() {
- EventLoopGroup group = bootstrap.config().group();
- if (group != null) {
- group.shutdownGracefully();
+ if (connectionPool != null) {
+ try {
+ connectionPool.close();
+ } catch (IOException e) {
+ LOGGER.atWarning().setThrowable(e).log("Failed to close the Netty Connection pool.");
+ }
+ }
+ if (eventLoopGroup != null && !eventLoopGroup.isShuttingDown()) {
+ eventLoopGroup.shutdownGracefully();
}
}
@@ -367,4 +513,26 @@ private static BinaryData createBodyFromServerSentResult(ServerSentResult server
: BinaryData.empty();
}
+ private Netty4ConnectionPoolKey constructConnectionPoolKey(SocketAddress finalDestination, boolean isHttps) {
+ final Netty4ConnectionPoolKey key;
+
+ final boolean useProxy = channelInitializationProxyHandler.test(finalDestination);
+ if (useProxy) {
+ SocketAddress proxyAddress = proxyOptions.getAddress();
+ if (isHttps) {
+ // For proxied HTTPS, the pool is keyed by the unique combination of the proxy
+ // and the final destination. This creates dedicated pools for each tunneled destination.
+ key = new Netty4ConnectionPoolKey(proxyAddress, finalDestination);
+ } else {
+ // For proxied plain HTTP, the pool is keyed only by the proxy address.
+ // This allows reusing the same connection to the proxy for different final destinations.
+ key = new Netty4ConnectionPoolKey(proxyAddress, proxyAddress);
+ }
+ } else {
+ key = new Netty4ConnectionPoolKey(finalDestination, finalDestination);
+ }
+
+ return key;
+ }
+
}
diff --git a/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/NettyHttpClientBuilder.java b/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/NettyHttpClientBuilder.java
index b74853327e3a..68c163abd001 100644
--- a/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/NettyHttpClientBuilder.java
+++ b/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/NettyHttpClientBuilder.java
@@ -7,8 +7,10 @@
import io.clientcore.core.http.client.HttpProtocolVersion;
import io.clientcore.core.http.models.ProxyOptions;
import io.clientcore.core.instrumentation.logging.ClientLogger;
+import io.clientcore.core.instrumentation.logging.LoggingEvent;
import io.clientcore.core.utils.configuration.Configuration;
import io.clientcore.http.netty4.implementation.ChannelInitializationProxyHandler;
+import io.clientcore.http.netty4.implementation.Netty4ConnectionPool;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
@@ -135,6 +137,13 @@ private static Class extends SocketChannel> getChannelClass(String className)
private Duration writeTimeout;
private HttpProtocolVersion maximumHttpVersion = HttpProtocolVersion.HTTP_2;
+ // --- Connection Pool Configuration ---
+ private int connectionPoolSize = 1000;
+ private Duration connectionIdleTimeout = Duration.ofSeconds(60);
+ private Duration maxConnectionLifetime;
+ private Duration pendingAcquireTimeout = Duration.ofSeconds(60); // Default wait time for a connection
+ private int maxPendingAcquires = 10_000; // Default pending queue size
+
/**
* Creates a new instance of {@link NettyHttpClientBuilder}.
*/
@@ -281,6 +290,86 @@ public NettyHttpClientBuilder maximumHttpVersion(HttpProtocolVersion httpVersion
return this;
}
+ /**
+ * Sets the maximum number of connections allowed per remote address in the connection pool.
+ *
+ * If not set, a default value of 1000 is used.
+ *
+ * A value of {@code 0} or less will disable connection pooling, and hence each request will
+ * get a newly created connection.
+ *
+ * @param connectionPoolSize The maximum number of connections.
+ * @return The updated builder.
+ */
+ public NettyHttpClientBuilder connectionPoolSize(int connectionPoolSize) {
+ this.connectionPoolSize = connectionPoolSize;
+ return this;
+ }
+
+ /**
+ * Sets the maximum time a connection can remain idle in the pool before it is closed and removed.
+ *
+ * If not set, a default value of 60 seconds is used.
+ *
+ * A {@link Duration} of zero or less will make the connections never expire. Note: While this is
+ * provided as an option, it is not recommended for most use cases, as it can lead to
+ * request failures if network intermediaries (like load balancers or firewalls) silently drop idle
+ * connections.
+ *
+ * @param connectionIdleTimeout The idle timeout duration.
+ * @return The updated builder.
+ */
+ public NettyHttpClientBuilder connectionIdleTimeout(Duration connectionIdleTimeout) {
+ this.connectionIdleTimeout = connectionIdleTimeout;
+ return this;
+ }
+
+ /**
+ * Sets the maximum time a connection is allowed to exist.
+ *
+ * By default, connections have no lifetime limit and can be used indefinitely.
+ *
+ * After this time is met or exceeded, the connection will be closed upon release. A {@link Duration} of zero or
+ * less, or a null value, will also result in connections having no lifetime limit.
+ *
+ * @param maxConnectionLifetime The maximum connection lifetime.
+ * @return The updated builder.
+ */
+ public NettyHttpClientBuilder maxConnectionLifetime(Duration maxConnectionLifetime) {
+ this.maxConnectionLifetime = maxConnectionLifetime;
+ return this;
+ }
+
+ /**
+ * Sets the maximum time to wait for a connection from the pool.
+ *
+ * If not set, a default value of 60 seconds is used.
+ * @param pendingAcquireTimeout The timeout for pending acquires.
+ * @return The updated builder.
+ */
+ public NettyHttpClientBuilder pendingAcquireTimeout(Duration pendingAcquireTimeout) {
+ this.pendingAcquireTimeout = pendingAcquireTimeout;
+ return this;
+ }
+
+ /**
+ * Sets the maximum number of requests that can be queued waiting for a connection.
+ *
+ * This limit is applied on a per-route (per-host) basis.
+ * If not set, a default value of 10_000 is used.
+ *
+ * @param maxPendingAcquires The maximum number of pending acquires.
+ * @return The updated builder.
+ */
+ public NettyHttpClientBuilder maxPendingAcquires(int maxPendingAcquires) {
+ if (maxPendingAcquires <= 0) {
+ throw LOGGER.throwableAtError()
+ .log("maxPendingAcquires must be greater than 0", IllegalArgumentException::new);
+ }
+ this.maxPendingAcquires = maxPendingAcquires;
+ return this;
+ }
+
/**
* Builds the NettyHttpClient.
*
@@ -293,18 +382,33 @@ public HttpClient build() {
= getChannelClass(this.channelClass, group.getClass(), IS_EPOLL_AVAILABLE, IS_KQUEUE_AVAILABLE);
// Leave breadcrumbs about the NettyHttpClient configuration, in case troubleshooting is needed.
- LOGGER.atVerbose()
+ LoggingEvent loggingEvent = LOGGER.atVerbose()
.addKeyValue("customEventLoopGroup", eventLoopGroup != null)
.addKeyValue("eventLoopGroupClass", group.getClass())
.addKeyValue("customChannelClass", this.channelClass != null)
- .addKeyValue("channelClass", channelClass)
- .log("NettyHttpClient was built with these configurations.");
+ .addKeyValue("channelClass", channelClass);
+
+ if (connectionPoolSize > 0) {
+ loggingEvent.addKeyValue("connectionPoolSize", this.connectionPoolSize)
+ .addKeyValue("connectionIdleTimeout", this.connectionIdleTimeout)
+ .addKeyValue("maxConnectionLifetime", this.maxConnectionLifetime)
+ .addKeyValue("pendingAcquireTimeout", this.pendingAcquireTimeout)
+ .addKeyValue("maxPendingAcquires", this.maxPendingAcquires);
+ }
+
+ loggingEvent.log("NettyHttpClient was built with these configurations.");
Bootstrap bootstrap = new Bootstrap().group(group)
.channel(channelClass)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) getTimeoutMillis(connectTimeout, 10_000));
// Disable auto-read as we want to control when and how data is read from the channel.
bootstrap.option(ChannelOption.AUTO_READ, false);
+ // Enable TCP keep-alive to proactively detect and clean up stale connections in the pool. This helps evict
+ // connections that have been silently dropped by network intermediaries.
+ bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
+ // Allow the channel to remain open for writing even after the server has closed its sending side.
+ // This helps detect half-closures with a ChannelInputShutdownEvent in the PoolConnectionHealthHandler.
+ bootstrap.option(ChannelOption.ALLOW_HALF_CLOSURE, true);
Configuration buildConfiguration
= (configuration == null) ? Configuration.getGlobalConfiguration() : configuration;
@@ -312,9 +416,17 @@ public HttpClient build() {
ProxyOptions buildProxyOptions
= (proxyOptions == null) ? ProxyOptions.fromConfiguration(buildConfiguration, true) : proxyOptions;
- return new NettyHttpClient(bootstrap, sslContextModifier, maximumHttpVersion,
- new ChannelInitializationProxyHandler(buildProxyOptions), getTimeoutMillis(readTimeout),
- getTimeoutMillis(responseTimeout), getTimeoutMillis(writeTimeout));
+ Netty4ConnectionPool connectionPool = null;
+ if (connectionPoolSize > 0) {
+ connectionPool
+ = new Netty4ConnectionPool(bootstrap, new ChannelInitializationProxyHandler(buildProxyOptions),
+ sslContextModifier, connectionPoolSize, connectionIdleTimeout, maxConnectionLifetime,
+ pendingAcquireTimeout, maxPendingAcquires, maximumHttpVersion);
+ }
+
+ return new NettyHttpClient(bootstrap, group, connectionPool, buildProxyOptions,
+ new ChannelInitializationProxyHandler(buildProxyOptions), sslContextModifier, maximumHttpVersion,
+ getTimeoutMillis(readTimeout), getTimeoutMillis(responseTimeout), getTimeoutMillis(writeTimeout));
}
static long getTimeoutMillis(Duration duration) {
diff --git a/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/NettyHttpClientProvider.java b/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/NettyHttpClientProvider.java
index 9151620ee25c..5541317fb0f9 100644
--- a/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/NettyHttpClientProvider.java
+++ b/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/NettyHttpClientProvider.java
@@ -30,6 +30,20 @@ public HttpClient getHttpClient() {
public NettyHttpClientProvider() {
}
+ /**
+ * Creates a new {@link HttpClient} instance with a default, shared connection pool.
+ *
+ * For more advanced customization, such as disabling pooling entirely, use the {@link NettyHttpClientBuilder}.
+ *
+ * Example: Creating a client without a connection pool
+ *
+ *
+ * @return A new {@link HttpClient} instance.
+ */
@Override
public HttpClient getNewInstance() {
return new NettyHttpClientBuilder().build();
diff --git a/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/implementation/ChannelInitializationProxyHandler.java b/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/implementation/ChannelInitializationProxyHandler.java
index 56b79d04d92c..9e0717730775 100644
--- a/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/implementation/ChannelInitializationProxyHandler.java
+++ b/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/implementation/ChannelInitializationProxyHandler.java
@@ -67,7 +67,7 @@ public boolean test(SocketAddress socketAddress) {
InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
String hostString = inetSocketAddress.getHostString();
- return hostString != null && nonProxyHostsPattern.matcher(hostString).matches();
+ return hostString != null && !nonProxyHostsPattern.matcher(hostString).matches();
}
/**
diff --git a/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/implementation/Netty4AlpnHandler.java b/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/implementation/Netty4AlpnHandler.java
index 6b066103ab6d..0a869b84f761 100644
--- a/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/implementation/Netty4AlpnHandler.java
+++ b/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/implementation/Netty4AlpnHandler.java
@@ -2,26 +2,22 @@
// Licensed under the MIT License.
package io.clientcore.http.netty4.implementation;
+import io.clientcore.core.http.client.HttpProtocolVersion;
import io.clientcore.core.http.models.HttpRequest;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
-import io.netty.handler.codec.http2.DefaultHttp2Connection;
-import io.netty.handler.codec.http2.DelegatingDecompressorFrameListener;
-import io.netty.handler.codec.http2.Http2Connection;
-import io.netty.handler.codec.http2.Http2FrameListener;
-import io.netty.handler.codec.http2.Http2Settings;
-import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandler;
-import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandlerBuilder;
-import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapterBuilder;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
+import io.netty.util.AttributeKey;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;
-import static io.clientcore.http.netty4.implementation.Netty4Utility.createCodec;
+import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.ALPN;
+import static io.clientcore.http.netty4.implementation.Netty4Utility.configureHttpsPipeline;
import static io.clientcore.http.netty4.implementation.Netty4Utility.sendHttp11Request;
+import static io.clientcore.http.netty4.implementation.Netty4Utility.sendHttp2Request;
import static io.clientcore.http.netty4.implementation.Netty4Utility.setOrSuppressError;
/**
@@ -29,7 +25,15 @@
* either HTTP/1.1 or HTTP/2 based on the result of negotiation.
*/
public final class Netty4AlpnHandler extends ApplicationProtocolNegotiationHandler {
- private static final int TWO_FIFTY_SIX_KB = 256 * 1024;
+
+ /**
+ * An Attribute key for the channel storing the HTTP protocol version that was negotiated.
+ * This information will be used in case the same channel is reused in the future, so we can
+ * adjust the correct handlers because there's no need for ALPN to run again.
+ */
+ public static final AttributeKey HTTP_PROTOCOL_VERSION_KEY
+ = AttributeKey.valueOf("http-protocol-version");
+
private final HttpRequest request;
private final AtomicReference responseReference;
private final AtomicReference errorReference;
@@ -38,9 +42,9 @@ public final class Netty4AlpnHandler extends ApplicationProtocolNegotiationHandl
/**
* Creates a new instance of {@link Netty4AlpnHandler} with a fallback to using HTTP/1.1.
*
- * @param request The request to send once ALPN negotiation completes.
+ * @param request The request to send once ALPN negotiation completes.
* @param errorReference An AtomicReference keeping track of errors during the request lifecycle.
- * @param latch A CountDownLatch that will be released once the request completes.
+ * @param latch A CountDownLatch that will be released once the request completes.
*/
public Netty4AlpnHandler(HttpRequest request, AtomicReference responseReference,
AtomicReference errorReference, CountDownLatch latch) {
@@ -58,78 +62,37 @@ public boolean isSharable() {
@Override
protected void configurePipeline(ChannelHandlerContext ctx, String protocol) {
+ HttpProtocolVersion protocolVersion;
if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
- // TODO (alzimmer): InboundHttp2ToHttpAdapter buffers the entire response into a FullHttpResponse. Need to
- // create a streaming version of this to support huge response payloads.
- Http2Connection http2Connection = new DefaultHttp2Connection(false);
- Http2Settings settings = new Http2Settings().headerTableSize(4096)
- .maxHeaderListSize(TWO_FIFTY_SIX_KB)
- .pushEnabled(false)
- .initialWindowSize(TWO_FIFTY_SIX_KB);
- Http2FrameListener frameListener = new DelegatingDecompressorFrameListener(http2Connection,
- new InboundHttp2ToHttpAdapterBuilder(http2Connection).maxContentLength(Integer.MAX_VALUE)
- .propagateSettings(true)
- .validateHttpHeaders(true)
- .build(),
- 0);
+ protocolVersion = HttpProtocolVersion.HTTP_2;
+ } else if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) {
+ protocolVersion = HttpProtocolVersion.HTTP_1_1;
+ } else {
+ ctx.fireExceptionCaught(new IllegalStateException("unknown protocol: " + protocol));
+ return;
+ }
- HttpToHttp2ConnectionHandler connectionHandler
- = new HttpToHttp2ConnectionHandlerBuilder().initialSettings(settings)
- .frameListener(frameListener)
- .connection(http2Connection)
- .validateHeaders(true)
- .build();
+ ctx.channel().attr(HTTP_PROTOCOL_VERSION_KEY).set(protocolVersion);
- if (ctx.pipeline().get(Netty4HandlerNames.PROGRESS_AND_TIMEOUT) != null) {
- ctx.pipeline()
- .addAfter(Netty4HandlerNames.PROGRESS_AND_TIMEOUT, Netty4HandlerNames.HTTP_RESPONSE,
- new Netty4ResponseHandler(request, responseReference, errorReference, latch));
- ctx.pipeline()
- .addBefore(Netty4HandlerNames.PROGRESS_AND_TIMEOUT, Netty4HandlerNames.HTTP_CODEC,
- connectionHandler);
- } else {
- ctx.pipeline().addAfter(Netty4HandlerNames.SSL, Netty4HandlerNames.HTTP_CODEC, connectionHandler);
- ctx.pipeline()
- .addAfter(Netty4HandlerNames.HTTP_CODEC, Netty4HandlerNames.HTTP_RESPONSE,
- new Netty4ResponseHandler(request, responseReference, errorReference, latch));
- }
+ configureHttpsPipeline(ctx.pipeline(), request, protocolVersion, responseReference, errorReference, latch);
+ if (protocolVersion == HttpProtocolVersion.HTTP_2) {
+ sendHttp2Request(request, ctx.channel(), errorReference, latch);
+ } else {
sendHttp11Request(request, ctx.channel(), errorReference)
.addListener((ChannelFutureListener) sendListener -> {
if (!sendListener.isSuccess()) {
setOrSuppressError(errorReference, sendListener.cause());
- sendListener.channel().close();
+ sendListener.channel().pipeline().fireExceptionCaught(sendListener.cause());
latch.countDown();
} else {
sendListener.channel().read();
}
});
- } else if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) {
- if (ctx.pipeline().get(Netty4HandlerNames.PROGRESS_AND_TIMEOUT) != null) {
- ctx.pipeline()
- .addAfter(Netty4HandlerNames.PROGRESS_AND_TIMEOUT, Netty4HandlerNames.HTTP_RESPONSE,
- new Netty4ResponseHandler(request, responseReference, errorReference, latch));
- ctx.pipeline()
- .addBefore(Netty4HandlerNames.PROGRESS_AND_TIMEOUT, Netty4HandlerNames.HTTP_CODEC, createCodec());
- } else {
- ctx.pipeline().addAfter(Netty4HandlerNames.SSL, Netty4HandlerNames.HTTP_CODEC, createCodec());
- ctx.pipeline()
- .addAfter(Netty4HandlerNames.HTTP_CODEC, Netty4HandlerNames.HTTP_RESPONSE,
- new Netty4ResponseHandler(request, responseReference, errorReference, latch));
- }
+ }
- sendHttp11Request(request, ctx.channel(), errorReference)
- .addListener((ChannelFutureListener) sendListener -> {
- if (!sendListener.isSuccess()) {
- setOrSuppressError(errorReference, sendListener.cause());
- sendListener.channel().close();
- latch.countDown();
- } else {
- sendListener.channel().read();
- }
- });
- } else {
- throw new IllegalStateException("unknown protocol: " + protocol);
+ if (ctx.pipeline().get(ALPN) != null) {
+ ctx.pipeline().remove(this);
}
}
}
diff --git a/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/implementation/Netty4ChannelBinaryData.java b/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/implementation/Netty4ChannelBinaryData.java
index 837f4d164b1b..f1c8500dbb39 100644
--- a/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/implementation/Netty4ChannelBinaryData.java
+++ b/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/implementation/Netty4ChannelBinaryData.java
@@ -21,6 +21,7 @@
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
import static io.clientcore.http.netty4.implementation.Netty4Utility.awaitLatch;
@@ -36,6 +37,11 @@ public final class Netty4ChannelBinaryData extends BinaryData {
private final Channel channel;
private final Long length;
private final boolean isHttp2;
+ private final AtomicBoolean streamDrained = new AtomicBoolean(false);
+ private final CountDownLatch drainLatch = new CountDownLatch(1);
+ // Manages the "closed" state, ensuring cleanup happens only once.
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final Runnable onClose;
// Non-final to allow nulling out after use.
private ByteArrayOutputStream eagerContent;
@@ -49,12 +55,23 @@ public final class Netty4ChannelBinaryData extends BinaryData {
* @param channel The Netty {@link Channel}.
* @param length Size of the response body (if known).
* @param isHttp2 Flag indicating whether the handler is used for HTTP/2 or not.
+ * @param onClose The Runnable to run when the {@code close()} method is triggered.
*/
+ public Netty4ChannelBinaryData(ByteArrayOutputStream eagerContent, Channel channel, Long length, boolean isHttp2,
+ Runnable onClose) {
+ this.eagerContent = eagerContent;
+ this.channel = channel;
+ this.length = length;
+ this.isHttp2 = isHttp2;
+ this.onClose = onClose;
+ }
+
public Netty4ChannelBinaryData(ByteArrayOutputStream eagerContent, Channel channel, Long length, boolean isHttp2) {
this.eagerContent = eagerContent;
this.channel = channel;
this.length = length;
this.isHttp2 = isHttp2;
+ this.onClose = null;
}
@Override
@@ -64,27 +81,8 @@ public byte[] toBytes() {
}
if (bytes == null) {
- CountDownLatch latch = new CountDownLatch(1);
- Netty4EagerConsumeChannelHandler handler = new Netty4EagerConsumeChannelHandler(latch,
- buf -> buf.readBytes(eagerContent, buf.readableBytes()), isHttp2);
- channel.pipeline().addLast(Netty4HandlerNames.EAGER_CONSUME, handler);
- channel.config().setAutoRead(true);
-
- awaitLatch(latch);
-
- Throwable exception = handler.channelException();
- if (exception != null) {
- if (exception instanceof Error) {
- throw (Error) exception;
- } else {
- throw CoreException.from(exception);
- }
- } else {
- bytes = eagerContent.toByteArray();
- }
- eagerContent = null;
+ drainStream();
}
-
return bytes;
}
@@ -105,7 +103,7 @@ public T toObject(Type type, ObjectSerializer serializer) {
@Override
public InputStream toStream() {
if (bytes == null) {
- return new Netty4ChannelInputStream(eagerContent, channel, isHttp2);
+ return new Netty4ChannelInputStream(eagerContent, channel, isHttp2, this::close);
} else {
return new ByteArrayInputStream(bytes);
}
@@ -124,36 +122,51 @@ public void writeTo(JsonWriter jsonWriter) {
@Override
public void writeTo(OutputStream outputStream) {
+ Objects.requireNonNull(outputStream, "'outputStream' cannot be null.");
+
try {
- if (bytes == null) {
- // Channel hasn't been read yet, don't buffer it, just write it to the OutputStream as it's being read.
- if (eagerContent.size() > 0) {
- outputStream.write(eagerContent.toByteArray());
- }
+ if (bytes != null) {
+ outputStream.write(bytes);
+ return;
+ }
- CountDownLatch latch = new CountDownLatch(1);
- Netty4EagerConsumeChannelHandler handler = new Netty4EagerConsumeChannelHandler(latch,
- buf -> buf.readBytes(outputStream, buf.readableBytes()), isHttp2);
- channel.pipeline().addLast(Netty4HandlerNames.EAGER_CONSUME, handler);
- channel.config().setAutoRead(true);
+ if (streamDrained.compareAndSet(false, true)) {
+ try {
+ // Channel hasn't been read yet, don't buffer it, just write it to the OutputStream as it's being read.
+ if (eagerContent != null && eagerContent.size() > 0) {
+ eagerContent.writeTo(outputStream);
+ }
- awaitLatch(latch);
+ CountDownLatch latch = new CountDownLatch(1);
+ Netty4EagerConsumeChannelHandler handler = new Netty4EagerConsumeChannelHandler(latch,
+ buf -> buf.readBytes(outputStream, buf.readableBytes()), isHttp2);
+ channel.pipeline().addLast(Netty4HandlerNames.EAGER_CONSUME, handler);
+ channel.config().setAutoRead(true);
- Throwable exception = handler.channelException();
- if (exception != null) {
- if (exception instanceof Error) {
- throw (Error) exception;
- } else {
- throw CoreException.from(exception);
+ awaitLatch(latch);
+
+ Throwable exception = handler.channelException();
+ if (exception != null) {
+ if (exception instanceof Error) {
+ throw (Error) exception;
+ } else {
+ throw CoreException.from(exception);
+ }
+ }
+ } finally {
+ eagerContent = null;
+ drainLatch.countDown();
+
+ if (onClose != null) {
+ onClose.run();
}
}
- eagerContent = null;
} else {
- // Already converted the Channel to a byte[], use it.
- outputStream.write(bytes);
+ throw LOGGER.throwableAtError()
+ .log("The stream has already been consumed and is not replayable.", IllegalStateException::new);
}
- } catch (IOException ex) {
- throw LOGGER.throwableAtError().log(ex, CoreException::from);
+ } catch (IOException e) {
+ throw CoreException.from(e);
}
}
@@ -182,10 +195,96 @@ public BinaryData toReplayableBinaryData() {
return BinaryData.fromBytes(toBytes());
}
+ /**
+ * Ensures the underlying network stream is fully consumed but does not close the channel,
+ * allowing it to be reused by the connection pool.
+ */
@Override
public void close() {
- eagerContent = null;
- channel.disconnect();
- channel.close();
+ if (closed.compareAndSet(false, true)) {
+ // If draining hasn't started, it means the stream was not consumed.
+ // We need to drain it to ensure the connection can be safely reused.
+ if (!streamDrained.get()) {
+ drainAndCleanupAsync();
+ } else {
+ if (onClose != null) {
+ onClose.run();
+ }
+ }
+ }
+ }
+
+ private void drainAndCleanupAsync() {
+ if (streamDrained.compareAndSet(false, true)) {
+ if (!channel.isActive()) {
+ if (onClose != null) {
+ onClose.run();
+ }
+ drainLatch.countDown();
+ return;
+ }
+
+ Netty4EagerConsumeChannelHandler handler = new Netty4EagerConsumeChannelHandler(() -> {
+ if (onClose != null) {
+ onClose.run();
+ }
+ drainLatch.countDown();
+ }, isHttp2);
+
+ channel.pipeline().addLast(Netty4HandlerNames.EAGER_CONSUME, handler);
+ channel.config().setAutoRead(true);
+ } else {
+ awaitLatch(drainLatch);
+ }
+ }
+
+ private void drainStream() {
+ if (streamDrained.compareAndSet(false, true)) {
+ try {
+ if (length != null && eagerContent != null && eagerContent.size() >= length) {
+ bytes = eagerContent.toByteArray();
+ return;
+ }
+
+ if (!channel.isActive()) {
+ // The connection was closed before we could read the full body.
+ // Check if, by chance, the eager content we already have satisfies the full length.
+ // This can happen if the body was very small and the server closed immediately.
+ if (length != null && eagerContent != null && length == eagerContent.size()) {
+ bytes = eagerContent.toByteArray();
+ return;
+ }
+ throw LOGGER.throwableAtError()
+ .log("Connection closed prematurely while reading response body.", IOException::new);
+ }
+
+ CountDownLatch ioLatch = new CountDownLatch(1);
+ Netty4EagerConsumeChannelHandler handler = new Netty4EagerConsumeChannelHandler(ioLatch,
+ buf -> buf.readBytes(eagerContent, buf.readableBytes()), isHttp2);
+
+ channel.pipeline().addLast(Netty4HandlerNames.EAGER_CONSUME, handler);
+ channel.config().setAutoRead(true);
+
+ awaitLatch(ioLatch);
+ Throwable exception = handler.channelException();
+
+ if (exception != null) {
+ if (exception instanceof Error) {
+ throw (Error) exception;
+ } else {
+ throw CoreException.from(exception);
+ }
+ } else {
+ bytes = eagerContent.toByteArray();
+ }
+ } catch (IOException e) {
+ throw LOGGER.throwableAtError().log(e, CoreException::from);
+ } finally {
+ eagerContent = null;
+ drainLatch.countDown();
+ }
+ } else {
+ awaitLatch(drainLatch);
+ }
}
}
diff --git a/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/implementation/Netty4ChannelInputStream.java b/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/implementation/Netty4ChannelInputStream.java
index 418df7945248..de5d4c96c0af 100644
--- a/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/implementation/Netty4ChannelInputStream.java
+++ b/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/implementation/Netty4ChannelInputStream.java
@@ -7,7 +7,8 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.util.LinkedList;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
/**
@@ -16,6 +17,7 @@
public final class Netty4ChannelInputStream extends InputStream {
private final Channel channel;
private final boolean isHttp2;
+ private final Runnable onClose;
// Indicator for the Channel being fully read.
// This will become true before 'streamDone' becomes true, but both may become true in the same operation.
@@ -27,9 +29,9 @@ public final class Netty4ChannelInputStream extends InputStream {
// Once this is true, the stream will never return data again.
private boolean streamDone = false;
- // Linked list of byte[]s that maintains the last available contents from the Channel / eager content.
- // A list is needed as each Channel.read() may result in many channelRead calls.
- private final LinkedList additionalBuffers;
+ // Queue of byte[]s that maintains the last available contents from the Channel / eager content.
+ // A queue is needed as each Channel.read() may result in many channelRead calls.
+ private final Queue additionalBuffers;
private byte[] currentBuffer;
@@ -46,20 +48,23 @@ public final class Netty4ChannelInputStream extends InputStream {
* status line and response headers.
* @param channel The {@link Channel} to read from.
* @param isHttp2 Flag indicating whether the Channel is used for HTTP/2 or not.
+ * @param onClose A runnable to execute when the stream is closed.
*/
- Netty4ChannelInputStream(ByteArrayOutputStream eagerContent, Channel channel, boolean isHttp2) {
+ Netty4ChannelInputStream(ByteArrayOutputStream eagerContent, Channel channel, boolean isHttp2, Runnable onClose) {
if (eagerContent != null && eagerContent.size() > 0) {
this.currentBuffer = eagerContent.toByteArray();
+ eagerContent.reset();
} else {
this.currentBuffer = new byte[0];
}
this.readIndex = 0;
- this.additionalBuffers = new LinkedList<>();
+ this.additionalBuffers = new ConcurrentLinkedQueue<>();
this.channel = channel;
if (channel.pipeline().get(Netty4InitiateOneReadHandler.class) != null) {
channel.pipeline().remove(Netty4InitiateOneReadHandler.class);
}
this.isHttp2 = isHttp2;
+ this.onClose = onClose;
}
byte[] getCurrentBuffer() {
@@ -167,19 +172,28 @@ public long skip(long n) throws IOException {
return n - toSkip;
}
+ /**
+ * Closes this input stream and ensures the underlying connection can be returned to the pool.
+ * This method does not close the underlying channel. Instead, it triggers the onClose
+ * callback which is responsible for draining the rest of the stream content.
+ */
@Override
- public void close() {
- currentBuffer = null;
- additionalBuffers.clear();
- if (channel.isOpen() || channel.isActive()) {
- channel.disconnect();
- channel.close();
+ public void close() throws IOException {
+ try {
+ if (onClose != null) {
+ onClose.run();
+ }
+ } finally {
+ super.close();
+ currentBuffer = null;
+ additionalBuffers.clear();
+ streamDone = true;
}
}
private boolean setupNextBuffer() throws IOException {
if (!additionalBuffers.isEmpty()) {
- currentBuffer = additionalBuffers.pop();
+ currentBuffer = additionalBuffers.poll();
readIndex = 0;
return true;
} else if (readMore()) {
@@ -214,7 +228,7 @@ private boolean readMore() throws IOException {
byte[] buffer = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(buffer);
- additionalBuffers.add(buffer);
+ additionalBuffers.offer(buffer);
}, isHttp2);
channel.pipeline().addLast(Netty4HandlerNames.READ_ONE, handler);
}
@@ -242,7 +256,7 @@ private boolean readMore() throws IOException {
}
if (!additionalBuffers.isEmpty()) {
- currentBuffer = additionalBuffers.pop();
+ currentBuffer = additionalBuffers.poll();
readIndex = 0;
} else if (channelDone) { // Don't listen to IntelliJ here, channelDone may be false.
// This read contained no data and the channel completed, therefore the stream is also completed.
diff --git a/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/implementation/Netty4ConnectionPool.java b/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/implementation/Netty4ConnectionPool.java
new file mode 100644
index 000000000000..f5645652f5bb
--- /dev/null
+++ b/sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/implementation/Netty4ConnectionPool.java
@@ -0,0 +1,709 @@
+// Copyright (c) Microsoft Corporation. All rights reserved.
+// Licensed under the MIT License.
+
+package io.clientcore.http.netty4.implementation;
+
+import io.clientcore.core.http.client.HttpProtocolVersion;
+import io.clientcore.core.instrumentation.logging.ClientLogger;
+import io.clientcore.core.models.CoreException;
+import io.clientcore.core.utils.AuthenticateChallenge;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.socket.ChannelInputShutdownEvent;
+import io.netty.handler.codec.http2.Http2GoAwayFrame;
+import io.netty.handler.proxy.HttpProxyHandler;
+import io.netty.handler.proxy.ProxyHandler;
+import io.netty.handler.ssl.ApplicationProtocolNames;
+import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
+import io.netty.handler.ssl.SslCloseCompletionEvent;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.util.AttributeKey;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.Promise;
+import io.netty.util.concurrent.ScheduledFuture;
+
+import javax.net.ssl.SSLException;
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.time.Duration;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Consumer;
+
+import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.CONNECTION_POOL_ALPN;
+import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.POOL_CONNECTION_HEALTH;
+import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.PROXY;
+import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.PROXY_EXCEPTION_WARNING_SUPPRESSION;
+import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.SSL;
+import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.SSL_GRACEFUL_SHUTDOWN;
+import static io.clientcore.http.netty4.implementation.Netty4Utility.buildSslContext;
+
+/**
+ * A thread-safe pool of Netty {@link Channel}s that are reused for requests to the same route.
+ *
+ * This connection pool manages the entire connection lifecycle, including TCP connected, proxy handshakes, and the
+ * asynchronous SSL/ALPN negotiation. It is designed to return fully configured channels that are
+ * ready for immediate use, thereby eliminating per-request handshake latency.
+ */
+public class Netty4ConnectionPool implements Closeable {
+
+ /**
+ * An AttributeKey referring to a channel-specific {@link ReentrantLock}.
+ *
+ * This lock is used to ensure that the setup and cleanup of a channel's pipeline are atomic operations.
+ * It protects against race conditions where a channel might be acquired from the pool and configured for a new
+ * request before the cleanup from the previous request has fully completed. Each channel gets its own unique
+ * lock instance, making the lock contention extremely low.
+ */
+ public static final AttributeKey CHANNEL_LOCK = AttributeKey.valueOf("channel-lock");
+
+ /**
+ * A unique token created for each request to identify the current owner of a channel pipeline.
+ *
+ * It protects against stale cleanup handlers from previous, timed-out, or failed requests,
+ * ensuring that only the {@link Netty4PipelineCleanupHandler} that belongs to the current,
+ * active request is allowed to modify the pipeline.
+ */
+ public static final AttributeKey