diff --git a/spec/observables/using-spec.ts b/spec/observables/using-spec.ts index 078c36159d..cc4f0c04c4 100644 --- a/spec/observables/using-spec.ts +++ b/spec/observables/using-spec.ts @@ -1,11 +1,11 @@ import * as Rx from '../../dist/cjs/Rx.KitchenSink'; -import {it} from '../helpers/test-helper'; +import {DoneSignature} from '../helpers/test-helper'; const Observable = Rx.Observable; const Subscription = Rx.Subscription; describe('Observable.using', () => { - it('should dispose of the resource when the subscription is disposed', (done) => { + it('should dispose of the resource when the subscription is disposed', (done: DoneSignature) => { let disposed = false; const source = Observable.using( () => new Subscription(() => disposed = true), @@ -21,4 +21,83 @@ describe('Observable.using', () => { done.fail('disposed should be true but was false'); } }); + + it('should accept factory returns promise resolves', (done: DoneSignature) => { + const expected = 42; + + let disposed = false; + const e1 = Observable.using( + () => new Subscription(() => disposed = true), + (resource) => new Promise((resolve: any) => { resolve(expected); })); + + e1.subscribe(x => { + expect(x).toBe(expected); + }, (x) => { + done.fail('should not be called'); + }, () => { + done(); + }); + }); + + it('should accept factory returns promise rejects', (done: DoneSignature) => { + const expected = 42; + + let disposed = false; + const e1 = Observable.using( + () => new Subscription(() => disposed = true), + (resource) => new Promise((resolve: any, reject: any) => { reject(expected); })); + + e1.subscribe(x => { + done.fail('should not be called'); + }, (x) => { + expect(x).toBe(expected); + done(); + }, () => { + done.fail('should not be called'); + }); + }); + + it('should raise error when resource factory throws', (done: DoneSignature) => { + const expectedError = 'expected'; + const error = 'error'; + + const source = Observable.using( + () => { + throw expectedError; + }, + (resource) => { + throw error; + } + ); + + source.subscribe((x) => { + done.fail('should not be called'); + }, (x) => { + expect(x).toBe(expectedError); + done(); + }, () => { + done.fail(); + }); + }); + + it('should raise error when observable factory throws', (done: DoneSignature) => { + const error = 'error'; + let disposed = false; + + const source = Observable.using( + () => new Subscription(() => disposed = true), + (resource) => { + throw error; + } + ); + + source.subscribe((x) => { + done.fail('should not be called'); + }, (x) => { + expect(x).toBe(error); + done(); + }, () => { + done.fail(); + }); + }); }); diff --git a/src/observable/UsingObservable.ts b/src/observable/UsingObservable.ts index 90089e0bf0..08071d217c 100644 --- a/src/observable/UsingObservable.ts +++ b/src/observable/UsingObservable.ts @@ -1,7 +1,9 @@ -import {Observable} from '../Observable'; +import {Observable, SubscribableOrPromise} from '../Observable'; import {Subscriber} from '../Subscriber'; import {Subscription} from '../Subscription'; +import {subscribeToResult} from '../util/subscribeToResult'; +import {OuterSubscriber} from '../OuterSubscriber'; /** * We need this JSDoc comment for affecting ESDoc. * @extends {Ignored} @@ -9,47 +11,47 @@ import {Subscription} from '../Subscription'; */ export class UsingObservable extends Observable { - static create(resourceFactory: () => Subscription, - observableFactory: (resource: Subscription) => Observable): Observable { + static create(resourceFactory: () => Subscription | void, + observableFactory: (resource: Subscription) => SubscribableOrPromise | void): Observable { return new UsingObservable(resourceFactory, observableFactory); } - constructor(private resourceFactory: () => Subscription, - private observableFactory: (resource: Subscription) => Observable) { + constructor(private resourceFactory: () => Subscription | void, + private observableFactory: (resource: Subscription) => SubscribableOrPromise | void) { super(); } protected _subscribe(subscriber: Subscriber): Subscription | Function | void { - const { resourceFactory, observableFactory } = this; - let resource: Subscription, - source: Observable, - error: any, errorHappened = false; + let resource: Subscription; try { - resource = resourceFactory(); - } catch (e) { - error = e; - errorHappened = true; + resource = resourceFactory(); + return new UsingSubscriber(subscriber, resource, observableFactory); + } catch (err) { + subscriber.error(err); } + } +} - if (errorHappened) { - subscriber.error(error); - } else { - subscriber.add(resource); - try { - source = observableFactory(resource); - } catch (e) { - error = e; - errorHappened = true; - } +class UsingSubscriber extends OuterSubscriber { + constructor(destination: Subscriber, + private resource: Subscription, + private observableFactory: (resource: Subscription) => SubscribableOrPromise | void) { + super(destination); + destination.add(resource); + this.tryUse(); + } - if (errorHappened) { - subscriber.error(error); - } else { - return source.subscribe(subscriber); + private tryUse(): void { + try { + const source = this.observableFactory.call(this, this.resource); + if (source) { + this.add(subscribeToResult(this, source)); } + } catch (err) { + this._error(err); } } -} +} \ No newline at end of file