Skip to content

Commit b7fb332

Browse files
Merge pull request ReactiveX#272 from benjchristensen/BlockingObservable
BlockingObservable
2 parents f667a91 + a1eaf1e commit b7fb332

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+970
-866
lines changed

language-adaptors/rxjava-groovy/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ This adaptor allows 'groovy.lang.Closure' functions to be used and RxJava will k
66
This enables code such as:
77

88
```groovy
9-
Observable.toObservable("one", "two", "three")
9+
Observable.from("one", "two", "three")
1010
.take(2)
1111
.subscribe({arg -> println(arg)})
1212
```

language-adaptors/rxjava-groovy/src/examples/groovy/rx/lang/groovy/examples/RxExamples.groovy

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import rx.util.functions.Func1;
2626
// --------------------------------------------------
2727

2828
def hello(String[] names) {
29-
Observable.toObservable(names)
29+
Observable.from(names)
3030
.subscribe({ println "Hello " + it + "!"})
3131
}
3232

@@ -38,15 +38,15 @@ hello("Ben", "George")
3838
// --------------------------------------------------
3939

4040
def existingDataFromNumbers() {
41-
Observable<Integer> o = Observable.toObservable(1, 2, 3, 4, 5, 6);
41+
Observable<Integer> o = Observable.from(1, 2, 3, 4, 5, 6);
4242
}
4343

4444
def existingDataFromNumbersUsingFrom() {
4545
Observable<Integer> o2 = Observable.from(1, 2, 3, 4, 5, 6);
4646
}
4747

4848
def existingDataFromObjects() {
49-
Observable<String> o = Observable.toObservable("a", "b", "c");
49+
Observable<String> o = Observable.from("a", "b", "c");
5050
}
5151

5252
def existingDataFromObjectsUsingFrom() {
@@ -55,7 +55,7 @@ def existingDataFromObjectsUsingFrom() {
5555

5656
def existingDataFromList() {
5757
def list = [5, 6, 7, 8]
58-
Observable<Integer> o = Observable.toObservable(list);
58+
Observable<Integer> o = Observable.from(list);
5959
}
6060

6161
def existingDataFromListUsingFrom() {

language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -54,20 +54,20 @@ def class ObservableTests {
5454

5555
@Test
5656
public void testFilter() {
57-
Observable.filter(Observable.toObservable(1, 2, 3), {it >= 2}).subscribe({ result -> a.received(result)});
57+
Observable.filter(Observable.from(1, 2, 3), {it >= 2}).subscribe({ result -> a.received(result)});
5858
verify(a, times(0)).received(1);
5959
verify(a, times(1)).received(2);
6060
verify(a, times(1)).received(3);
6161
}
6262

6363
@Test
6464
public void testLast() {
65-
assertEquals("three", Observable.toObservable("one", "two", "three").last())
65+
assertEquals("three", Observable.from("one", "two", "three").toBlockingObservable().last())
6666
}
6767

6868
@Test
6969
public void testLastWithPredicate() {
70-
assertEquals("two", Observable.toObservable("one", "two", "three").last({ x -> x.length() == 3}))
70+
assertEquals("two", Observable.from("one", "two", "three").toBlockingObservable().last({ x -> x.length() == 3}))
7171
}
7272

7373
@Test
@@ -78,15 +78,15 @@ def class ObservableTests {
7878

7979
@Test
8080
public void testMap2() {
81-
Observable.map(Observable.toObservable(1, 2, 3), {'hello_' + it}).subscribe({ result -> a.received(result)});
81+
Observable.map(Observable.from(1, 2, 3), {'hello_' + it}).subscribe({ result -> a.received(result)});
8282
verify(a, times(1)).received("hello_" + 1);
8383
verify(a, times(1)).received("hello_" + 2);
8484
verify(a, times(1)).received("hello_" + 3);
8585
}
8686

8787
@Test
8888
public void testMaterialize() {
89-
Observable.materialize(Observable.toObservable(1, 2, 3)).subscribe({ result -> a.received(result)});
89+
Observable.materialize(Observable.from(1, 2, 3)).subscribe({ result -> a.received(result)});
9090
// we expect 4 onNext calls: 3 for 1, 2, 3 ObservableNotification.OnNext and 1 for ObservableNotification.OnCompleted
9191
verify(a, times(4)).received(any(Notification.class));
9292
verify(a, times(0)).error(any(Exception.class));
@@ -95,12 +95,12 @@ def class ObservableTests {
9595
@Test
9696
public void testMergeDelayError() {
9797
Observable.mergeDelayError(
98-
Observable.toObservable(1, 2, 3),
98+
Observable.from(1, 2, 3),
9999
Observable.merge(
100-
Observable.toObservable(6),
100+
Observable.from(6),
101101
Observable.error(new NullPointerException()),
102-
Observable.toObservable(7)),
103-
Observable.toObservable(4, 5))
102+
Observable.from(7)),
103+
Observable.from(4, 5))
104104
.subscribe( { result -> a.received(result)}, { exception -> a.error(exception)});
105105

106106
verify(a, times(1)).received(1);
@@ -116,12 +116,12 @@ def class ObservableTests {
116116
@Test
117117
public void testMerge() {
118118
Observable.merge(
119-
Observable.toObservable(1, 2, 3),
119+
Observable.from(1, 2, 3),
120120
Observable.merge(
121-
Observable.toObservable(6),
121+
Observable.from(6),
122122
Observable.error(new NullPointerException()),
123-
Observable.toObservable(7)),
124-
Observable.toObservable(4, 5))
123+
Observable.from(7)),
124+
Observable.from(4, 5))
125125
.subscribe({ result -> a.received(result)}, { exception -> a.error(exception)});
126126

127127
// executing synchronously so we can deterministically know what order things will come
@@ -158,23 +158,23 @@ def class ObservableTests {
158158

159159
@Test
160160
public void testSkipTake() {
161-
Observable.skip(Observable.toObservable(1, 2, 3), 1).take(1).subscribe({ result -> a.received(result)});
161+
Observable.skip(Observable.from(1, 2, 3), 1).take(1).subscribe({ result -> a.received(result)});
162162
verify(a, times(0)).received(1);
163163
verify(a, times(1)).received(2);
164164
verify(a, times(0)).received(3);
165165
}
166166

167167
@Test
168168
public void testSkip() {
169-
Observable.skip(Observable.toObservable(1, 2, 3), 2).subscribe({ result -> a.received(result)});
169+
Observable.skip(Observable.from(1, 2, 3), 2).subscribe({ result -> a.received(result)});
170170
verify(a, times(0)).received(1);
171171
verify(a, times(0)).received(2);
172172
verify(a, times(1)).received(3);
173173
}
174174

175175
@Test
176176
public void testTake() {
177-
Observable.take(Observable.toObservable(1, 2, 3), 2).subscribe({ result -> a.received(result)});
177+
Observable.take(Observable.from(1, 2, 3), 2).subscribe({ result -> a.received(result)});
178178
verify(a, times(1)).received(1);
179179
verify(a, times(1)).received(2);
180180
verify(a, times(0)).received(3);
@@ -188,15 +188,15 @@ def class ObservableTests {
188188

189189
@Test
190190
public void testTakeWhileViaGroovy() {
191-
Observable.takeWhile(Observable.toObservable(1, 2, 3), { x -> x < 3}).subscribe({ result -> a.received(result)});
191+
Observable.takeWhile(Observable.from(1, 2, 3), { x -> x < 3}).subscribe({ result -> a.received(result)});
192192
verify(a, times(1)).received(1);
193193
verify(a, times(1)).received(2);
194194
verify(a, times(0)).received(3);
195195
}
196196

197197
@Test
198198
public void testTakeWhileWithIndexViaGroovy() {
199-
Observable.takeWhileWithIndex(Observable.toObservable(1, 2, 3), { x, i -> i < 2}).subscribe({ result -> a.received(result)});
199+
Observable.takeWhileWithIndex(Observable.from(1, 2, 3), { x, i -> i < 2}).subscribe({ result -> a.received(result)});
200200
verify(a, times(1)).received(1);
201201
verify(a, times(1)).received(2);
202202
verify(a, times(0)).received(3);
@@ -210,7 +210,7 @@ def class ObservableTests {
210210

211211
@Test
212212
public void testToSortedListStatic() {
213-
Observable.toSortedList(Observable.toObservable(1, 3, 2, 5, 4)).subscribe({ result -> a.received(result)});
213+
Observable.toSortedList(Observable.from(1, 3, 2, 5, 4)).subscribe({ result -> a.received(result)});
214214
verify(a, times(1)).received(Arrays.asList(1, 2, 3, 4, 5));
215215
}
216216

@@ -222,7 +222,7 @@ def class ObservableTests {
222222

223223
@Test
224224
public void testToSortedListWithFunctionStatic() {
225-
Observable.toSortedList(Observable.toObservable(1, 3, 2, 5, 4), {a, b -> a - b}).subscribe({ result -> a.received(result)});
225+
Observable.toSortedList(Observable.from(1, 3, 2, 5, 4), {a, b -> a - b}).subscribe({ result -> a.received(result)});
226226
verify(a, times(1)).received(Arrays.asList(1, 2, 3, 4, 5));
227227
}
228228

@@ -246,29 +246,29 @@ def class ObservableTests {
246246

247247
@Test
248248
public void testLastOrDefault() {
249-
def val = Observable.toObservable("one", "two").lastOrDefault("default", { x -> x.length() == 3})
249+
def val = Observable.from("one", "two").toBlockingObservable().lastOrDefault("default", { x -> x.length() == 3})
250250
assertEquals("two", val)
251251
}
252252

253253
@Test
254254
public void testLastOrDefault2() {
255-
def val = Observable.toObservable("one", "two").lastOrDefault("default", { x -> x.length() > 3})
255+
def val = Observable.from("one", "two").toBlockingObservable().lastOrDefault("default", { x -> x.length() > 3})
256256
assertEquals("default", val)
257257
}
258258

259259
public void testSingle1() {
260-
def s = Observable.toObservable("one").single({ x -> x.length() == 3})
260+
def s = Observable.from("one").toBlockingObservable().single({ x -> x.length() == 3})
261261
assertEquals("one", s)
262262
}
263263

264264
@Test(expected = IllegalStateException.class)
265265
public void testSingle2() {
266-
Observable.toObservable("one", "two").single({ x -> x.length() == 3})
266+
Observable.from("one", "two").toBlockingObservable().single({ x -> x.length() == 3})
267267
}
268268

269269
@Test
270270
public void testDefer() {
271-
def obs = Observable.toObservable(1, 2)
271+
def obs = Observable.from(1, 2)
272272
Observable.defer({-> obs }).subscribe({ result -> a.received(result)})
273273
verify(a, times(1)).received(1);
274274
verify(a, times(1)).received(2);
@@ -277,7 +277,7 @@ def class ObservableTests {
277277

278278
@Test
279279
public void testAll() {
280-
Observable.toObservable(1, 2, 3).all({ x -> x > 0 }).subscribe({ result -> a.received(result) });
280+
Observable.from(1, 2, 3).all({ x -> x > 0 }).subscribe({ result -> a.received(result) });
281281
verify(a, times(1)).received(true);
282282
}
283283

@@ -305,7 +305,7 @@ def class ObservableTests {
305305
int counter = 1;
306306

307307
public Observable<Integer> getNumbers() {
308-
return Observable.toObservable(1, 3, 2, 5, 4);
308+
return Observable.from(1, 3, 2, 5, 4);
309309
}
310310

311311
public TestObservable getObservable() {

language-adaptors/rxjava-jruby/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ This adaptor allows `org.jruby.RubyProc` lambda functions to be used and RxJava
66
This enables code such as:
77

88
```ruby
9-
Observable.toObservable("one", "two", "three")
9+
Observable.from("one", "two", "three")
1010
.take(2)
1111
.subscribe(lambda { |arg| puts arg })
1212
```

language-adaptors/rxjava-jruby/src/main/java/rx/lang/jruby/JRubyAdaptor.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public void testCreateViaGroovy() {
7676

7777
@Test
7878
public void testFilterViaGroovy() {
79-
runGroovyScript("Observable.filter(Observable.toObservable(1, 2, 3), lambda{|it| it >= 2}).subscribe(lambda{|result| a.received(result)});");
79+
runGroovyScript("Observable.filter(Observable.from(1, 2, 3), lambda{|it| it >= 2}).subscribe(lambda{|result| a.received(result)});");
8080
verify(assertion, times(0)).received(1L);
8181
verify(assertion, times(1)).received(2L);
8282
verify(assertion, times(1)).received(3L);
@@ -98,7 +98,7 @@ public void testMap() {
9898

9999
@Test
100100
public void testMaterializeViaGroovy() {
101-
runGroovyScript("Observable.materialize(Observable.toObservable(1, 2, 3)).subscribe(lambda{|result| a.received(result)});");
101+
runGroovyScript("Observable.materialize(Observable.from(1, 2, 3)).subscribe(lambda{|result| a.received(result)});");
102102
// we expect 4 onNext calls: 3 for 1, 2, 3 ObservableNotification.OnNext and 1 for ObservableNotification.OnCompleted
103103
verify(assertion, times(4)).received(any(Notification.class));
104104
verify(assertion, times(0)).error(any(Exception.class));
@@ -129,23 +129,23 @@ public void testScriptWithOnNext() {
129129

130130
@Test
131131
public void testSkipTakeViaGroovy() {
132-
runGroovyScript("Observable.skip(Observable.toObservable(1, 2, 3), 1).take(1).subscribe(lambda{|result| a.received(result)});");
132+
runGroovyScript("Observable.skip(Observable.from(1, 2, 3), 1).take(1).subscribe(lambda{|result| a.received(result)});");
133133
verify(assertion, times(0)).received(1);
134134
verify(assertion, times(1)).received(2L);
135135
verify(assertion, times(0)).received(3);
136136
}
137137

138138
@Test
139139
public void testSkipViaGroovy() {
140-
runGroovyScript("Observable.skip(Observable.toObservable(1, 2, 3), 2).subscribe(lambda{|result| a.received(result)});");
140+
runGroovyScript("Observable.skip(Observable.from(1, 2, 3), 2).subscribe(lambda{|result| a.received(result)});");
141141
verify(assertion, times(0)).received(1);
142142
verify(assertion, times(0)).received(2);
143143
verify(assertion, times(1)).received(3L);
144144
}
145145

146146
@Test
147147
public void testTakeViaGroovy() {
148-
runGroovyScript("Observable.take(Observable.toObservable(1, 2, 3), 2).subscribe(lambda{|result| a.received(result)});");
148+
runGroovyScript("Observable.take(Observable.from(1, 2, 3), 2).subscribe(lambda{|result| a.received(result)});");
149149
verify(assertion, times(1)).received(1L);
150150
verify(assertion, times(1)).received(2L);
151151
verify(assertion, times(0)).received(3);
@@ -183,7 +183,7 @@ public static class TestFactory {
183183
int counter = 1;
184184

185185
public Observable<Integer> getNumbers() {
186-
return Observable.toObservable(1, 3, 2, 5, 4);
186+
return Observable.from(1, 3, 2, 5, 4);
187187
}
188188

189189
public TestObservable getObservable() {

language-adaptors/rxjava-scala/README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ This adaptor allows 'fn' functions to be used and RxJava will know how to invoke
66
This enables code such as:
77

88
```scala
9-
Observable.toObservable("1", "2", "3")
9+
Observable.from("1", "2", "3")
1010
.take(2)
1111
.subscribe((callback: String) => {
1212
println(callback)

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ScalaAdaptor.scala

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ class UnitTestSuite extends JUnitSuite {
122122
}
123123

124124
@Test def testTake() {
125-
Observable.toObservable("1", "2", "3").take(1).subscribe(Map(
125+
Observable.from("1", "2", "3").take(1).subscribe(Map(
126126
"onNext" -> ((callback: String) => {
127127
print("testTake: callback = " + callback)
128128
assertion.received(callback)
@@ -133,14 +133,14 @@ class UnitTestSuite extends JUnitSuite {
133133

134134
@Test def testClosureVersusMap() {
135135
// using closure
136-
Observable.toObservable("1", "2", "3")
136+
Observable.from("1", "2", "3")
137137
.take(2)
138138
.subscribe((callback: String) => {
139139
println(callback)
140140
})
141141

142142
// using Map of closures
143-
Observable.toObservable("1", "2", "3")
143+
Observable.from("1", "2", "3")
144144
.take(2)
145145
.subscribe(Map(
146146
"onNext" -> ((callback: String) => {
@@ -149,7 +149,7 @@ class UnitTestSuite extends JUnitSuite {
149149
}
150150

151151
@Test def testFilterWithToList() {
152-
val numbers = Observable.toObservable[Int](1, 2, 3, 4, 5, 6, 7, 8, 9)
152+
val numbers = Observable.from[Int](1, 2, 3, 4, 5, 6, 7, 8, 9)
153153
numbers.filter((x: Int) => 0 == (x % 2)).toList().subscribe(
154154
(callback: java.util.List[Int]) => {
155155
val lst = callback.asScala.toList
@@ -161,7 +161,7 @@ class UnitTestSuite extends JUnitSuite {
161161
}
162162

163163
@Test def testTakeLast() {
164-
val numbers = Observable.toObservable[Int](1, 2, 3, 4, 5, 6, 7, 8, 9)
164+
val numbers = Observable.from[Int](1, 2, 3, 4, 5, 6, 7, 8, 9)
165165
numbers.takeLast(1).subscribe((callback: Int) => {
166166
println("testTakeLast: onNext -> got " + callback)
167167
assertion.received(callback)
@@ -170,7 +170,7 @@ class UnitTestSuite extends JUnitSuite {
170170
}
171171

172172
@Test def testMap() {
173-
val numbers = Observable.toObservable(1, 2, 3, 4, 5, 6, 7, 8, 9)
173+
val numbers = Observable.from(1, 2, 3, 4, 5, 6, 7, 8, 9)
174174
val mappedNumbers = new ArrayBuffer[Int]()
175175
numbers.map(((x: Int)=> { x * x })).subscribe(((squareVal: Int) => {
176176
println("square is " + squareVal )
@@ -181,9 +181,9 @@ class UnitTestSuite extends JUnitSuite {
181181
}
182182

183183
@Test def testZip() {
184-
val numbers = Observable.toObservable(1, 2, 3)
185-
val colors = Observable.toObservable("red", "green", "blue")
186-
val characters = Observable.toObservable("lion-o", "cheetara", "panthro")
184+
val numbers = Observable.from(1, 2, 3)
185+
val colors = Observable.from("red", "green", "blue")
186+
val characters = Observable.from("lion-o", "cheetara", "panthro")
187187

188188
Observable.zip(numbers.toList, colors.toList, characters.toList, ((n: java.util.List[Int], c: java.util.List[String], t: java.util.List[String]) => { Map(
189189
"numbers" -> n,

0 commit comments

Comments
 (0)