From d9948f8910c7bc11aacd802d8283f187c6aac188 Mon Sep 17 00:00:00 2001 From: Tim McCormack Date: Mon, 28 Nov 2022 17:18:57 +0000 Subject: [PATCH] feat!: Implement producer API for openedx-events (#14) Remove caching of `get_producer` since openedx-events will take care of that for us, and rename to `create_producer`. This is part of https://github.com/openedx/openedx-events/issues/87 --- CHANGELOG.rst | 14 +++++++++++ README.rst | 14 +++++++---- edx_event_bus_kafka/__init__.py | 4 ++-- edx_event_bus_kafka/internal/producer.py | 15 ++++-------- .../internal/tests/test_producer.py | 24 ++++++++++++------- .../management/commands/produce_event.py | 4 ++-- requirements/base.in | 3 ++- requirements/base.txt | 2 +- requirements/dev.txt | 2 +- requirements/doc.txt | 2 +- requirements/quality.txt | 2 +- requirements/test.txt | 2 +- 12 files changed, 55 insertions(+), 33 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 19f5bb3..a435156 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -14,6 +14,20 @@ Change Log Unreleased ********** +[2.0.0] - 2022-11-28 +******************** +Changed +======= +* Implement openedx-events Event Bus Producer API +* **BREAKING CHANGE**: Remove caching from ``get_producer`` and rename to ``create_producer``, as we now rely on the wrapper in openedx-events to cache that call + +Upgrading library from 1.x: + +- Replace calls to ``edx_event_bus_kafka.get_producer`` with ``openedx_events.event_bus.get_producer`` +- Add Django setting ``EVENT_BUS_PRODUCER = "edx_event_bus_kafka.create_producer"`` + +These breaking changes are only relevant for the producing side. (This should only include the CMS at the moment.) + [1.10.0] - 2022-11-21 ********************* Changed diff --git a/README.rst b/README.rst index 6d2c9f2..0b607d6 100644 --- a/README.rst +++ b/README.rst @@ -24,13 +24,19 @@ The documentation/ADRs may also be moved to more appropriate places as the proce The repository works together with the openedx/openedx-events repository to make the fully functional event bus. -For manual testing, see ``__. - Documentation ************* -- Main API: ``edx_event_bus_kafka`` exposes ``get_producer`` and a Producer API class. See ``_ for how these will be documented and used in the future. -- Django management commands: ``edx_event_bus_kafka.management.commands.*`` expose ``Command`` classes +To use this implementation of the Event Bus with openedx-events, set the following Django settings:: + + EVENT_BUS_PRODUCER: edx_event_bus_kafka.create_producer + EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS: ... + EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL: ... + EVENT_BUS_TOPIC_PREFIX: ... + +For manual testing, see ``__. + +Django management commands: ``edx_event_bus_kafka.management.commands.*`` expose ``Command`` classes OEP-52 documentation: https://open-edx-proposals.readthedocs.io/en/latest/architectural-decisions/oep-0052-arch-event-bus-architecture.html (TODO: `Set up documentation `_) diff --git a/edx_event_bus_kafka/__init__.py b/edx_event_bus_kafka/__init__.py index a407f09..477eb90 100644 --- a/edx_event_bus_kafka/__init__.py +++ b/edx_event_bus_kafka/__init__.py @@ -7,6 +7,6 @@ """ from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer -from edx_event_bus_kafka.internal.producer import KafkaEventProducer, get_producer +from edx_event_bus_kafka.internal.producer import KafkaEventProducer, create_producer -__version__ = '1.10.0' +__version__ = '2.0.0' diff --git a/edx_event_bus_kafka/internal/producer.py b/edx_event_bus_kafka/internal/producer.py index f80a0da..56a9d91 100644 --- a/edx_event_bus_kafka/internal/producer.py +++ b/edx_event_bus_kafka/internal/producer.py @@ -1,7 +1,7 @@ """ Produce Kafka events from signals. -Main function is ``get_producer()``. +Main function is ``create_producer()``, which should be referred to from ``EVENT_BUS_PRODUCER``. """ import json @@ -17,6 +17,7 @@ from django.dispatch import receiver from django.test.signals import setting_changed from edx_django_utils.monitoring import record_exception +from openedx_events.event_bus import EventBusProducer from openedx_events.event_bus.avro.serializer import AvroSignalSerializer from openedx_events.tooling import OpenEdxPublicSignal @@ -211,7 +212,7 @@ def on_event_deliver(self, err, evt): f"partition={evt.partition()}") -class KafkaEventProducer(): +class KafkaEventProducer(EventBusProducer): """ API singleton for event production to Kafka. @@ -328,14 +329,9 @@ def poll_indefinitely(api_weakref: KafkaEventProducer): api_object = None -# Note: This caching is required, since otherwise the Producer will -# fall out of scope and be garbage-collected, destroying the -# outbound-message queue and threads. The use of this cache allows the -# producer to be long-lived. -@lru_cache # will just be one cache entry, in practice -def get_producer() -> Optional[KafkaEventProducer]: +def create_producer() -> Optional[KafkaEventProducer]: """ - Create or retrieve Producer API singleton. + Create a Producer API instance. Caller should cache the returned object. If confluent-kafka library or essential settings are missing, warn and return None. """ @@ -354,4 +350,3 @@ def get_producer() -> Optional[KafkaEventProducer]: def _reset_caches(sender, **kwargs): # pylint: disable=unused-argument """Reset caches when settings change during unit tests.""" get_serializers.cache_clear() - get_producer.cache_clear() diff --git a/edx_event_bus_kafka/internal/tests/test_producer.py b/edx_event_bus_kafka/internal/tests/test_producer.py index 9ef5a6c..acb2434 100644 --- a/edx_event_bus_kafka/internal/tests/test_producer.py +++ b/edx_event_bus_kafka/internal/tests/test_producer.py @@ -8,6 +8,7 @@ from unittest import TestCase from unittest.mock import ANY, Mock, call, patch +import openedx_events.event_bus import openedx_events.learning.signals import pytest from django.test import override_settings @@ -76,17 +77,22 @@ def test_serializers_unconfigured(self): with pytest.raises(Exception, match="missing library or settings"): ep.get_serializers(self.signal, 'user.id') - def test_get_producer_unconfigured(self): + def test_create_producer_unconfigured(self): """With missing essential settings, just warn and return None.""" with warnings.catch_warnings(record=True) as caught_warnings: warnings.simplefilter('always') - assert ep.get_producer() is None + assert ep.create_producer() is None assert len(caught_warnings) == 1 assert str(caught_warnings[0].message).startswith("Cannot configure event-bus-kafka: Missing setting ") - def test_get_producer_configured(self): - """Creation succeeds when all settings are present.""" + def test_create_producer_configured(self): + """ + Creation succeeds when all settings are present. + + Also tests basic compliance with the implementation-loader API in openedx-events. + """ with override_settings( + EVENT_BUS_PRODUCER='edx_event_bus_kafka.create_producer', EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://localhost:12345', EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_KEY='some_key', EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_SECRET='some_secret', @@ -95,7 +101,7 @@ def test_get_producer_configured(self): EVENT_BUS_KAFKA_API_KEY='some_other_key', EVENT_BUS_KAFKA_API_SECRET='some_other_secret', ): - assert isinstance(ep.get_producer(), ep.KafkaEventProducer) + assert isinstance(openedx_events.event_bus.get_producer(), ep.KafkaEventProducer) @patch('edx_event_bus_kafka.internal.producer.logger') def test_on_event_deliver(self, mock_logger): @@ -136,7 +142,7 @@ def test_send_to_event_bus(self, mock_get_serializers): EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='localhost:54321', EVENT_BUS_TOPIC_PREFIX='prod', ): - producer_api = ep.get_producer() + producer_api = ep.create_producer() with patch.object(producer_api, 'producer', autospec=True) as mock_producer: producer_api.send( signal=self.signal, topic='user-stuff', @@ -166,7 +172,7 @@ def test_full_event_data_present_in_key_extraction_error(self, mock_logger, *arg EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='localhost:54321', EVENT_BUS_TOPIC_PREFIX='dev', ): - producer_api = ep.get_producer() + producer_api = ep.create_producer() # force an exception with a bad event_key_field producer_api.send(signal=simple_signal, topic='topic', event_key_field='bad_field', event_data={'test_data': SubTestData0(sub_name="name", course_id="id")}) @@ -193,7 +199,7 @@ def test_full_event_data_present_in_kafka_error(self, mock_logger, *args): EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='localhost:54321', EVENT_BUS_TOPIC_PREFIX='dev', ): - producer_api = ep.get_producer() + producer_api = ep.create_producer() with patch.object(producer_api, 'producer', autospec=True) as mock_producer: # imitate a failed send to Kafka mock_producer.produce = Mock(side_effect=Exception('bad!')) @@ -267,7 +273,7 @@ def increment_call_count(*args): @override_settings(EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='localhost:54321') @patch('edx_event_bus_kafka.internal.producer.SerializationContext') def test_serialize_and_produce_to_same_topic(self, mock_context): - producer_api = ep.get_producer() + producer_api = ep.create_producer() with patch('edx_event_bus_kafka.internal.producer.AvroSerializer', return_value=lambda _x, _y: b'bytes-here'): with patch.object(producer_api, 'producer', autospec=True) as mock_producer: diff --git a/edx_event_bus_kafka/management/commands/produce_event.py b/edx_event_bus_kafka/management/commands/produce_event.py index da9c5ff..e4088aa 100644 --- a/edx_event_bus_kafka/management/commands/produce_event.py +++ b/edx_event_bus_kafka/management/commands/produce_event.py @@ -10,7 +10,7 @@ from django.core.management.base import BaseCommand from django.utils.module_loading import import_string -from edx_event_bus_kafka.internal.producer import get_producer +from edx_event_bus_kafka.internal.producer import create_producer logger = logging.getLogger(__name__) @@ -52,7 +52,7 @@ def add_arguments(self, parser): def handle(self, *args, **options): try: - producer = get_producer() + producer = create_producer() producer.send( signal=import_string(options['signal'][0]), topic=options['topic'][0], diff --git a/requirements/base.in b/requirements/base.in index 7211f20..5c3b519 100644 --- a/requirements/base.in +++ b/requirements/base.in @@ -2,6 +2,7 @@ -c constraints.txt Django # Web application framework -openedx-events # Events API +# openedx-events 3.1.0 defines producer interface +openedx-events>=3.1.0 # Events API edx_django_utils edx_toggles diff --git a/requirements/base.txt b/requirements/base.txt index 436e3c9..61ada50 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -48,7 +48,7 @@ markupsafe==2.1.1 # via jinja2 newrelic==8.4.0 # via edx-django-utils -openedx-events==3.0.1 +openedx-events==3.1.0 # via -r requirements/base.in pbr==5.11.0 # via stevedore diff --git a/requirements/dev.txt b/requirements/dev.txt index 149b1bf..19cd809 100644 --- a/requirements/dev.txt +++ b/requirements/dev.txt @@ -200,7 +200,7 @@ newrelic==8.4.0 # via # -r requirements/quality.txt # edx-django-utils -openedx-events==3.0.1 +openedx-events==3.1.0 # via -r requirements/quality.txt packaging==21.3 # via diff --git a/requirements/doc.txt b/requirements/doc.txt index 3733043..ec6a2d6 100644 --- a/requirements/doc.txt +++ b/requirements/doc.txt @@ -142,7 +142,7 @@ newrelic==8.4.0 # via # -r requirements/test.txt # edx-django-utils -openedx-events==3.0.1 +openedx-events==3.1.0 # via -r requirements/test.txt packaging==21.3 # via diff --git a/requirements/quality.txt b/requirements/quality.txt index 90f8c09..d309785 100644 --- a/requirements/quality.txt +++ b/requirements/quality.txt @@ -145,7 +145,7 @@ newrelic==8.4.0 # via # -r requirements/test.txt # edx-django-utils -openedx-events==3.0.1 +openedx-events==3.1.0 # via -r requirements/test.txt packaging==21.3 # via diff --git a/requirements/test.txt b/requirements/test.txt index a09ea4c..90fa390 100644 --- a/requirements/test.txt +++ b/requirements/test.txt @@ -87,7 +87,7 @@ newrelic==8.4.0 # via # -r requirements/base.txt # edx-django-utils -openedx-events==3.0.1 +openedx-events==3.1.0 # via -r requirements/base.txt packaging==21.3 # via pytest