Skip to content

Commit

Permalink
renames wrapHits
Browse files Browse the repository at this point in the history
  • Loading branch information
ecezalp committed May 14, 2021
1 parent 97975bc commit ab31bc4
Show file tree
Hide file tree
Showing 12 changed files with 80 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@ import {
AlertInstanceState,
AlertServices,
} from '../../../../../alerting/server';
import { SingleBulkCreateResponse } from './single_bulk_create';
import { GenericBulkCreateResponse } from './single_bulk_create';
import { AnomalyResults, Anomaly } from '../../machine_learning';
import { BuildRuleMessage } from './rule_messages';
import { AlertAttributes, BulkCreate } from './types';
import { AlertAttributes, BulkCreate, WrapHits } from './types';
import { MachineLearningRuleParams } from '../schemas/rule_schemas';
import { filterAndWrapDocuments } from './search_after_bulk_create';

interface BulkCreateMlSignalsParams {
someResult: AnomalyResults;
Expand All @@ -30,6 +29,7 @@ interface BulkCreateMlSignalsParams {
signalsIndex: string;
buildRuleMessage: BuildRuleMessage;
bulkCreate: BulkCreate;
wrapHits: WrapHits;
}

interface EcsAnomaly extends Anomaly {
Expand Down Expand Up @@ -85,20 +85,10 @@ const transformAnomalyResultsToEcs = (

export const bulkCreateMlSignals = async (
params: BulkCreateMlSignalsParams
): Promise<SingleBulkCreateResponse> => {
): Promise<GenericBulkCreateResponse<{}>> => {
const anomalyResults = params.someResult;
const ecsResults = transformAnomalyResultsToEcs(anomalyResults);
const buildRuleMessage = params.buildRuleMessage;
console.log('wrapping documents');
const wrappedDocs = filterAndWrapDocuments({
enrichedEvents: ecsResults,
buildRuleMessage,
id: params.id,
logger: params.logger,
signalsIndex: params.signalsIndex,
ruleSO: params.ruleSO,
});
console.log(JSON.stringify(wrappedDocs));
console.log('bulk creating ml signals');

const wrappedDocs = params.wrapHits(ecsResults.hits.hits);
return params.bulkCreate(wrappedDocs);
};
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import { filterEventsAgainstList } from '../filters/filter_events_against_list';
import { findMlSignals } from '../find_ml_signals';
import { BuildRuleMessage } from '../rule_messages';
import { RuleStatusService } from '../rule_status_service';
import { AlertAttributes, BulkCreate } from '../types';
import { AlertAttributes, BulkCreate, WrapHits } from '../types';
import { createErrorsFromShard, createSearchAfterReturnType, mergeReturns } from '../utils';

export const mlExecutor = async ({
Expand All @@ -35,6 +35,7 @@ export const mlExecutor = async ({
logger,
buildRuleMessage,
bulkCreate,
wrapHits,
}: {
rule: SavedObject<AlertAttributes<MachineLearningRuleParams>>;
ml: SetupPlugins['ml'];
Expand All @@ -45,6 +46,7 @@ export const mlExecutor = async ({
logger: Logger;
buildRuleMessage: BuildRuleMessage;
bulkCreate: BulkCreate;
wrapHits: WrapHits;
}) => {
const result = createSearchAfterReturnType();
const ruleParams = rule.attributes.params;
Expand Down Expand Up @@ -121,8 +123,8 @@ export const mlExecutor = async ({
signalsIndex: ruleParams.outputIndex,
buildRuleMessage,
bulkCreate,
wrapHits,
});
console.log('finished bulk create ml signals');
// The legacy ES client does not define failures when it can be present on the structure, hence why I have the & { failures: [] }
const shardFailures =
(filteredAnomalyResults._shards as typeof filteredAnomalyResults._shards & {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ export const queryExecutor = async ({
eventsTelemetry,
buildRuleMessage,
bulkCreate,
wrapSignals,
wrapHits,
}: {
rule: SavedObject<AlertAttributes<QueryRuleParams | SavedQueryRuleParams>>;
tuples: RuleRangeTuple[];
Expand All @@ -47,7 +47,7 @@ export const queryExecutor = async ({
eventsTelemetry: TelemetryEventsSender | undefined;
buildRuleMessage: BuildRuleMessage;
bulkCreate: BulkCreate;
wrapSignals: WrapHits;
wrapHits: WrapHits;
}) => {
const ruleParams = rule.attributes.params;
const inputIndex = await getInputIndex(services, version, ruleParams.index);
Expand Down Expand Up @@ -77,6 +77,6 @@ export const queryExecutor = async ({
pageSize: searchAfterSize,
buildRuleMessage,
bulkCreate,
wrapSignals,
wrapHits,
});
};
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export const threatMatchExecutor = async ({
eventsTelemetry,
buildRuleMessage,
bulkCreate,
wrapSignals,
wrapHits,
}: {
rule: SavedObject<AlertAttributes<ThreatRuleParams>>;
tuples: RuleRangeTuple[];
Expand All @@ -46,7 +46,7 @@ export const threatMatchExecutor = async ({
eventsTelemetry: TelemetryEventsSender | undefined;
buildRuleMessage: BuildRuleMessage;
bulkCreate: BulkCreate;
wrapSignals: WrapHits;
wrapHits: WrapHits;
}) => {
const ruleParams = rule.attributes.params;
const inputIndex = await getInputIndex(services, version, ruleParams.index);
Expand Down Expand Up @@ -77,6 +77,6 @@ export const threatMatchExecutor = async ({
concurrentSearches: ruleParams.concurrentSearches ?? 1,
itemsPerSearch: ruleParams.itemsPerSearch ?? 9000,
bulkCreate,
wrapSignals,
wrapHits,
});
};
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import { ExceptionListItemSchema } from '../../../../../common/shared_imports';
import { ThresholdRuleParams } from '../../schemas/rule_schemas';
import { getFilter } from '../get_filter';
import { getInputIndex } from '../get_input_output_index';
import { BuildRuleMessage } from '../rule_messages';
import { RuleStatusService } from '../rule_status_service';
import {
bulkCreateThresholdSignals,
Expand All @@ -30,12 +29,14 @@ import {
BulkCreate,
RuleRangeTuple,
SearchAfterAndBulkCreateReturnType,
WrapHits,
} from '../types';
import {
createSearchAfterReturnType,
createSearchAfterReturnTypeFromResponse,
mergeReturns,
} from '../utils';
import { BuildRuleMessage } from '../rule_messages';

export const thresholdExecutor = async ({
rule,
Expand All @@ -48,6 +49,7 @@ export const thresholdExecutor = async ({
buildRuleMessage,
startedAt,
bulkCreate,
wrapHits,
}: {
rule: SavedObject<AlertAttributes<ThresholdRuleParams>>;
tuples: RuleRangeTuple[];
Expand All @@ -59,6 +61,7 @@ export const thresholdExecutor = async ({
buildRuleMessage: BuildRuleMessage;
startedAt: Date;
bulkCreate: BulkCreate;
wrapHits: WrapHits;
}): Promise<SearchAfterAndBulkCreateReturnType> => {
let result = createSearchAfterReturnType();
const ruleParams = rule.attributes.params;
Expand Down Expand Up @@ -130,14 +133,13 @@ export const thresholdExecutor = async ({
filter: esFilter,
services,
logger,
id: rule.id,
inputIndexPattern: inputIndex,
signalsIndex: ruleParams.outputIndex,
startedAt,
from: tuple.from.toDate(),
thresholdSignalHistory,
buildRuleMessage,
bulkCreate,
wrapHits,
});

result = mergeReturns([
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@

import { identity } from 'lodash';
import { SortResults } from '@elastic/elasticsearch/api/types';
import { Logger } from '@kbn/logging';
import { singleSearchAfter } from './single_search_after';
import { filterDuplicateRules, filterDuplicateSignals } from './single_bulk_create';
import { filterEventsAgainstList } from './filters/filter_events_against_list';
import { sendAlertTelemetryEvents } from './send_telemetry_events';
import {
Expand All @@ -20,17 +18,8 @@ import {
mergeReturns,
mergeSearchResults,
getSafeSortIds,
generateId,
} from './utils';
import {
SearchAfterAndBulkCreateParams,
SearchAfterAndBulkCreateReturnType,
SignalSearchResponse,
WrapHits,
WrappedSignalHit,
} from './types';
import { buildBulkBody } from './build_bulk_body';
import { BuildRuleMessage } from './rule_messages';
import { SearchAfterAndBulkCreateParams, SearchAfterAndBulkCreateReturnType } from './types';

// search_after through documents and re-index using bulk endpoint.
export const searchAfterAndBulkCreate = async ({
Expand All @@ -41,15 +30,13 @@ export const searchAfterAndBulkCreate = async ({
listClient,
logger,
eventsTelemetry,
id,
inputIndexPattern,
signalsIndex,
filter,
pageSize,
buildRuleMessage,
enrichment = identity,
bulkCreate,
wrapSignals,
wrapHits,
}: SearchAfterAndBulkCreateParams): Promise<SearchAfterAndBulkCreateReturnType> => {
const ruleParams = ruleSO.attributes.params;
let toReturn = createSearchAfterReturnType();
Expand Down Expand Up @@ -160,7 +147,7 @@ export const searchAfterAndBulkCreate = async ({
);
}
const enrichedEvents = await enrichment(filteredEvents);
const wrappedDocs = wrapSignals(enrichedEvents.hits.hits);
const wrappedDocs = wrapHits(enrichedEvents.hits.hits);

const {
bulkCreateDuration: bulkDuration,
Expand Down Expand Up @@ -209,59 +196,3 @@ export const searchAfterAndBulkCreate = async ({
toReturn.totalToFromTuples = tuplesToBeLogged;
return toReturn;
};

export const buildWrappedSignalsFactory = ({
ruleSO,
signalsIndex,
}: {
ruleSO: SearchAfterAndBulkCreateParams['ruleSO'];
signalsIndex: string;
}): WrapHits => (events) => {
const wrappedDocs: WrappedSignalHit[] = events.flatMap((doc) => [
{
_index: signalsIndex,
_id: generateId(
doc._index,
doc._id,
doc._version ? doc._version.toString() : '',
ruleSO.attributes.params.ruleId ?? ''
),
_source: buildBulkBody(ruleSO, doc),
},
]);

return filterDuplicateSignals(ruleSO.id, wrappedDocs);
};

export const filterAndWrapDocuments = ({
buildRuleMessage,
enrichedEvents,
id,
logger,
ruleSO,
signalsIndex,
}: {
buildRuleMessage: BuildRuleMessage;
enrichedEvents: SignalSearchResponse;
id: string;
logger: Logger;
ruleSO: SearchAfterAndBulkCreateParams['ruleSO'];
signalsIndex: string;
}) => {
enrichedEvents.hits.hits = filterDuplicateRules(id, enrichedEvents);
logger.debug(buildRuleMessage(`about to bulk create ${enrichedEvents.hits.hits.length} events`));

const wrappedDocs: WrappedSignalHit[] = enrichedEvents.hits.hits.flatMap((doc) => [
{
_index: signalsIndex,
_id: generateId(
doc._index,
doc._id,
doc._version ? doc._version.toString() : '',
ruleSO.attributes.params.ruleId ?? ''
),
_source: buildBulkBody(ruleSO, doc),
},
]);
return wrappedDocs;
};
Loading

0 comments on commit ab31bc4

Please sign in to comment.