-
Notifications
You must be signed in to change notification settings - Fork 7.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Eager ConcatMap #3017
Comments
I can't help you with the name and it is non-trivial to change merge() to support it (#2928): fast-path would require unbounded buffers for the later sources; the scalar queue wouldn't really work if interleaved with regular sources and potentially one would need to wrap each value into an indexed container anyway. In theory, after getting rid of the optimizations, the plain merge's round-robin collector can be modified to stick to the first buffer until it is terminated before moving onto the next. (In my plans for the semi-auto-parallelization, I thought about the join option to be either ordered or unordered merge.) |
Merge does not need to be changed. I can implement this today by composition. For example, I iterate over an array of Observables, .cache().subscribe() them, then concat those together. |
One use case for a eager concat was a bug that I ran into a while ago with using concat and publish with a closure.
|
This would also be useful where the |
I like the idea. Out of interest I think |
I was misunderstanding concat operator as it subscribes each observables eagerly. http://reactivex.io/documentation/operators/concat.html In this figure concat is very, very looks eager :P |
That's the point. A |
Yes, that diagram looks wrong. It shows both as subscribing immediately, which it definitely does not do. /cc @DavidMGross |
Ah; I thought I'd fixed that. I'll go back and make sure the right diagrams On Wed, Jul 1, 2015 at 9:24 AM, Ben Christensen notifications@github.com
David M. Gross |
Oh; I see what happened. That particular illustration ( The other illustrations on the page (e.g. under the RxJava and RxGroovy On Wed, Jul 1, 2015 at 2:42 PM, PLP Consulting (David Gross) <
David M. Gross |
It is very useful to implement observable for data-store backed value. Refer: pwittchen/prefser#47 (I know BehaviorSubject is enough to implement observable for just a variable. :) |
Oops... there is pitfall when implementing get-and-observe with eager concatMap. If
It is easier to use merge for this purpose like below: Observable.merge(observableForObserve, observableForGet) This will subscribe observableForObserve at first, so no update will be missed. |
Such operator with backpressure has limited use because even if it subscribes to all sources at once, they can produce up to @ypresto If |
And I found it may deliver outdated data if observableForObserve called onNext() after observableForGet stored fetch result in local var. Then how do I implement get-and-observe with RxJava? private Observable<List<Item>> mSharedRequestQueueObservable = updateNotificationObservable
.onBackpressureLatest()
.map(aVoid -> getObservable.cache()) // cache to share result with all subscribers
.share();
public Observable<List<Item>> getAndObserve() {
return Observable.merge(mSharedRequestQueueObservable, getObservable.nest())
.flatMap(observable -> observable) // not using any schedulers to execute synchronously
.subscribeOn(Schedulers.io());
} (Let me know if mailing list is better place to discuss :) |
Sorry, I was using wrong snippet of
|
It looks like this thread may have gone off the rails. It seems like there is interest in having an eager concat operator. I think I have implemented one in the internal processing of #3203. It's a composition of |
I think the following example does exactly this: Observable<Long> source1 = Observable.intervalRange(0, 20, 100, 100, TimeUnit.MILLISECONDS);
Observable<Long> source2 = Observable.intervalRange(20, 20, 100, 100, TimeUnit.MILLISECONDS);
Observable<Long> source3 = Observable.intervalRange(40, 20, 100, 100, TimeUnit.MILLISECONDS);
Observable<Observable<Long>> sources = Observable.just(source1, source2, source3);
sources.map(v -> {
ConnectableObservable<Long> c = v.replay();
c.subscribe(); // to cache all
c.connect(); // starting now
return c;
})
.onBackpressureBuffer() // make sure all source started
.concatMap(v -> v)
.toBlocking()
.forEach(System.out::println); Edit: The following alternative doesn't retain everything: Observable<Long> source1 = Observable.intervalRange(0, 20, 100, 100, TimeUnit.MILLISECONDS);
Observable<Long> source2 = Observable.intervalRange(20, 20, 100, 100, TimeUnit.MILLISECONDS);
Observable<Long> source3 = Observable.intervalRange(40, 20, 100, 100, TimeUnit.MILLISECONDS);
Observable<Observable<Long>> sources = Observable.just(source1, source2, source3);
sources.map(v -> {
UnicastSubject<Long> subject = UnicastSubject.create();
v.subscribe(subject);
return subject;
})
.onBackpressureBuffer() // make sure all source started
.concatMap(v -> v)
.toBlocking()
.forEach(System.out::println); |
See #3357. |
The new operator has been delivered with 1.0.15. |
Is there already a name for something in the ecosystem that is like an eager concatMap, or ordered flatMap?
I often see the pattern of people wanting to eagerly kick off several Observables, but merge them together in order.
It is absolutely not a good idea for infinite lists, but similar to how
concatMap
andflatMap
can both be dangerous if used wrong, aneagerConcatMap
that eagerly subscribes and caches the innerObservable
s could be useful.Of course this can be done without adding anything to RxJava, it's just tedious.
Bad idea? Does it already have a name that I'm unaware of?
The text was updated successfully, but these errors were encountered: