Skip to content

Commit

Permalink
Fixes intelsdi-x#799: Exposes collector plugin defaults to the framework
Browse files Browse the repository at this point in the history
  • Loading branch information
tiffanyfay committed May 30, 2016
1 parent 055d948 commit 309d917
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 12 deletions.
34 changes: 31 additions & 3 deletions control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -807,7 +807,7 @@ func TestMetricConfig(t *testing.T) {
c.Start()
lpe := newListenToPluginEvent()
c.eventManager.RegisterHandler("Control.PluginLoaded", lpe)
_, err := load(c, JSONRPCPluginPath)
_, err := load(c, PluginPath)
So(err, ShouldBeNil)
<-lpe.done
cd := cdata.NewNode()
Expand Down Expand Up @@ -838,6 +838,32 @@ func TestMetricConfig(t *testing.T) {
time.Sleep(100 * time.Millisecond)
})

Convey("config provided by defaults", t, func() {
c := New(GetDefaultConfig())
c.Start()
lpe := newListenToPluginEvent()
c.eventManager.RegisterHandler("Control.PluginLoaded", lpe)
_, err := load(c, JSONRPCPluginPath)
So(err, ShouldBeNil)
<-lpe.done
cd := cdata.NewNode()
m1 := MockMetricType{
namespace: core.NewNamespace("intel", "mock", "foo"),
}

Convey("So metric should be valid with config", func() {
errs := c.validateMetricTypeSubscription(m1, cd)
So(errs, ShouldBeNil)
})
c.Config.Plugins.All.AddItem("name", ctypes.ConfigValueStr{Value: "jane"})
Convey("So mock should have name: bob config from defaults", func() {
So(c.Config.Plugins.pluginCache["0:mock:1"].Table()["name"], ShouldResemble, ctypes.ConfigValueStr{Value: "bob"})
})

c.Stop()
time.Sleep(100 * time.Millisecond)
})

Convey("nil config provided by task", t, func() {
config := GetDefaultConfig()
config.Plugins.All.AddItem("password", ctypes.ConfigValueStr{Value: "testval"})
Expand Down Expand Up @@ -1284,6 +1310,8 @@ func TestCollectMetrics(t *testing.T) {
for i := range cr {
So(cr[i].Data(), ShouldContainSubstring, "The mock collected data!")
So(cr[i].Data(), ShouldContainSubstring, "test=true")
So(cr[i].Data(), ShouldContainSubstring, "name={bob}")
So(cr[i].Data(), ShouldContainSubstring, "password={testval}")
}
}
ap := c.AvailablePlugins()
Expand Down Expand Up @@ -1510,7 +1538,7 @@ func TestPublishMetrics(t *testing.T) {

Convey("Subscribe to file publisher with good config", func() {
n := cdata.NewNode()
c.Config.Plugins.Publisher.Plugins[lp.Name()] = newPluginConfigItem(optAddPluginConfigItem("file", ctypes.ConfigValueStr{Value: "/tmp/snap-TestPublishMetrics.out"}))
c.Config.Plugins.Publisher.Plugins["file"] = newPluginConfigItem(optAddPluginConfigItem("file", ctypes.ConfigValueStr{Value: "/tmp/snap-TestPublishMetrics.out"}))
pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("publisher:file:3")
So(errp, ShouldBeNil)
pool.Subscribe("1", strategy.UnboundSubscriptionType)
Expand All @@ -1527,7 +1555,7 @@ func TestPublishMetrics(t *testing.T) {
enc.Encode(metrics)
contentType := plugin.SnapGOBContentType
errs := c.PublishMetrics(contentType, buf.Bytes(), "file", 3, n.Table(), uuid.New())
So(errs, ShouldBeNil)
So(errs, ShouldBeEmpty)
ap := c.AvailablePlugins()
So(ap, ShouldNotBeEmpty)
})
Expand Down
37 changes: 36 additions & 1 deletion control/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,10 +355,45 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter
lPlugin.ConfigPolicy = cp

if resp.Type == plugin.CollectorPluginType {
cfgNode := p.pluginConfig.getPluginConfigDataNode(core.PluginType(resp.Type), resp.Meta.Name, resp.Meta.Version)

if lPlugin.ConfigPolicy != nil {
// Get config policy from root/plugin defaults
ncd := lPlugin.ConfigPolicy.Get([]string{""})
_, errs := ncd.Process(cfgNode.Table())
if errs != nil && errs.HasErrors() {
fields := make(map[string]interface{})
se := serror.New(fmt.Errorf("ConfigPolicy processing failed. Not loading collector:%v:%v", resp.Meta.Name, resp.Meta.Version))
for i, e := range errs.Errors() {
idx := fmt.Sprintf("%d", i)
fields[idx] = e.Error()
}
fields["name"] = resp.Meta.Name
fields["version"] = resp.Meta.Version
se.SetFields(fields)
return nil, se
}

// Update config policy with defaults
cp, err = c.GetConfigPolicy()
if err != nil {
pmLogger.WithFields(log.Fields{
"_block": "load-plugin",
"plugin-type": "collector",
"error": err.Error(),
"plugin-name": ap.Name(),
"plugin-version": ap.Version(),
"plugin-id": ap.ID(),
}).Error("error in getting config policy")
return nil, serror.New(err)
}
lPlugin.ConfigPolicy = cp
}

colClient := ap.client.(client.PluginCollectorClient)

cfg := plugin.ConfigType{
ConfigDataNode: p.pluginConfig.getPluginConfigDataNode(core.PluginType(resp.Type), resp.Meta.Name, resp.Meta.Version),
ConfigDataNode: cfgNode,
}

metricTypes, err := colClient.GetMetricTypes(cfg)
Expand Down
8 changes: 5 additions & 3 deletions docs/PLUGIN_AUTHORING.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,20 +95,22 @@ github.com/intelsdi-x/snap/control/plugin/cpolicy
github.com/intelsdi-x/snap/core/ctypes
```
### Writing a collector plugin
A Snap collector plugin collects telemetry data by communicating with the Snap daemon. To confine to collector plugin interfaces and metric types defined in Snap, a collector plugin must implement the following methods:
A Snap collector plugin collects telemetry data by communicating with the Snap daemon. To confine to collector plugin interfaces and metric types defined in Snap, a collector plugin must implement the following methods:
```
GetConfigPolicy() (*cpolicy.ConfigPolicy, error)
CollectMetrics([]MetricType) ([]MetricType, error)
GetMetricTypes(ConfigType) ([]MetricType, error)
```
For creating a default plugin level config policy, the rule needs to be added at the root, i.e. at `[]string{""}`. This will make it so a config file doesn't need to be passed in for these rules. An example use case would be for the URL the Apache Collector collects from.

### Writing a processor plugin
A Snap processor plugin allows filtering, aggregation, transformation, etc of collected telemetry data. To complaint with processor plugin interfaces defined in Snap, a processor plugin must implement the following methods:
A Snap processor plugin allows filtering, aggregation, transformation, etc of collected telemetry data. To complaint with processor plugin interfaces defined in Snap, a processor plugin must implement the following methods:
```
GetConfigPolicy() (*cpolicy.ConfigPolicy, error)
Process(contentType string, content []byte, config map[string]ctypes.ConfigValue) (string, []byte, error)
```
### Writing a publisher plugin
A Snap publisher plugin allows publishing processed telemetry data into a variety of systems, databases, and monitors through Snap metrics. To compliant with metric types and plugin interfaces defined in Snap, a publisher plugin must implement the following methods:
A Snap publisher plugin allows publishing processed telemetry data into a variety of systems, databases, and monitors through Snap metrics. To compliant with metric types and plugin interfaces defined in Snap, a publisher plugin must implement the following methods:
```
GetConfigPolicy() (*cpolicy.ConfigPolicy, error)
Publish(contentType string, content []byte, config map[string]ctypes.ConfigValue) error
Expand Down
8 changes: 3 additions & 5 deletions plugin/collector/snap-collector-mock1/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,9 @@ func (f *Mock) CollectMetrics(mts []plugin.MetricType) ([]plugin.MetricType, err
}
} else {
if cv, ok := p.Config().Table()["test"]; ok {
p.Data_ = fmt.Sprintf("The mock collected data! config data: user=%s password=%s test=%v", p.Config().Table()["user"], p.Config().Table()["password"], cv.(ctypes.ConfigValueBool).Value)
p.Data_ = fmt.Sprintf("The mock collected data! config data: name=%s password=%s test=%v", p.Config().Table()["name"], p.Config().Table()["password"], cv.(ctypes.ConfigValueBool).Value)
} else {
p.Data_ = fmt.Sprintf("The mock collected data! config data: user=%s password=%s", p.Config().Table()["user"], p.Config().Table()["password"])
p.Data_ = fmt.Sprintf("The mock collected data! config data: name=%s password=%s", p.Config().Table()["name"], p.Config().Table()["password"])
}
p.Timestamp_ = time.Now()
metrics = append(metrics, p)
Expand Down Expand Up @@ -99,11 +99,9 @@ func (f *Mock) GetMetricTypes(cfg plugin.ConfigType) ([]plugin.MetricType, error
func (f *Mock) GetConfigPolicy() (*cpolicy.ConfigPolicy, error) {
c := cpolicy.New()
rule, _ := cpolicy.NewStringRule("name", false, "bob")
rule2, _ := cpolicy.NewStringRule("password", true)
p := cpolicy.NewPolicyNode()
p.Add(rule)
p.Add(rule2)
c.Add([]string{"intel", "mock", "foo"}, p)
c.Add([]string{""}, p)
return c, nil
}

Expand Down

0 comments on commit 309d917

Please sign in to comment.