diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupJoin.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupJoin.java index 9553de52f0..87fb7b181c 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupJoin.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableGroupJoin.java @@ -484,7 +484,7 @@ public void onNext(Object t) { @Override public void onError(Throwable t) { - parent.innerError(t); + parent.innerCloseError(t); } @Override diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableInterval.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableInterval.java index 58e238811b..a7b2ac6c6d 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableInterval.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableInterval.java @@ -82,15 +82,10 @@ public void run() { if (r != 0L) { actual.onNext(count++); - if (r != Long.MAX_VALUE) { - decrementAndGet(); - } + BackpressureHelper.produced(this, 1); } else { - try { - actual.onError(new MissingBackpressureException("Can't deliver value " + count + " due to lack of requests")); - } finally { - DisposableHelper.dispose(resource); - } + actual.onError(new MissingBackpressureException("Can't deliver value " + count + " due to lack of requests")); + DisposableHelper.dispose(resource); } } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableJoin.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableJoin.java index 3f2cb64fc5..6865b5217c 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableJoin.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableJoin.java @@ -55,8 +55,8 @@ public FlowableJoin( @Override protected void subscribeActual(Subscriber s) { - GroupJoinSubscription parent = - new GroupJoinSubscription(s, leftEnd, rightEnd, resultSelector); + JoinSubscription parent = + new JoinSubscription(s, leftEnd, rightEnd, resultSelector); s.onSubscribe(parent); @@ -69,7 +69,7 @@ protected void subscribeActual(Subscriber s) { other.subscribe(right); } - static final class GroupJoinSubscription + static final class JoinSubscription extends AtomicInteger implements Subscription, JoinSupport { @@ -111,7 +111,7 @@ static final class GroupJoinSubscription static final Integer RIGHT_CLOSE = 4; - GroupJoinSubscription(Subscriber actual, Function> leftEnd, + JoinSubscription(Subscriber actual, Function> leftEnd, Function> rightEnd, BiFunction resultSelector) { this.actual = actual; diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureDrop.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureDrop.java index 064cd060cb..13c1c4989a 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureDrop.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureDrop.java @@ -81,9 +81,7 @@ public void onNext(T t) { long r = get(); if (r != 0L) { actual.onNext(t); - if (r != Long.MAX_VALUE) { - decrementAndGet(); - } + BackpressureHelper.produced(this, 1); } else { try { onDrop.accept(t); diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureError.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureError.java index 71ccf99e76..0707e59b66 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureError.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureError.java @@ -65,9 +65,7 @@ public void onNext(T t) { long r = get(); if (r != 0L) { actual.onNext(t); - if (r != Long.MAX_VALUE) { - decrementAndGet(); - } + BackpressureHelper.produced(this, 1); } else { onError(new MissingBackpressureException("could not emit value due to lack of requests")); } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishMulticast.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishMulticast.java index 625c8ffdd2..ebf87f0b45 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishMulticast.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishMulticast.java @@ -102,20 +102,14 @@ public void onNext(R t) { @Override public void onError(Throwable t) { - try { - actual.onError(t); - } finally { - processor.dispose(); - } + actual.onError(t); + processor.dispose(); } @Override public void onComplete() { - try { - actual.onComplete(); - } finally { - processor.dispose(); - } + actual.onComplete(); + processor.dispose(); } @Override @@ -214,12 +208,10 @@ public void onNext(T t) { if (done) { return; } - if (sourceMode == QueueSubscription.NONE) { - if (!queue.offer(t)) { - SubscriptionHelper.cancel(s); - onError(new MissingBackpressureException()); - return; - } + if (sourceMode == QueueSubscription.NONE && !queue.offer(t)) { + s.get().cancel(); + onError(new MissingBackpressureException()); + return; } drain(); } @@ -473,20 +465,7 @@ static final class MulticastSubscription @Override public void request(long n) { if (SubscriptionHelper.validate(n)) { - for (;;) { - long r = get(); - if (r == Long.MIN_VALUE) { - return; - } - if (r != Long.MAX_VALUE) { - long u = BackpressureHelper.addCap(r, n); - if (compareAndSet(r, u)) { - break; - } - } else { - break; - } - } + BackpressureHelper.addCancel(this, n); parent.drain(); } } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java index 5a119573af..630e62fe10 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableReplay.java @@ -348,18 +348,14 @@ static final class ReplaySubscriber implements Subscriber, Disposable { static final InnerSubscription[] TERMINATED = new InnerSubscription[0]; /** Tracks the subscribed InnerSubscriptions. */ - final AtomicReference subscribers; + final AtomicReference[]> subscribers; /** * Atomically changed from false to true by connect to make sure the * connection is only performed by one thread. */ final AtomicBoolean shouldConnect; - /** Guarded by this. */ - boolean emitting; - /** Guarded by this. */ - boolean missed; - + final AtomicInteger management; /** Contains the maximum element index the child Subscribers requested so far. Accessed while emitting is true. */ long maxChildRequested; @@ -368,10 +364,11 @@ static final class ReplaySubscriber implements Subscriber, Disposable { /** The upstream producer. */ volatile Subscription subscription; + @SuppressWarnings("unchecked") ReplaySubscriber(ReplayBuffer buffer) { this.buffer = buffer; - - this.subscribers = new AtomicReference(EMPTY); + this.management = new AtomicInteger(); + this.subscribers = new AtomicReference[]>(EMPTY); this.shouldConnect = new AtomicBoolean(); } @@ -380,6 +377,7 @@ public boolean isDisposed() { return subscribers.get() == TERMINATED; } + @SuppressWarnings("unchecked") @Override public void dispose() { subscribers.set(TERMINATED); @@ -397,6 +395,7 @@ public void dispose() { * @param producer the producer to add * @return true if succeeded, false otherwise */ + @SuppressWarnings("unchecked") boolean add(InnerSubscription producer) { if (producer == null) { throw new NullPointerException(); @@ -404,7 +403,7 @@ boolean add(InnerSubscription producer) { // the state can change so we do a CAS loop to achieve atomicity for (;;) { // get the current producer array - InnerSubscription[] c = subscribers.get(); + InnerSubscription[] c = subscribers.get(); // if this subscriber-to-source reached a terminal state by receiving // an onError or onComplete, just refuse to add the new producer if (c == TERMINATED) { @@ -412,7 +411,7 @@ boolean add(InnerSubscription producer) { } // we perform a copy-on-write logic int len = c.length; - InnerSubscription[] u = new InnerSubscription[len + 1]; + InnerSubscription[] u = new InnerSubscription[len + 1]; System.arraycopy(c, 0, u, 0, len); u[len] = producer; // try setting the subscribers array @@ -428,19 +427,20 @@ boolean add(InnerSubscription producer) { * Atomically removes the given InnerSubscription from the subscribers array. * @param p the InnerSubscription to remove */ + @SuppressWarnings("unchecked") void remove(InnerSubscription p) { // the state can change so we do a CAS loop to achieve atomicity for (;;) { // let's read the current subscribers array - InnerSubscription[] c = subscribers.get(); + InnerSubscription[] c = subscribers.get(); + int len = c.length; // if it is either empty or terminated, there is nothing to remove so we quit - if (c == EMPTY || c == TERMINATED) { + if (len == 0) { return; } // let's find the supplied producer in the array // although this is O(n), we don't expect too many child subscribers in general int j = -1; - int len = c.length; for (int i = 0; i < len; i++) { if (c[i].equals(p)) { j = i; @@ -452,7 +452,7 @@ void remove(InnerSubscription p) { return; } // we do copy-on-write logic here - InnerSubscription[] u; + InnerSubscription[] u; // we don't create a new empty array if producer was the single inhabitant // but rather reuse an empty array if (len == 1) { @@ -476,50 +476,50 @@ void remove(InnerSubscription p) { @Override public void onSubscribe(Subscription p) { - Subscription p0 = subscription; - if (p0 != null) { - RxJavaPlugins.onError(new IllegalStateException("Only a single producer can be set on a Subscriber.")); - return; + if (SubscriptionHelper.validate(subscription, p)) { + subscription = p; + manageRequests(); + for (InnerSubscription rp : subscribers.get()) { + buffer.replay(rp); + } } - subscription = p; - manageRequests(); - replay(); } @Override public void onNext(T t) { if (!done) { buffer.next(t); - replay(); + for (InnerSubscription rp : subscribers.get()) { + buffer.replay(rp); + } } } + + @SuppressWarnings("unchecked") @Override public void onError(Throwable e) { // The observer front is accessed serially as required by spec so // no need to CAS in the terminal value if (!done) { done = true; - try { - buffer.error(e); - replay(); - } finally { - dispose(); // expectation of testIssue2191 + buffer.error(e); + for (InnerSubscription rp : subscribers.getAndSet(TERMINATED)) { + buffer.replay(rp); } } else { RxJavaPlugins.onError(e); } } + @SuppressWarnings("unchecked") @Override public void onComplete() { // The observer front is accessed serially as required by spec so // no need to CAS in the terminal value if (!done) { done = true; - try { - buffer.complete(); - replay(); - } finally { - dispose(); + buffer.complete(); + for (InnerSubscription rp : subscribers.getAndSet(TERMINATED)) { + buffer.replay(rp); } } } @@ -528,24 +528,16 @@ public void onComplete() { * Coordinates the request amounts of various child Subscribers. */ void manageRequests() { - // if the upstream has completed, no more requesting is possible - if (isDisposed()) { + if (management.getAndIncrement() != 0) { return; } - synchronized (this) { - if (emitting) { - missed = true; - return; - } - emitting = true; - } + int missed = 1; for (;;) { // if the upstream has completed, no more requesting is possible if (isDisposed()) { return; } - @SuppressWarnings("unchecked") InnerSubscription[] a = subscribers.get(); long ri = maxChildRequested; @@ -584,26 +576,12 @@ void manageRequests() { p.request(ur); } - synchronized (this) { - if (!missed) { - emitting = false; - return; - } - missed = false; + missed = management.addAndGet(-missed); + if (missed == 0) { + break; } } } - - /** - * Tries to replay the buffer contents to all known subscribers. - */ - void replay() { - @SuppressWarnings("unchecked") - InnerSubscription[] a = subscribers.get(); - for (InnerSubscription rp : a) { - buffer.replay(rp); - } - } } /** * A Subscription that manages the request and cancellation state of a @@ -648,39 +626,38 @@ static final class InnerSubscription extends AtomicLong implements Subscripti @Override public void request(long n) { // ignore negative requests - if (!SubscriptionHelper.validate(n)) { - return; - } - // In general, RxJava doesn't prevent concurrent requests (with each other or with - // a cancel) so we need a CAS-loop, but we need to handle - // request overflow and cancelled/not requested state as well. - for (;;) { - // get the current request amount - long r = get(); - // if child called cancel() do nothing - if (r == CANCELLED) { - return; - } - // ignore zero requests except any first that sets in zero - if (r >= 0L && n == 0) { - return; - } - // otherwise, increase the request count - long u = BackpressureHelper.addCap(r, n); - - // try setting the new request value - if (compareAndSet(r, u)) { - // increment the total request counter - BackpressureHelper.add(totalRequested, n); - // if successful, notify the parent dispatcher this child can receive more - // elements - parent.manageRequests(); - - parent.buffer.replay(this); - return; + if (SubscriptionHelper.validate(n)) { + // In general, RxJava doesn't prevent concurrent requests (with each other or with + // a cancel) so we need a CAS-loop, but we need to handle + // request overflow and cancelled/not requested state as well. + for (;;) { + // get the current request amount + long r = get(); + // if child called cancel() do nothing + if (r == CANCELLED) { + return; + } + // ignore zero requests except any first that sets in zero + if (r >= 0L && n == 0) { + return; + } + // otherwise, increase the request count + long u = BackpressureHelper.addCap(r, n); + + // try setting the new request value + if (compareAndSet(r, u)) { + // increment the total request counter + BackpressureHelper.add(totalRequested, n); + // if successful, notify the parent dispatcher this child can receive more + // elements + parent.manageRequests(); + + parent.buffer.replay(this); + return; + } + // otherwise, someone else changed the state (perhaps a concurrent + // request or cancellation) so retry } - // otherwise, someone else changed the state (perhaps a concurrent - // request or cancellation) so retry } } @@ -690,30 +667,7 @@ public void request(long n) { * @return the updated request value (may indicate how much can be produced or a terminal state) */ public long produced(long n) { - // we don't allow producing zero or less: it would be a bug in the operator - if (n <= 0) { - throw new IllegalArgumentException("Cant produce zero or less"); - } - for (;;) { - // get the current request value - long r = get(); - // if the child has cancelled, simply return and indicate this - if (r == CANCELLED) { - return CANCELLED; - } - // reduce the requested amount - long u = r - n; - // if the new amount is less than zero, we have a bug in this operator - if (u < 0) { - throw new IllegalStateException("More produced (" + n + ") than requested (" + r + ")"); - } - // try updating the request value - if (compareAndSet(r, u)) { - // and return the updated value - return u; - } - // otherwise, some concurrent activity happened and we need to retry - } + return BackpressureHelper.producedCancel(this, n); } @Override @@ -728,24 +682,14 @@ public void cancel() { @Override public void dispose() { - long r = get(); - // let's see if we are cancelled - if (r != CANCELLED) { - // if not, swap in the terminal state, this is idempotent - // because other methods using CAS won't overwrite this value, - // concurrent calls to dispose/cancel will atomically swap in the same - // terminal value - r = getAndSet(CANCELLED); - // and only one of them will see a non-terminated value before the swap - if (r != CANCELLED) { - // remove this from the parent - parent.remove(this); - // After removal, we might have unblocked the other child subscribers: - // let's assume this child had 0 requested before the cancellation while - // the others had non-zero. By removing this 'blocking' child, the others - // are now free to receive events - parent.manageRequests(); - } + if (getAndSet(CANCELLED) != CANCELLED) { + // remove this from the parent + parent.remove(this); + // After removal, we might have unblocked the other child subscribers: + // let's assume this child had 0 requested before the cancellation while + // the others had non-zero. By removing this 'blocking' child, the others + // are now free to receive events + parent.manageRequests(); } } /** diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTakeUntil.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTakeUntil.java index f5c6324bc0..ebc8ea813a 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableTakeUntil.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableTakeUntil.java @@ -13,12 +13,12 @@ package io.reactivex.internal.operators.flowable; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.*; import org.reactivestreams.*; -import io.reactivex.internal.subscriptions.*; -import io.reactivex.subscribers.SerializedSubscriber; +import io.reactivex.internal.subscriptions.SubscriptionHelper; +import io.reactivex.internal.util.*; public final class FlowableTakeUntil extends AbstractFlowableWithUpstream { final Publisher other; @@ -29,101 +29,98 @@ public FlowableTakeUntil(Publisher source, Publisher other) { @Override protected void subscribeActual(Subscriber child) { - final SerializedSubscriber serial = new SerializedSubscriber(child); + TakeUntilMainSubscriber parent = new TakeUntilMainSubscriber(child); + child.onSubscribe(parent); - final ArrayCompositeSubscription frc = new ArrayCompositeSubscription(2); + other.subscribe(parent.other); - final TakeUntilSubscriber tus = new TakeUntilSubscriber(serial, frc); - - other.subscribe(new Subscriber() { - @Override - public void onSubscribe(Subscription s) { - if (frc.setResource(1, s)) { - s.request(Long.MAX_VALUE); - } - } - @Override - public void onNext(U t) { - frc.dispose(); - if (tus.compareAndSet(false, true)) { - EmptySubscription.complete(serial); - } else { - serial.onComplete(); - } - } - @Override - public void onError(Throwable t) { - frc.dispose(); - if (tus.compareAndSet(false, true)) { - EmptySubscription.error(t, serial); - } else { - serial.onError(t); - } - } - @Override - public void onComplete() { - frc.dispose(); - if (tus.compareAndSet(false, true)) { - EmptySubscription.complete(serial); - } else { - serial.onComplete(); - } - } - }); - - source.subscribe(tus); + source.subscribe(parent); } - static final class TakeUntilSubscriber extends AtomicBoolean implements Subscriber, Subscription { + static final class TakeUntilMainSubscriber extends AtomicInteger implements Subscriber, Subscription { + + private static final long serialVersionUID = -4945480365982832967L; - private static final long serialVersionUID = 3451719290311127173L; final Subscriber actual; - final ArrayCompositeSubscription frc; - Subscription s; + final AtomicLong requested; + + final AtomicReference s; + + final AtomicThrowable error; - TakeUntilSubscriber(Subscriber actual, ArrayCompositeSubscription frc) { + final OtherSubscriber other; + + TakeUntilMainSubscriber(Subscriber actual) { this.actual = actual; - this.frc = frc; + this.requested = new AtomicLong(); + this.s = new AtomicReference(); + this.other = new OtherSubscriber(); + this.error = new AtomicThrowable(); } @Override public void onSubscribe(Subscription s) { - if (SubscriptionHelper.validate(this.s, s)) { - this.s = s; - if (frc.setResource(0, s)) { - if (compareAndSet(false, true)) { - actual.onSubscribe(this); - } - } - } + SubscriptionHelper.deferredSetOnce(this.s, requested, s); } @Override public void onNext(T t) { - actual.onNext(t); + HalfSerializer.onNext(actual, t, this, error); } @Override public void onError(Throwable t) { - frc.dispose(); - actual.onError(t); + SubscriptionHelper.cancel(other); + HalfSerializer.onError(actual, t, this, error); } @Override public void onComplete() { - frc.dispose(); - actual.onComplete(); + SubscriptionHelper.cancel(other); + HalfSerializer.onComplete(actual, this, error); } @Override public void request(long n) { - s.request(n); + SubscriptionHelper.deferredRequest(s, requested, n); } @Override public void cancel() { - frc.dispose(); + SubscriptionHelper.cancel(s); + SubscriptionHelper.cancel(other); + } + + final class OtherSubscriber extends AtomicReference implements Subscriber { + + private static final long serialVersionUID = -3592821756711087922L; + + @Override + public void onSubscribe(Subscription s) { + if (SubscriptionHelper.setOnce(this, s)) { + s.request(Long.MAX_VALUE); + } + } + + @Override + public void onNext(Object t) { + SubscriptionHelper.cancel(this); + onComplete(); + } + + @Override + public void onError(Throwable t) { + SubscriptionHelper.cancel(s); + HalfSerializer.onError(actual, t, TakeUntilMainSubscriber.this, error); + } + + @Override + public void onComplete() { + SubscriptionHelper.cancel(s); + HalfSerializer.onComplete(actual, TakeUntilMainSubscriber.this, error); + } + } } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableJoin.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableJoin.java index 3173249bf2..f80ce709ec 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableJoin.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableJoin.java @@ -56,8 +56,8 @@ public ObservableJoin( @Override protected void subscribeActual(Observer s) { - GroupJoinDisposable parent = - new GroupJoinDisposable( + JoinDisposable parent = + new JoinDisposable( s, leftEnd, rightEnd, resultSelector); s.onSubscribe(parent); @@ -71,7 +71,7 @@ protected void subscribeActual(Observer s) { other.subscribe(right); } - static final class GroupJoinDisposable + static final class JoinDisposable extends AtomicInteger implements Disposable, JoinSupport { @@ -111,7 +111,7 @@ static final class GroupJoinDisposable static final Integer RIGHT_CLOSE = 4; - GroupJoinDisposable(Observer actual, + JoinDisposable(Observer actual, Function> leftEnd, Function> rightEnd, BiFunction resultSelector) { diff --git a/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java b/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java index ca9618e334..2cac1c47b9 100644 --- a/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/ComputationScheduler.java @@ -47,15 +47,7 @@ public final class ComputationScheduler extends Scheduler { private static final String KEY_COMPUTATION_PRIORITY = "rx2.computation-priority"; static { - int maxThreads = Integer.getInteger(KEY_MAX_THREADS, 0); - int cpuCount = Runtime.getRuntime().availableProcessors(); - int max; - if (maxThreads <= 0 || maxThreads > cpuCount) { - max = cpuCount; - } else { - max = maxThreads; - } - MAX_THREADS = max; + MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0)); SHUTDOWN_WORKER = new PoolWorker(new RxThreadFactory("RxComputationShutdown")); SHUTDOWN_WORKER.dispose(); @@ -66,6 +58,10 @@ public final class ComputationScheduler extends Scheduler { THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority); } + static int cap(int cpuCount, int paramThreads) { + return paramThreads <= 0 || paramThreads > cpuCount ? cpuCount : paramThreads; + } + static final class FixedSchedulerPool { final int cores; diff --git a/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java b/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java index af938f52b9..913a2643a6 100644 --- a/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/IoScheduler.java @@ -22,7 +22,6 @@ import io.reactivex.Scheduler; import io.reactivex.disposables.*; import io.reactivex.internal.disposables.EmptyDisposable; -import io.reactivex.plugins.RxJavaPlugins; /** * Scheduler that creates and caches a set of thread pools and reuses them if possible. @@ -59,7 +58,7 @@ public final class IoScheduler extends Scheduler { EVICTOR_THREAD_FACTORY = new RxThreadFactory(EVICTOR_THREAD_NAME_PREFIX, priority); } - static final class CachedWorkerPool { + static final class CachedWorkerPool implements Runnable { private final long keepAliveTime; private final ConcurrentLinkedQueue expiringWorkerQueue; final CompositeDisposable allWorkers; @@ -75,23 +74,18 @@ static final class CachedWorkerPool { Future task = null; if (unit != null) { evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY); - try { - task = evictor.scheduleWithFixedDelay( - new Runnable() { - @Override - public void run() { - evictExpiredWorkers(); - } - }, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS - ); - } catch (RejectedExecutionException ex) { - RxJavaPlugins.onError(ex); - } + task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS + ); } evictorService = evictor; evictorTask = task; } + @Override + public void run() { + evictExpiredWorkers(); + } + ThreadWorker get() { if (allWorkers.isDisposed()) { return SHUTDOWN_THREAD_WORKER; @@ -139,15 +133,12 @@ long now() { } void shutdown() { - try { - if (evictorTask != null) { - evictorTask.cancel(true); - } - if (evictorService != null) { - evictorService.shutdownNow(); - } - } finally { - allWorkers.dispose(); + allWorkers.dispose(); + if (evictorTask != null) { + evictorTask.cancel(true); + } + if (evictorService != null) { + evictorService.shutdownNow(); } } } diff --git a/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java b/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java index fff0b65353..9144b49773 100644 --- a/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java +++ b/src/main/java/io/reactivex/internal/schedulers/NewThreadWorker.java @@ -123,6 +123,7 @@ public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, Time } sr.setFuture(f); } catch (RejectedExecutionException ex) { + parent.remove(sr); RxJavaPlugins.onError(ex); } diff --git a/src/main/java/io/reactivex/internal/schedulers/ScheduledRunnable.java b/src/main/java/io/reactivex/internal/schedulers/ScheduledRunnable.java index 9c8e2de64b..2bccfd77a6 100644 --- a/src/main/java/io/reactivex/internal/schedulers/ScheduledRunnable.java +++ b/src/main/java/io/reactivex/internal/schedulers/ScheduledRunnable.java @@ -46,7 +46,7 @@ public ScheduledRunnable(Runnable actual, DisposableContainer parent) { } @Override - public Object call() throws Exception { + public Object call() { // Being Callable saves an allocation in ThreadPoolExecutor run(); return null; @@ -55,27 +55,21 @@ public Object call() throws Exception { @Override public void run() { try { - actual.run(); - } catch (Throwable e) { - // Exceptions.throwIfFatal(e); nowhere to go - RxJavaPlugins.onError(e); + try { + actual.run(); + } catch (Throwable e) { + // Exceptions.throwIfFatal(e); nowhere to go + RxJavaPlugins.onError(e); + } } finally { Object o = get(PARENT_INDEX); - if (o != DISPOSED && o != null) { - // done races with dispose here - if (compareAndSet(PARENT_INDEX, o, DONE)) { - ((DisposableContainer)o).delete(this); - } + if (o != DISPOSED && o != null && compareAndSet(PARENT_INDEX, o, DONE)) { + ((DisposableContainer)o).delete(this); } for (;;) { o = get(FUTURE_INDEX); - if (o != DISPOSED) { - // o is either null or a future - if (compareAndSet(FUTURE_INDEX, o, DONE)) { - break; - } - } else { + if (o == DISPOSED || compareAndSet(FUTURE_INDEX, o, DONE)) { break; } } @@ -116,7 +110,7 @@ public void dispose() { for (;;) { Object o = get(PARENT_INDEX); if (o == DONE || o == DISPOSED || o == null) { - break; + return; } if (compareAndSet(PARENT_INDEX, o, DISPOSED)) { ((DisposableContainer)o).delete(this); @@ -127,6 +121,7 @@ public void dispose() { @Override public boolean isDisposed() { - return get(FUTURE_INDEX) == DISPOSED; + Object o = get(FUTURE_INDEX); + return o == DISPOSED || o == DONE; } } diff --git a/src/main/java/io/reactivex/internal/schedulers/SchedulerPoolFactory.java b/src/main/java/io/reactivex/internal/schedulers/SchedulerPoolFactory.java index 5124b34a6f..04ca52fd47 100644 --- a/src/main/java/io/reactivex/internal/schedulers/SchedulerPoolFactory.java +++ b/src/main/java/io/reactivex/internal/schedulers/SchedulerPoolFactory.java @@ -25,8 +25,11 @@ /** * Manages the creating of ScheduledExecutorServices and sets up purging. */ -public enum SchedulerPoolFactory { - ; +public final class SchedulerPoolFactory { + /** Utility class. */ + private SchedulerPoolFactory() { + throw new IllegalStateException("No instances!"); + } static final String PURGE_ENABLED_KEY = "rx2.purge-enabled"; diff --git a/src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java b/src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java index e0bbae9722..5a90b1378f 100644 --- a/src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java +++ b/src/main/java/io/reactivex/internal/schedulers/TrampolineScheduler.java @@ -168,27 +168,25 @@ static final class SleepingRunnable implements Runnable { @Override public void run() { - if (worker.disposed) { - return; - } - long t = worker.now(TimeUnit.MILLISECONDS); - if (execTime > t) { - long delay = execTime - t; - if (delay > 0) { - try { - Thread.sleep(delay); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - RxJavaPlugins.onError(e); - return; + if (!worker.disposed) { + long t = worker.now(TimeUnit.MILLISECONDS); + if (execTime > t) { + long delay = execTime - t; + if (delay > 0) { + try { + Thread.sleep(delay); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + RxJavaPlugins.onError(e); + return; + } } } - } - if (worker.disposed) { - return; + if (!worker.disposed) { + run.run(); + } } - run.run(); } } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableJoinTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableJoinTest.java index 5b602f0a5c..06e6b6eeb4 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableJoinTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableJoinTest.java @@ -25,7 +25,7 @@ import org.reactivestreams.Subscriber; import io.reactivex.*; -import io.reactivex.exceptions.TestException; +import io.reactivex.exceptions.*; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; import io.reactivex.internal.subscriptions.BooleanSubscription; @@ -446,4 +446,44 @@ public Integer apply(Integer a, Integer b) throws Exception { RxJavaPlugins.reset(); } } + + @Test + public void backpressureOverflowRight() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber ts = pp1.join(pp2, Functions.justFunction(Flowable.never()), Functions.justFunction(Flowable.never()), + new BiFunction() { + @Override + public Object apply(Integer a, Integer b) throws Exception { + return a + b; + } + }) + .test(0L); + + pp1.onNext(1); + pp2.onNext(2); + + ts.assertFailure(MissingBackpressureException.class); + } + + @Test + public void backpressureOverflowLeft() { + PublishProcessor pp1 = PublishProcessor.create(); + PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber ts = pp1.join(pp2, Functions.justFunction(Flowable.never()), Functions.justFunction(Flowable.never()), + new BiFunction() { + @Override + public Object apply(Integer a, Integer b) throws Exception { + return a + b; + } + }) + .test(0L); + + pp2.onNext(2); + pp1.onNext(1); + + ts.assertFailure(MissingBackpressureException.class); + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeTest.java index b27d73d54d..59bbcce304 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeTest.java @@ -15,6 +15,7 @@ import static java.util.Arrays.asList; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; import java.lang.reflect.Method; @@ -26,12 +27,11 @@ import org.reactivestreams.*; import io.reactivex.*; -import io.reactivex.Flowable; import io.reactivex.Scheduler.Worker; import io.reactivex.functions.*; import io.reactivex.internal.functions.Functions; import io.reactivex.internal.subscriptions.*; -import io.reactivex.internal.util.BackpressureHelper; +import io.reactivex.internal.util.*; import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.*; import io.reactivex.subscribers.*; @@ -229,6 +229,8 @@ public void testSynchronizationOfMultipleSequences() throws Throwable { final AtomicInteger concurrentCounter = new AtomicInteger(); final AtomicInteger totalCounter = new AtomicInteger(); + final AtomicReference error = new AtomicReference(); + Flowable m = Flowable.merge(Flowable.unsafeCreate(o1), Flowable.unsafeCreate(o2)); m.subscribe(new DefaultSubscriber() { @@ -239,7 +241,7 @@ public void onComplete() { @Override public void onError(Throwable e) { - throw new RuntimeException("failed", e); + error.set(e); } @Override @@ -280,6 +282,10 @@ public void onNext(String v) { } try { // in try/finally so threads are released via latch countDown even if assertion fails + if (error.get() != null) { + throw ExceptionHelper.wrapOrThrow(error.get()); + } + assertEquals(1, concurrentCounter.get()); } finally { // release so it can finish diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureErrorTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureErrorTest.java new file mode 100644 index 0000000000..fb0290b490 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureErrorTest.java @@ -0,0 +1,53 @@ +/** + * Copyright 2016 Netflix, Inc. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.flowable; + +import org.junit.Test; +import org.reactivestreams.Publisher; + +import io.reactivex.*; +import io.reactivex.functions.Function; + +public class FlowableOnBackpressureErrorTest { + + @Test + public void dispose() { + TestHelper.checkDisposed(Observable.just(1).toFlowable(BackpressureStrategy.ERROR)); + } + + @Test + public void badRequest() { + TestHelper.assertBadRequestReported(Observable.just(1).toFlowable(BackpressureStrategy.ERROR)); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeFlowable(new Function, Publisher>() { + @Override + public Publisher apply(Flowable f) throws Exception { + return new FlowableOnBackpressureError(f); + } + }); + } + + @Test + public void badSource() { + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable f) throws Exception { + return new FlowableOnBackpressureError(f); + } + }, false, 1, 1, 1); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishFunctionTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishFunctionTest.java index aadcba0ef1..29ad8ca192 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishFunctionTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowablePublishFunctionTest.java @@ -18,16 +18,19 @@ import static org.junit.Assert.*; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.junit.*; -import org.reactivestreams.Publisher; +import org.reactivestreams.*; -import io.reactivex.Flowable; -import io.reactivex.exceptions.MissingBackpressureException; +import io.reactivex.*; +import io.reactivex.exceptions.*; import io.reactivex.functions.Function; import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.processors.PublishProcessor; +import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.TestSubscriber; @@ -290,4 +293,170 @@ public Publisher apply(Flowable v) throws Exception { assertFalse("pp has Subscribers?!", pp.hasSubscribers()); } + + @Test + public void badSource() { + TestHelper.checkBadSourceFlowable(new Function, Object>() { + @Override + public Object apply(Flowable f) throws Exception { + return f.publish(Functions.>identity()); + } + }, false, 1, 1, 1); + } + + @Test + public void frontOverflow() { + new Flowable() { + @Override + protected void subscribeActual(Subscriber s) { + s.onSubscribe(new BooleanSubscription()); + for (int i = 0; i < 9; i++) { + s.onNext(i); + } + } + } + .publish(Functions.>identity(), 8) + .test(0) + .assertFailure(MissingBackpressureException.class); + } + + @Test + public void errorResubscribe() { + Flowable.error(new TestException()) + .publish(new Function, Publisher>() { + @Override + public Publisher apply(Flowable f) throws Exception { + return f.onErrorResumeNext(f); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void fusedInputCrash() { + Flowable.just(1) + .map(new Function() { + @Override + public Integer apply(Integer v) throws Exception { + throw new TestException(); + } + }) + .publish(Functions.>identity()) + .test() + .assertFailure(TestException.class); + } + + @Test + public void error() { + new FlowablePublishMulticast(Flowable.just(1).concatWith(Flowable.error(new TestException())), + Functions.>identity(), 16, true) + .test() + .assertFailure(TestException.class, 1); + } + + @Test + public void backpressuredEmpty() { + Flowable.empty() + .publish(Functions.>identity()) + .test(0L) + .assertResult(); + } + + @Test + public void oneByOne() { + Flowable.range(1, 10) + .publish(Functions.>identity()) + .rebatchRequests(1) + .test() + .assertResult(1, 2, 3, 4, 5, 6, 7, 8, 9, 10); + } + + @Test + public void completeCancelRaceNoRequest() { + final PublishProcessor pp = PublishProcessor.create(); + + final TestSubscriber ts = new TestSubscriber(1L) { + @Override + public void onNext(Integer t) { + super.onNext(t); + if (t == 1) { + cancel(); + onComplete(); + } + } + }; + + pp.publish(Functions.>identity()).subscribe(ts); + + pp.onNext(1); + + assertFalse(pp.hasSubscribers()); + + ts.assertResult(1); + } + + @Test + public void inputOutputSubscribeRace() { + Flowable source = Flowable.just(1) + .publish(new Function, Publisher>() { + @Override + public Publisher apply(Flowable f) throws Exception { + return f.subscribeOn(Schedulers.single()); + } + }); + + for (int i = 0; i < 500; i++) { + source.test() + .awaitDone(5, TimeUnit.MILLISECONDS) + .assertResult(1); + } + } + + @Test + public void inputOutputSubscribeRace2() { + Flowable source = Flowable.just(1).subscribeOn(Schedulers.single()) + .publish(Functions.>identity()); + + for (int i = 0; i < 500; i++) { + source.test() + .awaitDone(5, TimeUnit.MILLISECONDS) + .assertResult(1); + } + } + + @Test + public void sourceSubscriptionDelayed() { + for (int i = 0; i < 500; i++) { + final TestSubscriber ts1 = new TestSubscriber(0L); + + Flowable.just(1) + .publish(new Function, Publisher>() { + @Override + public Publisher apply(final Flowable f) throws Exception { + Runnable r1 = new Runnable() { + @Override + public void run() { + f.subscribe(ts1); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + for (int j = 0; j < 100; j++) { + ts1.request(1); + } + } + }; + + TestHelper.race(r1, r2); + return f; + } + }).test() + .assertResult(1); + + ts1.assertResult(1); + } + } } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java index 0cc237776c..ed37e6e54a 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableReplayTest.java @@ -544,7 +544,7 @@ public void testIssue2191_UnsubscribeSource() throws Exception { verifyObserverMock(spiedSubscriberBeforeConnect, 2, 4); verifyObserverMock(spiedSubscriberAfterConnect, 2, 4); - verify(sourceUnsubscribed, times(1)).run(); + verify(sourceUnsubscribed, never()).run(); verifyNoMoreInteractions(sourceNext); verifyNoMoreInteractions(sourceCompleted); @@ -603,8 +603,8 @@ public void testIssue2191_SchedulerUnsubscribe() throws Exception { // FIXME not supported // verify(spiedWorker, times(1)).isUnsubscribed(); // FIXME publish calls cancel too - verify(spiedWorker, times(2)).dispose(); - verify(sourceUnsubscribed, times(1)).run(); + verify(spiedWorker, times(1)).dispose(); + verify(sourceUnsubscribed, never()).run(); verifyNoMoreInteractions(sourceNext); verifyNoMoreInteractions(sourceCompleted); @@ -669,8 +669,8 @@ public void testIssue2191_SchedulerUnsubscribeOnError() throws Exception { // FIXME no longer supported // verify(spiedWorker, times(1)).isUnsubscribed(); // FIXME publish also calls cancel - verify(spiedWorker, times(2)).dispose(); - verify(sourceUnsubscribed, times(1)).run(); + verify(spiedWorker, times(1)).dispose(); + verify(sourceUnsubscribed, never()).run(); verifyNoMoreInteractions(sourceNext); verifyNoMoreInteractions(sourceCompleted); diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableTakeUntilTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableTakeUntilTest.java index 12a168ed1f..69b8d3c706 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableTakeUntilTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableTakeUntilTest.java @@ -68,7 +68,7 @@ public void testTakeUntilSourceCompleted() { verify(result, times(1)).onNext("one"); verify(result, times(1)).onNext("two"); - verify(sSource, times(1)).cancel(); + verify(sSource, never()).cancel(); verify(sOther, times(1)).cancel(); } @@ -93,7 +93,7 @@ public void testTakeUntilSourceError() { verify(result, times(1)).onNext("two"); verify(result, times(0)).onNext("three"); verify(result, times(1)).onError(error); - verify(sSource, times(1)).cancel(); + verify(sSource, never()).cancel(); verify(sOther, times(1)).cancel(); } @@ -120,7 +120,7 @@ public void testTakeUntilOtherError() { verify(result, times(1)).onError(error); verify(result, times(0)).onComplete(); verify(sSource, times(1)).cancel(); - verify(sOther, times(1)).cancel(); + verify(sOther, never()).cancel(); } @@ -147,7 +147,7 @@ public void testTakeUntilOtherCompleted() { verify(result, times(0)).onNext("three"); verify(result, times(1)).onComplete(); verify(sSource, times(1)).cancel(); - verify(sOther, times(1)).cancel(); // unsubscribed since SafeSubscriber unsubscribes after onComplete + verify(sOther, never()).cancel(); // unsubscribed since SafeSubscriber unsubscribes after onComplete } diff --git a/src/test/java/io/reactivex/internal/schedulers/ComputationSchedulerInternalTest.java b/src/test/java/io/reactivex/internal/schedulers/ComputationSchedulerInternalTest.java new file mode 100644 index 0000000000..16856c8ed6 --- /dev/null +++ b/src/test/java/io/reactivex/internal/schedulers/ComputationSchedulerInternalTest.java @@ -0,0 +1,32 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.reactivex.internal.schedulers; + +import static org.junit.Assert.*; +import org.junit.Test; + +public class ComputationSchedulerInternalTest { + + @Test + public void capPoolSize() { + assertEquals(8, ComputationScheduler.cap(8, -1)); + assertEquals(8, ComputationScheduler.cap(8, 0)); + assertEquals(4, ComputationScheduler.cap(8, 4)); + assertEquals(8, ComputationScheduler.cap(8, 8)); + assertEquals(8, ComputationScheduler.cap(8, 9)); + assertEquals(8, ComputationScheduler.cap(8, 16)); + } +} diff --git a/src/test/java/io/reactivex/internal/schedulers/DisposeOnCancelTest.java b/src/test/java/io/reactivex/internal/schedulers/DisposeOnCancelTest.java new file mode 100644 index 0000000000..869061b2e7 --- /dev/null +++ b/src/test/java/io/reactivex/internal/schedulers/DisposeOnCancelTest.java @@ -0,0 +1,42 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.schedulers; + +import static org.junit.Assert.*; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import io.reactivex.disposables.*; + +public class DisposeOnCancelTest { + + @Test + public void basicCoverage() throws Exception { + Disposable d = Disposables.empty(); + + DisposeOnCancel doc = new DisposeOnCancel(d); + + assertFalse(doc.cancel(true)); + + assertFalse(doc.isCancelled()); + + assertFalse(doc.isDone()); + + assertNull(doc.get()); + + assertNull(doc.get(1, TimeUnit.SECONDS)); + } +} diff --git a/src/test/java/io/reactivex/internal/schedulers/ScheduledRunnableTest.java b/src/test/java/io/reactivex/internal/schedulers/ScheduledRunnableTest.java new file mode 100644 index 0000000000..f19caa0a97 --- /dev/null +++ b/src/test/java/io/reactivex/internal/schedulers/ScheduledRunnableTest.java @@ -0,0 +1,220 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.schedulers; + +import static org.junit.Assert.*; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.List; +import java.util.concurrent.FutureTask; + +import org.junit.Test; + +import io.reactivex.TestHelper; +import io.reactivex.disposables.CompositeDisposable; +import io.reactivex.exceptions.TestException; +import io.reactivex.internal.functions.Functions; +import io.reactivex.plugins.RxJavaPlugins; + +public class ScheduledRunnableTest { + + @Test + public void dispose() { + CompositeDisposable set = new CompositeDisposable(); + ScheduledRunnable run = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, set); + set.add(run); + + assertFalse(run.isDisposed()); + + set.dispose(); + + assertTrue(run.isDisposed()); + } + + @Test + public void disposeRun() { + CompositeDisposable set = new CompositeDisposable(); + ScheduledRunnable run = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, set); + set.add(run); + + assertFalse(run.isDisposed()); + + run.dispose(); + run.dispose(); + + assertTrue(run.isDisposed()); + } + + @Test + public void setFutureCancelRace() { + for (int i = 0; i < 500; i++) { + CompositeDisposable set = new CompositeDisposable(); + final ScheduledRunnable run = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, set); + set.add(run); + + final FutureTask ft = new FutureTask(Functions.EMPTY_RUNNABLE, 0); + + Runnable r1 = new Runnable() { + @Override + public void run() { + run.setFuture(ft); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + run.dispose(); + } + }; + + TestHelper.race(r1, r2); + + assertEquals(0, set.size()); + } + } + + @Test + public void setFutureRunRace() { + for (int i = 0; i < 500; i++) { + CompositeDisposable set = new CompositeDisposable(); + final ScheduledRunnable run = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, set); + set.add(run); + + final FutureTask ft = new FutureTask(Functions.EMPTY_RUNNABLE, 0); + + Runnable r1 = new Runnable() { + @Override + public void run() { + run.setFuture(ft); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + run.run(); + } + }; + + TestHelper.race(r1, r2); + + assertEquals(0, set.size()); + } + } + + @Test + public void disposeRace() { + for (int i = 0; i < 500; i++) { + CompositeDisposable set = new CompositeDisposable(); + final ScheduledRunnable run = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, set); + set.add(run); + + Runnable r1 = new Runnable() { + @Override + public void run() { + run.dispose(); + } + }; + + TestHelper.race(r1, r1); + + assertEquals(0, set.size()); + } + } + + @Test + public void runDispose() { + for (int i = 0; i < 500; i++) { + CompositeDisposable set = new CompositeDisposable(); + final ScheduledRunnable run = new ScheduledRunnable(Functions.EMPTY_RUNNABLE, set); + set.add(run); + + Runnable r1 = new Runnable() { + @Override + public void run() { + run.call(); + } + }; + + Runnable r2 = new Runnable() { + @Override + public void run() { + run.dispose(); + } + }; + + TestHelper.race(r1, r2); + + assertEquals(0, set.size()); + } + } + + @Test + public void pluginCrash() { + Thread.currentThread().setUncaughtExceptionHandler(new UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + throw new TestException("Second"); + } + }); + + CompositeDisposable set = new CompositeDisposable(); + final ScheduledRunnable run = new ScheduledRunnable(new Runnable() { + @Override + public void run() { + throw new TestException("First"); + } + }, set); + set.add(run); + + try { + run.run(); + + fail("Should have thrown!"); + } catch (TestException ex) { + assertEquals("Second", ex.getMessage()); + } finally { + Thread.currentThread().setUncaughtExceptionHandler(null); + } + assertTrue(run.isDisposed()); + + assertEquals(0, set.size()); + } + + @Test + public void crashReported() { + List errors = TestHelper.trackPluginErrors(); + try { + CompositeDisposable set = new CompositeDisposable(); + final ScheduledRunnable run = new ScheduledRunnable(new Runnable() { + @Override + public void run() { + throw new TestException("First"); + } + }, set); + set.add(run); + + run.run(); + + assertTrue(run.isDisposed()); + + assertEquals(0, set.size()); + + TestHelper.assertError(errors, 0, TestException.class, "First"); + } finally { + RxJavaPlugins.reset(); + } + } +} diff --git a/src/test/java/io/reactivex/internal/schedulers/SchedulerPoolFactoryTest.java b/src/test/java/io/reactivex/internal/schedulers/SchedulerPoolFactoryTest.java new file mode 100644 index 0000000000..7e799502c4 --- /dev/null +++ b/src/test/java/io/reactivex/internal/schedulers/SchedulerPoolFactoryTest.java @@ -0,0 +1,29 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.reactivex.internal.schedulers; + +import org.junit.Test; + +import io.reactivex.TestHelper; + +public class SchedulerPoolFactoryTest { + + @Test + public void utilityClass() { + TestHelper.checkUtilityClass(SchedulerPoolFactory.class); + } +} diff --git a/src/test/java/io/reactivex/internal/schedulers/SingleSchedulerTest.java b/src/test/java/io/reactivex/internal/schedulers/SingleSchedulerTest.java new file mode 100644 index 0000000000..50ce5a575c --- /dev/null +++ b/src/test/java/io/reactivex/internal/schedulers/SingleSchedulerTest.java @@ -0,0 +1,81 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.schedulers; + +import static org.junit.Assert.*; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.Scheduler.Worker; +import io.reactivex.disposables.Disposables; +import io.reactivex.internal.schedulers.SingleScheduler.ScheduledWorker; + +public class SingleSchedulerTest { + + @Test + public void shutdownRejects() { + final int[] calls = { 0 }; + + Runnable r = new Runnable() { + @Override + public void run() { + calls[0]++; + } + }; + + Scheduler s = new SingleScheduler(); + s.shutdown(); + + assertEquals(Disposables.disposed(), s.scheduleDirect(r)); + + assertEquals(Disposables.disposed(), s.scheduleDirect(r, 1, TimeUnit.SECONDS)); + + assertEquals(Disposables.disposed(), s.schedulePeriodicallyDirect(r, 1, 1, TimeUnit.SECONDS)); + + Worker w = s.createWorker(); + ((ScheduledWorker)w).executor.shutdownNow(); + + assertEquals(Disposables.disposed(), w.schedule(r)); + + assertEquals(Disposables.disposed(), w.schedule(r, 1, TimeUnit.SECONDS)); + + assertEquals(Disposables.disposed(), w.schedulePeriodically(r, 1, 1, TimeUnit.SECONDS)); + + assertEquals(0, calls[0]); + + w.dispose(); + + assertTrue(w.isDisposed()); + } + + @Test + public void startRace() { + final Scheduler s = new SingleScheduler(); + for (int i = 0; i < 500; i++) { + s.shutdown(); + + Runnable r1 = new Runnable() { + @Override + public void run() { + s.start(); + } + }; + + TestHelper.race(r1, r1); + } + } +} diff --git a/src/test/java/io/reactivex/internal/schedulers/TrampolineSchedulerInternalTest.java b/src/test/java/io/reactivex/internal/schedulers/TrampolineSchedulerInternalTest.java new file mode 100644 index 0000000000..cfb9c728ea --- /dev/null +++ b/src/test/java/io/reactivex/internal/schedulers/TrampolineSchedulerInternalTest.java @@ -0,0 +1,163 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.reactivex.internal.schedulers; + +import static org.junit.Assert.*; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; + +import io.reactivex.Scheduler.Worker; +import io.reactivex.internal.disposables.EmptyDisposable; +import io.reactivex.internal.functions.Functions; +import io.reactivex.schedulers.Schedulers; + +public class TrampolineSchedulerInternalTest { + + @Test + public void scheduleDirectInterrupt() { + Thread.currentThread().interrupt(); + + final int[] calls = { 0 }; + + assertSame(EmptyDisposable.INSTANCE, Schedulers.trampoline().scheduleDirect(new Runnable() { + @Override + public void run() { + calls[0]++; + } + }, 1, TimeUnit.SECONDS)); + + assertTrue(Thread.interrupted()); + + assertEquals(0, calls[0]); + } + + @Test + public void dispose() { + Worker w = Schedulers.trampoline().createWorker(); + + assertFalse(w.isDisposed()); + + w.dispose(); + + assertTrue(w.isDisposed()); + + assertEquals(EmptyDisposable.INSTANCE, w.schedule(Functions.EMPTY_RUNNABLE)); + } + + @Test + public void reentrantScheduleDispose() { + final Worker w = Schedulers.trampoline().createWorker(); + try { + final int[] calls = { 0, 0 }; + w.schedule(new Runnable() { + @Override + public void run() { + calls[0]++; + w.schedule(new Runnable() { + @Override + public void run() { + calls[1]++; + } + }) + .dispose(); + } + }); + + assertEquals(1, calls[0]); + assertEquals(0, calls[1]); + } finally { + w.dispose(); + } + } + + @Test + public void reentrantScheduleShutdown() { + final Worker w = Schedulers.trampoline().createWorker(); + try { + final int[] calls = { 0, 0 }; + w.schedule(new Runnable() { + @Override + public void run() { + calls[0]++; + w.schedule(new Runnable() { + @Override + public void run() { + calls[1]++; + } + }, 1, TimeUnit.MILLISECONDS); + + w.dispose(); + } + }); + + assertEquals(1, calls[0]); + assertEquals(0, calls[1]); + } finally { + w.dispose(); + } + } + + @Test + public void reentrantScheduleShutdown2() { + final Worker w = Schedulers.trampoline().createWorker(); + try { + final int[] calls = { 0, 0 }; + w.schedule(new Runnable() { + @Override + public void run() { + calls[0]++; + w.dispose(); + + assertSame(EmptyDisposable.INSTANCE, w.schedule(new Runnable() { + @Override + public void run() { + calls[1]++; + } + }, 1, TimeUnit.MILLISECONDS)); + } + }); + + assertEquals(1, calls[0]); + assertEquals(0, calls[1]); + } finally { + w.dispose(); + } + } + + @Test(timeout = 5000) + public void reentrantScheduleInterrupt() { + final Worker w = Schedulers.trampoline().createWorker(); + try { + final int[] calls = { 0 }; + Thread.currentThread().interrupt(); + w.schedule(new Runnable() { + @Override + public void run() { + calls[0]++; + } + }, 1, TimeUnit.DAYS); + + assertTrue(Thread.interrupted()); + + assertEquals(0, calls[0]); + } finally { + w.dispose(); + } + } +} diff --git a/src/test/java/io/reactivex/internal/subscribers/EmptyComponentTest.java b/src/test/java/io/reactivex/internal/subscribers/EmptyComponentTest.java index e08eddbf8e..9ebd1b0960 100644 --- a/src/test/java/io/reactivex/internal/subscribers/EmptyComponentTest.java +++ b/src/test/java/io/reactivex/internal/subscribers/EmptyComponentTest.java @@ -63,6 +63,8 @@ public void normal() { c.onError(new TestException()); + c.onSuccess(2); + c.cancel(); TestHelper.assertError(errors, 0, TestException.class); diff --git a/src/test/java/io/reactivex/internal/util/BackpressureHelperTest.java b/src/test/java/io/reactivex/internal/util/BackpressureHelperTest.java index d77929af69..adf1467b18 100644 --- a/src/test/java/io/reactivex/internal/util/BackpressureHelperTest.java +++ b/src/test/java/io/reactivex/internal/util/BackpressureHelperTest.java @@ -66,6 +66,21 @@ public void producedMore() { } } + @Test + public void producedMoreCancel() { + List list = TestHelper.trackPluginErrors(); + + try { + AtomicLong requested = new AtomicLong(1); + + assertEquals(0, BackpressureHelper.producedCancel(requested, 2)); + + TestHelper.assertError(list, 0, IllegalStateException.class, "More produced than requested: -1"); + } finally { + RxJavaPlugins.reset(); + } + } + @Test public void requestProduceRace() { final AtomicLong requested = new AtomicLong(1); diff --git a/src/test/java/io/reactivex/schedulers/CachedThreadSchedulerTest.java b/src/test/java/io/reactivex/schedulers/CachedThreadSchedulerTest.java index 33838e68d8..0ab787b1c8 100644 --- a/src/test/java/io/reactivex/schedulers/CachedThreadSchedulerTest.java +++ b/src/test/java/io/reactivex/schedulers/CachedThreadSchedulerTest.java @@ -13,13 +13,17 @@ package io.reactivex.schedulers; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; + +import java.util.concurrent.TimeUnit; import org.junit.*; import io.reactivex.*; import io.reactivex.Scheduler.Worker; +import io.reactivex.disposables.*; import io.reactivex.functions.*; +import io.reactivex.internal.schedulers.IoScheduler; public class CachedThreadSchedulerTest extends AbstractSchedulerConcurrencyTests { @@ -81,4 +85,47 @@ public void testCancelledTaskRetention() throws InterruptedException { } } + @Test + public void workerDisposed() { + Worker w = Schedulers.io().createWorker(); + + assertFalse(((Disposable)w).isDisposed()); + + w.dispose(); + + assertTrue(((Disposable)w).isDisposed()); + } + + @Test + public void shutdownRejects() { + final int[] calls = { 0 }; + + Runnable r = new Runnable() { + @Override + public void run() { + calls[0]++; + } + }; + + IoScheduler s = new IoScheduler(); + s.shutdown(); + s.shutdown(); + + s.scheduleDirect(r); + + s.scheduleDirect(r, 1, TimeUnit.SECONDS); + + s.schedulePeriodicallyDirect(r, 1, 1, TimeUnit.SECONDS); + + Worker w = s.createWorker(); + w.dispose(); + + assertEquals(Disposables.disposed(), w.schedule(r)); + + assertEquals(Disposables.disposed(), w.schedule(r, 1, TimeUnit.SECONDS)); + + assertEquals(Disposables.disposed(), w.schedulePeriodically(r, 1, 1, TimeUnit.SECONDS)); + + assertEquals(0, calls[0]); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/schedulers/ComputationSchedulerTests.java b/src/test/java/io/reactivex/schedulers/ComputationSchedulerTests.java index 7d4fcbf9e6..1076161aea 100644 --- a/src/test/java/io/reactivex/schedulers/ComputationSchedulerTests.java +++ b/src/test/java/io/reactivex/schedulers/ComputationSchedulerTests.java @@ -16,13 +16,15 @@ import static org.junit.Assert.*; import java.util.HashMap; -import java.util.concurrent.CountDownLatch; +import java.util.concurrent.*; import org.junit.*; import io.reactivex.*; import io.reactivex.Scheduler.Worker; +import io.reactivex.disposables.Disposables; import io.reactivex.functions.*; +import io.reactivex.internal.schedulers.ComputationScheduler; public class ComputationSchedulerTests extends AbstractSchedulerConcurrencyTests { @@ -162,4 +164,39 @@ public void testCancelledTaskRetention() throws InterruptedException { w.dispose(); } } + + @Test + public void shutdownRejects() { + final int[] calls = { 0 }; + + Runnable r = new Runnable() { + @Override + public void run() { + calls[0]++; + } + }; + + Scheduler s = new ComputationScheduler(); + s.shutdown(); + s.shutdown(); + + assertEquals(Disposables.disposed(), s.scheduleDirect(r)); + + assertEquals(Disposables.disposed(), s.scheduleDirect(r, 1, TimeUnit.SECONDS)); + + assertEquals(Disposables.disposed(), s.schedulePeriodicallyDirect(r, 1, 1, TimeUnit.SECONDS)); + + Worker w = s.createWorker(); + w.dispose(); + + assertTrue(w.isDisposed()); + + assertEquals(Disposables.disposed(), w.schedule(r)); + + assertEquals(Disposables.disposed(), w.schedule(r, 1, TimeUnit.SECONDS)); + + assertEquals(Disposables.disposed(), w.schedulePeriodically(r, 1, 1, TimeUnit.SECONDS)); + + assertEquals(0, calls[0]); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/schedulers/ExecutorSchedulerTest.java b/src/test/java/io/reactivex/schedulers/ExecutorSchedulerTest.java index 2b9e532ce8..1ec25aa265 100644 --- a/src/test/java/io/reactivex/schedulers/ExecutorSchedulerTest.java +++ b/src/test/java/io/reactivex/schedulers/ExecutorSchedulerTest.java @@ -446,4 +446,48 @@ public void run() { assertTrue(s.isDisposed()); } + + @Test + public void disposeRace() { + ExecutorService exec = Executors.newSingleThreadExecutor(); + final Scheduler s = Schedulers.from(exec); + try { + for (int i = 0; i < 500; i++) { + final Worker w = s.createWorker(); + + final AtomicInteger c = new AtomicInteger(2); + + w.schedule(new Runnable() { + @Override + public void run() { + c.decrementAndGet(); + while (c.get() != 0) { } + } + }); + + c.decrementAndGet(); + while (c.get() != 0) { } + w.dispose(); + } + } finally { + exec.shutdownNow(); + } + } + + @Test + public void runnableDisposed() { + final Scheduler s = Schedulers.from(new Executor() { + @Override + public void execute(Runnable r) { + r.run(); + } + }); + Disposable d = s.scheduleDirect(Functions.EMPTY_RUNNABLE); + + assertFalse(d.isDisposed()); + + d.dispose(); + + assertTrue(d.isDisposed()); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/schedulers/NewThreadSchedulerTest.java b/src/test/java/io/reactivex/schedulers/NewThreadSchedulerTest.java index 2d72e77e36..aa20f73987 100644 --- a/src/test/java/io/reactivex/schedulers/NewThreadSchedulerTest.java +++ b/src/test/java/io/reactivex/schedulers/NewThreadSchedulerTest.java @@ -13,9 +13,16 @@ package io.reactivex.schedulers; +import static org.junit.Assert.*; + +import java.util.concurrent.TimeUnit; + import org.junit.*; import io.reactivex.Scheduler; +import io.reactivex.Scheduler.Worker; +import io.reactivex.disposables.*; +import io.reactivex.internal.schedulers.NewThreadWorker; public class NewThreadSchedulerTest extends AbstractSchedulerConcurrencyTests { @@ -74,4 +81,38 @@ public final void testHandledErrorIsNotDeliveredToThreadHandler() throws Interru // worker.dispose(); // } // } + + @Test + public void shutdownRejects() { + final int[] calls = { 0 }; + + Runnable r = new Runnable() { + @Override + public void run() { + calls[0]++; + } + }; + + Scheduler s = getScheduler(); + Worker w = s.createWorker(); + w.dispose(); + + assertTrue(w.isDisposed()); + + assertEquals(Disposables.disposed(), w.schedule(r)); + + assertEquals(Disposables.disposed(), w.schedule(r, 1, TimeUnit.SECONDS)); + + assertEquals(Disposables.disposed(), w.schedulePeriodically(r, 1, 1, TimeUnit.SECONDS)); + + NewThreadWorker actual = (NewThreadWorker)w; + + CompositeDisposable cd = new CompositeDisposable(); + + actual.scheduleActual(r, 1, TimeUnit.SECONDS, cd); + + assertEquals(0, cd.size()); + + assertEquals(0, calls[0]); + } } \ No newline at end of file