Skip to content

Commit

Permalink
#34 build API for handling failures in a consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
Benjamin Hodgson committed Sep 15, 2015
1 parent d3ead47 commit e30c0fd
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 1 deletion.
10 changes: 9 additions & 1 deletion src/asynqp/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ def open(self):
consumers = queue.Consumers(self.loop)
consumers.add_consumer(basic_return_consumer)

handler = ChannelActor(synchroniser, sender)
handler = ChannelActor(consumers, synchroniser, sender)
reader, writer = routing.create_reader_and_writer(handler)
handler.message_receiver = MessageReceiver(synchroniser, sender, consumers, reader)

Expand Down Expand Up @@ -193,6 +193,14 @@ def open(self):


class ChannelActor(routing.Actor):
def __init__(self, consumers, *args, **kwargs):
super().__init__(*args, **kwargs)
self.consumers = consumers

def handle_PoisonPillFrame(self, frame):
super().handle_PoisonPillFrame(frame)
self.consumers.error(frame.exception)

def handle_ChannelOpenOK(self, frame):
self.synchroniser.notify(spec.ChannelOpenOK)

Expand Down
15 changes: 15 additions & 0 deletions src/asynqp/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ def consume(self, callback, *, no_local=False, no_ack=False, exclusive=False, ar
Start a consumer on the queue. Messages will be delivered asynchronously to the consumer.
The callback function will be called whenever a new message arrives on the queue.
Advanced usage: the callback object must be callable
(it must be a function or define a ``__call__`` method),
but may also define some further methods:
* ``callback.on_cancel()``: called with no parameters when the consumer is successfully cancelled.
* ``callback.on_error(exc)``: called when the channel is closed due to an error.
The argument passed is the exception which caused the error.
This method is a :ref:`coroutine <coroutine>`.
:param callable callback: a callback to be called when a message is delivered.
Expand Down Expand Up @@ -250,6 +258,8 @@ def cancel(self):
yield from self.synchroniser.await(spec.BasicCancelOK)
self.cancelled = True
self.cancelled_future.set_result(self)
if hasattr(self.callback, 'on_cancel'):
self.callback.on_cancel()
self.reader.ready()


Expand Down Expand Up @@ -288,3 +298,8 @@ def deliver(self, tag, msg):
assert tag in self.consumers, "Message got delivered to a non existent consumer"
consumer = self.consumers[tag]
self.loop.call_soon(consumer.callback, msg)

def error(self, exc):
for consumer in self.consumers.values():
if hasattr(consumer.callback, 'on_error'):
consumer.callback.on_error(exc)
55 changes: 55 additions & 0 deletions test/queue_tests.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
from datetime import datetime
import contexts
import asynqp
from asynqp import message
from asynqp import frames
Expand Down Expand Up @@ -267,6 +268,60 @@ def it_should_be_cancelled(self):
assert self.consumer.cancelled


class WhenCancelOKArrivesForAConsumerWithAnOnCancelMethod(QueueContext):
def given_I_started_and_cancelled_a_consumer(self):
self.consumer = self.ConsumerWithOnCancel()
task = asyncio.async(self.queue.consume(self.consumer, no_local=False, no_ack=False, exclusive=False, arguments={'x-priority': 1}))
self.tick()
self.server.send_method(self.channel.id, spec.BasicConsumeOK('made.up.tag'))
self.tick()
asyncio.async(task.result().cancel())
self.tick()

def when_BasicCancelOK_arrives(self):
self.server.send_method(self.channel.id, spec.BasicCancelOK('made.up.tag'))

def it_should_call_on_cancel(self):
assert self.consumer.on_cancel_called

class ConsumerWithOnCancel:
def __init__(self):
self.on_cancel_called = False

def __call__(self):
pass

def on_cancel(self):
self.on_cancel_called = True


class WhenAConsumerWithAnOnCancelMethodIsKilledDueToAnError(QueueContext):
def given_I_started_a_consumer(self):
self.consumer = self.ConsumerWithOnError()
asyncio.async(self.queue.consume(self.consumer, no_local=False, no_ack=False, exclusive=False, arguments={'x-priority': 1}))
self.tick()
self.server.send_method(self.channel.id, spec.BasicConsumeOK('made.up.tag'))
self.tick()
self.exception = Exception()

def when_the_connection_dies(self):
contexts.catch(self.protocol.connection_lost, self.exception)
self.tick()

def it_should_call_on_error(self):
assert self.consumer.exc is self.exception

class ConsumerWithOnError:
def __init__(self):
self.exc = None

def __call__(self):
pass

def on_error(self, exc):
self.exc = exc


class WhenIPurgeAQueue(QueueContext):
def because_I_purge_the_queue(self):
self.async_partial(self.queue.purge())
Expand Down

0 comments on commit e30c0fd

Please sign in to comment.