Skip to content

Commit 23d3a99

Browse files
committed
Replace ExecutionList with linked list of RunnableExecutionPair
Patch does CAS loop and XCHG instead of synchronized which should be faster. Patch also reduces per-future memory consumption.
1 parent 31864fa commit 23d3a99

File tree

4 files changed

+128
-171
lines changed

4 files changed

+128
-171
lines changed

client/src/main/java/org/asynchttpclient/future/AbstractListenableFuture.java

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,15 @@
2828

2929
package org.asynchttpclient.future;
3030

31+
import java.util.ArrayList;
3132
import java.util.concurrent.Executor;
33+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
3234

3335
import org.asynchttpclient.ListenableFuture;
3436

3537
/**
36-
* An abstract base implementation of the listener support provided by {@link ListenableFuture}. This class uses an {@link ExecutionList} to guarantee that all registered listeners
37-
* will be executed. Listener/Executor pairs are stored in the execution list and executed in the order in which they were added, but because of thread scheduling issues there is
38+
* An abstract base implementation of the listener support provided by {@link ListenableFuture}.
39+
* Listener/Executor pairs are stored in the {@link RunnableExecutorPair} linked list in the order in which they were added, but because of thread scheduling issues there is
3840
* no guarantee that the JVM will execute them in order. In addition, listeners added after the task is complete will be executed immediately, even if some previously added
3941
* listeners have not yet been executed.
4042
*
@@ -43,41 +45,48 @@
4345
*/
4446
public abstract class AbstractListenableFuture<V> implements ListenableFuture<V> {
4547

46-
private volatile boolean hasRun;
47-
private volatile boolean executionListInitialized;
48-
private volatile ExecutionList executionList;
48+
/**
49+
* Marks that execution is already done, and new runnables
50+
* should be executed right away instead of begin added to the list.
51+
*/
52+
private static final RunnableExecutorPair executedMarker = new RunnableExecutorPair(null, null, null);
4953

50-
private ExecutionList executionList() {
51-
ExecutionList localExecutionList = executionList;
52-
if (localExecutionList == null) {
53-
synchronized (this) {
54-
localExecutionList = executionList;
55-
if (localExecutionList == null) {
56-
localExecutionList = new ExecutionList();
57-
executionList = localExecutionList;
58-
executionListInitialized = true;
59-
}
60-
}
61-
}
62-
return localExecutionList;
63-
}
54+
/**
55+
* Linked list of executions or a {@link #executedMarker}.
56+
*/
57+
private volatile RunnableExecutorPair executionList;
58+
private static final AtomicReferenceFieldUpdater<AbstractListenableFuture, RunnableExecutorPair> executionListField =
59+
AtomicReferenceFieldUpdater.newUpdater(AbstractListenableFuture.class, RunnableExecutorPair.class, "executionList");
6460

6561
@Override
6662
public ListenableFuture<V> addListener(Runnable listener, Executor exec) {
67-
executionList().add(listener, exec);
68-
if (hasRun) {
69-
runListeners();
63+
for (;;) {
64+
RunnableExecutorPair executionListLocal = this.executionList;
65+
if (executionListLocal == executedMarker) {
66+
RunnableExecutorPair.executeListener(listener, exec);
67+
return this;
68+
}
69+
70+
RunnableExecutorPair pair = new RunnableExecutorPair(listener, exec, executionListLocal);
71+
if (executionListField.compareAndSet(this, executionListLocal, pair)) {
72+
return this;
73+
}
7074
}
71-
return this;
7275
}
7376

7477
/**
7578
* Execute the execution list.
7679
*/
7780
protected void runListeners() {
78-
hasRun = true;
79-
if (executionListInitialized) {
80-
executionList().execute();
81+
RunnableExecutorPair execution = executionListField.getAndSet(this, executedMarker);
82+
if (execution == executedMarker) {
83+
return;
84+
}
85+
86+
RunnableExecutorPair reversedList = RunnableExecutorPair.reverseList(execution);
87+
88+
while (reversedList != null) {
89+
RunnableExecutorPair.executeListener(reversedList.runnable, reversedList.executor);
8190
}
8291
}
8392
}

client/src/main/java/org/asynchttpclient/future/ExecutionList.java

Lines changed: 0 additions & 145 deletions
This file was deleted.
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package org.asynchttpclient.future;
2+
3+
import java.util.concurrent.Executor;
4+
import java.util.logging.Level;
5+
import java.util.logging.Logger;
6+
7+
/**
8+
* Linked list of runnables with executors.
9+
*/
10+
final class RunnableExecutorPair {
11+
private static final Logger log = Logger.getLogger(RunnableExecutorPair.class.getPackage().getName());
12+
13+
final Runnable runnable;
14+
final Executor executor;
15+
RunnableExecutorPair next;
16+
17+
RunnableExecutorPair(Runnable runnable, Executor executor, RunnableExecutorPair next) {
18+
this.runnable = runnable;
19+
this.executor = executor;
20+
this.next = next;
21+
}
22+
23+
/**
24+
* Submits the given runnable to the given {@link Executor} catching and logging all {@linkplain RuntimeException runtime exceptions} thrown by the executor.
25+
*/
26+
static void executeListener(Runnable runnable, Executor executor) {
27+
try {
28+
executor.execute(runnable);
29+
} catch (RuntimeException e) {
30+
// Log it and keep going, bad runnable and/or executor. Don't punish the other runnables if
31+
// we're given a bad one. We only catch RuntimeException because we want Errors to propagate
32+
// up.
33+
log.log(Level.SEVERE, "RuntimeException while executing runnable " + runnable + " with executor " + executor, e);
34+
}
35+
}
36+
37+
static RunnableExecutorPair reverseList(RunnableExecutorPair list) {
38+
// The pairs in the stack are in the opposite order from how they were added
39+
// so we need to reverse the list to fulfill our contract.
40+
// This is somewhat annoying, but turns out to be very fast in practice. Alternatively, we
41+
// could drop the contract on the method that enforces this queue like behavior since depending
42+
// on it is likely to be a bug anyway.
43+
44+
// N.B. All writes to the list and the next pointers must have happened before the above
45+
// synchronized block, so we can iterate the list without the lock held here.
46+
RunnableExecutorPair prev = null;
47+
48+
while (list != null) {
49+
RunnableExecutorPair next = list.next;
50+
list.next = prev;
51+
prev = list;
52+
list = next;
53+
}
54+
55+
return prev;
56+
}
57+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package org.asynchttpclient.future;
2+
3+
import java.util.ArrayList;
4+
5+
import org.testng.Assert;
6+
import org.testng.annotations.Test;
7+
8+
/**
9+
* @author Stepan Koltsov
10+
*/
11+
public class RunnableExecutorPairTest {
12+
13+
@Test
14+
public void testReverseList() {
15+
// empty
16+
{
17+
Assert.assertNull(RunnableExecutorPair.reverseList(null));
18+
}
19+
20+
for (int len = 1; len < 5; ++len) {
21+
ArrayList<RunnableExecutorPair> list = new ArrayList<>();
22+
for (int i = 0; i < len; ++i) {
23+
RunnableExecutorPair prev = i != 0 ? list.get(i - 1) : null;
24+
list.add(new RunnableExecutorPair(() -> {}, null, prev));
25+
}
26+
27+
RunnableExecutorPair reversed = RunnableExecutorPair.reverseList(list.get(list.size() - 1));
28+
for (int i = 0; i < len; ++i) {
29+
Assert.assertSame(reversed, list.get(i));
30+
Assert.assertSame(i != len - 1 ? list.get(i + 1) : null, reversed.next);
31+
reversed = reversed.next;
32+
}
33+
}
34+
}
35+
36+
}

0 commit comments

Comments
 (0)