Skip to content

Commit

Permalink
correctly handle sweep by id when task is already running
Browse files Browse the repository at this point in the history
  • Loading branch information
gmmorris committed Jul 31, 2020
1 parent cd70954 commit 9e6f63f
Show file tree
Hide file tree
Showing 6 changed files with 235 additions and 83 deletions.
6 changes: 4 additions & 2 deletions x-pack/plugins/task_manager/server/task_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { Option } from 'fp-ts/lib/Option';

import { ConcreteTaskInstance } from './task';

import { Result, Err } from './lib/result_type';
Expand All @@ -22,7 +24,7 @@ export interface TaskEvent<T, E> {
}
export type TaskMarkRunning = TaskEvent<ConcreteTaskInstance, Error>;
export type TaskRun = TaskEvent<ConcreteTaskInstance, Error>;
export type TaskClaim = TaskEvent<ConcreteTaskInstance, Error>;
export type TaskClaim = TaskEvent<ConcreteTaskInstance, Option<ConcreteTaskInstance>>;
export type TaskRunRequest = TaskEvent<ConcreteTaskInstance, Error>;

export function asTaskMarkRunningEvent(
Expand All @@ -46,7 +48,7 @@ export function asTaskRunEvent(id: string, event: Result<ConcreteTaskInstance, E

export function asTaskClaimEvent(
id: string,
event: Result<ConcreteTaskInstance, Error>
event: Result<ConcreteTaskInstance, Option<ConcreteTaskInstance>>
): TaskClaim {
return {
id,
Expand Down
11 changes: 6 additions & 5 deletions x-pack/plugins/task_manager/server/task_manager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import _ from 'lodash';
import sinon from 'sinon';
import { Subject } from 'rxjs';
import { none } from 'fp-ts/lib/Option';

import {
asTaskMarkRunningEvent,
Expand Down Expand Up @@ -325,7 +326,7 @@ describe('TaskManager', () => {

const result = awaitTaskRunResult(id, events$, getLifecycle);

events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
events$.next(asTaskClaimEvent(id, asErr(none)));

await expect(result).rejects.toEqual(
new Error(`Failed to run task "${id}" as it does not exist`)
Expand All @@ -341,7 +342,7 @@ describe('TaskManager', () => {

const result = awaitTaskRunResult(id, events$, getLifecycle);

events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
events$.next(asTaskClaimEvent(id, asErr(none)));

await expect(result).rejects.toEqual(
new Error(`Failed to run task "${id}" as it is currently running`)
Expand All @@ -357,7 +358,7 @@ describe('TaskManager', () => {

const result = awaitTaskRunResult(id, events$, getLifecycle);

events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
events$.next(asTaskClaimEvent(id, asErr(none)));

await expect(result).rejects.toEqual(
new Error(`Failed to run task "${id}" as it is currently running`)
Expand Down Expand Up @@ -390,7 +391,7 @@ describe('TaskManager', () => {

const result = awaitTaskRunResult(id, events$, getLifecycle);

events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
events$.next(asTaskClaimEvent(id, asErr(none)));

await expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2" for unknown reason (Current Task Lifecycle is "idle")]`
Expand All @@ -406,7 +407,7 @@ describe('TaskManager', () => {

const result = awaitTaskRunResult(id, events$, getLifecycle);

events$.next(asTaskClaimEvent(id, asErr(new Error('failed to claim'))));
events$.next(asTaskClaimEvent(id, asErr(none)));

await expect(result).rejects.toMatchInlineSnapshot(
`[Error: Failed to run task "01ddff11-e88a-4d13-bc4e-256164e755e2" for unknown reason (Current Task Lifecycle is "failed")]`
Expand Down
112 changes: 62 additions & 50 deletions x-pack/plugins/task_manager/server/task_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import { filter } from 'rxjs/operators';
import { performance } from 'perf_hooks';

import { pipe } from 'fp-ts/lib/pipeable';
import { Option, some, map as mapOptional } from 'fp-ts/lib/Option';
import { Option, some, map as mapOptional, getOrElse } from 'fp-ts/lib/Option';

import {
SavedObjectsSerializer,
ILegacyScopedClusterClient,
ISavedObjectsRepository,
} from '../../../../src/core/server';
import { Result, asErr, either, map, mapErr, promiseResult } from './lib/result_type';
import { Result, asOk, asErr, either, map, mapErr, promiseResult } from './lib/result_type';
import { TaskManagerConfig } from './config';

import { Logger } from './types';
Expand Down Expand Up @@ -405,7 +406,9 @@ export async function claimAvailableTasks(

if (docs.length !== claimedTasks) {
logger.warn(
`[Task Ownership error]: (${claimedTasks}) tasks were claimed by Kibana, but (${docs.length}) tasks were fetched`
`[Task Ownership error]: ${claimedTasks} tasks were claimed by Kibana, but ${
docs.length
} task(s) were fetched (${docs.map((doc) => doc.id).join(', ')})`
);
}
return docs;
Expand Down Expand Up @@ -437,56 +440,65 @@ export async function awaitTaskRunResult(
// listen for all events related to the current task
.pipe(filter(({ id }: TaskLifecycleEvent) => id === taskId))
.subscribe((taskEvent: TaskLifecycleEvent) => {
either(
taskEvent.event,
(taskInstance: ConcreteTaskInstance) => {
// resolve if the task has run sucessfully
if (isTaskRunEvent(taskEvent)) {
subscription.unsubscribe();
resolve({ id: taskInstance.id });
}
},
async (error: Error) => {
if (isTaskClaimEvent(taskEvent)) {
mapErr(async (error: Option<ConcreteTaskInstance>) => {
// reject if any error event takes place for the requested task
subscription.unsubscribe();
if (isTaskRunRequestEvent(taskEvent)) {
return reject(
new Error(
`Failed to run task "${taskId}" as Task Manager is at capacity, please try again later`
)
);
} else if (isTaskClaimEvent(taskEvent)) {
return reject(
map(
// if the error happened in the Claim phase - we try to provide better insight
// into why we failed to claim by getting the task's current lifecycle status
await promiseResult<TaskLifecycle, Error>(getLifecycle(taskId)),
(taskLifecycleStatus: TaskLifecycle) => {
if (taskLifecycleStatus === TaskLifecycleResult.NotFound) {
return new Error(`Failed to run task "${taskId}" as it does not exist`);
} else if (
taskLifecycleStatus === TaskStatus.Running ||
taskLifecycleStatus === TaskStatus.Claiming
) {
return new Error(`Failed to run task "${taskId}" as it is currently running`);
}
return new Error(
`Failed to run task "${taskId}" for unknown reason (Current Task Lifecycle is "${taskLifecycleStatus}")`
);
},
(getLifecycleError: Error) =>
new Error(
`Failed to run task "${taskId}" and failed to get current Status:${[
error,
getLifecycleError,
].join('\n')}`
)
)
);
return reject(
map(
await pipe(
error,
mapOptional(async (taskReturnedBySweep) => asOk(taskReturnedBySweep.status)),
getOrElse(() =>
// if the error happened in the Claim phase - we try to provide better insight
// into why we failed to claim by getting the task's current lifecycle status
promiseResult<TaskLifecycle, Error>(getLifecycle(taskId))
)
),
(taskLifecycleStatus: TaskLifecycle) => {
if (taskLifecycleStatus === TaskLifecycleResult.NotFound) {
return new Error(`Failed to run task "${taskId}" as it does not exist`);
} else if (
taskLifecycleStatus === TaskStatus.Running ||
taskLifecycleStatus === TaskStatus.Claiming
) {
return new Error(`Failed to run task "${taskId}" as it is currently running`);
}
return new Error(
`Failed to run task "${taskId}" for unknown reason (Current Task Lifecycle is "${taskLifecycleStatus}")`
);
},
(getLifecycleError: Error) =>
new Error(
`Failed to run task "${taskId}" and failed to get current Status:${getLifecycleError}`
)
)
);
}, taskEvent.event);
} else {
either<ConcreteTaskInstance, Error | Option<ConcreteTaskInstance>>(
taskEvent.event,
(taskInstance: ConcreteTaskInstance) => {
// resolve if the task has run sucessfully
if (isTaskRunEvent(taskEvent)) {
subscription.unsubscribe();
resolve({ id: taskInstance.id });
}
},
async (error: Error | Option<ConcreteTaskInstance>) => {
// reject if any error event takes place for the requested task
subscription.unsubscribe();
if (isTaskRunRequestEvent(taskEvent)) {
return reject(
new Error(
`Failed to run task "${taskId}" as Task Manager is at capacity, please try again later`
)
);
}
return reject(new Error(`Failed to run task "${taskId}": ${error}`));
}
return reject(new Error(`Failed to run task "${taskId}": ${error}`));
}
);
);
}
});
});
}
Loading

0 comments on commit 9e6f63f

Please sign in to comment.