diff --git a/control/control.go b/control/control.go index 5c2643c1d..318458c65 100644 --- a/control/control.go +++ b/control/control.go @@ -435,6 +435,7 @@ func (p *pluginControl) validateMetricTypeSubscription(mt core.RequestedMetric, controlLogger.WithFields(log.Fields{ "_block": "validate-metric-subscription", "namespace": mt.Namespace(), + "version": mt.Version(), }).Info("subscription called on metric") m, err := p.metricCatalog.Get(mt.Namespace(), mt.Version()) @@ -499,7 +500,7 @@ func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]core.Plugin, []pe for _, mt := range mts { m, err := p.metricCatalog.Get(mt.Namespace(), mt.Version()) if err != nil { - perrs = append(perrs, perror.New(err)) + perrs = append(perrs, err) continue } // if the metric subscription is to version -1, we need to carry diff --git a/control/control_test.go b/control/control_test.go index b662af044..f455a47a6 100644 --- a/control/control_test.go +++ b/control/control_test.go @@ -698,6 +698,7 @@ func TestMetricExists(t *testing.T) { type MockMetricType struct { namespace []string cfg *cdata.ConfigDataNode + ver int } func (m MockMetricType) MarshalJSON() ([]byte, error) { @@ -727,7 +728,7 @@ func (m MockMetricType) Timestamp() time.Time { } func (m MockMetricType) Version() int { - return 1 + return m.ver } func (m MockMetricType) Config() *cdata.ConfigDataNode { @@ -738,6 +739,9 @@ func (m MockMetricType) Data() interface{} { return nil } +func (m MockMetricType) Labels() []core.Label { return nil } +func (m MockMetricType) Tags() map[string]string { return nil } + func TestMetricConfig(t *testing.T) { Convey("required config provided by task", t, func() { c := New() @@ -797,6 +801,7 @@ func TestMetricConfig(t *testing.T) { cd := cdata.NewNode() m1 := MockMetricType{ namespace: []string{"intel", "mock", "foo"}, + ver: 1, } metric, errs := c.validateMetricTypeSubscription(m1, cd) Convey("So metric should be valid with config", func() { @@ -808,6 +813,88 @@ func TestMetricConfig(t *testing.T) { }) } +func TestCollectDynamicMetrics(t *testing.T) { + Convey("given a plugin using the native client", t, func() { + config := NewConfig() + config.Plugins.All.AddItem("password", ctypes.ConfigValueStr{Value: "testval"}) + c := New(OptSetConfig(config), CacheExpiration(time.Second*1)) + c.Start() + lpe := newListenToPluginEvent() + c.eventManager.RegisterHandler("Control.PluginLoaded", lpe) + load(c, PluginPath) + <-lpe.done + load(c, JSONRPC_PluginPath) + <-lpe.done + cd := cdata.NewNode() + metrics, err := c.metricCatalog.Fetch([]string{}) + So(err, ShouldBeNil) + So(len(metrics), ShouldEqual, 6) + m, err := c.metricCatalog.Get([]string{"intel", "mock", "*", "baz"}, 2) + So(err, ShouldBeNil) + So(m, ShouldNotBeNil) + jsonm, err := c.metricCatalog.Get([]string{"intel", "mock", "*", "baz"}, 1) + So(err, ShouldBeNil) + So(jsonm, ShouldNotBeNil) + metric, errs := c.validateMetricTypeSubscription(m, cd) + So(errs, ShouldBeNil) + So(metric, ShouldNotBeNil) + Convey("collects metrics from plugin using native client", func() { + lp, err := c.pluginManager.get("collector:mock2:2") + So(err, ShouldBeNil) + So(lp, ShouldNotBeNil) + pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock2:2") + So(errp, ShouldBeNil) + So(pool, ShouldNotBeNil) + pool.subscribe("1", unboundSubscriptionType) + err = c.pluginRunner.runPlugin(lp.Path) + So(err, ShouldBeNil) + mts, errs := c.CollectMetrics([]core.Metric{m}, time.Now().Add(time.Second*1)) + hits, err := pool.plugins[1].client.CacheHits(core.JoinNamespace(m.namespace), 2) + So(err, ShouldBeNil) + So(hits, ShouldEqual, 0) + So(errs, ShouldBeNil) + So(len(mts), ShouldEqual, 10) + mts, errs = c.CollectMetrics([]core.Metric{m}, time.Now().Add(time.Second*1)) + hits, err = pool.plugins[1].client.CacheHits(core.JoinNamespace(m.namespace), 2) + So(err, ShouldBeNil) + So(hits, ShouldEqual, 1) + So(errs, ShouldBeNil) + So(len(mts), ShouldEqual, 10) + pool.unsubscribe("1") + Convey("collects metrics from plugin using httpjson client", func() { + lp, err := c.pluginManager.get("collector:mock1:1") + So(err, ShouldBeNil) + So(lp, ShouldNotBeNil) + pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock1:1") + So(errp, ShouldBeNil) + So(pool, ShouldNotBeNil) + pool.subscribe("1", unboundSubscriptionType) + err = c.pluginRunner.runPlugin(lp.Path) + So(err, ShouldBeNil) + mts, errs := c.CollectMetrics([]core.Metric{jsonm}, time.Now().Add(time.Second*1)) + hits, err := pool.plugins[1].client.CacheHits(core.JoinNamespace(m.namespace), 1) + So(err, ShouldBeNil) + So(hits, ShouldEqual, 0) + So(errs, ShouldBeNil) + So(len(mts), ShouldEqual, 10) + mts, errs = c.CollectMetrics([]core.Metric{jsonm}, time.Now().Add(time.Second*1)) + hits, err = pool.plugins[1].client.CacheHits(core.JoinNamespace(m.namespace), 1) + So(err, ShouldBeNil) + So(hits, ShouldEqual, 1) + So(errs, ShouldBeNil) + So(len(mts), ShouldEqual, 10) + hits = pool.plugins[1].client.AllCacheHits() + So(hits, ShouldEqual, 2) + misses := pool.plugins[1].client.AllCacheMisses() + So(misses, ShouldEqual, 2) + pool.unsubscribe("1") + c.Stop() + time.Sleep(100 * time.Millisecond) + }) + }) + }) +} + func TestCollectMetrics(t *testing.T) { Convey("given a new router", t, func() { @@ -833,7 +920,7 @@ func TestCollectMetrics(t *testing.T) { <-lpe.done mts, err := c.MetricCatalog() So(err, ShouldBeNil) - So(len(mts), ShouldEqual, 3) + So(len(mts), ShouldEqual, 4) cd := cdata.NewNode() cd.AddItem("password", ctypes.ConfigValueStr{Value: "testval"}) @@ -865,7 +952,7 @@ func TestCollectMetrics(t *testing.T) { time.Sleep(time.Millisecond * 1100) for x := 0; x < 5; x++ { - cr, err := c.CollectMetrics(m, time.Now().Add(time.Second*60)) + cr, err := c.CollectMetrics(m, time.Now().Add(time.Second*1)) So(err, ShouldBeNil) for i := range cr { So(cr[i].Data(), ShouldContainSubstring, "The mock collected data!") @@ -957,7 +1044,7 @@ func TestPublishMetrics(t *testing.T) { Convey("Publish to file", func() { metrics := []plugin.PluginMetricType{ - *plugin.NewPluginMetricType([]string{"foo"}, time.Now(), "", 1), + *plugin.NewPluginMetricType([]string{"foo"}, time.Now(), "", nil, nil, 1), } var buf bytes.Buffer enc := gob.NewEncoder(&buf) @@ -1010,7 +1097,7 @@ func TestProcessMetrics(t *testing.T) { Convey("process metrics", func() { metrics := []plugin.PluginMetricType{ - *plugin.NewPluginMetricType([]string{"foo"}, time.Now(), "", 1), + *plugin.NewPluginMetricType([]string{"foo"}, time.Now(), "", nil, nil, 1), } var buf bytes.Buffer enc := gob.NewEncoder(&buf) diff --git a/control/metrics.go b/control/metrics.go index 6ccc1b5d9..dd524ebe0 100644 --- a/control/metrics.go +++ b/control/metrics.go @@ -68,6 +68,8 @@ type metricType struct { config *cdata.ConfigDataNode data interface{} source string + labels []core.Label + tags map[string]string timestamp time.Time } @@ -143,6 +145,14 @@ func (m *metricType) Source() string { return m.source } +func (m *metricType) Tags() map[string]string { + return m.tags +} + +func (m *metricType) Labels() []core.Label { + return m.labels +} + func (m *metricType) Timestamp() time.Time { return m.timestamp } @@ -174,6 +184,8 @@ func (mc *metricCatalog) AddLoadedMetricType(lp *loadedPlugin, mt core.Metric) { namespace: mt.Namespace(), version: mt.Version(), lastAdvertisedTime: mt.LastAdvertisedTime(), + tags: mt.Tags(), + labels: mt.Labels(), policy: lp.ConfigPolicy.Get(mt.Namespace()), } mc.Add(&newMt) diff --git a/control/plugin/client/cache.go b/control/plugin/client/cache.go index 7a51d9df1..1322a2595 100644 --- a/control/plugin/client/cache.go +++ b/control/plugin/client/cache.go @@ -20,6 +20,7 @@ limitations under the License. package client import ( + "errors" "fmt" "time" @@ -35,25 +36,28 @@ var ( table: make(map[string]*cachecell), } cacheLog = log.WithField("_module", "client-cache") + + ErrCacheEntryDoesNotExist = errors.New("cache entry does not exist") ) type cachecell struct { - time time.Time - metric core.Metric - hits uint64 - misses uint64 + time time.Time + metric core.Metric + metrics []core.Metric + hits uint64 + misses uint64 } type cache struct { table map[string]*cachecell } -func (c *cache) get(key string) core.Metric { +func (c *cache) get(ns string, version int) interface{} { var ( cell *cachecell ok bool ) - + key := fmt.Sprintf("%v:%v", ns, version) if cell, ok = c.table[key]; ok && time.Since(cell.time) < GlobalCacheExpiration { cell.hits++ cacheLog.WithFields(log.Fields{ @@ -61,27 +65,54 @@ func (c *cache) get(key string) core.Metric { "hits": cell.hits, "misses": cell.misses, }).Debug(fmt.Sprintf("cache hit [%s]", key)) - return cell.metric - } - if ok { - cell.misses++ + if cell.metric != nil { + return cell.metric + } + return cell.metrics + } else { + if !ok { + c.table[key] = &cachecell{ + time: time.Time{}, + metrics: nil, + } + } + c.table[key].misses++ cacheLog.WithFields(log.Fields{ "namespace": key, - "hits": cell.hits, - "misses": cell.misses, + "hits": c.table[key].hits, + "misses": c.table[key].misses, }).Debug(fmt.Sprintf("cache miss [%s]", key)) } return nil } -func (c *cache) put(key string, metric core.Metric) { - if _, ok := c.table[key]; ok { - c.table[key].time = time.Now() - c.table[key].metric = metric - } else { - c.table[key] = &cachecell{ - time: time.Now(), - metric: metric, +func (c *cache) put(ns string, version int, m interface{}) { + key := fmt.Sprintf("%v:%v", ns, version) + switch metric := m.(type) { + case core.Metric: + if _, ok := c.table[key]; ok { + c.table[key].time = time.Now() + c.table[key].metric = metric + } else { + c.table[key] = &cachecell{ + time: time.Now(), + metric: metric, + } } + case []core.Metric: + if _, ok := c.table[key]; ok { + c.table[key].time = time.Now() + c.table[key].metrics = metric + } else { + c.table[key] = &cachecell{ + time: time.Now(), + metrics: metric, + } + } + default: + cacheLog.WithFields(log.Fields{ + "namespace": key, + "_block": "put", + }).Error("unsupported type") } } diff --git a/control/plugin/client/cache_test.go b/control/plugin/client/cache_test.go index 303de0c72..e090260e3 100644 --- a/control/plugin/client/cache_test.go +++ b/control/plugin/client/cache_test.go @@ -38,8 +38,8 @@ func TestCache(t *testing.T) { Namespace_: []string{"foo", "bar"}, } - mc.put("/foo/bar", foo) - ret := mc.get("/foo/bar") + mc.put("/foo/bar", 1, foo) + ret := mc.get("/foo/bar", 1) So(ret, ShouldNotBeNil) So(ret, ShouldEqual, foo) }) @@ -47,7 +47,7 @@ func TestCache(t *testing.T) { mc := &cache{ table: make(map[string]*cachecell), } - ret := mc.get("/foo/bar") + ret := mc.get("/foo/bar", 1) So(ret, ShouldBeNil) }) Convey("returns nil if the cache cell has expired", t, func() { @@ -57,10 +57,10 @@ func TestCache(t *testing.T) { foo := &plugin.PluginMetricType{ Namespace_: []string{"foo", "bar"}, } - mc.put("/foo/bar", foo) + mc.put("/foo/bar", 1, foo) time.Sleep(301 * time.Millisecond) - ret := mc.get("/foo/bar") + ret := mc.get("/foo/bar", 1) So(ret, ShouldBeNil) }) Convey("hit and miss counts", t, func() { @@ -71,9 +71,9 @@ func TestCache(t *testing.T) { foo := &plugin.PluginMetricType{ Namespace_: []string{"foo", "bar"}, } - mc.put("/foo/bar", foo) - mc.get("/foo/bar") - So(mc.table["/foo/bar"].hits, ShouldEqual, 1) + mc.put("/foo/bar", 1, foo) + mc.get("/foo/bar", 1) + So(mc.table["/foo/bar:1"].hits, ShouldEqual, 1) }) Convey("ticks miss count when a cache entry is still a hit", func() { mc := &cache{ @@ -82,10 +82,10 @@ func TestCache(t *testing.T) { foo := &plugin.PluginMetricType{ Namespace_: []string{"foo", "bar"}, } - mc.put("/foo/bar", foo) - time.Sleep(295 * time.Millisecond) - mc.get("/foo/bar") - So(mc.table["/foo/bar"].hits, ShouldEqual, 1) + mc.put("/foo/bar", 1, foo) + time.Sleep(250 * time.Millisecond) + mc.get("/foo/bar", 1) + So(mc.table["/foo/bar:1"].hits, ShouldEqual, 1) }) Convey("ticks miss count when a cache entry is missed", func() { mc := &cache{ @@ -94,10 +94,10 @@ func TestCache(t *testing.T) { foo := &plugin.PluginMetricType{ Namespace_: []string{"foo", "bar"}, } - mc.put("/foo/bar", foo) + mc.put("/foo/bar", 1, foo) time.Sleep(301 * time.Millisecond) - mc.get("/foo/bar") - So(mc.table["/foo/bar"].misses, ShouldEqual, 1) + mc.get("/foo/bar", 1) + So(mc.table["/foo/bar:1"].misses, ShouldEqual, 1) }) }) diff --git a/control/plugin/client/client.go b/control/plugin/client/client.go index 812f16fe6..c538d7513 100644 --- a/control/plugin/client/client.go +++ b/control/plugin/client/client.go @@ -20,14 +20,26 @@ limitations under the License. package client import ( + "fmt" + + log "github.com/Sirupsen/logrus" + "github.com/intelsdi-x/pulse/control/plugin" "github.com/intelsdi-x/pulse/control/plugin/cpolicy" "github.com/intelsdi-x/pulse/core" "github.com/intelsdi-x/pulse/core/ctypes" ) +type PluginCacheClient interface { + CacheHits(string, int) (uint64, error) + CacheMisses(string, int) (uint64, error) + AllCacheHits() uint64 + AllCacheMisses() uint64 +} + // PluginClient A client providing common plugin method calls. type PluginClient interface { + PluginCacheClient SetKey() error Ping() error Kill(string) error @@ -52,3 +64,103 @@ type PluginPublisherClient interface { PluginClient Publish(contentType string, content []byte, config map[string]ctypes.ConfigValue) error } + +type pluginCacheClient struct{} + +// AllCacheHits returns cache hits across all metrics. +func (c *pluginCacheClient) AllCacheHits() uint64 { + var hits uint64 + for _, v := range metricCache.table { + hits += v.hits + } + return hits +} + +// AllCacheMisses returns cache misses across all metrics. +func (c *pluginCacheClient) AllCacheMisses() uint64 { + var misses uint64 + for _, v := range metricCache.table { + misses += v.misses + } + return misses +} + +// CacheHits returns the cache hits for a given metric namespace and version. +func (c *pluginCacheClient) CacheHits(ns string, version int) (uint64, error) { + key := fmt.Sprintf("%v:%v", ns, version) + if v, ok := metricCache.table[key]; ok { + return v.hits, nil + } + return 0, ErrCacheEntryDoesNotExist +} + +// CacheMisses returns the cache misses for a given metric namespace and version. +func (c *pluginCacheClient) CacheMisses(ns string, version int) (uint64, error) { + key := fmt.Sprintf("%v:%v", ns, version) + if v, ok := metricCache.table[key]; ok { + return v.misses, nil + } + return 0, ErrCacheEntryDoesNotExist +} + +// checkCache checks the cache for metric types. +// returns: +// - array of metrics that need to be collected +// - array of metrics that were returned from the cache +func checkCache(mts []core.Metric) ([]plugin.PluginMetricType, []core.Metric) { + var fromCache []core.Metric + var metricsToCollect []plugin.PluginMetricType + for _, mt := range mts { + if m := metricCache.get(core.JoinNamespace(mt.Namespace()), mt.Version()); m != nil { + switch metric := m.(type) { + case core.Metric: + fromCache = append(fromCache, metric) + case []core.Metric: + for _, met := range metric { + fromCache = append(fromCache, met) + } + default: + log.WithFields(log.Fields{ + "_module": "client", + "_block": "checkCache", + }).Error("unsupported type found in the cache") + } + } else { + mt := plugin.PluginMetricType{ + Namespace_: mt.Namespace(), + LastAdvertisedTime_: mt.LastAdvertisedTime(), + Version_: mt.Version(), + Tags_: mt.Tags(), + Labels_: mt.Labels(), + Config_: mt.Config(), + } + metricsToCollect = append(metricsToCollect, mt) + } + } + return metricsToCollect, fromCache +} + +// updateCache updates the cache with the given array of metrics. +func updateCache(mts []plugin.PluginMetricType) { + results := []core.Metric{} + dc := map[string][]core.Metric{} + for _, mt := range mts { + if mt.Labels == nil { + // cache the individual metric + metricCache.put(core.JoinNamespace(mt.Namespace_), mt.Version(), mt) + } else { + // collect the dynamic query results so we can cache + ns := make([]string, len(mt.Namespace())) + copy(ns, mt.Namespace()) + for _, label := range mt.Labels_ { + ns[label.Index] = "*" + } + if _, ok := dc[core.JoinNamespace(ns)]; !ok { + dc[core.JoinNamespace(ns)] = []core.Metric{} + } + dc[core.JoinNamespace(ns)] = append(dc[core.JoinNamespace(ns)], mt) + metricCache.put(core.JoinNamespace(ns), mt.Version(), dc[core.JoinNamespace(ns)]) + } + results = append(results, mt) + } +} diff --git a/control/plugin/client/httpjsonrpc.go b/control/plugin/client/httpjsonrpc.go index fc149e76a..ace4713d8 100644 --- a/control/plugin/client/httpjsonrpc.go +++ b/control/plugin/client/httpjsonrpc.go @@ -43,6 +43,7 @@ import ( var logger = log.WithField("_module", "client-httpjsonrpc") type httpJSONRPCClient struct { + PluginCacheClient url string id uint64 timeout time.Duration @@ -54,10 +55,11 @@ type httpJSONRPCClient struct { // NewCollectorHttpJSONRPCClient returns CollectorHttpJSONRPCClient func NewCollectorHttpJSONRPCClient(u string, timeout time.Duration, pub *rsa.PublicKey, secure bool) (PluginCollectorClient, error) { hjr := &httpJSONRPCClient{ - url: u, - timeout: timeout, - pluginType: plugin.CollectorPluginType, - encoder: encoding.NewJsonEncoder(), + PluginCacheClient: &pluginCacheClient{}, + url: u, + timeout: timeout, + pluginType: plugin.CollectorPluginType, + encoder: encoding.NewJsonEncoder(), } if secure { key, err := encrypter.GenerateKey() @@ -74,10 +76,11 @@ func NewCollectorHttpJSONRPCClient(u string, timeout time.Duration, pub *rsa.Pub func NewProcessorHttpJSONRPCClient(u string, timeout time.Duration, pub *rsa.PublicKey, secure bool) (PluginProcessorClient, error) { hjr := &httpJSONRPCClient{ - url: u, - timeout: timeout, - pluginType: plugin.ProcessorPluginType, - encoder: encoding.NewJsonEncoder(), + PluginCacheClient: &pluginCacheClient{}, + url: u, + timeout: timeout, + pluginType: plugin.ProcessorPluginType, + encoder: encoding.NewJsonEncoder(), } if secure { key, err := encrypter.GenerateKey() @@ -94,10 +97,11 @@ func NewProcessorHttpJSONRPCClient(u string, timeout time.Duration, pub *rsa.Pub func NewPublisherHttpJSONRPCClient(u string, timeout time.Duration, pub *rsa.PublicKey, secure bool) (PluginPublisherClient, error) { hjr := &httpJSONRPCClient{ - url: u, - timeout: timeout, - pluginType: plugin.PublisherPluginType, - encoder: encoding.NewJsonEncoder(), + PluginCacheClient: &pluginCacheClient{}, + url: u, + timeout: timeout, + pluginType: plugin.PublisherPluginType, + encoder: encoding.NewJsonEncoder(), } if secure { key, err := encrypter.GenerateKey() @@ -142,38 +146,22 @@ func (h *httpJSONRPCClient) Kill(reason string) error { // CollectMetrics returns collected metrics func (h *httpJSONRPCClient) CollectMetrics(mts []core.Metric) ([]core.Metric, error) { - // Here we create two slices from the requested metric collection. One which - // contains the metrics we retreived from the cache, and one from which we had - // to use the plugin. - - // This is managed by walking through the complete list and hitting the cache for each item. - // If the metric is found in the cache, we nil out that entry in the complete collection. - // Then, we walk through the collection once more and create a new slice of metrics which - // were not found in the cache. - var fromCache []core.Metric - for i, m := range mts { - var metric core.Metric - if metric = metricCache.get(core.JoinNamespace(m.Namespace())); metric != nil { - fromCache = append(fromCache, metric) - mts[i] = nil - } - } - var fromPlugin []plugin.PluginMetricType - for _, mt := range mts { - if mt != nil { - fromPlugin = append(fromPlugin, plugin.PluginMetricType{ - Namespace_: mt.Namespace(), - Config_: mt.Config(), - }) - } + + var results []core.Metric + if len(mts) == 0 { + return nil, errors.New("no metrics to collect") } - // We only need to send a request to the plugin if there are metrics which were not available in the cache. - if len(fromPlugin) > 0 { - args := &plugin.CollectMetricsArgs{PluginMetricTypes: fromPlugin} + + metricsToCollect, metricsFromCache := checkCache(mts) + + if len(metricsToCollect) > 0 { + args := &plugin.CollectMetricsArgs{PluginMetricTypes: metricsToCollect} + out, err := h.encoder.Encode(args) if err != nil { return nil, err } + res, err := h.call("Collector.CollectMetrics", []interface{}{out}) if err != nil { return nil, err @@ -186,17 +174,28 @@ func (h *httpJSONRPCClient) CollectMetrics(mts []core.Metric) ([]core.Metric, er }).Error(err) return nil, err } - var mtr plugin.CollectMetricsReply - err = h.encoder.Decode(res.Result, &mtr) + r := &plugin.CollectMetricsReply{} + err = h.encoder.Decode(res.Result, r) if err != nil { return nil, err } - for _, m := range mtr.PluginMetrics { - metricCache.put(core.JoinNamespace(m.Namespace()), m) - fromCache = append(fromCache, m) + + updateCache(r.PluginMetrics) + + results = make([]core.Metric, len(metricsFromCache)+len(r.PluginMetrics)) + idx := 0 + for _, m := range r.PluginMetrics { + results[idx] = m + idx++ + } + for _, m := range metricsFromCache { + results[idx] = m + idx++ } + return results, nil + } else { + return metricsFromCache, nil } - return fromCache, nil } // GetMetricTypes returns metric types that can be collected diff --git a/control/plugin/client/httpjsonrpc_test.go b/control/plugin/client/httpjsonrpc_test.go index 1753829d6..bea67405e 100644 --- a/control/plugin/client/httpjsonrpc_test.go +++ b/control/plugin/client/httpjsonrpc_test.go @@ -78,7 +78,7 @@ func (m *mockCollectorProxy) CollectMetrics(args []byte, reply *[]byte) error { } var mts []plugin.PluginMetricType for _, i := range dargs.PluginMetricTypes { - p := plugin.NewPluginMetricType(i.Namespace(), time.Now(), "", rand.Intn(100)) + p := plugin.NewPluginMetricType(i.Namespace(), time.Now(), "", nil, nil, rand.Intn(100)) p.Config_ = i.Config() mts = append(mts, *p) } @@ -302,7 +302,7 @@ func TestHTTPJSONRPC(t *testing.T) { }) Convey("Process metrics", func() { - pmt := plugin.NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", 1) + pmt := plugin.NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", nil, nil, 1) b, _ := json.Marshal([]plugin.PluginMetricType{*pmt}) contentType, content, err := p.Process(plugin.PulseJSONContentType, b, nil) So(contentType, ShouldResemble, plugin.PulseJSONContentType) @@ -342,7 +342,7 @@ func TestHTTPJSONRPC(t *testing.T) { }) Convey("Publish metrics", func() { - pmt := plugin.NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", 1) + pmt := plugin.NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", nil, nil, 1) b, _ := json.Marshal([]plugin.PluginMetricType{*pmt}) err := p.Publish(plugin.PulseJSONContentType, b, nil) So(err, ShouldBeNil) diff --git a/control/plugin/client/native.go b/control/plugin/client/native.go index d7d84147a..460ab17f7 100644 --- a/control/plugin/client/native.go +++ b/control/plugin/client/native.go @@ -46,6 +46,7 @@ type CallsRPC interface { // Native clients use golang net/rpc for communication to a native rpc server. type PluginNativeClient struct { + PluginCacheClient connection CallsRPC pluginType plugin.PluginType encoder encoding.Encoder @@ -128,44 +129,18 @@ func (p *PluginNativeClient) Process(contentType string, content []byte, config return r.ContentType, r.Content, nil } -func (p *PluginNativeClient) CollectMetrics(coreMetricTypes []core.Metric) ([]core.Metric, error) { +func (p *PluginNativeClient) CollectMetrics(mts []core.Metric) ([]core.Metric, error) { // Convert core.MetricType slice into plugin.PluginMetricType slice as we have // to send structs over RPC - if len(coreMetricTypes) == 0 { + var results []core.Metric + if len(mts) == 0 { return nil, errors.New("no metrics to collect") } - var fromCache []core.Metric - for i, mt := range coreMetricTypes { - var metric core.Metric - // Attempt to retreive the metric from the cache. If it is available, - // nil out that entry in the requested collection. - if metric = metricCache.get(core.JoinNamespace(mt.Namespace())); metric != nil { - fromCache = append(fromCache, metric) - coreMetricTypes[i] = nil - } - } - // If the size of fromCache is equal to the length of the requested metrics, - // then we retrieved all of the requested metrics and do not need to go - // through the motions of the rpc call. - if len(fromCache) != len(coreMetricTypes) { - var pluginMetricTypes []plugin.PluginMetricType - // Walk through the requested collection. If the entry is not nil, - // add it to the slice of metrics to collect over rpc. - for i, mt := range coreMetricTypes { - if mt != nil { - pluginMetricTypes = append(pluginMetricTypes, plugin.PluginMetricType{ - Namespace_: mt.Namespace(), - LastAdvertisedTime_: mt.LastAdvertisedTime(), - Version_: mt.Version(), - }) - if mt.Config() != nil { - pluginMetricTypes[i].Config_ = mt.Config() - } - } - } + metricsToCollect, metricsFromCache := checkCache(mts) - args := plugin.CollectMetricsArgs{PluginMetricTypes: pluginMetricTypes} + if len(metricsToCollect) > 0 { + args := plugin.CollectMetricsArgs{PluginMetricTypes: metricsToCollect} out, err := p.encoder.Encode(args) if err != nil { @@ -184,18 +159,23 @@ func (p *PluginNativeClient) CollectMetrics(coreMetricTypes []core.Metric) ([]co return nil, err } - var offset int - for i, mt := range fromCache { - coreMetricTypes[i] = mt - offset++ + updateCache(r.PluginMetrics) + + results = make([]core.Metric, len(metricsFromCache)+len(r.PluginMetrics)) + idx := 0 + for _, m := range r.PluginMetrics { + results[idx] = m + idx++ } - for i, mt := range r.PluginMetrics { - metricCache.put(core.JoinNamespace(mt.Namespace_), mt) - coreMetricTypes[i+offset] = mt + for _, m := range metricsFromCache { + results[idx] = m + idx++ } - return coreMetricTypes, err + return results, nil + } else { + return metricsFromCache, nil } - return fromCache, nil + } func (p *PluginNativeClient) GetMetricTypes(config plugin.PluginConfigType) ([]core.Metric, error) { @@ -260,8 +240,9 @@ func newNativeClient(address string, timeout time.Duration, t plugin.PluginType, } r := rpc.NewClient(conn) p := &PluginNativeClient{ - connection: r, - pluginType: t, + PluginCacheClient: &pluginCacheClient{}, + connection: r, + pluginType: t, } p.encoder = encoding.NewGobEncoder() diff --git a/control/plugin/collector_proxy_test.go b/control/plugin/collector_proxy_test.go index d436a7457..2b8d819b1 100644 --- a/control/plugin/collector_proxy_test.go +++ b/control/plugin/collector_proxy_test.go @@ -28,6 +28,7 @@ import ( "github.com/intelsdi-x/pulse/control/plugin/cpolicy" "github.com/intelsdi-x/pulse/control/plugin/encoding" + "github.com/intelsdi-x/pulse/core" . "github.com/smartystreets/goconvey/convey" ) @@ -36,16 +37,20 @@ type mockPlugin struct { } var mockPluginMetricType []PluginMetricType = []PluginMetricType{ - *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", 1), - *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", 2), + *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", nil, nil, 1), + *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", nil, nil, 2), } func (p *mockPlugin) GetMetricTypes(cfg PluginConfigType) ([]PluginMetricType, error) { return mockPluginMetricType, nil } -func (p *mockPlugin) CollectMetrics(mockPluginMetricType []PluginMetricType) ([]PluginMetricType, error) { - return mockPluginMetricType, nil +func (p *mockPlugin) CollectMetrics(mockPluginMetricTypes []PluginMetricType) ([]PluginMetricType, error) { + for i := range mockPluginMetricTypes { + mockPluginMetricTypes[i].Labels_ = []core.Label{core.Label{Index: 0, Name: "test"}} + mockPluginMetricTypes[i].Tags_ = map[string]string{"key": "value"} + } + return mockPluginMetricTypes, nil } func (p *mockPlugin) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) { @@ -126,6 +131,10 @@ func TestCollectorProxy(t *testing.T) { var mtr CollectMetricsReply err = c.Session.Decode(reply, &mtr) So(mtr.PluginMetrics[0].Namespace(), ShouldResemble, []string{"foo", "bar"}) + So(mtr.PluginMetrics[0].Labels(), ShouldNotBeNil) + So(mtr.PluginMetrics[0].Labels()[0].Name, ShouldEqual, "test") + So(mtr.PluginMetrics[0].Tags(), ShouldNotBeNil) + So(mtr.PluginMetrics[0].Tags()["key"], ShouldEqual, "value") Convey("Get error in Collect Metric ", func() { args := CollectMetricsArgs{ diff --git a/control/plugin/metric.go b/control/plugin/metric.go index a735bb9d9..be0f866a0 100644 --- a/control/plugin/metric.go +++ b/control/plugin/metric.go @@ -29,6 +29,7 @@ import ( log "github.com/Sirupsen/logrus" + "github.com/intelsdi-x/pulse/core" "github.com/intelsdi-x/pulse/core/cdata" ) @@ -86,6 +87,11 @@ func NewPluginConfigType() PluginConfigType { } } +type Label struct { + Index int `json:"index"` + Name string `json:"name"` +} + // Represents a metric type. Only used within plugins and across plugin calls. // Converted to core.MetricType before being used within modules. type PluginMetricType struct { @@ -104,6 +110,11 @@ type PluginMetricType struct { Data_ interface{} `json:"data"` + // labels are pulled from dynamic metrics to provide context for the metric + Labels_ []core.Label `json:"labels"` + + Tags_ map[string]string `json:"tags"` + // The source of the metric (host, IP, etc). Source_ string `json:"source"` @@ -112,9 +123,11 @@ type PluginMetricType struct { } // // PluginMetricType Constructor -func NewPluginMetricType(namespace []string, timestamp time.Time, source string, data interface{}) *PluginMetricType { +func NewPluginMetricType(namespace []string, timestamp time.Time, source string, tags map[string]string, labels []core.Label, data interface{}) *PluginMetricType { return &PluginMetricType{ Namespace_: namespace, + Tags_: tags, + Labels_: labels, Data_: data, Timestamp_: timestamp, Source_: source, @@ -141,6 +154,16 @@ func (p PluginMetricType) Config() *cdata.ConfigDataNode { return p.Config_ } +// Tags returns the map of tags for this metric +func (p PluginMetricType) Tags() map[string]string { + return p.Tags_ +} + +// Labels returns the array of labels for this metric +func (p PluginMetricType) Labels() []core.Label { + return p.Labels_ +} + // returns the timestamp of when the metric was collected func (p PluginMetricType) Timestamp() time.Time { return p.Timestamp_ diff --git a/control/plugin/metric_test.go b/control/plugin/metric_test.go index 860d47f9b..c363300c8 100644 --- a/control/plugin/metric_test.go +++ b/control/plugin/metric_test.go @@ -32,8 +32,8 @@ import ( func TestMetric(t *testing.T) { Convey("error on invalid pulse content type", t, func() { m := []PluginMetricType{ - *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", 1), - *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", 2), + *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", nil, nil, 1), + *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", nil, nil, 2), } a, c, e := MarshalPluginMetricTypes("foo", m) m[0].Version_ = 1 @@ -61,8 +61,8 @@ func TestMetric(t *testing.T) { Convey("marshall using pulse.* default to pulse.gob", t, func() { m := []PluginMetricType{ - *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", 1), - *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", "2"), + *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", nil, nil, 1), + *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", nil, nil, "2"), } a, c, e := MarshalPluginMetricTypes("pulse.*", m) So(e, ShouldBeNil) @@ -83,8 +83,8 @@ func TestMetric(t *testing.T) { Convey("marshall using pulse.gob", t, func() { m := []PluginMetricType{ - *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", 1), - *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", "2"), + *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", nil, nil, 1), + *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", nil, nil, "2"), } a, c, e := MarshalPluginMetricTypes("pulse.gob", m) So(e, ShouldBeNil) @@ -111,8 +111,8 @@ func TestMetric(t *testing.T) { Convey("marshall using pulse.json", t, func() { m := []PluginMetricType{ - *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", 1), - *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", "2"), + *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", nil, nil, 1), + *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", nil, nil, "2"), } a, c, e := MarshalPluginMetricTypes("pulse.json", m) So(e, ShouldBeNil) @@ -139,8 +139,8 @@ func TestMetric(t *testing.T) { Convey("error on unmarshall using bad content type", t, func() { m := []PluginMetricType{ - *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", 1), - *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", "2"), + *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", nil, nil, 1), + *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", nil, nil, "2"), } a, c, e := MarshalPluginMetricTypes("pulse.json", m) So(e, ShouldBeNil) @@ -156,8 +156,8 @@ func TestMetric(t *testing.T) { Convey("swap from pulse.gob to pulse.json", t, func() { m := []PluginMetricType{ - *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", 1), - *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", "2"), + *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", nil, nil, 1), + *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", nil, nil, "2"), } a, c, e := MarshalPluginMetricTypes("pulse.gob", m) So(e, ShouldBeNil) @@ -180,8 +180,8 @@ func TestMetric(t *testing.T) { Convey("swap from pulse.json to pulse.*", t, func() { m := []PluginMetricType{ - *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", 1), - *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", "2"), + *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", nil, nil, 1), + *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", nil, nil, "2"), } a, c, e := MarshalPluginMetricTypes("pulse.json", m) So(e, ShouldBeNil) @@ -204,8 +204,8 @@ func TestMetric(t *testing.T) { Convey("swap from pulse.json to pulse.gob", t, func() { m := []PluginMetricType{ - *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", 1), - *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", "2"), + *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", nil, nil, 1), + *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", nil, nil, "2"), } a, c, e := MarshalPluginMetricTypes("pulse.json", m) So(e, ShouldBeNil) @@ -228,8 +228,8 @@ func TestMetric(t *testing.T) { Convey("error on bad content type to swap", t, func() { m := []PluginMetricType{ - *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", 1), - *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", "2"), + *NewPluginMetricType([]string{"foo", "bar"}, time.Now(), "", nil, nil, 1), + *NewPluginMetricType([]string{"foo", "baz"}, time.Now(), "", nil, nil, "2"), } a, c, e := MarshalPluginMetricTypes("pulse.json", m) So(e, ShouldBeNil) diff --git a/control/plugin_manager.go b/control/plugin_manager.go index a57ea9b15..3b0e33633 100644 --- a/control/plugin_manager.go +++ b/control/plugin_manager.go @@ -354,6 +354,8 @@ func (p *pluginManager) LoadPlugin(path string, emitter gomit.Emitter) (*loadedP lastAdvertisedTime: nmt.LastAdvertisedTime(), config: nmt.Config(), data: nmt.Data(), + tags: nmt.Tags(), + labels: nmt.Labels(), } } // We quit and throw an error on bad metric versions (<1) diff --git a/control/runner_test.go b/control/runner_test.go index 4b73310ca..3c4a1985a 100644 --- a/control/runner_test.go +++ b/control/runner_test.go @@ -138,6 +138,22 @@ func (mucc *MockHealthyPluginCollectorClient) SetKey() error { return nil } +func (c *MockHealthyPluginCollectorClient) AllCacheHits() uint64 { + return 0 +} + +func (c *MockHealthyPluginCollectorClient) AllCacheMisses() uint64 { + return 0 +} + +func (c *MockHealthyPluginCollectorClient) CacheHits(key string, version int) (uint64, error) { + return 0, nil +} + +func (c *MockHealthyPluginCollectorClient) CacheMisses(key string, version int) (uint64, error) { + return 0, nil +} + type MockUnhealthyPluginCollectorClient struct{} func (mucc *MockUnhealthyPluginCollectorClient) Ping() error { @@ -156,6 +172,22 @@ func (mucc *MockUnhealthyPluginCollectorClient) SetKey() error { return nil } +func (c *MockUnhealthyPluginCollectorClient) AllCacheHits() uint64 { + return 0 +} + +func (c *MockUnhealthyPluginCollectorClient) AllCacheMisses() uint64 { + return 0 +} + +func (c *MockUnhealthyPluginCollectorClient) CacheHits(key string, version int) (uint64, error) { + return 0, nil +} + +func (c *MockUnhealthyPluginCollectorClient) CacheMisses(key string, version int) (uint64, error) { + return 0, nil +} + type MockEmitter struct{} func (me *MockEmitter) Emit(gomit.EventBody) (int, error) { return 0, nil } diff --git a/core/metric.go b/core/metric.go index fe18e54db..ecb089fdc 100644 --- a/core/metric.go +++ b/core/metric.go @@ -27,6 +27,11 @@ import ( "github.com/intelsdi-x/pulse/core/cdata" ) +type Label struct { + Index int `json:"index"` + Name string `json:"name"` +} + // Metric represents a Pulse metric collected or to be collected type Metric interface { RequestedMetric @@ -34,6 +39,8 @@ type Metric interface { LastAdvertisedTime() time.Time Data() interface{} Source() string + Labels() []Label + Tags() map[string]string Timestamp() time.Time } diff --git a/examples/tasks/mock-file.json b/examples/tasks/mock-file.json index 79fcd504e..d2d16d5d7 100644 --- a/examples/tasks/mock-file.json +++ b/examples/tasks/mock-file.json @@ -8,7 +8,8 @@ "collect": { "metrics": { "/intel/mock/foo": {}, - "/intel/mock/bar": {} + "/intel/mock/bar": {}, + "/intel/mock/*/baz": {} }, "config": { "/intel/mock": { @@ -18,13 +19,11 @@ }, "process": [ { - "plugin_name": "passthru", - "plugin_version": 1, + "plugin_name": "passthru", "process": null, "publish": [ { - "plugin_name": "file", - "plugin_version": 1, + "plugin_name": "file", "config": { "file": "/tmp/published" } diff --git a/mgmt/rest/client/client_func_test.go b/mgmt/rest/client/client_func_test.go index 8f2233e64..911d9cc75 100644 --- a/mgmt/rest/client/client_func_test.go +++ b/mgmt/rest/client/client_func_test.go @@ -209,15 +209,19 @@ func TestPulseClient(t *testing.T) { Convey("MetricCatalog", func() { m := c.GetMetricCatalog() So(m.Err, ShouldBeNil) - So(m.Len(), ShouldEqual, 4) - So(m.Catalog[0].Namespace, ShouldEqual, "/intel/mock/bar") + So(m.Len(), ShouldEqual, 6) + So(m.Catalog[0].Namespace, ShouldEqual, "/intel/mock/*/baz") So(m.Catalog[0].Version, ShouldEqual, 1) - So(m.Catalog[1].Namespace, ShouldEqual, "/intel/mock/bar") + So(m.Catalog[1].Namespace, ShouldEqual, "/intel/mock/*/baz") So(m.Catalog[1].Version, ShouldEqual, 2) - So(m.Catalog[2].Namespace, ShouldEqual, "/intel/mock/foo") + So(m.Catalog[2].Namespace, ShouldEqual, "/intel/mock/bar") So(m.Catalog[2].Version, ShouldEqual, 1) - So(m.Catalog[3].Namespace, ShouldEqual, "/intel/mock/foo") + So(m.Catalog[3].Namespace, ShouldEqual, "/intel/mock/bar") So(m.Catalog[3].Version, ShouldEqual, 2) + So(m.Catalog[4].Namespace, ShouldEqual, "/intel/mock/foo") + So(m.Catalog[4].Version, ShouldEqual, 1) + So(m.Catalog[5].Namespace, ShouldEqual, "/intel/mock/foo") + So(m.Catalog[5].Version, ShouldEqual, 2) }) Convey("FetchMetrics", func() { Convey("leaf metric all versions", func() { @@ -234,10 +238,12 @@ func TestPulseClient(t *testing.T) { }) Convey("version 2 non-leaf metrics", func() { m := c.FetchMetrics("/intel/mock/*", 2) - So(m.Catalog[0].Namespace, ShouldEqual, "/intel/mock/bar") + So(m.Catalog[0].Namespace, ShouldEqual, "/intel/mock/*/baz") So(m.Catalog[0].Version, ShouldEqual, 2) - So(m.Catalog[1].Namespace, ShouldEqual, "/intel/mock/foo") + So(m.Catalog[1].Namespace, ShouldEqual, "/intel/mock/bar") So(m.Catalog[1].Version, ShouldEqual, 2) + So(m.Catalog[2].Namespace, ShouldEqual, "/intel/mock/foo") + So(m.Catalog[2].Version, ShouldEqual, 2) }) }) }) diff --git a/plugin/collector/pulse-collector-mock1/mock/mock.go b/plugin/collector/pulse-collector-mock1/mock/mock.go index 879f47b3d..c5ec9dcd8 100644 --- a/plugin/collector/pulse-collector-mock1/mock/mock.go +++ b/plugin/collector/pulse-collector-mock1/mock/mock.go @@ -21,11 +21,13 @@ package mock import ( "fmt" + "math/rand" "os" "time" "github.com/intelsdi-x/pulse/control/plugin" "github.com/intelsdi-x/pulse/control/plugin/cpolicy" + "github.com/intelsdi-x/pulse/core" "github.com/intelsdi-x/pulse/core/ctypes" ) @@ -47,20 +49,38 @@ type Mock struct { // CollectMetrics collects metrics for testing func (f *Mock) CollectMetrics(mts []plugin.PluginMetricType) ([]plugin.PluginMetricType, error) { - metrics := make([]plugin.PluginMetricType, len(mts)) + metrics := []plugin.PluginMetricType{} + rand.Seed(time.Now().UTC().UnixNano()) + hostname, _ := os.Hostname() for i, p := range mts { - var data string - if cv, ok := p.Config().Table()["test"]; ok { - data = fmt.Sprintf("The mock collected data! config data: user=%s password=%s test=%v", p.Config().Table()["user"], p.Config().Table()["password"], cv.(ctypes.ConfigValueBool).Value) + if mts[i].Namespace()[2] == "*" { + for j := 0; j < 10; j++ { + v := fmt.Sprintf("host%d", j) + data := randInt(65, 90) + mt := plugin.PluginMetricType{ + Data_: data, + Namespace_: []string{"intel", "mock", v, "baz"}, + Source_: hostname, + Timestamp_: time.Now(), + Labels_: mts[i].Labels(), + Version_: mts[i].Version(), + } + metrics = append(metrics, mt) + } } else { - data = fmt.Sprintf("The mock collected data! config data: user=%s password=%s", p.Config().Table()["user"], p.Config().Table()["password"]) + var data string + if cv, ok := p.Config().Table()["test"]; ok { + data = fmt.Sprintf("The mock collected data! config data: user=%s password=%s test=%v", p.Config().Table()["user"], p.Config().Table()["password"], cv.(ctypes.ConfigValueBool).Value) + } else { + data = fmt.Sprintf("The mock collected data! config data: user=%s password=%s", p.Config().Table()["user"], p.Config().Table()["password"]) + } + mt := plugin.PluginMetricType{ + Data_: data, + Timestamp_: time.Now(), + Source_: hostname, + } + metrics = append(metrics, mt) } - metrics[i] = plugin.PluginMetricType{ - Namespace_: p.Namespace(), - Data_: data, - Timestamp_: time.Now(), - } - metrics[i].Source_, _ = os.Hostname() } return metrics, nil } @@ -76,6 +96,10 @@ func (f *Mock) GetMetricTypes(cfg plugin.PluginConfigType) ([]plugin.PluginMetri } mts = append(mts, plugin.PluginMetricType{Namespace_: []string{"intel", "mock", "foo"}}) mts = append(mts, plugin.PluginMetricType{Namespace_: []string{"intel", "mock", "bar"}}) + mts = append(mts, plugin.PluginMetricType{ + Namespace_: []string{"intel", "mock", "*", "baz"}, + Labels_: []core.Label{core.Label{Index: 2, Name: "host"}}, + }) return mts, nil } @@ -95,3 +119,8 @@ func (f *Mock) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) { func Meta() *plugin.PluginMeta { return plugin.NewPluginMeta(Name, Version, Type, []string{plugin.PulseGOBContentType}, []string{plugin.PulseGOBContentType}, plugin.Unsecure(true)) } + +//Random number generator +func randInt(min int, max int) int { + return min + rand.Intn(max-min) +} diff --git a/plugin/collector/pulse-collector-mock2/mock/mock.go b/plugin/collector/pulse-collector-mock2/mock/mock.go index 8f461e5fa..00b0d71a6 100644 --- a/plugin/collector/pulse-collector-mock2/mock/mock.go +++ b/plugin/collector/pulse-collector-mock2/mock/mock.go @@ -28,6 +28,7 @@ import ( "github.com/intelsdi-x/pulse/control/plugin" "github.com/intelsdi-x/pulse/control/plugin/cpolicy" + "github.com/intelsdi-x/pulse/core" ) const ( @@ -43,24 +44,38 @@ const ( type Mock struct { } -//Random number generator -func randInt(min int, max int) int { - return min + rand.Intn(max-min) -} - // CollectMetrics collects metrics for testing func (f *Mock) CollectMetrics(mts []plugin.PluginMetricType) ([]plugin.PluginMetricType, error) { for _, p := range mts { - log.Println("collecting", p) + log.Printf("collecting %+v", p) } rand.Seed(time.Now().UTC().UnixNano()) + metrics := []plugin.PluginMetricType{} for i := range mts { - data := randInt(65, 90) - mts[i].Data_ = data - mts[i].Source_, _ = os.Hostname() - mts[i].Timestamp_ = time.Now() + if mts[i].Namespace()[2] == "*" { + hostname, _ := os.Hostname() + for j := 0; j < 10; j++ { + v := fmt.Sprintf("host%d", j) + data := randInt(65, 90) + mt := plugin.PluginMetricType{ + Data_: data, + Namespace_: []string{"intel", "mock", v, "baz"}, + Source_: hostname, + Timestamp_: time.Now(), + Labels_: mts[i].Labels(), + Version_: mts[i].Version(), + } + metrics = append(metrics, mt) + } + } else { + data := randInt(65, 90) + mts[i].Data_ = data + mts[i].Source_, _ = os.Hostname() + mts[i].Timestamp_ = time.Now() + metrics = append(metrics, mts[i]) + } } - return mts, nil + return metrics, nil } //GetMetricTypes returns metric types for testing @@ -74,6 +89,10 @@ func (f *Mock) GetMetricTypes(cfg plugin.PluginConfigType) ([]plugin.PluginMetri } mts = append(mts, plugin.PluginMetricType{Namespace_: []string{"intel", "mock", "foo"}}) mts = append(mts, plugin.PluginMetricType{Namespace_: []string{"intel", "mock", "bar"}}) + mts = append(mts, plugin.PluginMetricType{ + Namespace_: []string{"intel", "mock", "*", "baz"}, + Labels_: []core.Label{core.Label{Index: 2, Name: "host"}}, + }) return mts, nil } @@ -93,3 +112,8 @@ func (f *Mock) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) { func Meta() *plugin.PluginMeta { return plugin.NewPluginMeta(Name, Version, Type, []string{plugin.PulseGOBContentType}, []string{plugin.PulseGOBContentType}) } + +//Random number generator +func randInt(min int, max int) int { + return min + rand.Intn(max-min) +} diff --git a/plugin/publisher/pulse-publisher-file/file/file_test.go b/plugin/publisher/pulse-publisher-file/file/file_test.go index 17f620d52..6482261e8 100644 --- a/plugin/publisher/pulse-publisher-file/file/file_test.go +++ b/plugin/publisher/pulse-publisher-file/file/file_test.go @@ -36,7 +36,7 @@ import ( func TestFilePublish(t *testing.T) { var buf bytes.Buffer metrics := []plugin.PluginMetricType{ - *plugin.NewPluginMetricType([]string{"foo"}, time.Now(), "", 99), + *plugin.NewPluginMetricType([]string{"foo"}, time.Now(), "", nil, nil, 99), } config := make(map[string]ctypes.ConfigValue) enc := gob.NewEncoder(&buf) diff --git a/scheduler/job.go b/scheduler/job.go index f21acbd85..661e43998 100644 --- a/scheduler/job.go +++ b/scheduler/job.go @@ -132,6 +132,8 @@ func (m *metric) Version() int { } func (m *metric) Data() interface{} { return nil } +func (m *metric) Tags() map[string]string { return nil } +func (m *metric) Labels() []core.Label { return nil } func (m *metric) LastAdvertisedTime() time.Time { return time.Unix(0, 0) } func (m *metric) Source() string { return "" } func (m *metric) Timestamp() time.Time { return time.Unix(0, 0) } @@ -224,7 +226,12 @@ func (p *processJob) Run() { case plugin.PulseGOBContentType: metrics := make([]plugin.PluginMetricType, len(p.parentJob.(*collectorJob).metrics)) for i, m := range p.parentJob.(*collectorJob).metrics { - metrics[i] = *plugin.NewPluginMetricType(m.Namespace(), m.Timestamp(), m.Source(), m.Data()) + switch mt := m.(type) { + case plugin.PluginMetricType: + metrics[i] = mt + default: + panic("unsupported type") + } } enc.Encode(metrics) _, content, errs := p.processor.ProcessMetrics(p.contentType, buf.Bytes(), p.pluginName, p.pluginVersion, p.config) @@ -314,7 +321,12 @@ func (p *publisherJob) Run() { case plugin.PulseGOBContentType: metrics := make([]plugin.PluginMetricType, len(p.parentJob.(*collectorJob).metrics)) for i, m := range p.parentJob.(*collectorJob).metrics { - metrics[i] = *plugin.NewPluginMetricType(m.Namespace(), m.Timestamp(), m.Source(), m.Data()) + switch mt := m.(type) { + case plugin.PluginMetricType: + metrics[i] = mt + default: + panic("unsupported type") + } } enc.Encode(metrics) errs := p.publisher.PublishMetrics(p.contentType, buf.Bytes(), p.pluginName, p.pluginVersion, p.config)