diff --git a/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_engine/risk_engine_data_client.test.ts b/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_engine/risk_engine_data_client.test.ts index b13a9cda99352..5eb9b436c59b4 100644 --- a/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_engine/risk_engine_data_client.test.ts +++ b/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_engine/risk_engine_data_client.test.ts @@ -57,7 +57,7 @@ jest.mock('../utils/create_or_update_index', () => ({ })); jest.spyOn(transforms, 'createTransform').mockResolvedValue(Promise.resolve()); -jest.spyOn(transforms, 'startTransform').mockResolvedValue(Promise.resolve()); +jest.spyOn(transforms, 'scheduleTransformNow').mockResolvedValue(Promise.resolve()); describe('RiskEngineDataClient', () => { for (const useDataStreamForAlerts of [false, true]) { diff --git a/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/configurations.test.ts b/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/configurations.test.ts new file mode 100644 index 0000000000000..b0fab13b109bf --- /dev/null +++ b/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/configurations.test.ts @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { getTransformOptions } from './configurations'; + +describe('getTransformOptions', () => { + it('transform content has changed, please update the transform version and regenerate the snapshot', () => { + const options = getTransformOptions({ + dest: 'dest', + source: ['source'], + }); + + expect(options).toMatchInlineSnapshot(` + Object { + "_meta": Object { + "managed": true, + "managed_by": "security-entity-analytics", + "version": 2, + }, + "dest": Object { + "index": "dest", + }, + "frequency": "1h", + "latest": Object { + "sort": "@timestamp", + "unique_key": Array [ + "host.name", + "user.name", + ], + }, + "settings": Object { + "unattended": true, + }, + "source": Object { + "index": Array [ + "source", + ], + }, + "sync": Object { + "time": Object { + "delay": "0s", + "field": "@timestamp", + }, + }, + } + `); + }); +}); diff --git a/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/configurations.ts b/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/configurations.ts index 2077a152e07b4..6dbf68c699fd5 100644 --- a/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/configurations.ts +++ b/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/configurations.ts @@ -4,6 +4,7 @@ * 2.0; you may not use this file except in compliance with the Elastic License * 2.0. */ +import type { TransformPutTransformRequest } from '@elastic/elasticsearch/lib/api/types'; import type { FieldMap } from '@kbn/alerts-as-data-utils'; import type { IdentifierType } from '../../../../common/entity_analytics/risk_engine'; import { @@ -135,11 +136,24 @@ export const getIndexPatternDataStream = (namespace: string): IIndexPatternStrin alias: `${riskScoreBaseIndexName}.${riskScoreBaseIndexName}-${namespace}`, }); -export const getTransformOptions = ({ dest, source }: { dest: string; source: string[] }) => ({ +export type TransformOptions = Omit; + +/** + * WARNING: We must increase the version when changing any configuration + * + * The risk engine starts the transforms executions after writing the documents to the risk score index. + * So the transform don't need to run on a schedule. + */ +export const getTransformOptions = ({ + dest, + source, +}: { + dest: string; + source: string[]; +}): Omit => ({ dest: { index: dest, }, - frequency: '1h', latest: { sort: '@timestamp', unique_key: [`host.name`, `user.name`], @@ -147,10 +161,20 @@ export const getTransformOptions = ({ dest, source }: { dest: string; source: st source: { index: source, }, + frequency: '1h', // 1h is the maximum value sync: { time: { - delay: '2s', + delay: '0s', // It doesn't have any delay because the risk engine writes the documents to the index and schedules the transform synchronously. field: '@timestamp', }, }, + settings: { + unattended: true, // In unattended mode, the transform retries indefinitely in case of an error + }, + _meta: { + version: 2, // When this field is updated we automatically update the transform + + managed: true, // Metadata that identifies the transform. It has no functionality + managed_by: 'security-entity-analytics', // Metadata that identifies the transform. It has no functionality + }, }); diff --git a/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/risk_score_data_client.test.ts b/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/risk_score_data_client.test.ts index 493f419cf6cc2..2ddd04a766944 100644 --- a/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/risk_score_data_client.test.ts +++ b/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/risk_score_data_client.test.ts @@ -36,7 +36,7 @@ jest.mock('../utils/create_or_update_index', () => ({ })); jest.spyOn(transforms, 'createTransform').mockResolvedValue(Promise.resolve()); -jest.spyOn(transforms, 'startTransform').mockResolvedValue(Promise.resolve()); +jest.spyOn(transforms, 'scheduleTransformNow').mockResolvedValue(Promise.resolve()); describe('RiskScoreDataClient', () => { let riskScoreDataClient: RiskScoreDataClient; @@ -428,11 +428,19 @@ describe('RiskScoreDataClient', () => { }, sync: { time: { - delay: '2s', + delay: '0s', field: '@timestamp', }, }, transform_id: 'risk_score_latest_transform_default', + settings: { + unattended: true, + }, + _meta: { + version: 2, + managed: true, + managed_by: 'security-entity-analytics', + }, }, }); }); diff --git a/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/risk_score_data_client.ts b/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/risk_score_data_client.ts index c0920188acd31..a95b29e5cc6ee 100644 --- a/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/risk_score_data_client.ts +++ b/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/risk_score_data_client.ts @@ -28,7 +28,10 @@ import { import { createDataStream } from '../utils/create_datastream'; import type { RiskEngineDataWriter as Writer } from './risk_engine_data_writer'; import { RiskEngineDataWriter } from './risk_engine_data_writer'; -import { getRiskScoreLatestIndex } from '../../../../common/entity_analytics/risk_engine'; +import { + getRiskScoreLatestIndex, + getRiskScoreTimeSeriesIndex, +} from '../../../../common/entity_analytics/risk_engine'; import { createTransform, getLatestTransformId } from '../utils/transforms'; import { getRiskInputsIndex } from './get_risk_inputs_index'; @@ -71,6 +74,12 @@ export class RiskScoreDataClient { return writer; } + public refreshRiskScoreIndex = async () => { + await this.options.esClient.indices.refresh({ + index: getRiskScoreTimeSeriesIndex(this.options.namespace), + }); + }; + public getRiskInputsIndex = ({ dataViewId }: { dataViewId: string }) => getRiskInputsIndex({ dataViewId, diff --git a/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/risk_score_service.mock.ts b/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/risk_score_service.mock.ts index 2423a87559b17..d1b74637725db 100644 --- a/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/risk_score_service.mock.ts +++ b/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/risk_score_service.mock.ts @@ -35,6 +35,7 @@ const createRiskScoreServiceMock = (): jest.Mocked => ({ getConfigurationWithDefaults: jest.fn(), getRiskInputsIndex: jest.fn(), scheduleLatestTransformNow: jest.fn(), + refreshRiskScoreIndex: jest.fn(), }); export const riskScoreServiceMock = { diff --git a/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/risk_score_service.ts b/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/risk_score_service.ts index cb89d61d8720d..4d39eb9a068a5 100644 --- a/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/risk_score_service.ts +++ b/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/risk_score_service.ts @@ -35,6 +35,7 @@ export interface RiskScoreService { ) => Promise; getRiskInputsIndex: ({ dataViewId }: { dataViewId: string }) => Promise; scheduleLatestTransformNow: () => Promise; + refreshRiskScoreIndex: () => Promise; } export interface RiskScoreServiceFactoryParams { @@ -83,5 +84,7 @@ export const riskScoreServiceFactory = ({ }; }, getRiskInputsIndex: async (params) => riskScoreDataClient.getRiskInputsIndex(params), - scheduleLatestTransformNow: () => scheduleLatestTransformNow({ namespace: spaceId, esClient }), + scheduleLatestTransformNow: () => + scheduleLatestTransformNow({ namespace: spaceId, esClient, logger }), + refreshRiskScoreIndex: () => riskScoreDataClient.refreshRiskScoreIndex(), }); diff --git a/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/routes/entity_calculation.test.ts b/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/routes/entity_calculation.test.ts index dbb8459d0ae42..54b42d02c5495 100644 --- a/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/routes/entity_calculation.test.ts +++ b/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/routes/entity_calculation.test.ts @@ -85,6 +85,16 @@ describe('entity risk score calculation route', () => { expect(response.status).toEqual(200); }); + it('should schedule transform when risk scores are persisted ', async () => { + const request = buildRequest(); + + const response = await server.inject(request, requestContextMock.convertContext(context)); + + expect(mockRiskScoreService.scheduleLatestTransformNow).toHaveBeenCalled(); + + expect(response.status).toEqual(200); + }); + it('should call "calculateAndPersistScores" with entity filter', async () => { const request = buildRequest(); diff --git a/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/routes/entity_calculation.ts b/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/routes/entity_calculation.ts index 502e296db3644..fe6e404e9e96d 100644 --- a/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/routes/entity_calculation.ts +++ b/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/routes/entity_calculation.ts @@ -146,6 +146,10 @@ export const riskScoreEntityCalculationRoute = ( }); } + if (result.scores_written > 0) { + await riskScoreService.scheduleLatestTransformNow(); + } + const score = result.scores_written === 1 ? result.scores?.[identifierType]?.[0] : undefined; diff --git a/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/tasks/risk_scoring_task.test.ts b/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/tasks/risk_scoring_task.test.ts index 7e1519e3745c4..2b7f77b407d31 100644 --- a/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/tasks/risk_scoring_task.test.ts +++ b/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/tasks/risk_scoring_task.test.ts @@ -474,19 +474,6 @@ describe('Risk Scoring Task', () => { expect.stringContaining('task was cancelled') ); }); - - it('schedules the transform to run now', async () => { - await runTask({ - getRiskScoreService, - isCancelled: mockIsCancelled, - logger: mockLogger, - taskInstance: riskScoringTaskInstanceMock, - telemetry: mockTelemetry, - entityAnalyticsConfig, - }); - - expect(mockRiskScoreService.scheduleLatestTransformNow).toHaveBeenCalledTimes(1); - }); }); describe('when execution was successful', () => { @@ -520,6 +507,19 @@ describe('Risk Scoring Task', () => { expect(mockRiskScoreService.scheduleLatestTransformNow).toHaveBeenCalledTimes(1); }); + + it('refreshes the risk score index', async () => { + await runTask({ + getRiskScoreService, + isCancelled: mockIsCancelled, + logger: mockLogger, + taskInstance: riskScoringTaskInstanceMock, + telemetry: mockTelemetry, + entityAnalyticsConfig, + }); + + expect(mockRiskScoreService.refreshRiskScoreIndex).toHaveBeenCalledTimes(1); + }); }); describe('when execution was unsuccessful', () => { diff --git a/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/tasks/risk_scoring_task.ts b/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/tasks/risk_scoring_task.ts index 1b2b44dae6a95..da4a48dfd02d2 100644 --- a/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/tasks/risk_scoring_task.ts +++ b/x-pack/plugins/security_solution/server/lib/entity_analytics/risk_score/tasks/risk_scoring_task.ts @@ -310,15 +310,20 @@ export const runTask = async ({ }; telemetry.reportEvent(RISK_SCORE_EXECUTION_SUCCESS_EVENT.eventType, telemetryEvent); - await riskScoreService.scheduleLatestTransformNow(); - if (isCancelled()) { log('task was cancelled'); telemetry.reportEvent(RISK_SCORE_EXECUTION_CANCELLATION_EVENT.eventType, telemetryEvent); } + if (scoresWritten > 0) { + log('refreshing risk score index and scheduling transform'); + await riskScoreService.refreshRiskScoreIndex(); + await riskScoreService.scheduleLatestTransformNow(); + } + log('task run completed'); log(JSON.stringify({ ...telemetryEvent, runs })); + return { state: updatedState, }; diff --git a/x-pack/plugins/security_solution/server/lib/entity_analytics/utils/transforms.test.ts b/x-pack/plugins/security_solution/server/lib/entity_analytics/utils/transforms.test.ts index 25aeaa4ad566d..ff1aa5e041c60 100644 --- a/x-pack/plugins/security_solution/server/lib/entity_analytics/utils/transforms.test.ts +++ b/x-pack/plugins/security_solution/server/lib/entity_analytics/utils/transforms.test.ts @@ -5,9 +5,17 @@ * 2.0. */ -import type { TransformGetTransformStatsResponse } from '@elastic/elasticsearch/lib/api/types'; -import { elasticsearchServiceMock } from '@kbn/core/server/mocks'; -import { scheduleTransformNow } from './transforms'; +import type { + TransformGetTransformResponse, + TransformGetTransformStatsResponse, +} from '@elastic/elasticsearch/lib/api/types'; +import { elasticsearchServiceMock, loggingSystemMock } from '@kbn/core/server/mocks'; +import { + getRiskScoreLatestIndex, + getRiskScoreTimeSeriesIndex, +} from '../../../../common/entity_analytics/risk_engine'; +import { getTransformOptions } from '../risk_score/configurations'; +import { scheduleLatestTransformNow, scheduleTransformNow } from './transforms'; const transformId = 'test_transform_id'; @@ -31,6 +39,44 @@ const stoppedTransformsMock = { ], } as TransformGetTransformStatsResponse; +const latestIndex = getRiskScoreLatestIndex('tests'); +const timeSeriesIndex = getRiskScoreTimeSeriesIndex('tests'); +const transformConfig = getTransformOptions({ + dest: latestIndex, + source: [timeSeriesIndex], +}); + +const updatedTransformsMock = { + count: 1, + transforms: [ + { + id: 'test_transform_id_3', + ...transformConfig, + }, + ], +} as TransformGetTransformResponse; + +const outdatedTransformsMock = { + count: 1, + transforms: [ + { + ...transformConfig, + id: 'test_transform_id_3', + sync: { + time: { + field: '@timestamp', + delay: '2s', + }, + }, + _meta: { + version: '1', + }, + }, + ], +} as TransformGetTransformResponse; + +const logger = loggingSystemMock.createLogger(); + describe('transforms utils', () => { beforeEach(() => { jest.resetAllMocks(); @@ -55,4 +101,39 @@ describe('transforms utils', () => { expect(esClient.transform.scheduleNowTransform).toHaveBeenCalled(); }); }); + + describe('scheduleLatestTransformNow', () => { + it('update the latest transform when scheduleTransformNow is called and the transform is outdated', async () => { + const esClient = elasticsearchServiceMock.createScopedClusterClient().asCurrentUser; + esClient.transform.getTransformStats.mockResolvedValueOnce(stoppedTransformsMock); + esClient.transform.getTransform.mockResolvedValueOnce(outdatedTransformsMock); + + await scheduleLatestTransformNow({ esClient, namespace: 'tests', logger }); + + expect(esClient.transform.updateTransform).toHaveBeenCalled(); + }); + + it('does not update the latest transform when scheduleTransformNow is called if the transform is updated', async () => { + const esClient = elasticsearchServiceMock.createScopedClusterClient().asCurrentUser; + esClient.transform.getTransformStats.mockResolvedValueOnce(stoppedTransformsMock); + esClient.transform.getTransform.mockResolvedValueOnce(updatedTransformsMock); + + await scheduleLatestTransformNow({ esClient, namespace: 'tests', logger }); + + expect(esClient.transform.updateTransform).not.toHaveBeenCalled(); + }); + + it('it logs the error if update transform fails', async () => { + const esClient = elasticsearchServiceMock.createScopedClusterClient().asCurrentUser; + esClient.transform.getTransformStats.mockResolvedValueOnce(stoppedTransformsMock); + esClient.transform.getTransform.mockResolvedValueOnce(outdatedTransformsMock); + esClient.transform.updateTransform.mockRejectedValueOnce(new Error('Test error')); + + await scheduleLatestTransformNow({ esClient, namespace: 'tests', logger }); + + expect(logger.error).toHaveBeenCalledWith( + 'There was an error upgrading the transform risk_score_latest_transform_tests. Continuing with transform scheduling. Test error' + ); + }); + }); }); diff --git a/x-pack/plugins/security_solution/server/lib/entity_analytics/utils/transforms.ts b/x-pack/plugins/security_solution/server/lib/entity_analytics/utils/transforms.ts index d1a544233339e..35a7f7113bfa5 100644 --- a/x-pack/plugins/security_solution/server/lib/entity_analytics/utils/transforms.ts +++ b/x-pack/plugins/security_solution/server/lib/entity_analytics/utils/transforms.ts @@ -14,11 +14,17 @@ import type { TransformPutTransformRequest, TransformGetTransformStatsTransformStats, } from '@elastic/elasticsearch/lib/api/types'; +import { + getRiskScoreLatestIndex, + getRiskScoreTimeSeriesIndex, +} from '../../../../common/entity_analytics/risk_engine'; import { RiskScoreEntity } from '../../../../common/search_strategy'; import { getRiskScorePivotTransformId, getRiskScoreLatestTransformId, } from '../../../../common/utils/risk_score_modules'; +import type { TransformOptions } from '../risk_score/configurations'; +import { getTransformOptions } from '../risk_score/configurations'; export const getLegacyTransforms = async ({ namespace, @@ -107,51 +113,66 @@ const hasTransformStarted = (transformStats: TransformGetTransformStatsTransform return transformStats.state === 'indexing' || transformStats.state === 'started'; }; -export const startTransform = async ({ +export const scheduleTransformNow = async ({ esClient, transformId, }: { esClient: ElasticsearchClient; transformId: string; -}): Promise => { +}): Promise => { const transformStats = await esClient.transform.getTransformStats({ transform_id: transformId, }); if (transformStats.count <= 0) { throw new Error( - `Unable to find transform status for [${transformId}] while attempting to start` + `Unable to find transform status for [${transformId}] while attempting to schedule` ); } - if (hasTransformStarted(transformStats.transforms[0])) { - return; - } - return esClient.transform.startTransform({ transform_id: transformId }); + if (!hasTransformStarted(transformStats.transforms[0])) { + await esClient.transform.startTransform({ + transform_id: transformId, + }); + } else { + await esClient.transform.scheduleNowTransform({ + transform_id: transformId, + }); + } }; -export const scheduleTransformNow = async ({ +/** + * Whenever we change the latest transform configuration, we must ensure we update the transform in environments where it has already been installed. + */ +const upgradeLatestTransformIfNeeded = async ({ esClient, - transformId, + namespace, + logger, }: { esClient: ElasticsearchClient; - transformId: string; + namespace: string; + logger: Logger; }): Promise => { - const transformStats = await esClient.transform.getTransformStats({ + const transformId = getLatestTransformId(namespace); + const latestIndex = getRiskScoreLatestIndex(namespace); + const timeSeriesIndex = getRiskScoreTimeSeriesIndex(namespace); + + const response = await esClient.transform.getTransform({ transform_id: transformId, }); - if (transformStats.count <= 0) { - throw new Error( - `Unable to find transform status for [${transformId}] while attempting to schedule now` - ); - } - if (hasTransformStarted(transformStats.transforms[0])) { - await esClient.transform.scheduleNowTransform({ - transform_id: transformId, - }); - } else { - await esClient.transform.startTransform({ + const newConfig = getTransformOptions({ + dest: latestIndex, + source: [timeSeriesIndex], + }); + + if (isTransformOutdated(response.transforms[0], newConfig)) { + logger.info(`Upgrading transform ${transformId}`); + + const { latest: _unused, ...changes } = newConfig; + + await esClient.transform.updateTransform({ transform_id: transformId, + ...changes, }); } }; @@ -159,10 +180,30 @@ export const scheduleTransformNow = async ({ export const scheduleLatestTransformNow = async ({ namespace, esClient, + logger, }: { namespace: string; esClient: ElasticsearchClient; + logger: Logger; }): Promise => { const transformId = getLatestTransformId(namespace); + + try { + await upgradeLatestTransformIfNeeded({ esClient, namespace, logger }); + } catch (err) { + logger.error( + `There was an error upgrading the transform ${transformId}. Continuing with transform scheduling. ${err.message}` + ); + } + await scheduleTransformNow({ esClient, transformId }); }; + +/** + * Whitelist the transform fields that we can update. + */ + +const isTransformOutdated = ( + transform: TransformGetTransformTransformSummary, + newConfig: TransformOptions +): boolean => transform._meta?.version !== newConfig._meta?.version; diff --git a/x-pack/test/security_solution_api_integration/test_suites/entity_analytics/risk_engine/trial_license_complete_tier/telemetry_usage.ts b/x-pack/test/security_solution_api_integration/test_suites/entity_analytics/risk_engine/trial_license_complete_tier/telemetry_usage.ts index 3eca71c37dab5..6b45ab7a18427 100644 --- a/x-pack/test/security_solution_api_integration/test_suites/entity_analytics/risk_engine/trial_license_complete_tier/telemetry_usage.ts +++ b/x-pack/test/security_solution_api_integration/test_suites/entity_analytics/risk_engine/trial_license_complete_tier/telemetry_usage.ts @@ -100,10 +100,10 @@ export default ({ getService }: FtrProviderContext) => { ...otherStats } = await getRiskEngineStats(supertest, log); const expected = { - unique_host_risk_score_total: 0, - unique_user_risk_score_total: 0, - unique_user_risk_score_day: 0, - unique_host_risk_score_day: 0, + unique_host_risk_score_total: 10, + unique_user_risk_score_total: 10, + unique_user_risk_score_day: 10, + unique_host_risk_score_day: 10, all_user_risk_scores_total: 10, all_host_risk_scores_total: 10, all_user_risk_scores_total_day: 10,