Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Adds event on plugin restart
Browse files Browse the repository at this point in the history
  • Loading branch information
jcooklin committed Feb 18, 2016
1 parent 04133ab commit dc59424
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 28 deletions.
24 changes: 18 additions & 6 deletions control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,12 @@ func newListenToPluginEvent() *listenToPluginEvent {

func (l *listenToPluginEvent) HandleGomitEvent(e gomit.Event) {
switch v := e.Body.(type) {
case *control_event.RestartedAvailablePluginEvent:
l.plugin.EventNamespace = v.Namespace()
l.done <- struct{}{}
case *control_event.MaxPluginRestartsExceededEvent:
l.plugin.EventNamespace = v.Namespace()
l.done <- struct{}{}
case *control_event.DeadAvailablePluginEvent:
l.plugin.EventNamespace = v.Namespace()
l.done <- struct{}{}
Expand Down Expand Up @@ -1127,22 +1133,28 @@ func TestFailedPlugin(t *testing.T) {

var err []error
var cr []core.Metric
for i := 0; i <= MaximumRestartOnDeadPluginEvent; i++ {
eventMap := map[string]int{}
for i := 0; i <= MaxPluginRestartCount; i++ {
cr, err = c.CollectMetrics(m, time.Now().Add(time.Second*1), uuid.New())
So(err, ShouldNotBeNil)
So(cr, ShouldBeNil)
<-lpe.done
So(lpe.plugin.EventNamespace, ShouldResemble, control_event.AvailablePluginDead)

time.Sleep(700 * time.Millisecond)
eventMap[lpe.plugin.EventNamespace]++

if i < 3 {
if i < MaxPluginRestartCount {
<-lpe.done
eventMap[lpe.plugin.EventNamespace]++
So(len(pool.Plugins()), ShouldEqual, 1)
So(pool.RestartCount(), ShouldEqual, i+1)
}
}
<-lpe.done
eventMap[lpe.plugin.EventNamespace]++
So(eventMap[control_event.AvailablePluginDead], ShouldEqual, 4)
So(eventMap[control_event.AvailablePluginRestarted], ShouldEqual, 3)
So(eventMap[control_event.PluginRestartsExceeded], ShouldEqual, 1)
So(len(pool.Plugins()), ShouldEqual, 0)
So(pool.RestartCount(), ShouldEqual, MaximumRestartOnDeadPluginEvent)
So(pool.RestartCount(), ShouldEqual, MaxPluginRestartCount)
})
})
c.Stop()
Expand Down
37 changes: 27 additions & 10 deletions control/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ const (

// MaximumRestartOnDeadPluginEvent is the maximum count of restarting a plugin
// after the event of control_event.DeadAvailablePluginEvent
MaximumRestartOnDeadPluginEvent = 3
MaxPluginRestartCount = 3
)

// TBD
Expand Down Expand Up @@ -257,16 +257,33 @@ func (r *runner) HandleGomitEvent(e gomit.Event) {
pool.Kill(v.Id, "plugin dead")
}

if pool.Eligible() && pool.RestartCount() < MaximumRestartOnDeadPluginEvent {
e := r.restartPlugin(v.Key)
if e != nil {
runnerLog.WithFields(log.Fields{
"_block": "handle-events",
"aplugin": v.String,
}).Error(err.Error())
return
if pool.Eligible() {
if pool.RestartCount() < MaxPluginRestartCount {
e := r.restartPlugin(v.Key)
if e != nil {
runnerLog.WithFields(log.Fields{
"_block": "handle-events",
"aplugin": v.String,
}).Error(err.Error())
return
}
pool.IncRestartCount()
r.emitter.Emit(&control_event.RestartedAvailablePluginEvent{
Id: v.Id,
Name: v.Name,
Version: v.Version,
Key: v.Key,
Type: v.Type,
})
} else {
r.emitter.Emit(&control_event.MaxPluginRestartsExceededEvent{
Id: v.Id,
Name: v.Name,
Version: v.Version,
Key: v.Key,
Type: v.Type,
})
}
pool.IncRestartCount()
}
case *control_event.PluginUnsubscriptionEvent:
runnerLog.WithFields(log.Fields{
Expand Down
50 changes: 38 additions & 12 deletions core/control_event/control_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,20 @@ limitations under the License.
package control_event

const (
AvailablePluginDead = "Control.AvailablePluginDead"
PluginLoaded = "Control.PluginLoaded"
PluginUnloaded = "Control.PluginUnloaded"
PluginsSwapped = "Control.PluginsSwapped"
PluginSubscribed = "Control.PluginSubscribed"
PluginUnsubscribed = "Control.PluginUnsubscribed"
ProcessorSubscribed = "Control.ProcessorSubscribed"
ProcessorUnsubscribed = "Control.ProcessorUnsubscribed"
MetricSubscribed = "Control.MetricSubscribed"
MetricUnsubscribed = "Control.MetricUnsubscribed"
HealthCheckFailed = "Control.PluginHealthCheckFailed"
MoveSubscription = "Control.PluginSubscriptionMoved"
AvailablePluginDead = "Control.AvailablePluginDead"
AvailablePluginRestarted = "Control.RestartedAvailablePlugin"
PluginRestartsExceeded = "Control.PluginRestartsExceeded"
PluginLoaded = "Control.PluginLoaded"
PluginUnloaded = "Control.PluginUnloaded"
PluginsSwapped = "Control.PluginsSwapped"
PluginSubscribed = "Control.PluginSubscribed"
PluginUnsubscribed = "Control.PluginUnsubscribed"
ProcessorSubscribed = "Control.ProcessorSubscribed"
ProcessorUnsubscribed = "Control.ProcessorUnsubscribed"
MetricSubscribed = "Control.MetricSubscribed"
MetricUnsubscribed = "Control.MetricUnsubscribed"
HealthCheckFailed = "Control.PluginHealthCheckFailed"
MoveSubscription = "Control.PluginSubscriptionMoved"
)

type LoadPluginEvent struct {
Expand Down Expand Up @@ -68,6 +70,30 @@ func (e *DeadAvailablePluginEvent) Namespace() string {
return AvailablePluginDead
}

type RestartedAvailablePluginEvent struct {
Name string
Version int
Type int
Key string
Id uint32
}

func (e *MaxPluginRestartsExceededEvent) Namespace() string {
return PluginRestartsExceeded
}

type MaxPluginRestartsExceededEvent struct {
Name string
Version int
Type int
Key string
Id uint32
}

func (e *RestartedAvailablePluginEvent) Namespace() string {
return AvailablePluginRestarted
}

type SwapPluginsEvent struct {
LoadedPluginName string
LoadedPluginVersion int
Expand Down

0 comments on commit dc59424

Please sign in to comment.