diff --git a/spec/observables/from-callback-spec.js b/spec/observables/from-callback-spec.js new file mode 100644 index 0000000000..1f6d9b1cae --- /dev/null +++ b/spec/observables/from-callback-spec.js @@ -0,0 +1,97 @@ +/* globals describe, it, expect */ +var Rx = require('../../dist/cjs/Rx'); +var Observable = Rx.Observable; + +describe('Observable.fromCallback', function () { + it('should emit one value from a callback', function (done) { + function callback (datum, cb) { + cb(datum); + } + var cbToObs = Observable.fromCallback(callback); + + cbToObs(42) + .subscribe(function (x) { + expect(x).toBe(42); + }, function () { + done.fail('should not be called'); + }, + done); + }); + + it('should emit one value chosen by a selector', function (done) { + function callback (datum, cb) { + cb(null, datum); + } + var cbToObs = Observable.fromCallback(callback, null, function (err, datum) { return datum; }); + + cbToObs(42) + .subscribe(function (x) { + expect(x).toBe(42); + }, function () { + done.fail('should not be called'); + }, + done); + }); + + it('should override `this` in the callback', function (done) { + function callback (cb) { + cb(this.value); + } + var cbToObs = Observable.fromCallback(callback, {value: 42}); + + cbToObs() + .subscribe(function (x) { + expect(x).toBe(42); + }, function () { + done.fail('should not be called'); + }, + done); + }); + + it('should emit an error when the selector throws', function (done) { + function callback (cb) { + cb(42); + } + var cbToObs = Observable.fromCallback(callback, null, function (err) { throw new Error('Yikes!'); }); + + cbToObs() + .subscribe(function () { + // Considered a failure if we don't go directly to err handler + done.fail('should not be called'); + }, + function (err) { + expect(err.message).toBe('Yikes!'); + done(); + }, + function () { + // Considered a failure if we don't go directly to err handler + done.fail('should not be called'); + } + ); + }); + + it('should not emit, throw or complete if immediately unsubscribed', function (done) { + var nextSpy = jasmine.createSpy('next'); + var throwSpy = jasmine.createSpy('throw'); + var completeSpy = jasmine.createSpy('complete'); + var timeout; + function callback (datum, cb) { + // Need to cb async in order for the unsub to trigger + timeout = setTimeout(function () { + cb(datum); + }); + } + var subscription = Observable.fromCallback(callback)(42) + .subscribe(nextSpy, throwSpy, completeSpy); + subscription.unsubscribe(); + + setTimeout(function () { + expect(nextSpy).not.toHaveBeenCalled(); + expect(throwSpy).not.toHaveBeenCalled(); + expect(completeSpy).not.toHaveBeenCalled(); + + clearTimeout(timeout); + done(); + }); + }); +}); \ No newline at end of file diff --git a/src/Observable.ts b/src/Observable.ts index c8c595cec0..48823002d9 100644 --- a/src/Observable.ts +++ b/src/Observable.ts @@ -152,6 +152,7 @@ export class Observable implements CoreOperators { removeHandler: (handler: Function) => void, selector?: (...args: Array) => T) => Observable; static fromPromise: (promise: Promise, scheduler?: Scheduler) => Observable; + static fromCallback: (callbackFunc: Function, ctx?: Object, selector?: Function, scheduler?: Scheduler) => Function; static interval: (interval: number, scheduler?: Scheduler) => Observable; static merge: (...observables: Array | Scheduler | number>) => Observable; static never: () => Observable; diff --git a/src/Rx.KitchenSink.ts b/src/Rx.KitchenSink.ts index 008045934c..0140a24b85 100644 --- a/src/Rx.KitchenSink.ts +++ b/src/Rx.KitchenSink.ts @@ -53,6 +53,9 @@ Observable.fromEventPattern = FromEventPatternObservable.create; import {PromiseObservable} from './observables/PromiseObservable'; Observable.fromPromise = PromiseObservable.create; +import {CallbackObservable} from './observables/CallbackObservable'; +Observable.fromCallback = CallbackObservable.create; + import {IntervalObservable} from './observables/IntervalObservable'; Observable.interval = IntervalObservable.create; diff --git a/src/Rx.ts b/src/Rx.ts index 3c3fcacdaa..96f746bb70 100644 --- a/src/Rx.ts +++ b/src/Rx.ts @@ -36,6 +36,9 @@ Observable.fromEventPattern = FromEventPatternObservable.create; import {PromiseObservable} from './observables/PromiseObservable'; Observable.fromPromise = PromiseObservable.create; +import {CallbackObservable} from './observables/CallbackObservable'; +Observable.fromCallback = CallbackObservable.create; + import {IntervalObservable} from './observables/IntervalObservable'; Observable.interval = IntervalObservable.create; diff --git a/src/observables/CallbackObservable.ts b/src/observables/CallbackObservable.ts new file mode 100644 index 0000000000..69064b1f4c --- /dev/null +++ b/src/observables/CallbackObservable.ts @@ -0,0 +1,110 @@ +import {Observable} from '../Observable'; +import {Subscriber} from '../Subscriber'; +import {Scheduler} from '../Scheduler'; +import {Subscription} from '../Subscription'; +import {immediate} from '../schedulers/immediate'; +import {tryCatch} from '../util/tryCatch'; +import {errorObject} from '../util/errorObject'; + +export class CallbackObservable extends Observable { + + _isScalar: boolean = false; + value: T | T[]; + + static create(callbackFunc: Function, + ctx: Object = undefined, + selector: Function = undefined, + scheduler: Scheduler = immediate): Function { + return (...args): Observable => { + return new CallbackObservable(callbackFunc, ctx, selector, args, scheduler); + }; + } + + constructor(private callbackFunc: Function, + private ctx, + private selector, + private args: any[], + public scheduler: Scheduler = immediate) { + super(); + } + + _subscribe(subscriber: Subscriber) { + const callbackFunc = this.callbackFunc; + const ctx = this.ctx; + const selector = this.selector; + const args = this.args; + const scheduler = this.scheduler; + + let handler; + + if (scheduler === immediate) { + if (this._isScalar) { + subscriber.next(this.value); + subscriber.complete(); + } else { + handler = (...innerArgs) => { + let results; + + this._isScalar = true; + this.value = innerArgs; + + if (selector) { + results = tryCatch(selector).apply(ctx, innerArgs); + if (results === errorObject) { return subscriber.error(results.e); } + subscriber.next(results); + } else { + if (innerArgs.length <= 1) { + subscriber.next(innerArgs[0]); + } else { + subscriber.next(innerArgs); + } + } + subscriber.complete(); + }; + } + } else { + const subscription = new Subscription(); + if (this._isScalar) { + const value = this.value; + subscription.add(scheduler.schedule(dispatchNext, 0, { value, subscriber })); + } else { + handler = (...innerArgs) => { + let results; + + this._isScalar = true; + + if (selector) { + results = tryCatch(selector).apply(ctx, innerArgs); + if (results === errorObject) { + return subscription.add(scheduler.schedule(dispatchError, 0, { err: results.e, subscriber })); + } + this.value = results; + } else { + if (innerArgs.length <= 1) { + this.value = innerArgs[0]; + } else { + this.value = innerArgs; + } + } + const value = this.value; + subscription.add(scheduler.schedule(dispatchNext, 0, { value, subscriber })); + }; + return subscription; + } + } + + if (handler) { + args.push(handler); + callbackFunc.apply(ctx, args); + } + } +} + +function dispatchNext({ value, subscriber }) { + subscriber.next(value); + subscriber.complete(); +} + +function dispatchError({ err, subscriber }) { + subscriber.error(err); +}