diff --git a/x-pack/legacy/plugins/task_manager/task.ts b/x-pack/legacy/plugins/task_manager/task.ts index dfb8ce6c5f9f95..43e71ed8126328 100644 --- a/x-pack/legacy/plugins/task_manager/task.ts +++ b/x-pack/legacy/plugins/task_manager/task.ts @@ -108,10 +108,13 @@ export interface TaskDefinition { maxAttempts?: number; /** - * Function that returns the delay in seconds to wait before attempting the - * failed task again. + * Function that customizes how the task should behave when the task fails. This + * function can return `true`, `false` or a Date. True will tell task manager + * to retry using default delay logic. False will tell task manager to stop retrying + * this task. Date will suggest when to the task manager the task should retry. + * This function isn't used for interval type tasks, those retry at the next interval. */ - getRetryDelay?: (attempts: number, error: object) => number; + getRetry?: (attempts: number, error: object) => boolean | Date; /** * The numer of workers / slots a running instance of this task occupies. @@ -145,7 +148,7 @@ export const validateTaskDefinition = Joi.object({ .min(1) .default(1), createTaskRunner: Joi.func().required(), - getRetryDelay: Joi.func().optional(), + getRetry: Joi.func().optional(), }).default(); /** diff --git a/x-pack/legacy/plugins/task_manager/task_runner.test.ts b/x-pack/legacy/plugins/task_manager/task_runner.test.ts index 95dead2013a269..5ca8e6047bc8d3 100644 --- a/x-pack/legacy/plugins/task_manager/task_runner.test.ts +++ b/x-pack/legacy/plugins/task_manager/task_runner.test.ts @@ -6,7 +6,7 @@ import _ from 'lodash'; import sinon from 'sinon'; -import { minutesFromNow, secondsFromNow } from './lib/intervals'; +import { minutesFromNow } from './lib/intervals'; import { ConcreteTaskInstance } from './task'; import { TaskManagerRunner } from './task_runner'; @@ -255,20 +255,116 @@ describe('TaskManagerRunner', () => { ); }); - test('uses getRetryDelay function on error when defined', async () => { - const initialAttempts = _.random(0, 2); - const retryDelay = _.random(15, 100); + test('uses getRetry function (returning date) on error when defined', async () => { + const initialAttempts = _.random(1, 3); + const nextRetry = new Date(Date.now() + _.random(15, 100) * 1000); + const id = Date.now().toString(); + const getRetryStub = sinon.stub().returns(nextRetry); + const error = new Error('Dangit!'); + const { runner, store } = testOpts({ + instance: { + id, + attempts: initialAttempts, + }, + definitions: { + bar: { + getRetry: getRetryStub, + createTaskRunner: () => ({ + async run() { + throw error; + }, + }), + }, + }, + }); + + await runner.run(); + + sinon.assert.calledOnce(store.update); + sinon.assert.calledWith(getRetryStub, initialAttempts, error); + const instance = store.update.args[0][0]; + + expect(instance.runAt.getTime()).toEqual(nextRetry.getTime()); + }); + + test('uses getRetry function (returning true) on error when defined', async () => { + const initialAttempts = _.random(1, 3); + const id = Date.now().toString(); + const getRetryStub = sinon.stub().returns(true); + const error = new Error('Dangit!'); + const { runner, store } = testOpts({ + instance: { + id, + attempts: initialAttempts, + }, + definitions: { + bar: { + getRetry: getRetryStub, + createTaskRunner: () => ({ + async run() { + throw error; + }, + }), + }, + }, + }); + + await runner.run(); + + sinon.assert.calledOnce(store.update); + sinon.assert.calledWith(getRetryStub, initialAttempts, error); + const instance = store.update.args[0][0]; + + const expectedRunAt = new Date(Date.now() + initialAttempts * 5 * 60 * 1000); + expect(instance.runAt.getTime()).toEqual(expectedRunAt.getTime()); + }); + + test('uses getRetry function (returning false) on error when defined', async () => { + const initialAttempts = _.random(1, 3); + const id = Date.now().toString(); + const getRetryStub = sinon.stub().returns(false); + const error = new Error('Dangit!'); + const { runner, store } = testOpts({ + instance: { + id, + attempts: initialAttempts, + }, + definitions: { + bar: { + getRetry: getRetryStub, + createTaskRunner: () => ({ + async run() { + throw error; + }, + }), + }, + }, + }); + + await runner.run(); + + sinon.assert.calledOnce(store.update); + sinon.assert.calledWith(getRetryStub, initialAttempts, error); + const instance = store.update.args[0][0]; + + expect(instance.status).toBe('failed'); + }); + + test('bypasses getRetry function (returning false) on error of a recurring task', async () => { + const initialAttempts = _.random(1, 3); const id = Date.now().toString(); - const getRetryDelayStub = sinon.stub().returns(retryDelay); + const getRetryStub = sinon.stub().returns(false); const error = new Error('Dangit!'); const { runner, store } = testOpts({ instance: { id, attempts: initialAttempts, + interval: '1m', + startedAt: new Date(), }, definitions: { bar: { - getRetryDelay: getRetryDelayStub, + getRetry: getRetryStub, createTaskRunner: () => ({ async run() { throw error; @@ -281,18 +377,20 @@ describe('TaskManagerRunner', () => { await runner.run(); sinon.assert.calledOnce(store.update); - sinon.assert.calledWith(getRetryDelayStub, initialAttempts, error); + sinon.assert.notCalled(getRetryStub); const instance = store.update.args[0][0]; - expect(instance.runAt.getTime()).toEqual(secondsFromNow(retryDelay).getTime()); + const nextIntervalDelay = 60000; // 1m + const expectedRunAt = new Date(Date.now() + nextIntervalDelay); + expect(instance.runAt.getTime()).toEqual(expectedRunAt.getTime()); }); - test('uses getRetryDelay to set retryAt when defined', async () => { + test('uses getRetry (returning date) to set retryAt when defined', async () => { const id = _.random(1, 20).toString(); - const initialAttempts = _.random(0, 2); - const retryDelay = _.random(15, 100); + const initialAttempts = _.random(1, 3); + const nextRetry = new Date(Date.now() + _.random(15, 100) * 1000); const timeoutMinutes = 1; - const getRetryDelayStub = sinon.stub().returns(retryDelay); + const getRetryStub = sinon.stub().returns(nextRetry); const { runner, store } = testOpts({ instance: { id, @@ -302,7 +400,7 @@ describe('TaskManagerRunner', () => { definitions: { bar: { timeout: `${timeoutMinutes}m`, - getRetryDelay: getRetryDelayStub, + getRetry: getRetryStub, createTaskRunner: () => ({ run: async () => undefined, }), @@ -313,14 +411,114 @@ describe('TaskManagerRunner', () => { await runner.claimOwnership(); sinon.assert.calledOnce(store.update); - sinon.assert.calledWith(getRetryDelayStub, initialAttempts + 1); + sinon.assert.calledWith(getRetryStub, initialAttempts + 1); const instance = store.update.args[0][0]; expect(instance.retryAt.getTime()).toEqual( - secondsFromNow(retryDelay).getTime() + timeoutMinutes * 60 * 1000 + new Date(nextRetry.getTime() + timeoutMinutes * 60 * 1000).getTime() ); }); + test('uses getRetry (returning true) to set retryAt when defined', async () => { + const id = _.random(1, 20).toString(); + const initialAttempts = _.random(1, 3); + const timeoutMinutes = 1; + const getRetryStub = sinon.stub().returns(true); + const { runner, store } = testOpts({ + instance: { + id, + attempts: initialAttempts, + interval: undefined, + }, + definitions: { + bar: { + timeout: `${timeoutMinutes}m`, + getRetry: getRetryStub, + createTaskRunner: () => ({ + run: async () => undefined, + }), + }, + }, + }); + + await runner.claimOwnership(); + + sinon.assert.calledOnce(store.update); + sinon.assert.calledWith(getRetryStub, initialAttempts + 1); + const instance = store.update.args[0][0]; + + const attemptDelay = (initialAttempts + 1) * 5 * 60 * 1000; + const timeoutDelay = timeoutMinutes * 60 * 1000; + expect(instance.retryAt.getTime()).toEqual( + new Date(Date.now() + attemptDelay + timeoutDelay).getTime() + ); + }); + + test('uses getRetry (returning false) to set retryAt when defined', async () => { + const id = _.random(1, 20).toString(); + const initialAttempts = _.random(1, 3); + const timeoutMinutes = 1; + const getRetryStub = sinon.stub().returns(false); + const { runner, store } = testOpts({ + instance: { + id, + attempts: initialAttempts, + interval: undefined, + }, + definitions: { + bar: { + timeout: `${timeoutMinutes}m`, + getRetry: getRetryStub, + createTaskRunner: () => ({ + run: async () => undefined, + }), + }, + }, + }); + + await runner.claimOwnership(); + + sinon.assert.calledOnce(store.update); + sinon.assert.calledWith(getRetryStub, initialAttempts + 1); + const instance = store.update.args[0][0]; + + expect(instance.retryAt).toBeNull(); + expect(instance.status).toBe('running'); + }); + + test('bypasses getRetry (returning false) of a recurring task to set retryAt when defined', async () => { + const id = _.random(1, 20).toString(); + const initialAttempts = _.random(1, 3); + const timeoutMinutes = 1; + const getRetryStub = sinon.stub().returns(false); + const { runner, store } = testOpts({ + instance: { + id, + attempts: initialAttempts, + interval: '1m', + startedAt: new Date(), + }, + definitions: { + bar: { + timeout: `${timeoutMinutes}m`, + getRetry: getRetryStub, + createTaskRunner: () => ({ + run: async () => undefined, + }), + }, + }, + }); + + await runner.claimOwnership(); + + sinon.assert.calledOnce(store.update); + sinon.assert.notCalled(getRetryStub); + const instance = store.update.args[0][0]; + + const timeoutDelay = timeoutMinutes * 60 * 1000; + expect(instance.retryAt.getTime()).toEqual(new Date(Date.now() + timeoutDelay).getTime()); + }); + test('Fails non-recurring task when maxAttempts reached', async () => { const id = _.random(1, 20).toString(); const initialAttempts = 3; @@ -355,11 +553,13 @@ describe('TaskManagerRunner', () => { test(`Doesn't fail recurring tasks when maxAttempts reached`, async () => { const id = _.random(1, 20).toString(); const initialAttempts = 3; + const intervalSeconds = 10; const { runner, store } = testOpts({ instance: { id, attempts: initialAttempts, - interval: '10s', + interval: `${intervalSeconds}s`, + startedAt: new Date(), }, definitions: { bar: { @@ -379,7 +579,9 @@ describe('TaskManagerRunner', () => { const instance = store.update.args[0][0]; expect(instance.attempts).toEqual(3); expect(instance.status).toEqual('idle'); - expect(instance.runAt.getTime()).toEqual(minutesFromNow(15).getTime()); + expect(instance.runAt.getTime()).toEqual( + new Date(Date.now() + intervalSeconds * 1000).getTime() + ); }); interface TestOpts { diff --git a/x-pack/legacy/plugins/task_manager/task_runner.ts b/x-pack/legacy/plugins/task_manager/task_runner.ts index 49936488739031..082670817bf1dc 100644 --- a/x-pack/legacy/plugins/task_manager/task_runner.ts +++ b/x-pack/legacy/plugins/task_manager/task_runner.ts @@ -11,7 +11,6 @@ */ import Joi from 'joi'; -import Boom from 'boom'; import { intervalFromDate, intervalFromNow } from './lib/intervals'; import { Logger } from './lib/logger'; import { BeforeRunFunction } from './lib/middleware'; @@ -23,6 +22,7 @@ import { SanitizedTaskDefinition, TaskDictionary, validateRunResult, + TaskStatus, } from './task'; const defaultBackoffPerFailure = 5 * 60 * 1000; @@ -171,17 +171,21 @@ export class TaskManagerRunner implements TaskRunner { const attempts = this.instance.attempts + 1; const now = new Date(); - const timeoutDate = intervalFromNow(this.definition.timeout!)!; - try { this.instance = await this.store.update({ ...this.instance, status: 'running', startedAt: now, attempts, - retryAt: new Date( - timeoutDate.getTime() + this.getRetryDelay(attempts, Boom.clientTimeout()) - ), + retryAt: this.instance.interval + ? intervalFromNow(this.definition.timeout)! + : this.getRetryDelay({ + attempts, + // Fake an error. This allows retry logic when tasks keep timing out + // and lets us set a proper "retryAt" value each time. + error: new Error('Task timeout'), + addDuration: this.definition.timeout, + }), }); return true; @@ -223,7 +227,7 @@ export class TaskManagerRunner implements TaskRunner { // recurring task: update the task instance const startedAt = this.instance.startedAt!; const state = result.state || this.instance.state || {}; - const status = this.getInstanceStatus(); + let status: TaskStatus = this.getInstanceStatus(); let runAt; if (status === 'failed') { @@ -233,7 +237,18 @@ export class TaskManagerRunner implements TaskRunner { runAt = result.runAt; } else if (result.error) { // when result.error is truthy, then we're retrying because it failed - runAt = new Date(Date.now() + this.getRetryDelay(this.instance.attempts, result.error)); + const newRunAt = this.instance.interval + ? intervalFromDate(startedAt, this.instance.interval)! + : this.getRetryDelay({ + attempts: this.instance.attempts, + error: result.error, + }); + if (!newRunAt) { + status = 'failed'; + runAt = this.instance.runAt; + } else { + runAt = newRunAt; + } } else { runAt = intervalFromDate(startedAt, this.instance.interval)!; } @@ -286,11 +301,34 @@ export class TaskManagerRunner implements TaskRunner { return this.instance.attempts < maxAttempts ? 'idle' : 'failed'; } - private getRetryDelay(attempts: number, error: any) { - if (this.definition.getRetryDelay) { - return this.definition.getRetryDelay(attempts, error) * 1000; + private getRetryDelay({ + error, + attempts, + addDuration, + }: { + error: any; + attempts: number; + addDuration?: string; + }): Date | null { + let result = null; + + // Use custom retry logic, if any, otherwise we'll use the default logic + const retry: boolean | Date = this.definition.getRetry + ? this.definition.getRetry(attempts, error) + : true; + + if (retry instanceof Date) { + result = retry; + } else if (retry === true) { + result = new Date(Date.now() + attempts * defaultBackoffPerFailure); } - return attempts * defaultBackoffPerFailure; + + // Add a duration to the result + if (addDuration && result) { + result = intervalFromDate(result, addDuration)!; + } + + return result; } }