From a54dbd2e37392090a5ac755316dcd09acd6a56fe Mon Sep 17 00:00:00 2001 From: Marcin Krolik Date: Mon, 18 Apr 2016 17:23:38 +0200 Subject: [PATCH] Tagging metrics via task manifest --- control/control.go | 16 ++- control/control_test.go | 26 ++-- control/metrics.go | 33 ++++- control/metrics_small_test.go | 121 ++++++++++++++++ control/plugin_manager.go | 2 +- scheduler/job.go | 27 +++- scheduler/job_test.go | 23 +-- scheduler/scheduler.go | 4 +- scheduler/scheduler_test.go | 2 +- scheduler/wmap/fixtures/tasks.go | 135 ++++++++++++++++++ scheduler/wmap/sample/1.json | 9 ++ scheduler/wmap/sample/1.yml | 6 + scheduler/wmap/string.go | 8 ++ scheduler/wmap/wmap.go | 5 + scheduler/wmap/wmap_small_test.go | 225 ++++++++++++++++++++++++++++++ scheduler/wmap/wmap_test.go | 30 ++++ scheduler/workflow.go | 5 +- scheduler/workflow_test.go | 8 +- 18 files changed, 646 insertions(+), 39 deletions(-) create mode 100644 control/metrics_small_test.go create mode 100644 scheduler/wmap/fixtures/tasks.go create mode 100644 scheduler/wmap/wmap_small_test.go diff --git a/control/control.go b/control/control.go index 283f6532d..96843a456 100644 --- a/control/control.go +++ b/control/control.go @@ -897,7 +897,19 @@ func (p *pluginControl) MetricExists(mns core.Namespace, ver int) bool { // CollectMetrics is a blocking call to collector plugins returning a collection // of metrics and errors. If an error is encountered no metrics will be // returned. -func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time.Time, taskID string) (metrics []core.Metric, errs []error) { +func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time.Time, taskID string, allTags map[string]map[string]string) (metrics []core.Metric, errs []error) { + for ns, nsTags := range allTags { + for k, v := range nsTags { + log.WithFields(log.Fields{ + "_module": "control", + "block": "CollectMetrics", + "type": "pluginCollector", + "ns": ns, + "tag-key": k, + "tag-val": v, + }).Debug("Tags in CollectMetrics") + } + } pluginToMetricMap, err := groupMetricTypesByPlugin(p.metricCatalog, metricTypes) if err != nil { @@ -936,7 +948,7 @@ func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time. // plugin authors to inadvertently overwrite or not pass along the data // passed to CollectMetrics so we will help them out here. for i := range m { - m[i] = addStandardTags(m[i]) + m[i] = addStandardAndWorkflowTags(m[i], allTags) } metrics = append(metrics, m...) wg.Done() diff --git a/control/control_test.go b/control/control_test.go index cb69ca324..cda9d59fc 100644 --- a/control/control_test.go +++ b/control/control_test.go @@ -933,7 +933,7 @@ func TestRoutingCachingStrategy(t *testing.T) { Convey("Collect metrics", func() { taskID := tasks[rand.Intn(len(tasks))] for i := 0; i < 10; i++ { - _, errs := c.CollectMetrics([]core.Metric{metric}, time.Now().Add(time.Second*1), taskID) + _, errs := c.CollectMetrics([]core.Metric{metric}, time.Now().Add(time.Second*1), taskID, nil) So(errs, ShouldBeEmpty) } Convey("Check cache stats", func() { @@ -995,7 +995,7 @@ func TestRoutingCachingStrategy(t *testing.T) { Convey("Collect metrics", func() { taskID := tasks[rand.Intn(len(tasks))] for i := 0; i < 10; i++ { - cr, errs := c.CollectMetrics([]core.Metric{metric}, time.Now().Add(time.Second*1), taskID) + cr, errs := c.CollectMetrics([]core.Metric{metric}, time.Now().Add(time.Second*1), taskID, nil) So(errs, ShouldBeEmpty) for i := range cr { So(cr[i].Data(), ShouldContainSubstring, "The mock collected data!") @@ -1078,13 +1078,13 @@ func TestCollectDynamicMetrics(t *testing.T) { So(err, ShouldBeNil) // The minimum TTL advertised by the plugin is 100ms therefore the TTL for th // pool should be the global cache expiration So(ttl, ShouldEqual, strategy.GlobalCacheExpiration) - mts, errs := c.CollectMetrics([]core.Metric{m}, time.Now().Add(time.Second*1), taskID) + mts, errs := c.CollectMetrics([]core.Metric{m}, time.Now().Add(time.Second*1), taskID, nil) hits, err := pool.CacheHits(m.namespace.String(), 2, taskID) So(err, ShouldBeNil) So(hits, ShouldEqual, 0) So(errs, ShouldBeNil) So(len(mts), ShouldEqual, 10) - mts, errs = c.CollectMetrics([]core.Metric{m}, time.Now().Add(time.Second*1), taskID) + mts, errs = c.CollectMetrics([]core.Metric{m}, time.Now().Add(time.Second*1), taskID, nil) hits, err = pool.CacheHits(m.namespace.String(), 2, taskID) So(err, ShouldBeNil) @@ -1117,7 +1117,7 @@ func TestCollectDynamicMetrics(t *testing.T) { ttl, err = pool.CacheTTL(taskID) So(err, ShouldBeNil) So(ttl, ShouldEqual, 1100*time.Millisecond) - mts, errs := c.CollectMetrics([]core.Metric{jsonm}, time.Now().Add(time.Second*1), uuid.New()) + mts, errs := c.CollectMetrics([]core.Metric{jsonm}, time.Now().Add(time.Second*1), uuid.New(), nil) hits, err := pool.CacheHits(jsonm.namespace.String(), jsonm.version, taskID) So(pool.SubscriptionCount(), ShouldEqual, 1) So(pool.Strategy, ShouldNotBeNil) @@ -1126,7 +1126,7 @@ func TestCollectDynamicMetrics(t *testing.T) { So(hits, ShouldEqual, 0) So(errs, ShouldBeNil) So(len(mts), ShouldEqual, 10) - mts, errs = c.CollectMetrics([]core.Metric{jsonm}, time.Now().Add(time.Second*1), uuid.New()) + mts, errs = c.CollectMetrics([]core.Metric{jsonm}, time.Now().Add(time.Second*1), uuid.New(), nil) hits, err = pool.CacheHits(m.namespace.String(), 1, taskID) So(err, ShouldBeNil) @@ -1196,7 +1196,7 @@ func TestFailedPlugin(t *testing.T) { var cr []core.Metric eventMap := map[string]int{} for i := 0; i < MaxPluginRestartCount+1; i++ { - cr, err = c.CollectMetrics(m, time.Now().Add(time.Second*1), uuid.New()) + cr, err = c.CollectMetrics(m, time.Now().Add(time.Second*1), uuid.New(), nil) So(err, ShouldNotBeNil) So(cr, ShouldBeNil) <-lpe.done @@ -1279,7 +1279,7 @@ func TestCollectMetrics(t *testing.T) { m = append(m, m1, m2, m3) Convey("collect metrics", func() { for x := 0; x < 4; x++ { - cr, err := c.CollectMetrics(m, time.Now().Add(time.Second*1), uuid.New()) + cr, err := c.CollectMetrics(m, time.Now().Add(time.Second*1), uuid.New(), nil) So(err, ShouldBeNil) for i := range cr { So(cr[i].Data(), ShouldContainSubstring, "The mock collected data!") @@ -1311,7 +1311,7 @@ func TestCollectMetrics(t *testing.T) { c.Start() load(c, PluginPath) m := []core.Metric{} - c.CollectMetrics(m, time.Now().Add(time.Second*60), uuid.New()) + c.CollectMetrics(m, time.Now().Add(time.Second*60), uuid.New(), nil) c.Stop() time.Sleep(100 * time.Millisecond) }) @@ -1648,7 +1648,7 @@ func TestMetricSubscriptionToNewVersion(t *testing.T) { serr := c.SubscribeDeps("testTaskID", []core.Metric{metric}, []core.Plugin{}) So(serr, ShouldBeNil) // collect metrics as a sanity check that everything is setup correctly - mts, errs := c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID") + mts, errs := c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID", nil) So(errs, ShouldBeNil) So(len(mts), ShouldEqual, 1) // ensure the data coming back is from v1. V1's data is type string @@ -1678,7 +1678,7 @@ func TestMetricSubscriptionToNewVersion(t *testing.T) { So(errp, ShouldBeNil) So(pool2.SubscriptionCount(), ShouldEqual, 1) - mts, errs = c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID") + mts, errs = c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID", nil) So(len(mts), ShouldEqual, 1) // ensure the data coming back is from v2, V2's data is type int @@ -1709,7 +1709,7 @@ func TestMetricSubscriptionToOlderVersion(t *testing.T) { serr := c.SubscribeDeps("testTaskID", []core.Metric{metric}, []core.Plugin{}) So(serr, ShouldBeNil) // collect metrics as a sanity check that everything is setup correctly - mts, errs := c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID") + mts, errs := c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID", nil) So(errs, ShouldBeNil) So(len(mts), ShouldEqual, 1) // ensure the data coming back is from v2. V2's data is type int @@ -1744,7 +1744,7 @@ func TestMetricSubscriptionToOlderVersion(t *testing.T) { So(errp, ShouldBeNil) So(pool2.SubscriptionCount(), ShouldEqual, 1) - mts, errs = c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID") + mts, errs = c.CollectMetrics([]core.Metric{metric}, time.Now(), "testTaskID", nil) So(errs, ShouldBeEmpty) So(len(mts), ShouldEqual, 1) diff --git a/control/metrics.go b/control/metrics.go index 72ade01d5..08fbd2321 100644 --- a/control/metrics.go +++ b/control/metrics.go @@ -49,8 +49,25 @@ var ( "carets": {"^"}, "quotations": {"\"", "`", "'"}, } + + hostnameReader hostnamer ) +// hostnameReader, hostnamer created for mocking +func init() { + hostnameReader = &hostnameReaderType{} +} + +type hostnamer interface { + Hostname() (name string, err error) +} + +type hostnameReaderType struct{} + +func (h *hostnameReaderType) Hostname() (name string, err error) { + return os.Hostname() +} + func errorMetricNotFound(ns string, ver ...int) error { if len(ver) > 0 { return fmt.Errorf("Metric not found: %s (version: %d)", ns, ver[0]) @@ -598,13 +615,13 @@ func getVersion(c []*metricType, ver int) (*metricType, error) { return nil, errMetricNotFound } -func addStandardTags(m core.Metric) core.Metric { - hostname, err := os.Hostname() +func addStandardAndWorkflowTags(m core.Metric, allTags map[string]map[string]string) core.Metric { + hostname, err := hostnameReader.Hostname() if err != nil { log.WithFields(log.Fields{ "_module": "control", "_file": "metrics.go,", - "_block": "addStandardTags", + "_block": "addStandardAndWorkflowTags", "error": err.Error(), }).Error("Unable to determine hostname") } @@ -612,7 +629,17 @@ func addStandardTags(m core.Metric) core.Metric { if tags == nil { tags = map[string]string{} } + // apply tags from workflow + for ns, nsTags := range allTags { + if strings.HasPrefix(m.Namespace().String(), ns) { + for k, v := range nsTags { + tags[k] = v + } + } + } + // apply standard tag tags[core.STD_TAG_PLUGIN_RUNNING_ON] = hostname + metric := plugin.MetricType{ Namespace_: m.Namespace(), Version_: m.Version(), diff --git a/control/metrics_small_test.go b/control/metrics_small_test.go new file mode 100644 index 000000000..a902ee184 --- /dev/null +++ b/control/metrics_small_test.go @@ -0,0 +1,121 @@ +// +build small + +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015-2016 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package control + +import ( + "testing" + + "github.com/intelsdi-x/snap/control/plugin" + "github.com/intelsdi-x/snap/core" + + . "github.com/smartystreets/goconvey/convey" +) + +const ( + foo = "/intel/foo" + bar = "/intel/foo/bar" + tar = "/intel/tar/qaz" +) + +func TestAddTagsFromWorkflow(t *testing.T) { + hostnameReader = &mockHostnameReader{} + tcs := prepareTestCases() + Convey("Adding tags to metric type", t, func() { + for _, tc := range tcs { + outputTags := addStandardAndWorkflowTags(tc.Metric, tc.InputTags).Tags() + So(outputTags, ShouldNotBeNil) + So(outputTags, ShouldResemble, tc.ExpectedTags) + } + }) +} + +type mockHostnameReader struct{} + +func (m *mockHostnameReader) Hostname() (string, error) { + return "hostname", nil +} + +type testCase struct { + Metric plugin.MetricType + InputTags map[string]map[string]string + ExpectedTags map[string]string +} + +func prepareTestCases() []testCase { + hostname, _ := hostnameReader.Hostname() + fooTags := map[string]string{ + "foo_tag": "foo_val", + } + barTags := map[string]string{ + "foobar_tag": "foobar_val", + } + tarTags := map[string]string{ + "tarqaz_tag": "tarqaz_val", + } + + allTags := map[string]map[string]string{ + foo: fooTags, + bar: barTags, + tar: tarTags, + } + + foobazMetric := plugin.MetricType{ + Namespace_: core.NewNamespace("intel", "foo", "baz"), + } + foobarMetric := plugin.MetricType{ + Namespace_: core.NewNamespace("intel", "foo", "bar"), + } + tarqazMetric := plugin.MetricType{ + Namespace_: core.NewNamespace("intel", "tar", "qaz"), + } + + stdMetric := plugin.MetricType{ + Namespace_: core.NewNamespace("intel", "std"), + } + + foobazExpected := map[string]string{ + core.STD_TAG_PLUGIN_RUNNING_ON: hostname, + "foo_tag": "foo_val", + } + foobarExpected := map[string]string{ + core.STD_TAG_PLUGIN_RUNNING_ON: hostname, + "foo_tag": "foo_val", + "foobar_tag": "foobar_val", + } + tarqazExpected := map[string]string{ + core.STD_TAG_PLUGIN_RUNNING_ON: hostname, + "tarqaz_tag": "tarqaz_val", + } + stdExpected := map[string]string{ + core.STD_TAG_PLUGIN_RUNNING_ON: hostname, + } + + testCases := []testCase{ + {foobazMetric, allTags, foobazExpected}, + {foobarMetric, allTags, foobarExpected}, + {tarqazMetric, allTags, tarqazExpected}, + {stdMetric, allTags, stdExpected}, + {foobazMetric, nil, stdExpected}, + } + + return testCases +} diff --git a/control/plugin_manager.go b/control/plugin_manager.go index 50140e570..ceab224d7 100644 --- a/control/plugin_manager.go +++ b/control/plugin_manager.go @@ -407,7 +407,7 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter } //Add standard tags - nmt = addStandardTags(nmt) + nmt = addStandardAndWorkflowTags(nmt, nil) if err := p.metricCatalog.AddLoadedMetricType(lPlugin, nmt); err != nil { pmLogger.WithFields(log.Fields{ diff --git a/scheduler/job.go b/scheduler/job.go index 5bebf69a2..14566dca6 100644 --- a/scheduler/job.go +++ b/scheduler/job.go @@ -172,15 +172,24 @@ type collectorJob struct { metricTypes []core.RequestedMetric metrics []core.Metric configDataTree *cdata.ConfigDataTree + tags map[string]map[string]string } -func newCollectorJob(metricTypes []core.RequestedMetric, deadlineDuration time.Duration, collector collectsMetrics, cdt *cdata.ConfigDataTree, taskID string) job { +func newCollectorJob( + metricTypes []core.RequestedMetric, + deadlineDuration time.Duration, + collector collectsMetrics, + cdt *cdata.ConfigDataTree, + taskID string, + tags map[string]map[string]string, +) job { return &collectorJob{ collector: collector, metricTypes: metricTypes, metrics: []core.Metric{}, coreJob: newCoreJob(collectJobType, time.Now().Add(deadlineDuration), taskID, "", 0), configDataTree: cdt, + tags: tags, } } @@ -217,6 +226,19 @@ func (c *collectorJob) Run() { "metric-count": len(c.metricTypes), }).Debug("starting collector job") + for ns, tags := range c.tags { + for k, v := range tags { + log.WithFields(log.Fields{ + "_module": "scheduler-job", + "block": "run", + "job-type": "collector", + "ns": ns, + "tag-key": k, + "tag-val": v, + }).Debug("Tags sent to collectorJob") + } + } + metrics := []core.Metric{} for _, rmt := range c.metricTypes { nss, err := c.collector.ExpandWildcards(rmt.Namespace()) @@ -231,6 +253,7 @@ func (c *collectorJob) Run() { if config == nil { config = cdata.NewNode() } + metric := &metric{ namespace: ns, version: rmt.Version(), @@ -240,7 +263,7 @@ func (c *collectorJob) Run() { } } - ret, errs := c.collector.CollectMetrics(metrics, c.Deadline(), c.TaskID()) + ret, errs := c.collector.CollectMetrics(metrics, c.Deadline(), c.TaskID(), c.tags) log.WithFields(log.Fields{ "_module": "scheduler-job", diff --git a/scheduler/job_test.go b/scheduler/job_test.go index 7e1dc51b4..f5ea6871a 100644 --- a/scheduler/job_test.go +++ b/scheduler/job_test.go @@ -36,7 +36,7 @@ import ( type mockCollector struct{} -func (m *mockCollector) CollectMetrics([]core.Metric, time.Time, string) ([]core.Metric, []error) { +func (m *mockCollector) CollectMetrics([]core.Metric, time.Time, string, map[string]map[string]string) ([]core.Metric, []error) { return nil, nil } @@ -47,39 +47,41 @@ func (m *mockCollector) ExpandWildcards(core.Namespace) ([]core.Namespace, serro func TestCollectorJob(t *testing.T) { log.SetLevel(log.FatalLevel) cdt := cdata.NewTree() + //TODO: kromar do something with tags? + tags := map[string]map[string]string{} Convey("newCollectorJob()", t, func() { Convey("it returns an init-ed collectorJob", func() { - cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid") + cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid", tags) So(cj, ShouldHaveSameTypeAs, &collectorJob{}) }) }) Convey("StartTime()", t, func() { Convey("it should return the job starttime", func() { - cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid") + cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid", tags) So(cj.StartTime(), ShouldHaveSameTypeAs, time.Now()) }) }) Convey("Deadline()", t, func() { Convey("it should return the job daedline", func() { - cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid") + cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid", tags) So(cj.Deadline(), ShouldResemble, cj.(*collectorJob).deadline) }) }) Convey("Type()", t, func() { Convey("it should return the job type", func() { - cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid") + cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid", tags) So(cj.Type(), ShouldEqual, collectJobType) }) }) Convey("Errors()", t, func() { Convey("it should return the errors from the job", func() { - cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid") + cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid", tags) So(cj.Errors(), ShouldResemble, []error{}) }) }) Convey("AddErrors()", t, func() { Convey("it should append errors to the job", func() { - cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid") + cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid", tags) So(cj.Errors(), ShouldResemble, []error{}) e1 := errors.New("1") @@ -94,7 +96,7 @@ func TestCollectorJob(t *testing.T) { }) Convey("Run()", t, func() { Convey("it should complete without errors", func() { - cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid") + cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid", tags) cj.(*collectorJob).Run() So(cj.Errors(), ShouldResemble, []error{}) }) @@ -104,16 +106,17 @@ func TestCollectorJob(t *testing.T) { func TestQueuedJob(t *testing.T) { log.SetLevel(log.FatalLevel) cdt := cdata.NewTree() + tags := map[string]map[string]string{} Convey("Job()", t, func() { Convey("it should return the underlying job", func() { - cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid") + cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid", tags) qj := newQueuedJob(cj) So(qj.Job(), ShouldEqual, cj) }) }) Convey("Promise()", t, func() { Convey("it should return the underlying promise", func() { - cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid") + cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid", tags) qj := newQueuedJob(cj) So(qj.Promise().IsComplete(), ShouldBeFalse) }) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 9b445f1ab..e16d96a9a 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -22,7 +22,6 @@ package scheduler import ( "errors" "fmt" - // "strings" "time" log "github.com/Sirupsen/logrus" @@ -84,7 +83,7 @@ type managesPluginContentTypes interface { type collectsMetrics interface { ExpandWildcards(core.Namespace) ([]core.Namespace, serror.SnapError) - CollectMetrics([]core.Metric, time.Time, string) ([]core.Metric, []error) + CollectMetrics([]core.Metric, time.Time, string, map[string]map[string]string) ([]core.Metric, []error) } type publishesMetrics interface { @@ -204,6 +203,7 @@ func (s *scheduler) createTask(sch schedule.Schedule, wfMap *wmap.WorkflowMap, s } // validate plugins and metrics + mts, plugins := s.gatherMetricsAndPlugins(wf) errs := s.metricManager.ValidateDeps(mts, plugins) if len(errs) > 0 { diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go index 575a66c8d..78fec0da8 100644 --- a/scheduler/scheduler_test.go +++ b/scheduler/scheduler_test.go @@ -81,7 +81,7 @@ func (m *mockMetricManager) GetPluginContentTypes(n string, t core.PluginType, v return m.acceptedContentTypes[key], m.returnedContentTypes[key], nil } -func (m *mockMetricManager) CollectMetrics([]core.Metric, time.Time, string) ([]core.Metric, []error) { +func (m *mockMetricManager) CollectMetrics([]core.Metric, time.Time, string, map[string]map[string]string) ([]core.Metric, []error) { return nil, nil } diff --git a/scheduler/wmap/fixtures/tasks.go b/scheduler/wmap/fixtures/tasks.go new file mode 100644 index 000000000..33b6f15d4 --- /dev/null +++ b/scheduler/wmap/fixtures/tasks.go @@ -0,0 +1,135 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fixtures + +var TaskJSON = ` +{ + "collect": { + "metrics": { + "/foo/bar": { + "version": 1 + }, + "/foo/baz": {} + }, + "config": { + "/foo/bar": { + "password": "drowssap", + "user": "root" + } + }, + "tags": { + "/foo/bar": { + "tag1": "val1", + "tag2": "val2" + }, + "/foo/baz": { + "tag3": "val3" + } + }, + "process": [ + { + "plugin_name": "floor", + "plugin_version": 1, + "process": [ + { + "plugin_name": "oslo", + "plugin_version": 1, + "process": null, + "publish": null, + "config": { + "version": "kilo" + } + } + ], + "publish": [ + { + "plugin_name": "rabbitmq", + "plugin_version": 5, + "config": { + "port": 5672, + "server": "localhost" + } + } + ], + "config": { + "something": true, + "somethingelse": false + } + } + ], + "publish": [ + { + "plugin_name": "riemann", + "plugin_version": 3, + "config": { + "port": 8080, + "user": "root" + } + } + ] + } +} +` + +var TaskYAML = ` +--- + collect: + metrics: + /foo/bar: + version: 1 + /foo/baz: + config: + /foo/bar: + user: "root" + password: "drowssap" + tags: + /foo/bar: + tag1: "val1" + tag2: "val2" + /foo/baz: + tag3: "val3" + process: + - + plugin_name: "floor" + plugin_version: 1 + config: + something: true + somethingelse: false + process: + - + plugin_name: oslo + plugin_version: 1 + config: + version: kilo + publish: + - + plugin_name: "rabbitmq" + plugin_version: 5 + config: + server: "localhost" + port: 5672 + publish: + - + plugin_name: "riemann" + plugin_version: 3 + config: + user: "root" + port: 8080 +` diff --git a/scheduler/wmap/sample/1.json b/scheduler/wmap/sample/1.json index 78ed62688..c336327a8 100644 --- a/scheduler/wmap/sample/1.json +++ b/scheduler/wmap/sample/1.json @@ -12,6 +12,15 @@ "user": "root" } }, + "tags": { + "/foo/bar": { + "tag1": "val1", + "tag2": "val2" + }, + "/foo/baz": { + "tag3": "val3" + } + }, "process": [ { "plugin_name": "floor", diff --git a/scheduler/wmap/sample/1.yml b/scheduler/wmap/sample/1.yml index 148de0971..8eb6b449f 100644 --- a/scheduler/wmap/sample/1.yml +++ b/scheduler/wmap/sample/1.yml @@ -8,6 +8,12 @@ /foo/bar: user: "root" password: "drowssap" + tags: + /foo/bar: + tag1: "val1" + tag2: "val2" + /foo/baz: + tag3: "val3" process: - plugin_name: "floor" diff --git a/scheduler/wmap/string.go b/scheduler/wmap/string.go index db516b316..014e7ee5b 100644 --- a/scheduler/wmap/string.go +++ b/scheduler/wmap/string.go @@ -52,6 +52,14 @@ func (c *CollectWorkflowMapNode) String(pad string) string { } } out += "\n" + out += pad + "Tags:\n" + for k, v := range c.Tags { + out += pad + " " + k + "\n" + for x, y := range v { + out += pad + " " + fmt.Sprintf("%s=%+v\n", x, y) + } + } + out += "\n" out += pad + "Process Nodes:\n" for _, pr := range c.ProcessNodes { out += pr.String(pad) diff --git a/scheduler/wmap/wmap.go b/scheduler/wmap/wmap.go index 3c37b75b6..f1748d5c7 100644 --- a/scheduler/wmap/wmap.go +++ b/scheduler/wmap/wmap.go @@ -153,6 +153,7 @@ func (w *WorkflowMap) ToYaml() ([]byte, error) { type CollectWorkflowMapNode struct { Metrics map[string]metricInfo `json:"metrics"yaml:"metrics"` Config map[string]map[string]interface{} `json:"config,omitempty"yaml:"config"` + Tags map[string]map[string]string `json:"tags,omitempty"yaml:"tags"` ProcessNodes []ProcessWorkflowMapNode `json:"process,omitempty"yaml:"process"` PublishNodes []PublishWorkflowMapNode `json:"publish,omitempty"yaml:"publish"` } @@ -171,6 +172,10 @@ func (c *CollectWorkflowMapNode) GetMetrics() []Metric { return metrics } +func (c *CollectWorkflowMapNode) GetTags() map[string]map[string]string { + return c.Tags +} + // GetConfigTree converts config data for collection node in wmap into a proper cdata.ConfigDataTree func (c *CollectWorkflowMapNode) GetConfigTree() (*cdata.ConfigDataTree, error) { cdt := cdata.NewTree() diff --git a/scheduler/wmap/wmap_small_test.go b/scheduler/wmap/wmap_small_test.go new file mode 100644 index 000000000..4b19b6f31 --- /dev/null +++ b/scheduler/wmap/wmap_small_test.go @@ -0,0 +1,225 @@ +// + build small + +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package wmap + +import ( + "testing" + + . "github.com/smartystreets/goconvey/convey" + + "github.com/intelsdi-x/snap/scheduler/wmap/fixtures" +) + +func TestWorkflowFromYAML(t *testing.T) { + Convey("Workflow map from yaml", t, func() { + wmap, err := FromYaml(fixtures.TaskYAML) + So(err, ShouldBeNil) + So(wmap, ShouldNotBeNil) + + wmap, err = FromYaml(1) + So(err, ShouldNotBeEmpty) + So(wmap, ShouldBeNil) + }) +} + +func TestWorkflowFromJSON(t *testing.T) { + Convey("Workflow map from json", t, func() { + wmap, err := FromJson(fixtures.TaskJSON) + So(err, ShouldBeNil) + So(wmap, ShouldNotBeNil) + + wmap, err = FromJson(1) + So(err, ShouldNotBeEmpty) + So(wmap, ShouldBeNil) + }) +} + +func TestSampleWorkflows(t *testing.T) { + Convey("Sampling workflow map to json", t, func() { + So(SampleWorkflowMapJson(), ShouldNotBeEmpty) + }) + + Convey("Sampling workflow map to yaml", t, func() { + So(SampleWorkflowMapYaml(), ShouldNotBeEmpty) + }) + +} + +func TestTagsOnWorkflow(t *testing.T) { + Convey("Extracting tags from workflow", t, func() { + Convey("From JSON", func() { + wmap, _ := FromJson(fixtures.TaskJSON) + tags := wmap.CollectNode.GetTags() + So(tags, ShouldNotBeNil) + So(tags, ShouldResemble, map[string]map[string]string{ + "/foo/bar": { + "tag1": "val1", + "tag2": "val2", + }, + "/foo/baz": { + "tag3": "val3", + }, + }) + }) + + Convey("From YAML", func() { + wmap, _ := FromYaml(fixtures.TaskYAML) + tags := wmap.CollectNode.GetTags() + So(tags, ShouldNotBeNil) + So(tags, ShouldResemble, map[string]map[string]string{ + "/foo/bar": { + "tag1": "val1", + "tag2": "val2", + }, + "/foo/baz": { + "tag3": "val3", + }, + }) + }) + + }) +} + +func TestWfGetRequestedMetrics(t *testing.T) { + Convey("NewWorkFlowMap()/GetRequestedMetrics()", t, func() { + wmap := NewWorkflowMap() + So(wmap, ShouldNotBeNil) + So(wmap.CollectNode.GetMetrics(), ShouldBeEmpty) + wmap.CollectNode.AddMetric("/foo/bar", 1) + So(wmap.CollectNode.GetMetrics(), ShouldNotBeEmpty) + wmap.CollectNode.GetMetrics()[0].Namespace() + So(wmap.CollectNode.GetMetrics()[0].Namespace(), ShouldResemble, []string{"foo", "bar"}) + wmap.CollectNode.GetMetrics()[0].Version() + So(wmap.CollectNode.GetMetrics()[0].Version(), ShouldResemble, 1) + }) +} + +func TestWfAddConfigItem(t *testing.T) { + Convey("AddMetric()/AddConfigItem()", t, func() { + wmap := NewWorkflowMap() + So(wmap, ShouldNotBeNil) + So(wmap.CollectNode.Metrics, ShouldBeEmpty) + wmap.CollectNode.AddMetric("/foo/bar", 1) + So(wmap.CollectNode.Metrics, ShouldNotBeEmpty) + So(wmap.CollectNode.Config, ShouldBeEmpty) + wmap.CollectNode.AddConfigItem("/foo/bar", "user", "bob") + So(wmap.CollectNode.Config, ShouldNotBeEmpty) + }) +} + +func TestWfPublishProcessNodes(t *testing.T) { + Convey("Add()/New Process/New Publish nodes", t, func() { + wmap := NewWorkflowMap() + wmap.CollectNode.AddConfigItem("/foo/bar", "user", "stu") + + pr1 := &ProcessWorkflowMapNode{ + Name: "oslo", + Version: 1, + Config: make(map[string]interface{}), + } + + pr1.Config["version"] = "kilo" + + //NewProcessNode, NewPublishNode + pr2 := NewProcessNode("floor", 1) + pu1 := NewPublishNode("isis", 1) + pu2 := NewPublishNode("zorro", 1) + + //Collect Node Add + wmap.CollectNode.Add(pr1) //case process node + wmap.CollectNode.Add(pu1) //case publish node + wmap.CollectNode.Add(wmap.CollectNode) //case default + + So(wmap.CollectNode.ProcessNodes, ShouldNotBeEmpty) + So(wmap.CollectNode.PublishNodes, ShouldNotBeEmpty) + + //Process Node Add + wmap.CollectNode.ProcessNodes[0].Add(pr2) + wmap.CollectNode.ProcessNodes[0].Add(pu2) + wmap.CollectNode.ProcessNodes[0].Add(wmap.CollectNode) + + So(wmap.CollectNode.ProcessNodes[0].ProcessNodes, ShouldNotBeEmpty) + So(wmap.CollectNode.ProcessNodes[0].PublishNodes, ShouldNotBeEmpty) + + //GetConfigNode() nil case + cn, err := wmap.CollectNode.ProcessNodes[0].ProcessNodes[0].GetConfigNode() + So(cn, ShouldNotBeEmpty) + So(err, ShouldBeNil) + cn, err = wmap.CollectNode.PublishNodes[0].GetConfigNode() + So(cn, ShouldNotBeEmpty) + So(err, ShouldBeNil) + + }) + +} + +func TestWfGetConfigNodeTree(t *testing.T) { + Convey("Gets the config tree and the config node", t, func() { + wmap := NewWorkflowMap() + wmap.CollectNode.AddConfigItem("/foo/bar", "user", "stu") + pu1 := NewPublishNode("stuff", 1) + pr1 := NewProcessNode("name", 1) + pr2 := NewProcessNode("thing", 1) + pr3 := NewProcessNode("thing", 1) + + wmap.CollectNode.Add(pu1) + wmap.CollectNode.Add(pr1) + wmap.CollectNode.Add(pr2) + + wmap.CollectNode.ProcessNodes[0].Add(pr3) + wmap.CollectNode.PublishNodes[0].AddConfigItem("key", 1) + wmap.CollectNode.ProcessNodes[0].AddConfigItem("key", 3.14) + wmap.CollectNode.ProcessNodes[1].AddConfigItem("key", true) + wmap.CollectNode.ProcessNodes[0].ProcessNodes[0].AddConfigItem("key", struct{}{}) + + pu1conf, err2 := wmap.CollectNode.PublishNodes[0].GetConfigNode() + So(pu1conf, ShouldNotBeEmpty) + So(err2, ShouldBeNil) + + pr1conf, err3 := wmap.CollectNode.ProcessNodes[0].GetConfigNode() + So(pr1conf, ShouldNotBeEmpty) + So(err3, ShouldBeNil) + + pr2conf, err3 := wmap.CollectNode.ProcessNodes[1].GetConfigNode() + So(pr2conf, ShouldNotBeEmpty) + So(err3, ShouldBeNil) + + pr3conf, err4 := wmap.CollectNode.ProcessNodes[0].ProcessNodes[0].GetConfigNode() + So(pr3conf, ShouldNotBeEmpty) + So(err4, ShouldNotBeNil) + + ctree, err := wmap.CollectNode.GetConfigTree() + So(ctree, ShouldNotBeEmpty) + So(err, ShouldBeNil) + }) +} + +func TestStringByteConvertion(t *testing.T) { + Convey("Converts strings to bytes or keeps byte type", t, func() { + p, err := inStringBytes("test") + So(p, ShouldResemble, []byte("test")) + So(err, ShouldBeNil) + p, err = inStringBytes(1) + So(p, ShouldBeEmpty) + So(err, ShouldNotBeNil) + }) +} diff --git a/scheduler/wmap/wmap_test.go b/scheduler/wmap/wmap_test.go index 144090ddc..c7b196dec 100644 --- a/scheduler/wmap/wmap_test.go +++ b/scheduler/wmap/wmap_test.go @@ -62,6 +62,36 @@ func TestWorkflow(t *testing.T) { So(SampleWorkflowMapYaml(), ShouldNotBeEmpty) }) + Convey("from json/CollectNode.GetTags()", func() { + wmap, _ := FromJson(jsonP) + tags := wmap.CollectNode.GetTags() + So(tags, ShouldNotBeNil) + So(tags, ShouldResemble, map[string]map[string]string{ + "/foo/bar": { + "tag1": "val1", + "tag2": "val2", + }, + "/foo/baz": { + "tag3": "val3", + }, + }) + }) + + Convey("from yaml/CollectNode.GetTags()", func() { + wmap, _ := FromYaml(jsonP) + tags := wmap.CollectNode.GetTags() + So(tags, ShouldNotBeNil) + So(tags, ShouldResemble, map[string]map[string]string{ + "/foo/bar": { + "tag1": "val1", + "tag2": "val2", + }, + "/foo/baz": { + "tag3": "val3", + }, + }) + }) + Convey("NewWorkFlowMap()/GetRequestedMetrics()", func() { wmap := NewWorkflowMap() So(wmap, ShouldNotBeNil) diff --git a/scheduler/workflow.go b/scheduler/workflow.go index 9c5bf18d3..27064e089 100644 --- a/scheduler/workflow.go +++ b/scheduler/workflow.go @@ -89,6 +89,8 @@ func convertCollectionNode(cnode *wmap.CollectWorkflowMapNode, wf *schedulerWork for i, m := range mts { wf.metrics[i] = &metric{namespace: core.NewNamespace(m.Namespace()...), version: m.Version()} } + // get tags defined + wf.tags = cnode.GetTags() // Get our config data tree cdt, err := cnode.GetConfigTree() @@ -177,6 +179,7 @@ type schedulerWorkflow struct { // workflowMap used to generate this workflow workflowMap *wmap.WorkflowMap eventEmitter gomit.Emitter + tags map[string]map[string]string } type processNode struct { @@ -309,7 +312,7 @@ func (s *schedulerWorkflow) Start(t *task) { "task-name": t.name, }).Info(fmt.Sprintf("Starting workflow for task (%s\\%s)", t.id, t.name)) s.state = WorkflowStarted - j := newCollectorJob(s.metrics, t.deadlineDuration, t.metricsManager, t.workflow.configTree, t.id) + j := newCollectorJob(s.metrics, t.deadlineDuration, t.metricsManager, t.workflow.configTree, t.id, s.tags) // dispatch 'collect' job to be worked // Block until the job has been either run or skipped. diff --git a/scheduler/workflow_test.go b/scheduler/workflow_test.go index 227f6f791..45fb8189c 100644 --- a/scheduler/workflow_test.go +++ b/scheduler/workflow_test.go @@ -264,7 +264,7 @@ type Mock1 struct { queue map[string]int } -func (m *Mock1) CollectMetrics([]core.Metric, time.Time, string) ([]core.Metric, []error) { +func (m *Mock1) CollectMetrics([]core.Metric, time.Time, string, map[string]map[string]string) ([]core.Metric, []error) { return nil, nil } @@ -324,7 +324,7 @@ func TestWorkJobs(t *testing.T) { Convey("Test speed and concurrency of TestWorkJobs\n", t, func() { Convey("submit multiple jobs\n", func() { m1 := &Mock1{queue: make(map[string]int)} - pj := newCollectorJob(nil, time.Second*1, m1, nil, "") + pj := newCollectorJob(nil, time.Second*1, m1, nil, "", nil) prs := make([]*processNode, 0) pus := make([]*publishNode, 0) counter := 0 @@ -344,7 +344,7 @@ func TestWorkJobs(t *testing.T) { }) Convey("submit multiple jobs with nesting", func() { m2 := &Mock1{queue: make(map[string]int)} - pj := newCollectorJob(nil, time.Second*1, m2, nil, "") + pj := newCollectorJob(nil, time.Second*1, m2, nil, "", nil) prs := make([]*processNode, 0) pus := make([]*publishNode, 0) counter := 0 @@ -385,7 +385,7 @@ func TestWorkJobs(t *testing.T) { m3 := &Mock1{queue: make(map[string]int)} // make the 13th job fail m3.errorIndex = 13 - pj := newCollectorJob(nil, time.Second*1, m3, nil, "") + pj := newCollectorJob(nil, time.Second*1, m3, nil, "", nil) prs := make([]*processNode, 0) pus := make([]*publishNode, 0) counter := 0