Skip to content

Commit

Permalink
fix(MemoryStream): fix a leaking execution bug
Browse files Browse the repository at this point in the history
Instead of MemoryStream._add using super._add, we reimplement MemoryStream._add with custom logic.
Now it's guaranteed that it will add the listener to the listeners array before emitting the
remembered value. This way, if the listener is an operator like take() which can synchronously stop
and remove itself from the source, we guarantee that effective _add happens before effective
_remove. Previously, we had effective _add happen after effective _remove, causing a stream
execution to remain even though the listener would never be removed.

BREAKING CHANGE:
This is generally safe to update, but note that the behavior around
MemoryStream, startWith, take, imitate etc may have slightly changed, so
it is recommended to run tests on your application and see if it is
working, in case your application code was relying on buggy behavior.

Closes #53
  • Loading branch information
staltz committed Oct 24, 2016
1 parent 1ef0019 commit 47e67ff
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 2 deletions.
18 changes: 16 additions & 2 deletions src/core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2159,8 +2159,22 @@ export class MemoryStream<T> extends Stream<T> {
}

_add(il: InternalListener<T>): void {
if (this._has) { il._n(this._v); }
super._add(il);
const ta = this._target;
if (ta !== NO) return ta._add(il);
const a = this._ils;
a.push(il);
if (a.length > 1) {
if (this._has) il._n(this._v);
return;
}
if (this._stopID !== NO) {
if (this._has) il._n(this._v);
clearTimeout(this._stopID);
this._stopID = NO;
} else if (this._has) il._n(this._v); else {
const p = this._prod;
if (p !== NO) p._start(this);
}
}

_stopNow() {
Expand Down
104 changes: 104 additions & 0 deletions tests/memoryStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,4 +161,108 @@ describe('MemoryStream', () => {
},
});
});

/**
* When an operator listener is about to be added to a memory stream, the
* operator will synchronously receive the remembered value, and may
* synchronously remove itself from the memory stream. This happens in the
* case of e.g. take(). In those cases, the operator should NOT be added AFTER
* it just "got the remembered value and removed itself", because that would
* create leaky executions of the stream. This was reported as bug #53 in
* GitHub.
*/
it('should not allow an operator listener to be indefinitely attached', (done) => {
let debugCalled = 0;
const debugExpected = [42, 0];
const source$ = xs.periodic(100).startWith(42)
.debug(x => {
debugCalled += 1;
assert.strictEqual(debugExpected.length > 0, true);
assert.strictEqual(x, debugExpected.shift());
});

const expectedA = [42, 0];
const expectedB = [42];
let completeCalled = 0;

source$.take(2).addListener({
next: (x) => {
assert.strictEqual(x, expectedA.shift());
},
error: (err) => {},
complete: () => {
completeCalled += 1;
},
});

source$.take(1).addListener({
next: (x) => {
assert.strictEqual(x, expectedB.shift());
},
error: (err) => {},
complete: () => {
completeCalled += 1;
}
});

setTimeout(() => {
assert.strictEqual(debugExpected.length, 0);
assert.strictEqual(expectedA.length, 0);
assert.strictEqual(expectedB.length, 0);
assert.strictEqual(completeCalled, 2);
assert.strictEqual(debugCalled, 2);
done();
}, 500);
});

it('should emit remembered value also when cancelling a stop', (done) => {
const expectedA = [1];
const expectedB = [1, 2];
let completeCalled = 0;
let doneCalled = 0;

const source$ = xs.createWithMemory({
start(listener: Listener<number>) {
listener.next(1);
setTimeout(() => {
listener.next(2);
listener.complete();
}, 100);
},
stop() {
doneCalled += 1;
},
});

const listenerA = {
next: (x: number) => {
assert.strictEqual(x, expectedA.shift());
source$.removeListener(listenerA);
},
error: (err: any) => {},
complete: () => {
completeCalled += 1;
},
};

source$.addListener(listenerA);

source$.addListener({
next: (x) => {
assert.strictEqual(x, expectedB.shift());
},
error: (err) => {},
complete: () => {
completeCalled += 1;
}
});

setTimeout(() => {
assert.strictEqual(expectedA.length, 0);
assert.strictEqual(expectedB.length, 0);
assert.strictEqual(completeCalled, 1);
assert.strictEqual(doneCalled, 1);
done();
}, 300);
});
});

0 comments on commit 47e67ff

Please sign in to comment.