Default max_queue blocks websocket cancellation with high traffic #1555

Closed
btschwertfeger opened this issue Nov 25, 2024 · 14 comments

@btschwertfeger

I'm working on the python-kraken-sdk and have run into strange behavior: the websocket connection doesn't get closed properly when it is continuously receiving messages that fill up the queue (see max_queue: https://websockets.readthedocs.io/en/stable/reference/asyncio/client.html#module-websockets.asyncio.client).

This happens locally (Ubuntu 24.04, Python 3.11.9) and in CI (GitHub Actions, windows-latest, ubuntu-latest, py3.11 and py3.12, e.g. https://github.com/btschwertfeger/python-kraken-sdk/actions/runs/12016162113 and https://github.com/btschwertfeger/python-kraken-sdk/actions/runs/12016429517).

I'm using websockets==14.1.

The following is the log from Ubuntu (other runs look the same when max_queue is left at its default):

INFO     tests.spot.helper:helper.py:97 ["BTC/USD", {"channel": "book", "type": "update", "data": [{"symbol": "BTC/USD", "bids": [{"price": 95398.3, "qty": 0.25}], "asks": [], "checksum": 329006635, "timestamp": "2024-11-25T18:10:51.622559Z"}]}]
DEBUG    websockets.client:protocol.py:745 > CLOSE 1000 (OK) [2 bytes]
DEBUG    websockets.client:protocol.py:175 = connection is CLOSING
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.656589Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.656612Z"}]}' [181 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.660735Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.660780Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.660828Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.661035Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.661093Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.661109Z"}]}' [181 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.665449Z"}]}' [217 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.666942Z"}]}' [217 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.667265Z"}]}' [217 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.667316Z"}]}' [217 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.667336Z"}]}' [217 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.667410Z"}]}' [216 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.669715Z"}]}' [252 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.675411Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.675436Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.675518Z"}]}' [252 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T18:10:51.675575Z"}]}' [181 bytes]
DEBUG    websockets.client:connection.py:799 % sending keepalive ping
=========================== short test summary info ============================
FAILED tests/spot/test_spot_orderbook.py::test_add_book - Failed: Timeout >60.0s

When max_queue is set to None (which the documentation does not recommend), everything is fine:

INFO     tests.spot.helper:helper.py:97 ["BTC/USD", {"channel": "book", "type": "update", "data": [{"symbol": "BTC/USD", "bids": [{"price": 94794.3, "qty": 0.2}], "asks": [], "checksum": 3009784427, "timestamp": "2024-11-25T19:55:47.910197Z"}]}]
DEBUG    websockets.client:protocol.py:745 > CLOSE 1000 (OK) [2 bytes]
DEBUG    websockets.client:protocol.py:175 = connection is CLOSING
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.911987Z"}]}' [217 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.912155Z"}]}' [252 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.912337Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.912350Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.915588Z"}]}' [217 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.918207Z"}]}' [181 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.918814Z"}]}' [181 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.918849Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.918940Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.919616Z"}]}' [252 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.920569Z"}]}' [252 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.922159Z"}]}' [217 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.922502Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.923636Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.929364Z"}]}' [251 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.929945Z"}]}' [217 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.930777Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.930900Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.930995Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.932799Z"}]}' [217 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.932858Z"}]}' [251 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.934082Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.934498Z"}]}' [252 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.934839Z"}]}' [218 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.938315Z"}]}' [217 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.938505Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.938646Z"}]}' [252 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.939445Z"}]}' [217 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.939583Z"}]}' [217 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.940378Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.941481Z"}]}' [252 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.942796Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.947044Z"}]}' [252 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.947067Z"}]}' [251 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.949671Z"}]}' [251 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.952241Z"}]}' [217 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.952430Z"}]}' [182 bytes]
DEBUG    websockets.client:protocol.py:599 < TEXT '{"channel":"book","type":"update","data":[{"sym...25T19:55:47.953434Z"}]}' [252 bytes]
DEBUG    websockets.client:protocol.py:599 < CLOSE 1000 (OK) [2 bytes]
DEBUG    websockets.client:protocol.py:649 < EOF
DEBUG    websockets.client:protocol.py:757 > EOF
DEBUG    websockets.client:protocol.py:175 = connection is CLOSED
DEBUG    websockets.client:connection.py:960 x closing TCP connection
WARNING  kraken.spot.websocket.connectors:connectors.py:259 Connection closed

I'm not familiar with the internals of your websockets implementation, but it doesn't seem to handle connections with high traffic correctly. Proper cancellation should happen in any case, regardless of the size or number of queued messages.

This can be reproduced by running the following:

git clone https://github.com/btschwertfeger/python-kraken-sdk
cd python-kraken-sdk
git switch 
uv venv --python=3.11
source .venv/bin/activate
uv pip install ".[test]"
python3 -m pytest -vv --log-cli-level=DEBUG tests/spot/test_spot_orderbook.py::test_add_book

You may need to run this test multiple times to hit the situation where too many messages arrive during cancellation. An alternative is to add more books to the test case (e.g. await orderbook.add_book(pairs=["BTC/USD", "DOT/USD", "ETH/USD", "MATIC/USD", "BTC/EUR"])).

@aaugustin
Member

This is a plausible bug. If I understand correctly, to reproduce, I need to:

  • saturate the queue (simply by not reading messages received by the connection)
  • close the connection <-- this is where I'm not 100% sure -- can you confirm it's a simple await websocket.close()? Or is it about cancelling recv() in the sense of cancelling an asyncio task?

@aaugustin
Member

As a mitigation, you can use websockets.legacy.client.connect instead of websockets.connect.

See https://websockets.readthedocs.io/en/latest/project/changelog.html and https://websockets.readthedocs.io/en/latest/howto/upgrade.html for context.

@btschwertfeger
Author

btschwertfeger commented Nov 26, 2024

This is a plausible bug. If I understand correctly, to reproduce, I need to:

  • saturate the queue (simply by not reading messages received by the connection)
  • close the connection <-- this is where I'm not 100% sure -- can you confirm it's a simple await websocket.close()? Or is it about cancelling recv() in the sense of cancelling an asyncio task?

Unfortunately I'm not sure about websocket.close() and recv().

I tried debugging it using the following script, debugpy, and VS Code, but so far I haven't had much time and didn't manage to access the "asyncio0" thread. Do you have any recommendations on how to debug it?

# repro.py
import asyncio
import logging
import json
from websockets.asyncio.client import connect

logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)


class MyClient:

    def __init__(self):
        self.keep_alive = True

    async def start(self):
        if hasattr(self, "task") and not self.task.done():
            return
        self.task: asyncio.Task = asyncio.create_task(self.run())

    async def stop(self):
        self.keep_alive = False
        if hasattr(self, "task") and not self.task.done():
            await self.task

    async def run(self):

        event: asyncio.Event = asyncio.Event()
        async with connect(
            "wss://ws.kraken.com/v2",
            ping_interval=30,
            # max_queue=None,  # having this enabled doesn't cause problems
        ) as socket:

            if not event.is_set():
                # subscribe to some orderbook events
                # also try to reduce "depth" to 10 and just take DOT/EUR  
                await socket.send(
                    json.dumps(
                        {
                            "method": "subscribe",
                            "params": {
                                "channel": "book",
                                "symbol": [
                                    "BTC/USD",
                                    "DOT/USD",
                                    "ETH/USD",
                                    "MATIC/USD",
                                    "BTC/EUR",
                                ],
                                "depth": 100
                            },
                        }
                    )
                )
                event.set()

            while self.keep_alive:
                try:
                    _message = await asyncio.wait_for(socket.recv(), timeout=10)
                except TimeoutError:
                    pass
                except asyncio.CancelledError:
                    self.keep_alive = False
                else:
                    try:
                        message = json.loads(_message)
                    except ValueError:
                        pass
                    else:
                        pass

    async def __aenter__(self):
        await self.start()
        return self

    async def __aexit__(self, *args, **kwargs):
        await self.stop()


async def main():
    async with MyClient():
        await asyncio.sleep(2)


if __name__ == "__main__":
    asyncio.run(main())

BTW: Thanks for the note about the legacy asyncio stuff, but I just set max_queue=None until this is resolved.

@btschwertfeger
Author

I opened a pull request that addresses the issue: #1556.

@aaugustin
Member

Thank you, that helps.

@aaugustin
Member

To make the best use of your time, I recommend that you wait until I have time to review the PR before doing more work (e.g. it isn't useful to polish the code and make CI green if we decide to take a different approach to fixing the issue).

@aaugustin
Member

I could reproduce the problem with the following scripts:

Server

import asyncio
from websockets.asyncio.server import serve


async def count(websocket):
    for i in range(20):
        await websocket.send(str(i))


async def main():
    async with serve(count, "localhost", 8765) as server:
        await server.serve_forever()


if __name__ == "__main__":
    asyncio.run(main())

Client

import time
from websockets.sync.client import connect


def block():
    ws = connect("ws://localhost:8765", close_timeout=1)
    time.sleep(1)
    print("closing")
    ws.close()


if __name__ == "__main__":
    block()

In the client, ws.close() never returns, while it should time out after 1 second.

@aaugustin
Member

I tested if the asyncio implementation also suffers from this issue. It doesn't.

@aaugustin
Member

The correct behavior for close() in this case is to time out: the incoming buffer is saturated, the closing handshake never completes properly, and websockets gives up after close_timeout.
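
The expected behavior can be illustrated with plain asyncio. This is a minimal sketch and not the websockets implementation: handshake_done is a hypothetical event standing in for "the peer's close frame was read", and close_with_timeout is an illustrative helper name.

```python
import asyncio


async def close_with_timeout(handshake_done: asyncio.Event, close_timeout: float) -> bool:
    """Wait for the closing handshake, but give up after close_timeout seconds.

    Returns True if the handshake completed, False if waiting timed out.
    Hypothetical helper for illustration; not part of websockets.
    """
    try:
        await asyncio.wait_for(handshake_done.wait(), timeout=close_timeout)
    except asyncio.TimeoutError:
        # The peer's close frame was never read (for example because it is
        # stuck behind unread data), so stop waiting and give up.
        return False
    return True


async def demo() -> tuple[bool, bool]:
    # Case 1: the close frame is never read, so closing has to time out.
    stuck = asyncio.Event()
    stuck_result = await close_with_timeout(stuck, close_timeout=0.05)

    # Case 2: the close frame is read promptly, so closing returns normally.
    prompt = asyncio.Event()
    prompt.set()
    prompt_result = await close_with_timeout(prompt, close_timeout=0.05)
    return stuck_result, prompt_result
```

In this model, the bug corresponds to the wait never being bounded, so the caller hangs instead of getting the timed-out result.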

@aaugustin aaugustin reopened this Jan 11, 2025
@aaugustin
Member

Oops - I just fixed a related issue in the threading implementation but I missed that this one affects the asyncio implementation. Reopening.

@aaugustin
Member

I could reproduce the problem with the script you provided above.

At first sight:

  • There's clearly a bug: websockets should time out according to close_timeout in this scenario and it doesn't.
  • The structure of your program is unnecessarily complex. The bug wouldn't manifest if you used a while True: ... loop instead of while self.keep_alive: ... and you exited the loop:
    • either by canceling self.task (which would raise CancelledError in self.task)
    • or by closing the WebSocket connection (which would raise ConnectionClosed in self.task)
  • The default value of max_queue is too small for this server. Since it sends small messages, you can safely set max_queue to a higher value and you could get slightly better performance. There's a real risk that the server sends messages faster than you can read them so I recommend against disabling it entirely.
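
The simpler structure suggested in the second point could look roughly like the sketch below. It uses only asyncio so that it runs without a server: FakeSocket is a hypothetical stand-in for a real connection, and consume and demo are illustrative names. Only the cancellation exit path is shown; with a real connection, closing it would end the loop by raising ConnectionClosed from recv() instead.

```python
import asyncio


class FakeSocket:
    """Hypothetical stand-in for a WebSocket connection, for illustration only."""

    async def recv(self) -> str:
        await asyncio.sleep(0.01)  # pretend a message arrives every 10 ms
        return "{}"


async def consume(socket, handled: list) -> None:
    # No keep_alive flag: the loop runs until the task is cancelled or
    # recv() raises because the connection was closed.
    while True:
        message = await socket.recv()
        handled.append(message)


async def demo() -> tuple[bool, int]:
    handled: list = []
    task = asyncio.create_task(consume(FakeSocket(), handled))
    await asyncio.sleep(0.05)  # let a few messages arrive
    task.cancel()  # raises CancelledError inside consume()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return task.cancelled(), len(handled)
```

Because the loop never catches CancelledError, cancellation propagates immediately and there is no flag that has to be re-checked between receives.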

@aaugustin
Member

aaugustin commented Jan 14, 2025

EDIT - incorrect analysis removed.

@newton108

newton108 commented Jan 15, 2025

I also encountered this issue in the async version of the client (connecting to a different server). Everything is fine when max_queue=None; otherwise, the connection cannot be closed if the feed sends messages too fast. My conjecture is that the server's closing handshake response is discarded when the queue is full. If that's the case, maybe we could inspect the type of incoming messages before discarding them, even when the queue is full?

@aaugustin
Member

It's not discarded; it's simply not read due to backpressure. Indeed, websockets stops reading from the network when it's receiving data faster than the application can process it.
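
A simplified model of this backpressure, using only a bounded asyncio.Queue, is sketched below. It is not how websockets is implemented internally; the CLOSE marker, the network list, and the reader and demo names are assumptions made for illustration. The point is that the close frame is still there, just queued behind data that nobody is reading.

```python
import asyncio

CLOSE = "CLOSE"  # marker standing in for the peer's close frame


async def reader(network: list, queue: asyncio.Queue) -> None:
    # Moves frames from the "network" into the bounded queue. put() blocks
    # while the queue is full, which models backpressure: nothing further
    # is read, including a close frame waiting behind the data.
    for frame in network:
        await queue.put(frame)


async def demo(max_queue: int = 4) -> tuple[bool, bool]:
    network = [f"msg{i}" for i in range(10)] + [CLOSE]
    queue: asyncio.Queue = asyncio.Queue(maxsize=max_queue)
    task = asyncio.create_task(reader(network, queue))

    # The application does not call get() yet, so the reader stalls.
    await asyncio.sleep(0.05)
    stalled = queue.full() and not task.done()

    # Once the application drains the queue, the close frame is reached.
    seen_close = False
    while not seen_close:
        seen_close = (await queue.get()) == CLOSE
    await task
    return stalled, seen_close
```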

By setting max_queue to None, you create an unbounded buffer in websockets, which is a bad idea if you cannot keep up with the rate at which you receive data. If the messages are typically 1 kB, you can set it to 1000; the buffer is then at most 1 MB, which is large but not unbounded.
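
The sizing arithmetic can be written down as a small helper. This is only a rough estimate under the assumption that messages are all close to the typical size; the function names are hypothetical and not part of websockets.

```python
def buffer_bound_bytes(max_queue: int, typical_message_bytes: int) -> int:
    """Rough upper bound on buffered payload: queue length times message size."""
    return max_queue * typical_message_bytes


def max_queue_for_budget(budget_bytes: int, typical_message_bytes: int) -> int:
    """Largest queue length whose rough bound fits in budget_bytes (at least 1)."""
    return max(1, budget_bytes // typical_message_bytes)
```

For example, buffer_bound_bytes(1000, 1000) gives 1,000,000 bytes, matching the 1 MB figure, and the value from max_queue_for_budget would be the one passed as max_queue to connect().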

4 participants
@aaugustin @newton108 @btschwertfeger and others