Skip to content

Commit

Permalink
feat: determine signals from message headers
Browse files Browse the repository at this point in the history
  • Loading branch information
Rebecca Graber committed May 5, 2023
1 parent 6712aa7 commit e36dc45
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 76 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,14 @@ Change Log
Unreleased
**********

[3.10.0] - 2023-05-05
*********************
Changed
=======
* Switch from ``edx-sphinx-theme`` to ``sphinx-book-theme`` since the former is
deprecated
* Refactored consumer to manually deserialize messages instead of using DeserializingConsumer
* Make signal argument optional in consumer command (take signal from message headers)

[3.9.6] - 2023-02-24
********************
Expand Down
2 changes: 1 addition & 1 deletion edx_event_bus_kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
from edx_event_bus_kafka.internal.consumer import KafkaEventConsumer
from edx_event_bus_kafka.internal.producer import KafkaEventProducer, create_producer

__version__ = '3.9.6'
__version__ = '3.10.0'
128 changes: 82 additions & 46 deletions edx_event_bus_kafka/internal/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,21 +103,20 @@ def _reconnect_to_db_if_needed():

class KafkaEventConsumer:
"""
Construct consumer for the given topic, group, and signal. The consumer can then
emit events from the event bus using the configured signal.
Construct consumer for the given topic and group. The consumer can then
emit events from the event bus using the signal from the message headers.
Note that the topic should be specified here *without* the optional environment prefix.
Can also consume messages indefinitely off the queue.
"""

def __init__(self, topic, group_id, signal):
def __init__(self, topic, group_id):
if confluent_kafka is None: # pragma: no cover
raise Exception('Library confluent-kafka not available. Cannot create event consumer.')

self.topic = topic
self.group_id = group_id
self.signal = signal
self.consumer = self._create_consumer()
self._shut_down_loop = False
self.schema_registry_client = get_schema_registry_client()
Expand Down Expand Up @@ -240,7 +239,6 @@ def _consume_indefinitely(self):
run_context = {
'full_topic': full_topic,
'consumer_group': self.group_id,
'expected_signal': self.signal,
}
self.consumer.subscribe([full_topic])
logger.info(f"Running consumer for {run_context!r}")
Expand Down Expand Up @@ -268,8 +266,9 @@ def _consume_indefinitely(self):
with function_trace('_consume_indefinitely_consume_single_message'):
# Before processing, make sure our db connection is still active
_reconnect_to_db_if_needed()
msg.set_value(self._deserialize_message_value(msg))
self.emit_signals_from_message(msg)
signal = self.determine_signal(msg)
msg.set_value(self._deserialize_message_value(msg, signal))
self.emit_signals_from_message(msg, signal)
consecutive_errors = 0

self._add_message_monitoring(run_context=run_context, message=msg)
Expand Down Expand Up @@ -317,54 +316,42 @@ def consume_indefinitely(self, offset_timestamp=None):
self.reset_offsets_and_sleep_indefinitely(offset_timestamp)

@function_trace('emit_signals_from_message')
def emit_signals_from_message(self, msg):
def emit_signals_from_message(self, msg, signal):
"""
Determine the correct signal and send the event from the message.
Send the event from the message via the given signal.
Assumes the message has been deserialized and the signal matches the event_type of the message header.
Arguments:
msg (Message): Consumed message.
msg (Message): Deserialized message.
signal (OpenEdxPublicSignal): Signal - must match the event_type of the message header.
"""
self._log_message_received(msg)

# DeserializingConsumer.poll() always returns either a valid message
# or None, and raises an exception in all other cases. This means
# we don't need to check msg.error() ourselves. But... check it here
# anyway for robustness against code changes.
if msg.error() is not None:
raise UnusableMessageError(
f"Polled message had error object (shouldn't happen): {msg.error()!r}"
f"Polled message had error object: {msg.error()!r}"
)

headers = msg.headers() or [] # treat None as []

event_types = get_message_header_values(headers, HEADER_EVENT_TYPE)
if len(event_types) == 0:
raise UnusableMessageError(
"Missing ce_type header on message, cannot determine signal"
)
if len(event_types) > 1:
raise UnusableMessageError(
"Multiple ce_type headers found on message, cannot determine signal"
)
event_type = event_types[0]
# This should also never happen since the signal should be determined from the message
# but it's here to prevent misuse of the method
msg_event_type = self._get_event_type_from_message(msg)
if signal.event_type != msg_event_type:
raise Exception(f"Error emitting event from Kafka: (UNEXPECTED) message event type {msg_event_type}"
f" does not match signal {signal.event_type}")

if event_type != self.signal.event_type:
raise UnusableMessageError(
f"Signal types do not match. Expected {self.signal.event_type}. "
f"Received message of type {event_type}."
)
try:
event_metadata = _get_metadata_from_headers(headers)
event_metadata = _get_metadata_from_headers(msg.headers())
except Exception as e:
raise UnusableMessageError(f"Error determining metadata from message headers: {e}") from e

with function_trace('emit_signals_from_message_send_event_with_custom_metadata'):
send_results = self.signal.send_event_with_custom_metadata(event_metadata, **msg.value())
send_results = signal.send_event_with_custom_metadata(event_metadata, **msg.value())

# Raise an exception if any receivers errored out. This allows logging of the receivers
# along with partition, offset, etc. in record_event_consuming_error. Hopefully the
# receiver code is idempotent and we can just replay any messages that were involved.
self._check_receiver_results(send_results)
self._check_receiver_results(send_results, signal)

# At the very end, log that a message was processed successfully.
# Since we're single-threaded, no other information is needed;
Expand All @@ -373,21 +360,69 @@ def emit_signals_from_message(self, msg):
if AUDIT_LOGGING_ENABLED.is_enabled():
logger.info('Message from Kafka processed successfully')

def _deserialize_message_value(self, msg):
def determine_signal(self, msg) -> OpenEdxPublicSignal:
"""
Determine which OpenEdxPublicSignal should be used to emit the event data in a message
Arguments:
msg (Message): Consumed message
Returns:
The OpenEdxPublicSignal instance corresponding to the ce_type header on the message
"""
event_type = self._get_event_type_from_message(msg)
try:
return OpenEdxPublicSignal.get_signal_by_type(event_type)
except KeyError as ke:
raise UnusableMessageError(
f"Unrecognized type {event_type} found on message, cannot determine signal"
) from ke

def _get_event_type_from_message(self, msg):
"""
Return the event type from the ce_type header
Arguments:
msg (Message): the consumed message
Returns
The associated event type as a string
"""
headers = msg.headers() or [] # treat None as []
event_types = get_message_header_values(headers, HEADER_EVENT_TYPE)
if len(event_types) == 0:
raise UnusableMessageError(
"Missing ce_type header on message, cannot determine signal"
)
if len(event_types) > 1:
raise UnusableMessageError(
"Multiple ce_type headers found on message, cannot determine signal"
)
return event_types[0]

def _deserialize_message_value(self, msg, signal: OpenEdxPublicSignal):
"""
Deserialize an Avro message value
The signal is expected to match the ce_type header on the message
Arguments:
msg (Message): the raw message from the consumer
signal (OpenEdxPublicSignal): The instance of OpenEdxPublicSignal corresponding to the ce_type header on msg
Returns:
The deserialized message value
"""
signal_deserializer = get_deserializer(self.signal, self.schema_registry_client)
msg_event_type = self._get_event_type_from_message(msg)
if signal.event_type != msg_event_type:
# This should never happen but it's here to prevent misuse of the method
raise Exception(f"Error deserializing event from Kafka: (UNEXPECTED) message event type {msg_event_type}"
f" does not match signal {signal.event_type}")
signal_deserializer = get_deserializer(signal, self.schema_registry_client)
ctx = SerializationContext(msg.topic(), MessageField.VALUE, msg.headers())
return signal_deserializer(msg.value(), ctx)

def _check_receiver_results(self, send_results: list):
def _check_receiver_results(self, send_results: list, signal: OpenEdxPublicSignal):
"""
Raises exception if any of the receivers produced an exception.
Expand Down Expand Up @@ -415,7 +450,7 @@ def _check_receiver_results(self, send_results: list):
raise ReceiverError(
f"{len(error_descriptions)} receiver(s) out of {len(send_results)} "
"produced errors (stack trace elsewhere in logs) "
f"when handling signal {self.signal}: {', '.join(error_descriptions)}",
f"when handling signal {signal}: {', '.join(error_descriptions)}",
errors
)

Expand Down Expand Up @@ -582,12 +617,11 @@ class ConsumeEventsCommand(BaseCommand):
Management command for Kafka consumer workers in the event bus.
"""
help = """
Consume messages of specified signal type from a Kafka topic and send their data to that signal.
Consume messages from a Kafka topic and send their data to the correct signal.
Example::
python3 manage.py cms consume_events -t user-login -g user-activity-service \
-s org.openedx.learning.auth.session.login.completed.v1
python3 manage.py cms consume_events -t user-login -g user-activity-service
"""

def add_arguments(self, parser):
Expand All @@ -605,12 +639,16 @@ def add_arguments(self, parser):
required=True,
help='Consumer group id'
)

# TODO: remove this once callers have been updated. Left optional to avoid the need for lockstep changes
parser.add_argument(
'-s', '--signal',
nargs=1,
required=True,
help='Type of signal to emit from consumed messages.'
required=False,
default=None,
help='Deprecated argument. Correct signal will be determined from event'
)

parser.add_argument(
'-o', '--offset_time',
nargs=1,
Expand All @@ -634,7 +672,6 @@ def handle(self, *args, **options):

try:
load_all_signals()
signal = OpenEdxPublicSignal.get_signal_by_type(options['signal'][0])
if options['offset_time'] and options['offset_time'][0] is not None:
try:
offset_timestamp = datetime.fromisoformat(options['offset_time'][0])
Expand All @@ -647,7 +684,6 @@ def handle(self, *args, **options):
event_consumer = KafkaEventConsumer(
topic=options['topic'][0],
group_id=options['group_id'][0],
signal=signal,
)
if offset_timestamp is None:
event_consumer.consume_indefinitely()
Expand Down
Loading

0 comments on commit e36dc45

Please sign in to comment.