Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Adds support for loading remote plugins
Browse files Browse the repository at this point in the history
  • Loading branch information
aarontay committed May 10, 2017
1 parent 1c4f665 commit f09abc5
Show file tree
Hide file tree
Showing 15 changed files with 341 additions and 73 deletions.
20 changes: 20 additions & 0 deletions control/available_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type availablePlugin struct {
execPath string
fromPackage bool
pprofPort string
isRemote bool
}

// newAvailablePlugin returns an availablePlugin with information from a
Expand All @@ -100,6 +101,7 @@ func newAvailablePlugin(resp plugin.Response, emitter gomit.Emitter, ep executab
lastHitTime: time.Now(),
ePlugin: ep,
pprofPort: resp.PprofAddress,
isRemote: false,
}
ap.key = fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", ap.pluginType.String(), ap.name, ap.version)

Expand Down Expand Up @@ -233,13 +235,24 @@ func (a *availablePlugin) LastHit() time.Time {
return a.lastHitTime
}

func (a *availablePlugin) IsRemote() bool {
return a.isRemote
}

func (a *availablePlugin) SetIsRemote(isRemote bool) {
a.isRemote = isRemote
}

// Stop halts a running availablePlugin
func (a *availablePlugin) Stop(r string) error {
log.WithFields(log.Fields{
"_module": "control-aplugin",
"block": "stop",
"plugin_name": a,
}).Info("stopping available plugin")
if a.IsRemote() {
return a.client.Close()
}
return a.client.Kill(r)
}

Expand Down Expand Up @@ -272,6 +285,13 @@ func (a *availablePlugin) Kill(r string) error {
// CheckHealth checks the health of a plugin and updates
// a.failedHealthChecks
func (a *availablePlugin) CheckHealth() {
if a.IsRemote() {
runnerLog.WithFields(log.Fields{
"_module": "control-aplugin",
"_block": "check-health",
}).Debug(fmt.Sprintf("bypassing check-health on standalone plugin"))
return
}
go func() {
a.healthChan <- a.client.Ping()
}()
Expand Down
14 changes: 13 additions & 1 deletion control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@ import (
"time"

log "github.com/Sirupsen/logrus"
"github.com/intelsdi-x/gomit"
"google.golang.org/grpc"

"github.com/intelsdi-x/gomit"
"github.com/intelsdi-x/snap/control/plugin"
"github.com/intelsdi-x/snap/control/plugin/client"
"github.com/intelsdi-x/snap/control/strategy"
Expand Down Expand Up @@ -587,6 +587,11 @@ func (p *pluginControl) verifySignature(rp *core.RequestedPlugin) (bool, serror.
}

func (p *pluginControl) returnPluginDetails(rp *core.RequestedPlugin) (*pluginDetails, serror.SnapError) {
if rp.Uri() != nil {
return &pluginDetails{
Uri: rp.Uri(),
}, nil
}
details := &pluginDetails{}
var serr serror.SnapError
//Check plugin signing
Expand Down Expand Up @@ -725,6 +730,13 @@ func (p *pluginControl) UnsubscribeDeps(id string) []serror.SnapError {
}

func (p *pluginControl) verifyPlugin(lp *loadedPlugin) error {
if lp.Details.Uri != nil {
// remote plugin
if core.IsUri(lp.Details.Uri.String()) {
return fmt.Errorf(fmt.Sprintf("Remote plugin failed to load: bad uri: (%x)", lp.Details.Uri))
}
return nil
}
b, err := ioutil.ReadFile(lp.Details.Path)
if err != nil {
return err
Expand Down
4 changes: 3 additions & 1 deletion control/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ func (m *monitor) Start(availablePlugins *availablePlugins) {
go func() {
availablePlugins.RLock()
for _, ap := range availablePlugins.all() {
go ap.CheckHealth()
if !ap.IsRemote() {
go ap.CheckHealth()
}
}
availablePlugins.RUnlock()
}()
Expand Down
1 change: 1 addition & 0 deletions control/plugin/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type PluginClient interface {
SetKey() error
Ping() error
Kill(string) error
Close() error
GetConfigPolicy() (*cpolicy.ConfigPolicy, error)
}

Expand Down
8 changes: 8 additions & 0 deletions control/plugin/client/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,14 @@ func (g *grpcClient) Kill(reason string) error {
return nil
}

func (g *grpcClient) Close() error {
err := g.conn.Close()
if err != nil {
return err
}
return nil
}

func (g *grpcClient) Publish(metrics []core.Metric, config map[string]ctypes.ConfigValue) error {
arg := &rpc.PubProcArg{
Metrics: NewMetrics(metrics),
Expand Down
5 changes: 5 additions & 0 deletions control/plugin/client/native.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ func (p *PluginNativeClient) Kill(reason string) error {
return err
}

func (p *PluginNativeClient) Close() error {
// Added to conform to interface, but not needed by native
return nil
}

// Used to catch zero values for times and overwrite with current time
// the 0 value for time.Time is year 1 which isn't a valid value for metric
// collection (until we get a time machine).
Expand Down
146 changes: 93 additions & 53 deletions control/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,12 @@ package control

import (
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"path/filepath"
"runtime"
Expand Down Expand Up @@ -170,6 +174,7 @@ type pluginDetails struct {
KeyPath string
CACertPaths string
TLSEnabled bool
Uri *url.URL
}

type loadedPlugin struct {
Expand Down Expand Up @@ -342,56 +347,83 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter
lPlugin.Details = details
lPlugin.State = DetectedState

pmLogger.WithFields(log.Fields{
"_block": "load-plugin",
"path": filepath.Base(lPlugin.Details.Exec[0]),
}).Info("plugin load called")

// We will create commands by appending the ExecPath to the actual command.
// The ExecPath is a temporary location where the plugin/package will be
// run from.
commands := make([]string, len(lPlugin.Details.Exec))
for i, e := range lPlugin.Details.Exec {
commands[i] = filepath.Join(lPlugin.Details.ExecPath, e)
}

ePlugin, err := plugin.NewExecutablePlugin(
p.GenerateArgs(int(log.GetLevel())).
SetCertPath(details.CertPath).
SetKeyPath(details.KeyPath).
SetCACertPaths(details.CACertPaths).
SetTLSEnabled(details.TLSEnabled),
commands...)
if err != nil {
var (
ePlugin *plugin.ExecutablePlugin
resp plugin.Response
err error
)

if lPlugin.Details.Uri == nil {
pmLogger.WithFields(log.Fields{
"_block": "load-plugin",
"error": err.Error(),
}).Error("load plugin error while creating executable plugin")
return nil, serror.New(err)
}
"path": filepath.Base(lPlugin.Details.Exec[0]),
}).Info("plugin load called")
// We will create commands by appending the ExecPath to the actual command.
// The ExecPath is a temporary location where the plugin/package will be
// run from.
commands := make([]string, len(lPlugin.Details.Exec))
for i, e := range lPlugin.Details.Exec {
commands[i] = filepath.Join(lPlugin.Details.ExecPath, e)
}

pmLogger.WithFields(log.Fields{
"_block": "load-plugin",
"path": lPlugin.Details.Exec,
}).Debug(fmt.Sprintf("plugin load timeout set to %ds", p.pluginLoadTimeout))
resp, err := ePlugin.Run(time.Second * time.Duration(p.pluginLoadTimeout))
if err != nil {
ePlugin, err = plugin.NewExecutablePlugin(
p.GenerateArgs(int(log.GetLevel())).
SetCertPath(details.CertPath).
SetKeyPath(details.KeyPath).
SetCACertPaths(details.CACertPaths).
SetTLSEnabled(details.TLSEnabled),
commands...)
if err != nil {
pmLogger.WithFields(log.Fields{
"_block": "load-plugin",
"error": err.Error(),
}).Error("load plugin error while creating executable plugin")
return nil, serror.New(err)
}
pmLogger.WithFields(log.Fields{
"_block": "load-plugin",
"error": err.Error(),
}).Error("load plugin error when starting plugin")
return nil, serror.New(err)
}
"path": lPlugin.Details.Exec,
}).Debug(fmt.Sprintf("plugin load timeout set to %ds", p.pluginLoadTimeout))
resp, err = ePlugin.Run(time.Second * time.Duration(p.pluginLoadTimeout))
if err != nil {
pmLogger.WithFields(log.Fields{
"_block": "load-plugin",
"error": err.Error(),
}).Error("load plugin error when starting plugin")
return nil, serror.New(err)
}

ePlugin.SetName(resp.Meta.Name)
ePlugin.SetName(resp.Meta.Name)

key := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", resp.Meta.Type.String(), resp.Meta.Name, resp.Meta.Version)
if _, exists := p.loadedPlugins.table[key]; exists {
return nil, serror.New(ErrPluginAlreadyLoaded, map[string]interface{}{
"plugin-name": resp.Meta.Name,
"plugin-version": resp.Meta.Version,
"plugin-type": resp.Type.String(),
})
key := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", resp.Meta.Type.String(), resp.Meta.Name, resp.Meta.Version)
if _, exists := p.loadedPlugins.table[key]; exists {
return nil, serror.New(ErrPluginAlreadyLoaded, map[string]interface{}{
"plugin-name": resp.Meta.Name,
"plugin-version": resp.Meta.Version,
"plugin-type": resp.Type.String(),
})
}
} else {
pmLogger.WithFields(log.Fields{
"_block": "load-plugin",
"uri": lPlugin.Details.Uri.String(),
}).Info("plugin load called")
res, err := http.Get(lPlugin.Details.Uri.String())
if err != nil {
return nil, serror.New(err)
}

body, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, serror.New(err)
}
err = json.Unmarshal(body, &resp)
if err != nil {
pmLogger.WithFields(log.Fields{
"_block": "load-plugin",
"error": err.Error(),
}).Error("error during json unmarshal")
}
}
ap, err := newAvailablePlugin(resp, emitter, ePlugin, p.grpcSecurity)
if err != nil {
Expand All @@ -402,6 +434,10 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter
return nil, serror.New(err)
}

if lPlugin.Details.Uri != nil {
ap.SetIsRemote(true)
}

if resp.Meta.Unsecure {
err = ap.client.Ping()
} else {
Expand Down Expand Up @@ -481,6 +517,7 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter
}

colClient := ap.client.(client.PluginCollectorClient)
defer ap.client.(client.PluginCollectorClient).Close()

cfg := plugin.ConfigType{
ConfigDataNode: cfgNode,
Expand Down Expand Up @@ -550,19 +587,21 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter
}
}

// Added so clients can adequately clean up connections
ap.client.Kill("Retrieved necessary plugin info")
err = ePlugin.Kill()
if err != nil {
pmLogger.WithFields(log.Fields{
"_block": "load-plugin",
"error": err.Error(),
}).Error("load plugin error while killing plugin executable plugin")
return nil, serror.New(err)
if lPlugin.Details.Uri == nil {
// Added so clients can adequately clean up connections
ap.client.Kill("Retrieved necessary plugin info")
err = ePlugin.Kill()
if err != nil {
pmLogger.WithFields(log.Fields{
"_block": "load-plugin",
"error": err.Error(),
}).Error("load plugin error while killing plugin executable plugin")
return nil, serror.New(err)
}
}

if resp.State != plugin.PluginSuccess {
e := fmt.Errorf("Plugin loading did not succeed: %s\n", resp.ErrorMessage)
e := fmt.Errorf("plugin loading did not succeed: %s\n", resp.ErrorMessage)
pmLogger.WithFields(log.Fields{
"_block": "load-plugin",
"error": e,
Expand Down Expand Up @@ -594,6 +633,7 @@ func (p *pluginManager) UnloadPlugin(pl core.Plugin) (*loadedPlugin, serror.Snap
})
return nil, se
}

pmLogger.WithFields(log.Fields{
"_block": "unload-plugin",
"path": plugin.Details.Exec,
Expand Down
27 changes: 21 additions & 6 deletions control/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -395,12 +395,27 @@ func (r *runner) handleUnsubscription(pType, pName string, pVersion int, taskID
return errors.New("pool not found")
}
if pool.SubscriptionCount() < pool.Count() {
runnerLog.WithFields(log.Fields{
"_block": "handle-unsubscription",
"pool-count": pool.Count(),
"pool-subscription-count": pool.SubscriptionCount(),
}).Debug(fmt.Sprintf("killing an available plugin in pool %s:%s:%d", pType, pName, pVersion))
pool.SelectAndKill(taskID, "unsubscription event")
lp, err := r.pluginManager.get(fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", pType, pName, pVersion))
if lp != nil && lp.Details.Uri != nil {
if err != nil {
runnerLog.WithFields(log.Fields{
"_block": "handle-unsubscription",
"pool-count": pool.Count(),
"pool-subscription-count": pool.SubscriptionCount(),
"plugin-name": pName,
"plugin-version": pVersion,
"plugin-type": pType,
"error": err.Error(),
}).Error("unable to get loaded plugin")
}
runnerLog.WithFields(log.Fields{
"_block": "handle-unsubscription",
"plugin-uri": lp.Details.Uri,
}).Debug(fmt.Sprintf("unsubscribe called on standalone plugin"))
pool.SelectAndStop(taskID, "remote unsubscription event")
} else {
pool.SelectAndKill(taskID, "unsubscription event")
}
}
return nil
}
Expand Down
Loading

0 comments on commit f09abc5

Please sign in to comment.