Skip to content

Commit

Permalink
Accepting back-pressure from slow websocket clients #1367
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikolay Kim committed Jan 31, 2017
1 parent 55b1f0e commit b3c80ee
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 9 deletions.
2 changes: 2 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ CHANGES

- Remove `web.Application` dependency from `web.UrlDispatcher` #1510

- Accepting back-pressure from slow websocket clients #1367

- Do not pause transport during set_parser stage #1211

- Lingering close doesn't terminate before timeout #1559
Expand Down
31 changes: 23 additions & 8 deletions aiohttp/_ws_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ class WSMsgType(IntEnum):
PACK_LEN3 = Struct('!BBQ').pack
PACK_CLOSE_CODE = Struct('!H').pack
MSG_SIZE = 2 ** 14
DEFAULT_LIMIT = 2 ** 16


_WSMessageBase = collections.namedtuple('_WSMessageBase',
Expand Down Expand Up @@ -299,10 +300,13 @@ def parse_frame(buf, continuation=False):

class WebSocketWriter:

def __init__(self, writer, *, use_mask=False, random=random.Random()):
def __init__(self, writer, *,
use_mask=False, limit=DEFAULT_LIMIT, random=random.Random()):
self.writer = writer
self.use_mask = use_mask
self.randrange = random.randrange
self._limit = limit
self._output_size = 0

def _send_frame(self, message, opcode):
"""Send a frame over the websocket with message as its payload."""
Expand All @@ -325,43 +329,53 @@ def _send_frame(self, message, opcode):
mask = mask.to_bytes(4, 'big')
message = _websocket_mask(mask, bytearray(message))
self.writer.write(header + mask + message)
self._output_size += len(header) + len(mask) + len(message)
else:
if len(message) > MSG_SIZE:
self.writer.write(header)
self.writer.write(message)
else:
self.writer.write(header + message)

self._output_size += len(header) + len(message)

if self._output_size > self._limit:
self._output_size = 0
return self.writer.drain()

return ()

def pong(self, message=b''):
"""Send pong message."""
if isinstance(message, str):
message = message.encode('utf-8')
self._send_frame(message, WSMsgType.PONG)
return self._send_frame(message, WSMsgType.PONG)

def ping(self, message=b''):
"""Send ping message."""
if isinstance(message, str):
message = message.encode('utf-8')
self._send_frame(message, WSMsgType.PING)
return self._send_frame(message, WSMsgType.PING)

def send(self, message, binary=False):
"""Send a frame over the websocket with message as its payload."""
if isinstance(message, str):
message = message.encode('utf-8')
if binary:
self._send_frame(message, WSMsgType.BINARY)
return self._send_frame(message, WSMsgType.BINARY)
else:
self._send_frame(message, WSMsgType.TEXT)
return self._send_frame(message, WSMsgType.TEXT)

def close(self, code=1000, message=b''):
"""Close the websocket, sending the specified code and message."""
if isinstance(message, str):
message = message.encode('utf-8')
self._send_frame(
return self._send_frame(
PACK_CLOSE_CODE(code) + message, opcode=WSMsgType.CLOSE)


def do_handshake(method, headers, transport, protocols=()):
def do_handshake(method, headers, transport,
protocols=(), write_buffer_size=DEFAULT_LIMIT):
"""Prepare WebSocket handshake.
It return HTTP response code, response headers, websocket parser,
Expand All @@ -371,6 +385,7 @@ def do_handshake(method, headers, transport, protocols=()):
the returned response headers contain the first protocol in this list
which the server also knows.
`write_buffer_size` max size of write buffer before `drain()` get called.
"""
# WebSocket accepts only GET
if method.upper() != hdrs.METH_GET:
Expand Down Expand Up @@ -434,5 +449,5 @@ def do_handshake(method, headers, transport, protocols=()):
return (101,
response_headers,
WebSocketParser,
WebSocketWriter(transport),
WebSocketWriter(transport, limit=write_buffer_size),
protocol)
1 change: 1 addition & 0 deletions aiohttp/web_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ def close(self, *, code=1000, message=b''):
self._closed = True
try:
self._writer.close(code, message)
yield from self.drain()
except (asyncio.CancelledError, asyncio.TimeoutError):
self._close_code = 1006
raise
Expand Down
10 changes: 10 additions & 0 deletions docs/web_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1109,6 +1109,16 @@ WebSocketResponse
.. seealso:: :ref:`WebSockets handling<aiohttp-web-websockets>`


WebSocketResponse Send Flow Control
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

To enable send flow control you need to treat methods
`ping()`, `pong()`, `send_str()`, `send_bytes()`, `send_json()` as coroutines.
By default write buffer size is set to 64k.

.. versionadded:: 1.3.0


WebSocketReady
^^^^^^^^^^^^^^

Expand Down
6 changes: 5 additions & 1 deletion tests/test_web_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,8 @@ def test_receive_exc_in_reader(make_request, loop, reader):
res = helpers.create_future(loop)
res.set_exception(exc)
reader.read = make_mocked_coro(res)
ws._resp_impl.transport.drain.return_value = helpers.create_future(loop)
ws._resp_impl.transport.drain.return_value.set_result(True)

msg = yield from ws.receive()
assert msg.type == WSMsgType.ERROR
Expand Down Expand Up @@ -444,7 +446,7 @@ def test_concurrent_receive(make_request):


@asyncio.coroutine
def test_close_exc(make_request, reader, loop):
def test_close_exc(make_request, reader, loop, mocker):
req = make_request('GET', '/')

ws = WebSocketResponse()
Expand All @@ -453,6 +455,8 @@ def test_close_exc(make_request, reader, loop):
exc = ValueError()
reader.read.return_value = helpers.create_future(loop)
reader.read.return_value.set_exception(exc)
ws._resp_impl.transport.drain.return_value = helpers.create_future(loop)
ws._resp_impl.transport.drain.return_value.set_result(True)

yield from ws.close()
assert ws.closed
Expand Down

0 comments on commit b3c80ee

Please sign in to comment.