From f96815e452548ca8faceca1859ff5cb6b750a124 Mon Sep 17 00:00:00 2001 From: Wheat Carrier Date: Sat, 17 Feb 2024 08:07:48 +0800 Subject: [PATCH] Fixed a bug that caused the process crashed when fails to upload --- src/api/client/message-api/file-uploader.ts | 42 +++++++++++++++++---- src/errors/base.ts | 6 +++ src/errors/error-codes.ts | 3 +- src/errors/telegram.ts | 10 +++++ src/server/webdav/tgfs-filesystem.ts | 35 +++++++++++------ src/utils/logger.ts | 6 ++- 6 files changed, 80 insertions(+), 22 deletions(-) create mode 100644 src/errors/telegram.ts diff --git a/src/api/client/message-api/file-uploader.ts b/src/api/client/message-api/file-uploader.ts index aa06ab8..b8cba5d 100644 --- a/src/api/client/message-api/file-uploader.ts +++ b/src/api/client/message-api/file-uploader.ts @@ -1,13 +1,16 @@ import fs from 'fs'; import { Readable } from 'stream'; +import { RPCError } from 'telegram/errors'; + import bigInt from 'big-integer'; import path from 'path'; import { ITDLibClient, TDLibApi } from 'src/api/interface'; import { SendMessageResp, UploadedFile } from 'src/api/types'; import { Queue, generateFileId, getAppropriatedPartSize } from 'src/api/utils'; -import { TechnicalError } from 'src/errors/base'; +import { AggregatedError, TechnicalError } from 'src/errors/base'; +import { FileTooBig } from 'src/errors/telegram'; import { Logger } from 'src/utils/logger'; import { @@ -28,6 +31,8 @@ export abstract class FileUploader { private partCnt: number = 0; private uploaded: number = 0; + private _errors: { [key: number]: Error } = {}; + constructor( protected readonly client: ITDLibClient, protected readonly fileSize: number, @@ -60,6 +65,10 @@ export abstract class FileUploader { const chunk = await this.read(chunkLength); + if (chunk === null) { + return 0; + } + this.uploaded += chunkLength; this.partCnt += 1; @@ -73,14 +82,21 @@ export abstract class FileUploader { bytes: chunk, }); if (!rsp.success) { - console.log(rsp); throw new TechnicalError( `File chunk ${this.partCnt} of ${this.fileName} failed to upload`, ); } return chunkLength; } catch (err) { - Logger.error(`error encountered ${workerId} ${err} ${retry}`); + if (err instanceof RPCError) { + if (err.errorMessage === 'FILE_PARTS_INVALID') { + throw new FileTooBig(this.fileSize); + } + } + + Logger.error( + `error encountered in uploading worker ${workerId}: ${err} retries left: ${retry}`, + ); retry -= 1; if (retry === 0) { @@ -102,11 +118,15 @@ export abstract class FileUploader { if (isBig(this.fileSize)) { const createWorker = async (workerId: number): Promise => { - while (!this.done()) { - await this.uploadNextPart(workerId); - if (callback) { - callback(this.uploaded, this.fileSize); + try { + while (!this.done()) { + await this.uploadNextPart(workerId); + if (callback) { + callback(this.uploaded, this.fileSize); + } } + } catch (err) { + this._errors[workerId] = err; } }; @@ -132,10 +152,18 @@ export abstract class FileUploader { return this.uploaded >= this.fileSize; } + public get errors(): Array { + return Object.values(this._errors); + } + public async send( chatId: number, caption?: string, ): Promise { + if (Object.keys(this._errors).length > 0) { + throw new AggregatedError(this.errors); + } + const req = { chatId, file: this.getUploadedFile(), diff --git a/src/errors/base.ts b/src/errors/base.ts index dff3bdd..431b770 100644 --- a/src/errors/base.ts +++ b/src/errors/base.ts @@ -18,3 +18,9 @@ export class BusinessError extends TechnicalError { super(message, cause); } } + +export class AggregatedError extends Error { + constructor(public readonly errors: Error[]) { + super(errors.map((e) => e.message).join('\n')); + } +} diff --git a/src/errors/error-codes.ts b/src/errors/error-codes.ts index e387d81..27dd9a7 100644 --- a/src/errors/error-codes.ts +++ b/src/errors/error-codes.ts @@ -6,4 +6,5 @@ export type ErrorCodes = | 'INVALID_NAME' | 'RELATIVE_PATH' | 'UNKNOWN_COMMAND' - | 'DIR_IS_NOT_EMPTY'; + | 'DIR_IS_NOT_EMPTY' + | 'FILE_TOO_BIG'; diff --git a/src/errors/telegram.ts b/src/errors/telegram.ts new file mode 100644 index 0000000..66a1d04 --- /dev/null +++ b/src/errors/telegram.ts @@ -0,0 +1,10 @@ +import { BusinessError } from './base'; + +export abstract class TelegramError extends BusinessError {} + +export class FileTooBig extends TelegramError { + constructor(size: number) { + const message = `File size ${size} exceeds Telegram limit`; + super(message, 'FILE_TOO_BIG', message); + } +} diff --git a/src/server/webdav/tgfs-filesystem.ts b/src/server/webdav/tgfs-filesystem.ts index fb6f356..922bdc5 100644 --- a/src/server/webdav/tgfs-filesystem.ts +++ b/src/server/webdav/tgfs-filesystem.ts @@ -240,19 +240,30 @@ export class TGFSFileSystem extends FileSystem { ctx: OpenWriteStreamInfo, callback: ReturnCallback, ): void { - try { - const tgClient = this.tgClient; - const { estimatedSize } = ctx; + (async () => { + try { + const tgClient = this.tgClient; + const { estimatedSize } = ctx; - const stream = new PassThrough(); + const stream = new PassThrough(); - uploadFromStream(tgClient)(stream, estimatedSize, path.toString()); + callback(null, stream); - callback(null, stream); - } catch (err) { - handleError(callback)(err); - Logger.error(err); - } + try { + await uploadFromStream(tgClient)( + stream, + estimatedSize, + path.toString(), + ); + } catch (err) { + stream.destroy(); + throw err; + } + } catch (err) { + handleError(callback)(err); + Logger.error(err); + } + })(); } protected _openReadStream( @@ -264,11 +275,11 @@ export class TGFSFileSystem extends FileSystem { try { const fileRef = await list(this.tgClient)(path.toString()); if (fileRef instanceof TGFSFileRef) { - const buffer = this.tgClient.downloadLatestVersion( + const chunks = this.tgClient.downloadLatestVersion( fileRef, fileRef.name, ); - callback(null, Readable.from(buffer)); + callback(null, Readable.from(chunks)); } else { callback(Errors.InvalidOperation); } diff --git a/src/utils/logger.ts b/src/utils/logger.ts index 6fc11ab..1e601eb 100644 --- a/src/utils/logger.ts +++ b/src/utils/logger.ts @@ -1,4 +1,4 @@ -import { BusinessError, TechnicalError } from '../errors/base'; +import { AggregatedError, BusinessError, TechnicalError } from '../errors/base'; export class Logger { static tzOffset = new Date().getTimezoneOffset() * 60000; @@ -18,7 +18,9 @@ export class Logger { } static error(err: string | Error) { - if (err instanceof BusinessError) { + if (err instanceof AggregatedError) { + err.errors.forEach((e) => this.error(e)); + } else if (err instanceof BusinessError) { console.error( `[${this.getTime()}] [ERROR] ${err.code} ${err.name} ${err.message}`, );