Skip to content

Commit

Permalink
feat!: Implement producer API for openedx-events (#14)
Browse files Browse the repository at this point in the history
Remove caching of `get_producer` since openedx-events will take care of
that for us, and rename to `create_producer`.

This is part of openedx/openedx-events#87
  • Loading branch information
timmc-edx authored Nov 28, 2022
1 parent 67735df commit d9948f8
Show file tree
Hide file tree
Showing 12 changed files with 55 additions and 33 deletions.
14 changes: 14 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,20 @@ Change Log
Unreleased
**********

[2.0.0] - 2022-11-28
********************
Changed
=======
* Implement openedx-events Event Bus Producer API
* **BREAKING CHANGE**: Remove caching from ``get_producer`` and rename to ``create_producer``, as we now rely on the wrapper in openedx-events to cache that call

Upgrading library from 1.x:

- Replace calls to ``edx_event_bus_kafka.get_producer`` with ``openedx_events.event_bus.get_producer``
- Add Django setting ``EVENT_BUS_PRODUCER = "edx_event_bus_kafka.create_producer"``

These breaking changes are only relevant for the producing side. (This should only include the CMS at the moment.)

[1.10.0] - 2022-11-21
*********************
Changed
Expand Down
14 changes: 10 additions & 4 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,19 @@ The documentation/ADRs may also be moved to more appropriate places as the proce

The repository works together with the openedx/openedx-events repository to make the fully functional event bus.

For manual testing, see `<docs/how_tos/manual_testing.rst>`__.

Documentation
*************

- Main API: ``edx_event_bus_kafka`` exposes ``get_producer`` and a Producer API class. See `<https://github.com/openedx/openedx-events/issues/87>`_ for how these will be documented and used in the future.
- Django management commands: ``edx_event_bus_kafka.management.commands.*`` expose ``Command`` classes
To use this implementation of the Event Bus with openedx-events, set the following Django settings::

EVENT_BUS_PRODUCER: edx_event_bus_kafka.create_producer
EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS: ...
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL: ...
EVENT_BUS_TOPIC_PREFIX: ...

For manual testing, see `<docs/how_tos/manual_testing.rst>`__.

Django management commands: ``edx_event_bus_kafka.management.commands.*`` expose ``Command`` classes

OEP-52 documentation: https://open-edx-proposals.readthedocs.io/en/latest/architectural-decisions/oep-0052-arch-event-bus-architecture.html
(TODO: `Set up documentation <https://openedx.atlassian.net/wiki/spaces/DOC/pages/21627535/Publish+Documentation+on+Read+the+Docs>`_)
Expand Down
4 changes: 2 additions & 2 deletions edx_event_bus_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
"""

from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer
from edx_event_bus_kafka.internal.producer import KafkaEventProducer, get_producer
from edx_event_bus_kafka.internal.producer import KafkaEventProducer, create_producer

__version__ = '1.10.0'
__version__ = '2.0.0'
15 changes: 5 additions & 10 deletions edx_event_bus_kafka/internal/producer.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
"""
Produce Kafka events from signals.
Main function is ``get_producer()``.
Main function is ``create_producer()``, which should be referred to from ``EVENT_BUS_PRODUCER``.
"""

import json
Expand All @@ -17,6 +17,7 @@
from django.dispatch import receiver
from django.test.signals import setting_changed
from edx_django_utils.monitoring import record_exception
from openedx_events.event_bus import EventBusProducer
from openedx_events.event_bus.avro.serializer import AvroSignalSerializer
from openedx_events.tooling import OpenEdxPublicSignal

Expand Down Expand Up @@ -211,7 +212,7 @@ def on_event_deliver(self, err, evt):
f"partition={evt.partition()}")


class KafkaEventProducer():
class KafkaEventProducer(EventBusProducer):
"""
API singleton for event production to Kafka.
Expand Down Expand Up @@ -328,14 +329,9 @@ def poll_indefinitely(api_weakref: KafkaEventProducer):
api_object = None


# Note: This caching is required, since otherwise the Producer will
# fall out of scope and be garbage-collected, destroying the
# outbound-message queue and threads. The use of this cache allows the
# producer to be long-lived.
@lru_cache # will just be one cache entry, in practice
def get_producer() -> Optional[KafkaEventProducer]:
def create_producer() -> Optional[KafkaEventProducer]:
"""
Create or retrieve Producer API singleton.
Create a Producer API instance. Caller should cache the returned object.
If confluent-kafka library or essential settings are missing, warn and return None.
"""
Expand All @@ -354,4 +350,3 @@ def get_producer() -> Optional[KafkaEventProducer]:
def _reset_caches(sender, **kwargs): # pylint: disable=unused-argument
"""Reset caches when settings change during unit tests."""
get_serializers.cache_clear()
get_producer.cache_clear()
24 changes: 15 additions & 9 deletions edx_event_bus_kafka/internal/tests/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from unittest import TestCase
from unittest.mock import ANY, Mock, call, patch

import openedx_events.event_bus
import openedx_events.learning.signals
import pytest
from django.test import override_settings
Expand Down Expand Up @@ -76,17 +77,22 @@ def test_serializers_unconfigured(self):
with pytest.raises(Exception, match="missing library or settings"):
ep.get_serializers(self.signal, 'user.id')

def test_get_producer_unconfigured(self):
def test_create_producer_unconfigured(self):
"""With missing essential settings, just warn and return None."""
with warnings.catch_warnings(record=True) as caught_warnings:
warnings.simplefilter('always')
assert ep.get_producer() is None
assert ep.create_producer() is None
assert len(caught_warnings) == 1
assert str(caught_warnings[0].message).startswith("Cannot configure event-bus-kafka: Missing setting ")

def test_get_producer_configured(self):
"""Creation succeeds when all settings are present."""
def test_create_producer_configured(self):
"""
Creation succeeds when all settings are present.
Also tests basic compliance with the implementation-loader API in openedx-events.
"""
with override_settings(
EVENT_BUS_PRODUCER='edx_event_bus_kafka.create_producer',
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://localhost:12345',
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_KEY='some_key',
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_SECRET='some_secret',
Expand All @@ -95,7 +101,7 @@ def test_get_producer_configured(self):
EVENT_BUS_KAFKA_API_KEY='some_other_key',
EVENT_BUS_KAFKA_API_SECRET='some_other_secret',
):
assert isinstance(ep.get_producer(), ep.KafkaEventProducer)
assert isinstance(openedx_events.event_bus.get_producer(), ep.KafkaEventProducer)

@patch('edx_event_bus_kafka.internal.producer.logger')
def test_on_event_deliver(self, mock_logger):
Expand Down Expand Up @@ -136,7 +142,7 @@ def test_send_to_event_bus(self, mock_get_serializers):
EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='localhost:54321',
EVENT_BUS_TOPIC_PREFIX='prod',
):
producer_api = ep.get_producer()
producer_api = ep.create_producer()
with patch.object(producer_api, 'producer', autospec=True) as mock_producer:
producer_api.send(
signal=self.signal, topic='user-stuff',
Expand Down Expand Up @@ -166,7 +172,7 @@ def test_full_event_data_present_in_key_extraction_error(self, mock_logger, *arg
EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='localhost:54321',
EVENT_BUS_TOPIC_PREFIX='dev',
):
producer_api = ep.get_producer()
producer_api = ep.create_producer()
# force an exception with a bad event_key_field
producer_api.send(signal=simple_signal, topic='topic', event_key_field='bad_field',
event_data={'test_data': SubTestData0(sub_name="name", course_id="id")})
Expand All @@ -193,7 +199,7 @@ def test_full_event_data_present_in_kafka_error(self, mock_logger, *args):
EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='localhost:54321',
EVENT_BUS_TOPIC_PREFIX='dev',
):
producer_api = ep.get_producer()
producer_api = ep.create_producer()
with patch.object(producer_api, 'producer', autospec=True) as mock_producer:
# imitate a failed send to Kafka
mock_producer.produce = Mock(side_effect=Exception('bad!'))
Expand Down Expand Up @@ -267,7 +273,7 @@ def increment_call_count(*args):
@override_settings(EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='localhost:54321')
@patch('edx_event_bus_kafka.internal.producer.SerializationContext')
def test_serialize_and_produce_to_same_topic(self, mock_context):
producer_api = ep.get_producer()
producer_api = ep.create_producer()
with patch('edx_event_bus_kafka.internal.producer.AvroSerializer',
return_value=lambda _x, _y: b'bytes-here'):
with patch.object(producer_api, 'producer', autospec=True) as mock_producer:
Expand Down
4 changes: 2 additions & 2 deletions edx_event_bus_kafka/management/commands/produce_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from django.core.management.base import BaseCommand
from django.utils.module_loading import import_string

from edx_event_bus_kafka.internal.producer import get_producer
from edx_event_bus_kafka.internal.producer import create_producer

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -52,7 +52,7 @@ def add_arguments(self, parser):

def handle(self, *args, **options):
try:
producer = get_producer()
producer = create_producer()
producer.send(
signal=import_string(options['signal'][0]),
topic=options['topic'][0],
Expand Down
3 changes: 2 additions & 1 deletion requirements/base.in
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
-c constraints.txt

Django # Web application framework
openedx-events # Events API
# openedx-events 3.1.0 defines producer interface
openedx-events>=3.1.0 # Events API
edx_django_utils
edx_toggles
2 changes: 1 addition & 1 deletion requirements/base.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ markupsafe==2.1.1
# via jinja2
newrelic==8.4.0
# via edx-django-utils
openedx-events==3.0.1
openedx-events==3.1.0
# via -r requirements/base.in
pbr==5.11.0
# via stevedore
Expand Down
2 changes: 1 addition & 1 deletion requirements/dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ newrelic==8.4.0
# via
# -r requirements/quality.txt
# edx-django-utils
openedx-events==3.0.1
openedx-events==3.1.0
# via -r requirements/quality.txt
packaging==21.3
# via
Expand Down
2 changes: 1 addition & 1 deletion requirements/doc.txt
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ newrelic==8.4.0
# via
# -r requirements/test.txt
# edx-django-utils
openedx-events==3.0.1
openedx-events==3.1.0
# via -r requirements/test.txt
packaging==21.3
# via
Expand Down
2 changes: 1 addition & 1 deletion requirements/quality.txt
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ newrelic==8.4.0
# via
# -r requirements/test.txt
# edx-django-utils
openedx-events==3.0.1
openedx-events==3.1.0
# via -r requirements/test.txt
packaging==21.3
# via
Expand Down
2 changes: 1 addition & 1 deletion requirements/test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ newrelic==8.4.0
# via
# -r requirements/base.txt
# edx-django-utils
openedx-events==3.0.1
openedx-events==3.1.0
# via -r requirements/base.txt
packaging==21.3
# via pytest
Expand Down

0 comments on commit d9948f8

Please sign in to comment.