diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 3246993537..c2e18393d5 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -11329,6 +11329,8 @@ public final Flowable scan(final R initialValue, BiFunction o.scan(new ArrayList<>(), (list, item) -> list.add(item))) * ); * + *

+ * Unlike 1.x, this operator doesn't emit the seed value unless the upstream signals an event. *

*
Backpressure:
*
The operator honors downstream backpressure and expects the source {@code Publisher} to honor backpressure as well. diff --git a/src/main/java/io/reactivex/internal/observers/ForEachWhileObserver.java b/src/main/java/io/reactivex/internal/observers/ForEachWhileObserver.java index 8abda8fbf2..9736361e7f 100644 --- a/src/main/java/io/reactivex/internal/observers/ForEachWhileObserver.java +++ b/src/main/java/io/reactivex/internal/observers/ForEachWhileObserver.java @@ -17,7 +17,7 @@ import io.reactivex.Observer; import io.reactivex.disposables.Disposable; -import io.reactivex.exceptions.Exceptions; +import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.plugins.RxJavaPlugins; @@ -82,7 +82,7 @@ public void onError(Throwable t) { onError.accept(t); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); - RxJavaPlugins.onError(ex); + RxJavaPlugins.onError(new CompositeException(t, ex)); } } diff --git a/src/main/java/io/reactivex/internal/observers/LambdaObserver.java b/src/main/java/io/reactivex/internal/observers/LambdaObserver.java index e84c540011..95a66ed7e8 100644 --- a/src/main/java/io/reactivex/internal/observers/LambdaObserver.java +++ b/src/main/java/io/reactivex/internal/observers/LambdaObserver.java @@ -17,9 +17,9 @@ import io.reactivex.Observer; import io.reactivex.disposables.Disposable; -import io.reactivex.exceptions.Exceptions; +import io.reactivex.exceptions.*; import io.reactivex.functions.*; -import io.reactivex.internal.disposables.*; +import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.plugins.RxJavaPlugins; public final class LambdaObserver extends AtomicReference implements Observer, Disposable { @@ -47,42 +47,46 @@ public void onSubscribe(Disposable s) { onSubscribe.accept(this); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); - s.dispose(); - RxJavaPlugins.onError(ex); + onError(ex); } } } @Override public void onNext(T t) { - try { - onNext.accept(t); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - onError(e); + if (!isDisposed()) { + try { + onNext.accept(t); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + onError(e); + } } } @Override public void onError(Throwable t) { - dispose(); - try { - onError.accept(t); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - RxJavaPlugins.onError(e); - RxJavaPlugins.onError(t); + if (!isDisposed()) { + dispose(); + try { + onError.accept(t); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + RxJavaPlugins.onError(new CompositeException(t, e)); + } } } @Override public void onComplete() { - dispose(); - try { - onComplete.run(); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - RxJavaPlugins.onError(e); + if (!isDisposed()) { + dispose(); + try { + onComplete.run(); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + RxJavaPlugins.onError(e); + } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEager.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEager.java index d133c23428..ab91853b6e 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEager.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEager.java @@ -72,7 +72,7 @@ static final class ConcatMapEagerDelayErrorSubscriber final ErrorMode errorMode; - final AtomicReference error; + final AtomicThrowable errors; final AtomicLong requested; @@ -95,7 +95,7 @@ static final class ConcatMapEagerDelayErrorSubscriber this.prefetch = prefetch; this.errorMode = errorMode; this.subscribers = new SpscLinkedArrayQueue>(Math.min(prefetch, maxConcurrency)); - this.error = new AtomicReference(); + this.errors = new AtomicThrowable(); this.requested = new AtomicLong(); } @@ -146,7 +146,7 @@ public void onNext(T t) { @Override public void onError(Throwable t) { - if (ExceptionHelper.addThrowable(error, t)) { + if (errors.addThrowable(t)) { done = true; drain(); } else { @@ -207,7 +207,7 @@ public void innerNext(InnerQueuedSubscriber inner, R value) { @Override public void innerError(InnerQueuedSubscriber inner, Throwable e) { - if (ExceptionHelper.addThrowable(this.error, e)) { + if (errors.addThrowable(e)) { inner.setDone(); if (errorMode != ErrorMode.END) { s.cancel(); @@ -242,11 +242,11 @@ public void drain() { if (inner == null) { if (em != ErrorMode.END) { - Throwable ex = error.get(); + Throwable ex = errors.get(); if (ex != null) { cancelAll(); - a.onError(ex); + a.onError(errors.terminate()); return; } } @@ -256,7 +256,7 @@ public void drain() { inner = subscribers.poll(); if (outerDone && inner == null) { - Throwable ex = error.get(); + Throwable ex = errors.terminate(); if (ex != null) { a.onError(ex); } else { @@ -282,13 +282,13 @@ public void drain() { } if (em == ErrorMode.IMMEDIATE) { - Throwable ex = error.get(); + Throwable ex = errors.get(); if (ex != null) { current = null; inner.cancel(); cancelAll(); - a.onError(ex); + a.onError(errors.terminate()); return; } } @@ -336,13 +336,13 @@ public void drain() { } if (em == ErrorMode.IMMEDIATE) { - Throwable ex = error.get(); + Throwable ex = errors.get(); if (ex != null) { current = null; inner.cancel(); cancelAll(); - a.onError(ex); + a.onError(errors.terminate()); return; } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCreate.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCreate.java index 3d791d8c3c..ba458fa6db 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableCreate.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableCreate.java @@ -22,7 +22,7 @@ import io.reactivex.exceptions.*; import io.reactivex.functions.Cancellable; import io.reactivex.internal.disposables.*; -import io.reactivex.internal.fuseable.SimpleQueue; +import io.reactivex.internal.fuseable.SimplePlainQueue; import io.reactivex.internal.queue.SpscLinkedArrayQueue; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.internal.util.*; @@ -91,7 +91,7 @@ static final class SerializedEmitter final AtomicThrowable error; - final SimpleQueue queue; + final SimplePlainQueue queue; volatile boolean done; @@ -116,7 +116,7 @@ public void onNext(T t) { return; } } else { - SimpleQueue q = queue; + SimplePlainQueue q = queue; synchronized (q) { q.offer(t); } @@ -161,7 +161,7 @@ void drain() { void drainLoop() { BaseEmitter e = emitter; - SimpleQueue q = queue; + SimplePlainQueue q = queue; AtomicThrowable error = this.error; int missed = 1; for (;;) { @@ -179,15 +179,8 @@ void drainLoop() { } boolean d = done; - T v; - - try { - v = q.poll(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - // should never happen - v = null; - } + + T v = q.poll(); boolean empty = v == null; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounce.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounce.java index 9dc68fc769..0472c3ebd3 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounce.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounce.java @@ -142,9 +142,7 @@ void emit(long idx, T value) { long r = get(); if (r != 0L) { actual.onNext(value); - if (r != Long.MAX_VALUE) { - decrementAndGet(); - } + BackpressureHelper.produced(this, 1); } else { cancel(); actual.onError(new MissingBackpressureException("Could not deliver value due to lack of requests")); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounceTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounceTimed.java index 11d0b9cf3a..5bc03e9b32 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounceTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDebounceTimed.java @@ -22,7 +22,7 @@ import io.reactivex.Scheduler.Worker; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.MissingBackpressureException; -import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.disposables.*; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.internal.util.BackpressureHelper; import io.reactivex.plugins.RxJavaPlugins; @@ -58,7 +58,7 @@ static final class DebounceTimedSubscriber extends AtomicLong Subscription s; - final AtomicReference timer = new AtomicReference(); + final SequentialDisposable timer = new SequentialDisposable(); volatile long index; @@ -94,13 +94,11 @@ public void onNext(T t) { } DebounceEmitter de = new DebounceEmitter(t, idx, this); - if (!timer.compareAndSet(d, de)) { - return; - } + if (timer.replace(de)) { + d = worker.schedule(de, timeout, unit); - d = worker.schedule(de, timeout, unit); - - de.setResource(d); + de.setResource(d); + } } @Override @@ -153,9 +151,7 @@ void emit(long idx, T t, DebounceEmitter emitter) { long r = get(); if (r != 0L) { actual.onNext(t); - if (r != Long.MAX_VALUE) { - decrementAndGet(); - } + BackpressureHelper.produced(this, 1); emitter.dispose(); } else { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableCompletable.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableCompletable.java index 231f333e56..49143e5bab 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableCompletable.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableCompletable.java @@ -13,7 +13,7 @@ package io.reactivex.internal.operators.flowable; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.*; import org.reactivestreams.*; @@ -24,7 +24,6 @@ import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.fuseable.FuseToFlowable; -import io.reactivex.internal.observers.BasicIntQueueDisposable; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.internal.util.AtomicThrowable; import io.reactivex.plugins.RxJavaPlugins; @@ -62,8 +61,8 @@ public Flowable fuseToFlowable() { return RxJavaPlugins.onAssembly(new FlowableFlatMapCompletable(source, mapper, delayErrors, maxConcurrency)); } - static final class FlatMapCompletableMainSubscriber extends BasicIntQueueDisposable - implements Subscriber { + static final class FlatMapCompletableMainSubscriber extends AtomicInteger + implements Subscriber, Disposable { private static final long serialVersionUID = 8443155186132538303L; final CompletableObserver actual; @@ -183,26 +182,6 @@ public boolean isDisposed() { return set.isDisposed(); } - @Override - public T poll() throws Exception { - return null; // always empty - } - - @Override - public boolean isEmpty() { - return true; // always empty - } - - @Override - public void clear() { - // nothing to clear - } - - @Override - public int requestFusion(int mode) { - return mode & ASYNC; - } - void innerComplete(InnerObserver inner) { set.delete(inner); onComplete(); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java index 909ee5943b..87104cb9f3 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupBy.java @@ -167,15 +167,14 @@ public void onError(Throwable t) { @Override public void onComplete() { - if (done) { - return; - } - for (GroupedUnicast g : groups.values()) { - g.onComplete(); + if (!done) { + for (GroupedUnicast g : groups.values()) { + g.onComplete(); + } + groups.clear(); + done = true; + drain(); } - groups.clear(); - done = true; - drain(); } @Override @@ -435,11 +434,10 @@ static final class State extends BasicIntQueueSubscription implements P @Override public void request(long n) { - if (!SubscriptionHelper.validate(n)) { - return; + if (SubscriptionHelper.validate(n)) { + BackpressureHelper.add(requested, n); + drain(); } - BackpressureHelper.add(requested, n); - drain(); } @Override @@ -461,12 +459,7 @@ public void subscribe(Subscriber s) { } public void onNext(T t) { - if (t == null) { - error = new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."); - done = true; - } else { - queue.offer(t); - } + queue.offer(t); drain(); } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBuffer.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBuffer.java index c9a5acb409..9e95e57367 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBuffer.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBuffer.java @@ -13,13 +13,13 @@ package io.reactivex.internal.operators.flowable; -import java.util.concurrent.atomic.*; +import java.util.concurrent.atomic.AtomicLong; import org.reactivestreams.*; import io.reactivex.exceptions.*; import io.reactivex.functions.Action; -import io.reactivex.internal.fuseable.SimpleQueue; +import io.reactivex.internal.fuseable.*; import io.reactivex.internal.queue.*; import io.reactivex.internal.subscriptions.*; import io.reactivex.internal.util.BackpressureHelper; @@ -49,7 +49,7 @@ static final class BackpressureBufferSubscriber extends BasicIntQueueSubscrip private static final long serialVersionUID = -2514538129242366402L; final Subscriber actual; - final SimpleQueue queue; + final SimplePlainQueue queue; final boolean delayError; final Action onOverflow; @@ -70,7 +70,7 @@ static final class BackpressureBufferSubscriber extends BasicIntQueueSubscrip this.onOverflow = onOverflow; this.delayError = delayError; - SimpleQueue q; + SimplePlainQueue q; if (unbounded) { q = new SpscLinkedArrayQueue(bufferSize); @@ -157,7 +157,7 @@ public void cancel() { void drain() { if (getAndIncrement() == 0) { int missed = 1; - final SimpleQueue q = queue; + final SimplePlainQueue q = queue; final Subscriber a = actual; for (;;) { @@ -171,16 +171,7 @@ void drain() { while (e != r) { boolean d = done; - T v; - - try { - v = q.poll(); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - s.cancel(); - a.onError(ex); - return; - } + T v = q.poll(); boolean empty = v == null; if (checkTerminated(d, empty, a)) { diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableScanSeed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableScanSeed.java index f382dc199b..6ddbb00cdb 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableScanSeed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableScanSeed.java @@ -12,17 +12,15 @@ */ package io.reactivex.internal.operators.flowable; -import io.reactivex.internal.functions.ObjectHelper; import java.util.concurrent.Callable; import org.reactivestreams.*; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.BiFunction; -import io.reactivex.internal.queue.SpscArrayQueue; -import io.reactivex.internal.subscribers.QueueDrainSubscriber; -import io.reactivex.internal.subscriptions.*; -import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.internal.functions.ObjectHelper; +import io.reactivex.internal.subscribers.SinglePostCompleteSubscriber; +import io.reactivex.internal.subscriptions.EmptySubscription; public final class FlowableScanSeed extends AbstractFlowableWithUpstream { final BiFunction accumulator; @@ -49,27 +47,15 @@ protected void subscribeActual(Subscriber s) { source.subscribe(new ScanSeedSubscriber(s, accumulator, r)); } - // FIXME update to a fresh Rsc algorithm - static final class ScanSeedSubscriber extends QueueDrainSubscriber implements Subscription { - final BiFunction accumulator; - - R value; + static final class ScanSeedSubscriber extends SinglePostCompleteSubscriber { + private static final long serialVersionUID = -1776795561228106469L; - Subscription s; + final BiFunction accumulator; ScanSeedSubscriber(Subscriber actual, BiFunction accumulator, R value) { - super(actual, new SpscArrayQueue(2)); + super(actual); this.accumulator = accumulator; this.value = value; - queue.offer(value); - } - - @Override - public void onSubscribe(Subscription s) { - if (SubscriptionHelper.validate(this.s, s)) { - this.s = s; - actual.onSubscribe(this); - } } @Override @@ -88,54 +74,19 @@ public void onNext(T t) { } value = u; - - if (!queue.offer(u)) { - s.cancel(); - onError(new IllegalStateException("Queue if full?!")); - return; - } - drain(false); + produced++; + actual.onNext(v); } @Override public void onError(Throwable t) { - if (done) { - RxJavaPlugins.onError(t); - return; - } - error = t; - done = true; - drain(false); + value = null; + actual.onError(t); } @Override public void onComplete() { - if (done) { - return; - } - done = true; - drain(false); - } - - @Override - public void request(long n) { - requested(n); - s.request(n); - drain(false); - } - - @Override - public void cancel() { - if (!cancelled) { - cancelled = true; - s.cancel(); - } - } - - @Override - public boolean accept(Subscriber a, R v) { - a.onNext(v); - return true; + complete(value); } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSkipUntil.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSkipUntil.java index 2fc06c3bdc..1b941d0734 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSkipUntil.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSkipUntil.java @@ -13,12 +13,13 @@ package io.reactivex.internal.operators.flowable; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.*; import org.reactivestreams.*; -import io.reactivex.internal.subscriptions.*; -import io.reactivex.subscribers.SerializedSubscriber; +import io.reactivex.internal.fuseable.ConditionalSubscriber; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.*; public final class FlowableSkipUntil extends AbstractFlowableWithUpstream { final Publisher other; @@ -29,112 +30,110 @@ public FlowableSkipUntil(Publisher source, Publisher other) { @Override protected void subscribeActual(Subscriber child) { - final SerializedSubscriber serial = new SerializedSubscriber(child); + SkipUntilMainSubscriber parent = new SkipUntilMainSubscriber(child); + child.onSubscribe(parent); - final ArrayCompositeSubscription frc = new ArrayCompositeSubscription(2); + other.subscribe(parent.other); - final SkipUntilSubscriber sus = new SkipUntilSubscriber(serial, frc); - - other.subscribe(new Subscriber() { - Subscription s; - @Override - public void onSubscribe(Subscription s) { - if (SubscriptionHelper.validate(this.s, s)) { - this.s = s; - if (frc.setResource(1, s)) { - s.request(Long.MAX_VALUE); - } - } - } - - @Override - public void onNext(U t) { - s.cancel(); - sus.notSkipping = true; - } - - @Override - public void onError(Throwable t) { - frc.dispose(); - // in case the other emits an onError before the main even sets a subscription - if (sus.compareAndSet(false, true)) { - EmptySubscription.error(t, serial); - } else { - serial.onError(t); - } - } + source.subscribe(parent); + } - @Override - public void onComplete() { - sus.notSkipping = true; - } - }); + static final class SkipUntilMainSubscriber extends AtomicInteger + implements ConditionalSubscriber, Subscription { + private static final long serialVersionUID = -6270983465606289181L; - source.subscribe(sus); - } + final Subscriber actual; + final AtomicReference s; - static final class SkipUntilSubscriber extends AtomicBoolean implements Subscriber, Subscription { + final AtomicLong requested; - private static final long serialVersionUID = -1113667257122396604L; - final Subscriber actual; - final ArrayCompositeSubscription frc; + final OtherSubscriber other; - Subscription s; + final AtomicThrowable error; - volatile boolean notSkipping; - boolean notSkippingLocal; + volatile boolean gate; - SkipUntilSubscriber(Subscriber actual, ArrayCompositeSubscription frc) { + SkipUntilMainSubscriber(Subscriber actual) { this.actual = actual; - this.frc = frc; + this.s = new AtomicReference(); + this.requested = new AtomicLong(); + this.other = new OtherSubscriber(); + this.error = new AtomicThrowable(); } @Override public void onSubscribe(Subscription s) { - if (SubscriptionHelper.validate(this.s, s)) { - this.s = s; - if (frc.setResource(0, s)) { - if (compareAndSet(false, true)) { - actual.onSubscribe(this); - } - } - } + SubscriptionHelper.deferredSetOnce(this.s, requested, s); } @Override public void onNext(T t) { - if (notSkippingLocal) { - actual.onNext(t); - } else - if (notSkipping) { - notSkippingLocal = true; - actual.onNext(t); - } else { - s.request(1); + if (!tryOnNext(t)) { + s.get().request(1); } } + @Override + public boolean tryOnNext(T t) { + if (gate) { + HalfSerializer.onNext(actual, t, this, error); + return true; + } + return false; + } + @Override public void onError(Throwable t) { - frc.dispose(); - actual.onError(t); + SubscriptionHelper.cancel(other); + HalfSerializer.onError(actual, t, SkipUntilMainSubscriber.this, error); } @Override public void onComplete() { - frc.dispose(); - actual.onComplete(); + SubscriptionHelper.cancel(other); + HalfSerializer.onComplete(actual, this, error); } @Override public void request(long n) { - s.request(n); + SubscriptionHelper.deferredRequest(s, requested, n); } @Override public void cancel() { - frc.dispose(); + SubscriptionHelper.cancel(s); + SubscriptionHelper.cancel(other); + } + + final class OtherSubscriber extends AtomicReference + implements Subscriber { + + private static final long serialVersionUID = -5592042965931999169L; + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.setOnce(this, s)) { + s.request(Long.MAX_VALUE); + } + } + + @Override + public void onNext(Object t) { + gate = true; + get().cancel(); + } + + @Override + public void onError(Throwable t) { + SubscriptionHelper.cancel(s); + HalfSerializer.onError(actual, t, SkipUntilMainSubscriber.this, error); + } + + @Override + public void onComplete() { + gate = true; + } } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java index a8d0027241..e0738b951b 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTimeoutTimed.java @@ -131,13 +131,10 @@ public void run() { done = true; s.cancel(); DisposableHelper.dispose(timer); - worker.dispose(); - if (other == null) { - actual.onError(new TimeoutException()); - } else { - subscribeNext(); - } + subscribeNext(); + + worker.dispose(); } } }, timeout, unit); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java index a4f027352f..6c380e8494 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableWindowTimed.java @@ -15,15 +15,14 @@ import java.util.*; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import org.reactivestreams.*; import io.reactivex.*; import io.reactivex.Scheduler.Worker; import io.reactivex.disposables.Disposable; -import io.reactivex.exceptions.*; -import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.exceptions.MissingBackpressureException; +import io.reactivex.internal.disposables.*; import io.reactivex.internal.fuseable.SimplePlainQueue; import io.reactivex.internal.queue.MpscLinkedQueue; import io.reactivex.internal.subscribers.QueueDrainSubscriber; @@ -87,7 +86,7 @@ static final class WindowExactUnboundedSubscriber UnicastProcessor window; - final AtomicReference timer = new AtomicReference(); + final SequentialDisposable timer = new SequentialDisposable(); static final Object NEXT = new Object(); @@ -104,37 +103,32 @@ static final class WindowExactUnboundedSubscriber @Override public void onSubscribe(Subscription s) { - if (!SubscriptionHelper.validate(this.s, s)) { - return; - } - this.s = s; + if (SubscriptionHelper.validate(this.s, s)) { + this.s = s; - window = UnicastProcessor.create(bufferSize); + window = UnicastProcessor.create(bufferSize); - Subscriber> a = actual; - a.onSubscribe(this); + Subscriber> a = actual; + a.onSubscribe(this); - long r = requested(); - if (r != 0L) { - a.onNext(window); - if (r != Long.MAX_VALUE) { - produced(1); - } - } else { - cancelled = true; - s.cancel(); - a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests.")); - return; - } - - if (!cancelled) { - Disposable d = scheduler.schedulePeriodicallyDirect(this, timespan, timespan, unit); - if (!timer.compareAndSet(null, d)) { - d.dispose(); + long r = requested(); + if (r != 0L) { + a.onNext(window); + if (r != Long.MAX_VALUE) { + produced(1); + } + } else { + cancelled = true; + s.cancel(); + a.onError(new MissingBackpressureException("Could not deliver first window due to lack of requests.")); return; } - s.request(Long.MAX_VALUE); + if (!cancelled) { + if (timer.replace(scheduler.schedulePeriodicallyDirect(this, timespan, timespan, unit))) { + s.request(Long.MAX_VALUE); + } + } } } @@ -300,7 +294,7 @@ static final class WindowExactBoundedSubscriber volatile boolean terminated; - final AtomicReference timer = new AtomicReference(); + final SequentialDisposable timer = new SequentialDisposable(); WindowExactBoundedSubscriber( Subscriber> actual, @@ -317,52 +311,49 @@ static final class WindowExactBoundedSubscriber @Override public void onSubscribe(Subscription s) { - if (!SubscriptionHelper.validate(this.s, s)) { - return; - } + if (SubscriptionHelper.validate(this.s, s)) { - this.s = s; + this.s = s; - Subscriber> a = actual; + Subscriber> a = actual; - a.onSubscribe(this); + a.onSubscribe(this); - if (cancelled) { - return; - } + if (cancelled) { + return; + } - UnicastProcessor w = UnicastProcessor.create(bufferSize); - window = w; + UnicastProcessor w = UnicastProcessor.create(bufferSize); + window = w; - long r = requested(); - if (r != 0L) { - a.onNext(w); - if (r != Long.MAX_VALUE) { - produced(1); + long r = requested(); + if (r != 0L) { + a.onNext(w); + if (r != Long.MAX_VALUE) { + produced(1); + } + } else { + cancelled = true; + s.cancel(); + a.onError(new MissingBackpressureException("Could not deliver initial window due to lack of requests.")); + return; } - } else { - cancelled = true; - s.cancel(); - a.onError(new MissingBackpressureException("Could not deliver initial window due to lack of requests.")); - return; - } - Disposable d; - ConsumerIndexHolder consumerIndexHolder = new ConsumerIndexHolder(producerIndex, this); - if (restartTimerOnMaxSize) { - Scheduler.Worker sw = scheduler.createWorker(); - worker = sw; - sw.schedulePeriodically(consumerIndexHolder, timespan, timespan, unit); - d = sw; - } else { - d = scheduler.schedulePeriodicallyDirect(consumerIndexHolder, timespan, timespan, unit); - } + Disposable d; + ConsumerIndexHolder consumerIndexHolder = new ConsumerIndexHolder(producerIndex, this); + if (restartTimerOnMaxSize) { + Scheduler.Worker sw = scheduler.createWorker(); + worker = sw; + sw.schedulePeriodically(consumerIndexHolder, timespan, timespan, unit); + d = sw; + } else { + d = scheduler.schedulePeriodicallyDirect(consumerIndexHolder, timespan, timespan, unit); + } - if (!timer.compareAndSet(null, d)) { - d.dispose(); - return; + if (timer.replace(d)) { + s.request(Long.MAX_VALUE); + } } - s.request(Long.MAX_VALUE); } @Override @@ -629,41 +620,40 @@ static final class WindowSkipSubscriber @Override public void onSubscribe(Subscription s) { - if (!SubscriptionHelper.validate(this.s, s)) { - return; - } + if (SubscriptionHelper.validate(this.s, s)) { - this.s = s; + this.s = s; - actual.onSubscribe(this); + actual.onSubscribe(this); - if (cancelled) { - return; - } + if (cancelled) { + return; + } - long r = requested(); - if (r != 0L) { - final UnicastProcessor w = UnicastProcessor.create(bufferSize); - windows.add(w); + long r = requested(); + if (r != 0L) { + final UnicastProcessor w = UnicastProcessor.create(bufferSize); + windows.add(w); - actual.onNext(w); - if (r != Long.MAX_VALUE) { - produced(1); - } - worker.schedule(new Runnable() { - @Override - public void run() { - complete(w); + actual.onNext(w); + if (r != Long.MAX_VALUE) { + produced(1); } - }, timespan, unit); + worker.schedule(new Runnable() { + @Override + public void run() { + complete(w); + } + }, timespan, unit); - worker.schedulePeriodically(this, timeskip, timeskip, unit); + worker.schedulePeriodically(this, timeskip, timeskip, unit); - s.request(Long.MAX_VALUE); + s.request(Long.MAX_VALUE); - } else { - s.cancel(); - actual.onError(new MissingBackpressureException("Could not emit the first window due to lack of requests")); + } else { + s.cancel(); + actual.onError(new MissingBackpressureException("Could not emit the first window due to lack of requests")); + } } } diff --git a/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java b/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java index 913a2643a6..d771ed03f6 100644 --- a/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java @@ -74,8 +74,7 @@ static final class CachedWorkerPool implements Runnable { Future task = null; if (unit != null) { evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY); - task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS - ); + task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS); } evictorService = evictor; evictorTask = task; diff --git a/src/main/java/io/reactivex/internal/subscribers/BasicFuseableConditionalSubscriber.java b/src/main/java/io/reactivex/internal/subscribers/BasicFuseableConditionalSubscriber.java index 9ae63a9765..2e4d49ffcd 100644 --- a/src/main/java/io/reactivex/internal/subscribers/BasicFuseableConditionalSubscriber.java +++ b/src/main/java/io/reactivex/internal/subscribers/BasicFuseableConditionalSubscriber.java @@ -90,29 +90,6 @@ protected void afterDownstream() { // Convenience and state-aware methods // ----------------------------------- - /** - * Emits the value to the actual subscriber if {@link #done} is false. - * @param value the value to signal - */ - protected final void next(R value) { - if (done) { - return; - } - actual.onNext(value); - } - - /** - * Tries to emit the value to the actual subscriber if {@link #done} is false - * and returns the response from the {@link ConditionalSubscriber#tryOnNext(Object)} - * call. - * @param value the value to signal - * @return the response from the actual subscriber: true indicates accepted value, - * false indicates dropped value - */ - protected final boolean tryNext(R value) { - return !done && actual.tryOnNext(value); - } - @Override public void onError(Throwable t) { if (done) { @@ -142,27 +119,6 @@ public void onComplete() { actual.onComplete(); } - /** - * Calls the upstream's QueueSubscription.requestFusion with the mode and - * saves the established mode in {@link #sourceMode}. - *

- * If the upstream doesn't support fusion ({@link #qs} is null), the method - * returns {@link QueueSubscription#NONE}. - * @param mode the fusion mode requested - * @return the established fusion mode - */ - protected final int transitiveFusion(int mode) { - QueueSubscription qs = this.qs; - if (qs != null) { - int m = qs.requestFusion(mode); - if (m != NONE) { - sourceMode = m; - } - return m; - } - return NONE; - } - /** * Calls the upstream's QueueSubscription.requestFusion with the mode and * saves the established mode in {@link #sourceMode} if that mode doesn't diff --git a/src/main/java/io/reactivex/internal/subscribers/InnerQueuedSubscriber.java b/src/main/java/io/reactivex/internal/subscribers/InnerQueuedSubscriber.java index 70aeadce3f..0f63c7285a 100644 --- a/src/main/java/io/reactivex/internal/subscribers/InnerQueuedSubscriber.java +++ b/src/main/java/io/reactivex/internal/subscribers/InnerQueuedSubscriber.java @@ -143,8 +143,4 @@ public void setDone() { public SimpleQueue queue() { return queue; } - - public int fusionMode() { - return fusionMode; - } } diff --git a/src/test/java/io/reactivex/internal/observers/LambdaObserverTest.java b/src/test/java/io/reactivex/internal/observers/LambdaObserverTest.java new file mode 100644 index 0000000000..b5c209fc68 --- /dev/null +++ b/src/test/java/io/reactivex/internal/observers/LambdaObserverTest.java @@ -0,0 +1,283 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.observers; + +import static org.junit.Assert.*; + +import java.util.*; + +import org.junit.Test; + +import io.reactivex.Observable; +import io.reactivex.Observer; +import io.reactivex.TestHelper; +import io.reactivex.disposables.*; +import io.reactivex.exceptions.*; +import io.reactivex.functions.*; +import io.reactivex.plugins.RxJavaPlugins; + +public class LambdaObserverTest { + + @Test + public void onSubscribeThrows() { + final List received = new ArrayList(); + + LambdaObserver o = new LambdaObserver(new Consumer() { + @Override + public void accept(Object v) throws Exception { + received.add(v); + } + }, + new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + received.add(e); + } + }, new Action() { + @Override + public void run() throws Exception { + received.add(100); + } + }, new Consumer() { + @Override + public void accept(Disposable s) throws Exception { + throw new TestException(); + } + }); + + assertFalse(o.isDisposed()); + + Observable.just(1).subscribe(o); + + assertTrue(received.toString(), received.get(0) instanceof TestException); + assertEquals(received.toString(), 1, received.size()); + + assertTrue(o.isDisposed()); + } + + @Test + public void onNextThrows() { + final List received = new ArrayList(); + + LambdaObserver o = new LambdaObserver(new Consumer() { + @Override + public void accept(Object v) throws Exception { + throw new TestException(); + } + }, + new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + received.add(e); + } + }, new Action() { + @Override + public void run() throws Exception { + received.add(100); + } + }, new Consumer() { + @Override + public void accept(Disposable s) throws Exception { + } + }); + + assertFalse(o.isDisposed()); + + Observable.just(1).subscribe(o); + + assertTrue(received.toString(), received.get(0) instanceof TestException); + assertEquals(received.toString(), 1, received.size()); + + assertTrue(o.isDisposed()); + } + + @Test + public void onErrorThrows() { + List errors = TestHelper.trackPluginErrors(); + + try { + final List received = new ArrayList(); + + LambdaObserver o = new LambdaObserver(new Consumer() { + @Override + public void accept(Object v) throws Exception { + received.add(v); + } + }, + new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + throw new TestException("Inner"); + } + }, new Action() { + @Override + public void run() throws Exception { + received.add(100); + } + }, new Consumer() { + @Override + public void accept(Disposable s) throws Exception { + } + }); + + assertFalse(o.isDisposed()); + + Observable.error(new TestException("Outer")).subscribe(o); + + assertTrue(received.toString(), received.isEmpty()); + + assertTrue(o.isDisposed()); + + TestHelper.assertError(errors, 0, CompositeException.class); + List ce = TestHelper.compositeList(errors.get(0)); + TestHelper.assertError(ce, 0, TestException.class, "Outer"); + TestHelper.assertError(ce, 1, TestException.class, "Inner"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void onCompleteThrows() { + List errors = TestHelper.trackPluginErrors(); + + try { + final List received = new ArrayList(); + + LambdaObserver o = new LambdaObserver(new Consumer() { + @Override + public void accept(Object v) throws Exception { + received.add(v); + } + }, + new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + received.add(e); + } + }, new Action() { + @Override + public void run() throws Exception { + throw new TestException(); + } + }, new Consumer() { + @Override + public void accept(Disposable s) throws Exception { + } + }); + + assertFalse(o.isDisposed()); + + Observable.empty().subscribe(o); + + assertTrue(received.toString(), received.isEmpty()); + + assertTrue(o.isDisposed()); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void badSourceOnSubscribe() { + Observable source = new Observable() { + @Override + public void subscribeActual(Observer s) { + Disposable s1 = Disposables.empty(); + s.onSubscribe(s1); + Disposable s2 = Disposables.empty(); + s.onSubscribe(s2); + + assertFalse(s1.isDisposed()); + assertTrue(s2.isDisposed()); + + s.onNext(1); + s.onComplete(); + } + }; + + final List received = new ArrayList(); + + LambdaObserver o = new LambdaObserver(new Consumer() { + @Override + public void accept(Object v) throws Exception { + received.add(v); + } + }, + new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + received.add(e); + } + }, new Action() { + @Override + public void run() throws Exception { + received.add(100); + } + }, new Consumer() { + @Override + public void accept(Disposable s) throws Exception { + } + }); + + source.subscribe(o); + + assertEquals(Arrays.asList(1, 100), received); + } + @Test + public void badSourceEmitAfterDone() { + Observable source = new Observable() { + @Override + public void subscribeActual(Observer s) { + s.onSubscribe(Disposables.empty()); + + s.onNext(1); + s.onComplete(); + s.onNext(2); + s.onError(new TestException()); + s.onComplete(); + } + }; + + final List received = new ArrayList(); + + LambdaObserver o = new LambdaObserver(new Consumer() { + @Override + public void accept(Object v) throws Exception { + received.add(v); + } + }, + new Consumer() { + @Override + public void accept(Throwable e) throws Exception { + received.add(e); + } + }, new Action() { + @Override + public void run() throws Exception { + received.add(100); + } + }, new Consumer() { + @Override + public void accept(Disposable s) throws Exception { + } + }); + + source.subscribe(o); + + assertEquals(Arrays.asList(1, 100), received); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEagerTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEagerTest.java index 55525c84f0..2a0b522b53 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEagerTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEagerTest.java @@ -21,12 +21,13 @@ import java.util.concurrent.atomic.*; import org.junit.*; -import org.reactivestreams.Publisher; +import org.reactivestreams.*; import io.reactivex.*; import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.*; import io.reactivex.schedulers.Schedulers; @@ -1061,4 +1062,117 @@ public Flowable apply(Object v) throws Exception { }); } + @Test + public void doubleOnError() { + List errors = TestHelper.trackPluginErrors(); + try { + @SuppressWarnings("rawtypes") + final Subscriber[] sub = { null }; + + new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + sub[0] = s; + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onError(new TestException("First")); + } + } + .concatMapEager(Functions.justFunction(Flowable.just(1))) + .test() + .assertFailureAndMessage(TestException.class, "First", 1); + + sub[0].onError(new TestException("Second")); + + TestHelper.assertError(errors, 0, TestException.class, "Second"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void innerOverflow() { + List errors = TestHelper.trackPluginErrors(); + try { + Flowable.just(1) + .concatMapEager(new Function>() { + @Override + public Publisher apply(Integer v) throws Exception { + return new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + s.onNext(1); + s.onNext(2); + s.onError(new TestException()); + } + }; + } + }, 1, 1) + .test(0L) + .assertFailure(MissingBackpressureException.class); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void unboundedIn() { + int n = Flowable.bufferSize() * 2; + Flowable.range(1, n) + .concatMapEager(new Function>() { + @Override + public Publisher apply(Integer v) throws Exception { + return Flowable.just(1); + } + }, Integer.MAX_VALUE, 16) + .test() + .assertValueCount(n) + .assertComplete() + .assertNoErrors(); + } + + @Test + public void drainCancelRaceOnEmpty() { + for (int i = 0; i < 500; i++) { + final PublishProcessor pp = PublishProcessor.create(); + + final TestSubscriber ts = new TestSubscriber(0L); + + Flowable.just(1) + .concatMapEager(Functions.justFunction(pp)) + .subscribe(ts); + + Runnable r1 = new Runnable() { + @Override + public void run() { + pp.onComplete(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.cancel(); + } + }; + + TestHelper.race(r1, r2); + } + } + + @Test + public void innerLong() { + int n = Flowable.bufferSize() * 2; + + Flowable.just(1).hide() + .concatMapEager(Functions.justFunction(Flowable.range(1, n).hide())) + .rebatchRequests(1) + .test() + .assertValueCount(n) + .assertComplete() + .assertNoErrors(); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDebounceTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDebounceTest.java index b562216a75..05fb61204f 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDebounceTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDebounceTest.java @@ -25,7 +25,7 @@ import io.reactivex.*; import io.reactivex.disposables.*; -import io.reactivex.exceptions.TestException; +import io.reactivex.exceptions.*; import io.reactivex.functions.Function; import io.reactivex.internal.functions.Functions; import io.reactivex.internal.subscriptions.BooleanSubscription; @@ -389,4 +389,22 @@ public void debounceWithEmpty() { .test() .assertResult(1); } + + @Test + public void backpressureNoRequest() { + Flowable.just(1) + .debounce(Functions.justFunction(Flowable.timer(1, TimeUnit.MILLISECONDS))) + .test(0L) + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(MissingBackpressureException.class); + } + + @Test + public void backpressureNoRequestTimed() { + Flowable.just(1) + .debounce(1, TimeUnit.MILLISECONDS) + .test(0L) + .awaitDone(5, TimeUnit.SECONDS) + .assertFailure(MissingBackpressureException.class); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableTest.java index a944b65aba..21dcce968a 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableFlatMapCompletableTest.java @@ -18,7 +18,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; -import org.junit.*; +import org.junit.Test; import org.reactivestreams.*; import io.reactivex.*; @@ -340,7 +340,7 @@ public CompletableSource apply(Integer v) throws Exception { @Test public void fused() { - TestObserver to = ObserverFusion.newTest(QueueDisposable.ANY); + TestSubscriber ts = SubscriberFusion.newTest(QueueDisposable.ANY); Flowable.range(1, 10) .flatMapCompletable(new Function() { @@ -349,11 +349,12 @@ public CompletableSource apply(Integer v) throws Exception { return Completable.complete(); } }) - .subscribe(to); + .toFlowable() + .subscribe(ts); - to - .assertOf(ObserverFusion.assertFuseable()) - .assertOf(ObserverFusion.assertFusionMode(QueueDisposable.ASYNC)) + ts + .assertOf(SubscriberFusion.assertFuseable()) + .assertOf(SubscriberFusion.assertFusionMode(QueueDisposable.ASYNC)) .assertResult(); } @@ -506,4 +507,20 @@ protected void subscribeActual(CompletableObserver s) { }) .test(); } + + @Test + public void delayErrorMaxConcurrency() { + Flowable.range(1, 3) + .flatMapCompletable(new Function() { + @Override + public CompletableSource apply(Integer v) throws Exception { + if (v == 2) { + return Completable.error(new TestException()); + } + return Completable.complete(); + } + }, true, 1) + .test() + .assertFailure(TestException.class); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableGroupByTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableGroupByTest.java index 6481a7fa0a..4acd39cc05 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableGroupByTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableGroupByTest.java @@ -1732,4 +1732,114 @@ public void delayErrorSimpleComplete() { .assertResult(1); } + @Test + public void mainFusionRejected() { + TestSubscriber> ts = SubscriberFusion.newTest(QueueSubscription.SYNC); + + Flowable.just(1) + .groupBy(Functions.justFunction(1)) + .subscribe(ts); + + SubscriberFusion.assertFusion(ts, QueueSubscription.NONE) + .assertValueCount(1) + .assertComplete() + .assertNoErrors(); + } + + @Test + public void badSource() { + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable f) throws Exception { + return f.groupBy(Functions.justFunction(1)); + } + }, false, 1, 1, (Object[])null); + } + + @Test + public void badRequest() { + TestHelper.assertBadRequestReported(Flowable.just(1) + .groupBy(Functions.justFunction(1))); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>>() { + @Override + public Publisher> apply(Flowable f) throws Exception { + return f.groupBy(Functions.justFunction(1)); + } + }); + } + + @Test + public void nullKeyTakeInner() { + Flowable.just(1) + .groupBy(new Function() { + @Override + public Object apply(Integer v) throws Exception { + return null; + } + }) + .flatMap(new Function, Publisher>() { + @Override + public Publisher apply(GroupedFlowable g) throws Exception { + return g.take(1); + } + }) + .test() + .assertResult(1); + } + + @Test + public void errorFused() { + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.ANY); + + Flowable.error(new TestException()) + .groupBy(Functions.justFunction(1)) + .subscribe(ts); + + SubscriberFusion.assertFusion(ts, QueueSubscription.ASYNC) + .assertFailure(TestException.class); + } + + @Test + public void errorFusedDelayed() { + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.ANY); + + Flowable.error(new TestException()) + .groupBy(Functions.justFunction(1), true) + .subscribe(ts); + + SubscriberFusion.assertFusion(ts, QueueSubscription.ASYNC) + .assertFailure(TestException.class); + } + + @Test + public void groupError() { + Flowable.just(1).concatWith(Flowable.error(new TestException())) + .groupBy(Functions.justFunction(1), true) + .flatMap(new Function, Publisher>() { + @Override + public Publisher apply(GroupedFlowable g) throws Exception { + return g.hide(); + } + }) + .test() + .assertFailure(TestException.class, 1); + } + + @Test + public void groupComplete() { + Flowable.just(1) + .groupBy(Functions.justFunction(1), true) + .flatMap(new Function, Publisher>() { + @Override + public Publisher apply(GroupedFlowable g) throws Exception { + return g.hide(); + } + }) + .test() + .assertResult(1); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBufferTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBufferTest.java index 7ede5d6daa..d05e051f27 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBufferTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureBufferTest.java @@ -291,4 +291,22 @@ public void fusedPreconsume() throws Exception { .assertNoErrors() .assertComplete(); } + + @Test + public void emptyDelayError() { + Flowable.empty() + .onBackpressureBuffer(true) + .test() + .assertResult(); + } + + @Test + public void fusionRejected() { + TestSubscriber ts = SubscriberFusion.newTest(QueueSubscription.SYNC); + + Flowable.never().onBackpressureBuffer().subscribe(ts); + + SubscriberFusion.assertFusion(ts, QueueSubscription.NONE) + .assertEmpty(); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureDropTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureDropTest.java index cecc13b213..7bac544dbd 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureDropTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureDropTest.java @@ -21,8 +21,8 @@ import org.junit.Test; import org.reactivestreams.*; -import io.reactivex.Flowable; -import io.reactivex.functions.Consumer; +import io.reactivex.*; +import io.reactivex.functions.*; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.*; @@ -179,4 +179,29 @@ public void accept(Throwable t) { assertFalse(errorOccurred.get()); } + + @Test + public void badSource() { + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable f) throws Exception { + return f.onBackpressureDrop(); + } + }, false, 1, 1, 1); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>() { + @Override + public Publisher apply(Flowable f) throws Exception { + return f.onBackpressureDrop(); + } + }); + } + + @Test + public void badRequest() { + TestHelper.assertBadRequestReported(Flowable.just(1).onBackpressureDrop()); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReduceTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReduceTest.java index 76649a73db..811bb13c4c 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReduceTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReduceTest.java @@ -18,6 +18,7 @@ import static org.mockito.Mockito.*; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import org.junit.*; import org.reactivestreams.*; @@ -28,6 +29,7 @@ import io.reactivex.internal.fuseable.HasUpstreamPublisher; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.TestSubscriber; public class FlowableReduceTest { @@ -384,4 +386,88 @@ public Integer apply(Integer a, Integer b) throws Exception { .test() .assertFailure(TestException.class); } + + /** + * https://gist.github.com/jurna/353a2bd8ff83f0b24f0b5bc772077d61 + */ + @Test + public void shouldReduceTo10Events() { + final AtomicInteger count = new AtomicInteger(); + + Flowable.range(0, 10).flatMap(new Function>() { + @Override + public Publisher apply(final Integer x) throws Exception { + return Flowable.range(0, 2) + .map(new Function() { + @Override + public String apply(Integer y) throws Exception { + return blockingOp(x, y); + } + }).subscribeOn(Schedulers.io()) + .reduce(new BiFunction() { + @Override + public String apply(String l, String r) throws Exception { + return l + "_" + r; + } + }) + .doOnSuccess(new Consumer() { + @Override + public void accept(String s) throws Exception { + count.incrementAndGet(); + System.out.println("Completed with " + s);} + }) + .toFlowable(); + } + } + ).blockingLast(); + + assertEquals(10, count.get()); + } + + /** + * https://gist.github.com/jurna/353a2bd8ff83f0b24f0b5bc772077d61 + */ + @Test + public void shouldReduceTo10EventsFlowable() { + final AtomicInteger count = new AtomicInteger(); + + Flowable.range(0, 10).flatMap(new Function>() { + @Override + public Publisher apply(final Integer x) throws Exception { + return Flowable.range(0, 2) + .map(new Function() { + @Override + public String apply(Integer y) throws Exception { + return blockingOp(x, y); + } + }).subscribeOn(Schedulers.io()) + .reduce(new BiFunction() { + @Override + public String apply(String l, String r) throws Exception { + return l + "_" + r; + } + }) + .toFlowable() + .doOnNext(new Consumer() { + @Override + public void accept(String s) throws Exception { + count.incrementAndGet(); + System.out.println("Completed with " + s);} + }) + ; + } + } + ).blockingLast(); + + assertEquals(10, count.get()); + } + + static String blockingOp(Integer x, Integer y) { + try { + Thread.sleep(100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return "x" + x + "y" + y; + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableScanTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableScanTest.java index b1dc93f806..ac8311d970 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableScanTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableScanTest.java @@ -362,10 +362,11 @@ public void onNext(Integer integer) { }); verify(producer.get(), never()).request(0); - verify(producer.get(), times(3)).request(1); // FIXME this was 2 in 1.x + verify(producer.get(), times(2)).request(1); } @Test + @Ignore("scanSeed no longer emits without upstream signal") public void testInitialValueEmittedNoProducer() { PublishProcessor source = PublishProcessor.create(); @@ -384,6 +385,7 @@ public Integer apply(Integer t1, Integer t2) { } @Test + @Ignore("scanSeed no longer emits without upstream signal") public void testInitialValueEmittedWithProducer() { Flowable source = Flowable.never(); @@ -457,19 +459,4 @@ public Object apply(Object a, Object b) throws Exception { .test() .assertFailure(TestException.class); } - - @Test - public void badSource() { - TestHelper.checkBadSourceFlowable(new Function, Object>() { - @Override - public Object apply(Flowable o) throws Exception { - return o.scan(0, new BiFunction() { - @Override - public Object apply(Object a, Object b) throws Exception { - return a; - } - }); - } - }, false, 1, 1, 0, 0); - } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSingleTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSingleTest.java index 00da635808..87c7242282 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSingleTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSingleTest.java @@ -742,6 +742,13 @@ public Object apply(Flowable o) throws Exception { return o.singleElement(); } }, false, 1, 1, 1); + + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable o) throws Exception { + return o.singleOrError().toFlowable(); + } + }, false, 1, 1, 1); } @Test diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableBlockingTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableBlockingTest.java index 6e11142c11..2b1a2662df 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableBlockingTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableBlockingTest.java @@ -13,7 +13,7 @@ package io.reactivex.internal.operators.observable; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; import java.util.*; import java.util.concurrent.TimeUnit; @@ -27,6 +27,7 @@ import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.observers.BlockingFirstObserver; import io.reactivex.observers.TestObserver; import io.reactivex.schedulers.Schedulers; @@ -283,4 +284,38 @@ public void onCompleteDelayed() { to.assertResult(); } + + @Test + public void blockingCancelUpfront() { + BlockingFirstObserver o = new BlockingFirstObserver(); + + assertFalse(o.isDisposed()); + o.dispose(); + assertTrue(o.isDisposed()); + + Disposable d = Disposables.empty(); + + o.onSubscribe(d); + + assertTrue(d.isDisposed()); + + Thread.currentThread().interrupt(); + try { + o.blockingGet(); + fail("Should have thrown"); + } catch (RuntimeException ex) { + assertTrue(ex.toString(), ex.getCause() instanceof InterruptedException); + } + + Thread.interrupted(); + + o.onError(new TestException()); + + try { + o.blockingGet(); + fail("Should have thrown"); + } catch (TestException ex) { + // expected + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableForEachTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableForEachTest.java index c81608d893..ad859e0813 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableForEachTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableForEachTest.java @@ -13,15 +13,20 @@ package io.reactivex.internal.operators.observable; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; import java.util.*; import org.junit.Test; import io.reactivex.Observable; -import io.reactivex.exceptions.TestException; +import io.reactivex.TestHelper; +import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.*; import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; +import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.subjects.PublishSubject; public class ObservableForEachTest { @@ -72,4 +77,85 @@ public void accept(Throwable e) throws Exception { assertEquals(Arrays.asList(1, 2, 3, 4, 5, 100), list); } + @Test + public void badSource() { + TestHelper.checkBadSourceObservable(new Function, Object>() { + @Override + public Object apply(Observable f) throws Exception { + return f.forEachWhile(Functions.alwaysTrue()); + } + }, false, 1, 1, (Object[])null); + } + + @Test + public void dispose() { + PublishSubject ps = PublishSubject.create(); + + Disposable d = ps.forEachWhile(Functions.alwaysTrue()); + + assertFalse(d.isDisposed()); + + d.dispose(); + + assertTrue(d.isDisposed()); + } + + @Test + public void whilePredicateThrows() { + List errors = TestHelper.trackPluginErrors(); + try { + Observable.just(1).forEachWhile(new Predicate() { + @Override + public boolean test(Integer v) throws Exception { + throw new TestException(); + } + }); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void whileErrorThrows() { + List errors = TestHelper.trackPluginErrors(); + try { + Observable.error(new TestException("Outer")) + .forEachWhile(Functions.alwaysTrue(), new Consumer() { + @Override + public void accept(Throwable v) throws Exception { + throw new TestException("Inner"); + } + }); + + TestHelper.assertError(errors, 0, CompositeException.class); + + List ce = TestHelper.compositeList(errors.get(0)); + + TestHelper.assertError(ce, 0, TestException.class, "Outer"); + TestHelper.assertError(ce, 1, TestException.class, "Inner"); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void whileCompleteThrows() { + List errors = TestHelper.trackPluginErrors(); + try { + Observable.just(1).forEachWhile(Functions.alwaysTrue(), Functions.emptyConsumer(), + new Action() { + @Override + public void run() throws Exception { + throw new TestException(); + } + }); + + TestHelper.assertError(errors, 0, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + } diff --git a/src/test/java/io/reactivex/internal/subscribers/BasicFuseableConditionalSubscriberTest.java b/src/test/java/io/reactivex/internal/subscribers/BasicFuseableConditionalSubscriberTest.java new file mode 100644 index 0000000000..636d1ed690 --- /dev/null +++ b/src/test/java/io/reactivex/internal/subscribers/BasicFuseableConditionalSubscriberTest.java @@ -0,0 +1,83 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.subscribers; + +import static org.junit.Assert.*; + +import org.junit.Test; +import org.reactivestreams.Subscription; + +import io.reactivex.TestHelper; +import io.reactivex.internal.fuseable.ConditionalSubscriber; +import io.reactivex.internal.subscriptions.ScalarSubscription; + +public class BasicFuseableConditionalSubscriberTest { + + @Test + public void offerThrows() { + ConditionalSubscriber cs = new ConditionalSubscriber() { + + @Override + public void onSubscribe(Subscription s) { + } + + @Override + public void onNext(Integer t) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + } + + @Override + public boolean tryOnNext(Integer t) { + return false; + } + }; + + BasicFuseableConditionalSubscriber fcs = new BasicFuseableConditionalSubscriber(cs) { + + @Override + public boolean tryOnNext(Integer t) { + return false; + } + + @Override + public void onNext(Integer t) { + } + + @Override + public int requestFusion(int mode) { + return 0; + } + + @Override + public Integer poll() throws Exception { + return null; + } + }; + + fcs.onSubscribe(new ScalarSubscription(fcs, 1)); + + TestHelper.assertNoOffer(fcs); + + assertFalse(fcs.isEmpty()); + fcs.clear(); + assertTrue(fcs.isEmpty()); + } +} diff --git a/src/test/java/io/reactivex/internal/subscribers/BasicFuseableSubscriberTest.java b/src/test/java/io/reactivex/internal/subscribers/BasicFuseableSubscriberTest.java new file mode 100644 index 0000000000..99734e1fa2 --- /dev/null +++ b/src/test/java/io/reactivex/internal/subscribers/BasicFuseableSubscriberTest.java @@ -0,0 +1,53 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.subscribers; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import io.reactivex.TestHelper; +import io.reactivex.internal.subscriptions.ScalarSubscription; +import io.reactivex.subscribers.TestSubscriber; + +public class BasicFuseableSubscriberTest { + + @Test + public void offerThrows() { + BasicFuseableSubscriber fcs = new BasicFuseableSubscriber(new TestSubscriber(0L)) { + + @Override + public void onNext(Integer t) { + } + + @Override + public int requestFusion(int mode) { + return 0; + } + + @Override + public Integer poll() throws Exception { + return null; + } + }; + + fcs.onSubscribe(new ScalarSubscription(fcs, 1)); + + TestHelper.assertNoOffer(fcs); + + assertFalse(fcs.isEmpty()); + fcs.clear(); + assertTrue(fcs.isEmpty()); + } +} diff --git a/src/test/java/io/reactivex/internal/subscribers/InnerQueuedSubscriberTest.java b/src/test/java/io/reactivex/internal/subscribers/InnerQueuedSubscriberTest.java new file mode 100644 index 0000000000..0c23aa20af --- /dev/null +++ b/src/test/java/io/reactivex/internal/subscribers/InnerQueuedSubscriberTest.java @@ -0,0 +1,64 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.subscribers; + +import java.util.*; + +import static org.junit.Assert.*; +import org.junit.Test; +import org.reactivestreams.Subscription; + +public class InnerQueuedSubscriberTest { + + @Test + public void requestInBatches() { + InnerQueuedSubscriberSupport support = new InnerQueuedSubscriberSupport() { + @Override + public void innerNext(InnerQueuedSubscriber inner, Integer value) { + } + @Override + public void innerError(InnerQueuedSubscriber inner, Throwable e) { + } + @Override + public void innerComplete(InnerQueuedSubscriber inner) { + } + @Override + public void drain() { + } + }; + + InnerQueuedSubscriber inner = new InnerQueuedSubscriber(support, 4); + + final List requests = new ArrayList(); + + inner.onSubscribe(new Subscription() { + @Override + public void request(long n) { + requests.add(n); + } + @Override + public void cancel() { + // ignore + } + }); + + inner.request(1); + inner.request(1); + inner.request(1); + inner.request(1); + inner.request(1); + + assertEquals(Arrays.asList(4L, 3L), requests); + } +} diff --git a/src/test/java/io/reactivex/internal/subscribers/SinglePostCompleteSubscriberTest.java b/src/test/java/io/reactivex/internal/subscribers/SinglePostCompleteSubscriberTest.java new file mode 100644 index 0000000000..ce850d7d7d --- /dev/null +++ b/src/test/java/io/reactivex/internal/subscribers/SinglePostCompleteSubscriberTest.java @@ -0,0 +1,67 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.subscribers; + +import org.junit.Test; + +import io.reactivex.TestHelper; +import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.subscribers.TestSubscriber; + +public class SinglePostCompleteSubscriberTest { + + @Test + public void requestCompleteRace() { + for (int i = 0; i < 500; i++) { + final TestSubscriber ts = new TestSubscriber(0L); + + final SinglePostCompleteSubscriber spc = new SinglePostCompleteSubscriber(ts) { + private static final long serialVersionUID = -2848918821531562637L; + + @Override + public void onNext(Integer t) { + } + + @Override + public void onError(Throwable t) { + } + + @Override + public void onComplete() { + complete(1); + } + }; + + spc.onSubscribe(new BooleanSubscription()); + + Runnable r1 = new Runnable() { + @Override + public void run() { + spc.onComplete(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + ts.request(1); + } + }; + + TestHelper.race(r1, r2); + + ts.assertResult(1); + } + } +} diff --git a/src/test/java/io/reactivex/observable/ObservableGroupByTests.java b/src/test/java/io/reactivex/observable/ObservableGroupByTests.java index e3745ad879..28c44485cb 100644 --- a/src/test/java/io/reactivex/observable/ObservableGroupByTests.java +++ b/src/test/java/io/reactivex/observable/ObservableGroupByTests.java @@ -23,7 +23,7 @@ public class ObservableGroupByTests { @Test - public void testTakeUnsubscribesOnGroupBy() { + public void testTakeUnsubscribesOnGroupBy() throws Exception { Observable.merge( ObservableEventStream.getEventStream("HTTP-ClusterA", 50), ObservableEventStream.getEventStream("HTTP-ClusterB", 20) @@ -45,10 +45,12 @@ public void accept(GroupedObservable v) { }); System.out.println("**** finished"); + + Thread.sleep(200); // make sure the event streams receive their interrupt } @Test - public void testTakeUnsubscribesOnFlatMapOfGroupBy() { + public void testTakeUnsubscribesOnFlatMapOfGroupBy() throws Exception { Observable.merge( ObservableEventStream.getEventStream("HTTP-ClusterA", 50), ObservableEventStream.getEventStream("HTTP-ClusterB", 20) @@ -80,5 +82,7 @@ public void accept(Object pv) { }); System.out.println("**** finished"); + + Thread.sleep(200); // make sure the event streams receive their interrupt } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/observable/ObservableScanTests.java b/src/test/java/io/reactivex/observable/ObservableScanTests.java index b5e99d6cee..e76b98f024 100644 --- a/src/test/java/io/reactivex/observable/ObservableScanTests.java +++ b/src/test/java/io/reactivex/observable/ObservableScanTests.java @@ -23,7 +23,7 @@ public class ObservableScanTests { @Test - public void testUnsubscribeScan() { + public void testUnsubscribeScan() throws Exception { ObservableEventStream.getEventStream("HTTP-ClusterB", 20) .scan(new HashMap(), new BiFunction, Event, HashMap>() { @@ -40,5 +40,7 @@ public void accept(HashMap pv) { System.out.println(pv); } }); + + Thread.sleep(200); // make sure the event streams receive their interrupt } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/observable/ObservableZipTests.java b/src/test/java/io/reactivex/observable/ObservableZipTests.java index ec811e3bff..97dce6d6a5 100644 --- a/src/test/java/io/reactivex/observable/ObservableZipTests.java +++ b/src/test/java/io/reactivex/observable/ObservableZipTests.java @@ -28,7 +28,7 @@ public class ObservableZipTests { @Test - public void testZipObservableOfObservables() { + public void testZipObservableOfObservables() throws Exception { ObservableEventStream.getEventStream("HTTP-ClusterB", 20) .groupBy(new Function() { @Override @@ -63,6 +63,8 @@ public void accept(Object pv) { }); System.out.println("**** finished"); + + Thread.sleep(200); // make sure the event streams receive their interrupt } /**