Skip to content

Commit

Permalink
[task manager] do not sort tasks to be claimed by score if no pinned …
Browse files Browse the repository at this point in the history
…tasks (#80692) (#81495)

resolves: #80371

Previously, when claiming tasks, we were always sorting the tasks to claim by
the score and then by the time they should be run.  We sort by score to
capture `runNow()` tasks, also referred to internally as "pinned" tasks
in the update by query.

The change in this PR is to only sort by score if there are pinned tasks, and
to not sort by score at all if there aren't any.
  • Loading branch information
pmuellr committed Oct 22, 2020
1 parent 57c31e8 commit 3ed7ac7
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 7 deletions.
21 changes: 20 additions & 1 deletion x-pack/plugins/task_manager/server/task_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,7 @@ describe('TaskStore', () => {

const {
args: {
updateByQuery: { body: { query } = {} },
updateByQuery: { body: { query, sort } = {} },
},
} = await testClaimAvailableTasks({
opts: {
Expand Down Expand Up @@ -476,6 +476,25 @@ describe('TaskStore', () => {
],
},
});
expect(sort).toMatchObject([
{
_script: {
type: 'number',
order: 'asc',
script: {
lang: 'painless',
source: `
if (doc['task.retryAt'].size()!=0) {
return doc['task.retryAt'].value.toInstant().toEpochMilli();
}
if (doc['task.runAt'].size()!=0) {
return doc['task.runAt'].value.toInstant().toEpochMilli();
}
`,
},
},
},
]);
});

test('it supports claiming specific tasks by id', async () => {
Expand Down
19 changes: 13 additions & 6 deletions x-pack/plugins/task_manager/server/task_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import {
RangeFilter,
asPinnedQuery,
matchesClauses,
SortOptions,
} from './queries/query_clauses';

import {
Expand Down Expand Up @@ -272,6 +273,17 @@ export class TaskStore {
)
);

// The documents should be sorted by runAt/retryAt, unless there are pinned
// tasks being queried, in which case we want to sort by score first, and then
// the runAt/retryAt. That way we'll get the pinned tasks first. Note that
// the score seems to favor newer documents rather than older documents, so
// if there are not pinned tasks being queried, we do NOT want to sort by score
// at all, just by runAt/retryAt.
const sort: SortOptions = [SortByRunAtAndRetryAt];
if (claimTasksById && claimTasksById.length) {
sort.unshift('_score');
}

const apmTrans = apm.startTransaction(`taskManager markAvailableTasksAsClaimed`, 'taskManager');
const { updated } = await this.updateByQuery(
asUpdateByQuery({
Expand All @@ -288,12 +300,7 @@ export class TaskStore {
status: 'claiming',
retryAt: claimOwnershipUntil,
}),
sort: [
// sort by score first, so the "pinned" Tasks are first
'_score',
// the nsort by other fields
SortByRunAtAndRetryAt,
],
sort,
}),
{
max_docs: size,
Expand Down

0 comments on commit 3ed7ac7

Please sign in to comment.