Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add lift to rxscala #1124

Merged
merged 4 commits into from
May 5, 2014
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -645,4 +645,60 @@ class RxScalaDemo extends JUnitSuite {
val o : Observable[String] = List("alice", "bob", "carol").toObservable
assertEquals(List("alice", "bob", "carol"), o.retry(3).toBlockingObservable.toList)
}

@Test def liftExample1(): Unit = {
// Add "No. " in front of each item
val o = List(1, 2, 3).toObservable.lift {
subscriber: Subscriber[String] =>
Subscriber(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

even if it's not required, I'd write Subscriber[Int] here, so that people immediately see that the function is of type Subscriber[String] => Subscriber[Int]

subscriber,
v => subscriber.onNext("No. " + v),
e => subscriber.onError(e),
() => subscriber.onCompleted
)
}.toBlockingObservable.toList
println(o)
}

@Test def liftExample2(): Unit = {
val o = Observable {
subscriber: Subscriber[Int] => {
for (i <- 1 to 10 if !subscriber.isUnsubscribed) {
println("emit " + i)
subscriber.onNext(i)
}
if (!subscriber.isUnsubscribed) {
println("emit onCompleted")
subscriber.onCompleted
}
}
}
// Take the first 5 items
val take = 5
val result = o.lift {
subscriber: Subscriber[String] =>
var index = 0
Subscriber(
subscriber,
v => {
if (index < take) {
subscriber.onNext("No. " + v)
}
if (index == take - 1) {
subscriber.onCompleted
}
index += 1
},
e => subscriber.onError(e),
() => subscriber.onCompleted
)
}.toBlockingObservable.toList
println(result)
// emit 1
// emit 2
// emit 3
// emit 4
// emit 5
// List(No. 1, No. 2, No. 3, No. 4, No. 5)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The examples in RxScalaDemo should be nice code and only in a style that we encourage users to adopt (modulo toBlockingObservable and some Thread.sleep...). Imho this does not quite apply for liftExample2 ;-)
It would be very nice to have a more realistic example, which also illustrates this:

   * In other words, this allows chaining Observers together on an Observable for acting on the values within
   * the Observable.
   * {{{
   * observable.map(...).filter(...).take(5).lift(new ObserverA()).lift(new ObserverB(...)).subscribe()
   * }}}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2537,6 +2537,29 @@ trait Observable[+T]
toScalaObservable[util.Map[K, V]](o).map(m => mapFactory() ++ m.toMap)
}

/**
* Lift a function to the current Observable and return a new Observable that when subscribed to will pass
* the values of the current Observable through the function.
* <p>
* In other words, this allows chaining Observers together on an Observable for acting on the values within
* the Observable.
* {{{
* observable.map(...).filter(...).take(5).lift(new ObserverA()).lift(new ObserverB(...)).subscribe()
* }}}
*
* @param operator
* @return an Observable that emits values that are the result of applying the bind function to the values
* of the current Observable
*/
def lift[R](operator: Subscriber[_ >: R] => Subscriber[_ >: T]): Observable[R] = {
val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]]
val thatJava = thisJava.lift[R](new rx.Observable.Operator[R, T] {
override def call(subscriber: rx.Subscriber[_ >: R]): rx.Subscriber[_ >: T] = {
toJavaSubscriber(operator.call(toScalaSubscriber(subscriber)))
}
})
toScalaObservable(thatJava)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Instead of building a new rx.Observable.Operator yourself, you should add a (maybe implicit) conversion from Subscriber[_ >: R] => Subscriber[_ >: T] to rx.Observable.Operator in ImplicitFunctionConversions.
  • Subscriber is declared contravariant, so writing Subscriber[_ >: R] is redundant.
  • I think you can implement this without casting (but maybe you need type ascriptions, or explicitly type a val)
  • operator.call is probably not what you want. You're converting operator to an RxJava Func1 using an implicit conversion... Use an IDE which underlines all implicit conversion ;-)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now it looks much nicer than before :) But again, why wildcards?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already removed it.

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,14 @@ object Subscriber extends ObserverFactoryMethods[Subscriber] {
override def onCompleted(): Unit = c()
})
}

def apply[T](subscriber: Subscriber[_], onNext: T => Unit, onError: Throwable => Unit, onCompleted: () => Unit): Subscriber[T] = {
val n = onNext; val e = onError; val c = onCompleted
// Java calls XXX; Scala receives XXX.
Subscriber(new rx.Subscriber[T](subscriber.asJavaSubscriber) {
override def onNext(value: T): Unit = n(value)
override def onError(error: Throwable): Unit = e(error)
override def onCompleted(): Unit = c()
})
}
}