Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Merge pull request #688 from candysmurf/sdi-818-jc
Browse files Browse the repository at this point in the history
SDI-818: Restart failed plugins
  • Loading branch information
jcooklin committed Feb 18, 2016
2 parents 46b3011 + 61984a5 commit be26613
Show file tree
Hide file tree
Showing 6 changed files with 195 additions and 17 deletions.
2 changes: 1 addition & 1 deletion control/available_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (a *availablePlugin) CheckHealth() {
} else {
a.healthCheckFailed()
}
case <-time.After(time.Second * 1):
case <-time.After(DefaultHealthCheckTimeout):
a.healthCheckFailed()
}
}
Expand Down
87 changes: 83 additions & 4 deletions control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ type mockPluginEvent struct {
UnloadedPluginName string
UnloadedPluginVersion int
PluginType int
EventNamespace string
}

type listenToPluginEvent struct {
Expand All @@ -245,6 +246,15 @@ func newListenToPluginEvent() *listenToPluginEvent {

func (l *listenToPluginEvent) HandleGomitEvent(e gomit.Event) {
switch v := e.Body.(type) {
case *control_event.RestartedAvailablePluginEvent:
l.plugin.EventNamespace = v.Namespace()
l.done <- struct{}{}
case *control_event.MaxPluginRestartsExceededEvent:
l.plugin.EventNamespace = v.Namespace()
l.done <- struct{}{}
case *control_event.DeadAvailablePluginEvent:
l.plugin.EventNamespace = v.Namespace()
l.done <- struct{}{}
case *control_event.LoadPluginEvent:
l.plugin.LoadedPluginName = v.Name
l.plugin.LoadedPluginVersion = v.Version
Expand Down Expand Up @@ -839,10 +849,11 @@ func TestRoutingCachingStrategy(t *testing.T) {
if e != nil {
t.FailNow()
}
metric, err := c.metricCatalog.Get([]string{"intel", "mock", "foo"}, 2)
So(err, ShouldBeNil)
So(metric.NamespaceAsString(), ShouldResemble, "/intel/mock/foo")
So(err, ShouldBeNil)
metric := MockMetricType{
namespace: []string{"intel", "mock", "foo"},
ver: 2,
cfg: cdata.NewNode(),
}
<-lpe.done
Convey("Start the plugins", func() {
lp, err := c.pluginManager.get("collector:mock:2")
Expand Down Expand Up @@ -1080,6 +1091,74 @@ func TestCollectDynamicMetrics(t *testing.T) {
})
}

// TestFailedPlugin verifies the restart handling for a plugin that fails on
// every collection. It loads the mock collector, requests a metric whose
// config carries "panic": true, and collects MaxPluginRestartCount+1 times.
// It expects one restart event per failure up to MaxPluginRestartCount,
// followed by a PluginRestartsExceeded event, and an empty pool at the end.
func TestFailedPlugin(t *testing.T) {
Convey("given a loaded plugin", t, func() {
// Create controller
c := New()
c.Start()
// lpe.done is signalled by the listener each time it handles one of the
// plugin events it knows about; the last namespace seen is recorded in
// lpe.plugin.EventNamespace.
lpe := newListenToPluginEvent()
c.eventManager.RegisterHandler("TEST", lpe)
c.Config.Plugins.All.AddItem("password", ctypes.ConfigValueStr{Value: "testval"})

// Load plugin
load(c, PluginPath)
// Block until the listener reports an event (presumably the plugin load
// event -- confirm against the full HandleGomitEvent).
<-lpe.done
_, err := c.MetricCatalog()
So(err, ShouldBeNil)

// metrics to collect
// The "panic" item asks the mock collector to panic inside CollectMetrics,
// which is how this test simulates a failed plugin.
cfg := cdata.NewNode()
cfg.AddItem("panic", ctypes.ConfigValueBool{Value: true})
m := []core.Metric{
MockMetricType{
namespace: []string{"intel", "mock", "foo"},
cfg: cfg,
},
}

// retrieve loaded plugin
lp, err := c.pluginManager.get("collector:mock:2")
So(err, ShouldBeNil)
So(lp, ShouldNotBeNil)

Convey("create a pool, add subscriptions and start plugins", func() {
pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:2")
So(errp, ShouldBeNil)
pool.Subscribe("1", strategy.UnboundSubscriptionType)
err = c.pluginRunner.runPlugin(lp.Details)
So(err, ShouldBeNil)

Convey("collect metrics against a plugin that will panic", func() {
So(len(pool.Plugins()), ShouldEqual, 1)

// NOTE: this err shadows the outer one; it holds the []error returned by
// CollectMetrics.
var err []error
var cr []core.Metric
// eventMap counts how many times each event namespace was observed.
eventMap := map[string]int{}
// Collect one more time than the restart budget so that the final failure
// is not followed by a restart.
for i := 0; i < MaxPluginRestartCount+1; i++ {
cr, err = c.CollectMetrics(m, time.Now().Add(time.Second*1), uuid.New())
So(err, ShouldNotBeNil)
So(cr, ShouldBeNil)
// First event after the failed collection (presumably the dead-plugin
// event -- the namespace is only counted, not asserted).
<-lpe.done
eventMap[lpe.plugin.EventNamespace]++

if i < MaxPluginRestartCount {
// While the budget is not exhausted a second event must follow, and it
// must be the restart event.
<-lpe.done
eventMap[lpe.plugin.EventNamespace]++
So(pool.RestartCount(), ShouldEqual, i+1)
So(lpe.plugin.EventNamespace, ShouldEqual, control_event.AvailablePluginRestarted)
}
}
// After the last failure the only remaining event is the
// restarts-exceeded notification.
<-lpe.done
So(lpe.plugin.EventNamespace, ShouldEqual, control_event.PluginRestartsExceeded)
So(eventMap[control_event.AvailablePluginRestarted], ShouldEqual, MaxPluginRestartCount)
So(len(pool.Plugins()), ShouldEqual, 0)
So(pool.RestartCount(), ShouldEqual, MaxPluginRestartCount)
})
})
c.Stop()
})
}

func TestCollectMetrics(t *testing.T) {
Convey("given a loaded plugin", t, func() {
// adjust HB timeouts for test
Expand Down
51 changes: 51 additions & 0 deletions control/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ const (
PluginStopped
// PluginDisabled is the disabled state of a plugin
PluginDisabled

// MaxPluginRestartCount is the maximum number of times a plugin is restarted
// after a control_event.DeadAvailablePluginEvent
MaxPluginRestartCount = 3
)

// TBD
Expand Down Expand Up @@ -239,6 +243,7 @@ func (r *runner) HandleGomitEvent(e gomit.Event) {
"event": v.Namespace(),
"aplugin": v.String,
}).Warning("handling dead available plugin event")

pool, err := r.availablePlugins.getPool(v.Key)
if err != nil {
runnerLog.WithFields(log.Fields{
Expand All @@ -247,9 +252,47 @@ func (r *runner) HandleGomitEvent(e gomit.Event) {
}).Error(err.Error())
return
}

if pool != nil {
pool.Kill(v.Id, "plugin dead")
}

// Restart the dead plugin while the pool is still eligible and has restart
// budget left; otherwise announce that the budget is exhausted.
// pool is re-checked for nil here because the Kill call above is guarded
// the same way, and calling Eligible on a nil pool would panic.
if pool != nil && pool.Eligible() {
	if pool.RestartCount() < MaxPluginRestartCount {
		// Use a dedicated name: err from getPool is nil at this point, and
		// a short name such as e would shadow the handler's event parameter.
		restartErr := r.restartPlugin(v.Key)
		if restartErr != nil {
			runnerLog.WithFields(log.Fields{
				"_block":  "handle-events",
				"aplugin": v.String,
			}).Error(restartErr.Error())
			return
		}
		// Only successful restarts count against the budget.
		pool.IncRestartCount()

		runnerLog.WithFields(log.Fields{
			"_block":        "handle-events",
			"event":         v.Name,
			"aplugin":       v.Version,
			"restart_count": pool.RestartCount(),
		}).Warning("plugin restarted")

		r.emitter.Emit(&control_event.RestartedAvailablePluginEvent{
			Id:      v.Id,
			Name:    v.Name,
			Version: v.Version,
			Key:     v.Key,
			Type:    v.Type,
		})
	} else {
		r.emitter.Emit(&control_event.MaxPluginRestartsExceededEvent{
			Id:      v.Id,
			Name:    v.Name,
			Version: v.Version,
			Key:     v.Key,
			Type:    v.Type,
		})
	}
}
case *control_event.PluginUnsubscriptionEvent:
runnerLog.WithFields(log.Fields{
"_block": "subscribe-pool",
Expand Down Expand Up @@ -422,3 +465,11 @@ func (r *runner) handleUnsubscription(pType, pName string, pVersion int, taskID
}
return nil
}

// restartPlugin looks up the loaded plugin registered under key and runs it
// again. It returns the lookup error if the key is unknown to the plugin
// manager, otherwise whatever runPlugin returns.
func (r *runner) restartPlugin(key string) error {
	loaded, lookupErr := r.pluginManager.get(key)
	if lookupErr == nil {
		return r.runPlugin(loaded.Details)
	}
	return lookupErr
}
17 changes: 17 additions & 0 deletions control/strategy/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ type Pool interface {
SubscriptionCount() int
Unsubscribe(taskID string)
Version() int
RestartCount() int
IncRestartCount()
}

type AvailablePlugin interface {
Expand Down Expand Up @@ -122,6 +124,10 @@ type pool struct {
// The routing and caching strategy declared by the plugin.
// strategy RoutingAndCaching
RoutingAndCaching

// restartCount is the number of times plugins in this pool have been
// restarted after a DeadAvailablePluginEvent
restartCount int
}

func NewPool(key string, plugins ...AvailablePlugin) (Pool, error) {
Expand Down Expand Up @@ -164,6 +170,17 @@ func (p *pool) Strategy() RoutingAndCaching {
return p.RoutingAndCaching
}

// RestartCount returns the number of restarts recorded for this pool.
// The pool's read lock is held for the read so that it does not race with
// IncRestartCount, which modifies the same counter.
func (p *pool) RestartCount() int {
	p.RLock()
	defer p.RUnlock()
	return p.restartCount
}

// IncRestartCount increments the pool's restart counter by one.
// The exclusive lock is required because this is a write: a read lock can be
// held by several goroutines at once and would not serialize the increment.
func (p *pool) IncRestartCount() {
	p.Lock()
	defer p.Unlock()
	p.restartCount++
}

// Insert inserts an AvailablePlugin into the pool
func (p *pool) Insert(a AvailablePlugin) error {
if a.Type() != plugin.CollectorPluginType && a.Type() != plugin.ProcessorPluginType && a.Type() != plugin.PublisherPluginType {
Expand Down
50 changes: 38 additions & 12 deletions core/control_event/control_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,20 @@ limitations under the License.
package control_event

const (
AvailablePluginDead = "Control.AvailablePluginDead"
PluginLoaded = "Control.PluginLoaded"
PluginUnloaded = "Control.PluginUnloaded"
PluginsSwapped = "Control.PluginsSwapped"
PluginSubscribed = "Control.PluginSubscribed"
PluginUnsubscribed = "Control.PluginUnsubscribed"
ProcessorSubscribed = "Control.ProcessorSubscribed"
ProcessorUnsubscribed = "Control.ProcessorUnsubscribed"
MetricSubscribed = "Control.MetricSubscribed"
MetricUnsubscribed = "Control.MetricUnsubscribed"
HealthCheckFailed = "Control.PluginHealthCheckFailed"
MoveSubscription = "Control.PluginSubscriptionMoved"
AvailablePluginDead = "Control.AvailablePluginDead"
AvailablePluginRestarted = "Control.RestartedAvailablePlugin"
PluginRestartsExceeded = "Control.PluginRestartsExceeded"
PluginLoaded = "Control.PluginLoaded"
PluginUnloaded = "Control.PluginUnloaded"
PluginsSwapped = "Control.PluginsSwapped"
PluginSubscribed = "Control.PluginSubscribed"
PluginUnsubscribed = "Control.PluginUnsubscribed"
ProcessorSubscribed = "Control.ProcessorSubscribed"
ProcessorUnsubscribed = "Control.ProcessorUnsubscribed"
MetricSubscribed = "Control.MetricSubscribed"
MetricUnsubscribed = "Control.MetricUnsubscribed"
HealthCheckFailed = "Control.PluginHealthCheckFailed"
MoveSubscription = "Control.PluginSubscriptionMoved"
)

type LoadPluginEvent struct {
Expand Down Expand Up @@ -68,6 +70,30 @@ func (e *DeadAvailablePluginEvent) Namespace() string {
return AvailablePluginDead
}

type RestartedAvailablePluginEvent struct {
Name string
Version int
Type int
Key string
Id uint32
}

func (e *MaxPluginRestartsExceededEvent) Namespace() string {
return PluginRestartsExceeded
}

type MaxPluginRestartsExceededEvent struct {
Name string
Version int
Type int
Key string
Id uint32
}

func (e *RestartedAvailablePluginEvent) Namespace() string {
return AvailablePluginRestarted
}

type SwapPluginsEvent struct {
LoadedPluginName string
LoadedPluginVersion int
Expand Down
5 changes: 5 additions & 0 deletions plugin/collector/snap-collector-mock2/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/intelsdi-x/snap/control/plugin"
"github.com/intelsdi-x/snap/control/plugin/cpolicy"
"github.com/intelsdi-x/snap/core"
"github.com/intelsdi-x/snap/core/ctypes"
)

const (
Expand All @@ -49,9 +50,13 @@ func (f *Mock) CollectMetrics(mts []plugin.PluginMetricType) ([]plugin.PluginMet
for _, p := range mts {
log.Printf("collecting %+v\n", p)
}

rand.Seed(time.Now().UTC().UnixNano())
metrics := []plugin.PluginMetricType{}
for i := range mts {
if c, ok := mts[i].Config().Table()["panic"]; ok && c.(ctypes.ConfigValueBool).Value {
panic("Opps!")
}
if mts[i].Namespace()[2] == "*" {
hostname, _ := os.Hostname()
for j := 0; j < 10; j++ {
Expand Down

0 comments on commit be26613

Please sign in to comment.