Skip to content

Commit

Permalink
reactively disabled workload aggregation when services become unavail…
Browse files Browse the repository at this point in the history
…able
  • Loading branch information
gmmorris committed Oct 28, 2020
1 parent 03128e2 commit 1017116
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 5 deletions.
10 changes: 9 additions & 1 deletion x-pack/plugins/task_manager/server/monitoring/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,20 @@ export {
export function createMonitoringStats(
taskPollingLifecycle: TaskPollingLifecycle,
taskStore: TaskStore,
elasticsearchAndSOAvailability$: Observable<boolean>,
config: TaskManagerConfig,
managedConfig: ManagedConfiguration,
logger: Logger
): Observable<MonitoringStats> {
return createMonitoringStatsStream(
createAggregators(taskPollingLifecycle, taskStore, config, managedConfig, logger),
createAggregators(
taskPollingLifecycle,
taskStore,
elasticsearchAndSOAvailability$,
config,
managedConfig,
logger
),
config
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ export interface RawMonitoringStats {
export function createAggregators(
taskPollingLifecycle: TaskPollingLifecycle,
taskStore: TaskStore,
elasticsearchAndSOAvailability$: Observable<boolean>,
config: TaskManagerConfig,
managedConfig: ManagedConfiguration,
logger: Logger
Expand All @@ -72,6 +73,7 @@ export function createAggregators(
createTaskRunAggregator(taskPollingLifecycle, config.monitored_stats_running_average_window),
createWorkloadAggregator(
taskStore,
elasticsearchAndSOAvailability$,
config.monitored_aggregated_stats_refresh_rate,
config.poll_interval,
logger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import { ESSearchResponse } from '../../../apm/typings/elasticsearch';
import { AggregationResultOf } from '../../../apm/typings/elasticsearch/aggregations';
import { times } from 'lodash';
import { taskStoreMock } from '../task_store.mock';
import { of, Subject } from 'rxjs';
import { sleep } from '../test_utils';

type MockESResult = ESSearchResponse<
ConcreteTaskInstance,
Expand Down Expand Up @@ -75,6 +77,7 @@ describe('Workload Statistics Aggregator', () => {

const workloadAggregator = createWorkloadAggregator(
taskStore,
of(true),
10,
3000,
loggingSystemMock.create().get()
Expand Down Expand Up @@ -231,6 +234,7 @@ describe('Workload Statistics Aggregator', () => {

const workloadAggregator = createWorkloadAggregator(
taskStore,
of(true),
10,
3000,
loggingSystemMock.create().get()
Expand All @@ -252,12 +256,51 @@ describe('Workload Statistics Aggregator', () => {
});
});

test('skips summary of the workload when services are unavailable', async () => {
const taskStore = taskStoreMock.create({});
taskStore.aggregate.mockResolvedValue(mockAggregatedResult());

const availability$ = new Subject<boolean>();

const workloadAggregator = createWorkloadAggregator(
taskStore,
availability$,
10,
3000,
loggingSystemMock.create().get()
);

return new Promise(async (resolve) => {
workloadAggregator.pipe(first()).subscribe((result) => {
expect(result.key).toEqual('workload');
expect(result.value).toMatchObject({
count: 4,
task_types: {
actions_telemetry: { count: 2, status: { idle: 2 } },
alerting_telemetry: { count: 1, status: { idle: 1 } },
session_cleanup: { count: 1, status: { idle: 1 } },
},
});
resolve();
});

availability$.next(false);

await sleep(10);
expect(taskStore.aggregate).not.toHaveBeenCalled();
await sleep(10);
expect(taskStore.aggregate).not.toHaveBeenCalled();
availability$.next(true);
});
});

test('returns a count of the overdue workload', async () => {
const taskStore = taskStoreMock.create({});
taskStore.aggregate.mockResolvedValue(mockAggregatedResult());

const workloadAggregator = createWorkloadAggregator(
taskStore,
of(true),
10,
3000,
loggingSystemMock.create().get()
Expand All @@ -280,6 +323,7 @@ describe('Workload Statistics Aggregator', () => {

const workloadAggregator = createWorkloadAggregator(
taskStore,
of(true),
10,
3000,
loggingSystemMock.create().get()
Expand Down Expand Up @@ -307,6 +351,7 @@ describe('Workload Statistics Aggregator', () => {

const workloadAggregator = createWorkloadAggregator(
taskStore,
of(true),
60 * 1000,
3000,
loggingSystemMock.create().get()
Expand Down Expand Up @@ -344,6 +389,7 @@ describe('Workload Statistics Aggregator', () => {

const workloadAggregator = createWorkloadAggregator(
taskStore,
of(true),
15 * 60 * 1000,
3000,
loggingSystemMock.create().get()
Expand Down Expand Up @@ -392,7 +438,7 @@ describe('Workload Statistics Aggregator', () => {
})
);
const logger = loggingSystemMock.create().get();
const workloadAggregator = createWorkloadAggregator(taskStore, 10, 3000, logger);
const workloadAggregator = createWorkloadAggregator(taskStore, of(true), 10, 3000, logger);

return new Promise((resolve, reject) => {
workloadAggregator.pipe(take(2), bufferCount(2)).subscribe((results) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { timer } from 'rxjs';
import { mergeMap, map, catchError } from 'rxjs/operators';
import { combineLatest, Observable, timer } from 'rxjs';
import { mergeMap, map, filter, catchError } from 'rxjs/operators';
import { Logger } from 'src/core/server';
import { JsonObject } from 'src/plugins/kibana_utils/common';
import { keyBy, mapValues } from 'lodash';
Expand Down Expand Up @@ -94,6 +94,7 @@ const MAX_SHCEDULE_DENSITY_BUCKETS = 50;

export function createWorkloadAggregator(
taskStore: TaskStore,
elasticsearchAndSOAvailability$: Observable<boolean>,
refreshInterval: number,
pollInterval: number,
logger: Logger
Expand All @@ -105,7 +106,8 @@ export function createWorkloadAggregator(
MAX_SHCEDULE_DENSITY_BUCKETS
);

return timer(0, refreshInterval).pipe(
return combineLatest([timer(0, refreshInterval), elasticsearchAndSOAvailability$]).pipe(
filter(([, areElasticsearchAndSOAvailable]) => areElasticsearchAndSOAvailable),
mergeMap(() =>
taskStore.aggregate({
aggs: {
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ export class TaskManagerPlugin
createMonitoringStats(
this.taskPollingLifecycle,
taskStore,
this.elasticsearchAndSOAvailability$!,
this.config!,
managedConfiguration,
this.logger
Expand Down

0 comments on commit 1017116

Please sign in to comment.