Skip to content

Commit

Permalink
fix(Stream): fix unsubscription semantics w.r.t. restarting
Browse files Browse the repository at this point in the history
Stream should not restart (stop + start) producing if unsubscribe and subscribe happened
consecutively and synchronously. Delay the actual production stop to the next event loop iteration.
  • Loading branch information
Andre Medeiros committed Feb 27, 2016
1 parent 27bafd0 commit 9a0f3af
Show file tree
Hide file tree
Showing 4 changed files with 154 additions and 5 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"commit": "git-cz",
"lint": "tslint -c tslint.json src/*.ts",
"premocha": "npm run lib",
"mocha": "mocha tests/**/*.ts --require ts-node/register",
"mocha": "mocha tests/*.ts tests/**/*.ts --require ts-node/register",
"postmocha": "rm -rf tests/**/*.js",
"test": "npm run lint && npm run mocha",
"prelib": "rm -rf lib/ && mkdirp lib/",
Expand Down
15 changes: 13 additions & 2 deletions src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import {SkipProducer} from './operator/SkipProducer';
import {DebugProducer} from './operator/DebugProducer';
import {FoldProducer} from './operator/FoldProducer';
import {LastProducer} from './operator/LastProducer';
import {empty} from './utils/empty';

export class Stream<T> implements Observer<T> {
public _observers: Array<Observer<T>>;
public _stopID: any = empty;

constructor(public _producer: Producer<T>) {
this._observers = [];
Expand Down Expand Up @@ -46,18 +48,27 @@ export class Stream<T> implements Observer<T> {
this._observers[i].end();
}
}
this._stopID = setTimeout(() => this._producer.stop());
}

subscribe(observer: Observer<T>): void {
this._observers.push(observer);
if (this._observers.length === 1) this._producer.start(this);
if (this._observers.length === 1) {
if (this._stopID !== empty) {
clearTimeout(this._stopID);
this._stopID = empty;
}
this._producer.start(this);
}
}

unsubscribe(observer: Observer<T>): void {
const i = this._observers.indexOf(observer);
if (i > -1) {
this._observers.splice(i, 1);
if (this._observers.length <= 0) this._producer.stop();
if (this._observers.length <= 0) {
this._stopID = setTimeout(() => this._producer.stop());
}
}
}

Expand Down
8 changes: 8 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import {Stream} from './Stream';
import {Producer} from './Producer';
import {Observer} from './Observer';
import from from './factory/from';
import interval from './factory/interval';

export {
Producer,
Observer,
Stream,
};

export default {
Stream,
from,
Expand Down
134 changes: 132 additions & 2 deletions tests/stream.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,56 @@
import xs from '../src/index';
import xs, {Producer, Observer, Stream} from '../src/index';
import * as assert from 'assert';

describe('Stream', () => {
it('can be subscribed and unsubscribed with one observer', (done) => {
it('should be createable giving a custom producer object', (done) => {
const expected = [10, 20, 30];
let observerGotEnd: boolean = false;

const producer: Producer<number> = {
start(observer: Observer<number>) {
observer.next(10);
observer.next(20);
observer.next(30);
observer.end();
},

stop() {
done();
assert.equal(expected.length, 0);
assert.equal(observerGotEnd, true);
},
};

const stream: Stream<number> = new Stream(producer);
stream.subscribe({
next: (x: number) => {
assert.equal(x, expected.shift());
},
error: done.fail,
end: () => {
observerGotEnd = true;
},
});
});

it('should have all the core operators as methods, plus subscribe and unsubscribe', () => {
const emptyProducer = {
start(): void { return undefined; },
stop(): void { return undefined; },
};
const stream = new Stream(emptyProducer);
assert.equal(typeof stream.subscribe, 'function');
assert.equal(typeof stream.unsubscribe, 'function');
assert.equal(typeof stream.map, 'function');
assert.equal(typeof stream.filter, 'function');
assert.equal(typeof stream.take, 'function');
assert.equal(typeof stream.skip, 'function');
assert.equal(typeof stream.debug, 'function');
assert.equal(typeof stream.fold, 'function');
assert.equal(typeof stream.last, 'function');
});

it('should be subscribeable and unsubscribeable with one observer', (done) => {
const stream = xs.interval(100);
const expected = [0, 1, 2];
let observer = {
Expand All @@ -18,4 +66,86 @@ describe('Stream', () => {
};
stream.subscribe(observer);
});

it('should broadcast events to two observers', (done) => {
const stream = xs.interval(100);
const expected1 = [0, 1, 2];
const expected2 = [1, 2];

let observer1 = {
next: (x: number) => {
assert.equal(x, expected1.shift());
},
error: done.fail,
end: done.fail,
};
stream.subscribe(observer1);

let observer2 = {
next: (x: number) => {
assert.equal(x, expected2.shift());
},
error: done.fail,
end: done.fail,
};
setTimeout(() => {
stream.subscribe(observer2)
}, 150);

setTimeout(() => {
stream.unsubscribe(observer1);
stream.unsubscribe(observer2);
assert.equal(expected1.length, 0);
assert.equal(expected2.length, 0);
done();
}, 400);
});

it('should not stop if unsubscribed and re-subscribed synchronously', (done) => {
const stream = xs.interval(100);
const expected = [0, 1, 2];
let observer = {
next: (x: number) => {
assert.equal(x, expected.shift());
if (expected.length === 0) {
stream.unsubscribe(observer);
done();
}
},
error: done.fail,
end: done.fail,
};
stream.subscribe(observer);

setTimeout(() => {
stream.unsubscribe(observer);
stream.subscribe(observer);
}, 150);
});

it('should restart if unsubscribed and re-subscribed asynchronously', (done) => {
const stream = xs.interval(100);
let expected = [0, 1, 2];
let observer = {
next: (x: number) => {
assert.equal(x, expected.shift());
if (expected.length === 0) {
stream.unsubscribe(observer);
done();
}
},
error: done.fail,
end: done.fail,
};
stream.subscribe(observer);

setTimeout(() => {
stream.unsubscribe(observer);
}, 130);
setTimeout(() => {
assert.equal(expected.length, 2);
expected = [0, 1, 2];
stream.subscribe(observer);
}, 180);
});
});

0 comments on commit 9a0f3af

Please sign in to comment.