Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.StampedLock;

import static com.mongodb.assertions.Assertions.assertFalse;
import static com.mongodb.assertions.Assertions.assertNotNull;
Expand All @@ -77,7 +78,6 @@
@SuppressWarnings("deprecation")
class DefaultConnectionPool implements ConnectionPool {
private static final Logger LOGGER = Loggers.getLogger("connection");
private static final Executor SAME_THREAD_EXECUTOR = Runnable::run;
/**
* Is package-access for the purpose of testing and must not be used for any other purpose outside of this class.
*/
Expand All @@ -88,7 +88,7 @@ class DefaultConnectionPool implements ConnectionPool {
private final AtomicInteger generation = new AtomicInteger(0);
private final AtomicInteger lastPrunedGeneration = new AtomicInteger(0);
private final ScheduledExecutorService sizeMaintenanceTimer;
private ExecutorService asyncGetter;
private final Workers workers;
private final Runnable maintenanceTask;
private final ConnectionPoolListener connectionPoolListener;
private final ServerId serverId;
Expand All @@ -107,6 +107,7 @@ class DefaultConnectionPool implements ConnectionPool {
sizeMaintenanceTimer = createMaintenanceTimer();
connectionPoolCreated(connectionPoolListener, serverId, settings);
openConcurrencyLimiter = new OpenConcurrencyLimiter(MAX_CONNECTING);
workers = new Workers();
}

@Override
Expand Down Expand Up @@ -166,9 +167,9 @@ public void getAsync(final SingleResultCallback<InternalConnection> callback) {
}

if (immediateConnection != null) {
openAsync(immediateConnection, timeout, getAsyncGetter(), eventSendingCallback);
openAsync(immediateConnection, timeout, eventSendingCallback);
} else {
getAsyncGetter().execute(() -> {
workers.getter().execute(() -> {
if (timeout.expired()) {
eventSendingCallback.onResult(null, createTimeoutException(timeout));
return;
Expand All @@ -180,7 +181,7 @@ public void getAsync(final SingleResultCallback<InternalConnection> callback) {
eventSendingCallback.onResult(null, e);
return;
}
openAsync(connection, timeout, SAME_THREAD_EXECUTOR, eventSendingCallback);
openAsync(connection, timeout, eventSendingCallback);
});
}
}
Expand All @@ -205,7 +206,7 @@ private Throwable checkOutFailed(final Throwable t) {
return result;
}

private void openAsync(final PooledConnection pooledConnection, final Timeout timeout, final Executor executor,
private void openAsync(final PooledConnection pooledConnection, final Timeout timeout,
final SingleResultCallback<InternalConnection> callback) {
if (pooledConnection.opened()) {
if (LOGGER.isTraceEnabled()) {
Expand All @@ -218,20 +219,7 @@ private void openAsync(final PooledConnection pooledConnection, final Timeout ti
LOGGER.trace(format("Pooled connection %s to server %s is not yet open",
getId(pooledConnection), serverId));
}
executor.execute(() -> openConcurrencyLimiter.openAsyncOrGetAvailable(pooledConnection, timeout, callback));
}
}

private synchronized ExecutorService getAsyncGetter() {
if (asyncGetter == null) {
asyncGetter = Executors.newSingleThreadExecutor(new DaemonThreadFactory("AsyncGetter"));
}
return asyncGetter;
}

private synchronized void shutdownAsyncGetter() {
if (asyncGetter != null) {
asyncGetter.shutdownNow();
workers.opener().execute(() -> openConcurrencyLimiter.openAsyncOrGetAvailable(pooledConnection, timeout, callback));
}
}

Expand All @@ -249,7 +237,7 @@ public void close() {
if (sizeMaintenanceTimer != null) {
sizeMaintenanceTimer.shutdownNow();
}
shutdownAsyncGetter();
workers.close();
closed = true;
connectionPoolListener.connectionPoolClosed(new ConnectionPoolClosedEvent(serverId));
}
Expand Down Expand Up @@ -710,7 +698,7 @@ public Prune shouldPrune(final UsageTrackingInternalConnection usageTrackingConn
}

/**
* Package-private methods are thread-safe,
* Package-access methods are thread-safe,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"package-private" is not a term the JLS uses, while "package-access" is.

* and only they should be called outside of the {@link OpenConcurrencyLimiter}'s code.
*/
@ThreadSafe
Expand Down Expand Up @@ -838,6 +826,7 @@ void openAsyncOrGetAvailable(
private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boolean tryGetAvailable, final Timeout timeout)
throws MongoTimeoutException {
PooledConnection availableConnection = null;
boolean expressedDesireToGetAvailableConnection = false;
tryLock(timeout);
try {
if (tryGetAvailable) {
Expand All @@ -855,6 +844,7 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole
return availableConnection;
}
expressDesireToGetAvailableConnection();
expressedDesireToGetAvailableConnection = true;
}
long remainingNanos = timeout.remainingOrInfinite(NANOSECONDS);
while (permits == 0) {
Expand All @@ -865,7 +855,7 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole
if (Timeout.expired(remainingNanos)) {
throw createTimeoutException(timeout);
}
remainingNanos = awaitNanos(remainingNanos);
remainingNanos = awaitNanos(condition, remainingNanos);
}
if (availableConnection == null) {
assertTrue(permits > 0);
Expand All @@ -874,7 +864,7 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole
return availableConnection;
} finally {
try {
if (tryGetAvailable && availableConnection == null) {//the desired connection slot has not yet been removed
if (expressedDesireToGetAvailableConnection && availableConnection == null) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fixes the second problem I discovered while implementing pausable pool. getPooledConnectionImmediately there may throw an exception, which in turn may result in this code calling giveUpOnTryingToGetAvailableConnection despite expressDesireToGetAvailableConnection not being called.

This is an example of how one should try not to rely if possible on methods not throwing exceptions.

giveUpOnTryingToGetAvailableConnection();
}
} finally {
Expand Down Expand Up @@ -968,17 +958,15 @@ private void tryLock(final Timeout timeout) throws MongoTimeoutException {
}

/**
* Returns {@code timeoutNanos} if {@code timeoutNanos} is negative, otherwise returns 0 or a positive value.
*
* @param timeoutNanos Use a negative value for an infinite timeout,
* in which case {@link Condition#awaitNanos(long)} is called with {@link Long#MAX_VALUE}.
* @param timeoutNanos See {@link Timeout#startNow(long)}.
* @return The remaining duration as per {@link Timeout#remainingOrInfinite(TimeUnit)} if waiting ended early either
* spuriously or because of receiving a signal.
*/
private long awaitNanos(final long timeoutNanos) {
private long awaitNanos(final Condition condition, final long timeoutNanos) throws MongoInterruptedException {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change makes the behavior of this method consistent with Timeout and allows to see explicitly what condition it waits for on the calling side.

try {
if (timeoutNanos < 0) {
//noinspection ResultOfMethodCallIgnored
condition.awaitNanos(Long.MAX_VALUE);
return timeoutNanos;
if (timeoutNanos < 0 || timeoutNanos == Long.MAX_VALUE) {
condition.await();
return -1;
} else {
return Math.max(0, condition.awaitNanos(timeoutNanos));
}
Expand All @@ -997,4 +985,56 @@ private static final class MutableReference<T> {
private MutableReference() {
}
}

@ThreadSafe
private static class Workers implements AutoCloseable {
private volatile ExecutorService getter;
private volatile ExecutorService opener;
private final Lock lock;

Workers() {
lock = new StampedLock().asWriteLock();
}

Executor getter() {
if (getter == null) {
lock.lock();
try {
if (getter == null) {
getter = Executors.newSingleThreadExecutor(new DaemonThreadFactory("AsyncGetter"));
}
} finally {
lock.unlock();
}
}
return getter;
}

Executor opener() {
if (opener == null) {
lock.lock();
try {
if (opener == null) {
opener = Executors.newSingleThreadExecutor(new DaemonThreadFactory("AsyncOpener"));
}
} finally {
lock.unlock();
}
}
return opener;
}

@Override
public void close() {
try {
if (getter != null) {
getter.shutdownNow();
}
} finally {
if (opener != null) {
opener.shutdownNow();
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@
import com.mongodb.event.ConnectionCreatedEvent;
import com.mongodb.internal.Timeout;
import com.mongodb.internal.async.SingleResultCallback;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -43,17 +46,18 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;

import static com.mongodb.internal.connection.DefaultConnectionPool.MAX_CONNECTING;
import static java.lang.Long.MAX_VALUE;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static junit.framework.TestCase.assertNotNull;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
Expand All @@ -72,15 +76,16 @@ public class DefaultConnectionPoolTest {
private DefaultConnectionPool provider;
private ExecutorService cachedExecutor;

@Before
@BeforeEach
public void setUp() {
connectionFactory = new TestInternalConnectionFactory();
cachedExecutor = Executors.newCachedThreadPool();
}

@After
@AfterEach
@SuppressWarnings("try")
public void cleanup() throws InterruptedException {
//noinspection unused
try (DefaultConnectionPool closed = provider) {
cachedExecutor.shutdownNow();
//noinspection ResultOfMethodCallIgnored
Expand Down Expand Up @@ -236,18 +241,30 @@ public void shouldPruneAfterMaintenanceTaskRuns() throws InterruptedException {
assertTrue(connectionFactory.getCreatedConnections().get(0).isClosed());
}

@Test
public void concurrentUsage() throws InterruptedException {
int maxAvailableConnections = 7;
@ParameterizedTest
@MethodSource("concurrentUsageArguments")
public void concurrentUsage(final int minSize, final int maxSize, final int concurrentUsersCount,
final boolean checkoutSync, final boolean checkoutAsync, final float invalidateProb)
throws InterruptedException {
ControllableConnectionFactory controllableConnFactory = newControllableConnectionFactory(cachedExecutor);
provider = new DefaultConnectionPool(SERVER_ID, controllableConnFactory.factory, ConnectionPoolSettings.builder()
.maxSize(MAX_CONNECTING + maxAvailableConnections)
.minSize(2)
.minSize(minSize)
.maxSize(maxSize)
.maxWaitTime(TEST_WAIT_TIMEOUT_MILLIS, MILLISECONDS)
.maintenanceInitialDelay(0, NANOSECONDS)
.maintenanceFrequency(100, MILLISECONDS)
.build());
assertUseConcurrently(provider, 2 * maxAvailableConnections, cachedExecutor, SECONDS.toNanos(15));
assertUseConcurrently(provider, concurrentUsersCount, checkoutSync, checkoutAsync, invalidateProb,
cachedExecutor, SECONDS.toNanos(10));
}

private static Stream<Arguments> concurrentUsageArguments() {
return Stream.of(
Arguments.of(0, 1, 8, true, false, 0.02f),
Arguments.of(0, 1, 8, false, true, 0.02f),
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of all 5 parameter sets, only this one reproduced the deadlock.

Arguments.of(Math.min(3, MAX_CONNECTING), MAX_CONNECTING, 8, true, true, 0f),
Arguments.of(MAX_CONNECTING + 5, MAX_CONNECTING + 5, 2 * (MAX_CONNECTING + 5), true, true, 0.02f),
Arguments.of(Math.min(3, MAX_CONNECTING), MAX_CONNECTING + 5, 2 * (MAX_CONNECTING + 5), true, true, 0.9f));
}

@Test
Expand All @@ -262,7 +279,8 @@ public void callbackShouldNotBlockCheckoutIfOpenAsyncWorksNotInCurrentThread() t
.maintenanceInitialDelay(MAX_VALUE, NANOSECONDS)
.build());
acquireOpenPermits(provider, MAX_CONNECTING, InfiniteCheckoutEmulation.INFINITE_CALLBACK, controllableConnFactory, listener);
assertUseConcurrently(provider, 2 * maxAvailableConnections, cachedExecutor, SECONDS.toNanos(10));
assertUseConcurrently(provider, 2 * maxAvailableConnections, true, true, 0.02f,
cachedExecutor, SECONDS.toNanos(10));
}

/**
Expand Down Expand Up @@ -339,33 +357,36 @@ public void checkoutHandOverMechanism() throws InterruptedException, TimeoutExce
}

private static void assertUseConcurrently(final DefaultConnectionPool pool, final int concurrentUsersCount,
final boolean sync, final boolean async, final float invalidateProb,
final ExecutorService executor, final long durationNanos) throws InterruptedException {
try {
useConcurrently(pool, concurrentUsersCount, executor, durationNanos);
useConcurrently(pool, concurrentUsersCount, sync, async, invalidateProb, executor, durationNanos);
} catch (TimeoutException | ExecutionException e) {
throw new AssertionError(e);
}
}

private static void useConcurrently(final DefaultConnectionPool pool, final int concurrentUsersCount,
final boolean checkoutSync, final boolean checkoutAsync, final float invalidateProb,
final ExecutorService executor, final long durationNanos)
throws ExecutionException, InterruptedException, TimeoutException {
assertTrue(invalidateProb >= 0 && invalidateProb <= 1);
Runnable spontaneouslyInvalidate = () -> {
if (ThreadLocalRandom.current().nextFloat() < 0.02) {
if (ThreadLocalRandom.current().nextFloat() < invalidateProb) {
pool.invalidate();
}
};
Collection<Future<?>> tasks = new ArrayList<>();
Timeout duration = Timeout.startNow(durationNanos);
for (int i = 0; i < concurrentUsersCount; i++) {
if (i % 2 == 0) {//check out synchronously
if ((checkoutSync && checkoutAsync) ? i % 2 == 0 : checkoutSync) {//check out synchronously
tasks.add(executor.submit(() -> {
while (!(duration.expired() || Thread.currentThread().isInterrupted())) {
spontaneouslyInvalidate.run();
pool.get(TEST_WAIT_TIMEOUT_MILLIS, MILLISECONDS).close();
}
}));
} else {//check out asynchronously
} else if (checkoutAsync) {//check out asynchronously
tasks.add(executor.submit(() -> {
while (!(duration.expired() || Thread.currentThread().isInterrupted())) {
spontaneouslyInvalidate.run();
Expand Down