Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b16a727
Refactor ConcurrentPool so that it becomes agnostic of the item initi…
stIncMale Mar 4, 2021
f21d299
Implement maxConnecting in DefaultConnectionPool.get
stIncMale Mar 10, 2021
9371a8b
Add specification tests
stIncMale Mar 10, 2021
388a7a7
Implement integration style CMAP specification tests
stIncMale Mar 10, 2021
1e75fe0
Implement maxConnecting in DefaultConnectionPool.getAsync
stIncMale Mar 16, 2021
8360c76
Fix static validation issues
stIncMale Mar 16, 2021
045fe13
Implement runOn in AbstractConnectionPoolTest
stIncMale Mar 16, 2021
5e5c7c0
Send ConnectionCheckOutFailedEvent with Reason.CONNECTION_ERROR if fa…
stIncMale Mar 17, 2021
459466a
Address the first set of review concerns
stIncMale Mar 17, 2021
4dae245
Add an test for DefaultConnectionPool.getAsync that checks for Reason…
stIncMale Mar 18, 2021
5d203f7
Simplify DefaultConnectionPoolSpecification.selectConnectionAsync
stIncMale Mar 18, 2021
3d54d9d
Simplify ConcurrentPool.ensureMinSize
stIncMale Mar 23, 2021
26a4a7b
Replace Java assert statements with our assert methods
stIncMale Mar 24, 2021
e2bd080
Merge branch 'master' into JAVA-3927
stIncMale Mar 24, 2021
45d1891
Address naming and try-finally review concerns
stIncMale Mar 24, 2021
49a3e3c
Fix ensureMinSize call in ConcurrentPoolTest
stIncMale Mar 24, 2021
fbd5a97
Add tests for Timeout
stIncMale Mar 25, 2021
6eaf542
Introduce sync and async methods by duplicating the logic of openOrSi…
stIncMale Mar 25, 2021
f5967d1
Make it clearer that releasePermit and tryHandOver methods acquire th…
stIncMale Mar 26, 2021
1e29f20
Replace Timeout.remainingNanos with remaining(TimeUnit)
stIncMale Mar 26, 2021
d3a9195
Address review concerns
stIncMale Mar 31, 2021
fa3c23c
Undo changes in the ConnectionPoolListener specification
stIncMale Apr 1, 2021
0fedc2d
Improve docs
stIncMale Apr 3, 2021
22d3afc
Add concurrency tests for DefaultConnectionPool
stIncMale Apr 3, 2021
fbdd763
Assert all checked in connections are handed over in DefaultConnectio…
stIncMale Apr 5, 2021
ba98f3d
Address review concerns
stIncMale Apr 5, 2021
b04ff8f
Do not confuse a user with timeout exceptions caused by timeout excep…
stIncMale Apr 6, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions config/findbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -190,4 +190,10 @@
<Bug pattern="VA_FORMAT_STRING_USES_NEWLINE"/>
</Match>

<!-- The return value of Condition.awaitNanos is ignored on purpose for infinite timeouts. -->
<Match>
<Class name="com.mongodb.internal.connection.DefaultConnectionPool$OpenConcurrencyLimiter"/>
<Method name="awaitNanos"/>
<Bug pattern="RV_RETURN_VALUE_IGNORED_BAD_PRACTICE"/>
</Match>
</FindBugsFilter>
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

/**
* A listener for connection pool-related events.
* Methods of this interface must not throw {@link Exception}s.
*
* @since 3.5
*/
Expand Down
188 changes: 188 additions & 0 deletions driver-core/src/main/com/mongodb/internal/Timeout.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.mongodb.internal;

import com.mongodb.annotations.Immutable;

import java.util.Objects;
import java.util.concurrent.TimeUnit;

import static com.mongodb.assertions.Assertions.notNull;
import static java.util.concurrent.TimeUnit.NANOSECONDS;

/**
* A <a href="https://docs.oracle.com/javase/8/docs/api/java/lang/doc-files/ValueBased.html">value-based</a> class
* useful for tracking timeouts.
*/
@Immutable
public final class Timeout {
private static final Timeout INFINITE = new Timeout(-1, 0);
private static final Timeout IMMEDIATE = new Timeout(0, 0);

private final long durationNanos;
private final long startNanos;

private Timeout(final long durationNanos, final long startNanos) {
this.durationNanos = durationNanos;
this.startNanos = startNanos;
}

/**
* @see #startNow(long)
*/
public static Timeout startNow(final long duration, final TimeUnit unit) {
return startNow(notNull("unit", unit).toNanos(duration));
}

/**
* Returns an {@linkplain #isInfinite() infinite} timeout if {@code durationNanos} is either negative
* or is equal to {@link Long#MAX_VALUE},
* an {@linkplain #isImmediate() immediate} timeout if {@code durationNanos} is 0,
* otherwise an object that represents the specified {@code durationNanos}.
*/
public static Timeout startNow(final long durationNanos) {
if (durationNanos < 0 || durationNanos == Long.MAX_VALUE) {
return INFINITE;
} else if (durationNanos == 0) {
return IMMEDIATE;
} else {
return new Timeout(durationNanos, System.nanoTime());
}
}

/**
* @see #startNow(long)
*/
public static Timeout infinite() {
return INFINITE;
}

/**
* @see #startNow(long)
*/
public static Timeout immediate() {
return IMMEDIATE;
}

/**
* Returns 0 or a positive value.
* Must not be called on {@linkplain #isInfinite() infinite} or {@linkplain #isImmediate() immediate} timeouts.
*/
private long elapsedNanos() {
assert !(isInfinite() || isImmediate());
return System.nanoTime() - startNanos;
}

/**
* Returns 0 or a positive value.
* Use {@link #expired(long)} to check if the returned value signifies that a timeout is expired.
*
* @throws UnsupportedOperationException If the timeout is {@linkplain #isInfinite() infinite}.
* @see #remainingNanosOrInfinite()
*/
public long remainingNanos() throws UnsupportedOperationException {
if (isInfinite()) {
throw new UnsupportedOperationException();
}
return isImmediate() ? 0 : Math.max(0, durationNanos - elapsedNanos());
}

/**
* Returns a negative value for {@linkplain #isInfinite() infinite} timeouts, otherwise 0 or a positive value.
* Use {@link #expired(long)} to check if the returned value signifies that a timeout is expired.
*
* @see #remainingNanos()
*/
public long remainingNanosOrInfinite() {
return isInfinite() ? -1 : remainingNanos();
}

/**
* @see #expired(long)
*/
public boolean expired() {
return expired(remainingNanosOrInfinite());
}

/**
* Returns {@code true} if and only if {@code remainingNanos} is 0.
*
* @see #remainingNanos()
* @see #expired()
*/
public static boolean expired(final long remainingNanos) {
return remainingNanos == 0;
}

/**
* @return {@code true} if and only if the timeout duration is considered to be infinite.
*/
public boolean isInfinite() {
return equals(INFINITE);
}

/**
* @return {@code true} if and only if the timeout duration is 0.
*/
public boolean isImmediate() {
return equals(IMMEDIATE);
}

@Override
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final Timeout other = (Timeout) o;
return durationNanos == other.durationNanos && startNanos == other.startNanos;
}

@Override
public int hashCode() {
return Objects.hash(durationNanos, startNanos);
}

/**
* This method is useful for debugging.
*
* @see #toUserString()
*/
@Override
public String toString() {
return "Timeout{"
+ "durationNanos=" + durationNanos
+ ", startNanos=" + startNanos
+ '}';
}

/**
* Returns a user-friendly representation. Examples: 1500 ms, infinite, 0 ms (immediate).
*
* @see #toString()
*/
public String toUserString() {
if (isInfinite()) {
return "infinite";
} else if (isImmediate()) {
return "0 ms (immediate)";
} else {
return NANOSECONDS.toMillis(durationNanos) + " ms";
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@
import com.mongodb.MongoInterruptedException;
import com.mongodb.MongoTimeoutException;
import com.mongodb.internal.connection.ConcurrentLinkedDeque.RemovalReportingIterator;
import com.mongodb.lang.Nullable;

import java.util.Iterator;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;

/**
* A concurrent pool implementation.
Expand Down Expand Up @@ -59,7 +61,7 @@ public enum Prune {
* @param <T>
*/
public interface ItemFactory<T> {
T create(boolean initialize);
T create();

void close(T t);

Expand Down Expand Up @@ -143,7 +145,8 @@ public T get(final long timeout, final TimeUnit timeUnit) {

T t = available.pollLast();
if (t == null) {
t = createNewAndReleasePermitIfFailure(false);
t = createNewAndReleasePermitIfFailure(null);
assert t != null : "new item must not be null if postCreate is null";
}

return t;
Expand All @@ -167,25 +170,49 @@ public void prune() {
}
}

public void ensureMinSize(final int minSize, final boolean initialize) {
/**
* Try to populate this pool with items so that {@link #getCount()} is not smaller than {@code minSize}.
* The {@code postCreate} action throwing a exception causes this method to stop and re-throw that exception.
*
* @param postCreate A transforming action applied to non-{@code null} new items.
* This action may return an item as is, may modify the item,
* or throw an {@link Exception}. In the latter case the action must release
* resources associated with the item.
*/
public void ensureMinSize(final int minSize, @Nullable final UnaryOperator<T> postCreate) {
while (getCount() < minSize) {
if (!acquirePermit(10, TimeUnit.MILLISECONDS)) {
if (!acquirePermit(0, TimeUnit.MILLISECONDS)) {
break;
}
release(createNewAndReleasePermitIfFailure(initialize));
T newItem = createNewAndReleasePermitIfFailure(postCreate);
release(newItem);
}
}

private T createNewAndReleasePermitIfFailure(final boolean initialize) {
/**
* If this method returns {@code null} or throws an {@link Exception}, then before doing so it releases a permit.
*
* @param postCreate See {@link #ensureMinSize(int, UnaryOperator)}.
* @return Either a {@linkplain ItemFactory#create() new} non-{@code null} item if {@code postCreate} is {@code null},
* or the result of {@linkplain UnaryOperator#apply(Object) applying} {@code postCreate} to the new item.
*/
private T createNewAndReleasePermitIfFailure(@Nullable final UnaryOperator<T> postCreate) {
boolean failure = true;
try {
T newMember = itemFactory.create(initialize);
T newMember = itemFactory.create();
if (newMember == null) {
throw new MongoInternalException("The factory for the pool created a null item");
}
newMember = postCreate == null ? newMember : postCreate.apply(newMember);
if (newMember == null) {
throw new MongoInternalException("The post create operator for the pool created a null item");
}
failure = false;
return newMember;
} catch (RuntimeException e) {
permits.release();
throw e;
} finally {
if (failure) {
permits.release();
}
}
}

Expand Down
Loading