From ed10ea68d034a8af5066c84a87d6e8545fc92174 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 3 Jun 2014 11:03:32 +0800 Subject: [PATCH 01/21] Add onErrorFlatMap to RxScala --- .../rx/lang/scala/examples/RxScalaDemo.scala | 34 +++++++++++++++++++ .../main/scala/rx/lang/scala/Observable.scala | 24 +++++++++++++ .../rx/lang/scala/CompletenessTest.scala | 1 + 3 files changed, 59 insertions(+) diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index 2c432adf36..1ba6d2ca04 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -1176,4 +1176,38 @@ class RxScalaDemo extends JUnitSuite { .take(20) .toBlocking.foreach(println) } + + @Test def onErrorResumeNextExample() { + val o = Observable { + (subscriber: Subscriber[Int]) => + subscriber.onNext(1) + subscriber.onNext(2) + subscriber.onError(new IOException("Oops")) + subscriber.onNext(3) + subscriber.onNext(4) + } + o.onErrorResumeNext(_ => Observable.items(10, 11, 12)).subscribe(println(_)) + } + + @Test def onErrorFlatMapExample() { + val o = Observable { + (subscriber: Subscriber[Int]) => + subscriber.onNext(1) + subscriber.onNext(2) + subscriber.onError(new IOException("Oops")) + subscriber.onNext(3) + subscriber.onNext(4) + } + o.onErrorFlatMap((_, _) => Observable.items(10, 11, 12)).subscribe(println(_)) + } + + @Test def onErrorFlatMapExample2() { + val o = Observable.items(4, 2, 0).map(16 / _).onErrorFlatMap { + (e, op) => op match { + case Some(v) if v == 0 => Observable.items(Int.MinValue) + case _ => Observable.empty + } + } + o.subscribe(println(_)) + } } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index d3c0621341..f1c22b357f 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -1208,6 +1208,30 @@ trait Observable[+T] toScalaObservable[U](asJavaObservable.onErrorReturn(f2)) } + /** + * Intercepts `onError` notifications from the source Observable and replaces them with the + * `onNext` emissions of an Observable returned by a specified function. This allows the source + * sequence to continue even if it issues multiple `onError` notifications. + * + * + * + * @param resumeFunction a function that accepts an `Throwable` and an `Option` associated with this error representing + * the Throwable issued by the source Observable, and returns an Observable that emits items + * that will be emitted in place of the error. If no value is associated with the error, the value + * will be `None`. + * @return the original Observable, with appropriately modified behavior + */ + def onErrorFlatMap[U >: T](resumeFunction: (Throwable, Option[Any]) => Observable[U]): Observable[U] = { + val f = new Func1[rx.exceptions.OnErrorThrowable, rx.Observable[_ <: U]] { + override def call(t: rx.exceptions.OnErrorThrowable): rx.Observable[_ <: U] = { + val v = if (t.isValueNull) Some(t.getValue) else None + resumeFunction(t.getCause, v).asJavaObservable + } + } + val thisJava = asJavaObservable.asInstanceOf[rx.Observable[U]] + toScalaObservable[U](thisJava.onErrorFlatMap(f)) + } + /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index d8a9528daf..108c7b6ad5 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -106,6 +106,7 @@ class CompletenessTest extends JUnitSuite { "onErrorResumeNext(Observable[_ <: T])" -> "onErrorResumeNext(Observable[U])", "onErrorReturn(Func1[Throwable, _ <: T])" -> "onErrorReturn(Throwable => U)", "onExceptionResumeNext(Observable[_ <: T])" -> "onExceptionResumeNext(Observable[U])", + "onErrorFlatMap(Func1[OnErrorThrowable, _ <: Observable[_ <: T]])" -> "onErrorFlatMap((Throwable, Option[Any]) => Observable[U])", "parallel(Func1[Observable[T], Observable[R]])" -> "parallel(Observable[T] => Observable[R])", "parallel(Func1[Observable[T], Observable[R]], Scheduler)" -> "parallel(Observable[T] => Observable[R], Scheduler)", "publish(T)" -> "publish(U)", From bd444259a11a7652bed0e913c17aaeb379f462fd Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 3 Jun 2014 11:34:47 +0800 Subject: [PATCH 02/21] Add nest to RxScala --- .../src/main/scala/rx/lang/scala/Observable.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index f1c22b357f..d02063b839 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -3626,6 +3626,17 @@ trait Observable[+T] def lift[R](operator: Subscriber[R] => Subscriber[T]): Observable[R] = { toScalaObservable(asJavaObservable.lift(toJavaOperator[T, R](operator))) } + + /** + * Converts the source `Observable[T]` into an `Observable[Observable[T]]` that emits the source Observable as its single emission. + * + * + * + * @return an Observable that emits a single item: the source Observable + */ + def nest: Observable[Observable[T]] = { + toScalaObservable(asJavaObservable.nest).map(toScalaObservable[T](_)) + } } /** From 2ba5fe8d7faeaf7d932c38e0594f6a378598c5be Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 3 Jun 2014 13:09:21 +0800 Subject: [PATCH 03/21] Add flatten variant to RxScala --- .../rx/lang/scala/examples/RxScalaDemo.scala | 20 ++++++++++++++++- .../main/scala/rx/lang/scala/Observable.scala | 22 +++++++++++++++++++ .../rx/lang/scala/CompletenessTest.scala | 7 ++++++ 3 files changed, 48 insertions(+), 1 deletion(-) diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index 1ba6d2ca04..e1845e9c2d 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -130,7 +130,7 @@ class RxScalaDemo extends JUnitSuite { o.flatten.takeUntil(stopper).toBlockingObservable.foreach(println(_)) } - @Test def fattenSomeExample() { + @Test def flattenSomeExample() { // To merge some observables which are all known already: List( Observable.interval(200 millis), @@ -139,6 +139,24 @@ class RxScalaDemo extends JUnitSuite { ).toObservable.flatten.take(12).toBlockingObservable.foreach(println(_)) } + @Test def flattenExample() { + List( + Observable.interval(200 millis).map(_ => 1).take(5), + Observable.interval(200 millis).map(_ => 2).take(5), + Observable.interval(200 millis).map(_ => 3).take(5), + Observable.interval(200 millis).map(_ => 4).take(5) + ).toObservable.flatten.toBlocking.foreach(println(_)) + } + + @Test def flattenExample2() { + List( + Observable.interval(200 millis).map(_ => 1).take(5), + Observable.interval(200 millis).map(_ => 2).take(5), + Observable.interval(200 millis).map(_ => 3).take(5), + Observable.interval(200 millis).map(_ => 4).take(5) + ).toObservable.flatten(2).toBlocking.foreach(println(_)) + } + @Test def rangeAndBufferExample() { val o = Observable.from(1 to 18) o.buffer(5).subscribe((l: Seq[Int]) => println(l.mkString("[", ", ", "]"))) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index d02063b839..80845545cd 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -2329,6 +2329,28 @@ trait Observable[+T] toScalaObservable[U](o5) } + /** + * Flattens an Observable that emits Observables into a single Observable that emits the items emitted by + * those Observables, without any transformation, while limiting the maximum number of concurrent + * subscriptions to these Observables. + * + * + * + * You can combine the items emitted by multiple Observables so that they appear as a single Observable, by + * using the `flatten` method. + * + * @param maxConcurrent the maximum number of Observables that may be subscribed to concurrently + * @return an Observable that emits items that are the result of flattening the Observables emitted by the `source` Observable + * @throws IllegalArgumentException if `maxConcurrent` is less than or equal to 0 + */ + def flatten[U](maxConcurrent: Int)(implicit evidence: Observable[T] <:< Observable[Observable[U]]): Observable[U] = { + val o2: Observable[Observable[U]] = this + val o3: Observable[rx.Observable[_ <: U]] = o2.map(_.asJavaObservable) + val o4: rx.Observable[_ <: rx.Observable[_ <: U]] = o3.asJavaObservable + val o5 = rx.Observable.merge[U](o4, maxConcurrent) + toScalaObservable[U](o5) + } + /** * This behaves like `flatten` except that if any of the merged Observables * notify of an error via [[rx.lang.scala.Observer.onError onError]], this method will diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index 108c7b6ad5..905df6bc4e 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -191,6 +191,13 @@ class CompletenessTest extends JUnitSuite { "just(T, Scheduler)" -> "[use `items(T*).subscribeOn(scheduler)`]", "merge(Observable[_ <: T], Observable[_ <: T])" -> "merge(Observable[U])", "merge(Observable[_ <: Observable[_ <: T]])" -> "flatten(<:<[Observable[T], Observable[Observable[U]]])", + "merge(Observable[_ <: Observable[_ <: T]], Int)" -> "flatten(Int)(<:<[Observable[T], Observable[Observable[U]]])", + "merge(Array[Observable[_ <: T]])" -> "[use `Observable.from(array).flatten`]", + "merge(Array[Observable[_ <: T]], Scheduler)" -> "[use `Observable.from(array, scheduler).flatten`]", + "merge(Iterable[_ <: Observable[_ <: T]])" -> "[use `Observable.from(iter).flatten`]", + "merge(Iterable[_ <: Observable[_ <: T]], Int)" -> "[use `Observable.from(iter).flatten(n)`]", + "merge(Iterable[_ <: Observable[_ <: T]], Int, Scheduler)" -> "[use `Observable.from(iter, scheduler).flatten(n)]", + "merge(Iterable[_ <: Observable[_ <: T]], Scheduler)" -> "[use `Observable.from(iter, scheduler).flatten`]", "mergeDelayError(Observable[_ <: T], Observable[_ <: T])" -> "mergeDelayError(Observable[U])", "mergeDelayError(Observable[_ <: Observable[_ <: T]])" -> "flattenDelayError(<:<[Observable[T], Observable[Observable[U]]])", "parallelMerge(Observable[Observable[T]], Int)" -> "parallelMerge(Int)(<:<[Observable[T], Observable[Observable[U]]])", From 03148a3428e754c61897a7adbab49a1016a9cd1f Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 3 Jun 2014 16:16:08 +0800 Subject: [PATCH 04/21] Add switchMap to RxScala --- .../rx/lang/scala/examples/RxScalaDemo.scala | 7 +++++++ .../main/scala/rx/lang/scala/Observable.scala | 17 +++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index e1845e9c2d..a9f088ed14 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -1228,4 +1228,11 @@ class RxScalaDemo extends JUnitSuite { } o.subscribe(println(_)) } + + @Test def switchMapExample() { + val o = Observable.interval(300 millis).take(5).switchMap[String] { + n => Observable.interval(50 millis).take(10).map(i => s"Seq ${n}: ${i}") + } + o.toBlocking.foreach(println) + } } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 80845545cd..7119a8eb2a 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -2237,6 +2237,23 @@ trait Observable[+T] ) } + /** + * Returns a new Observable by applying a function that you supply to each item emitted by the source + * Observable that returns an Observable, and then emitting the items emitted by the most recently emitted + * of these Observables. + * + * + * + * @param f a function that, when applied to an item emitted by the source Observable, returns an Observable + * @return an Observable that emits the items emitted by the Observable returned from applying a function to + * the most recently emitted item emitted by the source Observable + */ + def switchMap[R](f: T => Observable[R]): Observable[R] = { + toScalaObservable[R](asJavaObservable.switchMap[R](new Func1[T, rx.Observable[_ <: R]] { + def call(t: T): rx.Observable[_ <: R] = f(t).asJavaObservable + })) + } + /** * Given an Observable that emits Observables, creates a single Observable that * emits the items emitted by the most recently published of those Observables. From 54a47935d66ca1fa0af073ffeeaf772ef701517c Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 3 Jun 2014 17:33:57 +0800 Subject: [PATCH 05/21] Add retry variant to RxScala --- .../rx/lang/scala/examples/RxScalaDemo.scala | 22 +++++++++++++++++++ .../main/scala/rx/lang/scala/Observable.scala | 16 ++++++++++++++ .../rx/lang/scala/CompletenessTest.scala | 1 + 3 files changed, 39 insertions(+) diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index a9f088ed14..34f5b7ebfb 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -875,6 +875,28 @@ class RxScalaDemo extends JUnitSuite { assertEquals(List("alice", "bob", "carol"), o.retry(3).toBlockingObservable.toList) } + @Test def retryExample3(): Unit = { + var isFirst = true + val o = Observable { + (subscriber: Subscriber[String]) => + if (isFirst) { + subscriber.onNext("alice") + subscriber.onError(new IOException("Oops")) + isFirst = false + } + else { + subscriber.onNext("bob") + subscriber.onError(new RuntimeException("Oops")) + } + } + o.retry { + (times, e) => e match { + case e: IOException => times <= 3 + case _ => false + } + }.subscribe(s => println(s), e => e.printStackTrace()) + } + @Test def liftExample1(): Unit = { // Add "No. " in front of each item val o = List(1, 2, 3).toObservable.lift { diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 7119a8eb2a..c1766b5444 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -3086,6 +3086,22 @@ trait Observable[+T] toScalaObservable[T](asJavaObservable.retry()) } + /** + * Returns an Observable that mirrors the source Observable, resubscribing to it if it calls `onError` + * and the predicate returns true for that specific exception and retry count. + * + * + * + * @param predicate the predicate that determines if a resubscription may happen in case of a specific exception and retry count + * @return the source Observable modified with retry logic + */ + def retry(predicate: (Int, Throwable) => Boolean): Observable[T] = { + val f = new Func2[java.lang.Integer, Throwable, java.lang.Boolean] { + def call(times: java.lang.Integer, e: Throwable): java.lang.Boolean = predicate(times, e) + } + toScalaObservable[T](asJavaObservable.retry(f)) + } + /** * Returns an Observable that repeats the sequence of items emitted by the source Observable indefinitely. *

diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index 905df6bc4e..33aa94066d 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -123,6 +123,7 @@ class CompletenessTest extends JUnitSuite { "replay(Func1[_ >: Observable[T], _ <: Observable[R]], Long, TimeUnit)" -> "replay(Observable[U] => Observable[R], Duration)", "replay(Func1[_ >: Observable[T], _ <: Observable[R]], Long, TimeUnit, Scheduler)" -> "replay(Observable[U] => Observable[R], Duration, Scheduler)", "replay(Func1[_ >: Observable[T], _ <: Observable[R]], Scheduler)" -> "replay(Observable[U] => Observable[R], Scheduler)", + "retry(Func2[Integer, Throwable, Boolean])" -> "retry((Int, Throwable) => Boolean)", "sample(Observable[U])" -> "sample(Observable[Any])", "scan(Func2[T, T, T])" -> unnecessary, "scan(R, Func2[R, _ >: T, R])" -> "scan(R)((R, T) => R)", From 31412786a579645a53e83a667a3aba14618dcccc Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 3 Jun 2014 20:48:17 +0800 Subject: [PATCH 06/21] Add error variant to RxScala --- .../src/main/scala/rx/lang/scala/Observable.scala | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index c1766b5444..de2e3a88c2 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -3811,6 +3811,21 @@ object Observable { toScalaObservable[T](rx.Observable.error(exception)) } + /** + * Returns an Observable that invokes an `Observer`'s `onError` method on the + * specified Scheduler. + * + * + * + * @param exception the particular Throwable to pass to `onError` + * @param scheduler the Scheduler on which to call `onError` + * @tparam T the type of the items (ostensibly) emitted by the Observable + * @return an Observable that invokes the `Observer`'s `onError` method, on the specified Scheduler + */ + def error[T](exception: Throwable, scheduler: Scheduler): Observable[T] = { + toScalaObservable[T](rx.Observable.error(exception, scheduler)) + } + /** * Returns an Observable that emits no data to the [[rx.lang.scala.Observer]] and * immediately invokes its [[rx.lang.scala.Observer#onCompleted onCompleted]] method From 8aae497af0aba87ab11685dcf0017b7ab6105ad8 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 3 Jun 2014 21:14:32 +0800 Subject: [PATCH 07/21] Add foreach to RxScala --- .../main/scala/rx/lang/scala/Observable.scala | 42 +++++++++++++++++++ .../rx/lang/scala/CompletenessTest.scala | 3 ++ 2 files changed, 45 insertions(+) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index de2e3a88c2..220c5fd4f8 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -3692,6 +3692,48 @@ trait Observable[+T] def nest: Observable[Observable[T]] = { toScalaObservable(asJavaObservable.nest).map(toScalaObservable[T](_)) } + + /** + * Subscribes to the [[Observable]] and receives notifications for each element. + * + * Alias to `subscribe(T => Unit)`. + * + * @param onNext function to execute for each item. + * @throws IllegalArgumentException if `onNext` is null + * @since 0.19 + */ + def foreach(onNext: T => Unit): Unit = { + asJavaObservable.subscribe(onNext) + } + + /** + * Subscribes to the [[Observable]] and receives notifications for each element and error events. + * + * Alias to `subscribe(T => Unit, Throwable => Unit)`. + * + * @param onNext function to execute for each item. + * @param onError function to execute when an error is emitted. + * @throws IllegalArgumentException if `onNext` is null, or if `onError` is null + * @since 0.19 + */ + def foreach(onNext: T => Unit, onError: Throwable => Unit): Unit = { + asJavaObservable.subscribe(onNext, onError) + } + + /** + * Subscribes to the [[Observable]] and receives notifications for each element and the terminal events. + * + * Alias to `subscribe(T => Unit, Throwable => Unit, () => Unit)`. + * + * @param onNext function to execute for each item. + * @param onError function to execute when an error is emitted. + * @param onComplete function to execute when completion is signalled. + * @throws IllegalArgumentException if `onNext` is null, or if `onError` is null, or if `onComplete` is null + * @since 0.19 + */ + def foreach(onNext: T => Unit, onError: Throwable => Unit, onComplete: () => Unit): Unit = { + asJavaObservable.subscribe(onNext, onError, onComplete) + } } /** diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index 33aa94066d..413e5c441a 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -86,6 +86,9 @@ class CompletenessTest extends JUnitSuite { "first(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate, "firstOrDefault(T)" -> "firstOrElse(=> U)", "firstOrDefault(T, Func1[_ >: T, Boolean])" -> "[use `.filter(condition).firstOrElse(default)`]", + "forEach(Action1[_ >: T])" -> "foreach(T => Unit)", + "forEach(Action1[_ >: T], Action1[Throwable])" -> "foreach(T => Unit, Throwable => Unit)", + "forEach(Action1[_ >: T], Action1[Throwable], Action0)" -> "foreach(T => Unit, Throwable => Unit, () => Unit)", "groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "[use `groupBy` and `map`]", "groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: GroupedObservable[TKey, T], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K, (K, Observable[T]) => Observable[Any])", "ignoreElements()" -> "[use `filter(_ => false)`]", From 6f85e545c3185b52402cd84f2d8731b01087fdd4 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 3 Jun 2014 21:44:59 +0800 Subject: [PATCH 08/21] Add combineLatest variant to RxScala --- .../rx/lang/scala/examples/RxScalaDemo.scala | 11 ++++++++++ .../main/scala/rx/lang/scala/Observable.scala | 20 +++++++++++++++++++ .../rx/lang/scala/CompletenessTest.scala | 1 + 3 files changed, 32 insertions(+) diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index 34f5b7ebfb..626ffe0252 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -304,6 +304,17 @@ class RxScalaDemo extends JUnitSuite { waitFor(combinedCounter) } + @Test def combineLatestExample2() { + val firstCounter = Observable.interval(250 millis) + val secondCounter = Observable.interval(550 millis) + val thirdCounter = Observable.interval(850 millis) + val sources = Seq(firstCounter, secondCounter, thirdCounter) + val combinedCounter = Observable.combineLatest(sources, (items: Seq[Long]) => items.toList).take(10) + + combinedCounter subscribe {x => println(s"Emitted group: $x")} + waitFor(combinedCounter) + } + @Test def olympicsExampleWithoutPublish() { val medals = Olympics.mountainBikeMedals.doOnEach(_ => println("onNext")) medals.subscribe(println(_)) // triggers an execution of medals Observable diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 220c5fd4f8..1c62c8a3af 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -4185,6 +4185,26 @@ object Observable { def amb[T](sources: Observable[T]*): Observable[T] = { toScalaObservable[T](rx.Observable.amb[T](sources.map(_.asJavaObservable).asJava)) } + + /** + * Combines a list of source Observables by emitting an item that aggregates the latest values of each of + * the source Observables each time an item is received from any of the source Observables, where this + * aggregation is defined by a specified function. + * + * @tparam T the common base type of source values + * @tparam R the result type + * @param sources the list of source Observables + * @param combineFunction the aggregation function used to combine the items emitted by the source Observables + * @return an Observable that emits items that are the result of combining the items emitted by the source + * Observables by means of the given aggregation function + */ + def combineLatest[T, R](sources: Seq[Observable[T]], combineFunction: Seq[T] => R): Observable[R] = { + val jSources = new java.util.ArrayList[rx.Observable[_ <: T]](sources.map(_.asJavaObservable).asJava) + val jCombineFunction = new rx.functions.FuncN[R] { + override def call(args: java.lang.Object*): R = combineFunction(args.map(_.asInstanceOf[T])) + } + toScalaObservable[R](rx.Observable.combineLatest[T, R](jSources, jCombineFunction)) + } } diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index 413e5c441a..0691609075 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -184,6 +184,7 @@ class CompletenessTest extends JUnitSuite { "create(OnSubscribeFunc[T])" -> "create(Observer[T] => Subscription)", "create(OnSubscribe[T])" -> "apply(Subscriber[T] => Unit)", "combineLatest(Observable[_ <: T1], Observable[_ <: T2], Func2[_ >: T1, _ >: T2, _ <: R])" -> "combineLatest(Observable[U])", + "combineLatest(List[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Seq[Observable[T]], Seq[T] => R)", "concat(Observable[_ <: Observable[_ <: T]])" -> "concat(<:<[Observable[T], Observable[Observable[U]]])", "defer(Func0[_ <: Observable[_ <: T]])" -> "defer(=> Observable[T])", "from(Array[T])" -> "[use `items(T*)`]", From 39237865b667261b4d97c601e7bdeaa314641427 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 3 Jun 2014 22:02:10 +0800 Subject: [PATCH 09/21] Add orElse to RxScala --- .../main/scala/rx/lang/scala/Observable.scala | 20 +++++++++++++++++++ .../rx/lang/scala/CompletenessTest.scala | 1 + .../scala/rx/lang/scala/ObservableTest.scala | 12 +++++++++++ 3 files changed, 33 insertions(+) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 1c62c8a3af..b3e4bb3f8d 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -2975,6 +2975,26 @@ trait Observable[+T] } } + /** + * Returns an Observable that emits the items emitted by the source Observable or a specified default item + * if the source Observable is empty. + * + * + * + * @param default the item to emit if the source Observable emits no items. This is a by-name parameter, so it is + * only evaluated if the source Observable doesn't emit anything. + * @return an Observable that emits either the specified default item if the source Observable emits no + * items, or the items emitted by the source Observable + */ + def orElse[U >: T](default: => U): Observable[U] = { + val jObservableOption = map(Some(_)).asJavaObservable.asInstanceOf[rx.Observable[Option[T]]] + val o = toScalaObservable[Option[T]](jObservableOption.defaultIfEmpty(None)) + o map { + case Some(element) => element + case None => default + } + } + /** * Returns an Observable that forwards all sequentially distinct items emitted from the source Observable. * diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index 0691609075..2f10ddf3c0 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -79,6 +79,7 @@ class CompletenessTest extends JUnitSuite { "contains(Any)" -> "contains(U)", "count()" -> "length", "debounce(Func1[_ >: T, _ <: Observable[U]])" -> "debounce(T => Observable[Any])", + "defaultIfEmpty(T)" -> "orElse(=> U)", "delay(Func0[_ <: Observable[U]], Func1[_ >: T, _ <: Observable[V]])" -> "delay(() => Observable[Any], T => Observable[Any])", "delay(Func1[_ >: T, _ <: Observable[U]])" -> "delay(T => Observable[Any])", "dematerialize()" -> "dematerialize(<:<[Observable[T], Observable[Notification[U]]])", diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala index cda1395b27..a5d64335b2 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala @@ -216,4 +216,16 @@ class ObservableTests extends JUnitSuite { o.subscribe() assertFalse(called) } + + @Test + def testOrElse() { + val o = Observable.items(1, 2, 3).orElse(4) + assertEquals(List(1, 2, 3), o.toBlocking.toList) + } + + @Test + def testOrElseWithEmpty() { + val o = Observable.empty.orElse(-1) + assertEquals(List(-1), o.toBlocking.toList) + } } From d4d43a6731fd3a0ea6c1b289330c39c93b7b1792 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Tue, 3 Jun 2014 22:14:44 +0800 Subject: [PATCH 10/21] Update CompletenessTest.scala --- .../test/scala/rx/lang/scala/CompletenessTest.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index 2f10ddf3c0..89f86bf8de 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -70,12 +70,15 @@ class CompletenessTest extends JUnitSuite { "aggregate(Func2[T, T, T])" -> "reduce((U, U) => U)", "aggregate(R, Func2[R, _ >: T, R])" -> "foldLeft(R)((R, T) => R)", "all(Func1[_ >: T, Boolean])" -> "forall(T => Boolean)", + "asObservable()" -> unnecessary, "buffer(Long, Long, TimeUnit)" -> "buffer(Duration, Duration)", "buffer(Long, Long, TimeUnit, Scheduler)" -> "buffer(Duration, Duration, Scheduler)", "buffer(Func0[_ <: Observable[_ <: TClosing]])" -> "buffer(=> Observable[Any])", "buffer(Observable[B])" -> "buffer(=> Observable[Any])", "buffer(Observable[B], Int)" -> "buffer(Observable[Any], Int)", "buffer(Observable[_ <: TOpening], Func1[_ >: TOpening, _ <: Observable[_ <: TClosing]])" -> "buffer(Observable[Opening], Opening => Observable[Any])", + "cast(Class[R])" -> unnecessary, + "collect(R, Action2[R, _ >: T])" -> unnecessary, "contains(Any)" -> "contains(U)", "count()" -> "length", "debounce(Func1[_ >: T, _ <: Observable[U]])" -> "debounce(T => Observable[Any])", @@ -83,6 +86,7 @@ class CompletenessTest extends JUnitSuite { "delay(Func0[_ <: Observable[U]], Func1[_ >: T, _ <: Observable[V]])" -> "delay(() => Observable[Any], T => Observable[Any])", "delay(Func1[_ >: T, _ <: Observable[U]])" -> "delay(T => Observable[Any])", "dematerialize()" -> "dematerialize(<:<[Observable[T], Observable[Notification[U]]])", + "doOnEach(Action1[Notification[_ >: T]])" -> "[use `doOnEach(T => Unit, Throwable => Unit, () => Unit)`]", "elementAtOrDefault(Int, T)" -> "elementAtOrDefault(Int, U)", "first(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate, "firstOrDefault(T)" -> "firstOrElse(=> U)", @@ -106,6 +110,7 @@ class CompletenessTest extends JUnitSuite { "mergeMapIterable(Func1[_ >: T, _ <: Iterable[_ <: U]], Func2[_ >: T, _ >: U, _ <: R])" -> "flatMapIterable(T => Iterable[U], (T, U) => R)", "multicast(Subject[_ >: T, _ <: R])" -> "multicast(Subject[R])", "multicast(Func0[_ <: Subject[_ >: T, _ <: TIntermediate]], Func1[_ >: Observable[TIntermediate], _ <: Observable[TResult]])" -> "multicast(() => Subject[R], Observable[R] => Observable[U])", + "ofType(Class[R])" -> "[use `filter(_.isInstanceOf[Class])", "onErrorResumeNext(Func1[Throwable, _ <: Observable[_ <: T]])" -> "onErrorResumeNext(Throwable => Observable[U])", "onErrorResumeNext(Observable[_ <: T])" -> "onErrorResumeNext(Observable[U])", "onErrorReturn(Func1[Throwable, _ <: T])" -> "onErrorReturn(Throwable => U)", @@ -178,6 +183,7 @@ class CompletenessTest extends JUnitSuite { "window(Long, Long, TimeUnit, Scheduler)" -> "window(Duration, Duration, Scheduler)", // manually added entries for Java static methods + "amb(Iterable[_ <: Observable[_ <: T]])" -> "amb(Observable[T]*)", "average(Observable[Integer])" -> averageProblem, "averageDoubles(Observable[Double])" -> averageProblem, "averageFloats(Observable[Float])" -> averageProblem, @@ -188,7 +194,9 @@ class CompletenessTest extends JUnitSuite { "combineLatest(List[_ <: Observable[_ <: T]], FuncN[_ <: R])" -> "combineLatest(Seq[Observable[T]], Seq[T] => R)", "concat(Observable[_ <: Observable[_ <: T]])" -> "concat(<:<[Observable[T], Observable[Observable[U]]])", "defer(Func0[_ <: Observable[_ <: T]])" -> "defer(=> Observable[T])", - "from(Array[T])" -> "[use `items(T*)`]", + "from(Array[T])" -> "items(T*)", + "from([T])" -> "items(T*)", + "from(Array[T], Scheduler)" -> "from(Iterable[T], Scheduler)", "from(Iterable[_ <: T])" -> "from(Iterable[T])", "from(Future[_ <: T])" -> fromFuture, "from(Future[_ <: T], Long, TimeUnit)" -> fromFuture, From 1220de782cce7ee35ea90aff572ca929f4086fb4 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 4 Jun 2014 11:53:11 +0800 Subject: [PATCH 11/21] Add groupByUntil variant to RxScala --- .../rx/lang/scala/examples/RxScalaDemo.scala | 7 +++++ .../main/scala/rx/lang/scala/Observable.scala | 29 +++++++++++++++++++ .../rx/lang/scala/CompletenessTest.scala | 1 + 3 files changed, 37 insertions(+) diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index 626ffe0252..c3f99bdc75 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -294,6 +294,13 @@ class RxScalaDemo extends JUnitSuite { sequenced.subscribe(x => println(s"Emitted group: $x")) } + @Test def groupByUntilExample2() { + val numbers = Observable.interval(250 millis).take(14) + val grouped = numbers.groupByUntil[Long, Long](x => x % 2, x => x * 10, {case (key, obs) => Observable.interval(2 seconds)}) + val sequenced = (grouped.map({ case (key, obs) => obs.toSeq })).flatten + sequenced.toBlocking.foreach(x => println(s"Emitted group: $x")) + } + @Test def combineLatestExample() { val firstCounter = Observable.interval(250 millis) val secondCounter = Observable.interval(550 millis) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index b3e4bb3f8d..b101889a71 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -2193,6 +2193,35 @@ trait Observable[+T] toScalaObservable[(K, Observable[T])](o1.map[(K, Observable[T])](func)) } + /** + * Groups the items emitted by an [[Observable]] (transformed by a selector) according to a specified key selector function + * until the duration Observable expires for the key. + * + * + * + * Note: The `Observable` in the pair `(K, Observable[V])` will cache the items it is to emit until such time as it + * is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those `Observable` that + * do not concern you. Instead, you can signal to them that they may discard their buffers by applying an operator like `take(0)` to them. + * + * @param keySelector a function to extract the key for each item + * @param valueSelector a function to map each item emitted by the source [[Observable]] to an item emitted by one + * of the resulting `Observable[V]`s + * @param closings a function to signal the expiration of a group + * @return an [[Observable]] that emits pairs of key and `Observable[V]`, each of which corresponds to a key + * value and each of which emits all items emitted by the source [[Observable]] during that + * key's duration that share that same key value, transformed by the value selector + */ + def groupByUntil[K, V](keySelector: T => K, valueSelector: T => V, closings: (K, Observable[V]) => Observable[Any]): Observable[(K, Observable[V])] = { + val jKeySelector: Func1[_ >: T, _ <: K] = keySelector + val jValueSelector: Func1[_ >: T, _ <: V] = valueSelector + val jDurationSelector = new Func1[rx.observables.GroupedObservable[_ <: K, _ <: V], rx.Observable[_ <: Any]] { + override def call(jgo: rx.observables.GroupedObservable[_ <: K, _ <: V]): rx.Observable[_ <: Any] = closings(jgo.getKey, toScalaObservable[V](jgo)) + } + val f = (o: rx.observables.GroupedObservable[K, _ <: V]) => (o.getKey, toScalaObservable[V](o)) + val jo = asJavaObservable.groupByUntil[K, V, Any](jKeySelector, jValueSelector, jDurationSelector).map[(K, Observable[V])](f) + toScalaObservable[(K, Observable[V])](jo) + } + /** * Correlates the items emitted by two Observables based on overlapping durations. *

diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index 89f86bf8de..93decd1788 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -96,6 +96,7 @@ class CompletenessTest extends JUnitSuite { "forEach(Action1[_ >: T], Action1[Throwable], Action0)" -> "foreach(T => Unit, Throwable => Unit, () => Unit)", "groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "[use `groupBy` and `map`]", "groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: GroupedObservable[TKey, T], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K, (K, Observable[T]) => Observable[Any])", + "groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: T, _ <: TValue], Func1[_ >: GroupedObservable[TKey, TValue], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K, T => V, (K, Observable[V]) => Observable[Any])", "ignoreElements()" -> "[use `filter(_ => false)`]", "last(Func1[_ >: T, Boolean])" -> "[use `filter(predicate).last`]", "lastOrDefault(T)" -> "lastOrElse(=> U)", From fa9ff95e44ceb6f97fd3dd550764a5882e77cb47 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 4 Jun 2014 14:46:40 +0800 Subject: [PATCH 12/21] Add groupJoin to RxScala --- .../rx/lang/scala/examples/RxScalaDemo.scala | 20 +++++++++++++ .../main/scala/rx/lang/scala/Observable.scala | 30 +++++++++++++++++++ .../rx/lang/scala/CompletenessTest.scala | 2 ++ 3 files changed, 52 insertions(+) diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index c3f99bdc75..360ff6cb8f 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -1275,4 +1275,24 @@ class RxScalaDemo extends JUnitSuite { } o.toBlocking.foreach(println) } + + @Test def joinExample() { + val o1 = Observable.interval(500 millis).map(n => "1: " + n) + val o2 = Observable.interval(100 millis).map(n => "2: " + n) + val o = o1.join(o2, + (_: String) => Observable.timer(300 millis), + (_: String) => Observable.timer(200 millis), + (t1: String, t2: String) => (t1, t2)) + o.take(10).toBlocking.foreach(println) + } + + @Test def groupJoinExample() { + val o1 = Observable.interval(500 millis).map(n => "1: " + n) + val o2 = Observable.interval(100 millis).map(n => "2: " + n) + val o = o1.groupJoin(o2, + (_: String) => Observable.timer(300 millis), + (_: String) => Observable.timer(200 millis), + (t1: String, t2: Observable[String]) => (t1, t2.toSeq.toBlocking.single)) + o.take(3).toBlocking.foreach(println) + } } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index b101889a71..eb75ef27a0 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -2266,6 +2266,36 @@ trait Observable[+T] ) } + /** + * Returns an Observable that correlates two Observables when they overlap in time and groups the results. + * + * + * + * @param other the other Observable to correlate items from the source Observable with + * @param leftDuration a function that returns an Observable whose emissions indicate the duration of the values of + * the source Observable + * @param rightDuration a function that returns an Observable whose emissions indicate the duration of the values of + * the `other` Observable + * @param resultSelector a function that takes an item emitted by each Observable and returns the value to be emitted + * by the resulting Observable + * @return an Observable that emits items based on combining those items emitted by the source Observables + * whose durations overlap + */ + def groupJoin[S, R](other: Observable[S], leftDuration: T => Observable[Any], rightDuration: S => Observable[Any], resultSelector: (T, Observable[S]) => R): Observable[R] = { + val outer: rx.Observable[_ <: T] = this.asJavaObservable + val inner: rx.Observable[_ <: S] = other.asJavaObservable + val left: Func1[_ >: T, _ <: rx.Observable[_ <: Any]] = (t: T) => leftDuration(t).asJavaObservable + val right: Func1[_ >: S, _ <: rx.Observable[_ <: Any]] = (s: S) => rightDuration(s).asJavaObservable + val f: Func2[_ >: T, _ >: rx.Observable[S], _ <: R] = (t: T, o: rx.Observable[S]) => resultSelector(t, toScalaObservable[S](o)) + toScalaObservable[R]( + outer.asInstanceOf[rx.Observable[T]].groupJoin[S, Any, Any, R]( + inner.asInstanceOf[rx.Observable[S]], + left.asInstanceOf[Func1[T, rx.Observable[Any]]], + right.asInstanceOf[Func1[S, rx.Observable[Any]]], + f) + ) + } + /** * Returns a new Observable by applying a function that you supply to each item emitted by the source * Observable that returns an Observable, and then emitting the items emitted by the most recently emitted diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index 93decd1788..2c71ce749c 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -97,7 +97,9 @@ class CompletenessTest extends JUnitSuite { "groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "[use `groupBy` and `map`]", "groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: GroupedObservable[TKey, T], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K, (K, Observable[T]) => Observable[Any])", "groupByUntil(Func1[_ >: T, _ <: TKey], Func1[_ >: T, _ <: TValue], Func1[_ >: GroupedObservable[TKey, TValue], _ <: Observable[_ <: TDuration]])" -> "groupByUntil(T => K, T => V, (K, Observable[V]) => Observable[Any])", + "groupJoin(Observable[T2], Func1[_ >: T, _ <: Observable[D1]], Func1[_ >: T2, _ <: Observable[D2]], Func2[_ >: T, _ >: Observable[T2], _ <: R])" -> "groupJoin(Observable[S], T => Observable[Any], S => Observable[Any], (T, Observable[S]) => R)", "ignoreElements()" -> "[use `filter(_ => false)`]", + "join(Observable[TRight], Func1[T, Observable[TLeftDuration]], Func1[TRight, Observable[TRightDuration]], Func2[T, TRight, R])" -> "join(Observable[S], T => Observable[Any], S => Observable[Any], (T, S) => R)", "last(Func1[_ >: T, Boolean])" -> "[use `filter(predicate).last`]", "lastOrDefault(T)" -> "lastOrElse(=> U)", "lastOrDefault(T, Func1[_ >: T, Boolean])" -> "[use `filter(predicate).lastOrElse(default)`]", From ba9331fb8da7f8510a52ad2d3e2b02c1bbc829f8 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 4 Jun 2014 16:56:37 +0800 Subject: [PATCH 13/21] Add pivot to RxScala --- .../rx/lang/scala/examples/RxScalaDemo.scala | 29 ++++++++++ .../main/scala/rx/lang/scala/Observable.scala | 57 +++++++++++++++++++ .../rx/lang/scala/CompletenessTest.scala | 1 + 3 files changed, 87 insertions(+) diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index 360ff6cb8f..c34a49d136 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -1295,4 +1295,33 @@ class RxScalaDemo extends JUnitSuite { (t1: String, t2: Observable[String]) => (t1, t2.toSeq.toBlocking.single)) o.take(3).toBlocking.foreach(println) } + + @Test def pivotExample() { + val o1 = (1 to 20).toObservable.groupBy(i => if (i <= 10) "x" else "y").map { + case (t: String, o: Observable[Int]) => (t, o.groupBy(i => if (i % 2 == 0) true else false)) + } + println("o1:") + o1.subscribe { + n => n match { + case (k1: String, o: Observable[(Boolean, Observable[Int])]) => + o.subscribe { + m => m match { + case (k2: Boolean, oi: Observable[Int]) => oi.subscribe(v => println(s"$k1 $k2 $v")) + } + } + } + } + val o2 = o1.pivot + println("o2:") + o2.subscribe { + n => n match { + case (k1: Boolean, o: Observable[(String, Observable[Int])]) => + o.subscribe { + m => m match { + case (k2: String, oi: Observable[Int]) => oi.subscribe(v => println(s"$k1 $k2 $v")) + } + } + } + } + } } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index eb75ef27a0..a12b3ac668 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -3813,6 +3813,63 @@ trait Observable[+T] def foreach(onNext: T => Unit, onError: Throwable => Unit, onComplete: () => Unit): Unit = { asJavaObservable.subscribe(onNext, onError, onComplete) } + + /** + * Pivots a sequence of `(K1, Observable[(K2, Observable[U])])`s emitted by an `Observable` so as to swap the group + * and and the set on which their items are grouped. + *

+ * + * + * For example an `Observable` such as `this = Observable[(String, Observable[(Boolean, Observable[Integer])])`: + *

    + *
  • o1.odd: 1, 3, 5, 7, 9 on Thread 1
  • + *
  • o1.even: 2, 4, 6, 8, 10 on Thread 1
  • + *
  • o2.odd: 11, 13, 15, 17, 19 on Thread 2
  • + *
  • o2.even: 12, 14, 16, 18, 20 on Thread 2
  • + *
+ * is pivoted to become `this = Observable[(Boolean, Observable[(String, Observable[Integer])])`: + * + *
    + *
  • odd.o1: 1, 3, 5, 7, 9 on Thread 1
  • + *
  • odd.o2: 11, 13, 15, 17, 19 on Thread 2
  • + *
  • even.o1: 2, 4, 6, 8, 10 on Thread 1
  • + *
  • even.o2: 12, 14, 16, 18, 20 on Thread 2
  • + *
+ *

+ * + *

+ * Note: A `(K, Observable[_])` will cache the items it is to emit until such time as it + * is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those + * `(K, Observable[_])`s that do not concern you. Instead, you can signal to them that they may + * discard their buffers by applying an operator like `take(0)` to them. + * + * @return an `Observable`containing a stream of nested `(K1, Observable[(K2, Observable[U])])`s with swapped + * inner-outer keys. + */ + def pivot[U, K1, K2](implicit evidence: Observable[T] <:< Observable[(K1, Observable[(K2, Observable[U])])]): Observable[(K2, Observable[(K1, Observable[U])])] = { + import rx.observables.{GroupedObservable => JGroupedObservable} + val f1 = new Func1[(K1, Observable[(K2, Observable[U])]), JGroupedObservable[K1, JGroupedObservable[K2, U]]]() { + override def call(t1: (K1, Observable[(K2, Observable[U])])): JGroupedObservable[K1, JGroupedObservable[K2, U]] = { + val jo = t1._2.asJavaObservable.asInstanceOf[rx.Observable[(K2, Observable[U])]].map[JGroupedObservable[K2, U]](new Func1[(K2, Observable[U]), JGroupedObservable[K2, U]]() { + override def call(t2: (K2, Observable[U])): JGroupedObservable[K2, U] = { + JGroupedObservable.from(t2._1, t2._2.asJavaObservable.asInstanceOf[rx.Observable[U]]) + } + }) + JGroupedObservable.from(t1._1, jo) + } + } + val o1: Observable[(K1, Observable[(K2, Observable[U])])] = this + val o2 = toScalaObservable[JGroupedObservable[K2, JGroupedObservable[K1, U]]](rx.Observable.pivot(o1.asJavaObservable.map(f1))) + o2.map { + (jgo1: JGroupedObservable[K2, JGroupedObservable[K1, U]]) => { + val jo = jgo1.map[(K1, Observable[U])](new Func1[JGroupedObservable[K1, U], (K1, Observable[U])]() { + override def call(jgo2: JGroupedObservable[K1, U]): (K1, Observable[U]) = (jgo2.getKey, toScalaObservable[U](jgo2)) + }) + (jgo1.getKey, toScalaObservable[(K1, Observable[U])](jo)) + } + } + } + } /** diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index 2c71ce749c..08794338b4 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -219,6 +219,7 @@ class CompletenessTest extends JUnitSuite { "mergeDelayError(Observable[_ <: Observable[_ <: T]])" -> "flattenDelayError(<:<[Observable[T], Observable[Observable[U]]])", "parallelMerge(Observable[Observable[T]], Int)" -> "parallelMerge(Int)(<:<[Observable[T], Observable[Observable[U]]])", "parallelMerge(Observable[Observable[T]], Int, Scheduler)" -> "parallelMerge(Int, Scheduler)(<:<[Observable[T], Observable[Observable[U]]])", + "pivot(Observable[GroupedObservable[K1, GroupedObservable[K2, T]]])" -> "pivot(<:<[Observable[T], Observable[(K1, Observable[(K2, Observable[U])])]])", "sequenceEqual(Observable[_ <: T], Observable[_ <: T])" -> "sequenceEqual(Observable[U])", "sequenceEqual(Observable[_ <: T], Observable[_ <: T], Func2[_ >: T, _ >: T, Boolean])" -> "sequenceEqual(Observable[U], (U, U) => Boolean)", "range(Int, Int)" -> "[use `(start until (start + count)).toObservable` instead of `range(start, count)`]", From 13bf771b93d5e232e79679c17bda953fcb03b968 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 4 Jun 2014 17:03:23 +0800 Subject: [PATCH 14/21] Add longCount to RxScala --- .../src/main/scala/rx/lang/scala/Observable.scala | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index a12b3ac668..e968daeac8 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -3870,6 +3870,16 @@ trait Observable[+T] } } + /** + * Returns an Observable that counts the total number of items emitted by the source Observable and emits this count as a 64-bit Long. + * + * + * + * @return an Observable that emits a single item: the number of items emitted by the source Observable as a 64-bit Long item + */ + def longCount: Observable[Long] = { + toScalaObservable[java.lang.Long](asJavaObservable.longCount()).map(_.longValue()) + } } /** From 71cf3022163e50c012c22b2af4826e9e8ebb9d1e Mon Sep 17 00:00:00 2001 From: zsxwing Date: Wed, 4 Jun 2014 17:56:48 +0800 Subject: [PATCH 15/21] Add toMultimap to RxScala --- .../rx/lang/scala/examples/RxScalaDemo.scala | 34 ++++++++ .../main/scala/rx/lang/scala/Observable.scala | 77 +++++++++++++++++++ .../rx/lang/scala/CompletenessTest.scala | 2 + 3 files changed, 113 insertions(+) diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index c34a49d136..3a04dd4524 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -865,6 +865,40 @@ class RxScalaDemo extends JUnitSuite { println(m.toBlockingObservable.single) } + @Test def toMultimapExample1(): Unit = { + val o : Observable[String] = List("alice", "bob", "carol", "allen", "clarke").toObservable + val keySelector = (s: String) => s.head + val m = o.toMultimap(keySelector) + println(m.toBlocking.single.mapValues(_.toList)) + } + + @Test def toMultimapExample2(): Unit = { + val o : Observable[String] = List("alice", "bob", "carol", "allen", "clarke").toObservable + val keySelector = (s: String) => s.head + val valueSelector = (s: String) => s.tail + val m = o.toMultimap(keySelector, valueSelector) + println(m.toBlocking.single.mapValues(_.toList)) + } + + @Test def toMultimapExample3(): Unit = { + val o : Observable[String] = List("alice", "bob", "carol", "allen", "clarke").toObservable + val keySelector = (s: String) => s.head + val valueSelector = (s: String) => s.tail + val mapFactory = () => Map('d' -> List("oug")) + val m = o.toMultimap(keySelector, valueSelector, mapFactory) + println(m.toBlocking.single.mapValues(_.toList)) + } + + @Test def toMultimapExample4(): Unit = { + val o : Observable[String] = List("alice", "bob", "carol", "allen", "clarke").toObservable + val keySelector = (s: String) => s.head + val valueSelector = (s: String) => s.tail + val mapFactory = () => Map('d' -> List("oug")) + val valueFactor = (k: Char) => List[String]() + val m = o.toMultimap(keySelector, valueSelector, mapFactory, valueFactor) + println(m.toBlocking.single) + } + @Test def containsExample(): Unit = { val o1 = List(1, 2, 3).toObservable.contains(2) assertTrue(o1.toBlockingObservable.single) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index e968daeac8..b48eac4e9f 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -3880,6 +3880,83 @@ trait Observable[+T] def longCount: Observable[Long] = { toScalaObservable[java.lang.Long](asJavaObservable.longCount()).map(_.longValue()) } + + /** + * Returns an Observable that emits a single `Map` that contains an `Seq` of items emitted by the + * source Observable keyed by a specified keySelector` function. + * + * + * + * @param keySelector the function that extracts the key from the source items to be used as key in the HashMap + * @return an Observable that emits a single item: a `Map` that contains an `Seq` of items mapped from + * the source Observable + */ + def toMultimap[K](keySelector: T => K): Observable[Map[K, Seq[T]]] = { + val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]] + val o: rx.Observable[java.util.Map[K, java.util.Collection[T]]] = thisJava.toMultimap[K](keySelector) + toScalaObservable[java.util.Map[K, java.util.Collection[T]]](o).map(m => m.toMap.mapValues(_.toSeq)) + } + + /** + * Returns an Observable that emits a single `Map` that contains an `Seq` of values extracted by a + * specified `valueSelector` function from items emitted by the source Observable, keyed by a + * specified `keySelector` function. + * + * + * + * @param keySelector the function that extracts a key from the source items to be used as key in the HashMap + * @param valueSelector the function that extracts a value from the source items to be used as value in the HashMap + * @return an Observable that emits a single item: a `Map` that contains an `Seq` of items mapped from + * the source Observable + */ + def toMultimap[K, V](keySelector: T => K, valueSelector: T => V): Observable[Map[K, Seq[V]]] = { + val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]] + val o: rx.Observable[java.util.Map[K, java.util.Collection[V]]] = thisJava.toMultimap[K, V](keySelector, valueSelector) + toScalaObservable[java.util.Map[K, java.util.Collection[V]]](o).map(m => m.toMap.mapValues(_.toSeq)) + } + + /** + * Returns an Observable that emits a single `Map`, returned by a specified mapFactory` function, that + * contains an `Seq` of values, extracted by a specified `valueSelector` function from items + * emitted by the source Observable and keyed by the `keySelector` function. + * + * + * + * @param keySelector the function that extracts a key from the source items to be used as the key in the Map + * @param valueSelector the function that extracts a value from the source items to be used as the value in the Map + * @param mapFactory he function that returns a Map instance to be used + * @return an Observable that emits a single item: a `Map` that contains a `Seq` items mapped from the source + * Observable + */ + def toMultimap[K, V](keySelector: T => K, valueSelector: T => V, mapFactory: () => Map[K, Seq[V]]): Observable[Map[K, Seq[V]]] = { + val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]] + val o: rx.Observable[java.util.Map[K, java.util.Collection[V]]] = thisJava.toMultimap[K, V](keySelector, valueSelector) + toScalaObservable[java.util.Map[K, java.util.Collection[V]]](o).map(m => mapFactory() ++ m.toMap.mapValues(_.toSeq)) + } + + /** + * Returns an Observable that emits a single `Map`, returned by a specified `mapFactory` function, that + * contains a custom `Seq` of values, extracted by a specified `valueSelector` function from + * items emitted by the source Observable, and keyed by the `keySelector` function. + * + * + * + * @param keySelector the function that extracts a key from the source items to be used as the key in the Map + * @param valueSelector the function that extracts a value from the source items to be used as the value in the Map + * @param mapFactory the function that returns a Map instance to be used + * @param collectionFactory the function that returns a Collection instance for a particular key to be used in the Map + * @return an Observable that emits a single item: a `Map` that contains the `Seq` of mapped items from + * the source Observable + */ + def toMultimap[K, V](keySelector: T => K, valueSelector: T => V, mapFactory: () => Map[K, Seq[V]], collectionFactory: K => Seq[V]): Observable[Map[K, Seq[V]]] = { + val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]] + val o: rx.Observable[java.util.Map[K, java.util.Collection[V]]] = thisJava.toMultimap[K, V](keySelector, valueSelector) + toScalaObservable[java.util.Map[K, java.util.Collection[V]]](o).map { + m => mapFactory() ++ m.toMap.map { + case (k: K, v: java.util.Collection[V]) => (k, collectionFactory(k) ++ v) + } + } + } } /** diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index 08794338b4..761afd3480 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -177,6 +177,8 @@ class CompletenessTest extends JUnitSuite { "timer(Long, Long, TimeUnit)" -> "timer(Duration, Duration)", "timer(Long, Long, TimeUnit, Scheduler)" -> "timer(Duration, Duration, Scheduler)", "toList()" -> "toSeq", + "toMultimap(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: V], Func0[_ <: Map[K, Collection[V]]])" -> "toMultimap(T => K, T => V, () => Map[K, Seq[V]])", + "toMultimap(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: V], Func0[_ <: Map[K, Collection[V]]], Func1[_ >: K, _ <: Collection[V]])" -> "toMultimap(T => K, T => V, () => Map[K, Seq[V]], K => Seq[V])", "toSortedList()" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]", "toSortedList(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]", "window(Observable[U])" -> "window(=> Observable[Any])", From 06fee6e923d847e5d3cc7ac11d7b9ec3e05c54d1 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 5 Jun 2014 14:04:03 +0800 Subject: [PATCH 16/21] Update pivotExample --- .../rx/lang/scala/examples/RxScalaDemo.scala | 30 ++++++------------- 1 file changed, 9 insertions(+), 21 deletions(-) diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index 3a04dd4524..71e66b3f81 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -1332,30 +1332,18 @@ class RxScalaDemo extends JUnitSuite { @Test def pivotExample() { val o1 = (1 to 20).toObservable.groupBy(i => if (i <= 10) "x" else "y").map { - case (t: String, o: Observable[Int]) => (t, o.groupBy(i => if (i % 2 == 0) true else false)) + case (t: String, o: Observable[Int]) => (t, o.groupBy(i => i % 2 == 0)) } println("o1:") - o1.subscribe { - n => n match { - case (k1: String, o: Observable[(Boolean, Observable[Int])]) => - o.subscribe { - m => m match { - case (k2: Boolean, oi: Observable[Int]) => oi.subscribe(v => println(s"$k1 $k2 $v")) - } - } - } - } + (for ((k1, o) <- o1; + (k2, vs) <- o; + v <- vs + ) yield (k1, k2, v)).subscribe(println(_)) val o2 = o1.pivot println("o2:") - o2.subscribe { - n => n match { - case (k1: Boolean, o: Observable[(String, Observable[Int])]) => - o.subscribe { - m => m match { - case (k2: String, oi: Observable[Int]) => oi.subscribe(v => println(s"$k1 $k2 $v")) - } - } - } - } + (for ((k1, o) <- o2; + (k2, vs) <- o; + v <- vs + ) yield (k1, k2, v)).subscribe(println(_)) } } From c6da63f6cdcffd6bbf1c02d0507bb348f6dd77e8 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 5 Jun 2014 16:51:51 +0800 Subject: [PATCH 17/21] Rewrite toMultimap --- .../rx/lang/scala/examples/RxScalaDemo.scala | 11 ++--- .../main/scala/rx/lang/scala/Observable.scala | 43 +++++++++++-------- 2 files changed, 31 insertions(+), 23 deletions(-) diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index 71e66b3f81..9b162a1f12 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -19,6 +19,7 @@ import java.io.IOException import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit +import scala.collection.mutable import scala.concurrent.duration.Duration import scala.concurrent.duration.DurationInt import scala.concurrent.duration.DurationLong @@ -881,10 +882,10 @@ class RxScalaDemo extends JUnitSuite { } @Test def toMultimapExample3(): Unit = { - val o : Observable[String] = List("alice", "bob", "carol", "allen", "clarke").toObservable + val o: Observable[String] = List("alice", "bob", "carol", "allen", "clarke").toObservable val keySelector = (s: String) => s.head val valueSelector = (s: String) => s.tail - val mapFactory = () => Map('d' -> List("oug")) + val mapFactory: () => mutable.Map[Char, mutable.Buffer[String]] = () => mutable.Map('d' -> mutable.ListBuffer("oug")) val m = o.toMultimap(keySelector, valueSelector, mapFactory) println(m.toBlocking.single.mapValues(_.toList)) } @@ -893,9 +894,9 @@ class RxScalaDemo extends JUnitSuite { val o : Observable[String] = List("alice", "bob", "carol", "allen", "clarke").toObservable val keySelector = (s: String) => s.head val valueSelector = (s: String) => s.tail - val mapFactory = () => Map('d' -> List("oug")) - val valueFactor = (k: Char) => List[String]() - val m = o.toMultimap(keySelector, valueSelector, mapFactory, valueFactor) + val mapFactory: () => mutable.Map[Char, mutable.Buffer[String]] = () => mutable.Map('d' -> mutable.ListBuffer("oug")) + val valueFactory = (k: Char) => mutable.ListBuffer[String]() + val m = o.toMultimap(keySelector, valueSelector, mapFactory, valueFactory) println(m.toBlocking.single) } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index b48eac4e9f..1fda9a706e 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -102,6 +102,7 @@ trait Observable[+T] import scala.collection.JavaConverters._ import scala.collection.Seq import scala.concurrent.duration.{Duration, TimeUnit, MILLISECONDS} + import scala.collection.mutable import rx.functions._ import rx.lang.scala.observables.BlockingObservable import ImplicitFunctionConversions._ @@ -3891,10 +3892,8 @@ trait Observable[+T] * @return an Observable that emits a single item: a `Map` that contains an `Seq` of items mapped from * the source Observable */ - def toMultimap[K](keySelector: T => K): Observable[Map[K, Seq[T]]] = { - val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]] - val o: rx.Observable[java.util.Map[K, java.util.Collection[T]]] = thisJava.toMultimap[K](keySelector) - toScalaObservable[java.util.Map[K, java.util.Collection[T]]](o).map(m => m.toMap.mapValues(_.toSeq)) + def toMultimap[K](keySelector: T => K): Observable[scala.collection.Map[K, Seq[T]]] = { + toMultimap(keySelector, k => k) } /** @@ -3909,10 +3908,8 @@ trait Observable[+T] * @return an Observable that emits a single item: a `Map` that contains an `Seq` of items mapped from * the source Observable */ - def toMultimap[K, V](keySelector: T => K, valueSelector: T => V): Observable[Map[K, Seq[V]]] = { - val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]] - val o: rx.Observable[java.util.Map[K, java.util.Collection[V]]] = thisJava.toMultimap[K, V](keySelector, valueSelector) - toScalaObservable[java.util.Map[K, java.util.Collection[V]]](o).map(m => m.toMap.mapValues(_.toSeq)) + def toMultimap[K, V](keySelector: T => K, valueSelector: T => V): Observable[scala.collection.Map[K, Seq[V]]] = { + toMultimap(keySelector, valueSelector, () => mutable.Map[K, mutable.Buffer[V]]()) } /** @@ -3928,10 +3925,8 @@ trait Observable[+T] * @return an Observable that emits a single item: a `Map` that contains a `Seq` items mapped from the source * Observable */ - def toMultimap[K, V](keySelector: T => K, valueSelector: T => V, mapFactory: () => Map[K, Seq[V]]): Observable[Map[K, Seq[V]]] = { - val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]] - val o: rx.Observable[java.util.Map[K, java.util.Collection[V]]] = thisJava.toMultimap[K, V](keySelector, valueSelector) - toScalaObservable[java.util.Map[K, java.util.Collection[V]]](o).map(m => mapFactory() ++ m.toMap.mapValues(_.toSeq)) + def toMultimap[K, V](keySelector: T => K, valueSelector: T => V, mapFactory: () => mutable.Map[K, mutable.Buffer[V]]): Observable[scala.collection.Map[K, Seq[V]]] = { + toMultimap(keySelector, valueSelector, mapFactory, k => mutable.ListBuffer[V]()) } /** @@ -3948,12 +3943,24 @@ trait Observable[+T] * @return an Observable that emits a single item: a `Map` that contains the `Seq` of mapped items from * the source Observable */ - def toMultimap[K, V](keySelector: T => K, valueSelector: T => V, mapFactory: () => Map[K, Seq[V]], collectionFactory: K => Seq[V]): Observable[Map[K, Seq[V]]] = { - val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]] - val o: rx.Observable[java.util.Map[K, java.util.Collection[V]]] = thisJava.toMultimap[K, V](keySelector, valueSelector) - toScalaObservable[java.util.Map[K, java.util.Collection[V]]](o).map { - m => mapFactory() ++ m.toMap.map { - case (k: K, v: java.util.Collection[V]) => (k, collectionFactory(k) ++ v) + def toMultimap[K, V](keySelector: T => K, valueSelector: T => V, mapFactory: () => mutable.Map[K, mutable.Buffer[V]], collectionFactory: K => mutable.Buffer[V]): Observable[scala.collection.Map[K, Seq[V]]] = { + lift { + (subscriber: Subscriber[scala.collection.Map[K, Seq[V]]]) => { + val map = mapFactory().withDefault(collectionFactory) + Subscriber[T]( + subscriber, + (t: T) => { + val key = keySelector(t) + val value = map(key) + value += valueSelector(t) + map += key -> value: Unit + }, + e => subscriber.onError(e), + () => { + subscriber.onNext(map) + subscriber.onCompleted() + } + ) } } } From 0f29382995514a7f8b37d872bd2617c5ec0c5fd6 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 5 Jun 2014 17:04:04 +0800 Subject: [PATCH 18/21] Replace unnecessary with details --- .../src/test/scala/rx/lang/scala/CompletenessTest.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index 761afd3480..feecfb1a4b 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -77,8 +77,8 @@ class CompletenessTest extends JUnitSuite { "buffer(Observable[B])" -> "buffer(=> Observable[Any])", "buffer(Observable[B], Int)" -> "buffer(Observable[Any], Int)", "buffer(Observable[_ <: TOpening], Func1[_ >: TOpening, _ <: Observable[_ <: TClosing]])" -> "buffer(Observable[Opening], Opening => Observable[Any])", - "cast(Class[R])" -> unnecessary, - "collect(R, Action2[R, _ >: T])" -> unnecessary, + "cast(Class[R])" -> "[RxJava needs this one because `rx.Observable` is invariant. `Observable` in RxScala is covariant and does not need this operator.]", + "collect(R, Action2[R, _ >: T])" -> "foldLeft(R)((R, T) => R)", "contains(Any)" -> "contains(U)", "count()" -> "length", "debounce(Func1[_ >: T, _ <: Observable[U]])" -> "debounce(T => Observable[Any])", From dbf4fc0fa475045f29250418ea692e4e3643591a Mon Sep 17 00:00:00 2001 From: zsxwing Date: Thu, 5 Jun 2014 22:36:18 +0800 Subject: [PATCH 19/21] Change the return type, and add tests --- .../rx/lang/scala/examples/RxScalaDemo.scala | 8 ++-- .../main/scala/rx/lang/scala/Observable.scala | 45 ++++++++++--------- .../rx/lang/scala/CompletenessTest.scala | 4 +- .../scala/rx/lang/scala/ObservableTest.scala | 39 ++++++++++++++++ 4 files changed, 70 insertions(+), 26 deletions(-) diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index 9b162a1f12..480ff90c13 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -870,7 +870,7 @@ class RxScalaDemo extends JUnitSuite { val o : Observable[String] = List("alice", "bob", "carol", "allen", "clarke").toObservable val keySelector = (s: String) => s.head val m = o.toMultimap(keySelector) - println(m.toBlocking.single.mapValues(_.toList)) + println(m.toBlocking.single) } @Test def toMultimapExample2(): Unit = { @@ -878,14 +878,14 @@ class RxScalaDemo extends JUnitSuite { val keySelector = (s: String) => s.head val valueSelector = (s: String) => s.tail val m = o.toMultimap(keySelector, valueSelector) - println(m.toBlocking.single.mapValues(_.toList)) + println(m.toBlocking.single) } @Test def toMultimapExample3(): Unit = { val o: Observable[String] = List("alice", "bob", "carol", "allen", "clarke").toObservable val keySelector = (s: String) => s.head val valueSelector = (s: String) => s.tail - val mapFactory: () => mutable.Map[Char, mutable.Buffer[String]] = () => mutable.Map('d' -> mutable.ListBuffer("oug")) + val mapFactory = () => mutable.Map('d' -> mutable.Buffer("oug")) val m = o.toMultimap(keySelector, valueSelector, mapFactory) println(m.toBlocking.single.mapValues(_.toList)) } @@ -894,7 +894,7 @@ class RxScalaDemo extends JUnitSuite { val o : Observable[String] = List("alice", "bob", "carol", "allen", "clarke").toObservable val keySelector = (s: String) => s.head val valueSelector = (s: String) => s.tail - val mapFactory: () => mutable.Map[Char, mutable.Buffer[String]] = () => mutable.Map('d' -> mutable.ListBuffer("oug")) + val mapFactory = () => mutable.Map('d' -> mutable.ListBuffer("oug")) val valueFactory = (k: Char) => mutable.ListBuffer[String]() val m = o.toMultimap(keySelector, valueSelector, mapFactory, valueFactory) println(m.toBlocking.single) diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 1fda9a706e..85a1fbac74 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -3913,47 +3913,52 @@ trait Observable[+T] } /** - * Returns an Observable that emits a single `Map`, returned by a specified mapFactory` function, that - * contains an `Seq` of values, extracted by a specified `valueSelector` function from items - * emitted by the source Observable and keyed by the `keySelector` function. + * Returns an Observable that emits a single `mutable.Map[K, mutable.Buffer[V]]`, returned by a specified `mapFactory` function, that + * contains values, extracted by a specified `valueSelector` function from items emitted by the source Observable and + * keyed by the `keySelector` function. * * * * @param keySelector the function that extracts a key from the source items to be used as the key in the Map * @param valueSelector the function that extracts a value from the source items to be used as the value in the Map - * @param mapFactory he function that returns a Map instance to be used - * @return an Observable that emits a single item: a `Map` that contains a `Seq` items mapped from the source - * Observable + * @param mapFactory he function that returns a `mutable.Map[K, mutable.Buffer[V]]` instance to be used + * @return an Observable that emits a single item: a `mutable.Map[K, mutable.Buffer[V]]` that contains items mapped + * from the source Observable */ - def toMultimap[K, V](keySelector: T => K, valueSelector: T => V, mapFactory: () => mutable.Map[K, mutable.Buffer[V]]): Observable[scala.collection.Map[K, Seq[V]]] = { - toMultimap(keySelector, valueSelector, mapFactory, k => mutable.ListBuffer[V]()) + def toMultimap[K, V, M <: mutable.Map[K, mutable.Buffer[V]]](keySelector: T => K, valueSelector: T => V, mapFactory: () => M): Observable[M] = { + toMultimap[K, V, mutable.Buffer[V], M](keySelector, valueSelector, mapFactory, k => mutable.Buffer[V]()) } /** - * Returns an Observable that emits a single `Map`, returned by a specified `mapFactory` function, that - * contains a custom `Seq` of values, extracted by a specified `valueSelector` function from - * items emitted by the source Observable, and keyed by the `keySelector` function. + * Returns an Observable that emits a single `mutable.Map[K, B]`, returned by a specified `mapFactory` function, that + * contains values extracted by a specified `valueSelector` function from items emitted by the source Observable, and + * keyed by the `keySelector` function. * * * * @param keySelector the function that extracts a key from the source items to be used as the key in the Map * @param valueSelector the function that extracts a value from the source items to be used as the value in the Map * @param mapFactory the function that returns a Map instance to be used - * @param collectionFactory the function that returns a Collection instance for a particular key to be used in the Map - * @return an Observable that emits a single item: a `Map` that contains the `Seq` of mapped items from - * the source Observable + * @param collectionFactory the function that returns a `mutable.Buffer[V]` instance for a particular key to be used in the Map + * @return an Observable that emits a single item: a `mutable.Map[K, B]` that contains mapped items from the source Observable */ - def toMultimap[K, V](keySelector: T => K, valueSelector: T => V, mapFactory: () => mutable.Map[K, mutable.Buffer[V]], collectionFactory: K => mutable.Buffer[V]): Observable[scala.collection.Map[K, Seq[V]]] = { + def toMultimap[K, V, B <: mutable.Buffer[V], M <: mutable.Map[K, B]](keySelector: T => K, valueSelector: T => V, mapFactory: () => M, collectionFactory: K => B): Observable[M] = { + // It's complicated to convert `mutable.Map[K, mutable.Buffer[V]]` to `java.util.Map[K, java.util.Collection[V]]`, + // so RxScala implements `toMultimap` directly. + // Choosing `mutable.Buffer/Map` is because `append/update` is necessary to implement an efficient `toMultimap`. lift { - (subscriber: Subscriber[scala.collection.Map[K, Seq[V]]]) => { - val map = mapFactory().withDefault(collectionFactory) + (subscriber: Subscriber[M]) => { + val map = mapFactory() Subscriber[T]( subscriber, (t: T) => { val key = keySelector(t) - val value = map(key) - value += valueSelector(t) - map += key -> value: Unit + val values = map.get(key) match { + case Some(v) => v + case None => collectionFactory(key) + } + values += valueSelector(t) + map += key -> values: Unit }, e => subscriber.onError(e), () => { diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index feecfb1a4b..f3482f5376 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -177,8 +177,8 @@ class CompletenessTest extends JUnitSuite { "timer(Long, Long, TimeUnit)" -> "timer(Duration, Duration)", "timer(Long, Long, TimeUnit, Scheduler)" -> "timer(Duration, Duration, Scheduler)", "toList()" -> "toSeq", - "toMultimap(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: V], Func0[_ <: Map[K, Collection[V]]])" -> "toMultimap(T => K, T => V, () => Map[K, Seq[V]])", - "toMultimap(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: V], Func0[_ <: Map[K, Collection[V]]], Func1[_ >: K, _ <: Collection[V]])" -> "toMultimap(T => K, T => V, () => Map[K, Seq[V]], K => Seq[V])", + "toMultimap(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: V], Func0[_ <: Map[K, Collection[V]]])" -> "toMultimap(T => K, T => V, () => M)", + "toMultimap(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: V], Func0[_ <: Map[K, Collection[V]]], Func1[_ >: K, _ <: Collection[V]])" -> "toMultimap(T => K, T => V, () => M, K => B)", "toSortedList()" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]", "toSortedList(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]", "window(Observable[U])" -> "window(=> Observable[Any])", diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala index a5d64335b2..6b53dc6cf4 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala @@ -15,6 +15,7 @@ */ package rx.lang.scala +import scala.collection.mutable import scala.collection.mutable.ListBuffer import scala.concurrent.{Future, Await} import scala.concurrent.duration.Duration @@ -228,4 +229,42 @@ class ObservableTests extends JUnitSuite { val o = Observable.empty.orElse(-1) assertEquals(List(-1), o.toBlocking.toList) } + + @Test + def testToMultimap() { + val o = Observable.items("a", "b", "cc", "dd").toMultimap(_.length) + val expected = Map(1 -> List("a", "b"), 2 -> List("cc", "dd")) + assertEquals(expected, o.toBlocking.single) + } + + @Test + def testToMultimapWithValueSelector() { + val o = Observable.items("a", "b", "cc", "dd").toMultimap(_.length, s => s + s) + val expected = Map(1 -> List("aa", "bb"), 2 -> List("cccc", "dddd")) + assertEquals(expected, o.toBlocking.single) + } + + @Test + def testToMultimapWithMapFactory() { + val m = mutable.Map[Int, mutable.Buffer[String]]() + val o = Observable.items("a", "b", "cc", "dd").toMultimap(_.length, s => s, () => m) + val expected = Map(1 -> List("a", "b"), 2 -> List("cc", "dd")) + val r = o.toBlocking.single + assertTrue(m eq r) // check same instance + assertEquals(expected, r) + } + + @Test + def testToMultimapWithCollectionFactory() { + val m = mutable.Map[Int, mutable.Buffer[String]]() + val ls = List(mutable.Buffer[String](), mutable.Buffer[String]()) + val o = Observable.items("a", "b", "cc", "dd").toMultimap(_.length, s => s, () => m, (i: Int) => ls(i - 1)) + val expected = Map(1 -> List("a", "b"), 2 -> List("cc", "dd")) + val r = o.toBlocking.single + assertTrue(m eq r) // check same instance + assertTrue(ls(0) eq r(1)) // check same instance + assertTrue(ls(1) eq r(2)) // check same instance + assertEquals(expected, r) + } + } From a6f22b29fc62482b188b0ddd5df7a1c44e999c34 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 6 Jun 2014 13:05:34 +0800 Subject: [PATCH 20/21] comments, docs and names improvement --- .../scala/rx/lang/scala/examples/RxScalaDemo.scala | 4 ++-- .../src/main/scala/rx/lang/scala/Observable.scala | 12 ++++++------ .../test/scala/rx/lang/scala/ObservableTest.scala | 14 +++++++++----- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index 480ff90c13..6d66cf15ca 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -895,8 +895,8 @@ class RxScalaDemo extends JUnitSuite { val keySelector = (s: String) => s.head val valueSelector = (s: String) => s.tail val mapFactory = () => mutable.Map('d' -> mutable.ListBuffer("oug")) - val valueFactory = (k: Char) => mutable.ListBuffer[String]() - val m = o.toMultimap(keySelector, valueSelector, mapFactory, valueFactory) + val bufferFactory = (k: Char) => mutable.ListBuffer[String]() + val m = o.toMultimap(keySelector, valueSelector, mapFactory, bufferFactory) println(m.toBlocking.single) } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 85a1fbac74..0d8ddcda20 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -3915,7 +3915,7 @@ trait Observable[+T] /** * Returns an Observable that emits a single `mutable.Map[K, mutable.Buffer[V]]`, returned by a specified `mapFactory` function, that * contains values, extracted by a specified `valueSelector` function from items emitted by the source Observable and - * keyed by the `keySelector` function. + * keyed by the `keySelector` function. `mutable.Map[K, B]` is the same instance create by `mapFactory`. * * * @@ -3932,17 +3932,17 @@ trait Observable[+T] /** * Returns an Observable that emits a single `mutable.Map[K, B]`, returned by a specified `mapFactory` function, that * contains values extracted by a specified `valueSelector` function from items emitted by the source Observable, and - * keyed by the `keySelector` function. + * keyed by the `keySelector` function. `mutable.Map[K, B]` is the same instance create by `mapFactory`. * * * * @param keySelector the function that extracts a key from the source items to be used as the key in the Map * @param valueSelector the function that extracts a value from the source items to be used as the value in the Map * @param mapFactory the function that returns a Map instance to be used - * @param collectionFactory the function that returns a `mutable.Buffer[V]` instance for a particular key to be used in the Map - * @return an Observable that emits a single item: a `mutable.Map[K, B]` that contains mapped items from the source Observable + * @param bufferFactory the function that returns a `mutable.Buffer[V]` instance for a particular key to be used in the Map + * @return an Observable that emits a single item: a `mutable.Map[K, B]` that contains mapped items from the source Observable. */ - def toMultimap[K, V, B <: mutable.Buffer[V], M <: mutable.Map[K, B]](keySelector: T => K, valueSelector: T => V, mapFactory: () => M, collectionFactory: K => B): Observable[M] = { + def toMultimap[K, V, B <: mutable.Buffer[V], M <: mutable.Map[K, B]](keySelector: T => K, valueSelector: T => V, mapFactory: () => M, bufferFactory: K => B): Observable[M] = { // It's complicated to convert `mutable.Map[K, mutable.Buffer[V]]` to `java.util.Map[K, java.util.Collection[V]]`, // so RxScala implements `toMultimap` directly. // Choosing `mutable.Buffer/Map` is because `append/update` is necessary to implement an efficient `toMultimap`. @@ -3955,7 +3955,7 @@ trait Observable[+T] val key = keySelector(t) val values = map.get(key) match { case Some(v) => v - case None => collectionFactory(key) + case None => bufferFactory(key) } values += valueSelector(t) map += key -> values: Unit diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala index 6b53dc6cf4..01068c393e 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala @@ -250,20 +250,24 @@ class ObservableTests extends JUnitSuite { val o = Observable.items("a", "b", "cc", "dd").toMultimap(_.length, s => s, () => m) val expected = Map(1 -> List("a", "b"), 2 -> List("cc", "dd")) val r = o.toBlocking.single - assertTrue(m eq r) // check same instance + // r should be the same instance created by the `mapFactory` + assertTrue(m eq r) assertEquals(expected, r) } @Test - def testToMultimapWithCollectionFactory() { + def testToMultimapWithBufferFactory() { val m = mutable.Map[Int, mutable.Buffer[String]]() val ls = List(mutable.Buffer[String](), mutable.Buffer[String]()) val o = Observable.items("a", "b", "cc", "dd").toMultimap(_.length, s => s, () => m, (i: Int) => ls(i - 1)) val expected = Map(1 -> List("a", "b"), 2 -> List("cc", "dd")) val r = o.toBlocking.single - assertTrue(m eq r) // check same instance - assertTrue(ls(0) eq r(1)) // check same instance - assertTrue(ls(1) eq r(2)) // check same instance + // r should be the same instance created by the `mapFactory` + assertTrue(m eq r) + // r(1) should be the same instance created by the first calling `bufferFactory` + assertTrue(ls(0) eq r(1)) + // r(2) should be the same instance created by the second calling `bufferFactory` + assertTrue(ls(1) eq r(2)) assertEquals(expected, r) } From c5605bc566e82b21136530ae002a8931f4021b63 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sun, 8 Jun 2014 12:16:38 +0800 Subject: [PATCH 21/21] Add subscribeExample to explain subscribe and foreach are same --- .../rx/lang/scala/examples/RxScalaDemo.scala | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index 6d66cf15ca..e0e92c83eb 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -48,6 +48,29 @@ import rx.lang.scala.schedulers._ @Ignore // Since this doesn't do automatic testing, don't increase build time unnecessarily class RxScalaDemo extends JUnitSuite { + @Test def subscribeExample() { + val o = Observable.items(1, 2, 3) + + // Generally, we have two methods, `subscribe` and `foreach`, to listen to the messages from an Observable. + // `foreach` is just an alias to `subscribe`. + o.subscribe( + n => println(n), + e => e.printStackTrace(), + () => println("done") + ) + + o.foreach( + n => println(n), + e => e.printStackTrace(), + () => println("done") + ) + + // For-comprehension is also an alternative, if you are only interested in `onNext` + for (i <- o) { + println(i) + } + } + @Test def intervalExample() { val o = Observable.interval(200 millis).take(5) o.subscribe(n => println("n = " + n))