diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 6719ad3..2738683 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -14,6 +14,12 @@ Change Log Unreleased ********** +[5.8.1] - 2024-08-02 +******************** +Changed +======= +* Monitoring: Add a custom attribute, ``kafka_received_message`` to track whether a message was processed or not. + [5.8.0] - 2024-08-01 ******************** Changed diff --git a/edx_event_bus_kafka/__init__.py b/edx_event_bus_kafka/__init__.py index 6e7d744..c80547c 100644 --- a/edx_event_bus_kafka/__init__.py +++ b/edx_event_bus_kafka/__init__.py @@ -9,4 +9,4 @@ from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer from edx_event_bus_kafka.internal.producer import KafkaEventProducer, create_producer -__version__ = '5.8.0' +__version__ = '5.8.1' diff --git a/edx_event_bus_kafka/internal/consumer.py b/edx_event_bus_kafka/internal/consumer.py index 1318220..363e0cc 100644 --- a/edx_event_bus_kafka/internal/consumer.py +++ b/edx_event_bus_kafka/internal/consumer.py @@ -575,6 +575,10 @@ def _add_message_monitoring(self, run_context, message, error=None): set_custom_attribute('kafka_topic', run_context['full_topic']) if kafka_message: + # .. custom_attribute_name: kafka_received_message + # .. custom_attribute_description: True if we are processing a message with this span, False otherwise. + set_custom_attribute('kafka_received_message', True) + # .. custom_attribute_name: kafka_partition # .. custom_attribute_description: The partition of the message. set_custom_attribute('kafka_partition', kafka_message.partition()) @@ -594,6 +598,10 @@ def _add_message_monitoring(self, run_context, message, error=None): # .. custom_attribute_description: The event type of the message. Note that the header in the logs # will use 'ce_type'. set_custom_attribute('kafka_event_type', ",".join(event_types)) + else: + # .. custom_attribute_name: kafka_received_message + # .. custom_attribute_description: True if we are processing a message with this span. + set_custom_attribute('kafka_received_message', False) if kafka_error: # .. custom_attribute_name: kafka_error_fatal diff --git a/edx_event_bus_kafka/internal/tests/test_consumer.py b/edx_event_bus_kafka/internal/tests/test_consumer.py index c62db62..1590253 100644 --- a/edx_event_bus_kafka/internal/tests/test_consumer.py +++ b/edx_event_bus_kafka/internal/tests/test_consumer.py @@ -231,6 +231,7 @@ def raise_exception(): call("kafka_partition", 2), call("kafka_offset", 12345), call("kafka_event_type", "org.openedx.learning.auth.session.login.completed.v1"), + call("kafka_received_message", True), ] * len(mock_emit_side_effects), any_order=True, ) @@ -431,7 +432,13 @@ def poll_side_effect(*args, **kwargs): expected_custom_attribute_calls += [ call("kafka_message_id", "1111-1111"), call("kafka_event_type", "org.openedx.learning.auth.session.login.completed.v1"), + call("kafka_received_message", True), ] + else: + expected_custom_attribute_calls += [ + call("kafka_received_message", False), + ] + if has_kafka_error: expected_custom_attribute_calls += [ call('kafka_error_fatal', is_fatal),