Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Replace ExecutionList with linked list of RunnableExecutionPair
Patch does CAS loop and XCHG instead of synchronized which should
be faster.

Patch also reduces per-future memory consumption.
  • Loading branch information
stepancheg committed Oct 26, 2016
commit 3f701e5a597ea5c0118dd803c8568b42d6a2815a
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@
package org.asynchttpclient.future;

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

import org.asynchttpclient.ListenableFuture;

/**
* An abstract base implementation of the listener support provided by {@link ListenableFuture}. This class uses an {@link ExecutionList} to guarantee that all registered listeners
* will be executed. Listener/Executor pairs are stored in the execution list and executed in the order in which they were added, but because of thread scheduling issues there is
* An abstract base implementation of the listener support provided by {@link ListenableFuture}.
* Listener/Executor pairs are stored in the {@link RunnableExecutorPair} linked list in the order in which they were added, but because of thread scheduling issues there is
* no guarantee that the JVM will execute them in order. In addition, listeners added after the task is complete will be executed immediately, even if some previously added
* listeners have not yet been executed.
*
Expand All @@ -43,41 +44,49 @@
*/
public abstract class AbstractListenableFuture<V> implements ListenableFuture<V> {

private volatile boolean hasRun;
private volatile boolean executionListInitialized;
private volatile ExecutionList executionList;
/**
* Marks that execution is already done, and new runnables
* should be executed right away instead of begin added to the list.
*/
private static final RunnableExecutorPair executedMarker = new RunnableExecutorPair();

private ExecutionList executionList() {
ExecutionList localExecutionList = executionList;
if (localExecutionList == null) {
synchronized (this) {
localExecutionList = executionList;
if (localExecutionList == null) {
localExecutionList = new ExecutionList();
executionList = localExecutionList;
executionListInitialized = true;
}
}
}
return localExecutionList;
}
/**
* Linked list of executions or a {@link #executedMarker}.
*/
private volatile RunnableExecutorPair executionList;
private static final AtomicReferenceFieldUpdater<AbstractListenableFuture, RunnableExecutorPair> executionListField =
AtomicReferenceFieldUpdater.newUpdater(AbstractListenableFuture.class, RunnableExecutorPair.class, "executionList");

@Override
public ListenableFuture<V> addListener(Runnable listener, Executor exec) {
executionList().add(listener, exec);
if (hasRun) {
runListeners();
for (;;) {
RunnableExecutorPair executionListLocal = this.executionList;
if (executionListLocal == executedMarker) {
RunnableExecutorPair.executeListener(listener, exec);
return this;
}

RunnableExecutorPair pair = new RunnableExecutorPair(listener, exec, executionListLocal);
if (executionListField.compareAndSet(this, executionListLocal, pair)) {
return this;
}
}
return this;
}

/**
* Execute the execution list.
*/
protected void runListeners() {
hasRun = true;
if (executionListInitialized) {
executionList().execute();
RunnableExecutorPair execution = executionListField.getAndSet(this, executedMarker);
if (execution == executedMarker) {
return;
}

RunnableExecutorPair reversedList = RunnableExecutorPair.reverseList(execution);

while (reversedList != null) {
RunnableExecutorPair.executeListener(reversedList.runnable, reversedList.executor);
reversedList = reversedList.next;
}
}
}
148 changes: 0 additions & 148 deletions client/src/main/java/org/asynchttpclient/future/ExecutionList.java

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package org.asynchttpclient.future;

import java.util.concurrent.Executor;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.asynchttpclient.util.Assertions;

/**
* Linked list of runnables with executors.
*/
final class RunnableExecutorPair {
private static final Logger log = Logger.getLogger(RunnableExecutorPair.class.getPackage().getName());

final Runnable runnable;
final Executor executor;
RunnableExecutorPair next;

RunnableExecutorPair() {
runnable = null;
executor = null;
}

RunnableExecutorPair(Runnable runnable, Executor executor, RunnableExecutorPair next) {
Assertions.assertNotNull(runnable, "runnable");

this.runnable = runnable;
this.executor = executor;
this.next = next;
}

/**
* Submits the given runnable to the given {@link Executor} catching and logging all {@linkplain RuntimeException runtime exceptions} thrown by the executor.
*/
static void executeListener(Runnable runnable, Executor executor) {
try {
if (executor != null) {
executor.execute(runnable);
} else {
runnable.run();
}
} catch (RuntimeException e) {
// Log it and keep going, bad runnable and/or executor. Don't punish the other runnables if
// we're given a bad one. We only catch RuntimeException because we want Errors to propagate
// up.
log.log(Level.SEVERE, "RuntimeException while executing runnable " + runnable + " with executor " + executor, e);
}
}

static RunnableExecutorPair reverseList(RunnableExecutorPair list) {
// The pairs in the stack are in the opposite order from how they were added
// so we need to reverse the list to fulfill our contract.
// This is somewhat annoying, but turns out to be very fast in practice. Alternatively, we
// could drop the contract on the method that enforces this queue like behavior since depending
// on it is likely to be a bug anyway.

// N.B. All writes to the list and the next pointers must have happened before the above
// synchronized block, so we can iterate the list without the lock held here.
RunnableExecutorPair prev = null;

while (list != null) {
RunnableExecutorPair next = list.next;
list.next = prev;
prev = list;
list = next;
}

return prev;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package org.asynchttpclient.future;

import java.util.ArrayList;

import org.testng.Assert;
import org.testng.annotations.Test;

/**
* @author Stepan Koltsov
*/
public class RunnableExecutorPairTest {

@Test
public void testReverseList() {
// empty
{
Assert.assertNull(RunnableExecutorPair.reverseList(null));
}

for (int len = 1; len < 5; ++len) {
ArrayList<RunnableExecutorPair> list = new ArrayList<>();
for (int i = 0; i < len; ++i) {
RunnableExecutorPair prev = i != 0 ? list.get(i - 1) : null;
list.add(new RunnableExecutorPair(() -> {}, null, prev));
}

RunnableExecutorPair reversed = RunnableExecutorPair.reverseList(list.get(list.size() - 1));
for (int i = 0; i < len; ++i) {
Assert.assertSame(reversed, list.get(i));
Assert.assertSame(i != len - 1 ? list.get(i + 1) : null, reversed.next);
reversed = reversed.next;
}
}
}

}