Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Fix #1210: Do not start plugins unless all plugins are available for …
Browse files Browse the repository at this point in the history
…workflow
  • Loading branch information
geauxvirtual committed Sep 21, 2016
1 parent 6dee889 commit 3b2b54b
Showing 1 changed file with 34 additions and 51 deletions.
85 changes: 34 additions & 51 deletions control/subscription_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,11 @@ func (s subscriptionGroups) add(id string, requested []core.RequestedMetric,
}

errs := subscriptionGroup.process(id)
if errs != nil {
return errs
}
s.subscriptionMap[id] = subscriptionGroup
return errs
return nil
}

// Remove removes a subscription group given a subscription group ID.
Expand Down Expand Up @@ -386,73 +389,53 @@ func (s *subscriptionGroup) process(id string) (serrs []serror.SnapError) {

func (s *subscriptionGroup) subscribePlugins(id string,
plugins []core.SubscribedPlugin) (serrs []serror.SnapError) {
for _, sub := range plugins {
plgs := make([]*loadedPlugin, len(plugins))
// First range through plugins to verify if all required plugins
// are available
for i, sub := range plugins {
plg, err := s.pluginManager.get(key(sub))
if err != nil {
serrs = append(serrs, serror.New(err))
return serrs
}
plgs[i] = plg
}

// If all plugins are available, subscribe to pools and start
// plugins as needed
for _, plg := range plgs {
controlLogger.WithFields(log.Fields{
"name": sub.Name(),
"type": sub.TypeName(),
"version": sub.Version(),
"name": plg.Name(),
"type": plg.TypeName(),
"version": plg.Version(),
"_block": "subscriptionGroup.subscribePlugins",
}).Debug("plugin subscription")
if sub.Version() < 1 {
latest, err := s.pluginManager.get(fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", sub.TypeName(),
sub.Name(), sub.Version()))
if err != nil {
serrs = append(serrs, serror.New(err))
return serrs
}
pool, err := s.pluginRunner.AvailablePlugins().getOrCreatePool(latest.Key())
pool, err := s.pluginRunner.AvailablePlugins().getOrCreatePool(plg.Key())
if err != nil {
serrs = append(serrs, serror.New(err))
return serrs
}
pool.Subscribe(id)
if pool.Eligible() {
err = s.verifyPlugin(plg)
if err != nil {
serrs = append(serrs, serror.New(err))
return serrs
}
pool.Subscribe(id)
if pool.Eligible() {
err = s.verifyPlugin(latest)
if err != nil {
serrs = append(serrs, serror.New(err))
return serrs
}
err = s.pluginRunner.runPlugin(latest.Details)
if err != nil {
serrs = append(serrs, serror.New(err))
return serrs
}
}
} else {
pool, err := s.pluginRunner.AvailablePlugins().getOrCreatePool(fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d",
sub.TypeName(), sub.Name(), sub.Version()))
err = s.pluginRunner.runPlugin(plg.Details)
if err != nil {
serrs = append(serrs, serror.New(err))
return serrs
}
pool.Subscribe(id)
if pool.Eligible() {
pl, err := s.pluginManager.get(fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d",
sub.TypeName(), sub.Name(), sub.Version()))
if err != nil {
serrs = append(serrs, serror.New(err))
return serrs
}
err = s.verifyPlugin(pl)
if err != nil {
serrs = append(serrs, serror.New(err))
return serrs
}
err = s.pluginRunner.runPlugin(pl.Details)
if err != nil {
serrs = append(serrs, serror.New(err))
return serrs
}
}
}

serr := s.sendPluginSubscriptionEvent(id, sub)
serr := s.sendPluginSubscriptionEvent(id, plg)
if serr != nil {
serrs = append(serrs, serr)
return serrs
}
}

return
return serrs
}

func (p *subscriptionGroup) unsubscribePlugins(id string,
Expand Down

0 comments on commit 3b2b54b

Please sign in to comment.