Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Slow request body copy #2126

Merged
merged 10 commits into from
Nov 24, 2017
Merged

Conversation

arthurdarcet
Copy link
Contributor

When sending a request with a large body (e.g.session.post('…', data=b'0' * 2_000_000_000),) PayloadWriter._write is taking very long (5+ seconds for a 2G body).

This is especially problematic because this locks the event loop and nothing else gets executed (so if you are running this in a request handler, the whole server becomes unresponsive).

As far as I understand this, there is no need to actually copy the data before passing it to the underlying asyncio transport: its write method handles fragmented chunks and we spare a big ''.join(buffer).

This leads to a x2 speed up for the actual write, what ever the actual body size is (I tested with bodies from 2KB to 2GB)

What do these changes do?

When the asyncio transport is open, leave the buffering to the OS and write data directly to it.
I have also factorised some code, to keep the _buffer / _transport variables in a clear state (exactly one of them is None at all time)

Are there changes in behavior for the user?

The headers are not buffered until the first byte of the body is ready if the transport is already there. I'm not sure why this was done, I can't find anything related to that in the git history. @fafhrd91, since this is your code, any idea why sending the headers was buffered even when the transport is ready?

Also, see the changed test: some user might get a ClientPayloadError instead of a ServerDisconnectedError since the headers are now sent even if write_eof raises. This is mostly a side effect of the tests: in real life I would expect users already receiving a bit of both ClientPayloadError and ServerDisconnectedError for this use case (depending on when exactly the connection was cut off).

@arthurdarcet
Copy link
Contributor Author

Also, kind of related:
I have not seen any section of the documentation relating to large request/response bodies, both from the server and client point of view.
There are some gotchas when dealing with this in production, to avoid locking the event loop and the server, and it would be nice to gather a list of best practice on how to handle heavy requests.
This section could also mention that doing CPU-intensive jobs on the main thread/process will lock the GIL, the event loop, and the server which might be unexpected for someone coming from multi-threaded frameworks like Flask/…

@arthurdarcet arthurdarcet force-pushed the big-request-body branch 2 times, most recently from 08b4965 to 4337bd1 Compare July 24, 2017 15:29
@arthurdarcet
Copy link
Contributor Author

And lastly, using session.post('…', data=io.BytesIO(…)) makes everything work correctly, but if I'm not mistaken, this isn't documented anywhere (i had to read the code to see that chunking was happening for BytesIO and not for raw bytes)

I think one of three things should be done:

  • This should be documented somewhere, probably in this same "big requests" section of the docs I mentioned earlier
  • The payload.BytesPayload class should do some chunking automatically in my opinion, but I can understand that it makes sense to reserve that for BytesIO payloads.
  • The BytesPayload class could log a warning when a payload above ~1MB is passed directly (something like 2 * streams.DEFAULT_LIMIT?)

Let me know which solution(s) sound good, and I can open PR for them

@@ -129,7 +129,7 @@ class PayloadWriter(AbstractPayloadWriter):

def __init__(self, stream, loop, acquire=True):
self._stream = stream
self._transport = None
self._transport, self._buffer = None, []
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use separate lines

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

self._compress = None
self._drain_waiter = None

if self._stream.available:
self._transport = self._stream.transport
self._transport, self._buffer = self._stream.transport, None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

separate lines

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

with pytest.raises(client.ServerDisconnectedError):
yield from cli.get('/path/to')
resp = yield from cli.get('/path/to')
with pytest.raises(client.ClientPayloadError):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why exception is different?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

until now, the headers were buffered and not sent to the transport until the first byte of the body was written.

I changed that to send the headers immediately if the transport is ready when write_headers is called; it impacts this test because if the server cancels the write_eof, the headers are still already sent, so the client does not see the ServerDisconnectedError and instead I have to try to read the body to get a ClientPayloadError (the server did not send a complete body)

see the last paragraph of the issue

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh, I see. reason is performance for small responses. for small response it usually one syscall, with your change it is two. so I'd like to measure performance on small responses.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

other note. buffering is needed because of http pipelining. but if we decide on #2109 then it may go away.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #2179 for some benchmarks. I'm not really sure how to benchmark the syscalls, but impact seems negligible

@fafhrd91
Copy link
Member

regarding payloads. I agree. you are welcome to make PR!

@arthurdarcet
Copy link
Contributor Author

@fafhrd91 you agree with using chunking for BytesPayload, or with logging a warning for large bytes payloads?

@fafhrd91
Copy link
Member

all tree. but I am not sure if chunking in BytesPayload makes sense.

@arthurdarcet
Copy link
Contributor Author

i would say, either we handle the chunking in BytesPayload and it does not really make sense to log a warning (nothing bad would be happening with big payloads); or we do not do any chunking there (like now), and we log something to warn the user that sending big payloads as bytes directly is a bad idea

For the documentation, I … don't really have the time to write it… but I'm planning on writing a blog article related to our experience with all this. I think it will be easy to extract some guidelines from there and i will open a doc PR when i'm ready. Do you want me to open another pending issue to track this until then?

@fafhrd91
Copy link
Member

we have already doc related issue for payload system.

lets add log to BytesPayload.

@fafhrd91
Copy link
Member

@asvetlov any comments?

@codecov-io
Copy link

codecov-io commented Jul 24, 2017

Codecov Report

Merging #2126 into master will increase coverage by 0.15%.
The diff coverage is 93.1%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master    #2126      +/-   ##
==========================================
+ Coverage   97.09%   97.24%   +0.15%     
==========================================
  Files          40       40              
  Lines        8153     8134      -19     
  Branches     1439     1432       -7     
==========================================
- Hits         7916     7910       -6     
+ Misses        100       90      -10     
+ Partials      137      134       -3
Impacted Files Coverage Δ
aiohttp/web_fileresponse.py 97.91% <90%> (+7.25%) ⬆️
aiohttp/http_writer.py 96.15% <94.73%> (+0.76%) ⬆️

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 84825da...669d476. Read the comment docs.

transport.write(chunk)
self._buffer.clear()
if self._buffer is not None:
for b in self._buffer:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer joining all chunks in buffer and sending the whole at once.
It might be only one syscall with very high chance to pass data into kernel without need for loop.add_writer().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if i'm understanding this correctly, the buffer is usually just one item, the headers, so sending them in a for loop or joining won't change much.

The tradeoff however is when the buffer is exactly a small chunk (the headers) and one very large chunk (the body). This will copy the body to create a slightly larger chunk and will be much slower than calling write twice.

I have no idea of how the internals of asyncio works, so I'll of course defer to you here ; but do we have benchmarks for this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, nevermind.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'll try to write some benchmark of how multiple syscalls behave. I'm pretty sure the transport is almost always set before the body is written, so it's not really buffered here.

The main optimisation in all this is sending the buffered data as soon as the transport is set. Right now, the buffered data waits for the next chunk, and then is joined with this new chunk and everything is sent together. That behaves very poorly with a large body because it means copying everything to avoid a second syscall. For very small buffers it probably makes way more sense to bundle everything and do only one syscall

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we check size of chunk and do join for small chunk?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checking the size of the chunk would mean buffering the headers until the first chunk is written - as is done currently. I'm not a big fan of that because it makes the code really more complicated (you might end up having data buffered and a transport already set). Plus I don't think the syscall matters that much

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any news about benchmarks?

P.S. Let's avoid single char sized variables.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Benchmarks were added in a separate pull request: #2179

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome! Thanks!

@@ -130,6 +130,7 @@ class PayloadWriter(AbstractPayloadWriter):
def __init__(self, stream, loop, acquire=True):
self._stream = stream
self._transport = None
self._buffer = []
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Side note: asyncio streams switched from list to bytearray for sake of speed, maybe we need it too.
Feel free to create a PR for this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if i remember correctly, aiohttp switched from bytearrays to list for performance reasons too, so this probably need to be benchmarked…

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We just follow asyncio streams design, it switched from bytearrays to lists and back to bytearrays :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried to benchmark bytearray vs join in #2179, but the results were inconclusive at best (or lean in favor of b''.join). I'll try to dig up why asyncio changed again to bytearray

self._drain_waiter = create_future(self.loop)
yield from self._drain_waiter

assert self._transport is not None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What could reset a transport to None?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nothing that i can think of, i was using the asserts as safeguards/way to comment the code. I'll remove them

self._buffer.clear()
else:
self._transport.write(chunk)
assert self._buffer is None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Assertion again.
Well, if you do think that something may go wrong -- let's keep the assertion (and previous one) for a while and drop them later.
But please add comments like "FIXME: xxxx"


self._eof = True
self._transport = None
self._stream.release()

@asyncio.coroutine
def drain(self, last=False):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pretty like the dropping last parameter

if not waiter.done():
waiter.set_result(None)
def __init__(self, *args, **kwargs):
self.__buffer = []
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's mostly taste question but why don't invite new name for this buffer instead of mangling?
We never use mangling in aiohttp.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, you're perfectly right, i'll update the PR with a new name instead of mangling

@arthurdarcet
Copy link
Contributor Author

@asvetlov @fafhrd91 Could you have a look at this? Are there still issue blocking this PR?

@fafhrd91
Copy link
Member

fafhrd91 commented Nov 6, 2017

I can not comment on this, I didn't work on aiohttp for long time. this is question for other contributors.

@asvetlov @kxepal

btw can we ping contributors somehow?

@fafhrd91 fafhrd91 requested a review from kxepal November 6, 2017 17:05
@asvetlov
Copy link
Member

asvetlov commented Nov 6, 2017

I'll try to find a time tomorrow maybe

@aio-libs/aiohttp-committers should work for pinging all aiohttp committers in the list

Copy link
Member

@kxepal kxepal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Basically, LGFM, except few code bits, but need make more comperhensive review to be sure. Anyway, it's a good call, @arthurdarcet, to improve performance!

transport.write(chunk)
self._buffer.clear()
if self._buffer is not None:
for b in self._buffer:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any news about benchmarks?

P.S. Let's avoid single char sized variables.

self._buffer.clear()
yield from self._stream.drain()
print(self._stream)
print(self._stream.drain)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

print

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oops, thanks, fixed

@@ -710,7 +710,7 @@ def connection_lost(self, exc):
await r.read()
self.assertEqual(1, len(connector._conns))

with self.assertRaises(aiohttp.ServerDisconnectedError):
with self.assertRaises(aiohttp.ClientConnectionError):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why exception had changed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the headers are now sent even if write_eof raises. This is mostly a side effect of the tests: in real life I would expect users already receiving a bit of both ClientPayloadError and ServerDisconnectedError for this use case (depending on when exactly the connection was cut off).

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, thanks for explanation!

self._compress = None
self._drain_waiter = None

if self._stream.available:
self._transport = self._stream.transport
self._buffer = None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's not good idea to jungle variable type. How empty buffer [] is different from None?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm using [] and None a bit differently here: once the buffer has been emptied into the new transport, it should never be used again. So I would prefer setting it to None and crash if something very wrong happens

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm...I see. Well, then such crash shold be controlled, imho. Otherwise it would be hard to distinguish transport re-use case from else error.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's purely internal to PayloadWriter, so everything is controlled where it should be.

This None means that at any given time, exactly one of (self._transport, self._buffer) is None. It makes it easier to check the code / prove that it's correct.

@arthurdarcet
Copy link
Contributor Author

I've rebase this against master, again. Could we reach a decision here?

@asvetlov @kxepal

@asvetlov
Copy link
Member

Looks good.
Could you add CHANGES/2126.feature changelog note.

@arthurdarcet
Copy link
Contributor Author

@asvetlov sure, thanks, just pushed the commit. let me know if the message isn't informative enough

@asvetlov
Copy link
Member

Let's wait for a while for feedback from other devs.

@asvetlov
Copy link
Member

Looks like nobody objects, let's go ahead.

@arthurdarcet I've dropped tests/test_client_functional_oldstyle.py and moved all remaining tests into tests/test_client_functional.py.
Could you solve resolved conflicts?
Also 100% test coverage would be awesome.

@arthurdarcet
Copy link
Contributor Author

arthurdarcet commented Nov 24, 2017

@asvetlov

  • rebase is done

  • I pushed two more tests, that cover two branches in PayloadWriter. The only things left untested in this class are the tcp cork related functions (i have no idea what that is, so i'm not sure i should try to write the tests...) and an edge case where the PayloadWriter receives two large concurrent writes before the transport is set (i could test this with a manual calls to PayloadWriter.get_transport, but that's not really "public-api").

  • I removed a "if" that wasn't covered with 78eb7b4 . I'm pretty confident it was impossible for the if condition to become false, but please review this commit

(Also, sorry I push over you commit fixing my 2126.feature file and I haven't found a way to cherry pick your commit back, so I rewrote your changes in my original commit)

… is set back to None at the same time. So self._drain_waiter can never be done()
@asvetlov
Copy link
Member

@arthurdarcet thanks.
For your information: rebase is not required, just merge is enough. The PR is squashed on merging anyway.

@lock
Copy link

lock bot commented Oct 28, 2019

This thread has been automatically locked since there has not been any recent activity after it was closed. Please open a [new issue] for related bugs.
If you feel like there's important points made in this discussion, please include those exceprts into that [new issue].
[new issue]: https://github.com/aio-libs/aiohttp/issues/new

@lock lock bot added the outdated label Oct 28, 2019
@lock lock bot locked as resolved and limited conversation to collaborators Oct 28, 2019
@psf-chronographer psf-chronographer bot added the bot:chronographer:provided There is a change note present in this PR label Oct 28, 2019
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
bot:chronographer:provided There is a change note present in this PR outdated
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants