Skip to content

Commit

Permalink
lazy delete non-saved executions binary data
Browse files Browse the repository at this point in the history
  • Loading branch information
ahsanv committed Nov 19, 2021
1 parent c70c457 commit 7353ea4
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 33 deletions.
2 changes: 1 addition & 1 deletion packages/cli/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ export class Start extends Command {
InternalHooksManager.init(instanceId);

const binaryDataConfig = config.get('binaryDataManager') as IBinaryDataConfig;
await BinaryDataHelper.init(binaryDataConfig);
await BinaryDataHelper.init(binaryDataConfig, true);

await Server.start();

Expand Down
2 changes: 1 addition & 1 deletion packages/cli/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ const config = convict({
process: {
doc: 'In what process workflows should be executed',
format: ['main', 'own'],
default: 'main',
default: 'own',
env: 'EXECUTIONS_PROCESS',
},

Expand Down
11 changes: 5 additions & 6 deletions packages/cli/src/WorkflowExecuteAdditionalData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
/* eslint-disable @typescript-eslint/no-unused-vars */
/* eslint-disable func-names */
/* eslint-disable @typescript-eslint/no-unsafe-assignment */
import { UserSettings, WorkflowExecute } from 'n8n-core';
import { BinaryDataHelper, UserSettings, WorkflowExecute } from 'n8n-core';

import {
IDataObject,
Expand Down Expand Up @@ -481,10 +481,9 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {

if (isManualMode && !saveManualExecutions && !fullRunData.waitTill) {
// Data is always saved, so we remove from database
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
console.log('JSON.stringify(fullRunData)');
// console.log(JSON.stringify(fullRunData));
await Db.collections.Execution!.delete(this.executionId);
BinaryDataHelper.getInstance().findAndMarkDataForDeletionFromFullRunData(fullRunData);

return;
}

Expand Down Expand Up @@ -516,9 +515,9 @@ function hookFunctionsSave(parentProcessMode?: string): IWorkflowExecuteHooks {
);
}
// Data is always saved, so we remove from database
console.log('JSON.stringify(fullRunData2)');
// console.log(JSON.stringify(fullRunData));
await Db.collections.Execution!.delete(this.executionId);
BinaryDataHelper.getInstance().findAndMarkDataForDeletionFromFullRunData(fullRunData);

return;
}
}
Expand Down
167 changes: 142 additions & 25 deletions packages/core/src/BinaryDataManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,14 @@ import { parse } from 'flatted';
import { promises as fs } from 'fs';
import * as path from 'path';
import { v4 as uuid } from 'uuid';
import { IBinaryData, IRunExecutionData, ITaskData } from 'n8n-workflow';
import {
IBinaryData,
IDataObject,
IRun,
IRunData,
IRunExecutionData,
ITaskData,
} from 'n8n-workflow';
import { BINARY_ENCODING } from './Constants';
import { IBinaryDataConfig, IExecutionFlattedDb } from './Interfaces';

Expand All @@ -13,17 +20,23 @@ export class BinaryDataHelper {

private storagePath: string;

private managerId: string;

constructor(mode: string, storagePath: string) {
this.storageMode = mode;
this.storagePath = storagePath;
this.managerId = `manager-${uuid()}`;
}

static async init(config: IBinaryDataConfig): Promise<void> {
static async init(config: IBinaryDataConfig, clearOldData = false): Promise<void> {
if (BinaryDataHelper.instance) {
throw new Error('Binary Data Manager already initialized');
}

BinaryDataHelper.instance = new BinaryDataHelper(config.mode, config.localStoragePath);
if (clearOldData) {
await BinaryDataHelper.instance.deleteMarkedFiles();
}

if (config.mode === 'LOCAL_STORAGE') {
return fs
Expand All @@ -39,6 +52,7 @@ export class BinaryDataHelper {
if (!BinaryDataHelper.instance) {
throw new Error('Binary Data Manager not initialized');
}

return BinaryDataHelper.instance;
}

Expand Down Expand Up @@ -75,38 +89,72 @@ export class BinaryDataHelper {
throw new Error('Binary data storage mode is set to default');
}

findAndMarkDataForDeletionFromFullRunData(fullRunData: IRun): void {
const identifiers = this.findBinaryDataFromRunData(fullRunData.data.resultData.runData);
void this.markDataForDeletion(identifiers);
}

findAndMarkDataForDeletion(fullExecutionDataList: IExecutionFlattedDb[]): void {
const identifiers = this.findBinaryData(fullExecutionDataList);
void this.markDataForDeletion(identifiers);
}

generateIdentifier(): string {
return uuid();
}

private async markDataForDeletion(identifiers: string[]): Promise<void> {
const currentFiles = await this.getFilesToDelete(`meta-${this.managerId}.json`);
const filesToDelete = identifiers.reduce((acc: IDataObject, cur: string) => {
acc[cur] = 1;
return acc;
}, currentFiles);

setTimeout(async () => {
const currentFilesToDelete = await this.getFilesToDelete(`meta-${this.managerId}.json`);
identifiers.forEach(async (identifier) => {
void this.deleteBinaryDataByIdentifier(identifier);
delete currentFilesToDelete[identifier];
});

void this.writeDeletionIdsToFile(currentFilesToDelete);
}, 60000 * 60); // 1 hour

return this.writeDeletionIdsToFile(filesToDelete);
}

private getBinaryDataMetaPath() {
return path.join(this.storagePath, 'meta');
}

private async writeDeletionIdsToFile(filesToDelete: IDataObject): Promise<void> {
return fs.writeFile(
path.join(this.getBinaryDataMetaPath(), `meta-${this.managerId}.json`),
JSON.stringify(filesToDelete, null, '\t'),
);
}

private async getFilesToDelete(metaFilename: string): Promise<IDataObject> {
let filesToDelete = {};
try {
const file = await fs.readFile(path.join(this.getBinaryDataMetaPath(), metaFilename), 'utf8');

filesToDelete = JSON.parse(file) as IDataObject;
} catch {
return {};
}

return filesToDelete;
}

async findAndDeleteBinaryData(fullExecutionDataList: IExecutionFlattedDb[]): Promise<unknown> {
if (this.storageMode === 'LOCAL_STORAGE') {
const allIdentifiers: string[] = [];

fullExecutionDataList.forEach((fullExecutionData) => {
const { runData } = (parse(fullExecutionData.data) as IRunExecutionData).resultData;

Object.values(runData).forEach((item: ITaskData[]) => {
item.forEach((taskData) => {
if (taskData?.data) {
Object.values(taskData.data).forEach((connectionData) => {
connectionData.forEach((executionData) => {
if (executionData) {
executionData.forEach((element) => {
if (element?.binary) {
Object.values(element?.binary).forEach((binaryItem) => {
if (binaryItem.internalIdentifier) {
allIdentifiers.push(binaryItem.internalIdentifier);
}
});
}
});
}
});
});
}
});
});
allIdentifiers.push(...this.findBinaryDataFromRunData(runData));
});

return Promise.all(
Expand All @@ -117,9 +165,78 @@ export class BinaryDataHelper {
return Promise.resolve();
}

async deleteBinaryDataByIdentifier(identifier: string): Promise<void> {
private async deleteMarkedFiles(): Promise<unknown> {
if (this.storageMode === 'LOCAL_STORAGE') {
const metaFileNames = (await fs.readdir(this.getBinaryDataMetaPath())).filter((filename) =>
filename.startsWith('meta-manager'),
);

const deletePromises = metaFileNames.map(async (metaFile) =>
this.deleteMarkedFilesByMetaFile(metaFile).then(async () =>
this.deleteMetaFileByName(metaFile),
),
);

return Promise.all(deletePromises).finally(async () => this.writeDeletionIdsToFile({}));
}

return Promise.resolve();
}

private async deleteMarkedFilesByMetaFile(metaFilename: string): Promise<void> {
return this.getFilesToDelete(metaFilename).then(async (filesToDelete) => {
return Promise.all(
Object.keys(filesToDelete).map(async (identifier) =>
this.deleteBinaryDataByIdentifier(identifier),
),
).then(() => {});
});
}

private findBinaryData(fullExecutionDataList: IExecutionFlattedDb[]): string[] {
const allIdentifiers: string[] = [];
fullExecutionDataList.forEach((fullExecutionData) => {
const { runData } = (parse(fullExecutionData.data) as IRunExecutionData).resultData;
allIdentifiers.push(...this.findBinaryDataFromRunData(runData));
});

return allIdentifiers;
}

private findBinaryDataFromRunData(runData: IRunData): string[] {
const allIdentifiers: string[] = [];

Object.values(runData).forEach((item: ITaskData[]) => {
item.forEach((taskData) => {
if (taskData?.data) {
Object.values(taskData.data).forEach((connectionData) => {
connectionData.forEach((executionData) => {
if (executionData) {
executionData.forEach((element) => {
if (element?.binary) {
Object.values(element?.binary).forEach((binaryItem) => {
if (binaryItem.internalIdentifier) {
allIdentifiers.push(binaryItem.internalIdentifier);
}
});
}
});
}
});
});
}
});
});

return allIdentifiers;
}

private async deleteMetaFileByName(filename: string): Promise<void> {
return fs.rm(path.join(this.getBinaryDataMetaPath(), filename));
}

private async deleteBinaryDataByIdentifier(identifier: string): Promise<void> {
if (this.storageMode === 'LOCAL_STORAGE') {
console.log('deleting: ', identifier);
return this.deleteFromLocalStorage(identifier);
}

Expand Down

0 comments on commit 7353ea4

Please sign in to comment.