Skip to content

Commit ef8ed44

Browse files
xtianjohnsstaltz
authored andcommitted
feat(flattenConcurrentlyAtMost): add new extra
Add flattenConcurrentlyAtMost(n) extra. flattenConcurrentlyAtMost is designed to provide consumer-configurable concurrency to flattening operations. Two flattening extras exist which allow consumers to flatten a meta stream with maximum concurrency, or with no concurrency. This new operator supports a concurrency limit, representing the maximum amount of _additional_ streams to connect to during flattening. Resolve #161.
1 parent 0c3a855 commit ef8ed44

File tree

2 files changed

+541
-0
lines changed

2 files changed

+541
-0
lines changed
+132
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
import {Operator, Stream, OutSender, InternalListener} from '../index';
2+
3+
class FCAMIL<T> implements InternalListener<T>, OutSender<T> {
4+
constructor(public out: Stream<T>,
5+
private op: FlattenConcAMOperator<T>) {
6+
}
7+
8+
_n(t: T) {
9+
this.out._n(t);
10+
}
11+
12+
_e(err: any) {
13+
this.out._e(err);
14+
}
15+
16+
_c() {
17+
this.op.less();
18+
}
19+
}
20+
21+
export class FlattenConcAMOperator<T> implements Operator<Stream<T>, T> {
22+
public type = 'flattenConcurrentlyAtMost';
23+
public out: Stream<T> = null as any;
24+
private _l: number = 0;
25+
private _d: boolean = false;
26+
private _seq: Array<Stream<T>> = [];
27+
28+
constructor(public n: number, public ins: Stream<Stream<T>>) {
29+
}
30+
31+
_start(out: Stream<T>): void {
32+
this.out = out;
33+
this.ins._add(this);
34+
}
35+
36+
_stop(): void {
37+
this.ins._remove(this);
38+
this._l = 0;
39+
this.out = null as any;
40+
this._seq = [];
41+
}
42+
43+
less(): void {
44+
const seq = this._seq;
45+
if (--this._l === 0 && seq.length === 0 && this._d) {
46+
const u = this.out;
47+
if (!u) return;
48+
u._c();
49+
}
50+
if (this._l < this.n && seq.length > 0) {
51+
this._n(seq.shift() as Stream<T>);
52+
}
53+
}
54+
55+
_n(s: Stream<T>) {
56+
const u = this.out;
57+
if (!u) return;
58+
if (this._l < this.n) {
59+
this._l++;
60+
s._add(new FCAMIL(u, this));
61+
} else {
62+
this._seq.push(s);
63+
}
64+
}
65+
66+
_e(err: any) {
67+
const u = this.out;
68+
if (!u) return;
69+
u._e(err);
70+
}
71+
72+
_c() {
73+
const seq = this._seq;
74+
this._d = true;
75+
if (this._l === 0 && seq.length === 0) {
76+
const u = this.out;
77+
if (!u) return;
78+
u._c();
79+
}
80+
}
81+
}
82+
83+
/**
84+
* Flattens a "stream of streams", handling multiple concurrent nested streams
85+
* simultaneously, up to some limit `n`.
86+
*
87+
* If the input stream is a stream that emits streams, then this operator will
88+
* return an output stream which is a flat stream: emits regular events. The
89+
* flattening happens concurrently, up to the configured limit. It works like
90+
* this: when the input stream emits a nested stream,
91+
* *flattenConcurrentlyAtMost* will start imitating that nested one. When the
92+
* next nested stream is emitted on the input stream,
93+
* *flattenConcurrentlyAtMost* will check to see how many streams it is connected
94+
* to. If it is connected to a number of streams less than the limit, it will also
95+
* imitate that new one, but will continue to imitate the previous nested streams
96+
* as well.
97+
*
98+
* If the limit has already been reached, *flattenConcurrentlyAtMost* will put the
99+
* stream in a queue. When any of the streams it is listening to completes, a stream
100+
* is taken out of the queue and `flattenConcurrentlyAtMost` will connect to it.
101+
*
102+
* This process continues until the metastream completes and there are no more
103+
* connected streams or streams in the queue.
104+
*
105+
* Marble diagrams:
106+
*
107+
* ```text
108+
* --+--------+---------------
109+
* \ \
110+
* \ ----1----2---3--|
111+
* --a--b----c----|
112+
* flattenConcurrentlyAtMost(1)
113+
* -----a--b----c-1----2---3--|
114+
* ```
115+
*
116+
* ```text
117+
* --+---+---+-|
118+
* \ \ \
119+
* \ \ ---fgh----i-----jh--|
120+
* \ -----1----2----3--|
121+
* ---a--b-----c--|
122+
* flattenConcurrentlyAtMost(2)
123+
* ---------a--b-1---c2--i-3------fgh----i-----jh--|
124+
* ```
125+
*
126+
* @return {Stream}
127+
*/
128+
export default function flattenConcurrentlyAtMost<T>(n: number): (ins: Stream<Stream<T>>) => Stream<T> {
129+
return function flattenConcAMOperator(ins: Stream<Stream<T>>) {
130+
return new Stream<T>(new FlattenConcAMOperator(n, ins));
131+
};
132+
}

0 commit comments

Comments
 (0)