Skip to content

Commit

Permalink
feat(pg): Patch client inside lib and lib/pg-native (#2563)
Browse files Browse the repository at this point in the history
  • Loading branch information
onurtemizkan authored Jan 20, 2025
1 parent bcf1da7 commit 5f214eb
Show file tree
Hide file tree
Showing 4 changed files with 172 additions and 32 deletions.
108 changes: 76 additions & 32 deletions plugins/node/opentelemetry-instrumentation-pg/src/instrumentation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
InstrumentationBase,
InstrumentationNodeModuleDefinition,
safeExecuteInTheMiddle,
InstrumentationNodeModuleFile,
} from '@opentelemetry/instrumentation';
import {
context,
Expand Down Expand Up @@ -67,6 +68,12 @@ import {
ATTR_DB_OPERATION_NAME,
} from './semconv';

function extractModuleExports(module: any) {
return module[Symbol.toStringTag] === 'Module'
? module.default // ESM
: module; // CommonJS
}

export class PgInstrumentation extends InstrumentationBase<PgInstrumentationConfig> {
private _operationDuration!: Histogram;
private _connectionsCount!: UpDownCounter;
Expand Down Expand Up @@ -125,45 +132,38 @@ export class PgInstrumentation extends InstrumentationBase<PgInstrumentationConf
}

protected init() {
const modulePG = new InstrumentationNodeModuleDefinition(
'pg',
['>=8.0.3 <9'],
(module: any) => {
const moduleExports: typeof pgTypes =
module[Symbol.toStringTag] === 'Module'
? module.default // ESM
: module; // CommonJS
if (isWrapped(moduleExports.Client.prototype.query)) {
this._unwrap(moduleExports.Client.prototype, 'query');
}
const SUPPORTED_PG_VERSIONS = ['>=8.0.3 <9'];

if (isWrapped(moduleExports.Client.prototype.connect)) {
this._unwrap(moduleExports.Client.prototype, 'connect');
}
const modulePgNativeClient = new InstrumentationNodeModuleFile(
'pg/lib/native/client.js',
SUPPORTED_PG_VERSIONS,
this._patchPgClient.bind(this),
this._unpatchPgClient.bind(this)
);

this._wrap(
moduleExports.Client.prototype,
'query',
this._getClientQueryPatch() as any
);
const modulePgClient = new InstrumentationNodeModuleFile(
'pg/lib/client.js',
SUPPORTED_PG_VERSIONS,
this._patchPgClient.bind(this),
this._unpatchPgClient.bind(this)
);

this._wrap(
moduleExports.Client.prototype,
'connect',
this._getClientConnectPatch() as any
);
const modulePG = new InstrumentationNodeModuleDefinition(
'pg',
SUPPORTED_PG_VERSIONS,
(module: any) => {
const moduleExports = extractModuleExports(module);

this._patchPgClient(moduleExports.Client);
return module;
},
(module: any) => {
const moduleExports: typeof pgTypes =
module[Symbol.toStringTag] === 'Module'
? module.default // ESM
: module; // CommonJS
if (isWrapped(moduleExports.Client.prototype.query)) {
this._unwrap(moduleExports.Client.prototype, 'query');
}
}
const moduleExports = extractModuleExports(module);

this._unpatchPgClient(moduleExports.Client);
return module;
},
[modulePgClient, modulePgNativeClient]
);

const modulePGPool = new InstrumentationNodeModuleDefinition(
Expand All @@ -190,6 +190,50 @@ export class PgInstrumentation extends InstrumentationBase<PgInstrumentationConf
return [modulePG, modulePGPool];
}

private _patchPgClient(module: any) {
if (!module) {
return;
}

const moduleExports = extractModuleExports(module);

if (isWrapped(moduleExports.prototype.query)) {
this._unwrap(moduleExports.prototype, 'query');
}

if (isWrapped(moduleExports.prototype.connect)) {
this._unwrap(moduleExports.prototype, 'connect');
}

this._wrap(
moduleExports.prototype,
'query',
this._getClientQueryPatch() as any
);

this._wrap(
moduleExports.prototype,
'connect',
this._getClientConnectPatch() as any
);

return module;
}

private _unpatchPgClient(module: any) {
const moduleExports = extractModuleExports(module);

if (isWrapped(moduleExports.prototype.query)) {
this._unwrap(moduleExports.prototype, 'query');
}

if (isWrapped(moduleExports.prototype.connect)) {
this._unwrap(moduleExports.prototype, 'connect');
}

return module;
}

private _getClientConnectPatch() {
const plugin = this;
return (original: PgClientConnect) => {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// Use postgres from an ES module:
// node --experimental-loader=@opentelemetry/instrumentation/hook.mjs use-pg.mjs

import { trace } from '@opentelemetry/api';
import { createTestNodeSdk } from '@opentelemetry/contrib-test-utils';
import assert from 'assert';

import { PgInstrumentation } from '../../build/src/index.js';

const CONFIG = {
user: process.env.POSTGRES_USER || 'postgres',
password: process.env.POSTGRES_PASSWORD || 'postgres',
database: process.env.POSTGRES_DB || 'postgres',
host: process.env.POSTGRES_HOST || 'localhost',
port: process.env.POSTGRES_PORT
? parseInt(process.env.POSTGRES_PORT, 10)
: 54320,
};

const sdk = createTestNodeSdk({
serviceName: 'use-pg',
instrumentations: [new PgInstrumentation()],
});
sdk.start();

import pg from 'pg';
const client = new pg.Client(CONFIG);

await client.connect();

const tracer = trace.getTracer();

await tracer.startActiveSpan('test-span', async span => {
const res = await client.query('SELECT NOW()');

assert.ok(res);
span.end();

await client.end();
await sdk.shutdown();
});
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,10 @@ describe('pg-pool', () => {
function create(config: PgInstrumentationConfig = {}) {
instrumentation.setConfig(config);
instrumentation.enable();

// Disable and enable the instrumentation to visit unwrap calls
instrumentation.disable();
instrumentation.enable();
}

let pool: pgPool<pg.Client>;
Expand Down
35 changes: 35 additions & 0 deletions plugins/node/opentelemetry-instrumentation-pg/test/pg.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ describe('pg', () => {
function create(config: PgInstrumentationConfig = {}) {
instrumentation.setConfig(config);
instrumentation.enable();

// Disable and enable the instrumentation to visit unwrap calls
instrumentation.disable();
instrumentation.enable();
}

let postgres: typeof pg;
Expand Down Expand Up @@ -152,13 +156,15 @@ describe('pg', () => {

postgres = require('pg');
client = new postgres.Client(CONFIG);

await client.connect();
});

after(async () => {
if (testPostgresLocally) {
testUtils.cleanUpDocker('postgres');
}

await client.end();
});

Expand Down Expand Up @@ -1087,3 +1093,32 @@ describe('pg', () => {
});
});
});

describe('pg (ESM)', () => {
it('should work with ESM usage', async () => {
await testUtils.runTestFixture({
cwd: __dirname,
argv: ['fixtures/use-pg.mjs'],
env: {
NODE_OPTIONS:
'--experimental-loader=@opentelemetry/instrumentation/hook.mjs',
NODE_NO_WARNINGS: '1',
},
checkResult: (err, stdout, stderr) => {
assert.ifError(err);
},
checkCollector: (collector: testUtils.TestCollector) => {
const spans = collector.sortedSpans;

assert.strictEqual(spans.length, 3);

assert.strictEqual(spans[0].name, 'pg.connect');
assert.strictEqual(spans[0].kind, 3);
assert.strictEqual(spans[1].name, 'test-span');
assert.strictEqual(spans[1].kind, 1);
assert.strictEqual(spans[2].name, 'pg.query:SELECT otel_pg_database');
assert.strictEqual(spans[2].kind, 3);
},
});
});
});

0 comments on commit 5f214eb

Please sign in to comment.