-
-
Notifications
You must be signed in to change notification settings - Fork 2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
speed up connector limiting #2937
Changes from all commits
a647b47
dd56884
5d681f6
2b90858
b16fafe
d78d618
f8d1b60
438528c
5bff7b8
6ecd39d
de6043d
8ab132d
ebdbe9f
0a594bd
bdff085
e48fe4a
9494cb2
f157307
cf93f57
37dd8ac
9cc1b73
dd03363
5143a12
66333b5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Speed up connector limiting |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,7 +3,7 @@ | |
import sys | ||
import traceback | ||
import warnings | ||
from collections import defaultdict | ||
from collections import defaultdict, deque | ||
from contextlib import suppress | ||
from http.cookies import SimpleCookie | ||
from itertools import cycle, islice | ||
|
@@ -181,7 +181,9 @@ def __init__(self, *, keepalive_timeout=sentinel, | |
self._acquired_per_host = defaultdict(set) | ||
self._keepalive_timeout = keepalive_timeout | ||
self._force_close = force_close | ||
self._waiters = defaultdict(list) | ||
|
||
# {host_key: FIFO list of waiters} | ||
self._waiters = defaultdict(deque) | ||
|
||
self._loop = loop | ||
self._factory = functools.partial(ResponseHandler, loop=loop) | ||
|
@@ -392,12 +394,19 @@ async def connect(self, req, traces=None): | |
|
||
try: | ||
await fut | ||
finally: | ||
# remove a waiter even if it was cancelled | ||
waiters.remove(fut) | ||
except BaseException: | ||
# remove a waiter even if it was cancelled, normally it's | ||
# removed when it's notified | ||
try: | ||
waiters.remove(fut) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I just checked and there is no |
||
except ValueError: # fut may no longer be in list | ||
pass | ||
|
||
if not waiters: | ||
del self._waiters[key] | ||
|
||
raise | ||
|
||
if traces: | ||
for trace in traces: | ||
await trace.send_connection_queued_end() | ||
|
@@ -423,10 +432,8 @@ async def connect(self, req, traces=None): | |
except BaseException: | ||
# signal to waiter | ||
if key in self._waiters: | ||
for waiter in self._waiters[key]: | ||
if not waiter.done(): | ||
waiter.set_result(None) | ||
break | ||
waiters = self._waiters[key] | ||
self._release_key_waiter(key, waiters) | ||
raise | ||
finally: | ||
if not self._closed: | ||
|
@@ -470,25 +477,34 @@ def _get(self, key): | |
del self._conns[key] | ||
return None | ||
|
||
def _release_key_waiter(self, key, waiters): | ||
if not waiters: | ||
return False | ||
|
||
waiter = waiters.popleft() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Umm, you have to persist till reaches a waiter not done or you run out of waiters, see the implementation of CPython [1]. [1] https://github.com/python/cpython/blob/master/Lib/asyncio/locks.py#L450 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. are you saying the old implementation was wrong: https://github.com/aio-libs/aiohttp/pull/2937/files#diff-7f25afde79f309e2f8722c26cf1f10adL481 ? I don't believe this is the case. There are two ways a waiter can be removed:
What you describe would create a double release for 1.i. This is in fact the scenario you before alluded to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, the issue was already there, indeed I can see the following issues with the code that we have in master:
So we have to fix them, but true that they were already there and would be nice if we decouple both things. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ya I have a feeling this is the tip of the iceberg :) I have a strong suspicious there's a leak in aiohttp or something aiohttp uses as right now we're leaking ~40MB / week in prod |
||
if not waiter.done(): | ||
waiter.set_result(None) | ||
|
||
if not waiters: | ||
del self._waiters[key] | ||
|
||
return True | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you don't do anything with the result, don't implement a result at all, it's like dead code. This is just a small remark There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ? result is used in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah I see sry |
||
|
||
def _release_waiter(self): | ||
# always release only one waiter | ||
|
||
if self._limit: | ||
# if we have limit and we have available | ||
if self._limit - len(self._acquired) > 0: | ||
for key, waiters in self._waiters.items(): | ||
if waiters: | ||
if not waiters[0].done(): | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. with the new model we can guarantee that there are only active waiters in the list so we can greatly simplify this to always popping and notifying the first item in the list |
||
waiters[0].set_result(None) | ||
if self._release_key_waiter(key, waiters): | ||
break | ||
|
||
elif self._limit_per_host: | ||
# if we have dont have limit but have limit per host | ||
# then release first available | ||
for key, waiters in self._waiters.items(): | ||
if waiters: | ||
if not waiters[0].done(): | ||
waiters[0].set_result(None) | ||
if self._release_key_waiter(key, waiters): | ||
break | ||
|
||
def _release_acquired(self, key, proto): | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is slow because it's trying to find a future in a list for each request that's waiting on a connector