Skip to content

Commit

Permalink
POC Requests tracing aio-libs#2313
Browse files Browse the repository at this point in the history
The main purpose of this commit is to start a discussion about whether
this is what was expected, and to clear up some of the uncertainties
related to the implementation. The following list enumerates each point
that could be discussed, aside from the general implementation.

1) Break the `Signal` object to accommodate the old implementation
as `AppSignal` having the `Signal` for generic signals
2) Coroutine signals are reserved only for the start, end and exception
signals. Others are implemented as `FuncSignals`
3) The final list of supported signals is the one in this commit;
however, not all of them are implemented yet. Just follow the new
`ClientSession` properties that match the `on_*` pattern.
4) Redirects have their own signal, `on_request_redirect`, which is
triggered on each redirect response.
5) There is no clear way to pass the `Session` and the `trace_context`
to the underlying objects used by the `ClientSession`. If you have
better ideas, please shout.
6) Signals related to the progress of the upload/download will be
implemented at the streamers/protocols
  • Loading branch information
pfreixes committed Oct 24, 2017
1 parent 3b95e16 commit a8da61d
Show file tree
Hide file tree
Showing 7 changed files with 310 additions and 14 deletions.
111 changes: 107 additions & 4 deletions aiohttp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import sys
import traceback
import warnings
from types import SimpleNamespace

from multidict import CIMultiDict, MultiDict, MultiDictProxy, istr
from yarl import URL
Expand All @@ -28,6 +29,7 @@
strip_auth_from_url)
from .http import WS_KEY, WebSocketReader, WebSocketWriter
from .http_websocket import WSHandshakeError, ws_ext_gen, ws_ext_parse
from .signals import FuncSignal, Signal
from .streams import FlowControlDataQueue


Expand Down Expand Up @@ -96,6 +98,7 @@ def __init__(self, *, connector=None, loop=None, cookies=None,

if cookies is not None:
self._cookie_jar.update_cookies(cookies)

self._connector = connector
self._connector_owner = connector_owner
self._default_auth = auth
Expand All @@ -108,6 +111,24 @@ def __init__(self, *, connector=None, loop=None, cookies=None,
self._auto_decompress = auto_decompress
self._trust_env = trust_env

self._on_request_start = Signal()
self._on_request_end = Signal()
self._on_request_exception = Signal()

self._on_request_queued_start = FuncSignal()
self._on_request_queued_start = FuncSignal()
self._on_request_createconn_start = FuncSignal()
self._on_request_createconn_end = FuncSignal()
self._on_request_redirect = FuncSignal()
self._on_request_headers_sent = FuncSignal()
self._on_request_content_sent = FuncSignal()

# TODO: not implemented yet
self._on_request_content_chunk_sent = FuncSignal()
self._on_request_headers_received = FuncSignal()
self._on_request_content_chunk_received = FuncSignal()
self._on_request_content_received = FuncSignal()

# Convert to list of tuples
if headers:
headers = CIMultiDict(headers)
Expand Down Expand Up @@ -161,7 +182,8 @@ def _request(self, method, url, *,
verify_ssl=None,
fingerprint=None,
ssl_context=None,
proxy_headers=None):
proxy_headers=None,
trace_context=None):

# NOTE: timeout clamps existing connect and read timeouts. We cannot
# set the default to None because we need to detect if the user wants
Expand Down Expand Up @@ -218,6 +240,18 @@ def _request(self, method, url, *,
handle = tm.start()

url = URL(url)

if trace_context is None:
trace_context = SimpleNamespace()

yield from self.on_request_start.send(
trace_context,
method,
url.host,
url.port,
headers
)

timer = tm.timer()
try:
with timer:
Expand Down Expand Up @@ -261,12 +295,16 @@ def _request(self, method, url, *,
proxy=proxy, proxy_auth=proxy_auth, timer=timer,
session=self, auto_decompress=self._auto_decompress,
verify_ssl=verify_ssl, fingerprint=fingerprint,
ssl_context=ssl_context, proxy_headers=proxy_headers)
ssl_context=ssl_context, proxy_headers=proxy_headers,
trace_context=trace_context)

# connection timeout
try:
with CeilTimeout(self._conn_timeout, loop=self._loop):
conn = yield from self._connector.connect(req)
conn = yield from self._connector.connect(
req,
session_tracing=(self, trace_context)
)
except asyncio.TimeoutError as exc:
raise ServerTimeoutError(
'Connection timeout '
Expand All @@ -291,6 +329,9 @@ def _request(self, method, url, *,
# redirects
if resp.status in (
301, 302, 303, 307, 308) and allow_redirects:

self.on_request_redirect.send(trace_context, resp)

redirects += 1
history.append(resp)
if max_redirects and redirects >= max_redirects:
Expand Down Expand Up @@ -354,15 +395,17 @@ def _request(self, method, url, *,
handle.cancel()

resp._history = tuple(history)
yield from self.on_request_end.send(trace_context, resp)
return resp

except:
except Exception as e:
# cleanup timer
tm.close()
if handle:
handle.cancel()
handle = None

yield from self.on_request_exception.send(trace_context, e)
raise

def ws_connect(self, url, *,
Expand Down Expand Up @@ -654,6 +697,66 @@ def loop(self):
"""Session's loop."""
return self._loop

@property
def on_request_start(self):
    """Coroutine ``Signal`` fired when a request starts (read-only)."""
    return self._on_request_start

@property
def on_request_redirect(self):
    # Fired once per redirect response inside ``_request()`` -- the code
    # there calls ``self.on_request_redirect.send(...)``, which previously
    # raised AttributeError because no such property was defined.
    return self._on_request_redirect

@property
def on_request_redirect_start(self):
    # BUG fix: this getter returned ``self._on_request_start`` (the
    # generic request-start signal), an apparent copy/paste error.
    # ``__init__`` defines no ``_on_request_redirect_start`` attribute;
    # the only redirect signal is ``_on_request_redirect``, so expose it.
    return self._on_request_redirect

@property
def on_request_redirect_end(self):
    # BUG fix: this getter returned ``self._on_request_start`` (copy/paste
    # error).  ``__init__`` defines no ``_on_request_redirect_end``
    # attribute; the only redirect signal is ``_on_request_redirect``,
    # so return that instead of the unrelated request-start signal.
    return self._on_request_redirect

@property
def on_request_queued_start(self):
    """``FuncSignal`` fired when a request starts waiting in the
    connector queue (read-only)."""
    return self._on_request_queued_start

@property
def on_request_queued_end(self):
    # NOTE(review): ``__init__`` assigns ``_on_request_queued_start``
    # twice and never sets ``_on_request_queued_end``, so accessing this
    # property raises AttributeError at runtime (the connector calls
    # ``session.on_request_queued_end.send(...)``).  Fix the duplicated
    # assignment in ``__init__``.
    return self._on_request_queued_end

@property
def on_request_createconn_start(self):
    """``FuncSignal`` fired when the connector begins establishing a
    new connection (read-only)."""
    return self._on_request_createconn_start

@property
def on_request_createconn_end(self):
    """``FuncSignal`` fired when the connector finishes establishing a
    new connection (read-only)."""
    return self._on_request_createconn_end

@property
def on_request_end(self):
    """Coroutine ``Signal`` fired when a request completes (read-only)."""
    return self._on_request_end

@property
def on_request_exception(self):
    """Coroutine ``Signal`` fired when a request raises (read-only)."""
    return self._on_request_exception

@property
def on_request_headers_sent(self):
    """``FuncSignal`` fired after the request headers are written
    (read-only)."""
    return self._on_request_headers_sent

@property
def on_request_content_chunk_sent(self):
    """``FuncSignal`` for upload progress; not wired up yet
    (read-only)."""
    return self._on_request_content_chunk_sent

@property
def on_request_content_sent(self):
    """``FuncSignal`` fired after the request body is fully written
    (read-only)."""
    return self._on_request_content_sent

@property
def on_request_headers_received(self):
    """``FuncSignal`` for response headers; not wired up yet
    (read-only)."""
    return self._on_request_headers_received

@property
def on_request_content_chunk_received(self):
    """``FuncSignal`` for download progress; not wired up yet
    (read-only)."""
    return self._on_request_content_chunk_received

@property
def on_request_content_received(self):
    """``FuncSignal`` for response body completion; not wired up yet
    (read-only)."""
    return self._on_request_content_received

def detach(self):
"""Detach connector from session without closing the former.
Expand Down
12 changes: 9 additions & 3 deletions aiohttp/client_reqrep.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ def __init__(self, method, url, *,
proxy=None, proxy_auth=None,
timer=None, session=None, auto_decompress=True,
verify_ssl=None, fingerprint=None, ssl_context=None,
proxy_headers=None):
proxy_headers=None, trace_context=None):

if verify_ssl is False and ssl_context is not None:
raise ValueError(
Expand Down Expand Up @@ -117,6 +117,7 @@ def __init__(self, method, url, *,
self._auto_decompress = auto_decompress
self._verify_ssl = verify_ssl
self._ssl_context = ssl_context
self._trace_context = trace_context

if loop.get_debug():
self._source_traceback = traceback.extract_stack(sys._getframe(1))
Expand Down Expand Up @@ -400,6 +401,7 @@ def write_bytes(self, writer, conn):
for chunk in self.body:
writer.write(chunk)

self._session.on_request_content_sent.send(self._trace_context)
yield from writer.write_eof()
except OSError as exc:
new_exc = ClientOSError(
Expand Down Expand Up @@ -462,14 +464,17 @@ def send(self, conn):
self.method, path, self.version)
writer.write_headers(status_line, self.headers)

self._session.on_request_headers_sent.send(self._trace_context)

self._writer = helpers.ensure_future(
self.write_bytes(writer, conn), loop=self.loop)

self.response = self.response_class(
self.method, self.original_url,
writer=self._writer, continue100=self._continue, timer=self._timer,
request_info=self.request_info,
auto_decompress=self._auto_decompress
auto_decompress=self._auto_decompress,
session=self._session, trace_context=self._trace_context
)

self.response._post_init(self.loop, self._session)
Expand Down Expand Up @@ -513,7 +518,8 @@ class ClientResponse(HeadersMixin):

def __init__(self, method, url, *,
writer=None, continue100=None, timer=None,
request_info=None, auto_decompress=True):
request_info=None, auto_decompress=True,
session=None, trace_context=None):
assert isinstance(url, URL)

self.method = method
Expand Down
21 changes: 20 additions & 1 deletion aiohttp/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,8 +347,12 @@ def closed(self):
return self._closed

@asyncio.coroutine
def connect(self, req):
def connect(self, req, session_tracing=None):
"""Get from pool or create new connection."""

if session_tracing:
session, trace_context = session_tracing

key = req.connection_key

if self._limit:
Expand All @@ -375,6 +379,9 @@ def connect(self, req):
# This connection will now count towards the limit.
waiters = self._waiters[key]
waiters.append(fut)
if session_tracing:
session.on_request_queued_start.send(trace_context)

try:
yield from fut
finally:
Expand All @@ -383,11 +390,19 @@ def connect(self, req):
if not waiters:
del self._waiters[key]

if session_tracing:
session.on_request_queued_end.send(trace_context)

proto = self._get(key)
if proto is None:
placeholder = _TransportPlaceholder()
self._acquired.add(placeholder)
self._acquired_per_host[key].add(placeholder)

if session_tracing:
session.on_request_createconn_start.send(
trace_context)

try:
proto = yield from self._create_connection(req)
if self._closed:
Expand All @@ -405,6 +420,10 @@ def connect(self, req):
self._acquired.remove(placeholder)
self._acquired_per_host[key].remove(placeholder)

if session_tracing:
session.on_request_createconn_end.send(
trace_context)

self._acquired.add(proto)
self._acquired_per_host[key].add(proto)
return Connection(self, key, proto, self._loop)
Expand Down
19 changes: 19 additions & 0 deletions aiohttp/signals.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,25 @@ class Signal(BaseSignal):
arguments.
"""

__slots__ = ()

@asyncio.coroutine
def send(self, *args, **kwargs):
    """
    Sends data to all registered receivers.

    Delegates to ``self._send`` (inherited, presumably from
    ``BaseSignal`` -- confirm), forwarding all positional and keyword
    arguments.  Must be awaited / ``yield from``-ed by the caller.
    """
    yield from self._send(*args, **kwargs)


class AppSignal(BaseSignal):
"""Coroutine-based signal implementation.
To connect a callback to a signal, use any list method.
Signals are fired using the :meth:`send` coroutine, which takes named
arguments.
"""

__slots__ = ('_app', '_name', '_pre', '_post')

def __init__(self, app):
Expand Down
10 changes: 5 additions & 5 deletions aiohttp/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from .helpers import AccessLogger
from .http import HttpVersion # noqa
from .log import access_logger, web_logger
from .signals import FuncSignal, PostSignal, PreSignal, Signal
from .signals import AppSignal, FuncSignal, PostSignal, PreSignal
from .web_exceptions import * # noqa
from .web_fileresponse import * # noqa
from .web_middlewares import * # noqa
Expand Down Expand Up @@ -75,10 +75,10 @@ def __init__(self, *,
self._on_pre_signal = PreSignal()
self._on_post_signal = PostSignal()
self._on_loop_available = FuncSignal(self)
self._on_response_prepare = Signal(self)
self._on_startup = Signal(self)
self._on_shutdown = Signal(self)
self._on_cleanup = Signal(self)
self._on_response_prepare = AppSignal(self)
self._on_startup = AppSignal(self)
self._on_shutdown = AppSignal(self)
self._on_cleanup = AppSignal(self)
self._client_max_size = client_max_size

# MutableMapping API
Expand Down
Loading

0 comments on commit a8da61d

Please sign in to comment.