Skip to content

Commit

Permalink
Add simple on_connection_close handler without exception handlers (#88)
Browse files Browse the repository at this point in the history
* ADD: connection close callback feature

* FIX: forgoten return
  • Loading branch information
polosaty authored and benjamin-hodgson committed Oct 20, 2016
1 parent 2fe7109 commit 992c356
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 6 deletions.
16 changes: 11 additions & 5 deletions src/asynqp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
def connect(host='localhost',
port=5672,
username='guest', password='guest',
virtual_host='/', *,
virtual_host='/',
on_connection_close=None, *,
loop=None, sock=None, **kwargs):
"""
Connect to an AMQP server on the given host and port.
Expand All @@ -35,6 +36,7 @@ def connect(host='localhost',
:param str username: the username to authenticate with.
:param str password: the password to authenticate with.
:param str virtual_host: the AMQP virtual host to connect to.
:param func on_connection_close: function called after connection lost.
:keyword BaseEventLoop loop: An instance of :class:`~asyncio.BaseEventLoop` to use.
(Defaults to :func:`asyncio.get_event_loop()`)
:keyword socket sock: A :func:`~socket.socket` instance to use for the connection.
Expand All @@ -60,7 +62,10 @@ def connect(host='localhost',
kwargs['sock'] = sock

dispatcher = Dispatcher()
transport, protocol = yield from loop.create_connection(lambda: AMQP(dispatcher, loop), **kwargs)

def protocol_factory():
return AMQP(dispatcher, loop, close_callback=on_connection_close)
transport, protocol = yield from loop.create_connection(protocol_factory, **kwargs)

# RPC-like applications require TCP_NODELAY in order to acheive
# minimal response time. Actually, this library send data in one
Expand All @@ -85,7 +90,8 @@ def connect(host='localhost',
def connect_and_open_channel(host='localhost',
port=5672,
username='guest', password='guest',
virtual_host='/', *,
virtual_host='/',
on_connection_close=None, *,
loop=None, **kwargs):
"""
Connect to an AMQP server and open a channel on the connection.
Expand All @@ -97,10 +103,10 @@ def connect_and_open_channel(host='localhost',
Equivalent to::
connection = yield from connect(host, port, username, password, virtual_host, loop=loop, **kwargs)
connection = yield from connect(host, port, username, password, virtual_host, on_connection_close, loop=loop, **kwargs)
channel = yield from connection.open_channel()
return connection, channel
"""
connection = yield from connect(host, port, username, password, virtual_host, loop=loop, **kwargs)
connection = yield from connect(host, port, username, password, virtual_host, on_connection_close, loop=loop, **kwargs)
channel = yield from connection.open_channel()
return connection, channel
7 changes: 6 additions & 1 deletion src/asynqp/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,13 @@


class AMQP(asyncio.Protocol):
def __init__(self, dispatcher, loop):
def __init__(self, dispatcher, loop, close_callback=None):
self.dispatcher = dispatcher
self.partial_frame = b''
self.frame_reader = FrameReader()
self.heartbeat_monitor = HeartbeatMonitor(self, loop)
self._closed = False
self._close_callback = close_callback

def connection_made(self, transport):
self.transport = transport
Expand Down Expand Up @@ -51,6 +52,10 @@ def start_heartbeat(self, heartbeat_interval):
def connection_lost(self, exc):
# If self._closed=True - we closed the transport ourselves. No need to
# dispatch PoisonPillFrame, as we should have closed everything already
if self._close_callback:
# _close_callback now only accepts coroutines
asyncio.async(self._close_callback(exc))

if not self._closed:
poison_exc = ConnectionLostError(
'The connection was unexpectedly lost', exc)
Expand Down

0 comments on commit 992c356

Please sign in to comment.