Skip to content

Commit

Permalink
feat: Add custom metric to track whether a message processed.
Browse files Browse the repository at this point in the history
New custom metric `kafka_received_message` added to span
contexts.
  • Loading branch information
dianakhuang committed Aug 2, 2024
1 parent 823645f commit b8ee95c
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 1 deletion.
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ Change Log
Unreleased
**********

[5.8.1] - 2024-08-02
********************
Changed
=======
* Monitoring: Add a custom attribute, ``kafka_received_message`` to track whether a message was processed or not.

[5.8.0] - 2024-08-01
********************
Changed
Expand Down
2 changes: 1 addition & 1 deletion edx_event_bus_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer
from edx_event_bus_kafka.internal.producer import KafkaEventProducer, create_producer

__version__ = '5.8.0'
__version__ = '5.8.1'
8 changes: 8 additions & 0 deletions edx_event_bus_kafka/internal/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,6 +575,10 @@ def _add_message_monitoring(self, run_context, message, error=None):
set_custom_attribute('kafka_topic', run_context['full_topic'])

if kafka_message:
# .. custom_attribute_name: kafka_received_message
# .. custom_attribute_description: True if we are processing a message with this span, False otherwise.
set_custom_attribute('kafka_received_message', True)

# .. custom_attribute_name: kafka_partition
# .. custom_attribute_description: The partition of the message.
set_custom_attribute('kafka_partition', kafka_message.partition())
Expand All @@ -594,6 +598,10 @@ def _add_message_monitoring(self, run_context, message, error=None):
# .. custom_attribute_description: The event type of the message. Note that the header in the logs
# will use 'ce_type'.
set_custom_attribute('kafka_event_type', ",".join(event_types))
else:
# .. custom_attribute_name: kafka_received_message
# .. custom_attribute_description: True if we are processing a message with this span.
set_custom_attribute('kafka_received_message', False)

if kafka_error:
# .. custom_attribute_name: kafka_error_fatal
Expand Down
1 change: 1 addition & 0 deletions edx_event_bus_kafka/internal/tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -431,6 +431,7 @@ def poll_side_effect(*args, **kwargs):
expected_custom_attribute_calls += [
call("kafka_message_id", "1111-1111"),
call("kafka_event_type", "org.openedx.learning.auth.session.login.completed.v1"),
call("kafka_received_message", True),
]
if has_kafka_error:
expected_custom_attribute_calls += [
Expand Down

0 comments on commit b8ee95c

Please sign in to comment.