From 47b178bddba65272f957602b8a6c69f79cc73da6 Mon Sep 17 00:00:00 2001 From: ptaylor Date: Fri, 14 Aug 2015 13:04:42 -0700 Subject: [PATCH] feat(operator): Add expand operator. --- spec/operators/expand-spec.js | 29 ++++++++++++++ src/Observable.ts | 2 + src/Rx.ts | 2 + src/operators/expand.ts | 71 +++++++++++++++++++++++++++++++++++ 4 files changed, 104 insertions(+) create mode 100644 spec/operators/expand-spec.js create mode 100644 src/operators/expand.ts diff --git a/spec/operators/expand-spec.js b/spec/operators/expand-spec.js new file mode 100644 index 0000000000..4bf6973abe --- /dev/null +++ b/spec/operators/expand-spec.js @@ -0,0 +1,29 @@ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.prototype.expand()', function () { + it('should map and recursively flatten', function (done) { + var expected = [1, 2, 3, 4, 5]; + Observable.of(0).expand(function (x) { + if (x > 4) { + return Observable.empty(); + } + return Observable.of(x + 1); + }) + .subscribe(function (x) { + expect(x).toBe(expected.shift()); + }, null, done); + }); + it('should map and recursively flatten with ScalarObservables', function (done) { + var expected = [1, 2, 3, 4, 5]; + Observable.return(0).expand(function (x) { + if (x > 4) { + return Observable.empty(); + } + return Observable.return(x + 1); + }) + .subscribe(function (x) { + expect(x).toBe(expected.shift()); + }, null, done); + }); +}); \ No newline at end of file diff --git a/src/Observable.ts b/src/Observable.ts index 2a70c5d6f2..4bf769d2f5 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -107,6 +107,8 @@ export default class Observable { projectResult?: (x: T, y: any, ix: number, iy: number) => R, concurrent?: number) => Observable; + expand: (project: (x: T, ix: number) => Observable) => Observable; + switchAll: () => Observable; switchLatest: (project: ((x: T, ix: number) => Observable), projectResult?: (x: T, y: any, ix: number, iy: number) => R) => Observable; diff --git a/src/Rx.ts b/src/Rx.ts index 85a3a6b5ef..c6b1fe9a37 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -63,6 +63,7 @@ import flatMapTo from './operators/flatMapTo'; import switchAll from './operators/switchAll'; import switchLatest from './operators/switchLatest'; import switchLatestTo from './operators/switchLatestTo'; +import expand from './operators/expand'; Observable.merge = merge; observableProto.merge = merge; @@ -72,6 +73,7 @@ observableProto.flatMapTo = flatMapTo; observableProto.switchAll = switchAll; observableProto.switchLatest = switchLatest; observableProto.switchLatestTo = switchLatestTo; +observableProto.expand = expand; import map from './operators/map'; import mapTo from './operators/mapTo'; diff --git a/src/operators/expand.ts b/src/operators/expand.ts new file mode 100644 index 0000000000..e1aa41ad08 --- /dev/null +++ b/src/operators/expand.ts @@ -0,0 +1,71 @@ +import Operator from '../Operator'; +import Observer from '../Observer'; +import Observable from '../Observable'; +import Subscriber from '../Subscriber'; + +import {MergeSubscriber, MergeInnerSubscriber} from './merge'; +import EmptyObservable from '../observables/EmptyObservable'; +import ScalarObservable from '../observables/ScalarObservable'; + +import tryCatch from '../util/tryCatch'; +import {errorObject} from '../util/errorObject'; + +export default function expand(project: (x: T, ix: number) => Observable): Observable { + return this.lift(new ExpandOperator(project)); +} + +export class ExpandOperator extends Operator { + + project: (x: T, ix: number) => Observable; + + constructor(project: (x: T, ix: number) => Observable) { + super(); + this.project = project; + } + + call(observer: Observer): Observer { + return new ExpandSubscriber(observer, this.project); + } +} + +export class ExpandSubscriber extends MergeSubscriber { + + project: (x: T, ix: number) => Observable; + + constructor(destination: Observer, + project: (x: T, ix: number) => Observable) { + super(destination, Number.POSITIVE_INFINITY); + this.project = project; + } + + _project(value, index) { + const observable = tryCatch(this.project).call(this, value, index); + if (observable === errorObject) { + this.error(errorObject.e); + return null; + } + return observable; + } + + _subscribeInner(observable, value, index) { + if(observable instanceof ScalarObservable) { + this.destination.next((> observable).value); + this._innerComplete(); + this._next((> observable).value); + } else if(observable instanceof EmptyObservable) { + this._innerComplete(); + } else { + return observable.subscribe(new ExpandInnerSubscriber(this)); + } + } +} + +export class ExpandInnerSubscriber extends MergeInnerSubscriber { + constructor(parent: ExpandSubscriber) { + super(parent); + } + _next(value) { + this.destination.next(value); + this.parent.next(value); + } +}