Skip to content

Commit

Permalink
Fixed a bug that caused the process crashed when fails to upload
Browse files Browse the repository at this point in the history
  • Loading branch information
TheodoreKrypton committed Feb 17, 2024
1 parent cc54c9d commit f96815e
Show file tree
Hide file tree
Showing 6 changed files with 80 additions and 22 deletions.
42 changes: 35 additions & 7 deletions src/api/client/message-api/file-uploader.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
import fs from 'fs';
import { Readable } from 'stream';

import { RPCError } from 'telegram/errors';

import bigInt from 'big-integer';
import path from 'path';

import { ITDLibClient, TDLibApi } from 'src/api/interface';
import { SendMessageResp, UploadedFile } from 'src/api/types';
import { Queue, generateFileId, getAppropriatedPartSize } from 'src/api/utils';
import { TechnicalError } from 'src/errors/base';
import { AggregatedError, TechnicalError } from 'src/errors/base';
import { FileTooBig } from 'src/errors/telegram';
import { Logger } from 'src/utils/logger';

import {
Expand All @@ -28,6 +31,8 @@ export abstract class FileUploader<T extends GeneralFileMessage> {
private partCnt: number = 0;
private uploaded: number = 0;

private _errors: { [key: number]: Error } = {};

constructor(
protected readonly client: ITDLibClient,
protected readonly fileSize: number,
Expand Down Expand Up @@ -60,6 +65,10 @@ export abstract class FileUploader<T extends GeneralFileMessage> {

const chunk = await this.read(chunkLength);

if (chunk === null) {
return 0;
}

this.uploaded += chunkLength;
this.partCnt += 1;

Expand All @@ -73,14 +82,21 @@ export abstract class FileUploader<T extends GeneralFileMessage> {
bytes: chunk,
});
if (!rsp.success) {
console.log(rsp);
throw new TechnicalError(
`File chunk ${this.partCnt} of ${this.fileName} failed to upload`,
);
}
return chunkLength;
} catch (err) {
Logger.error(`error encountered ${workerId} ${err} ${retry}`);
if (err instanceof RPCError) {
if (err.errorMessage === 'FILE_PARTS_INVALID') {
throw new FileTooBig(this.fileSize);
}
}

Logger.error(
`error encountered in uploading worker ${workerId}: ${err} retries left: ${retry}`,
);

retry -= 1;
if (retry === 0) {
Expand All @@ -102,11 +118,15 @@ export abstract class FileUploader<T extends GeneralFileMessage> {

if (isBig(this.fileSize)) {
const createWorker = async (workerId: number): Promise<void> => {
while (!this.done()) {
await this.uploadNextPart(workerId);
if (callback) {
callback(this.uploaded, this.fileSize);
try {
while (!this.done()) {
await this.uploadNextPart(workerId);
if (callback) {
callback(this.uploaded, this.fileSize);
}
}
} catch (err) {
this._errors[workerId] = err;
}
};

Expand All @@ -132,10 +152,18 @@ export abstract class FileUploader<T extends GeneralFileMessage> {
return this.uploaded >= this.fileSize;
}

public get errors(): Array<Error> {
return Object.values(this._errors);
}

public async send(
chatId: number,
caption?: string,
): Promise<SendMessageResp> {
if (Object.keys(this._errors).length > 0) {
throw new AggregatedError(this.errors);
}

const req = {
chatId,
file: this.getUploadedFile(),
Expand Down
6 changes: 6 additions & 0 deletions src/errors/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,9 @@ export class BusinessError extends TechnicalError {
super(message, cause);
}
}

export class AggregatedError extends Error {
constructor(public readonly errors: Error[]) {
super(errors.map((e) => e.message).join('\n'));
}
}
3 changes: 2 additions & 1 deletion src/errors/error-codes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ export type ErrorCodes =
| 'INVALID_NAME'
| 'RELATIVE_PATH'
| 'UNKNOWN_COMMAND'
| 'DIR_IS_NOT_EMPTY';
| 'DIR_IS_NOT_EMPTY'
| 'FILE_TOO_BIG';
10 changes: 10 additions & 0 deletions src/errors/telegram.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { BusinessError } from './base';

export abstract class TelegramError extends BusinessError {}

export class FileTooBig extends TelegramError {
constructor(size: number) {
const message = `File size ${size} exceeds Telegram limit`;
super(message, 'FILE_TOO_BIG', message);
}
}
35 changes: 23 additions & 12 deletions src/server/webdav/tgfs-filesystem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,19 +240,30 @@ export class TGFSFileSystem extends FileSystem {
ctx: OpenWriteStreamInfo,
callback: ReturnCallback<Writable>,
): void {
try {
const tgClient = this.tgClient;
const { estimatedSize } = ctx;
(async () => {
try {
const tgClient = this.tgClient;
const { estimatedSize } = ctx;

const stream = new PassThrough();
const stream = new PassThrough();

uploadFromStream(tgClient)(stream, estimatedSize, path.toString());
callback(null, stream);

callback(null, stream);
} catch (err) {
handleError(callback)(err);
Logger.error(err);
}
try {
await uploadFromStream(tgClient)(
stream,
estimatedSize,
path.toString(),
);
} catch (err) {
stream.destroy();
throw err;
}
} catch (err) {
handleError(callback)(err);
Logger.error(err);
}
})();
}

protected _openReadStream(
Expand All @@ -264,11 +275,11 @@ export class TGFSFileSystem extends FileSystem {
try {
const fileRef = await list(this.tgClient)(path.toString());
if (fileRef instanceof TGFSFileRef) {
const buffer = this.tgClient.downloadLatestVersion(
const chunks = this.tgClient.downloadLatestVersion(
fileRef,
fileRef.name,
);
callback(null, Readable.from(buffer));
callback(null, Readable.from(chunks));
} else {
callback(Errors.InvalidOperation);
}
Expand Down
6 changes: 4 additions & 2 deletions src/utils/logger.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { BusinessError, TechnicalError } from '../errors/base';
import { AggregatedError, BusinessError, TechnicalError } from '../errors/base';

export class Logger {
static tzOffset = new Date().getTimezoneOffset() * 60000;
Expand All @@ -18,7 +18,9 @@ export class Logger {
}

static error(err: string | Error) {
if (err instanceof BusinessError) {
if (err instanceof AggregatedError) {
err.errors.forEach((e) => this.error(e));
} else if (err instanceof BusinessError) {
console.error(
`[${this.getTime()}] [ERROR] ${err.code} ${err.name} ${err.message}`,
);
Expand Down

0 comments on commit f96815e

Please sign in to comment.