From 994764b597b470dd839dda02ce9eaf33c6463034 Mon Sep 17 00:00:00 2001 From: Daniel Mitterdorfer Date: Wed, 18 Apr 2018 07:52:26 +0200 Subject: [PATCH 1/2] Enable actor message handlers to fail immediately With this commit we add a `no_retry` decorator that is meant to be used on actor message handlers. The actor framework (Thespian) will retry failed messages once after a failure and then give up. However, we don't want that behavior: Instead our actors should fail immediately in most cases and propagate a `BenchmarkFailure` to its parent. --- esrally/actor.py | 48 ++++++++++++++++++++++++++++++++++++ esrally/driver/driver.py | 14 +++++++++++ esrally/mechanic/mechanic.py | 15 +++++++++-- 3 files changed, 75 insertions(+), 2 deletions(-) diff --git a/esrally/actor.py b/esrally/actor.py index 16cf0a178..e113958f8 100644 --- a/esrally/actor.py +++ b/esrally/actor.py @@ -29,6 +29,54 @@ class BenchmarkCancelled: pass +def parametrized(decorator): + """ + + Helper meta-decorator that allows us to provide parameters to a decorator. + + :param decorator: The decorator that should accept parameters. + """ + def inner(*args, **kwargs): + def g(f): + return decorator(f, *args, **kwargs) + return g + return inner + + +@parametrized +def no_retry(f, actor_name): + """ + + Decorator intended for Thespian message handlers with the signature ``receiveMsg_$MSG_NAME(self, msg, sender)``. Thespian will + assume that a message handler that raises an exception can be retried. It will then retry once and give up afterwards just leaving + a trace of that in the actor system's internal log file. However, this is usually *not* what we want in Rally. If handling of a + message fails we instead want to notify a node higher up in the actor hierarchy. + + We achieve that by sending a ``BenchmarkFailure`` message to the original sender. Note that this might as well be the current + actor (e.g. when handling a ``Wakeup`` message). In that case the actor itself is responsible for forwarding the benchmark failure + to its parent actor. + + Example usage: + + @no_retry("special forces actor") + def receiveMsg_DefuseBomb(self, msg, sender): + # might raise an exception + pass + + If this message handler raises an exception, the decorator will turn it into a ``BenchmarkFailure`` message with its ``message`` + property set to "Error in special forces actor" which is returned to the original sender. + + :param f: The message handler. Does not need to passed directly, this is handled by the decorator infrastructure. + :param actor_name: A human readable name of the current actor that should be used in the exception message. + """ + def guard(self, msg, sender): + try: + return f(self, msg, sender) + except BaseException as e: + self.send(sender, BenchmarkFailure("Error in {}".format(actor_name), e)) + return guard + + class RallyActor(thespian.actors.ActorTypeDispatcher): def __init__(self, *args, **kw): super().__init__(*args, **kw) diff --git a/esrally/driver/driver.py b/esrally/driver/driver.py index 112471b46..c505f0778 100644 --- a/esrally/driver/driver.py +++ b/esrally/driver/driver.py @@ -183,22 +183,27 @@ def receiveMsg_ChildActorExited(self, msg, sender): def receiveUnrecognizedMessage(self, msg, sender): logger.info("Main driver received unknown message [%s] (ignoring)." % (str(msg))) + @actor.no_retry("driver") def receiveMsg_StartBenchmark(self, msg, sender): self.start_sender = sender self.coordinator = Driver(self, msg.config) self.coordinator.start_benchmark(msg.track, msg.lap, msg.metrics_meta_info) self.wakeupAfter(datetime.timedelta(seconds=DriverActor.WAKEUP_INTERVAL_SECONDS)) + @actor.no_retry("driver") def receiveMsg_TrackPrepared(self, msg, sender): self.transition_when_all_children_responded(sender, msg, expected_status=None, new_status=None, transition=self.after_track_prepared) + @actor.no_retry("driver") def receiveMsg_JoinPointReached(self, msg, sender): self.coordinator.joinpoint_reached(msg.client_id, msg.client_local_timestamp, msg.task) + @actor.no_retry("driver") def receiveMsg_UpdateSamples(self, msg, sender): self.coordinator.update_samples(msg.samples) + @actor.no_retry("driver") def receiveMsg_WakeupMessage(self, msg, sender): if msg.payload == DriverActor.RESET_RELATIVE_TIME_MARKER: self.coordinator.reset_relative_time() @@ -273,6 +278,7 @@ def __init__(self): super().__init__() actor.RallyActor.configure_logging(logger) + @actor.no_retry("track preparator") def receiveMsg_PrepareTrack(self, msg, sender): # load node-specific config to have correct paths available cfg = load_local_config(msg.config) @@ -574,6 +580,7 @@ def __init__(self): self.start_driving = False self.wakeup_interval = LoadGenerator.WAKEUP_INTERVAL_SECONDS + @actor.no_retry("load generator") def receiveMsg_StartLoadGenerator(self, msg, sender): logger.info("LoadGenerator[%d] is about to start." % msg.client_id) self.master = sender @@ -594,6 +601,7 @@ def receiveMsg_StartLoadGenerator(self, msg, sender): track.load_track_plugins(self.config, runner.register_runner, scheduler.register_scheduler) self.drive() + @actor.no_retry("load generator") def receiveMsg_Drive(self, msg, sender): sleep_time = datetime.timedelta(seconds=msg.client_start_timestamp - time.perf_counter()) logger.info("LoadGenerator[%d] is continuing its work at task index [%d] on [%f], that is in [%s]." % @@ -601,6 +609,7 @@ def receiveMsg_Drive(self, msg, sender): self.start_driving = True self.wakeupAfter(sleep_time) + @actor.no_retry("load generator") def receiveMsg_CompleteCurrentTask(self, msg, sender): # finish now ASAP. Remaining samples will be sent with the next WakeupMessage. We will also need to skip to the next # JoinPoint. But if we are already at a JoinPoint at the moment, there is nothing to do. @@ -612,6 +621,7 @@ def receiveMsg_CompleteCurrentTask(self, msg, sender): % (str(self.client_id), self.current_task)) self.complete.set() + @actor.no_retry("load generator") def receiveMsg_WakeupMessage(self, msg, sender): # it would be better if we could send ourselves a message at a specific time, simulate this with a boolean... if self.start_driving: @@ -652,6 +662,10 @@ def receiveMsg_ActorExitRequest(self, msg, sender): self.cancel.set() self.pool.shutdown() + def receiveMsg_BenchmarkFailure(self, msg, sender): + # sent by our no_retry infrastructure; forward to master + self.send(self.master, msg) + def receiveUnrecognizedMessage(self, msg, sender): logger.info("LoadGenerator[%d] received unknown message [%s] (ignoring)." % (self.client_id, str(msg))) diff --git a/esrally/mechanic/mechanic.py b/esrally/mechanic/mechanic.py index b14ec1cab..08fdd111e 100644 --- a/esrally/mechanic/mechanic.py +++ b/esrally/mechanic/mechanic.py @@ -248,6 +248,7 @@ def receiveMsg_PoisonMessage(self, msg, sender): logger.error(failmsg) self.send(self.race_control, actor.BenchmarkFailure(failmsg)) + @actor.no_retry("mechanic") def receiveMsg_StartEngine(self, msg, sender): logger.info("Received signal from race control to start engine.") self.race_control = sender @@ -278,6 +279,7 @@ def receiveMsg_StartEngine(self, msg, sender): self.status = "starting" self.received_responses = [] + @actor.no_retry("mechanic") def receiveMsg_NodesStarted(self, msg, sender): self.metrics_store.merge_meta_info(msg.system_meta_info) @@ -291,18 +293,22 @@ def receiveMsg_NodesStarted(self, msg, sender): self.transition_when_all_children_responded(sender, msg, "starting", "nodes_started", self.on_all_nodes_started) + @actor.no_retry("mechanic") def receiveMsg_MetricsMetaInfoApplied(self, msg, sender): self.transition_when_all_children_responded(sender, msg, "apply_meta_info", "cluster_started", self.on_cluster_started) + @actor.no_retry("mechanic") def receiveMsg_OnBenchmarkStart(self, msg, sender): self.metrics_store.lap = msg.lap # in the first lap, we are in state "cluster_started", after that in "benchmark_stopped" self.send_to_children_and_transition(sender, msg, ["cluster_started", "benchmark_stopped"], "benchmark_starting") + @actor.no_retry("mechanic") def receiveMsg_BenchmarkStarted(self, msg, sender): self.transition_when_all_children_responded( sender, msg, "benchmark_starting", "benchmark_started", self.on_benchmark_started) + @actor.no_retry("mechanic") def receiveMsg_ResetRelativeTime(self, msg, sender): if msg.reset_in_seconds > 0: self.wakeupAfter(msg.reset_in_seconds) @@ -315,14 +321,17 @@ def receiveMsg_WakeupMessage(self, msg, sender): def receiveMsg_BenchmarkFailure(self, msg, sender): self.send(self.race_control, msg) + @actor.no_retry("mechanic") def receiveMsg_OnBenchmarkStop(self, msg, sender): self.send_to_children_and_transition(sender, msg, "benchmark_started", "benchmark_stopping") + @actor.no_retry("mechanic") def receiveMsg_BenchmarkStopped(self, msg, sender): self.metrics_store.bulk_add(msg.system_metrics) self.transition_when_all_children_responded( sender, msg, "benchmark_stopping", "benchmark_stopped", self.on_benchmark_stopped) + @actor.no_retry("mechanic") def receiveMsg_StopEngine(self, msg, sender): # detach from cluster and gather all system metrics self.cluster_launcher.stop(self.cluster) @@ -330,6 +339,7 @@ def receiveMsg_StopEngine(self, msg, sender): # cluster from various states and we don't check here for a specific one. self.send_to_children_and_transition(sender, StopNodes(), [], "cluster_stopping") + @actor.no_retry("mechanic") def receiveMsg_NodesStopped(self, msg, sender): self.metrics_store.bulk_add(msg.system_metrics) self.transition_when_all_children_responded(sender, msg, "cluster_stopping", "cluster_stopped", self.on_all_nodes_stopped) @@ -401,12 +411,13 @@ def __init__(self): Dispatcher. """ + @actor.no_retry("mechanic dispatcher") def receiveMsg_StartEngine(self, startmsg, sender): - all_ips_and_ports = to_ip_port(startmsg.hosts) - all_node_ips = extract_all_node_ips(all_ips_and_ports) self.start_sender = sender self.pending = [] self.remotes = defaultdict(list) + all_ips_and_ports = to_ip_port(startmsg.hosts) + all_node_ips = extract_all_node_ips(all_ips_and_ports) for (ip, port), node in nodes_by_host(all_ips_and_ports).items(): submsg = startmsg.for_nodes(all_node_ips, ip, port, node) From 70faa2c7be3a769637e3f1feef26e95be97384b4 Mon Sep 17 00:00:00 2001 From: Daniel Mitterdorfer Date: Wed, 18 Apr 2018 08:00:53 +0200 Subject: [PATCH 2/2] Log full trace immediately --- esrally/actor.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/esrally/actor.py b/esrally/actor.py index e113958f8..f14faf918 100644 --- a/esrally/actor.py +++ b/esrally/actor.py @@ -73,7 +73,10 @@ def guard(self, msg, sender): try: return f(self, msg, sender) except BaseException as e: - self.send(sender, BenchmarkFailure("Error in {}".format(actor_name), e)) + msg = "Error in {}".format(actor_name) + # log here as the full trace might get lost. + logger.exception(msg) + self.send(sender, BenchmarkFailure(msg, e)) return guard