diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index a5da0c9f..fc35fbf3 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -16,6 +16,14 @@ Unreleased
 
 *
 
+[0.6.1] - 2022-09-02
+********************
+
+Added
+=====
+
+* Producer now polls on an interval, improving callback reliability. Configurable with ``EVENT_BUS_KAFKA_POLL_INTERVAL_SEC``.
+
 [0.6.0] - 2022-09-01
 ********************
 
diff --git a/edx_event_bus_kafka/__init__.py b/edx_event_bus_kafka/__init__.py
index cb1260e5..57305617 100644
--- a/edx_event_bus_kafka/__init__.py
+++ b/edx_event_bus_kafka/__init__.py
@@ -8,4 +8,4 @@
 
 from edx_event_bus_kafka.internal.producer import EventProducerKafka, get_producer
 
-__version__ = '0.6.0'
+__version__ = '0.6.1'
diff --git a/edx_event_bus_kafka/internal/producer.py b/edx_event_bus_kafka/internal/producer.py
index ba8084c7..96f144f2 100644
--- a/edx_event_bus_kafka/internal/producer.py
+++ b/edx_event_bus_kafka/internal/producer.py
@@ -6,9 +6,13 @@
 
 import json
 import logging
+import threading
+import time
+import weakref
 from functools import lru_cache
 from typing import Any, List, Optional
 
+from django.conf import settings
 from django.dispatch import receiver
 from django.test.signals import setting_changed
 from openedx_events.event_bus.avro.serializer import AvroSignalSerializer
@@ -167,6 +171,13 @@ class EventProducerKafka():
     def __init__(self, producer):
         self.producer = producer
 
+        threading.Thread(
+            target=poll_indefinitely,
+            name="kafka-producer-poll",
+            args=(weakref.ref(self),),  # allow GC but also thread auto-stop (important for tests!)
+            daemon=True,  # don't block shutdown
+        ).start()
+
     def send(
             self, *, signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict,
     ) -> None:
@@ -192,13 +203,11 @@ def send(
         )
 
         # Opportunistically ensure any pending callbacks from recent event-sends are triggered.
-        #
-        # This assumes events come regularly, or that we're not concerned about
-        # high latency between delivery and callback. If those assumptions are
-        # false, we should switch to calling poll(1.0) or similar in a loop on
-        # a separate thread. Or do both.
-        #
-        # Issue: https://github.com/openedx/event-bus-kafka/issues/31
+        # This ensures that we're polling at least as often as we're producing, which is a
+        # reasonable balance. However, if events are infrequent, it doesn't ensure that
+        # callbacks happen in a timely fashion, and the last event emitted before shutdown
+        # would never get a delivery callback. That's why there's also a thread calling
+        # poll(0) on a regular interval (see `poll_indefinitely`).
         self.producer.poll(0)
 
     def prepare_for_shutdown(self):
@@ -210,6 +219,52 @@ def prepare_for_shutdown(self):
         self.producer.flush(-1)
 
 
+def poll_indefinitely(api_weakref: "weakref.ReferenceType[EventProducerKafka]"):
+    """
+    Poll the producer indefinitely to ensure delivery/stats/etc. callbacks are triggered.
+
+    The thread stops automatically once the producer is garbage-collected.
+
+    This ensures that callbacks are triggered in a timely fashion, rather than waiting
+    for the poll() call that we make before or after each produce() call. This may be
+    important if events are produced infrequently, and it allows the last event the
+    server emits before shutdown to have its callback run (if it happens soon enough.)
+
+    Arguments:
+        api_weakref: Weak reference to the EventProducerKafka whose producer should be
+          polled. The loop returns once this reference is dead.
+    """
+    # The reason we hold a weakref to the whole EventProducerKafka and
+    # not directly to the Producer itself is that you just can't make
+    # a weakref to the latter (perhaps because it's a C object.)
+
+    # .. setting_name: EVENT_BUS_KAFKA_POLL_INTERVAL_SEC
+    # .. setting_default: 1.0
+    # .. setting_description: How frequently to poll the event-bus-kafka producer. This should
+    #   be small enough that there's not too much latency in triggering delivery callbacks once
+    #   a message has been acknowledged, but there's no point in setting it any lower than the
+    #   expected round-trip-time of message delivery and acknowledgement. (100 ms – 5 s is
+    #   probably a reasonable range.)
+    poll_interval_seconds = getattr(settings, 'EVENT_BUS_KAFKA_POLL_INTERVAL_SEC', 1.0)
+    while True:
+        time.sleep(poll_interval_seconds)
+
+        # Temporarily hold a strong ref to the producer API singleton
+        api_object = api_weakref()
+        if api_object is None:
+            return
+
+        try:
+            api_object.producer.poll(0)
+        except Exception:  # pylint: disable=broad-except
+            # Keep the polling thread alive when poll() or a callback it runs fails,
+            # but leave a trace instead of swallowing the failure silently.
+            logging.getLogger(__name__).exception("Error while polling the Kafka producer")
+        finally:
+            # Get rid of that strong ref again
+            api_object = None
+
+
 # Note: This caching is required, since otherwise the Producer will
 # fall out of scope and be garbage-collected, destroying the
 # outbound-message queue and threads. The use of this cache allows the
diff --git a/edx_event_bus_kafka/internal/tests/test_producer.py b/edx_event_bus_kafka/internal/tests/test_producer.py
index b5f37afa..fad2aaca 100644
--- a/edx_event_bus_kafka/internal/tests/test_producer.py
+++ b/edx_event_bus_kafka/internal/tests/test_producer.py
@@ -2,6 +2,8 @@
 Test the event producer code.
 """
 
+import gc
+import time
 import warnings
 from unittest import TestCase
 from unittest.mock import Mock, patch
@@ -137,3 +139,37 @@ def test_send_to_event_bus(self, mock_get_serializers):
             on_delivery=ep.on_event_deliver,
             headers={'ce_type': 'org.openedx.learning.auth.session.login.completed.v1'},
         )
+
+    def test_polling_loop(self):
+        with override_settings(
+                EVENT_BUS_KAFKA_POLL_INTERVAL_SEC=0.1
+        ):
+            call_count = 0
+
+            def increment_call_count(*args):
+                nonlocal call_count
+                call_count += 1
+
+            mock_producer = Mock(**{'poll.side_effect': increment_call_count})
+
+            producer_api = ep.EventProducerKafka(mock_producer)  # Created, starts polling
+
+            # Allow a little time to pass and check that the mock poll has been called
+            time.sleep(1.0)
+            assert call_count > 0
+            print(producer_api)  # Use the value here to ensure it isn't GC'd early
+
+            # Allow garbage collection of these objects, then ask for it to happen.
+            producer_api = None
+            mock_producer = None
+            gc.collect()
+
+            # Allow things to settle down post-GC (and let thread
+            # notice the object is gone), then save off the new call
+            # count.
+            time.sleep(0.2)
+            count_after_gc = call_count
+
+            # Wait a little longer and confirm that the count is no longer rising
+            time.sleep(1.0)
+            assert call_count == count_after_gc