Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

SDI-818: Restart failed plugins #688

Merged
merged 6 commits into from
Feb 18, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion control/available_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (a *availablePlugin) CheckHealth() {
} else {
a.healthCheckFailed()
}
case <-time.After(time.Second * 1):
case <-time.After(DefaultHealthCheckTimeout):
a.healthCheckFailed()
}
}
Expand Down
87 changes: 83 additions & 4 deletions control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ type mockPluginEvent struct {
UnloadedPluginName string
UnloadedPluginVersion int
PluginType int
EventNamespace string
}

type listenToPluginEvent struct {
Expand All @@ -245,6 +246,15 @@ func newListenToPluginEvent() *listenToPluginEvent {

func (l *listenToPluginEvent) HandleGomitEvent(e gomit.Event) {
switch v := e.Body.(type) {
case *control_event.RestartedAvailablePluginEvent:
l.plugin.EventNamespace = v.Namespace()
l.done <- struct{}{}
case *control_event.MaxPluginRestartsExceededEvent:
l.plugin.EventNamespace = v.Namespace()
l.done <- struct{}{}
case *control_event.DeadAvailablePluginEvent:
l.plugin.EventNamespace = v.Namespace()
l.done <- struct{}{}
case *control_event.LoadPluginEvent:
l.plugin.LoadedPluginName = v.Name
l.plugin.LoadedPluginVersion = v.Version
Expand Down Expand Up @@ -839,10 +849,11 @@ func TestRoutingCachingStrategy(t *testing.T) {
if e != nil {
t.FailNow()
}
metric, err := c.metricCatalog.Get([]string{"intel", "mock", "foo"}, 2)
So(err, ShouldBeNil)
So(metric.NamespaceAsString(), ShouldResemble, "/intel/mock/foo")
So(err, ShouldBeNil)
metric := MockMetricType{
namespace: []string{"intel", "mock", "foo"},
ver: 2,
cfg: cdata.NewNode(),
}
<-lpe.done
Convey("Start the plugins", func() {
lp, err := c.pluginManager.get("collector:mock:2")
Expand Down Expand Up @@ -1080,6 +1091,74 @@ func TestCollectDynamicMetrics(t *testing.T) {
})
}

// TestFailedPlugin exercises the restart path for a plugin that fails during
// collection. It loads the plugin at PluginPath (looked up below under the key
// "collector:mock:2"), requests a metric whose config sets "panic" to true,
// and calls CollectMetrics MaxPluginRestartCount+1 times, expecting every call
// to return errors and no metrics. After each of the first
// MaxPluginRestartCount failures it expects an AvailablePluginRestarted event;
// after the final failure it expects PluginRestartsExceeded and an empty pool.
func TestFailedPlugin(t *testing.T) {
Convey("given a loaded plugin", t, func() {
// Create controller
c := New()
c.Start()
// lpe records the namespace of plugin events and signals lpe.done for each
// one it handles, which lets the test block until an event has arrived.
lpe := newListenToPluginEvent()
c.eventManager.RegisterHandler("TEST", lpe)
c.Config.Plugins.All.AddItem("password", ctypes.ConfigValueStr{Value: "testval"})

// Load plugin
load(c, PluginPath)
// Block until the listener reports the event produced by the load.
<-lpe.done
_, err := c.MetricCatalog()
So(err, ShouldBeNil)

// metrics to collect
// The "panic" config item is what makes the collection fail; presumably the
// mock collector panics when it sees it set to true — confirm against the
// plugin built at PluginPath.
cfg := cdata.NewNode()
cfg.AddItem("panic", ctypes.ConfigValueBool{Value: true})
m := []core.Metric{
MockMetricType{
namespace: []string{"intel", "mock", "foo"},
cfg: cfg,
},
}

// retrieve loaded plugin
lp, err := c.pluginManager.get("collector:mock:2")
So(err, ShouldBeNil)
So(lp, ShouldNotBeNil)

Convey("create a pool, add subscriptions and start plugins", func() {
pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:2")
So(errp, ShouldBeNil)
pool.Subscribe("1", strategy.UnboundSubscriptionType)
err = c.pluginRunner.runPlugin(lp.Details)
So(err, ShouldBeNil)

Convey("collect metrics against a plugin that will panic", func() {
// Exactly one running instance before the first failing collection.
So(len(pool.Plugins()), ShouldEqual, 1)

// NOTE: this err shadows the outer err; CollectMetrics returns a slice of
// errors rather than a single error.
var err []error
var cr []core.Metric
// eventMap counts how many times each event namespace was observed.
eventMap := map[string]int{}
// One more collection than the restart limit, so the last failure is the
// one that exhausts the allowed restarts.
for i := 0; i < MaxPluginRestartCount+1; i++ {
cr, err = c.CollectMetrics(m, time.Now().Add(time.Second*1), uuid.New())
So(err, ShouldNotBeNil)
So(cr, ShouldBeNil)
// First event after the failed collection (presumably the
// DeadAvailablePluginEvent — the listener signals done for dead,
// restarted and restarts-exceeded events alike).
<-lpe.done
eventMap[lpe.plugin.EventNamespace]++

// While restarts remain, a second event follows and must be the restart
// notification; the pool's restart count advances with each iteration.
if i < MaxPluginRestartCount {
<-lpe.done
eventMap[lpe.plugin.EventNamespace]++
So(pool.RestartCount(), ShouldEqual, i+1)
So(lpe.plugin.EventNamespace, ShouldEqual, control_event.AvailablePluginRestarted)
}
}
// The event following the final failure must report that the restart
// limit was exceeded; no plugin instance remains in the pool.
<-lpe.done
So(lpe.plugin.EventNamespace, ShouldEqual, control_event.PluginRestartsExceeded)
So(eventMap[control_event.AvailablePluginRestarted], ShouldEqual, MaxPluginRestartCount)
So(len(pool.Plugins()), ShouldEqual, 0)
So(pool.RestartCount(), ShouldEqual, MaxPluginRestartCount)
})
})
c.Stop()
})
}

func TestCollectMetrics(t *testing.T) {
Convey("given a loaded plugin", t, func() {
// adjust HB timeouts for test
Expand Down
51 changes: 51 additions & 0 deletions control/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ const (
PluginStopped
// PluginDisabled is the disabled state of a plugin
PluginDisabled

// MaxPluginRestartCount is the maximum number of times a plugin is restarted
// in response to a control_event.DeadAvailablePluginEvent
MaxPluginRestartCount = 3
)

// TBD
Expand Down Expand Up @@ -239,6 +243,7 @@ func (r *runner) HandleGomitEvent(e gomit.Event) {
"event": v.Namespace(),
"aplugin": v.String,
}).Warning("handling dead available plugin event")

pool, err := r.availablePlugins.getPool(v.Key)
if err != nil {
runnerLog.WithFields(log.Fields{
Expand All @@ -247,9 +252,47 @@ func (r *runner) HandleGomitEvent(e gomit.Event) {
}).Error(err.Error())
return
}

if pool != nil {
pool.Kill(v.Id, "plugin dead")
}

if pool.Eligible() {
if pool.RestartCount() < MaxPluginRestartCount {
e := r.restartPlugin(v.Key)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add a log statement (warn) that we restarted plugin (name:version) x times.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add the warn log. please check. thanks

if e != nil {
	// Log the restart failure itself. err (from getPool) is always nil by
	// this point because the non-nil case returned earlier, so calling
	// err.Error() here would panic instead of reporting the failure.
	runnerLog.WithFields(log.Fields{
		"_block":  "handle-events",
		"aplugin": v.String,
	}).Error(e.Error())
	return
}
pool.IncRestartCount()

runnerLog.WithFields(log.Fields{
"_block": "handle-events",
"event": v.Name,
"aplugin": v.Version,
"restart_count": pool.RestartCount(),
}).Warning("plugin restarted")

r.emitter.Emit(&control_event.RestartedAvailablePluginEvent{
Id: v.Id,
Name: v.Name,
Version: v.Version,
Key: v.Key,
Type: v.Type,
})
} else {
r.emitter.Emit(&control_event.MaxPluginRestartsExceededEvent{
Id: v.Id,
Name: v.Name,
Version: v.Version,
Key: v.Key,
Type: v.Type,
})
}
}
case *control_event.PluginUnsubscriptionEvent:
runnerLog.WithFields(log.Fields{
"_block": "subscribe-pool",
Expand Down Expand Up @@ -422,3 +465,11 @@ func (r *runner) handleUnsubscription(pType, pName string, pVersion int, taskID
}
return nil
}

// restartPlugin looks up the loaded plugin registered under key and runs it
// again using its stored details. It returns the lookup error when the key
// cannot be resolved, otherwise whatever runPlugin returns.
func (r *runner) restartPlugin(key string) error {
	loaded, lookupErr := r.pluginManager.get(key)
	if lookupErr == nil {
		return r.runPlugin(loaded.Details)
	}
	return lookupErr
}
17 changes: 17 additions & 0 deletions control/strategy/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ type Pool interface {
SubscriptionCount() int
Unsubscribe(taskID string)
Version() int
RestartCount() int
IncRestartCount()
}

type AvailablePlugin interface {
Expand Down Expand Up @@ -122,6 +124,10 @@ type pool struct {
// The routing and caching strategy declared by the plugin.
// strategy RoutingAndCaching
RoutingAndCaching

// restartCount the restart count of available plugins
// when the DeadAvailablePluginEvent occurs
restartCount int
}

func NewPool(key string, plugins ...AvailablePlugin) (Pool, error) {
Expand Down Expand Up @@ -164,6 +170,17 @@ func (p *pool) Strategy() RoutingAndCaching {
return p.RoutingAndCaching
}

// RestartCount reports how many restarts have been recorded for this pool
// via IncRestartCount. The counter is read without taking the pool lock.
func (p *pool) RestartCount() int {
	restarts := p.restartCount
	return restarts
}

// IncRestartCount adds one to the pool's restart counter.
//
// The exclusive lock is taken because this mutates restartCount; a read lock
// may be held by several goroutines at once and would let concurrent
// increments race and lose updates.
func (p *pool) IncRestartCount() {
	p.Lock()
	defer p.Unlock()
	p.restartCount++
}

// Insert inserts an AvailablePlugin into the pool
func (p *pool) Insert(a AvailablePlugin) error {
if a.Type() != plugin.CollectorPluginType && a.Type() != plugin.ProcessorPluginType && a.Type() != plugin.PublisherPluginType {
Expand Down
50 changes: 38 additions & 12 deletions core/control_event/control_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,20 @@ limitations under the License.
package control_event

const (
AvailablePluginDead = "Control.AvailablePluginDead"
PluginLoaded = "Control.PluginLoaded"
PluginUnloaded = "Control.PluginUnloaded"
PluginsSwapped = "Control.PluginsSwapped"
PluginSubscribed = "Control.PluginSubscribed"
PluginUnsubscribed = "Control.PluginUnsubscribed"
ProcessorSubscribed = "Control.ProcessorSubscribed"
ProcessorUnsubscribed = "Control.ProcessorUnsubscribed"
MetricSubscribed = "Control.MetricSubscribed"
MetricUnsubscribed = "Control.MetricUnsubscribed"
HealthCheckFailed = "Control.PluginHealthCheckFailed"
MoveSubscription = "Control.PluginSubscriptionMoved"
AvailablePluginDead = "Control.AvailablePluginDead"
AvailablePluginRestarted = "Control.RestartedAvailablePlugin"
PluginRestartsExceeded = "Control.PluginRestartsExceeded"
PluginLoaded = "Control.PluginLoaded"
PluginUnloaded = "Control.PluginUnloaded"
PluginsSwapped = "Control.PluginsSwapped"
PluginSubscribed = "Control.PluginSubscribed"
PluginUnsubscribed = "Control.PluginUnsubscribed"
ProcessorSubscribed = "Control.ProcessorSubscribed"
ProcessorUnsubscribed = "Control.ProcessorUnsubscribed"
MetricSubscribed = "Control.MetricSubscribed"
MetricUnsubscribed = "Control.MetricUnsubscribed"
HealthCheckFailed = "Control.PluginHealthCheckFailed"
MoveSubscription = "Control.PluginSubscriptionMoved"
)

type LoadPluginEvent struct {
Expand Down Expand Up @@ -68,6 +70,30 @@ func (e *DeadAvailablePluginEvent) Namespace() string {
return AvailablePluginDead
}

type RestartedAvailablePluginEvent struct {
Name string
Version int
Type int
Key string
Id uint32
}

func (e *MaxPluginRestartsExceededEvent) Namespace() string {
return PluginRestartsExceeded
}

type MaxPluginRestartsExceededEvent struct {
Name string
Version int
Type int
Key string
Id uint32
}

func (e *RestartedAvailablePluginEvent) Namespace() string {
return AvailablePluginRestarted
}

type SwapPluginsEvent struct {
LoadedPluginName string
LoadedPluginVersion int
Expand Down
5 changes: 5 additions & 0 deletions plugin/collector/snap-collector-mock2/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/intelsdi-x/snap/control/plugin"
"github.com/intelsdi-x/snap/control/plugin/cpolicy"
"github.com/intelsdi-x/snap/core"
"github.com/intelsdi-x/snap/core/ctypes"
)

const (
Expand All @@ -49,9 +50,13 @@ func (f *Mock) CollectMetrics(mts []plugin.PluginMetricType) ([]plugin.PluginMet
for _, p := range mts {
log.Printf("collecting %+v\n", p)
}

rand.Seed(time.Now().UTC().UnixNano())
metrics := []plugin.PluginMetricType{}
for i := range mts {
if c, ok := mts[i].Config().Table()["panic"]; ok && c.(ctypes.ConfigValueBool).Value {
panic("Opps!")
}
if mts[i].Namespace()[2] == "*" {
hostname, _ := os.Hostname()
for j := 0; j < 10; j++ {
Expand Down