Enable actor message handlers to fail immediately #471

Merged · 2 commits · Apr 18, 2018
51 changes: 51 additions & 0 deletions esrally/actor.py
@@ -29,6 +29,57 @@ class BenchmarkCancelled:
pass


def parametrized(decorator):
"""

Helper meta-decorator that allows us to provide parameters to a decorator.

:param decorator: The decorator that should accept parameters.
"""
def inner(*args, **kwargs):
def g(f):
return decorator(f, *args, **kwargs)
return g
return inner


@parametrized
def no_retry(f, actor_name):
"""

Decorator intended for Thespian message handlers with the signature ``receiveMsg_$MSG_NAME(self, msg, sender)``. Thespian will
    assume that a message handler that raises an exception can be retried. It will retry once and then give up, leaving only a
    trace in the actor system's internal log file. However, this is usually *not* what we want in Rally. If handling of a message
    fails, we instead want to notify a node higher up in the actor hierarchy.

    We achieve that by sending a ``BenchmarkFailure`` message to the original sender. Note that the original sender may be the
    current actor itself (e.g. when handling a ``Wakeup`` message). In that case the actor is responsible for forwarding the
    benchmark failure to its parent actor.

Example usage:

@no_retry("special forces actor")
def receiveMsg_DefuseBomb(self, msg, sender):
# might raise an exception
pass

    If this message handler raises an exception, the decorator will turn it into a ``BenchmarkFailure`` message with its ``message``
    property set to "Error in special forces actor", which is returned to the original sender.

    :param f: The message handler. It does not need to be passed directly; this is handled by the decorator infrastructure.
    :param actor_name: A human-readable name of the current actor to be used in the exception message.
"""
def guard(self, msg, sender):
try:
return f(self, msg, sender)
except BaseException as e:
msg = "Error in {}".format(actor_name)
# log here as the full trace might get lost.
logger.exception(msg)
self.send(sender, BenchmarkFailure(msg, e))
return guard


class RallyActor(thespian.actors.ActorTypeDispatcher):
def __init__(self, *args, **kw):
super().__init__(*args, **kw)
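Note (not part of the diff): the two helpers above compose as a decorator factory. The following minimal, self-contained sketch shows the same pattern outside of Thespian; ``FakeActor`` and the local ``BenchmarkFailure`` stand-in are illustrative names only, not Rally classes.

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("sketch")


class BenchmarkFailure:
    """Stand-in for esrally's BenchmarkFailure message (illustrative only)."""
    def __init__(self, message, cause=None):
        self.message = message
        self.cause = cause


def parametrized(decorator):
    """Meta-decorator: allows a decorator to accept parameters."""
    def inner(*args, **kwargs):
        def g(f):
            return decorator(f, *args, **kwargs)
        return g
    return inner


@parametrized
def no_retry(f, actor_name):
    """Turn an exception in a message handler into a BenchmarkFailure reply."""
    def guard(self, msg, sender):
        try:
            return f(self, msg, sender)
        except BaseException as e:
            failure_msg = "Error in {}".format(actor_name)
            # log here as the full trace might otherwise get lost
            logger.exception(failure_msg)
            self.send(sender, BenchmarkFailure(failure_msg, e))
    return guard


class FakeActor:
    """Minimal stand-in for a Thespian actor: records outgoing messages."""
    def __init__(self):
        self.outbox = []

    def send(self, target, msg):
        self.outbox.append((target, msg))

    @no_retry("fake actor")
    def receiveMsg_DefuseBomb(self, msg, sender):
        raise RuntimeError("boom")


if __name__ == "__main__":
    a = FakeActor()
    # does not raise; the exception is converted into a BenchmarkFailure reply
    a.receiveMsg_DefuseBomb(msg="defuse", sender="parent")
    target, failure = a.outbox[0]
    print(target, failure.message)  # parent Error in fake actor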
14 changes: 14 additions & 0 deletions esrally/driver/driver.py
@@ -183,22 +183,27 @@ def receiveMsg_ChildActorExited(self, msg, sender):
def receiveUnrecognizedMessage(self, msg, sender):
logger.info("Main driver received unknown message [%s] (ignoring)." % (str(msg)))

@actor.no_retry("driver")
def receiveMsg_StartBenchmark(self, msg, sender):
self.start_sender = sender
self.coordinator = Driver(self, msg.config)
self.coordinator.start_benchmark(msg.track, msg.lap, msg.metrics_meta_info)
self.wakeupAfter(datetime.timedelta(seconds=DriverActor.WAKEUP_INTERVAL_SECONDS))

@actor.no_retry("driver")
def receiveMsg_TrackPrepared(self, msg, sender):
self.transition_when_all_children_responded(sender, msg,
expected_status=None, new_status=None, transition=self.after_track_prepared)

@actor.no_retry("driver")
def receiveMsg_JoinPointReached(self, msg, sender):
self.coordinator.joinpoint_reached(msg.client_id, msg.client_local_timestamp, msg.task)

@actor.no_retry("driver")
def receiveMsg_UpdateSamples(self, msg, sender):
self.coordinator.update_samples(msg.samples)

@actor.no_retry("driver")
def receiveMsg_WakeupMessage(self, msg, sender):
if msg.payload == DriverActor.RESET_RELATIVE_TIME_MARKER:
self.coordinator.reset_relative_time()
@@ -273,6 +278,7 @@ def __init__(self):
super().__init__()
actor.RallyActor.configure_logging(logger)

@actor.no_retry("track preparator")
def receiveMsg_PrepareTrack(self, msg, sender):
# load node-specific config to have correct paths available
cfg = load_local_config(msg.config)
@@ -574,6 +580,7 @@ def __init__(self):
self.start_driving = False
self.wakeup_interval = LoadGenerator.WAKEUP_INTERVAL_SECONDS

@actor.no_retry("load generator")
def receiveMsg_StartLoadGenerator(self, msg, sender):
logger.info("LoadGenerator[%d] is about to start." % msg.client_id)
self.master = sender
@@ -594,13 +601,15 @@ def receiveMsg_StartLoadGenerator(self, msg, sender):
track.load_track_plugins(self.config, runner.register_runner, scheduler.register_scheduler)
self.drive()

@actor.no_retry("load generator")
def receiveMsg_Drive(self, msg, sender):
sleep_time = datetime.timedelta(seconds=msg.client_start_timestamp - time.perf_counter())
logger.info("LoadGenerator[%d] is continuing its work at task index [%d] on [%f], that is in [%s]." %
(self.client_id, self.current_task_index, msg.client_start_timestamp, sleep_time))
self.start_driving = True
self.wakeupAfter(sleep_time)

@actor.no_retry("load generator")
def receiveMsg_CompleteCurrentTask(self, msg, sender):
        # Finish as soon as possible. Remaining samples will be sent with the next WakeupMessage. We also need to skip to the
        # next JoinPoint. But if we are already at a JoinPoint, there is nothing to do.
@@ -612,6 +621,7 @@ def receiveMsg_CompleteCurrentTask(self, msg, sender):
% (str(self.client_id), self.current_task))
self.complete.set()

@actor.no_retry("load generator")
def receiveMsg_WakeupMessage(self, msg, sender):
        # It would be better if we could send ourselves a message at a specific time; simulate this with a boolean for now...
if self.start_driving:
@@ -652,6 +662,10 @@ def receiveMsg_ActorExitRequest(self, msg, sender):
self.cancel.set()
self.pool.shutdown()

def receiveMsg_BenchmarkFailure(self, msg, sender):
# sent by our no_retry infrastructure; forward to master
self.send(self.master, msg)

def receiveUnrecognizedMessage(self, msg, sender):
logger.info("LoadGenerator[%d] received unknown message [%s] (ignoring)." % (self.client_id, str(msg)))

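Note (not part of the diff): the new ``receiveMsg_BenchmarkFailure`` handler above matters because a failing ``WakeupMessage`` handler replies to its sender, and for a wakeup the sender is the load generator itself. The sketch below illustrates the resulting forwarding chain; ``FakeDriver`` and ``FakeLoadGenerator`` are made-up stand-ins, not Rally classes.

class BenchmarkFailure:
    """Stand-in for esrally's BenchmarkFailure message (illustrative only)."""
    def __init__(self, message, cause=None):
        self.message = message
        self.cause = cause


class FakeDriver:
    """Stand-in for the driver (master) actor; records what it receives."""
    def __init__(self):
        self.received = []


class FakeLoadGenerator:
    """Stand-in for the LoadGenerator actor."""
    def __init__(self, master):
        self.master = master

    def send(self, target, msg):
        # Thespian would route this through the actor system; here we call the
        # matching handler directly if the target has one, otherwise record it.
        handler = getattr(target, "receiveMsg_" + type(msg).__name__, None)
        if handler is not None:
            handler(msg, sender=self)
        else:
            target.received.append(msg)

    def receiveMsg_WakeupMessage(self, msg, sender):
        # Simulates what the no_retry decorator does when the wrapped handler
        # raises: it replies to the sender with a BenchmarkFailure. For a
        # wakeup, the sender is this very actor.
        self.send(sender, BenchmarkFailure("Error in load generator", RuntimeError("boom")))

    def receiveMsg_BenchmarkFailure(self, msg, sender):
        # the handler added in this PR: forward the failure to the master
        self.send(self.master, msg)


if __name__ == "__main__":
    driver = FakeDriver()
    lg = FakeLoadGenerator(master=driver)
    # the wakeup is self-addressed, so the failure first comes back to lg ...
    lg.receiveMsg_WakeupMessage(msg=object(), sender=lg)
    # ... and is then forwarded to the driver
    print(driver.received[0].message)  # Error in load generator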
15 changes: 13 additions & 2 deletions esrally/mechanic/mechanic.py
@@ -248,6 +248,7 @@ def receiveMsg_PoisonMessage(self, msg, sender):
logger.error(failmsg)
self.send(self.race_control, actor.BenchmarkFailure(failmsg))

@actor.no_retry("mechanic")
def receiveMsg_StartEngine(self, msg, sender):
logger.info("Received signal from race control to start engine.")
self.race_control = sender
@@ -278,6 +279,7 @@ def receiveMsg_StartEngine(self, msg, sender):
self.status = "starting"
self.received_responses = []

@actor.no_retry("mechanic")
def receiveMsg_NodesStarted(self, msg, sender):
self.metrics_store.merge_meta_info(msg.system_meta_info)

@@ -291,18 +293,22 @@ def receiveMsg_NodesStarted(self, msg, sender):

self.transition_when_all_children_responded(sender, msg, "starting", "nodes_started", self.on_all_nodes_started)

@actor.no_retry("mechanic")
def receiveMsg_MetricsMetaInfoApplied(self, msg, sender):
self.transition_when_all_children_responded(sender, msg, "apply_meta_info", "cluster_started", self.on_cluster_started)

@actor.no_retry("mechanic")
def receiveMsg_OnBenchmarkStart(self, msg, sender):
self.metrics_store.lap = msg.lap
# in the first lap, we are in state "cluster_started", after that in "benchmark_stopped"
self.send_to_children_and_transition(sender, msg, ["cluster_started", "benchmark_stopped"], "benchmark_starting")

@actor.no_retry("mechanic")
def receiveMsg_BenchmarkStarted(self, msg, sender):
self.transition_when_all_children_responded(
sender, msg, "benchmark_starting", "benchmark_started", self.on_benchmark_started)

@actor.no_retry("mechanic")
def receiveMsg_ResetRelativeTime(self, msg, sender):
if msg.reset_in_seconds > 0:
self.wakeupAfter(msg.reset_in_seconds)
@@ -315,21 +321,25 @@ def receiveMsg_WakeupMessage(self, msg, sender):
def receiveMsg_BenchmarkFailure(self, msg, sender):
self.send(self.race_control, msg)

@actor.no_retry("mechanic")
def receiveMsg_OnBenchmarkStop(self, msg, sender):
self.send_to_children_and_transition(sender, msg, "benchmark_started", "benchmark_stopping")

@actor.no_retry("mechanic")
def receiveMsg_BenchmarkStopped(self, msg, sender):
self.metrics_store.bulk_add(msg.system_metrics)
self.transition_when_all_children_responded(
sender, msg, "benchmark_stopping", "benchmark_stopped", self.on_benchmark_stopped)

@actor.no_retry("mechanic")
def receiveMsg_StopEngine(self, msg, sender):
# detach from cluster and gather all system metrics
self.cluster_launcher.stop(self.cluster)
        # We might have experienced a launch error or the user has cancelled the benchmark. Hence we need to allow stopping the
        # cluster from various states and do not check for a specific one here.
self.send_to_children_and_transition(sender, StopNodes(), [], "cluster_stopping")

@actor.no_retry("mechanic")
def receiveMsg_NodesStopped(self, msg, sender):
self.metrics_store.bulk_add(msg.system_metrics)
self.transition_when_all_children_responded(sender, msg, "cluster_stopping", "cluster_stopped", self.on_all_nodes_stopped)
@@ -401,12 +411,13 @@ def __init__(self):
Dispatcher.
"""

@actor.no_retry("mechanic dispatcher")
def receiveMsg_StartEngine(self, startmsg, sender):
all_ips_and_ports = to_ip_port(startmsg.hosts)
all_node_ips = extract_all_node_ips(all_ips_and_ports)
self.start_sender = sender
self.pending = []
self.remotes = defaultdict(list)
all_ips_and_ports = to_ip_port(startmsg.hosts)
all_node_ips = extract_all_node_ips(all_ips_and_ports)

for (ip, port), node in nodes_by_host(all_ips_and_ports).items():
submsg = startmsg.for_nodes(all_node_ips, ip, port, node)
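Note (not part of the diff): a hypothetical pytest-style check of the new behaviour. It assumes ``esrally`` (and its Thespian dependency) is importable; ``RecordingActor`` and the message name ``Explode`` are made up for illustration, while ``actor.no_retry`` and ``actor.BenchmarkFailure`` come from this change.

from esrally import actor


class RecordingActor:
    """Test double that records outgoing messages instead of using Thespian."""
    def __init__(self):
        self.messages = []

    def send(self, target, msg):
        self.messages.append((target, msg))

    @actor.no_retry("recording actor")
    def receiveMsg_Explode(self, msg, sender):
        raise ValueError("kaboom")


def test_failing_handler_replies_with_benchmark_failure():
    a = RecordingActor()
    # must not raise; the exception is converted into a BenchmarkFailure
    a.receiveMsg_Explode(msg="anything", sender="race-control")
    target, failure = a.messages[0]
    assert target == "race-control"
    assert isinstance(failure, actor.BenchmarkFailure)
    # per the no_retry docstring, the message property carries the actor name
    assert failure.message == "Error in recording actor"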