Consumer.close throws a RuntimeError #123

Closed
kyrylokovalenko opened this issue Sep 19, 2023 · 1 comment · Fixed by #134
Labels
bug Something isn't working

Comments

@kyrylokovalenko

The following code:

import asyncio

from rstream import AMQPMessage
from rstream.amqp import amqp_decoder
from rstream.consumer import Consumer, MessageContext


async def consume():
    consumer = Consumer(
        host="localhost",
        username="guest",
        password="guest",
    )

    async def on_message(msg: AMQPMessage, message_context: MessageContext):
        print("message received, now let's close the consumer")
        await consumer.close()

    await consumer.start()
    await consumer.subscribe("test_request_stream", on_message, decoder=amqp_decoder)
    await consumer.run()

    await asyncio.sleep(20)

asyncio.run(consume())

throws on await consumer.close():

Traceback (most recent call last):
  File "/home/gupta/.pyenv/versions/3.10.11/lib/python3.10/asyncio/tasks.py", line 456, in wait_for
    return fut.result()
asyncio.exceptions.CancelledError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/gupta/work/test/rstream/rstream/client.py", line 211, in _listener
    await maybe_coro
  File "/home/gupta/work/test/rstream/rstream/consumer.py", line 281, in _on_deliver
    await maybe_coro
  File "/home/gupta/work/test/rstream/rstream/__main__.py", line 17, in on_message
    await consumer.close()
  File "/home/gupta/work/test/rstream/rstream/consumer.py", line 119, in close
    await self.unsubscribe(subscriber.reference)
  File "/home/gupta/work/test/rstream/rstream/consumer.py", line 233, in unsubscribe
    await subscriber.client.unsubscribe(subscriber.subscription_id)
  File "/home/gupta/work/test/rstream/rstream/client.py", line 415, in unsubscribe
    await self.sync_request(
  File "/home/gupta/work/test/rstream/rstream/client.py", line 165, in sync_request
    resp = await waiter
  File "/home/gupta/work/test/rstream/rstream/utils.py", line 38, in _wait
    return await asyncio.wait_for(self.future, self.timeout)
  File "/home/gupta/.pyenv/versions/3.10.11/lib/python3.10/asyncio/tasks.py", line 458, in wait_for
    raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError
Exception in callback BaseClient.start_task.<locals>.on_task_done(<Task finishe...g iteration')>) at /home/gupta/work/test/rstream/rstream/client.py:103
handle: <Handle BaseClient.start_task.<locals>.on_task_done(<Task finishe...g iteration')>) at /home/gupta/work/test/rstream/rstream/client.py:103>
Traceback (most recent call last):
  File "/home/gupta/.pyenv/versions/3.10.11/lib/python3.10/asyncio/events.py", line 80, in _run
    self._context.run(self._callback, *self._args)
  File "/home/gupta/work/test/rstream/rstream/client.py", line 105, in on_task_done
    task.result()
  File "/home/gupta/work/test/rstream/rstream/client.py", line 207, in _listener
    for _, handler in self._handlers.get(frame.__class__, {}).items():
RuntimeError: dictionary changed size during iteration
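
For readers unfamiliar with the final error: Python raises it when a dict gains or loses keys while it is being iterated. The sketch below reproduces that error class in isolation. It is a toy model, not rstream's code; `make_registry`, `dispatch_unsafe` and `dispatch_snapshot` are hypothetical names, and iterating over a snapshot is only one common mitigation, not necessarily what the fix in #134 does.

```python
def make_registry():
    # Toy handler registry (hypothetical; not rstream's internal structure).
    registry = {}
    # This handler unregisters itself when called, much as an
    # unsubscribe triggered from inside a callback might.
    registry["deliver"] = lambda: registry.pop("deliver")
    registry["metadata"] = lambda: None
    return registry


def dispatch_unsafe(registry):
    # Iterates the live dict: fails once a handler changes its size.
    for _, handler in registry.items():
        handler()


def dispatch_snapshot(registry):
    # Iterates a copy of the items, so handlers may unregister themselves.
    for _, handler in list(registry.items()):
        handler()


try:
    dispatch_unsafe(make_registry())
    unsafe_error = None
except RuntimeError as exc:
    unsafe_error = str(exc)

remaining = make_registry()
dispatch_snapshot(remaining)

print(unsafe_error)       # message of the RuntimeError raised by the unsafe loop
print(sorted(remaining))  # keys left after the snapshot-based dispatch
```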
@DanielePalaia
Collaborator

I suspect it may be due to #80. Because of that issue, we currently have problems making a
synchronous call (and unsubscribe makes one) from a callback.
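
One plausible reading of the TimeoutError in the traceback, consistent with `_listener` awaiting the callback while `sync_request` waits for a reply, is that the listener cannot read the response frame because it is still busy running the callback. The sketch below is a toy asyncio model of that situation and assumes nothing about rstream beyond what the traceback shows; `demo`, `frames`, `listener` and `on_deliver` are hypothetical names.

```python
import asyncio


async def demo(run_callback_in_task: bool, timeout: float = 0.2) -> str:
    frames = asyncio.Queue()  # stands in for frames arriving from the broker
    response = asyncio.get_running_loop().create_future()
    outcome = []

    async def on_deliver():
        # The "synchronous request": the reply is queued right away,
        # but only the listener loop can hand it to us.
        frames.put_nowait("unsubscribe-response")
        try:
            await asyncio.wait_for(response, timeout)
            outcome.append("ok")
        except asyncio.TimeoutError:
            outcome.append("timeout")

    async def listener():
        tasks = []
        while True:
            frame = await frames.get()
            if frame == "deliver":
                if run_callback_in_task:
                    # Listener stays free to read the reply frame.
                    tasks.append(asyncio.create_task(on_deliver()))
                else:
                    # Listener is blocked here until the callback gives up.
                    await on_deliver()
                    return
            else:
                response.set_result("ok")
                await asyncio.gather(*tasks)
                return

    frames.put_nowait("deliver")
    await listener()
    return outcome[0]


blocked = asyncio.run(demo(run_callback_in_task=False))
freed = asyncio.run(demo(run_callback_in_task=True))
print(blocked, freed)
```

In this model the request only completes when the callback runs outside the listener's own await chain, which is the idea behind the workaround suggested next.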

Would it be possible for you, for the moment, to call consumer.close when a signal is caught, as we do in this example:
https://github.com/qweeze/rstream/blob/master/docs/examples/basic_consumers/basic_consumer_binary.py

In that case, when you press Ctrl+C, the handler runs consumer.close inside a different task.
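
The same idea, closing from a task other than the one running the callback, can be sketched without a signal handler: the callback only sets an `asyncio.Event`, and the main coroutine performs the close. This is a variant of the suggestion above, not the linked example. `StubConsumer` is a hypothetical stand-in so the snippet runs without a broker, and the pattern is untested against the real `Consumer`.

```python
import asyncio


class StubConsumer:
    """Hypothetical stand-in for rstream's Consumer; only close() is modelled."""

    def __init__(self):
        self.closed = False
        self.closed_in_task = None

    async def close(self):
        self.closed = True
        self.closed_in_task = asyncio.current_task()


async def main():
    consumer = StubConsumer()
    stop_requested = asyncio.Event()

    async def on_message(msg):
        # Only request the shutdown; do not await consumer.close() here.
        stop_requested.set()

    async def listener():
        # Stands in for the library task that invokes the callback.
        await on_message(b"hello")

    listener_task = asyncio.create_task(listener())
    await stop_requested.wait()
    await listener_task
    # Close from the main task, outside the callback's call chain.
    await consumer.close()
    return consumer.closed, consumer.closed_in_task is listener_task


closed, closed_in_listener = asyncio.run(main())
print(closed, closed_in_listener)
```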
