diff --git a/spec/operators/switchAll-spec.js b/spec/operators/switchAll-spec.js index 9731cc6c7c..555749c287 100644 --- a/spec/operators/switchAll-spec.js +++ b/spec/operators/switchAll-spec.js @@ -17,6 +17,23 @@ describe('Observable.prototype.switchAll()', function(){ }, null, done); }); + it('should unsub inner observables', function() { + var unsubbed = []; + + Observable.of('a', 'b').map(function(x) { + return Observable.create(function(subscriber) { + subscriber.complete(); + return function() { + unsubbed.push(x); + }; + }); + }) + .mergeAll() + .subscribe(); + + expect(unsubbed).toEqual(['a', 'b']); + }); + it("should switch to each inner Observable", function (done) { var a = Observable.of(1, 2, 3); var b = Observable.of(4, 5, 6); diff --git a/spec/operators/switchLatest-spec.js b/spec/operators/switchLatest-spec.js index 680161a22c..0502a71684 100644 --- a/spec/operators/switchLatest-spec.js +++ b/spec/operators/switchLatest-spec.js @@ -6,12 +6,178 @@ var immediateScheduler = Rx.Scheduler.immediate; describe('Observable.prototype.switchLatest()', function () { it("should switch with a selector function", function (done) { var a = Observable.of(1, 2, 3); - var r = [11, 12, 13, 12, 13, 14, 13, 14, 15]; - var i = 0; + var expected = ['a1', 'b1', 'c1', 'a2', 'b2', 'c2', 'a3', 'b3', 'c3']; a.switchLatest(function(x) { - return Observable.range(x + 10, 3); + return Observable.of('a' + x, 'b' + x, 'c' + x); }).subscribe(function (x) { - expect(x).toBe(r[i++]); + expect(x).toBe(expected.shift()); }, null, done); }); + + it('should unsub inner observables', function(){ + var unsubbed = []; + + Observable.of('a', 'b').switchLatest(function(x) { + return Observable.create(function(subscriber) { + subscriber.complete(); + return function() { + unsubbed.push(x); + }; + }); + }).subscribe(); + + expect(unsubbed).toEqual(['a', 'b']); + }); + + it('should switch inner cold observables', function (){ + var x = cold( '--a--b--c--d--e--|') + var y = cold( '---f---g---h---i--|'); + var e1 = hot('---------x---------y---------|'); + var expected = '-----------a--b--c----f---g---h---i--|'; + + var observableLookup = { x: x, y: y }; + + expectObservable(e1.switchLatest(function(value) { + return observableLookup[value]; + })).toBe(expected); + }); + + it('should switch inner hot observables', function (){ + var x = hot('-----a--b--c--d--e--|') + var y = hot('--p-o-o-p-------------f---g---h---i--|'); + var e1 = hot('---------x---------y---------|'); + var expected = '-----------c--d--e----f---g---h---i--|'; + + var observableLookup = { x: x, y: y }; + + expectObservable(e1.switchLatest(function(value) { + return observableLookup[value]; + })).toBe(expected); + }); + + it('should switch inner empty and empty', function () { + var x = Observable.empty(); + var y = Observable.empty(); + var e1 = hot('---------x---------y---------|'); + var expected = '-----------------------------|'; + + var observableLookup = { x: x, y: y }; + + expectObservable(e1.switchLatest(function(value) { + return observableLookup[value]; + })).toBe(expected); + }); + + it('should switch inner empty and never', function() { + var x = Observable.empty() + var y = Observable.never(); + var e1 = hot('---------x---------y---------|'); + var expected = '----------------------------------'; + + var observableLookup = { x: x, y: y }; + + expectObservable(e1.switchLatest(function(value) { + return observableLookup[value]; + })).toBe(expected); + }); + + it('should switch inner never and empty', function (){ + var x = Observable.never(); + var y = Observable.empty(); + var e1 = hot('---------x---------y---------|'); + var expected = '-----------------------------|'; + + var observableLookup = { x: x, y: y }; + + expectObservable(e1.switchLatest(function(value) { + return observableLookup[value]; + })).toBe(expected); + }); + + it('should switch inner never and throw', function (){ + var x = Observable.never(); + var y = Observable.throw(new Error('sad')); + var e1 = hot('---------x---------y---------|'); + var expected = '-------------------#'; + + var observableLookup = { x: x, y: y }; + + expectObservable(e1.switchLatest(function(value) { + return observableLookup[value]; + })).toBe(expected, undefined, new Error('sad')); + }); + + it('should switch inner empty and throw', function (){ + var x = Observable.empty(); + var y = Observable.throw(new Error('sad')); + var e1 = hot('---------x---------y---------|'); + var expected = '-------------------#'; + + var observableLookup = { x: x, y: y }; + + expectObservable(e1.switchLatest(function(value) { + return observableLookup[value]; + })).toBe(expected, undefined, new Error('sad')); + }); + + it('should handle outer empty', function (){ + var e1 = Observable.empty(); + var expected = '|'; + expectObservable(e1.switchLatest(function(value) { + return Observable.of(value); + })).toBe(expected); + }); + + it('should handle outer never', function (){ + var e1 = Observable.never(); + var expected = '----'; + expectObservable(e1.switchLatest(function(value) { + return Observable.of(value); + })).toBe(expected); + }); + + it('should handle outer throw', function (){ + var e1 = Observable.throw(new Error('wah')); + var expected = '#'; + expectObservable(e1.switchLatest(function(value) { + return Observable.of(value); + })).toBe(expected, undefined, new Error('wah')); + }); + + it('should handle outer error', function (){ + var x = cold( '--a--b--c--d--e--|') + var e1 = hot('---------x---------#', undefined, new Error('boo-hoo')); + var expected = '-----------a--b--c-#'; + + var observableLookup = { x: x }; + + expectObservable(e1.switchLatest(function(value) { + return observableLookup[value]; + })).toBe(expected, undefined, new Error('boo-hoo')); + }); + + it('should switch with resultSelector goodness', function (){ + var x = cold( '--a--b--c--d--e--|') + var y = cold( '---f---g---h---i--|'); + var e1 = hot('---------x---------y---------|'); + var expected = '-----------a--b--c----f---g---h---i--|'; + + var observableLookup = { x: x, y: y }; + + var expectedValues = { + a: ['a', 'x', 0, 0], + b: ['b', 'x', 1, 0], + c: ['c', 'x', 2, 0], + f: ['f', 'y', 0, 1], + g: ['g', 'y', 1, 1], + h: ['h', 'y', 2, 1], + i: ['i', 'y', 3, 1] + }; + + expectObservable(e1.switchLatest(function(value) { + return observableLookup[value]; + }, function(innerValue, outerValue, innerIndex, outerIndex) { + return [innerValue, outerValue, innerIndex, outerIndex]; + })).toBe(expected, expectedValues); + }) }); \ No newline at end of file diff --git a/src/operators/switchAll.ts b/src/operators/switchAll.ts index f60a965c25..9b84ad6598 100644 --- a/src/operators/switchAll.ts +++ b/src/operators/switchAll.ts @@ -30,7 +30,7 @@ class SwitchSubscriber extends Subscriber { _next(value: any) { this.active++; this.unsubscribeInner(); - this.add(this.innerSubscription = value.subscribe(new InnerSwitchSubscriber(this.destination, this))); + this.add(this.innerSubscription = value.subscribe(new InnerSwitchSubscriber(this))); } _complete() { @@ -49,6 +49,14 @@ class SwitchSubscriber extends Subscriber { } } + notifyNext(value: T) { + this.destination.next(value); + } + + notifyError(err: any) { + this.destination.error(err); + } + notifyComplete() { this.unsubscribeInner(); if(this.hasCompleted && this.active === 0) { @@ -58,13 +66,18 @@ class SwitchSubscriber extends Subscriber { } class InnerSwitchSubscriber extends Subscriber { - constructor(destination: Observer, private parent: SwitchSubscriber) { - super(destination); + constructor(private parent: SwitchSubscriber) { + super(); } _next(value: T) { - super._next(value); + this.parent.notifyNext(value); + } + + _error(err: any) { + this.parent.notifyError(err); } + _complete() { this.parent.notifyComplete(); } diff --git a/src/operators/switchLatest.ts b/src/operators/switchLatest.ts index 4e2b2d3378..7a984eb97c 100644 --- a/src/operators/switchLatest.ts +++ b/src/operators/switchLatest.ts @@ -3,7 +3,8 @@ import Observer from '../Observer'; import Observable from '../Observable'; import Subscriber from '../Subscriber'; import Subscription from '../Subscription'; - +import tryCatch from '../util/tryCatch'; +import { errorObject } from '../util/errorObject'; import { FlatMapSubscriber } from './flatMap-support'; export default function switchLatest(project: (value: T, index: number) => Observable, @@ -21,13 +22,89 @@ class SwitchLatestOperator implements Operator { } } -class SwitchLatestSubscriber extends FlatMapSubscriber { - - innerSubscription: Subscription; +class SwitchLatestSubscriber extends Subscriber { + private innerSubscription: Subscription; + private hasCompleted = false; + index: number = 0; + constructor(destination: Observer, - project: (value: T, index: number) => Observable, - resultSelector?: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2) { - super(destination, project, resultSelector, 1); + private project: (value: T, index: number) => Observable, + private resultSelector?: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2) { + super(destination); + } + + _next(value: any) { + const index = this.index++; + const destination = this.destination; + let result = tryCatch(this.project)(value, index); + if(result === errorObject) { + destination.error(result.e); + } else { + const innerSubscription = this.innerSubscription; + if(innerSubscription) { + innerSubscription.unsubscribe(); + } + this.add(this.innerSubscription = result.subscribe(new InnerSwitchLatestSubscriber(this, this.resultSelector, index, value))) + } + } + + _complete() { + const innerSubscription = this.innerSubscription; + this.hasCompleted = true; + if(!innerSubscription || innerSubscription.isUnsubscribed) { + this.destination.complete(); + } + } + + notifyComplete(innerSub: Subscription) { + this.remove(innerSub); + this.innerSubscription = null; + if(this.hasCompleted) { + this.destination.complete(); + } + } + + notifyError(err: any) { + this.destination.error(err); + } + + notifyNext(value: T) { + this.destination.next(value); + } +} + +class InnerSwitchLatestSubscriber extends Subscriber { + private index: number = 0; + + constructor(private parent: SwitchLatestSubscriber, + private resultSelector: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2, + private outerIndex: number, + private outerValue: any) { + super(); + } + + _next(value: T) { + const parent = this.parent; + const index = this.index++; + const resultSelector = this.resultSelector; + if(resultSelector) { + let result = tryCatch(resultSelector)(value, this.outerValue, index, this.outerIndex); + if(result === errorObject) { + parent.notifyError(result.e); + } else { + parent.notifyNext(result); + } + } else { + parent.notifyNext(value); + } + } + + _error(err: T) { + this.parent.notifyError(err); + } + + _complete() { + this.parent.notifyComplete(this); } }