feat: Publish generic signals to event bus #6
Merged

Commits (19, by timmc-edx):
731a2b6  feat: publish generic signals to event bus
afba483  fix: Remove deprecated config line
a8ebd28  fixup! Maybe in a working state?
eecb4bd  fixup! Include some recent improvements to cookiecutter
ac185dc  fixup! Fix indentation/params lint
d4a4551  fixup! Include schema-registry and avro extras for confluent_kafka
7dd0069  fixup! Improve coverage; stop trying to cover all failure modes in tr…
ba255dd  fixup! Call poll(); simplify settings retrieval; add manual testing doc
bd8c80d  fixup! lint (link doc; use direct attr dereference)
66ff999  fixup! Document some arguments and return values
fe0a9c8  fixup! Rename `bit` to `field_name`
3c0f8bb  fixup! Include devstack settings in manual-testing instructions
3c6f67f  fixup! Only cache serializer; soft fail on bad config
199bf9a  fixup! Add arguments (in proper style)
1b8d3ff  fixup! Rename verify_event and reformat args
80b22bb  fixup! Test soft-fail on missing settings
2aa6583  fixup! Don't soft fail just because producer is False-y
d8ef5d1  fixup! Clarify note
ec2090f  fixup! delint return
timmc-edx File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Codecov configuration:

@@ -7,6 +7,6 @@ coverage:
   patch:
     default:
       enabled: yes
-      target: 100%
+      target: 95%

 comment: false
docs/how_tos/manual_testing.rst (new file):
@@ -0,0 +1,35 @@
Manual testing
==============

The producer can be tested manually against a Kafka broker running in devstack.

#. Create a "unit test" in one of the test files that will actually call Kafka. For example, this could be added to the end of ``edx_event_bus_kafka/publishing/test_event_producer.py``::

       import random

       def test_actually_send_to_event_bus():
           signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED
           # Make events distinguishable
           id = random.randrange(1000)
           event_data = {
               'user': UserData(
                   id=id,
                   is_active=True,
                   pii=UserPersonalData(
                       username=f'foobob_{id:03}',
                       email='bob@foo.example',
                       name="Bob Foo",
                   )
               )
           }

           print(f"Sending event with random user ID {id}.")
           ep.send_to_event_bus(signal, 'user_stuff', 'user.id', event_data)

#. Make or refresh a copy of this repo where it can be seen from inside devstack: ``rsync -sax --delete ./ ../src/event-bus-kafka/``
#. In devstack, start Kafka and the control webapp: run ``make dev.up.kafka-control-center`` and watch ``make dev.logs.kafka-control-center`` until the server is up and happy (this may take a few minutes; watch for ``INFO Kafka startTimeMs``)
#. Load the control center UI at http://localhost:9021/clusters and wait for the cluster to become healthy
#. In devstack, run ``lms-up-without-deps-shell`` to bring up an arbitrary shell inside Docker networking (LMS, in this case)
#. In the LMS shell, run ``pip install -e /edx/src/event-bus-kafka`` and then run whatever test you want, e.g. ``pytest /edx/src/event-bus-kafka/edx_event_bus_kafka/publishing/test_event_producer.py::test_actually_send_to_event_bus``
#. In the control center UI, go to the topic that was created and then into its Messages tab; select offset=0 to make sure you can see messages that were sent before you had the UI open.
#. Rerun the ``rsync`` command after any edits.
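
As an alternative to the control center UI, you can tail the topic from inside the Kafka container. This is an untested sketch: the container name (``kafka``) and broker address (``localhost:9092``) are assumptions about the devstack setup and may need adjusting::

    docker exec -it kafka kafka-console-consumer \
        --bootstrap-server localhost:9092 --topic user_stuff --from-beginning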
docs/index.rst:
@@ -14,6 +14,7 @@ Contents:
    :maxdepth: 2

    readme
+   how_tos/manual_testing
    getting_started
    testing
    internationalization
New empty file.

edx_event_bus_kafka/publishing/event_producer.py (new file):

@@ -0,0 +1,177 @@
""" | ||
Produce Kafka events from signals. | ||
""" | ||
|
||
import json | ||
import logging | ||
from functools import lru_cache | ||
from typing import Any, List | ||
|
||
from confluent_kafka import SerializingProducer | ||
from confluent_kafka.schema_registry import SchemaRegistryClient | ||
from confluent_kafka.schema_registry.avro import AvroSerializer | ||
from django.conf import settings | ||
from openedx_events.event_bus.avro.serializer import AvroSignalSerializer | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
# CloudEvent standard name for the event type header, see | ||
# https://github.com/cloudevents/spec/blob/v1.0.1/kafka-protocol-binding.md#325-example | ||
EVENT_TYPE_HEADER_KEY = "ce_type" | ||
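# Example (editor's note, from the tests below): for SESSION_LOGIN_COMPLETED, the
# header sent is ce_type = "org.openedx.learning.auth.session.login.completed.v1".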


def extract_event_key(event_data: dict, event_key_field: str) -> Any:
    """
    From an event object, extract a Kafka event key (not yet serialized).

    Arguments:
        event_data: The event data sent to a signal (the kwargs dictionary)
        event_key_field: Period-delimited string naming the dictionary keys to descend to find the event key data

    Returns:
        Key data, which might be an integer, string, dictionary, etc.
    """
    field_path = event_key_field.split(".")
    current_data = event_data
    for field_name in field_path:
        if isinstance(current_data, dict):
            if field_name not in current_data:
                raise Exception(
                    f"Could not extract key from event; lookup in {event_key_field} "
                    f"failed at {field_name!r} in dictionary"
                )
            current_data = current_data[field_name]
        else:
            if not hasattr(current_data, field_name):
                raise Exception(
                    f"Could not extract key from event; lookup in {event_key_field} "
                    f"failed at {field_name!r} in object"
                )
            current_data = getattr(current_data, field_name)
    return current_data
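
# Illustration (editor's note, not part of the module): given the event_data used in
# the tests below, extract_event_key(event_data, 'user.pii.username') indexes the
# dict with 'user', then uses getattr() for 'pii' and 'username', returning 'foobob'.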


def descend_avro_schema(serializer_schema: dict, field_path: List[str]) -> dict:
    """
    Extract a subfield within an Avro schema, recursively.

    Arguments:
        serializer_schema: An Avro schema (nested dictionaries)
        field_path: List of strings matching the 'name' of successively deeper subfields

    Returns:
        Schema for some field

    TODO: Move to openedx_events.event_bus.avro.serializer?
    """

[Review comment on the TODO above] Not a blocker, but we should clarify with @robrap how we want to handle TODOs like this.

    subschema = serializer_schema
    for field_name in field_path:
        try:
            # Either descend into .fields (for dictionaries) or .type.fields (for classes).
            if 'fields' not in subschema:
                # Descend through .type wrapper first
                subschema = subschema['type']
            field_list = subschema['fields']

            matching = [field for field in field_list if field['name'] == field_name]
            subschema = matching[0]
        except BaseException as e:
            raise Exception(
                f"Error traversing Avro schema along path {field_path!r}; failed at {field_name!r}."
            ) from e
    return subschema
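
# Illustration (editor's note, mirrors test_descend_avro_schema below):
# descend_avro_schema(schema, ['user', 'pii', 'username'])
# == {"name": "username", "type": "string"}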


def extract_key_schema(signal_serializer: AvroSignalSerializer, event_key_field: str) -> str:
    """
    From a signal's serializer, extract just the part of the Avro schema that will be used for the Kafka event key.

    Arguments:
        signal_serializer: The signal serializer to extract a sub-schema from
        event_key_field: Period-delimited string naming the field 'name' keys to descend to find the key schema

    Returns:
        The key's schema, as a string.
    """
    subschema = descend_avro_schema(signal_serializer.schema, event_key_field.split("."))
    # Same as used by AvroSignalSerializer#schema_string in openedx-events
    return json.dumps(subschema, sort_keys=True)


@lru_cache
def get_producer_for_signal(signal, event_key_field):
    """
    Create the producer for a signal and a key field path.
    """
    schema_registry_config = {
        'url': getattr(settings, 'SCHEMA_REGISTRY_URL', ''),
        'basic.auth.user.info': f"{getattr(settings, 'SCHEMA_REGISTRY_API_KEY', '')}"
                                f":{getattr(settings, 'SCHEMA_REGISTRY_API_SECRET', '')}",
    }
    schema_registry_client = SchemaRegistryClient(schema_registry_config)
    signal_serializer = AvroSignalSerializer(signal)

    def inner_to_dict(event_data, ctx=None):  # pylint: disable=unused-argument
        """Tells Avro how to turn objects into dictionaries."""
        return signal_serializer.to_dict(event_data)

    # Serializers for key and value components of Kafka event
    key_serializer = AvroSerializer(
        schema_str=extract_key_schema(signal_serializer, event_key_field),
        schema_registry_client=schema_registry_client,
        to_dict=inner_to_dict,
    )
    value_serializer = AvroSerializer(
        schema_str=signal_serializer.schema_string(),
        schema_registry_client=schema_registry_client,
        to_dict=inner_to_dict,
    )

    producer_settings = {
        'bootstrap.servers': getattr(settings, 'KAFKA_BOOTSTRAP_SERVER', None),
        'key.serializer': key_serializer,
        'value.serializer': value_serializer,
    }

    if getattr(settings, 'KAFKA_API_KEY', None) and getattr(settings, 'KAFKA_API_SECRET', None):
        producer_settings.update({
            'sasl.mechanism': 'PLAIN',
            'security.protocol': 'SASL_SSL',
            'sasl.username': settings.KAFKA_API_KEY,
            'sasl.password': settings.KAFKA_API_SECRET,
        })

    return SerializingProducer(producer_settings)


def verify_event(err, evt):
    """
    Simple callback method for debugging event production

    :param err: Error if event production failed
    :param evt: Event that was delivered
    """
    if err is not None:
        logger.warning(f"Event delivery failed: {err!r}")
    else:
        # Don't log msg.value() because it may contain userids and/or emails
        logger.info(f"Event delivered to topic {evt.topic()}; key={evt.key()}; "
                    f"partition={evt.partition()}")


def send_to_event_bus(signal, topic, event_key_field, event_data):
    """
    Send a signal event to the event bus under the specified topic.

    :param signal: The original OpenEdxPublicSignal the event was sent to
    :param topic: The event bus topic for the event
    :param event_key_field: The name of the signal data field to use as the
        event key (dot-separated path of dictionary key/attribute names)
    :param event_data: The data sent to the signal
    """
    producer = get_producer_for_signal(signal, event_key_field)
    event_key = extract_event_key(event_data, event_key_field)
    producer.produce(topic, key=event_key, value=event_data,
                     on_delivery=verify_event,
                     headers={EVENT_TYPE_HEADER_KEY: signal.event_type})
    producer.poll()  # wait indefinitely for the above event to either be delivered or fail
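
For anyone trying this module locally: it reads all of its connection details from Django settings via ``getattr``. A minimal settings sketch, assuming an unauthenticated local broker and schema registry (only the setting names come from the code above; the values are placeholders):

    # Placeholder values for illustration only, not real endpoints or credentials.
    SCHEMA_REGISTRY_URL = 'http://localhost:8081'
    SCHEMA_REGISTRY_API_KEY = ''    # key and secret are joined into basic.auth.user.info
    SCHEMA_REGISTRY_API_SECRET = ''
    KAFKA_BOOTSTRAP_SERVER = 'localhost:9092'
    # Optional: if both of these are set, SASL_SSL/PLAIN authentication is enabled.
    # KAFKA_API_KEY = 'some_key'
    # KAFKA_API_SECRET = 'some_secret'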

edx_event_bus_kafka/publishing/test_event_producer.py (new file):

@@ -0,0 +1,112 @@
""" | ||
Test the event producer code. | ||
""" | ||
|
||
from unittest import TestCase | ||
from unittest.mock import MagicMock, patch | ||
|
||
import openedx_events.learning.signals | ||
import pytest | ||
from django.test import override_settings | ||
from openedx_events.event_bus.avro.serializer import AvroSignalSerializer | ||
from openedx_events.learning.data import UserData, UserPersonalData | ||
|
||
import edx_event_bus_kafka.publishing.event_producer as ep | ||
|
||
|
||
class TestEventProducer(TestCase): | ||
"""Test producer.""" | ||
|
||
def test_extract_event_key(self): | ||
event_data = { | ||
'user': UserData( | ||
id=123, | ||
is_active=True, | ||
pii=UserPersonalData( | ||
username='foobob', | ||
email='bob@foo.example', | ||
name="Bob Foo", | ||
) | ||
) | ||
} | ||
|
||
assert ep.extract_event_key(event_data, 'user.pii.username') == 'foobob' | ||
with pytest.raises(Exception, | ||
match="Could not extract key from event; lookup in xxx failed at 'xxx' in dictionary"): | ||
ep.extract_event_key(event_data, 'xxx') | ||
with pytest.raises(Exception, | ||
match="Could not extract key from event; lookup in user.xxx failed at 'xxx' in object"): | ||
ep.extract_event_key(event_data, 'user.xxx') | ||
|
||
def test_descend_avro_schema(self): | ||
signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED | ||
schema = AvroSignalSerializer(signal).schema | ||
|
||
assert ep.descend_avro_schema(schema, ['user', 'pii', 'username']) == {"name": "username", "type": "string"} | ||
|
||
with pytest.raises(Exception) as excinfo: | ||
ep.descend_avro_schema(schema, ['user', 'xxx']) | ||
assert excinfo.value.args == ("Error traversing Avro schema along path ['user', 'xxx']; failed at 'xxx'.",) | ||
assert isinstance(excinfo.value.__cause__, IndexError) | ||
|
||
def test_extract_key_schema(self): | ||
signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED | ||
schema = ep.extract_key_schema(AvroSignalSerializer(signal), 'user.pii.username') | ||
assert schema == '{"name": "username", "type": "string"}' | ||
|
||
def test_get_producer_for_signal(self): | ||
signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED | ||
with override_settings( | ||
SCHEMA_REGISTRY_URL='http://localhost:12345', | ||
SCHEMA_REGISTRY_API_KEY='some_key', | ||
SCHEMA_REGISTRY_API_SECRET='some_secret', | ||
KAFKA_BOOTSTRAP_SERVER='http://localhost:54321', | ||
# include these just to maximize code coverage | ||
KAFKA_API_KEY='some_other_key', | ||
KAFKA_API_SECRET='some_other_secret', | ||
): | ||
producer_first = ep.get_producer_for_signal(signal, 'user.id') | ||
producer_second = ep.get_producer_for_signal(signal, 'user.id') | ||
# There's not a lot we can test here, but we can at least | ||
# check that construction succeeds and that caching is | ||
# happening. | ||
assert producer_first is producer_second | ||
|
||
@patch('edx_event_bus_kafka.publishing.event_producer.logger') | ||
def test_verify_event(self, mock_logger): | ||
fake_event = MagicMock() | ||
fake_event.topic.return_value = 'some_topic' | ||
fake_event.key.return_value = 'some_key' | ||
fake_event.partition.return_value = 'some_partition' | ||
|
||
ep.verify_event(Exception("problem!"), fake_event) | ||
mock_logger.warning.assert_called_once_with("Event delivery failed: Exception('problem!')") | ||
|
||
ep.verify_event(None, fake_event) | ||
mock_logger.info.assert_called_once_with( | ||
'Event delivered to topic some_topic; key=some_key; partition=some_partition' | ||
) | ||
|
||
def test_send_to_event_bus(self): | ||
signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED | ||
event_data = { | ||
'user': UserData( | ||
id=123, | ||
is_active=True, | ||
pii=UserPersonalData( | ||
username='foobob', | ||
email='bob@foo.example', | ||
name="Bob Foo", | ||
) | ||
) | ||
} | ||
|
||
mock_producer = MagicMock() | ||
with patch('edx_event_bus_kafka.publishing.event_producer.get_producer_for_signal', return_value=mock_producer): | ||
ep.send_to_event_bus(signal, 'user_stuff', 'user.id', event_data) | ||
|
||
mock_producer.produce.assert_called_once_with( | ||
'user_stuff', key=123, value=event_data, | ||
on_delivery=ep.verify_event, | ||
headers={'ce_type': 'org.openedx.learning.auth.session.login.completed.v1'}, | ||
) |
Review comments:

[not blocking] Should we update this to point to...whatever documentation we have around how to use edx-platform plugins in general?

Reply: I couldn't find anything, but I didn't look super hard. I think it would be worth investigating instead whether we can call a test cluster more directly, so the test doesn't have to be run inside of devstack.