From 055ef417ade05072421c1cee9f8221ce731bdf89 Mon Sep 17 00:00:00 2001 From: Trent Mick Date: Wed, 26 Jun 2024 23:16:10 -0700 Subject: [PATCH] feat(instrumentation-pino): add log sending to Logs Bridge API (#2249) * feat(instrumentation-pino): add log sending to Logs Bridge API * refactor tests (mostly from separate #2247 PR); lint:fix; some in-progress changes * remove some old dev/debug code * feat!: make it so re-enabled instr after creating a logger will NOT change behaviour for that logger This removes functionality that was there before, so technically could be breaking. The motivation is to have the contract of pino instrumentation be cleaner. With this change a pino Logger instance will not be touched if the PinoInstrumentation is disabled. I.e. we are hands-off as much as possible when disabled. Before this change, even when disabled, the instrumentation would tweak the pino Logger config to have a no-op mixin. If the instrumentation was later enabled, then the mixin would become active (adding trace_id et al fields in a span context). The coming "log sending" to the Logs Bridge API will *not* support this "work if instrumentation is re-enabled later", so I think it is clearer if neither "log sending" nor "log correlation" support this. We can back this out if we think it is important to support a possible future feature of the SDK doing live enabling/disabling of individual instrumentations. * impl disableLogCorrelation config; undo the previous commit so that log-correlation *will* follow the live instr enable/disable state * log sending: first tests; change impl to use pino.multistream * edge case tests; almost complete * more tests and a fix for 'useOnlyCustomLevels: true' usage * lint:fix * refactor some code out to utils file * add some internal docs * update readme * fix lint * avoid a possible flaky error if using pino 'unixTime' and logging in the first half-second since process start Effectively the issue is that this sometimes returns true: node -e 'console.log(Math.round(Date.now() / 1e3) * 1e3 < performance.timeOrigin)' * limit log-sending to pino@7 and later because that's when pino.multistream was added * lint:fix * discuss pino-opentelemetry-transport alternative * fix a mis-merge * update changed deps to their new latest * typo in README Co-authored-by: Hector Hernandez <39923391+hectorhdzg@users.noreply.github.com> --------- Co-authored-by: Hector Hernandez <39923391+hectorhdzg@users.noreply.github.com> Co-authored-by: Marc Pichler --- package-lock.json | 6 + .../README.md | 101 +++-- .../package.json | 3 + .../src/instrumentation.ts | 111 ++++-- .../src/log-sending-utils.ts | 241 ++++++++++++ .../src/types.ts | 24 +- .../test/pino.test.ts | 370 ++++++++++++++++++ 7 files changed, 784 insertions(+), 72 deletions(-) create mode 100644 plugins/node/opentelemetry-instrumentation-pino/src/log-sending-utils.ts diff --git a/package-lock.json b/package-lock.json index 6b95edea28..5ea1ebd21b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -39393,6 +39393,8 @@ "version": "0.40.0", "license": "Apache-2.0", "dependencies": { + "@opentelemetry/api-logs": "^0.52.0", + "@opentelemetry/core": "^1.25.0", "@opentelemetry/instrumentation": "^0.52.0" }, "devDependencies": { @@ -39400,6 +39402,7 @@ "@opentelemetry/contrib-test-utils": "^0.40.0", "@opentelemetry/sdk-trace-base": "^1.8.0", "@opentelemetry/sdk-trace-node": "^1.8.0", + "@opentelemetry/semantic-conventions": "^1.22.0", "@types/mocha": "7.0.2", "@types/node": "18.6.5", "@types/semver": "7.5.3", @@ -52949,10 +52952,13 @@ "version": "file:plugins/node/opentelemetry-instrumentation-pino", "requires": { "@opentelemetry/api": "^1.3.0", + "@opentelemetry/api-logs": "^0.52.0", "@opentelemetry/contrib-test-utils": "^0.40.0", + "@opentelemetry/core": "^1.25.0", "@opentelemetry/instrumentation": "^0.52.0", "@opentelemetry/sdk-trace-base": "^1.8.0", "@opentelemetry/sdk-trace-node": "^1.8.0", + "@opentelemetry/semantic-conventions": "^1.22.0", "@types/mocha": "7.0.2", "@types/node": "18.6.5", "@types/semver": "7.5.3", diff --git a/plugins/node/opentelemetry-instrumentation-pino/README.md b/plugins/node/opentelemetry-instrumentation-pino/README.md index 4e132bd220..bbd6fa9e5c 100644 --- a/plugins/node/opentelemetry-instrumentation-pino/README.md +++ b/plugins/node/opentelemetry-instrumentation-pino/README.md @@ -3,7 +3,7 @@ [![NPM Published Version][npm-img]][npm-url] [![Apache License][license-image]][license-image] -This module provides automatic instrumentation for injection of trace context for the [`pino`](https://www.npmjs.com/package/pino) module, which may be loaded using the [`@opentelemetry/sdk-trace-node`](https://github.com/open-telemetry/opentelemetry-js/tree/main/packages/opentelemetry-sdk-trace-node) package and is included in the [`@opentelemetry/auto-instrumentations-node`](https://www.npmjs.com/package/@opentelemetry/auto-instrumentations-node) bundle. +This module provides automatic instrumentation of the [`pino`](https://www.npmjs.com/package/pino) module to inject trace-context into Pino log records (log correlation) and to send Pino logging to the OpenTelemetry Logging SDK (log sending). It may be loaded using the [`@opentelemetry/sdk-trace-node`](https://github.com/open-telemetry/opentelemetry-js/tree/main/packages/opentelemetry-sdk-trace-node) package and is included in the [`@opentelemetry/auto-instrumentations-node`](https://www.npmjs.com/package/@opentelemetry/auto-instrumentations-node) bundle. If total installation size is not constrained, it is recommended to use the [`@opentelemetry/auto-instrumentations-node`](https://www.npmjs.com/package/@opentelemetry/auto-instrumentations-node) bundle with [@opentelemetry/sdk-node](`https://www.npmjs.com/package/@opentelemetry/sdk-node`) for the most seamless instrumentation experience. @@ -15,60 +15,105 @@ Compatible with OpenTelemetry JS API and SDK `1.0+`. npm install --save @opentelemetry/instrumentation-pino ``` -### Supported Versions +## Supported Versions - [`pino`](https://www.npmjs.com/package/pino) versions `>=5.14.0 <10` + - The "log sending" feature is only supported in pino v7 and later. ## Usage ```js -const { NodeTracerProvider } = require('@opentelemetry/sdk-trace-node'); +const { NodeSDK, tracing, logs, api } = require('@opentelemetry/sdk-node'); const { PinoInstrumentation } = require('@opentelemetry/instrumentation-pino'); -const { registerInstrumentations } = require('@opentelemetry/instrumentation'); - -const provider = new NodeTracerProvider(); -provider.register(); - -registerInstrumentations({ +const sdk = new NodeSDK({ + spanProcessor: new tracing.SimpleSpanProcessor(new tracing.ConsoleSpanExporter()), + logRecordProcessor: new logs.SimpleLogRecordProcessor(new logs.ConsoleLogRecordExporter()), instrumentations: [ new PinoInstrumentation({ - // Optional hook to insert additional context to log object. - logHook: (span, record, level) => { - record['resource.service.name'] = - provider.resource.attributes['service.name']; - }, - // Log span context under custom keys - // This is optional, and will default to "trace_id", "span_id" and "trace_flags" as the keys - logKeys: { - traceId: 'traceId', - spanId: 'spanId', - traceFlags: 'traceFlags', - }, + // See below for Pino instrumentation options. }), - // other instrumentations - ], -}); + ] +}) +sdk.start(); const pino = require('pino'); const logger = pino(); -logger.info('foobar'); -// {"msg":"foobar","trace_id":"fc30029f30df383a4090d3189fe0ffdf","span_id":"625fa861d19d1056","trace_flags":"01", ...} + +logger.info('hi'); +// 1. Log records will be sent to the SDK-registered log record processor, if any. +// This is called "log sending". + +const tracer = api.trace.getTracer('example'); +tracer.startActiveSpan('manual-span', span => { + logger.info('in a span'); + // 2. Fields identifying the current span will be added to log records: + // {"level":30,...,"msg":"in a span","trace_id":"d61b4e4af1032e0aae279d12f3ab0159","span_id":"d140da862204f2a2","trace_flags":"01"} + // This feature is called "log correlation". +}); ``` -### Fields added to pino log objects +### Log sending + +Creation of a Pino Logger will automatically add a [Pino destination](https://getpino.io/#/docs/api?id=pinooptions-destination-gt-logger) that sends log records to the OpenTelemetry Logs SDK. The OpenTelemetry SDK can be configured to handle those records -- for example, sending them on to an OpenTelemetry collector for log archiving and processing. The example above shows a minimal configuration that emits OpenTelemetry log records to the console for debugging. + +If the OpenTelemetry SDK is not configured with a Logger provider, then this added destination will be a no-op. + +Log sending can be disabled with the `disableLogSending: true` option. -For the current active span, the following fields are injected. These field names can be optionally configured via `logKeys` in the PinoInstrumentation config: +### Log correlation + +Pino logger calls in the context of a tracing span will have fields identifying the span added to the log record. This allows [correlating](https://opentelemetry.io/docs/specs/otel/logs/#log-correlation) log records with tracing data. The added fields are ([spec](https://opentelemetry.io/docs/specs/otel/compatibility/logging_trace_context/)): - `trace_id` - `span_id` - `trace_flags` +These field names can optionally be configured via the `logKeys` option. For example: + +```js + new PinoInstrumentation({ + logKeys: { + traceId: 'myTraceId', + spanId: 'mySpanId', + traceFlags: 'myTraceFlags', + }, + }), +``` + +After adding these fields, the optional `logHook` is called to allow injecting additional fields. For example: + +```js + logHook: (span, record) => { + record['resource.service.name'] = provider.resource.attributes['service.name']; + } +``` + When no span context is active or the span context is invalid, injection is skipped. +Log injection can be disabled with the `disableLogCorrelation: true` option. + +### Pino instrumentation options + +| Option | Type | Description | +| ----------------------- | ----------------- | ----------- | +| `disableLogSending` | `boolean` | Whether to disable [log sending](#log-sending). Default `false`. | +| `disableLogCorrelation` | `boolean` | Whether to disable [log correlation](#log-correlation). Default `false`. | +| `logKeys` | record | A record with keys `traceId`, `spanId`, and `traceFlags` string fields giving the field names to use for log-correlation span data. | +| `logHook` | `LogHookFunction` | An option hook to inject additional context to a log record after trace-context has been added. This requires `disableLogCorrelation` to be false. | ## Semantic Conventions This package does not currently generate any attributes from semantic conventions. +## Alternative log sending with `pino-opentelemetry-transport` + +A possible alternative to the "log sending" support provided by this instrumentation is the [`pino-opentelemetry-transport` package](https://github.com/pinojs/pino-opentelemetry-transport). + +Every Pino logger has an output ["destination"](https://getpino.io/#/docs/api?id=destination), for example, a file or stdout. Since v7, Pino includes support for ["transports"](https://getpino.io/#/docs/transports), a type of destination that uses a [worker thread](https://nodejs.org/api/worker_threads.html) to run the transport code. When calling `logger.info("hi")`, Pino serializes the log record to a JSON string, [sends that string to the worker](https://nodejs.org/api/worker_threads.html#workerpostmessagevalue-transferlist) for it to be handled. + +The `pino-opentelemetry-transport` package uses this mechanism. It starts an OpenTelemetry SDK `LoggerProvider` in the worker thread, parses each log record string, translates it into the OpenTelemetry Logs data model and sends it. Note that this `LoggerProvider` is independent of any OpenTelemetry SDK components in the main thread. + +The log sending support in this instrumentation works on the main thread and uses the OpenTelemetry SDK configured in the main thread. Otherwise the two mechanisms are very similar. Note that because they are maintained separately, there might be small differences in how Pino log records are translated into the OpenTelemetry Logs data model. + ## Useful links - For more information on OpenTelemetry, visit: diff --git a/plugins/node/opentelemetry-instrumentation-pino/package.json b/plugins/node/opentelemetry-instrumentation-pino/package.json index 12f2485424..7fefd37481 100644 --- a/plugins/node/opentelemetry-instrumentation-pino/package.json +++ b/plugins/node/opentelemetry-instrumentation-pino/package.json @@ -49,6 +49,7 @@ "@opentelemetry/contrib-test-utils": "^0.40.0", "@opentelemetry/sdk-trace-base": "^1.8.0", "@opentelemetry/sdk-trace-node": "^1.8.0", + "@opentelemetry/semantic-conventions": "^1.22.0", "@types/mocha": "7.0.2", "@types/node": "18.6.5", "@types/semver": "7.5.3", @@ -64,6 +65,8 @@ "typescript": "4.4.4" }, "dependencies": { + "@opentelemetry/api-logs": "^0.52.0", + "@opentelemetry/core": "^1.25.0", "@opentelemetry/instrumentation": "^0.52.0" }, "homepage": "https://github.com/open-telemetry/opentelemetry-js-contrib/tree/main/plugins/node/opentelemetry-instrumentation-pino#readme" diff --git a/plugins/node/opentelemetry-instrumentation-pino/src/instrumentation.ts b/plugins/node/opentelemetry-instrumentation-pino/src/instrumentation.ts index f7dc81f034..7a6edf907d 100644 --- a/plugins/node/opentelemetry-instrumentation-pino/src/instrumentation.ts +++ b/plugins/node/opentelemetry-instrumentation-pino/src/instrumentation.ts @@ -28,6 +28,7 @@ import { } from '@opentelemetry/instrumentation'; import { PinoInstrumentationConfig } from './types'; import { PACKAGE_NAME, PACKAGE_VERSION } from './version'; +import { getTimeConverter, OTelPinoStream } from './log-sending-utils'; const pinoVersions = ['>=5.14.0 <10']; @@ -48,30 +49,73 @@ export class PinoInstrumentation extends InstrumentationBase { const isESM = module[Symbol.toStringTag] === 'Module'; const moduleExports = isESM ? module.default : module; const instrumentation = this; + const patchedPino = Object.assign((...args: unknown[]) => { - if (args.length === 0) { - return moduleExports({ - mixin: instrumentation._getMixinFunction(), - }); + const config = instrumentation.getConfig(); + const isEnabled = instrumentation.isEnabled(); + + const logger = moduleExports(...args); + + // Setup "log correlation" -- injection of `trace_id` et al fields. + // Note: If the Pino logger is configured with `nestedKey`, then + // the `trace_id` et al fields added by `otelMixin` will be nested + // as well. https://getpino.io/#/docs/api?id=mixin-function + const otelMixin = instrumentation._getMixinFunction(); + const mixinSym = moduleExports.symbols.mixinSym; + const origMixin = logger[mixinSym]; + if (origMixin === undefined) { + logger[mixinSym] = otelMixin; + } else { + logger[mixinSym] = (ctx: object, level: number) => { + return Object.assign( + otelMixin(ctx, level), + origMixin(ctx, level) + ); + }; } - if (args.length === 1) { - // eslint-disable-next-line @typescript-eslint/no-explicit-any - const optsOrStream = args[0] as any; - if ( - typeof optsOrStream === 'string' || - typeof optsOrStream?.write === 'function' - ) { - args.splice(0, 0, { - mixin: instrumentation._getMixinFunction(), - }); - return moduleExports(...args); - } - } + // Setup "log sending" -- sending log records to the Logs API. + // This depends on `pino.multistream`, which was added in v7.0.0. + if ( + isEnabled && + !config.disableLogSending && + typeof moduleExports.multistream === 'function' + ) { + const otelTimestampFromTime = getTimeConverter( + logger, + moduleExports + ); + const otelStream = new OTelPinoStream({ + messageKey: logger[moduleExports.symbols.messageKeySym], + levels: logger.levels, + otelTimestampFromTime, + }); + (otelStream as any)[Symbol.for('pino.metadata')] = true; // for `stream.lastLevel` + + // An error typically indicates a Pino bug, or logger configuration + // bug. `diag.warn` *once* for the first error on the assumption + // subsequent ones stem from the same bug. + otelStream.once('unknown', (line, err) => { + instrumentation._diag.warn( + 'could not send pino log line (will only log first occurrence)', + { line, err } + ); + }); - args[0] = instrumentation._combineOptions(args[0]); + // Use pino's own `multistream` to send to the original stream and + // to the OTel Logs API/SDK. + // https://getpino.io/#/docs/api?id=pinomultistreamstreamsarray-opts-gt-multistreamres + const origStream = logger[moduleExports.symbols.streamSym]; + logger[moduleExports.symbols.streamSym] = moduleExports.multistream( + [ + { level: logger.level, stream: origStream }, + { level: logger.level, stream: otelStream }, + ], + { levels: logger.levels.values } + ); + } - return moduleExports(...args); + return logger; }, moduleExports); if (typeof patchedPino.pino === 'function') { @@ -80,6 +124,7 @@ export class PinoInstrumentation extends InstrumentationBase { if (typeof patchedPino.default === 'function') { patchedPino.default = patchedPino; } + /* istanbul ignore if */ if (isESM) { if (module.pino) { // This was added in pino@6.8.0 (https://github.com/pinojs/pino/pull/936). @@ -122,7 +167,10 @@ export class PinoInstrumentation extends InstrumentationBase { private _getMixinFunction() { const instrumentation = this; return function otelMixin(_context: object, level: number) { - if (!instrumentation.isEnabled()) { + if ( + !instrumentation.isEnabled() || + instrumentation.getConfig().disableLogCorrelation + ) { return {}; } @@ -151,27 +199,4 @@ export class PinoInstrumentation extends InstrumentationBase { return record; }; } - - private _combineOptions(options?: any) { - if (options === undefined) { - return { mixin: this._getMixinFunction() }; - } - - if (options.mixin === undefined) { - options.mixin = this._getMixinFunction(); - return options; - } - - const originalMixin = options.mixin; - const otelMixin = this._getMixinFunction(); - - options.mixin = (context: object, level: number) => { - return Object.assign( - otelMixin(context, level), - originalMixin(context, level) - ); - }; - - return options; - } } diff --git a/plugins/node/opentelemetry-instrumentation-pino/src/log-sending-utils.ts b/plugins/node/opentelemetry-instrumentation-pino/src/log-sending-utils.ts new file mode 100644 index 0000000000..f24cd20cd2 --- /dev/null +++ b/plugins/node/opentelemetry-instrumentation-pino/src/log-sending-utils.ts @@ -0,0 +1,241 @@ +/* + * Copyright The OpenTelemetry Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { Writable } from 'stream'; + +import { logs, Logger, SeverityNumber } from '@opentelemetry/api-logs'; +import { PACKAGE_NAME, PACKAGE_VERSION } from './version'; +import { millisToHrTime } from '@opentelemetry/core'; + +// This block is a copy (modulo code style and TypeScript types) of the Pino +// code that defines log level value and names. This file is part of +// *instrumenting* Pino, so we want to avoid a dependency on the library. +const DEFAULT_LEVELS = { + trace: 10, + debug: 20, + info: 30, + warn: 40, + error: 50, + fatal: 60, +}; + +const OTEL_SEV_NUM_FROM_PINO_LEVEL: { [level: number]: SeverityNumber } = { + [DEFAULT_LEVELS.trace]: SeverityNumber.TRACE, + [DEFAULT_LEVELS.debug]: SeverityNumber.DEBUG, + [DEFAULT_LEVELS.info]: SeverityNumber.INFO, + [DEFAULT_LEVELS.warn]: SeverityNumber.WARN, + [DEFAULT_LEVELS.error]: SeverityNumber.ERROR, + [DEFAULT_LEVELS.fatal]: SeverityNumber.FATAL, +}; + +const EXTRA_SEV_NUMS = [ + SeverityNumber.TRACE2, + SeverityNumber.TRACE3, + SeverityNumber.TRACE4, + SeverityNumber.DEBUG2, + SeverityNumber.DEBUG3, + SeverityNumber.DEBUG4, + SeverityNumber.INFO2, + SeverityNumber.INFO3, + SeverityNumber.INFO4, + SeverityNumber.WARN2, + SeverityNumber.WARN3, + SeverityNumber.WARN4, + SeverityNumber.ERROR2, + SeverityNumber.ERROR3, + SeverityNumber.ERROR4, + SeverityNumber.FATAL2, + SeverityNumber.FATAL3, + SeverityNumber.FATAL4, +]; + +function severityNumberFromPinoLevel(lvl: number) { + // Fast common case: one of the known levels + const sev = OTEL_SEV_NUM_FROM_PINO_LEVEL[lvl]; + if (sev !== undefined) { + return sev; + } + + // Otherwise, scale the Pino level range -- 10 (trace) to 70 (fatal+10) + // -- onto the extra OTel severity numbers (TRACE2, TRACE3, ..., FATAL4). + // Values below trace (10) map to SeverityNumber.TRACE2, which may be + // considered a bit weird, but it means the unnumbered levels are always + // just for exactly matching values. + const relativeLevelWeight = (lvl - 10) / (70 - 10); + const otelSevIdx = Math.floor(relativeLevelWeight * EXTRA_SEV_NUMS.length); + const cappedOTelIdx = Math.min( + EXTRA_SEV_NUMS.length - 1, + Math.max(0, otelSevIdx) + ); + const otelSevValue = EXTRA_SEV_NUMS[cappedOTelIdx]; + return otelSevValue; +} + +/** + * Return a function that knows how to convert the "time" field value on a + * Pino log record to an OTel LogRecord timestamp value. + * + * How to convert the serialized "time" on a Pino log record + * depends on the Logger's `Symbol(pino.time)` prop, configurable + * via https://getpino.io/#/docs/api?id=timestamp-boolean-function + * + * For example: + * const logger = pino({timestamp: pino.stdTimeFunctions.isoTime}) + * results in log record entries of the form: + * ,"time":"2024-05-17T22:03:25.969Z" + * `otelTimestampFromTime` will be given the value of the "time" field: + * "2024-05-17T22:03:25.969Z" + * which should be parsed to a number of milliseconds since the epoch. + */ +export function getTimeConverter(pinoLogger: any, pinoMod: any) { + const stdTimeFns = pinoMod.stdTimeFunctions; + const loggerTimeFn = pinoLogger[pinoMod.symbols.timeSym]; + if (loggerTimeFn === stdTimeFns.epochTime) { + return (time: number) => time; + } else if (loggerTimeFn === stdTimeFns.unixTime) { + return (time: number) => time * 1e3; + } else if (loggerTimeFn === stdTimeFns.isoTime) { + return (time: string) => new Date(time).getTime(); + } else if (loggerTimeFn === stdTimeFns.nullTime) { + return () => Date.now(); + } else { + // The logger has a custom time function. Don't guess. + return () => NaN; + } +} + +interface OTelPinoStreamOptions { + messageKey: string; + levels: any; // Pino.LevelMapping + otelTimestampFromTime: (time: any) => number; +} + +/** + * A Pino stream for sending records to the OpenTelemetry Logs API. + * + * - This stream emits an 'unknown' event on an unprocessable pino record. + * The event arguments are: `logLine: string`, `err: string | Error`. + */ +export class OTelPinoStream extends Writable { + private _otelLogger: Logger; + private _messageKey: string; + private _levels; + private _otelTimestampFromTime; + + constructor(options: OTelPinoStreamOptions) { + super(); + + // Note: A PINO_CONFIG event was added to pino (2024-04-04) to send config + // to transports. Eventually OTelPinoStream might be able to use this + // for auto-configuration in newer pino versions. The event currently does + // not include the `timeSym` value that is needed here, however. + this._messageKey = options.messageKey; + this._levels = options.levels; + this._otelTimestampFromTime = options.otelTimestampFromTime; + + // Cannot use `instrumentation.logger` until have delegating LoggerProvider: + // https://github.com/open-telemetry/opentelemetry-js/issues/4399 + this._otelLogger = logs.getLogger(PACKAGE_NAME, PACKAGE_VERSION); + } + + override _write(s: string, _encoding: string, callback: Function) { + /* istanbul ignore if */ + if (!s) { + return; + } + + // Parse, and handle edge cases similar to how `pino-abtract-transport` does: + // https://github.com/pinojs/pino-abstract-transport/blob/v1.2.0/index.js#L28-L45 + // - Emitting an 'unknown' event on parse error mimicks pino-abstract-transport. + let recObj; + try { + recObj = JSON.parse(s); + } catch (parseErr) { + // Invalid JSON suggests a bug in Pino, or a logger configuration bug + // (a bogus `options.timestamp` or serializer). + this.emit('unknown', s.toString(), parseErr); + callback(); + return; + } + /* istanbul ignore if */ + if (recObj === null) { + this.emit('unknown', s.toString(), 'Null value ignored'); + callback(); + return; + } + /* istanbul ignore if */ + if (typeof recObj !== 'object') { + recObj = { + data: recObj, + }; + } + + const { + time, + [this._messageKey]: body, + level, // eslint-disable-line @typescript-eslint/no-unused-vars + + // The typical Pino `hostname` and `pid` fields are removed because they + // are redundant with the OpenTelemetry `host.name` and `process.pid` + // Resource attributes, respectively. This code cannot change the + // LoggerProvider's `resource`, so getting the OpenTelemetry equivalents + // depends on the user using the OpenTelemetry HostDetector and + // ProcessDetector. + // https://getpino.io/#/docs/api?id=opt-base + hostname, // eslint-disable-line @typescript-eslint/no-unused-vars + pid, // eslint-disable-line @typescript-eslint/no-unused-vars + + // The `trace_id` et al fields that may have been added by the + // "log correlation" feature are stripped, because they are redundant. + trace_id, // eslint-disable-line @typescript-eslint/no-unused-vars + span_id, // eslint-disable-line @typescript-eslint/no-unused-vars + trace_flags, // eslint-disable-line @typescript-eslint/no-unused-vars + + ...attributes + } = recObj; + + let timestamp = this._otelTimestampFromTime(time); + if (isNaN(timestamp)) { + attributes['time'] = time; // save the unexpected "time" field to attributes + timestamp = Date.now(); + } + + // This avoids a possible subtle bug when a Pino logger uses + // `time: pino.stdTimeFunctions.unixTime` and logs in the first half-second + // since process start. The rounding involved results in: + // timestamp < performance.timeOrigin + // If that is passed to Logger.emit() it will be misinterpreted by + // `timeInputToHrTime` as a `performance.now()` value. + const timestampHrTime = millisToHrTime(timestamp); + + // Prefer using `stream.lastLevel`, because `recObj.level` can be customized + // to anything via `formatters.level` + // (https://getpino.io/#/docs/api?id=formatters-object). + const lastLevel = (this as any).lastLevel; + + const otelRec = { + timestamp: timestampHrTime, + observedTimestamp: timestampHrTime, + severityNumber: severityNumberFromPinoLevel(lastLevel), + severityText: this._levels.labels[lastLevel], + body, + attributes, + }; + + this._otelLogger.emit(otelRec); + callback(); + } +} diff --git a/plugins/node/opentelemetry-instrumentation-pino/src/types.ts b/plugins/node/opentelemetry-instrumentation-pino/src/types.ts index 52484885a7..43fabc5065 100644 --- a/plugins/node/opentelemetry-instrumentation-pino/src/types.ts +++ b/plugins/node/opentelemetry-instrumentation-pino/src/types.ts @@ -25,9 +25,31 @@ export type LogHookFunction = ( ) => void; export interface PinoInstrumentationConfig extends InstrumentationConfig { + /** + * Whether to disable the automatic sending of log records to the + * OpenTelemetry Logs SDK. + * @default false + */ + disableLogSending?: boolean; + + /** + * Whether to disable the injection trace-context fields, and possibly other + * fields from `logHook()`, into log records for log correlation. + * @default false + */ + disableLogCorrelation?: boolean; + + /** + * A function that allows injecting additional fields in log records. It is + * called, as `logHook(span, record)`, for each log record emitted in a valid + * span context. It requires `disableLogCorrelation` to be false. + */ logHook?: LogHookFunction; - /** Configure the names of field injected into logs when there is span context available. */ + /** + * Configure the names of field injected into logs when there is span context + * available. + */ logKeys?: { traceId: string; spanId: string; diff --git a/plugins/node/opentelemetry-instrumentation-pino/test/pino.test.ts b/plugins/node/opentelemetry-instrumentation-pino/test/pino.test.ts index 4b0936452d..6f03e197cd 100644 --- a/plugins/node/opentelemetry-instrumentation-pino/test/pino.test.ts +++ b/plugins/node/opentelemetry-instrumentation-pino/test/pino.test.ts @@ -20,17 +20,28 @@ import { Writable } from 'stream'; import * as semver from 'semver'; import * as sinon from 'sinon'; import { INVALID_SPAN_CONTEXT, context, trace, Span } from '@opentelemetry/api'; +import { diag, DiagLogLevel } from '@opentelemetry/api'; +import { hrTimeToMilliseconds } from '@opentelemetry/core'; +import { SEMRESATTRS_SERVICE_NAME } from '@opentelemetry/semantic-conventions'; +import { Resource } from '@opentelemetry/resources'; import { InMemorySpanExporter, SimpleSpanProcessor, } from '@opentelemetry/sdk-trace-base'; import { NodeTracerProvider } from '@opentelemetry/sdk-trace-node'; +import { logs, SeverityNumber } from '@opentelemetry/api-logs'; +import { + LoggerProvider, + SimpleLogRecordProcessor, + InMemoryLogRecordExporter, +} from '@opentelemetry/sdk-logs'; import { runTestFixture, TestCollector, } from '@opentelemetry/contrib-test-utils'; import { PinoInstrumentation, PinoInstrumentationConfig } from '../src'; +import { PACKAGE_NAME, PACKAGE_VERSION } from '../src/version'; import type { pino as Pino } from 'pino'; @@ -41,6 +52,15 @@ tracerProvider.addSpanProcessor( ); const tracer = tracerProvider.getTracer('default'); +// Setup LoggerProvider for "log sending" tests. +const resource = new Resource({ + [SEMRESATTRS_SERVICE_NAME]: 'test-instrumentation-pino', +}); +const loggerProvider = new LoggerProvider({ resource }); +const memExporter = new InMemoryLogRecordExporter(); +loggerProvider.addLogRecordProcessor(new SimpleLogRecordProcessor(memExporter)); +logs.setGlobalLoggerProvider(loggerProvider); + const instrumentation = new PinoInstrumentation(); const pino = require('pino'); @@ -113,6 +133,7 @@ describe('PinoInstrumentation', () => { beforeEach(() => { instrumentation.setConfig({}); // reset to defaults + memExporter.getFinishedLogRecords().length = 0; // clear stream = new Writable(); stream._write = () => {}; writeSpy = sinon.spy(stream, 'write'); @@ -228,6 +249,27 @@ describe('PinoInstrumentation', () => { }); }); + it('does not inject or call logHook if disableLogCorrelation=true', () => { + instrumentation.setConfig({ + disableLogCorrelation: true, + logHook: (_span, record) => { + record['resource.service.name'] = 'test-service'; + }, + }); + tracer.startActiveSpan('abc', span => { + logger.info('foo'); + span.end(); + + sinon.assert.calledOnce(writeSpy); + const record = JSON.parse(writeSpy.firstCall.args[0].toString()); + assert.strictEqual('foo', record['msg']); + assert.strictEqual(record['trace_id'], undefined); + assert.strictEqual(record['span_id'], undefined); + assert.strictEqual(record['trace_flags'], undefined); + assert.strictEqual(record['resource.service.name'], undefined); + }); + }); + it('instrumentation of `pino.default(...)` works', function () { if (!pino.default) { this.skip(); @@ -349,6 +391,334 @@ describe('PinoInstrumentation', () => { }); }); + describe('log sending', () => { + let logger: Pino.Logger; + let stream: Writable; + let writeSpy: sinon.SinonSpy; + + before(function () { + if (typeof pino.multistream !== 'function') { + this.skip(); + } + }); + + beforeEach(() => { + instrumentation.setConfig({}); // reset to defaults + memExporter.getFinishedLogRecords().length = 0; // clear + stream = new Writable(); + stream._write = () => {}; + writeSpy = sinon.spy(stream, 'write'); + logger = pino( + { + name: 'test-logger-name', + level: 'debug', + }, + stream + ); + }); + + it('emits log records to Logs SDK', () => { + const logRecords = memExporter.getFinishedLogRecords(); + + // levels + logger.silent('silent'); + logger.trace('at trace level'); + logger.debug('at debug level'); + logger.info('at info level'); + logger.warn('at warn level'); + logger.error('at error level'); + logger.fatal('at fatal level'); + assert.strictEqual(logRecords.length, 5); + assert.strictEqual(logRecords[0].severityNumber, SeverityNumber.DEBUG); + assert.strictEqual(logRecords[0].severityText, 'debug'); + assert.strictEqual(logRecords[1].severityNumber, SeverityNumber.INFO); + assert.strictEqual(logRecords[1].severityText, 'info'); + assert.strictEqual(logRecords[2].severityNumber, SeverityNumber.WARN); + assert.strictEqual(logRecords[2].severityText, 'warn'); + assert.strictEqual(logRecords[3].severityNumber, SeverityNumber.ERROR); + assert.strictEqual(logRecords[3].severityText, 'error'); + assert.strictEqual(logRecords[4].severityNumber, SeverityNumber.FATAL); + assert.strictEqual(logRecords[4].severityText, 'fatal'); + + // attributes, resource, instrumentationScope, etc. + logger.info({ foo: 'bar' }, 'a message'); + const rec = logRecords[logRecords.length - 1]; + assert.strictEqual(rec.body, 'a message'); + assert.deepStrictEqual(rec.attributes, { + name: 'test-logger-name', + foo: 'bar', + }); + assert.strictEqual( + rec.resource.attributes['service.name'], + 'test-instrumentation-pino' + ); + assert.strictEqual(rec.instrumentationScope.name, PACKAGE_NAME); + assert.strictEqual(rec.instrumentationScope.version, PACKAGE_VERSION); + assert.strictEqual(rec.spanContext, undefined); + + // spanContext + tracer.startActiveSpan('abc', span => { + logger.info('in active span'); + span.end(); + + const { traceId, spanId, traceFlags } = span.spanContext(); + const rec = logRecords[logRecords.length - 1]; + assert.strictEqual(rec.spanContext?.traceId, traceId); + assert.strictEqual(rec.spanContext?.spanId, spanId); + assert.strictEqual(rec.spanContext?.traceFlags, traceFlags); + + // This rec should *NOT* have the `trace_id` et al attributes. + assert.strictEqual(rec.attributes.trace_id, undefined); + assert.strictEqual(rec.attributes.span_id, undefined); + assert.strictEqual(rec.attributes.trace_flags, undefined); + }); + }); + + it('does not emit to the Logs SDK if disableLogSending=true', () => { + instrumentation.setConfig({ disableLogSending: true }); + + // Changing `disableLogSending` only has an impact on Loggers created + // *after* it is set. So we cannot test with the `logger` created in + // `beforeEach()` above. + logger = pino({ name: 'test-logger-name' }, stream); + + tracer.startActiveSpan('abc', span => { + logger.info('foo'); + span.end(); + + assert.strictEqual(memExporter.getFinishedLogRecords().length, 0); + + // Test log correlation still works. + const { traceId, spanId } = span.spanContext(); + sinon.assert.calledOnce(writeSpy); + const record = JSON.parse(writeSpy.firstCall.args[0].toString()); + assert.strictEqual('foo', record['msg']); + assert.strictEqual(record['trace_id'], traceId); + assert.strictEqual(record['span_id'], spanId); + }); + }); + + it('edge case: non-time "time" field is stored in attributes', () => { + const logRecords = memExporter.getFinishedLogRecords(); + + // Pino will emit a JSON object with two "time" fields, e.g. + // {...,"time":1716933636063,...,"time":"miller"} + // JSON *parsing* rules are that the last duplicate key wins, so it + // would be nice to maintain that "time" attribute if possible. + logger.info({ time: 'miller' }, 'hi'); + const rec = logRecords[logRecords.length - 1]; + assert.deepEqual( + rec.hrTime.map(n => typeof n), + ['number', 'number'] + ); + assert.strictEqual(rec.attributes.time, 'miller'); + }); + + it('edge case: custom "timestamp" option', () => { + let otelRec, pinoRec; + const logRecords = memExporter.getFinishedLogRecords(); + + logger = pino({ timestamp: false }, stream); + logger.info('using false'); + otelRec = logRecords[logRecords.length - 1]; + pinoRec = JSON.parse(writeSpy.lastCall.args[0].toString()); + assert.deepEqual( + otelRec.hrTime.map(n => typeof n), + ['number', 'number'] + ); + assert.strictEqual(pinoRec.time, undefined); + + logger = pino({ timestamp: pino.stdTimeFunctions.epochTime }, stream); + logger.info('using epochTime'); + otelRec = logRecords[logRecords.length - 1]; + pinoRec = JSON.parse(writeSpy.lastCall.args[0].toString()); + assert.strictEqual(hrTimeToMilliseconds(otelRec.hrTime), pinoRec.time); + + logger = pino({ timestamp: pino.stdTimeFunctions.unixTime }, stream); + logger.info('using unixTime'); + otelRec = logRecords[logRecords.length - 1]; + pinoRec = JSON.parse(writeSpy.lastCall.args[0].toString()); + assert.strictEqual( + hrTimeToMilliseconds(otelRec.hrTime), + pinoRec.time * 1e3 + ); + + logger = pino({ timestamp: pino.stdTimeFunctions.isoTime }, stream); + logger.info('using isoTime'); + otelRec = logRecords[logRecords.length - 1]; + pinoRec = JSON.parse(writeSpy.lastCall.args[0].toString()); + assert.strictEqual( + hrTimeToMilliseconds(otelRec.hrTime), + new Date(pinoRec.time).getTime() + ); + + logger = pino({ timestamp: () => ',"time":"quittin"' }, stream); + logger.info('using custom timestamp fn'); + otelRec = logRecords[logRecords.length - 1]; + pinoRec = JSON.parse(writeSpy.lastCall.args[0].toString()); + assert.deepEqual( + otelRec.hrTime.map(n => typeof n), + ['number', 'number'] + ); + assert.strictEqual(pinoRec.time, 'quittin'); + assert.strictEqual(otelRec.attributes.time, 'quittin'); + }); + + // A custom 'timestamp' fn that returns invalid data will result in a Pino + // log record line that is invalid JSON. We expect the OTel stream to + // gracefully handle this. + it('edge case: error parsing pino log line', () => { + const logRecords = memExporter.getFinishedLogRecords(); + + const diagWarns = [] as any; + // This messily leaves the diag logger set for other tests. + diag.setLogger( + { + verbose() {}, + debug() {}, + info() {}, + warn(...args) { + diagWarns.push(args); + }, + error() {}, + }, + DiagLogLevel.WARN + ); + + logger = pino({ timestamp: () => 'invalid JSON' }, stream); + logger.info('using custom timestamp fn returning bogus result'); + assert.strictEqual(logRecords.length, 0); + assert.ok(writeSpy.lastCall.args[0].toString().includes('invalid JSON')); + assert.equal(diagWarns.length, 1); + assert.ok(diagWarns[0][1].includes('could not send pino log line')); + }); + + it('edge case: customLevels', () => { + let rec; + const logRecords = memExporter.getFinishedLogRecords(); + + logger = pino( + { + customLevels: { + foo: pino.levels.values.warn, + bar: pino.levels.values.warn - 1, // a little closer to INFO + baz: pino.levels.values.warn + 1, // a little above WARN + }, + }, + stream + ); + + (logger as any).foo('foomsg'); + rec = logRecords[logRecords.length - 1]; + assert.strictEqual(rec.severityNumber, SeverityNumber.WARN); + assert.strictEqual(rec.severityText, 'foo'); + + (logger as any).bar('barmsg'); + rec = logRecords[logRecords.length - 1]; + assert.strictEqual(rec.severityNumber, SeverityNumber.INFO4); + assert.strictEqual(rec.severityText, 'bar'); + + (logger as any).baz('bazmsg'); + rec = logRecords[logRecords.length - 1]; + assert.strictEqual(rec.severityNumber, SeverityNumber.WARN2); + assert.strictEqual(rec.severityText, 'baz'); + }); + + it('edge case: customLevels and formatters.level', () => { + logger = pino( + { + customLevels: { + foo: pino.levels.values.warn, + bar: pino.levels.values.warn - 1, // a little closer to INFO + }, + formatters: { + level(label: string, _num: number) { + return { level: label }; + }, + }, + }, + stream + ); + + const logRecords = memExporter.getFinishedLogRecords(); + (logger as any).foo('foomsg'); + const otelRec = logRecords[logRecords.length - 1]; + assert.strictEqual(otelRec.severityNumber, SeverityNumber.WARN); + assert.strictEqual(otelRec.severityText, 'foo'); + + sinon.assert.calledOnce(writeSpy); + const pinoRec = JSON.parse(writeSpy.firstCall.args[0].toString()); + assert.equal((pinoRec as any).level, 'foo'); + }); + + it('edge case: customLevels and useOnlyCustomLevels', () => { + let rec; + const logRecords = memExporter.getFinishedLogRecords(); + + logger = pino( + { + customLevels: { + foo: pino.levels.values.warn, + bar: pino.levels.values.warn - 1, // a little closer to INFO + }, + useOnlyCustomLevels: true, + level: 'bar', + }, + stream + ); + + (logger as any).foo('foomsg'); + rec = logRecords[logRecords.length - 1]; + assert.strictEqual(rec.severityNumber, SeverityNumber.WARN); + assert.strictEqual(rec.severityText, 'foo'); + + (logger as any).bar('barmsg'); + rec = logRecords[logRecords.length - 1]; + assert.strictEqual(rec.severityNumber, SeverityNumber.INFO4); + assert.strictEqual(rec.severityText, 'bar'); + }); + + // We use multistream internally to write to the OTel SDK. This test ensures + // that multistream wrapping of a multistream works. + it('edge case: multistream', () => { + const logRecords = memExporter.getFinishedLogRecords(); + + const stream2 = new Writable(); + stream2._write = () => {}; + const writeSpy2 = sinon.spy(stream2, 'write'); + + logger = pino( + {}, + pino.multistream([{ stream: stream }, { stream: stream2 }]) + ); + logger.info('using multistream'); + + const otelRec = logRecords[logRecords.length - 1]; + assert.equal(otelRec.body, 'using multistream'); + + sinon.assert.calledOnce(writeSpy); + const pinoRec = JSON.parse(writeSpy.firstCall.args[0].toString()); + assert.equal((pinoRec as any).msg, 'using multistream'); + + sinon.assert.calledOnce(writeSpy2); + const pinoRec2 = JSON.parse(writeSpy2.firstCall.args[0].toString()); + assert.equal((pinoRec2 as any).msg, 'using multistream'); + }); + + it('edge case: messageKey', () => { + logger = pino({ messageKey: 'mymsg' }, stream); + logger.info('using messageKey'); + + const logRecords = memExporter.getFinishedLogRecords(); + const otelRec = logRecords[logRecords.length - 1]; + assert.equal(otelRec.body, 'using messageKey'); + + sinon.assert.calledOnce(writeSpy); + const pinoRec = JSON.parse(writeSpy.firstCall.args[0].toString()); + assert.equal((pinoRec as any).mymsg, 'using messageKey'); + }); + }); + describe('ESM usage', () => { it('should work with ESM default import', async function () { let logRecords: any[];