diff --git a/x-pack/plugins/task_manager/server/monitoring/index.ts b/x-pack/plugins/task_manager/server/monitoring/index.ts index 8e71ce2519a7c..0a4c8c56a5a79 100644 --- a/x-pack/plugins/task_manager/server/monitoring/index.ts +++ b/x-pack/plugins/task_manager/server/monitoring/index.ts @@ -28,12 +28,20 @@ export { export function createMonitoringStats( taskPollingLifecycle: TaskPollingLifecycle, taskStore: TaskStore, + elasticsearchAndSOAvailability$: Observable, config: TaskManagerConfig, managedConfig: ManagedConfiguration, logger: Logger ): Observable { return createMonitoringStatsStream( - createAggregators(taskPollingLifecycle, taskStore, config, managedConfig, logger), + createAggregators( + taskPollingLifecycle, + taskStore, + elasticsearchAndSOAvailability$, + config, + managedConfig, + logger + ), config ); } diff --git a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts index 374660a257c59..524afb8d78e21 100644 --- a/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts +++ b/x-pack/plugins/task_manager/server/monitoring/monitoring_stats_stream.ts @@ -63,6 +63,7 @@ export interface RawMonitoringStats { export function createAggregators( taskPollingLifecycle: TaskPollingLifecycle, taskStore: TaskStore, + elasticsearchAndSOAvailability$: Observable, config: TaskManagerConfig, managedConfig: ManagedConfiguration, logger: Logger @@ -72,6 +73,7 @@ export function createAggregators( createTaskRunAggregator(taskPollingLifecycle, config.monitored_stats_running_average_window), createWorkloadAggregator( taskStore, + elasticsearchAndSOAvailability$, config.monitored_aggregated_stats_refresh_rate, config.poll_interval, logger diff --git a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts index d9af3307e75cb..cb6e48530b027 100644 --- a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts +++ b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.test.ts @@ -17,6 +17,8 @@ import { ESSearchResponse } from '../../../apm/typings/elasticsearch'; import { AggregationResultOf } from '../../../apm/typings/elasticsearch/aggregations'; import { times } from 'lodash'; import { taskStoreMock } from '../task_store.mock'; +import { of, Subject } from 'rxjs'; +import { sleep } from '../test_utils'; type MockESResult = ESSearchResponse< ConcreteTaskInstance, @@ -75,6 +77,7 @@ describe('Workload Statistics Aggregator', () => { const workloadAggregator = createWorkloadAggregator( taskStore, + of(true), 10, 3000, loggingSystemMock.create().get() @@ -231,6 +234,7 @@ describe('Workload Statistics Aggregator', () => { const workloadAggregator = createWorkloadAggregator( taskStore, + of(true), 10, 3000, loggingSystemMock.create().get() @@ -252,12 +256,51 @@ describe('Workload Statistics Aggregator', () => { }); }); + test('skips summary of the workload when services are unavailable', async () => { + const taskStore = taskStoreMock.create({}); + taskStore.aggregate.mockResolvedValue(mockAggregatedResult()); + + const availability$ = new Subject(); + + const workloadAggregator = createWorkloadAggregator( + taskStore, + availability$, + 10, + 3000, + loggingSystemMock.create().get() + ); + + return new Promise(async (resolve) => { + workloadAggregator.pipe(first()).subscribe((result) => { + expect(result.key).toEqual('workload'); + expect(result.value).toMatchObject({ + count: 4, + task_types: { + actions_telemetry: { count: 2, status: { idle: 2 } }, + alerting_telemetry: { count: 1, status: { idle: 1 } }, + session_cleanup: { count: 1, status: { idle: 1 } }, + }, + }); + resolve(); + }); + + availability$.next(false); + + await sleep(10); + expect(taskStore.aggregate).not.toHaveBeenCalled(); + await sleep(10); + expect(taskStore.aggregate).not.toHaveBeenCalled(); + availability$.next(true); + }); + }); + test('returns a count of the overdue workload', async () => { const taskStore = taskStoreMock.create({}); taskStore.aggregate.mockResolvedValue(mockAggregatedResult()); const workloadAggregator = createWorkloadAggregator( taskStore, + of(true), 10, 3000, loggingSystemMock.create().get() @@ -280,6 +323,7 @@ describe('Workload Statistics Aggregator', () => { const workloadAggregator = createWorkloadAggregator( taskStore, + of(true), 10, 3000, loggingSystemMock.create().get() @@ -307,6 +351,7 @@ describe('Workload Statistics Aggregator', () => { const workloadAggregator = createWorkloadAggregator( taskStore, + of(true), 60 * 1000, 3000, loggingSystemMock.create().get() @@ -344,6 +389,7 @@ describe('Workload Statistics Aggregator', () => { const workloadAggregator = createWorkloadAggregator( taskStore, + of(true), 15 * 60 * 1000, 3000, loggingSystemMock.create().get() @@ -392,7 +438,7 @@ describe('Workload Statistics Aggregator', () => { }) ); const logger = loggingSystemMock.create().get(); - const workloadAggregator = createWorkloadAggregator(taskStore, 10, 3000, logger); + const workloadAggregator = createWorkloadAggregator(taskStore, of(true), 10, 3000, logger); return new Promise((resolve, reject) => { workloadAggregator.pipe(take(2), bufferCount(2)).subscribe((results) => { diff --git a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts index fe70f24684ad9..17448ea412ae6 100644 --- a/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts +++ b/x-pack/plugins/task_manager/server/monitoring/workload_statistics.ts @@ -4,8 +4,8 @@ * you may not use this file except in compliance with the Elastic License. */ -import { timer } from 'rxjs'; -import { mergeMap, map, catchError } from 'rxjs/operators'; +import { combineLatest, Observable, timer } from 'rxjs'; +import { mergeMap, map, filter, catchError } from 'rxjs/operators'; import { Logger } from 'src/core/server'; import { JsonObject } from 'src/plugins/kibana_utils/common'; import { keyBy, mapValues } from 'lodash'; @@ -94,6 +94,7 @@ const MAX_SHCEDULE_DENSITY_BUCKETS = 50; export function createWorkloadAggregator( taskStore: TaskStore, + elasticsearchAndSOAvailability$: Observable, refreshInterval: number, pollInterval: number, logger: Logger @@ -105,7 +106,8 @@ export function createWorkloadAggregator( MAX_SHCEDULE_DENSITY_BUCKETS ); - return timer(0, refreshInterval).pipe( + return combineLatest([timer(0, refreshInterval), elasticsearchAndSOAvailability$]).pipe( + filter(([, areElasticsearchAndSOAvailable]) => areElasticsearchAndSOAvailable), mergeMap(() => taskStore.aggregate({ aggs: { diff --git a/x-pack/plugins/task_manager/server/plugin.test.ts b/x-pack/plugins/task_manager/server/plugin.test.ts index 8388468164a4f..9a1d83f6195ab 100644 --- a/x-pack/plugins/task_manager/server/plugin.test.ts +++ b/x-pack/plugins/task_manager/server/plugin.test.ts @@ -4,9 +4,12 @@ * you may not use this file except in compliance with the Elastic License. */ -import { TaskManagerPlugin } from './plugin'; +import { TaskManagerPlugin, getElasticsearchAndSOAvailability } from './plugin'; import { coreMock } from '../../../../src/core/server/mocks'; import { TaskManagerConfig } from './config'; +import { Subject } from 'rxjs'; +import { bufferCount, take } from 'rxjs/operators'; +import { CoreStatus, ServiceStatusLevels } from 'src/core/server'; describe('TaskManagerPlugin', () => { describe('setup', () => { @@ -88,4 +91,99 @@ describe('TaskManagerPlugin', () => { ); }); }); + + describe('getElasticsearchAndSOAvailability', () => { + test('returns true when both services are available', async () => { + const core$ = new Subject(); + + const availability = getElasticsearchAndSOAvailability(core$) + .pipe(take(1), bufferCount(1)) + .toPromise(); + + core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: true })); + + expect(await availability).toEqual([true]); + }); + + test('returns false when both services are unavailable', async () => { + const core$ = new Subject(); + + const availability = getElasticsearchAndSOAvailability(core$) + .pipe(take(1), bufferCount(1)) + .toPromise(); + + core$.next(mockCoreStatusAvailability({ elasticsearch: false, savedObjects: false })); + + expect(await availability).toEqual([false]); + }); + + test('returns false when one service is unavailable but the other is available', async () => { + const core$ = new Subject(); + + const availability = getElasticsearchAndSOAvailability(core$) + .pipe(take(1), bufferCount(1)) + .toPromise(); + + core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: false })); + + expect(await availability).toEqual([false]); + }); + + test('shift back and forth between values as status changes', async () => { + const core$ = new Subject(); + + const availability = getElasticsearchAndSOAvailability(core$) + .pipe(take(3), bufferCount(3)) + .toPromise(); + + core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: false })); + + core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: true })); + + core$.next(mockCoreStatusAvailability({ elasticsearch: false, savedObjects: false })); + + expect(await availability).toEqual([false, true, false]); + }); + + test(`skips values when the status hasn't changed`, async () => { + const core$ = new Subject(); + + const availability = getElasticsearchAndSOAvailability(core$) + .pipe(take(3), bufferCount(3)) + .toPromise(); + + core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: false })); + + // still false, so shouldn't emit a second time + core$.next(mockCoreStatusAvailability({ elasticsearch: false, savedObjects: true })); + + core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: true })); + + // shouldn't emit as already true + core$.next(mockCoreStatusAvailability({ elasticsearch: true, savedObjects: true })); + + core$.next(mockCoreStatusAvailability({ elasticsearch: false, savedObjects: false })); + + expect(await availability).toEqual([false, true, false]); + }); + }); }); + +function mockCoreStatusAvailability({ + elasticsearch, + savedObjects, +}: { + elasticsearch: boolean; + savedObjects: boolean; +}) { + return { + elasticsearch: { + level: elasticsearch ? ServiceStatusLevels.available : ServiceStatusLevels.unavailable, + summary: '', + }, + savedObjects: { + level: savedObjects ? ServiceStatusLevels.available : ServiceStatusLevels.unavailable, + summary: '', + }, + }; +} diff --git a/x-pack/plugins/task_manager/server/plugin.ts b/x-pack/plugins/task_manager/server/plugin.ts index 0e7abb817490a..70688cd169d7e 100644 --- a/x-pack/plugins/task_manager/server/plugin.ts +++ b/x-pack/plugins/task_manager/server/plugin.ts @@ -3,9 +3,17 @@ * or more contributor license agreements. Licensed under the Elastic License; * you may not use this file except in compliance with the Elastic License. */ -import { PluginInitializerContext, Plugin, CoreSetup, Logger, CoreStart } from 'src/core/server'; -import { combineLatest, Subject } from 'rxjs'; -import { first, map } from 'rxjs/operators'; +import { combineLatest, Observable, Subject } from 'rxjs'; +import { first, map, distinctUntilChanged } from 'rxjs/operators'; +import { + PluginInitializerContext, + Plugin, + CoreSetup, + Logger, + CoreStart, + ServiceStatusLevels, + CoreStatus, +} from '../../../../src/core/server'; import { TaskDefinition } from './task'; import { TaskPollingLifecycle } from './polling_lifecycle'; import { TaskManagerConfig } from './config'; @@ -37,6 +45,7 @@ export class TaskManagerPlugin private logger: Logger; private definitions: TaskTypeDictionary; private middleware: Middleware = createInitialMiddleware(); + private elasticsearchAndSOAvailability$?: Observable; private monitoringStats$ = new Subject(); constructor(private readonly initContext: PluginInitializerContext) { @@ -51,6 +60,8 @@ export class TaskManagerPlugin .pipe(first()) .toPromise(); + this.elasticsearchAndSOAvailability$ = getElasticsearchAndSOAvailability(core.status.core$); + setupSavedObjects(core.savedObjects, this.config); this.taskManagerId = this.initContext.env.instanceUuid; @@ -115,19 +126,20 @@ export class TaskManagerPlugin startingPollInterval: this.config!.poll_interval, }); - const taskPollingLifecycle = new TaskPollingLifecycle({ + this.taskPollingLifecycle = new TaskPollingLifecycle({ config: this.config!, definitions: this.definitions, logger: this.logger, taskStore, middleware: this.middleware, + elasticsearchAndSOAvailability$: this.elasticsearchAndSOAvailability$!, ...managedConfiguration, }); - this.taskPollingLifecycle = taskPollingLifecycle; createMonitoringStats( - taskPollingLifecycle, + this.taskPollingLifecycle, taskStore, + this.elasticsearchAndSOAvailability$!, this.config!, managedConfiguration, this.logger @@ -137,12 +149,9 @@ export class TaskManagerPlugin logger: this.logger, taskStore, middleware: this.middleware, - taskPollingLifecycle, + taskPollingLifecycle: this.taskPollingLifecycle, }); - // start polling for work - taskPollingLifecycle.start(); - return { fetch: (opts: SearchOpts): Promise => taskStore.fetch(opts), get: (id: string) => taskStore.get(id), @@ -153,12 +162,6 @@ export class TaskManagerPlugin }; } - public stop() { - if (this.taskPollingLifecycle) { - this.taskPollingLifecycle.stop(); - } - } - /** * Ensures task manager hasn't started * @@ -171,3 +174,16 @@ export class TaskManagerPlugin } } } + +export function getElasticsearchAndSOAvailability( + core$: Observable +): Observable { + return core$.pipe( + map( + ({ elasticsearch, savedObjects }) => + elasticsearch.level === ServiceStatusLevels.available && + savedObjects.level === ServiceStatusLevels.available + ), + distinctUntilChanged() + ); +} diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.mock.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.mock.ts index 9df1e06165bc6..286e29194d6e6 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.mock.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.mock.ts @@ -10,7 +10,6 @@ import { of, Observable } from 'rxjs'; export const taskPollingLifecycleMock = { create(opts: { isStarted?: boolean; events$?: Observable }) { return ({ - start: jest.fn(), attemptToRun: jest.fn(), get isStarted() { return opts.isStarted ?? true; @@ -18,7 +17,6 @@ export const taskPollingLifecycleMock = { get events() { return opts.events$ ?? of(); }, - stop: jest.fn(), } as unknown) as jest.Mocked; }, }; diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts index 5f2e774177fd4..0f807976970cf 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.test.ts @@ -6,7 +6,7 @@ import _ from 'lodash'; import sinon from 'sinon'; -import { of } from 'rxjs'; +import { of, Subject } from 'rxjs'; import { TaskPollingLifecycle, claimAvailableTasks } from './polling_lifecycle'; import { createInitialMiddleware } from './lib/middleware'; @@ -55,15 +55,64 @@ describe('TaskPollingLifecycle', () => { afterEach(() => clock.restore()); describe('start', () => { - test('begins polling once start is called', () => { - const taskManager = new TaskPollingLifecycle(taskManagerOpts); + test('begins polling once the ES and SavedObjects services are available', () => { + const elasticsearchAndSOAvailability$ = new Subject(); + new TaskPollingLifecycle({ + elasticsearchAndSOAvailability$, + ...taskManagerOpts, + }); + + clock.tick(150); + expect(mockTaskStore.claimAvailableTasks).not.toHaveBeenCalled(); + + elasticsearchAndSOAvailability$.next(true); + + clock.tick(150); + expect(mockTaskStore.claimAvailableTasks).toHaveBeenCalled(); + }); + }); + + describe('stop', () => { + test('stops polling once the ES and SavedObjects services become unavailable', () => { + const elasticsearchAndSOAvailability$ = new Subject(); + new TaskPollingLifecycle({ + elasticsearchAndSOAvailability$, + ...taskManagerOpts, + }); + + elasticsearchAndSOAvailability$.next(true); + + clock.tick(150); + expect(mockTaskStore.claimAvailableTasks).toHaveBeenCalled(); + elasticsearchAndSOAvailability$.next(false); + + mockTaskStore.claimAvailableTasks.mockClear(); clock.tick(150); expect(mockTaskStore.claimAvailableTasks).not.toHaveBeenCalled(); + }); + + test('restarts polling once the ES and SavedObjects services become available again', () => { + const elasticsearchAndSOAvailability$ = new Subject(); + new TaskPollingLifecycle({ + elasticsearchAndSOAvailability$, + ...taskManagerOpts, + }); + + elasticsearchAndSOAvailability$.next(true); - taskManager.start(); + clock.tick(150); + expect(mockTaskStore.claimAvailableTasks).toHaveBeenCalled(); + elasticsearchAndSOAvailability$.next(false); + mockTaskStore.claimAvailableTasks.mockClear(); clock.tick(150); + + expect(mockTaskStore.claimAvailableTasks).not.toHaveBeenCalled(); + + elasticsearchAndSOAvailability$.next(true); + clock.tick(150); + expect(mockTaskStore.claimAvailableTasks).toHaveBeenCalled(); }); }); diff --git a/x-pack/plugins/task_manager/server/polling_lifecycle.ts b/x-pack/plugins/task_manager/server/polling_lifecycle.ts index ba19cb63fffa2..ccba750401f28 100644 --- a/x-pack/plugins/task_manager/server/polling_lifecycle.ts +++ b/x-pack/plugins/task_manager/server/polling_lifecycle.ts @@ -48,6 +48,7 @@ export type TaskPollingLifecycleOpts = { taskStore: TaskStore; config: TaskManagerConfig; middleware: Middleware; + elasticsearchAndSOAvailability$: Observable; } & ManagedConfiguration; export type TaskLifecycleEvent = @@ -72,8 +73,6 @@ export class TaskPollingLifecycle { private events$ = new Subject(); // all on-demand requests we wish to pipe into the poller private claimRequests$ = new Subject>(); - // the task poller that polls for work on fixed intervals and on demand - private poller$: Observable>>; // our subscription to the poller private pollingSubscription: Subscription = Subscription.EMPTY; @@ -84,36 +83,50 @@ export class TaskPollingLifecycle { * enabling the task manipulation methods, and beginning the background polling * mechanism. */ - constructor(opts: TaskPollingLifecycleOpts) { - const { logger, middleware, maxWorkersConfiguration$, pollIntervalConfiguration$ } = opts; + constructor({ + logger, + middleware, + maxWorkersConfiguration$, + pollIntervalConfiguration$, + // Elasticsearch and SavedObjects availability status + elasticsearchAndSOAvailability$, + config, + taskStore, + definitions, + }: TaskPollingLifecycleOpts) { this.logger = logger; this.middleware = middleware; + this.definitions = definitions; + this.store = taskStore; - this.definitions = opts.definitions; - this.store = opts.taskStore; // pipe store events into the lifecycle event stream this.store.events.subscribe((event) => this.events$.next(event)); this.bufferedStore = new BufferedTaskStore(this.store, { - bufferMaxOperations: opts.config.max_workers, - logger: this.logger, + bufferMaxOperations: config.max_workers, + logger, }); this.pool = new TaskPool({ - logger: this.logger, + logger, maxWorkers$: maxWorkersConfiguration$, }); const { max_poll_inactivity_cycles: maxPollInactivityCycles, poll_interval: pollInterval, - } = opts.config; - this.poller$ = createObservableMonitor>, Error>( + } = config; + + // the task poller that polls for work on fixed intervals and on demand + const poller$: Observable + >> = createObservableMonitor>, Error>( () => createTaskPoller({ - logger: this.logger, + logger, pollInterval$: pollIntervalConfiguration$, - bufferCapacity: opts.config.request_capacity, + bufferCapacity: config.request_capacity, getCapacity: () => this.pool.availableWorkers, pollRequests$: this.claimRequests$, work: this.pollForWork, @@ -133,10 +146,20 @@ export class TaskPollingLifecycle { // operation than just timing out the `work` internally) inactivityTimeout: pollInterval * (maxPollInactivityCycles + 1), onError: (error) => { - this.logger.error(`[Task Poller Monitor]: ${error.message}`); + logger.error(`[Task Poller Monitor]: ${error.message}`); }, } ); + + elasticsearchAndSOAvailability$.subscribe((areESAndSOAvailable) => { + if (areESAndSOAvailable && !this.isStarted) { + // start polling for work + this.pollingSubscription = this.subscribeToPoller(poller$); + } else if (!areESAndSOAvailable && this.isStarted) { + this.pollingSubscription.unsubscribe(); + this.pool.cancelRunningTasks(); + } + }); } public get events(): Observable { @@ -184,39 +207,24 @@ export class TaskPollingLifecycle { ); }; - /** - * Starts up the task manager and starts picking up tasks. - */ - public start() { - if (!this.isStarted) { - this.pollingSubscription = this.poller$ - .pipe( - tap( - mapErr((error: PollingError) => { - if (error.type === PollingErrorType.RequestCapacityReached) { - pipe( - error.data, - mapOptional((id) => this.emitEvent(asTaskRunRequestEvent(id, asErr(error)))) - ); - } - this.logger.error(error.message); - }) - ) + private subscribeToPoller(poller$: Observable>>) { + return poller$ + .pipe( + tap( + mapErr((error: PollingError) => { + if (error.type === PollingErrorType.RequestCapacityReached) { + pipe( + error.data, + mapOptional((id) => this.emitEvent(asTaskRunRequestEvent(id, asErr(error)))) + ); + } + this.logger.error(error.message); + }) ) - .subscribe((event: Result>) => { - this.emitEvent(asTaskPollingCycleEvent(event)); - }); - } - } - - /** - * Stops the task manager and cancels running tasks. - */ - public stop() { - if (this.isStarted) { - this.pollingSubscription.unsubscribe(); - this.pool.cancelRunningTasks(); - } + ) + .subscribe((event: Result>) => { + this.emitEvent(asTaskPollingCycleEvent(event)); + }); } }