Skip to content

Commit

Permalink
Modify task manager getRetryDelay to be more custom (#42064) (#42136)
Browse files Browse the repository at this point in the history
* Modify task manager getRetryDelay to be more custom

* Ensure intervals don't die

* Update comment

* Update comment pt2

* Fix type check

* Intervals to not have retry logic

* Make test easier to read

* Modify docs

* Rename confusing variable

* Apply PR feedback

* Change error to task timeout error

* Re-add comment

* Fix type check
  • Loading branch information
mikecote committed Jul 29, 2019
1 parent e80c17d commit fefa154
Show file tree
Hide file tree
Showing 3 changed files with 276 additions and 33 deletions.
11 changes: 7 additions & 4 deletions x-pack/legacy/plugins/task_manager/task.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,13 @@ export interface TaskDefinition {
maxAttempts?: number;

/**
* Function that returns the delay in seconds to wait before attempting the
* failed task again.
* Function that customizes how the task should behave when the task fails. This
* function can return `true`, `false` or a Date. True will tell task manager
* to retry using default delay logic. False will tell task manager to stop retrying
* this task. A Date will suggest to the task manager when the task should retry.
* This function isn't used for interval type tasks, those retry at the next interval.
*/
getRetryDelay?: (attempts: number, error: object) => number;
getRetry?: (attempts: number, error: object) => boolean | Date;

/**
* The number of workers / slots a running instance of this task occupies.
Expand Down Expand Up @@ -145,7 +148,7 @@ export const validateTaskDefinition = Joi.object({
.min(1)
.default(1),
createTaskRunner: Joi.func().required(),
getRetryDelay: Joi.func().optional(),
getRetry: Joi.func().optional(),
}).default();

/**
Expand Down
236 changes: 219 additions & 17 deletions x-pack/legacy/plugins/task_manager/task_runner.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import _ from 'lodash';
import sinon from 'sinon';
import { minutesFromNow, secondsFromNow } from './lib/intervals';
import { minutesFromNow } from './lib/intervals';
import { ConcreteTaskInstance } from './task';
import { TaskManagerRunner } from './task_runner';

Expand Down Expand Up @@ -255,20 +255,116 @@ describe('TaskManagerRunner', () => {
);
});

test('uses getRetryDelay function on error when defined', async () => {
const initialAttempts = _.random(0, 2);
const retryDelay = _.random(15, 100);
// Scenario: the task's run() throws and the definition's getRetry stub returns a Date.
// Expectation: the store is updated exactly once and the saved runAt equals that Date.
test('uses getRetry function (returning date) on error when defined', async () => {
const initialAttempts = _.random(1, 3);
// Retry date somewhere between 15 and 100 seconds from now.
const nextRetry = new Date(Date.now() + _.random(15, 100) * 1000);
const id = Date.now().toString();
const getRetryStub = sinon.stub().returns(nextRetry);
const error = new Error('Dangit!');
const { runner, store } = testOpts({
instance: {
id,
attempts: initialAttempts,
},
definitions: {
bar: {
getRetry: getRetryStub,
createTaskRunner: () => ({
async run() {
// Always fail so the error-handling path is exercised.
throw error;
},
}),
},
},
});

await runner.run();

sinon.assert.calledOnce(store.update);
// getRetry must receive the instance's attempt count and the thrown error.
sinon.assert.calledWith(getRetryStub, initialAttempts, error);
// First argument of the first store.update call is the task document being saved.
const instance = store.update.args[0][0];

expect(instance.runAt.getTime()).toEqual(nextRetry.getTime());
});

// Scenario: the task's run() throws and the definition's getRetry stub returns `true`.
// Expectation: the store is updated exactly once and the saved runAt is
// `initialAttempts * 5` minutes from now.
test('uses getRetry function (returning true) on error when defined', async () => {
const initialAttempts = _.random(1, 3);
const id = Date.now().toString();
const getRetryStub = sinon.stub().returns(true);
const error = new Error('Dangit!');
const { runner, store } = testOpts({
instance: {
id,
attempts: initialAttempts,
},
definitions: {
bar: {
getRetry: getRetryStub,
createTaskRunner: () => ({
async run() {
// Always fail so the error-handling path is exercised.
throw error;
},
}),
},
},
});

await runner.run();

sinon.assert.calledOnce(store.update);
// getRetry must receive the instance's attempt count and the thrown error.
sinon.assert.calledWith(getRetryStub, initialAttempts, error);
// First argument of the first store.update call is the task document being saved.
const instance = store.update.args[0][0];

// NOTE(review): exact-millisecond equality against Date.now() presumably relies on a
// frozen clock configured outside this view — confirm.
const expectedRunAt = new Date(Date.now() + initialAttempts * 5 * 60 * 1000);
expect(instance.runAt.getTime()).toEqual(expectedRunAt.getTime());
});

// Scenario: the task's run() throws and the definition's getRetry stub returns `false`.
// Expectation: the store is updated exactly once and the saved status is 'failed'.
test('uses getRetry function (returning false) on error when defined', async () => {
const initialAttempts = _.random(1, 3);
const id = Date.now().toString();
const getRetryStub = sinon.stub().returns(false);
const error = new Error('Dangit!');
const { runner, store } = testOpts({
instance: {
id,
attempts: initialAttempts,
},
definitions: {
bar: {
getRetry: getRetryStub,
createTaskRunner: () => ({
async run() {
// Always fail so the error-handling path is exercised.
throw error;
},
}),
},
},
});

await runner.run();

sinon.assert.calledOnce(store.update);
// getRetry must receive the instance's attempt count and the thrown error.
sinon.assert.calledWith(getRetryStub, initialAttempts, error);
// First argument of the first store.update call is the task document being saved.
const instance = store.update.args[0][0];

expect(instance.status).toBe('failed');
});

test('bypasses getRetry function (returning false) on error of a recurring task', async () => {
const initialAttempts = _.random(1, 3);
const id = Date.now().toString();
const getRetryDelayStub = sinon.stub().returns(retryDelay);
const getRetryStub = sinon.stub().returns(false);
const error = new Error('Dangit!');
const { runner, store } = testOpts({
instance: {
id,
attempts: initialAttempts,
interval: '1m',
startedAt: new Date(),
},
definitions: {
bar: {
getRetryDelay: getRetryDelayStub,
getRetry: getRetryStub,
createTaskRunner: () => ({
async run() {
throw error;
Expand All @@ -281,18 +377,20 @@ describe('TaskManagerRunner', () => {
await runner.run();

sinon.assert.calledOnce(store.update);
sinon.assert.calledWith(getRetryDelayStub, initialAttempts, error);
sinon.assert.notCalled(getRetryStub);
const instance = store.update.args[0][0];

expect(instance.runAt.getTime()).toEqual(secondsFromNow(retryDelay).getTime());
const nextIntervalDelay = 60000; // 1m
const expectedRunAt = new Date(Date.now() + nextIntervalDelay);
expect(instance.runAt.getTime()).toEqual(expectedRunAt.getTime());
});

test('uses getRetryDelay to set retryAt when defined', async () => {
test('uses getRetry (returning date) to set retryAt when defined', async () => {
const id = _.random(1, 20).toString();
const initialAttempts = _.random(0, 2);
const retryDelay = _.random(15, 100);
const initialAttempts = _.random(1, 3);
const nextRetry = new Date(Date.now() + _.random(15, 100) * 1000);
const timeoutMinutes = 1;
const getRetryDelayStub = sinon.stub().returns(retryDelay);
const getRetryStub = sinon.stub().returns(nextRetry);
const { runner, store } = testOpts({
instance: {
id,
Expand All @@ -302,7 +400,7 @@ describe('TaskManagerRunner', () => {
definitions: {
bar: {
timeout: `${timeoutMinutes}m`,
getRetryDelay: getRetryDelayStub,
getRetry: getRetryStub,
createTaskRunner: () => ({
run: async () => undefined,
}),
Expand All @@ -313,14 +411,114 @@ describe('TaskManagerRunner', () => {
await runner.claimOwnership();

sinon.assert.calledOnce(store.update);
sinon.assert.calledWith(getRetryDelayStub, initialAttempts + 1);
sinon.assert.calledWith(getRetryStub, initialAttempts + 1);
const instance = store.update.args[0][0];

expect(instance.retryAt.getTime()).toEqual(
secondsFromNow(retryDelay).getTime() + timeoutMinutes * 60 * 1000
new Date(nextRetry.getTime() + timeoutMinutes * 60 * 1000).getTime()
);
});

// Scenario: claiming ownership of a task with no interval whose getRetry stub returns `true`.
// Expectation: getRetry is called with `initialAttempts + 1`, and the saved retryAt is
// now + `(initialAttempts + 1) * 5` minutes + the definition's timeout.
test('uses getRetry (returning true) to set retryAt when defined', async () => {
const id = _.random(1, 20).toString();
const initialAttempts = _.random(1, 3);
const timeoutMinutes = 1;
const getRetryStub = sinon.stub().returns(true);
const { runner, store } = testOpts({
instance: {
id,
attempts: initialAttempts,
interval: undefined,
},
definitions: {
bar: {
timeout: `${timeoutMinutes}m`,
getRetry: getRetryStub,
createTaskRunner: () => ({
run: async () => undefined,
}),
},
},
});

await runner.claimOwnership();

sinon.assert.calledOnce(store.update);
sinon.assert.calledWith(getRetryStub, initialAttempts + 1);
// First argument of the first store.update call is the task document being saved.
const instance = store.update.args[0][0];

// NOTE(review): exact-millisecond equality against Date.now() presumably relies on a
// frozen clock configured outside this view — confirm.
const attemptDelay = (initialAttempts + 1) * 5 * 60 * 1000;
const timeoutDelay = timeoutMinutes * 60 * 1000;
expect(instance.retryAt.getTime()).toEqual(
new Date(Date.now() + attemptDelay + timeoutDelay).getTime()
);
});

// Scenario: claiming ownership of a task with no interval whose getRetry stub returns `false`.
// Expectation: getRetry is called with `initialAttempts + 1`, the saved retryAt is null,
// and the saved status is 'running'.
test('uses getRetry (returning false) to set retryAt when defined', async () => {
const id = _.random(1, 20).toString();
const initialAttempts = _.random(1, 3);
const timeoutMinutes = 1;
const getRetryStub = sinon.stub().returns(false);
const { runner, store } = testOpts({
instance: {
id,
attempts: initialAttempts,
interval: undefined,
},
definitions: {
bar: {
timeout: `${timeoutMinutes}m`,
getRetry: getRetryStub,
createTaskRunner: () => ({
run: async () => undefined,
}),
},
},
});

await runner.claimOwnership();

sinon.assert.calledOnce(store.update);
sinon.assert.calledWith(getRetryStub, initialAttempts + 1);
// First argument of the first store.update call is the task document being saved.
const instance = store.update.args[0][0];

expect(instance.retryAt).toBeNull();
expect(instance.status).toBe('running');
});

// Scenario: claiming ownership of a recurring task (interval '1m') whose definition has a
// getRetry stub returning `false`.
// Expectation: getRetry is never called, and the saved retryAt is now + the definition's timeout.
test('bypasses getRetry (returning false) of a recurring task to set retryAt when defined', async () => {
const id = _.random(1, 20).toString();
const initialAttempts = _.random(1, 3);
const timeoutMinutes = 1;
const getRetryStub = sinon.stub().returns(false);
const { runner, store } = testOpts({
instance: {
id,
attempts: initialAttempts,
interval: '1m',
startedAt: new Date(),
},
definitions: {
bar: {
timeout: `${timeoutMinutes}m`,
getRetry: getRetryStub,
createTaskRunner: () => ({
run: async () => undefined,
}),
},
},
});

await runner.claimOwnership();

sinon.assert.calledOnce(store.update);
sinon.assert.notCalled(getRetryStub);
// First argument of the first store.update call is the task document being saved.
const instance = store.update.args[0][0];

// NOTE(review): exact-millisecond equality against Date.now() presumably relies on a
// frozen clock configured outside this view — confirm.
const timeoutDelay = timeoutMinutes * 60 * 1000;
expect(instance.retryAt.getTime()).toEqual(new Date(Date.now() + timeoutDelay).getTime());
});

test('Fails non-recurring task when maxAttempts reached', async () => {
const id = _.random(1, 20).toString();
const initialAttempts = 3;
Expand Down Expand Up @@ -355,11 +553,13 @@ describe('TaskManagerRunner', () => {
test(`Doesn't fail recurring tasks when maxAttempts reached`, async () => {
const id = _.random(1, 20).toString();
const initialAttempts = 3;
const intervalSeconds = 10;
const { runner, store } = testOpts({
instance: {
id,
attempts: initialAttempts,
interval: '10s',
interval: `${intervalSeconds}s`,
startedAt: new Date(),
},
definitions: {
bar: {
Expand All @@ -379,7 +579,9 @@ describe('TaskManagerRunner', () => {
const instance = store.update.args[0][0];
expect(instance.attempts).toEqual(3);
expect(instance.status).toEqual('idle');
expect(instance.runAt.getTime()).toEqual(minutesFromNow(15).getTime());
expect(instance.runAt.getTime()).toEqual(
new Date(Date.now() + intervalSeconds * 1000).getTime()
);
});

interface TestOpts {
Expand Down
Loading

0 comments on commit fefa154

Please sign in to comment.