Skip to content

Commit

Permalink
feat: Poll producer on a regular cadence to avoid delayed callbacks
Browse files Browse the repository at this point in the history
Fixes #31
  • Loading branch information
timmc-edx committed Sep 1, 2022
1 parent 3d7c519 commit 923aa50
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 8 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ Unreleased

*

[0.6.1] - 2022-09-02
********************

Added
=====

* Producer now polls on an interval, improving callback reliability. Configurable with ``EVENT_BUS_KAFKA_POLL_INTERVAL_SEC``.

[0.6.0] - 2022-09-01
********************

Expand Down
2 changes: 1 addition & 1 deletion edx_event_bus_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@

from edx_event_bus_kafka.internal.producer import EventProducerKafka, get_producer

__version__ = '0.6.0'
__version__ = '0.6.1'
63 changes: 56 additions & 7 deletions edx_event_bus_kafka/internal/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@

import json
import logging
import threading
import time
import weakref
from functools import lru_cache
from typing import Any, List, Optional

from django.conf import settings
from django.dispatch import receiver
from django.test.signals import setting_changed
from openedx_events.event_bus.avro.serializer import AvroSignalSerializer
Expand Down Expand Up @@ -167,6 +171,13 @@ class EventProducerKafka():
def __init__(self, producer):
    """
    Wrap ``producer`` and start a background thread that polls it.

    Arguments:
        producer: The underlying producer object; stored as ``self.producer``.
    """
    self.producer = producer

    # The thread is only handed a weak reference to this object. That way
    # the thread does not keep this object alive (allowing GC), and the
    # polling loop can stop on its own once the object is gone (important
    # for tests!)
    self_ref = weakref.ref(self)
    poller = threading.Thread(
        name="kafka-producer-poll",
        target=poll_indefinitely,
        args=(self_ref,),
        daemon=True,  # don't block shutdown
    )
    poller.start()

def send(
self, *, signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict,
) -> None:
Expand All @@ -192,13 +203,11 @@ def send(
)

# Opportunistically ensure any pending callbacks from recent event-sends are triggered.
#
# This assumes events come regularly, or that we're not concerned about
# high latency between delivery and callback. If those assumptions are
# false, we should switch to calling poll(1.0) or similar in a loop on
# a separate thread. Or do both.
#
# Issue: https://github.com/openedx/event-bus-kafka/issues/31
# This ensures that we're polling at least as often as we're producing, which is a
# reasonable balance. However, if events are infrequent, it doesn't ensure that
# callbacks happen in a timely fashion, and the last event emitted before shutdown
# would never get a delivery callback. That's why there's also a thread calling
# poll(0) on a regular interval (see `poll_indefinitely`).
self.producer.poll(0)

def prepare_for_shutdown(self):
Expand All @@ -210,6 +219,46 @@ def prepare_for_shutdown(self):
self.producer.flush(-1)


def poll_indefinitely(api_weakref: weakref.ref):
    """
    Poll the producer indefinitely to ensure delivery/stats/etc. callbacks are triggered.

    The loop returns (ending the thread it runs on) once the referenced
    EventProducerKafka has been garbage-collected.

    This ensures that callbacks are triggered in a timely fashion, rather than waiting
    for the poll() call that we make before or after each produce() call. This may be
    important if events are produced infrequently, and it allows the last event the
    server emits before shutdown to have its callback run (if it happens soon enough.)

    Errors raised while polling are logged and otherwise ignored, so that a single
    failed poll does not end the loop.

    Arguments:
        api_weakref: Weak reference to the EventProducerKafka whose ``producer``
            attribute should be polled.
    """
    # The reason we hold a weakref to the whole EventProducerKafka and
    # not directly to the Producer itself is that you just can't make
    # a weakref to the latter (perhaps because it's a C object.)

    # .. setting_name: EVENT_BUS_KAFKA_POLL_INTERVAL_SEC
    # .. setting_default: 1.0
    # .. setting_description: How frequently to poll the event-bus-kafka producer. This should
    #   be small enough that there's not too much latency in triggering delivery callbacks once
    #   a message has been acknowledged, but there's no point in setting it any lower than the
    #   expected round-trip-time of message delivery and acknowledgement. (100 ms – 5 s is
    #   probably a reasonable range.)
    # Note: the setting is read once, before the loop starts.
    poll_interval_seconds = getattr(settings, 'EVENT_BUS_KAFKA_POLL_INTERVAL_SEC', 1.0)
    while True:
        time.sleep(poll_interval_seconds)

        # Temporarily hold a strong ref to the producer API singleton
        api_object = api_weakref()
        if api_object is None:
            # Referent is gone; nothing left to poll.
            return

        try:
            api_object.producer.poll(0)
        except Exception:  # pylint: disable=broad-except
            # Best-effort: keep the loop alive, but don't hide the failure.
            logging.getLogger(__name__).exception(
                "Error while polling Kafka producer from background thread"
            )
        finally:
            # Get rid of that strong ref again, so that this loop never
            # keeps the API object alive across the sleep.
            api_object = None


# Note: This caching is required, since otherwise the Producer will
# fall out of scope and be garbage-collected, destroying the
# outbound-message queue and threads. The use of this cache allows the
Expand Down
36 changes: 36 additions & 0 deletions edx_event_bus_kafka/internal/tests/test_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
Test the event producer code.
"""

import gc
import time
import warnings
from unittest import TestCase
from unittest.mock import Mock, patch
Expand Down Expand Up @@ -137,3 +139,37 @@ def test_send_to_event_bus(self, mock_get_serializers):
on_delivery=ep.on_event_deliver,
headers={'ce_type': 'org.openedx.learning.auth.session.login.completed.v1'},
)

def test_polling_loop(self):
    """
    Check that the producer is polled in the background, and that the
    polling stops once the API object has been garbage-collected.
    """
    polls_seen = 0

    def record_poll(*args):
        nonlocal polls_seen
        polls_seen += 1

    with override_settings(EVENT_BUS_KAFKA_POLL_INTERVAL_SEC=0.1):
        fake_producer = Mock(**{'poll.side_effect': record_poll})

        # Constructing the API object is what kicks off the polling thread.
        api = ep.EventProducerKafka(fake_producer)

        # After a short wait, the mock's poll should have been hit at least once.
        time.sleep(1.0)
        assert polls_seen > 0
        print(api)  # Use the value here to ensure it isn't GC'd early

        # Drop our references so the objects become collectable, then
        # explicitly request a collection.
        api = None
        fake_producer = None
        gc.collect()

        # Give the thread a moment to notice that the object is gone,
        # then snapshot the counter.
        time.sleep(0.2)
        polls_after_gc = polls_seen

        # The counter must stay put from here on.
        time.sleep(1.0)
        assert polls_seen == polls_after_gc

0 comments on commit 923aa50

Please sign in to comment.