speed up connector limiting #2937

Merged · 24 commits · Apr 27, 2018
Changes from 15 commits
51 changes: 34 additions & 17 deletions aiohttp/connector.py
@@ -3,7 +3,7 @@
import sys
import traceback
import warnings
from collections import defaultdict
from collections import defaultdict, deque
from contextlib import suppress
from http.cookies import SimpleCookie
from itertools import cycle, islice
@@ -181,7 +181,11 @@ def __init__(self, *, keepalive_timeout=sentinel,
self._acquired_per_host = defaultdict(set)
self._keepalive_timeout = keepalive_timeout
self._force_close = force_close
self._waiters = defaultdict(list)

# {host_key: FIFO list of waiters}
# NOTE: this is not a true FIFO because the true order is lost amongst
# the dictionary keys
Contributor:

I would suggest using the deque [1] data structure, which is the one used by all of the implementations in asyncio.locks, most likely because lists have the following constraint:

lists incur O(n) memory movement costs for pop(0)

[1] https://github.com/python/cpython/blob/master/Lib/asyncio/locks.py#L437
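
For context, a minimal self-contained sketch (not part of the PR) of the cost that constraint refers to: list.pop(0) shifts every remaining element, while deque.popleft() is O(1).

from timeit import timeit

n = 100_000
list_setup = "q = list(range(%d))" % n
deque_setup = "from collections import deque; q = deque(range(%d))" % n

# drain each container from the left once and compare the wall-clock time
print("list.pop(0):    ", timeit("while q: q.pop(0)", setup=list_setup, number=1))
print("deque.popleft():", timeit("while q: q.popleft()", setup=deque_setup, number=1))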

Contributor Author:

great idea, done, slight speed improvement :)

Contributor:

I guess this can be removed; the FIFO is per dictionary key. Different keys mean different (host, port) tuples, across which a FIFO order does not make sense.

Contributor Author (@thehesiod, Apr 19, 2018):

Each deque is indeed a FIFO, as the first one in will be the first one out (amongst items in that deque). However, across keys it's not a FIFO, because it currently iterates over the keys (which theoretically can be in any order) when choosing which FIFO to release from.

Contributor:

Which deque to release is not a random choice, it's based on the hash of the host and port, so the connections that are waiting for a free connection and match the host and port will share the same deque in a FIFO way.

Yes, we are saying the same thing; the dictionary is just the structure that keeps all of the FIFO queues.

Let's save comments for what is really not understandable.

Contributor Author:

I didn't say it was random, I said it wasn't a true FIFO queue because it's choosing which queue to release a connector from in dictionary key order, not in FIFO order. Anyways, I removed the comment and people will have to figure this out themselves now. If this were to be "correct" there would need to be a separate priority queue with pointers back to these queues... or perhaps a multi-index priority queue :)

self._waiters = defaultdict(deque)

self._loop = loop
self._factory = functools.partial(ResponseHandler, loop=loop)
@@ -392,11 +396,18 @@ async def connect(self, req, traces=None):

try:
await fut
finally:
# remove a waiter even if it was cancelled
waiters.remove(fut)
Contributor Author:

this is slow because it's trying to find a future in a list for each request that's waiting on a connector

if not waiters:
del self._waiters[key]
except BaseException:
Member:

Sorry, I don't understand why the deletion is moved from finally to except.
Why shouldn't we remove the waiter if no exception was raised?

Contributor Author:

@asvetlov this is for two reasons: performance, and the fact that if no exception is thrown the removal is handled by the release method.

# remove a waiter even if it was cancelled, normally it's
Contributor:

if the future has been canceled, we do need to wake up another waiter. Take a look at the semaphore implementation [1]

[1] https://github.com/python/cpython/blob/master/Lib/asyncio/locks.py#L478
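
For reference, a small self-contained toy (my own illustration of the pattern the linked Semaphore code uses, not a copy of aiohttp or CPython; the names wake_up_next and wait_for_slot are hypothetical): if our waiter was already granted and we get cancelled anyway, the grant is passed on instead of being lost.

import asyncio
from collections import deque

waiters = deque()

def wake_up_next():
    # hand the grant to the first waiter that is still pending
    while waiters:
        fut = waiters.popleft()
        if not fut.done():
            fut.set_result(None)
            return

async def wait_for_slot(loop):
    fut = loop.create_future()
    waiters.append(fut)
    try:
        await fut
    except asyncio.CancelledError:
        # cancelled after the slot was already granted to us:
        # pass the grant to the next waiter instead of losing it
        if fut.done() and not fut.cancelled():
            wake_up_next()
        raise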

Contributor Author (@thehesiod, Apr 14, 2018):

this method created the future, why would we need to wake up another waiter? That doesn't make sense, as it would imply yet another connector is available. This is 1-1: one waiter was added, one removed. Also note that that code runs only if the future was not cancelled; in this scenario it can only have been cancelled.

Contributor:

My mistake, the wake-up will be done automatically by the exit of the context manager in any scenario. So forget about this.

# removed when it's notified
try:
waiters.remove(fut)
Contributor:

I just checked and there is no remove method in defaultdict.


if not waiters:
del self._waiters[key]
except ValueError: # fut may no longer be in list
pass
Contributor:

Be careful with that; in the case of a ValueError you will mask the original exception:

try:
    val = 1 / 0
except Exception:
    try:
        a = {}
        a['foo']
    except KeyError:
        pass
    raise
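
To make the masking concern concrete, a short runnable illustration (mine, not from the thread): the snippet above stays safe only because the inner except catches exactly what the cleanup raises; if the cleanup error escapes, it is what propagates, and the original exception survives only as __context__.

try:
    try:
        1 / 0                      # original error
    except Exception:
        [].remove(42)              # cleanup raises ValueError and escapes
        raise                      # never reached
except ValueError as exc:
    print(type(exc).__name__)              # ValueError, not ZeroDivisionError
    print(type(exc.__context__).__name__)  # ZeroDivisionError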

Contributor:

I agree with @pfreixes , try to scope what you are catching as much as possible.

This is much better imo:

try:
    ...

    if not waiters:
        try:
            del self._waiters[key]
        except ValueError:
            pass

    ...

except:
    ...

Contributor Author:

I don't see what you're saying here: waiters.remove(fut) throws the ValueError; del self._waiters[key] could throw a KeyError, not another ValueError. Not going to change this unless it's really needed.

Contributor:

Read my first comment: unless you make it explicit with a raise from e, the second try/except masks the original exception in the case of a ValueError.

Contributor Author:

gotcha, thanks


raise

if traces:
for trace in traces:
@@ -423,10 +434,8 @@ async def connect(self, req, traces=None):
except BaseException:
# signal to waiter
if key in self._waiters:
for waiter in self._waiters[key]:
if not waiter.done():
waiter.set_result(None)
break
waiters = self._waiters[key]
self._release_key_waiter(key, waiters)
raise
finally:
if not self._closed:
@@ -470,25 +479,33 @@ def _get(self, key):
del self._conns[key]
return None

def _release_key_waiter(self, key, waiters):
if not waiters:
return False

waiter = waiters.popleft()
Contributor:

Umm, you have to keep going until you reach a waiter that is not done or you run out of waiters; see the CPython implementation [1].

[1] https://github.com/python/cpython/blob/master/Lib/asyncio/locks.py#L450
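
Roughly, the behaviour being asked for (my own paraphrase with a hypothetical name, assuming waiters is a deque of futures) keeps popping until it finds a waiter that is still pending:

def _release_key_waiter_sketch(waiters):
    # skip futures that are already done (e.g. cancelled) instead of
    # giving up after the first one
    while waiters:
        waiter = waiters.popleft()
        if not waiter.done():
            waiter.set_result(None)
            return True
    return False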

Contributor Author:

are you saying the old implementation was wrong: https://github.com/aio-libs/aiohttp/pull/2937/files#diff-7f25afde79f309e2f8722c26cf1f10adL481 ? I don't believe this is the case. There are two ways a waiter can be removed:

  1. An exception happened while waiting (in the exception handler)
    i. a release was dispatched for said waiter (someone will see a release)
  2. Through this method

What you describe would create a double release for 1.i. This is in fact the scenario you alluded to before.

Contributor (@pfreixes, Apr 26, 2018):

Yes, the issue was already there; indeed I can see the following issues with the code that we have in master:

  • The iteration until reaching a non-cancelled waiter has to go through all of the items of a list; right now it is only done on the head of each list.
  • Each time we try to release a waiter we have to check whether the limit and the number of concurrent connections allow it. This is done only when the release_waiter is called explicitly, but not when we had an exception trying to make the connection.
  • The limit per host, TBH, I would say is not well calculated.

So we have to fix them, but it's true that they were already there, and it would be nice if we decouple both things.

Contributor Author:

ya I have a feeling this is the tip of the iceberg :) I have a strong suspicion there's a leak in aiohttp or something aiohttp uses, as right now we're leaking ~40MB / week in prod

waiter.set_result(None)
Contributor:

You cannot guarantee that the element is a non-cancelled future, even though you are removing it from the list here [1].

Once the Future is set with an exception, the callback [2] that will deliver this exception is scheduled by the loop, so before the callback that cancels the Task is really executed, the task that was holding a connection and wants to release it might pop a cancelled future.

[1] https://github.com/aio-libs/aiohttp/pull/2937/files#diff-7f25afde79f309e2f8722c26cf1f10adR399
[2] https://github.com/python/cpython/blob/master/Lib/asyncio/futures.py#L254

Contributor Author:

I don't see how this can happen, because the async stack ends on the await for the Future and there are no done callbacks associated with it; see this example: https://ideone.com/izIfqw

Think of how this would work: currently no one sets an exception on this future besides cancelling the waiter at [1]; we can verify this by seeing that the future is only available from self._waiters and no one sets an exception on it.

So, if you cancel the task at [1], you have two options:

  1. you don't wait on the cancelled task:
    1.1) you can release it (ok, removed from the list), and then wait on cancelled (item already gone from the list)
    1.2) you can not release it (ok), then wait on cancelled (item will be removed from the list)
  2. you wait on the cancelled task, in which case the BaseException handler runs and removes the item from the list. Nothing else can happen, because we're just waiting on the future and no one else is waiting on said future or has callbacks against said future.

Contributor Author:

let me know if I'm missing something in my logic, I know it can get complex

Contributor:

The task will only finish once the loop schedules it again; the result that is given back to the task depends on the future that the task yielded on before. The cancel is only a signal that wakes up the task with a CancelledError exception.

So the task might still be there, just waiting for its turn in the reactor/loop. While that is happening you have a chance of hitting the situation I told you about.

Please review the asyncio lock code; all of the different implementations take this into consideration.

Contributor Author:

thanks @pfreixes for that example, so it seems it depends on the ordering of the tasks called during cancellation. Here's a question: can you have an outstanding release for a single connector? It seems like you could end up releasing two connectors for a single cancel if the except clause executes before wakeup_next_waiter, no?

Contributor:

Not sure about the question, but once the race condition is considered and mitigated the code should work as expected.

Contributor Author:

@pfreixes my "picture" :)

import asyncio

loop = asyncio.get_event_loop()
waiters = []


async def task(waiter):
    try:
        await waiter
    except:
        print("task finalized")
        if waiter in waiters:
            print("waiter removed")
            try:
                waiters.remove(waiter)
            except ValueError:
                print("Waiter already removed")
        else:
            print("waiter not present")
        raise


def wakeup_next_waiter():
    if not waiters:
        return

    waiter = waiters.pop(0)
    try:
        if not waiter.done():
            waiter.set_result(None)
    except Exception as e:
        print(f"Exception calling set_result {e!r}")
        raise


async def main(loop):
    # We add two waiters
    waiter = loop.create_future()
    waiters.append(waiter)

    waiter = loop.create_future()
    waiters.append(waiter)

    # create the task that will wait till either the waiter
    # is finished or the task is cancelled.
    t = asyncio.ensure_future(task(waiter))

    # make a loop iteration to allow the task reach the
    #     await waiter
    await asyncio.sleep(0)

    # put in the loop a callback to wake up the waiter.
    loop.call_later(0.1, wakeup_next_waiter)

    # cancel the task, this will mark the task as cancelled
    # but will be pending a loop iteration to wake up the
    # task, having as a result a CanceledError exception.
    # This implicitly will schedule the Task._wakeup function
    # to be executed in the next loop iteration.
    t.cancel()

    try:
        await t
    except asyncio.CancelledError:
        pass

    # wait for the release to run
    await asyncio.sleep(1)

    # now we have zero waiters even though only one was cancelled
    print(len(waiters))

loop.run_until_complete(main(loop))

note that in this example I add two waiters and cancel one, but at the end none are left, because I ensure that wakeup_next_waiter happens after the task's cleanup

Contributor:

I don't see the issue in the code - be careful with extra ifs that are not needed in the task function. If you are claiming that both waiters are removed, this IMO is perfectly fine.

The first waiter that is created is removed through the happy path of wakeup_next_waiter, obviously being removed by waiter = waiters.pop(0); no task is woken up because you didn't start any task with that waiter. The second waiter is cancelled, and the related task will remove the waiter via waiters.remove(waiter).

Just to put all of us on the same page, the release of a used connection is done automatically by the context manager of a request calling resp.release() [1]. So every time a code path goes out of the context it will release the connection and will try to wake up pending requests.

[1] https://github.com/aio-libs/aiohttp/blob/master/aiohttp/client_reqrep.py#L786
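
As a usage-level illustration (hypothetical URL, not from the thread), leaving the response context manager is what triggers that release and, in turn, the wake-up of the next pending waiter:

import asyncio
import aiohttp

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com/") as resp:  # hypothetical URL
            await resp.text()
        # the connection is released here, letting the connector wake a waiter

loop = asyncio.get_event_loop()
loop.run_until_complete(main())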

Contributor Author:

ok I think the way to think about this is that there are N waiters and M connectors; while they're represented in self._waiters, the "ownership" on each side is unique. You may inefficiently wake up a cancelled waiter with the current algo, but it's probably easier to deal with. Thanks again, the convo was enlightening. Will work on unittest fixes and the recommended change today.


if not waiters:
del self._waiters[key]

return True
Contributor:

If you don't do anything with the result, don't return one at all; it's like dead code. This is just a small remark.

Contributor Author:

? result is used in _release_waiter

Contributor:

Ah I see, sorry


def _release_waiter(self):
# always release only one waiter

if self._limit:
# if we have limit and we have available
if self._limit - len(self._acquired) > 0:
for key, waiters in self._waiters.items():
if waiters:
if not waiters[0].done():
Contributor Author:

with the new model we can guarantee that there are only active waiters in the list, so we can greatly simplify this to always popping and notifying the first item in the list (a sketch follows after this hunk)

waiters[0].set_result(None)
if self._release_key_waiter(key, waiters):
break

elif self._limit_per_host:
# if we don't have a limit but have a limit per host
# then release first available
for key, waiters in self._waiters.items():
if waiters:
if not waiters[0].done():
waiters[0].set_result(None)
if self._release_key_waiter(key, waiters):
break
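
A rough sketch of the simplification described in the comment above (my own reading, limit checks omitted for brevity, and not necessarily the code that finally landed): once every queued future is guaranteed to be pending, the first non-empty queue can simply be popped and woken.

def _release_waiter_sketch(self):
    # hypothetical simplified form: every queued future is still pending
    for key, waiters in self._waiters.items():
        if waiters:
            waiters.popleft().set_result(None)
            if not waiters:
                del self._waiters[key]
            return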

def _release_acquired(self, key, proto):