diff --git a/edx_event_bus_kafka/config.py b/edx_event_bus_kafka/config.py
new file mode 100644
index 00000000..dfe128fc
--- /dev/null
+++ b/edx_event_bus_kafka/config.py
@@ -0,0 +1,61 @@
+"""
+Configuration loading and validation.
+"""
+
+import warnings
+from typing import Optional
+
+import attr
+from confluent_kafka.schema_registry import SchemaRegistryClient
+from django.conf import settings
+
+
+@attr.s
+class CommonConfig:
+    settings = attr.ib(type=dict)
+
+
+def create_schema_registry_client() -> Optional[SchemaRegistryClient]:
+    """
+    Create a schema registry client from common settings.
+    """
+    url = getattr(settings, 'EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL', None)
+    if url is None:
+        warnings.warn("Cannot configure event-bus-kafka: Missing setting EVENT_BUS_KAFKA_SCHEMA_REGISTRY_URL")
+        return None
+
+    key = getattr(settings, 'EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_KEY', '')
+    secret = getattr(settings, 'EVENT_BUS_KAFKA_SCHEMA_REGISTRY_API_SECRET', '')
+
+    return SchemaRegistryClient({
+        'url': url,
+        'basic.auth.user.info': f"{key}:{secret}",
+    })
+
+
+def load_common_settings() -> Optional[dict]:
+    """
+    Load common settings, a base for either producer or consumer configuration.
+    """
+    bootstrap_servers = getattr(settings, 'EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS', None)
+    if bootstrap_servers is None:
+        warnings.warn("Cannot configure event-bus-kafka: Missing setting EVENT_BUS_KAFKA_BOOTSTRAP_SERVERS")
+        return None
+
+    # Named to avoid shadowing the django.conf.settings import above.
+    base_settings = {
+        'bootstrap.servers': bootstrap_servers,
+    }
+
+    key = getattr(settings, 'EVENT_BUS_KAFKA_API_KEY', None)
+    secret = getattr(settings, 'EVENT_BUS_KAFKA_API_SECRET', None)
+
+    if key and secret:
+        base_settings.update({
+            'sasl.mechanism': 'PLAIN',
+            'security.protocol': 'SASL_SSL',
+            'sasl.username': key,
+            'sasl.password': secret,
+        })
+
+    return base_settings
diff --git a/edx_event_bus_kafka/management/commands/produce_event.py b/edx_event_bus_kafka/management/commands/produce_event.py
index 5d0cd16c..4afd3b2d 100644
--- a/edx_event_bus_kafka/management/commands/produce_event.py
+++ b/edx_event_bus_kafka/management/commands/produce_event.py
@@ -59,6 +59,7 @@ def handle(self, *args, **options):
                 topic=options['topic'][0],
                 event_key_field=options['key_field'][0],
                 event_data=json.loads(options['data'][0]),
+                sync=True,  # otherwise command may exit before delivery is complete
             )
         except Exception:  # pylint: disable=broad-except
             logger.exception("Error producing Kafka event")
diff --git a/edx_event_bus_kafka/publishing/event_producer.py b/edx_event_bus_kafka/publishing/event_producer.py
index 285cab43..52c4c6db 100644
--- a/edx_event_bus_kafka/publishing/event_producer.py
+++ b/edx_event_bus_kafka/publishing/event_producer.py
@@ -203,7 +203,10 @@ def on_event_deliver(err, evt):
                 f"partition={evt.partition()}")
 
 
-def send_to_event_bus(signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict) -> None:
+def send_to_event_bus(
+        signal: OpenEdxPublicSignal, topic: str, event_key_field: str, event_data: dict,
+        sync: bool = False,
+) -> None:
     """
     Send a signal event to the event bus under the specified topic.
 
@@ -215,6 +218,8 @@ def send_to_event_bus(signal: OpenEdxPublicSignal, topic: str, event_key_field:
         event_key_field: Path to the event data field to use as the event key
             (period-delimited string naming the dictionary keys to descend)
         event_data: The event data (kwargs) sent to the signal
+        sync: Whether to wait indefinitely for event to be received by the message bus (probably
+            only want to use this for testing)
     """
     producer = get_producer_for_signal(signal, event_key_field)
     if producer is None:  # Note: SerializingProducer has False truthiness when len() == 0
@@ -224,6 +229,18 @@ def send_to_event_bus(signal: OpenEdxPublicSignal, topic: str, event_key_field:
     producer.produce(topic, key=event_key, value=event_data,
                      on_delivery=on_event_deliver,
                      headers={EVENT_TYPE_HEADER_KEY: signal.event_type})
-    # TODO (EventBus): Investigate poll() vs. flush(), and other related settings:
-    # See https://github.com/openedx/event-bus-kafka/issues/10
-    producer.poll()  # wait indefinitely for the above event to either be delivered or fail
+
+    if sync:
+        # Wait for all buffered events to send, then wait for all of
+        # them to be acknowledged, and trigger all callbacks.
+        producer.flush(-1)
+    else:
+        # Opportunistically ensure any pending callbacks from recent events are triggered.
+        #
+        # This assumes events come regularly, or that we're not concerned about
+        # high latency between delivery and callback. If those assumptions are
+        # false, we should switch to calling poll(1.0) or similar in a loop on
+        # a separate thread.
+        #
+        # Docs: https://github.com/edenhill/librdkafka/blob/4faeb8132521da70b6bcde14423a14eb7ed5c55e/src/rdkafka.h#L3079
+        producer.poll(0)