Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Cache producers so that they don't lose data #21

Merged
merged 1 commit into from
Aug 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,16 @@ Unreleased

*

[0.2.0] - 2022-08-09
~~~~~~~~~~~~~~~~~~~~

Fixed
_____

* Cache producers so that they don't lose data.

[0.1.0] - 2022-06-16
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~

Added
_____
Expand Down
2 changes: 1 addition & 1 deletion edx_event_bus_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
Kafka implementation for Open edX event bus.
"""

__version__ = '0.1.1'
__version__ = '0.2.0'
5 changes: 5 additions & 0 deletions edx_event_bus_kafka/publishing/event_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ def get_serializer(signal: OpenEdxPublicSignal) -> AvroSignalSerializer:
return AvroSignalSerializer(signal)


# Note: This caching is required, since otherwise the Producer will
# fall out of scope and be garbage-collected, destroying the
# outbound-message queue and threads. The use of this cache allows the
# producers to be long-lived.
@lru_cache
def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str) -> Optional[SerializingProducer]:
"""
Create the producer for a signal and a key field path.
Expand Down
14 changes: 10 additions & 4 deletions edx_event_bus_kafka/publishing/test_event_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@
class TestEventProducer(TestCase):
"""Test producer."""

def setUp(self):
super().setUp()
ep.get_producer_for_signal.cache_clear()
ep.get_serializer.cache_clear()

def test_extract_event_key(self):
event_data = {
'user': UserData(
Expand Down Expand Up @@ -56,17 +61,18 @@ def test_extract_key_schema(self):
schema = ep.extract_key_schema(AvroSignalSerializer(signal), 'user.pii.username')
assert schema == '{"name": "username", "type": "string"}'

def test_get_producer_for_signal(self):
def test_get_producer_for_signal_unconfigured(self):
"""With missing essential settings, just warn and return None."""
signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED

# With missing essential settings, just warn and return None
with warnings.catch_warnings(record=True) as caught_warnings:
warnings.simplefilter('always')
assert ep.get_producer_for_signal(signal, 'user.id') is None
assert len(caught_warnings) == 1
assert str(caught_warnings[0].message).startswith("Cannot configure event-bus-kafka: Missing setting ")

# Creation succeeds when all settings are present
def test_get_producer_for_signal_configured(self):
"""Creation succeeds when all settings are present."""
signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED
with override_settings(
SCHEMA_REGISTRY_URL='http://localhost:12345',
SCHEMA_REGISTRY_API_KEY='some_key',
Expand Down