diff --git a/control/plugin/client/cache.go b/control/plugin/client/cache.go new file mode 100644 index 000000000..1ab532746 --- /dev/null +++ b/control/plugin/client/cache.go @@ -0,0 +1,65 @@ +package client + +import ( + "fmt" + "time" + + log "github.com/Sirupsen/logrus" + "github.com/intelsdi-x/pulse/core" +) + +var ( + cacheExpiration = time.Duration(500 * time.Millisecond) + metricCache = cache{ + table: make(map[string]*cachecell), + } + cacheLog = log.WithField("_module", "client-cache") +) + +type cachecell struct { + time time.Time + metric core.Metric + hits uint64 + misses uint64 +} + +type cache struct { + table map[string]*cachecell +} + +func (c *cache) get(key string) core.Metric { + var ( + cell *cachecell + ok bool + ) + if cell, ok = c.table[key]; ok && time.Since(cell.time) < cacheExpiration { + cell.hits++ + cacheLog.WithFields(log.Fields{ + "namespace": key, + "hits": cell.hits, + "misses": cell.misses, + }).Debug(fmt.Sprintf("cache hit [%s]", key)) + return cell.metric + } + if ok { + cell.misses++ + cacheLog.WithFields(log.Fields{ + "namespace": key, + "hits": cell.hits, + "misses": cell.misses, + }).Debug(fmt.Sprintf("cache miss [%s]", key)) + } + return nil +} + +func (c *cache) put(key string, metric core.Metric) { + if _, ok := c.table[key]; ok { + c.table[key].time = time.Now() + c.table[key].metric = metric + } else { + c.table[key] = &cachecell{ + time: time.Now(), + metric: metric, + } + } +} diff --git a/control/plugin/client/cache_test.go b/control/plugin/client/cache_test.go new file mode 100644 index 000000000..16d6025cb --- /dev/null +++ b/control/plugin/client/cache_test.go @@ -0,0 +1,70 @@ +package client + +import ( + "testing" + "time" + + . "github.com/smartystreets/goconvey/convey" + + "github.com/intelsdi-x/pulse/control/plugin" +) + +func TestCache(t *testing.T) { + Convey("puts and gets a metric", t, func() { + mc := &cache{ + table: make(map[string]*cachecell), + } + foo := &plugin.PluginMetricType{ + Namespace_: []string{"foo", "bar"}, + } + mc.put("/foo/bar", foo) + ret := mc.get("/foo/bar") + So(ret, ShouldNotBeNil) + So(ret, ShouldEqual, foo) + }) + Convey("returns nil if the cache cell does not exist", t, func() { + mc := &cache{ + table: make(map[string]*cachecell), + } + ret := mc.get("/foo/bar") + So(ret, ShouldBeNil) + }) + Convey("returns nil if the cache cell has expired", t, func() { + mc := &cache{ + table: make(map[string]*cachecell), + } + foo := &plugin.PluginMetricType{ + Namespace_: []string{"foo", "bar"}, + } + mc.put("/foo/bar", foo) + time.Sleep(501 * time.Millisecond) + ret := mc.get("/foo/bar") + So(ret, ShouldBeNil) + }) + Convey("hit and miss counts", t, func() { + Convey("ticks hit count when a cache entry is hit", func() { + mc := &cache{ + table: make(map[string]*cachecell), + } + foo := &plugin.PluginMetricType{ + Namespace_: []string{"foo", "bar"}, + } + mc.put("/foo/bar", foo) + mc.get("/foo/bar") + So(mc.table["/foo/bar"].hits, ShouldEqual, 1) + }) + Convey("ticks miss count when a cache entry is missed", func() { + mc := &cache{ + table: make(map[string]*cachecell), + } + foo := &plugin.PluginMetricType{ + Namespace_: []string{"foo", "bar"}, + } + mc.put("/foo/bar", foo) + time.Sleep(501 * time.Millisecond) + mc.get("/foo/bar") + So(mc.table["/foo/bar"].misses, ShouldEqual, 1) + }) + }) + +} diff --git a/control/plugin/client/httpjsonrpc.go b/control/plugin/client/httpjsonrpc.go index d64d4226b..248b07e1c 100644 --- a/control/plugin/client/httpjsonrpc.go +++ b/control/plugin/client/httpjsonrpc.go @@ -63,65 +63,91 @@ func (h *httpJSONRPCClient) Kill(reason string) error { } // CollectMetrics returns collected metrics -func (h *httpJSONRPCClient) CollectMetrics(mts_ []core.Metric) ([]core.Metric, error) { - mts := make([]core.Metric, len(mts_)) - for i, m := range mts_ { - mts[i] = &plugin.PluginMetricType{ - Namespace_: m.Namespace(), - Config_: m.Config(), +func (h *httpJSONRPCClient) CollectMetrics(mts []core.Metric) ([]core.Metric, error) { + // Here we create two slices from the requested metric collection. One which + // contains the metrics we retreived from the cache, and one from which we had + // to use the plugin. + + // This is managed by walking through the complete list and hitting the cache for each item. + // If the metric is found in the cache, we nil out that entry in the complete collection. + // Then, we walk through the collection once more and create a new slice of metrics which + // were not found in the cache. + var fromCache []core.Metric + for i, m := range mts { + var metric core.Metric + if metric = metricCache.get(core.JoinNamespace(m.Namespace())); metric != nil { + fromCache = append(fromCache, metric) + mts[i] = nil } } - res, err := h.call("Collector.CollectMetrics", []interface{}{mts}) - if err != nil { - return nil, err - } - var metrics []core.Metric - if _, ok := res["result"]; !ok { - err := errors.New("Invalid response: expected the response map to contain the key 'result'.") - logger.WithFields(log.Fields{ - "_block": "CollectMetrics", - "jsonrpc response": fmt.Sprintf("%+v", res), - }).Error(err) - return nil, err + var fromPlugin []core.Metric + for _, mt := range mts { + if mt != nil { + fromPlugin = append(fromPlugin, &plugin.PluginMetricType{ + Namespace_: mt.Namespace(), + Config_: mt.Config(), + }) + } } - if resmap, ok := res["result"].(map[string]interface{}); ok { - if _, ok := resmap["PluginMetrics"]; !ok { - err := errors.New("Invalid response: expected the result value to be a map that contains key 'PluginMetrics'.") + // We only need to send a request to the plugin if there are metrics which were not available in the cache. + if len(fromPlugin) > 0 { + res, err := h.call("Collector.CollectMetrics", []interface{}{fromPlugin}) + if err != nil { + return nil, err + } + var metrics []core.Metric + if _, ok := res["result"]; !ok { + err := errors.New("Invalid response: expected the response map to contain the key 'result'.") logger.WithFields(log.Fields{ "_block": "CollectMetrics", "jsonrpc response": fmt.Sprintf("%+v", res), }).Error(err) return nil, err } - if pms, ok := resmap["PluginMetrics"].([]interface{}); ok { - for _, m := range pms { - j, err := json.Marshal(m) - if err != nil { - return nil, err - } - pmt := &plugin.PluginMetricType{} - if err := json.Unmarshal(j, &pmt); err != nil { - return nil, err + if resmap, ok := res["result"].(map[string]interface{}); ok { + if _, ok := resmap["PluginMetrics"]; !ok { + err := errors.New("Invalid response: expected the result value to be a map that contains key 'PluginMetrics'.") + logger.WithFields(log.Fields{ + "_block": "CollectMetrics", + "jsonrpc response": fmt.Sprintf("%+v", res), + }).Error(err) + return nil, err + } + if pms, ok := resmap["PluginMetrics"].([]interface{}); ok { + for _, m := range pms { + j, err := json.Marshal(m) + if err != nil { + return nil, err + } + pmt := &plugin.PluginMetricType{} + if err := json.Unmarshal(j, &pmt); err != nil { + return nil, err + } + metrics = append(metrics, pmt) } - metrics = append(metrics, pmt) + } else { + err := errors.New("Invalid response: expected 'PluginMetrics' to contain a list of metrics") + logger.WithFields(log.Fields{ + "_block": "CollectMetrics", + "jsonrpc response": fmt.Sprintf("%+v", res), + }).Error(err) + return nil, err } } else { - err := errors.New("Invalid response: expected 'PluginMetrics' to contain a list of metrics") + err := errors.New("Invalid response: expected 'result' to be a map") logger.WithFields(log.Fields{ "_block": "CollectMetrics", "jsonrpc response": fmt.Sprintf("%+v", res), }).Error(err) return nil, err } - } else { - err := errors.New("Invalid response: expected 'result' to be a map") - logger.WithFields(log.Fields{ - "_block": "CollectMetrics", - "jsonrpc response": fmt.Sprintf("%+v", res), - }).Error(err) - return nil, err + for _, m := range metrics { + metricCache.put(core.JoinNamespace(m.Namespace()), m) + } + metrics = append(metrics, fromCache...) + return metrics, err } - return metrics, err + return fromCache, nil } // GetMetricTypes returns metric types that can be collected diff --git a/control/plugin/client/httpjsonrpc_test.go b/control/plugin/client/httpjsonrpc_test.go index d6b9d5400..117cff90c 100644 --- a/control/plugin/client/httpjsonrpc_test.go +++ b/control/plugin/client/httpjsonrpc_test.go @@ -218,6 +218,7 @@ func TestHTTPJSONRPC(t *testing.T) { cdn.AddItem("someInt", ctypes.ConfigValueInt{Value: 1}) cdn.AddItem("password", ctypes.ConfigValueStr{Value: "secure"}) + time.Sleep(500 * time.Millisecond) mts, err := c.CollectMetrics([]core.Metric{ &plugin.PluginMetricType{ Namespace_: []string{"foo", "bar"}, @@ -251,6 +252,7 @@ func TestHTTPJSONRPC(t *testing.T) { cdn := cdata.NewNode() cdn.AddItem("someInt", ctypes.ConfigValueInt{Value: 1}) + time.Sleep(500 * time.Millisecond) mts, err := c.CollectMetrics([]core.Metric{ &plugin.PluginMetricType{ Namespace_: []string{"foo", "bar"}, @@ -264,14 +266,13 @@ func TestHTTPJSONRPC(t *testing.T) { So(mts[0].Config().Table(), ShouldNotBeEmpty) So(mts[0].Config().Table()["someInt"].Type(), ShouldResemble, "integer") - Convey("Get and proces the ConfigPolicyTree", func() { + Convey("Get and process the ConfigPolicyTree", func() { cpt, err := c.GetConfigPolicyTree() So(err, ShouldBeNil) So(cpt, ShouldNotBeNil) - So(cpt.Get([]string{"foo", "bar"}), ShouldNotBeNil) node := cpt.Get([]string{"foo", "bar"}) - So(err, ShouldBeNil) So(node, ShouldNotBeNil) + So(err, ShouldBeNil) cpn, cperrs := node.Process(mts[0].Config().Table()) So(cpn, ShouldBeNil) So(cperrs.Errors(), ShouldNotBeEmpty) diff --git a/control/plugin/client/native.go b/control/plugin/client/native.go index c0d6822b7..fe67f6f3a 100644 --- a/control/plugin/client/native.go +++ b/control/plugin/client/native.go @@ -2,6 +2,7 @@ package client import ( "encoding/gob" + "errors" "fmt" "net" "net/rpc" @@ -72,30 +73,57 @@ func (p *PluginNativeClient) Process(contentType string, content []byte, config func (p *PluginNativeClient) CollectMetrics(coreMetricTypes []core.Metric) ([]core.Metric, error) { // Convert core.MetricType slice into plugin.PluginMetricType slice as we have // to send structs over RPC - pluginMetricTypes := make([]plugin.PluginMetricType, len(coreMetricTypes)) - for i, _ := range coreMetricTypes { - pluginMetricTypes[i] = plugin.PluginMetricType{ - Namespace_: coreMetricTypes[i].Namespace(), - LastAdvertisedTime_: coreMetricTypes[i].LastAdvertisedTime(), - Version_: coreMetricTypes[i].Version(), - } - if coreMetricTypes[i].Config() != nil { - ///pluginMetricTypes[i].Config_ = coreMetricTypes[i].Config().Table() - pluginMetricTypes[i].Config_ = coreMetricTypes[i].Config() + if len(coreMetricTypes) == 0 { + return nil, errors.New("no metrics to collect") + } + + var fromCache []core.Metric + for i, mt := range coreMetricTypes { + var metric core.Metric + // Attempt to retreive the metric from the cache. If it is available, + // nil out that entry in the requested collection. + if metric = metricCache.get(core.JoinNamespace(mt.Namespace())); metric != nil { + fromCache = append(fromCache, metric) + coreMetricTypes[i] = nil } } + // If the size of fromCache is equal to the length of the requested metrics, + // then we retrieved all of the requested metrics and do not need to go the + // motions of the rpc call. + if len(fromCache) != len(coreMetricTypes) { + var pluginMetricTypes []plugin.PluginMetricType + // Walk through the requested collection. If the entry is not nil, + // add it to the slice of metrics to collect over rpc. + for i, mt := range coreMetricTypes { + if mt != nil { + pluginMetricTypes = append(pluginMetricTypes, plugin.PluginMetricType{ + Namespace_: mt.Namespace(), + LastAdvertisedTime_: mt.LastAdvertisedTime(), + Version_: mt.Version(), + }) + if mt.Config() != nil { + pluginMetricTypes[i].Config_ = mt.Config() + } + } + } - // TODO return err if mts is empty - args := plugin.CollectMetricsArgs{PluginMetricTypes: pluginMetricTypes} - reply := plugin.CollectMetricsReply{} + args := plugin.CollectMetricsArgs{PluginMetricTypes: pluginMetricTypes} + reply := plugin.CollectMetricsReply{} - err := p.connection.Call("Collector.CollectMetrics", args, &reply) + err := p.connection.Call("Collector.CollectMetrics", args, &reply) - retMetrics := make([]core.Metric, len(reply.PluginMetrics)) - for i, _ := range reply.PluginMetrics { - retMetrics[i] = reply.PluginMetrics[i] + var offset int + for i, mt := range fromCache { + coreMetricTypes[i] = mt + offset++ + } + for i, mt := range reply.PluginMetrics { + metricCache.put(core.JoinNamespace(mt.Namespace_), mt) + coreMetricTypes[i+offset] = mt + } + return coreMetricTypes, err } - return retMetrics, err + return fromCache, nil } func (p *PluginNativeClient) GetMetricTypes() ([]core.Metric, error) { diff --git a/plugin/collector/pulse-collector-dummy2/dummy/dummy.go b/plugin/collector/pulse-collector-dummy2/dummy/dummy.go index d4a001f45..99333517a 100644 --- a/plugin/collector/pulse-collector-dummy2/dummy/dummy.go +++ b/plugin/collector/pulse-collector-dummy2/dummy/dummy.go @@ -3,6 +3,7 @@ package dummy import ( "log" "math/rand" + "os" "time" "github.com/intelsdi-x/pulse/control/plugin" @@ -36,6 +37,8 @@ func (f *Dummy) CollectMetrics(mts []plugin.PluginMetricType) ([]plugin.PluginMe for i, _ := range mts { data := randInt(65, 90) mts[i].Data_ = data + mts[i].Source_, _ = os.Hostname() + mts[i].Timestamp_ = time.Now() } return mts, nil }