Skip to content

Commit

Permalink
botocore hooks
Browse files Browse the repository at this point in the history
  • Loading branch information
ItayGibel-helios committed Sep 15, 2021
1 parent db636a4 commit f0c9dcb
Show file tree
Hide file tree
Showing 3 changed files with 205 additions and 43 deletions.
4 changes: 2 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- `opentelemetry-instrumentation-elasticsearch` Added `response_hook` and `request_hook` callbacks
([#670](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/670))

### Added
- `opentelemetry-instrumentation-redis` added request_hook and response_hook callbacks passed as arguments to the instrument method.
([#669](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/669))
- `opentelemetry-instrumentation-botocore` add request_hooks and response_hooks
([679](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/679))

### Changed
- `opentelemetry-instrumentation-botocore` Unpatch botocore Endpoint.prepare_request on uninstrument
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,40 @@
API
---
The `instrument` method accepts the following keyword args:
tracer_provider (TracerProvider) - an optional tracer provider
request_hooks (dict) - a mapping between service names their respective callable request hooks
* a request hook signature is: def request_hook(span: Span, operation_name: str, api_params: dict) -> None
response_hooks (dict) - a mapping between service names their respective callable response hooks
* a response hook signature is: def response_hook(span: Span, operation_name: str, result: dict) -> None
for example:
.. code: python
from opentelemetry.instrumentation.botocore import BotocoreInstrumentor
import botocore
def ec2_request_hook(span, operation_name, api_params):
# request hook logic
def ec2_response_hook(span, operation_name, result):
# response hook logic
# Instrument Botocore with hooks
BotocoreInstrumentor().instrument(
request_hooks={"ec2": ec2_request_hook}, response_hooks={"ec2": ec2_response_hook}
)
# This will create a span with Botocore-specific attributes, including custom attributes added from the hooks
session = botocore.session.get_session()
session.set_credentials(
access_key="access-key", secret_key="secret-key"
)
ec2 = self.session.create_client("ec2", region_name="us-west-2")
ec2.describe_instances()
"""

import json
Expand Down Expand Up @@ -91,16 +125,29 @@ class BotocoreInstrumentor(BaseInstrumentor):
See `BaseInstrumentor`
"""

def __init__(self):
super().__init__()
self.request_hooks = dict()
self.response_hooks = dict()

def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

def _instrument(self, **kwargs):

# pylint: disable=attribute-defined-outside-init
self._tracer = get_tracer(
__name__, __version__, kwargs.get("tracer_provider")
)

request_hooks = kwargs.get("request_hooks")
response_hooks = kwargs.get("response_hooks")

if isinstance(request_hooks, dict):
self.request_hooks = request_hooks

if isinstance(response_hooks, dict):
self.response_hooks = response_hooks

wrap_function_wrapper(
"botocore.client",
"BaseClient._make_api_call",
Expand Down Expand Up @@ -159,21 +206,18 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
):
BotocoreInstrumentor._patch_lambda_invoke(api_params)

if span.is_recording():
span.set_attribute("aws.operation", operation_name)
span.set_attribute("aws.region", instance.meta.region_name)
span.set_attribute("aws.service", service_name)
if "QueueUrl" in api_params:
span.set_attribute("aws.queue_url", api_params["QueueUrl"])
if "TableName" in api_params:
span.set_attribute(
"aws.table_name", api_params["TableName"]
)
self._set_api_call_attributes(
span, instance, service_name, operation_name, api_params
)

token = context_api.attach(
context_api.set_value(_SUPPRESS_HTTP_INSTRUMENTATION_KEY, True)
)

self.apply_request_hook(
span, service_name, operation_name, api_params
)

try:
result = original_func(*args, **kwargs)
except ClientError as ex:
Expand All @@ -184,38 +228,73 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
if error:
result = error.response

if span.is_recording():
if "ResponseMetadata" in result:
metadata = result["ResponseMetadata"]
req_id = None
if "RequestId" in metadata:
req_id = metadata["RequestId"]
elif "HTTPHeaders" in metadata:
headers = metadata["HTTPHeaders"]
if "x-amzn-RequestId" in headers:
req_id = headers["x-amzn-RequestId"]
elif "x-amz-request-id" in headers:
req_id = headers["x-amz-request-id"]
elif "x-amz-id-2" in headers:
req_id = headers["x-amz-id-2"]

if req_id:
span.set_attribute(
"aws.request_id", req_id,
)

if "RetryAttempts" in metadata:
span.set_attribute(
"retry_attempts", metadata["RetryAttempts"],
)

if "HTTPStatusCode" in metadata:
span.set_attribute(
SpanAttributes.HTTP_STATUS_CODE,
metadata["HTTPStatusCode"],
)
self.apply_response_hook(
span, service_name, operation_name, result
)

self._set_api_call_result_attributes(span, result)

if error:
raise error

return result

@staticmethod
def _set_api_call_attributes(
span, instance, service_name, operation_name, api_params
):
if span.is_recording():
span.set_attribute("aws.operation", operation_name)
span.set_attribute("aws.region", instance.meta.region_name)
span.set_attribute("aws.service", service_name)
if "QueueUrl" in api_params:
span.set_attribute("aws.queue_url", api_params["QueueUrl"])
if "TableName" in api_params:
span.set_attribute("aws.table_name", api_params["TableName"])

@staticmethod
def _set_api_call_result_attributes(span, result):
if span.is_recording():
if "ResponseMetadata" in result:
metadata = result["ResponseMetadata"]
req_id = None
if "RequestId" in metadata:
req_id = metadata["RequestId"]
elif "HTTPHeaders" in metadata:
headers = metadata["HTTPHeaders"]
if "x-amzn-RequestId" in headers:
req_id = headers["x-amzn-RequestId"]
elif "x-amz-request-id" in headers:
req_id = headers["x-amz-request-id"]
elif "x-amz-id-2" in headers:
req_id = headers["x-amz-id-2"]

if req_id:
span.set_attribute(
"aws.request_id", req_id,
)

if "RetryAttempts" in metadata:
span.set_attribute(
"retry_attempts", metadata["RetryAttempts"],
)

if "HTTPStatusCode" in metadata:
span.set_attribute(
SpanAttributes.HTTP_STATUS_CODE,
metadata["HTTPStatusCode"],
)

def apply_request_hook(
self, span, service_name, operation_name, api_params
):
if service_name in self.request_hooks:
request_hook = self.request_hooks.get(service_name)
if callable(request_hook):
request_hook(span, operation_name, api_params)

def apply_response_hook(self, span, service_name, operation_name, result):
if service_name in self.response_hooks:
response_hook = self.response_hooks.get(service_name)
if callable(response_hook):
response_hook(span, operation_name, result)
Original file line number Diff line number Diff line change
Expand Up @@ -629,3 +629,86 @@ def test_dynamodb_client(self):
SpanAttributes.HTTP_STATUS_CODE: 200,
},
)

@mock_dynamodb2
def test_hooks(self):
request_hook_operation_attribute_name = "request_hook.operation_name"
request_hook_api_params_attribute_name = "request_hook.api_params"
response_hook_operation_attribute_name = "response_hook.operation_name"
response_hook_result_attribute_name = "response_hook.result"

def request_hook(span, operation_name, api_params):
hook_attributes = {
request_hook_operation_attribute_name: operation_name,
request_hook_api_params_attribute_name: json.dumps(api_params),
}
if span and span.is_recording():
span.set_attributes(hook_attributes)

def response_hook(span, operation_name, result):
if span and span.is_recording():
span.set_attribute(
response_hook_operation_attribute_name, operation_name,
)
span.set_attribute(
response_hook_result_attribute_name, list(result.keys()),
)

BotocoreInstrumentor().uninstrument()
BotocoreInstrumentor().instrument(
request_hooks={"dynamodb": request_hook},
response_hooks={"dynamodb": response_hook},
)

self.session = botocore.session.get_session()
self.session.set_credentials(
access_key="access-key", secret_key="secret-key"
)

ddb = self.session.create_client("dynamodb", region_name="us-west-2")

test_table_name = "test_table_name"

ddb.create_table(
AttributeDefinitions=[
{"AttributeName": "id", "AttributeType": "S"},
],
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
ProvisionedThroughput={
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5,
},
TableName=test_table_name,
)

item = {"id": {"S": "test_key"}}

ddb.put_item(TableName=test_table_name, Item=item)

spans = self.memory_exporter.get_finished_spans()
assert spans
self.assertEqual(len(spans), 2)
get_item_attributes = spans[1].attributes

expected_api_params = json.dumps(
{"TableName": test_table_name, "Item": item}
)

expected_result_keys = ("ConsumedCapacity", "ResponseMetadata")

self.assertEqual(
"PutItem",
get_item_attributes.get(request_hook_operation_attribute_name),
)
self.assertEqual(
expected_api_params,
get_item_attributes.get(request_hook_api_params_attribute_name),
)
self.assertEqual(
"PutItem",
get_item_attributes.get(response_hook_operation_attribute_name),
)
self.assertEqual(
expected_result_keys,
get_item_attributes.get(response_hook_result_attribute_name),
)

0 comments on commit f0c9dcb

Please sign in to comment.