Skip to content

Commit

Permalink
feat(pg): add requireParentSpan option (#1199)
Browse files Browse the repository at this point in the history
* feat(pg): add requireParentTrace option

* Remove accidental console.log

* Fix up config type casts

* Split tests into corresponding blocks

* Add tests for new util

* Update config type docstring

Co-authored-by: Henri Normak <henri.normak@gmail.com>

* Fix lint errors

Co-authored-by: Daniel Dyla <dyladan@users.noreply.github.com>
Co-authored-by: Henri Normak <henri.normak@gmail.com>
Co-authored-by: Rauno Viskus <Rauno56@users.noreply.github.com>
  • Loading branch information
4 people authored Oct 4, 2022
1 parent 5da46ef commit a6f054d
Show file tree
Hide file tree
Showing 6 changed files with 178 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,7 @@ import {
InstrumentationNodeModuleDefinition,
} from '@opentelemetry/instrumentation';

import {
context,
diag,
trace,
Span,
SpanKind,
SpanStatusCode,
} from '@opentelemetry/api';
import { context, diag, trace, Span, SpanStatusCode } from '@opentelemetry/api';
import * as pgTypes from 'pg';
import * as pgPoolTypes from 'pg-pool';
import {
Expand All @@ -46,6 +39,7 @@ import {
DbSystemValues,
} from '@opentelemetry/semantic-conventions';
import { VERSION } from './version';
import { startSpan } from './utils';

const PG_POOL_COMPONENT = 'pg-pool';

Expand Down Expand Up @@ -122,26 +116,33 @@ export class PgInstrumentation extends InstrumentationBase {
return [modulePG, modulePGPool];
}

override setConfig(config: PgInstrumentationConfig = {}) {
this._config = Object.assign({}, config);
}

override getConfig(): PgInstrumentationConfig {
return this._config as PgInstrumentationConfig;
}

private _getClientConnectPatch() {
const plugin = this;
return (original: PgClientConnect) => {
return function connect(
this: pgTypes.Client,
callback?: PgErrorCallback
) {
const span = plugin.tracer.startSpan(
const span = startSpan(
plugin.tracer,
plugin.getConfig(),
`${PgInstrumentation.COMPONENT}.connect`,
{
kind: SpanKind.CLIENT,
attributes: {
[SemanticAttributes.DB_SYSTEM]: DbSystemValues.POSTGRESQL,
[SemanticAttributes.DB_NAME]: this.database,
[SemanticAttributes.NET_PEER_NAME]: this.host,
[SemanticAttributes.DB_CONNECTION_STRING]:
utils.getConnectionString(this),
[SemanticAttributes.NET_PEER_PORT]: this.port,
[SemanticAttributes.DB_USER]: this.user,
},
[SemanticAttributes.DB_SYSTEM]: DbSystemValues.POSTGRESQL,
[SemanticAttributes.DB_NAME]: this.database,
[SemanticAttributes.NET_PEER_NAME]: this.host,
[SemanticAttributes.DB_CONNECTION_STRING]:
utils.getConnectionString(this),
[SemanticAttributes.NET_PEER_PORT]: this.port,
[SemanticAttributes.DB_USER]: this.user,
}
);

Expand Down Expand Up @@ -182,25 +183,31 @@ export class PgInstrumentation extends InstrumentationBase {
span = utils.handleParameterizedQuery.call(
this,
plugin.tracer,
plugin.getConfig() as PgInstrumentationConfig,
plugin.getConfig(),
query,
params
);
} else {
span = utils.handleTextQuery.call(this, plugin.tracer, query);
span = utils.handleTextQuery.call(
this,
plugin.tracer,
plugin.getConfig(),
query
);
}
} else if (typeof args[0] === 'object') {
const queryConfig = args[0] as NormalizedQueryConfig;
span = utils.handleConfigQuery.call(
this,
plugin.tracer,
plugin.getConfig() as PgInstrumentationConfig,
plugin.getConfig(),
queryConfig
);
} else {
return utils.handleInvalidQuery.call(
this,
plugin.tracer,
plugin.getConfig(),
original,
...args
);
Expand All @@ -212,7 +219,7 @@ export class PgInstrumentation extends InstrumentationBase {
if (typeof args[args.length - 1] === 'function') {
// Patch ParameterQuery callback
args[args.length - 1] = utils.patchCallback(
plugin.getConfig() as PgInstrumentationConfig,
plugin.getConfig(),
span,
args[args.length - 1] as PostgresCallback
);
Expand All @@ -228,7 +235,7 @@ export class PgInstrumentation extends InstrumentationBase {
) {
// Patch ConfigQuery callback
let callback = utils.patchCallback(
plugin.getConfig() as PgInstrumentationConfig,
plugin.getConfig(),
span,
(args[0] as NormalizedQueryConfig).callback!
);
Expand All @@ -252,11 +259,7 @@ export class PgInstrumentation extends InstrumentationBase {
.then((result: unknown) => {
// Return a pass-along promise which ends the span and then goes to user's orig resolvers
return new Promise(resolve => {
utils.handleExecutionResult(
plugin.getConfig() as PgInstrumentationConfig,
span,
result
);
utils.handleExecutionResult(plugin.getConfig(), span, result);
span.end();
resolve(result);
});
Expand Down Expand Up @@ -285,9 +288,11 @@ export class PgInstrumentation extends InstrumentationBase {
return function connect(this: PgPoolExtended, callback?: PgPoolCallback) {
const connString = utils.getConnectionString(this.options);
// setup span
const span = plugin.tracer.startSpan(`${PG_POOL_COMPONENT}.connect`, {
kind: SpanKind.CLIENT,
attributes: {
const span = startSpan(
plugin.tracer,
plugin.getConfig(),
`${PG_POOL_COMPONENT}.connect`,
{
[SemanticAttributes.DB_SYSTEM]: DbSystemValues.POSTGRESQL,
[SemanticAttributes.DB_NAME]: this.options.database, // required
[SemanticAttributes.NET_PEER_NAME]: this.options.host, // required
Expand All @@ -297,8 +302,8 @@ export class PgInstrumentation extends InstrumentationBase {
[AttributeNames.IDLE_TIMEOUT_MILLIS]:
this.options.idleTimeoutMillis,
[AttributeNames.MAX_CLIENT]: this.options.maxClient,
},
});
}
);

if (callback) {
const parentSpan = trace.getSpan(context.active());
Expand Down
7 changes: 7 additions & 0 deletions plugins/node/opentelemetry-instrumentation-pg/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ export interface PgInstrumentationConfig extends InstrumentationConfig {
* @default undefined
*/
responseHook?: PgInstrumentationExecutionResponseHook;

/**
* If true, requires a parent span to create new spans.
*
* @default false
*/
requireParentSpan?: boolean;
}

export type PostgresCallback = (err: Error, res: object) => unknown;
Expand Down
63 changes: 48 additions & 15 deletions plugins/node/opentelemetry-instrumentation-pg/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,15 @@
*/

import {
context,
trace,
Span,
SpanStatusCode,
Tracer,
SpanKind,
diag,
INVALID_SPAN_CONTEXT,
Attributes,
} from '@opentelemetry/api';
import { AttributeNames } from './enums/AttributeNames';
import {
Expand Down Expand Up @@ -58,19 +62,41 @@ export function getConnectionString(params: PgClientConnectionParams) {
return `postgresql://${host}:${port}/${database}`;
}

// Private helper function to start a span
function pgStartSpan(tracer: Tracer, client: PgClientExtended, name: string) {
const jdbcString = getConnectionString(client.connectionParameters);
export function startSpan(
tracer: Tracer,
instrumentationConfig: PgInstrumentationConfig,
name: string,
attributes: Attributes
): Span {
// If a parent span is required but not present, use a noop span to propagate
// context without recording it. Adapted from opentelemetry-instrumentation-http:
// https://github.com/open-telemetry/opentelemetry-js/blob/597ea98e58a4f68bcd9aec5fd283852efe444cd6/experimental/packages/opentelemetry-instrumentation-http/src/http.ts#L660
const currentSpan = trace.getSpan(context.active());
if (instrumentationConfig.requireParentSpan && currentSpan === undefined) {
return trace.wrapSpanContext(INVALID_SPAN_CONTEXT);
}

return tracer.startSpan(name, {
kind: SpanKind.CLIENT,
attributes: {
[SemanticAttributes.DB_NAME]: client.connectionParameters.database, // required
[SemanticAttributes.DB_SYSTEM]: DbSystemValues.POSTGRESQL, // required
[SemanticAttributes.DB_CONNECTION_STRING]: jdbcString, // required
[SemanticAttributes.NET_PEER_NAME]: client.connectionParameters.host, // required
[SemanticAttributes.NET_PEER_PORT]: client.connectionParameters.port,
[SemanticAttributes.DB_USER]: client.connectionParameters.user,
},
attributes,
});
}

// Private helper function to start a span after a connection has been established
function startQuerySpan(
client: PgClientExtended,
tracer: Tracer,
instrumentationConfig: PgInstrumentationConfig,
name: string
) {
const jdbcString = getConnectionString(client.connectionParameters);
return startSpan(tracer, instrumentationConfig, name, {
[SemanticAttributes.DB_NAME]: client.connectionParameters.database, // required
[SemanticAttributes.DB_SYSTEM]: DbSystemValues.POSTGRESQL, // required
[SemanticAttributes.DB_CONNECTION_STRING]: jdbcString, // required
[SemanticAttributes.NET_PEER_NAME]: client.connectionParameters.host, // required
[SemanticAttributes.NET_PEER_PORT]: client.connectionParameters.port,
[SemanticAttributes.DB_USER]: client.connectionParameters.user,
});
}

Expand All @@ -84,7 +110,7 @@ export function handleConfigQuery(
// Set child span name
const queryCommand = getCommandFromText(queryConfig.name || queryConfig.text);
const name = PgInstrumentation.BASE_SPAN_NAME + ':' + queryCommand;
const span = pgStartSpan(tracer, this, name);
const span = startQuerySpan(this, tracer, instrumentationConfig, name);

// Set attributes
if (queryConfig.text) {
Expand Down Expand Up @@ -118,7 +144,7 @@ export function handleParameterizedQuery(
// Set child span name
const queryCommand = getCommandFromText(query);
const name = PgInstrumentation.BASE_SPAN_NAME + ':' + queryCommand;
const span = pgStartSpan(tracer, this, name);
const span = startQuerySpan(this, tracer, instrumentationConfig, name);

// Set attributes
span.setAttribute(SemanticAttributes.DB_STATEMENT, query);
Expand All @@ -133,12 +159,13 @@ export function handleParameterizedQuery(
export function handleTextQuery(
this: PgClientExtended,
tracer: Tracer,
instrumentationConfig: PgInstrumentationConfig,
query: string
) {
// Set child span name
const queryCommand = getCommandFromText(query);
const name = PgInstrumentation.BASE_SPAN_NAME + ':' + queryCommand;
const span = pgStartSpan(tracer, this, name);
const span = startQuerySpan(this, tracer, instrumentationConfig, name);

// Set attributes
span.setAttribute(SemanticAttributes.DB_STATEMENT, query);
Expand All @@ -153,11 +180,17 @@ export function handleTextQuery(
export function handleInvalidQuery(
this: PgClientExtended,
tracer: Tracer,
instrumentationConfig: PgInstrumentationConfig,
originalQuery: typeof pgTypes.Client.prototype.query,
...args: unknown[]
) {
let result;
const span = pgStartSpan(tracer, this, PgInstrumentation.BASE_SPAN_NAME);
const span = startQuerySpan(
this,
tracer,
instrumentationConfig,
PgInstrumentation.BASE_SPAN_NAME
);
try {
result = originalQuery.apply(this, args as never);
} catch (e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ describe('pg-pool', () => {
};

beforeEach(async () => {
const config: PgInstrumentationConfig = {
create({
enhancedDatabaseReporting: true,
responseHook: (
span: Span,
Expand All @@ -350,9 +350,7 @@ describe('pg-pool', () => {
dataAttributeName,
JSON.stringify({ rowCount: responseInfo?.data.rowCount })
),
};

create(config);
});
});

it('should attach response hook data to resulting spans for query with callback ', done => {
Expand Down
29 changes: 26 additions & 3 deletions plugins/node/opentelemetry-instrumentation-pg/test/pg.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,16 @@ describe('pg', () => {
testUtils.assertPropagation(connectSpan, span);
});
});

it('should not generate traces when requireParentSpan=true is specified', async () => {
instrumentation.setConfig({
requireParentSpan: true,
});
memoryExporter.reset();
await connClient.connect();
const spans = memoryExporter.getFinishedSpans();
assert.strictEqual(spans.length, 0);
});
});

describe('#client.query(...)', () => {
Expand Down Expand Up @@ -482,7 +492,7 @@ describe('pg', () => {
[dataAttributeName]: '{"rowCount":1}',
};
beforeEach(async () => {
const config: PgInstrumentationConfig = {
create({
enhancedDatabaseReporting: true,
responseHook: (
span: Span,
Expand All @@ -492,8 +502,7 @@ describe('pg', () => {
dataAttributeName,
JSON.stringify({ rowCount: responseInfo?.data.rowCount })
),
};
create(config);
});
});

it('should attach response hook data to resulting spans for query with callback ', done => {
Expand Down Expand Up @@ -639,5 +648,19 @@ describe('pg', () => {
client.query('SELECT NOW()').then(queryHandler);
});
});

it('should not generate traces for client.query() when requireParentSpan=true is specified', done => {
instrumentation.setConfig({
requireParentSpan: true,
});
memoryExporter.reset();
client.query('SELECT NOW()', (err, res) => {
assert.strictEqual(err, null);
assert.ok(res);
const spans = memoryExporter.getFinishedSpans();
assert.strictEqual(spans.length, 0);
done();
});
});
});
});
Loading

0 comments on commit a6f054d

Please sign in to comment.