Skip to content

Commit

Permalink
fix: Include message_id correctly in producer audit log
Browse files Browse the repository at this point in the history
`evt.headers()` turns out to be unpopulated, so use the bound context data
instead. Addresses #128

Also:

- Use FakeMessage rather than mock
- Fix previous release's date in changelog
  • Loading branch information
timmc-edx committed Feb 10, 2023
1 parent e0450b7 commit 7871f28
Show file tree
Hide file tree
Showing 4 changed files with 45 additions and 24 deletions.
8 changes: 7 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@ Change Log
Unreleased
**********

[3.9.2] - 2023-02-07
[3.9.3] - 2023-02-10
********************
Fixed
=====
* Include ``message_id`` in audit log when message is produced (was ``None``)

[3.9.2] - 2023-02-08
********************
Fixed
=====
Expand Down
2 changes: 1 addition & 1 deletion edx_event_bus_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer
from edx_event_bus_kafka.internal.producer import KafkaEventProducer, create_producer

__version__ = '3.9.2'
__version__ = '3.9.3'
6 changes: 4 additions & 2 deletions edx_event_bus_kafka/internal/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from openedx_events.tooling import OpenEdxPublicSignal

from .config import get_full_topic, get_schema_registry_client, load_common_settings
from .utils import AUDIT_LOGGING_ENABLED, HEADER_ID, _get_headers_from_metadata, last_message_header_value
from .utils import AUDIT_LOGGING_ENABLED, _get_headers_from_metadata

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -211,7 +211,9 @@ def on_event_deliver(self, err, evt):
if not AUDIT_LOGGING_ENABLED.is_enabled():
return

message_id = last_message_header_value(evt.headers() or [], HEADER_ID)
# `evt.headers()` is None in this callback, so we need to use the bound data.
# https://github.com/confluentinc/confluent-kafka-python/issues/574
message_id = self.event_metadata.id
# See ADR for details on why certain fields were included or omitted.
logger.info(
f"Message delivered to Kafka event bus: topic={evt.topic()}, partition={evt.partition()}, "
Expand Down
53 changes: 33 additions & 20 deletions edx_event_bus_kafka/internal/tests/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from openedx_events.learning.data import UserData, UserPersonalData

import edx_event_bus_kafka.internal.producer as ep
from edx_event_bus_kafka.internal.tests.test_utils import FakeMessage

# See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0005-optional-import-of-confluent-kafka.rst
try:
Expand Down Expand Up @@ -108,23 +109,28 @@ def test_create_producer_configured(self):
assert isinstance(openedx_events.event_bus.get_producer(), ep.KafkaEventProducer)

@patch('edx_event_bus_kafka.internal.producer.logger')
@ddt.data(
(True, [('ce_id', b'id7890')], 'id7890'),
(True, None, 'None'),
(False, 'N/A', 'N/A'),
)
@ddt.unpack
def test_on_event_deliver(self, audit_logging, headers, exp_msg_id, mock_logger):
# TODO: Move FakeMessage from test_consumer to common module and use it here
fake_event = Mock()
fake_event.topic.return_value = 'some_topic'
fake_event.partition.return_value = 'some_partition'
fake_event.offset.return_value = 12345
fake_event.headers.return_value = headers
fake_event.key.return_value = 'some_key'

# simple producing context, we check the full object in other tests
context = ep.ProducingContext(full_topic='some_topic')
@ddt.data(True, False)
def test_on_event_deliver(self, audit_logging, mock_logger):
metadata = EventsMetadata(
event_type=self.signal.event_type,
time=datetime.datetime.now(datetime.timezone.utc),
sourcelib=(1, 2, 3),
)

context = ep.ProducingContext(
full_topic='some_topic',
event_key='foobob',
signal=self.signal,
initial_topic='bare_topic',
event_key_field='a.key.field',
event_data=self.event_data,
event_metadata=metadata,
)

fake_event = FakeMessage(
topic=context.full_topic, partition='some_partition', offset=12345,
key=b'\x00\x00\x00\x00\x01\x0cfoobob',
)

# ensure on_event_deliver reports the entire calling context if there was an error
context.on_event_deliver(Exception("problem!"), fake_event)
Expand All @@ -135,13 +141,20 @@ def test_on_event_deliver(self, audit_logging, headers, exp_msg_id, mock_logger)
assert "full_topic='some_topic'" in error_string
assert "error=problem!" in error_string

with override_settings(EVENT_BUS_KAFKA_AUDIT_LOGGING_ENABLED=audit_logging):
context.on_event_deliver(None, fake_event)
# Now check behavior with a delivered event
with patch.object(FakeMessage, 'headers') as mock_headers:
with override_settings(EVENT_BUS_KAFKA_AUDIT_LOGGING_ENABLED=audit_logging):
context.on_event_deliver(None, fake_event)

# We know that headers() is not implemented yet for the delivery
# callback, so fail early in unit tests if someone tries to use it.
# https://github.com/confluentinc/confluent-kafka-python/issues/574
mock_headers.assert_not_called()

if audit_logging:
mock_logger.info.assert_called_once_with(
'Message delivered to Kafka event bus: topic=some_topic, partition=some_partition, '
f'offset=12345, message_id={exp_msg_id}, key=some_key'
f'offset=12345, message_id={metadata.id}, key=b\'\\x00\\x00\\x00\\x00\\x01\\x0cfoobob\''
)
else:
mock_logger.info.assert_not_called()
Expand Down

0 comments on commit 7871f28

Please sign in to comment.