Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

How to close websockets coroutines ? #754

Closed
Insoleet opened this issue Jan 29, 2016 · 8 comments
Closed

How to close websockets coroutines ? #754

Insoleet opened this issue Jan 29, 2016 · 8 comments
Labels

Comments

@Insoleet
Copy link
Contributor

Hello,

I'm running the following code which hangs :

import quamash
import asyncio
import aiohttp
import sys


class Testor:
    def __init__(self):
        self.loop = asyncio.get_event_loop()
        self.myWS = None

    async def connect(self):
        self.myWS = aiohttp.ws_connect('ws://metab.ucoin.io:9201/ws/block')
        async with self.myWS as ws:

            async for msg in ws:
                if msg.tp == aiohttp.MsgType.text:
                    print(msg.data)
                elif msg.tp == aiohttp.MsgType.closed:
                    print("closed")
                    break
                elif msg.tp == aiohttp.MsgType.error:
                    print("error")
                    break

    def disconnect(self):
        self.myWS.close()

    def run(self):
        self.loop.call_later(2, self.disconnect)
        self.loop.run_until_complete(self.connect())

test = Testor()
test.run()

What am I doing wrong and how am I supposed to close websockets sessions ?

I also tried the following, which resulted in the same behaviour :

    def disconnect(self):
        asyncio.wait_for(self.myWS.close(), timeout=2)

Thanks a lot for your great library.

@jettify
Copy link
Member

jettify commented Jan 29, 2016

Do you have any error? Because your code looks good, since you are using async with, connection is closed implicitly on moving out from context manager block.

@Insoleet
Copy link
Contributor Author

It hanks, no error available. If I kill the program, I get :

Traceback (most recent call last):
  File "/home/inso/code/quamash/bug.py", line 37, in <module>
    test.run()
  File "/home/inso/code/quamash/bug.py", line 34, in run
    self.loop.run_until_complete(self.connect())
  File "/home/inso/.pyenv/versions/3.5.0/lib/python3.5/asyncio/base_events.py", line 330, in run_until_complete
    self.run_forever()
  File "/home/inso/.pyenv/versions/3.5.0/lib/python3.5/asyncio/base_events.py", line 301, in run_forever
    self._run_once()
  File "/home/inso/.pyenv/versions/3.5.0/lib/python3.5/asyncio/base_events.py", line 1162, in _run_once
    event_list = self._selector.select(timeout)
  File "/home/inso/.pyenv/versions/3.5.0/lib/python3.5/selectors.py", line 432, in select
    fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt
Unclosed response
client_response: <ClientResponse(ws://metab.ucoin.io:9201/ws/block) [101 Switching Protocols]>
<CIMultiDictProxy('UPGRADE': 'websocket', 'CONNECTION': 'Upgrade', 'SEC-WEBSOCKET-ACCEPT': 'XCx173Vm1g+nCPvBCjngfPkECoI=')>

@Insoleet
Copy link
Contributor Author

Insoleet commented Feb 4, 2016

When closing websockets I got multiple errors in my code :

DEBUG:main:async_exception_handler:Exception handler executing
ERROR:main:async_exception_handler:Task exception was never retrieved
future: <Task finished coro=<connect_peers() done, defined at /home/inso/code/ucoin/sakia/src/sakia/core/net/node.py:518> exception=AttributeError("'NoneType' object has no attribute 'close'",)>
Traceback (most recent call last):
  File "/home/inso/code/ucoin/sakia/src/sakia/core/net/node.py", line 531, in connect_peers
    async for msg in ws:
TypeError: 'async for' requires an object with __aiter__ method, got NoneType

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/inso/.pyenv/versions/3.5.0/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(value)
  File "/home/inso/code/ucoin/sakia/src/sakia/core/net/node.py", line 539, in connect_peers
    break
  File "/home/inso/.pyenv/versions/sakia-env/lib/python3.5/site-packages/aiohttp/client.py", line 530, in __aexit__
    yield from self._resp.close()
AttributeError: 'NoneType' object has no attribute 'close'

My code which opens websockets is :

                self._ws_connection['peer'] = peer_websocket.connect()
                async with self._ws_connection['peer'] as ws:
                    logging.debug("Connected successfully to peer ws : {0}".format(self.pubkey[:5]))
                    async for msg in ws:
                        if msg.tp == aiohttp.MsgType.text:
                            logging.debug("Received a peer : {0}".format(self.pubkey[:5]))
                            peer_data = peer_websocket.parse_text(msg.data)
                            self.refresh_peer_data(peer_data)
                        elif msg.tp == aiohttp.MsgType.closed:
                            break
                        elif msg.tp == aiohttp.MsgType.error:
                            break

And I close it this way :

    def close_ws(self):
        for ws in self._ws_connection.values():
            if ws:
                asyncio.as_completed(ws.close(), timeout=15)

I suppose there is something I'm doing wrong ?

@Insoleet
Copy link
Contributor Author

Insoleet commented Feb 8, 2016

I fixed a lot on things in my code, and now I'm left with this last problem :

I now handle the websocket coroutines instances like this :

    @asyncify
    async def connect_current_block(self):
        if not self._ws_connection['block']:
            try:
                conn_handler = self.endpoint.conn_handler()
                block_websocket = bma.ws.Block(conn_handler)
                ws_connection = block_websocket.connect()
                async with ws_connection as ws:
                    self._ws_connection['block'] = ws
                    logging.debug("Connected successfully to block ws : {0}".format(self.pubkey[:5]))
                    async for msg in ws:
                        if msg.tp == aiohttp.MsgType.text:
                            logging.debug("Received a block : {0}".format(self.pubkey[:5]))
                            block_data = block_websocket.parse_text(msg.data)
                            await self.refresh_block(block_data)
                        elif msg.tp == aiohttp.MsgType.closed:
                            break
                        elif msg.tp == aiohttp.MsgType.error:
                            break

I close my application with the following :

        try:
            loop.run_until_complete(app.stop())
        except asyncio.CancelledError:
            logging.info('CancelledError')

App.stop closes successfully my code coroutines but not the websockets :

    async def close_ws(self):
        for ws in self._ws_connection.values():
            if ws:
                await ws.close()
        logging.debug("closed")

The await passes, I get "closed" printed as expected, but when my application exits, it appears that multiples tasks would not be stopped :

DEBUG:main:async_exception_handler:Exception handler executing
ERROR:main:async_exception_handler:Task was destroyed but it is pending!
task: <Task pending coro=<connect_current_block() running at D:\Dev\Code\sakia\src\sakia\core\net\node.py:332> wait_for=<Future pending cb=[Task._wakeup()]>>
DEBUG:main:async_exception_handler:Exception handler executing
ERROR:main:async_exception_handler:Task was destroyed but it is pending!
task: <Task pending coro=<connect_peers() running at D:\Dev\Code\sakia\src\sakia\core\net\node.py:534> wait_for=<Future pending cb=[Task._wakeup()]>>

It looks like the async with is never interrupted :

async with self._ws_connection['peer'] as ws:

I tried to put breakpoints in the __anext__ of WebSocketClientResponse but it would never be called.

Do you have any example of closing websockets from outside the async with scope ?

@juacker
Copy link

juacker commented Mar 29, 2016

Your task running connect_peers() has not finished, so when you stop the loop, asyncio raises the exception.

After running ws.close(), and before stopping the asyncio loop, you need the loop to execute the task, so it can leave the async with loop and end.

@fafhrd91
Copy link
Member

fafhrd91 commented Feb 1, 2017

is this still relevant?

actually, this is interesting question. lets say we have task that reads incoming messages,
but we can not run close() if other task waits on read(). current behavior is broken.

i think we need to introduce new message type WSMsgType.CLOSING and inject this message into _reader if websocket connection is in _waiting state

@fafhrd91
Copy link
Member

fafhrd91 commented Feb 1, 2017

close() is safe now.

@fafhrd91 fafhrd91 closed this as completed Feb 1, 2017
mpaolini added a commit to feedstock/aiohttp that referenced this issue Aug 16, 2017
Due to improvements done in aio-libs#754, websocket `close()`
can now be called from a separate task.
asvetlov pushed a commit that referenced this issue Aug 24, 2017
* Update server-side websocket close FAQ

Due to improvements done in #754, websocket `close()`
can now be called from a separate task.

* Minor doc fix for websocket receive

* Fix web module usage and use .gather

* Use set() instead of dict in FAQ
@lock
Copy link

lock bot commented Oct 29, 2019

This thread has been automatically locked since there has not been
any recent activity after it was closed. Please open a new issue for
related bugs.

If you feel like there's important points made in this discussion,
please include those exceprts into that new issue.

@lock lock bot added the outdated label Oct 29, 2019
@lock lock bot locked as resolved and limited conversation to collaborators Oct 29, 2019
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
Projects
None yet
Development

No branches or pull requests

4 participants