-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add lift to rxscala #1124
Add lift to rxscala #1124
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -645,4 +645,60 @@ class RxScalaDemo extends JUnitSuite { | |
val o : Observable[String] = List("alice", "bob", "carol").toObservable | ||
assertEquals(List("alice", "bob", "carol"), o.retry(3).toBlockingObservable.toList) | ||
} | ||
|
||
@Test def liftExample1(): Unit = { | ||
// Add "No. " in front of each item | ||
val o = List(1, 2, 3).toObservable.lift { | ||
subscriber: Subscriber[String] => | ||
Subscriber( | ||
subscriber, | ||
v => subscriber.onNext("No. " + v), | ||
e => subscriber.onError(e), | ||
() => subscriber.onCompleted | ||
) | ||
}.toBlockingObservable.toList | ||
println(o) | ||
} | ||
|
||
@Test def liftExample2(): Unit = { | ||
val o = Observable { | ||
subscriber: Subscriber[Int] => { | ||
for (i <- 1 to 10 if !subscriber.isUnsubscribed) { | ||
println("emit " + i) | ||
subscriber.onNext(i) | ||
} | ||
if (!subscriber.isUnsubscribed) { | ||
println("emit onCompleted") | ||
subscriber.onCompleted | ||
} | ||
} | ||
} | ||
// Take the first 5 items | ||
val take = 5 | ||
val result = o.lift { | ||
subscriber: Subscriber[String] => | ||
var index = 0 | ||
Subscriber( | ||
subscriber, | ||
v => { | ||
if (index < take) { | ||
subscriber.onNext("No. " + v) | ||
} | ||
if (index == take - 1) { | ||
subscriber.onCompleted | ||
} | ||
index += 1 | ||
}, | ||
e => subscriber.onError(e), | ||
() => subscriber.onCompleted | ||
) | ||
}.toBlockingObservable.toList | ||
println(result) | ||
// emit 1 | ||
// emit 2 | ||
// emit 3 | ||
// emit 4 | ||
// emit 5 | ||
// List(No. 1, No. 2, No. 3, No. 4, No. 5) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The examples in RxScalaDemo should be nice code and only in a style that we encourage users to adopt (modulo toBlockingObservable and some Thread.sleep...). Imho this does not quite apply for liftExample2 ;-)
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2537,6 +2537,29 @@ trait Observable[+T] | |
toScalaObservable[util.Map[K, V]](o).map(m => mapFactory() ++ m.toMap) | ||
} | ||
|
||
/** | ||
* Lift a function to the current Observable and return a new Observable that when subscribed to will pass | ||
* the values of the current Observable through the function. | ||
* <p> | ||
* In other words, this allows chaining Observers together on an Observable for acting on the values within | ||
* the Observable. | ||
* {{{ | ||
* observable.map(...).filter(...).take(5).lift(new ObserverA()).lift(new ObserverB(...)).subscribe() | ||
* }}} | ||
* | ||
* @param operator | ||
* @return an Observable that emits values that are the result of applying the bind function to the values | ||
* of the current Observable | ||
*/ | ||
def lift[R](operator: Subscriber[_ >: R] => Subscriber[_ >: T]): Observable[R] = { | ||
val thisJava = asJavaObservable.asInstanceOf[rx.Observable[T]] | ||
val thatJava = thisJava.lift[R](new rx.Observable.Operator[R, T] { | ||
override def call(subscriber: rx.Subscriber[_ >: R]): rx.Subscriber[_ >: T] = { | ||
toJavaSubscriber(operator.call(toScalaSubscriber(subscriber))) | ||
} | ||
}) | ||
toScalaObservable(thatJava) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now it looks much nicer than before :) But again, why wildcards? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Already removed it. |
||
} | ||
|
||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
even if it's not required, I'd write
Subscriber[Int]
here, so that people immediately see that the function is of typeSubscriber[String] => Subscriber[Int]