From 7353ea42ab84e75efb809eb48653d7eee35a9125 Mon Sep 17 00:00:00 2001 From: ahsan-virani Date: Fri, 19 Nov 2021 17:33:37 +0100 Subject: [PATCH] lazy delete non-saved executions binary data --- packages/cli/commands/start.ts | 2 +- packages/cli/config/index.ts | 2 +- .../cli/src/WorkflowExecuteAdditionalData.ts | 11 +- packages/core/src/BinaryDataManager.ts | 167 +++++++++++++++--- 4 files changed, 149 insertions(+), 33 deletions(-) diff --git a/packages/cli/commands/start.ts b/packages/cli/commands/start.ts index cccaa8249436d..f4540ab7cfad9 100644 --- a/packages/cli/commands/start.ts +++ b/packages/cli/commands/start.ts @@ -316,7 +316,7 @@ export class Start extends Command { InternalHooksManager.init(instanceId); const binaryDataConfig = config.get('binaryDataManager') as IBinaryDataConfig; - await BinaryDataHelper.init(binaryDataConfig); + await BinaryDataHelper.init(binaryDataConfig, true); await Server.start(); diff --git a/packages/cli/config/index.ts b/packages/cli/config/index.ts index 0c759b92c4d99..7edc6329f25c5 100644 --- a/packages/cli/config/index.ts +++ b/packages/cli/config/index.ts @@ -172,7 +172,7 @@ const config = convict({ process: { doc: 'In what process workflows should be executed', format: ['main', 'own'], - default: 'main', + default: 'own', env: 'EXECUTIONS_PROCESS', }, diff --git a/packages/cli/src/WorkflowExecuteAdditionalData.ts b/packages/cli/src/WorkflowExecuteAdditionalData.ts index 0963a733420c3..475d8877aeaf3 100644 --- a/packages/cli/src/WorkflowExecuteAdditionalData.ts +++ b/packages/cli/src/WorkflowExecuteAdditionalData.ts @@ -14,7 +14,7 @@ /* eslint-disable @typescript-eslint/no-unused-vars */ /* eslint-disable func-names */ /* eslint-disable @typescript-eslint/no-unsafe-assignment */ -import { UserSettings, WorkflowExecute } from 'n8n-core'; +import { BinaryDataHelper, UserSettings, WorkflowExecute } from 'n8n-core'; import { IDataObject, @@ -481,10 +481,9 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { if (isManualMode && !saveManualExecutions && !fullRunData.waitTill) { // Data is always saved, so we remove from database - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - console.log('JSON.stringify(fullRunData)'); - // console.log(JSON.stringify(fullRunData)); await Db.collections.Execution!.delete(this.executionId); + BinaryDataHelper.getInstance().findAndMarkDataForDeletionFromFullRunData(fullRunData); + return; } @@ -516,9 +515,9 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks { ); } // Data is always saved, so we remove from database - console.log('JSON.stringify(fullRunData2)'); - // console.log(JSON.stringify(fullRunData)); await Db.collections.Execution!.delete(this.executionId); + BinaryDataHelper.getInstance().findAndMarkDataForDeletionFromFullRunData(fullRunData); + return; } } diff --git a/packages/core/src/BinaryDataManager.ts b/packages/core/src/BinaryDataManager.ts index 7b2af296d6342..43693cc4f0652 100644 --- a/packages/core/src/BinaryDataManager.ts +++ b/packages/core/src/BinaryDataManager.ts @@ -2,7 +2,14 @@ import { parse } from 'flatted'; import { promises as fs } from 'fs'; import * as path from 'path'; import { v4 as uuid } from 'uuid'; -import { IBinaryData, IRunExecutionData, ITaskData } from 'n8n-workflow'; +import { + IBinaryData, + IDataObject, + IRun, + IRunData, + IRunExecutionData, + ITaskData, +} from 'n8n-workflow'; import { BINARY_ENCODING } from './Constants'; import { IBinaryDataConfig, IExecutionFlattedDb } from './Interfaces'; @@ -13,17 +20,23 @@ export class BinaryDataHelper { private storagePath: string; + private managerId: string; + constructor(mode: string, storagePath: string) { this.storageMode = mode; this.storagePath = storagePath; + this.managerId = `manager-${uuid()}`; } - static async init(config: IBinaryDataConfig): Promise { + static async init(config: IBinaryDataConfig, clearOldData = false): Promise { if (BinaryDataHelper.instance) { throw new Error('Binary Data Manager already initialized'); } BinaryDataHelper.instance = new BinaryDataHelper(config.mode, config.localStoragePath); + if (clearOldData) { + await BinaryDataHelper.instance.deleteMarkedFiles(); + } if (config.mode === 'LOCAL_STORAGE') { return fs @@ -39,6 +52,7 @@ export class BinaryDataHelper { if (!BinaryDataHelper.instance) { throw new Error('Binary Data Manager not initialized'); } + return BinaryDataHelper.instance; } @@ -75,10 +89,64 @@ export class BinaryDataHelper { throw new Error('Binary data storage mode is set to default'); } + findAndMarkDataForDeletionFromFullRunData(fullRunData: IRun): void { + const identifiers = this.findBinaryDataFromRunData(fullRunData.data.resultData.runData); + void this.markDataForDeletion(identifiers); + } + + findAndMarkDataForDeletion(fullExecutionDataList: IExecutionFlattedDb[]): void { + const identifiers = this.findBinaryData(fullExecutionDataList); + void this.markDataForDeletion(identifiers); + } + generateIdentifier(): string { return uuid(); } + private async markDataForDeletion(identifiers: string[]): Promise { + const currentFiles = await this.getFilesToDelete(`meta-${this.managerId}.json`); + const filesToDelete = identifiers.reduce((acc: IDataObject, cur: string) => { + acc[cur] = 1; + return acc; + }, currentFiles); + + setTimeout(async () => { + const currentFilesToDelete = await this.getFilesToDelete(`meta-${this.managerId}.json`); + identifiers.forEach(async (identifier) => { + void this.deleteBinaryDataByIdentifier(identifier); + delete currentFilesToDelete[identifier]; + }); + + void this.writeDeletionIdsToFile(currentFilesToDelete); + }, 60000 * 60); // 1 hour + + return this.writeDeletionIdsToFile(filesToDelete); + } + + private getBinaryDataMetaPath() { + return path.join(this.storagePath, 'meta'); + } + + private async writeDeletionIdsToFile(filesToDelete: IDataObject): Promise { + return fs.writeFile( + path.join(this.getBinaryDataMetaPath(), `meta-${this.managerId}.json`), + JSON.stringify(filesToDelete, null, '\t'), + ); + } + + private async getFilesToDelete(metaFilename: string): Promise { + let filesToDelete = {}; + try { + const file = await fs.readFile(path.join(this.getBinaryDataMetaPath(), metaFilename), 'utf8'); + + filesToDelete = JSON.parse(file) as IDataObject; + } catch { + return {}; + } + + return filesToDelete; + } + async findAndDeleteBinaryData(fullExecutionDataList: IExecutionFlattedDb[]): Promise { if (this.storageMode === 'LOCAL_STORAGE') { const allIdentifiers: string[] = []; @@ -86,27 +154,7 @@ export class BinaryDataHelper { fullExecutionDataList.forEach((fullExecutionData) => { const { runData } = (parse(fullExecutionData.data) as IRunExecutionData).resultData; - Object.values(runData).forEach((item: ITaskData[]) => { - item.forEach((taskData) => { - if (taskData?.data) { - Object.values(taskData.data).forEach((connectionData) => { - connectionData.forEach((executionData) => { - if (executionData) { - executionData.forEach((element) => { - if (element?.binary) { - Object.values(element?.binary).forEach((binaryItem) => { - if (binaryItem.internalIdentifier) { - allIdentifiers.push(binaryItem.internalIdentifier); - } - }); - } - }); - } - }); - }); - } - }); - }); + allIdentifiers.push(...this.findBinaryDataFromRunData(runData)); }); return Promise.all( @@ -117,9 +165,78 @@ export class BinaryDataHelper { return Promise.resolve(); } - async deleteBinaryDataByIdentifier(identifier: string): Promise { + private async deleteMarkedFiles(): Promise { + if (this.storageMode === 'LOCAL_STORAGE') { + const metaFileNames = (await fs.readdir(this.getBinaryDataMetaPath())).filter((filename) => + filename.startsWith('meta-manager'), + ); + + const deletePromises = metaFileNames.map(async (metaFile) => + this.deleteMarkedFilesByMetaFile(metaFile).then(async () => + this.deleteMetaFileByName(metaFile), + ), + ); + + return Promise.all(deletePromises).finally(async () => this.writeDeletionIdsToFile({})); + } + + return Promise.resolve(); + } + + private async deleteMarkedFilesByMetaFile(metaFilename: string): Promise { + return this.getFilesToDelete(metaFilename).then(async (filesToDelete) => { + return Promise.all( + Object.keys(filesToDelete).map(async (identifier) => + this.deleteBinaryDataByIdentifier(identifier), + ), + ).then(() => {}); + }); + } + + private findBinaryData(fullExecutionDataList: IExecutionFlattedDb[]): string[] { + const allIdentifiers: string[] = []; + fullExecutionDataList.forEach((fullExecutionData) => { + const { runData } = (parse(fullExecutionData.data) as IRunExecutionData).resultData; + allIdentifiers.push(...this.findBinaryDataFromRunData(runData)); + }); + + return allIdentifiers; + } + + private findBinaryDataFromRunData(runData: IRunData): string[] { + const allIdentifiers: string[] = []; + + Object.values(runData).forEach((item: ITaskData[]) => { + item.forEach((taskData) => { + if (taskData?.data) { + Object.values(taskData.data).forEach((connectionData) => { + connectionData.forEach((executionData) => { + if (executionData) { + executionData.forEach((element) => { + if (element?.binary) { + Object.values(element?.binary).forEach((binaryItem) => { + if (binaryItem.internalIdentifier) { + allIdentifiers.push(binaryItem.internalIdentifier); + } + }); + } + }); + } + }); + }); + } + }); + }); + + return allIdentifiers; + } + + private async deleteMetaFileByName(filename: string): Promise { + return fs.rm(path.join(this.getBinaryDataMetaPath(), filename)); + } + + private async deleteBinaryDataByIdentifier(identifier: string): Promise { if (this.storageMode === 'LOCAL_STORAGE') { - console.log('deleting: ', identifier); return this.deleteFromLocalStorage(identifier); }