From 60d574650b9a0d250dd3ced0d1487294f833a709 Mon Sep 17 00:00:00 2001
From: Daan Klarenbeek
Date: Sat, 15 Jul 2023 17:16:29 +0200
Subject: [PATCH] perf(ChunkUpload): improve upload and merge speeds

---
 apps/server/src/api/upload-chunk.ts |  14 +--
 apps/server/src/lib/ChunkUpload.ts  | 163 ++++++++++++----------
 2 files changed, 75 insertions(+), 102 deletions(-)

diff --git a/apps/server/src/api/upload-chunk.ts b/apps/server/src/api/upload-chunk.ts
index 37b88b0c..a0adcb76 100644
--- a/apps/server/src/api/upload-chunk.ts
+++ b/apps/server/src/api/upload-chunk.ts
@@ -2,7 +2,7 @@ import type { NextFunction, Request, Response } from "express";
 import { rm, stat, rename } from "node:fs/promises";
 import { join } from "node:path";
 import { Auth } from "../lib/Auth.js";
-import { chunkUpload, Utils } from "../lib/index.js";
+import { ChunkUpload, Utils } from "../lib/index.js";
 import type { DashboardRequest, Middleware, RequestMethods } from "../lib/types.js";
 import type Server from "../Server.js";
 import { extension } from "mime-types";
@@ -21,12 +21,12 @@ export default async function handler(server: Server, req: DashboardRequest, res
 	const maxSize = getSize(req.locals.domain.maxStorage === 0 ? 0 : req.locals.domain.maxStorage - req.locals.domain.storage);
 
 	try {
-		const assembleChunks = await chunkUpload(
-			req,
-			join(req.locals.domain.filesPath, "..", "tmp"),
-			maxSize ?? Infinity,
-			req.locals.domain.server.config.parseStorage("10 MB")
-		);
+		const assembleChunks = await ChunkUpload(req, {
+			tmpDir: join(req.locals.domain.filesPath, "..", "tmp"),
+			maxChunkSize: req.locals.domain.server.config.parseStorage("10 MB"),
+			maxFileSize: maxSize ?? Infinity
+		});
+
 		if (!assembleChunks) {
 			res.sendStatus(204);
 			return;
diff --git a/apps/server/src/lib/ChunkUpload.ts b/apps/server/src/lib/ChunkUpload.ts
index 342d035d..6ea91eee 100644
--- a/apps/server/src/lib/ChunkUpload.ts
+++ b/apps/server/src/lib/ChunkUpload.ts
@@ -1,40 +1,8 @@
-/**
-BSD 3-Clause License
-
-Copyright (c) 2018, Quentin Busuttil
-All rights reserved.
-
-Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions are met:
-
-* Redistributions of source code must retain the above copyright notice, this
-  list of conditions and the following disclaimer.
-
-* Redistributions in binary form must reproduce the above copyright notice,
-  this list of conditions and the following disclaimer in the documentation
-  and/or other materials provided with the distribution.
-
-* Neither the name of the copyright holder nor the names of its
-  contributors may be used to endorse or promote products derived from
-  this software without specific prior written permission.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
-DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
-FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
-SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
-CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
-OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-*/
-
-import type { Request } from "express";
 import Busboy from "busboy";
-import { createWriteStream, mkdir, readdir, rmdir, stat, unlink, WriteStream } from "node:fs";
-import { join } from "node:path";
+import type { Request } from "express";
+import { createWriteStream, mkdir, readdir, rmdir, stat, unlink, type WriteStream } from "node:fs";
 import { appendFile, readFile } from "node:fs/promises";
+import { join } from "node:path";
 import type internal from "node:stream";
 
 /**
@@ -68,6 +36,28 @@ function checkTotalSize(maxFileSize: number, maxChunkSize: number, totalChunks:
 	return true;
 }
 
+/**
+ * Append a single chunk to the file being assembled
+ * @param tmpDir – upload temp dir containing the assembled file
+ * @param dirPath – directory containing this upload's chunks
+ * @param chunk – 0-indexed number of the chunk to append
+ * @param fileId – id of the file being assembled
+ * @returns resolves once the chunk has been appended
+ */
+function assembleChunk(tmpDir: string, dirPath: string, chunk: number, fileId: string): Promise<void> {
+	const assembledFile = join(tmpDir, fileId);
+	return new Promise((resolve, reject) => {
+		const pipeChunk = () => {
+			readFile(join(dirPath, chunk.toString()))
+				.then((chunkData) => appendFile(assembledFile, chunkData))
+				.then(resolve)
+				.catch(reject);
+		};
+
+		pipeChunk();
+	});
+}
+
 /**
  * Delete tmp directory containing chunks
  * @param dirPath
@@ -84,41 +74,16 @@
 	});
 }
 
-/**
- * Take all chunks of a file and reassemble them in a unique file
- * @param tmpDir
- * @param dirPath
- * @param fileId
- * @param totalChunks
- * @param postParams – form post fields
- */
-function assembleChunks(
-	tmpDir: string,
-	dirPath: string,
-	fileId: string,
-	totalChunks: number,
-	postParams: Record<string, any>
-): () => Promise<{ filePath: string; postParams: Record<string, any> }> {
+function finishUpload(tmpDir: string, dirPath: string, fileId: string, postParams: Record<string, any>, chunk: number) {
 	const assembledFile = join(tmpDir, fileId);
-	let chunkCount = 0;
 
-	return () => {
-		return new Promise((resolve, reject) => {
-			const pipeChunk = () => {
-				readFile(join(dirPath, chunkCount.toString()))
-					.then((chunk) => appendFile(assembledFile, chunk))
-					.then(() => {
-						// 0 indexed files = length - 1, so increment before comparison
-						if (totalChunks > ++chunkCount) pipeChunk();
-						else {
-							cleanChunks(dirPath);
-							resolve({ filePath: assembledFile, postParams });
-						}
-					})
-					.catch(reject);
-			};
-
-			pipeChunk();
+	return new Promise<{ filePath: string; postParams: Record<string, any> }>((resolve, reject) => {
+		// append the final chunk, then remove the tmp chunk directory
+		assembleChunk(tmpDir, dirPath, chunk, fileId)
+			.then(() => {
+				cleanChunks(dirPath);
+				resolve({ filePath: assembledFile, postParams });
+			})
+			.catch(reject);
 	});
-	};
 }
@@ -151,26 +116,26 @@ function handleFile(tmpDir: string, headers: Request["headers"], fileStream: int
 	const totalChunks = +Number(headers["uploader-chunks-total"]);
 
 	let error: any;
-	let assembleChunksPromise: () => Promise<{ filePath: string; postParams: Record<string, any> }>;
+	let finishUploadFn: () => Promise<{ filePath: string; postParams: Record<string, any> }>;
 	let finished = false;
 	let writeStream: WriteStream;
 
 	const writeFile = () => {
 		writeStream = createWriteStream(chunkPath);
 
-		writeStream.on("error", (err) => {
-			error = err;
-			fileStream.resume();
-		});
-
-		writeStream.on("close", () => {
-			finished = true;
-
-			// if all is uploaded
-			if (chunkCount === totalChunks - 1) {
-				assembleChunksPromise = assembleChunks(tmpDir, dirPath, `${headers["uploader-file-id"]}`, totalChunks, postParams);
-			}
-		});
+		writeStream
+			.on("error", (err) => {
+				error = err;
+				fileStream.resume();
+			})
+			.on("close", () => {
+				finished = true;
+
+				if (chunkCount === totalChunks - 1) { // if all is uploaded
+					const merged = finishUpload(tmpDir, dirPath, `${headers["uploader-file-id"]}`, postParams, chunkCount);
+					finishUploadFn = () => merged; // expose the eagerly started merge as a thunk to keep the () => Promise contract
+				} else assembleChunk(tmpDir, dirPath, chunkCount, `${headers["uploader-file-id"]}`).catch((err) => (error = err));
+			});
 
 		fileStream.pipe(writeStream);
 	};
@@ -202,11 +167,11 @@
 	}
 
 	return (callback: (...props: any) => void) => {
-		if (finished && !error) callback(null, assembleChunksPromise);
+		if (finished && !error) callback(null, finishUploadFn);
 		else if (error) callback(error);
 		else {
 			writeStream.on("error", callback);
-			writeStream.on("close", () => callback(null, assembleChunksPromise));
+			writeStream.on("close", () => callback(null, finishUploadFn));
 		}
 	};
 }
@@ -214,14 +179,11 @@
 /**
  * Master function. Parse form and call child ƒs for writing and assembling
  * @param req – nodejs req object
- * @param tmpDir – upload temp dir
- * @param maxChunkSize
+ * @param options The chunk upload options
  */
-export function chunkUpload(
+export function ChunkUpload(
 	req: Request,
-	tmpDir: string,
-	maxFileSize: number,
-	maxChunkSize: number
+	options: ChunkUploadOptions
 ): Promise<() => Promise<{ filePath: string; postParams: Record<string, any> }>> {
 	return new Promise((resolve, reject) => {
 		if (!checkHeaders(req.headers)) {
@@ -229,7 +191,7 @@
 			return;
 		}
 
-		if (!checkTotalSize(maxFileSize, maxChunkSize, Number(req.headers["uploader-chunks-total"]))) {
+		if (!checkTotalSize(options.maxFileSize, options.maxChunkSize, Number(req.headers["uploader-chunks-total"]))) {
 			reject(new Error("File is above size limit"));
 			return;
 		}
@@ -239,7 +201,7 @@
 		let limitReached = false;
 		let getFileStatus: (...props: any) => void;
 
-		const busboy = Busboy({ headers: req.headers, limits: { files: 1, fileSize: maxChunkSize * 1000 * 1000 } });
+		const busboy = Busboy({ headers: req.headers, limits: { files: 1, fileSize: options.maxChunkSize * 1e3 * 1e3 } });
 
 		busboy.on("file", (_, fileStream) => {
 			fileStream.on("limit", () => {
 				limitReached = true;
 				fileStream.resume();
 			});
 
-			getFileStatus = handleFile(tmpDir, req.headers, fileStream, postParams);
+			getFileStatus = handleFile(options.tmpDir, req.headers, fileStream, postParams);
 		});
 
 		busboy.on("field", (key, val) => {
@@ -260,9 +222,9 @@
 				return;
 			}
 
-			getFileStatus((fileErr: any, assembleChunksF: () => Promise<{ filePath: string; postParams: Record<string, any> }>) => {
+			getFileStatus((fileErr: any, finishUploadFn: () => Promise<{ filePath: string; postParams: Record<string, any> }>) => {
 				if (fileErr) reject(fileErr);
-				else resolve(assembleChunksF);
+				else resolve(finishUploadFn);
 			});
 		});
 
@@ -272,3 +234,14 @@
 		}
 	});
 }
+
+export interface ChunkUploadOptions {
+	/** The temporary data directory */
+	tmpDir: string;
+
+	/** The max file size per upload */
+	maxFileSize: number;
+
+	/** The max size per chunk */
+	maxChunkSize: number;
+}
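
---

Usage sketch (not part of the patch): the reworked entry point takes an options object and, once busboy has consumed the current chunk request, resolves with a finish function — undefined for intermediate chunks. The Express wiring, route path, tmp location, and error handling below are illustrative assumptions; only `ChunkUpload`, `ChunkUploadOptions`, and the resolved `() => Promise<{ filePath, postParams }>` come from the code above. Each request is also expected to carry the uploader-* headers the library validates (e.g. uploader-file-id, uploader-chunks-total).

import express from "express";
import { join } from "node:path";
import { ChunkUpload } from "./lib/index.js"; // same export used by upload-chunk.ts

const app = express();

app.post("/api/upload-chunk", async (req, res) => {
	try {
		const finishUpload = await ChunkUpload(req, {
			tmpDir: join(process.cwd(), "tmp"), // illustrative location
			maxChunkSize: 10, // assumed to be MB: ChunkUpload converts to bytes via * 1e3 * 1e3
			maxFileSize: Infinity // no total-size cap in this sketch
		});

		// Intermediate chunks resolve with nothing; only the final chunk
		// yields the finish function, mirroring the 204 check in the handler above.
		if (!finishUpload) {
			res.sendStatus(204);
			return;
		}

		// Await the eagerly started merge plus the chunk-dir cleanup.
		const { filePath, postParams } = await finishUpload();
		res.json({ filePath, postParams });
	} catch (err) {
		res.status(500).json({ message: (err as Error).message });
	}
});

app.listen(3000);

Because each non-final chunk is appended as soon as its write stream closes, the final request only has to merge one chunk instead of re-reading all of them — that is where the upload and merge speedup in this commit comes from.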