Skip to content

Commit

Permalink
plugged Task Manager lifecycle into status reactively
Browse files Browse the repository at this point in the history
  • Loading branch information
gmmorris committed Oct 27, 2020
1 parent 38a8063 commit b18991e
Show file tree
Hide file tree
Showing 3 changed files with 124 additions and 62 deletions.
39 changes: 24 additions & 15 deletions x-pack/plugins/task_manager/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,16 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
import { PluginInitializerContext, Plugin, CoreSetup, Logger, CoreStart } from 'src/core/server';
import { first } from 'rxjs/operators';
import { Observable } from 'rxjs';
import { first, map, distinctUntilChanged } from 'rxjs/operators';
import {
PluginInitializerContext,
Plugin,
CoreSetup,
Logger,
CoreStart,
ServiceStatusLevels,
} from '../../../../src/core/server';
import { TaskDefinition } from './task';
import { TaskPollingLifecycle } from './polling_lifecycle';
import { TaskManagerConfig } from './config';
Expand Down Expand Up @@ -34,19 +42,29 @@ export class TaskManagerPlugin
private logger: Logger;
private definitions: TaskTypeDictionary;
private middleware: Middleware = createInitialMiddleware();
private elasticsearchAndSOAvailability$?: Observable<boolean>;

constructor(private readonly initContext: PluginInitializerContext) {
this.initContext = initContext;
this.logger = initContext.logger.get();
this.definitions = new TaskTypeDictionary(this.logger);
}

public async setup({ savedObjects }: CoreSetup): Promise<TaskManagerSetupContract> {
public async setup({ savedObjects, status }: CoreSetup): Promise<TaskManagerSetupContract> {
this.config = await this.initContext.config
.create<TaskManagerConfig>()
.pipe(first())
.toPromise();

this.elasticsearchAndSOAvailability$ = status.core$.pipe(
map(
(ev) =>
ev.elasticsearch.level === ServiceStatusLevels.available &&
ev.savedObjects.level === ServiceStatusLevels.available
),
distinctUntilChanged()
);

setupSavedObjects(savedObjects, this.config);
this.taskManagerId = this.initContext.env.instanceUuid;

Expand Down Expand Up @@ -91,27 +109,24 @@ export class TaskManagerPlugin
startingPollInterval: this.config!.poll_interval,
});

const taskPollingLifecycle = new TaskPollingLifecycle({
this.taskPollingLifecycle = new TaskPollingLifecycle({
config: this.config!,
definitions: this.definitions,
logger: this.logger,
taskStore,
middleware: this.middleware,
maxWorkersConfiguration$,
pollIntervalConfiguration$,
elasticsearchAndSOAvailability$: this.elasticsearchAndSOAvailability$!,
});
this.taskPollingLifecycle = taskPollingLifecycle;

const taskScheduling = new TaskScheduling({
logger: this.logger,
taskStore,
middleware: this.middleware,
taskPollingLifecycle,
taskPollingLifecycle: this.taskPollingLifecycle,
});

// start polling for work
taskPollingLifecycle.start();

return {
fetch: (opts: SearchOpts): Promise<FetchResult> => taskStore.fetch(opts),
get: (id: string) => taskStore.get(id),
Expand All @@ -122,12 +137,6 @@ export class TaskManagerPlugin
};
}

public stop() {
if (this.taskPollingLifecycle) {
this.taskPollingLifecycle.stop();
}
}

/**
* Ensures task manager hasn't started
*
Expand Down
57 changes: 53 additions & 4 deletions x-pack/plugins/task_manager/server/polling_lifecycle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import _ from 'lodash';
import sinon from 'sinon';
import { of } from 'rxjs';
import { of, Subject } from 'rxjs';

import { TaskPollingLifecycle, claimAvailableTasks } from './polling_lifecycle';
import { createInitialMiddleware } from './lib/middleware';
Expand Down Expand Up @@ -45,15 +45,64 @@ describe('TaskPollingLifecycle', () => {
afterEach(() => clock.restore());

describe('start', () => {
test('begins polling once start is called', () => {
const taskManager = new TaskPollingLifecycle(taskManagerOpts);
test('begins polling once the ES and SavedObjects services are available', () => {
const elasticsearchAndSOAvailability$ = new Subject<boolean>();
new TaskPollingLifecycle({
elasticsearchAndSOAvailability$,
...taskManagerOpts,
});

clock.tick(150);
expect(mockTaskStore.claimAvailableTasks).not.toHaveBeenCalled();

elasticsearchAndSOAvailability$.next(true);

clock.tick(150);
expect(mockTaskStore.claimAvailableTasks).toHaveBeenCalled();
});
});

describe('stop', () => {
test('stops polling once the ES and SavedObjects services become unavailable', () => {
const elasticsearchAndSOAvailability$ = new Subject<boolean>();
new TaskPollingLifecycle({
elasticsearchAndSOAvailability$,
...taskManagerOpts,
});

elasticsearchAndSOAvailability$.next(true);

clock.tick(150);
expect(mockTaskStore.claimAvailableTasks).toHaveBeenCalled();

elasticsearchAndSOAvailability$.next(false);

mockTaskStore.claimAvailableTasks.mockClear();
clock.tick(150);
expect(mockTaskStore.claimAvailableTasks).not.toHaveBeenCalled();
});

test('restarts polling once the ES and SavedObjects services become available again', () => {
const elasticsearchAndSOAvailability$ = new Subject<boolean>();
new TaskPollingLifecycle({
elasticsearchAndSOAvailability$,
...taskManagerOpts,
});

elasticsearchAndSOAvailability$.next(true);

taskManager.start();
clock.tick(150);
expect(mockTaskStore.claimAvailableTasks).toHaveBeenCalled();

elasticsearchAndSOAvailability$.next(false);
mockTaskStore.claimAvailableTasks.mockClear();
clock.tick(150);

expect(mockTaskStore.claimAvailableTasks).not.toHaveBeenCalled();

elasticsearchAndSOAvailability$.next(true);
clock.tick(150);

expect(mockTaskStore.claimAvailableTasks).toHaveBeenCalled();
});
});
Expand Down
90 changes: 47 additions & 43 deletions x-pack/plugins/task_manager/server/polling_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ export type TaskPollingLifecycleOpts = {
taskStore: TaskStore;
config: TaskManagerConfig;
middleware: Middleware;
elasticsearchAndSOAvailability$: Observable<boolean>;
} & ManagedConfiguration;

export type TaskLifecycleEvent = TaskMarkRunning | TaskRun | TaskClaim | TaskRunRequest;
Expand All @@ -64,8 +65,6 @@ export class TaskPollingLifecycle {
private events$ = new Subject<TaskLifecycleEvent>();
// all on-demand requests we wish to pipe into the poller
private claimRequests$ = new Subject<Option<string>>();
// the task poller that polls for work on fixed intervals and on demand
private poller$: Observable<Result<FillPoolResult, PollingError<string>>>;
// our subscription to the poller
private pollingSubscription: Subscription = Subscription.EMPTY;

Expand All @@ -76,36 +75,50 @@ export class TaskPollingLifecycle {
* enabling the task manipulation methods, and beginning the background polling
* mechanism.
*/
constructor(opts: TaskPollingLifecycleOpts) {
const { logger, middleware, maxWorkersConfiguration$, pollIntervalConfiguration$ } = opts;
constructor({
logger,
middleware,
maxWorkersConfiguration$,
pollIntervalConfiguration$,
// Elasticsearch and SavedObjects availability status
elasticsearchAndSOAvailability$,
config,
taskStore,
definitions,
}: TaskPollingLifecycleOpts) {
this.logger = logger;
this.middleware = middleware;
this.definitions = definitions;
this.store = taskStore;

this.definitions = opts.definitions;
this.store = opts.taskStore;
// pipe store events into the lifecycle event stream
this.store.events.subscribe((event) => this.events$.next(event));

this.bufferedStore = new BufferedTaskStore(this.store, {
bufferMaxOperations: opts.config.max_workers,
logger: this.logger,
bufferMaxOperations: config.max_workers,
logger,
});

this.pool = new TaskPool({
logger: this.logger,
logger,
maxWorkers$: maxWorkersConfiguration$,
});

const {
max_poll_inactivity_cycles: maxPollInactivityCycles,
poll_interval: pollInterval,
} = opts.config;
this.poller$ = createObservableMonitor<Result<FillPoolResult, PollingError<string>>, Error>(
} = config;

// the task poller that polls for work on fixed intervals and on demand
const poller$: Observable<Result<
FillPoolResult,
PollingError<string>
>> = createObservableMonitor<Result<FillPoolResult, PollingError<string>>, Error>(
() =>
createTaskPoller<string, FillPoolResult>({
logger: this.logger,
logger,
pollInterval$: pollIntervalConfiguration$,
bufferCapacity: opts.config.request_capacity,
bufferCapacity: config.request_capacity,
getCapacity: () => this.pool.availableWorkers,
pollRequests$: this.claimRequests$,
work: this.pollForWork,
Expand All @@ -125,10 +138,30 @@ export class TaskPollingLifecycle {
// operation than just timing out the `work` internally)
inactivityTimeout: pollInterval * (maxPollInactivityCycles + 1),
onError: (error) => {
this.logger.error(`[Task Poller Monitor]: ${error.message}`);
logger.error(`[Task Poller Monitor]: ${error.message}`);
},
}
);

elasticsearchAndSOAvailability$.subscribe((areESAndSOAvailable) => {
if (areESAndSOAvailable && !this.isStarted) {
// start polling for work
this.pollingSubscription = poller$.subscribe(
mapErr((error: PollingError<string>) => {
if (error.type === PollingErrorType.RequestCapacityReached) {
pipe(
error.data,
mapOptional((id) => this.emitEvent(asTaskRunRequestEvent(id, asErr(error))))
);
}
logger.error(error.message);
})
);
} else if (!areESAndSOAvailable && this.isStarted) {
this.pollingSubscription.unsubscribe();
this.pool.cancelRunningTasks();
}
});
}

public get events(): Observable<TaskLifecycleEvent> {
Expand Down Expand Up @@ -175,35 +208,6 @@ export class TaskPollingLifecycle {
async (tasks: TaskRunner[]) => await this.pool.run(tasks)
);
};

/**
* Starts up the task manager and starts picking up tasks.
*/
public start() {
if (!this.isStarted) {
this.pollingSubscription = this.poller$.subscribe(
mapErr((error: PollingError<string>) => {
if (error.type === PollingErrorType.RequestCapacityReached) {
pipe(
error.data,
mapOptional((id) => this.emitEvent(asTaskRunRequestEvent(id, asErr(error))))
);
}
this.logger.error(error.message);
})
);
}
}

/**
* Stops the task manager and cancels running tasks.
*/
public stop() {
if (this.isStarted) {
this.pollingSubscription.unsubscribe();
this.pool.cancelRunningTasks();
}
}
}

export async function claimAvailableTasks(
Expand Down

0 comments on commit b18991e

Please sign in to comment.