body thread helpers #3205

Merged
merged 33 commits on Nov 30, 2018
Commits (33)
bc42974
Merge remote-tracking branch 'aio-libs/master'
thehesiod Aug 20, 2018
fda17d8
ensure large body prep processes run in a worker thread
thehesiod Aug 20, 2018
bf156dc
add feature file
thehesiod Aug 20, 2018
4e6d0be
flake fix
thehesiod Aug 20, 2018
1de3235
fix based on review and add tests
thehesiod Aug 20, 2018
b921a22
fix tests and changes based on review
thehesiod Aug 20, 2018
a8b570d
switch to new attribute
thehesiod Aug 20, 2018
1474167
fix and add test
thehesiod Aug 20, 2018
391f995
bugfixes
thehesiod Aug 20, 2018
1f421e7
fix warning
thehesiod Aug 21, 2018
56d8235
fix import order
thehesiod Aug 21, 2018
32c8171
fix unittests
thehesiod Aug 21, 2018
82aac11
fix coverage by fixing tests
thehesiod Aug 22, 2018
82b3119
add another sub-test
thehesiod Aug 22, 2018
ab71cea
Merge remote-tracking branch 'aio-libs/master' into thehesiod-thread-…
thehesiod Sep 24, 2018
d209ddb
Merge remote-tracking branch 'aio-libs/master' into thehesiod-thread-…
thehesiod Oct 4, 2018
d07be7d
remove class which never worked correctly
thehesiod Oct 4, 2018
1445324
fix imports
thehesiod Oct 4, 2018
2f50597
Merge remote-tracking branch 'aio-libs/master' into thehesiod-thread-…
thehesiod Oct 8, 2018
1a501dd
revert change?
thehesiod Oct 8, 2018
58b6f27
reword
thehesiod Oct 8, 2018
2ce23cb
Merge branch 'master' into thehesiod-thread-helpers
asvetlov Nov 24, 2018
a0cf83b
Update web_response.py
asvetlov Nov 24, 2018
33a4818
Update web_response.py
asvetlov Nov 24, 2018
9f31a81
Merge branch 'master' into thehesiod-thread-helpers
asvetlov Nov 24, 2018
274c8bc
Update web_response.py
asvetlov Nov 25, 2018
781ec52
fix merge, rename param and add zlib_executor param
thehesiod Nov 26, 2018
b18bbe4
fixes
thehesiod Nov 26, 2018
eb8f03e
mypy fixes
thehesiod Nov 26, 2018
a4a09a1
fix import order
thehesiod Nov 27, 2018
3ae1854
merge fix
thehesiod Nov 27, 2018
6799795
feedback from review
thehesiod Nov 30, 2018
fc57bcb
feedback from review
thehesiod Nov 30, 2018
1 change: 1 addition & 0 deletions CHANGES/3205.feature
@@ -0,0 +1 @@
Add the zlib_executor_size Response parameter, allowing compression of large bodies to run in a background executor so it does not block the main thread and potentially trigger health check failures.
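
As a quick, hypothetical illustration of the new parameter (not part of this change set), a handler could opt large responses into off-loop compression like so; ``generate_report_bytes`` is an invented placeholder for whatever produces the payload:

```python
from aiohttp import web

async def big_report(request: web.Request) -> web.Response:
    body = generate_report_bytes()  # hypothetical helper returning a large bytes payload
    resp = web.Response(body=body,
                        content_type='application/json',
                        zlib_executor_size=1024 * 1024)  # bodies over 1 MiB are compressed off-loop
    resp.enable_compression()  # actual compression still depends on the client's Accept-Encoding
    return resp
```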
42 changes: 31 additions & 11 deletions aiohttp/web_response.py
@@ -7,6 +7,7 @@
import time
import warnings
import zlib
from concurrent.futures import Executor
from email.utils import parsedate
from http.cookies import SimpleCookie
from typing import (TYPE_CHECKING, Any, Dict, Iterator, Mapping, # noqa
@@ -310,7 +311,7 @@ def _generate_content_type_header(
ctype = self._content_type
self._headers[CONTENT_TYPE] = ctype

def _do_start_compression(self, coding: ContentCoding) -> None:
async def _do_start_compression(self, coding: ContentCoding) -> None:
if coding != ContentCoding.identity:
assert self._payload_writer is not None
self._headers[hdrs.CONTENT_ENCODING] = coding.value
@@ -319,15 +320,15 @@ def _do_start_compression(self, coding: ContentCoding) -> None:
# remove the header
self._headers.popall(hdrs.CONTENT_LENGTH, None)

def _start_compression(self, request: 'BaseRequest') -> None:
async def _start_compression(self, request: 'BaseRequest') -> None:
if self._compression_force:
self._do_start_compression(self._compression_force)
await self._do_start_compression(self._compression_force)
else:
accept_encoding = request.headers.get(
hdrs.ACCEPT_ENCODING, '').lower()
for coding in ContentCoding:
if coding.value in accept_encoding:
self._do_start_compression(coding)
await self._do_start_compression(coding)
return

async def prepare(
@@ -359,7 +360,7 @@ async def _start(self, request: 'BaseRequest') -> AbstractStreamWriter:
headers.add(hdrs.SET_COOKIE, value)

if self._compression:
self._start_compression(request)
await self._start_compression(request)

if self._chunked:
if version != HttpVersion11:
@@ -479,7 +480,9 @@ def __init__(self, *,
text: Optional[str]=None,
headers: Optional[LooseHeaders]=None,
content_type: Optional[str]=None,
charset: Optional[str]=None) -> None:
charset: Optional[str]=None,
zlib_executor_size: Optional[int]=None,
zlib_executor: Optional[Executor]=None) -> None:
if body is not None and text is not None:
raise ValueError("body and text are not allowed together")

Expand Down Expand Up @@ -533,6 +536,8 @@ def __init__(self, *,
self.body = body

self._compressed_body = None # type: Optional[bytes]
self._zlib_executor_size = zlib_executor_size
self._zlib_executor = zlib_executor

@property
def body(self) -> Optional[Union[bytes, Payload]]:
@@ -652,19 +657,34 @@ async def _start(self, request: 'BaseRequest') -> AbstractStreamWriter:

return await super()._start(request)

def _do_start_compression(self, coding: ContentCoding) -> None:
def _compress_body(self, zlib_mode: int) -> None:
compressobj = zlib.compressobj(wbits=zlib_mode)
body_in = self._body
assert body_in is not None
self._compressed_body = \
compressobj.compress(body_in) + compressobj.flush()

async def _do_start_compression(self, coding: ContentCoding) -> None:
if self._body_payload or self._chunked:
return super()._do_start_compression(coding)
return await super()._do_start_compression(coding)

if coding != ContentCoding.identity:
# Instead of using _payload_writer.enable_compression,
# compress the whole body
zlib_mode = (16 + zlib.MAX_WBITS
if coding == ContentCoding.gzip else -zlib.MAX_WBITS)
compressobj = zlib.compressobj(wbits=zlib_mode)
body_in = self._body
assert body_in is not None
body_out = compressobj.compress(body_in) + compressobj.flush()
self._compressed_body = body_out
if self._zlib_executor_size is not None and \
len(body_in) > self._zlib_executor_size:
await asyncio.get_event_loop().run_in_executor(
Review comments on this line:

Member: Wouldn't it be better to have a thread pool in place?

Member: I think it would. If someone sets the default executor to a multiprocessing one, they will be very surprised by aiohttp's behaviour. However, an in-place thread pool would create a DoS risk: you need to cap the overall number of threads at some sane value.

Member: A multiprocessing pool as the default executor is deprecated by asyncio: it doesn't work well with APIs like loop.getaddrinfo() anyway.

Contributor Author: Perhaps add the ability to pass in a thread pool? This can get complicated later if you want to support multiple pools.

Member: Makes sense. Feel free to update the PR.

Contributor Author: @asvetlov added

self._zlib_executor, self._compress_body, zlib_mode)
else:
self._compress_body(zlib_mode)

body_out = self._compressed_body
assert body_out is not None

self._headers[hdrs.CONTENT_ENCODING] = coding.value
self._headers[hdrs.CONTENT_LENGTH] = str(len(body_out))

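The core of the change is the threshold check around run_in_executor above. A self-contained sketch of the same pattern, independent of aiohttp (the names ``compress_body`` and ``_gzip_compress`` are illustrative, not taken from the PR):

```python
import asyncio
import zlib
from concurrent.futures import Executor
from typing import Optional


def _gzip_compress(body: bytes) -> bytes:
    # wbits=16 + zlib.MAX_WBITS selects the gzip container, as in the diff above.
    compressobj = zlib.compressobj(wbits=16 + zlib.MAX_WBITS)
    return compressobj.compress(body) + compressobj.flush()


async def compress_body(body: bytes,
                        threshold: Optional[int] = None,
                        executor: Optional[Executor] = None) -> bytes:
    # Small bodies are compressed inline; bodies above the threshold are handed to
    # an executor so the event loop keeps serving other requests and health checks.
    if threshold is not None and len(body) > threshold:
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(executor, _gzip_compress, body)
    return _gzip_compress(body)
```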
12 changes: 11 additions & 1 deletion docs/web_reference.rst
@@ -805,7 +805,8 @@ Response
^^^^^^^^

.. class:: Response(*, body=None, status=200, reason=None, text=None, \
headers=None, content_type=None, charset=None)
headers=None, content_type=None, charset=None, \
zlib_executor_size=None, zlib_executor=None)

The most usable response class, inherited from :class:`StreamResponse`.

@@ -830,6 +831,15 @@ Response
:param str charset: response's charset. ``'utf-8'`` if *text* is
passed also, ``None`` otherwise.

:param int zlib_executor_size: length in bytes which will trigger zlib compression
of the body to happen in an executor

Review comments on this line:

Member: Please insert a .. versionadded:: 3.5 directive here.

Member: aiohttp 3.5.0 has not been released yet, only 3.5.0 alpha 0, so please use just 3.5; the last zero should be omitted.

.. versionadded:: 3.5

:param Executor zlib_executor: executor to use for zlib compression

.. versionadded:: 3.5


.. attribute:: body

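Following the reviewers' concern about unbounded threads, one reasonable deployment pattern (an assumption on my part, not prescribed by the PR) is a single bounded pool shared by all handlers via the new ``zlib_executor`` parameter:

```python
from concurrent.futures import ThreadPoolExecutor

from aiohttp import web

# One bounded pool shared by every handler caps the total number of compression threads.
ZLIB_POOL = ThreadPoolExecutor(max_workers=4)


async def handler(request: web.Request) -> web.Response:
    payload = b'x' * (2 * 1024 * 1024)  # stand-in for a large response body
    resp = web.Response(body=payload,
                        zlib_executor_size=1024 * 1024,  # only offload bodies larger than 1 MiB
                        zlib_executor=ZLIB_POOL)
    resp.enable_compression()
    return resp


app = web.Application()
app.router.add_get('/', handler)

if __name__ == '__main__':
    web.run_app(app)
```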
28 changes: 28 additions & 0 deletions tests/test_web_response.py
@@ -1,7 +1,9 @@
import collections
import datetime
import gzip
import json
import re
from concurrent.futures import ThreadPoolExecutor
from unittest import mock

import pytest
@@ -418,6 +420,32 @@ async def test_force_compression_no_accept_gzip() -> None:
assert 'gzip' == resp.headers.get(hdrs.CONTENT_ENCODING)


async def test_change_content_threaded_compression_enabled() -> None:
req = make_request('GET', '/')
body_thread_size = 1024
body = b'answer' * body_thread_size
resp = Response(body=body,
zlib_executor_size=body_thread_size)
resp.enable_compression(ContentCoding.gzip)

await resp.prepare(req)
assert gzip.decompress(resp._compressed_body) == body


async def test_change_content_threaded_compression_enabled_explicit() -> None:
req = make_request('GET', '/')
body_thread_size = 1024
body = b'answer' * body_thread_size
with ThreadPoolExecutor(1) as executor:
resp = Response(body=body,
zlib_executor_size=body_thread_size,
zlib_executor=executor)
resp.enable_compression(ContentCoding.gzip)

await resp.prepare(req)
assert gzip.decompress(resp._compressed_body) == body


async def test_change_content_length_if_compression_enabled() -> None:
req = make_request('GET', '/')
resp = Response(body=b'answer')