Skip to content

Commit

Permalink
fix(switchAll/switchLatest): inner subscriptions should now properly …
Browse files Browse the repository at this point in the history
…unsub

- ensures that inner subscriptions are NOT provided a destination, which changes the executiong path of unsubscribe()

fixes #302
  • Loading branch information
benlesh committed Sep 17, 2015
1 parent c78f940 commit 38a45f8
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 12 deletions.
17 changes: 17 additions & 0 deletions spec/operators/switchAll-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,23 @@ describe('Observable.prototype.switchAll()', function(){
}, null, done);
});

it('should unsub inner observables', function() {
var unsubbed = [];

Observable.of('a', 'b').map(function(x) {
return Observable.create(function(subscriber) {
subscriber.complete();
return function() {
unsubbed.push(x);
};
});
})
.mergeAll()
.subscribe();

expect(unsubbed).toEqual(['a', 'b']);
});

it("should switch to each inner Observable", function (done) {
var a = Observable.of(1, 2, 3);
var b = Observable.of(4, 5, 6);
Expand Down
17 changes: 16 additions & 1 deletion spec/operators/switchLatest-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;
var immediateScheduler = Rx.Scheduler.immediate;

fdescribe('Observable.prototype.switchLatest()', function () {
describe('Observable.prototype.switchLatest()', function () {
it("should switch with a selector function", function (done) {
var a = Observable.of(1, 2, 3);
var expected = ['a1', 'b1', 'c1', 'a2', 'b2', 'c2', 'a3', 'b3', 'c3'];
Expand All @@ -13,6 +13,21 @@ fdescribe('Observable.prototype.switchLatest()', function () {
expect(x).toBe(expected.shift());
}, null, done);
});

it('should unsub inner observables', function(){
var unsubbed = [];

Observable.of('a', 'b').switchLatest(function(x) {
return Observable.create(function(subscriber) {
subscriber.complete();
return function() {
unsubbed.push(x);
};
});
}).subscribe();

expect(unsubbed).toEqual(['a', 'b']);
});

it('should switch inner cold observables', function (){
var x = cold( '--a--b--c--d--e--|')
Expand Down
21 changes: 17 additions & 4 deletions src/operators/switchAll.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class SwitchSubscriber<T> extends Subscriber<T> {
_next(value: any) {
this.active++;
this.unsubscribeInner();
this.add(this.innerSubscription = value.subscribe(new InnerSwitchSubscriber(this.destination, this)));
this.add(this.innerSubscription = value.subscribe(new InnerSwitchSubscriber(this)));
}

_complete() {
Expand All @@ -49,6 +49,14 @@ class SwitchSubscriber<T> extends Subscriber<T> {
}
}

notifyNext(value: T) {
this.destination.next(value);
}

notifyError(err: any) {
this.destination.error(err);
}

notifyComplete() {
this.unsubscribeInner();
if(this.hasCompleted && this.active === 0) {
Expand All @@ -58,13 +66,18 @@ class SwitchSubscriber<T> extends Subscriber<T> {
}

class InnerSwitchSubscriber<T> extends Subscriber<T> {
constructor(destination: Observer<any>, private parent: SwitchSubscriber<T>) {
super(destination);
constructor(private parent: SwitchSubscriber<T>) {
super();
}

_next(value: T) {
super._next(value);
this.parent.notifyNext(value);
}

_error(err: any) {
this.parent.notifyError(err);
}

_complete() {
this.parent.notifyComplete();
}
Expand Down
26 changes: 19 additions & 7 deletions src/operators/switchLatest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class SwitchLatestSubscriber<T, R, R2> extends Subscriber<T> {
if(innerSubscription) {
innerSubscription.unsubscribe();
}
this.add(this.innerSubscription = result.subscribe(new InnerSwitchLatestSubscriber(destination, this, this.resultSelector, index, value)))
this.add(this.innerSubscription = result.subscribe(new InnerSwitchLatestSubscriber(this, this.resultSelector, index, value)))
}
}

Expand All @@ -64,34 +64,46 @@ class SwitchLatestSubscriber<T, R, R2> extends Subscriber<T> {
this.destination.complete();
}
}

notifyError(err: any) {
this.destination.error(err);
}

notifyNext(value: T) {
this.destination.next(value);
}
}

class InnerSwitchLatestSubscriber<T, R, R2> extends Subscriber<T> {
private index: number = 0;

constructor(destination: Observer<T>, private parent: SwitchLatestSubscriber<T, R, R2>,
constructor(private parent: SwitchLatestSubscriber<T, R, R2>,
private resultSelector: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2,
private outerIndex: number,
private outerValue: any) {
super(destination);
super();
}

_next(value: T) {
const destination = this.destination;
const parent = this.parent;
const index = this.index++;
const resultSelector = this.resultSelector;
if(resultSelector) {
let result = tryCatch(resultSelector)(value, this.outerValue, index, this.outerIndex);
if(result === errorObject) {
destination.error(result.e);
parent.notifyError(result.e);
} else {
destination.next(result);
parent.notifyNext(result);
}
} else {
destination.next(value);
parent.notifyNext(value);
}
}

_error(err: T) {
this.parent.notifyError(err);
}

_complete() {
this.parent.notifyComplete(this);
}
Expand Down

0 comments on commit 38a45f8

Please sign in to comment.