Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix!: metrics fixes, updates, cleanups #338

Merged
merged 15 commits into from
Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libraries/grpc-sdk/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
"@types/google-protobuf": "^3.15.6",
"@types/ioredis": "^4.28.10",
"@types/lodash": "^4.14.182",
"abort-controller-x": "^0.4.0",
"convict": "^6.2.3",
"express": "^4.18.1",
"fast-jwt": "^1.6.0",
"ioredis": "^5.1.0",
"lodash": "^4.17.21",
"nice-grpc": "^1.2.0",
"nice-grpc-prometheus": "^0.1.0",
"prom-client": "^14.0.1",
"protobufjs": "^6.11.3",
"winston": "^3.8.1",
Expand Down
5 changes: 2 additions & 3 deletions libraries/grpc-sdk/src/classes/HealthCheck.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { getGrpcSignedTokenInterceptor, getModuleNameInterceptor } from '../interceptors';
import { createChannel, createClientFactory } from 'nice-grpc';
import { HealthCheckResponse, HealthDefinition } from '../protoUtils/grpc_health_check';
import { prometheusClientMiddleware } from 'nice-grpc-prometheus';
import { clientMiddleware } from '../metrics/clientMiddleware';

export async function checkModuleHealth(
clientName: string,
Expand All @@ -14,8 +14,7 @@ export async function checkModuleHealth(
'grpc.max_send_message_length': 1024 * 1024 * 100,
});
const clientFactory = createClientFactory()
//@ts-ignore
.use(prometheusClientMiddleware())
.use(clientMiddleware())
.use(
grpcToken
? getGrpcSignedTokenInterceptor(grpcToken)
Expand Down
23 changes: 19 additions & 4 deletions libraries/grpc-sdk/src/classes/ManagedModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import { status } from '@grpc/grpc-js';
import convict from 'convict';

export abstract class ManagedModule<T> extends ConduitServiceModule {
abstract readonly configSchema?: object;
protected abstract readonly configSchema?: object;
protected abstract readonly metricsSchema?: object;
readonly config?: convict.Config<T>;
service?: ConduitService;

Expand Down Expand Up @@ -83,10 +84,24 @@ export abstract class ManagedModule<T> extends ConduitServiceModule {
async onConfig() {}

/**
* This is triggered when a module needs to initialize its own custom metric
* types and configuration by using the sdk's ConduitMetrics.
* Registers common and module-specific metric types.
*/
initializeMetrics() {}
async registerMetrics() {
if (ConduitGrpcSdk.Metrics) {
ConduitGrpcSdk.Metrics.initializeDefaultMetrics();
if (this.metricsSchema) {
Object.values(this.metricsSchema).forEach(metric => {
ConduitGrpcSdk.Metrics!.registerMetric(metric.type, metric.config);
});
}
}
}

/**
* Initializes metric startup values.
* Implemented by individual modules.
*/
async initializeMetrics() {}

async createGrpcServer(servicePort?: string) {
this.grpcServer = new GrpcServer(servicePort);
Expand Down
10 changes: 2 additions & 8 deletions libraries/grpc-sdk/src/classes/ModuleManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,10 @@ export class ModuleManager<T> {
await this.module.preServerStart();
await this.grpcSdk.initializeEventBus();
await this.module.handleConfigSyncUpdate();
await this.module.registerMetrics();
await this.module.startGrpcServer();
await this.initializeMetrics();
await this.module.onServerStart();
await this.module.initializeMetrics();
await this.module.preRegister();
}

Expand All @@ -90,11 +91,4 @@ export class ModuleManager<T> {
await this.module.onConfig();
}
}

private initializeMetrics() {
if (process.env['METRICS_PORT']) {
this.grpcSdk.initializeDefaultMetrics();
this.module.initializeMetrics();
}
}
}
33 changes: 1 addition & 32 deletions libraries/grpc-sdk/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,14 @@ import {
GetRedisDetailsResponse,
ModuleListResponse_ModuleResponse,
} from './protoUtils/core';
import { GrpcError, HealthCheckStatus, MetricConfiguration, MetricType } from './types';
import { GrpcError, HealthCheckStatus } from './types';
import { createSigner } from 'fast-jwt';
import { checkModuleHealth } from './classes/HealthCheck';
import { ConduitLogger } from './utilities/Logger';
import winston from 'winston';
import path from 'path';
import LokiTransport from 'winston-loki';
import { ConduitMetrics } from './metrics';
import defaultMetrics from './metrics/config/defaults';
import {
CounterConfiguration,
GaugeConfiguration,
HistogramConfiguration,
SummaryConfiguration,
} from 'prom-client';

export default class ConduitGrpcSdk {
private readonly serverUrl: string;
Expand Down Expand Up @@ -404,30 +397,6 @@ export default class ConduitGrpcSdk {
});
}

initializeDefaultMetrics() {
for (const metric of Object.values(defaultMetrics)) {
this.registerMetric(metric.type, metric.config);
}
}

registerMetric(type: MetricType, config: MetricConfiguration) {
config.name = `conduit_${config.name}`;
switch (type) {
case MetricType.Counter:
ConduitGrpcSdk.Metrics?.createCounter(config as CounterConfiguration<any>);
break;
case MetricType.Gauge:
ConduitGrpcSdk.Metrics?.createGauge(config as GaugeConfiguration<any>);
break;
case MetricType.Histogram:
ConduitGrpcSdk.Metrics?.createHistogram(config as HistogramConfiguration<any>);
break;
case MetricType.Summary:
ConduitGrpcSdk.Metrics?.createSummary(config as SummaryConfiguration<any>);
break;
}
}

createModuleClient(moduleName: string, moduleUrl: string) {
if (
this._modules[moduleName] ||
Expand Down
10 changes: 0 additions & 10 deletions libraries/grpc-sdk/src/metrics/MetricsServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,6 @@ export class MetricsServer {
}

initialize() {
this._initialize();
}

private _initialize() {
const server = express();
const port = this.getHttpPort();
const url = '0.0.0.0:' + port.toString();
Expand All @@ -36,12 +32,6 @@ export class MetricsServer {
const metrics = await this.Registry.metrics();
return res.status(200).send(metrics);
});
server.get('/metrics/reset', async (req: express.Request, res: express.Response) => {
this.Registry.resetMetrics();
return res.status(200).send({
message: 'Metrics reset',
});
});
return server;
}

Expand Down
143 changes: 143 additions & 0 deletions libraries/grpc-sdk/src/metrics/clientMiddleware.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Forked from nice-grpc-prometheus

import { isAbortError } from 'abort-controller-x';
import {
CallOptions,
ClientError,
ClientMiddleware,
ClientMiddlewareCall,
Status,
MethodDescriptor,
} from 'nice-grpc-common';
import { Registry, Counter, Histogram, exponentialBuckets } from 'prom-client';

const registry = new Registry();

const typeLabel = 'grpc_type';
const serviceLabel = 'grpc_service';
const methodLabel = 'grpc_method';
const pathLabel = 'grpc_path';
const codeLabel = 'grpc_code';
/**
* 1ms, 4ms, 16ms, ..., ~1 hour in seconds
*/
const latencySecondsBuckets = exponentialBuckets(0.001, 4, 12);

function getLabels(method: MethodDescriptor) {
const callType = method.requestStream
? method.responseStream
? 'bidi_stream'
: 'client_stream'
: method.responseStream
? 'server_stream'
: 'unary';
const { path } = method;
const [serviceName, methodName] = path.split('/').slice(1);
return {
[typeLabel]: callType,
[serviceLabel]: serviceName,
[methodLabel]: methodName,
[pathLabel]: path,
};
}

async function* incrementStreamMessagesCounter<T>(
iterable: AsyncIterable<T>,
counter: Counter.Internal,
): AsyncIterable<T> {
for await (const item of iterable) {
counter.inc();
yield item;
}
}

const clientStartedMetric = new Counter({
registers: [registry],
name: 'conduit_grpc_client_started_total',
help: 'Total number of RPCs started on the client.',
labelNames: [typeLabel, serviceLabel, methodLabel, pathLabel],
});

const clientHandledMetric = new Counter({
registers: [registry],
name: 'conduit_grpc_client_handled_total',
help: 'Total number of RPCs completed on the client, regardless of success or failure.',
labelNames: [typeLabel, serviceLabel, methodLabel, pathLabel, codeLabel],
});

const clientStreamMsgReceivedMetric = new Counter({
registers: [registry],
name: 'conduit_grpc_client_msg_received_total',
help: 'Total number of RPC stream messages received by the client.',
labelNames: [typeLabel, serviceLabel, methodLabel, pathLabel],
});

const clientStreamMsgSentMetric = new Counter({
registers: [registry],
name: 'conduit_grpc_client_msg_sent_total',
help: 'Total number of gRPC stream messages sent by the client.',
labelNames: [typeLabel, serviceLabel, methodLabel, pathLabel],
});

const clientHandlingSecondsMetric = new Histogram({
registers: [registry],
name: 'grpc_client_handling_seconds',
help: 'Histogram of response latency (seconds) of the gRPC until it is finished by the application.',
labelNames: [typeLabel, serviceLabel, methodLabel, pathLabel, codeLabel],
buckets: latencySecondsBuckets,
});

export function clientMiddleware(): ClientMiddleware {
return async function* prometheusClientMiddlewareGenerator<Request, Response>(
call: ClientMiddlewareCall<Request, Response>,
options: CallOptions,
): AsyncGenerator<Response, Response | void, undefined> {
const labels = getLabels(call.method);
clientStartedMetric.inc(labels);
const stopTimer = clientHandlingSecondsMetric.startTimer(labels);
let settled = false;
let status: Status = Status.OK;
try {
let request;
if (!call.requestStream) {
request = call.request;
} else {
request = incrementStreamMessagesCounter(
call.request,
clientStreamMsgSentMetric.labels(labels),
);
}
if (!call.responseStream) {
const response = yield* call.next(request, options);
settled = true;
return response;
} else {
yield* incrementStreamMessagesCounter(
call.next(request, options),
clientStreamMsgReceivedMetric.labels(labels),
);
settled = true;
return;
}
} catch (err: unknown) {
settled = true;
if (err instanceof ClientError) {
status = err.code;
} else if (isAbortError(err)) {
status = Status.CANCELLED;
} else {
status = Status.UNKNOWN;
}
throw err;
} finally {
if (!settled) {
status = Status.CANCELLED;
}
stopTimer({ [codeLabel]: Status[status] });
clientHandledMetric.inc({
...labels,
[codeLabel]: Status[status],
});
}
};
}
34 changes: 31 additions & 3 deletions libraries/grpc-sdk/src/metrics/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import {
SummaryConfiguration,
} from 'prom-client';
import { MetricsServer } from './MetricsServer';
import { registry as niceGrpcRegistry } from 'nice-grpc-prometheus';
import defaultMetrics from './config/defaults';
import { Registry } from 'prom-client';
import { MetricConfiguration, MetricType } from '../types';

export class ConduitMetrics {
private readonly moduleName: string;
Expand All @@ -19,12 +21,38 @@ export class ConduitMetrics {
this.moduleName = moduleName;
this.instance = instance;
const globalRegistry = new client.Registry();
this.Registry = client.Registry.merge([globalRegistry, niceGrpcRegistry]);
this.Registry = client.Registry.merge([globalRegistry, new Registry()]);
this._httpServer = new MetricsServer(moduleName, instance, this.Registry);
this._httpServer.initialize();
this.collectDefaultMetrics();
}

initializeDefaultMetrics() {
for (const metric of Object.values(defaultMetrics)) {
this.registerMetric(metric.type, metric.config);
}
}

registerMetric(type: MetricType, config: MetricConfiguration) {
if (this.getMetric(config.name)) return;
const metricConfig = JSON.parse(JSON.stringify(config));
metricConfig.name = this.addPrefix(config.name);
switch (type) {
case MetricType.Counter:
this.createCounter(metricConfig as CounterConfiguration<any>);
break;
case MetricType.Gauge:
this.createGauge(metricConfig as GaugeConfiguration<any>);
break;
case MetricType.Histogram:
this.createHistogram(metricConfig as HistogramConfiguration<any>);
break;
case MetricType.Summary:
this.createSummary(metricConfig as SummaryConfiguration<any>);
break;
}
}

setDefaultLabels(labels: { [key: string]: string }) {
this.Registry.setDefaultLabels(labels);
}
Expand Down Expand Up @@ -56,7 +84,7 @@ export class ConduitMetrics {
}

getMetric(name: string) {
return this.Registry.getSingleMetric(name);
return this.Registry.getSingleMetric(this.addPrefix(name));
}

increment(metric: string, increment: number = 1, labels?: LabelValues<any>) {
Expand Down
Loading