Skip to content

Commit

Permalink
introduced poller monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
gmmorris committed Aug 19, 2020
1 parent a3fd505 commit 04f06c3
Show file tree
Hide file tree
Showing 8 changed files with 299 additions and 18 deletions.
9 changes: 9 additions & 0 deletions x-pack/plugins/task_manager/server/polling/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

export { createObservableMonitor } from './observable_monitor';
export { createTaskPoller, PollingError, PollingErrorType } from './task_poller';
export { timeoutPromiseAfter } from './timeout_promise_after';
170 changes: 170 additions & 0 deletions x-pack/plugins/task_manager/server/polling/observable_monitor.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { interval, from, Subject } from 'rxjs';
import { map, concatMap, takeWhile, take } from 'rxjs/operators';

import { createObservableMonitor } from './observable_monitor';
import { times } from 'lodash';

describe('Poll Monitor', () => {
test('returns a cold observable so that the monitored Observable is only created on demand', async () => {
const instantiator = jest.fn(() => new Subject());

createObservableMonitor(instantiator);

expect(instantiator).not.toHaveBeenCalled();
});

test('subscribing to the observable instantiates a new observable and pipes its results through', async () => {
const instantiator = jest.fn(() => from([0, 1, 2]));
const monitoredObservable = createObservableMonitor(instantiator);

expect(instantiator).not.toHaveBeenCalled();

return new Promise((resolve) => {
const next = jest.fn();
monitoredObservable.pipe(take(3)).subscribe({
next,
complete: () => {
expect(instantiator).toHaveBeenCalled();
expect(next).toHaveBeenCalledWith(0);
expect(next).toHaveBeenCalledWith(1);
expect(next).toHaveBeenCalledWith(2);
resolve();
},
});
});
});

test('unsubscribing from the monitor prevents the monitor from resubscribing to the observable', async () => {
const heartbeatInterval = 1000;
const instantiator = jest.fn(() => interval(100));
const monitoredObservable = createObservableMonitor(instantiator, { heartbeatInterval });

return new Promise((resolve) => {
const next = jest.fn();
monitoredObservable.pipe(take(3)).subscribe({
next,
complete: () => {
expect(instantiator).toHaveBeenCalledTimes(1);
setTimeout(() => {
expect(instantiator).toHaveBeenCalledTimes(1);
resolve();
}, heartbeatInterval * 2);
},
});
});
});

test(`ensures the observable subscription hasn't closed at a fixed interval and reinstantiates if it has`, async () => {
let iteration = 0;
const instantiator = jest.fn(() => {
iteration++;
return interval(100).pipe(
map((index) => `${iteration}:${index}`),
// throw on 3rd value of the first iteration
map((value, index) => {
if (iteration === 1 && index === 3) {
throw new Error('Source threw an error!');
}
return value;
})
);
});

const onError = jest.fn();
const monitoredObservable = createObservableMonitor(instantiator, { onError });

return new Promise((resolve) => {
const next = jest.fn();
const error = jest.fn();
monitoredObservable
.pipe(
// unsubscribe once we confirm we have successfully recovered from an error in the source
takeWhile(function validateExpectation() {
try {
[...times(3, (index) => `1:${index}`), ...times(5, (index) => `2:${index}`)].forEach(
(expecteArg) => {
expect(next).toHaveBeenCalledWith(expecteArg);
}
);
return false;
} catch {
return true;
}
})
)
.subscribe({
next,
error,
complete: () => {
expect(error).not.toHaveBeenCalled();
expect(onError).toHaveBeenCalledWith(new Error('Source threw an error!'));
resolve();
},
});
});
});

test(`ensures the observable subscription hasn't hung at a fixed interval and reinstantiates if it has`, async () => {
let iteration = 0;
const instantiator = jest.fn(() => {
iteration++;
return interval(100).pipe(
map((index) => `${iteration}:${index}`),
// hang on 3rd value of the first iteration
concatMap((value, index) => {
if (iteration === 1 && index === 3) {
return new Promise(() => {
// never resolve or reject, just hang for EVER
});
}
return Promise.resolve(value);
})
);
});

const onError = jest.fn();
const monitoredObservable = createObservableMonitor(instantiator, {
onError,
heartbeatInterval: 100,
inactivityTimeout: 500,
});

return new Promise((resolve) => {
const next = jest.fn();
const error = jest.fn();
monitoredObservable
.pipe(
// unsubscribe once we confirm we have successfully recovered from an error in the source
takeWhile(function validateExpectation() {
try {
[...times(3, (index) => `1:${index}`), ...times(5, (index) => `2:${index}`)].forEach(
(expecteArg) => {
expect(next).toHaveBeenCalledWith(expecteArg);
}
);
return false;
} catch {
return true;
}
})
)
.subscribe({
next,
error,
complete: () => {
expect(error).not.toHaveBeenCalled();
expect(onError).toHaveBeenCalledWith(
new Error(`Observable Monitor: Hung Observable restarted after 500ms of inactivity`)
);
resolve();
},
});
});
});
});
77 changes: 77 additions & 0 deletions x-pack/plugins/task_manager/server/polling/observable_monitor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/

import { Subject, Observable, throwError, interval, timer, Subscription } from 'rxjs';
import { exhaustMap, tap, takeUntil, switchMap, switchMapTo, catchError } from 'rxjs/operators';
import { noop } from 'lodash';

const DEFAULT_HEARTBEAT_INTERVAL = 1000;

// by default don't monitor inactivity as not all observables are expected
// to emit at any kind of fixed interval
const DEFAULT_INACTIVITY_TIMEOUT = 0;

export interface ObservableMonitorOptions<E> {
heartbeatInterval?: number;
inactivityTimeout?: number;
onError?: (err: E) => void;
}

export function createObservableMonitor<T, E>(
observableFactory: () => Observable<T>,
{
heartbeatInterval = DEFAULT_HEARTBEAT_INTERVAL,
inactivityTimeout = DEFAULT_INACTIVITY_TIMEOUT,
onError = noop,
}: ObservableMonitorOptions<E> = {}
): Observable<T> {
return new Observable((subscriber) => {
const subscription: Subscription = interval(heartbeatInterval)
.pipe(
// switch from the heartbeat interval to the instantiated observable until it completes / errors
exhaustMap(() => takeUntilDurationOfInactivity(observableFactory(), inactivityTimeout)),
// if an error is thrown, catch it, notify and try to recover
catchError((err: E, source$: Observable<T>) => {
onError(err);
// return source, which will allow our observable to recover from this error and
// keep pulling values out of it
return source$;
})
)
.subscribe(subscriber);
return () => {
subscription.unsubscribe();
};
});
}

function takeUntilDurationOfInactivity<T>(source$: Observable<T>, inactivityTimeout: number) {
// if there's a specified maximum duration of inactivity, only take values until that
// duration elapses without any new events
if (inactivityTimeout) {
// an observable which starts a timer every time a new value is passed in, replacing the previous timer
// if the timer goes off without having been reset by a fresh value, it will emit a single event - which will
// notify our monitor that the source has been inactive for too long
const inactivityMonitor$ = new Subject<void>();
return source$.pipe(
takeUntil(
inactivityMonitor$.pipe(
switchMap(() => timer(inactivityTimeout)),
switchMapTo(
throwError(
new Error(
`Observable Monitor: Hung Observable restarted after ${inactivityTimeout}ms of inactivity`
)
)
)
)
),
// poke `inactivityMonitor$` so it restarts the timer
tap(() => inactivityMonitor$.next())
);
}
return source$;
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import { Subject } from 'rxjs';
import { Option, none, some } from 'fp-ts/lib/Option';
import { createTaskPoller, PollingError, PollingErrorType } from './task_poller';
import { fakeSchedulers } from 'rxjs-marbles/jest';
import { sleep, resolvable, Resolvable } from './test_utils';
import { asOk, asErr } from './lib/result_type';
import { sleep, resolvable, Resolvable } from '../test_utils';
import { asOk, asErr } from '../lib/result_type';

describe('TaskPoller', () => {
beforeEach(() => jest.useFakeTimers());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { mapTo, filter, scan, concatMap, tap, catchError } from 'rxjs/operators'

import { pipe } from 'fp-ts/lib/pipeable';
import { Option, none, map as mapOptional, getOrElse } from 'fp-ts/lib/Option';
import { pullFromSet } from './lib/pull_from_set';
import { pullFromSet } from '../lib/pull_from_set';
import {
Result,
Err,
Expand All @@ -24,8 +24,8 @@ import {
asOk,
asErr,
promiseResult,
} from './lib/result_type';
import { timeoutPromiseAfter } from './lib/timeout_promise_after';
} from '../lib/result_type';
import { timeoutPromiseAfter } from './timeout_promise_after';

type WorkFn<T, H> = (...params: T[]) => Promise<H>;

Expand Down
51 changes: 38 additions & 13 deletions x-pack/plugins/task_manager/server/task_manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,12 @@ import {
TaskStatus,
ElasticJs,
} from './task';
import { createTaskPoller, PollingError, PollingErrorType } from './task_poller';
import {
createTaskPoller,
PollingError,
PollingErrorType,
createObservableMonitor,
} from './polling';
import { TaskPool } from './task_pool';
import { TaskManagerRunner, TaskRunner } from './task_runner';
import {
Expand Down Expand Up @@ -153,18 +158,38 @@ export class TaskManager {
maxWorkers: opts.config.max_workers,
});

this.poller$ = createTaskPoller<string, FillPoolResult>({
pollInterval: opts.config.poll_interval,
bufferCapacity: opts.config.request_capacity,
getCapacity: () => this.pool.availableWorkers,
pollRequests$: this.claimRequests$,
work: this.pollForWork,
// Time out the `work` phase if it takes longer than a certain number of polling cycles
// The `work` phase includes the prework needed *before* executing a task
// (such as polling for new work, marking tasks as running etc.) but does not
// include the time of actually running the task
workTimeout: opts.config.poll_interval * opts.config.max_poll_inactivity_cycles,
});
const {
max_poll_inactivity_cycles: maxPollInactivityCycles,
poll_interval: pollInterval,
} = opts.config;
this.poller$ = createObservableMonitor<Result<FillPoolResult, PollingError<string>>, Error>(
() =>
createTaskPoller<string, FillPoolResult>({
pollInterval,
bufferCapacity: opts.config.request_capacity,
getCapacity: () => this.pool.availableWorkers,
pollRequests$: this.claimRequests$,
work: this.pollForWork,
// Time out the `work` phase if it takes longer than a certain number of polling cycles
// The `work` phase includes the prework needed *before* executing a task
// (such as polling for new work, marking tasks as running etc.) but does not
// include the time of actually running the task
workTimeout: pollInterval * maxPollInactivityCycles,
}),
{
heartbeatInterval: pollInterval,
// Time out the poller itself if it has failed to complete the entire stream for a certain amount of time.
// This is different that the `work` timeout above, as the poller could enter an invalid state where
// it fails to complete a cycle even thought `work` is completing quickly.
// We grant it a single cycle longer than the time alotted to `work` so that timing out the `work`
// doesn't get short circuited by the monitor reinstantiating the poller all together (a far more expensive
// operation than just timing out the `work` internally)
inactivityTimeout: pollInterval * (maxPollInactivityCycles + 1),
onError: (error) => {
this.logger.error(error.message);
},
}
);
}

private emitEvent = (event: TaskLifecycleEvent) => {
Expand Down

0 comments on commit 04f06c3

Please sign in to comment.