Skip to content

Commit

Permalink
Throttle tasks based on presence of a scheduler (#1109)
Browse files Browse the repository at this point in the history
Previously Rally has considered a task only throttled if it specified a
target throughput. This assumes that there is a meaningful target
throughput to specify though. Consider a more complex case where we want
to simulate a traffic pattern that changes over the course of a
(simulated) day. In that case there is no single "right" target
throughput that can be specified.

With this commit we consider tasks also throttled if no target
throughput is specified but the user has supplied a scheduler
explicitly. We assume that the scheduler will use different parameters
specified on the task to determine its behavior.
  • Loading branch information
danielmitterdorfer authored Nov 9, 2020
1 parent 202314b commit 93b7eee
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 12 deletions.
9 changes: 5 additions & 4 deletions esrally/driver/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,11 @@ def scheduler_for(task: esrally.track.Task):
if not task.throttled:
return Unthrottled()

schedule = task.schedule or "deterministic"
try:
scheduler_class = __SCHEDULERS[task.schedule]
scheduler_class = __SCHEDULERS[schedule]
except KeyError:
raise exceptions.RallyError(f"No scheduler available for name [{task.schedule}]")
raise exceptions.RallyError(f"No scheduler available for name [{schedule}]")

# for backwards-compatibility - treat existing schedulers as top-level schedulers
if is_legacy_scheduler(scheduler_class):
Expand All @@ -113,7 +114,7 @@ def is_legacy_scheduler(scheduler_class):
target throughput.
"""
constructor_params = inspect.signature(scheduler_class.__init__).parameters
return len(constructor_params) == 2 and "params" in constructor_params
return len(constructor_params) >= 2 and "params" in constructor_params


def is_simple_scheduler(scheduler_class):
Expand Down Expand Up @@ -195,7 +196,7 @@ def __init__(self, task, legacy_scheduler_class):
self.legacy_scheduler = legacy_scheduler_class(task.params)

def next(self, current):
return self.legacy_scheduler(current)
return self.legacy_scheduler.next(current)


class Unthrottled(Scheduler):
Expand Down
4 changes: 2 additions & 2 deletions esrally/track/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -796,7 +796,7 @@ def post_process_for_test_mode(t):
logger.debug("Resetting measurement time period for [%s] to [%d] seconds.", str(leaf_task), leaf_task.time_period)

# Keep throttled to expose any errors but increase the target throughput for short execution times.
if leaf_task.throttled:
if leaf_task.throttled and leaf_task.target_throughput:
original_throughput = leaf_task.target_throughput
leaf_task.params.pop("target-throughput", None)
leaf_task.params.pop("target-interval", None)
Expand Down Expand Up @@ -1321,7 +1321,7 @@ def parse_task(self, task_spec, ops, challenge_name, default_warmup_iterations=N
# may as well an inline operation
op = self.parse_operation(op_spec, error_ctx="inline operation in challenge %s" % challenge_name)

schedule = self._r(task_spec, "schedule", error_ctx=op.name, mandatory=False, default_value="deterministic")
schedule = self._r(task_spec, "schedule", error_ctx=op.name, mandatory=False)
task_name = self._r(task_spec, "name", error_ctx=op.name, mandatory=False, default_value=op.name)
task = track.Task(name=task_name,
operation=op,
Expand Down
7 changes: 3 additions & 4 deletions esrally/track/track.py
Original file line number Diff line number Diff line change
Expand Up @@ -764,9 +764,8 @@ def __eq__(self, other):
class Task:
THROUGHPUT_PATTERN = re.compile(r"(?P<value>(\d*\.)?\d+)\s(?P<unit>\w+/s)")

def __init__(self, name, operation, meta_data=None, warmup_iterations=None, iterations=None, warmup_time_period=None, time_period=None,
clients=1,
completes_parent=False, schedule="deterministic", params=None):
def __init__(self, name, operation, meta_data=None, warmup_iterations=None, iterations=None, warmup_time_period=None,
time_period=None, clients=1, completes_parent=False, schedule=None, params=None):
self.name = name
self.operation = operation
self.meta_data = meta_data if meta_data else {}
Expand Down Expand Up @@ -824,7 +823,7 @@ def numeric(v):

@property
def throttled(self):
return self.target_throughput is not None
return self.schedule is not None or self.target_throughput is not None

def __hash__(self):
# Note that we do not include `params` in __hash__ and __eq__ (the other attributes suffice to uniquely define a task)
Expand Down
35 changes: 35 additions & 0 deletions tests/driver/scheduler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,14 @@ class LegacyScheduler:
def __init__(self, params):
pass

class LegacySchedulerWithAdditionalArgs:
# pylint: disable=unused-variable
def __init__(self, params, my_default_param=True):
pass

def test_detects_legacy_scheduler(self):
self.assertTrue(scheduler.is_legacy_scheduler(SchedulerCategorizationTests.LegacyScheduler))
self.assertTrue(scheduler.is_legacy_scheduler(SchedulerCategorizationTests.LegacySchedulerWithAdditionalArgs))

def test_a_regular_scheduler_is_not_a_legacy_scheduler(self):
self.assertFalse(scheduler.is_legacy_scheduler(scheduler.DeterministicScheduler))
Expand All @@ -138,3 +144,32 @@ def test_is_simple_scheduler(self):

def test_is_not_simple_scheduler(self):
self.assertFalse(scheduler.is_simple_scheduler(scheduler.UnitAwareScheduler))


class LegacyWrappingSchedulerTests(TestCase):
class SimpleLegacyScheduler:
# pylint: disable=unused-variable
def __init__(self, params):
pass

def next(self, current):
return current

def setUp(self):
scheduler.register_scheduler("simple", LegacyWrappingSchedulerTests.SimpleLegacyScheduler)

def tearDown(self):
scheduler.remove_scheduler("simple")

def test_legacy_scheduler(self):
task = track.Task(name="raw-request",
operation=track.Operation(
name="raw",
operation_type=track.OperationType.RawRequest.name),
clients=1,
schedule="simple")

s = scheduler.scheduler_for(task)

self.assertEqual(0, s.next(0))
self.assertEqual(0, s.next(0))
12 changes: 10 additions & 2 deletions tests/track/track_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,26 +226,34 @@ def test_cannot_union_mixed_document_corpora_by_meta_data(self):


class TaskTests(TestCase):
def task(self, target_throughput=None, target_interval=None):
def task(self, schedule=None, target_throughput=None, target_interval=None):
op = track.Operation("bulk-index", track.OperationType.Bulk.name)
params = {}
if target_throughput:
params["target-throughput"] = target_throughput
if target_interval:
params["target-interval"] = target_interval
return track.Task("test", op, params=params)
return track.Task("test", op, schedule=schedule, params=params)

def test_unthrottled_task(self):
task = self.task()
self.assertIsNone(task.target_throughput)
self.assertFalse(task.throttled)

def test_task_with_scheduler_is_throttled(self):
task = self.task(schedule="daily-traffic-pattern")
self.assertIsNone(task.target_throughput)
self.assertTrue(task.throttled)

def test_valid_throughput_with_unit(self):
task = self.task(target_throughput="5 MB/s")
self.assertEqual(track.Throughput(5.0, "MB/s"), task.target_throughput)
self.assertTrue(task.throttled)

def test_valid_throughput_numeric(self):
task = self.task(target_throughput=3.2)
self.assertEqual(track.Throughput(3.2, "ops/s"), task.target_throughput)
self.assertTrue(task.throttled)

def test_invalid_throughput_format_is_rejected(self):
task = self.task(target_throughput="3.2 docs")
Expand Down

0 comments on commit 93b7eee

Please sign in to comment.