Skip to content

Commit

Permalink
feat(switch): add promise, iterable and array support
Browse files Browse the repository at this point in the history
- switch will now work with an observable of mixed Observables, observables, arrays, promises and iterables
  • Loading branch information
benlesh committed Sep 23, 2015
1 parent d249895 commit 24fdd34
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 25 deletions.
32 changes: 31 additions & 1 deletion spec/operators/switch-spec.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
/* expect, it, describe, expectObserable, hot, cold */
var Rx = require('../../dist/cjs/Rx');
var Promise = require('promise');

var Observable = Rx.Observable;
var immediateScheduler = Rx.Scheduler.immediate;

describe('Observable.prototype.switch()', function(){
fdescribe('Observable.prototype.switch()', function(){
it("should switch to each immediately-scheduled inner Observable", function (done) {
var a = Observable.of(1, 2, 3, immediateScheduler);
var b = Observable.of(4, 5, 6, immediateScheduler);
Expand Down Expand Up @@ -51,4 +52,33 @@ describe('Observable.prototype.switch()', function(){
var expected = '--------a---b----d--e---f---|';
expectObservable(e1.switch()).toBe(expected);
});

it('should handle an observable of promises', function(done){
var expected = [3];

Observable.of(Promise.resolve(1), Promise.resolve(2), Promise.resolve(3))
.switch()
.subscribe(function(x) {
expect(x).toBe(expected.shift());
}, null, function(){
expect(expected.length).toBe(0);
done();
});
});

it('should handle an observable with Arrays in it', function() {
var expected = [1,2,3,4];
var completed = false;

Observable.of(Observable.never(), Observable.never(), [1,2,3,4])
.switch()
.subscribe(function(x) {
expect(x).toBe(expected.shift());
}, null, function() {
completed = true;
expect(expected.length).toBe(0);
});

expect(completed).toBe(true);
})
});
30 changes: 7 additions & 23 deletions src/operators/switch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ import Observer from '../Observer';
import Observable from '../Observable';
import Subscriber from '../Subscriber';
import Subscription from '../Subscription';
import OuterSubscriber from '../OuterSubscriber';
import subscribeToResult from '../util/subscribeToResult';

export default function _switch<T>(): Observable<T> {
return this.lift(new SwitchOperator());
Expand All @@ -18,7 +20,7 @@ class SwitchOperator<T, R> implements Operator<T, R> {
}
}

class SwitchSubscriber<T> extends Subscriber<T> {
class SwitchSubscriber<T, R> extends OuterSubscriber<T, R> {
private active: number = 0;
private hasCompleted: boolean = false;
innerSubscription: Subscription<T>;
Expand All @@ -28,9 +30,9 @@ class SwitchSubscriber<T> extends Subscriber<T> {
}

_next(value: any) {
this.active++;
this.unsubscribeInner();
this.add(this.innerSubscription = value.subscribe(new InnerSwitchSubscriber(this)));
this.active++;
this.add(this.innerSubscription = subscribeToResult(this, value));
}

_complete() {
Expand All @@ -41,15 +43,15 @@ class SwitchSubscriber<T> extends Subscriber<T> {
}

unsubscribeInner() {
this.active = this.active > 0 ? this.active - 1 : 0;
const innerSubscription = this.innerSubscription;
if(innerSubscription) {
this.active--;
innerSubscription.unsubscribe();
this.remove(innerSubscription);
}
}

notifyNext(value: T) {
notifyNext(value: any) {
this.destination.next(value);
}

Expand All @@ -65,21 +67,3 @@ class SwitchSubscriber<T> extends Subscriber<T> {
}
}

class InnerSwitchSubscriber<T> extends Subscriber<T> {
constructor(private parent: SwitchSubscriber<T>) {
super();
}

_next(value: T) {
this.parent.notifyNext(value);
}

_error(err: any) {
this.parent.notifyError(err);
}

_complete() {
this.parent.notifyComplete();
}
}

1 change: 1 addition & 0 deletions src/operators/switchMap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export default function switchMap<T, R, R2>(project: (value: T, index: number) =
return this.lift(new SwitchMapOperator(project, resultSelector));
}


class SwitchMapOperator<T, R, R2> implements Operator<T, R> {
constructor(private project: (value: T, index: number) => Observable<R>,
private resultSelector?: (innerValue: R, outerValue: T, innerIndex: number, outerIndex: number) => R2) {
Expand Down
3 changes: 2 additions & 1 deletion src/util/subscribeToResult.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,11 @@ export default function subscribeToResult<T, R, R2>(outerSubscriber: OuterSubscr
} else if (typeof result.then === 'function') {
result.then(x => {
if(!destination.isUnsubscribed) {
destination.next(result);
destination.next(x);
destination.complete();
}
}, err => destination.error(err));
return destination;
} else if (typeof result[$$iterator] === 'function') {
for(let item of result) {
destination.next(item);
Expand Down

0 comments on commit 24fdd34

Please sign in to comment.