Skip to content

Commit

Permalink
[Fleet] Run agent policy schema in batches during fleet setup + add `…
Browse files Browse the repository at this point in the history
…xpack.fleet.setup.agentPolicySchemaUpgradeBatchSize` config (elastic#150688)

## Summary

Closes elastic#150538 

As part of the Fleet plugin setup, we check to see if any agent policies
have an out of date `schema_version` and upgrade them. We encountered an
error when this upgrade happens on a large number of agent policies as
we attempted the upgrade in one large batch.

This pull request performs the schema upgrade in batches of 100 by
default and also adds the config value
`xpack.fleet.setup.agentPolicySchemaUpgradeBatchSize` to make the batch
size configurable.

I have also added more debug logging to show progress, and reduced the
response payload of one of our requests which was very large.

### Dev testing

To test this you need an environment with lots of agent policies (> 2k)
where `schema_version`
is not set. To create an environment with a large number of agent
policies I have added a new param to the agent creation script, I ran:

```
cd x-pack/plugins/fleet
node scripts/create_agents --count 20  --kibana http://127.0.0.1:5601/mark --status online --delete --batches 3000 --concurrentBatches 100
```

This generates 3000 agent policies, each with 20 agents.

I then modified the agent policies so that they require an upgrade.
As `system_indices_superuser`, run:

```
POST /.kibana/_update_by_query
{
  "query": {
    "bool": {
      "filter": [
        {
          "term": {
            "type": "ingest-agent-policies"
          }
        }
      ]
    }
  },
  "script": {
    "source": "ctx._source['ingest-agent-policies'].remove('schema_version')",
    "lang": "painless"
  }
}
```

Restarting Kibana will then run the setup, upgrading the agent policies in batches.
  • Loading branch information
hop-dev authored Feb 9, 2023
1 parent b5dde1f commit 6e06452
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 21 deletions.
3 changes: 3 additions & 0 deletions x-pack/plugins/fleet/common/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ export interface FleetConfigType {
packageVerification?: {
gpgKeyPath?: string;
};
setup?: {
agentPolicySchemaUpgradeBatchSize?: number;
};
developer?: {
disableRegistryVersionCheck?: boolean;
bundledPackageLocation?: string;
Expand Down
53 changes: 38 additions & 15 deletions x-pack/plugins/fleet/scripts/create_agents/create_agents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ const printUsage = () =>
[--kibana]: full url of kibana instance to create agents and policy in e.g http://localhost:5601/mybase, defaults to http://localhost:5601
[--username]: username for kibana, defaults to elastic
[--password]: password for kibana, defaults to changeme
[--batches]: run the script in batches, defaults to 1 e.g if count is 50 and batches is 10, 500 agents will be created and 10 agent policies
[--concurrentBatches]: how many batches to run concurrently, defaults to 10
`);

const DEFAULT_KIBANA_URL = 'http://localhost:5601';
Expand All @@ -49,6 +51,8 @@ const {
agentVersion: agentVersionArg,
username: kbnUsername = DEFAULT_KIBANA_USERNAME,
password: kbnPassword = DEFAULT_KIBANA_PASSWORD,
batches: batchesArg,
concurrentBatches: concurrentBatchesArg = 10,
// ignore yargs positional args, we only care about named args
_,
$0,
Expand All @@ -59,6 +63,8 @@ const statusesArg = (statusArg as string).split(',') as AgentStatus[];
// Coerce the numeric CLI arguments, falling back to a default whenever the
// corresponding flag was not supplied (i.e. the raw argument is falsy).
const inactivityTimeout = inactivityTimeoutArg
? Number(inactivityTimeoutArg).valueOf()
: DEFAULT_UNENROLL_TIMEOUT;
// Gate on batchesArg itself. Checking inactivityTimeoutArg here meant --batches was
// ignored unless --inactivityTimeout was also passed, and passing --inactivityTimeout
// without --batches produced Number(undefined) === NaN, so the batch loop never ran.
const batches = batchesArg ? Number(batchesArg).valueOf() : 1;
const concurrentBatches = concurrentBatchesArg ? Number(concurrentBatchesArg).valueOf() : 10;
const count = countArg ? Number(countArg).valueOf() : DEFAULT_AGENT_COUNT;
// HTTP Basic auth header value built from the Kibana username and password.
const kbnAuth = 'Basic ' + Buffer.from(kbnUsername + ':' + kbnPassword).toString('base64');

Expand Down Expand Up @@ -258,7 +264,7 @@ async function createAgentPolicy(id: string) {
name: id,
namespace: 'default',
description: '',
monitoring_enabled: ['logs'],
monitoring_enabled: ['logs', 'metrics'],
inactivity_timeout: inactivityTimeout,
}),
headers: {
Expand Down Expand Up @@ -314,23 +320,40 @@ export async function run() {
logger.info(`Deleted ${deleteRes.deleted} agents, took ${deleteRes.took}ms`);
}

logger.info('Creating agent policy');

const agentPolicyId = 'script-create-agent-' + uuidv4();
const agentPolicy = await createAgentPolicy(agentPolicyId);
logger.info(`Created agent policy ${agentPolicy.item.id}`);

logger.info('Creating fleet superuser');
const { role, user } = await createSuperUser();
logger.info(`Role "${ES_SUPERUSER}" ${role.role.created ? 'created' : 'already exists'}`);
logger.info(`User "${ES_SUPERUSER}" ${user.created ? 'created' : 'already exists'}`);

logger.info('Creating agent documents');
const statusMap = statusesArg.reduce((acc, status) => ({ ...acc, [status]: count }), {});
logStatusMap(statusMap);
const agents = createAgentsWithStatuses(statusMap, agentPolicyId, agentVersion);
const createRes = await createAgentDocsBulk(agents);
logger.info(
`Created ${createRes.items.length} agent docs, took ${createRes.took}, errors: ${createRes.errors}`
);
let batchesRemaining = batches;
let totalAgents = 0;
while (batchesRemaining > 0) {
const currentBatchSize = Math.min(concurrentBatches, batchesRemaining);
if (batches > 1) {
logger.info(`Running ${currentBatchSize} batches. ${batchesRemaining} batches remaining`);
}

await Promise.all(
Array(currentBatchSize)
.fill(0)
.map(async (__, i) => {
const agentPolicyId = 'script-create-agent-' + uuidv4();
const agentPolicy = await createAgentPolicy(agentPolicyId);
logger.info(`Created agent policy ${agentPolicy.item.id}`);

const statusMap = statusesArg.reduce((acc, status) => ({ ...acc, [status]: count }), {});
logStatusMap(statusMap);
const agents = createAgentsWithStatuses(statusMap, agentPolicyId, agentVersion);
const createRes = await createAgentDocsBulk(agents);
logger.info(
`Batch complete, created ${createRes.items.length} agent docs, took ${createRes.took}, errors: ${createRes.errors}`
);
totalAgents += createRes.items.length;
})
);

batchesRemaining -= currentBatchSize;
}

logger.info(`All batches complete. Created ${totalAgents} agents in total. Goodbye!`);
}
5 changes: 5 additions & 0 deletions x-pack/plugins/fleet/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ export const config: PluginConfigDescriptor = {
fleetServerHosts: PreconfiguredFleetServerHostsSchema,
proxies: PreconfiguredFleetProxiesSchema,
agentIdVerificationEnabled: schema.boolean({ defaultValue: true }),
setup: schema.maybe(
schema.object({
agentPolicySchemaUpgradeBatchSize: schema.maybe(schema.number()),
})
),
developer: schema.object({
disableRegistryVersionCheck: schema.boolean({ defaultValue: false }),
allowAgentUpgradeSourceUri: schema.boolean({ defaultValue: false }),
Expand Down
10 changes: 9 additions & 1 deletion x-pack/plugins/fleet/server/services/agent_policy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -315,15 +315,22 @@ class AgentPolicyService {
soClient: SavedObjectsClientContract,
options: ListWithKuery & {
withPackagePolicies?: boolean;
fields?: string[];
}
): Promise<{ items: AgentPolicy[]; total: number; page: number; perPage: number }> {
): Promise<{
items: AgentPolicy[];
total: number;
page: number;
perPage: number;
}> {
const {
page = 1,
perPage = 20,
sortField = 'updated_at',
sortOrder = 'desc',
kuery,
withPackagePolicies = false,
fields,
} = options;

const baseFindParams = {
Expand All @@ -332,6 +339,7 @@ class AgentPolicyService {
sortOrder,
page,
perPage,
...(fields ? { fields } : {}),
};
const filter = kuery ? normalizeKuery(SAVED_OBJECT_TYPE, kuery) : undefined;
let agentPoliciesSO;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,16 @@ import type { SavedObjectsClientContract } from '@kbn/core/server';
import {
AGENT_POLICY_SAVED_OBJECT_TYPE,
FLEET_AGENT_POLICIES_SCHEMA_VERSION,
SO_SEARCH_LIMIT,
} from '../../constants';
import { agentPolicyService } from '../agent_policy';
import { appContextService } from '../app_context';

function getOutdatedAgentPoliciesBatch(soClient: SavedObjectsClientContract) {
const DEFAULT_BATCH_SIZE = 100;
function getOutdatedAgentPoliciesBatch(soClient: SavedObjectsClientContract, batchSize: number) {
return agentPolicyService.list(soClient, {
perPage: SO_SEARCH_LIMIT,
perPage: batchSize,
kuery: `NOT ${AGENT_POLICY_SAVED_OBJECT_TYPE}.schema_version:${FLEET_AGENT_POLICIES_SCHEMA_VERSION}`,
fields: ['id'], // we only need the ID of the agent policy
});
}

Expand All @@ -26,13 +28,23 @@ function getOutdatedAgentPoliciesBatch(soClient: SavedObjectsClientContract) {
// deploy outdated policies to .fleet-policies index
// bump outdated SOs schema_version
export async function upgradeAgentPolicySchemaVersion(soClient: SavedObjectsClientContract) {
let outdatedAgentPolicies = await getOutdatedAgentPoliciesBatch(soClient);
const config = appContextService.getConfig();
const logger = appContextService.getLogger();

const batchSize = config?.setup?.agentPolicySchemaUpgradeBatchSize ?? DEFAULT_BATCH_SIZE;
let outdatedAgentPolicies = await getOutdatedAgentPoliciesBatch(soClient, batchSize);
logger.debug(`Found ${outdatedAgentPolicies.total} outdated agent policies`);
while (outdatedAgentPolicies.total > 0) {
const start = Date.now();
const outdatedAgentPolicyIds = outdatedAgentPolicies.items.map(
(outdatedAgentPolicy) => outdatedAgentPolicy.id
);
await agentPolicyService.deployPolicies(soClient, outdatedAgentPolicyIds);
outdatedAgentPolicies = await getOutdatedAgentPoliciesBatch(soClient);
outdatedAgentPolicies = await getOutdatedAgentPoliciesBatch(soClient, batchSize);
logger.debug(
`Upgraded ${outdatedAgentPolicyIds.length} agent policies in ${Date.now() - start}ms, ${
outdatedAgentPolicies.total
} remaining`
);
}
}

0 comments on commit 6e06452

Please sign in to comment.