-
Notifications
You must be signed in to change notification settings - Fork 544
/
Copy pathMessageAttributes.ts
108 lines (102 loc) · 3.1 KB
/
MessageAttributes.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import {
TextMapGetter,
TextMapSetter,
context,
propagation,
diag,
} from '@opentelemetry/api';
import type { SQS, SNS } from 'aws-sdk';
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-quotas.html
export const MAX_MESSAGE_ATTRIBUTES = 10;
class ContextSetter
implements
TextMapSetter<SQS.MessageBodyAttributeMap | SNS.MessageAttributeMap>
{
set(
carrier: SQS.MessageBodyAttributeMap | SNS.MessageAttributeMap,
key: string,
value: string
) {
carrier[key] = {
DataType: 'String',
StringValue: value as string,
};
}
}
export const contextSetter = new ContextSetter();
export interface AwsSdkContextObject {
[key: string]: {
StringValue?: string;
Value?: string;
};
}
class ContextGetter
implements
TextMapGetter<SQS.MessageBodyAttributeMap | SNS.MessageAttributeMap>
{
keys(
carrier: SQS.MessageBodyAttributeMap | SNS.MessageAttributeMap
): string[] {
return Object.keys(carrier);
}
get(
carrier: AwsSdkContextObject,
key: string
): undefined | string | string[] {
return carrier?.[key]?.StringValue || carrier?.[key]?.Value;
}
}
export const contextGetter = new ContextGetter();
export const injectPropagationContext = (
attributesMap?: SQS.MessageBodyAttributeMap | SNS.MessageAttributeMap
): SQS.MessageBodyAttributeMap | SNS.MessageAttributeMap => {
const attributes = attributesMap ?? {};
if (
Object.keys(attributes).length + propagation.fields().length <=
MAX_MESSAGE_ATTRIBUTES
) {
propagation.inject(context.active(), attributes, contextSetter);
} else {
diag.warn(
'aws-sdk instrumentation: cannot set context propagation on SQS/SNS message due to maximum amount of MessageAttributes'
);
}
return attributes;
};
export const extractPropagationContext = (
message: SQS.Message,
sqsExtractContextPropagationFromPayload: boolean | undefined
): AwsSdkContextObject | undefined => {
const propagationFields = propagation.fields();
const hasPropagationFields = Object.keys(
message.MessageAttributes || []
).some(attr => propagationFields.includes(attr));
if (hasPropagationFields) {
return message.MessageAttributes;
} else if (sqsExtractContextPropagationFromPayload && message.Body) {
try {
const payload = JSON.parse(message.Body);
return payload.MessageAttributes;
} catch {
diag.debug(
'failed to parse SQS payload to extract context propagation, trace might be incomplete.'
);
}
}
return undefined;
};