diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 31434e0..a883efe 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -16,8 +16,16 @@ Unreleased * +[0.2.0] - 2022-08-09 +~~~~~~~~~~~~~~~~~~~~ + +Fixed +_____ + +* Cache producers so that they don't lose data. + [0.1.0] - 2022-06-16 -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ +~~~~~~~~~~~~~~~~~~~~ Added _____ diff --git a/edx_event_bus_kafka/__init__.py b/edx_event_bus_kafka/__init__.py index 6ea8237..63623c6 100644 --- a/edx_event_bus_kafka/__init__.py +++ b/edx_event_bus_kafka/__init__.py @@ -2,4 +2,4 @@ Kafka implementation for Open edX event bus. """ -__version__ = '0.1.1' +__version__ = '0.2.0' diff --git a/edx_event_bus_kafka/publishing/event_producer.py b/edx_event_bus_kafka/publishing/event_producer.py index 52c4c6d..aa891e3 100644 --- a/edx_event_bus_kafka/publishing/event_producer.py +++ b/edx_event_bus_kafka/publishing/event_producer.py @@ -115,6 +115,11 @@ def get_serializer(signal: OpenEdxPublicSignal) -> AvroSignalSerializer: return AvroSignalSerializer(signal) +# Note: This caching is required, since otherwise the Producer will +# fall out of scope and be garbage-collected, destroying the +# outbound-message queue and threads. The use of this cache allows the +# producers to be long-lived. +@lru_cache def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str) -> Optional[SerializingProducer]: """ Create the producer for a signal and a key field path. diff --git a/edx_event_bus_kafka/publishing/test_event_producer.py b/edx_event_bus_kafka/publishing/test_event_producer.py index 1f5e4b1..2125980 100644 --- a/edx_event_bus_kafka/publishing/test_event_producer.py +++ b/edx_event_bus_kafka/publishing/test_event_producer.py @@ -19,6 +19,11 @@ class TestEventProducer(TestCase): """Test producer.""" + def setUp(self): + super().setUp() + ep.get_producer_for_signal.cache_clear() + ep.get_serializer.cache_clear() + def test_extract_event_key(self): event_data = { 'user': UserData( @@ -56,17 +61,18 @@ def test_extract_key_schema(self): schema = ep.extract_key_schema(AvroSignalSerializer(signal), 'user.pii.username') assert schema == '{"name": "username", "type": "string"}' - def test_get_producer_for_signal(self): + def test_get_producer_for_signal_unconfigured(self): + """With missing essential settings, just warn and return None.""" signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED - - # With missing essential settings, just warn and return None with warnings.catch_warnings(record=True) as caught_warnings: warnings.simplefilter('always') assert ep.get_producer_for_signal(signal, 'user.id') is None assert len(caught_warnings) == 1 assert str(caught_warnings[0].message).startswith("Cannot configure event-bus-kafka: Missing setting ") - # Creation succeeds when all settings are present + def test_get_producer_for_signal_configured(self): + """Creation succeeds when all settings are present.""" + signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED with override_settings( SCHEMA_REGISTRY_URL='http://localhost:12345', SCHEMA_REGISTRY_API_KEY='some_key',