diff --git a/x-pack/plugins/task_manager/server/task_store.test.ts b/x-pack/plugins/task_manager/server/task_store.test.ts index 81d72c68b3a9e4..a2a0ee11380ff6 100644 --- a/x-pack/plugins/task_manager/server/task_store.test.ts +++ b/x-pack/plugins/task_manager/server/task_store.test.ts @@ -269,12 +269,13 @@ describe('TaskStore', () => { opts = {}, hits = generateFakeTasks(1), claimingOpts, + versionConflicts = 2, }: { opts: Partial; hits?: unknown[]; claimingOpts: OwnershipClaimingOpts; + versionConflicts?: number; }) { - const versionConflicts = 2; const esClient = elasticsearchServiceMock.createClusterClient().asInternalUser; esClient.search.mockResolvedValue(asApiResponse({ hits: { hits } })); esClient.updateByQuery.mockResolvedValue( @@ -971,6 +972,77 @@ if (doc['task.runAt'].size()!=0) { ]); }); + test('it returns version_conflicts that do not include conflicts that were proceeded against', async () => { + const taskManagerId = uuid.v1(); + const claimOwnershipUntil = new Date(Date.now()); + const runAt = new Date(); + const tasks = [ + { + _id: 'task:aaa', + _source: { + type: 'task', + task: { + runAt, + taskType: 'foo', + schedule: undefined, + attempts: 0, + status: 'claiming', + params: '{ "hello": "world" }', + state: '{ "baby": "Henhen" }', + user: 'jimbo', + scope: ['reporting'], + ownerId: taskManagerId, + }, + }, + _seq_no: 1, + _primary_term: 2, + sort: ['a', 1], + }, + { + _id: 'task:bbb', + _source: { + type: 'task', + task: { + runAt, + taskType: 'bar', + schedule: { interval: '5m' }, + attempts: 2, + status: 'claiming', + params: '{ "shazm": 1 }', + state: '{ "henry": "The 8th" }', + user: 'dabo', + scope: ['reporting', 'ceo'], + ownerId: taskManagerId, + }, + }, + _seq_no: 3, + _primary_term: 4, + sort: ['b', 2], + }, + ]; + const maxDocs = 10; + const { + result: { stats: { tasksUpdated, tasksConflicted, tasksClaimed } = {} } = {}, + } = await testClaimAvailableTasks({ + opts: { + taskManagerId, + }, + claimingOpts: { + claimOwnershipUntil, + size: maxDocs, + }, + hits: tasks, + // assume there were 20 version conflists, but thanks to `conflicts="proceed"` + // we proceeded to claim tasks + versionConflicts: 20, + }); + + expect(tasksUpdated).toEqual(2); + // ensure we only count conflicts that *may* have counted against max_docs, no more than that + expect(tasksConflicted).toEqual(10 - tasksUpdated!); + expect(tasksClaimed).toEqual(2); + }); + test('pushes error from saved objects client to errors$', async () => { const esClient = elasticsearchServiceMock.createClusterClient().asInternalUser; const store = new TaskStore({ diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index 5d17c6246088a3..4b02e35c615826 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -529,7 +529,7 @@ export class TaskStore { private async updateByQuery( opts: UpdateByQuerySearchOpts = {}, // eslint-disable-next-line @typescript-eslint/naming-convention - { max_docs }: UpdateByQueryOpts = {} + { max_docs: max_docs }: UpdateByQueryOpts = {} ): Promise { const { query } = ensureQueryOnlyReturnsTaskObjects(opts); try { @@ -548,10 +548,22 @@ export class TaskStore { }, }); + /** + * When we run updateByQuery with conflicts='proceed', it's possible for the `version_conflicts` + * to count against the specified `max_docs`, as per https://github.com/elastic/elasticsearch/issues/63671 + * In order to correct for that happening, we only count `version_conflicts` if we haven't updated as + * many docs as we could have. + * This is still no more than an estimation, as there might have been less docuemnt to update that the + * `max_docs`, but we bias in favour of over zealous `version_conflicts` as that's the best indicator we + * have for an unhealthy cluster distribution of Task Manager polling intervals + */ + const conflictsCorrectedForContinuation = + max_docs && version_conflicts + updated > max_docs ? max_docs - updated : version_conflicts; + return { total, updated, - version_conflicts, + version_conflicts: conflictsCorrectedForContinuation, }; } catch (e) { this.errors$.next(e);