From a8da61d3613d1c931053f1c92a0f3a79e4ecc87c Mon Sep 17 00:00:00 2001 From: Pau Freixes Date: Tue, 24 Oct 2017 17:49:46 +0200 Subject: [PATCH] POC Requests tracing #2313 The main purpose of this commit is to start a discussion about whether this is what was expected, and to clear up some of the uncertainties related to the implementation. The following list enumerates each point that could be discussed, aside from the general implementation. 1) Break up the `Signal` object: the old implementation is kept as `AppSignal`, leaving `Signal` for generic signals. 2) Coroutine signals are reserved only for the start, end and exception signals. Others are implemented as `FuncSignal`. 3) The final list of supported signals is the one in this commit; however, not all of them are implemented yet. Just follow the new `ClientSession` properties that match the `on_*` pattern. 4) Redirects have their own signal, `on_request_redirect`, which is triggered at each redirect response. 5) There is no clear way to pass the `Session` and the `trace_context` to the underlying objects used by the `ClientSession`. If you have better ideas, please shout. 
7) Signals related to the progress of the upload/download will be implemented at the streamers/protocols --- aiohttp/client.py | 111 +++++++++++++++++++++++++++++++++-- aiohttp/client_reqrep.py | 12 +++- aiohttp/connector.py | 21 ++++++- aiohttp/signals.py | 19 ++++++ aiohttp/web.py | 10 ++-- tests/test_client_session.py | 96 +++++++++++++++++++++++++++++- tests/test_connector.py | 55 +++++++++++++++++ 7 files changed, 310 insertions(+), 14 deletions(-) diff --git a/aiohttp/client.py b/aiohttp/client.py index 7eaf4aea580..c31ade7dcc9 100644 --- a/aiohttp/client.py +++ b/aiohttp/client.py @@ -8,6 +8,7 @@ import sys import traceback import warnings +from types import SimpleNamespace from multidict import CIMultiDict, MultiDict, MultiDictProxy, istr from yarl import URL @@ -28,6 +29,7 @@ strip_auth_from_url) from .http import WS_KEY, WebSocketReader, WebSocketWriter from .http_websocket import WSHandshakeError, ws_ext_gen, ws_ext_parse +from .signals import FuncSignal, Signal from .streams import FlowControlDataQueue @@ -96,6 +98,7 @@ def __init__(self, *, connector=None, loop=None, cookies=None, if cookies is not None: self._cookie_jar.update_cookies(cookies) + self._connector = connector self._connector_owner = connector_owner self._default_auth = auth @@ -108,6 +111,24 @@ def __init__(self, *, connector=None, loop=None, cookies=None, self._auto_decompress = auto_decompress self._trust_env = trust_env + self._on_request_start = Signal() + self._on_request_end = Signal() + self._on_request_exception = Signal() + + self._on_request_queued_start = FuncSignal() + self._on_request_queued_start = FuncSignal() + self._on_request_createconn_start = FuncSignal() + self._on_request_createconn_end = FuncSignal() + self._on_request_redirect = FuncSignal() + self._on_request_headers_sent = FuncSignal() + self._on_request_content_sent = FuncSignal() + + # TODO: not implemented yet + self._on_request_content_chunk_sent = FuncSignal() + self._on_request_headers_received = 
FuncSignal() + self._on_request_content_chunk_received = FuncSignal() + self._on_request_content_received = FuncSignal() + # Convert to list of tuples if headers: headers = CIMultiDict(headers) @@ -161,7 +182,8 @@ def _request(self, method, url, *, verify_ssl=None, fingerprint=None, ssl_context=None, - proxy_headers=None): + proxy_headers=None, + trace_context=None): # NOTE: timeout clamps existing connect and read timeouts. We cannot # set the default to None because we need to detect if the user wants @@ -218,6 +240,18 @@ def _request(self, method, url, *, handle = tm.start() url = URL(url) + + if trace_context is None: + trace_context = SimpleNamespace() + + yield from self.on_request_start.send( + trace_context, + method, + url.host, + url.port, + headers + ) + timer = tm.timer() try: with timer: @@ -261,12 +295,16 @@ def _request(self, method, url, *, proxy=proxy, proxy_auth=proxy_auth, timer=timer, session=self, auto_decompress=self._auto_decompress, verify_ssl=verify_ssl, fingerprint=fingerprint, - ssl_context=ssl_context, proxy_headers=proxy_headers) + ssl_context=ssl_context, proxy_headers=proxy_headers, + trace_context=trace_context) # connection timeout try: with CeilTimeout(self._conn_timeout, loop=self._loop): - conn = yield from self._connector.connect(req) + conn = yield from self._connector.connect( + req, + session_tracing=(self, trace_context) + ) except asyncio.TimeoutError as exc: raise ServerTimeoutError( 'Connection timeout ' @@ -291,6 +329,9 @@ def _request(self, method, url, *, # redirects if resp.status in ( 301, 302, 303, 307, 308) and allow_redirects: + + self.on_request_redirect.send(trace_context, resp) + redirects += 1 history.append(resp) if max_redirects and redirects >= max_redirects: @@ -354,15 +395,17 @@ def _request(self, method, url, *, handle.cancel() resp._history = tuple(history) + yield from self.on_request_end.send(trace_context, resp) return resp - except: + except Exception as e: # cleanup timer tm.close() if handle: 
handle.cancel() handle = None + yield from self.on_request_exception.send(trace_context, e) raise def ws_connect(self, url, *, @@ -654,6 +697,66 @@ def loop(self): """Session's loop.""" return self._loop + @property + def on_request_start(self): + return self._on_request_start + + @property + def on_request_redirect_start(self): + return self._on_request_start + + @property + def on_request_redirect_end(self): + return self._on_request_start + + @property + def on_request_queued_start(self): + return self._on_request_queued_start + + @property + def on_request_queued_end(self): + return self._on_request_queued_end + + @property + def on_request_createconn_start(self): + return self._on_request_createconn_start + + @property + def on_request_createconn_end(self): + return self._on_request_createconn_end + + @property + def on_request_end(self): + return self._on_request_end + + @property + def on_request_exception(self): + return self._on_request_exception + + @property + def on_request_headers_sent(self): + return self._on_request_headers_sent + + @property + def on_request_content_chunk_sent(self): + return self._on_request_content_chunk_sent + + @property + def on_request_content_sent(self): + return self._on_request_content_sent + + @property + def on_request_headers_received(self): + return self._on_request_headers_received + + @property + def on_request_content_chunk_received(self): + return self._on_request_content_chunk_received + + @property + def on_request_content_received(self): + return self._on_request_content_received + def detach(self): """Detach connector from session without closing the former. 
diff --git a/aiohttp/client_reqrep.py b/aiohttp/client_reqrep.py index 3a57ac14ec3..c6398c617f8 100644 --- a/aiohttp/client_reqrep.py +++ b/aiohttp/client_reqrep.py @@ -87,7 +87,7 @@ def __init__(self, method, url, *, proxy=None, proxy_auth=None, timer=None, session=None, auto_decompress=True, verify_ssl=None, fingerprint=None, ssl_context=None, - proxy_headers=None): + proxy_headers=None, trace_context=None): if verify_ssl is False and ssl_context is not None: raise ValueError( @@ -117,6 +117,7 @@ def __init__(self, method, url, *, self._auto_decompress = auto_decompress self._verify_ssl = verify_ssl self._ssl_context = ssl_context + self._trace_context = trace_context if loop.get_debug(): self._source_traceback = traceback.extract_stack(sys._getframe(1)) @@ -400,6 +401,7 @@ def write_bytes(self, writer, conn): for chunk in self.body: writer.write(chunk) + self._session.on_request_content_sent.send(self._trace_context) yield from writer.write_eof() except OSError as exc: new_exc = ClientOSError( @@ -462,6 +464,8 @@ def send(self, conn): self.method, path, self.version) writer.write_headers(status_line, self.headers) + self._session.on_request_headers_sent.send(self._trace_context) + self._writer = helpers.ensure_future( self.write_bytes(writer, conn), loop=self.loop) @@ -469,7 +473,8 @@ def send(self, conn): self.method, self.original_url, writer=self._writer, continue100=self._continue, timer=self._timer, request_info=self.request_info, - auto_decompress=self._auto_decompress + auto_decompress=self._auto_decompress, + session=self._session, trace_context=self._trace_context ) self.response._post_init(self.loop, self._session) @@ -513,7 +518,8 @@ class ClientResponse(HeadersMixin): def __init__(self, method, url, *, writer=None, continue100=None, timer=None, - request_info=None, auto_decompress=True): + request_info=None, auto_decompress=True, + session=None, trace_context=None): assert isinstance(url, URL) self.method = method diff --git a/aiohttp/connector.py 
b/aiohttp/connector.py index b4065b492bd..5be550671a7 100644 --- a/aiohttp/connector.py +++ b/aiohttp/connector.py @@ -347,8 +347,12 @@ def closed(self): return self._closed @asyncio.coroutine - def connect(self, req): + def connect(self, req, session_tracing=None): """Get from pool or create new connection.""" + + if session_tracing: + session, trace_context = session_tracing + key = req.connection_key if self._limit: @@ -375,6 +379,9 @@ def connect(self, req): # This connection will now count towards the limit. waiters = self._waiters[key] waiters.append(fut) + if session_tracing: + session.on_request_queued_start.send(trace_context) + try: yield from fut finally: @@ -383,11 +390,19 @@ def connect(self, req): if not waiters: del self._waiters[key] + if session_tracing: + session.on_request_queued_end.send(trace_context) + proto = self._get(key) if proto is None: placeholder = _TransportPlaceholder() self._acquired.add(placeholder) self._acquired_per_host[key].add(placeholder) + + if session_tracing: + session.on_request_createconn_start.send( + trace_context) + try: proto = yield from self._create_connection(req) if self._closed: @@ -405,6 +420,10 @@ def connect(self, req): self._acquired.remove(placeholder) self._acquired_per_host[key].remove(placeholder) + if session_tracing: + session.on_request_createconn_end.send( + trace_context) + self._acquired.add(proto) self._acquired_per_host[key].add(proto) return Connection(self, key, proto, self._loop) diff --git a/aiohttp/signals.py b/aiohttp/signals.py index a244744938a..591d8d9f7ae 100644 --- a/aiohttp/signals.py +++ b/aiohttp/signals.py @@ -26,6 +26,25 @@ class Signal(BaseSignal): arguments. """ + __slots__ = () + + @asyncio.coroutine + def send(self, *args, **kwargs): + """ + Sends data to all registered receivers. + """ + yield from self._send(*args, **kwargs) + + +class AppSignal(BaseSignal): + """Coroutine-based signal implementation. + + To connect a callback to a signal, use any list method. 
+ + Signals are fired using the :meth:`send` coroutine, which takes named + arguments. + """ + __slots__ = ('_app', '_name', '_pre', '_post') def __init__(self, app): diff --git a/aiohttp/web.py b/aiohttp/web.py index cc89d9304a8..65a4183816a 100644 --- a/aiohttp/web.py +++ b/aiohttp/web.py @@ -20,7 +20,7 @@ from .helpers import AccessLogger from .http import HttpVersion # noqa from .log import access_logger, web_logger -from .signals import FuncSignal, PostSignal, PreSignal, Signal +from .signals import AppSignal, FuncSignal, PostSignal, PreSignal from .web_exceptions import * # noqa from .web_fileresponse import * # noqa from .web_middlewares import * # noqa @@ -75,10 +75,10 @@ def __init__(self, *, self._on_pre_signal = PreSignal() self._on_post_signal = PostSignal() self._on_loop_available = FuncSignal(self) - self._on_response_prepare = Signal(self) - self._on_startup = Signal(self) - self._on_shutdown = Signal(self) - self._on_cleanup = Signal(self) + self._on_response_prepare = AppSignal(self) + self._on_startup = AppSignal(self) + self._on_shutdown = AppSignal(self) + self._on_cleanup = AppSignal(self) self._client_max_size = client_max_size # MutableMapping API diff --git a/tests/test_client_session.py b/tests/test_client_session.py index d8c10449c67..3c960ef4874 100644 --- a/tests/test_client_session.py +++ b/tests/test_client_session.py @@ -3,6 +3,7 @@ import gc import re import types +from types import SimpleNamespace from unittest import mock import pytest @@ -10,8 +11,9 @@ from yarl import URL import aiohttp -from aiohttp import web +from aiohttp import hdrs, helpers, web from aiohttp.client import ClientSession +from aiohttp.client_reqrep import ClientRequest from aiohttp.connector import BaseConnector, TCPConnector from aiohttp.helpers import SimpleCookie @@ -479,3 +481,95 @@ def test_client_session_implicit_loop_warn(): asyncio.set_event_loop(None) loop.close() + + +@asyncio.coroutine +def test_request_tracing(loop): + trace_context = {} + 
on_request_start = mock.Mock() + on_request_redirect = mock.Mock() + on_request_end = mock.Mock() + on_request_headers_sent = mock.Mock() + on_request_content_sent = mock.Mock() + + session = aiohttp.ClientSession(loop=loop) + session.on_request_start.append(on_request_start) + session.on_request_redirect.append(on_request_redirect) + session.on_request_end.append(on_request_end) + session.on_request_headers_sent.append(on_request_headers_sent) + session.on_request_content_sent.append(on_request_content_sent) + + resp = yield from session.get( + 'http://example.com', + trace_context=trace_context + ) + + on_request_start.assert_called_once_with( + trace_context, + hdrs.METH_GET, + "example.com", + 80, + CIMultiDict() + ) + + on_request_end.assert_called_once_with(trace_context, resp) + on_request_headers_sent.assert_called_once_with(trace_context) + on_request_content_sent.assert_called_once_with(trace_context) + assert not on_request_redirect.called + + +@asyncio.coroutine +def test_request_tracing_default_trace_context(loop): + on_request_start = mock.Mock() + + session = aiohttp.ClientSession(loop=loop) + session.on_request_start.append(on_request_start) + + yield from session.get('http://example.com') + + assert isinstance(on_request_start.call_args[0][0], SimpleNamespace) + + +@asyncio.coroutine +def test_request_tracing_exception(loop): + on_request_end = mock.Mock() + on_request_exception = mock.Mock() + + with mock.patch("aiohttp.client.TCPConnector.connect") as connect_patched: + error = Exception() + f = helpers.create_future(loop) + f.set_exception(error) + connect_patched.return_value = f + + session = aiohttp.ClientSession(loop=loop) + session.on_request_end.append(on_request_end) + session.on_request_exception.append(on_request_exception) + + try: + yield from session.get('http://example.com') + except Exception: + pass + + on_request_exception.assert_called_once_with(mock.ANY, error) + assert not on_request_end.called + + +@asyncio.coroutine +def 
test_request_tracing_interpose_headers(loop): + + class MyClientRequest(ClientRequest): + headers = None + + def __init__(self, *args, **kwargs): + super(MyClientRequest, self).__init__(*args, **kwargs) + MyClientRequest.headers = self.headers + + @asyncio.coroutine + def new_headers(trace_context, method, host, port, headers): + headers['foo'] = 'bar' + + session = aiohttp.ClientSession(loop=loop, request_class=MyClientRequest) + session.on_request_start.append(new_headers) + + yield from session.get('http://example.com') + assert MyClientRequest.headers['foo'] == 'bar' diff --git a/tests/test_connector.py b/tests/test_connector.py index 68167174e68..59396c8183f 100644 --- a/tests/test_connector.py +++ b/tests/test_connector.py @@ -590,6 +590,25 @@ def test_connect(loop): connection.close() +@asyncio.coroutine +def test_connect_request_tracing(loop): + proto = mock.Mock() + proto.is_connected.return_value = True + session = mock.Mock() + trace_context = mock.Mock() + + req = ClientRequest('GET', URL('http://host:80'), loop=loop) + + conn = aiohttp.BaseConnector(loop=loop) + conn._create_connection = mock.Mock() + conn._create_connection.return_value = helpers.create_future(loop) + conn._create_connection.return_value.set_result(proto) + + yield from conn.connect(req, session_tracing=(session, trace_context)) + session.on_request_createconn_start.send.assert_called_with(trace_context) + session.on_request_createconn_end.send.assert_called_with(trace_context) + + @asyncio.coroutine def test_close_during_connect(loop): proto = mock.Mock() @@ -884,6 +903,42 @@ def f(): conn.close() +@asyncio.coroutine +def test_connect_queued_operation_request_tracing(loop, key): + proto = mock.Mock() + proto.is_connected.return_value = True + + req = ClientRequest('GET', URL('http://localhost1:80'), + loop=loop, + response_class=mock.Mock()) + + conn = aiohttp.BaseConnector(loop=loop, limit=1) + conn._conns[key] = [(proto, loop.time())] + conn._create_connection = mock.Mock() + 
conn._create_connection.return_value = helpers.create_future(loop) + conn._create_connection.return_value.set_result(proto) + + connection1 = yield from conn.connect(req) + + @asyncio.coroutine + def f(): + session = mock.Mock() + trace_context = mock.Mock() + connection2 = yield from conn.connect( + req, + session_tracing=(session, trace_context) + ) + session.on_request_queued_start.send.assert_called_with(trace_context) + session.on_request_queued_end.send.assert_called_with(trace_context) + connection2.release() + + task = helpers.ensure_future(f(), loop=loop) + yield from asyncio.sleep(0.01, loop=loop) + connection1.release() + yield from task + conn.close() + + @asyncio.coroutine def test_connect_with_limit_and_limit_per_host(loop, key): proto = mock.Mock()