diff --git a/control/available_plugin.go b/control/available_plugin.go index 417f827ff..1a714545b 100644 --- a/control/available_plugin.go +++ b/control/available_plugin.go @@ -36,8 +36,10 @@ import ( "github.com/intelsdi-x/gomit" "github.com/intelsdi-x/snap/control/plugin" "github.com/intelsdi-x/snap/control/plugin/client" - "github.com/intelsdi-x/snap/control/routing" + "github.com/intelsdi-x/snap/control/strategy" + "github.com/intelsdi-x/snap/core" "github.com/intelsdi-x/snap/core/control_event" + "github.com/intelsdi-x/snap/core/ctypes" "github.com/intelsdi-x/snap/core/serror" ) @@ -51,12 +53,11 @@ const ( ) var ( - // ErrPoolNotFound - error message when the plugin pool not found ErrPoolNotFound = errors.New("plugin pool not found") - // ErrBadKey - error message when a bad key used - ErrBadKey = errors.New("bad key") - // ErrBadType - error message when a bad plugin type used - ErrBadType = errors.New("bad plugin type") + ErrPoolEmpty = errors.New("plugin pool is empty") + ErrBadKey = errors.New("bad key") + ErrBadType = errors.New("bad plugin type") + ErrBadStrategy = errors.New("bad strategy") // This defines the maximum running instances of a loaded plugin. // It is initialized at runtime via the cli. @@ -307,6 +308,9 @@ type apPool struct { // The number of subscriptions per running instance concurrencyCount int + + // The routing and caching strategy declared by the plugin. + strategy Strategy } func newPool(key string, plugins ...*availablePlugin) (*apPool, error) { @@ -327,20 +331,8 @@ func newPool(key string, plugins ...*availablePlugin) (*apPool, error) { if len(plugins) > 0 { for _, plg := range plugins { - plg.id = p.generatePID() - p.plugins[plg.id] = plg - } - // Because plugin metadata is a singleton and immutable (in static code) - // it is safe to take the first item. Reloading an identical plugin - // with new metadata is protected by plugin loading. - - // Checking if plugin is exclusive - // (only one instance should be running). - if plugins[0].meta.Exclusive { - p.max = 1 + p.insert(plg) } - // set concurrency count - p.concurrencyCount = plugins[0].meta.ConcurrencyCount } return p, nil @@ -350,17 +342,46 @@ func (p *apPool) insert(ap *availablePlugin) error { if ap.pluginType != plugin.CollectorPluginType && ap.pluginType != plugin.ProcessorPluginType && ap.pluginType != plugin.PublisherPluginType { return ErrBadType } - ap.id = p.generatePID() - p.plugins[ap.id] = ap - // If an empty pool is created, it does not have // any available plugins from which to retrieve // concurrency count or exclusivity. We ensure it // is set correctly on an insert. + if len(p.plugins) == 0 { + if err := p.applyPluginMeta(ap); err != nil { + return err + } + } + + ap.id = p.generatePID() + p.plugins[ap.id] = ap + + return nil +} + +func (p *apPool) applyPluginMeta(ap *availablePlugin) error { + // Checking if plugin is exclusive + // (only one instance should be running). if ap.meta.Exclusive { p.max = 1 } + + // Set the cache TTL + cacheTTL := strategy.GlobalCacheExpiration + if ap.meta.CacheTTL != 0 { + cacheTTL = ap.meta.CacheTTL + } + + // Set the routing and caching strategy + switch ap.meta.RoutingStrategy { + case plugin.DefaultRouting: + p.strategy = strategy.NewLRU(cacheTTL) + default: + return ErrBadStrategy + } + + // set concurrency count p.concurrencyCount = ap.meta.ConcurrencyCount + return nil } @@ -484,17 +505,17 @@ func (p *apPool) subscriptionCount() int { return len(p.subs) } -func (p *apPool) selectAP(strat RoutingStrategy) (*availablePlugin, serror.SnapError) { +func (p *apPool) selectAP() (*availablePlugin, serror.SnapError) { p.RLock() defer p.RUnlock() - sp := make([]routing.SelectablePlugin, p.count()) + sp := make([]strategy.SelectablePlugin, p.count()) i := 0 for _, plg := range p.plugins { sp[i] = plg i++ } - sap, err := strat.Select(p, sp) + sap, err := p.strategy.Select(sp) if err != nil || sap == nil { return nil, serror.New(err) } @@ -506,10 +527,6 @@ func (p *apPool) generatePID() uint32 { return p.pidCounter } -func (p *apPool) release() { - p.RUnlock() -} - func (p *apPool) moveSubscriptions(to *apPool) []subscription { var subs []subscription @@ -532,24 +549,58 @@ type subscription struct { taskID string } +func (p *apPool) CheckCache(mts []core.Metric) ([]core.Metric, []core.Metric) { + return p.strategy.CheckCache(mts) +} + +func (p *apPool) UpdateCache(mts []core.Metric) { + p.strategy.UpdateCache(mts) +} + +func (p *apPool) CacheHits(ns string, ver int) (uint64, error) { + return p.strategy.CacheHits(ns, ver) +} + +func (p *apPool) CacheMisses(ns string, ver int) (uint64, error) { + return p.strategy.CacheMisses(ns, ver) +} +func (p *apPool) AllCacheHits() uint64 { + return p.strategy.AllCacheHits() +} + +func (p *apPool) AllCacheMisses() uint64 { + return p.strategy.AllCacheMisses() +} + +func (p *apPool) CacheTTL() (time.Duration, error) { + if len(p.plugins) == 0 { + return 0, ErrPoolEmpty + } + return p.strategy.CacheTTL(), nil +} + type availablePlugins struct { // Used to coordinate operations on the table. *sync.RWMutex - - // the strategy used to select a plugin for execution - routingStrategy RoutingStrategy - // table holds all the plugin pools. // The Pools' primary keys are equal to // {plugin_type}:{plugin_name}:{plugin_version} table map[string]*apPool } -func newAvailablePlugins(routingStrategy RoutingStrategy) *availablePlugins { +type requestedMetrics struct { + plugin *availablePlugin + metricTypes []core.Metric +} + +type selectsAvailablePlugins interface { + get(metricTypes []core.Metric) (*requestedMetrics, serror.SnapError) +} + +func newAvailablePlugins() *availablePlugins { return &availablePlugins{ - RWMutex: &sync.RWMutex{}, - table: make(map[string]*apPool), - routingStrategy: routingStrategy, + RWMutex: &sync.RWMutex{}, + table: make(map[string]*apPool), } } @@ -606,16 +657,127 @@ func (ap *availablePlugins) getPool(key string) (*apPool, serror.SnapError) { return pool, nil } -func (ap *availablePlugins) holdPool(key string) (*apPool, serror.SnapError) { - pool, err := ap.getPool(key) +func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core.Metric) ([]core.Metric, error) { + var results []core.Metric + pool, serr := ap.getPool(pluginKey) + if serr != nil { + return nil, serr + } + if pool == nil { + return nil, serror.New(ErrPoolNotFound, map[string]interface{}{"pool-key": pluginKey}) + } + + metricsToCollect, metricsFromCache := pool.CheckCache(metricTypes) + + if len(metricsToCollect) == 0 { + return metricsFromCache, nil + } + + pool.RLock() + defer pool.RUnlock() + p, serr := pool.selectAP() + if serr != nil { + return nil, serr + } + + // cast client to PluginCollectorClient + cli, ok := p.client.(client.PluginCollectorClient) + if !ok { + return nil, serror.New(errors.New("unable to cast client to PluginCollectorClient")) + } + + // get a metrics + metrics, err := cli.CollectMetrics(metricsToCollect) if err != nil { - return nil, err + return nil, serror.New(err) } - if pool != nil { - pool.RLock() + pool.UpdateCache(metrics) + + results = make([]core.Metric, len(metricsFromCache)+len(metrics)) + idx := 0 + for _, m := range metrics { + results[idx] = m + idx++ } - return pool, nil + for _, m := range metricsFromCache { + results[idx] = m + idx++ + } + + // update statics about plugin + p.hitCount++ + p.lastHitTime = time.Now() + + return metrics, nil +} + +func (ap *availablePlugins) publishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue) []error { + var errs []error + key := strings.Join([]string{plugin.PublisherPluginType.String(), pluginName, strconv.Itoa(pluginVersion)}, ":") + pool, serr := ap.getPool(key) + if serr != nil { + errs = append(errs, serr) + return errs + } + if pool == nil { + return []error{serror.New(ErrPoolNotFound, map[string]interface{}{"pool-key": key})} + } + + pool.RLock() + defer pool.RUnlock() + p, err := pool.selectAP() + if err != nil { + errs = append(errs, err) + return errs + } + + cli, ok := p.client.(client.PluginPublisherClient) + if !ok { + return []error{errors.New("unable to cast client to PluginPublisherClient")} + } + + errp := cli.Publish(contentType, content, config) + if errp != nil { + return []error{errp} + } + p.hitCount++ + p.lastHitTime = time.Now() + return nil +} + +func (ap *availablePlugins) ProcessMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue) (string, []byte, []error) { + var errs []error + key := strings.Join([]string{plugin.ProcessorPluginType.String(), pluginName, strconv.Itoa(pluginVersion)}, ":") + pool, serr := ap.getPool(key) + if serr != nil { + errs = append(errs, serr) + return "", nil, errs + } + if pool == nil { + return "", nil, []error{serror.New(ErrPoolNotFound, map[string]interface{}{"pool-key": key})} + } + + pool.RLock() + defer pool.RUnlock() + p, err := pool.selectAP() + if err != nil { + errs = append(errs, err) + return "", nil, errs + } + + cli, ok := p.client.(client.PluginProcessorClient) + if !ok { + return "", nil, []error{errors.New("unable to cast client to PluginProcessorClient")} + } + + ct, c, errp := cli.Process(contentType, content, config) + if errp != nil { + return "", nil, []error{errp} + } + p.hitCount++ + p.lastHitTime = time.Now() + return ct, c, nil } func (ap *availablePlugins) findLatestPool(pType, name string) (*apPool, serror.SnapError) { @@ -664,7 +826,7 @@ func (ap *availablePlugins) selectAP(key string) (*availablePlugin, serror.SnapE return nil, err } - return pool.selectAP(ap.routingStrategy) + return pool.selectAP() } func (ap *availablePlugins) pools() map[string]*apPool { diff --git a/control/available_plugin_test.go b/control/available_plugin_test.go index 13cc46f50..165719ddf 100644 --- a/control/available_plugin_test.go +++ b/control/available_plugin_test.go @@ -25,7 +25,6 @@ import ( "testing" "github.com/intelsdi-x/snap/control/plugin" - "github.com/intelsdi-x/snap/control/routing" . "github.com/smartystreets/goconvey/convey" ) @@ -50,7 +49,7 @@ func TestAvailablePlugin(t *testing.T) { Convey("Stop()", t, func() { Convey("returns nil if plugin successfully stopped", func() { - r := newRunner(&routing.RoundRobinStrategy{}) + r := newRunner() a := plugin.Arg{ PluginLogPath: "/tmp/snap-test-plugin-stop.log", } @@ -68,13 +67,13 @@ func TestAvailablePlugin(t *testing.T) { func TestAvailablePlugins(t *testing.T) { Convey("newAvailablePlugins()", t, func() { Convey("returns a pointer to an availablePlugins struct", func() { - aps := newAvailablePlugins(&routing.RoundRobinStrategy{}) + aps := newAvailablePlugins() So(aps, ShouldHaveSameTypeAs, new(availablePlugins)) }) }) Convey("insert()", t, func() { Convey("adds a collector into the collectors collection", func() { - aps := newAvailablePlugins(&routing.RoundRobinStrategy{}) + aps := newAvailablePlugins() ap := &availablePlugin{ pluginType: plugin.CollectorPluginType, name: "test", @@ -90,7 +89,7 @@ func TestAvailablePlugins(t *testing.T) { So(nap, ShouldEqual, ap) }) Convey("returns an error if an unknown plugin type is given", func() { - aps := newAvailablePlugins(&routing.RoundRobinStrategy{}) + aps := newAvailablePlugins() ap := &availablePlugin{ pluginType: 99, name: "test", diff --git a/control/control.go b/control/control.go index 311127854..46d9240a7 100644 --- a/control/control.go +++ b/control/control.go @@ -27,8 +27,6 @@ import ( "os" "path" "path/filepath" - "strconv" - "strings" "sync" "time" @@ -36,8 +34,7 @@ import ( "github.com/intelsdi-x/gomit" "github.com/intelsdi-x/snap/control/plugin" - "github.com/intelsdi-x/snap/control/plugin/client" - "github.com/intelsdi-x/snap/control/routing" + "github.com/intelsdi-x/snap/control/strategy" "github.com/intelsdi-x/snap/core" "github.com/intelsdi-x/snap/core/cdata" "github.com/intelsdi-x/snap/core/control_event" @@ -95,8 +92,6 @@ type runsPlugins interface { SetEmitter(gomit.Emitter) SetMetricCatalog(catalogsMetrics) SetPluginManager(managesPlugins) - SetStrategy(RoutingStrategy) - Strategy() RoutingStrategy Monitor() *monitor runPlugin(*pluginDetails) error } @@ -143,7 +138,7 @@ func MaxRunningPlugins(m int) PluginControlOpt { // CacheExpiration is the PluginControlOpt which sets the global metric cache TTL func CacheExpiration(t time.Duration) PluginControlOpt { return func(c *pluginControl) { - client.GlobalCacheExpiration = t + strategy.GlobalCacheExpiration = t } } @@ -191,7 +186,7 @@ func New(opts ...PluginControlOpt) *pluginControl { // Plugin Runner // TODO (danielscottt): handle routing strat changes via events - c.pluginRunner = newRunner(&routing.RoundRobinStrategy{}) + c.pluginRunner = newRunner() controlLogger.WithFields(log.Fields{ "_block": "new", }).Debug("runner created") @@ -199,7 +194,6 @@ func New(opts ...PluginControlOpt) *pluginControl { c.pluginRunner.SetEmitter(c.eventManager) // emitter is passed to created availablePlugins c.pluginRunner.SetMetricCatalog(c.metricCatalog) c.pluginRunner.SetPluginManager(c.pluginManager) - c.pluginRunner.SetStrategy(&routing.RoundRobinStrategy{}) // Wire event manager @@ -846,7 +840,6 @@ func (p *pluginControl) MetricExists(mns []string, ver int) bool { // of metrics and errors. If an error is encountered no metrics will be // returned. func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time.Time) (metrics []core.Metric, errs []error) { - pluginToMetricMap, err := groupMetricTypesByPlugin(p.metricCatalog, metricTypes) if err != nil { errs = append(errs, err) @@ -859,55 +852,24 @@ func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time. // For each available plugin call available plugin using RPC client and wait for response (goroutines) for pluginKey, pmt := range pluginToMetricMap { - - // retrieve an available plugin - pool, err := p.pluginRunner.AvailablePlugins().holdPool(pluginKey) - if err != nil { - errs = append(errs, err) - continue - } - if pool != nil { - defer pool.release() - - ap, err := pool.selectAP(p.pluginRunner.Strategy()) - if err != nil { - errs = append(errs, err) - continue - } - - // cast client to PluginCollectorClient - cli, ok := ap.client.(client.PluginCollectorClient) - if !ok { - err := errors.New("unable to cast client to PluginCollectorClient") - errs = append(errs, err) - continue + // merge global plugin config into the config for the metric + for _, mt := range pmt.metricTypes { + if mt.Config() != nil { + mt.Config().Merge(p.Config.Plugins.getPluginConfigDataNode(core.CollectorPluginType, pmt.plugin.Name(), pmt.plugin.Version())) } + } - wg.Add(1) + wg.Add(1) - // merge global plugin config into the config for the metric - for _, mt := range pmt.metricTypes { - if mt.Config() != nil { - mt.Config().Merge(p.Config.Plugins.getPluginConfigDataNode(core.CollectorPluginType, ap.Name(), ap.Version())) - } + // get a metrics + go func(pluginKey string, mt []core.Metric) { + mts, err := p.pluginRunner.AvailablePlugins().collectMetrics(pluginKey, mt) + if err != nil { + cError <- err + } else { + cMetrics <- mts } - - // get a metrics - go func(mt []core.Metric) { - mts, err := cli.CollectMetrics(mt) - if err != nil { - cError <- err - } else { - cMetrics <- mts - } - }(pmt.metricTypes) - - // update statics about plugin - ap.hitCount++ - ap.lastHitTime = time.Now() - } else { - errs = append(errs, fmt.Errorf("pool not found for plugin key: %s", pluginKey)) - } + }(pluginKey, pmt.metricTypes) } go func() { @@ -936,86 +898,22 @@ func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time. // PublishMetrics func (p *pluginControl) PublishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue) []error { - var errs []error - key := strings.Join([]string{"publisher", pluginName, strconv.Itoa(pluginVersion)}, ":") - - // retrieve an available plugin - pool, err := p.pluginRunner.AvailablePlugins().holdPool(key) - if err != nil { - errs = append(errs, err) - return errs + // merge global plugin config into the config for this request + cfg := p.Config.Plugins.getPluginConfigDataNode(core.PublisherPluginType, pluginName, pluginVersion).Table() + for k, v := range config { + cfg[k] = v } - if pool != nil { - defer pool.release() - - ap, err := pool.selectAP(p.pluginRunner.Strategy()) - if err != nil { - errs = append(errs, err) - return errs - } - - cli, ok := ap.client.(client.PluginPublisherClient) - if !ok { - return []error{errors.New("unable to cast client to PluginPublisherClient")} - } - - // merge global plugin config into the config for this request - cfg := p.Config.Plugins.getPluginConfigDataNode(core.PublisherPluginType, ap.Name(), ap.Version()).Table() - for k, v := range config { - cfg[k] = v - } - - errp := cli.Publish(contentType, content, cfg) - if errp != nil { - return []error{errp} - } - ap.hitCount++ - ap.lastHitTime = time.Now() - return nil - } - return []error{errors.New("pool not found")} + return p.pluginRunner.AvailablePlugins().publishMetrics(contentType, content, pluginName, pluginVersion, cfg) } // ProcessMetrics func (p *pluginControl) ProcessMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue) (string, []byte, []error) { - var errs []error - key := strings.Join([]string{"processor", pluginName, strconv.Itoa(pluginVersion)}, ":") - - // retrieve an available plugin - pool, err := p.pluginRunner.AvailablePlugins().holdPool(key) - if err != nil { - errs = append(errs, err) - return "", nil, errs - } - if pool != nil { - defer pool.release() - - ap, err := pool.selectAP(p.pluginRunner.Strategy()) - if err != nil { - errs = append(errs, err) - return "", nil, errs - } - - cli, ok := ap.client.(client.PluginProcessorClient) - if !ok { - return "", nil, []error{errors.New("unable to cast client to PluginProcessorClient")} - } - - // merge global plugin config into the config for this request - cfg := p.Config.Plugins.getPluginConfigDataNode(core.ProcessorPluginType, ap.Name(), ap.Version()).Table() - for k, v := range config { - cfg[k] = v - } - - ct, c, errp := cli.Process(contentType, content, cfg) - if errp != nil { - return "", nil, []error{errp} - } - ap.hitCount++ - ap.lastHitTime = time.Now() - return ct, c, nil + // merge global plugin config into the config for this request + cfg := p.Config.Plugins.getPluginConfigDataNode(core.ProcessorPluginType, pluginName, pluginVersion).Table() + for k, v := range config { + cfg[k] = v } - return "", nil, []error{errors.New("pool not found")} + return p.pluginRunner.AvailablePlugins().ProcessMetrics(contentType, content, pluginName, pluginVersion, cfg) } // GetPluginContentTypes returns accepted and returned content types for the @@ -1077,29 +975,30 @@ func (p *pluginMetricTypes) Count() int { } // groupMetricTypesByPlugin groups metricTypes by a plugin.Key() and returns appropriate structure -func groupMetricTypesByPlugin(cat catalogsMetrics, metricTypes []core.Metric) (map[string]pluginMetricTypes, error) { +func groupMetricTypesByPlugin(cat catalogsMetrics, metricTypes []core.Metric) (map[string]pluginMetricTypes, serror.SnapError) { pmts := make(map[string]pluginMetricTypes) // For each plugin type select a matching available plugin to call for _, mt := range metricTypes { + version := mt.Version() + if version == 0 { + // If the version is not provided we will choose the latest + version = -1 + } - // This is set to choose the newest and not pin version. TODO, be sure version is set to -1 if not provided by user on Task creation. - lp, err := cat.GetPlugin(mt.Namespace(), -1) + lp, err := cat.GetPlugin(mt.Namespace(), version) if err != nil { - return nil, err + return nil, serror.New(err) } // if loaded plugin is nil, we have failed. return error if lp == nil { - return nil, errorMetricNotFound(mt.Namespace()) + return nil, serror.New(errorMetricNotFound(mt.Namespace())) } key := lp.Key() - - // pmt, _ := pmts[key] pmt.plugin = lp pmt.metricTypes = append(pmt.metricTypes, mt) pmts[key] = pmt - } return pmts, nil } diff --git a/control/control_test.go b/control/control_test.go index 5325272ee..cc99168f8 100644 --- a/control/control_test.go +++ b/control/control_test.go @@ -35,6 +35,7 @@ import ( "github.com/intelsdi-x/gomit" "github.com/intelsdi-x/snap/control/plugin" "github.com/intelsdi-x/snap/control/plugin/cpolicy" + "github.com/intelsdi-x/snap/control/strategy" "github.com/intelsdi-x/snap/core" "github.com/intelsdi-x/snap/core/cdata" "github.com/intelsdi-x/snap/core/control_event" @@ -829,6 +830,7 @@ func TestCollectDynamicMetrics(t *testing.T) { config.Plugins.All.AddItem("password", ctypes.ConfigValueStr{Value: "testval"}) c := New(OptSetConfig(config), CacheExpiration(time.Second*1)) c.Start() + So(strategy.GlobalCacheExpiration, ShouldResemble, time.Second*1) lpe := newListenToPluginEvent() c.eventManager.RegisterHandler("Control.PluginLoaded", lpe) _, e := load(c, PluginPath) @@ -871,17 +873,23 @@ func TestCollectDynamicMetrics(t *testing.T) { pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:2") So(errp, ShouldBeNil) So(pool, ShouldNotBeNil) + ttl, err := pool.CacheTTL() + So(err, ShouldResemble, ErrPoolEmpty) + So(ttl, ShouldEqual, 0) pool.subscribe("1", unboundSubscriptionType) err = c.pluginRunner.runPlugin(lp.Details) So(err, ShouldBeNil) + ttl, err = pool.CacheTTL() + So(err, ShouldBeNil) + So(ttl, ShouldEqual, strategy.GlobalCacheExpiration) mts, errs := c.CollectMetrics([]core.Metric{m}, time.Now().Add(time.Second*1)) - hits, err := pool.plugins[1].client.CacheHits(core.JoinNamespace(m.namespace), 2) + hits, err := pool.CacheHits(core.JoinNamespace(m.namespace), 2) So(err, ShouldBeNil) So(hits, ShouldEqual, 0) So(errs, ShouldBeNil) So(len(mts), ShouldEqual, 10) mts, errs = c.CollectMetrics([]core.Metric{m}, time.Now().Add(time.Second*1)) - hits, err = pool.plugins[1].client.CacheHits(core.JoinNamespace(m.namespace), 2) + hits, err = pool.CacheHits(core.JoinNamespace(m.namespace), 2) So(err, ShouldBeNil) So(hits, ShouldEqual, 1) So(errs, ShouldBeNil) @@ -894,25 +902,32 @@ func TestCollectDynamicMetrics(t *testing.T) { pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:1") So(errp, ShouldBeNil) So(pool, ShouldNotBeNil) + ttl, err := pool.CacheTTL() + So(err, ShouldResemble, ErrPoolEmpty) + So(ttl, ShouldEqual, 0) pool.subscribe("1", unboundSubscriptionType) err = c.pluginRunner.runPlugin(lp.Details) So(err, ShouldBeNil) + ttl, err = pool.CacheTTL() + So(err, ShouldBeNil) + So(ttl, ShouldEqual, 1100*time.Millisecond) mts, errs := c.CollectMetrics([]core.Metric{jsonm}, time.Now().Add(time.Second*1)) - hits, err := pool.plugins[1].client.CacheHits(core.JoinNamespace(m.namespace), 1) + hits, err := pool.CacheHits(core.JoinNamespace(jsonm.namespace), jsonm.version) + So(pool.subscriptionCount(), ShouldEqual, 1) + So(pool.strategy, ShouldNotBeNil) + So(len(mts), ShouldBeGreaterThan, 0) So(err, ShouldBeNil) So(hits, ShouldEqual, 0) So(errs, ShouldBeNil) So(len(mts), ShouldEqual, 10) mts, errs = c.CollectMetrics([]core.Metric{jsonm}, time.Now().Add(time.Second*1)) - hits, err = pool.plugins[1].client.CacheHits(core.JoinNamespace(m.namespace), 1) + hits, err = pool.CacheHits(core.JoinNamespace(m.namespace), 1) So(err, ShouldBeNil) So(hits, ShouldEqual, 1) So(errs, ShouldBeNil) So(len(mts), ShouldEqual, 10) - hits = pool.plugins[1].client.AllCacheHits() - So(hits, ShouldEqual, 2) - misses := pool.plugins[1].client.AllCacheMisses() - So(misses, ShouldEqual, 2) + So(pool.AllCacheHits(), ShouldEqual, 1) + So(pool.AllCacheMisses(), ShouldEqual, 1) pool.unsubscribe("1") c.Stop() time.Sleep(100 * time.Millisecond) diff --git a/control/metrics_cache.go b/control/metrics_cache.go new file mode 100644 index 000000000..85e9a02ff --- /dev/null +++ b/control/metrics_cache.go @@ -0,0 +1,129 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package control + +import ( + "errors" + "fmt" + "time" + + log "github.com/Sirupsen/logrus" + + "github.com/intelsdi-x/snap/core" + "github.com/intelsdi-x/snap/pkg/chrono" +) + +// the time limit for which a cache entry is valid. +var GlobalCacheExpiration time.Duration + +var ( + // TODO: add field to Control when it's newed up + // TODO: remove the comments below. + // metricCache = cache{ + // table: make(map[string]*cachecell), + // } + cacheLog = log.WithField("_module", "control-metric-cache") + + ErrCacheEntryDoesNotExist = errors.New("cache entry does not exist") +) + +type cachecell struct { + cacheExpiration time.Duration + time time.Time + metric core.Metric + metrics []core.Metric + hits uint64 + misses uint64 +} + +type cache struct { + table map[string]*cachecell +} + +func (c *cache) get(ns string, version int) interface{} { + var ( + cell *cachecell + ok bool + ) + + key := fmt.Sprintf("%v:%v", ns, version) + if cell, ok = c.table[key]; ok && chrono.Chrono.Now().Sub(cell.time) < cell.cacheExpiration { + cell.hits++ + cacheLog.WithFields(log.Fields{ + "namespace": key, + "hits": cell.hits, + "misses": cell.misses, + }).Debug(fmt.Sprintf("cache hit [%s]", key)) + if cell.metric != nil { + return cell.metric + } + return cell.metrics + } + if !ok { + c.table[key] = &cachecell{ + time: time.Time{}, + metrics: nil, + } + } + c.table[key].misses++ + cacheLog.WithFields(log.Fields{ + "namespace": key, + "hits": c.table[key].hits, + "misses": c.table[key].misses, + }).Debug(fmt.Sprintf("cache miss [%s]", key)) + + return nil +} + +func (c *cache) put(lp *loadedPlugin, ns string, version int, m interface{}) { + key := fmt.Sprintf("%v:%v", ns, version) + switch metric := m.(type) { + case core.Metric: + if _, ok := c.table[key]; ok { + c.table[key].time = chrono.Chrono.Now() + c.table[key].metric = metric + if lp.Meta.CacheTTL == 0 { + c.table[key].cacheExpiration = GlobalCacheExpiration + } else { + c.table[key].cacheExpiration = lp.Meta.CacheTTL + } + } else { + c.table[key] = &cachecell{ + time: chrono.Chrono.Now(), + metric: metric, + } + } + case []core.Metric: + if _, ok := c.table[key]; ok { + c.table[key].time = time.Now() + c.table[key].metrics = metric + } else { + c.table[key] = &cachecell{ + time: chrono.Chrono.Now(), + metrics: metric, + } + } + default: + cacheLog.WithFields(log.Fields{ + "namespace": key, + "_block": "put", + }).Error("unsupported type") + } +} diff --git a/control/monitor_test.go b/control/monitor_test.go index 42f397b82..56ee58e4c 100644 --- a/control/monitor_test.go +++ b/control/monitor_test.go @@ -26,14 +26,13 @@ import ( "github.com/intelsdi-x/gomit" "github.com/intelsdi-x/snap/control/plugin" - "github.com/intelsdi-x/snap/control/routing" . "github.com/smartystreets/goconvey/convey" ) func TestMonitor(t *testing.T) { Convey("monitor", t, func() { - aps := newAvailablePlugins(&routing.RoundRobinStrategy{}) + aps := newAvailablePlugins() ap1 := &availablePlugin{ pluginType: plugin.CollectorPluginType, diff --git a/control/plugin/client/client.go b/control/plugin/client/client.go index d2b75b47e..6e56d0fb4 100644 --- a/control/plugin/client/client.go +++ b/control/plugin/client/client.go @@ -20,26 +20,14 @@ limitations under the License. package client import ( - "fmt" - - log "github.com/Sirupsen/logrus" - "github.com/intelsdi-x/snap/control/plugin" "github.com/intelsdi-x/snap/control/plugin/cpolicy" "github.com/intelsdi-x/snap/core" "github.com/intelsdi-x/snap/core/ctypes" ) -type PluginCacheClient interface { - CacheHits(string, int) (uint64, error) - CacheMisses(string, int) (uint64, error) - AllCacheHits() uint64 - AllCacheMisses() uint64 -} - // PluginClient A client providing common plugin method calls. type PluginClient interface { - PluginCacheClient SetKey() error Ping() error Kill(string) error @@ -64,103 +52,3 @@ type PluginPublisherClient interface { PluginClient Publish(contentType string, content []byte, config map[string]ctypes.ConfigValue) error } - -type pluginCacheClient struct{} - -// AllCacheHits returns cache hits across all metrics. -func (c *pluginCacheClient) AllCacheHits() uint64 { - var hits uint64 - for _, v := range metricCache.table { - hits += v.hits - } - return hits -} - -// AllCacheMisses returns cache misses across all metrics. -func (c *pluginCacheClient) AllCacheMisses() uint64 { - var misses uint64 - for _, v := range metricCache.table { - misses += v.misses - } - return misses -} - -// CacheHits returns the cache hits for a given metric namespace and version. -func (c *pluginCacheClient) CacheHits(ns string, version int) (uint64, error) { - key := fmt.Sprintf("%v:%v", ns, version) - if v, ok := metricCache.table[key]; ok { - return v.hits, nil - } - return 0, ErrCacheEntryDoesNotExist -} - -// CacheMisses returns the cache misses for a given metric namespace and version. -func (c *pluginCacheClient) CacheMisses(ns string, version int) (uint64, error) { - key := fmt.Sprintf("%v:%v", ns, version) - if v, ok := metricCache.table[key]; ok { - return v.misses, nil - } - return 0, ErrCacheEntryDoesNotExist -} - -// checkCache checks the cache for metric types. -// returns: -// - array of metrics that need to be collected -// - array of metrics that were returned from the cache -func checkCache(mts []core.Metric) ([]plugin.PluginMetricType, []core.Metric) { - var fromCache []core.Metric - var metricsToCollect []plugin.PluginMetricType - for _, mt := range mts { - if m := metricCache.get(core.JoinNamespace(mt.Namespace()), mt.Version()); m != nil { - switch metric := m.(type) { - case core.Metric: - fromCache = append(fromCache, metric) - case []core.Metric: - for _, met := range metric { - fromCache = append(fromCache, met) - } - default: - log.WithFields(log.Fields{ - "_module": "client", - "_block": "checkCache", - }).Error("unsupported type found in the cache") - } - } else { - mt := plugin.PluginMetricType{ - Namespace_: mt.Namespace(), - LastAdvertisedTime_: mt.LastAdvertisedTime(), - Version_: mt.Version(), - Tags_: mt.Tags(), - Labels_: mt.Labels(), - Config_: mt.Config(), - } - metricsToCollect = append(metricsToCollect, mt) - } - } - return metricsToCollect, fromCache -} - -// updateCache updates the cache with the given array of metrics. -func updateCache(mts []plugin.PluginMetricType) { - results := []core.Metric{} - dc := map[string][]core.Metric{} - for _, mt := range mts { - if mt.Labels() == nil { - // cache the individual metric - metricCache.put(core.JoinNamespace(mt.Namespace_), mt.Version(), mt) - } else { - // collect the dynamic query results so we can cache - ns := make([]string, len(mt.Namespace())) - copy(ns, mt.Namespace()) - for _, label := range mt.Labels_ { - ns[label.Index] = "*" - } - if _, ok := dc[core.JoinNamespace(ns)]; !ok { - dc[core.JoinNamespace(ns)] = []core.Metric{} - } - dc[core.JoinNamespace(ns)] = append(dc[core.JoinNamespace(ns)], mt) - metricCache.put(core.JoinNamespace(ns), mt.Version(), dc[core.JoinNamespace(ns)]) - } - results = append(results, mt) - } -} diff --git a/control/plugin/client/httpjsonrpc.go b/control/plugin/client/httpjsonrpc.go index dac551846..6a00c6127 100644 --- a/control/plugin/client/httpjsonrpc.go +++ b/control/plugin/client/httpjsonrpc.go @@ -43,7 +43,6 @@ import ( var logger = log.WithField("_module", "client-httpjsonrpc") type httpJSONRPCClient struct { - PluginCacheClient url string id uint64 timeout time.Duration @@ -55,11 +54,10 @@ type httpJSONRPCClient struct { // NewCollectorHttpJSONRPCClient returns CollectorHttpJSONRPCClient func NewCollectorHttpJSONRPCClient(u string, timeout time.Duration, pub *rsa.PublicKey, secure bool) (PluginCollectorClient, error) { hjr := &httpJSONRPCClient{ - PluginCacheClient: &pluginCacheClient{}, - url: u, - timeout: timeout, - pluginType: plugin.CollectorPluginType, - encoder: encoding.NewJsonEncoder(), + url: u, + timeout: timeout, + pluginType: plugin.CollectorPluginType, + encoder: encoding.NewJsonEncoder(), } if secure { key, err := encrypter.GenerateKey() @@ -76,11 +74,10 @@ func NewCollectorHttpJSONRPCClient(u string, timeout time.Duration, pub *rsa.Pub func NewProcessorHttpJSONRPCClient(u string, timeout time.Duration, pub *rsa.PublicKey, secure bool) (PluginProcessorClient, error) { hjr := &httpJSONRPCClient{ - PluginCacheClient: &pluginCacheClient{}, - url: u, - timeout: timeout, - pluginType: plugin.ProcessorPluginType, - encoder: encoding.NewJsonEncoder(), + url: u, + timeout: timeout, + pluginType: plugin.ProcessorPluginType, + encoder: encoding.NewJsonEncoder(), } if secure { key, err := encrypter.GenerateKey() @@ -97,11 +94,10 @@ func NewProcessorHttpJSONRPCClient(u string, timeout time.Duration, pub *rsa.Pub func NewPublisherHttpJSONRPCClient(u string, timeout time.Duration, pub *rsa.PublicKey, secure bool) (PluginPublisherClient, error) { hjr := &httpJSONRPCClient{ - PluginCacheClient: &pluginCacheClient{}, - url: u, - timeout: timeout, - pluginType: plugin.PublisherPluginType, - encoder: encoding.NewJsonEncoder(), + url: u, + timeout: timeout, + pluginType: plugin.PublisherPluginType, + encoder: encoding.NewJsonEncoder(), } if secure { key, err := encrypter.GenerateKey() @@ -146,56 +142,56 @@ func (h *httpJSONRPCClient) Kill(reason string) error { // CollectMetrics returns collected metrics func (h *httpJSONRPCClient) CollectMetrics(mts []core.Metric) ([]core.Metric, error) { - var results []core.Metric if len(mts) == 0 { return nil, errors.New("no metrics to collect") } - metricsToCollect, metricsFromCache := checkCache(mts) - - if len(metricsToCollect) > 0 { - args := &plugin.CollectMetricsArgs{PluginMetricTypes: metricsToCollect} - - out, err := h.encoder.Encode(args) - if err != nil { - return nil, err + metricsToCollect := make([]plugin.PluginMetricType, len(mts)) + for idx, mt := range mts { + metricsToCollect[idx] = plugin.PluginMetricType{ + Namespace_: mt.Namespace(), + LastAdvertisedTime_: mt.LastAdvertisedTime(), + Version_: mt.Version(), + Tags_: mt.Tags(), + Labels_: mt.Labels(), + Config_: mt.Config(), } + } - res, err := h.call("Collector.CollectMetrics", []interface{}{out}) - if err != nil { - return nil, err - } - if len(res.Result) == 0 { - err := errors.New("Invalid response: result is 0") - logger.WithFields(log.Fields{ - "_block": "CollectMetrics", - "jsonrpc response": fmt.Sprintf("%+v", res), - }).Error(err) - return nil, err - } - r := &plugin.CollectMetricsReply{} - err = h.encoder.Decode(res.Result, r) - if err != nil { - return nil, err - } + args := &plugin.CollectMetricsArgs{PluginMetricTypes: metricsToCollect} + + out, err := h.encoder.Encode(args) + if err != nil { + return nil, err + } - updateCache(r.PluginMetrics) + res, err := h.call("Collector.CollectMetrics", []interface{}{out}) + if err != nil { + return nil, err + } + if len(res.Result) == 0 { + err := errors.New("Invalid response: result is 0") + logger.WithFields(log.Fields{ + "_block": "CollectMetrics", + "jsonrpc response": fmt.Sprintf("%+v", res), + }).Error(err) + return nil, err + } + r := &plugin.CollectMetricsReply{} + err = h.encoder.Decode(res.Result, r) + if err != nil { + return nil, err + } - results = make([]core.Metric, len(metricsFromCache)+len(r.PluginMetrics)) - idx := 0 - for _, m := range r.PluginMetrics { - results[idx] = m - idx++ - } - for _, m := range metricsFromCache { - results[idx] = m - idx++ - } - return results, nil - } else { - return metricsFromCache, nil + results = make([]core.Metric, len(r.PluginMetrics)) + idx := 0 + for _, m := range r.PluginMetrics { + results[idx] = m + idx++ } + + return results, nil } // GetMetricTypes returns metric types that can be collected diff --git a/control/plugin/client/native.go b/control/plugin/client/native.go index 852a59a3a..95c4f64bb 100644 --- a/control/plugin/client/native.go +++ b/control/plugin/client/native.go @@ -46,7 +46,6 @@ type CallsRPC interface { // Native clients use golang net/rpc for communication to a native rpc server. type PluginNativeClient struct { - PluginCacheClient connection CallsRPC pluginType plugin.PluginType encoder encoding.Encoder @@ -137,45 +136,45 @@ func (p *PluginNativeClient) CollectMetrics(mts []core.Metric) ([]core.Metric, e return nil, errors.New("no metrics to collect") } - metricsToCollect, metricsFromCache := checkCache(mts) - - if len(metricsToCollect) > 0 { - args := plugin.CollectMetricsArgs{PluginMetricTypes: metricsToCollect} - - out, err := p.encoder.Encode(args) - if err != nil { - return nil, err + metricsToCollect := make([]plugin.PluginMetricType, len(mts)) + for idx, mt := range mts { + metricsToCollect[idx] = plugin.PluginMetricType{ + Namespace_: mt.Namespace(), + LastAdvertisedTime_: mt.LastAdvertisedTime(), + Version_: mt.Version(), + Tags_: mt.Tags(), + Labels_: mt.Labels(), + Config_: mt.Config(), } + } - var reply []byte - err = p.connection.Call("Collector.CollectMetrics", out, &reply) - if err != nil { - return nil, err - } + args := plugin.CollectMetricsArgs{PluginMetricTypes: metricsToCollect} - r := &plugin.CollectMetricsReply{} - err = p.encoder.Decode(reply, r) - if err != nil { - return nil, err - } + out, err := p.encoder.Encode(args) + if err != nil { + return nil, err + } - updateCache(r.PluginMetrics) + var reply []byte + err = p.connection.Call("Collector.CollectMetrics", out, &reply) + if err != nil { + return nil, err + } - results = make([]core.Metric, len(metricsFromCache)+len(r.PluginMetrics)) - idx := 0 - for _, m := range r.PluginMetrics { - results[idx] = m - idx++ - } - for _, m := range metricsFromCache { - results[idx] = m - idx++ - } - return results, nil - } else { - return metricsFromCache, nil + r := &plugin.CollectMetricsReply{} + err = p.encoder.Decode(reply, r) + if err != nil { + return nil, err + } + + results = make([]core.Metric, len(r.PluginMetrics)) + idx := 0 + for _, m := range r.PluginMetrics { + results[idx] = m + idx++ } + return results, nil } func (p *PluginNativeClient) GetMetricTypes(config plugin.PluginConfigType) ([]core.Metric, error) { @@ -240,9 +239,8 @@ func newNativeClient(address string, timeout time.Duration, t plugin.PluginType, } r := rpc.NewClient(conn) p := &PluginNativeClient{ - PluginCacheClient: &pluginCacheClient{}, - connection: r, - pluginType: t, + connection: r, + pluginType: t, } p.encoder = encoding.NewGobEncoder() diff --git a/control/plugin/plugin.go b/control/plugin/plugin.go index b199708b3..21d33a47b 100644 --- a/control/plugin/plugin.go +++ b/control/plugin/plugin.go @@ -52,6 +52,25 @@ const ( PublisherPluginType ) +type RoutingStrategyType int + +// Returns string for matching enum RoutingStrategy type +func (p RoutingStrategyType) String() string { + return routingStrategyTypes[p] +} + +const ( + // DefaultRouting is a least recently used strategy. + DefaultRouting RoutingStrategyType = iota + // StickyRouting is a one-to-one strategy. + // Using this strategy a tasks requests are sent to the same running instance of a plugin. + StickyRouting + // ConfigRouting is routing to plugins based on the config provided to the plugin. + // Using this strategy enables a running database plugin that has the same connection info between + // two tasks to be shared. + ConfigRouting +) + // Plugin response states type PluginResponseState int @@ -81,6 +100,12 @@ var ( "processor", "publisher", } + + routingStrategyTypes = [...]string{ + "least-recently-used", + "sticky", + "config", + } ) type Plugin interface { @@ -93,44 +118,65 @@ type PluginMeta struct { Version int Type PluginType RPCType RPCType - // Content types accepted by this plugin in priority order - // snap.* means any snap type + // AcceptedContentTypes are types accepted by this plugin in priority order. + // snap.* means any snap type. AcceptedContentTypes []string - // Return content types in priority order - // This is only really valid on processors + // ReturnedContentTypes are content types returned in priority order. + // This is only applicable on processors. ReturnedContentTypes []string - // the max number of subscriptions this plugin - // can handle + // ConcurrencyCount is the max number concurrent calls the plugin may take. + // If there are 5 tasks using the plugin and concurrency count is 2 there + // will be 3 plugins running. ConcurrencyCount int - // should always only be one instance of this plugin running + // Exclusive results in a single instance of the plugin running regardless + // the number of tasks using the plugin. Exclusive bool - // do not encrypt communication with this plugin + // Unsecure results in unencrypted communication with this plugin. Unsecure bool - // plugin cache TTL duration. - // It will be converted from the client + // CacheTTL will override the default cache TTL for the provided plugin. CacheTTL time.Duration + // RoutingStrategy will override the routing strategy this plugin requires. + // The default routing strategy round-robin. + RoutingStrategy RoutingStrategyType } type metaOp func(m *PluginMeta) +// ConcurrencyCount is an option that can be be provided to the func NewPluginMeta. func ConcurrencyCount(cc int) metaOp { return func(m *PluginMeta) { m.ConcurrencyCount = cc } } +// Exclusive is an option that can be be provided to the func NewPluginMeta. func Exclusive(e bool) metaOp { return func(m *PluginMeta) { m.Exclusive = e } } +// Unsecure is an option that can be be provided to the func NewPluginMeta. func Unsecure(e bool) metaOp { return func(m *PluginMeta) { m.Unsecure = e } } +// RoutingStrategy is an option that can be be provided to the func NewPluginMeta. +func RoutingStrategy(r RoutingStrategyType) metaOp { + return func(m *PluginMeta) { + m.RoutingStrategy = r + } +} + +// CacheTTL is an option that can be be provided to the func NewPluginMeta. +func CacheTTL(t time.Duration) metaOp { + return func(m *PluginMeta) { + m.CacheTTL = t + } +} + // NewPluginMeta constructs and returns a PluginMeta struct func NewPluginMeta(name string, version int, pluginType PluginType, acceptContentTypes, returnContentTypes []string, opts ...metaOp) *PluginMeta { // An empty accepted content type default to "snap.*" diff --git a/control/plugin_manager.go b/control/plugin_manager.go index 59d1bf4c7..112296a1d 100644 --- a/control/plugin_manager.go +++ b/control/plugin_manager.go @@ -362,8 +362,6 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter return nil, serror.New(err) } - // The plugin cache client will be integrated here later - // Add metric types to metric catalog for _, nmt := range metricTypes { // If the version is 0 default it to the plugin version diff --git a/control/plugin_manager_test.go b/control/plugin_manager_test.go index 28b21a7f0..fa2cbee6e 100644 --- a/control/plugin_manager_test.go +++ b/control/plugin_manager_test.go @@ -164,7 +164,7 @@ func TestLoadPlugin(t *testing.T) { So(err, ShouldBeNil) So(lp.Meta.CacheTTL, ShouldNotBeNil) - So(lp.Meta.CacheTTL, ShouldResemble, time.Duration(time.Millisecond*100)) + So(lp.Meta.CacheTTL, ShouldResemble, time.Duration(time.Millisecond*1100)) }) }) diff --git a/control/routing/round_robin.go b/control/routing/round_robin.go deleted file mode 100644 index 63de4ab52..000000000 --- a/control/routing/round_robin.go +++ /dev/null @@ -1,75 +0,0 @@ -/* -http://www.apache.org/licenses/LICENSE-2.0.txt - - -Copyright 2015 Intel Corporation - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package routing - -import ( - "errors" - "math/rand" - - log "github.com/Sirupsen/logrus" -) - -var ( - ErrorCouldNotSelect = errors.New("could not select a plugin (round robin strategy)") -) - -type RoundRobinStrategy struct { -} - -func (r *RoundRobinStrategy) String() string { - return "round-robin" -} - -func (r *RoundRobinStrategy) Select(spp SelectablePluginPool, spa []SelectablePlugin) (SelectablePlugin, error) { - var h int = -1 - var index int = -1 - for i, sp := range spa { - // look for the lowest hit count - if sp.HitCount() < h || h == -1 { - index = i - h = sp.HitCount() - } - // on a hitcount tie we randomly choose one - if sp.HitCount() == h { - if rand.Intn(1) == 1 { - index = i - h = sp.HitCount() - } - } - } - if index > -1 { - log.WithFields(log.Fields{ - "_module": "control-routing", - "block": "select", - "strategy": "round-robin", - "pool size": len(spa), - "index": spa[index].String(), - "hitcount": spa[index].HitCount(), - }).Debug("plugin selected") - return spa[index], nil - } - log.WithFields(log.Fields{ - "_module": "control-routing", - "block": "select", - "strategy": "round-robin", - "error": ErrorCouldNotSelect, - }).Error("error selecting") - return nil, ErrorCouldNotSelect -} diff --git a/control/runner.go b/control/runner.go index 12ab5c218..c1180ee18 100644 --- a/control/runner.go +++ b/control/runner.go @@ -69,26 +69,16 @@ type runner struct { availablePlugins *availablePlugins metricCatalog catalogsMetrics pluginManager managesPlugins - routingStrategy RoutingStrategy } -func newRunner(routingStrategy RoutingStrategy) *runner { +func newRunner() *runner { r := &runner{ monitor: newMonitor(), - availablePlugins: newAvailablePlugins(routingStrategy), - routingStrategy: routingStrategy, + availablePlugins: newAvailablePlugins(), } return r } -func (r *runner) SetStrategy(rs RoutingStrategy) { - r.routingStrategy = rs -} - -func (r *runner) Strategy() RoutingStrategy { - return r.routingStrategy -} - func (r *runner) SetMetricCatalog(c catalogsMetrics) { r.metricCatalog = c } diff --git a/control/runner_test.go b/control/runner_test.go index 75eb4f29f..3aed68506 100644 --- a/control/runner_test.go +++ b/control/runner_test.go @@ -29,7 +29,6 @@ import ( "github.com/intelsdi-x/snap/control/plugin" "github.com/intelsdi-x/snap/control/plugin/cpolicy" - "github.com/intelsdi-x/snap/control/routing" . "github.com/smartystreets/goconvey/convey" ) @@ -205,7 +204,7 @@ func TestRunnerState(t *testing.T) { Convey(".AddDelegates", func() { Convey("adds a handler delegate", func() { - r := newRunner(&routing.RoundRobinStrategy{}) + r := newRunner() r.AddDelegates(new(MockHandlerDelegate)) r.SetEmitter(new(MockEmitter)) @@ -213,7 +212,7 @@ func TestRunnerState(t *testing.T) { }) Convey("adds multiple delegates", func() { - r := newRunner(&routing.RoundRobinStrategy{}) + r := newRunner() r.AddDelegates(new(MockHandlerDelegate)) r.AddDelegates(new(MockHandlerDelegate)) @@ -221,7 +220,7 @@ func TestRunnerState(t *testing.T) { }) Convey("adds multiple delegates (batch)", func() { - r := newRunner(&routing.RoundRobinStrategy{}) + r := newRunner() r.AddDelegates(new(MockHandlerDelegate), new(MockHandlerDelegate)) So(len(r.delegates), ShouldEqual, 2) @@ -232,7 +231,7 @@ func TestRunnerState(t *testing.T) { Convey(".Start", func() { Convey("returns error without adding delegates", func() { - r := newRunner(&routing.RoundRobinStrategy{}) + r := newRunner() e := r.Start() So(e, ShouldNotBeNil) @@ -240,7 +239,7 @@ func TestRunnerState(t *testing.T) { }) Convey("starts after adding one delegates", func() { - r := newRunner(&routing.RoundRobinStrategy{}) + r := newRunner() m1 := new(MockHandlerDelegate) r.AddDelegates(m1) e := r.Start() @@ -250,7 +249,7 @@ func TestRunnerState(t *testing.T) { }) Convey("starts after after adding multiple delegates", func() { - r := newRunner(&routing.RoundRobinStrategy{}) + r := newRunner() m1 := new(MockHandlerDelegate) m2 := new(MockHandlerDelegate) m3 := new(MockHandlerDelegate) @@ -265,7 +264,7 @@ func TestRunnerState(t *testing.T) { }) Convey("error if delegate cannot RegisterHandler", func() { - r := newRunner(&routing.RoundRobinStrategy{}) + r := newRunner() me := new(MockHandlerDelegate) me.ErrorMode = true r.AddDelegates(me) @@ -280,7 +279,7 @@ func TestRunnerState(t *testing.T) { Convey(".Stop", func() { Convey("removes handlers from delegates", func() { - r := newRunner(&routing.RoundRobinStrategy{}) + r := newRunner() m1 := new(MockHandlerDelegate) m2 := new(MockHandlerDelegate) m3 := new(MockHandlerDelegate) @@ -297,7 +296,7 @@ func TestRunnerState(t *testing.T) { }) Convey("returns errors for handlers errors on stop", func() { - r := newRunner(&routing.RoundRobinStrategy{}) + r := newRunner() m1 := new(MockHandlerDelegate) m1.StopError = errors.New("0") m2 := new(MockHandlerDelegate) @@ -349,7 +348,7 @@ func TestRunnerPluginRunning(t *testing.T) { // These tests only work if snap Path is known to discover mock plugin used for testing if SnapPath != "" { Convey("should return an AvailablePlugin", func() { - r := newRunner(&routing.RoundRobinStrategy{}) + r := newRunner() r.SetEmitter(new(MockEmitter)) a := plugin.Arg{ PluginLogPath: "/tmp/snap-test-plugin.log", @@ -374,7 +373,7 @@ func TestRunnerPluginRunning(t *testing.T) { }) Convey("availablePlugins should include returned availablePlugin", func() { - r := newRunner(&routing.RoundRobinStrategy{}) + r := newRunner() r.SetEmitter(new(MockEmitter)) a := plugin.Arg{ PluginLogPath: "/tmp/snap-test-plugin.log", @@ -394,7 +393,7 @@ func TestRunnerPluginRunning(t *testing.T) { }) Convey("healthcheck on healthy plugin does not increment failedHealthChecks", func() { - r := newRunner(&routing.RoundRobinStrategy{}) + r := newRunner() r.SetEmitter(new(MockEmitter)) a := plugin.Arg{ PluginLogPath: "/tmp/snap-test-plugin.log", @@ -413,7 +412,7 @@ func TestRunnerPluginRunning(t *testing.T) { }) Convey("healthcheck on unhealthy plugin increments failedHealthChecks", func() { - r := newRunner(&routing.RoundRobinStrategy{}) + r := newRunner() r.SetEmitter(new(MockEmitter)) a := plugin.Arg{ PluginLogPath: "/tmp/snap-test-plugin.log", @@ -432,7 +431,7 @@ func TestRunnerPluginRunning(t *testing.T) { }) Convey("successful healthcheck resets failedHealthChecks", func() { - r := newRunner(&routing.RoundRobinStrategy{}) + r := newRunner() r.SetEmitter(new(MockEmitter)) a := plugin.Arg{ PluginLogPath: "/tmp/snap-test-plugin-foo.log", @@ -455,7 +454,7 @@ func TestRunnerPluginRunning(t *testing.T) { }) Convey("three consecutive failedHealthChecks disables the plugin", func() { - r := newRunner(&routing.RoundRobinStrategy{}) + r := newRunner() r.SetEmitter(new(MockEmitter)) a := plugin.Arg{ PluginLogPath: "/tmp/snap-test-plugin.log", @@ -476,7 +475,7 @@ func TestRunnerPluginRunning(t *testing.T) { }) Convey("should return error for WaitForResponse error", func() { - r := newRunner(&routing.RoundRobinStrategy{}) + r := newRunner() r.SetEmitter(new(MockEmitter)) exPlugin := new(MockExecutablePlugin) exPlugin.Timeout = true // set to not response @@ -487,7 +486,7 @@ func TestRunnerPluginRunning(t *testing.T) { }) Convey("should return error for nil availablePlugin", func() { - r := newRunner(&routing.RoundRobinStrategy{}) + r := newRunner() exPlugin := new(MockExecutablePlugin) exPlugin.NilResponse = true // set to not response ap, e := r.startPlugin(exPlugin) @@ -497,7 +496,7 @@ func TestRunnerPluginRunning(t *testing.T) { }) Convey("should return error if plugin fails while starting", func() { - r := newRunner(&routing.RoundRobinStrategy{}) + r := newRunner() exPlugin := &MockExecutablePlugin{ StartError: true, } @@ -508,7 +507,7 @@ func TestRunnerPluginRunning(t *testing.T) { }) Convey("should return error if plugin fails to start", func() { - r := newRunner(&routing.RoundRobinStrategy{}) + r := newRunner() exPlugin := &MockExecutablePlugin{ PluginFailure: true, } @@ -523,7 +522,7 @@ func TestRunnerPluginRunning(t *testing.T) { Convey("stopPlugin", func() { Convey("should return an AvailablePlugin in a Running state", func() { - r := newRunner(&routing.RoundRobinStrategy{}) + r := newRunner() a := plugin.Arg{ PluginLogPath: "/tmp/snap-test-plugin-stop.log", } diff --git a/control/router.go b/control/strategy.go similarity index 55% rename from control/router.go rename to control/strategy.go index 2631451f2..5bfab08a2 100644 --- a/control/router.go +++ b/control/strategy.go @@ -17,18 +17,24 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package control contains the routing functionality. Router is the entry point for execution commands and routing to plugins package control -import "github.com/intelsdi-x/snap/control/routing" - -// RouterResponse interface -type RouterResponse interface { -} - -// RoutingStrategy interface -type RoutingStrategy interface { - Select(routing.SelectablePluginPool, []routing.SelectablePlugin) (routing.SelectablePlugin, error) - // Handy string for logging what strategy is selected +import ( + "time" + + "github.com/intelsdi-x/snap/control/strategy" + "github.com/intelsdi-x/snap/core" +) + +// Strategy defines the interface for plugin routing and caching. +type Strategy interface { + Select([]strategy.SelectablePlugin) (strategy.SelectablePlugin, error) + CheckCache(mts []core.Metric) ([]core.Metric, []core.Metric) + UpdateCache(mts []core.Metric) + CacheHits(string, int) (uint64, error) + CacheMisses(string, int) (uint64, error) + AllCacheHits() uint64 + AllCacheMisses() uint64 + CacheTTL() time.Duration String() string } diff --git a/control/plugin/client/cache.go b/control/strategy/cache.go similarity index 88% rename from control/plugin/client/cache.go rename to control/strategy/cache.go index d37783c08..0376b14a3 100644 --- a/control/plugin/client/cache.go +++ b/control/strategy/cache.go @@ -17,7 +17,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package client +package strategy import ( "errors" @@ -29,14 +29,12 @@ import ( "github.com/intelsdi-x/snap/pkg/chrono" ) -// the time limit for which a cache entry is valid. +// GlobalCacheExpiration the default time limit for which a cache entry is valid. +// A plugin can override the GlobalCacheExpiration (default). var GlobalCacheExpiration time.Duration var ( - metricCache = cache{ - table: make(map[string]*cachecell), - } - cacheLog = log.WithField("_module", "client-cache") + cacheLog = log.WithField("_module", "routing-cache") ErrCacheEntryDoesNotExist = errors.New("cache entry does not exist") ) @@ -51,6 +49,14 @@ type cachecell struct { type cache struct { table map[string]*cachecell + ttl time.Duration +} + +func NewCache(expiration time.Duration) *cache { + return &cache{ + table: make(map[string]*cachecell), + ttl: expiration, + } } func (c *cache) get(ns string, version int) interface{} { @@ -60,7 +66,7 @@ func (c *cache) get(ns string, version int) interface{} { ) key := fmt.Sprintf("%v:%v", ns, version) - if cell, ok = c.table[key]; ok && chrono.Chrono.Now().Sub(cell.time) < GlobalCacheExpiration { + if cell, ok = c.table[key]; ok && chrono.Chrono.Now().Sub(cell.time) < c.ttl { cell.hits++ cacheLog.WithFields(log.Fields{ "namespace": key, diff --git a/control/plugin/client/cache_test.go b/control/strategy/cache_test.go similarity index 86% rename from control/plugin/client/cache_test.go rename to control/strategy/cache_test.go index 72e046ed5..149142f23 100644 --- a/control/plugin/client/cache_test.go +++ b/control/strategy/cache_test.go @@ -17,7 +17,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package client +package strategy import ( "testing" @@ -32,22 +32,19 @@ import ( func TestCache(t *testing.T) { GlobalCacheExpiration = time.Duration(300 * time.Millisecond) Convey("puts and gets a metric", t, func() { - mc := &cache{ - table: make(map[string]*cachecell), - } + mc := NewCache(GlobalCacheExpiration) foo := &plugin.PluginMetricType{ Namespace_: []string{"foo", "bar"}, } mc.put("/foo/bar", 1, foo) ret := mc.get("/foo/bar", 1) + So(ret, ShouldNotBeNil) So(ret, ShouldEqual, foo) }) Convey("returns nil if the cache cell does not exist", t, func() { - mc := &cache{ - table: make(map[string]*cachecell), - } + mc := NewCache(GlobalCacheExpiration) ret := mc.get("/foo/bar", 1) So(ret, ShouldBeNil) }) @@ -59,23 +56,19 @@ func TestCache(t *testing.T) { // Use artificial time: pause to get base time. chrono.Chrono.Pause() - mc := &cache{ - table: make(map[string]*cachecell), - } + mc := NewCache(400 * time.Millisecond) foo := &plugin.PluginMetricType{ Namespace_: []string{"foo", "bar"}, } mc.put("/foo/bar", 1, foo) - chrono.Chrono.Forward(301 * time.Millisecond) + chrono.Chrono.Forward(401 * time.Millisecond) ret := mc.get("/foo/bar", 1) So(ret, ShouldBeNil) }) Convey("hit and miss counts", t, func() { Convey("ticks hit count when a cache entry is hit", func() { - mc := &cache{ - table: make(map[string]*cachecell), - } + mc := NewCache(400 * time.Millisecond) foo := &plugin.PluginMetricType{ Namespace_: []string{"foo", "bar"}, } @@ -89,9 +82,7 @@ func TestCache(t *testing.T) { chrono.Chrono.Pause() - mc := &cache{ - table: make(map[string]*cachecell), - } + mc := NewCache(400 * time.Millisecond) foo := &plugin.PluginMetricType{ Namespace_: []string{"foo", "bar"}, } @@ -107,9 +98,7 @@ func TestCache(t *testing.T) { chrono.Chrono.Pause() - mc := &cache{ - table: make(map[string]*cachecell), - } + mc := NewCache(GlobalCacheExpiration) foo := &plugin.PluginMetricType{ Namespace_: []string{"foo", "bar"}, } diff --git a/control/strategy/lru.go b/control/strategy/lru.go new file mode 100644 index 000000000..e13398946 --- /dev/null +++ b/control/strategy/lru.go @@ -0,0 +1,186 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package strategy + +import ( + "errors" + "fmt" + "math/rand" + "time" + + log "github.com/Sirupsen/logrus" + "github.com/intelsdi-x/snap/core" +) + +var ( + ErrorCouldNotSelect = errors.New("could not select a plugin (round robin strategy)") +) + +// lru provides a stragey that selects the least recently used available plugin. +type lru struct { + metricCache *cache + logger *log.Entry +} + +func NewLRU(cacheTTL time.Duration) *lru { + return &lru{ + metricCache: NewCache(cacheTTL), + logger: log.WithFields(log.Fields{ + "_module": "control-routing", + }), + } +} + +func (r *lru) Cache() *cache { + return r.metricCache +} + +func (r *lru) String() string { + return "least-recently-used" +} + +func (r *lru) CacheTTL() time.Duration { + return r.Cache().ttl +} + +func (r *lru) Select(spa []SelectablePlugin) (SelectablePlugin, error) { + var h int = -1 + var index int = -1 + for i, sp := range spa { + // look for the lowest hit count + if sp.HitCount() < h || h == -1 { + index = i + h = sp.HitCount() + } + // on a hitcount tie we randomly choose one + if sp.HitCount() == h { + if rand.Intn(1) == 1 { + index = i + h = sp.HitCount() + } + } + } + if index > -1 { + r.logger.WithFields(log.Fields{ + "block": "select", + "strategy": "round-robin", + "pool size": len(spa), + "index": spa[index].String(), + "hitcount": spa[index].HitCount(), + }).Debug("plugin selected") + return spa[index], nil + } + r.logger.WithFields(log.Fields{ + "block": "select", + "strategy": "round-robin", + "error": ErrorCouldNotSelect, + }).Error("error selecting") + return nil, ErrorCouldNotSelect +} + +// checkCache checks the cache for metric types. +// returns: +// - array of metrics that need to be collected +// - array of metrics that were returned from the cache +func (r *lru) CheckCache(mts []core.Metric) ([]core.Metric, []core.Metric) { + var fromCache []core.Metric + var metricsToCollect []core.Metric + for _, mt := range mts { + if m := r.metricCache.get(core.JoinNamespace(mt.Namespace()), mt.Version()); m != nil { + switch metric := m.(type) { + case core.Metric: + fromCache = append(fromCache, metric) + case []core.Metric: + for _, met := range metric { + fromCache = append(fromCache, met) + } + default: + r.logger.WithFields(log.Fields{ + "_module": "client", + "_block": "checkCache", + }).Error("unsupported type found in the cache") + } + } else { + metricsToCollect = append(metricsToCollect, mt) + } + } + return metricsToCollect, fromCache +} + +// updateCache updates the cache with the given array of metrics. +func (r *lru) UpdateCache(mts []core.Metric) { + results := []core.Metric{} + dc := map[string][]core.Metric{} + for _, mt := range mts { + if mt.Labels() == nil { + // cache the individual metric + r.metricCache.put(core.JoinNamespace(mt.Namespace()), mt.Version(), mt) + } else { + // collect the dynamic query results so we can cache + ns := make([]string, len(mt.Namespace())) + copy(ns, mt.Namespace()) + for _, label := range mt.Labels() { + ns[label.Index] = "*" + } + if _, ok := dc[core.JoinNamespace(ns)]; !ok { + dc[core.JoinNamespace(ns)] = []core.Metric{} + } + dc[core.JoinNamespace(ns)] = append(dc[core.JoinNamespace(ns)], mt) + r.metricCache.put(core.JoinNamespace(ns), mt.Version(), dc[core.JoinNamespace(ns)]) + r.logger.Errorf("putting %v:%v in the cache", mt.Namespace(), mt.Version()) + } + results = append(results, mt) + } +} + +func (r *lru) AllCacheHits() uint64 { + var hits uint64 + for _, v := range r.metricCache.table { + hits += v.hits + } + return hits +} + +// AllCacheMisses returns cache misses across all metrics. +func (r *lru) AllCacheMisses() uint64 { + var misses uint64 + for _, v := range r.metricCache.table { + misses += v.misses + } + return misses +} + +// CacheHits returns the cache hits for a given metric namespace and version. +func (r *lru) CacheHits(ns string, version int) (uint64, error) { + key := fmt.Sprintf("%v:%v", ns, version) + if v, ok := r.metricCache.table[key]; ok { + return v.hits, nil + } + return 0, ErrCacheEntryDoesNotExist +} + +// CacheMisses returns the cache misses for a given metric namespace and version. +func (r *lru) CacheMisses(ns string, version int) (uint64, error) { + key := fmt.Sprintf("%v:%v", ns, version) + if v, ok := r.metricCache.table[key]; ok { + return v.misses, nil + } + return 0, ErrCacheEntryDoesNotExist +} diff --git a/control/routing/routing.go b/control/strategy/strategy.go similarity index 90% rename from control/routing/routing.go rename to control/strategy/strategy.go index 7bb41d32f..a5bd9c05d 100644 --- a/control/routing/routing.go +++ b/control/strategy/strategy.go @@ -17,14 +17,9 @@ See the License for the specific language governing permissions and limitations under the License. */ -package routing +package strategy -import ( - "time" -) - -type SelectablePluginPool interface { -} +import "time" type SelectablePlugin interface { HitCount() int diff --git a/plugin/collector/snap-collector-mock1/main.go b/plugin/collector/snap-collector-mock1/main.go index f1d07590c..52c367c46 100644 --- a/plugin/collector/snap-collector-mock1/main.go +++ b/plugin/collector/snap-collector-mock1/main.go @@ -21,7 +21,6 @@ package main import ( "os" - "time" // Import the snap plugin library "github.com/intelsdi-x/snap/control/plugin" @@ -37,7 +36,6 @@ func main() { // Define metadata about Plugin meta := mock.Meta() meta.RPCType = plugin.JSONRPC - meta.CacheTTL = time.Duration(time.Millisecond * 100) // Start a collector plugin.Start(meta, new(mock.Mock), os.Args[1]) diff --git a/plugin/collector/snap-collector-mock1/mock/mock.go b/plugin/collector/snap-collector-mock1/mock/mock.go index c69b1b357..657651d5a 100644 --- a/plugin/collector/snap-collector-mock1/mock/mock.go +++ b/plugin/collector/snap-collector-mock1/mock/mock.go @@ -117,7 +117,16 @@ func (f *Mock) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) { //Meta returns meta data for testing func Meta() *plugin.PluginMeta { - return plugin.NewPluginMeta(Name, Version, Type, []string{plugin.SnapGOBContentType}, []string{plugin.SnapGOBContentType}, plugin.Unsecure(true)) + return plugin.NewPluginMeta( + Name, + Version, + Type, + []string{plugin.SnapGOBContentType}, + []string{plugin.SnapGOBContentType}, + plugin.Unsecure(true), + plugin.RoutingStrategy(plugin.DefaultRouting), + plugin.CacheTTL(1100*time.Millisecond), + ) } //Random number generator