Skip to content

Commit

Permalink
fix: Use non-blocking poll after production; allow flush for testing (#…
Browse files Browse the repository at this point in the history
…12)

I believe a nullary call to poll uses a default timeout of -1 (wait
indefinitely), but we really just want to make sure that pending callbacks
are triggered for the acks that have been buffered in the background, from
previous events. poll(0) will not block if the buffer is empty.

For testing we need to call flush(-1), so add sync=True as an option.

Documentation for `rd_kafka_poll`:
https://github.com/edenhill/librdkafka/blob/4faeb8132521da70b6bcde14423a14eb7ed5c55e/src/rdkafka.h#L3079

This addresses part of #10
  • Loading branch information
timmc-edx authored and whuang1202 committed Aug 10, 2022
1 parent 2a4150e commit c3f33d1
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 4 deletions.
59 changes: 59 additions & 0 deletions edx_event_bus_kafka/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
"""
Configuration loading and validation.
"""

import warnings
from typing import Optional

from attrs import attr
from confluent_kafka.schema_registry import SchemaRegistryClient
from django.conf import settings


@attr.s
class CommonConfig:
    """
    Attrs-based container holding a single ``settings`` dict attribute.
    """
    # NOTE(review): presumably the dict produced by load_common_settings() -- confirm against callers.
    settings = attr.ib(type=dict)


def create_schema_registry_client() -> Optional[SchemaRegistryClient]:
    """
    Create a schema registry client from common settings.

    Reads ``EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL`` from Django settings, along with the
    optional ``EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_KEY`` and
    ``EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_SECRET`` (both default to the empty string).

    Returns:
        A ``SchemaRegistryClient`` configured with the URL and ``key:secret`` basic-auth
        info, or None (after emitting a warning) if the URL setting is missing.
    """
    url = getattr(settings, 'EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL', None)
    if url is None:
        # Relies on the module-level `import warnings`; without it this path raised NameError
        # instead of degrading gracefully.
        warnings.warn("Cannot configure event-bus-kafka: Missing setting EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL")
        return None

    key = getattr(settings, 'EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_KEY', '')
    secret = getattr(settings, 'EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_SECRET', '')

    return SchemaRegistryClient({
        'url': url,
        'basic.auth.user.info': f"{key}:{secret}",
    })


def load_common_settings() -> Optional[dict]:
    """
    Load common settings, a base for either producer or consumer configuration.

    Reads ``EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS`` from Django settings, plus the optional
    ``EVENT_BUS_KAFKA_API_KEY`` / ``EVENT_BUS_KAFKA_API_SECRET`` pair. SASL settings are
    only added when both the key and the secret are set to truthy values.

    Returns:
        A dict containing ``'bootstrap.servers'`` (and the SASL entries when credentials
        are configured), or None (after emitting a warning) if the bootstrap servers
        setting is missing.
    """
    bootstrap_servers = getattr(settings, 'EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS', None)
    if bootstrap_servers is None:
        warnings.warn("Cannot configure event-bus-kafka: Missing setting EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS")
        return None

    # This dict must not be called `settings`: assigning to that name anywhere in the
    # function makes it local for the whole body, so the getattr() calls would no longer
    # reach the Django settings object (and the first one would raise UnboundLocalError).
    base_settings = {
        'bootstrap.servers': bootstrap_servers,
    }

    key = getattr(settings, 'EVENT_BUS_KAFKA_API_KEY', None)
    secret = getattr(settings, 'EVENT_BUS_KAFKA_API_SECRET', None)

    if key and secret:
        base_settings.update({
            'sasl.mechanism': 'PLAIN',
            'security.protocol': 'SASL_SSL',
            'sasl.username': key,
            'sasl.password': secret,
        })

    return base_settings
1 change: 1 addition & 0 deletions edx_event_bus_kafka/management/commands/produce_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def handle(self, *args, **options):
topic=options['topic'][0],
event_key_field=options['key_field'][0],
event_data=json.loads(options['data'][0]),
sync=True, # otherwise command may exit before delivery is complete
)
except Exception: # pylint: disable=broad-except
logger.exception("Error producing Kafka event")
25 changes: 21 additions & 4 deletions edx_event_bus_kafka/publishing/event_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,10 @@ def on_event_deliver(err, evt):
f"partition={evt.partition()}")


def send_to_event_bus(signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict) -> None:
def send_to_event_bus(
signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict,
sync: bool = False,
) -> None:
"""
Send a signal event to the event bus under the specified topic.
Expand All @@ -215,6 +218,8 @@ def send_to_event_bus(signal: OpenEdxPublicSignal, topic: str, event_key_field:
event_key_field: Path to the event data field to use as the event key (period-delimited
string naming the dictionary keys to descend)
event_data: The event data (kwargs) sent to the signal
sync: Whether to wait indefinitely for event to be received by the message bus (probably
only want to use this for testing)
"""
producer = get_producer_for_signal(signal, event_key_field)
if producer is None: # Note: SerializingProducer has False truthiness when len() == 0
Expand All @@ -224,6 +229,18 @@ def send_to_event_bus(signal: OpenEdxPublicSignal, topic: str, event_key_field:
producer.produce(topic, key=event_key, value=event_data,
on_delivery=on_event_deliver,
headers={EVENT_TYPE_HEADER_KEY: signal.event_type})
# TODO (EventBus): Investigate poll() vs. flush(), and other related settings:
# See https://github.com/openedx/event-bus-kafka/issues/10
producer.poll() # wait indefinitely for the above event to either be delivered or fail

if sync:
# Wait for all buffered events to send, then wait for all of
# them to be acknowledged, and trigger all callbacks.
producer.flush(-1)
else:
# Opportunistically ensure any pending callbacks from recent events are triggered.
#
# This assumes events come regularly, or that we're not concerned about
# high latency between delivery and callback. If those assumptions are
# false, we should switch to calling poll(1.0) or similar in a loop on
# a separate thread.
#
# Docs: https://github.com/edenhill/librdkafka/blob/4faeb8132521da70b6bcde14423a14eb7ed5c55e/src/rdkafka.h#L3079
producer.poll(0)

0 comments on commit c3f33d1

Please sign in to comment.