-
Notifications
You must be signed in to change notification settings - Fork 2.7k
/
nodeStream.ts
95 lines (85 loc) · 2.54 KB
/
nodeStream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/**
* Original source:
* https://github.com/kmalakoff/response-iterator/blob/master/src/iterators/nodeStream.ts
*/
import type { Readable as NodeReadableStream } from "stream";
import { canUseAsyncIteratorSymbol } from "../../../utilities/index.js";
/**
 * Shape of the iterator object assembled by `nodeStreamIterator`.
 *
 * It is deliberately narrower than `AsyncIterableIterator<T>`: the
 * `Symbol.asyncIterator` member is optional so the object can be created
 * first and the symbol-keyed method attached afterwards, only when
 * `canUseAsyncIteratorSymbol` is truthy.
 */
interface NodeStreamIterator<T> {
/** Resolves with the next chunk, or with a `done: true` result. */
next(): Promise<IteratorResult<T, boolean | undefined>>;
/** Optional self-returning hook that makes the object usable with `for await`. */
[Symbol.asyncIterator]?(): AsyncIterator<T>;
}
/**
 * Adapts a Node.js readable stream to an async iterator.
 *
 * Chunks emitted through `"data"` are handed to the oldest pending `next()`
 * call, or buffered when nobody is waiting. The iterator finishes when the
 * stream emits `"end"` or `"close"`, and every pending and subsequent
 * `next()` rejects once the stream has emitted `"error"`.
 *
 * The Writable-side `"finish"` event is intentionally NOT treated as
 * completion: on a Duplex/Transform stream it only says the writable half is
 * done, and the readable half may still deliver data afterwards.
 *
 * NOTE(review): the iterator exposes no `return()`, so a consumer that stops
 * iterating early leaves the listeners attached until the stream ends.
 *
 * @param stream - Readable stream to consume; attaching the `"data"` listener
 *   is what starts the flow of chunks.
 * @returns An iterator whose `next()` yields each chunk cast to `T`. The
 *   `Symbol.asyncIterator` method is only present when
 *   `canUseAsyncIteratorSymbol` is truthy.
 */
export default function nodeStreamIterator<T>(
  stream: NodeReadableStream
): AsyncIterableIterator<T> {
  type Result = IteratorResult<T, boolean | undefined>;
  // [resolve, reject] pair of a `next()` call that is still pending.
  type Waiter = [
    (value: Result | PromiseLike<Result>) => void,
    (reason?: unknown) => void,
  ];

  let cleanup: (() => void) | null = null;
  let error: Error | null = null;
  let done = false;
  // Chunks received while no `next()` call was pending.
  const data: unknown[] = [];
  // Pending `next()` calls, oldest first.
  const waiting: Waiter[] = [];

  function onData(chunk: unknown): void {
    if (error) return;
    const waiter = waiting.shift();
    if (waiter) {
      waiter[0]({ value: chunk as T, done: false });
      return;
    }
    data.push(chunk);
  }

  function onError(err: Error): void {
    error = err;
    // splice(0) empties the queue so settled callbacks are not retained.
    waiting.splice(0).forEach(function (pair) {
      pair[1](err);
    });
    if (cleanup) cleanup();
  }

  function onEnd(): void {
    done = true;
    waiting.splice(0).forEach(function (pair) {
      pair[0]({ value: undefined, done: true });
    });
    if (cleanup) cleanup();
  }

  cleanup = function () {
    // Null out first so a second terminal event cannot re-run the teardown.
    cleanup = null;
    stream.removeListener("data", onData);
    stream.removeListener("error", onError);
    stream.removeListener("end", onEnd);
    stream.removeListener("close", onEnd);
  };

  stream.on("data", onData);
  stream.on("error", onError);
  stream.on("end", onEnd);
  // "close" covers streams that are destroyed without ever emitting "end".
  stream.on("close", onEnd);

  function getNext(): Promise<Result> {
    return new Promise(function (resolve, reject) {
      // Order matters: a recorded error wins over buffered chunks, and
      // buffered chunks are drained before completion is reported.
      if (error) return reject(error);
      if (data.length)
        return resolve({ value: data.shift() as T, done: false });
      if (done) return resolve({ value: undefined, done: true });
      waiting.push([resolve, reject]);
    });
  }

  const iterator: NodeStreamIterator<T> = {
    next(): Promise<Result> {
      return getNext();
    },
  };

  if (canUseAsyncIteratorSymbol) {
    iterator[Symbol.asyncIterator] = function (): AsyncIterator<T> {
      return this;
    };
  }

  return iterator as AsyncIterableIterator<T>;
}