Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(switchLatest): make switchLatest a standalone impl, add virtual time tests #317

Merged
merged 2 commits into from
Sep 17, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions spec/operators/switchAll-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,23 @@ describe('Observable.prototype.switchAll()', function(){
}, null, done);
});

it('should unsub inner observables', function() {
var unsubbed = [];

Observable.of('a', 'b').map(function(x) {
return Observable.create(function(subscriber) {
subscriber.complete();
return function() {
unsubbed.push(x);
};
});
})
.mergeAll()
.subscribe();

expect(unsubbed).toEqual(['a', 'b']);
});

it("should switch to each inner Observable", function (done) {
var a = Observable.of(1, 2, 3);
var b = Observable.of(4, 5, 6);
Expand Down
174 changes: 170 additions & 4 deletions spec/operators/switchLatest-spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,178 @@ var immediateScheduler = Rx.Scheduler.immediate;
describe('Observable.prototype.switchLatest()', function () {
it("should switch with a selector function", function (done) {
var a = Observable.of(1, 2, 3);
var r = [11, 12, 13, 12, 13, 14, 13, 14, 15];
var i = 0;
var expected = ['a1', 'b1', 'c1', 'a2', 'b2', 'c2', 'a3', 'b3', 'c3'];
a.switchLatest(function(x) {
return Observable.range(x + 10, 3);
return Observable.of('a' + x, 'b' + x, 'c' + x);
}).subscribe(function (x) {
expect(x).toBe(r[i++]);
expect(x).toBe(expected.shift());
}, null, done);
});

it('should unsub inner observables', function(){
var unsubbed = [];

Observable.of('a', 'b').switchLatest(function(x) {
return Observable.create(function(subscriber) {
subscriber.complete();
return function() {
unsubbed.push(x);
};
});
}).subscribe();

expect(unsubbed).toEqual(['a', 'b']);
});

it('should switch inner cold observables', function (){
var x = cold( '--a--b--c--d--e--|')
var y = cold( '---f---g---h---i--|');
var e1 = hot('---------x---------y---------|');
var expected = '-----------a--b--c----f---g---h---i--|';

var observableLookup = { x: x, y: y };

expectObservable(e1.switchLatest(function(value) {
return observableLookup[value];
})).toBe(expected);
});

it('should switch inner hot observables', function (){
var x = hot('-----a--b--c--d--e--|')
var y = hot('--p-o-o-p-------------f---g---h---i--|');
var e1 = hot('---------x---------y---------|');
var expected = '-----------c--d--e----f---g---h---i--|';

var observableLookup = { x: x, y: y };

expectObservable(e1.switchLatest(function(value) {
return observableLookup[value];
})).toBe(expected);
});

it('should switch inner empty and empty', function () {
var x = Observable.empty();
var y = Observable.empty();
var e1 = hot('---------x---------y---------|');
var expected = '-----------------------------|';

var observableLookup = { x: x, y: y };

expectObservable(e1.switchLatest(function(value) {
return observableLookup[value];
})).toBe(expected);
});

it('should switch inner empty and never', function() {
var x = Observable.empty()
var y = Observable.never();
var e1 = hot('---------x---------y---------|');
var expected = '----------------------------------';

var observableLookup = { x: x, y: y };

expectObservable(e1.switchLatest(function(value) {
return observableLookup[value];
})).toBe(expected);
});

it('should switch inner never and empty', function (){
var x = Observable.never();
var y = Observable.empty();
var e1 = hot('---------x---------y---------|');
var expected = '-----------------------------|';

var observableLookup = { x: x, y: y };

expectObservable(e1.switchLatest(function(value) {
return observableLookup[value];
})).toBe(expected);
});

it('should switch inner never and throw', function (){
var x = Observable.never();
var y = Observable.throw(new Error('sad'));
var e1 = hot('---------x---------y---------|');
var expected = '-------------------#';

var observableLookup = { x: x, y: y };

expectObservable(e1.switchLatest(function(value) {
return observableLookup[value];
})).toBe(expected, undefined, new Error('sad'));
});

it('should switch inner empty and throw', function (){
var x = Observable.empty();
var y = Observable.throw(new Error('sad'));
var e1 = hot('---------x---------y---------|');
var expected = '-------------------#';

var observableLookup = { x: x, y: y };

expectObservable(e1.switchLatest(function(value) {
return observableLookup[value];
})).toBe(expected, undefined, new Error('sad'));
});

it('should handle outer empty', function (){
var e1 = Observable.empty();
var expected = '|';
expectObservable(e1.switchLatest(function(value) {
return Observable.of(value);
})).toBe(expected);
});

it('should handle outer never', function (){
var e1 = Observable.never();
var expected = '----';
expectObservable(e1.switchLatest(function(value) {
return Observable.of(value);
})).toBe(expected);
});

it('should handle outer throw', function (){
var e1 = Observable.throw(new Error('wah'));
var expected = '#';
expectObservable(e1.switchLatest(function(value) {
return Observable.of(value);
})).toBe(expected, undefined, new Error('wah'));
});

it('should handle outer error', function (){
var x = cold( '--a--b--c--d--e--|')
var e1 = hot('---------x---------#', undefined, new Error('boo-hoo'));
var expected = '-----------a--b--c-#';

var observableLookup = { x: x };

expectObservable(e1.switchLatest(function(value) {
return observableLookup[value];
})).toBe(expected, undefined, new Error('boo-hoo'));
});

it('should switch with resultSelector goodness', function (){
var x = cold( '--a--b--c--d--e--|')
var y = cold( '---f---g---h---i--|');
var e1 = hot('---------x---------y---------|');
var expected = '-----------a--b--c----f---g---h---i--|';

var observableLookup = { x: x, y: y };

var expectedValues = {
a: ['a', 'x', 0, 0],
b: ['b', 'x', 1, 0],
c: ['c', 'x', 2, 0],
f: ['f', 'y', 0, 1],
g: ['g', 'y', 1, 1],
h: ['h', 'y', 2, 1],
i: ['i', 'y', 3, 1]
};

expectObservable(e1.switchLatest(function(value) {
return observableLookup[value];
}, function(innerValue, outerValue, innerIndex, outerIndex) {
return [innerValue, outerValue, innerIndex, outerIndex];
})).toBe(expected, expectedValues);
})
});
21 changes: 17 additions & 4 deletions src/operators/switchAll.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class SwitchSubscriber<T> extends Subscriber<T> {
_next(value: any) {
this.active++;
this.unsubscribeInner();
this.add(this.innerSubscription = value.subscribe(new InnerSwitchSubscriber(this.destination, this)));
this.add(this.innerSubscription = value.subscribe(new InnerSwitchSubscriber(this)));
}

_complete() {
Expand All @@ -49,6 +49,14 @@ class SwitchSubscriber<T> extends Subscriber<T> {
}
}

notifyNext(value: T) {
this.destination.next(value);
}

notifyError(err: any) {
this.destination.error(err);
}

notifyComplete() {
this.unsubscribeInner();
if(this.hasCompleted && this.active === 0) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Blesh couldn't we still set destination on the InnerSwitchSubscriber, but invoke super with no argument? I believe the only logic we want to short-circuit here is the shared _subscription assignment. Then we don't have to override _next or _error.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that would probably work. If you want to make that change, feel free to PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to devote some thought to the whole shared-subscription approach and see if I can poke any more holes in it. The solution may be as simple as explicitly passing the shared subscription to Subscriber's constructor, instead of reading it off the destination. Will let you know.

Expand All @@ -58,13 +66,18 @@ class SwitchSubscriber<T> extends Subscriber<T> {
}

class InnerSwitchSubscriber<T> extends Subscriber<T> {
constructor(destination: Observer<any>, private parent: SwitchSubscriber<T>) {
super(destination);
constructor(private parent: SwitchSubscriber<T>) {
super();
}

_next(value: T) {
super._next(value);
this.parent.notifyNext(value);
}

_error(err: any) {
this.parent.notifyError(err);
}

_complete() {
this.parent.notifyComplete();
}
Expand Down
91 changes: 84 additions & 7 deletions src/operators/switchLatest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import Observer from '../Observer';
import Observable from '../Observable';
import Subscriber from '../Subscriber';
import Subscription from '../Subscription';

import tryCatch from '../util/tryCatch';
import { errorObject } from '../util/errorObject';
import { FlatMapSubscriber } from './flatMap-support';

export default function switchLatest<T, R, R2>(project: (value: T, index: number) => Observable<R>,
Expand All @@ -21,13 +22,89 @@ class SwitchLatestOperator<T, R, R2> implements Operator<T, R> {
}
}

class SwitchLatestSubscriber<T, R, R2> extends FlatMapSubscriber<T, R, R2> {

innerSubscription: Subscription<T>;
class SwitchLatestSubscriber<T, R, R2> extends Subscriber<T> {

private innerSubscription: Subscription<T>;
private hasCompleted = false;
index: number = 0;

constructor(destination: Observer<T>,
project: (value: T, index: number) => Observable<R>,
resultSelector?: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2) {
super(destination, project, resultSelector, 1);
private project: (value: T, index: number) => Observable<R>,
private resultSelector?: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2) {
super(destination);
}

_next(value: any) {
const index = this.index++;
const destination = this.destination;
let result = tryCatch(this.project)(value, index);
if(result === errorObject) {
destination.error(result.e);
} else {
const innerSubscription = this.innerSubscription;
if(innerSubscription) {
innerSubscription.unsubscribe();
}
this.add(this.innerSubscription = result.subscribe(new InnerSwitchLatestSubscriber(this, this.resultSelector, index, value)))
}
}

_complete() {
const innerSubscription = this.innerSubscription;
this.hasCompleted = true;
if(!innerSubscription || innerSubscription.isUnsubscribed) {
this.destination.complete();
}
}

notifyComplete(innerSub: Subscription<R>) {
this.remove(innerSub);
this.innerSubscription = null;
if(this.hasCompleted) {
this.destination.complete();
}
}

notifyError(err: any) {
this.destination.error(err);
}

notifyNext(value: T) {
this.destination.next(value);
}
}

class InnerSwitchLatestSubscriber<T, R, R2> extends Subscriber<T> {
private index: number = 0;

constructor(private parent: SwitchLatestSubscriber<T, R, R2>,
private resultSelector: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2,
private outerIndex: number,
private outerValue: any) {
super();
}

_next(value: T) {
const parent = this.parent;
const index = this.index++;
const resultSelector = this.resultSelector;
if(resultSelector) {
let result = tryCatch(resultSelector)(value, this.outerValue, index, this.outerIndex);
if(result === errorObject) {
parent.notifyError(result.e);
} else {
parent.notifyNext(result);
}
} else {
parent.notifyNext(value);
}
}

_error(err: T) {
this.parent.notifyError(err);
}

_complete() {
this.parent.notifyComplete(this);
}
}