diff --git a/packages/elastic-apm-synthtrace/src/lib/apm/client/apm_synthtrace_es_client.ts b/packages/elastic-apm-synthtrace/src/lib/apm/client/apm_synthtrace_es_client.ts index 5340d858bedbd..ed8ff82d51624 100644 --- a/packages/elastic-apm-synthtrace/src/lib/apm/client/apm_synthtrace_es_client.ts +++ b/packages/elastic-apm-synthtrace/src/lib/apm/client/apm_synthtrace_es_client.ts @@ -40,9 +40,7 @@ export class ApmSynthtraceEsClient { async runningVersion() { const info = await this.client.info(); - console.log(info.version); return info.version.number; - } async clean() { @@ -90,33 +88,35 @@ export class ApmSynthtraceEsClient { async registerGcpRepository(connectionString: string) { // :[:base_path] - let [clientName, bucket, basePath] = connectionString.split(':'); - if (!clientName) throw new Error(`client name is mandatory for gcp repostitory registration: ${connectionString}`) - if (!bucket) throw new Error(`bucket is mandatory for gcp repostitory registration: ${connectionString}`) + const [clientName, bucket, basePath] = connectionString.split(':'); + if (!clientName) + throw new Error( + `client name is mandatory for gcp repostitory registration: ${connectionString}` + ); + if (!bucket) + throw new Error(`bucket is mandatory for gcp repostitory registration: ${connectionString}`); const name = `gcp-repository-${clientName}`; - this.logger.info(`Registering gcp repository ${name}`) + this.logger.info(`Registering gcp repository ${name}`); const putRepository = await this.client.snapshot.createRepository({ - name: name, + name, type: 'gcs', settings: { // @ts-ignore // missing from es types - 'bucket': bucket, + bucket, client: clientName, - base_path: basePath - } - }) + base_path: basePath, + }, + }); this.logger.info(putRepository); - this.logger.info(`Verifying gcp repository ${name}`) - const verifyRepository = await this.client.snapshot.verifyRepository({ name: name }) + this.logger.info(`Verifying gcp repository ${name}`); + const verifyRepository = await this.client.snapshot.verifyRepository({ name }); this.logger.info(verifyRepository); } - async refresh() { - const writeTargets = await this.getWriteTargets(); const indices = Object.values(writeTargets); diff --git a/packages/elastic-apm-synthtrace/src/lib/apm/client/apm_synthtrace_kibana_client.ts b/packages/elastic-apm-synthtrace/src/lib/apm/client/apm_synthtrace_kibana_client.ts index 1f9d26b2ab075..a3161c74b1910 100644 --- a/packages/elastic-apm-synthtrace/src/lib/apm/client/apm_synthtrace_kibana_client.ts +++ b/packages/elastic-apm-synthtrace/src/lib/apm/client/apm_synthtrace_kibana_client.ts @@ -42,35 +42,34 @@ export class ApmSynthtraceKibanaClient { } async discoverLocalKibana() { - return await fetch("http://localhost:5601", { method: "HEAD", follow: 1, redirect: "manual"}) - .then((res) => { - const kibanaUrl = res.headers.get('location'); - this.logger.info(`Discovered local kibana running at: ${kibanaUrl}`); - return kibanaUrl; - }) + return await fetch('http://localhost:5601', { + method: 'HEAD', + follow: 1, + redirect: 'manual', + }).then((res) => { + const kibanaUrl = res.headers.get('location'); + this.logger.info(`Discovered local kibana running at: ${kibanaUrl}`); + return kibanaUrl; + }); } - async installApmPackage(kibanaUrl: string, version:string, username: string, password: string) { - const response = await fetch( - kibanaUrl + '/api/fleet/epm/packages/apm/' + version, - { - method: 'POST', - headers: { - Authorization: 'Basic ' + Buffer.from(username + ':' + password).toString('base64'), - Accept: 'application/json', - 'Content-Type': 'application/json', - 'kbn-xsrf': 'kibana', - }, - body: '{"force":true}' - } - ); + async installApmPackage(kibanaUrl: string, version: string, username: string, password: string) { + const response = await fetch(kibanaUrl + '/api/fleet/epm/packages/apm/' + version, { + method: 'POST', + headers: { + Authorization: 'Basic ' + Buffer.from(username + ':' + password).toString('base64'), + Accept: 'application/json', + 'Content-Type': 'application/json', + 'kbn-xsrf': 'kibana', + }, + body: '{"force":true}', + }); const responseJson = await response.json(); if (responseJson.statusCode) { - throw Error(`unable to install apm package ${version}`) + throw Error(`unable to install apm package ${version}`); } if (responseJson.items) { this.logger.info(`Installed apm package ${version}`); - } - else this.logger.error(responseJson) + } else this.logger.error(responseJson); } } diff --git a/packages/elastic-apm-synthtrace/src/lib/stream_processor.ts b/packages/elastic-apm-synthtrace/src/lib/stream_processor.ts index 565a2d42733ae..6c4cb93c7c713 100644 --- a/packages/elastic-apm-synthtrace/src/lib/stream_processor.ts +++ b/packages/elastic-apm-synthtrace/src/lib/stream_processor.ts @@ -45,8 +45,8 @@ export class StreamProcessor { ? parseInterval(this.options.flushInterval) : parseInterval('1m'); this.name = this.options?.name ?? 'StreamProcessor'; - this.version = this.options.version ?? "8.0.0"; - this.versionMajor = Number.parseInt(this.version.split('.')[0]); + this.version = this.options.version ?? '8.0.0'; + this.versionMajor = Number.parseInt(this.version.split('.')[0], 10); } private readonly intervalAmount: number; private readonly intervalUnit: any; @@ -55,7 +55,7 @@ export class StreamProcessor { private readonly versionMajor: number; // TODO move away from chunking and feed this data one by one to processors - *stream(...eventSources: Array>) : Generator { + *stream(...eventSources: Array>): Generator { const maxBufferSize = this.options.maxBufferSize ?? StreamProcessor.defaultFlushInterval; const maxSourceEvents = this.options.maxSourceEvents; let localBuffer = []; @@ -97,7 +97,9 @@ export class StreamProcessor { `${this.name} flush ${localBuffer.length} documents ${order}: ${e} => ${f}` ); for (const processor of this.options.processors) { - yield* processor(localBuffer).map((d) => StreamProcessor.enrich(d, this.version, this.versionMajor)); + yield* processor(localBuffer).map((d) => + StreamProcessor.enrich(d, this.version, this.versionMajor) + ); } localBuffer = []; flushAfter = this.calculateFlushAfter(flushAfter, order); @@ -115,7 +117,9 @@ export class StreamProcessor { `${this.name} processing remaining buffer: ${localBuffer.length} items left` ); for (const processor of this.options.processors) { - yield* processor(localBuffer).map((d) => StreamProcessor.enrich(d, this.version, this.versionMajor)); + yield* processor(localBuffer).map((d) => + StreamProcessor.enrich(d, this.version, this.versionMajor) + ); } this.options.processedCallback?.apply(this, [localBuffer.length]); } @@ -153,11 +157,11 @@ export class StreamProcessor { return Array.from(this.stream(...eventSources)); } - private static enrich(document: ApmFields, version: string, versionMajor:number): ApmFields { + private static enrich(document: ApmFields, version: string, versionMajor: number): ApmFields { // see https://github.com/elastic/apm-server/issues/7088 can not be provided as flat key/values document.observer = { version: version ?? '8.2.0', - version_major: versionMajor, + version_major: versionMajor, }; document['service.node.name'] = document['service.node.name'] || document['container.id'] || document['host.name']; diff --git a/packages/elastic-apm-synthtrace/src/scripts/examples/02_kibana_stats.ts b/packages/elastic-apm-synthtrace/src/scripts/examples/02_kibana_stats.ts index 716ed1a38f214..2b33607c8a3cb 100644 --- a/packages/elastic-apm-synthtrace/src/scripts/examples/02_kibana_stats.ts +++ b/packages/elastic-apm-synthtrace/src/scripts/examples/02_kibana_stats.ts @@ -15,9 +15,10 @@ import { ApmFields } from '../../lib/apm/apm_fields'; const scenario: Scenario = async (runOptions: RunOptions) => { const logger = getLogger(runOptions); - if (!runOptions.writeTarget) { - throw new Error('Write target is not defined'); - } + // TODO reintroduce overwrite + // if (!runOptions.writeTarget) { + // throw new Error('Write target is not defined'); + // } return { generate: ({ from, to }) => { diff --git a/packages/elastic-apm-synthtrace/src/scripts/run.ts b/packages/elastic-apm-synthtrace/src/scripts/run.ts index cf1394361ece5..1c44ec516427b 100644 --- a/packages/elastic-apm-synthtrace/src/scripts/run.ts +++ b/packages/elastic-apm-synthtrace/src/scripts/run.ts @@ -43,12 +43,12 @@ function options(y: Argv) { .option('username', { describe: 'Basic authentication username', string: true, - default: "elastic", + default: 'elastic', }) .option('password', { describe: 'Basic authentication password', string: true, - default: "changeme", + default: 'changeme', }) .option('from', { description: 'The start of the time window', @@ -107,7 +107,8 @@ function options(y: Argv) { }, }) .option('gcpRepository', { - describe: 'Allows you to register a GCP repository in :[:base_path] format', + describe: + 'Allows you to register a GCP repository in :[:base_path] format', string: true, }) @@ -122,82 +123,92 @@ function options(y: Argv) { export type RunCliFlags = ReturnType['argv']; yargs(process.argv.slice(2)) - .command('*', 'Generate data and index into Elasticsearch', options, async (argv: RunCliFlags) => { - if (argv.local) { - argv.target = "http://localhost:9200"; - } + .command( + '*', + 'Generate data and index into Elasticsearch', + options, + async (argv: RunCliFlags) => { + if (argv.local) { + argv.target = 'http://localhost:9200'; + } - const runOptions = parseRunCliFlags(argv); + const runOptions = parseRunCliFlags(argv); - const { logger, apmEsClient } = getCommonServices(runOptions); + const { logger, apmEsClient } = getCommonServices(runOptions); - const toMs = datemath.parse(String(argv.to ?? 'now'))!.valueOf(); - const to = new Date(toMs); - const defaultTimeRange = !runOptions.maxDocs ? '15m' : '520w'; - const fromMs = argv.from - ? datemath.parse(String(argv.from))!.valueOf() - : toMs - intervalToMs(defaultTimeRange); - const from = new Date(fromMs); + const toMs = datemath.parse(String(argv.to ?? 'now'))!.valueOf(); + const to = new Date(toMs); + const defaultTimeRange = !runOptions.maxDocs ? '15m' : '520w'; + const fromMs = argv.from + ? datemath.parse(String(argv.from))!.valueOf() + : toMs - intervalToMs(defaultTimeRange); + const from = new Date(fromMs); - const live = argv.live; + const live = argv.live; - if (runOptions.dryRun) { - await startHistoricalDataUpload(apmEsClient, logger, runOptions, from, to, "8.0.0"); - return; - } + if (runOptions.dryRun) { + await startHistoricalDataUpload(apmEsClient, logger, runOptions, from, to, '8.0.0'); + return; + } - // we need to know the running version to generate events that satisfy the min version requirements - let version = await apmEsClient.runningVersion(); - logger.info(`Discovered Elasticsearch running version: ${version}`); - version = version.replace("-SNAPSHOT", ""); - - // We automatically set up managed APM either by migrating on cloud or installing the package locally - if (runOptions.cloudId || argv.local || argv.kibana) { - const kibanaClient = new ApmSynthtraceKibanaClient(logger); - if (runOptions.cloudId) { - await kibanaClient.migrateCloudToManagedApm( - runOptions.cloudId, - runOptions.username, - runOptions.password - ); - } else { - let kibanaUrl: string | null = argv.kibana ?? null; - if (argv.local) { - kibanaUrl = await kibanaClient.discoverLocalKibana(); + // we need to know the running version to generate events that satisfy the min version requirements + let version = await apmEsClient.runningVersion(); + logger.info(`Discovered Elasticsearch running version: ${version}`); + version = version.replace('-SNAPSHOT', ''); + + // We automatically set up managed APM either by migrating on cloud or installing the package locally + if (runOptions.cloudId || argv.local || argv.kibana) { + const kibanaClient = new ApmSynthtraceKibanaClient(logger); + if (runOptions.cloudId) { + await kibanaClient.migrateCloudToManagedApm( + runOptions.cloudId, + runOptions.username, + runOptions.password + ); + } else { + let kibanaUrl: string | null = argv.kibana ?? null; + if (argv.local) { + kibanaUrl = await kibanaClient.discoverLocalKibana(); + } + if (!kibanaUrl) throw Error('kibanaUrl could not be determined'); + await kibanaClient.installApmPackage( + kibanaUrl, + version, + runOptions.username, + runOptions.password + ); } - if (!kibanaUrl) throw Error("kibanaUrl could not be determined") - await kibanaClient.installApmPackage(kibanaUrl, version, runOptions.username, runOptions.password); } - } - if (runOptions.cloudId && runOptions.numShards && runOptions.numShards > 0) { - await apmEsClient.updateComponentTemplates(runOptions.numShards); - } + if (runOptions.cloudId && runOptions.numShards && runOptions.numShards > 0) { + await apmEsClient.updateComponentTemplates(runOptions.numShards); + } - if (argv.clean) { - await apmEsClient.clean(); - } - if (runOptions.gcpRepository) { - await apmEsClient.registerGcpRepository(runOptions.gcpRepository); - } + if (argv.clean) { + await apmEsClient.clean(); + } + if (runOptions.gcpRepository) { + await apmEsClient.registerGcpRepository(runOptions.gcpRepository); + } - logger.info( - `Starting data generation\n: ${JSON.stringify( - { - ...runOptions, - from: from.toISOString(), - to: to.toISOString(), - }, - null, - 2 - )}` - ); - - if (runOptions.maxDocs !== 0) - await startHistoricalDataUpload(apmEsClient, logger, runOptions, from, to, version); - - if (live) { - await startLiveDataUpload(apmEsClient, logger, runOptions, to, version); + logger.info( + `Starting data generation\n: ${JSON.stringify( + { + ...runOptions, + from: from.toISOString(), + to: to.toISOString(), + }, + null, + 2 + )}` + ); + + if (runOptions.maxDocs !== 0) + await startHistoricalDataUpload(apmEsClient, logger, runOptions, from, to, version); + + if (live) { + await startLiveDataUpload(apmEsClient, logger, runOptions, to, version); + } } - }) + ) .parse();