diff --git a/.gitignore b/.gitignore index 6234bb211..658b65ba7 100644 --- a/.gitignore +++ b/.gitignore @@ -30,6 +30,7 @@ build/ profile.cov gin-bin tags +.vscode/ # we don't vendor godep _workspace **/Godeps/_workspace/** diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 000000000..4190fbbcb --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,14 @@ +{ + "version": "0.2.0", + "configurations": [ + { + "name": "Launch main.go", + "type": "go", + "request": "launch", + "mode": "test", + "program": "./control/", + "env": {}, + "args": ["run TestRoutingCachingStrategy"] + } + ] +} \ No newline at end of file diff --git a/control/available_plugin.go b/control/available_plugin.go index 492d2ff2c..b8b137aaa 100644 --- a/control/available_plugin.go +++ b/control/available_plugin.go @@ -28,7 +28,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "time" log "github.com/Sirupsen/logrus" @@ -54,23 +53,7 @@ const ( var ( ErrPoolNotFound = errors.New("plugin pool not found") - ErrPoolEmpty = errors.New("plugin pool is empty") ErrBadKey = errors.New("bad key") - ErrBadType = errors.New("bad plugin type") - ErrBadStrategy = errors.New("bad strategy") - - // This defines the maximum running instances of a loaded plugin. - // It is initialized at runtime via the cli. - maximumRunningPlugins = 3 -) - -type subscriptionType int - -const ( - // this subscription is bound to an explicit version - boundSubscriptionType subscriptionType = iota - // this subscription is akin to "latest" and must be moved if a newer version is loaded. 
- unboundSubscriptionType ) // availablePlugin represents a plugin which is @@ -98,7 +81,7 @@ type availablePlugin struct { // plugin.Response func newAvailablePlugin(resp *plugin.Response, emitter gomit.Emitter, ep executablePlugin) (*availablePlugin, error) { if resp.Type != plugin.CollectorPluginType && resp.Type != plugin.ProcessorPluginType && resp.Type != plugin.PublisherPluginType { - return nil, ErrBadType + return nil, strategy.ErrBadType } ap := &availablePlugin{ meta: resp.Meta, @@ -171,6 +154,26 @@ func (a *availablePlugin) ID() uint32 { return a.id } +func (a *availablePlugin) SetID(id uint32) { + a.id = id +} + +func (a *availablePlugin) Exclusive() bool { + return a.meta.Exclusive +} + +func (a *availablePlugin) CacheTTL() time.Duration { + return a.meta.CacheTTL +} + +func (a *availablePlugin) RoutingStrategy() plugin.RoutingStrategyType { + return a.meta.RoutingStrategy +} + +func (a *availablePlugin) ConcurrencyCount() int { + return a.meta.ConcurrencyCount +} + func (a *availablePlugin) String() string { return fmt.Sprintf("%s:%s:v%d:id%d", a.TypeName(), a.name, a.version, a.id) } @@ -179,6 +182,10 @@ func (a *availablePlugin) TypeName() string { return a.pluginType.String() } +func (a *availablePlugin) Type() plugin.PluginType { + return a.pluginType +} + func (a *availablePlugin) Name() string { return a.name } @@ -283,322 +290,25 @@ func (a *availablePlugin) healthCheckFailed() { defer a.emitter.Emit(hcfe) } -type apPool struct { - // used to coordinate changes to a pool - *sync.RWMutex - - // the version of the plugins in the pool. - // subscriptions uses this. - version int - // key is the primary key used in availablePlugins: - // {plugin_type}:{plugin_name}:{plugin_version} - key string - - // The subscriptions to this pool. - subs map[string]*subscription - - // The plugins in the pool. - // the primary key is an increasing --> uint from - // snapd epoch (`service snapd start`). 
- plugins map[uint32]*availablePlugin - pidCounter uint32 - - // The max size which this pool may grow. - max int - - // The number of subscriptions per running instance - concurrencyCount int - - // The routing and caching strategy declared by the plugin. - strategy strategy.RoutingAndCaching -} - -func newPool(key string, plugins ...*availablePlugin) (*apPool, error) { - versl := strings.Split(key, ":") - ver, err := strconv.Atoi(versl[len(versl)-1]) - if err != nil { - return nil, err - } - p := &apPool{ - RWMutex: &sync.RWMutex{}, - version: ver, - key: key, - subs: map[string]*subscription{}, - plugins: make(map[uint32]*availablePlugin), - max: maximumRunningPlugins, - concurrencyCount: 1, - } - - if len(plugins) > 0 { - for _, plg := range plugins { - p.insert(plg) - } - } - - return p, nil -} - -func (p *apPool) insert(ap *availablePlugin) error { - if ap.pluginType != plugin.CollectorPluginType && ap.pluginType != plugin.ProcessorPluginType && ap.pluginType != plugin.PublisherPluginType { - return ErrBadType - } - // If an empty pool is created, it does not have - // any available plugins from which to retrieve - // concurrency count or exclusivity. We ensure it - // is set correctly on an insert. - if len(p.plugins) == 0 { - if err := p.applyPluginMeta(ap); err != nil { - return err - } - } - - ap.id = p.generatePID() - p.plugins[ap.id] = ap - - return nil -} - -func (p *apPool) applyPluginMeta(ap *availablePlugin) error { - // Checking if plugin is exclusive - // (only one instance should be running). 
- if ap.meta.Exclusive { - p.max = 1 - } - - // Set the cache TTL - cacheTTL := strategy.GlobalCacheExpiration - // if the plugin exposes a default TTL that is greater the the global default use it - if ap.meta.CacheTTL != 0 && ap.meta.CacheTTL > strategy.GlobalCacheExpiration { - cacheTTL = ap.meta.CacheTTL - } - - // Set the routing and caching strategy - switch ap.meta.RoutingStrategy { - case plugin.DefaultRouting: - p.strategy = strategy.NewLRU(cacheTTL) - default: - return ErrBadStrategy - } - - // set concurrency count - p.concurrencyCount = ap.meta.ConcurrencyCount - - return nil -} - -// subscribe adds a subscription to the pool. -// Using subscribe is idempotent. -func (p *apPool) subscribe(taskID string, subType subscriptionType) { - p.Lock() - defer p.Unlock() - - if _, exists := p.subs[taskID]; !exists { - // Version is the last item in the key, so we split here - // to retrieve it for the subscription. - p.subs[taskID] = &subscription{ - taskID: taskID, - subType: subType, - version: p.version, - } - } -} - -// unsubscribe removes a subscription from the pool. -// Using unsubscribe is idempotent. -func (p *apPool) unsubscribe(taskID string) { - p.Lock() - defer p.Unlock() - delete(p.subs, taskID) -} - -func (p *apPool) eligible() bool { - p.RLock() - defer p.RUnlock() - - // optimization: don't even bother with concurrency - // count if we have already reached pool max - if p.count() == p.max { - return false - } - - should := p.subscriptionCount() / p.concurrencyCount - if should > p.count() && should <= p.max { - return true - } - - return false -} - -// kill kills and removes the available plugin from its pool. -// Using kill is idempotent. 
-func (p *apPool) kill(id uint32, reason string) { - p.Lock() - defer p.Unlock() - - ap, ok := p.plugins[id] - if ok { - ap.Kill(reason) - delete(p.plugins, id) - } -} - -// kills the plugin with the lowest hit count -func (p *apPool) killLeastUsed(reason string) { - p.Lock() - defer p.Unlock() - - var ( - id uint32 - prev int - ) - - // grab details from the first item - for _, p := range p.plugins { - prev = p.hitCount - id = p.id - break - } - - // walk through all and find the lowest hit count - for _, p := range p.plugins { - if p.hitCount < prev { - prev = p.hitCount - id = p.id - } - } - - // kill that ap - ap, ok := p.plugins[id] - if ok { - // only log on first ok health check - log.WithFields(log.Fields{ - "_module": "control-aplugin", - "block": "kill-least-used", - "aplugin": ap, - }).Debug("killing available plugin") - ap.Kill(reason) - delete(p.plugins, id) - } -} - -// remove removes an available plugin from the the pool. -// using remove is idempotent. -func (p *apPool) remove(id uint32) { - p.Lock() - defer p.Unlock() - delete(p.plugins, id) -} - -func (p *apPool) count() int { - return len(p.plugins) -} - -// NOTE: The data returned by subscriptions should be constant and read only. 
-func (p *apPool) subscriptions() map[string]*subscription { - p.RLock() - defer p.RUnlock() - return p.subs -} - -func (p *apPool) subscriptionCount() int { - p.RLock() - defer p.RUnlock() - return len(p.subs) -} - -func (p *apPool) selectAP() (*availablePlugin, serror.SnapError) { - p.RLock() - defer p.RUnlock() - - sp := make([]strategy.SelectablePlugin, p.count()) - i := 0 - for _, plg := range p.plugins { - sp[i] = plg - i++ - } - sap, err := p.strategy.Select(sp) - if err != nil || sap == nil { - return nil, serror.New(err) - } - return sap.(*availablePlugin), nil -} - -func (p *apPool) generatePID() uint32 { - atomic.AddUint32(&p.pidCounter, 1) - return p.pidCounter -} - -func (p *apPool) moveSubscriptions(to *apPool) []subscription { - var subs []subscription - - p.Lock() - defer p.Unlock() - - for task, sub := range p.subs { - if sub.subType == unboundSubscriptionType && to.version > p.version { - subs = append(subs, *sub) - to.subscribe(task, unboundSubscriptionType) - delete(p.subs, task) - } - } - return subs -} - -type subscription struct { - subType subscriptionType - version int - taskID string -} - -func (p *apPool) CheckCache(mts []core.Metric) ([]core.Metric, []core.Metric) { - return p.strategy.CheckCache(mts) -} - -func (p *apPool) UpdateCache(mts []core.Metric) { - p.strategy.UpdateCache(mts) -} - -func (p *apPool) CacheHits(ns string, ver int) (uint64, error) { - return p.strategy.CacheHits(ns, ver) -} - -func (p *apPool) CacheMisses(ns string, ver int) (uint64, error) { - return p.strategy.CacheMisses(ns, ver) -} -func (p *apPool) AllCacheHits() uint64 { - return p.strategy.AllCacheHits() -} - -func (p *apPool) AllCacheMisses() uint64 { - return p.strategy.AllCacheMisses() -} - -func (p *apPool) CacheTTL() (time.Duration, error) { - if len(p.plugins) == 0 { - return 0, ErrPoolEmpty - } - return p.strategy.CacheTTL(), nil -} - type availablePlugins struct { // Used to coordinate operations on the table. 
*sync.RWMutex // table holds all the plugin pools. // The Pools' primary keys are equal to // {plugin_type}:{plugin_name}:{plugin_version} - table map[string]*apPool + table map[string]strategy.Pool } func newAvailablePlugins() *availablePlugins { return &availablePlugins{ RWMutex: &sync.RWMutex{}, - table: make(map[string]*apPool), + table: make(map[string]strategy.Pool), } } func (ap *availablePlugins) insert(pl *availablePlugin) error { if pl.pluginType != plugin.CollectorPluginType && pl.pluginType != plugin.ProcessorPluginType && pl.pluginType != plugin.PublisherPluginType { - return ErrBadType + return strategy.ErrBadType } ap.Lock() @@ -607,7 +317,7 @@ func (ap *availablePlugins) insert(pl *availablePlugin) error { key := fmt.Sprintf("%s:%s:%d", pl.TypeName(), pl.name, pl.version) _, exists := ap.table[key] if !exists { - p, err := newPool(key, pl) + p, err := strategy.NewPool(key, pl) if err != nil { return serror.New(ErrBadKey, map[string]interface{}{ "key": key, @@ -616,11 +326,11 @@ func (ap *availablePlugins) insert(pl *availablePlugin) error { ap.table[key] = p return nil } - ap.table[key].insert(pl) + ap.table[key].Insert(pl) return nil } -func (ap *availablePlugins) getPool(key string) (*apPool, serror.SnapError) { +func (ap *availablePlugins) getPool(key string) (strategy.Pool, serror.SnapError) { ap.RLock() defer ap.RUnlock() pool, ok := ap.table[key] @@ -649,7 +359,7 @@ func (ap *availablePlugins) getPool(key string) (*apPool, serror.SnapError) { return pool, nil } -func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core.Metric) ([]core.Metric, error) { +func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core.Metric, taskID string) ([]core.Metric, error) { var results []core.Metric pool, serr := ap.getPool(pluginKey) if serr != nil { @@ -659,7 +369,7 @@ func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core. 
return nil, serror.New(ErrPoolNotFound, map[string]interface{}{"pool-key": pluginKey}) } - metricsToCollect, metricsFromCache := pool.CheckCache(metricTypes) + metricsToCollect, metricsFromCache := pool.CheckCache(metricTypes, taskID) if len(metricsToCollect) == 0 { return metricsFromCache, nil @@ -667,13 +377,13 @@ func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core. pool.RLock() defer pool.RUnlock() - p, serr := pool.selectAP() + p, serr := pool.SelectAP(taskID) if serr != nil { return nil, serr } // cast client to PluginCollectorClient - cli, ok := p.client.(client.PluginCollectorClient) + cli, ok := p.(*availablePlugin).client.(client.PluginCollectorClient) if !ok { return nil, serror.New(errors.New("unable to cast client to PluginCollectorClient")) } @@ -684,7 +394,7 @@ func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core. return nil, serror.New(err) } - pool.UpdateCache(metrics) + pool.UpdateCache(metrics, taskID) results = make([]core.Metric, len(metricsFromCache)+len(metrics)) idx := 0 @@ -698,13 +408,13 @@ func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core. 
} // update plugin stats - p.hitCount++ - p.lastHitTime = time.Now() + p.(*availablePlugin).hitCount++ + p.(*availablePlugin).lastHitTime = time.Now() return metrics, nil } -func (ap *availablePlugins) publishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue) []error { +func (ap *availablePlugins) publishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) []error { var errs []error key := strings.Join([]string{plugin.PublisherPluginType.String(), pluginName, strconv.Itoa(pluginVersion)}, ":") pool, serr := ap.getPool(key) @@ -718,13 +428,13 @@ func (ap *availablePlugins) publishMetrics(contentType string, content []byte, p pool.RLock() defer pool.RUnlock() - p, err := pool.selectAP() + p, err := pool.SelectAP(taskID) if err != nil { errs = append(errs, err) return errs } - cli, ok := p.client.(client.PluginPublisherClient) + cli, ok := p.(*availablePlugin).client.(client.PluginPublisherClient) if !ok { return []error{errors.New("unable to cast client to PluginPublisherClient")} } @@ -733,12 +443,12 @@ func (ap *availablePlugins) publishMetrics(contentType string, content []byte, p if errp != nil { return []error{errp} } - p.hitCount++ - p.lastHitTime = time.Now() + p.(*availablePlugin).hitCount++ + p.(*availablePlugin).lastHitTime = time.Now() return nil } -func (ap *availablePlugins) processMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue) (string, []byte, []error) { +func (ap *availablePlugins) processMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) (string, []byte, []error) { var errs []error key := strings.Join([]string{plugin.ProcessorPluginType.String(), pluginName, strconv.Itoa(pluginVersion)}, ":") pool, serr := ap.getPool(key) @@ -752,13 +462,13 @@ func 
(ap *availablePlugins) processMetrics(contentType string, content []byte, p pool.RLock() defer pool.RUnlock() - p, err := pool.selectAP() + p, err := pool.SelectAP(taskID) if err != nil { errs = append(errs, err) return "", nil, errs } - cli, ok := p.client.(client.PluginProcessorClient) + cli, ok := p.(*availablePlugin).client.(client.PluginProcessorClient) if !ok { return "", nil, []error{errors.New("unable to cast client to PluginProcessorClient")} } @@ -767,14 +477,14 @@ func (ap *availablePlugins) processMetrics(contentType string, content []byte, p if errp != nil { return "", nil, []error{errp} } - p.hitCount++ - p.lastHitTime = time.Now() + p.(*availablePlugin).hitCount++ + p.(*availablePlugin).lastHitTime = time.Now() return ct, c, nil } -func (ap *availablePlugins) findLatestPool(pType, name string) (*apPool, serror.SnapError) { +func (ap *availablePlugins) findLatestPool(pType, name string) (strategy.Pool, serror.SnapError) { // see if there exists a pool at all which matches name version. - var latest *apPool + var latest strategy.Pool for key, pool := range ap.table { tnv := strings.Split(key, ":") if tnv[0] == pType && tnv[1] == name { @@ -785,7 +495,7 @@ func (ap *availablePlugins) findLatestPool(pType, name string) (*apPool, serror. if latest != nil { for key, pool := range ap.table { tnv := strings.Split(key, ":") - if tnv[0] == pType && tnv[1] == name && pool.version > latest.version { + if tnv[0] == pType && tnv[1] == name && pool.Version() > latest.Version() { latest = pool } } @@ -795,13 +505,13 @@ func (ap *availablePlugins) findLatestPool(pType, name string) (*apPool, serror. 
return nil, nil } -func (ap *availablePlugins) getOrCreatePool(key string) (*apPool, error) { +func (ap *availablePlugins) getOrCreatePool(key string) (strategy.Pool, error) { var err error pool, ok := ap.table[key] if ok { return pool, nil } - pool, err = newPool(key) + pool, err = strategy.NewPool(key) if err != nil { return nil, err } @@ -809,30 +519,18 @@ func (ap *availablePlugins) getOrCreatePool(key string) (*apPool, error) { return pool, nil } -func (ap *availablePlugins) selectAP(key string) (*availablePlugin, serror.SnapError) { - ap.RLock() - defer ap.RUnlock() - - pool, err := ap.getPool(key) - if err != nil { - return nil, err - } - - return pool.selectAP() -} - -func (ap *availablePlugins) pools() map[string]*apPool { +func (ap *availablePlugins) pools() map[string]strategy.Pool { ap.RLock() defer ap.RUnlock() return ap.table } -func (ap *availablePlugins) all() []*availablePlugin { - var aps = []*availablePlugin{} +func (ap *availablePlugins) all() []strategy.AvailablePlugin { + var aps = []strategy.AvailablePlugin{} ap.RLock() defer ap.RUnlock() for _, pool := range ap.table { - for _, ap := range pool.plugins { + for _, ap := range pool.Plugins() { aps = append(aps, ap) } } diff --git a/control/available_plugin_test.go b/control/available_plugin_test.go index 165719ddf..ae8c6e449 100644 --- a/control/available_plugin_test.go +++ b/control/available_plugin_test.go @@ -84,7 +84,7 @@ func TestAvailablePlugins(t *testing.T) { pool, err := aps.getPool("collector:test:1") So(err, ShouldBeNil) - nap, ok := pool.plugins[ap.id] + nap, ok := pool.Plugins()[ap.id] So(ok, ShouldBeTrue) So(nap, ShouldEqual, ap) }) diff --git a/control/control.go b/control/control.go index ea80b03da..da6e95ec3 100644 --- a/control/control.go +++ b/control/control.go @@ -131,7 +131,7 @@ type PluginControlOpt func(*pluginControl) // MaxRunningPlugins sets the maximum number of plugins to run per pool func MaxRunningPlugins(m int) PluginControlOpt { return func(c *pluginControl) { 
- maximumRunningPlugins = m + strategy.MaximumRunningPlugins = m } } @@ -627,8 +627,8 @@ func (p *pluginControl) SubscribeDeps(taskID string, mts []core.Metric, plugins serrs = append(serrs, serror.New(err)) return serrs } - pool.subscribe(taskID, unboundSubscriptionType) - if pool.eligible() { + pool.Subscribe(taskID, strategy.UnboundSubscriptionType) + if pool.Eligible() { err = p.verifyPlugin(latest) if err != nil { serrs = append(serrs, serror.New(err)) @@ -646,8 +646,8 @@ func (p *pluginControl) SubscribeDeps(taskID string, mts []core.Metric, plugins serrs = append(serrs, serror.New(err)) return serrs } - pool.subscribe(taskID, boundSubscriptionType) - if pool.eligible() { + pool.Subscribe(taskID, strategy.BoundSubscriptionType) + if pool.Eligible() { pl, err := p.pluginManager.get(fmt.Sprintf("%s:%s:%d", sub.TypeName(), sub.Name(), sub.Version())) if err != nil { serrs = append(serrs, serror.New(err)) @@ -699,10 +699,10 @@ func (p *pluginControl) sendPluginSubscriptionEvent(taskID string, pl core.Plugi PluginType: int(pt), PluginName: pl.Name(), PluginVersion: pl.Version(), - SubscriptionType: int(unboundSubscriptionType), + SubscriptionType: int(strategy.UnboundSubscriptionType), } if pl.Version() > 0 { - e.SubscriptionType = int(boundSubscriptionType) + e.SubscriptionType = int(strategy.BoundSubscriptionType) } if _, err := p.eventManager.Emit(e); err != nil { return serror.New(err) @@ -726,7 +726,7 @@ func (p *pluginControl) UnsubscribeDeps(taskID string, mts []core.Metric, plugin return serrs } if pool != nil { - pool.unsubscribe(taskID) + pool.Unsubscribe(taskID) } serr := p.sendPluginUnsubscriptionEvent(taskID, sub) if serr != nil { @@ -836,7 +836,7 @@ func (p *pluginControl) MetricExists(mns []string, ver int) bool { // CollectMetrics is a blocking call to collector plugins returning a collection // of metrics and errors. If an error is encountered no metrics will be // returned. 
-func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time.Time) (metrics []core.Metric, errs []error) { +func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time.Time, taskID string) (metrics []core.Metric, errs []error) { pluginToMetricMap, err := groupMetricTypesByPlugin(p.metricCatalog, metricTypes) if err != nil { errs = append(errs, err) @@ -859,7 +859,7 @@ func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time. wg.Add(1) go func(pluginKey string, mt []core.Metric) { - mts, err := p.pluginRunner.AvailablePlugins().collectMetrics(pluginKey, mt) + mts, err := p.pluginRunner.AvailablePlugins().collectMetrics(pluginKey, mt, taskID) if err != nil { cError <- err } else { @@ -893,23 +893,23 @@ func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time. } // PublishMetrics -func (p *pluginControl) PublishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue) []error { +func (p *pluginControl) PublishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) []error { // merge global plugin config into the config for this request cfg := p.Config.Plugins.getPluginConfigDataNode(core.PublisherPluginType, pluginName, pluginVersion).Table() for k, v := range cfg { config[k] = v } - return p.pluginRunner.AvailablePlugins().publishMetrics(contentType, content, pluginName, pluginVersion, config) + return p.pluginRunner.AvailablePlugins().publishMetrics(contentType, content, pluginName, pluginVersion, config, taskID) } // ProcessMetrics -func (p *pluginControl) ProcessMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue) (string, []byte, []error) { +func (p *pluginControl) ProcessMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config 
map[string]ctypes.ConfigValue, taskID string) (string, []byte, []error) { // merge global plugin config into the config for this request cfg := p.Config.Plugins.getPluginConfigDataNode(core.ProcessorPluginType, pluginName, pluginVersion).Table() for k, v := range cfg { config[k] = v } - return p.pluginRunner.AvailablePlugins().processMetrics(contentType, content, pluginName, pluginVersion, config) + return p.pluginRunner.AvailablePlugins().processMetrics(contentType, content, pluginName, pluginVersion, config, taskID) } // GetPluginContentTypes returns accepted and returned content types for the diff --git a/control/control_test.go b/control/control_test.go index d29e09848..86f4ed608 100644 --- a/control/control_test.go +++ b/control/control_test.go @@ -25,11 +25,13 @@ import ( "encoding/json" "errors" "fmt" + "math/rand" "path" "strings" "testing" "time" + "github.com/pborman/uuid" . "github.com/smartystreets/goconvey/convey" "github.com/intelsdi-x/gomit" @@ -824,6 +826,131 @@ func TestMetricConfig(t *testing.T) { }) } +func TestRoutingCachingStrategy(t *testing.T) { + Convey("Given loaded plugins that use sticky routing", t, func() { + config := NewConfig() + config.Plugins.All.AddItem("password", ctypes.ConfigValueStr{Value: "testval"}) + c := New(OptSetConfig(config)) + c.Start() + lpe := newListenToPluginEvent() + c.eventManager.RegisterHandler("Control.PluginLoaded", lpe) + _, e := load(c, PluginPath) + So(e, ShouldBeNil) + if e != nil { + t.FailNow() + } + metric, err := c.metricCatalog.Get([]string{"intel", "mock", "foo"}, 2) + So(err, ShouldBeNil) + So(metric.NamespaceAsString(), ShouldResemble, "/intel/mock/foo") + So(err, ShouldBeNil) + <-lpe.done + Convey("Start the plugins", func() { + lp, err := c.pluginManager.get("collector:mock:2") + So(err, ShouldBeNil) + So(lp, ShouldNotBeNil) + pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:2") + So(errp, ShouldBeNil) + So(pool, ShouldNotBeNil) + tasks := []string{ + uuid.New(), 
+ uuid.New(), + uuid.New(), + uuid.New(), + uuid.New(), + } + for _, id := range tasks { + pool.Subscribe(id, strategy.BoundSubscriptionType) + err = c.pluginRunner.runPlugin(lp.Details) + So(err, ShouldBeNil) + } + // The cache ttl should be 100ms which is what the plugin exposed (no system default was provided) + ttl, err := pool.CacheTTL(tasks[0]) + So(err, ShouldBeNil) + So(ttl, ShouldResemble, 100*time.Millisecond) + So(pool.Strategy(), ShouldHaveSameTypeAs, strategy.NewSticky(ttl)) + So(pool.Count(), ShouldEqual, len(tasks)) + So(pool.SubscriptionCount(), ShouldEqual, len(tasks)) + Convey("Collect metrics", func() { + taskID := tasks[rand.Intn(len(tasks))] + for i := 0; i < 10; i++ { + _, errs := c.CollectMetrics([]core.Metric{metric}, time.Now().Add(time.Second*1), taskID) + So(errs, ShouldBeEmpty) + } + Convey("Check cache stats", func() { + So(pool.AllCacheHits(), ShouldEqual, 9) + So(pool.AllCacheMisses(), ShouldEqual, 1) + }) + }) + }) + }) + + Convey("Given loaded plugins that use least-recently-used routing", t, func() { + c := New() + c.Start() + c.Config.Plugins.Collector.Plugins["mock"] = newPluginConfigItem( + optAddPluginConfigItem("test", ctypes.ConfigValueBool{Value: true}), + optAddPluginConfigItem("user", ctypes.ConfigValueStr{Value: "jane"}), + optAddPluginConfigItem("password", ctypes.ConfigValueStr{Value: "doe"}), + ) + lpe := newListenToPluginEvent() + c.eventManager.RegisterHandler("Control.PluginLoaded", lpe) + _, e := load(c, JSONRPCPluginPath) + So(e, ShouldBeNil) + if e != nil { + t.FailNow() + } + metric, err := c.metricCatalog.Get([]string{"intel", "mock", "foo"}, 1) + metric.config = cdata.NewNode() + So(err, ShouldBeNil) + So(metric.NamespaceAsString(), ShouldResemble, "/intel/mock/foo") + So(err, ShouldBeNil) + <-lpe.done + Convey("Start the plugins", func() { + lp, err := c.pluginManager.get("collector:mock:1") + So(err, ShouldBeNil) + So(lp, ShouldNotBeNil) + pool, errp := 
c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:1") + time.Sleep(1 * time.Second) + So(errp, ShouldBeNil) + So(pool, ShouldNotBeNil) + tasks := []string{ + uuid.New(), + uuid.New(), + uuid.New(), + uuid.New(), + uuid.New(), + } + for _, id := range tasks { + pool.Subscribe(id, strategy.BoundSubscriptionType) + err = c.pluginRunner.runPlugin(lp.Details) + So(err, ShouldBeNil) + } + // The cache ttl should be 1100ms which is what the plugin exposed (no system default was provided) + ttl, err := pool.CacheTTL(tasks[0]) + So(err, ShouldBeNil) + So(ttl, ShouldResemble, 1100*time.Millisecond) + So(pool.Strategy(), ShouldHaveSameTypeAs, strategy.NewLRU(ttl)) + So(pool.Count(), ShouldEqual, len(tasks)) + So(pool.SubscriptionCount(), ShouldEqual, len(tasks)) + Convey("Collect metrics", func() { + taskID := tasks[rand.Intn(len(tasks))] + for i := 0; i < 10; i++ { + cr, errs := c.CollectMetrics([]core.Metric{metric}, time.Now().Add(time.Second*1), taskID) + So(errs, ShouldBeEmpty) + for i := range cr { + So(cr[i].Data(), ShouldContainSubstring, "The mock collected data!") + So(cr[i].Data(), ShouldContainSubstring, "test=true") + } + } + Convey("Check cache stats", func() { + So(pool.AllCacheHits(), ShouldEqual, 9) + So(pool.AllCacheMisses(), ShouldEqual, 1) + }) + }) + }) + }) +} + func TestCollectDynamicMetrics(t *testing.T) { Convey("given a plugin using the native client", t, func() { config := NewConfig() @@ -873,30 +1000,38 @@ func TestCollectDynamicMetrics(t *testing.T) { pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:2") So(errp, ShouldBeNil) So(pool, ShouldNotBeNil) - ttl, err := pool.CacheTTL() - So(err, ShouldResemble, ErrPoolEmpty) + taskID := uuid.New() + ttl, err := pool.CacheTTL(taskID) + So(err, ShouldResemble, strategy.ErrPoolEmpty) So(ttl, ShouldEqual, 0) - pool.subscribe("1", unboundSubscriptionType) + So(pool.Count(), ShouldEqual, 0) + So(pool.SubscriptionCount(), ShouldEqual, 0) + pool.Subscribe(taskID,
strategy.UnboundSubscriptionType) err = c.pluginRunner.runPlugin(lp.Details) + So(pool.Count(), ShouldEqual, 1) + So(pool.SubscriptionCount(), ShouldEqual, 1) So(err, ShouldBeNil) - ttl, err = pool.CacheTTL() + ttl, err = pool.CacheTTL(taskID) So(err, ShouldBeNil) // The minimum TTL advertised by the plugin is 100ms therefore the TTL for the // pool should be the global cache expiration So(ttl, ShouldEqual, strategy.GlobalCacheExpiration) - mts, errs := c.CollectMetrics([]core.Metric{m}, time.Now().Add(time.Second*1)) - hits, err := pool.CacheHits(core.JoinNamespace(m.namespace), 2) + mts, errs := c.CollectMetrics([]core.Metric{m}, time.Now().Add(time.Second*1), taskID) + hits, err := pool.CacheHits(core.JoinNamespace(m.namespace), 2, taskID) So(err, ShouldBeNil) So(hits, ShouldEqual, 0) So(errs, ShouldBeNil) So(len(mts), ShouldEqual, 10) - mts, errs = c.CollectMetrics([]core.Metric{m}, time.Now().Add(time.Second*1)) - hits, err = pool.CacheHits(core.JoinNamespace(m.namespace), 2) + mts, errs = c.CollectMetrics([]core.Metric{m}, time.Now().Add(time.Second*1), taskID) + hits, err = pool.CacheHits(core.JoinNamespace(m.namespace), 2, taskID) So(err, ShouldBeNil) So(hits, ShouldEqual, 1) So(errs, ShouldBeNil) So(len(mts), ShouldEqual, 10) - pool.unsubscribe("1") + pool.Unsubscribe(taskID) + pool.SelectAndKill(taskID, "unsubscription event") + So(pool.Count(), ShouldEqual, 0) + So(pool.SubscriptionCount(), ShouldEqual, 0) Convey("collects metrics from plugin using httpjson client", func() { lp, err := c.pluginManager.get("collector:mock:1") So(err, ShouldBeNil) @@ -904,33 +1039,40 @@ func TestCollectDynamicMetrics(t *testing.T) { pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:1") So(errp, ShouldBeNil) So(pool, ShouldNotBeNil) - ttl, err := pool.CacheTTL() - So(err, ShouldResemble, ErrPoolEmpty) + ttl, err := pool.CacheTTL(taskID) + So(err, ShouldResemble, strategy.ErrPoolEmpty) So(ttl, ShouldEqual, 0) - pool.subscribe("1", 
unboundSubscriptionType) + So(pool.Count(), ShouldEqual, 0) + So(pool.SubscriptionCount(), ShouldEqual, 0) + pool.Subscribe("1", strategy.UnboundSubscriptionType) err = c.pluginRunner.runPlugin(lp.Details) + So(pool.Count(), ShouldEqual, 1) + So(pool.SubscriptionCount(), ShouldEqual, 1) So(err, ShouldBeNil) - ttl, err = pool.CacheTTL() + ttl, err = pool.CacheTTL(taskID) So(err, ShouldBeNil) So(ttl, ShouldEqual, 1100*time.Millisecond) - mts, errs := c.CollectMetrics([]core.Metric{jsonm}, time.Now().Add(time.Second*1)) - hits, err := pool.CacheHits(core.JoinNamespace(jsonm.namespace), jsonm.version) - So(pool.subscriptionCount(), ShouldEqual, 1) - So(pool.strategy, ShouldNotBeNil) + mts, errs := c.CollectMetrics([]core.Metric{jsonm}, time.Now().Add(time.Second*1), uuid.New()) + hits, err := pool.CacheHits(core.JoinNamespace(jsonm.namespace), jsonm.version, taskID) + So(pool.SubscriptionCount(), ShouldEqual, 1) + So(pool.Strategy, ShouldNotBeNil) So(len(mts), ShouldBeGreaterThan, 0) So(err, ShouldBeNil) So(hits, ShouldEqual, 0) So(errs, ShouldBeNil) So(len(mts), ShouldEqual, 10) - mts, errs = c.CollectMetrics([]core.Metric{jsonm}, time.Now().Add(time.Second*1)) - hits, err = pool.CacheHits(core.JoinNamespace(m.namespace), 1) + mts, errs = c.CollectMetrics([]core.Metric{jsonm}, time.Now().Add(time.Second*1), uuid.New()) + hits, err = pool.CacheHits(core.JoinNamespace(m.namespace), 1, taskID) So(err, ShouldBeNil) So(hits, ShouldEqual, 1) So(errs, ShouldBeNil) So(len(mts), ShouldEqual, 10) So(pool.AllCacheHits(), ShouldEqual, 1) So(pool.AllCacheMisses(), ShouldEqual, 1) - pool.unsubscribe("1") + pool.Unsubscribe("1") + pool.SelectAndKill("1", "unsubscription event") + So(pool.Count(), ShouldEqual, 0) + So(pool.SubscriptionCount(), ShouldEqual, 0) c.Stop() time.Sleep(100 * time.Millisecond) }) @@ -987,16 +1129,16 @@ func TestCollectMetrics(t *testing.T) { Convey("create a pool, add subscriptions and start plugins", func() { pool, errp := 
c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:1") So(errp, ShouldBeNil) - pool.subscribe("1", unboundSubscriptionType) + pool.Subscribe("1", strategy.UnboundSubscriptionType) err = c.pluginRunner.runPlugin(lp.Details) So(err, ShouldBeNil) - pool.subscribe("2", unboundSubscriptionType) + pool.Subscribe("2", strategy.UnboundSubscriptionType) err = c.pluginRunner.runPlugin(lp.Details) So(err, ShouldBeNil) m = append(m, m1, m2, m3) Convey("collect metrics", func() { for x := 0; x < 4; x++ { - cr, err := c.CollectMetrics(m, time.Now().Add(time.Second*1)) + cr, err := c.CollectMetrics(m, time.Now().Add(time.Second*1), uuid.New()) So(err, ShouldBeNil) for i := range cr { So(cr[i].Data(), ShouldContainSubstring, "The mock collected data!") @@ -1005,12 +1147,13 @@ func TestCollectMetrics(t *testing.T) { } ap := c.AvailablePlugins() So(ap, ShouldNotBeEmpty) - So(pool.strategy.String(), ShouldEqual, plugin.DefaultRouting.String()) - So(len(pool.plugins), ShouldEqual, 2) - for _, p := range pool.plugins { - So(p.hitCount, ShouldEqual, 2) - So(p.hitCount, ShouldEqual, 2) - } + So(pool.Strategy().String(), ShouldEqual, plugin.DefaultRouting.String()) + So(len(pool.Plugins()), ShouldEqual, 2) + // when the first first plugin is hit the cache is populated the + // cache satifies the next 3 collect calls that come in within the + // cache duration + So(pool.Plugins()[1].HitCount(), ShouldEqual, 1) + So(pool.Plugins()[2].HitCount(), ShouldEqual, 0) c.Stop() }) }) @@ -1027,7 +1170,7 @@ func TestCollectMetrics(t *testing.T) { c.Start() load(c, PluginPath) m := []core.Metric{} - c.CollectMetrics(m, time.Now().Add(time.Second*60)) + c.CollectMetrics(m, time.Now().Add(time.Second*60), uuid.New()) c.Stop() time.Sleep(100 * time.Millisecond) }) @@ -1088,7 +1231,7 @@ func TestPublishMetrics(t *testing.T) { config.Plugins.Publisher.Plugins[lp.Name()] = newPluginConfigItem(optAddPluginConfigItem("file", ctypes.ConfigValueStr{Value: "/tmp/snap-TestPublishMetrics.out"})) 
pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("publisher:file:3") So(errp, ShouldBeNil) - pool.subscribe("1", unboundSubscriptionType) + pool.Subscribe("1", strategy.UnboundSubscriptionType) err := c.pluginRunner.runPlugin(lp.Details) So(err, ShouldBeNil) time.Sleep(2500 * time.Millisecond) @@ -1101,7 +1244,7 @@ func TestPublishMetrics(t *testing.T) { enc := gob.NewEncoder(&buf) enc.Encode(metrics) contentType := plugin.SnapGOBContentType - errs := c.PublishMetrics(contentType, buf.Bytes(), "file", 3, n.Table()) + errs := c.PublishMetrics(contentType, buf.Bytes(), "file", 3, n.Table(), uuid.New()) So(errs, ShouldBeNil) ap := c.AvailablePlugins() So(ap, ShouldNotBeEmpty) @@ -1141,7 +1284,7 @@ func TestProcessMetrics(t *testing.T) { n := cdata.NewNode() pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("processor:passthru:1") So(errp, ShouldBeNil) - pool.subscribe("1", unboundSubscriptionType) + pool.Subscribe("1", strategy.UnboundSubscriptionType) err := c.pluginRunner.runPlugin(lp.Details) So(err, ShouldBeNil) time.Sleep(2500 * time.Millisecond) @@ -1154,7 +1297,7 @@ func TestProcessMetrics(t *testing.T) { enc := gob.NewEncoder(&buf) enc.Encode(metrics) contentType := plugin.SnapGOBContentType - _, ct, errs := c.ProcessMetrics(contentType, buf.Bytes(), "passthru", 1, n.Table()) + _, ct, errs := c.ProcessMetrics(contentType, buf.Bytes(), "passthru", 1, n.Table(), uuid.New()) So(errs, ShouldBeEmpty) mts := []plugin.PluginMetricType{} dec := gob.NewDecoder(bytes.NewBuffer(ct)) diff --git a/control/monitor_test.go b/control/monitor_test.go index 56ee58e4c..fbb3fc96e 100644 --- a/control/monitor_test.go +++ b/control/monitor_test.go @@ -80,7 +80,7 @@ func TestMonitor(t *testing.T) { Convey("health monitor", func() { for _, ap := range aps.all() { So(ap, ShouldNotBeNil) - So(ap.failedHealthChecks, ShouldBeGreaterThan, 3) + So(ap.(*availablePlugin).failedHealthChecks, ShouldBeGreaterThan, 3) } }) }) diff --git a/control/runner.go 
b/control/runner.go index c1180ee18..375872597 100644 --- a/control/runner.go +++ b/control/runner.go @@ -31,6 +31,7 @@ import ( "github.com/intelsdi-x/gomit" "github.com/intelsdi-x/snap/control/plugin" + "github.com/intelsdi-x/snap/control/strategy" "github.com/intelsdi-x/snap/core" "github.com/intelsdi-x/snap/core/control_event" "github.com/intelsdi-x/snap/pkg/aci" @@ -218,16 +219,12 @@ func (r *runner) startPlugin(p executablePlugin) (*availablePlugin, error) { } func (r *runner) stopPlugin(reason string, ap *availablePlugin) error { - err := ap.Stop(reason) - if err != nil { - return err - } pool, err := r.availablePlugins.getPool(ap.key) if err != nil { return err } if pool != nil { - pool.remove(ap.id) + pool.Kill(ap.id, reason) } return nil } @@ -251,7 +248,7 @@ func (r *runner) HandleGomitEvent(e gomit.Event) { return } if pool != nil { - pool.kill(v.Id, "plugin dead") + pool.Kill(v.Id, "plugin dead") } case *control_event.PluginUnsubscriptionEvent: runnerLog.WithFields(log.Fields{ @@ -267,7 +264,7 @@ func (r *runner) HandleGomitEvent(e gomit.Event) { return } case *control_event.LoadPluginEvent: - var pool *apPool + var pool strategy.Pool r.availablePlugins.RLock() for key, p := range r.availablePlugins.pools() { // tuple of type name and version @@ -288,7 +285,7 @@ func (r *runner) HandleGomitEvent(e gomit.Event) { // attempt to find a pool whose type and name are the same, and whose version is // less than newly loaded plugin. If we find it, break out of loop. 
- if core.PluginType(v.Type).String() == tnv[0] && v.Name == tnv[1] && v.Version > p.version { + if core.PluginType(v.Type).String() == tnv[0] && v.Name == tnv[1] && v.Version > p.Version() { pool = p break } @@ -316,7 +313,7 @@ func (r *runner) HandleGomitEvent(e gomit.Event) { return } - subs := pool.moveSubscriptions(newPool) + subs := pool.MoveSubscriptions(newPool) if len(subs) != 0 { runnerLog.WithFields(log.Fields{ "_block": "subscribe-pool", @@ -329,21 +326,21 @@ func (r *runner) HandleGomitEvent(e gomit.Event) { r.emitter.Emit(&control_event.PluginSubscriptionEvent{ PluginName: v.Name, PluginVersion: v.Version, - TaskId: sub.taskID, + TaskId: sub.TaskID, PluginType: v.Type, - SubscriptionType: int(unboundSubscriptionType), + SubscriptionType: int(strategy.UnboundSubscriptionType), }) r.emitter.Emit(&control_event.PluginUnsubscriptionEvent{ PluginName: v.Name, - PluginVersion: pool.version, - TaskId: sub.taskID, + PluginVersion: pool.Version(), + TaskId: sub.TaskID, PluginType: v.Type, }) r.emitter.Emit(&control_event.MovePluginSubscriptionEvent{ PluginName: v.Name, - PreviousVersion: pool.version, + PreviousVersion: pool.Version(), NewVersion: v.Version, - TaskId: sub.taskID, + TaskId: sub.TaskID, PluginType: v.Type, }) } @@ -415,13 +412,13 @@ func (r *runner) handleUnsubscription(pType, pName string, pVersion int, taskID }).Error("pool not found") return errors.New("pool not found") } - if pool.subscriptionCount() < pool.count() { + if pool.SubscriptionCount() < pool.Count() { runnerLog.WithFields(log.Fields{ "_block": "handle-unsubscription", - "pool-count": pool.count(), - "pool-subscription-count": pool.subscriptionCount(), + "pool-count": pool.Count(), + "pool-subscription-count": pool.SubscriptionCount(), }).Debug(fmt.Sprintf("killing an available plugin in pool %s:%s:%d", pType, pName, pVersion)) - pool.killLeastUsed("unsubscription event") + pool.SelectAndKill(taskID, "unsubscription event") } return nil } diff --git a/control/strategy/cache.go 
b/control/strategy/cache.go index 0376b14a3..e4f831d8c 100644 --- a/control/strategy/cache.go +++ b/control/strategy/cache.go @@ -77,20 +77,19 @@ func (c *cache) get(ns string, version int) interface{} { return cell.metric } return cell.metrics - } else { - if !ok { - c.table[key] = &cachecell{ - time: time.Time{}, - metrics: nil, - } + } + if !ok { + c.table[key] = &cachecell{ + time: time.Time{}, + metrics: nil, } - c.table[key].misses++ - cacheLog.WithFields(log.Fields{ - "namespace": key, - "hits": c.table[key].hits, - "misses": c.table[key].misses, - }).Debug(fmt.Sprintf("cache miss [%s]", key)) } + c.table[key].misses++ + cacheLog.WithFields(log.Fields{ + "namespace": key, + "hits": c.table[key].hits, + "misses": c.table[key].misses, + }).Debug(fmt.Sprintf("cache miss [%s]", key)) return nil } @@ -102,6 +101,7 @@ func (c *cache) put(ns string, version int, m interface{}) { c.table[key].time = chrono.Chrono.Now() c.table[key].metric = metric } else { + log.Errorf("%+v", c.table) c.table[key] = &cachecell{ time: chrono.Chrono.Now(), metric: metric, @@ -124,3 +124,81 @@ func (c *cache) put(ns string, version int, m interface{}) { }).Error("unsupported type") } } + +func (c *cache) checkCache(mts []core.Metric) (metricsToCollect []core.Metric, fromCache []core.Metric) { + for _, mt := range mts { + if m := c.get(core.JoinNamespace(mt.Namespace()), mt.Version()); m != nil { + switch metric := m.(type) { + case core.Metric: + fromCache = append(fromCache, metric) + case []core.Metric: + for _, met := range metric { + fromCache = append(fromCache, met) + } + default: + cacheLog.WithFields(log.Fields{ + "_block": "checkCache", + }).Error("unsupported type found in the cache") + } + } else { + metricsToCollect = append(metricsToCollect, mt) + } + } + return metricsToCollect, fromCache +} + +func (c *cache) updateCache(mts []core.Metric) { + results := []core.Metric{} + dc := map[string][]core.Metric{} + for _, mt := range mts { + if mt.Labels() == nil { + // cache the 
individual metric + c.put(core.JoinNamespace(mt.Namespace()), mt.Version(), mt) + } else { + // collect the dynamic query results so we can cache + ns := make([]string, len(mt.Namespace())) + copy(ns, mt.Namespace()) + for _, label := range mt.Labels() { + ns[label.Index] = "*" + } + if _, ok := dc[core.JoinNamespace(ns)]; !ok { + dc[core.JoinNamespace(ns)] = []core.Metric{} + } + dc[core.JoinNamespace(ns)] = append(dc[core.JoinNamespace(ns)], mt) + c.put(core.JoinNamespace(ns), mt.Version(), dc[core.JoinNamespace(ns)]) + } + results = append(results, mt) + } +} + +func (c *cache) allCacheHits() uint64 { + var hits uint64 + for _, v := range c.table { + hits += v.hits + } + return hits +} + +func (c *cache) allCacheMisses() uint64 { + var misses uint64 + for _, v := range c.table { + misses += v.misses + } + return misses +} + +func (c *cache) cacheHits(ns string, version int) (uint64, error) { + key := fmt.Sprintf("%v:%v", ns, version) + if v, ok := c.table[key]; ok { + return v.hits, nil + } + return 0, ErrCacheEntryDoesNotExist +} + +func (c *cache) cacheMisses(ns string, version int) (uint64, error) { + key := fmt.Sprintf("%v:%v", ns, version) + if v, ok := c.table[key]; ok { + return v.misses, nil + } + return 0, ErrCacheEntryDoesNotExist +} diff --git a/control/strategy/lru.go b/control/strategy/lru.go index 7898693f8..1358716fb 100644 --- a/control/strategy/lru.go +++ b/control/strategy/lru.go @@ -20,46 +20,39 @@ limitations under the License. package strategy import ( - "errors" - "fmt" "time" log "github.com/Sirupsen/logrus" "github.com/intelsdi-x/snap/core" ) -var ( - ErrorCouldNotSelect = errors.New("could not select a plugin (round robin strategy)") -) - // lru provides a stragey that selects the least recently used available plugin. 
type lru struct { - metricCache *cache - logger *log.Entry + *cache + logger *log.Entry } func NewLRU(cacheTTL time.Duration) *lru { return &lru{ - metricCache: NewCache(cacheTTL), - logger: log.WithFields(log.Fields{ + NewCache(cacheTTL), + log.WithFields(log.Fields{ "_module": "control-routing", }), } } -func (l *lru) Cache() *cache { - return l.metricCache -} - +// String returns the strategy name. func (l *lru) String() string { return "least-recently-used" } -func (l *lru) CacheTTL() time.Duration { - return l.Cache().ttl +// CacheTTL returns the TTL for the cache. +func (l *lru) CacheTTL(taskID string) (time.Duration, error) { + return l.ttl, nil } -func (l *lru) Select(spa []SelectablePlugin) (SelectablePlugin, error) { +// Select selects an available plugin using the least-recently-used strategy. +func (l *lru) Select(spa []SelectablePlugin, _ string) (SelectablePlugin, error) { t := time.Now() index := -1 for i, sp := range spa { @@ -81,99 +74,51 @@ func (l *lru) Select(spa []SelectablePlugin) (SelectablePlugin, error) { } l.logger.WithFields(log.Fields{ "block": "select", - "strategy": "round-robin", - "error": ErrorCouldNotSelect, + "strategy": l.String(), + "error": ErrCouldNotSelect, }).Error("error selecting") - return nil, ErrorCouldNotSelect + return nil, ErrCouldNotSelect +} + +// Remove selects a plugin +// Since there is no state to cleanup we only need to return the selected plugin +func (l *lru) Remove(sp []SelectablePlugin, taskID string) (SelectablePlugin, error) { + p, err := l.Select(sp, taskID) + if err != nil { + return nil, err + } + return p, nil } // checkCache checks the cache for metric types. 
// returns: // - array of metrics that need to be collected // - array of metrics that were returned from the cache -func (l *lru) CheckCache(mts []core.Metric) ([]core.Metric, []core.Metric) { - var fromCache []core.Metric - var metricsToCollect []core.Metric - for _, mt := range mts { - if m := l.metricCache.get(core.JoinNamespace(mt.Namespace()), mt.Version()); m != nil { - switch metric := m.(type) { - case core.Metric: - fromCache = append(fromCache, metric) - case []core.Metric: - for _, met := range metric { - fromCache = append(fromCache, met) - } - default: - l.logger.WithFields(log.Fields{ - "_module": "client", - "_block": "checkCache", - }).Error("unsupported type found in the cache") - } - } else { - metricsToCollect = append(metricsToCollect, mt) - } - } - return metricsToCollect, fromCache +func (l *lru) CheckCache(mts []core.Metric, _ string) ([]core.Metric, []core.Metric) { + return l.checkCache(mts) } // updateCache updates the cache with the given array of metrics. -func (l *lru) UpdateCache(mts []core.Metric) { - results := []core.Metric{} - dc := map[string][]core.Metric{} - for _, mt := range mts { - if mt.Labels() == nil { - // cache the individual metric - l.metricCache.put(core.JoinNamespace(mt.Namespace()), mt.Version(), mt) - l.logger.Debugf("putting %v:%v in the cache", mt.Namespace(), mt.Version()) - } else { - // collect the dynamic query results so we can cache - ns := make([]string, len(mt.Namespace())) - copy(ns, mt.Namespace()) - for _, label := range mt.Labels() { - ns[label.Index] = "*" - } - if _, ok := dc[core.JoinNamespace(ns)]; !ok { - dc[core.JoinNamespace(ns)] = []core.Metric{} - } - dc[core.JoinNamespace(ns)] = append(dc[core.JoinNamespace(ns)], mt) - l.metricCache.put(core.JoinNamespace(ns), mt.Version(), dc[core.JoinNamespace(ns)]) - l.logger.Debugf("putting %v:%v in the cache", ns, mt.Version()) - } - results = append(results, mt) - } +func (l *lru) UpdateCache(mts []core.Metric, _ string) { + l.updateCache(mts) } +// 
AllCacheHits returns cache hits across all metrics. func (l *lru) AllCacheHits() uint64 { - var hits uint64 - for _, v := range l.metricCache.table { - hits += v.hits - } - return hits + return l.allCacheHits() } // AllCacheMisses returns cache misses across all metrics. func (l *lru) AllCacheMisses() uint64 { - var misses uint64 - for _, v := range l.metricCache.table { - misses += v.misses - } - return misses + return l.allCacheMisses() } // CacheHits returns the cache hits for a given metric namespace and version. -func (l *lru) CacheHits(ns string, version int) (uint64, error) { - key := fmt.Sprintf("%v:%v", ns, version) - if v, ok := l.metricCache.table[key]; ok { - return v.hits, nil - } - return 0, ErrCacheEntryDoesNotExist +func (l *lru) CacheHits(ns string, version int, _ string) (uint64, error) { + return l.cacheHits(ns, version) } // CacheMisses returns the cache misses for a given metric namespace and version. -func (l *lru) CacheMisses(ns string, version int) (uint64, error) { - key := fmt.Sprintf("%v:%v", ns, version) - if v, ok := l.metricCache.table[key]; ok { - return v.misses, nil - } - return 0, ErrCacheEntryDoesNotExist +func (l *lru) CacheMisses(ns string, version int, _ string) (uint64, error) { + return l.cacheMisses(ns, version) } diff --git a/control/strategy/pool.go b/control/strategy/pool.go new file mode 100644 index 000000000..609a7cdbf --- /dev/null +++ b/control/strategy/pool.go @@ -0,0 +1,379 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package strategy + +import ( + "errors" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + log "github.com/Sirupsen/logrus" + + "github.com/intelsdi-x/snap/control/plugin" + "github.com/intelsdi-x/snap/core" + "github.com/intelsdi-x/snap/core/serror" +) + +type subscriptionType int + +const ( + // this subscription is bound to an explicit version + BoundSubscriptionType subscriptionType = iota + // this subscription is akin to "latest" and must be moved if a newer version is loaded. + UnboundSubscriptionType +) + +var ( + // This defines the maximum running instances of a loaded plugin. + // It is initialized at runtime via the cli. + MaximumRunningPlugins = 3 +) + +var ( + ErrBadType = errors.New("bad plugin type") + ErrBadStrategy = errors.New("bad strategy") + ErrPoolEmpty = errors.New("plugin pool is empty") +) + +type Pool interface { + RoutingAndCaching + Count() int + Eligible() bool + Insert(a AvailablePlugin) error + Kill(id uint32, reason string) + MoveSubscriptions(to Pool) []subscription + Plugins() map[uint32]AvailablePlugin + RLock() + RUnlock() + SelectAndKill(taskID, reason string) + SelectAP(taskID string) (SelectablePlugin, serror.SnapError) + Strategy() RoutingAndCaching + Subscribe(taskID string, subType subscriptionType) + SubscriptionCount() int + Unsubscribe(taskID string) + Version() int +} + +type AvailablePlugin interface { + core.AvailablePlugin + CacheTTL() time.Duration + CheckHealth() + ConcurrencyCount() int + Exclusive() bool + Kill(r string) error + RoutingStrategy() plugin.RoutingStrategyType + SetID(id uint32) + String() string + Type() plugin.PluginType +} + +type subscription struct { + SubType subscriptionType + Version int + TaskID string +} + +type pool struct { + // used to coordinate changes to a pool + *sync.RWMutex + + // the version of the plugins in the pool. + // subscriptions uses this. 
+ version int + // key is the primary key used in availablePlugins: + // {plugin_type}:{plugin_name}:{plugin_version} + key string + + // The subscriptions to this pool. + subs map[string]*subscription + + // The plugins in the pool. + // the primary key is an increasing --> uint from + // snapd epoch (`service snapd start`). + plugins map[uint32]AvailablePlugin + pidCounter uint32 + + // The max size which this pool may grow. + max int + + // The number of subscriptions per running instance + concurrencyCount int + + // The routing and caching strategy declared by the plugin. + // strategy RoutingAndCaching + RoutingAndCaching +} + +func NewPool(key string, plugins ...AvailablePlugin) (Pool, error) { + versl := strings.Split(key, ":") + ver, err := strconv.Atoi(versl[len(versl)-1]) + if err != nil { + return nil, err + } + p := &pool{ + RWMutex: &sync.RWMutex{}, + version: ver, + key: key, + subs: map[string]*subscription{}, + plugins: make(map[uint32]AvailablePlugin), + max: MaximumRunningPlugins, + concurrencyCount: 1, + } + + if len(plugins) > 0 { + for _, plg := range plugins { + p.Insert(plg) + } + } + + return p, nil +} + +// Version returns the version +func (p *pool) Version() int { + return p.version +} + +// Plugins returns a map of plugin ids to the AvailablePlugin +func (p *pool) Plugins() map[uint32]AvailablePlugin { + return p.plugins +} + +// Strategy returns the routing strategy +func (p *pool) Strategy() RoutingAndCaching { + return p.RoutingAndCaching +} + +// Insert inserts an AvailablePlugin into the pool +func (p *pool) Insert(a AvailablePlugin) error { + if a.Type() != plugin.CollectorPluginType && a.Type() != plugin.ProcessorPluginType && a.Type() != plugin.PublisherPluginType { + return ErrBadType + } + // If an empty pool is created, it does not have + // any available plugins from which to retrieve + // concurrency count or exclusivity. We ensure it + // is set correctly on an insert. 
+ if len(p.plugins) == 0 { + if err := p.applyPluginMeta(a); err != nil { + return err + } + } + + a.SetID(p.generatePID()) + p.plugins[a.ID()] = a + + return nil +} + +// applyPluginMeta is called when the first plugin is added to the pool +func (p *pool) applyPluginMeta(a AvailablePlugin) error { + // Checking if plugin is exclusive + // (only one instance should be running). + if a.Exclusive() { + p.max = 1 + } + + // Set the cache TTL + cacheTTL := GlobalCacheExpiration + // if the plugin exposes a default TTL that is greater the the global default use it + if a.CacheTTL() != 0 && a.CacheTTL() > GlobalCacheExpiration { + cacheTTL = a.CacheTTL() + } + + // Set the concurrency count + p.concurrencyCount = a.ConcurrencyCount() + + // Set the routing and caching strategy + switch a.RoutingStrategy() { + case plugin.DefaultRouting: + p.RoutingAndCaching = NewLRU(cacheTTL) + case plugin.StickyRouting: + p.RoutingAndCaching = NewSticky(cacheTTL) + p.concurrencyCount = 1 + default: + return ErrBadStrategy + } + + return nil +} + +// subscribe adds a subscription to the pool. +// Using subscribe is idempotent. +func (p *pool) Subscribe(taskID string, subType subscriptionType) { + p.Lock() + defer p.Unlock() + + if _, exists := p.subs[taskID]; !exists { + // Version is the last item in the key, so we split here + // to retrieve it for the subscription. + p.subs[taskID] = &subscription{ + TaskID: taskID, + SubType: subType, + Version: p.version, + } + } +} + +// unsubscribe removes a subscription from the pool. +// Using unsubscribe is idempotent. 
+func (p *pool) Unsubscribe(taskID string) { + p.Lock() + defer p.Unlock() + delete(p.subs, taskID) +} + +// Eligible returns a bool indicating whether the pool is eligible to grow +func (p *pool) Eligible() bool { + p.RLock() + defer p.RUnlock() + + // optimization: don't even bother with concurrency + // count if we have already reached pool max + if p.Count() == p.max { + return false + } + + should := p.SubscriptionCount() / p.concurrencyCount + if should > p.Count() && should <= p.max { + return true + } + + return false +} + +// kill kills and removes the available plugin from its pool. +// Using kill is idempotent. +func (p *pool) Kill(id uint32, reason string) { + p.Lock() + defer p.Unlock() + + ap, ok := p.plugins[id] + if ok { + ap.Kill(reason) + delete(p.plugins, id) + } +} + +// SelectAndKill selects, kills and removes the available plugin from the pool +func (p *pool) SelectAndKill(taskID, reason string) { + sp := make([]SelectablePlugin, p.Count()) + i := 0 + for _, plg := range p.plugins { + sp[i] = plg + i++ + } + rp, err := p.Remove(sp, taskID) + if err != nil { + log.WithFields(log.Fields{ + "_block": "selectAndKill", + "taskID": taskID, + "reason": reason, + }).Error(err) + } + if err := rp.Kill(reason); err != nil { + log.WithFields(log.Fields{ + "_block": "selectAndKill", + "taskID": taskID, + "reason": reason, + }).Error(err) + } + p.remove(rp.ID()) +} + +// remove removes an available plugin from the the pool. +// using remove is idempotent. +func (p *pool) remove(id uint32) { + p.Lock() + defer p.Unlock() + delete(p.plugins, id) +} + +// Count returns the number of plugins in the pool +func (p *pool) Count() int { + p.RLock() + defer p.RUnlock() + return len(p.plugins) +} + +// NOTE: The data returned by subscriptions should be constant and read only. 
+func (p *pool) subscriptions() map[string]*subscription { + p.RLock() + defer p.RUnlock() + return p.subs +} + +// SubscriptionCount returns the number of subscriptions in the pool +func (p *pool) SubscriptionCount() int { + p.RLock() + defer p.RUnlock() + return len(p.subs) +} + +// SelectAP selects an available plugin from the pool +func (p *pool) SelectAP(taskID string) (SelectablePlugin, serror.SnapError) { + p.RLock() + defer p.RUnlock() + + sp := make([]SelectablePlugin, p.Count()) + i := 0 + for _, plg := range p.plugins { + sp[i] = plg + i++ + } + sap, err := p.Select(sp, taskID) + if err != nil || sap == nil { + return nil, serror.New(err) + } + return sap, nil +} + +// generatePID returns the next availble pid for the pool +func (p *pool) generatePID() uint32 { + atomic.AddUint32(&p.pidCounter, 1) + return p.pidCounter +} + +// MoveSubscriptions moves subscriptions to another pool +func (p *pool) MoveSubscriptions(to Pool) []subscription { + var subs []subscription + + p.Lock() + defer p.Unlock() + + for task, sub := range p.subs { + if sub.SubType == UnboundSubscriptionType && to.(*pool).version > p.version { + subs = append(subs, *sub) + to.Subscribe(task, UnboundSubscriptionType) + delete(p.subs, task) + } + } + return subs +} + +// CacheTTL returns the cacheTTL for the pool +func (p *pool) CacheTTL(taskID string) (time.Duration, error) { + if len(p.plugins) == 0 { + return 0, ErrPoolEmpty + } + return p.RoutingAndCaching.CacheTTL(taskID) +} diff --git a/control/strategy/sticky.go b/control/strategy/sticky.go new file mode 100644 index 000000000..120989714 --- /dev/null +++ b/control/strategy/sticky.go @@ -0,0 +1,156 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package strategy + +import ( + "errors" + "fmt" + + log "github.com/Sirupsen/logrus" + "github.com/intelsdi-x/snap/core" +) + +import "time" + +var ( + ErrCacheDoesNotExist = errors.New("cache does not exist") +) + +// sticky provides a stragey that ... concurrency count is 1 +type sticky struct { + plugins map[string]SelectablePlugin + metricCache map[string]*cache + logger *log.Entry + cacheTTL time.Duration +} + +func NewSticky(cacheTTL time.Duration) *sticky { + return &sticky{ + metricCache: make(map[string]*cache), + plugins: make(map[string]SelectablePlugin), + cacheTTL: cacheTTL, + logger: log.WithFields(log.Fields{ + "_module": "control-routing", + }), + } +} + +// Select selects an available plugin using the sticky plugin strategy. +func (s *sticky) Select(spa []SelectablePlugin, taskID string) (SelectablePlugin, error) { + if sp, ok := s.plugins[taskID]; ok && sp != nil { + return sp, nil + } + return s.selectPlugin(spa, taskID) +} + +// Remove selects a plugin and and removes it from the cache +func (s *sticky) Remove(sp []SelectablePlugin, taskID string) (SelectablePlugin, error) { + p, err := s.Select(sp, taskID) + if err != nil { + return nil, err + } + delete(s.metricCache, taskID) + delete(s.plugins, taskID) + return p, nil +} + +// String returns the strategy name. +func (s *sticky) String() string { + return "sticky" +} + +// CacheTTL returns the TTL for the cache. +func (s *sticky) CacheTTL(taskID string) (time.Duration, error) { + return s.cacheTTL, nil +} + +// checkCache checks the cache for metric types. 
+// returns: +// - array of metrics that need to be collected +// - array of metrics that were returned from the cache +func (s *sticky) CheckCache(mts []core.Metric, taskID string) ([]core.Metric, []core.Metric) { + if _, ok := s.metricCache[taskID]; !ok { + s.metricCache[taskID] = NewCache(s.cacheTTL) + } + return s.metricCache[taskID].checkCache(mts) +} + +// updateCache updates the cache with the given array of metrics. +func (s *sticky) UpdateCache(mts []core.Metric, taskID string) { + if _, ok := s.metricCache[taskID]; !ok { + s.metricCache[taskID] = NewCache(s.cacheTTL) + } + s.metricCache[taskID].updateCache(mts) +} + +// AllCacheHits returns cache hits across all metrics. +func (s *sticky) AllCacheHits() uint64 { + var total uint64 + for _, cache := range s.metricCache { + total += cache.allCacheHits() + } + return total +} + +// AllCacheMisses returns cache misses across all metrics. +func (s *sticky) AllCacheMisses() uint64 { + var total uint64 + for _, cache := range s.metricCache { + total += cache.allCacheMisses() + } + return total +} + +// CacheHits returns the cache hits for a given metric namespace and version. +func (s *sticky) CacheHits(ns string, version int, taskID string) (uint64, error) { + if cache, ok := s.metricCache[taskID]; ok { + return cache.cacheHits(ns, version) + } + return 0, ErrCacheDoesNotExist +} + +// CacheMisses returns the cache misses for a given metric namespace and version. 
+func (s *sticky) CacheMisses(ns string, version int, taskID string) (uint64, error) { + if cache, ok := s.metricCache[taskID]; ok { + return cache.cacheMisses(ns, version) + } + return 0, ErrCacheDoesNotExist +} + +func (s *sticky) selectPlugin(sp []SelectablePlugin, taskID string) (SelectablePlugin, error) { + for _, p := range sp { + available := true + for _, busyPlugin := range s.plugins { + if p == busyPlugin { + available = false + } + } + if available { + s.plugins[taskID] = p + return p, nil + } + } + s.logger.WithFields(log.Fields{ + "_block": "findAvailablePlugin", + "strategy": s.String(), + "error": fmt.Sprintf("%v of %v plugins are available", len(sp)-len(s.plugins), len(sp)), + }).Error(ErrCouldNotSelect) + return nil, ErrCouldNotSelect +} diff --git a/control/strategy/sticky_test.go b/control/strategy/sticky_test.go new file mode 100644 index 000000000..8159e4e5b --- /dev/null +++ b/control/strategy/sticky_test.go @@ -0,0 +1,102 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package strategy + +import ( + "strings" + "testing" + "time" + + "github.com/intelsdi-x/snap/core" + "github.com/intelsdi-x/snap/core/cdata" + . 
"github.com/smartystreets/goconvey/convey" +) + +type mockPlugin struct { + name string +} + +func (m *mockPlugin) HitCount() int { return 0 } +func (m *mockPlugin) LastHit() time.Time { return time.Time{} } +func (m *mockPlugin) String() string { return "" } +func (m *mockPlugin) Kill(string) error { return nil } +func (m *mockPlugin) ID() uint32 { return 0 } + +func newMockMetricType(ns string) mockMetricType { + return mockMetricType{ + namespace: strings.Split(ns, "/"), + } +} + +type mockMetricType struct { + namespace []string +} + +func (m mockMetricType) Namespace() []string { return m.namespace } + +func (m mockMetricType) LastAdvertisedTime() time.Time { return time.Now() } + +func (m mockMetricType) Version() int { return 1 } + +func (m mockMetricType) Config() *cdata.ConfigDataNode { return nil } + +func (m mockMetricType) Data() interface{} { return nil } + +func (m mockMetricType) Source() string { return "" } + +func (m mockMetricType) Tags() map[string]string { return nil } + +func (m mockMetricType) Labels() []core.Label { return nil } + +func (m mockMetricType) Timestamp() time.Time { return time.Time{} } + +func TestStickyRouter(t *testing.T) { + Convey("Given a sticky router", t, func() { + router := NewSticky(100 * time.Millisecond) + So(router, ShouldNotBeNil) + So(router.String(), ShouldResemble, "sticky") + Convey("Select a plugin when they are available", func() { + p1 := &mockPlugin{name: "p1"} + p2 := &mockPlugin{name: "p2"} + // select a plugin, for task1, given a task and two available plugins + sp1, err := router.Select([]SelectablePlugin{p1, p2}, "task1") + So(err, ShouldBeNil) + So(sp1, ShouldNotBeNil) + So(sp1, ShouldEqual, p1) + // change the order of the plugins provided to the select + sp2, err := router.Select([]SelectablePlugin{p2, p1}, "task1") + So(err, ShouldBeNil) + So(sp1, ShouldNotBeNil) + So(sp2, ShouldEqual, p1) + // select the other (last) available plugin for task2 + sp3, err := router.Select([]SelectablePlugin{p2, 
p1}, "task2") + So(err, ShouldBeNil) + So(sp3, ShouldNotBeNil) + So(sp3, ShouldEqual, p2) + Convey("Select a plugin when there are NONE available", func() { + plugins := []SelectablePlugin{p1, p2} + sp, err := router.Select(plugins, "task3") + So(sp, ShouldBeNil) + So(err, ShouldEqual, ErrCouldNotSelect) + }) + }) + + }) +} diff --git a/control/strategy/strategy.go b/control/strategy/strategy.go index 052cd2e10..2445d68fb 100644 --- a/control/strategy/strategy.go +++ b/control/strategy/strategy.go @@ -22,25 +22,33 @@ limitations under the License. package strategy import ( + "errors" "time" "github.com/intelsdi-x/snap/core" ) +var ( + ErrCouldNotSelect = errors.New("could not select a plugin") +) + type SelectablePlugin interface { HitCount() int LastHit() time.Time String() string + Kill(r string) error + ID() uint32 } type RoutingAndCaching interface { - Select([]SelectablePlugin) (SelectablePlugin, error) - CheckCache(mts []core.Metric) ([]core.Metric, []core.Metric) - UpdateCache(mts []core.Metric) - CacheHits(string, int) (uint64, error) - CacheMisses(string, int) (uint64, error) + Select(selectablePlugins []SelectablePlugin, taskID string) (SelectablePlugin, error) + Remove(selectablePlugins []SelectablePlugin, taskID string) (SelectablePlugin, error) + CheckCache(metrics []core.Metric, taskID string) ([]core.Metric, []core.Metric) + UpdateCache(metrics []core.Metric, taskID string) + CacheHits(ns string, ver int, taskID string) (uint64, error) + CacheMisses(ns string, ver int, taskID string) (uint64, error) AllCacheHits() uint64 AllCacheMisses() uint64 - CacheTTL() time.Duration + CacheTTL(taskID string) (time.Duration, error) String() string } diff --git a/mgmt/rest/client/client_func_test.go b/mgmt/rest/client/client_func_test.go index 07e70e11c..5e7ed532b 100644 --- a/mgmt/rest/client/client_func_test.go +++ b/mgmt/rest/client/client_func_test.go @@ -397,7 +397,7 @@ func TestSnapClient(t *testing.T) { }) Convey("event stream", func() { 
rest.StreamingBufferWindow = 0.01 - sch := &Schedule{Type: "simple", Interval: "500ms"} + sch := &Schedule{Type: "simple", Interval: "100ms"} tf := c.CreateTask(sch, wf, "baron", "", false) type ea struct { diff --git a/plugin/collector/snap-collector-mock1/mock/mock.go b/plugin/collector/snap-collector-mock1/mock/mock.go index 657651d5a..18d6635c0 100644 --- a/plugin/collector/snap-collector-mock1/mock/mock.go +++ b/plugin/collector/snap-collector-mock1/mock/mock.go @@ -68,18 +68,14 @@ func (f *Mock) CollectMetrics(mts []plugin.PluginMetricType) ([]plugin.PluginMet metrics = append(metrics, mt) } } else { - var data string if cv, ok := p.Config().Table()["test"]; ok { - data = fmt.Sprintf("The mock collected data! config data: user=%s password=%s test=%v", p.Config().Table()["user"], p.Config().Table()["password"], cv.(ctypes.ConfigValueBool).Value) + p.Data_ = fmt.Sprintf("The mock collected data! config data: user=%s password=%s test=%v", p.Config().Table()["user"], p.Config().Table()["password"], cv.(ctypes.ConfigValueBool).Value) } else { - data = fmt.Sprintf("The mock collected data! config data: user=%s password=%s", p.Config().Table()["user"], p.Config().Table()["password"]) + p.Data_ = fmt.Sprintf("The mock collected data! 
config data: user=%s password=%s", p.Config().Table()["user"], p.Config().Table()["password"]) } - mt := plugin.PluginMetricType{ - Data_: data, - Timestamp_: time.Now(), - Source_: hostname, - } - metrics = append(metrics, mt) + p.Timestamp_ = time.Now() + p.Source_ = hostname + metrics = append(metrics, p) } } return metrics, nil diff --git a/plugin/collector/snap-collector-mock2/mock/mock.go b/plugin/collector/snap-collector-mock2/mock/mock.go index 2b1a7c0d9..31b1a5353 100644 --- a/plugin/collector/snap-collector-mock2/mock/mock.go +++ b/plugin/collector/snap-collector-mock2/mock/mock.go @@ -47,7 +47,7 @@ type Mock struct { // CollectMetrics collects metrics for testing func (f *Mock) CollectMetrics(mts []plugin.PluginMetricType) ([]plugin.PluginMetricType, error) { for _, p := range mts { - log.Printf("collecting %+v", p) + log.Printf("collecting %+v\n", p) } rand.Seed(time.Now().UTC().UnixNano()) metrics := []plugin.PluginMetricType{} @@ -117,6 +117,7 @@ func Meta() *plugin.PluginMeta { []string{plugin.SnapGOBContentType}, []string{plugin.SnapGOBContentType}, plugin.CacheTTL(100*time.Millisecond), + plugin.RoutingStrategy(plugin.StickyRouting), ) } diff --git a/scheduler/job.go b/scheduler/job.go index ac12c56b1..bef0fb68f 100644 --- a/scheduler/job.go +++ b/scheduler/job.go @@ -86,6 +86,7 @@ type job interface { StartTime() time.Time Deadline() time.Time Type() jobType + TaskID() string Run() } @@ -93,17 +94,18 @@ type jobType int type coreJob struct { sync.Mutex - + taskID string jtype jobType deadline time.Time starttime time.Time errors []error } -func newCoreJob(t jobType, deadline time.Time) *coreJob { +func newCoreJob(t jobType, deadline time.Time, taskID string) *coreJob { return &coreJob{ jtype: t, deadline: deadline, + taskID: taskID, errors: make([]error, 0), starttime: time.Now(), } @@ -131,6 +133,10 @@ func (c *coreJob) Errors() []error { return c.errors } +func (c *coreJob) TaskID() string { + return c.taskID +} + type collectorJob struct { 
*coreJob collector collectsMetrics @@ -139,12 +145,12 @@ type collectorJob struct { configDataTree *cdata.ConfigDataTree } -func newCollectorJob(metricTypes []core.RequestedMetric, deadlineDuration time.Duration, collector collectsMetrics, cdt *cdata.ConfigDataTree) job { +func newCollectorJob(metricTypes []core.RequestedMetric, deadlineDuration time.Duration, collector collectsMetrics, cdt *cdata.ConfigDataTree, taskID string) job { return &collectorJob{ collector: collector, metricTypes: metricTypes, metrics: []core.Metric{}, - coreJob: newCoreJob(collectJobType, time.Now().Add(deadlineDuration)), + coreJob: newCoreJob(collectJobType, time.Now().Add(deadlineDuration), taskID), configDataTree: cdt, } } @@ -193,7 +199,7 @@ func (c *collectorJob) Run() { config: config, } } - ret, errs := c.collector.CollectMetrics(metrics, c.Deadline()) + ret, errs := c.collector.CollectMetrics(metrics, c.Deadline(), c.TaskID()) log.WithFields(log.Fields{ "_module": "scheduler-job", @@ -228,13 +234,13 @@ type processJob struct { content []byte } -func newProcessJob(parentJob job, pluginName string, pluginVersion int, contentType string, config map[string]ctypes.ConfigValue, processor processesMetrics) job { +func newProcessJob(parentJob job, pluginName string, pluginVersion int, contentType string, config map[string]ctypes.ConfigValue, processor processesMetrics, taskID string) job { return &processJob{ parentJob: parentJob, pluginName: pluginName, pluginVersion: pluginVersion, metrics: []core.Metric{}, - coreJob: newCoreJob(processJobType, parentJob.Deadline()), + coreJob: newCoreJob(processJobType, parentJob.Deadline(), taskID), config: config, processor: processor, contentType: contentType, @@ -269,7 +275,7 @@ func (p *processJob) Run() { } } enc.Encode(metrics) - _, content, errs := p.processor.ProcessMetrics(p.contentType, buf.Bytes(), p.pluginName, p.pluginVersion, p.config) + _, content, errs := p.processor.ProcessMetrics(p.contentType, buf.Bytes(), p.pluginName, 
p.pluginVersion, p.config, p.taskID) if errs != nil { for _, e := range errs { log.WithFields(log.Fields{ @@ -323,13 +329,13 @@ type publisherJob struct { contentType string } -func newPublishJob(parentJob job, pluginName string, pluginVersion int, contentType string, config map[string]ctypes.ConfigValue, publisher publishesMetrics) job { +func newPublishJob(parentJob job, pluginName string, pluginVersion int, contentType string, config map[string]ctypes.ConfigValue, publisher publishesMetrics, taskID string) job { return &publisherJob{ parentJob: parentJob, publisher: publisher, pluginName: pluginName, pluginVersion: pluginVersion, - coreJob: newCoreJob(publishJobType, parentJob.Deadline()), + coreJob: newCoreJob(publishJobType, parentJob.Deadline(), taskID), config: config, contentType: contentType, } @@ -362,7 +368,7 @@ func (p *publisherJob) Run() { } } enc.Encode(metrics) - errs := p.publisher.PublishMetrics(p.contentType, buf.Bytes(), p.pluginName, p.pluginVersion, p.config) + errs := p.publisher.PublishMetrics(p.contentType, buf.Bytes(), p.pluginName, p.pluginVersion, p.config, p.taskID) if errs != nil { for _, e := range errs { log.WithFields(log.Fields{ @@ -393,7 +399,7 @@ func (p *publisherJob) Run() { case processJobType: switch p.contentType { case plugin.SnapGOBContentType: - errs := p.publisher.PublishMetrics(p.contentType, p.parentJob.(*processJob).content, p.pluginName, p.pluginVersion, p.config) + errs := p.publisher.PublishMetrics(p.contentType, p.parentJob.(*processJob).content, p.pluginName, p.pluginVersion, p.config, p.taskID) if errs != nil { for _, e := range errs { log.WithFields(log.Fields{ diff --git a/scheduler/job_test.go b/scheduler/job_test.go index e7861a332..9064705a5 100644 --- a/scheduler/job_test.go +++ b/scheduler/job_test.go @@ -33,7 +33,7 @@ import ( type mockCollector struct{} -func (m *mockCollector) CollectMetrics([]core.Metric, time.Time) ([]core.Metric, []error) { +func (m *mockCollector) CollectMetrics([]core.Metric, 
time.Time, string) ([]core.Metric, []error) { return nil, nil } @@ -42,43 +42,37 @@ func TestCollectorJob(t *testing.T) { cdt := cdata.NewTree() Convey("newCollectorJob()", t, func() { Convey("it returns an init-ed collectorJob", func() { - cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt) + cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid") So(cj, ShouldHaveSameTypeAs, &collectorJob{}) }) }) Convey("StartTime()", t, func() { Convey("it should return the job starttime", func() { - cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt) + cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid") So(cj.StartTime(), ShouldHaveSameTypeAs, time.Now()) }) }) Convey("Deadline()", t, func() { Convey("it should return the job daedline", func() { - cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt) + cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid") So(cj.Deadline(), ShouldResemble, cj.(*collectorJob).deadline) }) }) Convey("Type()", t, func() { Convey("it should return the job type", func() { - cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt) + cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid") So(cj.Type(), ShouldEqual, collectJobType) }) }) - // Convey("Metrics()", t, func() { - // Convey("it should return the job metrics", func() { - // cj := newCollectorJob([]core.MetricType{}, defaultDeadline, &mockCollector{}) - // So(cj.Metrics(), ShouldResemble, []core.Metric{}) - // }) - // }) Convey("Errors()", t, func() { Convey("it should return the errors from the job", func() { - cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt) + cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, 
"taskid") So(cj.Errors(), ShouldResemble, []error{}) }) }) Convey("AddErrors()", t, func() { Convey("it should append errors to the job", func() { - cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt) + cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid") So(cj.Errors(), ShouldResemble, []error{}) e1 := errors.New("1") @@ -93,7 +87,7 @@ func TestCollectorJob(t *testing.T) { }) Convey("Run()", t, func() { Convey("it should complete without errors", func() { - cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt) + cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid") cj.(*collectorJob).Run() So(cj.Errors(), ShouldResemble, []error{}) }) @@ -105,14 +99,14 @@ func TestQueuedJob(t *testing.T) { cdt := cdata.NewTree() Convey("Job()", t, func() { Convey("it should return the underlying job", func() { - cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt) + cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid") qj := newQueuedJob(cj) So(qj.Job(), ShouldEqual, cj) }) }) Convey("Promise()", t, func() { Convey("it should return the underlying promise", func() { - cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt) + cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt, "taskid") qj := newQueuedJob(cj) So(qj.Promise().IsComplete(), ShouldBeFalse) }) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index ddd718347..3b17131a4 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -77,15 +77,15 @@ type managesPluginContentTypes interface { } type collectsMetrics interface { - CollectMetrics([]core.Metric, time.Time) ([]core.Metric, []error) + CollectMetrics([]core.Metric, time.Time, string) ([]core.Metric, []error) } type publishesMetrics interface { - 
PublishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue) []error + PublishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) []error } type processesMetrics interface { - ProcessMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue) (string, []byte, []error) + ProcessMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) (string, []byte, []error) } type scheduler struct { diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go index 7c081f416..922c1fbc6 100644 --- a/scheduler/scheduler_test.go +++ b/scheduler/scheduler_test.go @@ -79,15 +79,15 @@ func (m *mockMetricManager) GetPluginContentTypes(n string, t core.PluginType, v return m.acceptedContentTypes[key], m.returnedContentTypes[key], nil } -func (m *mockMetricManager) CollectMetrics([]core.Metric, time.Time) ([]core.Metric, []error) { +func (m *mockMetricManager) CollectMetrics([]core.Metric, time.Time, string) ([]core.Metric, []error) { return nil, nil } -func (m *mockMetricManager) PublishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue) []error { +func (m *mockMetricManager) PublishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) []error { return nil } -func (m *mockMetricManager) ProcessMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue) (string, []byte, []error) { +func (m *mockMetricManager) ProcessMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) (string, []byte, []error) { return "", nil, 
nil } diff --git a/scheduler/work_manager_test.go b/scheduler/work_manager_test.go index b1765f72f..ffaf0c907 100644 --- a/scheduler/work_manager_test.go +++ b/scheduler/work_manager_test.go @@ -81,6 +81,7 @@ func (mj *mockJob) Errors() []error { return mj.errors } func (mj *mockJob) StartTime() time.Time { return mj.starttime } func (mj *mockJob) Deadline() time.Time { return mj.deadline } func (mj *mockJob) Type() jobType { return collectJobType } +func (mj *mockJob) TaskID() string { return "" } // Complete the first incomplete rendez-vous (if there is one) func (mj *mockJob) RendezVous() { diff --git a/scheduler/workflow.go b/scheduler/workflow.go index b2d0c3c11..bbb4fd972 100644 --- a/scheduler/workflow.go +++ b/scheduler/workflow.go @@ -299,7 +299,7 @@ func bindPluginContentTypes(pus []*publishNode, prs []*processNode, mm managesPl // Start starts a workflow func (s *schedulerWorkflow) Start(t *task) { s.state = WorkflowStarted - j := newCollectorJob(s.metrics, t.deadlineDuration, t.metricsManager, t.workflow.configTree) + j := newCollectorJob(s.metrics, t.deadlineDuration, t.metricsManager, t.workflow.configTree, t.id) // dispatch 'collect' job to be worked // Block until the job has been either run or skipped. 
@@ -336,7 +336,7 @@ func (s *schedulerWorkflow) StateString() string { func (s *schedulerWorkflow) workJobs(prs []*processNode, pus []*publishNode, t *task, pj job) { for _, pr := range prs { - j := newProcessJob(pj, pr.Name(), pr.Version(), pr.InboundContentType, pr.config.Table(), t.metricsManager) + j := newProcessJob(pj, pr.Name(), pr.Version(), pr.InboundContentType, pr.config.Table(), t.metricsManager, t.id) errors := t.manager.Work(j).Promise().Await() if len(errors) != 0 { t.failedRuns++ @@ -348,7 +348,7 @@ func (s *schedulerWorkflow) workJobs(prs []*processNode, pus []*publishNode, t * s.workJobs(pr.ProcessNodes, pr.PublishNodes, t, j) } for _, pu := range pus { - j := newPublishJob(pj, pu.Name(), pu.Version(), pu.InboundContentType, pu.config.Table(), t.metricsManager) + j := newPublishJob(pj, pu.Name(), pu.Version(), pu.InboundContentType, pu.config.Table(), t.metricsManager, t.id) errors := t.manager.Work(j).Promise().Await() if len(errors) != 0 { t.failedRuns++