Skip to content

Commit

Permalink
feat: Log warning in consumer when receivers fail with error (#80)
Browse files Browse the repository at this point in the history
This is part of #62

Also includes a foundational commit for improving testing of the signal/receivers.
Using a mock receiver rather than a mock signal allows more realistic tests.
  • Loading branch information
timmc-edx authored Nov 9, 2022
1 parent c967c1e commit 5dda415
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 11 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ Change Log
Unreleased
**********

[1.8.0] - 2022-11-09
********************
Added
=====
* Consumer logs a warning for receivers that fail with an exception

[1.7.0] - 2022-11-04
********************

Expand Down
2 changes: 1 addition & 1 deletion edx_event_bus_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer
from edx_event_bus_kafka.internal.producer import KafkaEventProducer, get_producer

__version__ = '1.7.0'
__version__ = '1.8.0'
27 changes: 26 additions & 1 deletion edx_event_bus_kafka/internal/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,32 @@ def emit_signals_from_message(self, msg):
f"Received message of type {event_type_str}."
)

self.signal.send_event(**msg.value())
send_results = self.signal.send_event(**msg.value())
self.report_receiver_errors(send_results)

def report_receiver_errors(self, send_results):
"""
Takes the list of (receiver, response) pairs and logs a warning if any are errors.
"""
error_list = []
for receiver, response in send_results:
if not isinstance(response, BaseException):
continue

# Probably every receiver will be a regular function or even a lambda with
# these attrs, so this check is just to be safe.
try:
receiver_name = f"{receiver.__module__}.{receiver.__qualname__}"
except AttributeError:
receiver_name = str(receiver)

error_list.append(f"{receiver_name}={response!r}")

if len(error_list) > 0:
logger.warning(
f"{len(error_list)} receiver(s) out of {len(send_results)} "
f"produced errors when handling signal {self.signal}: {', '.join(error_list)}"
)

def record_event_consuming_error(self, run_context, error, maybe_event):
"""
Expand Down
82 changes: 73 additions & 9 deletions edx_event_bus_kafka/internal/tests/test_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from django.test import TestCase
from django.test.utils import override_settings
from openedx_events.learning.data import UserData, UserPersonalData
from openedx_events.learning.signals import SESSION_LOGIN_COMPLETED

from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer, UnusableMessageError
from edx_event_bus_kafka.management.commands.consume_events import Command
Expand Down Expand Up @@ -65,6 +66,14 @@ def error(self):
return self._error


def fake_receiver_returns_quietly(**kwargs):
return


def fake_receiver_raises_error(**kwargs):
raise Exception("receiver whoops")


@override_settings(
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='https://test-url',
EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='bootstrap-servers',
Expand Down Expand Up @@ -102,8 +111,37 @@ def setUp(self):
value=self.normal_event_data,
error=None,
)
self.mock_signal = Mock(event_type=self.signal_type, init_data={})
self.event_consumer = KafkaEventConsumer('some-topic', 'test_group_id', self.mock_signal)
self.mock_receiver = Mock()
self.signal = SESSION_LOGIN_COMPLETED
self.signal.connect(fake_receiver_returns_quietly)
self.signal.connect(fake_receiver_raises_error)
self.signal.connect(self.mock_receiver)
self.event_consumer = KafkaEventConsumer('some-topic', 'test_group_id', self.signal)

def tearDown(self):
self.signal.disconnect(fake_receiver_returns_quietly)
self.signal.disconnect(fake_receiver_raises_error)
self.signal.disconnect(self.mock_receiver)

def assert_signal_sent_with(self, signal, data):
"""
Check that a signal-send came in as expected to the mock receiver.
"""
self.mock_receiver.assert_called_once()
call_kwargs = self.mock_receiver.call_args[1]

# Standard signal stuff
assert call_kwargs['signal'] == signal
assert call_kwargs['sender'] is None

# There should just be one key-value pair in the data for all OpenEdxPublicEvents
((event_top_key, event_contents),) = data.items()
assert call_kwargs[event_top_key] == event_contents

# There should also be a metadata key -- spot-check it
metadata = call_kwargs['metadata']
assert metadata.event_type == signal.event_type
assert metadata.sourcehost is not None

@override_settings(EVENT_BUS_KAFKA_CONSUMERS_ENABLED=False)
@patch('edx_event_bus_kafka.internal.consumer.logger', autospec=True)
Expand Down Expand Up @@ -162,7 +200,8 @@ def fake_emit(*args, **kwargs):
assert "Error consuming event from Kafka: Exception('something broke') in context" in exc_log_msg
assert "full_topic='prod-some-topic'" in exc_log_msg
assert "consumer_group='test_group_id'" in exc_log_msg
assert f"expected_signal={self.mock_signal!r}" in exc_log_msg
assert ("expected_signal=<OpenEdxPublicSignal: "
"org.openedx.learning.auth.session.login.completed.v1>") in exc_log_msg
assert "-- event details: " in exc_log_msg
assert "'partition': 2" in exc_log_msg
assert "'offset': 12345" in exc_log_msg
Expand Down Expand Up @@ -205,7 +244,8 @@ def poll_side_effect(*args, **kwargs):
assert "Error consuming event from Kafka: Exception('something random') in context" in exc_log_msg
assert "full_topic='prod-some-topic'" in exc_log_msg
assert "consumer_group='test_group_id'" in exc_log_msg
assert f"expected_signal={self.mock_signal!r}" in exc_log_msg
assert ("expected_signal=<OpenEdxPublicSignal: "
"org.openedx.learning.auth.session.login.completed.v1>") in exc_log_msg
assert "-- no event available" in exc_log_msg

# No-event sleep branch was triggered
Expand Down Expand Up @@ -235,9 +275,33 @@ def test_check_event_error(self):
@patch('edx_event_bus_kafka.internal.consumer.logger', autospec=True)
def test_emit(self, mock_logger):
self.event_consumer.emit_signals_from_message(self.normal_message)
self.assert_signal_sent_with(self.signal, self.normal_event_data)
mock_logger.warning.assert_called_once_with(
"1 receiver(s) out of 3 produced errors when handling signal <OpenEdxPublicSignal: "
"org.openedx.learning.auth.session.login.completed.v1>: "
"edx_event_bus_kafka.internal.tests.test_consumer.fake_receiver_raises_error="
"Exception('receiver whoops')"
)

mock_logger.error.assert_not_called()
self.mock_signal.send_event.assert_called_once_with(**self.normal_event_data)
@patch('edx_event_bus_kafka.internal.consumer.logger', autospec=True)
def test_malformed_receiver_errors(self, mock_logger):
"""
Ensure that even a really messed-up receiver is still logged safely.
"""
self.event_consumer.report_receiver_errors([
(lambda x:x, Exception("for lambda")),
# This would actually raise an error inside send_robust(), but it will serve well enough for testing...
("not even a function", Exception("just plain bad")),
])
mock_logger.warning.assert_called_once_with(
"2 receiver(s) out of 2 produced errors when handling signal <OpenEdxPublicSignal: "
"org.openedx.learning.auth.session.login.completed.v1>: "

"edx_event_bus_kafka.internal.tests.test_consumer.TestEmitSignals."
"test_malformed_receiver_errors.<locals>.<lambda>=Exception('for lambda'), "

"not even a function=Exception('just plain bad')"
)

def test_no_type(self):
msg = copy.copy(self.normal_message)
Expand All @@ -249,7 +313,7 @@ def test_no_type(self):
assert excinfo.value.args == (
"Missing ce_type header on message, cannot determine signal",
)
assert not self.mock_signal.send_event.called
assert not self.mock_receiver.called

def test_multiple_types(self):
"""
Expand All @@ -264,7 +328,7 @@ def test_multiple_types(self):
assert excinfo.value.args == (
"Multiple ce_type headers found on message, cannot determine signal",
)
assert not self.mock_signal.send_event.called
assert not self.mock_receiver.called

def test_unexpected_signal_type_in_header(self):
msg = copy.copy(self.normal_message)
Expand All @@ -278,7 +342,7 @@ def test_unexpected_signal_type_in_header(self):
"Signal types do not match. Expected org.openedx.learning.auth.session.login.completed.v1. "
"Received message of type xxxx.",
)
assert not self.mock_signal.send_event.called
assert not self.mock_receiver.called

def test_no_commit_if_no_error_logged(self):
"""
Expand Down

0 comments on commit 5dda415

Please sign in to comment.