diff --git a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala index f4181c99ac..7d1ee11cd3 100644 --- a/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala +++ b/language-adaptors/rxjava-scala/src/examples/scala/rx/lang/scala/examples/RxScalaDemo.scala @@ -645,4 +645,67 @@ class RxScalaDemo extends JUnitSuite { val o : Observable[String] = List("alice", "bob", "carol").toObservable assertEquals(List("alice", "bob", "carol"), o.retry(3).toBlockingObservable.toList) } + + @Test def liftExample1(): Unit = { + // Add "No. " in front of each item + val o = List(1, 2, 3).toObservable.lift { + subscriber: Subscriber[String] => + Subscriber[Int]( + subscriber, + (v: Int) => subscriber.onNext("No. " + v), + e => subscriber.onError(e), + () => subscriber.onCompleted + ) + }.toBlockingObservable.toList + println(o) + } + + @Test def liftExample2(): Unit = { + // Split the input Strings with " " + val splitStringsWithSpace = (subscriber: Subscriber[String]) => { + Subscriber[String]( + subscriber, + (v: String) => v.split(" ").foreach(subscriber.onNext(_)), + e => subscriber.onError(e), + () => subscriber.onCompleted + ) + } + + // Convert the input Strings to Chars + val stringsToChars = (subscriber: Subscriber[Char]) => { + Subscriber[String]( + subscriber, + (v: String) => v.foreach(subscriber.onNext(_)), + e => subscriber.onError(e), + () => subscriber.onCompleted + ) + } + + // Skip the first n items. If the length of source is less than n, throw an IllegalArgumentException + def skipWithException[T](n: Int) = (subscriber: Subscriber[T]) => { + var count = 0 + Subscriber[T]( + subscriber, + (v: T) => { + if (count >= n) subscriber.onNext(v) + count += 1 + }, + e => subscriber.onError(e), + () => if (count < n) subscriber.onError(new IllegalArgumentException("There is no enough items")) else subscriber.onCompleted + ) + } + + val o = List("RxJava – Reactive Extensions for the JVM").toObservable + .lift(splitStringsWithSpace) + .map(_.toLowerCase) + .lift(stringsToChars) + .filter(_.isLetter) + .lift(skipWithException(100)) + try { + o.toBlockingObservable.toList + } + catch { + case e: IllegalArgumentException => println("IllegalArgumentException from skipWithException") + } + } } diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/JavaConversions.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/JavaConversions.scala index c1d03667b4..df86c08f80 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/JavaConversions.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/JavaConversions.scala @@ -54,4 +54,12 @@ object JavaConversions { val asJavaObservable = observable } } + + implicit def toJavaOperator[T, R](operator: Subscriber[R] => Subscriber[T]): rx.Observable.Operator[R, T] = { + new rx.Observable.Operator[R, T] { + override def call(subscriber: rx.Subscriber[_ >: R]): rx.Subscriber[_ >: T] = { + toJavaSubscriber[T](operator(toScalaSubscriber[R](subscriber))) + } + } + } } \ No newline at end of file diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala index 78b5cedd97..35eae53767 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Observable.scala @@ -2057,7 +2057,28 @@ trait Observable[+T] * @see [[Observable.first]] */ def head: Observable[T] = first - + + /** + * Returns an Observable that emits all items except the first one, or raises an `UnsupportedOperationException` + * if the source Observable is empty. + * + * @return an Observable that emits all items except the first one, or raises an `UnsupportedOperationException` + * if the source Observable is empty. + */ + def tail: Observable[T] = { + lift { + (subscriber: Subscriber[T]) => { + var isFirst = true + Subscriber[T]( + subscriber, + (v: T) => if(isFirst) isFirst = false else subscriber.onNext(v), + e => subscriber.onError(e), + () => if(isFirst) subscriber.onError(new UnsupportedOperationException("tail of empty Observable")) else subscriber.onCompleted + ) + } + } + } + /** * Returns an Observable that emits the last item emitted by the source Observable or notifies observers of * an `NoSuchElementException` if the source Observable is empty. @@ -2537,6 +2558,23 @@ trait Observable[+T] toScalaObservable[util.Map[K, V]](o).map(m => mapFactory() ++ m.toMap) } + /** + * Lift a function to the current Observable and return a new Observable that when subscribed to will pass + * the values of the current Observable through the function. + *

+ * In other words, this allows chaining Observers together on an Observable for acting on the values within + * the Observable. + * {{{ + * observable.map(...).filter(...).take(5).lift(new ObserverA()).lift(new ObserverB(...)).subscribe() + * }}} + * + * @param operator + * @return an Observable that emits values that are the result of applying the bind function to the values + * of the current Observable + */ + def lift[R](operator: Subscriber[R] => Subscriber[T]): Observable[R] = { + toScalaObservable(asJavaObservable.lift(toJavaOperator[T, R](operator))) + } } /** diff --git a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala index 9f7091123e..f19abebde1 100644 --- a/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala +++ b/language-adaptors/rxjava-scala/src/main/scala/rx/lang/scala/Subscriber.scala @@ -49,4 +49,14 @@ object Subscriber extends ObserverFactoryMethods[Subscriber] { override def onCompleted(): Unit = c() }) } + + def apply[T](subscriber: Subscriber[_], onNext: T => Unit, onError: Throwable => Unit, onCompleted: () => Unit): Subscriber[T] = { + val n = onNext; val e = onError; val c = onCompleted + // Java calls XXX; Scala receives XXX. + Subscriber(new rx.Subscriber[T](subscriber.asJavaSubscriber) { + override def onNext(value: T): Unit = n(value) + override def onError(error: Throwable): Unit = e(error) + override def onCompleted(): Unit = c() + }) + } } diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala index d4778d7d5e..eca5168da5 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/CompletenessTest.scala @@ -79,6 +79,7 @@ class CompletenessTest extends JUnitSuite { "firstOrDefault(T)" -> "firstOrElse(=> U)", "firstOrDefault(Func1[_ >: T, Boolean], T)" -> "[use `.filter(condition).firstOrElse(default)`]", "groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "[use `groupBy` and `map`]", + "lift(Operator[_ <: R, _ >: T])" -> "lift(Subscriber[R] => Subscriber[T])", "mapMany(Func1[_ >: T, _ <: Observable[_ <: R]])" -> "flatMap(T => Observable[R])", "mapWithIndex(Func2[_ >: T, Integer, _ <: R])" -> "[combine `zipWithIndex` with `map` or with a for comprehension]", "onErrorResumeNext(Func1[Throwable, _ <: Observable[_ <: T]])" -> "onErrorResumeNext(Throwable => Observable[U])", @@ -89,16 +90,28 @@ class CompletenessTest extends JUnitSuite { "parallel(Func1[Observable[T], Observable[R]], Scheduler)" -> "parallel(Observable[T] => Observable[R], Scheduler)", "reduce(Func2[T, T, T])" -> "reduce((U, U) => U)", "reduce(R, Func2[R, _ >: T, R])" -> "foldLeft(R)((R, T) => R)", + "retry()" -> "retry()", "scan(Func2[T, T, T])" -> unnecessary, "scan(R, Func2[R, _ >: T, R])" -> "scan(R)((R, T) => R)", "skip(Int)" -> "drop(Int)", + "skip(Long, TimeUnit)" -> "drop(Duration)", + "skip(Long, TimeUnit, Scheduler)" -> "drop(Duration, Scheduler)", "skipWhile(Func1[_ >: T, Boolean])" -> "dropWhile(T => Boolean)", "skipWhileWithIndex(Func2[_ >: T, Integer, Boolean])" -> unnecessary, "startWith(Iterable[T])" -> "[unnecessary because we can just use `++` instead]", + "skipLast(Int)" -> "dropRight(Int)", + "skipLast(Long, TimeUnit)" -> "dropRight(Duration)", + "skipLast(Long, TimeUnit, Scheduler)" -> "dropRight(Duration, Scheduler)", "takeFirst()" -> "first", "takeFirst(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate, "takeLast(Int)" -> "takeRight(Int)", "takeWhileWithIndex(Func2[_ >: T, _ >: Integer, Boolean])" -> "[use `.zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1)`]", + "timeout(Func0[_ <: Observable[U]], Func1[_ >: T, _ <: Observable[V]], Observable[_ <: T])" -> "timeout(() => Observable[U], T => Observable[V], Observable[O])", + "timeout(Func1[_ >: T, _ <: Observable[V]], Observable[_ <: T])" -> "timeout(() => Observable[U], T => Observable[V])", + "timeout(Long, TimeUnit, Observable[_ <: T])" -> "timeout(Duration, Observable[U])", + "timeout(Long, TimeUnit, Observable[_ <: T], Scheduler)" -> "timeout(Duration, Observable[U], Scheduler)", + "timer(Long, Long, TimeUnit)" -> "timer(Duration, Duration)", + "timer(Long, Long, TimeUnit, Scheduler)" -> "timer(Duration, Duration, Scheduler)", "toList()" -> "toSeq", "toSortedList()" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]", "toSortedList(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]", diff --git a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala index 397907a6cb..029622d638 100644 --- a/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala +++ b/language-adaptors/rxjava-scala/src/test/scala/rx/lang/scala/ObservableTest.scala @@ -146,16 +146,26 @@ class ObservableTests extends JUnitSuite { assertEquals(l2, l1) } - /* - @Test def testHead() { - val observer = mock(classOf[Observer[Int]]) - val o = Observable().head - val sub = o.subscribe(observer) - - verify(observer, never).onNext(any(classOf[Int])) - verify(observer, never).onCompleted() - verify(observer, times(1)).onError(any(classOf[NoSuchElementException])) - } - */ + @Test def testHead() { + val o: Observable[String] = List("alice", "bob", "carol").toObservable.head + assertEquals(List("alice"), o.toBlockingObservable.toList) + } + + @Test(expected = classOf[NoSuchElementException]) + def testHeadWithEmptyObservable() { + val o: Observable[String] = List[String]().toObservable.head + o.toBlockingObservable.toList + } + @Test def testTail() { + val o: Observable[String] = List("alice", "bob", "carol").toObservable.tail + assertEquals(List("bob", "carol"), o.toBlockingObservable.toList) + assertEquals(List("bob", "carol"), o.toBlockingObservable.toList) + } + + @Test(expected = classOf[UnsupportedOperationException]) + def testTailWithEmptyObservable() { + val o: Observable[String] = List[String]().toObservable.tail + o.toBlockingObservable.toList + } }