Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

body thread helpers #3205

Merged
merged 33 commits into from
Nov 30, 2018
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
bc42974
Merge remote-tracking branch 'aio-libs/master'
thehesiod Aug 20, 2018
fda17d8
ensure large body prep processes run in a worker thread
thehesiod Aug 20, 2018
bf156dc
add feature file
thehesiod Aug 20, 2018
4e6d0be
flake fix
thehesiod Aug 20, 2018
1de3235
fix based on review and add tests
thehesiod Aug 20, 2018
b921a22
fix tests and changes based on review
thehesiod Aug 20, 2018
a8b570d
switch to new attribute
thehesiod Aug 20, 2018
1474167
fix and add test
thehesiod Aug 20, 2018
391f995
bugfixes
thehesiod Aug 20, 2018
1f421e7
fix warning
thehesiod Aug 21, 2018
56d8235
fix import order
thehesiod Aug 21, 2018
32c8171
fix unittests
thehesiod Aug 21, 2018
82aac11
fix coverage by fixing tests
thehesiod Aug 22, 2018
82b3119
add another sub-test
thehesiod Aug 22, 2018
ab71cea
Merge remote-tracking branch 'aio-libs/master' into thehesiod-thread-…
thehesiod Sep 24, 2018
d209ddb
Merge remote-tracking branch 'aio-libs/master' into thehesiod-thread-…
thehesiod Oct 4, 2018
d07be7d
remove class which never worked correctly
thehesiod Oct 4, 2018
1445324
fix imports
thehesiod Oct 4, 2018
2f50597
Merge remote-tracking branch 'aio-libs/master' into thehesiod-thread-…
thehesiod Oct 8, 2018
1a501dd
revert change?
thehesiod Oct 8, 2018
58b6f27
reword
thehesiod Oct 8, 2018
2ce23cb
Merge branch 'master' into thehesiod-thread-helpers
asvetlov Nov 24, 2018
a0cf83b
Update web_response.py
asvetlov Nov 24, 2018
33a4818
Update web_response.py
asvetlov Nov 24, 2018
9f31a81
Merge branch 'master' into thehesiod-thread-helpers
asvetlov Nov 24, 2018
274c8bc
Update web_response.py
asvetlov Nov 25, 2018
781ec52
fix merge, rename param and add zlib_executor param
thehesiod Nov 26, 2018
b18bbe4
fixes
thehesiod Nov 26, 2018
eb8f03e
mypy fixes
thehesiod Nov 26, 2018
a4a09a1
fix import order
thehesiod Nov 27, 2018
3ae1854
merge fix
thehesiod Nov 27, 2018
6799795
feedback from review
thehesiod Nov 30, 2018
fc57bcb
feedback from review
thehesiod Nov 30, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/3205.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Ensure that large compression and JSON-serialization jobs run in a background thread, to avoid blocking the main thread and potentially triggering health-check failures.
61 changes: 51 additions & 10 deletions aiohttp/web_response.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
import collections
import datetime
import enum
Expand Down Expand Up @@ -33,6 +34,10 @@ class ContentCoding(enum.Enum):
# HTTP Response classes
############################################################

# Length in bytes of body which will trigger sync methods to process in a
# thread pool to avoid blocking the main thread.  Bodies at or below this
# size are compressed / serialized inline on the event loop; larger ones
# are handed to loop.run_in_executor() (see _do_start_compression and
# async_json_response below).
_BODY_LENGTH_THREAD_CUTOFF = 1024
asvetlov marked this conversation as resolved.
Show resolved Hide resolved


class StreamResponse(collections.MutableMapping, HeadersMixin):

Expand Down Expand Up @@ -271,23 +276,23 @@ def _generate_content_type_header(self, CONTENT_TYPE=hdrs.CONTENT_TYPE):
ctype = self._content_type
self.headers[CONTENT_TYPE] = ctype

async def _do_start_compression(self, coding):
    """Enable streaming compression on the payload writer for *coding*.

    A no-op for the identity coding.
    """
    if coding == ContentCoding.identity:
        return
    encoding = coding.value
    self.headers[hdrs.CONTENT_ENCODING] = encoding
    self._payload_writer.enable_compression(encoding)
    # The compressed payload's length differs from the original body's,
    # so any previously set Content-Length header is no longer valid.
    self._headers.popall(hdrs.CONTENT_LENGTH, None)

async def _start_compression(self, request):
    """Choose a content coding and start compressing this response.

    A forced coding always wins; otherwise the first ContentCoding
    member found in the request's Accept-Encoding header is used.
    """
    forced = self._compression_force
    if forced:
        await self._do_start_compression(forced)
        return
    accepted = request.headers.get(hdrs.ACCEPT_ENCODING, '').lower()
    chosen = next(
        (coding for coding in ContentCoding if coding.value in accepted),
        None)
    if chosen is not None:
        await self._do_start_compression(chosen)

async def prepare(self, request):
Expand Down Expand Up @@ -326,7 +331,7 @@ async def _start(self, request,
headers.add(SET_COOKIE, value)

if self._compression:
self._start_compression(request)
await self._start_compression(request)

if self._chunked:
if version != HttpVersion11:
Expand Down Expand Up @@ -578,7 +583,9 @@ def content_length(self):
def content_length(self, value):
raise RuntimeError("Content length is set automatically")

async def write_eof(self):
async def write_eof(self, data=b''):
assert not data

if self._eof_sent:
return
if self._compressed_body is not None:
Expand All @@ -597,26 +604,38 @@ async def write_eof(self):
else:
await super().write_eof()

async def _start(self, request, *args, **kwargs):
    """Fill in Content-Length (when it is knowable) and start the response.

    Payload-backed and chunked responses are left alone; otherwise the
    header is derived from the in-memory body ('0' when there is none).
    """
    length_unset = (not self._chunked
                    and hdrs.CONTENT_LENGTH not in self._headers)
    if length_unset and not self._body_payload:
        body = self._body
        self._headers[hdrs.CONTENT_LENGTH] = (
            '0' if body is None else str(len(body)))

    return await super()._start(request, *args, **kwargs)

def _compress_body(self, zlib_mode):
    """Compress self._body in one shot and cache the result.

    Synchronous by design: large bodies are expected to run this in a
    worker thread via run_in_executor.  *zlib_mode* is the wbits value
    (gzip wrapper or raw deflate).
    """
    compressor = zlib.compressobj(wbits=zlib_mode)
    self._compressed_body = (compressor.compress(self._body)
                             + compressor.flush())

async def _do_start_compression(self, coding):
    """Compress the whole in-memory body for *coding*.

    Payload-backed and chunked responses fall back to the streaming
    implementation in StreamResponse.  Bodies larger than
    _BODY_LENGTH_THREAD_CUTOFF are compressed in the default executor
    so the event loop is not blocked.
    """
    if self._body_payload or self._chunked:
        # The superclass method is a coroutine now, so it must be
        # awaited -- returning it unawaited would silently skip
        # enabling compression on the payload writer.
        return await super()._do_start_compression(coding)
    if coding != ContentCoding.identity:
        # Instead of using _payload_writer.enable_compression,
        # compress the whole body at once.
        zlib_mode = (16 + zlib.MAX_WBITS
                     if coding.value == 'gzip' else -zlib.MAX_WBITS)
        if len(self._body) > _BODY_LENGTH_THREAD_CUTOFF:
            # Pass the callable and its argument separately: invoking
            # self._compress_body(zlib_mode) inline would run the
            # compression on the event loop and hand run_in_executor
            # its (non-callable) None result.
            loop = asyncio.get_event_loop()
            await loop.run_in_executor(
                None, self._compress_body, zlib_mode)
        else:
            self._compress_body(zlib_mode)

        self._headers[hdrs.CONTENT_ENCODING] = coding.value
        self._headers[hdrs.CONTENT_LENGTH] = \
            str(len(self._compressed_body))
Expand All @@ -635,3 +654,25 @@ def json_response(data=sentinel, *, text=None, body=None, status=200,
text = dumps(data)
return Response(text=text, body=body, status=status, reason=reason,
headers=headers, content_type=content_type)


async def async_json_response(data=sentinel, *, text=None, body=None,
                              status=200, reason=None, headers=None,
                              content_type='application/json',
                              dumps=json.dumps):
    """Like json_response() but serialization may run off the event loop.

    ``dumps`` may be a plain callable or a coroutine function.  Sized
    payloads longer than _BODY_LENGTH_THREAD_CUTOFF are serialized in
    the default executor to avoid blocking the loop.

    Raises ValueError if ``data`` is given together with ``text`` or
    ``body``.
    """
    if data is not sentinel:
        if text or body:
            raise ValueError(
                "only one of data, text, or body should be specified"
            )
        if asyncio.iscoroutinefunction(dumps):
            # NOTE: iscoroutine() would test the *call result*, not the
            # serializer itself, so an async dumps was never awaited and
            # a coroutine object leaked into ``text``.
            text = await dumps(data)
        else:
            # len() is only a rough size heuristic; unsized-but-valid
            # JSON data (ints, None, ...) is serialized inline.
            size = len(data) if hasattr(data, '__len__') else 0
            if size > _BODY_LENGTH_THREAD_CUTOFF:
                loop = asyncio.get_event_loop()
                text = await loop.run_in_executor(None, dumps, data)
            else:
                text = dumps(data)

    return Response(text=text, body=body, status=status, reason=reason,
                    headers=headers, content_type=content_type)