Skip to content

Commit

Permalink
feat: Implement impl-loading API for openedx-events
Browse files Browse the repository at this point in the history
  • Loading branch information
timmc-edx committed Sep 1, 2022
1 parent 26d9c4f commit fb0978d
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 2 deletions.
15 changes: 15 additions & 0 deletions edx_event_bus_kafka/internal/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from django.core.management.base import BaseCommand
from django.dispatch import receiver
from edx_toggles.toggles import SettingToggle
from openedx_events.event_bus import EventConsumer
from openedx_events.event_bus.avro.deserializer import AvroSignalDeserializer
from openedx_events.learning.data import UserData
from openedx_events.learning.signals import SESSION_LOGIN_COMPLETED
Expand Down Expand Up @@ -168,6 +169,20 @@ def emit_signals_from_message(self, msg):
logger.exception(f"Signal not found: {event_type_str}")


class EventConsumerKafka(EventConsumer):
"""
Kafka implementation of the EventBus Consumer API expected by openedx-events.
"""

def consume_indefinitely(self, *, topic: str, group_id: str, signal: OpenEdxPublicSignal) -> NoReturn:
event_consumer = KafkaEventConsumer(
topic=topic,
group_id=group_id,
signal=signal,
)
event_consumer.consume_indefinitely()


class ConsumeEventsCommand(BaseCommand):
"""
Listen for events from the event bus and log them. Only run on servers where
Expand Down
7 changes: 5 additions & 2 deletions edx_event_bus_kafka/internal/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

from django.dispatch import receiver
from django.test.signals import setting_changed
from openedx_events.event_bus import EventProducer
from openedx_events.event_bus.avro.serializer import AvroSignalSerializer
from openedx_events.tooling import OpenEdxPublicSignal

Expand Down Expand Up @@ -153,9 +154,11 @@ def inner_to_dict(event_data, ctx=None): # pylint: disable=unused-argument
return key_serializer, value_serializer


class EventProducerKafka():
class EventProducerKafka(EventProducer):
"""
API singleton for event production to Kafka.
Kafka implementation of the EventBus Producer API expected by openedx-events.
See comments for ``get_producer_for_signal`` to understand where state is stored.
This is just a wrapper around a confluent_kafka Producer that knows how to
serialize a signal to event wire format.
Expand Down

0 comments on commit fb0978d

Please sign in to comment.