Skip to content

Commit 0ee0eb8

Browse files
Factory methods for Observer.
1 parent 4f34376 commit 0ee0eb8

File tree

1 file changed

+53
-3
lines changed
  • language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala

1 file changed

+53
-3
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observer.scala

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package rx.lang.scala
1717

18+
import rx.joins.ObserverBase
19+
1820
/**
1921
Provides a mechanism for receiving push-based notifications.
2022
*
@@ -33,7 +35,7 @@ trait Observer[-T] {
3335
*
3436
* The [[rx.lang.scala.Observable]] will not call this method again after it calls either `onCompleted` or `onError`.
3537
*/
36-
def onNext(value: T): Unit = asJavaObserver.onNext(value)
38+
def onNext(value: T): Unit = asJavaObserver.onNext(value)
3739

3840
/**
3941
* Notifies the Observer that the [[rx.lang.scala.Observable]] has experienced an error condition.
@@ -47,16 +49,64 @@ trait Observer[-T] {
4749
*
4850
* The [[rx.lang.scala.Observable]] will not call this method if it calls `onError`.
4951
*/
50-
def onCompleted(): Unit = asJavaObserver.onCompleted()
52+
def onCompleted(): Unit = asJavaObserver.onCompleted()
5153

5254
}
5355

5456
object Observer {
55-
def apply[T](observer: rx.Observer[T]) : Observer[T] = {
57+
58+
private [scala] def apply[T](observer: rx.Observer[T]) : Observer[T] = {
5659
new Observer[T]() {
5760
def asJavaObserver: rx.Observer[_ >: T] = observer
5861
}
5962
}
63+
64+
/**
65+
* Creates an [[rx.lang.scala.Observer]]
66+
* @param onNext the onNext action
67+
* @param onError the onError action
68+
* @param onCompleted the onCompleted action
69+
*/
70+
def apply[T](onNext: T => Unit, onError: Throwable => Unit, onCompleted: () => Unit): Observer[T] = {
71+
Observer(new ObserverBase[T] () {
72+
protected def onCompletedCore(): Unit = onCompleted()
73+
protected def onErrorCore(error: Throwable): Unit = onError(error)
74+
protected def onNextCore(value: T): Unit = onNext(value)
75+
})
76+
}
77+
78+
/**
79+
* Creates an [[rx.lang.scala.Observer]]
80+
* @param onNext the onNext action
81+
* @param onError the onError action
82+
*/
83+
def apply[T](onNext: T => Unit, onError: Throwable => Unit): Observer[T] = {
84+
Observer(new ObserverBase[T] () {
85+
protected def onCompletedCore(): Unit = {}
86+
protected def onErrorCore(error: Throwable): Unit = onError(error)
87+
protected def onNextCore(value: T): Unit = onNext(value)
88+
})
89+
}
90+
91+
def apply[T](onNext: T => Unit, onCompleted: () => Unit): Observer[T] = {
92+
Observer(new ObserverBase[T] () {
93+
protected def onCompletedCore(): Unit = onCompleted()
94+
protected def onErrorCore(error: Throwable): Unit = {}
95+
protected def onNextCore(value: T): Unit = onNext(value)
96+
})
97+
}
98+
99+
/**
100+
* Creates an [[rx.lang.scala.Observer]]
101+
* @param onNext the onNext action
102+
*/
103+
def apply[T](onNext: T => Unit): Observer[T] = {
104+
Observer(new ObserverBase[T] () {
105+
protected def onCompletedCore(): Unit = {}
106+
protected def onErrorCore(error: Throwable): Unit = {}
107+
protected def onNextCore(value: T): Unit = onNext(value)
108+
})
109+
}
60110
}
61111

62112

0 commit comments

Comments
 (0)