Skip to content

Commit

Permalink
feat(callback): Add Observable.fromCallback
Browse files Browse the repository at this point in the history
  • Loading branch information
SomeKittens authored and benlesh committed Nov 25, 2015
1 parent 227c168 commit 9f751e7
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 0 deletions.
97 changes: 97 additions & 0 deletions spec/observables/from-callback-spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/* globals describe, it, expect */
var Rx = require('../../dist/cjs/Rx');
var Observable = Rx.Observable;

describe('Observable.fromCallback', function () {
it('should emit one value from a callback', function (done) {
function callback (datum, cb) {
cb(datum);
}
var cbToObs = Observable.fromCallback(callback);

cbToObs(42)
.subscribe(function (x) {
expect(x).toBe(42);
}, function () {
done.fail('should not be called');
},
done);
});

it('should emit one value chosen by a selector', function (done) {
function callback (datum, cb) {
cb(null, datum);
}
var cbToObs = Observable.fromCallback(callback, null, function (err, datum) { return datum; });

cbToObs(42)
.subscribe(function (x) {
expect(x).toBe(42);
}, function () {
done.fail('should not be called');
},
done);
});

it('should override `this` in the callback', function (done) {
function callback (cb) {
cb(this.value);
}
var cbToObs = Observable.fromCallback(callback, {value: 42});

cbToObs()
.subscribe(function (x) {
expect(x).toBe(42);
}, function () {
done.fail('should not be called');
},
done);
});

it('should emit an error when the selector throws', function (done) {
function callback (cb) {
cb(42);
}
var cbToObs = Observable.fromCallback(callback, null, function (err) { throw new Error('Yikes!'); });

cbToObs()
.subscribe(function () {
// Considered a failure if we don't go directly to err handler
done.fail('should not be called');
},
function (err) {
expect(err.message).toBe('Yikes!');
done();
},
function () {
// Considered a failure if we don't go directly to err handler
done.fail('should not be called');
}
);
});

it('should not emit, throw or complete if immediately unsubscribed', function (done) {
var nextSpy = jasmine.createSpy('next');
var throwSpy = jasmine.createSpy('throw');
var completeSpy = jasmine.createSpy('complete');
var timeout;
function callback (datum, cb) {
// Need to cb async in order for the unsub to trigger
timeout = setTimeout(function () {
cb(datum);
});
}
var subscription = Observable.fromCallback(callback)(42)
.subscribe(nextSpy, throwSpy, completeSpy);
subscription.unsubscribe();

setTimeout(function () {
expect(nextSpy).not.toHaveBeenCalled();
expect(throwSpy).not.toHaveBeenCalled();
expect(completeSpy).not.toHaveBeenCalled();

clearTimeout(timeout);
done();
});
});
});
1 change: 1 addition & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,7 @@ export class Observable<T> implements CoreOperators<T> {
removeHandler: (handler: Function) => void,
selector?: (...args: Array<any>) => T) => Observable<T>;
static fromPromise: <T>(promise: Promise<T>, scheduler?: Scheduler) => Observable<T>;
static fromCallback: <T>(callbackFunc: Function, ctx?: Object, selector?: Function, scheduler?: Scheduler) => Function;
static interval: (interval: number, scheduler?: Scheduler) => Observable<number>;
static merge: <T>(...observables: Array<Observable<any> | Scheduler | number>) => Observable<T>;
static never: <T>() => Observable<T>;
Expand Down
3 changes: 3 additions & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ Observable.fromEventPattern = FromEventPatternObservable.create;
import {PromiseObservable} from './observables/PromiseObservable';
Observable.fromPromise = PromiseObservable.create;

import {CallbackObservable} from './observables/CallbackObservable';
Observable.fromCallback = CallbackObservable.create;

import {IntervalObservable} from './observables/IntervalObservable';
Observable.interval = IntervalObservable.create;

Expand Down
3 changes: 3 additions & 0 deletions src/Rx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ Observable.fromEventPattern = FromEventPatternObservable.create;
import {PromiseObservable} from './observables/PromiseObservable';
Observable.fromPromise = PromiseObservable.create;

import {CallbackObservable} from './observables/CallbackObservable';
Observable.fromCallback = CallbackObservable.create;

import {IntervalObservable} from './observables/IntervalObservable';
Observable.interval = IntervalObservable.create;

Expand Down
110 changes: 110 additions & 0 deletions src/observables/CallbackObservable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import {Observable} from '../Observable';
import {Subscriber} from '../Subscriber';
import {Scheduler} from '../Scheduler';
import {Subscription} from '../Subscription';
import {immediate} from '../schedulers/immediate';
import {tryCatch} from '../util/tryCatch';
import {errorObject} from '../util/errorObject';

export class CallbackObservable<T> extends Observable<T> {

_isScalar: boolean = false;
value: T | T[];

static create<T>(callbackFunc: Function,
ctx: Object = undefined,
selector: Function = undefined,
scheduler: Scheduler = immediate): Function {
return (...args): Observable<T> => {
return new CallbackObservable(callbackFunc, ctx, selector, args, scheduler);
};
}

constructor(private callbackFunc: Function,
private ctx,
private selector,
private args: any[],
public scheduler: Scheduler = immediate) {
super();
}

_subscribe(subscriber: Subscriber<T | T[]>) {
const callbackFunc = this.callbackFunc;
const ctx = this.ctx;
const selector = this.selector;
const args = this.args;
const scheduler = this.scheduler;

let handler;

if (scheduler === immediate) {
if (this._isScalar) {
subscriber.next(this.value);
subscriber.complete();
} else {
handler = (...innerArgs) => {
let results;

this._isScalar = true;
this.value = innerArgs;

if (selector) {
results = tryCatch(selector).apply(ctx, innerArgs);
if (results === errorObject) { return subscriber.error(results.e); }
subscriber.next(results);
} else {
if (innerArgs.length <= 1) {
subscriber.next(innerArgs[0]);
} else {
subscriber.next(innerArgs);
}
}
subscriber.complete();
};
}
} else {
const subscription = new Subscription();
if (this._isScalar) {
const value = this.value;
subscription.add(scheduler.schedule(dispatchNext, 0, { value, subscriber }));
} else {
handler = (...innerArgs) => {
let results;

this._isScalar = true;

if (selector) {
results = tryCatch(selector).apply(ctx, innerArgs);
if (results === errorObject) {
return subscription.add(scheduler.schedule(dispatchError, 0, { err: results.e, subscriber }));
}
this.value = results;
} else {
if (innerArgs.length <= 1) {
this.value = innerArgs[0];
} else {
this.value = innerArgs;
}
}
const value = this.value;
subscription.add(scheduler.schedule(dispatchNext, 0, { value, subscriber }));
};
return subscription;
}
}

if (handler) {
args.push(handler);
callbackFunc.apply(ctx, args);
}
}
}

function dispatchNext({ value, subscriber }) {
subscriber.next(value);
subscriber.complete();
}

function dispatchError({ err, subscriber }) {
subscriber.error(err);
}

0 comments on commit 9f751e7

Please sign in to comment.