Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Adds sticky (plugin) routing strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
jcooklin committed Jan 14, 2016
1 parent 6fc20b4 commit 36f0e7a
Show file tree
Hide file tree
Showing 17 changed files with 645 additions and 241 deletions.
110 changes: 47 additions & 63 deletions control/available_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,9 @@ func (p *apPool) applyPluginMeta(ap *availablePlugin) error {
switch ap.meta.RoutingStrategy {
case plugin.DefaultRouting:
p.strategy = strategy.NewLRU(cacheTTL)
case plugin.StickyRouting:
p.strategy = strategy.NewSticky(cacheTTL)
ap.meta.ConcurrencyCount = 1
default:
return ErrBadStrategy
}
Expand Down Expand Up @@ -442,43 +445,24 @@ func (p *apPool) kill(id uint32, reason string) {
}
}

// kills the plugin with the lowest hit count
func (p *apPool) killLeastUsed(reason string) {
p.Lock()
defer p.Unlock()

var (
id uint32
prev int
)

// grab details from the first item
for _, p := range p.plugins {
prev = p.hitCount
id = p.id
break
}

// walk through all and find the lowest hit count
for _, p := range p.plugins {
if p.hitCount < prev {
prev = p.hitCount
id = p.id
}
func (p *apPool) selectAndKill(taskID, reason string) {
sp := make([]strategy.SelectablePlugin, p.count())
i := 0
for _, plg := range p.plugins {
sp[i] = plg
i++
}

// kill that ap
ap, ok := p.plugins[id]
if ok {
// only log on first ok health check
sap, err := p.strategy.Select(sp, taskID)
if err != nil || sap == nil {
log.WithFields(log.Fields{
"_module": "control-aplugin",
"block": "kill-least-used",
"aplugin": ap,
}).Debug("killing available plugin")
ap.Kill(reason)
delete(p.plugins, id)
"_block": "selectAndKill",
"taskID": taskID,
"reason": reason,
}).Error(err)
return
}
p.kill(sap.(*availablePlugin).ID(), reason)
p.strategy.Remove(sap, taskID)
}

// remove removes an available plugin from the the pool.
Expand Down Expand Up @@ -506,7 +490,7 @@ func (p *apPool) subscriptionCount() int {
return len(p.subs)
}

func (p *apPool) selectAP() (*availablePlugin, serror.SnapError) {
func (p *apPool) selectAP(taskID string) (*availablePlugin, serror.SnapError) {
p.RLock()
defer p.RUnlock()

Expand All @@ -516,7 +500,7 @@ func (p *apPool) selectAP() (*availablePlugin, serror.SnapError) {
sp[i] = plg
i++
}
sap, err := p.strategy.Select(sp)
sap, err := p.strategy.Select(sp, taskID)
if err != nil || sap == nil {
return nil, serror.New(err)
}
Expand Down Expand Up @@ -550,20 +534,20 @@ type subscription struct {
taskID string
}

func (p *apPool) CheckCache(mts []core.Metric) ([]core.Metric, []core.Metric) {
return p.strategy.CheckCache(mts)
func (p *apPool) CheckCache(mts []core.Metric, taskID string) ([]core.Metric, []core.Metric) {
return p.strategy.CheckCache(mts, taskID)
}

func (p *apPool) UpdateCache(mts []core.Metric) {
p.strategy.UpdateCache(mts)
func (p *apPool) UpdateCache(mts []core.Metric, taskID string) {
p.strategy.UpdateCache(mts, taskID)
}

func (p *apPool) CacheHits(ns string, ver int) (uint64, error) {
return p.strategy.CacheHits(ns, ver)
func (p *apPool) CacheHits(ns string, ver int, taskID string) (uint64, error) {
return p.strategy.CacheHits(ns, ver, taskID)
}

func (p *apPool) CacheMisses(ns string, ver int) (uint64, error) {
return p.strategy.CacheMisses(ns, ver)
func (p *apPool) CacheMisses(ns string, ver int, taskID string) (uint64, error) {
return p.strategy.CacheMisses(ns, ver, taskID)
}
func (p *apPool) AllCacheHits() uint64 {
return p.strategy.AllCacheHits()
Expand All @@ -573,11 +557,11 @@ func (p *apPool) AllCacheMisses() uint64 {
return p.strategy.AllCacheMisses()
}

func (p *apPool) CacheTTL() (time.Duration, error) {
func (p *apPool) CacheTTL(taskID string) (time.Duration, error) {
if len(p.plugins) == 0 {
return 0, ErrPoolEmpty
}
return p.strategy.CacheTTL(), nil
return p.strategy.CacheTTL(taskID), nil
}

type availablePlugins struct {
Expand Down Expand Up @@ -649,7 +633,7 @@ func (ap *availablePlugins) getPool(key string) (*apPool, serror.SnapError) {
return pool, nil
}

func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core.Metric) ([]core.Metric, error) {
func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core.Metric, taskID string) ([]core.Metric, error) {
var results []core.Metric
pool, serr := ap.getPool(pluginKey)
if serr != nil {
Expand All @@ -659,15 +643,15 @@ func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core.
return nil, serror.New(ErrPoolNotFound, map[string]interface{}{"pool-key": pluginKey})
}

metricsToCollect, metricsFromCache := pool.CheckCache(metricTypes)
metricsToCollect, metricsFromCache := pool.CheckCache(metricTypes, taskID)

if len(metricsToCollect) == 0 {
return metricsFromCache, nil
}

pool.RLock()
defer pool.RUnlock()
p, serr := pool.selectAP()
p, serr := pool.selectAP(taskID)
if serr != nil {
return nil, serr
}
Expand All @@ -684,7 +668,7 @@ func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core.
return nil, serror.New(err)
}

pool.UpdateCache(metrics)
pool.UpdateCache(metrics, taskID)

results = make([]core.Metric, len(metricsFromCache)+len(metrics))
idx := 0
Expand All @@ -704,7 +688,7 @@ func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core.
return metrics, nil
}

func (ap *availablePlugins) publishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue) []error {
func (ap *availablePlugins) publishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) []error {
var errs []error
key := strings.Join([]string{plugin.PublisherPluginType.String(), pluginName, strconv.Itoa(pluginVersion)}, ":")
pool, serr := ap.getPool(key)
Expand All @@ -718,7 +702,7 @@ func (ap *availablePlugins) publishMetrics(contentType string, content []byte, p

pool.RLock()
defer pool.RUnlock()
p, err := pool.selectAP()
p, err := pool.selectAP(taskID)
if err != nil {
errs = append(errs, err)
return errs
Expand All @@ -738,7 +722,7 @@ func (ap *availablePlugins) publishMetrics(contentType string, content []byte, p
return nil
}

func (ap *availablePlugins) processMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue) (string, []byte, []error) {
func (ap *availablePlugins) processMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) (string, []byte, []error) {
var errs []error
key := strings.Join([]string{plugin.ProcessorPluginType.String(), pluginName, strconv.Itoa(pluginVersion)}, ":")
pool, serr := ap.getPool(key)
Expand All @@ -752,7 +736,7 @@ func (ap *availablePlugins) processMetrics(contentType string, content []byte, p

pool.RLock()
defer pool.RUnlock()
p, err := pool.selectAP()
p, err := pool.selectAP(taskID)
if err != nil {
errs = append(errs, err)
return "", nil, errs
Expand Down Expand Up @@ -809,17 +793,17 @@ func (ap *availablePlugins) getOrCreatePool(key string) (*apPool, error) {
return pool, nil
}

func (ap *availablePlugins) selectAP(key string) (*availablePlugin, serror.SnapError) {
ap.RLock()
defer ap.RUnlock()
// func (ap *availablePlugins) selectAP(key string) (*availablePlugin, serror.SnapError) {
// ap.RLock()
// defer ap.RUnlock()

pool, err := ap.getPool(key)
if err != nil {
return nil, err
}
// pool, err := ap.getPool(key)
// if err != nil {
// return nil, err
// }

return pool.selectAP()
}
// return pool.selectAP()
// }

func (ap *availablePlugins) pools() map[string]*apPool {
ap.RLock()
Expand Down
12 changes: 6 additions & 6 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ func (p *pluginControl) MetricExists(mns []string, ver int) bool {
// CollectMetrics is a blocking call to collector plugins returning a collection
// of metrics and errors. If an error is encountered no metrics will be
// returned.
func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time.Time) (metrics []core.Metric, errs []error) {
func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time.Time, taskID string) (metrics []core.Metric, errs []error) {
pluginToMetricMap, err := groupMetricTypesByPlugin(p.metricCatalog, metricTypes)
if err != nil {
errs = append(errs, err)
Expand All @@ -859,7 +859,7 @@ func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time.
wg.Add(1)

go func(pluginKey string, mt []core.Metric) {
mts, err := p.pluginRunner.AvailablePlugins().collectMetrics(pluginKey, mt)
mts, err := p.pluginRunner.AvailablePlugins().collectMetrics(pluginKey, mt, taskID)
if err != nil {
cError <- err
} else {
Expand Down Expand Up @@ -893,23 +893,23 @@ func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time.
}

// PublishMetrics
func (p *pluginControl) PublishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue) []error {
func (p *pluginControl) PublishMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) []error {
// merge global plugin config into the config for this request
cfg := p.Config.Plugins.getPluginConfigDataNode(core.PublisherPluginType, pluginName, pluginVersion).Table()
for k, v := range config {
cfg[k] = v
}
return p.pluginRunner.AvailablePlugins().publishMetrics(contentType, content, pluginName, pluginVersion, cfg)
return p.pluginRunner.AvailablePlugins().publishMetrics(contentType, content, pluginName, pluginVersion, cfg, taskID)
}

// ProcessMetrics
func (p *pluginControl) ProcessMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue) (string, []byte, []error) {
func (p *pluginControl) ProcessMetrics(contentType string, content []byte, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) (string, []byte, []error) {
// merge global plugin config into the config for this request
cfg := p.Config.Plugins.getPluginConfigDataNode(core.ProcessorPluginType, pluginName, pluginVersion).Table()
for k, v := range config {
cfg[k] = v
}
return p.pluginRunner.AvailablePlugins().processMetrics(contentType, content, pluginName, pluginVersion, cfg)
return p.pluginRunner.AvailablePlugins().processMetrics(contentType, content, pluginName, pluginVersion, cfg, taskID)
}

// GetPluginContentTypes returns accepted and returned content types for the
Expand Down
Loading

0 comments on commit 36f0e7a

Please sign in to comment.