Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Merge pull request #499 from jcooklin/fb/query_model_support
Browse files Browse the repository at this point in the history
Adds support for query model
  • Loading branch information
pittma committed Nov 14, 2015
2 parents a49dc8a + e62d02f commit 2ecaaba
Show file tree
Hide file tree
Showing 21 changed files with 558 additions and 192 deletions.
3 changes: 2 additions & 1 deletion control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,7 @@ func (p *pluginControl) validateMetricTypeSubscription(mt core.RequestedMetric,
controlLogger.WithFields(log.Fields{
"_block": "validate-metric-subscription",
"namespace": mt.Namespace(),
"version": mt.Version(),
}).Info("subscription called on metric")

m, err := p.metricCatalog.Get(mt.Namespace(), mt.Version())
Expand Down Expand Up @@ -499,7 +500,7 @@ func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]core.Plugin, []pe
for _, mt := range mts {
m, err := p.metricCatalog.Get(mt.Namespace(), mt.Version())
if err != nil {
perrs = append(perrs, perror.New(err))
perrs = append(perrs, err)
continue
}
// if the metric subscription is to version -1, we need to carry
Expand Down
97 changes: 92 additions & 5 deletions control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,7 @@ func TestMetricExists(t *testing.T) {
type MockMetricType struct {
namespace []string
cfg *cdata.ConfigDataNode
ver int
}

func (m MockMetricType) MarshalJSON() ([]byte, error) {
Expand Down Expand Up @@ -727,7 +728,7 @@ func (m MockMetricType) Timestamp() time.Time {
}

func (m MockMetricType) Version() int {
return 1
return m.ver
}

func (m MockMetricType) Config() *cdata.ConfigDataNode {
Expand All @@ -738,6 +739,9 @@ func (m MockMetricType) Data() interface{} {
return nil
}

func (m MockMetricType) Labels() []core.Label { return nil }
func (m MockMetricType) Tags() map[string]string { return nil }

func TestMetricConfig(t *testing.T) {
Convey("required config provided by task", t, func() {
c := New()
Expand Down Expand Up @@ -797,6 +801,7 @@ func TestMetricConfig(t *testing.T) {
cd := cdata.NewNode()
m1 := MockMetricType{
namespace: []string{"intel", "mock", "foo"},
ver: 1,
}
metric, errs := c.validateMetricTypeSubscription(m1, cd)
Convey("So metric should be valid with config", func() {
Expand All @@ -808,6 +813,88 @@ func TestMetricConfig(t *testing.T) {
})
}

func TestCollectDynamicMetrics(t *testing.T) {
Convey("given a plugin using the native client", t, func() {
config := NewConfig()
config.Plugins.All.AddItem("password", ctypes.ConfigValueStr{Value: "testval"})
c := New(OptSetConfig(config), CacheExpiration(time.Second*1))
c.Start()
lpe := newListenToPluginEvent()
c.eventManager.RegisterHandler("Control.PluginLoaded", lpe)
load(c, PluginPath)
<-lpe.done
load(c, JSONRPC_PluginPath)
<-lpe.done
cd := cdata.NewNode()
metrics, err := c.metricCatalog.Fetch([]string{})
So(err, ShouldBeNil)
So(len(metrics), ShouldEqual, 6)
m, err := c.metricCatalog.Get([]string{"intel", "mock", "*", "baz"}, 2)
So(err, ShouldBeNil)
So(m, ShouldNotBeNil)
jsonm, err := c.metricCatalog.Get([]string{"intel", "mock", "*", "baz"}, 1)
So(err, ShouldBeNil)
So(jsonm, ShouldNotBeNil)
metric, errs := c.validateMetricTypeSubscription(m, cd)
So(errs, ShouldBeNil)
So(metric, ShouldNotBeNil)
Convey("collects metrics from plugin using native client", func() {
lp, err := c.pluginManager.get("collector:mock2:2")
So(err, ShouldBeNil)
So(lp, ShouldNotBeNil)
pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock2:2")
So(errp, ShouldBeNil)
So(pool, ShouldNotBeNil)
pool.subscribe("1", unboundSubscriptionType)
err = c.pluginRunner.runPlugin(lp.Path)
So(err, ShouldBeNil)
mts, errs := c.CollectMetrics([]core.Metric{m}, time.Now().Add(time.Second*1))
hits, err := pool.plugins[1].client.CacheHits(core.JoinNamespace(m.namespace), 2)
So(err, ShouldBeNil)
So(hits, ShouldEqual, 0)
So(errs, ShouldBeNil)
So(len(mts), ShouldEqual, 10)
mts, errs = c.CollectMetrics([]core.Metric{m}, time.Now().Add(time.Second*1))
hits, err = pool.plugins[1].client.CacheHits(core.JoinNamespace(m.namespace), 2)
So(err, ShouldBeNil)
So(hits, ShouldEqual, 1)
So(errs, ShouldBeNil)
So(len(mts), ShouldEqual, 10)
pool.unsubscribe("1")
Convey("collects metrics from plugin using httpjson client", func() {
lp, err := c.pluginManager.get("collector:mock1:1")
So(err, ShouldBeNil)
So(lp, ShouldNotBeNil)
pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock1:1")
So(errp, ShouldBeNil)
So(pool, ShouldNotBeNil)
pool.subscribe("1", unboundSubscriptionType)
err = c.pluginRunner.runPlugin(lp.Path)
So(err, ShouldBeNil)
mts, errs := c.CollectMetrics([]core.Metric{jsonm}, time.Now().Add(time.Second*1))
hits, err := pool.plugins[1].client.CacheHits(core.JoinNamespace(m.namespace), 1)
So(err, ShouldBeNil)
So(hits, ShouldEqual, 0)
So(errs, ShouldBeNil)
So(len(mts), ShouldEqual, 10)
mts, errs = c.CollectMetrics([]core.Metric{jsonm}, time.Now().Add(time.Second*1))
hits, err = pool.plugins[1].client.CacheHits(core.JoinNamespace(m.namespace), 1)
So(err, ShouldBeNil)
So(hits, ShouldEqual, 1)
So(errs, ShouldBeNil)
So(len(mts), ShouldEqual, 10)
hits = pool.plugins[1].client.AllCacheHits()
So(hits, ShouldEqual, 2)
misses := pool.plugins[1].client.AllCacheMisses()
So(misses, ShouldEqual, 2)
pool.unsubscribe("1")
c.Stop()
time.Sleep(100 * time.Millisecond)
})
})
})
}

func TestCollectMetrics(t *testing.T) {

Convey("given a new router", t, func() {
Expand All @@ -833,7 +920,7 @@ func TestCollectMetrics(t *testing.T) {
<-lpe.done
mts, err := c.MetricCatalog()
So(err, ShouldBeNil)
So(len(mts), ShouldEqual, 3)
So(len(mts), ShouldEqual, 4)

cd := cdata.NewNode()
cd.AddItem("password", ctypes.ConfigValueStr{Value: "testval"})
Expand Down Expand Up @@ -865,7 +952,7 @@ func TestCollectMetrics(t *testing.T) {
time.Sleep(time.Millisecond * 1100)

for x := 0; x < 5; x++ {
cr, err := c.CollectMetrics(m, time.Now().Add(time.Second*60))
cr, err := c.CollectMetrics(m, time.Now().Add(time.Second*1))
So(err, ShouldBeNil)
for i := range cr {
So(cr[i].Data(), ShouldContainSubstring, "The mock collected data!")
Expand Down Expand Up @@ -957,7 +1044,7 @@ func TestPublishMetrics(t *testing.T) {

Convey("Publish to file", func() {
metrics := []plugin.PluginMetricType{
*plugin.NewPluginMetricType([]string{"foo"}, time.Now(), "", 1),
*plugin.NewPluginMetricType([]string{"foo"}, time.Now(), "", nil, nil, 1),
}
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
Expand Down Expand Up @@ -1010,7 +1097,7 @@ func TestProcessMetrics(t *testing.T) {

Convey("process metrics", func() {
metrics := []plugin.PluginMetricType{
*plugin.NewPluginMetricType([]string{"foo"}, time.Now(), "", 1),
*plugin.NewPluginMetricType([]string{"foo"}, time.Now(), "", nil, nil, 1),
}
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
Expand Down
12 changes: 12 additions & 0 deletions control/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ type metricType struct {
config *cdata.ConfigDataNode
data interface{}
source string
labels []core.Label
tags map[string]string
timestamp time.Time
}

Expand Down Expand Up @@ -143,6 +145,14 @@ func (m *metricType) Source() string {
return m.source
}

func (m *metricType) Tags() map[string]string {
return m.tags
}

func (m *metricType) Labels() []core.Label {
return m.labels
}

func (m *metricType) Timestamp() time.Time {
return m.timestamp
}
Expand Down Expand Up @@ -174,6 +184,8 @@ func (mc *metricCatalog) AddLoadedMetricType(lp *loadedPlugin, mt core.Metric) {
namespace: mt.Namespace(),
version: mt.Version(),
lastAdvertisedTime: mt.LastAdvertisedTime(),
tags: mt.Tags(),
labels: mt.Labels(),
policy: lp.ConfigPolicy.Get(mt.Namespace()),
}
mc.Add(&newMt)
Expand Down
71 changes: 51 additions & 20 deletions control/plugin/client/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ limitations under the License.
package client

import (
"errors"
"fmt"
"time"

Expand All @@ -35,53 +36,83 @@ var (
table: make(map[string]*cachecell),
}
cacheLog = log.WithField("_module", "client-cache")

ErrCacheEntryDoesNotExist = errors.New("cache entry does not exist")
)

type cachecell struct {
time time.Time
metric core.Metric
hits uint64
misses uint64
time time.Time
metric core.Metric
metrics []core.Metric
hits uint64
misses uint64
}

type cache struct {
table map[string]*cachecell
}

func (c *cache) get(key string) core.Metric {
func (c *cache) get(ns string, version int) interface{} {
var (
cell *cachecell
ok bool
)

key := fmt.Sprintf("%v:%v", ns, version)
if cell, ok = c.table[key]; ok && time.Since(cell.time) < GlobalCacheExpiration {
cell.hits++
cacheLog.WithFields(log.Fields{
"namespace": key,
"hits": cell.hits,
"misses": cell.misses,
}).Debug(fmt.Sprintf("cache hit [%s]", key))
return cell.metric
}
if ok {
cell.misses++
if cell.metric != nil {
return cell.metric
}
return cell.metrics
} else {
if !ok {
c.table[key] = &cachecell{
time: time.Time{},
metrics: nil,
}
}
c.table[key].misses++
cacheLog.WithFields(log.Fields{
"namespace": key,
"hits": cell.hits,
"misses": cell.misses,
"hits": c.table[key].hits,
"misses": c.table[key].misses,
}).Debug(fmt.Sprintf("cache miss [%s]", key))
}
return nil
}

func (c *cache) put(key string, metric core.Metric) {
if _, ok := c.table[key]; ok {
c.table[key].time = time.Now()
c.table[key].metric = metric
} else {
c.table[key] = &cachecell{
time: time.Now(),
metric: metric,
func (c *cache) put(ns string, version int, m interface{}) {
key := fmt.Sprintf("%v:%v", ns, version)
switch metric := m.(type) {
case core.Metric:
if _, ok := c.table[key]; ok {
c.table[key].time = time.Now()
c.table[key].metric = metric
} else {
c.table[key] = &cachecell{
time: time.Now(),
metric: metric,
}
}
case []core.Metric:
if _, ok := c.table[key]; ok {
c.table[key].time = time.Now()
c.table[key].metrics = metric
} else {
c.table[key] = &cachecell{
time: time.Now(),
metrics: metric,
}
}
default:
cacheLog.WithFields(log.Fields{
"namespace": key,
"_block": "put",
}).Error("unsupported type")
}
}
30 changes: 15 additions & 15 deletions control/plugin/client/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,16 @@ func TestCache(t *testing.T) {
Namespace_: []string{"foo", "bar"},
}

mc.put("/foo/bar", foo)
ret := mc.get("/foo/bar")
mc.put("/foo/bar", 1, foo)
ret := mc.get("/foo/bar", 1)
So(ret, ShouldNotBeNil)
So(ret, ShouldEqual, foo)
})
Convey("returns nil if the cache cell does not exist", t, func() {
mc := &cache{
table: make(map[string]*cachecell),
}
ret := mc.get("/foo/bar")
ret := mc.get("/foo/bar", 1)
So(ret, ShouldBeNil)
})
Convey("returns nil if the cache cell has expired", t, func() {
Expand All @@ -57,10 +57,10 @@ func TestCache(t *testing.T) {
foo := &plugin.PluginMetricType{
Namespace_: []string{"foo", "bar"},
}
mc.put("/foo/bar", foo)
mc.put("/foo/bar", 1, foo)
time.Sleep(301 * time.Millisecond)

ret := mc.get("/foo/bar")
ret := mc.get("/foo/bar", 1)
So(ret, ShouldBeNil)
})
Convey("hit and miss counts", t, func() {
Expand All @@ -71,9 +71,9 @@ func TestCache(t *testing.T) {
foo := &plugin.PluginMetricType{
Namespace_: []string{"foo", "bar"},
}
mc.put("/foo/bar", foo)
mc.get("/foo/bar")
So(mc.table["/foo/bar"].hits, ShouldEqual, 1)
mc.put("/foo/bar", 1, foo)
mc.get("/foo/bar", 1)
So(mc.table["/foo/bar:1"].hits, ShouldEqual, 1)
})
Convey("ticks miss count when a cache entry is still a hit", func() {
mc := &cache{
Expand All @@ -82,10 +82,10 @@ func TestCache(t *testing.T) {
foo := &plugin.PluginMetricType{
Namespace_: []string{"foo", "bar"},
}
mc.put("/foo/bar", foo)
time.Sleep(295 * time.Millisecond)
mc.get("/foo/bar")
So(mc.table["/foo/bar"].hits, ShouldEqual, 1)
mc.put("/foo/bar", 1, foo)
time.Sleep(250 * time.Millisecond)
mc.get("/foo/bar", 1)
So(mc.table["/foo/bar:1"].hits, ShouldEqual, 1)
})
Convey("ticks miss count when a cache entry is missed", func() {
mc := &cache{
Expand All @@ -94,10 +94,10 @@ func TestCache(t *testing.T) {
foo := &plugin.PluginMetricType{
Namespace_: []string{"foo", "bar"},
}
mc.put("/foo/bar", foo)
mc.put("/foo/bar", 1, foo)
time.Sleep(301 * time.Millisecond)
mc.get("/foo/bar")
So(mc.table["/foo/bar"].misses, ShouldEqual, 1)
mc.get("/foo/bar", 1)
So(mc.table["/foo/bar:1"].misses, ShouldEqual, 1)
})
})

Expand Down
Loading

0 comments on commit 2ecaaba

Please sign in to comment.