Skip to content

Commit

Permalink
Prevent max_signals from being exceeded on threat match rule executions
Browse files Browse the repository at this point in the history
  • Loading branch information
madirey committed Jun 14, 2021
1 parent cf3da56 commit ba3b2f7
Show file tree
Hide file tree
Showing 6 changed files with 71 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ import {
mergeReturns,
mergeSearchResults,
getSafeSortIds,
getLock,
releaseLock,
} from './utils';
import { SearchAfterAndBulkCreateParams, SearchAfterAndBulkCreateReturnType } from './types';

// search_after through documents and re-index using bulk endpoint.
// eslint-disable-next-line complexity
export const searchAfterAndBulkCreate = async ({
tuple,
ruleSO,
Expand All @@ -37,6 +40,7 @@ export const searchAfterAndBulkCreate = async ({
enrichment = identity,
bulkCreate,
wrapHits,
state,
}: SearchAfterAndBulkCreateParams): Promise<SearchAfterAndBulkCreateReturnType> => {
const ruleParams = ruleSO.attributes.params;
let toReturn = createSearchAfterReturnType();
Expand All @@ -48,16 +52,22 @@ export const searchAfterAndBulkCreate = async ({
// signalsCreatedCount keeps track of how many signals we have created,
// to ensure we don't exceed maxSignals
let signalsCreatedCount = 0;
const signalsAlreadyCreated = () => state?.signalsCreated || 0;
const totalSignalsCreated = (_signalsCreatedCount: number): number => {
return _signalsCreatedCount + signalsAlreadyCreated();
};

if (tuple == null || tuple.to == null || tuple.from == null) {
logger.error(buildRuleMessage(`[-] malformed date tuple`));
if (state != null) {
releaseLock(state);
}
return createSearchAfterReturnType({
success: false,
errors: ['malformed date tuple'],
});
}
signalsCreatedCount = 0;
while (signalsCreatedCount < tuple.maxSignals) {
while (totalSignalsCreated(signalsCreatedCount) < tuple.maxSignals) {
try {
let mergedSearchResults = createSearchResultReturnType();
logger.debug(buildRuleMessage(`sortIds: ${sortIds}`));
Expand Down Expand Up @@ -134,11 +144,25 @@ export const searchAfterAndBulkCreate = async ({
// skip the call to bulk create and proceed to the next search_after,
// if there is a sort id to continue the search_after with.
if (filteredEvents.hits.hits.length !== 0) {
if (state != null) {
const error = await getLock(state);
if (error != null) {
logger.error(buildRuleMessage(error));
return createSearchAfterReturnType({
success: false,
errors: [error],
});
}
}

// make sure we are not going to create more signals than maxSignals allows
if (signalsCreatedCount + filteredEvents.hits.hits.length > tuple.maxSignals) {
if (
totalSignalsCreated(signalsCreatedCount) + filteredEvents.hits.hits.length >
tuple.maxSignals
) {
filteredEvents.hits.hits = filteredEvents.hits.hits.slice(
0,
tuple.maxSignals - signalsCreatedCount
tuple.maxSignals - totalSignalsCreated(signalsCreatedCount)
);
}
const enrichedEvents = await enrichment(filteredEvents);
Expand All @@ -162,6 +186,12 @@ export const searchAfterAndBulkCreate = async ({
}),
]);
signalsCreatedCount += createdCount;
if (state != null) {
// Protected by lock
// eslint-disable-next-line require-atomic-updates
state.signalsCreated += createdCount;
releaseLock(state);
}
logger.debug(buildRuleMessage(`created ${createdCount} signals`));
logger.debug(buildRuleMessage(`signalsCreatedCount: ${signalsCreatedCount}`));
logger.debug(
Expand All @@ -177,6 +207,9 @@ export const searchAfterAndBulkCreate = async ({
}
} catch (exc: unknown) {
logger.error(buildRuleMessage(`[-] search_after and bulk threw an error ${exc}`));
if (state != null) {
releaseLock(state);
}
return mergeReturns([
toReturn,
createSearchAfterReturnType({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export const createThreatSignal = async ({
currentResult,
bulkCreate,
wrapHits,
signalState,
}: CreateThreatSignalOptions): Promise<SearchAfterAndBulkCreateReturnType> => {
const threatFilter = buildThreatMappingFilter({
threatMapping,
Expand Down Expand Up @@ -86,6 +87,7 @@ export const createThreatSignal = async ({
enrichment: threatEnrichment,
bulkCreate,
wrapHits,
state: signalState,
});
logger.debug(
buildRuleMessage(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ export const createThreatSignals = async ({
while (threatList.hits.hits.length !== 0) {
const chunks = chunk(itemsPerSearch, threatList.hits.hits);
logger.debug(buildRuleMessage(`${chunks.length} concurrent indicator searches are starting.`));
const signalState = {
isLocked: false,
signalsCreated: 0,
};
const concurrentSearchesPerformed = chunks.map<Promise<SearchAfterAndBulkCreateReturnType>>(
(slicedChunk) =>
createThreatSignal({
Expand All @@ -127,6 +131,7 @@ export const createThreatSignals = async ({
currentResult: results,
bulkCreate,
wrapHits,
signalState,
})
);
const searchesPerformed = await Promise.all(concurrentSearchesPerformed);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
BulkCreate,
RuleRangeTuple,
SearchAfterAndBulkCreateReturnType,
SearchAfterAndBulkCreateState,
SignalsEnrichment,
WrapHits,
} from '../types';
Expand Down Expand Up @@ -93,6 +94,7 @@ export interface CreateThreatSignalOptions {
currentResult: SearchAfterAndBulkCreateReturnType;
bulkCreate: BulkCreate;
wrapHits: WrapHits;
signalState: SearchAfterAndBulkCreateState;
}

export interface BuildThreatMappingFilterOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,10 @@ export type WrapHits = (
hits: Array<estypes.SearchHit<unknown>>
) => Array<BaseHit<{ '@timestamp': string }>>;

export interface SearchAfterAndBulkCreateState {
isLocked: boolean;
signalsCreated: number;
}
export interface SearchAfterAndBulkCreateParams {
tuple: {
to: moment.Moment;
Expand All @@ -282,6 +286,7 @@ export interface SearchAfterAndBulkCreateParams {
enrichment?: SignalsEnrichment;
bulkCreate: BulkCreate;
wrapHits: WrapHits;
state?: SearchAfterAndBulkCreateState | undefined;
}

export interface SearchAfterAndBulkCreateReturnType {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -914,3 +914,23 @@ export const buildChunkedOrFilter = (field: string, values: string[], chunkSize:
})
.join(' OR ');
};

const LOCK_INTERVAL_MS = 250;
const LOCK_MAX_TRIES = 100;
const sleep = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));

export const getLock = async (signalState: { isLocked: boolean }): Promise<string | undefined> => {
let tries = 0;
while (signalState.isLocked && tries < LOCK_MAX_TRIES) {
await sleep(LOCK_INTERVAL_MS);
tries++;
}
if (!signalState.isLocked) {
signalState.isLocked = true;
}
return `Error retrieving lock after {tries} tries.`;
};

export const releaseLock = (signalState: { isLocked: boolean }) => {
signalState.isLocked = false;
};

0 comments on commit ba3b2f7

Please sign in to comment.