diff --git a/control/available_plugin.go b/control/available_plugin.go index 492d2ff2c..44aedaed6 100644 --- a/control/available_plugin.go +++ b/control/available_plugin.go @@ -376,6 +376,9 @@ func (p *apPool) applyPluginMeta(ap *availablePlugin) error { switch ap.meta.RoutingStrategy { case plugin.DefaultRouting: p.strategy = strategy.NewLRU(cacheTTL) + case plugin.StickyRouting: + p.strategy = strategy.NewSticky(cacheTTL) + ap.meta.ConcurrencyCount = 1 default: return ErrBadStrategy } @@ -442,43 +445,24 @@ func (p *apPool) kill(id uint32, reason string) { } } -// kills the plugin with the lowest hit count -func (p *apPool) killLeastUsed(reason string) { - p.Lock() - defer p.Unlock() - - var ( - id uint32 - prev int - ) - - // grab details from the first item - for _, p := range p.plugins { - prev = p.hitCount - id = p.id - break - } - - // walk through all and find the lowest hit count - for _, p := range p.plugins { - if p.hitCount < prev { - prev = p.hitCount - id = p.id - } +func (p *apPool) selectAndKill(taskID, reason string) { + sp := make([]strategy.SelectablePlugin, p.count()) + i := 0 + for _, plg := range p.plugins { + sp[i] = plg + i++ } - - // kill that ap - ap, ok := p.plugins[id] - if ok { - // only log on first ok health check + sap, err := p.strategy.Select(sp, taskID) + if err != nil || sap == nil { log.WithFields(log.Fields{ - "_module": "control-aplugin", - "block": "kill-least-used", - "aplugin": ap, - }).Debug("killing available plugin") - ap.Kill(reason) - delete(p.plugins, id) + "_block": "selectAndKill", + "taskID": taskID, + "reason": reason, + }).Error(err) + return } + p.kill(sap.(*availablePlugin).ID(), reason) + p.strategy.Remove(sap, taskID) } // remove removes an available plugin from the the pool. 
@@ -506,7 +490,7 @@ func (p *apPool) subscriptionCount() int { return len(p.subs) } -func (p *apPool) selectAP() (*availablePlugin, serror.SnapError) { +func (p *apPool) selectAP(taskID string) (*availablePlugin, serror.SnapError) { p.RLock() defer p.RUnlock() @@ -516,7 +500,7 @@ func (p *apPool) selectAP() (*availablePlugin, serror.SnapError) { sp[i] = plg i++ } - sap, err := p.strategy.Select(sp) + sap, err := p.strategy.Select(sp, taskID) if err != nil || sap == nil { return nil, serror.New(err) } @@ -550,20 +534,20 @@ type subscription struct { taskID string } -func (p *apPool) CheckCache(mts []core.Metric) ([]core.Metric, []core.Metric) { - return p.strategy.CheckCache(mts) +func (p *apPool) CheckCache(mts []core.Metric, taskID string) ([]core.Metric, []core.Metric) { + return p.strategy.CheckCache(mts, taskID) } -func (p *apPool) UpdateCache(mts []core.Metric) { - p.strategy.UpdateCache(mts) +func (p *apPool) UpdateCache(mts []core.Metric, taskID string) { + p.strategy.UpdateCache(mts, taskID) } -func (p *apPool) CacheHits(ns string, ver int) (uint64, error) { - return p.strategy.CacheHits(ns, ver) +func (p *apPool) CacheHits(ns string, ver int, taskID string) (uint64, error) { + return p.strategy.CacheHits(ns, ver, taskID) } -func (p *apPool) CacheMisses(ns string, ver int) (uint64, error) { - return p.strategy.CacheMisses(ns, ver) +func (p *apPool) CacheMisses(ns string, ver int, taskID string) (uint64, error) { + return p.strategy.CacheMisses(ns, ver, taskID) } func (p *apPool) AllCacheHits() uint64 { return p.strategy.AllCacheHits() @@ -573,11 +557,11 @@ func (p *apPool) AllCacheMisses() uint64 { return p.strategy.AllCacheMisses() } -func (p *apPool) CacheTTL() (time.Duration, error) { +func (p *apPool) CacheTTL(taskID string) (time.Duration, error) { if len(p.plugins) == 0 { return 0, ErrPoolEmpty } - return p.strategy.CacheTTL(), nil + return p.strategy.CacheTTL(taskID), nil } type availablePlugins struct { @@ -649,7 +633,7 @@ func (ap 
*availablePlugins) getPool(key string) (*apPool, serror.SnapError) { return pool, nil } -func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core.Metric) ([]core.Metric, error) { +func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core.Metric, taskID string) ([]core.Metric, error) { var results []core.Metric pool, serr := ap.getPool(pluginKey) if serr != nil { @@ -659,7 +643,7 @@ func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core. return nil, serror.New(ErrPoolNotFound, map[string]interface{}{"pool-key": pluginKey}) } - metricsToCollect, metricsFromCache := pool.CheckCache(metricTypes) + metricsToCollect, metricsFromCache := pool.CheckCache(metricTypes, taskID) if len(metricsToCollect) == 0 { return metricsFromCache, nil @@ -667,7 +651,7 @@ func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core. pool.RLock() defer pool.RUnlock() - p, serr := pool.selectAP() + p, serr := pool.selectAP(taskID) if serr != nil { return nil, serr } @@ -684,7 +668,7 @@ func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core. return nil, serror.New(err) } - pool.UpdateCache(metrics) + pool.UpdateCache(metrics, taskID) results = make([]core.Metric, len(metricsFromCache)+len(metrics)) idx := 0 @@ -704,7 +688,7 @@ func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core. 
return metrics, nil } -func (ap *availablePlugins) publishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue) []error { +func (ap *availablePlugins) publishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) []error { var errs []error key := strings.Join([]string{plugin.PublisherPluginType.String(), pluginName, strconv.Itoa(pluginVersion)}, ":") pool, serr := ap.getPool(key) @@ -718,7 +702,7 @@ func (ap *availablePlugins) publishMetrics(contentType string, content []byte, p pool.RLock() defer pool.RUnlock() - p, err := pool.selectAP() + p, err := pool.selectAP(taskID) if err != nil { errs = append(errs, err) return errs @@ -738,7 +722,7 @@ func (ap *availablePlugins) publishMetrics(contentType string, content []byte, p return nil } -func (ap *availablePlugins) processMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue) (string, []byte, []error) { +func (ap *availablePlugins) processMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) (string, []byte, []error) { var errs []error key := strings.Join([]string{plugin.ProcessorPluginType.String(), pluginName, strconv.Itoa(pluginVersion)}, ":") pool, serr := ap.getPool(key) @@ -752,7 +736,7 @@ func (ap *availablePlugins) processMetrics(contentType string, content []byte, p pool.RLock() defer pool.RUnlock() - p, err := pool.selectAP() + p, err := pool.selectAP(taskID) if err != nil { errs = append(errs, err) return "", nil, errs @@ -809,17 +793,17 @@ func (ap *availablePlugins) getOrCreatePool(key string) (*apPool, error) { return pool, nil } -func (ap *availablePlugins) selectAP(key string) (*availablePlugin, serror.SnapError) { - ap.RLock() - defer ap.RUnlock() +// func (ap *availablePlugins) selectAP(key string) 
(*availablePlugin, serror.SnapError) { +// ap.RLock() +// defer ap.RUnlock() - pool, err := ap.getPool(key) - if err != nil { - return nil, err - } +// pool, err := ap.getPool(key) +// if err != nil { +// return nil, err +// } - return pool.selectAP() -} +// return pool.selectAP() +// } func (ap *availablePlugins) pools() map[string]*apPool { ap.RLock() diff --git a/control/control.go b/control/control.go index b00ffbcfb..d5f9f4034 100644 --- a/control/control.go +++ b/control/control.go @@ -836,7 +836,7 @@ func (p *pluginControl) MetricExists(mns []string, ver int) bool { // CollectMetrics is a blocking call to collector plugins returning a collection // of metrics and errors. If an error is encountered no metrics will be // returned. -func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time.Time) (metrics []core.Metric, errs []error) { +func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time.Time, taskID string) (metrics []core.Metric, errs []error) { pluginToMetricMap, err := groupMetricTypesByPlugin(p.metricCatalog, metricTypes) if err != nil { errs = append(errs, err) @@ -859,7 +859,7 @@ func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time. wg.Add(1) go func(pluginKey string, mt []core.Metric) { - mts, err := p.pluginRunner.AvailablePlugins().collectMetrics(pluginKey, mt) + mts, err := p.pluginRunner.AvailablePlugins().collectMetrics(pluginKey, mt, taskID) if err != nil { cError <- err } else { @@ -893,23 +893,23 @@ func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time. 
} // PublishMetrics -func (p *pluginControl) PublishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue) []error { +func (p *pluginControl) PublishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) []error { // merge global plugin config into the config for this request cfg := p.Config.Plugins.getPluginConfigDataNode(core.PublisherPluginType, pluginName, pluginVersion).Table() for k, v := range config { cfg[k] = v } - return p.pluginRunner.AvailablePlugins().publishMetrics(contentType, content, pluginName, pluginVersion, cfg) + return p.pluginRunner.AvailablePlugins().publishMetrics(contentType, content, pluginName, pluginVersion, cfg, taskID) } // ProcessMetrics -func (p *pluginControl) ProcessMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue) (string, []byte, []error) { +func (p *pluginControl) ProcessMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) (string, []byte, []error) { // merge global plugin config into the config for this request cfg := p.Config.Plugins.getPluginConfigDataNode(core.ProcessorPluginType, pluginName, pluginVersion).Table() for k, v := range config { cfg[k] = v } - return p.pluginRunner.AvailablePlugins().processMetrics(contentType, content, pluginName, pluginVersion, cfg) + return p.pluginRunner.AvailablePlugins().processMetrics(contentType, content, pluginName, pluginVersion, cfg, taskID) } // GetPluginContentTypes returns accepted and returned content types for the diff --git a/control/control_test.go b/control/control_test.go index d29e09848..9a2cdd897 100644 --- a/control/control_test.go +++ b/control/control_test.go @@ -25,11 +25,13 @@ import ( "encoding/json" "errors" "fmt" + "math/rand" "path" "strings" "testing" "time" + 
"github.com/pborman/uuid" . "github.com/smartystreets/goconvey/convey" "github.com/intelsdi-x/gomit" @@ -824,6 +826,131 @@ func TestMetricConfig(t *testing.T) { }) } +func TestRoutingCachingStrategy(t *testing.T) { + Convey("Given loaded plugins that use sticky routing", t, func() { + config := NewConfig() + config.Plugins.All.AddItem("password", ctypes.ConfigValueStr{Value: "testval"}) + c := New(OptSetConfig(config)) + c.Start() + lpe := newListenToPluginEvent() + c.eventManager.RegisterHandler("Control.PluginLoaded", lpe) + _, e := load(c, PluginPath) + So(e, ShouldBeNil) + if e != nil { + t.FailNow() + } + metric, err := c.metricCatalog.Get([]string{"intel", "mock", "foo"}, 2) + So(err, ShouldBeNil) + So(metric.NamespaceAsString(), ShouldResemble, "/intel/mock/foo") + So(err, ShouldBeNil) + <-lpe.done + Convey("Start the plugins", func() { + lp, err := c.pluginManager.get("collector:mock:2") + So(err, ShouldBeNil) + So(lp, ShouldNotBeNil) + pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:2") + So(errp, ShouldBeNil) + So(pool, ShouldNotBeNil) + tasks := []string{ + uuid.New(), + uuid.New(), + uuid.New(), + uuid.New(), + uuid.New(), + } + for _, id := range tasks { + pool.subscribe(id, boundSubscriptionType) //TODO (JC) unbound or bounded here? 
+ err = c.pluginRunner.runPlugin(lp.Details) + So(err, ShouldBeNil) + } + // The cache ttl should be 100ms which is what the plugin exposed (no system default was provided) + ttl, err := pool.CacheTTL(tasks[0]) + So(err, ShouldBeNil) + So(ttl, ShouldResemble, 100*time.Millisecond) + So(pool.strategy, ShouldHaveSameTypeAs, strategy.NewSticky(ttl)) + So(pool.count(), ShouldEqual, len(tasks)) + So(pool.subscriptionCount(), ShouldEqual, len(tasks)) + Convey("Collect metrics", func() { + taskID := tasks[rand.Intn(len(tasks))] + for i := 0; i < 10; i++ { + _, errs := c.CollectMetrics([]core.Metric{metric}, time.Now().Add(time.Second*1), taskID) + So(errs, ShouldBeEmpty) + } + Convey("Check cache stats", func() { + So(pool.AllCacheHits(), ShouldEqual, 9) + So(pool.AllCacheMisses(), ShouldEqual, 1) + }) + }) + }) + }) + + Convey("Given loaded plugins that use least-recently-used routing", t, func() { + c := New() + c.Start() + c.Config.Plugins.Collector.Plugins["mock"] = newPluginConfigItem( + optAddPluginConfigItem("test", ctypes.ConfigValueBool{Value: true}), + optAddPluginConfigItem("user", ctypes.ConfigValueStr{Value: "jane"}), + optAddPluginConfigItem("password", ctypes.ConfigValueStr{Value: "doe"}), + ) + lpe := newListenToPluginEvent() + c.eventManager.RegisterHandler("Control.PluginLoaded", lpe) + _, e := load(c, JSONRPCPluginPath) + So(e, ShouldBeNil) + if e != nil { + t.FailNow() + } + metric, err := c.metricCatalog.Get([]string{"intel", "mock", "foo"}, 1) + metric.config = cdata.NewNode() + So(err, ShouldBeNil) + So(metric.NamespaceAsString(), ShouldResemble, "/intel/mock/foo") + So(err, ShouldBeNil) + <-lpe.done + Convey("Start the plugins", func() { + lp, err := c.pluginManager.get("collector:mock:1") + So(err, ShouldBeNil) + So(lp, ShouldNotBeNil) + pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:1") + time.Sleep(1 * time.Second) + So(errp, ShouldBeNil) + So(pool, ShouldNotBeNil) + tasks := []string{ + uuid.New(), + uuid.New(), 
+ uuid.New(), + uuid.New(), + uuid.New(), + } + for _, id := range tasks { + pool.subscribe(id, boundSubscriptionType) + err = c.pluginRunner.runPlugin(lp.Details) + So(err, ShouldBeNil) + } + // The cache ttl should be 100ms which is what the plugin exposed (no system default was provided) + ttl, err := pool.CacheTTL(tasks[0]) + So(err, ShouldBeNil) + So(ttl, ShouldResemble, 1100*time.Millisecond) + So(pool.strategy, ShouldHaveSameTypeAs, strategy.NewLRU(ttl)) + So(pool.count(), ShouldEqual, len(tasks)) + So(pool.subscriptionCount(), ShouldEqual, len(tasks)) + Convey("Collect metrics", func() { + taskID := tasks[rand.Intn(len(tasks))] + for i := 0; i < 10; i++ { + cr, errs := c.CollectMetrics([]core.Metric{metric}, time.Now().Add(time.Second*1), taskID) + So(errs, ShouldBeEmpty) + for i := range cr { + So(cr[i].Data(), ShouldContainSubstring, "The mock collected data!") + So(cr[i].Data(), ShouldContainSubstring, "test=true") + } + } + Convey("Check cache stats", func() { + So(pool.AllCacheHits(), ShouldEqual, 9) + So(pool.AllCacheMisses(), ShouldEqual, 1) + }) + }) + }) + }) +} + func TestCollectDynamicMetrics(t *testing.T) { Convey("given a plugin using the native client", t, func() { config := NewConfig() @@ -873,30 +1000,38 @@ func TestCollectDynamicMetrics(t *testing.T) { pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:2") So(errp, ShouldBeNil) So(pool, ShouldNotBeNil) - ttl, err := pool.CacheTTL() + taskID := uuid.New() + ttl, err := pool.CacheTTL(taskID) So(err, ShouldResemble, ErrPoolEmpty) So(ttl, ShouldEqual, 0) - pool.subscribe("1", unboundSubscriptionType) + So(pool.count(), ShouldEqual, 0) + So(pool.subscriptionCount(), ShouldEqual, 0) + pool.subscribe(taskID, unboundSubscriptionType) err = c.pluginRunner.runPlugin(lp.Details) + So(pool.count(), ShouldEqual, 1) + So(pool.subscriptionCount(), ShouldEqual, 1) So(err, ShouldBeNil) - ttl, err = pool.CacheTTL() + ttl, err = pool.CacheTTL(taskID) So(err, ShouldBeNil) // The 
minimum TTL advertised by the plugin is 100ms therefore the TTL for the // pool should be the global cache expiration So(ttl, ShouldEqual, strategy.GlobalCacheExpiration) - mts, errs := c.CollectMetrics([]core.Metric{m}, time.Now().Add(time.Second*1)) - hits, err := pool.CacheHits(core.JoinNamespace(m.namespace), 2) + mts, errs := c.CollectMetrics([]core.Metric{m}, time.Now().Add(time.Second*1), taskID) + hits, err := pool.CacheHits(core.JoinNamespace(m.namespace), 2, taskID) So(err, ShouldBeNil) So(hits, ShouldEqual, 0) So(errs, ShouldBeNil) So(len(mts), ShouldEqual, 10) - mts, errs = c.CollectMetrics([]core.Metric{m}, time.Now().Add(time.Second*1)) - hits, err = pool.CacheHits(core.JoinNamespace(m.namespace), 2) + mts, errs = c.CollectMetrics([]core.Metric{m}, time.Now().Add(time.Second*1), taskID) + hits, err = pool.CacheHits(core.JoinNamespace(m.namespace), 2, taskID) So(err, ShouldBeNil) So(hits, ShouldEqual, 1) So(errs, ShouldBeNil) So(len(mts), ShouldEqual, 10) - pool.unsubscribe("1") + pool.unsubscribe(taskID) + pool.selectAndKill(taskID, "unsubscription event") + So(pool.count(), ShouldEqual, 0) + So(pool.subscriptionCount(), ShouldEqual, 0) Convey("collects metrics from plugin using httpjson client", func() { lp, err := c.pluginManager.get("collector:mock:1") So(err, ShouldBeNil) @@ -904,17 +1039,21 @@ func TestCollectDynamicMetrics(t *testing.T) { pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:1") So(errp, ShouldBeNil) So(pool, ShouldNotBeNil) - ttl, err := pool.CacheTTL() + ttl, err := pool.CacheTTL(taskID) So(err, ShouldResemble, ErrPoolEmpty) So(ttl, ShouldEqual, 0) + So(pool.count(), ShouldEqual, 0) + So(pool.subscriptionCount(), ShouldEqual, 0) pool.subscribe("1", unboundSubscriptionType) err = c.pluginRunner.runPlugin(lp.Details) + So(pool.count(), ShouldEqual, 1) + So(pool.subscriptionCount(), ShouldEqual, 1) So(err, ShouldBeNil) - ttl, err = pool.CacheTTL() + ttl, err = pool.CacheTTL(taskID) So(err, ShouldBeNil) 
So(ttl, ShouldEqual, 1100*time.Millisecond) - mts, errs := c.CollectMetrics([]core.Metric{jsonm}, time.Now().Add(time.Second*1)) - hits, err := pool.CacheHits(core.JoinNamespace(jsonm.namespace), jsonm.version) + mts, errs := c.CollectMetrics([]core.Metric{jsonm}, time.Now().Add(time.Second*1), uuid.New()) + hits, err := pool.CacheHits(core.JoinNamespace(jsonm.namespace), jsonm.version, taskID) So(pool.subscriptionCount(), ShouldEqual, 1) So(pool.strategy, ShouldNotBeNil) So(len(mts), ShouldBeGreaterThan, 0) @@ -922,8 +1061,8 @@ func TestCollectDynamicMetrics(t *testing.T) { So(hits, ShouldEqual, 0) So(errs, ShouldBeNil) So(len(mts), ShouldEqual, 10) - mts, errs = c.CollectMetrics([]core.Metric{jsonm}, time.Now().Add(time.Second*1)) - hits, err = pool.CacheHits(core.JoinNamespace(m.namespace), 1) + mts, errs = c.CollectMetrics([]core.Metric{jsonm}, time.Now().Add(time.Second*1), uuid.New()) + hits, err = pool.CacheHits(core.JoinNamespace(m.namespace), 1, taskID) So(err, ShouldBeNil) So(hits, ShouldEqual, 1) So(errs, ShouldBeNil) @@ -931,6 +1070,9 @@ func TestCollectDynamicMetrics(t *testing.T) { So(pool.AllCacheHits(), ShouldEqual, 1) So(pool.AllCacheMisses(), ShouldEqual, 1) pool.unsubscribe("1") + pool.selectAndKill("1", "unsubscription event") + So(pool.count(), ShouldEqual, 0) + So(pool.subscriptionCount(), ShouldEqual, 0) c.Stop() time.Sleep(100 * time.Millisecond) }) @@ -996,7 +1138,7 @@ func TestCollectMetrics(t *testing.T) { m = append(m, m1, m2, m3) Convey("collect metrics", func() { for x := 0; x < 4; x++ { - cr, err := c.CollectMetrics(m, time.Now().Add(time.Second*1)) + cr, err := c.CollectMetrics(m, time.Now().Add(time.Second*1), uuid.New()) So(err, ShouldBeNil) for i := range cr { So(cr[i].Data(), ShouldContainSubstring, "The mock collected data!") @@ -1007,10 +1149,11 @@ func TestCollectMetrics(t *testing.T) { So(ap, ShouldNotBeEmpty) So(pool.strategy.String(), ShouldEqual, plugin.DefaultRouting.String()) So(len(pool.plugins), ShouldEqual, 2) - for 
_, p := range pool.plugins { - So(p.hitCount, ShouldEqual, 2) - So(p.hitCount, ShouldEqual, 2) - } + // when the first first plugin is hit the cache is populated the + // cache satifies the next 3 collect calls that come in within the + // cache duration + So(pool.plugins[1].hitCount, ShouldEqual, 1) + So(pool.plugins[2].hitCount, ShouldEqual, 0) c.Stop() }) }) @@ -1027,7 +1170,7 @@ func TestCollectMetrics(t *testing.T) { c.Start() load(c, PluginPath) m := []core.Metric{} - c.CollectMetrics(m, time.Now().Add(time.Second*60)) + c.CollectMetrics(m, time.Now().Add(time.Second*60), uuid.New()) c.Stop() time.Sleep(100 * time.Millisecond) }) @@ -1101,7 +1244,7 @@ func TestPublishMetrics(t *testing.T) { enc := gob.NewEncoder(&buf) enc.Encode(metrics) contentType := plugin.SnapGOBContentType - errs := c.PublishMetrics(contentType, buf.Bytes(), "file", 3, n.Table()) + errs := c.PublishMetrics(contentType, buf.Bytes(), "file", 3, n.Table(), uuid.New()) So(errs, ShouldBeNil) ap := c.AvailablePlugins() So(ap, ShouldNotBeEmpty) @@ -1154,7 +1297,7 @@ func TestProcessMetrics(t *testing.T) { enc := gob.NewEncoder(&buf) enc.Encode(metrics) contentType := plugin.SnapGOBContentType - _, ct, errs := c.ProcessMetrics(contentType, buf.Bytes(), "passthru", 1, n.Table()) + _, ct, errs := c.ProcessMetrics(contentType, buf.Bytes(), "passthru", 1, n.Table(), uuid.New()) So(errs, ShouldBeEmpty) mts := []plugin.PluginMetricType{} dec := gob.NewDecoder(bytes.NewBuffer(ct)) diff --git a/control/runner.go b/control/runner.go index c1180ee18..659ec5605 100644 --- a/control/runner.go +++ b/control/runner.go @@ -421,7 +421,7 @@ func (r *runner) handleUnsubscription(pType, pName string, pVersion int, taskID "pool-count": pool.count(), "pool-subscription-count": pool.subscriptionCount(), }).Debug(fmt.Sprintf("killing an available plugin in pool %s:%s:%d", pType, pName, pVersion)) - pool.killLeastUsed("unsubscription event") + pool.selectAndKill(taskID, "unsubscription event") } return nil } diff --git 
a/control/strategy/cache.go b/control/strategy/cache.go index 0376b14a3..90853699a 100644 --- a/control/strategy/cache.go +++ b/control/strategy/cache.go @@ -77,20 +77,19 @@ func (c *cache) get(ns string, version int) interface{} { return cell.metric } return cell.metrics - } else { - if !ok { - c.table[key] = &cachecell{ - time: time.Time{}, - metrics: nil, - } + } + if !ok { + c.table[key] = &cachecell{ + time: time.Time{}, + metrics: nil, } - c.table[key].misses++ - cacheLog.WithFields(log.Fields{ - "namespace": key, - "hits": c.table[key].hits, - "misses": c.table[key].misses, - }).Debug(fmt.Sprintf("cache miss [%s]", key)) } + c.table[key].misses++ + cacheLog.WithFields(log.Fields{ + "namespace": key, + "hits": c.table[key].hits, + "misses": c.table[key].misses, + }).Debug(fmt.Sprintf("cache miss [%s]", key)) return nil } @@ -102,6 +101,7 @@ func (c *cache) put(ns string, version int, m interface{}) { c.table[key].time = chrono.Chrono.Now() c.table[key].metric = metric } else { + log.Errorf("%+v", c.table) c.table[key] = &cachecell{ time: chrono.Chrono.Now(), metric: metric, @@ -124,3 +124,84 @@ func (c *cache) put(ns string, version int, m interface{}) { }).Error("unsupported type") } } + +//checkCache doc here... 
+func (c *cache) checkCache(mts []core.Metric) ([]core.Metric, []core.Metric) { + var fromCache []core.Metric + var metricsToCollect []core.Metric + for _, mt := range mts { + if m := c.get(core.JoinNamespace(mt.Namespace()), mt.Version()); m != nil { + switch metric := m.(type) { + case core.Metric: + fromCache = append(fromCache, metric) + case []core.Metric: + for _, met := range metric { + fromCache = append(fromCache, met) + } + default: + cacheLog.WithFields(log.Fields{ + "_block": "checkCache", + }).Error("unsupported type found in the cache") + } + } else { + metricsToCollect = append(metricsToCollect, mt) + } + } + return metricsToCollect, fromCache +} + +func (c *cache) updateCache(mts []core.Metric) { + results := []core.Metric{} + dc := map[string][]core.Metric{} + for _, mt := range mts { + if mt.Labels() == nil { + // cache the individual metric + c.put(core.JoinNamespace(mt.Namespace()), mt.Version(), mt) + } else { + // collect the dynamic query results so we can cache + ns := make([]string, len(mt.Namespace())) + copy(ns, mt.Namespace()) + for _, label := range mt.Labels() { + ns[label.Index] = "*" + } + if _, ok := dc[core.JoinNamespace(ns)]; !ok { + dc[core.JoinNamespace(ns)] = []core.Metric{} + } + dc[core.JoinNamespace(ns)] = append(dc[core.JoinNamespace(ns)], mt) + c.put(core.JoinNamespace(ns), mt.Version(), dc[core.JoinNamespace(ns)]) + } + results = append(results, mt) + } +} + +func (c *cache) allCacheHits() uint64 { + var hits uint64 + for _, v := range c.table { + hits += v.hits + } + return hits +} + +func (c *cache) allCacheMisses() uint64 { + var misses uint64 + for _, v := range c.table { + misses += v.misses + } + return misses +} + +func (c *cache) cacheHits(ns string, version int) (uint64, error) { + key := fmt.Sprintf("%v:%v", ns, version) + if v, ok := c.table[key]; ok { + return v.hits, nil + } + return 0, ErrCacheEntryDoesNotExist +} + +func (c *cache) cacheMisses(ns string, version int) (uint64, error) { + key := 
fmt.Sprintf("%v:%v", ns, version) + if v, ok := c.table[key]; ok { + return v.misses, nil + } + return 0, ErrCacheEntryDoesNotExist +} diff --git a/control/strategy/lru.go b/control/strategy/lru.go index 99781f9f1..bcced16ef 100644 --- a/control/strategy/lru.go +++ b/control/strategy/lru.go @@ -20,18 +20,12 @@ limitations under the License. package strategy import ( - "errors" - "fmt" "time" log "github.com/Sirupsen/logrus" "github.com/intelsdi-x/snap/core" ) -var ( - ErrorCouldNotSelect = errors.New("could not select a plugin (round robin strategy)") -) - // lru provides a stragey that selects the least recently used available plugin. type lru struct { metricCache *cache @@ -47,19 +41,18 @@ func NewLRU(cacheTTL time.Duration) *lru { } } -func (l *lru) Cache() *cache { - return l.metricCache -} - +// String returns the strategy name. func (l *lru) String() string { return "least-recently-used" } -func (l *lru) CacheTTL() time.Duration { - return l.Cache().ttl +// CacheTTL returns the TTL for the cache. +func (l *lru) CacheTTL(taskID string) time.Duration { + return l.metricCache.ttl } -func (l *lru) Select(spa []SelectablePlugin) (SelectablePlugin, error) { +// Select selects an available plugin using the least-recently-used strategy. +func (l *lru) Select(spa []SelectablePlugin, _ string) (SelectablePlugin, error) { t := time.Now() index := -1 for i, sp := range spa { @@ -81,98 +74,46 @@ func (l *lru) Select(spa []SelectablePlugin) (SelectablePlugin, error) { } l.logger.WithFields(log.Fields{ "block": "select", - "strategy": "round-robin", - "error": ErrorCouldNotSelect, + "strategy": l.String(), + "error": ErrCouldNotSelect, }).Error("error selecting") - return nil, ErrorCouldNotSelect + return nil, ErrCouldNotSelect +} + +// Remove removes a SelectablePlugin from plugins +func (l *lru) Remove(sp SelectablePlugin, taskID string) { + // The LRU strategy stores no state so there is nothing to do } // checkCache checks the cache for metric types. 
// returns: // - array of metrics that need to be collected // - array of metrics that were returned from the cache -func (l *lru) CheckCache(mts []core.Metric) ([]core.Metric, []core.Metric) { - var fromCache []core.Metric - var metricsToCollect []core.Metric - for _, mt := range mts { - if m := l.metricCache.get(core.JoinNamespace(mt.Namespace()), mt.Version()); m != nil { - switch metric := m.(type) { - case core.Metric: - fromCache = append(fromCache, metric) - case []core.Metric: - for _, met := range metric { - fromCache = append(fromCache, met) - } - default: - l.logger.WithFields(log.Fields{ - "_module": "client", - "_block": "checkCache", - }).Error("unsupported type found in the cache") - } - } else { - metricsToCollect = append(metricsToCollect, mt) - } - } - return metricsToCollect, fromCache +func (l *lru) CheckCache(mts []core.Metric, _ string) ([]core.Metric, []core.Metric) { + return l.metricCache.checkCache(mts) } // updateCache updates the cache with the given array of metrics. 
-func (l *lru) UpdateCache(mts []core.Metric) { - results := []core.Metric{} - dc := map[string][]core.Metric{} - for _, mt := range mts { - if mt.Labels() == nil { - // cache the individual metric - l.metricCache.put(core.JoinNamespace(mt.Namespace()), mt.Version(), mt) - } else { - // collect the dynamic query results so we can cache - ns := make([]string, len(mt.Namespace())) - copy(ns, mt.Namespace()) - for _, label := range mt.Labels() { - ns[label.Index] = "*" - } - if _, ok := dc[core.JoinNamespace(ns)]; !ok { - dc[core.JoinNamespace(ns)] = []core.Metric{} - } - dc[core.JoinNamespace(ns)] = append(dc[core.JoinNamespace(ns)], mt) - l.metricCache.put(core.JoinNamespace(ns), mt.Version(), dc[core.JoinNamespace(ns)]) - l.logger.Errorf("putting %v:%v in the cache", mt.Namespace(), mt.Version()) - } - results = append(results, mt) - } +func (l *lru) UpdateCache(mts []core.Metric, _ string) { + l.metricCache.updateCache(mts) } +// AllCacheHits returns cache hits across all metrics. func (l *lru) AllCacheHits() uint64 { - var hits uint64 - for _, v := range l.metricCache.table { - hits += v.hits - } - return hits + return l.metricCache.allCacheHits() } // AllCacheMisses returns cache misses across all metrics. func (l *lru) AllCacheMisses() uint64 { - var misses uint64 - for _, v := range l.metricCache.table { - misses += v.misses - } - return misses + return l.metricCache.allCacheMisses() } // CacheHits returns the cache hits for a given metric namespace and version. -func (l *lru) CacheHits(ns string, version int) (uint64, error) { - key := fmt.Sprintf("%v:%v", ns, version) - if v, ok := l.metricCache.table[key]; ok { - return v.hits, nil - } - return 0, ErrCacheEntryDoesNotExist +func (l *lru) CacheHits(ns string, version int, _ string) (uint64, error) { + return l.metricCache.cacheHits(ns, version) } // CacheMisses returns the cache misses for a given metric namespace and version. 
-func (l *lru) CacheMisses(ns string, version int) (uint64, error) { - key := fmt.Sprintf("%v:%v", ns, version) - if v, ok := l.metricCache.table[key]; ok { - return v.misses, nil - } - return 0, ErrCacheEntryDoesNotExist +func (l *lru) CacheMisses(ns string, version int, _ string) (uint64, error) { + return l.metricCache.cacheMisses(ns, version) }
diff --git a/control/strategy/sticky.go b/control/strategy/sticky.go
new file mode 100644
index 000000000..da0a8058a
--- /dev/null
+++ b/control/strategy/sticky.go
@@ -0,0 +1,212 @@
+/*
+http://www.apache.org/licenses/LICENSE-2.0.txt
+
+
+Copyright 2015 Intel Corporation
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package strategy
+
+import (
+	"errors"
+	"fmt"
+	"sync"
+	"time"
+
+	log "github.com/Sirupsen/logrus"
+	"github.com/intelsdi-x/snap/core"
+)
+
+var (
+	// ErrCacheDoesNotExist is returned when cache statistics are requested for
+	// a task that has no metric cache.
+	ErrCacheDoesNotExist = errors.New("cache does not exist")
+)
+
+// sticky provides a strategy that binds each task to a single available
+// plugin: the plugin selected for a task keeps being returned for that task
+// and is not handed to any other task. The pool sets the concurrency count
+// to 1 for plugins routed this way.
+type sticky struct {
+	// mu guards plugins and metricCache: the pool calls Select while holding
+	// at most its read lock and calls the cache methods without any lock.
+	mu          sync.Mutex
+	plugins     map[string]SelectablePlugin // task ID -> plugin bound to the task
+	metricCache map[string]*cache           // task ID -> metric cache of the task
+	logger      *log.Entry
+	cacheTTL    time.Duration
+}
+
+// NewSticky returns a sticky routing strategy whose per-task metric caches
+// are created with the given TTL.
+func NewSticky(cacheTTL time.Duration) *sticky {
+	return &sticky{
+		metricCache: make(map[string]*cache),
+		plugins:     make(map[string]SelectablePlugin),
+		cacheTTL:    cacheTTL,
+		logger: log.WithFields(log.Fields{
+			"_module": "control-routing",
+		}),
+	}
+}
+
+// Select returns the plugin bound to taskID. When the task has no binding, or
+// the bound plugin is no longer among spa, a plugin from spa that is not
+// bound to any task is chosen and bound to taskID.
+func (s *sticky) Select(spa []SelectablePlugin, taskID string) (SelectablePlugin, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	if bound, ok := s.plugins[taskID]; ok && bound != nil {
+		for _, sp := range spa {
+			if sp == bound {
+				return bound, nil
+			}
+		}
+		// The bound plugin is no longer offered (e.g. it left the pool without
+		// Remove being called): drop the stale binding and select again.
+		delete(s.plugins, taskID)
+	}
+	return s.selectPlugin(spa, taskID)
+}
+
+// Remove drops the binding of taskID so that its plugin can be selected for
+// another task. The metric cache of the task is kept.
+func (s *sticky) Remove(sp SelectablePlugin, taskID string) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	delete(s.plugins, taskID)
+}
+
+// String returns the strategy name.
+func (s *sticky) String() string {
+	return "sticky"
+}
+
+// CacheTTL returns the TTL every per-task cache is created with; taskID is
+// not consulted.
+func (s *sticky) CacheTTL(taskID string) time.Duration {
+	// TODO (JC) should we be passing the taskID in here?
+	return s.cacheTTL
+}
+
+// CheckCache checks the cache of the given task for metric types.
+// returns:
+//  - array of metrics that need to be collected
+//  - array of metrics that were returned from the cache
+func (s *sticky) CheckCache(mts []core.Metric, taskID string) ([]core.Metric, []core.Metric) {
+	return s.cacheFor(taskID).checkCache(mts)
+}
+
+// UpdateCache updates the cache of the given task with the given array of
+// metrics.
+func (s *sticky) UpdateCache(mts []core.Metric, taskID string) {
+	s.cacheFor(taskID).updateCache(mts)
+}
+
+// AllCacheHits returns cache hits across all metrics of all tasks.
+func (s *sticky) AllCacheHits() uint64 {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	var total uint64
+	for _, c := range s.metricCache {
+		total += c.allCacheHits()
+	}
+	return total
+}
+
+// AllCacheMisses returns cache misses across all metrics of all tasks.
+func (s *sticky) AllCacheMisses() uint64 {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	var total uint64
+	for _, c := range s.metricCache {
+		total += c.allCacheMisses()
+	}
+	return total
+}
+
+// CacheHits returns the cache hits for a given metric namespace and version
+// in the cache of the given task.
+func (s *sticky) CacheHits(ns string, version int, taskID string) (uint64, error) {
+	c, ok := s.lookupCache(taskID)
+	if !ok {
+		return 0, ErrCacheDoesNotExist
+	}
+	return c.cacheHits(ns, version)
+}
+
+// CacheMisses returns the cache misses for a given metric namespace and
+// version in the cache of the given task.
+func (s *sticky) CacheMisses(ns string, version int, taskID string) (uint64, error) {
+	c, ok := s.lookupCache(taskID)
+	if !ok {
+		return 0, ErrCacheDoesNotExist
+	}
+	return c.cacheMisses(ns, version)
+}
+
+// cacheFor returns the metric cache of taskID, creating it on first use.
+func (s *sticky) cacheFor(taskID string) *cache {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	c, ok := s.metricCache[taskID]
+	if !ok {
+		c = NewCache(s.cacheTTL)
+		s.metricCache[taskID] = c
+	}
+	return c
+}
+
+// lookupCache returns the metric cache of taskID and whether one exists.
+func (s *sticky) lookupCache(taskID string) (*cache, bool) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	c, ok := s.metricCache[taskID]
+	return c, ok
+}
+
+// selectPlugin binds taskID to the first plugin of sp that is not bound to a
+// task yet and returns it. The caller must hold s.mu.
+func (s *sticky) selectPlugin(sp []SelectablePlugin, taskID string) (SelectablePlugin, error) {
+	for _, p := range sp {
+		if !s.isBound(p) {
+			s.plugins[taskID] = p
+			return p, nil
+		}
+	}
+	// Every plugin of sp was found bound above, so none is available;
+	// len(sp)-len(s.plugins) would go negative when bindings outlive plugins.
+	s.logger.WithFields(log.Fields{
+		"_block":   "findAvailablePlugin",
+		"strategy": s.String(),
+		"error":    fmt.Sprintf("%v of %v plugins are available", 0, len(sp)),
+	}).Error(ErrCouldNotSelect)
+	return nil, ErrCouldNotSelect
+}
+
+// isBound reports whether p is bound to any task. The caller must hold s.mu.
+func (s *sticky) isBound(p SelectablePlugin) bool {
+	for _, busy := range s.plugins {
+		if p == busy {
+			return true
+		}
+	}
+	return false
+}
diff --git a/control/strategy/sticky_test.go b/control/strategy/sticky_test.go
new file mode 100644
index 000000000..5fe7fab8a
--- /dev/null
+++ b/control/strategy/sticky_test.go
@@ -0,0 +1,100 @@
+/*
+http://www.apache.org/licenses/LICENSE-2.0.txt
+
+
+Copyright 2015 Intel Corporation
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package strategy
+
+import (
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/intelsdi-x/snap/core"
+	"github.com/intelsdi-x/snap/core/cdata"
+	. "github.com/smartystreets/goconvey/convey"
+)
+
+type mockPlugin struct {
+	name string
+}
+
+func (m *mockPlugin) HitCount() int      { return 0 }
+func (m *mockPlugin) LastHit() time.Time { return time.Time{} }
+func (m *mockPlugin) String() string     { return "" }
+
+func newMockMetricType(ns string) mockMetricType {
+	return mockMetricType{
+		namespace: strings.Split(ns, "/"),
+	}
+}
+
+type mockMetricType struct {
+	namespace []string
+}
+
+func (m mockMetricType) Namespace() []string { return m.namespace }
+
+func (m mockMetricType) LastAdvertisedTime() time.Time { return time.Now() }
+
+func (m mockMetricType) Version() int { return 1 }
+
+func (m mockMetricType) Config() *cdata.ConfigDataNode { return nil }
+
+func (m mockMetricType) Data() interface{} { return nil }
+
+func (m mockMetricType) Source() string { return "" }
+
+func (m mockMetricType) Tags() map[string]string { return nil }
+
+func (m mockMetricType) Labels() []core.Label { return nil }
+
+func (m mockMetricType) Timestamp() time.Time { return time.Time{} }
+
+func TestStickyRouter(t *testing.T) {
+	Convey("Given a sticky router", t, func() {
+		router := NewSticky(100 * time.Millisecond)
+		So(router, ShouldNotBeNil)
+		So(router.String(), ShouldResemble, "sticky")
+		Convey("Select a plugin when they are available", func() {
+			p1 := &mockPlugin{name: "p1"}
+			p2 := &mockPlugin{name: "p2"}
+			// select a plugin, for task1, given a task and two available plugins
+			sp1, err := router.Select([]SelectablePlugin{p1, p2}, "task1")
+			So(err, ShouldBeNil)
+			So(sp1, ShouldNotBeNil)
+			So(sp1, ShouldEqual, p1)
+			// task1 must stay on p1 even when the plugins are offered in another order
+			sp2, err := router.Select([]SelectablePlugin{p2, p1}, "task1")
+			So(err, ShouldBeNil)
+			So(sp2, ShouldNotBeNil)
+			So(sp2, ShouldEqual, p1)
+			// select the other (last) available plugin for task2
+			sp3, err := router.Select([]SelectablePlugin{p2, p1}, "task2")
+			So(err, ShouldBeNil)
+			So(sp3, ShouldNotBeNil)
+			So(sp3, ShouldEqual, p2)
+			Convey("Select a plugin when there are NONE available", func() {
+				plugins := []SelectablePlugin{p1, p2}
+				sp, err := router.Select(plugins, "task3")
+				So(sp, ShouldBeNil)
+				So(err, ShouldEqual, ErrCouldNotSelect)
+			})
+		})
+
+	})
+}
diff --git a/control/strategy/strategy.go b/control/strategy/strategy.go index 052cd2e10..76310ea5b 100644 --- a/control/strategy/strategy.go +++ b/control/strategy/strategy.go @@ -22,11 +22,16 @@ limitations under the License. package strategy import ( + "errors" "time" "github.com/intelsdi-x/snap/core" ) +var ( + ErrCouldNotSelect = errors.New("could not select a plugin") +) + type SelectablePlugin interface { HitCount() int LastHit() time.Time @@ -34,13 +39,14 @@ type SelectablePlugin interface { } type RoutingAndCaching interface { - Select([]SelectablePlugin) (SelectablePlugin, error) - CheckCache(mts []core.Metric) ([]core.Metric, []core.Metric) - UpdateCache(mts []core.Metric) - CacheHits(string, int) (uint64, error) - CacheMisses(string, int) (uint64, error) + Select([]SelectablePlugin, string) (SelectablePlugin, error) + Remove(SelectablePlugin, string) + CheckCache([]core.Metric, string) ([]core.Metric, []core.Metric) + UpdateCache([]core.Metric, string) + CacheHits(string, int, string) (uint64, error) + CacheMisses(string, int, string) (uint64, error) AllCacheHits() uint64 AllCacheMisses() uint64 - CacheTTL() time.Duration + CacheTTL(string) time.Duration String() string } diff --git a/plugin/collector/snap-collector-mock1/mock/mock.go b/plugin/collector/snap-collector-mock1/mock/mock.go index 657651d5a..18d6635c0 100644 --- a/plugin/collector/snap-collector-mock1/mock/mock.go +++ b/plugin/collector/snap-collector-mock1/mock/mock.go @@ -68,18 +68,14 @@ func (f *Mock) CollectMetrics(mts []plugin.PluginMetricType) ([]plugin.PluginMet metrics = append(metrics, mt) } } else { - var data
string if cv, ok := p.Config().Table()["test"]; ok { - data = fmt.Sprintf("The mock collected data! config data: user=%s password=%s test=%v", p.Config().Table()["user"], p.Config().Table()["password"], cv.(ctypes.ConfigValueBool).Value) + p.Data_ = fmt.Sprintf("The mock collected data! config data: user=%s password=%s test=%v", p.Config().Table()["user"], p.Config().Table()["password"], cv.(ctypes.ConfigValueBool).Value) } else { - data = fmt.Sprintf("The mock collected data! config data: user=%s password=%s", p.Config().Table()["user"], p.Config().Table()["password"]) + p.Data_ = fmt.Sprintf("The mock collected data! config data: user=%s password=%s", p.Config().Table()["user"], p.Config().Table()["password"]) } - mt := plugin.PluginMetricType{ - Data_: data, - Timestamp_: time.Now(), - Source_: hostname, - } - metrics = append(metrics, mt) + p.Timestamp_ = time.Now() + p.Source_ = hostname + metrics = append(metrics, p) } } return metrics, nil diff --git a/plugin/collector/snap-collector-mock2/mock/mock.go b/plugin/collector/snap-collector-mock2/mock/mock.go index 2b1a7c0d9..31b1a5353 100644 --- a/plugin/collector/snap-collector-mock2/mock/mock.go +++ b/plugin/collector/snap-collector-mock2/mock/mock.go @@ -47,7 +47,7 @@ type Mock struct { // CollectMetrics collects metrics for testing func (f *Mock) CollectMetrics(mts []plugin.PluginMetricType) ([]plugin.PluginMetricType, error) { for _, p := range mts { - log.Printf("collecting %+v", p) + log.Printf("collecting %+v\n", p) } rand.Seed(time.Now().UTC().UnixNano()) metrics := []plugin.PluginMetricType{} @@ -117,6 +117,7 @@ func Meta() *plugin.PluginMeta { []string{plugin.SnapGOBContentType}, []string{plugin.SnapGOBContentType}, plugin.CacheTTL(100*time.Millisecond), + plugin.RoutingStrategy(plugin.StickyRouting), ) } diff --git a/scheduler/job.go b/scheduler/job.go index ac12c56b1..bef0fb68f 100644 --- a/scheduler/job.go +++ b/scheduler/job.go @@ -86,6 +86,7 @@ type job interface { StartTime() time.Time 
Deadline() time.Time Type() jobType + TaskID() string Run() } @@ -93,17 +94,18 @@ type jobType int type coreJob struct { sync.Mutex - + taskID string jtype jobType deadline time.Time starttime time.Time errors []error } -func newCoreJob(t jobType, deadline time.Time) *coreJob { +func newCoreJob(t jobType, deadline time.Time, taskID string) *coreJob { return &coreJob{ jtype: t, deadline: deadline, + taskID: taskID, errors: make([]error, 0), starttime: time.Now(), } @@ -131,6 +133,10 @@ func (c *coreJob) Errors() []error { return c.errors } +func (c *coreJob) TaskID() string { + return c.taskID +} + type collectorJob struct { *coreJob collector collectsMetrics @@ -139,12 +145,12 @@ type collectorJob struct { configDataTree *cdata.ConfigDataTree } -func newCollectorJob(metricTypes []core.RequestedMetric, deadlineDuration time.Duration, collector collectsMetrics, cdt *cdata.ConfigDataTree) job { +func newCollectorJob(metricTypes []core.RequestedMetric, deadlineDuration time.Duration, collector collectsMetrics, cdt *cdata.ConfigDataTree, taskID string) job { return &collectorJob{ collector: collector, metricTypes: metricTypes, metrics: []core.Metric{}, - coreJob: newCoreJob(collectJobType, time.Now().Add(deadlineDuration)), + coreJob: newCoreJob(collectJobType, time.Now().Add(deadlineDuration), taskID), configDataTree: cdt, } } @@ -193,7 +199,7 @@ func (c *collectorJob) Run() { config: config, } } - ret, errs := c.collector.CollectMetrics(metrics, c.Deadline()) + ret, errs := c.collector.CollectMetrics(metrics, c.Deadline(), c.TaskID()) log.WithFields(log.Fields{ "_module": "scheduler-job", @@ -228,13 +234,13 @@ type processJob struct { content []byte } -func newProcessJob(parentJob job, pluginName string, pluginVersion int, contentType string, config map[string]ctypes.ConfigValue, processor processesMetrics) job { +func newProcessJob(parentJob job, pluginName string, pluginVersion int, contentType string, config map[string]ctypes.ConfigValue, processor 
processesMetrics, taskID string) job { return &processJob{ parentJob: parentJob, pluginName: pluginName, pluginVersion: pluginVersion, metrics: []core.Metric{}, - coreJob: newCoreJob(processJobType, parentJob.Deadline()), + coreJob: newCoreJob(processJobType, parentJob.Deadline(), taskID), config: config, processor: processor, contentType: contentType, @@ -269,7 +275,7 @@ func (p *processJob) Run() { } } enc.Encode(metrics) - _, content, errs := p.processor.ProcessMetrics(p.contentType, buf.Bytes(), p.pluginName, p.pluginVersion, p.config) + _, content, errs := p.processor.ProcessMetrics(p.contentType, buf.Bytes(), p.pluginName, p.pluginVersion, p.config, p.taskID) if errs != nil { for _, e := range errs { log.WithFields(log.Fields{ @@ -323,13 +329,13 @@ type publisherJob struct { contentType string } -func newPublishJob(parentJob job, pluginName string, pluginVersion int, contentType string, config map[string]ctypes.ConfigValue, publisher publishesMetrics) job { +func newPublishJob(parentJob job, pluginName string, pluginVersion int, contentType string, config map[string]ctypes.ConfigValue, publisher publishesMetrics, taskID string) job { return &publisherJob{ parentJob: parentJob, publisher: publisher, pluginName: pluginName, pluginVersion: pluginVersion, - coreJob: newCoreJob(publishJobType, parentJob.Deadline()), + coreJob: newCoreJob(publishJobType, parentJob.Deadline(), taskID), config: config, contentType: contentType, } @@ -362,7 +368,7 @@ func (p *publisherJob) Run() { } } enc.Encode(metrics) - errs := p.publisher.PublishMetrics(p.contentType, buf.Bytes(), p.pluginName, p.pluginVersion, p.config) + errs := p.publisher.PublishMetrics(p.contentType, buf.Bytes(), p.pluginName, p.pluginVersion, p.config, p.taskID) if errs != nil { for _, e := range errs { log.WithFields(log.Fields{ @@ -393,7 +399,7 @@ func (p *publisherJob) Run() { case processJobType: switch p.contentType { case plugin.SnapGOBContentType: - errs := p.publisher.PublishMetrics(p.contentType, 
p.parentJob.(*processJob).content, p.pluginName, p.pluginVersion, p.config) + errs := p.publisher.PublishMetrics(p.contentType, p.parentJob.(*processJob).content, p.pluginName, p.pluginVersion, p.config, p.taskID) if errs != nil { for _, e := range errs { log.WithFields(log.Fields{ diff --git a/scheduler/job_test.go b/scheduler/job_test.go index e7861a332..9064705a5 100644 --- a/scheduler/job_test.go +++ b/scheduler/job_test.go @@ -33,7 +33,7 @@ import ( type mockCollector struct{} -func (m *mockCollector) CollectMetrics([]core.Metric, time.Time) ([]core.Metric, []error) { +func (m *mockCollector) CollectMetrics([]core.Metric, time.Time, string) ([]core.Metric, []error) { return nil, nil } @@ -42,43 +42,37 @@ func TestCollectorJob(t *testing.T) { cdt := cdata.NewTree() Convey("newCollectorJob()", t, func() { Convey("it returns an init-ed collectorJob", func() { - cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt) + cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid") So(cj, ShouldHaveSameTypeAs, &collectorJob{}) }) }) Convey("StartTime()", t, func() { Convey("it should return the job starttime", func() { - cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt) + cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid") So(cj.StartTime(), ShouldHaveSameTypeAs, time.Now()) }) }) Convey("Deadline()", t, func() { Convey("it should return the job daedline", func() { - cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt) + cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid") So(cj.Deadline(), ShouldResemble, cj.(*collectorJob).deadline) }) }) Convey("Type()", t, func() { Convey("it should return the job type", func() { - cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt) + cj := 
newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid") So(cj.Type(), ShouldEqual, collectJobType) }) }) - // Convey("Metrics()", t, func() { - // Convey("it should return the job metrics", func() { - // cj := newCollectorJob([]core.MetricType{}, defaultDeadline, &mockCollector{}) - // So(cj.Metrics(), ShouldResemble, []core.Metric{}) - // }) - // }) Convey("Errors()", t, func() { Convey("it should return the errors from the job", func() { - cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt) + cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid") So(cj.Errors(), ShouldResemble, []error{}) }) }) Convey("AddErrors()", t, func() { Convey("it should append errors to the job", func() { - cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt) + cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid") So(cj.Errors(), ShouldResemble, []error{}) e1 := errors.New("1") @@ -93,7 +87,7 @@ func TestCollectorJob(t *testing.T) { }) Convey("Run()", t, func() { Convey("it should complete without errors", func() { - cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt) + cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid") cj.(*collectorJob).Run() So(cj.Errors(), ShouldResemble, []error{}) }) @@ -105,14 +99,14 @@ func TestQueuedJob(t *testing.T) { cdt := cdata.NewTree() Convey("Job()", t, func() { Convey("it should return the underlying job", func() { - cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt) + cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid") qj := newQueuedJob(cj) So(qj.Job(), ShouldEqual, cj) }) }) Convey("Promise()", t, func() { Convey("it should return the underlying promise", func() { - cj := 
newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt) + cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid") qj := newQueuedJob(cj) So(qj.Promise().IsComplete(), ShouldBeFalse) }) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index ddd718347..3b17131a4 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -77,15 +77,15 @@ type managesPluginContentTypes interface { } type collectsMetrics interface { - CollectMetrics([]core.Metric, time.Time) ([]core.Metric, []error) + CollectMetrics([]core.Metric, time.Time, string) ([]core.Metric, []error) } type publishesMetrics interface { - PublishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue) []error + PublishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) []error } type processesMetrics interface { - ProcessMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue) (string, []byte, []error) + ProcessMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) (string, []byte, []error) } type scheduler struct { diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go index 7c081f416..922c1fbc6 100644 --- a/scheduler/scheduler_test.go +++ b/scheduler/scheduler_test.go @@ -79,15 +79,15 @@ func (m *mockMetricManager) GetPluginContentTypes(n string, t core.PluginType, v return m.acceptedContentTypes[key], m.returnedContentTypes[key], nil } -func (m *mockMetricManager) CollectMetrics([]core.Metric, time.Time) ([]core.Metric, []error) { +func (m *mockMetricManager) CollectMetrics([]core.Metric, time.Time, string) ([]core.Metric, []error) { return nil, nil } -func (m *mockMetricManager) PublishMetrics(contentType string, content 
[]byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue) []error { +func (m *mockMetricManager) PublishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) []error { return nil } -func (m *mockMetricManager) ProcessMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue) (string, []byte, []error) { +func (m *mockMetricManager) ProcessMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) (string, []byte, []error) { return "", nil, nil } diff --git a/scheduler/work_manager_test.go b/scheduler/work_manager_test.go index b1765f72f..ffaf0c907 100644 --- a/scheduler/work_manager_test.go +++ b/scheduler/work_manager_test.go @@ -81,6 +81,7 @@ func (mj *mockJob) Errors() []error { return mj.errors } func (mj *mockJob) StartTime() time.Time { return mj.starttime } func (mj *mockJob) Deadline() time.Time { return mj.deadline } func (mj *mockJob) Type() jobType { return collectJobType } +func (mj *mockJob) TaskID() string { return "" } // Complete the first incomplete rendez-vous (if there is one) func (mj *mockJob) RendezVous() { diff --git a/scheduler/workflow.go b/scheduler/workflow.go index b2d0c3c11..bbb4fd972 100644 --- a/scheduler/workflow.go +++ b/scheduler/workflow.go @@ -299,7 +299,7 @@ func bindPluginContentTypes(pus []*publishNode, prs []*processNode, mm managesPl // Start starts a workflow func (s *schedulerWorkflow) Start(t *task) { s.state = WorkflowStarted - j := newCollectorJob(s.metrics, t.deadlineDuration, t.metricsManager, t.workflow.configTree) + j := newCollectorJob(s.metrics, t.deadlineDuration, t.metricsManager, t.workflow.configTree, t.id) // dispatch 'collect' job to be worked // Block until the job has been either run or skipped. 
@@ -336,7 +336,7 @@ func (s *schedulerWorkflow) StateString() string { func (s *schedulerWorkflow) workJobs(prs []*processNode, pus []*publishNode, t *task, pj job) { for _, pr := range prs { - j := newProcessJob(pj, pr.Name(), pr.Version(), pr.InboundContentType, pr.config.Table(), t.metricsManager) + j := newProcessJob(pj, pr.Name(), pr.Version(), pr.InboundContentType, pr.config.Table(), t.metricsManager, t.id) errors := t.manager.Work(j).Promise().Await() if len(errors) != 0 { t.failedRuns++ @@ -348,7 +348,7 @@ func (s *schedulerWorkflow) workJobs(prs []*processNode, pus []*publishNode, t * s.workJobs(pr.ProcessNodes, pr.PublishNodes, t, j) } for _, pu := range pus { - j := newPublishJob(pj, pu.Name(), pu.Version(), pu.InboundContentType, pu.config.Table(), t.metricsManager) + j := newPublishJob(pj, pu.Name(), pu.Version(), pu.InboundContentType, pu.config.Table(), t.metricsManager, t.id) errors := t.manager.Work(j).Promise().Await() if len(errors) != 0 { t.failedRuns++