Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mergeMap drops grouped observables greater than concurrency limit #2028

Closed
jeffbski opened this issue Oct 12, 2016 · 3 comments
Closed

mergeMap drops grouped observables greater than concurrency limit #2028

jeffbski opened this issue Oct 12, 2016 · 3 comments

Comments

@jeffbski
Copy link
Contributor

It appears that mergeMap will drop grouped observables (from groupBy) greater than the concurrency limit. The extra groups are simply lost and never processed rather than simply being delayed.

RxJS version: rxjs@5.0.0-rc.1

Code to reproduce:

const CONCURRENCY = 1; // setting this to less than 3 will result in groups being lost

Rx.Observable.from([
  { a: 1, b: 10 },
  { a: 1, b: 20 },
  { a: 2, b: 30 },
  { a: 1, b: 40 },
  { a: 1, b: 50 },
  { a: 2, b: 60 },
  { a: 3, b: 70 }
])
  .groupBy(x => x.a)
  .mergeMap( /* call foo once per group with group.key */
             group => {
               return foo(group.key)
                 .mergeMap(
                   fx => group,
                   (fx, obj) => ({ fx, obj })
                 );
             },
             CONCURRENCY
  )
  .subscribe(x => console.log(x));

function foo(a) {
  if (a === 1) return Rx.Observable.of('ONE');
  return Rx.Observable.of(a * 1000);
}

// Also can be duplicated in this more simple example below
console.log('single mergeMap');

Rx.Observable.from([
  { a: 1, b: 10 },
  { a: 1, b: 20 },
  { a: 2, b: 30 },
  { a: 1, b: 40 },
  { a: 1, b: 50 },
  { a: 2, b: 60 },
  { a: 3, b: 70 }
])
  .groupBy(x => x.a)
  .mergeMap(group => group, CONCURRENCY)
  .subscribe(x => console.log(x));

Expected behavior:

{ fx: 'ONE', obj: { a: 1, b: 10 } }
{ fx: 'ONE', obj: { a: 1, b: 20 } }
{ fx: 2000, obj: { a: 2, b: 30 } }
{ fx: 'ONE', obj: { a: 1, b: 40 } }
{ fx: 'ONE', obj: { a: 1, b: 50 } }
{ fx: 2000, obj: { a: 2, b: 60 } }
{ fx: 3000, obj: { a: 3, b: 70 } }
single mergeMap
{ a: 1, b: 10 }
{ a: 1, b: 20 }
{ a: 2, b: 30 }
{ a: 1, b: 40 }
{ a: 1, b: 50 }
{ a: 2, b: 60 }
{ a: 3, b: 70 }

Actual behavior:

{ fx: 'ONE', obj: { a: 1, b: 10 } }
{ fx: 'ONE', obj: { a: 1, b: 20 } }
{ fx: 'ONE', obj: { a: 1, b: 40 } }
{ fx: 'ONE', obj: { a: 1, b: 50 } }
single mergeMap
{ a: 1, b: 10 }
{ a: 1, b: 20 }
{ a: 1, b: 40 }
{ a: 1, b: 50 }

Additional information:
This appears to be a bug, since even though we are restricting the concurrency of mergeMap, I would have expected all of the groups to eventually be processed but the current code seems to drop the extra groups.

@trxcllnt
Copy link
Member

trxcllnt commented Oct 12, 2016

@jeffbski mergeMap is working as expected. The groups that come in after mergeMap has reached the concurrency limit aren't subscribed to immediately. #2023 would solve your problem by allowing you to replay events from the GroupedObservables if they aren't immediately subscribed to.

Or you can immediately publish and connect the groups as they're created, so they store events and replay them to late subscribers:

Rx.Observable.from([
  { a: 1, b: 10 },
  { a: 1, b: 20 },
  { a: 2, b: 30 },
  { a: 1, b: 40 },
  { a: 1, b: 50 },
  { a: 2, b: 60 },
  { a: 3, b: 70 }
])
  .groupBy(x => x.a)
  .map((group) => {
    const historic = group.publishReplay();
    // connect immediately so we don't miss any events
    historic.connect();
    // refCount here so the group connection will be
    // unsubscribed when the subscriber count goes to 0
    const refCounted = historic.refCount();
    // make the refCounted observable look like a GroupedObservable
    refCounted.key = group.key; 
    return refCounted;
  })
  .mergeMap(group => group, CONCURRENCY)
  .subscribe(x => console.log(x));

@jeffbski
Copy link
Contributor Author

Thanks @trxcllnt, I didn't realize that the GroupedObservables were hot, but this makes sense. #2023 would be a great addition that would solve my problem, so I hope that they accept your PR. I appreciate the explanation and work around code. I'll close this issue since your PR would be the best way forward.

@lock
Copy link

lock bot commented Jun 6, 2018

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a new issue for related bugs.

@lock lock bot locked as resolved and limited conversation to collaborators Jun 6, 2018
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants