Skip to content

Commit

Permalink
[Metrics Alerts] Add support for search query and groupBy on alerts (e…
Browse files Browse the repository at this point in the history
…lastic#59388)

* Add filterQuery to metric alert params

* Add groupBy alert support

* Fix typings

* Fix malformed query

* Fix filterQuery merge

* Fix groupBy afterkey insertion, add group name to alert action

* Convert iife to getter

* Fix type check

* Fix type check again

* Remove unnecessary order param
  • Loading branch information
Zacqary committed Mar 10, 2020
1 parent b89727e commit 8417b98
Show file tree
Hide file tree
Showing 4 changed files with 150 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@
* you may not use this file except in compliance with the Elastic License.
*/
import uuid from 'uuid';
import { mapValues } from 'lodash';
import { i18n } from '@kbn/i18n';
import { schema } from '@kbn/config-schema';
import { InfraDatabaseSearchResponse } from '../../adapters/framework/adapter_types';
import { createAfterKeyHandler } from '../../../utils/create_afterkey_handler';
import { getAllCompositeData } from '../../../utils/get_all_composite_data';
import { networkTraffic } from '../../../../common/inventory_models/shared/metrics/snapshot/network_traffic';
import {
MetricExpressionParams,
Expand All @@ -15,18 +19,58 @@ import {
} from './types';
import { AlertServices, PluginSetupContract } from '../../../../../alerting/server';

interface Aggregation {
aggregatedIntervals: { buckets: Array<{ aggregatedValue: { value: number } }> };
}

interface CompositeAggregationsResponse {
groupings: {
buckets: Aggregation[];
};
}

const FIRED_ACTIONS = {
id: 'metrics.threshold.fired',
name: i18n.translate('xpack.infra.metrics.alerting.threshold.fired', {
defaultMessage: 'Fired',
}),
};

async function getMetric(
{ callCluster }: AlertServices,
{ metric, aggType, timeUnit, timeSize, indexPattern }: MetricExpressionParams
const getCurrentValueFromAggregations = (aggregations: Aggregation) => {
const { buckets } = aggregations.aggregatedIntervals;
const { value } = buckets[buckets.length - 1].aggregatedValue;
return value;
};

const getParsedFilterQuery: (
filterQuery: string | undefined
) => Record<string, any> = filterQuery => {
if (!filterQuery) return {};
try {
return JSON.parse(filterQuery).bool;
} catch (e) {
return {
query_string: {
query: filterQuery,
analyze_wildcard: true,
},
};
}
};

const getMetric: (
services: AlertServices,
params: MetricExpressionParams,
groupBy: string | undefined,
filterQuery: string | undefined
) => Promise<Record<string, number>> = async function(
{ callCluster },
{ metric, aggType, timeUnit, timeSize, indexPattern },
groupBy,
filterQuery
) {
const interval = `${timeSize}${timeUnit}`;

const aggregations =
aggType === 'rate'
? networkTraffic('aggregatedValue', metric)
Expand All @@ -38,6 +82,38 @@ async function getMetric(
},
};

const baseAggs = {
aggregatedIntervals: {
date_histogram: {
field: '@timestamp',
fixed_interval: interval,
},
aggregations,
},
};

const aggs = groupBy
? {
groupings: {
composite: {
size: 10,
sources: [
{
groupBy: {
terms: {
field: groupBy,
},
},
},
],
},
aggs: baseAggs,
},
}
: baseAggs;

const parsedFilterQuery = getParsedFilterQuery(filterQuery);

const searchBody = {
query: {
bool: {
Expand All @@ -48,34 +124,49 @@ async function getMetric(
gte: `now-${interval}`,
},
},
},
{
exists: {
field: metric,
},
},
],
...parsedFilterQuery,
},
},
size: 0,
aggs: {
aggregatedIntervals: {
date_histogram: {
field: '@timestamp',
fixed_interval: interval,
},
aggregations,
},
},
aggs,
};

if (groupBy) {
const bucketSelector = (
response: InfraDatabaseSearchResponse<{}, CompositeAggregationsResponse>
) => response.aggregations?.groupings?.buckets || [];
const afterKeyHandler = createAfterKeyHandler(
'aggs.groupings.composite.after',
response => response.aggregations?.groupings?.after_key
);
const compositeBuckets = (await getAllCompositeData(
body => callCluster('search', { body, index: indexPattern }),
searchBody,
bucketSelector,
afterKeyHandler
)) as Array<Aggregation & { key: { groupBy: string } }>;
return compositeBuckets.reduce(
(result, bucket) => ({
...result,
[bucket.key.groupBy]: getCurrentValueFromAggregations(bucket),
}),
{}
);
}

const result = await callCluster('search', {
body: searchBody,
index: indexPattern,
});

const { buckets } = result.aggregations.aggregatedIntervals;
const { value } = buckets[buckets.length - 1].aggregatedValue;
return value;
}
return { '*': getCurrentValueFromAggregations(result.aggregations) };
};

const comparatorMap = {
[Comparator.BETWEEN]: (value: number, [a, b]: number[]) =>
Expand Down Expand Up @@ -112,39 +203,54 @@ export async function registerMetricThresholdAlertType(alertingPlugin: PluginSet
indexPattern: schema.string(),
})
),
groupBy: schema.maybe(schema.string()),
filterQuery: schema.maybe(schema.string()),
}),
},
defaultActionGroupId: FIRED_ACTIONS.id,
actionGroups: [FIRED_ACTIONS],
async executor({ services, params }) {
const { criteria } = params as { criteria: MetricExpressionParams[] };
const alertInstance = services.alertInstanceFactory(alertUUID);
const { criteria, groupBy, filterQuery } = params as {
criteria: MetricExpressionParams[];
groupBy: string | undefined;
filterQuery: string | undefined;
};

const alertResults = await Promise.all(
criteria.map(({ threshold, comparator }) =>
criteria.map(criterion =>
(async () => {
const currentValue = await getMetric(services, params as MetricExpressionParams);
if (typeof currentValue === 'undefined')
const currentValues = await getMetric(services, criterion, groupBy, filterQuery);
if (typeof currentValues === 'undefined')
throw new Error('Could not get current value of metric');

const { threshold, comparator } = criterion;
const comparisonFunction = comparatorMap[comparator];
return { shouldFire: comparisonFunction(currentValue, threshold), currentValue };

return mapValues(currentValues, value => ({
shouldFire: comparisonFunction(value, threshold),
currentValue: value,
}));
})()
)
);

const shouldAlertFire = alertResults.every(({ shouldFire }) => shouldFire);
const groups = Object.keys(alertResults[0]);
for (const group of groups) {
const alertInstance = services.alertInstanceFactory(`${alertUUID}-${group}`);

const shouldAlertFire = alertResults.every(result => result[group].shouldFire);

if (shouldAlertFire) {
alertInstance.scheduleActions(FIRED_ACTIONS.id, {
value: alertResults.map(({ currentValue }) => currentValue),
if (shouldAlertFire) {
alertInstance.scheduleActions(FIRED_ACTIONS.id, {
group,
value: alertResults.map(result => result[group].currentValue),
});
}

// Future use: ability to fetch display current alert state
alertInstance.replaceState({
alertState: shouldAlertFire ? AlertStates.ALERT : AlertStates.OK,
});
}

// Future use: ability to fetch display current alert state
alertInstance.replaceState({
alertState: shouldAlertFire ? AlertStates.ALERT : AlertStates.OK,
});
},
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,5 @@ export interface MetricExpressionParams {
indexPattern: string;
threshold: number[];
comparator: Comparator;
filterQuery: string;
}
9 changes: 7 additions & 2 deletions x-pack/plugins/infra/server/lib/snapshot/snapshot.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ const handleAfterKey = createAfterKeyHandler(
input => input?.aggregations?.nodes?.after_key
);

const callClusterFactory = (framework: KibanaFramework, requestContext: RequestHandlerContext) => (
opts: any
) =>
framework.callWithRequest<{}, InfraSnapshotAggregationResponse>(requestContext, 'search', opts);

const requestGroupedNodes = async (
requestContext: RequestHandlerContext,
options: InfraSnapshotRequestOptions,
Expand Down Expand Up @@ -119,7 +124,7 @@ const requestGroupedNodes = async (
return await getAllCompositeData<
InfraSnapshotAggregationResponse,
InfraSnapshotNodeGroupByBucket
>(framework, requestContext, query, bucketSelector, handleAfterKey);
>(callClusterFactory(framework, requestContext), query, bucketSelector, handleAfterKey);
};

const requestNodeMetrics = async (
Expand Down Expand Up @@ -170,7 +175,7 @@ const requestNodeMetrics = async (
return await getAllCompositeData<
InfraSnapshotAggregationResponse,
InfraSnapshotNodeMetricsBucket
>(framework, requestContext, query, bucketSelector, handleAfterKey);
>(callClusterFactory(framework, requestContext), query, bucketSelector, handleAfterKey);
};

// buckets can be InfraSnapshotNodeGroupByBucket[] or InfraSnapshotNodeMetricsBucket[]
Expand Down
14 changes: 3 additions & 11 deletions x-pack/plugins/infra/server/utils/get_all_composite_data.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,27 +4,20 @@
* you may not use this file except in compliance with the Elastic License.
*/

import { RequestHandlerContext } from 'src/core/server';
import { KibanaFramework } from '../lib/adapters/framework/kibana_framework_adapter';
import { InfraDatabaseSearchResponse } from '../lib/adapters/framework';

export const getAllCompositeData = async <
Aggregation = undefined,
Bucket = {},
Options extends object = {}
>(
framework: KibanaFramework,
requestContext: RequestHandlerContext,
callCluster: (options: Options) => Promise<InfraDatabaseSearchResponse<{}, Aggregation>>,
options: Options,
bucketSelector: (response: InfraDatabaseSearchResponse<{}, Aggregation>) => Bucket[],
onAfterKey: (options: Options, response: InfraDatabaseSearchResponse<{}, Aggregation>) => Options,
previousBuckets: Bucket[] = []
): Promise<Bucket[]> => {
const response = await framework.callWithRequest<{}, Aggregation>(
requestContext,
'search',
options
);
const response = await callCluster(options);

// Nothing available, return the previous buckets.
if (response.hits.total.value === 0) {
Expand All @@ -46,8 +39,7 @@ export const getAllCompositeData = async <
// There is possibly more data, concat previous and current buckets and call ourselves recursively.
const newOptions = onAfterKey(options, response);
return getAllCompositeData(
framework,
requestContext,
callCluster,
newOptions,
bucketSelector,
onAfterKey,
Expand Down

0 comments on commit 8417b98

Please sign in to comment.