Commit bb4262f
feat: Add topic prefix when configured (EVENT_BUS_TOPIC_PREFIX) (#42)
Closes #35

Also add missing test for consume-loop.
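
As a usage sketch (the prefix value and topic name below are illustrative; EVENT_BUS_TOPIC_PREFIX is the only real setting involved):

    # Django settings for, e.g., a staging environment sharing a cluster:
    EVENT_BUS_TOPIC_PREFIX = 'stage'

With this set, a producer or consumer working with the base topic 'user-logins' actually uses the Kafka topic 'stage-user-logins' (prefix joined with a hyphen). With the setting unset or empty, the base topic is used unchanged.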
timmc-edx authored Sep 8, 2022
1 parent 7dad050 commit bb4262f
Showing 8 changed files with 96 additions and 9 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.rst
@@ -16,6 +16,14 @@ Unreleased

*

[0.6.2] - 2022-09-08
********************

Added
=====

* Topic names can be autoprefixed by setting ``EVENT_BUS_TOPIC_PREFIX``

[0.6.1] - 2022-09-06
********************

2 changes: 1 addition & 1 deletion edx_event_bus_kafka/__init__.py
@@ -8,4 +8,4 @@

from edx_event_bus_kafka.internal.producer import EventProducerKafka, get_producer

__version__ = '0.6.1'
__version__ = '0.6.2'
18 changes: 18 additions & 0 deletions edx_event_bus_kafka/internal/config.py
@@ -81,6 +81,24 @@ def load_common_settings() -> Optional[dict]:
return base_settings


def get_full_topic(base_topic: str) -> str:
"""
Given a base topic name, add a prefix (if configured).
"""
# .. setting_name: EVENT_BUS_TOPIC_PREFIX
# .. setting_default: None
# .. setting_description: If provided, add this as a prefix to any topic names (delimited by a hyphen)
# when either producing or consuming events. This can be used to support separation of environments,
# e.g. if multiple staging or test environments are sharing a cluster. For example, if the base topic
# name is "user-logins", then if EVENT_BUS_TOPIC_PREFIX=stage, the producer and consumer would instead
# work with the topic "stage-user-logins".
topic_prefix = getattr(settings, 'EVENT_BUS_TOPIC_PREFIX', None)
if topic_prefix:
return f"{topic_prefix}-{base_topic}"
else:
return base_topic


@receiver(setting_changed)
def _reset_state(sender, **kwargs): # pylint: disable=unused-argument
"""Reset caches when settings change during unit tests."""
5 changes: 3 additions & 2 deletions edx_event_bus_kafka/internal/consumer.py
@@ -13,7 +13,7 @@
from openedx_events.learning.signals import SESSION_LOGIN_COMPLETED
from openedx_events.tooling import OpenEdxPublicSignal

from .config import get_schema_registry_client, load_common_settings
from .config import get_full_topic, get_schema_registry_client, load_common_settings

logger = logging.getLogger(__name__)

@@ -99,7 +99,8 @@ def consume_indefinitely(self):
"""

try:
self.consumer.subscribe([self.topic])
full_topic = get_full_topic(self.topic)
self.consumer.subscribe([full_topic])

# TODO (EventBus):
# 1. Is there an elegant way to exit the loop?
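
From the consuming side, callers keep passing the base topic and the prefix is applied just before subscribing. A minimal sketch, assuming EVENT_BUS_TOPIC_PREFIX = 'stage' and an illustrative group id:

    from openedx_events.learning.signals import SESSION_LOGIN_COMPLETED

    from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer

    # Constructed with the base topic; consume_indefinitely() then
    # subscribes to 'stage-user-logins' rather than 'user-logins'.
    consumer = KafkaEventConsumer('user-logins', 'some-group-id', SESSION_LOGIN_COMPLETED)
    consumer.consume_indefinitely()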
6 changes: 4 additions & 2 deletions edx_event_bus_kafka/internal/producer.py
@@ -18,7 +18,7 @@
from openedx_events.event_bus.avro.serializer import AvroSignalSerializer
from openedx_events.tooling import OpenEdxPublicSignal

from .config import get_schema_registry_client, load_common_settings
from .config import get_full_topic, get_schema_registry_client, load_common_settings

logger = logging.getLogger(__name__)

@@ -198,8 +198,10 @@ def send(
key_bytes = key_serializer(event_key, SerializationContext(topic, MessageField.KEY, headers))
value_bytes = value_serializer(event_data, SerializationContext(topic, MessageField.VALUE, headers))

full_topic = get_full_topic(topic)

self.producer.produce(
topic, key=key_bytes, value=value_bytes, headers=headers, on_delivery=on_event_deliver,
full_topic, key=key_bytes, value=value_bytes, headers=headers, on_delivery=on_event_deliver,
)

# Opportunistically ensure any pending callbacks from recent event-sends are triggered.
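
On the producing side, send() likewise still takes the base topic and resolves the full topic internally. A minimal sketch, assuming EVENT_BUS_TOPIC_PREFIX = 'stage'; the key field and payload are illustrative and must match the signal's declared data:

    from openedx_events.learning.signals import SESSION_LOGIN_COMPLETED

    from edx_event_bus_kafka import get_producer

    event_data = {'user': ...}  # illustrative; shape depends on the signal

    producer = get_producer()
    # Produces to 'stage-user-logins'; with no prefix configured, to 'user-logins'.
    producer.send(
        signal=SESSION_LOGIN_COMPLETED,
        topic='user-logins',          # base topic; prefix is added inside send()
        event_key_field='user.id',    # illustrative key field
        event_data=event_data,
    )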
17 changes: 17 additions & 0 deletions edx_event_bus_kafka/internal/tests/test_config.py
@@ -58,3 +58,20 @@ def test_full(self):
'sasl.username': 'some_other_key',
'sasl.password': 'some_other_secret',
}


class TestTopicPrefixing(TestCase):
"""
Test autoprefixing of base topic.
"""
def test_no_prefix(self):
assert config.get_full_topic('user-logins') == 'user-logins'

@override_settings(EVENT_BUS_TOPIC_PREFIX='')
def test_empty_string_prefix(self):
"""Check that empty string is treated the same as None."""
assert config.get_full_topic('user-logins') == 'user-logins'

@override_settings(EVENT_BUS_TOPIC_PREFIX='stage')
def test_regular_prefix(self):
assert config.get_full_topic('user-logins') == 'stage-user-logins'
44 changes: 42 additions & 2 deletions edx_event_bus_kafka/internal/tests/test_consumer.py
@@ -3,8 +3,9 @@
"""

import copy
from unittest.mock import Mock, patch
from unittest.mock import Mock, call, patch

import pytest
from django.core.management import call_command
from django.test import TestCase
from django.test.utils import override_settings
@@ -82,7 +83,46 @@ def setUp(self):
error=None,
)
self.mock_signal = Mock(event_type=self.signal_type, init_data={})
self.event_consumer = KafkaEventConsumer('test_topic', 'test_group_id', self.mock_signal)
self.event_consumer = KafkaEventConsumer('some-topic', 'test_group_id', self.mock_signal)

@override_settings(
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://localhost:12345',
EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='localhost:54321',
EVENT_BUS_TOPIC_PREFIX='prod',
)
def test_consume_loop(self):
"""
Check the basic loop lifecycle.
"""
poll_call_count = 0

def fake_poll(*args, **kwargs):
nonlocal poll_call_count
poll_call_count += 1
# Return normally twice (to show looping), then throw (to
# show that we're not actually handling exceptions, at
# least at the moment.) If we start suppressing
# exceptions, we'll need some other way to break the loop
# for this test.
if poll_call_count >= 3:
raise Exception("something broke")
return self.normal_message

with patch.object(self.event_consumer, 'process_single_message') as mock_process:
mock_consumer = Mock(**{'poll.side_effect': fake_poll}, autospec=True)
self.event_consumer.consumer = mock_consumer
with pytest.raises(Exception, match="something broke"):
self.event_consumer.consume_indefinitely()

# Check that each of the mocked out methods got called as expected.
mock_consumer.subscribe.assert_called_once_with(['prod-some-topic'])
assert mock_consumer.poll.call_args_list == [
call(timeout=1.0), call(timeout=1.0), call(timeout=1.0)
]
assert mock_process.call_args_list == [
call(self.normal_message), call(self.normal_message)
]
mock_consumer.close.assert_called_once_with()

def test_emit(self):
with patch.object(OpenEdxPublicSignal, 'get_signal_by_type', return_value=self.mock_signal) as mock_lookup:
5 changes: 3 additions & 2 deletions edx_event_bus_kafka/internal/tests/test_producer.py
@@ -124,18 +124,19 @@ def test_send_to_event_bus(self, mock_get_serializers):
with override_settings(
EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL='http://localhost:12345',
EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS='localhost:54321',
EVENT_BUS_TOPIC_PREFIX='prod',
):
producer_api = ep.get_producer()
with patch.object(producer_api, 'producer', autospec=True) as mock_producer:
producer_api.send(
signal=self.signal, topic='user_stuff',
signal=self.signal, topic='user-stuff',
event_key_field='user.id', event_data=self.event_data
)

mock_get_serializers.assert_called_once_with(self.signal, 'user.id')

mock_producer.produce.assert_called_once_with(
'user_stuff', key=b'key-bytes-here', value=b'value-bytes-here',
'prod-user-stuff', key=b'key-bytes-here', value=b'value-bytes-here',
on_delivery=ep.on_event_deliver,
headers={'ce_type': 'org.openedx.learning.auth.session.login.completed.v1'},
)