Skip to content

Commit e092c9c

Browse files
try to centralize waiting queue logic
1 parent 84be996 commit e092c9c

File tree

1 file changed

+80
-113
lines changed

1 file changed

+80
-113
lines changed

sdk/clientcore/http-netty4/src/main/java/io/clientcore/http/netty4/implementation/Netty4ConnectionPool.java

Lines changed: 80 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,7 @@ private class PerRoutePool {
275275
private final Deque<Promise<Channel>> pendingAcquirers = new ConcurrentLinkedDeque<>();
276276
// Counter for all connections for a specific route (active and idle).
277277
private final AtomicInteger totalConnections = new AtomicInteger(0);
278+
private final AtomicBoolean dispatching = new AtomicBoolean(false);
278279
private final Netty4ConnectionPoolKey key;
279280
private final SocketAddress route;
280281
private final boolean isHttps;
@@ -310,145 +311,111 @@ Future<Channel> acquire() {
310311
return connection.channel.eventLoop().newSucceededFuture(connection.channel);
311312
}
312313

313-
// No idle connections, we need to either create a new one or queue.
314-
int currentTotal = totalConnections.getAndIncrement();
315-
if (currentTotal < maxConnectionsPerRoute) {
316-
return createNewConnection();
314+
Promise<Channel> promise = bootstrap.config().group().next().newPromise();
315+
if (pendingAcquirers.size() >= maxPendingAcquires) {
316+
promise.setFailure(CoreException.from("Pending acquisition queue is full."));
317+
return promise;
317318
}
318319

319-
// Pool is full, decrement the counter back and queue the request.
320-
totalConnections.getAndDecrement();
321-
return queueAcquireRequest();
320+
pendingAcquirers.offer(promise);
321+
dispatch();
322+
323+
if (pendingAcquireTimeout != null) {
324+
promise.addListener(future -> {
325+
if (future.isCancelled()) {
326+
pendingAcquirers.remove(promise);
327+
}
328+
});
329+
330+
bootstrap.config().group().schedule(() -> {
331+
if (promise.tryFailure(
332+
CoreException.from("Connection acquisition timed out after " + pendingAcquireTimeout))) {
333+
pendingAcquirers.remove(promise);
334+
}
335+
}, pendingAcquireTimeout.toMillis(), TimeUnit.MILLISECONDS);
336+
}
337+
return promise;
322338
}
323339

324340
void release(PooledConnection connection) {
325341
if (!isHealthy(connection)) {
326-
connection.close(); // The close listener will handle decrementing the counter.
342+
connection.close();
327343
return;
328344
}
329-
330345
connection.idleSince = OffsetDateTime.now(ZoneOffset.UTC);
331-
332-
// Offer to the idle queue and then try to satisfy pending waiters.
333346
idleConnections.offer(connection);
334-
satisfyWaiter();
347+
dispatch();
335348
}
336349

337-
private PooledConnection pollIdleConnection() {
338-
while (true) {
339-
PooledConnection connection = idleConnections.poll();
340-
if (connection == null) {
341-
return null;
342-
}
343-
344-
if (isHealthy(connection)) {
345-
connection.idleSince = null; // Mark as active
346-
return connection;
347-
}
348-
349-
connection.close(); // The close listener will handle decrementing the counter.
350-
}
351-
}
352-
353-
/**
354-
* Queues a new promise for a connection.
355-
* This is called when the pool is at max capacity.
356-
*
357-
* @return A Future that will be completed later.
358-
*/
359-
private Future<Channel> queueAcquireRequest() {
360-
if (pendingAcquirers.size() >= maxPendingAcquires) {
361-
return bootstrap.config()
362-
.group()
363-
.next()
364-
.newFailedFuture(CoreException.from("Pending acquisition queue is full."));
350+
private void dispatch() {
351+
if (!dispatching.compareAndSet(false, true)) {
352+
return;
365353
}
366354

367-
Promise<Channel> promise = bootstrap.config().group().next().newPromise();
368-
promise.addListener(future -> {
369-
if (future.isCancelled()) {
370-
pendingAcquirers.remove(promise);
371-
}
372-
});
373-
pendingAcquirers.offer(promise);
374-
if (pendingAcquireTimeout != null) {
375-
bootstrap.config().group().schedule(() -> {
376-
if (!promise.isDone()) {
377-
promise.tryFailure(
378-
CoreException.from("Connection acquisition timed out after " + pendingAcquireTimeout));
355+
try {
356+
while (!pendingAcquirers.isEmpty()) {
357+
PooledConnection idleConnection = pollIdleConnection();
358+
if (idleConnection != null) {
359+
Promise<Channel> waiter = pollNextWaiter();
360+
if (waiter != null) {
361+
if (!waiter.trySuccess(idleConnection.channel)) {
362+
release(idleConnection);
363+
}
364+
} else {
365+
idleConnections.addFirst(idleConnection);
366+
break;
367+
}
368+
continue;
379369
}
380-
}, pendingAcquireTimeout.toMillis(), TimeUnit.MILLISECONDS);
381-
}
382-
satisfyWaiter();
383-
return promise;
384-
}
385370

386-
/**
387-
* This is the core logic that matches pending waiters with available resources.
388-
* It can be triggered when a connection is released, or when a slot opens up.
389-
*/
390-
private void satisfyWaiter() {
391-
if (pendingAcquirers.isEmpty()) {
392-
return;
393-
}
371+
if (totalConnections.get() < maxConnectionsPerRoute) {
372+
if (totalConnections.incrementAndGet() > maxConnectionsPerRoute) {
373+
totalConnections.decrementAndGet();
374+
break;
375+
}
394376

395-
// First, try to get a ready-to-use idle connection.
396-
PooledConnection idleConnection = pollIdleConnection();
397-
if (idleConnection != null) {
398-
Promise<Channel> waiter = pollNextWaiter();
399-
if (waiter != null) {
400-
if (!waiter.trySuccess(idleConnection.channel)) {
401-
// Waiter was canceled, release the connection back.
402-
release(idleConnection);
377+
Promise<Channel> waiter = pollNextWaiter();
378+
if (waiter != null) {
379+
createNewConnection().addListener(future -> {
380+
if (future.isSuccess()) {
381+
if (!waiter.trySuccess((Channel) future.getNow())) {
382+
release(((Channel) future.getNow()).attr(POOLED_CONNECTION_KEY).get());
383+
}
384+
} else {
385+
waiter.tryFailure(future.cause());
386+
}
387+
});
388+
} else {
389+
totalConnections.decrementAndGet();
390+
}
391+
} else {
392+
break;
403393
}
404-
} else {
405-
// No waiter, put the connection back in the idle queue.
406-
idleConnections.addFirst(idleConnection);
407394
}
408-
return;
395+
} finally {
396+
dispatching.set(false);
409397
}
398+
}
410399

411-
// No idle connections, try to create a new one if there is capacity.
400+
private PooledConnection pollIdleConnection() {
412401
while (true) {
413-
int currentTotal = totalConnections.get();
414-
if (currentTotal >= maxConnectionsPerRoute) {
415-
// No capacity, can't create a new connection.
416-
return;
402+
PooledConnection connection = idleConnections.poll();
403+
if (connection == null) {
404+
return null;
417405
}
418-
419-
if (totalConnections.compareAndSet(currentTotal, currentTotal + 1)) {
420-
// We successfully reserved a slot for a new connection.
421-
Promise<Channel> waiter = pollNextWaiter();
422-
if (waiter != null) {
423-
// Create a new connection for this specific waiter.
424-
createNewConnection().addListener(future -> {
425-
if (future.isSuccess()) {
426-
if (!waiter.trySuccess((Channel) future.getNow())) {
427-
// The Waiter was canceled while we were connecting.
428-
// Release the new connection.
429-
release(((Channel) future.getNow()).attr(POOLED_CONNECTION_KEY).get());
430-
}
431-
} else {
432-
// Connection failed. The close listener on the (failed) channel
433-
// will decrement totalConnections and trigger another satisfyWaiter call.
434-
waiter.tryFailure(future.cause());
435-
}
436-
});
437-
} else {
438-
// We reserved a slot, but there's no waiter. Release the slot.
439-
totalConnections.decrementAndGet();
440-
}
441-
return;
406+
if (isHealthy(connection)) {
407+
connection.idleSince = null; // Mark as active
408+
return connection;
442409
}
443-
// CAS failed, another thread acted. Loop to retry.
410+
connection.close();
444411
}
445412
}
446413

447414
private Promise<Channel> pollNextWaiter() {
448415
while (true) {
449416
Promise<Channel> waiter = pendingAcquirers.poll();
450417
if (waiter == null) {
451-
return null; // Queue is empty
418+
return null;
452419
}
453420
if (!waiter.isCancelled()) {
454421
return waiter;
@@ -493,16 +460,16 @@ public void initChannel(Channel channel) throws SSLException {
493460
newConnectionBootstrap.connect(route).addListener(future -> {
494461
if (!future.isSuccess()) {
495462
LOGGER.atError().setThrowable(future.cause()).log("Failed connection.");
496-
totalConnections.getAndDecrement();
497-
satisfyWaiter();
463+
totalConnections.decrementAndGet();
464+
dispatch();
498465
promise.setFailure(future.cause());
499466
return;
500467
}
501468

502469
Channel newChannel = ((ChannelFuture) future).channel();
503470
newChannel.closeFuture().addListener(closeFuture -> {
504-
totalConnections.getAndDecrement();
505-
satisfyWaiter();
471+
totalConnections.decrementAndGet();
472+
dispatch();
506473
});
507474

508475
Runnable connectionReadyRunner = () -> {
@@ -539,7 +506,7 @@ public void initChannel(Channel channel) throws SSLException {
539506
}
540507
});
541508
} else {
542-
promise.setSuccess(newChannel);
509+
connectionReadyRunner.run();
543510
}
544511
});
545512
return promise;

0 commit comments

Comments
 (0)