From baf568d2d4d4d92d615c64e0f4120125a0d62ac2 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sat, 3 May 2014 15:07:24 +0800 Subject: [PATCH 1/7] Add 'dropUntil' to RxScala --- .../rx/lang/scala/examples/RxScalaDemo.scala | 9 +++++++++ .../main/scala/rx/lang/scala/Observable.scala | 16 ++++++++++++++++ .../scala/rx/lang/scala/CompletenessTest.scala | 1 + 3 files changed, 26 insertions(+) diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index 7d1ee11cd3..307b7886c9 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -426,6 +426,15 @@ class RxScalaDemo extends JUnitSuite { ) } + @Test def dropUntilExample() { + val o = List("Alice", "Bob", "Carlos").toObservable.zip( + Observable.interval(700 millis, IOScheduler())).map(_._1) // emit every 700 millis + val other = List(1).toObservable.delay(1 seconds) + println( + o.dropUntil(other).toBlockingObservable.toList // output List("Bob", "Carlos") + ) + } + def square(x: Int): Int = { println(s"$x*$x is being calculated on thread ${Thread.currentThread().getId}") Thread.sleep(100) // calculating a square is heavy work :) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 35eae53767..2510b14551 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -1327,6 +1327,22 @@ trait Observable[+T] toScalaObservable(asJavaObservable.skipLast(time.length, time.unit, scheduler)) } + /** + * Returns an Observable that skips items emitted by the source Observable until a second Observable emits an item. + *

+ * + * + * @param other the second Observable that has to emit an item before the source Observable's elements begin + * to be mirrored by the resulting Observable + * @return an Observable that skips items from the source Observable until the second Observable emits an + * item, then emits the remaining items + * @see RxJava Wiki: skipUntil() + * @see MSDN: Observable.SkipUntil + */ + def dropUntil[E](other: Observable[E]): Observable[T] = { + toScalaObservable[T](asJavaObservable.skipUntil(other)) + } + /** * Returns an Observable that emits only the first `num` items emitted by the source * Observable. diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index eca5168da5..3a64c36a44 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -98,6 +98,7 @@ class CompletenessTest extends JUnitSuite { "skip(Long, TimeUnit, Scheduler)" -> "drop(Duration, Scheduler)", "skipWhile(Func1[_ >: T, Boolean])" -> "dropWhile(T => Boolean)", "skipWhileWithIndex(Func2[_ >: T, Integer, Boolean])" -> unnecessary, + "skipUntil(Observable[U])" -> "dropUntil(Observable[E])", "startWith(Iterable[T])" -> "[unnecessary because we can just use `++` instead]", "skipLast(Int)" -> "dropRight(Int)", "skipLast(Long, TimeUnit)" -> "dropRight(Duration)", From bb788092a437580889a476604bdb1ccdb7b7b3ff Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sat, 3 May 2014 16:00:25 +0800 Subject: [PATCH 2/7] Add 'contains' to RxScala --- .../rx/lang/scala/examples/RxScalaDemo.scala | 9 +++++++++ .../main/scala/rx/lang/scala/Observable.scala | 16 ++++++++++++++++ .../scala/rx/lang/scala/CompletenessTest.scala | 1 + 3 files changed, 26 insertions(+) diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index 307b7886c9..6ed20db2a3 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -25,6 +25,7 @@ import scala.language.implicitConversions import org.junit.Assert.assertEquals import org.junit.Assert.assertTrue +import org.junit.Assert.assertFalse import org.junit.Ignore import org.junit.Test import org.scalatest.junit.JUnitSuite @@ -645,6 +646,14 @@ class RxScalaDemo extends JUnitSuite { println(m.toBlockingObservable.single) } + @Test def containsExample(): Unit = { + val o1 = List(1, 2, 3).toObservable.contains(2) + assertTrue(o1.toBlockingObservable.single) + + val o2 = List(1, 2, 3).toObservable.contains(4) + assertFalse(o2.toBlockingObservable.single) + } + @Test def retryExample1(): Unit = { val o : Observable[String] = List("alice", "bob", "carol").toObservable assertEquals(List("alice", "bob", "carol"), o.retry.toBlockingObservable.toList) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 2510b14551..d7c5a8aeb1 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -1061,6 +1061,22 @@ trait Observable[+T] toScalaObservable[T](asJavaObservable.cache()) } + /** + * Returns an Observable that emits a Boolean that indicates whether the source Observable emitted a + * specified item. + * + * Note: this method uses `==` to compare elements. It's a bit different from RxJava which uses `Object.equals`. + *

+ * + * + *@param elem the item to search for in the emissions from the source Observable + * @return an Observable that emits `true` if the specified item is emitted by the source Observable, + * or `false` if the source Observable completes without emitting that item + */ + def contains(elem: Any): Observable[Boolean] = { + exists(_ == elem) + } + /** * Returns a a pair of a start function and an [[rx.lang.scala.Observable]], which waits until the start function is called before it begins emitting * items to those [[rx.lang.scala.Observer]]s that have subscribed to it. diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index 3a64c36a44..6e36756d54 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -71,6 +71,7 @@ class CompletenessTest extends JUnitSuite { "all(Func1[_ >: T, Boolean])" -> "forall(T => Boolean)", "buffer(Long, Long, TimeUnit)" -> "buffer(Duration, Duration)", "buffer(Long, Long, TimeUnit, Scheduler)" -> "buffer(Duration, Duration, Scheduler)", + "contains(T)" -> "contains(Any)", "count()" -> "length", "dematerialize()" -> "dematerialize(<:<[Observable[T], Observable[Notification[U]]])", "elementAt(Int)" -> "[use `.drop(index).first`]", From d5074f43a0f60cdb9556041676dcec9a8565b374 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sat, 3 May 2014 23:54:36 +0800 Subject: [PATCH 3/7] Add 'repeat' to RxScala --- .../rx/lang/scala/examples/RxScalaDemo.scala | 10 +++ .../main/scala/rx/lang/scala/Observable.scala | 62 +++++++++++++++++++ .../rx/lang/scala/CompletenessTest.scala | 2 + 3 files changed, 74 insertions(+) diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index 6ed20db2a3..f8b0dba233 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -654,6 +654,16 @@ class RxScalaDemo extends JUnitSuite { assertFalse(o2.toBlockingObservable.single) } + @Test def repeatExample1(): Unit = { + val o : Observable[String] = List("alice", "bob", "carol").toObservable.repeat().take(6) + assertEquals(List("alice", "bob", "carol", "alice", "bob", "carol"), o.toBlockingObservable.toList) + } + + @Test def repeatExample2(): Unit = { + val o : Observable[String] = List("alice", "bob", "carol").toObservable.repeat(2) + assertEquals(List("alice", "bob", "carol", "alice", "bob", "carol"), o.toBlockingObservable.toList) + } + @Test def retryExample1(): Unit = { val o : Observable[String] = List("alice", "bob", "carol").toObservable assertEquals(List("alice", "bob", "carol"), o.retry.toBlockingObservable.toList) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index d7c5a8aeb1..dfb507b9c2 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -2253,6 +2253,68 @@ trait Observable[+T] toScalaObservable[T](asJavaObservable.retry()) } + /** + * Returns an Observable that repeats the sequence of items emitted by the source Observable indefinitely. + *

+ * + * + * @return an Observable that emits the items emitted by the source Observable repeatedly and in sequence + * @see RxJava Wiki: repeat() + * @see MSDN: Observable.Repeat + */ + def repeat(): Observable[T] = { + toScalaObservable[T](asJavaObservable.repeat()) + } + + /** + * Returns an Observable that repeats the sequence of items emitted by the source Observable indefinitely, + * on a particular Scheduler. + *

+ * + * + * @param scheduler the Scheduler to emit the items on + * @return an Observable that emits the items emitted by the source Observable repeatedly and in sequence + * @see RxJava Wiki: repeat() + * @see MSDN: Observable.Repeat + */ + def repeat(scheduler: Scheduler): Observable[T] = { + toScalaObservable[T](asJavaObservable.repeat(scheduler)) + } + + /** + * Returns an Observable that repeats the sequence of items emitted by the source Observable at most `count` times. + *

+ * + * + * @param count the number of times the source Observable items are repeated, + * a count of 0 will yield an empty sequence + * @return an Observable that repeats the sequence of items emitted by the source Observable at most `count` times + * @throws IllegalArgumentException if `count` is less than zero + * @see RxJava Wiki: repeat() + * @see MSDN: Observable.Repeat + */ + def repeat(count: Long): Observable[T] = { + toScalaObservable[T](asJavaObservable.repeat(count)) + } + + /** + * Returns an Observable that repeats the sequence of items emitted by the source Observable + * at most `count` times, on a particular Scheduler. + *

+ * + * + * @param count the number of times the source Observable items are repeated, + * a count of 0 will yield an empty sequence + * @param scheduler the `Scheduler` to emit the items on + * @return an Observable that repeats the sequence of items emitted by the source Observable at most `count` times + * on a particular Scheduler + * @see RxJava Wiki: repeat() + * @see MSDN: Observable.Repeat + */ + def repeat(count: Long, scheduler: Scheduler): Observable[T] = { + toScalaObservable[T](asJavaObservable.repeat(count, scheduler)) + } + /** * Converts an Observable into a [[rx.lang.scala.observables.BlockingObservable]] (an Observable with blocking * operators). diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index 6e36756d54..1de16eab50 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -143,6 +143,8 @@ class CompletenessTest extends JUnitSuite { "mergeDelayError(Observable[_ <: T], Observable[_ <: T])" -> "mergeDelayError(Observable[U])", "mergeDelayError(Observable[_ <: Observable[_ <: T]])" -> "flattenDelayError(<:<[Observable[T], Observable[Observable[U]]])", "range(Int, Int)" -> "apply(Range)", + "repeat()" -> "repeat()", + "retry()" -> "retry()", "sequenceEqual(Observable[_ <: T], Observable[_ <: T])" -> "[use `(first zip second) map (p => p._1 == p._2)`]", "sequenceEqual(Observable[_ <: T], Observable[_ <: T], Func2[_ >: T, _ >: T, Boolean])" -> "[use `(first zip second) map (p => equality(p._1, p._2))`]", "sum(Observable[Integer])" -> "sum(Numeric[U])", From 1b4e2a8f429ac335c277f6dbc56d20f69c4a349e Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sun, 4 May 2014 22:57:42 +0800 Subject: [PATCH 4/7] Add 'doOnTerminate' to RxScala --- .../rx/lang/scala/examples/RxScalaDemo.scala | 20 +++++++++++++++++++ .../main/scala/rx/lang/scala/Observable.scala | 16 +++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index f8b0dba233..fcb8f83e26 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -560,6 +560,26 @@ class RxScalaDemo extends JUnitSuite { obs.toBlockingObservable.toIterable.last } + @Test def doOnTerminateExample(): Unit = { + val o = List("red", "green", "blue").toObservable.doOnTerminate(() => println("terminate")) + o.subscribe(v => println(v), e => e.printStackTrace, () => println("onCompleted")) + // red + // green + // blud + // terminate + // onCompleted + } + + @Test def finallyDoExample(): Unit = { + val o = List("red", "green", "blue").toObservable.finallyDo(() => println("finally")) + o.subscribe(v => println(v), e => e.printStackTrace, () => println("onCompleted")) + // red + // green + // blud + // onCompleted + // finally + } + @Test def timeoutExample(): Unit = { val other = List(100L, 200L, 300L).toObservable val result = Observable.interval(100 millis).timeout(50 millis, other).toBlockingObservable.toList diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index dfb507b9c2..3d3e159c33 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -2461,6 +2461,22 @@ trait Observable[+T] toScalaObservable[T](asJavaObservable.doOnEach(Observer(onNext, onError,onCompleted))) } + /** + * Modifies an Observable so that it invokes an action when it calls `onCompleted` or `onError`. + *

+ * + *

+ * This differs from `finallyDo` in that this happens BEFORE onCompleted/onError are emitted. + * + * @param onTerminate the action to invoke when the source Observable calls `onCompleted` or `onError` + * @return the source Observable with the side-effecting behavior applied + * @see RxJava Wiki: doOnTerminate() + * @see MSDN: Observable.Do + */ + def doOnTerminate(onTerminate: () => Unit): Observable[T] = { + toScalaObservable[T](asJavaObservable.doOnTerminate(onTerminate)) + } + /** * Given two Observables, mirror the one that first emits an item. * From 7e1c2d495075843a44b4885b5fe45b736ce270dc Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 6 May 2014 12:01:33 +0800 Subject: [PATCH 5/7] Add 'startWith' to RxScala --- .../rx/lang/scala/examples/RxScalaDemo.scala | 22 +++++++ .../main/scala/rx/lang/scala/Observable.scala | 60 +++++++++++++++++++ .../rx/lang/scala/CompletenessTest.scala | 8 ++- 3 files changed, 88 insertions(+), 2 deletions(-) diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index fcb8f83e26..799c5e3351 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -756,4 +756,26 @@ class RxScalaDemo extends JUnitSuite { case e: IllegalArgumentException => println("IllegalArgumentException from skipWithException") } } + + @Test def startWithExample1(): Unit = { + val o = List(2, 3).toObservable + 1 + assertEquals(List(1, 2, 3), o.toBlockingObservable.toList) + } + + @Test def startWithExample2(): Unit = { + val prepended = List(2, 4).toObservable + val o = List(5, 6, 7, 8).toObservable.filter(_ % 2 == 0).startWith(prepended) + assertEquals(List(2, 4, 6, 8), o.toBlockingObservable.toList) + } + + @Test def startWithExample3(): Unit = { + val o = List(5, 6, 7, 8).toObservable.filter(_ % 2 == 0).startWith(List(2, 4)) + assertEquals(List(2, 4, 6, 8), o.toBlockingObservable.toList) + } + + @Test def startWithExample4(): Unit = { + val o = List(5, 6, 7, 8).toObservable.filter(_ % 2 == 0).startWith(Array(2, 4)) + assertEquals(List(2, 4, 6, 8), o.toBlockingObservable.toList) + } + } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 3d3e159c33..7e8540326c 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -77,6 +77,7 @@ import collection.JavaConversions._ */ trait Observable[+T] { + import scala.collection.JavaConverters._ import scala.collection.Seq import scala.concurrent.duration.{Duration, TimeUnit} import rx.functions._ @@ -238,6 +239,65 @@ trait Observable[+T] toScalaObservable(rx.Observable.concat(o1, o2)) } + /** + * Returns an Observable that emits a specified item before it begins to emit items emitted by the source Observable. + *

+ * + * + * @param elem the item to emit + * @return an Observable that emits the specified item before it begins to emit items emitted by the source Observable + */ + def +[U >: T](elem: U): Observable[U] = { + val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]] + toScalaObservable(thisJava.startWith(elem)) + } + + /** + * Returns an Observable that emits the items in a specified `Observable` before it begins to emit + * items emitted by the source Observable. + *

+ * + * + * @param that an Observable that contains the items you want the modified Observable to emit first + * @return an Observable that emits the items in the specified `Observable` and then emits the items + * emitted by the source Observable + */ + def startWith[U >: T](that: Observable[U]): Observable[U] = { + val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]] + val thatJava = that.asJavaObservable.asInstanceOf[rx.Observable[U]] + toScalaObservable(thisJava.startWith(thatJava)) + } + + /** + * Returns an Observable that emits the items in a specified `Iterable` before it begins to emit items + * emitted by the source Observable. + *

+ * + * + * @param iterable an Iterable that contains the items you want the modified Observable to emit first + * @return an Observable that emits the items in the specified `Iterable` and then emits the items + * emitted by the source Observable + */ + def startWith[U >: T](iterable: Iterable[U]): Observable[U] = { + val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]] + toScalaObservable(thisJava.startWith(iterable.asJava)) + } + + /** + * Returns an Observable that emits the items in a specified `Iterable`, on a specified `Scheduler`, before it begins to emit items emitted by the source Observable. + *

+ * + * + * @param iterable an Iterable that contains the items you want the modified Observable to emit first + * @param scheduler the Scheduler to emit the prepended values on + * @return an Observable that emits the items in the specified `Iterable`, on a specified `Scheduler`, and then emits the items + * emitted by the source Observable + */ + def startWith[U >: T](iterable: Iterable[U], scheduler: Scheduler): Observable[U] = { + val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]] + toScalaObservable(thisJava.startWith(iterable.asJava, scalaSchedulerToJavaScheduler(scheduler))) + } + /** * Returns an Observable that emits the items emitted by several Observables, one after the * other. diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index 1de16eab50..fcbce1f2b1 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -100,7 +100,11 @@ class CompletenessTest extends JUnitSuite { "skipWhile(Func1[_ >: T, Boolean])" -> "dropWhile(T => Boolean)", "skipWhileWithIndex(Func2[_ >: T, Integer, Boolean])" -> unnecessary, "skipUntil(Observable[U])" -> "dropUntil(Observable[E])", - "startWith(Iterable[T])" -> "[unnecessary because we can just use `++` instead]", + "startWith(Array[T])" -> "startWith(Iterable[U])", + "startWith(Array[T], Scheduler)" -> "startWith(Iterable[U], Scheduler)", + "startWith(Iterable[T])" -> "startWith(Iterable[U])", + "startWith(Iterable[T], Scheduler)" -> "startWith(Iterable[U], Scheduler)", + "startWith(Observable[T])" -> "startWith(Observable[U])", "skipLast(Int)" -> "dropRight(Int)", "skipLast(Long, TimeUnit)" -> "dropRight(Duration)", "skipLast(Long, TimeUnit, Scheduler)" -> "dropRight(Duration, Scheduler)", @@ -159,7 +163,7 @@ class CompletenessTest extends JUnitSuite { "zip(Iterable[_ <: Observable[_]], FuncN[_ <: R])" -> "[use `zip` in companion object and `map`]" ) ++ List.iterate("T", 9)(s => s + ", T").map( // all 9 overloads of startWith: - "startWith(" + _ + ")" -> "[unnecessary because we can just use `++` instead]" + "startWith(" + _ + ")" -> "[unnecessary because we can just use `+` instead]" ).toMap ++ List.iterate("Observable[_ <: T]", 9)(s => s + ", Observable[_ <: T]").map( // concat 2-9 "concat(" + _ + ")" -> "[unnecessary because we can use `++` instead or `Observable(o1, o2, ...).concat`]" From 6dcc9d3533431dfec245bf114d6793308cbc13e5 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 6 May 2014 15:31:07 +0800 Subject: [PATCH 6/7] Add 'publish' variants to RxScala --- .../rx/lang/scala/examples/RxScalaDemo.scala | 22 ++++++++ .../main/scala/rx/lang/scala/Observable.scala | 56 ++++++++++++++++++- .../rx/lang/scala/CompletenessTest.scala | 4 ++ 3 files changed, 81 insertions(+), 1 deletion(-) diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index 799c5e3351..a11ec76638 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -297,6 +297,28 @@ class RxScalaDemo extends JUnitSuite { shared.connect } + @Test def exampleWithPublish2() { + val unshared = Observable.from(1 to 4) + val shared = unshared.publish(0) + shared.subscribe(n => println(s"subscriber 1 gets $n")) + shared.subscribe(n => println(s"subscriber 2 gets $n")) + shared.connect + } + + @Test def exampleWithPublish3() { + val o = Observable.interval(100 millis).take(5).publish((o: Observable[Long]) => o.map(_ * 2)) + o.subscribe(n => println(s"subscriber 1 gets $n")) + o.subscribe(n => println(s"subscriber 2 gets $n")) + Thread.sleep(1000) + } + + @Test def exampleWithPublish4() { + val o = Observable.interval(100 millis).take(5).publish((o: Observable[Long]) => o.map(_ * 2), -1L) + o.subscribe(n => println(s"subscriber 1 gets $n")) + o.subscribe(n => println(s"subscriber 2 gets $n")) + Thread.sleep(1000) + } + def doLater(waitTime: Duration, action: () => Unit): Unit = { Observable.interval(waitTime).take(1).subscribe(_ => action()) } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 7e8540326c..0473189e63 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -1145,10 +1145,64 @@ trait Observable[+T] * * @return an [[rx.lang.scala.observables.ConnectableObservable]]. */ - def publish: ConnectableObservable[T] = { + def publish(): ConnectableObservable[T] = { new ConnectableObservable[T](asJavaObservable.publish()) } + + /** + * Returns an Observable that emits `initialValue` followed by the items emitted by a `ConnectableObservable` that shares a single subscription to the source Observable. + *

+ * + * + * @param initialValue the initial value to be emitted by the resulting Observable + * @return a `ConnectableObservable` that shares a single subscription to the underlying Observable and starts with `initialValue` + */ + def publish[U >: T](initialValue: U): ConnectableObservable[U] = { + val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]] + new ConnectableObservable[U](thisJava.publish(initialValue)) + } + + /** + * Returns an Observable that emits the results of invoking a specified selector on items emitted by a `ConnectableObservable` + * that shares a single subscription to the underlying sequence. + *

+ * + * + * @param selector a function that can use the multicasted source sequence as many times as needed, without + * causing multiple subscriptions to the source sequence. Subscribers to the given source will + * receive all notifications of the source from the time of the subscription forward. + * @return an Observable that emits the results of invoking the selector on the items emitted by a `ConnectableObservable` + * that shares a single subscription to the underlying sequence + */ + def publish[U >: T, R](selector: Observable[U] => Observable[R]): Observable[R] = { + val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]] + val fJava: Func1[rx.Observable[U], rx.Observable[R]] = + (jo: rx.Observable[U]) => selector(toScalaObservable[U](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]] + toScalaObservable[R](thisJava.publish(fJava)) + } + + /** + * Returns an Observable that emits `initialValue` followed by the results of invoking a specified + * selector on items emitted by a `ConnectableObservable` that shares a single subscription to the + * source Observable. + *

+ * + * + * @param selector a function that can use the multicasted source sequence as many times as needed, without + * causing multiple subscriptions to the source Observable. Subscribers to the source will + * receive all notifications of the source from the time of the subscription forward + * @param initialValue the initial value of the underlying `BehaviorSubject` + * @return an Observable that emits `initialValue` followed by the results of invoking the selector + * on a `ConnectableObservable` that shares a single subscription to the underlying Observable + */ + def publish[U >: T, R](selector: Observable[U] => Observable[R], initialValue: U): Observable[R] = { + val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]] + val fJava: Func1[rx.Observable[U], rx.Observable[R]] = + (jo: rx.Observable[U]) => selector(toScalaObservable[U](jo)).asJavaObservable.asInstanceOf[rx.Observable[R]] + toScalaObservable[R](thisJava.publish(fJava, initialValue)) + } + // TODO add Scala-like aggregate function /** diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index fcbce1f2b1..03b4d879bb 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -89,6 +89,10 @@ class CompletenessTest extends JUnitSuite { "onExceptionResumeNext(Observable[_ <: T])" -> "onExceptionResumeNext(Observable[U])", "parallel(Func1[Observable[T], Observable[R]])" -> "parallel(Observable[T] => Observable[R])", "parallel(Func1[Observable[T], Observable[R]], Scheduler)" -> "parallel(Observable[T] => Observable[R], Scheduler)", + "publish()" -> "publish()", + "publish(T)" -> "publish(U)", + "publish(Func1[_ >: Observable[T], _ <: Observable[R]])" -> "publish(Observable[U] => Observable[R])", + "publish(Func1[_ >: Observable[T], _ <: Observable[R]], T)" -> "publish(Observable[U] => Observable[R], U)", "reduce(Func2[T, T, T])" -> "reduce((U, U) => U)", "reduce(R, Func2[R, _ >: T, R])" -> "foldLeft(R)((R, T) => R)", "retry()" -> "retry()", From 8397f742884c01199534ffae564675d9223319ee Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 6 May 2014 22:10:04 +0800 Subject: [PATCH 7/7] Replace '+' with '::' --- .../examples/scala/rx/lang/scala/examples/RxScalaDemo.scala | 5 +++-- .../src/main/scala/rx/lang/scala/Observable.scala | 2 +- .../src/test/scala/rx/lang/scala/CompletenessTest.scala | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index a11ec76638..ac61b8d3e9 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -780,8 +780,9 @@ class RxScalaDemo extends JUnitSuite { } @Test def startWithExample1(): Unit = { - val o = List(2, 3).toObservable + 1 - assertEquals(List(1, 2, 3), o.toBlockingObservable.toList) + val o1 = List(3, 4).toObservable + val o2 = 1 :: 2 :: o1 + assertEquals(List(1, 2, 3, 4), o2.toBlockingObservable.toList) } @Test def startWithExample2(): Unit = { diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 0473189e63..1b32033e4b 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -247,7 +247,7 @@ trait Observable[+T] * @param elem the item to emit * @return an Observable that emits the specified item before it begins to emit items emitted by the source Observable */ - def +[U >: T](elem: U): Observable[U] = { + def ::[U >: T](elem: U): Observable[U] = { val thisJava = this.asJavaObservable.asInstanceOf[rx.Observable[U]] toScalaObservable(thisJava.startWith(elem)) } diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index 03b4d879bb..1dd6436060 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -167,7 +167,7 @@ class CompletenessTest extends JUnitSuite { "zip(Iterable[_ <: Observable[_]], FuncN[_ <: R])" -> "[use `zip` in companion object and `map`]" ) ++ List.iterate("T", 9)(s => s + ", T").map( // all 9 overloads of startWith: - "startWith(" + _ + ")" -> "[unnecessary because we can just use `+` instead]" + "startWith(" + _ + ")" -> "[unnecessary because we can just use `::` instead]" ).toMap ++ List.iterate("Observable[_ <: T]", 9)(s => s + ", Observable[_ <: T]").map( // concat 2-9 "concat(" + _ + ")" -> "[unnecessary because we can use `++` instead or `Observable(o1, o2, ...).concat`]"