Skip to content

Commit

Permalink
Silence "task was destroyed but is pending" error in tests which
Browse files Browse the repository at this point in the history
expect a coroutine to be incomplete.

Solution is kinda hacky (see `if _TEST` in implementation),
i will have a go at resolving this using monkey-patching next
  • Loading branch information
Benjamin Hodgson committed Nov 12, 2014
1 parent ff27e1c commit 0159e23
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 47 deletions.
12 changes: 10 additions & 2 deletions src/asynqp/bases.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
from .exceptions import AMQPError


_TEST = False


class Sender(object):
def __init__(self, channel_id, protocol):
self.channel_id = channel_id
Expand Down Expand Up @@ -34,6 +37,9 @@ def create_reader_and_writer(handler):
return reader, writer


# When ready() is called, wait for a frame to arrive on the queue.
# When the frame does arrive, dispatch it to the handler and do nothing
# until someone calls ready() again.
class QueueReader(object):
def __init__(self, handler, q):
self.handler = handler
Expand All @@ -43,10 +49,12 @@ def __init__(self, handler, q):
def ready(self):
assert not self.is_waiting, "ready() got called while waiting for a frame to be read"
self.is_waiting = True
asyncio.async(self.read_next())
t = asyncio.async(self._read_next())
if _TEST: # this feels hacky to me
t._log_destroy_pending = False

@asyncio.coroutine
def read_next(self):
def _read_next(self):
assert self.is_waiting, "a frame got read without ready() having been called"
frame = yield from self.q.get()
self.is_waiting = False
Expand Down
7 changes: 6 additions & 1 deletion src/asynqp/channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@
from .exceptions import UndeliverableMessage


_TEST = False


VALID_QUEUE_NAME_RE = re.compile(r'^(?!amq\.)(\w|[-.:])*$', flags=re.A)
VALID_EXCHANGE_NAME_RE = re.compile(r'^(?!amq\.)(\w|[-.:])+$', flags=re.A)

Expand Down Expand Up @@ -220,7 +223,9 @@ def handle_BasicCancelOK(self, frame):
self.synchroniser.notify(spec.BasicCancelOK)

def handle_BasicDeliver(self, frame):
asyncio.async(self.message_receiver.receive_deliver(frame))
t = asyncio.async(self.message_receiver.receive_deliver(frame))
if _TEST:
t._log_destroy_pending = False

def handle_ContentHeaderFrame(self, frame):
asyncio.async(self.message_receiver.receive_header(frame))
Expand Down
16 changes: 16 additions & 0 deletions test/base_contexts.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
import asynqp
import asynqp.bases
from asyncio import test_utils
from asynqp import spec
from asynqp import protocol
Expand All @@ -10,10 +11,25 @@
class LoopContext:
def given_an_event_loop(self):
self.loop = asyncio.get_event_loop()
# self.loop.set_debug(True)
asynqp.bases._TEST = True
asynqp.channel._TEST = True

def tick(self):
test_utils.run_briefly(self.loop)

def cleanup_test_hack(self):
asynqp.bases._TEST = False

def async_partial(self, coro):
"""
Schedule a coroutine which you are not expecting to complete before the end of the test.
Disables the error log when the task is destroyed before completing.
"""
t = asyncio.async(coro)
t._log_destroy_pending = False
return t


class MockLoopContext(LoopContext):
def given_an_event_loop(self):
Expand Down
14 changes: 7 additions & 7 deletions test/channel_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class WhenOpeningAChannel(OpenConnectionContext):
def when_the_user_wants_to_open_a_channel(self):
asyncio.async(self.connection.open_channel(), loop=self.loop)
self.async_partial(self.connection.open_channel())
self.tick()

def it_should_send_a_channel_open_frame(self):
Expand Down Expand Up @@ -47,7 +47,7 @@ def it_should_have_the_correct_channel_id(self):

class WhenTheApplicationClosesAChannel(OpenChannelContext):
def when_I_close_the_channel(self):
asyncio.async(self.channel.close())
self.async_partial(self.channel.close())
self.tick()

def it_should_send_ChannelClose(self):
Expand All @@ -64,9 +64,9 @@ def it_should_send_ChannelCloseOK(self):
self.protocol.send_method.assert_called_once_with(self.channel.id, spec.ChannelCloseOK())


class WhenAnotherMethodArrivesAfterIClosedTheChannel(OpenChannelContext):
class WhenAnotherMethodArrivesWhileTheChannelIsClosing(OpenChannelContext):
def given_that_i_closed_the_channel(self):
asyncio.async(self.channel.close())
self.async_partial(self.channel.close())
self.tick()
self.protocol.reset_mock()

Expand Down Expand Up @@ -95,7 +95,7 @@ def it_MUST_discard_the_method(self):

class WhenAnAsyncMethodArrivesWhileWeAwaitASynchronousOne(OpenChannelContext):
def given_we_are_awaiting_QueueDeclareOK(self):
self.task = asyncio.async(self.channel.declare_queue('my.nice.queue', durable=True, exclusive=True, auto_delete=True), loop=self.loop)
self.task = self.async_partial(self.channel.declare_queue('my.nice.queue', durable=True, exclusive=True, auto_delete=True))
self.tick()
self.protocol.reset_mock()

Expand All @@ -113,7 +113,7 @@ def it_should_not_throw_an_exception(self):

class WhenAnUnexpectedChannelCloseArrives(OpenChannelContext):
def given_we_are_awaiting_QueueDeclareOK(self):
asyncio.async(self.channel.declare_queue('my.nice.queue', durable=True, exclusive=True, auto_delete=True), loop=self.loop)
self.async_partial(self.channel.declare_queue('my.nice.queue', durable=True, exclusive=True, auto_delete=True))
self.tick()
self.protocol.reset_mock()

Expand All @@ -128,7 +128,7 @@ def it_should_send_ChannelCloseOK(self):

class WhenSettingQOS(OpenChannelContext):
def when_we_are_setting_prefetch_count_only(self):
asyncio.async(self.channel.set_qos(prefetch_size=1000, prefetch_count=100, apply_globally=True))
self.async_partial(self.channel.set_qos(prefetch_size=1000, prefetch_count=100, apply_globally=True))
self.tick()

def it_should_send_BasicQos_with_default_values(self):
Expand Down
44 changes: 23 additions & 21 deletions test/connection_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

class WhenRespondingToConnectionStart(ConnectionContext):
def given_I_wrote_the_protocol_header(self):
asyncio.async(open_connection(self.loop, self.protocol, self.dispatcher, self.connection_info))
self.async_partial(open_connection(self.loop, self.protocol, self.dispatcher, self.connection_info))
self.tick()

def when_ConnectionStart_arrives(self):
Expand All @@ -29,7 +29,7 @@ def it_should_send_start_ok(self):

class WhenRespondingToConnectionTune(ConnectionContext):
def given_a_started_connection(self):
asyncio.async(open_connection(self.loop, self.protocol, self.dispatcher, self.connection_info))
self.async_partial(open_connection(self.loop, self.protocol, self.dispatcher, self.connection_info))
self.tick()
start_method = spec.ConnectionStart(0, 9, {}, 'PLAIN AMQPLAIN', 'en_US')
self.dispatcher.dispatch(asynqp.frames.MethodFrame(0, start_method))
Expand Down Expand Up @@ -78,27 +78,9 @@ def it_MUST_be_discarded(self):
assert not self.mock_writer.method_calls


class WhenAConnectionThatWasClosedByTheApplicationReceivesAMethod(OpenConnectionContext):
def given_a_closed_connection(self):
asyncio.async(self.connection.close())
self.tick()

start_method = spec.ConnectionStart(0, 9, {}, 'PLAIN AMQPLAIN', 'en_US')
self.start_frame = asynqp.frames.MethodFrame(0, start_method)
self.mock_writer = mock.Mock()

def when_another_frame_arrives(self):
with mock.patch.dict(self.dispatcher.queue_writers, {0: self.mock_writer}):
self.dispatcher.dispatch(self.start_frame)
self.tick()

def it_MUST_be_discarded(self):
assert not self.mock_writer.method_calls


class WhenTheApplicationClosesTheConnection(OpenConnectionContext):
def when_I_close_the_connection(self):
asyncio.async(self.connection.close())
self.async_partial(self.connection.close())
self.tick()

def it_should_send_ConnectionClose_with_no_exception(self):
Expand All @@ -118,3 +100,23 @@ def when_connection_close_ok_arrives(self):

def it_should_close_the_transport(self):
assert self.protocol.transport.close.called


# TODO: rewrite me to use a handler, not a queue writer
class WhenAConnectionThatIsClosingReceivesAMethod(OpenConnectionContext):
def given_a_closed_connection(self):
t = asyncio.async(self.connection.close())
t._log_destroy_pending = False
self.tick()

start_method = spec.ConnectionStart(0, 9, {}, 'PLAIN AMQPLAIN', 'en_US')
self.start_frame = asynqp.frames.MethodFrame(0, start_method)
self.mock_writer = mock.Mock()

def when_another_frame_arrives(self):
with mock.patch.dict(self.dispatcher.queue_writers, {0: self.mock_writer}):
self.dispatcher.dispatch(self.start_frame)
self.tick()

def it_MUST_be_discarded(self):
assert not self.mock_writer.method_calls
11 changes: 4 additions & 7 deletions test/exchange_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@

class WhenDeclaringAnExchange(OpenChannelContext):
def when_I_declare_an_exchange(self):
asyncio.async(self.channel.declare_exchange('my.nice.exchange', 'fanout', durable=True, auto_delete=False, internal=False),
loop=self.loop)
self.async_partial(self.channel.declare_exchange('my.nice.exchange', 'fanout', durable=True, auto_delete=False, internal=False))
self.tick()

def it_should_send_ExchangeDeclare(self):
Expand All @@ -22,8 +21,7 @@ def it_should_send_ExchangeDeclare(self):

class WhenExchangeDeclareOKArrives(OpenChannelContext):
def given_I_declared_an_exchange(self):
self.task = asyncio.async(self.channel.declare_exchange('my.nice.exchange', 'fanout', durable=True, auto_delete=False, internal=False),
loop=self.loop)
self.task = asyncio.async(self.channel.declare_exchange('my.nice.exchange', 'fanout', durable=True, auto_delete=False, internal=False))
self.tick()

def when_the_reply_arrives(self):
Expand Down Expand Up @@ -52,8 +50,7 @@ def it_should_not_be_internal(self):
# Clients are not allowed to re-declare the default exchange, but they are allowed to publish to it
class WhenIDeclareTheDefaultExchange(OpenChannelContext):
def when_I_declare_an_exchange_with_an_empty_name(self):
task = asyncio.async(self.channel.declare_exchange('', 'direct', durable=True, auto_delete=False, internal=False),
loop=self.loop)
task = asyncio.async(self.channel.declare_exchange('', 'direct', durable=True, auto_delete=False, internal=False))
self.tick()
self.exchange = task.result()

Expand Down Expand Up @@ -163,7 +160,7 @@ def it_should_send_multiple_body_frames(self):

class WhenDeletingAnExchange(ExchangeContext):
def when_I_delete_the_exchange(self):
asyncio.async(self.exchange.delete(if_unused=True), loop=self.loop)
self.async_partial(self.exchange.delete(if_unused=True))
self.tick()

def it_should_send_ExchangeDelete(self):
Expand Down
18 changes: 9 additions & 9 deletions test/queue_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

class WhenDeclaringAQueue(OpenChannelContext):
def when_I_declare_a_queue(self):
asyncio.async(self.channel.declare_queue('my.nice.queue', durable=True, exclusive=True, auto_delete=True), loop=self.loop)
self.async_partial(self.channel.declare_queue('my.nice.queue', durable=True, exclusive=True, auto_delete=True))
self.tick()

def it_should_send_a_QueueDeclare_method(self):
Expand All @@ -21,7 +21,7 @@ def it_should_send_a_QueueDeclare_method(self):
class WhenQueueDeclareOKArrives(OpenChannelContext):
def given_I_declared_a_queue(self):
self.queue_name = 'my.nice.queue'
self.task = asyncio.async(self.channel.declare_queue(self.queue_name, durable=True, exclusive=True, auto_delete=True), loop=self.loop)
self.task = asyncio.async(self.channel.declare_queue(self.queue_name, durable=True, exclusive=True, auto_delete=True))
self.tick()

def when_QueueDeclareOK_arrives(self):
Expand Down Expand Up @@ -75,7 +75,7 @@ def it_should_throw_ValueError(self):

class WhenBindingAQueueToAnExchange(QueueContext, ExchangeContext):
def when_I_bind_the_queue(self):
asyncio.async(self.queue.bind(self.exchange, 'routing.key'))
self.async_partial(self.queue.bind(self.exchange, 'routing.key'))
self.tick()

def it_should_send_QueueBind(self):
Expand All @@ -102,7 +102,7 @@ def and_the_returned_binding_should_have_the_correct_exchange(self):

class WhenUnbindingAQueue(BoundQueueContext):
def when_I_unbind_the_queue(self):
asyncio.async(self.binding.unbind())
self.async_partial(self.binding.unbind())
self.tick()

def it_should_send_QueueUnbind(self):
Expand Down Expand Up @@ -141,7 +141,7 @@ def it_should_throw_Deleted(self):

class WhenIAskForAMessage(QueueContext):
def when_I_get_a_message(self):
asyncio.async(self.queue.get(no_ack=False))
self.async_partial(self.queue.get(no_ack=False))
self.tick()

def it_should_send_BasicGet(self):
Expand Down Expand Up @@ -193,7 +193,7 @@ def it_should_put_the_routing_key_on_the_msg(self):

class WhenISubscribeToAQueue(QueueContext):
def when_I_start_a_consumer(self):
asyncio.async(self.queue.consume(lambda msg: None, no_local=False, no_ack=False, exclusive=False))
self.async_partial(self.queue.consume(lambda msg: None, no_local=False, no_ack=False, exclusive=False))
self.tick()

def it_should_send_BasicConsume(self):
Expand Down Expand Up @@ -271,7 +271,7 @@ def cleanup_the_exception_handler(self):

class WhenICancelAConsumer(ConsumerContext):
def when_I_cancel_the_consumer(self):
asyncio.async(self.consumer.cancel())
self.async_partial(self.consumer.cancel())
self.tick()

def it_should_send_a_BasicCancel_method(self):
Expand All @@ -293,7 +293,7 @@ def it_should_be_cancelled(self):

class WhenIPurgeAQueue(QueueContext):
def because_I_purge_the_queue(self):
asyncio.async(self.queue.purge())
self.async_partial(self.queue.purge())
self.tick()

def it_should_send_a_QueuePurge_method(self):
Expand All @@ -316,7 +316,7 @@ def it_should_return(self):

class WhenDeletingAQueue(QueueContext):
def because_I_delete_the_queue(self):
asyncio.async(self.queue.delete(if_unused=False, if_empty=False))
self.async_partial(self.queue.delete(if_unused=False, if_empty=False))
self.tick()

def it_should_send_a_QueueDelete_method(self):
Expand Down

0 comments on commit 0159e23

Please sign in to comment.