Skip to content

Commit

Permalink
feat: Implement impl-loading API for openedx-events
Browse files Browse the repository at this point in the history
  • Loading branch information
timmc-edx committed Aug 22, 2022
1 parent 701098b commit bd08d1e
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 2 deletions.
30 changes: 30 additions & 0 deletions edx_event_bus_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,34 @@
Kafka implementation for Open edX event bus.
"""

from openedx_events.event_bus import EventBus

from edx_event_bus_kafka.consumer.event_consumer import consume_indefinitely
from edx_event_bus_kafka.publishing.event_producer import send_to_event_bus

__version__ = '0.4.1'


class EventBusKafka(EventBus):
"""Kafka implementation of the EventBus API."""

def __init__(config):
... # TODO Read config here and create producer and consumer?

def send(
self, *,
signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict, sync: bool
) -> None:
return send_to_event_bus(
signal=signal, topic=topic, event_key_field=event_key_field, event_data=event_data, sync=sync
)

def consume_indefinitely(self, *, topic: str, group_id: str) -> NoReturn:
consume_indefinitely(topic=topic, group_id=group_id)


def create(config) -> EventBusKafka:
"""
Create the Kafka implementation of the EventBus API.
"""
return EventBusKafka(config)
6 changes: 6 additions & 0 deletions edx_event_bus_kafka/consumer/event_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ def emit_signals_from_message(self, msg):
logger.error(f"Multiple {EVENT_TYPE_HEADER} headers found on message, cannot determine signal")
return

def consume_indefinitely(*, topic, group_id):
"""
Consume events from a topic in an infinite loop.
"""
consumer = create_consumer(group_id)

event_type = event_types[0]

# TODO (EventBus): Figure out who is doing the encoding and get the
Expand Down
6 changes: 4 additions & 2 deletions edx_event_bus_kafka/publishing/event_producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,14 +204,16 @@ def on_event_deliver(err, evt):


def send_to_event_bus(
signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict,
sync: bool = False,
*, signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict,
sync: bool,
) -> None:
"""
Send a signal event to the event bus under the specified topic.
If the Kafka settings are missing or invalid, return with a warning.
See openedx_events.event_bus.EventBus.send for details.
Arguments:
signal: The original OpenEdxPublicSignal the event was sent to
topic: The event bus topic for the event
Expand Down

0 comments on commit bd08d1e

Please sign in to comment.