From e00e26e86854bdbde7c14f88453b717505fed4d9 Mon Sep 17 00:00:00 2001 From: Julia Bardi <90178898+juliaElastic@users.noreply.github.com> Date: Wed, 23 Nov 2022 10:22:20 +0100 Subject: [PATCH] Fleet Usage telemetry extension (#145353) ## Summary Closes https://github.com/elastic/ingest-dev/issues/1261 Added a snippet to the telemetry that I added for each requirement. Please review and let me know if any changes are needed. Also asked a few questions below. @jlind23 @kpollich 6. is blocked by [elasticsearch change](https://github.com/elastic/elasticsearch/pull/91701) to give kibana_system the missing privilege to read logs-elastic_agent* indices. Took inspiration for task versioning from https://github.com/elastic/kibana/pull/144494/files#diff-0c7c49bf5c55c45c19e9c42d5428e99e52c3a39dd6703633f427724d36108186 - [x] 1. Elastic Agent versions Versions of all the Elastic Agent running: `agent.version` field on `.fleet-agents` documents ``` "agent_versions": [ "8.6.0" ], ``` - [x] 2. Fleet server configuration Think we can query for `.fleet-policies` where some `input` has `type: 'fleet-server'` for this, as well as use the `Fleet Server Hosts` settings that we define via saved objects in Fleet ``` "fleet_server_config": { "policies": [ { "input_config": { "server": { "limits.max_agents": 10000 }, "server.runtime": "gc_percent:20" } } ] } ``` - [x] 3. Number of policies Count of `.fleet-policies` index To confirm, did we mean agent policies here? ``` "agent_policies": { "count": 7, ``` - [x] 4. Output type contained in those policies Collecting this from ts logic, querying from `.fleet-policies` index. The alternative would be to write a painless script (because the `outputs` are an object with dynamic keys, we can't do an aggregation directly). ``` "agent_policies": { "output_types": [ "elasticsearch" ] } ``` Did we mean to just collect the types here, or any other info? e.g. output urls - [x] 5. Average number of checkin failures We only have the most recent checkin status and timestamp on `.fleet-agents`. Do we mean here to publish the total last checkin failure count? E.g. 3 if 3 agents are in failure checkin status currently. Or do we mean to publish specific info for all agents (`last_checkin_status`, `last_checkin` time, `last_checkin_message`)? Are the only statuses `error` and `degraded` that we want to send? ``` "agent_last_checkin_status": { "error": 0, "degraded": 0 }, ``` - [ ] 6. Top 3 most common errors in the Elastic Agent logs Do we mean here elastic-agent logs only, or fleet-server logs as well (maybe separately)? I found an alternative way to query the message field using sampler and categorize text aggregation: ``` GET logs-elastic_agent*/_search { "size": 0, "query": { "bool": { "must": [ { "term": { "log.level": "error" } }, { "range": { "@timestamp": { "gte": "now-1h" } } } ] } }, "aggregations": { "message_sample": { "sampler": { "shard_size": 200 }, "aggs": { "categories": { "categorize_text": { "field": "message", "size": 10 } } } } } } ``` Example response: ``` "aggregations": { "message_sample": { "doc_count": 112, "categories": { "buckets": [ { "doc_count": 73, "key": "failed to unenroll offline agents", "regex": ".*?failed.+?to.+?unenroll.+?offline.+?agents.*?", "max_matching_length": 36 }, { "doc_count": 7, "key": """stderr panic close of closed channel n ngoroutine running Stop ngithub.com/elastic/beats/v7/libbeat/cmd/instance Beat launch.func5 \n\t/go/src/github.com/elastic/beats/libbeat/cmd/instance/beat.go n ``` - [x] 7. Number of checkin failure over the past period of time I think this is almost the same as #5. The difference would be to report new failures happened only in the last hour, or report all agents in failure state. (which would be an increasing number if the agent stays in failed state). Do we want these 2 separate telemetry fields? EDIT: removed the last1hr query, instead added a new field to report agents enrolled per policy (top 10). See comments below. ``` "agent_checkin_status": { "error": 3, "degraded": 0 }, "agents_per_policy": [2, 1000], ``` - [x] 8. Number of Elastic Agent and number of fleet server This is already there in the existing telemetry: ``` "agents": { "total_enrolled": 0, "healthy": 0, "unhealthy": 0, "offline": 0, "total_all_statuses": 1, "updating": 0 }, "fleet_server": { "total_enrolled": 0, "healthy": 0, "unhealthy": 0, "offline": 0, "updating": 0, "total_all_statuses": 0, "num_host_urls": 1 }, ``` ### Checklist - [ ] [Unit or functional tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html) were updated or added to match the most common scenarios Co-authored-by: Kibana Machine <42973632+kibanamachine@users.noreply.github.com> --- .../server/collectors/agent_collectors.ts | 85 +++++- .../fleet/server/collectors/agent_policies.ts | 61 +++++ .../collectors/fleet_server_collector.ts | 46 ++++ .../fleet/server/collectors/register.ts | 33 ++- .../fleet_usage_telemetry.test.ts | 256 ++++++++++++++++++ x-pack/plugins/fleet/server/plugin.ts | 17 +- .../server/services/fleet_usage_sender.ts | 187 ------------- x-pack/plugins/fleet/server/services/index.ts | 2 +- .../services/telemetry/fleet_usage_sender.ts | 132 +++++++++ .../services/telemetry/fleet_usages_schema.ts | 168 ++++++++++++ 10 files changed, 783 insertions(+), 204 deletions(-) create mode 100644 x-pack/plugins/fleet/server/collectors/agent_policies.ts create mode 100644 x-pack/plugins/fleet/server/integration_tests/fleet_usage_telemetry.test.ts delete mode 100644 x-pack/plugins/fleet/server/services/fleet_usage_sender.ts create mode 100644 x-pack/plugins/fleet/server/services/telemetry/fleet_usage_sender.ts create mode 100644 x-pack/plugins/fleet/server/services/telemetry/fleet_usages_schema.ts diff --git a/x-pack/plugins/fleet/server/collectors/agent_collectors.ts b/x-pack/plugins/fleet/server/collectors/agent_collectors.ts index 48289674621166..7861e7ac606a45 100644 --- a/x-pack/plugins/fleet/server/collectors/agent_collectors.ts +++ b/x-pack/plugins/fleet/server/collectors/agent_collectors.ts @@ -7,8 +7,9 @@ import type { SavedObjectsClient, ElasticsearchClient } from '@kbn/core/server'; -import type { FleetConfigType } from '../../common/types'; +import { AGENTS_INDEX } from '../../common'; import * as AgentService from '../services/agents'; +import { appContextService } from '../services'; export interface AgentUsage { total_enrolled: number; @@ -20,7 +21,6 @@ export interface AgentUsage { } export const getAgentUsage = async ( - config: FleetConfigType, soClient?: SavedObjectsClient, esClient?: ElasticsearchClient ): Promise => { @@ -47,3 +47,84 @@ export const getAgentUsage = async ( updating, }; }; + +export interface AgentData { + agent_versions: string[]; + agent_checkin_status: { + error: number; + degraded: number; + }; + agents_per_policy: number[]; +} + +const DEFAULT_AGENT_DATA = { + agent_versions: [], + agent_checkin_status: { error: 0, degraded: 0 }, + agents_per_policy: [], +}; + +export const getAgentData = async ( + esClient: ElasticsearchClient, + abortController: AbortController +): Promise => { + try { + const transformLastCheckinStatusBuckets = (resp: any) => + ((resp?.aggregations?.last_checkin_status as any).buckets ?? []).reduce( + (acc: any, bucket: any) => { + if (acc[bucket.key] !== undefined) acc[bucket.key] = bucket.doc_count; + return acc; + }, + { error: 0, degraded: 0 } + ); + const response = await esClient.search( + { + index: AGENTS_INDEX, + query: { + bool: { + filter: [ + { + term: { + active: 'true', + }, + }, + ], + }, + }, + size: 0, + aggs: { + versions: { + terms: { field: 'agent.version' }, + }, + last_checkin_status: { + terms: { field: 'last_checkin_status' }, + }, + policies: { + terms: { field: 'policy_id' }, + }, + }, + }, + { signal: abortController.signal } + ); + const versions = ((response?.aggregations?.versions as any).buckets ?? []).map( + (bucket: any) => bucket.key + ); + const statuses = transformLastCheckinStatusBuckets(response); + + const agentsPerPolicy = ((response?.aggregations?.policies as any).buckets ?? []).map( + (bucket: any) => bucket.doc_count + ); + + return { + agent_versions: versions, + agent_checkin_status: statuses, + agents_per_policy: agentsPerPolicy, + }; + } catch (error) { + if (error.statusCode === 404) { + appContextService.getLogger().debug('Index .fleet-agents does not exist yet.'); + } else { + throw error; + } + return DEFAULT_AGENT_DATA; + } +}; diff --git a/x-pack/plugins/fleet/server/collectors/agent_policies.ts b/x-pack/plugins/fleet/server/collectors/agent_policies.ts new file mode 100644 index 00000000000000..bd8075b09fd066 --- /dev/null +++ b/x-pack/plugins/fleet/server/collectors/agent_policies.ts @@ -0,0 +1,61 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { ElasticsearchClient } from '@kbn/core/server'; + +import { AGENT_POLICY_INDEX } from '../../common'; +import { ES_SEARCH_LIMIT } from '../../common/constants'; +import { appContextService } from '../services'; + +export interface AgentPoliciesUsage { + count: number; + output_types: string[]; +} + +const DEFAULT_AGENT_POLICIES_USAGE = { + count: 0, + output_types: [], +}; + +export const getAgentPoliciesUsage = async ( + esClient: ElasticsearchClient, + abortController: AbortController +): Promise => { + try { + const res = await esClient.search( + { + index: AGENT_POLICY_INDEX, + size: ES_SEARCH_LIMIT, + track_total_hits: true, + rest_total_hits_as_int: true, + }, + { signal: abortController.signal } + ); + + const agentPolicies = res.hits.hits; + + const outputTypes = new Set(); + agentPolicies.forEach((item) => { + const source = (item._source as any) ?? {}; + Object.keys(source.data.outputs).forEach((output) => { + outputTypes.add(source.data.outputs[output].type); + }); + }); + + return { + count: res.hits.total as number, + output_types: Array.from(outputTypes), + }; + } catch (error) { + if (error.statusCode === 404) { + appContextService.getLogger().debug('Index .fleet-policies does not exist yet.'); + } else { + throw error; + } + return DEFAULT_AGENT_POLICIES_USAGE; + } +}; diff --git a/x-pack/plugins/fleet/server/collectors/fleet_server_collector.ts b/x-pack/plugins/fleet/server/collectors/fleet_server_collector.ts index 4438b23c8a2858..4d587e78563f65 100644 --- a/x-pack/plugins/fleet/server/collectors/fleet_server_collector.ts +++ b/x-pack/plugins/fleet/server/collectors/fleet_server_collector.ts @@ -7,6 +7,8 @@ import type { SavedObjectsClient, ElasticsearchClient } from '@kbn/core/server'; +import { PACKAGE_POLICY_SAVED_OBJECT_TYPE, SO_SEARCH_LIMIT } from '../constants'; + import { packagePolicyService } from '../services'; import { getAgentStatusForAgentPolicy } from '../services/agents'; import { listFleetServerHosts } from '../services/fleet_server_host'; @@ -84,3 +86,47 @@ export const getFleetServerUsage = async ( num_host_urls: numHostsUrls, }; }; + +export const getFleetServerConfig = async (soClient: SavedObjectsClient): Promise => { + const res = await packagePolicyService.list(soClient, { + page: 1, + perPage: SO_SEARCH_LIMIT, + kuery: `${PACKAGE_POLICY_SAVED_OBJECT_TYPE}.package.name:fleet_server`, + }); + const getInputConfig = (item: any) => { + const config = (item.inputs[0] ?? {}).compiled_input; + if (config?.server) { + // whitelist only server limits, timeouts and runtime, sometimes fields are coming in "server.limits" format instead of nested object + const newConfig = Object.keys(config) + .filter((key) => key.startsWith('server')) + .reduce((acc: any, curr: string) => { + if (curr === 'server') { + acc.server = {}; + Object.keys(config.server) + .filter( + (key) => + key.startsWith('limits') || + key.startsWith('timeouts') || + key.startsWith('runtime') + ) + .forEach((serverKey: string) => { + acc.server[serverKey] = config.server[serverKey]; + return acc; + }); + } else { + acc[curr] = config[curr]; + } + return acc; + }, {}); + + return newConfig; + } else { + return {}; + } + }; + const policies = res.items.map((item) => ({ + input_config: getInputConfig(item), + })); + + return { policies }; +}; diff --git a/x-pack/plugins/fleet/server/collectors/register.ts b/x-pack/plugins/fleet/server/collectors/register.ts index a194ff9b560e51..2892de0685e2f9 100644 --- a/x-pack/plugins/fleet/server/collectors/register.ts +++ b/x-pack/plugins/fleet/server/collectors/register.ts @@ -11,13 +11,14 @@ import type { CoreSetup } from '@kbn/core/server'; import type { FleetConfigType } from '..'; import { getIsAgentsEnabled } from './config_collectors'; -import { getAgentUsage } from './agent_collectors'; +import { getAgentUsage, getAgentData } from './agent_collectors'; import type { AgentUsage } from './agent_collectors'; import { getInternalClients } from './helpers'; import { getPackageUsage } from './package_collectors'; import type { PackageUsage } from './package_collectors'; -import { getFleetServerUsage } from './fleet_server_collector'; +import { getFleetServerUsage, getFleetServerConfig } from './fleet_server_collector'; import type { FleetServerUsage } from './fleet_server_collector'; +import { getAgentPoliciesUsage } from './agent_policies'; export interface Usage { agents_enabled: boolean; @@ -26,11 +27,33 @@ export interface Usage { fleet_server: FleetServerUsage; } -export const fetchUsage = async (core: CoreSetup, config: FleetConfigType) => { +export const fetchFleetUsage = async ( + core: CoreSetup, + config: FleetConfigType, + abortController: AbortController +) => { + const [soClient, esClient] = await getInternalClients(core); + if (!soClient || !esClient) { + return; + } + const usage = { + agents_enabled: getIsAgentsEnabled(config), + agents: await getAgentUsage(soClient, esClient), + fleet_server: await getFleetServerUsage(soClient, esClient), + packages: await getPackageUsage(soClient), + ...(await getAgentData(esClient, abortController)), + fleet_server_config: await getFleetServerConfig(soClient), + agent_policies: await getAgentPoliciesUsage(esClient, abortController), + }; + return usage; +}; + +// used by kibana daily collector +const fetchUsage = async (core: CoreSetup, config: FleetConfigType) => { const [soClient, esClient] = await getInternalClients(core); const usage = { agents_enabled: getIsAgentsEnabled(config), - agents: await getAgentUsage(config, soClient, esClient), + agents: await getAgentUsage(soClient, esClient), fleet_server: await getFleetServerUsage(soClient, esClient), packages: await getPackageUsage(soClient), }; @@ -41,7 +64,7 @@ export const fetchAgentsUsage = async (core: CoreSetup, config: FleetConfigType) const [soClient, esClient] = await getInternalClients(core); const usage = { agents_enabled: getIsAgentsEnabled(config), - agents: await getAgentUsage(config, soClient, esClient), + agents: await getAgentUsage(soClient, esClient), fleet_server: await getFleetServerUsage(soClient, esClient), }; return usage; diff --git a/x-pack/plugins/fleet/server/integration_tests/fleet_usage_telemetry.test.ts b/x-pack/plugins/fleet/server/integration_tests/fleet_usage_telemetry.test.ts new file mode 100644 index 00000000000000..5197b34fc89fe4 --- /dev/null +++ b/x-pack/plugins/fleet/server/integration_tests/fleet_usage_telemetry.test.ts @@ -0,0 +1,256 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import path from 'path'; + +import * as kbnTestServer from '@kbn/core/test_helpers/kbn_server'; + +import { fetchFleetUsage } from '../collectors/register'; + +import { waitForFleetSetup } from './helpers'; + +const logFilePath = path.join(__dirname, 'logs.log'); + +describe('fleet usage telemetry', () => { + let core: any; + let esServer: kbnTestServer.TestElasticsearchUtils; + let kbnServer: kbnTestServer.TestKibanaUtils; + const registryUrl = 'http://localhost'; + + const startServers = async () => { + const { startES } = kbnTestServer.createTestServers({ + adjustTimeout: (t) => jest.setTimeout(t), + settings: { + es: { + license: 'trial', + }, + kbn: {}, + }, + }); + + esServer = await startES(); + const startKibana = async () => { + const root = kbnTestServer.createRootWithCorePlugins( + { + xpack: { + fleet: { + registryUrl, + agentPolicies: [ + { + name: 'Second preconfigured policy', + description: 'second policy', + is_default: false, + is_managed: true, + id: 'test-456789', + namespace: 'default', + monitoring_enabled: [], + package_policies: [], + }, + ], + }, + }, + logging: { + appenders: { + file: { + type: 'file', + fileName: logFilePath, + layout: { + type: 'json', + }, + }, + }, + loggers: [ + { + name: 'root', + appenders: ['file'], + }, + { + name: 'plugins.fleet', + level: 'info', + }, + ], + }, + }, + { oss: false } + ); + + await root.preboot(); + const coreSetup = await root.setup(); + const coreStart = await root.start(); + + return { + root, + coreSetup, + coreStart, + stop: async () => await root.shutdown(), + }; + }; + kbnServer = await startKibana(); + await waitForFleetSetup(kbnServer.root); + }; + + const stopServers = async () => { + if (kbnServer) { + await kbnServer.stop(); + } + + if (esServer) { + await esServer.stop(); + } + + await new Promise((res) => setTimeout(res, 10000)); + }; + + beforeAll(async () => { + await startServers(); + + const esClient = kbnServer.coreStart.elasticsearch.client.asInternalUser; + await esClient.bulk({ + index: '.fleet-agents', + body: [ + { + create: { + _id: 'agent1', + }, + }, + { + agent: { + version: '8.6.0', + }, + last_checkin_status: 'error', + last_checkin: '2022-11-21T12:26:24Z', + active: true, + policy_id: 'policy1', + }, + { + create: { + _id: 'agent2', + }, + }, + { + agent: { + version: '8.5.1', + }, + last_checkin_status: 'degraded', + last_checkin: '2022-11-21T12:27:24Z', + active: true, + policy_id: 'policy1', + }, + { + create: { + _id: 'inactive', + }, + }, + { + agent: { + version: '8.5.1', + }, + last_checkin_status: 'online', + last_checkin: '2021-11-21T12:27:24Z', + active: false, + policy_id: 'policy1', + }, + ], + refresh: 'wait_for', + }); + + await esClient.create({ + index: '.fleet-policies', + id: 'policy1', + body: { + data: { + id: 'fleet-server-policy', + outputs: { + default: { + type: 'elasticsearch', + }, + }, + }, + }, + refresh: 'wait_for', + }); + + const soClient = kbnServer.coreStart.savedObjects.createInternalRepository(); + await soClient.create('ingest-package-policies', { + name: 'fleet_server-1', + namespace: 'default', + package: { + name: 'fleet_server', + title: 'Fleet Server', + version: '1.2.0', + }, + enabled: true, + policy_id: 'fleet-server-policy', + inputs: [ + { + compiled_input: { + server: { + port: 8220, + host: '0.0.0.0', + 'limits.max_agents': 3000, + other: 'other', + }, + 'server.runtime': 'gc_percent:20', + ssl: 'ssl', + }, + }, + ], + }); + }); + + afterAll(async () => { + await stopServers(); + }); + + beforeEach(() => { + core = { getStartServices: jest.fn().mockResolvedValue([kbnServer.coreStart]) }; + }); + + it('should fetch usage telemetry', async () => { + const usage = await fetchFleetUsage(core, { agents: { enabled: true } }, new AbortController()); + + expect(usage).toEqual( + expect.objectContaining({ + agents_enabled: true, + agents: { + total_enrolled: 2, + healthy: 0, + unhealthy: 0, + offline: 2, + total_all_statuses: 3, + updating: 0, + }, + fleet_server: { + total_all_statuses: 0, + total_enrolled: 0, + healthy: 0, + unhealthy: 0, + offline: 0, + updating: 0, + num_host_urls: 0, + }, + packages: [], + agent_versions: ['8.5.1', '8.6.0'], + agent_checkin_status: { error: 1, degraded: 1 }, + agents_per_policy: [2], + fleet_server_config: { + policies: [ + { + input_config: { + server: { + 'limits.max_agents': 3000, + }, + 'server.runtime': 'gc_percent:20', + }, + }, + ], + }, + agent_policies: { count: 3, output_types: ['elasticsearch'] }, + }) + ); + }); +}); diff --git a/x-pack/plugins/fleet/server/plugin.ts b/x-pack/plugins/fleet/server/plugin.ts index 58043a2d3203b8..cadb7859cc2e5d 100644 --- a/x-pack/plugins/fleet/server/plugin.ts +++ b/x-pack/plugins/fleet/server/plugin.ts @@ -88,7 +88,11 @@ import { AgentServiceImpl, PackageServiceImpl, } from './services'; -import { registerFleetUsageCollector, fetchUsage, fetchAgentsUsage } from './collectors/register'; +import { + registerFleetUsageCollector, + fetchAgentsUsage, + fetchFleetUsage, +} from './collectors/register'; import { getAuthzFromRequest, makeRouterWithFleetAuthz } from './routes/security'; import { FleetArtifactsClient } from './services/artifacts'; import type { FleetRouter } from './types/request_context'; @@ -370,14 +374,9 @@ export class FleetPlugin // Register usage collection registerFleetUsageCollector(core, config, deps.usageCollection); - const fetch = async () => fetchUsage(core, config); - this.fleetUsageSender = new FleetUsageSender( - deps.taskManager, - core, - fetch, - this.kibanaVersion, - this.isProductionMode - ); + const fetch = async (abortController: AbortController) => + await fetchFleetUsage(core, config, abortController); + this.fleetUsageSender = new FleetUsageSender(deps.taskManager, core, fetch); registerFleetUsageLogger(deps.taskManager, async () => fetchAgentsUsage(core, config)); const router: FleetRouter = core.http.createRouter(); diff --git a/x-pack/plugins/fleet/server/services/fleet_usage_sender.ts b/x-pack/plugins/fleet/server/services/fleet_usage_sender.ts deleted file mode 100644 index ada764fcff9278..00000000000000 --- a/x-pack/plugins/fleet/server/services/fleet_usage_sender.ts +++ /dev/null @@ -1,187 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ -import type { - ConcreteTaskInstance, - TaskManagerStartContract, - TaskManagerSetupContract, -} from '@kbn/task-manager-plugin/server'; -import type { CoreSetup } from '@kbn/core/server'; - -import type { Usage } from '../collectors/register'; - -import { appContextService } from './app_context'; - -const EVENT_TYPE = 'fleet_usage'; - -export class FleetUsageSender { - private taskManager?: TaskManagerStartContract; - private taskId = 'Fleet-Usage-Sender-Task'; - private taskType = 'Fleet-Usage-Sender'; - - constructor( - taskManager: TaskManagerSetupContract, - core: CoreSetup, - fetchUsage: () => Promise, - kibanaVersion: string, - isProductionMode: boolean - ) { - taskManager.registerTaskDefinitions({ - [this.taskType]: { - title: 'Fleet Usage Sender', - timeout: '1m', - maxAttempts: 1, - createTaskRunner: ({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => { - return { - async run() { - appContextService.getLogger().info('Running Fleet Usage telemetry send task'); - - try { - const usageData = await fetchUsage(); - appContextService.getLogger().debug(JSON.stringify(usageData)); - core.analytics.reportEvent(EVENT_TYPE, usageData); - } catch (error) { - appContextService - .getLogger() - .error('Error occurred while sending Fleet Usage telemetry: ' + error); - } - }, - - async cancel() {}, - }; - }, - }, - }); - this.registerTelemetryEventType(core); - } - - public async start(taskManager: TaskManagerStartContract) { - this.taskManager = taskManager; - - appContextService.getLogger().info(`Task ${this.taskId} scheduled with interval 1h`); - await this.taskManager?.ensureScheduled({ - id: this.taskId, - taskType: this.taskType, - schedule: { - interval: '1h', - }, - scope: ['fleet'], - state: {}, - params: {}, - }); - } - - /** - * took schema from [here](https://github.com/elastic/kibana/blob/main/x-pack/plugins/fleet/server/collectors/register.ts#L53) and adapted to EBT format - */ - private registerTelemetryEventType(core: CoreSetup): void { - core.analytics.registerEventType({ - eventType: EVENT_TYPE, - schema: { - agents_enabled: { type: 'boolean', _meta: { description: 'agents enabled' } }, - agents: { - properties: { - total_enrolled: { - type: 'long', - _meta: { - description: 'The total number of enrolled agents, in any state', - }, - }, - healthy: { - type: 'long', - _meta: { - description: 'The total number of enrolled agents in a healthy state', - }, - }, - unhealthy: { - type: 'long', - _meta: { - description: 'The total number of enrolled agents in an unhealthy state', - }, - }, - updating: { - type: 'long', - _meta: { - description: 'The total number of enrolled agents in an updating state', - }, - }, - offline: { - type: 'long', - _meta: { - description: 'The total number of enrolled agents currently offline', - }, - }, - total_all_statuses: { - type: 'long', - _meta: { - description: 'The total number of agents in any state, both enrolled and inactive', - }, - }, - }, - }, - fleet_server: { - properties: { - total_enrolled: { - type: 'long', - _meta: { - description: 'The total number of enrolled Fleet Server agents, in any state', - }, - }, - total_all_statuses: { - type: 'long', - _meta: { - description: - 'The total number of Fleet Server agents in any state, both enrolled and inactive.', - }, - }, - healthy: { - type: 'long', - _meta: { - description: 'The total number of enrolled Fleet Server agents in a healthy state.', - }, - }, - unhealthy: { - type: 'long', - _meta: { - description: - 'The total number of enrolled Fleet Server agents in an unhealthy state', - }, - }, - updating: { - type: 'long', - _meta: { - description: - 'The total number of enrolled Fleet Server agents in an updating state', - }, - }, - offline: { - type: 'long', - _meta: { - description: 'The total number of enrolled Fleet Server agents currently offline', - }, - }, - num_host_urls: { - type: 'long', - _meta: { - description: 'The number of Fleet Server hosts configured in Fleet settings.', - }, - }, - }, - }, - packages: { - type: 'array', - items: { - properties: { - name: { type: 'keyword' }, - version: { type: 'keyword' }, - enabled: { type: 'boolean' }, - }, - }, - }, - }, - }); - } -} diff --git a/x-pack/plugins/fleet/server/services/index.ts b/x-pack/plugins/fleet/server/services/index.ts index 6078e2696f3b91..133d5cf88d71c7 100644 --- a/x-pack/plugins/fleet/server/services/index.ts +++ b/x-pack/plugins/fleet/server/services/index.ts @@ -63,4 +63,4 @@ export type { PackageService, PackageClient } from './epm'; // Fleet server policy config export { migrateSettingsToFleetServerHost } from './fleet_server_host'; -export { FleetUsageSender } from './fleet_usage_sender'; +export { FleetUsageSender } from './telemetry/fleet_usage_sender'; diff --git a/x-pack/plugins/fleet/server/services/telemetry/fleet_usage_sender.ts b/x-pack/plugins/fleet/server/services/telemetry/fleet_usage_sender.ts new file mode 100644 index 00000000000000..6e788e58fde578 --- /dev/null +++ b/x-pack/plugins/fleet/server/services/telemetry/fleet_usage_sender.ts @@ -0,0 +1,132 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import type { + ConcreteTaskInstance, + TaskManagerStartContract, + TaskManagerSetupContract, +} from '@kbn/task-manager-plugin/server'; +import { throwUnrecoverableError } from '@kbn/task-manager-plugin/server'; +import type { CoreSetup } from '@kbn/core/server'; +import { withSpan } from '@kbn/apm-utils'; + +import type { Usage } from '../../collectors/register'; + +import { appContextService } from '../app_context'; + +import { fleetUsagesSchema } from './fleet_usages_schema'; + +const EVENT_TYPE = 'fleet_usage'; + +export class FleetUsageSender { + private taskManager?: TaskManagerStartContract; + private taskVersion = '1.0.0'; + private taskType = 'Fleet-Usage-Sender'; + private wasStarted: boolean = false; + private interval = '1h'; + private timeout = '1m'; + private abortController = new AbortController(); + + constructor( + taskManager: TaskManagerSetupContract, + core: CoreSetup, + fetchUsage: (abortController: AbortController) => Promise + ) { + taskManager.registerTaskDefinitions({ + [this.taskType]: { + title: 'Fleet Usage Sender', + timeout: this.timeout, + maxAttempts: 1, + createTaskRunner: ({ taskInstance }: { taskInstance: ConcreteTaskInstance }) => { + return { + run: async () => { + return withSpan({ name: this.taskType, type: 'telemetry' }, () => + this.runTask(taskInstance, core, () => fetchUsage(this.abortController)) + ); + }, + + cancel: async () => { + this.abortController.abort('task timed out'); + }, + }; + }, + }, + }); + this.registerTelemetryEventType(core); + } + + private runTask = async ( + taskInstance: ConcreteTaskInstance, + core: CoreSetup, + fetchUsage: () => Promise + ) => { + if (!this.wasStarted) { + appContextService.getLogger().debug('[runTask()] Aborted. Task not started yet'); + return; + } + // Check that this task is current + if (taskInstance.id !== this.taskId) { + throwUnrecoverableError(new Error('Outdated task version for task: ' + taskInstance.id)); + return; + } + appContextService.getLogger().info('Running Fleet Usage telemetry send task'); + + try { + const usageData = await fetchUsage(); + if (!usageData) { + return; + } + appContextService.getLogger().debug(JSON.stringify(usageData)); + core.analytics.reportEvent(EVENT_TYPE, usageData); + } catch (error) { + appContextService + .getLogger() + .error('Error occurred while sending Fleet Usage telemetry: ' + error); + } + }; + + private get taskId() { + return `${this.taskType}-${this.taskVersion}`; + } + + public async start(taskManager: TaskManagerStartContract) { + this.taskManager = taskManager; + + if (!taskManager) { + appContextService.getLogger().error('missing required service during start'); + return; + } + + this.wasStarted = true; + + try { + appContextService.getLogger().info(`Task ${this.taskId} scheduled with interval 1h`); + + await this.taskManager.ensureScheduled({ + id: this.taskId, + taskType: this.taskType, + schedule: { + interval: this.interval, + }, + scope: ['fleet'], + state: {}, + params: {}, + }); + } catch (e) { + appContextService.getLogger().error(`Error scheduling task, received error: ${e}`); + } + } + + /** + * took schema from [here](https://github.com/elastic/kibana/blob/main/x-pack/plugins/fleet/server/collectors/register.ts#L53) and adapted to EBT format + */ + private registerTelemetryEventType(core: CoreSetup): void { + core.analytics.registerEventType({ + eventType: EVENT_TYPE, + schema: fleetUsagesSchema, + }); + } +} diff --git a/x-pack/plugins/fleet/server/services/telemetry/fleet_usages_schema.ts b/x-pack/plugins/fleet/server/services/telemetry/fleet_usages_schema.ts new file mode 100644 index 00000000000000..9eeb867bd9b91a --- /dev/null +++ b/x-pack/plugins/fleet/server/services/telemetry/fleet_usages_schema.ts @@ -0,0 +1,168 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import type { RootSchema } from '@kbn/analytics-client'; + +export const fleetUsagesSchema: RootSchema = { + agents_enabled: { type: 'boolean', _meta: { description: 'agents enabled' } }, + agents: { + properties: { + total_enrolled: { + type: 'long', + _meta: { + description: 'The total number of enrolled agents, in any state', + }, + }, + healthy: { + type: 'long', + _meta: { + description: 'The total number of enrolled agents in a healthy state', + }, + }, + unhealthy: { + type: 'long', + _meta: { + description: 'The total number of enrolled agents in an unhealthy state', + }, + }, + updating: { + type: 'long', + _meta: { + description: 'The total number of enrolled agents in an updating state', + }, + }, + offline: { + type: 'long', + _meta: { + description: 'The total number of enrolled agents currently offline', + }, + }, + total_all_statuses: { + type: 'long', + _meta: { + description: 'The total number of agents in any state, both enrolled and inactive', + }, + }, + }, + }, + fleet_server: { + properties: { + total_enrolled: { + type: 'long', + _meta: { + description: 'The total number of enrolled Fleet Server agents, in any state', + }, + }, + total_all_statuses: { + type: 'long', + _meta: { + description: + 'The total number of Fleet Server agents in any state, both enrolled and inactive.', + }, + }, + healthy: { + type: 'long', + _meta: { + description: 'The total number of enrolled Fleet Server agents in a healthy state.', + }, + }, + unhealthy: { + type: 'long', + _meta: { + description: 'The total number of enrolled Fleet Server agents in an unhealthy state', + }, + }, + updating: { + type: 'long', + _meta: { + description: 'The total number of enrolled Fleet Server agents in an updating state', + }, + }, + offline: { + type: 'long', + _meta: { + description: 'The total number of enrolled Fleet Server agents currently offline', + }, + }, + num_host_urls: { + type: 'long', + _meta: { + description: 'The number of Fleet Server hosts configured in Fleet settings.', + }, + }, + }, + }, + packages: { + type: 'array', + items: { + properties: { + name: { type: 'keyword' }, + version: { type: 'keyword' }, + enabled: { type: 'boolean' }, + }, + }, + }, + agent_versions: { + type: 'array', + items: { + type: 'keyword', + _meta: { description: 'The agent versions enrolled in this deployment.' }, + }, + }, + agents_per_policy: { + type: 'array', + items: { + type: 'long', + _meta: { description: 'Agent counts enrolled per agent policy.' }, + }, + }, + fleet_server_config: { + properties: { + policies: { + type: 'array', + items: { + properties: { + input_config: { type: 'pass_through' }, + }, + }, + }, + }, + }, + agent_policies: { + properties: { + count: { + type: 'long', + _meta: { + description: 'Number of agent policies', + }, + }, + output_types: { + type: 'array', + items: { + type: 'keyword', + _meta: { description: 'Output types of agent policies' }, + }, + }, + }, + }, + agent_checkin_status: { + properties: { + error: { + type: 'long', + _meta: { + description: 'Count of agent last checkin status error', + }, + }, + degraded: { + type: 'long', + _meta: { + description: 'Count of agent last checkin status degraded', + }, + }, + }, + }, +};