- * It can be called however often and whenever needed—but the outstanding cumulative demand must never exceed Long.MAX_VALUE. - * An outstanding cumulative demand of Long.MAX_VALUE may be treated by the {@link Publisher} as "effectively unbounded". + * It can be called however often and whenever needed—but if the outstanding cumulative demand ever becomes Long.MAX_VALUE or more, + * it may be treated by the {@link Publisher} as "effectively unbounded". *
* Whatever has been requested can be sent by the {@link Publisher} so only signal demand for what can be safely handled. *
diff --git a/flow-adapters/src/main/java/org/reactivestreams/FlowAdapters.java b/api/src/main/java9/org/reactivestreams/FlowAdapters.java
similarity index 74%
rename from flow-adapters/src/main/java/org/reactivestreams/FlowAdapters.java
rename to api/src/main/java9/org/reactivestreams/FlowAdapters.java
index 4417e4c5..96cbff5b 100644
--- a/flow-adapters/src/main/java/org/reactivestreams/FlowAdapters.java
+++ b/api/src/main/java9/org/reactivestreams/FlowAdapters.java
@@ -12,6 +12,7 @@
package org.reactivestreams;
import java.util.concurrent.Flow;
+import static java.util.Objects.requireNonNull;
/**
* Bridge between Reactive Streams API and the Java 9 {@link java.util.concurrent.Flow} API.
@@ -31,16 +32,16 @@ private FlowAdapters() {
@SuppressWarnings("unchecked")
public static
+ * This does not necessarily have to correlate 1:1 with a {@code Subscription.request(elements)} call, but the sum
+ * of the elements requested by your {@code Subscriber} must eventually be at least the the sum of the elements
+ * triggered to be requested by all the invocations of this method.
+ *
+ * Additionally, subscribers are permitted to delay requesting elements until previous requests for elements have
+ * been fulfilled. For example, a subscriber that only requests one element at a time may fulfill the request made
+ * by this method by requesting one element {@code elements} times, waiting for each element to arrive before the
+ * next request is made.
+ *
+ * Before sending any element to the subscriber, the TCK must wait for the subscriber to request that element, and
+ * must be prepared for the subscriber to only request one element at a time, it is not enough for the TCK to
+ * simply invoke this method before sending elements.
+ *
+ * An invocation of {@link #signalCancel()} may be coalesced into any elements that have not yet been requested,
+ * such that only a cancel signal is emitted.
*/
void triggerRequest(long elements);
/**
- * Trigger {@code cancel()} on your {@link Subscriber}
+ * Trigger {@code cancel()} on your {@link Subscriber}.
+ *
+ * An invocation of this method may be coalesced into any outstanding requests, as requested by
+ * {@link #triggerRequest(long)}, such that only a cancel signal is emitted.
*/
void signalCancel();
}
diff --git a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java
index 8fdad9fc..f3e435cf 100644
--- a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java
+++ b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java
@@ -14,8 +14,8 @@
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
-import org.reactivestreams.tck.flow.support.SubscriberBufferOverflowException;
import org.reactivestreams.tck.flow.support.Optional;
+import org.reactivestreams.tck.flow.support.SubscriberBufferOverflowException;
import java.util.Collections;
import java.util.LinkedList;
@@ -24,7 +24,10 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
@@ -35,8 +38,10 @@ public class TestEnvironment {
private static final long DEFAULT_TIMEOUT_MILLIS = 100;
private static final String DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV = "DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS";
+ private static final String DEFAULT_POLL_TIMEOUT_MILLIS_ENV = "DEFAULT_POLL_TIMEOUT_MILLIS_ENV";
private final long defaultTimeoutMillis;
+ private final long defaultPollTimeoutMillis;
private final long defaultNoSignalsTimeoutMillis;
private final boolean printlnDebug;
@@ -49,14 +54,46 @@ public class TestEnvironment {
* run the tests.
* @param defaultTimeoutMillis default timeout to be used in all expect* methods
* @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
+ * @param defaultPollTimeoutMillis default amount of time to poll for events if {@code defaultTimeoutMillis} isn't
+ * preempted by an asynchronous event.
* @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output,
*/
- public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, boolean printlnDebug) {
+ public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, long defaultPollTimeoutMillis,
+ boolean printlnDebug) {
this.defaultTimeoutMillis = defaultTimeoutMillis;
+ this.defaultPollTimeoutMillis = defaultPollTimeoutMillis;
this.defaultNoSignalsTimeoutMillis = defaultNoSignalsTimeoutMillis;
this.printlnDebug = printlnDebug;
}
+ /**
+ * Tests must specify the timeout for expected outcome of asynchronous
+ * interactions. Longer timeout does not invalidate the correctness of
+ * the implementation, but can in some cases result in longer time to
+ * run the tests.
+ * @param defaultTimeoutMillis default timeout to be used in all expect* methods
+ * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
+ * @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output,
+ */
+ public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, boolean printlnDebug) {
+ this(defaultTimeoutMillis, defaultNoSignalsTimeoutMillis, defaultTimeoutMillis, printlnDebug);
+ }
+
+ /**
+ * Tests must specify the timeout for expected outcome of asynchronous
+ * interactions. Longer timeout does not invalidate the correctness of
+ * the implementation, but can in some cases result in longer time to
+ * run the tests.
+ *
+ * @param defaultTimeoutMillis default timeout to be used in all expect* methods
+ * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
+ * @param defaultPollTimeoutMillis default amount of time to poll for events if {@code defaultTimeoutMillis} isn't
+ * preempted by an asynchronous event.
+ */
+ public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, long defaultPollTimeoutMillis) {
+ this(defaultTimeoutMillis, defaultNoSignalsTimeoutMillis, defaultPollTimeoutMillis, false);
+ }
+
/**
* Tests must specify the timeout for expected outcome of asynchronous
* interactions. Longer timeout does not invalidate the correctness of
@@ -67,7 +104,7 @@ public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMi
* @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore
*/
public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis) {
- this(defaultTimeoutMillis, defaultNoSignalsTimeoutMillis, false);
+ this(defaultTimeoutMillis, defaultTimeoutMillis, defaultNoSignalsTimeoutMillis);
}
/**
@@ -79,7 +116,7 @@ public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMi
* @param defaultTimeoutMillis default timeout to be used in all expect* methods
*/
public TestEnvironment(long defaultTimeoutMillis) {
- this(defaultTimeoutMillis, defaultTimeoutMillis, false);
+ this(defaultTimeoutMillis, defaultTimeoutMillis, defaultTimeoutMillis);
}
/**
@@ -95,7 +132,7 @@ public TestEnvironment(long defaultTimeoutMillis) {
* often helpful to pinpoint simple race conditions etc.
*/
public TestEnvironment(boolean printlnDebug) {
- this(envDefaultTimeoutMillis(), envDefaultNoSignalsTimeoutMillis(), printlnDebug);
+ this(envDefaultTimeoutMillis(), envDefaultNoSignalsTimeoutMillis(), envDefaultPollTimeoutMillis(), printlnDebug);
}
/**
@@ -124,6 +161,14 @@ public long defaultNoSignalsTimeoutMillis() {
return defaultNoSignalsTimeoutMillis;
}
+ /**
+ * The default amount of time to poll for events if {@code defaultTimeoutMillis} isn't preempted by an asynchronous
+ * event.
+ */
+ public long defaultPollTimeoutMillis() {
+ return defaultPollTimeoutMillis;
+ }
+
/**
* Tries to parse the env variable {@code DEFAULT_TIMEOUT_MILLIS} as long and returns the value if present OR its default value.
*
@@ -154,6 +199,21 @@ public static long envDefaultNoSignalsTimeoutMillis() {
}
}
+ /**
+ * Tries to parse the env variable {@code DEFAULT_POLL_TIMEOUT_MILLIS_ENV} as long and returns the value if present OR its default value.
+ *
+ * @throws java.lang.IllegalArgumentException when unable to parse the env variable
+ */
+ public static long envDefaultPollTimeoutMillis() {
+ final String envMillis = System.getenv(DEFAULT_POLL_TIMEOUT_MILLIS_ENV);
+ if (envMillis == null) return envDefaultTimeoutMillis();
+ else try {
+ return Long.parseLong(envMillis);
+ } catch (NumberFormatException ex) {
+ throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", DEFAULT_POLL_TIMEOUT_MILLIS_ENV, envMillis), ex);
+ }
+ }
+
/**
* To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously.
* This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required.
@@ -191,7 +251,7 @@ public void flop(Throwable thr, String msg) {
asyncErrors.add(thr);
}
}
-
+
/**
* To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously.
* This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required.
@@ -275,7 +335,7 @@ public Throwable dropAsyncError() {
}
/**
- * Waits for {@link TestEnvironment#defaultTimeoutMillis()} and then verifies that no asynchronous errors
+ * Waits for {@link TestEnvironment#defaultNoSignalsTimeoutMillis()} and then verifies that no asynchronous errors
* were signalled pior to, or during that time (by calling {@code flop()}).
*/
public void verifyNoAsyncErrors() {
@@ -318,8 +378,13 @@ public void verifyNoAsyncErrorsNoDelay() {
/** If {@code TestEnvironment#printlnDebug} is true, print debug message to std out. */
public void debug(String msg) {
- if (printlnDebug)
+ if (debugEnabled()) {
System.out.printf("[TCK-DEBUG] %s%n", msg);
+ }
+ }
+
+ public final boolean debugEnabled() {
+ return printlnDebug;
}
/**
@@ -512,26 +577,32 @@ public void expectCompletion(long timeoutMillis, String errorMsg) throws Interru
}
public