From 3b2b54b56b9b2b4ef496b10ec7192ba49ddc63a9 Mon Sep 17 00:00:00 2001 From: geauxvirtual Date: Wed, 21 Sep 2016 10:36:43 -0700 Subject: [PATCH] Fix #1210: Do not start plugins unless all plugins are available for workflow --- control/subscription_group.go | 85 ++++++++++++++--------------------- 1 file changed, 34 insertions(+), 51 deletions(-) diff --git a/control/subscription_group.go b/control/subscription_group.go index 31a58e041..b06223e61 100644 --- a/control/subscription_group.go +++ b/control/subscription_group.go @@ -129,8 +129,11 @@ func (s subscriptionGroups) add(id string, requested []core.RequestedMetric, } errs := subscriptionGroup.process(id) + if errs != nil { + return errs + } s.subscriptionMap[id] = subscriptionGroup - return errs + return nil } // Remove removes a subscription group given a subscription group ID. @@ -386,73 +389,53 @@ func (s *subscriptionGroup) process(id string) (serrs []serror.SnapError) { func (s *subscriptionGroup) subscribePlugins(id string, plugins []core.SubscribedPlugin) (serrs []serror.SnapError) { - for _, sub := range plugins { + plgs := make([]*loadedPlugin, len(plugins)) + // First range through plugins to verify if all required plugins + // are available + for i, sub := range plugins { + plg, err := s.pluginManager.get(key(sub)) + if err != nil { + serrs = append(serrs, serror.New(err)) + return serrs + } + plgs[i] = plg + } + + // If all plugins are available, subscribe to pools and start + // plugins as needed + for _, plg := range plgs { controlLogger.WithFields(log.Fields{ - "name": sub.Name(), - "type": sub.TypeName(), - "version": sub.Version(), + "name": plg.Name(), + "type": plg.TypeName(), + "version": plg.Version(), "_block": "subscriptionGroup.subscribePlugins", }).Debug("plugin subscription") - if sub.Version() < 1 { - latest, err := s.pluginManager.get(fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", sub.TypeName(), - sub.Name(), sub.Version())) - if err != nil { - serrs = append(serrs, serror.New(err)) - return serrs - } - pool, err := s.pluginRunner.AvailablePlugins().getOrCreatePool(latest.Key()) + pool, err := s.pluginRunner.AvailablePlugins().getOrCreatePool(plg.Key()) + if err != nil { + serrs = append(serrs, serror.New(err)) + return serrs + } + pool.Subscribe(id) + if pool.Eligible() { + err = s.verifyPlugin(plg) if err != nil { serrs = append(serrs, serror.New(err)) return serrs } - pool.Subscribe(id) - if pool.Eligible() { - err = s.verifyPlugin(latest) - if err != nil { - serrs = append(serrs, serror.New(err)) - return serrs - } - err = s.pluginRunner.runPlugin(latest.Details) - if err != nil { - serrs = append(serrs, serror.New(err)) - return serrs - } - } - } else { - pool, err := s.pluginRunner.AvailablePlugins().getOrCreatePool(fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", - sub.TypeName(), sub.Name(), sub.Version())) + err = s.pluginRunner.runPlugin(plg.Details) if err != nil { serrs = append(serrs, serror.New(err)) return serrs } - pool.Subscribe(id) - if pool.Eligible() { - pl, err := s.pluginManager.get(fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", - sub.TypeName(), sub.Name(), sub.Version())) - if err != nil { - serrs = append(serrs, serror.New(err)) - return serrs - } - err = s.verifyPlugin(pl) - if err != nil { - serrs = append(serrs, serror.New(err)) - return serrs - } - err = s.pluginRunner.runPlugin(pl.Details) - if err != nil { - serrs = append(serrs, serror.New(err)) - return serrs - } - } } - serr := s.sendPluginSubscriptionEvent(id, sub) + serr := s.sendPluginSubscriptionEvent(id, plg) if serr != nil { serrs = append(serrs, serr) + return serrs } } - - return + return serrs } func (p *subscriptionGroup) unsubscribePlugins(id string,