Skip to content

Commit

Permalink
fix(window): final window stays open until source complete
Browse files Browse the repository at this point in the history
Resolves an issue where the windowBoundary complete would complete the resulting observable.

BREAKING CHANGE: The windowBoundaries observable no longer completes the result. It was only ever meant to notify of the window boundary. To get the same behavior as the old behavior, you would need to add an `endWith` and a `skipLast(1)` like so:  `source$.pipe(window(notifier$.pipe(endWith(true))), skipLast(1))`.
  • Loading branch information
benlesh committed Feb 24, 2021
1 parent 0c667d5 commit e8b05ef
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 37 deletions.
12 changes: 6 additions & 6 deletions spec/operators/window-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,13 @@ describe('window operator', () => {

it('should be able to split a never Observable into timely empty windows', () => {
const source = hot('^--------');
const sourceSubs = '^ !';
const sourceSubs = '^ ';
const closings = cold('--x--x--|');
const closingSubs = '^ !';
const expected = 'a-b--c--|';
const expected = 'a-b--c---';
const a = cold('--| ');
const b = cold( '---| ');
const c = cold( '---|');
const c = cold( '----');
const expectedValues = { a: a, b: b, c: c };

const result = source.pipe(window(closings));
Expand Down Expand Up @@ -234,13 +234,13 @@ describe('window operator', () => {

it('should complete the resulting Observable when window closings completes', () => {
const source = hot('-1-2-^3-4-5-6-7-8-9-|');
const subs = '^ ! ';
const subs = '^ !';
const closings = hot('---^---x---x---| ');
const closingSubs = '^ ! ';
const expected = 'a---b---c---| ';
const expected = 'a---b---c------|';
const a = cold( '-3-4| ');
const b = cold( '-5-6| ');
const c = cold( '-7-8| ');
const c = cold( '-7-8-9-|');
const expectedValues = { a: a, b: b, c: c };

const result = source.pipe(window(closings));
Expand Down
64 changes: 33 additions & 31 deletions src/internal/operators/window.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { OperatorFunction } from '../types';
import { Subject } from '../Subject';
import { operate } from '../util/lift';
import { OperatorSubscriber } from './OperatorSubscriber';
import { noop } from '../util/noop';

/**
* Branch out the source Observable values as a nested Observable whenever
Expand Down Expand Up @@ -47,45 +48,46 @@ import { OperatorSubscriber } from './OperatorSubscriber';
*/
export function window<T>(windowBoundaries: Observable<any>): OperatorFunction<T, Observable<T>> {
return operate((source, subscriber) => {
let windowSubject = new Subject<T>();
let windowSubject: Subject<T> = new Subject<T>();

subscriber.next(windowSubject.asObservable());

/**
* Subscribes to one of our two observables in this operator in the same way,
* only allowing for different behaviors with the next handler.
* @param sourceOrNotifier The observable to subscribe to.
* @param next The next handler to use with the subscription
*/
const windowSubscribe = (sourceOrNotifier: Observable<any>, next: (value: any) => void) =>
sourceOrNotifier.subscribe(
new OperatorSubscriber(
subscriber,
next,
(err: any) => {
windowSubject.error(err);
subscriber.error(err);
},
() => {
windowSubject.complete();
subscriber.complete();
}
)
);
const errorHandler = (err: any) => {
windowSubject.error(err);
subscriber.error(err);
};

// Subscribe to our source
windowSubscribe(source, (value) => windowSubject.next(value));
source.subscribe(
new OperatorSubscriber(
subscriber,
(value) => windowSubject?.next(value),
errorHandler,
() => {
windowSubject.complete();
subscriber.complete();
}
)
);

// Subscribe to the window boundaries.
windowSubscribe(windowBoundaries, () => {
windowSubject.complete();
subscriber.next((windowSubject = new Subject()));
});
windowBoundaries.subscribe(
new OperatorSubscriber(
subscriber,
() => {
windowSubject.complete();
subscriber.next((windowSubject = new Subject()));
},
errorHandler,
noop
)
);

// Additional teardown. Note that other teardown and post-subscription logic
// is encapsulated in the act of a Subscriber subscribing to the observable
// during the subscribe call. We can return additional teardown here.
return () => {
windowSubject.unsubscribe();
// Unsubscribing the subject ensures that anyone who has captured
// a reference to this window that tries to use it after it can
// no longer get values from the source will get an ObjectUnsubscribedError.
windowSubject?.unsubscribe();
windowSubject = null!;
};
});
Expand Down

0 comments on commit e8b05ef

Please sign in to comment.