Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Review comments implemented
Browse files Browse the repository at this point in the history
  • Loading branch information
marcin-krolik committed Apr 25, 2016
1 parent 0722d08 commit 4f7ed67
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 89 deletions.
20 changes: 4 additions & 16 deletions control/available_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,14 +386,10 @@ func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core.
if config != nil {
cfg = config.Table()
}
opts := strategy.SelectorValues{
Task: taskID,
Config: cfg,
}

pool.RLock()
defer pool.RUnlock()
p, serr := pool.SelectAP(opts)
p, serr := pool.SelectAP(taskID, cfg)
if serr != nil {
return nil, serr
}
Expand Down Expand Up @@ -445,12 +441,7 @@ func (ap *availablePlugins) publishMetrics(contentType string, content []byte, p
pool.RLock()
defer pool.RUnlock()

opts := strategy.SelectorValues{
Task: taskID,
Config: config,
}

p, err := pool.SelectAP(opts)
p, err := pool.SelectAP(taskID, config)
if err != nil {
errs = append(errs, err)
return errs
Expand Down Expand Up @@ -481,13 +472,10 @@ func (ap *availablePlugins) processMetrics(contentType string, content []byte, p
if pool == nil {
return "", nil, []error{serror.New(ErrPoolNotFound, map[string]interface{}{"pool-key": key})}
}
opts := strategy.SelectorValues{
Task: taskID,
Config: config,
}

pool.RLock()
defer pool.RUnlock()
p, err := pool.SelectAP(opts)
p, err := pool.SelectAP(taskID, config)
if err != nil {
errs = append(errs, err)
return "", nil, errs
Expand Down
51 changes: 22 additions & 29 deletions control/strategy/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ var (
// This defines the maximum running instances of a loaded plugin.
// It is initialized at runtime via the cli.
MaximumRunningPlugins = 3
ErrStrategyUnknown = errors.New("Provided RoutingAndCaching strategy unknown") // TODO - remove?
)

var (
Expand All @@ -70,7 +69,7 @@ type Pool interface {
RLock()
RUnlock()
SelectAndKill(taskID, reason string)
SelectAP(opts SelectorValues) (AvailablePlugin, serror.SnapError)
SelectAP(taskID string, configID map[string]ctypes.ConfigValue) (AvailablePlugin, serror.SnapError)
Strategy() RoutingAndCaching
Subscribe(taskID string, subType SubscriptionType)
SubscriptionCount() int
Expand All @@ -93,11 +92,6 @@ type AvailablePlugin interface {
Type() plugin.PluginType
}

type SelectorValues struct {
Task string
Config map[string]ctypes.ConfigValue
}

type subscription struct {
SubType SubscriptionType
Version int
Expand Down Expand Up @@ -359,32 +353,26 @@ func (p *pool) SubscriptionCount() int {
}

// SelectAP selects an available plugin from the pool
func (p *pool) SelectAP(opts SelectorValues) (AvailablePlugin, serror.SnapError) {
func (p *pool) SelectAP(taskID string, config map[string]ctypes.ConfigValue) (AvailablePlugin, serror.SnapError) {
p.RLock()
defer p.RUnlock()

selector, err := func(opts SelectorValues) (string, error) {
var e error
s := ""
switch p.Strategy().String() {
case "least-recently-used":
s = "lru"
case "sticky":
s = opts.Task
case "config-based":
s = idFromCfg(opts.Config)
default:
e = ErrBadStrategy
}
return s, e
}(opts)
aps := p.plugins.Values()

if err != nil {
return nil, serror.New(err)
var id string
switch p.Strategy().String() {
case "least-recently-used":
id = ""
case "sticky":
id = taskID
case "config-based":
id = idFromCfg(config)
default:
return nil, serror.New(ErrBadStrategy)
}
aps := p.plugins.Values()
ap, err := p.Select(aps, selector)
if err != nil || ap == nil {

ap, err := p.Select(aps, id)
if err != nil {
return nil, serror.New(err)
}
return ap, nil
Expand All @@ -409,11 +397,16 @@ func (p *pool) generatePID() uint32 {
// MoveSubscriptions moves subscriptions to another pool
func (p *pool) MoveSubscriptions(to Pool) []subscription {
var subs []subscription

// If attempting to move between the same pool
// bail to prevent deadlock.
if to.(*pool) == p {
return []subscription{}
}
p.Lock()
defer p.Unlock()

for task, sub := range p.subs {
// ensure that this sub was not bound to this pool specifically before moving
if sub.SubType == UnboundSubscriptionType {
subs = append(subs, *sub)
to.Subscribe(task, UnboundSubscriptionType)
Expand Down
29 changes: 25 additions & 4 deletions control/strategy/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ limitations under the License.
package strategy

import (
"fmt"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -161,24 +162,44 @@ func TestPoolEligibility(t *testing.T) {
{plugin.CollectorPluginType, plugin.StickyRouting, 999, 0, false, false},
{plugin.CollectorPluginType, plugin.StickyRouting, 999, 1, false, false},
{plugin.CollectorPluginType, plugin.StickyRouting, 999, 2, false, true},
{plugin.CollectorPluginType, plugin.StickyRouting, 999, 3, false, true},
{plugin.CollectorPluginType, plugin.StickyRouting, 999, 4, false, false},
{plugin.CollectorPluginType, plugin.StickyRouting, 999, 2, true, false},

{plugin.CollectorPluginType, plugin.ConfigRouting, 999, 0, false, false},
{plugin.CollectorPluginType, plugin.ConfigRouting, 999, 1, false, false},
{plugin.CollectorPluginType, plugin.ConfigRouting, 999, 2, false, false},
{plugin.CollectorPluginType, plugin.ConfigRouting, 999, 3, false, false},
{plugin.CollectorPluginType, plugin.ConfigRouting, 999, 4, false, true},
{plugin.CollectorPluginType, plugin.ConfigRouting, 999, 5, false, true},
{plugin.CollectorPluginType, plugin.ConfigRouting, 999, 4, true, false},
}
Convey("Then new pool eligibility is defined", func() {
for i, tc := range tcs {
plg.
WithPluginType(tc.PlgType).
plg.WithPluginType(tc.PlgType).
WithStrategy(tc.Strategy).
WithExclusive(tc.Exclusiveness).
WithConCount(tc.Concurrency).
WithID(uint32(i))

pool, _ := NewPool(plg.String(), plg)

for j := 0; j < tc.Subscriptions; j++ {
pool.Subscribe(strconv.Itoa(j), BoundSubscriptionType)
}
So(pool.SubscriptionCount(), ShouldEqual, tc.Subscriptions)
So(pool.Eligible(), ShouldEqual, tc.Expected)

Convey(fmt.Sprintf(
"{strategy = %s, concurreny = %d, subscriptions = %d, exclusiveness = %v, count = %d}",
tc.Strategy.String(),
tc.Concurrency,
tc.Subscriptions,
tc.Exclusiveness,
pool.Count(),
),
func() {
So(pool.SubscriptionCount(), ShouldEqual, tc.Subscriptions)
So(pool.Eligible(), ShouldEqual, tc.Expected)
})
}
})
})
Expand Down
40 changes: 0 additions & 40 deletions control/strategy/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,40 +47,6 @@ type RoutingAndCaching interface {
String() string
}

func (sm MapAvailablePlugin) AddMap(m map[uint32]AvailablePlugin) {
if sm.Size() == 0 {
sm = m
} else {
for k, v := range m {
sm[k] = v
}
}
}

// RemoveAll deletes all key/value pairs from map
func (sm MapAvailablePlugin) RemoveAll() {
sm = map[uint32]AvailablePlugin{}
}

// Size return number of key/value pairs
func (sm MapAvailablePlugin) Size() int {
return len(sm)
}

// Empty checks if map contains any key/value pair
func (sm MapAvailablePlugin) Empty() bool {
return len(sm) == 0
}

// Keys returns slice of map keys
func (sm MapAvailablePlugin) Keys() []uint32 {
keys := []uint32{}
for k := range sm {
keys = append(keys, k)
}
return keys
}

// Values returns slice of map values
func (sm MapAvailablePlugin) Values() []AvailablePlugin {
values := []AvailablePlugin{}
Expand All @@ -89,9 +55,3 @@ func (sm MapAvailablePlugin) Values() []AvailablePlugin {
}
return values
}

// HasKey checks if key exists in map
func (sm MapAvailablePlugin) HasKey(key uint32) bool {
_, found := sm[key]
return found
}

0 comments on commit 4f7ed67

Please sign in to comment.