Skip to content

Commit

Permalink
reduced memory usage when uploading big files
Browse files Browse the repository at this point in the history
  • Loading branch information
TheodoreKrypton committed Feb 16, 2024
1 parent 6117760 commit c485bb3
Show file tree
Hide file tree
Showing 21 changed files with 446 additions and 219 deletions.
29 changes: 12 additions & 17 deletions src/api/client/file-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,14 @@ import { TGFSFileVersion } from 'src/model/file';
import { validateName } from 'src/utils/validate-name';

import { DirectoryApi } from './directory-api';
import { GeneralFileMessage, isFileMessageEmpty } from './message-api/types';

export class FileApi extends DirectoryApi {
private async createFile(
name: string,
where: TGFSDirectory,
fileContent?: string | Buffer,
) {
validateName(name);
private async createFile(where: TGFSDirectory, fileMsg: GeneralFileMessage) {
validateName(fileMsg.name);

const id = await this.createFileDesc(name, fileContent);
const fr = where.createFileRef(name, id);
const id = await this.createFileDesc(fileMsg);
const fr = where.createFileRef(fileMsg.name, id);

await this.syncMetadata();

Expand All @@ -22,15 +19,14 @@ export class FileApi extends DirectoryApi {

private async updateFile(
fr: TGFSFileRef,
file?: string | Buffer,
fileMsg: GeneralFileMessage,
versionId?: string,
) {
const fd = await this.getFileDesc(fr, false);

if (file) {
const id = await this.sendFile(
typeof file === 'string' ? { path: file } : { buffer: file },
);
if (!isFileMessageEmpty(fileMsg)) {
let id: number = null;
id = await this.sendFile(fileMsg);

if (!versionId) {
fd.addVersionFromFileMessageId(id);
Expand Down Expand Up @@ -68,17 +64,16 @@ export class FileApi extends DirectoryApi {

public async uploadFile(
where: {
name: string;
under: TGFSDirectory;
versionId?: string;
},
file?: string | Buffer,
file?: GeneralFileMessage,
) {
const fr = where.under.findFiles([where.name])[0];
const fr = where.under.findFiles([file.name])[0];
if (fr) {
return await this.updateFile(fr, file, where.versionId);
} else {
return await this.createFile(where.name, where.under, file);
return await this.createFile(where.under, file);
}
}

Expand Down
20 changes: 7 additions & 13 deletions src/api/client/file-desc-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,23 +2,17 @@ import { TGFSFileRef } from 'src/model/directory';
import { TGFSFile } from 'src/model/file';

import { MessageApi } from './message-api';
import { GeneralFileMessage } from './message-api/types';

export class FileDescApi extends MessageApi {
public async createFileDesc(
name: string,
fileContent?: string | Buffer,
): Promise<number> {
const tgfsFile = new TGFSFile(name);
public async createFileDesc(file: GeneralFileMessage): Promise<number> {
const tgfsFile = new TGFSFile(file.name);

if (fileContent) {
const id = await this.sendFile(
typeof fileContent === 'string'
? { path: fileContent }
: { buffer: fileContent },
);
tgfsFile.addVersionFromFileMessageId(id);
} else {
if ('empty' in file) {
tgfsFile.addEmptyVersion();
} else {
const id = await this.sendFile(file);
tgfsFile.addVersionFromFileMessageId(id);
}

return await this.sendText(JSON.stringify(tgfsFile.toObject()));
Expand Down
8 changes: 4 additions & 4 deletions src/api/client/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ import { FileApi } from './file-api';

export const createClient = async () => {
const client = new Client(
new gramjs.GramJSApi(
await gramjs.loginAsAccount(config),
await gramjs.loginAsBot(config),
),
{
account: new gramjs.GramJSApi(await gramjs.loginAsAccount(config)),
bot: new gramjs.GramJSApi(await gramjs.loginAsBot(config)),
},
new telegraf.TelegrafApi(telegraf.createBot(config)),
);
await client.init();
Expand Down
72 changes: 35 additions & 37 deletions src/api/client/message-api/api.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
import { Hash, createHash } from 'crypto';
import fs from 'fs';

import { IBotApi, ITDLibApi } from 'src/api/interface';
import { EditMessageMediaReq, SendFileReq } from 'src/api/types';
import { IBot, TDLibApi } from 'src/api/interface';
import { config } from 'src/config';
import { TechnicalError } from 'src/errors/base';
import { db } from 'src/server/manager/db';
import { Logger } from 'src/utils/logger';

import { UploaderFromBuffer, UploaderFromPath } from './file-uploader';
import { getUploader } from './file-uploader';
import { MessageBroker } from './message-broker';
import {
FileMessageEmpty,
FileMessageFromBuffer,
FileMessageFromPath,
GeneralFileMessage,
isFileMessageEmpty,
} from './types';

export class MessageApi extends MessageBroker {
constructor(
protected readonly tdlib: ITDLibApi,
protected readonly bot: IBotApi,
protected readonly tdlib: TDLibApi,
protected readonly bot: IBot,
) {
super(tdlib);
}
Expand Down Expand Up @@ -71,8 +72,11 @@ export class MessageApi extends MessageBroker {
}

private async sha256(
fileMsg: FileMessageFromPath | FileMessageFromBuffer,
fileMsg: FileMessageFromPath | FileMessageFromBuffer | FileMessageEmpty,
): Promise<Hash> {
if (isFileMessageEmpty(fileMsg)) {
throw new TechnicalError('File is empty');
}
if ('path' in fileMsg) {
return new Promise((resolve) => {
const rs = fs.createReadStream(fileMsg.path);
Expand All @@ -91,44 +95,42 @@ export class MessageApi extends MessageBroker {
}

private static getFileCaption(fileMsg: GeneralFileMessage): string {
const caption = fileMsg.caption ? `${fileMsg.caption}\n` : '';
return `${caption}#sha256IS${fileMsg.tags.sha256}`;
let caption = fileMsg.caption ? `${fileMsg.caption}\n` : '';
if (fileMsg.tags) {
if (fileMsg.tags.sha256) {
caption += `#sha256IS${fileMsg.tags.sha256}`;
}
}
return caption;
}

private static report(uploaded: number, totalSize: number) {
Logger.info(`${(uploaded / totalSize) * 100}% uploaded`);
}

private async sendFileFromPath(fileMsg: FileMessageFromPath) {
const { path } = fileMsg;

const uploader = new UploaderFromPath(this.tdlib);
await uploader.upload(path, MessageApi.report);
return await uploader.send(
this.privateChannelId,
MessageApi.getFileCaption(fileMsg),
);
}

private async sendFileFromBuffer(fileMsg: FileMessageFromBuffer) {
const { buffer } = fileMsg;

const uploader = new UploaderFromBuffer(this.tdlib);
await uploader.upload(buffer, MessageApi.report);
return await uploader.send(
this.privateChannelId,
MessageApi.getFileCaption(fileMsg),
);
}

protected async sendFile(fileMsg: GeneralFileMessage): Promise<number> {
const _send = async (fileMsg: GeneralFileMessage): Promise<number> => {
const uploader = getUploader(this.tdlib, fileMsg);
await uploader.upload(fileMsg, MessageApi.report);
return (
await uploader.send(
this.privateChannelId,
MessageApi.getFileCaption(fileMsg),
)
).messageId;
};

if ('stream' in fileMsg) {
return await _send(fileMsg);
}

const fileHash = (await this.sha256(fileMsg))
.digest('hex')
.substring(0, 16);

fileMsg.tags = { sha256: fileHash };

const existingFile = await this.tdlib.searchMessages({
const existingFile = await this.tdlib.account.searchMessages({
chatId: this.privateChannelId,
search: `#sha256IS${fileHash}`,
});
Expand All @@ -137,11 +139,7 @@ export class MessageApi extends MessageBroker {
return existingFile[0].messageId;
}

return (
await ('path' in fileMsg
? this.sendFileFromPath(fileMsg)
: this.sendFileFromBuffer(fileMsg))
).messageId;
return await _send(fileMsg);
}

protected async *downloadFile(
Expand All @@ -152,7 +150,7 @@ export class MessageApi extends MessageBroker {

let downloaded = 0;

for await (const buffer of this.tdlib.downloadFile({
for await (const buffer of this.tdlib.account.downloadFile({
chatId: this.privateChannelId,
messageId: messageId,
chunkSize: config.tgfs.download.chunk_size_kb,
Expand Down
Loading

0 comments on commit c485bb3

Please sign in to comment.