Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Removes labels
Browse files Browse the repository at this point in the history
  • Loading branch information
jcooklin committed Apr 20, 2016
1 parent 10d107e commit 27b4622
Show file tree
Hide file tree
Showing 24 changed files with 204 additions and 195 deletions.
9 changes: 4 additions & 5 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,7 +542,7 @@ func (p *pluginControl) validateMetricTypeSubscription(mt core.RequestedMetric,

if err != nil {
serrs = append(serrs, serror.New(err, map[string]interface{}{
"name": core.JoinNamespace(mt.Namespace()),
"name": mt.Namespace().String(),
"version": mt.Version(),
}))
return serrs
Expand Down Expand Up @@ -611,7 +611,7 @@ func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]gatheredPlugin, [
m, err := p.metricCatalog.Get(mt.Namespace(), mt.Version())
if err != nil {
serrs = append(serrs, serror.New(err, map[string]interface{}{
"name": core.JoinNamespace(mt.Namespace()),
"name": mt.Namespace().String(),
"version": mt.Version(),
}))
continue
Expand Down Expand Up @@ -1059,16 +1059,15 @@ func groupMetricTypesByPlugin(cat catalogsMetrics, metricTypes []core.Metric) (m
return nil, serror.New(err)
}
returnedmt := plugin.PluginMetricType{
Namespace_: incomingmt.Namespace(),
Namespace_: catalogedmt.Namespace(),
LastAdvertisedTime_: catalogedmt.LastAdvertisedTime(),
Version_: incomingmt.Version(),
Tags_: catalogedmt.Tags(),
Labels_: catalogedmt.Labels(),
Config_: incomingmt.Config(),
}
lp := catalogedmt.Plugin
if lp == nil {
return nil, serror.New(errorMetricNotFound(incomingmt.Namespace().Strings()))
return nil, serror.New(errorMetricNotFound(incomingmt.Namespace().String()))
}
key := lp.Key()
pmt, _ := pmts[key]
Expand Down
20 changes: 10 additions & 10 deletions control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,19 +638,19 @@ func (m *mc) Get(ns core.Namespace, ver int) (*metricType, error) {
policy: &mockCDProc{},
}, nil
}
return nil, serror.New(errorMetricNotFound(ns.Strings()))
return nil, serror.New(errorMetricNotFound(ns.String()))
}

func (m *mc) Subscribe(ns []string, ver int) error {
if ns[0] == "nf" {
return serror.New(errorMetricNotFound(ns))
return serror.New(errorMetricNotFound("/" + strings.Join(ns, "/")))
}
return nil
}

func (m *mc) Unsubscribe(ns []string, ver int) error {
if ns[0] == "nf" {
return serror.New(errorMetricNotFound(ns))
return serror.New(errorMetricNotFound("/" + strings.Join(ns, "/")))
}
if ns[0] == "neg" {
return errNegativeSubCount
Expand Down Expand Up @@ -793,7 +793,6 @@ func (m MockMetricType) Data() interface{} {
return nil
}

func (m MockMetricType) Labels() []core.Label { return nil }
func (m MockMetricType) Tags() map[string]string { return nil }

func TestMetricConfig(t *testing.T) {
Expand Down Expand Up @@ -1075,13 +1074,13 @@ func TestCollectDynamicMetrics(t *testing.T) {
// pool should be the global cache expiration
So(ttl, ShouldEqual, strategy.GlobalCacheExpiration)
mts, errs := c.CollectMetrics([]core.Metric{m}, time.Now().Add(time.Second*1), taskID)
hits, err := pool.CacheHits(core.JoinNamespace(m.namespace), 2, taskID)
hits, err := pool.CacheHits(m.namespace.String(), 2, taskID)
So(err, ShouldBeNil)
So(hits, ShouldEqual, 0)
So(errs, ShouldBeNil)
So(len(mts), ShouldEqual, 10)
mts, errs = c.CollectMetrics([]core.Metric{m}, time.Now().Add(time.Second*1), taskID)
hits, err = pool.CacheHits(core.JoinNamespace(m.namespace), 2, taskID)
hits, err = pool.CacheHits(m.namespace.String(), 2, taskID)
So(err, ShouldBeNil)

// todo resolve problem with caching for dynamic metrics
Expand Down Expand Up @@ -1114,7 +1113,7 @@ func TestCollectDynamicMetrics(t *testing.T) {
So(err, ShouldBeNil)
So(ttl, ShouldEqual, 1100*time.Millisecond)
mts, errs := c.CollectMetrics([]core.Metric{jsonm}, time.Now().Add(time.Second*1), uuid.New())
hits, err := pool.CacheHits(core.JoinNamespace(jsonm.namespace), jsonm.version, taskID)
hits, err := pool.CacheHits(jsonm.namespace.String(), jsonm.version, taskID)
So(pool.SubscriptionCount(), ShouldEqual, 1)
So(pool.Strategy, ShouldNotBeNil)
So(len(mts), ShouldBeGreaterThan, 0)
Expand All @@ -1123,7 +1122,7 @@ func TestCollectDynamicMetrics(t *testing.T) {
So(errs, ShouldBeNil)
So(len(mts), ShouldEqual, 10)
mts, errs = c.CollectMetrics([]core.Metric{jsonm}, time.Now().Add(time.Second*1), uuid.New())
hits, err = pool.CacheHits(core.JoinNamespace(m.namespace), 1, taskID)
hits, err = pool.CacheHits(m.namespace.String(), 1, taskID)
So(err, ShouldBeNil)

// todo resolve problem with caching for dynamic metrics
Expand Down Expand Up @@ -1516,7 +1515,7 @@ func TestPublishMetrics(t *testing.T) {

Convey("Publish to file", func() {
metrics := []plugin.PluginMetricType{
*plugin.NewPluginMetricType(core.NewNamespace([]string{"foo"}), time.Now(), "", nil, nil, 1),
*plugin.NewPluginMetricType(core.NewNamespace([]string{"foo"}), time.Now(), "", nil, 1),
}
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
Expand Down Expand Up @@ -1569,7 +1568,7 @@ func TestProcessMetrics(t *testing.T) {

Convey("process metrics", func() {
metrics := []plugin.PluginMetricType{
*plugin.NewPluginMetricType(core.NewNamespace([]string{"foo"}), time.Now(), "", nil, nil, 1),
*plugin.NewPluginMetricType(core.NewNamespace([]string{"foo"}), time.Now(), "", nil, 1),
}
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
Expand Down Expand Up @@ -1741,6 +1740,7 @@ func TestMetricSubscriptionToOlderVersion(t *testing.T) {
So(pool2.SubscriptionCount(), ShouldEqual, 1)

mts, errs = c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID")
So(errs, ShouldBeEmpty)
So(len(mts), ShouldEqual, 1)

// ensure the data coming back is from v1, V1's data is type string
Expand Down
56 changes: 25 additions & 31 deletions control/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,18 +49,18 @@ var (
}
)

func errorMetricNotFound(ns []string, ver ...int) error {
func errorMetricNotFound(ns string, ver ...int) error {
if len(ver) > 0 {
return fmt.Errorf("Metric not found: %s (version: %d)", "/"+strings.Join(ns, "/"), ver[0])
return fmt.Errorf("Metric not found: %s (version: %d)", ns, ver[0])
}
return fmt.Errorf("Metric not found: %s", "/"+strings.Join(ns, "/"))
return fmt.Errorf("Metric not found: %s", ns)
}

func errorMetricContainsNotAllowedChars(ns []string) error {
func errorMetricContainsNotAllowedChars(ns string) error {
return fmt.Errorf("Metric namespace %s contains not allowed characters. Avoid using %s", ns, listNotAllowedChars())
}

func errorMetricEndsWithAsterisk(ns []string) error {
func errorMetricEndsWithAsterisk(ns string) error {
return fmt.Errorf("Metric namespace %s ends with an asterisk is not allowed", ns)
}

Expand Down Expand Up @@ -99,7 +99,6 @@ type metricType struct {
config *cdata.ConfigDataNode
data interface{}
source string
labels []core.Label
tags map[string]string
timestamp time.Time
}
Expand Down Expand Up @@ -127,7 +126,7 @@ func (m *metricType) Namespace() core.Namespace {
}

func (m *metricType) NamespaceAsString() string {
return core.JoinNamespace(m.Namespace())
return m.Namespace().String()
}

func (m *metricType) Data() interface{} {
Expand Down Expand Up @@ -180,10 +179,6 @@ func (m *metricType) Tags() map[string]string {
return m.tags
}

func (m *metricType) Labels() []core.Label {
return m.labels
}

func (m *metricType) Timestamp() time.Time {
return m.timestamp
}
Expand Down Expand Up @@ -218,7 +213,7 @@ func (mc *metricCatalog) matchedNamespaces(wkey string) ([]core.Namespace, error
mkeys := mc.mKeys[wkey]

if len(mkeys) == 0 {
return nil, errorMetricNotFound(getMetricNamespace(wkey).Strings())
return nil, errorMetricNotFound(getMetricNamespace(wkey).String())
}

// convert matched keys to a slice of namespaces
Expand All @@ -232,19 +227,19 @@ func (mc *metricCatalog) GetQueriedNamespaces(ns core.Namespace) ([]core.Namespa
defer mc.mutex.Unlock()

// get metric key (might contain wildcard(s))
wkey := getMetricKey(ns)
wkey := ns.Key()

return mc.matchedNamespaces(wkey)
}

// MatchQuery matches given 'ns' which could contain an asterisk or a tuple and add them to matching map under key 'ns'
// The matched metrics namespaces are also returned (as a [][]string)
// The matched metrics namespaces are also returned (as a []core.Namespace)
func (mc *metricCatalog) MatchQuery(ns core.Namespace) ([]core.Namespace, error) {
mc.mutex.Lock()
defer mc.mutex.Unlock()

// get metric key (might contain wildcard(s))
wkey := getMetricKey(ns)
wkey := ns.Key()

// adding matched namespaces to map
mc.addItemToMatchingMap(wkey)
Expand Down Expand Up @@ -331,13 +326,13 @@ func validateMetricNamespace(ns core.Namespace) error {
for _, chars := range notAllowedChars {
for _, ch := range chars {
if strings.ContainsAny(name, ch) {
return errorMetricContainsNotAllowedChars(ns.Strings())
return errorMetricContainsNotAllowedChars(ns.String())
}
}
}
// plugin should NOT advertise metrics ending with a wildcard
if strings.HasSuffix(name, "*") {
return errorMetricEndsWithAsterisk(ns.Strings())
return errorMetricEndsWithAsterisk(ns.String())
}

return nil
Expand Down Expand Up @@ -369,8 +364,7 @@ func (mc *metricCatalog) AddLoadedMetricType(lp *loadedPlugin, mt core.Metric) e
version: mt.Version(),
lastAdvertisedTime: mt.LastAdvertisedTime(),
tags: mt.Tags(),
labels: mt.Labels(),
policy: lp.ConfigPolicy.Get(mt.Namespace().Strings()),
policy: lp.ConfigPolicy.Get(strings.Split(strings.TrimPrefix(mt.Namespace().String(), "/"), "/")),
}
mc.Add(&newMt)
return nil
Expand All @@ -391,7 +385,7 @@ func (mc *metricCatalog) Add(m *metricType) {
mc.mutex.Lock()
defer mc.mutex.Unlock()

key := getMetricKey(m.Namespace())
key := m.Namespace().Key()

// adding key as a cataloged keys (mc.keys)
mc.keys = appendIfMissing(mc.keys, key)
Expand All @@ -404,21 +398,25 @@ func (mc *metricCatalog) Add(m *metricType) {
func (mc *metricCatalog) Get(ns core.Namespace, version int) (*metricType, error) {
mc.mutex.Lock()
defer mc.mutex.Unlock()
return mc.get(ns.Strings(), version)
return mc.get(strings.Split(strings.TrimPrefix(ns.String(), "/"), "/"), version)
}

// GetVersions retrieves all versions of a given metric namespace.
func (mc *metricCatalog) GetVersions(ns core.Namespace) ([]*metricType, error) {
mc.mutex.Lock()
defer mc.mutex.Unlock()
return mc.getVersions(ns.Strings())
return mc.getVersions(strings.Split(ns.String(), "/"))
}

// Fetch transactionally retrieves all metrics which fall under namespace ns
func (mc *metricCatalog) Fetch(ns core.Namespace) ([]*metricType, error) {
mc.mutex.Lock()
defer mc.mutex.Unlock()
mtsi, err := mc.tree.Fetch(ns.Strings())
var nss []string
if ns.String() != "" {
nss = strings.Split(strings.TrimPrefix(ns.String(), "/"), "/")
}
mtsi, err := mc.tree.Fetch(nss)
if err != nil {
log.WithFields(log.Fields{
"_module": "control",
Expand All @@ -436,10 +434,10 @@ func (mc *metricCatalog) Remove(ns core.Namespace) {
mc.mutex.Lock()
defer mc.mutex.Unlock()

mc.tree.Remove(ns.Strings())
mc.tree.Remove(strings.Split(strings.TrimPrefix(ns.String(), "/"), "/"))

// remove all items from map mKey mapped for this 'ns'
key := getMetricKey(ns)
key := ns.Key()
mc.removeMatchedKey(key)
}

Expand Down Expand Up @@ -542,7 +540,7 @@ func (mc *metricCatalog) get(ns []string, ver int) (*metricType, error) {
"_block": "get",
"error": err,
}).Error("error getting plugin version")
return nil, errorMetricNotFound(ns, ver)
return nil, errorMetricNotFound("/"+strings.Join(ns, "/"), ver)
}
return l, nil
}
Expand All @@ -562,15 +560,11 @@ func (mc *metricCatalog) getVersions(ns []string) ([]*metricType, error) {
return nil, err
}
if len(mts) == 0 {
return nil, errorMetricNotFound(ns)
return nil, errorMetricNotFound("/" + strings.Join(ns, "/"))
}
return mts, nil
}

func getMetricKey(metric core.Namespace) string {
return core.GenerateKey(metric)
}

func getMetricNamespace(key string) core.Namespace {
return core.NewNamespace(strings.Split(key, "."))
}
Expand Down
6 changes: 3 additions & 3 deletions control/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,22 +376,22 @@ func TestMetricCatalog(t *testing.T) {
Convey("return first key and item in table", func() {
mc.Next()
key, item := mc.Item()
So(key, ShouldEqual, getMetricKey(ns[0]))
So(key, ShouldEqual, ns[0].Key())
So(item, ShouldResemble, []*metricType{mt[0]})
})
Convey("return second key and item in table", func() {
mc.Next()
mc.Next()
key, item := mc.Item()
So(key, ShouldEqual, getMetricKey(ns[1]))
So(key, ShouldEqual, ns[1].Key())
So(item, ShouldResemble, []*metricType{mt[1]})
})
Convey("return third key and item in table", func() {
mc.Next()
mc.Next()
mc.Next()
key, item := mc.Item()
So(key, ShouldEqual, getMetricKey(ns[2]))
So(key, ShouldEqual, ns[2].Key())
So(item, ShouldResemble, []*metricType{mt[2]})
})
})
Expand Down
9 changes: 5 additions & 4 deletions control/mttrie.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package control
import (
"errors"
"fmt"
"strings"
)

/*
Expand Down Expand Up @@ -105,7 +106,7 @@ func (m *MTTrie) DeleteByPlugin(lp *loadedPlugin) {

// RemoveMetric removes a specific metric by namespace and version from the tree
func (m *MTTrie) RemoveMetric(mt metricType) {
a, _ := m.find(mt.Namespace().Strings())
a, _ := m.find(strings.Split(strings.TrimPrefix(mt.Namespace().String(), "/"), "/"))
if a != nil {
for v, x := range a.mts {
if mt.Version() == x.Version() {
Expand All @@ -120,7 +121,7 @@ func (m *MTTrie) RemoveMetric(mt metricType) {
// given MetricType
func (mtt *mttNode) Add(mt *metricType) {
ns := mt.Namespace()
node, index := mtt.walk(ns.Strings())
node, index := mtt.walk(strings.Split(strings.TrimPrefix(ns.String(), "/"), "/"))
if index == len(ns) {
if node.mts == nil {
node.mts = make(map[int]*metricType)
Expand Down Expand Up @@ -192,7 +193,7 @@ func (mtt *mttNode) Get(ns []string) ([]*metricType, error) {
return nil, err
}
if node.mts == nil {
return nil, errorMetricNotFound(ns)
return nil, errorMetricNotFound("/" + strings.Join(ns, "/"))
}
var mts []*metricType
for _, mt := range node.mts {
Expand Down Expand Up @@ -222,7 +223,7 @@ func (mtt *mttNode) walk(ns []string) (*mttNode, int) {
func (mtt *mttNode) find(ns []string) (*mttNode, error) {
node, index := mtt.walk(ns)
if index != len(ns) {
return nil, errorMetricNotFound(ns)
return nil, errorMetricNotFound("/" + strings.Join(ns, "/"))
}
return node, nil
}
Expand Down
1 change: 0 additions & 1 deletion control/plugin/client/httpjsonrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ func (h *httpJSONRPCClient) CollectMetrics(mts []core.Metric) ([]core.Metric, er
LastAdvertisedTime_: mt.LastAdvertisedTime(),
Version_: mt.Version(),
Tags_: mt.Tags(),
Labels_: mt.Labels(),
Config_: mt.Config(),
}
}
Expand Down
Loading

0 comments on commit 27b4622

Please sign in to comment.