Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(instrumentation-aws-sdk): sqs message id missing on send command #968

Merged
merged 9 commits into from
May 3, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -121,45 +121,61 @@ export class SqsServiceExtension implements ServiceExtension {
tracer: Tracer,
config: AwsSdkInstrumentationConfig
) => {
const messages: SQS.Message[] = response?.data?.Messages;
if (messages) {
const queueUrl = this.extractQueueUrl(response.request.commandInput);
const queueName = this.extractQueueNameFromUrl(queueUrl);

pubsubPropagation.patchMessagesArrayToStartProcessSpans<SQS.Message>({
messages,
parentContext: trace.setSpan(context.active(), span),
tracer,
messageToSpanDetails: (message: SQS.Message) => ({
name: queueName ?? 'unknown',
parentContext: propagation.extract(
ROOT_CONTEXT,
extractPropagationContext(
message,
config.sqsExtractContextPropagationFromPayload
),
contextGetter
),
attributes: {
[SemanticAttributes.MESSAGING_SYSTEM]: 'aws.sqs',
[SemanticAttributes.MESSAGING_DESTINATION]: queueName,
[SemanticAttributes.MESSAGING_DESTINATION_KIND]:
MessagingDestinationKindValues.QUEUE,
[SemanticAttributes.MESSAGING_MESSAGE_ID]: message.MessageId,
[SemanticAttributes.MESSAGING_URL]: queueUrl,
[SemanticAttributes.MESSAGING_OPERATION]:
MessagingOperationValues.PROCESS,
},
}),
processHook: (span: Span, message: SQS.Message) =>
config.sqsProcessHook?.(span, { message }),
});

pubsubPropagation.patchArrayForProcessSpans(
messages,
tracer,
context.active()
);
switch (response.request.commandName) {
case 'SendMessage':
span.setAttribute(
SemanticAttributes.MESSAGING_MESSAGE_ID,
response?.data?.MessageId
);
break;

case 'SendMessageBatch':
// TODO: How should this be handled?
break;

case 'ReceiveMessage': {
const messages: SQS.Message[] = response?.data?.Messages;
if (messages) {
const queueUrl = this.extractQueueUrl(response.request.commandInput);
const queueName = this.extractQueueNameFromUrl(queueUrl);

pubsubPropagation.patchMessagesArrayToStartProcessSpans<SQS.Message>({
messages,
parentContext: trace.setSpan(context.active(), span),
tracer,
messageToSpanDetails: (message: SQS.Message) => ({
name: queueName ?? 'unknown',
parentContext: propagation.extract(
ROOT_CONTEXT,
extractPropagationContext(
message,
config.sqsExtractContextPropagationFromPayload
),
contextGetter
),
attributes: {
[SemanticAttributes.MESSAGING_SYSTEM]: 'aws.sqs',
[SemanticAttributes.MESSAGING_DESTINATION]: queueName,
[SemanticAttributes.MESSAGING_DESTINATION_KIND]:
MessagingDestinationKindValues.QUEUE,
[SemanticAttributes.MESSAGING_MESSAGE_ID]: message.MessageId,
[SemanticAttributes.MESSAGING_URL]: queueUrl,
[SemanticAttributes.MESSAGING_OPERATION]:
MessagingOperationValues.PROCESS,
},
}),
processHook: (span: Span, message: SQS.Message) =>
config.sqsProcessHook?.(span, { message }),
});

pubsubPropagation.patchArrayForProcessSpans(
messages,
tracer,
context.active()
);
}
break;
}
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ describe('instrumentation-aws-sdk-v3', () => {
'https://sqs.us-east-1.amazonaws.com/731241200085/otel-demo-aws-sdk',
MessageBody: 'payload example from v3 without batch',
};
await sqsClient.sendMessage(params);
const response = await sqsClient.sendMessage(params);
expect(getTestSpans().length).toBe(1);
const [span] = getTestSpans();

Expand All @@ -320,6 +320,9 @@ describe('instrumentation-aws-sdk-v3', () => {
expect(span.attributes[SemanticAttributes.MESSAGING_URL]).toEqual(
params.QueueUrl
);
expect(
span.attributes[SemanticAttributes.MESSAGING_MESSAGE_ID]
).toEqual(response.MessageId);
expect(span.attributes[SemanticAttributes.HTTP_STATUS_CODE]).toEqual(
200
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import * as AWS from 'aws-sdk';
import { AWSError } from 'aws-sdk';

import {
MessagingDestinationKindValues,
MessagingOperationValues,
SemanticAttributes,
} from '@opentelemetry/semantic-conventions';
Expand All @@ -41,6 +42,7 @@ import { Message } from 'aws-sdk/clients/sqs';
import * as expect from 'expect';
import * as sinon from 'sinon';
import * as messageAttributes from '../src/services/MessageAttributes';
import { AttributeNames } from '../src/enums';

const responseMockSuccess = {
requestId: '0000000000000',
Expand Down Expand Up @@ -361,6 +363,49 @@ describe('SQS', () => {
});

describe('hooks', () => {
it('sqsResponseHook add messaging attributes', async () => {
blumamir marked this conversation as resolved.
Show resolved Hide resolved
const region = 'us-east-1';
const sqs = new AWS.SQS();
sqs.config.update({ region });

const QueueName = 'unittest';
const params = {
QueueUrl: `queue/url/for/${QueueName}`,
MessageBody: 'payload example from v2 without batch',
};

const response = await sqs.sendMessage(params).promise();

expect(getTestSpans().length).toBe(1);
const [span] = getTestSpans();

// make sure we have the general aws attributes:
expect(span.attributes[SemanticAttributes.RPC_SYSTEM]).toEqual('aws-api');
expect(span.attributes[SemanticAttributes.RPC_METHOD]).toEqual(
'SendMessage'
);
expect(span.attributes[SemanticAttributes.RPC_SERVICE]).toEqual('SQS');
expect(span.attributes[AttributeNames.AWS_REGION]).toEqual(region);

// custom messaging attributes
expect(span.attributes[SemanticAttributes.MESSAGING_SYSTEM]).toEqual(
'aws.sqs'
);
expect(
span.attributes[SemanticAttributes.MESSAGING_DESTINATION_KIND]
).toEqual(MessagingDestinationKindValues.QUEUE);
expect(span.attributes[SemanticAttributes.MESSAGING_DESTINATION]).toEqual(
QueueName
);
expect(span.attributes[SemanticAttributes.MESSAGING_URL]).toEqual(
params.QueueUrl
);
expect(span.attributes[SemanticAttributes.MESSAGING_MESSAGE_ID]).toEqual(
response.MessageId
);
// expect(span.attributes[SemanticAttributes.HTTP_STATUS_CODE]).toEqual(200);
blumamir marked this conversation as resolved.
Show resolved Hide resolved
});

it('sqsProcessHook called and add message attribute to span', async () => {
const config = {
sqsProcessHook: (
Expand Down