Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
caching support for collector plugins
Browse files Browse the repository at this point in the history
This commit adds support for caching collections for a default amount of
time (500ms).
  • Loading branch information
pittma committed Sep 3, 2015
1 parent 1d138b9 commit c8371e1
Show file tree
Hide file tree
Showing 6 changed files with 254 additions and 61 deletions.
65 changes: 65 additions & 0 deletions control/plugin/client/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package client

import (
"fmt"
"time"

log "github.com/Sirupsen/logrus"
"github.com/intelsdi-x/pulse/core"
)

var (
cacheExpiration = time.Duration(500 * time.Millisecond)
metricCache = cache{
table: make(map[string]*cachecell),
}
cacheLog = log.WithField("_module", "client-cache")
)

type cachecell struct {
time time.Time
metric core.Metric
hits uint64
misses uint64
}

type cache struct {
table map[string]*cachecell
}

func (c *cache) get(key string) core.Metric {
var (
cell *cachecell
ok bool
)
if cell, ok = c.table[key]; ok && time.Since(cell.time) < cacheExpiration {
cell.hits++
cacheLog.WithFields(log.Fields{
"namespace": key,
"hits": cell.hits,
"misses": cell.misses,
}).Debug(fmt.Sprintf("cache hit [%s]", key))
return cell.metric
}
if ok {
cell.misses++
cacheLog.WithFields(log.Fields{
"namespace": key,
"hits": cell.hits,
"misses": cell.misses,
}).Debug(fmt.Sprintf("cache miss [%s]", key))
}
return nil
}

func (c *cache) put(key string, metric core.Metric) {
if _, ok := c.table[key]; ok {
c.table[key].time = time.Now()
c.table[key].metric = metric
} else {
c.table[key] = &cachecell{
time: time.Now(),
metric: metric,
}
}
}
70 changes: 70 additions & 0 deletions control/plugin/client/cache_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package client

import (
"testing"
"time"

. "github.com/smartystreets/goconvey/convey"

"github.com/intelsdi-x/pulse/control/plugin"
)

func TestCache(t *testing.T) {
Convey("puts and gets a metric", t, func() {
mc := &cache{
table: make(map[string]*cachecell),
}
foo := &plugin.PluginMetricType{
Namespace_: []string{"foo", "bar"},
}
mc.put("/foo/bar", foo)
ret := mc.get("/foo/bar")
So(ret, ShouldNotBeNil)
So(ret, ShouldEqual, foo)
})
Convey("returns nil if the cache cell does not exist", t, func() {
mc := &cache{
table: make(map[string]*cachecell),
}
ret := mc.get("/foo/bar")
So(ret, ShouldBeNil)
})
Convey("returns nil if the cache cell has expired", t, func() {
mc := &cache{
table: make(map[string]*cachecell),
}
foo := &plugin.PluginMetricType{
Namespace_: []string{"foo", "bar"},
}
mc.put("/foo/bar", foo)
time.Sleep(501 * time.Millisecond)
ret := mc.get("/foo/bar")
So(ret, ShouldBeNil)
})
Convey("hit and miss counts", t, func() {
Convey("ticks hit count when a cache entry is hit", func() {
mc := &cache{
table: make(map[string]*cachecell),
}
foo := &plugin.PluginMetricType{
Namespace_: []string{"foo", "bar"},
}
mc.put("/foo/bar", foo)
mc.get("/foo/bar")
So(mc.table["/foo/bar"].hits, ShouldEqual, 1)
})
Convey("ticks miss count when a cache entry is missed", func() {
mc := &cache{
table: make(map[string]*cachecell),
}
foo := &plugin.PluginMetricType{
Namespace_: []string{"foo", "bar"},
}
mc.put("/foo/bar", foo)
time.Sleep(501 * time.Millisecond)
mc.get("/foo/bar")
So(mc.table["/foo/bar"].misses, ShouldEqual, 1)
})
})

}
106 changes: 66 additions & 40 deletions control/plugin/client/httpjsonrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,65 +63,91 @@ func (h *httpJSONRPCClient) Kill(reason string) error {
}

// CollectMetrics returns collected metrics
func (h *httpJSONRPCClient) CollectMetrics(mts_ []core.Metric) ([]core.Metric, error) {
mts := make([]core.Metric, len(mts_))
for i, m := range mts_ {
mts[i] = &plugin.PluginMetricType{
Namespace_: m.Namespace(),
Config_: m.Config(),
func (h *httpJSONRPCClient) CollectMetrics(mts []core.Metric) ([]core.Metric, error) {
// Here we create two slices from the requested metric collection. One which
// contains the metrics we retreived from the cache, and one from which we had
// to use the plugin.

// This is managed by walking through the complete list and hitting the cache for each item.
// If the metric is found in the cache, we nil out that entry in the complete collection.
// Then, we walk through the collection once more and create a new slice of metrics which
// were not found in the cache.
var fromCache []core.Metric
for i, m := range mts {
var metric core.Metric
if metric = metricCache.get(core.JoinNamespace(m.Namespace())); metric != nil {
fromCache = append(fromCache, metric)
mts[i] = nil
}
}
res, err := h.call("Collector.CollectMetrics", []interface{}{mts})
if err != nil {
return nil, err
}
var metrics []core.Metric
if _, ok := res["result"]; !ok {
err := errors.New("Invalid response: expected the response map to contain the key 'result'.")
logger.WithFields(log.Fields{
"_block": "CollectMetrics",
"jsonrpc response": fmt.Sprintf("%+v", res),
}).Error(err)
return nil, err
var fromPlugin []core.Metric
for _, mt := range mts {
if mt != nil {
fromPlugin = append(fromPlugin, &plugin.PluginMetricType{
Namespace_: mt.Namespace(),
Config_: mt.Config(),
})
}
}
if resmap, ok := res["result"].(map[string]interface{}); ok {
if _, ok := resmap["PluginMetrics"]; !ok {
err := errors.New("Invalid response: expected the result value to be a map that contains key 'PluginMetrics'.")
// We only need to send a request to the plugin if there are metrics which were not available in the cache.
if len(fromPlugin) > 0 {
res, err := h.call("Collector.CollectMetrics", []interface{}{fromPlugin})
if err != nil {
return nil, err
}
var metrics []core.Metric
if _, ok := res["result"]; !ok {
err := errors.New("Invalid response: expected the response map to contain the key 'result'.")
logger.WithFields(log.Fields{
"_block": "CollectMetrics",
"jsonrpc response": fmt.Sprintf("%+v", res),
}).Error(err)
return nil, err
}
if pms, ok := resmap["PluginMetrics"].([]interface{}); ok {
for _, m := range pms {
j, err := json.Marshal(m)
if err != nil {
return nil, err
}
pmt := &plugin.PluginMetricType{}
if err := json.Unmarshal(j, &pmt); err != nil {
return nil, err
if resmap, ok := res["result"].(map[string]interface{}); ok {
if _, ok := resmap["PluginMetrics"]; !ok {
err := errors.New("Invalid response: expected the result value to be a map that contains key 'PluginMetrics'.")
logger.WithFields(log.Fields{
"_block": "CollectMetrics",
"jsonrpc response": fmt.Sprintf("%+v", res),
}).Error(err)
return nil, err
}
if pms, ok := resmap["PluginMetrics"].([]interface{}); ok {
for _, m := range pms {
j, err := json.Marshal(m)
if err != nil {
return nil, err
}
pmt := &plugin.PluginMetricType{}
if err := json.Unmarshal(j, &pmt); err != nil {
return nil, err
}
metrics = append(metrics, pmt)
}
metrics = append(metrics, pmt)
} else {
err := errors.New("Invalid response: expected 'PluginMetrics' to contain a list of metrics")
logger.WithFields(log.Fields{
"_block": "CollectMetrics",
"jsonrpc response": fmt.Sprintf("%+v", res),
}).Error(err)
return nil, err
}
} else {
err := errors.New("Invalid response: expected 'PluginMetrics' to contain a list of metrics")
err := errors.New("Invalid response: expected 'result' to be a map")
logger.WithFields(log.Fields{
"_block": "CollectMetrics",
"jsonrpc response": fmt.Sprintf("%+v", res),
}).Error(err)
return nil, err
}
} else {
err := errors.New("Invalid response: expected 'result' to be a map")
logger.WithFields(log.Fields{
"_block": "CollectMetrics",
"jsonrpc response": fmt.Sprintf("%+v", res),
}).Error(err)
return nil, err
for _, m := range metrics {
metricCache.put(core.JoinNamespace(m.Namespace()), m)
}
metrics = append(metrics, fromCache...)
return metrics, err
}
return metrics, err
return fromCache, nil
}

// GetMetricTypes returns metric types that can be collected
Expand Down
7 changes: 4 additions & 3 deletions control/plugin/client/httpjsonrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func TestHTTPJSONRPC(t *testing.T) {
cdn.AddItem("someInt", ctypes.ConfigValueInt{Value: 1})
cdn.AddItem("password", ctypes.ConfigValueStr{Value: "secure"})

time.Sleep(500 * time.Millisecond)
mts, err := c.CollectMetrics([]core.Metric{
&plugin.PluginMetricType{
Namespace_: []string{"foo", "bar"},
Expand Down Expand Up @@ -251,6 +252,7 @@ func TestHTTPJSONRPC(t *testing.T) {
cdn := cdata.NewNode()
cdn.AddItem("someInt", ctypes.ConfigValueInt{Value: 1})

time.Sleep(500 * time.Millisecond)
mts, err := c.CollectMetrics([]core.Metric{
&plugin.PluginMetricType{
Namespace_: []string{"foo", "bar"},
Expand All @@ -264,14 +266,13 @@ func TestHTTPJSONRPC(t *testing.T) {
So(mts[0].Config().Table(), ShouldNotBeEmpty)
So(mts[0].Config().Table()["someInt"].Type(), ShouldResemble, "integer")

Convey("Get and proces the ConfigPolicyTree", func() {
Convey("Get and process the ConfigPolicyTree", func() {
cpt, err := c.GetConfigPolicyTree()
So(err, ShouldBeNil)
So(cpt, ShouldNotBeNil)
So(cpt.Get([]string{"foo", "bar"}), ShouldNotBeNil)
node := cpt.Get([]string{"foo", "bar"})
So(err, ShouldBeNil)
So(node, ShouldNotBeNil)
So(err, ShouldBeNil)
cpn, cperrs := node.Process(mts[0].Config().Table())
So(cpn, ShouldBeNil)
So(cperrs.Errors(), ShouldNotBeEmpty)
Expand Down
64 changes: 46 additions & 18 deletions control/plugin/client/native.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"encoding/gob"
"errors"
"fmt"
"net"
"net/rpc"
Expand Down Expand Up @@ -72,30 +73,57 @@ func (p *PluginNativeClient) Process(contentType string, content []byte, config
func (p *PluginNativeClient) CollectMetrics(coreMetricTypes []core.Metric) ([]core.Metric, error) {
// Convert core.MetricType slice into plugin.PluginMetricType slice as we have
// to send structs over RPC
pluginMetricTypes := make([]plugin.PluginMetricType, len(coreMetricTypes))
for i, _ := range coreMetricTypes {
pluginMetricTypes[i] = plugin.PluginMetricType{
Namespace_: coreMetricTypes[i].Namespace(),
LastAdvertisedTime_: coreMetricTypes[i].LastAdvertisedTime(),
Version_: coreMetricTypes[i].Version(),
}
if coreMetricTypes[i].Config() != nil {
///pluginMetricTypes[i].Config_ = coreMetricTypes[i].Config().Table()
pluginMetricTypes[i].Config_ = coreMetricTypes[i].Config()
if len(coreMetricTypes) == 0 {
return nil, errors.New("no metrics to collect")
}

var fromCache []core.Metric
for i, mt := range coreMetricTypes {
var metric core.Metric
// Attempt to retreive the metric from the cache. If it is available,
// nil out that entry in the requested collection.
if metric = metricCache.get(core.JoinNamespace(mt.Namespace())); metric != nil {
fromCache = append(fromCache, metric)
coreMetricTypes[i] = nil
}
}
// If the size of fromCache is equal to the length of the requested metrics,
// then we retrieved all of the requested metrics and do not need to go the
// motions of the rpc call.
if len(fromCache) != len(coreMetricTypes) {
var pluginMetricTypes []plugin.PluginMetricType
// Walk through the requested collection. If the entry is not nil,
// add it to the slice of metrics to collect over rpc.
for i, mt := range coreMetricTypes {
if mt != nil {
pluginMetricTypes = append(pluginMetricTypes, plugin.PluginMetricType{
Namespace_: mt.Namespace(),
LastAdvertisedTime_: mt.LastAdvertisedTime(),
Version_: mt.Version(),
})
if mt.Config() != nil {
pluginMetricTypes[i].Config_ = mt.Config()
}
}
}

// TODO return err if mts is empty
args := plugin.CollectMetricsArgs{PluginMetricTypes: pluginMetricTypes}
reply := plugin.CollectMetricsReply{}
args := plugin.CollectMetricsArgs{PluginMetricTypes: pluginMetricTypes}
reply := plugin.CollectMetricsReply{}

err := p.connection.Call("Collector.CollectMetrics", args, &reply)
err := p.connection.Call("Collector.CollectMetrics", args, &reply)

retMetrics := make([]core.Metric, len(reply.PluginMetrics))
for i, _ := range reply.PluginMetrics {
retMetrics[i] = reply.PluginMetrics[i]
var offset int
for i, mt := range fromCache {
coreMetricTypes[i] = mt
offset++
}
for i, mt := range reply.PluginMetrics {
metricCache.put(core.JoinNamespace(mt.Namespace_), mt)
coreMetricTypes[i+offset] = mt
}
return coreMetricTypes, err
}
return retMetrics, err
return fromCache, nil
}

func (p *PluginNativeClient) GetMetricTypes() ([]core.Metric, error) {
Expand Down
Loading

0 comments on commit c8371e1

Please sign in to comment.