Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b16a727
Refactor ConcurrentPool so that it becomes agnostic of the item initi…
stIncMale Mar 4, 2021
f21d299
Implement maxConnecting in DefaultConnectionPool.get
stIncMale Mar 10, 2021
9371a8b
Add specification tests
stIncMale Mar 10, 2021
388a7a7
Implement integration style CMAP specification tests
stIncMale Mar 10, 2021
1e75fe0
Implement maxConnecting in DefaultConnectionPool.getAsync
stIncMale Mar 16, 2021
8360c76
Fix static validation issues
stIncMale Mar 16, 2021
045fe13
Implement runOn in AbstractConnectionPoolTest
stIncMale Mar 16, 2021
5e5c7c0
Send ConnectionCheckOutFailedEvent with Reason.CONNECTION_ERROR if fa…
stIncMale Mar 17, 2021
459466a
Address the first set of review concerns
stIncMale Mar 17, 2021
4dae245
Add an test for DefaultConnectionPool.getAsync that checks for Reason…
stIncMale Mar 18, 2021
5d203f7
Simplify DefaultConnectionPoolSpecification.selectConnectionAsync
stIncMale Mar 18, 2021
3d54d9d
Simplify ConcurrentPool.ensureMinSize
stIncMale Mar 23, 2021
26a4a7b
Replace Java assert statements with our assert methods
stIncMale Mar 24, 2021
e2bd080
Merge branch 'master' into JAVA-3927
stIncMale Mar 24, 2021
45d1891
Address naming and try-finally review concerns
stIncMale Mar 24, 2021
49a3e3c
Fix ensureMinSize call in ConcurrentPoolTest
stIncMale Mar 24, 2021
fbd5a97
Add tests for Timeout
stIncMale Mar 25, 2021
6eaf542
Introduce sync and async methods by duplicating the logic of openOrSi…
stIncMale Mar 25, 2021
f5967d1
Make it clearer that releasePermit and tryHandOver methods acquire th…
stIncMale Mar 26, 2021
1e29f20
Replace Timeout.remainingNanos with remaining(TimeUnit)
stIncMale Mar 26, 2021
d3a9195
Address review concerns
stIncMale Mar 31, 2021
fa3c23c
Undo changes in the ConnectionPoolListener specification
stIncMale Apr 1, 2021
0fedc2d
Improve docs
stIncMale Apr 3, 2021
22d3afc
Add concurrency tests for DefaultConnectionPool
stIncMale Apr 3, 2021
fbdd763
Assert all checked in connections are handed over in DefaultConnectio…
stIncMale Apr 5, 2021
ba98f3d
Address review concerns
stIncMale Apr 5, 2021
b04ff8f
Do not confuse a user with timeout exceptions caused by timeout excep…
stIncMale Apr 6, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Refactor ConcurrentPool so that it becomes agnostic of the item initi…
…alization concept

JAVA-3927
  • Loading branch information
stIncMale committed Mar 16, 2021
commit b16a7274f785825e4f31bd7c1b363f50e8d8a8ba
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@

package com.mongodb.internal.connection;

import com.mongodb.Function;
import com.mongodb.MongoInternalException;
import com.mongodb.MongoInterruptedException;
import com.mongodb.MongoTimeoutException;
import com.mongodb.internal.connection.ConcurrentLinkedDeque.RemovalReportingIterator;
import com.mongodb.lang.Nullable;

import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

Expand Down Expand Up @@ -59,7 +62,7 @@ public enum Prune {
* @param <T>
*/
public interface ItemFactory<T> {
T create(boolean initialize);
T create();

void close(T t);

Expand Down Expand Up @@ -143,7 +146,8 @@ public T get(final long timeout, final TimeUnit timeUnit) {

T t = available.pollLast();
if (t == null) {
t = createNewAndReleasePermitIfFailure(false);
t = createNewAndReleasePermitIfFailure(null);
assert t != null : "new item must not be null if postCreate is null";
}

return t;
Expand All @@ -167,25 +171,53 @@ public void prune() {
}
}

public void ensureMinSize(final int minSize, final boolean initialize) {
/**
* Try to populate this pool with items so that {@link #getCount()} is not smaller than {@code minSize}.
* The {@code postCreate} action returning an {@linkplain Optional#isEmpty() empty} result causes this method to return.
*
* @param postCreate A transforming action applied to non-{@code null} new items.
* This action may return an item as is, may modify the item,
* or return a (potentially {@linkplain Optional#isEmpty() empty}) reference to different object / throw an
* {@link Exception}. In the latter case the action must release
* resources associated with the item if they cannot be releases via the returned reference.
*/
public void ensureMinSize(final int minSize, @Nullable final Function<? super T, ? extends Optional<? extends T>> postCreate) {
while (getCount() < minSize) {
if (!acquirePermit(10, TimeUnit.MILLISECONDS)) {
if (!acquirePermit(0, TimeUnit.MILLISECONDS)) {
break;
}
release(createNewAndReleasePermitIfFailure(initialize));
T newItem = createNewAndReleasePermitIfFailure(postCreate);
if (newItem == null) {
break;
} else {
release(newItem);
}
}
}

private T createNewAndReleasePermitIfFailure(final boolean initialize) {
/**
* If this method returns {@code null} or throws an {@link Exception}, then before doing so it releases a permit.
*
* @param postCreate See {@link #ensureMinSize(int, Function)}.
* @return Either a {@linkplain ItemFactory#create() new} non-{@code null} item if {@code postCreate} is {@code null},
* or the result of {@linkplain Function#apply(Object) applying} {@code postCreate} to the new item
* (this result may be {@linkplain Optional#isEmpty() empty}, in which case the method returns {@code null}).
*/
@Nullable
private T createNewAndReleasePermitIfFailure(@Nullable final Function<? super T, ? extends Optional<? extends T>> postCreate) {
boolean failure = true;
try {
T newMember = itemFactory.create(initialize);
T newMember = itemFactory.create();
if (newMember == null) {
throw new MongoInternalException("The factory for the pool created a null item");
}
newMember = postCreate == null ? newMember : postCreate.apply(newMember).orElse(null);
failure = newMember == null;
return newMember;
} catch (RuntimeException e) {
permits.release();
throw e;
} finally {
if (failure) {
permits.release();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.bson.codecs.Decoder;

import java.util.List;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -320,7 +321,13 @@ public synchronized void run() {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(format("Ensuring minimum pooled connections to %s", serverId.getAddress()));
}
pool.ensureMinSize(settings.getMinSize(), true);
pool.ensureMinSize(settings.getMinSize(), newConnection -> {
if (!newConnection.opened()) {
newConnection.open();
connectionPoolListener.connectionReady(new ConnectionReadyEvent(getId(newConnection)));
}
return Optional.of(newConnection);
});
}
} catch (MongoInterruptedException e) {
// don't log interruptions due to the shutdownNow call on the ExecutorService
Expand Down Expand Up @@ -580,16 +587,11 @@ private class UsageTrackingInternalConnectionItemFactory implements ConcurrentPo
}

@Override
public UsageTrackingInternalConnection create(final boolean initialize) {
public UsageTrackingInternalConnection create() {
UsageTrackingInternalConnection internalConnection =
new UsageTrackingInternalConnection(internalConnectionFactory.create(serverId), generation.get());
ConnectionId id = getId(internalConnection);
connectionCreated(connectionPoolListener, id);
if (initialize) {
internalConnection.open();
connectionPoolListener.connectionReady(new ConnectionReadyEvent(id));
}

return internalConnection;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public PowerOfTwoBufferPool(final int highestPowerOfTwo) {
powerOfTwoToPoolMap.put(i, new ConcurrentPool<ByteBuffer>(Integer.MAX_VALUE,
new ConcurrentPool.ItemFactory<ByteBuffer>() {
@Override
public ByteBuffer create(final boolean initialize) {
public ByteBuffer create() {
return createNew(size);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ public boolean isMarkedDirty() {

private final class ServerSessionItemFactory implements ConcurrentPool.ItemFactory<ServerSessionImpl> {
@Override
public ServerSessionImpl create(final boolean initialize) {
public ServerSessionImpl create() {
return new ServerSessionImpl(createNewServerSessionIdentifier());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@

package com.mongodb.internal.connection;

import com.mongodb.Function;
import com.mongodb.MongoException;
import com.mongodb.MongoTimeoutException;
import org.junit.Test;

import java.io.Closeable;
import java.util.Optional;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -141,36 +143,36 @@ public void testCloseItemOnReleaseAfterPoolClosed() {
public void testEnsureMinSize() {
pool = new ConcurrentPool<TestCloseable>(3, new TestItemFactory());

pool.ensureMinSize(0, false);
pool.ensureMinSize(0, null);
assertEquals(0, pool.getAvailableCount());

pool.ensureMinSize(1, false);
pool.ensureMinSize(1, null);
assertEquals(1, pool.getAvailableCount());

pool.ensureMinSize(1, false);
pool.ensureMinSize(1, null);
assertEquals(1, pool.getAvailableCount());

pool.get();
pool.ensureMinSize(1, false);
pool.ensureMinSize(1, null);
assertEquals(0, pool.getAvailableCount());

pool.ensureMinSize(4, false);
pool.ensureMinSize(4, null);
assertEquals(3, pool.getAvailableCount());
}

@Test
public void whenEnsuringMinSizeShouldNotInitializePooledItemIfNotRequested() {
pool = new ConcurrentPool<TestCloseable>(3, new TestItemFactory());

pool.ensureMinSize(1, false);
pool.ensureMinSize(1, null);
assertFalse(pool.get().isInitialized());
}

@Test
public void whenEnsuringMinSizeShouldInitializePooledItemIfRequested() {
pool = new ConcurrentPool<TestCloseable>(3, new TestItemFactory());

pool.ensureMinSize(1, true);
pool.ensureMinSize(1, TestCloseable.initializeAction);
assertTrue(pool.get().isInitialized());
}

Expand All @@ -179,7 +181,7 @@ public void testThatEnsuringMinSizeReleasesPermitIfCreateFails() {
pool = new ConcurrentPool<TestCloseable>(1, new TestItemFactory(true));

try {
pool.ensureMinSize(1, true);
pool.ensureMinSize(1, TestCloseable.initializeAction);
fail();
} catch (MongoException e) {
// expected
Expand Down Expand Up @@ -232,11 +234,11 @@ class TestItemFactory implements ConcurrentPool.ItemFactory<TestCloseable> {
}

@Override
public TestCloseable create(final boolean initialize) {
public TestCloseable create() {
if (shouldThrowOnCreate) {
throw new MongoException("This is a journey");
}
return new TestCloseable(initialize);
return new TestCloseable();
}

@Override
Expand All @@ -251,12 +253,16 @@ public ConcurrentPool.Prune shouldPrune(final TestCloseable testCloseable) {
}

static class TestCloseable implements Closeable {
private static final Function<TestCloseable, Optional<TestCloseable>> initializeAction = connection -> {
connection.initialized = true;
return Optional.of(connection);
};

private boolean closed;
private ConcurrentPool.Prune shouldPrune;
private final boolean initialized;
private boolean initialized;

TestCloseable(final boolean initialize) {
this.initialized = initialize;
TestCloseable() {
}

@Override
Expand Down