From 8acfb0e2aa4d769ab41a0cd1067557bd7dbdbba0 Mon Sep 17 00:00:00 2001 From: Dima Arnautov Date: Wed, 9 Jun 2021 11:00:05 +0200 Subject: [PATCH] [ML] Remove script fields from the Anomaly detection alerting rule executor (#101607) * [ML] remove script fields * [ML] fix initial score --- .../ml/server/lib/alerts/alerting_service.ts | 138 +++++------------- 1 file changed, 39 insertions(+), 99 deletions(-) diff --git a/x-pack/plugins/ml/server/lib/alerts/alerting_service.ts b/x-pack/plugins/ml/server/lib/alerts/alerting_service.ts index 0d4d117b69bf30..e7d3ef97a301b4 100644 --- a/x-pack/plugins/ml/server/lib/alerts/alerting_service.ts +++ b/x-pack/plugins/ml/server/lib/alerts/alerting_service.ts @@ -41,6 +41,8 @@ type AggResultsResponse = { key?: number } & { }; }; +const TIME_RANGE_PADDING = 10; + /** * Mapping for result types and corresponding score fields. */ @@ -63,43 +65,6 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da }; }; - const getCommonScriptedFields = () => { - return { - start: { - script: { - lang: 'painless', - source: `LocalDateTime.ofEpochSecond((doc["timestamp"].value.getMillis()-((doc["bucket_span"].value * 1000) - * params.padding)) / 1000, 0, ZoneOffset.UTC).toString()+\":00.000Z\"`, - params: { - padding: 10, - }, - }, - }, - end: { - script: { - lang: 'painless', - source: `LocalDateTime.ofEpochSecond((doc["timestamp"].value.getMillis()+((doc["bucket_span"].value * 1000) - * params.padding)) / 1000, 0, ZoneOffset.UTC).toString()+\":00.000Z\"`, - params: { - padding: 10, - }, - }, - }, - timestamp_epoch: { - script: { - lang: 'painless', - source: 'doc["timestamp"].value.getMillis()/1000', - }, - }, - timestamp_iso8601: { - script: { - lang: 'painless', - source: 'doc["timestamp"].value', - }, - }, - }; - }; - /** * Builds an agg query based on the requested result type. * @param resultType @@ -110,9 +75,9 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da severity: number, useInitialScore?: boolean ) => { - const influencerScoreField = `${useInitialScore ? 'initial_' : ''}influencer_score`; - const recordScoreField = `${useInitialScore ? 'initial_' : ''}record_score`; - const bucketScoreField = `${useInitialScore ? 'initial_' : ''}anomaly_score`; + const influencerScoreField = getScoreFields(ANOMALY_RESULT_TYPE.INFLUENCER, useInitialScore); + const recordScoreField = getScoreFields(ANOMALY_RESULT_TYPE.RECORD, useInitialScore); + const bucketScoreField = getScoreFields(ANOMALY_RESULT_TYPE.BUCKET, useInitialScore); return { influencer_results: { @@ -140,27 +105,13 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da 'influencer_field_name', 'influencer_field_value', 'influencer_score', + 'initial_influencer_score', 'is_interim', 'job_id', + 'bucket_span', ], }, size: 3, - script_fields: { - ...getCommonScriptedFields(), - score: { - script: { - lang: 'painless', - source: `Math.floor(doc["${influencerScoreField}"].value)`, - }, - }, - unique_key: { - script: { - lang: 'painless', - source: - 'doc["timestamp"].value + "_" + doc["influencer_field_name"].value + "_" + doc["influencer_field_value"].value', - }, - }, - }, }, }, }, @@ -188,6 +139,7 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da 'result_type', 'timestamp', 'record_score', + 'initial_record_score', 'is_interim', 'function', 'field_name', @@ -199,24 +151,10 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da 'partition_field_value', 'job_id', 'detector_index', + 'bucket_span', ], }, size: 3, - script_fields: { - ...getCommonScriptedFields(), - score: { - script: { - lang: 'painless', - source: `Math.floor(doc["${recordScoreField}"].value)`, - }, - }, - unique_key: { - script: { - lang: 'painless', - source: 'doc["timestamp"].value + "_" + doc["function"].value', - }, - }, - }, }, }, }, @@ -247,25 +185,12 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da 'result_type', 'timestamp', 'anomaly_score', + 'initial_anomaly_score', 'is_interim', + 'bucket_span', ], }, size: 1, - script_fields: { - ...getCommonScriptedFields(), - score: { - script: { - lang: 'painless', - source: `Math.floor(doc["${bucketScoreField}"].value)`, - }, - }, - unique_key: { - script: { - lang: 'painless', - source: 'doc["timestamp"].value', - }, - }, - }, }, }, }, @@ -282,6 +207,10 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da return source.job_id; }; + const getScoreFields = (resultType: AnomalyResultType, useInitialScore?: boolean) => { + return `${useInitialScore ? 'initial_' : ''}${resultTypeScoreMapping[resultType]}`; + }; + const getRecordKey = (source: AnomalyRecordDoc): string => { let alertInstanceKey = `${source.job_id}_${source.timestamp}`; @@ -294,18 +223,23 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da return alertInstanceKey; }; - const getResultsFormatter = (resultType: AnomalyResultType) => { + /** + * Returns a callback for formatting elasticsearch aggregation response + * to the alert context. + * @param resultType + */ + const getResultsFormatter = (resultType: AnomalyResultType, useInitialScore: boolean = false) => { const resultsLabel = getAggResultsLabel(resultType); return (v: AggResultsResponse): AlertExecutionResult | undefined => { const aggTypeResults = v[resultsLabel.aggGroupLabel]; if (aggTypeResults.doc_count === 0) { return; } - const requestedAnomalies = aggTypeResults[resultsLabel.topHitsLabel].hits.hits; - const topAnomaly = requestedAnomalies[0]; const alertInstanceKey = getAlertInstanceKey(topAnomaly._source); + const timestamp = topAnomaly._source.timestamp; + const bucketSpanInSeconds = topAnomaly._source.bucket_span; return { count: aggTypeResults.doc_count, @@ -315,26 +249,32 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da alertInstanceKey, jobIds: [...new Set(requestedAnomalies.map((h) => h._source.job_id))], isInterim: requestedAnomalies.some((h) => h._source.is_interim), - timestamp: topAnomaly._source.timestamp, - timestampIso8601: topAnomaly.fields.timestamp_iso8601[0], - timestampEpoch: topAnomaly.fields.timestamp_epoch[0], - score: topAnomaly.fields.score[0], + timestamp, + timestampIso8601: new Date(timestamp).toISOString(), + timestampEpoch: timestamp / 1000, + score: Math.floor(topAnomaly._source[getScoreFields(resultType, useInitialScore)]), bucketRange: { - start: topAnomaly.fields.start[0], - end: topAnomaly.fields.end[0], + start: new Date( + timestamp - bucketSpanInSeconds * 1000 * TIME_RANGE_PADDING + ).toISOString(), + end: new Date(timestamp + bucketSpanInSeconds * 1000 * TIME_RANGE_PADDING).toISOString(), }, topRecords: v.record_results.top_record_hits.hits.hits.map((h) => { return { ...h._source, - score: h.fields.score[0], + score: Math.floor( + h._source[getScoreFields(ANOMALY_RESULT_TYPE.RECORD, useInitialScore)] + ), unique_key: getRecordKey(h._source), }; }) as RecordAnomalyAlertDoc[], topInfluencers: v.influencer_results.top_influencer_hits.hits.hits.map((h) => { return { ...h._source, - score: h.fields.score[0], - unique_key: h.fields.unique_key[0], + score: Math.floor( + h._source[getScoreFields(ANOMALY_RESULT_TYPE.INFLUENCER, useInitialScore)] + ), + unique_key: `${h._source.timestamp}_${h._source.influencer_field_name}_${h._source.influencer_field_value}`, }; }) as InfluencerAnomalyAlertDoc[], }; @@ -447,7 +387,7 @@ export function alertingServiceProvider(mlClient: MlClient, datafeedsService: Da const resultsLabel = getAggResultsLabel(params.resultType); - const formatter = getResultsFormatter(params.resultType); + const formatter = getResultsFormatter(params.resultType, !!previewTimeInterval); return (previewTimeInterval ? (result as {