Skip to content

Commit 4f68c5a

Browse files
committed
Add unit test for sub/unsub/sub bug.
1 parent 2120103 commit 4f68c5a

File tree

1 file changed

+74
-0
lines changed

1 file changed

+74
-0
lines changed

rxjava-core/src/main/java/rx/operators/OperationGroupBy.java

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package rx.operators;
1717

1818
import static org.junit.Assert.*;
19+
import static org.mockito.Mockito.*;
1920

2021
import java.util.Arrays;
2122
import java.util.Collection;
@@ -29,6 +30,7 @@
2930
import java.util.concurrent.atomic.AtomicReference;
3031

3132
import org.junit.Test;
33+
import org.mockito.InOrder;
3234

3335
import rx.Observable;
3436
import rx.Observer;
@@ -556,6 +558,78 @@ public String toString() {
556558
}
557559
}
558560

561+
/*
562+
* Test subscribing to a group, unsubscribing from it again, and subscribing to a next group
563+
*/
564+
@Test
565+
public void testSubscribeAndImmediatelyUnsubscribeFirstGroup() {
566+
CounterSource source = new CounterSource();
567+
@SuppressWarnings("unchecked")
568+
final Observer<Integer> observer = mock(Observer.class);
569+
570+
Func1<Integer, Integer> modulo2 = new Func1<Integer, Integer>() {
571+
@Override
572+
public Integer call(Integer x) {
573+
return x%2;
574+
}
575+
};
576+
577+
Subscription outerSubscription = source.groupBy(modulo2).subscribe(new Action1<GroupedObservable<Integer, Integer>>() {
578+
@Override
579+
public void call(GroupedObservable<Integer, Integer> group) {
580+
Subscription innerSubscription = group.subscribe(observer);
581+
if (group.getKey() == 0) {
582+
// We immediately unsubscribe again from the even numbers
583+
innerSubscription.unsubscribe();
584+
// We should still get the group of odd numbers
585+
}
586+
}
587+
});
588+
try {
589+
source.thread.join();
590+
} catch (InterruptedException ex) {
591+
}
592+
593+
InOrder o = inOrder(observer);
594+
// With a different implementation that subscribes to the group concurrently, we might actually receive 0.
595+
o.verify(observer, never()).onNext(0);
596+
o.verify(observer).onNext(1);
597+
o.verify(observer, never()).onNext(2);
598+
o.verify(observer).onNext(3);
599+
o.verify(observer, never()).onNext(4);
600+
o.verify(observer).onNext(5);
601+
o.verify(observer, never()).onNext(6);
602+
o.verify(observer).onNext(7);
603+
o.verify(observer, never()).onNext(8);
604+
o.verify(observer).onNext(9);
605+
}
606+
607+
private class CounterSource extends Observable<Integer> {
608+
public Thread thread = null;
609+
@Override
610+
public Subscription subscribe(final Observer<Integer> observer) {
611+
thread = new Thread(new Runnable() {
612+
@Override
613+
public void run() {
614+
int i = 0;
615+
while (i < 10) {
616+
observer.onNext(i++);
617+
if (Thread.interrupted()) {
618+
return;
619+
}
620+
}
621+
}
622+
});
623+
thread.start();
624+
return new Subscription() {
625+
@Override
626+
public void unsubscribe() {
627+
thread.interrupt();
628+
}
629+
};
630+
}
631+
}
632+
559633
}
560634

561635
}

0 commit comments

Comments
 (0)