diff --git a/language-adaptors/rxjava-scala/build.gradle b/language-adaptors/rxjava-scala/build.gradle index d0d012019b..bac27a0c5b 100644 --- a/language-adaptors/rxjava-scala/build.gradle +++ b/language-adaptors/rxjava-scala/build.gradle @@ -21,8 +21,7 @@ sourceSets { } dependencies { - // pinning to 2.10.1 as having issues with 2.10.2 - compile 'org.scala-lang:scala-library:2.10.1' + compile 'org.scala-lang:scala-library:2.10.2' compile project(':rxjava-core') diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/RxImplicits.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/RxImplicits.scala index 39bd349413..88deadb810 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/RxImplicits.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/RxImplicits.scala @@ -106,7 +106,7 @@ object RxImplicits { * type never escapes the for-comprehension */ implicit class ScalaObservable[A](wrapped: Observable[A]) { - def map[B](f: A => B): Observable[B] = wrapped.map(f) + def map[B](f: A => B): Observable[B] = wrapped.map[B](f) def flatMap[B](f: A => Observable[B]): Observable[B] = wrapped.mapMany(f) def foreach(f: A => Unit): Unit = wrapped.toBlockingObservable.forEach(f) def withFilter(p: A => Boolean): WithFilter = new WithFilter(p) @@ -147,7 +147,7 @@ class UnitTestSuite extends JUnitSuite { class ObservableWithException(s: Subscription, values: String*) extends Observable[String] { var t: Thread = null - override def subscribe(observer: Observer[String]): Subscription = { + override def subscribe(observer: Observer[_ >: String]): Subscription = { println("ObservableWithException subscribed to ...") t = new Thread(new Runnable() { override def run() { @@ -272,6 +272,18 @@ class UnitTestSuite extends JUnitSuite { assertSubscribeReceives(synchronized)(1, 2, 3) } + @Test def testZip2() { + val colors: Observable[String] = Observable.from("red", "green", "blue") + val names: Observable[String] = Observable.from("lion-o", "cheetara", "panthro") + + case class Character(color: String, name: String) + + val cheetara = Character("green", "cheetara") + val panthro = Character("blue", "panthro") + val characters = Observable.zip[Character, String, String](colors, names, Character.apply _) + assertSubscribeReceives(characters)(cheetara, panthro) + } + @Test def testZip3() { val numbers = Observable.from(1, 2, 3) val colors = Observable.from("red", "green", "blue") @@ -283,7 +295,7 @@ class UnitTestSuite extends JUnitSuite { val cheetara = Character(2, "green", "cheetara") val panthro = Character(3, "blue", "panthro") - val characters = Observable.zip(numbers, colors, names, Character.apply _) + val characters = Observable.zip[Character, Int, String, String](numbers, colors, names, Character.apply _) assertSubscribeReceives(characters)(liono, cheetara, panthro) } @@ -299,7 +311,7 @@ class UnitTestSuite extends JUnitSuite { val cheetara = Character(2, "green", "cheetara", false) val panthro = Character(3, "blue", "panthro", false) - val characters = Observable.zip(numbers, colors, names, isLeader, Character.apply _) + val characters = Observable.zip[Character, Int, String, String, Boolean](numbers, colors, names, isLeader, Character.apply _) assertSubscribeReceives(characters)(liono, cheetara, panthro) } @@ -459,18 +471,9 @@ class UnitTestSuite extends JUnitSuite { assertSubscribeReceives(skipped)(3, 4) } - /** - * Both testTake and testTakeWhileWithIndex exposed a bug with unsubscribes not properly propagating. - * observable.take(2) produces onNext(first), onNext(second), and 4 onCompleteds - * it should produce onNext(first), onNext(second), and 1 onCompleted - * - * Switching to Observable.create(OperationTake.take(observable, 2)) works as expected - */ @Test def testTake { - import rx.operators._ - val observable = Observable.from(1, 2, 3, 4, 5) - val took = Observable.create(OperationTake.take(observable, 2)) + val took = observable.take(2) assertSubscribeReceives(took)(1, 2) } @@ -480,11 +483,11 @@ class UnitTestSuite extends JUnitSuite { assertSubscribeReceives(took)(1, 3, 5) } - /*@Test def testTakeWhileWithIndex { - val observable = Observable.from(1, 3, 5, 6, 7, 9, 11, 12, 13, 15, 17) - val took = observable.takeWhileWithIndex((i: Int, idx: Int) => isOdd(i) && idx > 4) - assertSubscribeReceives(took)(9, 11) - }*/ + @Test def testTakeWhileWithIndex { + val observable = Observable.from(1, 3, 5, 7, 9, 11, 12, 13, 15, 17) + val took = observable.takeWhileWithIndex((i: Int, idx: Int) => isOdd(i) && idx < 8) + assertSubscribeReceives(took)(1, 3, 5, 7, 9, 11) + } @Test def testTakeLast { val observable = Observable.from(1, 2, 3, 4, 5, 6, 7, 8, 9) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index fff321277b..68935749ba 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -1023,7 +1023,7 @@ public static Observable zip(Observable w0, Obs * Observables, results in an item that will be emitted by the resulting Observable * @return an Observable that emits the zipped results */ - public static Observable zip(Observable w0, Observable w1, Observable w2, Observable w3, Func4 reduceFunction) { + public static Observable zip(Observable w0, Observable w1, Observable w2, Observable w3, Func4 reduceFunction) { return create(OperationZip.zip(w0, w1, w2, w3, reduceFunction)); }