Skip to content

Commit 29162d7

Browse files
Merge pull request #781 from Applied-Duality/join
Fixed buglet in join binding, simplified types
2 parents 64b984e + e359f3c commit 29162d7

File tree

4 files changed

+72
-30
lines changed

4 files changed

+72
-30
lines changed

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/Olympics.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ object Olympics {
8080
// So we don't use this:
8181
// Observable.interval(fourYears).take(1).map(i => neverUsedDummyMedal).filter(m => false)
8282
// But we just return empty, which completes immediately
83-
Observable.empty[Medal]
83+
Observable.empty
8484
}
8585

8686
}

language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -243,13 +243,10 @@ class RxScalaDemo extends JUnitSuite {
243243
}
244244

245245
@Test def groupByUntilExample() {
246-
val numbers = Observable.interval(250 millis) take 14
247-
val grouped = numbers.groupByUntil[Long, Long](
248-
{case x => x % 2},
249-
{case (key, obs) => obs filter {case x => x == 7}}
250-
)
251-
val sequenced = (grouped map {case (key, obs) => obs.toSeq}).flatten
252-
sequenced subscribe {x => println(s"Emitted group: $x")}
246+
val numbers = Observable.interval(250 millis).take(14)
247+
val grouped = numbers.groupByUntil[Long](x => x % 2, {case (key, obs) => obs.filter(x => x == 7)})
248+
val sequenced = (grouped.map({ case (key, obs) => obs.toSeq })).flatten
249+
sequenced.subscribe(x => println(s"Emitted group: $x"))
253250
}
254251

255252

@@ -312,7 +309,7 @@ class RxScalaDemo extends JUnitSuite {
312309
}
313310

314311
@Test def averageExample() {
315-
println(doubleAverage(Observable.empty[Double]).toBlockingObservable.single)
312+
println(doubleAverage(Observable.empty).toBlockingObservable.single)
316313
println(doubleAverage(List(0.0).toObservable).toBlockingObservable.single)
317314
println(doubleAverage(List(4.44).toObservable).toBlockingObservable.single)
318315
println(doubleAverage(List(1, 2, 3.5).toObservable).toBlockingObservable.single)

language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala

Lines changed: 59 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -353,10 +353,10 @@ trait Observable[+T]
353353
* @return
354354
* An [[rx.lang.scala.Observable]] which produces buffers which are created and emitted when the specified [[rx.lang.scala.Observable]]s publish certain objects.
355355
*/
356-
def buffer[Opening, Closing](openings: Observable[Opening], closings: Opening => Observable[Closing]): Observable[Seq[T]] = {
356+
def buffer[Opening](openings: Observable[Opening], closings: Opening => Observable[Any]): Observable[Seq[T]] = {
357357
val opening: rx.Observable[_ <: Opening] = openings.asJavaObservable
358-
val closing: Func1[_ >: Opening, _ <: rx.Observable[_ <: Closing]] = (o: Opening) => closings(o).asJavaObservable
359-
val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer[Opening, Closing](opening, closing)
358+
val closing: Func1[_ >: Opening, _ <: rx.Observable[_ <: Any]] = (o: Opening) => closings(o).asJavaObservable
359+
val jObs: rx.Observable[_ <: java.util.List[_]] = asJavaObservable.buffer[Opening, Any](opening, closing)
360360
Observable.jObsOfListToScObsOfSeq(jObs.asInstanceOf[rx.Observable[_ <: java.util.List[T]]])
361361
}
362362

@@ -540,9 +540,9 @@ trait Observable[+T]
540540
* An [[rx.lang.scala.Observable]] which produces connected non-overlapping windows, which are emitted
541541
* when the current [[rx.lang.scala.Observable]] created with the function argument produces an object.
542542
*/
543-
def window[Closing](closings: () => Observable[Closing]): Observable[Observable[T]] = {
544-
val func : Func0[_ <: rx.Observable[_ <: Closing]] = closings().asJavaObservable
545-
val o1: rx.Observable[_ <: rx.Observable[_]] = asJavaObservable.window[Closing](func)
543+
def window(closings: () => Observable[Any]): Observable[Observable[T]] = {
544+
val func : Func0[_ <: rx.Observable[_ <: Any]] = closings().asJavaObservable
545+
val o1: rx.Observable[_ <: rx.Observable[_]] = asJavaObservable.window[Any](func)
546546
val o2 = Observable.items(o1).map((x: rx.Observable[_]) => {
547547
val x2 = x.asInstanceOf[rx.Observable[_ <: T]]
548548
toScalaObservable[T](x2)
@@ -566,9 +566,9 @@ trait Observable[+T]
566566
* @return
567567
* An [[rx.lang.scala.Observable]] which produces windows which are created and emitted when the specified [[rx.lang.scala.Observable]]s publish certain objects.
568568
*/
569-
def window[Opening, Closing](openings: Observable[Opening], closings: Opening => Observable[Closing]) = {
569+
def window[Opening](openings: Observable[Opening], closings: Opening => Observable[Any]) = {
570570
Observable.jObsOfJObsToScObsOfScObs(
571-
asJavaObservable.window[Opening, Closing](openings.asJavaObservable, (op: Opening) => closings(op).asJavaObservable))
571+
asJavaObservable.window[Opening, Any](openings.asJavaObservable, (op: Opening) => closings(op).asJavaObservable))
572572
: Observable[Observable[T]] // SI-7818
573573
}
574574

@@ -1336,19 +1336,61 @@ trait Observable[+T]
13361336
* an observable that emits a single Closing when the group should be closed.
13371337
* @tparam K
13381338
* the type of the keys returned by the discriminator function.
1339-
* @tparam Closing
1340-
* the type of the element emitted from the closings observable.
13411339
* @return an Observable that emits `(key, observable)` pairs, where `observable`
13421340
* contains all items for which `f` returned `key` before `closings` emits a value.
13431341
*/
1344-
def groupByUntil[K, Closing](f: T => K, closings: (K, Observable[T])=>Observable[Closing]): Observable[(K, Observable[T])] = {
1345-
val fclosing: Func1[_ >: rx.observables.GroupedObservable[K, _ <: T], _ <: rx.Observable[_ <: Closing]] =
1342+
def groupByUntil[K](f: T => K, closings: (K, Observable[T])=>Observable[Any]): Observable[(K, Observable[T])] = {
1343+
val fclosing: Func1[_ >: rx.observables.GroupedObservable[K, _ <: T], _ <: rx.Observable[_ <: Any]] =
13461344
(jGrObs: rx.observables.GroupedObservable[K, _ <: T]) => closings(jGrObs.getKey, toScalaObservable[T](jGrObs)).asJavaObservable
1347-
val o1 = asJavaObservable.groupByUntil[K, Closing](f, fclosing) : rx.Observable[_ <: rx.observables.GroupedObservable[K, _ <: T]]
1345+
val o1 = asJavaObservable.groupByUntil[K, Any](f, fclosing) : rx.Observable[_ <: rx.observables.GroupedObservable[K, _ <: T]]
13481346
val func = (o: rx.observables.GroupedObservable[K, _ <: T]) => (o.getKey, toScalaObservable[T](o))
13491347
toScalaObservable[(K, Observable[T])](o1.map[(K, Observable[T])](func))
13501348
}
13511349

1350+
/**
1351+
* Correlates the items emitted by two Observables based on overlapping durations.
1352+
* <p>
1353+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/join_.png">
1354+
*
1355+
* @param other
1356+
* the second Observable to join items from
1357+
* @param leftDurationSelector
1358+
* a function to select a duration for each item emitted by the source Observable,
1359+
* used to determine overlap
1360+
* @param rightDurationSelector
1361+
* a function to select a duration for each item emitted by the inner Observable,
1362+
* used to determine overlap
1363+
* @param resultSelector
1364+
* a function that computes an item to be emitted by the resulting Observable for any
1365+
* two overlapping items emitted by the two Observables
1366+
* @return
1367+
* an Observable that emits items correlating to items emitted by the source Observables
1368+
* that have overlapping durations
1369+
* @see <a href="https://github.com/Netflix/RxJava/wiki/Combining-Observables#join">RxJava Wiki: join()</a>
1370+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229750.aspx">MSDN: Observable.Join</a>
1371+
*/
1372+
def join[S, R] (
1373+
other: Observable[S],
1374+
leftDurationSelector: T => Observable[Any],
1375+
rightDurationSelector: S => Observable[Any],
1376+
resultSelector: (T,S) => R
1377+
): Observable[R] = {
1378+
1379+
val outer : rx.Observable[_ <: T] = this.asJavaObservable
1380+
val inner : rx.Observable[_ <: S] = other.asJavaObservable
1381+
val left: Func1[_ >: T, _<: rx.Observable[_ <: Any]] = (t: T) => leftDurationSelector(t).asJavaObservable
1382+
val right: Func1[_ >: S, _<: rx.Observable[_ <: Any]] = (s: S) => rightDurationSelector(s).asJavaObservable
1383+
val f: Func2[_>: T, _ >: S, _ <: R] = resultSelector
1384+
1385+
toScalaObservable[R](
1386+
outer.asInstanceOf[rx.Observable[T]].join[S, Any, Any, R](
1387+
inner.asInstanceOf[rx.Observable[S]],
1388+
left. asInstanceOf[Func1[T, rx.Observable[Any]]],
1389+
right.asInstanceOf[Func1[S, rx.Observable[Any]]],
1390+
f.asInstanceOf[Func2[T,S,R]])
1391+
)
1392+
}
1393+
13521394
/**
13531395
* Given an Observable that emits Observables, creates a single Observable that
13541396
* emits the items emitted by the most recently published of those Observables.
@@ -2136,18 +2178,15 @@ object Observable {
21362178
* <p>
21372179
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/empty.s.png">
21382180
*
2139-
* @param scheduler the scheduler to call the
2140-
[[rx.lang.scala.Observer#onCompleted onCompleted]] method
2141-
* @param T the type of the items (ostensibly) emitted by the Observable
21422181
* @return an Observable that returns no data to the [[rx.lang.scala.Observer]] and
21432182
* immediately invokes the [[rx.lang.scala.Observer]]r's
21442183
* [[rx.lang.scala.Observer#onCompleted onCompleted]] method with the
21452184
* specified scheduler
21462185
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#empty-error-and-never">RxJava Wiki: empty()</a>
21472186
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229066.aspx">MSDN: Observable.Empty Method (IScheduler)</a>
21482187
*/
2149-
def empty[T]: Observable[T] = {
2150-
toScalaObservable(rx.Observable.empty[T]())
2188+
def empty: Observable[Nothing] = {
2189+
toScalaObservable(rx.Observable.empty[Nothing]())
21512190
}
21522191

21532192
/**
@@ -2159,16 +2198,15 @@ object Observable {
21592198
*
21602199
* @param scheduler the scheduler to call the
21612200
[[rx.lang.scala.Observer#onCompleted onCompleted]] method
2162-
* @param T the type of the items (ostensibly) emitted by the Observable
21632201
* @return an Observable that returns no data to the [[rx.lang.scala.Observer]] and
21642202
* immediately invokes the [[rx.lang.scala.Observer]]r's
21652203
* [[rx.lang.scala.Observer#onCompleted onCompleted]] method with the
21662204
* specified scheduler
21672205
* @see <a href="https://github.com/Netflix/RxJava/wiki/Creating-Observables#empty-error-and-never">RxJava Wiki: empty()</a>
21682206
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229066.aspx">MSDN: Observable.Empty Method (IScheduler)</a>
21692207
*/
2170-
def empty[T](scheduler: Scheduler): Observable[T] = {
2171-
toScalaObservable(rx.Observable.empty[T](scalaSchedulerToJavaScheduler(scheduler)))
2208+
def empty(scheduler: Scheduler): Observable[Nothing] = {
2209+
toScalaObservable(rx.Observable.empty[Nothing](scalaSchedulerToJavaScheduler(scheduler)))
21722210
}
21732211

21742212
/**

language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,13 @@ class ObservableTests extends JUnitSuite {
123123
assertEquals(6, o.toBlockingObservable.single)
124124
}
125125

126+
@Test def testJoin() {
127+
val xs = Observable.items(1,2,3)
128+
val ys = Observable.items("a")
129+
val zs = xs.join[String,String](ys, x => Observable.never, y => Observable.never, (x,y) => y+x)
130+
assertEquals(List("a1", "a2", "a3"),zs.toBlockingObservable.toList)
131+
}
132+
126133
/*
127134
@Test def testHead() {
128135
val observer = mock(classOf[Observer[Int]])

0 commit comments

Comments
 (0)