From c8e9036b20949eabd035e2c84031bf8b91c99d11 Mon Sep 17 00:00:00 2001
From: Dmitrii
Date: Mon, 13 Dec 2021 18:19:46 +0100
Subject: [PATCH] Fetch rule actions in chunks

---
 .../routes/rules/find_rules_route.ts          |  11 +-
 .../rules/utils/get_current_rule_statuses.ts  |  64 +++++++
 ...gacy_get_bulk_rule_actions_saved_object.ts |  32 +++-
 .../server/utils/promise_pool.test.ts         | 174 ++++++++++++++++++
 .../server/utils/promise_pool.ts              |  58 ++++++
 5 files changed, 325 insertions(+), 14 deletions(-)
 create mode 100644 x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/utils/get_current_rule_statuses.ts
 create mode 100644 x-pack/plugins/security_solution/server/utils/promise_pool.test.ts
 create mode 100644 x-pack/plugins/security_solution/server/utils/promise_pool.ts

diff --git a/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/find_rules_route.ts b/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/find_rules_route.ts
index 199ef75e22f25..859f84241ae38 100644
--- a/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/find_rules_route.ts
+++ b/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/find_rules_route.ts
@@ -18,6 +18,7 @@ import { findRules } from '../../rules/find_rules';
 import { buildSiemResponse } from '../utils';
 import { buildRouteValidation } from '../../../../utils/build_validation/route_validation';
 import { transformFindAlerts } from './utils';
+import { getCurrentRuleStatuses } from './utils/get_current_rule_statuses';
 // eslint-disable-next-line no-restricted-imports
 import { legacyGetBulkRuleActionsSavedObject } from '../../rule_actions/legacy_get_bulk_rule_actions_saved_object';
 
@@ -66,14 +67,12 @@ export const findRulesRoute = (
         filter: query.filter,
         fields: query.fields,
       });
-      const alertIds = rules.data.map((rule) => rule.id);
+      const ruleIds = rules.data.map((rule) => rule.id);
+      const spaceId = context.securitySolution.getSpaceId();
 
       const [currentStatusesByRuleId, ruleActions] = await Promise.all([
-        execLogClient.getCurrentStatusBulk({
-          ruleIds: alertIds,
-          spaceId: context.securitySolution.getSpaceId(),
-        }),
-        legacyGetBulkRuleActionsSavedObject({ alertIds, savedObjectsClient, logger }),
+        getCurrentRuleStatuses({ ruleIds, execLogClient, spaceId, logger }),
+        legacyGetBulkRuleActionsSavedObject({ alertIds: ruleIds, savedObjectsClient, logger }),
       ]);
       const transformed = transformFindAlerts(rules, currentStatusesByRuleId, ruleActions);
       if (transformed == null) {
diff --git a/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/utils/get_current_rule_statuses.ts b/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/utils/get_current_rule_statuses.ts
new file mode 100644
index 0000000000000..4622805e11db6
--- /dev/null
+++ b/x-pack/plugins/security_solution/server/lib/detection_engine/routes/rules/utils/get_current_rule_statuses.ts
@@ -0,0 +1,64 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import { chunk } from 'lodash';
+import { Logger } from 'src/core/server';
+import { initPromisePool } from '../../../../../utils/promise_pool';
+import { GetCurrentStatusBulkResult, IRuleExecutionLogClient } from '../../../rule_execution_log';
+
+const RULES_PER_CHUNK = 1000;
+
+interface GetCurrentRuleStatusesArgs {
+  ruleIds: string[];
+  execLogClient: IRuleExecutionLogClient;
+  spaceId: string;
+  logger: Logger;
+}
+
+/**
+ * Get the most recent execution status for each of the given rule IDs.
+ * This method splits the work into chunks so as not to overwhelm Elasticsearch
+ * when fetching statuses for a large number of rules.
+ *
+ * @param ruleIds Rule IDs to fetch statuses for
+ * @param execLogClient RuleExecutionLogClient
+ * @param spaceId Current Space ID
+ * @param logger Logger
+ * @returns A dict with rule IDs as keys and rule statuses as values
+ *
+ * @throws AggregateError if any of the rule status requests fail
+ */
+export async function getCurrentRuleStatuses({
+  ruleIds,
+  execLogClient,
+  spaceId,
+  logger,
+}: GetCurrentRuleStatusesArgs): Promise<GetCurrentStatusBulkResult> {
+  const { results, errors } = await initPromisePool({
+    concurrency: 1,
+    items: chunk(ruleIds, RULES_PER_CHUNK),
+    executor: (ruleIdsChunk) =>
+      execLogClient
+        .getCurrentStatusBulk({
+          ruleIds: ruleIdsChunk,
+          spaceId,
+        })
+        .catch((error) => {
+          logger.error(
+            `Error fetching rule status: ${error instanceof Error ? error.message : String(error)}`
+          );
+          throw error;
+        }),
+  });
+
+  if (errors.length) {
+    throw new AggregateError(errors, 'Error fetching rule statuses');
+  }
+
+  // Merge all rule statuses into a single dict
+  return Object.assign({}, ...results);
+}
diff --git a/x-pack/plugins/security_solution/server/lib/detection_engine/rule_actions/legacy_get_bulk_rule_actions_saved_object.ts b/x-pack/plugins/security_solution/server/lib/detection_engine/rule_actions/legacy_get_bulk_rule_actions_saved_object.ts
index b0c5dba77ad74..72cfada909cdc 100644
--- a/x-pack/plugins/security_solution/server/lib/detection_engine/rule_actions/legacy_get_bulk_rule_actions_saved_object.ts
+++ b/x-pack/plugins/security_solution/server/lib/detection_engine/rule_actions/legacy_get_bulk_rule_actions_saved_object.ts
@@ -5,6 +5,7 @@
  * 2.0.
  */
 
+import { chunk } from 'lodash';
 import { SavedObjectsFindOptionsReference } from 'kibana/server';
 import { Logger } from 'src/core/server';
 
@@ -17,6 +18,7 @@ import { LegacyIRuleActionsAttributesSavedObjectAttributes } from './legacy_type
 import { legacyGetRuleActionsFromSavedObject } from './legacy_utils';
 // eslint-disable-next-line no-restricted-imports
 import { LegacyRulesActionsSavedObject } from './legacy_get_rule_actions_saved_object';
+import { initPromisePool } from '../../../utils/promise_pool';
 
 /**
  * @deprecated Once we are confident all rules relying on side-car actions SO's have been migrated to SO references we should remove this function
@@ -39,15 +41,29 @@ export const legacyGetBulkRuleActionsSavedObject = async ({
     id: alertId,
     type: 'alert',
   }));
-  const {
-    // eslint-disable-next-line @typescript-eslint/naming-convention
-    saved_objects,
-  } = await savedObjectsClient.find<LegacyIRuleActionsAttributesSavedObjectAttributes>({
-    type: legacyRuleActionsSavedObjectType,
-    perPage: 10000,
-    hasReference: references,
+  const { results, errors } = await initPromisePool({
+    concurrency: 1,
+    items: chunk(references, 1000),
+    executor: (referencesChunk) =>
+      savedObjectsClient
+        .find<LegacyIRuleActionsAttributesSavedObjectAttributes>({
+          type: legacyRuleActionsSavedObjectType,
+          perPage: 10000,
+          hasReference: referencesChunk,
+        })
+        .catch((error) => {
+          logger.error(
+            `Error fetching rule actions: ${error instanceof Error ? error.message : String(error)}`
+          );
+          throw error;
+        }),
   });
-  return saved_objects.reduce(
+
+  if (errors.length) {
+    throw new AggregateError(errors, 'Error fetching rule actions');
+  }
+
+  const savedObjects = results.flatMap((result) => result.saved_objects);
+  return savedObjects.reduce(
     (acc: { [key: string]: LegacyRulesActionsSavedObject }, savedObject) => {
       const ruleAlertId = savedObject.references.find((reference) => {
         // Find the first rule alert and assume that is the one we want since we should only ever have 1.
diff --git a/x-pack/plugins/security_solution/server/utils/promise_pool.test.ts b/x-pack/plugins/security_solution/server/utils/promise_pool.test.ts
new file mode 100644
index 0000000000000..3a2e7ad160bd2
--- /dev/null
+++ b/x-pack/plugins/security_solution/server/utils/promise_pool.test.ts
@@ -0,0 +1,174 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+import { initPromisePool } from './promise_pool';
+
+const nextTick = () => new Promise((resolve) => setImmediate(resolve));
+
+const initPoolWithTasks = ({ concurrency = 1, items = [1, 2, 3] }) => {
+  const asyncTasks: Record<
+    number,
+    {
+      status: 'pending' | 'resolved' | 'rejected';
+      resolve: () => void;
+      reject: () => void;
+    }
+  > = {};
+
+  const promisePool = initPromisePool({
+    concurrency,
+    items,
+    executor: async (x) =>
+      new Promise((resolve, reject) => {
+        asyncTasks[x] = {
+          status: 'pending',
+          resolve: () => {
+            asyncTasks[x].status = 'resolved';
+            resolve(x);
+          },
+          reject: () => {
+            asyncTasks[x].status = 'rejected';
+            reject(new Error(`Error processing ${x}`));
+          },
+        };
+      }),
+  });
+
+  return [promisePool, asyncTasks] as const;
+};
+
+describe('initPromisePool', () => {
+  it('should execute async tasks', async () => {
+    const { results, errors } = await initPromisePool({
+      concurrency: 1,
+      items: [1, 2, 3],
+      executor: async (x) => x,
+    });
+
+    expect(results).toEqual([1, 2, 3]);
+    expect(errors).toEqual([]);
+  });
+
+  it('should capture any errors that occur during task execution', async () => {
+    const { results, errors } = await initPromisePool({
+      concurrency: 1,
+      items: [1, 2, 3],
+      executor: async (x) => {
+        throw new Error(`Error processing ${x}`);
+      },
+    });
+
+    expect(results).toEqual([]);
+    expect(errors).toEqual([
+      new Error(`Error processing 1`),
+      new Error(`Error processing 2`),
+      new Error(`Error processing 3`),
+    ]);
+  });
+
+  it('should respect concurrency', async () => {
+    const [promisePool, asyncTasks] = initPoolWithTasks({
+      concurrency: 1,
+      items: [1, 2, 3],
+    });
+
+    // Check that we have only one task pending initially as concurrency = 1
+    expect(asyncTasks).toEqual({
+      1: expect.objectContaining({ status: 'pending' }),
+    });
+
+    asyncTasks[1].resolve();
+    await nextTick();
+
+    // Check that after resolving the first task, the second is pending
+    expect(asyncTasks).toEqual({
+      1: expect.objectContaining({ status: 'resolved' }),
+      2: expect.objectContaining({ status: 'pending' }),
+    });
+
+    asyncTasks[2].reject();
+    await nextTick();
+
+    // Check that after rejecting the second task, the third is pending
+    expect(asyncTasks).toEqual({
+      1: expect.objectContaining({ status: 'resolved' }),
+      2: expect.objectContaining({ status: 'rejected' }),
+      3: expect.objectContaining({ status: 'pending' }),
+    });
+
+    asyncTasks[3].resolve();
+    await nextTick();
+
+    // Check that all tasks have been settled
+    expect(asyncTasks).toEqual({
+      1: expect.objectContaining({ status: 'resolved' }),
+      2: expect.objectContaining({ status: 'rejected' }),
+      3: expect.objectContaining({ status: 'resolved' }),
+    });
+
+    const { results, errors } = await promisePool;
+
+    // Check final results
+    expect(results).toEqual([1, 3]);
+    expect(errors).toEqual([new Error(`Error processing 2`)]);
+  });
+
+  it('should be possible to configure concurrency', async () => {
+    const [promisePool, asyncTasks] = initPoolWithTasks({
+      concurrency: 2,
+      items: [1, 2, 3, 4, 5],
+    });
+
+    // Check that we have only two tasks pending initially as concurrency = 2
+    expect(asyncTasks).toEqual({
+      1: expect.objectContaining({ status: 'pending' }),
+      2: expect.objectContaining({ status: 'pending' }),
+    });
+
+    asyncTasks[1].resolve();
+    await nextTick();
+
+    // Check that after resolving the first task, the second and the third are pending
+    expect(asyncTasks).toEqual({
+      1: expect.objectContaining({ status: 'resolved' }),
+      2: expect.objectContaining({ status: 'pending' }),
+      3: expect.objectContaining({ status: 'pending' }),
+    });
+
+    asyncTasks[2].reject();
+    asyncTasks[3].reject();
+    await nextTick();
+
+    // Check that after rejecting the second and the third tasks, the rest are pending
+    expect(asyncTasks).toEqual({
+      1: expect.objectContaining({ status: 'resolved' }),
+      2: expect.objectContaining({ status: 'rejected' }),
+      3: expect.objectContaining({ status: 'rejected' }),
+      4: expect.objectContaining({ status: 'pending' }),
+      5: expect.objectContaining({ status: 'pending' }),
+    });
+
+    asyncTasks[4].resolve();
+    asyncTasks[5].resolve();
+    await nextTick();
+
+    // Check that all tasks have been settled
+    expect(asyncTasks).toEqual({
+      1: expect.objectContaining({ status: 'resolved' }),
+      2: expect.objectContaining({ status: 'rejected' }),
+      3: expect.objectContaining({ status: 'rejected' }),
+      4: expect.objectContaining({ status: 'resolved' }),
+      5: expect.objectContaining({ status: 'resolved' }),
+    });
+
+    const { results, errors } = await promisePool;
+
+    // Check final results
+    expect(results).toEqual([1, 4, 5]);
+    expect(errors).toEqual([new Error(`Error processing 2`), new Error(`Error processing 3`)]);
+  });
+});
diff --git a/x-pack/plugins/security_solution/server/utils/promise_pool.ts b/x-pack/plugins/security_solution/server/utils/promise_pool.ts
new file mode 100644
index 0000000000000..d0c848bc11787
--- /dev/null
+++ b/x-pack/plugins/security_solution/server/utils/promise_pool.ts
@@ -0,0 +1,58 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License
+ * 2.0; you may not use this file except in compliance with the Elastic License
+ * 2.0.
+ */
+
+interface PromisePoolArgs<Item, Result> {
+  concurrency?: number;
+  items: Item[];
+  executor: (item: Item) => Promise<Result>;
+}
+
+/**
+ * Runs promises in batches. It ensures that the number of running async tasks
+ * doesn't exceed the concurrency parameter passed to the function.
+ *
+ * @param concurrency - number of tasks to run in parallel
+ * @param items - array of items to be passed to the async executor
+ * @param executor - an async function to be called with each provided item
+ *
+ * @returns Struct holding the results or errors of the async tasks
+ */
+export const initPromisePool = async <Item, Result>({
+  concurrency = 1,
+  items,
+  executor,
+}: PromisePoolArgs<Item, Result>) => {
+  const tasks: Array<Promise<void>> = [];
+  const results: Result[] = [];
+  const errors: unknown[] = [];
+
+  for (const item of items) {
+    // Check if the pool is full
+    if (tasks.length >= concurrency) {
+      // Wait for any task to finish
+      await Promise.race(tasks);
+    }
+
+    const task: Promise<void> = executor(item)
+      .then((result) => {
+        results.push(result);
+      })
+      .catch(async (error) => {
+        errors.push(error);
+      })
+      .finally(() => {
+        tasks.splice(tasks.indexOf(task), 1);
+      });
+
+    tasks.push(task);
+  }
+
+  // Wait for all remaining tasks to finish
+  await Promise.all(tasks);
+
+  return { results, errors };
+};
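
Note on usage: a minimal sketch of how the initPromisePool helper introduced by this patch can be called. The chunked items and the fetchChunk executor below are made up for illustration and are not part of the change:

import { initPromisePool } from './promise_pool';

// Hypothetical executor: fetch one chunk of rule IDs and return a result object.
const fetchChunk = async (ruleIds: string[]) => ({ fetched: ruleIds.length });

async function example() {
  const { results, errors } = await initPromisePool({
    concurrency: 2, // at most two chunks are in flight at any time
    items: [
      ['rule-1', 'rule-2'],
      ['rule-3'],
    ],
    executor: (ruleIds) => fetchChunk(ruleIds),
  });

  // results: resolved values in completion order; errors: collected rejection reasons.
  return { results, errors };
}

Because failures are collected rather than thrown, callers such as getCurrentRuleStatuses decide how to handle partial success (in this patch, by throwing an AggregateError when errors is non-empty).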