IteratorObservable never terminates when within Observable.merge #2476
Comments
There seems to be a problem with the order in which Note how it subscribes first and then adds the returned subscription to the root subscription. Because the iterator starts emitting synchronously when
Yeah, that's the fix. We need to create a wrapper subscription first, add it to the composite subscription, and only then subscribe to the source Observable through that wrapper.
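The ordering problem described above can be sketched without RxJS at all. The snippet below is only an illustration under stated assumptions: `SimpleSubscription`, `emitSynchronously`, `runSubscribeThenAdd` and `runAddThenSubscribe` are hypothetical names, not RxJS APIs, and the "infinite" source is capped by a safety limit so that the broken ordering still returns.

```typescript
// Hypothetical stand-in for a composite subscription (not the RxJS class).
class SimpleSubscription {
  closed = false;
  private children: SimpleSubscription[] = [];

  add(child: SimpleSubscription): void {
    // Adding to an already closed parent tears the child down immediately.
    if (this.closed) { child.unsubscribe(); return; }
    this.children.push(child);
  }

  unsubscribe(): void {
    if (this.closed) return;
    this.closed = true;
    for (const child of this.children) child.unsubscribe();
  }
}

// A synchronous source: it keeps emitting until its own subscription is
// closed, or until the safety limit is hit. Returns how many values it emitted.
function emitSynchronously(
  inner: SimpleSubscription,
  next: (value: number) => void,
  safetyLimit: number
): number {
  let emitted = 0;
  while (!inner.closed && emitted < safetyLimit) {
    next(emitted);
    emitted++;
  }
  return emitted;
}

// Problematic order: subscribe first, add the returned subscription afterwards.
function runSubscribeThenAdd(takeCount: number, safetyLimit: number): number {
  const root = new SimpleSubscription();
  const inner = new SimpleSubscription();
  let seen = 0;
  const emitted = emitSynchronously(inner, () => {
    if (root.closed) return;                     // consumer already finished
    if (++seen >= takeCount) root.unsubscribe(); // behaves like take(n)
  }, safetyLimit);
  root.add(inner); // too late: the source has already run to the limit
  return emitted;
}

// Fixed order: register the wrapper subscription first, then subscribe.
function runAddThenSubscribe(takeCount: number, safetyLimit: number): number {
  const root = new SimpleSubscription();
  const inner = new SimpleSubscription();
  root.add(inner); // unsubscribing root now closes inner as well
  let seen = 0;
  return emitSynchronously(inner, () => {
    if (root.closed) return;
    if (++seen >= takeCount) root.unsubscribe();
  }, safetyLimit);
}
```

With a limit of 1000 and a take count of 2, `runSubscribeThenAdd(2, 1000)` emits all 1000 values, while `runAddThenSubscribe(2, 1000)` stops after 2. Without the safety limit, the first variant would loop forever, which matches the behavior reported in this issue.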
This problem is caused by As @mpodlasin pointed out, for synchronous observables (e.g. Array, Iterable, Range...) which are subscribed by A quick grep reveals there are more than 20 where
const notifier = Observable.range(0, Infinity);
Observable.range(0, 3).buffer(notifier).take(2).subscribe();
// should be : [[], []]
Observable.using(..., (resource) => Observable.range(0, Infinity)).take(2).subscribe();
Observable.combineLatest(Observable.of('a'), Observable.range(0, Infinity)).take(1).subscribe();
Observable.if(() => true, Observable.range(0, Infinity), ..).take(1).subscribe();
Observable.defer(() => Observable.range(0, Infinity)).take(1).subscribe();
I was trying to look over all occurrences, but then I found @mpodlasin's PR #2479 and changed my mind: I would rather help him work on his PR. The patch I made was:
export function subscribeToResult<T>(outerSubscriber: OuterSubscriber<any, any>,
                                     result: ObservableInput<T>,
                                     outerValue?: T,
                                     outerIndex?: number,
                                     handleInnerSubscriber?: (s: Subscriber<any>) => void): Subscription {
  ...
  if (handleInnerSubscriber) handleInnerSubscriber(destination);
  return result.subscribe(destination);
With this,
subscribeToResult<Observable<T>, T>(this, observable, undefined, undefined, (s) => this.add(s));
The reason why I chose a callback is that there are too many places In #2560 I commented my suggestion on "creating custom operators". The problem I tried to avoid there was also a synchronous observable with early unsubscribe.
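As a rough illustration of the callback idea in the patch above, the sketch below passes the inner subscription to a hook before the source is subscribed. It is self-contained and does not use RxJS: `HookSubscription`, `cappedRange`, `subscribeWithHook` and `takeOne` are hypothetical names, and the range is capped so the variant without the hook still returns.

```typescript
// Hypothetical minimal subscription with child teardown (not the RxJS class).
class HookSubscription {
  closed = false;
  private children: HookSubscription[] = [];

  add(child: HookSubscription): void {
    if (this.closed) { child.unsubscribe(); return; }
    this.children.push(child);
  }

  unsubscribe(): void {
    if (this.closed) return;
    this.closed = true;
    for (const child of this.children) child.unsubscribe();
  }
}

type SyncSource = (inner: HookSubscription, next: (value: number) => void) => void;

// Stand-in for range(0, Infinity): emits synchronously until `inner` is closed
// or the cap is reached, counting every emission in `counter`.
function cappedRange(cap: number, counter: { emitted: number }): SyncSource {
  return (inner, next) => {
    for (let i = 0; i < cap && !inner.closed; i++) {
      counter.emitted++;
      next(i);
    }
  };
}

// Mirrors the shape of the patch: the optional hook sees the inner
// subscription BEFORE the source starts emitting.
function subscribeWithHook(
  source: SyncSource,
  next: (value: number) => void,
  handleInner?: (inner: HookSubscription) => void
): HookSubscription {
  const inner = new HookSubscription();
  if (handleInner) handleInner(inner);
  source(inner, next);
  return inner;
}

// Simulates `.take(1)` on top of the source; returns how many values were emitted.
function takeOne(useHook: boolean, cap: number): number {
  const outer = new HookSubscription();
  const counter = { emitted: 0 };
  const next = () => outer.unsubscribe(); // the first value ends the consumer
  if (useHook) {
    subscribeWithHook(cappedRange(cap, counter), next, (s) => outer.add(s));
  } else {
    // Old ordering: the subscription is only added after the source returns.
    outer.add(subscribeWithHook(cappedRange(cap, counter), next));
  }
  return counter.emitted;
}
```

Here `takeOne(true, 500)` emits a single value, whereas `takeOne(false, 500)` runs through all 500. The hook keeps the call site in control of where the inner subscription is registered, which seems to be the motivation for choosing a callback over changing every caller.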
…ubscribing (#2479) Add subscriptions for source Observables to mergeAll composite subscription before actually subscribing to any of these Observables, so that if source Observable emits synchronously and consumer of mergeAll unsubscribes at that moment (for example `take` operator), subscription to source is unsubscribed as well and Observable stops emitting. Closes #2476
RxJS version:
5.2.0
Code to reproduce:
unit tests added to IteratorObservable-spec.js
Expected behavior:
See test
Actual behavior:
The test never terminates (until V8 runs out of memory that is).
A simpler example
Given:
This terminates:
But this never terminates