From eb03295f858bca0c19a5a40786ccdfc56f845813 Mon Sep 17 00:00:00 2001 From: Gidi Meir Morris Date: Thu, 13 Aug 2020 12:20:38 +0100 Subject: [PATCH] [Task manager] Prevents edge case where already running tasks are reschedule every polling interval (#74606) Fixes flaky tests in Task Manager and Alerting. The fix in #73244 was correct, but it missed an edge case which causes the already running task to be rescheduled over and over. This prevents that edge case which was effecting both TM in general and Alerting specifically. --- .../task_manager/server/task_store.test.ts | 104 +++++++++++++++++- .../plugins/task_manager/server/task_store.ts | 55 ++++----- .../tests/alerting/update.ts | 3 +- .../task_manager/task_manager_integration.js | 3 +- 4 files changed, 125 insertions(+), 40 deletions(-) diff --git a/x-pack/plugins/task_manager/server/task_store.test.ts b/x-pack/plugins/task_manager/server/task_store.test.ts index d65c39f4f454d6..a02123c4a3f8d2 100644 --- a/x-pack/plugins/task_manager/server/task_store.test.ts +++ b/x-pack/plugins/task_manager/server/task_store.test.ts @@ -627,7 +627,7 @@ if (doc['task.runAt'].size()!=0) { }); }); - test('it returns task objects', async () => { + test('it filters out running tasks', async () => { const taskManagerId = uuid.v1(); const claimOwnershipUntil = new Date(Date.now()); const runAt = new Date(); @@ -641,7 +641,7 @@ if (doc['task.runAt'].size()!=0) { taskType: 'foo', schedule: undefined, attempts: 0, - status: 'idle', + status: 'claiming', params: '{ "hello": "world" }', state: '{ "baby": "Henhen" }', user: 'jimbo', @@ -715,7 +715,103 @@ if (doc['task.runAt'].size()!=0) { runAt, scope: ['reporting'], state: { baby: 'Henhen' }, - status: 'idle', + status: 'claiming', + taskType: 'foo', + user: 'jimbo', + ownerId: taskManagerId, + }, + ]); + }); + + test('it returns task objects', async () => { + const taskManagerId = uuid.v1(); + const claimOwnershipUntil = new Date(Date.now()); + const runAt = new Date(); + const tasks = [ + { + _id: 'aaa', + _source: { + type: 'task', + task: { + runAt, + taskType: 'foo', + schedule: undefined, + attempts: 0, + status: 'claiming', + params: '{ "hello": "world" }', + state: '{ "baby": "Henhen" }', + user: 'jimbo', + scope: ['reporting'], + ownerId: taskManagerId, + }, + }, + _seq_no: 1, + _primary_term: 2, + sort: ['a', 1], + }, + { + _id: 'bbb', + _source: { + type: 'task', + task: { + runAt, + taskType: 'bar', + schedule: { interval: '5m' }, + attempts: 2, + status: 'claiming', + params: '{ "shazm": 1 }', + state: '{ "henry": "The 8th" }', + user: 'dabo', + scope: ['reporting', 'ceo'], + ownerId: taskManagerId, + }, + }, + _seq_no: 3, + _primary_term: 4, + sort: ['b', 2], + }, + ]; + const { + result: { docs }, + args: { + search: { + body: { query }, + }, + }, + } = await testClaimAvailableTasks({ + opts: { + taskManagerId, + }, + claimingOpts: { + claimOwnershipUntil, + size: 10, + }, + hits: tasks, + }); + + expect(query.bool.must).toContainEqual({ + bool: { + must: [ + { + term: { + 'task.ownerId': taskManagerId, + }, + }, + { term: { 'task.status': 'claiming' } }, + ], + }, + }); + + expect(docs).toMatchObject([ + { + attempts: 0, + id: 'aaa', + schedule: undefined, + params: { hello: 'world' }, + runAt, + scope: ['reporting'], + state: { baby: 'Henhen' }, + status: 'claiming', taskType: 'foo', user: 'jimbo', ownerId: taskManagerId, @@ -728,7 +824,7 @@ if (doc['task.runAt'].size()!=0) { runAt, scope: ['reporting', 'ceo'], state: { henry: 'The 8th' }, - status: 'running', + status: 'claiming', taskType: 'bar', user: 'dabo', ownerId: taskManagerId, diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index a18fb57b35b3d7..f2da41053e6ab6 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -217,48 +217,39 @@ export class TaskStore { claimTasksByIdWithRawIds, size ); + const docs = numberOfTasksClaimed > 0 ? await this.sweepForClaimedTasks(claimTasksByIdWithRawIds, size) : []; - // emit success/fail events for claimed tasks by id - if (claimTasksById && claimTasksById.length) { - const [documentsReturnedById, documentsClaimedBySchedule] = partition(docs, (doc) => - claimTasksById.includes(doc.id) - ); - - const [documentsClaimedById, documentsRequestedButNotClaimed] = partition( - documentsReturnedById, - // we filter the schduled tasks down by status is 'claiming' in the esearch, - // but we do not apply this limitation on tasks claimed by ID so that we can - // provide more detailed error messages when we fail to claim them - (doc) => doc.status === TaskStatus.Claiming - ); - - const documentsRequestedButNotReturned = difference( - claimTasksById, - map(documentsReturnedById, 'id') - ); + const [documentsReturnedById, documentsClaimedBySchedule] = partition(docs, (doc) => + claimTasksById.includes(doc.id) + ); - this.emitEvents( - [...documentsClaimedById, ...documentsClaimedBySchedule].map((doc) => - asTaskClaimEvent(doc.id, asOk(doc)) - ) - ); + const [documentsClaimedById, documentsRequestedButNotClaimed] = partition( + documentsReturnedById, + // we filter the schduled tasks down by status is 'claiming' in the esearch, + // but we do not apply this limitation on tasks claimed by ID so that we can + // provide more detailed error messages when we fail to claim them + (doc) => doc.status === TaskStatus.Claiming + ); - this.emitEvents( - documentsRequestedButNotClaimed.map((doc) => asTaskClaimEvent(doc.id, asErr(some(doc)))) - ); + const documentsRequestedButNotReturned = difference( + claimTasksById, + map(documentsReturnedById, 'id') + ); - this.emitEvents( - documentsRequestedButNotReturned.map((id) => asTaskClaimEvent(id, asErr(none))) - ); - } + this.emitEvents([ + ...documentsClaimedById.map((doc) => asTaskClaimEvent(doc.id, asOk(doc))), + ...documentsClaimedBySchedule.map((doc) => asTaskClaimEvent(doc.id, asOk(doc))), + ...documentsRequestedButNotClaimed.map((doc) => asTaskClaimEvent(doc.id, asErr(some(doc)))), + ...documentsRequestedButNotReturned.map((id) => asTaskClaimEvent(id, asErr(none))), + ]); return { - claimedTasks: numberOfTasksClaimed, - docs, + claimedTasks: documentsClaimedById.length + documentsClaimedBySchedule.length, + docs: docs.filter((doc) => doc.status === TaskStatus.Claiming), }; }; diff --git a/x-pack/test/alerting_api_integration/security_and_spaces/tests/alerting/update.ts b/x-pack/test/alerting_api_integration/security_and_spaces/tests/alerting/update.ts index cac6355409ac93..ab3a92d0b3f706 100644 --- a/x-pack/test/alerting_api_integration/security_and_spaces/tests/alerting/update.ts +++ b/x-pack/test/alerting_api_integration/security_and_spaces/tests/alerting/update.ts @@ -31,8 +31,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) { .then((response: SupertestResponse) => response.body); } - // FLAKY: https://github.com/elastic/kibana/issues/72803 - describe.skip('update', () => { + describe('update', () => { const objectRemover = new ObjectRemover(supertest); after(() => objectRemover.removeAll()); diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js index ea95eb42dd6ff5..c87a5039360b8f 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js @@ -28,8 +28,7 @@ export default function ({ getService }) { const testHistoryIndex = '.kibana_task_manager_test_result'; const supertest = supertestAsPromised(url.format(config.get('servers.kibana'))); - // FLAKY: https://github.com/elastic/kibana/issues/71390 - describe.skip('scheduling and running tasks', () => { + describe('scheduling and running tasks', () => { beforeEach( async () => await supertest.delete('/api/sample_tasks').set('kbn-xsrf', 'xxx').expect(200) );