From b18991e1511b3285ad2696fa2878363640cfd36e Mon Sep 17 00:00:00 2001 From: Gidi Meir Morris Date: Tue, 27 Oct 2020 14:17:34 +0000 Subject: [PATCH 1/7] plugged Task Manager lifecycle into status reactively --- x-pack/plugins/task_manager/server/plugin.ts | 39 ++++---- .../server/polling_lifecycle.test.ts | 57 +++++++++++- .../task_manager/server/polling_lifecycle.ts | 90 ++++++++++--------- 3 files changed, 124 insertions(+), 62 deletions(-) diff --git a/x-pack/plugins/task_manager/server/plugin.ts b/x-pack/plugins/task_manager/server/plugin.ts index 0381698e6fb77..92f492acf591e 100644 --- a/x-pack/plugins/task_manager/server/plugin.ts +++ b/x-pack/plugins/task_manager/server/plugin.ts @@ -3,8 +3,16 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -import { PluginInitializerContext, Plugin, CoreSetup, Logger, CoreStart } from 'src/core/server'; -import { first } from 'rxjs/operators'; +import { Observable } from 'rxjs'; +import { first, map, distinctUntilChanged } from 'rxjs/operators'; +import { + PluginInitializerContext, + Plugin, + CoreSetup, + Logger, + CoreStart, + ServiceStatusLevels, +} from '../../../../src/core/server'; import { TaskDefinition } from './task'; import { TaskPollingLifecycle } from './polling_lifecycle'; import { TaskManagerConfig } from './config'; @@ -34,6 +42,7 @@ export class TaskManagerPlugin private logger: Logger; private definitions: TaskTypeDictionary; private middleware: Middleware = createInitialMiddleware(); + private elasticsearchAndSOAvailability$?: Observable; constructor(private readonly initContext: PluginInitializerContext) { this.initContext = initContext; @@ -41,12 +50,21 @@ export class TaskManagerPlugin this.definitions = new TaskTypeDictionary(this.logger); } - public async setup({ savedObjects }: CoreSetup): Promise { + public async setup({ savedObjects, status }: CoreSetup): Promise { this.config = await this.initContext.config .create() .pipe(first()) .toPromise(); + this.elasticsearchAndSOAvailability$ = status.core$.pipe( + map( + (ev) => + ev.elasticsearch.level === ServiceStatusLevels.available && + ev.savedObjects.level === ServiceStatusLevels.available + ), + distinctUntilChanged() + ); + setupSavedObjects(savedObjects, this.config); this.taskManagerId = this.initContext.env.instanceUuid; @@ -91,7 +109,7 @@ export class TaskManagerPlugin startingPollInterval: this.config!.poll_interval, }); - const taskPollingLifecycle = new TaskPollingLifecycle({ + this.taskPollingLifecycle = new TaskPollingLifecycle({ config: this.config!, definitions: this.definitions, logger: this.logger, @@ -99,19 +117,16 @@ export class TaskManagerPlugin middleware: this.middleware, maxWorkersConfiguration$, pollIntervalConfiguration$, + elasticsearchAndSOAvailability$: this.elasticsearchAndSOAvailability$!, }); - this.taskPollingLifecycle = taskPollingLifecycle; const taskScheduling = new TaskScheduling({ logger: this.logger, taskStore, middleware: this.middleware, - taskPollingLifecycle, + taskPollingLifecycle: this.taskPollingLifecycle, }); - // start polling for work - taskPollingLifecycle.start(); - return { fetch: (opts: SearchOpts): Promise => taskStore.fetch(opts), get: (id: string) => taskStore.get(id), @@ -122,12 +137,6 @@ export class TaskManagerPlugin }; } - public stop() { - if (this.taskPollingLifecycle) { - this.taskPollingLifecycle.stop(); - } - } - /** * Ensures task manager hasn't started * diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts index 29c8e836303f8..2e73c74a8b247 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts @@ -6,7 +6,7 @@ import _ from 'lodash'; import sinon from 'sinon'; -import { of } from 'rxjs'; +import { of, Subject } from 'rxjs'; import { TaskPollingLifecycle, claimAvailableTasks } from './polling_lifecycle'; import { createInitialMiddleware } from './lib/middleware'; @@ -45,15 +45,64 @@ describe('TaskPollingLifecycle', () => { afterEach(() => clock.restore()); describe('start', () => { - test('begins polling once start is called', () => { - const taskManager = new TaskPollingLifecycle(taskManagerOpts); + test('begins polling once the ES and SavedObjects services are available', () => { + const elasticsearchAndSOAvailability$ = new Subject(); + new TaskPollingLifecycle({ + elasticsearchAndSOAvailability$, + ...taskManagerOpts, + }); + + clock.tick(150); + expect(mockTaskStore.claimAvailableTasks).not.toHaveBeenCalled(); + + elasticsearchAndSOAvailability$.next(true); + + clock.tick(150); + expect(mockTaskStore.claimAvailableTasks).toHaveBeenCalled(); + }); + }); + + describe('stop', () => { + test('stops polling once the ES and SavedObjects services become unavailable', () => { + const elasticsearchAndSOAvailability$ = new Subject(); + new TaskPollingLifecycle({ + elasticsearchAndSOAvailability$, + ...taskManagerOpts, + }); + + elasticsearchAndSOAvailability$.next(true); + + clock.tick(150); + expect(mockTaskStore.claimAvailableTasks).toHaveBeenCalled(); + elasticsearchAndSOAvailability$.next(false); + + mockTaskStore.claimAvailableTasks.mockClear(); clock.tick(150); expect(mockTaskStore.claimAvailableTasks).not.toHaveBeenCalled(); + }); + + test('restarts polling once the ES and SavedObjects services become available again', () => { + const elasticsearchAndSOAvailability$ = new Subject(); + new TaskPollingLifecycle({ + elasticsearchAndSOAvailability$, + ...taskManagerOpts, + }); + + elasticsearchAndSOAvailability$.next(true); - taskManager.start(); + clock.tick(150); + expect(mockTaskStore.claimAvailableTasks).toHaveBeenCalled(); + elasticsearchAndSOAvailability$.next(false); + mockTaskStore.claimAvailableTasks.mockClear(); clock.tick(150); + + expect(mockTaskStore.claimAvailableTasks).not.toHaveBeenCalled(); + + elasticsearchAndSOAvailability$.next(true); + clock.tick(150); + expect(mockTaskStore.claimAvailableTasks).toHaveBeenCalled(); }); }); diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.ts index 8a506cca699de..31fef6520ede5 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.ts @@ -45,6 +45,7 @@ export type TaskPollingLifecycleOpts = { taskStore: TaskStore; config: TaskManagerConfig; middleware: Middleware; + elasticsearchAndSOAvailability$: Observable; } & ManagedConfiguration; export type TaskLifecycleEvent = TaskMarkRunning | TaskRun | TaskClaim | TaskRunRequest; @@ -64,8 +65,6 @@ export class TaskPollingLifecycle { private events$ = new Subject(); // all on-demand requests we wish to pipe into the poller private claimRequests$ = new Subject>(); - // the task poller that polls for work on fixed intervals and on demand - private poller$: Observable>>; // our subscription to the poller private pollingSubscription: Subscription = Subscription.EMPTY; @@ -76,36 +75,50 @@ export class TaskPollingLifecycle { * enabling the task manipulation methods, and beginning the background polling * mechanism. */ - constructor(opts: TaskPollingLifecycleOpts) { - const { logger, middleware, maxWorkersConfiguration$, pollIntervalConfiguration$ } = opts; + constructor({ + logger, + middleware, + maxWorkersConfiguration$, + pollIntervalConfiguration$, + // Elasticsearch and SavedObjects availability status + elasticsearchAndSOAvailability$, + config, + taskStore, + definitions, + }: TaskPollingLifecycleOpts) { this.logger = logger; this.middleware = middleware; + this.definitions = definitions; + this.store = taskStore; - this.definitions = opts.definitions; - this.store = opts.taskStore; // pipe store events into the lifecycle event stream this.store.events.subscribe((event) => this.events$.next(event)); this.bufferedStore = new BufferedTaskStore(this.store, { - bufferMaxOperations: opts.config.max_workers, - logger: this.logger, + bufferMaxOperations: config.max_workers, + logger, }); this.pool = new TaskPool({ - logger: this.logger, + logger, maxWorkers$: maxWorkersConfiguration$, }); const { max_poll_inactivity_cycles: maxPollInactivityCycles, poll_interval: pollInterval, - } = opts.config; - this.poller$ = createObservableMonitor>, Error>( + } = config; + + // the task poller that polls for work on fixed intervals and on demand + const poller$: Observable + >> = createObservableMonitor>, Error>( () => createTaskPoller({ - logger: this.logger, + logger, pollInterval$: pollIntervalConfiguration$, - bufferCapacity: opts.config.request_capacity, + bufferCapacity: config.request_capacity, getCapacity: () => this.pool.availableWorkers, pollRequests$: this.claimRequests$, work: this.pollForWork, @@ -125,10 +138,30 @@ export class TaskPollingLifecycle { // operation than just timing out the `work` internally) inactivityTimeout: pollInterval * (maxPollInactivityCycles + 1), onError: (error) => { - this.logger.error(`[Task Poller Monitor]: ${error.message}`); + logger.error(`[Task Poller Monitor]: ${error.message}`); }, } ); + + elasticsearchAndSOAvailability$.subscribe((areESAndSOAvailable) => { + if (areESAndSOAvailable && !this.isStarted) { + // start polling for work + this.pollingSubscription = poller$.subscribe( + mapErr((error: PollingError) => { + if (error.type === PollingErrorType.RequestCapacityReached) { + pipe( + error.data, + mapOptional((id) => this.emitEvent(asTaskRunRequestEvent(id, asErr(error)))) + ); + } + logger.error(error.message); + }) + ); + } else if (!areESAndSOAvailable && this.isStarted) { + this.pollingSubscription.unsubscribe(); + this.pool.cancelRunningTasks(); + } + }); } public get events(): Observable { @@ -175,35 +208,6 @@ export class TaskPollingLifecycle { async (tasks: TaskRunner[]) => await this.pool.run(tasks) ); }; - - /** - * Starts up the task manager and starts picking up tasks. - */ - public start() { - if (!this.isStarted) { - this.pollingSubscription = this.poller$.subscribe( - mapErr((error: PollingError) => { - if (error.type === PollingErrorType.RequestCapacityReached) { - pipe( - error.data, - mapOptional((id) => this.emitEvent(asTaskRunRequestEvent(id, asErr(error)))) - ); - } - this.logger.error(error.message); - }) - ); - } - } - - /** - * Stops the task manager and cancels running tasks. - */ - public stop() { - if (this.isStarted) { - this.pollingSubscription.unsubscribe(); - this.pool.cancelRunningTasks(); - } - } } export async function claimAvailableTasks( From 39f885bab5f0c0df6a256091dce5f5fcd3267aad Mon Sep 17 00:00:00 2001 From: Gidi Meir Morris Date: Tue, 27 Oct 2020 17:57:29 +0000 Subject: [PATCH 2/7] extracted subscription code --- .../task_manager/server/polling_lifecycle.ts | 38 ++++++++++--------- 1 file changed, 21 insertions(+), 17 deletions(-) diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.ts index 73c2a897aaacb..2bba07d0eab11 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.ts @@ -154,23 +154,7 @@ export class TaskPollingLifecycle { elasticsearchAndSOAvailability$.subscribe((areESAndSOAvailable) => { if (areESAndSOAvailable && !this.isStarted) { // start polling for work - this.pollingSubscription = poller$ - .pipe( - tap( - mapErr((error: PollingError) => { - if (error.type === PollingErrorType.RequestCapacityReached) { - pipe( - error.data, - mapOptional((id) => this.emitEvent(asTaskRunRequestEvent(id, asErr(error)))) - ); - } - this.logger.error(error.message); - }) - ) - ) - .subscribe((event: Result>) => { - this.emitEvent(asTaskPollingCycleEvent(event)); - }); + this.pollingSubscription = this.subscribeToPoller(poller$); } else if (!areESAndSOAvailable && this.isStarted) { this.pollingSubscription.unsubscribe(); this.pool.cancelRunningTasks(); @@ -206,6 +190,26 @@ export class TaskPollingLifecycle { return !this.pollingSubscription.closed; } + private subscribeToPoller(poller$: Observable>>) { + return poller$ + .pipe( + tap( + mapErr((error: PollingError) => { + if (error.type === PollingErrorType.RequestCapacityReached) { + pipe( + error.data, + mapOptional((id) => this.emitEvent(asTaskRunRequestEvent(id, asErr(error)))) + ); + } + this.logger.error(error.message); + }) + ) + ) + .subscribe((event: Result>) => { + this.emitEvent(asTaskPollingCycleEvent(event)); + }); + } + private pollForWork = async (...tasksToClaim: string[]): Promise => { return fillPool( // claim available tasks From ff9934ffb53cb098258d1a96fefe0136faf060f7 Mon Sep 17 00:00:00 2001 From: Gidi Meir Morris Date: Tue, 27 Oct 2020 17:59:25 +0000 Subject: [PATCH 3/7] small refactor --- .../task_manager/server/polling_lifecycle.ts | 34 +++++++++---------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.ts index 2bba07d0eab11..ccba750401f28 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.ts @@ -190,6 +190,23 @@ export class TaskPollingLifecycle { return !this.pollingSubscription.closed; } + private pollForWork = async (...tasksToClaim: string[]): Promise => { + return fillPool( + // claim available tasks + () => + claimAvailableTasks( + tasksToClaim.splice(0, this.pool.availableWorkers), + this.store.claimAvailableTasks, + this.pool.availableWorkers, + this.logger + ), + // wrap each task in a Task Runner + this.createTaskRunnerForTask, + // place tasks in the Task Pool + async (tasks: TaskRunner[]) => await this.pool.run(tasks) + ); + }; + private subscribeToPoller(poller$: Observable>>) { return poller$ .pipe( @@ -209,23 +226,6 @@ export class TaskPollingLifecycle { this.emitEvent(asTaskPollingCycleEvent(event)); }); } - - private pollForWork = async (...tasksToClaim: string[]): Promise => { - return fillPool( - // claim available tasks - () => - claimAvailableTasks( - tasksToClaim.splice(0, this.pool.availableWorkers), - this.store.claimAvailableTasks, - this.pool.availableWorkers, - this.logger - ), - // wrap each task in a Task Runner - this.createTaskRunnerForTask, - // place tasks in the Task Pool - async (tasks: TaskRunner[]) => await this.pool.run(tasks) - ); - }; } export async function claimAvailableTasks( From ea154a0803a484273d97c9f33b87b78b53d916f7 Mon Sep 17 00:00:00 2001 From: Gidi Meir Morris Date: Wed, 28 Oct 2020 12:13:01 +0000 Subject: [PATCH 4/7] added tests for availability --- .../task_manager/server/plugin.test.ts | 180 +++++++++++++++++- x-pack/plugins/task_manager/server/plugin.ts | 25 ++- .../server/polling_lifecycle.mock.ts | 2 - 3 files changed, 195 insertions(+), 12 deletions(-) diff --git a/x-pack/plugins/task_manager/server/plugin.test.ts b/x-pack/plugins/task_manager/server/plugin.test.ts index 8388468164a4f..fa3b4ae3968e4 100644 --- a/x-pack/plugins/task_manager/server/plugin.test.ts +++ b/x-pack/plugins/task_manager/server/plugin.test.ts @@ -4,9 +4,12 @@ * you may not use this file except in compliance with the Elastic License. */ -import { TaskManagerPlugin } from './plugin'; +import { TaskManagerPlugin, getElasticsearchAndSOAvailability } from './plugin'; import { coreMock } from '../../../../src/core/server/mocks'; import { TaskManagerConfig } from './config'; +import { Subject } from 'rxjs'; +import { bufferCount, take } from 'rxjs/operators'; +import { CoreStatus, ServiceStatusLevels } from 'src/core/server'; describe('TaskManagerPlugin', () => { describe('setup', () => { @@ -88,4 +91,179 @@ describe('TaskManagerPlugin', () => { ); }); }); + + describe('getElasticsearchAndSOAvailability', () => { + test('returns true when both services are available', async () => { + const core$ = new Subject(); + + const availability = getElasticsearchAndSOAvailability(core$) + .pipe(take(1), bufferCount(1)) + .toPromise(); + + core$.next({ + elasticsearch: { + level: ServiceStatusLevels.available, + summary: '', + }, + savedObjects: { + level: ServiceStatusLevels.available, + summary: '', + }, + }); + + expect(await availability).toEqual([true]); + }); + + test('returns false when both services are unavailable', async () => { + const core$ = new Subject(); + + const availability = getElasticsearchAndSOAvailability(core$) + .pipe(take(1), bufferCount(1)) + .toPromise(); + + core$.next({ + elasticsearch: { + level: ServiceStatusLevels.unavailable, + summary: '', + }, + savedObjects: { + level: ServiceStatusLevels.unavailable, + summary: '', + }, + }); + + expect(await availability).toEqual([false]); + }); + + test('returns false when one service is unavailable but the other is available', async () => { + const core$ = new Subject(); + + const availability = getElasticsearchAndSOAvailability(core$) + .pipe(take(1), bufferCount(1)) + .toPromise(); + + core$.next({ + elasticsearch: { + level: ServiceStatusLevels.available, + summary: '', + }, + savedObjects: { + level: ServiceStatusLevels.unavailable, + summary: '', + }, + }); + + expect(await availability).toEqual([false]); + }); + + test('shift back and forth between values as status changes', async () => { + const core$ = new Subject(); + + const availability = getElasticsearchAndSOAvailability(core$) + .pipe(take(3), bufferCount(3)) + .toPromise(); + + core$.next({ + elasticsearch: { + level: ServiceStatusLevels.available, + summary: '', + }, + savedObjects: { + level: ServiceStatusLevels.unavailable, + summary: '', + }, + }); + + core$.next({ + elasticsearch: { + level: ServiceStatusLevels.available, + summary: '', + }, + savedObjects: { + level: ServiceStatusLevels.available, + summary: '', + }, + }); + + core$.next({ + elasticsearch: { + level: ServiceStatusLevels.unavailable, + summary: '', + }, + savedObjects: { + level: ServiceStatusLevels.unavailable, + summary: '', + }, + }); + + expect(await availability).toEqual([false, true, false]); + }); + + test(`skips values when the status hasn't changed`, async () => { + const core$ = new Subject(); + + const availability = getElasticsearchAndSOAvailability(core$) + .pipe(take(3), bufferCount(3)) + .toPromise(); + + core$.next({ + elasticsearch: { + level: ServiceStatusLevels.available, + summary: '', + }, + savedObjects: { + level: ServiceStatusLevels.unavailable, + summary: '', + }, + }); + + // still false, so shouldn't emit a second time + core$.next({ + elasticsearch: { + level: ServiceStatusLevels.unavailable, + summary: '', + }, + savedObjects: { + level: ServiceStatusLevels.available, + summary: '', + }, + }); + + core$.next({ + elasticsearch: { + level: ServiceStatusLevels.available, + summary: '', + }, + savedObjects: { + level: ServiceStatusLevels.available, + summary: '', + }, + }); + + // shouldn't emit as already true + core$.next({ + elasticsearch: { + level: ServiceStatusLevels.available, + summary: '', + }, + savedObjects: { + level: ServiceStatusLevels.available, + summary: '', + }, + }); + + core$.next({ + elasticsearch: { + level: ServiceStatusLevels.unavailable, + summary: '', + }, + savedObjects: { + level: ServiceStatusLevels.unavailable, + summary: '', + }, + }); + + expect(await availability).toEqual([false, true, false]); + }); + }); }); diff --git a/x-pack/plugins/task_manager/server/plugin.ts b/x-pack/plugins/task_manager/server/plugin.ts index 8b723864bd234..77de90ea76d11 100644 --- a/x-pack/plugins/task_manager/server/plugin.ts +++ b/x-pack/plugins/task_manager/server/plugin.ts @@ -4,7 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ import { combineLatest, Observable, Subject } from 'rxjs'; -import { first, map, distinctUntilChanged } from 'rxjs/operators'; +import { first, map, tap, distinctUntilChanged } from 'rxjs/operators'; import { PluginInitializerContext, Plugin, @@ -12,6 +12,7 @@ import { Logger, CoreStart, ServiceStatusLevels, + CoreStatus, } from '../../../../src/core/server'; import { TaskDefinition } from './task'; import { TaskPollingLifecycle } from './polling_lifecycle'; @@ -59,14 +60,7 @@ export class TaskManagerPlugin .pipe(first()) .toPromise(); - this.elasticsearchAndSOAvailability$ = core.status.core$.pipe( - map( - ({ elasticsearch, savedObjects }) => - elasticsearch.level === ServiceStatusLevels.available && - savedObjects.level === ServiceStatusLevels.available - ), - distinctUntilChanged() - ); + this.elasticsearchAndSOAvailability$ = getElasticsearchAndSOAvailability(core.status.core$); setupSavedObjects(core.savedObjects, this.config); this.taskManagerId = this.initContext.env.instanceUuid; @@ -179,3 +173,16 @@ export class TaskManagerPlugin } } } + +export function getElasticsearchAndSOAvailability( + core$: Observable +): Observable { + return core$.pipe( + map( + ({ elasticsearch, savedObjects }) => + elasticsearch.level === ServiceStatusLevels.available && + savedObjects.level === ServiceStatusLevels.available + ), + distinctUntilChanged() + ); +} diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.mock.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.mock.ts index 9df1e06165bc6..286e29194d6e6 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.mock.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.mock.ts @@ -10,7 +10,6 @@ import { of, Observable } from 'rxjs'; export const taskPollingLifecycleMock = { create(opts: { isStarted?: boolean; events$?: Observable }) { return ({ - start: jest.fn(), attemptToRun: jest.fn(), get isStarted() { return opts.isStarted ?? true; @@ -18,7 +17,6 @@ export const taskPollingLifecycleMock = { get events() { return opts.events$ ?? of(); }, - stop: jest.fn(), } as unknown) as jest.Mocked; }, }; From 162345b63c6b1a642e89598b62bac268089d9b24 Mon Sep 17 00:00:00 2001 From: Gidi Meir Morris Date: Wed, 28 Oct 2020 12:19:38 +0000 Subject: [PATCH 5/7] removed unused import --- x-pack/plugins/task_manager/server/plugin.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/x-pack/plugins/task_manager/server/plugin.ts b/x-pack/plugins/task_manager/server/plugin.ts index 77de90ea76d11..c3b9d6e917b28 100644 --- a/x-pack/plugins/task_manager/server/plugin.ts +++ b/x-pack/plugins/task_manager/server/plugin.ts @@ -4,7 +4,7 @@ * you may not use this file except in compliance with the Elastic License. */ import { combineLatest, Observable, Subject } from 'rxjs'; -import { first, map, tap, distinctUntilChanged } from 'rxjs/operators'; +import { first, map, distinctUntilChanged } from 'rxjs/operators'; import { PluginInitializerContext, Plugin, From 03128e285763cda10244b779e362e553ed55afda Mon Sep 17 00:00:00 2001 From: Gidi Meir Morris Date: Wed, 28 Oct 2020 12:25:13 +0000 Subject: [PATCH 6/7] improved mocking to make it easier to maintain --- .../task_manager/server/plugin.test.ts | 140 ++++-------------- 1 file changed, 30 insertions(+), 110 deletions(-) diff --git a/x-pack/plugins/task_manager/server/plugin.test.ts b/x-pack/plugins/task_manager/server/plugin.test.ts index fa3b4ae3968e4..9a1d83f6195ab 100644 --- a/x-pack/plugins/task_manager/server/plugin.test.ts +++ b/x-pack/plugins/task_manager/server/plugin.test.ts @@ -100,16 +100,7 @@ describe('TaskManagerPlugin', () => { .pipe(take(1), bufferCount(1)) .toPromise(); - core$.next({ - elasticsearch: { - level: ServiceStatusLevels.available, - summary: '', - }, - savedObjects: { - level: ServiceStatusLevels.available, - summary: '', - }, - }); + core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: true })); expect(await availability).toEqual([true]); }); @@ -121,16 +112,7 @@ describe('TaskManagerPlugin', () => { .pipe(take(1), bufferCount(1)) .toPromise(); - core$.next({ - elasticsearch: { - level: ServiceStatusLevels.unavailable, - summary: '', - }, - savedObjects: { - level: ServiceStatusLevels.unavailable, - summary: '', - }, - }); + core$.next(mockCoreStatusAvailability({ elasticsearch: false, savedObjects: false })); expect(await availability).toEqual([false]); }); @@ -142,16 +124,7 @@ describe('TaskManagerPlugin', () => { .pipe(take(1), bufferCount(1)) .toPromise(); - core$.next({ - elasticsearch: { - level: ServiceStatusLevels.available, - summary: '', - }, - savedObjects: { - level: ServiceStatusLevels.unavailable, - summary: '', - }, - }); + core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: false })); expect(await availability).toEqual([false]); }); @@ -163,38 +136,11 @@ describe('TaskManagerPlugin', () => { .pipe(take(3), bufferCount(3)) .toPromise(); - core$.next({ - elasticsearch: { - level: ServiceStatusLevels.available, - summary: '', - }, - savedObjects: { - level: ServiceStatusLevels.unavailable, - summary: '', - }, - }); + core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: false })); - core$.next({ - elasticsearch: { - level: ServiceStatusLevels.available, - summary: '', - }, - savedObjects: { - level: ServiceStatusLevels.available, - summary: '', - }, - }); + core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: true })); - core$.next({ - elasticsearch: { - level: ServiceStatusLevels.unavailable, - summary: '', - }, - savedObjects: { - level: ServiceStatusLevels.unavailable, - summary: '', - }, - }); + core$.next(mockCoreStatusAvailability({ elasticsearch: false, savedObjects: false })); expect(await availability).toEqual([false, true, false]); }); @@ -206,64 +152,38 @@ describe('TaskManagerPlugin', () => { .pipe(take(3), bufferCount(3)) .toPromise(); - core$.next({ - elasticsearch: { - level: ServiceStatusLevels.available, - summary: '', - }, - savedObjects: { - level: ServiceStatusLevels.unavailable, - summary: '', - }, - }); + core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: false })); // still false, so shouldn't emit a second time - core$.next({ - elasticsearch: { - level: ServiceStatusLevels.unavailable, - summary: '', - }, - savedObjects: { - level: ServiceStatusLevels.available, - summary: '', - }, - }); + core$.next(mockCoreStatusAvailability({ elasticsearch: false, savedObjects: true })); - core$.next({ - elasticsearch: { - level: ServiceStatusLevels.available, - summary: '', - }, - savedObjects: { - level: ServiceStatusLevels.available, - summary: '', - }, - }); + core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: true })); // shouldn't emit as already true - core$.next({ - elasticsearch: { - level: ServiceStatusLevels.available, - summary: '', - }, - savedObjects: { - level: ServiceStatusLevels.available, - summary: '', - }, - }); + core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: true })); - core$.next({ - elasticsearch: { - level: ServiceStatusLevels.unavailable, - summary: '', - }, - savedObjects: { - level: ServiceStatusLevels.unavailable, - summary: '', - }, - }); + core$.next(mockCoreStatusAvailability({ elasticsearch: false, savedObjects: false })); expect(await availability).toEqual([false, true, false]); }); }); }); + +function mockCoreStatusAvailability({ + elasticsearch, + savedObjects, +}: { + elasticsearch: boolean; + savedObjects: boolean; +}) { + return { + elasticsearch: { + level: elasticsearch ? ServiceStatusLevels.available : ServiceStatusLevels.unavailable, + summary: '', + }, + savedObjects: { + level: savedObjects ? ServiceStatusLevels.available : ServiceStatusLevels.unavailable, + summary: '', + }, + }; +} From 1017116b6b4ccf0f3ddd89f1ff4aea5f42615817 Mon Sep 17 00:00:00 2001 From: Gidi Meir Morris Date: Wed, 28 Oct 2020 12:59:47 +0000 Subject: [PATCH 7/7] reactively disabled workload aggregation when services become unavailable --- .../task_manager/server/monitoring/index.ts | 10 +++- .../monitoring/monitoring_stats_stream.ts | 2 + .../monitoring/workload_statistics.test.ts | 48 ++++++++++++++++++- .../server/monitoring/workload_statistics.ts | 8 ++-- x-pack/plugins/task_manager/server/plugin.ts | 1 + 5 files changed, 64 insertions(+), 5 deletions(-) diff --git a/x-pack/plugins/task_manager/server/monitoring/index.ts b/x-pack/plugins/task_manager/server/monitoring/index.ts index 8e71ce2519a7c..0a4c8c56a5a79 100644 --- a/x-pack/plugins/task_manager/server/monitoring/index.ts +++ b/x-pack/plugins/task_manager/server/monitoring/index.ts @@ -28,12 +28,20 @@ export { export function createMonitoringStats( taskPollingLifecycle: TaskPollingLifecycle, taskStore: TaskStore, + elasticsearchAndSOAvailability$: Observable, config: TaskManagerConfig, managedConfig: ManagedConfiguration, logger: Logger ): Observable { return createMonitoringStatsStream( - createAggregators(taskPollingLifecycle, taskStore, config, managedConfig, logger), + createAggregators( + taskPollingLifecycle, + taskStore, + elasticsearchAndSOAvailability$, + config, + managedConfig, + logger + ), config ); } diff --git a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts index 374660a257c59..524afb8d78e21 100644 --- a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts +++ b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts @@ -63,6 +63,7 @@ export interface RawMonitoringStats { export function createAggregators( taskPollingLifecycle: TaskPollingLifecycle, taskStore: TaskStore, + elasticsearchAndSOAvailability$: Observable, config: TaskManagerConfig, managedConfig: ManagedConfiguration, logger: Logger @@ -72,6 +73,7 @@ export function createAggregators( createTaskRunAggregator(taskPollingLifecycle, config.monitored_stats_running_average_window), createWorkloadAggregator( taskStore, + elasticsearchAndSOAvailability$, config.monitored_aggregated_stats_refresh_rate, config.poll_interval, logger diff --git a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts index d9af3307e75cb..cb6e48530b027 100644 --- a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts @@ -17,6 +17,8 @@ import { ESSearchResponse } from '../../../apm/typings/elasticsearch'; import { AggregationResultOf } from '../../../apm/typings/elasticsearch/aggregations'; import { times } from 'lodash'; import { taskStoreMock } from '../task_store.mock'; +import { of, Subject } from 'rxjs'; +import { sleep } from '../test_utils'; type MockESResult = ESSearchResponse< ConcreteTaskInstance, @@ -75,6 +77,7 @@ describe('Workload Statistics Aggregator', () => { const workloadAggregator = createWorkloadAggregator( taskStore, + of(true), 10, 3000, loggingSystemMock.create().get() @@ -231,6 +234,7 @@ describe('Workload Statistics Aggregator', () => { const workloadAggregator = createWorkloadAggregator( taskStore, + of(true), 10, 3000, loggingSystemMock.create().get() @@ -252,12 +256,51 @@ describe('Workload Statistics Aggregator', () => { }); }); + test('skips summary of the workload when services are unavailable', async () => { + const taskStore = taskStoreMock.create({}); + taskStore.aggregate.mockResolvedValue(mockAggregatedResult()); + + const availability$ = new Subject(); + + const workloadAggregator = createWorkloadAggregator( + taskStore, + availability$, + 10, + 3000, + loggingSystemMock.create().get() + ); + + return new Promise(async (resolve) => { + workloadAggregator.pipe(first()).subscribe((result) => { + expect(result.key).toEqual('workload'); + expect(result.value).toMatchObject({ + count: 4, + task_types: { + actions_telemetry: { count: 2, status: { idle: 2 } }, + alerting_telemetry: { count: 1, status: { idle: 1 } }, + session_cleanup: { count: 1, status: { idle: 1 } }, + }, + }); + resolve(); + }); + + availability$.next(false); + + await sleep(10); + expect(taskStore.aggregate).not.toHaveBeenCalled(); + await sleep(10); + expect(taskStore.aggregate).not.toHaveBeenCalled(); + availability$.next(true); + }); + }); + test('returns a count of the overdue workload', async () => { const taskStore = taskStoreMock.create({}); taskStore.aggregate.mockResolvedValue(mockAggregatedResult()); const workloadAggregator = createWorkloadAggregator( taskStore, + of(true), 10, 3000, loggingSystemMock.create().get() @@ -280,6 +323,7 @@ describe('Workload Statistics Aggregator', () => { const workloadAggregator = createWorkloadAggregator( taskStore, + of(true), 10, 3000, loggingSystemMock.create().get() @@ -307,6 +351,7 @@ describe('Workload Statistics Aggregator', () => { const workloadAggregator = createWorkloadAggregator( taskStore, + of(true), 60 * 1000, 3000, loggingSystemMock.create().get() @@ -344,6 +389,7 @@ describe('Workload Statistics Aggregator', () => { const workloadAggregator = createWorkloadAggregator( taskStore, + of(true), 15 * 60 * 1000, 3000, loggingSystemMock.create().get() @@ -392,7 +438,7 @@ describe('Workload Statistics Aggregator', () => { }) ); const logger = loggingSystemMock.create().get(); - const workloadAggregator = createWorkloadAggregator(taskStore, 10, 3000, logger); + const workloadAggregator = createWorkloadAggregator(taskStore, of(true), 10, 3000, logger); return new Promise((resolve, reject) => { workloadAggregator.pipe(take(2), bufferCount(2)).subscribe((results) => { diff --git a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts index fe70f24684ad9..17448ea412ae6 100644 --- a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts +++ b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts @@ -4,8 +4,8 @@ * you may not use this file except in compliance with the Elastic License. */ -import { timer } from 'rxjs'; -import { mergeMap, map, catchError } from 'rxjs/operators'; +import { combineLatest, Observable, timer } from 'rxjs'; +import { mergeMap, map, filter, catchError } from 'rxjs/operators'; import { Logger } from 'src/core/server'; import { JsonObject } from 'src/plugins/kibana_utils/common'; import { keyBy, mapValues } from 'lodash'; @@ -94,6 +94,7 @@ const MAX_SHCEDULE_DENSITY_BUCKETS = 50; export function createWorkloadAggregator( taskStore: TaskStore, + elasticsearchAndSOAvailability$: Observable, refreshInterval: number, pollInterval: number, logger: Logger @@ -105,7 +106,8 @@ export function createWorkloadAggregator( MAX_SHCEDULE_DENSITY_BUCKETS ); - return timer(0, refreshInterval).pipe( + return combineLatest([timer(0, refreshInterval), elasticsearchAndSOAvailability$]).pipe( + filter(([, areElasticsearchAndSOAvailable]) => areElasticsearchAndSOAvailable), mergeMap(() => taskStore.aggregate({ aggs: { diff --git a/x-pack/plugins/task_manager/server/plugin.ts b/x-pack/plugins/task_manager/server/plugin.ts index c3b9d6e917b28..70688cd169d7e 100644 --- a/x-pack/plugins/task_manager/server/plugin.ts +++ b/x-pack/plugins/task_manager/server/plugin.ts @@ -139,6 +139,7 @@ export class TaskManagerPlugin createMonitoringStats( this.taskPollingLifecycle, taskStore, + this.elasticsearchAndSOAvailability$!, this.config!, managedConfiguration, this.logger