From 28e2113ec1091e73a1d1b62b48fee8c01c72afee Mon Sep 17 00:00:00 2001 From: Michael Haberman Date: Wed, 17 Nov 2021 09:51:14 +0200 Subject: [PATCH] feat: config option to extract sqs context from message payload (#737) --- .../README.md | 1 + .../doc/sns.md | 12 +++ .../src/services/MessageAttributes.ts | 24 +++++ .../src/services/sqs.ts | 11 ++- .../src/types.ts | 11 +++ .../test/sqs.test.ts | 90 +++++++++++++++++++ 6 files changed, 147 insertions(+), 2 deletions(-) diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/README.md b/plugins/node/opentelemetry-instrumentation-aws-sdk/README.md index fa6673fee0..b84fb36ee0 100644 --- a/plugins/node/opentelemetry-instrumentation-aws-sdk/README.md +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/README.md @@ -48,6 +48,7 @@ aws-sdk instrumentation has few options available to choose from. You can set th | `responseHook` | `AwsSdkResponseCustomAttributeFunction` | Hook for adding custom attributes when response is received from aws. | | `sqsProcessHook` | `AwsSdkSqsProcessCustomAttributeFunction` | Hook called after starting sqs `process` span (for each sqs received message), which allow to add custom attributes to it. | | `suppressInternalInstrumentation` | `boolean` | Most aws operation use http requests under the hood. Set this to `true` to hide all underlying http spans. | +| `sqsExtractContextPropagationFromPayload` | `boolean` | Will parse and extract context propagation headers from SQS Payload, false by default. 
[When should it be used?](./doc/sns.md#integration-with-sqs)| ## Span Attributes diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/doc/sns.md b/plugins/node/opentelemetry-instrumentation-aws-sdk/doc/sns.md index 5c62318de3..f42d5ec2ef 100644 --- a/plugins/node/opentelemetry-instrumentation-aws-sdk/doc/sns.md +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/doc/sns.md @@ -13,3 +13,15 @@ The following methods are automatically enhanced: ### Consumers There are many potential consumers: SQS, Lambda, HTTP/S, Email, SMS, mobile notifications. each one of them will received the propagated context in its own way. + + +## Integration with SQS +AWS provides two ways of integrating SNS and SQS: one sends the message "as is" and the other parses it first. The setting that controls this is called raw message delivery. + +When it is turned off (the default), the message attributes (sent in SNS) will appear in the payload of the SQS message. When it is turned on, the payload will be parsed before being sent to SQS and the SNS attributes will be mapped to SQS message attributes, which allows this instrumentation's context propagation to work out-of-the-box. + +If raw message delivery is turned off, you can solve it by enabling `sqsExtractContextPropagationFromPayload`, which will extract the context from the payload. It does have some performance effect, as the instrumentation will run `JSON.parse` to get the data.
+ +More details about raw message deliver can be found in [AWS docs](https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html) + +>If you see partial / broken traces when integrating SNS with SQS this might be the reason \ No newline at end of file diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/MessageAttributes.ts b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/MessageAttributes.ts index 67b864daa1..064e550b5f 100644 --- a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/MessageAttributes.ts +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/MessageAttributes.ts @@ -21,6 +21,7 @@ import { diag, } from '@opentelemetry/api'; import type { SQS, SNS } from 'aws-sdk'; +import type { MessageBodyAttributeMap } from 'aws-sdk/clients/sqs'; // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-quotas.html export const MAX_MESSAGE_ATTRIBUTES = 10; @@ -73,3 +74,26 @@ export const injectPropagationContext = ( } return attributes; }; + +export const extractPropagationContext = ( + message: SQS.Message, + sqsExtractContextPropagationFromPayload: boolean | undefined +): MessageBodyAttributeMap | undefined => { + const propagationFields = propagation.fields(); + const hasPropagationFields = Object.keys( + message.MessageAttributes || [] + ).some(attr => propagationFields.includes(attr)); + if (hasPropagationFields) { + return message.MessageAttributes; + } else if (sqsExtractContextPropagationFromPayload && message.Body) { + try { + const payload = JSON.parse(message.Body); + return payload.MessageAttributes; + } catch { + diag.debug( + 'failed to parse SQS payload to extract context propagation, trace might be incomplete.' 
+ ); + } + } + return undefined; +}; diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/sqs.ts b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/sqs.ts index d2c06cf640..506a4af29f 100644 --- a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/sqs.ts +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/services/sqs.ts @@ -34,7 +34,11 @@ import { MessagingDestinationKindValues, SemanticAttributes, } from '@opentelemetry/semantic-conventions'; -import { contextGetter, injectPropagationContext } from './MessageAttributes'; +import { + contextGetter, + extractPropagationContext, + injectPropagationContext, +} from './MessageAttributes'; export class SqsServiceExtension implements ServiceExtension { requestPreSpanHook(request: NormalizedRequest): RequestMetadata { @@ -128,7 +132,10 @@ export class SqsServiceExtension implements ServiceExtension { name: queueName ?? 'unknown', parentContext: propagation.extract( ROOT_CONTEXT, - message.MessageAttributes, + extractPropagationContext( + message, + config.sqsExtractContextPropagationFromPayload + ), contextGetter ), attributes: { diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/types.ts b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/types.ts index c582603068..340be8deeb 100644 --- a/plugins/node/opentelemetry-instrumentation-aws-sdk/src/types.ts +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/src/types.ts @@ -80,4 +80,15 @@ export interface AwsSdkInstrumentationConfig extends InstrumentationConfig { * effectively causing those http spans to be non-recordable. */ suppressInternalInstrumentation?: boolean; + + /** + * In some cases the context propagation headers may be found in the message payload + * rather than the message attribute. + * When this field is turned on the instrumentation will parse the payload and extract the + * context from there. 
+ * Even if the field is on and MessageAttribute contains context propagation field are present, + * the MessageAttribute will get priority. + * By default it is off. + */ + sqsExtractContextPropagationFromPayload?: boolean; } diff --git a/plugins/node/opentelemetry-instrumentation-aws-sdk/test/sqs.test.ts b/plugins/node/opentelemetry-instrumentation-aws-sdk/test/sqs.test.ts index 42a66f7db3..b35a0da305 100644 --- a/plugins/node/opentelemetry-instrumentation-aws-sdk/test/sqs.test.ts +++ b/plugins/node/opentelemetry-instrumentation-aws-sdk/test/sqs.test.ts @@ -36,12 +36,19 @@ import { ReadableSpan } from '@opentelemetry/sdk-trace-base'; import { mockV2AwsSend } from './testing-utils'; import { Message } from 'aws-sdk/clients/sqs'; import * as expect from 'expect'; +import * as sinon from 'sinon'; +import * as messageAttributes from '../src/services/MessageAttributes'; const responseMockSuccess = { requestId: '0000000000000', error: null, }; +const extractContextSpy = sinon.spy( + messageAttributes, + 'extractPropagationContext' +); + describe('SQS', () => { before(() => { AWS.config.credentials = { @@ -429,4 +436,87 @@ describe('SQS', () => { expect(processSpans[1].status.code).toStrictEqual(SpanStatusCode.UNSET); }); }); + + describe('extract payload', () => { + beforeEach(() => { + extractContextSpy.resetHistory(); + }); + it('should not extract from payload even if set', async () => { + mockV2AwsSend(responseMockSuccess, { + Messages: [{ Body: JSON.stringify({ traceparent: 1 }) }], + } as AWS.SQS.Types.ReceiveMessageResult); + + const sqs = new AWS.SQS(); + await sqs + .receiveMessage({ + QueueUrl: 'queue/url/for/unittests1', + }) + .promise(); + expect(extractContextSpy.returnValues[0]?.traceparent).toBeUndefined(); + }); + + it('should extract from payload', async () => { + const traceparent = { + traceparent: { + StringValue: + '00-a1d050b7c8ad93c405e7a0d94cda5b03-23a485dc98b24027-01', + DataType: 'String', + }, + }; + instrumentation.setConfig({ + 
sqsExtractContextPropagationFromPayload: true, + }); + mockV2AwsSend(responseMockSuccess, { + Messages: [ + { Body: JSON.stringify({ MessageAttributes: { traceparent } }) }, + ], + } as AWS.SQS.Types.ReceiveMessageResult); + + const sqs = new AWS.SQS(); + await sqs + .receiveMessage({ + QueueUrl: 'queue/url/for/unittests', + }) + .promise(); + + expect(extractContextSpy.returnValues[0]?.traceparent).toStrictEqual( + traceparent + ); + }); + + it('should not extract from payload but from attributes', async () => { + const traceparentInPayload = 'some-trace-parent-value'; + const traceparentInMessageAttributes = { + traceparent: { + StringValue: + '00-a1d050b7c8ad93c405e7a0d94cda5b03-23a485dc98b24027-01', + DataType: 'String', + }, + }; + instrumentation.setConfig({ + sqsExtractContextPropagationFromPayload: false, + }); + mockV2AwsSend(responseMockSuccess, { + Messages: [ + { + MessageAttributes: traceparentInMessageAttributes, + Body: JSON.stringify({ + MessageAttributes: { traceparentInPayload }, + }), + }, + ], + } as AWS.SQS.Types.ReceiveMessageResult); + + const sqs = new AWS.SQS(); + await sqs + .receiveMessage({ + QueueUrl: 'queue/url/for/unittests', + }) + .promise(); + + expect(extractContextSpy.returnValues[0]).toBe( + traceparentInMessageAttributes + ); + }); + }); });