Skip to content

Commit

Permalink
[8.7] [Fleet] Run agent policy schema in batches during fleet setup +…
Browse files Browse the repository at this point in the history
… add `xpack.fleet.setup.agentPolicySchemaUpgradeBatchSize` config (#150688) (#150750)

# Backport

This will backport the following commits from `main` to `8.7`:
- [[Fleet] Run agent policy schema in batches during fleet setup + add
`xpack.fleet.setup.agentPolicySchemaUpgradeBatchSize` config
(#150688)](#150688)

<!--- Backport version: 8.9.7 -->

### Questions ?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT [{"author":{"name":"Mark
Hopkin","email":"mark.hopkin@elastic.co"},"sourceCommit":{"committedDate":"2023-02-09T17:16:35Z","message":"[Fleet]
Run agent policy schema in batches during fleet setup + add
`xpack.fleet.setup.agentPolicySchemaUpgradeBatchSize` config
(#150688)\n\n## Summary\r\n\r\nCloses #150538 \r\n\r\nAs part of the
Fleet plugin setup, we check to see if any agent policies\r\nhave an out
of date `schema_version` and upgrade them. We encountered an\r\nerror
when this upgrade happens on a large number of agent policies as\r\nwe
attempted the upgrade in one large batch.\r\n\r\nThis pull request
performs the schema upgrade in batches of 100 by\r\ndefault and also
adds the config
value\r\n`xpack.fleet.setup.agentPolicySchemaUpgradeBatchSize` to make
the batch\r\nsize configurable.\r\n\r\nI have also added more debug
logging to show progress, and reduced the\r\nresponse payload of one of
our requests which was very large.\r\n\r\n### Dev testing\r\n\r\nTo test
this you need an environemnt with lots of agent policies (> 2k)\r\nwhere
`schema_version`\r\nis not set. To create an environment with a large
number of agent\r\npolicies I have added a new param to the agent
creation script, I ran:\r\n\r\n```\r\ncd x-pack/plugins/fleet\r\nnode
scripts/create_agents --count 20 --kibana http://127.0.0.1:5601/mark
--status online --delete --batches 3000 --concurrentBatches
100\r\n```\r\n\r\nTo generate 3000 agent policies each with 20 agents
in.\r\n\r\nI then modified the agent policies so that they require an
upgrade, as\r\n`system_indices_superuser` run:\r\n\r\n```\r\nPOST
/.kibana/_update_by_query\r\n{\r\n \"query\": {\r\n \"bool\": {\r\n
\"filter\": [\r\n {\r\n \"term\": {\r\n \"type\":
\"ingest-agent-policies\"\r\n }\r\n }\r\n ]\r\n }\r\n },\r\n \"script\":
{\r\n \"source\":
\"ctx._source['ingest-agent-policies'].remove('schema_version')\",\r\n
\"lang\": \"painless\"\r\n }\r\n}\r\n```\r\n\r\nrestarting kibana will
run the setup and in
batches.","sha":"6e06452aac11ed22efa923284fbb9ad4da1f7ce1","branchLabelMapping":{"^v8.8.0$":"main","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:enhancement","Team:Fleet","backport:all-open","v8.8.0"],"number":150688,"url":"https://github.com/elastic/kibana/pull/150688","mergeCommit":{"message":"[Fleet]
Run agent policy schema in batches during fleet setup + add
`xpack.fleet.setup.agentPolicySchemaUpgradeBatchSize` config
(#150688)\n\n## Summary\r\n\r\nCloses #150538 \r\n\r\nAs part of the
Fleet plugin setup, we check to see if any agent policies\r\nhave an out
of date `schema_version` and upgrade them. We encountered an\r\nerror
when this upgrade happens on a large number of agent policies as\r\nwe
attempted the upgrade in one large batch.\r\n\r\nThis pull request
performs the schema upgrade in batches of 100 by\r\ndefault and also
adds the config
value\r\n`xpack.fleet.setup.agentPolicySchemaUpgradeBatchSize` to make
the batch\r\nsize configurable.\r\n\r\nI have also added more debug
logging to show progress, and reduced the\r\nresponse payload of one of
our requests which was very large.\r\n\r\n### Dev testing\r\n\r\nTo test
this you need an environemnt with lots of agent policies (> 2k)\r\nwhere
`schema_version`\r\nis not set. To create an environment with a large
number of agent\r\npolicies I have added a new param to the agent
creation script, I ran:\r\n\r\n```\r\ncd x-pack/plugins/fleet\r\nnode
scripts/create_agents --count 20 --kibana http://127.0.0.1:5601/mark
--status online --delete --batches 3000 --concurrentBatches
100\r\n```\r\n\r\nTo generate 3000 agent policies each with 20 agents
in.\r\n\r\nI then modified the agent policies so that they require an
upgrade, as\r\n`system_indices_superuser` run:\r\n\r\n```\r\nPOST
/.kibana/_update_by_query\r\n{\r\n \"query\": {\r\n \"bool\": {\r\n
\"filter\": [\r\n {\r\n \"term\": {\r\n \"type\":
\"ingest-agent-policies\"\r\n }\r\n }\r\n ]\r\n }\r\n },\r\n \"script\":
{\r\n \"source\":
\"ctx._source['ingest-agent-policies'].remove('schema_version')\",\r\n
\"lang\": \"painless\"\r\n }\r\n}\r\n```\r\n\r\nrestarting kibana will
run the setup and in
batches.","sha":"6e06452aac11ed22efa923284fbb9ad4da1f7ce1"}},"sourceBranch":"main","suggestedTargetBranches":[],"targetPullRequestStates":[{"branch":"main","label":"v8.8.0","labelRegex":"^v8.8.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/150688","number":150688,"mergeCommit":{"message":"[Fleet]
Run agent policy schema in batches during fleet setup + add
`xpack.fleet.setup.agentPolicySchemaUpgradeBatchSize` config
(#150688)\n\n## Summary\r\n\r\nCloses #150538 \r\n\r\nAs part of the
Fleet plugin setup, we check to see if any agent policies\r\nhave an out
of date `schema_version` and upgrade them. We encountered an\r\nerror
when this upgrade happens on a large number of agent policies as\r\nwe
attempted the upgrade in one large batch.\r\n\r\nThis pull request
performs the schema upgrade in batches of 100 by\r\ndefault and also
adds the config
value\r\n`xpack.fleet.setup.agentPolicySchemaUpgradeBatchSize` to make
the batch\r\nsize configurable.\r\n\r\nI have also added more debug
logging to show progress, and reduced the\r\nresponse payload of one of
our requests which was very large.\r\n\r\n### Dev testing\r\n\r\nTo test
this you need an environemnt with lots of agent policies (> 2k)\r\nwhere
`schema_version`\r\nis not set. To create an environment with a large
number of agent\r\npolicies I have added a new param to the agent
creation script, I ran:\r\n\r\n```\r\ncd x-pack/plugins/fleet\r\nnode
scripts/create_agents --count 20 --kibana http://127.0.0.1:5601/mark
--status online --delete --batches 3000 --concurrentBatches
100\r\n```\r\n\r\nTo generate 3000 agent policies each with 20 agents
in.\r\n\r\nI then modified the agent policies so that they require an
upgrade, as\r\n`system_indices_superuser` run:\r\n\r\n```\r\nPOST
/.kibana/_update_by_query\r\n{\r\n \"query\": {\r\n \"bool\": {\r\n
\"filter\": [\r\n {\r\n \"term\": {\r\n \"type\":
\"ingest-agent-policies\"\r\n }\r\n }\r\n ]\r\n }\r\n },\r\n \"script\":
{\r\n \"source\":
\"ctx._source['ingest-agent-policies'].remove('schema_version')\",\r\n
\"lang\": \"painless\"\r\n }\r\n}\r\n```\r\n\r\nrestarting kibana will
run the setup and in
batches.","sha":"6e06452aac11ed22efa923284fbb9ad4da1f7ce1"}}]}]
BACKPORT-->

Co-authored-by: Mark Hopkin <mark.hopkin@elastic.co>
  • Loading branch information
kibanamachine and hop-dev authored Feb 9, 2023
1 parent 5cc66cb commit 2713d13
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 21 deletions.
3 changes: 3 additions & 0 deletions x-pack/plugins/fleet/common/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ export interface FleetConfigType {
packageVerification?: {
gpgKeyPath?: string;
};
setup?: {
agentPolicySchemaUpgradeBatchSize?: number;
};
developer?: {
disableRegistryVersionCheck?: boolean;
bundledPackageLocation?: string;
Expand Down
53 changes: 38 additions & 15 deletions x-pack/plugins/fleet/scripts/create_agents/create_agents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const printUsage = () =>
[--kibana]: full url of kibana instance to create agents and policy in e.g http://localhost:5601/mybase, defaults to http://localhost:5601
[--username]: username for kibana, defaults to elastic
[--password]: password for kibana, defaults to changeme
[--batches]: run the script in batches, defaults to 1 e.g if count is 50 and batches is 10, 500 agents will be created and 10 agent policies
[--concurrentBatches]: how many batches to run concurrently, defaults to 10
`);

const DEFAULT_KIBANA_URL = 'http://localhost:5601';
Expand All @@ -49,6 +51,8 @@ const {
agentVersion: agentVersionArg,
username: kbnUsername = DEFAULT_KIBANA_USERNAME,
password: kbnPassword = DEFAULT_KIBANA_PASSWORD,
batches: batchesArg,
concurrentBatches: concurrentBatchesArg = 10,
// ignore yargs positional args, we only care about named args
_,
$0,
Expand All @@ -59,6 +63,8 @@ const statusesArg = (statusArg as string).split(',') as AgentStatus[];
const inactivityTimeout = inactivityTimeoutArg
? Number(inactivityTimeoutArg).valueOf()
: DEFAULT_UNENROLL_TIMEOUT;
const batches = inactivityTimeoutArg ? Number(batchesArg).valueOf() : 1;
const concurrentBatches = concurrentBatchesArg ? Number(concurrentBatchesArg).valueOf() : 10;
const count = countArg ? Number(countArg).valueOf() : DEFAULT_AGENT_COUNT;
const kbnAuth = 'Basic ' + Buffer.from(kbnUsername + ':' + kbnPassword).toString('base64');

Expand Down Expand Up @@ -258,7 +264,7 @@ async function createAgentPolicy(id: string) {
name: id,
namespace: 'default',
description: '',
monitoring_enabled: ['logs'],
monitoring_enabled: ['logs', 'metrics'],
inactivity_timeout: inactivityTimeout,
}),
headers: {
Expand Down Expand Up @@ -314,23 +320,40 @@ export async function run() {
logger.info(`Deleted ${deleteRes.deleted} agents, took ${deleteRes.took}ms`);
}

logger.info('Creating agent policy');

const agentPolicyId = 'script-create-agent-' + uuidv4();
const agentPolicy = await createAgentPolicy(agentPolicyId);
logger.info(`Created agent policy ${agentPolicy.item.id}`);

logger.info('Creating fleet superuser');
const { role, user } = await createSuperUser();
logger.info(`Role "${ES_SUPERUSER}" ${role.role.created ? 'created' : 'already exists'}`);
logger.info(`User "${ES_SUPERUSER}" ${user.created ? 'created' : 'already exists'}`);

logger.info('Creating agent documents');
const statusMap = statusesArg.reduce((acc, status) => ({ ...acc, [status]: count }), {});
logStatusMap(statusMap);
const agents = createAgentsWithStatuses(statusMap, agentPolicyId, agentVersion);
const createRes = await createAgentDocsBulk(agents);
logger.info(
`Created ${createRes.items.length} agent docs, took ${createRes.took}, errors: ${createRes.errors}`
);
let batchesRemaining = batches;
let totalAgents = 0;
while (batchesRemaining > 0) {
const currentBatchSize = Math.min(concurrentBatches, batchesRemaining);
if (batches > 1) {
logger.info(`Running ${currentBatchSize} batches. ${batchesRemaining} batches remaining`);
}

await Promise.all(
Array(currentBatchSize)
.fill(0)
.map(async (__, i) => {
const agentPolicyId = 'script-create-agent-' + uuidv4();
const agentPolicy = await createAgentPolicy(agentPolicyId);
logger.info(`Created agent policy ${agentPolicy.item.id}`);

const statusMap = statusesArg.reduce((acc, status) => ({ ...acc, [status]: count }), {});
logStatusMap(statusMap);
const agents = createAgentsWithStatuses(statusMap, agentPolicyId, agentVersion);
const createRes = await createAgentDocsBulk(agents);
logger.info(
`Batch complete, created ${createRes.items.length} agent docs, took ${createRes.took}, errors: ${createRes.errors}`
);
totalAgents += createRes.items.length;
})
);

batchesRemaining -= currentBatchSize;
}

logger.info(`All batches complete. Created ${totalAgents} agents in total. Goodbye!`);
}
5 changes: 5 additions & 0 deletions x-pack/plugins/fleet/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ export const config: PluginConfigDescriptor = {
fleetServerHosts: PreconfiguredFleetServerHostsSchema,
proxies: PreconfiguredFleetProxiesSchema,
agentIdVerificationEnabled: schema.boolean({ defaultValue: true }),
setup: schema.maybe(
schema.object({
agentPolicySchemaUpgradeBatchSize: schema.maybe(schema.number()),
})
),
developer: schema.object({
disableRegistryVersionCheck: schema.boolean({ defaultValue: false }),
allowAgentUpgradeSourceUri: schema.boolean({ defaultValue: false }),
Expand Down
10 changes: 9 additions & 1 deletion x-pack/plugins/fleet/server/services/agent_policy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -315,15 +315,22 @@ class AgentPolicyService {
soClient: SavedObjectsClientContract,
options: ListWithKuery & {
withPackagePolicies?: boolean;
fields?: string[];
}
): Promise<{ items: AgentPolicy[]; total: number; page: number; perPage: number }> {
): Promise<{
items: AgentPolicy[];
total: number;
page: number;
perPage: number;
}> {
const {
page = 1,
perPage = 20,
sortField = 'updated_at',
sortOrder = 'desc',
kuery,
withPackagePolicies = false,
fields,
} = options;

const baseFindParams = {
Expand All @@ -332,6 +339,7 @@ class AgentPolicyService {
sortOrder,
page,
perPage,
...(fields ? { fields } : {}),
};
const filter = kuery ? normalizeKuery(SAVED_OBJECT_TYPE, kuery) : undefined;
let agentPoliciesSO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@ import type { SavedObjectsClientContract } from '@kbn/core/server';
import {
AGENT_POLICY_SAVED_OBJECT_TYPE,
FLEET_AGENT_POLICIES_SCHEMA_VERSION,
SO_SEARCH_LIMIT,
} from '../../constants';
import { agentPolicyService } from '../agent_policy';
import { appContextService } from '../app_context';

function getOutdatedAgentPoliciesBatch(soClient: SavedObjectsClientContract) {
const DEFAULT_BATCH_SIZE = 100;
function getOutdatedAgentPoliciesBatch(soClient: SavedObjectsClientContract, batchSize: number) {
return agentPolicyService.list(soClient, {
perPage: SO_SEARCH_LIMIT,
perPage: batchSize,
kuery: `NOT ${AGENT_POLICY_SAVED_OBJECT_TYPE}.schema_version:${FLEET_AGENT_POLICIES_SCHEMA_VERSION}`,
fields: ['id'], // we only need the ID of the agent policy
});
}

Expand All @@ -26,13 +28,23 @@ function getOutdatedAgentPoliciesBatch(soClient: SavedObjectsClientContract) {
// deploy outdated policies to .fleet-policies index
// bump oudated SOs schema_version
export async function upgradeAgentPolicySchemaVersion(soClient: SavedObjectsClientContract) {
let outdatedAgentPolicies = await getOutdatedAgentPoliciesBatch(soClient);
const config = appContextService.getConfig();
const logger = appContextService.getLogger();

const batchSize = config?.setup?.agentPolicySchemaUpgradeBatchSize ?? DEFAULT_BATCH_SIZE;
let outdatedAgentPolicies = await getOutdatedAgentPoliciesBatch(soClient, batchSize);
logger.debug(`Found ${outdatedAgentPolicies.total} outdated agent policies`);
while (outdatedAgentPolicies.total > 0) {
const start = Date.now();
const outdatedAgentPolicyIds = outdatedAgentPolicies.items.map(
(outdatedAgentPolicy) => outdatedAgentPolicy.id
);
await agentPolicyService.deployPolicies(soClient, outdatedAgentPolicyIds);
outdatedAgentPolicies = await getOutdatedAgentPoliciesBatch(soClient);
outdatedAgentPolicies = await getOutdatedAgentPoliciesBatch(soClient, batchSize);
logger.debug(
`Upgraded ${outdatedAgentPolicyIds.length} agent policies in ${Date.now() - start}ms, ${
outdatedAgentPolicies.total
} remaining`
);
}
}

0 comments on commit 2713d13

Please sign in to comment.