Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Fix #824: Add proper handling of subscription movement
Browse files Browse the repository at this point in the history
Added a check in availablePlugins.collectMetrics for a nil strategy that

was causing a panic when metric subscriptions were in the process of

being moved.



Modified the handling of load/unload plugin events to properly handle

moving subscriptions between differing versions of a plugin that

provides the same metrics and added tests for this behavior in

control/control_test.go.
  • Loading branch information
IRCody committed Apr 7, 2016
1 parent d2f9ea6 commit b7ec954
Show file tree
Hide file tree
Showing 5 changed files with 373 additions and 59 deletions.
6 changes: 6 additions & 0 deletions control/available_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,12 @@ func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core.
if pool == nil {
return nil, serror.New(ErrPoolNotFound, map[string]interface{}{"pool-key": pluginKey})
}
// If the strategy is nil but the pool exists we likely are waiting on the pool to be fully initialized
// because of a plugin load/unload event that is currently being processed. Prevents panic from using nil
// RoutingAndCaching.
if pool.Strategy() == nil {
return nil, errors.New("Plugin strategy not set")
}

metricsToCollect, metricsFromCache := pool.CheckCache(metricTypes, taskID)

Expand Down
57 changes: 47 additions & 10 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -589,18 +589,22 @@ func (p *pluginControl) validateMetricTypeSubscription(mt core.RequestedMetric,
return serrs
}

func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]core.Plugin, []serror.SnapError) {
// gatheredPlugin pairs a collector plugin with the subscription type that is
// used when a task subscribes to that plugin's pool.
type gatheredPlugin struct {
// plugin is the collector resolved for one or more requested metrics.
plugin core.Plugin
// subscriptionType records whether the subscription is bound or unbound.
subscriptionType strategy.SubscriptionType
}

func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]gatheredPlugin, []serror.SnapError) {
var (
plugins []core.Plugin
plugins []gatheredPlugin
serrs []serror.SnapError
)

// here we resolve and retrieve plugins for each metric type.
// if the incoming metric type version is < 1, we treat that as
// latest as with plugins. The following two loops create a set
// of plugins with proper versions needed to discern the subscription
// types.
colPlugins := make(map[string]*loadedPlugin)
colPlugins := make(map[string]gatheredPlugin)
for _, mt := range mts {
// If the version provided is <1 we will get the latest
// plugin for the given metric.
Expand All @@ -612,8 +616,14 @@ func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]core.Plugin, []se
}))
continue
}

colPlugins[m.Plugin.Key()] = m.Plugin
subType := strategy.BoundSubscriptionType
if mt.Version() < 1 {
subType = strategy.UnboundSubscriptionType
}
colPlugins[fmt.Sprintf("%s:%d", m.Plugin.Key(), subType)] = gatheredPlugin{
plugin: m.Plugin,
subscriptionType: subType,
}
}
if len(serrs) > 0 {
return plugins, serrs
Expand All @@ -622,19 +632,44 @@ func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]core.Plugin, []se
for _, lp := range colPlugins {
plugins = append(plugins, lp)
}

if len(plugins) == 0 {
serrs = append(serrs, serror.New(errors.New("No plugins found")))
return nil, serrs
}
return plugins, nil
}

func (p *pluginControl) SubscribeDeps(taskID string, mts []core.Metric, plugins []core.Plugin) []serror.SnapError {
var serrs []serror.SnapError

collectors, errs := p.gatherCollectors(mts)
if len(errs) > 0 {
serrs = append(serrs)
}
plugins = append(plugins, collectors...)

for _, gc := range collectors {
pool, err := p.pluginRunner.AvailablePlugins().getOrCreatePool(fmt.Sprintf("%s:%s:%d", gc.plugin.TypeName(), gc.plugin.Name(), gc.plugin.Version()))
if err != nil {
serrs = append(serrs, serror.New(err))
return serrs
}
pool.Subscribe(taskID, gc.subscriptionType)
if pool.Eligible() {
err = p.verifyPlugin(gc.plugin.(*loadedPlugin))
if err != nil {
serrs = append(serrs, serror.New(err))
return serrs
}
err = p.pluginRunner.runPlugin(gc.plugin.(*loadedPlugin).Details)
if err != nil {
serrs = append(serrs, serror.New(err))
return serrs
}
}
serr := p.sendPluginSubscriptionEvent(taskID, gc.plugin)
if serr != nil {
serrs = append(serrs, serr)
}
}
for _, sub := range plugins {
// pools are created statically, not with keys like "publisher:foo:-1"
// here we check to see if the version of the incoming plugin is -1, and
Expand Down Expand Up @@ -741,7 +776,9 @@ func (p *pluginControl) UnsubscribeDeps(taskID string, mts []core.Metric, plugin
if len(errs) > 0 {
serrs = append(serrs, errs...)
}
plugins = append(plugins, collectors...)
for _, gc := range collectors {
plugins = append(plugins, gc.plugin)
}

for _, sub := range plugins {
pool, err := p.pluginRunner.AvailablePlugins().getPool(fmt.Sprintf("%s:%s:%d", sub.TypeName(), sub.Name(), sub.Version()))
Expand Down
234 changes: 198 additions & 36 deletions control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"testing"
"time"

log "github.com/Sirupsen/logrus"
"github.com/pborman/uuid"
. "github.com/smartystreets/goconvey/convey"
"github.com/vrischmann/jsonutil"
Expand Down Expand Up @@ -256,40 +257,48 @@ func newListenToPluginEvent() *listenToPluginEvent {
}

// HandleGomitEvent inspects the concrete type of e.Body, copies the fields
// carried by recognized control events onto l.plugin, and signals on l.done.
// Event bodies of any other type are reported instead of being signalled.
func (l *listenToPluginEvent) HandleGomitEvent(e gomit.Event) {
go func() {
switch v := e.Body.(type) {
case *control_event.RestartedAvailablePluginEvent:
l.plugin.EventNamespace = v.Namespace()
l.done <- struct{}{}
case *control_event.MaxPluginRestartsExceededEvent:
l.plugin.EventNamespace = v.Namespace()
l.done <- struct{}{}
case *control_event.DeadAvailablePluginEvent:
l.plugin.EventNamespace = v.Namespace()
l.done <- struct{}{}
case *control_event.LoadPluginEvent:
l.plugin.LoadedPluginName = v.Name
l.plugin.LoadedPluginVersion = v.Version
l.plugin.PluginType = v.Type
l.done <- struct{}{}
case *control_event.UnloadPluginEvent:
l.plugin.UnloadedPluginName = v.Name
l.plugin.UnloadedPluginVersion = v.Version
l.plugin.PluginType = v.Type
l.done <- struct{}{}
case *control_event.SwapPluginsEvent:
l.plugin.LoadedPluginName = v.LoadedPluginName
l.plugin.LoadedPluginVersion = v.LoadedPluginVersion
l.plugin.UnloadedPluginName = v.UnloadedPluginName
l.plugin.UnloadedPluginVersion = v.UnloadedPluginVersion
l.plugin.PluginType = v.PluginType
l.done <- struct{}{}
case *control_event.PluginSubscriptionEvent:
l.done <- struct{}{}
default:
fmt.Println("Got an event you're not handling")
}
}()
switch v := e.Body.(type) {
case *control_event.RestartedAvailablePluginEvent:
l.plugin.EventNamespace = v.Namespace()
l.done <- struct{}{}
case *control_event.MaxPluginRestartsExceededEvent:
l.plugin.EventNamespace = v.Namespace()
l.done <- struct{}{}
case *control_event.DeadAvailablePluginEvent:
l.plugin.EventNamespace = v.Namespace()
l.done <- struct{}{}
case *control_event.LoadPluginEvent:
l.plugin.LoadedPluginName = v.Name
l.plugin.LoadedPluginVersion = v.Version
l.plugin.PluginType = v.Type
l.plugin.EventNamespace = v.Namespace()
l.done <- struct{}{}
case *control_event.UnloadPluginEvent:
l.plugin.UnloadedPluginName = v.Name
l.plugin.UnloadedPluginVersion = v.Version
l.plugin.PluginType = v.Type
l.plugin.EventNamespace = v.Namespace()
l.done <- struct{}{}
case *control_event.SwapPluginsEvent:
l.plugin.LoadedPluginName = v.LoadedPluginName
l.plugin.LoadedPluginVersion = v.LoadedPluginVersion
l.plugin.UnloadedPluginName = v.UnloadedPluginName
l.plugin.UnloadedPluginVersion = v.UnloadedPluginVersion
l.plugin.PluginType = v.PluginType
l.plugin.EventNamespace = v.Namespace()
l.done <- struct{}{}
case *control_event.MovePluginSubscriptionEvent:
l.plugin.EventNamespace = v.Namespace()
l.done <- struct{}{}
case *control_event.PluginSubscriptionEvent:
l.plugin.EventNamespace = v.Namespace()
l.done <- struct{}{}
default:
controlLogger.WithFields(log.Fields{
"event:": v.Namespace(),
"_block": "HandleGomit",
}).Info("Unhandled Event")
}
}

var (
Expand Down Expand Up @@ -1414,7 +1423,7 @@ func TestGatherCollectors(t *testing.T) {
plgs, errs := c.gatherCollectors(m)
So(errs, ShouldBeNil)
So(plgs, ShouldNotBeEmpty)
So(plgs[0].Version(), ShouldEqual, 1)
So(plgs[0].plugin.Version(), ShouldEqual, 1)
})
Convey("it gathers the queried version of plugin", func() {
Convey("the version is available", func() {
Expand All @@ -1428,7 +1437,7 @@ func TestGatherCollectors(t *testing.T) {
plgs, errs := c.gatherCollectors(m)
So(errs, ShouldBeNil)
So(plgs, ShouldNotBeEmpty)
So(plgs[0].Version(), ShouldEqual, v)
So(plgs[0].plugin.Version(), ShouldEqual, v)
})
Convey("the version is not available", func() {
m := []core.Metric{
Expand Down Expand Up @@ -1588,3 +1597,156 @@ func TestProcessMetrics(t *testing.T) {

})
}

// listenToPluginSubscriptionEvent is a test event handler whose
// HandleGomitEvent method reacts only to MovePluginSubscriptionEvent.
type listenToPluginSubscriptionEvent struct {
// plugin receives the namespace of the handled move-subscription event.
plugin *mockPluginEvent
// done is sent on each time a move-subscription event has been handled.
done chan struct{}
}

// newListenToPluginSubscriptionEvent builds a listener holding an empty
// mockPluginEvent and an unbuffered done channel.
func newListenToPluginSubscriptionEvent() *listenToPluginSubscriptionEvent {
	listener := new(listenToPluginSubscriptionEvent)
	listener.done = make(chan struct{})
	listener.plugin = &mockPluginEvent{}
	return listener
}

// HandleGomitEvent records the namespace of a MovePluginSubscriptionEvent on
// l.plugin and then signals l.done. Any other event body is logged as
// unhandled and produces no signal.
func (l *listenToPluginSubscriptionEvent) HandleGomitEvent(e gomit.Event) {
	moved, isMove := e.Body.(*control_event.MovePluginSubscriptionEvent)
	if !isMove {
		fields := log.Fields{
			"event:": e.Body.Namespace(),
			"_block": "HandleGomit",
		}
		controlLogger.WithFields(fields).Info("Unhandled Event")
		return
	}

	l.plugin.EventNamespace = moved.Namespace()
	// With the unbuffered channel created by newListenToPluginSubscriptionEvent
	// this send waits until the test receives from done.
	l.done <- struct{}{}
}
// TestMetricSubscriptionToNewVersion checks that a subscription made with
// metric version 0 while only mock v1 is loaded is moved to mock v2 once v2 is
// loaded: the v1 pool ends with no subscriptions, the v2 pool with one, and
// collected data switches from v1's string payload to v2's int payload.
func TestMetricSubscriptionToNewVersion(t *testing.T) {
	Convey("Given a metric that is being collected at v1", t, func() {
		c := New(GetDefaultConfig())
		lpe := newListenToPluginSubscriptionEvent()
		c.eventManager.RegisterHandler("TestMetricSubscriptionToNewVersion", lpe)
		c.Start()

		_, err := load(c, path.Join(SnapPath, "plugin", "snap-collector-mock1"))
		So(err, ShouldBeNil)
		So(len(c.pluginManager.all()), ShouldEqual, 1)
		lp, err2 := c.pluginManager.get("collector:mock:1")
		So(err2, ShouldBeNil)
		So(lp.Name(), ShouldResemble, "mock")

		// Subscribe deps to create pools.
		metric := MockMetricType{
			namespace: []string{"intel", "mock", "foo"},
			cfg:       cdata.NewNode(),
			ver:       0,
		}
		So(metric.Version(), ShouldEqual, 0)
		serr := c.SubscribeDeps("testTaskID", []core.Metric{metric}, []core.Plugin{})
		So(serr, ShouldBeNil)

		// Collect metrics as a sanity check that everything is set up correctly.
		mts, errs := c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID")
		So(errs, ShouldBeNil)
		So(len(mts), ShouldEqual, 1)
		// Ensure the data coming back is from v1. V1's data is type string.
		_, ok := mts[0].Data().(string)
		So(ok, ShouldEqual, true)

		Convey("Loading v2 of that plugin should move subscriptions to newer version", func() {
			// Load snap-collector-mock2.
			_, err := load(c, path.Join(SnapPath, "plugin", "snap-collector-mock2"))
			So(err, ShouldBeNil)

			// Wait for the move-subscription event before inspecting the pools.
			select {
			case <-lpe.done:
			case <-time.After(10 * time.Second):
				fmt.Println("timeout waiting for move subscription event")
				So(false, ShouldEqual, true)
			}

			// Check for subscription movement.
			pool1, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:1")
			So(errp, ShouldBeNil)
			So(pool1.SubscriptionCount(), ShouldEqual, 0)

			pool2, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:2")
			So(errp, ShouldBeNil)
			So(pool2.SubscriptionCount(), ShouldEqual, 1)

			// Collection must still succeed after the move; assert the errors
			// instead of silently dropping them.
			mts, errs = c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID")
			So(errs, ShouldBeNil)
			So(len(mts), ShouldEqual, 1)

			// Ensure the data coming back is from v2. V2's data is type int.
			_, ok = mts[0].Data().(int)
			So(ok, ShouldEqual, true)
		})
	})
}

// TestMetricSubscriptionToOlderVersion checks that a subscription made with
// metric version 0 while only mock v2 is loaded is moved to mock v1 after v1 is
// loaded and v2 is unloaded: the v2 pool ends with no subscriptions, the v1
// pool with one, and collected data switches from v2's int payload to v1's
// string payload.
func TestMetricSubscriptionToOlderVersion(t *testing.T) {
	Convey("Given a metric that is being collected at v2", t, func() {
		c := New(GetDefaultConfig())
		lpe := newListenToPluginSubscriptionEvent()
		c.eventManager.RegisterHandler("TestMetricSubscriptionToOlderVersion", lpe)
		c.Start()

		_, err := load(c, path.Join(SnapPath, "plugin", "snap-collector-mock2"))
		So(err, ShouldBeNil)
		So(len(c.pluginManager.all()), ShouldEqual, 1)
		lp, err2 := c.pluginManager.get("collector:mock:2")
		So(err2, ShouldBeNil)
		So(lp.Name(), ShouldResemble, "mock")

		// Subscribe deps to create pools.
		metric := MockMetricType{
			namespace: []string{"intel", "mock", "foo"},
			cfg:       cdata.NewNode(),
			ver:       0,
		}
		serr := c.SubscribeDeps("testTaskID", []core.Metric{metric}, []core.Plugin{})
		So(serr, ShouldBeNil)

		// Collect metrics as a sanity check that everything is set up correctly.
		mts, errs := c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID")
		So(errs, ShouldBeNil)
		So(len(mts), ShouldEqual, 1)
		// Ensure the data coming back is from v2. V2's data is type int.
		_, ok := mts[0].Data().(int)
		So(ok, ShouldEqual, true)

		// Grab the catalog entry for mock v2 (the only plugin loaded so far) so
		// it can be unloaded below.
		pc := c.PluginCatalog()
		mockv2 := pc[0]

		Convey("Loading v1 of that plugin and unloading v2 should move subscriptions to older version", func() {
			// Load snap-collector-mock1, then unload v2.
			_, err = load(c, path.Join(SnapPath, "plugin", "snap-collector-mock1"))
			So(err, ShouldBeNil)
			_, err = c.Unload(mockv2)
			So(err, ShouldBeNil)

			// Wait for the move-subscription event before inspecting the pools.
			select {
			case <-lpe.done:
			case <-time.After(10 * time.Second):
				fmt.Println("Timeout waiting for move subscription event")
				So(false, ShouldEqual, true)
			}

			// Check for subscription movement.
			ap := c.pluginRunner.AvailablePlugins()
			pool1, errp := ap.getOrCreatePool("collector:mock:2")
			So(errp, ShouldBeNil)
			So(pool1.SubscriptionCount(), ShouldEqual, 0)

			pool2, errp := ap.getOrCreatePool("collector:mock:1")
			So(errp, ShouldBeNil)
			So(pool2.SubscriptionCount(), ShouldEqual, 1)

			// Collection must still succeed after the move; assert the errors
			// instead of silently dropping them.
			mts, errs = c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID")
			So(errs, ShouldBeNil)
			So(len(mts), ShouldEqual, 1)

			// Ensure the data coming back is from v1. V1's data is type string.
			_, ok = mts[0].Data().(string)
			So(ok, ShouldEqual, true)
		})
	})
}
Loading

0 comments on commit b7ec954

Please sign in to comment.