Skip to content

Commit

Permalink
fix(flattenSequentially): fix behaviour of outer stream completion
Browse files Browse the repository at this point in the history
The operator flattenSequentially should still emit values from the inner streams even after the
outer stream has completed. This commit changes/fixes the behaviour of flattenSequentially so that a
complete on the outer stream does not complete the result stream if there is still some inner stream
active.

BREAKING CHANGES:
If your code was relying on buggy behavior, you may need to migrate carefully. Check your usages of
flattenSequentially and how the outer stream completes.
![maybe
won't](https://img.shields.io/badge/will%20it%20affect%20me%3F-maybe%20won't-yellowgreen.svg)

ISSUES CLOSED: #141.
  • Loading branch information
staltz committed Nov 17, 2016
1 parent a07e5c9 commit fd31d49
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 6 deletions.
21 changes: 15 additions & 6 deletions src/extra/flattenSequentially.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,25 @@ class FSInner<T> implements InternalListener<T> {

export class FlattenSeqOperator<T> implements Operator<Stream<T>, T> {
public type = 'flattenSequentially';
private open: boolean = true;
private active: boolean = false;
private seq: Array<Stream<T>> = [];
public out: Stream<T> = null as any;
public ins: Stream<Stream<T>>;
private open: boolean;
private active: boolean;
private seq: Array<Stream<T>>;
public out: Stream<T>;

constructor(public ins: Stream<Stream<T>>) {
constructor(ins: Stream<Stream<T>>) {
this.ins = ins;
this.out = null as any;
this.open = true;
this.active = false;
this.seq = [];
}

_start(out: Stream<T>): void {
this.out = out;
this.open = true;
this.active = false;
this.seq = [];
this.ins._add(this);
}

Expand Down Expand Up @@ -73,7 +82,7 @@ export class FlattenSeqOperator<T> implements Operator<Stream<T>, T> {
const u = this.out;
if (!u) return;
this.open = false;
if (this.seq.length === 0) {
if (!this.active && this.seq.length === 0) {
u._c();
}
}
Expand Down
18 changes: 18 additions & 0 deletions tests/extra/flattenSequentially.ts
Original file line number Diff line number Diff line change
Expand Up @@ -133,5 +133,23 @@ describe('flattenSequentially (extra)', () => {
},
});
});

it('should emit data from inner streams after synchronous outer completes', (done) => {
const outer = xs.of(42);
const stream = outer.map(i => xs.periodic(50).take(2).mapTo(i))
.compose(flattenSequentially);
const expected = [42, 42];

stream.addListener({
next: (x: number) => {
assert.strictEqual(x, expected.shift());
},
error: (err: any) => done(err),
complete: () => {
assert.strictEqual(expected.length, 0);
done();
},
});
});
});
});

0 comments on commit fd31d49

Please sign in to comment.