diff --git a/spec/operators/window-spec.ts b/spec/operators/window-spec.ts index adeca0240b..923a8cb6aa 100644 --- a/spec/operators/window-spec.ts +++ b/spec/operators/window-spec.ts @@ -106,13 +106,13 @@ describe('window operator', () => { it('should be able to split a never Observable into timely empty windows', () => { const source = hot('^--------'); - const sourceSubs = '^ !'; + const sourceSubs = '^ '; const closings = cold('--x--x--|'); const closingSubs = '^ !'; - const expected = 'a-b--c--|'; + const expected = 'a-b--c---'; const a = cold('--| '); const b = cold( '---| '); - const c = cold( '---|'); + const c = cold( '----'); const expectedValues = { a: a, b: b, c: c }; const result = source.pipe(window(closings)); @@ -234,13 +234,13 @@ describe('window operator', () => { it('should complete the resulting Observable when window closings completes', () => { const source = hot('-1-2-^3-4-5-6-7-8-9-|'); - const subs = '^ ! '; + const subs = '^ !'; const closings = hot('---^---x---x---| '); const closingSubs = '^ ! '; - const expected = 'a---b---c---| '; + const expected = 'a---b---c------|'; const a = cold( '-3-4| '); const b = cold( '-5-6| '); - const c = cold( '-7-8| '); + const c = cold( '-7-8-9-|'); const expectedValues = { a: a, b: b, c: c }; const result = source.pipe(window(closings)); diff --git a/src/internal/operators/window.ts b/src/internal/operators/window.ts index da7d28b14d..8671a2a7cb 100644 --- a/src/internal/operators/window.ts +++ b/src/internal/operators/window.ts @@ -3,6 +3,7 @@ import { OperatorFunction } from '../types'; import { Subject } from '../Subject'; import { operate } from '../util/lift'; import { OperatorSubscriber } from './OperatorSubscriber'; +import { noop } from '../util/noop'; /** * Branch out the source Observable values as a nested Observable whenever @@ -47,45 +48,46 @@ import { OperatorSubscriber } from './OperatorSubscriber'; */ export function window(windowBoundaries: Observable): OperatorFunction> { return operate((source, subscriber) => { - let windowSubject = new Subject(); + let windowSubject: Subject = new Subject(); subscriber.next(windowSubject.asObservable()); - /** - * Subscribes to one of our two observables in this operator in the same way, - * only allowing for different behaviors with the next handler. - * @param sourceOrNotifier The observable to subscribe to. - * @param next The next handler to use with the subscription - */ - const windowSubscribe = (sourceOrNotifier: Observable, next: (value: any) => void) => - sourceOrNotifier.subscribe( - new OperatorSubscriber( - subscriber, - next, - (err: any) => { - windowSubject.error(err); - subscriber.error(err); - }, - () => { - windowSubject.complete(); - subscriber.complete(); - } - ) - ); + const errorHandler = (err: any) => { + windowSubject.error(err); + subscriber.error(err); + }; // Subscribe to our source - windowSubscribe(source, (value) => windowSubject.next(value)); + source.subscribe( + new OperatorSubscriber( + subscriber, + (value) => windowSubject?.next(value), + errorHandler, + () => { + windowSubject.complete(); + subscriber.complete(); + } + ) + ); + // Subscribe to the window boundaries. - windowSubscribe(windowBoundaries, () => { - windowSubject.complete(); - subscriber.next((windowSubject = new Subject())); - }); + windowBoundaries.subscribe( + new OperatorSubscriber( + subscriber, + () => { + windowSubject.complete(); + subscriber.next((windowSubject = new Subject())); + }, + errorHandler, + noop + ) + ); - // Additional teardown. Note that other teardown and post-subscription logic - // is encapsulated in the act of a Subscriber subscribing to the observable - // during the subscribe call. We can return additional teardown here. return () => { - windowSubject.unsubscribe(); + // Unsubscribing the subject ensures that anyone who has captured + // a reference to this window that tries to use it after it can + // no longer get values from the source will get an ObjectUnsubscribedError. + windowSubject?.unsubscribe(); windowSubject = null!; }; });