feat: Publish generic signals to event bus (#6)
Implement `send_to_event_bus`, accepting signal, data, topic, and key information.

- Addresses #5
- Core talk-to-Kafka parts were copied from license manager prototype work, including
  https://github.com/openedx/license-manager/blob/38b38b0979bb5f643595337a9c189f619a7d483b/license_manager/apps/subscriptions/event_bus_utils.py#L56
- Manually tested against devstack Kafka (instructions included in commit)
- Punting on caching the producer for now, but do at least cache the serializer
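
A minimal usage sketch of the new entry point (the topic name and key field are
illustrative, mirroring the manual-testing doc added in this commit; `user_data`
stands in for a `UserData` object):

    import edx_event_bus_kafka.publishing.event_producer as ep
    from openedx_events.learning.signals import SESSION_LOGIN_COMPLETED

    ep.send_to_event_bus(
        signal=SESSION_LOGIN_COMPLETED,   # the OpenEdxPublicSignal being published
        topic='user_stuff',               # Kafka topic to publish to (illustrative)
        event_key_field='user.id',        # period-delimited path into event_data; used as the Kafka key
        event_data={'user': user_data},   # the kwargs that were sent with the signal
    )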

Also:

- Remove deprecated config line
- Include some recent improvements from cookiecutter:
    - openedx/edx-cookiecutters#199 (requirements)
    - openedx/edx-cookiecutters#216 (test-all)

Small improvements that should be upstreamed to the cookiecutter:

- Clean up after docs build and before test-all
- Ignore pii_report output dir
timmc-edx authored Jul 18, 2022
1 parent 8357eb8 commit f0f7386
Showing 19 changed files with 661 additions and 88 deletions.
5 changes: 4 additions & 1 deletion .gitignore
@@ -31,7 +31,7 @@ pip-log.txt
.tox
coverage.xml
htmlcov/

/pii_report/


# The Silver Searcher
@@ -59,3 +59,6 @@ docs/edx_event_bus_kafka.*.rst
# Private requirements
requirements/private.in
requirements/private.txt

# IDE
.idea
9 changes: 7 additions & 2 deletions Makefile
@@ -30,13 +30,16 @@ docs: ## generate Sphinx HTML documentation, including API docs
	$(BROWSER)docs/_build/html/index.html

# Define PIP_COMPILE_OPTS=-v to get more information during make upgrade.
PIP_COMPILE = pip-compile --rebuild --upgrade $(PIP_COMPILE_OPTS)
PIP_COMPILE = pip-compile --upgrade $(PIP_COMPILE_OPTS)

upgrade: export CUSTOM_COMPILE_COMMAND=make upgrade
upgrade: ## update the requirements/*.txt files with the latest packages satisfying requirements/*.in
	pip install -qr requirements/pip-tools.txt
	# Make sure to compile files after any other files they include!
	$(PIP_COMPILE) --allow-unsafe -o requirements/pip.txt requirements/pip.in
	$(PIP_COMPILE) -o requirements/pip-tools.txt requirements/pip-tools.in
	pip install -qr requirements/pip.txt
	pip install -qr requirements/pip-tools.txt
	$(PIP_COMPILE) -o requirements/base.txt requirements/base.in
	$(PIP_COMPILE) -o requirements/test.txt requirements/test.in
	$(PIP_COMPILE) -o requirements/doc.txt requirements/doc.in
@@ -63,7 +66,9 @@ test: clean ## run tests in the current virtualenv
diff_cover: test ## find diff lines that need test coverage
	diff-cover coverage.xml

test-all: quality pii_check ## run tests on every supported Python/Django combination
test-all: clean quality pii_check ## run tests on every supported Python/Django combination
	tox -e docs
	rm -rf build # artifact produced by docs run, interferes with pytest
	tox

validate: quality pii_check test ## run tests and quality checks
2 changes: 1 addition & 1 deletion codecov.yml
@@ -7,6 +7,6 @@ coverage:
    patch:
      default:
        enabled: yes
        target: 100%
        target: 95%

comment: false
38 changes: 38 additions & 0 deletions docs/how_tos/manual_testing.rst
@@ -0,0 +1,38 @@
Manual testing
==============

The producer can be tested manually against a Kafka broker running in devstack.

#. Create a "unit test" in one of the test files that will actually call Kafka. For example, this could be added to the end of ``edx_event_bus_kafka/publishing/test_event_producer.py``::

     def test_actually_send_to_event_bus():
         import random
         signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED
         # Make events distinguishable
         id = random.randrange(1000)
         event_data = {
             'user': UserData(
                 id=id,
                 is_active=True,
                 pii=UserPersonalData(
                     username=f'foobob_{id:03}',
                     email='bob@foo.example',
                     name="Bob Foo",
                 )
             )
         }

         print(f"Sending event with random user ID {id}.")
         with override_settings(
             SCHEMA_REGISTRY_URL='http://edx.devstack.schema-registry:8081',
             KAFKA_BOOTSTRAP_SERVERS='edx.devstack.kafka:29092',
         ):
             ep.send_to_event_bus(signal, 'user_stuff', 'user.id', event_data)

#. Make or refresh a copy of this repo where it can be seen from inside devstack: ``rsync -sax --delete ./ ../src/event-bus-kafka/``
#. In devstack, start Kafka and the control webapp: ``make dev.up.kafka-control-center`` and watch ``make dev.logs.kafka-control-center`` until the server is up and happy (may take a few minutes; watch for ``INFO Kafka startTimeMs``)
#. Load the control center UI: http://localhost:9021/clusters and wait for the cluster to become healthy
#. In devstack, run ``lms-up-without-deps-shell`` to bring up an arbitrary shell inside Docker networking (LMS, in this case)
#. In the LMS shell, run ``pip install -e /edx/src/event-bus-kafka`` and then run whatever test you want, e.g. ``pytest /edx/src/event-bus-kafka/edx_event_bus_kafka/publishing/test_event_producer.py::test_actually_send_to_event_bus``
#. Go to the topic that was created and then into the Messages tab; select offset=0 to make sure you can see messages that were sent before you had the UI open.
#. Rerun ``rsync`` after any edits
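
If the event goes through, the delivery callback (`on_event_deliver` in `event_producer.py`, shown later in this diff) logs a line roughly of the form shown here, assuming application logging is surfaced in the test output:

    Event delivered to topic user_stuff; key=...; partition=0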
1 change: 1 addition & 0 deletions docs/index.rst
@@ -14,6 +14,7 @@ Contents:
   :maxdepth: 2

   readme
   how_tos/manual_testing
   getting_started
   testing
   internationalization
2 changes: 0 additions & 2 deletions edx_event_bus_kafka/__init__.py
@@ -3,5 +3,3 @@
"""

__version__ = '0.1.0'

default_app_config = 'edx_event_bus_kafka.apps.EdxEventBusKafkaConfig' # pylint: disable=invalid-name
Empty file.
221 changes: 221 additions & 0 deletions edx_event_bus_kafka/publishing/event_producer.py
@@ -0,0 +1,221 @@
"""
Produce Kafka events from signals.
Main function is ``send_to_event_bus``.
"""

import json
import logging
import warnings
from functools import lru_cache
from typing import Any, List, Optional

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from django.conf import settings
from openedx_events.event_bus.avro.serializer import AvroSignalSerializer
from openedx_events.tooling import OpenEdxPublicSignal

logger = logging.getLogger(__name__)

# CloudEvent standard name for the event type header, see
# https://github.com/cloudevents/spec/blob/v1.0.1/kafka-protocol-binding.md#325-example
EVENT_TYPE_HEADER_KEY = "ce_type"


def extract_event_key(event_data: dict, event_key_field: str) -> Any:
    """
    From an event object, extract a Kafka event key (not yet serialized).

    Arguments:
        event_data: The event data (kwargs) sent to the signal
        event_key_field: Path to the event data field to use as the event key (period-delimited
            string naming the dictionary keys to descend)

    Returns:
        Key data, which might be an integer, string, dictionary, etc.
    """
    field_path = event_key_field.split(".")
    current_data = event_data
    for field_name in field_path:
        if isinstance(current_data, dict):
            if field_name not in current_data:
                raise Exception(
                    f"Could not extract key from event; lookup in {event_key_field} "
                    f"failed at {field_name!r} in dictionary"
                )
            current_data = current_data[field_name]
        else:
            if not hasattr(current_data, field_name):
                raise Exception(
                    f"Could not extract key from event; lookup in {event_key_field} "
                    f"failed at {field_name!r} in object"
                )
            current_data = getattr(current_data, field_name)
    return current_data


def descend_avro_schema(serializer_schema: dict, field_path: List[str]) -> dict:
    """
    Extract a subfield within an Avro schema, recursively.

    Arguments:
        serializer_schema: An Avro schema (nested dictionaries)
        field_path: List of strings matching the 'name' of successively deeper subfields

    Returns:
        Schema for some field

    TODO: Move to openedx_events.event_bus.avro.serializer?
    """
    subschema = serializer_schema
    for field_name in field_path:
        try:
            # Either descend into .fields (for dictionaries) or .type.fields (for classes).
            if 'fields' not in subschema:
                # Descend through .type wrapper first
                subschema = subschema['type']
            field_list = subschema['fields']

            matching = [field for field in field_list if field['name'] == field_name]
            subschema = matching[0]
        except BaseException as e:
            raise Exception(
                f"Error traversing Avro schema along path {field_path!r}; failed at {field_name!r}."
            ) from e
    return subschema


def extract_key_schema(signal_serializer: AvroSignalSerializer, event_key_field: str) -> str:
    """
    From a signal's serializer, extract just the part of the Avro schema that will be used for the Kafka event key.

    Arguments:
        signal_serializer: The signal serializer to extract a sub-schema from
        event_key_field: Path to the event data field to use as the event key (period-delimited
            string naming the dictionary keys to descend)

    Returns:
        The key's schema, as a string.
    """
    subschema = descend_avro_schema(signal_serializer.schema, event_key_field.split("."))
    # Same as used by AvroSignalSerializer#schema_string in openedx-events
    return json.dumps(subschema, sort_keys=True)


@lru_cache
def get_serializer(signal: OpenEdxPublicSignal) -> AvroSignalSerializer:
    """
    Get the serializer for a signal.

    This is just defined to allow caching of serializers.
    """
    return AvroSignalSerializer(signal)


# TODO: Cache this, but in a way that still allows changes to settings
# via remote-config (and in particular does not result in mixed
# cache/uncached configuration).
def get_producer_for_signal(signal: OpenEdxPublicSignal, event_key_field: str) -> Optional[SerializingProducer]:
    """
    Create the producer for a signal and a key field path.

    If essential settings are missing or invalid, warn and return None.

    Arguments:
        signal: The OpenEdxPublicSignal to make a producer for
        event_key_field: Path to the event data field to use as the event key (period-delimited
            string naming the dictionary keys to descend)
    """
    if schema_registry_url := getattr(settings, 'SCHEMA_REGISTRY_URL', None):
        schema_registry_config = {
            'url': schema_registry_url,
            'basic.auth.user.info': f"{getattr(settings, 'SCHEMA_REGISTRY_API_KEY', '')}"
                                    f":{getattr(settings, 'SCHEMA_REGISTRY_API_SECRET', '')}",
        }
    else:
        warnings.warn("Cannot configure event-bus-kafka: Missing setting SCHEMA_REGISTRY_URL")
        return None

    if bootstrap_servers := getattr(settings, 'KAFKA_BOOTSTRAP_SERVERS', None):
        producer_settings = {
            'bootstrap.servers': bootstrap_servers,
        }
    else:
        warnings.warn("Cannot configure event-bus-kafka: Missing setting KAFKA_BOOTSTRAP_SERVERS")
        return None

    if getattr(settings, 'KAFKA_API_KEY', None) and getattr(settings, 'KAFKA_API_SECRET', None):
        producer_settings.update({
            'sasl.mechanism': 'PLAIN',
            'security.protocol': 'SASL_SSL',
            'sasl.username': settings.KAFKA_API_KEY,
            'sasl.password': settings.KAFKA_API_SECRET,
        })

    schema_registry_client = SchemaRegistryClient(schema_registry_config)
    signal_serializer = get_serializer(signal)

    def inner_to_dict(event_data, ctx=None):  # pylint: disable=unused-argument
        """Tells Avro how to turn objects into dictionaries."""
        return signal_serializer.to_dict(event_data)

    # Serializers for key and value components of Kafka event
    key_serializer = AvroSerializer(
        schema_str=extract_key_schema(signal_serializer, event_key_field),
        schema_registry_client=schema_registry_client,
        to_dict=inner_to_dict,
    )
    value_serializer = AvroSerializer(
        schema_str=signal_serializer.schema_string(),
        schema_registry_client=schema_registry_client,
        to_dict=inner_to_dict,
    )

    producer_settings.update({
        'key.serializer': key_serializer,
        'value.serializer': value_serializer,
    })

    return SerializingProducer(producer_settings)


def on_event_deliver(err, evt):
    """
    Simple callback method for debugging event production

    Arguments:
        err: Error if event production failed
        evt: Event that was delivered (or failed to be delivered)
    """
    if err is not None:
        logger.warning(f"Event delivery failed: {err!r}")
    else:
        # Don't log msg.value() because it may contain userids and/or emails
        logger.info(f"Event delivered to topic {evt.topic()}; key={evt.key()}; "
                    f"partition={evt.partition()}")


def send_to_event_bus(signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict) -> None:
    """
    Send a signal event to the event bus under the specified topic.

    If the Kafka settings are missing or invalid, return with a warning.

    Arguments:
        signal: The original OpenEdxPublicSignal the event was sent to
        topic: The event bus topic for the event
        event_key_field: Path to the event data field to use as the event key (period-delimited
            string naming the dictionary keys to descend)
        event_data: The event data (kwargs) sent to the signal
    """
    producer = get_producer_for_signal(signal, event_key_field)
    if producer is None:  # Note: SerializingProducer has False truthiness when len() == 0
        return

    event_key = extract_event_key(event_data, event_key_field)
    producer.produce(topic, key=event_key, value=event_data,
                     on_delivery=on_event_deliver,
                     headers={EVENT_TYPE_HEADER_KEY: signal.event_type})
    producer.poll()  # wait indefinitely for the above event to either be delivered or fail
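
As a side note on key extraction: `extract_event_key` walks the period-delimited `event_key_field` path through both dictionaries and attribute-bearing objects. A small illustrative sketch using the same data shapes as the manual-testing example (values here are made up):

    from openedx_events.learning.data import UserData, UserPersonalData
    from edx_event_bus_kafka.publishing.event_producer import extract_event_key

    event_data = {
        'user': UserData(
            id=42,
            is_active=True,
            pii=UserPersonalData(username='foobob_042', email='bob@foo.example', name='Bob Foo'),
        )
    }

    extract_event_key(event_data, 'user.id')            # dict lookup of 'user', then attribute 'id' -> 42
    extract_event_key(event_data, 'user.pii.username')  # -> 'foobob_042'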