From 3ed7ac79e5042a81511ed43a8dc0164819101415 Mon Sep 17 00:00:00 2001 From: Patrick Mueller Date: Thu, 22 Oct 2020 13:38:11 -0400 Subject: [PATCH] [task manager] do not sort tasks to be claimed by score if no pinned tasks (#80692) (#81495) resolves: https://github.com/elastic/kibana/issues/80371 Previously, when claiming tasks, we were always sorting the tasks to claim by the score and then by the time they should be run. We sort by score to capture `runNow()` tasks, also referred to internally as "pinned" tasks in the update by query. The change in this PR is to only sort by score if there are pinned tasks, and to not sort by score at all if there aren't any. --- .../task_manager/server/task_store.test.ts | 21 ++++++++++++++++++- .../plugins/task_manager/server/task_store.ts | 19 +++++++++++------ 2 files changed, 33 insertions(+), 7 deletions(-) diff --git a/x-pack/plugins/task_manager/server/task_store.test.ts b/x-pack/plugins/task_manager/server/task_store.test.ts index 8d47d3dd30b825..a40df3b84132ed 100644 --- a/x-pack/plugins/task_manager/server/task_store.test.ts +++ b/x-pack/plugins/task_manager/server/task_store.test.ts @@ -367,7 +367,7 @@ describe('TaskStore', () => { const { args: { - updateByQuery: { body: { query } = {} }, + updateByQuery: { body: { query, sort } = {} }, }, } = await testClaimAvailableTasks({ opts: { @@ -476,6 +476,25 @@ describe('TaskStore', () => { ], }, }); + expect(sort).toMatchObject([ + { + _script: { + type: 'number', + order: 'asc', + script: { + lang: 'painless', + source: ` +if (doc['task.retryAt'].size()!=0) { + return doc['task.retryAt'].value.toInstant().toEpochMilli(); +} +if (doc['task.runAt'].size()!=0) { + return doc['task.runAt'].value.toInstant().toEpochMilli(); +} + `, + }, + }, + }, + ]); }); test('it supports claiming specific tasks by id', async () => { diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index 4c41be9577ad00..63b6ab7412ec53 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -46,6 +46,7 @@ import { RangeFilter, asPinnedQuery, matchesClauses, + SortOptions, } from './queries/query_clauses'; import { @@ -272,6 +273,17 @@ export class TaskStore { ) ); + // The documents should be sorted by runAt/retryAt, unless there are pinned + // tasks being queried, in which case we want to sort by score first, and then + // the runAt/retryAt. That way we'll get the pinned tasks first. Note that + // the score seems to favor newer documents rather than older documents, so + // if there are not pinned tasks being queried, we do NOT want to sort by score + // at all, just by runAt/retryAt. + const sort: SortOptions = [SortByRunAtAndRetryAt]; + if (claimTasksById && claimTasksById.length) { + sort.unshift('_score'); + } + const apmTrans = apm.startTransaction(`taskManager markAvailableTasksAsClaimed`, 'taskManager'); const { updated } = await this.updateByQuery( asUpdateByQuery({ @@ -288,12 +300,7 @@ export class TaskStore { status: 'claiming', retryAt: claimOwnershipUntil, }), - sort: [ - // sort by score first, so the "pinned" Tasks are first - '_score', - // the nsort by other fields - SortByRunAtAndRetryAt, - ], + sort, }), { max_docs: size,