Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Adds ability to pass a config to pulsed on startup
Browse files Browse the repository at this point in the history
The config enables plugins to get a config on load.
  • Loading branch information
jcooklin committed Oct 17, 2015
1 parent b6ad0d0 commit 32f3301
Show file tree
Hide file tree
Showing 22 changed files with 440 additions and 126 deletions.
168 changes: 144 additions & 24 deletions control/config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package control

import (
"bytes"
"encoding/json"
"fmt"
"reflect"
"strconv"

"github.com/intelsdi-x/pulse/control/plugin"
"github.com/intelsdi-x/pulse/core/cdata"
Expand All @@ -11,46 +15,79 @@ import (
// type configData map[string]*cdata.ConfigDataNode

type pluginConfig struct {
all *cdata.ConfigDataNode
collector map[string]*pluginConfigItem
publisher map[string]*pluginConfigItem
processor map[string]*pluginConfigItem
All *cdata.ConfigDataNode `json:"all"`
Collector map[string]*pluginConfigItem `json:"collector"`
Publisher map[string]*pluginConfigItem `json:"publisher"`
Processor map[string]*pluginConfigItem `json:"processor"`
pluginCache map[string]*cdata.ConfigDataNode
}

type pluginConfigItem struct {
*cdata.ConfigDataNode
versions map[int]*cdata.ConfigDataNode
Versions map[int]*cdata.ConfigDataNode `json:"versions"`
}

type config struct {
control *cdata.ConfigDataNode
scheduler *cdata.ConfigDataNode
plugins *pluginConfig
Control *cdata.ConfigDataNode `json:"control"`
Scheduler *cdata.ConfigDataNode `json:"scheduler"`
Plugins *pluginConfig `json:"plugins"`
}

func newConfig() *config {
func NewConfig() *config {
return &config{
control: cdata.NewNode(),
scheduler: cdata.NewNode(),
plugins: newPluginConfig(),
Control: cdata.NewNode(),
Scheduler: cdata.NewNode(),
Plugins: newPluginConfig(),
}
}

func newPluginConfig() *pluginConfig {
return &pluginConfig{
all: cdata.NewNode(),
collector: make(map[string]*pluginConfigItem),
processor: make(map[string]*pluginConfigItem),
publisher: make(map[string]*pluginConfigItem),
All: cdata.NewNode(),
Collector: make(map[string]*pluginConfigItem),
Processor: make(map[string]*pluginConfigItem),
Publisher: make(map[string]*pluginConfigItem),
pluginCache: make(map[string]*cdata.ConfigDataNode),
}
}

// UnmarshalJSON unmarshals valid json into pluginConfig. An example Config
// github.com/intelsdi-x/pulse/examples/configs/pulse-config-sample.
func (p *pluginConfig) UnmarshalJSON(data []byte) error {
t := map[string]interface{}{}
dec := json.NewDecoder(bytes.NewReader(data))
dec.UseNumber()
if err := dec.Decode(&t); err != nil {
return err
}

if v, ok := t["all"]; ok {
jv, err := json.Marshal(v)
if err != nil {
return err
}
cdn := &cdata.ConfigDataNode{}
dec = json.NewDecoder(bytes.NewReader(jv))
dec.UseNumber()
if err := dec.Decode(&cdn); err != nil {
return err
}
p.All = cdn
}

for _, typ := range []string{"collector", "processor", "publisher"} {
if err := unmarshalPluginConfig(typ, p, t); err != nil {
return err
}
}

return nil
}

func newPluginConfigItem(opts ...pluginConfigOpt) *pluginConfigItem {
p := &pluginConfigItem{
ConfigDataNode: cdata.NewNode(),
versions: make(map[int]*cdata.ConfigDataNode),
Versions: make(map[int]*cdata.ConfigDataNode),
}

for _, opt := range opts {
Expand Down Expand Up @@ -78,32 +115,115 @@ func (p *pluginConfig) get(pluginType plugin.PluginType, name string, ver int) *
//todo process/interpolate values

p.pluginCache[key] = cdata.NewNode()
p.pluginCache[key].Merge(p.all)
p.pluginCache[key].Merge(p.All)

// check for plugin config
switch pluginType {
case plugin.CollectorPluginType:
if res, ok := p.collector[name]; ok {
if res, ok := p.Collector[name]; ok {
p.pluginCache[key].Merge(res.ConfigDataNode)
if res2, ok2 := res.versions[ver]; ok2 {
if res2, ok2 := res.Versions[ver]; ok2 {
p.pluginCache[key].Merge(res2)
}
}
case plugin.ProcessorPluginType:
if res, ok := p.processor[name]; ok {
if res, ok := p.Processor[name]; ok {
p.pluginCache[key].Merge(res.ConfigDataNode)
if res2, ok2 := res.versions[ver]; ok2 {
if res2, ok2 := res.Versions[ver]; ok2 {
p.pluginCache[key].Merge(res2)
}
}
case plugin.PublisherPluginType:
if res, ok := p.publisher[name]; ok {
if res, ok := p.Publisher[name]; ok {
p.pluginCache[key].Merge(res.ConfigDataNode)
if res2, ok2 := res.versions[ver]; ok2 {
if res2, ok2 := res.Versions[ver]; ok2 {
p.pluginCache[key].Merge(res2)
}
}
}

return p.pluginCache[key]
}

func unmarshalPluginConfig(typ string, p *pluginConfig, t map[string]interface{}) error {
if v, ok := t[typ]; ok {
switch plugins := v.(type) {
case map[string]interface{}:
for name, c := range plugins {
switch typ {
case "collector":
p.Collector[name] = newPluginConfigItem()
case "processor":
p.Processor[name] = newPluginConfigItem()
case "publisher":
p.Publisher[name] = newPluginConfigItem()
}
switch col := c.(type) {
case map[string]interface{}:
if v, ok := col["all"]; ok {
jv, err := json.Marshal(v)
if err != nil {
return err
}
cdn := cdata.NewNode()
dec := json.NewDecoder(bytes.NewReader(jv))
dec.UseNumber()
if err := dec.Decode(&cdn); err != nil {
return err
}
switch typ {
case "collector":
p.Collector[name].ConfigDataNode = cdn
case "processor":
p.Processor[name].ConfigDataNode = cdn
case "publisher":
p.Publisher[name].ConfigDataNode = cdn
}
}
if vs, ok := col["versions"]; ok {
switch versions := vs.(type) {
case map[string]interface{}:
for ver, version := range versions {
switch v := version.(type) {
case map[string]interface{}:
jv, err := json.Marshal(v)
if err != nil {
return err
}
cdn := cdata.NewNode()
dec := json.NewDecoder(bytes.NewReader(jv))
dec.UseNumber()
if err := dec.Decode(&cdn); err != nil {
return err
}
ver, err := strconv.Atoi(ver)
if err != nil {
return err
}
switch typ {
case "collector":
p.Collector[name].Versions[ver] = cdn
case "processor":
p.Processor[name].Versions[ver] = cdn
case "publisher":
p.Publisher[name].Versions[ver] = cdn
}
default:
return fmt.Errorf("Error unmarshalling %v'%v' expected '%v' got '%v'", typ, name, map[string]interface{}{}, reflect.TypeOf(v))
}
}

default:
return fmt.Errorf("Error unmarshalling %v '%v' expected '%v' got '%v'", typ, name, map[string]interface{}{}, reflect.TypeOf(versions))
}
}
default:
return fmt.Errorf("Error unmarshalling %v '%v' expected '%v' got '%v'", typ, name, map[string]interface{}{}, reflect.TypeOf(col))
}
}
default:
return fmt.Errorf("Error unmarshalling %v expected '%v' got '%v'", typ, map[string]interface{}{}, reflect.TypeOf(plugins))
}
}
return nil
}
61 changes: 52 additions & 9 deletions control/config_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package control

import (
"encoding/json"
"io/ioutil"
"testing"

"github.com/intelsdi-x/pulse/control/plugin"
Expand All @@ -11,25 +13,66 @@ import (

func TestPluginConfig(t *testing.T) {
Convey("Given a plugin config", t, func() {
cfg := newConfig()
cfg := NewConfig()
So(cfg, ShouldNotBeNil)
Convey("with an entry for ALL plugins", func() {
cfg.plugins.all.AddItem("gvar", ctypes.ConfigValueBool{Value: true})
So(len(cfg.plugins.all.Table()), ShouldEqual, 1)
cfg.Plugins.All.AddItem("gvar", ctypes.ConfigValueBool{Value: true})
So(len(cfg.Plugins.All.Table()), ShouldEqual, 1)
Convey("an entry for a specific plugin of any version", func() {
cfg.plugins.collector["test"] = newPluginConfigItem(optAddPluginConfigItem("pvar", ctypes.ConfigValueBool{Value: true}))
So(len(cfg.plugins.collector["test"].Table()), ShouldEqual, 1)
cfg.Plugins.Collector["test"] = newPluginConfigItem(optAddPluginConfigItem("pvar", ctypes.ConfigValueBool{Value: true}))
So(len(cfg.Plugins.Collector["test"].Table()), ShouldEqual, 1)
Convey("and an entry for a specific plugin of a specific version", func() {
cfg.plugins.collector["test"].versions[1] = cdata.NewNode()
cfg.plugins.collector["test"].versions[1].AddItem("vvar", ctypes.ConfigValueBool{Value: true})
So(len(cfg.plugins.collector["test"].versions[1].Table()), ShouldEqual, 1)
cfg.Plugins.Collector["test"].Versions[1] = cdata.NewNode()
cfg.Plugins.Collector["test"].Versions[1].AddItem("vvar", ctypes.ConfigValueBool{Value: true})
So(len(cfg.Plugins.Collector["test"].Versions[1].Table()), ShouldEqual, 1)
Convey("we can get the merged conf for the given plugin", func() {
cd := cfg.plugins.get(plugin.CollectorPluginType, "test", 1)
cd := cfg.Plugins.get(plugin.CollectorPluginType, "test", 1)
So(len(cd.Table()), ShouldEqual, 3)
})
})
})
})
})

Convey("Provided a config in json", t, func() {
cfg := NewConfig()
b, err := ioutil.ReadFile("../examples/configs/pulse-config-sample.json")
So(b, ShouldNotBeEmpty)
So(b, ShouldNotBeNil)
So(err, ShouldBeNil)
Convey("We are able to unmarshal it into a valid config", func() {
err = json.Unmarshal(b, &cfg)
So(err, ShouldBeNil)
So(cfg.Control, ShouldNotBeNil)
So(cfg.Control.Table()["cache_ttl"], ShouldResemble, ctypes.ConfigValueStr{Value: "5s"})
So(cfg.Plugins.All.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "p@ssw0rd"})
So(cfg.Plugins.Collector["pcm"], ShouldNotBeNil)
So(cfg.Plugins.Collector["pcm"].Table()["path"], ShouldResemble, ctypes.ConfigValueStr{Value: "/usr/local/pcm/bin"})
So(cfg.Plugins, ShouldNotBeNil)
So(cfg.Plugins.All, ShouldNotBeNil)
So(cfg.Plugins.Collector["pcm"].Versions[1].Table()["user"], ShouldResemble, ctypes.ConfigValueStr{Value: "john"})
So(cfg.Plugins.Processor["movingaverage"].Table()["user"], ShouldResemble, ctypes.ConfigValueStr{Value: "jane"})

Convey("Through the config object we can access the stored configurations for plugins", func() {
c := cfg.Plugins.get(plugin.ProcessorPluginType, "movingaverage", 0)
So(c, ShouldNotBeNil)
So(c.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "p@ssw0rd"})
Convey("Overwritting the value of a variable defined for all plugins", func() {
c := cfg.Plugins.get(plugin.ProcessorPluginType, "movingaverage", 1)
So(c, ShouldNotBeNil)
So(c.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "new password"})
})
Convey("Retrieving the value of a variable defined for all plugins", func() {
c := cfg.Plugins.get(plugin.CollectorPluginType, "pcm", 0)
So(c, ShouldNotBeNil)
So(c.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "p@ssw0rd"})
})
Convey("Overwritting the value of a variable defined for all versions of the plugin", func() {
c := cfg.Plugins.get(plugin.ProcessorPluginType, "movingaverage", 1)
So(c, ShouldNotBeNil)
So(c.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "new password"})
})
})
})
})
}
24 changes: 15 additions & 9 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,25 +112,31 @@ type managesSigning interface {
ValidateSignature(keyringFile string, signedFile string, signatureFile string) perror.PulseError
}

type controlOpt func(*pluginControl)
type ControlOpt func(*pluginControl)

func MaxRunningPlugins(m int) controlOpt {
func MaxRunningPlugins(m int) ControlOpt {
return func(c *pluginControl) {
maximumRunningPlugins = m
}
}

func CacheExpiration(t time.Duration) controlOpt {
func CacheExpiration(t time.Duration) ControlOpt {
return func(c *pluginControl) {
client.GlobalCacheExpiration = t
}
}

func OptSetConfig(c *config) ControlOpt {
return func(p *pluginControl) {
p.config = c
}
}

// New returns a new pluginControl instance
func New(opts ...controlOpt) *pluginControl {
func New(opts ...ControlOpt) *pluginControl {

c := &pluginControl{}
c.config = newConfig()
c.config = NewConfig()
// Initialize components
//
// Event Manager
Expand All @@ -147,7 +153,7 @@ func New(opts ...controlOpt) *pluginControl {
}).Debug("metric catalog created")

// Plugin Manager
c.pluginManager = newPluginManager(OptSetPluginConfig(c.config.plugins))
c.pluginManager = newPluginManager(OptSetPluginConfig(c.config.Plugins))
controlLogger.WithFields(log.Fields{
"_block": "new",
}).Debug("plugin manager created")
Expand Down Expand Up @@ -684,7 +690,7 @@ func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time.

// merge global plugin config into the config for the metric
for _, mt := range pmt.metricTypes {
mt.Config().Merge(p.config.plugins.get(plugin.CollectorPluginType, ap.Name(), ap.Version()))
mt.Config().Merge(p.config.Plugins.get(plugin.CollectorPluginType, ap.Name(), ap.Version()))
}

// get a metrics
Expand Down Expand Up @@ -755,7 +761,7 @@ func (p *pluginControl) PublishMetrics(contentType string, content []byte, plugi
}

// merge global plugin config into the config for this request
cfg := p.config.plugins.get(plugin.PublisherPluginType, ap.Name(), ap.Version()).Table()
cfg := p.config.Plugins.get(plugin.PublisherPluginType, ap.Name(), ap.Version()).Table()
for k, v := range config {
cfg[k] = v
}
Expand Down Expand Up @@ -797,7 +803,7 @@ func (p *pluginControl) ProcessMetrics(contentType string, content []byte, plugi
}

// merge global plugin config into the config for this request
cfg := p.config.plugins.get(plugin.ProcessorPluginType, ap.Name(), ap.Version()).Table()
cfg := p.config.Plugins.get(plugin.ProcessorPluginType, ap.Name(), ap.Version()).Table()

for k, v := range config {
cfg[k] = v
Expand Down
Loading

0 comments on commit 32f3301

Please sign in to comment.