Skip to content

Commit 44f1508

Browse files
add Observable.from(Future) constructor
1 parent 8a7979d commit 44f1508

File tree

2 files changed

+48
-0
lines changed

2 files changed

+48
-0
lines changed

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1828,8 +1828,11 @@ object Observable {
18281828
import scala.collection.JavaConverters._
18291829
import scala.collection.immutable.Range
18301830
import scala.concurrent.duration.Duration
1831+
import scala.concurrent.{Future, ExecutionContext}
1832+
import scala.util.{Success, Failure}
18311833
import ImplicitFunctionConversions._
18321834
import JavaConversions._
1835+
import rx.lang.scala.subjects.AsyncSubject
18331836

18341837
private[scala]
18351838
def jObsOfListToScObsOfSeq[T](jObs: rx.Observable[_ <: java.util.List[T]]): Observable[Seq[T]] = {
@@ -1911,6 +1914,24 @@ object Observable {
19111914
toScalaObservable[T](rx.Observable.from(items.toIterable.asJava))
19121915
}
19131916

1917+
/** Returns an Observable emitting the value produced by the Future as its single item.
1918+
* If the future fails, the Observable will fail as well.
1919+
*
1920+
* @param f Future whose value ends up in the resulting Observable
1921+
* @return an Observable completed after producing the value of the future, or with an exception
1922+
*/
1923+
def from[T](f: Future[T])(implicit execContext: ExecutionContext): Observable[T] = {
1924+
val s = AsyncSubject[T]()
1925+
f.onComplete {
1926+
case Failure(e) =>
1927+
s.onError(e)
1928+
case Success(c) =>
1929+
s.onNext(c)
1930+
s.onCompleted()
1931+
}
1932+
s
1933+
}
1934+
19141935
/**
19151936
* Converts an `Iterable` into an Observable.
19161937
*

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package rx.lang.scala
22

3+
import scala.concurrent.{Future, Await}
4+
import scala.concurrent.duration.Duration
5+
import scala.concurrent.ExecutionContext.Implicits.global
36
import org.junit.Assert._
47
import org.junit.{ Ignore, Test }
58
import org.scalatest.junit.JUnitSuite
@@ -67,6 +70,30 @@ class ObservableTests extends JUnitSuite {
6770
assertEquals(receivedMsg, msg)
6871
}
6972

73+
@Test def testFromFuture() {
74+
val o = Observable from Future { 5 }
75+
assertEquals(5, o.toBlockingObservable.single)
76+
}
77+
78+
@Test def testFromFutureWithDelay() {
79+
val o = Observable from Future { Thread.sleep(200); 42 }
80+
assertEquals(42, o.toBlockingObservable.single)
81+
}
82+
83+
@Test def testFromFutureWithError() {
84+
val err = new Exception("ooops42")
85+
val o: Observable[Int] = Observable from Future { Thread.sleep(200); throw err }
86+
assertEquals(List(Notification.OnError(err)), o.materialize.toBlockingObservable.toList)
87+
}
88+
89+
@Test def testFromFutureWithSubscribeOnlyAfterCompletion() {
90+
val f = Future { Thread.sleep(200); 6 }
91+
val o = Observable from f
92+
val res = Await.result(f, Duration.Inf)
93+
assertEquals(6, res)
94+
assertEquals(6, o.toBlockingObservable.single)
95+
}
96+
7097
/*
7198
@Test def testHead() {
7299
val observer = mock(classOf[Observer[Int]])

0 commit comments

Comments
 (0)