Skip to content

Commit

Permalink
Fix intelsdi-x#824: Add proper handling of metric rollover
Browse files Browse the repository at this point in the history
Added a check in availablePlugins.collectMetrics for a nil strategy that
was causing a panic when metric subscriptions were in the process of
being moved.

Modified the handling of load/unload plugin events to properly handle
moving subscriptions between differing versions of a plugin that
provides the same metrics and added tests for this behavior in
control/control_test.go.
  • Loading branch information
IRCody committed Apr 5, 2016
1 parent 2c573a1 commit 611e58e
Show file tree
Hide file tree
Showing 4 changed files with 261 additions and 7 deletions.
6 changes: 6 additions & 0 deletions control/available_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,12 @@ func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core.
if pool == nil {
return nil, serror.New(ErrPoolNotFound, map[string]interface{}{"pool-key": pluginKey})
}
// If the strategy is nil but the pool exists we likely are waiting on the pool to be fully initialized
// because of a plugin load/unload event that is currently being processed. Prevents panic from using nil
// RoutingAndCaching.
if pool.Strategy() == nil {
return nil, errors.New("Plugin strategy not set")
}

metricsToCollect, metricsFromCache := pool.CheckCache(metricTypes, taskID)

Expand Down
155 changes: 155 additions & 0 deletions control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1418,3 +1418,158 @@ func TestProcessMetrics(t *testing.T) {

})
}

func TestMetricRolloverToNewVersion(t *testing.T) {
Convey("Given a metric that is being collected at v1", t, func() {
c := New(GetDefaultConfig())
lpe := newListenToPluginEvent()
c.eventManager.RegisterHandler("TestMetricRolloverToNewVersion", lpe)
c.Start()
rp, err := core.NewRequestedPlugin(path.Join(SnapPath, "plugin", "snap-collector-mock1"))
So(err, ShouldBeNil)
fmt.Println("before load mock1")
_, err = c.Load(rp)
So(err, ShouldBeNil)
fmt.Println("after load mock1")
select {
case <-lpe.done:
case <-time.After(10 * time.Second):
panic("error")
}
fmt.Println("after done loading mocok 1 event")
So(len(c.pluginManager.all()), ShouldEqual, 1)
lp, err2 := c.pluginManager.get("collector:mock:1")
So(err2, ShouldBeNil)
So(lp.Name(), ShouldResemble, "mock")
//Subscribe deps to create pools.
metric := MockMetricType{
namespace: []string{"intel", "mock", "foo"},
cfg: cdata.NewNode(),
ver: 0,
}
fmt.Println("before sub deps")
serr := c.SubscribeDeps("testTaskID", []core.Metric{metric}, []core.Plugin{})
So(serr, ShouldBeNil)
// collect metrics as a sanity check that everything is setup correctly
fmt.Println("collectMetrics")
_, errs := c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID")
So(errs, ShouldBeNil)
fmt.Println("after collect metrics")
//So(len(mts), ShouldEqual, 1)
// ensure the data coming back is from v1. V1's data is type string
//_, ok := mts[0].Data().(string)
//So(ok, ShouldEqual, true)
Convey("Loading v2 of that plugin should move subscriptions to newer version", func() {
// Load version snap-collector-mock2
rp, err := core.NewRequestedPlugin(path.Join(SnapPath, "plugin", "snap-collector-mock2"))
_, err = c.Load(rp)
So(err, ShouldBeNil)
// Wait for plugin load event to be done
select {
case <-lpe.done:
case <-time.After(10 * time.Second):
panic("error")
}
select {
case <-lpe.done:
case <-time.After(10 * time.Second):
panic("error")
}
select {
case <-lpe.done:
case <-time.After(10 * time.Second):
panic("error")
}
// wait for subscription event
// Check for subscription movement.
// Give some time for subscription to be moved.
var pool1 strategy.Pool
var errp error

pool1, errp = c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:1")
So(errp, ShouldBeNil)
So(pool1.SubscriptionCount(), ShouldEqual, 0)

pool2, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:2")
So(errp, ShouldBeNil)
So(pool2.SubscriptionCount(), ShouldEqual, 1)

// mts, errs = c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID")
// So(len(mts), ShouldEqual, 1)

// ensure the data coming back is from v2, V2's data is type int
// _, ok = mts[0].Data().(int)
// So(ok, ShouldEqual, true)
})
})
}

/*
func TestMetricRolloverToOlderVersion(t *testing.T) {
Convey("Given a metric that is being collected at v2", t, func() {
c := New(GetDefaultConfig())
lpe := newListenToPluginEvent()
c.eventManager.RegisterHandler("TestMetricRolloverToOlderVersion", lpe)
c.Start()
_, err := load(c, path.Join(SnapPath, "plugin", "snap-collector-mock2"))
So(err, ShouldBeNil)
<-lpe.done
So(len(c.pluginManager.all()), ShouldEqual, 1)
lp, err2 := c.pluginManager.get("collector:mock:2")
So(err2, ShouldBeNil)
So(lp.Name(), ShouldResemble, "mock")
//Subscribe deps to create pools.
metric := MockMetricType{
namespace: []string{"intel", "mock", "foo"},
cfg: cdata.NewNode(),
ver: 0,
}
serr := c.SubscribeDeps("testTaskID", []core.Metric{metric}, []core.Plugin{})
So(serr, ShouldBeNil)
// collect metrics as a sanity check that everything is setup correctly
mts, errs := c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID")
So(errs, ShouldBeNil)
So(len(mts), ShouldEqual, 1)
// ensure the data coming back is from v2. V2's data is type int
_, ok := mts[0].Data().(int)
So(ok, ShouldEqual, true)
// grab plugin for mock v2
pc := c.PluginCatalog()
mockv2 := pc[0]
Convey("Loading v1 of that plugin and unloading v2 should move subscriptions to older version", func() {
// Load version snap-collector-mock2
_, err = load(c, path.Join(SnapPath, "plugin", "snap-collector-mock1"))
So(err, ShouldBeNil)
// Wait for plugin load event to be done
<-lpe.done
_, err = c.Unload(mockv2)
So(err, ShouldBeNil)
// wait for unload event
<-lpe.done
// wait for PluginSubscription event
<-lpe.done
<-lpe.done
// Check for subscription movement.
// Give some time for subscription to be moved.
var pool1 strategy.Pool
var errp error
ap := c.pluginRunner.AvailablePlugins()
pool1, errp = ap.getOrCreatePool("collector:mock:2")
So(errp, ShouldBeNil)
So(pool1.SubscriptionCount(), ShouldEqual, 0)
pool2, errp := ap.getOrCreatePool("collector:mock:1")
So(errp, ShouldBeNil)
So(pool2.SubscriptionCount(), ShouldEqual, 1)
mts, errs = c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID")
So(len(mts), ShouldEqual, 1)
// ensure the data coming back is from v1, V1's data is type string
_, ok = mts[0].Data().(string)
So(ok, ShouldEqual, true)
})
})
}*/
102 changes: 97 additions & 5 deletions control/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,87 @@ func (r *runner) HandleGomitEvent(e gomit.Event) {
if err != nil {
return
}
case *control_event.UnloadPluginEvent:
// On plugin unload, find the key and pool info for the plugin being unloaded.
r.availablePlugins.RLock()
var pool strategy.Pool
var k string
for key, p := range r.availablePlugins.table {
tnv := strings.Split(key, ":")
if core.PluginType(v.Type).String() == tnv[0] && v.Name == tnv[1] && v.Version == p.Version() {
pool = p
k = key
break
}
}

r.availablePlugins.RUnlock()
if pool == nil {
return
}
// Check for the highest lower version plugin and move subscriptions that
// are not bound to a plugin version to this pool.
plugin, err := r.pluginManager.get(fmt.Sprintf("%s:%s:%d", core.PluginType(v.Type).String(), v.Name, -1))
if err != nil {
return
}
newPool, err := r.availablePlugins.getOrCreatePool(plugin.Key())
if err != nil {
return
}
subs := pool.MoveSubscriptions(newPool)
// Start new plugins in newPool if needed
if newPool.Eligible() {
e := r.restartPlugin(plugin.Key())
if e != nil {
runnerLog.WithFields(log.Fields{
"_block": "handle-events",
}).Error(err.Error())
return
}
}
// Remove the unloaded plugin from available plugins
r.availablePlugins.Lock()
delete(r.availablePlugins.table, k)
r.availablePlugins.Unlock()
if len(subs) != 0 {
runnerLog.WithFields(log.Fields{
"_block": "subscribe-pool",
"event": v.Namespace(),
"plugin-name": v.Name,
"plugin-version": v.Version,
"plugin-type": core.PluginType(v.Type).String(),
}).Info("pool with subscriptions to move found")
for _, sub := range subs {
r.emitter.Emit(&control_event.PluginSubscriptionEvent{
PluginName: v.Name,
PluginVersion: v.Version,
TaskId: sub.TaskID,
PluginType: v.Type,
SubscriptionType: int(strategy.UnboundSubscriptionType),
})
r.emitter.Emit(&control_event.PluginUnsubscriptionEvent{
PluginName: v.Name,
PluginVersion: pool.Version(),
TaskId: sub.TaskID,
PluginType: v.Type,
})
r.emitter.Emit(&control_event.MovePluginSubscriptionEvent{
PluginName: v.Name,
PreviousVersion: pool.Version(),
NewVersion: v.Version,
TaskId: sub.TaskID,
PluginType: v.Type,
})
}
}
case *control_event.LoadPluginEvent:
// On loaded plugin event all subscriptions that are not bound to a specific version
// need to moved to the loaded version if it's version is greater than the currently
// available plugin.
var pool strategy.Pool
r.availablePlugins.RLock()
currentHighestVersion := -1
for key, p := range r.availablePlugins.pools() {
// tuple of type name and version
// type @ index 0, name @ index 1, version @ index 2
Expand All @@ -325,12 +403,17 @@ func (r *runner) HandleGomitEvent(e gomit.Event) {
}).Info("pool has bad key ", key)
continue
}

// attempt to find a pool whose type and name are the same, and whose version is
// less than newly loaded plugin. If we find it, break out of loop.
// less than newly loaded plugin.
if core.PluginType(v.Type).String() == tnv[0] && v.Name == tnv[1] && v.Version > p.Version() {
pool = p
break
// See if the pool version is higher than the current highest.
// We only want to move subscriptions from the currentHighest
// because that is where subscriptions that are bound to the
// latest version will be.
if p.Version() > currentHighestVersion {
pool = p
currentHighestVersion = p.Version()
}
}
}
r.availablePlugins.RUnlock()
Expand All @@ -355,8 +438,17 @@ func (r *runner) HandleGomitEvent(e gomit.Event) {
if err != nil {
return
}

// Move subscriptions to the new, higher versioned pool
subs := pool.MoveSubscriptions(newPool)
if newPool.Eligible() {
e := r.restartPlugin(plugin.Key())
if e != nil {
runnerLog.WithFields(log.Fields{
"_block": "handle-events",
}).Error(err.Error())
return
}
}
if len(subs) != 0 {
runnerLog.WithFields(log.Fields{
"_block": "subscribe-pool",
Expand Down
5 changes: 3 additions & 2 deletions control/strategy/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func NewPool(key string, plugins ...AvailablePlugin) (Pool, error) {
RWMutex: &sync.RWMutex{},
version: ver,
key: key,
subs: map[string]*subscription{},
subs: make(map[string]*subscription),
plugins: make(map[uint32]AvailablePlugin),
max: MaximumRunningPlugins,
concurrencyCount: 1,
Expand Down Expand Up @@ -378,7 +378,8 @@ func (p *pool) MoveSubscriptions(to Pool) []subscription {
defer p.Unlock()

for task, sub := range p.subs {
if sub.SubType == UnboundSubscriptionType && to.(*pool).version > p.version {
// ensure that this sub was not bound to this pool specifically before moving
if sub.SubType == UnboundSubscriptionType {
subs = append(subs, *sub)
to.Subscribe(task, UnboundSubscriptionType)
delete(p.subs, task)
Expand Down

0 comments on commit 611e58e

Please sign in to comment.