From fe987dab6beb5330e0fb1b4bdd8b013454fa63c3 Mon Sep 17 00:00:00 2001 From: Tim McCormack Date: Tue, 23 Aug 2022 13:42:32 +0000 Subject: [PATCH] fix: Never evict producers from cache This should close out https://github.com/openedx/event-bus-kafka/issues/16 by just embracing the use of multiple producers. Also: - Improve cache-clearing during tests by connecting to the setting_changed signal -- this allows knowledge of what's cached to be local to the module where the caching happens, rather than relying on each test module to know about all of the modules it depends on indirectly. - Remove outdated comment about caching, and clarify other caching comments. --- CHANGELOG.rst | 8 +++++ edx_event_bus_kafka/__init__.py | 2 +- .../publishing/event_producer.py | 31 ++++++++++++++----- .../publishing/test_event_producer.py | 5 --- 4 files changed, 33 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.rst b/CHANGELOG.rst index 0c6e884..b12d8d9 100644 --- a/CHANGELOG.rst +++ b/CHANGELOG.rst @@ -16,6 +16,14 @@ Unreleased * +[0.4.3] - 2022-08-24 +******************** + +Fixed +===== + +* Never evict producers from cache. There wasn't a real risk of this, but now we can rely on them being long-lived. Addresses remainder of ``__. + [0.4.2] - 2022-08-24 ******************** diff --git a/edx_event_bus_kafka/__init__.py b/edx_event_bus_kafka/__init__.py index 5cd477b..ff0a68e 100644 --- a/edx_event_bus_kafka/__init__.py +++ b/edx_event_bus_kafka/__init__.py @@ -2,4 +2,4 @@ Kafka implementation for Open edX event bus. """ -__version__ = '0.4.2' +__version__ = '0.4.3' diff --git a/edx_event_bus_kafka/publishing/event_producer.py b/edx_event_bus_kafka/publishing/event_producer.py index 0bd8c71..03785f3 100644 --- a/edx_event_bus_kafka/publishing/event_producer.py +++ b/edx_event_bus_kafka/publishing/event_producer.py @@ -9,6 +9,8 @@ from functools import lru_cache from typing import Any, List +from django.dispatch import receiver +from django.test.signals import setting_changed from openedx_events.event_bus.avro.serializer import AvroSignalSerializer from openedx_events.tooling import OpenEdxPublicSignal @@ -115,7 +117,7 @@ def get_serializer(signal: OpenEdxPublicSignal) -> AvroSignalSerializer: """ Get the serializer for a signal. - This is just defined to allow caching of serializers. + This is cached in order to save work re-transforming classes into Avro schemas. """ return AvroSignalSerializer(signal) @@ -124,10 +126,23 @@ def get_serializer(signal: OpenEdxPublicSignal) -> AvroSignalSerializer: # fall out of scope and be garbage-collected, destroying the # outbound-message queue and threads. The use of this cache allows the # producers to be long-lived. +# +# We are also likely to need to iterate through this cache at server +# shutdown in order to flush each of the producers, which means the +# cache needs to never evict. See https://github.com/openedx/event-bus-kafka/issues/11 +# for more details. +# +# (Why not change the code to use a single Producer rather than multiple +# SerializerProducer? Because the code actually turns out to be significantly +# uglier that way due to the number of separate values that need to be passed +# around in bundles. There aren't clear "cut-off" points. Additionally, it +# makes unit testing harder/uglier since now the mocks need to either deal with +# serialized bytes or mock out the serializers. Getting this down to a single +# Producer doesn't really seem worth the trouble.) # return type (Optional[SerializingProducer]) removed from signature to avoid error on import -@lru_cache +@lru_cache(maxsize=None) # Never evict an entry -- it's a small set and we need to keep all of them. def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str): """ Create the producer for a signal and a key field path. @@ -141,11 +156,6 @@ def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str): Returns: None if confluent_kafka is not defined or the settings are invalid. SerializingProducer if it is. - - Performance note: - This could be cached, but requires care such that it allows changes to settings via - remote-config (and in particular does not result in mixed cache/uncached configuration). - This complexity is being deferred until this becomes a performance issue. """ if not confluent_kafka: # pragma: no cover logger.warning('Library confluent-kafka not available. Cannot create event producer.') @@ -245,3 +255,10 @@ def send_to_event_bus( # # Docs: https://github.com/edenhill/librdkafka/blob/4faeb8132521da70b6bcde14423a14eb7ed5c55e/src/rdkafka.h#L3079 producer.poll(0) + + +@receiver(setting_changed) +def _reset_caches(sender, **kwargs): # pylint: disable=unused-argument + """Reset caches during testing when settings change.""" + get_serializer.cache_clear() + get_producer_for_signal.cache_clear() diff --git a/edx_event_bus_kafka/publishing/test_event_producer.py b/edx_event_bus_kafka/publishing/test_event_producer.py index 4e2f206..81521bc 100644 --- a/edx_event_bus_kafka/publishing/test_event_producer.py +++ b/edx_event_bus_kafka/publishing/test_event_producer.py @@ -24,11 +24,6 @@ class TestEventProducer(TestCase): """Test producer.""" - def setUp(self): - super().setUp() - ep.get_producer_for_signal.cache_clear() - ep.get_serializer.cache_clear() - def test_extract_event_key(self): event_data = { 'user': UserData(