From dc594249201f95c1cae3e7041dc60a5d6c5f036c Mon Sep 17 00:00:00 2001 From: Joel Cooklin Date: Mon, 1 Feb 2016 20:56:57 -0800 Subject: [PATCH] Adds event on plugin restart --- control/control_test.go | 24 ++++++++++---- control/runner.go | 37 +++++++++++++++------ core/control_event/control_event.go | 50 ++++++++++++++++++++++------- 3 files changed, 83 insertions(+), 28 deletions(-) diff --git a/control/control_test.go b/control/control_test.go index 266dac78a..457dd852a 100644 --- a/control/control_test.go +++ b/control/control_test.go @@ -246,6 +246,12 @@ func newListenToPluginEvent() *listenToPluginEvent { func (l *listenToPluginEvent) HandleGomitEvent(e gomit.Event) { switch v := e.Body.(type) { + case *control_event.RestartedAvailablePluginEvent: + l.plugin.EventNamespace = v.Namespace() + l.done <- struct{}{} + case *control_event.MaxPluginRestartsExceededEvent: + l.plugin.EventNamespace = v.Namespace() + l.done <- struct{}{} case *control_event.DeadAvailablePluginEvent: l.plugin.EventNamespace = v.Namespace() l.done <- struct{}{} @@ -1127,22 +1133,28 @@ func TestFailedPlugin(t *testing.T) { var err []error var cr []core.Metric - for i := 0; i <= MaximumRestartOnDeadPluginEvent; i++ { + eventMap := map[string]int{} + for i := 0; i <= MaxPluginRestartCount; i++ { cr, err = c.CollectMetrics(m, time.Now().Add(time.Second*1), uuid.New()) So(err, ShouldNotBeNil) So(cr, ShouldBeNil) <-lpe.done - So(lpe.plugin.EventNamespace, ShouldResemble, control_event.AvailablePluginDead) - - time.Sleep(700 * time.Millisecond) + eventMap[lpe.plugin.EventNamespace]++ - if i < 3 { + if i < MaxPluginRestartCount { + <-lpe.done + eventMap[lpe.plugin.EventNamespace]++ So(len(pool.Plugins()), ShouldEqual, 1) So(pool.RestartCount(), ShouldEqual, i+1) } } + <-lpe.done + eventMap[lpe.plugin.EventNamespace]++ + So(eventMap[control_event.AvailablePluginDead], ShouldEqual, 4) + So(eventMap[control_event.AvailablePluginRestarted], ShouldEqual, 3) + So(eventMap[control_event.PluginRestartsExceeded], ShouldEqual, 1) So(len(pool.Plugins()), ShouldEqual, 0) - So(pool.RestartCount(), ShouldEqual, MaximumRestartOnDeadPluginEvent) + So(pool.RestartCount(), ShouldEqual, MaxPluginRestartCount) }) }) c.Stop() diff --git a/control/runner.go b/control/runner.go index 1bfcdee98..d30f01a88 100644 --- a/control/runner.go +++ b/control/runner.go @@ -56,7 +56,7 @@ const ( // MaximumRestartOnDeadPluginEvent is the maximum count of restarting a plugin // after the event of control_event.DeadAvailablePluginEvent - MaximumRestartOnDeadPluginEvent = 3 + MaxPluginRestartCount = 3 ) // TBD @@ -257,16 +257,33 @@ func (r *runner) HandleGomitEvent(e gomit.Event) { pool.Kill(v.Id, "plugin dead") } - if pool.Eligible() && pool.RestartCount() < MaximumRestartOnDeadPluginEvent { - e := r.restartPlugin(v.Key) - if e != nil { - runnerLog.WithFields(log.Fields{ - "_block": "handle-events", - "aplugin": v.String, - }).Error(err.Error()) - return + if pool.Eligible() { + if pool.RestartCount() < MaxPluginRestartCount { + e := r.restartPlugin(v.Key) + if e != nil { + runnerLog.WithFields(log.Fields{ + "_block": "handle-events", + "aplugin": v.String, + }).Error(err.Error()) + return + } + pool.IncRestartCount() + r.emitter.Emit(&control_event.RestartedAvailablePluginEvent{ + Id: v.Id, + Name: v.Name, + Version: v.Version, + Key: v.Key, + Type: v.Type, + }) + } else { + r.emitter.Emit(&control_event.MaxPluginRestartsExceededEvent{ + Id: v.Id, + Name: v.Name, + Version: v.Version, + Key: v.Key, + Type: v.Type, + }) } - pool.IncRestartCount() } case *control_event.PluginUnsubscriptionEvent: runnerLog.WithFields(log.Fields{ diff --git a/core/control_event/control_event.go b/core/control_event/control_event.go index 40b46d71b..da1843da6 100644 --- a/core/control_event/control_event.go +++ b/core/control_event/control_event.go @@ -20,18 +20,20 @@ limitations under the License. package control_event const ( - AvailablePluginDead = "Control.AvailablePluginDead" - PluginLoaded = "Control.PluginLoaded" - PluginUnloaded = "Control.PluginUnloaded" - PluginsSwapped = "Control.PluginsSwapped" - PluginSubscribed = "Control.PluginSubscribed" - PluginUnsubscribed = "Control.PluginUnsubscribed" - ProcessorSubscribed = "Control.ProcessorSubscribed" - ProcessorUnsubscribed = "Control.ProcessorUnsubscribed" - MetricSubscribed = "Control.MetricSubscribed" - MetricUnsubscribed = "Control.MetricUnsubscribed" - HealthCheckFailed = "Control.PluginHealthCheckFailed" - MoveSubscription = "Control.PluginSubscriptionMoved" + AvailablePluginDead = "Control.AvailablePluginDead" + AvailablePluginRestarted = "Control.RestartedAvailablePlugin" + PluginRestartsExceeded = "Control.PluginRestartsExceeded" + PluginLoaded = "Control.PluginLoaded" + PluginUnloaded = "Control.PluginUnloaded" + PluginsSwapped = "Control.PluginsSwapped" + PluginSubscribed = "Control.PluginSubscribed" + PluginUnsubscribed = "Control.PluginUnsubscribed" + ProcessorSubscribed = "Control.ProcessorSubscribed" + ProcessorUnsubscribed = "Control.ProcessorUnsubscribed" + MetricSubscribed = "Control.MetricSubscribed" + MetricUnsubscribed = "Control.MetricUnsubscribed" + HealthCheckFailed = "Control.PluginHealthCheckFailed" + MoveSubscription = "Control.PluginSubscriptionMoved" ) type LoadPluginEvent struct { @@ -68,6 +70,30 @@ func (e *DeadAvailablePluginEvent) Namespace() string { return AvailablePluginDead } +type RestartedAvailablePluginEvent struct { + Name string + Version int + Type int + Key string + Id uint32 +} + +func (e *MaxPluginRestartsExceededEvent) Namespace() string { + return PluginRestartsExceeded +} + +type MaxPluginRestartsExceededEvent struct { + Name string + Version int + Type int + Key string + Id uint32 +} + +func (e *RestartedAvailablePluginEvent) Namespace() string { + return AvailablePluginRestarted +} + type SwapPluginsEvent struct { LoadedPluginName string LoadedPluginVersion int