Skip to content

Commit

Permalink
Await in websockets (#2475)
Browse files Browse the repository at this point in the history
* Work on

* Fix tests

* Update docs
  • Loading branch information
asvetlov authored Nov 7, 2017
1 parent 132495a commit c29b630
Show file tree
Hide file tree
Showing 14 changed files with 163 additions and 144 deletions.
2 changes: 2 additions & 0 deletions CHANGES/2475.removal
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
`send_str()`, `send_bytes()`, `send_json()`, `ping()` and `pong()` are
genuine async functions now.
24 changes: 12 additions & 12 deletions aiohttp/client_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def _reset_heartbeat(self):

def _send_heartbeat(self):
if self._heartbeat is not None and not self._closed:
self.ping()
self._writer.ping()

if self._pong_response_cb is not None:
self._pong_response_cb.cancel()
Expand Down Expand Up @@ -106,25 +106,25 @@ def get_extra_info(self, name, default=None):
def exception(self):
return self._exception

def ping(self, message='b'):
self._writer.ping(message)
async def ping(self, message='b'):
await self._writer.ping(message)

def pong(self, message='b'):
self._writer.pong(message)
async def pong(self, message='b'):
await self._writer.pong(message)

def send_str(self, data):
async def send_str(self, data):
if not isinstance(data, str):
raise TypeError('data argument must be str (%r)' % type(data))
return self._writer.send(data, binary=False)
await self._writer.send(data, binary=False)

def send_bytes(self, data):
async def send_bytes(self, data):
if not isinstance(data, (bytes, bytearray, memoryview)):
raise TypeError('data argument must be byte-ish (%r)' %
type(data))
return self._writer.send(data, binary=True)
await self._writer.send(data, binary=True)

def send_json(self, data, *, dumps=json.dumps):
return self.send_str(dumps(data))
async def send_json(self, data, *, dumps=json.dumps):
await self.send_str(dumps(data))

async def close(self, *, code=1000, message=b''):
# we need to break `receive()` cycle first,
Expand Down Expand Up @@ -223,7 +223,7 @@ async def receive(self, timeout=None):
elif msg.type == WSMsgType.CLOSING:
self._closing = True
elif msg.type == WSMsgType.PING and self._autoping:
self.pong(msg.data)
await self.pong(msg.data)
continue
elif msg.type == WSMsgType.PONG and self._autoping:
continue
Expand Down
15 changes: 6 additions & 9 deletions aiohttp/http_writer.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
"""Http related parsers and protocol."""

import asyncio
import collections
import socket
import zlib
Expand Down Expand Up @@ -112,17 +111,16 @@ def set_tcp_cork(self, value):
except OSError:
pass

@asyncio.coroutine
def drain(self):
async def drain(self):
"""Flush the write buffer.
The intended use is to write
w.write(data)
yield from w.drain()
await w.drain()
"""
if self._protocol.transport is not None:
yield from self._protocol._drain_helper()
await self._protocol._drain_helper()


class PayloadWriter(AbstractPayloadWriter):
Expand Down Expand Up @@ -281,17 +279,16 @@ async def write_eof(self, chunk=b''):
self._transport = None
self._stream.release()

@asyncio.coroutine
def drain(self, last=False):
async def drain(self, last=False):
if self._transport is not None:
if self._buffer:
self._transport.write(b''.join(self._buffer))
if not last:
self._buffer.clear()
yield from self._stream.drain()
await self._stream.drain()
else:
# wait for transport
if self._drain_waiter is None:
self._drain_waiter = self.loop.create_future()

yield from self._drain_waiter
await self._drain_waiter
4 changes: 2 additions & 2 deletions aiohttp/payload_streamer.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@
As a simple case, you can upload data from file::
@aiohttp.streamer
def file_sender(writer, file_name=None):
async def file_sender(writer, file_name=None):
with open(file_name, 'rb') as f:
chunk = f.read(2**16)
while chunk:
yield from writer.write(chunk)
await writer.write(chunk)
chunk = f.read(2**16)
Expand Down
26 changes: 13 additions & 13 deletions aiohttp/web_ws.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def _reset_heartbeat(self):

def _send_heartbeat(self):
if self._heartbeat is not None and not self._closed:
self.ping()
self._writer.ping()

if self._pong_response_cb is not None:
self._pong_response_cb.cancel()
Expand Down Expand Up @@ -166,34 +166,34 @@ def compress(self):
def exception(self):
return self._exception

def ping(self, message='b'):
async def ping(self, message='b'):
if self._writer is None:
raise RuntimeError('Call .prepare() first')
self._writer.ping(message)
await self._writer.ping(message)

def pong(self, message='b'):
async def pong(self, message='b'):
# unsolicited pong
if self._writer is None:
raise RuntimeError('Call .prepare() first')
self._writer.pong(message)
await self._writer.pong(message)

def send_str(self, data):
async def send_str(self, data):
if self._writer is None:
raise RuntimeError('Call .prepare() first')
if not isinstance(data, str):
raise TypeError('data argument must be str (%r)' % type(data))
return self._writer.send(data, binary=False)
await self._writer.send(data, binary=False)

def send_bytes(self, data):
async def send_bytes(self, data):
if self._writer is None:
raise RuntimeError('Call .prepare() first')
if not isinstance(data, (bytes, bytearray, memoryview)):
raise TypeError('data argument must be byte-ish (%r)' %
type(data))
return self._writer.send(data, binary=True)
await self._writer.send(data, binary=True)

def send_json(self, data, *, dumps=json.dumps):
return self.send_str(dumps(data))
async def send_json(self, data, *, dumps=json.dumps):
await self.send_str(dumps(data))

async def write_eof(self):
if self._eof_sent:
Expand Down Expand Up @@ -303,7 +303,7 @@ async def receive(self, timeout=None):
elif msg.type == WSMsgType.CLOSING:
self._closing = True
elif msg.type == WSMsgType.PING and self._autoping:
self.pong(msg.data)
await self.pong(msg.data)
continue
elif msg.type == WSMsgType.PONG and self._autoping:
continue
Expand All @@ -330,7 +330,7 @@ async def receive_json(self, *, loads=json.loads, timeout=None):
data = await self.receive_str(timeout=timeout)
return loads(data)

def write(self, data):
async def write(self, data):
raise RuntimeError("Cannot call .write() for websocket")

def __aiter__(self):
Expand Down
30 changes: 29 additions & 1 deletion docs/client_reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1256,14 +1256,30 @@ manually.

Returns exception if any occurs or returns None.

.. method:: ping(message=b'')
.. comethod:: ping(message=b'')

Send :const:`~aiohttp.WSMsgType.PING` to peer.

:param message: optional payload of *ping* message,
:class:`str` (converted to *UTF-8* encoded bytes)
or :class:`bytes`.

.. versionchanged:: 3.0

The method is converted into :term:`coroutine`

.. comethod:: pong(message=b'')

Send :const:`~aiohttp.WSMsgType.PONG` to peer.

:param message: optional payload of *pong* message,
:class:`str` (converted to *UTF-8* encoded bytes)
or :class:`bytes`.

.. versionchanged:: 3.0

The method is converted into :term:`coroutine`

.. comethod:: send_str(data)

Send *data* to peer as :const:`~aiohttp.WSMsgType.TEXT` message.
Expand All @@ -1272,6 +1288,10 @@ manually.

:raise TypeError: if data is not :class:`str`

.. versionchanged:: 3.0

The method is converted into :term:`coroutine`

.. comethod:: send_bytes(data)

Send *data* to peer as :const:`~aiohttp.WSMsgType.BINARY` message.
Expand All @@ -1281,6 +1301,10 @@ manually.
:raise TypeError: if data is not :class:`bytes`,
:class:`bytearray` or :class:`memoryview`.

.. versionchanged:: 3.0

The method is converted into :term:`coroutine`

.. comethod:: send_json(data, *, dumps=json.dumps)

Send *data* to peer as JSON string.
Expand All @@ -1298,6 +1322,10 @@ manually.
:raise TypeError: if value returned by ``dumps(data)`` is not
:class:`str`

.. versionchanged:: 3.0

The method is converted into :term:`coroutine`

.. comethod:: close(*, code=1000, message=b'')

A :ref:`coroutine<coroutine>` that initiates closing handshake by sending
Expand Down
Loading

0 comments on commit c29b630

Please sign in to comment.