-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add lift to rxscala #1124
Add lift to rxscala #1124
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,4 +54,12 @@ object JavaConversions { | |
val asJavaObservable = observable | ||
} | ||
} | ||
|
||
implicit def toJavaOperator[T, R](operator: Subscriber[R] => Subscriber[_ >: T]): rx.Observable.Operator[R, T] = { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why do you use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thank you. Already removed it. |
||
new rx.Observable.Operator[R, T] { | ||
override def call(subscriber: rx.Subscriber[_ >: R]): rx.Subscriber[_ >: T] = { | ||
toJavaSubscriber[T](operator(toScalaSubscriber[R](subscriber))) | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2057,7 +2057,28 @@ trait Observable[+T] | |
* @see [[Observable.first]] | ||
*/ | ||
def head: Observable[T] = first | ||
|
||
|
||
/** | ||
* Returns an Observable that emits all items except the first one, or raises an `UnsupportedOperationException` | ||
* if the source Observable is empty. | ||
* | ||
* @return an Observable that emits all items except the first one, or raises an `UnsupportedOperationException` | ||
* if the source Observable is empty. | ||
*/ | ||
def tail: Observable[T] = { | ||
lift { | ||
(subscriber: Subscriber[T]) => { | ||
var isFirst = true | ||
Subscriber[T]( | ||
subscriber, | ||
(v: T) => if(isFirst) isFirst = false else subscriber.onNext(v), | ||
e => subscriber.onError(e), | ||
() => if(isFirst) subscriber.onError(new UnsupportedOperationException("tail of empty Observable")) else subscriber.onCompleted | ||
) | ||
} | ||
} | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here you're implementing logic in Scala. Shouldn't this be in RxJava core? Maybe not the whole tail method, because that's Scala specific, but asserting that an Observable contains a certain number of elements is something quite general, so maybe some part of this should go into RxJava core? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe the following operator in // Skip the first n items. If the length of source is less than n, throw an IllegalArgumentException
def skipWithException[T](n: Int) = (subscriber: Subscriber[T]) => {
var count = 0
Subscriber[T](
subscriber,
(v: T) => {
if (count >= n) subscriber.onNext(v)
count += 1
},
e => subscriber.onError(e),
() => if (count < n) subscriber.onError(new IllegalArgumentException("There is no enough items")) else subscriber.onCompleted
)
} If only asserting the length, because we can not get the length until However, I'm not sure if it's worth to add it into RxJava... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think the purpose of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @samuelgruetter what's your opinion about the current There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we can merge this, since the signature is definitely right, and if we can move the logic to core later, all the better. |
||
|
||
/** | ||
* Returns an Observable that emits the last item emitted by the source Observable or notifies observers of | ||
* an `NoSuchElementException` if the source Observable is empty. | ||
|
@@ -2537,6 +2558,23 @@ trait Observable[+T] | |
toScalaObservable[util.Map[K, V]](o).map(m => mapFactory() ++ m.toMap) | ||
} | ||
|
||
/** | ||
* Lift a function to the current Observable and return a new Observable that when subscribed to will pass | ||
* the values of the current Observable through the function. | ||
* <p> | ||
* In other words, this allows chaining Observers together on an Observable for acting on the values within | ||
* the Observable. | ||
* {{{ | ||
* observable.map(...).filter(...).take(5).lift(new ObserverA()).lift(new ObserverB(...)).subscribe() | ||
* }}} | ||
* | ||
* @param operator | ||
* @return an Observable that emits values that are the result of applying the bind function to the values | ||
* of the current Observable | ||
*/ | ||
def lift[R](operator: Subscriber[R] => Subscriber[_ >: T]): Observable[R] = { | ||
toScalaObservable(asJavaObservable.lift(toJavaOperator[T, R](operator))) | ||
} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now it looks much nicer than before :) But again, why wildcards? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Already removed it. |
||
} | ||
|
||
/** | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The examples in RxScalaDemo should be nice code and only in a style that we encourage users to adopt (modulo toBlockingObservable and some Thread.sleep...). Imho this does not quite apply for liftExample2 ;-)
It would be very nice to have a more realistic example, which also illustrates this: