// parse-remote.ts
import { Semaphore } from '@php-wasm/util';
import { filterStream } from '../utils/filter-stream';
import { concatUint8Array } from '../utils/concat-uint8-array';
import { collectBytes } from '../utils/collect-bytes';
import {
readCentralDirectoryEntry,
readFileEntry,
unzipFiles,
} from './parse-stream';
import { CentralDirectoryEntry, FileEntry } from './common';
import { SIGNATURE_CENTRAL_DIRECTORY_END } from './common';
import { IterableReadableStream } from './iterable-stream-polyfill';
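// The end-of-central-directory record is at most 65,557 bytes from
// the end of the file (22 fixed bytes plus a zip comment capped at
// 65,535 bytes), so a 110 KB scan chunk is enough to contain it.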
const CENTRAL_DIRECTORY_END_SCAN_CHUNK_SIZE = 110 * 1024;
const BATCH_DOWNLOAD_OF_FILES_IF_CLOSER_THAN = 10 * 1024;
const PREFER_RANGES_IF_FILE_LARGER_THAN = 1024 * 1024 * 1;
const fetchSemaphore = new Semaphore({ concurrency: 10 });
const DEFAULT_PREDICATE = () => true;
/**
* Streams the contents of a remote zip file.
*
* If the zip is large and the predicate is filtering the zip contents,
* only the matching files will be downloaded using the Range header
* (if supported by the server).
*
* @param url The URL of the zip file.
* @param predicate Optional. A function that returns true if the file should be downloaded.
* @returns A stream of zip entries.
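 *
 * @example
 * A hypothetical usage sketch (the URL and the `.php` filter are
 * made up; the returned stream is async-iterable via the
 * iterable-stream polyfill imported above):
 * ```ts
 * const files = await unzipFilesRemote(
 *     'https://example.com/plugin.zip',
 *     (entry) => entry.path.endsWith('.php')
 * );
 * for await (const file of files) {
 *     console.log(file.path);
 * }
 * ```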
*/
export async function unzipFilesRemote(
url: string,
predicate: (
dirEntry: CentralDirectoryEntry | FileEntry
) => boolean = DEFAULT_PREDICATE
) {
if (predicate === DEFAULT_PREDICATE) {
// If we're not filtering the zip contents, let's just
// grab the entire zip.
const response = await fetch(url);
return unzipFiles(response.body!);
}
const contentLength = await fetchContentLength(url);
if (contentLength <= PREFER_RANGES_IF_FILE_LARGER_THAN) {
// If the zip is small enough, let's just grab it.
const response = await fetch(url);
return unzipFiles(response.body!);
}
	// Confirm the server supports Range requests
	// by fetching a single byte:
const response = await fetch(url, {
headers: {
// 0-0 looks weird, doesn't it?
// The Range header is inclusive so it's actually
// a valid header asking for the first byte.
Range: 'bytes=0-0',
'Accept-Encoding': 'none',
},
});
// Fork the stream so that we can reuse it in case
// the Range header is unsupported and we're now streaming
// the entire file
const [peekStream, responseStream] = response.body!.tee();
// Read from the forked stream and close it.
const peekReader = peekStream.getReader();
const { value: peekBytes } = await peekReader.read();
const { done: peekDone } = await peekReader.read();
peekReader.releaseLock();
peekStream.cancel();
// Confirm our Range query worked as intended:
const rangesSupported = peekBytes?.length === 1 && peekDone;
if (!rangesSupported) {
// Uh-oh, we're actually streaming the entire file.
// Let's reuse the forked stream as our response stream.
return unzipFiles(responseStream);
}
// We're good, let's clean up the other branch of the response stream.
responseStream.cancel();
const source = await createFetchSource(url, contentLength);
return streamCentralDirectoryEntries(source)
.pipeThrough(filterStream(predicate))
.pipeThrough(partitionNearbyEntries())
.pipeThrough(
fetchPartitionedEntries(source)
) as IterableReadableStream<FileEntry>;
}
/**
 * Streams the central directory entries of a zip file.
 *
 * @param source The bytes source to read the central directory from.
 * @returns A stream of central directory entries.
 */
function streamCentralDirectoryEntries(source: BytesSource) {
let centralDirectoryStream: ReadableStream<Uint8Array>;
return new ReadableStream<CentralDirectoryEntry>({
async start() {
centralDirectoryStream = await streamCentralDirectoryBytes(source);
},
async pull(controller) {
const entry = await readCentralDirectoryEntry(
centralDirectoryStream
);
if (!entry) {
controller.close();
return;
}
controller.enqueue(entry);
},
});
}
/**
 * Streams the central directory bytes of a zip file.
 *
 * Scans the file backwards from the end in chunks to locate the
 * end-of-central-directory record, then streams the bytes of the
 * central directory itself.
 *
 * @param source The bytes source to read from.
 * @returns A stream over the central directory bytes.
 */
async function streamCentralDirectoryBytes(source: BytesSource) {
const chunkSize = CENTRAL_DIRECTORY_END_SCAN_CHUNK_SIZE;
let centralDirectory: Uint8Array = new Uint8Array();
let chunkStart = source.length;
do {
chunkStart = Math.max(0, chunkStart - chunkSize);
const chunkEnd = Math.min(
chunkStart + chunkSize - 1,
source.length - 1
);
const bytes = await collectBytes(
await source.streamBytes(chunkStart, chunkEnd)
);
centralDirectory = concatUint8Array(bytes!, centralDirectory);
		// Scan this chunk for the end-of-central-directory signature.
		// Construct the DataView with an explicit offset and length in
		// case `bytes` is a view into a larger buffer.
		const view = new DataView(
			bytes!.buffer,
			bytes!.byteOffset,
			bytes!.byteLength
		);
for (let i = view.byteLength - 4; i >= 0; i--) {
if (view.getUint32(i, true) !== SIGNATURE_CENTRAL_DIRECTORY_END) {
continue;
}
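			// End-of-central-directory record layout (per the zip
			// spec): signature (4 bytes), disk numbers (2 + 2 bytes),
			// entry counts (2 + 2 bytes), then the central directory
			// length at offset 12 and its start offset at offset 16.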
			// Confirm this scan chunk has enough data to read the
			// offset and the length of the central directory.
			const centralDirectoryLengthAt = i + 12;
			const centralDirectoryOffsetAt = centralDirectoryLengthAt + 4;
			if (view.byteLength < centralDirectoryOffsetAt + 4) {
				throw new Error('Central directory not found');
			}
// Read where the central directory starts
const dirStart = view.getUint32(centralDirectoryOffsetAt, true);
if (dirStart < chunkStart) {
// We're missing some bytes, let's grab them
const missingBytes = await collectBytes(
await source.streamBytes(dirStart, chunkStart - 1)
);
centralDirectory = concatUint8Array(
missingBytes!,
centralDirectory
);
} else if (dirStart > chunkStart) {
// We've read too many bytes, let's trim them
centralDirectory = centralDirectory.slice(
dirStart - chunkStart
);
}
return new Blob([centralDirectory]).stream();
}
		// Stop after scanning the chunk that starts at byte 0;
		// looping on `chunkStart >= 0` would refetch it forever.
	} while (chunkStart > 0);
throw new Error('Central directory not found');
}
/**
 * Partitions files that are no further apart in the zip
 * archive than BATCH_DOWNLOAD_OF_FILES_IF_CLOSER_THAN bytes,
 * so that each partition can be fetched as a single byte range.
 * This may download some extra files that live in the gaps
 * between the requested files within a partition.
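 *
 * For instance (hypothetical offsets): entries spanning bytes
 * 0-100, 5,000-6,000, and 1,000,000-1,100,000 yield two
 * partitions. The first two entries are under 10 KB apart and
 * are fetched as one range; the third starts a new partition.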
*/
function partitionNearbyEntries() {
let lastFileEndsAt = 0;
let currentChunk: CentralDirectoryEntry[] = [];
return new TransformStream<CentralDirectoryEntry, CentralDirectoryEntry[]>({
transform(zipEntry, controller) {
// Byte distance too large, flush and start a new chunk
if (
zipEntry.firstByteAt >
lastFileEndsAt + BATCH_DOWNLOAD_OF_FILES_IF_CLOSER_THAN
) {
controller.enqueue(currentChunk);
currentChunk = [];
}
lastFileEndsAt = zipEntry.lastByteAt;
currentChunk.push(zipEntry);
},
flush(controller) {
controller.enqueue(currentChunk);
},
});
}
/**
 * Fetches partitions of files from the zip archive.
 *
 * If any extra files are present in the received
 * byte stream, they are filtered out.
*/
function fetchPartitionedEntries(
source: BytesSource
): ReadableWritablePair<FileEntry, CentralDirectoryEntry[]> {
/**
* This function implements a ReadableStream and a WritableStream
* instead of a TransformStream. This is intentional.
*
* In TransformStream, the `transform` function may return a
* promise. The next call to `transform` will be delayed until
* the promise resolves. This is a problem for us because we
* want to issue many fetch() requests in parallel.
*
* The only way to do that seems to be creating separate ReadableStream
* and WritableStream implementations.
*/
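	//
	// A minimal sketch of the issue (with a hypothetical
	// fetchChunk() helper):
	//
	//     new TransformStream({
	//         async transform(entries, controller) {
	//             // Returning a promise here would make the next
	//             // transform() call wait for this fetch:
	//             controller.enqueue(await fetchChunk(entries));
	//         },
	//     });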
let isWritableClosed = false;
let requestsInProgress = 0;
let readableController: ReadableStreamDefaultController<FileEntry>;
const byteStreams: Array<
[CentralDirectoryEntry[], ReadableStream<Uint8Array>]
> = [];
/**
* Receives chunks of CentralDirectoryEntries, and fetches
* the corresponding byte ranges from the remote zip file.
*/
const writable = new WritableStream<CentralDirectoryEntry[]>({
write(zipEntries, controller) {
if (!zipEntries.length) {
return;
}
++requestsInProgress;
// If the write() method returns a promise, the next
// call will be delayed until the promise resolves.
// Let's not return the promise, then.
// This will effectively issue many requests in parallel.
requestChunkRange(source, zipEntries)
.then((byteStream) => {
byteStreams.push([zipEntries, byteStream]);
})
.catch((e) => {
controller.error(e);
})
.finally(() => {
--requestsInProgress;
});
},
abort() {
isWritableClosed = true;
readableController.close();
},
async close() {
isWritableClosed = true;
},
});
/**
* Decodes zipped bytes into FileEntry objects.
*/
const readable = new ReadableStream<FileEntry>({
start(controller) {
readableController = controller;
},
async pull(controller) {
while (true) {
const allChunksProcessed =
isWritableClosed &&
!byteStreams.length &&
requestsInProgress === 0;
if (allChunksProcessed) {
controller.close();
return;
}
				// There are no bytes available yet, but the writable
				// stream is still open or there are requests still in
				// progress. Let's wait for more bytes.
const waitingForMoreBytes = !byteStreams.length;
if (waitingForMoreBytes) {
await new Promise((resolve) => setTimeout(resolve, 50));
continue;
}
const [requestedPaths, stream] = byteStreams[0];
const file = await readFileEntry(stream);
// The stream is exhausted, let's remove it from the queue
// and try the next one.
const streamExhausted = !file;
if (streamExhausted) {
byteStreams.shift();
continue;
}
// There may be some extra files between the ones we're
// interested in. Let's filter out any files that got
// intertwined in the byte stream.
				const isOneOfRequestedPaths = requestedPaths.some(
					(entry) => entry.path === file.path
				);
if (!isOneOfRequestedPaths) {
continue;
}
// Finally! We've got a file we're interested in.
controller.enqueue(file);
break;
}
},
});
return {
readable,
writable,
};
}
/**
 * Requests a partition of bytes from the bytes source, spanning
 * from the first byte of the first entry to the last byte of the
 * last entry.
 *
 * @param source The bytes source to read from.
 * @param zipEntries The central directory entries to fetch.
 */
async function requestChunkRange(
source: BytesSource,
zipEntries: CentralDirectoryEntry[]
) {
const release = await fetchSemaphore.acquire();
try {
const lastZipEntry = zipEntries[zipEntries.length - 1];
const substream = await source.streamBytes(
zipEntries[0].firstByteAt,
lastZipEntry.lastByteAt
);
return substream;
} finally {
release();
}
}
/**
* Fetches the Content-Length header from a remote URL.
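 *
 * Note: some servers omit the Content-Length header (for example,
 * when using chunked transfer encoding), in which case this
 * function throws.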
*/
async function fetchContentLength(url: string) {
	const response = await fetch(url, { method: 'HEAD' });
	const contentLength = response.headers.get('Content-Length');
	if (!contentLength) {
		throw new Error('Content-Length header is missing');
	}
	return parseInt(contentLength, 10);
}
/**
 * Private and experimental API: Range-based data sources.
 *
 * The idea is that if we can read arbitrary byte ranges from
 * a file, we can retrieve a specific subset of a zip file.
 * Both `start` and `end` are inclusive byte offsets.
 */
type BytesSource = {
length: number;
streamBytes: (
start: number,
end: number
) => Promise<ReadableStream<Uint8Array>>;
};
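// A minimal alternative BytesSource sketch (hypothetical, for
// illustration only): the same interface backed by an in-memory
// Uint8Array instead of HTTP Range requests. Note the inclusive
// `end` offset, hence `end + 1` in slice():
//
//     const arraySource = (bytes: Uint8Array): BytesSource => ({
//         length: bytes.length,
//         streamBytes: async (start, end) =>
//             new Blob([bytes.slice(start, end + 1)]).stream(),
//     });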
/**
* Creates a BytesSource enabling fetching ranges of bytes
* from a remote URL.
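 *
 * @example
 * A hypothetical read of the first 100 bytes of a remote file
 * (both offsets are inclusive):
 * ```ts
 * const source = await createFetchSource('https://example.com/a.zip');
 * const firstHundredBytes = await source.streamBytes(0, 99);
 * ```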
*/
async function createFetchSource(
url: string,
contentLength?: number
): Promise<BytesSource> {
if (contentLength === undefined) {
contentLength = await fetchContentLength(url);
}
return {
length: contentLength,
		streamBytes: async (from: number, to: number) =>
			await fetch(url, {
				headers: {
					// The Range header is inclusive on both ends, so
					// the inclusive `from` and `to` offsets map onto
					// it directly. All call sites above pass inclusive
					// end offsets (chunkEnd, chunkStart - 1, lastByteAt).
					Range: `bytes=${from}-${to}`,
					'Accept-Encoding': 'none',
				},
			}).then((response) => response.body!),
};
}