Skip to content

Commit

Permalink
feat(imitate): implement imitate() operator for circular dependencies
Browse files Browse the repository at this point in the history
  • Loading branch information
Andre Medeiros committed Mar 27, 2016
1 parent 46c4782 commit 6545670
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 27 deletions.
40 changes: 23 additions & 17 deletions src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,17 @@ export class Stream<T> implements InternalListener<T> {
this._listeners = [];
}

static create<T>(producer: Producer<T>): Stream<T> {
(<InternalProducer<T>> (<any> producer))._start =
function _start(il: InternalListener<T>) {
(<Listener<T>> (<any> il)).next = il._n;
(<Listener<T>> (<any> il)).error = il._e;
(<Listener<T>> (<any> il)).complete = il._c;
this.start(<Listener<T>> (<any> il));
};
(<InternalProducer<T>> (<any> producer))._stop = producer.stop;
static create<T>(producer?: Producer<T>): Stream<T> {
if (producer) {
(<InternalProducer<T>> (<any> producer))._start =
function _start(il: InternalListener<T>) {
(<Listener<T>> (<any> il)).next = il._n;
(<Listener<T>> (<any> il)).error = il._e;
(<Listener<T>> (<any> il)).complete = il._c;
this.start(<Listener<T>> (<any> il));
};
(<InternalProducer<T>> (<any> producer))._stop = producer.stop;
}
return new Stream(<InternalProducer<T>> (<any> producer));
}

Expand Down Expand Up @@ -89,7 +91,7 @@ export class Stream<T> implements InternalListener<T> {
this._listeners[i]._c();
}
}
this._stopID = setTimeout(() => this._prod._stop());
if (this._prod) this._stopID = setTimeout(() => this._prod._stop());
this._listeners = [];
}

Expand All @@ -104,22 +106,22 @@ export class Stream<T> implements InternalListener<T> {
this._remove(<InternalListener<T>> (<any> listener));
}

_add(listener: InternalListener<T>): void {
this._listeners.push(listener);
_add(il: InternalListener<T>): void {
this._listeners.push(il);
if (this._listeners.length === 1) {
if (this._stopID !== empty) {
clearTimeout(this._stopID);
this._stopID = empty;
}
this._prod._start(this);
if (this._prod) this._prod._start(this);
}
}

_remove(listener: InternalListener<T>): void {
const i = this._listeners.indexOf(listener);
_remove(il: InternalListener<T>): void {
const i = this._listeners.indexOf(il);
if (i > -1) {
this._listeners.splice(i, 1);
if (this._listeners.length <= 0) {
if (this._prod && this._listeners.length <= 0) {
this._stopID = setTimeout(() => this._prod._stop());
}
}
Expand Down Expand Up @@ -163,7 +165,7 @@ export class Stream<T> implements InternalListener<T> {

static empty(): Stream<void> {
return new Stream<void>({
_start(lner: InternalListener<void>) { lner._c(); },
_start(il: InternalListener<void>) { il._c(); },
_stop: noop,
});
}
Expand Down Expand Up @@ -222,6 +224,10 @@ export class Stream<T> implements InternalListener<T> {
streams.unshift(this);
return Stream.combine(project, ...streams);
};

imitate(other: Stream<T>): void {
other._add(this);
}
}

export class MemoryStream<T> extends Stream<T> {
Expand Down
31 changes: 21 additions & 10 deletions tests/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,11 +45,7 @@ describe('Stream', () => {
});

it('should have all the core operators as methods, plus addListener and removeListener', () => {
const emptyProducer = {
start(): void { return undefined; },
stop(): void { return undefined; },
};
const stream = xs.create(emptyProducer);
const stream = xs.create();
assert.equal(typeof stream.addListener, 'function');
assert.equal(typeof stream.removeListener, 'function');
assert.equal(typeof stream.map, 'function');
Expand All @@ -71,11 +67,7 @@ describe('Stream', () => {
const expected = [10, 20, 30];
let listenerGotEnd: boolean = false;

const emptyProducer = {
start(): void { return undefined; },
stop(): void { return undefined; },
};
const stream = xs.create(emptyProducer);
const stream = xs.create();

stream.addListener({
next: (x: number) => {
Expand All @@ -97,6 +89,25 @@ describe('Stream', () => {
done();
});

it('should allow being imitated by a proxy Stream', (done) => {
const stream = xs.of(10, 20, 30);
const proxyStream = xs.create();

const expected = [10, 20, 30];
proxyStream.addListener({
next: (x: string) => {
assert.equal(x, expected.shift());
},
error: e => done(e),
complete: () => {
assert.equal(expected.length, 0);
done();
},
});

proxyStream.imitate(stream);
});

it('should be possible to addListener and removeListener with 1 listener', (done) => {
const stream = xs.interval(100);
const expected = [0, 1, 2];
Expand Down

0 comments on commit 6545670

Please sign in to comment.