Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
1ddd05d
Http-netty4 connection pool implementation
georgebanasios Jun 28, 2025
a379a00
Merge remote-tracking branch 'origin/main' into connection-pool-http-…
georgebanasios Jun 29, 2025
784ade2
fix duplicate http response handler in the pipeline
georgebanasios Jun 29, 2025
7edd7a9
apply ALPN after the correct handler
georgebanasios Jun 29, 2025
ce50c9e
fix formatting
georgebanasios Jun 30, 2025
6b73bb3
fix Netty4ChannelBinaryData
georgebanasios Jun 30, 2025
c8693a1
fix connection pool release race condition
georgebanasios Jun 30, 2025
e1525d8
adjust chunked request
georgebanasios Jun 30, 2025
f26b6c3
Revert "adjust chunked request"
georgebanasios Jun 30, 2025
1602d4b
fix proxy logic when acquiring a channel
georgebanasios Jul 1, 2025
989b302
cleanup the pipeline after the last content
georgebanasios Jul 1, 2025
f0a7847
create channel wrapper
georgebanasios Jul 1, 2025
c8c648b
remove channel closure from response handler
georgebanasios Jul 1, 2025
8dea326
try and fix the release deadlock
georgebanasios Jul 1, 2025
e076602
handle case where read handlers are added to a closed channel
georgebanasios Jul 2, 2025
8de78bd
remove synchronized
georgebanasios Jul 2, 2025
ed27af7
apply ALPN only on new connections
georgebanasios Jul 2, 2025
51fa938
try and fix deadlock
georgebanasios Jul 2, 2025
f3eee88
Revert "try and fix deadlock"
georgebanasios Jul 2, 2025
61c273b
deadlock fix v2
georgebanasios Jul 3, 2025
40b929a
Revert "deadlock fix v2"
georgebanasios Jul 3, 2025
792a06e
deadlock fix v3
georgebanasios Jul 3, 2025
56bbb97
Revert "deadlock fix v3"
georgebanasios Jul 3, 2025
f849ec8
deadlock fix attempt v4
georgebanasios Jul 4, 2025
140a095
fix on input stream close method
georgebanasios Jul 4, 2025
c0f9beb
deadlock fix attempt v5
georgebanasios Jul 4, 2025
a2d0bcd
Revert "deadlock fix attempt v5"
georgebanasios Jul 4, 2025
d66fd1d
deadlock fix attempt v6
georgebanasios Jul 4, 2025
42f72c7
make method more efficient
georgebanasios Jul 5, 2025
4606fe4
fix response handler tests
georgebanasios Jul 5, 2025
6fdf2e7
make close method of netty binary data non blocking
georgebanasios Jul 5, 2025
623503c
cleanup connection on netty binary data
georgebanasios Jul 6, 2025
f085ece
add exception handling on cleanup handler
georgebanasios Jul 6, 2025
d66d154
Netty4ChannelBinaryData adjustments
georgebanasios Jul 6, 2025
aa1fba9
try to not block on send method
georgebanasios Jul 7, 2025
720b395
disable connection pooling functionality
georgebanasios Jul 8, 2025
670687f
adjust cleanups
georgebanasios Jul 9, 2025
c0022f4
use event for pipeline cleanup
georgebanasios Jul 9, 2025
7b3c8fe
attempt to synchronize the pipeline modification and cleaup for the s…
georgebanasios Jul 10, 2025
1a28409
self removal of handlers
georgebanasios Jul 10, 2025
ed9b857
attempt to fix race condition
georgebanasios Jul 10, 2025
fec1ffc
close clients after tests
georgebanasios Jul 13, 2025
ca36245
Revert "close clients after tests"
georgebanasios Jul 13, 2025
437616b
http2 connecetions
georgebanasios Jul 13, 2025
40ef6ee
synchronize connection pool
georgebanasios Jul 13, 2025
59f9240
Revert "synchronize connection pool"
georgebanasios Jul 14, 2025
9e65d78
check connection
georgebanasios Jul 16, 2025
84be996
Revert "check connection"
georgebanasios Jul 16, 2025
e092c9c
try to centralize waiting queue logic
georgebanasios Jul 16, 2025
df003e0
Revert "try to centralize waiting queue logic"
georgebanasios Jul 16, 2025
dbf1c69
enable half closure
georgebanasios Jul 16, 2025
9a3477b
add lock on acquire/release
georgebanasios Jul 16, 2025
6855d45
documentation fix & pr comments
georgebanasios Jul 19, 2025
814285d
fix ssl/alpn on pooled connections
georgebanasios Jul 21, 2025
91f72e0
Merge remote-tracking branch 'origin/main' into connection-pool-http-…
georgebanasios Jul 22, 2025
879c638
remove duplicate line & javadoc fix
georgebanasios Jul 22, 2025
d4694d8
add javadoc on netty provider
georgebanasios Jul 23, 2025
61be579
add HTTP client tests for pooled connections
georgebanasios Jul 23, 2025
5551d47
fix builder logging configuration logic
georgebanasios Jul 23, 2025
c64019a
fix merge conflicts
georgebanasios Aug 1, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Revert "deadlock fix v2"
This reverts commit 61c273b.
  • Loading branch information
georgebanasios committed Jul 3, 2025
commit 40b929ab8ddc149a5fea18edeb4a2799d4c120d8
Original file line number Diff line number Diff line change
Expand Up @@ -60,41 +60,45 @@ public LocalTestServer(HttpProtocolVersion supportedProtocol, boolean includeTls
httpConfig.addCustomizer(new SecureRequestCustomizer());
}

final ServerConnector httpConnector;
if (!includeTls) {
HttpConnectionFactory http11 = new HttpConnectionFactory(httpConfig);
httpConnector = new ServerConnector(server, http11);
} else {
List<ConnectionFactory> connectionFactories = new ArrayList<>();

// SSL/TLS connection factory
if (includeTls) {
String nextProtocol = supportedProtocol == null
? HttpVersion.HTTP_1_1.asString()
: (supportedProtocol == HttpProtocolVersion.HTTP_1_1) ? HttpVersion.HTTP_1_1.asString() : "alpn";

Security.addProvider(new OpenSSLProvider());
SslContextFactory.Server sslContextFactory = new SslContextFactory.Server();
sslContextFactory.setProvider("Conscrypt");
String mockKeyStore = Objects.toString(LocalTestServer.class.getResource("/keystore.jks"), null);
sslContextFactory.setKeyStorePath(mockKeyStore);
sslContextFactory.setKeyStorePassword("password");
sslContextFactory.setKeyManagerPassword("password");
sslContextFactory.setKeyStorePath(mockKeyStore);
sslContextFactory.setTrustStorePassword("password");
sslContextFactory.setTrustAll(true);

sslContextFactory.setCipherComparator(java.util.Comparator.comparingInt(String::hashCode));

SslConnectionFactory ssl = new SslConnectionFactory(sslContextFactory, "alpn");

HttpConnectionFactory http11 = new HttpConnectionFactory(httpConfig);
connectionFactories.add(new SslConnectionFactory(sslContextFactory, nextProtocol));
}

if (supportedProtocol == HttpProtocolVersion.HTTP_2) {
HTTP2ServerConnectionFactory http2 = new HTTP2ServerConnectionFactory(httpConfig);
ALPNServerConnectionFactory alpn = new ALPNServerConnectionFactory();
alpn.setDefaultProtocol(http11.getProtocol());
if (supportedProtocol == HttpProtocolVersion.HTTP_2) {
// ALPN connection factory
// HTTP/2 connection factory
ALPNServerConnectionFactory alpn = new ALPNServerConnectionFactory();
alpn.setDefaultProtocol("h2");
connectionFactories.add(alpn);
connectionFactories.add(new HTTP2ServerConnectionFactory(httpConfig));
}

httpConnector = new ServerConnector(server, ssl, alpn, http2, http11);
} else {
httpConnector = new ServerConnector(server, ssl, http11);
}
if (supportedProtocol == null || supportedProtocol == HttpProtocolVersion.HTTP_1_1) {
// HTTP/1.1 connection factory
connectionFactories.add(new HttpConnectionFactory(httpConfig));
}

this.connector = httpConnector;
this.connector.setHost("localhost");
server.addConnector(this.connector);
connector = new ServerConnector(server, connectionFactories.toArray(new ConnectionFactory[0]));
connector.setHost("localhost");
server.addConnector(connector);

ServletContextHandler servletContextHandler = new ServletContextHandler();
servletContextHandler.setContextPath("/");
Expand Down
4 changes: 0 additions & 4 deletions sdk/clientcore/http-netty4/spotbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,4 @@
<Bug pattern="VO_VOLATILE_REFERENCE_TO_ARRAY" />
<Class name="io.clientcore.http.netty4.implementation.Netty4ChannelBinaryData" />
</Match>
<Match>
<Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE" />
<Class name="io.clientcore.http.netty4.implementation.Netty4ChannelInputStream" />
</Match>
</FindBugsFilter>
Original file line number Diff line number Diff line change
Expand Up @@ -156,26 +156,19 @@ public Response<BinaryData> send(HttpRequest request) {
}

response = new Response<>(request, info.getStatusCode(), info.getHeaders(), body);
Channel channel = info.getResponseChannel();
if (channel != null) {
cleanupPipeline(channel.pipeline());
}
} else {
// Otherwise we aren't finished, handle the remaining content according to the documentation in
// 'channelRead()'.
BinaryData body = BinaryData.empty();
ResponseBodyHandling bodyHandling = info.getResponseBodyHandling();
Channel channel = info.getResponseChannel();
ChannelPipeline pipeline = channel.pipeline();
if (bodyHandling == ResponseBodyHandling.IGNORE) {
// We're ignoring the response content.
CountDownLatch drainLatch = new CountDownLatch(1);
channel.pipeline().addLast(new Netty4EagerConsumeChannelHandler(drainLatch, ignored -> {
}, info.isHttp2()));
channel.config().setAutoRead(true);
awaitLatch(drainLatch);

cleanupPipeline(pipeline);
} else if (bodyHandling == ResponseBodyHandling.STREAM) {
// Body streaming uses a special BinaryData that tracks the firstContent read and the Channel it came
// from so it can be consumed when the BinaryData is being used.
Expand Down Expand Up @@ -205,8 +198,6 @@ public Response<BinaryData> send(HttpRequest request) {
channel.config().setAutoRead(true);
awaitLatch(drainLatch);

cleanupPipeline(pipeline);

body = BinaryData.fromBytes(info.getEagerContent().toByteArray());
}

Expand Down Expand Up @@ -362,11 +353,4 @@ private Netty4ConnectionPoolKey constructConnectionPoolKey(SocketAddress finalDe
return key;
}

private void cleanupPipeline(ChannelPipeline pipeline) {
Netty4PipelineCleanupHandler pipelineCleanupHandler = pipeline.get(Netty4PipelineCleanupHandler.class);
if (pipelineCleanupHandler != null) {
pipelineCleanupHandler.cleanup(pipeline.context(pipelineCleanupHandler), false);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,6 @@ private void drainStream() {
throw CoreException.from(exception);
}
}

Netty4PipelineCleanupHandler cleanupHandler = channel.pipeline().get(Netty4PipelineCleanupHandler.class);
if (cleanupHandler != null) {
cleanupHandler.cleanup(channel.pipeline().context(cleanupHandler), false);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Implementation of {@link InputStream} that reads contents from a Netty {@link Channel}.
Expand All @@ -20,9 +18,6 @@ public final class Netty4ChannelInputStream extends InputStream {
private final boolean isHttp2;
private final Runnable onClose;

//Ensures the close operation is idempotent.
private final AtomicBoolean closed = new AtomicBoolean(false);

// Indicator for the Channel being fully read.
// This will become true before 'streamDone' becomes true, but both may become true in the same operation.
// Once this is true, the Channel will never be read again.
Expand All @@ -33,9 +28,9 @@ public final class Netty4ChannelInputStream extends InputStream {
// Once this is true, the stream will never return data again.
private boolean streamDone = false;

// Queue of byte[]s that maintains the last available contents from the Channel / eager content.
// A queue is needed as each Channel.read() may result in many channelRead calls.
private final Queue<byte[]> additionalBuffers;
// Linked list of byte[]s that maintains the last available contents from the Channel / eager content.
// A list is needed as each Channel.read() may result in many channelRead calls.
private final LinkedList<byte[]> additionalBuffers;

private byte[] currentBuffer;

Expand All @@ -62,7 +57,7 @@ public final class Netty4ChannelInputStream extends InputStream {
this.currentBuffer = new byte[0];
}
this.readIndex = 0;
this.additionalBuffers = new ConcurrentLinkedQueue<>();
this.additionalBuffers = new LinkedList<>();
this.channel = channel;
if (channel.pipeline().get(Netty4InitiateOneReadHandler.class) != null) {
channel.pipeline().remove(Netty4InitiateOneReadHandler.class);
Expand Down Expand Up @@ -183,21 +178,19 @@ public long skip(long n) throws IOException {
*/
@Override
public void close() throws IOException {
if (closed.compareAndSet(false, true)) {
try {
if (onClose != null) {
onClose.run();
}
} finally {
super.close();
streamDone = true;
try {
if (onClose != null && !streamDone) {
onClose.run();
}
} finally {
super.close();
streamDone = true;
}
}

private boolean setupNextBuffer() throws IOException {
if (!additionalBuffers.isEmpty()) {
currentBuffer = additionalBuffers.poll();
currentBuffer = additionalBuffers.pop();
readIndex = 0;
return true;
} else if (readMore()) {
Expand Down Expand Up @@ -232,7 +225,7 @@ private boolean readMore() throws IOException {
byte[] buffer = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(buffer);

additionalBuffers.offer(buffer);
additionalBuffers.add(buffer);
}, isHttp2);
channel.pipeline().addLast(Netty4HandlerNames.READ_ONE, handler);
}
Expand Down Expand Up @@ -260,7 +253,7 @@ private boolean readMore() throws IOException {
}

if (!additionalBuffers.isEmpty()) {
currentBuffer = additionalBuffers.poll();
currentBuffer = additionalBuffers.pop();
readIndex = 0;
} else if (channelDone) { // Don't listen to IntelliJ here, channelDone may be false.
// This read contained no data and the channel completed, therefore the stream is also completed.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,38 +57,28 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
} else {
lastRead = msg instanceof LastHttpContent;
}

if (lastRead) {
latch.countDown();
}

ctx.fireChannelRead(msg);

} catch (IOException | RuntimeException ex) {
ReferenceCountUtil.release(msg);
latch.countDown();
Netty4PipelineCleanupHandler cleanupHandler = ctx.pipeline().get(Netty4PipelineCleanupHandler.class);
if (cleanupHandler != null) {
cleanupHandler.cleanup(ctx, true);
} else {
ctx.close();
}
ctx.fireExceptionCaught(ex);
cleanup(ctx);
}
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.fireChannelReadComplete();
if (lastRead) {
latch.countDown();
cleanup(ctx);
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
this.exception = cause;
latch.countDown();
ctx.fireExceptionCaught(cause);
cleanup(ctx);
}

Throwable channelException() {
Expand All @@ -98,13 +88,13 @@ Throwable channelException() {
// TODO (alzimmer): Are the latch countdowns needed for unregistering and inactivity?
@Override
public void channelUnregistered(ChannelHandlerContext ctx) {
latch.countDown();
cleanup(ctx);
ctx.fireChannelUnregistered();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
latch.countDown();
cleanup(ctx);
ctx.fireChannelInactive();
}

Expand All @@ -115,7 +105,15 @@ public void handlerAdded(ChannelHandlerContext ctx) {
// an exception. Simply counting down the latch would cause the caller to receive
// an empty/incomplete data stream without any indication of the underlying network error.
ctx.fireExceptionCaught(new ClosedChannelException());
ctx.pipeline().remove(this);
}
}

private void cleanup(ChannelHandlerContext ctx) {
if (ctx.pipeline().get(Netty4EagerConsumeChannelHandler.class) != null) {
ctx.pipeline().remove(this);
}

latch.countDown();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
latch.countDown();
if (lastRead) {
ctx.pipeline().remove(this);
}
ctx.fireChannelReadComplete();
}

Expand All @@ -106,6 +109,9 @@ boolean isChannelConsumed() {
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
this.exception = cause;
latch.countDown();
if (ctx.pipeline().get(Netty4InitiateOneReadHandler.class) != null) {
ctx.pipeline().remove(this);
}
ctx.fireExceptionCaught(cause);
}

Expand All @@ -117,12 +123,18 @@ Throwable channelException() {
@Override
public void channelUnregistered(ChannelHandlerContext ctx) {
latch.countDown();
if (ctx.pipeline().get(Netty4InitiateOneReadHandler.class) != null) {
ctx.pipeline().remove(this);
}
ctx.fireChannelUnregistered();
}

@Override
public void channelInactive(ChannelHandlerContext ctx) {
latch.countDown();
if (ctx.pipeline().get(Netty4InitiateOneReadHandler.class) != null) {
ctx.pipeline().remove(this);
}
ctx.fireChannelInactive();
}

Expand All @@ -133,6 +145,7 @@ public void handlerAdded(ChannelHandlerContext ctx) {
// an exception. Simply counting down the latch would cause the caller to receive
// an empty/incomplete data stream without any indication of the underlying network error.
ctx.fireExceptionCaught(new ClosedChannelException());
ctx.pipeline().remove(this);
}
}
}
Loading
Loading