Skip to content

Commit 2f7ec6f

Browse files
Manually merge branch 'master' of github.com:Netflix/RxJava into scala-constructors
Conflicts: language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala
2 parents 44f1508 + 7cc5baa commit 2f7ec6f

26 files changed

+982
-266
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 10 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import scala.concurrent.duration.Duration
2121
import scala.concurrent.duration.DurationInt
2222
import scala.concurrent.duration.DurationLong
2323
import scala.language.postfixOps
24+
import scala.language.implicitConversions
2425

2526
import org.junit.Assert.assertEquals
2627
import org.junit.Assert.assertTrue
@@ -29,7 +30,7 @@ import org.junit.Test
2930
import org.scalatest.junit.JUnitSuite
3031

3132
import rx.lang.scala._
32-
import rx.lang.scala.concurrency.Schedulers
33+
import rx.lang.scala.concurrency._
3334

3435
@Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily
3536
class RxScalaDemo extends JUnitSuite {
@@ -164,21 +165,21 @@ class RxScalaDemo extends JUnitSuite {
164165
@Test def testTwoSubscriptionsToOneInterval() {
165166
val o = Observable.interval(100 millis).take(8)
166167
o.subscribe(
167-
i => println(s"${i}a (on thread #${Thread.currentThread().getId()})")
168+
i => println(s"${i}a (on thread #${Thread.currentThread().getId})")
168169
)
169170
o.subscribe(
170-
i => println(s"${i}b (on thread #${Thread.currentThread().getId()})")
171+
i => println(s"${i}b (on thread #${Thread.currentThread().getId})")
171172
)
172173
waitFor(o)
173174
}
174175

175176
@Test def schedulersExample() {
176177
val o = Observable.interval(100 millis).take(8)
177-
o.observeOn(Schedulers.newThread).subscribe(
178-
i => println(s"${i}a (on thread #${Thread.currentThread().getId()})")
178+
o.observeOn(NewThreadScheduler()).subscribe(
179+
i => println(s"${i}a (on thread #${Thread.currentThread().getId})")
179180
)
180-
o.observeOn(Schedulers.newThread).subscribe(
181-
i => println(s"${i}b (on thread #${Thread.currentThread().getId()})")
181+
o.observeOn(NewThreadScheduler()).subscribe(
182+
i => println(s"${i}b (on thread #${Thread.currentThread().getId})")
182183
)
183184
waitFor(o)
184185
}
@@ -355,13 +356,13 @@ class RxScalaDemo extends JUnitSuite {
355356
}
356357

357358
def square(x: Int): Int = {
358-
println(s"$x*$x is being calculated on thread ${Thread.currentThread().getId()}")
359+
println(s"$x*$x is being calculated on thread ${Thread.currentThread().getId}")
359360
Thread.sleep(100) // calculating a square is heavy work :)
360361
x*x
361362
}
362363

363364
def work(o1: Observable[Int]): Observable[String] = {
364-
println(s"map() is being called on thread ${Thread.currentThread().getId()}")
365+
println(s"map() is being called on thread ${Thread.currentThread().getId}")
365366
o1.map(i => s"The square of $i is ${square(i)}")
366367
}
367368

@@ -425,40 +426,6 @@ class RxScalaDemo extends JUnitSuite {
425426
assertEquals("!!", Observable("a", "b", "c").drop(10).firstOrElse("!!").toBlockingObservable.single)
426427
}
427428

428-
@Test def observableLikeFuture1() {
429-
implicit val scheduler = Schedulers.threadPoolForIO
430-
val o1 = observable {
431-
Thread.sleep(1000)
432-
5
433-
}
434-
val o2 = observable {
435-
Thread.sleep(500)
436-
4
437-
}
438-
Thread.sleep(500)
439-
val t1 = System.currentTimeMillis
440-
println((o1 merge o2).first.toBlockingObservable.single)
441-
println(System.currentTimeMillis - t1)
442-
}
443-
444-
@Test def observableLikeFuture2() {
445-
class Friend {}
446-
val session = new Object {
447-
def getFriends: List[Friend] = List(new Friend, new Friend)
448-
}
449-
450-
implicit val scheduler = Schedulers.threadPoolForIO
451-
val o: Observable[List[Friend]] = observable {
452-
session.getFriends
453-
}
454-
o.subscribe(
455-
friendList => println(friendList),
456-
err => println(err.getMessage)
457-
)
458-
459-
Thread.sleep(1500) // or convert to BlockingObservable
460-
}
461-
462429
@Test def takeWhileWithIndexAlternative {
463430
val condition = true
464431
Observable("a", "b").zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1)

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/TestSchedulerExample.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ class TestSchedulerExample extends JUnitSuite {
2828

2929
scheduler.advanceTimeTo(2 seconds)
3030

31-
val inOrdr = inOrder(observer);
31+
val inOrdr = inOrder(observer)
3232
inOrdr.verify(observer, times(1)).onNext(0L)
3333
inOrdr.verify(observer, times(1)).onNext(1L)
3434
inOrdr.verify(observer, never).onNext(2L)
@@ -37,7 +37,7 @@ class TestSchedulerExample extends JUnitSuite {
3737

3838
verify(observer, never).onNext(2L)
3939

40-
sub.unsubscribe();
40+
sub.unsubscribe()
4141

4242
scheduler.advanceTimeTo(4 seconds)
4343

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/ImplicitFunctionConversions.scala

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,14 @@ package rx.lang.scala
1717

1818
import java.lang.Exception
1919
import java.{ lang => jlang }
20-
import rx.lang.scala._
21-
import rx.util.functions._
20+
21+
import scala.language.implicitConversions
2222
import scala.collection.Seq
23-
import java.{lang => jlang}
24-
import language.implicitConversions
23+
24+
import rx.util.functions._
2525
import rx.lang.scala.JavaConversions._
2626

27+
2728
/**
2829
* These function conversions convert between Scala functions and Rx `Func`s and `Action`s.
2930
* Most RxScala users won't need them, but they might be useful if one wants to use
@@ -34,10 +35,17 @@ import rx.lang.scala.JavaConversions._
3435
*/
3536
object ImplicitFunctionConversions {
3637

38+
// implicit def schedulerActionToFunc2[T](action: (Scheduler, T) => Subscription): Func2[rx.Scheduler, T, rx.Subscription] with Object {def call(s: rx.Scheduler, t: T): rx.Subscription} =
39+
// new Func2[rx.Scheduler, T, rx.Subscription] {
40+
// def call(s: rx.Scheduler, t: T): rx.Subscription = {
41+
// action(rx.lang.scala.Scheduler(s), t).asJavaSubscription
42+
// }
43+
// }
44+
3745
implicit def schedulerActionToFunc2[T](action: (Scheduler, T) => Subscription) =
3846
new Func2[rx.Scheduler, T, rx.Subscription] {
3947
def call(s: rx.Scheduler, t: T): rx.Subscription = {
40-
action(s, t).asJavaSubscription
48+
action(Scheduler(s), t).asJavaSubscription
4149
}
4250
}
4351

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 80 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ import rx.Observable.OnSubscribeFunc
2222
/**
2323
* The Observable interface that implements the Reactive Pattern.
2424
*
25-
* @param asJavaObservable the underlying Java observable
26-
*
2725
* @define subscribeObserverMain
2826
* Call this method to subscribe an [[rx.lang.scala.Observer]] for receiving
2927
* items and notifications from the Observable.
@@ -228,7 +226,6 @@ trait Observable[+T]
228226
* otherwise you'll get a compilation error.
229227
*
230228
* @usecase def concat[U]: Observable[U]
231-
* @inheritdoc
232229
*/
233230
def concat[U](implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = {
234231
val o2: Observable[Observable[U]] = this
@@ -274,7 +271,19 @@ trait Observable[+T]
274271
* is the minumum of the number of `onNext` invocations of `this` and `that`.
275272
*/
276273
def zip[U](that: Observable[U]): Observable[(T, U)] = {
277-
toScalaObservable[(T, U)](rx.Observable.zip[T, U, (T, U)](this.asJavaObservable, that.asJavaObservable, (t: T, u: U) => (t, u)))
274+
zip(that, (t: T, u: U) => (t, u))
275+
}
276+
277+
/**
278+
* Returns an Observable formed from this Observable and another Observable by combining
279+
* corresponding elements using the selector function.
280+
* The number of `onNext` invocations of the resulting `Observable[(T, U)]`
281+
* is the minumum of the number of `onNext` invocations of `this` and `that`.
282+
*
283+
* Note that this function is private because Scala collections don't have such a function.
284+
*/
285+
private def zip[U, R](that: Observable[U], selector: (T,U) => R): Observable[R] = {
286+
toScalaObservable[R](rx.Observable.zip[T, U, R](this.asJavaObservable, that.asJavaObservable, selector))
278287
}
279288

280289
/**
@@ -1819,6 +1828,65 @@ trait Observable[+T]
18191828
new WithFilter[T](p, asJavaObservable)
18201829
}
18211830

1831+
/**
1832+
* Returns an Observable that applies the given function to each item emitted by an
1833+
* Observable.
1834+
*
1835+
* @param observer the observer
1836+
*
1837+
* @return an Observable with the side-effecting behavior applied.
1838+
*/
1839+
def doOnEach(observer: Observer[T]): Observable[T] = {
1840+
toScalaObservable[T](asJavaObservable.doOnEach(observer.asJavaObserver))
1841+
}
1842+
1843+
/**
1844+
* Returns an Observable that applies the given function to each item emitted by an
1845+
* Observable.
1846+
*
1847+
* @param onNext this function will be called whenever the Observable emits an item
1848+
*
1849+
* @return an Observable with the side-effecting behavior applied.
1850+
*/
1851+
def doOnEach(onNext: T => Unit): Observable[T] = {
1852+
toScalaObservable[T](asJavaObservable.doOnEach(
1853+
onNext
1854+
))
1855+
}
1856+
1857+
/**
1858+
* Returns an Observable that applies the given function to each item emitted by an
1859+
* Observable.
1860+
*
1861+
* @param onNext this function will be called whenever the Observable emits an item
1862+
* @param onError this function will be called if an error occurs
1863+
*
1864+
* @return an Observable with the side-effecting behavior applied.
1865+
*/
1866+
def doOnEach(onNext: T => Unit, onError: Throwable => Unit): Observable[T] = {
1867+
toScalaObservable[T](asJavaObservable.doOnEach(
1868+
onNext,
1869+
onError
1870+
))
1871+
}
1872+
1873+
/**
1874+
* Returns an Observable that applies the given function to each item emitted by an
1875+
* Observable.
1876+
*
1877+
* @param onNext this function will be called whenever the Observable emits an item
1878+
* @param onError this function will be called if an error occurs
1879+
* @param onCompleted the action to invoke when the source Observable calls
1880+
*
1881+
* @return an Observable with the side-effecting behavior applied.
1882+
*/
1883+
def doOnEach(onNext: T => Unit, onError: Throwable => Unit, onCompleted: () => Unit): Observable[T] = {
1884+
toScalaObservable[T](asJavaObservable.doOnEach(
1885+
onNext,
1886+
onError,
1887+
onCompleted
1888+
))
1889+
}
18221890
}
18231891

18241892
/**
@@ -1863,13 +1931,13 @@ object Observable {
18631931
*
18641932
*
18651933
* @tparam T
1866-
* the type of the items that this Observable emits
1934+
* the type of the items that this Observable emits.
18671935
* @param func
18681936
* a function that accepts an `Observer[T]`, invokes its `onNext`, `onError`, and `onCompleted` methods
18691937
* as appropriate, and returns a [[rx.lang.scala.Subscription]] to allow the Observer to
1870-
* canceling the subscription
1871-
* @return an Observable that, when an [[rx.lang.scala.Observer]] subscribes to it, will execute the given
1872-
* function
1938+
* canceling the subscription.
1939+
* @return
1940+
* an Observable that, when an [[rx.lang.scala.Observer]] subscribes to it, will execute the given function.
18731941
*/
18741942
def create[T](func: Observer[T] => Subscription): Observable[T] = {
18751943
toScalaObservable[T](rx.Observable.create(new OnSubscribeFunc[T] {
@@ -1880,15 +1948,17 @@ object Observable {
18801948
}
18811949

18821950
/**
1883-
* Returns an Observable that invokes an [[rx.lang.scala.Observer]]'s [[rx.lang.scala.Observer.onError onError]] method when the Observer subscribes to it
1951+
* Returns an Observable that invokes an [[rx.lang.scala.Observer]]'s [[rx.lang.scala.Observer.onError onError]]
1952+
* method when the Observer subscribes to it.
18841953
*
18851954
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/error.png">
18861955
*
18871956
* @param exception
18881957
* the particular error to report
18891958
* @tparam T
18901959
* the type of the items (ostensibly) emitted by the Observable
1891-
* @return an Observable that invokes the [[rx.lang.scala.Observer]]'s [[rx.lang.scala.Observer.onError onError]] method when the Observer subscribes to it
1960+
* @return an Observable that invokes the [[rx.lang.scala.Observer]]'s [[rx.lang.scala.Observer.onError onError]]
1961+
* method when the Observer subscribes to it
18921962
*/
18931963
def error[T](exception: Throwable): Observable[T] = {
18941964
toScalaObservable[T](rx.Observable.error(exception))

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observer.scala

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package rx.lang.scala
1717

18+
import rx.joins.ObserverBase
19+
1820
/**
1921
Provides a mechanism for receiving push-based notifications.
2022
*
@@ -24,7 +26,11 @@ package rx.lang.scala
2426
*/
2527
trait Observer[-T] {
2628

27-
def asJavaObserver: rx.Observer[_ >: T]
29+
private [scala] def asJavaObserver: rx.Observer[_ >: T] = new ObserverBase[T] {
30+
protected def onCompletedCore(): Unit = onCompleted()
31+
protected def onErrorCore(error: Throwable): Unit = onError(error)
32+
protected def onNextCore(value: T): Unit = onNext(value)
33+
}
2834

2935
/**
3036
* Provides the Observer with new data.
@@ -33,30 +39,37 @@ trait Observer[-T] {
3339
*
3440
* The [[rx.lang.scala.Observable]] will not call this method again after it calls either `onCompleted` or `onError`.
3541
*/
36-
def onNext(value: T): Unit = asJavaObserver.onNext(value)
42+
def onNext(value: T): Unit
3743

3844
/**
3945
* Notifies the Observer that the [[rx.lang.scala.Observable]] has experienced an error condition.
4046
*
4147
* If the [[rx.lang.scala.Observable]] calls this method, it will not thereafter call `onNext` or `onCompleted`.
4248
*/
43-
def onError(error: Throwable): Unit = asJavaObserver.onError(error)
49+
def onError(error: Throwable): Unit
4450

4551
/**
4652
* Notifies the Observer that the [[rx.lang.scala.Observable]] has finished sending push-based notifications.
4753
*
4854
* The [[rx.lang.scala.Observable]] will not call this method if it calls `onError`.
4955
*/
50-
def onCompleted(): Unit = asJavaObserver.onCompleted()
56+
def onCompleted(): Unit
5157

5258
}
5359

5460
object Observer {
55-
def apply[T](observer: rx.Observer[T]) : Observer[T] = {
56-
new Observer[T]() {
57-
def asJavaObserver: rx.Observer[_ >: T] = observer
58-
}
59-
}
60-
}
61+
/**
62+
* Assume that the underlying rx.Observer does not need to be wrapped.
63+
*/
64+
private [scala] def apply[T](observer: rx.Observer[T]) : Observer[T] = {
65+
new Observer[T] {
66+
67+
override def asJavaObserver = observer
6168

69+
def onNext(value: T): Unit = asJavaObserver.onNext(value)
70+
def onError(error: Throwable): Unit = asJavaObserver.onError(error)
71+
def onCompleted(): Unit = asJavaObserver.onCompleted()
6272

73+
}
74+
}
75+
}

0 commit comments

Comments
 (0)