From 0159e23e77a27efa07e13a575cad5b4644dfe70e Mon Sep 17 00:00:00 2001 From: Benjamin Hodgson Date: Wed, 12 Nov 2014 13:39:40 +0000 Subject: [PATCH] Silence "task was destroyed but is pending" error in tests which expect a coroutine to be incomplete. Solution is kinda hacky (see `if _TEST` in implementation), i will have a go at resolving this using monkey-patching next --- src/asynqp/bases.py | 12 +++++++++-- src/asynqp/channel.py | 7 ++++++- test/base_contexts.py | 16 +++++++++++++++ test/channel_tests.py | 14 ++++++------- test/connection_tests.py | 44 +++++++++++++++++++++------------------- test/exchange_tests.py | 11 ++++------ test/queue_tests.py | 18 ++++++++-------- 7 files changed, 75 insertions(+), 47 deletions(-) diff --git a/src/asynqp/bases.py b/src/asynqp/bases.py index ab154b6..eb13c01 100644 --- a/src/asynqp/bases.py +++ b/src/asynqp/bases.py @@ -4,6 +4,9 @@ from .exceptions import AMQPError +_TEST = False + + class Sender(object): def __init__(self, channel_id, protocol): self.channel_id = channel_id @@ -34,6 +37,9 @@ def create_reader_and_writer(handler): return reader, writer +# When ready() is called, wait for a frame to arrive on the queue. +# When the frame does arrive, dispatch it to the handler and do nothing +# until someone calls ready() again. class QueueReader(object): def __init__(self, handler, q): self.handler = handler @@ -43,10 +49,12 @@ def __init__(self, handler, q): def ready(self): assert not self.is_waiting, "ready() got called while waiting for a frame to be read" self.is_waiting = True - asyncio.async(self.read_next()) + t = asyncio.async(self._read_next()) + if _TEST: # this feels hacky to me + t._log_destroy_pending = False @asyncio.coroutine - def read_next(self): + def _read_next(self): assert self.is_waiting, "a frame got read without ready() having been called" frame = yield from self.q.get() self.is_waiting = False diff --git a/src/asynqp/channel.py b/src/asynqp/channel.py index 7ec0e6f..2057a2f 100644 --- a/src/asynqp/channel.py +++ b/src/asynqp/channel.py @@ -10,6 +10,9 @@ from .exceptions import UndeliverableMessage +_TEST = False + + VALID_QUEUE_NAME_RE = re.compile(r'^(?!amq\.)(\w|[-.:])*$', flags=re.A) VALID_EXCHANGE_NAME_RE = re.compile(r'^(?!amq\.)(\w|[-.:])+$', flags=re.A) @@ -220,7 +223,9 @@ def handle_BasicCancelOK(self, frame): self.synchroniser.notify(spec.BasicCancelOK) def handle_BasicDeliver(self, frame): - asyncio.async(self.message_receiver.receive_deliver(frame)) + t = asyncio.async(self.message_receiver.receive_deliver(frame)) + if _TEST: + t._log_destroy_pending = False def handle_ContentHeaderFrame(self, frame): asyncio.async(self.message_receiver.receive_header(frame)) diff --git a/test/base_contexts.py b/test/base_contexts.py index 23671da..f09a708 100644 --- a/test/base_contexts.py +++ b/test/base_contexts.py @@ -1,5 +1,6 @@ import asyncio import asynqp +import asynqp.bases from asyncio import test_utils from asynqp import spec from asynqp import protocol @@ -10,10 +11,25 @@ class LoopContext: def given_an_event_loop(self): self.loop = asyncio.get_event_loop() + # self.loop.set_debug(True) + asynqp.bases._TEST = True + asynqp.channel._TEST = True def tick(self): test_utils.run_briefly(self.loop) + def cleanup_test_hack(self): + asynqp.bases._TEST = False + + def async_partial(self, coro): + """ + Schedule a coroutine which you are not expecting to complete before the end of the test. + Disables the error log when the task is destroyed before completing. + """ + t = asyncio.async(coro) + t._log_destroy_pending = False + return t + class MockLoopContext(LoopContext): def given_an_event_loop(self): diff --git a/test/channel_tests.py b/test/channel_tests.py index 238da0b..3fd64a3 100644 --- a/test/channel_tests.py +++ b/test/channel_tests.py @@ -10,7 +10,7 @@ class WhenOpeningAChannel(OpenConnectionContext): def when_the_user_wants_to_open_a_channel(self): - asyncio.async(self.connection.open_channel(), loop=self.loop) + self.async_partial(self.connection.open_channel()) self.tick() def it_should_send_a_channel_open_frame(self): @@ -47,7 +47,7 @@ def it_should_have_the_correct_channel_id(self): class WhenTheApplicationClosesAChannel(OpenChannelContext): def when_I_close_the_channel(self): - asyncio.async(self.channel.close()) + self.async_partial(self.channel.close()) self.tick() def it_should_send_ChannelClose(self): @@ -64,9 +64,9 @@ def it_should_send_ChannelCloseOK(self): self.protocol.send_method.assert_called_once_with(self.channel.id, spec.ChannelCloseOK()) -class WhenAnotherMethodArrivesAfterIClosedTheChannel(OpenChannelContext): +class WhenAnotherMethodArrivesWhileTheChannelIsClosing(OpenChannelContext): def given_that_i_closed_the_channel(self): - asyncio.async(self.channel.close()) + self.async_partial(self.channel.close()) self.tick() self.protocol.reset_mock() @@ -95,7 +95,7 @@ def it_MUST_discard_the_method(self): class WhenAnAsyncMethodArrivesWhileWeAwaitASynchronousOne(OpenChannelContext): def given_we_are_awaiting_QueueDeclareOK(self): - self.task = asyncio.async(self.channel.declare_queue('my.nice.queue', durable=True, exclusive=True, auto_delete=True), loop=self.loop) + self.task = self.async_partial(self.channel.declare_queue('my.nice.queue', durable=True, exclusive=True, auto_delete=True)) self.tick() self.protocol.reset_mock() @@ -113,7 +113,7 @@ def it_should_not_throw_an_exception(self): class WhenAnUnexpectedChannelCloseArrives(OpenChannelContext): def given_we_are_awaiting_QueueDeclareOK(self): - asyncio.async(self.channel.declare_queue('my.nice.queue', durable=True, exclusive=True, auto_delete=True), loop=self.loop) + self.async_partial(self.channel.declare_queue('my.nice.queue', durable=True, exclusive=True, auto_delete=True)) self.tick() self.protocol.reset_mock() @@ -128,7 +128,7 @@ def it_should_send_ChannelCloseOK(self): class WhenSettingQOS(OpenChannelContext): def when_we_are_setting_prefetch_count_only(self): - asyncio.async(self.channel.set_qos(prefetch_size=1000, prefetch_count=100, apply_globally=True)) + self.async_partial(self.channel.set_qos(prefetch_size=1000, prefetch_count=100, apply_globally=True)) self.tick() def it_should_send_BasicQos_with_default_values(self): diff --git a/test/connection_tests.py b/test/connection_tests.py index 46a10e4..1ce52b1 100644 --- a/test/connection_tests.py +++ b/test/connection_tests.py @@ -9,7 +9,7 @@ class WhenRespondingToConnectionStart(ConnectionContext): def given_I_wrote_the_protocol_header(self): - asyncio.async(open_connection(self.loop, self.protocol, self.dispatcher, self.connection_info)) + self.async_partial(open_connection(self.loop, self.protocol, self.dispatcher, self.connection_info)) self.tick() def when_ConnectionStart_arrives(self): @@ -29,7 +29,7 @@ def it_should_send_start_ok(self): class WhenRespondingToConnectionTune(ConnectionContext): def given_a_started_connection(self): - asyncio.async(open_connection(self.loop, self.protocol, self.dispatcher, self.connection_info)) + self.async_partial(open_connection(self.loop, self.protocol, self.dispatcher, self.connection_info)) self.tick() start_method = spec.ConnectionStart(0, 9, {}, 'PLAIN AMQPLAIN', 'en_US') self.dispatcher.dispatch(asynqp.frames.MethodFrame(0, start_method)) @@ -78,27 +78,9 @@ def it_MUST_be_discarded(self): assert not self.mock_writer.method_calls -class WhenAConnectionThatWasClosedByTheApplicationReceivesAMethod(OpenConnectionContext): - def given_a_closed_connection(self): - asyncio.async(self.connection.close()) - self.tick() - - start_method = spec.ConnectionStart(0, 9, {}, 'PLAIN AMQPLAIN', 'en_US') - self.start_frame = asynqp.frames.MethodFrame(0, start_method) - self.mock_writer = mock.Mock() - - def when_another_frame_arrives(self): - with mock.patch.dict(self.dispatcher.queue_writers, {0: self.mock_writer}): - self.dispatcher.dispatch(self.start_frame) - self.tick() - - def it_MUST_be_discarded(self): - assert not self.mock_writer.method_calls - - class WhenTheApplicationClosesTheConnection(OpenConnectionContext): def when_I_close_the_connection(self): - asyncio.async(self.connection.close()) + self.async_partial(self.connection.close()) self.tick() def it_should_send_ConnectionClose_with_no_exception(self): @@ -118,3 +100,23 @@ def when_connection_close_ok_arrives(self): def it_should_close_the_transport(self): assert self.protocol.transport.close.called + + +# TODO: rewrite me to use a handler, not a queue writer +class WhenAConnectionThatIsClosingReceivesAMethod(OpenConnectionContext): + def given_a_closed_connection(self): + t = asyncio.async(self.connection.close()) + t._log_destroy_pending = False + self.tick() + + start_method = spec.ConnectionStart(0, 9, {}, 'PLAIN AMQPLAIN', 'en_US') + self.start_frame = asynqp.frames.MethodFrame(0, start_method) + self.mock_writer = mock.Mock() + + def when_another_frame_arrives(self): + with mock.patch.dict(self.dispatcher.queue_writers, {0: self.mock_writer}): + self.dispatcher.dispatch(self.start_frame) + self.tick() + + def it_MUST_be_discarded(self): + assert not self.mock_writer.method_calls diff --git a/test/exchange_tests.py b/test/exchange_tests.py index a79e3c5..47460f8 100644 --- a/test/exchange_tests.py +++ b/test/exchange_tests.py @@ -11,8 +11,7 @@ class WhenDeclaringAnExchange(OpenChannelContext): def when_I_declare_an_exchange(self): - asyncio.async(self.channel.declare_exchange('my.nice.exchange', 'fanout', durable=True, auto_delete=False, internal=False), - loop=self.loop) + self.async_partial(self.channel.declare_exchange('my.nice.exchange', 'fanout', durable=True, auto_delete=False, internal=False)) self.tick() def it_should_send_ExchangeDeclare(self): @@ -22,8 +21,7 @@ def it_should_send_ExchangeDeclare(self): class WhenExchangeDeclareOKArrives(OpenChannelContext): def given_I_declared_an_exchange(self): - self.task = asyncio.async(self.channel.declare_exchange('my.nice.exchange', 'fanout', durable=True, auto_delete=False, internal=False), - loop=self.loop) + self.task = asyncio.async(self.channel.declare_exchange('my.nice.exchange', 'fanout', durable=True, auto_delete=False, internal=False)) self.tick() def when_the_reply_arrives(self): @@ -52,8 +50,7 @@ def it_should_not_be_internal(self): # Clients are not allowed to re-declare the default exchange, but they are allowed to publish to it class WhenIDeclareTheDefaultExchange(OpenChannelContext): def when_I_declare_an_exchange_with_an_empty_name(self): - task = asyncio.async(self.channel.declare_exchange('', 'direct', durable=True, auto_delete=False, internal=False), - loop=self.loop) + task = asyncio.async(self.channel.declare_exchange('', 'direct', durable=True, auto_delete=False, internal=False)) self.tick() self.exchange = task.result() @@ -163,7 +160,7 @@ def it_should_send_multiple_body_frames(self): class WhenDeletingAnExchange(ExchangeContext): def when_I_delete_the_exchange(self): - asyncio.async(self.exchange.delete(if_unused=True), loop=self.loop) + self.async_partial(self.exchange.delete(if_unused=True)) self.tick() def it_should_send_ExchangeDelete(self): diff --git a/test/queue_tests.py b/test/queue_tests.py index b57352b..26f10c2 100644 --- a/test/queue_tests.py +++ b/test/queue_tests.py @@ -10,7 +10,7 @@ class WhenDeclaringAQueue(OpenChannelContext): def when_I_declare_a_queue(self): - asyncio.async(self.channel.declare_queue('my.nice.queue', durable=True, exclusive=True, auto_delete=True), loop=self.loop) + self.async_partial(self.channel.declare_queue('my.nice.queue', durable=True, exclusive=True, auto_delete=True)) self.tick() def it_should_send_a_QueueDeclare_method(self): @@ -21,7 +21,7 @@ def it_should_send_a_QueueDeclare_method(self): class WhenQueueDeclareOKArrives(OpenChannelContext): def given_I_declared_a_queue(self): self.queue_name = 'my.nice.queue' - self.task = asyncio.async(self.channel.declare_queue(self.queue_name, durable=True, exclusive=True, auto_delete=True), loop=self.loop) + self.task = asyncio.async(self.channel.declare_queue(self.queue_name, durable=True, exclusive=True, auto_delete=True)) self.tick() def when_QueueDeclareOK_arrives(self): @@ -75,7 +75,7 @@ def it_should_throw_ValueError(self): class WhenBindingAQueueToAnExchange(QueueContext, ExchangeContext): def when_I_bind_the_queue(self): - asyncio.async(self.queue.bind(self.exchange, 'routing.key')) + self.async_partial(self.queue.bind(self.exchange, 'routing.key')) self.tick() def it_should_send_QueueBind(self): @@ -102,7 +102,7 @@ def and_the_returned_binding_should_have_the_correct_exchange(self): class WhenUnbindingAQueue(BoundQueueContext): def when_I_unbind_the_queue(self): - asyncio.async(self.binding.unbind()) + self.async_partial(self.binding.unbind()) self.tick() def it_should_send_QueueUnbind(self): @@ -141,7 +141,7 @@ def it_should_throw_Deleted(self): class WhenIAskForAMessage(QueueContext): def when_I_get_a_message(self): - asyncio.async(self.queue.get(no_ack=False)) + self.async_partial(self.queue.get(no_ack=False)) self.tick() def it_should_send_BasicGet(self): @@ -193,7 +193,7 @@ def it_should_put_the_routing_key_on_the_msg(self): class WhenISubscribeToAQueue(QueueContext): def when_I_start_a_consumer(self): - asyncio.async(self.queue.consume(lambda msg: None, no_local=False, no_ack=False, exclusive=False)) + self.async_partial(self.queue.consume(lambda msg: None, no_local=False, no_ack=False, exclusive=False)) self.tick() def it_should_send_BasicConsume(self): @@ -271,7 +271,7 @@ def cleanup_the_exception_handler(self): class WhenICancelAConsumer(ConsumerContext): def when_I_cancel_the_consumer(self): - asyncio.async(self.consumer.cancel()) + self.async_partial(self.consumer.cancel()) self.tick() def it_should_send_a_BasicCancel_method(self): @@ -293,7 +293,7 @@ def it_should_be_cancelled(self): class WhenIPurgeAQueue(QueueContext): def because_I_purge_the_queue(self): - asyncio.async(self.queue.purge()) + self.async_partial(self.queue.purge()) self.tick() def it_should_send_a_QueuePurge_method(self): @@ -316,7 +316,7 @@ def it_should_return(self): class WhenDeletingAQueue(QueueContext): def because_I_delete_the_queue(self): - asyncio.async(self.queue.delete(if_unused=False, if_empty=False)) + self.async_partial(self.queue.delete(if_unused=False, if_empty=False)) self.tick() def it_should_send_a_QueueDelete_method(self):