Skip to content

Commit

Permalink
fix linter errors
Browse files Browse the repository at this point in the history
  • Loading branch information
Mpdreamz committed Mar 8, 2022
1 parent f1f1cc2 commit 5c4f064
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 117 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ export class ApmSynthtraceEsClient {

async runningVersion() {
const info = await this.client.info();
console.log(info.version);
return info.version.number;

}

async clean() {
Expand Down Expand Up @@ -90,33 +88,35 @@ export class ApmSynthtraceEsClient {

async registerGcpRepository(connectionString: string) {
// <client_name>:<bucket>[:base_path]
let [clientName, bucket, basePath] = connectionString.split(':');
if (!clientName) throw new Error(`client name is mandatory for gcp repostitory registration: ${connectionString}`)
if (!bucket) throw new Error(`bucket is mandatory for gcp repostitory registration: ${connectionString}`)
const [clientName, bucket, basePath] = connectionString.split(':');
if (!clientName)
throw new Error(
`client name is mandatory for gcp repostitory registration: ${connectionString}`
);
if (!bucket)
throw new Error(`bucket is mandatory for gcp repostitory registration: ${connectionString}`);

const name = `gcp-repository-${clientName}`;
this.logger.info(`Registering gcp repository ${name}`)
this.logger.info(`Registering gcp repository ${name}`);
const putRepository = await this.client.snapshot.createRepository({
name: name,
name,
type: 'gcs',
settings: {
// @ts-ignore
// missing from es types
'bucket': bucket,
bucket,
client: clientName,
base_path: basePath
}
})
base_path: basePath,
},
});
this.logger.info(putRepository);

this.logger.info(`Verifying gcp repository ${name}`)
const verifyRepository = await this.client.snapshot.verifyRepository({ name: name })
this.logger.info(`Verifying gcp repository ${name}`);
const verifyRepository = await this.client.snapshot.verifyRepository({ name });
this.logger.info(verifyRepository);
}


async refresh() {

const writeTargets = await this.getWriteTargets();

const indices = Object.values(writeTargets);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,35 +42,34 @@ export class ApmSynthtraceKibanaClient {
}

async discoverLocalKibana() {
return await fetch("http://localhost:5601", { method: "HEAD", follow: 1, redirect: "manual"})
.then((res) => {
const kibanaUrl = res.headers.get('location');
this.logger.info(`Discovered local kibana running at: ${kibanaUrl}`);
return kibanaUrl;
})
return await fetch('http://localhost:5601', {
method: 'HEAD',
follow: 1,
redirect: 'manual',
}).then((res) => {
const kibanaUrl = res.headers.get('location');
this.logger.info(`Discovered local kibana running at: ${kibanaUrl}`);
return kibanaUrl;
});
}

async installApmPackage(kibanaUrl: string, version:string, username: string, password: string) {
const response = await fetch(
kibanaUrl + '/api/fleet/epm/packages/apm/' + version,
{
method: 'POST',
headers: {
Authorization: 'Basic ' + Buffer.from(username + ':' + password).toString('base64'),
Accept: 'application/json',
'Content-Type': 'application/json',
'kbn-xsrf': 'kibana',
},
body: '{"force":true}'
}
);
async installApmPackage(kibanaUrl: string, version: string, username: string, password: string) {
const response = await fetch(kibanaUrl + '/api/fleet/epm/packages/apm/' + version, {
method: 'POST',
headers: {
Authorization: 'Basic ' + Buffer.from(username + ':' + password).toString('base64'),
Accept: 'application/json',
'Content-Type': 'application/json',
'kbn-xsrf': 'kibana',
},
body: '{"force":true}',
});
const responseJson = await response.json();
if (responseJson.statusCode) {
throw Error(`unable to install apm package ${version}`)
throw Error(`unable to install apm package ${version}`);
}
if (responseJson.items) {
this.logger.info(`Installed apm package ${version}`);
}
else this.logger.error(responseJson)
} else this.logger.error(responseJson);
}
}
18 changes: 11 additions & 7 deletions packages/elastic-apm-synthtrace/src/lib/stream_processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ export class StreamProcessor<TFields extends Fields = ApmFields> {
? parseInterval(this.options.flushInterval)
: parseInterval('1m');
this.name = this.options?.name ?? 'StreamProcessor';
this.version = this.options.version ?? "8.0.0";
this.versionMajor = Number.parseInt(this.version.split('.')[0]);
this.version = this.options.version ?? '8.0.0';
this.versionMajor = Number.parseInt(this.version.split('.')[0], 10);
}
private readonly intervalAmount: number;
private readonly intervalUnit: any;
Expand All @@ -55,7 +55,7 @@ export class StreamProcessor<TFields extends Fields = ApmFields> {
private readonly versionMajor: number;

// TODO move away from chunking and feed this data one by one to processors
*stream(...eventSources: Array<EntityIterable<TFields>>) : Generator<ApmFields, any, any> {
*stream(...eventSources: Array<EntityIterable<TFields>>): Generator<ApmFields, any, any> {
const maxBufferSize = this.options.maxBufferSize ?? StreamProcessor.defaultFlushInterval;
const maxSourceEvents = this.options.maxSourceEvents;
let localBuffer = [];
Expand Down Expand Up @@ -97,7 +97,9 @@ export class StreamProcessor<TFields extends Fields = ApmFields> {
`${this.name} flush ${localBuffer.length} documents ${order}: ${e} => ${f}`
);
for (const processor of this.options.processors) {
yield* processor(localBuffer).map((d) => StreamProcessor.enrich(d, this.version, this.versionMajor));
yield* processor(localBuffer).map((d) =>
StreamProcessor.enrich(d, this.version, this.versionMajor)
);
}
localBuffer = [];
flushAfter = this.calculateFlushAfter(flushAfter, order);
Expand All @@ -115,7 +117,9 @@ export class StreamProcessor<TFields extends Fields = ApmFields> {
`${this.name} processing remaining buffer: ${localBuffer.length} items left`
);
for (const processor of this.options.processors) {
yield* processor(localBuffer).map((d) => StreamProcessor.enrich(d, this.version, this.versionMajor));
yield* processor(localBuffer).map((d) =>
StreamProcessor.enrich(d, this.version, this.versionMajor)
);
}
this.options.processedCallback?.apply(this, [localBuffer.length]);
}
Expand Down Expand Up @@ -153,11 +157,11 @@ export class StreamProcessor<TFields extends Fields = ApmFields> {
return Array.from<ApmFields>(this.stream(...eventSources));
}

private static enrich(document: ApmFields, version: string, versionMajor:number): ApmFields {
private static enrich(document: ApmFields, version: string, versionMajor: number): ApmFields {
// see https://github.com/elastic/apm-server/issues/7088 can not be provided as flat key/values
document.observer = {
version: version ?? '8.2.0',
version_major: versionMajor,
version_major: versionMajor,
};
document['service.node.name'] =
document['service.node.name'] || document['container.id'] || document['host.name'];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,10 @@ import { ApmFields } from '../../lib/apm/apm_fields';
const scenario: Scenario<ApmFields> = async (runOptions: RunOptions) => {
const logger = getLogger(runOptions);

if (!runOptions.writeTarget) {
throw new Error('Write target is not defined');
}
// TODO reintroduce overwrite
// if (!runOptions.writeTarget) {
// throw new Error('Write target is not defined');
// }

return {
generate: ({ from, to }) => {
Expand Down
149 changes: 80 additions & 69 deletions packages/elastic-apm-synthtrace/src/scripts/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,12 @@ function options(y: Argv) {
.option('username', {
describe: 'Basic authentication username',
string: true,
default: "elastic",
default: 'elastic',
})
.option('password', {
describe: 'Basic authentication password',
string: true,
default: "changeme",
default: 'changeme',
})
.option('from', {
description: 'The start of the time window',
Expand Down Expand Up @@ -107,7 +107,8 @@ function options(y: Argv) {
},
})
.option('gcpRepository', {
describe: 'Allows you to register a GCP repository in <client_name>:<bucket>[:base_path] format',
describe:
'Allows you to register a GCP repository in <client_name>:<bucket>[:base_path] format',
string: true,
})

Expand All @@ -122,82 +123,92 @@ function options(y: Argv) {
export type RunCliFlags = ReturnType<typeof options>['argv'];

yargs(process.argv.slice(2))
.command('*', 'Generate data and index into Elasticsearch', options, async (argv: RunCliFlags) => {
if (argv.local) {
argv.target = "http://localhost:9200";
}
.command(
'*',
'Generate data and index into Elasticsearch',
options,
async (argv: RunCliFlags) => {
if (argv.local) {
argv.target = 'http://localhost:9200';
}

const runOptions = parseRunCliFlags(argv);
const runOptions = parseRunCliFlags(argv);

const { logger, apmEsClient } = getCommonServices(runOptions);
const { logger, apmEsClient } = getCommonServices(runOptions);

const toMs = datemath.parse(String(argv.to ?? 'now'))!.valueOf();
const to = new Date(toMs);
const defaultTimeRange = !runOptions.maxDocs ? '15m' : '520w';
const fromMs = argv.from
? datemath.parse(String(argv.from))!.valueOf()
: toMs - intervalToMs(defaultTimeRange);
const from = new Date(fromMs);
const toMs = datemath.parse(String(argv.to ?? 'now'))!.valueOf();
const to = new Date(toMs);
const defaultTimeRange = !runOptions.maxDocs ? '15m' : '520w';
const fromMs = argv.from
? datemath.parse(String(argv.from))!.valueOf()
: toMs - intervalToMs(defaultTimeRange);
const from = new Date(fromMs);

const live = argv.live;
const live = argv.live;

if (runOptions.dryRun) {
await startHistoricalDataUpload(apmEsClient, logger, runOptions, from, to, "8.0.0");
return;
}
if (runOptions.dryRun) {
await startHistoricalDataUpload(apmEsClient, logger, runOptions, from, to, '8.0.0');
return;
}

// we need to know the running version to generate events that satisfy the min version requirements
let version = await apmEsClient.runningVersion();
logger.info(`Discovered Elasticsearch running version: ${version}`);
version = version.replace("-SNAPSHOT", "");

// We automatically set up managed APM either by migrating on cloud or installing the package locally
if (runOptions.cloudId || argv.local || argv.kibana) {
const kibanaClient = new ApmSynthtraceKibanaClient(logger);
if (runOptions.cloudId) {
await kibanaClient.migrateCloudToManagedApm(
runOptions.cloudId,
runOptions.username,
runOptions.password
);
} else {
let kibanaUrl: string | null = argv.kibana ?? null;
if (argv.local) {
kibanaUrl = await kibanaClient.discoverLocalKibana();
// we need to know the running version to generate events that satisfy the min version requirements
let version = await apmEsClient.runningVersion();
logger.info(`Discovered Elasticsearch running version: ${version}`);
version = version.replace('-SNAPSHOT', '');

// We automatically set up managed APM either by migrating on cloud or installing the package locally
if (runOptions.cloudId || argv.local || argv.kibana) {
const kibanaClient = new ApmSynthtraceKibanaClient(logger);
if (runOptions.cloudId) {
await kibanaClient.migrateCloudToManagedApm(
runOptions.cloudId,
runOptions.username,
runOptions.password
);
} else {
let kibanaUrl: string | null = argv.kibana ?? null;
if (argv.local) {
kibanaUrl = await kibanaClient.discoverLocalKibana();
}
if (!kibanaUrl) throw Error('kibanaUrl could not be determined');
await kibanaClient.installApmPackage(
kibanaUrl,
version,
runOptions.username,
runOptions.password
);
}
if (!kibanaUrl) throw Error("kibanaUrl could not be determined")
await kibanaClient.installApmPackage(kibanaUrl, version, runOptions.username, runOptions.password);
}
}

if (runOptions.cloudId && runOptions.numShards && runOptions.numShards > 0) {
await apmEsClient.updateComponentTemplates(runOptions.numShards);
}
if (runOptions.cloudId && runOptions.numShards && runOptions.numShards > 0) {
await apmEsClient.updateComponentTemplates(runOptions.numShards);
}

if (argv.clean) {
await apmEsClient.clean();
}
if (runOptions.gcpRepository) {
await apmEsClient.registerGcpRepository(runOptions.gcpRepository);
}
if (argv.clean) {
await apmEsClient.clean();
}
if (runOptions.gcpRepository) {
await apmEsClient.registerGcpRepository(runOptions.gcpRepository);
}

logger.info(
`Starting data generation\n: ${JSON.stringify(
{
...runOptions,
from: from.toISOString(),
to: to.toISOString(),
},
null,
2
)}`
);

if (runOptions.maxDocs !== 0)
await startHistoricalDataUpload(apmEsClient, logger, runOptions, from, to, version);

if (live) {
await startLiveDataUpload(apmEsClient, logger, runOptions, to, version);
logger.info(
`Starting data generation\n: ${JSON.stringify(
{
...runOptions,
from: from.toISOString(),
to: to.toISOString(),
},
null,
2
)}`
);

if (runOptions.maxDocs !== 0)
await startHistoricalDataUpload(apmEsClient, logger, runOptions, from, to, version);

if (live) {
await startLiveDataUpload(apmEsClient, logger, runOptions, to, version);
}
}
})
)
.parse();

0 comments on commit 5c4f064

Please sign in to comment.