From f598bd13124ee7e89c6fef4ff9eb862ee9b89075 Mon Sep 17 00:00:00 2001 From: Joel Cooklin Date: Tue, 26 Apr 2016 18:22:03 -0700 Subject: [PATCH] Metric schema and metadata (#872) Resolves #872 - Metric schema and metadata --- cmd/snapctl/task.go | 3 +- control/control.go | 54 +++-- control/control_test.go | 125 +++++----- control/metrics.go | 145 ++++++----- control/metrics_test.go | 226 +++++++++--------- control/mttrie.go | 13 +- control/mttrie_test.go | 39 +-- control/plugin/client/client.go | 2 +- control/plugin/client/httpjsonrpc.go | 13 +- control/plugin/client/httpjsonrpc_test.go | 34 +-- control/plugin/client/native.go | 15 +- control/plugin/collector.go | 4 +- control/plugin/collector_proxy.go | 14 +- control/plugin/collector_proxy_test.go | 38 ++- control/plugin/collector_test.go | 11 +- control/plugin/metric.go | 95 ++++---- control/plugin/metric_test.go | 130 +++++----- control/plugin/plugin_test.go | 9 +- control/plugin_manager.go | 9 +- control/plugin_manager_test.go | 7 +- control/strategy/cache.go | 16 +- control/strategy/cache_test.go | 50 ++-- control/strategy/sticky_test.go | 5 - core/metric.go | 114 ++++++++- docs/PLUGIN_AUTHORING.md | 4 +- mgmt/rest/metric.go | 12 +- mgmt/rest/plugin_test.go | 6 +- mgmt/rest/rbody/task.go | 3 +- mgmt/rest/server.go | 6 +- mgmt/rest/task.go | 3 +- mgmt/tribe/tribe_test.go | 2 +- .../snap-collector-mock1/mock/mock.go | 36 ++- .../snap-collector-mock2/mock/mock.go | 50 ++-- .../passthru/passthru.go | 6 +- .../snap-publisher-file/file/file.go | 8 +- .../snap-publisher-file/file/file_test.go | 5 +- scheduler/job.go | 22 +- scheduler/job_test.go | 2 +- scheduler/scheduler.go | 8 +- scheduler/scheduler_test.go | 4 +- scheduler/workflow.go | 2 +- scheduler/workflow_string.go | 3 +- scheduler/workflow_test.go | 2 +- 43 files changed, 747 insertions(+), 608 deletions(-) diff --git a/cmd/snapctl/task.go b/cmd/snapctl/task.go index 0b4d8e66e..5313aeca0 100644 --- a/cmd/snapctl/task.go +++ b/cmd/snapctl/task.go @@ -381,7 +381,7 @@ func watchTask(ctx *cli.Context) { }() w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0) - printFields(w, false, 0, "NAMESPACE", "DATA", "TIMESTAMP", "SOURCE") + printFields(w, false, 0, "NAMESPACE", "DATA", "TIMESTAMP") // Loop listening to events for { select { @@ -395,7 +395,6 @@ func watchTask(ctx *cli.Context) { event.Namespace, event.Data, event.Timestamp, - event.Source, ) } lines = len(e.Event) diff --git a/control/control.go b/control/control.go index 1cef22c56..283f6532d 100644 --- a/control/control.go +++ b/control/control.go @@ -109,19 +109,19 @@ type managesPlugins interface { } type catalogsMetrics interface { - Get([]string, int) (*metricType, error) - GetQueriedNamespaces([]string) ([][]string, error) - MatchQuery([]string) ([][]string, error) + Get(core.Namespace, int) (*metricType, error) + GetQueriedNamespaces(core.Namespace) ([]core.Namespace, error) + MatchQuery(core.Namespace) ([]core.Namespace, error) Add(*metricType) AddLoadedMetricType(*loadedPlugin, core.Metric) error RmUnloadedPluginMetrics(lp *loadedPlugin) - GetVersions([]string) ([]*metricType, error) - Fetch([]string) ([]*metricType, error) + GetVersions(core.Namespace) ([]*metricType, error) + Fetch(core.Namespace) ([]*metricType, error) Item() (string, []*metricType) Next() bool Subscribe([]string, int) error Unsubscribe([]string, int) error - GetPlugin([]string, int) (*loadedPlugin, error) + GetPlugin(core.Namespace, int) (*loadedPlugin, error) } type managesSigning interface { @@ -449,7 +449,7 @@ func (p *pluginControl) SwapPlugins(in *core.RequestedPlugin, out core.Cataloged } // MatchQueryToNamespaces performs the process of matching the 'ns' with namespaces of all cataloged metrics -func (p *pluginControl) MatchQueryToNamespaces(ns []string) ([][]string, serror.SnapError) { +func (p *pluginControl) MatchQueryToNamespaces(ns core.Namespace) ([]core.Namespace, serror.SnapError) { // carry out the matching process nss, err := p.metricCatalog.MatchQuery(ns) if err != nil { @@ -460,7 +460,7 @@ func (p *pluginControl) MatchQueryToNamespaces(ns []string) ([][]string, serror. // ExpandWildcards returns all matched metrics namespaces with given 'ns' // as the results of matching query process which has been done -func (p *pluginControl) ExpandWildcards(ns []string) ([][]string, serror.SnapError) { +func (p *pluginControl) ExpandWildcards(ns core.Namespace) ([]core.Namespace, serror.SnapError) { // retrieve queried namespaces nss, err := p.metricCatalog.GetQueriedNamespaces(ns) if err != nil { @@ -542,7 +542,7 @@ func (p *pluginControl) validateMetricTypeSubscription(mt core.RequestedMetric, if err != nil { serrs = append(serrs, serror.New(err, map[string]interface{}{ - "name": core.JoinNamespace(mt.Namespace()), + "name": mt.Namespace().String(), "version": mt.Version(), })) return serrs @@ -611,7 +611,7 @@ func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]gatheredPlugin, [ m, err := p.metricCatalog.Get(mt.Namespace(), mt.Version()) if err != nil { serrs = append(serrs, serror.New(err, map[string]interface{}{ - "name": core.JoinNamespace(mt.Namespace()), + "name": mt.Namespace().String(), "version": mt.Version(), })) continue @@ -846,12 +846,12 @@ func (p *pluginControl) AvailablePlugins() []core.AvailablePlugin { // MetricCatalog returns the entire metric catalog // NOTE: The returned data from this function should be considered constant and read only func (p *pluginControl) MetricCatalog() ([]core.CatalogedMetric, error) { - return p.FetchMetrics([]string{}, 0) + return p.FetchMetrics(core.Namespace{}, 0) } // FetchMetrics returns the metrics which fall under the given namespace // NOTE: The returned data from this function should be considered constant and read only -func (p *pluginControl) FetchMetrics(ns []string, version int) ([]core.CatalogedMetric, error) { +func (p *pluginControl) FetchMetrics(ns core.Namespace, version int) ([]core.CatalogedMetric, error) { mts, err := p.metricCatalog.Fetch(ns) if err != nil { return nil, err @@ -869,11 +869,11 @@ func (p *pluginControl) FetchMetrics(ns []string, version int) ([]core.Cataloged return cmt, nil } -func (p *pluginControl) GetMetric(ns []string, ver int) (core.CatalogedMetric, error) { +func (p *pluginControl) GetMetric(ns core.Namespace, ver int) (core.CatalogedMetric, error) { return p.metricCatalog.Get(ns, ver) } -func (p *pluginControl) GetMetricVersions(ns []string) ([]core.CatalogedMetric, error) { +func (p *pluginControl) GetMetricVersions(ns core.Namespace) ([]core.CatalogedMetric, error) { mts, err := p.metricCatalog.GetVersions(ns) if err != nil { return nil, err @@ -886,7 +886,7 @@ func (p *pluginControl) GetMetricVersions(ns []string) ([]core.CatalogedMetric, return rmts, nil } -func (p *pluginControl) MetricExists(mns []string, ver int) bool { +func (p *pluginControl) MetricExists(mns core.Namespace, ver int) bool { _, err := p.metricCatalog.Get(mns, ver) if err == nil { return true @@ -932,6 +932,12 @@ func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time. go func() { for m := range cMetrics { + // Reapply standard tags after collection as a precaution. It is common for + // plugin authors to inadvertently overwrite or not pass along the data + // passed to CollectMetrics so we will help them out here. + for i := range m { + m[i] = addStandardTags(m[i]) + } metrics = append(metrics, m...) wg.Done() } @@ -1035,20 +1041,20 @@ func (r *requestedPlugin) Config() *cdata.ConfigDataNode { // ------------------- helper struct and function for grouping metrics types ------ // just a tuple of loadedPlugin and metricType slice -type pluginMetricTypes struct { +type metricTypes struct { plugin *loadedPlugin metricTypes []core.Metric } -func (p *pluginMetricTypes) Count() int { +func (p *metricTypes) Count() int { return len(p.metricTypes) } // groupMetricTypesByPlugin groups metricTypes by a plugin.Key() and returns appropriate structure -func groupMetricTypesByPlugin(cat catalogsMetrics, metricTypes []core.Metric) (map[string]pluginMetricTypes, serror.SnapError) { - pmts := make(map[string]pluginMetricTypes) +func groupMetricTypesByPlugin(cat catalogsMetrics, mts []core.Metric) (map[string]metricTypes, serror.SnapError) { + pmts := make(map[string]metricTypes) // For each plugin type select a matching available plugin to call - for _, incomingmt := range metricTypes { + for _, incomingmt := range mts { version := incomingmt.Version() if version == 0 { // If the version is not provided we will choose the latest @@ -1058,17 +1064,17 @@ func groupMetricTypesByPlugin(cat catalogsMetrics, metricTypes []core.Metric) (m if err != nil { return nil, serror.New(err) } - returnedmt := plugin.PluginMetricType{ - Namespace_: incomingmt.Namespace(), + returnedmt := plugin.MetricType{ + Namespace_: catalogedmt.Namespace(), LastAdvertisedTime_: catalogedmt.LastAdvertisedTime(), Version_: incomingmt.Version(), Tags_: catalogedmt.Tags(), - Labels_: catalogedmt.Labels(), Config_: incomingmt.Config(), + Unit_: catalogedmt.Unit(), } lp := catalogedmt.Plugin if lp == nil { - return nil, serror.New(errorMetricNotFound(incomingmt.Namespace())) + return nil, serror.New(errorMetricNotFound(incomingmt.Namespace().String())) } key := lp.Key() pmt, _ := pmts[key] diff --git a/control/control_test.go b/control/control_test.go index 1092cd9d4..03617d8a5 100644 --- a/control/control_test.go +++ b/control/control_test.go @@ -613,7 +613,7 @@ type mc struct { e int } -func (m *mc) Fetch(ns []string) ([]*metricType, error) { +func (m *mc) Fetch(ns core.Namespace) ([]*metricType, error) { if m.e == 2 { return nil, serror.New(errors.New("test")) } @@ -624,33 +624,33 @@ func (m *mc) resolvePlugin(mns []string, ver int) (*loadedPlugin, error) { return nil, nil } -func (m *mc) GetPlugin([]string, int) (*loadedPlugin, error) { +func (m *mc) GetPlugin(core.Namespace, int) (*loadedPlugin, error) { return nil, nil } -func (m *mc) GetVersions([]string) ([]*metricType, error) { +func (m *mc) GetVersions(core.Namespace) ([]*metricType, error) { return nil, nil } -func (m *mc) Get(ns []string, ver int) (*metricType, error) { +func (m *mc) Get(ns core.Namespace, ver int) (*metricType, error) { if m.e == 1 { return &metricType{ policy: &mockCDProc{}, }, nil } - return nil, serror.New(errorMetricNotFound(ns)) + return nil, serror.New(errorMetricNotFound(ns.String())) } func (m *mc) Subscribe(ns []string, ver int) error { if ns[0] == "nf" { - return serror.New(errorMetricNotFound(ns)) + return serror.New(errorMetricNotFound("/" + strings.Join(ns, "/"))) } return nil } func (m *mc) Unsubscribe(ns []string, ver int) error { if ns[0] == "nf" { - return serror.New(errorMetricNotFound(ns)) + return serror.New(errorMetricNotFound("/" + strings.Join(ns, "/"))) } if ns[0] == "neg" { return errNegativeSubCount @@ -676,12 +676,12 @@ func (m *mc) RmUnloadedPluginMetrics(lp *loadedPlugin) { } -func (m *mc) GetQueriedNamespaces(ns []string) ([][]string, error) { - return [][]string{ns}, nil +func (m *mc) GetQueriedNamespaces(ns core.Namespace) ([]core.Namespace, error) { + return []core.Namespace{ns}, nil } -func (m *mc) MatchQuery(ns []string) ([][]string, error) { - return [][]string{ns}, nil +func (m *mc) MatchQuery(ns core.Namespace) ([]core.Namespace, error) { + return []core.Namespace{ns}, nil } type mockCDProc struct { @@ -724,13 +724,13 @@ func TestExportedMetricCatalog(t *testing.T) { Convey(".MetricCatalog()", t, func() { c := New(GetDefaultConfig()) lp := &loadedPlugin{} - mt := newMetricType([]string{"foo", "bar"}, time.Now(), lp) + mt := newMetricType(core.NewNamespace([]string{"foo", "bar"}), time.Now(), lp) c.metricCatalog.Add(mt) Convey("it returns a collection of core.MetricTypes", func() { t, err := c.MetricCatalog() So(err, ShouldBeNil) So(len(t), ShouldEqual, 1) - So(t[0].Namespace(), ShouldResemble, []string{"foo", "bar"}) + So(t[0].Namespace(), ShouldResemble, core.NewNamespace([]string{"foo", "bar"})) }) Convey("If metric catalog fetch fails", func() { c.metricCatalog = &mc{e: 2} @@ -745,19 +745,19 @@ func TestMetricExists(t *testing.T) { Convey("MetricExists()", t, func() { c := New(GetDefaultConfig()) c.metricCatalog = &mc{} - So(c.MetricExists([]string{"hi"}, -1), ShouldEqual, false) + So(c.MetricExists(core.NewNamespace([]string{"hi"}), -1), ShouldEqual, false) }) } type MockMetricType struct { - namespace []string + namespace core.Namespace cfg *cdata.ConfigDataNode ver int } func (m MockMetricType) MarshalJSON() ([]byte, error) { return json.Marshal(&struct { - Namespace []string `json:"namespace"` + Namespace core.Namespace `json:"namespace"` Config *cdata.ConfigDataNode `json:"config"` }{ Namespace: m.namespace, @@ -765,11 +765,15 @@ func (m MockMetricType) MarshalJSON() ([]byte, error) { }) } -func (m MockMetricType) Namespace() []string { +func (m MockMetricType) Namespace() core.Namespace { return m.namespace } -func (m MockMetricType) Source() string { +func (m MockMetricType) Description() string { + return "" +} + +func (m MockMetricType) Unit() string { return "" } @@ -793,7 +797,6 @@ func (m MockMetricType) Data() interface{} { return nil } -func (m MockMetricType) Labels() []core.Label { return nil } func (m MockMetricType) Tags() map[string]string { return nil } func TestMetricConfig(t *testing.T) { @@ -807,7 +810,7 @@ func TestMetricConfig(t *testing.T) { <-lpe.done cd := cdata.NewNode() m1 := MockMetricType{ - namespace: []string{"intel", "mock", "foo"}, + namespace: core.NewNamespace([]string{"intel", "mock", "foo"}), } Convey("So metric should not be valid without config", func() { @@ -823,7 +826,7 @@ func TestMetricConfig(t *testing.T) { Convey("So metric should not be valid if does not occur in the catalog", func() { m := MockMetricType{ - namespace: []string{"intel", "mock", "bad"}, + namespace: core.NewNamespace([]string{"intel", "mock", "bad"}), } errs := c.validateMetricTypeSubscription(m, cd) So(errs, ShouldNotBeNil) @@ -846,7 +849,7 @@ func TestMetricConfig(t *testing.T) { <-lpe.done var cd *cdata.ConfigDataNode m1 := MockMetricType{ - namespace: []string{"intel", "mock", "foo"}, + namespace: core.NewNamespace([]string{"intel", "mock", "foo"}), } Convey("So metric should be valid with config", func() { @@ -868,7 +871,7 @@ func TestMetricConfig(t *testing.T) { <-lpe.done cd := cdata.NewNode() m1 := MockMetricType{ - namespace: []string{"intel", "mock", "foo"}, + namespace: core.NewNamespace([]string{"intel", "mock", "foo"}), ver: 1, } errs := c.validateMetricTypeSubscription(m1, cd) @@ -894,7 +897,7 @@ func TestRoutingCachingStrategy(t *testing.T) { t.FailNow() } metric := MockMetricType{ - namespace: []string{"intel", "mock", "foo"}, + namespace: core.NewNamespace([]string{"intel", "mock", "foo"}), ver: 2, cfg: cdata.NewNode(), } @@ -954,10 +957,10 @@ func TestRoutingCachingStrategy(t *testing.T) { if e != nil { t.FailNow() } - metric, err := c.metricCatalog.Get([]string{"intel", "mock", "foo"}, 1) + metric, err := c.metricCatalog.Get(core.NewNamespace([]string{"intel", "mock", "foo"}), 1) metric.config = cdata.NewNode() So(err, ShouldBeNil) - So(metric.NamespaceAsString(), ShouldResemble, "/intel/mock/foo") + So(metric.Namespace().String(), ShouldResemble, "/intel/mock/foo") So(err, ShouldBeNil) <-lpe.done Convey("Start the plugins", func() { @@ -1037,15 +1040,15 @@ func TestCollectDynamicMetrics(t *testing.T) { } <-lpe.done cd := cdata.NewNode() - metrics, err := c.metricCatalog.Fetch([]string{}) + metrics, err := c.metricCatalog.Fetch(core.NewNamespace([]string{})) So(err, ShouldBeNil) So(len(metrics), ShouldEqual, 6) - m, err := c.metricCatalog.Get([]string{"intel", "mock", "*", "baz"}, 2) + m, err := c.metricCatalog.Get(core.NewNamespace([]string{"intel", "mock", "*", "baz"}), 2) So(err, ShouldBeNil) So(m, ShouldNotBeNil) - jsonm, err := c.metricCatalog.Get([]string{"intel", "mock", "*", "baz"}, 1) + jsonm, err := c.metricCatalog.Get(core.NewNamespace([]string{"intel", "mock", "*", "baz"}), 1) So(err, ShouldBeNil) So(jsonm, ShouldNotBeNil) @@ -1071,17 +1074,16 @@ func TestCollectDynamicMetrics(t *testing.T) { So(err, ShouldBeNil) ttl, err = pool.CacheTTL(taskID) So(err, ShouldBeNil) - // The minimum TTL advertised by the plugin is 100ms therefore the TTL for the - // pool should be the global cache expiration + // The minimum TTL advertised by the plugin is 100ms therefore the TTL for th // pool should be the global cache expiration So(ttl, ShouldEqual, strategy.GlobalCacheExpiration) mts, errs := c.CollectMetrics([]core.Metric{m}, time.Now().Add(time.Second*1), taskID) - hits, err := pool.CacheHits(core.JoinNamespace(m.namespace), 2, taskID) + hits, err := pool.CacheHits(m.namespace.String(), 2, taskID) So(err, ShouldBeNil) So(hits, ShouldEqual, 0) So(errs, ShouldBeNil) So(len(mts), ShouldEqual, 10) mts, errs = c.CollectMetrics([]core.Metric{m}, time.Now().Add(time.Second*1), taskID) - hits, err = pool.CacheHits(core.JoinNamespace(m.namespace), 2, taskID) + hits, err = pool.CacheHits(m.namespace.String(), 2, taskID) So(err, ShouldBeNil) // todo resolve problem with caching for dynamic metrics @@ -1114,7 +1116,7 @@ func TestCollectDynamicMetrics(t *testing.T) { So(err, ShouldBeNil) So(ttl, ShouldEqual, 1100*time.Millisecond) mts, errs := c.CollectMetrics([]core.Metric{jsonm}, time.Now().Add(time.Second*1), uuid.New()) - hits, err := pool.CacheHits(core.JoinNamespace(jsonm.namespace), jsonm.version, taskID) + hits, err := pool.CacheHits(jsonm.namespace.String(), jsonm.version, taskID) So(pool.SubscriptionCount(), ShouldEqual, 1) So(pool.Strategy, ShouldNotBeNil) So(len(mts), ShouldBeGreaterThan, 0) @@ -1123,7 +1125,7 @@ func TestCollectDynamicMetrics(t *testing.T) { So(errs, ShouldBeNil) So(len(mts), ShouldEqual, 10) mts, errs = c.CollectMetrics([]core.Metric{jsonm}, time.Now().Add(time.Second*1), uuid.New()) - hits, err = pool.CacheHits(core.JoinNamespace(m.namespace), 1, taskID) + hits, err = pool.CacheHits(m.namespace.String(), 1, taskID) So(err, ShouldBeNil) // todo resolve problem with caching for dynamic metrics @@ -1168,7 +1170,7 @@ func TestFailedPlugin(t *testing.T) { cfg.AddItem("panic", ctypes.ConfigValueBool{Value: true}) m := []core.Metric{ MockMetricType{ - namespace: []string{"intel", "mock", "foo"}, + namespace: core.NewNamespace([]string{"intel", "mock", "foo"}), cfg: cfg, }, } @@ -1247,15 +1249,15 @@ func TestCollectMetrics(t *testing.T) { m := []core.Metric{} m1 := MockMetricType{ - namespace: []string{"intel", "mock", "foo"}, + namespace: core.NewNamespace([]string{"intel", "mock", "foo"}), cfg: cd, } m2 := MockMetricType{ - namespace: []string{"intel", "mock", "bar"}, + namespace: core.NewNamespace([]string{"intel", "mock", "bar"}), cfg: cd, } m3 := MockMetricType{ - namespace: []string{"intel", "mock", "test"}, + namespace: core.NewNamespace([]string{"intel", "mock", "test"}), cfg: cd, } @@ -1339,43 +1341,43 @@ func TestExpandWildcards(t *testing.T) { So(err, ShouldBeNil) So(len(mts), ShouldEqual, 4) Convey("expand metric with an asterisk", func() { - ns := []string{"intel", "mock", "*"} + ns := core.NewNamespace([]string{"intel", "mock", "*"}) c.MatchQueryToNamespaces(ns) nss, err := c.ExpandWildcards(ns) So(err, ShouldBeNil) // "intel/mock/*" should be expanded to all available mock metrics So(len(nss), ShouldEqual, len(mts)) - So(nss, ShouldResemble, [][]string{ - {"intel", "mock", "test"}, - {"intel", "mock", "foo"}, - {"intel", "mock", "bar"}, - {"intel", "mock", "*", "baz"}, + So(nss, ShouldResemble, []core.Namespace{ + core.NewNamespace([]string{"intel", "mock", "test"}), + core.NewNamespace([]string{"intel", "mock", "foo"}), + core.NewNamespace([]string{"intel", "mock", "bar"}), + core.NewNamespace([]string{"intel", "mock", "*", "baz"}), }) }) Convey("expand metric with a tuple", func() { - ns := []string{"intel", "mock", "(test|foo|bad)"} + ns := core.NewNamespace([]string{"intel", "mock", "(test|foo|bad)"}) c.MatchQueryToNamespaces(ns) nss, err := c.ExpandWildcards(ns) So(err, ShouldBeNil) // '/intel/mock/bad' does not exist in metric catalog and shouldn't be returned So(len(nss), ShouldEqual, 2) - So(nss, ShouldResemble, [][]string{ - {"intel", "mock", "test"}, - {"intel", "mock", "foo"}, + So(nss, ShouldResemble, []core.Namespace{ + core.NewNamespace([]string{"intel", "mock", "test"}), + core.NewNamespace([]string{"intel", "mock", "foo"}), }) }) Convey("expanding for dynamic metrics", func() { // if asterisk is acceptable by plugin in this location, leave that - ns := []string{"intel", "mock", "*", "baz"} + ns := core.NewNamespace([]string{"intel", "mock", "*", "baz"}) c.MatchQueryToNamespaces(ns) nss, err := c.ExpandWildcards(ns) So(err, ShouldBeNil) So(len(nss), ShouldEqual, 1) - So(nss, ShouldResemble, [][]string{ns}) + So(nss, ShouldResemble, []core.Namespace{ns}) }) Convey("expanding for invalid metric name", func() { // if asterisk is acceptable by plugin in this location, leave that - ns := []string{"intel", "mock", "invalid", "metric"} + ns := core.NewNamespace([]string{"intel", "mock", "invalid", "metric"}) c.MatchQueryToNamespaces(ns) nss, err := c.ExpandWildcards(ns) So(err, ShouldNotBeNil) @@ -1411,7 +1413,7 @@ func TestGatherCollectors(t *testing.T) { <-lpe.done mts, err := c.MetricCatalog() - ns := []string{"intel", "mock", "foo"} + ns := core.NewNamespace([]string{"intel", "mock", "foo"}) So(err, ShouldBeNil) So(len(mts), ShouldEqual, 4) Convey("it gathers the latest version", func() { @@ -1515,8 +1517,8 @@ func TestPublishMetrics(t *testing.T) { time.Sleep(2500 * time.Millisecond) Convey("Publish to file", func() { - metrics := []plugin.PluginMetricType{ - *plugin.NewPluginMetricType([]string{"foo"}, time.Now(), "", nil, nil, 1), + metrics := []plugin.MetricType{ + *plugin.NewMetricType(core.NewNamespace([]string{"foo"}), time.Now(), nil, "", 1), } var buf bytes.Buffer enc := gob.NewEncoder(&buf) @@ -1568,8 +1570,8 @@ func TestProcessMetrics(t *testing.T) { time.Sleep(2500 * time.Millisecond) Convey("process metrics", func() { - metrics := []plugin.PluginMetricType{ - *plugin.NewPluginMetricType([]string{"foo"}, time.Now(), "", nil, nil, 1), + metrics := []plugin.MetricType{ + *plugin.NewMetricType(core.NewNamespace([]string{"foo"}), time.Now(), nil, "", 1), } var buf bytes.Buffer enc := gob.NewEncoder(&buf) @@ -1577,7 +1579,7 @@ func TestProcessMetrics(t *testing.T) { contentType := plugin.SnapGOBContentType _, ct, errs := c.ProcessMetrics(contentType, buf.Bytes(), "passthru", 1, n.Table(), uuid.New()) So(errs, ShouldBeEmpty) - mts := []plugin.PluginMetricType{} + mts := []plugin.MetricType{} dec := gob.NewDecoder(bytes.NewBuffer(ct)) err := dec.Decode(&mts) So(err, ShouldBeNil) @@ -1587,7 +1589,7 @@ func TestProcessMetrics(t *testing.T) { }) Convey("Count()", func() { - pmt := &pluginMetricTypes{} + pmt := &metricTypes{} count := pmt.Count() So(count, ShouldResemble, 0) @@ -1636,7 +1638,7 @@ func TestMetricSubscriptionToNewVersion(t *testing.T) { So(lp.Name(), ShouldResemble, "mock") //Subscribe deps to create pools. metric := MockMetricType{ - namespace: []string{"intel", "mock", "foo"}, + namespace: core.NewNamespace([]string{"intel", "mock", "foo"}), cfg: cdata.NewNode(), ver: 0, } @@ -1698,7 +1700,7 @@ func TestMetricSubscriptionToOlderVersion(t *testing.T) { So(lp.Name(), ShouldResemble, "mock") //Subscribe deps to create pools. metric := MockMetricType{ - namespace: []string{"intel", "mock", "foo"}, + namespace: core.NewNamespace([]string{"intel", "mock", "foo"}), cfg: cdata.NewNode(), ver: 0, } @@ -1741,6 +1743,7 @@ func TestMetricSubscriptionToOlderVersion(t *testing.T) { So(pool2.SubscriptionCount(), ShouldEqual, 1) mts, errs = c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID") + So(errs, ShouldBeEmpty) So(len(mts), ShouldEqual, 1) // ensure the data coming back is from v1, V1's data is type string diff --git a/control/metrics.go b/control/metrics.go index ad534dcbe..a56fb9fa5 100644 --- a/control/metrics.go +++ b/control/metrics.go @@ -22,6 +22,7 @@ package control import ( "errors" "fmt" + "os" "regexp" "strings" "sync" @@ -29,6 +30,7 @@ import ( log "github.com/Sirupsen/logrus" + "github.com/intelsdi-x/snap/control/plugin" "github.com/intelsdi-x/snap/control/plugin/cpolicy" "github.com/intelsdi-x/snap/core" "github.com/intelsdi-x/snap/core/cdata" @@ -49,18 +51,18 @@ var ( } ) -func errorMetricNotFound(ns []string, ver ...int) error { +func errorMetricNotFound(ns string, ver ...int) error { if len(ver) > 0 { - return fmt.Errorf("Metric not found: %s (version: %d)", core.JoinNamespace(ns), ver[0]) + return fmt.Errorf("Metric not found: %s (version: %d)", ns, ver[0]) } - return fmt.Errorf("Metric not found: %s", core.JoinNamespace(ns)) + return fmt.Errorf("Metric not found: %s", ns) } -func errorMetricContainsNotAllowedChars(ns []string) error { +func errorMetricContainsNotAllowedChars(ns string) error { return fmt.Errorf("Metric namespace %s contains not allowed characters. Avoid using %s", ns, listNotAllowedChars()) } -func errorMetricEndsWithAsterisk(ns []string) error { +func errorMetricEndsWithAsterisk(ns string) error { return fmt.Errorf("Metric namespace %s ends with an asterisk is not allowed", ns) } @@ -91,17 +93,17 @@ func (m *metricCatalogItem) Versions() map[int]core.Metric { type metricType struct { Plugin *loadedPlugin - namespace []string + namespace core.Namespace version int lastAdvertisedTime time.Time subscriptions int policy processesConfigData config *cdata.ConfigDataNode data interface{} - source string - labels []core.Label tags map[string]string timestamp time.Time + description string + unit string } type processesConfigData interface { @@ -109,7 +111,7 @@ type processesConfigData interface { HasRules() bool } -func newMetricType(ns []string, last time.Time, plugin *loadedPlugin) *metricType { +func newMetricType(ns core.Namespace, last time.Time, plugin *loadedPlugin) *metricType { return &metricType{ Plugin: plugin, @@ -119,17 +121,13 @@ func newMetricType(ns []string, last time.Time, plugin *loadedPlugin) *metricTyp } func (m *metricType) Key() string { - return fmt.Sprintf("%s/%d", m.NamespaceAsString(), m.Version()) + return fmt.Sprintf("%s/%d", m.Namespace().String(), m.Version()) } -func (m *metricType) Namespace() []string { +func (m *metricType) Namespace() core.Namespace { return m.namespace } -func (m *metricType) NamespaceAsString() string { - return core.JoinNamespace(m.Namespace()) -} - func (m *metricType) Data() interface{} { return m.data } @@ -172,22 +170,22 @@ func (m *metricType) Policy() *cpolicy.ConfigPolicyNode { return m.policy.(*cpolicy.ConfigPolicyNode) } -func (m *metricType) Source() string { - return m.source -} - func (m *metricType) Tags() map[string]string { return m.tags } -func (m *metricType) Labels() []core.Label { - return m.labels -} - func (m *metricType) Timestamp() time.Time { return m.timestamp } +func (m *metricType) Description() string { + return m.description +} + +func (m *metricType) Unit() string { + return m.unit +} + type metricCatalog struct { tree *MTTrie mutex *sync.Mutex @@ -213,12 +211,12 @@ func (mc *metricCatalog) Keys() []string { } // matchedNamespaces retrieves all matched items stored in mKey map under the key 'wkey' and converts them to namespaces -func (mc *metricCatalog) matchedNamespaces(wkey string) ([][]string, error) { +func (mc *metricCatalog) matchedNamespaces(wkey string) ([]core.Namespace, error) { // mkeys means matched metrics keys mkeys := mc.mKeys[wkey] if len(mkeys) == 0 { - return nil, errorMetricNotFound(getMetricNamespace(wkey)) + return nil, errorMetricNotFound(getMetricNamespace(wkey).String()) } // convert matched keys to a slice of namespaces @@ -227,24 +225,24 @@ func (mc *metricCatalog) matchedNamespaces(wkey string) ([][]string, error) { // GetQueriedNamespaces returns all matched metrics namespaces for query 'ns' which can contain // an asterisk or tuple (refer to query support) -func (mc *metricCatalog) GetQueriedNamespaces(ns []string) ([][]string, error) { +func (mc *metricCatalog) GetQueriedNamespaces(ns core.Namespace) ([]core.Namespace, error) { mc.mutex.Lock() defer mc.mutex.Unlock() // get metric key (might contain wildcard(s)) - wkey := getMetricKey(ns) + wkey := ns.Key() return mc.matchedNamespaces(wkey) } // MatchQuery matches given 'ns' which could contain an asterisk or a tuple and add them to matching map under key 'ns' -// The matched metrics namespaces are also returned (as a [][]string) -func (mc *metricCatalog) MatchQuery(ns []string) ([][]string, error) { +// The matched metrics namespaces are also returned (as a []core.Namespace) +func (mc *metricCatalog) MatchQuery(ns core.Namespace) ([]core.Namespace, error) { mc.mutex.Lock() defer mc.mutex.Unlock() // get metric key (might contain wildcard(s)) - wkey := getMetricKey(ns) + wkey := ns.Key() // adding matched namespaces to map mc.addItemToMatchingMap(wkey) @@ -252,9 +250,9 @@ func (mc *metricCatalog) MatchQuery(ns []string) ([][]string, error) { return mc.matchedNamespaces(wkey) } -func convertKeysToNamespaces(keys []string) [][]string { +func convertKeysToNamespaces(keys []string) []core.Namespace { // nss is a slice of slices which holds metrics namespaces - nss := [][]string{} + nss := []core.Namespace{} for _, key := range keys { ns := getMetricNamespace(key) if len(ns) != 0 { @@ -322,18 +320,21 @@ func (mc *metricCatalog) removeMatchedKey(key string) { } // validateMetricNamespace validates metric namespace in terms of containing not allowed characters and ending with an asterisk -func validateMetricNamespace(ns []string) error { - name := strings.Join(ns, "") +func validateMetricNamespace(ns core.Namespace) error { + name := "" + for _, i := range ns { + name += i.Value + } for _, chars := range notAllowedChars { for _, ch := range chars { if strings.ContainsAny(name, ch) { - return errorMetricContainsNotAllowedChars(ns) + return errorMetricContainsNotAllowedChars(ns.String()) } } } // plugin should NOT advertise metrics ending with a wildcard if strings.HasSuffix(name, "*") { - return errorMetricEndsWithAsterisk(ns) + return errorMetricEndsWithAsterisk(ns.String()) } return nil @@ -365,8 +366,9 @@ func (mc *metricCatalog) AddLoadedMetricType(lp *loadedPlugin, mt core.Metric) e version: mt.Version(), lastAdvertisedTime: mt.LastAdvertisedTime(), tags: mt.Tags(), - labels: mt.Labels(), - policy: lp.ConfigPolicy.Get(mt.Namespace()), + policy: lp.ConfigPolicy.Get(mt.Namespace().Strings()), + description: mt.Description(), + unit: mt.Unit(), } mc.Add(&newMt) return nil @@ -387,7 +389,7 @@ func (mc *metricCatalog) Add(m *metricType) { mc.mutex.Lock() defer mc.mutex.Unlock() - key := getMetricKey(m.Namespace()) + key := m.Namespace().Key() // adding key as a cataloged keys (mc.keys) mc.keys = appendIfMissing(mc.keys, key) @@ -397,24 +399,28 @@ func (mc *metricCatalog) Add(m *metricType) { // Get retrieves a metric given a namespace and version. // If provided a version of -1 the latest plugin will be returned. -func (mc *metricCatalog) Get(ns []string, version int) (*metricType, error) { +func (mc *metricCatalog) Get(ns core.Namespace, version int) (*metricType, error) { mc.mutex.Lock() defer mc.mutex.Unlock() - return mc.get(ns, version) + return mc.get(ns.Strings(), version) } // GetVersions retrieves all versions of a given metric namespace. -func (mc *metricCatalog) GetVersions(ns []string) ([]*metricType, error) { +func (mc *metricCatalog) GetVersions(ns core.Namespace) ([]*metricType, error) { mc.mutex.Lock() defer mc.mutex.Unlock() - return mc.getVersions(ns) + return mc.getVersions(strings.Split(ns.String(), "/")) } // Fetch transactionally retrieves all metrics which fall under namespace ns -func (mc *metricCatalog) Fetch(ns []string) ([]*metricType, error) { +func (mc *metricCatalog) Fetch(ns core.Namespace) ([]*metricType, error) { mc.mutex.Lock() defer mc.mutex.Unlock() - mtsi, err := mc.tree.Fetch(ns) + var nss []string + if ns.String() != "/" { + nss = ns.Strings() + } + mtsi, err := mc.tree.Fetch(nss) if err != nil { log.WithFields(log.Fields{ "_module": "control", @@ -428,14 +434,14 @@ func (mc *metricCatalog) Fetch(ns []string) ([]*metricType, error) { } // Remove removes a metricType from the catalog and from matching map -func (mc *metricCatalog) Remove(ns []string) { +func (mc *metricCatalog) Remove(ns core.Namespace) { mc.mutex.Lock() defer mc.mutex.Unlock() - mc.tree.Remove(ns) + mc.tree.Remove(ns.Strings()) // remove all items from map mKey mapped for this 'ns' - key := getMetricKey(ns) + key := ns.Key() mc.removeMatchedKey(key) } @@ -503,7 +509,7 @@ func (mc *metricCatalog) Unsubscribe(ns []string, version int) error { return m.Unsubscribe() } -func (mc *metricCatalog) GetPlugin(mns []string, ver int) (*loadedPlugin, error) { +func (mc *metricCatalog) GetPlugin(mns core.Namespace, ver int) (*loadedPlugin, error) { m, err := mc.Get(mns, ver) if err != nil { log.WithFields(log.Fields{ @@ -538,7 +544,7 @@ func (mc *metricCatalog) get(ns []string, ver int) (*metricType, error) { "_block": "get", "error": err, }).Error("error getting plugin version") - return nil, errorMetricNotFound(ns, ver) + return nil, errorMetricNotFound("/"+strings.Join(ns, "/"), ver) } return l, nil } @@ -558,17 +564,13 @@ func (mc *metricCatalog) getVersions(ns []string) ([]*metricType, error) { return nil, err } if len(mts) == 0 { - return nil, errorMetricNotFound(ns) + return nil, errorMetricNotFound("/" + strings.Join(ns, "/")) } return mts, nil } -func getMetricKey(metric []string) string { - return strings.Join(metric, ".") -} - -func getMetricNamespace(key string) []string { - return strings.Split(key, ".") +func getMetricNamespace(key string) core.Namespace { + return core.NewNamespace(strings.Split(key, ".")) } func getLatest(c []*metricType) *metricType { @@ -598,3 +600,32 @@ func getVersion(c []*metricType, ver int) (*metricType, error) { } return nil, errMetricNotFound } + +func addStandardTags(m core.Metric) core.Metric { + hostname, err := os.Hostname() + if err != nil { + log.WithFields(log.Fields{ + "_module": "control", + "_file": "metrics.go,", + "_block": "addStandardTags", + "error": err.Error(), + }).Error("Unable to determine hostname") + } + tags := m.Tags() + if tags == nil { + tags = map[string]string{} + } + tags[core.STD_TAG_PLUGIN_RUNNING_ON] = hostname + metric := plugin.MetricType{ + Namespace_: m.Namespace(), + Version_: m.Version(), + LastAdvertisedTime_: m.LastAdvertisedTime(), + Config_: m.Config(), + Data_: m.Data(), + Tags_: tags, + Description_: m.Description(), + Unit_: m.Unit(), + Timestamp_: m.Timestamp(), + } + return metric +} diff --git a/control/metrics_test.go b/control/metrics_test.go index 6c818b69f..bbfac318e 100644 --- a/control/metrics_test.go +++ b/control/metrics_test.go @@ -24,6 +24,7 @@ import ( "time" "github.com/intelsdi-x/snap/control/plugin" + "github.com/intelsdi-x/snap/core" . "github.com/smartystreets/goconvey/convey" ) @@ -31,13 +32,13 @@ import ( func TestMetricType(t *testing.T) { Convey("newMetricType()", t, func() { Convey("returns a metricType", func() { - mt := newMetricType([]string{"test"}, time.Now(), new(loadedPlugin)) + mt := newMetricType(core.NewNamespace([]string{"test"}), time.Now(), new(loadedPlugin)) So(mt, ShouldHaveSameTypeAs, new(metricType)) }) }) Convey("metricType.Namespace()", t, func() { Convey("returns the namespace of a metricType", func() { - ns := []string{"test"} + ns := core.NewNamespace([]string{"test"}) mt := newMetricType(ns, time.Now(), new(loadedPlugin)) So(mt.Namespace(), ShouldHaveSameTypeAs, ns) So(mt.Namespace(), ShouldResemble, ns) @@ -45,7 +46,7 @@ func TestMetricType(t *testing.T) { }) Convey("metricType.Version()", t, func() { Convey("returns the namespace of a metricType", func() { - ns := []string{"test"} + ns := core.NewNamespace([]string{"test"}) lp := &loadedPlugin{Meta: plugin.PluginMeta{Version: 1}} mt := newMetricType(ns, time.Now(), lp) So(mt.Version(), ShouldEqual, 1) @@ -54,7 +55,7 @@ func TestMetricType(t *testing.T) { Convey("metricType.LastAdvertisedTimestamp()", t, func() { Convey("returns the LastAdvertisedTimestamp for the metricType", func() { ts := time.Now() - mt := newMetricType([]string{"test"}, ts, new(loadedPlugin)) + mt := newMetricType(core.NewNamespace([]string{"test"}), ts, new(loadedPlugin)) So(mt.LastAdvertisedTime(), ShouldHaveSameTypeAs, ts) So(mt.LastAdvertisedTime(), ShouldResemble, ts) }) @@ -63,7 +64,7 @@ func TestMetricType(t *testing.T) { Convey("returns the key for the metricType", func() { ts := time.Now() lp := new(loadedPlugin) - mt := newMetricType([]string{"foo", "bar"}, ts, lp) + mt := newMetricType(core.NewNamespace([]string{"foo", "bar"}), ts, lp) key := mt.Key() So(key, ShouldEqual, "/foo/bar/0") }) @@ -71,7 +72,7 @@ func TestMetricType(t *testing.T) { ts := time.Now() lp2 := new(loadedPlugin) lp2.Meta.Version = 2 - mt := newMetricType([]string{"foo", "bar"}, ts, lp2) + mt := newMetricType(core.NewNamespace([]string{"foo", "bar"}), ts, lp2) key := mt.Key() So(key, ShouldEqual, "/foo/bar/2") }) @@ -82,15 +83,15 @@ func TestMetricMatching(t *testing.T) { Convey("metricCatalog.MatchQuery()", t, func() { Convey("verify query support for static metrics", func() { mc := newMetricCatalog() - ns := [][]string{ - {"mock", "foo", "bar"}, - {"mock", "foo", "baz"}, - {"mock", "asdf", "bar"}, - {"mock", "asdf", "baz"}, - {"mock", "test", "1"}, - {"mock", "test", "2"}, - {"mock", "test", "3"}, - {"mock", "test", "4"}, + ns := []core.Namespace{ + core.NewNamespace([]string{"mock", "foo", "bar"}), + core.NewNamespace([]string{"mock", "foo", "baz"}), + core.NewNamespace([]string{"mock", "asdf", "bar"}), + core.NewNamespace([]string{"mock", "asdf", "baz"}), + core.NewNamespace([]string{"mock", "test", "1"}), + core.NewNamespace([]string{"mock", "test", "2"}), + core.NewNamespace([]string{"mock", "test", "3"}), + core.NewNamespace([]string{"mock", "test", "4"}), } lp := new(loadedPlugin) ts := time.Now() @@ -109,62 +110,62 @@ func TestMetricMatching(t *testing.T) { mc.Add(v) } Convey("match /mock/foo/*", func() { - nss, err := mc.MatchQuery([]string{"mock", "foo", "*"}) + nss, err := mc.MatchQuery(core.NewNamespace([]string{"mock", "foo", "*"})) So(err, ShouldBeNil) So(len(nss), ShouldEqual, 2) - So(nss, ShouldResemble, [][]string{ - {"mock", "foo", "bar"}, - {"mock", "foo", "baz"}, + So(nss, ShouldResemble, []core.Namespace{ + core.NewNamespace([]string{"mock", "foo", "bar"}), + core.NewNamespace([]string{"mock", "foo", "baz"}), }) }) Convey("match /mock/test/*", func() { - nss, err := mc.MatchQuery([]string{"mock", "test", "*"}) + nss, err := mc.MatchQuery(core.NewNamespace([]string{"mock", "test", "*"})) So(err, ShouldBeNil) So(len(nss), ShouldEqual, 4) - So(nss, ShouldResemble, [][]string{ - {"mock", "test", "1"}, - {"mock", "test", "2"}, - {"mock", "test", "3"}, - {"mock", "test", "4"}, + So(nss, ShouldResemble, []core.Namespace{ + core.NewNamespace([]string{"mock", "test", "1"}), + core.NewNamespace([]string{"mock", "test", "2"}), + core.NewNamespace([]string{"mock", "test", "3"}), + core.NewNamespace([]string{"mock", "test", "4"}), }) }) Convey("match /mock/*/bar", func() { - nss, err := mc.MatchQuery([]string{"mock", "*", "bar"}) + nss, err := mc.MatchQuery(core.NewNamespace([]string{"mock", "*", "bar"})) So(err, ShouldBeNil) So(len(nss), ShouldEqual, 2) - So(nss, ShouldResemble, [][]string{ - {"mock", "foo", "bar"}, - {"mock", "asdf", "bar"}, + So(nss, ShouldResemble, []core.Namespace{ + core.NewNamespace([]string{"mock", "foo", "bar"}), + core.NewNamespace([]string{"mock", "asdf", "bar"}), }) }) Convey("match /mock/*", func() { - nss, err := mc.MatchQuery([]string{"mock", "*"}) + nss, err := mc.MatchQuery(core.NewNamespace([]string{"mock", "*"})) So(err, ShouldBeNil) So(len(nss), ShouldEqual, len(ns)) So(nss, ShouldResemble, ns) }) Convey("match /mock/(foo|asdf)/baz", func() { - nss, err := mc.MatchQuery([]string{"mock", "(foo|asdf)", "baz"}) + nss, err := mc.MatchQuery(core.NewNamespace([]string{"mock", "(foo|asdf)", "baz"})) So(err, ShouldBeNil) So(len(nss), ShouldEqual, 2) - So(nss, ShouldResemble, [][]string{ - {"mock", "foo", "baz"}, - {"mock", "asdf", "baz"}, + So(nss, ShouldResemble, []core.Namespace{ + core.NewNamespace([]string{"mock", "foo", "baz"}), + core.NewNamespace([]string{"mock", "asdf", "baz"}), }) }) Convey("match /mock/test/(1|2|3)", func() { - nss, err := mc.MatchQuery([]string{"mock", "test", "(1|2|3)"}) + nss, err := mc.MatchQuery(core.NewNamespace([]string{"mock", "test", "(1|2|3)"})) So(err, ShouldBeNil) So(len(nss), ShouldEqual, 3) - So(nss, ShouldResemble, [][]string{ - {"mock", "test", "1"}, - {"mock", "test", "2"}, - {"mock", "test", "3"}, + So(nss, ShouldResemble, []core.Namespace{ + core.NewNamespace([]string{"mock", "test", "1"}), + core.NewNamespace([]string{"mock", "test", "2"}), + core.NewNamespace([]string{"mock", "test", "3"}), }) }) Convey("invalid matching", func() { - nss, err := mc.MatchQuery([]string{"mock", "not", "exist", "metric"}) + nss, err := mc.MatchQuery(core.NewNamespace([]string{"mock", "not", "exist", "metric"})) So(err, ShouldNotBeNil) So(nss, ShouldBeEmpty) So(err.Error(), ShouldContainSubstring, "Metric not found:") @@ -172,15 +173,15 @@ func TestMetricMatching(t *testing.T) { }) Convey("verify query support for dynamic metrics", func() { mc := newMetricCatalog() - ns := [][]string{ - {"mock", "cgroups", "*", "bar"}, - {"mock", "cgroups", "*", "baz"}, - {"mock", "cgroups", "*", "in"}, - {"mock", "cgroups", "*", "out"}, - {"mock", "cgroups", "*", "test", "1"}, - {"mock", "cgroups", "*", "test", "2"}, - {"mock", "cgroups", "*", "test", "3"}, - {"mock", "cgroups", "*", "test", "4"}, + ns := []core.Namespace{ + core.NewNamespace([]string{"mock", "cgroups", "*", "bar"}), + core.NewNamespace([]string{"mock", "cgroups", "*", "baz"}), + core.NewNamespace([]string{"mock", "cgroups", "*", "in"}), + core.NewNamespace([]string{"mock", "cgroups", "*", "out"}), + core.NewNamespace([]string{"mock", "cgroups", "*", "test", "1"}), + core.NewNamespace([]string{"mock", "cgroups", "*", "test", "2"}), + core.NewNamespace([]string{"mock", "cgroups", "*", "test", "3"}), + core.NewNamespace([]string{"mock", "cgroups", "*", "test", "4"}), } lp := new(loadedPlugin) ts := time.Now() @@ -202,47 +203,47 @@ func TestMetricMatching(t *testing.T) { So(len(mc.Keys()), ShouldEqual, len(ns)) Convey("match /mock/cgroups/*", func() { - nss, err := mc.MatchQuery([]string{"mock", "cgroups", "*"}) + nss, err := mc.MatchQuery(core.NewNamespace([]string{"mock", "cgroups", "*"})) So(err, ShouldBeNil) So(len(nss), ShouldEqual, len(ns)) So(nss, ShouldResemble, ns) }) Convey("match /mock/cgroups/*/bar", func() { - nss, err := mc.MatchQuery([]string{"mock", "cgroups", "*", "bar"}) + nss, err := mc.MatchQuery(core.NewNamespace([]string{"mock", "cgroups", "*", "bar"})) So(err, ShouldBeNil) So(len(nss), ShouldEqual, 1) - So(nss, ShouldResemble, [][]string{ - {"mock", "cgroups", "*", "bar"}, + So(nss, ShouldResemble, []core.Namespace{ + core.NewNamespace([]string{"mock", "cgroups", "*", "bar"}), }) }) Convey("match /mock/cgroups/*/(bar|baz)", func() { - nss, err := mc.MatchQuery([]string{"mock", "cgroups", "*", "(bar|baz)"}) + nss, err := mc.MatchQuery(core.NewNamespace([]string{"mock", "cgroups", "*", "(bar|baz)"})) So(err, ShouldBeNil) So(len(nss), ShouldEqual, 2) - So(nss, ShouldResemble, [][]string{ - {"mock", "cgroups", "*", "bar"}, - {"mock", "cgroups", "*", "baz"}, + So(nss, ShouldResemble, []core.Namespace{ + core.NewNamespace([]string{"mock", "cgroups", "*", "bar"}), + core.NewNamespace([]string{"mock", "cgroups", "*", "baz"}), }) }) Convey("match /mock/cgroups/*/test/*", func() { - nss, err := mc.MatchQuery([]string{"mock", "cgroups", "*", "test", "*"}) + nss, err := mc.MatchQuery(core.NewNamespace([]string{"mock", "cgroups", "*", "test", "*"})) So(err, ShouldBeNil) So(len(nss), ShouldEqual, 4) - So(nss, ShouldResemble, [][]string{ - {"mock", "cgroups", "*", "test", "1"}, - {"mock", "cgroups", "*", "test", "2"}, - {"mock", "cgroups", "*", "test", "3"}, - {"mock", "cgroups", "*", "test", "4"}, + So(nss, ShouldResemble, []core.Namespace{ + core.NewNamespace([]string{"mock", "cgroups", "*", "test", "1"}), + core.NewNamespace([]string{"mock", "cgroups", "*", "test", "2"}), + core.NewNamespace([]string{"mock", "cgroups", "*", "test", "3"}), + core.NewNamespace([]string{"mock", "cgroups", "*", "test", "4"}), }) }) Convey("match /mock/cgroups/*/test/(1|2|3)", func() { - nss, err := mc.MatchQuery([]string{"mock", "cgroups", "*", "test", "(1|2|3)"}) + nss, err := mc.MatchQuery(core.NewNamespace([]string{"mock", "cgroups", "*", "test", "(1|2|3)"})) So(err, ShouldBeNil) So(len(nss), ShouldEqual, 3) - So(nss, ShouldResemble, [][]string{ - {"mock", "cgroups", "*", "test", "1"}, - {"mock", "cgroups", "*", "test", "2"}, - {"mock", "cgroups", "*", "test", "3"}, + So(nss, ShouldResemble, []core.Namespace{ + core.NewNamespace([]string{"mock", "cgroups", "*", "test", "1"}), + core.NewNamespace([]string{"mock", "cgroups", "*", "test", "2"}), + core.NewNamespace([]string{"mock", "cgroups", "*", "test", "3"}), }) }) }) @@ -259,8 +260,9 @@ func TestMetricCatalog(t *testing.T) { }) Convey("metricCatalog.Add()", t, func() { Convey("adds a metricType to the metricCatalog", func() { - ns := []string{"test"} + ns := core.NewNamespace([]string{"test"}) mt := newMetricType(ns, time.Now(), new(loadedPlugin)) + mt.description = "some description" mc := newMetricCatalog() mc.Add(mt) _mt, err := mc.Get(ns, -1) @@ -272,10 +274,10 @@ func TestMetricCatalog(t *testing.T) { mc := newMetricCatalog() ts := time.Now() Convey("add multiple metricTypes and get them back", func() { - ns := [][]string{ - {"test1"}, - {"test2"}, - {"test3"}, + ns := []core.Namespace{ + core.NewNamespace([]string{"test1"}), + core.NewNamespace([]string{"test2"}), + core.NewNamespace([]string{"test3"}), } lp := new(loadedPlugin) mt := []*metricType{ @@ -297,11 +299,11 @@ func TestMetricCatalog(t *testing.T) { lp2.Meta.Version = 2 lp35 := new(loadedPlugin) lp35.Meta.Version = 35 - m2 := newMetricType([]string{"foo", "bar"}, ts, lp2) + m2 := newMetricType(core.NewNamespace([]string{"foo", "bar"}), ts, lp2) mc.Add(m2) - m35 := newMetricType([]string{"foo", "bar"}, ts, lp35) + m35 := newMetricType(core.NewNamespace([]string{"foo", "bar"}), ts, lp35) mc.Add(m35) - m, err := mc.Get([]string{"foo", "bar"}, -1) + m, err := mc.Get(core.NewNamespace([]string{"foo", "bar"}), -1) So(err, ShouldBeNil) So(m, ShouldEqual, m35) }) @@ -310,11 +312,11 @@ func TestMetricCatalog(t *testing.T) { lp2.Meta.Version = 2 lp35 := new(loadedPlugin) lp35.Meta.Version = 35 - m2 := newMetricType([]string{"foo", "bar"}, ts, lp2) + m2 := newMetricType(core.NewNamespace([]string{"foo", "bar"}), ts, lp2) mc.Add(m2) - m35 := newMetricType([]string{"foo", "bar"}, ts, lp35) + m35 := newMetricType(core.NewNamespace([]string{"foo", "bar"}), ts, lp35) mc.Add(m35) - m, err := mc.Get([]string{"foo", "bar"}, 2) + m, err := mc.Get(core.NewNamespace([]string{"foo", "bar"}), 2) So(err, ShouldBeNil) So(m, ShouldEqual, m2) }) @@ -323,18 +325,18 @@ func TestMetricCatalog(t *testing.T) { lp2.Meta.Version = 2 lp35 := new(loadedPlugin) lp35.Meta.Version = 35 - m2 := newMetricType([]string{"foo", "bar"}, ts, lp2) + m2 := newMetricType(core.NewNamespace([]string{"foo", "bar"}), ts, lp2) mc.Add(m2) - m35 := newMetricType([]string{"foo", "bar"}, ts, lp35) + m35 := newMetricType(core.NewNamespace([]string{"foo", "bar"}), ts, lp35) mc.Add(m35) - _, err := mc.Get([]string{"foo", "bar"}, 7) + _, err := mc.Get(core.NewNamespace([]string{"foo", "bar"}), 7) So(err.Error(), ShouldContainSubstring, "Metric not found:") }) }) Convey("metricCatalog.Table()", t, func() { Convey("returns a copy of the table", func() { mc := newMetricCatalog() - mt := newMetricType([]string{"foo", "bar"}, time.Now(), &loadedPlugin{}) + mt := newMetricType(core.NewNamespace([]string{"foo", "bar"}), time.Now(), &loadedPlugin{}) mc.Add(mt) //TODO test tree //So(mc.Table(), ShouldHaveSameTypeAs, map[string][]*metricType{}) @@ -342,7 +344,7 @@ func TestMetricCatalog(t *testing.T) { }) }) Convey("metricCatalog.Next()", t, func() { - ns := []string{"test"} + ns := core.NewNamespace([]string{"test"}) mt := newMetricType(ns, time.Now(), new(loadedPlugin)) mc := newMetricCatalog() Convey("returns false on empty table", func() { @@ -356,10 +358,10 @@ func TestMetricCatalog(t *testing.T) { }) }) Convey("metricCatalog.Item()", t, func() { - ns := [][]string{ - {"test1"}, - {"test2"}, - {"test3"}, + ns := []core.Namespace{ + core.NewNamespace([]string{"test1"}), + core.NewNamespace([]string{"test2"}), + core.NewNamespace([]string{"test3"}), } lp := new(loadedPlugin) t := time.Now() @@ -375,14 +377,14 @@ func TestMetricCatalog(t *testing.T) { Convey("return first key and item in table", func() { mc.Next() key, item := mc.Item() - So(key, ShouldEqual, getMetricKey(ns[0])) + So(key, ShouldEqual, ns[0].Key()) So(item, ShouldResemble, []*metricType{mt[0]}) }) Convey("return second key and item in table", func() { mc.Next() mc.Next() key, item := mc.Item() - So(key, ShouldEqual, getMetricKey(ns[1])) + So(key, ShouldEqual, ns[1].Key()) So(item, ShouldResemble, []*metricType{mt[1]}) }) Convey("return third key and item in table", func() { @@ -390,7 +392,7 @@ func TestMetricCatalog(t *testing.T) { mc.Next() mc.Next() key, item := mc.Item() - So(key, ShouldEqual, getMetricKey(ns[2])) + So(key, ShouldEqual, ns[2].Key()) So(item, ShouldResemble, []*metricType{mt[2]}) }) }) @@ -398,12 +400,12 @@ func TestMetricCatalog(t *testing.T) { Convey("metricCatalog.Remove()", t, func() { mc := newMetricCatalog() ts := time.Now() - nss := [][]string{ - {"mock", "test", "1"}, - {"mock", "test", "2"}, - {"mock", "test", "3"}, - {"mock", "cgroups", "*", "in"}, - {"mock", "cgroups", "*", "out"}, + nss := []core.Namespace{ + core.NewNamespace([]string{"mock", "test", "1"}), + core.NewNamespace([]string{"mock", "test", "2"}), + core.NewNamespace([]string{"mock", "test", "3"}), + core.NewNamespace([]string{"mock", "cgroups", "*", "in"}), + core.NewNamespace([]string{"mock", "cgroups", "*", "out"}), } Convey("removes a metricType from the catalog", func() { // adding metrics to the catalog @@ -425,7 +427,7 @@ func TestMetricCatalog(t *testing.T) { mc.Remove(nss[0]) Convey("validate removing a single metric from the catalog", func() { - _mt, err := mc.Get([]string{"mock", "test", "1"}, -1) + _mt, err := mc.Get(core.NewNamespace([]string{"mock", "test", "1"}), -1) So(_mt, ShouldBeNil) So(err, ShouldNotBeNil) @@ -450,10 +452,10 @@ func TestMetricCatalog(t *testing.T) { } func TestSubscribe(t *testing.T) { - ns := [][]string{ - {"test1"}, - {"test2"}, - {"test3"}, + ns := []core.Namespace{ + core.NewNamespace([]string{"test1"}), + core.NewNamespace([]string{"test2"}), + core.NewNamespace([]string{"test3"}), } lp := new(loadedPlugin) ts := time.Now() @@ -476,7 +478,7 @@ func TestSubscribe(t *testing.T) { Convey("then it gets correctly increments the count", func() { err := mc.Subscribe([]string{"test1"}, -1) So(err, ShouldBeNil) - m, err2 := mc.Get([]string{"test1"}, -1) + m, err2 := mc.Get(core.NewNamespace([]string{"test1"}), -1) So(err2, ShouldBeNil) So(m.subscriptions, ShouldEqual, 1) }) @@ -484,10 +486,10 @@ func TestSubscribe(t *testing.T) { } func TestUnsubscribe(t *testing.T) { - ns := [][]string{ - {"test1"}, - {"test2"}, - {"test3"}, + ns := []core.Namespace{ + core.NewNamespace([]string{"test1"}), + core.NewNamespace([]string{"test2"}), + core.NewNamespace([]string{"test3"}), } lp := new(loadedPlugin) ts := time.Now() @@ -506,7 +508,7 @@ func TestUnsubscribe(t *testing.T) { So(err, ShouldBeNil) err1 := mc.Unsubscribe([]string{"test1"}, -1) So(err1, ShouldBeNil) - m, err2 := mc.Get([]string{"test1"}, -1) + m, err2 := mc.Get(core.NewNamespace([]string{"test1"}), -1) So(err2, ShouldBeNil) So(m.subscriptions, ShouldEqual, 0) }) @@ -526,7 +528,7 @@ func TestUnsubscribe(t *testing.T) { } func TestSubscriptionCount(t *testing.T) { - m := newMetricType([]string{"test"}, time.Now(), &loadedPlugin{}) + m := newMetricType(core.NewNamespace([]string{"test"}), time.Now(), &loadedPlugin{}) Convey("it returns the subscription count", t, func() { m.Subscribe() So(m.SubscriptionCount(), ShouldEqual, 1) @@ -541,17 +543,17 @@ func TestSubscriptionCount(t *testing.T) { func TestMetricNamespaceValidation(t *testing.T) { Convey("validateMetricNamespace()", t, func() { Convey("validation passes", func() { - ns := []string{"mock", "foo", "bar"} + ns := core.NewNamespace([]string{"mock", "foo", "bar"}) err := validateMetricNamespace(ns) So(err, ShouldBeNil) }) Convey("contains not allowed characters", func() { - ns := []string{"mock", "foo", "(bar)"} + ns := core.NewNamespace([]string{"mock", "foo", "(bar)"}) err := validateMetricNamespace(ns) So(err, ShouldNotBeNil) }) Convey("contains unacceptable wildcardat at the end", func() { - ns := []string{"mock", "foo", "*"} + ns := core.NewNamespace([]string{"mock", "foo", "*"}) err := validateMetricNamespace(ns) So(err, ShouldNotBeNil) }) diff --git a/control/mttrie.go b/control/mttrie.go index 76178b45a..9e114fd40 100644 --- a/control/mttrie.go +++ b/control/mttrie.go @@ -22,6 +22,7 @@ package control import ( "errors" "fmt" + "strings" ) /* @@ -105,7 +106,7 @@ func (m *MTTrie) DeleteByPlugin(lp *loadedPlugin) { // RemoveMetric removes a specific metric by namespace and version from the tree func (m *MTTrie) RemoveMetric(mt metricType) { - a, _ := m.find(mt.Namespace()) + a, _ := m.find(mt.Namespace().Strings()) if a != nil { for v, x := range a.mts { if mt.Version() == x.Version() { @@ -120,7 +121,7 @@ func (m *MTTrie) RemoveMetric(mt metricType) { // given MetricType func (mtt *mttNode) Add(mt *metricType) { ns := mt.Namespace() - node, index := mtt.walk(ns) + node, index := mtt.walk(ns.Strings()) if index == len(ns) { if node.mts == nil { node.mts = make(map[int]*metricType) @@ -134,8 +135,8 @@ func (mtt *mttNode) Add(mt *metricType) { if node.children == nil { node.children = make(map[string]*mttNode) } - node.children[n] = &mttNode{} - node = node.children[n] + node.children[n.Value] = &mttNode{} + node = node.children[n.Value] } node.mts = make(map[int]*metricType) node.mts[mt.Version()] = mt @@ -192,7 +193,7 @@ func (mtt *mttNode) Get(ns []string) ([]*metricType, error) { return nil, err } if node.mts == nil { - return nil, errorMetricNotFound(ns) + return nil, errorMetricNotFound("/" + strings.Join(ns, "/")) } var mts []*metricType for _, mt := range node.mts { @@ -222,7 +223,7 @@ func (mtt *mttNode) walk(ns []string) (*mttNode, int) { func (mtt *mttNode) find(ns []string) (*mttNode, error) { node, index := mtt.walk(ns) if index != len(ns) { - return nil, errorMetricNotFound(ns) + return nil, errorMetricNotFound("/" + strings.Join(ns, "/")) } return node, nil } diff --git a/control/mttrie_test.go b/control/mttrie_test.go index 72cea7984..b53418261 100644 --- a/control/mttrie_test.go +++ b/control/mttrie_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "github.com/intelsdi-x/snap/core" . "github.com/smartystreets/goconvey/convey" ) @@ -35,8 +36,8 @@ func TestTrie(t *testing.T) { Convey("Fetch", t, func() { trie := NewMTTrie() Convey("Add and collect split namespace", func() { - mt := newMetricType([]string{"intel", "foo"}, time.Now(), new(loadedPlugin)) - mt2 := newMetricType([]string{"intel", "baz", "qux"}, time.Now(), new(loadedPlugin)) + mt := newMetricType(core.NewNamespace([]string{"intel", "foo"}), time.Now(), new(loadedPlugin)) + mt2 := newMetricType(core.NewNamespace([]string{"intel", "baz", "qux"}), time.Now(), new(loadedPlugin)) trie.Add(mt) trie.Add(mt2) @@ -45,13 +46,13 @@ func TestTrie(t *testing.T) { So(len(in), ShouldEqual, 2) for _, mt := range in { So(mt, ShouldNotBeNil) - So(mt.Namespace(), ShouldHaveSameTypeAs, []string{""}) + So(mt.Namespace(), ShouldHaveSameTypeAs, core.NewNamespace([]string{""})) } }) Convey("Add and collect with nodes with children", func() { - mt := newMetricType([]string{"intel", "foo", "bar"}, time.Now(), new(loadedPlugin)) - mt2 := newMetricType([]string{"intel", "foo"}, time.Now(), new(loadedPlugin)) - mt3 := newMetricType([]string{"intel", "foo", "qux"}, time.Now(), new(loadedPlugin)) + mt := newMetricType(core.NewNamespace([]string{"intel", "foo", "bar"}), time.Now(), new(loadedPlugin)) + mt2 := newMetricType(core.NewNamespace([]string{"intel", "foo"}), time.Now(), new(loadedPlugin)) + mt3 := newMetricType(core.NewNamespace([]string{"intel", "foo", "qux"}), time.Now(), new(loadedPlugin)) trie.Add(mt) trie.Add(mt2) trie.Add(mt3) @@ -61,8 +62,8 @@ func TestTrie(t *testing.T) { So(len(in), ShouldEqual, 3) }) Convey("Add and collect at node with mt and children", func() { - mt := newMetricType([]string{"intel", "foo", "bar"}, time.Now(), new(loadedPlugin)) - mt2 := newMetricType([]string{"intel", "foo"}, time.Now(), new(loadedPlugin)) + mt := newMetricType(core.NewNamespace([]string{"intel", "foo", "bar"}), time.Now(), new(loadedPlugin)) + mt2 := newMetricType(core.NewNamespace([]string{"intel", "foo"}), time.Now(), new(loadedPlugin)) trie.Add(mt) trie.Add(mt2) @@ -71,29 +72,29 @@ func TestTrie(t *testing.T) { So(len(in), ShouldEqual, 2) }) Convey("add and collect single depth namespace", func() { - mt := newMetricType([]string{"test"}, time.Now(), new(loadedPlugin)) + mt := newMetricType(core.NewNamespace([]string{"test"}), time.Now(), new(loadedPlugin)) trie.Add(mt) t, err := trie.Fetch([]string{"test"}) So(err, ShouldBeNil) - So(t[0].Namespace(), ShouldResemble, []string{"test"}) + So(t[0].Namespace(), ShouldResemble, core.NewNamespace([]string{"test"})) }) Convey("add and longer length with single child", func() { - mt := newMetricType([]string{"d", "a", "n", "b", "a", "r"}, time.Now(), new(loadedPlugin)) + mt := newMetricType(core.NewNamespace([]string{"d", "a", "n", "b", "a", "r"}), time.Now(), new(loadedPlugin)) trie.Add(mt) d, err := trie.Fetch([]string{"d", "a", "n", "b", "a", "r"}) So(err, ShouldBeNil) - So(d[0].Namespace(), ShouldResemble, []string{"d", "a", "n", "b", "a", "r"}) + So(d[0].Namespace(), ShouldResemble, core.NewNamespace([]string{"d", "a", "n", "b", "a", "r"})) dd, err := trie.Fetch([]string{"d", "a", "n"}) So(err, ShouldBeNil) - So(dd[0].Namespace(), ShouldResemble, []string{"d", "a", "n", "b", "a", "r"}) + So(dd[0].Namespace(), ShouldResemble, core.NewNamespace([]string{"d", "a", "n", "b", "a", "r"})) }) Convey("Multiple versions", func() { lp := new(loadedPlugin) lp.Meta.Version = 1 - mt := newMetricType([]string{"intel", "foo"}, time.Now(), lp) + mt := newMetricType(core.NewNamespace([]string{"intel", "foo"}), time.Now(), lp) lp2 := new(loadedPlugin) lp2.Meta.Version = 2 - mt2 := newMetricType([]string{"intel", "foo"}, time.Now(), lp2) + mt2 := newMetricType(core.NewNamespace([]string{"intel", "foo"}), time.Now(), lp2) trie.Add(mt) trie.Add(mt2) n, err := trie.Fetch([]string{"intel"}) @@ -108,7 +109,7 @@ func TestTrie(t *testing.T) { Convey("Fetch with error: depth exceeded", func() { lp := new(loadedPlugin) lp.Meta.Version = 1 - mt := newMetricType([]string{"intel", "foo"}, time.Now(), lp) + mt := newMetricType(core.NewNamespace([]string{"intel", "foo"}), time.Now(), lp) trie.Add(mt) _, err := trie.Fetch([]string{"intel", "foo", "bar", "baz"}) So(err, ShouldNotBeNil) @@ -121,17 +122,17 @@ func TestTrie(t *testing.T) { Convey("simple get", func() { lp := new(loadedPlugin) lp.Meta.Version = 1 - mt := newMetricType([]string{"intel", "foo"}, time.Now(), lp) + mt := newMetricType(core.NewNamespace([]string{"intel", "foo"}), time.Now(), lp) trie.Add(mt) n, err := trie.Get([]string{"intel", "foo"}) So(err, ShouldBeNil) So(len(n), ShouldEqual, 1) - So(n[0].Namespace(), ShouldResemble, []string{"intel", "foo"}) + So(n[0].Namespace(), ShouldResemble, core.NewNamespace([]string{"intel", "foo"})) }) Convey("error: no data at node", func() { lp := new(loadedPlugin) lp.Meta.Version = 1 - mt := newMetricType([]string{"intel", "foo"}, time.Now(), lp) + mt := newMetricType(core.NewNamespace([]string{"intel", "foo"}), time.Now(), lp) trie.Add(mt) n, err := trie.Get([]string{"intel"}) So(n, ShouldBeNil) diff --git a/control/plugin/client/client.go b/control/plugin/client/client.go index 6e56d0fb4..05a46d7dd 100644 --- a/control/plugin/client/client.go +++ b/control/plugin/client/client.go @@ -38,7 +38,7 @@ type PluginClient interface { type PluginCollectorClient interface { PluginClient CollectMetrics([]core.Metric) ([]core.Metric, error) - GetMetricTypes(plugin.PluginConfigType) ([]core.Metric, error) + GetMetricTypes(plugin.ConfigType) ([]core.Metric, error) } // PluginProcessorClient A client providing processor specific plugin method calls. diff --git a/control/plugin/client/httpjsonrpc.go b/control/plugin/client/httpjsonrpc.go index 6a00c6127..c69c32c7b 100644 --- a/control/plugin/client/httpjsonrpc.go +++ b/control/plugin/client/httpjsonrpc.go @@ -147,19 +147,18 @@ func (h *httpJSONRPCClient) CollectMetrics(mts []core.Metric) ([]core.Metric, er return nil, errors.New("no metrics to collect") } - metricsToCollect := make([]plugin.PluginMetricType, len(mts)) + metricsToCollect := make([]plugin.MetricType, len(mts)) for idx, mt := range mts { - metricsToCollect[idx] = plugin.PluginMetricType{ + metricsToCollect[idx] = plugin.MetricType{ Namespace_: mt.Namespace(), LastAdvertisedTime_: mt.LastAdvertisedTime(), Version_: mt.Version(), Tags_: mt.Tags(), - Labels_: mt.Labels(), Config_: mt.Config(), } } - args := &plugin.CollectMetricsArgs{PluginMetricTypes: metricsToCollect} + args := &plugin.CollectMetricsArgs{MetricTypes: metricsToCollect} out, err := h.encoder.Encode(args) if err != nil { @@ -195,7 +194,7 @@ func (h *httpJSONRPCClient) CollectMetrics(mts []core.Metric) ([]core.Metric, er } // GetMetricTypes returns metric types that can be collected -func (h *httpJSONRPCClient) GetMetricTypes(config plugin.PluginConfigType) ([]core.Metric, error) { +func (h *httpJSONRPCClient) GetMetricTypes(config plugin.ConfigType) ([]core.Metric, error) { args := plugin.GetMetricTypesArgs{PluginConfig: config} out, err := h.encoder.Encode(args) @@ -212,8 +211,8 @@ func (h *httpJSONRPCClient) GetMetricTypes(config plugin.PluginConfigType) ([]co if err != nil { return nil, err } - metrics := make([]core.Metric, len(mtr.PluginMetricTypes)) - for i, mt := range mtr.PluginMetricTypes { + metrics := make([]core.Metric, len(mtr.MetricTypes)) + for i, mt := range mtr.MetricTypes { mt.LastAdvertisedTime_ = time.Now() metrics[i] = mt } diff --git a/control/plugin/client/httpjsonrpc_test.go b/control/plugin/client/httpjsonrpc_test.go index 7ea84c272..79ca418f1 100644 --- a/control/plugin/client/httpjsonrpc_test.go +++ b/control/plugin/client/httpjsonrpc_test.go @@ -76,9 +76,9 @@ func (m *mockCollectorProxy) CollectMetrics(args []byte, reply *[]byte) error { if err != nil { return err } - var mts []plugin.PluginMetricType - for _, i := range dargs.PluginMetricTypes { - p := plugin.NewPluginMetricType(i.Namespace(), time.Now(), "", nil, nil, rand.Intn(100)) + var mts []plugin.MetricType + for _, i := range dargs.MetricTypes { + p := plugin.NewMetricType(i.Namespace(), time.Now(), nil, "", rand.Intn(100)) p.Config_ = i.Config() mts = append(mts, *p) } @@ -94,12 +94,12 @@ func (m *mockCollectorProxy) GetMetricTypes(args []byte, reply *[]byte) error { dargs := &plugin.GetMetricTypesArgs{} m.e.Decode(args, &dargs) - pmts := []plugin.PluginMetricType{} - pmts = append(pmts, plugin.PluginMetricType{ - Namespace_: []string{"foo", "bar"}, + pmts := []plugin.MetricType{} + pmts = append(pmts, plugin.MetricType{ + Namespace_: core.NewNamespace([]string{"foo", "bar"}), Config_: dargs.PluginConfig.ConfigDataNode, }) - *reply, _ = m.e.Encode(plugin.GetMetricTypesReply{PluginMetricTypes: pmts}) + *reply, _ = m.e.Encode(plugin.GetMetricTypesReply{MetricTypes: pmts}) return nil } @@ -216,8 +216,8 @@ func TestHTTPJSONRPC(t *testing.T) { time.Sleep(500 * time.Millisecond) mts, err := c.CollectMetrics([]core.Metric{ - &plugin.PluginMetricType{ - Namespace_: []string{"foo", "bar"}, + &plugin.MetricType{ + Namespace_: core.NewNamespace([]string{"foo", "bar"}), Config_: cdn, }, }) @@ -249,8 +249,8 @@ func TestHTTPJSONRPC(t *testing.T) { time.Sleep(500 * time.Millisecond) mts, err := c.CollectMetrics([]core.Metric{ - &plugin.PluginMetricType{ - Namespace_: []string{"foo", "bar"}, + &plugin.MetricType{ + Namespace_: core.NewNamespace([]string{"foo", "bar"}), Config_: cdn, }, }) @@ -302,18 +302,18 @@ func TestHTTPJSONRPC(t *testing.T) { }) Convey("Process metrics", func() { - pmt := plugin.NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", nil, nil, 1) - b, _ := json.Marshal([]plugin.PluginMetricType{*pmt}) + pmt := plugin.NewMetricType(core.NewNamespace([]string{"foo", "bar"}), time.Now(), nil, "", 1) + b, _ := json.Marshal([]plugin.MetricType{*pmt}) contentType, content, err := p.Process(plugin.SnapJSONContentType, b, nil) So(contentType, ShouldResemble, plugin.SnapJSONContentType) So(content, ShouldNotBeNil) So(err, ShouldEqual, nil) - var pmts []plugin.PluginMetricType + var pmts []plugin.MetricType err = json.Unmarshal(content, &pmts) So(err, ShouldBeNil) So(len(pmts), ShouldEqual, 1) So(pmts[0].Data(), ShouldEqual, 1) - So(pmts[0].Namespace(), ShouldResemble, []string{"foo", "bar"}) + So(pmts[0].Namespace(), ShouldResemble, core.NewNamespace([]string{"foo", "bar"})) }) }) @@ -342,8 +342,8 @@ func TestHTTPJSONRPC(t *testing.T) { }) Convey("Publish metrics", func() { - pmt := plugin.NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", nil, nil, 1) - b, _ := json.Marshal([]plugin.PluginMetricType{*pmt}) + pmt := plugin.NewMetricType(core.NewNamespace([]string{"foo", "bar"}), time.Now(), nil, "", 1) + b, _ := json.Marshal([]plugin.MetricType{*pmt}) err := p.Publish(plugin.SnapJSONContentType, b, nil) So(err, ShouldBeNil) }) diff --git a/control/plugin/client/native.go b/control/plugin/client/native.go index 52396f3c2..08f9f4cd1 100644 --- a/control/plugin/client/native.go +++ b/control/plugin/client/native.go @@ -129,26 +129,25 @@ func (p *PluginNativeClient) Process(contentType string, content []byte, config } func (p *PluginNativeClient) CollectMetrics(mts []core.Metric) ([]core.Metric, error) { - // Convert core.MetricType slice into plugin.PluginMetricType slice as we have + // Convert core.MetricType slice into plugin.nMetricType slice as we have // to send structs over RPC var results []core.Metric if len(mts) == 0 { return nil, errors.New("no metrics to collect") } - metricsToCollect := make([]plugin.PluginMetricType, len(mts)) + metricsToCollect := make([]plugin.MetricType, len(mts)) for idx, mt := range mts { - metricsToCollect[idx] = plugin.PluginMetricType{ + metricsToCollect[idx] = plugin.MetricType{ Namespace_: mt.Namespace(), LastAdvertisedTime_: mt.LastAdvertisedTime(), Version_: mt.Version(), Tags_: mt.Tags(), - Labels_: mt.Labels(), Config_: mt.Config(), } } - args := plugin.CollectMetricsArgs{PluginMetricTypes: metricsToCollect} + args := plugin.CollectMetricsArgs{MetricTypes: metricsToCollect} out, err := p.encoder.Encode(args) if err != nil { @@ -177,7 +176,7 @@ func (p *PluginNativeClient) CollectMetrics(mts []core.Metric) ([]core.Metric, e return results, nil } -func (p *PluginNativeClient) GetMetricTypes(config plugin.PluginConfigType) ([]core.Metric, error) { +func (p *PluginNativeClient) GetMetricTypes(config plugin.ConfigType) ([]core.Metric, error) { var reply []byte args := plugin.GetMetricTypesArgs{PluginConfig: config} @@ -199,8 +198,8 @@ func (p *PluginNativeClient) GetMetricTypes(config plugin.PluginConfigType) ([]c return nil, err } - retMetricTypes := make([]core.Metric, len(r.PluginMetricTypes)) - for i, mt := range r.PluginMetricTypes { + retMetricTypes := make([]core.Metric, len(r.MetricTypes)) + for i, mt := range r.MetricTypes { // Set the advertised time mt.LastAdvertisedTime_ = time.Now() retMetricTypes[i] = mt diff --git a/control/plugin/collector.go b/control/plugin/collector.go index 4ee3b866a..e4c264325 100644 --- a/control/plugin/collector.go +++ b/control/plugin/collector.go @@ -25,6 +25,6 @@ package plugin // Collector plugin type CollectorPlugin interface { Plugin - CollectMetrics([]PluginMetricType) ([]PluginMetricType, error) - GetMetricTypes(PluginConfigType) ([]PluginMetricType, error) + CollectMetrics([]MetricType) ([]MetricType, error) + GetMetricTypes(ConfigType) ([]MetricType, error) } diff --git a/control/plugin/collector_proxy.go b/control/plugin/collector_proxy.go index e639c73d7..d2dae131e 100644 --- a/control/plugin/collector_proxy.go +++ b/control/plugin/collector_proxy.go @@ -28,22 +28,22 @@ import ( // Arguments passed to CollectMetrics() for a Collector implementation type CollectMetricsArgs struct { - PluginMetricTypes []PluginMetricType + MetricTypes []MetricType } // Reply assigned by a Collector implementation using CollectMetrics() type CollectMetricsReply struct { - PluginMetrics []PluginMetricType + PluginMetrics []MetricType } // GetMetricTypesArgs args passed to GetMetricTypes type GetMetricTypesArgs struct { - PluginConfig PluginConfigType + PluginConfig ConfigType } // GetMetricTypesReply assigned by GetMetricTypes() implementation type GetMetricTypesReply struct { - PluginMetricTypes []PluginMetricType + MetricTypes []MetricType } type collectorPluginProxy struct { @@ -58,7 +58,7 @@ func (c *collectorPluginProxy) GetMetricTypes(args []byte, reply *[]byte) error // Reset heartbeat c.Session.ResetHeartbeat() - dargs := &GetMetricTypesArgs{PluginConfig: PluginConfigType{ConfigDataNode: cdata.NewNode()}} + dargs := &GetMetricTypesArgs{PluginConfig: ConfigType{ConfigDataNode: cdata.NewNode()}} c.Session.Decode(args, dargs) mts, err := c.Plugin.GetMetricTypes(dargs.PluginConfig) @@ -66,7 +66,7 @@ func (c *collectorPluginProxy) GetMetricTypes(args []byte, reply *[]byte) error return errors.New(fmt.Sprintf("GetMetricTypes call error : %s", err.Error())) } - r := GetMetricTypesReply{PluginMetricTypes: mts} + r := GetMetricTypesReply{MetricTypes: mts} *reply, err = c.Session.Encode(r) if err != nil { return err @@ -84,7 +84,7 @@ func (c *collectorPluginProxy) CollectMetrics(args []byte, reply *[]byte) error dargs := &CollectMetricsArgs{} c.Session.Decode(args, dargs) - ms, err := c.Plugin.CollectMetrics(dargs.PluginMetricTypes) + ms, err := c.Plugin.CollectMetrics(dargs.MetricTypes) if err != nil { return errors.New(fmt.Sprintf("CollectMetrics call error : %s", err.Error())) } diff --git a/control/plugin/collector_proxy_test.go b/control/plugin/collector_proxy_test.go index 6fbf0ee15..3a303b184 100644 --- a/control/plugin/collector_proxy_test.go +++ b/control/plugin/collector_proxy_test.go @@ -36,21 +36,22 @@ import ( type mockPlugin struct { } -var mockPluginMetricType []PluginMetricType = []PluginMetricType{ - *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", nil, nil, 1), - *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", nil, nil, 2), +var mockMetricType = []MetricType{ + *NewMetricType(core.NewNamespace([]string{"foo"}).AddDynamicElement("test", "something dynamic here").AddStaticElement("bar"), time.Now(), nil, "", 1), + *NewMetricType(core.NewNamespace([]string{"foo", "baz"}), time.Now(), nil, "", 2), } -func (p *mockPlugin) GetMetricTypes(cfg PluginConfigType) ([]PluginMetricType, error) { - return mockPluginMetricType, nil +func (p *mockPlugin) GetMetricTypes(cfg ConfigType) ([]MetricType, error) { + return mockMetricType, nil } -func (p *mockPlugin) CollectMetrics(mockPluginMetricTypes []PluginMetricType) ([]PluginMetricType, error) { - for i := range mockPluginMetricTypes { - mockPluginMetricTypes[i].Labels_ = []core.Label{{Index: 0, Name: "test"}} - mockPluginMetricTypes[i].Tags_ = map[string]string{"key": "value"} +func (p *mockPlugin) CollectMetrics(mockMetricTypes []MetricType) ([]MetricType, error) { + for i := range mockMetricTypes { + if mockMetricTypes[i].Namespace().String() == "/foo/*/bar" { + mockMetricTypes[i].Namespace_[1].Value = "test" + } } - return mockPluginMetricTypes, nil + return mockMetricTypes, nil } func (p *mockPlugin) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) { @@ -69,11 +70,11 @@ func (p *mockPlugin) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) { type mockErrorPlugin struct { } -func (p *mockErrorPlugin) GetMetricTypes(cfg PluginConfigType) ([]PluginMetricType, error) { +func (p *mockErrorPlugin) GetMetricTypes(cfg ConfigType) ([]MetricType, error) { return nil, errors.New("Error in get Metric Type") } -func (p *mockErrorPlugin) CollectMetrics(mockPluginMetricType []PluginMetricType) ([]PluginMetricType, error) { +func (p *mockErrorPlugin) CollectMetrics(_ []MetricType) ([]MetricType, error) { return nil, errors.New("Error in collect Metric") } @@ -107,7 +108,7 @@ func TestCollectorProxy(t *testing.T) { var mtr GetMetricTypesReply err := c.Session.Decode(reply, &mtr) So(err, ShouldBeNil) - So(mtr.PluginMetricTypes[0].Namespace(), ShouldResemble, []string{"foo", "bar"}) + So(mtr.MetricTypes[0].Namespace().String(), ShouldResemble, "/foo/*/bar") }) Convey("Get error in Get Metric Type", func() { @@ -122,7 +123,7 @@ func TestCollectorProxy(t *testing.T) { }) Convey("Collect Metric ", func() { args := CollectMetricsArgs{ - PluginMetricTypes: mockPluginMetricType, + MetricTypes: mockMetricType, } out, err := c.Session.Encode(args) So(err, ShouldBeNil) @@ -130,15 +131,12 @@ func TestCollectorProxy(t *testing.T) { c.CollectMetrics(out, &reply) var mtr CollectMetricsReply err = c.Session.Decode(reply, &mtr) - So(mtr.PluginMetrics[0].Namespace(), ShouldResemble, []string{"foo", "bar"}) - So(mtr.PluginMetrics[0].Labels(), ShouldNotBeNil) - So(mtr.PluginMetrics[0].Labels()[0].Name, ShouldEqual, "test") - So(mtr.PluginMetrics[0].Tags(), ShouldNotBeNil) - So(mtr.PluginMetrics[0].Tags()["key"], ShouldEqual, "value") + So(mtr.PluginMetrics[0].Namespace().String(), ShouldResemble, "/foo/test/bar") + So(mtr.PluginMetrics[0].Namespace()[1].Name, ShouldEqual, "test") Convey("Get error in Collect Metric ", func() { args := CollectMetricsArgs{ - PluginMetricTypes: mockPluginMetricType, + MetricTypes: mockMetricType, } mockErrorPlugin := &mockErrorPlugin{} errC := &collectorPluginProxy{ diff --git a/control/plugin/collector_test.go b/control/plugin/collector_test.go index 2122e9c58..dcdbfcdbc 100644 --- a/control/plugin/collector_test.go +++ b/control/plugin/collector_test.go @@ -23,6 +23,7 @@ import ( "testing" "github.com/intelsdi-x/snap/control/plugin/cpolicy" + "github.com/intelsdi-x/snap/core" . "github.com/smartystreets/goconvey/convey" ) @@ -34,13 +35,13 @@ func (f *MockPlugin) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) { return &cpolicy.ConfigPolicy{}, nil } -func (f *MockPlugin) CollectMetrics(_ []PluginMetricType) ([]PluginMetricType, error) { - return []PluginMetricType{}, nil +func (f *MockPlugin) CollectMetrics(_ []MetricType) ([]MetricType, error) { + return []MetricType{}, nil } -func (c *MockPlugin) GetMetricTypes(_ PluginConfigType) ([]PluginMetricType, error) { - return []PluginMetricType{ - {Namespace_: []string{"foo", "bar"}}, +func (c *MockPlugin) GetMetricTypes(_ ConfigType) ([]MetricType, error) { + return []MetricType{ + {Namespace_: core.NewNamespace([]string{"foo", "bar"})}, }, nil } diff --git a/control/plugin/metric.go b/control/plugin/metric.go index b6c8591db..912e2fcdf 100644 --- a/control/plugin/metric.go +++ b/control/plugin/metric.go @@ -46,11 +46,11 @@ const ( // SnapProtoBuff = "snap.pb" // TO BE IMPLEMENTED ) -type PluginConfigType struct { +type ConfigType struct { *cdata.ConfigDataNode } -func (p *PluginConfigType) UnmarshalJSON(data []byte) error { +func (p *ConfigType) UnmarshalJSON(data []byte) error { cdn := cdata.NewNode() dec := json.NewDecoder(bytes.NewReader(data)) dec.UseNumber() @@ -61,7 +61,7 @@ func (p *PluginConfigType) UnmarshalJSON(data []byte) error { return nil } -func (p PluginConfigType) GobEncode() ([]byte, error) { +func (p ConfigType) GobEncode() ([]byte, error) { w := new(bytes.Buffer) encoder := gob.NewEncoder(w) if err := encoder.Encode(p.ConfigDataNode); err != nil { @@ -70,7 +70,7 @@ func (p PluginConfigType) GobEncode() ([]byte, error) { return w.Bytes(), nil } -func (p *PluginConfigType) GobDecode(data []byte) error { +func (p *ConfigType) GobDecode(data []byte) error { cdn := cdata.NewNode() decoder := gob.NewDecoder(bytes.NewReader(data)) if err := decoder.Decode(cdn); err != nil { @@ -81,17 +81,17 @@ func (p *PluginConfigType) GobDecode(data []byte) error { return nil } -func NewPluginConfigType() PluginConfigType { - return PluginConfigType{ +func NewPluginConfigType() ConfigType { + return ConfigType{ ConfigDataNode: cdata.NewNode(), } } // Represents a metric type. Only used within plugins and across plugin calls. // Converted to core.MetricType before being used within modules. -type PluginMetricType struct { +type MetricType struct { // Namespace is the identifier for a metric. - Namespace_ []string `json:"namespace"` + Namespace_ []core.NamespaceElement `json:"namespace"` // Last advertised time is the last time the snap agent was told about // a metric. @@ -105,80 +105,85 @@ type PluginMetricType struct { Data_ interface{} `json:"data"` - // labels are pulled from dynamic metrics to provide context for the metric - Labels_ []core.Label `json:"labels"` - + // Tags are key value pairs that can be added by the framework or any + // plugin along the collect -> process -> publish pipeline. Tags_ map[string]string `json:"tags"` - // The source of the metric (host, IP, etc). - Source_ string `json:"source"` + // Unit represents the unit of magnitude of the measured quantity. + // See http://metrics20.org/spec/#units as a guideline for this + // field. + Unit_ string + + // A (long) description for the metric. The description is stored on the + // metric catalog and not sent through collect -> process -> publish. + Description_ string `json:"description"` // The timestamp from when the metric was created. Timestamp_ time.Time `json:"timestamp"` } -// // PluginMetricType Constructor -func NewPluginMetricType(namespace []string, timestamp time.Time, source string, tags map[string]string, labels []core.Label, data interface{}) *PluginMetricType { - return &PluginMetricType{ +// NewMetricType returns a Constructor +func NewMetricType(namespace core.Namespace, timestamp time.Time, tags map[string]string, unit string, data interface{}) *MetricType { + return &MetricType{ Namespace_: namespace, Tags_: tags, - Labels_: labels, Data_: data, Timestamp_: timestamp, - Source_: source, + Unit_: unit, } } // Returns the namespace. -func (p PluginMetricType) Namespace() []string { +func (p MetricType) Namespace() core.Namespace { return p.Namespace_ } // Returns the last time this metric type was received from the plugin. -func (p PluginMetricType) LastAdvertisedTime() time.Time { +func (p MetricType) LastAdvertisedTime() time.Time { return p.LastAdvertisedTime_ } // Returns the namespace. -func (p PluginMetricType) Version() int { +func (p MetricType) Version() int { return p.Version_ } // Config returns the map of config data for this metric -func (p PluginMetricType) Config() *cdata.ConfigDataNode { +func (p MetricType) Config() *cdata.ConfigDataNode { return p.Config_ } // Tags returns the map of tags for this metric -func (p PluginMetricType) Tags() map[string]string { +func (p MetricType) Tags() map[string]string { return p.Tags_ } -// Labels returns the array of labels for this metric -func (p PluginMetricType) Labels() []core.Label { - return p.Labels_ -} - // returns the timestamp of when the metric was collected -func (p PluginMetricType) Timestamp() time.Time { +func (p MetricType) Timestamp() time.Time { return p.Timestamp_ } -// returns the source of the metric -func (p PluginMetricType) Source() string { - return p.Source_ +// returns the data for the metric +func (p MetricType) Data() interface{} { + return p.Data_ } -func (p PluginMetricType) Data() interface{} { - return p.Data_ +// returns the description of the metric +func (p MetricType) Description() string { + return p.Description_ +} + +// returns the metrics unit +func (p MetricType) Unit() string { + return p.Unit_ } -func (p *PluginMetricType) AddData(data interface{}) { +func (p *MetricType) AddData(data interface{}) { p.Data_ = data } -// MarshalMetricTypes returns a []byte containing a serialized version of []PluginMetricType using the content type provided. -func MarshalPluginMetricTypes(contentType string, metrics []PluginMetricType) ([]byte, string, error) { +// MarshalMetricTypes returns a []byte containing a serialized version of []MetricType using the content type provided. +func MarshalMetricTypes(contentType string, metrics []MetricType) ([]byte, string, error) { // If we have an empty slice we return an error if len(metrics) == 0 { es := fmt.Sprintf("attempt to marshall empty slice of metrics: %s", contentType) @@ -230,11 +235,11 @@ func MarshalPluginMetricTypes(contentType string, metrics []PluginMetricType) ([ } } -// UnmarshallPluginMetricTypes takes a content type and []byte payload and returns a []PluginMetricType -func UnmarshallPluginMetricTypes(contentType string, payload []byte) ([]PluginMetricType, error) { +// UnmarshallMetricTypes takes a content type and []byte payload and returns a []MetricType +func UnmarshallMetricTypes(contentType string, payload []byte) ([]MetricType, error) { switch contentType { case SnapGOBContentType: - var metrics []PluginMetricType + var metrics []MetricType r := bytes.NewBuffer(payload) err := gob.NewDecoder(r).Decode(&metrics) if err != nil { @@ -247,7 +252,7 @@ func UnmarshallPluginMetricTypes(contentType string, payload []byte) ([]PluginMe } return metrics, nil case SnapJSONContentType: - var metrics []PluginMetricType + var metrics []MetricType err := json.Unmarshal(payload, &metrics) if err != nil { log.WithFields(log.Fields{ @@ -270,9 +275,9 @@ func UnmarshallPluginMetricTypes(contentType string, payload []byte) ([]PluginMe } } -// SwapPluginMetricContentType swaps a payload with one content type to another one. -func SwapPluginMetricContentType(contentType, requestedContentType string, payload []byte) ([]byte, string, error) { - metrics, err1 := UnmarshallPluginMetricTypes(contentType, payload) +// SwapMetricContentType swaps a payload with one content type to another one. +func SwapMetricContentType(contentType, requestedContentType string, payload []byte) ([]byte, string, error) { + metrics, err1 := UnmarshallMetricTypes(contentType, payload) if err1 != nil { log.WithFields(log.Fields{ "_module": "control-plugin", @@ -281,7 +286,7 @@ func SwapPluginMetricContentType(contentType, requestedContentType string, paylo }).Error("error while swaping") return nil, "", err1 } - newPayload, newContentType, err2 := MarshalPluginMetricTypes(requestedContentType, metrics) + newPayload, newContentType, err2 := MarshalMetricTypes(requestedContentType, metrics) if err2 != nil { log.WithFields(log.Fields{ "_module": "control-plugin", diff --git a/control/plugin/metric_test.go b/control/plugin/metric_test.go index 457e8cfb7..490046e4c 100644 --- a/control/plugin/metric_test.go +++ b/control/plugin/metric_test.go @@ -20,10 +20,10 @@ limitations under the License. package plugin import ( - "strings" "testing" "time" + "github.com/intelsdi-x/snap/core" "github.com/intelsdi-x/snap/core/cdata" "github.com/intelsdi-x/snap/core/ctypes" . "github.com/smartystreets/goconvey/convey" @@ -31,11 +31,11 @@ import ( func TestMetric(t *testing.T) { Convey("error on invalid snap content type", t, func() { - m := []PluginMetricType{ - *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", nil, nil, 1), - *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", nil, nil, 2), + m := []MetricType{ + *NewMetricType(core.NewNamespace([]string{"foo", "bar"}), time.Now(), nil, "", 1), + *NewMetricType(core.NewNamespace([]string{"foo", "baz"}), time.Now(), nil, "", 2), } - a, c, e := MarshalPluginMetricTypes("foo", m) + a, c, e := MarshalMetricTypes("foo", m) m[0].Version_ = 1 m[0].AddData(3) configNewNode := cdata.NewNode() @@ -51,8 +51,8 @@ func TestMetric(t *testing.T) { }) Convey("error on empty metric slice", t, func() { - m := []PluginMetricType{} - a, c, e := MarshalPluginMetricTypes("foo", m) + m := []MetricType{} + a, c, e := MarshalMetricTypes("foo", m) So(e, ShouldNotBeNil) So(e.Error(), ShouldEqual, "attempt to marshall empty slice of metrics: foo") So(a, ShouldBeNil) @@ -60,184 +60,184 @@ func TestMetric(t *testing.T) { }) Convey("marshall using snap.* default to snap.gob", t, func() { - m := []PluginMetricType{ - *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", nil, nil, 1), - *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", nil, nil, "2"), + m := []MetricType{ + *NewMetricType(core.NewNamespace([]string{"foo", "bar"}), time.Now(), nil, "", 1), + *NewMetricType(core.NewNamespace([]string{"foo", "baz"}), time.Now(), nil, "", "2"), } - a, c, e := MarshalPluginMetricTypes("snap.*", m) + a, c, e := MarshalMetricTypes("snap.*", m) So(e, ShouldBeNil) So(a, ShouldNotBeNil) So(len(a), ShouldBeGreaterThan, 0) So(c, ShouldEqual, "snap.gob") Convey("unmarshal snap.gob", func() { - m, e = UnmarshallPluginMetricTypes("snap.gob", a) + m, e = UnmarshallMetricTypes("snap.gob", a) So(e, ShouldBeNil) - So(strings.Join(m[0].Namespace(), "/"), ShouldResemble, "foo/bar") + So(m[0].Namespace().String(), ShouldResemble, "/foo/bar") So(m[0].Data(), ShouldResemble, 1) - So(strings.Join(m[1].Namespace(), "/"), ShouldResemble, "foo/baz") + So(m[1].Namespace().String(), ShouldResemble, "/foo/baz") So(m[1].Data(), ShouldResemble, "2") }) }) Convey("marshall using snap.gob", t, func() { - m := []PluginMetricType{ - *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", nil, nil, 1), - *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", nil, nil, "2"), + m := []MetricType{ + *NewMetricType(core.NewNamespace([]string{"foo", "bar"}), time.Now(), nil, "", 1), + *NewMetricType(core.NewNamespace([]string{"foo", "baz"}), time.Now(), nil, "", "2"), } - a, c, e := MarshalPluginMetricTypes("snap.gob", m) + a, c, e := MarshalMetricTypes("snap.gob", m) So(e, ShouldBeNil) So(a, ShouldNotBeNil) So(len(a), ShouldBeGreaterThan, 0) So(c, ShouldEqual, "snap.gob") Convey("unmarshal snap.gob", func() { - m, e = UnmarshallPluginMetricTypes("snap.gob", a) + m, e = UnmarshallMetricTypes("snap.gob", a) So(e, ShouldBeNil) - So(strings.Join(m[0].Namespace(), "/"), ShouldResemble, "foo/bar") + So(m[0].Namespace().String(), ShouldResemble, "/foo/bar") So(m[0].Data(), ShouldResemble, 1) - So(strings.Join(m[1].Namespace(), "/"), ShouldResemble, "foo/baz") + So(m[1].Namespace().String(), ShouldResemble, "/foo/baz") So(m[1].Data(), ShouldResemble, "2") }) Convey("error on bad corrupt data", func() { a = []byte{1, 0, 1, 1, 1, 1, 1, 0, 0, 1} - m, e = UnmarshallPluginMetricTypes("snap.gob", a) + m, e = UnmarshallMetricTypes("snap.gob", a) So(e, ShouldNotBeNil) - So(e.Error(), ShouldResemble, "gob: decoding into local type *[]plugin.PluginMetricType, received remote type unknown type") + So(e.Error(), ShouldResemble, "gob: decoding into local type *[]plugin.MetricType, received remote type unknown type") }) }) Convey("marshall using snap.json", t, func() { - m := []PluginMetricType{ - *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", nil, nil, 1), - *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", nil, nil, "2"), + m := []MetricType{ + *NewMetricType(core.NewNamespace([]string{"foo", "bar"}), time.Now(), nil, "", 1), + *NewMetricType(core.NewNamespace([]string{"foo", "baz"}), time.Now(), nil, "", "2"), } - a, c, e := MarshalPluginMetricTypes("snap.json", m) + a, c, e := MarshalMetricTypes("snap.json", m) So(e, ShouldBeNil) So(a, ShouldNotBeNil) So(len(a), ShouldBeGreaterThan, 0) So(c, ShouldEqual, "snap.json") Convey("unmarshal snap.json", func() { - m, e = UnmarshallPluginMetricTypes("snap.json", a) + m, e = UnmarshallMetricTypes("snap.json", a) So(e, ShouldBeNil) - So(strings.Join(m[0].Namespace(), "/"), ShouldResemble, "foo/bar") + So(m[0].Namespace().String(), ShouldResemble, "/foo/bar") So(m[0].Data(), ShouldResemble, float64(1)) - So(strings.Join(m[1].Namespace(), "/"), ShouldResemble, "foo/baz") + So(m[1].Namespace().String(), ShouldResemble, "/foo/baz") So(m[1].Data(), ShouldResemble, "2") }) Convey("error on bad corrupt data", func() { a = []byte{1, 0, 1, 1, 1, 1, 1, 0, 0, 1} - m, e = UnmarshallPluginMetricTypes("snap.json", a) + m, e = UnmarshallMetricTypes("snap.json", a) So(e, ShouldNotBeNil) So(e.Error(), ShouldResemble, "invalid character '\\x01' looking for beginning of value") }) }) Convey("error on unmarshall using bad content type", t, func() { - m := []PluginMetricType{ - *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", nil, nil, 1), - *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", nil, nil, "2"), + m := []MetricType{ + *NewMetricType(core.NewNamespace([]string{"foo", "bar"}), time.Now(), nil, "", 1), + *NewMetricType(core.NewNamespace([]string{"foo", "baz"}), time.Now(), nil, "", "2"), } - a, c, e := MarshalPluginMetricTypes("snap.json", m) + a, c, e := MarshalMetricTypes("snap.json", m) So(e, ShouldBeNil) So(a, ShouldNotBeNil) So(len(a), ShouldBeGreaterThan, 0) So(c, ShouldEqual, "snap.json") - m, e = UnmarshallPluginMetricTypes("snap.wat", a) + m, e = UnmarshallMetricTypes("snap.wat", a) So(e, ShouldNotBeNil) So(e.Error(), ShouldEqual, "invalid snap content type for unmarshalling: snap.wat") So(m, ShouldBeNil) }) Convey("swap from snap.gob to snap.json", t, func() { - m := []PluginMetricType{ - *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", nil, nil, 1), - *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", nil, nil, "2"), + m := []MetricType{ + *NewMetricType(core.NewNamespace([]string{"foo", "bar"}), time.Now(), nil, "", 1), + *NewMetricType(core.NewNamespace([]string{"foo", "baz"}), time.Now(), nil, "", "2"), } - a, c, e := MarshalPluginMetricTypes("snap.gob", m) + a, c, e := MarshalMetricTypes("snap.gob", m) So(e, ShouldBeNil) So(a, ShouldNotBeNil) So(len(a), ShouldBeGreaterThan, 0) So(c, ShouldEqual, "snap.gob") - b, c, e := SwapPluginMetricContentType(c, "snap.json", a) + b, c, e := SwapMetricContentType(c, "snap.json", a) So(e, ShouldBeNil) So(c, ShouldResemble, "snap.json") So(b, ShouldNotBeNil) - m, e = UnmarshallPluginMetricTypes(c, b) + m, e = UnmarshallMetricTypes(c, b) So(e, ShouldBeNil) - So(strings.Join(m[0].Namespace(), "/"), ShouldResemble, "foo/bar") + So(m[0].Namespace().String(), ShouldResemble, "/foo/bar") So(m[0].Data(), ShouldResemble, float64(1)) - So(strings.Join(m[1].Namespace(), "/"), ShouldResemble, "foo/baz") + So(m[1].Namespace().String(), ShouldResemble, "/foo/baz") So(m[1].Data(), ShouldResemble, "2") }) Convey("swap from snap.json to snap.*", t, func() { - m := []PluginMetricType{ - *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", nil, nil, 1), - *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", nil, nil, "2"), + m := []MetricType{ + *NewMetricType(core.NewNamespace([]string{"foo", "bar"}), time.Now(), nil, "", 1), + *NewMetricType(core.NewNamespace([]string{"foo", "baz"}), time.Now(), nil, "", "2"), } - a, c, e := MarshalPluginMetricTypes("snap.json", m) + a, c, e := MarshalMetricTypes("snap.json", m) So(e, ShouldBeNil) So(a, ShouldNotBeNil) So(len(a), ShouldBeGreaterThan, 0) So(c, ShouldEqual, "snap.json") - b, c, e := SwapPluginMetricContentType(c, "snap.*", a) + b, c, e := SwapMetricContentType(c, "snap.*", a) So(e, ShouldBeNil) So(c, ShouldResemble, "snap.gob") So(b, ShouldNotBeNil) - m, e = UnmarshallPluginMetricTypes(c, b) + m, e = UnmarshallMetricTypes(c, b) So(e, ShouldBeNil) - So(strings.Join(m[0].Namespace(), "/"), ShouldResemble, "foo/bar") + So(m[0].Namespace().String(), ShouldResemble, "/foo/bar") So(m[0].Data(), ShouldResemble, float64(1)) - So(strings.Join(m[1].Namespace(), "/"), ShouldResemble, "foo/baz") + So(m[1].Namespace().String(), ShouldResemble, "/foo/baz") So(m[1].Data(), ShouldResemble, "2") }) Convey("swap from snap.json to snap.gob", t, func() { - m := []PluginMetricType{ - *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", nil, nil, 1), - *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", nil, nil, "2"), + m := []MetricType{ + *NewMetricType(core.NewNamespace([]string{"foo", "bar"}), time.Now(), nil, "", 1), + *NewMetricType(core.NewNamespace([]string{"foo", "baz"}), time.Now(), nil, "", "2"), } - a, c, e := MarshalPluginMetricTypes("snap.json", m) + a, c, e := MarshalMetricTypes("snap.json", m) So(e, ShouldBeNil) So(a, ShouldNotBeNil) So(len(a), ShouldBeGreaterThan, 0) So(c, ShouldEqual, "snap.json") - b, c, e := SwapPluginMetricContentType(c, "snap.gob", a) + b, c, e := SwapMetricContentType(c, "snap.gob", a) So(e, ShouldBeNil) So(c, ShouldResemble, "snap.gob") So(b, ShouldNotBeNil) - m, e = UnmarshallPluginMetricTypes(c, b) + m, e = UnmarshallMetricTypes(c, b) So(e, ShouldBeNil) - So(strings.Join(m[0].Namespace(), "/"), ShouldResemble, "foo/bar") + So(m[0].Namespace().String(), ShouldResemble, "/foo/bar") So(m[0].Data(), ShouldResemble, float64(1)) - So(strings.Join(m[1].Namespace(), "/"), ShouldResemble, "foo/baz") + So(m[1].Namespace().String(), ShouldResemble, "/foo/baz") So(m[1].Data(), ShouldResemble, "2") }) Convey("error on bad content type to swap", t, func() { - m := []PluginMetricType{ - *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", nil, nil, 1), - *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", nil, nil, "2"), + m := []MetricType{ + *NewMetricType(core.NewNamespace([]string{"foo", "bar"}), time.Now(), nil, "", 1), + *NewMetricType(core.NewNamespace([]string{"foo", "baz"}), time.Now(), nil, "", "2"), } - a, c, e := MarshalPluginMetricTypes("snap.json", m) + a, c, e := MarshalMetricTypes("snap.json", m) So(e, ShouldBeNil) So(a, ShouldNotBeNil) So(len(a), ShouldBeGreaterThan, 0) So(c, ShouldEqual, "snap.json") - b, c, e := SwapPluginMetricContentType("snap.wat", "snap.gob", a) + b, c, e := SwapMetricContentType("snap.wat", "snap.gob", a) So(e, ShouldNotBeNil) So(e.Error(), ShouldResemble, "invalid snap content type for unmarshalling: snap.wat") So(b, ShouldBeNil) diff --git a/control/plugin/plugin_test.go b/control/plugin/plugin_test.go index bddf9fac6..b25a07702 100644 --- a/control/plugin/plugin_test.go +++ b/control/plugin/plugin_test.go @@ -23,6 +23,7 @@ import ( "testing" "time" + "github.com/intelsdi-x/snap/core" . "github.com/smartystreets/goconvey/convey" ) @@ -38,15 +39,15 @@ func TestPluginType(t *testing.T) { func TestMetricType(t *testing.T) { Convey("MetricType", t, func() { now := time.Now() - m := &PluginMetricType{ - Namespace_: []string{"foo", "bar"}, + m := &MetricType{ + Namespace_: core.NewNamespace([]string{"foo", "bar"}), LastAdvertisedTime_: now, } Convey("New", func() { - So(m, ShouldHaveSameTypeAs, &PluginMetricType{}) + So(m, ShouldHaveSameTypeAs, &MetricType{}) }) Convey("Get Namespace", func() { - So(m.Namespace(), ShouldResemble, []string{"foo", "bar"}) + So(m.Namespace(), ShouldResemble, core.NewNamespace([]string{"foo", "bar"})) }) Convey("Get LastAdvertisedTimestamp", func() { So(m.LastAdvertisedTime().Unix(), ShouldBeGreaterThan, now.Unix()-2) diff --git a/control/plugin_manager.go b/control/plugin_manager.go index 8e8f0c008..50140e570 100644 --- a/control/plugin_manager.go +++ b/control/plugin_manager.go @@ -357,7 +357,7 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter if resp.Type == plugin.CollectorPluginType { colClient := ap.client.(client.PluginCollectorClient) - cfg := plugin.PluginConfigType{ + cfg := plugin.ConfigType{ ConfigDataNode: p.pluginConfig.getPluginConfigDataNode(core.PluginType(resp.Type), resp.Meta.Name, resp.Meta.Version), } @@ -385,7 +385,8 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter config: nmt.Config(), data: nmt.Data(), tags: nmt.Tags(), - labels: nmt.Labels(), + description: nmt.Description(), + unit: nmt.Unit(), } } // We quit and throw an error on bad metric versions (<1) @@ -404,6 +405,10 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter }).Error("received metric with bad version") return nil, serror.New(err) } + + //Add standard tags + nmt = addStandardTags(nmt) + if err := p.metricCatalog.AddLoadedMetricType(lPlugin, nmt); err != nil { pmLogger.WithFields(log.Fields{ "_block": "load-plugin", diff --git a/control/plugin_manager_test.go b/control/plugin_manager_test.go index 90dbfdf25..de0f95ad9 100644 --- a/control/plugin_manager_test.go +++ b/control/plugin_manager_test.go @@ -30,6 +30,7 @@ import ( . "github.com/smartystreets/goconvey/convey" "github.com/intelsdi-x/snap/control/plugin" + "github.com/intelsdi-x/snap/core" "github.com/intelsdi-x/snap/core/ctypes" "github.com/intelsdi-x/snap/core/serror" ) @@ -127,9 +128,13 @@ func TestLoadPlugin(t *testing.T) { So(p.all(), ShouldNotBeEmpty) So(serr, ShouldBeNil) So(len(p.all()), ShouldBeGreaterThan, 0) - mts, err := p.metricCatalog.Fetch([]string{}) + mts, err := p.metricCatalog.Fetch(core.NewNamespace([]string{})) So(err, ShouldBeNil) So(len(mts), ShouldBeGreaterThan, 2) + So(mts[0].Description(), ShouldResemble, "mock description") + So(mts[0].Unit(), ShouldResemble, "mock unit") + So(mts[0].Tags(), ShouldContainKey, "plugin_running_on") + So(mts[0].Tags()["plugin_running_on"], ShouldNotResemble, "") }) Convey("for a plugin requiring a config an incomplete config will result in a load failure", func() { diff --git a/control/strategy/cache.go b/control/strategy/cache.go index 5482af2e4..421d3b0c8 100644 --- a/control/strategy/cache.go +++ b/control/strategy/cache.go @@ -126,7 +126,7 @@ func (c *cache) put(ns string, version int, m interface{}) { func (c *cache) checkCache(mts []core.Metric) (metricsToCollect []core.Metric, fromCache []core.Metric) { for _, mt := range mts { - if m := c.get(core.JoinNamespace(mt.Namespace()), mt.Version()); m != nil { + if m := c.get(mt.Namespace().String(), mt.Version()); m != nil { switch metric := m.(type) { case core.Metric: fromCache = append(fromCache, metric) @@ -155,17 +155,13 @@ type listMetricInfo struct { func (c *cache) updateCache(mts []core.Metric) { dc := map[string]listMetricInfo{} for _, mt := range mts { - if mt.Labels() == nil { + isDynamic, _ := mt.Namespace().IsDynamic() + if isDynamic == false { // cache the individual metric - c.put(core.JoinNamespace(mt.Namespace()), mt.Version(), mt) + c.put(mt.Namespace().String(), mt.Version(), mt) } else { // collect the dynamic query results so we can cache - ns := make([]string, len(mt.Namespace())) - copy(ns, mt.Namespace()) - for _, label := range mt.Labels() { - ns[label.Index] = "*" - } - key := fmt.Sprintf("%v:%v", core.JoinNamespace(ns), mt.Version()) + key := fmt.Sprintf("%v:%v", mt.Namespace().String(), mt.Version()) if _, ok := dc[key]; !ok { dc[key] = listMetricInfo{ metrics: []core.Metric{}, @@ -173,7 +169,7 @@ func (c *cache) updateCache(mts []core.Metric) { } var tmp = dc[key] tmp.metrics = append(dc[key].metrics, mt) - tmp.namespace = core.JoinNamespace(ns) + tmp.namespace = mt.Namespace().String() tmp.version = mt.Version() dc[key] = tmp } diff --git a/control/strategy/cache_test.go b/control/strategy/cache_test.go index 32202ecab..77863855c 100644 --- a/control/strategy/cache_test.go +++ b/control/strategy/cache_test.go @@ -34,8 +34,8 @@ func TestCache(t *testing.T) { GlobalCacheExpiration = time.Duration(300 * time.Millisecond) Convey("puts and gets a metric", t, func() { mc := NewCache(GlobalCacheExpiration) - foo := &plugin.PluginMetricType{ - Namespace_: []string{"foo", "bar"}, + foo := &plugin.MetricType{ + Namespace_: core.NewNamespace([]string{"foo", "bar"}), } mc.put("/foo/bar", 1, foo) @@ -58,8 +58,8 @@ func TestCache(t *testing.T) { chrono.Chrono.Pause() mc := NewCache(400 * time.Millisecond) - foo := &plugin.PluginMetricType{ - Namespace_: []string{"foo", "bar"}, + foo := &plugin.MetricType{ + Namespace_: core.NewNamespace([]string{"foo", "bar"}), } mc.put("/foo/bar", 1, foo) chrono.Chrono.Forward(401 * time.Millisecond) @@ -70,8 +70,8 @@ func TestCache(t *testing.T) { Convey("hit and miss counts", t, func() { Convey("ticks hit count when a cache entry is hit", func() { mc := NewCache(400 * time.Millisecond) - foo := &plugin.PluginMetricType{ - Namespace_: []string{"foo", "bar"}, + foo := &plugin.MetricType{ + Namespace_: core.NewNamespace([]string{"foo", "bar"}), } mc.put("/foo/bar", 1, foo) mc.get("/foo/bar", 1) @@ -84,8 +84,8 @@ func TestCache(t *testing.T) { chrono.Chrono.Pause() mc := NewCache(400 * time.Millisecond) - foo := &plugin.PluginMetricType{ - Namespace_: []string{"foo", "bar"}, + foo := &plugin.MetricType{ + Namespace_: core.NewNamespace([]string{"foo", "bar"}), } mc.put("/foo/bar", 1, foo) @@ -100,8 +100,8 @@ func TestCache(t *testing.T) { chrono.Chrono.Pause() mc := NewCache(GlobalCacheExpiration) - foo := &plugin.PluginMetricType{ - Namespace_: []string{"foo", "bar"}, + foo := &plugin.MetricType{ + Namespace_: core.NewNamespace([]string{"foo", "bar"}), } mc.put("/foo/bar", 1, foo) chrono.Chrono.Forward(301 * time.Millisecond) @@ -117,23 +117,23 @@ func TestCache(t *testing.T) { chrono.Chrono.Pause() mc := NewCache(GlobalCacheExpiration) - foo := &plugin.PluginMetricType{ - Namespace_: []string{"foo", "bar"}, + foo := &plugin.MetricType{ + Namespace_: core.NewNamespace([]string{"foo", "bar"}), } - baz := &plugin.PluginMetricType{ - Namespace_: []string{"foo", "baz"}, + baz := &plugin.MetricType{ + Namespace_: core.NewNamespace([]string{"foo", "baz"}), } metricList := []core.Metric{foo, baz} mc.updateCache(metricList) Convey("they should be retrievable via get", func() { - ret := mc.get(core.JoinNamespace(foo.Namespace()), foo.Version()) + ret := mc.get(foo.Namespace().String(), foo.Version()) So(ret, ShouldEqual, foo) - ret = mc.get(core.JoinNamespace(baz.Namespace()), baz.Version()) + ret = mc.get(baz.Namespace().String(), baz.Version()) So(ret, ShouldEqual, baz) }) Convey("they should be retrievable via checkCache", func() { - nonCached := &plugin.PluginMetricType{ - Namespace_: []string{"foo", "fooer"}, + nonCached := &plugin.MetricType{ + Namespace_: core.NewNamespace([]string{"foo", "fooer"}), } metricList = append(metricList, nonCached) toCollect, fromCache := mc.checkCache(metricList) @@ -154,22 +154,20 @@ func TestCache(t *testing.T) { defer chrono.Chrono.Continue() chrono.Chrono.Pause() mc := NewCache(GlobalCacheExpiration) - v1 := plugin.PluginMetricType{ - Namespace_: []string{"foo", "bar"}, + v1 := plugin.MetricType{ + Namespace_: core.NewNamespace([]string{"foo", "bar"}), Version_: 1, - Labels_: []core.Label{{Index: 1, Name: "Hostname"}}, } - v2 := plugin.PluginMetricType{ - Namespace_: []string{"foo", "Baz"}, + v2 := plugin.MetricType{ + Namespace_: core.NewNamespace([]string{"foo", "bar"}), Version_: 2, - Labels_: []core.Label{{Index: 1, Name: "Hostname"}}, } metricList := []core.Metric{v1, v2} mc.updateCache(metricList) Convey("Should be cached separately", func() { Convey("so only 1 should be returned from the cache", func() { - starMetric := &plugin.PluginMetricType{ - Namespace_: []string{"foo", "*"}, + starMetric := &plugin.MetricType{ + Namespace_: core.NewNamespace([]string{"foo", "bar"}), Version_: 2, } // Check /foo/* with both versions diff --git a/control/strategy/sticky_test.go b/control/strategy/sticky_test.go index 8159e4e5b..3243f1002 100644 --- a/control/strategy/sticky_test.go +++ b/control/strategy/sticky_test.go @@ -24,7 +24,6 @@ import ( "testing" "time" - "github.com/intelsdi-x/snap/core" "github.com/intelsdi-x/snap/core/cdata" . "github.com/smartystreets/goconvey/convey" ) @@ -59,12 +58,8 @@ func (m mockMetricType) Config() *cdata.ConfigDataNode { return nil } func (m mockMetricType) Data() interface{} { return nil } -func (m mockMetricType) Source() string { return "" } - func (m mockMetricType) Tags() map[string]string { return nil } -func (m mockMetricType) Labels() []core.Label { return nil } - func (m mockMetricType) Timestamp() time.Time { return time.Time{} } func TestStickyRouter(t *testing.T) { diff --git a/core/metric.go b/core/metric.go index dc5377844..972cd4890 100644 --- a/core/metric.go +++ b/core/metric.go @@ -27,10 +27,11 @@ import ( "github.com/intelsdi-x/snap/core/cdata" ) -type Label struct { - Index int `json:"index"` - Name string `json:"name"` -} +var ( + // Standard Tags are in added to the metric by the framework on plugin load. + // STD_TAG_PLUGIN_RUNNING_ON describes where the plugin is running (hostname). + STD_TAG_PLUGIN_RUNNING_ON = "plugin_running_on" +) // Metric represents a snap metric collected or to be collected type Metric interface { @@ -38,15 +39,110 @@ type Metric interface { Config() *cdata.ConfigDataNode LastAdvertisedTime() time.Time Data() interface{} - Source() string - Labels() []Label Tags() map[string]string Timestamp() time.Time + Description() string + Unit() string +} + +type Namespace []NamespaceElement + +// String returns the string representation of the namespace with "/" joining +// the elements of the namespace. A leading "/" is added. +func (n Namespace) String() string { + ns := "/" + for i, x := range n { + ns += x.Value + if i != len(n)-1 { + ns += "/" + } + } + return ns +} + +// Strings returns an array of strings that represent the elements of the +// namespace. +func (n Namespace) Strings() []string { + return strings.Split(strings.TrimPrefix(n.String(), "/"), "/") +} + +// Key returns a string representation of the namespace with "." joining +// the elements of the namespace. +func (n Namespace) Key() string { + return strings.Join(n.Strings(), ".") +} + +// IsDynamic returns true if there is any element of the namespace which is +// dynamic. If the namespace is dynamic the second return value will contain +// an array of namespace elements (indexes) where there are dynamic namespace +// elements. A dynamic component of the namespace are those elements that +// contain variable data. +func (n Namespace) IsDynamic() (bool, []int) { + var idx []int + ret := false + for i := range n { + if n[i].IsDynamic() { + ret = true + idx = append(idx, i) + } + } + return ret, idx +} + +// NewNamespace takes an array of strings and returns a Namespace. A Namespace +// is an array of NamespaceElements. The provided array of strings is used to +// set the corresponding Value fields in the array of NamespaceElements. +func NewNamespace(ns []string) Namespace { + n := make([]NamespaceElement, len(ns)) + for i, ns := range ns { + n[i] = NamespaceElement{Value: ns} + } + return n +} + +// AddDynamicElement adds a dynamic element to the given Namespace. A dynamic +// NamespaceElement is defined by having a nonempty Name field. +func (n Namespace) AddDynamicElement(name, description string) Namespace { + nse := NamespaceElement{Name: name, Description: description, Value: "*"} + return append(n, nse) +} + +// AddStaticElement adds a static element to the given Namespace. A static +// NamespaceElement is defined by having an empty Name field. +func (n Namespace) AddStaticElement(value string) Namespace { + nse := NamespaceElement{Value: value} + return append(n, nse) +} + +// NamespaceElement provides meta data related to the namespace. This is of particular importance when +// the namespace contains data. +type NamespaceElement struct { + Value string + Description string + Name string +} + +// NewNamespaceElement tasks a string and returns a NamespaceElement where the +// Value field is set to the provided string argument. +func NewNamespaceElement(e string) NamespaceElement { + if e != "" { + return NamespaceElement{Value: e} + } + return NamespaceElement{} +} + +// IsDynamic returns true if the namespace element contains data. A namespace +// element that has a nonempty Name field is considered dynamic. +func (n *NamespaceElement) IsDynamic() bool { + if n.Name != "" { + return true + } + return false } // RequestedMetric is a metric requested for collection type RequestedMetric interface { - Namespace() []string + Namespace() Namespace Version() int } @@ -55,7 +151,3 @@ type CatalogedMetric interface { LastAdvertisedTime() time.Time Policy() *cpolicy.ConfigPolicyNode } - -func JoinNamespace(ns []string) string { - return "/" + strings.Join(ns, "/") -} diff --git a/docs/PLUGIN_AUTHORING.md b/docs/PLUGIN_AUTHORING.md index 3d3f3e6c3..6b499aab0 100644 --- a/docs/PLUGIN_AUTHORING.md +++ b/docs/PLUGIN_AUTHORING.md @@ -98,8 +98,8 @@ github.com/intelsdi-x/snap/core/ctypes A snap collector plugin collects telemetry data by communicating with the snap daemon. To confine to collector plugin interfaces and metric types defined in snap, a collector plugin must implement the following methods: ``` GetConfigPolicy() (*cpolicy.ConfigPolicy, error) -CollectMetrics([]PluginMetricType) ([]PluginMetricType, error) -GetMetricTypes(PluginConfigType) ([]PluginMetricType, error) +CollectMetrics([]MetricType) ([]MetricType, error) +GetMetricTypes(ConfigType) ([]MetricType, error) ``` ### Writing a processor plugin A snap processor plugin allows filtering, aggregation, transformation, etc of collected telemetry data. To complaint with processor plugin interfaces defined in snap, a processor plugin must implement the following methods: diff --git a/mgmt/rest/metric.go b/mgmt/rest/metric.go index 2cea94287..a2aec9d69 100644 --- a/mgmt/rest/metric.go +++ b/mgmt/rest/metric.go @@ -72,7 +72,7 @@ func (s *Server) getMetricsFromTree(w http.ResponseWriter, r *http.Request, para } } - mets, err := s.mm.FetchMetrics(ns[:len(ns)-1], ver) + mets, err := s.mm.FetchMetrics(core.NewNamespace(ns[:len(ns)-1]), ver) if err != nil { respond(404, rbody.FromError(err), w) return @@ -83,7 +83,7 @@ func (s *Server) getMetricsFromTree(w http.ResponseWriter, r *http.Request, para // If no version was given, get all that fall at this namespace. if v == "" { - mts, err := s.mm.GetMetricVersions(ns) + mts, err := s.mm.GetMetricVersions(core.NewNamespace(ns)) if err != nil { respond(404, rbody.FromError(err), w) return @@ -98,7 +98,7 @@ func (s *Server) getMetricsFromTree(w http.ResponseWriter, r *http.Request, para respond(400, rbody.FromError(err), w) return } - mt, err := s.mm.GetMetric(ns, ver) + mt, err := s.mm.GetMetric(core.NewNamespace(ns), ver) if err != nil { respond(404, rbody.FromError(err), w) return @@ -106,7 +106,7 @@ func (s *Server) getMetricsFromTree(w http.ResponseWriter, r *http.Request, para b := &rbody.MetricReturned{} mb := &rbody.Metric{ - Namespace: core.JoinNamespace(mt.Namespace()), + Namespace: mt.Namespace().String(), Version: mt.Version(), LastAdvertisedTimestamp: mt.LastAdvertisedTime().Unix(), Href: catalogedMetricURI(r.Host, mt), @@ -145,7 +145,7 @@ func respondWithMetrics(host string, mets []core.CatalogedMetric, w http.Respons }) } b = append(b, rbody.Metric{ - Namespace: core.JoinNamespace(met.Namespace()), + Namespace: met.Namespace().String(), Version: met.Version(), LastAdvertisedTimestamp: met.LastAdvertisedTime().Unix(), Policy: policies, @@ -157,5 +157,5 @@ func respondWithMetrics(host string, mets []core.CatalogedMetric, w http.Respons } func catalogedMetricURI(host string, mt core.CatalogedMetric) string { - return fmt.Sprintf("%s://%s/v1/metrics%s?ver=%d", protocolPrefix, host, core.JoinNamespace(mt.Namespace()), mt.Version()) + return fmt.Sprintf("%s://%s/v1/metrics%s?ver=%d", protocolPrefix, host, mt.Namespace().String(), mt.Version()) } diff --git a/mgmt/rest/plugin_test.go b/mgmt/rest/plugin_test.go index b69b89a5f..f887f5b8c 100644 --- a/mgmt/rest/plugin_test.go +++ b/mgmt/rest/plugin_test.go @@ -59,13 +59,13 @@ type MockManagesMetrics struct{} func (m MockManagesMetrics) MetricCatalog() ([]core.CatalogedMetric, error) { return nil, nil } -func (m MockManagesMetrics) FetchMetrics([]string, int) ([]core.CatalogedMetric, error) { +func (m MockManagesMetrics) FetchMetrics(core.Namespace, int) ([]core.CatalogedMetric, error) { return nil, nil } -func (m MockManagesMetrics) GetMetricVersions([]string) ([]core.CatalogedMetric, error) { +func (m MockManagesMetrics) GetMetricVersions(core.Namespace) ([]core.CatalogedMetric, error) { return nil, nil } -func (m MockManagesMetrics) GetMetric([]string, int) (core.CatalogedMetric, error) { +func (m MockManagesMetrics) GetMetric(core.Namespace, int) (core.CatalogedMetric, error) { return nil, nil } func (m MockManagesMetrics) Load(*core.RequestedPlugin) (core.CatalogedPlugin, serror.SnapError) { diff --git a/mgmt/rest/rbody/task.go b/mgmt/rest/rbody/task.go index b5b04fad1..d960836b8 100644 --- a/mgmt/rest/rbody/task.go +++ b/mgmt/rest/rbody/task.go @@ -252,7 +252,6 @@ func (s *StreamedTaskEvent) ToJSON() string { type StreamedMetric struct { Namespace string `json:"namespace"` Data interface{} `json:"data"` - Source string `json:"source"` Timestamp time.Time `json:"timestamp"` Tags map[string]string `json:"tags"` } @@ -264,7 +263,7 @@ func (s StreamedMetrics) Len() int { } func (s StreamedMetrics) Less(i, j int) bool { - return fmt.Sprintf("%s:%s", s[i].Source, s[i].Namespace) < fmt.Sprintf("%s:%s", s[j].Source, s[j].Namespace) + return fmt.Sprintf("%s", s[i].Namespace) < fmt.Sprintf("%s", s[j].Namespace) } func (s StreamedMetrics) Swap(i, j int) { diff --git a/mgmt/rest/server.go b/mgmt/rest/server.go index d94e1cdec..bb2f471e2 100644 --- a/mgmt/rest/server.go +++ b/mgmt/rest/server.go @@ -79,9 +79,9 @@ type Config struct { type managesMetrics interface { MetricCatalog() ([]core.CatalogedMetric, error) - FetchMetrics([]string, int) ([]core.CatalogedMetric, error) - GetMetricVersions([]string) ([]core.CatalogedMetric, error) - GetMetric([]string, int) (core.CatalogedMetric, error) + FetchMetrics(core.Namespace, int) ([]core.CatalogedMetric, error) + GetMetricVersions(core.Namespace) ([]core.CatalogedMetric, error) + GetMetric(core.Namespace, int) (core.CatalogedMetric, error) Load(*core.RequestedPlugin) (core.CatalogedPlugin, serror.SnapError) Unload(core.Plugin) (core.CatalogedPlugin, serror.SnapError) PluginCatalog() core.PluginCatalog diff --git a/mgmt/rest/task.go b/mgmt/rest/task.go index 671e2a372..1f42b9021 100644 --- a/mgmt/rest/task.go +++ b/mgmt/rest/task.go @@ -386,9 +386,8 @@ func (t *TaskWatchHandler) CatchCollection(m []core.Metric) { sm := make([]rbody.StreamedMetric, len(m)) for i := range m { sm[i] = rbody.StreamedMetric{ - Namespace: core.JoinNamespace(m[i].Namespace()), + Namespace: m[i].Namespace().String(), Data: m[i].Data(), - Source: m[i].Source(), Timestamp: m[i].Timestamp(), Tags: m[i].Tags(), } diff --git a/mgmt/tribe/tribe_test.go b/mgmt/tribe/tribe_test.go index e7d34c3ae..802a583a5 100644 --- a/mgmt/tribe/tribe_test.go +++ b/mgmt/tribe/tribe_test.go @@ -83,7 +83,7 @@ func getTestConfig() *Config { } func TestTribeFullStateSync(t *testing.T) { - log.SetLevel(log.DebugLevel) + log.SetLevel(log.WarnLevel) tribes := []*tribe{} numOfTribes := 5 agreement1 := "agreement1" diff --git a/plugin/collector/snap-collector-mock1/mock/mock.go b/plugin/collector/snap-collector-mock1/mock/mock.go index 18d6635c0..b9e68401e 100644 --- a/plugin/collector/snap-collector-mock1/mock/mock.go +++ b/plugin/collector/snap-collector-mock1/mock/mock.go @@ -22,7 +22,6 @@ package mock import ( "fmt" "math/rand" - "os" "time" "github.com/intelsdi-x/snap/control/plugin" @@ -48,21 +47,20 @@ type Mock struct { } // CollectMetrics collects metrics for testing -func (f *Mock) CollectMetrics(mts []plugin.PluginMetricType) ([]plugin.PluginMetricType, error) { - metrics := []plugin.PluginMetricType{} +func (f *Mock) CollectMetrics(mts []plugin.MetricType) ([]plugin.MetricType, error) { + metrics := []plugin.MetricType{} rand.Seed(time.Now().UTC().UnixNano()) - hostname, _ := os.Hostname() for i, p := range mts { - if mts[i].Namespace()[2] == "*" { + if mts[i].Namespace()[2].Value == "*" { for j := 0; j < 10; j++ { - v := fmt.Sprintf("host%d", j) + ns := make([]core.NamespaceElement, len(mts[i].Namespace())) + copy(ns, mts[i].Namespace()) + ns[2].Value = fmt.Sprintf("host%d", j) data := randInt(65, 90) - mt := plugin.PluginMetricType{ + mt := plugin.MetricType{ Data_: data, - Namespace_: []string{"intel", "mock", v, "baz"}, - Source_: hostname, + Namespace_: ns, Timestamp_: time.Now(), - Labels_: mts[i].Labels(), Version_: mts[i].Version(), } metrics = append(metrics, mt) @@ -74,7 +72,6 @@ func (f *Mock) CollectMetrics(mts []plugin.PluginMetricType) ([]plugin.PluginMet p.Data_ = fmt.Sprintf("The mock collected data! config data: user=%s password=%s", p.Config().Table()["user"], p.Config().Table()["password"]) } p.Timestamp_ = time.Now() - p.Source_ = hostname metrics = append(metrics, p) } } @@ -82,20 +79,19 @@ func (f *Mock) CollectMetrics(mts []plugin.PluginMetricType) ([]plugin.PluginMet } //GetMetricTypes returns metric types for testing -func (f *Mock) GetMetricTypes(cfg plugin.PluginConfigType) ([]plugin.PluginMetricType, error) { - mts := []plugin.PluginMetricType{} +func (f *Mock) GetMetricTypes(cfg plugin.ConfigType) ([]plugin.MetricType, error) { + mts := []plugin.MetricType{} if _, ok := cfg.Table()["test-fail"]; ok { return mts, fmt.Errorf("missing on-load plugin config entry 'test'") } if _, ok := cfg.Table()["test"]; ok { - mts = append(mts, plugin.PluginMetricType{Namespace_: []string{"intel", "mock", "test"}}) + mts = append(mts, plugin.MetricType{Namespace_: core.NewNamespace([]string{"intel", "mock", "test"})}) } - mts = append(mts, plugin.PluginMetricType{Namespace_: []string{"intel", "mock", "foo"}}) - mts = append(mts, plugin.PluginMetricType{Namespace_: []string{"intel", "mock", "bar"}}) - mts = append(mts, plugin.PluginMetricType{ - Namespace_: []string{"intel", "mock", "*", "baz"}, - Labels_: []core.Label{{Index: 2, Name: "host"}}, - }) + mts = append(mts, plugin.MetricType{Namespace_: core.NewNamespace([]string{"intel", "mock", "foo"})}) + mts = append(mts, plugin.MetricType{Namespace_: core.NewNamespace([]string{"intel", "mock", "bar"})}) + mts = append(mts, plugin.MetricType{Namespace_: core.NewNamespace([]string{"intel", "mock"}). + AddDynamicElement("host", "name of the host"). + AddStaticElement("baz")}) return mts, nil } diff --git a/plugin/collector/snap-collector-mock2/mock/mock.go b/plugin/collector/snap-collector-mock2/mock/mock.go index 010a8d128..91b7b0b69 100644 --- a/plugin/collector/snap-collector-mock2/mock/mock.go +++ b/plugin/collector/snap-collector-mock2/mock/mock.go @@ -23,7 +23,6 @@ import ( "fmt" "log" "math/rand" - "os" "time" "github.com/intelsdi-x/snap/control/plugin" @@ -46,36 +45,34 @@ type Mock struct { } // CollectMetrics collects metrics for testing -func (f *Mock) CollectMetrics(mts []plugin.PluginMetricType) ([]plugin.PluginMetricType, error) { +func (f *Mock) CollectMetrics(mts []plugin.MetricType) ([]plugin.MetricType, error) { for _, p := range mts { log.Printf("collecting %+v\n", p) } rand.Seed(time.Now().UTC().UnixNano()) - metrics := []plugin.PluginMetricType{} + metrics := []plugin.MetricType{} for i := range mts { if c, ok := mts[i].Config().Table()["panic"]; ok && c.(ctypes.ConfigValueBool).Value { panic("Opps!") } - if mts[i].Namespace()[2] == "*" { - hostname, _ := os.Hostname() + if mts[i].Namespace()[2].Value == "*" { for j := 0; j < 10; j++ { - v := fmt.Sprintf("host%d", j) + ns := mts[i].Namespace() + ns[2].Value = fmt.Sprintf("host%d", j) data := randInt(65, 90) - mt := plugin.PluginMetricType{ + mt := plugin.MetricType{ Data_: data, - Namespace_: []string{"intel", "mock", v, "baz"}, - Source_: hostname, + Namespace_: ns, Timestamp_: time.Now(), - Labels_: mts[i].Labels(), Version_: mts[i].Version(), + Unit_: mts[i].Unit(), } metrics = append(metrics, mt) } } else { data := randInt(65, 90) mts[i].Data_ = data - mts[i].Source_, _ = os.Hostname() mts[i].Timestamp_ = time.Now() metrics = append(metrics, mts[i]) } @@ -84,19 +81,34 @@ func (f *Mock) CollectMetrics(mts []plugin.PluginMetricType) ([]plugin.PluginMet } //GetMetricTypes returns metric types for testing -func (f *Mock) GetMetricTypes(cfg plugin.PluginConfigType) ([]plugin.PluginMetricType, error) { - mts := []plugin.PluginMetricType{} +func (f *Mock) GetMetricTypes(cfg plugin.ConfigType) ([]plugin.MetricType, error) { + mts := []plugin.MetricType{} if _, ok := cfg.Table()["test-fail"]; ok { return mts, fmt.Errorf("testing") } if _, ok := cfg.Table()["test"]; ok { - mts = append(mts, plugin.PluginMetricType{Namespace_: []string{"intel", "mock", "test"}}) + mts = append(mts, plugin.MetricType{ + Namespace_: core.NewNamespace([]string{"intel", "mock", "test"}), + Description_: "mock description", + Unit_: "mock unit", + }) } - mts = append(mts, plugin.PluginMetricType{Namespace_: []string{"intel", "mock", "foo"}}) - mts = append(mts, plugin.PluginMetricType{Namespace_: []string{"intel", "mock", "bar"}}) - mts = append(mts, plugin.PluginMetricType{ - Namespace_: []string{"intel", "mock", "*", "baz"}, - Labels_: []core.Label{{Index: 2, Name: "host"}}, + mts = append(mts, plugin.MetricType{ + Namespace_: core.NewNamespace([]string{"intel", "mock", "foo"}), + Description_: "mock description", + Unit_: "mock unit", + }) + mts = append(mts, plugin.MetricType{ + Namespace_: core.NewNamespace([]string{"intel", "mock", "bar"}), + Description_: "mock description", + Unit_: "mock unit", + }) + mts = append(mts, plugin.MetricType{ + Namespace_: core.NewNamespace([]string{"intel", "mock"}). + AddDynamicElement("host", "name of the host"). + AddStaticElement("baz"), + Description_: "mock description", + Unit_: "mock unit", }) return mts, nil } diff --git a/plugin/processor/snap-processor-passthru/passthru/passthru.go b/plugin/processor/snap-processor-passthru/passthru/passthru.go index a3f9488c1..eee1164c0 100644 --- a/plugin/processor/snap-processor-passthru/passthru/passthru.go +++ b/plugin/processor/snap-processor-passthru/passthru/passthru.go @@ -59,8 +59,8 @@ func (p *passthruProcessor) Process(contentType string, content []byte, config m // The following block is for testing config see.. control_test.go if _, ok := config["test"]; ok { logger.Print("test configuration found") - var metrics []plugin.PluginMetricType - //Decodes the content into pluginMetricType + var metrics []plugin.MetricType + //Decodes the content into MetricType dec := gob.NewDecoder(bytes.NewBuffer(content)) if err := dec.Decode(&metrics); err != nil { logger.Printf("Error decoding: error=%v content=%v", err, content) @@ -68,7 +68,7 @@ func (p *passthruProcessor) Process(contentType string, content []byte, config m } for idx, m := range metrics { - if m.Namespace()[0] == "foo" { + if m.Namespace()[0].Value == "foo" { logger.Print("found foo metric") metrics[idx].Data_ = 2 } diff --git a/plugin/publisher/snap-publisher-file/file/file.go b/plugin/publisher/snap-publisher-file/file/file.go index 2bdbd8a7e..60af43ee3 100644 --- a/plugin/publisher/snap-publisher-file/file/file.go +++ b/plugin/publisher/snap-publisher-file/file/file.go @@ -50,7 +50,7 @@ func NewFilePublisher() *filePublisher { func (f *filePublisher) Publish(contentType string, content []byte, config map[string]ctypes.ConfigValue) error { logger := log.New() logger.Println("Publishing started") - var metrics []plugin.PluginMetricType + var metrics []plugin.MetricType switch contentType { case plugin.SnapGOBContentType: @@ -73,11 +73,7 @@ func (f *filePublisher) Publish(contentType string, content []byte, config map[s } w := bufio.NewWriter(file) for _, m := range metrics { - source := m.Source() - if source == "" { - source = "unknown" - } - w.WriteString(fmt.Sprintf("%v|%v|%v|%v\n", m.Timestamp(), m.Namespace(), m.Data(), source)) + w.WriteString(fmt.Sprintf("%v|%v|%v\n", m.Timestamp(), m.Namespace(), m.Data())) } w.Flush() diff --git a/plugin/publisher/snap-publisher-file/file/file_test.go b/plugin/publisher/snap-publisher-file/file/file_test.go index 7cebbcd1d..4e46c0df9 100644 --- a/plugin/publisher/snap-publisher-file/file/file_test.go +++ b/plugin/publisher/snap-publisher-file/file/file_test.go @@ -28,6 +28,7 @@ import ( "time" "github.com/intelsdi-x/snap/control/plugin" + "github.com/intelsdi-x/snap/core" "github.com/intelsdi-x/snap/core/ctypes" . "github.com/smartystreets/goconvey/convey" @@ -35,8 +36,8 @@ import ( func TestFilePublish(t *testing.T) { var buf bytes.Buffer - metrics := []plugin.PluginMetricType{ - *plugin.NewPluginMetricType([]string{"foo"}, time.Now(), "", nil, nil, 99), + metrics := []plugin.MetricType{ + *plugin.NewMetricType(core.NewNamespace([]string{"foo"}), time.Now(), nil, "", 99), } config := make(map[string]ctypes.ConfigValue) enc := gob.NewEncoder(&buf) diff --git a/scheduler/job.go b/scheduler/job.go index 69f89fd98..5bebf69a2 100644 --- a/scheduler/job.go +++ b/scheduler/job.go @@ -185,12 +185,12 @@ func newCollectorJob(metricTypes []core.RequestedMetric, deadlineDuration time.D } type metric struct { - namespace []string + namespace core.Namespace version int config *cdata.ConfigDataNode } -func (m *metric) Namespace() []string { +func (m *metric) Namespace() core.Namespace { return m.namespace } @@ -203,10 +203,10 @@ func (m *metric) Version() int { } func (m *metric) Data() interface{} { return nil } +func (m *metric) Description() string { return "" } +func (m *metric) Unit() string { return "" } func (m *metric) Tags() map[string]string { return nil } -func (m *metric) Labels() []core.Label { return nil } func (m *metric) LastAdvertisedTime() time.Time { return time.Unix(0, 0) } -func (m *metric) Source() string { return "" } func (m *metric) Timestamp() time.Time { return time.Unix(0, 0) } func (c *collectorJob) Run() { @@ -222,11 +222,11 @@ func (c *collectorJob) Run() { nss, err := c.collector.ExpandWildcards(rmt.Namespace()) if err != nil { // use metric directly from the workflow - nss = [][]string{rmt.Namespace()} + nss = []core.Namespace{rmt.Namespace()} } for _, ns := range nss { - config := c.configDataTree.Get(ns) + config := c.configDataTree.Get(ns.Strings()) if config == nil { config = cdata.NewNode() @@ -302,9 +302,9 @@ func (p *processJob) Run() { case *collectorJob: switch p.contentType { case plugin.SnapGOBContentType: - metrics := make([]plugin.PluginMetricType, len(pt.metrics)) + metrics := make([]plugin.MetricType, len(pt.metrics)) for i, m := range pt.metrics { - if mt, ok := m.(plugin.PluginMetricType); ok { + if mt, ok := m.(plugin.MetricType); ok { metrics[i] = mt } else { log.WithFields(log.Fields{ @@ -435,13 +435,13 @@ func (p *publisherJob) Run() { case collectJobType: switch p.contentType { case plugin.SnapGOBContentType: - metrics := make([]plugin.PluginMetricType, len(p.parentJob.(*collectorJob).metrics)) + metrics := make([]plugin.MetricType, len(p.parentJob.(*collectorJob).metrics)) for i, m := range p.parentJob.(*collectorJob).metrics { switch mt := m.(type) { - case plugin.PluginMetricType: + case plugin.MetricType: metrics[i] = mt default: - panic("unsupported type") + panic(fmt.Sprintf("unsupported type %T", mt)) } } enc.Encode(metrics) diff --git a/scheduler/job_test.go b/scheduler/job_test.go index 5c8abd6dc..5609058f9 100644 --- a/scheduler/job_test.go +++ b/scheduler/job_test.go @@ -38,7 +38,7 @@ func (m *mockCollector) CollectMetrics([]core.Metric, time.Time, string) ([]core return nil, nil } -func (m *mockCollector) ExpandWildcards([]string) ([][]string, serror.SnapError) { +func (m *mockCollector) ExpandWildcards(core.Namespace) ([]core.Namespace, serror.SnapError) { return nil, nil } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index a5be51627..9b445f1ab 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -74,7 +74,7 @@ type managesMetrics interface { ValidateDeps([]core.Metric, []core.SubscribedPlugin) []serror.SnapError SubscribeDeps(string, []core.Metric, []core.Plugin) []serror.SnapError UnsubscribeDeps(string, []core.Metric, []core.Plugin) []serror.SnapError - MatchQueryToNamespaces([]string) ([][]string, serror.SnapError) + MatchQueryToNamespaces(core.Namespace) ([]core.Namespace, serror.SnapError) } // ManagesPluginContentTypes is an interface to a plugin manager that can tell us what content accept and returns are supported. @@ -83,7 +83,7 @@ type managesPluginContentTypes interface { } type collectsMetrics interface { - ExpandWildcards([]string) ([][]string, serror.SnapError) + ExpandWildcards(core.Namespace) ([]core.Namespace, serror.SnapError) CollectMetrics([]core.Metric, time.Time, string) ([]core.Metric, []error) } @@ -594,14 +594,14 @@ func (s *scheduler) gatherMetricsAndPlugins(wf *schedulerWorkflow) ([]core.Metri nss, err := s.metricManager.MatchQueryToNamespaces(m.Namespace()) if err != nil { // use metric directly from the workflow - nss = [][]string{m.Namespace()} + nss = []core.Namespace{m.Namespace()} } for _, ns := range nss { mts = append(mts, &metric{ namespace: ns, version: m.Version(), - config: wf.configTree.Get(ns), + config: wf.configTree.Get(ns.Strings()), }) } } diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go index 5afc63e00..5c9a8fe54 100644 --- a/scheduler/scheduler_test.go +++ b/scheduler/scheduler_test.go @@ -109,11 +109,11 @@ func (m *mockMetricManager) UnsubscribeDeps(taskID string, mts []core.Metric, pr return nil } -func (m *mockMetricManager) MatchQueryToNamespaces([]string) ([][]string, serror.SnapError) { +func (m *mockMetricManager) MatchQueryToNamespaces(core.Namespace) ([]core.Namespace, serror.SnapError) { return nil, nil } -func (m *mockMetricManager) ExpandWildcards([]string) ([][]string, serror.SnapError) { +func (m *mockMetricManager) ExpandWildcards(core.Namespace) ([]core.Namespace, serror.SnapError) { return nil, nil } diff --git a/scheduler/workflow.go b/scheduler/workflow.go index 4d146ed42..5497c6089 100644 --- a/scheduler/workflow.go +++ b/scheduler/workflow.go @@ -87,7 +87,7 @@ func convertCollectionNode(cnode *wmap.CollectWorkflowMapNode, wf *schedulerWork mts := cnode.GetMetrics() wf.metrics = make([]core.RequestedMetric, len(mts)) for i, m := range mts { - wf.metrics[i] = m + wf.metrics[i] = &metric{namespace: core.NewNamespace(m.Namespace()), version: m.Version()} } // Get our config data tree diff --git a/scheduler/workflow_string.go b/scheduler/workflow_string.go index 56bdbc957..151e02390 100644 --- a/scheduler/workflow_string.go +++ b/scheduler/workflow_string.go @@ -21,7 +21,6 @@ package scheduler import ( "fmt" - "strings" "github.com/intelsdi-x/snap/core" ) @@ -46,7 +45,7 @@ func (s *schedulerWorkflow) String() string { func metricString(pad string, rm []core.RequestedMetric) string { var out string for _, m := range rm { - out += fmt.Sprintf("%sMetric: %s\n", pad, strings.Join(m.Namespace(), "/")) + out += fmt.Sprintf("%sMetric: %s\n", pad, m.Namespace().String()) out += fmt.Sprintf("%s Version: %d\n", pad, m.Version()) } return out diff --git a/scheduler/workflow_test.go b/scheduler/workflow_test.go index 6ea39862d..518670edb 100644 --- a/scheduler/workflow_test.go +++ b/scheduler/workflow_test.go @@ -266,7 +266,7 @@ func (m *Mock1) CollectMetrics([]core.Metric, time.Time, string) ([]core.Metric, return nil, nil } -func (m *Mock1) ExpandWildcards([]string) ([][]string, serror.SnapError) { +func (m *Mock1) ExpandWildcards(core.Namespace) ([]core.Namespace, serror.SnapError) { return nil, nil }