diff --git a/x-pack/plugins/task_manager/server/lib/intervals.test.ts b/x-pack/plugins/task_manager/server/lib/intervals.test.ts index 147e41e1a9d60..efef05843cb40 100644 --- a/x-pack/plugins/task_manager/server/lib/intervals.test.ts +++ b/x-pack/plugins/task_manager/server/lib/intervals.test.ts @@ -14,6 +14,7 @@ import { secondsFromNow, secondsFromDate, asInterval, + maxIntervalFromDate, } from './intervals'; let fakeTimer: sinon.SinonFakeTimers; @@ -159,6 +160,44 @@ describe('taskIntervals', () => { }); }); + describe('maxIntervalFromDate', () => { + test('it handles a single interval', () => { + const mins = _.random(1, 100); + const now = new Date(); + const expected = now.getTime() + mins * 60 * 1000; + expect(maxIntervalFromDate(now, `${mins}m`)!.getTime()).toEqual(expected); + }); + + test('it handles multiple intervals', () => { + const mins = _.random(1, 100); + const maxMins = mins + _.random(1, 100); + const now = new Date(); + const expected = now.getTime() + maxMins * 60 * 1000; + expect(maxIntervalFromDate(now, `${mins}m`, `${maxMins}m`)!.getTime()).toEqual(expected); + }); + + test('it handles multiple mixed type intervals', () => { + const mins = _.random(1, 100); + const seconds = _.random(1, 100); + const maxSeconds = Math.max(mins * 60, seconds) + _.random(1, 100); + const now = new Date(); + const expected = now.getTime() + maxSeconds * 1000; + expect( + maxIntervalFromDate(now, `${mins}m`, `${maxSeconds}s`, `${seconds}s`)!.getTime() + ).toEqual(expected); + }); + + test('it handles undefined intervals', () => { + const mins = _.random(1, 100); + const maxMins = mins + _.random(1, 100); + const now = new Date(); + const expected = now.getTime() + maxMins * 60 * 1000; + expect(maxIntervalFromDate(now, `${mins}m`, undefined, `${maxMins}m`)!.getTime()).toEqual( + expected + ); + }); + }); + describe('intervalFromDate', () => { test('it returns the given date plus n minutes', () => { const originalDate = new Date(2019, 1, 1); diff --git a/x-pack/plugins/task_manager/server/lib/intervals.ts b/x-pack/plugins/task_manager/server/lib/intervals.ts index 94537277123ee..da04dffa4b5d1 100644 --- a/x-pack/plugins/task_manager/server/lib/intervals.ts +++ b/x-pack/plugins/task_manager/server/lib/intervals.ts @@ -4,7 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ -import { memoize } from 'lodash'; +import { isString, memoize } from 'lodash'; export enum IntervalCadence { Minute = 'm', @@ -57,6 +57,16 @@ export function intervalFromDate(date: Date, interval?: string): Date | undefine return secondsFromDate(date, parseIntervalAsSecond(interval)); } +export function maxIntervalFromDate( + date: Date, + ...intervals: Array +): Date | undefined { + const maxSeconds = Math.max(...intervals.filter(isString).map(parseIntervalAsSecond)); + if (!isNaN(maxSeconds)) { + return secondsFromDate(date, maxSeconds); + } +} + /** * Returns a date that is secs seconds from now. * diff --git a/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts b/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts index f5e2d3d96bc42..3777d89ce63dd 100644 --- a/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts +++ b/x-pack/plugins/task_manager/server/task_running/task_runner.test.ts @@ -393,6 +393,102 @@ describe('TaskManagerRunner', () => { ); }); + test('calculates retryAt by schedule when running a recurring task', async () => { + const intervalMinutes = 10; + const id = _.random(1, 20).toString(); + const initialAttempts = _.random(0, 2); + const { runner, store } = testOpts({ + instance: { + id, + attempts: initialAttempts, + schedule: { + interval: `${intervalMinutes}m`, + }, + }, + definitions: { + bar: { + title: 'Bar!', + createTaskRunner: () => ({ + run: async () => undefined, + }), + }, + }, + }); + + await runner.markTaskAsRunning(); + + sinon.assert.calledOnce(store.update); + const instance = store.update.args[0][0]; + + expect(instance.retryAt.getTime()).toEqual( + instance.startedAt.getTime() + intervalMinutes * 60 * 1000 + ); + }); + + test('calculates retryAt by default timout when it exceeds the schedule of a recurring task', async () => { + const intervalSeconds = 20; + const id = _.random(1, 20).toString(); + const initialAttempts = _.random(0, 2); + const { runner, store } = testOpts({ + instance: { + id, + attempts: initialAttempts, + schedule: { + interval: `${intervalSeconds}s`, + }, + }, + definitions: { + bar: { + title: 'Bar!', + createTaskRunner: () => ({ + run: async () => undefined, + }), + }, + }, + }); + + await runner.markTaskAsRunning(); + + sinon.assert.calledOnce(store.update); + const instance = store.update.args[0][0]; + + expect(instance.retryAt.getTime()).toEqual(instance.startedAt.getTime() + 5 * 60 * 1000); + }); + + test('calculates retryAt by timeout if it exceeds the schedule when running a recurring task', async () => { + const timeoutMinutes = 1; + const intervalSeconds = 20; + const id = _.random(1, 20).toString(); + const initialAttempts = _.random(0, 2); + const { runner, store } = testOpts({ + instance: { + id, + attempts: initialAttempts, + schedule: { + interval: `${intervalSeconds}s`, + }, + }, + definitions: { + bar: { + title: 'Bar!', + timeout: `${timeoutMinutes}m`, + createTaskRunner: () => ({ + run: async () => undefined, + }), + }, + }, + }); + + await runner.markTaskAsRunning(); + + sinon.assert.calledOnce(store.update); + const instance = store.update.args[0][0]; + + expect(instance.retryAt.getTime()).toEqual( + instance.startedAt.getTime() + timeoutMinutes * 60 * 1000 + ); + }); + test('uses getRetry function (returning date) on error when defined', async () => { const initialAttempts = _.random(1, 3); const nextRetry = new Date(Date.now() + _.random(15, 100) * 1000); diff --git a/x-pack/plugins/task_manager/server/task_running/task_runner.ts b/x-pack/plugins/task_manager/server/task_running/task_runner.ts index fb7a28c8f402c..23d21d205ec26 100644 --- a/x-pack/plugins/task_manager/server/task_running/task_runner.ts +++ b/x-pack/plugins/task_manager/server/task_running/task_runner.ts @@ -26,7 +26,7 @@ import { startTaskTimer, TaskTiming, } from '../task_events'; -import { intervalFromDate, intervalFromNow } from '../lib/intervals'; +import { intervalFromDate, maxIntervalFromDate } from '../lib/intervals'; import { CancelFunction, CancellableTask, @@ -259,15 +259,16 @@ export class TaskManagerRunner implements TaskRunner { status: TaskStatus.Running, startedAt: now, attempts, - retryAt: this.instance.schedule - ? intervalFromNow(this.definition.timeout)! - : this.getRetryDelay({ - attempts, - // Fake an error. This allows retry logic when tasks keep timing out - // and lets us set a proper "retryAt" value each time. - error: new Error('Task timeout'), - addDuration: this.definition.timeout, - }) ?? null, + retryAt: + (this.instance.schedule + ? maxIntervalFromDate(now, this.instance.schedule!.interval, this.definition.timeout) + : this.getRetryDelay({ + attempts, + // Fake an error. This allows retry logic when tasks keep timing out + // and lets us set a proper "retryAt" value each time. + error: new Error('Task timeout'), + addDuration: this.definition.timeout, + })) ?? null, }); const timeUntilClaimExpiresAfterUpdate = howManyMsUntilOwnershipClaimExpires( diff --git a/x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/plugin.ts b/x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/plugin.ts index b5d2c98d8cbcd..0326adb90775a 100644 --- a/x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/plugin.ts +++ b/x-pack/test/plugin_api_integration/plugins/sample_task_plugin/server/plugin.ts @@ -115,6 +115,17 @@ export class SampleTaskManagerFixturePlugin }, }), }, + sampleRecurringTaskWhichHangs: { + title: 'Sample Recurring Task that Hangs for a minute', + description: 'A sample task that Hangs for a minute on each run.', + maxAttempts: 3, + timeout: '60s', + createTaskRunner: () => ({ + async run() { + return await new Promise((resolve) => {}); + }, + }), + }, sampleOneTimeTaskTimingOut: { title: 'Sample One-Time Task that Times Out', description: 'A sample task that times out each run.', diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts index f34cb7594d288..7f4585fad4729 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_management.ts @@ -260,6 +260,28 @@ export default function ({ getService }: FtrProviderContext) { }); }); + it('should schedule the retry of recurring tasks to run at the next schedule when they time out', async () => { + const intervalInMinutes = 30; + const intervalInMilliseconds = intervalInMinutes * 60 * 1000; + const task = await scheduleTask({ + taskType: 'sampleRecurringTaskWhichHangs', + schedule: { interval: `${intervalInMinutes}m` }, + params: {}, + }); + + await retry.try(async () => { + const [scheduledTask] = (await currentTasks()).docs; + expect(scheduledTask.id).to.eql(task.id); + const retryAt = Date.parse(scheduledTask.retryAt!); + expect(isNaN(retryAt)).to.be(false); + + const buffer = 10000; // 10 second buffer + const retryDelay = retryAt - Date.parse(task.runAt); + expect(retryDelay).to.be.greaterThan(intervalInMilliseconds - buffer); + expect(retryDelay).to.be.lessThan(intervalInMilliseconds + buffer); + }); + }); + it('should reschedule if task returns runAt', async () => { const nextRunMilliseconds = _.random(60000, 200000); const count = _.random(1, 20);