Skip to content

Commit

Permalink
feat(using): add static Observable.using creation operator.
Browse files Browse the repository at this point in the history
  • Loading branch information
trxcllnt authored and kwonoj committed Mar 3, 2016
1 parent 2ef4682 commit 6c76593
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 0 deletions.
24 changes: 24 additions & 0 deletions spec/observables/using-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import * as Rx from '../../dist/cjs/Rx.KitchenSink';
import {it} from '../helpers/test-helper';

const Observable = Rx.Observable;
const Subscription = Rx.Subscription;

describe('Observable.using', () => {
it('should dispose of the resource when the subscription is disposed', (done) => {
let disposed = false;
const source = Observable.using(
() => new Subscription(() => disposed = true),
(resource) => Observable.range(0, 3)
)
.take(2);

source.subscribe();

if (disposed) {
done();
} else {
done.fail('disposed should be true but was false');
}
});
});
2 changes: 2 additions & 0 deletions src/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import {raceStatic} from './operator/race';
import {RangeObservable} from './observable/RangeObservable';
import {NeverObservable} from './observable/NeverObservable';
import {ErrorObservable} from './observable/ErrorObservable';
import {UsingObservable} from './observable/UsingObservable';
import {AjaxCreationMethod} from './observable/dom/AjaxObservable';
import {WebSocketSubject} from './observable/dom/WebSocketSubject';

Expand Down Expand Up @@ -268,6 +269,7 @@ export class Observable<T> implements CoreOperators<T> {
static range: typeof RangeObservable.create;
static throw: typeof ErrorObservable.create;
static timer: typeof TimerObservable.create;
static using: typeof UsingObservable.create;
static webSocket: typeof WebSocketSubject.create;
static zip: typeof zipStatic;

Expand Down
1 change: 1 addition & 0 deletions src/Rx.KitchenSink.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ import './add/observable/never';
import './add/observable/range';
import './add/observable/throw';
import './add/observable/timer';
import './add/observable/using';
import './add/observable/zip';

// Operators
Expand Down
6 changes: 6 additions & 0 deletions src/add/observable/using.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import {Observable} from '../../Observable';
import {UsingObservable} from '../../observable/UsingObservable';

Observable.using = UsingObservable.create;

export var _void: void;
50 changes: 50 additions & 0 deletions src/observable/UsingObservable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import {Observable} from '../Observable';
import {Subscriber} from '../Subscriber';
import {Subscription} from '../Subscription';

export class UsingObservable<T> extends Observable<T> {

static create<T>(resourceFactory: () => Subscription,
observableFactory: (resource: Subscription) => Observable<T>): Observable<T> {
return new UsingObservable<T>(resourceFactory, observableFactory);
}

constructor(private resourceFactory: () => Subscription,
private observableFactory: (resource: Subscription) => Observable<T>) {
super();
}

protected _subscribe(subscriber: Subscriber<T>): Subscription | Function | void {

const { resourceFactory, observableFactory } = this;

let resource: Subscription,
source: Observable<T>,
error: any, errorHappened = false;

try {
resource = resourceFactory();
} catch (e) {
error = e;
errorHappened = true;
}

if (errorHappened) {
subscriber.error(error);
} else {
subscriber.add(resource);
try {
source = observableFactory(resource);
} catch (e) {
error = e;
errorHappened = true;
}

if (errorHappened) {
subscriber.error(error);
} else {
return source.subscribe(subscriber);
}
}
}
}

0 comments on commit 6c76593

Please sign in to comment.