Skip to content

Commit

Permalink
Merge pull request #1708 from trxcllnt/fix-window-subject-subscriptions
Browse files Browse the repository at this point in the history
fix(window): don't track internal window subjects as subscriptions.
  • Loading branch information
benlesh committed May 24, 2016
2 parents 0b36b96 + f3357b9 commit 2a7b001
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 116 deletions.
39 changes: 15 additions & 24 deletions spec/operators/window-spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';
declare const {hot, cold, asDiagram, time, expectObservable, expectSubscriptions};

Expand Down Expand Up @@ -57,6 +56,20 @@ describe('Observable.prototype.window', () => {
expectSubscriptions(closings.subscriptions).toBe(closingSubs);
});

it('should return a single empty window if source is sync empty and closing is sync empty', () => {
const source = cold('(|)');
const sourceSubs = '(^!)';
const expected = '(w|)';
const w = cold('|');
const expectedValues = { w: w };

const result = source.window(Observable.empty());

expectObservable(result).toBe(expected, expectedValues);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
// expectSubscriptions(closings.subscriptions).toBe(closingSubs);
});

it('should split a Just source into a single window identical to source, using a Never closing',
() => {
const source = cold('(a|)');
Expand Down Expand Up @@ -201,28 +214,6 @@ describe('Observable.prototype.window', () => {
expectSubscriptions(closings.subscriptions).toBe(closingSubs);
});

it('should dispose window Subjects if the outer is unsubscribed early', () => {
const source = hot('--a--b--c--d--e--f--g--h--|');
const sourceSubs = '^ ! ';
const expected = 'x--------- ';
const x = cold( '--a--b--c- ');
const unsub = ' ! ';
const late = time('---------------| ');
const values = { x: x };

let window;
const result = source.window(Observable.never())
.do((w: any) => { window = w; });

expectObservable(result, unsub).toBe(expected, values);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
rxTestScheduler.schedule(() => {
expect(() => {
window.subscribe();
}).to.throw(Rx.ObjectUnsubscribedError);
}, late);
});

it('should make outer emit error when closing throws', () => {
const source = hot('-1-2-^3-4-5-6-7-8-9-#');
const subs = '^ ! ';
Expand Down Expand Up @@ -256,4 +247,4 @@ describe('Observable.prototype.window', () => {
expectSubscriptions(source.subscriptions).toBe(subs);
expectSubscriptions(closings.subscriptions).toBe(closingSubs);
});
});
});
25 changes: 1 addition & 24 deletions spec/operators/windowCount-spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';
declare const {hot, cold, asDiagram, time, expectObservable, expectSubscriptions};

Expand Down Expand Up @@ -131,28 +130,6 @@ describe('Observable.prototype.windowCount', () => {
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should dispose window Subjects if the outer is unsubscribed early', () => {
const source = hot('--a--b--c--d--e--f--g--h--|');
const sourceSubs = '^ ! ';
const expected = 'x--------- ';
const x = cold( '--a--b--c- ');
const unsub = ' ! ';
const late = time('---------------| ');
const values = { x: x };

let window;
const result = source.windowCount(10, 10)
.do((w: any) => { window = w; });

expectObservable(result, unsub).toBe(expected, values);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
rxTestScheduler.schedule(() => {
expect(() => {
window.subscribe();
}).to.throw(Rx.ObjectUnsubscribedError);
}, late);
});

it('should not break unsubscription chains when result is unsubscribed explicitly', () => {
const source = hot('^-a--b--c--d--|');
const subs = '^ ! ';
Expand All @@ -172,4 +149,4 @@ describe('Observable.prototype.windowCount', () => {
expectObservable(result, unsub).toBe(expected, values);
expectSubscriptions(source.subscriptions).toBe(subs);
});
});
});
24 changes: 1 addition & 23 deletions spec/operators/windowTime-spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';
declare const {hot, cold, asDiagram, time, expectObservable, expectSubscriptions};

Expand Down Expand Up @@ -172,27 +171,6 @@ describe('Observable.prototype.windowTime', () => {
expectSubscriptions(source.subscriptions).toBe(subs);
});

it('should dispose window Subjects if the outer is unsubscribed early', () => {
const source = hot('--a--b--c--d--e--f--g--h--|');
const sourceSubs = '^ ! ';
const expected = 'x--------- ';
const x = cold( '--a--b--c- ');
const unsub = ' ! ';
const values = { x: x };

let window;
const result = source.windowTime(1000, 1000, rxTestScheduler)
.do((w: any) => { window = w; });

expectObservable(result, unsub).toBe(expected, values);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
rxTestScheduler.schedule(() => {
expect(() => {
window.subscribe();
}).to.throw(Rx.ObjectUnsubscribedError);
}, 150);
});

it('should not break unsubscription chains when result is unsubscribed explicitly', () => {
const source = hot('--1--2--^--a--b--c--d--e--f--g--h--|');
const sourcesubs = '^ ! ';
Expand All @@ -216,4 +194,4 @@ describe('Observable.prototype.windowTime', () => {
expectObservable(result, unsub).toBe(expected, values);
expectSubscriptions(source.subscriptions).toBe(sourcesubs);
});
});
});
26 changes: 1 addition & 25 deletions spec/operators/windowWhen-spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import {expect} from 'chai';
import * as Rx from '../../dist/cjs/Rx';
declare const {hot, cold, asDiagram, time, expectObservable, expectSubscriptions};

Expand Down Expand Up @@ -154,29 +153,6 @@ describe('Observable.prototype.windowWhen', () => {
expectSubscriptions(closings[1].subscriptions).toBe(closeSubs[1]);
});

it('should dispose window Subjects if the outer is unsubscribed early', () => {
const source = hot('--a--b--c--d--e--f--g--h--|');
const sourceSubs = '^ ! ';
const expected = 'x--------- ';
const x = cold( '--a--b--c- ');
const unsub = ' ! ';
const late = time('---------------| ');
const values = { x: x };

let window;
const result = source
.windowWhen(() => Observable.never())
.do((w: any) => { window = w; });

expectObservable(result, unsub).toBe(expected, values);
expectSubscriptions(source.subscriptions).toBe(sourceSubs);
rxTestScheduler.schedule(() => {
expect(() => {
window.subscribe();
}).to.throw(Rx.ObjectUnsubscribedError);
}, late);
});

it('should propagate error thrown from closingSelector', () => {
const e1 = hot('--a--^---b---c---d---e---f---g---h------| ');
const e1subs = '^ ! ';
Expand Down Expand Up @@ -355,4 +331,4 @@ describe('Observable.prototype.windowWhen', () => {
expectSubscriptions(e1.subscriptions).toBe(e1subs);
expectSubscriptions(e2.subscriptions).toBe(e2subs);
});
});
});
21 changes: 14 additions & 7 deletions src/operator/window.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,12 @@ class WindowOperator<T> implements Operator<T, Observable<T>> {
}

call(subscriber: Subscriber<Observable<T>>, source: any): any {
return source._subscribe(new WindowSubscriber(subscriber, this.windowBoundaries));
const windowSubscriber = new WindowSubscriber(subscriber);
const sourceSubscription = source._subscribe(windowSubscriber);
if (!sourceSubscription.isUnsubscribed) {
windowSubscriber.add(subscribeToResult(windowSubscriber, this.windowBoundaries));
}
return sourceSubscription;
}
}

Expand All @@ -67,13 +72,12 @@ class WindowOperator<T> implements Operator<T, Observable<T>> {
* @extends {Ignored}
*/
class WindowSubscriber<T> extends OuterSubscriber<T, any> {
private window: Subject<T>;

constructor(protected destination: Subscriber<Observable<T>>,
private windowBoundaries: Observable<any>) {
private window: Subject<T> = new Subject<T>();

constructor(destination: Subscriber<Observable<T>>) {
super(destination);
this.add(subscribeToResult(this, windowBoundaries));
this.openWindow();
destination.next(this.window);
}

notifyNext(outerValue: T, innerValue: any,
Expand Down Expand Up @@ -104,14 +108,17 @@ class WindowSubscriber<T> extends OuterSubscriber<T, any> {
this.destination.complete();
}

protected _unsubscribe() {
this.window = null;
}

private openWindow(): void {
const prevWindow = this.window;
if (prevWindow) {
prevWindow.complete();
}
const destination = this.destination;
const newWindow = this.window = new Subject<T>();
destination.add(newWindow);
destination.next(newWindow);
}
}
28 changes: 17 additions & 11 deletions src/operator/windowCount.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,7 @@ class WindowCountSubscriber<T> extends Subscriber<T> {
private windowSize: number,
private startWindowEvery: number) {
super(destination);
const firstWindow = this.windows[0];
destination.add(firstWindow);
destination.next(firstWindow);
destination.next(this.windows[0]);
}

protected _next(value: T) {
Expand All @@ -96,34 +94,42 @@ class WindowCountSubscriber<T> extends Subscriber<T> {
const windows = this.windows;
const len = windows.length;

for (let i = 0; i < len; i++) {
for (let i = 0; i < len && !this.isUnsubscribed; i++) {
windows[i].next(value);
}
const c = this.count - windowSize + 1;
if (c >= 0 && c % startWindowEvery === 0) {
if (c >= 0 && c % startWindowEvery === 0 && !this.isUnsubscribed) {
windows.shift().complete();
}
if (++this.count % startWindowEvery === 0) {
if (++this.count % startWindowEvery === 0 && !this.isUnsubscribed) {
const window = new Subject<T>();
windows.push(window);
destination.add(window);
destination.next(window);
}
}

protected _error(err: any) {
const windows = this.windows;
while (windows.length > 0) {
windows.shift().error(err);
if (windows) {
while (windows.length > 0 && !this.isUnsubscribed) {
windows.shift().error(err);
}
}
this.destination.error(err);
}

protected _complete() {
const windows = this.windows;
while (windows.length > 0) {
windows.shift().complete();
if (windows) {
while (windows.length > 0 && !this.isUnsubscribed) {
windows.shift().complete();
}
}
this.destination.complete();
}

protected _unsubscribe() {
this.count = 0;
this.windows = null;
}
}
1 change: 0 additions & 1 deletion src/operator/windowTime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ class WindowTimeSubscriber<T> extends Subscriber<T> {
const window = new Subject<T>();
this.windows.push(window);
const destination = this.destination;
destination.add(window);
destination.next(window);
return window;
}
Expand Down
1 change: 0 additions & 1 deletion src/operator/windowWhen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ class WindowSubscriber<T> extends OuterSubscriber<T, any> {
this.window.error(err);
} else {
this.add(this.closingNotification = subscribeToResult(this, closingNotifier));
this.add(window);
}
}
}

0 comments on commit 2a7b001

Please sign in to comment.