From 6766745f9c6281539e2c6d0fe63b59e57cdd4e50 Mon Sep 17 00:00:00 2001 From: Daniel Mitterdorfer Date: Wed, 1 Apr 2020 11:46:55 +0200 Subject: [PATCH] Fix request timeout handling (#943) * Fix request timeout handling With this commit we address several issues in request timeout handling: 1. We ensure that the end of the request is properly measured 2. We set an explicit request timeout on the connection to avoid that the default connection timeout is picked. 3. We use the proper method parameter `timeout` when falling back to the raw transport API instead of setting the ineffective parameter `request_timeout`. --- esrally/async_connection.py | 6 ++++-- esrally/client.py | 2 ++ esrally/driver/runner.py | 7 +++---- tests/driver/runner_test.py | 7 +++---- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/esrally/async_connection.py b/esrally/async_connection.py index bd7e302a3..f5293d073 100644 --- a/esrally/async_connection.py +++ b/esrally/async_connection.py @@ -117,8 +117,10 @@ def perform_request(self, method, url, params=None, body=None, timeout=None, ign start = self.loop.time() response = None try: - with async_timeout.timeout(timeout or self.timeout.total, loop=self.loop): - response = yield from self.session.request(method, url, data=body, headers=headers) + request_timeout = timeout or self.timeout.total + with async_timeout.timeout(request_timeout, loop=self.loop): + # override the default session timeout explicitly + response = yield from self.session.request(method, url, data=body, headers=headers, timeout=request_timeout) raw_data = yield from response.text() duration = self.loop.time() - start diff --git a/esrally/client.py b/esrally/client.py index e2df92d47..161d65013 100644 --- a/esrally/client.py +++ b/esrally/client.py @@ -161,6 +161,8 @@ async def on_request_end(session, trace_config_ctx, params): trace_config = aiohttp.TraceConfig() trace_config.on_request_start.append(on_request_start) trace_config.on_request_end.append(on_request_end) + # ensure that we also stop the timer when a request "ends" with an exception (e.g. a timeout) + trace_config.on_request_exception.append(on_request_end) # needs patching as https://github.com/elastic/elasticsearch-py-async/pull/68 is not merged yet class RallyAsyncTransport(elasticsearch_async.transport.AsyncTransport): diff --git a/esrally/driver/runner.py b/esrally/driver/runner.py index e832c0602..68a62d87e 100644 --- a/esrally/driver/runner.py +++ b/esrally/driver/runner.py @@ -605,12 +605,11 @@ async def __call__(self, es, params): except elasticsearch.TransportError as e: # this is caused by older versions of Elasticsearch (< 2.1), fall back to optimize if e.status_code == 400: - params = {"request_timeout": request_timeout} if max_num_segments: - await es.transport.perform_request("POST", "/_optimize?max_num_segments={}".format(max_num_segments), - params=params) + await es.transport.perform_request("POST", f"/_optimize?max_num_segments={max_num_segments}", + timeout=request_timeout) else: - await es.transport.perform_request("POST", "/_optimize", params=params) + await es.transport.perform_request("POST", "/_optimize", timeout=request_timeout) else: raise e diff --git a/tests/driver/runner_test.py b/tests/driver/runner_test.py index 8370edc8a..840dc4eba 100644 --- a/tests/driver/runner_test.py +++ b/tests/driver/runner_test.py @@ -911,7 +911,7 @@ async def test_force_merge_override_request_timeout(self, es): es.indices.forcemerge.return_value = as_future() force_merge = runner.ForceMerge() - await force_merge(es, params={"index" : "_all", "request-timeout": 50000}) + await force_merge(es, params={"index": "_all", "request-timeout": 50000}) es.indices.forcemerge.assert_called_once_with(index="_all", request_timeout=50000) @@ -934,7 +934,7 @@ async def test_optimize_with_defaults(self, es): force_merge = runner.ForceMerge() await force_merge(es, params={}) - es.transport.perform_request.assert_called_once_with("POST", "/_optimize", params={"request_timeout": None}) + es.transport.perform_request.assert_called_once_with("POST", "/_optimize", timeout=None) @mock.patch("elasticsearch.Elasticsearch") @run_async @@ -944,8 +944,7 @@ async def test_optimize_with_params(self, es): force_merge = runner.ForceMerge() await force_merge(es, params={"max-num-segments": 3, "request-timeout": 17000}) - es.transport.perform_request.assert_called_once_with("POST", "/_optimize?max_num_segments=3", - params={"request_timeout": 17000}) + es.transport.perform_request.assert_called_once_with("POST", "/_optimize?max_num_segments=3", timeout=17000) class IndicesStatsRunnerTests(TestCase):