Skip to content

Commit

Permalink
botocore hooks (#679)
Browse files Browse the repository at this point in the history
* botocore hooks

* a single hook for all aws services

* fix test

* rename get_item_attributes variable to put_item_attributes

* rename tests

* delete redundant line
  • Loading branch information
ItayGibel-helios authored Sep 30, 2021
1 parent bba4b9e commit 7bc8f6c
Show file tree
Hide file tree
Showing 3 changed files with 228 additions and 41 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
([#670](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/670))
- `opentelemetry-instrumentation-redis` added request_hook and response_hook callbacks passed as arguments to the instrument method.
([#669](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/669))
- `opentelemetry-instrumentation-botocore` add `request_hook` and `response_hook` callbacks
([679](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/679))
- `opentelemetry-exporter-richconsole` Initial release
([#686](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/686))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,38 @@
API
---
The `instrument` method accepts the following keyword args:
tracer_provider (TracerProvider) - an optional tracer provider
request_hook (Callable) - a function with extra user-defined logic to be performed before performing the request
this function signature is: def request_hook(span: Span, service_name: str, operation_name: str, api_params: dict) -> None
response_hook (Callable) - a function with extra user-defined logic to be performed after performing the request
this function signature is: def request_hook(span: Span, service_name: str, operation_name: str, result: dict) -> None
for example:
.. code: python
from opentelemetry.instrumentation.botocore import BotocoreInstrumentor
import botocore
def request_hook(span, service_name, operation_name, api_params):
# request hook logic
def response_hook(span, service_name, operation_name, result):
# response hook logic
# Instrument Botocore with hooks
BotocoreInstrumentor().instrument(request_hook=request_hook, response_hooks=response_hook)
# This will create a span with Botocore-specific attributes, including custom attributes added from the hooks
session = botocore.session.get_session()
session.set_credentials(
access_key="access-key", secret_key="secret-key"
)
ec2 = self.session.create_client("ec2", region_name="us-west-2")
ec2.describe_instances()
"""

import json
Expand Down Expand Up @@ -91,16 +123,23 @@ class BotocoreInstrumentor(BaseInstrumentor):
See `BaseInstrumentor`
"""

def __init__(self):
super().__init__()
self.request_hook = None
self.response_hook = None

def instrumentation_dependencies(self) -> Collection[str]:
return _instruments

def _instrument(self, **kwargs):

# pylint: disable=attribute-defined-outside-init
self._tracer = get_tracer(
__name__, __version__, kwargs.get("tracer_provider")
)

self.request_hook = kwargs.get("request_hook")
self.response_hook = kwargs.get("response_hook")

wrap_function_wrapper(
"botocore.client",
"BaseClient._make_api_call",
Expand Down Expand Up @@ -159,21 +198,19 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
):
BotocoreInstrumentor._patch_lambda_invoke(api_params)

if span.is_recording():
span.set_attribute("aws.operation", operation_name)
span.set_attribute("aws.region", instance.meta.region_name)
span.set_attribute("aws.service", service_name)
if "QueueUrl" in api_params:
span.set_attribute("aws.queue_url", api_params["QueueUrl"])
if "TableName" in api_params:
span.set_attribute(
"aws.table_name", api_params["TableName"]
)
self._set_api_call_attributes(
span, instance, service_name, operation_name, api_params
)

token = context_api.attach(
context_api.set_value(_SUPPRESS_HTTP_INSTRUMENTATION_KEY, True)
)

if callable(self.request_hook):
self.request_hook(
span, service_name, operation_name, api_params
)

try:
result = original_func(*args, **kwargs)
except ClientError as ex:
Expand All @@ -184,38 +221,58 @@ def _patched_api_call(self, original_func, instance, args, kwargs):
if error:
result = error.response

if span.is_recording():
if "ResponseMetadata" in result:
metadata = result["ResponseMetadata"]
req_id = None
if "RequestId" in metadata:
req_id = metadata["RequestId"]
elif "HTTPHeaders" in metadata:
headers = metadata["HTTPHeaders"]
if "x-amzn-RequestId" in headers:
req_id = headers["x-amzn-RequestId"]
elif "x-amz-request-id" in headers:
req_id = headers["x-amz-request-id"]
elif "x-amz-id-2" in headers:
req_id = headers["x-amz-id-2"]

if req_id:
span.set_attribute(
"aws.request_id", req_id,
)

if "RetryAttempts" in metadata:
span.set_attribute(
"retry_attempts", metadata["RetryAttempts"],
)

if "HTTPStatusCode" in metadata:
span.set_attribute(
SpanAttributes.HTTP_STATUS_CODE,
metadata["HTTPStatusCode"],
)
if callable(self.response_hook):
self.response_hook(span, service_name, operation_name, result)

self._set_api_call_result_attributes(span, result)

if error:
raise error

return result

@staticmethod
def _set_api_call_attributes(
span, instance, service_name, operation_name, api_params
):
if span.is_recording():
span.set_attribute("aws.operation", operation_name)
span.set_attribute("aws.region", instance.meta.region_name)
span.set_attribute("aws.service", service_name)
if "QueueUrl" in api_params:
span.set_attribute("aws.queue_url", api_params["QueueUrl"])
if "TableName" in api_params:
span.set_attribute("aws.table_name", api_params["TableName"])

@staticmethod
def _set_api_call_result_attributes(span, result):
if span.is_recording():
if "ResponseMetadata" in result:
metadata = result["ResponseMetadata"]
req_id = None
if "RequestId" in metadata:
req_id = metadata["RequestId"]
elif "HTTPHeaders" in metadata:
headers = metadata["HTTPHeaders"]
if "x-amzn-RequestId" in headers:
req_id = headers["x-amzn-RequestId"]
elif "x-amz-request-id" in headers:
req_id = headers["x-amz-request-id"]
elif "x-amz-id-2" in headers:
req_id = headers["x-amz-id-2"]

if req_id:
span.set_attribute(
"aws.request_id", req_id,
)

if "RetryAttempts" in metadata:
span.set_attribute(
"retry_attempts", metadata["RetryAttempts"],
)

if "HTTPStatusCode" in metadata:
span.set_attribute(
SpanAttributes.HTTP_STATUS_CODE,
metadata["HTTPStatusCode"],
)
Original file line number Diff line number Diff line change
Expand Up @@ -629,3 +629,131 @@ def test_dynamodb_client(self):
SpanAttributes.HTTP_STATUS_CODE: 200,
},
)

@mock_dynamodb2
def test_request_hook(self):
request_hook_service_attribute_name = "request_hook.service_name"
request_hook_operation_attribute_name = "request_hook.operation_name"
request_hook_api_params_attribute_name = "request_hook.api_params"

def request_hook(span, service_name, operation_name, api_params):
hook_attributes = {
request_hook_service_attribute_name: service_name,
request_hook_operation_attribute_name: operation_name,
request_hook_api_params_attribute_name: json.dumps(api_params),
}
if span and span.is_recording():
span.set_attributes(hook_attributes)

BotocoreInstrumentor().uninstrument()
BotocoreInstrumentor().instrument(request_hook=request_hook,)

self.session = botocore.session.get_session()
self.session.set_credentials(
access_key="access-key", secret_key="secret-key"
)

ddb = self.session.create_client("dynamodb", region_name="us-west-2")

test_table_name = "test_table_name"

ddb.create_table(
AttributeDefinitions=[
{"AttributeName": "id", "AttributeType": "S"},
],
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
ProvisionedThroughput={
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5,
},
TableName=test_table_name,
)

item = {"id": {"S": "test_key"}}

ddb.put_item(TableName=test_table_name, Item=item)

spans = self.memory_exporter.get_finished_spans()
assert spans
self.assertEqual(len(spans), 2)
put_item_attributes = spans[1].attributes

expected_api_params = json.dumps(
{"TableName": test_table_name, "Item": item}
)

self.assertEqual(
"dynamodb",
put_item_attributes.get(request_hook_service_attribute_name),
)
self.assertEqual(
"PutItem",
put_item_attributes.get(request_hook_operation_attribute_name),
)
self.assertEqual(
expected_api_params,
put_item_attributes.get(request_hook_api_params_attribute_name),
)

@mock_dynamodb2
def test_response_hook(self):
response_hook_service_attribute_name = "request_hook.service_name"
response_hook_operation_attribute_name = "response_hook.operation_name"
response_hook_result_attribute_name = "response_hook.result"

def response_hook(span, service_name, operation_name, result):
hook_attributes = {
response_hook_service_attribute_name: service_name,
response_hook_operation_attribute_name: operation_name,
response_hook_result_attribute_name: list(result.keys()),
}
if span and span.is_recording():
span.set_attributes(hook_attributes)

BotocoreInstrumentor().uninstrument()
BotocoreInstrumentor().instrument(response_hook=response_hook,)

self.session = botocore.session.get_session()
self.session.set_credentials(
access_key="access-key", secret_key="secret-key"
)

ddb = self.session.create_client("dynamodb", region_name="us-west-2")

test_table_name = "test_table_name"

ddb.create_table(
AttributeDefinitions=[
{"AttributeName": "id", "AttributeType": "S"},
],
KeySchema=[{"AttributeName": "id", "KeyType": "HASH"}],
ProvisionedThroughput={
"ReadCapacityUnits": 5,
"WriteCapacityUnits": 5,
},
TableName=test_table_name,
)

item = {"id": {"S": "test_key"}}

ddb.put_item(TableName=test_table_name, Item=item)

spans = self.memory_exporter.get_finished_spans()
assert spans
self.assertEqual(len(spans), 2)
put_item_attributes = spans[1].attributes

expected_result_keys = ("ResponseMetadata",)

self.assertEqual(
"dynamodb",
put_item_attributes.get(response_hook_service_attribute_name),
)
self.assertEqual(
"PutItem",
put_item_attributes.get(response_hook_operation_attribute_name),
)
self.assertEqual(
expected_result_keys,
put_item_attributes.get(response_hook_result_attribute_name),
)

0 comments on commit 7bc8f6c

Please sign in to comment.