From 2e8001cf1f185d31d760447165f739bdd11f6cb0 Mon Sep 17 00:00:00 2001 From: Cody Roseborough Date: Tue, 29 Mar 2016 17:15:33 -0700 Subject: [PATCH] Fix #824: Add proper handling of metric rollover Added a check in availablePlugins.collectMetrics for a nil strategy that was causing a panic when metric subscriptions were in the process of being moved. Modified the handling of load/unload plugin events to properly handle moving subscriptions between differing versions of a plugin that provides the same metrics and added tests for this behavior in control/control_test.go. --- control/available_plugin.go | 6 ++ control/control_test.go | 129 ++++++++++++++++++++++++++++++++++++ control/runner.go | 102 ++++++++++++++++++++++++++-- control/strategy/pool.go | 5 +- 4 files changed, 235 insertions(+), 7 deletions(-) diff --git a/control/available_plugin.go b/control/available_plugin.go index b629a3f50..0a8ef1a48 100644 --- a/control/available_plugin.go +++ b/control/available_plugin.go @@ -368,6 +368,12 @@ func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core. if pool == nil { return nil, serror.New(ErrPoolNotFound, map[string]interface{}{"pool-key": pluginKey}) } + // If the strategy is nil but the pool exists we likely are waiting on the pool to be fully initialized + // because of a plugin load/unload event that is currently being processed. Prevents panic from using nil + // RoutingAndCaching. 
+ if pool.Strategy() == nil { + return nil, errors.New("Plugin strategy not set") + } metricsToCollect, metricsFromCache := pool.CheckCache(metricTypes, taskID) diff --git a/control/control_test.go b/control/control_test.go index 44d26f319..ee428e42e 100644 --- a/control/control_test.go +++ b/control/control_test.go @@ -1418,3 +1418,132 @@ func TestProcessMetrics(t *testing.T) { }) } + +func TestMetricRolloverToNewVersion(t *testing.T) { + Convey("Given a metric that is being collected at v1", t, func() { + c := New(GetDefaultConfig()) + lpe := newListenToPluginEvent() + c.eventManager.RegisterHandler("TestMetricRolloverToNewVersion", lpe) + c.Start() + _, err := load(c, path.Join(SnapPath, "plugin", "snap-collector-mock1")) + So(err, ShouldBeNil) + <-lpe.done + So(len(c.pluginManager.all()), ShouldEqual, 1) + lp, err2 := c.pluginManager.get("collector:mock:1") + So(err2, ShouldBeNil) + So(lp.Name(), ShouldResemble, "mock") + //Subscribe deps to create pools. + metric := MockMetricType{ + namespace: []string{"intel", "mock", "foo"}, + cfg: cdata.NewNode(), + ver: 0, + } + serr := c.SubscribeDeps("testTaskID", []core.Metric{metric}, []core.Plugin{}) + So(serr, ShouldBeNil) + // collect metrics as a sanity check that everything is setup correctly + //mts, errs := c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID") + //So(errs, ShouldBeNil) + //So(len(mts), ShouldEqual, 1) + // ensure the data coming back is from v1. V1's data is type string + //_, ok := mts[0].Data().(string) + //So(ok, ShouldEqual, true) + Convey("Loading v2 of that plugin should move subscriptions to newer version", func() { + // Load version snap-collector-mock2 + _, err = load(c, path.Join(SnapPath, "plugin", "snap-collector-mock2")) + So(err, ShouldBeNil) + // Wait for plugin load event to be done + <-lpe.done + // wait for subscription event + <-lpe.done + <-lpe.done + // Check for subscription movement. + // Give some time for subscription to be moved. 
+ var pool1 strategy.Pool + var errp error + + pool1, errp = c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:1") + So(errp, ShouldBeNil) + So(pool1.SubscriptionCount(), ShouldEqual, 0) + + pool2, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:2") + So(errp, ShouldBeNil) + So(pool2.SubscriptionCount(), ShouldEqual, 1) + + // mts, errs = c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID") + // So(len(mts), ShouldEqual, 1) + + // ensure the data coming back is from v2, V2's data is type int + // _, ok = mts[0].Data().(int) + // So(ok, ShouldEqual, true) + }) + }) +} + +func TestMetricRolloverToOlderVersion(t *testing.T) { + Convey("Given a metric that is being collected at v2", t, func() { + c := New(GetDefaultConfig()) + lpe := newListenToPluginEvent() + c.eventManager.RegisterHandler("TestMetricRolloverToOlderVersion", lpe) + c.Start() + _, err := load(c, path.Join(SnapPath, "plugin", "snap-collector-mock2")) + So(err, ShouldBeNil) + <-lpe.done + So(len(c.pluginManager.all()), ShouldEqual, 1) + lp, err2 := c.pluginManager.get("collector:mock:2") + So(err2, ShouldBeNil) + So(lp.Name(), ShouldResemble, "mock") + //Subscribe deps to create pools. + metric := MockMetricType{ + namespace: []string{"intel", "mock", "foo"}, + cfg: cdata.NewNode(), + ver: 0, + } + serr := c.SubscribeDeps("testTaskID", []core.Metric{metric}, []core.Plugin{}) + So(serr, ShouldBeNil) + // collect metrics as a sanity check that everything is setup correctly + mts, errs := c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID") + So(errs, ShouldBeNil) + So(len(mts), ShouldEqual, 1) + // ensure the data coming back is from v2. 
V2's data is type int + _, ok := mts[0].Data().(int) + So(ok, ShouldEqual, true) + // grab plugin for mock v2 + pc := c.PluginCatalog() + mockv2 := pc[0] + Convey("Loading v1 of that plugin and unloading v2 should move subscriptions to older version", func() { + // Load version snap-collector-mock1 + _, err = load(c, path.Join(SnapPath, "plugin", "snap-collector-mock1")) + So(err, ShouldBeNil) + // Wait for plugin load event to be done + <-lpe.done + + _, err = c.Unload(mockv2) + So(err, ShouldBeNil) + // wait for unload event + <-lpe.done + // wait for PluginSubscription event + <-lpe.done + <-lpe.done + // Check for subscription movement. + // Give some time for subscription to be moved. + var pool1 strategy.Pool + var errp error + ap := c.pluginRunner.AvailablePlugins() + pool1, errp = ap.getOrCreatePool("collector:mock:2") + So(errp, ShouldBeNil) + So(pool1.SubscriptionCount(), ShouldEqual, 0) + + pool2, errp := ap.getOrCreatePool("collector:mock:1") + So(errp, ShouldBeNil) + So(pool2.SubscriptionCount(), ShouldEqual, 1) + + mts, errs = c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID") + So(len(mts), ShouldEqual, 1) + + // ensure the data coming back is from v1, V1's data is type string + _, ok = mts[0].Data().(string) + So(ok, ShouldEqual, true) + }) + }) + +} diff --git a/control/runner.go b/control/runner.go index 831bc81c1..a8df7bfda 100644 --- a/control/runner.go +++ b/control/runner.go @@ -306,9 +306,87 @@ func (r *runner) HandleGomitEvent(e gomit.Event) { if err != nil { return } + case *control_event.UnloadPluginEvent: + // On plugin unload, find the key and pool info for the plugin being unloaded. 
+ r.availablePlugins.RLock() + var pool strategy.Pool + var k string + for key, p := range r.availablePlugins.table { + tnv := strings.Split(key, ":") + if core.PluginType(v.Type).String() == tnv[0] && v.Name == tnv[1] && v.Version == p.Version() { + pool = p + k = key + break + } + } + + r.availablePlugins.RUnlock() + if pool == nil { + return + } + // Check for the highest lower version plugin and move subscriptions that + // are not bound to a plugin version to this pool. + plugin, err := r.pluginManager.get(fmt.Sprintf("%s:%s:%d", core.PluginType(v.Type).String(), v.Name, -1)) + if err != nil { + return + } + newPool, err := r.availablePlugins.getOrCreatePool(plugin.Key()) + if err != nil { + return + } + subs := pool.MoveSubscriptions(newPool) + // Start new plugins in newPool if needed + if newPool.Eligible() { + e := r.restartPlugin(plugin.Key()) + if e != nil { + runnerLog.WithFields(log.Fields{ + "_block": "handle-events", + }).Error(e.Error()) + return + } + } + // Remove the unloaded plugin from available plugins + r.availablePlugins.Lock() + delete(r.availablePlugins.table, k) + r.availablePlugins.Unlock() + if len(subs) != 0 { + runnerLog.WithFields(log.Fields{ + "_block": "subscribe-pool", + "event": v.Namespace(), + "plugin-name": v.Name, + "plugin-version": v.Version, + "plugin-type": core.PluginType(v.Type).String(), + }).Info("pool with subscriptions to move found") + for _, sub := range subs { + r.emitter.Emit(&control_event.PluginSubscriptionEvent{ + PluginName: v.Name, + PluginVersion: newPool.Version(), + TaskId: sub.TaskID, + PluginType: v.Type, + SubscriptionType: int(strategy.UnboundSubscriptionType), + }) + r.emitter.Emit(&control_event.PluginUnsubscriptionEvent{ + PluginName: v.Name, + PluginVersion: pool.Version(), + TaskId: sub.TaskID, + PluginType: v.Type, + }) + r.emitter.Emit(&control_event.MovePluginSubscriptionEvent{ + PluginName: v.Name, + PreviousVersion: pool.Version(), + NewVersion: newPool.Version(), + TaskId: sub.TaskID, + PluginType: 
v.Type, + }) + } + } case *control_event.LoadPluginEvent: + // On a plugin load event, all subscriptions that are not bound to a specific version + // need to be moved to the loaded version if its version is greater than the currently + // available plugin. var pool strategy.Pool r.availablePlugins.RLock() + currentHighestVersion := -1 for key, p := range r.availablePlugins.pools() { // tuple of type name and version // type @ index 0, name @ index 1, version @ index 2 @@ -325,12 +403,17 @@ func (r *runner) HandleGomitEvent(e gomit.Event) { }).Info("pool has bad key ", key) continue } - // attempt to find a pool whose type and name are the same, and whose version is - // less than newly loaded plugin. If we find it, break out of loop. + // Match pools of the same type and name whose version is less than the newly loaded plugin. if core.PluginType(v.Type).String() == tnv[0] && v.Name == tnv[1] && v.Version > p.Version() { - pool = p - break + // See if the pool version is higher than the current highest. + // We only want to move subscriptions from the currentHighest + // because that is where subscriptions that are bound to the + // latest version will be. 
+ if p.Version() > currentHighestVersion { + pool = p + currentHighestVersion = p.Version() + } } } r.availablePlugins.RUnlock() @@ -355,8 +438,17 @@ func (r *runner) HandleGomitEvent(e gomit.Event) { if err != nil { return } - + // Move subscriptions to the new, higher versioned pool subs := pool.MoveSubscriptions(newPool) + if newPool.Eligible() { + e := r.restartPlugin(plugin.Key()) + if e != nil { + runnerLog.WithFields(log.Fields{ + "_block": "handle-events", + }).Error(e.Error()) + return + } + } if len(subs) != 0 { runnerLog.WithFields(log.Fields{ "_block": "subscribe-pool", diff --git a/control/strategy/pool.go b/control/strategy/pool.go index 6cc6ac7a9..0766dd08f 100644 --- a/control/strategy/pool.go +++ b/control/strategy/pool.go @@ -140,7 +140,7 @@ func NewPool(key string, plugins ...AvailablePlugin) (Pool, error) { RWMutex: &sync.RWMutex{}, version: ver, key: key, - subs: map[string]*subscription{}, + subs: make(map[string]*subscription), plugins: make(map[uint32]AvailablePlugin), max: MaximumRunningPlugins, concurrencyCount: 1, @@ -378,7 +378,8 @@ func (p *pool) MoveSubscriptions(to Pool) []subscription { defer p.Unlock() for task, sub := range p.subs { - if sub.SubType == UnboundSubscriptionType && to.(*pool).version > p.version { + // ensure that this sub was not bound to this pool specifically before moving + if sub.SubType == UnboundSubscriptionType { subs = append(subs, *sub) to.Subscribe(task, UnboundSubscriptionType) delete(p.subs, task)