Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Merge pull request #1677 from IzabellaRaulin/block_unloading_collecto…
Browse files Browse the repository at this point in the history
…r_plugin

Block unloading plugin which is used in a running task
  • Loading branch information
IzabellaRaulin authored Jul 6, 2017
2 parents c520706 + 8185b92 commit a492827
Show file tree
Hide file tree
Showing 13 changed files with 272 additions and 77 deletions.
4 changes: 2 additions & 2 deletions control/available_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ func (ap *availablePlugins) streamMetrics(
}

func (ap *availablePlugins) publishMetrics(metrics []core.Metric, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) []error {
key := strings.Join([]string{plugin.PublisherPluginType.String(), pluginName, strconv.Itoa(pluginVersion)}, core.Separator)
key := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", plugin.PublisherPluginType.String(), pluginName, pluginVersion)
pool, serr := ap.getPool(key)
if serr != nil {
return []error{serr}
Expand Down Expand Up @@ -588,7 +588,7 @@ func (ap *availablePlugins) publishMetrics(metrics []core.Metric, pluginName str

func (ap *availablePlugins) processMetrics(metrics []core.Metric, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) ([]core.Metric, []error) {
var errs []error
key := strings.Join([]string{plugin.ProcessorPluginType.String(), pluginName, strconv.Itoa(pluginVersion)}, core.Separator)
key := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", plugin.ProcessorPluginType.String(), pluginName, pluginVersion)
pool, serr := ap.getPool(key)
if serr != nil {
errs = append(errs, serr)
Expand Down
34 changes: 32 additions & 2 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ type catalogsMetrics interface {
Subscribe([]string, int) error
Unsubscribe([]string, int) error
GetPlugin(core.Namespace, int) (core.CatalogedPlugin, error)
GetPlugins(core.Namespace) ([]core.CatalogedPlugin, error)
}

type managesSigning interface {
Expand Down Expand Up @@ -637,8 +638,34 @@ func (p *pluginControl) returnPluginDetails(rp *core.RequestedPlugin) (*pluginDe
}

func (p *pluginControl) Unload(pl core.Plugin) (core.CatalogedPlugin, serror.SnapError) {
up, err := p.pluginManager.UnloadPlugin(pl)
up, err := p.pluginManager.get(fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", pl.TypeName(), pl.Name(), pl.Version()))
if err != nil {
se := serror.New(ErrPluginNotFound, map[string]interface{}{
"plugin-name": pl.Name(),
"plugin-version": pl.Version(),
"plugin-type": pl.TypeName(),
})
return nil, se
}

if errs := p.subscriptionGroups.validatePluginUnloading(up); errs != nil {
impactOnTasks := []string{}
for _, err := range errs {
taskId := err.Fields()["task-id"].(string)
impactOnTasks = append(impactOnTasks, taskId)
}
se := serror.New(errorPluginCannotBeUnloaded(impactOnTasks), map[string]interface{}{
"plugin-name": pl.Name(),
"plugin-version": pl.Version(),
"plugin-type": pl.TypeName(),
"impacted-tasks": impactOnTasks,
})
return nil, se
}

// unload the plugin means removing it from plugin catalog
// and, for collector plugins, removing its metrics from metric catalog
if _, err := p.pluginManager.UnloadPlugin(pl); err != nil {
return nil, err
}

Expand Down Expand Up @@ -685,7 +712,6 @@ func (p *pluginControl) SwapPlugins(in *core.RequestedPlugin, out core.Cataloged
}
return serr
}

up, err := p.pluginManager.UnloadPlugin(out)
if err != nil {
_, err2 := p.pluginManager.UnloadPlugin(lp)
Expand Down Expand Up @@ -943,6 +969,10 @@ func (p *pluginControl) GetMetricVersions(ns core.Namespace) ([]core.CatalogedMe
return rmts, nil
}

func (p *pluginControl) GetPlugins(ns core.Namespace) ([]core.CatalogedPlugin, error) {
return p.metricCatalog.GetPlugins(ns)
}

func (p *pluginControl) MetricExists(mns core.Namespace, ver int) bool {
_, err := p.metricCatalog.GetMetric(mns, ver)
if err == nil {
Expand Down
4 changes: 4 additions & 0 deletions control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,10 @@ func (m *mc) GetPlugin(core.Namespace, int) (core.CatalogedPlugin, error) {
return nil, nil
}

func (m *mc) GetPlugins(core.Namespace) ([]core.CatalogedPlugin, error) {
return nil, nil
}

func (m *mc) GetVersions(core.Namespace) ([]*metricType, error) {
return nil, nil
}
Expand Down
4 changes: 4 additions & 0 deletions control/fixtures/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package fixtures

import (
"encoding/json"
"fmt"
"time"

"github.com/intelsdi-x/snap/core"
Expand Down Expand Up @@ -142,6 +143,9 @@ func (m MockPlugin) Name() string { return m.name }
func (m MockPlugin) TypeName() string { return m.pluginType.String() }
func (m MockPlugin) Version() int { return m.ver }
func (m MockPlugin) Config() *cdata.ConfigDataNode { return m.config }
func (m MockPlugin) Key() string {
return fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", m.pluginType.String(), m.name, m.ver)
}

type MockRequestedMetric struct {
namespace core.Namespace
Expand Down
31 changes: 31 additions & 0 deletions control/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,6 +289,10 @@ func (cp *catalogedPlugin) Version() int {
return cp.version
}

func (cp *catalogedPlugin) Key() string {
return fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", cp.TypeName(), cp.Name(), cp.Version())
}

func (cp *catalogedPlugin) IsSigned() bool {
return cp.signed
}
Expand Down Expand Up @@ -612,6 +616,33 @@ func (mc *metricCatalog) GetPlugin(mns core.Namespace, ver int) (core.CatalogedP
return mt.Plugin, nil
}

func (mc *metricCatalog) GetPlugins(mns core.Namespace) ([]core.CatalogedPlugin, error) {
plugins := []core.CatalogedPlugin{}
pluginsMap := map[string]core.CatalogedPlugin{}

mts, err := mc.tree.GetVersions(mns.Strings())
if err != nil {
log.WithFields(log.Fields{
"_module": "control",
"_file": "metrics.go,",
"_block": "get-plugins",
"error": err,
}).Error("error getting plugin")
return nil, err
}
for _, mt := range mts {
// iterate over metrics and add the plugin which exposes the following metric to a map
// under plugin key to ensure that plugins do not repeat
key := mt.Plugin.Key()
pluginsMap[key] = mt.Plugin
}
for _, plg := range pluginsMap {
plugins = append(plugins, plg)
}

return plugins, nil
}

func appendIfMissing(keys []string, ns string) []string {
for _, key := range keys {
if ns == key {
Expand Down
16 changes: 15 additions & 1 deletion control/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ var (
ErrPluginNotFound = errors.New("plugin not found")
// ErrPluginAlreadyLoaded - error message when a plugin is already loaded
ErrPluginAlreadyLoaded = errors.New("plugin is already loaded")
// ErrPluginCannotBeUnloaded - error message when a plugin cannot be unloaded because is already in use by running task(s)
ErrPluginCannotBeUnloaded = errors.New("Plugin is used by running task. Stop the task to be able to unload the plugin")
// ErrPluginNotInLoadedState - error message when a plugin must ne in a loaded state
ErrPluginNotInLoadedState = errors.New("Plugin must be in a LoadedState")

Expand All @@ -73,6 +75,15 @@ var (
defaultManagerOpts = []pluginManagerOpt{optDefaultManagerSecurity()}
)

func errorPluginCannotBeUnloaded(impactedTaskIDs []string) error {
var impactedTasks string

for _, id := range impactedTaskIDs {
impactedTasks += fmt.Sprintf("\n%s", id)
}
return fmt.Errorf("%s:%s", ErrPluginCannotBeUnloaded, impactedTasks)
}

type pluginState string

type loadedPlugins struct {
Expand Down Expand Up @@ -687,6 +698,8 @@ func (p *pluginManager) UnloadPlugin(pl core.Plugin) (*loadedPlugin, serror.Snap
"plugin-version": plugin.Version(),
"plugin-path": plugin.Details.Path,
}).Debugf("Removing plugin")

// remove plugin binary from tempDirPath (do not apply for remote plugin)
if strings.Contains(plugin.Details.Path, p.tempDirPath) {
if err := os.RemoveAll(filepath.Dir(plugin.Details.Path)); err != nil {
pmLogger.WithFields(log.Fields{
Expand All @@ -713,9 +726,10 @@ func (p *pluginManager) UnloadPlugin(pl core.Plugin) (*loadedPlugin, serror.Snap
}).Debug("Nothing to delete as temp path is empty")
}

// remove plugin key
p.loadedPlugins.remove(plugin.Key())

// Remove any metrics from the catalog if this was a collector
// remove any metrics from the catalog if this was a collector
if plugin.TypeName() == core.CollectorPluginType.String() || plugin.TypeName() == core.StreamingCollectorPluginType.String() {
p.metricCatalog.RmUnloadedPluginMetrics(plugin)
}
Expand Down
5 changes: 2 additions & 3 deletions control/strategy/fixtures/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@ limitations under the License.
package fixtures

import (
"strconv"
"strings"
"fmt"
"time"

"github.com/intelsdi-x/snap/control/plugin"
Expand Down Expand Up @@ -140,7 +139,7 @@ func (m MockAvailablePlugin) LastHit() time.Time {
}

func (m MockAvailablePlugin) String() string {
return strings.Join([]string{m.pluginType.String(), m.pluginName, strconv.Itoa(m.Version())}, core.Separator)
return fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", m.pluginType.String(), m.pluginName, m.Version())
}

func (m MockAvailablePlugin) Kill(string) error {
Expand Down
105 changes: 91 additions & 14 deletions control/subscription_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type ManagesSubscriptionGroups interface {
plugins []core.SubscribedPlugin,
configTree *cdata.ConfigDataTree, asserts ...core.SubscribedPluginAssert) (serrs []serror.SnapError)
validateMetric(metric core.Metric) (serrs []serror.SnapError)
validatePluginUnloading(*loadedPlugin) (errs []serror.SnapError)
}

type subscriptionGroup struct {
Expand Down Expand Up @@ -253,6 +254,20 @@ func (s *subscriptionGroups) ValidateDeps(requested []core.RequestedMetric,
return
}

// validatePluginUnloading checks if process of unloading the plugin is safe for existing running tasks.
// If the plugin is used by running task and there is no replacements, return an error with appropriate message
// containing ids of tasks which use the plugin, what blocks unloading process until they are stopped
func (s *subscriptionGroups) validatePluginUnloading(pluginToUnload *loadedPlugin) (errs []serror.SnapError) {
s.Lock()
defer s.Unlock()
for id, group := range s.subscriptionMap {
if err := group.validatePluginUnloading(id, pluginToUnload); err != nil {
errs = append(errs, err)
}
}
return errs
}

func (p *subscriptionGroups) validatePluginSubscription(pl core.SubscribedPlugin, mergedConfig *cdata.ConfigDataNode) []serror.SnapError {
var serrs = []serror.SnapError{}
controlLogger.WithFields(log.Fields{
Expand Down Expand Up @@ -345,6 +360,69 @@ func (s *subscriptionGroups) validateMetric(
return serrs
}

// pluginIsSubscribed returns true if a provided plugin has been found among subscribed plugins
// in the following subscription group
func (s *subscriptionGroup) pluginIsSubscribed(plugin *loadedPlugin) bool {
// range over subscribed plugins to find if the plugin is there
for _, sp := range s.plugins {
if sp.TypeName() == plugin.TypeName() && sp.Name() == plugin.Name() && sp.Version() == plugin.Version() {
return true
}
}
return false
}

// validatePluginUnloading verifies if a given plugin might be unloaded without causing running task failures
func (s *subscriptionGroup) validatePluginUnloading(id string, plgToUnload *loadedPlugin) (serr serror.SnapError) {
impacted := false
if !s.pluginIsSubscribed(plgToUnload) {
// the plugin is not subscribed, so the task is not impacted by its unloading
return nil
}
controlLogger.WithFields(log.Fields{
"_block": "subscriptionGroup.validatePluginUnloading",
"task-id": id,
"plugin-to-unload": plgToUnload.Key(),
}).Debug("validating impact of unloading the plugin")

for _, requestedMetric := range s.requestedMetrics {
// get all plugins exposing the requested metric
plgs, _ := s.GetPlugins(requestedMetric.Namespace())
// when requested version is fixed (greater than 0), take into account only plugins in the requested version
if requestedMetric.Version() > 0 {
// skip those which are not impacted by unloading (version different than plgToUnload.Version())
if requestedMetric.Version() == plgToUnload.Version() {
plgsInVer := []core.CatalogedPlugin{}

for _, plg := range plgs {
if plg.Version() == requestedMetric.Version() {
plgsInVer = append(plgsInVer, plg)
}
}
// set plugins only in the requested version
plgs = plgsInVer
}
}
if len(plgs) == 1 && plgs[0].Key() == plgToUnload.Key() {
// the requested metric is exposed only by the single plugin and there is no replacement
impacted = true
controlLogger.WithFields(log.Fields{
"_block": "subscriptionGroup.validatePluginUnloading",
"task-id": id,
"plugin-to-unload": plgToUnload.Key(),
"requested-metric": fmt.Sprintf("%s:%d", requestedMetric.Namespace(), requestedMetric.Version()),
}).Errorf("unloading the plugin would cause missing in collection the requested metric")
}
}
if impacted {
serr = serror.New(ErrPluginCannotBeUnloaded, map[string]interface{}{
"task-id": id,
"plugin-to-unload": plgToUnload.Key(),
})
}
return serr
}

func (s *subscriptionGroup) process(id string) (serrs []serror.SnapError) {
// gathers collectors based on requested metrics
pluginToMetricMap, plugins, serrs := s.getMetricsAndCollectors(s.requestedMetrics, s.configTree)
Expand All @@ -353,23 +431,22 @@ func (s *subscriptionGroup) process(id string) (serrs []serror.SnapError) {
"metrics": fmt.Sprintf("%+v", s.requestedMetrics),
}).Debug("gathered collectors")

// notice that requested plugins contains only processors and publishers
for _, plugin := range s.requestedPlugins {
//add processors and publishers to collectors just gathered
if plugin.TypeName() != core.CollectorPluginType.String() {
plugins = append(plugins, plugin)
// add defaults to plugins (exposed in a plugins ConfigPolicy)
if lp, err := s.pluginManager.get(
fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d",
plugin.TypeName(),
plugin.Name(),
plugin.Version())); err == nil && lp.ConfigPolicy != nil {
if policy := lp.ConfigPolicy.Get([]string{""}); policy != nil && len(policy.Defaults()) > 0 {
plugin.Config().ApplyDefaults(policy.Defaults())
}
// add processors and publishers to collectors just gathered
plugins = append(plugins, plugin)
// add defaults to plugins (exposed in a plugins ConfigPolicy)
if lp, err := s.pluginManager.get(
fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d",
plugin.TypeName(),
plugin.Name(),
plugin.Version())); err == nil && lp.ConfigPolicy != nil {
if policy := lp.ConfigPolicy.Get([]string{""}); policy != nil && len(policy.Defaults()) > 0 {
// set defaults to plugin config
plugin.Config().ApplyDefaults(policy.Defaults())
}
}
}

// calculates those plugins that need to be subscribed and unsubscribed to
subs, unsubs := comparePlugins(plugins, s.plugins)
controlLogger.WithFields(log.Fields{
Expand All @@ -387,7 +464,7 @@ func (s *subscriptionGroup) process(id string) (serrs []serror.SnapError) {
}
}

//updating view
// updating view
// metrics are grouped by plugin
s.metrics = pluginToMetricMap
s.plugins = plugins
Expand Down
Loading

0 comments on commit a492827

Please sign in to comment.