Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Adds global config
Browse files Browse the repository at this point in the history
closes #386
JRA-313 #closes
JRA-314 #closes
  • Loading branch information
jcooklin committed Oct 17, 2015
1 parent e38e852 commit b6ad0d0
Show file tree
Hide file tree
Showing 26 changed files with 1,559 additions and 26 deletions.
109 changes: 109 additions & 0 deletions control/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package control

import (
"fmt"

"github.com/intelsdi-x/pulse/control/plugin"
"github.com/intelsdi-x/pulse/core/cdata"
"github.com/intelsdi-x/pulse/core/ctypes"
)

// type configData map[string]*cdata.ConfigDataNode

type pluginConfig struct {
all *cdata.ConfigDataNode
collector map[string]*pluginConfigItem
publisher map[string]*pluginConfigItem
processor map[string]*pluginConfigItem
pluginCache map[string]*cdata.ConfigDataNode
}

type pluginConfigItem struct {
*cdata.ConfigDataNode
versions map[int]*cdata.ConfigDataNode
}

type config struct {
control *cdata.ConfigDataNode
scheduler *cdata.ConfigDataNode
plugins *pluginConfig
}

func newConfig() *config {
return &config{
control: cdata.NewNode(),
scheduler: cdata.NewNode(),
plugins: newPluginConfig(),
}
}

func newPluginConfig() *pluginConfig {
return &pluginConfig{
all: cdata.NewNode(),
collector: make(map[string]*pluginConfigItem),
processor: make(map[string]*pluginConfigItem),
publisher: make(map[string]*pluginConfigItem),
pluginCache: make(map[string]*cdata.ConfigDataNode),
}
}

func newPluginConfigItem(opts ...pluginConfigOpt) *pluginConfigItem {
p := &pluginConfigItem{
ConfigDataNode: cdata.NewNode(),
versions: make(map[int]*cdata.ConfigDataNode),
}

for _, opt := range opts {
opt(p)
}

return p
}

type pluginConfigOpt func(*pluginConfigItem)

func optAddPluginConfigItem(key string, value ctypes.ConfigValue) pluginConfigOpt {
return func(p *pluginConfigItem) {
p.AddItem(key, value)
}
}

func (p *pluginConfig) get(pluginType plugin.PluginType, name string, ver int) *cdata.ConfigDataNode {
// check cache
key := fmt.Sprintf("%d:%s:%d", pluginType, name, ver)
if res, ok := p.pluginCache[key]; ok {
return res
}

//todo process/interpolate values

p.pluginCache[key] = cdata.NewNode()
p.pluginCache[key].Merge(p.all)

// check for plugin config
switch pluginType {
case plugin.CollectorPluginType:
if res, ok := p.collector[name]; ok {
p.pluginCache[key].Merge(res.ConfigDataNode)
if res2, ok2 := res.versions[ver]; ok2 {
p.pluginCache[key].Merge(res2)
}
}
case plugin.ProcessorPluginType:
if res, ok := p.processor[name]; ok {
p.pluginCache[key].Merge(res.ConfigDataNode)
if res2, ok2 := res.versions[ver]; ok2 {
p.pluginCache[key].Merge(res2)
}
}
case plugin.PublisherPluginType:
if res, ok := p.publisher[name]; ok {
p.pluginCache[key].Merge(res.ConfigDataNode)
if res2, ok2 := res.versions[ver]; ok2 {
p.pluginCache[key].Merge(res2)
}
}
}

return p.pluginCache[key]
}
35 changes: 35 additions & 0 deletions control/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package control

import (
"testing"

"github.com/intelsdi-x/pulse/control/plugin"
"github.com/intelsdi-x/pulse/core/cdata"
"github.com/intelsdi-x/pulse/core/ctypes"
. "github.com/smartystreets/goconvey/convey"
)

func TestPluginConfig(t *testing.T) {
Convey("Given a plugin config", t, func() {
cfg := newConfig()
So(cfg, ShouldNotBeNil)
Convey("with an entry for ALL plugins", func() {
cfg.plugins.all.AddItem("gvar", ctypes.ConfigValueBool{Value: true})
So(len(cfg.plugins.all.Table()), ShouldEqual, 1)
Convey("an entry for a specific plugin of any version", func() {
cfg.plugins.collector["test"] = newPluginConfigItem(optAddPluginConfigItem("pvar", ctypes.ConfigValueBool{Value: true}))
So(len(cfg.plugins.collector["test"].Table()), ShouldEqual, 1)
Convey("and an entry for a specific plugin of a specific version", func() {
cfg.plugins.collector["test"].versions[1] = cdata.NewNode()
cfg.plugins.collector["test"].versions[1].AddItem("vvar", ctypes.ConfigValueBool{Value: true})
So(len(cfg.plugins.collector["test"].versions[1].Table()), ShouldEqual, 1)
Convey("we can get the merged conf for the given plugin", func() {
cd := cfg.plugins.get(plugin.CollectorPluginType, "test", 1)
So(len(cd.Table()), ShouldEqual, 3)
})
})
})
})

})
}
26 changes: 23 additions & 3 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type pluginControl struct {
// TODO, going to need coordination on changing of these
RunningPlugins executablePlugins
Started bool
config *config

autodiscoverPaths []string
eventManager *gomit.EventController
Expand Down Expand Up @@ -129,6 +130,7 @@ func CacheExpiration(t time.Duration) controlOpt {
func New(opts ...controlOpt) *pluginControl {

c := &pluginControl{}
c.config = newConfig()
// Initialize components
//
// Event Manager
Expand All @@ -145,7 +147,7 @@ func New(opts ...controlOpt) *pluginControl {
}).Debug("metric catalog created")

// Plugin Manager
c.pluginManager = newPluginManager()
c.pluginManager = newPluginManager(OptSetPluginConfig(c.config.plugins))
controlLogger.WithFields(log.Fields{
"_block": "new",
}).Debug("plugin manager created")
Expand Down Expand Up @@ -680,6 +682,11 @@ func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time.

wg.Add(1)

// merge global plugin config into the config for the metric
for _, mt := range pmt.metricTypes {
mt.Config().Merge(p.config.plugins.get(plugin.CollectorPluginType, ap.Name(), ap.Version()))
}

// get a metrics
go func(mt []core.Metric) {
mts, err := cli.CollectMetrics(mt)
Expand Down Expand Up @@ -747,7 +754,13 @@ func (p *pluginControl) PublishMetrics(contentType string, content []byte, plugi
return []error{errors.New("unable to cast client to PluginPublisherClient")}
}

errp := cli.Publish(contentType, content, config)
// merge global plugin config into the config for this request
cfg := p.config.plugins.get(plugin.PublisherPluginType, ap.Name(), ap.Version()).Table()
for k, v := range config {
cfg[k] = v
}

errp := cli.Publish(contentType, content, cfg)
if errp != nil {
return []error{errp}
}
Expand Down Expand Up @@ -783,7 +796,14 @@ func (p *pluginControl) ProcessMetrics(contentType string, content []byte, plugi
return "", nil, []error{errors.New("unable to cast client to PluginProcessorClient")}
}

ct, c, errp := cli.Process(contentType, content, config)
// merge global plugin config into the config for this request
cfg := p.config.plugins.get(plugin.ProcessorPluginType, ap.Name(), ap.Version()).Table()

for k, v := range config {
cfg[k] = v
}

ct, c, errp := cli.Process(contentType, content, cfg)
if errp != nil {
return "", nil, []error{errp}
}
Expand Down
25 changes: 22 additions & 3 deletions control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func (m *MockPluginManagerBadSwap) UnloadPlugin(c core.Plugin) (*loadedPlugin, p
}
func (m *MockPluginManagerBadSwap) get(string) (*loadedPlugin, error) { return nil, nil }
func (m *MockPluginManagerBadSwap) teardown() {}
func (m *MockPluginManagerBadSwap) setPluginConfig(*pluginConfig) {}
func (m *MockPluginManagerBadSwap) SetMetricCatalog(catalogsMetrics) {}
func (m *MockPluginManagerBadSwap) SetEmitter(gomit.Emitter) {}
func (m *MockPluginManagerBadSwap) GenerateArgs(string) plugin.Arg { return plugin.Arg{} }
Expand Down Expand Up @@ -686,8 +687,15 @@ func TestCollectMetrics(t *testing.T) {
c.pluginRunner.(*runner).monitor.duration = time.Millisecond * 100
c.Start()
time.Sleep(100 * time.Millisecond)

// Add a global plugin config
c.config.plugins.collector["dummy1"] = newPluginConfigItem(optAddPluginConfigItem("test", ctypes.ConfigValueBool{Value: true}))

// Load plugin
c.Load(PluginPath)
mts, err := c.MetricCatalog()
So(err, ShouldBeNil)
So(len(mts), ShouldEqual, 3)

cd := cdata.NewNode()
cd.AddItem("password", ctypes.ConfigValueStr{Value: "testval"})
Expand All @@ -701,6 +709,10 @@ func TestCollectMetrics(t *testing.T) {
namespace: []string{"intel", "dummy", "bar"},
cfg: cd,
}
m3 := MockMetricType{
namespace: []string{"intel", "dummy", "test"},
cfg: cd,
}

// retrieve loaded plugin
lp, err := c.pluginManager.get("collector:dummy1:1")
Expand All @@ -711,7 +723,7 @@ func TestCollectMetrics(t *testing.T) {
pool.subscribe("1", unboundSubscriptionType)
err = c.sendPluginSubscriptionEvent("1", lp)
So(err, ShouldBeNil)
m = append(m, m1, m2)
m = append(m, m1, m2, m3)

time.Sleep(time.Millisecond * 900)

Expand All @@ -720,6 +732,7 @@ func TestCollectMetrics(t *testing.T) {
So(err, ShouldBeNil)
for i := range cr {
So(cr[i].Data(), ShouldContainSubstring, "The dummy collected data!")
So(cr[i].Data(), ShouldContainSubstring, "test=true")
}
// fmt.Printf(" * Collect Response: %+v\n", cr)
}
Expand Down Expand Up @@ -838,6 +851,7 @@ func TestProcessMetrics(t *testing.T) {
c.pluginRunner.(*runner).monitor.duration = time.Millisecond * 100
c.Start()
time.Sleep(1 * time.Second)
c.config.plugins.processor["passthru"] = newPluginConfigItem(optAddPluginConfigItem("test", ctypes.ConfigValueBool{Value: true}))

// Load plugin
_, err := c.Load(path.Join(PulsePath, "plugin", "pulse-processor-passthru"))
Expand Down Expand Up @@ -873,8 +887,13 @@ func TestProcessMetrics(t *testing.T) {
enc := gob.NewEncoder(&buf)
enc.Encode(metrics)
contentType := plugin.PulseGOBContentType
cnt, ct, errs := c.ProcessMetrics(contentType, buf.Bytes(), "passthru", 1, n.Table())
fmt.Printf("%v %v", cnt, ct)
_, ct, errs := c.ProcessMetrics(contentType, buf.Bytes(), "passthru", 1, n.Table())
So(errs, ShouldBeEmpty)
mts := []plugin.PluginMetricType{}
dec := gob.NewDecoder(bytes.NewBuffer(ct))
err := dec.Decode(&mts)
So(err, ShouldBeNil)
So(mts[0].Data_, ShouldEqual, 2)

This comment has been minimized.

Copy link
@tiffanyfay

tiffanyfay Nov 3, 2015

Contributor

Should So(len(mts), ShouldBeGreaterThan, 0) happen first?

So(errs, ShouldBeNil)
})
})
Expand Down
3 changes: 2 additions & 1 deletion control/plugin/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ limitations under the License.
package client

import (
"github.com/intelsdi-x/pulse/control/plugin"
"github.com/intelsdi-x/pulse/control/plugin/cpolicy"
"github.com/intelsdi-x/pulse/core"
"github.com/intelsdi-x/pulse/core/ctypes"
Expand All @@ -37,7 +38,7 @@ type PluginClient interface {
type PluginCollectorClient interface {
PluginClient
CollectMetrics([]core.Metric) ([]core.Metric, error)
GetMetricTypes() ([]core.Metric, error)
GetMetricTypes(plugin.PluginConfigType) ([]core.Metric, error)
}

// PluginProcessorClient A client providing processor specific plugin method calls.
Expand Down
14 changes: 12 additions & 2 deletions control/plugin/client/httpjsonrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,15 @@ func (h *httpJSONRPCClient) CollectMetrics(mts []core.Metric) ([]core.Metric, er
}

// GetMetricTypes returns metric types that can be collected
func (h *httpJSONRPCClient) GetMetricTypes() ([]core.Metric, error) {
res, err := h.call("Collector.GetMetricTypes", []interface{}{})
func (h *httpJSONRPCClient) GetMetricTypes(config plugin.PluginConfigType) ([]core.Metric, error) {
args := plugin.GetMetricTypesArgs{PluginConfig: config}

out, err := h.encoder.Encode(args)
if err != nil {
return nil, err
}

res, err := h.call("Collector.GetMetricTypes", []interface{}{out})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -322,5 +329,8 @@ func (h *httpJSONRPCClient) call(method string, args []interface{}) (*jsonRpcRes
return nil, err
}
atomic.AddUint64(&h.id, 1)
if result.Error != "" {
return result, errors.New(result.Error)
}
return result, nil
}
3 changes: 2 additions & 1 deletion control/plugin/client/httpjsonrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,8 @@ func TestHTTPJSONRPC(t *testing.T) {
})

Convey("GetMetricTypes", func() {
mts, err := c.GetMetricTypes()
cfg := plugin.PluginConfigType{}
mts, err := c.GetMetricTypes(cfg)
So(err, ShouldBeNil)
So(mts, ShouldNotBeNil)
So(mts, ShouldHaveSameTypeAs, []core.Metric{})
Expand Down
13 changes: 11 additions & 2 deletions control/plugin/client/native.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,17 @@ func (p *PluginNativeClient) CollectMetrics(coreMetricTypes []core.Metric) ([]co
return fromCache, nil
}

func (p *PluginNativeClient) GetMetricTypes() ([]core.Metric, error) {
func (p *PluginNativeClient) GetMetricTypes(config plugin.PluginConfigType) ([]core.Metric, error) {
var reply []byte
err := p.connection.Call("Collector.GetMetricTypes", []byte{}, &reply)

args := plugin.GetMetricTypesArgs{PluginConfig: config}

out, err := p.encoder.Encode(args)
if err != nil {
return nil, err
}

err = p.connection.Call("Collector.GetMetricTypes", out, &reply)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -272,6 +280,7 @@ func init() {
gob.Register(*(&ctypes.ConfigValueStr{}))
gob.Register(*(&ctypes.ConfigValueInt{}))
gob.Register(*(&ctypes.ConfigValueFloat{}))
gob.Register(*(&ctypes.ConfigValueBool{}))

gob.Register(cpolicy.NewPolicyNode())
gob.Register(&cpolicy.StringRule{})
Expand Down
2 changes: 1 addition & 1 deletion control/plugin/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ package plugin
type CollectorPlugin interface {
Plugin
CollectMetrics([]PluginMetricType) ([]PluginMetricType, error)
GetMetricTypes() ([]PluginMetricType, error)
GetMetricTypes(PluginConfigType) ([]PluginMetricType, error)
}
6 changes: 5 additions & 1 deletion control/plugin/collector_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type CollectMetricsReply struct {

// GetMetricTypesArgs args passed to GetMetricTypes
type GetMetricTypesArgs struct {
PluginConfig PluginConfigType
}

// GetMetricTypesReply assigned by GetMetricTypes() implementation
Expand All @@ -64,7 +65,10 @@ func (c *collectorPluginProxy) GetMetricTypes(args []byte, reply *[]byte) error
// Reset heartbeat
c.Session.ResetHeartbeat()

mts, err := c.Plugin.GetMetricTypes()
dargs := &GetMetricTypesArgs{}
c.Session.Decode(args, dargs)

mts, err := c.Plugin.GetMetricTypes(dargs.PluginConfig)
if err != nil {
return errors.New(fmt.Sprintf("GetMetricTypes call error : %s", err.Error()))
}
Expand Down
Loading

0 comments on commit b6ad0d0

Please sign in to comment.