-
-
Notifications
You must be signed in to change notification settings - Fork 5
/
index.js
142 lines (120 loc) · 3.81 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
import {Duplex as DuplexStream} from 'node:stream';
import {concatUint8Arrays, isUint8Array} from 'uint8array-extras';
const stop = Symbol('FirstChunkStream.stop');
export default class FirstChunkStream extends DuplexStream {
constructor(options, callback) {
const state = {
isSent: false,
chunks: [],
size: 0,
};
if (typeof options !== 'object' || options === null) {
throw new TypeError('FirstChunkStream constructor requires `options` to be an object.');
}
if (typeof callback !== 'function') {
throw new TypeError('FirstChunkStream constructor requires a callback as its second argument.');
}
if (typeof options.chunkSize !== 'number') {
throw new TypeError('FirstChunkStream constructor requires `options.chunkSize` to be a number.');
}
if (options.objectMode) {
throw new Error('FirstChunkStream doesn\'t support `objectMode` yet.');
}
super(options);
// Initialize the internal state
state.manager = createReadStreamBackpressureManager(this);
const processCallback = (buffer, encoding, done) => {
state.isSent = true;
(async () => {
let result;
try {
result = await callback(buffer, encoding);
} catch (error) {
setImmediate(() => {
this.emit('error', error);
done();
});
return;
}
if (result === stop) {
state.manager.programPush(null, undefined, done);
} else if (isUint8Array(result) || (typeof result === 'string')) {
state.manager.programPush(result, undefined, done);
} else {
state.manager.programPush(result.buffer, result.encoding, done);
}
})();
};
// Writes management
this._write = (chunk, encoding, done) => {
state.encoding = encoding;
if (state.isSent) {
state.manager.programPush(chunk, state.encoding, done);
} else if (chunk.length < options.chunkSize - state.size) {
state.chunks.push(chunk);
state.size += chunk.length;
done();
} else {
state.chunks.push(chunk.slice(0, options.chunkSize - state.size));
chunk = chunk.slice(options.chunkSize - state.size);
state.size += state.chunks.at(-1).length;
processCallback(concatUint8Arrays(state.chunks, state.size), state.encoding, () => {
if (chunk.length === 0) {
done();
return;
}
state.manager.programPush(chunk, state.encoding, done);
});
}
};
this.on('finish', () => {
if (!state.isSent) {
return processCallback(concatUint8Arrays(state.chunks, state.size), state.encoding, () => {
state.manager.programPush(null, state.encoding);
});
}
state.manager.programPush(null, state.encoding);
});
}
}
// Utils to manage readable stream backpressure
function createReadStreamBackpressureManager(readableStream) {
const manager = {
waitPush: true,
programmedPushs: [],
programPush(chunk, encoding, isDone = () => {}) {
// Store the current write
manager.programmedPushs.push([chunk, encoding, isDone]);
// Need to be async to avoid nested push attempts
// Programm a push attempt
setImmediate(manager.attemptPush);
// Let's say we're ready for a read
readableStream.emit('readable');
readableStream.emit('drain');
},
attemptPush() {
let nextPush;
if (manager.waitPush) {
if (manager.programmedPushs.length > 0) {
nextPush = manager.programmedPushs.shift();
manager.waitPush = readableStream.push(nextPush[0], nextPush[1]);
(nextPush[2])();
}
} else {
setImmediate(() => {
// Need to be async to avoid nested push attempts
readableStream.emit('readable');
});
}
},
};
function streamFilterRestoreRead() {
manager.waitPush = true;
// Need to be async to avoid nested push attempts
setImmediate(manager.attemptPush);
}
// Patch the readable stream to manage reads
readableStream._read = streamFilterRestoreRead;
return manager;
}
FirstChunkStream.stop = stop;