Skip to content

Commit

Permalink
fix(usingObservable): accepts factory returns promise
Browse files Browse the repository at this point in the history
relates to #1483
  • Loading branch information
kwonoj authored and benlesh committed Mar 28, 2016
1 parent 147166e commit f8d7d1b
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 30 deletions.
83 changes: 81 additions & 2 deletions spec/observables/using-spec.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import * as Rx from '../../dist/cjs/Rx.KitchenSink';
import {it} from '../helpers/test-helper';
import {DoneSignature} from '../helpers/test-helper';

const Observable = Rx.Observable;
const Subscription = Rx.Subscription;

describe('Observable.using', () => {
it('should dispose of the resource when the subscription is disposed', (done) => {
it('should dispose of the resource when the subscription is disposed', (done: DoneSignature) => {
let disposed = false;
const source = Observable.using(
() => new Subscription(() => disposed = true),
Expand All @@ -21,4 +21,83 @@ describe('Observable.using', () => {
done.fail('disposed should be true but was false');
}
});

it('should accept factory returns promise resolves', (done: DoneSignature) => {
const expected = 42;

let disposed = false;
const e1 = Observable.using(
() => new Subscription(() => disposed = true),
(resource) => new Promise((resolve: any) => { resolve(expected); }));

e1.subscribe(x => {
expect(x).toBe(expected);
}, (x) => {
done.fail('should not be called');
}, () => {
done();
});
});

it('should accept factory returns promise rejects', (done: DoneSignature) => {
const expected = 42;

let disposed = false;
const e1 = Observable.using(
() => new Subscription(() => disposed = true),
(resource) => new Promise((resolve: any, reject: any) => { reject(expected); }));

e1.subscribe(x => {
done.fail('should not be called');
}, (x) => {
expect(x).toBe(expected);
done();
}, () => {
done.fail('should not be called');
});
});

it('should raise error when resource factory throws', (done: DoneSignature) => {
const expectedError = 'expected';
const error = 'error';

const source = Observable.using(
() => {
throw expectedError;
},
(resource) => {
throw error;
}
);

source.subscribe((x) => {
done.fail('should not be called');
}, (x) => {
expect(x).toBe(expectedError);
done();
}, () => {
done.fail();
});
});

it('should raise error when observable factory throws', (done: DoneSignature) => {
const error = 'error';
let disposed = false;

const source = Observable.using(
() => new Subscription(() => disposed = true),
(resource) => {
throw error;
}
);

source.subscribe((x) => {
done.fail('should not be called');
}, (x) => {
expect(x).toBe(error);
done();
}, () => {
done.fail();
});
});
});
58 changes: 30 additions & 28 deletions src/observable/UsingObservable.ts
Original file line number Diff line number Diff line change
@@ -1,55 +1,57 @@
import {Observable} from '../Observable';
import {Observable, SubscribableOrPromise} from '../Observable';
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';

import {subscribeToResult} from '../util/subscribeToResult';
import {OuterSubscriber} from '../OuterSubscriber';
/**
* We need this JSDoc comment for affecting ESDoc.
* @extends {Ignored}
* @hide true
*/
export class UsingObservable<T> extends Observable<T> {

static create<T>(resourceFactory: () => Subscription,
observableFactory: (resource: Subscription) => Observable<T>): Observable<T> {
static create<T>(resourceFactory: () => Subscription | void,
observableFactory: (resource: Subscription) => SubscribableOrPromise<T> | void): Observable<T> {
return new UsingObservable<T>(resourceFactory, observableFactory);
}

constructor(private resourceFactory: () => Subscription,
private observableFactory: (resource: Subscription) => Observable<T>) {
constructor(private resourceFactory: () => Subscription | void,
private observableFactory: (resource: Subscription) => SubscribableOrPromise<T> | void) {
super();
}

protected _subscribe(subscriber: Subscriber<T>): Subscription | Function | void {

const { resourceFactory, observableFactory } = this;

let resource: Subscription,
source: Observable<T>,
error: any, errorHappened = false;
let resource: Subscription;

try {
resource = resourceFactory();
} catch (e) {
error = e;
errorHappened = true;
resource = <Subscription>resourceFactory();
return new UsingSubscriber(subscriber, resource, observableFactory);
} catch (err) {
subscriber.error(err);
}
}
}

if (errorHappened) {
subscriber.error(error);
} else {
subscriber.add(resource);
try {
source = observableFactory(resource);
} catch (e) {
error = e;
errorHappened = true;
}
class UsingSubscriber<T> extends OuterSubscriber<T, T> {
constructor(destination: Subscriber<T>,
private resource: Subscription,
private observableFactory: (resource: Subscription) => SubscribableOrPromise<T> | void) {
super(destination);
destination.add(resource);
this.tryUse();
}

if (errorHappened) {
subscriber.error(error);
} else {
return source.subscribe(subscriber);
private tryUse(): void {
try {
const source = this.observableFactory.call(this, this.resource);
if (source) {
this.add(subscribeToResult(this, source));
}
} catch (err) {
this._error(err);
}
}
}
}

0 comments on commit f8d7d1b

Please sign in to comment.