Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
address golint warnings in package control
Browse files Browse the repository at this point in the history
  • Loading branch information
candysmurf committed Dec 11, 2015
1 parent c1bbfb3 commit 3d6392f
Show file tree
Hide file tree
Showing 13 changed files with 158 additions and 129 deletions.
31 changes: 17 additions & 14 deletions control/available_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,12 @@ const (
)

var (
// ErrPoolNotFound - error message when the plugin pool not found
ErrPoolNotFound = errors.New("plugin pool not found")
ErrBadKey = errors.New("bad key")
ErrBadType = errors.New("bad plugin type")
// ErrBadKey - error message when a bad key used
ErrBadKey = errors.New("bad key")
// ErrBadType - error message when a bad plugin type used
ErrBadType = errors.New("bad plugin type")

// This defines the maximum running instances of a loaded plugin.
// It is initialized at runtime via the cli.
Expand Down Expand Up @@ -108,13 +111,13 @@ func newAvailablePlugin(resp *plugin.Response, emitter gomit.Emitter, ep executa
}
ap.key = fmt.Sprintf("%s:%s:%d", ap.pluginType.String(), ap.name, ap.version)

listenUrl := fmt.Sprintf("http://%v/rpc", resp.ListenAddress)
listenURL := fmt.Sprintf("http://%v/rpc", resp.ListenAddress)
// Create RPC Client
switch resp.Type {
case plugin.CollectorPluginType:
switch resp.Meta.RPCType {
case plugin.JSONRPC:
c, e := client.NewCollectorHttpJSONRPCClient(listenUrl, DefaultClientTimeout, resp.PublicKey, !resp.Meta.Unsecure)
c, e := client.NewCollectorHttpJSONRPCClient(listenURL, DefaultClientTimeout, resp.PublicKey, !resp.Meta.Unsecure)
if e != nil {
return nil, errors.New("error while creating client connection: " + e.Error())
}
Expand All @@ -129,7 +132,7 @@ func newAvailablePlugin(resp *plugin.Response, emitter gomit.Emitter, ep executa
case plugin.PublisherPluginType:
switch resp.Meta.RPCType {
case plugin.JSONRPC:
c, e := client.NewPublisherHttpJSONRPCClient(listenUrl, DefaultClientTimeout, resp.PublicKey, !resp.Meta.Unsecure)
c, e := client.NewPublisherHttpJSONRPCClient(listenURL, DefaultClientTimeout, resp.PublicKey, !resp.Meta.Unsecure)
if e != nil {
return nil, errors.New("error while creating client connection: " + e.Error())
}
Expand All @@ -144,7 +147,7 @@ func newAvailablePlugin(resp *plugin.Response, emitter gomit.Emitter, ep executa
case plugin.ProcessorPluginType:
switch resp.Meta.RPCType {
case plugin.JSONRPC:
c, e := client.NewProcessorHttpJSONRPCClient(listenUrl, DefaultClientTimeout, resp.PublicKey, !resp.Meta.Unsecure)
c, e := client.NewProcessorHttpJSONRPCClient(listenURL, DefaultClientTimeout, resp.PublicKey, !resp.Meta.Unsecure)
if e != nil {
return nil, errors.New("error while creating client connection: " + e.Error())
}
Expand Down Expand Up @@ -363,27 +366,27 @@ func (p *apPool) insert(ap *availablePlugin) error {

// subscribe adds a subscription to the pool.
// Using subscribe is idempotent.
func (p *apPool) subscribe(taskId string, subType subscriptionType) {
func (p *apPool) subscribe(taskID string, subType subscriptionType) {
p.Lock()
defer p.Unlock()

if _, exists := p.subs[taskId]; !exists {
if _, exists := p.subs[taskID]; !exists {
// Version is the last item in the key, so we split here
// to retrieve it for the subscription.
p.subs[taskId] = &subscription{
taskId: taskId,
p.subs[taskID] = &subscription{
taskID: taskID,
subType: subType,
version: p.version,
}
}
}

// subscribe adds a subscription to the pool.
// unsubscribe removes a subscription from the pool.
// Using unsubscribe is idempotent.
func (p *apPool) unsubscribe(taskId string) {
func (p *apPool) unsubscribe(taskID string) {
p.Lock()
defer p.Unlock()
delete(p.subs, taskId)
delete(p.subs, taskID)
}

func (p *apPool) eligible() bool {
Expand Down Expand Up @@ -526,7 +529,7 @@ func (p *apPool) moveSubscriptions(to *apPool) []subscription {
type subscription struct {
subType subscriptionType
version int
taskId string
taskID string
}

type availablePlugins struct {
Expand Down
59 changes: 29 additions & 30 deletions control/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ type config struct {
Plugins *pluginConfig `json:"plugins"`
}

// NewConfig returns a reference to a global config type for the snap daemon
// by using a newly created empty plugin config.
func NewConfig() *config {
return &config{
Plugins: newPluginConfig(),
Expand Down Expand Up @@ -196,18 +198,17 @@ func (p *pluginConfig) mergePluginConfigDataNode(pluginType core.PluginType, nam
}
res.Merge(cdn)
return
} else {
if name != "" {
cn := cdata.NewNode()
cn.Merge(cdn)
p.Collector.Plugins[name] = newPluginConfigItem()
if ver > 0 {
p.Collector.Plugins[name].Versions = map[int]*cdata.ConfigDataNode{ver: cn}
return
}
p.Collector.Plugins[name].ConfigDataNode = cn
}
if name != "" {
cn := cdata.NewNode()
cn.Merge(cdn)
p.Collector.Plugins[name] = newPluginConfigItem()
if ver > 0 {
p.Collector.Plugins[name].Versions = map[int]*cdata.ConfigDataNode{ver: cn}
return
}
p.Collector.Plugins[name].ConfigDataNode = cn
return
}
p.Collector.All.Merge(cdn)
case core.ProcessorPluginType:
Expand All @@ -218,18 +219,17 @@ func (p *pluginConfig) mergePluginConfigDataNode(pluginType core.PluginType, nam
}
res.Merge(cdn)
return
} else {
if name != "" {
cn := cdata.NewNode()
cn.Merge(cdn)
p.Processor.Plugins[name] = newPluginConfigItem()
if ver > 0 {
p.Processor.Plugins[name].Versions = map[int]*cdata.ConfigDataNode{ver: cn}
return
}
p.Processor.Plugins[name].ConfigDataNode = cn
}
if name != "" {
cn := cdata.NewNode()
cn.Merge(cdn)
p.Processor.Plugins[name] = newPluginConfigItem()
if ver > 0 {
p.Processor.Plugins[name].Versions = map[int]*cdata.ConfigDataNode{ver: cn}
return
}
p.Processor.Plugins[name].ConfigDataNode = cn
return
}
p.Processor.All.Merge(cdn)
case core.PublisherPluginType:
Expand All @@ -240,18 +240,17 @@ func (p *pluginConfig) mergePluginConfigDataNode(pluginType core.PluginType, nam
}
res.Merge(cdn)
return
} else {
if name != "" {
cn := cdata.NewNode()
cn.Merge(cdn)
p.Publisher.Plugins[name] = newPluginConfigItem()
if ver > 0 {
p.Publisher.Plugins[name].Versions = map[int]*cdata.ConfigDataNode{ver: cn}
return
}
p.Publisher.Plugins[name].ConfigDataNode = cn
}
if name != "" {
cn := cdata.NewNode()
cn.Merge(cdn)
p.Publisher.Plugins[name] = newPluginConfigItem()
if ver > 0 {
p.Publisher.Plugins[name].Versions = map[int]*cdata.ConfigDataNode{ver: cn}
return
}
p.Publisher.Plugins[name].ConfigDataNode = cn
return
}
p.Publisher.All.Merge(cdn)
}
Expand Down
52 changes: 30 additions & 22 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,11 @@ import (
)

const (
// PluginTrustDisabled - enum representing plugin trust disabled
PluginTrustDisabled int = iota
// PluginTrustEnabled - enum representing plugin trust enabled
PluginTrustEnabled
// PluginTrustWarn - enum representing plugin trust warning
PluginTrustWarn
)

Expand All @@ -58,7 +61,9 @@ var (
"_module": "control",
})

// ErrLoadedPluginNotFound - error message when a loaded plugin is not found
ErrLoadedPluginNotFound = errors.New("Loaded plugin not found")
// ErrControllerNotStarted - error message when the Controller was not started
ErrControllerNotStarted = errors.New("Must start Controller before calling Load()")
)

Expand Down Expand Up @@ -125,29 +130,33 @@ type managesSigning interface {
ValidateSignature([]string, string, []byte) error
}

type ControlOpt func(*pluginControl)
// PluginControlOpt is used to set optional parameters on the pluginControl struct
type PluginControlOpt func(*pluginControl)

func MaxRunningPlugins(m int) ControlOpt {
// MaxRunningPlugins sets the maximum number of plugins to run per pool
func MaxRunningPlugins(m int) PluginControlOpt {
return func(c *pluginControl) {
maximumRunningPlugins = m
}
}

func CacheExpiration(t time.Duration) ControlOpt {
// CacheExpiration is the PluginControlOpt which sets the global metric cache TTL
func CacheExpiration(t time.Duration) PluginControlOpt {
return func(c *pluginControl) {
client.GlobalCacheExpiration = t
}
}

func OptSetConfig(cfg *config) ControlOpt {
// OptSetConfig sets the plugin control configuration.
func OptSetConfig(cfg *config) PluginControlOpt {
return func(c *pluginControl) {
c.Config = cfg
c.pluginManager.SetPluginConfig(cfg.Plugins)
}
}

// New returns a new pluginControl instance
func New(opts ...ControlOpt) *pluginControl {
func New(opts ...PluginControlOpt) *pluginControl {

c := &pluginControl{}
c.Config = NewConfig()
Expand Down Expand Up @@ -313,11 +322,10 @@ func (p *pluginControl) verifySignature(rp *core.RequestedPlugin) (bool, serror.
if rp.Signature() == nil {
controlLogger.WithFields(f).Warn("Loading unsigned plugin ", rp.Path())
return false, nil
} else {
err := p.signingManager.ValidateSignature(p.keyringFiles, rp.Path(), rp.Signature())
if err != nil {
return false, serror.New(err)
}
}
err := p.signingManager.ValidateSignature(p.keyringFiles, rp.Path(), rp.Signature())
if err != nil {
return false, serror.New(err)
}
}
return true, nil
Expand Down Expand Up @@ -521,7 +529,7 @@ func (p *pluginControl) validateMetricTypeSubscription(mt core.RequestedMetric,

// No metric found return error.
if m == nil {
serrs = append(serrs, serror.New(errors.New(fmt.Sprintf("no metric found cannot subscribe: (%s) version(%d)", mt.Namespace(), mt.Version()))))
serrs = append(serrs, serror.New(fmt.Errorf("no metric found cannot subscribe: (%s) version(%d)", mt.Namespace(), mt.Version())))
return nil, serrs
}

Expand Down Expand Up @@ -603,7 +611,7 @@ func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]core.Plugin, []se
return plugins, nil
}

func (p *pluginControl) SubscribeDeps(taskId string, mts []core.Metric, plugins []core.Plugin) []serror.SnapError {
func (p *pluginControl) SubscribeDeps(taskID string, mts []core.Metric, plugins []core.Plugin) []serror.SnapError {
var serrs []serror.SnapError

collectors, errs := p.gatherCollectors(mts)
Expand All @@ -628,7 +636,7 @@ func (p *pluginControl) SubscribeDeps(taskId string, mts []core.Metric, plugins
serrs = append(serrs, serror.New(err))
return serrs
}
pool.subscribe(taskId, unboundSubscriptionType)
pool.subscribe(taskID, unboundSubscriptionType)
if pool.eligible() {
err = p.verifyPlugin(latest)
if err != nil {
Expand All @@ -647,7 +655,7 @@ func (p *pluginControl) SubscribeDeps(taskId string, mts []core.Metric, plugins
serrs = append(serrs, serror.New(err))
return serrs
}
pool.subscribe(taskId, boundSubscriptionType)
pool.subscribe(taskID, boundSubscriptionType)
if pool.eligible() {
pl, err := p.pluginManager.get(fmt.Sprintf("%s:%s:%d", sub.TypeName(), sub.Name(), sub.Version()))
if err != nil {
Expand All @@ -666,7 +674,7 @@ func (p *pluginControl) SubscribeDeps(taskId string, mts []core.Metric, plugins
}
}
}
serr := p.sendPluginSubscriptionEvent(taskId, sub)
serr := p.sendPluginSubscriptionEvent(taskID, sub)
if serr != nil {
serrs = append(serrs, serr)
}
Expand All @@ -690,13 +698,13 @@ func (p *pluginControl) verifyPlugin(lp *loadedPlugin) error {
return nil
}

func (p *pluginControl) sendPluginSubscriptionEvent(taskId string, pl core.Plugin) serror.SnapError {
func (p *pluginControl) sendPluginSubscriptionEvent(taskID string, pl core.Plugin) serror.SnapError {
pt, err := core.ToPluginType(pl.TypeName())
if err != nil {
return serror.New(err)
}
e := &control_event.PluginSubscriptionEvent{
TaskId: taskId,
TaskId: taskID,
PluginType: int(pt),
PluginName: pl.Name(),
PluginVersion: pl.Version(),
Expand All @@ -711,7 +719,7 @@ func (p *pluginControl) sendPluginSubscriptionEvent(taskId string, pl core.Plugi
return nil
}

func (p *pluginControl) UnsubscribeDeps(taskId string, mts []core.Metric, plugins []core.Plugin) []serror.SnapError {
func (p *pluginControl) UnsubscribeDeps(taskID string, mts []core.Metric, plugins []core.Plugin) []serror.SnapError {
var serrs []serror.SnapError

collectors, errs := p.gatherCollectors(mts)
Expand All @@ -727,9 +735,9 @@ func (p *pluginControl) UnsubscribeDeps(taskId string, mts []core.Metric, plugin
return serrs
}
if pool != nil {
pool.unsubscribe(taskId)
pool.unsubscribe(taskID)
}
serr := p.sendPluginUnsubscriptionEvent(taskId, sub)
serr := p.sendPluginUnsubscriptionEvent(taskID, sub)
if serr != nil {
serrs = append(serrs, serr)
}
Expand All @@ -738,13 +746,13 @@ func (p *pluginControl) UnsubscribeDeps(taskId string, mts []core.Metric, plugin
return serrs
}

func (p *pluginControl) sendPluginUnsubscriptionEvent(taskId string, pl core.Plugin) serror.SnapError {
func (p *pluginControl) sendPluginUnsubscriptionEvent(taskID string, pl core.Plugin) serror.SnapError {
pt, err := core.ToPluginType(pl.TypeName())
if err != nil {
return serror.New(err)
}
e := &control_event.PluginUnsubscriptionEvent{
TaskId: taskId,
TaskId: taskID,
PluginType: int(pt),
PluginName: pl.Name(),
PluginVersion: pl.Version(),
Expand Down
Loading

0 comments on commit 3d6392f

Please sign in to comment.