Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add connection open/close callbacks #552

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
CHANGES
-------

552.feature
^^^^^^^^^^^

Add new callbacks for connection opened/close for consumer and producer

523.feature
^^^^^^^^^^^

Expand Down
22 changes: 22 additions & 0 deletions aiokafka/client.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ def __init__(self, *, loop, bootstrap_servers='localhost',
self._md_update_fut = None
self._md_update_waiter = create_future(loop=self._loop)
self._get_conn_lock = asyncio.Lock(loop=loop)
self._on_connection_closed_callback = None
self._on_connection_opened_callback = None

def __repr__(self):
return '<AIOKafkaClient client_id=%s>' % self._client_id
Expand Down Expand Up @@ -371,7 +373,24 @@ def set_topics(self, topics):
self._topics = set(topics)
return res

def set_on_connection_closed_callback(self, on_connection_closed_callback):
"""Set a callback function to invoke when a connection to Kafka is closed
Arguments:
on_connection_closed_callback: a callback function to call
"""
self._on_connection_closed_callback = on_connection_closed_callback

def set_on_connection_opened_callback(self, on_connection_opened_callback):
"""Set a callback function to invoke when a connection to Kafka is opened
Arguments:
on_connection_opened_callback: a callback function to call
"""
self._on_connection_opened_callback = on_connection_opened_callback

def _on_connection_closed(self, conn, reason):
if self._on_connection_closed_callback:
self._on_connection_closed_callback(conn, reason)

""" Callback called when connection is closed
"""
# Connection failures imply that our metadata is stale, so let's
Expand Down Expand Up @@ -441,6 +460,9 @@ async def _get_conn(
self.force_metadata_update()
return None
else:
if self._on_connection_opened_callback:
self._on_connection_opened_callback(self._conns[conn_id])

return self._conns[conn_id]

async def ready(self, node_id, *, group=ConnectionGroup.DEFAULT):
Expand Down
15 changes: 15 additions & 0 deletions aiokafka/consumer/consumer.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,21 @@ def __del__(self, _warnings=warnings):
context['source_traceback'] = self._source_traceback
self._loop.call_exception_handler(context)

def set_on_connection_closed_callback(self, on_connection_closed_callback):
"""Set a callback function to invoke when a connection to Kafka is closed
Arguments:
on_connection_closed_callback: a callback function to call
"""
self._client.set_on_connection_closed_callback(on_connection_closed_callback)

def set_on_connection_opened_callback(self, on_connection_opened_callback):
"""Set a callback function to invoke when a connection to Kafka is opened
Arguments:
on_connection_opened_callback: a callback function to call
"""
self._client.set_on_connection_opened_callback(on_connection_opened_callback)


async def start(self):
""" Connect to Kafka cluster. This will:
Expand Down
14 changes: 14 additions & 0 deletions aiokafka/producer/producer.py
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,20 @@ async def send_offsets_to_transaction(self, offsets, group_id):
fut = self._txn_manager.add_offsets_to_txn(formatted_offsets, group_id)
await asyncio.shield(fut, loop=self._loop)

def set_on_connection_closed_callback(self, on_connection_closed_callback):
"""Set a callback function to invoke when a connection to Kafka is closed
Arguments:
on_connection_closed_callback: a callback function to call
"""
self.client.set_on_connection_closed_callback(on_connection_closed_callback)

def set_on_connection_opened_callback(self, on_connection_opened_callback):
"""Set a callback function to invoke when a connection to Kafka is opened
Arguments:
on_connection_opened_callback: a callback function to call
"""
self.client.set_on_connection_opened_callback(on_connection_opened_callback)


class TransactionContext:

Expand Down