From 6d82687dc1cce6731e407727700fbcab58c46f40 Mon Sep 17 00:00:00 2001 From: Joel Cooklin Date: Thu, 15 Oct 2015 12:43:22 -0700 Subject: [PATCH] Exposes global plugin config through REST --- control/config.go | 231 ++++- control/config_test.go | 68 +- control/control.go | 18 +- control/control_test.go | 4 +- control/plugin_manager.go | 2 +- control/plugin_manager_test.go | 4 +- core/cdata/node.go | 9 +- examples/configs/pulse-config-sample.json | 14 +- mgmt/rest/config.go | 141 +++ mgmt/rest/rbody/body.go | 8 + mgmt/rest/rbody/config.go | 60 ++ mgmt/rest/rest_func_test.go | 876 ++++-------------- mgmt/rest/server.go | 18 + .../pulse-collector-facter/facter/facter.go | 219 ----- .../facter/facter_test.go | 193 ---- .../collector/pulse-collector-pcm/pcm/pcm.go | 186 ---- .../perfevents/perfevents.go | 282 ------ .../perfevents/perfevents_test.go | 150 --- .../pulse-collector-psutil/psutil/psutil.go | 129 --- .../psutil/psutil_integration_test.go | 67 -- scripts/Dockerfile | 3 +- 21 files changed, 683 insertions(+), 1999 deletions(-) create mode 100644 mgmt/rest/config.go create mode 100644 mgmt/rest/rbody/config.go delete mode 100644 plugin/collector/pulse-collector-facter/facter/facter.go delete mode 100644 plugin/collector/pulse-collector-facter/facter/facter_test.go delete mode 100644 plugin/collector/pulse-collector-pcm/pcm/pcm.go delete mode 100644 plugin/collector/pulse-collector-perfevents/perfevents/perfevents.go delete mode 100644 plugin/collector/pulse-collector-perfevents/perfevents/perfevents_test.go delete mode 100644 plugin/collector/pulse-collector-psutil/psutil/psutil.go delete mode 100644 plugin/collector/pulse-collector-psutil/psutil/psutil_integration_test.go diff --git a/control/config.go b/control/config.go index 4f5258ca7..137cc605e 100644 --- a/control/config.go +++ b/control/config.go @@ -7,50 +7,90 @@ import ( "reflect" "strconv" - "github.com/intelsdi-x/pulse/control/plugin" + log "github.com/Sirupsen/logrus" + + "github.com/intelsdi-x/pulse/core" "github.com/intelsdi-x/pulse/core/cdata" "github.com/intelsdi-x/pulse/core/ctypes" ) -// type configData map[string]*cdata.ConfigDataNode - type pluginConfig struct { - All *cdata.ConfigDataNode `json:"all"` - Collector map[string]*pluginConfigItem `json:"collector"` - Publisher map[string]*pluginConfigItem `json:"publisher"` - Processor map[string]*pluginConfigItem `json:"processor"` + All *cdata.ConfigDataNode `json:"all"` + Collector *pluginTypeConfigItem `json:"collector"` + Publisher *pluginTypeConfigItem `json:"publisher"` + Processor *pluginTypeConfigItem `json:"processor"` pluginCache map[string]*cdata.ConfigDataNode } +type pluginTypeConfigItem struct { + Plugins map[string]*pluginConfigItem + All *cdata.ConfigDataNode `json:"all"` +} + type pluginConfigItem struct { *cdata.ConfigDataNode Versions map[int]*cdata.ConfigDataNode `json:"versions"` } type config struct { - Control *cdata.ConfigDataNode `json:"control"` - Scheduler *cdata.ConfigDataNode `json:"scheduler"` - Plugins *pluginConfig `json:"plugins"` + Plugins *pluginConfig `json:"plugins"` } func NewConfig() *config { return &config{ - Control: cdata.NewNode(), - Scheduler: cdata.NewNode(), - Plugins: newPluginConfig(), + Plugins: newPluginConfig(), + } +} + +func newPluginTypeConfigItem() *pluginTypeConfigItem { + return &pluginTypeConfigItem{ + make(map[string]*pluginConfigItem), + cdata.NewNode(), } } func newPluginConfig() *pluginConfig { return &pluginConfig{ All: cdata.NewNode(), - Collector: make(map[string]*pluginConfigItem), - Processor: make(map[string]*pluginConfigItem), - Publisher: make(map[string]*pluginConfigItem), + Collector: newPluginTypeConfigItem(), + Processor: newPluginTypeConfigItem(), + Publisher: newPluginTypeConfigItem(), pluginCache: make(map[string]*cdata.ConfigDataNode), } } +func (p *config) GetPluginConfigDataNode(pluginType core.PluginType, name string, ver int) cdata.ConfigDataNode { + return *p.Plugins.getPluginConfigDataNode(pluginType, name, ver) +} + +func (p *config) MergePluginConfigDataNode(pluginType core.PluginType, name string, ver int, cdn *cdata.ConfigDataNode) cdata.ConfigDataNode { + p.Plugins.mergePluginConfigDataNode(pluginType, name, ver, cdn) + return *p.Plugins.getPluginConfigDataNode(pluginType, name, ver) +} + +func (p *config) MergePluginConfigDataNodeAll(cdn *cdata.ConfigDataNode) cdata.ConfigDataNode { + p.Plugins.mergePluginConfigDataNodeAll(cdn) + return *p.Plugins.All +} + +func (p *config) DeletePluginConfigDataNodeField(pluginType core.PluginType, name string, ver int, fields ...string) cdata.ConfigDataNode { + for _, field := range fields { + p.Plugins.deletePluginConfigDataNodeField(pluginType, name, ver, field) + } + return *p.Plugins.getPluginConfigDataNode(pluginType, name, ver) +} + +func (p *config) DeletePluginConfigDataNodeFieldAll(fields ...string) cdata.ConfigDataNode { + for _, field := range fields { + p.Plugins.deletePluginConfigDataNodeFieldAll(field) + } + return *p.Plugins.All +} + +func (p *config) GetPluginConfigDataNodeAll() cdata.ConfigDataNode { + return *p.Plugins.All +} + // UnmarshalJSON unmarshals valid json into pluginConfig. An example Config // github.com/intelsdi-x/pulse/examples/configs/pulse-config-sample. func (p *pluginConfig) UnmarshalJSON(data []byte) error { @@ -61,6 +101,7 @@ func (p *pluginConfig) UnmarshalJSON(data []byte) error { return err } + //process the key value pairs for ALL plugins if v, ok := t["all"]; ok { jv, err := json.Marshal(v) if err != nil { @@ -75,6 +116,7 @@ func (p *pluginConfig) UnmarshalJSON(data []byte) error { p.All = cdn } + //process the hierarchy of plugins for _, typ := range []string{"collector", "processor", "publisher"} { if err := unmarshalPluginConfig(typ, p, t); err != nil { return err @@ -105,7 +147,100 @@ func optAddPluginConfigItem(key string, value ctypes.ConfigValue) pluginConfigOp } } -func (p *pluginConfig) get(pluginType plugin.PluginType, name string, ver int) *cdata.ConfigDataNode { +func (p *pluginConfig) mergePluginConfigDataNodeAll(cdn *cdata.ConfigDataNode) { + // clear cache + p.pluginCache = make(map[string]*cdata.ConfigDataNode) + + p.All.Merge(cdn) + return +} + +func (p *pluginConfig) deletePluginConfigDataNodeFieldAll(key string) { + // clear cache + p.pluginCache = make(map[string]*cdata.ConfigDataNode) + + p.All.DeleteItem(key) + return +} + +func (p *pluginConfig) mergePluginConfigDataNode(pluginType core.PluginType, name string, ver int, cdn *cdata.ConfigDataNode) { + // clear cache + p.pluginCache = make(map[string]*cdata.ConfigDataNode) + + // merge new config into existing + switch pluginType { + case core.CollectorPluginType: + if res, ok := p.Collector.Plugins[name]; ok { + if res2, ok2 := res.Versions[ver]; ok2 { + res2.Merge(cdn) + return + } + res.Merge(cdn) + return + } + p.Collector.All.Merge(cdn) + case core.ProcessorPluginType: + if res, ok := p.Processor.Plugins[name]; ok { + if res2, ok2 := res.Versions[ver]; ok2 { + res2.Merge(cdn) + return + } + res.Merge(cdn) + return + } + p.Processor.All.Merge(cdn) + case core.PublisherPluginType: + if res, ok := p.Publisher.Plugins[name]; ok { + if res2, ok2 := res.Versions[ver]; ok2 { + res2.Merge(cdn) + return + } + res.Merge(cdn) + return + } + p.Publisher.All.Merge(cdn) + } +} + +func (p *pluginConfig) deletePluginConfigDataNodeField(pluginType core.PluginType, name string, ver int, key string) { + // clear cache + p.pluginCache = make(map[string]*cdata.ConfigDataNode) + + switch pluginType { + case core.CollectorPluginType: + if res, ok := p.Collector.Plugins[name]; ok { + if res2, ok2 := res.Versions[ver]; ok2 { + res2.DeleteItem(key) + return + } + res.DeleteItem(key) + return + } + p.Collector.All.DeleteItem(key) + case core.ProcessorPluginType: + if res, ok := p.Processor.Plugins[name]; ok { + if res2, ok2 := res.Versions[ver]; ok2 { + res2.DeleteItem(key) + return + } + res.DeleteItem(key) + return + } + p.Processor.All.DeleteItem(key) + case core.PublisherPluginType: + if res, ok := p.Publisher.Plugins[name]; ok { + if res2, ok2 := res.Versions[ver]; ok2 { + res2.DeleteItem(key) + return + } + res.DeleteItem(key) + return + } + p.Publisher.All.DeleteItem(key) + } +} + +func (p *pluginConfig) getPluginConfigDataNode(pluginType core.PluginType, name string, ver int) *cdata.ConfigDataNode { // check cache key := fmt.Sprintf("%d:%s:%d", pluginType, name, ver) if res, ok := p.pluginCache[key]; ok { @@ -119,22 +254,25 @@ func (p *pluginConfig) get(pluginType plugin.PluginType, name string, ver int) * // check for plugin config switch pluginType { - case plugin.CollectorPluginType: - if res, ok := p.Collector[name]; ok { + case core.CollectorPluginType: + p.pluginCache[key].Merge(p.Collector.All) + if res, ok := p.Collector.Plugins[name]; ok { p.pluginCache[key].Merge(res.ConfigDataNode) if res2, ok2 := res.Versions[ver]; ok2 { p.pluginCache[key].Merge(res2) } } - case plugin.ProcessorPluginType: - if res, ok := p.Processor[name]; ok { + case core.ProcessorPluginType: + p.pluginCache[key].Merge(p.Processor.All) + if res, ok := p.Processor.Plugins[name]; ok { p.pluginCache[key].Merge(res.ConfigDataNode) if res2, ok2 := res.Versions[ver]; ok2 { p.pluginCache[key].Merge(res2) } } - case plugin.PublisherPluginType: - if res, ok := p.Publisher[name]; ok { + case core.PublisherPluginType: + p.pluginCache[key].Merge(p.Publisher.All) + if res, ok := p.Publisher.Plugins[name]; ok { p.pluginCache[key].Merge(res.ConfigDataNode) if res2, ok2 := res.Versions[ver]; ok2 { p.pluginCache[key].Merge(res2) @@ -142,6 +280,14 @@ func (p *pluginConfig) get(pluginType plugin.PluginType, name string, ver int) * } } + //todo change to debug + log.WithFields(log.Fields{ + "_block_": "getPluginConfigDataNode", + "_module": "config", + "config-cache-key": key, + "config-cache-value": p.pluginCache[key], + }).Debug("Getting plugin config") + return p.pluginCache[key] } @@ -150,13 +296,34 @@ func unmarshalPluginConfig(typ string, p *pluginConfig, t map[string]interface{} switch plugins := v.(type) { case map[string]interface{}: for name, c := range plugins { + if name == "all" { + jv, err := json.Marshal(c) + if err != nil { + return err + } + cdn := cdata.NewNode() + dec := json.NewDecoder(bytes.NewReader(jv)) + dec.UseNumber() + if err := dec.Decode(&cdn); err != nil { + return err + } + switch typ { + case "collector": + p.Collector.All = cdn + case "processor": + p.Processor.All = cdn + case "publisher": + p.Publisher.All = cdn + } + continue + } switch typ { case "collector": - p.Collector[name] = newPluginConfigItem() + p.Collector.Plugins[name] = newPluginConfigItem() case "processor": - p.Processor[name] = newPluginConfigItem() + p.Processor.Plugins[name] = newPluginConfigItem() case "publisher": - p.Publisher[name] = newPluginConfigItem() + p.Publisher.Plugins[name] = newPluginConfigItem() } switch col := c.(type) { case map[string]interface{}: @@ -173,11 +340,11 @@ func unmarshalPluginConfig(typ string, p *pluginConfig, t map[string]interface{} } switch typ { case "collector": - p.Collector[name].ConfigDataNode = cdn + p.Collector.Plugins[name].ConfigDataNode = cdn case "processor": - p.Processor[name].ConfigDataNode = cdn + p.Processor.Plugins[name].ConfigDataNode = cdn case "publisher": - p.Publisher[name].ConfigDataNode = cdn + p.Publisher.Plugins[name].ConfigDataNode = cdn } } if vs, ok := col["versions"]; ok { @@ -202,11 +369,11 @@ func unmarshalPluginConfig(typ string, p *pluginConfig, t map[string]interface{} } switch typ { case "collector": - p.Collector[name].Versions[ver] = cdn + p.Collector.Plugins[name].Versions[ver] = cdn case "processor": - p.Processor[name].Versions[ver] = cdn + p.Processor.Plugins[name].Versions[ver] = cdn case "publisher": - p.Publisher[name].Versions[ver] = cdn + p.Publisher.Plugins[name].Versions[ver] = cdn } default: return fmt.Errorf("Error unmarshalling %v'%v' expected '%v' got '%v'", typ, name, map[string]interface{}{}, reflect.TypeOf(v)) diff --git a/control/config_test.go b/control/config_test.go index d81fcd585..48cc15d33 100644 --- a/control/config_test.go +++ b/control/config_test.go @@ -5,7 +5,7 @@ import ( "io/ioutil" "testing" - "github.com/intelsdi-x/pulse/control/plugin" + "github.com/intelsdi-x/pulse/core" "github.com/intelsdi-x/pulse/core/cdata" "github.com/intelsdi-x/pulse/core/ctypes" . "github.com/smartystreets/goconvey/convey" @@ -18,16 +18,22 @@ func TestPluginConfig(t *testing.T) { Convey("with an entry for ALL plugins", func() { cfg.Plugins.All.AddItem("gvar", ctypes.ConfigValueBool{Value: true}) So(len(cfg.Plugins.All.Table()), ShouldEqual, 1) - Convey("an entry for a specific plugin of any version", func() { - cfg.Plugins.Collector["test"] = newPluginConfigItem(optAddPluginConfigItem("pvar", ctypes.ConfigValueBool{Value: true})) - So(len(cfg.Plugins.Collector["test"].Table()), ShouldEqual, 1) - Convey("and an entry for a specific plugin of a specific version", func() { - cfg.Plugins.Collector["test"].Versions[1] = cdata.NewNode() - cfg.Plugins.Collector["test"].Versions[1].AddItem("vvar", ctypes.ConfigValueBool{Value: true}) - So(len(cfg.Plugins.Collector["test"].Versions[1].Table()), ShouldEqual, 1) - Convey("we can get the merged conf for the given plugin", func() { - cd := cfg.Plugins.get(plugin.CollectorPluginType, "test", 1) - So(len(cd.Table()), ShouldEqual, 3) + Convey("with an entry for ALL collector plugins", func() { + cfg.Plugins.Collector.All.AddItem("user", ctypes.ConfigValueStr{"jane"}) + cfg.Plugins.Collector.All.AddItem("password", ctypes.ConfigValueStr{"P@ssw0rd"}) + So(len(cfg.Plugins.Collector.All.Table()), ShouldEqual, 2) + Convey("an entry for a specific plugin of any version", func() { + cfg.Plugins.Collector.Plugins["test"] = newPluginConfigItem(optAddPluginConfigItem("user", ctypes.ConfigValueStr{"john"})) + So(len(cfg.Plugins.Collector.Plugins["test"].Table()), ShouldEqual, 1) + Convey("and an entry for a specific plugin of a specific version", func() { + cfg.Plugins.Collector.Plugins["test"].Versions[1] = cdata.NewNode() + cfg.Plugins.Collector.Plugins["test"].Versions[1].AddItem("vvar", ctypes.ConfigValueBool{Value: true}) + So(len(cfg.Plugins.Collector.Plugins["test"].Versions[1].Table()), ShouldEqual, 1) + Convey("we can get the merged conf for the given plugin", func() { + cd := cfg.Plugins.getPluginConfigDataNode(core.CollectorPluginType, "test", 1) + So(len(cd.Table()), ShouldEqual, 4) + So(cd.Table()["user"], ShouldResemble, ctypes.ConfigValueStr{"john"}) + }) }) }) }) @@ -43,35 +49,47 @@ func TestPluginConfig(t *testing.T) { Convey("We are able to unmarshal it into a valid config", func() { err = json.Unmarshal(b, &cfg) So(err, ShouldBeNil) - So(cfg.Control, ShouldNotBeNil) - So(cfg.Control.Table()["cache_ttl"], ShouldResemble, ctypes.ConfigValueStr{Value: "5s"}) So(cfg.Plugins.All.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "p@ssw0rd"}) - So(cfg.Plugins.Collector["pcm"], ShouldNotBeNil) - So(cfg.Plugins.Collector["pcm"].Table()["path"], ShouldResemble, ctypes.ConfigValueStr{Value: "/usr/local/pcm/bin"}) + So(cfg.Plugins.Collector.Plugins["pcm"], ShouldNotBeNil) + So(cfg.Plugins.Collector.Plugins["pcm"].Table()["path"], ShouldResemble, ctypes.ConfigValueStr{Value: "/usr/local/pcm/bin"}) So(cfg.Plugins, ShouldNotBeNil) So(cfg.Plugins.All, ShouldNotBeNil) - So(cfg.Plugins.Collector["pcm"].Versions[1].Table()["user"], ShouldResemble, ctypes.ConfigValueStr{Value: "john"}) - So(cfg.Plugins.Processor["movingaverage"].Table()["user"], ShouldResemble, ctypes.ConfigValueStr{Value: "jane"}) + So(cfg.Plugins.Collector.Plugins["pcm"].Versions[1].Table()["user"], ShouldResemble, ctypes.ConfigValueStr{Value: "john"}) + So(cfg.Plugins.Processor.Plugins["movingaverage"].Table()["user"], ShouldResemble, ctypes.ConfigValueStr{Value: "jane"}) - Convey("Through the config object we can access the stored configurations for plugins", func() { - c := cfg.Plugins.get(plugin.ProcessorPluginType, "movingaverage", 0) - So(c, ShouldNotBeNil) - So(c.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "p@ssw0rd"}) + Convey("We can access the config for plugins", func() { + Convey("Getting the values of a specific version of a plugin", func() { + c := cfg.Plugins.getPluginConfigDataNode(core.CollectorPluginType, "pcm", 1) + So(c, ShouldNotBeNil) + So(c.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "p@ssw0rd"}) + So(c.Table()["user"], ShouldResemble, ctypes.ConfigValueStr{Value: "john"}) + So(c.Table()["float"], ShouldResemble, ctypes.ConfigValueFloat{Value: 3.14}) + So(c.Table()["int"], ShouldResemble, ctypes.ConfigValueInt{Value: 1234}) + So(c.Table()["flag"], ShouldResemble, ctypes.ConfigValueBool{Value: true}) + }) + Convey("Getting the common config for collectors", func() { + c := cfg.Plugins.getPluginConfigDataNode(core.CollectorPluginType, "", -2) + So(c, ShouldNotBeNil) + So(len(c.Table()), ShouldEqual, 2) + So(c.Table()["user"], ShouldResemble, ctypes.ConfigValueStr{Value: "jane"}) + So(c.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "p@ssw0rd"}) + }) Convey("Overwritting the value of a variable defined for all plugins", func() { - c := cfg.Plugins.get(plugin.ProcessorPluginType, "movingaverage", 1) + c := cfg.Plugins.getPluginConfigDataNode(core.ProcessorPluginType, "movingaverage", 1) So(c, ShouldNotBeNil) So(c.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "new password"}) }) - Convey("Retrieving the value of a variable defined for all plugins", func() { - c := cfg.Plugins.get(plugin.CollectorPluginType, "pcm", 0) + Convey("Retrieving the value of a variable defined for all versions of a plugin", func() { + c := cfg.Plugins.getPluginConfigDataNode(core.CollectorPluginType, "pcm", 0) So(c, ShouldNotBeNil) So(c.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "p@ssw0rd"}) }) Convey("Overwritting the value of a variable defined for all versions of the plugin", func() { - c := cfg.Plugins.get(plugin.ProcessorPluginType, "movingaverage", 1) + c := cfg.Plugins.getPluginConfigDataNode(core.ProcessorPluginType, "movingaverage", 1) So(c, ShouldNotBeNil) So(c.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "new password"}) }) + }) }) }) diff --git a/control/control.go b/control/control.go index cc6c45e2c..1140b2566 100644 --- a/control/control.go +++ b/control/control.go @@ -58,7 +58,7 @@ type pluginControl struct { // TODO, going to need coordination on changing of these RunningPlugins executablePlugins Started bool - config *config + Config *config autodiscoverPaths []string eventManager *gomit.EventController @@ -126,9 +126,9 @@ func CacheExpiration(t time.Duration) ControlOpt { } } -func OptSetConfig(c *config) ControlOpt { - return func(p *pluginControl) { - p.config = c +func OptSetConfig(cfg *config) ControlOpt { + return func(c *pluginControl) { + c.Config = cfg } } @@ -136,7 +136,7 @@ func OptSetConfig(c *config) ControlOpt { func New(opts ...ControlOpt) *pluginControl { c := &pluginControl{} - c.config = NewConfig() + c.Config = NewConfig() // Initialize components // // Event Manager @@ -153,7 +153,7 @@ func New(opts ...ControlOpt) *pluginControl { }).Debug("metric catalog created") // Plugin Manager - c.pluginManager = newPluginManager(OptSetPluginConfig(c.config.Plugins)) + c.pluginManager = newPluginManager(OptSetPluginConfig(c.Config.Plugins)) controlLogger.WithFields(log.Fields{ "_block": "new", }).Debug("plugin manager created") @@ -690,7 +690,7 @@ func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time. // merge global plugin config into the config for the metric for _, mt := range pmt.metricTypes { - mt.Config().Merge(p.config.Plugins.get(plugin.CollectorPluginType, ap.Name(), ap.Version())) + mt.Config().Merge(p.Config.Plugins.getPluginConfigDataNode(core.CollectorPluginType, ap.Name(), ap.Version())) } // get a metrics @@ -761,7 +761,7 @@ func (p *pluginControl) PublishMetrics(contentType string, content []byte, plugi } // merge global plugin config into the config for this request - cfg := p.config.Plugins.get(plugin.PublisherPluginType, ap.Name(), ap.Version()).Table() + cfg := p.Config.Plugins.getPluginConfigDataNode(core.PublisherPluginType, ap.Name(), ap.Version()).Table() for k, v := range config { cfg[k] = v } @@ -803,7 +803,7 @@ func (p *pluginControl) ProcessMetrics(contentType string, content []byte, plugi } // merge global plugin config into the config for this request - cfg := p.config.Plugins.get(plugin.ProcessorPluginType, ap.Name(), ap.Version()).Table() + cfg := p.Config.Plugins.getPluginConfigDataNode(core.ProcessorPluginType, ap.Name(), ap.Version()).Table() for k, v := range config { cfg[k] = v diff --git a/control/control_test.go b/control/control_test.go index 6cd7cd265..4168b79dc 100644 --- a/control/control_test.go +++ b/control/control_test.go @@ -697,7 +697,7 @@ func TestCollectMetrics(t *testing.T) { time.Sleep(100 * time.Millisecond) // Add a global plugin config - c.config.Plugins.Collector["dummy1"] = newPluginConfigItem(optAddPluginConfigItem("test", ctypes.ConfigValueBool{Value: true})) + c.Config.Plugins.Collector.Plugins["dummy1"] = newPluginConfigItem(optAddPluginConfigItem("test", ctypes.ConfigValueBool{Value: true})) // Load plugin c.Load(JSONRPC_PluginPath) @@ -857,7 +857,7 @@ func TestProcessMetrics(t *testing.T) { c.pluginRunner.(*runner).monitor.duration = time.Millisecond * 100 c.Start() time.Sleep(1 * time.Second) - c.config.Plugins.Processor["passthru"] = newPluginConfigItem(optAddPluginConfigItem("test", ctypes.ConfigValueBool{Value: true})) + c.Config.Plugins.Processor.Plugins["passthru"] = newPluginConfigItem(optAddPluginConfigItem("test", ctypes.ConfigValueBool{Value: true})) // Load plugin _, err := c.Load(path.Join(PulsePath, "plugin", "pulse-processor-passthru")) diff --git a/control/plugin_manager.go b/control/plugin_manager.go index 299b4c913..888a5c34d 100644 --- a/control/plugin_manager.go +++ b/control/plugin_manager.go @@ -321,7 +321,7 @@ func (p *pluginManager) LoadPlugin(path string, emitter gomit.Emitter) (*loadedP colClient := ap.client.(client.PluginCollectorClient) cfg := plugin.PluginConfigType{ - ConfigDataNode: p.pluginConfig.get(resp.Type, resp.Meta.Name, resp.Meta.Version), + ConfigDataNode: p.pluginConfig.getPluginConfigDataNode(core.PluginType(resp.Type), resp.Meta.Name, resp.Meta.Version), } metricTypes, err := colClient.GetMetricTypes(cfg) diff --git a/control/plugin_manager_test.go b/control/plugin_manager_test.go index 62f7613a9..c3daf8c6c 100644 --- a/control/plugin_manager_test.go +++ b/control/plugin_manager_test.go @@ -92,7 +92,7 @@ func TestLoadPlugin(t *testing.T) { Convey("with a plugin config a plugin loads successfully", func() { cfg := NewConfig() - cfg.Plugins.Collector["dummy2"] = newPluginConfigItem(optAddPluginConfigItem("test", ctypes.ConfigValueBool{Value: true})) + cfg.Plugins.Collector.Plugins["dummy2"] = newPluginConfigItem(optAddPluginConfigItem("test", ctypes.ConfigValueBool{Value: true})) p := newPluginManager(OptSetPluginConfig(cfg.Plugins)) p.SetMetricCatalog(newMetricCatalog()) lp, err := p.LoadPlugin(PluginPath, nil) @@ -108,7 +108,7 @@ func TestLoadPlugin(t *testing.T) { Convey("for a plugin requiring a config an incomplete config will result in a load failure", func() { cfg := NewConfig() - cfg.Plugins.Collector["dummy2"] = newPluginConfigItem(optAddPluginConfigItem("test-fail", ctypes.ConfigValueBool{Value: true})) + cfg.Plugins.Collector.Plugins["dummy2"] = newPluginConfigItem(optAddPluginConfigItem("test-fail", ctypes.ConfigValueBool{Value: true})) p := newPluginManager(OptSetPluginConfig(cfg.Plugins)) p.SetMetricCatalog(newMetricCatalog()) lp, err := p.LoadPlugin(PluginPath, nil) diff --git a/core/cdata/node.go b/core/cdata/node.go index 7a5a13ec2..a60be78e4 100644 --- a/core/cdata/node.go +++ b/core/cdata/node.go @@ -81,7 +81,6 @@ func (c *ConfigDataNode) UnmarshalJSON(data []byte) error { continue } if v, err := t.Float64(); err == nil { - fmt.Printf("%v is a float64\n", k) c.table[k] = ctypes.ConfigValueFloat{Value: v} continue } @@ -140,3 +139,11 @@ func (c ConfigDataNode) Merge(n ctree.Node) ctree.Node { // Return modified version of ConfigDataNode(as ctree.Node) return c } + +// Deletes a field in ConfigDataNode. If the field does not exist Delete is +// considered a no-op +func (c ConfigDataNode) DeleteItem(k string) { + c.mutex.Lock() + defer c.mutex.Unlock() + delete(c.table, k) +} diff --git a/examples/configs/pulse-config-sample.json b/examples/configs/pulse-config-sample.json index b6598ed9b..9b815280e 100644 --- a/examples/configs/pulse-config-sample.json +++ b/examples/configs/pulse-config-sample.json @@ -11,16 +11,28 @@ "password": "p@ssw0rd" }, "collector": { + "all": { + "user": "jane" + }, "pcm": { "all": { "path": "/usr/local/pcm/bin" }, "versions": { "1": { - "user": "john" + "user": "john", + "int": 1234, + "float": 3.14, + "flag": true } } + }, + "psutil": { + "all": { + "path": "/usr/local/bin/psutil" + } } + }, "publisher": { "influxdb": { diff --git a/mgmt/rest/config.go b/mgmt/rest/config.go new file mode 100644 index 000000000..8fa42c98f --- /dev/null +++ b/mgmt/rest/config.go @@ -0,0 +1,141 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Coporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rest + +import ( + "net/http" + "strconv" + + "github.com/intelsdi-x/pulse/core" + "github.com/intelsdi-x/pulse/core/cdata" + "github.com/intelsdi-x/pulse/mgmt/rest/rbody" + "github.com/julienschmidt/httprouter" +) + +func (s *Server) getPluginConfigItem(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + var err error + styp := p.ByName("type") + if styp == "" { + cdn := s.mc.GetPluginConfigDataNodeAll() + item := &rbody.PluginConfigItem{cdn} + respond(200, item, w) + return + } + var ityp int + if ityp, err = strconv.Atoi(styp); err != nil { + respond(400, rbody.FromError(err), w) + return + } + name := p.ByName("name") + sver := p.ByName("version") + var iver int + if sver != "" { + if iver, err = strconv.Atoi(sver); err != nil { + respond(400, rbody.FromError(err), w) + return + } + } else { + iver = -2 + } + + cdn := s.mc.GetPluginConfigDataNode(core.PluginType(ityp), name, iver) + item := &rbody.PluginConfigItem{cdn} + respond(200, item, w) +} + +func (s *Server) deletePluginConfigItem(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + var err error + styp := p.ByName("type") + var ityp int + if styp != "" { + if ityp, err = strconv.Atoi(styp); err != nil { + respond(400, rbody.FromError(err), w) + return + } + } + name := p.ByName("name") + sver := p.ByName("version") + var iver int + if sver != "" { + if iver, err = strconv.Atoi(sver); err != nil { + respond(400, rbody.FromError(err), w) + return + } + } else { + iver = -2 + } + + src := []string{} + errCode, err := marshalBody(&src, r.Body) + if errCode != 0 && err != nil { + respond(400, rbody.FromError(err), w) + return + } + + var res cdata.ConfigDataNode + if styp == "" { + res = s.mc.DeletePluginConfigDataNodeFieldAll(src...) + } else { + res = s.mc.DeletePluginConfigDataNodeField(core.PluginType(ityp), name, iver, src...) + } + + item := &rbody.DeletePluginConfigItem{res} + respond(200, item, w) +} + +func (s *Server) setPluginConfigItem(w http.ResponseWriter, r *http.Request, p httprouter.Params) { + var err error + styp := p.ByName("type") + var ityp int + if styp != "" { + if ityp, err = strconv.Atoi(styp); err != nil { + respond(400, rbody.FromError(err), w) + return + } + } + name := p.ByName("name") + sver := p.ByName("version") + var iver int + if sver != "" { + if iver, err = strconv.Atoi(sver); err != nil { + respond(400, rbody.FromError(err), w) + return + } + } else { + iver = -2 + } + + src := cdata.NewNode() + errCode, err := marshalBody(src, r.Body) + if errCode != 0 && err != nil { + respond(400, rbody.FromError(err), w) + return + } + + var res cdata.ConfigDataNode + if styp == "" { + res = s.mc.MergePluginConfigDataNodeAll(src) + } else { + res = s.mc.MergePluginConfigDataNode(core.PluginType(ityp), name, iver, src) + } + + item := &rbody.SetPluginConfigItem{res} + respond(200, item, w) +} diff --git a/mgmt/rest/rbody/body.go b/mgmt/rest/rbody/body.go index 76f19cc29..7cda08e4a 100644 --- a/mgmt/rest/rbody/body.go +++ b/mgmt/rest/rbody/body.go @@ -22,6 +22,8 @@ package rbody import ( "encoding/json" "errors" + + "github.com/intelsdi-x/pulse/core/cdata" ) type Body interface { @@ -111,6 +113,12 @@ func UnmarshalBody(t string, b []byte) (Body, error) { return unmarshalAndHandleError(b, &TribeJoinAgreement{}) case TribeGetAgreementType: return unmarshalAndHandleError(b, &TribeGetAgreement{}) + case PluginConfigItemType: + return unmarshalAndHandleError(b, &PluginConfigItem{*cdata.NewNode()}) + case SetPluginConfigItemType: + return unmarshalAndHandleError(b, &SetPluginConfigItem{*cdata.NewNode()}) + case DeletePluginConfigItemType: + return unmarshalAndHandleError(b, &DeletePluginConfigItem{*cdata.NewNode()}) case ErrorType: return unmarshalAndHandleError(b, &Error{}) default: diff --git a/mgmt/rest/rbody/config.go b/mgmt/rest/rbody/config.go new file mode 100644 index 000000000..cf1b77c06 --- /dev/null +++ b/mgmt/rest/rbody/config.go @@ -0,0 +1,60 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Coporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package rbody + +import "github.com/intelsdi-x/pulse/core/cdata" + +const ( + PluginConfigItemType = "config_plugin_item_returned" + SetPluginConfigItemType = "config_plugin_item_created" + DeletePluginConfigItemType = "config_plugin_item_deleted" +) + +type DeletePluginConfigItem PluginConfigItem + +func (t *DeletePluginConfigItem) ResponseBodyMessage() string { + return "Plugin config item field(s) deleted" +} + +func (t *DeletePluginConfigItem) ResponseBodyType() string { + return DeletePluginConfigItemType +} + +type SetPluginConfigItem PluginConfigItem + +func (t *SetPluginConfigItem) ResponseBodyMessage() string { + return "Plugin config item(s) set" +} + +func (t *SetPluginConfigItem) ResponseBodyType() string { + return SetPluginConfigItemType +} + +type PluginConfigItem struct { + cdata.ConfigDataNode +} + +func (t *PluginConfigItem) ResponseBodyMessage() string { + return "Plugin config item retrieved" +} + +func (t *PluginConfigItem) ResponseBodyType() string { + return PluginConfigItemType +} diff --git a/mgmt/rest/rest_func_test.go b/mgmt/rest/rest_func_test.go index c4ce41dc1..aa33be08d 100644 --- a/mgmt/rest/rest_func_test.go +++ b/mgmt/rest/rest_func_test.go @@ -33,14 +33,15 @@ import ( "net/http" "os" "path/filepath" - "sync" "testing" "time" log "github.com/Sirupsen/logrus" - "github.com/pborman/uuid" "github.com/intelsdi-x/pulse/control" + "github.com/intelsdi-x/pulse/core" + "github.com/intelsdi-x/pulse/core/cdata" + "github.com/intelsdi-x/pulse/core/ctypes" "github.com/intelsdi-x/pulse/mgmt/rest/rbody" "github.com/intelsdi-x/pulse/mgmt/rest/request" "github.com/intelsdi-x/pulse/scheduler" @@ -50,7 +51,7 @@ import ( var ( // Switching this turns on logging for all the REST API calls - LOG_LEVEL = log.FatalLevel + LOG_LEVEL = log.WarnLevel PULSE_PATH = os.Getenv("PULSE_PATH") DUMMY_PLUGIN_PATH1 = PULSE_PATH + "/plugin/pulse-collector-dummy1" @@ -377,19 +378,93 @@ func fetchMetricsWithVersion(port int, ns string, ver int) *rbody.APIResponse { return getAPIResponse(resp) } +func getPluginConfigItem(port int, typ *core.PluginType, name, ver string) *rbody.APIResponse { + var uri string + if typ != nil { + uri = fmt.Sprintf("http://localhost:%d/v1/plugins/%d/%s/%s/config", port, *typ, name, ver) + } else { + uri = fmt.Sprintf("http://localhost:%d/v1/plugins/%s/%s/%s/config", port, "", name, ver) + } + resp, err := http.Get(uri) + if err != nil { + log.Fatal(err) + } + + return getAPIResponse(resp) +} + +func setPluginConfigItem(port int, typ *core.PluginType, name, ver string, cdn *cdata.ConfigDataNode) *rbody.APIResponse { + var uri string + if typ != nil { + uri = fmt.Sprintf("http://localhost:%d/v1/plugins/%d/%s/%s/config", port, *typ, name, ver) + } else { + uri = fmt.Sprintf("http://localhost:%d/v1/plugins/%s/%s/%s/config", port, "", name, ver) + } + + client := &http.Client{} + b, err := json.Marshal(cdn) + if err != nil { + log.Fatal(err) + } + req, err := http.NewRequest("PUT", uri, bytes.NewReader(b)) + if err != nil { + log.Fatal(err) + } + req.Header.Add("Content-Type", "application/json") + resp, err := client.Do(req) + if err != nil { + log.Fatal(err) + } + + return getAPIResponse(resp) +} + +func deletePluginConfigItem(port int, typ *core.PluginType, name, ver string, fields []string) *rbody.APIResponse { + var uri string + if typ != nil { + uri = fmt.Sprintf("http://localhost:%d/v1/plugins/%d/%s/%s/config", port, *typ, name, ver) + } else { + uri = fmt.Sprintf("http://localhost:%d/v1/plugins/%s/%s/%s/config", port, "", name, ver) + } + + client := &http.Client{} + b, err := json.Marshal(fields) + if err != nil { + log.Fatal(err) + } + req, err := http.NewRequest("DELETE", uri, bytes.NewReader(b)) + if err != nil { + log.Fatal(err) + } + req.Header.Add("Content-Type", "application/json") + resp, err := client.Do(req) + if err != nil { + log.Fatal(err) + } + return getAPIResponse(resp) +} + // REST API instances that are started are killed when the tests end. // When we eventually have a REST API Stop command this can be killed. -func startAPI(port int) *restAPIInstance { +func startAPI(port int, opts ...interface{}) *restAPIInstance { // Start a REST API to talk to log.SetLevel(LOG_LEVEL) r, _ := New(false, "", "") - c := control.New() + controlOpts := []control.ControlOpt{} + for _, opt := range opts { + switch t := opt.(type) { + case control.ControlOpt: + controlOpts = append(controlOpts, t) + } + } + c := control.New(controlOpts...) c.Start() s := scheduler.New() s.SetMetricManager(c) s.Start() r.BindMetricManager(c) r.BindTaskManager(s) + r.BindConfigManager(c.Config) r.Start(":" + fmt.Sprint(port)) time.Sleep(time.Millisecond * 100) return &restAPIInstance{ @@ -405,705 +480,108 @@ func TestPluginRestCalls(t *testing.T) { Convey("a single plugin loads", func() { // This test alone tests gzip. Saves on test time. CompressedUpload = true - port := getPort() - startAPI(port) // Make this unique for each Convey hierarchy - - // The second argument here is a string from the HTTP response body - // Useful to println if you want to see what the return looks like. - r := uploadPlugin(DUMMY_PLUGIN_PATH1, port) - So(r.Body, ShouldHaveSameTypeAs, new(rbody.PluginsLoaded)) - plr := r.Body.(*rbody.PluginsLoaded) - - // We should have gotten out loaded plugin back - So(plr.ResponseBodyType(), ShouldEqual, rbody.PluginsLoadedType) - So(plr.ResponseBodyMessage(), ShouldEqual, "Plugins loaded: dummy1(collector v1)") - So(len(plr.LoadedPlugins), ShouldEqual, 1) - So(plr.LoadedPlugins[0].Name, ShouldEqual, "dummy1") - So(plr.LoadedPlugins[0].Version, ShouldEqual, 1) - So(plr.LoadedPlugins[0].Status, ShouldEqual, "loaded") - So(plr.LoadedPlugins[0].Type, ShouldEqual, "collector") - So(plr.LoadedPlugins[0].LoadedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - - // Should only be one in the list - r2 := getPluginList(port) - So(r2.Body, ShouldHaveSameTypeAs, new(rbody.PluginList)) - plr2 := r2.Body.(*rbody.PluginList) - - So(len(plr2.LoadedPlugins), ShouldEqual, 1) - So(plr2.LoadedPlugins[0].Name, ShouldEqual, "dummy1") - So(plr2.LoadedPlugins[0].Version, ShouldEqual, 1) - So(plr2.LoadedPlugins[0].Status, ShouldEqual, "loaded") - So(plr2.LoadedPlugins[0].Type, ShouldEqual, "collector") - So(plr2.LoadedPlugins[0].LoadedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - CompressedUpload = false - }) - - Convey("load attempt to load same plugin", func() { - port := getPort() - startAPI(port) - - r := uploadPlugin(DUMMY_PLUGIN_PATH1, port) - So(r.Body, ShouldHaveSameTypeAs, new(rbody.PluginsLoaded)) - plr := r.Body.(*rbody.PluginsLoaded) - - So(plr.ResponseBodyType(), ShouldEqual, rbody.PluginsLoadedType) - So(plr.ResponseBodyMessage(), ShouldEqual, "Plugins loaded: dummy1(collector v1)") - So(len(plr.LoadedPlugins), ShouldEqual, 1) - So(plr.LoadedPlugins[0].Name, ShouldEqual, "dummy1") - So(plr.LoadedPlugins[0].Version, ShouldEqual, 1) - So(plr.LoadedPlugins[0].Status, ShouldEqual, "loaded") - So(plr.LoadedPlugins[0].Type, ShouldEqual, "collector") - So(plr.LoadedPlugins[0].LoadedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - - r2 := uploadPlugin(DUMMY_PLUGIN_PATH1, port) - So(r2.Meta.Code, ShouldEqual, 409) - So(r2.Body, ShouldHaveSameTypeAs, new(rbody.Error)) - plr2 := r2.Body.(*rbody.Error) - - So(plr2.ResponseBodyType(), ShouldEqual, rbody.ErrorType) - So(plr2.ResponseBodyMessage(), ShouldEqual, "plugin is already loaded") - - // Should only be one in the list - r3 := getPluginList(port) - So(r3.Body, ShouldHaveSameTypeAs, new(rbody.PluginList)) - plr3 := r3.Body.(*rbody.PluginList) - - So(len(plr3.LoadedPlugins), ShouldEqual, 1) - So(plr3.LoadedPlugins[0].Name, ShouldEqual, "dummy1") - So(plr3.LoadedPlugins[0].Version, ShouldEqual, 1) - So(plr3.LoadedPlugins[0].Status, ShouldEqual, "loaded") - So(plr3.LoadedPlugins[0].Type, ShouldEqual, "collector") - So(plr3.LoadedPlugins[0].LoadedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - }) - - Convey("load two plugins", func() { - port := getPort() - startAPI(port) - - r := uploadPlugin(DUMMY_PLUGIN_PATH1, port) - So(r.Body, ShouldHaveSameTypeAs, new(rbody.PluginsLoaded)) - plr := r.Body.(*rbody.PluginsLoaded) - - So(plr.ResponseBodyType(), ShouldEqual, rbody.PluginsLoadedType) - So(plr.ResponseBodyMessage(), ShouldEqual, "Plugins loaded: dummy1(collector v1)") - So(len(plr.LoadedPlugins), ShouldEqual, 1) - So(plr.LoadedPlugins[0].Name, ShouldEqual, "dummy1") - So(plr.LoadedPlugins[0].Version, ShouldEqual, 1) - So(plr.LoadedPlugins[0].Status, ShouldEqual, "loaded") - So(plr.LoadedPlugins[0].Type, ShouldEqual, "collector") - So(plr.LoadedPlugins[0].LoadedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - - r2 := uploadPlugin(DUMMY_PLUGIN_PATH2, port) - So(r2.Body, ShouldHaveSameTypeAs, new(rbody.PluginsLoaded)) - plr2 := r2.Body.(*rbody.PluginsLoaded) - - So(plr2.ResponseBodyType(), ShouldEqual, rbody.PluginsLoadedType) - So(plr2.ResponseBodyMessage(), ShouldEqual, "Plugins loaded: dummy2(collector v2)") - So(len(plr2.LoadedPlugins), ShouldEqual, 1) - So(plr2.LoadedPlugins[0].Name, ShouldEqual, "dummy2") - So(plr2.LoadedPlugins[0].Version, ShouldEqual, 2) - So(plr2.LoadedPlugins[0].Status, ShouldEqual, "loaded") - So(plr2.LoadedPlugins[0].Type, ShouldEqual, "collector") - So(plr2.LoadedPlugins[0].LoadedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - - // Should be two in the list - r3 := getPluginList(port) - So(r3.Body, ShouldHaveSameTypeAs, new(rbody.PluginList)) - plr3 := r3.Body.(*rbody.PluginList) - - So(len(plr3.LoadedPlugins), ShouldEqual, 2) - So(plr3.LoadedPlugins[0].Name, ShouldContainSubstring, "dummy") - So(plr3.LoadedPlugins[0].Version, ShouldBeIn, []int{1, 2}) - So(plr3.LoadedPlugins[0].Status, ShouldEqual, "loaded") - So(plr3.LoadedPlugins[0].Type, ShouldEqual, "collector") - So(plr3.LoadedPlugins[0].LoadedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - if plr3.LoadedPlugins[0].Name == "dummy1" { - So(plr3.LoadedPlugins[1].Name, ShouldEqual, "dummy2") - So(plr3.LoadedPlugins[1].Version, ShouldEqual, 2) - } else { - So(plr3.LoadedPlugins[1].Name, ShouldEqual, "dummy1") - So(plr3.LoadedPlugins[1].Version, ShouldEqual, 1) - } - So(plr3.LoadedPlugins[1].Status, ShouldEqual, "loaded") - So(plr3.LoadedPlugins[1].Type, ShouldEqual, "collector") - So(plr3.LoadedPlugins[1].LoadedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - }) - }) - - Convey("Unload Plugin - DELETE - /v1/plugins/:name/:version", func() { - Convey("error in unload of unknown plugin", func() { - port := getPort() - startAPI(port) - - r := unloadPlugin(port, "collector", "dummy1", 1) - So(r.Body, ShouldHaveSameTypeAs, new(rbody.Error)) - plr := r.Body.(*rbody.Error) - - So(plr.ResponseBodyType(), ShouldEqual, rbody.ErrorType) - So(plr.ResponseBodyMessage(), ShouldEqual, "plugin not found") - }) - - Convey("unload single plugin", func() { - port := getPort() - startAPI(port) - // Load one - r1 := uploadPlugin(DUMMY_PLUGIN_PATH1, port) - So(r1.Body, ShouldHaveSameTypeAs, new(rbody.PluginsLoaded)) - - // Unload it now - r := unloadPlugin(port, "collector", "dummy1", 1) - So(r.Body, ShouldHaveSameTypeAs, new(rbody.PluginUnloaded)) - plr := r.Body.(*rbody.PluginUnloaded) - - So(plr.ResponseBodyType(), ShouldEqual, rbody.PluginUnloadedType) - So(plr.ResponseBodyMessage(), ShouldEqual, "Plugin successfuly unloaded (dummy1v1)") - So(plr.Name, ShouldEqual, "dummy1") - So(plr.Version, ShouldEqual, 1) - So(plr.Type, ShouldEqual, "collector") - - // Plugin should NOT be in the list - r2 := getPluginList(port) - So(r2.Body, ShouldHaveSameTypeAs, new(rbody.PluginList)) - plr2 := r2.Body.(*rbody.PluginList) - - So(len(plr2.LoadedPlugins), ShouldEqual, 0) - }) - - Convey("unload one of two plugins", func() { - port := getPort() - startAPI(port) - // Load first - r1 := uploadPlugin(DUMMY_PLUGIN_PATH1, port) - So(r1.Body, ShouldHaveSameTypeAs, new(rbody.PluginsLoaded)) - // Load second - r2 := uploadPlugin(DUMMY_PLUGIN_PATH2, port) - So(r2.Body, ShouldHaveSameTypeAs, new(rbody.PluginsLoaded)) - - // Unload second - r := unloadPlugin(port, "collector", "dummy2", 2) - So(r.Body, ShouldHaveSameTypeAs, new(rbody.PluginUnloaded)) - plr := r.Body.(*rbody.PluginUnloaded) - - So(plr.ResponseBodyType(), ShouldEqual, rbody.PluginUnloadedType) - So(plr.ResponseBodyMessage(), ShouldEqual, "Plugin successfuly unloaded (dummy2v2)") - So(plr.Name, ShouldEqual, "dummy2") - So(plr.Version, ShouldEqual, 2) - So(plr.Type, ShouldEqual, "collector") - - r = getPluginList(port) - So(r.Body, ShouldHaveSameTypeAs, new(rbody.PluginList)) - plr2 := r.Body.(*rbody.PluginList) - - So(len(plr2.LoadedPlugins), ShouldEqual, 1) - So(plr2.LoadedPlugins[0].Name, ShouldNotEqual, "dummy2") - So(plr2.LoadedPlugins[0].Version, ShouldEqual, 1) - So(plr2.LoadedPlugins[0].Status, ShouldEqual, "loaded") - So(plr2.LoadedPlugins[0].Type, ShouldEqual, "collector") - }) - }) - - Convey("Plugin List - GET - /v1/plugins", func() { - Convey("no plugins", func() { - port := getPort() - startAPI(port) - - r := getPluginList(port) - So(r.Body, ShouldHaveSameTypeAs, new(rbody.PluginList)) - plr := r.Body.(*rbody.PluginList) - - So(plr.ResponseBodyType(), ShouldEqual, rbody.PluginListType) - So(plr.ResponseBodyMessage(), ShouldEqual, "Plugin list returned") - So(len(plr.LoadedPlugins), ShouldEqual, 0) - So(len(plr.AvailablePlugins), ShouldEqual, 0) - }) - - Convey("one plugin in list", func() { - port := getPort() - startAPI(port) - - uploadPlugin(DUMMY_PLUGIN_PATH1, port) - - r := getPluginList(port) - So(r.Body, ShouldHaveSameTypeAs, new(rbody.PluginList)) - plr := r.Body.(*rbody.PluginList) - - So(plr.ResponseBodyType(), ShouldEqual, rbody.PluginListType) - So(plr.ResponseBodyMessage(), ShouldEqual, "Plugin list returned") - So(len(plr.LoadedPlugins), ShouldEqual, 1) - So(len(plr.AvailablePlugins), ShouldEqual, 0) - So(plr.LoadedPlugins[0].Name, ShouldEqual, "dummy1") - So(plr.LoadedPlugins[0].Version, ShouldEqual, 1) - So(plr.LoadedPlugins[0].Status, ShouldEqual, "loaded") - So(plr.LoadedPlugins[0].Type, ShouldEqual, "collector") - So(plr.LoadedPlugins[0].LoadedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - }) - - Convey("multiple plugins in list", func() { - port := getPort() - startAPI(port) - - uploadPlugin(DUMMY_PLUGIN_PATH1, port) - uploadPlugin(DUMMY_PLUGIN_PATH2, port) - - r := getPluginList(port) - So(r.Body, ShouldHaveSameTypeAs, new(rbody.PluginList)) - plr := r.Body.(*rbody.PluginList) - - So(plr.ResponseBodyType(), ShouldEqual, rbody.PluginListType) - So(plr.ResponseBodyMessage(), ShouldEqual, "Plugin list returned") - So(len(plr.LoadedPlugins), ShouldEqual, 2) - So(len(plr.AvailablePlugins), ShouldEqual, 0) - var ( - x, y int - ) - if plr.LoadedPlugins[0].Name == "dummy1" { - y = 1 - } else { - x = 1 - } - So(plr.LoadedPlugins[x].Name, ShouldEqual, "dummy1") - So(plr.LoadedPlugins[x].Version, ShouldEqual, 1) - So(plr.LoadedPlugins[x].Status, ShouldEqual, "loaded") - So(plr.LoadedPlugins[x].Type, ShouldEqual, "collector") - So(plr.LoadedPlugins[x].LoadedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - // - So(plr.LoadedPlugins[y].Name, ShouldEqual, "dummy2") - So(plr.LoadedPlugins[y].Version, ShouldEqual, 2) - So(plr.LoadedPlugins[y].Status, ShouldEqual, "loaded") - So(plr.LoadedPlugins[y].Type, ShouldEqual, "collector") - So(plr.LoadedPlugins[y].LoadedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - }) - }) - - Convey("Metric Catalog - GET - /v1/metrics", func() { - Convey("empty catalog", func() { - port := getPort() - startAPI(port) - - r := getMetricCatalog(port) - So(r.Body, ShouldHaveSameTypeAs, new(rbody.MetricsReturned)) - plr := r.Body.(*rbody.MetricsReturned) - - So(len(*plr), ShouldEqual, 0) - }) - - Convey("plugin metrics show up in the catalog", func() { - port := getPort() - startAPI(port) - - uploadPlugin(DUMMY_PLUGIN_PATH1, port) - r := getMetricCatalog(port) - So(r.Body, ShouldHaveSameTypeAs, new(rbody.MetricsReturned)) - plr := r.Body.(*rbody.MetricsReturned) - - So(len(*plr), ShouldEqual, 2) - So((*plr)[0].Namespace, ShouldEqual, "/intel/dummy/bar") - So((*plr)[0].LastAdvertisedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - So((*plr)[1].Namespace, ShouldEqual, "/intel/dummy/foo") - So((*plr)[1].LastAdvertisedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - }) - - Convey("newer plugin upgrades the metrics", func() { - port := getPort() - startAPI(port) - - // upload v1 - uploadPlugin(DUMMY_PLUGIN_PATH1, port) - r := getMetricCatalog(port) - So(r.Body, ShouldHaveSameTypeAs, new(rbody.MetricsReturned)) - plr := r.Body.(*rbody.MetricsReturned) - - So(len(*plr), ShouldEqual, 2) - So((*plr)[0].Namespace, ShouldEqual, "/intel/dummy/bar") - So((*plr)[0].LastAdvertisedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - So((*plr)[1].Namespace, ShouldEqual, "/intel/dummy/foo") - So((*plr)[1].LastAdvertisedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - - // upload v2 - uploadPlugin(DUMMY_PLUGIN_PATH2, port) - r2 := getMetricCatalog(port) - So(r2.Body, ShouldHaveSameTypeAs, new(rbody.MetricsReturned)) - plr2 := r2.Body.(*rbody.MetricsReturned) - - So(len(*plr2), ShouldEqual, 4) - So((*plr2)[0].Namespace, ShouldEqual, "/intel/dummy/bar") - So((*plr2)[0].Version, ShouldEqual, 1) - So((*plr2)[0].LastAdvertisedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - So((*plr2)[1].Namespace, ShouldEqual, "/intel/dummy/bar") - So((*plr2)[1].Version, ShouldEqual, 2) - So((*plr2)[1].LastAdvertisedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - So((*plr2)[2].Namespace, ShouldEqual, "/intel/dummy/foo") - So((*plr2)[2].Version, ShouldEqual, 1) - So((*plr2)[2].LastAdvertisedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - So((*plr2)[3].Namespace, ShouldEqual, "/intel/dummy/foo") - So((*plr2)[3].LastAdvertisedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - So((*plr2)[3].Version, ShouldEqual, 2) - - }) - - Convey("removing a newer plugin downgrades the metrics", func() { - port := getPort() - startAPI(port) - - // upload v1 - uploadPlugin(DUMMY_PLUGIN_PATH1, port) - r := getMetricCatalog(port) - So(r.Body, ShouldHaveSameTypeAs, new(rbody.MetricsReturned)) - plr := r.Body.(*rbody.MetricsReturned) - - So(len(*plr), ShouldEqual, 2) - So((*plr)[0].Namespace, ShouldEqual, "/intel/dummy/bar") - So((*plr)[0].LastAdvertisedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - So((*plr)[1].Namespace, ShouldEqual, "/intel/dummy/foo") - So((*plr)[1].LastAdvertisedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - - // upload v2 - uploadPlugin(DUMMY_PLUGIN_PATH2, port) - r2 := getMetricCatalog(port) - So(r2.Body, ShouldHaveSameTypeAs, new(rbody.MetricsReturned)) - plr2 := r2.Body.(*rbody.MetricsReturned) - - So(len(*plr2), ShouldEqual, 4) - So((*plr2)[0].Namespace, ShouldEqual, "/intel/dummy/bar") - So((*plr2)[0].Version, ShouldEqual, 1) - So((*plr2)[0].LastAdvertisedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - So((*plr2)[1].Namespace, ShouldEqual, "/intel/dummy/bar") - So((*plr2)[1].Version, ShouldEqual, 2) - So((*plr2)[1].LastAdvertisedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - So((*plr2)[2].Namespace, ShouldEqual, "/intel/dummy/foo") - So((*plr2)[2].Version, ShouldEqual, 1) - So((*plr2)[2].LastAdvertisedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - So((*plr2)[3].Namespace, ShouldEqual, "/intel/dummy/foo") - So((*plr2)[3].LastAdvertisedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - So((*plr2)[3].Version, ShouldEqual, 2) - - // remove v2 - unloadPlugin(port, "collector", "dummy2", 2) - r3 := getMetricCatalog(port) - So(r3.Body, ShouldHaveSameTypeAs, new(rbody.MetricsReturned)) - plr3 := r3.Body.(*rbody.MetricsReturned) - - So(len(*plr3), ShouldEqual, 2) - So((*plr3)[0].Namespace, ShouldEqual, "/intel/dummy/bar") - So((*plr3)[0].LastAdvertisedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - So((*plr3)[1].Namespace, ShouldEqual, "/intel/dummy/foo") - So((*plr3)[1].LastAdvertisedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - - }) - }) - Convey("metrics accessible via tree-like lookup", func() { - port := getPort() - startAPI(port) - - uploadPlugin(DUMMY_PLUGIN_PATH1, port) - r := fetchMetrics(port, "/intel/dummy/*") - So(r.Body, ShouldHaveSameTypeAs, new(rbody.MetricsReturned)) - plr := r.Body.(*rbody.MetricsReturned) - - So(len(*plr), ShouldEqual, 2) - So((*plr)[0].Namespace, ShouldEqual, "/intel/dummy/bar") - So((*plr)[0].Version, ShouldEqual, 1) - So((*plr)[0].LastAdvertisedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - So((*plr)[1].Namespace, ShouldEqual, "/intel/dummy/foo") - So((*plr)[1].Version, ShouldEqual, 1) - So((*plr)[1].LastAdvertisedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - - }) - - Convey("metrics with version accessible via tree-like lookup", func() { - port := getPort() - startAPI(port) - - uploadPlugin(DUMMY_PLUGIN_PATH1, port) - uploadPlugin(DUMMY_PLUGIN_PATH2, port) - r := fetchMetricsWithVersion(port, "/intel/dummy/*", 2) - So(r.Body, ShouldHaveSameTypeAs, new(rbody.MetricsReturned)) - plr := r.Body.(*rbody.MetricsReturned) - - So(len(*plr), ShouldEqual, 2) - So((*plr)[0].Namespace, ShouldEqual, "/intel/dummy/bar") - So((*plr)[0].Version, ShouldEqual, 2) - So((*plr)[0].LastAdvertisedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - So((*plr)[1].Namespace, ShouldEqual, "/intel/dummy/foo") - So((*plr)[1].Version, ShouldEqual, 2) - So((*plr)[1].LastAdvertisedTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - - }) - - Convey("Create Task - POST - /v1/tasks", func() { - Convey("creating task with missing metric errors", func() { - port := getPort() - startAPI(port) - - r := createTask("1.json", "foo", "1s", true, port) - So(r.Body, ShouldHaveSameTypeAs, new(rbody.Error)) - plr := r.Body.(*rbody.Error) - So(plr.ErrorMessage, ShouldContainSubstring, "Metric not found: /intel/dummy/foo") - }) - - Convey("create task works when plugins are loaded", func() { - port := getPort() - startAPI(port) - - uploadPlugin(DUMMY_PLUGIN_PATH2, port) - uploadPlugin(FILE_PLUGIN_PATH, port) - r := createTask("1.json", "foo", "1s", true, port) - So(r.Body, ShouldHaveSameTypeAs, new(rbody.AddScheduledTask)) - plr := r.Body.(*rbody.AddScheduledTask) - So(plr.CreationTimestamp, ShouldBeLessThanOrEqualTo, time.Now().Unix()) - So(plr.Name, ShouldEqual, "foo") - So(plr.HitCount, ShouldEqual, 0) - So(plr.FailedCount, ShouldEqual, 0) - So(plr.MissCount, ShouldEqual, 0) - So(plr.State, ShouldEqual, "Stopped") - So(plr.Deadline, ShouldEqual, "5s") - }) - - }) - - Convey("Get Tasks - GET - /v1/tasks", func() { - Convey("get tasks after single task added", func() { - port := getPort() - startAPI(port) - - uploadPlugin(DUMMY_PLUGIN_PATH2, port) - uploadPlugin(FILE_PLUGIN_PATH, port) - r := createTask("1.json", "bar", "1s", true, port) - So(r.Body, ShouldHaveSameTypeAs, new(rbody.AddScheduledTask)) - - r2 := getTasks(port) - So(r2.Body, ShouldHaveSameTypeAs, new(rbody.ScheduledTaskListReturned)) - plr2 := r2.Body.(*rbody.ScheduledTaskListReturned) - So(len(plr2.ScheduledTasks), ShouldEqual, 1) - So(plr2.ScheduledTasks[0].Name, ShouldEqual, "bar") - }) - - Convey("get tasks after multiple tasks added", func() { - port := getPort() - startAPI(port) - - uploadPlugin(DUMMY_PLUGIN_PATH2, port) - uploadPlugin(FILE_PLUGIN_PATH, port) - - r1 := createTask("1.json", "alpha", "1s", true, port) - So(r1.Body, ShouldHaveSameTypeAs, new(rbody.AddScheduledTask)) - - r2 := createTask("1.json", "beta", "1s", true, port) - So(r2.Body, ShouldHaveSameTypeAs, new(rbody.AddScheduledTask)) - - r3 := getTasks(port) - So(r3.Body, ShouldHaveSameTypeAs, new(rbody.ScheduledTaskListReturned)) - plr3 := r3.Body.(*rbody.ScheduledTaskListReturned) - So(len(plr3.ScheduledTasks), ShouldEqual, 2) - So(plr3.ScheduledTasks[0].Name, ShouldEqual, "alpha") - So(plr3.ScheduledTasks[1].Name, ShouldEqual, "beta") - }) - }) - - Convey("Get Task By ID - GET - /v1/tasks/:id", func() { - Convey("get task after task added", func() { - port := getPort() - startAPI(port) - - uploadPlugin(DUMMY_PLUGIN_PATH2, port) - uploadPlugin(FILE_PLUGIN_PATH, port) - r1 := createTask("1.json", "foo", "3s", true, port) - So(r1.Body, ShouldHaveSameTypeAs, new(rbody.AddScheduledTask)) - t1 := r1.Body.(*rbody.AddScheduledTask) - r2 := getTask(t1.ID, port) - t2 := r2.Body.(*rbody.ScheduledTaskReturned) - So(t2.AddScheduledTask.Name, ShouldEqual, "foo") - }) - }) - - Convey("Start Task - PUT - /v1/tasks/:id/start", func() { - Convey("starts after being created", func() { - port := getPort() - startAPI(port) - - uploadPlugin(DUMMY_PLUGIN_PATH2, port) - uploadPlugin(FILE_PLUGIN_PATH, port) - - r1 := createTask("1.json", "xenu", "1s", true, port) - So(r1.Body, ShouldHaveSameTypeAs, new(rbody.AddScheduledTask)) - plr1 := r1.Body.(*rbody.AddScheduledTask) - - id := plr1.ID - - r2 := startTask(id, port) - So(r2.Body, ShouldHaveSameTypeAs, new(rbody.ScheduledTaskStarted)) - plr2 := r2.Body.(*rbody.ScheduledTaskStarted) - So(plr2.ID, ShouldEqual, id) - - r3 := getTasks(port) - So(r3.Body, ShouldHaveSameTypeAs, new(rbody.ScheduledTaskListReturned)) - plr3 := r3.Body.(*rbody.ScheduledTaskListReturned) - So(len(plr3.ScheduledTasks), ShouldEqual, 1) - So(plr3.ScheduledTasks[0].Name, ShouldEqual, "xenu") - So(plr3.ScheduledTasks[0].State, ShouldEqual, "Running") - - // cleanup for test perf reasons - removeTask(id, port) - }) - Convey("starts when created", func() { - port := getPort() - startAPI(port) - - uploadPlugin(DUMMY_PLUGIN_PATH2, port) - uploadPlugin(FILE_PLUGIN_PATH, port) - - r1 := createTask("1.json", "xenu", "1s", false, port) - So(r1.Body, ShouldHaveSameTypeAs, new(rbody.AddScheduledTask)) - plr1 := r1.Body.(*rbody.AddScheduledTask) - - id := plr1.ID - - r3 := getTasks(port) - So(r3.Body, ShouldHaveSameTypeAs, new(rbody.ScheduledTaskListReturned)) - plr3 := r3.Body.(*rbody.ScheduledTaskListReturned) - So(len(plr3.ScheduledTasks), ShouldEqual, 1) - So(plr3.ScheduledTasks[0].Name, ShouldEqual, "xenu") - So(plr3.ScheduledTasks[0].State, ShouldEqual, "Running") - - // cleanup for test perf reasons - removeTask(id, port) - }) - }) - - Convey("Stop Task - PUT - /v1/tasks/:id/stop", func() { - Convey("stops after being started", func() { - port := getPort() - startAPI(port) - - uploadPlugin(DUMMY_PLUGIN_PATH2, port) - uploadPlugin(FILE_PLUGIN_PATH, port) - - r1 := createTask("1.json", "yeti", "1s", true, port) - So(r1.Body, ShouldHaveSameTypeAs, new(rbody.AddScheduledTask)) - plr1 := r1.Body.(*rbody.AddScheduledTask) - - id := plr1.ID - - r2 := startTask(id, port) - So(r2.Body, ShouldHaveSameTypeAs, new(rbody.ScheduledTaskStarted)) - plr2 := r2.Body.(*rbody.ScheduledTaskStarted) - So(plr2.ID, ShouldEqual, id) - - r3 := getTasks(port) - So(r3.Body, ShouldHaveSameTypeAs, new(rbody.ScheduledTaskListReturned)) - plr3 := r3.Body.(*rbody.ScheduledTaskListReturned) - So(len(plr3.ScheduledTasks), ShouldEqual, 1) - So(plr3.ScheduledTasks[0].Name, ShouldEqual, "yeti") - So(plr3.ScheduledTasks[0].State, ShouldEqual, "Running") - - r4 := stopTask(id, port) - So(r4.Body, ShouldHaveSameTypeAs, new(rbody.ScheduledTaskStopped)) - plr4 := r4.Body.(*rbody.ScheduledTaskStopped) - So(plr4.ID, ShouldEqual, id) - - time.Sleep(1 * time.Second) - - r5 := getTasks(port) - So(r5.Body, ShouldHaveSameTypeAs, new(rbody.ScheduledTaskListReturned)) - plr5 := r5.Body.(*rbody.ScheduledTaskListReturned) - So(len(plr5.ScheduledTasks), ShouldEqual, 1) - So(plr5.ScheduledTasks[0].Name, ShouldEqual, "yeti") - So(plr5.ScheduledTasks[0].State, ShouldEqual, "Stopped") - }) - }) - - Convey("Remove Task - DELETE - /v1/tasks/:id", func() { - Convey("error on trying to remove unknown task", func() { - port := getPort() - startAPI(port) - - uuid := uuid.New() - r1 := removeTask(uuid, port) - So(r1.Body, ShouldHaveSameTypeAs, new(rbody.Error)) - plr1 := r1.Body.(*rbody.Error) - So(plr1.ErrorMessage, ShouldEqual, fmt.Sprintf("No task found with id '%s'", uuid)) - }) - Convey("removes a task", func() { port := getPort() startAPI(port) + pub := core.PublisherPluginType + col := core.CollectorPluginType + Convey("A global plugin config is added for all plugins", func() { + cdn := cdata.NewNode() + cdn.AddItem("password", ctypes.ConfigValueStr{"p@ssw0rd"}) + r := setPluginConfigItem(port, nil, "", "", cdn) + So(r.Body, ShouldHaveSameTypeAs, &rbody.SetPluginConfigItem{}) + r1 := r.Body.(*rbody.SetPluginConfigItem) + So(r1.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "p@ssw0rd"}) + + r2 := getPluginConfigItem(port, &col, "", "") + So(r2.Body, ShouldHaveSameTypeAs, &rbody.PluginConfigItem{}) + r3 := r2.Body.(*rbody.PluginConfigItem) + So(len(r3.Table()), ShouldEqual, 1) + So(r3.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "p@ssw0rd"}) + + Convey("A plugin config is added for all publishers", func() { + cdn := cdata.NewNode() + cdn.AddItem("user", ctypes.ConfigValueStr{"john"}) + r := setPluginConfigItem(port, &pub, "", "", cdn) + So(r.Body, ShouldHaveSameTypeAs, &rbody.SetPluginConfigItem{}) + r1 := r.Body.(*rbody.SetPluginConfigItem) + So(r1.Table()["user"], ShouldResemble, ctypes.ConfigValueStr{Value: "john"}) + So(len(r1.Table()), ShouldEqual, 2) + + Convey("A plugin config is added for all versions of a publisher", func() { + cdn := cdata.NewNode() + cdn.AddItem("path", ctypes.ConfigValueStr{"/usr/local/influxdb/bin"}) + r := setPluginConfigItem(port, &pub, "influxdb", "", cdn) + So(r.Body, ShouldHaveSameTypeAs, &rbody.SetPluginConfigItem{}) + r1 := r.Body.(*rbody.SetPluginConfigItem) + So(r1.Table()["path"], ShouldResemble, ctypes.ConfigValueStr{Value: "/usr/local/influxdb/bin"}) + So(len(r1.Table()), ShouldEqual, 3) + + Convey("A plugin config is added for a specific version of a publisher", func() { + cdn := cdata.NewNode() + cdn.AddItem("rate", ctypes.ConfigValueFloat{.8}) + r := setPluginConfigItem(port, &pub, "influxdb", "", cdn) + So(r.Body, ShouldHaveSameTypeAs, &rbody.SetPluginConfigItem{}) + r1 := r.Body.(*rbody.SetPluginConfigItem) + So(r1.Table()["rate"], ShouldResemble, ctypes.ConfigValueFloat{Value: .8}) + So(len(r1.Table()), ShouldEqual, 4) + + Convey("A global plugin config field is deleted", func() { + r := deletePluginConfigItem(port, nil, "", "", []string{"password"}) + So(r.Body, ShouldHaveSameTypeAs, &rbody.DeletePluginConfigItem{}) + r1 := r.Body.(*rbody.DeletePluginConfigItem) + So(len(r1.Table()), ShouldEqual, 0) + + r2 := setPluginConfigItem(port, &pub, "influxdb", "", cdn) + So(r2.Body, ShouldHaveSameTypeAs, &rbody.SetPluginConfigItem{}) + r3 := r2.Body.(*rbody.SetPluginConfigItem) + So(len(r3.Table()), ShouldEqual, 3) + }) + }) + }) + }) + }) - uploadPlugin(DUMMY_PLUGIN_PATH2, port) - uploadPlugin(FILE_PLUGIN_PATH, port) - - r1 := createTask("1.json", "yeti", "1s", true, port) - So(r1.Body, ShouldHaveSameTypeAs, new(rbody.AddScheduledTask)) - plr1 := r1.Body.(*rbody.AddScheduledTask) - - id := plr1.ID - - r2 := getTasks(port) - So(r2.Body, ShouldHaveSameTypeAs, new(rbody.ScheduledTaskListReturned)) - plr2 := r2.Body.(*rbody.ScheduledTaskListReturned) - So(len(plr2.ScheduledTasks), ShouldEqual, 1) - So(plr2.ScheduledTasks[0].Name, ShouldEqual, "yeti") - So(plr2.ScheduledTasks[0].State, ShouldEqual, "Stopped") - - r3 := removeTask(id, port) - So(r3.Body, ShouldHaveSameTypeAs, new(rbody.ScheduledTaskRemoved)) - plr3 := r3.Body.(*rbody.ScheduledTaskRemoved) - So(plr3.ID, ShouldEqual, id) - - r4 := getTasks(port) - So(r4.Body, ShouldHaveSameTypeAs, new(rbody.ScheduledTaskListReturned)) - plr4 := r4.Body.(*rbody.ScheduledTaskListReturned) - So(len(plr4.ScheduledTasks), ShouldEqual, 0) }) - }) - Convey("Watch task - get - /v1/tasks/:id/watch", func() { - Convey("---", func(c C) { + Convey("Plugin config is set at startup", func() { port := getPort() - startAPI(port) - - uploadPlugin(DUMMY_PLUGIN_PATH2, port) - uploadPlugin(FILE_PLUGIN_PATH, port) - - r1 := createTask("1.json", "xenu", "500ms", true, port) - So(r1.Meta.Code, ShouldEqual, 201) - So(r1.Body, ShouldHaveSameTypeAs, new(rbody.AddScheduledTask)) - plr1 := r1.Body.(*rbody.AddScheduledTask) - id := plr1.ID - - // Change buffer window to 10ms (do not do this IRL) - StreamingBufferWindow = 0.01 - r := watchTask(id, port) - time.Sleep(time.Millisecond * 100) - startTask(id, port) - type ea struct { - events []string - sync.Mutex - } - a := new(ea) - wait := make(chan struct{}) - go func() { - for { - select { - case e := <-r.eventChan: - a.Lock() - a.events = append(a.events, e) - if len(a.events) == 10 { - r.close() - } - a.Unlock() - case <-r.doneChan: - close(wait) - return - } - } - }() - <-wait - stopTask(id, port) - a.Lock() - So(len(a.events), ShouldEqual, 10) - a.Unlock() - So(a.events[0], ShouldEqual, "task-started") - for x := 1; x <= 9; x++ { - So(a.events[x], ShouldEqual, "metric-event") - } + cfg := control.NewConfig() + b, err := ioutil.ReadFile("../../examples/configs/pulse-config-sample.json") + So(err, ShouldBeNil) + json.Unmarshal(b, cfg) + startAPI(port, control.OptSetConfig(cfg)) + col := core.CollectorPluginType + + Convey("Gets the collector config by name and version", func() { + r := getPluginConfigItem(port, &col, "pcm", "1") + So(r.Body, ShouldHaveSameTypeAs, &rbody.PluginConfigItem{}) + r1 := r.Body.(*rbody.PluginConfigItem) + So(r1.Table()["path"], ShouldResemble, ctypes.ConfigValueStr{Value: "/usr/local/pcm/bin"}) + So(r1.Table()["user"], ShouldResemble, ctypes.ConfigValueStr{Value: "john"}) + So(len(r1.Table()), ShouldEqual, 6) + }) + Convey("Gets the config for a collector by name", func() { + r := getPluginConfigItem(port, &col, "pcm", "") + So(r.Body, ShouldHaveSameTypeAs, &rbody.PluginConfigItem{}) + r1 := r.Body.(*rbody.PluginConfigItem) + So(r1.Table()["path"], ShouldResemble, ctypes.ConfigValueStr{Value: "/usr/local/pcm/bin"}) + So(r1.Table()["user"], ShouldResemble, ctypes.ConfigValueStr{Value: "jane"}) + So(len(r1.Table()), ShouldEqual, 3) + }) + Convey("Gets the config for all collectors", func() { + r := getPluginConfigItem(port, &col, "", "") + So(r.Body, ShouldHaveSameTypeAs, &rbody.PluginConfigItem{}) + r1 := r.Body.(*rbody.PluginConfigItem) + So(r1.Table()["user"], ShouldResemble, ctypes.ConfigValueStr{Value: "jane"}) + So(r1.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "p@ssw0rd"}) + So(len(r1.Table()), ShouldEqual, 2) + }) + Convey("Gets the config for all plugins", func() { + r := getPluginConfigItem(port, nil, "", "") + So(r.Body, ShouldHaveSameTypeAs, &rbody.PluginConfigItem{}) + r1 := r.Body.(*rbody.PluginConfigItem) + So(r1.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "p@ssw0rd"}) + So(len(r1.Table()), ShouldEqual, 1) + }) }) }) diff --git a/mgmt/rest/server.go b/mgmt/rest/server.go index f6cc918f6..5339b847a 100644 --- a/mgmt/rest/server.go +++ b/mgmt/rest/server.go @@ -33,6 +33,7 @@ import ( "github.com/julienschmidt/httprouter" "github.com/intelsdi-x/pulse/core" + "github.com/intelsdi-x/pulse/core/cdata" "github.com/intelsdi-x/pulse/core/perror" "github.com/intelsdi-x/pulse/mgmt/rest/rbody" "github.com/intelsdi-x/pulse/mgmt/tribe/agreement" @@ -101,10 +102,20 @@ type managesTribe interface { GetMember(name string) *agreement.Member } +type managesConfig interface { + GetPluginConfigDataNode(core.PluginType, string, int) cdata.ConfigDataNode + GetPluginConfigDataNodeAll() cdata.ConfigDataNode + MergePluginConfigDataNode(pluginType core.PluginType, name string, ver int, cdn *cdata.ConfigDataNode) cdata.ConfigDataNode + MergePluginConfigDataNodeAll(cdn *cdata.ConfigDataNode) cdata.ConfigDataNode + DeletePluginConfigDataNodeField(pluginType core.PluginType, name string, ver int, fields ...string) cdata.ConfigDataNode + DeletePluginConfigDataNodeFieldAll(fields ...string) cdata.ConfigDataNode +} + type Server struct { mm managesMetrics mt managesTasks tr managesTribe + mc managesConfig n *negroni.Negroni r *httprouter.Router tls *tls @@ -159,6 +170,10 @@ func (s *Server) BindTribeManager(t managesTribe) { s.tr = t } +func (s *Server) BindConfigManager(c managesConfig) { + s.mc = c +} + func (s *Server) start(addrString string) { // plugin routes s.r.GET("/v1/plugins", s.getPlugins) @@ -167,6 +182,9 @@ func (s *Server) start(addrString string) { s.r.GET("/v1/plugins/:type/:name/:version", s.getPlugin) s.r.POST("/v1/plugins", s.loadPlugin) s.r.DELETE("/v1/plugins/:type/:name/:version", s.unloadPlugin) + s.r.GET("/v1/plugins/:type/:name/:version/config", s.getPluginConfigItem) + s.r.PUT("/v1/plugins/:type/:name/:version/config", s.setPluginConfigItem) + s.r.DELETE("/v1/plugins/:type/:name/:version/config", s.deletePluginConfigItem) // metric routes s.r.GET("/v1/metrics", s.getMetrics) diff --git a/plugin/collector/pulse-collector-facter/facter/facter.go b/plugin/collector/pulse-collector-facter/facter/facter.go deleted file mode 100644 index d55906532..000000000 --- a/plugin/collector/pulse-collector-facter/facter/facter.go +++ /dev/null @@ -1,219 +0,0 @@ -/* -http://www.apache.org/licenses/LICENSE-2.0.txt - - -Copyright 2015 Intel Coporation - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -/* This modules converts implements Pulse API to become an plugin. - -legend: -- metric - represents value of metric from Pulse side -- fact - represents a value about a system gathered from Facter -- name - is string identifier that refers to metric from the Pulse side, so name points to metric - - Implementation details: - - GetMetricTypes() - + - | +------------------+ - +----v---+ getFacts() | | - | Facter +-------------> ./facter --json | - +----^---+ | (goroutine) | - | +------------------+ - + - CollectMetrics() - - -*/ - -package facter - -import ( - "errors" - "fmt" - "os" - "strings" - "time" - - "github.com/intelsdi-x/pulse/control/plugin" - "github.com/intelsdi-x/pulse/control/plugin/cpolicy" -) - -const ( - // parts of returned namescape - vendor = "intel" - prefix = "facter" - // how long we are caching the date from external binary to prevent overuse of resources - defaultCacheTTL = 60 * time.Second - // how long are we going to cache available types of metrics - defaultMetricTypesTTL = defaultCacheTTL - // timeout we are ready to wait for external binary to gather the data - defaultFacterTimeout = 5 * time.Second -) - -/********** - * Facter * - **********/ - -// Facter implements API to communicate with Pulse -type Facter struct { - ttl time.Duration - // injects implementation for getting facts - defaults to use getFacts from cmd.go - // but allows to replace with fake during tests - getFacts func( - names []string, - facterTimeout time.Duration, - cmdConfig *cmdConfig, - ) (facts, error) - // how much time we are ready to wait for getting result from cmd - // is going to be passed to facterTimeout parameter in getFacts - facterTimeout time.Duration -} - -// make sure that we actually satisify requierd interface -var _ plugin.CollectorPlugin = (*Facter)(nil) - -// NewFacter constructs new Facter with default values -func NewFacter() *Facter { - return &Facter{ - // injection of default implementation for gathering facts from Facter - getFacts: getFacts, - facterTimeout: defaultFacterTimeout, - } -} - -// ------------ Pulse plugin interface implementation -------------- - -// GetMetricTypes returns available metrics types -func (f *Facter) GetMetricTypes(cfg plugin.PluginConfigType) ([]plugin.PluginMetricType, error) { - - // facts composed of entries - facts, err := f.getFacts( - nil, // ask for everything - f.facterTimeout, - nil, //default cmd configuration - ) - if err != nil { - if strings.Contains(err.Error(), "executable file not found") { - // Facter cannot be found. Since this is called on load we should - // not send an error as loading a plugin should not fail based on - // whether or not a dynamic path is set. - return []plugin.PluginMetricType{}, nil - } - return nil, err - } - - // capacity - we are going to return all the facts - metricTypes := make([]plugin.PluginMetricType, 0, len(facts)) - - // create types with given namespace - for name, _ := range facts { - namespace := createNamespace(name) - metricType := plugin.PluginMetricType{Namespace_: namespace} - metricTypes = append(metricTypes, metricType) - } - - return metricTypes, nil -} - -// Collect collects metrics from external binary a returns them in form -// acceptable by Pulse, only returns collects that were asked for and return nothing when asked for none -// the order of requested and received metrics isn't guaranted -func (f *Facter) CollectMetrics(metricTypes []plugin.PluginMetricType) ([]plugin.PluginMetricType, error) { - - // parse and check requested names of metrics - names := []string{} - for _, metricType := range metricTypes { - namespace := metricType.Namespace() - - err := validateNamespace(namespace) - if err != nil { - return nil, err - } - - // name of fact - last part of namespace - name := namespace[2] - names = append(names, name) - } - - if len(names) == 0 { - // nothing request, none returned - // !because returned by value, it would return nil slice - return nil, nil - } - - // facts composed of entries - facts, err := f.getFacts(names, f.facterTimeout, nil) - if err != nil { - return nil, err - } - - // make sure that recevied len of names equals asked - if len(facts) != len(names) { - return nil, errors.New("assertion: getFacts returns more/less than asked!") - } - - host, _ := os.Hostname() - // convert facts into PluginMetrics - metrics := make([]plugin.PluginMetricType, 0, len(facts)) - for name, value := range facts { - namespace := createNamespace(name) - metric := *plugin.NewPluginMetricType(namespace, time.Now(), host, value) - metrics = append(metrics, metric) - } - return metrics, nil -} - -func (f *Facter) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) { - c := cpolicy.New() - rule, _ := cpolicy.NewStringRule("name", false, "bob") - rule2, _ := cpolicy.NewStringRule("password", true) - p := cpolicy.NewPolicyNode() - p.Add(rule) - p.Add(rule2) - c.Add([]string{"intel", "facter", "foo"}, p) - return c, nil -} - -// ------------ helper functions -------------- - -// validateNamespace checks namespace intel(vendor)/facter(prefix)/FACTNAME -func validateNamespace(namespace []string) error { - if len(namespace) != 3 { - return errors.New(fmt.Sprintf("unknown metricType %s (should containt just 3 segments)", namespace)) - } - if namespace[0] != vendor { - return errors.New(fmt.Sprintf("unknown metricType %s (expected vendor %s)", namespace, vendor)) - } - - if namespace[1] != prefix { - return errors.New(fmt.Sprintf("unknown metricType %s (expected prefix %s)", namespace, prefix)) - } - return nil -} - -// namspace returns namespace slice of strings -// composed from: vendor, prefix and fact name -func createNamespace(name string) []string { - return []string{vendor, prefix, name} - -} - -// helper type to deal with json values which additionally stores last update moment -type entry struct { - value interface{} - lastUpdate time.Time -} diff --git a/plugin/collector/pulse-collector-facter/facter/facter_test.go b/plugin/collector/pulse-collector-facter/facter/facter_test.go deleted file mode 100644 index 18bb7ef45..000000000 --- a/plugin/collector/pulse-collector-facter/facter/facter_test.go +++ /dev/null @@ -1,193 +0,0 @@ -// +build linux integration - -/* -http://www.apache.org/licenses/LICENSE-2.0.txt - - -Copyright 2015 Intel Coporation - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -/* -# testing -go test -v github.com/intelsdi-x/pulse/plugin/collector/pulse-collector-facter/facter -*/ -package facter - -import ( - "errors" - "strings" - "testing" - "time" - - "github.com/intelsdi-x/pulse/control/plugin" - "github.com/intelsdi-x/pulse/core/cdata" - . "github.com/smartystreets/goconvey/convey" -) - -// fact expected to be available on every system -// can be allways received from Facter for test purposes -const someFact = "kernel" -const someValue = "linux 1234" - -var existingNamespace = []string{vendor, prefix, someFact} - -func TestFacterCollectMetrics(t *testing.T) { - Convey("TestFacterCollect tests", t, func() { - - f := NewFacter() - // always return at least one metric - f.getFacts = func(_ []string, _ time.Duration, _ *cmdConfig) (facts, error) { - return facts{someFact: someValue}, nil - } - - Convey("asked for nothgin returns nothing", func() { - metricTypes := []plugin.PluginMetricType{} - metrics, err := f.CollectMetrics(metricTypes) - So(err, ShouldBeNil) - So(metrics, ShouldBeEmpty) - }) - - Convey("asked for somehting returns something", func() { - metricTypes := []plugin.PluginMetricType{ - plugin.PluginMetricType{ - Namespace_: existingNamespace, - }, - } - metrics, err := f.CollectMetrics(metricTypes) - So(err, ShouldBeNil) - So(metrics, ShouldNotBeEmpty) - So(len(metrics), ShouldEqual, 1) - - // check just one metric - metric := metrics[0] - So(metric.Namespace()[2], ShouldResemble, someFact) - So(metric.Data().(string), ShouldEqual, someValue) - }) - - Convey("ask for inappriopriate metrics", func() { - Convey("wrong number of parts", func() { - _, err := f.CollectMetrics( - []plugin.PluginMetricType{ - plugin.PluginMetricType{ - Namespace_: []string{"where are my other parts"}, - }, - }, - ) - So(err, ShouldNotBeNil) - So(err.Error(), ShouldContainSubstring, "segments") - }) - - Convey("wrong vendor", func() { - _, err := f.CollectMetrics( - []plugin.PluginMetricType{ - plugin.PluginMetricType{ - Namespace_: []string{"nonintelvendor", prefix, someFact}, - }, - }, - ) - So(err, ShouldNotBeNil) - So(err.Error(), ShouldContainSubstring, "expected vendor") - }) - - Convey("wrong prefix", func() { - _, err := f.CollectMetrics( - []plugin.PluginMetricType{ - plugin.PluginMetricType{ - Namespace_: []string{vendor, "this is wrong prefix", someFact}, - }, - }, - ) - So(err, ShouldNotBeNil) - So(err.Error(), ShouldContainSubstring, "expected prefix") - }) - - }) - }) -} - -func TestFacterInvalidBehavior(t *testing.T) { - - Convey("returns errors as expected when cmd isn't working", t, func() { - f := NewFacter() - // mock that getFacts returns error every time - f.getFacts = func(_ []string, _ time.Duration, _ *cmdConfig) (facts, error) { - return nil, errors.New("dummy error") - } - - _, err := f.CollectMetrics([]plugin.PluginMetricType{ - plugin.PluginMetricType{ - Namespace_: existingNamespace, - }, - }, - ) - So(err, ShouldNotBeNil) - - _, err = f.GetMetricTypes(plugin.PluginConfigType{ConfigDataNode: cdata.NewNode()}) - So(err, ShouldNotBeNil) - }) - Convey("returns not as much values as asked", t, func() { - - f := NewFacter() - // mock that getFacts returns error every time - //returns zero elements even when asked for one - f.getFacts = func(_ []string, _ time.Duration, _ *cmdConfig) (facts, error) { - return nil, nil - } - - _, err := f.CollectMetrics([]plugin.PluginMetricType{ - plugin.PluginMetricType{ - Namespace_: existingNamespace, - }, - }, - ) - So(err, ShouldNotBeNil) - So(err.Error(), ShouldContainSubstring, "more/less") - }) - -} - -func TestFacterGetMetricsTypes(t *testing.T) { - - Convey("GetMetricTypes functionallity", t, func() { - - f := NewFacter() - - Convey("GetMetricsTypes returns no error", func() { - // exectues without error - metricTypes, err := f.GetMetricTypes(plugin.PluginConfigType{ConfigDataNode: cdata.NewNode()}) - So(err, ShouldBeNil) - Convey("metricTypesReply should contain more than zero metrics", func() { - So(metricTypes, ShouldNotBeEmpty) - }) - - Convey("at least one metric contains metric namespace \"intel/facter/kernel\"", func() { - - expectedNamespaceStr := strings.Join(existingNamespace, "/") - - found := false - for _, metricType := range metricTypes { - // join because we cannot compare slices - if strings.Join(metricType.Namespace(), "/") == expectedNamespaceStr { - found = true - break - } - } - if !found { - t.Error("It was expected to find at least on intel/facter/kernel metricType (but it wasn't there)") - } - }) - }) - }) -} diff --git a/plugin/collector/pulse-collector-pcm/pcm/pcm.go b/plugin/collector/pulse-collector-pcm/pcm/pcm.go deleted file mode 100644 index fbf0f7247..000000000 --- a/plugin/collector/pulse-collector-pcm/pcm/pcm.go +++ /dev/null @@ -1,186 +0,0 @@ -/* -http://www.apache.org/licenses/LICENSE-2.0.txt - - -Copyright 2015 Intel Coporation - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package pcm - -import ( - "bufio" - "fmt" - "os" - "os/exec" - "path/filepath" - "strconv" - "strings" - "sync" - "time" - - "github.com/intelsdi-x/pulse/control/plugin" - "github.com/intelsdi-x/pulse/control/plugin/cpolicy" -) - -const ( - // Name of plugin - Name = "pcm" - // Version of plugin - Version = 1 - // Type of plugin - Type = plugin.CollectorPluginType -) - -// PCM -type PCM struct { - keys []string - data map[string]interface{} - mutex *sync.RWMutex -} - -func (p *PCM) Keys() []string { - return p.keys -} - -func (p *PCM) Data() map[string]interface{} { - return p.data -} - -// CollectMetrics returns metrics from pcm -func (p *PCM) CollectMetrics(mts []plugin.PluginMetricType) ([]plugin.PluginMetricType, error) { - metrics := make([]plugin.PluginMetricType, len(mts)) - p.mutex.RLock() - defer p.mutex.RUnlock() - hostname, _ := os.Hostname() - for i, m := range mts { - if v, ok := p.data[joinNamespace(m.Namespace())]; ok { - metrics[i] = plugin.PluginMetricType{ - Namespace_: m.Namespace(), - Data_: v, - Source_: hostname, - Timestamp_: time.Now(), - } - } - } - // fmt.Fprintf(os.Stderr, "collected >>> %+v\n", metrics) - return metrics, nil -} - -// GetMetricTypes returns the metric types exposed by pcm -func (p *PCM) GetMetricTypes(_ plugin.PluginConfigType) ([]plugin.PluginMetricType, error) { - mts := make([]plugin.PluginMetricType, len(p.keys)) - p.mutex.RLock() - defer p.mutex.RUnlock() - for i, k := range p.keys { - mts[i] = plugin.PluginMetricType{Namespace_: strings.Split(strings.TrimPrefix(k, "/"), "/")} - } - return mts, nil -} - -//GetConfigPolicy -func (p *PCM) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) { - c := cpolicy.New() - return c, nil -} - -func New() (*PCM, error) { - pcm := &PCM{mutex: &sync.RWMutex{}, data: map[string]interface{}{}} - var cmd *exec.Cmd - if path := os.Getenv("PULSE_PCM_PATH"); path != "" { - cmd = exec.Command(filepath.Join(path, "pcm.x"), "/csv", "-nc", "-r", "1") - } else { - c, err := exec.LookPath("pcm.x") - if err != nil { - fmt.Fprint(os.Stderr, "Unable to find PCM. Ensure it's in your path or set PULSE_PCM_PATH.") - panic(err) - } - cmd = exec.Command(c, "/csv", "-nc", "-r", "1") - } - - cmdReader, err := cmd.StdoutPipe() - if err != nil { - fmt.Fprintln(os.Stderr, "Error creating StdoutPipe", err) - return nil, err - } - // read the data from stdout - scanner := bufio.NewScanner(cmdReader) - go func() { - first := true - for scanner.Scan() { - if first { - first = false - continue - } - if len(pcm.keys) == 0 { - pcm.mutex.Lock() - keys := strings.Split(strings.TrimSuffix(scanner.Text(), ";"), ";") - //skip the date and time fields - pcm.keys = make([]string, len(keys[2:])) - for i, k := range keys[2:] { - pcm.keys[i] = fmt.Sprintf("/intel/pcm/%s", k) - } - pcm.mutex.Unlock() - continue - } - - pcm.mutex.Lock() - datal := strings.Split(strings.TrimSuffix(scanner.Text(), ";"), ";") - for i, d := range datal[2:] { - v, err := strconv.ParseFloat(strings.TrimSpace(d), 64) - if err == nil { - pcm.data[pcm.keys[i]] = v - } else { - panic(err) - } - } - pcm.mutex.Unlock() - // fmt.Fprintf(os.Stderr, "data >>> %+v\n", pcm.data) - // fmt.Fprintf(os.Stdout, "data >>> %+v\n", pcm.data) - } - }() - - err = cmd.Start() - if err != nil { - fmt.Fprintln(os.Stderr, "Error starting pcm", err) - return nil, err - } - - // we need to wait until we have our metric types - st := time.Now() - for { - pcm.mutex.RLock() - c := len(pcm.keys) - pcm.mutex.RUnlock() - if c > 0 { - break - } - if time.Since(st) > time.Second*2 { - return nil, fmt.Errorf("Timed out waiting for metrics from pcm") - } - } - - // LEAVE the following block for debugging - // err = cmd.Wait() - // if err != nil { - // fmt.Fprintln(os.Stderr, "Error waiting for pcm", err) - // return nil, err - // } - - return pcm, nil -} - -func joinNamespace(ns []string) string { - return "/" + strings.Join(ns, "/") -} diff --git a/plugin/collector/pulse-collector-perfevents/perfevents/perfevents.go b/plugin/collector/pulse-collector-perfevents/perfevents/perfevents.go deleted file mode 100644 index 347b341da..000000000 --- a/plugin/collector/pulse-collector-perfevents/perfevents/perfevents.go +++ /dev/null @@ -1,282 +0,0 @@ -/* -http://www.apache.org/licenses/LICENSE-2.0.txt - - -Copyright 2015 Intel Coporation - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package perfevents - -import ( - "bufio" - "errors" - "fmt" - "os" - "os/exec" - "path/filepath" - "strconv" - "strings" - "time" - - "github.com/intelsdi-x/pulse/control/plugin" - "github.com/intelsdi-x/pulse/control/plugin/cpolicy" -) - -const ( - // Name of plugin - Name = "perfevents" - // Version of plugin - Version = 1 - // Type of plugin - Type = plugin.CollectorPluginType - // Namespace definition - ns_vendor = "intel" - ns_class = "linux" - ns_type = "perfevents" - ns_subtype = "cgroup" -) - -type event struct { - id string - etype string - value uint64 -} - -type Perfevents struct { - cgroup_events []event - Init func() error -} - -var CGROUP_EVENTS = []string{"cycles", "instructions", "cache-references", "cache-misses", - "branch-instructions", "branch-misses", "stalled-cycles-frontend", - "stalled-cycles-backend", "ref-cycles"} - -// CollectMetrics returns HW metrics from perf events subsystem -// for Cgroups present on the host. -func (p *Perfevents) CollectMetrics(mts []plugin.PluginMetricType) ([]plugin.PluginMetricType, error) { - if len(mts) == 0 { - return nil, nil - } - events := []string{} - cgroups := []string{} - - // Get list of events and cgroups from Namespace - // Replace "_" with "/" in cgroup name - for _, m := range mts { - err := validateNamespace(m.Namespace()) - if err != nil { - return nil, err - } - events = append(events, m.Namespace()[4]) - cgroups = append(cgroups, strings.Replace(m.Namespace()[5], "_", "/", -1)) - } - - // Prepare events (-e) and Cgroups (-G) switches for "perf stat" - cgroups_switch := "-G" + strings.Join(cgroups, ",") - events_switch := "-e" + strings.Join(events, ",") - - // Prepare "perf stat" command - cmd := exec.Command("perf", "stat", "--log-fd", "1", `-x;`, "-a", events_switch, cgroups_switch, "--", "sleep", "1") - - cmdReader, err := cmd.StdoutPipe() - if err != nil { - fmt.Fprintln(os.Stderr, "Error creating StdoutPipe", err) - return nil, err - } - - // Parse "perf stat" output - p.cgroup_events = make([]event, len(mts)) - scanner := bufio.NewScanner(cmdReader) - go func() { - for i := 0; scanner.Scan(); i++ { - line := strings.Split(scanner.Text(), ";") - value, err := strconv.ParseUint(line[0], 10, 64) - if err != nil { - fmt.Fprintln(os.Stderr, "Invalid metric value", err) - } - etype := line[2] - id := line[3] - e := event{id: id, etype: etype, value: value} - p.cgroup_events[i] = e - } - }() - - // Run command and wait (up to 2 secs) for completion - err = cmd.Start() - if err != nil { - fmt.Fprintln(os.Stderr, "Error starting perf stat", err) - return nil, err - } - - st := time.Now() - for { - if len(p.cgroup_events) == cap(p.cgroup_events) { - break - } - if time.Since(st) > time.Second*2 { - return nil, fmt.Errorf("Timed out waiting for metrics from perf stat") - } - } - err = cmd.Wait() - if err != nil { - fmt.Fprintln(os.Stderr, "Error waiting for perf stat", err) - return nil, err - } - - // Populate metrics - metrics := make([]plugin.PluginMetricType, len(mts)) - i := 0 - for _, m := range mts { - metric, err := populate_metric(m.Namespace(), p.cgroup_events[i]) - if err != nil { - return nil, err - } - metrics[i] = *metric - metrics[i].Source_, _ = os.Hostname() - i++ - } - return metrics, nil -} - -// GetMetricTypes returns the metric types exposed by perf events subsystem -func (p *Perfevents) GetMetricTypes(_ plugin.PluginConfigType) ([]plugin.PluginMetricType, error) { - err := p.Init() - if err != nil { - return nil, err - } - cgroups, err := list_cgroups() - if err != nil { - return nil, err - } - if len(cgroups) == 0 { - return nil, nil - } - mts := []plugin.PluginMetricType{} - mts = append(mts, set_supported_metrics(ns_subtype, cgroups, CGROUP_EVENTS)...) - - return mts, nil -} - -// GetConfigPolicy returns a ConfigPolicy -func (p *Perfevents) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) { - c := cpolicy.New() - return c, nil -} - -// New initializes Perfevents plugin -func NewPerfevents() *Perfevents { - return &Perfevents{Init: initialize} -} - -func initialize() error { - file, err := os.Open("/proc/sys/kernel/perf_event_paranoid") - if err != nil { - if os.IsExist(err) { - return errors.New("perf_event_paranoid file exists but couldn't be opened") - } - return errors.New("perf event system not enabled") - } - - scanner := bufio.NewScanner(file) - if !scanner.Scan() { - return errors.New("cannot read from perf_event_paranoid") - } - - i, err := strconv.ParseInt(scanner.Text(), 10, 64) - if err != nil { - return errors.New("invalid value in perf_event_paranoid file") - } - - if i >= 1 { - return errors.New("insufficient perf event subsystem capabilities") - } - return nil -} - -func set_supported_metrics(source string, cgroups []string, events []string) []plugin.PluginMetricType { - mts := make([]plugin.PluginMetricType, len(events)*len(cgroups)) - for _, e := range events { - for _, c := range flatten_cg_name(cgroups) { - mts = append(mts, plugin.PluginMetricType{Namespace_: []string{ns_vendor, ns_class, ns_type, source, e, c}}) - } - } - return mts -} -func flatten_cg_name(cg []string) []string { - flat_cg := []string{} - for _, c := range cg { - flat_cg = append(flat_cg, strings.Replace(c, "/", "_", -1)) - } - return flat_cg -} - -func populate_metric(ns []string, e event) (*plugin.PluginMetricType, error) { - return &plugin.PluginMetricType{ - Namespace_: ns, - Data_: e.value, - Timestamp_: time.Now(), - }, nil -} - -func list_cgroups() ([]string, error) { - cgroups := []string{} - base_path := "/sys/fs/cgroup/perf_event/" - err := filepath.Walk(base_path, func(path string, info os.FileInfo, _ error) error { - if info.IsDir() { - cgroup_name := strings.TrimPrefix(path, base_path) - if len(cgroup_name) > 0 { - cgroups = append(cgroups, cgroup_name) - } - } - return nil - - }) - if err != nil { - return nil, err - } - return cgroups, nil -} - -func validateNamespace(namespace []string) error { - if len(namespace) != 6 { - return errors.New(fmt.Sprintf("unknown metricType %s (should containt exactly 6 segments)", namespace)) - } - if namespace[0] != ns_vendor { - return errors.New(fmt.Sprintf("unknown metricType %s (expected 1st segment %s)", namespace, ns_vendor)) - } - - if namespace[1] != ns_class { - return errors.New(fmt.Sprintf("unknown metricType %s (expected 2nd segment %s)", namespace, ns_class)) - } - if namespace[2] != ns_type { - return errors.New(fmt.Sprintf("unknown metricType %s (expected 3rd segment %s)", namespace, ns_type)) - } - if namespace[3] != ns_subtype { - return errors.New(fmt.Sprintf("unknown metricType %s (expected 4th segment %s)", namespace, ns_subtype)) - } - if !namespaceContains(namespace[4], CGROUP_EVENTS) { - return errors.New(fmt.Sprintf("unknown metricType %s (expected 5th segment %v)", namespace, CGROUP_EVENTS)) - } - return nil -} - -func namespaceContains(element string, slice []string) bool { - for _, v := range slice { - if v == element { - return true - } - } - return false -} diff --git a/plugin/collector/pulse-collector-perfevents/perfevents/perfevents_test.go b/plugin/collector/pulse-collector-perfevents/perfevents/perfevents_test.go deleted file mode 100644 index 80addeb9b..000000000 --- a/plugin/collector/pulse-collector-perfevents/perfevents/perfevents_test.go +++ /dev/null @@ -1,150 +0,0 @@ -/* -http://www.apache.org/licenses/LICENSE-2.0.txt - - -Copyright 2015 Intel Coporation - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package perfevents - -import ( - "errors" - "fmt" - "os" - "path" - "testing" - - "github.com/intelsdi-x/pulse/control/plugin" - "github.com/intelsdi-x/pulse/core/cdata" - "github.com/intelsdi-x/pulse/plugin/helper" - . "github.com/smartystreets/goconvey/convey" -) - -var ( - PluginName = "pulse-collector-perfevents" - PluginType = "collector" - PluginVersion = 1 - PulsePath = os.Getenv("PULSE_PATH") - PluginPath = path.Join(PulsePath, "plugin", PluginName) -) - -func TestPluginLoads(t *testing.T) { - // These tests only work if PULSE_PATH is known. - // It is the responsibility of the testing framework to - // build the plugins first into the build dir. - if PulsePath != "" { - // Helper plugin trigger build if possible for this plugin - helper.BuildPlugin(PluginType, PluginName) - // - Convey("GetMetricTypes functionality", t, func() { - p := NewPerfevents() - Convey("invalid init", func() { - p.Init = func() error { return errors.New("error") } - _, err := p.GetMetricTypes(plugin.PluginConfigType{cdata.NewNode()}) - So(err, ShouldNotBeNil) - }) - Convey("set_supported_metrics", func() { - cg := []string{"cgroup1", "cgroup2", "cgroup3"} - events := []string{"event1", "event2", "event3"} - a := set_supported_metrics(ns_subtype, cg, events) - So(a[len(a)-1].Namespace_, ShouldResemble, []string{ns_vendor, ns_class, ns_type, ns_subtype, "event3", "cgroup3"}) - }) - Convey("flatten cgroup name", func() { - cg := []string{"cg_root/cg_sub1/cg_sub2"} - events := []string{"event"} - a := set_supported_metrics(ns_subtype, cg, events) - So(a[len(a)-1].Namespace_, ShouldContain, "cg_root_cg_sub1_cg_sub2") - }) - }) - Convey("CollectMetrics error cases", t, func() { - p := NewPerfevents() - Convey("empty list of requested metrics", func() { - metricTypes := []plugin.PluginMetricType{} - metrics, err := p.CollectMetrics(metricTypes) - So(err, ShouldBeNil) - So(metrics, ShouldBeEmpty) - }) - Convey("namespace too short", func() { - _, err := p.CollectMetrics( - []plugin.PluginMetricType{ - plugin.PluginMetricType{ - Namespace_: []string{"invalid"}, - }, - }, - ) - So(err, ShouldNotBeNil) - So(err.Error(), ShouldContainSubstring, "segments") - }) - Convey("namespace wrong vendor", func() { - _, err := p.CollectMetrics( - []plugin.PluginMetricType{ - plugin.PluginMetricType{ - Namespace_: []string{"invalid", ns_class, ns_type, ns_subtype, "cycles", "A"}, - }, - }, - ) - So(err, ShouldNotBeNil) - So(err.Error(), ShouldContainSubstring, "1st") - }) - Convey("namespace wrong class", func() { - _, err := p.CollectMetrics( - []plugin.PluginMetricType{ - plugin.PluginMetricType{ - Namespace_: []string{ns_vendor, "invalid", ns_type, ns_subtype, "cycles", "A"}, - }, - }, - ) - So(err, ShouldNotBeNil) - So(err.Error(), ShouldContainSubstring, "2nd") - }) - Convey("namespace wrong type", func() { - _, err := p.CollectMetrics( - []plugin.PluginMetricType{ - plugin.PluginMetricType{ - Namespace_: []string{ns_vendor, ns_class, "invalid", ns_subtype, "cycles", "A"}, - }, - }, - ) - So(err, ShouldNotBeNil) - So(err.Error(), ShouldContainSubstring, "3rd") - }) - Convey("namespace wrong subtype", func() { - _, err := p.CollectMetrics( - []plugin.PluginMetricType{ - plugin.PluginMetricType{ - Namespace_: []string{ns_vendor, ns_class, ns_type, "invalid", "cycles", "A"}, - }, - }, - ) - So(err, ShouldNotBeNil) - So(err.Error(), ShouldContainSubstring, "4th") - }) - Convey("namespace wrong event", func() { - _, err := p.CollectMetrics( - []plugin.PluginMetricType{ - plugin.PluginMetricType{ - Namespace_: []string{ns_vendor, ns_class, ns_type, ns_subtype, "invalid", "A"}, - }, - }, - ) - So(err, ShouldNotBeNil) - So(err.Error(), ShouldContainSubstring, "5th") - }) - - }) - } else { - fmt.Printf("PULSE_PATH not set. Cannot test %s plugin.\n", PluginName) - } -} diff --git a/plugin/collector/pulse-collector-psutil/psutil/psutil.go b/plugin/collector/pulse-collector-psutil/psutil/psutil.go deleted file mode 100644 index 49ecab73f..000000000 --- a/plugin/collector/pulse-collector-psutil/psutil/psutil.go +++ /dev/null @@ -1,129 +0,0 @@ -/* -http://www.apache.org/licenses/LICENSE-2.0.txt - - -Copyright 2015 Intel Coporation - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package psutil - -import ( - "bytes" - "encoding/json" - "fmt" - "os" - "regexp" - "strings" - "time" - - "github.com/intelsdi-x/pulse/control/plugin" - "github.com/intelsdi-x/pulse/control/plugin/cpolicy" -) - -const ( - // Name of plugin - Name = "psutil" - // Version of plugin - Version = 1 - // Type of plugin - Type = plugin.CollectorPluginType -) - -type Psutil struct { -} - -// CollectMetrics returns metrics from gopsutil -func (p *Psutil) CollectMetrics(mts []plugin.PluginMetricType) ([]plugin.PluginMetricType, error) { - hostname, _ := os.Hostname() - metrics := make([]plugin.PluginMetricType, len(mts)) - loadre := regexp.MustCompile(`^/psutil/load/load[1,5,15]`) - cpure := regexp.MustCompile(`^/psutil/cpu.*/.*`) - memre := regexp.MustCompile(`^/psutil/vm/.*`) - netre := regexp.MustCompile(`^/psutil/net/.*`) - - for i, p := range mts { - ns := joinNamespace(p.Namespace()) - switch { - case loadre.MatchString(ns): - metric, err := loadAvg(p.Namespace()) - if err != nil { - return nil, err - } - metrics[i] = *metric - case cpure.MatchString(ns): - metric, err := cpuTimes(p.Namespace()) - if err != nil { - return nil, err - } - metrics[i] = *metric - case memre.MatchString(ns): - metric, err := virtualMemory(p.Namespace()) - if err != nil { - return nil, err - } - metrics[i] = *metric - case netre.MatchString(ns): - metric, err := netIOCounters(p.Namespace()) - if err != nil { - return nil, err - } - metrics[i] = *metric - } - metrics[i].Source_ = hostname - metrics[i].Timestamp_ = time.Now() - - } - return metrics, nil -} - -// GetMetricTypes returns the metric types exposed by gopsutil -func (p *Psutil) GetMetricTypes(_ plugin.PluginConfigType) ([]plugin.PluginMetricType, error) { - mts := []plugin.PluginMetricType{} - - mts = append(mts, getLoadAvgMetricTypes()...) - mts_, err := getCPUTimesMetricTypes() - if err != nil { - return nil, err - } - mts = append(mts, mts_...) - mts = append(mts, getVirtualMemoryMetricTypes()...) - mts_, err = getNetIOCounterMetricTypes() - if err != nil { - return nil, err - } - mts = append(mts, mts_...) - - return mts, nil -} - -//GetConfigPolicy returns a ConfigPolicy -func (p *Psutil) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) { - c := cpolicy.New() - return c, nil -} - -func joinNamespace(ns []string) string { - return "/" + strings.Join(ns, "/") -} - -func prettyPrint(mts []plugin.PluginMetricType) error { - var out bytes.Buffer - mtsb, _, _ := plugin.MarshalPluginMetricTypes(plugin.PulseJSONContentType, mts) - if err := json.Indent(&out, mtsb, "", " "); err != nil { - return err - } - fmt.Println(out.String()) - return nil -} diff --git a/plugin/collector/pulse-collector-psutil/psutil/psutil_integration_test.go b/plugin/collector/pulse-collector-psutil/psutil/psutil_integration_test.go deleted file mode 100644 index b1a221752..000000000 --- a/plugin/collector/pulse-collector-psutil/psutil/psutil_integration_test.go +++ /dev/null @@ -1,67 +0,0 @@ -/* -http://www.apache.org/licenses/LICENSE-2.0.txt - - -Copyright 2015 Intel Coporation - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package psutil - -import ( - "runtime" - "testing" - - "github.com/intelsdi-x/pulse/control/plugin" - "github.com/intelsdi-x/pulse/core/cdata" - . "github.com/smartystreets/goconvey/convey" -) - -func TestPsutilCollectMetrics(t *testing.T) { - Convey("psutil collector", t, func() { - p := &Psutil{} - Convey("collect metrics", func() { - mts := []plugin.PluginMetricType{ - plugin.PluginMetricType{ - Namespace_: []string{"psutil", "load", "load1"}, - }, - plugin.PluginMetricType{ - Namespace_: []string{"psutil", "load", "load5"}, - }, - plugin.PluginMetricType{ - Namespace_: []string{"psutil", "load", "load15"}, - }, - plugin.PluginMetricType{ - Namespace_: []string{"psutil", "vm", "total"}, - }, - } - if runtime.GOOS != "darwin" { - mts = append(mts, plugin.PluginMetricType{ - Namespace_: []string{"psutil", "cpu0", "user"}, - }) - } - metrics, err := p.CollectMetrics(mts) - //prettyPrint(metrics) - So(err, ShouldBeNil) - So(metrics, ShouldNotBeNil) - }) - Convey("get metric types", func() { - mts, err := p.GetMetricTypes(plugin.PluginConfigType{cdata.NewNode()}) - //prettyPrint(mts) - So(err, ShouldBeNil) - So(mts, ShouldNotBeNil) - }) - - }) -} diff --git a/scripts/Dockerfile b/scripts/Dockerfile index d17c3bce6..25ba0b746 100644 --- a/scripts/Dockerfile +++ b/scripts/Dockerfile @@ -10,6 +10,7 @@ ADD . /go/src/github.com/intelsdi-x/pulse RUN go get github.com/tools/godep && \ go get golang.org/x/tools/cmd/goimports && \ go get golang.org/x/tools/cmd/vet && \ - go get golang.org/x/tools/cmd/cover + go get golang.org/x/tools/cmd/cover && \ + go get github.com/smartystreets/goconvey RUN scripts/deps.sh RUN make