Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add lift to rxscala #1124

Merged
merged 4 commits into from
May 5, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -645,4 +645,67 @@ class RxScalaDemo extends JUnitSuite {
val o : Observable[String] = List("alice", "bob", "carol").toObservable
assertEquals(List("alice", "bob", "carol"), o.retry(3).toBlockingObservable.toList)
}

@Test def liftExample1(): Unit = {
// Add "No. " in front of each item
val o = List(1, 2, 3).toObservable.lift {
subscriber: Subscriber[String] =>
Subscriber[Int](
subscriber,
(v: Int) => subscriber.onNext("No. " + v),
e => subscriber.onError(e),
() => subscriber.onCompleted
)
}.toBlockingObservable.toList
println(o)
}

@Test def liftExample2(): Unit = {
// Split the input Strings with " "
val splitStringsWithSpace = (subscriber: Subscriber[String]) => {
Subscriber[String](
subscriber,
(v: String) => v.split(" ").foreach(subscriber.onNext(_)),
e => subscriber.onError(e),
() => subscriber.onCompleted
)
}

// Convert the input Strings to Chars
val stringsToChars = (subscriber: Subscriber[Char]) => {
Subscriber[String](
subscriber,
(v: String) => v.foreach(subscriber.onNext(_)),
e => subscriber.onError(e),
() => subscriber.onCompleted
)
}

// Skip the first n items. If the length of source is less than n, throw an IllegalArgumentException
def skipWithException[T](n: Int) = (subscriber: Subscriber[T]) => {
var count = 0
Subscriber[T](
subscriber,
(v: T) => {
if (count >= n) subscriber.onNext(v)
count += 1
},
e => subscriber.onError(e),
() => if (count < n) subscriber.onError(new IllegalArgumentException("There is no enough items")) else subscriber.onCompleted
)
}

val o = List("RxJava – Reactive Extensions for the JVM").toObservable
.lift(splitStringsWithSpace)
.map(_.toLowerCase)
.lift(stringsToChars)
.filter(_.isLetter)
.lift(skipWithException(100))
try {
o.toBlockingObservable.toList
}
catch {
case e: IllegalArgumentException => println("IllegalArgumentException from skipWithException")
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The examples in RxScalaDemo should be nice code and only in a style that we encourage users to adopt (modulo toBlockingObservable and some Thread.sleep...). Imho this does not quite apply for liftExample2 ;-)
It would be very nice to have a more realistic example, which also illustrates this:

   * In other words, this allows chaining Observers together on an Observable for acting on the values within
   * the Observable.
   * {{{
   * observable.map(...).filter(...).take(5).lift(new ObserverA()).lift(new ObserverB(...)).subscribe()
   * }}}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,12 @@ object JavaConversions {
val asJavaObservable = observable
}
}

implicit def toJavaOperator[T, R](operator: Subscriber[R] => Subscriber[T]): rx.Observable.Operator[R, T] = {
new rx.Observable.Operator[R, T] {
override def call(subscriber: rx.Subscriber[_ >: R]): rx.Subscriber[_ >: T] = {
toJavaSubscriber[T](operator(toScalaSubscriber[R](subscriber)))
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2057,7 +2057,28 @@ trait Observable[+T]
* @see [[Observable.first]]
*/
def head: Observable[T] = first


/**
* Returns an Observable that emits all items except the first one, or raises an `UnsupportedOperationException`
* if the source Observable is empty.
*
* @return an Observable that emits all items except the first one, or raises an `UnsupportedOperationException`
* if the source Observable is empty.
*/
def tail: Observable[T] = {
lift {
(subscriber: Subscriber[T]) => {
var isFirst = true
Subscriber[T](
subscriber,
(v: T) => if(isFirst) isFirst = false else subscriber.onNext(v),
e => subscriber.onError(e),
() => if(isFirst) subscriber.onError(new UnsupportedOperationException("tail of empty Observable")) else subscriber.onCompleted
)
}
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here you're implementing logic in Scala. Shouldn't this be in RxJava core? Maybe not the whole tail method, because that's Scala specific, but asserting that an Observable contains a certain number of elements is something quite general, so maybe some part of this should go into RxJava core?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the following operator in liftExample2 is better?

    // Skip the first n items. If the length of source is less than n, throw an IllegalArgumentException
    def skipWithException[T](n: Int) = (subscriber: Subscriber[T]) => {
      var count = 0
      Subscriber[T](
        subscriber,
        (v: T) => {
          if (count >= n) subscriber.onNext(v)
          count += 1
        },
        e => subscriber.onError(e),
        () => if (count < n) subscriber.onError(new IllegalArgumentException("There is no enough items")) else subscriber.onCompleted
      )
    }

If only asserting the length, because we can not get the length until onCompleted, how to handle the previous onNext if the assert fails in onCompleted.

However, I'm not sure if it's worth to add it into RxJava...

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the purpose of lift is providing the expansion capability so that RxJava APIs can keep stable and don't need to provide too many operator. So I don't think we need to add it into RxJava.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@samuelgruetter what's your opinion about the current tail implementation? If you still don't like it, I can remove it from this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can merge this, since the signature is definitely right, and if we can move the logic to core later, all the better.


/**
* Returns an Observable that emits the last item emitted by the source Observable or notifies observers of
* an `NoSuchElementException` if the source Observable is empty.
Expand Down Expand Up @@ -2537,6 +2558,23 @@ trait Observable[+T]
toScalaObservable[util.Map[K, V]](o).map(m => mapFactory() ++ m.toMap)
}

/**
* Lift a function to the current Observable and return a new Observable that when subscribed to will pass
* the values of the current Observable through the function.
* <p>
* In other words, this allows chaining Observers together on an Observable for acting on the values within
* the Observable.
* {{{
* observable.map(...).filter(...).take(5).lift(new ObserverA()).lift(new ObserverB(...)).subscribe()
* }}}
*
* @param operator
* @return an Observable that emits values that are the result of applying the bind function to the values
* of the current Observable
*/
def lift[R](operator: Subscriber[R] => Subscriber[T]): Observable[R] = {
toScalaObservable(asJavaObservable.lift(toJavaOperator[T, R](operator)))
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Instead of building a new rx.Observable.Operator yourself, you should add a (maybe implicit) conversion from Subscriber[_ >: R] => Subscriber[_ >: T] to rx.Observable.Operator in ImplicitFunctionConversions.
  • Subscriber is declared contravariant, so writing Subscriber[_ >: R] is redundant.
  • I think you can implement this without casting (but maybe you need type ascriptions, or explicitly type a val)
  • operator.call is probably not what you want. You're converting operator to an RxJava Func1 using an implicit conversion... Use an IDE which underlines all implicit conversion ;-)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now it looks much nicer than before :) But again, why wildcards?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already removed it.

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,14 @@ object Subscriber extends ObserverFactoryMethods[Subscriber] {
override def onCompleted(): Unit = c()
})
}

def apply[T](subscriber: Subscriber[_], onNext: T => Unit, onError: Throwable => Unit, onCompleted: () => Unit): Subscriber[T] = {
val n = onNext; val e = onError; val c = onCompleted
// Java calls XXX; Scala receives XXX.
Subscriber(new rx.Subscriber[T](subscriber.asJavaSubscriber) {
override def onNext(value: T): Unit = n(value)
override def onError(error: Throwable): Unit = e(error)
override def onCompleted(): Unit = c()
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ class CompletenessTest extends JUnitSuite {
"firstOrDefault(T)" -> "firstOrElse(=> U)",
"firstOrDefault(Func1[_ >: T, Boolean], T)" -> "[use `.filter(condition).firstOrElse(default)`]",
"groupBy(Func1[_ >: T, _ <: K], Func1[_ >: T, _ <: R])" -> "[use `groupBy` and `map`]",
"lift(Operator[_ <: R, _ >: T])" -> "lift(Subscriber[R] => Subscriber[T])",
"mapMany(Func1[_ >: T, _ <: Observable[_ <: R]])" -> "flatMap(T => Observable[R])",
"mapWithIndex(Func2[_ >: T, Integer, _ <: R])" -> "[combine `zipWithIndex` with `map` or with a for comprehension]",
"onErrorResumeNext(Func1[Throwable, _ <: Observable[_ <: T]])" -> "onErrorResumeNext(Throwable => Observable[U])",
Expand All @@ -89,16 +90,28 @@ class CompletenessTest extends JUnitSuite {
"parallel(Func1[Observable[T], Observable[R]], Scheduler)" -> "parallel(Observable[T] => Observable[R], Scheduler)",
"reduce(Func2[T, T, T])" -> "reduce((U, U) => U)",
"reduce(R, Func2[R, _ >: T, R])" -> "foldLeft(R)((R, T) => R)",
"retry()" -> "retry()",
"scan(Func2[T, T, T])" -> unnecessary,
"scan(R, Func2[R, _ >: T, R])" -> "scan(R)((R, T) => R)",
"skip(Int)" -> "drop(Int)",
"skip(Long, TimeUnit)" -> "drop(Duration)",
"skip(Long, TimeUnit, Scheduler)" -> "drop(Duration, Scheduler)",
"skipWhile(Func1[_ >: T, Boolean])" -> "dropWhile(T => Boolean)",
"skipWhileWithIndex(Func2[_ >: T, Integer, Boolean])" -> unnecessary,
"startWith(Iterable[T])" -> "[unnecessary because we can just use `++` instead]",
"skipLast(Int)" -> "dropRight(Int)",
"skipLast(Long, TimeUnit)" -> "dropRight(Duration)",
"skipLast(Long, TimeUnit, Scheduler)" -> "dropRight(Duration, Scheduler)",
"takeFirst()" -> "first",
"takeFirst(Func1[_ >: T, Boolean])" -> commentForFirstWithPredicate,
"takeLast(Int)" -> "takeRight(Int)",
"takeWhileWithIndex(Func2[_ >: T, _ >: Integer, Boolean])" -> "[use `.zipWithIndex.takeWhile{case (elem, index) => condition}.map(_._1)`]",
"timeout(Func0[_ <: Observable[U]], Func1[_ >: T, _ <: Observable[V]], Observable[_ <: T])" -> "timeout(() => Observable[U], T => Observable[V], Observable[O])",
"timeout(Func1[_ >: T, _ <: Observable[V]], Observable[_ <: T])" -> "timeout(() => Observable[U], T => Observable[V])",
"timeout(Long, TimeUnit, Observable[_ <: T])" -> "timeout(Duration, Observable[U])",
"timeout(Long, TimeUnit, Observable[_ <: T], Scheduler)" -> "timeout(Duration, Observable[U], Scheduler)",
"timer(Long, Long, TimeUnit)" -> "timer(Duration, Duration)",
"timer(Long, Long, TimeUnit, Scheduler)" -> "timer(Duration, Duration, Scheduler)",
"toList()" -> "toSeq",
"toSortedList()" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sorted)`]",
"toSortedList(Func2[_ >: T, _ >: T, Integer])" -> "[Sorting is already done in Scala's collection library, use `.toSeq.map(_.sortWith(f))`]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,16 +146,26 @@ class ObservableTests extends JUnitSuite {
assertEquals(l2, l1)
}

/*
@Test def testHead() {
val observer = mock(classOf[Observer[Int]])
val o = Observable().head
val sub = o.subscribe(observer)

verify(observer, never).onNext(any(classOf[Int]))
verify(observer, never).onCompleted()
verify(observer, times(1)).onError(any(classOf[NoSuchElementException]))
}
*/
@Test def testHead() {
val o: Observable[String] = List("alice", "bob", "carol").toObservable.head
assertEquals(List("alice"), o.toBlockingObservable.toList)
}

@Test(expected = classOf[NoSuchElementException])
def testHeadWithEmptyObservable() {
val o: Observable[String] = List[String]().toObservable.head
o.toBlockingObservable.toList
}

@Test def testTail() {
val o: Observable[String] = List("alice", "bob", "carol").toObservable.tail
assertEquals(List("bob", "carol"), o.toBlockingObservable.toList)
assertEquals(List("bob", "carol"), o.toBlockingObservable.toList)
}

@Test(expected = classOf[UnsupportedOperationException])
def testTailWithEmptyObservable() {
val o: Observable[String] = List[String]().toObservable.tail
o.toBlockingObservable.toList
}
}