Skip to content

Commit

Permalink
[Task manager] Prevents edge case where already running tasks are res…
Browse files Browse the repository at this point in the history
…chedule every polling interval (elastic#74606)

Fixes flaky tests in Task Manager and Alerting.

The fix in elastic#73244 was correct, but it missed an edge case which causes the already running task to be rescheduled over and over.

This prevents that edge case which was effecting both TM in general and Alerting specifically.
  • Loading branch information
gmmorris committed Aug 13, 2020
1 parent 1a09c87 commit eb03295
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 40 deletions.
104 changes: 100 additions & 4 deletions x-pack/plugins/task_manager/server/task_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ if (doc['task.runAt'].size()!=0) {
});
});

test('it returns task objects', async () => {
test('it filters out running tasks', async () => {
const taskManagerId = uuid.v1();
const claimOwnershipUntil = new Date(Date.now());
const runAt = new Date();
Expand All @@ -641,7 +641,7 @@ if (doc['task.runAt'].size()!=0) {
taskType: 'foo',
schedule: undefined,
attempts: 0,
status: 'idle',
status: 'claiming',
params: '{ "hello": "world" }',
state: '{ "baby": "Henhen" }',
user: 'jimbo',
Expand Down Expand Up @@ -715,7 +715,103 @@ if (doc['task.runAt'].size()!=0) {
runAt,
scope: ['reporting'],
state: { baby: 'Henhen' },
status: 'idle',
status: 'claiming',
taskType: 'foo',
user: 'jimbo',
ownerId: taskManagerId,
},
]);
});

test('it returns task objects', async () => {
const taskManagerId = uuid.v1();
const claimOwnershipUntil = new Date(Date.now());
const runAt = new Date();
const tasks = [
{
_id: 'aaa',
_source: {
type: 'task',
task: {
runAt,
taskType: 'foo',
schedule: undefined,
attempts: 0,
status: 'claiming',
params: '{ "hello": "world" }',
state: '{ "baby": "Henhen" }',
user: 'jimbo',
scope: ['reporting'],
ownerId: taskManagerId,
},
},
_seq_no: 1,
_primary_term: 2,
sort: ['a', 1],
},
{
_id: 'bbb',
_source: {
type: 'task',
task: {
runAt,
taskType: 'bar',
schedule: { interval: '5m' },
attempts: 2,
status: 'claiming',
params: '{ "shazm": 1 }',
state: '{ "henry": "The 8th" }',
user: 'dabo',
scope: ['reporting', 'ceo'],
ownerId: taskManagerId,
},
},
_seq_no: 3,
_primary_term: 4,
sort: ['b', 2],
},
];
const {
result: { docs },
args: {
search: {
body: { query },
},
},
} = await testClaimAvailableTasks({
opts: {
taskManagerId,
},
claimingOpts: {
claimOwnershipUntil,
size: 10,
},
hits: tasks,
});

expect(query.bool.must).toContainEqual({
bool: {
must: [
{
term: {
'task.ownerId': taskManagerId,
},
},
{ term: { 'task.status': 'claiming' } },
],
},
});

expect(docs).toMatchObject([
{
attempts: 0,
id: 'aaa',
schedule: undefined,
params: { hello: 'world' },
runAt,
scope: ['reporting'],
state: { baby: 'Henhen' },
status: 'claiming',
taskType: 'foo',
user: 'jimbo',
ownerId: taskManagerId,
Expand All @@ -728,7 +824,7 @@ if (doc['task.runAt'].size()!=0) {
runAt,
scope: ['reporting', 'ceo'],
state: { henry: 'The 8th' },
status: 'running',
status: 'claiming',
taskType: 'bar',
user: 'dabo',
ownerId: taskManagerId,
Expand Down
55 changes: 23 additions & 32 deletions x-pack/plugins/task_manager/server/task_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -217,48 +217,39 @@ export class TaskStore {
claimTasksByIdWithRawIds,
size
);

const docs =
numberOfTasksClaimed > 0
? await this.sweepForClaimedTasks(claimTasksByIdWithRawIds, size)
: [];

// emit success/fail events for claimed tasks by id
if (claimTasksById && claimTasksById.length) {
const [documentsReturnedById, documentsClaimedBySchedule] = partition(docs, (doc) =>
claimTasksById.includes(doc.id)
);

const [documentsClaimedById, documentsRequestedButNotClaimed] = partition(
documentsReturnedById,
// we filter the schduled tasks down by status is 'claiming' in the esearch,
// but we do not apply this limitation on tasks claimed by ID so that we can
// provide more detailed error messages when we fail to claim them
(doc) => doc.status === TaskStatus.Claiming
);

const documentsRequestedButNotReturned = difference(
claimTasksById,
map(documentsReturnedById, 'id')
);
const [documentsReturnedById, documentsClaimedBySchedule] = partition(docs, (doc) =>
claimTasksById.includes(doc.id)
);

this.emitEvents(
[...documentsClaimedById, ...documentsClaimedBySchedule].map((doc) =>
asTaskClaimEvent(doc.id, asOk(doc))
)
);
const [documentsClaimedById, documentsRequestedButNotClaimed] = partition(
documentsReturnedById,
// we filter the schduled tasks down by status is 'claiming' in the esearch,
// but we do not apply this limitation on tasks claimed by ID so that we can
// provide more detailed error messages when we fail to claim them
(doc) => doc.status === TaskStatus.Claiming
);

this.emitEvents(
documentsRequestedButNotClaimed.map((doc) => asTaskClaimEvent(doc.id, asErr(some(doc))))
);
const documentsRequestedButNotReturned = difference(
claimTasksById,
map(documentsReturnedById, 'id')
);

this.emitEvents(
documentsRequestedButNotReturned.map((id) => asTaskClaimEvent(id, asErr(none)))
);
}
this.emitEvents([
...documentsClaimedById.map((doc) => asTaskClaimEvent(doc.id, asOk(doc))),
...documentsClaimedBySchedule.map((doc) => asTaskClaimEvent(doc.id, asOk(doc))),
...documentsRequestedButNotClaimed.map((doc) => asTaskClaimEvent(doc.id, asErr(some(doc)))),
...documentsRequestedButNotReturned.map((id) => asTaskClaimEvent(id, asErr(none))),
]);

return {
claimedTasks: numberOfTasksClaimed,
docs,
claimedTasks: documentsClaimedById.length + documentsClaimedBySchedule.length,
docs: docs.filter((doc) => doc.status === TaskStatus.Claiming),
};
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ export default function createUpdateTests({ getService }: FtrProviderContext) {
.then((response: SupertestResponse) => response.body);
}

// FLAKY: https://github.com/elastic/kibana/issues/72803
describe.skip('update', () => {
describe('update', () => {
const objectRemover = new ObjectRemover(supertest);

after(() => objectRemover.removeAll());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,7 @@ export default function ({ getService }) {
const testHistoryIndex = '.kibana_task_manager_test_result';
const supertest = supertestAsPromised(url.format(config.get('servers.kibana')));

// FLAKY: https://github.com/elastic/kibana/issues/71390
describe.skip('scheduling and running tasks', () => {
describe('scheduling and running tasks', () => {
beforeEach(
async () => await supertest.delete('/api/sample_tasks').set('kbn-xsrf', 'xxx').expect(200)
);
Expand Down

0 comments on commit eb03295

Please sign in to comment.