-
-
Notifications
You must be signed in to change notification settings - Fork 137
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix(extra): move flattenConcurrently from core to extra
Make flattenConcurrently() an extra operator because it is not that often used as flatten() is. Usage is: `import flattenConcurrently from 'xstream/extra/flattenConcurrently'` and then `streamOfStreams.compose(flattenConcurrently)`. BREAKING CHANGE: flattenConcurrently must be separately imported as an extra operator and used with .compose()
- Loading branch information
Showing
5 changed files
with
109 additions
and
203 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,93 @@ | ||
import {Operator, Stream, InternalListener} from '../core'; | ||
|
||
class FCIL<T> implements InternalListener<T> { | ||
constructor(private out: Stream<T>, | ||
private op: FlattenConcOperator<T>) { | ||
} | ||
|
||
_n(t: T) { | ||
this.out._n(t); | ||
} | ||
|
||
_e(err: any) { | ||
this.out._e(err); | ||
} | ||
|
||
_c() { | ||
this.op.less(); | ||
} | ||
} | ||
|
||
export class FlattenConcOperator<T> implements Operator<Stream<T>, T> { | ||
public type = 'flattenConcurrently'; | ||
private active: number = 1; // number of outers and inners that have not yet ended | ||
private out: Stream<T> = null; | ||
|
||
constructor(public ins: Stream<Stream<T>>) { | ||
} | ||
|
||
_start(out: Stream<T>): void { | ||
this.out = out; | ||
this.ins._add(this); | ||
} | ||
|
||
_stop(): void { | ||
this.ins._remove(this); | ||
this.active = 1; | ||
this.out = null; | ||
} | ||
|
||
less(): void { | ||
if (--this.active === 0) { | ||
const u = this.out; | ||
if (!u) return; | ||
u._c(); | ||
} | ||
} | ||
|
||
_n(s: Stream<T>) { | ||
const u = this.out; | ||
if (!u) return; | ||
this.active++; | ||
s._add(new FCIL(u, this)); | ||
} | ||
|
||
_e(err: any) { | ||
const u = this.out; | ||
if (!u) return; | ||
u._e(err); | ||
} | ||
|
||
_c() { | ||
this.less(); | ||
} | ||
} | ||
|
||
/** | ||
* Flattens a "stream of streams", handling multiple concurrent nested streams | ||
* simultaneously. | ||
* | ||
* If the input stream is a stream that emits streams, then this operator will | ||
* return an output stream which is a flat stream: emits regular events. The | ||
* flattening happens concurrently. It works like this: when the input stream | ||
* emits a nested stream, *flattenConcurrently* will start imitating that | ||
* nested one. When the next nested stream is emitted on the input stream, | ||
* *flattenConcurrently* will also imitate that new one, but will continue to | ||
* imitate the previous nested streams as well. | ||
* | ||
* Marble diagram: | ||
* | ||
* ```text | ||
* --+--------+--------------- | ||
* \ \ | ||
* \ ----1----2---3-- | ||
* --a--b----c----d-------- | ||
* flattenConcurrently | ||
* -----a--b----c-1--d-2---3-- | ||
* ``` | ||
* | ||
* @return {Stream} | ||
*/ | ||
export default function flattenConcurrently<T>(ins: Stream<Stream<T>>): Stream<T> { | ||
return new Stream<T>(new FlattenConcOperator(ins)); | ||
} |
Oops, something went wrong.