diff --git a/tck-flow/README.md b/tck-flow/README.md index 9f0b2cc9..ad39c4ba 100644 --- a/tck-flow/README.md +++ b/tck-flow/README.md @@ -209,18 +209,24 @@ within the TCK which await for something to happen. The other timeout is `publis [Rule 3.13](https://github.com/reactive-streams/reactive-streams-jvm#3.13) which defines that `Subscriber` references MUST be dropped by the Publisher. -Note that the TCK differenciates between timeouts for "waiting for a signal" (``defaultTimeoutMillis``), -and "asserting no signals happen during a given amount of time" (``envDefaultNoSignalsTimeoutMillis``). -While the latter defaults to the prior, it may be useful to tweak them independently when running on continious -integration servers (for example, keeping the no-signals timeout significantly lower). - -In order to configure these timeouts (for example when running on a slow continious integtation machine), you can either: +Note that the TCK differentiates between timeouts for "waiting for a signal" +(`defaultTimeoutMillis`), and "asserting no signals happen during a given amount of time" +(`defaultNoSignalsTimeoutMillis`). While the latter defaults to the prior, it may be useful to tweak +them independently when running on continuous integration servers (for example, keeping the +no-signals timeout significantly lower). Another configuration option is the "poll timeout" which is +used whenever an operation has to poll for a `defaultTimeoutMillis` for a signal to appear (most +often errors), it can then poll and check using the `defaultPollTimeoutMillis`, for the expected +error, rather than blocking for the full default timeout. + +In order to configure these timeouts (for example when running on a slow continuous integration +machine), you can either: **Use env variables** to set these timeouts, in which case the you can do: ```bash export DEFAULT_TIMEOUT_MILLIS=100 export DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS=100 +export DEFAULT_POLL_TIMEOUT_MILLIS=20 export PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS=300 ``` @@ -231,10 +237,11 @@ public class RangePublisherTest extends FlowPublisherVerification { public static final long DEFAULT_TIMEOUT_MILLIS = 100L; public static final long DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS = DEFAULT_TIMEOUT_MILLIS; - public static final long PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS = 500L; + public static final long DEFAULT_POLL_TIMEOUT_MILLIS = 20L; + public static final long PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS = 300L; public RangePublisherTest() { - super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS); + super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS, DEFAULT_POLL_TIMEOUT_MILLIS), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS); } // ... diff --git a/tck/README.md b/tck/README.md index 9cd8ccec..a7add87a 100644 --- a/tck/README.md +++ b/tck/README.md @@ -209,18 +209,24 @@ within the TCK which await for something to happen. The other timeout is `publis [Rule 3.13](https://github.com/reactive-streams/reactive-streams-jvm#3.13) which defines that `Subscriber` references MUST be dropped by the Publisher. -Note that the TCK differenciates between timeouts for "waiting for a signal" (``defaultTimeoutMillis``), -and "asserting no signals happen during a given amount of time" (``envDefaultNoSignalsTimeoutMillis``). -While the latter defaults to the prior, it may be useful to tweak them independently when running on continious -integration servers (for example, keeping the no-signals timeout significantly lower). - -In order to configure these timeouts (for example when running on a slow continious integtation machine), you can either: +Note that the TCK differentiates between timeouts for "waiting for a signal" +(`defaultTimeoutMillis`), and "asserting no signals happen during a given amount of time" +(`defaultNoSignalsTimeoutMillis`). While the latter defaults to the prior, it may be useful to tweak +them independently when running on continuous integration servers (for example, keeping the +no-signals timeout significantly lower). Another configuration option is the "poll timeout" which is +used whenever an operation has to poll for a `defaultTimeoutMillis` for a signal to appear (most +often errors), it can then poll and check using the `defaultPollTimeoutMillis`, for the expected +error, rather than blocking for the full default timeout. + +In order to configure these timeouts (for example when running on a slow continuous integration +machine), you can either: **Use env variables** to set these timeouts, in which case the you can do: ```bash export DEFAULT_TIMEOUT_MILLIS=100 export DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS=100 +export DEFAULT_POLL_TIMEOUT_MILLIS=20 export PUBLISHER_REFERENCE_GC_TIMEOUT_MILLIS=300 ``` @@ -231,10 +237,11 @@ public class RangePublisherTest extends PublisherVerification { public static final long DEFAULT_TIMEOUT_MILLIS = 100L; public static final long DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS = DEFAULT_TIMEOUT_MILLIS; - public static final long PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS = 500L; + public static final long DEFAULT_POLL_TIMEOUT_MILLIS = 20L; + public static final long PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS = 300L; public RangePublisherTest() { - super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS); + super(new TestEnvironment(DEFAULT_TIMEOUT_MILLIS, DEFAULT_TIMEOUT_MILLIS, DEFAULT_POLL_TIMEOUT_MILLIS), PUBLISHER_REFERENCE_CLEANUP_TIMEOUT_MILLIS); } // ... diff --git a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java index c44d483b..f3e435cf 100644 --- a/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java +++ b/tck/src/main/java/org/reactivestreams/tck/TestEnvironment.java @@ -14,8 +14,8 @@ import org.reactivestreams.Publisher; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import org.reactivestreams.tck.flow.support.SubscriberBufferOverflowException; import org.reactivestreams.tck.flow.support.Optional; +import org.reactivestreams.tck.flow.support.SubscriberBufferOverflowException; import java.util.Collections; import java.util.LinkedList; @@ -24,9 +24,10 @@ import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.NANOSECONDS; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; @@ -37,8 +38,10 @@ public class TestEnvironment { private static final long DEFAULT_TIMEOUT_MILLIS = 100; private static final String DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS_ENV = "DEFAULT_NO_SIGNALS_TIMEOUT_MILLIS"; + private static final String DEFAULT_POLL_TIMEOUT_MILLIS_ENV = "DEFAULT_POLL_TIMEOUT_MILLIS_ENV"; private final long defaultTimeoutMillis; + private final long defaultPollTimeoutMillis; private final long defaultNoSignalsTimeoutMillis; private final boolean printlnDebug; @@ -51,14 +54,46 @@ public class TestEnvironment { * run the tests. * @param defaultTimeoutMillis default timeout to be used in all expect* methods * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore + * @param defaultPollTimeoutMillis default amount of time to poll for events if {@code defaultTimeoutMillis} isn't + * preempted by an asynchronous event. * @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output, */ - public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, boolean printlnDebug) { + public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, long defaultPollTimeoutMillis, + boolean printlnDebug) { this.defaultTimeoutMillis = defaultTimeoutMillis; + this.defaultPollTimeoutMillis = defaultPollTimeoutMillis; this.defaultNoSignalsTimeoutMillis = defaultNoSignalsTimeoutMillis; this.printlnDebug = printlnDebug; } + /** + * Tests must specify the timeout for expected outcome of asynchronous + * interactions. Longer timeout does not invalidate the correctness of + * the implementation, but can in some cases result in longer time to + * run the tests. + * @param defaultTimeoutMillis default timeout to be used in all expect* methods + * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore + * @param printlnDebug if true, signals such as OnNext / Request / OnComplete etc will be printed to standard output, + */ + public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, boolean printlnDebug) { + this(defaultTimeoutMillis, defaultNoSignalsTimeoutMillis, defaultTimeoutMillis, printlnDebug); + } + + /** + * Tests must specify the timeout for expected outcome of asynchronous + * interactions. Longer timeout does not invalidate the correctness of + * the implementation, but can in some cases result in longer time to + * run the tests. + * + * @param defaultTimeoutMillis default timeout to be used in all expect* methods + * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore + * @param defaultPollTimeoutMillis default amount of time to poll for events if {@code defaultTimeoutMillis} isn't + * preempted by an asynchronous event. + */ + public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis, long defaultPollTimeoutMillis) { + this(defaultTimeoutMillis, defaultNoSignalsTimeoutMillis, defaultPollTimeoutMillis, false); + } + /** * Tests must specify the timeout for expected outcome of asynchronous * interactions. Longer timeout does not invalidate the correctness of @@ -69,7 +104,7 @@ public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMi * @param defaultNoSignalsTimeoutMillis default timeout to be used when no further signals are expected anymore */ public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMillis) { - this(defaultTimeoutMillis, defaultNoSignalsTimeoutMillis, false); + this(defaultTimeoutMillis, defaultTimeoutMillis, defaultNoSignalsTimeoutMillis); } /** @@ -81,7 +116,7 @@ public TestEnvironment(long defaultTimeoutMillis, long defaultNoSignalsTimeoutMi * @param defaultTimeoutMillis default timeout to be used in all expect* methods */ public TestEnvironment(long defaultTimeoutMillis) { - this(defaultTimeoutMillis, defaultTimeoutMillis, false); + this(defaultTimeoutMillis, defaultTimeoutMillis, defaultTimeoutMillis); } /** @@ -97,7 +132,7 @@ public TestEnvironment(long defaultTimeoutMillis) { * often helpful to pinpoint simple race conditions etc. */ public TestEnvironment(boolean printlnDebug) { - this(envDefaultTimeoutMillis(), envDefaultNoSignalsTimeoutMillis(), printlnDebug); + this(envDefaultTimeoutMillis(), envDefaultNoSignalsTimeoutMillis(), envDefaultPollTimeoutMillis(), printlnDebug); } /** @@ -126,6 +161,14 @@ public long defaultNoSignalsTimeoutMillis() { return defaultNoSignalsTimeoutMillis; } + /** + * The default amount of time to poll for events if {@code defaultTimeoutMillis} isn't preempted by an asynchronous + * event. + */ + public long defaultPollTimeoutMillis() { + return defaultPollTimeoutMillis; + } + /** * Tries to parse the env variable {@code DEFAULT_TIMEOUT_MILLIS} as long and returns the value if present OR its default value. * @@ -156,6 +199,21 @@ public static long envDefaultNoSignalsTimeoutMillis() { } } + /** + * Tries to parse the env variable {@code DEFAULT_POLL_TIMEOUT_MILLIS_ENV} as long and returns the value if present OR its default value. + * + * @throws java.lang.IllegalArgumentException when unable to parse the env variable + */ + public static long envDefaultPollTimeoutMillis() { + final String envMillis = System.getenv(DEFAULT_POLL_TIMEOUT_MILLIS_ENV); + if (envMillis == null) return envDefaultTimeoutMillis(); + else try { + return Long.parseLong(envMillis); + } catch (NumberFormatException ex) { + throw new IllegalArgumentException(String.format("Unable to parse %s env value [%s] as long!", DEFAULT_POLL_TIMEOUT_MILLIS_ENV, envMillis), ex); + } + } + /** * To flop means to "fail asynchronously", either by onErroring or by failing some TCK check triggered asynchronously. * This method does *NOT* fail the test - it's up to inspections of the error to fail the test if required. @@ -277,7 +335,7 @@ public Throwable dropAsyncError() { } /** - * Waits for {@link TestEnvironment#defaultTimeoutMillis()} and then verifies that no asynchronous errors + * Waits for {@link TestEnvironment#defaultNoSignalsTimeoutMillis()} and then verifies that no asynchronous errors * were signalled pior to, or during that time (by calling {@code flop()}). */ public void verifyNoAsyncErrors() { @@ -519,26 +577,32 @@ public void expectCompletion(long timeoutMillis, String errorMsg) throws Interru } public void expectErrorWithMessage(Class expected, String requiredMessagePart) throws Exception { - expectErrorWithMessage(expected, requiredMessagePart, env.defaultTimeoutMillis()); + expectErrorWithMessage(expected, Collections.singletonList(requiredMessagePart), env.defaultTimeoutMillis(), env.defaultPollTimeoutMillis()); } public void expectErrorWithMessage(Class expected, List requiredMessagePartAlternatives) throws Exception { - expectErrorWithMessage(expected, requiredMessagePartAlternatives, env.defaultTimeoutMillis()); + expectErrorWithMessage(expected, requiredMessagePartAlternatives, env.defaultTimeoutMillis(), env.defaultPollTimeoutMillis()); } @SuppressWarnings("ThrowableResultOfMethodCallIgnored") public void expectErrorWithMessage(Class expected, String requiredMessagePart, long timeoutMillis) throws Exception { expectErrorWithMessage(expected, Collections.singletonList(requiredMessagePart), timeoutMillis); } + public void expectErrorWithMessage(Class expected, List requiredMessagePartAlternatives, long timeoutMillis) throws Exception { - final E err = expectError(expected, timeoutMillis); + expectErrorWithMessage(expected, requiredMessagePartAlternatives, timeoutMillis, timeoutMillis); + } + + public void expectErrorWithMessage(Class expected, List requiredMessagePartAlternatives, + long totalTimeoutMillis, long pollTimeoutMillis) throws Exception { + final E err = expectError(expected, totalTimeoutMillis, pollTimeoutMillis); final String message = err.getMessage(); - + boolean contains = false; - for (String requiredMessagePart : requiredMessagePartAlternatives) + for (String requiredMessagePart : requiredMessagePartAlternatives) if (message.contains(requiredMessagePart)) contains = true; // not short-circuting loop, it is expected to assertTrue(contains, - String.format("Got expected exception [%s] but missing message part [%s], was: %s", - err.getClass(), "anyOf: " + requiredMessagePartAlternatives, err.getMessage())); + String.format("Got expected exception [%s] but missing message part [%s], was: %s", + err.getClass(), "anyOf: " + requiredMessagePartAlternatives, err.getMessage())); } public E expectError(Class expected) throws Exception { @@ -546,7 +610,7 @@ public E expectError(Class expected) throws Exception { } public E expectError(Class expected, long timeoutMillis) throws Exception { - return expectError(expected, timeoutMillis, String.format("Expected onError(%s)", expected.getName())); + return expectError(expected, timeoutMillis, env.defaultPollTimeoutMillis()); } public E expectError(Class expected, String errorMsg) throws Exception { @@ -554,7 +618,16 @@ public E expectError(Class expected, String errorMsg) t } public E expectError(Class expected, long timeoutMillis, String errorMsg) throws Exception { - return received.expectError(expected, timeoutMillis, errorMsg); + return expectError(expected, timeoutMillis, env.defaultPollTimeoutMillis(), errorMsg); + } + + public E expectError(Class expected, long totalTimeoutMillis, long pollTimeoutMillis) throws Exception { + return expectError(expected, totalTimeoutMillis, pollTimeoutMillis, String.format("Expected onError(%s)", expected.getName())); + } + + public E expectError(Class expected, long totalTimeoutMillis, long pollTimeoutMillis, + String errorMsg) throws Exception { + return received.expectError(expected, totalTimeoutMillis, pollTimeoutMillis, errorMsg); } public void expectNone() throws InterruptedException { @@ -1025,22 +1098,44 @@ public void expectCompletion(long timeoutMillis, String errorMsg) throws Interru } // else, ok } - @SuppressWarnings("unchecked") + /** + * @deprecated Deprecated in favor of {@link #expectError(Class, long, long, String)}. + */ + @Deprecated public E expectError(Class clazz, long timeoutMillis, String errorMsg) throws Exception { - Thread.sleep(timeoutMillis); - - if (env.asyncErrors.isEmpty()) { - return env.flopAndFail(String.format("%s within %d ms", errorMsg, timeoutMillis)); - } else { - // ok, there was an expected error - Throwable thrown = env.asyncErrors.remove(0); + return expectError(clazz, timeoutMillis, timeoutMillis, errorMsg); + } - if (clazz.isInstance(thrown)) { - return (E) thrown; + @SuppressWarnings("unchecked") + final E expectError(Class clazz, final long totalTimeoutMillis, + long pollTimeoutMillis, + String errorMsg) throws Exception { + long totalTimeoutRemainingNs = MILLISECONDS.toNanos(totalTimeoutMillis); + long timeStampANs = System.nanoTime(); + long timeStampBNs; + + for (;;) { + Thread.sleep(Math.min(pollTimeoutMillis, NANOSECONDS.toMillis(totalTimeoutRemainingNs))); + + if (env.asyncErrors.isEmpty()) { + timeStampBNs = System.nanoTime(); + totalTimeoutRemainingNs =- timeStampBNs - timeStampANs; + timeStampANs = timeStampBNs; + + if (totalTimeoutRemainingNs <= 0) { + return env.flopAndFail(String.format("%s within %d ms", errorMsg, totalTimeoutMillis)); + } } else { + // ok, there was an expected error + Throwable thrown = env.asyncErrors.remove(0); + + if (clazz.isInstance(thrown)) { + return (E) thrown; + } else { - return env.flopAndFail(String.format("%s within %d ms; Got %s but expected %s", - errorMsg, timeoutMillis, thrown.getClass().getCanonicalName(), clazz.getCanonicalName())); + return env.flopAndFail(String.format("%s within %d ms; Got %s but expected %s", + errorMsg, totalTimeoutMillis, thrown.getClass().getCanonicalName(), clazz.getCanonicalName())); + } } } }