From 6e06452aac11ed22efa923284fbb9ad4da1f7ce1 Mon Sep 17 00:00:00 2001 From: Mark Hopkin Date: Thu, 9 Feb 2023 17:16:35 +0000 Subject: [PATCH] [Fleet] Run agent policy schema in batches during fleet setup + add `xpack.fleet.setup.agentPolicySchemaUpgradeBatchSize` config (#150688) ## Summary Closes #150538 As part of the Fleet plugin setup, we check to see if any agent policies have an out of date `schema_version` and upgrade them. We encountered an error when this upgrade happens on a large number of agent policies as we attempted the upgrade in one large batch. This pull request performs the schema upgrade in batches of 100 by default and also adds the config value `xpack.fleet.setup.agentPolicySchemaUpgradeBatchSize` to make the batch size configurable. I have also added more debug logging to show progress, and reduced the response payload of one of our requests which was very large. ### Dev testing To test this you need an environemnt with lots of agent policies (> 2k) where `schema_version` is not set. To create an environment with a large number of agent policies I have added a new param to the agent creation script, I ran: ``` cd x-pack/plugins/fleet node scripts/create_agents --count 20 --kibana http://127.0.0.1:5601/mark --status online --delete --batches 3000 --concurrentBatches 100 ``` To generate 3000 agent policies each with 20 agents in. I then modified the agent policies so that they require an upgrade, as `system_indices_superuser` run: ``` POST /.kibana/_update_by_query { "query": { "bool": { "filter": [ { "term": { "type": "ingest-agent-policies" } } ] } }, "script": { "source": "ctx._source['ingest-agent-policies'].remove('schema_version')", "lang": "painless" } } ``` restarting kibana will run the setup and in batches. --- x-pack/plugins/fleet/common/types/index.ts | 3 ++ .../scripts/create_agents/create_agents.ts | 53 +++++++++++++------ x-pack/plugins/fleet/server/config.ts | 5 ++ .../fleet/server/services/agent_policy.ts | 10 +++- .../upgrade_agent_policy_schema_version.ts | 22 ++++++-- 5 files changed, 72 insertions(+), 21 deletions(-) diff --git a/x-pack/plugins/fleet/common/types/index.ts b/x-pack/plugins/fleet/common/types/index.ts index f819705b02230..202dd0c81ead5 100644 --- a/x-pack/plugins/fleet/common/types/index.ts +++ b/x-pack/plugins/fleet/common/types/index.ts @@ -36,6 +36,9 @@ export interface FleetConfigType { packageVerification?: { gpgKeyPath?: string; }; + setup?: { + agentPolicySchemaUpgradeBatchSize?: number; + }; developer?: { disableRegistryVersionCheck?: boolean; bundledPackageLocation?: string; diff --git a/x-pack/plugins/fleet/scripts/create_agents/create_agents.ts b/x-pack/plugins/fleet/scripts/create_agents/create_agents.ts index 4b47b56d9a48d..683d45a03757c 100644 --- a/x-pack/plugins/fleet/scripts/create_agents/create_agents.ts +++ b/x-pack/plugins/fleet/scripts/create_agents/create_agents.ts @@ -25,6 +25,8 @@ const printUsage = () => [--kibana]: full url of kibana instance to create agents and policy in e.g http://localhost:5601/mybase, defaults to http://localhost:5601 [--username]: username for kibana, defaults to elastic [--password]: password for kibana, defaults to changeme + [--batches]: run the script in batches, defaults to 1 e.g if count is 50 and batches is 10, 500 agents will be created and 10 agent policies + [--concurrentBatches]: how many batches to run concurrently, defaults to 10 `); const DEFAULT_KIBANA_URL = 'http://localhost:5601'; @@ -49,6 +51,8 @@ const { agentVersion: agentVersionArg, username: kbnUsername = DEFAULT_KIBANA_USERNAME, password: kbnPassword = DEFAULT_KIBANA_PASSWORD, + batches: batchesArg, + concurrentBatches: concurrentBatchesArg = 10, // ignore yargs positional args, we only care about named args _, $0, @@ -59,6 +63,8 @@ const statusesArg = (statusArg as string).split(',') as AgentStatus[]; const inactivityTimeout = inactivityTimeoutArg ? Number(inactivityTimeoutArg).valueOf() : DEFAULT_UNENROLL_TIMEOUT; +const batches = inactivityTimeoutArg ? Number(batchesArg).valueOf() : 1; +const concurrentBatches = concurrentBatchesArg ? Number(concurrentBatchesArg).valueOf() : 10; const count = countArg ? Number(countArg).valueOf() : DEFAULT_AGENT_COUNT; const kbnAuth = 'Basic ' + Buffer.from(kbnUsername + ':' + kbnPassword).toString('base64'); @@ -258,7 +264,7 @@ async function createAgentPolicy(id: string) { name: id, namespace: 'default', description: '', - monitoring_enabled: ['logs'], + monitoring_enabled: ['logs', 'metrics'], inactivity_timeout: inactivityTimeout, }), headers: { @@ -314,23 +320,40 @@ export async function run() { logger.info(`Deleted ${deleteRes.deleted} agents, took ${deleteRes.took}ms`); } - logger.info('Creating agent policy'); - - const agentPolicyId = 'script-create-agent-' + uuidv4(); - const agentPolicy = await createAgentPolicy(agentPolicyId); - logger.info(`Created agent policy ${agentPolicy.item.id}`); - logger.info('Creating fleet superuser'); const { role, user } = await createSuperUser(); logger.info(`Role "${ES_SUPERUSER}" ${role.role.created ? 'created' : 'already exists'}`); logger.info(`User "${ES_SUPERUSER}" ${user.created ? 'created' : 'already exists'}`); - logger.info('Creating agent documents'); - const statusMap = statusesArg.reduce((acc, status) => ({ ...acc, [status]: count }), {}); - logStatusMap(statusMap); - const agents = createAgentsWithStatuses(statusMap, agentPolicyId, agentVersion); - const createRes = await createAgentDocsBulk(agents); - logger.info( - `Created ${createRes.items.length} agent docs, took ${createRes.took}, errors: ${createRes.errors}` - ); + let batchesRemaining = batches; + let totalAgents = 0; + while (batchesRemaining > 0) { + const currentBatchSize = Math.min(concurrentBatches, batchesRemaining); + if (batches > 1) { + logger.info(`Running ${currentBatchSize} batches. ${batchesRemaining} batches remaining`); + } + + await Promise.all( + Array(currentBatchSize) + .fill(0) + .map(async (__, i) => { + const agentPolicyId = 'script-create-agent-' + uuidv4(); + const agentPolicy = await createAgentPolicy(agentPolicyId); + logger.info(`Created agent policy ${agentPolicy.item.id}`); + + const statusMap = statusesArg.reduce((acc, status) => ({ ...acc, [status]: count }), {}); + logStatusMap(statusMap); + const agents = createAgentsWithStatuses(statusMap, agentPolicyId, agentVersion); + const createRes = await createAgentDocsBulk(agents); + logger.info( + `Batch complete, created ${createRes.items.length} agent docs, took ${createRes.took}, errors: ${createRes.errors}` + ); + totalAgents += createRes.items.length; + }) + ); + + batchesRemaining -= currentBatchSize; + } + + logger.info(`All batches complete. Created ${totalAgents} agents in total. Goodbye!`); } diff --git a/x-pack/plugins/fleet/server/config.ts b/x-pack/plugins/fleet/server/config.ts index 3c982ef9b516b..6cbbbd3a94bfe 100644 --- a/x-pack/plugins/fleet/server/config.ts +++ b/x-pack/plugins/fleet/server/config.ts @@ -120,6 +120,11 @@ export const config: PluginConfigDescriptor = { fleetServerHosts: PreconfiguredFleetServerHostsSchema, proxies: PreconfiguredFleetProxiesSchema, agentIdVerificationEnabled: schema.boolean({ defaultValue: true }), + setup: schema.maybe( + schema.object({ + agentPolicySchemaUpgradeBatchSize: schema.maybe(schema.number()), + }) + ), developer: schema.object({ disableRegistryVersionCheck: schema.boolean({ defaultValue: false }), allowAgentUpgradeSourceUri: schema.boolean({ defaultValue: false }), diff --git a/x-pack/plugins/fleet/server/services/agent_policy.ts b/x-pack/plugins/fleet/server/services/agent_policy.ts index d845d466ea381..5912e022370de 100644 --- a/x-pack/plugins/fleet/server/services/agent_policy.ts +++ b/x-pack/plugins/fleet/server/services/agent_policy.ts @@ -315,8 +315,14 @@ class AgentPolicyService { soClient: SavedObjectsClientContract, options: ListWithKuery & { withPackagePolicies?: boolean; + fields?: string[]; } - ): Promise<{ items: AgentPolicy[]; total: number; page: number; perPage: number }> { + ): Promise<{ + items: AgentPolicy[]; + total: number; + page: number; + perPage: number; + }> { const { page = 1, perPage = 20, @@ -324,6 +330,7 @@ class AgentPolicyService { sortOrder = 'desc', kuery, withPackagePolicies = false, + fields, } = options; const baseFindParams = { @@ -332,6 +339,7 @@ class AgentPolicyService { sortOrder, page, perPage, + ...(fields ? { fields } : {}), }; const filter = kuery ? normalizeKuery(SAVED_OBJECT_TYPE, kuery) : undefined; let agentPoliciesSO; diff --git a/x-pack/plugins/fleet/server/services/setup/upgrade_agent_policy_schema_version.ts b/x-pack/plugins/fleet/server/services/setup/upgrade_agent_policy_schema_version.ts index a02c81c2cfab1..0dfe1227af050 100644 --- a/x-pack/plugins/fleet/server/services/setup/upgrade_agent_policy_schema_version.ts +++ b/x-pack/plugins/fleet/server/services/setup/upgrade_agent_policy_schema_version.ts @@ -10,14 +10,16 @@ import type { SavedObjectsClientContract } from '@kbn/core/server'; import { AGENT_POLICY_SAVED_OBJECT_TYPE, FLEET_AGENT_POLICIES_SCHEMA_VERSION, - SO_SEARCH_LIMIT, } from '../../constants'; import { agentPolicyService } from '../agent_policy'; +import { appContextService } from '../app_context'; -function getOutdatedAgentPoliciesBatch(soClient: SavedObjectsClientContract) { +const DEFAULT_BATCH_SIZE = 100; +function getOutdatedAgentPoliciesBatch(soClient: SavedObjectsClientContract, batchSize: number) { return agentPolicyService.list(soClient, { - perPage: SO_SEARCH_LIMIT, + perPage: batchSize, kuery: `NOT ${AGENT_POLICY_SAVED_OBJECT_TYPE}.schema_version:${FLEET_AGENT_POLICIES_SCHEMA_VERSION}`, + fields: ['id'], // we only need the ID of the agent policy }); } @@ -26,13 +28,23 @@ function getOutdatedAgentPoliciesBatch(soClient: SavedObjectsClientContract) { // deploy outdated policies to .fleet-policies index // bump oudated SOs schema_version export async function upgradeAgentPolicySchemaVersion(soClient: SavedObjectsClientContract) { - let outdatedAgentPolicies = await getOutdatedAgentPoliciesBatch(soClient); + const config = appContextService.getConfig(); + const logger = appContextService.getLogger(); + const batchSize = config?.setup?.agentPolicySchemaUpgradeBatchSize ?? DEFAULT_BATCH_SIZE; + let outdatedAgentPolicies = await getOutdatedAgentPoliciesBatch(soClient, batchSize); + logger.debug(`Found ${outdatedAgentPolicies.total} outdated agent policies`); while (outdatedAgentPolicies.total > 0) { + const start = Date.now(); const outdatedAgentPolicyIds = outdatedAgentPolicies.items.map( (outdatedAgentPolicy) => outdatedAgentPolicy.id ); await agentPolicyService.deployPolicies(soClient, outdatedAgentPolicyIds); - outdatedAgentPolicies = await getOutdatedAgentPoliciesBatch(soClient); + outdatedAgentPolicies = await getOutdatedAgentPoliciesBatch(soClient, batchSize); + logger.debug( + `Upgraded ${outdatedAgentPolicyIds.length} agent policies in ${Date.now() - start}ms, ${ + outdatedAgentPolicies.total + } remaining` + ); } }