body thread helpers #3205

Merged · 33 commits · Nov 30, 2018
Commits (33) · the file diff below shows changes from 15 commits
bc42974  Merge remote-tracking branch 'aio-libs/master' (thehesiod, Aug 20, 2018)
fda17d8  ensure large body prep processes run in a worker thread (thehesiod, Aug 20, 2018)
bf156dc  add feature file (thehesiod, Aug 20, 2018)
4e6d0be  flake fix (thehesiod, Aug 20, 2018)
1de3235  fix based on review and add tests (thehesiod, Aug 20, 2018)
b921a22  fix tests and changes based on review (thehesiod, Aug 20, 2018)
a8b570d  switch to new attribute (thehesiod, Aug 20, 2018)
1474167  fix and add test (thehesiod, Aug 20, 2018)
391f995  bugfixes (thehesiod, Aug 20, 2018)
1f421e7  fix warning (thehesiod, Aug 21, 2018)
56d8235  fix import order (thehesiod, Aug 21, 2018)
32c8171  fix unittests (thehesiod, Aug 21, 2018)
82aac11  fix coverage by fixing tests (thehesiod, Aug 22, 2018)
82b3119  add another sub-test (thehesiod, Aug 22, 2018)
ab71cea  Merge remote-tracking branch 'aio-libs/master' into thehesiod-thread-… (thehesiod, Sep 24, 2018)
d209ddb  Merge remote-tracking branch 'aio-libs/master' into thehesiod-thread-… (thehesiod, Oct 4, 2018)
d07be7d  remove class which never worked correctly (thehesiod, Oct 4, 2018)
1445324  fix imports (thehesiod, Oct 4, 2018)
2f50597  Merge remote-tracking branch 'aio-libs/master' into thehesiod-thread-… (thehesiod, Oct 8, 2018)
1a501dd  revert change? (thehesiod, Oct 8, 2018)
58b6f27  reword (thehesiod, Oct 8, 2018)
2ce23cb  Merge branch 'master' into thehesiod-thread-helpers (asvetlov, Nov 24, 2018)
a0cf83b  Update web_response.py (asvetlov, Nov 24, 2018)
33a4818  Update web_response.py (asvetlov, Nov 24, 2018)
9f31a81  Merge branch 'master' into thehesiod-thread-helpers (asvetlov, Nov 24, 2018)
274c8bc  Update web_response.py (asvetlov, Nov 25, 2018)
781ec52  fix merge, rename param and add zlib_executor param (thehesiod, Nov 26, 2018)
b18bbe4  fixes (thehesiod, Nov 26, 2018)
eb8f03e  mypy fixes (thehesiod, Nov 26, 2018)
a4a09a1  fix import order (thehesiod, Nov 27, 2018)
3ae1854  merge fix (thehesiod, Nov 27, 2018)
6799795  feedback from review (thehesiod, Nov 30, 2018)
fc57bcb  feedback from review (thehesiod, Nov 30, 2018)
1 change: 1 addition & 0 deletions CHANGES/3205.feature
@@ -0,0 +1 @@
Ensure that large compression and JSON serialization jobs run in a background thread to avoid blocking the main thread and potentially triggering health-check failures.
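
A minimal sketch of the idea behind this entry (the helper names below are illustrative, not aiohttp API): both gzip compression and `json.dumps` of a large body are CPU-bound, so handing them to the event loop's executor keeps the loop responsive while they run.

```python
import asyncio
import json
import zlib


def _gzip_bytes(body: bytes) -> bytes:
    # CPU-bound: gzip-compress the whole body in one go.
    compressobj = zlib.compressobj(wbits=16 + zlib.MAX_WBITS)
    return compressobj.compress(body) + compressobj.flush()


async def gzip_off_loop(body: bytes) -> bytes:
    # Run the compression in the default executor so the event loop
    # keeps serving other requests (and health checks) meanwhile.
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, _gzip_bytes, body)


async def dumps_off_loop(data) -> str:
    # Same idea for JSON serialization of a large structure.
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(None, json.dumps, data)
```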
68 changes: 53 additions & 15 deletions aiohttp/web_response.py
@@ -1,3 +1,4 @@
import asyncio
import collections
import datetime
import enum
@@ -18,7 +19,8 @@
from .typedefs import LooseHeaders


__all__ = ('ContentCoding', 'StreamResponse', 'Response', 'json_response')
__all__ = ('ContentCoding', 'StreamResponse', 'Response', 'json_response',
'async_json_response')


class ContentCoding(enum.Enum):
@@ -276,23 +278,23 @@ def _generate_content_type_header(self, CONTENT_TYPE=hdrs.CONTENT_TYPE):
ctype = self._content_type
self.headers[CONTENT_TYPE] = ctype

def _do_start_compression(self, coding):
async def _do_start_compression(self, coding):
if coding != ContentCoding.identity:
self.headers[hdrs.CONTENT_ENCODING] = coding.value
self._payload_writer.enable_compression(coding.value)
# Compressed payload may have different content length,
# remove the header
self._headers.popall(hdrs.CONTENT_LENGTH, None)

def _start_compression(self, request):
async def _start_compression(self, request):
if self._compression_force:
self._do_start_compression(self._compression_force)
await self._do_start_compression(self._compression_force)
else:
accept_encoding = request.headers.get(
hdrs.ACCEPT_ENCODING, '').lower()
for coding in ContentCoding:
if coding.value in accept_encoding:
self._do_start_compression(coding)
await self._do_start_compression(coding)
return

async def prepare(self, request):
@@ -331,7 +333,7 @@ async def _start(self, request,
headers.add(SET_COOKIE, value)

if self._compression:
self._start_compression(request)
await self._start_compression(request)

if self._chunked:
if version != HttpVersion11:
@@ -450,7 +452,7 @@ def __init__(self, *,
text: Optional[str]=None,
headers: Optional[LooseHeaders]=None,
content_type: Optional[str]=None,
charset: Optional[str]=None) -> None:
charset: Optional[str]=None, zlib_thread_size=None) -> None:
if body is not None and text is not None:
raise ValueError("body and text are not allowed together")

@@ -504,6 +506,7 @@ def __init__(self, *,
self.body = body

self._compressed_body = None
self._zlib_thread_size = zlib_thread_size

@property
def body(self):
@@ -590,7 +593,9 @@ def content_length(self):
def content_length(self, value):
raise RuntimeError("Content length is set automatically")

async def write_eof(self):
async def write_eof(self, data=b''):
assert not data

if self._eof_sent:
return
if self._compressed_body is not None:
@@ -609,27 +614,36 @@ async def write_eof(self):
else:
await super().write_eof()

async def _start(self, request):
async def _start(self, request, *args, **kwargs):
if not self._chunked and hdrs.CONTENT_LENGTH not in self._headers:
if not self._body_payload:
if self._body is not None:
self._headers[hdrs.CONTENT_LENGTH] = str(len(self._body))
else:
self._headers[hdrs.CONTENT_LENGTH] = '0'

return await super()._start(request)
return await super()._start(request, *args, **kwargs)

def _do_start_compression(self, coding):
def _compress_body(self, zlib_mode):
compressobj = zlib.compressobj(wbits=zlib_mode)
self._compressed_body = \
compressobj.compress(self._body) + compressobj.flush()

async def _do_start_compression(self, coding):
if self._body_payload or self._chunked:
return super()._do_start_compression(coding)
return await super()._do_start_compression(coding)
if coding != ContentCoding.identity:
# Instead of using _payload_writer.enable_compression,
# compress the whole body
zlib_mode = (16 + zlib.MAX_WBITS
if coding.value == 'gzip' else -zlib.MAX_WBITS)
compressobj = zlib.compressobj(wbits=zlib_mode)
self._compressed_body = compressobj.compress(self._body) +\
compressobj.flush()
if self._zlib_thread_size is not None and \
len(self._body) > self._zlib_thread_size:
await asyncio.get_event_loop().run_in_executor(
[Review thread on the run_in_executor call]

Member: Wouldn't it be better to have a thread pool in place?

Member: I think it would. If someone sets the default executor to a multiprocessing one, they will be very surprised by aiohttp's behaviour. However, an in-place thread pool would create a DoS risk - you need to cap the overall number of threads at some sane value.

Member: A multiprocessing pool as the default executor is deprecated by asyncio anyway: it doesn't work well with APIs like loop.getaddrinfo().

Contributor Author: Perhaps add the ability to pass in a thread pool? This can get complicated later if you want to support multiple pools.

Member: Makes sense. Feel free to update the PR.

Contributor Author: @asvetlov added.

(A sketch of a bounded, caller-supplied executor follows this diff hunk.)

None, self._compress_body, zlib_mode)
else:
self._compress_body(zlib_mode)

self._headers[hdrs.CONTENT_ENCODING] = coding.value
self._headers[hdrs.CONTENT_LENGTH] = \
str(len(self._compressed_body))
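
A sketch of the direction the review thread above converges on, and which a later commit ("fix merge, rename param and add zlib_executor param") adopts: rather than relying on whatever the loop's default executor happens to be, the caller supplies a small, bounded thread pool, which also caps how many compression threads can run at once. The wiring below is illustrative, not the merged aiohttp code.

```python
import asyncio
import zlib
from concurrent.futures import ThreadPoolExecutor

# Bounded pool: behaviour no longer depends on the default executor (which could
# even be a process pool), and the number of compression threads is capped.
zlib_executor = ThreadPoolExecutor(max_workers=2, thread_name_prefix="zlib")


def _gzip_bytes(body: bytes) -> bytes:
    compressobj = zlib.compressobj(wbits=16 + zlib.MAX_WBITS)
    return compressobj.compress(body) + compressobj.flush()


async def compress_in_pool(body: bytes) -> bytes:
    # Passing an explicit executor instead of None keeps compression in a
    # dedicated, bounded thread pool rather than the loop's default executor.
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(zlib_executor, _gzip_bytes, body)
```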
@@ -647,3 +661,27 @@ def json_response(data=sentinel, *, text=None, body=None, status=200,
text = dumps(data)
return Response(text=text, body=body, status=status, reason=reason,
headers=headers, content_type=content_type)


async def async_json_response(data=sentinel, *, text=None, body=None,
status=200, reason=None, headers=None,
content_type='application/json',
dumps=json.dumps, executor_body_size=None):
if data is not sentinel:
if text or body:
raise ValueError(
"only one of data, text, or body should be specified"
)
else:
if asyncio.iscoroutinefunction(dumps):
text = await dumps(data)
elif executor_body_size is not None and \
len(data) > executor_body_size:
[Review thread on the executor_body_size check]

Member: I was curious to try your patch "at home" and found that this check actually ruins the whole idea. Imagine the following data: {"users": [{"id": 1, "name": ..., ...50 more fields}, ..., ...about 100 more items]}. In this case len() will always return 1, but the response data is big enough.

Instead, I was able to do an async dump easily in the places where I am sure big data will happen and the async dump will be useful. On my application's side that is easy: I know my data and how big it can get. On the aiohttp side it is hard to figure out without tricky inspection, which may negate all the profit of this feature.

Member: Maybe try a (recursive) sizeof?

Member: sizeof would require somehow converting an object's memory usage into a string length, which wouldn't be trivial. And all this complexity just to save users from explicitly defining the behaviour on their side with two lines of code?

Member: Yeah, I agree that users should just opt in by themselves. As for converting to string length, that isn't needed - the bytes the object occupies in memory would be enough :)

Contributor Author: hah! wow, that was a huge oversight, thanks... However, thinking about it, I still need something like a sizeof: I don't know how large the object is without doing the dump ;) I'll have to think about this for a bit.

Contributor Author: Removing this from this PR, thanks guys and sorry for the distraction... It remains a problem to be solved, though: given a dict, programmatically determine whether it should be dumped in a thread or not. I suppose you just have to decide up front whether you always want to run it in a thread.

Member: No worries :) One possible solution is to recursively accumulate the size of the items in the nested object, stop once some threshold is reached, and use that as the flag that compression is needed - as an optimization.

(A sketch of this size heuristic follows this file's diff, below.)

loop = asyncio.get_event_loop()
text = await loop.run_in_executor(None, dumps, data)
else:
text = dumps(data)

return Response(text=text, body=body, status=status, reason=reason,
headers=headers, content_type=content_type,
zlib_thread_size=executor_body_size)
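
A hedged illustration of the heuristic suggested in the final comment of the review thread above (not part of this PR - the author removed the `len(data)` check instead): walk the nested object, accumulate a rough byte size, and bail out as soon as a threshold is crossed. `is_roughly_large` is an illustrative name, not an aiohttp API.

```python
import sys


def is_roughly_large(obj, threshold: int) -> bool:
    # Rough, early-exiting size estimate of a nested structure. It measures
    # in-memory bytes via sys.getsizeof, not the eventual JSON length, so it is
    # only a heuristic for deciding whether to serialize in a worker thread.
    budget = threshold
    stack = [obj]
    while stack:
        item = stack.pop()
        budget -= sys.getsizeof(item)
        if budget <= 0:
            return True
        if isinstance(item, dict):
            stack.extend(item.keys())
            stack.extend(item.values())
        elif isinstance(item, (list, tuple, set, frozenset)):
            stack.extend(item)
    return False
```

With such a helper a caller could decide per response whether `json.dumps` should go through `run_in_executor`; the review above concludes that this decision is best left to the application, which knows its own data.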
4 changes: 3 additions & 1 deletion docs/web_reference.rst
@@ -805,7 +805,7 @@ Response
^^^^^^^^

.. class:: Response(*, body=None, status=200, reason=None, text=None, \
headers=None, content_type=None, charset=None)
headers=None, content_type=None, charset=None, zlib_thread_size=sentinel)

The most usable response class, inherited from :class:`StreamResponse`.

@@ -830,6 +832,8 @@
:param str charset: response's charset. ``'utf-8'`` if *text* is
passed also, ``None`` otherwise.

:param int zlib_thread_size: length of the body in bytes above which zlib
   compression of the body is performed in the default executor

.. attribute:: body

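
For reference, a short usage sketch of the parameter as documented in this revision of the PR (here still named `zlib_thread_size`; a later commit renames it and adds a `zlib_executor` parameter). Compression is still opted into via `enable_compression()`; bodies longer than the threshold are compressed in the default executor.

```python
from aiohttp import web


async def handler(request: web.Request) -> web.Response:
    body = b"answer" * 100_000  # well above the 1 KiB threshold below
    resp = web.Response(body=body, zlib_thread_size=1024)
    # gzip-compress the body; because len(body) > zlib_thread_size,
    # the compression runs in the default executor, off the event loop.
    resp.enable_compression(web.ContentCoding.gzip)
    return resp
```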
44 changes: 43 additions & 1 deletion tests/test_web_response.py
@@ -1,5 +1,6 @@
import collections
import datetime
import gzip
import json
import re
from unittest import mock
@@ -10,7 +11,8 @@
from aiohttp import HttpVersion, HttpVersion10, HttpVersion11, hdrs, signals
from aiohttp.payload import BytesPayload
from aiohttp.test_utils import make_mocked_coro, make_mocked_request
from aiohttp.web import ContentCoding, Response, StreamResponse, json_response
from aiohttp.web import (ContentCoding, Response, StreamResponse,
async_json_response, json_response)


def make_request(method, path, headers=CIMultiDict(),
@@ -416,6 +418,18 @@ async def test_force_compression_no_accept_gzip():
assert 'gzip' == resp.headers.get(hdrs.CONTENT_ENCODING)


async def test_change_content_threaded_compression_enabled():
req = make_request('GET', '/')
body_thread_size = 1024
body = b'answer' * body_thread_size
resp = Response(body=body,
zlib_thread_size=body_thread_size)
resp.enable_compression(ContentCoding.gzip)

await resp.prepare(req)
assert gzip.decompress(resp._compressed_body) == body


async def test_change_content_length_if_compression_enabled():
req = make_request('GET', '/')
resp = Response(body=b'answer')
@@ -1103,6 +1117,34 @@ def test_response_with_content_length_header_without_body():
assert resp.content_length == 123


async def test_async_json_small_response():
text = 'jaysawn'
resp = await async_json_response(text=json.dumps(text))
assert resp.text == json.dumps(text)

resp = await async_json_response(text)
assert resp.text == json.dumps(text)

with pytest.raises(ValueError):
await async_json_response(text, body=text)


async def test_async_json_large_response():
cutoff_length = 1024
text = 'ja' * cutoff_length
resp = await async_json_response(text, executor_body_size=cutoff_length)
assert resp.text == json.dumps(text)


async def test_async_json_coro_response():
async def dumps(data):
return json.dumps(data)

text = 'jaysawn'
resp = await async_json_response(text, dumps=dumps)
assert resp.text == json.dumps(text)


class TestJSONResponse:

def test_content_type_is_application_json_by_default(self):
2 changes: 1 addition & 1 deletion vendor/http-parser
Submodule http-parser updated 3 files
+34 −56 http_parser.c
+0 −3 http_parser.h
+84 −100 test.c