Skip to content

Commit

Permalink
FIX add error_cb to confluent.Consumer config in ConsumerFromTopic (
Browse files Browse the repository at this point in the history
  • Loading branch information
SuccessMoses authored Dec 8, 2024
1 parent da618aa commit 7d05a47
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,10 @@ Kafka hooks and operators use ``kafka_default`` by default, this connection is v
Configuring the Connection
--------------------------

Connections are configured as a json serializable string provided to the ``extra`` field. A full list of parameters
are described in the `Confluent Kafka python library <https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md>`_.
Connections are configured as a json serializable string provided to the ``extra`` field. The ``error_cb`` parameter can be
used to specify a callback function by providing a path to the function. e.g ``"module.callback_func"``. A full list
of parameters are described in the
`Confluent Kafka python library <https://github.com/confluentinc/librdkafka/blob/master/CONFIGURATION.md>`_.

If you are defining the Airflow connection from the Airflow UI, the ``extra`` field will be renamed to ``Config Dict``.

Expand Down
14 changes: 14 additions & 0 deletions docs/apache-airflow-providers-apache-kafka/hooks.rst
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,20 @@ Reference
For further information, look at `Apache Kafka Admin config documentation <https://kafka.apache.org/documentation/#adminclientconfigs>`_.


.. _howto/hook:KafkaAuthenticationError:

KafkaAuthenticationError
------------------------

Custom exception for Kafka authentication failures.

Reference
"""""""""

For further information, look at
`Confluent Kafka Documentation <https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#pythonclient-kafkaerror>`_.


.. _howto/hook:KafkaConsumerHook:

KafkaConsumerHook
Expand Down
23 changes: 21 additions & 2 deletions providers/src/airflow/providers/apache/kafka/hooks/consume.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,23 @@

from collections.abc import Sequence

from confluent_kafka import Consumer
from confluent_kafka import Consumer, KafkaError

from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook
from airflow.utils.module_loading import import_string


class KafkaAuthenticationError(Exception):
"""Custom exception for Kafka authentication failures."""

pass


def error_callback(err):
"""Handle kafka errors."""
if err.code() == KafkaError._AUTHENTICATION:
raise KafkaAuthenticationError(f"Authentication failed: {err}")
print("Exception received: ", err)


class KafkaConsumerHook(KafkaBaseHook):
Expand All @@ -36,7 +50,12 @@ def __init__(self, topics: Sequence[str], kafka_config_id=KafkaBaseHook.default_
self.topics = topics

def _get_client(self, config) -> Consumer:
return Consumer(config)
config_shallow = config.copy()
if config.get("error_cb") is None:
config_shallow["error_cb"] = error_callback
else:
config_shallow["error_cb"] = import_string(config["error_cb"])
return Consumer(config_shallow)

def get_consumer(self) -> Consumer:
"""Return a Consumer that has been subscribed to topics."""
Expand Down

0 comments on commit 7d05a47

Please sign in to comment.