Skip to content

Commit

Permalink
fixup! Improve coverage and error messages
Browse files Browse the repository at this point in the history
- Provide better error message when get_serializers cannot get a client
- Add basic test coverage for serializer construction

I can't figure out how to test the serializers, though -- they want to
talk to a server.

Also:

- Lift test data to be instance variables
  • Loading branch information
timmc-edx committed Aug 26, 2022
1 parent bbf0585 commit efad224
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 24 deletions.
3 changes: 3 additions & 0 deletions edx_event_bus_kafka/publishing/event_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,9 @@ def get_serializers(signal: OpenEdxPublicSignal, event_key_field: str):
2-tuple of AvroSignalSerializers, for event key and value
"""
client = get_schema_registry_client()
if client is None:
raise Exception('Cannot create Kafka serializers -- missing library or settings')

signal_serializer = AvroSignalSerializer(signal)

def inner_to_dict(event_data, ctx=None): # pylint: disable=unused-argument
Expand Down
55 changes: 31 additions & 24 deletions edx_event_bus_kafka/publishing/test_event_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,20 @@

import edx_event_bus_kafka.publishing.event_producer as ep

# See https://github.com/openedx/event-bus-kafka/blob/main/docs/decisions/0005-optional-import-of-confluent-kafka.rst
try:
from confluent_kafka.schema_registry.avro import AvroSerializer
except ImportError: # pragma: no cover
pass


class TestEventProducer(TestCase):
"""Test producer."""

def test_extract_event_key(self):
event_data = {
def setUp(self):
super().setUp()
self.signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED
self.event_data = {
'user': UserData(
id=123,
is_active=True,
Expand All @@ -31,17 +39,17 @@ def test_extract_event_key(self):
)
}

assert ep.extract_event_key(event_data, 'user.pii.username') == 'foobob'
def test_extract_event_key(self):
assert ep.extract_event_key(self.event_data, 'user.pii.username') == 'foobob'
with pytest.raises(Exception,
match="Could not extract key from event; lookup in xxx failed at 'xxx' in dictionary"):
ep.extract_event_key(event_data, 'xxx')
ep.extract_event_key(self.event_data, 'xxx')
with pytest.raises(Exception,
match="Could not extract key from event; lookup in user.xxx failed at 'xxx' in object"):
ep.extract_event_key(event_data, 'user.xxx')
ep.extract_event_key(self.event_data, 'user.xxx')

def test_descend_avro_schema(self):
signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED
schema = AvroSignalSerializer(signal).schema
schema = AvroSignalSerializer(self.signal).schema

assert ep.descend_avro_schema(schema, ['user', 'pii', 'username']) == {"name": "username", "type": "string"}

Expand All @@ -51,10 +59,19 @@ def test_descend_avro_schema(self):
assert isinstance(excinfo.value.__cause__, IndexError)

def test_extract_key_schema(self):
signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED
schema = ep.extract_key_schema(AvroSignalSerializer(signal), 'user.pii.username')
schema = ep.extract_key_schema(AvroSignalSerializer(self.signal), 'user.pii.username')
assert schema == '{"name": "username", "type": "string"}'

def test_serializers_configured(self):
with override_settings(EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://localhost:12345'):
key_ser, value_ser = ep.get_serializers(self.signal, 'user.id')
assert isinstance(key_ser, AvroSerializer)
assert isinstance(value_ser, AvroSerializer)

def test_serializers_unconfigured(self):
with pytest.raises(Exception, match="missing library or settings"):
ep.get_serializers(self.signal, 'user.id')

def test_get_producer_unconfigured(self):
"""With missing essential settings, just warn and return None."""
with warnings.catch_warnings(record=True) as caught_warnings:
Expand Down Expand Up @@ -100,28 +117,18 @@ def test_on_event_deliver(self, mock_logger):
)
)
def test_send_to_event_bus(self, mock_get_serializers):
signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED
event_data = {
'user': UserData(
id=123,
is_active=True,
pii=UserPersonalData(
username='foobob',
email='bob@foo.example',
name="Bob Foo",
)
)
}

with override_settings(
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://localhost:12345',
EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='http://localhost:54321',
):
producer_api = ep.get_producer()
with patch.object(producer_api, 'producer', autospec=True) as mock_producer:
producer_api.send(signal=signal, topic='user_stuff', event_key_field='user.id', event_data=event_data)
producer_api.send(
signal=self.signal, topic='user_stuff',
event_key_field='user.id', event_data=self.event_data
)

mock_get_serializers.assert_called_once_with(signal, 'user.id')
mock_get_serializers.assert_called_once_with(self.signal, 'user.id')

mock_producer.produce.assert_called_once_with(
'user_stuff', key=b'key-bytes-here', value=b'value-bytes-here',
Expand Down

0 comments on commit efad224

Please sign in to comment.