Skip to content

Commit

Permalink
Add Retry runner
Browse files Browse the repository at this point in the history
Closes #363
  • Loading branch information
danielmitterdorfer committed Nov 16, 2017
1 parent d13e556 commit bab1224
Show file tree
Hide file tree
Showing 2 changed files with 291 additions and 0 deletions.
66 changes: 66 additions & 0 deletions esrally/driver/runner.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import sys
import types
import time
import logging
from collections import Counter, OrderedDict

Expand Down Expand Up @@ -547,6 +548,71 @@ def __repr__(self, *args, **kwargs):
return "query"


# TODO: Allow to use this from (selected) regular runners and add user documentation.
# TODO: It would maybe be interesting to add meta-data on how many retries there were.
class Retry(Runner):
"""
This runner can be used as a wrapper around regular runners to retry operations.
It defines the following parameters:
* ``retries`` (optional, default 0): The number of times the operation is retried.
* ``retry-wait-period`` (optional, default 0.5): The time in seconds to wait after an error.
* ``retry-on-timeout`` (optional, default True): Whether to retry on connection timeout.
* ``retry-on-error`` (optional, default False): Whether to retry on failure (i.e. the delegate returns ``success == False``)
"""

def __init__(self, delegate):
self.delegate = delegate

def __enter__(self):
self.delegate.__enter__()
return self

def __call__(self, es, params):
import elasticsearch
import socket

max_attempts = params.get("retries", 0) + 1
sleep_time = params.get("retry-wait-period", 0.5)
retry_on_timeout = params.get("retry-on-timeout", True)
retry_on_error = params.get("retry-on-error", False)

for attempt in range(max_attempts):
last_attempt = attempt + 1 == max_attempts
try:
return_value = self.delegate(es, params)
if last_attempt or not retry_on_error:
return return_value
# we can determine success if and only if the runner returns a dict. Otherwise, we have to assume it was fine.
elif isinstance(return_value, dict):
if return_value.get("success", True):
return return_value
else:
time.sleep(sleep_time)
else:
return return_value
except (socket.timeout, elasticsearch.exceptions.ConnectionError):
if last_attempt or not retry_on_timeout:
raise
else:
time.sleep(sleep_time)
except elasticsearch.exceptions.TransportError as e:
if last_attempt or not retry_on_timeout:
raise e
elif e.status_code == 408:
logger.debug("%s has timed out." % repr(self.delegate))
time.sleep(sleep_time)
else:
raise e

def __exit__(self, exc_type, exc_val, exc_tb):
return self.delegate.__exit__(exc_type, exc_val, exc_tb)

def __repr__(self, *args, **kwargs):
return "retryable %s" % repr(self.delegate)


register_runner(track.OperationType.Index.name, BulkIndex())
register_runner(track.OperationType.ForceMerge.name, ForceMerge())
register_runner(track.OperationType.IndicesStats.name, IndicesStats())
Expand Down
225 changes: 225 additions & 0 deletions tests/driver/runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -880,3 +880,228 @@ def test_scroll_query_request_all_pages(self, es):
self.assertEqual("ops", results["unit"])
self.assertFalse(results["timed_out"])
self.assertFalse("error-type" in results)


class RetryTests(TestCase):
def test_is_transparent_on_success_when_no_retries(self):
delegate = mock.Mock()
es = None
params = {
# no retries
}
retrier = runner.Retry(delegate)

retrier(es, params)

delegate.assert_called_once_with(es, params)

def test_is_transparent_on_exception_when_no_retries(self):
import elasticsearch

delegate = mock.Mock(side_effect=elasticsearch.ConnectionError("N/A", "no route to host"))
es = None
params = {
# no retries
}
retrier = runner.Retry(delegate)

with self.assertRaises(elasticsearch.ConnectionError):
retrier(es, params)

delegate.assert_called_once_with(es, params)

def test_is_transparent_on_application_error_when_no_retries(self):
original_return_value = {"weight": 1, "unit": "ops", "success": False}

delegate = mock.Mock(return_value=original_return_value)
es = None
params = {
# no retries
}
retrier = runner.Retry(delegate)

result = retrier(es, params)

self.assertEqual(original_return_value, result)
delegate.assert_called_once_with(es, params)

def test_is_does_not_retry_on_success(self):
delegate = mock.Mock()
es = None
params = {
"retries": 3,
"retry-wait-period": 0.1,
"retry-on-timeout": True,
"retry-on-error": True
}
retrier = runner.Retry(delegate)

retrier(es, params)

delegate.assert_called_once_with(es, params)

def test_retries_on_timeout_if_wanted_and_raises_if_no_recovery(self):
import elasticsearch

delegate = mock.Mock(side_effect=elasticsearch.ConnectionError("N/A", "no route to host"))
es = None
params = {
"retries": 3,
"retry-wait-period": 0.01,
"retry-on-timeout": True,
"retry-on-error": True
}
retrier = runner.Retry(delegate)

with self.assertRaises(elasticsearch.ConnectionError):
retrier(es, params)

delegate.assert_has_calls([
mock.call(es, params),
mock.call(es, params),
mock.call(es, params)
])

def test_retries_on_timeout_if_wanted_and_returns_first_call(self):
import elasticsearch
failed_return_value = {"weight": 1, "unit": "ops", "success": False}

delegate = mock.Mock(side_effect=[elasticsearch.ConnectionError("N/A", "no route to host"), failed_return_value])
es = None
params = {
"retries": 3,
"retry-wait-period": 0.01,
"retry-on-timeout": True,
"retry-on-error": False
}
retrier = runner.Retry(delegate)

result = retrier(es, params)
self.assertEqual(failed_return_value, result)

delegate.assert_has_calls([
# has returned a connection error
mock.call(es, params),
# has returned normally
mock.call(es, params)
])

def test_retries_mixed_timeout_and_application_errors(self):
import elasticsearch
connection_error = elasticsearch.ConnectionError("N/A", "no route to host")
failed_return_value = {"weight": 1, "unit": "ops", "success": False}
success_return_value = {"weight": 1, "unit": "ops", "success": False}

delegate = mock.Mock(side_effect=[
connection_error,
failed_return_value,
connection_error,
connection_error,
failed_return_value,
success_return_value]
)
es = None
params = {
# we try exactly as often as there are errors to also test the semantics of "retry".
"retries": 5,
"retry-wait-period": 0.01,
"retry-on-timeout": True,
"retry-on-error": True
}
retrier = runner.Retry(delegate)

result = retrier(es, params)
self.assertEqual(success_return_value, result)

delegate.assert_has_calls([
# connection error
mock.call(es, params),
# application error
mock.call(es, params),
# connection error
mock.call(es, params),
# connection error
mock.call(es, params),
# application error
mock.call(es, params),
# success
mock.call(es, params)
])

def test_does_not_retry_on_timeout_if_not_wanted(self):
import elasticsearch

delegate = mock.Mock(side_effect=elasticsearch.ConnectionTimeout(408, "timed out"))
es = None
params = {
"retries": 3,
"retry-wait-period": 0.01,
"retry-on-timeout": False,
"retry-on-error": True
}
retrier = runner.Retry(delegate)

with self.assertRaises(elasticsearch.ConnectionTimeout):
retrier(es, params)

delegate.assert_called_once_with(es, params)

def test_retries_on_application_error_if_wanted(self):
failed_return_value = {"weight": 1, "unit": "ops", "success": False}
success_return_value = {"weight": 1, "unit": "ops", "success": True}

delegate = mock.Mock(side_effect=[failed_return_value, success_return_value])
es = None
params = {
"retries": 3,
"retry-wait-period": 0.01,
"retry-on-timeout": False,
"retry-on-error": True
}
retrier = runner.Retry(delegate)

result = retrier(es, params)

self.assertEqual(success_return_value, result)

delegate.assert_has_calls([
mock.call(es, params),
# one retry
mock.call(es, params)
])

def test_does_not_retry_on_application_error_if_not_wanted(self):
failed_return_value = {"weight": 1, "unit": "ops", "success": False}

delegate = mock.Mock(return_value=failed_return_value)
es = None
params = {
"retries": 3,
"retry-wait-period": 0.01,
"retry-on-timeout": True,
"retry-on-error": False
}
retrier = runner.Retry(delegate)

result = retrier(es, params)

self.assertEqual(failed_return_value, result)

delegate.assert_called_once_with(es, params)

def test_assumes_success_if_runner_returns_non_dict(self):
delegate = mock.Mock(return_value=(1, "ops"))
es = None
params = {
"retries": 3,
"retry-wait-period": 0.01,
"retry-on-timeout": True,
"retry-on-error": True
}
retrier = runner.Retry(delegate)

result = retrier(es, params)

self.assertEqual((1, "ops"), result)

delegate.assert_called_once_with(es, params)

0 comments on commit bab1224

Please sign in to comment.