From ba3b2f7a382ef7c369f02c7939e1495f72d92bfe Mon Sep 17 00:00:00 2001 From: Madison Caldwell Date: Mon, 14 Jun 2021 12:19:54 -0400 Subject: [PATCH] Prevent max_signals from being exceeded on threat match rule executions --- .../signals/search_after_bulk_create.ts | 41 +++++++++++++++++-- .../threat_mapping/create_threat_signal.ts | 2 + .../threat_mapping/create_threat_signals.ts | 5 +++ .../signals/threat_mapping/types.ts | 2 + .../lib/detection_engine/signals/types.ts | 5 +++ .../lib/detection_engine/signals/utils.ts | 20 +++++++++ 6 files changed, 71 insertions(+), 4 deletions(-) diff --git a/x-pack/plugins/security_solution/server/lib/detection_engine/signals/search_after_bulk_create.ts b/x-pack/plugins/security_solution/server/lib/detection_engine/signals/search_after_bulk_create.ts index eb4af0c38ce254..be41e366c6d533 100644 --- a/x-pack/plugins/security_solution/server/lib/detection_engine/signals/search_after_bulk_create.ts +++ b/x-pack/plugins/security_solution/server/lib/detection_engine/signals/search_after_bulk_create.ts @@ -18,10 +18,13 @@ import { mergeReturns, mergeSearchResults, getSafeSortIds, + getLock, + releaseLock, } from './utils'; import { SearchAfterAndBulkCreateParams, SearchAfterAndBulkCreateReturnType } from './types'; // search_after through documents and re-index using bulk endpoint. +// eslint-disable-next-line complexity export const searchAfterAndBulkCreate = async ({ tuple, ruleSO, @@ -37,6 +40,7 @@ export const searchAfterAndBulkCreate = async ({ enrichment = identity, bulkCreate, wrapHits, + state, }: SearchAfterAndBulkCreateParams): Promise => { const ruleParams = ruleSO.attributes.params; let toReturn = createSearchAfterReturnType(); @@ -48,16 +52,22 @@ export const searchAfterAndBulkCreate = async ({ // signalsCreatedCount keeps track of how many signals we have created, // to ensure we don't exceed maxSignals let signalsCreatedCount = 0; + const signalsAlreadyCreated = () => state?.signalsCreated || 0; + const totalSignalsCreated = (_signalsCreatedCount: number): number => { + return _signalsCreatedCount + signalsAlreadyCreated(); + }; if (tuple == null || tuple.to == null || tuple.from == null) { logger.error(buildRuleMessage(`[-] malformed date tuple`)); + if (state != null) { + releaseLock(state); + } return createSearchAfterReturnType({ success: false, errors: ['malformed date tuple'], }); } - signalsCreatedCount = 0; - while (signalsCreatedCount < tuple.maxSignals) { + while (totalSignalsCreated(signalsCreatedCount) < tuple.maxSignals) { try { let mergedSearchResults = createSearchResultReturnType(); logger.debug(buildRuleMessage(`sortIds: ${sortIds}`)); @@ -134,11 +144,25 @@ export const searchAfterAndBulkCreate = async ({ // skip the call to bulk create and proceed to the next search_after, // if there is a sort id to continue the search_after with. if (filteredEvents.hits.hits.length !== 0) { + if (state != null) { + const error = await getLock(state); + if (error != null) { + logger.error(buildRuleMessage(error)); + return createSearchAfterReturnType({ + success: false, + errors: [error], + }); + } + } + // make sure we are not going to create more signals than maxSignals allows - if (signalsCreatedCount + filteredEvents.hits.hits.length > tuple.maxSignals) { + if ( + totalSignalsCreated(signalsCreatedCount) + filteredEvents.hits.hits.length > + tuple.maxSignals + ) { filteredEvents.hits.hits = filteredEvents.hits.hits.slice( 0, - tuple.maxSignals - signalsCreatedCount + tuple.maxSignals - totalSignalsCreated(signalsCreatedCount) ); } const enrichedEvents = await enrichment(filteredEvents); @@ -162,6 +186,12 @@ export const searchAfterAndBulkCreate = async ({ }), ]); signalsCreatedCount += createdCount; + if (state != null) { + // Protected by lock + // eslint-disable-next-line require-atomic-updates + state.signalsCreated += createdCount; + releaseLock(state); + } logger.debug(buildRuleMessage(`created ${createdCount} signals`)); logger.debug(buildRuleMessage(`signalsCreatedCount: ${signalsCreatedCount}`)); logger.debug( @@ -177,6 +207,9 @@ export const searchAfterAndBulkCreate = async ({ } } catch (exc: unknown) { logger.error(buildRuleMessage(`[-] search_after and bulk threw an error ${exc}`)); + if (state != null) { + releaseLock(state); + } return mergeReturns([ toReturn, createSearchAfterReturnType({ diff --git a/x-pack/plugins/security_solution/server/lib/detection_engine/signals/threat_mapping/create_threat_signal.ts b/x-pack/plugins/security_solution/server/lib/detection_engine/signals/threat_mapping/create_threat_signal.ts index 806f5e47608e40..3901db648b58ad 100644 --- a/x-pack/plugins/security_solution/server/lib/detection_engine/signals/threat_mapping/create_threat_signal.ts +++ b/x-pack/plugins/security_solution/server/lib/detection_engine/signals/threat_mapping/create_threat_signal.ts @@ -36,6 +36,7 @@ export const createThreatSignal = async ({ currentResult, bulkCreate, wrapHits, + signalState, }: CreateThreatSignalOptions): Promise => { const threatFilter = buildThreatMappingFilter({ threatMapping, @@ -86,6 +87,7 @@ export const createThreatSignal = async ({ enrichment: threatEnrichment, bulkCreate, wrapHits, + state: signalState, }); logger.debug( buildRuleMessage( diff --git a/x-pack/plugins/security_solution/server/lib/detection_engine/signals/threat_mapping/create_threat_signals.ts b/x-pack/plugins/security_solution/server/lib/detection_engine/signals/threat_mapping/create_threat_signals.ts index 169a820392a6e6..b2b4c1c899f852 100644 --- a/x-pack/plugins/security_solution/server/lib/detection_engine/signals/threat_mapping/create_threat_signals.ts +++ b/x-pack/plugins/security_solution/server/lib/detection_engine/signals/threat_mapping/create_threat_signals.ts @@ -101,6 +101,10 @@ export const createThreatSignals = async ({ while (threatList.hits.hits.length !== 0) { const chunks = chunk(itemsPerSearch, threatList.hits.hits); logger.debug(buildRuleMessage(`${chunks.length} concurrent indicator searches are starting.`)); + const signalState = { + isLocked: false, + signalsCreated: 0, + }; const concurrentSearchesPerformed = chunks.map>( (slicedChunk) => createThreatSignal({ @@ -127,6 +131,7 @@ export const createThreatSignals = async ({ currentResult: results, bulkCreate, wrapHits, + signalState, }) ); const searchesPerformed = await Promise.all(concurrentSearchesPerformed); diff --git a/x-pack/plugins/security_solution/server/lib/detection_engine/signals/threat_mapping/types.ts b/x-pack/plugins/security_solution/server/lib/detection_engine/signals/threat_mapping/types.ts index ded79fc647ac41..0c4a4869fbc446 100644 --- a/x-pack/plugins/security_solution/server/lib/detection_engine/signals/threat_mapping/types.ts +++ b/x-pack/plugins/security_solution/server/lib/detection_engine/signals/threat_mapping/types.ts @@ -32,6 +32,7 @@ import { BulkCreate, RuleRangeTuple, SearchAfterAndBulkCreateReturnType, + SearchAfterAndBulkCreateState, SignalsEnrichment, WrapHits, } from '../types'; @@ -93,6 +94,7 @@ export interface CreateThreatSignalOptions { currentResult: SearchAfterAndBulkCreateReturnType; bulkCreate: BulkCreate; wrapHits: WrapHits; + signalState: SearchAfterAndBulkCreateState; } export interface BuildThreatMappingFilterOptions { diff --git a/x-pack/plugins/security_solution/server/lib/detection_engine/signals/types.ts b/x-pack/plugins/security_solution/server/lib/detection_engine/signals/types.ts index 8a6ce91b2575ab..d29f99a48c12df 100644 --- a/x-pack/plugins/security_solution/server/lib/detection_engine/signals/types.ts +++ b/x-pack/plugins/security_solution/server/lib/detection_engine/signals/types.ts @@ -261,6 +261,10 @@ export type WrapHits = ( hits: Array> ) => Array>; +export interface SearchAfterAndBulkCreateState { + isLocked: boolean; + signalsCreated: number; +} export interface SearchAfterAndBulkCreateParams { tuple: { to: moment.Moment; @@ -282,6 +286,7 @@ export interface SearchAfterAndBulkCreateParams { enrichment?: SignalsEnrichment; bulkCreate: BulkCreate; wrapHits: WrapHits; + state?: SearchAfterAndBulkCreateState | undefined; } export interface SearchAfterAndBulkCreateReturnType { diff --git a/x-pack/plugins/security_solution/server/lib/detection_engine/signals/utils.ts b/x-pack/plugins/security_solution/server/lib/detection_engine/signals/utils.ts index 6d67bab6eb2f76..1220e637a76da8 100644 --- a/x-pack/plugins/security_solution/server/lib/detection_engine/signals/utils.ts +++ b/x-pack/plugins/security_solution/server/lib/detection_engine/signals/utils.ts @@ -914,3 +914,23 @@ export const buildChunkedOrFilter = (field: string, values: string[], chunkSize: }) .join(' OR '); }; + +const LOCK_INTERVAL_MS = 250; +const LOCK_MAX_TRIES = 100; +const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms)); + +export const getLock = async (signalState: { isLocked: boolean }): Promise => { + let tries = 0; + while (signalState.isLocked && tries < LOCK_MAX_TRIES) { + await sleep(LOCK_INTERVAL_MS); + tries++; + } + if (!signalState.isLocked) { + signalState.isLocked = true; + } + return `Error retrieving lock after {tries} tries.`; +}; + +export const releaseLock = (signalState: { isLocked: boolean }) => { + signalState.isLocked = false; +};