Skip to content

Commit

Permalink
fix(fromObservable): support synchronous unsubscribe on completion
Browse files Browse the repository at this point in the history
If the observable being converted to a stream will synchronously complete when subscribed to, then

it would also immediately unsubscribe. The previous implementation of fromObservable was buggy in

the sense that the synchronous unsubscribe would fail. This fixes fromObservable.
  • Loading branch information
staltz committed Dec 12, 2016
1 parent b3a0cf6 commit e82b8da
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 10 deletions.
29 changes: 19 additions & 10 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,12 @@ export interface Listener<T> {
complete: () => void;
}

export type Observable<T> = {
subscribe(listener: Listener<T>): { unsubscribe: () => void; }
export interface Subscription {
unsubscribe(): void;
}

export interface Observable<T> {
subscribe(listener: Listener<T>): Subscription;
};

export type FromInput<T> = Promise<T> | Stream<T> | Array<T> | Observable<T>;
Expand All @@ -79,7 +83,7 @@ function and<T>(f1: (t: T) => boolean, f2: (t: T) => boolean): (t: T) => boolean
};
}

export class Subscription<T> {
export class StreamSubscription<T> implements Subscription {
constructor(private _stream: Stream<T>, private _listener: Listener<T>) {}

unsubscribe(): void {
Expand All @@ -89,21 +93,26 @@ export class Subscription<T> {

class ObservableProducer<T> implements InternalProducer<T> {
public type = 'fromObservable';
public ins: any;
public ins: Observable<T>;
public out: Stream<T>;
private _subscription: { unsubscribe: () => void; };
private active: boolean;
private _sub: Subscription | undefined;

constructor(observable: any) {
constructor(observable: Observable<T>) {
this.ins = observable;
this.active = false;
}

_start(out: Stream<T>) {
this.out = out;
this._subscription = this.ins.subscribe(new ObservableListener(out));
this.active = true;
this._sub = this.ins.subscribe(new ObservableListener(out));
if (!this.active) this._sub.unsubscribe();
}

_stop() {
this._subscription.unsubscribe();
if (this._sub) this._sub.unsubscribe();
this.active = false;
}
}

Expand Down Expand Up @@ -1354,10 +1363,10 @@ export class Stream<T> implements InternalListener<T> {
* @param {Listener} listener
* @returns {Subscription}
*/
subscribe(listener: Listener<T>): Subscription<T> {
subscribe(listener: Listener<T>): Subscription {
this.addListener(listener);

return new Subscription<T>(this, listener);
return new StreamSubscription<T>(this, listener);
}

/**
Expand Down
18 changes: 18 additions & 0 deletions tests/factory/fromObservable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,22 @@ describe('xs.fromObservable', () => {
},
});
});

it('should support synchronous unsubscribe on completion', (done: any) => {
const stream = xs.fromObservable(xs.of(10, 20, 30));
let expected = [10, 20, 30];

stream.addListener({
next(x: number) {
assert.strictEqual(x, expected.shift());
},
error(err: any) {
done(err);
},
complete() {
assert.strictEqual(expected.length, 0);
done();
},
});
});
});

0 comments on commit e82b8da

Please sign in to comment.