diff --git a/CHANGES.rst b/CHANGES.rst index 3edee3ed..bcaa63bb 100644 --- a/CHANGES.rst +++ b/CHANGES.rst @@ -1,6 +1,11 @@ CHANGES ------- +552.feature +^^^^^^^^^^^ + +Add new callbacks for connection opened/close for consumer and producer + 523.feature ^^^^^^^^^^^ diff --git a/aiokafka/client.py b/aiokafka/client.py old mode 100644 new mode 100755 index 8737737f..a6d6a503 --- a/aiokafka/client.py +++ b/aiokafka/client.py @@ -137,6 +137,8 @@ def __init__(self, *, loop, bootstrap_servers='localhost', self._md_update_fut = None self._md_update_waiter = create_future(loop=self._loop) self._get_conn_lock = asyncio.Lock(loop=loop) + self._on_connection_closed_callback = None + self._on_connection_opened_callback = None def __repr__(self): return '' % self._client_id @@ -371,7 +373,24 @@ def set_topics(self, topics): self._topics = set(topics) return res + def set_on_connection_closed_callback(self, on_connection_closed_callback): + """Set a callback function to invoke when a connection to Kafka is closed + Arguments: + on_connection_closed_callback: a callback function to call + """ + self._on_connection_closed_callback = on_connection_closed_callback + + def set_on_connection_opened_callback(self, on_connection_opened_callback): + """Set a callback function to invoke when a connection to Kafka is opened + Arguments: + on_connection_opened_callback: a callback function to call + """ + self._on_connection_opened_callback = on_connection_opened_callback + def _on_connection_closed(self, conn, reason): + if self._on_connection_closed_callback: + self._on_connection_closed_callback(conn, reason) + """ Callback called when connection is closed """ # Connection failures imply that our metadata is stale, so let's @@ -441,6 +460,9 @@ async def _get_conn( self.force_metadata_update() return None else: + if self._on_connection_opened_callback: + self._on_connection_opened_callback(self._conns[conn_id]) + return self._conns[conn_id] async def ready(self, node_id, *, group=ConnectionGroup.DEFAULT): diff --git a/aiokafka/consumer/consumer.py b/aiokafka/consumer/consumer.py old mode 100644 new mode 100755 index 5b861f0c..4e5a8fff --- a/aiokafka/consumer/consumer.py +++ b/aiokafka/consumer/consumer.py @@ -321,6 +321,21 @@ def __del__(self, _warnings=warnings): context['source_traceback'] = self._source_traceback self._loop.call_exception_handler(context) + def set_on_connection_closed_callback(self, on_connection_closed_callback): + """Set a callback function to invoke when a connection to Kafka is closed + Arguments: + on_connection_closed_callback: a callback function to call + """ + self._client.set_on_connection_closed_callback(on_connection_closed_callback) + + def set_on_connection_opened_callback(self, on_connection_opened_callback): + """Set a callback function to invoke when a connection to Kafka is opened + Arguments: + on_connection_opened_callback: a callback function to call + """ + self._client.set_on_connection_opened_callback(on_connection_opened_callback) + + async def start(self): """ Connect to Kafka cluster. This will: diff --git a/aiokafka/producer/producer.py b/aiokafka/producer/producer.py old mode 100644 new mode 100755 index d96e0a1b..925f5d08 --- a/aiokafka/producer/producer.py +++ b/aiokafka/producer/producer.py @@ -549,6 +549,20 @@ async def send_offsets_to_transaction(self, offsets, group_id): fut = self._txn_manager.add_offsets_to_txn(formatted_offsets, group_id) await asyncio.shield(fut, loop=self._loop) + def set_on_connection_closed_callback(self, on_connection_closed_callback): + """Set a callback function to invoke when a connection to Kafka is closed + Arguments: + on_connection_closed_callback: a callback function to call + """ + self.client.set_on_connection_closed_callback(on_connection_closed_callback) + + def set_on_connection_opened_callback(self, on_connection_opened_callback): + """Set a callback function to invoke when a connection to Kafka is opened + Arguments: + on_connection_opened_callback: a callback function to call + """ + self.client.set_on_connection_opened_callback(on_connection_opened_callback) + class TransactionContext: