diff --git a/x-pack/plugins/task_manager/server/task_events.ts b/x-pack/plugins/task_manager/server/task_events.ts index b17a3636c17300..e1dd85f868cdd2 100644 --- a/x-pack/plugins/task_manager/server/task_events.ts +++ b/x-pack/plugins/task_manager/server/task_events.ts @@ -4,6 +4,8 @@ * you may not use this file except in compliance with the Elastic License. */ +import { Option } from 'fp-ts/lib/Option'; + import { ConcreteTaskInstance } from './task'; import { Result, Err } from './lib/result_type'; @@ -22,7 +24,7 @@ export interface TaskEvent { } export type TaskMarkRunning = TaskEvent; export type TaskRun = TaskEvent; -export type TaskClaim = TaskEvent; +export type TaskClaim = TaskEvent>; export type TaskRunRequest = TaskEvent; export function asTaskMarkRunningEvent( @@ -46,7 +48,7 @@ export function asTaskRunEvent(id: string, event: Result + event: Result> ): TaskClaim { return { id, diff --git a/x-pack/plugins/task_manager/server/task_manager.test.ts b/x-pack/plugins/task_manager/server/task_manager.test.ts index 07be25f0c72bd0..7035971ad60610 100644 --- a/x-pack/plugins/task_manager/server/task_manager.test.ts +++ b/x-pack/plugins/task_manager/server/task_manager.test.ts @@ -7,6 +7,7 @@ import _ from 'lodash'; import sinon from 'sinon'; import { Subject } from 'rxjs'; +import { none } from 'fp-ts/lib/Option'; import { asTaskMarkRunningEvent, @@ -325,7 +326,7 @@ describe('TaskManager', () => { const result = awaitTaskRunResult(id, events$, getLifecycle); - events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim')))); + events$.next(asTaskClaimEvent(id, asErr(none))); await expect(result).rejects.toEqual( new Error(`Failed to run task "${id}" as it does not exist`) @@ -341,7 +342,7 @@ describe('TaskManager', () => { const result = awaitTaskRunResult(id, events$, getLifecycle); - events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim')))); + events$.next(asTaskClaimEvent(id, asErr(none))); await expect(result).rejects.toEqual( new Error(`Failed to run task "${id}" as it is currently running`) @@ -357,7 +358,7 @@ describe('TaskManager', () => { const result = awaitTaskRunResult(id, events$, getLifecycle); - events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim')))); + events$.next(asTaskClaimEvent(id, asErr(none))); await expect(result).rejects.toEqual( new Error(`Failed to run task "${id}" as it is currently running`) @@ -390,7 +391,7 @@ describe('TaskManager', () => { const result = awaitTaskRunResult(id, events$, getLifecycle); - events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim')))); + events$.next(asTaskClaimEvent(id, asErr(none))); await expect(result).rejects.toMatchInlineSnapshot( `[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2" for unknown reason (Current Task Lifecycle is "idle")]` @@ -406,7 +407,7 @@ describe('TaskManager', () => { const result = awaitTaskRunResult(id, events$, getLifecycle); - events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim')))); + events$.next(asTaskClaimEvent(id, asErr(none))); await expect(result).rejects.toMatchInlineSnapshot( `[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2" for unknown reason (Current Task Lifecycle is "failed")]` diff --git a/x-pack/plugins/task_manager/server/task_manager.ts b/x-pack/plugins/task_manager/server/task_manager.ts index 01d3640c04384a..7165fd28678c16 100644 --- a/x-pack/plugins/task_manager/server/task_manager.ts +++ b/x-pack/plugins/task_manager/server/task_manager.ts @@ -9,13 +9,14 @@ import { filter } from 'rxjs/operators'; import { performance } from 'perf_hooks'; import { pipe } from 'fp-ts/lib/pipeable'; -import { Option, some, map as mapOptional } from 'fp-ts/lib/Option'; +import { Option, some, map as mapOptional, getOrElse } from 'fp-ts/lib/Option'; + import { SavedObjectsSerializer, ILegacyScopedClusterClient, ISavedObjectsRepository, } from '../../../../src/core/server'; -import { Result, asErr, either, map, mapErr, promiseResult } from './lib/result_type'; +import { Result, asOk, asErr, either, map, mapErr, promiseResult } from './lib/result_type'; import { TaskManagerConfig } from './config'; import { Logger } from './types'; @@ -405,7 +406,9 @@ export async function claimAvailableTasks( if (docs.length !== claimedTasks) { logger.warn( - `[Task Ownership error]: (${claimedTasks}) tasks were claimed by Kibana, but (${docs.length}) tasks were fetched` + `[Task Ownership error]: ${claimedTasks} tasks were claimed by Kibana, but ${ + docs.length + } task(s) were fetched (${docs.map((doc) => doc.id).join(', ')})` ); } return docs; @@ -437,56 +440,65 @@ export async function awaitTaskRunResult( // listen for all events related to the current task .pipe(filter(({ id }: TaskLifecycleEvent) => id === taskId)) .subscribe((taskEvent: TaskLifecycleEvent) => { - either( - taskEvent.event, - (taskInstance: ConcreteTaskInstance) => { - // resolve if the task has run sucessfully - if (isTaskRunEvent(taskEvent)) { - subscription.unsubscribe(); - resolve({ id: taskInstance.id }); - } - }, - async (error: Error) => { + if (isTaskClaimEvent(taskEvent)) { + mapErr(async (error: Option) => { // reject if any error event takes place for the requested task subscription.unsubscribe(); - if (isTaskRunRequestEvent(taskEvent)) { - return reject( - new Error( - `Failed to run task "${taskId}" as Task Manager is at capacity, please try again later` - ) - ); - } else if (isTaskClaimEvent(taskEvent)) { - return reject( - map( - // if the error happened in the Claim phase - we try to provide better insight - // into why we failed to claim by getting the task's current lifecycle status - await promiseResult(getLifecycle(taskId)), - (taskLifecycleStatus: TaskLifecycle) => { - if (taskLifecycleStatus === TaskLifecycleResult.NotFound) { - return new Error(`Failed to run task "${taskId}" as it does not exist`); - } else if ( - taskLifecycleStatus === TaskStatus.Running || - taskLifecycleStatus === TaskStatus.Claiming - ) { - return new Error(`Failed to run task "${taskId}" as it is currently running`); - } - return new Error( - `Failed to run task "${taskId}" for unknown reason (Current Task Lifecycle is "${taskLifecycleStatus}")` - ); - }, - (getLifecycleError: Error) => - new Error( - `Failed to run task "${taskId}" and failed to get current Status:${[ - error, - getLifecycleError, - ].join('\n')}` - ) - ) - ); + return reject( + map( + await pipe( + error, + mapOptional(async (taskReturnedBySweep) => asOk(taskReturnedBySweep.status)), + getOrElse(() => + // if the error happened in the Claim phase - we try to provide better insight + // into why we failed to claim by getting the task's current lifecycle status + promiseResult(getLifecycle(taskId)) + ) + ), + (taskLifecycleStatus: TaskLifecycle) => { + if (taskLifecycleStatus === TaskLifecycleResult.NotFound) { + return new Error(`Failed to run task "${taskId}" as it does not exist`); + } else if ( + taskLifecycleStatus === TaskStatus.Running || + taskLifecycleStatus === TaskStatus.Claiming + ) { + return new Error(`Failed to run task "${taskId}" as it is currently running`); + } + return new Error( + `Failed to run task "${taskId}" for unknown reason (Current Task Lifecycle is "${taskLifecycleStatus}")` + ); + }, + (getLifecycleError: Error) => + new Error( + `Failed to run task "${taskId}" and failed to get current Status:${getLifecycleError}` + ) + ) + ); + }, taskEvent.event); + } else { + either>( + taskEvent.event, + (taskInstance: ConcreteTaskInstance) => { + // resolve if the task has run sucessfully + if (isTaskRunEvent(taskEvent)) { + subscription.unsubscribe(); + resolve({ id: taskInstance.id }); + } + }, + async (error: Error | Option) => { + // reject if any error event takes place for the requested task + subscription.unsubscribe(); + if (isTaskRunRequestEvent(taskEvent)) { + return reject( + new Error( + `Failed to run task "${taskId}" as Task Manager is at capacity, please try again later` + ) + ); + } + return reject(new Error(`Failed to run task "${taskId}": ${error}`)); } - return reject(new Error(`Failed to run task "${taskId}": ${error}`)); - } - ); + ); + } }); }); } diff --git a/x-pack/plugins/task_manager/server/task_store.test.ts b/x-pack/plugins/task_manager/server/task_store.test.ts index 771b4e2d7d9cb6..d65c39f4f454d6 100644 --- a/x-pack/plugins/task_manager/server/task_store.test.ts +++ b/x-pack/plugins/task_manager/server/task_store.test.ts @@ -8,6 +8,7 @@ import _ from 'lodash'; import sinon from 'sinon'; import uuid from 'uuid'; import { filter } from 'rxjs/operators'; +import { Option, some, none } from 'fp-ts/lib/Option'; import { TaskDictionary, @@ -972,7 +973,7 @@ if (doc['task.runAt'].size()!=0) { const runAt = new Date(); const tasks = [ { - _id: 'aaa', + _id: 'claimed-by-id', _source: { type: 'task', task: { @@ -980,7 +981,7 @@ if (doc['task.runAt'].size()!=0) { taskType: 'foo', schedule: undefined, attempts: 0, - status: 'idle', + status: 'claiming', params: '{ "hello": "world" }', state: '{ "baby": "Henhen" }', user: 'jimbo', @@ -996,7 +997,31 @@ if (doc['task.runAt'].size()!=0) { sort: ['a', 1], }, { - _id: 'bbb', + _id: 'claimed-by-schedule', + _source: { + type: 'task', + task: { + runAt, + taskType: 'bar', + schedule: { interval: '5m' }, + attempts: 2, + status: 'claiming', + params: '{ "shazm": 1 }', + state: '{ "henry": "The 8th" }', + user: 'dabo', + scope: ['reporting', 'ceo'], + ownerId: taskManagerId, + startedAt: null, + retryAt: null, + scheduledAt: new Date(), + }, + }, + _seq_no: 3, + _primary_term: 4, + sort: ['b', 2], + }, + { + _id: 'already-running', _source: { type: 'task', task: { @@ -1045,19 +1070,24 @@ if (doc['task.runAt'].size()!=0) { }); const sub = store.events - .pipe(filter((event: TaskEvent) => event.id === 'aaa')) + .pipe( + filter( + (event: TaskEvent>) => + event.id === 'claimed-by-id' + ) + ) .subscribe({ - next: (event: TaskEvent) => { + next: (event: TaskEvent>) => { expect(event).toMatchObject( asTaskClaimEvent( - 'aaa', + 'claimed-by-id', asOk({ - id: 'aaa', + id: 'claimed-by-id', runAt, taskType: 'foo', schedule: undefined, attempts: 0, - status: 'idle' as TaskStatus, + status: 'claiming' as TaskStatus, params: { hello: 'world' }, state: { baby: 'Henhen' }, user: 'jimbo', @@ -1075,7 +1105,7 @@ if (doc['task.runAt'].size()!=0) { }); await store.claimAvailableTasks({ - claimTasksById: ['aaa'], + claimTasksById: ['claimed-by-id'], claimOwnershipUntil: new Date(), size: 10, }); @@ -1102,19 +1132,24 @@ if (doc['task.runAt'].size()!=0) { }); const sub = store.events - .pipe(filter((event: TaskEvent) => event.id === 'bbb')) + .pipe( + filter( + (event: TaskEvent>) => + event.id === 'claimed-by-schedule' + ) + ) .subscribe({ - next: (event: TaskEvent) => { + next: (event: TaskEvent>) => { expect(event).toMatchObject( asTaskClaimEvent( - 'bbb', + 'claimed-by-schedule', asOk({ - id: 'bbb', + id: 'claimed-by-schedule', runAt, taskType: 'bar', schedule: { interval: '5m' }, attempts: 2, - status: 'running' as TaskStatus, + status: 'claiming' as TaskStatus, params: { shazm: 1 }, state: { henry: 'The 8th' }, user: 'dabo', @@ -1132,14 +1167,14 @@ if (doc['task.runAt'].size()!=0) { }); await store.claimAvailableTasks({ - claimTasksById: ['aaa'], + claimTasksById: ['claimed-by-id'], claimOwnershipUntil: new Date(), size: 10, }); }); test('emits an event when the store fails to claim a required task by id', async (done) => { - const { taskManagerId, tasks } = generateTasks(); + const { taskManagerId, runAt, tasks } = generateTasks(); const callCluster = sinon.spy(async (name: string, params?: unknown) => name === 'updateByQuery' ? { @@ -1159,11 +1194,36 @@ if (doc['task.runAt'].size()!=0) { }); const sub = store.events - .pipe(filter((event: TaskEvent) => event.id === 'ccc')) + .pipe( + filter( + (event: TaskEvent>) => + event.id === 'already-running' + ) + ) .subscribe({ - next: (event: TaskEvent) => { + next: (event: TaskEvent>) => { expect(event).toMatchObject( - asTaskClaimEvent('ccc', asErr(new Error(`failed to claim task 'ccc'`))) + asTaskClaimEvent( + 'already-running', + asErr( + some({ + id: 'already-running', + runAt, + taskType: 'bar', + schedule: { interval: '5m' }, + attempts: 2, + status: 'running' as TaskStatus, + params: { shazm: 1 }, + state: { henry: 'The 8th' }, + user: 'dabo', + scope: ['reporting', 'ceo'], + ownerId: taskManagerId, + startedAt: null, + retryAt: null, + scheduledAt: new Date(), + }) + ) + ) ); sub.unsubscribe(); done(); @@ -1171,7 +1231,49 @@ if (doc['task.runAt'].size()!=0) { }); await store.claimAvailableTasks({ - claimTasksById: ['ccc'], + claimTasksById: ['already-running'], + claimOwnershipUntil: new Date(), + size: 10, + }); + }); + + test('emits an event when the store fails to find a task which was required by id', async (done) => { + const { taskManagerId, tasks } = generateTasks(); + const callCluster = sinon.spy(async (name: string, params?: unknown) => + name === 'updateByQuery' + ? { + total: tasks.length, + updated: tasks.length, + } + : { hits: { hits: tasks } } + ); + const store = new TaskStore({ + callCluster, + maxAttempts: 2, + definitions: taskDefinitions, + serializer, + savedObjectsRepository: savedObjectsClient, + taskManagerId, + index: '', + }); + + const sub = store.events + .pipe( + filter( + (event: TaskEvent>) => + event.id === 'unknown-task' + ) + ) + .subscribe({ + next: (event: TaskEvent>) => { + expect(event).toMatchObject(asTaskClaimEvent('unknown-task', asErr(none))); + sub.unsubscribe(); + done(); + }, + }); + + await store.claimAvailableTasks({ + claimTasksById: ['unknown-task'], claimOwnershipUntil: new Date(), size: 10, }); diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index 7ec3db5c99aa78..44b2d892f4c082 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -9,7 +9,9 @@ */ import apm from 'elastic-apm-node'; import { Subject, Observable } from 'rxjs'; -import { omit, difference, defaults } from 'lodash'; +import { omit, difference, partition, map, defaults } from 'lodash'; + +import { some, none } from 'fp-ts/lib/Option'; import { SearchResponse, UpdateDocumentByQueryResponse } from 'elasticsearch'; import { @@ -31,6 +33,7 @@ import { TaskLifecycle, TaskLifecycleResult, SerializedConcreteTaskInstance, + TaskStatus, } from './task'; import { TaskClaim, asTaskClaimEvent } from './task_events'; @@ -221,13 +224,35 @@ export class TaskStore { // emit success/fail events for claimed tasks by id if (claimTasksById && claimTasksById.length) { - this.emitEvents(docs.map((doc) => asTaskClaimEvent(doc.id, asOk(doc)))); + const [documentsReturnedById, documentsClaimedBySchedule] = partition(docs, (doc) => + claimTasksById.includes(doc.id) + ); + + const [documentsClaimedById, documentsRequestedButNotClaimed] = partition( + documentsReturnedById, + // we filter the schduled tasks down by status is 'claiming' in the esearch, + // but we do not apply this limitation on tasks claimed by ID so that we can + // provide more detailed error messages when we fail to claim them + (doc) => doc.status === TaskStatus.Claiming + ); + + const documentsRequestedButNotReturned = difference( + claimTasksById, + map(documentsReturnedById, 'id') + ); + + this.emitEvents( + [...documentsClaimedById, ...documentsClaimedBySchedule].map((doc) => + asTaskClaimEvent(doc.id, asOk(doc)) + ) + ); + + this.emitEvents( + documentsRequestedButNotClaimed.map((doc) => asTaskClaimEvent(doc.id, asErr(some(doc)))) + ); this.emitEvents( - difference( - claimTasksById, - docs.map((doc) => doc.id) - ).map((id) => asTaskClaimEvent(id, asErr(new Error(`failed to claim task '${id}'`)))) + documentsRequestedButNotReturned.map((id) => asTaskClaimEvent(id, asErr(none))) ); } diff --git a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js index 488df3843bdaaa..c6ecd3c0f237f9 100644 --- a/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js +++ b/x-pack/test/plugin_api_integration/test_suites/task_manager/task_manager_integration.js @@ -487,8 +487,13 @@ export default function ({ getService }) { expect( docs.filter((taskDoc) => taskDoc._source.taskId === longRunningTask.id).length ).to.eql(1); + + const task = await currentTask(longRunningTask.id); + expect(task.status).to.eql('running'); }); + await delay(1000); + // first runNow should fail const failedRunNowResult = await runTaskNow({ id: longRunningTask.id, @@ -504,8 +509,13 @@ export default function ({ getService }) { await retry.try(async () => { const tasks = (await currentTasks()).docs; expect(getTaskById(tasks, longRunningTask.id).state.count).to.eql(1); + + const task = await currentTask(longRunningTask.id); + expect(task.status).to.eql('idle'); }); + await delay(1000); + // second runNow should be successful const successfulRunNowResult = runTaskNow({ id: longRunningTask.id,