From 4f7ed67976e94b0949ea26466b7d2d368be4f095 Mon Sep 17 00:00:00 2001 From: Marcin Krolik Date: Thu, 14 Apr 2016 09:28:51 +0200 Subject: [PATCH] Review comments implemented --- control/available_plugin.go | 20 +++----------- control/strategy/pool.go | 51 +++++++++++++++-------------------- control/strategy/pool_test.go | 29 +++++++++++++++++--- control/strategy/strategy.go | 40 --------------------------- 4 files changed, 51 insertions(+), 89 deletions(-) diff --git a/control/available_plugin.go b/control/available_plugin.go index 1d5301e02..9008389d7 100644 --- a/control/available_plugin.go +++ b/control/available_plugin.go @@ -386,14 +386,10 @@ func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core. if config != nil { cfg = config.Table() } - opts := strategy.SelectorValues{ - Task: taskID, - Config: cfg, - } pool.RLock() defer pool.RUnlock() - p, serr := pool.SelectAP(opts) + p, serr := pool.SelectAP(taskID, cfg) if serr != nil { return nil, serr } @@ -445,12 +441,7 @@ func (ap *availablePlugins) publishMetrics(contentType string, content []byte, p pool.RLock() defer pool.RUnlock() - opts := strategy.SelectorValues{ - Task: taskID, - Config: config, - } - - p, err := pool.SelectAP(opts) + p, err := pool.SelectAP(taskID, config) if err != nil { errs = append(errs, err) return errs @@ -481,13 +472,10 @@ func (ap *availablePlugins) processMetrics(contentType string, content []byte, p if pool == nil { return "", nil, []error{serror.New(ErrPoolNotFound, map[string]interface{}{"pool-key": key})} } - opts := strategy.SelectorValues{ - Task: taskID, - Config: config, - } + pool.RLock() defer pool.RUnlock() - p, err := pool.SelectAP(opts) + p, err := pool.SelectAP(taskID, config) if err != nil { errs = append(errs, err) return "", nil, errs diff --git a/control/strategy/pool.go b/control/strategy/pool.go index 45a38ae16..dd56e5a0a 100644 --- a/control/strategy/pool.go +++ b/control/strategy/pool.go @@ -50,7 +50,6 @@ var ( // This defines the maximum running instances of a loaded plugin. // It is initialized at runtime via the cli. MaximumRunningPlugins = 3 - ErrStrategyUnknown = errors.New("Provided RoutingAndCaching strategy unknown") // TODO - remove? ) var ( @@ -70,7 +69,7 @@ type Pool interface { RLock() RUnlock() SelectAndKill(taskID, reason string) - SelectAP(opts SelectorValues) (AvailablePlugin, serror.SnapError) + SelectAP(taskID string, configID map[string]ctypes.ConfigValue) (AvailablePlugin, serror.SnapError) Strategy() RoutingAndCaching Subscribe(taskID string, subType SubscriptionType) SubscriptionCount() int @@ -93,11 +92,6 @@ type AvailablePlugin interface { Type() plugin.PluginType } -type SelectorValues struct { - Task string - Config map[string]ctypes.ConfigValue -} - type subscription struct { SubType SubscriptionType Version int @@ -359,32 +353,26 @@ func (p *pool) SubscriptionCount() int { } // SelectAP selects an available plugin from the pool -func (p *pool) SelectAP(opts SelectorValues) (AvailablePlugin, serror.SnapError) { +func (p *pool) SelectAP(taskID string, config map[string]ctypes.ConfigValue) (AvailablePlugin, serror.SnapError) { p.RLock() defer p.RUnlock() - selector, err := func(opts SelectorValues) (string, error) { - var e error - s := "" - switch p.Strategy().String() { - case "least-recently-used": - s = "lru" - case "sticky": - s = opts.Task - case "config-based": - s = idFromCfg(opts.Config) - default: - e = ErrBadStrategy - } - return s, e - }(opts) + aps := p.plugins.Values() - if err != nil { - return nil, serror.New(err) + var id string + switch p.Strategy().String() { + case "least-recently-used": + id = "" + case "sticky": + id = taskID + case "config-based": + id = idFromCfg(config) + default: + return nil, serror.New(ErrBadStrategy) } - aps := p.plugins.Values() - ap, err := p.Select(aps, selector) - if err != nil || ap == nil { + + ap, err := p.Select(aps, id) + if err != nil { return nil, serror.New(err) } return ap, nil @@ -409,11 +397,16 @@ func (p *pool) generatePID() uint32 { // MoveSubscriptions moves subscriptions to another pool func (p *pool) MoveSubscriptions(to Pool) []subscription { var subs []subscription - + // If attempting to move between the same pool + // bail to prevent deadlock. + if to.(*pool) == p { + return []subscription{} + } p.Lock() defer p.Unlock() for task, sub := range p.subs { + // ensure that this sub was not bound to this pool specifically before moving if sub.SubType == UnboundSubscriptionType { subs = append(subs, *sub) to.Subscribe(task, UnboundSubscriptionType) diff --git a/control/strategy/pool_test.go b/control/strategy/pool_test.go index b40ecf09b..4dc7c5818 100644 --- a/control/strategy/pool_test.go +++ b/control/strategy/pool_test.go @@ -19,6 +19,7 @@ limitations under the License. package strategy import ( + "fmt" "strconv" "testing" "time" @@ -161,24 +162,44 @@ func TestPoolEligibility(t *testing.T) { {plugin.CollectorPluginType, plugin.StickyRouting, 999, 0, false, false}, {plugin.CollectorPluginType, plugin.StickyRouting, 999, 1, false, false}, {plugin.CollectorPluginType, plugin.StickyRouting, 999, 2, false, true}, + {plugin.CollectorPluginType, plugin.StickyRouting, 999, 3, false, true}, + {plugin.CollectorPluginType, plugin.StickyRouting, 999, 4, false, false}, {plugin.CollectorPluginType, plugin.StickyRouting, 999, 2, true, false}, + + {plugin.CollectorPluginType, plugin.ConfigRouting, 999, 0, false, false}, + {plugin.CollectorPluginType, plugin.ConfigRouting, 999, 1, false, false}, + {plugin.CollectorPluginType, plugin.ConfigRouting, 999, 2, false, false}, + {plugin.CollectorPluginType, plugin.ConfigRouting, 999, 3, false, false}, + {plugin.CollectorPluginType, plugin.ConfigRouting, 999, 4, false, true}, + {plugin.CollectorPluginType, plugin.ConfigRouting, 999, 5, false, true}, + {plugin.CollectorPluginType, plugin.ConfigRouting, 999, 4, true, false}, } Convey("Then new pool eligibility is defined", func() { for i, tc := range tcs { - plg. - WithPluginType(tc.PlgType). + plg.WithPluginType(tc.PlgType). WithStrategy(tc.Strategy). WithExclusive(tc.Exclusiveness). WithConCount(tc.Concurrency). WithID(uint32(i)) + pool, _ := NewPool(plg.String(), plg) for j := 0; j < tc.Subscriptions; j++ { pool.Subscribe(strconv.Itoa(j), BoundSubscriptionType) } - So(pool.SubscriptionCount(), ShouldEqual, tc.Subscriptions) - So(pool.Eligible(), ShouldEqual, tc.Expected) + Convey(fmt.Sprintf( + "{strategy = %s, concurreny = %d, subscriptions = %d, exclusiveness = %v, count = %d}", + tc.Strategy.String(), + tc.Concurrency, + tc.Subscriptions, + tc.Exclusiveness, + pool.Count(), + ), + func() { + So(pool.SubscriptionCount(), ShouldEqual, tc.Subscriptions) + So(pool.Eligible(), ShouldEqual, tc.Expected) + }) } }) }) diff --git a/control/strategy/strategy.go b/control/strategy/strategy.go index 392c40d85..2dc632019 100644 --- a/control/strategy/strategy.go +++ b/control/strategy/strategy.go @@ -47,40 +47,6 @@ type RoutingAndCaching interface { String() string } -func (sm MapAvailablePlugin) AddMap(m map[uint32]AvailablePlugin) { - if sm.Size() == 0 { - sm = m - } else { - for k, v := range m { - sm[k] = v - } - } -} - -// RemoveAll deletes all key/value pairs from map -func (sm MapAvailablePlugin) RemoveAll() { - sm = map[uint32]AvailablePlugin{} -} - -// Size return number of key/value pairs -func (sm MapAvailablePlugin) Size() int { - return len(sm) -} - -// Empty checks if map contains any key/value pair -func (sm MapAvailablePlugin) Empty() bool { - return len(sm) == 0 -} - -// Keys returns slice of map keys -func (sm MapAvailablePlugin) Keys() []uint32 { - keys := []uint32{} - for k := range sm { - keys = append(keys, k) - } - return keys -} - // Values returns slice of map values func (sm MapAvailablePlugin) Values() []AvailablePlugin { values := []AvailablePlugin{} @@ -89,9 +55,3 @@ func (sm MapAvailablePlugin) Values() []AvailablePlugin { } return values } - -// HasKey checks if key exists in map -func (sm MapAvailablePlugin) HasKey(key uint32) bool { - _, found := sm[key] - return found -}