Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Tagging metrics via task manifest
Browse files Browse the repository at this point in the history
  • Loading branch information
marcin-krolik committed May 20, 2016
1 parent 3b5c732 commit a54dbd2
Show file tree
Hide file tree
Showing 18 changed files with 646 additions and 39 deletions.
16 changes: 14 additions & 2 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,19 @@ func (p *pluginControl) MetricExists(mns core.Namespace, ver int) bool {
// CollectMetrics is a blocking call to collector plugins returning a collection
// of metrics and errors. If an error is encountered no metrics will be
// returned.
func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time.Time, taskID string) (metrics []core.Metric, errs []error) {
func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time.Time, taskID string, allTags map[string]map[string]string) (metrics []core.Metric, errs []error) {
for ns, nsTags := range allTags {
for k, v := range nsTags {
log.WithFields(log.Fields{
"_module": "control",
"block": "CollectMetrics",
"type": "pluginCollector",
"ns": ns,
"tag-key": k,
"tag-val": v,
}).Debug("Tags in CollectMetrics")
}
}

pluginToMetricMap, err := groupMetricTypesByPlugin(p.metricCatalog, metricTypes)
if err != nil {
Expand Down Expand Up @@ -936,7 +948,7 @@ func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time.
// plugin authors to inadvertently overwrite or not pass along the data
// passed to CollectMetrics so we will help them out here.
for i := range m {
m[i] = addStandardTags(m[i])
m[i] = addStandardAndWorkflowTags(m[i], allTags)
}
metrics = append(metrics, m...)
wg.Done()
Expand Down
26 changes: 13 additions & 13 deletions control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -933,7 +933,7 @@ func TestRoutingCachingStrategy(t *testing.T) {
Convey("Collect metrics", func() {
taskID := tasks[rand.Intn(len(tasks))]
for i := 0; i < 10; i++ {
_, errs := c.CollectMetrics([]core.Metric{metric}, time.Now().Add(time.Second*1), taskID)
_, errs := c.CollectMetrics([]core.Metric{metric}, time.Now().Add(time.Second*1), taskID, nil)
So(errs, ShouldBeEmpty)
}
Convey("Check cache stats", func() {
Expand Down Expand Up @@ -995,7 +995,7 @@ func TestRoutingCachingStrategy(t *testing.T) {
Convey("Collect metrics", func() {
taskID := tasks[rand.Intn(len(tasks))]
for i := 0; i < 10; i++ {
cr, errs := c.CollectMetrics([]core.Metric{metric}, time.Now().Add(time.Second*1), taskID)
cr, errs := c.CollectMetrics([]core.Metric{metric}, time.Now().Add(time.Second*1), taskID, nil)
So(errs, ShouldBeEmpty)
for i := range cr {
So(cr[i].Data(), ShouldContainSubstring, "The mock collected data!")
Expand Down Expand Up @@ -1078,13 +1078,13 @@ func TestCollectDynamicMetrics(t *testing.T) {
So(err, ShouldBeNil)
// The minimum TTL advertised by the plugin is 100ms therefore the TTL for th // pool should be the global cache expiration
So(ttl, ShouldEqual, strategy.GlobalCacheExpiration)
mts, errs := c.CollectMetrics([]core.Metric{m}, time.Now().Add(time.Second*1), taskID)
mts, errs := c.CollectMetrics([]core.Metric{m}, time.Now().Add(time.Second*1), taskID, nil)
hits, err := pool.CacheHits(m.namespace.String(), 2, taskID)
So(err, ShouldBeNil)
So(hits, ShouldEqual, 0)
So(errs, ShouldBeNil)
So(len(mts), ShouldEqual, 10)
mts, errs = c.CollectMetrics([]core.Metric{m}, time.Now().Add(time.Second*1), taskID)
mts, errs = c.CollectMetrics([]core.Metric{m}, time.Now().Add(time.Second*1), taskID, nil)
hits, err = pool.CacheHits(m.namespace.String(), 2, taskID)
So(err, ShouldBeNil)

Expand Down Expand Up @@ -1117,7 +1117,7 @@ func TestCollectDynamicMetrics(t *testing.T) {
ttl, err = pool.CacheTTL(taskID)
So(err, ShouldBeNil)
So(ttl, ShouldEqual, 1100*time.Millisecond)
mts, errs := c.CollectMetrics([]core.Metric{jsonm}, time.Now().Add(time.Second*1), uuid.New())
mts, errs := c.CollectMetrics([]core.Metric{jsonm}, time.Now().Add(time.Second*1), uuid.New(), nil)
hits, err := pool.CacheHits(jsonm.namespace.String(), jsonm.version, taskID)
So(pool.SubscriptionCount(), ShouldEqual, 1)
So(pool.Strategy, ShouldNotBeNil)
Expand All @@ -1126,7 +1126,7 @@ func TestCollectDynamicMetrics(t *testing.T) {
So(hits, ShouldEqual, 0)
So(errs, ShouldBeNil)
So(len(mts), ShouldEqual, 10)
mts, errs = c.CollectMetrics([]core.Metric{jsonm}, time.Now().Add(time.Second*1), uuid.New())
mts, errs = c.CollectMetrics([]core.Metric{jsonm}, time.Now().Add(time.Second*1), uuid.New(), nil)
hits, err = pool.CacheHits(m.namespace.String(), 1, taskID)
So(err, ShouldBeNil)

Expand Down Expand Up @@ -1196,7 +1196,7 @@ func TestFailedPlugin(t *testing.T) {
var cr []core.Metric
eventMap := map[string]int{}
for i := 0; i < MaxPluginRestartCount+1; i++ {
cr, err = c.CollectMetrics(m, time.Now().Add(time.Second*1), uuid.New())
cr, err = c.CollectMetrics(m, time.Now().Add(time.Second*1), uuid.New(), nil)
So(err, ShouldNotBeNil)
So(cr, ShouldBeNil)
<-lpe.done
Expand Down Expand Up @@ -1279,7 +1279,7 @@ func TestCollectMetrics(t *testing.T) {
m = append(m, m1, m2, m3)
Convey("collect metrics", func() {
for x := 0; x < 4; x++ {
cr, err := c.CollectMetrics(m, time.Now().Add(time.Second*1), uuid.New())
cr, err := c.CollectMetrics(m, time.Now().Add(time.Second*1), uuid.New(), nil)
So(err, ShouldBeNil)
for i := range cr {
So(cr[i].Data(), ShouldContainSubstring, "The mock collected data!")
Expand Down Expand Up @@ -1311,7 +1311,7 @@ func TestCollectMetrics(t *testing.T) {
c.Start()
load(c, PluginPath)
m := []core.Metric{}
c.CollectMetrics(m, time.Now().Add(time.Second*60), uuid.New())
c.CollectMetrics(m, time.Now().Add(time.Second*60), uuid.New(), nil)
c.Stop()
time.Sleep(100 * time.Millisecond)
})
Expand Down Expand Up @@ -1648,7 +1648,7 @@ func TestMetricSubscriptionToNewVersion(t *testing.T) {
serr := c.SubscribeDeps("testTaskID", []core.Metric{metric}, []core.Plugin{})
So(serr, ShouldBeNil)
// collect metrics as a sanity check that everything is setup correctly
mts, errs := c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID")
mts, errs := c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID", nil)
So(errs, ShouldBeNil)
So(len(mts), ShouldEqual, 1)
// ensure the data coming back is from v1. V1's data is type string
Expand Down Expand Up @@ -1678,7 +1678,7 @@ func TestMetricSubscriptionToNewVersion(t *testing.T) {
So(errp, ShouldBeNil)
So(pool2.SubscriptionCount(), ShouldEqual, 1)

mts, errs = c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID")
mts, errs = c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID", nil)
So(len(mts), ShouldEqual, 1)

// ensure the data coming back is from v2, V2's data is type int
Expand Down Expand Up @@ -1709,7 +1709,7 @@ func TestMetricSubscriptionToOlderVersion(t *testing.T) {
serr := c.SubscribeDeps("testTaskID", []core.Metric{metric}, []core.Plugin{})
So(serr, ShouldBeNil)
// collect metrics as a sanity check that everything is setup correctly
mts, errs := c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID")
mts, errs := c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID", nil)
So(errs, ShouldBeNil)
So(len(mts), ShouldEqual, 1)
// ensure the data coming back is from v2. V2's data is type int
Expand Down Expand Up @@ -1744,7 +1744,7 @@ func TestMetricSubscriptionToOlderVersion(t *testing.T) {
So(errp, ShouldBeNil)
So(pool2.SubscriptionCount(), ShouldEqual, 1)

mts, errs = c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID")
mts, errs = c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID", nil)
So(errs, ShouldBeEmpty)
So(len(mts), ShouldEqual, 1)

Expand Down
33 changes: 30 additions & 3 deletions control/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,25 @@ var (
"carets": {"^"},
"quotations": {"\"", "`", "'"},
}

hostnameReader hostnamer
)

// hostnameReader, hostnamer created for mocking
func init() {
hostnameReader = &hostnameReaderType{}
}

type hostnamer interface {
Hostname() (name string, err error)
}

type hostnameReaderType struct{}

func (h *hostnameReaderType) Hostname() (name string, err error) {
return os.Hostname()
}

func errorMetricNotFound(ns string, ver ...int) error {
if len(ver) > 0 {
return fmt.Errorf("Metric not found: %s (version: %d)", ns, ver[0])
Expand Down Expand Up @@ -598,21 +615,31 @@ func getVersion(c []*metricType, ver int) (*metricType, error) {
return nil, errMetricNotFound
}

func addStandardTags(m core.Metric) core.Metric {
hostname, err := os.Hostname()
func addStandardAndWorkflowTags(m core.Metric, allTags map[string]map[string]string) core.Metric {
hostname, err := hostnameReader.Hostname()
if err != nil {
log.WithFields(log.Fields{
"_module": "control",
"_file": "metrics.go,",
"_block": "addStandardTags",
"_block": "addStandardAndWorkflowTags",
"error": err.Error(),
}).Error("Unable to determine hostname")
}
tags := m.Tags()
if tags == nil {
tags = map[string]string{}
}
// apply tags from workflow
for ns, nsTags := range allTags {
if strings.HasPrefix(m.Namespace().String(), ns) {
for k, v := range nsTags {
tags[k] = v
}
}
}
// apply standard tag
tags[core.STD_TAG_PLUGIN_RUNNING_ON] = hostname

metric := plugin.MetricType{
Namespace_: m.Namespace(),
Version_: m.Version(),
Expand Down
121 changes: 121 additions & 0 deletions control/metrics_small_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// +build small

/*
http://www.apache.org/licenses/LICENSE-2.0.txt
Copyright 2015-2016 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package control

import (
"testing"

"github.com/intelsdi-x/snap/control/plugin"
"github.com/intelsdi-x/snap/core"

. "github.com/smartystreets/goconvey/convey"
)

const (
foo = "/intel/foo"
bar = "/intel/foo/bar"
tar = "/intel/tar/qaz"
)

func TestAddTagsFromWorkflow(t *testing.T) {
hostnameReader = &mockHostnameReader{}
tcs := prepareTestCases()
Convey("Adding tags to metric type", t, func() {
for _, tc := range tcs {
outputTags := addStandardAndWorkflowTags(tc.Metric, tc.InputTags).Tags()
So(outputTags, ShouldNotBeNil)
So(outputTags, ShouldResemble, tc.ExpectedTags)
}
})
}

type mockHostnameReader struct{}

func (m *mockHostnameReader) Hostname() (string, error) {
return "hostname", nil
}

type testCase struct {
Metric plugin.MetricType
InputTags map[string]map[string]string
ExpectedTags map[string]string
}

func prepareTestCases() []testCase {
hostname, _ := hostnameReader.Hostname()
fooTags := map[string]string{
"foo_tag": "foo_val",
}
barTags := map[string]string{
"foobar_tag": "foobar_val",
}
tarTags := map[string]string{
"tarqaz_tag": "tarqaz_val",
}

allTags := map[string]map[string]string{
foo: fooTags,
bar: barTags,
tar: tarTags,
}

foobazMetric := plugin.MetricType{
Namespace_: core.NewNamespace("intel", "foo", "baz"),
}
foobarMetric := plugin.MetricType{
Namespace_: core.NewNamespace("intel", "foo", "bar"),
}
tarqazMetric := plugin.MetricType{
Namespace_: core.NewNamespace("intel", "tar", "qaz"),
}

stdMetric := plugin.MetricType{
Namespace_: core.NewNamespace("intel", "std"),
}

foobazExpected := map[string]string{
core.STD_TAG_PLUGIN_RUNNING_ON: hostname,
"foo_tag": "foo_val",
}
foobarExpected := map[string]string{
core.STD_TAG_PLUGIN_RUNNING_ON: hostname,
"foo_tag": "foo_val",
"foobar_tag": "foobar_val",
}
tarqazExpected := map[string]string{
core.STD_TAG_PLUGIN_RUNNING_ON: hostname,
"tarqaz_tag": "tarqaz_val",
}
stdExpected := map[string]string{
core.STD_TAG_PLUGIN_RUNNING_ON: hostname,
}

testCases := []testCase{
{foobazMetric, allTags, foobazExpected},
{foobarMetric, allTags, foobarExpected},
{tarqazMetric, allTags, tarqazExpected},
{stdMetric, allTags, stdExpected},
{foobazMetric, nil, stdExpected},
}

return testCases
}
2 changes: 1 addition & 1 deletion control/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter
}

//Add standard tags
nmt = addStandardTags(nmt)
nmt = addStandardAndWorkflowTags(nmt, nil)

if err := p.metricCatalog.AddLoadedMetricType(lPlugin, nmt); err != nil {
pmLogger.WithFields(log.Fields{
Expand Down
27 changes: 25 additions & 2 deletions scheduler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,24 @@ type collectorJob struct {
metricTypes []core.RequestedMetric
metrics []core.Metric
configDataTree *cdata.ConfigDataTree
tags map[string]map[string]string
}

func newCollectorJob(metricTypes []core.RequestedMetric, deadlineDuration time.Duration, collector collectsMetrics, cdt *cdata.ConfigDataTree, taskID string) job {
func newCollectorJob(
metricTypes []core.RequestedMetric,
deadlineDuration time.Duration,
collector collectsMetrics,
cdt *cdata.ConfigDataTree,
taskID string,
tags map[string]map[string]string,
) job {
return &collectorJob{
collector: collector,
metricTypes: metricTypes,
metrics: []core.Metric{},
coreJob: newCoreJob(collectJobType, time.Now().Add(deadlineDuration), taskID, "", 0),
configDataTree: cdt,
tags: tags,
}
}

Expand Down Expand Up @@ -217,6 +226,19 @@ func (c *collectorJob) Run() {
"metric-count": len(c.metricTypes),
}).Debug("starting collector job")

for ns, tags := range c.tags {
for k, v := range tags {
log.WithFields(log.Fields{
"_module": "scheduler-job",
"block": "run",
"job-type": "collector",
"ns": ns,
"tag-key": k,
"tag-val": v,
}).Debug("Tags sent to collectorJob")
}
}

metrics := []core.Metric{}
for _, rmt := range c.metricTypes {
nss, err := c.collector.ExpandWildcards(rmt.Namespace())
Expand All @@ -231,6 +253,7 @@ func (c *collectorJob) Run() {
if config == nil {
config = cdata.NewNode()
}

metric := &metric{
namespace: ns,
version: rmt.Version(),
Expand All @@ -240,7 +263,7 @@ func (c *collectorJob) Run() {
}
}

ret, errs := c.collector.CollectMetrics(metrics, c.Deadline(), c.TaskID())
ret, errs := c.collector.CollectMetrics(metrics, c.Deadline(), c.TaskID(), c.tags)

log.WithFields(log.Fields{
"_module": "scheduler-job",
Expand Down
Loading

0 comments on commit a54dbd2

Please sign in to comment.