Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Exposes global plugin config through REST
Browse files Browse the repository at this point in the history
  • Loading branch information
jcooklin committed Oct 17, 2015
1 parent 32f3301 commit 6d82687
Show file tree
Hide file tree
Showing 21 changed files with 683 additions and 1,999 deletions.
231 changes: 199 additions & 32 deletions control/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,50 +7,90 @@ import (
"reflect"
"strconv"

"github.com/intelsdi-x/pulse/control/plugin"
log "github.com/Sirupsen/logrus"

"github.com/intelsdi-x/pulse/core"
"github.com/intelsdi-x/pulse/core/cdata"
"github.com/intelsdi-x/pulse/core/ctypes"
)

// type configData map[string]*cdata.ConfigDataNode

type pluginConfig struct {
All *cdata.ConfigDataNode `json:"all"`
Collector map[string]*pluginConfigItem `json:"collector"`
Publisher map[string]*pluginConfigItem `json:"publisher"`
Processor map[string]*pluginConfigItem `json:"processor"`
All *cdata.ConfigDataNode `json:"all"`
Collector *pluginTypeConfigItem `json:"collector"`
Publisher *pluginTypeConfigItem `json:"publisher"`
Processor *pluginTypeConfigItem `json:"processor"`
pluginCache map[string]*cdata.ConfigDataNode
}

type pluginTypeConfigItem struct {
Plugins map[string]*pluginConfigItem
All *cdata.ConfigDataNode `json:"all"`
}

type pluginConfigItem struct {
*cdata.ConfigDataNode
Versions map[int]*cdata.ConfigDataNode `json:"versions"`
}

type config struct {
Control *cdata.ConfigDataNode `json:"control"`
Scheduler *cdata.ConfigDataNode `json:"scheduler"`
Plugins *pluginConfig `json:"plugins"`
Plugins *pluginConfig `json:"plugins"`
}

func NewConfig() *config {
return &config{
Control: cdata.NewNode(),
Scheduler: cdata.NewNode(),
Plugins: newPluginConfig(),
Plugins: newPluginConfig(),
}
}

func newPluginTypeConfigItem() *pluginTypeConfigItem {
return &pluginTypeConfigItem{
make(map[string]*pluginConfigItem),
cdata.NewNode(),
}
}

func newPluginConfig() *pluginConfig {
return &pluginConfig{
All: cdata.NewNode(),
Collector: make(map[string]*pluginConfigItem),
Processor: make(map[string]*pluginConfigItem),
Publisher: make(map[string]*pluginConfigItem),
Collector: newPluginTypeConfigItem(),
Processor: newPluginTypeConfigItem(),
Publisher: newPluginTypeConfigItem(),
pluginCache: make(map[string]*cdata.ConfigDataNode),
}
}

func (p *config) GetPluginConfigDataNode(pluginType core.PluginType, name string, ver int) cdata.ConfigDataNode {
return *p.Plugins.getPluginConfigDataNode(pluginType, name, ver)
}

func (p *config) MergePluginConfigDataNode(pluginType core.PluginType, name string, ver int, cdn *cdata.ConfigDataNode) cdata.ConfigDataNode {
p.Plugins.mergePluginConfigDataNode(pluginType, name, ver, cdn)
return *p.Plugins.getPluginConfigDataNode(pluginType, name, ver)
}

func (p *config) MergePluginConfigDataNodeAll(cdn *cdata.ConfigDataNode) cdata.ConfigDataNode {
p.Plugins.mergePluginConfigDataNodeAll(cdn)
return *p.Plugins.All
}

func (p *config) DeletePluginConfigDataNodeField(pluginType core.PluginType, name string, ver int, fields ...string) cdata.ConfigDataNode {
for _, field := range fields {
p.Plugins.deletePluginConfigDataNodeField(pluginType, name, ver, field)
}
return *p.Plugins.getPluginConfigDataNode(pluginType, name, ver)
}

func (p *config) DeletePluginConfigDataNodeFieldAll(fields ...string) cdata.ConfigDataNode {
for _, field := range fields {
p.Plugins.deletePluginConfigDataNodeFieldAll(field)
}
return *p.Plugins.All
}

func (p *config) GetPluginConfigDataNodeAll() cdata.ConfigDataNode {
return *p.Plugins.All
}

// UnmarshalJSON unmarshals valid json into pluginConfig. An example Config
// github.com/intelsdi-x/pulse/examples/configs/pulse-config-sample.
func (p *pluginConfig) UnmarshalJSON(data []byte) error {
Expand All @@ -61,6 +101,7 @@ func (p *pluginConfig) UnmarshalJSON(data []byte) error {
return err
}

//process the key value pairs for ALL plugins
if v, ok := t["all"]; ok {
jv, err := json.Marshal(v)
if err != nil {
Expand All @@ -75,6 +116,7 @@ func (p *pluginConfig) UnmarshalJSON(data []byte) error {
p.All = cdn
}

//process the hierarchy of plugins
for _, typ := range []string{"collector", "processor", "publisher"} {
if err := unmarshalPluginConfig(typ, p, t); err != nil {
return err
Expand Down Expand Up @@ -105,7 +147,100 @@ func optAddPluginConfigItem(key string, value ctypes.ConfigValue) pluginConfigOp
}
}

func (p *pluginConfig) get(pluginType plugin.PluginType, name string, ver int) *cdata.ConfigDataNode {
func (p *pluginConfig) mergePluginConfigDataNodeAll(cdn *cdata.ConfigDataNode) {
// clear cache
p.pluginCache = make(map[string]*cdata.ConfigDataNode)

p.All.Merge(cdn)
return
}

func (p *pluginConfig) deletePluginConfigDataNodeFieldAll(key string) {
// clear cache
p.pluginCache = make(map[string]*cdata.ConfigDataNode)

p.All.DeleteItem(key)
return
}

func (p *pluginConfig) mergePluginConfigDataNode(pluginType core.PluginType, name string, ver int, cdn *cdata.ConfigDataNode) {
// clear cache
p.pluginCache = make(map[string]*cdata.ConfigDataNode)

// merge new config into existing
switch pluginType {
case core.CollectorPluginType:
if res, ok := p.Collector.Plugins[name]; ok {
if res2, ok2 := res.Versions[ver]; ok2 {
res2.Merge(cdn)
return
}
res.Merge(cdn)
return
}
p.Collector.All.Merge(cdn)
case core.ProcessorPluginType:
if res, ok := p.Processor.Plugins[name]; ok {
if res2, ok2 := res.Versions[ver]; ok2 {
res2.Merge(cdn)
return
}
res.Merge(cdn)
return
}
p.Processor.All.Merge(cdn)
case core.PublisherPluginType:
if res, ok := p.Publisher.Plugins[name]; ok {
if res2, ok2 := res.Versions[ver]; ok2 {
res2.Merge(cdn)
return
}
res.Merge(cdn)
return
}
p.Publisher.All.Merge(cdn)
}
}

func (p *pluginConfig) deletePluginConfigDataNodeField(pluginType core.PluginType, name string, ver int, key string) {
// clear cache
p.pluginCache = make(map[string]*cdata.ConfigDataNode)

switch pluginType {
case core.CollectorPluginType:
if res, ok := p.Collector.Plugins[name]; ok {
if res2, ok2 := res.Versions[ver]; ok2 {
res2.DeleteItem(key)
return
}
res.DeleteItem(key)
return
}
p.Collector.All.DeleteItem(key)
case core.ProcessorPluginType:
if res, ok := p.Processor.Plugins[name]; ok {
if res2, ok2 := res.Versions[ver]; ok2 {
res2.DeleteItem(key)
return
}
res.DeleteItem(key)
return
}
p.Processor.All.DeleteItem(key)
case core.PublisherPluginType:
if res, ok := p.Publisher.Plugins[name]; ok {
if res2, ok2 := res.Versions[ver]; ok2 {
res2.DeleteItem(key)
return
}
res.DeleteItem(key)
return
}
p.Publisher.All.DeleteItem(key)
}
}

func (p *pluginConfig) getPluginConfigDataNode(pluginType core.PluginType, name string, ver int) *cdata.ConfigDataNode {
// check cache
key := fmt.Sprintf("%d:%s:%d", pluginType, name, ver)
if res, ok := p.pluginCache[key]; ok {
Expand All @@ -119,29 +254,40 @@ func (p *pluginConfig) get(pluginType plugin.PluginType, name string, ver int) *

// check for plugin config
switch pluginType {
case plugin.CollectorPluginType:
if res, ok := p.Collector[name]; ok {
case core.CollectorPluginType:
p.pluginCache[key].Merge(p.Collector.All)
if res, ok := p.Collector.Plugins[name]; ok {
p.pluginCache[key].Merge(res.ConfigDataNode)
if res2, ok2 := res.Versions[ver]; ok2 {
p.pluginCache[key].Merge(res2)
}
}
case plugin.ProcessorPluginType:
if res, ok := p.Processor[name]; ok {
case core.ProcessorPluginType:
p.pluginCache[key].Merge(p.Processor.All)
if res, ok := p.Processor.Plugins[name]; ok {
p.pluginCache[key].Merge(res.ConfigDataNode)
if res2, ok2 := res.Versions[ver]; ok2 {
p.pluginCache[key].Merge(res2)
}
}
case plugin.PublisherPluginType:
if res, ok := p.Publisher[name]; ok {
case core.PublisherPluginType:
p.pluginCache[key].Merge(p.Publisher.All)
if res, ok := p.Publisher.Plugins[name]; ok {
p.pluginCache[key].Merge(res.ConfigDataNode)
if res2, ok2 := res.Versions[ver]; ok2 {
p.pluginCache[key].Merge(res2)
}
}
}

//todo change to debug
log.WithFields(log.Fields{
"_block_": "getPluginConfigDataNode",
"_module": "config",
"config-cache-key": key,
"config-cache-value": p.pluginCache[key],
}).Debug("Getting plugin config")

return p.pluginCache[key]
}

Expand All @@ -150,13 +296,34 @@ func unmarshalPluginConfig(typ string, p *pluginConfig, t map[string]interface{}
switch plugins := v.(type) {
case map[string]interface{}:
for name, c := range plugins {
if name == "all" {
jv, err := json.Marshal(c)
if err != nil {
return err
}
cdn := cdata.NewNode()
dec := json.NewDecoder(bytes.NewReader(jv))
dec.UseNumber()
if err := dec.Decode(&cdn); err != nil {
return err
}
switch typ {
case "collector":
p.Collector.All = cdn
case "processor":
p.Processor.All = cdn
case "publisher":
p.Publisher.All = cdn
}
continue
}
switch typ {
case "collector":
p.Collector[name] = newPluginConfigItem()
p.Collector.Plugins[name] = newPluginConfigItem()
case "processor":
p.Processor[name] = newPluginConfigItem()
p.Processor.Plugins[name] = newPluginConfigItem()
case "publisher":
p.Publisher[name] = newPluginConfigItem()
p.Publisher.Plugins[name] = newPluginConfigItem()
}
switch col := c.(type) {
case map[string]interface{}:
Expand All @@ -173,11 +340,11 @@ func unmarshalPluginConfig(typ string, p *pluginConfig, t map[string]interface{}
}
switch typ {
case "collector":
p.Collector[name].ConfigDataNode = cdn
p.Collector.Plugins[name].ConfigDataNode = cdn
case "processor":
p.Processor[name].ConfigDataNode = cdn
p.Processor.Plugins[name].ConfigDataNode = cdn
case "publisher":
p.Publisher[name].ConfigDataNode = cdn
p.Publisher.Plugins[name].ConfigDataNode = cdn
}
}
if vs, ok := col["versions"]; ok {
Expand All @@ -202,11 +369,11 @@ func unmarshalPluginConfig(typ string, p *pluginConfig, t map[string]interface{}
}
switch typ {
case "collector":
p.Collector[name].Versions[ver] = cdn
p.Collector.Plugins[name].Versions[ver] = cdn
case "processor":
p.Processor[name].Versions[ver] = cdn
p.Processor.Plugins[name].Versions[ver] = cdn
case "publisher":
p.Publisher[name].Versions[ver] = cdn
p.Publisher.Plugins[name].Versions[ver] = cdn
}
default:
return fmt.Errorf("Error unmarshalling %v'%v' expected '%v' got '%v'", typ, name, map[string]interface{}{}, reflect.TypeOf(v))
Expand Down
Loading

0 comments on commit 6d82687

Please sign in to comment.