-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
mergeMap drops grouped observables greater than concurrency limit #2028
Comments
@jeffbski mergeMap is working as expected. The groups that come in after mergeMap has reached the concurrency limit aren't subscribed to immediately. #2023 would solve your problem by allowing you to replay events from the GroupedObservables if they aren't immediately subscribed to. Or you can immediately publish and connect the groups as they're created, so they store events and replay them to late subscribers: Rx.Observable.from([
{ a: 1, b: 10 },
{ a: 1, b: 20 },
{ a: 2, b: 30 },
{ a: 1, b: 40 },
{ a: 1, b: 50 },
{ a: 2, b: 60 },
{ a: 3, b: 70 }
])
.groupBy(x => x.a)
.map((group) => {
const historic = group.publishReplay();
// connect immediately so we don't miss any events
historic.connect();
// refCount here so the group connection will be
// unsubscribed when the subscriber count goes to 0
const refCounted = historic.refCount();
// make the refCounted observable look like a GroupedObservable
refCounted.key = group.key;
return refCounted;
})
.mergeMap(group => group, CONCURRENCY)
.subscribe(x => console.log(x)); |
Thanks @trxcllnt, I didn't realize that the GroupedObservables were hot, but this makes sense. #2023 would be a great addition that would solve my problem, so I hope that they accept your PR. I appreciate the explanation and work around code. I'll close this issue since your PR would be the best way forward. |
This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs. |
It appears that mergeMap will drop grouped observables (from groupBy) greater than the concurrency limit. The extra groups are simply lost and never processed rather than simply being delayed.
RxJS version: rxjs@5.0.0-rc.1
Code to reproduce:
Expected behavior:
Actual behavior:
Additional information:
This appears to be a bug, since even though we are restricting the concurrency of mergeMap, I would have expected all of the groups to eventually be processed but the current code seems to drop the extra groups.
The text was updated successfully, but these errors were encountered: