Skip to content

Commit

Permalink
fix(core): Register workflows as active only after all of the trigger…
Browse files Browse the repository at this point in the history
…s and pollers setup successfully (#12244)
  • Loading branch information
netroy authored Dec 20, 2024
1 parent a8dd35b commit f924f2a
Show file tree
Hide file tree
Showing 3 changed files with 373 additions and 76 deletions.
21 changes: 11 additions & 10 deletions packages/core/src/ActiveWorkflows.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,13 @@ export class ActiveWorkflows {
getTriggerFunctions: IGetExecuteTriggerFunctions,
getPollFunctions: IGetExecutePollFunctions,
) {
this.activeWorkflows[workflowId] = {};
const triggerNodes = workflow.getTriggerNodes();

let triggerResponse: ITriggerResponse | undefined;

this.activeWorkflows[workflowId].triggerResponses = [];
const triggerResponses: ITriggerResponse[] = [];

for (const triggerNode of triggerNodes) {
try {
triggerResponse = await this.triggersAndPollers.runTrigger(
const triggerResponse = await this.triggersAndPollers.runTrigger(
workflow,
triggerNode,
getTriggerFunctions,
Expand All @@ -89,10 +86,7 @@ export class ActiveWorkflows {
activation,
);
if (triggerResponse !== undefined) {
// If a response was given save it

// eslint-disable-next-line @typescript-eslint/no-unnecessary-type-assertion
this.activeWorkflows[workflowId].triggerResponses!.push(triggerResponse);
triggerResponses.push(triggerResponse);
}
} catch (e) {
const error = e instanceof Error ? e : new Error(`${e}`);
Expand All @@ -104,6 +98,8 @@ export class ActiveWorkflows {
}
}

this.activeWorkflows[workflowId] = { triggerResponses };

const pollingNodes = workflow.getPollNodes();

if (pollingNodes.length === 0) return;
Expand All @@ -119,6 +115,11 @@ export class ActiveWorkflows {
activation,
);
} catch (e) {
// Do not mark this workflow as active if there are no triggerResponses, and any polling activation failed
if (triggerResponses.length === 0) {
delete this.activeWorkflows[workflowId];
}

const error = e instanceof Error ? e : new Error(`${e}`);

throw new WorkflowActivationError(
Expand All @@ -132,7 +133,7 @@ export class ActiveWorkflows {
/**
* Activates polling for the given node
*/
async activatePolling(
private async activatePolling(
node: INode,
workflow: Workflow,
additionalData: IWorkflowExecuteAdditionalData,
Expand Down
290 changes: 290 additions & 0 deletions packages/core/src/__tests__/ActiveWorkflows.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,290 @@
import { mock } from 'jest-mock-extended';
import type {
IGetExecuteTriggerFunctions,
INode,
ITriggerResponse,
IWorkflowExecuteAdditionalData,
Workflow,
WorkflowActivateMode,
WorkflowExecuteMode,
TriggerTime,
CronExpression,
} from 'n8n-workflow';
import { LoggerProxy, TriggerCloseError, WorkflowActivationError } from 'n8n-workflow';

import { ActiveWorkflows } from '@/ActiveWorkflows';
import type { ErrorReporter } from '@/error-reporter';
import type { PollContext } from '@/node-execution-context';
import type { ScheduledTaskManager } from '@/ScheduledTaskManager';
import type { TriggersAndPollers } from '@/TriggersAndPollers';

describe('ActiveWorkflows', () => {
const workflowId = 'test-workflow-id';
const workflow = mock<Workflow>();
const additionalData = mock<IWorkflowExecuteAdditionalData>();
const mode: WorkflowExecuteMode = 'trigger';
const activation: WorkflowActivateMode = 'init';

const getTriggerFunctions = jest.fn() as IGetExecuteTriggerFunctions;
const triggerResponse = mock<ITriggerResponse>();

const pollFunctions = mock<PollContext>();
const getPollFunctions = jest.fn<PollContext, unknown[]>();

LoggerProxy.init(mock());
const scheduledTaskManager = mock<ScheduledTaskManager>();
const triggersAndPollers = mock<TriggersAndPollers>();
const errorReporter = mock<ErrorReporter>();
const triggerNode = mock<INode>();
const pollNode = mock<INode>();

let activeWorkflows: ActiveWorkflows;

beforeEach(() => {
jest.clearAllMocks();
activeWorkflows = new ActiveWorkflows(scheduledTaskManager, triggersAndPollers, errorReporter);
});

type PollTimes = { item: TriggerTime[] };
type TestOptions = {
triggerNodes?: INode[];
pollNodes?: INode[];
triggerError?: Error;
pollError?: Error;
pollTimes?: PollTimes;
};

const addWorkflow = async ({
triggerNodes = [],
pollNodes = [],
triggerError,
pollError,
pollTimes = { item: [{ mode: 'everyMinute' }] },
}: TestOptions) => {
workflow.getTriggerNodes.mockReturnValue(triggerNodes);
workflow.getPollNodes.mockReturnValue(pollNodes);
pollFunctions.getNodeParameter.calledWith('pollTimes').mockReturnValue(pollTimes);

if (triggerError) {
triggersAndPollers.runTrigger.mockRejectedValueOnce(triggerError);
} else {
triggersAndPollers.runTrigger.mockResolvedValue(triggerResponse);
}

if (pollError) {
triggersAndPollers.runPoll.mockRejectedValueOnce(pollError);
} else {
getPollFunctions.mockReturnValue(pollFunctions);
}

return await activeWorkflows.add(
workflowId,
workflow,
additionalData,
mode,
activation,
getTriggerFunctions,
getPollFunctions,
);
};

describe('add()', () => {
describe('should activate workflow', () => {
it('with trigger nodes', async () => {
await addWorkflow({ triggerNodes: [triggerNode] });

expect(activeWorkflows.isActive(workflowId)).toBe(true);
expect(workflow.getTriggerNodes).toHaveBeenCalled();
expect(triggersAndPollers.runTrigger).toHaveBeenCalledWith(
workflow,
triggerNode,
getTriggerFunctions,
additionalData,
mode,
activation,
);
});

it('with polling nodes', async () => {
await addWorkflow({ pollNodes: [pollNode] });

expect(activeWorkflows.isActive(workflowId)).toBe(true);
expect(workflow.getPollNodes).toHaveBeenCalled();
expect(scheduledTaskManager.registerCron).toHaveBeenCalled();
});

it('with both trigger and polling nodes', async () => {
await addWorkflow({ triggerNodes: [triggerNode], pollNodes: [pollNode] });

expect(activeWorkflows.isActive(workflowId)).toBe(true);
expect(workflow.getTriggerNodes).toHaveBeenCalled();
expect(workflow.getPollNodes).toHaveBeenCalled();
expect(triggersAndPollers.runTrigger).toHaveBeenCalledWith(
workflow,
triggerNode,
getTriggerFunctions,
additionalData,
mode,
activation,
);
expect(scheduledTaskManager.registerCron).toHaveBeenCalled();
expect(triggersAndPollers.runPoll).toHaveBeenCalledWith(workflow, pollNode, pollFunctions);
});
});

describe('should throw error', () => {
it('if trigger activation fails', async () => {
const error = new Error('Trigger activation failed');
await expect(
addWorkflow({ triggerNodes: [triggerNode], triggerError: error }),
).rejects.toThrow(WorkflowActivationError);
expect(activeWorkflows.isActive(workflowId)).toBe(false);
});

it('if polling activation fails', async () => {
const error = new Error('Failed to activate polling');
await expect(addWorkflow({ pollNodes: [pollNode], pollError: error })).rejects.toThrow(
WorkflowActivationError,
);
expect(activeWorkflows.isActive(workflowId)).toBe(false);
});

it('if the polling interval is too short', async () => {
const pollTimes: PollTimes = {
item: [
{
mode: 'custom',
cronExpression: '* * * * *' as CronExpression,
},
],
};

await expect(addWorkflow({ pollNodes: [pollNode], pollTimes })).rejects.toThrow(
'The polling interval is too short. It has to be at least a minute.',
);

expect(scheduledTaskManager.registerCron).not.toHaveBeenCalled();
});
});

describe('should handle polling errors', () => {
it('should throw error when poll fails during initial testing', async () => {
const error = new Error('Poll function failed');

await expect(addWorkflow({ pollNodes: [pollNode], pollError: error })).rejects.toThrow(
WorkflowActivationError,
);

expect(triggersAndPollers.runPoll).toHaveBeenCalledWith(workflow, pollNode, pollFunctions);
expect(pollFunctions.__emit).not.toHaveBeenCalled();
expect(pollFunctions.__emitError).not.toHaveBeenCalled();
});

it('should emit error when poll fails during regular polling', async () => {
const error = new Error('Poll function failed');
triggersAndPollers.runPoll
.mockResolvedValueOnce(null) // Succeed on first call (testing)
.mockRejectedValueOnce(error); // Fail on second call (regular polling)

await addWorkflow({ pollNodes: [pollNode] });

// Get the executeTrigger function that was registered
const registerCronCall = scheduledTaskManager.registerCron.mock.calls[0];
const executeTrigger = registerCronCall[2] as () => Promise<void>;

// Execute the trigger function to simulate a regular poll
await executeTrigger();

expect(triggersAndPollers.runPoll).toHaveBeenCalledTimes(2);
expect(pollFunctions.__emit).not.toHaveBeenCalled();
expect(pollFunctions.__emitError).toHaveBeenCalledWith(error);
});
});
});

describe('remove()', () => {
const setupForRemoval = async () => {
await addWorkflow({ triggerNodes: [triggerNode] });
return await activeWorkflows.remove(workflowId);
};

it('should remove an active workflow', async () => {
const result = await setupForRemoval();

expect(result).toBe(true);
expect(activeWorkflows.isActive(workflowId)).toBe(false);
expect(scheduledTaskManager.deregisterCrons).toHaveBeenCalledWith(workflowId);
expect(triggerResponse.closeFunction).toHaveBeenCalled();
});

it('should return false when removing non-existent workflow', async () => {
const result = await activeWorkflows.remove('non-existent');

expect(result).toBe(false);
expect(scheduledTaskManager.deregisterCrons).not.toHaveBeenCalled();
});

it('should handle TriggerCloseError when closing trigger', async () => {
const triggerCloseError = new TriggerCloseError(triggerNode, { level: 'warning' });
(triggerResponse.closeFunction as jest.Mock).mockRejectedValueOnce(triggerCloseError);

const result = await setupForRemoval();

expect(result).toBe(true);
expect(activeWorkflows.isActive(workflowId)).toBe(false);
expect(triggerResponse.closeFunction).toHaveBeenCalled();
expect(errorReporter.error).toHaveBeenCalledWith(triggerCloseError, {
extra: { workflowId },
});
});

it('should throw WorkflowDeactivationError when closeFunction throws regular error', async () => {
const error = new Error('Close function failed');
(triggerResponse.closeFunction as jest.Mock).mockRejectedValueOnce(error);

await addWorkflow({ triggerNodes: [triggerNode] });

await expect(activeWorkflows.remove(workflowId)).rejects.toThrow(
`Failed to deactivate trigger of workflow ID "${workflowId}": "Close function failed"`,
);

expect(triggerResponse.closeFunction).toHaveBeenCalled();
expect(errorReporter.error).not.toHaveBeenCalled();
});
});

describe('get() and isActive()', () => {
it('should return workflow data for active workflow', async () => {
await addWorkflow({ triggerNodes: [triggerNode] });

expect(activeWorkflows.isActive(workflowId)).toBe(true);
expect(activeWorkflows.get(workflowId)).toBeDefined();
});

it('should return undefined for non-active workflow', () => {
expect(activeWorkflows.isActive('non-existent')).toBe(false);
expect(activeWorkflows.get('non-existent')).toBeUndefined();
});
});

describe('allActiveWorkflows()', () => {
it('should return all active workflow IDs', async () => {
await addWorkflow({ triggerNodes: [triggerNode] });

const activeIds = activeWorkflows.allActiveWorkflows();

expect(activeIds).toEqual([workflowId]);
});
});

describe('removeAllTriggerAndPollerBasedWorkflows()', () => {
it('should remove all active workflows', async () => {
await addWorkflow({ triggerNodes: [triggerNode] });

await activeWorkflows.removeAllTriggerAndPollerBasedWorkflows();

expect(activeWorkflows.allActiveWorkflows()).toEqual([]);
expect(scheduledTaskManager.deregisterCrons).toHaveBeenCalledWith(workflowId);
});
});
});
Loading

0 comments on commit f924f2a

Please sign in to comment.