Skip to content

Commit

Permalink
[APM] Generate Service metrics with synthrace
Browse files Browse the repository at this point in the history
  • Loading branch information
kpatticha committed Sep 26, 2022
1 parent 4c95872 commit b460c80
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 21 deletions.
4 changes: 2 additions & 2 deletions packages/kbn-apm-synthtrace/src/cli/run_synthtrace.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import { parseRunCliFlags } from './utils/parse_run_cli_flags';
import { getCommonServices } from './utils/get_common_services';
import { ApmSynthtraceKibanaClient } from '../lib/apm/client/apm_synthtrace_kibana_client';
import { StreamAggregator } from '../lib/stream_aggregator';
import { ServiceLatencyAggregator } from '../lib/apm/aggregators/service_latency_aggregator';
import { ServicMetricsAggregator } from '../lib/apm/aggregators/service_metrics_aggregator';

function options(y: Argv) {
return y
Expand Down Expand Up @@ -207,7 +207,7 @@ export function runSynthtrace() {
}
const aggregators: StreamAggregator[] = [];
const registry = new Map<string, () => StreamAggregator[]>([
['service', () => [new ServiceLatencyAggregator()]],
['service', () => [new ServicMetricsAggregator()]],
]);
if (runOptions.streamProcessors && runOptions.streamProcessors.length > 0) {
for (const processorName of runOptions.streamProcessors) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import { StreamProcessor } from '../../lib/stream_processor';
import { Scenario } from '../scenario';
import { EntityIterable, Fields } from '../../..';
import { StreamAggregator } from '../../lib/stream_aggregator';
import { ServiceLatencyAggregator } from '../../lib/apm/aggregators/service_latency_aggregator';
import { ServicMetricsAggregator } from '../../lib/apm/aggregators/service_metrics_aggregator';

// logging proxy to main thread, ensures we see real time logging
const l = {
Expand Down Expand Up @@ -63,7 +63,7 @@ async function setup() {
parentPort?.postMessage({ workerIndex, lastTimestamp: item['@timestamp'] });
}
};
const aggregators: StreamAggregator[] = [new ServiceLatencyAggregator()];
const aggregators: StreamAggregator[] = [new ServicMetricsAggregator()];
// If we are sending data to apm-server we do not have to create any aggregates in the stream processor
streamProcessor = new StreamProcessor({
version,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ import { ApmFields } from '../apm_fields';
import { Fields } from '../../entity';
import { StreamAggregator } from '../../stream_aggregator';

type LatencyState = {
type AggregationState = {
count: number;
min: number;
max: number;
sum: number;
timestamp: number;
failure_count: number;
success_count: number;
} & Pick<ApmFields, 'service.name' | 'service.environment' | 'transaction.type'>;

export type ServiceFields = Fields &
Expand All @@ -35,15 +37,22 @@ export type ServiceFields = Fields &
| 'transaction.type'
> &
Partial<{
'transaction.duration.aggregate': {
min: number;
max: number;
sum: number;
value_count: number;
_doc_count: number;
transaction: {
duration: {
summary: {
min: number;
max: number;
sum: number;
value_count: number;
};
};
failure_count: number;
success_count: number;
};
}>;

export class ServiceLatencyAggregator implements StreamAggregator<ApmFields> {
export class ServicMetricsAggregator implements StreamAggregator<ApmFields> {
public readonly name;

constructor() {
Expand All @@ -68,14 +77,20 @@ export class ServiceLatencyAggregator implements StreamAggregator<ApmFields> {
duration: {
type: 'object',
properties: {
aggregate: {
summary: {
type: 'aggregate_metric_double',
metrics: ['min', 'max', 'sum', 'value_count'],
default_metric: 'sum',
time_series_metric: 'gauge',
},
},
},
failure_count: {
type: { type: 'long' },
},
success_count: {
type: { type: 'long' },
},
},
},
service: {
Expand All @@ -99,7 +114,7 @@ export class ServiceLatencyAggregator implements StreamAggregator<ApmFields> {
return null;
}

private state: Record<string, LatencyState> = {};
private state: Record<string, AggregationState> = {};

private processedComponent: number = 0;

Expand All @@ -120,13 +135,25 @@ export class ServiceLatencyAggregator implements StreamAggregator<ApmFields> {
'service.name': service,
'service.environment': environment,
'transaction.type': transactionType,
failure_count: 0,
success_count: 0,
};
}

const state = this.state[key];
state.count++;

switch (event['event.outcome']) {
case 'failure':
state.failure_count++;
break;
case 'success':
state.success_count++;
break;
}

const duration = Number(event['transaction.duration.us']);
if (duration >= 0) {
const state = this.state[key];

state.count++;
state.sum += duration;
if (duration > state.max) state.max = duration;
if (duration < state.min) state.min = Math.min(0, duration);
Expand Down Expand Up @@ -164,17 +191,24 @@ export class ServiceLatencyAggregator implements StreamAggregator<ApmFields> {
const component = Date.now() % 100;
const state = this.state[key];
return {
_doc_count: state.count,
'@timestamp': state.timestamp + random(0, 100) + component + this.processedComponent,
'metricset.name': 'service',
'processor.event': 'metric',
'service.name': state['service.name'],
'service.environment': state['service.environment'],
'transaction.type': state['transaction.type'],
'transaction.duration.aggregate': {
min: state.min,
max: state.max,
sum: state.sum,
value_count: state.count,
transaction: {
duration: {
summary: {
min: state.min,
max: state.max,
sum: state.sum,
value_count: state.count,
},
},
failure_count: state.failure_count,
success_count: state.success_count,
},
};
}
Expand Down

0 comments on commit b460c80

Please sign in to comment.