diff --git a/control/available_plugin.go b/control/available_plugin.go index b629a3f50..0a8ef1a48 100644 --- a/control/available_plugin.go +++ b/control/available_plugin.go @@ -368,6 +368,12 @@ func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core. if pool == nil { return nil, serror.New(ErrPoolNotFound, map[string]interface{}{"pool-key": pluginKey}) } + // If the strategy is nil but the pool exists we likely are waiting on the pool to be fully initialized + // because of a plugin load/unload event that is currently being processed. Prevents panic from using nil + // RoutingAndCaching. + if pool.Strategy() == nil { + return nil, errors.New("Plugin strategy not set") + } metricsToCollect, metricsFromCache := pool.CheckCache(metricTypes, taskID) diff --git a/control/control.go b/control/control.go index 744359bac..43b3835bc 100644 --- a/control/control.go +++ b/control/control.go @@ -589,18 +589,22 @@ func (p *pluginControl) validateMetricTypeSubscription(mt core.RequestedMetric, return serrs } -func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]core.Plugin, []serror.SnapError) { +type gatheredPlugin struct { + plugin core.Plugin + subscriptionType strategy.SubscriptionType +} + +func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]gatheredPlugin, []serror.SnapError) { var ( - plugins []core.Plugin + plugins []gatheredPlugin serrs []serror.SnapError ) - // here we resolve and retrieve plugins for each metric type. // if the incoming metric type version is < 1, we treat that as // latest as with plugins. The following two loops create a set // of plugins with proper versions needed to discern the subscription // types. - colPlugins := make(map[string]*loadedPlugin) + colPlugins := make(map[string]gatheredPlugin) for _, mt := range mts { // If the version provided is <1 we will get the latest // plugin for the given metric. 
@@ -612,8 +616,14 @@ func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]core.Plugin, []se })) continue } - - colPlugins[m.Plugin.Key()] = m.Plugin + subType := strategy.BoundSubscriptionType + if mt.Version() < 1 { + subType = strategy.UnboundSubscriptionType + } + colPlugins[fmt.Sprintf("%s:%d", m.Plugin.Key(), subType)] = gatheredPlugin{ + plugin: m.Plugin, + subscriptionType: subType, + } } if len(serrs) > 0 { return plugins, serrs @@ -622,19 +632,44 @@ func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]core.Plugin, []se for _, lp := range colPlugins { plugins = append(plugins, lp) } - + if len(plugins) == 0 { + serrs = append(serrs, serror.New(errors.New("No plugins found"))) + return nil, serrs + } return plugins, nil } func (p *pluginControl) SubscribeDeps(taskID string, mts []core.Metric, plugins []core.Plugin) []serror.SnapError { var serrs []serror.SnapError - collectors, errs := p.gatherCollectors(mts) if len(errs) > 0 { - serrs = append(serrs) + serrs = append(serrs, errs...) } - plugins = append(plugins, collectors...) 
+ for _, gc := range collectors { + pool, err := p.pluginRunner.AvailablePlugins().getOrCreatePool(fmt.Sprintf("%s:%s:%d", gc.plugin.TypeName(), gc.plugin.Name(), gc.plugin.Version())) + if err != nil { + serrs = append(serrs, serror.New(err)) + return serrs + } + pool.Subscribe(taskID, gc.subscriptionType) + if pool.Eligible() { + err = p.verifyPlugin(gc.plugin.(*loadedPlugin)) + if err != nil { + serrs = append(serrs, serror.New(err)) + return serrs + } + err = p.pluginRunner.runPlugin(gc.plugin.(*loadedPlugin).Details) + if err != nil { + serrs = append(serrs, serror.New(err)) + return serrs + } + } + serr := p.sendPluginSubscriptionEvent(taskID, gc.plugin) + if serr != nil { + serrs = append(serrs, serr) + } + } for _, sub := range plugins { // pools are created statically, not with keys like "publisher:foo:-1" // here we check to see if the version of the incoming plugin is -1, and @@ -741,7 +776,9 @@ func (p *pluginControl) UnsubscribeDeps(taskID string, mts []core.Metric, plugin if len(errs) > 0 { serrs = append(serrs, errs...) } - plugins = append(plugins, collectors...) + for _, gc := range collectors { + plugins = append(plugins, gc.plugin) + } for _, sub := range plugins { pool, err := p.pluginRunner.AvailablePlugins().getPool(fmt.Sprintf("%s:%s:%d", sub.TypeName(), sub.Name(), sub.Version())) diff --git a/control/control_test.go b/control/control_test.go index 4eb8da136..1092cd9d4 100644 --- a/control/control_test.go +++ b/control/control_test.go @@ -31,6 +31,7 @@ import ( "testing" "time" + log "github.com/Sirupsen/logrus" "github.com/pborman/uuid" . 
"github.com/smartystreets/goconvey/convey" "github.com/vrischmann/jsonutil" @@ -256,40 +257,48 @@ func newListenToPluginEvent() *listenToPluginEvent { } func (l *listenToPluginEvent) HandleGomitEvent(e gomit.Event) { - go func() { - switch v := e.Body.(type) { - case *control_event.RestartedAvailablePluginEvent: - l.plugin.EventNamespace = v.Namespace() - l.done <- struct{}{} - case *control_event.MaxPluginRestartsExceededEvent: - l.plugin.EventNamespace = v.Namespace() - l.done <- struct{}{} - case *control_event.DeadAvailablePluginEvent: - l.plugin.EventNamespace = v.Namespace() - l.done <- struct{}{} - case *control_event.LoadPluginEvent: - l.plugin.LoadedPluginName = v.Name - l.plugin.LoadedPluginVersion = v.Version - l.plugin.PluginType = v.Type - l.done <- struct{}{} - case *control_event.UnloadPluginEvent: - l.plugin.UnloadedPluginName = v.Name - l.plugin.UnloadedPluginVersion = v.Version - l.plugin.PluginType = v.Type - l.done <- struct{}{} - case *control_event.SwapPluginsEvent: - l.plugin.LoadedPluginName = v.LoadedPluginName - l.plugin.LoadedPluginVersion = v.LoadedPluginVersion - l.plugin.UnloadedPluginName = v.UnloadedPluginName - l.plugin.UnloadedPluginVersion = v.UnloadedPluginVersion - l.plugin.PluginType = v.PluginType - l.done <- struct{}{} - case *control_event.PluginSubscriptionEvent: - l.done <- struct{}{} - default: - fmt.Println("Got an event you're not handling") - } - }() + switch v := e.Body.(type) { + case *control_event.RestartedAvailablePluginEvent: + l.plugin.EventNamespace = v.Namespace() + l.done <- struct{}{} + case *control_event.MaxPluginRestartsExceededEvent: + l.plugin.EventNamespace = v.Namespace() + l.done <- struct{}{} + case *control_event.DeadAvailablePluginEvent: + l.plugin.EventNamespace = v.Namespace() + l.done <- struct{}{} + case *control_event.LoadPluginEvent: + l.plugin.LoadedPluginName = v.Name + l.plugin.LoadedPluginVersion = v.Version + l.plugin.PluginType = v.Type + l.plugin.EventNamespace = v.Namespace() + 
l.done <- struct{}{} + case *control_event.UnloadPluginEvent: + l.plugin.UnloadedPluginName = v.Name + l.plugin.UnloadedPluginVersion = v.Version + l.plugin.PluginType = v.Type + l.plugin.EventNamespace = v.Namespace() + l.done <- struct{}{} + case *control_event.SwapPluginsEvent: + l.plugin.LoadedPluginName = v.LoadedPluginName + l.plugin.LoadedPluginVersion = v.LoadedPluginVersion + l.plugin.UnloadedPluginName = v.UnloadedPluginName + l.plugin.UnloadedPluginVersion = v.UnloadedPluginVersion + l.plugin.PluginType = v.PluginType + l.plugin.EventNamespace = v.Namespace() + l.done <- struct{}{} + case *control_event.MovePluginSubscriptionEvent: + l.plugin.EventNamespace = v.Namespace() + l.done <- struct{}{} + case *control_event.PluginSubscriptionEvent: + l.plugin.EventNamespace = v.Namespace() + l.done <- struct{}{} + default: + controlLogger.WithFields(log.Fields{ + "event:": v.Namespace(), + "_block": "HandleGomit", + }).Info("Unhandled Event") + } } var ( @@ -1414,7 +1423,7 @@ func TestGatherCollectors(t *testing.T) { plgs, errs := c.gatherCollectors(m) So(errs, ShouldBeNil) So(plgs, ShouldNotBeEmpty) - So(plgs[0].Version(), ShouldEqual, 1) + So(plgs[0].plugin.Version(), ShouldEqual, 1) }) Convey("it gathers the queried version of plugin", func() { Convey("the version is available", func() { @@ -1428,7 +1437,7 @@ func TestGatherCollectors(t *testing.T) { plgs, errs := c.gatherCollectors(m) So(errs, ShouldBeNil) So(plgs, ShouldNotBeEmpty) - So(plgs[0].Version(), ShouldEqual, v) + So(plgs[0].plugin.Version(), ShouldEqual, v) }) Convey("the version is not available", func() { m := []core.Metric{ @@ -1588,3 +1597,156 @@ func TestProcessMetrics(t *testing.T) { }) } + +type listenToPluginSubscriptionEvent struct { + plugin *mockPluginEvent + done chan struct{} +} + +func newListenToPluginSubscriptionEvent() *listenToPluginSubscriptionEvent { + return &listenToPluginSubscriptionEvent{ + done: make(chan struct{}), + plugin: &mockPluginEvent{}, + } +} + +func (l 
*listenToPluginSubscriptionEvent) HandleGomitEvent(e gomit.Event) { + switch v := e.Body.(type) { + case *control_event.MovePluginSubscriptionEvent: + l.plugin.EventNamespace = v.Namespace() + l.done <- struct{}{} + default: + controlLogger.WithFields(log.Fields{ + "event:": v.Namespace(), + "_block": "HandleGomit", + }).Info("Unhandled Event") + } +} +func TestMetricSubscriptionToNewVersion(t *testing.T) { + Convey("Given a metric that is being collected at v1", t, func() { + c := New(GetDefaultConfig()) + lpe := newListenToPluginSubscriptionEvent() + c.eventManager.RegisterHandler("TestMetricSubscriptionToNewVersion", lpe) + c.Start() + _, err := load(c, path.Join(SnapPath, "plugin", "snap-collector-mock1")) + So(err, ShouldBeNil) + So(len(c.pluginManager.all()), ShouldEqual, 1) + lp, err2 := c.pluginManager.get("collector:mock:1") + So(err2, ShouldBeNil) + So(lp.Name(), ShouldResemble, "mock") + //Subscribe deps to create pools. + metric := MockMetricType{ + namespace: []string{"intel", "mock", "foo"}, + cfg: cdata.NewNode(), + ver: 0, + } + So(metric.Version(), ShouldEqual, 0) + serr := c.SubscribeDeps("testTaskID", []core.Metric{metric}, []core.Plugin{}) + So(serr, ShouldBeNil) + // collect metrics as a sanity check that everything is setup correctly + mts, errs := c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID") + So(errs, ShouldBeNil) + So(len(mts), ShouldEqual, 1) + // ensure the data coming back is from v1. 
V1's data is type string + _, ok := mts[0].Data().(string) + So(ok, ShouldEqual, true) + Convey("Loading v2 of that plugin should move subscriptions to newer version", func() { + // Load version snap-collector-mock2 + _, err := load(c, path.Join(SnapPath, "plugin", "snap-collector-mock2")) + So(err, ShouldBeNil) + select { + // Wait on subscriptionMovedEvent + case <-lpe.done: + case <-time.After(10 * time.Second): + fmt.Println("timeout waiting for move subscription event") + So(false, ShouldEqual, true) + } + // Check for subscription movement. + // Give some time for subscription to be moved. + var pool1 strategy.Pool + var errp error + + pool1, errp = c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:1") + So(errp, ShouldBeNil) + So(pool1.SubscriptionCount(), ShouldEqual, 0) + + pool2, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:2") + So(errp, ShouldBeNil) + So(pool2.SubscriptionCount(), ShouldEqual, 1) + + mts, errs = c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID") + So(len(mts), ShouldEqual, 1) + + // ensure the data coming back is from v2, V2's data is type int + _, ok = mts[0].Data().(int) + So(ok, ShouldEqual, true) + }) + }) +} + +func TestMetricSubscriptionToOlderVersion(t *testing.T) { + Convey("Given a metric that is being collected at v2", t, func() { + c := New(GetDefaultConfig()) + lpe := newListenToPluginSubscriptionEvent() + c.eventManager.RegisterHandler("TestMetricSubscriptionToOlderVersion", lpe) + c.Start() + _, err := load(c, path.Join(SnapPath, "plugin", "snap-collector-mock2")) + So(err, ShouldBeNil) + So(len(c.pluginManager.all()), ShouldEqual, 1) + lp, err2 := c.pluginManager.get("collector:mock:2") + So(err2, ShouldBeNil) + So(lp.Name(), ShouldResemble, "mock") + //Subscribe deps to create pools. 
+ metric := MockMetricType{ + namespace: []string{"intel", "mock", "foo"}, + cfg: cdata.NewNode(), + ver: 0, + } + serr := c.SubscribeDeps("testTaskID", []core.Metric{metric}, []core.Plugin{}) + So(serr, ShouldBeNil) + // collect metrics as a sanity check that everything is set up correctly + mts, errs := c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID") + So(errs, ShouldBeNil) + So(len(mts), ShouldEqual, 1) + // ensure the data coming back is from v2. V2's data is type int + _, ok := mts[0].Data().(int) + So(ok, ShouldEqual, true) + // grab plugin for mock v2 + pc := c.PluginCatalog() + mockv2 := pc[0] + Convey("Loading v1 of that plugin and unloading v2 should move subscriptions to older version", func() { + // Load the older version, snap-collector-mock1, then unload v2 + _, err = load(c, path.Join(SnapPath, "plugin", "snap-collector-mock1")) + So(err, ShouldBeNil) + _, err = c.Unload(mockv2) + So(err, ShouldBeNil) + select { + // Wait on subscriptionMovedEvent + case <-lpe.done: + case <-time.After(10 * time.Second): + fmt.Println("Timeout waiting for move subscription event") + So(false, ShouldEqual, true) + } + // Check for subscription movement. + // Give some time for subscription to be moved. 
+ var pool1 strategy.Pool + var errp error + ap := c.pluginRunner.AvailablePlugins() + pool1, errp = ap.getOrCreatePool("collector:mock:2") + So(errp, ShouldBeNil) + So(pool1.SubscriptionCount(), ShouldEqual, 0) + + pool2, errp := ap.getOrCreatePool("collector:mock:1") + So(errp, ShouldBeNil) + So(pool2.SubscriptionCount(), ShouldEqual, 1) + + mts, errs = c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID") + So(len(mts), ShouldEqual, 1) + + // ensure the data coming back is from v1, V1's data is type string + _, ok = mts[0].Data().(string) + So(ok, ShouldEqual, true) + }) + }) + +} diff --git a/control/runner.go b/control/runner.go index 831bc81c1..379320afd 100644 --- a/control/runner.go +++ b/control/runner.go @@ -306,9 +306,91 @@ func (r *runner) HandleGomitEvent(e gomit.Event) { if err != nil { return } + case *control_event.UnloadPluginEvent: + // On plugin unload, find the key and pool info for the plugin being unloaded. + r.availablePlugins.RLock() + var pool strategy.Pool + var k string + for key, p := range r.availablePlugins.table { + tnv := strings.Split(key, ":") + if core.PluginType(v.Type).String() == tnv[0] && v.Name == tnv[1] && v.Version == p.Version() { + pool = p + k = key + break + } + } + + r.availablePlugins.RUnlock() + if pool == nil { + return + } + // Check for the highest lower version plugin and move subscriptions that + // are not bound to a plugin version to this pool. 
+ plugin, err := r.pluginManager.get(fmt.Sprintf("%s:%s:%d", core.PluginType(v.Type).String(), v.Name, -1)) + if err != nil { + return + } + newPool, err := r.availablePlugins.getOrCreatePool(plugin.Key()) + if err != nil { + return + } + subs := pool.MoveSubscriptions(newPool) + // Start new plugins in newPool if needed + if newPool.Eligible() { + e := r.restartPlugin(plugin.Key()) + if e != nil { + runnerLog.WithFields(log.Fields{ + "_block": "handle-events", + }).Error(e.Error()) + return + } + } + // Remove the unloaded plugin from available plugins + r.availablePlugins.Lock() + delete(r.availablePlugins.table, k) + r.availablePlugins.Unlock() + pool.RLock() + defer pool.RUnlock() + if len(subs) != 0 { + runnerLog.WithFields(log.Fields{ + "_block": "subscribe-pool", + "event": v.Namespace(), + "plugin-name": v.Name, + "plugin-version": v.Version, + "plugin-type": core.PluginType(v.Type).String(), + }).Info("pool with subscriptions to move found") + for _, sub := range subs { + r.emitter.Emit(&control_event.PluginSubscriptionEvent{ + PluginName: v.Name, + PluginVersion: newPool.Version(), + TaskId: sub.TaskID, + PluginType: v.Type, + SubscriptionType: int(strategy.UnboundSubscriptionType), + }) + r.emitter.Emit(&control_event.PluginUnsubscriptionEvent{ + PluginName: v.Name, + PluginVersion: pool.Version(), + TaskId: sub.TaskID, + PluginType: v.Type, + }) + r.emitter.Emit(&control_event.MovePluginSubscriptionEvent{ + PluginName: v.Name, + PreviousVersion: pool.Version(), + NewVersion: newPool.Version(), + TaskId: sub.TaskID, + PluginType: v.Type, + }) + } + } case *control_event.LoadPluginEvent: + // On a plugin load event, all subscriptions that are not bound to a specific version + // need to be moved to the loaded version if its version is greater than the currently + // available plugin. 
var pool strategy.Pool + //k := fmt.Sprintf("%v:%v:%v", core.PluginType(v.Type).String(), v.Name, -1) + //pool, _ = r.availablePlugins.getPool(k) r.availablePlugins.RLock() + currentHighestVersion := -1 for key, p := range r.availablePlugins.pools() { // tuple of type name and version // type @ index 0, name @ index 1, version @ index 2 @@ -325,15 +407,21 @@ func (r *runner) HandleGomitEvent(e gomit.Event) { }).Info("pool has bad key ", key) continue } - // attempt to find a pool whose type and name are the same, and whose version is - // less than newly loaded plugin. If we find it, break out of loop. + // Consider only pools with the same type and name whose version is less than the newly loaded plugin. if core.PluginType(v.Type).String() == tnv[0] && v.Name == tnv[1] && v.Version > p.Version() { - pool = p - break + // See if the pool version is higher than the current highest. + // We only want to move subscriptions from the currentHighest + // because that is where subscriptions that are bound to the + // latest version will be. + if p.Version() > currentHighestVersion { + pool = p + currentHighestVersion = p.Version() + } } } r.availablePlugins.RUnlock() + // now check to see if anything was put where pool points. // if not, there are no older pools whose subscriptions need to be // moved. 
@@ -356,7 +444,24 @@ func (r *runner) HandleGomitEvent(e gomit.Event) { return } + // Move subscriptions to the new, higher versioned pool subs := pool.MoveSubscriptions(newPool) + if newPool.Eligible() { + e := r.restartPlugin(plugin.Key()) + if e != nil { + runnerLog.WithFields(log.Fields{ + "_block": "handle-events", + }).Error(e.Error()) + return + } + runnerLog.WithFields(log.Fields{ + "_block": "pool eligible", + }).Info("starting plugin") + } + + pool.RLock() + defer pool.RUnlock() + if len(subs) != 0 { runnerLog.WithFields(log.Fields{ "_block": "subscribe-pool", diff --git a/control/strategy/pool.go b/control/strategy/pool.go index 6cc6ac7a9..1b1268d44 100644 --- a/control/strategy/pool.go +++ b/control/strategy/pool.go @@ -34,11 +34,11 @@ import ( "github.com/intelsdi-x/snap/core/serror" ) -type subscriptionType int +type SubscriptionType int const ( // this subscription is bound to an explicit version - BoundSubscriptionType subscriptionType = iota + BoundSubscriptionType SubscriptionType = iota // this subscription is akin to "latest" and must be moved if a newer version is loaded. 
UnboundSubscriptionType ) @@ -68,7 +68,7 @@ type Pool interface { SelectAndKill(taskID, reason string) SelectAP(taskID string) (SelectablePlugin, serror.SnapError) Strategy() RoutingAndCaching - Subscribe(taskID string, subType subscriptionType) + Subscribe(taskID string, subType SubscriptionType) SubscriptionCount() int Unsubscribe(taskID string) Version() int @@ -90,7 +90,7 @@ type AvailablePlugin interface { } type subscription struct { - SubType subscriptionType + SubType SubscriptionType Version int TaskID string } @@ -140,7 +140,7 @@ func NewPool(key string, plugins ...AvailablePlugin) (Pool, error) { RWMutex: &sync.RWMutex{}, version: ver, key: key, - subs: map[string]*subscription{}, + subs: make(map[string]*subscription), plugins: make(map[uint32]AvailablePlugin), max: MaximumRunningPlugins, concurrencyCount: 1, @@ -236,7 +236,7 @@ func (p *pool) applyPluginMeta(a AvailablePlugin) error { // subscribe adds a subscription to the pool. // Using subscribe is idempotent. -func (p *pool) Subscribe(taskID string, subType subscriptionType) { +func (p *pool) Subscribe(taskID string, subType SubscriptionType) { p.Lock() defer p.Unlock() @@ -373,12 +373,16 @@ func (p *pool) generatePID() uint32 { // MoveSubscriptions moves subscriptions to another pool func (p *pool) MoveSubscriptions(to Pool) []subscription { var subs []subscription - + // If attempting to move between the same pool + // bail to prevent deadlock. + if to.(*pool) == p { + return []subscription{} + } p.Lock() defer p.Unlock() - for task, sub := range p.subs { - if sub.SubType == UnboundSubscriptionType && to.(*pool).version > p.version { + // ensure that this sub was not bound to this pool specifically before moving + if sub.SubType == UnboundSubscriptionType { subs = append(subs, *sub) to.Subscribe(task, UnboundSubscriptionType) delete(p.subs, task)