You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
importasynciofromrstreamimportAMQPMessagefromrstream.amqpimportamqp_decoderfromrstream.consumerimportConsumer, MessageContextasyncdefconsume():
consumer=Consumer(
host="localhost",
username="guest",
password="guest",
)
asyncdefon_message(msg: AMQPMessage, message_context: MessageContext):
print("message received, now let's close the consumer")
awaitconsumer.close()
awaitconsumer.start()
awaitconsumer.subscribe("test_request_stream", on_message, decoder=amqp_decoder)
awaitconsumer.run()
awaitasyncio.sleep(20)
asyncio.run(consume())
throws on await consumer.close():
Traceback (most recent call last): File "/home/gupta/.pyenv/versions/3.10.11/lib/python3.10/asyncio/tasks.py", line 456, in wait_for return fut.result()asyncio.exceptions.CancelledErrorThe above exception was the direct cause of the following exception:Traceback (most recent call last): File "/home/gupta/work/test/rstream/rstream/client.py", line 211, in _listener await maybe_coro File "/home/gupta/work/test/rstream/rstream/consumer.py", line 281, in _on_deliver await maybe_coro File "/home/gupta/work/test/rstream/rstream/__main__.py", line 17, in on_message await consumer.close() File "/home/gupta/work/test/rstream/rstream/consumer.py", line 119, in close await self.unsubscribe(subscriber.reference) File "/home/gupta/work/test/rstream/rstream/consumer.py", line 233, in unsubscribe await subscriber.client.unsubscribe(subscriber.subscription_id) File "/home/gupta/work/test/rstream/rstream/client.py", line 415, in unsubscribe await self.sync_request( File "/home/gupta/work/test/rstream/rstream/client.py", line 165, in sync_request resp = await waiter File "/home/gupta/work/test/rstream/rstream/utils.py", line 38, in _wait return await asyncio.wait_for(self.future, self.timeout) File "/home/gupta/.pyenv/versions/3.10.11/lib/python3.10/asyncio/tasks.py", line 458, in wait_for raise exceptions.TimeoutError() from excasyncio.exceptions.TimeoutErrorException in callback BaseClient.start_task.<locals>.on_task_done(<Task finishe...g iteration')>) at /home/gupta/work/test/rstream/rstream/client.py:103handle: <Handle BaseClient.start_task.<locals>.on_task_done(<Task finishe...g iteration')>) at /home/gupta/work/test/rstream/rstream/client.py:103>Traceback (most recent call last): File "/home/gupta/.pyenv/versions/3.10.11/lib/python3.10/asyncio/events.py", line 80, in _run self._context.run(self._callback, *self._args) File "/home/gupta/work/test/rstream/rstream/client.py", line 105, in on_task_done task.result() File "/home/gupta/work/test/rstream/rstream/client.py", line 207, in _listener for _, handler in self._handlers.get(frame.__class__, {}).items():RuntimeError: dictionary changed size during iteration
The text was updated successfully, but these errors were encountered:
Following code:
throws on
await consumer.close()
:The text was updated successfully, but these errors were encountered: