
feat: Publish generic signals to event bus #6

Merged · Jul 18, 2022 · 19 commits (changes shown from 11 commits)
5 changes: 4 additions & 1 deletion .gitignore
@@ -31,7 +31,7 @@ pip-log.txt
.tox
coverage.xml
htmlcov/

/pii_report/


# The Silver Searcher
@@ -59,3 +59,6 @@ docs/edx_event_bus_kafka.*.rst
# Private requirements
requirements/private.in
requirements/private.txt

# IDE
.idea
9 changes: 7 additions & 2 deletions Makefile
@@ -30,13 +30,16 @@ docs: ## generate Sphinx HTML documentation, including API docs
	$(BROWSER)docs/_build/html/index.html

# Define PIP_COMPILE_OPTS=-v to get more information during make upgrade.
PIP_COMPILE = pip-compile --rebuild --upgrade $(PIP_COMPILE_OPTS)
PIP_COMPILE = pip-compile --upgrade $(PIP_COMPILE_OPTS)

upgrade: export CUSTOM_COMPILE_COMMAND=make upgrade
upgrade: ## update the requirements/*.txt files with the latest packages satisfying requirements/*.in
	pip install -qr requirements/pip-tools.txt
	# Make sure to compile files after any other files they include!
	$(PIP_COMPILE) --allow-unsafe -o requirements/pip.txt requirements/pip.in
	$(PIP_COMPILE) -o requirements/pip-tools.txt requirements/pip-tools.in
	pip install -qr requirements/pip.txt
	pip install -qr requirements/pip-tools.txt
	$(PIP_COMPILE) -o requirements/base.txt requirements/base.in
	$(PIP_COMPILE) -o requirements/test.txt requirements/test.in
	$(PIP_COMPILE) -o requirements/doc.txt requirements/doc.in
@@ -63,7 +66,9 @@ test: clean ## run tests in the current virtualenv
diff_cover: test ## find diff lines that need test coverage
	diff-cover coverage.xml

test-all: quality pii_check ## run tests on every supported Python/Django combination
test-all: clean quality pii_check ## run tests on every supported Python/Django combination
	tox -e docs
	rm -rf build # artifact produced by docs run, interferes with pytest
	tox

validate: quality pii_check test ## run tests and quality checks
2 changes: 1 addition & 1 deletion codecov.yml
@@ -7,6 +7,6 @@ coverage:
  patch:
    default:
      enabled: yes
      target: 100%
      target: 95%

comment: false
35 changes: 35 additions & 0 deletions docs/how_tos/manual_testing.rst
@@ -0,0 +1,35 @@
Manual testing
==============

The producer can be tested manually against a Kafka running in devstack.

#. Create a "unit test" in one of the test files that will actually call Kafka. For example, this could be added to the end of ``edx_event_bus_kafka/publishing/test_event_producer.py``::

     import random

     def test_actually_send_to_event_bus():
         signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED
         # Make events distinguishable
         id = random.randrange(1000)
         event_data = {
             'user': UserData(
                 id=id,
                 is_active=True,
                 pii=UserPersonalData(
                     username=f'foobob_{id:03}',
                     email='bob@foo.example',
                     name="Bob Foo",
                 )
             )
         }

         print(f"Sending event with random user ID {id}.")
         ep.send_to_event_bus(signal, 'user_stuff', 'user.id', event_data)

#. Make or refresh a copy of this repo where it can be seen from inside devstack: ``rsync -sax --delete ./ ../src/event-bus-kafka/``
Contributor: [not blocking] Should we update this to point to...whatever documentation we have around how to use edx-platform plugins in general?

Contributor (author): I couldn't find anything, but I didn't look super hard. But I think it would be worth investigating instead if we can call a test cluster more directly, not requiring the test to be run inside of devstack.

#. In devstack, start Kafka and the control webapp: ``make dev.up.kafka-control-center`` and watch ``make dev.logs.kafka-control-center`` until the server is up and happy (may take a few minutes; watch for ``INFO Kafka startTimeMs``)
#. Load the control center UI: http://localhost:9021/clusters and wait for the cluster to become healthy
#. In devstack, run ``lms-up-without-deps-shell`` to bring up an arbitrary shell inside Docker networking (LMS, in this case)
#. In the LMS shell, run ``pip install -e /edx/src/event-bus-kafka`` and then run whatever test you want, e.g. ``pytest /edx/src/event-bus-kafka/edx_event_bus_kafka/publishing/test_event_producer.py::test_actually_send_to_event_bus``
#. Go to the topic that was created and then into the Messages tab; select offset=0 to make sure you can see messages that were sent before you had the UI open.
#. Rerun ``rsync`` after any edits
1 change: 1 addition & 0 deletions docs/index.rst
@@ -14,6 +14,7 @@ Contents:
   :maxdepth: 2

   readme
   how_tos/manual_testing
   getting_started
   testing
   internationalization
2 changes: 0 additions & 2 deletions edx_event_bus_kafka/__init__.py
@@ -3,5 +3,3 @@
"""

__version__ = '0.1.0'

default_app_config = 'edx_event_bus_kafka.apps.EdxEventBusKafkaConfig' # pylint: disable=invalid-name
Empty file.
177 changes: 177 additions & 0 deletions edx_event_bus_kafka/publishing/event_producer.py
@@ -0,0 +1,177 @@
"""
Produce Kafka events from signals.
"""

import json
import logging
from functools import lru_cache
from typing import Any, List

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
from django.conf import settings
from openedx_events.event_bus.avro.serializer import AvroSignalSerializer

logger = logging.getLogger(__name__)

# CloudEvent standard name for the event type header, see
# https://github.com/cloudevents/spec/blob/v1.0.1/kafka-protocol-binding.md#325-example
EVENT_TYPE_HEADER_KEY = "ce_type"


def extract_event_key(event_data: dict, event_key_field: str) -> Any:
"""
From an event object, extract a Kafka event key (not yet serialized).
timmc-edx marked this conversation as resolved.
Show resolved Hide resolved

Arguments:
event_data: The event data sent to a signal (the kwargs dictionary)
event_key_field: Period-delimited string naming the dictionary keys to descend to find the event key data

Returns:
Key data, which might be an integer, string, dictionary, etc.
"""
field_path = event_key_field.split(".")
current_data = event_data
for field_name in field_path:
if isinstance(current_data, dict):
if field_name not in current_data:
raise Exception(
f"Could not extract key from event; lookup in {event_key_field} "
f"failed at {field_name!r} in dictionary"
)
current_data = current_data[field_name]
else:
if not hasattr(current_data, field_name):
raise Exception(
f"Could not extract key from event; lookup in {event_key_field} "
f"failed at {field_name!r} in object"
)
current_data = getattr(current_data, field_name)
return current_data
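
# Illustrative usage sketch (not part of the module above): how the dotted
# event_key_field path mixes dictionary keys and attrs-object attributes.
# The data classes come from openedx_events.learning.data; values are examples
# taken from the unit tests further down this PR.
#
#     event_data = {
#         'user': UserData(
#             id=123, is_active=True,
#             pii=UserPersonalData(username='foobob', email='bob@foo.example', name='Bob Foo'),
#         )
#     }
#     extract_event_key(event_data, 'user.pii.username')  # -> 'foobob'
#     extract_event_key(event_data, 'user.id')            # -> 123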


def descend_avro_schema(serializer_schema: dict, field_path: List[str]) -> dict:
    """
    Extract a subfield within an Avro schema, recursively.

    Arguments:
        serializer_schema: An Avro schema (nested dictionaries)
        field_path: List of strings matching the 'name' of successively deeper subfields

    Returns:
        Schema for some field

    TODO: Move to openedx_events.event_bus.avro.serializer?
    """

Contributor: Not a blocker, but we should clarify with @robrap how we want to handle TODOs like this.

    subschema = serializer_schema
    for field_name in field_path:
        try:
            # Either descend into .fields (for dictionaries) or .type.fields (for classes).
            if 'fields' not in subschema:
                # Descend through .type wrapper first
                subschema = subschema['type']
            field_list = subschema['fields']

            matching = [field for field in field_list if field['name'] == field_name]
            subschema = matching[0]
        except BaseException as e:
            raise Exception(
                f"Error traversing Avro schema along path {field_path!r}; failed at {field_name!r}."
            ) from e
    return subschema


def extract_key_schema(signal_serializer: AvroSignalSerializer, event_key_field: str) -> str:
    """
    From a signal's serializer, extract just the part of the Avro schema that will be used for the Kafka event key.

    Arguments:
        signal_serializer: The signal serializer to extract a sub-schema from
        event_key_field: Period-delimited string naming the field 'name' keys to descend to find the key schema

    Returns:
        The key's schema, as a string.
    """
    subschema = descend_avro_schema(signal_serializer.schema, event_key_field.split("."))
    # Same as used by AvroSignalSerializer#schema_string in openedx-events
    return json.dumps(subschema, sort_keys=True)
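
# Illustrative sketch (not part of the module above): with the
# SESSION_LOGIN_COMPLETED signal from openedx_events.learning.signals, the
# sub-schema extracted for use as the Kafka key looks like this (mirrors the
# unit test further down this PR):
#
#     signal_serializer = AvroSignalSerializer(SESSION_LOGIN_COMPLETED)
#     extract_key_schema(signal_serializer, 'user.pii.username')
#     # -> '{"name": "username", "type": "string"}'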


@lru_cache
def get_producer_for_signal(signal, event_key_field):
    """
    Create the producer for a signal and a key field path.
    """
    schema_registry_config = {
        'url': getattr(settings, 'SCHEMA_REGISTRY_URL', ''),
        'basic.auth.user.info': f"{getattr(settings, 'SCHEMA_REGISTRY_API_KEY', '')}"
                                f":{getattr(settings, 'SCHEMA_REGISTRY_API_SECRET', '')}",
    }
    schema_registry_client = SchemaRegistryClient(schema_registry_config)
    signal_serializer = AvroSignalSerializer(signal)

    def inner_to_dict(event_data, ctx=None):  # pylint: disable=unused-argument
        """Tells Avro how to turn objects into dictionaries."""
        return signal_serializer.to_dict(event_data)

    # Serializers for key and value components of Kafka event
    key_serializer = AvroSerializer(
        schema_str=extract_key_schema(signal_serializer, event_key_field),
        schema_registry_client=schema_registry_client,
        to_dict=inner_to_dict,
    )
    value_serializer = AvroSerializer(
        schema_str=signal_serializer.schema_string(),
        schema_registry_client=schema_registry_client,
        to_dict=inner_to_dict,
    )

    producer_settings = {
        'bootstrap.servers': getattr(settings, 'KAFKA_BOOTSTRAP_SERVER', None),
        'key.serializer': key_serializer,
        'value.serializer': value_serializer,
    }

    if getattr(settings, 'KAFKA_API_KEY', None) and getattr(settings, 'KAFKA_API_SECRET', None):
        producer_settings.update({
            'sasl.mechanism': 'PLAIN',
            'security.protocol': 'SASL_SSL',
            'sasl.username': settings.KAFKA_API_KEY,
            'sasl.password': settings.KAFKA_API_SECRET,
        })

    return SerializingProducer(producer_settings)
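
# Illustrative Django settings sketch (not part of this PR; the values are
# placeholders, not real endpoints). These are the setting names the producer
# reads above; the Kafka API key/secret pair is optional and only enables SASL:
#
#     SCHEMA_REGISTRY_URL = 'http://localhost:8081'
#     SCHEMA_REGISTRY_API_KEY = 'some_key'
#     SCHEMA_REGISTRY_API_SECRET = 'some_secret'
#     KAFKA_BOOTSTRAP_SERVER = 'localhost:9092'
#     KAFKA_API_KEY = 'some_other_key'        # optional
#     KAFKA_API_SECRET = 'some_other_secret'  # optional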


def verify_event(err, evt):
    """
    Simple callback method for debugging event production

    :param err: Error if event production failed
    :param evt: Event that was delivered
    """
    if err is not None:
        logger.warning(f"Event delivery failed: {err!r}")
    else:
        # Don't log msg.value() because it may contain userids and/or emails
        logger.info(f"Event delivered to topic {evt.topic()}; key={evt.key()}; "
                    f"partition={evt.partition()}")


def send_to_event_bus(signal, topic, event_key_field, event_data):
    """
    Send a signal event to the event bus under the specified topic.

    :param signal: The original OpenEdxPublicSignal the event was sent to
    :param topic: The event bus topic for the event
    :param event_key_field: The name of the signal data field to use as the
        event key (dot-separated path of dictionary key/attribute names)
    :param event_data: The data sent to the signal
    """
    producer = get_producer_for_signal(signal, event_key_field)
    event_key = extract_event_key(event_data, event_key_field)
    producer.produce(topic, key=event_key, value=event_data,
                     on_delivery=verify_event,
                     headers={EVENT_TYPE_HEADER_KEY: signal.event_type})
    producer.poll()  # wait indefinitely for the above event to either be delivered or fail
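
For orientation, a minimal caller-side sketch (not part of this PR) of how a service might publish a login event through this module, assuming the schema-registry and Kafka settings sketched above are configured and that a ``user_stuff`` topic name is acceptable; the signal and data classes come from openedx-events::

    from openedx_events.learning.data import UserData, UserPersonalData
    from openedx_events.learning.signals import SESSION_LOGIN_COMPLETED

    from edx_event_bus_kafka.publishing.event_producer import send_to_event_bus

    # Build the event payload in the shape the signal expects.
    event_data = {
        'user': UserData(
            id=123,
            is_active=True,
            pii=UserPersonalData(username='foobob', email='bob@foo.example', name='Bob Foo'),
        )
    }

    # Publish to the example 'user_stuff' topic, keyed by the user's id.
    send_to_event_bus(SESSION_LOGIN_COMPLETED, 'user_stuff', 'user.id', event_data)
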
112 changes: 112 additions & 0 deletions edx_event_bus_kafka/publishing/test_event_producer.py
@@ -0,0 +1,112 @@
"""
Test the event producer code.
"""

from unittest import TestCase
from unittest.mock import MagicMock, patch

import openedx_events.learning.signals
import pytest
from django.test import override_settings
from openedx_events.event_bus.avro.serializer import AvroSignalSerializer
from openedx_events.learning.data import UserData, UserPersonalData

import edx_event_bus_kafka.publishing.event_producer as ep


class TestEventProducer(TestCase):
    """Test producer."""

    def test_extract_event_key(self):
        event_data = {
            'user': UserData(
                id=123,
                is_active=True,
                pii=UserPersonalData(
                    username='foobob',
                    email='bob@foo.example',
                    name="Bob Foo",
                )
            )
        }

        assert ep.extract_event_key(event_data, 'user.pii.username') == 'foobob'
        with pytest.raises(Exception,
                           match="Could not extract key from event; lookup in xxx failed at 'xxx' in dictionary"):
            ep.extract_event_key(event_data, 'xxx')
        with pytest.raises(Exception,
                           match="Could not extract key from event; lookup in user.xxx failed at 'xxx' in object"):
            ep.extract_event_key(event_data, 'user.xxx')

    def test_descend_avro_schema(self):
        signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED
        schema = AvroSignalSerializer(signal).schema

        assert ep.descend_avro_schema(schema, ['user', 'pii', 'username']) == {"name": "username", "type": "string"}

        with pytest.raises(Exception) as excinfo:
            ep.descend_avro_schema(schema, ['user', 'xxx'])
        assert excinfo.value.args == ("Error traversing Avro schema along path ['user', 'xxx']; failed at 'xxx'.",)
        assert isinstance(excinfo.value.__cause__, IndexError)

    def test_extract_key_schema(self):
        signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED
        schema = ep.extract_key_schema(AvroSignalSerializer(signal), 'user.pii.username')
        assert schema == '{"name": "username", "type": "string"}'

    def test_get_producer_for_signal(self):
        signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED
        with override_settings(
            SCHEMA_REGISTRY_URL='http://localhost:12345',
            SCHEMA_REGISTRY_API_KEY='some_key',
            SCHEMA_REGISTRY_API_SECRET='some_secret',
            KAFKA_BOOTSTRAP_SERVER='http://localhost:54321',
            # include these just to maximize code coverage
            KAFKA_API_KEY='some_other_key',
            KAFKA_API_SECRET='some_other_secret',
        ):
            producer_first = ep.get_producer_for_signal(signal, 'user.id')
            producer_second = ep.get_producer_for_signal(signal, 'user.id')
            # There's not a lot we can test here, but we can at least
            # check that construction succeeds and that caching is
            # happening.
            assert producer_first is producer_second

    @patch('edx_event_bus_kafka.publishing.event_producer.logger')
    def test_verify_event(self, mock_logger):
        fake_event = MagicMock()
        fake_event.topic.return_value = 'some_topic'
        fake_event.key.return_value = 'some_key'
        fake_event.partition.return_value = 'some_partition'

        ep.verify_event(Exception("problem!"), fake_event)
        mock_logger.warning.assert_called_once_with("Event delivery failed: Exception('problem!')")

        ep.verify_event(None, fake_event)
        mock_logger.info.assert_called_once_with(
            'Event delivered to topic some_topic; key=some_key; partition=some_partition'
        )

    def test_send_to_event_bus(self):
        signal = openedx_events.learning.signals.SESSION_LOGIN_COMPLETED
        event_data = {
            'user': UserData(
                id=123,
                is_active=True,
                pii=UserPersonalData(
                    username='foobob',
                    email='bob@foo.example',
                    name="Bob Foo",
                )
            )
        }

        mock_producer = MagicMock()
        with patch('edx_event_bus_kafka.publishing.event_producer.get_producer_for_signal', return_value=mock_producer):
            ep.send_to_event_bus(signal, 'user_stuff', 'user.id', event_data)

        mock_producer.produce.assert_called_once_with(
            'user_stuff', key=123, value=event_data,
            on_delivery=ep.verify_event,
            headers={'ce_type': 'org.openedx.learning.auth.session.login.completed.v1'},
        )