Skip to content

Commit

Permalink
updated to Scala 2.10.2 again, repaired Scala tests, generalized two …
Browse files Browse the repository at this point in the history
…more zip methods
  • Loading branch information
jmhofer committed Aug 31, 2013
1 parent 1b32a19 commit ecd3705
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 22 deletions.
3 changes: 1 addition & 2 deletions language-adaptors/rxjava-scala/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@ sourceSets {
}

dependencies {
// pinning to 2.10.1 as having issues with 2.10.2
compile 'org.scala-lang:scala-library:2.10.1'
compile 'org.scala-lang:scala-library:2.10.2'

compile project(':rxjava-core')

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ object RxImplicits {
* type never escapes the for-comprehension
*/
implicit class ScalaObservable[A](wrapped: Observable[A]) {
def map[B](f: A => B): Observable[B] = wrapped.map(f)
def map[B](f: A => B): Observable[B] = wrapped.map[B](f)
def flatMap[B](f: A => Observable[B]): Observable[B] = wrapped.mapMany(f)
def foreach(f: A => Unit): Unit = wrapped.toBlockingObservable.forEach(f)
def withFilter(p: A => Boolean): WithFilter = new WithFilter(p)
Expand Down Expand Up @@ -147,7 +147,7 @@ class UnitTestSuite extends JUnitSuite {
class ObservableWithException(s: Subscription, values: String*) extends Observable[String] {
var t: Thread = null

override def subscribe(observer: Observer[String]): Subscription = {
override def subscribe(observer: Observer[_ >: String]): Subscription = {
println("ObservableWithException subscribed to ...")
t = new Thread(new Runnable() {
override def run() {
Expand Down Expand Up @@ -272,6 +272,18 @@ class UnitTestSuite extends JUnitSuite {
assertSubscribeReceives(synchronized)(1, 2, 3)
}

@Test def testZip2() {
val colors: Observable[String] = Observable.from("red", "green", "blue")
val names: Observable[String] = Observable.from("lion-o", "cheetara", "panthro")

case class Character(color: String, name: String)

val cheetara = Character("green", "cheetara")
val panthro = Character("blue", "panthro")
val characters = Observable.zip[Character, String, String](colors, names, Character.apply _)
assertSubscribeReceives(characters)(cheetara, panthro)
}

@Test def testZip3() {
val numbers = Observable.from(1, 2, 3)
val colors = Observable.from("red", "green", "blue")
Expand All @@ -283,7 +295,7 @@ class UnitTestSuite extends JUnitSuite {
val cheetara = Character(2, "green", "cheetara")
val panthro = Character(3, "blue", "panthro")

val characters = Observable.zip(numbers, colors, names, Character.apply _)
val characters = Observable.zip[Character, Int, String, String](numbers, colors, names, Character.apply _)
assertSubscribeReceives(characters)(liono, cheetara, panthro)
}

Expand All @@ -299,7 +311,7 @@ class UnitTestSuite extends JUnitSuite {
val cheetara = Character(2, "green", "cheetara", false)
val panthro = Character(3, "blue", "panthro", false)

val characters = Observable.zip(numbers, colors, names, isLeader, Character.apply _)
val characters = Observable.zip[Character, Int, String, String, Boolean](numbers, colors, names, isLeader, Character.apply _)
assertSubscribeReceives(characters)(liono, cheetara, panthro)
}

Expand Down Expand Up @@ -459,18 +471,9 @@ class UnitTestSuite extends JUnitSuite {
assertSubscribeReceives(skipped)(3, 4)
}

/**
* Both testTake and testTakeWhileWithIndex exposed a bug with unsubscribes not properly propagating.
* observable.take(2) produces onNext(first), onNext(second), and 4 onCompleteds
* it should produce onNext(first), onNext(second), and 1 onCompleted
*
* Switching to Observable.create(OperationTake.take(observable, 2)) works as expected
*/
@Test def testTake {
import rx.operators._

val observable = Observable.from(1, 2, 3, 4, 5)
val took = Observable.create(OperationTake.take(observable, 2))
val took = observable.take(2)
assertSubscribeReceives(took)(1, 2)
}

Expand All @@ -480,11 +483,11 @@ class UnitTestSuite extends JUnitSuite {
assertSubscribeReceives(took)(1, 3, 5)
}

/*@Test def testTakeWhileWithIndex {
val observable = Observable.from(1, 3, 5, 6, 7, 9, 11, 12, 13, 15, 17)
val took = observable.takeWhileWithIndex((i: Int, idx: Int) => isOdd(i) && idx > 4)
assertSubscribeReceives(took)(9, 11)
}*/
@Test def testTakeWhileWithIndex {
val observable = Observable.from(1, 3, 5, 7, 9, 11, 12, 13, 15, 17)
val took = observable.takeWhileWithIndex((i: Int, idx: Int) => isOdd(i) && idx < 8)
assertSubscribeReceives(took)(1, 3, 5, 7, 9, 11)
}

@Test def testTakeLast {
val observable = Observable.from(1, 2, 3, 4, 5, 6, 7, 8, 9)
Expand Down
2 changes: 1 addition & 1 deletion rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1023,7 +1023,7 @@ public static <R, T0, T1, T2> Observable<R> zip(Observable<? extends T0> w0, Obs
* Observables, results in an item that will be emitted by the resulting Observable
* @return an Observable that emits the zipped results
*/
public static <R, T0, T1, T2, T3> Observable<R> zip(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Observable<T3> w3, Func4<? super T0, ? super T1, ? super T2, ? super T3, ? extends R> reduceFunction) {
public static <R, T0, T1, T2, T3> Observable<R> zip(Observable<? extends T0> w0, Observable<? extends T1> w1, Observable<? extends T2> w2, Observable<? extends T3> w3, Func4<? super T0, ? super T1, ? super T2, ? super T3, ? extends R> reduceFunction) {
return create(OperationZip.zip(w0, w1, w2, w3, reduceFunction));
}

Expand Down

0 comments on commit ecd3705

Please sign in to comment.