Skip to content

Commit

Permalink
Merge branch 'main' into x_ray_case_insensitive
Browse files Browse the repository at this point in the history
  • Loading branch information
blumamir authored Feb 7, 2023
2 parents 2c94b0c + 52136d8 commit 85ce208
Show file tree
Hide file tree
Showing 3 changed files with 285 additions and 43 deletions.
4 changes: 2 additions & 2 deletions plugins/node/instrumentation-amqplib/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@
"dependencies": {
"@opentelemetry/core": "^1.8.0",
"@opentelemetry/instrumentation": "^0.35.1",
"@opentelemetry/semantic-conventions": "^1.0.0",
"@types/amqplib": "^0.5.17"
"@opentelemetry/semantic-conventions": "^1.0.0"
},
"devDependencies": {
"@opentelemetry/api": "^1.3.0",
"@opentelemetry/contrib-test-utils": "^0.33.0",
"@types/amqplib": "^0.5.17",
"@types/lodash": "4.14.178",
"@types/mocha": "8.2.3",
"@types/sinon": "10.0.2",
Expand Down
75 changes: 38 additions & 37 deletions plugins/node/instrumentation-amqplib/src/amqplib.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import {
} from '@opentelemetry/core';
import {
InstrumentationBase,
InstrumentationModuleDefinition,
InstrumentationNodeModuleDefinition,
InstrumentationNodeModuleFile,
isWrapped,
Expand All @@ -41,11 +40,15 @@ import {
MessagingOperationValues,
MessagingDestinationKindValues,
} from '@opentelemetry/semantic-conventions';
import type * as amqp from 'amqplib';
import {
AmqplibInstrumentationConfig,
Connection,
ConsumeMessage,
DEFAULT_CONFIG,
EndOperation,
Message,
Options,
Replies,
} from './types';
import {
CHANNEL_CONSUME_TIMEOUT_TIMER,
Expand All @@ -64,7 +67,7 @@ import {
} from './utils';
import { VERSION } from './version';

export class AmqplibInstrumentation extends InstrumentationBase<typeof amqp> {
export class AmqplibInstrumentation extends InstrumentationBase {
protected override _config!: AmqplibInstrumentationConfig;

constructor(config?: AmqplibInstrumentationConfig) {
Expand All @@ -79,31 +82,29 @@ export class AmqplibInstrumentation extends InstrumentationBase<typeof amqp> {
this._config = Object.assign({}, DEFAULT_CONFIG, config);
}

protected init(): InstrumentationModuleDefinition<typeof amqp> {
const channelModelModuleFile =
new InstrumentationNodeModuleFile<amqp.Channel>(
'amqplib/lib/channel_model.js',
['>=0.5.5'],
this.patchChannelModel.bind(this),
this.unpatchChannelModel.bind(this)
);
protected init() {
const channelModelModuleFile = new InstrumentationNodeModuleFile(
'amqplib/lib/channel_model.js',
['>=0.5.5'],
this.patchChannelModel.bind(this),
this.unpatchChannelModel.bind(this)
);

const callbackModelModuleFile =
new InstrumentationNodeModuleFile<amqp.Channel>(
'amqplib/lib/callback_model.js',
['>=0.5.5'],
this.patchChannelModel.bind(this),
this.unpatchChannelModel.bind(this)
);
const callbackModelModuleFile = new InstrumentationNodeModuleFile(
'amqplib/lib/callback_model.js',
['>=0.5.5'],
this.patchChannelModel.bind(this),
this.unpatchChannelModel.bind(this)
);

const connectModuleFile = new InstrumentationNodeModuleFile<amqp.Channel>(
const connectModuleFile = new InstrumentationNodeModuleFile(
'amqplib/lib/connect.js',
['>=0.5.5'],
this.patchConnect.bind(this),
this.unpatchConnect.bind(this)
);

const module = new InstrumentationNodeModuleDefinition<typeof amqp>(
const module = new InstrumentationNodeModuleDefinition(
'amqplib',
['>=0.5.5'],
undefined,
Expand Down Expand Up @@ -231,22 +232,22 @@ export class AmqplibInstrumentation extends InstrumentationBase<typeof amqp> {

private getConnectPatch(
original: (
url: string | amqp.Options.Connect,
url: string | Options.Connect,
socketOptions: any,
openCallback: (err: any, connection: amqp.Connection) => void
) => amqp.Connection
openCallback: (err: any, connection: Connection) => void
) => Connection
) {
return function patchedConnect(
this: unknown,
url: string | amqp.Options.Connect,
url: string | Options.Connect,
socketOptions: any,
openCallback: Function
) {
return original.call(
this,
url,
socketOptions,
function (this: unknown, err, conn: amqp.Connection) {
function (this: unknown, err, conn: Connection) {
if (err == null) {
const urlAttributes = getConnectionAttributesFromUrl(url);
// the type of conn in @types/amqplib is amqp.Connection, but in practice the library send the
Expand Down Expand Up @@ -321,7 +322,7 @@ export class AmqplibInstrumentation extends InstrumentationBase<typeof amqp> {
const self = this;
return function ack(
this: InstrumentationConsumeChannel,
message: amqp.Message,
message: Message,
allUpToOrRequeue?: boolean,
requeue?: boolean
): void {
Expand All @@ -330,7 +331,7 @@ export class AmqplibInstrumentation extends InstrumentationBase<typeof amqp> {
const requeueResolved =
endOperation === EndOperation.Reject ? allUpToOrRequeue : requeue;

const spansNotEnded: { msg: amqp.Message }[] =
const spansNotEnded: { msg: Message }[] =
channel[CHANNEL_SPANS_NOT_ENDED] ?? [];
const msgIndex = spansNotEnded.findIndex(
msgDetails => msgDetails.msg === message
Expand Down Expand Up @@ -375,9 +376,9 @@ export class AmqplibInstrumentation extends InstrumentationBase<typeof amqp> {
return function consume(
this: InstrumentationConsumeChannel,
queue: string,
onMessage: (msg: amqp.ConsumeMessage | null) => void,
options?: amqp.Options.Consume
): Promise<amqp.Replies.Consume> {
onMessage: (msg: ConsumeMessage | null) => void,
options?: Options.Consume
): Promise<Replies.Consume> {
const channel = this;
if (
!Object.prototype.hasOwnProperty.call(channel, CHANNEL_SPANS_NOT_ENDED)
Expand Down Expand Up @@ -475,8 +476,8 @@ export class AmqplibInstrumentation extends InstrumentationBase<typeof amqp> {
exchange: string,
routingKey: string,
content: Buffer,
options?: amqp.Options.Publish,
callback?: (err: any, ok: amqp.Replies.Empty) => void
options?: Options.Publish,
callback?: (err: any, ok: Replies.Empty) => void
): boolean {
const channel = this;
const { span, modifiedOptions } = self.createPublishSpan(
Expand Down Expand Up @@ -510,7 +511,7 @@ export class AmqplibInstrumentation extends InstrumentationBase<typeof amqp> {
const patchedOnConfirm = function (
this: unknown,
err: any,
ok: amqp.Replies.Empty
ok: Replies.Empty
) {
try {
callback?.call(this, err, ok);
Expand Down Expand Up @@ -572,7 +573,7 @@ export class AmqplibInstrumentation extends InstrumentationBase<typeof amqp> {
exchange: string,
routingKey: string,
content: Buffer,
options?: amqp.Options.Publish
options?: Options.Publish
): boolean {
if (isConfirmChannelTracing(context.active())) {
// work already done
Expand Down Expand Up @@ -623,7 +624,7 @@ export class AmqplibInstrumentation extends InstrumentationBase<typeof amqp> {
exchange: string,
routingKey: string,
channel: InstrumentationPublishChannel,
options?: amqp.Options.Publish
options?: Options.Publish
) {
const normalizedExchange = normalizeExchange(exchange);

Expand Down Expand Up @@ -689,7 +690,7 @@ export class AmqplibInstrumentation extends InstrumentationBase<typeof amqp> {
operation: EndOperation,
requeue: boolean | undefined
) {
const spansNotEnded: { msg: amqp.Message }[] =
const spansNotEnded: { msg: Message }[] =
channel[CHANNEL_SPANS_NOT_ENDED] ?? [];
spansNotEnded.forEach(msgDetails => {
this.endConsumerSpan(msgDetails.msg, isRejected, operation, requeue);
Expand All @@ -699,7 +700,7 @@ export class AmqplibInstrumentation extends InstrumentationBase<typeof amqp> {

private callConsumeEndHook(
span: Span,
msg: amqp.ConsumeMessage,
msg: ConsumeMessage,
rejected: boolean | null,
endOperation: EndOperation
) {
Expand Down
Loading

0 comments on commit 85ce208

Please sign in to comment.