From 7d05a47def9c8f84c7b558cead88da9fa90d9552 Mon Sep 17 00:00:00 2001 From: Success Moses Date: Sun, 8 Dec 2024 23:17:22 +0100 Subject: [PATCH] FIX add error_cb to `confluent.Consumer` config in `ConsumerFromTopic` (#44307) --- .../connections/kafka.rst | 6 +++-- .../hooks.rst | 14 +++++++++++ .../providers/apache/kafka/hooks/consume.py | 23 +++++++++++++++++-- 3 files changed, 39 insertions(+), 4 deletions(-) diff --git a/docs/apache-airflow-providers-apache-kafka/connections/kafka.rst b/docs/apache-airflow-providers-apache-kafka/connections/kafka.rst index 3e570506b647d..c1803f0b21c71 100644 --- a/docs/apache-airflow-providers-apache-kafka/connections/kafka.rst +++ b/docs/apache-airflow-providers-apache-kafka/connections/kafka.rst @@ -35,8 +35,10 @@ Kafka hooks and operators use ``kafka_default`` by default, this connection is v Configuring the Connection -------------------------- -Connections are configured as a json serializable string provided to the ``extra`` field. A full list of parameters -are described in the `Confluent Kafka python library `_. +Connections are configured as a json serializable string provided to the ``extra`` field. The ``error_cb`` parameter can be +used to specify a callback function by providing a path to the function. e.g ``"module.callback_func"``. A full list +of parameters are described in the +`Confluent Kafka python library `_. If you are defining the Airflow connection from the Airflow UI, the ``extra`` field will be renamed to ``Config Dict``. diff --git a/docs/apache-airflow-providers-apache-kafka/hooks.rst b/docs/apache-airflow-providers-apache-kafka/hooks.rst index 7737d4fdb550f..6d80d30ad4b48 100644 --- a/docs/apache-airflow-providers-apache-kafka/hooks.rst +++ b/docs/apache-airflow-providers-apache-kafka/hooks.rst @@ -42,6 +42,20 @@ Reference For further information, look at `Apache Kafka Admin config documentation `_. +.. _howto/hook:KafkaAuthenticationError: + +KafkaAuthenticationError +------------------------ + +Custom exception for Kafka authentication failures. + +Reference +""""""""" + +For further information, look at +`Confluent Kafka Documentation `_. + + .. _howto/hook:KafkaConsumerHook: KafkaConsumerHook diff --git a/providers/src/airflow/providers/apache/kafka/hooks/consume.py b/providers/src/airflow/providers/apache/kafka/hooks/consume.py index 014798910d06c..67d594ae7c8d2 100644 --- a/providers/src/airflow/providers/apache/kafka/hooks/consume.py +++ b/providers/src/airflow/providers/apache/kafka/hooks/consume.py @@ -18,9 +18,23 @@ from collections.abc import Sequence -from confluent_kafka import Consumer +from confluent_kafka import Consumer, KafkaError from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook +from airflow.utils.module_loading import import_string + + +class KafkaAuthenticationError(Exception): + """Custom exception for Kafka authentication failures.""" + + pass + + +def error_callback(err): + """Handle kafka errors.""" + if err.code() == KafkaError._AUTHENTICATION: + raise KafkaAuthenticationError(f"Authentication failed: {err}") + print("Exception received: ", err) class KafkaConsumerHook(KafkaBaseHook): @@ -36,7 +50,12 @@ def __init__(self, topics: Sequence[str], kafka_config_id=KafkaBaseHook.default_ self.topics = topics def _get_client(self, config) -> Consumer: - return Consumer(config) + config_shallow = config.copy() + if config.get("error_cb") is None: + config_shallow["error_cb"] = error_callback + else: + config_shallow["error_cb"] = import_string(config["error_cb"]) + return Consumer(config_shallow) def get_consumer(self) -> Consumer: """Return a Consumer that has been subscribed to topics."""