From 32f3301f628d9af576e8ee99286ed2137a4be3e6 Mon Sep 17 00:00:00 2001 From: Joel Cooklin Date: Wed, 14 Oct 2015 18:03:12 -0700 Subject: [PATCH] Adds ability to pass a config to pulsed on startup The config enables plugins to get a config on load. --- control/config.go | 168 +++++++++++++++--- control/config_test.go | 61 ++++++- control/control.go | 24 ++- control/control_test.go | 54 +++--- control/plugin/client/httpjsonrpc_test.go | 9 +- control/plugin/client/native.go | 5 + control/plugin/collector_proxy.go | 13 +- control/plugin/cpolicy/node.go | 17 +- control/plugin/metric.go | 39 +++- control/plugin/session.go | 2 + control/plugin_manager.go | 5 +- control/plugin_manager_test.go | 24 +-- core/cdata/node.go | 14 +- core/ctypes/ctypes.go | 18 ++ examples/configs/pulse-config-sample.json | 47 +++++ mgmt/rest/tribe_test.go | 2 +- .../pulse-collector-dummy1/dummy/dummy.go | 4 +- .../pulse-collector-dummy2/dummy/dummy.go | 14 +- .../facter/facter_test.go | 4 +- .../perfevents/perfevents_test.go | 2 +- .../psutil/psutil_integration_test.go | 2 +- pulse.go | 38 +++- 22 files changed, 440 insertions(+), 126 deletions(-) create mode 100644 examples/configs/pulse-config-sample.json diff --git a/control/config.go b/control/config.go index 83dc04f2f..4f5258ca7 100644 --- a/control/config.go +++ b/control/config.go @@ -1,7 +1,11 @@ package control import ( + "bytes" + "encoding/json" "fmt" + "reflect" + "strconv" "github.com/intelsdi-x/pulse/control/plugin" "github.com/intelsdi-x/pulse/core/cdata" @@ -11,46 +15,79 @@ import ( // type configData map[string]*cdata.ConfigDataNode type pluginConfig struct { - all *cdata.ConfigDataNode - collector map[string]*pluginConfigItem - publisher map[string]*pluginConfigItem - processor map[string]*pluginConfigItem + All *cdata.ConfigDataNode `json:"all"` + Collector map[string]*pluginConfigItem `json:"collector"` + Publisher map[string]*pluginConfigItem `json:"publisher"` + Processor map[string]*pluginConfigItem `json:"processor"` pluginCache map[string]*cdata.ConfigDataNode } type pluginConfigItem struct { *cdata.ConfigDataNode - versions map[int]*cdata.ConfigDataNode + Versions map[int]*cdata.ConfigDataNode `json:"versions"` } type config struct { - control *cdata.ConfigDataNode - scheduler *cdata.ConfigDataNode - plugins *pluginConfig + Control *cdata.ConfigDataNode `json:"control"` + Scheduler *cdata.ConfigDataNode `json:"scheduler"` + Plugins *pluginConfig `json:"plugins"` } -func newConfig() *config { +func NewConfig() *config { return &config{ - control: cdata.NewNode(), - scheduler: cdata.NewNode(), - plugins: newPluginConfig(), + Control: cdata.NewNode(), + Scheduler: cdata.NewNode(), + Plugins: newPluginConfig(), } } func newPluginConfig() *pluginConfig { return &pluginConfig{ - all: cdata.NewNode(), - collector: make(map[string]*pluginConfigItem), - processor: make(map[string]*pluginConfigItem), - publisher: make(map[string]*pluginConfigItem), + All: cdata.NewNode(), + Collector: make(map[string]*pluginConfigItem), + Processor: make(map[string]*pluginConfigItem), + Publisher: make(map[string]*pluginConfigItem), pluginCache: make(map[string]*cdata.ConfigDataNode), } } +// UnmarshalJSON unmarshals valid json into pluginConfig. An example Config +// github.com/intelsdi-x/pulse/examples/configs/pulse-config-sample. +func (p *pluginConfig) UnmarshalJSON(data []byte) error { + t := map[string]interface{}{} + dec := json.NewDecoder(bytes.NewReader(data)) + dec.UseNumber() + if err := dec.Decode(&t); err != nil { + return err + } + + if v, ok := t["all"]; ok { + jv, err := json.Marshal(v) + if err != nil { + return err + } + cdn := &cdata.ConfigDataNode{} + dec = json.NewDecoder(bytes.NewReader(jv)) + dec.UseNumber() + if err := dec.Decode(&cdn); err != nil { + return err + } + p.All = cdn + } + + for _, typ := range []string{"collector", "processor", "publisher"} { + if err := unmarshalPluginConfig(typ, p, t); err != nil { + return err + } + } + + return nil +} + func newPluginConfigItem(opts ...pluginConfigOpt) *pluginConfigItem { p := &pluginConfigItem{ ConfigDataNode: cdata.NewNode(), - versions: make(map[int]*cdata.ConfigDataNode), + Versions: make(map[int]*cdata.ConfigDataNode), } for _, opt := range opts { @@ -78,28 +115,28 @@ func (p *pluginConfig) get(pluginType plugin.PluginType, name string, ver int) * //todo process/interpolate values p.pluginCache[key] = cdata.NewNode() - p.pluginCache[key].Merge(p.all) + p.pluginCache[key].Merge(p.All) // check for plugin config switch pluginType { case plugin.CollectorPluginType: - if res, ok := p.collector[name]; ok { + if res, ok := p.Collector[name]; ok { p.pluginCache[key].Merge(res.ConfigDataNode) - if res2, ok2 := res.versions[ver]; ok2 { + if res2, ok2 := res.Versions[ver]; ok2 { p.pluginCache[key].Merge(res2) } } case plugin.ProcessorPluginType: - if res, ok := p.processor[name]; ok { + if res, ok := p.Processor[name]; ok { p.pluginCache[key].Merge(res.ConfigDataNode) - if res2, ok2 := res.versions[ver]; ok2 { + if res2, ok2 := res.Versions[ver]; ok2 { p.pluginCache[key].Merge(res2) } } case plugin.PublisherPluginType: - if res, ok := p.publisher[name]; ok { + if res, ok := p.Publisher[name]; ok { p.pluginCache[key].Merge(res.ConfigDataNode) - if res2, ok2 := res.versions[ver]; ok2 { + if res2, ok2 := res.Versions[ver]; ok2 { p.pluginCache[key].Merge(res2) } } @@ -107,3 +144,86 @@ func (p *pluginConfig) get(pluginType plugin.PluginType, name string, ver int) * return p.pluginCache[key] } + +func unmarshalPluginConfig(typ string, p *pluginConfig, t map[string]interface{}) error { + if v, ok := t[typ]; ok { + switch plugins := v.(type) { + case map[string]interface{}: + for name, c := range plugins { + switch typ { + case "collector": + p.Collector[name] = newPluginConfigItem() + case "processor": + p.Processor[name] = newPluginConfigItem() + case "publisher": + p.Publisher[name] = newPluginConfigItem() + } + switch col := c.(type) { + case map[string]interface{}: + if v, ok := col["all"]; ok { + jv, err := json.Marshal(v) + if err != nil { + return err + } + cdn := cdata.NewNode() + dec := json.NewDecoder(bytes.NewReader(jv)) + dec.UseNumber() + if err := dec.Decode(&cdn); err != nil { + return err + } + switch typ { + case "collector": + p.Collector[name].ConfigDataNode = cdn + case "processor": + p.Processor[name].ConfigDataNode = cdn + case "publisher": + p.Publisher[name].ConfigDataNode = cdn + } + } + if vs, ok := col["versions"]; ok { + switch versions := vs.(type) { + case map[string]interface{}: + for ver, version := range versions { + switch v := version.(type) { + case map[string]interface{}: + jv, err := json.Marshal(v) + if err != nil { + return err + } + cdn := cdata.NewNode() + dec := json.NewDecoder(bytes.NewReader(jv)) + dec.UseNumber() + if err := dec.Decode(&cdn); err != nil { + return err + } + ver, err := strconv.Atoi(ver) + if err != nil { + return err + } + switch typ { + case "collector": + p.Collector[name].Versions[ver] = cdn + case "processor": + p.Processor[name].Versions[ver] = cdn + case "publisher": + p.Publisher[name].Versions[ver] = cdn + } + default: + return fmt.Errorf("Error unmarshalling %v'%v' expected '%v' got '%v'", typ, name, map[string]interface{}{}, reflect.TypeOf(v)) + } + } + + default: + return fmt.Errorf("Error unmarshalling %v '%v' expected '%v' got '%v'", typ, name, map[string]interface{}{}, reflect.TypeOf(versions)) + } + } + default: + return fmt.Errorf("Error unmarshalling %v '%v' expected '%v' got '%v'", typ, name, map[string]interface{}{}, reflect.TypeOf(col)) + } + } + default: + return fmt.Errorf("Error unmarshalling %v expected '%v' got '%v'", typ, map[string]interface{}{}, reflect.TypeOf(plugins)) + } + } + return nil +} diff --git a/control/config_test.go b/control/config_test.go index 0b339177b..d81fcd585 100644 --- a/control/config_test.go +++ b/control/config_test.go @@ -1,6 +1,8 @@ package control import ( + "encoding/json" + "io/ioutil" "testing" "github.com/intelsdi-x/pulse/control/plugin" @@ -11,25 +13,66 @@ import ( func TestPluginConfig(t *testing.T) { Convey("Given a plugin config", t, func() { - cfg := newConfig() + cfg := NewConfig() So(cfg, ShouldNotBeNil) Convey("with an entry for ALL plugins", func() { - cfg.plugins.all.AddItem("gvar", ctypes.ConfigValueBool{Value: true}) - So(len(cfg.plugins.all.Table()), ShouldEqual, 1) + cfg.Plugins.All.AddItem("gvar", ctypes.ConfigValueBool{Value: true}) + So(len(cfg.Plugins.All.Table()), ShouldEqual, 1) Convey("an entry for a specific plugin of any version", func() { - cfg.plugins.collector["test"] = newPluginConfigItem(optAddPluginConfigItem("pvar", ctypes.ConfigValueBool{Value: true})) - So(len(cfg.plugins.collector["test"].Table()), ShouldEqual, 1) + cfg.Plugins.Collector["test"] = newPluginConfigItem(optAddPluginConfigItem("pvar", ctypes.ConfigValueBool{Value: true})) + So(len(cfg.Plugins.Collector["test"].Table()), ShouldEqual, 1) Convey("and an entry for a specific plugin of a specific version", func() { - cfg.plugins.collector["test"].versions[1] = cdata.NewNode() - cfg.plugins.collector["test"].versions[1].AddItem("vvar", ctypes.ConfigValueBool{Value: true}) - So(len(cfg.plugins.collector["test"].versions[1].Table()), ShouldEqual, 1) + cfg.Plugins.Collector["test"].Versions[1] = cdata.NewNode() + cfg.Plugins.Collector["test"].Versions[1].AddItem("vvar", ctypes.ConfigValueBool{Value: true}) + So(len(cfg.Plugins.Collector["test"].Versions[1].Table()), ShouldEqual, 1) Convey("we can get the merged conf for the given plugin", func() { - cd := cfg.plugins.get(plugin.CollectorPluginType, "test", 1) + cd := cfg.Plugins.get(plugin.CollectorPluginType, "test", 1) So(len(cd.Table()), ShouldEqual, 3) }) }) }) }) + }) + + Convey("Provided a config in json", t, func() { + cfg := NewConfig() + b, err := ioutil.ReadFile("../examples/configs/pulse-config-sample.json") + So(b, ShouldNotBeEmpty) + So(b, ShouldNotBeNil) + So(err, ShouldBeNil) + Convey("We are able to unmarshal it into a valid config", func() { + err = json.Unmarshal(b, &cfg) + So(err, ShouldBeNil) + So(cfg.Control, ShouldNotBeNil) + So(cfg.Control.Table()["cache_ttl"], ShouldResemble, ctypes.ConfigValueStr{Value: "5s"}) + So(cfg.Plugins.All.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "p@ssw0rd"}) + So(cfg.Plugins.Collector["pcm"], ShouldNotBeNil) + So(cfg.Plugins.Collector["pcm"].Table()["path"], ShouldResemble, ctypes.ConfigValueStr{Value: "/usr/local/pcm/bin"}) + So(cfg.Plugins, ShouldNotBeNil) + So(cfg.Plugins.All, ShouldNotBeNil) + So(cfg.Plugins.Collector["pcm"].Versions[1].Table()["user"], ShouldResemble, ctypes.ConfigValueStr{Value: "john"}) + So(cfg.Plugins.Processor["movingaverage"].Table()["user"], ShouldResemble, ctypes.ConfigValueStr{Value: "jane"}) + Convey("Through the config object we can access the stored configurations for plugins", func() { + c := cfg.Plugins.get(plugin.ProcessorPluginType, "movingaverage", 0) + So(c, ShouldNotBeNil) + So(c.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "p@ssw0rd"}) + Convey("Overwritting the value of a variable defined for all plugins", func() { + c := cfg.Plugins.get(plugin.ProcessorPluginType, "movingaverage", 1) + So(c, ShouldNotBeNil) + So(c.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "new password"}) + }) + Convey("Retrieving the value of a variable defined for all plugins", func() { + c := cfg.Plugins.get(plugin.CollectorPluginType, "pcm", 0) + So(c, ShouldNotBeNil) + So(c.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "p@ssw0rd"}) + }) + Convey("Overwritting the value of a variable defined for all versions of the plugin", func() { + c := cfg.Plugins.get(plugin.ProcessorPluginType, "movingaverage", 1) + So(c, ShouldNotBeNil) + So(c.Table()["password"], ShouldResemble, ctypes.ConfigValueStr{Value: "new password"}) + }) + }) + }) }) } diff --git a/control/control.go b/control/control.go index 0c2142a9c..cc6c45e2c 100644 --- a/control/control.go +++ b/control/control.go @@ -112,25 +112,31 @@ type managesSigning interface { ValidateSignature(keyringFile string, signedFile string, signatureFile string) perror.PulseError } -type controlOpt func(*pluginControl) +type ControlOpt func(*pluginControl) -func MaxRunningPlugins(m int) controlOpt { +func MaxRunningPlugins(m int) ControlOpt { return func(c *pluginControl) { maximumRunningPlugins = m } } -func CacheExpiration(t time.Duration) controlOpt { +func CacheExpiration(t time.Duration) ControlOpt { return func(c *pluginControl) { client.GlobalCacheExpiration = t } } +func OptSetConfig(c *config) ControlOpt { + return func(p *pluginControl) { + p.config = c + } +} + // New returns a new pluginControl instance -func New(opts ...controlOpt) *pluginControl { +func New(opts ...ControlOpt) *pluginControl { c := &pluginControl{} - c.config = newConfig() + c.config = NewConfig() // Initialize components // // Event Manager @@ -147,7 +153,7 @@ func New(opts ...controlOpt) *pluginControl { }).Debug("metric catalog created") // Plugin Manager - c.pluginManager = newPluginManager(OptSetPluginConfig(c.config.plugins)) + c.pluginManager = newPluginManager(OptSetPluginConfig(c.config.Plugins)) controlLogger.WithFields(log.Fields{ "_block": "new", }).Debug("plugin manager created") @@ -684,7 +690,7 @@ func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time. // merge global plugin config into the config for the metric for _, mt := range pmt.metricTypes { - mt.Config().Merge(p.config.plugins.get(plugin.CollectorPluginType, ap.Name(), ap.Version())) + mt.Config().Merge(p.config.Plugins.get(plugin.CollectorPluginType, ap.Name(), ap.Version())) } // get a metrics @@ -755,7 +761,7 @@ func (p *pluginControl) PublishMetrics(contentType string, content []byte, plugi } // merge global plugin config into the config for this request - cfg := p.config.plugins.get(plugin.PublisherPluginType, ap.Name(), ap.Version()).Table() + cfg := p.config.Plugins.get(plugin.PublisherPluginType, ap.Name(), ap.Version()).Table() for k, v := range config { cfg[k] = v } @@ -797,7 +803,7 @@ func (p *pluginControl) ProcessMetrics(contentType string, content []byte, plugi } // merge global plugin config into the config for this request - cfg := p.config.plugins.get(plugin.ProcessorPluginType, ap.Name(), ap.Version()).Table() + cfg := p.config.Plugins.get(plugin.ProcessorPluginType, ap.Name(), ap.Version()).Table() for k, v := range config { cfg[k] = v diff --git a/control/control_test.go b/control/control_test.go index c5c88a870..6cd7cd265 100644 --- a/control/control_test.go +++ b/control/control_test.go @@ -104,7 +104,7 @@ func TestSwapPlugin(t *testing.T) { So(e, ShouldBeNil) - dummy2Path := strings.Replace(PluginPath, "pulse-collector-dummy1", "pulse-collector-dummy2", 1) + dummy2Path := strings.Replace(PluginPath, "pulse-collector-dummy2", "pulse-collector-dummy1", 1) pc := c.PluginCatalog() dummy := pc[0] @@ -113,18 +113,18 @@ func TestSwapPlugin(t *testing.T) { So(err, ShouldBeNil) time.Sleep(50 * time.Millisecond) pc = c.PluginCatalog() - So(pc[0].Name(), ShouldEqual, "dummy2") - So(lpe.plugin.LoadedPluginName, ShouldEqual, "dummy2") - So(lpe.plugin.LoadedPluginVersion, ShouldEqual, 2) - So(lpe.plugin.UnloadedPluginName, ShouldEqual, "dummy1") - So(lpe.plugin.UnloadedPluginVersion, ShouldEqual, 1) + So(pc[0].Name(), ShouldEqual, "dummy1") + So(lpe.plugin.LoadedPluginName, ShouldEqual, "dummy1") + So(lpe.plugin.LoadedPluginVersion, ShouldEqual, 1) + So(lpe.plugin.UnloadedPluginName, ShouldEqual, "dummy2") + So(lpe.plugin.UnloadedPluginVersion, ShouldEqual, 2) So(lpe.plugin.PluginType, ShouldEqual, int(plugin.CollectorPluginType)) }) Convey("does not unload & returns an error if it cannot load a plugin", func() { err := c.SwapPlugins("/fake/plugin/path", pc[0]) So(err, ShouldNotBeNil) - So(pc[0].Name(), ShouldEqual, "dummy1") + So(pc[0].Name(), ShouldEqual, "dummy2") }) Convey("rollsback loaded plugin & returns an error if it cannot unload a plugin", func() { @@ -223,8 +223,8 @@ func TestLoad(t *testing.T) { time.Sleep(100) So(err, ShouldBeNil) So(c.pluginManager.all(), ShouldNotBeEmpty) - So(lpe.plugin.LoadedPluginName, ShouldEqual, "dummy1") - So(lpe.plugin.LoadedPluginVersion, ShouldEqual, 1) + So(lpe.plugin.LoadedPluginName, ShouldEqual, "dummy2") + So(lpe.plugin.LoadedPluginVersion, ShouldEqual, 2) So(lpe.plugin.PluginType, ShouldEqual, int(plugin.CollectorPluginType)) }) @@ -316,19 +316,22 @@ func TestUnload(t *testing.T) { pc := c.PluginCatalog() So(len(pc), ShouldEqual, 1) - So(pc[0].Name(), ShouldEqual, "dummy1") + So(pc[0].Name(), ShouldEqual, "dummy2") _, err2 := c.Unload(pc[0]) <-lpe.done So(err2, ShouldBeNil) - So(lpe.plugin.UnloadedPluginName, ShouldEqual, "dummy1") - So(lpe.plugin.UnloadedPluginVersion, ShouldEqual, 1) + So(lpe.plugin.UnloadedPluginName, ShouldEqual, "dummy2") + So(lpe.plugin.UnloadedPluginVersion, ShouldEqual, 2) So(lpe.plugin.PluginType, ShouldEqual, int(plugin.CollectorPluginType)) }) Convey("returns error on unload for unknown plugin(or already unloaded)", func() { c := New() + lpe := newListenToPluginEvent() + c.eventManager.RegisterHandler("TestUnload", lpe) c.Start() _, err := c.Load(PluginPath) + <-lpe.done So(c.pluginManager.all(), ShouldNotBeEmpty) So(err, ShouldBeNil) @@ -352,7 +355,7 @@ func TestUnload(t *testing.T) { pc := c.PluginCatalog() c.Unload(pc[0]) <-lpe.done - So(lpe.plugin.UnloadedPluginName, ShouldEqual, "dummy1") + So(lpe.plugin.UnloadedPluginName, ShouldEqual, "dummy2") }) }) @@ -657,7 +660,10 @@ func (m MockMetricType) Data() interface{} { func TestMetricConfig(t *testing.T) { c := New() c.Start() - c.Load(PluginPath) + lpe := newListenToPluginEvent() + c.eventManager.RegisterHandler("Control.PluginLoaded", lpe) + c.Load(JSONRPC_PluginPath) + <-lpe.done cd := cdata.NewNode() m1 := MockMetricType{ namespace: []string{"intel", "dummy", "foo"}, @@ -670,8 +676,8 @@ func TestMetricConfig(t *testing.T) { cd.AddItem("password", ctypes.ConfigValueStr{Value: "testval"}) metric, errs = c.validateMetricTypeSubscription(m1, cd) Convey("So metric should be valid with config", t, func() { - So(metric, ShouldNotBeNil) So(errs, ShouldBeNil) + So(metric, ShouldNotBeNil) }) } @@ -686,13 +692,16 @@ func TestCollectMetrics(t *testing.T) { c := New() c.pluginRunner.(*runner).monitor.duration = time.Millisecond * 100 c.Start() + lpe := newListenToPluginEvent() + c.eventManager.RegisterHandler("Control.PluginLoaded", lpe) time.Sleep(100 * time.Millisecond) // Add a global plugin config - c.config.plugins.collector["dummy1"] = newPluginConfigItem(optAddPluginConfigItem("test", ctypes.ConfigValueBool{Value: true})) + c.config.Plugins.Collector["dummy1"] = newPluginConfigItem(optAddPluginConfigItem("test", ctypes.ConfigValueBool{Value: true})) // Load plugin - c.Load(PluginPath) + c.Load(JSONRPC_PluginPath) + <-lpe.done mts, err := c.MetricCatalog() So(err, ShouldBeNil) So(len(mts), ShouldEqual, 3) @@ -724,8 +733,7 @@ func TestCollectMetrics(t *testing.T) { err = c.sendPluginSubscriptionEvent("1", lp) So(err, ShouldBeNil) m = append(m, m1, m2, m3) - - time.Sleep(time.Millisecond * 900) + time.Sleep(time.Millisecond * 1100) for x := 0; x < 5; x++ { cr, err := c.CollectMetrics(m, time.Now().Add(time.Second*60)) @@ -734,9 +742,7 @@ func TestCollectMetrics(t *testing.T) { So(cr[i].Data(), ShouldContainSubstring, "The dummy collected data!") So(cr[i].Data(), ShouldContainSubstring, "test=true") } - // fmt.Printf(" * Collect Response: %+v\n", cr) } - time.Sleep(time.Millisecond * 500) ap := c.AvailablePlugins() So(ap, ShouldNotBeEmpty) }) @@ -819,7 +825,7 @@ func TestPublishMetrics(t *testing.T) { errs := c.sendPluginSubscriptionEvent("1", p) So(errs, ShouldBeNil) <-lpe.done - time.Sleep(1500 * time.Millisecond) + time.Sleep(2500 * time.Millisecond) Convey("Publish to file", func() { metrics := []plugin.PluginMetricType{ @@ -851,7 +857,7 @@ func TestProcessMetrics(t *testing.T) { c.pluginRunner.(*runner).monitor.duration = time.Millisecond * 100 c.Start() time.Sleep(1 * time.Second) - c.config.plugins.processor["passthru"] = newPluginConfigItem(optAddPluginConfigItem("test", ctypes.ConfigValueBool{Value: true})) + c.config.Plugins.Processor["passthru"] = newPluginConfigItem(optAddPluginConfigItem("test", ctypes.ConfigValueBool{Value: true})) // Load plugin _, err := c.Load(path.Join(PulsePath, "plugin", "pulse-processor-passthru")) @@ -877,7 +883,7 @@ func TestProcessMetrics(t *testing.T) { errs := c.sendPluginSubscriptionEvent("1", p) So(errs, ShouldBeNil) <-lpe.done - time.Sleep(1500 * time.Millisecond) + time.Sleep(2500 * time.Millisecond) Convey("process metrics", func() { metrics := []plugin.PluginMetricType{ diff --git a/control/plugin/client/httpjsonrpc_test.go b/control/plugin/client/httpjsonrpc_test.go index 3d3a40e29..af6766a86 100644 --- a/control/plugin/client/httpjsonrpc_test.go +++ b/control/plugin/client/httpjsonrpc_test.go @@ -91,9 +91,13 @@ func (m *mockCollectorProxy) CollectMetrics(args []byte, reply *[]byte) error { } func (m *mockCollectorProxy) GetMetricTypes(args []byte, reply *[]byte) error { + dargs := &plugin.GetMetricTypesArgs{} + m.e.Decode(args, &dargs) + pmts := []plugin.PluginMetricType{} pmts = append(pmts, plugin.PluginMetricType{ Namespace_: []string{"foo", "bar"}, + Config_: dargs.PluginConfig.ConfigDataNode, }) *reply, _ = m.e.Encode(plugin.GetMetricTypesReply{PluginMetricTypes: pmts}) return nil @@ -195,12 +199,15 @@ func TestHTTPJSONRPC(t *testing.T) { }) Convey("GetMetricTypes", func() { - cfg := plugin.PluginConfigType{} + cfg := plugin.NewPluginConfigType() + cfg.AddItem("test", ctypes.ConfigValueBool{Value: true}) mts, err := c.GetMetricTypes(cfg) So(err, ShouldBeNil) So(mts, ShouldNotBeNil) So(mts, ShouldHaveSameTypeAs, []core.Metric{}) So(len(mts), ShouldBeGreaterThan, 0) + log.Errorf("!asdf %v", mts[0].Config()) + So(len(mts[0].Config().Table()), ShouldBeGreaterThan, 0) }) Convey("CollectMetrics provided a valid config", func() { diff --git a/control/plugin/client/native.go b/control/plugin/client/native.go index 11e4058f4..87eb355e5 100644 --- a/control/plugin/client/native.go +++ b/control/plugin/client/native.go @@ -28,11 +28,14 @@ import ( "time" "unicode" + log "github.com/Sirupsen/logrus" + "github.com/intelsdi-x/pulse/control/plugin" "github.com/intelsdi-x/pulse/control/plugin/cpolicy" "github.com/intelsdi-x/pulse/control/plugin/encoding" "github.com/intelsdi-x/pulse/control/plugin/encrypter" "github.com/intelsdi-x/pulse/core" + "github.com/intelsdi-x/pulse/core/cdata" "github.com/intelsdi-x/pulse/core/ctypes" ) @@ -202,6 +205,7 @@ func (p *PluginNativeClient) GetMetricTypes(config plugin.PluginConfigType) ([]c out, err := p.encoder.Encode(args) if err != nil { + log.Error("error while encoding args for getmetrictypes :(") return nil, err } @@ -283,6 +287,7 @@ func init() { gob.Register(*(&ctypes.ConfigValueBool{})) gob.Register(cpolicy.NewPolicyNode()) + gob.Register(&cdata.ConfigDataNode{}) gob.Register(&cpolicy.StringRule{}) gob.Register(&cpolicy.IntRule{}) gob.Register(&cpolicy.FloatRule{}) diff --git a/control/plugin/collector_proxy.go b/control/plugin/collector_proxy.go index 8430613ae..74c4322ca 100644 --- a/control/plugin/collector_proxy.go +++ b/control/plugin/collector_proxy.go @@ -22,6 +22,8 @@ package plugin import ( "errors" "fmt" + + "github.com/intelsdi-x/pulse/core/cdata" ) // Arguments passed to CollectMetrics() for a Collector implementation @@ -29,15 +31,6 @@ type CollectMetricsArgs struct { PluginMetricTypes []PluginMetricType } -//func (c *CollectMetricsArgs) UnmarshalJSON(data []byte) error { -// pmt := &[]PluginMetricType{} -// if err := json.Unmarshal(data, pmt); err != nil { -// return err -// } -// c.PluginMetricTypes = *pmt -// return nil -//} - // Reply assigned by a Collector implementation using CollectMetrics() type CollectMetricsReply struct { PluginMetrics []PluginMetricType @@ -65,7 +58,7 @@ func (c *collectorPluginProxy) GetMetricTypes(args []byte, reply *[]byte) error // Reset heartbeat c.Session.ResetHeartbeat() - dargs := &GetMetricTypesArgs{} + dargs := &GetMetricTypesArgs{PluginConfig: PluginConfigType{ConfigDataNode: cdata.NewNode()}} c.Session.Decode(args, dargs) mts, err := c.Plugin.GetMetricTypes(dargs.PluginConfig) diff --git a/control/plugin/cpolicy/node.go b/control/plugin/cpolicy/node.go index 880410ea2..e5e4e9af5 100644 --- a/control/plugin/cpolicy/node.go +++ b/control/plugin/cpolicy/node.go @@ -214,24 +214,27 @@ func addRulesToConfigPolicyNode(rules map[string]interface{}, cpn *ConfigPolicyN switch rule["type"] { case "integer": r, _ := NewIntegerRule(k, req) - if d, ok := rule["default"].(map[string]interface{}); ok { + if d, ok := rule["default"]; ok { // json encoding an int results in a float when decoding - def_, _ := d["Value"].(float64) + def_, _ := d.(float64) def := int(def_) r.default_ = &def } cpn.Add(r) case "string": r, _ := NewStringRule(k, req) - if d, ok := rule["default"].(map[string]interface{}); ok { - def, _ := d["Value"].(string) - r.default_ = &def + if d, ok := rule["default"]; ok { + def, _ := d.(string) + if def != "" { + r.default_ = &def + } } + cpn.Add(r) case "float": r, _ := NewFloatRule(k, req) - if d, ok := rule["default"].(map[string]interface{}); ok { - def, _ := d["Value"].(float64) + if d, ok := rule["default"]; ok { + def, _ := d.(float64) r.default_ = &def } cpn.Add(r) diff --git a/control/plugin/metric.go b/control/plugin/metric.go index 39b8c6b95..a735bb9d9 100644 --- a/control/plugin/metric.go +++ b/control/plugin/metric.go @@ -46,7 +46,44 @@ const ( ) type PluginConfigType struct { - Data *cdata.ConfigDataNode `json:"config"` + *cdata.ConfigDataNode +} + +func (p *PluginConfigType) UnmarshalJSON(data []byte) error { + cdn := cdata.NewNode() + dec := json.NewDecoder(bytes.NewReader(data)) + dec.UseNumber() + if err := dec.Decode(cdn); err != nil { + return err + } + p.ConfigDataNode = cdn + return nil +} + +func (p PluginConfigType) GobEncode() ([]byte, error) { + w := new(bytes.Buffer) + encoder := gob.NewEncoder(w) + if err := encoder.Encode(p.ConfigDataNode); err != nil { + return nil, err + } + return w.Bytes(), nil +} + +func (p *PluginConfigType) GobDecode(data []byte) error { + cdn := cdata.NewNode() + decoder := gob.NewDecoder(bytes.NewReader(data)) + if err := decoder.Decode(cdn); err != nil { + return err + } + p.ConfigDataNode = cdn + + return nil +} + +func NewPluginConfigType() PluginConfigType { + return PluginConfigType{ + ConfigDataNode: cdata.NewNode(), + } } // Represents a metric type. Only used within plugins and across plugin calls. diff --git a/control/plugin/session.go b/control/plugin/session.go index 4f93e52a8..480679362 100644 --- a/control/plugin/session.go +++ b/control/plugin/session.go @@ -34,6 +34,7 @@ import ( "github.com/intelsdi-x/pulse/control/plugin/cpolicy" "github.com/intelsdi-x/pulse/control/plugin/encoding" "github.com/intelsdi-x/pulse/control/plugin/encrypter" + "github.com/intelsdi-x/pulse/core/cdata" "github.com/intelsdi-x/pulse/core/ctypes" ) @@ -297,6 +298,7 @@ func init() { gob.Register(*(&ctypes.ConfigValueBool{})) gob.Register(cpolicy.NewPolicyNode()) + gob.Register(&cdata.ConfigDataNode{}) gob.Register(&cpolicy.StringRule{}) gob.Register(&cpolicy.IntRule{}) gob.Register(&cpolicy.FloatRule{}) diff --git a/control/plugin_manager.go b/control/plugin_manager.go index abdc5d1a6..299b4c913 100644 --- a/control/plugin_manager.go +++ b/control/plugin_manager.go @@ -320,11 +320,10 @@ func (p *pluginManager) LoadPlugin(path string, emitter gomit.Emitter) (*loadedP if resp.Type == plugin.CollectorPluginType { colClient := ap.client.(client.PluginCollectorClient) - // Get metric types - // TODO pass in the global config cfg := plugin.PluginConfigType{ - Data: p.pluginConfig.get(resp.Type, resp.Meta.Name, resp.Meta.Version), + ConfigDataNode: p.pluginConfig.get(resp.Type, resp.Meta.Name, resp.Meta.Version), } + metricTypes, err := colClient.GetMetricTypes(cfg) if err != nil { pmLogger.WithFields(log.Fields{ diff --git a/control/plugin_manager_test.go b/control/plugin_manager_test.go index 0844626f0..62f7613a9 100644 --- a/control/plugin_manager_test.go +++ b/control/plugin_manager_test.go @@ -33,11 +33,11 @@ import ( ) var ( - PluginName = "pulse-collector-dummy1" + PluginName = "pulse-collector-dummy2" PulsePath = os.Getenv("PULSE_PATH") PluginPath = path.Join(PulsePath, "plugin", PluginName) - JSONRPC_PluginName = "pulse-collector-dummy2" + JSONRPC_PluginName = "pulse-collector-dummy1" JSONRPC_PluginPath = path.Join(PulsePath, "plugin", JSONRPC_PluginName) ) @@ -91,9 +91,9 @@ func TestLoadPlugin(t *testing.T) { }) Convey("with a plugin config a plugin loads successfully", func() { - cfg := newConfig() - cfg.plugins.collector["dummy1"] = newPluginConfigItem(optAddPluginConfigItem("test", ctypes.ConfigValueBool{Value: true})) - p := newPluginManager(OptSetPluginConfig(cfg.plugins)) + cfg := NewConfig() + cfg.Plugins.Collector["dummy2"] = newPluginConfigItem(optAddPluginConfigItem("test", ctypes.ConfigValueBool{Value: true})) + p := newPluginManager(OptSetPluginConfig(cfg.Plugins)) p.SetMetricCatalog(newMetricCatalog()) lp, err := p.LoadPlugin(PluginPath, nil) @@ -107,16 +107,16 @@ func TestLoadPlugin(t *testing.T) { }) Convey("for a plugin requiring a config an incomplete config will result in a load failure", func() { - cfg := newConfig() - cfg.plugins.collector["dummy1"] = newPluginConfigItem(optAddPluginConfigItem("test-fail", ctypes.ConfigValueBool{Value: true})) - p := newPluginManager(OptSetPluginConfig(cfg.plugins)) + cfg := NewConfig() + cfg.Plugins.Collector["dummy2"] = newPluginConfigItem(optAddPluginConfigItem("test-fail", ctypes.ConfigValueBool{Value: true})) + p := newPluginManager(OptSetPluginConfig(cfg.Plugins)) p.SetMetricCatalog(newMetricCatalog()) lp, err := p.LoadPlugin(PluginPath, nil) So(lp, ShouldBeNil) So(p.all(), ShouldBeEmpty) So(err, ShouldNotBeNil) - So(err.Error(), ShouldContainSubstring, "missing on-load plugin config entry 'test'") + So(err.Error(), ShouldContainSubstring, "testing") So(len(p.all()), ShouldEqual, 0) }) @@ -167,7 +167,7 @@ func TestUnloadPlugin(t *testing.T) { numPluginsLoaded := len(p.all()) So(numPluginsLoaded, ShouldEqual, 1) - lp, _ := p.get("collector:dummy1:1") + lp, _ := p.get("collector:dummy2:2") _, err = p.UnloadPlugin(lp) So(err, ShouldBeNil) @@ -180,7 +180,7 @@ func TestUnloadPlugin(t *testing.T) { p := newPluginManager() p.SetMetricCatalog(newMetricCatalog()) lp, err := p.LoadPlugin(PluginPath, nil) - glp, err2 := p.get("collector:dummy1:1") + glp, err2 := p.get("collector:dummy2:2") So(err2, ShouldBeNil) glp.State = DetectedState _, err = p.UnloadPlugin(lp) @@ -194,7 +194,7 @@ func TestUnloadPlugin(t *testing.T) { p.SetMetricCatalog(newMetricCatalog()) _, err := p.LoadPlugin(PluginPath, nil) - lp, err2 := p.get("collector:dummy1:1") + lp, err2 := p.get("collector:dummy2:2") So(err2, ShouldBeNil) _, err = p.UnloadPlugin(lp) diff --git a/core/cdata/node.go b/core/cdata/node.go index f8f25d5c9..7a5a13ec2 100644 --- a/core/cdata/node.go +++ b/core/cdata/node.go @@ -56,16 +56,12 @@ func (c *ConfigDataNode) GobDecode(buf []byte) error { // MarshalJSON marshals a ConfigDataNode into JSON func (c *ConfigDataNode) MarshalJSON() ([]byte, error) { - return json.Marshal(&struct { - Table map[string]ctypes.ConfigValue `json:"table"` - }{ - Table: c.table, - }) + return json.Marshal(c.table) } // UnmarshalJSON unmarshals JSON into a ConfigDataNode func (c *ConfigDataNode) UnmarshalJSON(data []byte) error { - t := map[string]map[string]interface{}{} + t := map[string]interface{}{} c.table = map[string]ctypes.ConfigValue{} dec := json.NewDecoder(bytes.NewReader(data)) dec.UseNumber() @@ -73,8 +69,8 @@ func (c *ConfigDataNode) UnmarshalJSON(data []byte) error { return err } - for k, i := range t["table"] { - switch t := i.(map[string]interface{})["Value"].(type) { + for k, i := range t { + switch t := i.(type) { case string: c.table[k] = ctypes.ConfigValueStr{Value: t} case bool: @@ -90,7 +86,7 @@ func (c *ConfigDataNode) UnmarshalJSON(data []byte) error { continue } default: - return fmt.Errorf("Error Unmarshalling ConfigDataNode into JSON. Type %v is unsupported.", k) + return fmt.Errorf("Error Unmarshalling JSON ConfigDataNode. Key: %v Type: %v is unsupported.", k, t) } } c.mutex = new(sync.Mutex) diff --git a/core/ctypes/ctypes.go b/core/ctypes/ctypes.go index 55cb5f306..6b7399ef7 100644 --- a/core/ctypes/ctypes.go +++ b/core/ctypes/ctypes.go @@ -19,6 +19,8 @@ limitations under the License. package ctypes +import "encoding/json" + // TODO constructors for each that have typing for value (and optionally validate) type ConfigValue interface { @@ -33,6 +35,10 @@ func (c ConfigValueInt) Type() string { return "integer" } +func (c ConfigValueInt) MarshalJSON() ([]byte, error) { + return json.Marshal(c.Value) +} + type ConfigValueStr struct { Value string } @@ -41,6 +47,10 @@ func (c ConfigValueStr) Type() string { return "string" } +func (c ConfigValueStr) MarshalJSON() ([]byte, error) { + return json.Marshal(c.Value) +} + type ConfigValueFloat struct { Value float64 } @@ -49,6 +59,10 @@ func (c ConfigValueFloat) Type() string { return "float" } +func (c ConfigValueFloat) MarshalJSON() ([]byte, error) { + return json.Marshal(c.Value) +} + type ConfigValueBool struct { Value bool } @@ -57,6 +71,10 @@ func (c ConfigValueBool) Type() string { return "bool" } +func (c ConfigValueBool) MarshalJSON() ([]byte, error) { + return json.Marshal(c.Value) +} + // Returns a slice of string keywords for the types supported by ConfigValue. func SupportedTypes() []string { // This is kind of a hack but keeps the definiton of types here in diff --git a/examples/configs/pulse-config-sample.json b/examples/configs/pulse-config-sample.json new file mode 100644 index 000000000..b6598ed9b --- /dev/null +++ b/examples/configs/pulse-config-sample.json @@ -0,0 +1,47 @@ +{ + "control": { + "cache_ttl": "5s" + }, + "scheduler": { + "default_deadline": "5s", + "worker_pool_size": 5 + }, + "plugins": { + "all": { + "password": "p@ssw0rd" + }, + "collector": { + "pcm": { + "all": { + "path": "/usr/local/pcm/bin" + }, + "versions": { + "1": { + "user": "john" + } + } + } + }, + "publisher": { + "influxdb": { + "all": { + "server": "xyz.local", + "password": "$password" + } + } + }, + "processor": { + "movingaverage" : { + "all": { + "user": "jane" + }, + "versions": { + "1": { + "user": "tiffany", + "password": "new password" + } + } + } + } + } +} diff --git a/mgmt/rest/tribe_test.go b/mgmt/rest/tribe_test.go index 4f147a2bf..88e399a65 100644 --- a/mgmt/rest/tribe_test.go +++ b/mgmt/rest/tribe_test.go @@ -488,7 +488,7 @@ func startTribes(count int) []int { return mgtPorts } -var nextPort uint64 = 51234 +var nextPort uint64 = 55234 func getAvailablePort() int { atomic.AddUint64(&nextPort, 1) diff --git a/plugin/collector/pulse-collector-dummy1/dummy/dummy.go b/plugin/collector/pulse-collector-dummy1/dummy/dummy.go index b15cfaddf..6ad1f9e38 100644 --- a/plugin/collector/pulse-collector-dummy1/dummy/dummy.go +++ b/plugin/collector/pulse-collector-dummy1/dummy/dummy.go @@ -68,10 +68,10 @@ func (f *Dummy) CollectMetrics(mts []plugin.PluginMetricType) ([]plugin.PluginMe //GetMetricTypes returns metric types for testing func (f *Dummy) GetMetricTypes(cfg plugin.PluginConfigType) ([]plugin.PluginMetricType, error) { mts := []plugin.PluginMetricType{} - if _, ok := cfg.Data.Table()["test-fail"]; ok { + if _, ok := cfg.Table()["test-fail"]; ok { return mts, fmt.Errorf("missing on-load plugin config entry 'test'") } - if _, ok := cfg.Data.Table()["test"]; ok { + if _, ok := cfg.Table()["test"]; ok { mts = append(mts, plugin.PluginMetricType{Namespace_: []string{"intel", "dummy", "test"}}) } mts = append(mts, plugin.PluginMetricType{Namespace_: []string{"intel", "dummy", "foo"}}) diff --git a/plugin/collector/pulse-collector-dummy2/dummy/dummy.go b/plugin/collector/pulse-collector-dummy2/dummy/dummy.go index c8ff9b6b1..dfaa8f489 100644 --- a/plugin/collector/pulse-collector-dummy2/dummy/dummy.go +++ b/plugin/collector/pulse-collector-dummy2/dummy/dummy.go @@ -20,6 +20,7 @@ limitations under the License. package dummy import ( + "fmt" "log" "math/rand" "os" @@ -64,9 +65,16 @@ func (f *Dummy) CollectMetrics(mts []plugin.PluginMetricType) ([]plugin.PluginMe //GetMetricTypes returns metric types for testing func (f *Dummy) GetMetricTypes(cfg plugin.PluginConfigType) ([]plugin.PluginMetricType, error) { - m1 := &plugin.PluginMetricType{Namespace_: []string{"intel", "dummy", "foo"}} - m2 := &plugin.PluginMetricType{Namespace_: []string{"intel", "dummy", "bar"}} - return []plugin.PluginMetricType{*m1, *m2}, nil + mts := []plugin.PluginMetricType{} + if _, ok := cfg.Table()["test-fail"]; ok { + return mts, fmt.Errorf("testing") + } + if _, ok := cfg.Table()["test"]; ok { + mts = append(mts, plugin.PluginMetricType{Namespace_: []string{"intel", "dummy", "test"}}) + } + mts = append(mts, plugin.PluginMetricType{Namespace_: []string{"intel", "dummy", "foo"}}) + mts = append(mts, plugin.PluginMetricType{Namespace_: []string{"intel", "dummy", "bar"}}) + return mts, nil } //GetConfigPolicy returns a ConfigPolicy for testing diff --git a/plugin/collector/pulse-collector-facter/facter/facter_test.go b/plugin/collector/pulse-collector-facter/facter/facter_test.go index fca671ea6..18bb7ef45 100644 --- a/plugin/collector/pulse-collector-facter/facter/facter_test.go +++ b/plugin/collector/pulse-collector-facter/facter/facter_test.go @@ -134,7 +134,7 @@ func TestFacterInvalidBehavior(t *testing.T) { ) So(err, ShouldNotBeNil) - _, err = f.GetMetricTypes(plugin.PluginConfigType{Data: cdata.NewNode()}) + _, err = f.GetMetricTypes(plugin.PluginConfigType{ConfigDataNode: cdata.NewNode()}) So(err, ShouldNotBeNil) }) Convey("returns not as much values as asked", t, func() { @@ -166,7 +166,7 @@ func TestFacterGetMetricsTypes(t *testing.T) { Convey("GetMetricsTypes returns no error", func() { // exectues without error - metricTypes, err := f.GetMetricTypes(plugin.PluginConfigType{Data: cdata.NewNode()}) + metricTypes, err := f.GetMetricTypes(plugin.PluginConfigType{ConfigDataNode: cdata.NewNode()}) So(err, ShouldBeNil) Convey("metricTypesReply should contain more than zero metrics", func() { So(metricTypes, ShouldNotBeEmpty) diff --git a/plugin/collector/pulse-collector-perfevents/perfevents/perfevents_test.go b/plugin/collector/pulse-collector-perfevents/perfevents/perfevents_test.go index 41013f4ed..80addeb9b 100644 --- a/plugin/collector/pulse-collector-perfevents/perfevents/perfevents_test.go +++ b/plugin/collector/pulse-collector-perfevents/perfevents/perfevents_test.go @@ -52,7 +52,7 @@ func TestPluginLoads(t *testing.T) { p := NewPerfevents() Convey("invalid init", func() { p.Init = func() error { return errors.New("error") } - _, err := p.GetMetricTypes(plugin.PluginConfigType{Data: cdata.NewNode()}) + _, err := p.GetMetricTypes(plugin.PluginConfigType{cdata.NewNode()}) So(err, ShouldNotBeNil) }) Convey("set_supported_metrics", func() { diff --git a/plugin/collector/pulse-collector-psutil/psutil/psutil_integration_test.go b/plugin/collector/pulse-collector-psutil/psutil/psutil_integration_test.go index e50dc616f..b1a221752 100644 --- a/plugin/collector/pulse-collector-psutil/psutil/psutil_integration_test.go +++ b/plugin/collector/pulse-collector-psutil/psutil/psutil_integration_test.go @@ -57,7 +57,7 @@ func TestPsutilCollectMetrics(t *testing.T) { So(metrics, ShouldNotBeNil) }) Convey("get metric types", func() { - mts, err := p.GetMetricTypes(plugin.PluginConfigType{Data: cdata.NewNode()}) + mts, err := p.GetMetricTypes(plugin.PluginConfigType{cdata.NewNode()}) //prettyPrint(mts) So(err, ShouldBeNil) So(mts, ShouldNotBeNil) diff --git a/pulse.go b/pulse.go index 661148f40..1eac50f7e 100644 --- a/pulse.go +++ b/pulse.go @@ -20,6 +20,7 @@ limitations under the License. package main import ( + "encoding/json" "fmt" "io/ioutil" "os" @@ -97,18 +98,21 @@ var ( EnvVar: "PULSE_CACHE_EXPIRATION", Value: "500ms", } - + flConfig = cli.StringFlag{ + Name: "config", + Usage: "A path to a config file", + } flRestHttps = cli.BoolFlag{ Name: "rest-https", Usage: "start Pulse's API as https", } flRestCert = cli.StringFlag{ Name: "rest-cert", - Usage: "a path to a certificate to use for HTTPS deployment of Pulse's REST API.", + Usage: "A path to a certificate to use for HTTPS deployment of Pulse's REST API", } flRestKey = cli.StringFlag{ Name: "rest-key", - Usage: "a path to a key file to use for HTTPS deployment of Pulse's REST API.", + Usage: "A path to a key file to use for HTTPS deployment of Pulse's REST API", } gitversion string @@ -158,6 +162,7 @@ func main() { flPluginTrust, flKeyringFile, flRestCert, + flConfig, flRestHttps, flRestKey, } @@ -201,7 +206,7 @@ func action(ctx *cli.Context) { if err != nil { log.Fatal(fmt.Sprintf("invalid cache-expiration format: %s", cachestr)) } - + config := ctx.String("config") restHttps := ctx.Bool("rest-https") restKey := ctx.String("rest-key") restCert := ctx.String("rest-cert") @@ -259,13 +264,32 @@ func action(ctx *cli.Context) { } log.Info("setting log path to: stdout") - coreModules = []coreModule{} - - c := control.New( + controlOpts := []control.ControlOpt{ control.MaxRunningPlugins(maxRunning), control.CacheExpiration(cache), + } + + if config != "" { + b, err := ioutil.ReadFile(config) + if err != nil { + log.WithFields(log.Fields{ + "block": "main", + "_module": "pulsed", + "error": err.Error(), + "path": config, + }).Fatal("unable to read config") + } + cfg := control.NewConfig() + err = json.Unmarshal(b, &cfg) + controlOpts = append(controlOpts, control.OptSetConfig(cfg)) + } + + c := control.New( + controlOpts..., ) + coreModules = []coreModule{} + coreModules = append(coreModules, c) s := scheduler.New( scheduler.CollectQSizeOption(defaultQueueSize),