[Fleet] Run agent policy schema in batches during fleet setup + add xpack.fleet.setup.agentPolicySchemaUpgradeBatchSize config #150688

Merged
3 changes: 3 additions & 0 deletions x-pack/plugins/fleet/common/types/index.ts
@@ -36,6 +36,9 @@ export interface FleetConfigType {
packageVerification?: {
gpgKeyPath?: string;
};
+ setup?: {
+ agentPolicySchemaUpgradeBatchSize?: number;
+ };
developer?: {
disableRegistryVersionCheck?: boolean;
bundledPackageLocation?: string;
53 changes: 38 additions & 15 deletions x-pack/plugins/fleet/scripts/create_agents/create_agents.ts
@@ -25,6 +25,8 @@ const printUsage = () =>
[--kibana]: full url of kibana instance to create agents and policy in e.g http://localhost:5601/mybase, defaults to http://localhost:5601

Contributor Author: These are just changes to the test script that make it easier to create environments with lots of agent policies.

[--username]: username for kibana, defaults to elastic
[--password]: password for kibana, defaults to changeme
+ [--batches]: run the script in batches, defaults to 1 e.g if count is 50 and batches is 10, 500 agents will be created and 10 agent policies
+ [--concurrentBatches]: how many batches to run concurrently, defaults to 10
`);

const DEFAULT_KIBANA_URL = 'http://localhost:5601';
@@ -49,6 +51,8 @@ const {
agentVersion: agentVersionArg,
username: kbnUsername = DEFAULT_KIBANA_USERNAME,
password: kbnPassword = DEFAULT_KIBANA_PASSWORD,
+ batches: batchesArg,
+ concurrentBatches: concurrentBatchesArg = 10,
// ignore yargs positional args, we only care about named args
_,
$0,
@@ -59,6 +63,8 @@ const statusesArg = (statusArg as string).split(',') as AgentStatus[];
const inactivityTimeout = inactivityTimeoutArg
? Number(inactivityTimeoutArg).valueOf()
: DEFAULT_UNENROLL_TIMEOUT;
+ const batches = batchesArg ? Number(batchesArg).valueOf() : 1;
+ const concurrentBatches = concurrentBatchesArg ? Number(concurrentBatchesArg).valueOf() : 10;
const count = countArg ? Number(countArg).valueOf() : DEFAULT_AGENT_COUNT;
const kbnAuth = 'Basic ' + Buffer.from(kbnUsername + ':' + kbnPassword).toString('base64');
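
The numeric options parsed above share one pattern: use the value when the flag is supplied, otherwise fall back to a default. A minimal sketch of that pattern as a helper follows; `numberArg` and the sample `exampleArgs` object are hypothetical and not part of the script.

```typescript
// Hypothetical helper, not part of the original script: parse an optional
// numeric CLI argument, falling back to a default when it is missing or
// not a finite number.
function numberArg(value: unknown, fallback: number): number {
  if (value === undefined || value === null || value === '') {
    return fallback;
  }
  const parsed = Number(value);
  return Number.isFinite(parsed) ? parsed : fallback;
}

// Sample parsed arguments (illustrative values only).
const exampleArgs: Record<string, unknown> = { batches: '10' };

// Each option is guarded by its own argument, so an unrelated flag
// cannot influence whether the default is used.
const exampleBatches = numberArg(exampleArgs.batches, 1); // 10
const exampleConcurrentBatches = numberArg(exampleArgs.concurrentBatches, 10); // 10 (default)
```

Unlike a truthiness check, this helper accepts an explicit `0` and rejects non-numeric input instead of producing `NaN`; whether that is desirable for `--batches` is a judgment call.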

@@ -258,7 +264,7 @@ async function createAgentPolicy(id: string) {
name: id,
namespace: 'default',
description: '',
- monitoring_enabled: ['logs'],
+ monitoring_enabled: ['logs', 'metrics'],
inactivity_timeout: inactivityTimeout,
}),
headers: {
@@ -314,23 +320,40 @@ export async function run() {
logger.info(`Deleted ${deleteRes.deleted} agents, took ${deleteRes.took}ms`);
}

- logger.info('Creating agent policy');
-
- const agentPolicyId = 'script-create-agent-' + uuidv4();
- const agentPolicy = await createAgentPolicy(agentPolicyId);
- logger.info(`Created agent policy ${agentPolicy.item.id}`);
-
logger.info('Creating fleet superuser');
const { role, user } = await createSuperUser();
logger.info(`Role "${ES_SUPERUSER}" ${role.role.created ? 'created' : 'already exists'}`);
logger.info(`User "${ES_SUPERUSER}" ${user.created ? 'created' : 'already exists'}`);

- logger.info('Creating agent documents');
- const statusMap = statusesArg.reduce((acc, status) => ({ ...acc, [status]: count }), {});
- logStatusMap(statusMap);
- const agents = createAgentsWithStatuses(statusMap, agentPolicyId, agentVersion);
- const createRes = await createAgentDocsBulk(agents);
- logger.info(
- `Created ${createRes.items.length} agent docs, took ${createRes.took}, errors: ${createRes.errors}`
- );
+ let batchesRemaining = batches;
+ let totalAgents = 0;
+ while (batchesRemaining > 0) {
+ const currentBatchSize = Math.min(concurrentBatches, batchesRemaining);
+ if (batches > 1) {
+ logger.info(`Running ${currentBatchSize} batches. ${batchesRemaining} batches remaining`);
+ }
+
+ await Promise.all(
+ Array(currentBatchSize)
+ .fill(0)
+ .map(async (__, i) => {
+ const agentPolicyId = 'script-create-agent-' + uuidv4();
+ const agentPolicy = await createAgentPolicy(agentPolicyId);
+ logger.info(`Created agent policy ${agentPolicy.item.id}`);
+
+ const statusMap = statusesArg.reduce((acc, status) => ({ ...acc, [status]: count }), {});
+ logStatusMap(statusMap);
+ const agents = createAgentsWithStatuses(statusMap, agentPolicyId, agentVersion);
+ const createRes = await createAgentDocsBulk(agents);
+ logger.info(
+ `Batch complete, created ${createRes.items.length} agent docs, took ${createRes.took}, errors: ${createRes.errors}`
+ );
+ totalAgents += createRes.items.length;
+ })
+ );
+
+ batchesRemaining -= currentBatchSize;
+ }
+
+ logger.info(`All batches complete. Created ${totalAgents} agents in total. Goodbye!`);
}
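
The loop above can be read as running the batches in waves: each wave starts at most `concurrentBatches` batches and waits for all of them before the next wave begins. A minimal sketch of that idea, with hypothetical names (`planWaves`, `runInWaves`) and a stand-in `task` callback in place of the real policy and agent creation:

```typescript
// Split `total` units of work into waves of at most `limit` units each.
function planWaves(total: number, limit: number): number[] {
  if (!Number.isInteger(limit) || limit < 1) {
    throw new Error('limit must be a positive integer');
  }
  const waves: number[] = [];
  let remaining = total;
  while (remaining > 0) {
    const size = Math.min(limit, remaining);
    waves.push(size);
    remaining -= size;
  }
  return waves;
}

// Run `total` tasks, at most `limit` at a time, waiting for each wave to
// finish before starting the next. `task` is a stand-in for "create one agent
// policy and its agents" and resolves to the number of agents it created.
async function runInWaves(
  total: number,
  limit: number,
  task: (index: number) => Promise<number>
): Promise<number> {
  let created = 0;
  let nextIndex = 0;
  for (const size of planWaves(total, limit)) {
    const results = await Promise.all(
      Array.from({ length: size }, () => task(nextIndex++))
    );
    created += results.reduce((sum, n) => sum + n, 0);
  }
  return created;
}
```

For example, `planWaves(25, 10)` yields waves of 10, 10, and 5. Because each wave waits for its slowest task, a worker-pool design would keep concurrency more evenly saturated, at the cost of slightly more code.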
5 changes: 5 additions & 0 deletions x-pack/plugins/fleet/server/config.ts
@@ -120,6 +120,11 @@ export const config: PluginConfigDescriptor = {
fleetServerHosts: PreconfiguredFleetServerHostsSchema,
proxies: PreconfiguredFleetProxiesSchema,
agentIdVerificationEnabled: schema.boolean({ defaultValue: true }),
+ setup: schema.maybe(
+ schema.object({
+ agentPolicySchemaUpgradeBatchSize: schema.maybe(schema.number()),
+ })
+ ),
developer: schema.object({
disableRegistryVersionCheck: schema.boolean({ defaultValue: false }),
allowAgentUpgradeSourceUri: schema.boolean({ defaultValue: false }),
10 changes: 9 additions & 1 deletion x-pack/plugins/fleet/server/services/agent_policy.ts
@@ -315,15 +315,22 @@ class AgentPolicyService {
soClient: SavedObjectsClientContract,
options: ListWithKuery & {
withPackagePolicies?: boolean;
+ fields?: string[];

Contributor Author: Added the ability to restrict which agent policy fields are returned, to reduce payload size.

}
- ): Promise<{ items: AgentPolicy[]; total: number; page: number; perPage: number }> {
+ ): Promise<{
+ items: AgentPolicy[];
+ total: number;
+ page: number;
+ perPage: number;
+ }> {
const {
page = 1,
perPage = 20,
sortField = 'updated_at',
sortOrder = 'desc',
kuery,
withPackagePolicies = false,
+ fields,
} = options;

const baseFindParams = {
@@ -332,6 +339,7 @@ class AgentPolicyService {
sortOrder,
page,
perPage,
+ ...(fields ? { fields } : {}),
};
const filter = kuery ? normalizeKuery(SAVED_OBJECT_TYPE, kuery) : undefined;
let agentPoliciesSO;
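
The `fields` option is forwarded only when the caller supplies it. A small sketch of that conditional-spread pattern, using a simplified, hypothetical `FindParams` shape and `buildFindParams` helper rather than the real saved objects find options:

```typescript
// Hypothetical, simplified parameter shape for illustration only.
interface FindParams {
  type: string;
  page: number;
  perPage: number;
  fields?: string[];
}

function buildFindParams(options: {
  page?: number;
  perPage?: number;
  fields?: string[];
}): FindParams {
  const { page = 1, perPage = 20, fields } = options;
  return {
    type: 'example-type', // placeholder type name
    page,
    perPage,
    // Spread an object containing `fields` only when the caller supplied it,
    // so the key is absent (rather than present as undefined) otherwise.
    ...(fields ? { fields } : {}),
  };
}

const idOnly = buildFindParams({ perPage: 100, fields: ['id'] });
const fullObjects = buildFindParams({});
```

Here `idOnly` carries `fields: ['id']`, while `fullObjects` has no `fields` key at all, so the default behaviour of returning whole objects is untouched for existing callers.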
@@ -10,14 +10,16 @@ import type { SavedObjectsClientContract } from '@kbn/core/server';
import {
AGENT_POLICY_SAVED_OBJECT_TYPE,
FLEET_AGENT_POLICIES_SCHEMA_VERSION,
- SO_SEARCH_LIMIT,
} from '../../constants';
import { agentPolicyService } from '../agent_policy';
+ import { appContextService } from '../app_context';

- function getOutdatedAgentPoliciesBatch(soClient: SavedObjectsClientContract) {
+ const DEFAULT_BATCH_SIZE = 100;
+ function getOutdatedAgentPoliciesBatch(soClient: SavedObjectsClientContract, batchSize: number) {
return agentPolicyService.list(soClient, {
- perPage: SO_SEARCH_LIMIT,
+ perPage: batchSize,
kuery: `NOT ${AGENT_POLICY_SAVED_OBJECT_TYPE}.schema_version:${FLEET_AGENT_POLICIES_SCHEMA_VERSION}`,
+ fields: ['id'], // we only need the ID of the agent policy
});
}

@@ -26,13 +28,23 @@ function getOutdatedAgentPoliciesBatch(soClient: SavedObjectsClientContract) {
// deploy outdated policies to .fleet-policies index
// bump outdated SOs schema_version
export async function upgradeAgentPolicySchemaVersion(soClient: SavedObjectsClientContract) {

Contributor:

Great improvements!
It would be great to add an integration test with a small batch size.
I'm curious how long it takes to update a large set of agent policies in batches of 100. Hopefully not too long.

Contributor Author (@hop-dev, Feb 9, 2023):

It does take a long time (e.g. 50s per 1000 agent policies in my local dev env), but it's such an expensive operation that I didn't dare set the default higher, at the risk of overwhelming Elasticsearch and Kibana. My reasoning was that I suspect the vast majority of users have fewer than 100 agent policies anyway.
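
For a rough sense of scale, the figure quoted above (about 50s per 1000 agent policies in one local dev environment) can be extrapolated. This sketch assumes the cost grows linearly with the number of policies, which the comment does not claim; the function names are hypothetical.

```typescript
// Assumption: upgrade time scales linearly with the number of agent policies.
// The default of 50 seconds per 1000 policies is the single data point quoted
// in the review comment (one local dev environment), not a benchmark.
function estimateUpgradeSeconds(policyCount: number, secondsPerThousand: number = 50): number {
  return (policyCount * secondsPerThousand) / 1000;
}

// Number of list-and-deploy round trips needed at a given batch size.
function estimateBatchCount(policyCount: number, batchSize: number = 100): number {
  return Math.ceil(policyCount / batchSize);
}

const perBatchSeconds = estimateUpgradeSeconds(100); // 5 under the linear assumption
const tenThousandSeconds = estimateUpgradeSeconds(10000); // 500
const tenThousandBatches = estimateBatchCount(10000); // 100
```

Under that assumption, a default batch of 100 policies would take on the order of 5 seconds, and 10,000 policies would need 100 batches and several minutes in total.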

- let outdatedAgentPolicies = await getOutdatedAgentPoliciesBatch(soClient);
+ const config = appContextService.getConfig();
+ const logger = appContextService.getLogger();

+ const batchSize = config?.setup?.agentPolicySchemaUpgradeBatchSize ?? DEFAULT_BATCH_SIZE;
+ let outdatedAgentPolicies = await getOutdatedAgentPoliciesBatch(soClient, batchSize);
+ logger.debug(`Found ${outdatedAgentPolicies.total} outdated agent policies`);
while (outdatedAgentPolicies.total > 0) {
+ const start = Date.now();
const outdatedAgentPolicyIds = outdatedAgentPolicies.items.map(
(outdatedAgentPolicy) => outdatedAgentPolicy.id
);
await agentPolicyService.deployPolicies(soClient, outdatedAgentPolicyIds);
- outdatedAgentPolicies = await getOutdatedAgentPoliciesBatch(soClient);
+ outdatedAgentPolicies = await getOutdatedAgentPoliciesBatch(soClient, batchSize);
+ logger.debug(
+ `Upgraded ${outdatedAgentPolicyIds.length} agent policies in ${Date.now() - start}ms, ${
+ outdatedAgentPolicies.total
+ } remaining`
+ );
}
}
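
The upgrade loop above repeatedly fetches one batch of outdated policies, deploys them, and re-queries until none remain. A self-contained sketch of that shape against an in-memory array follows; `Policy`, `listOutdated`, `deploy`, `upgradeAll`, and the version string are hypothetical stand-ins, not Fleet APIs.

```typescript
// Hypothetical in-memory stand-in for the saved objects store.
interface Policy {
  id: string;
  schemaVersion: string;
}

const CURRENT_SCHEMA_VERSION = '1.1.0'; // illustrative value only
const DEFAULT_BATCH_SIZE = 100;

// Return up to `batchSize` outdated policy IDs plus the total outdated count,
// mirroring the `items` / `total` shape returned by the list call.
function listOutdated(store: Policy[], batchSize: number): { ids: string[]; total: number } {
  const outdated = store.filter((p) => p.schemaVersion !== CURRENT_SCHEMA_VERSION);
  return { ids: outdated.slice(0, batchSize).map((p) => p.id), total: outdated.length };
}

// Stand-in for deploying policies: marks the given IDs as current.
async function deploy(store: Policy[], ids: string[]): Promise<void> {
  const idSet = new Set(ids);
  for (const policy of store) {
    if (idSet.has(policy.id)) {
      policy.schemaVersion = CURRENT_SCHEMA_VERSION;
    }
  }
}

// Drain all outdated policies in batches; resolves to the number of batches run.
async function upgradeAll(store: Policy[], configuredBatchSize?: number): Promise<number> {
  const batchSize = configuredBatchSize ?? DEFAULT_BATCH_SIZE;
  let batchesRun = 0;
  let batch = listOutdated(store, batchSize);
  while (batch.total > 0) {
    await deploy(store, batch.ids);
    batchesRun++;
    batch = listOutdated(store, batchSize);
  }
  return batchesRun;
}
```

With 250 outdated policies and the default batch size of 100, this sketch runs three passes (100, 100, 50). The loop ends only because each pass marks its policies as current; in the sketch, a `deploy` that left a policy outdated would keep the loop running indefinitely.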