Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
SDI-818 restart plugin to the allowed count at the DeadAvailablePlugi…
Browse files Browse the repository at this point in the history
…nEvent
  • Loading branch information
candysmurf committed Feb 2, 2016
1 parent 0e6725e commit 408e55a
Show file tree
Hide file tree
Showing 3 changed files with 64 additions and 16 deletions.
37 changes: 21 additions & 16 deletions control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1118,26 +1118,31 @@ func TestFailedPlugin(t *testing.T) {
Convey("create a pool, add subscriptions and start plugins", func() {
pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:2")
So(errp, ShouldBeNil)
pool.subscribe("1", unboundSubscriptionType)
pool.Subscribe("1", strategy.UnboundSubscriptionType)
err = c.pluginRunner.runPlugin(lp.Details)
So(err, ShouldBeNil)

Convey("collect metrics against a plugin that will panic", func() {
So(len(pool.plugins), ShouldEqual, 1)
cr, err := c.CollectMetrics(m, time.Now().Add(time.Second*1))
So(err, ShouldNotBeNil)
So(cr, ShouldBeNil)
<-lpe.done
So(lpe.plugin.EventNamespace, ShouldResemble, control_event.AvailablePluginDead)
// sleep is necessary unless/until we add an event that is fired when the AP is removed
// from the array of plugins on the pool
time.Sleep(100 * time.Millisecond)
So(len(pool.plugins), ShouldEqual, 1)
// TODO: On the AvailablePluginDeadEvent we should attempt to restart the plugin
// ensuring that we can't enter an endless failure loop.
//
// - Add a configurable max-failure-count for running plugins
// - Add the logic that detects a failure loop to the eligibilty method on the pool
So(len(pool.Plugins()), ShouldEqual, 1)

var err []error
var cr []core.Metric
for i := 0; i <= MaximumRestartOnDeadPluginEvent; i++ {
cr, err = c.CollectMetrics(m, time.Now().Add(time.Second*1), uuid.New())
So(err, ShouldNotBeNil)
So(cr, ShouldBeNil)
<-lpe.done
So(lpe.plugin.EventNamespace, ShouldResemble, control_event.AvailablePluginDead)

time.Sleep(1 * time.Second)

if i < MaximumRestartOnDeadPluginEvent {
So(len(pool.Plugins()), ShouldEqual, 1)
So(pool.RestartCount(), ShouldEqual, i+1)
}
}
So(len(pool.Plugins()), ShouldEqual, 0)
So(pool.RestartCount(), ShouldEqual, MaximumRestartOnDeadPluginEvent)
})
})
c.Stop()
Expand Down
26 changes: 26 additions & 0 deletions control/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ const (
PluginStopped
// PluginDisabled is the disabled state of a plugin
PluginDisabled

// MaximumRestartOnDeadPluginEvent is the maximum count of restarting a plugin
// after the event of control_event.DeadAvailablePluginEvent
MaximumRestartOnDeadPluginEvent = 3
)

// TBD
Expand Down Expand Up @@ -239,6 +243,7 @@ func (r *runner) HandleGomitEvent(e gomit.Event) {
"event": v.Namespace(),
"aplugin": v.String,
}).Warning("handling dead available plugin event")

pool, err := r.availablePlugins.getPool(v.Key)
if err != nil {
runnerLog.WithFields(log.Fields{
Expand All @@ -247,9 +252,22 @@ func (r *runner) HandleGomitEvent(e gomit.Event) {
}).Error(err.Error())
return
}

if pool != nil {
pool.Kill(v.Id, "plugin dead")
}

if pool.Eligible() && pool.RestartCount() < MaximumRestartOnDeadPluginEvent {
e := r.restartPlugin(v.Key)
if e != nil {
runnerLog.WithFields(log.Fields{
"_block": "handle-events",
"aplugin": v.String,
}).Error(err.Error())
return
}
pool.IncRestartCount()
}
case *control_event.PluginUnsubscriptionEvent:
runnerLog.WithFields(log.Fields{
"_block": "subscribe-pool",
Expand Down Expand Up @@ -422,3 +440,11 @@ func (r *runner) handleUnsubscription(pType, pName string, pVersion int, taskID
}
return nil
}

func (r *runner) restartPlugin(key string) error {
lp, err := r.pluginManager.get(key)
if err != nil {
return err
}
return r.runPlugin(lp.Details)
}
17 changes: 17 additions & 0 deletions control/strategy/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ type Pool interface {
SubscriptionCount() int
Unsubscribe(taskID string)
Version() int
RestartCount() int
IncRestartCount()
}

type AvailablePlugin interface {
Expand Down Expand Up @@ -122,6 +124,10 @@ type pool struct {
// The routing and caching strategy declared by the plugin.
// strategy RoutingAndCaching
RoutingAndCaching

// restartCount the restart count of available plugins
// when the DeadAvailablePluginEvent occurs
restartCount int
}

func NewPool(key string, plugins ...AvailablePlugin) (Pool, error) {
Expand Down Expand Up @@ -164,6 +170,17 @@ func (p *pool) Strategy() RoutingAndCaching {
return p.RoutingAndCaching
}

// RestartCount returns the restart count of a pool
func (p *pool) RestartCount() int {
return p.restartCount
}

func (p *pool) IncRestartCount() {
p.RLock()
defer p.RUnlock()
p.restartCount++
}

// Insert inserts an AvailablePlugin into the pool
func (p *pool) Insert(a AvailablePlugin) error {
if a.Type() != plugin.CollectorPluginType && a.Type() != plugin.ProcessorPluginType && a.Type() != plugin.PublisherPluginType {
Expand Down

0 comments on commit 408e55a

Please sign in to comment.