Skip to content

Commit

Permalink
.
Browse files Browse the repository at this point in the history
  • Loading branch information
TheodoreKrypton committed Mar 31, 2024
1 parent b646753 commit b7a802f
Show file tree
Hide file tree
Showing 14 changed files with 235 additions and 114 deletions.
48 changes: 18 additions & 30 deletions src/api/client/file-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { TGFSFileVersion } from 'src/model/file';
import { validateName } from 'src/utils/validate-name';

import { DirectoryApi } from './directory-api';
import { GeneralFileMessage, isFileMessageEmpty } from './message-api/types';
import { GeneralFileMessage } from './message-api/types';

export class FileApi extends DirectoryApi {
private async createFile(where: TGFSDirectory, fileMsg: GeneralFileMessage) {
Expand All @@ -17,37 +17,26 @@ export class FileApi extends DirectoryApi {
return fr;
}

private async updateFileRefMessageIdIfNecessary(
fr: TGFSFileRef,
messageId: number,
) {
if (fr.getMessageId() !== messageId) {
// original file description message is gone
fr.setMessageId(messageId);
await this.syncMetadata();
}
}

private async updateFile(
fr: TGFSFileRef,
fileMsg: GeneralFileMessage,
versionId?: string,
) {
const fd = await this.getFileDesc(fr, false);

if (!isFileMessageEmpty(fileMsg)) {
let id: number = null;
id = await this.sendFile(fileMsg);

if (!versionId) {
fd.addVersionFromFileMessageId(id);
} else {
const fv = fd.getVersion(versionId);
fv.messageId = id;

fd.updateVersion(fv);
}
} else {
if (!versionId) {
fd.addEmptyVersion();
} else {
const fv = fd.getVersion(versionId);
fv.setInvalid();
fd.updateVersion(fv);
}
}

await this.updateFileDesc(fr, fd);

const messageId = versionId
? await this.updateFileVersion(fr, fileMsg, versionId)
: await this.addFileVersion(fr, fileMsg);
await this.updateFileRefMessageIdIfNecessary(fr, messageId);
return fr;
}

Expand All @@ -56,9 +45,8 @@ export class FileApi extends DirectoryApi {
fr.delete();
await this.syncMetadata();
} else {
const fd = await this.getFileDesc(fr, false);
fd.deleteVersion(version);
await this.updateFileDesc(fr, fd);
const messageId = await this.deleteFileVersion(fr, version);
await this.updateFileRefMessageIdIfNecessary(fr, messageId);
}
}

Expand Down
92 changes: 81 additions & 11 deletions src/api/client/file-desc-api.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,46 @@
import { MessageNotFound } from 'src/errors/telegram';
import { TGFSFileRef } from 'src/model/directory';
import { TGFSFile } from 'src/model/file';
import { Logger } from 'src/utils/logger';

import { MessageApi } from './message-api';
import { GeneralFileMessage } from './message-api/types';
import { GeneralFileMessage, isFileMessageEmpty } from './message-api/types';

export class FileDescApi extends MessageApi {
public async createFileDesc(file: GeneralFileMessage): Promise<number> {
const tgfsFile = new TGFSFile(file.name);
private async sendFileDesc(
fd: TGFSFile,
messageId?: number,
): Promise<number> {
if (!messageId) {
return await this.sendText(JSON.stringify(fd.toObject()));
}
// edit an existing message
try {
return await this.editMessageText(
messageId,
JSON.stringify(fd.toObject()),
);
} catch (err) {
if (err instanceof MessageNotFound) {
// the message to edit is gone
return await this.sendText(JSON.stringify(fd.toObject()));
} else {
throw err;
}
}
}

public async createFileDesc(fileMsg: GeneralFileMessage): Promise<number> {
const tgfsFile = new TGFSFile(fileMsg.name);

if ('empty' in file) {
if ('empty' in fileMsg) {
tgfsFile.addEmptyVersion();
} else {
const id = await this.sendFile(file);
const id = await this.sendFile(fileMsg);
tgfsFile.addVersionFromFileMessageId(id);
}

return await this.sendText(JSON.stringify(tgfsFile.toObject()));
return await this.sendFileDesc(tgfsFile);
}

public async getFileDesc(
Expand All @@ -24,6 +49,15 @@ export class FileDescApi extends MessageApi {
): Promise<TGFSFile> {
const message = (await this.getMessages([fileRef.getMessageId()]))[0];

if (!message) {
Logger.error(
`File description (messageId: ${fileRef.getMessageId()}) for ${
fileRef.name
} not found`,
);
return TGFSFile.empty(`[Content Not Found]${fileRef.name}`);
}

const fileDesc = TGFSFile.fromObject(JSON.parse(message.text));

if (withVersionInfo) {
Expand All @@ -50,10 +84,46 @@ export class FileDescApi extends MessageApi {
return fileDesc;
}

public async updateFileDesc(fr: TGFSFileRef, fd: TGFSFile): Promise<number> {
return await this.editMessageText(
fr.getMessageId(),
JSON.stringify(fd.toObject()),
);
public async addFileVersion(
fr: TGFSFileRef,
fileMsg: GeneralFileMessage,
): Promise<number> {
const fd = await this.getFileDesc(fr, false);

if (isFileMessageEmpty(fileMsg)) {
fd.addEmptyVersion();
} else {
const messageId = await this.sendFile(fileMsg);
fd.addVersionFromFileMessageId(messageId);
}
return await this.sendFileDesc(fd, fr.getMessageId());
}

public async updateFileVersion(
fr: TGFSFileRef,
fileMsg: GeneralFileMessage,
versionId: string,
): Promise<number> {
const fd = await this.getFileDesc(fr);
if (isFileMessageEmpty(fileMsg)) {
const fv = fd.getVersion(versionId);
fv.setInvalid();
fd.updateVersion(fv);
} else {
const messageId = await this.sendFile(fileMsg);
const fv = fd.getVersion(versionId);
fv.messageId = messageId;
fd.updateVersion(fv);
}
return await this.sendFileDesc(fd, fr.getMessageId());
}

public async deleteFileVersion(
fr: TGFSFileRef,
versionId: string,
): Promise<number> {
const fd = await this.getFileDesc(fr, false);
fd.deleteVersion(versionId);
return await this.sendFileDesc(fd, fr.getMessageId());
}
}
43 changes: 31 additions & 12 deletions src/api/client/message-api/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import fs from 'fs';
import { IBot, TDLibApi } from 'src/api/interface';
import { config } from 'src/config';
import { TechnicalError } from 'src/errors/base';
import { MessageNotFound } from 'src/errors/telegram';
import { db } from 'src/server/manager/db';
import { Logger } from 'src/utils/logger';

import { getUploader } from './file-uploader';
import { MessageBroker } from './message-broker';
Expand Down Expand Up @@ -37,13 +39,21 @@ export class MessageApi extends MessageBroker {
messageId: number,
message: string,
): Promise<number> {
return (
await this.bot.editMessageText({
chatId: this.privateChannelId,
messageId,
text: message,
})
).messageId;
try {
return (
await this.bot.editMessageText({
chatId: this.privateChannelId,
messageId,
text: message,
})
).messageId;
} catch (err) {
if (err.message === 'message to edit not found') {
throw new MessageNotFound(messageId);
} else {
throw err;
}
}
}

protected async editMessageMedia(
Expand Down Expand Up @@ -110,22 +120,27 @@ export class MessageApi extends MessageBroker {
protected async sendFile(fileMsg: GeneralFileMessage): Promise<number> {
const _send = async (fileMsg: GeneralFileMessage): Promise<number> => {
const uploader = getUploader(this.tdlib, fileMsg);
await uploader.upload(fileMsg, MessageApi.report);
return (
await uploader.upload(fileMsg, MessageApi.report, fileMsg.name);
const messageId = (
await uploader.send(
this.privateChannelId,
MessageApi.getFileCaption(fileMsg),
)
).messageId;
Logger.debug('File sent', JSON.stringify(fileMsg));
return messageId;
};

if ('stream' in fileMsg) {
Logger.debug(
`Sending file ${JSON.stringify({ ...fileMsg, stream: 'hidden' })}`,
);
return await _send(fileMsg);
}

const fileHash = (await this.sha256(fileMsg))
.digest('hex')
.substring(0, 16);
Logger.debug(`Sending file ${JSON.stringify(fileMsg)}`);

const fileHash = (await this.sha256(fileMsg)).digest('hex');

fileMsg.tags = { sha256: fileHash };

Expand All @@ -135,6 +150,10 @@ export class MessageApi extends MessageBroker {
});

if (existingFile.length > 0) {
Logger.debug(
`Found file with the same sha256 ${fileHash}, skip uploading`,
JSON.stringify(existingFile[0]),
);
return existingFile[0].messageId;
}

Expand Down
27 changes: 20 additions & 7 deletions src/api/client/message-api/file-uploader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ export abstract class FileUploader<T extends GeneralFileMessage> {
private fileName: string;
private fileId: bigInt.BigInteger;

private isBig: boolean;
private partCnt: number = 0;
private uploaded: number = 0;

Expand All @@ -38,6 +39,7 @@ export abstract class FileUploader<T extends GeneralFileMessage> {
protected readonly fileSize: number,
) {
this.fileId = generateFileId();
this.isBig = isBig(fileSize);
}

protected abstract get defaultFileName(): string;
Expand Down Expand Up @@ -75,7 +77,7 @@ export abstract class FileUploader<T extends GeneralFileMessage> {
let retry = 3;
while (retry) {
try {
const rsp = isBig(this.fileSize)
const rsp = this.isBig
? await this.client.saveBigFilePart({
fileId: this.fileId,
filePart: this.partCnt - 1, // 0-indexed
Expand Down Expand Up @@ -116,32 +118,42 @@ export abstract class FileUploader<T extends GeneralFileMessage> {
file: T,
callback?: (uploaded: number, totalSize: number) => void,
fileName?: string,
workers: number = 15,
workers: {
small?: number;
big?: number;
} = {
small: 3,
big: 15,
},
): Promise<void> {
this.prepare(file);
try {
this.fileName = fileName ?? this.defaultFileName;

const createWorker = async (workerId: number): Promise<void> => {
const createWorker = async (workerId: number): Promise<boolean> => {
try {
while (!this.done()) {
await this.uploadNextPart(workerId);
if (callback) {
Logger.info(
`[worker ${workerId}] ${
this.uploaded / this.fileSize
(this.uploaded * 100) / this.fileSize
}% uploaded`,
);
callback(this.uploaded, this.fileSize);
}
}
return true;
} catch (err) {
this._errors[workerId] = err;
return false;
}
};

const promises: Array<Promise<void>> = [];
for (let i = 0; i < workers; i++) {
const promises: Array<Promise<boolean>> = [];

const numWorkers = this.isBig ? workers.big : workers.small;
for (let i = 0; i < numWorkers; i++) {
promises.push(createWorker(i));
}

Expand Down Expand Up @@ -173,7 +185,7 @@ export abstract class FileUploader<T extends GeneralFileMessage> {
name: this.fileName,
caption,
};
if (isBig(this.fileSize)) {
if (this.isBig) {
return await this.client.sendBigFile(req);
} else {
return await this.client.sendSmallFile(req);
Expand Down Expand Up @@ -309,6 +321,7 @@ export function getUploader(
fileMsg: GeneralFileMessage,
): FileUploader<GeneralFileMessage> {
const selectApi = (fileSize: number) => {
// bot cannot upload files larger than 50MB
return fileSize > 50 * 1024 * 1024 ? tdlib.account : tdlib.bot;
};

Expand Down
2 changes: 1 addition & 1 deletion src/api/impl/gramjs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ export class GramJSApi implements ITDLibClient {
const rsp = await this.client.sendFile(req.chatId, {
file: new Api.InputFile({
id: req.file.id,
parts: 1,
parts: req.file.parts,
name: req.file.name,
md5Checksum: '',
}),
Expand Down
3 changes: 2 additions & 1 deletion src/errors/error-codes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ export type ErrorCodes =
| 'RELATIVE_PATH'
| 'UNKNOWN_COMMAND'
| 'DIR_IS_NOT_EMPTY'
| 'FILE_TOO_BIG';
| 'FILE_TOO_BIG'
| 'MESSAGE_NOT_FOUND';
7 changes: 7 additions & 0 deletions src/errors/telegram.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,10 @@ export class FileTooBig extends TelegramError {
super(message, 'FILE_TOO_BIG', message);
}
}

export class MessageNotFound extends TelegramError {
constructor(messageId: number) {
const message = `Message with id ${messageId} not found`;
super(message, 'MESSAGE_NOT_FOUND', message);
}
}
Loading

0 comments on commit b7a802f

Please sign in to comment.