Skip to content

Commit

Permalink
feat: config option to extract sqs context from message payload (#737)
Browse files Browse the repository at this point in the history
  • Loading branch information
habmic authored Nov 17, 2021
1 parent 82ebc49 commit 28e2113
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ aws-sdk instrumentation has few options available to choose from. You can set th
| `responseHook` | `AwsSdkResponseCustomAttributeFunction` | Hook for adding custom attributes when response is received from aws. |
| `sqsProcessHook` | `AwsSdkSqsProcessCustomAttributeFunction` | Hook called after starting sqs `process` span (for each sqs received message), which allow to add custom attributes to it. |
| `suppressInternalInstrumentation` | `boolean` | Most aws operation use http requests under the hood. Set this to `true` to hide all underlying http spans. |
| `sqsExtractContextPropagationFromPayload` | `boolean` | Will parse and extract context propagation headers from SQS Payload, false by default. [When should it be used?](./doc/sns.md#integration-with-sqs)|

## Span Attributes

Expand Down
12 changes: 12 additions & 0 deletions plugins/node/opentelemetry-instrumentation-aws-sdk/doc/sns.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,15 @@ The following methods are automatically enhanced:

### Consumers
There are many potential consumers: SQS, Lambda, HTTP/S, Email, SMS, mobile notifications. each one of them will received the propagated context in its own way.


## Integration with SQS
AWS provide two ways of integrating SNS and SQS, one sends the message "as is" and one being parsed, this is called raw message delivery.

When it is turn off (by default) message attributes (sent in SNS) will appear in the payload of SQS, if it turned on the payload will be parsed before sent to SQS and the SNS attributes will be mapped to SQS Message attribute which allow this instrumentation to have propagated context works out-of-the-box.

If raw message delivery is turned off, you can solve it by enabling `sqsExtractContextPropagationFromPayload`, it will extract the context from the payload. It does have some performance affect as the instrumentation will run `JSON.parse` to get the data.

More details about raw message deliver can be found in [AWS docs](https://docs.aws.amazon.com/sns/latest/dg/sns-large-payload-raw-message-delivery.html)

>If you see partial / broken traces when integrating SNS with SQS this might be the reason
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
diag,
} from '@opentelemetry/api';
import type { SQS, SNS } from 'aws-sdk';
import type { MessageBodyAttributeMap } from 'aws-sdk/clients/sqs';

// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-quotas.html
export const MAX_MESSAGE_ATTRIBUTES = 10;
Expand Down Expand Up @@ -73,3 +74,26 @@ export const injectPropagationContext = (
}
return attributes;
};

export const extractPropagationContext = (
message: SQS.Message,
sqsExtractContextPropagationFromPayload: boolean | undefined
): MessageBodyAttributeMap | undefined => {
const propagationFields = propagation.fields();
const hasPropagationFields = Object.keys(
message.MessageAttributes || []
).some(attr => propagationFields.includes(attr));
if (hasPropagationFields) {
return message.MessageAttributes;
} else if (sqsExtractContextPropagationFromPayload && message.Body) {
try {
const payload = JSON.parse(message.Body);
return payload.MessageAttributes;
} catch {
diag.debug(
'failed to parse SQS payload to extract context propagation, trace might be incomplete.'
);
}
}
return undefined;
};
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,11 @@ import {
MessagingDestinationKindValues,
SemanticAttributes,
} from '@opentelemetry/semantic-conventions';
import { contextGetter, injectPropagationContext } from './MessageAttributes';
import {
contextGetter,
extractPropagationContext,
injectPropagationContext,
} from './MessageAttributes';

export class SqsServiceExtension implements ServiceExtension {
requestPreSpanHook(request: NormalizedRequest): RequestMetadata {
Expand Down Expand Up @@ -128,7 +132,10 @@ export class SqsServiceExtension implements ServiceExtension {
name: queueName ?? 'unknown',
parentContext: propagation.extract(
ROOT_CONTEXT,
message.MessageAttributes,
extractPropagationContext(
message,
config.sqsExtractContextPropagationFromPayload
),
contextGetter
),
attributes: {
Expand Down
11 changes: 11 additions & 0 deletions plugins/node/opentelemetry-instrumentation-aws-sdk/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,4 +80,15 @@ export interface AwsSdkInstrumentationConfig extends InstrumentationConfig {
* effectively causing those http spans to be non-recordable.
*/
suppressInternalInstrumentation?: boolean;

/**
* In some cases the context propagation headers may be found in the message payload
* rather than the message attribute.
* When this field is turned on the instrumentation will parse the payload and extract the
* context from there.
* Even if the field is on and MessageAttribute contains context propagation field are present,
* the MessageAttribute will get priority.
* By default it is off.
*/
sqsExtractContextPropagationFromPayload?: boolean;
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,19 @@ import { ReadableSpan } from '@opentelemetry/sdk-trace-base';
import { mockV2AwsSend } from './testing-utils';
import { Message } from 'aws-sdk/clients/sqs';
import * as expect from 'expect';
import * as sinon from 'sinon';
import * as messageAttributes from '../src/services/MessageAttributes';

const responseMockSuccess = {
requestId: '0000000000000',
error: null,
};

const extractContextSpy = sinon.spy(
messageAttributes,
'extractPropagationContext'
);

describe('SQS', () => {
before(() => {
AWS.config.credentials = {
Expand Down Expand Up @@ -429,4 +436,87 @@ describe('SQS', () => {
expect(processSpans[1].status.code).toStrictEqual(SpanStatusCode.UNSET);
});
});

describe('extract payload', () => {
beforeEach(() => {
extractContextSpy.resetHistory();
});
it('should not extract from payload even if set', async () => {
mockV2AwsSend(responseMockSuccess, {
Messages: [{ Body: JSON.stringify({ traceparent: 1 }) }],
} as AWS.SQS.Types.ReceiveMessageResult);

const sqs = new AWS.SQS();
await sqs
.receiveMessage({
QueueUrl: 'queue/url/for/unittests1',
})
.promise();
expect(extractContextSpy.returnValues[0]?.traceparent).toBeUndefined();
});

it('should extract from payload', async () => {
const traceparent = {
traceparent: {
StringValue:
'00-a1d050b7c8ad93c405e7a0d94cda5b03-23a485dc98b24027-01',
DataType: 'String',
},
};
instrumentation.setConfig({
sqsExtractContextPropagationFromPayload: true,
});
mockV2AwsSend(responseMockSuccess, {
Messages: [
{ Body: JSON.stringify({ MessageAttributes: { traceparent } }) },
],
} as AWS.SQS.Types.ReceiveMessageResult);

const sqs = new AWS.SQS();
await sqs
.receiveMessage({
QueueUrl: 'queue/url/for/unittests',
})
.promise();

expect(extractContextSpy.returnValues[0]?.traceparent).toStrictEqual(
traceparent
);
});

it('should not extract from payload but from attributes', async () => {
const traceparentInPayload = 'some-trace-parent-value';
const traceparentInMessageAttributes = {
traceparent: {
StringValue:
'00-a1d050b7c8ad93c405e7a0d94cda5b03-23a485dc98b24027-01',
DataType: 'String',
},
};
instrumentation.setConfig({
sqsExtractContextPropagationFromPayload: false,
});
mockV2AwsSend(responseMockSuccess, {
Messages: [
{
MessageAttributes: traceparentInMessageAttributes,
Body: JSON.stringify({
MessageAttributes: { traceparentInPayload },
}),
},
],
} as AWS.SQS.Types.ReceiveMessageResult);

const sqs = new AWS.SQS();
await sqs
.receiveMessage({
QueueUrl: 'queue/url/for/unittests',
})
.promise();

expect(extractContextSpy.returnValues[0]).toBe(
traceparentInMessageAttributes
);
});
});
});

0 comments on commit 28e2113

Please sign in to comment.