From 73ce75aa8c392d367deac28c8919b4a80ba9538f Mon Sep 17 00:00:00 2001 From: Paul Tavares <56442535+paul-tavares@users.noreply.github.com> Date: Tue, 6 Feb 2024 16:48:12 -0500 Subject: [PATCH] [Security Solution][Endpoint] Fix Manifest Manger so that it works with large (>10k) (#174411) ## Summary ### Fleet Changes: - Two new utilities that return `AsyncIterator`'s: - one for working with ElasticSearch `.search()` method - one for working with SavedObjects `.find()` method - NOTE: although the `SavedObjects` client already supports getting back an `find` interface that returns an `AysncIterable`, I was not convenient to use in our use cases where we are returning the data from the SO back to an external consumer (services exposed by Fleet). We need to be able to first process the data out of the SO before returning it to the consumer, thus having this utility facilitates that. - both handle looping through ALL data in a given query (even if >10k) - new `fetchAllArtifacts()` method in `ArtifactsClient`: Returns an `AsyncIterator` enabling one to loop through all artifacts (even if >10k) - new `fetchAllItemIds()` method in `PackagePolicyService`: return an `AsyncIterator` enabling one to loop through all item IDs (even if >10k) - new `fetchAllItems()` method in `PackagePolicyService`: returns an `AsyncIterator` enabling one to loop through all package policies (even if >10k) ### Endpoint Changes: - Retrieval of existing artifacts as well as list of all policies and policy IDs now use new methods introduced into fleet services (above) - Added new config property - `xpack.securitySolution.packagerTaskTimeout` - to enable customer to adjust the timeout value for how long the artifact packager task can run. Default has been set to `20m` - Efficiencies around batch processing of updates to Policies and artifact creation - improved logging ### Checklist - [x] [Unit or functional tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html) were updated or added to match the most common scenarios --- x-pack/plugins/fleet/server/mocks/index.ts | 20 + .../server/mocks/package_policy.mocks.ts | 109 +++++ .../services/artifacts/artifacts.test.ts | 137 ++++++- .../server/services/artifacts/artifacts.ts | 88 +++- .../fleet/server/services/artifacts/client.ts | 44 +- .../fleet/server/services/artifacts/mocks.ts | 29 ++ .../fleet/server/services/artifacts/types.ts | 8 + .../server/services/package_policies/index.ts | 1 + .../services/package_policies/utils.test.ts | 49 +++ .../server/services/package_policies/utils.ts | 59 +++ .../server/services/package_policy.test.ts | 147 +++++++ .../fleet/server/services/package_policy.ts | 68 +++- .../server/services/package_policy_service.ts | 27 ++ .../utils/create_es_search_iterable.ts | 165 ++++++++ .../services/utils/create_so_find_iterable.ts | 142 +++++++ .../common/endpoint/errors.ts | 4 + .../security_solution/server/config.mock.ts | 1 + .../security_solution/server/config.ts | 10 +- .../endpoint/lib/artifacts/task.test.ts | 4 +- .../server/endpoint/lib/artifacts/task.ts | 71 +++- .../services/artifacts/artifact_client.ts | 13 +- .../manifest_manager/manifest_manager.test.ts | 114 +++--- .../manifest_manager/manifest_manager.ts | 379 ++++++++++-------- .../endpoint/services/artifacts/mocks.ts | 1 + .../server/endpoint/utils/queue_processor.ts | 152 +++++++ .../security_solution/server/plugin.ts | 2 +- 26 files changed, 1580 insertions(+), 264 deletions(-) create mode 100644 x-pack/plugins/fleet/server/mocks/package_policy.mocks.ts create mode 100644 x-pack/plugins/fleet/server/services/package_policies/utils.test.ts create mode 100644 x-pack/plugins/fleet/server/services/package_policies/utils.ts create mode 100644 x-pack/plugins/fleet/server/services/utils/create_es_search_iterable.ts create mode 100644 x-pack/plugins/fleet/server/services/utils/create_so_find_iterable.ts create mode 100644 x-pack/plugins/security_solution/server/endpoint/utils/queue_processor.ts diff --git a/x-pack/plugins/fleet/server/mocks/index.ts b/x-pack/plugins/fleet/server/mocks/index.ts index fb6dd7d075cea..857882c57525f 100644 --- a/x-pack/plugins/fleet/server/mocks/index.ts +++ b/x-pack/plugins/fleet/server/mocks/index.ts @@ -31,6 +31,8 @@ import { packageServiceMock } from '../services/epm/package_service.mock'; import type { UninstallTokenServiceInterface } from '../services/security/uninstall_token_service'; import type { MessageSigningServiceInterface } from '../services/security'; +import { PackagePolicyMocks } from './package_policy.mocks'; + // Export all mocks from artifacts export * from '../services/artifacts/mocks'; @@ -40,6 +42,8 @@ export * from '../services/files/mocks'; // export all mocks from fleet actions client export * from '../services/actions/mocks'; +export * from './package_policy.mocks'; + export interface MockedFleetAppContext extends FleetAppContext { elasticsearch: ReturnType; data: ReturnType; @@ -144,6 +148,22 @@ export const createPackagePolicyServiceMock = (): jest.Mocked { + return { + async *[Symbol.asyncIterator]() { + yield Promise.resolve([PackagePolicyMocks.generatePackagePolicy({ id: '111' })]); + yield Promise.resolve([PackagePolicyMocks.generatePackagePolicy({ id: '222' })]); + }, + }; + }), + fetchAllItemIds: jest.fn((..._) => { + return { + async *[Symbol.asyncIterator]() { + yield Promise.resolve(['111']); + yield Promise.resolve(['222']); + }, + }; + }), }; }; diff --git a/x-pack/plugins/fleet/server/mocks/package_policy.mocks.ts b/x-pack/plugins/fleet/server/mocks/package_policy.mocks.ts new file mode 100644 index 0000000000000..a159917cb5e17 --- /dev/null +++ b/x-pack/plugins/fleet/server/mocks/package_policy.mocks.ts @@ -0,0 +1,109 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { SavedObjectsFindResponse } from '@kbn/core-saved-objects-api-server'; + +import type { SavedObjectsFindResult } from '@kbn/core-saved-objects-api-server'; + +import { mapPackagePolicySavedObjectToPackagePolicy } from '../services/package_policies'; + +import type { PackagePolicy } from '../../common'; +import { PACKAGE_POLICY_SAVED_OBJECT_TYPE } from '../../common'; + +import type { PackagePolicySOAttributes } from '../types'; + +const generatePackagePolicySOAttributesMock = ( + overrides: Partial = {} +): PackagePolicySOAttributes => { + return { + name: `Package Policy 1`, + description: 'Policy for things', + created_at: '2024-01-24T15:21:13.389Z', + created_by: 'elastic', + updated_at: '2024-01-25T15:21:13.389Z', + updated_by: 'user-a', + policy_id: '444-555-666', + enabled: true, + inputs: [], + namespace: 'default', + package: { + name: 'endpoint', + title: 'Elastic Endpoint', + version: '1.0.0', + }, + revision: 1, + is_managed: false, + secret_references: [], + vars: {}, + elasticsearch: { + privileges: { + cluster: [], + }, + }, + agents: 2, + + ...overrides, + }; +}; + +const generatePackagePolicyMock = (overrides: Partial = {}) => { + return { + ...mapPackagePolicySavedObjectToPackagePolicy(generatePackagePolicySavedObjectMock()), + ...overrides, + }; +}; + +const generatePackagePolicySavedObjectMock = ( + soAttributes: PackagePolicySOAttributes = generatePackagePolicySOAttributesMock() +): SavedObjectsFindResult => { + return { + score: 1, + id: 'so-123', + type: PACKAGE_POLICY_SAVED_OBJECT_TYPE, + version: 'abc', + created_at: soAttributes.created_at, + updated_at: soAttributes.updated_at, + attributes: soAttributes, + references: [], + sort: ['created_at'], + }; +}; + +const generatePackagePolicySavedObjectFindResponseMock = ( + soResults?: PackagePolicySOAttributes[] +): SavedObjectsFindResponse => { + const soList = soResults ?? [ + generatePackagePolicySOAttributesMock(), + generatePackagePolicySOAttributesMock(), + ]; + + return { + saved_objects: soList.map((soAttributes) => { + return { + score: 1, + id: 'so-123', + type: PACKAGE_POLICY_SAVED_OBJECT_TYPE, + version: 'abc', + created_at: soAttributes.created_at, + updated_at: soAttributes.updated_at, + attributes: soAttributes, + references: [], + sort: ['created_at'], + }; + }), + total: soList.length, + per_page: 10, + page: 1, + pit_id: 'pit-id-1', + }; +}; + +export const PackagePolicyMocks = Object.freeze({ + generatePackagePolicySOAttributes: generatePackagePolicySOAttributesMock, + generatePackagePolicySavedObjectFindResponse: generatePackagePolicySavedObjectFindResponseMock, + generatePackagePolicy: generatePackagePolicyMock, +}); diff --git a/x-pack/plugins/fleet/server/services/artifacts/artifacts.test.ts b/x-pack/plugins/fleet/server/services/artifacts/artifacts.test.ts index f3b332a5930fc..782b044a84697 100644 --- a/x-pack/plugins/fleet/server/services/artifacts/artifacts.test.ts +++ b/x-pack/plugins/fleet/server/services/artifacts/artifacts.test.ts @@ -11,6 +11,8 @@ import { errors } from '@elastic/elasticsearch'; import type { TransportResult } from '@elastic/elasticsearch'; +import { set } from '@kbn/safer-lodash-set'; + import { FLEET_SERVER_ARTIFACTS_INDEX } from '../../../common'; import { ArtifactsElasticsearchError } from '../../errors'; @@ -33,12 +35,14 @@ import { createArtifact, deleteArtifact, encodeArtifactContent, + fetchAllArtifacts, generateArtifactContentHash, getArtifact, listArtifacts, } from './artifacts'; import type { NewArtifact } from './types'; +import type { FetchAllArtifactsOptions } from './types'; describe('When using the artifacts services', () => { let esClientMock: ReturnType; @@ -324,8 +328,28 @@ describe('When using the artifacts services', () => { newArtifact, ]); - expect(responseErrors).toEqual([new Error('error')]); - expect(artifacts).toBeUndefined(); + expect(responseErrors).toEqual([ + new Error( + 'Create of artifact id [undefined] returned: result [undefined], status [400], reason [{"reason":"error"}]' + ), + ]); + expect(artifacts).toEqual([ + { + body: 'eJyrVkrNKynKTC1WsoqOrQUAJxkFKQ==', + compressionAlgorithm: 'zlib', + created: expect.any(String), + decodedSha256: 'd801aa1fb', + decodedSize: 14, + encodedSha256: 'd29238d40', + encodedSize: 22, + encryptionAlgorithm: 'none', + id: 'endpoint:trustlist-v1-d801aa1fb', + identifier: 'trustlist-v1', + packageName: 'endpoint', + relative_url: '/api/fleet/artifacts/trustlist-v1/d801aa1fb', + type: 'trustlist', + }, + ]); }); }); @@ -488,4 +512,113 @@ describe('When using the artifacts services', () => { }); }); }); + + describe('and calling `fetchAll()`', () => { + beforeEach(() => { + esClientMock.search + .mockResolvedValueOnce(generateArtifactEsSearchResultHitsMock()) + .mockResolvedValueOnce(generateArtifactEsSearchResultHitsMock()) + .mockResolvedValueOnce(set(generateArtifactEsSearchResultHitsMock(), 'hits.hits', [])); + }); + + it('should return an iterator', async () => { + expect(fetchAllArtifacts(esClientMock)).toEqual({ + [Symbol.asyncIterator]: expect.any(Function), + }); + }); + + it('should provide artifacts on each iteration', async () => { + for await (const artifacts of fetchAllArtifacts(esClientMock)) { + expect(artifacts[0]).toEqual({ + body: expect.anything(), + compressionAlgorithm: expect.anything(), + created: expect.anything(), + decodedSha256: expect.anything(), + decodedSize: expect.anything(), + encodedSha256: expect.anything(), + encodedSize: expect.anything(), + encryptionAlgorithm: expect.anything(), + id: expect.anything(), + identifier: expect.anything(), + packageName: expect.anything(), + relative_url: expect.anything(), + type: expect.anything(), + }); + } + + expect(esClientMock.search).toHaveBeenCalledTimes(3); + }); + + it('should use defaults if no `options` were provided', async () => { + for await (const artifacts of fetchAllArtifacts(esClientMock)) { + expect(artifacts.length).toBeGreaterThan(0); + } + + expect(esClientMock.search).toHaveBeenLastCalledWith( + expect.objectContaining({ + q: '', + size: 1000, + sort: [{ created: { order: 'asc' } }], + _source_excludes: undefined, + }) + ); + }); + + it('should use custom options when provided', async () => { + const options: FetchAllArtifactsOptions = { + kuery: 'foo: something', + sortOrder: 'desc', + perPage: 500, + sortField: 'someField', + includeArtifactBody: false, + }; + + for await (const artifacts of fetchAllArtifacts(esClientMock, options)) { + expect(artifacts.length).toBeGreaterThan(0); + } + + expect(esClientMock.search).toHaveBeenCalledWith( + expect.objectContaining({ + q: options.kuery, + size: options.perPage, + sort: [{ [options.sortField!]: { order: options.sortOrder } }], + _source_excludes: 'body', + }) + ); + }); + + it('should set `done` to true if loop `break`s out', async () => { + const iterator = fetchAllArtifacts(esClientMock); + + for await (const _ of iterator) { + break; + } + + await expect(iterator[Symbol.asyncIterator]().next()).resolves.toEqual({ + done: true, + value: expect.any(Array), + }); + + expect(esClientMock.search).toHaveBeenCalledTimes(1); + }); + + it('should handle throwing in loop by setting `done` to `true`', async () => { + const iterator = fetchAllArtifacts(esClientMock); + + try { + for await (const _ of iterator) { + throw new Error('test'); + } + } catch (e) { + expect(e); // just to silence eslint + } + + await expect(iterator[Symbol.asyncIterator]().next()).resolves.toEqual({ + done: true, + value: expect.any(Array), + }); + + expect(esClientMock.search).toHaveBeenCalledTimes(1); + }); + }); }); diff --git a/x-pack/plugins/fleet/server/services/artifacts/artifacts.ts b/x-pack/plugins/fleet/server/services/artifacts/artifacts.ts index 5516ab6f70e23..43cf3f745cc6c 100644 --- a/x-pack/plugins/fleet/server/services/artifacts/artifacts.ts +++ b/x-pack/plugins/fleet/server/services/artifacts/artifacts.ts @@ -15,6 +15,8 @@ import { isEmpty, sortBy } from 'lodash'; import type { ElasticsearchClient } from '@kbn/core/server'; +import { createEsSearchIterable } from '../utils/create_es_search_iterable'; + import type { ListResult } from '../../../common/types'; import { FLEET_SERVER_ARTIFACTS_INDEX } from '../../../common'; @@ -34,6 +36,7 @@ import type { ArtifactsClientCreateOptions, ListArtifactsProps, NewArtifact, + FetchAllArtifactsOptions, } from './types'; import { esSearchHitToArtifact, @@ -137,10 +140,10 @@ export const bulkCreateArtifacts = async ( artifacts, appContextService.getConfig()?.createArtifactsBulkBatchSize ); - const logger = appContextService.getLogger(); const nonConflictErrors = []; logger.debug(`Number of batches generated for fleet artifacts: ${batches.length}`); + for (let batchN = 0; batchN < batches.length; batchN++) { logger.debug( `Creating artifacts for batch ${batchN + 1} with ${batches[batchN].length / 2} artifacts` @@ -154,12 +157,22 @@ export const bulkCreateArtifacts = async ( refresh, }) ); + // Track errors of the bulk create action if (res.errors) { nonConflictErrors.push( ...res.items.reduce((acc, item) => { - if (item.create?.status !== 409) { - acc.push(new Error(item.create?.error?.reason)); + // 409's (conflict - record already exists) are ignored since the artifact already exists + if (item.create && item.create.status !== 409) { + acc.push( + new Error( + `Create of artifact id [${item.create._id}] returned: result [${ + item.create.result + }], status [${item.create.status}], reason [${JSON.stringify( + item.create?.error || '' + )}]` + ) + ); } return acc; }, []) @@ -167,11 +180,6 @@ export const bulkCreateArtifacts = async ( } } - // If any non conflict error, it returns only the errors - if (nonConflictErrors.length > 0) { - return { errors: nonConflictErrors }; - } - // Use non sorted artifacts array to preserve the artifacts order in the response const nonSortedEsArtifactsResponse: Artifact[] = artifacts.map((artifact) => { return esSearchHitToArtifact({ @@ -182,6 +190,7 @@ export const bulkCreateArtifacts = async ( return { artifacts: nonSortedEsArtifactsResponse, + errors: nonConflictErrors.length ? nonConflictErrors : undefined, }; }; @@ -281,3 +290,66 @@ export const encodeArtifactContent = async ( return encodedArtifact; }; + +/** + * Returns an iterator that loops through all the artifacts stored in the index + * + * @param esClient + * @param options + * + * @example + * + * async () => { + * for await (const value of fetchAllArtifactsIterator()) { + * // process page of data here + * } + * } + */ +export const fetchAllArtifacts = ( + esClient: ElasticsearchClient, + options: FetchAllArtifactsOptions = {} +): AsyncIterable => { + const { kuery = '', perPage = 1000, sortField, sortOrder, includeArtifactBody = true } = options; + + return createEsSearchIterable({ + esClient, + searchRequest: { + index: FLEET_SERVER_ARTIFACTS_INDEX, + rest_total_hits_as_int: true, + track_total_hits: false, + q: kuery, + size: perPage, + sort: [ + { + // MUST have a sort field and sort order + [sortField || 'created']: { + order: sortOrder || 'asc', + }, + }, + ], + _source_excludes: includeArtifactBody ? undefined : 'body', + }, + resultsMapper: (data): Artifact[] => { + return data.hits.hits.map((hit) => { + // @ts-expect-error @elastic/elasticsearch _source is optional + const artifact = esSearchHitToArtifact(hit); + + // If not body attribute is included, still create the property in the object (since the + // return type is `Artifact` and `body` is required), but throw an error is caller attempts + // to still access it. + if (!includeArtifactBody) { + Object.defineProperty(artifact, 'body', { + enumerable: false, + get(): string { + throw new Error( + `'body' attribute not included due to request to 'fetchAllArtifacts()' having options 'includeArtifactBody' set to 'false'` + ); + }, + }); + } + + return artifact; + }); + }, + }); +}; diff --git a/x-pack/plugins/fleet/server/services/artifacts/client.ts b/x-pack/plugins/fleet/server/services/artifacts/client.ts index 7ba2452e83fe7..0b40a7acdcc8d 100644 --- a/x-pack/plugins/fleet/server/services/artifacts/client.ts +++ b/x-pack/plugins/fleet/server/services/artifacts/client.ts @@ -17,6 +17,7 @@ import type { ArtifactsClientInterface, NewArtifact, ListArtifactsProps, + FetchAllArtifactsOptions, } from './types'; import { relativeDownloadUrlFromArtifact, uniqueIdFromId } from './mappings'; @@ -29,6 +30,7 @@ import { listArtifacts, bulkCreateArtifacts, bulkDeleteArtifacts, + fetchAllArtifacts, } from './artifacts'; /** @@ -49,6 +51,15 @@ export class FleetArtifactsClient implements ArtifactsClientInterface { return artifact; } + /** + * Creates a `kuery` string using the provided value on input that is bound to the integration package + * @param kuery + * @private + */ + private buildFilter(kuery: string): string { + return `(package_name: "${this.packageName}")${kuery ? ` AND ${kuery}` : ''}`; + } + async getArtifact(id: string): Promise { const artifact = await getArtifact(this.esClient, id); return artifact ? this.validate(artifact) : undefined; @@ -119,20 +130,37 @@ export class FleetArtifactsClient implements ArtifactsClientInterface { } /** - * Get a list of artifacts. - * NOTE that when using the `kuery` filtering param, that all filters property names should - * match the internal attribute names of the index + * Get a list of artifacts. A few things to note: + * - if wanting to get ALL artifacts, consider using instead the `fetchAll()` method instead + * as it will property return data past the 10k ES limitation + * - when using the `kuery` filtering param, all filters property names should match the + * internal attribute names in the index */ async listArtifacts({ kuery, ...options }: ListArtifactsProps = {}): Promise< ListResult > { - // All filtering for artifacts should be bound to the `packageName`, so we insert - // that into the KQL value and use `AND` to add the defined `kuery` (if any) to it. - const filter = `(package_name: "${this.packageName}")${kuery ? ` AND ${kuery}` : ''}`; - return listArtifacts(this.esClient, { ...options, - kuery: filter, + kuery: this.buildFilter(kuery), + }); + } + + /** + * Returns an `AsyncIterable` object that can be used to iterate over all artifacts + * + * @param options + * + * @example + * async () => { + * for await (const artifacts of fleetArtifactsClient.fetchAll()) { + * // artifacts === first page of items + * } + * } + */ + fetchAll({ kuery, ...options }: FetchAllArtifactsOptions = {}): AsyncIterable { + return fetchAllArtifacts(this.esClient, { + ...options, + kuery: this.buildFilter(kuery), }); } diff --git a/x-pack/plugins/fleet/server/services/artifacts/mocks.ts b/x-pack/plugins/fleet/server/services/artifacts/mocks.ts index dc831558cb7bb..4e5d8c93f0643 100644 --- a/x-pack/plugins/fleet/server/services/artifacts/mocks.ts +++ b/x-pack/plugins/fleet/server/services/artifacts/mocks.ts @@ -44,6 +44,34 @@ export const createArtifactsClientMock = (): jest.Mocked { + return createFetchAllArtifactsIterableMock(); + }), + }; +}; + +export const createFetchAllArtifactsIterableMock = (artifactPages: Artifact[][] = []) => { + const totalPagesOfResults = artifactPages.length; + let nextResults = 0; + + return { + [Symbol.asyncIterator]() { + return { + async next() { + return { + value: artifactPages[nextResults++] ?? [], + done: nextResults > totalPagesOfResults, + }; + }, + + async return() { + return { + value: [], + done: true, + }; + }, + }; + }, }; }; @@ -100,6 +128,7 @@ export const generateArtifactEsGetSingleHitMock = ( _version: 1, _score: 1, _source, + sort: ['abc'], }; }; diff --git a/x-pack/plugins/fleet/server/services/artifacts/types.ts b/x-pack/plugins/fleet/server/services/artifacts/types.ts index 4b0aacd92bc20..697815a593fdd 100644 --- a/x-pack/plugins/fleet/server/services/artifacts/types.ts +++ b/x-pack/plugins/fleet/server/services/artifacts/types.ts @@ -72,6 +72,12 @@ export type ListArtifactsProps = Pick & { + sortField?: string | keyof ArtifactElasticsearchProperties; + /** If false, then the `body` property of the Artifact will be excluded from the results. Default is `true` */ + includeArtifactBody?: boolean; +}; + /** * The interface exposed out of Fleet's Artifact service via the client class */ @@ -93,4 +99,6 @@ export interface ArtifactsClientInterface { encodeContent(content: ArtifactsClientCreateOptions['content']): Promise; generateHash(content: string): string; + + fetchAll(options?: FetchAllArtifactsOptions): AsyncIterable; } diff --git a/x-pack/plugins/fleet/server/services/package_policies/index.ts b/x-pack/plugins/fleet/server/services/package_policies/index.ts index d0d4fa4aae825..a7eacdc76a3a7 100644 --- a/x-pack/plugins/fleet/server/services/package_policies/index.ts +++ b/x-pack/plugins/fleet/server/services/package_policies/index.ts @@ -7,3 +7,4 @@ export * from './experimental_datastream_features'; export * from './package_policy_name_helper'; +export * from './utils'; diff --git a/x-pack/plugins/fleet/server/services/package_policies/utils.test.ts b/x-pack/plugins/fleet/server/services/package_policies/utils.test.ts new file mode 100644 index 0000000000000..363ffe9c38fa4 --- /dev/null +++ b/x-pack/plugins/fleet/server/services/package_policies/utils.test.ts @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { PackagePolicyMocks } from '../../mocks'; + +import { mapPackagePolicySavedObjectToPackagePolicy } from './utils'; + +describe('Package Policy Utils', () => { + describe('mapPackagePolicySavedObjectToPackagePolicy()', () => { + it('should return only exposed SO properties', () => { + const soItem = + PackagePolicyMocks.generatePackagePolicySavedObjectFindResponse().saved_objects.at(0)!; + + expect(mapPackagePolicySavedObjectToPackagePolicy(soItem)).toEqual({ + agents: 2, + created_at: '2024-01-24T15:21:13.389Z', + created_by: 'elastic', + description: 'Policy for things', + elasticsearch: { + privileges: { + cluster: [], + }, + }, + enabled: true, + id: 'so-123', + inputs: [], + is_managed: false, + name: 'Package Policy 1', + namespace: 'default', + package: { + name: 'endpoint', + title: 'Elastic Endpoint', + version: '1.0.0', + }, + policy_id: '444-555-666', + revision: 1, + secret_references: [], + updated_at: '2024-01-25T15:21:13.389Z', + updated_by: 'user-a', + vars: {}, + version: 'abc', + }); + }); + }); +}); diff --git a/x-pack/plugins/fleet/server/services/package_policies/utils.ts b/x-pack/plugins/fleet/server/services/package_policies/utils.ts new file mode 100644 index 0000000000000..309db211bbf14 --- /dev/null +++ b/x-pack/plugins/fleet/server/services/package_policies/utils.ts @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { SavedObject } from '@kbn/core-saved-objects-common/src/server_types'; + +import type { PackagePolicy, PackagePolicySOAttributes } from '../../types'; + +export const mapPackagePolicySavedObjectToPackagePolicy = ({ + /* eslint-disable @typescript-eslint/naming-convention */ + id, + version, + attributes: { + name, + description, + namespace, + enabled, + is_managed, + policy_id, + // `package` is a reserved keyword + package: packageInfo, + inputs, + vars, + elasticsearch, + agents, + revision, + secret_references, + updated_at, + updated_by, + created_at, + created_by, + /* eslint-enable @typescript-eslint/naming-convention */ + }, +}: SavedObject): PackagePolicy => { + return { + id, + name, + description, + namespace, + enabled, + is_managed, + policy_id, + package: packageInfo, + inputs, + vars, + elasticsearch, + version, + agents, + revision, + secret_references, + updated_at, + updated_by, + created_at, + created_by, + }; +}; diff --git a/x-pack/plugins/fleet/server/services/package_policy.test.ts b/x-pack/plugins/fleet/server/services/package_policy.test.ts index 24483be93a9f5..cc605900c3a58 100644 --- a/x-pack/plugins/fleet/server/services/package_policy.test.ts +++ b/x-pack/plugins/fleet/server/services/package_policy.test.ts @@ -19,6 +19,8 @@ import type { } from '@kbn/core/server'; import { SavedObjectsErrorHelpers } from '@kbn/core/server'; +import { PackagePolicyMocks } from '../mocks/package_policy.mocks'; + import type { PackageInfo, PackagePolicySOAttributes, @@ -53,6 +55,8 @@ import { import { PACKAGE_POLICY_SAVED_OBJECT_TYPE } from '../constants'; +import { mapPackagePolicySavedObjectToPackagePolicy } from './package_policies'; + import { preconfigurePackageInputs, updatePackageInputs, @@ -4918,6 +4922,149 @@ describe('Package policy service', () => { ).rejects.toEqual(new FleetError('Package notinstalled is not installed')); }); }); + + describe('fetchAllItemIds()', () => { + let soClientMock: ReturnType; + + beforeEach(() => { + soClientMock = savedObjectsClientMock.create(); + + soClientMock.find + .mockResolvedValueOnce(PackagePolicyMocks.generatePackagePolicySavedObjectFindResponse()) + .mockResolvedValueOnce(PackagePolicyMocks.generatePackagePolicySavedObjectFindResponse()) + .mockResolvedValueOnce( + Object.assign(PackagePolicyMocks.generatePackagePolicySavedObjectFindResponse(), { + saved_objects: [], + }) + ); + }); + + it('should return an iterator', async () => { + expect(packagePolicyService.fetchAllItemIds(soClientMock)).toEqual({ + [Symbol.asyncIterator]: expect.any(Function), + }); + }); + + it('should provide item ids on every iteration', async () => { + for await (const ids of packagePolicyService.fetchAllItemIds(soClientMock)) { + expect(ids).toEqual(['so-123', 'so-123']); + } + + expect(soClientMock.find).toHaveBeenCalledTimes(3); + }); + + it('should use default options', async () => { + for await (const ids of packagePolicyService.fetchAllItemIds(soClientMock)) { + expect(ids); + } + + expect(soClientMock.find).toHaveBeenCalledWith( + expect.objectContaining({ + type: PACKAGE_POLICY_SAVED_OBJECT_TYPE, + perPage: 1000, + sortField: 'created_at', + sortOrder: 'asc', + fields: [], + filter: undefined, + }) + ); + }); + + it('should use custom options when defined', async () => { + for await (const ids of packagePolicyService.fetchAllItemIds(soClientMock, { + perPage: 13, + kuery: 'one=two', + })) { + expect(ids); + } + + expect(soClientMock.find).toHaveBeenCalledWith( + expect.objectContaining({ + type: PACKAGE_POLICY_SAVED_OBJECT_TYPE, + perPage: 13, + sortField: 'created_at', + sortOrder: 'asc', + fields: [], + filter: 'one=two', + }) + ); + }); + }); + + describe('fetchAllItems()', () => { + let soClientMock: ReturnType; + + beforeEach(() => { + soClientMock = savedObjectsClientMock.create(); + + soClientMock.find + .mockResolvedValueOnce(PackagePolicyMocks.generatePackagePolicySavedObjectFindResponse()) + .mockResolvedValueOnce(PackagePolicyMocks.generatePackagePolicySavedObjectFindResponse()) + .mockResolvedValueOnce( + Object.assign(PackagePolicyMocks.generatePackagePolicySavedObjectFindResponse(), { + saved_objects: [], + }) + ); + }); + + it('should return an iterator', async () => { + expect(packagePolicyService.fetchAllItems(soClientMock)).toEqual({ + [Symbol.asyncIterator]: expect.any(Function), + }); + }); + + it('should provide items on every iteration', async () => { + for await (const items of packagePolicyService.fetchAllItems(soClientMock)) { + expect(items).toEqual( + PackagePolicyMocks.generatePackagePolicySavedObjectFindResponse().saved_objects.map( + (soItem) => { + return mapPackagePolicySavedObjectToPackagePolicy(soItem); + } + ) + ); + } + + expect(soClientMock.find).toHaveBeenCalledTimes(3); + }); + + it('should use default options', async () => { + for await (const ids of packagePolicyService.fetchAllItemIds(soClientMock)) { + expect(ids); + } + + expect(soClientMock.find).toHaveBeenCalledWith( + expect.objectContaining({ + type: PACKAGE_POLICY_SAVED_OBJECT_TYPE, + perPage: 1000, + sortField: 'created_at', + sortOrder: 'asc', + fields: [], + filter: undefined, + }) + ); + }); + + it('should use custom options when defined', async () => { + for await (const ids of packagePolicyService.fetchAllItems(soClientMock, { + kuery: 'one=two', + perPage: 12, + sortOrder: 'desc', + sortField: 'updated_by', + })) { + expect(ids); + } + + expect(soClientMock.find).toHaveBeenCalledWith( + expect.objectContaining({ + type: PACKAGE_POLICY_SAVED_OBJECT_TYPE, + perPage: 12, + sortField: 'updated_by', + sortOrder: 'desc', + filter: 'one=two', + }) + ); + }); + }); }); describe('getUpgradeDryRunDiff', () => { diff --git a/x-pack/plugins/fleet/server/services/package_policy.ts b/x-pack/plugins/fleet/server/services/package_policy.ts index e89ac0160f62c..45753540af256 100644 --- a/x-pack/plugins/fleet/server/services/package_policy.ts +++ b/x-pack/plugins/fleet/server/services/package_policy.ts @@ -95,6 +95,8 @@ import type { } from '../types'; import type { ExternalCallback } from '..'; +import { createSoFindIterable } from './utils/create_so_find_iterable'; + import type { FleetAuthzRouteConfig } from './security'; import { getAuthzFromRequest, doesNotHaveRequiredFleetAuthz } from './security'; @@ -109,9 +111,16 @@ import { appContextService } from '.'; import { removeOldAssets } from './epm/packages/cleanup'; import type { PackageUpdateEvent, UpdateEventType } from './upgrade_sender'; import { sendTelemetryEvents } from './upgrade_sender'; -import { handleExperimentalDatastreamFeatureOptIn } from './package_policies'; +import { + handleExperimentalDatastreamFeatureOptIn, + mapPackagePolicySavedObjectToPackagePolicy, +} from './package_policies'; import { updateDatastreamExperimentalFeatures } from './epm/packages/update'; -import type { PackagePolicyClient, PackagePolicyService } from './package_policy_service'; +import type { + PackagePolicyClient, + PackagePolicyClientFetchAllItemsOptions, + PackagePolicyService, +} from './package_policy_service'; import { installAssetsForInputPackagePolicy } from './epm/packages/install'; import { auditLoggingService } from './audit_logging'; import { @@ -122,6 +131,7 @@ import { } from './secrets'; import { getPackageAssetsMap } from './epm/packages/get'; import { validateOutputForNewPackagePolicy } from './agent_policies/outputs_helpers'; +import type { PackagePolicyClientFetchAllItemIdsOptions } from './package_policy_service'; export type InputsOverride = Partial & { vars?: Array; @@ -1886,6 +1896,60 @@ class PackagePolicyClientImpl implements PackagePolicyClient { } } } + + fetchAllItemIds( + soClient: SavedObjectsClientContract, + { perPage = 1000, kuery }: PackagePolicyClientFetchAllItemIdsOptions = {} + ): AsyncIterable { + // TODO:PT Question for fleet team: do I need to `auditLoggingService.writeCustomSoAuditLog()` here? Its only IDs + + return createSoFindIterable<{}>({ + soClient, + findRequest: { + type: SAVED_OBJECT_TYPE, + perPage, + sortField: 'created_at', + sortOrder: 'asc', + fields: [], + filter: kuery ? normalizeKuery(SAVED_OBJECT_TYPE, kuery) : undefined, + }, + resultsMapper: (data) => { + return data.saved_objects.map((packagePolicySO) => packagePolicySO.id); + }, + }); + } + + fetchAllItems( + soClient: SavedObjectsClientContract, + { + perPage = 1000, + kuery, + sortOrder = 'asc', + sortField = 'created_at', + }: PackagePolicyClientFetchAllItemsOptions = {} + ): AsyncIterable { + return createSoFindIterable({ + soClient, + findRequest: { + type: SAVED_OBJECT_TYPE, + sortField, + sortOrder, + perPage, + filter: kuery ? normalizeKuery(SAVED_OBJECT_TYPE, kuery) : undefined, + }, + resultsMapper(data) { + return data.saved_objects.map((packagePolicySO) => { + auditLoggingService.writeCustomSoAuditLog({ + action: 'find', + id: packagePolicySO.id, + savedObjectType: PACKAGE_POLICY_SAVED_OBJECT_TYPE, + }); + + return mapPackagePolicySavedObjectToPackagePolicy(packagePolicySO); + }); + }, + }); + } } export class PackagePolicyServiceImpl diff --git a/x-pack/plugins/fleet/server/services/package_policy_service.ts b/x-pack/plugins/fleet/server/services/package_policy_service.ts index 9519cafbc6a73..de960c44b7879 100644 --- a/x-pack/plugins/fleet/server/services/package_policy_service.ts +++ b/x-pack/plugins/fleet/server/services/package_policy_service.ts @@ -213,4 +213,31 @@ export interface PackagePolicyClient { packageInfo: PackageInfo; experimentalDataStreamFeatures: ExperimentalDataStreamFeature[]; }>; + + /** + * Returns an `AsyncIterable` for retrieving all integration policy IDs + * @param soClient + * @param options + */ + fetchAllItemIds( + soClient: SavedObjectsClientContract, + options?: PackagePolicyClientFetchAllItemIdsOptions + ): AsyncIterable; + + /** + * Returns an `AsyncIterable` for retrieving all integration policies + * @param soClient + * @param options + */ + fetchAllItems( + soClient: SavedObjectsClientContract, + options?: PackagePolicyClientFetchAllItemsOptions + ): AsyncIterable; } + +export type PackagePolicyClientFetchAllItemIdsOptions = Pick; + +export type PackagePolicyClientFetchAllItemsOptions = Pick< + ListWithKuery, + 'perPage' | 'kuery' | 'sortField' | 'sortOrder' +>; diff --git a/x-pack/plugins/fleet/server/services/utils/create_es_search_iterable.ts b/x-pack/plugins/fleet/server/services/utils/create_es_search_iterable.ts new file mode 100644 index 0000000000000..ae4cb9551bc8c --- /dev/null +++ b/x-pack/plugins/fleet/server/services/utils/create_es_search_iterable.ts @@ -0,0 +1,165 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { ElasticsearchClient } from '@kbn/core-elasticsearch-server'; + +import type * as estypes from '@kbn/es-types'; + +import type { SearchRequest, SearchResponse } from '@elastic/elasticsearch/lib/api/types'; + +export interface CreateEsSearchIterableOptions { + esClient: ElasticsearchClient; + searchRequest: Omit & + Pick, 'sort' | 'index'>; + /** + * An optional callback for mapping the results retrieved from ES. If defined, the iterator + * `value` will be set to the data returned by this mapping function. + * + * @param data + */ + resultsMapper?: (data: SearchResponse) => any; + /** If a Point in Time should be used while executing the search. Defaults to `true` */ + usePointInTime?: boolean; +} + +export type InferEsSearchIteratorResultValue = + CreateEsSearchIterableOptions['resultsMapper'] extends undefined + ? SearchResponse + : ReturnType>['resultsMapper']>; + +/** + * Creates an `AsyncIterable` that can be used to iterate (ex. via `for..await..of`) over all the data + * matching the search query. The search request to ES will use `search_after`, thus can iterate over + * datasets above 10k items as well. + * + * @param options + * + * @example + * + * const yourFn = async () => { + * const dataIterable = createEsSearchIterable({ + * esClient, + * searchRequest: { + * index: 'some-index', + * sort: [ + * { + * created: { order: 'asc' } + * } + * ] + * } + * }); + * + * for await (const data of dataIterable) { + * // data === your search results + * } + * } + */ +export const createEsSearchIterable = ({ + esClient, + searchRequest: { size = 1000, index, ...searchOptions }, + resultsMapper, + usePointInTime = true, +}: CreateEsSearchIterableOptions): AsyncIterable< + InferEsSearchIteratorResultValue +> => { + const keepAliveValue = '5m'; + let done = false; + let value: SearchResponse; + let searchAfterValue: estypes.SearchHit['sort'] | undefined; + let pointInTime: Promise<{ id: string }> = usePointInTime + ? esClient.openPointInTime({ + index, + ignore_unavailable: true, + keep_alive: keepAliveValue, + }) + : Promise.resolve({ id: '' }); + + const createIteratorResult = (): IteratorResult> => { + return { done, value }; + }; + + const setValue = (searchResponse: SearchResponse): void => { + value = resultsMapper ? resultsMapper(searchResponse) : searchResponse; + }; + + const setDone = async (): Promise => { + done = true; + + if (usePointInTime) { + const pitId = (await pointInTime).id; + + if (pitId) { + await esClient.closePointInTime({ id: pitId }); + } + } + }; + + const fetchData = async () => { + const pitId = (await pointInTime).id; + + const searchResult = await esClient + .search({ + ...searchOptions, + size, + ...(usePointInTime + ? { + pit: { + id: pitId, + keep_alive: keepAliveValue, + }, + } + : { index }), + search_after: searchAfterValue, + }) + .catch((e) => { + Error.captureStackTrace(e); + throw e; + }); + + const searchHits = searchResult.hits.hits; + const lastSearchHit = searchHits[searchHits.length - 1]; + + if (searchHits.length === 0) { + await setDone(); + return; + } + + searchAfterValue = lastSearchHit.sort; + pointInTime = Promise.resolve({ id: searchResult.pit_id ?? '' }); + setValue(searchResult); + + // If (for some reason) we don't have a `searchAfterValue`, + // then throw an error, or else we'll keep looping forever + if (!searchAfterValue) { + await setDone(); + throw new Error( + `Unable to store 'search_after' value. Last 'SearchHit' did not include a 'sort' property \n(did you forget to set the 'sort' attribute on your SearchRequest?)':\n${JSON.stringify( + lastSearchHit + )}` + ); + } + }; + + return { + [Symbol.asyncIterator]() { + return { + async next() { + if (!done) { + await fetchData(); + } + + return createIteratorResult(); + }, + + async return() { + done = true; + return createIteratorResult(); + }, + }; + }, + }; +}; diff --git a/x-pack/plugins/fleet/server/services/utils/create_so_find_iterable.ts b/x-pack/plugins/fleet/server/services/utils/create_so_find_iterable.ts new file mode 100644 index 0000000000000..6b17b3ba98040 --- /dev/null +++ b/x-pack/plugins/fleet/server/services/utils/create_so_find_iterable.ts @@ -0,0 +1,142 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { + SavedObjectsClientContract, + SavedObjectsFindOptions, + SavedObjectsFindResponse, + SavedObjectsFindResult, +} from '@kbn/core-saved-objects-api-server'; + +export interface CreateSoFindIterableOptions { + soClient: SavedObjectsClientContract; + findRequest: Omit & + // sortField is required + Pick, 'sortField'>; + /** + * An optional callback for mapping the results retrieved from SavedObjects. If defined, the iterator + * `value` will be set to the data returned by this mapping function. + * + * @param data + */ + resultsMapper?: (data: SavedObjectsFindResponse) => any; + /** If a Point in Time should be used while executing the search. Defaults to `true` */ + usePointInTime?: boolean; +} + +export type InferSoFindIteratorResultValue = + CreateSoFindIterableOptions['resultsMapper'] extends undefined + ? SavedObjectsFindResponse + : ReturnType>['resultsMapper']>; + +/** + * Creates an `AsyncIterable` that can be used to iterate (ex. via `for..await..of`) over all the data + * matching the search query. The search request to Saved Object will use `searchAfter`, thus can iterate over + * datasets above 10k items as well. + * + * @param options + */ +export const createSoFindIterable = ({ + soClient, + findRequest: { perPage = 1000, ...findOptions }, + resultsMapper, + usePointInTime = true, +}: CreateSoFindIterableOptions): AsyncIterable< + InferSoFindIteratorResultValue +> => { + const keepAliveValue = '5m'; + let done = false; + let value: SavedObjectsFindResponse; + let searchAfterValue: SavedObjectsFindResult['sort'] | undefined; + let pointInTime: Promise<{ id: string }> = usePointInTime + ? soClient.openPointInTimeForType(findOptions.type, { keepAlive: keepAliveValue }) + : Promise.resolve({ id: '' }); + + const setValue = (findResponse: SavedObjectsFindResponse): void => { + value = resultsMapper ? resultsMapper(findResponse) : findResponse; + }; + + const setDone = async (): Promise => { + done = true; + + if (usePointInTime) { + const pitId = (await pointInTime).id; + + if (pitId) { + await soClient.closePointInTime(pitId); + } + } + }; + + const fetchData = async () => { + const findResult = await soClient + .find({ + ...findOptions, + ...(usePointInTime + ? { + pit: { + id: (await pointInTime).id, + keepAlive: keepAliveValue, + }, + } + : {}), + perPage, + searchAfter: searchAfterValue, + }) + .catch((e) => { + Error.captureStackTrace(e); + throw e; + }); + + const soItems = findResult.saved_objects; + const lastSearchHit = soItems[soItems.length - 1]; + + if (soItems.length === 0) { + setValue(findResult); + await setDone(); + return; + } + + searchAfterValue = lastSearchHit.sort; + pointInTime = Promise.resolve({ id: findResult.pit_id ?? '' }); + setValue(findResult); + + // If (for some reason) we don't have a `searchAfterValue`, + // then throw an error, or else we'll keep looping forever + if (!searchAfterValue) { + await setDone(); + throw new Error( + `Unable to store 'searchAfter' value. Last 'SavedObjectsFindResult' did not include a 'sort' property \n(did you forget to set the 'sortField' attribute on your SavedObjectsFindOptions?)':\n${JSON.stringify( + lastSearchHit + )}` + ); + } + }; + + const createIteratorResult = (): IteratorResult> => { + return { done, value }; + }; + + return { + [Symbol.asyncIterator]() { + return { + async next() { + if (!done) { + await fetchData(); + } + + return createIteratorResult(); + }, + + async return() { + done = true; + return createIteratorResult(); + }, + }; + }, + }; +}; diff --git a/x-pack/plugins/security_solution/common/endpoint/errors.ts b/x-pack/plugins/security_solution/common/endpoint/errors.ts index 495afaa126e2b..2cd7fd931583b 100644 --- a/x-pack/plugins/security_solution/common/endpoint/errors.ts +++ b/x-pack/plugins/security_solution/common/endpoint/errors.ts @@ -14,5 +14,9 @@ export class EndpointError extends Error { super(message); // For debugging - capture name of subclasses this.name = this.constructor.name; + + if (meta instanceof Error) { + this.stack += `\n----- original error -----\n${meta.stack}`; + } } } diff --git a/x-pack/plugins/security_solution/server/config.mock.ts b/x-pack/plugins/security_solution/server/config.mock.ts index 855cec11ab16e..9f61523dbbe8b 100644 --- a/x-pack/plugins/security_solution/server/config.mock.ts +++ b/x-pack/plugins/security_solution/server/config.mock.ts @@ -22,6 +22,7 @@ export const createMockConfig = (): ConfigType => { maxTimelineImportPayloadBytes: 10485760, enableExperimental, packagerTaskInterval: '60s', + packagerTaskTimeout: '5m', packagerTaskPackagePolicyUpdateBatchSize: 10, prebuiltRulesPackageVersion: '', alertMergeStrategy: 'missingFields', diff --git a/x-pack/plugins/security_solution/server/config.ts b/x-pack/plugins/security_solution/server/config.ts index adc8fbfb1174c..4cb9ff479fff1 100644 --- a/x-pack/plugins/security_solution/server/config.ts +++ b/x-pack/plugins/security_solution/server/config.ts @@ -92,14 +92,20 @@ export const configSchema = schema.object({ }), /** - * Artifacts Configuration + * Endpoint Artifacts Configuration: the interval between runs of the task that builds the + * artifacts and associated manifest. */ packagerTaskInterval: schema.string({ defaultValue: '60s' }), + /** + * Endpoint Artifacts Configuration: timeout value for how long the task should run. + */ + packagerTaskTimeout: schema.string({ defaultValue: '20m' }), + /** * Artifacts Configuration for package policy update concurrency */ - packagerTaskPackagePolicyUpdateBatchSize: schema.number({ defaultValue: 10, max: 50, min: 1 }), + packagerTaskPackagePolicyUpdateBatchSize: schema.number({ defaultValue: 25, max: 50, min: 1 }), /** * For internal use. Specify which version of the Detection Rules fleet package to install diff --git a/x-pack/plugins/security_solution/server/endpoint/lib/artifacts/task.test.ts b/x-pack/plugins/security_solution/server/endpoint/lib/artifacts/task.test.ts index a0ad1f9712be1..a1988cb7a13ae 100644 --- a/x-pack/plugins/security_solution/server/endpoint/lib/artifacts/task.test.ts +++ b/x-pack/plugins/security_solution/server/endpoint/lib/artifacts/task.test.ts @@ -19,7 +19,7 @@ import { getMockArtifacts } from './mocks'; import { InvalidInternalManifestError } from '../../services/artifacts/errors'; import { loggingSystemMock } from '@kbn/core/server/mocks'; -describe('task', () => { +describe('Endpoint artifact packager task', () => { const MOCK_TASK_INSTANCE = { id: `${ManifestTaskConstants.TYPE}:1.0.0`, runAt: new Date(), @@ -170,7 +170,7 @@ describe('task', () => { await runTask(manifestManager); - expect(logger.info).toHaveBeenCalledWith('recovering from invalid internal manifest'); + expect(logger.warn).toHaveBeenCalledWith('recovering from invalid internal manifest'); expect(logger.error).toHaveBeenNthCalledWith(1, expect.any(InvalidInternalManifestError)); }); diff --git a/x-pack/plugins/security_solution/server/endpoint/lib/artifacts/task.ts b/x-pack/plugins/security_solution/server/endpoint/lib/artifacts/task.ts index dafa13141a0c6..8547eb6dca11c 100644 --- a/x-pack/plugins/security_solution/server/endpoint/lib/artifacts/task.ts +++ b/x-pack/plugins/security_solution/server/endpoint/lib/artifacts/task.ts @@ -22,6 +22,10 @@ import { wrapErrorIfNeeded } from '../../utils'; import { EndpointError } from '../../../../common/endpoint/errors'; export const ManifestTaskConstants = { + /** + * No longer used. Timeout value now comes from `xpack.securitySolution.packagerTaskTimeout` + * @deprecated + */ TIMEOUT: '1m', TYPE: 'endpoint:user-artifact-packager', VERSION: '1.0.0', @@ -44,22 +48,37 @@ export class ManifestTask { constructor(setupContract: ManifestTaskSetupContract) { this.endpointAppContext = setupContract.endpointAppContext; this.logger = this.endpointAppContext.logFactory.get(this.getTaskId()); + const { packagerTaskInterval, packagerTaskTimeout, packagerTaskPackagePolicyUpdateBatchSize } = + this.endpointAppContext.serverConfig; + + this.logger.info( + `Registering ${ManifestTaskConstants.TYPE} task with timeout of [${packagerTaskTimeout}], interval of [${packagerTaskInterval}] and policy update batch size of [${packagerTaskPackagePolicyUpdateBatchSize}]` + ); setupContract.taskManager.registerTaskDefinitions({ [ManifestTaskConstants.TYPE]: { title: 'Security Solution Endpoint Exceptions Handler', - timeout: ManifestTaskConstants.TIMEOUT, + timeout: packagerTaskTimeout, createTaskRunner: ({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => { return { run: async () => { - const taskInterval = (await this.endpointAppContext.config()).packagerTaskInterval; - const startTime = new Date().getTime(); + const taskInterval = packagerTaskInterval; + const startTime = new Date(); + + this.logger.info(`Started. Checking for changes to endpoint artifacts`); + await this.runTask(taskInstance.id); + const endTime = new Date().getTime(); - this.logger.debug( - `${ManifestTaskConstants.TYPE} task run took ${endTime - startTime}ms` + + this.logger.info( + `Complete. Task run took ${ + endTime - startTime.getTime() + }ms [ stated: ${startTime.toISOString()} ]` ); + const nextRun = new Date(); + if (taskInterval.endsWith('s')) { const seconds = parseInt(taskInterval.slice(0, -1), 10); nextRun.setSeconds(nextRun.getSeconds() + seconds); @@ -70,12 +89,20 @@ export class ManifestTask { this.logger.error(`Invalid task interval: ${taskInterval}`); return; } + return { state: {}, runAt: nextRun, }; }, - cancel: async () => {}, + cancel: async () => { + // TODO:PT add support for AbortController to Task manager + this.logger.warn( + 'Task run was canceled. Packaging of endpoint artifacts may be taking longer due to the ' + + 'amount of policies/artifacts. Consider increasing the `xpack.securitySolution.packagerTaskTimeout` ' + + 'server configuration setting if this continues' + ); + }, }; }, }, @@ -91,7 +118,7 @@ export class ManifestTask { taskType: ManifestTaskConstants.TYPE, scope: ['securitySolution'], schedule: { - interval: (await this.endpointAppContext.config()).packagerTaskInterval, + interval: this.endpointAppContext.serverConfig.packagerTaskInterval, }, state: {}, params: { version: ManifestTaskConstants.VERSION }, @@ -127,23 +154,29 @@ export class ManifestTask { } try { - let oldManifest: Manifest | null; + let oldManifest: Manifest | null = null; try { // Last manifest we computed, which was saved to ES oldManifest = await manifestManager.getLastComputedManifest(); } catch (e) { + this.logger.error(e); + // Lets recover from a failure in getting the internal manifest map by creating an empty default manifest if (e instanceof InvalidInternalManifestError) { - this.logger.error(e); - this.logger.info('recovering from invalid internal manifest'); + this.logger.warn('recovering from invalid internal manifest'); oldManifest = ManifestManager.createDefaultManifest(); + } else { + this.logger.error( + `unable to recover from error while attempting to retrieve last computed manifest` + ); + + return; } } - // eslint-disable-next-line @typescript-eslint/no-non-null-assertion - if (oldManifest! == null) { - this.logger.debug('Last computed manifest not available yet'); + if (!oldManifest) { + this.logger.info('Last computed manifest not available yet'); return; } @@ -152,10 +185,17 @@ export class ManifestTask { const diff = newManifest.diff(oldManifest); + this.logger.debug( + `New -vs- old manifest diff counts: ${Object.entries(diff).map( + ([diffType, diffItems]) => `${diffType}: ${diffItems.length}` + )}` + ); + const persistErrors = await manifestManager.pushArtifacts( diff.additions as InternalArtifactCompleteSchema[], newManifest ); + if (persistErrors.length) { reportErrors(this.logger, persistErrors); throw new Error('Unable to persist new artifacts.'); @@ -167,8 +207,9 @@ export class ManifestTask { await manifestManager.commit(newManifest); } - // Try dispatching to ingest-manager package policies + // Dispatch updates to Fleet integration policies with new manifest info const dispatchErrors = await manifestManager.tryDispatch(newManifest); + if (dispatchErrors.length) { reportErrors(this.logger, dispatchErrors); throw new Error('Error dispatching manifest.'); @@ -178,9 +219,11 @@ export class ManifestTask { const deleteErrors = await manifestManager.deleteArtifacts( diff.removals.map((artifact) => getArtifactId(artifact)) ); + if (deleteErrors.length) { reportErrors(this.logger, deleteErrors); } + await manifestManager.cleanup(newManifest); } catch (err) { this.logger.error(wrapErrorIfNeeded(err)); diff --git a/x-pack/plugins/security_solution/server/endpoint/services/artifacts/artifact_client.ts b/x-pack/plugins/security_solution/server/endpoint/services/artifacts/artifact_client.ts index 3e00310a5bb64..24c04ee881b8e 100644 --- a/x-pack/plugins/security_solution/server/endpoint/services/artifacts/artifact_client.ts +++ b/x-pack/plugins/security_solution/server/endpoint/services/artifacts/artifact_client.ts @@ -11,9 +11,11 @@ import type { ListArtifactsProps, } from '@kbn/fleet-plugin/server'; import type { ListResult } from '@kbn/fleet-plugin/common'; +import type { FetchAllArtifactsOptions } from '@kbn/fleet-plugin/server/services'; import type { InternalArtifactCompleteSchema } from '../../schemas/artifacts'; -export interface EndpointArtifactClientInterface { +export interface EndpointArtifactClientInterface + extends Pick { getArtifact(id: string): Promise; createArtifact(artifact: InternalArtifactCompleteSchema): Promise; @@ -67,6 +69,15 @@ export class EndpointArtifactClient implements EndpointArtifactClientInterface { return this.fleetArtifacts.listArtifacts(options); } + fetchAll({ + // Our default, unlike the Fleet service, is to NOT include the body of + // the artifact, since we really don't need it when processing all artifacts + includeArtifactBody = false, + ...options + }: FetchAllArtifactsOptions = {}): AsyncIterable { + return this.fleetArtifacts.fetchAll({ ...options, includeArtifactBody }); + } + async createArtifact( artifact: InternalArtifactCompleteSchema ): Promise { diff --git a/x-pack/plugins/security_solution/server/endpoint/services/artifacts/manifest_manager/manifest_manager.test.ts b/x-pack/plugins/security_solution/server/endpoint/services/artifacts/manifest_manager/manifest_manager.test.ts index 9ff37c67e613d..1d935ccb905d0 100644 --- a/x-pack/plugins/security_solution/server/endpoint/services/artifacts/manifest_manager/manifest_manager.test.ts +++ b/x-pack/plugins/security_solution/server/endpoint/services/artifacts/manifest_manager/manifest_manager.test.ts @@ -39,6 +39,10 @@ import { EndpointError } from '../../../../../common/endpoint/errors'; import type { Artifact } from '@kbn/fleet-plugin/server'; import { AppFeatureSecurityKey } from '@kbn/security-solution-features/keys'; import type { ExceptionListItemSchema } from '@kbn/securitysolution-io-ts-list-types/src/response/exception_list_item_schema'; +import { + createFetchAllArtifactsIterableMock, + generateArtifactMock, +} from '@kbn/fleet-plugin/server/services/artifacts/mocks'; const getArtifactObject = (artifact: InternalArtifactSchema) => JSON.parse(Buffer.from(artifact.body!, 'base64').toString()); @@ -76,12 +80,9 @@ describe('ManifestManager', () => { const ARTIFACT_NAME_BLOCKLISTS_WINDOWS = 'endpoint-blocklist-windows-v1'; const ARTIFACT_NAME_BLOCKLISTS_LINUX = 'endpoint-blocklist-linux-v1'; - const mockPolicyListIdsResponse = (items: string[]) => - jest.fn().mockResolvedValue({ - items, - page: 1, - per_page: 100, - total: items.length, + const getMockPolicyFetchAllItemIds = (items: string[]) => + jest.fn(async function* () { + yield items; }); let ARTIFACTS: InternalArtifactCompleteSchema[] = []; @@ -200,9 +201,7 @@ describe('ManifestManager', () => { ( manifestManagerContext.artifactClient as jest.Mocked - ).listArtifacts.mockImplementation(async () => { - return { items: ARTIFACTS as Artifact[], total: 100, page: 1, perPage: 100 }; - }); + ).fetchAll.mockReturnValue(createFetchAllArtifactsIterableMock([ARTIFACTS as Artifact[]])); const manifest = await manifestManager.getLastComputedManifest(); @@ -259,33 +258,26 @@ describe('ManifestManager', () => { ( manifestManagerContext.artifactClient as jest.Mocked - ).listArtifacts.mockImplementation(async () => { - // report the MACOS Exceptions artifact as not found - return { - items: [ + ).fetchAll.mockReturnValue( + createFetchAllArtifactsIterableMock([ + // report the MACOS Exceptions artifact as not found + [ ARTIFACT_TRUSTED_APPS_MACOS, ARTIFACT_EXCEPTIONS_WINDOWS, ARTIFACT_TRUSTED_APPS_WINDOWS, ARTIFACTS_BY_ID[ARTIFACT_ID_EXCEPTIONS_LINUX], ] as Artifact[], - total: 100, - page: 1, - perPage: 100, - }; - }); + ]) + ); const manifest = await manifestManager.getLastComputedManifest(); expect(manifest?.getAllArtifacts()).toStrictEqual(ARTIFACTS.slice(1, 5)); - expect(manifestManagerContext.logger.error).toHaveBeenCalledWith( - new InvalidInternalManifestError( - `artifact id [${ARTIFACT_ID_EXCEPTIONS_MACOS}] not found!`, - { - entry: ARTIFACTS_BY_ID[ARTIFACT_ID_EXCEPTIONS_MACOS], - action: 'removed from internal ManifestManger tracking map', - } - ) + expect(manifestManagerContext.logger.warn).toHaveBeenCalledWith( + "Missing artifacts detected! Internal artifact manifest (SavedObject version [2.0.0]) references [1] artifact IDs that don't exist.\n" + + "First 10 below (run with logging set to 'debug' to see all):\n" + + 'endpoint-exceptionlist-macos-v1-96b76a1a911662053a1562ac14c4ff1e87c2ff550d6fe52e1e0b3790526597d3' ); }); }); @@ -327,7 +319,9 @@ describe('ManifestManager', () => { const manifestManager = new ManifestManager(context); context.exceptionListClient.findExceptionListItem = mockFindExceptionListItemResponses({}); - context.packagePolicyService.listIds = mockPolicyListIdsResponse([TEST_POLICY_ID_1]); + context.packagePolicyService.fetchAllItemIds = getMockPolicyFetchAllItemIds([ + TEST_POLICY_ID_1, + ]); context.savedObjectsClient.create = jest .fn() @@ -389,7 +383,9 @@ describe('ManifestManager', () => { .mockImplementation((_type: string, object: InternalManifestSchema) => ({ attributes: object, })); - context.packagePolicyService.listIds = mockPolicyListIdsResponse([TEST_POLICY_ID_1]); + context.packagePolicyService.fetchAllItemIds = getMockPolicyFetchAllItemIds([ + TEST_POLICY_ID_1, + ]); const manifest = await manifestManager.buildNewManifest(); @@ -460,7 +456,9 @@ describe('ManifestManager', () => { context.exceptionListClient.findExceptionListItem = mockFindExceptionListItemResponses({ [ENDPOINT_LIST_ID]: { macos: [exceptionListItem] }, }); - context.packagePolicyService.listIds = mockPolicyListIdsResponse([TEST_POLICY_ID_1]); + context.packagePolicyService.fetchAllItemIds = getMockPolicyFetchAllItemIds([ + TEST_POLICY_ID_1, + ]); context.savedObjectsClient.create = jest .fn() .mockImplementation((_type: string, object: InternalManifestSchema) => ({ @@ -576,7 +574,7 @@ describe('ManifestManager', () => { .mockImplementation((_type: string, object: InternalManifestSchema) => ({ attributes: object, })); - context.packagePolicyService.listIds = mockPolicyListIdsResponse([ + context.packagePolicyService.fetchAllItemIds = getMockPolicyFetchAllItemIds([ TEST_POLICY_ID_1, TEST_POLICY_ID_2, ]); @@ -679,7 +677,7 @@ describe('ManifestManager', () => { linux: [trustedAppListItem, trustedAppListItemPolicy2], }, }); - context.packagePolicyService.listIds = mockPolicyListIdsResponse([ + context.packagePolicyService.fetchAllItemIds = getMockPolicyFetchAllItemIds([ TEST_POLICY_ID_1, TEST_POLICY_ID_2, ]); @@ -795,7 +793,9 @@ describe('ManifestManager', () => { .mockImplementation((_type: string, object: InternalManifestSchema) => ({ attributes: object, })); - context.packagePolicyService.listIds = mockPolicyListIdsResponse([TEST_POLICY_ID_1]); + context.packagePolicyService.fetchAllItemIds = getMockPolicyFetchAllItemIds([ + TEST_POLICY_ID_1, + ]); const manifest = await manifestManager.buildNewManifest(); @@ -878,7 +878,9 @@ describe('ManifestManager', () => { .mockImplementation((_type: string, object: InternalManifestSchema) => ({ attributes: object, })); - context.packagePolicyService.listIds = mockPolicyListIdsResponse([TEST_POLICY_ID_1]); + context.packagePolicyService.fetchAllItemIds = getMockPolicyFetchAllItemIds([ + TEST_POLICY_ID_1, + ]); const manifest = await manifestManager.buildNewManifest(); @@ -960,7 +962,9 @@ describe('ManifestManager', () => { .mockImplementation((_type: string, object: InternalManifestSchema) => ({ attributes: object, })); - context.packagePolicyService.listIds = mockPolicyListIdsResponse([TEST_POLICY_ID_1]); + context.packagePolicyService.fetchAllItemIds = getMockPolicyFetchAllItemIds([ + TEST_POLICY_ID_1, + ]); const manifest = await manifestManager.buildNewManifest(); @@ -1026,7 +1030,9 @@ describe('ManifestManager', () => { .mockImplementation((_type: string, object: InternalManifestSchema) => ({ attributes: object, })); - context.packagePolicyService.listIds = mockPolicyListIdsResponse([TEST_POLICY_ID_1]); + context.packagePolicyService.fetchAllItemIds = getMockPolicyFetchAllItemIds([ + TEST_POLICY_ID_1, + ]); const manifest = await manifestManager.buildNewManifest(); @@ -1068,7 +1074,9 @@ describe('ManifestManager', () => { .mockImplementation((_type: string, object: InternalManifestSchema) => ({ attributes: object, })); - context.packagePolicyService.listIds = mockPolicyListIdsResponse([TEST_POLICY_ID_1]); + context.packagePolicyService.fetchAllItemIds = getMockPolicyFetchAllItemIds([ + TEST_POLICY_ID_1, + ]); const manifest = await manifestManager.buildNewManifest(); @@ -1299,12 +1307,9 @@ describe('ManifestManager', () => { }); describe('tryDispatch', () => { - const mockPolicyListResponse = (items: PackagePolicy[]) => - jest.fn().mockResolvedValue({ - items, - page: 1, - per_page: 100, - total: items.length, + const getMockPolicyFetchAllItems = (items: PackagePolicy[]) => + jest.fn(async function* () { + yield items; }); test('Should not dispatch if no policies', async () => { @@ -1313,8 +1318,7 @@ describe('ManifestManager', () => { const manifest = new Manifest({ soVersion: '1.0.0' }); manifest.addEntry(ARTIFACT_EXCEPTIONS_MACOS); - - context.packagePolicyService.list = mockPolicyListResponse([]); + context.packagePolicyService.fetchAllItems = getMockPolicyFetchAllItems([]); await expect(manifestManager.tryDispatch(manifest)).resolves.toStrictEqual([]); @@ -1328,7 +1332,7 @@ describe('ManifestManager', () => { const manifest = new Manifest({ soVersion: '1.0.0' }); manifest.addEntry(ARTIFACT_EXCEPTIONS_MACOS); - context.packagePolicyService.list = mockPolicyListResponse([ + context.packagePolicyService.fetchAllItems = getMockPolicyFetchAllItems([ createPackagePolicyWithConfigMock({ id: TEST_POLICY_ID_1 }), ]); @@ -1346,7 +1350,7 @@ describe('ManifestManager', () => { const manifest = new Manifest({ soVersion: '1.0.0' }); manifest.addEntry(ARTIFACT_EXCEPTIONS_MACOS); - context.packagePolicyService.list = mockPolicyListResponse([ + context.packagePolicyService.fetchAllItems = getMockPolicyFetchAllItems([ createPackagePolicyWithConfigMock({ id: TEST_POLICY_ID_1, config: { @@ -1378,7 +1382,7 @@ describe('ManifestManager', () => { manifest.addEntry(ARTIFACT_EXCEPTIONS_WINDOWS, TEST_POLICY_ID_2); manifest.addEntry(ARTIFACT_TRUSTED_APPS_MACOS, TEST_POLICY_ID_2); - context.packagePolicyService.list = mockPolicyListResponse([ + context.packagePolicyService.fetchAllItems = getMockPolicyFetchAllItems([ createPackagePolicyWithConfigMock({ id: TEST_POLICY_ID_1, config: { @@ -1446,7 +1450,7 @@ describe('ManifestManager', () => { manifest.addEntry(ARTIFACT_EXCEPTIONS_WINDOWS, TEST_POLICY_ID_2); manifest.addEntry(ARTIFACT_TRUSTED_APPS_MACOS, TEST_POLICY_ID_2); - context.packagePolicyService.list = mockPolicyListResponse([ + context.packagePolicyService.fetchAllItems = getMockPolicyFetchAllItems([ createPackagePolicyWithConfigMock({ id: TEST_POLICY_ID_1, config: { @@ -1516,7 +1520,7 @@ describe('ManifestManager', () => { const manifest = new Manifest({ soVersion: '1.0.0', semanticVersion: '1.0.1' }); manifest.addEntry(ARTIFACT_EXCEPTIONS_MACOS); - context.packagePolicyService.list = mockPolicyListResponse([ + context.packagePolicyService.fetchAllItems = getMockPolicyFetchAllItems([ createPackagePolicyWithConfigMock({ id: TEST_POLICY_ID_1, config: { @@ -1557,8 +1561,14 @@ describe('ManifestManager', () => { const context = buildManifestManagerContextMock({}); const manifestManager = new ManifestManager(context); + (context.artifactClient.fetchAll as jest.Mock).mockReturnValue( + createFetchAllArtifactsIterableMock([[generateArtifactMock()]]) + ); + context.exceptionListClient.findExceptionListItem = mockFindExceptionListItemResponses({}); - context.packagePolicyService.listIds = mockPolicyListIdsResponse([TEST_POLICY_ID_1]); + context.packagePolicyService.fetchAllItemIds = getMockPolicyFetchAllItemIds([ + TEST_POLICY_ID_1, + ]); context.savedObjectsClient.create = jest .fn() @@ -1581,7 +1591,9 @@ describe('ManifestManager', () => { const manifestManager = new ManifestManager(context); context.exceptionListClient.findExceptionListItem = mockFindExceptionListItemResponses({}); - context.packagePolicyService.listIds = mockPolicyListIdsResponse([TEST_POLICY_ID_1]); + context.packagePolicyService.fetchAllItemIds = getMockPolicyFetchAllItemIds([ + TEST_POLICY_ID_1, + ]); context.savedObjectsClient.create = jest .fn() diff --git a/x-pack/plugins/security_solution/server/endpoint/services/artifacts/manifest_manager/manifest_manager.ts b/x-pack/plugins/security_solution/server/endpoint/services/artifacts/manifest_manager/manifest_manager.ts index 27f528aba2716..a1ec74bc57b09 100644 --- a/x-pack/plugins/security_solution/server/endpoint/services/artifacts/manifest_manager/manifest_manager.ts +++ b/x-pack/plugins/security_solution/server/endpoint/services/artifacts/manifest_manager/manifest_manager.ts @@ -6,15 +6,17 @@ */ import semver from 'semver'; -import { chunk, isEmpty, isEqual, keyBy } from 'lodash'; +import { isEmpty, isEqual, keyBy } from 'lodash'; import type { ElasticsearchClient } from '@kbn/core/server'; import { type Logger, type SavedObjectsClientContract } from '@kbn/core/server'; import { ENDPOINT_LIST_ID, ENDPOINT_ARTIFACT_LISTS } from '@kbn/securitysolution-list-constants'; -import type { ListResult, PackagePolicy } from '@kbn/fleet-plugin/common'; +import type { PackagePolicy } from '@kbn/fleet-plugin/common'; import type { Artifact, PackagePolicyClient } from '@kbn/fleet-plugin/server'; import type { ExceptionListClient } from '@kbn/lists-plugin/server'; import type { ExceptionListItemSchema } from '@kbn/securitysolution-io-ts-list-types'; import { AppFeatureKey } from '@kbn/security-solution-features/keys'; +import { stringify } from '../../../utils/stringify'; +import { QueueProcessor } from '../../../utils/queue_processor'; import type { AppFeaturesService } from '../../../../lib/app_features_service/app_features_service'; import type { ExperimentalFeatures } from '../../../../../common'; import type { ManifestSchemaVersion } from '../../../../../common/endpoint/schema/common'; @@ -70,24 +72,6 @@ const iterateArtifactsBuildResult = ( } }; -const iterateAllListItems = async ( - pageSupplier: (page: number, perPage: number) => Promise>, - itemCallback: (items: T[]) => void -) => { - let paging = true; - let page = 1; - const perPage = 1000; - - while (paging) { - const { items, total } = await pageSupplier(page, perPage); - - itemCallback(items); - - paging = (page - 1) * perPage + items.length < total; - page++; - } -}; - export interface ManifestManagerContext { savedObjectsClient: SavedObjectsClientContract; artifactClient: EndpointArtifactClientInterface; @@ -407,7 +391,7 @@ export class ManifestManager { } /** - * Writes new artifact SOs. + * Writes new artifact to Fleet * * @param artifacts An InternalArtifactCompleteSchema array representing the artifacts. * @param newManifest A Manifest representing the new manifest @@ -418,7 +402,6 @@ export class ManifestManager { newManifest: Manifest ): Promise { const errors: Error[] = []; - const artifactsToCreate: InternalArtifactCompleteSchema[] = []; for (const artifact of artifacts) { @@ -433,28 +416,58 @@ export class ManifestManager { return errors; } + this.logger.debug(`Creating [${artifactsToCreate.length}] artifacts`); + const { artifacts: fleetArtifacts, errors: createErrors } = await this.artifactClient.bulkCreateArtifacts(artifactsToCreate); + this.logger.info(`Count of artifacts created: ${fleetArtifacts?.length ?? 0}`); + if (createErrors) { errors.push(...createErrors); } + const newArtifactsAddedToManifest: string[] = []; + const artifactsNotCreated: string[] = []; + if (fleetArtifacts) { - const fleetArtfactsByIdentifier: { [key: string]: InternalArtifactCompleteSchema } = {}; + const fleetArtifactsByIdentifier: { [key: string]: InternalArtifactCompleteSchema } = {}; + fleetArtifacts.forEach((fleetArtifact) => { - fleetArtfactsByIdentifier[getArtifactId(fleetArtifact)] = fleetArtifact; + fleetArtifactsByIdentifier[getArtifactId(fleetArtifact)] = fleetArtifact; }); + artifactsToCreate.forEach((artifact) => { const artifactId = getArtifactId(artifact); - const fleetArtifact = fleetArtfactsByIdentifier[artifactId]; + const fleetArtifact = fleetArtifactsByIdentifier[artifactId]; + + if (!fleetArtifact) { + artifactsNotCreated.push(artifactId); + + return; + } - if (!fleetArtifact) return; newManifest.replaceArtifact(fleetArtifact); - this.logger.debug(`New created artifact ${artifactId} added to the manifest`); + newArtifactsAddedToManifest.push(artifactId); }); } + if (artifactsNotCreated.length) { + this.logger.debug( + `A total of [${ + artifactsNotCreated.length + }] artifacts were not created. Prior version of the artifact will remain in manifest.\n${artifactsNotCreated.join( + '\n' + )}` + ); + } + + if (newArtifactsAddedToManifest.length !== 0) { + this.logger.debug( + `Newly created artifacts added to the manifest:\n${newArtifactsAddedToManifest.join('\n')}` + ); + } + return errors; } @@ -469,15 +482,24 @@ export class ManifestManager { if (isEmpty(artifactIds)) { return []; } + const errors = await this.artifactClient.bulkDeleteArtifacts(artifactIds); + if (!isEmpty(errors)) { return errors; } - for (const artifactId of artifactIds) { - this.logger.info(`Cleaned up artifact ${artifactId}`); + + this.logger.info(`Count of cleaned up artifacts: ${artifactIds.length}`); + + if (artifactIds.length !== 0) { + this.logger.debug(`Deleted artifacts from cleanup:\n${artifactIds.join('\n ')}`); } + return []; } catch (err) { + this.logger.error( + `Attempted to delete [${artifactIds.length}] outdated artifacts failed with: ${err.message}\n${err.stack}` + ); return [err]; } } @@ -508,22 +530,35 @@ export class ManifestManager { const fleetArtifacts = await this.listAllArtifacts(); const fleetArtifactsById = keyBy(fleetArtifacts, (artifact) => getArtifactId(artifact)); + const invalidArtifactIds: string[] = []; + // Ensure that all artifacts currently defined in the Manifest have a valid artifact in fleet, + // and remove any that does not have an actual artifact from the manifest for (const entry of manifestSo.attributes.artifacts) { const artifact = fleetArtifactsById[entry.artifactId]; if (!artifact) { - this.logger.error( - new InvalidInternalManifestError(`artifact id [${entry.artifactId}] not found!`, { - entry, - action: 'removed from internal ManifestManger tracking map', - }) - ); + invalidArtifactIds.push(entry.artifactId); } else { manifest.addEntry(artifact, entry.policyId); } } + if (invalidArtifactIds.length) { + this.logger.warn( + `Missing artifacts detected! Internal artifact manifest (SavedObject version [${ + manifestSo.version + }]) references [${ + invalidArtifactIds.length + }] artifact IDs that don't exist.\nFirst 10 below (run with logging set to 'debug' to see all):\n${invalidArtifactIds + .slice(0, 10) + .join('\n')}` + ); + this.logger.debug( + `Artifact ID references that are missing:\n${stringify(invalidArtifactIds)}` + ); + } + return manifest; } catch (error) { if (!error.output || error.output.statusCode !== 404) { @@ -569,15 +604,10 @@ export class ManifestManager { for (const result of results) { iterateArtifactsBuildResult(result, (artifact, policyId) => { - const artifactToAdd = baselineManifest.getArtifact(getArtifactId(artifact)) || artifact; - if (!internalArtifactCompleteSchema.is(artifactToAdd)) { - throw new EndpointError( - `Incomplete artifact detected: ${getArtifactId(artifactToAdd)}`, - artifactToAdd - ); - } - - manifest.addEntry(artifactToAdd, policyId); + manifest.addEntry( + baselineManifest.getArtifact(getArtifactId(artifact)) || artifact, + policyId + ); }); } @@ -592,81 +622,93 @@ export class ManifestManager { * @returns {Promise} Any errors encountered. */ public async tryDispatch(manifest: Manifest): Promise { - const allPackagePolicies: PackagePolicy[] = []; - await iterateAllListItems( - (page, perPage) => this.listEndpointPolicies(page, perPage), - (packagePoliciesBatch) => { - allPackagePolicies.push(...packagePoliciesBatch); - } - ); + const errors: Error[] = []; + const updatedPolicies: string[] = []; + const unChangedPolicies: string[] = []; + const manifestVersion = manifest.getSemanticVersion(); + const execId = Math.random().toString(32).substring(3, 8); + const policyUpdateBatchProcessor = new QueueProcessor({ + batchSize: this.packagerTaskPackagePolicyUpdateBatchSize, + logger: this.logger, + key: `tryDispatch.${execId}`, + batchHandler: async ({ data: currentBatch }) => { + const response = await this.packagePolicyService.bulkUpdate( + this.savedObjectsClient, + this.esClient, + currentBatch + ); - const packagePoliciesToUpdate: PackagePolicy[] = []; + if (!isEmpty(response.failedPolicies)) { + errors.push( + ...response.failedPolicies.map((failedPolicy) => { + if (failedPolicy.error instanceof Error) { + return failedPolicy.error; + } else { + return new Error(failedPolicy.error.message); + } + }) + ); + } - const errors: Error[] = []; - allPackagePolicies.forEach((packagePolicy) => { - const { id } = packagePolicy; - if (packagePolicy.inputs.length > 0 && packagePolicy.inputs[0].config !== undefined) { - const oldManifest = packagePolicy.inputs[0].config.artifact_manifest ?? { - value: {}, - }; - - const newManifestVersion = manifest.getSemanticVersion(); - if (semver.gt(newManifestVersion, oldManifest.value.manifest_version)) { - const serializedManifest = manifest.toPackagePolicyManifest(id); - - if (!manifestDispatchSchema.is(serializedManifest)) { - errors.push(new EndpointError(`Invalid manifest for policy ${id}`, serializedManifest)); - } else if (!manifestsEqual(serializedManifest, oldManifest.value)) { - packagePolicy.inputs[0].config.artifact_manifest = { value: serializedManifest }; - packagePoliciesToUpdate.push(packagePolicy); + if (response.updatedPolicies) { + updatedPolicies.push( + ...response.updatedPolicies.map((policy) => { + return `[${policy.id}][${policy.name}] updated with manifest version: [${manifestVersion}]`; + }) + ); + } + }, + }); + + for await (const policies of this.fetchAllPolicies()) { + for (const packagePolicy of policies) { + const { id, name } = packagePolicy; + + if (packagePolicy.inputs.length > 0 && packagePolicy.inputs[0].config !== undefined) { + const oldManifest = packagePolicy.inputs[0].config.artifact_manifest ?? { + value: {}, + }; + + const newManifestVersion = manifest.getSemanticVersion(); + + if (semver.gt(newManifestVersion, oldManifest.value.manifest_version)) { + const serializedManifest = manifest.toPackagePolicyManifest(id); + + if (!manifestDispatchSchema.is(serializedManifest)) { + errors.push( + new EndpointError(`Invalid manifest for policy ${id}`, serializedManifest) + ); + } else if (!manifestsEqual(serializedManifest, oldManifest.value)) { + packagePolicy.inputs[0].config.artifact_manifest = { value: serializedManifest }; + policyUpdateBatchProcessor.addToQueue(packagePolicy); + } else { + unChangedPolicies.push(`[${id}][${name}] No change in manifest content`); + } } else { - this.logger.debug( - `No change in manifest content for package policy: ${id}. Staying on old version` - ); + unChangedPolicies.push(`[${id}][${name}] No change in manifest version`); } } else { - this.logger.debug(`No change in manifest version for package policy: ${id}`); + errors.push( + new EndpointError(`Package Policy ${id} has no 'inputs[0].config'`, packagePolicy) + ); } - } else { - errors.push( - new EndpointError(`Package Policy ${id} has no 'inputs[0].config'`, packagePolicy) - ); } - }); + } - // Split updates in batches with batch size: packagerTaskPackagePolicyUpdateBatchSize - const updateBatches = chunk( - packagePoliciesToUpdate, - this.packagerTaskPackagePolicyUpdateBatchSize + await policyUpdateBatchProcessor.complete(); + + this.logger.info( + `Processed [${updatedPolicies.length + unChangedPolicies.length}] Policies: updated: [${ + updatedPolicies.length + }], un-changed: [${unChangedPolicies.length}]` ); - for (const currentBatch of updateBatches) { - const response = await this.packagePolicyService.bulkUpdate( - this.savedObjectsClient, - this.esClient, - currentBatch - ); + if (updatedPolicies.length) { + this.logger.debug(`Updated Policies:\n ${updatedPolicies.join('\n ')}`); + } - // Update errors - if (!isEmpty(response.failedPolicies)) { - errors.push( - ...response.failedPolicies.map((failedPolicy) => { - if (failedPolicy.error instanceof Error) { - return failedPolicy.error; - } else { - return new Error(failedPolicy.error.message); - } - }) - ); - } - // Log success updates - for (const updatedPolicy of response.updatedPolicies || []) { - this.logger.debug( - `Updated package policy ${ - updatedPolicy.id - } with manifest version ${manifest.getSemanticVersion()}` - ); - } + if (unChangedPolicies.length) { + this.logger.debug(`Un-changed Policies:\n ${unChangedPolicies.join('\n ')}`); } return errors; @@ -696,31 +738,24 @@ export class ManifestManager { this.logger.info(`Committed manifest ${manifest.getSemanticVersion()}`); } - private async listEndpointPolicies( - page: number, - perPage: number - ): Promise> { - return this.packagePolicyService.list(this.savedObjectsClient, { - page, - perPage, + private fetchAllPolicies(): AsyncIterable { + return this.packagePolicyService.fetchAllItems(this.savedObjectsClient, { kuery: 'ingest-package-policies.package.name:endpoint', }); } private async listEndpointPolicyIds(): Promise { const allPolicyIds: string[] = []; - await iterateAllListItems( - (page, perPage) => { - return this.packagePolicyService.listIds(this.savedObjectsClient, { - page, - perPage, - kuery: 'ingest-package-policies.package.name:endpoint', - }); - }, - (packagePolicyIdsBatch) => { - allPolicyIds.push(...packagePolicyIdsBatch); - } - ); + const idFetcher = this.packagePolicyService.fetchAllItemIds(this.savedObjectsClient, { + kuery: 'ingest-package-policies.package.name:endpoint', + }); + + for await (const itemIds of idFetcher) { + allPolicyIds.push(...itemIds); + } + + this.logger.debug(`Retrieved [${allPolicyIds.length}] endpoint integration policy IDs`); + return allPolicyIds; } @@ -733,70 +768,68 @@ export class ManifestManager { * @returns Artifact[] */ private async listAllArtifacts(): Promise { - const fleetArtifacts = []; - const perPage = 100; - let page = 1; + const fleetArtifacts: Artifact[] = []; + let total = 0; - let fleetArtifactsResponse = await this.artifactClient.listArtifacts({ - perPage, - page, - }); - fleetArtifacts.push(...fleetArtifactsResponse.items); - - while ( - fleetArtifactsResponse.total > fleetArtifacts.length && - !isEmpty(fleetArtifactsResponse.items) - ) { - page += 1; - fleetArtifactsResponse = await this.artifactClient.listArtifacts({ - perPage, - page, - }); - fleetArtifacts.push(...fleetArtifactsResponse.items); + for await (const artifacts of this.artifactClient.fetchAll()) { + fleetArtifacts.push(...artifacts); + total += artifacts.length; } + + this.logger.info(`Count of current stored artifacts: ${total}`); + return fleetArtifacts; } /** - * Cleanup .fleet-artifacts index if there are some orphan artifacts + * Pulls in all artifacts from Fleet and checks to ensure they are all being referenced + * by the Manifest. If any are found to not be in the current Manifest (orphan), they + * are cleaned up (deleted) */ public async cleanup(manifest: Manifest) { - try { - const fleetArtifacts = await this.listAllArtifacts(); - if (isEmpty(fleetArtifacts)) { - return; - } - - const badArtifacts = []; - const badArtifactIds = []; + const badArtifactIds: string[] = []; + const errors: string[] = []; + const artifactDeletionProcess = new QueueProcessor({ + batchSize: this.packagerTaskPackagePolicyUpdateBatchSize, + logger: this.logger, + key: 'cleanup', + batchHandler: async ({ batch, data }) => { + const deleteErrors = await this.artifactClient.bulkDeleteArtifacts(data); + + badArtifactIds.push(...data); + + if (deleteErrors.length) { + errors.push( + `Delete batch #[${batch}] with [${data.length}] items:\n${stringify(deleteErrors)}` + ); + } + }, + }); - const manifestArtifactsIds = manifest - .getAllArtifacts() - .map((artifact) => getArtifactId(artifact)); + const validArtifactIds = manifest.getAllArtifacts().map((artifact) => getArtifactId(artifact)); - for (const fleetArtifact of fleetArtifacts) { - const artifactId = getArtifactId(fleetArtifact); - const isArtifactInManifest = manifestArtifactsIds.includes(artifactId); + for await (const artifacts of this.artifactClient.fetchAll()) { + for (const artifact of artifacts) { + const artifactId = getArtifactId(artifact); + const isArtifactInManifest = validArtifactIds.includes(artifactId); if (!isArtifactInManifest) { - badArtifacts.push(fleetArtifact); - badArtifactIds.push(artifactId); + artifactDeletionProcess.addToQueue(artifactId); } } + } - if (isEmpty(badArtifacts)) { - return; - } + await artifactDeletionProcess.complete(); + if (errors.length > 0) { this.logger.error( - new EndpointError(`Cleaning up ${badArtifacts.length} orphan artifacts`, badArtifacts) + `The following errors were encountered while attempting to delete [${ + badArtifactIds.length + }] orphaned artifacts:\n${stringify(errors)}` ); - - await this.artifactClient.bulkDeleteArtifacts(badArtifactIds); - - this.logger.info(`All orphan artifacts has been removed successfully`); - } catch (error) { - this.logger.error(new EndpointError('There was an error cleaning orphan artifacts', error)); + } else if (badArtifactIds.length > 0) { + this.logger.info(`Count of orphan artifacts cleaned up: ${badArtifactIds.length}`); + this.logger.debug(`Orphan artifacts deleted from Fleet:\n${stringify(badArtifactIds)}`); } } } diff --git a/x-pack/plugins/security_solution/server/endpoint/services/artifacts/mocks.ts b/x-pack/plugins/security_solution/server/endpoint/services/artifacts/mocks.ts index 2acfa7b7b4794..1a1dd701e9803 100644 --- a/x-pack/plugins/security_solution/server/endpoint/services/artifacts/mocks.ts +++ b/x-pack/plugins/security_solution/server/endpoint/services/artifacts/mocks.ts @@ -65,6 +65,7 @@ export const createEndpointArtifactClientMock = ( bulkDeleteArtifacts: jest.fn(async (...args) => endpointArtifactClientMocked.bulkDeleteArtifacts(...args) ), + fetchAll: jest.fn((...args) => endpointArtifactClientMocked.fetchAll(...args)), _esClient: esClient, }; }; diff --git a/x-pack/plugins/security_solution/server/endpoint/utils/queue_processor.ts b/x-pack/plugins/security_solution/server/endpoint/utils/queue_processor.ts new file mode 100644 index 0000000000000..f4f3e4ac76852 --- /dev/null +++ b/x-pack/plugins/security_solution/server/endpoint/utils/queue_processor.ts @@ -0,0 +1,152 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { Logger } from '@kbn/core/server'; + +export interface QueueProcessorOptions { + batchHandler: (batch: { batch: number; data: T[] }) => Promise; + batchSize?: number; + logger?: Logger; + /** + * Used when `logger` is passed. It will be used to define the logging messages context path. + * Defaults to the name of the callback provided in `batchHandler` + */ + key?: string; +} + +/** + * Process an un-bound amount of items in batches. Each batch is process once the queued reach the + * `batchSize`, thus processing is gradually executed ensuring that data is not held in memory + * for too long. Once all items are added to the Queue, calling + * `.complete()` will ensure they are all processed. + * + * @example + * const processor = new QueueProcessor<{ id: string }>({ + * batchHandler: ({ data, batch }) => { + * // data === array of `{ id: string }` + * // batch === batch number + * } + * }); + * + * const myIdList = [ .... ]; // Array with 50 string + * + * for (const id of myIdList) { + * batchHandler.addToQueue({ id: id}); + * } + * + * await processor.complete(); + */ +export class QueueProcessor { + private readonly batchSize: number; + private readonly batchHandler: QueueProcessorOptions['batchHandler']; + private readonly logger: Logger | undefined = undefined; + + private queue: T[] = []; + private processingPromise: Promise | undefined = undefined; + private batchCount = 0; + private itemsProcessedCount = 0; + + constructor({ + batchHandler, + batchSize = 10, + logger, + key = 'QueueProcessor', + }: QueueProcessorOptions) { + if (batchSize < 1 || !Number.isFinite(batchSize)) { + throw new Error(`batchSize must be a number greater than zero`); + } + + this.batchSize = batchSize; + this.batchHandler = batchHandler; + this.logger = logger ? logger.get(key) : undefined; + } + + protected log( + message: string, + output: keyof Pick = 'info' + ): void { + if (this.logger) { + this.logger[output](message); + } + } + + protected async processQueue(all: boolean = false) { + if (this.processingPromise || this.queue.length === 0) { + return; + } + + const runThroughQueue = async () => { + let hasMoreData = true; + + while (hasMoreData) { + try { + if (all || this.queue.length >= this.batchSize) { + const batchPage = this.queue.splice(0, this.batchSize); + const batchPageSize = batchPage.length; + const remainingItemsSize = this.queue.length; + + hasMoreData = (all && remainingItemsSize > 0) || remainingItemsSize >= this.batchSize; + this.itemsProcessedCount += batchPageSize; + this.batchCount++; + + try { + this.log( + `Processing batch [${this.batchCount}] with [${batchPageSize}] items. Items remaining in queue: [${remainingItemsSize}]`, + 'debug' + ); + await this.batchHandler({ batch: this.batchCount, data: batchPage }); + } catch (err) { + this.log( + `batchHandler threw error (below). Will continue on to next batch page:\n${err}`, + 'debug' + ); + // ignore errors in the batch page processing and keep going to process others. + // callback should have handled errors that its process might throw + } + } else { + hasMoreData = false; + } + } catch (err) { + hasMoreData = false; + throw err; + } + } + }; + + this.processingPromise = runThroughQueue().finally(() => { + this.processingPromise = undefined; + }); + + return this.processingPromise; + } + + /** + * Adds an update to the queue + */ + public addToQueue(...data: T[]) { + this.queue.push(...data); + this.processQueue(); + } + + /** + * Flushes the queue and awaits processing of all remaining updates. + * + * **IMPORTANT**: Always make sure `complete()` is called to ensure no items are left in the queue + */ + public async complete(): Promise { + if (this.processingPromise) { + await this.processingPromise.finally(() => {}); + } + + await this.processQueue(true); + + this.log( + `Processed [${this.batchCount}] batches and a total of [${this.itemsProcessedCount}] items`, + 'debug' + ); + } +} diff --git a/x-pack/plugins/security_solution/server/plugin.ts b/x-pack/plugins/security_solution/server/plugin.ts index aaa104533e080..0bc81fc0b17fd 100644 --- a/x-pack/plugins/security_solution/server/plugin.ts +++ b/x-pack/plugins/security_solution/server/plugin.ts @@ -532,7 +532,7 @@ export class Plugin implements ISecuritySolutionPlugin { artifactClient, exceptionListClient, packagePolicyService: plugins.fleet.packagePolicyService, - logger, + logger: this.pluginContext.logger.get('ManifestManager'), experimentalFeatures: config.experimentalFeatures, packagerTaskPackagePolicyUpdateBatchSize: config.packagerTaskPackagePolicyUpdateBatchSize, esClient: core.elasticsearch.client.asInternalUser,