Skip to content

Commit 3634c92

Browse files
authored
2.x: cleanup, fixes, coverage 10/24-1 (#4761)
* 2.x: cleanup, fixes, coverage 10/24-1 * Add missing default method.
1 parent 699e8e9 commit 3634c92

File tree

50 files changed

+2136
-1030
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+2136
-1030
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12019,7 +12019,7 @@ public final Flowable<T> startWithArray(T... items) {
1201912019
@SchedulerSupport(SchedulerSupport.NONE)
1202012020
public final Disposable subscribe() {
1202112021
return subscribe(Functions.emptyConsumer(), Functions.ERROR_CONSUMER,
12022-
Functions.EMPTY_ACTION, FlowableInternalHelper.requestMax());
12022+
Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
1202312023
}
1202412024

1202512025
/**
@@ -12046,7 +12046,7 @@ public final Disposable subscribe() {
1204612046
@SchedulerSupport(SchedulerSupport.NONE)
1204712047
public final Disposable subscribe(Consumer<? super T> onNext) {
1204812048
return subscribe(onNext, Functions.ERROR_CONSUMER,
12049-
Functions.EMPTY_ACTION, FlowableInternalHelper.requestMax());
12049+
Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
1205012050
}
1205112051

1205212052
/**
@@ -12075,7 +12075,7 @@ public final Disposable subscribe(Consumer<? super T> onNext) {
1207512075
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
1207612076
@SchedulerSupport(SchedulerSupport.NONE)
1207712077
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
12078-
return subscribe(onNext, onError, Functions.EMPTY_ACTION, FlowableInternalHelper.requestMax());
12078+
return subscribe(onNext, onError, Functions.EMPTY_ACTION, FlowableInternalHelper.RequestMax.INSTANCE);
1207912079
}
1208012080

1208112081
/**
@@ -12109,7 +12109,7 @@ public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super T
1210912109
@SchedulerSupport(SchedulerSupport.NONE)
1211012110
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
1211112111
Action onComplete) {
12112-
return subscribe(onNext, onError, onComplete, FlowableInternalHelper.requestMax());
12112+
return subscribe(onNext, onError, onComplete, FlowableInternalHelper.RequestMax.INSTANCE);
1211312113
}
1211412114

1211512115
/**

src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableIterable.java

Lines changed: 151 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,17 @@
1313

1414
package io.reactivex.internal.operators.flowable;
1515

16-
import java.util.Iterator;
16+
import java.util.*;
17+
import java.util.concurrent.atomic.AtomicReference;
18+
import java.util.concurrent.locks.*;
1719

18-
import org.reactivestreams.Publisher;
20+
import org.reactivestreams.*;
21+
22+
import io.reactivex.disposables.Disposable;
23+
import io.reactivex.exceptions.MissingBackpressureException;
24+
import io.reactivex.internal.queue.SpscArrayQueue;
25+
import io.reactivex.internal.subscriptions.SubscriptionHelper;
26+
import io.reactivex.internal.util.ExceptionHelper;
1927

2028
public final class BlockingFlowableIterable<T> implements Iterable<T> {
2129
final Publisher<? extends T> source;
@@ -33,4 +41,145 @@ public Iterator<T> iterator() {
3341
source.subscribe(it);
3442
return it;
3543
}
44+
45+
static final class BlockingFlowableIterator<T>
46+
extends AtomicReference<Subscription>
47+
implements Subscriber<T>, Iterator<T>, Runnable, Disposable {
48+
49+
private static final long serialVersionUID = 6695226475494099826L;
50+
51+
final SpscArrayQueue<T> queue;
52+
53+
final long batchSize;
54+
55+
final long limit;
56+
57+
final Lock lock;
58+
59+
final Condition condition;
60+
61+
long produced;
62+
63+
volatile boolean done;
64+
Throwable error;
65+
66+
BlockingFlowableIterator(int batchSize) {
67+
this.queue = new SpscArrayQueue<T>(batchSize);
68+
this.batchSize = batchSize;
69+
this.limit = batchSize - (batchSize >> 2);
70+
this.lock = new ReentrantLock();
71+
this.condition = lock.newCondition();
72+
}
73+
74+
@Override
75+
public boolean hasNext() {
76+
for (;;) {
77+
boolean d = done;
78+
boolean empty = queue.isEmpty();
79+
if (d) {
80+
Throwable e = error;
81+
if (e != null) {
82+
throw ExceptionHelper.wrapOrThrow(e);
83+
} else
84+
if (empty) {
85+
return false;
86+
}
87+
}
88+
if (empty) {
89+
lock.lock();
90+
try {
91+
while (!done && queue.isEmpty()) {
92+
condition.await();
93+
}
94+
} catch (InterruptedException ex) {
95+
run();
96+
throw ExceptionHelper.wrapOrThrow(ex);
97+
} finally {
98+
lock.unlock();
99+
}
100+
} else {
101+
return true;
102+
}
103+
}
104+
}
105+
106+
@Override
107+
public T next() {
108+
if (hasNext()) {
109+
T v = queue.poll();
110+
111+
long p = produced + 1;
112+
if (p == limit) {
113+
produced = 0;
114+
get().request(p);
115+
} else {
116+
produced = p;
117+
}
118+
119+
return v;
120+
}
121+
throw new NoSuchElementException();
122+
}
123+
124+
@Override
125+
public void onSubscribe(Subscription s) {
126+
if (SubscriptionHelper.setOnce(this, s)) {
127+
s.request(batchSize);
128+
}
129+
}
130+
131+
@Override
132+
public void onNext(T t) {
133+
if (!queue.offer(t)) {
134+
SubscriptionHelper.cancel(this);
135+
136+
onError(new MissingBackpressureException("Queue full?!"));
137+
} else {
138+
signalConsumer();
139+
}
140+
}
141+
142+
@Override
143+
public void onError(Throwable t) {
144+
error = t;
145+
done = true;
146+
signalConsumer();
147+
}
148+
149+
@Override
150+
public void onComplete() {
151+
done = true;
152+
signalConsumer();
153+
}
154+
155+
void signalConsumer() {
156+
lock.lock();
157+
try {
158+
condition.signalAll();
159+
} finally {
160+
lock.unlock();
161+
}
162+
}
163+
164+
@Override
165+
public void run() {
166+
SubscriptionHelper.cancel(this);
167+
signalConsumer();
168+
}
169+
170+
@Override // otherwise default method which isn't available in Java 7
171+
public void remove() {
172+
throw new UnsupportedOperationException("remove");
173+
}
174+
175+
@Override
176+
public void dispose() {
177+
SubscriptionHelper.cancel(this);
178+
}
179+
180+
@Override
181+
public boolean isDisposed() {
182+
return SubscriptionHelper.isCancelled(get());
183+
}
184+
}
36185
}

src/main/java/io/reactivex/internal/operators/flowable/BlockingFlowableIterator.java

Lines changed: 0 additions & 172 deletions
This file was deleted.

0 commit comments

Comments
 (0)