From cb68359b0cf22754bae16a5ee5247c33f6fce3ed Mon Sep 17 00:00:00 2001 From: Rashmi Gottipati Date: Fri, 28 Apr 2017 10:25:38 -0700 Subject: [PATCH] Fixes #1618 - Fix plugin load timeout in LoadPlugin and startPlugin by blocking until receiving plugin's response or timeout expiry --- control/control.go | 2 + control/plugin_manager.go | 495 ++++++++++-------- control/plugin_manager_test.go | 33 +- control/runner.go | 133 +++-- control/runner_test.go | 3 - .../snap-plugin-collector-mock2/mock/mock.go | 7 + 6 files changed, 374 insertions(+), 299 deletions(-) diff --git a/control/control.go b/control/control.go index cca9d327a..783fe0f24 100644 --- a/control/control.go +++ b/control/control.go @@ -116,6 +116,7 @@ type runsPlugins interface { SetPluginManager(managesPlugins) Monitor() *monitor runPlugin(string, *pluginDetails) error + SetPluginLoadTimeout(int) } type managesPlugins interface { @@ -174,6 +175,7 @@ func OptSetConfig(cfg *Config) PluginControlOpt { c.Config = cfg c.pluginManager.SetPluginConfig(cfg.Plugins) c.pluginManager.SetPluginLoadTimeout(c.Config.PluginLoadTimeout) + c.pluginRunner.SetPluginLoadTimeout(c.Config.PluginLoadTimeout) } } diff --git a/control/plugin_manager.go b/control/plugin_manager.go index ab9b26159..3ad525a7a 100644 --- a/control/plugin_manager.go +++ b/control/plugin_manager.go @@ -343,283 +343,314 @@ func (p *pluginManager) SetMetricCatalog(mc catalogsMetrics) { // LoadPlugin is the method for loading a plugin and // saving plugin into the LoadedPlugins array func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter) (*loadedPlugin, serror.SnapError) { - lPlugin := new(loadedPlugin) - lPlugin.Details = details - lPlugin.State = DetectedState + type result struct { + lp *loadedPlugin + err serror.SnapError + } + resultChan := make(chan result) + go func() { + lPlugin := new(loadedPlugin) + lPlugin.Details = details + lPlugin.State = DetectedState + + var ( + ePlugin *plugin.ExecutablePlugin + resp plugin.Response + err error + ) + + if lPlugin.Details.Uri == nil { + pmLogger.WithFields(log.Fields{ + "_block": "load-plugin", + "path": filepath.Base(lPlugin.Details.Exec[0]), + }).Info("plugin load called") + // We will create commands by appending the ExecPath to the actual command. + // The ExecPath is a temporary location where the plugin/package will be + // run from. + commands := make([]string, len(lPlugin.Details.Exec)) + for i, e := range lPlugin.Details.Exec { + commands[i] = filepath.Join(lPlugin.Details.ExecPath, e) + } - var ( - ePlugin *plugin.ExecutablePlugin - resp plugin.Response - err error - ) + ePlugin, err = plugin.NewExecutablePlugin( + p.GenerateArgs(int(log.GetLevel())). + SetCertPath(details.CertPath). + SetKeyPath(details.KeyPath). + SetCACertPaths(details.CACertPaths). + SetTLSEnabled(details.TLSEnabled), + commands...) + if err != nil { + pmLogger.WithFields(log.Fields{ + "_block": "load-plugin", + "error": err.Error(), + }).Error("load plugin error while creating executable plugin") + resultChan <- result{nil, serror.New(err)} + return + } + pmLogger.WithFields(log.Fields{ + "_block": "load-plugin", + "path": lPlugin.Details.Exec, + }).Debug(fmt.Sprintf("plugin load timeout set to %ds", p.pluginLoadTimeout)) + resp, err = ePlugin.Run(time.Second * time.Duration(p.pluginLoadTimeout)) + if err != nil { + pmLogger.WithFields(log.Fields{ + "_block": "load-plugin", + "error": err.Error(), + }).Error("load plugin error when starting plugin") + resultChan <- result{nil, serror.New(err)} + return + } - if lPlugin.Details.Uri == nil { - pmLogger.WithFields(log.Fields{ - "_block": "load-plugin", - "path": filepath.Base(lPlugin.Details.Exec[0]), - }).Info("plugin load called") - // We will create commands by appending the ExecPath to the actual command. - // The ExecPath is a temporary location where the plugin/package will be - // run from. - commands := make([]string, len(lPlugin.Details.Exec)) - for i, e := range lPlugin.Details.Exec { - commands[i] = filepath.Join(lPlugin.Details.ExecPath, e) - } + ePlugin.SetName(resp.Meta.Name) - ePlugin, err = plugin.NewExecutablePlugin( - p.GenerateArgs(int(log.GetLevel())). - SetCertPath(details.CertPath). - SetKeyPath(details.KeyPath). - SetCACertPaths(details.CACertPaths). - SetTLSEnabled(details.TLSEnabled), - commands...) - if err != nil { + key := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", resp.Meta.Type.String(), resp.Meta.Name, resp.Meta.Version) + if _, exists := p.loadedPlugins.table[key]; exists { + resultChan <- result{nil, serror.New(ErrPluginAlreadyLoaded, map[string]interface{}{ + "plugin-name": resp.Meta.Name, + "plugin-version": resp.Meta.Version, + "plugin-type": resp.Type.String(), + })} + } + } else { pmLogger.WithFields(log.Fields{ "_block": "load-plugin", - "error": err.Error(), - }).Error("load plugin error while creating executable plugin") - return nil, serror.New(err) + "uri": lPlugin.Details.Uri.String(), + }).Info("plugin load called") + res, err := http.Get(lPlugin.Details.Uri.String()) + if err != nil { + resultChan <- result{nil, serror.New(err)} + return + } + + body, err := ioutil.ReadAll(res.Body) + if err != nil { + resultChan <- result{nil, serror.New(err)} + return + } + err = json.Unmarshal(body, &resp) + if err != nil { + pmLogger.WithFields(log.Fields{ + "_block": "load-plugin", + "error": err.Error(), + }).Error("error during json unmarshal") + } } - pmLogger.WithFields(log.Fields{ - "_block": "load-plugin", - "path": lPlugin.Details.Exec, - }).Debug(fmt.Sprintf("plugin load timeout set to %ds", p.pluginLoadTimeout)) - resp, err = ePlugin.Run(time.Second * time.Duration(p.pluginLoadTimeout)) + ap, err := newAvailablePlugin(resp, emitter, ePlugin, p.grpcSecurity) if err != nil { pmLogger.WithFields(log.Fields{ "_block": "load-plugin", "error": err.Error(), - }).Error("load plugin error when starting plugin") - return nil, serror.New(err) + }).Error("load plugin error while creating available plugin") + resultChan <- result{nil, serror.New(err)} + return } - ePlugin.SetName(resp.Meta.Name) + if lPlugin.Details.Uri != nil { + ap.SetIsRemote(true) + } - key := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", resp.Meta.Type.String(), resp.Meta.Name, resp.Meta.Version) - if _, exists := p.loadedPlugins.table[key]; exists { - return nil, serror.New(ErrPluginAlreadyLoaded, map[string]interface{}{ - "plugin-name": resp.Meta.Name, - "plugin-version": resp.Meta.Version, - "plugin-type": resp.Type.String(), - }) + if resp.Meta.Unsecure { + err = ap.client.Ping() + } else { + err = ap.client.SetKey() } - } else { - pmLogger.WithFields(log.Fields{ - "_block": "load-plugin", - "uri": lPlugin.Details.Uri.String(), - }).Info("plugin load called") - res, err := http.Get(lPlugin.Details.Uri.String()) + if err != nil { - return nil, serror.New(err) + pmLogger.WithFields(log.Fields{ + "_block": "load-plugin", + "error": err.Error(), + }).Error("load plugin error while pinging the plugin") + resultChan <- result{nil, serror.New(err)} + return } - body, err := ioutil.ReadAll(res.Body) - if err != nil { - return nil, serror.New(err) + // Get the ConfigPolicy and add it to the loaded plugin + c, ok := ap.client.(plugin.Plugin) + if !ok { + resultChan <- result{nil, serror.New(errors.New("missing GetConfigPolicy function"))} + return } - err = json.Unmarshal(body, &resp) + cp, err := c.GetConfigPolicy() if err != nil { pmLogger.WithFields(log.Fields{ - "_block": "load-plugin", - "error": err.Error(), - }).Error("error during json unmarshal") + "_block": "load-plugin", + "plugin-type": "collector", + "error": err.Error(), + "plugin-name": ap.Name(), + "plugin-version": ap.Version(), + "plugin-id": ap.ID(), + }).Error("error in getting config policy") + resultChan <- result{nil, serror.New(err)} + return } - } - ap, err := newAvailablePlugin(resp, emitter, ePlugin, p.grpcSecurity) - if err != nil { - pmLogger.WithFields(log.Fields{ - "_block": "load-plugin", - "error": err.Error(), - }).Error("load plugin error while creating available plugin") - return nil, serror.New(err) - } - if lPlugin.Details.Uri != nil { - ap.SetIsRemote(true) - } + lPlugin.ConfigPolicy = cp + lPlugin.Meta = resp.Meta + lPlugin.Type = resp.Type + lPlugin.Token = resp.Token + lPlugin.LoadedTime = time.Now() + lPlugin.State = LoadedState + + if resp.Type == plugin.CollectorPluginType { + cfgNode := p.pluginConfig.getPluginConfigDataNode(core.PluginType(resp.Type), resp.Meta.Name, resp.Meta.Version) + + if lPlugin.ConfigPolicy != nil { + // Get plugin config defaults + defaults := cdata.NewNode() + for _, cpolicy := range lPlugin.ConfigPolicy.GetAll() { + _, errs := cpolicy.AddDefaults(defaults.Table()) + if len(errs.Errors()) > 0 { + for _, err := range errs.Errors() { + pmLogger.WithFields(log.Fields{ + "_block": "load-plugin", + "plugin-type": "collector", + "plugin-name": ap.Name(), + "plugin-version": ap.Version(), + "plugin-id": ap.ID(), + }).Error(err.Error()) + } + resultChan <- result{nil, serror.New(errors.New("error getting default config"))} + return - if resp.Meta.Unsecure { - err = ap.client.Ping() - } else { - err = ap.client.SetKey() - } + } + } - if err != nil { - pmLogger.WithFields(log.Fields{ - "_block": "load-plugin", - "error": err.Error(), - }).Error("load plugin error while pinging the plugin") - return nil, serror.New(err) - } + // Update config policy with defaults + cfgNode = cfgNode.ReverseMerge(defaults) + cp, err = c.GetConfigPolicy() + if err != nil { + pmLogger.WithFields(log.Fields{ + "_block": "load-plugin", + "plugin-type": "collector", + "error": err.Error(), + "plugin-name": ap.Name(), + "plugin-version": ap.Version(), + "plugin-id": ap.ID(), + }).Error("error in getting config policy") + resultChan <- result{nil, serror.New(err)} + return + } + lPlugin.ConfigPolicy = cp + } - // Get the ConfigPolicy and add it to the loaded plugin - c, ok := ap.client.(plugin.Plugin) - if !ok { - return nil, serror.New(errors.New("missing GetConfigPolicy function")) - } - cp, err := c.GetConfigPolicy() - if err != nil { - pmLogger.WithFields(log.Fields{ - "_block": "load-plugin", - "plugin-type": "collector", - "error": err.Error(), - "plugin-name": ap.Name(), - "plugin-version": ap.Version(), - "plugin-id": ap.ID(), - }).Error("error in getting config policy") - return nil, serror.New(err) - } + colClient := ap.client.(client.PluginCollectorClient) + defer ap.client.(client.PluginCollectorClient).Close() - lPlugin.ConfigPolicy = cp - lPlugin.Meta = resp.Meta - lPlugin.Type = resp.Type - lPlugin.Token = resp.Token - lPlugin.LoadedTime = time.Now() - lPlugin.State = LoadedState - - if resp.Type == plugin.CollectorPluginType { - cfgNode := p.pluginConfig.getPluginConfigDataNode(core.PluginType(resp.Type), resp.Meta.Name, resp.Meta.Version) - - if lPlugin.ConfigPolicy != nil { - // Get plugin config defaults - defaults := cdata.NewNode() - for _, cpolicy := range lPlugin.ConfigPolicy.GetAll() { - _, errs := cpolicy.AddDefaults(defaults.Table()) - if len(errs.Errors()) > 0 { - for _, err := range errs.Errors() { - pmLogger.WithFields(log.Fields{ - "_block": "load-plugin", - "plugin-type": "collector", - "plugin-name": ap.Name(), - "plugin-version": ap.Version(), - "plugin-id": ap.ID(), - }).Error(err.Error()) - } - return nil, serror.New(errors.New("error getting default config")) - } + cfg := plugin.ConfigType{ + ConfigDataNode: cfgNode, } - // Update config policy with defaults - cfgNode = cfgNode.ReverseMerge(defaults) - cp, err = c.GetConfigPolicy() + metricTypes, err := colClient.GetMetricTypes(cfg) if err != nil { pmLogger.WithFields(log.Fields{ - "_block": "load-plugin", - "plugin-type": "collector", - "error": err.Error(), - "plugin-name": ap.Name(), - "plugin-version": ap.Version(), - "plugin-id": ap.ID(), - }).Error("error in getting config policy") - return nil, serror.New(err) + "_block": "load-plugin", + "plugin-type": "collector", + "error": err.Error(), + }).Error("error in getting metric types") + resultChan <- result{nil, serror.New(err)} + return } - lPlugin.ConfigPolicy = cp - } - colClient := ap.client.(client.PluginCollectorClient) - defer ap.client.(client.PluginCollectorClient).Close() - - cfg := plugin.ConfigType{ - ConfigDataNode: cfgNode, - } - - metricTypes, err := colClient.GetMetricTypes(cfg) - if err != nil { - pmLogger.WithFields(log.Fields{ - "_block": "load-plugin", - "plugin-type": "collector", - "error": err.Error(), - }).Error("error in getting metric types") - return nil, serror.New(err) - } + // Add metric types to metric catalog + for _, nmt := range metricTypes { + // If the version is 0 default it to the plugin version + // This honors the plugins explicit version but falls back + // to the plugin version as default + if nmt.Version() < 1 { + // Since we have to override version we convert to a internal struct + nmt = &metricType{ + namespace: nmt.Namespace(), + version: resp.Meta.Version, + lastAdvertisedTime: nmt.LastAdvertisedTime(), + config: nmt.Config(), + data: nmt.Data(), + tags: nmt.Tags(), + description: nmt.Description(), + unit: nmt.Unit(), + } + } + // We quit and throw an error on bad metric versions (<1) + // the is a safety catch otherwise the catalog will be corrupted + if nmt.Version() < 1 { + err := errors.New("Bad metric version from plugin") + pmLogger.WithFields(log.Fields{ + "_block": "load-plugin", + "plugin-name": resp.Meta.Name, + "plugin-version": resp.Meta.Version, + "plugin-type": resp.Meta.Type.String(), + "plugin-path": filepath.Base(lPlugin.Details.ExecPath), + "metric-namespace": nmt.Namespace(), + "metric-version": nmt.Version(), + "error": err.Error(), + }).Error("received metric with bad version") + resultChan <- result{nil, serror.New(err)} + return + } - // Add metric types to metric catalog - for _, nmt := range metricTypes { - // If the version is 0 default it to the plugin version - // This honors the plugins explicit version but falls back - // to the plugin version as default - if nmt.Version() < 1 { - // Since we have to override version we convert to a internal struct - nmt = &metricType{ - namespace: nmt.Namespace(), - version: resp.Meta.Version, - lastAdvertisedTime: nmt.LastAdvertisedTime(), - config: nmt.Config(), - data: nmt.Data(), - tags: nmt.Tags(), - description: nmt.Description(), - unit: nmt.Unit(), + //Add standard tags + nmt = p.AddStandardAndWorkflowTags(nmt, nil) + + if err := p.metricCatalog.AddLoadedMetricType(lPlugin, nmt); err != nil { + pmLogger.WithFields(log.Fields{ + "_block": "load-plugin", + "plugin-name": resp.Meta.Name, + "plugin-version": resp.Meta.Version, + "plugin-type": resp.Meta.Type.String(), + "plugin-path": filepath.Base(lPlugin.Details.ExecPath), + "metric-namespace": nmt.Namespace(), + "metric-version": nmt.Version(), + "error": err.Error(), + }).Error("error adding loaded metric type") + resultChan <- result{nil, serror.New(err)} + return } } - // We quit and throw an error on bad metric versions (<1) - // the is a safety catch otherwise the catalog will be corrupted - if nmt.Version() < 1 { - err := errors.New("Bad metric version from plugin") - pmLogger.WithFields(log.Fields{ - "_block": "load-plugin", - "plugin-name": resp.Meta.Name, - "plugin-version": resp.Meta.Version, - "plugin-type": resp.Meta.Type.String(), - "plugin-path": filepath.Base(lPlugin.Details.ExecPath), - "metric-namespace": nmt.Namespace(), - "metric-version": nmt.Version(), - "error": err.Error(), - }).Error("received metric with bad version") - return nil, serror.New(err) - } - - //Add standard tags - nmt = p.AddStandardAndWorkflowTags(nmt, nil) + } - if err := p.metricCatalog.AddLoadedMetricType(lPlugin, nmt); err != nil { + if lPlugin.Details.Uri == nil { + // Added so clients can adequately clean up connections + ap.client.Kill("Retrieved necessary plugin info") + err = ePlugin.Kill() + if err != nil { pmLogger.WithFields(log.Fields{ - "_block": "load-plugin", - "plugin-name": resp.Meta.Name, - "plugin-version": resp.Meta.Version, - "plugin-type": resp.Meta.Type.String(), - "plugin-path": filepath.Base(lPlugin.Details.ExecPath), - "metric-namespace": nmt.Namespace(), - "metric-version": nmt.Version(), - "error": err.Error(), - }).Error("error adding loaded metric type") - return nil, serror.New(err) + "_block": "load-plugin", + "error": err.Error(), + }).Error("load plugin error while killing plugin executable plugin") + resultChan <- result{nil, serror.New(err)} + return } } - } - if lPlugin.Details.Uri == nil { - // Added so clients can adequately clean up connections - ap.client.Kill("Retrieved necessary plugin info") - err = ePlugin.Kill() - if err != nil { + if resp.State != plugin.PluginSuccess { + e := fmt.Errorf("plugin loading did not succeed: %s\n", resp.ErrorMessage) pmLogger.WithFields(log.Fields{ - "_block": "load-plugin", - "error": err.Error(), - }).Error("load plugin error while killing plugin executable plugin") - return nil, serror.New(err) + "_block": "load-plugin", + "error": e, + "plugin response": resp.ErrorMessage, + }).Error("load plugin error") + resultChan <- result{nil, serror.New(e)} + return } - } - if resp.State != plugin.PluginSuccess { - e := fmt.Errorf("plugin loading did not succeed: %s\n", resp.ErrorMessage) - pmLogger.WithFields(log.Fields{ - "_block": "load-plugin", - "error": e, - "plugin response": resp.ErrorMessage, - }).Error("load plugin error") - return nil, serror.New(e) - } + aErr := p.loadedPlugins.add(lPlugin) + if aErr != nil { + pmLogger.WithFields(log.Fields{ + "_block": "load-plugin", + "error": aErr, + }).Error("load plugin error while adding loaded plugin to load plugins collection") + resultChan <- result{nil, aErr} + } + resultChan <- result{lPlugin, nil} + return + }() - aErr := p.loadedPlugins.add(lPlugin) - if aErr != nil { - pmLogger.WithFields(log.Fields{ - "_block": "load-plugin", - "error": aErr, - }).Error("load plugin error while adding loaded plugin to load plugins collection") - return nil, aErr + select { + case results := <-resultChan: + return results.lp, results.err + case <-time.After(time.Second * time.Duration(p.pluginLoadTimeout)): + e := serror.New(errors.New("timed out waiting for plugin to load")) + return nil, e } - - return lPlugin, nil } // UnloadPlugin unloads a plugin from the LoadedPlugins table diff --git a/control/plugin_manager_test.go b/control/plugin_manager_test.go index 8806f1495..c77997605 100644 --- a/control/plugin_manager_test.go +++ b/control/plugin_manager_test.go @@ -67,10 +67,16 @@ func TestLoadedPlugins(t *testing.T) { }) } -func loadPlugin(p *pluginManager, fileName string) (*loadedPlugin, serror.SnapError) { +func loadPlugin(p *pluginManager, fileName string, retries ...int) (*loadedPlugin, serror.SnapError) { // This is a Travis optimized loading of plugins. From time to time, tests will error in Travis // due to a timeout when waiting for a response from a plugin. We are going to attempt loading a plugin // 3 times before letting the error through. Hopefully this cuts down on the number of Travis failures + var retryCount int + if len(retries) == 0 { + retryCount = 3 + } else { + retryCount = retries[0] + } var e serror.SnapError var lp *loadedPlugin file, err := os.Open(fileName) @@ -102,17 +108,13 @@ func loadPlugin(p *pluginManager, fileName string) (*loadedPlugin, serror.SnapEr Exec: []string{filepath.Base(path)}, } - for i := 0; i < 3; i++ { + for i := 0; i < retryCount; i++ { lp, e = p.LoadPlugin(details, nil) if e == nil { - break - } - if e != nil && i == 2 { - return nil, e - + break //success } } - return lp, nil + return lp, e } // Uses the mock collector plugin to simulate loading @@ -120,10 +122,8 @@ func TestLoadPlugin(t *testing.T) { // These tests only work if SNAP_PATH is known // It is the responsibility of the testing framework to // build the plugins first into the build dir - if fixtures.SnapPath != "" { Convey("PluginManager.LoadPlugin", t, func() { - Convey("loads plugin successfully", func() { p := newPluginManager() p.SetMetricCatalog(newMetricCatalog()) @@ -135,6 +135,18 @@ func TestLoadPlugin(t *testing.T) { So(len(p.all()), ShouldBeGreaterThan, 0) }) + Convey("load plugin unsuccessful due to timeout", func() { + cfg := GetDefaultConfig() + cfg.Plugins.Collector.Plugins["mock"] = newPluginConfigItem(optAddPluginConfigItem("test-sleep-duration", ctypes.ConfigValueStr{Value: "2s"})) + pm := newPluginManager(OptSetPluginConfig(cfg.Plugins)) + pm.SetPluginLoadTimeout(1) + pm.SetMetricCatalog(newMetricCatalog()) + lplugin, lerr := loadPlugin(pm, fixtures.PluginPathMock2, 1) + So(lplugin, ShouldBeNil) + So(lerr, ShouldNotBeNil) + So(lerr.Error(), ShouldContainSubstring, "timed out") + }) + Convey("with a plugin config a plugin loads successfully", func() { cfg := GetDefaultConfig() cfg.Plugins.Collector.Plugins["mock"] = newPluginConfigItem(optAddPluginConfigItem("test", ctypes.ConfigValueBool{Value: true})) @@ -166,7 +178,6 @@ func TestLoadPlugin(t *testing.T) { p := newPluginManager(OptSetPluginConfig(cfg.Plugins)) p.SetMetricCatalog(newMetricCatalog()) lp, err := loadPlugin(p, fixtures.PluginPathMock2) - So(lp, ShouldBeNil) So(p.all(), ShouldBeEmpty) So(err, ShouldNotBeNil) diff --git a/control/runner.go b/control/runner.go index 26a7f2152..c7c08e77c 100644 --- a/control/runner.go +++ b/control/runner.go @@ -69,19 +69,21 @@ type executablePlugin interface { // Handles events pertaining to plugins and control the runnning state accordingly. type runner struct { - delegates []gomit.Delegator - emitter gomit.Emitter - monitor *monitor - availablePlugins *availablePlugins - metricCatalog catalogsMetrics - pluginManager managesPlugins - grpcSecurity client.GRPCSecurity + delegates []gomit.Delegator + emitter gomit.Emitter + monitor *monitor + availablePlugins *availablePlugins + metricCatalog catalogsMetrics + pluginManager managesPlugins + grpcSecurity client.GRPCSecurity + pluginLoadTimeout int } func newRunner(opts ...pluginRunnerOpt) *runner { r := &runner{ - monitor: newMonitor(), - availablePlugins: newAvailablePlugins(), + pluginLoadTimeout: defaultPluginLoadTimeout, + monitor: newMonitor(), + availablePlugins: newAvailablePlugins(), } mergedOpts := append([]pluginRunnerOpt{}, defaultRunnerOpts...) mergedOpts = append(mergedOpts, opts...) @@ -126,6 +128,11 @@ func (r *runner) Monitor() *monitor { return r.monitor } +// SetPluginLoadTimeout sets plugin load timeout +func (r *runner) SetPluginLoadTimeout(timeout int) { + r.pluginLoadTimeout = timeout +} + // Adds Delegates (gomit.Delegator) for adding Runner handlers to on Start and // unregistration on Stop. func (r *runner) AddDelegates(delegates ...gomit.Delegator) { @@ -180,55 +187,75 @@ func (r *runner) Stop() []error { } func (r *runner) startPlugin(p executablePlugin) (*availablePlugin, error) { - resp, err := p.Run(time.Second * 5) - if err != nil { - e := errors.New("error starting plugin: " + err.Error()) - runnerLog.WithFields(log.Fields{ - "_block": "start-plugin", - "error": e.Error(), - }).Error("error starting a plugin") - return nil, e + type result struct { + ap *availablePlugin + err error } + resultChan := make(chan result) + go func() { + resp, err := p.Run(time.Second * time.Duration(r.pluginLoadTimeout)) + if err != nil { + e := errors.New("error starting plugin: " + err.Error()) + runnerLog.WithFields(log.Fields{ + "_block": "start-plugin", + "error": e.Error(), + }).Error("error starting a plugin") + resultChan <- result{nil, e} + return - if resp.State != plugin.PluginSuccess { - e := errors.New("plugin could not start error: " + resp.ErrorMessage) - runnerLog.WithFields(log.Fields{ - "_block": "start-plugin", - "error": e.Error(), - }).Error("error starting a plugin") - return nil, e - } + } - ap, err := newAvailablePlugin(resp, r.emitter, p, r.grpcSecurity) - if err != nil { - return nil, err - } + if resp.State != plugin.PluginSuccess { + e := errors.New("plugin could not start error: " + resp.ErrorMessage) + runnerLog.WithFields(log.Fields{ + "_block": "start-plugin", + "error": e.Error(), + }).Error("error starting a plugin") + resultChan <- result{nil, e} + return + } - if resp.Meta.Unsecure { - err = ap.client.Ping() - } else { - err = ap.client.SetKey() - } - if err != nil { - return nil, err - } - r.availablePlugins.insert(ap) + // build availablePlugin + ap, err := newAvailablePlugin(resp, r.emitter, p, r.grpcSecurity) + if err != nil { + resultChan <- result{nil, err} + return + } - runnerLog.WithFields(log.Fields{ - "_block": "start-plugin", - "available-plugin": ap.String(), - "available-plugin-type": ap.TypeName(), - }).Info("available plugin started") - - defer r.emitter.Emit(&control_event.StartPluginEvent{ - Name: ap.Name(), - Version: ap.Version(), - Type: int(ap.Type()), - Key: ap.key, - Id: ap.ID(), - }) - - return ap, nil + if resp.Meta.Unsecure { + err = ap.client.Ping() + } else { + err = ap.client.SetKey() + } + if err != nil { + resultChan <- result{nil, err} + return + } + r.availablePlugins.insert(ap) + + runnerLog.WithFields(log.Fields{ + "_block": "start-plugin", + "available-plugin": ap.String(), + "available-plugin-type": ap.TypeName(), + }).Info("available plugin started") + resultChan <- result{ap, nil} + + defer r.emitter.Emit(&control_event.StartPluginEvent{ + Name: ap.Name(), + Version: ap.Version(), + Type: int(ap.Type()), + Key: ap.key, + Id: ap.ID(), + }) + }() + + select { + case results := <-resultChan: + return results.ap, results.err + case <-time.After(time.Second * time.Duration(r.pluginLoadTimeout)): + e := errors.New("error starting plugin due to timeout") + return nil, e + } } func (r *runner) stopPlugin(reason string, ap *availablePlugin) error { diff --git a/control/runner_test.go b/control/runner_test.go index 8458b98e2..d2634c718 100644 --- a/control/runner_test.go +++ b/control/runner_test.go @@ -333,7 +333,6 @@ func TestRunnerState(t *testing.T) { } func TestRunnerPluginRunning(t *testing.T) { - // log.SetLevel(log.DebugLevel) Convey("snap/control", t, func() { Convey("Runner", func() { Convey("startPlugin", func() { @@ -350,8 +349,6 @@ func TestRunnerPluginRunning(t *testing.T) { } So(err, ShouldBeNil) - - // exPlugin := new(MockExecutablePlugin) ap, e := r.startPlugin(exPlugin) So(e, ShouldBeNil) diff --git a/plugin/collector/snap-plugin-collector-mock2/mock/mock.go b/plugin/collector/snap-plugin-collector-mock2/mock/mock.go index 7dc551f1a..93d6c0e0a 100644 --- a/plugin/collector/snap-plugin-collector-mock2/mock/mock.go +++ b/plugin/collector/snap-plugin-collector-mock2/mock/mock.go @@ -141,6 +141,13 @@ func (f *Mock) GetMetricTypes(cfg plugin.ConfigType) ([]plugin.MetricType, error if _, ok := cfg.Table()["test-fail"]; ok { return mts, fmt.Errorf("testing") } + if d, ok := cfg.Table()["test-sleep-duration"]; ok { + duration, err := time.ParseDuration(d.(ctypes.ConfigValueStr).Value) + if err != nil { + return nil, err + } + time.Sleep(duration) + } if _, ok := cfg.Table()["test"]; ok { mts = append(mts, plugin.MetricType{ Namespace_: core.NewNamespace("intel", "mock", "test"),