-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Fix deadlock and couple more problems in DefaultConnectionPool
#699
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
ec11550
9e73482
8d68448
5ec1763
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -62,6 +62,7 @@ | |
| import java.util.concurrent.locks.Condition; | ||
| import java.util.concurrent.locks.Lock; | ||
| import java.util.concurrent.locks.ReentrantLock; | ||
| import java.util.concurrent.locks.StampedLock; | ||
|
|
||
| import static com.mongodb.assertions.Assertions.assertFalse; | ||
| import static com.mongodb.assertions.Assertions.assertNotNull; | ||
|
|
@@ -77,7 +78,6 @@ | |
| @SuppressWarnings("deprecation") | ||
| class DefaultConnectionPool implements ConnectionPool { | ||
| private static final Logger LOGGER = Loggers.getLogger("connection"); | ||
| private static final Executor SAME_THREAD_EXECUTOR = Runnable::run; | ||
| /** | ||
| * Is package-access for the purpose of testing and must not be used for any other purpose outside of this class. | ||
| */ | ||
|
|
@@ -88,7 +88,7 @@ class DefaultConnectionPool implements ConnectionPool { | |
| private final AtomicInteger generation = new AtomicInteger(0); | ||
| private final AtomicInteger lastPrunedGeneration = new AtomicInteger(0); | ||
| private final ScheduledExecutorService sizeMaintenanceTimer; | ||
| private ExecutorService asyncGetter; | ||
| private final Workers workers; | ||
| private final Runnable maintenanceTask; | ||
| private final ConnectionPoolListener connectionPoolListener; | ||
| private final ServerId serverId; | ||
|
|
@@ -107,6 +107,7 @@ class DefaultConnectionPool implements ConnectionPool { | |
| sizeMaintenanceTimer = createMaintenanceTimer(); | ||
| connectionPoolCreated(connectionPoolListener, serverId, settings); | ||
| openConcurrencyLimiter = new OpenConcurrencyLimiter(MAX_CONNECTING); | ||
| workers = new Workers(); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -166,9 +167,9 @@ public void getAsync(final SingleResultCallback<InternalConnection> callback) { | |
| } | ||
|
|
||
| if (immediateConnection != null) { | ||
| openAsync(immediateConnection, timeout, getAsyncGetter(), eventSendingCallback); | ||
| openAsync(immediateConnection, timeout, eventSendingCallback); | ||
| } else { | ||
| getAsyncGetter().execute(() -> { | ||
| workers.getter().execute(() -> { | ||
| if (timeout.expired()) { | ||
| eventSendingCallback.onResult(null, createTimeoutException(timeout)); | ||
| return; | ||
|
|
@@ -180,7 +181,7 @@ public void getAsync(final SingleResultCallback<InternalConnection> callback) { | |
| eventSendingCallback.onResult(null, e); | ||
| return; | ||
| } | ||
| openAsync(connection, timeout, SAME_THREAD_EXECUTOR, eventSendingCallback); | ||
| openAsync(connection, timeout, eventSendingCallback); | ||
| }); | ||
| } | ||
| } | ||
|
|
@@ -205,7 +206,7 @@ private Throwable checkOutFailed(final Throwable t) { | |
| return result; | ||
| } | ||
|
|
||
| private void openAsync(final PooledConnection pooledConnection, final Timeout timeout, final Executor executor, | ||
| private void openAsync(final PooledConnection pooledConnection, final Timeout timeout, | ||
| final SingleResultCallback<InternalConnection> callback) { | ||
| if (pooledConnection.opened()) { | ||
| if (LOGGER.isTraceEnabled()) { | ||
|
|
@@ -218,20 +219,7 @@ private void openAsync(final PooledConnection pooledConnection, final Timeout ti | |
| LOGGER.trace(format("Pooled connection %s to server %s is not yet open", | ||
| getId(pooledConnection), serverId)); | ||
| } | ||
| executor.execute(() -> openConcurrencyLimiter.openAsyncOrGetAvailable(pooledConnection, timeout, callback)); | ||
| } | ||
| } | ||
|
|
||
| private synchronized ExecutorService getAsyncGetter() { | ||
| if (asyncGetter == null) { | ||
| asyncGetter = Executors.newSingleThreadExecutor(new DaemonThreadFactory("AsyncGetter")); | ||
| } | ||
| return asyncGetter; | ||
| } | ||
|
|
||
| private synchronized void shutdownAsyncGetter() { | ||
| if (asyncGetter != null) { | ||
| asyncGetter.shutdownNow(); | ||
| workers.opener().execute(() -> openConcurrencyLimiter.openAsyncOrGetAvailable(pooledConnection, timeout, callback)); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -249,7 +237,7 @@ public void close() { | |
| if (sizeMaintenanceTimer != null) { | ||
| sizeMaintenanceTimer.shutdownNow(); | ||
| } | ||
| shutdownAsyncGetter(); | ||
| workers.close(); | ||
| closed = true; | ||
| connectionPoolListener.connectionPoolClosed(new ConnectionPoolClosedEvent(serverId)); | ||
| } | ||
|
|
@@ -710,7 +698,7 @@ public Prune shouldPrune(final UsageTrackingInternalConnection usageTrackingConn | |
| } | ||
|
|
||
| /** | ||
| * Package-private methods are thread-safe, | ||
| * Package-access methods are thread-safe, | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "package-private" is not a term the JLS uses, while "package-access" is. |
||
| * and only they should be called outside of the {@link OpenConcurrencyLimiter}'s code. | ||
| */ | ||
| @ThreadSafe | ||
|
|
@@ -745,10 +733,10 @@ void openImmediately(final PooledConnection connection) throws MongoTimeoutExcep | |
| * one becomes available while waiting for a permit. | ||
| * The first phase has one of the following outcomes: | ||
| * <ol> | ||
| * <li>A {@link MongoTimeoutException} or a different {@link Exception} is thrown.</li> | ||
| * <li>A {@link MongoTimeoutException} or a different {@link Exception} is thrown, | ||
| * and the specified {@code connection} is {@linkplain PooledConnection#closeSilently() silently closed}.</li> | ||
| * <li>An opened connection different from the specified one is returned, | ||
| * and the specified {@code connection} is {@linkplain PooledConnection#closeSilently() silently closed}. | ||
| * </li> | ||
| * and the specified {@code connection} is {@linkplain PooledConnection#closeSilently() silently closed}.</li> | ||
| * <li>A permit is acquired, {@link #connectionCreated(ConnectionPoolListener, ConnectionId)} is reported | ||
| * and an attempt to open the specified {@code connection} is made. This is the second phase in which | ||
| * the {@code connection} is {@linkplain PooledConnection#open() opened synchronously}. | ||
|
|
@@ -838,6 +826,7 @@ void openAsyncOrGetAvailable( | |
| private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boolean tryGetAvailable, final Timeout timeout) | ||
| throws MongoTimeoutException { | ||
| PooledConnection availableConnection = null; | ||
| boolean expressedDesireToGetAvailableConnection = false; | ||
| tryLock(timeout); | ||
| try { | ||
| if (tryGetAvailable) { | ||
|
|
@@ -855,6 +844,7 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole | |
| return availableConnection; | ||
| } | ||
| expressDesireToGetAvailableConnection(); | ||
| expressedDesireToGetAvailableConnection = true; | ||
| } | ||
| long remainingNanos = timeout.remainingOrInfinite(NANOSECONDS); | ||
| while (permits == 0) { | ||
|
|
@@ -865,7 +855,7 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole | |
| if (Timeout.expired(remainingNanos)) { | ||
| throw createTimeoutException(timeout); | ||
| } | ||
| remainingNanos = awaitNanos(remainingNanos); | ||
| remainingNanos = awaitNanos(condition, remainingNanos); | ||
| } | ||
| if (availableConnection == null) { | ||
| assertTrue(permits > 0); | ||
|
|
@@ -874,7 +864,7 @@ private PooledConnection acquirePermitOrGetAvailableOpenedConnection(final boole | |
| return availableConnection; | ||
| } finally { | ||
| try { | ||
| if (tryGetAvailable && availableConnection == null) {//the desired connection slot has not yet been removed | ||
| if (expressedDesireToGetAvailableConnection && availableConnection == null) { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This fixes the second problem I discovered while implementing pausable pool. This is an example of how one should try not to rely if possible on methods not throwing exceptions. |
||
| giveUpOnTryingToGetAvailableConnection(); | ||
| } | ||
| } finally { | ||
|
|
@@ -968,17 +958,15 @@ private void tryLock(final Timeout timeout) throws MongoTimeoutException { | |
| } | ||
|
|
||
| /** | ||
| * Returns {@code timeoutNanos} if {@code timeoutNanos} is negative, otherwise returns 0 or a positive value. | ||
| * | ||
| * @param timeoutNanos Use a negative value for an infinite timeout, | ||
| * in which case {@link Condition#awaitNanos(long)} is called with {@link Long#MAX_VALUE}. | ||
| * @param timeoutNanos See {@link Timeout#startNow(long)}. | ||
| * @return The remaining duration as per {@link Timeout#remainingOrInfinite(TimeUnit)} if waiting ended early either | ||
| * spuriously or because of receiving a signal. | ||
| */ | ||
| private long awaitNanos(final long timeoutNanos) { | ||
| private long awaitNanos(final Condition condition, final long timeoutNanos) throws MongoInterruptedException { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This change makes the behavior of this method consistent with |
||
| try { | ||
| if (timeoutNanos < 0) { | ||
| //noinspection ResultOfMethodCallIgnored | ||
| condition.awaitNanos(Long.MAX_VALUE); | ||
| return timeoutNanos; | ||
| if (timeoutNanos < 0 || timeoutNanos == Long.MAX_VALUE) { | ||
| condition.await(); | ||
| return -1; | ||
jyemin marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| } else { | ||
| return Math.max(0, condition.awaitNanos(timeoutNanos)); | ||
| } | ||
|
|
@@ -997,4 +985,56 @@ private static final class MutableReference<T> { | |
| private MutableReference() { | ||
| } | ||
| } | ||
|
|
||
| @ThreadSafe | ||
| private static class Workers implements AutoCloseable { | ||
| private volatile ExecutorService getter; | ||
| private volatile ExecutorService opener; | ||
| private final Lock lock; | ||
|
|
||
| Workers() { | ||
| lock = new StampedLock().asWriteLock(); | ||
| } | ||
|
|
||
| Executor getter() { | ||
| if (getter == null) { | ||
| lock.lock(); | ||
| try { | ||
| if (getter == null) { | ||
| getter = Executors.newSingleThreadExecutor(new DaemonThreadFactory("AsyncGetter")); | ||
| } | ||
| } finally { | ||
| lock.unlock(); | ||
| } | ||
| } | ||
| return getter; | ||
| } | ||
|
|
||
| Executor opener() { | ||
| if (opener == null) { | ||
| lock.lock(); | ||
| try { | ||
| if (opener == null) { | ||
| opener = Executors.newSingleThreadExecutor(new DaemonThreadFactory("AsyncOpener")); | ||
| } | ||
| } finally { | ||
| lock.unlock(); | ||
| } | ||
| } | ||
| return opener; | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| try { | ||
| if (getter != null) { | ||
| getter.shutdownNow(); | ||
| } | ||
| } finally { | ||
| if (opener != null) { | ||
| opener.shutdownNow(); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
Uh oh!
There was an error while loading. Please reload this page.