diff --git a/control/config.go b/control/config.go new file mode 100644 index 000000000..83dc04f2f --- /dev/null +++ b/control/config.go @@ -0,0 +1,109 @@ +package control + +import ( + "fmt" + + "github.com/intelsdi-x/pulse/control/plugin" + "github.com/intelsdi-x/pulse/core/cdata" + "github.com/intelsdi-x/pulse/core/ctypes" +) + +// type configData map[string]*cdata.ConfigDataNode + +type pluginConfig struct { + all *cdata.ConfigDataNode + collector map[string]*pluginConfigItem + publisher map[string]*pluginConfigItem + processor map[string]*pluginConfigItem + pluginCache map[string]*cdata.ConfigDataNode +} + +type pluginConfigItem struct { + *cdata.ConfigDataNode + versions map[int]*cdata.ConfigDataNode +} + +type config struct { + control *cdata.ConfigDataNode + scheduler *cdata.ConfigDataNode + plugins *pluginConfig +} + +func newConfig() *config { + return &config{ + control: cdata.NewNode(), + scheduler: cdata.NewNode(), + plugins: newPluginConfig(), + } +} + +func newPluginConfig() *pluginConfig { + return &pluginConfig{ + all: cdata.NewNode(), + collector: make(map[string]*pluginConfigItem), + processor: make(map[string]*pluginConfigItem), + publisher: make(map[string]*pluginConfigItem), + pluginCache: make(map[string]*cdata.ConfigDataNode), + } +} + +func newPluginConfigItem(opts ...pluginConfigOpt) *pluginConfigItem { + p := &pluginConfigItem{ + ConfigDataNode: cdata.NewNode(), + versions: make(map[int]*cdata.ConfigDataNode), + } + + for _, opt := range opts { + opt(p) + } + + return p +} + +type pluginConfigOpt func(*pluginConfigItem) + +func optAddPluginConfigItem(key string, value ctypes.ConfigValue) pluginConfigOpt { + return func(p *pluginConfigItem) { + p.AddItem(key, value) + } +} + +func (p *pluginConfig) get(pluginType plugin.PluginType, name string, ver int) *cdata.ConfigDataNode { + // check cache + key := fmt.Sprintf("%d:%s:%d", pluginType, name, ver) + if res, ok := p.pluginCache[key]; ok { + return res + } + + //todo process/interpolate values + + p.pluginCache[key] = cdata.NewNode() + p.pluginCache[key].Merge(p.all) + + // check for plugin config + switch pluginType { + case plugin.CollectorPluginType: + if res, ok := p.collector[name]; ok { + p.pluginCache[key].Merge(res.ConfigDataNode) + if res2, ok2 := res.versions[ver]; ok2 { + p.pluginCache[key].Merge(res2) + } + } + case plugin.ProcessorPluginType: + if res, ok := p.processor[name]; ok { + p.pluginCache[key].Merge(res.ConfigDataNode) + if res2, ok2 := res.versions[ver]; ok2 { + p.pluginCache[key].Merge(res2) + } + } + case plugin.PublisherPluginType: + if res, ok := p.publisher[name]; ok { + p.pluginCache[key].Merge(res.ConfigDataNode) + if res2, ok2 := res.versions[ver]; ok2 { + p.pluginCache[key].Merge(res2) + } + } + } + + return p.pluginCache[key] +} diff --git a/control/config_test.go b/control/config_test.go new file mode 100644 index 000000000..0b339177b --- /dev/null +++ b/control/config_test.go @@ -0,0 +1,35 @@ +package control + +import ( + "testing" + + "github.com/intelsdi-x/pulse/control/plugin" + "github.com/intelsdi-x/pulse/core/cdata" + "github.com/intelsdi-x/pulse/core/ctypes" + . "github.com/smartystreets/goconvey/convey" +) + +func TestPluginConfig(t *testing.T) { + Convey("Given a plugin config", t, func() { + cfg := newConfig() + So(cfg, ShouldNotBeNil) + Convey("with an entry for ALL plugins", func() { + cfg.plugins.all.AddItem("gvar", ctypes.ConfigValueBool{Value: true}) + So(len(cfg.plugins.all.Table()), ShouldEqual, 1) + Convey("an entry for a specific plugin of any version", func() { + cfg.plugins.collector["test"] = newPluginConfigItem(optAddPluginConfigItem("pvar", ctypes.ConfigValueBool{Value: true})) + So(len(cfg.plugins.collector["test"].Table()), ShouldEqual, 1) + Convey("and an entry for a specific plugin of a specific version", func() { + cfg.plugins.collector["test"].versions[1] = cdata.NewNode() + cfg.plugins.collector["test"].versions[1].AddItem("vvar", ctypes.ConfigValueBool{Value: true}) + So(len(cfg.plugins.collector["test"].versions[1].Table()), ShouldEqual, 1) + Convey("we can get the merged conf for the given plugin", func() { + cd := cfg.plugins.get(plugin.CollectorPluginType, "test", 1) + So(len(cd.Table()), ShouldEqual, 3) + }) + }) + }) + }) + + }) +} diff --git a/control/control.go b/control/control.go index 7d3eccf2c..0c2142a9c 100644 --- a/control/control.go +++ b/control/control.go @@ -58,6 +58,7 @@ type pluginControl struct { // TODO, going to need coordination on changing of these RunningPlugins executablePlugins Started bool + config *config autodiscoverPaths []string eventManager *gomit.EventController @@ -129,6 +130,7 @@ func CacheExpiration(t time.Duration) controlOpt { func New(opts ...controlOpt) *pluginControl { c := &pluginControl{} + c.config = newConfig() // Initialize components // // Event Manager @@ -145,7 +147,7 @@ func New(opts ...controlOpt) *pluginControl { }).Debug("metric catalog created") // Plugin Manager - c.pluginManager = newPluginManager() + c.pluginManager = newPluginManager(OptSetPluginConfig(c.config.plugins)) controlLogger.WithFields(log.Fields{ "_block": "new", }).Debug("plugin manager created") @@ -680,6 +682,11 @@ func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time. wg.Add(1) + // merge global plugin config into the config for the metric + for _, mt := range pmt.metricTypes { + mt.Config().Merge(p.config.plugins.get(plugin.CollectorPluginType, ap.Name(), ap.Version())) + } + // get a metrics go func(mt []core.Metric) { mts, err := cli.CollectMetrics(mt) @@ -747,7 +754,13 @@ func (p *pluginControl) PublishMetrics(contentType string, content []byte, plugi return []error{errors.New("unable to cast client to PluginPublisherClient")} } - errp := cli.Publish(contentType, content, config) + // merge global plugin config into the config for this request + cfg := p.config.plugins.get(plugin.PublisherPluginType, ap.Name(), ap.Version()).Table() + for k, v := range config { + cfg[k] = v + } + + errp := cli.Publish(contentType, content, cfg) if errp != nil { return []error{errp} } @@ -783,7 +796,14 @@ func (p *pluginControl) ProcessMetrics(contentType string, content []byte, plugi return "", nil, []error{errors.New("unable to cast client to PluginProcessorClient")} } - ct, c, errp := cli.Process(contentType, content, config) + // merge global plugin config into the config for this request + cfg := p.config.plugins.get(plugin.ProcessorPluginType, ap.Name(), ap.Version()).Table() + + for k, v := range config { + cfg[k] = v + } + + ct, c, errp := cli.Process(contentType, content, cfg) if errp != nil { return "", nil, []error{errp} } diff --git a/control/control_test.go b/control/control_test.go index 6d6a44199..c5c88a870 100644 --- a/control/control_test.go +++ b/control/control_test.go @@ -66,6 +66,7 @@ func (m *MockPluginManagerBadSwap) UnloadPlugin(c core.Plugin) (*loadedPlugin, p } func (m *MockPluginManagerBadSwap) get(string) (*loadedPlugin, error) { return nil, nil } func (m *MockPluginManagerBadSwap) teardown() {} +func (m *MockPluginManagerBadSwap) setPluginConfig(*pluginConfig) {} func (m *MockPluginManagerBadSwap) SetMetricCatalog(catalogsMetrics) {} func (m *MockPluginManagerBadSwap) SetEmitter(gomit.Emitter) {} func (m *MockPluginManagerBadSwap) GenerateArgs(string) plugin.Arg { return plugin.Arg{} } @@ -686,8 +687,15 @@ func TestCollectMetrics(t *testing.T) { c.pluginRunner.(*runner).monitor.duration = time.Millisecond * 100 c.Start() time.Sleep(100 * time.Millisecond) + + // Add a global plugin config + c.config.plugins.collector["dummy1"] = newPluginConfigItem(optAddPluginConfigItem("test", ctypes.ConfigValueBool{Value: true})) + // Load plugin c.Load(PluginPath) + mts, err := c.MetricCatalog() + So(err, ShouldBeNil) + So(len(mts), ShouldEqual, 3) cd := cdata.NewNode() cd.AddItem("password", ctypes.ConfigValueStr{Value: "testval"}) @@ -701,6 +709,10 @@ func TestCollectMetrics(t *testing.T) { namespace: []string{"intel", "dummy", "bar"}, cfg: cd, } + m3 := MockMetricType{ + namespace: []string{"intel", "dummy", "test"}, + cfg: cd, + } // retrieve loaded plugin lp, err := c.pluginManager.get("collector:dummy1:1") @@ -711,7 +723,7 @@ func TestCollectMetrics(t *testing.T) { pool.subscribe("1", unboundSubscriptionType) err = c.sendPluginSubscriptionEvent("1", lp) So(err, ShouldBeNil) - m = append(m, m1, m2) + m = append(m, m1, m2, m3) time.Sleep(time.Millisecond * 900) @@ -720,6 +732,7 @@ func TestCollectMetrics(t *testing.T) { So(err, ShouldBeNil) for i := range cr { So(cr[i].Data(), ShouldContainSubstring, "The dummy collected data!") + So(cr[i].Data(), ShouldContainSubstring, "test=true") } // fmt.Printf(" * Collect Response: %+v\n", cr) } @@ -838,6 +851,7 @@ func TestProcessMetrics(t *testing.T) { c.pluginRunner.(*runner).monitor.duration = time.Millisecond * 100 c.Start() time.Sleep(1 * time.Second) + c.config.plugins.processor["passthru"] = newPluginConfigItem(optAddPluginConfigItem("test", ctypes.ConfigValueBool{Value: true})) // Load plugin _, err := c.Load(path.Join(PulsePath, "plugin", "pulse-processor-passthru")) @@ -873,8 +887,13 @@ func TestProcessMetrics(t *testing.T) { enc := gob.NewEncoder(&buf) enc.Encode(metrics) contentType := plugin.PulseGOBContentType - cnt, ct, errs := c.ProcessMetrics(contentType, buf.Bytes(), "passthru", 1, n.Table()) - fmt.Printf("%v %v", cnt, ct) + _, ct, errs := c.ProcessMetrics(contentType, buf.Bytes(), "passthru", 1, n.Table()) + So(errs, ShouldBeEmpty) + mts := []plugin.PluginMetricType{} + dec := gob.NewDecoder(bytes.NewBuffer(ct)) + err := dec.Decode(&mts) + So(err, ShouldBeNil) + So(mts[0].Data_, ShouldEqual, 2) So(errs, ShouldBeNil) }) }) diff --git a/control/plugin/client/client.go b/control/plugin/client/client.go index 4cbd5c2b4..812f16fe6 100644 --- a/control/plugin/client/client.go +++ b/control/plugin/client/client.go @@ -20,6 +20,7 @@ limitations under the License. package client import ( + "github.com/intelsdi-x/pulse/control/plugin" "github.com/intelsdi-x/pulse/control/plugin/cpolicy" "github.com/intelsdi-x/pulse/core" "github.com/intelsdi-x/pulse/core/ctypes" @@ -37,7 +38,7 @@ type PluginClient interface { type PluginCollectorClient interface { PluginClient CollectMetrics([]core.Metric) ([]core.Metric, error) - GetMetricTypes() ([]core.Metric, error) + GetMetricTypes(plugin.PluginConfigType) ([]core.Metric, error) } // PluginProcessorClient A client providing processor specific plugin method calls. diff --git a/control/plugin/client/httpjsonrpc.go b/control/plugin/client/httpjsonrpc.go index 19fd011d5..007381016 100644 --- a/control/plugin/client/httpjsonrpc.go +++ b/control/plugin/client/httpjsonrpc.go @@ -200,8 +200,15 @@ func (h *httpJSONRPCClient) CollectMetrics(mts []core.Metric) ([]core.Metric, er } // GetMetricTypes returns metric types that can be collected -func (h *httpJSONRPCClient) GetMetricTypes() ([]core.Metric, error) { - res, err := h.call("Collector.GetMetricTypes", []interface{}{}) +func (h *httpJSONRPCClient) GetMetricTypes(config plugin.PluginConfigType) ([]core.Metric, error) { + args := plugin.GetMetricTypesArgs{PluginConfig: config} + + out, err := h.encoder.Encode(args) + if err != nil { + return nil, err + } + + res, err := h.call("Collector.GetMetricTypes", []interface{}{out}) if err != nil { return nil, err } @@ -322,5 +329,8 @@ func (h *httpJSONRPCClient) call(method string, args []interface{}) (*jsonRpcRes return nil, err } atomic.AddUint64(&h.id, 1) + if result.Error != "" { + return result, errors.New(result.Error) + } return result, nil } diff --git a/control/plugin/client/httpjsonrpc_test.go b/control/plugin/client/httpjsonrpc_test.go index c74e8d143..3d3a40e29 100644 --- a/control/plugin/client/httpjsonrpc_test.go +++ b/control/plugin/client/httpjsonrpc_test.go @@ -195,7 +195,8 @@ func TestHTTPJSONRPC(t *testing.T) { }) Convey("GetMetricTypes", func() { - mts, err := c.GetMetricTypes() + cfg := plugin.PluginConfigType{} + mts, err := c.GetMetricTypes(cfg) So(err, ShouldBeNil) So(mts, ShouldNotBeNil) So(mts, ShouldHaveSameTypeAs, []core.Metric{}) diff --git a/control/plugin/client/native.go b/control/plugin/client/native.go index a190505b9..11e4058f4 100644 --- a/control/plugin/client/native.go +++ b/control/plugin/client/native.go @@ -195,9 +195,17 @@ func (p *PluginNativeClient) CollectMetrics(coreMetricTypes []core.Metric) ([]co return fromCache, nil } -func (p *PluginNativeClient) GetMetricTypes() ([]core.Metric, error) { +func (p *PluginNativeClient) GetMetricTypes(config plugin.PluginConfigType) ([]core.Metric, error) { var reply []byte - err := p.connection.Call("Collector.GetMetricTypes", []byte{}, &reply) + + args := plugin.GetMetricTypesArgs{PluginConfig: config} + + out, err := p.encoder.Encode(args) + if err != nil { + return nil, err + } + + err = p.connection.Call("Collector.GetMetricTypes", out, &reply) if err != nil { return nil, err } @@ -272,6 +280,7 @@ func init() { gob.Register(*(&ctypes.ConfigValueStr{})) gob.Register(*(&ctypes.ConfigValueInt{})) gob.Register(*(&ctypes.ConfigValueFloat{})) + gob.Register(*(&ctypes.ConfigValueBool{})) gob.Register(cpolicy.NewPolicyNode()) gob.Register(&cpolicy.StringRule{}) diff --git a/control/plugin/collector.go b/control/plugin/collector.go index 8bd4b580b..4ee3b866a 100644 --- a/control/plugin/collector.go +++ b/control/plugin/collector.go @@ -26,5 +26,5 @@ package plugin type CollectorPlugin interface { Plugin CollectMetrics([]PluginMetricType) ([]PluginMetricType, error) - GetMetricTypes() ([]PluginMetricType, error) + GetMetricTypes(PluginConfigType) ([]PluginMetricType, error) } diff --git a/control/plugin/collector_proxy.go b/control/plugin/collector_proxy.go index 5bfdd0385..8430613ae 100644 --- a/control/plugin/collector_proxy.go +++ b/control/plugin/collector_proxy.go @@ -45,6 +45,7 @@ type CollectMetricsReply struct { // GetMetricTypesArgs args passed to GetMetricTypes type GetMetricTypesArgs struct { + PluginConfig PluginConfigType } // GetMetricTypesReply assigned by GetMetricTypes() implementation @@ -64,7 +65,10 @@ func (c *collectorPluginProxy) GetMetricTypes(args []byte, reply *[]byte) error // Reset heartbeat c.Session.ResetHeartbeat() - mts, err := c.Plugin.GetMetricTypes() + dargs := &GetMetricTypesArgs{} + c.Session.Decode(args, dargs) + + mts, err := c.Plugin.GetMetricTypes(dargs.PluginConfig) if err != nil { return errors.New(fmt.Sprintf("GetMetricTypes call error : %s", err.Error())) } diff --git a/control/plugin/collector_proxy_test.go b/control/plugin/collector_proxy_test.go index c14b114b6..d436a7457 100644 --- a/control/plugin/collector_proxy_test.go +++ b/control/plugin/collector_proxy_test.go @@ -40,7 +40,7 @@ var mockPluginMetricType []PluginMetricType = []PluginMetricType{ *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", 2), } -func (p *mockPlugin) GetMetricTypes() ([]PluginMetricType, error) { +func (p *mockPlugin) GetMetricTypes(cfg PluginConfigType) ([]PluginMetricType, error) { return mockPluginMetricType, nil } @@ -64,7 +64,7 @@ func (p *mockPlugin) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) { type mockErrorPlugin struct { } -func (p *mockErrorPlugin) GetMetricTypes() ([]PluginMetricType, error) { +func (p *mockErrorPlugin) GetMetricTypes(cfg PluginConfigType) ([]PluginMetricType, error) { return nil, errors.New("Error in get Metric Type") } diff --git a/control/plugin/collector_test.go b/control/plugin/collector_test.go index 4635825d5..1f5247f8f 100644 --- a/control/plugin/collector_test.go +++ b/control/plugin/collector_test.go @@ -38,7 +38,7 @@ func (f *MockPlugin) CollectMetrics(_ []PluginMetricType) ([]PluginMetricType, e return []PluginMetricType{}, nil } -func (c *MockPlugin) GetMetricTypes() ([]PluginMetricType, error) { +func (c *MockPlugin) GetMetricTypes(_ PluginConfigType) ([]PluginMetricType, error) { return []PluginMetricType{ PluginMetricType{Namespace_: []string{"foo", "bar"}}, }, nil diff --git a/control/plugin/metric.go b/control/plugin/metric.go index 0e819a5d7..39b8c6b95 100644 --- a/control/plugin/metric.go +++ b/control/plugin/metric.go @@ -45,6 +45,10 @@ const ( // PulseProtoBuff = "pulse.pb" // TO BE IMPLEMENTED ) +type PluginConfigType struct { + Data *cdata.ConfigDataNode `json:"config"` +} + // Represents a metric type. Only used within plugins and across plugin calls. // Converted to core.MetricType before being used within modules. type PluginMetricType struct { diff --git a/control/plugin/session.go b/control/plugin/session.go index 5faa88044..4f93e52a8 100644 --- a/control/plugin/session.go +++ b/control/plugin/session.go @@ -294,6 +294,7 @@ func init() { gob.Register(*(&ctypes.ConfigValueInt{})) gob.Register(*(&ctypes.ConfigValueStr{})) gob.Register(*(&ctypes.ConfigValueFloat{})) + gob.Register(*(&ctypes.ConfigValueBool{})) gob.Register(cpolicy.NewPolicyNode()) gob.Register(&cpolicy.StringRule{}) diff --git a/control/plugin_manager.go b/control/plugin_manager.go index 322234d96..abdc5d1a6 100644 --- a/control/plugin_manager.go +++ b/control/plugin_manager.go @@ -206,16 +206,31 @@ type pluginManager struct { metricCatalog catalogsMetrics loadedPlugins *loadedPlugins logPath string + pluginConfig *pluginConfig } -func newPluginManager() *pluginManager { +func newPluginManager(opts ...pluginManagerOpt) *pluginManager { p := &pluginManager{ loadedPlugins: newLoadedPlugins(), logPath: "/tmp", + pluginConfig: newPluginConfig(), } + + for _, opt := range opts { + opt(p) + } + return p } +type pluginManagerOpt func(*pluginManager) + +func OptSetPluginConfig(cf *pluginConfig) pluginManagerOpt { + return func(p *pluginManager) { + p.pluginConfig = cf + } +} + func (p *pluginManager) SetMetricCatalog(mc catalogsMetrics) { p.metricCatalog = mc } @@ -306,7 +321,11 @@ func (p *pluginManager) LoadPlugin(path string, emitter gomit.Emitter) (*loadedP colClient := ap.client.(client.PluginCollectorClient) // Get metric types - metricTypes, err := colClient.GetMetricTypes() + // TODO pass in the global config + cfg := plugin.PluginConfigType{ + Data: p.pluginConfig.get(resp.Type, resp.Meta.Name, resp.Meta.Version), + } + metricTypes, err := colClient.GetMetricTypes(cfg) if err != nil { pmLogger.WithFields(log.Fields{ "_block": "load-plugin", diff --git a/control/plugin_manager_test.go b/control/plugin_manager_test.go index 8f613d9da..0844626f0 100644 --- a/control/plugin_manager_test.go +++ b/control/plugin_manager_test.go @@ -29,6 +29,7 @@ import ( . "github.com/smartystreets/goconvey/convey" "github.com/intelsdi-x/pulse/control/plugin" + "github.com/intelsdi-x/pulse/core/ctypes" ) var ( @@ -89,6 +90,36 @@ func TestLoadPlugin(t *testing.T) { So(len(p.all()), ShouldBeGreaterThan, 0) }) + Convey("with a plugin config a plugin loads successfully", func() { + cfg := newConfig() + cfg.plugins.collector["dummy1"] = newPluginConfigItem(optAddPluginConfigItem("test", ctypes.ConfigValueBool{Value: true})) + p := newPluginManager(OptSetPluginConfig(cfg.plugins)) + p.SetMetricCatalog(newMetricCatalog()) + lp, err := p.LoadPlugin(PluginPath, nil) + + So(lp, ShouldHaveSameTypeAs, new(loadedPlugin)) + So(p.all(), ShouldNotBeEmpty) + So(err, ShouldBeNil) + So(len(p.all()), ShouldBeGreaterThan, 0) + mts, err := p.metricCatalog.Fetch([]string{}) + So(err, ShouldBeNil) + So(len(mts), ShouldBeGreaterThan, 2) + }) + + Convey("for a plugin requiring a config an incomplete config will result in a load failure", func() { + cfg := newConfig() + cfg.plugins.collector["dummy1"] = newPluginConfigItem(optAddPluginConfigItem("test-fail", ctypes.ConfigValueBool{Value: true})) + p := newPluginManager(OptSetPluginConfig(cfg.plugins)) + p.SetMetricCatalog(newMetricCatalog()) + lp, err := p.LoadPlugin(PluginPath, nil) + + So(lp, ShouldBeNil) + So(p.all(), ShouldBeEmpty) + So(err, ShouldNotBeNil) + So(err.Error(), ShouldContainSubstring, "missing on-load plugin config entry 'test'") + So(len(p.all()), ShouldEqual, 0) + }) + Convey("loads json-rpc plugin successfully", func() { p := newPluginManager() p.SetMetricCatalog(newMetricCatalog()) diff --git a/plugin/collector/pulse-collector-dummy1/dummy/dummy.go b/plugin/collector/pulse-collector-dummy1/dummy/dummy.go index 94cdddb69..b15cfaddf 100644 --- a/plugin/collector/pulse-collector-dummy1/dummy/dummy.go +++ b/plugin/collector/pulse-collector-dummy1/dummy/dummy.go @@ -26,6 +26,7 @@ import ( "github.com/intelsdi-x/pulse/control/plugin" "github.com/intelsdi-x/pulse/control/plugin/cpolicy" + "github.com/intelsdi-x/pulse/core/ctypes" ) const ( @@ -37,6 +38,9 @@ const ( Type = plugin.CollectorPluginType ) +// make sure that we actually satisify requierd interface +var _ plugin.CollectorPlugin = (*Dummy)(nil) + // Dummy collector implementation used for testing type Dummy struct { } @@ -45,7 +49,12 @@ type Dummy struct { func (f *Dummy) CollectMetrics(mts []plugin.PluginMetricType) ([]plugin.PluginMetricType, error) { metrics := make([]plugin.PluginMetricType, len(mts)) for i, p := range mts { - data := fmt.Sprintf("The dummy collected data! config data: user=%s password=%s", p.Config().Table()["name"], p.Config().Table()["name"]) + var data string + if cv, ok := p.Config().Table()["test"]; ok { + data = fmt.Sprintf("The dummy collected data! config data: user=%s password=%s test=%v", p.Config().Table()["user"], p.Config().Table()["password"], cv.(ctypes.ConfigValueBool).Value) + } else { + data = fmt.Sprintf("The dummy collected data! config data: user=%s password=%s", p.Config().Table()["user"], p.Config().Table()["password"]) + } metrics[i] = plugin.PluginMetricType{ Namespace_: p.Namespace(), Data_: data, @@ -57,10 +66,17 @@ func (f *Dummy) CollectMetrics(mts []plugin.PluginMetricType) ([]plugin.PluginMe } //GetMetricTypes returns metric types for testing -func (f *Dummy) GetMetricTypes() ([]plugin.PluginMetricType, error) { - m1 := &plugin.PluginMetricType{Namespace_: []string{"intel", "dummy", "foo"}} - m2 := &plugin.PluginMetricType{Namespace_: []string{"intel", "dummy", "bar"}} - return []plugin.PluginMetricType{*m1, *m2}, nil +func (f *Dummy) GetMetricTypes(cfg plugin.PluginConfigType) ([]plugin.PluginMetricType, error) { + mts := []plugin.PluginMetricType{} + if _, ok := cfg.Data.Table()["test-fail"]; ok { + return mts, fmt.Errorf("missing on-load plugin config entry 'test'") + } + if _, ok := cfg.Data.Table()["test"]; ok { + mts = append(mts, plugin.PluginMetricType{Namespace_: []string{"intel", "dummy", "test"}}) + } + mts = append(mts, plugin.PluginMetricType{Namespace_: []string{"intel", "dummy", "foo"}}) + mts = append(mts, plugin.PluginMetricType{Namespace_: []string{"intel", "dummy", "bar"}}) + return mts, nil } //GetConfigPolicy returns a ConfigPolicyTree for testing diff --git a/plugin/collector/pulse-collector-dummy2/dummy/dummy.go b/plugin/collector/pulse-collector-dummy2/dummy/dummy.go index 8ea5224f3..c8ff9b6b1 100644 --- a/plugin/collector/pulse-collector-dummy2/dummy/dummy.go +++ b/plugin/collector/pulse-collector-dummy2/dummy/dummy.go @@ -53,7 +53,7 @@ func (f *Dummy) CollectMetrics(mts []plugin.PluginMetricType) ([]plugin.PluginMe log.Println("collecting", p) } rand.Seed(time.Now().UTC().UnixNano()) - for i, _ := range mts { + for i := range mts { data := randInt(65, 90) mts[i].Data_ = data mts[i].Source_, _ = os.Hostname() @@ -63,7 +63,7 @@ func (f *Dummy) CollectMetrics(mts []plugin.PluginMetricType) ([]plugin.PluginMe } //GetMetricTypes returns metric types for testing -func (f *Dummy) GetMetricTypes() ([]plugin.PluginMetricType, error) { +func (f *Dummy) GetMetricTypes(cfg plugin.PluginConfigType) ([]plugin.PluginMetricType, error) { m1 := &plugin.PluginMetricType{Namespace_: []string{"intel", "dummy", "foo"}} m2 := &plugin.PluginMetricType{Namespace_: []string{"intel", "dummy", "bar"}} return []plugin.PluginMetricType{*m1, *m2}, nil diff --git a/plugin/collector/pulse-collector-facter/facter/facter.go b/plugin/collector/pulse-collector-facter/facter/facter.go new file mode 100644 index 000000000..d55906532 --- /dev/null +++ b/plugin/collector/pulse-collector-facter/facter/facter.go @@ -0,0 +1,219 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Coporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* This modules converts implements Pulse API to become an plugin. + +legend: +- metric - represents value of metric from Pulse side +- fact - represents a value about a system gathered from Facter +- name - is string identifier that refers to metric from the Pulse side, so name points to metric + + Implementation details: + + GetMetricTypes() + + + | +------------------+ + +----v---+ getFacts() | | + | Facter +-------------> ./facter --json | + +----^---+ | (goroutine) | + | +------------------+ + + + CollectMetrics() + + +*/ + +package facter + +import ( + "errors" + "fmt" + "os" + "strings" + "time" + + "github.com/intelsdi-x/pulse/control/plugin" + "github.com/intelsdi-x/pulse/control/plugin/cpolicy" +) + +const ( + // parts of returned namescape + vendor = "intel" + prefix = "facter" + // how long we are caching the date from external binary to prevent overuse of resources + defaultCacheTTL = 60 * time.Second + // how long are we going to cache available types of metrics + defaultMetricTypesTTL = defaultCacheTTL + // timeout we are ready to wait for external binary to gather the data + defaultFacterTimeout = 5 * time.Second +) + +/********** + * Facter * + **********/ + +// Facter implements API to communicate with Pulse +type Facter struct { + ttl time.Duration + // injects implementation for getting facts - defaults to use getFacts from cmd.go + // but allows to replace with fake during tests + getFacts func( + names []string, + facterTimeout time.Duration, + cmdConfig *cmdConfig, + ) (facts, error) + // how much time we are ready to wait for getting result from cmd + // is going to be passed to facterTimeout parameter in getFacts + facterTimeout time.Duration +} + +// make sure that we actually satisify requierd interface +var _ plugin.CollectorPlugin = (*Facter)(nil) + +// NewFacter constructs new Facter with default values +func NewFacter() *Facter { + return &Facter{ + // injection of default implementation for gathering facts from Facter + getFacts: getFacts, + facterTimeout: defaultFacterTimeout, + } +} + +// ------------ Pulse plugin interface implementation -------------- + +// GetMetricTypes returns available metrics types +func (f *Facter) GetMetricTypes(cfg plugin.PluginConfigType) ([]plugin.PluginMetricType, error) { + + // facts composed of entries + facts, err := f.getFacts( + nil, // ask for everything + f.facterTimeout, + nil, //default cmd configuration + ) + if err != nil { + if strings.Contains(err.Error(), "executable file not found") { + // Facter cannot be found. Since this is called on load we should + // not send an error as loading a plugin should not fail based on + // whether or not a dynamic path is set. + return []plugin.PluginMetricType{}, nil + } + return nil, err + } + + // capacity - we are going to return all the facts + metricTypes := make([]plugin.PluginMetricType, 0, len(facts)) + + // create types with given namespace + for name, _ := range facts { + namespace := createNamespace(name) + metricType := plugin.PluginMetricType{Namespace_: namespace} + metricTypes = append(metricTypes, metricType) + } + + return metricTypes, nil +} + +// Collect collects metrics from external binary a returns them in form +// acceptable by Pulse, only returns collects that were asked for and return nothing when asked for none +// the order of requested and received metrics isn't guaranted +func (f *Facter) CollectMetrics(metricTypes []plugin.PluginMetricType) ([]plugin.PluginMetricType, error) { + + // parse and check requested names of metrics + names := []string{} + for _, metricType := range metricTypes { + namespace := metricType.Namespace() + + err := validateNamespace(namespace) + if err != nil { + return nil, err + } + + // name of fact - last part of namespace + name := namespace[2] + names = append(names, name) + } + + if len(names) == 0 { + // nothing request, none returned + // !because returned by value, it would return nil slice + return nil, nil + } + + // facts composed of entries + facts, err := f.getFacts(names, f.facterTimeout, nil) + if err != nil { + return nil, err + } + + // make sure that recevied len of names equals asked + if len(facts) != len(names) { + return nil, errors.New("assertion: getFacts returns more/less than asked!") + } + + host, _ := os.Hostname() + // convert facts into PluginMetrics + metrics := make([]plugin.PluginMetricType, 0, len(facts)) + for name, value := range facts { + namespace := createNamespace(name) + metric := *plugin.NewPluginMetricType(namespace, time.Now(), host, value) + metrics = append(metrics, metric) + } + return metrics, nil +} + +func (f *Facter) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) { + c := cpolicy.New() + rule, _ := cpolicy.NewStringRule("name", false, "bob") + rule2, _ := cpolicy.NewStringRule("password", true) + p := cpolicy.NewPolicyNode() + p.Add(rule) + p.Add(rule2) + c.Add([]string{"intel", "facter", "foo"}, p) + return c, nil +} + +// ------------ helper functions -------------- + +// validateNamespace checks namespace intel(vendor)/facter(prefix)/FACTNAME +func validateNamespace(namespace []string) error { + if len(namespace) != 3 { + return errors.New(fmt.Sprintf("unknown metricType %s (should containt just 3 segments)", namespace)) + } + if namespace[0] != vendor { + return errors.New(fmt.Sprintf("unknown metricType %s (expected vendor %s)", namespace, vendor)) + } + + if namespace[1] != prefix { + return errors.New(fmt.Sprintf("unknown metricType %s (expected prefix %s)", namespace, prefix)) + } + return nil +} + +// namspace returns namespace slice of strings +// composed from: vendor, prefix and fact name +func createNamespace(name string) []string { + return []string{vendor, prefix, name} + +} + +// helper type to deal with json values which additionally stores last update moment +type entry struct { + value interface{} + lastUpdate time.Time +} diff --git a/plugin/collector/pulse-collector-facter/facter/facter_test.go b/plugin/collector/pulse-collector-facter/facter/facter_test.go new file mode 100644 index 000000000..fca671ea6 --- /dev/null +++ b/plugin/collector/pulse-collector-facter/facter/facter_test.go @@ -0,0 +1,193 @@ +// +build linux integration + +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Coporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +/* +# testing +go test -v github.com/intelsdi-x/pulse/plugin/collector/pulse-collector-facter/facter +*/ +package facter + +import ( + "errors" + "strings" + "testing" + "time" + + "github.com/intelsdi-x/pulse/control/plugin" + "github.com/intelsdi-x/pulse/core/cdata" + . "github.com/smartystreets/goconvey/convey" +) + +// fact expected to be available on every system +// can be allways received from Facter for test purposes +const someFact = "kernel" +const someValue = "linux 1234" + +var existingNamespace = []string{vendor, prefix, someFact} + +func TestFacterCollectMetrics(t *testing.T) { + Convey("TestFacterCollect tests", t, func() { + + f := NewFacter() + // always return at least one metric + f.getFacts = func(_ []string, _ time.Duration, _ *cmdConfig) (facts, error) { + return facts{someFact: someValue}, nil + } + + Convey("asked for nothgin returns nothing", func() { + metricTypes := []plugin.PluginMetricType{} + metrics, err := f.CollectMetrics(metricTypes) + So(err, ShouldBeNil) + So(metrics, ShouldBeEmpty) + }) + + Convey("asked for somehting returns something", func() { + metricTypes := []plugin.PluginMetricType{ + plugin.PluginMetricType{ + Namespace_: existingNamespace, + }, + } + metrics, err := f.CollectMetrics(metricTypes) + So(err, ShouldBeNil) + So(metrics, ShouldNotBeEmpty) + So(len(metrics), ShouldEqual, 1) + + // check just one metric + metric := metrics[0] + So(metric.Namespace()[2], ShouldResemble, someFact) + So(metric.Data().(string), ShouldEqual, someValue) + }) + + Convey("ask for inappriopriate metrics", func() { + Convey("wrong number of parts", func() { + _, err := f.CollectMetrics( + []plugin.PluginMetricType{ + plugin.PluginMetricType{ + Namespace_: []string{"where are my other parts"}, + }, + }, + ) + So(err, ShouldNotBeNil) + So(err.Error(), ShouldContainSubstring, "segments") + }) + + Convey("wrong vendor", func() { + _, err := f.CollectMetrics( + []plugin.PluginMetricType{ + plugin.PluginMetricType{ + Namespace_: []string{"nonintelvendor", prefix, someFact}, + }, + }, + ) + So(err, ShouldNotBeNil) + So(err.Error(), ShouldContainSubstring, "expected vendor") + }) + + Convey("wrong prefix", func() { + _, err := f.CollectMetrics( + []plugin.PluginMetricType{ + plugin.PluginMetricType{ + Namespace_: []string{vendor, "this is wrong prefix", someFact}, + }, + }, + ) + So(err, ShouldNotBeNil) + So(err.Error(), ShouldContainSubstring, "expected prefix") + }) + + }) + }) +} + +func TestFacterInvalidBehavior(t *testing.T) { + + Convey("returns errors as expected when cmd isn't working", t, func() { + f := NewFacter() + // mock that getFacts returns error every time + f.getFacts = func(_ []string, _ time.Duration, _ *cmdConfig) (facts, error) { + return nil, errors.New("dummy error") + } + + _, err := f.CollectMetrics([]plugin.PluginMetricType{ + plugin.PluginMetricType{ + Namespace_: existingNamespace, + }, + }, + ) + So(err, ShouldNotBeNil) + + _, err = f.GetMetricTypes(plugin.PluginConfigType{Data: cdata.NewNode()}) + So(err, ShouldNotBeNil) + }) + Convey("returns not as much values as asked", t, func() { + + f := NewFacter() + // mock that getFacts returns error every time + //returns zero elements even when asked for one + f.getFacts = func(_ []string, _ time.Duration, _ *cmdConfig) (facts, error) { + return nil, nil + } + + _, err := f.CollectMetrics([]plugin.PluginMetricType{ + plugin.PluginMetricType{ + Namespace_: existingNamespace, + }, + }, + ) + So(err, ShouldNotBeNil) + So(err.Error(), ShouldContainSubstring, "more/less") + }) + +} + +func TestFacterGetMetricsTypes(t *testing.T) { + + Convey("GetMetricTypes functionallity", t, func() { + + f := NewFacter() + + Convey("GetMetricsTypes returns no error", func() { + // exectues without error + metricTypes, err := f.GetMetricTypes(plugin.PluginConfigType{Data: cdata.NewNode()}) + So(err, ShouldBeNil) + Convey("metricTypesReply should contain more than zero metrics", func() { + So(metricTypes, ShouldNotBeEmpty) + }) + + Convey("at least one metric contains metric namespace \"intel/facter/kernel\"", func() { + + expectedNamespaceStr := strings.Join(existingNamespace, "/") + + found := false + for _, metricType := range metricTypes { + // join because we cannot compare slices + if strings.Join(metricType.Namespace(), "/") == expectedNamespaceStr { + found = true + break + } + } + if !found { + t.Error("It was expected to find at least on intel/facter/kernel metricType (but it wasn't there)") + } + }) + }) + }) +} diff --git a/plugin/collector/pulse-collector-pcm/pcm/pcm.go b/plugin/collector/pulse-collector-pcm/pcm/pcm.go new file mode 100644 index 000000000..fbf0f7247 --- /dev/null +++ b/plugin/collector/pulse-collector-pcm/pcm/pcm.go @@ -0,0 +1,186 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Coporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pcm + +import ( + "bufio" + "fmt" + "os" + "os/exec" + "path/filepath" + "strconv" + "strings" + "sync" + "time" + + "github.com/intelsdi-x/pulse/control/plugin" + "github.com/intelsdi-x/pulse/control/plugin/cpolicy" +) + +const ( + // Name of plugin + Name = "pcm" + // Version of plugin + Version = 1 + // Type of plugin + Type = plugin.CollectorPluginType +) + +// PCM +type PCM struct { + keys []string + data map[string]interface{} + mutex *sync.RWMutex +} + +func (p *PCM) Keys() []string { + return p.keys +} + +func (p *PCM) Data() map[string]interface{} { + return p.data +} + +// CollectMetrics returns metrics from pcm +func (p *PCM) CollectMetrics(mts []plugin.PluginMetricType) ([]plugin.PluginMetricType, error) { + metrics := make([]plugin.PluginMetricType, len(mts)) + p.mutex.RLock() + defer p.mutex.RUnlock() + hostname, _ := os.Hostname() + for i, m := range mts { + if v, ok := p.data[joinNamespace(m.Namespace())]; ok { + metrics[i] = plugin.PluginMetricType{ + Namespace_: m.Namespace(), + Data_: v, + Source_: hostname, + Timestamp_: time.Now(), + } + } + } + // fmt.Fprintf(os.Stderr, "collected >>> %+v\n", metrics) + return metrics, nil +} + +// GetMetricTypes returns the metric types exposed by pcm +func (p *PCM) GetMetricTypes(_ plugin.PluginConfigType) ([]plugin.PluginMetricType, error) { + mts := make([]plugin.PluginMetricType, len(p.keys)) + p.mutex.RLock() + defer p.mutex.RUnlock() + for i, k := range p.keys { + mts[i] = plugin.PluginMetricType{Namespace_: strings.Split(strings.TrimPrefix(k, "/"), "/")} + } + return mts, nil +} + +//GetConfigPolicy +func (p *PCM) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) { + c := cpolicy.New() + return c, nil +} + +func New() (*PCM, error) { + pcm := &PCM{mutex: &sync.RWMutex{}, data: map[string]interface{}{}} + var cmd *exec.Cmd + if path := os.Getenv("PULSE_PCM_PATH"); path != "" { + cmd = exec.Command(filepath.Join(path, "pcm.x"), "/csv", "-nc", "-r", "1") + } else { + c, err := exec.LookPath("pcm.x") + if err != nil { + fmt.Fprint(os.Stderr, "Unable to find PCM. Ensure it's in your path or set PULSE_PCM_PATH.") + panic(err) + } + cmd = exec.Command(c, "/csv", "-nc", "-r", "1") + } + + cmdReader, err := cmd.StdoutPipe() + if err != nil { + fmt.Fprintln(os.Stderr, "Error creating StdoutPipe", err) + return nil, err + } + // read the data from stdout + scanner := bufio.NewScanner(cmdReader) + go func() { + first := true + for scanner.Scan() { + if first { + first = false + continue + } + if len(pcm.keys) == 0 { + pcm.mutex.Lock() + keys := strings.Split(strings.TrimSuffix(scanner.Text(), ";"), ";") + //skip the date and time fields + pcm.keys = make([]string, len(keys[2:])) + for i, k := range keys[2:] { + pcm.keys[i] = fmt.Sprintf("/intel/pcm/%s", k) + } + pcm.mutex.Unlock() + continue + } + + pcm.mutex.Lock() + datal := strings.Split(strings.TrimSuffix(scanner.Text(), ";"), ";") + for i, d := range datal[2:] { + v, err := strconv.ParseFloat(strings.TrimSpace(d), 64) + if err == nil { + pcm.data[pcm.keys[i]] = v + } else { + panic(err) + } + } + pcm.mutex.Unlock() + // fmt.Fprintf(os.Stderr, "data >>> %+v\n", pcm.data) + // fmt.Fprintf(os.Stdout, "data >>> %+v\n", pcm.data) + } + }() + + err = cmd.Start() + if err != nil { + fmt.Fprintln(os.Stderr, "Error starting pcm", err) + return nil, err + } + + // we need to wait until we have our metric types + st := time.Now() + for { + pcm.mutex.RLock() + c := len(pcm.keys) + pcm.mutex.RUnlock() + if c > 0 { + break + } + if time.Since(st) > time.Second*2 { + return nil, fmt.Errorf("Timed out waiting for metrics from pcm") + } + } + + // LEAVE the following block for debugging + // err = cmd.Wait() + // if err != nil { + // fmt.Fprintln(os.Stderr, "Error waiting for pcm", err) + // return nil, err + // } + + return pcm, nil +} + +func joinNamespace(ns []string) string { + return "/" + strings.Join(ns, "/") +} diff --git a/plugin/collector/pulse-collector-perfevents/perfevents/perfevents.go b/plugin/collector/pulse-collector-perfevents/perfevents/perfevents.go new file mode 100644 index 000000000..347b341da --- /dev/null +++ b/plugin/collector/pulse-collector-perfevents/perfevents/perfevents.go @@ -0,0 +1,282 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Coporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package perfevents + +import ( + "bufio" + "errors" + "fmt" + "os" + "os/exec" + "path/filepath" + "strconv" + "strings" + "time" + + "github.com/intelsdi-x/pulse/control/plugin" + "github.com/intelsdi-x/pulse/control/plugin/cpolicy" +) + +const ( + // Name of plugin + Name = "perfevents" + // Version of plugin + Version = 1 + // Type of plugin + Type = plugin.CollectorPluginType + // Namespace definition + ns_vendor = "intel" + ns_class = "linux" + ns_type = "perfevents" + ns_subtype = "cgroup" +) + +type event struct { + id string + etype string + value uint64 +} + +type Perfevents struct { + cgroup_events []event + Init func() error +} + +var CGROUP_EVENTS = []string{"cycles", "instructions", "cache-references", "cache-misses", + "branch-instructions", "branch-misses", "stalled-cycles-frontend", + "stalled-cycles-backend", "ref-cycles"} + +// CollectMetrics returns HW metrics from perf events subsystem +// for Cgroups present on the host. +func (p *Perfevents) CollectMetrics(mts []plugin.PluginMetricType) ([]plugin.PluginMetricType, error) { + if len(mts) == 0 { + return nil, nil + } + events := []string{} + cgroups := []string{} + + // Get list of events and cgroups from Namespace + // Replace "_" with "/" in cgroup name + for _, m := range mts { + err := validateNamespace(m.Namespace()) + if err != nil { + return nil, err + } + events = append(events, m.Namespace()[4]) + cgroups = append(cgroups, strings.Replace(m.Namespace()[5], "_", "/", -1)) + } + + // Prepare events (-e) and Cgroups (-G) switches for "perf stat" + cgroups_switch := "-G" + strings.Join(cgroups, ",") + events_switch := "-e" + strings.Join(events, ",") + + // Prepare "perf stat" command + cmd := exec.Command("perf", "stat", "--log-fd", "1", `-x;`, "-a", events_switch, cgroups_switch, "--", "sleep", "1") + + cmdReader, err := cmd.StdoutPipe() + if err != nil { + fmt.Fprintln(os.Stderr, "Error creating StdoutPipe", err) + return nil, err + } + + // Parse "perf stat" output + p.cgroup_events = make([]event, len(mts)) + scanner := bufio.NewScanner(cmdReader) + go func() { + for i := 0; scanner.Scan(); i++ { + line := strings.Split(scanner.Text(), ";") + value, err := strconv.ParseUint(line[0], 10, 64) + if err != nil { + fmt.Fprintln(os.Stderr, "Invalid metric value", err) + } + etype := line[2] + id := line[3] + e := event{id: id, etype: etype, value: value} + p.cgroup_events[i] = e + } + }() + + // Run command and wait (up to 2 secs) for completion + err = cmd.Start() + if err != nil { + fmt.Fprintln(os.Stderr, "Error starting perf stat", err) + return nil, err + } + + st := time.Now() + for { + if len(p.cgroup_events) == cap(p.cgroup_events) { + break + } + if time.Since(st) > time.Second*2 { + return nil, fmt.Errorf("Timed out waiting for metrics from perf stat") + } + } + err = cmd.Wait() + if err != nil { + fmt.Fprintln(os.Stderr, "Error waiting for perf stat", err) + return nil, err + } + + // Populate metrics + metrics := make([]plugin.PluginMetricType, len(mts)) + i := 0 + for _, m := range mts { + metric, err := populate_metric(m.Namespace(), p.cgroup_events[i]) + if err != nil { + return nil, err + } + metrics[i] = *metric + metrics[i].Source_, _ = os.Hostname() + i++ + } + return metrics, nil +} + +// GetMetricTypes returns the metric types exposed by perf events subsystem +func (p *Perfevents) GetMetricTypes(_ plugin.PluginConfigType) ([]plugin.PluginMetricType, error) { + err := p.Init() + if err != nil { + return nil, err + } + cgroups, err := list_cgroups() + if err != nil { + return nil, err + } + if len(cgroups) == 0 { + return nil, nil + } + mts := []plugin.PluginMetricType{} + mts = append(mts, set_supported_metrics(ns_subtype, cgroups, CGROUP_EVENTS)...) + + return mts, nil +} + +// GetConfigPolicy returns a ConfigPolicy +func (p *Perfevents) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) { + c := cpolicy.New() + return c, nil +} + +// New initializes Perfevents plugin +func NewPerfevents() *Perfevents { + return &Perfevents{Init: initialize} +} + +func initialize() error { + file, err := os.Open("/proc/sys/kernel/perf_event_paranoid") + if err != nil { + if os.IsExist(err) { + return errors.New("perf_event_paranoid file exists but couldn't be opened") + } + return errors.New("perf event system not enabled") + } + + scanner := bufio.NewScanner(file) + if !scanner.Scan() { + return errors.New("cannot read from perf_event_paranoid") + } + + i, err := strconv.ParseInt(scanner.Text(), 10, 64) + if err != nil { + return errors.New("invalid value in perf_event_paranoid file") + } + + if i >= 1 { + return errors.New("insufficient perf event subsystem capabilities") + } + return nil +} + +func set_supported_metrics(source string, cgroups []string, events []string) []plugin.PluginMetricType { + mts := make([]plugin.PluginMetricType, len(events)*len(cgroups)) + for _, e := range events { + for _, c := range flatten_cg_name(cgroups) { + mts = append(mts, plugin.PluginMetricType{Namespace_: []string{ns_vendor, ns_class, ns_type, source, e, c}}) + } + } + return mts +} +func flatten_cg_name(cg []string) []string { + flat_cg := []string{} + for _, c := range cg { + flat_cg = append(flat_cg, strings.Replace(c, "/", "_", -1)) + } + return flat_cg +} + +func populate_metric(ns []string, e event) (*plugin.PluginMetricType, error) { + return &plugin.PluginMetricType{ + Namespace_: ns, + Data_: e.value, + Timestamp_: time.Now(), + }, nil +} + +func list_cgroups() ([]string, error) { + cgroups := []string{} + base_path := "/sys/fs/cgroup/perf_event/" + err := filepath.Walk(base_path, func(path string, info os.FileInfo, _ error) error { + if info.IsDir() { + cgroup_name := strings.TrimPrefix(path, base_path) + if len(cgroup_name) > 0 { + cgroups = append(cgroups, cgroup_name) + } + } + return nil + + }) + if err != nil { + return nil, err + } + return cgroups, nil +} + +func validateNamespace(namespace []string) error { + if len(namespace) != 6 { + return errors.New(fmt.Sprintf("unknown metricType %s (should containt exactly 6 segments)", namespace)) + } + if namespace[0] != ns_vendor { + return errors.New(fmt.Sprintf("unknown metricType %s (expected 1st segment %s)", namespace, ns_vendor)) + } + + if namespace[1] != ns_class { + return errors.New(fmt.Sprintf("unknown metricType %s (expected 2nd segment %s)", namespace, ns_class)) + } + if namespace[2] != ns_type { + return errors.New(fmt.Sprintf("unknown metricType %s (expected 3rd segment %s)", namespace, ns_type)) + } + if namespace[3] != ns_subtype { + return errors.New(fmt.Sprintf("unknown metricType %s (expected 4th segment %s)", namespace, ns_subtype)) + } + if !namespaceContains(namespace[4], CGROUP_EVENTS) { + return errors.New(fmt.Sprintf("unknown metricType %s (expected 5th segment %v)", namespace, CGROUP_EVENTS)) + } + return nil +} + +func namespaceContains(element string, slice []string) bool { + for _, v := range slice { + if v == element { + return true + } + } + return false +} diff --git a/plugin/collector/pulse-collector-perfevents/perfevents/perfevents_test.go b/plugin/collector/pulse-collector-perfevents/perfevents/perfevents_test.go new file mode 100644 index 000000000..41013f4ed --- /dev/null +++ b/plugin/collector/pulse-collector-perfevents/perfevents/perfevents_test.go @@ -0,0 +1,150 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Coporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package perfevents + +import ( + "errors" + "fmt" + "os" + "path" + "testing" + + "github.com/intelsdi-x/pulse/control/plugin" + "github.com/intelsdi-x/pulse/core/cdata" + "github.com/intelsdi-x/pulse/plugin/helper" + . "github.com/smartystreets/goconvey/convey" +) + +var ( + PluginName = "pulse-collector-perfevents" + PluginType = "collector" + PluginVersion = 1 + PulsePath = os.Getenv("PULSE_PATH") + PluginPath = path.Join(PulsePath, "plugin", PluginName) +) + +func TestPluginLoads(t *testing.T) { + // These tests only work if PULSE_PATH is known. + // It is the responsibility of the testing framework to + // build the plugins first into the build dir. + if PulsePath != "" { + // Helper plugin trigger build if possible for this plugin + helper.BuildPlugin(PluginType, PluginName) + // + Convey("GetMetricTypes functionality", t, func() { + p := NewPerfevents() + Convey("invalid init", func() { + p.Init = func() error { return errors.New("error") } + _, err := p.GetMetricTypes(plugin.PluginConfigType{Data: cdata.NewNode()}) + So(err, ShouldNotBeNil) + }) + Convey("set_supported_metrics", func() { + cg := []string{"cgroup1", "cgroup2", "cgroup3"} + events := []string{"event1", "event2", "event3"} + a := set_supported_metrics(ns_subtype, cg, events) + So(a[len(a)-1].Namespace_, ShouldResemble, []string{ns_vendor, ns_class, ns_type, ns_subtype, "event3", "cgroup3"}) + }) + Convey("flatten cgroup name", func() { + cg := []string{"cg_root/cg_sub1/cg_sub2"} + events := []string{"event"} + a := set_supported_metrics(ns_subtype, cg, events) + So(a[len(a)-1].Namespace_, ShouldContain, "cg_root_cg_sub1_cg_sub2") + }) + }) + Convey("CollectMetrics error cases", t, func() { + p := NewPerfevents() + Convey("empty list of requested metrics", func() { + metricTypes := []plugin.PluginMetricType{} + metrics, err := p.CollectMetrics(metricTypes) + So(err, ShouldBeNil) + So(metrics, ShouldBeEmpty) + }) + Convey("namespace too short", func() { + _, err := p.CollectMetrics( + []plugin.PluginMetricType{ + plugin.PluginMetricType{ + Namespace_: []string{"invalid"}, + }, + }, + ) + So(err, ShouldNotBeNil) + So(err.Error(), ShouldContainSubstring, "segments") + }) + Convey("namespace wrong vendor", func() { + _, err := p.CollectMetrics( + []plugin.PluginMetricType{ + plugin.PluginMetricType{ + Namespace_: []string{"invalid", ns_class, ns_type, ns_subtype, "cycles", "A"}, + }, + }, + ) + So(err, ShouldNotBeNil) + So(err.Error(), ShouldContainSubstring, "1st") + }) + Convey("namespace wrong class", func() { + _, err := p.CollectMetrics( + []plugin.PluginMetricType{ + plugin.PluginMetricType{ + Namespace_: []string{ns_vendor, "invalid", ns_type, ns_subtype, "cycles", "A"}, + }, + }, + ) + So(err, ShouldNotBeNil) + So(err.Error(), ShouldContainSubstring, "2nd") + }) + Convey("namespace wrong type", func() { + _, err := p.CollectMetrics( + []plugin.PluginMetricType{ + plugin.PluginMetricType{ + Namespace_: []string{ns_vendor, ns_class, "invalid", ns_subtype, "cycles", "A"}, + }, + }, + ) + So(err, ShouldNotBeNil) + So(err.Error(), ShouldContainSubstring, "3rd") + }) + Convey("namespace wrong subtype", func() { + _, err := p.CollectMetrics( + []plugin.PluginMetricType{ + plugin.PluginMetricType{ + Namespace_: []string{ns_vendor, ns_class, ns_type, "invalid", "cycles", "A"}, + }, + }, + ) + So(err, ShouldNotBeNil) + So(err.Error(), ShouldContainSubstring, "4th") + }) + Convey("namespace wrong event", func() { + _, err := p.CollectMetrics( + []plugin.PluginMetricType{ + plugin.PluginMetricType{ + Namespace_: []string{ns_vendor, ns_class, ns_type, ns_subtype, "invalid", "A"}, + }, + }, + ) + So(err, ShouldNotBeNil) + So(err.Error(), ShouldContainSubstring, "5th") + }) + + }) + } else { + fmt.Printf("PULSE_PATH not set. Cannot test %s plugin.\n", PluginName) + } +} diff --git a/plugin/collector/pulse-collector-psutil/psutil/psutil.go b/plugin/collector/pulse-collector-psutil/psutil/psutil.go new file mode 100644 index 000000000..49ecab73f --- /dev/null +++ b/plugin/collector/pulse-collector-psutil/psutil/psutil.go @@ -0,0 +1,129 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Coporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package psutil + +import ( + "bytes" + "encoding/json" + "fmt" + "os" + "regexp" + "strings" + "time" + + "github.com/intelsdi-x/pulse/control/plugin" + "github.com/intelsdi-x/pulse/control/plugin/cpolicy" +) + +const ( + // Name of plugin + Name = "psutil" + // Version of plugin + Version = 1 + // Type of plugin + Type = plugin.CollectorPluginType +) + +type Psutil struct { +} + +// CollectMetrics returns metrics from gopsutil +func (p *Psutil) CollectMetrics(mts []plugin.PluginMetricType) ([]plugin.PluginMetricType, error) { + hostname, _ := os.Hostname() + metrics := make([]plugin.PluginMetricType, len(mts)) + loadre := regexp.MustCompile(`^/psutil/load/load[1,5,15]`) + cpure := regexp.MustCompile(`^/psutil/cpu.*/.*`) + memre := regexp.MustCompile(`^/psutil/vm/.*`) + netre := regexp.MustCompile(`^/psutil/net/.*`) + + for i, p := range mts { + ns := joinNamespace(p.Namespace()) + switch { + case loadre.MatchString(ns): + metric, err := loadAvg(p.Namespace()) + if err != nil { + return nil, err + } + metrics[i] = *metric + case cpure.MatchString(ns): + metric, err := cpuTimes(p.Namespace()) + if err != nil { + return nil, err + } + metrics[i] = *metric + case memre.MatchString(ns): + metric, err := virtualMemory(p.Namespace()) + if err != nil { + return nil, err + } + metrics[i] = *metric + case netre.MatchString(ns): + metric, err := netIOCounters(p.Namespace()) + if err != nil { + return nil, err + } + metrics[i] = *metric + } + metrics[i].Source_ = hostname + metrics[i].Timestamp_ = time.Now() + + } + return metrics, nil +} + +// GetMetricTypes returns the metric types exposed by gopsutil +func (p *Psutil) GetMetricTypes(_ plugin.PluginConfigType) ([]plugin.PluginMetricType, error) { + mts := []plugin.PluginMetricType{} + + mts = append(mts, getLoadAvgMetricTypes()...) + mts_, err := getCPUTimesMetricTypes() + if err != nil { + return nil, err + } + mts = append(mts, mts_...) + mts = append(mts, getVirtualMemoryMetricTypes()...) + mts_, err = getNetIOCounterMetricTypes() + if err != nil { + return nil, err + } + mts = append(mts, mts_...) + + return mts, nil +} + +//GetConfigPolicy returns a ConfigPolicy +func (p *Psutil) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) { + c := cpolicy.New() + return c, nil +} + +func joinNamespace(ns []string) string { + return "/" + strings.Join(ns, "/") +} + +func prettyPrint(mts []plugin.PluginMetricType) error { + var out bytes.Buffer + mtsb, _, _ := plugin.MarshalPluginMetricTypes(plugin.PulseJSONContentType, mts) + if err := json.Indent(&out, mtsb, "", " "); err != nil { + return err + } + fmt.Println(out.String()) + return nil +} diff --git a/plugin/collector/pulse-collector-psutil/psutil/psutil_integration_test.go b/plugin/collector/pulse-collector-psutil/psutil/psutil_integration_test.go new file mode 100644 index 000000000..e50dc616f --- /dev/null +++ b/plugin/collector/pulse-collector-psutil/psutil/psutil_integration_test.go @@ -0,0 +1,67 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Coporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package psutil + +import ( + "runtime" + "testing" + + "github.com/intelsdi-x/pulse/control/plugin" + "github.com/intelsdi-x/pulse/core/cdata" + . "github.com/smartystreets/goconvey/convey" +) + +func TestPsutilCollectMetrics(t *testing.T) { + Convey("psutil collector", t, func() { + p := &Psutil{} + Convey("collect metrics", func() { + mts := []plugin.PluginMetricType{ + plugin.PluginMetricType{ + Namespace_: []string{"psutil", "load", "load1"}, + }, + plugin.PluginMetricType{ + Namespace_: []string{"psutil", "load", "load5"}, + }, + plugin.PluginMetricType{ + Namespace_: []string{"psutil", "load", "load15"}, + }, + plugin.PluginMetricType{ + Namespace_: []string{"psutil", "vm", "total"}, + }, + } + if runtime.GOOS != "darwin" { + mts = append(mts, plugin.PluginMetricType{ + Namespace_: []string{"psutil", "cpu0", "user"}, + }) + } + metrics, err := p.CollectMetrics(mts) + //prettyPrint(metrics) + So(err, ShouldBeNil) + So(metrics, ShouldNotBeNil) + }) + Convey("get metric types", func() { + mts, err := p.GetMetricTypes(plugin.PluginConfigType{Data: cdata.NewNode()}) + //prettyPrint(mts) + So(err, ShouldBeNil) + So(mts, ShouldNotBeNil) + }) + + }) +} diff --git a/plugin/processor/pulse-processor-passthru/passthru/passthru.go b/plugin/processor/pulse-processor-passthru/passthru/passthru.go index 7a561b587..1853ef70d 100644 --- a/plugin/processor/pulse-processor-passthru/passthru/passthru.go +++ b/plugin/processor/pulse-processor-passthru/passthru/passthru.go @@ -20,6 +20,9 @@ limitations under the License. package passthru import ( + "bytes" + "encoding/gob" + log "github.com/Sirupsen/logrus" "github.com/intelsdi-x/pulse/control/plugin" @@ -52,6 +55,31 @@ func (p *passthruProcessor) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) { func (p *passthruProcessor) Process(contentType string, content []byte, config map[string]ctypes.ConfigValue) (string, []byte, error) { logger := log.New() logger.Println("Processor started") + + // The following block is for testing config see.. control_test.go + if _, ok := config["test"]; ok { + logger.Print("test configuration found") + var metrics []plugin.PluginMetricType + //Decodes the content into pluginMetricType + dec := gob.NewDecoder(bytes.NewBuffer(content)) + if err := dec.Decode(&metrics); err != nil { + logger.Printf("Error decoding: error=%v content=%v", err, content) + return "", nil, err + } + + for idx, m := range metrics { + if m.Namespace()[0] == "foo" { + logger.Print("found foo metric") + metrics[idx].Data_ = 2 + } + } + + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + enc.Encode(metrics) + content = buf.Bytes() + } + //just passing through return contentType, content, nil }