feat!: Make use of abstract consumer API (#154)
Implement openedx-events Event Bus Consumer API.

This is a breaking change that removes the `consume_events` management command,
as this command will be provided by openedx_events. See changelog for other changes
to the consumer implementation.
navinkarkera authored May 10, 2023
1 parent 6d72fba commit 1f3896a
Showing 8 changed files with 92 additions and 184 deletions.
10 changes: 10 additions & 0 deletions CHANGELOG.rst
@@ -14,6 +14,16 @@ Change Log
Unreleased
**********

[4.0.0] - 2023-05-10
********************
Changed
=======
* Implement openedx-events Event Bus Consumer API.
* **BREAKING CHANGE**: Remove the ``consume_events`` management command, as this command is now provided by openedx_events. To replay events using the
  openedx-events version of the management command, pass ``--extra '{"offset_time": "2023-01-08T06:46:22"}'`` instead of ``-o 2023-01-08T06:46:22``.
* **BREAKING CHANGE**: The ``offset_timestamp`` argument has been removed from the ``consume_indefinitely`` and ``reset_offsets_and_sleep_indefinitely`` methods.
  It is now an optional argument named ``offset_time`` on the ``KafkaEventConsumer`` constructor.
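
The new ``--extra`` payload is plain JSON. As a quick sketch (not part of the codebase), you can validate the timestamp and build the payload like this, using the example value from the changelog entry above:

```python
import json
from datetime import datetime

# Validate the timestamp before handing it to the management command;
# fromisoformat raises ValueError on a malformed value.
offset_time = "2023-01-08T06:46:22"
datetime.fromisoformat(offset_time)

# Build the JSON string to pass via --extra.
extra = json.dumps({"offset_time": offset_time})
print(extra)  # {"offset_time": "2023-01-08T06:46:22"}
```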

[3.10.0] - 2023-05-05
*********************
Changed
Expand Down
23 changes: 20 additions & 3 deletions docs/how_tos/manual_testing.rst
@@ -8,9 +8,20 @@ Setting up for testing

#. Make or refresh a copy of this repo where it can be seen from inside devstack: ``rsync -sax --delete ./ ../src/event-bus-kafka/`` (and rerun rsync after any edits as needed)
#. In devstack, start Kafka and the control webapp: ``make dev.up.kafka-control-center`` and watch ``make dev.logs.kafka-control-center`` until the server is up and happy (may take a few minutes; watch for ``INFO Kafka startTimeMs``)
#. If Kafka fails to start, run ``make dev.up.kafka-control-center`` again.
#. Load the control center UI: http://localhost:9021/clusters and wait for the cluster to become healthy
#. Create the topic you want to use for testing. For the examples used in the management commands, you'll want to use ``dev-user-login`` with default settings.
#. In edx-platform's ``cms/envs/common.py``, add ``'edx_event_bus_kafka'`` to the ``INSTALLED_APPS`` list
#. You can either create the topic you want to use for testing manually, or perform the action in the LMS or Studio that publishes the event, which will create the topic automatically. For the examples used in the management commands, you'll want to use ``dev-user-login`` with default settings.
#. Update the following settings in ``cms/envs/devstack.py``, making sure each is defined only once.

   .. code-block:: python

      EVENT_BUS_PRODUCER = 'edx_event_bus_kafka.create_producer'
      EVENT_BUS_CONSUMER = 'edx_event_bus_kafka.KafkaEventConsumer'
      EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL = 'http://edx.devstack.schema-registry:8081'
      EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS = 'edx.devstack.kafka:29092'
      EVENT_BUS_TOPIC_PREFIX = 'dev'
#. In edx-platform's ``cms/envs/devstack.py``, add ``'edx_event_bus_kafka'`` and ``'openedx_events'`` to the ``INSTALLED_APPS`` list
#. In devstack, run ``make devpi-up studio-up-without-deps-shell`` to bring up Studio with a shell.
#. In the Studio shell, run ``pip install -e /edx/src/event-bus-kafka``
#. In the Studio shell, run ``pip install 'confluent_kafka[avro,schema-registry]'`` (necessary external dependency)
@@ -22,7 +33,12 @@ Testing the consumer

The consumer may not read older events from the topic—and it never will for the first run of a topic/group pair—so you may need to start it before the producer, or re-run the producer after the consumer is started.

- Run the example command listed in the ``edx_event_bus_kafka.consumer.event_consumer.ConsumeEventsCommand`` docstring
- Run the example command below:

  .. code-block:: bash

     python3 manage.py cms consume_events -t user-login -g user-activity-service -s org.openedx.learning.auth.session.login.completed.v1
- Once an event comes in, expect to see output that ends with a line containing "Received SESSION_LOGIN_COMPLETED signal with user_data"

Testing the producer
@@ -35,3 +51,4 @@ Note: If you're also running the consumer, you'll need to do this in a separate
- Run the example command listed in the ``edx_event_bus_kafka.management.commands.produce_event.Command`` docstring
- Expect to see output that ends with a line containing "Event delivered to topic"
- Go to the topic that was created and then into the Messages tab; select offset=0 to make sure you can see messages that were sent before you had the UI open.

2 changes: 1 addition & 1 deletion edx_event_bus_kafka/__init__.py
@@ -9,4 +9,4 @@
from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer
from edx_event_bus_kafka.internal.producer import KafkaEventProducer, create_producer

__version__ = '3.10.0'
__version__ = '4.0.0'
139 changes: 28 additions & 111 deletions edx_event_bus_kafka/internal/consumer.py
@@ -3,19 +3,18 @@
"""
import logging
import time
import warnings
from datetime import datetime
from functools import lru_cache

from django.conf import settings
from django.core.management.base import BaseCommand
from django.db import connection
from django.dispatch import receiver
from django.test.signals import setting_changed
from edx_django_utils.monitoring import function_trace, record_exception, set_custom_attribute
from edx_toggles.toggles import SettingToggle
from openedx_events.event_bus import EventBusConsumer
from openedx_events.event_bus.avro.deserializer import AvroSignalDeserializer
from openedx_events.tooling import OpenEdxPublicSignal, load_all_signals
from openedx_events.tooling import OpenEdxPublicSignal

from .config import get_full_topic, get_schema_registry_client, load_common_settings
from .utils import (
@@ -101,23 +100,38 @@ def _reconnect_to_db_if_needed():
connection.connect()


class KafkaEventConsumer:
class KafkaEventConsumer(EventBusConsumer):
"""
Construct consumer for the given topic and group. The consumer can then
emit events from the event bus using the signal from the message headers.
Note that the topic should be specified here *without* the optional environment prefix.
Can also consume messages indefinitely off the queue.
Attributes:
topic: Topic to consume (without environment prefix).
group_id: Consumer group id.
signal: Type of signal to emit from consumed messages.
consumer: The underlying Kafka consumer instance.
offset_time: The timestamp (in ISO format) that we would like to reset the consumers to. If this is used, the
consumers will only reset the offsets of the topic but will not actually consume and process any messages.
"""

def __init__(self, topic, group_id):
def __init__(self, topic, group_id, offset_time=None):
if confluent_kafka is None: # pragma: no cover
raise Exception('Library confluent-kafka not available. Cannot create event consumer.')

self.topic = topic
self.group_id = group_id
self.consumer = self._create_consumer()
self.offset_time = None
if offset_time:
try:
self.offset_time = datetime.fromisoformat(str(offset_time))
except ValueError:
logger.exception('Could not parse the offset timestamp.')
raise
self._shut_down_loop = False
self.schema_registry_client = get_schema_registry_client()

@@ -148,25 +162,22 @@ def _shut_down(self):
"""
self._shut_down_loop = True

def reset_offsets_and_sleep_indefinitely(self, offset_timestamp):
def reset_offsets_and_sleep_indefinitely(self):
"""
Reset any assigned partitions to the given offset, and sleep indefinitely.
Arguments:
offset_timestamp (datetime): Reset the offsets of the consumer partitions to this timestamp.
"""

def reset_offsets(consumer, partitions):
# This is a callback method used on consumer assignment to handle offset reset logic.
# We do not want to attempt to change offsets if the offset is None.
if offset_timestamp is None:
if self.offset_time is None:
return

# Get the offset from the epoch. Kafka expects offsets in milliseconds for offsets_for_times. Although
# this is undocumented in the libraries we're using (confluent-kafka and librdkafa), for reference
# see the docs for kafka-python:
# https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html#kafka.KafkaConsumer.offsets_for_times
offset_timestamp_ms = int(offset_timestamp.timestamp()*1000)
offset_timestamp_ms = int(self.offset_time.timestamp()*1000)
# We set the epoch timestamp in the offset position.
for partition in partitions:
partition.offset = offset_timestamp_ms
Expand All @@ -178,7 +189,7 @@ def reset_offsets(consumer, partitions):
if len(errors) > 0:
raise Exception(f"Error getting offsets for timestamps: {errors}")

logger.info(f'Found offsets for timestamp {offset_timestamp}: {partitions_with_offsets}')
logger.info(f'Found offsets for timestamp {self.offset_time}: {partitions_with_offsets}')

# We need to commit these offsets to Kafka in order to ensure these offsets are persisted.
consumer.commit(offsets=partitions_with_offsets)
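The millisecond conversion used by the offset-reset callback above can be shown as a standalone sketch. ``to_kafka_offset_ms`` is an illustrative name, not part of the codebase:

```python
from datetime import datetime, timezone

def to_kafka_offset_ms(offset_time):
    # offsets_for_times expects epoch *milliseconds*, so scale the
    # POSIX timestamp (seconds, possibly fractional) by 1000 and truncate.
    return int(offset_time.timestamp() * 1000)

dt = datetime(2023, 1, 8, 6, 46, 22, tzinfo=timezone.utc)
ms = to_kafka_offset_ms(dt)
print(ms)  # 1673160382000
```

Note that a naive (timezone-unaware) datetime would be interpreted in local time by ``timestamp()``, so passing an explicit ``tzinfo`` avoids machine-dependent offsets.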
@@ -214,8 +225,6 @@ def _consume_indefinitely(self):
Consume events from a topic in an infinite loop.
"""

# This is already checked at the Command level, but it's possible this loop
# could get called some other way, so check it here too.
if not KAFKA_CONSUMERS_ENABLED.is_enabled():
logger.error("Kafka consumers not enabled, exiting.")
return
@@ -294,26 +303,15 @@ def _consume_indefinitely(self):
finally:
self.consumer.close()

def consume_indefinitely(self, offset_timestamp=None):
def consume_indefinitely(self):
"""
Consume events from a topic in an infinite loop.
Arguments:
offset_timestamp (datetime): Optional and deprecated; if supplied, calls
``reset_offsets_and_sleep_indefinitely`` instead. Relying code should
switch to calling that method directly.
Consume events from the topic in an infinite loop if ``offset_time`` is not set; otherwise, reset any
assigned partitions to the given offset and sleep indefinitely.
"""
# TODO: Once this deprecated argument can be removed, just
# remove this delegation method entirely and rename
# `_consume_indefinitely` to no longer have the `_` prefix.
if offset_timestamp is None:
if self.offset_time is None:
self._consume_indefinitely()
else:
warnings.warn(
"Calling consume_indefinitely with offset_timestamp is deprecated; "
"please call reset_offsets_and_sleep_indefinitely directly instead."
)
self.reset_offsets_and_sleep_indefinitely(offset_timestamp)
self.reset_offsets_and_sleep_indefinitely()

@function_trace('emit_signals_from_message')
def emit_signals_from_message(self, msg, signal):
@@ -612,87 +610,6 @@ def _get_kafka_message_and_error(self, message, error):
return kafka_message, kafka_error


class ConsumeEventsCommand(BaseCommand):
"""
Management command for Kafka consumer workers in the event bus.
"""
help = """
Consume messages from a Kafka topic and send their data to the correct signal.
Example::
python3 manage.py cms consume_events -t user-login -g user-activity-service
"""

def add_arguments(self, parser):

parser.add_argument(
'-t', '--topic',
nargs=1,
required=True,
help='Topic to consume (without environment prefix)'
)

parser.add_argument(
'-g', '--group_id',
nargs=1,
required=True,
help='Consumer group id'
)

# TODO: remove this once callers have been updated. Left optional to avoid the need for lockstep changes
parser.add_argument(
'-s', '--signal',
nargs=1,
required=False,
default=None,
help='Deprecated argument. Correct signal will be determined from event'
)

parser.add_argument(
'-o', '--offset_time',
nargs=1,
required=False,
default=None,
help='The timestamp (in ISO format) that we would like to reset the consumers to. '
'If this is used, the consumers will only reset the offsets of the topic '
'but will not actually consume and process any messages.'
)

def handle(self, *args, **options):
if confluent_kafka is None:
logger.error(
"Cannot consume events because confluent_kafka dependency (or one of its extras) was not installed"
)
return

if not KAFKA_CONSUMERS_ENABLED.is_enabled():
logger.error("Kafka consumers not enabled, exiting.")
return

try:
load_all_signals()
if options['offset_time'] and options['offset_time'][0] is not None:
try:
offset_timestamp = datetime.fromisoformat(options['offset_time'][0])
except ValueError:
logger.exception('Could not parse the offset timestamp.')
raise
else:
offset_timestamp = None

event_consumer = KafkaEventConsumer(
topic=options['topic'][0],
group_id=options['group_id'][0],
)
if offset_timestamp is None:
event_consumer.consume_indefinitely()
else:
event_consumer.reset_offsets_and_sleep_indefinitely(offset_timestamp=offset_timestamp)
except Exception: # pylint: disable=broad-except
logger.exception("Error consuming Kafka events")


# argument type SchemaRegistryClient for schema_registry_client removed from signature to avoid error on import
@lru_cache
def get_deserializer(signal: OpenEdxPublicSignal, schema_registry_client):