Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
1ddd05d
Http-netty4 connection pool implementation
georgebanasios Jun 28, 2025
a379a00
Merge remote-tracking branch 'origin/main' into connection-pool-http-…
georgebanasios Jun 29, 2025
784ade2
fix duplicate http response handler in the pipeline
georgebanasios Jun 29, 2025
7edd7a9
apply ALPN after the correct handler
georgebanasios Jun 29, 2025
ce50c9e
fix formatting
georgebanasios Jun 30, 2025
6b73bb3
fix Netty4ChannelBinaryData
georgebanasios Jun 30, 2025
c8693a1
fix connection pool release race condition
georgebanasios Jun 30, 2025
e1525d8
adjust chunked request
georgebanasios Jun 30, 2025
f26b6c3
Revert "adjust chunked request"
georgebanasios Jun 30, 2025
1602d4b
fix proxy logic when acquiring a channel
georgebanasios Jul 1, 2025
989b302
cleanup the pipeline after the last content
georgebanasios Jul 1, 2025
f0a7847
create channel wrapper
georgebanasios Jul 1, 2025
c8c648b
remove channel closure from response handler
georgebanasios Jul 1, 2025
8dea326
try and fix the release deadlock
georgebanasios Jul 1, 2025
e076602
handle case where read handlers are added to a closed channel
georgebanasios Jul 2, 2025
8de78bd
remove synchronized
georgebanasios Jul 2, 2025
ed27af7
apply ALPN only on new connections
georgebanasios Jul 2, 2025
51fa938
try and fix deadlock
georgebanasios Jul 2, 2025
f3eee88
Revert "try and fix deadlock"
georgebanasios Jul 2, 2025
61c273b
deadlock fix v2
georgebanasios Jul 3, 2025
40b929a
Revert "deadlock fix v2"
georgebanasios Jul 3, 2025
792a06e
deadlock fix v3
georgebanasios Jul 3, 2025
56bbb97
Revert "deadlock fix v3"
georgebanasios Jul 3, 2025
f849ec8
deadlock fix attempt v4
georgebanasios Jul 4, 2025
140a095
fix on input stream close method
georgebanasios Jul 4, 2025
c0f9beb
deadlock fix attempt v5
georgebanasios Jul 4, 2025
a2d0bcd
Revert "deadlock fix attempt v5"
georgebanasios Jul 4, 2025
d66fd1d
deadlock fix attempt v6
georgebanasios Jul 4, 2025
42f72c7
make method more efficient
georgebanasios Jul 5, 2025
4606fe4
fix response handler tests
georgebanasios Jul 5, 2025
6fdf2e7
make close method of netty binary data non blocking
georgebanasios Jul 5, 2025
623503c
cleanup connection on netty binary data
georgebanasios Jul 6, 2025
f085ece
add exception handling on cleanup handler
georgebanasios Jul 6, 2025
d66d154
Netty4ChannelBinaryData adjustments
georgebanasios Jul 6, 2025
aa1fba9
try to not block on send method
georgebanasios Jul 7, 2025
720b395
disable connection pooling functionality
georgebanasios Jul 8, 2025
670687f
adjust cleanups
georgebanasios Jul 9, 2025
c0022f4
use event for pipeline cleanup
georgebanasios Jul 9, 2025
7b3c8fe
attempt to synchronize the pipeline modification and cleaup for the s…
georgebanasios Jul 10, 2025
1a28409
self removal of handlers
georgebanasios Jul 10, 2025
ed9b857
attempt to fix race condition
georgebanasios Jul 10, 2025
fec1ffc
close clients after tests
georgebanasios Jul 13, 2025
ca36245
Revert "close clients after tests"
georgebanasios Jul 13, 2025
437616b
http2 connecetions
georgebanasios Jul 13, 2025
40ef6ee
synchronize connection pool
georgebanasios Jul 13, 2025
59f9240
Revert "synchronize connection pool"
georgebanasios Jul 14, 2025
9e65d78
check connection
georgebanasios Jul 16, 2025
84be996
Revert "check connection"
georgebanasios Jul 16, 2025
e092c9c
try to centralize waiting queue logic
georgebanasios Jul 16, 2025
df003e0
Revert "try to centralize waiting queue logic"
georgebanasios Jul 16, 2025
dbf1c69
enable half closure
georgebanasios Jul 16, 2025
9a3477b
add lock on acquire/release
georgebanasios Jul 16, 2025
6855d45
documentation fix & pr comments
georgebanasios Jul 19, 2025
814285d
fix ssl/alpn on pooled connections
georgebanasios Jul 21, 2025
91f72e0
Merge remote-tracking branch 'origin/main' into connection-pool-http-…
georgebanasios Jul 22, 2025
879c638
remove duplicate line & javadoc fix
georgebanasios Jul 22, 2025
d4694d8
add javadoc on netty provider
georgebanasios Jul 23, 2025
61be579
add HTTP client tests for pooled connections
georgebanasios Jul 23, 2025
5551d47
fix builder logging configuration logic
georgebanasios Jul 23, 2025
c64019a
fix merge conflicts
georgebanasios Aug 1, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions sdk/clientcore/http-netty4/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,12 @@
<version>2.5.2</version> <!-- {x-version-update;org.conscrypt:conscrypt-openjdk-uber;external_dependency} -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>4.11.0</version> <!-- {x-version-update;org.mockito:mockito-core;external_dependency} -->
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
15 changes: 15 additions & 0 deletions sdk/clientcore/http-netty4/spotbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
<Class name="io.clientcore.http.netty4.implementation.Netty4ChannelBinaryData" />
<Class name="io.clientcore.http.netty4.implementation.Netty4HttpProxyHandler" />
<Class name="io.clientcore.http.netty4.implementation.Netty4StreamingHttp2Adapter" />
<Class name="io.clientcore.http.netty4.implementation.Netty4ConnectionPool" />
</Or>
</Match>
<Match>
Expand Down Expand Up @@ -61,4 +62,18 @@
<Bug pattern="VO_VOLATILE_REFERENCE_TO_ARRAY" />
<Class name="io.clientcore.http.netty4.implementation.Netty4ChannelBinaryData" />
</Match>
<Match>
<Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
<Or>
<Class name="io.clientcore.http.netty4.implementation.Netty4ChannelInputStream" />
<Class name="io.clientcore.http.netty4.implementation.Netty4ConnectionPoolTests" />
</Or>
</Match>
<Match>
<Bug pattern="DP_DO_INSIDE_DO_PRIVILEGED" />
<Or>
<Class name="io.clientcore.http.netty4.NettyHttpClientBuilderTests" />
<Class name="io.clientcore.http.netty4.implementation.Netty4ConnectionPoolTests" />
</Or>
</Match>
</FindBugsFilter>

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import io.clientcore.core.http.client.HttpProtocolVersion;
import io.clientcore.core.http.models.ProxyOptions;
import io.clientcore.core.instrumentation.logging.ClientLogger;
import io.clientcore.core.instrumentation.logging.LoggingEvent;
import io.clientcore.core.utils.configuration.Configuration;
import io.clientcore.http.netty4.implementation.ChannelInitializationProxyHandler;
import io.clientcore.http.netty4.implementation.Netty4ConnectionPool;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelOption;
Expand Down Expand Up @@ -135,6 +137,13 @@ private static Class<? extends SocketChannel> getChannelClass(String className)
private Duration writeTimeout;
private HttpProtocolVersion maximumHttpVersion = HttpProtocolVersion.HTTP_2;

// --- Connection Pool Configuration ---
private int connectionPoolSize = 1000;
private Duration connectionIdleTimeout = Duration.ofSeconds(60);
private Duration maxConnectionLifetime;
private Duration pendingAcquireTimeout = Duration.ofSeconds(60); // Default wait time for a connection
private int maxPendingAcquires = 10_000; // Default pending queue size

/**
* Creates a new instance of {@link NettyHttpClientBuilder}.
*/
Expand Down Expand Up @@ -281,6 +290,86 @@ public NettyHttpClientBuilder maximumHttpVersion(HttpProtocolVersion httpVersion
return this;
}

/**
* Sets the maximum number of connections allowed per remote address in the connection pool.
* <p>
* If not set, a default value of 1000 is used.
* <p>
* <strong>A value of {@code 0} or less will disable connection pooling, and hence each request will
* get a newly created connection.</strong>
*
* @param connectionPoolSize The maximum number of connections.
* @return The updated builder.
*/
public NettyHttpClientBuilder connectionPoolSize(int connectionPoolSize) {
this.connectionPoolSize = connectionPoolSize;
return this;
}

/**
* Sets the maximum time a connection can remain idle in the pool before it is closed and removed.
* <p>
* If not set, a default value of 60 seconds is used.
* <p>
* A {@link Duration} of zero or less will make the connections never expire. <strong>Note:</strong> While this is
* provided as an option, it is <strong>not recommended for most use cases</strong>, as it can lead to
* request failures if network intermediaries (like load balancers or firewalls) silently drop idle
* connections.
*
* @param connectionIdleTimeout The idle timeout duration.
* @return The updated builder.
*/
public NettyHttpClientBuilder connectionIdleTimeout(Duration connectionIdleTimeout) {
this.connectionIdleTimeout = connectionIdleTimeout;
return this;
}

/**
* Sets the maximum time a connection is allowed to exist.
* <p>
* By default, connections have no lifetime limit and can be used indefinitely.
* <p>
* After this time is met or exceeded, the connection will be closed upon release. A {@link Duration} of zero or
* less, or a null value, will also result in connections having no lifetime limit.
*
* @param maxConnectionLifetime The maximum connection lifetime.
* @return The updated builder.
*/
public NettyHttpClientBuilder maxConnectionLifetime(Duration maxConnectionLifetime) {
this.maxConnectionLifetime = maxConnectionLifetime;
return this;
}

/**
* Sets the maximum time to wait for a connection from the pool.
* <p>
* If not set, a default value of 60 seconds is used.
* @param pendingAcquireTimeout The timeout for pending acquires.
* @return The updated builder.
*/
public NettyHttpClientBuilder pendingAcquireTimeout(Duration pendingAcquireTimeout) {
this.pendingAcquireTimeout = pendingAcquireTimeout;
return this;
}

/**
* Sets the maximum number of requests that can be queued waiting for a connection.
* <p>
* This limit is applied on a per-route (per-host) basis.
* If not set, a default value of 10_000 is used.
*
* @param maxPendingAcquires The maximum number of pending acquires.
* @return The updated builder.
*/
public NettyHttpClientBuilder maxPendingAcquires(int maxPendingAcquires) {
if (maxPendingAcquires <= 0) {
throw LOGGER.throwableAtError()
.log("maxPendingAcquires must be greater than 0", IllegalArgumentException::new);
}
this.maxPendingAcquires = maxPendingAcquires;
return this;
}

/**
* Builds the NettyHttpClient.
*
Expand All @@ -293,28 +382,51 @@ public HttpClient build() {
= getChannelClass(this.channelClass, group.getClass(), IS_EPOLL_AVAILABLE, IS_KQUEUE_AVAILABLE);

// Leave breadcrumbs about the NettyHttpClient configuration, in case troubleshooting is needed.
LOGGER.atVerbose()
LoggingEvent loggingEvent = LOGGER.atVerbose()
.addKeyValue("customEventLoopGroup", eventLoopGroup != null)
.addKeyValue("eventLoopGroupClass", group.getClass())
.addKeyValue("customChannelClass", this.channelClass != null)
.addKeyValue("channelClass", channelClass)
.log("NettyHttpClient was built with these configurations.");
.addKeyValue("channelClass", channelClass);

if (connectionPoolSize > 0) {
loggingEvent.addKeyValue("connectionPoolSize", this.connectionPoolSize)
.addKeyValue("connectionIdleTimeout", this.connectionIdleTimeout)
.addKeyValue("maxConnectionLifetime", this.maxConnectionLifetime)
.addKeyValue("pendingAcquireTimeout", this.pendingAcquireTimeout)
.addKeyValue("maxPendingAcquires", this.maxPendingAcquires);
}

loggingEvent.log("NettyHttpClient was built with these configurations.");

Bootstrap bootstrap = new Bootstrap().group(group)
.channel(channelClass)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) getTimeoutMillis(connectTimeout, 10_000));
// Disable auto-read as we want to control when and how data is read from the channel.
bootstrap.option(ChannelOption.AUTO_READ, false);
// Enable TCP keep-alive to proactively detect and clean up stale connections in the pool. This helps evict
// connections that have been silently dropped by network intermediaries.
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
// Allow the channel to remain open for writing even after the server has closed its sending side.
// This helps detect half-closures with a ChannelInputShutdownEvent in the PoolConnectionHealthHandler.
bootstrap.option(ChannelOption.ALLOW_HALF_CLOSURE, true);

Configuration buildConfiguration
= (configuration == null) ? Configuration.getGlobalConfiguration() : configuration;

ProxyOptions buildProxyOptions
= (proxyOptions == null) ? ProxyOptions.fromConfiguration(buildConfiguration, true) : proxyOptions;

return new NettyHttpClient(bootstrap, sslContextModifier, maximumHttpVersion,
new ChannelInitializationProxyHandler(buildProxyOptions), getTimeoutMillis(readTimeout),
getTimeoutMillis(responseTimeout), getTimeoutMillis(writeTimeout));
Netty4ConnectionPool connectionPool = null;
if (connectionPoolSize > 0) {
connectionPool
= new Netty4ConnectionPool(bootstrap, new ChannelInitializationProxyHandler(buildProxyOptions),
sslContextModifier, connectionPoolSize, connectionIdleTimeout, maxConnectionLifetime,
pendingAcquireTimeout, maxPendingAcquires, maximumHttpVersion);
}

return new NettyHttpClient(bootstrap, group, connectionPool, buildProxyOptions,
new ChannelInitializationProxyHandler(buildProxyOptions), sslContextModifier, maximumHttpVersion,
getTimeoutMillis(readTimeout), getTimeoutMillis(responseTimeout), getTimeoutMillis(writeTimeout));
}

static long getTimeoutMillis(Duration duration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,20 @@ public HttpClient getHttpClient() {
public NettyHttpClientProvider() {
}

/**
* Creates a new {@link HttpClient} instance with a default, shared connection pool.
* <p>
* For more advanced customization, such as disabling pooling entirely, use the {@link NettyHttpClientBuilder}.
* <p>
* <b>Example: Creating a client without a connection pool</b>
* <pre>{@code
* HttpClient client = new NettyHttpClientBuilder()
* .connectionPoolSize(0)
* .build();
* }</pre>
*
* @return A new {@link HttpClient} instance.
*/
@Override
public HttpClient getNewInstance() {
return new NettyHttpClientBuilder().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public boolean test(SocketAddress socketAddress) {
InetSocketAddress inetSocketAddress = (InetSocketAddress) socketAddress;
String hostString = inetSocketAddress.getHostString();

return hostString != null && nonProxyHostsPattern.matcher(hostString).matches();
return hostString != null && !nonProxyHostsPattern.matcher(hostString).matches();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,38 @@
// Licensed under the MIT License.
package io.clientcore.http.netty4.implementation;

import io.clientcore.core.http.client.HttpProtocolVersion;
import io.clientcore.core.http.models.HttpRequest;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.http2.DefaultHttp2Connection;
import io.netty.handler.codec.http2.DelegatingDecompressorFrameListener;
import io.netty.handler.codec.http2.Http2Connection;
import io.netty.handler.codec.http2.Http2FrameListener;
import io.netty.handler.codec.http2.Http2Settings;
import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandler;
import io.netty.handler.codec.http2.HttpToHttp2ConnectionHandlerBuilder;
import io.netty.handler.codec.http2.InboundHttp2ToHttpAdapterBuilder;
import io.netty.handler.ssl.ApplicationProtocolNames;
import io.netty.handler.ssl.ApplicationProtocolNegotiationHandler;
import io.netty.util.AttributeKey;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import static io.clientcore.http.netty4.implementation.Netty4Utility.createCodec;
import static io.clientcore.http.netty4.implementation.Netty4HandlerNames.ALPN;
import static io.clientcore.http.netty4.implementation.Netty4Utility.configureHttpsPipeline;
import static io.clientcore.http.netty4.implementation.Netty4Utility.sendHttp11Request;
import static io.clientcore.http.netty4.implementation.Netty4Utility.sendHttp2Request;
import static io.clientcore.http.netty4.implementation.Netty4Utility.setOrSuppressError;

/**
* Handler that deals with application protocol negotiation (ALPN) and configures the {@link ChannelPipeline} to use
* either HTTP/1.1 or HTTP/2 based on the result of negotiation.
*/
public final class Netty4AlpnHandler extends ApplicationProtocolNegotiationHandler {
private static final int TWO_FIFTY_SIX_KB = 256 * 1024;

/**
* An Attribute key for the channel storing the HTTP protocol version that was negotiated.
* This information will be used in case the same channel is reused in the future, so we can
* adjust the correct handlers because there's no need for ALPN to run again.
*/
public static final AttributeKey<HttpProtocolVersion> HTTP_PROTOCOL_VERSION_KEY
= AttributeKey.valueOf("http-protocol-version");

private final HttpRequest request;
private final AtomicReference<ResponseStateInfo> responseReference;
private final AtomicReference<Throwable> errorReference;
Expand All @@ -38,9 +42,9 @@ public final class Netty4AlpnHandler extends ApplicationProtocolNegotiationHandl
/**
* Creates a new instance of {@link Netty4AlpnHandler} with a fallback to using HTTP/1.1.
*
* @param request The request to send once ALPN negotiation completes.
* @param request The request to send once ALPN negotiation completes.
* @param errorReference An AtomicReference keeping track of errors during the request lifecycle.
* @param latch A CountDownLatch that will be released once the request completes.
* @param latch A CountDownLatch that will be released once the request completes.
*/
public Netty4AlpnHandler(HttpRequest request, AtomicReference<ResponseStateInfo> responseReference,
AtomicReference<Throwable> errorReference, CountDownLatch latch) {
Expand All @@ -58,78 +62,37 @@ public boolean isSharable() {

@Override
protected void configurePipeline(ChannelHandlerContext ctx, String protocol) {
HttpProtocolVersion protocolVersion;
if (ApplicationProtocolNames.HTTP_2.equals(protocol)) {
// TODO (alzimmer): InboundHttp2ToHttpAdapter buffers the entire response into a FullHttpResponse. Need to
// create a streaming version of this to support huge response payloads.
Http2Connection http2Connection = new DefaultHttp2Connection(false);
Http2Settings settings = new Http2Settings().headerTableSize(4096)
.maxHeaderListSize(TWO_FIFTY_SIX_KB)
.pushEnabled(false)
.initialWindowSize(TWO_FIFTY_SIX_KB);
Http2FrameListener frameListener = new DelegatingDecompressorFrameListener(http2Connection,
new InboundHttp2ToHttpAdapterBuilder(http2Connection).maxContentLength(Integer.MAX_VALUE)
.propagateSettings(true)
.validateHttpHeaders(true)
.build(),
0);
protocolVersion = HttpProtocolVersion.HTTP_2;
} else if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) {
protocolVersion = HttpProtocolVersion.HTTP_1_1;
} else {
ctx.fireExceptionCaught(new IllegalStateException("unknown protocol: " + protocol));
return;
}

HttpToHttp2ConnectionHandler connectionHandler
= new HttpToHttp2ConnectionHandlerBuilder().initialSettings(settings)
.frameListener(frameListener)
.connection(http2Connection)
.validateHeaders(true)
.build();
ctx.channel().attr(HTTP_PROTOCOL_VERSION_KEY).set(protocolVersion);

if (ctx.pipeline().get(Netty4HandlerNames.PROGRESS_AND_TIMEOUT) != null) {
ctx.pipeline()
.addAfter(Netty4HandlerNames.PROGRESS_AND_TIMEOUT, Netty4HandlerNames.HTTP_RESPONSE,
new Netty4ResponseHandler(request, responseReference, errorReference, latch));
ctx.pipeline()
.addBefore(Netty4HandlerNames.PROGRESS_AND_TIMEOUT, Netty4HandlerNames.HTTP_CODEC,
connectionHandler);
} else {
ctx.pipeline().addAfter(Netty4HandlerNames.SSL, Netty4HandlerNames.HTTP_CODEC, connectionHandler);
ctx.pipeline()
.addAfter(Netty4HandlerNames.HTTP_CODEC, Netty4HandlerNames.HTTP_RESPONSE,
new Netty4ResponseHandler(request, responseReference, errorReference, latch));
}
configureHttpsPipeline(ctx.pipeline(), request, protocolVersion, responseReference, errorReference, latch);

if (protocolVersion == HttpProtocolVersion.HTTP_2) {
sendHttp2Request(request, ctx.channel(), errorReference, latch);
} else {
sendHttp11Request(request, ctx.channel(), errorReference)
.addListener((ChannelFutureListener) sendListener -> {
if (!sendListener.isSuccess()) {
setOrSuppressError(errorReference, sendListener.cause());
sendListener.channel().close();
sendListener.channel().pipeline().fireExceptionCaught(sendListener.cause());
latch.countDown();
} else {
sendListener.channel().read();
}
});
} else if (ApplicationProtocolNames.HTTP_1_1.equals(protocol)) {
if (ctx.pipeline().get(Netty4HandlerNames.PROGRESS_AND_TIMEOUT) != null) {
ctx.pipeline()
.addAfter(Netty4HandlerNames.PROGRESS_AND_TIMEOUT, Netty4HandlerNames.HTTP_RESPONSE,
new Netty4ResponseHandler(request, responseReference, errorReference, latch));
ctx.pipeline()
.addBefore(Netty4HandlerNames.PROGRESS_AND_TIMEOUT, Netty4HandlerNames.HTTP_CODEC, createCodec());
} else {
ctx.pipeline().addAfter(Netty4HandlerNames.SSL, Netty4HandlerNames.HTTP_CODEC, createCodec());
ctx.pipeline()
.addAfter(Netty4HandlerNames.HTTP_CODEC, Netty4HandlerNames.HTTP_RESPONSE,
new Netty4ResponseHandler(request, responseReference, errorReference, latch));
}
}

sendHttp11Request(request, ctx.channel(), errorReference)
.addListener((ChannelFutureListener) sendListener -> {
if (!sendListener.isSuccess()) {
setOrSuppressError(errorReference, sendListener.cause());
sendListener.channel().close();
latch.countDown();
} else {
sendListener.channel().read();
}
});
} else {
throw new IllegalStateException("unknown protocol: " + protocol);
if (ctx.pipeline().get(ALPN) != null) {
ctx.pipeline().remove(this);
}
}
}
Loading