Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Fixes #820 - Task fails when metrics come from different versions of …
Browse files Browse the repository at this point in the history
…the same plugin
  • Loading branch information
jcooklin committed Mar 30, 2016
1 parent 2c573a1 commit e9f052e
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 32 deletions.
13 changes: 13 additions & 0 deletions control/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,19 @@ func GetDefaultConfig() *Config {
}
}

// NewPluginsConfig returns a map of *pluginConfigItems where the key is the plugin name.
func NewPluginsConfig() map[string]*pluginConfigItem {
return map[string]*pluginConfigItem{}
}

// NewPluginConfigItem returns a *pluginConfigItem.
func NewPluginConfigItem() *pluginConfigItem {
return &pluginConfigItem{
cdata.NewNode(),
map[int]*cdata.ConfigDataNode{},
}
}

func newPluginTypeConfigItem() *pluginTypeConfigItem {
return &pluginTypeConfigItem{
make(map[string]*pluginConfigItem),
Expand Down
14 changes: 4 additions & 10 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,8 @@ func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]core.Plugin, []se
// types.
colPlugins := make(map[string]*loadedPlugin)
for _, mt := range mts {
// If the version provided is <1 we will get the latest
// plugin for the given metric.
m, err := p.metricCatalog.Get(mt.Namespace(), mt.Version())
if err != nil {
serrs = append(serrs, serror.New(err, map[string]interface{}{
Expand All @@ -585,16 +587,8 @@ func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]core.Plugin, []se
}))
continue
}
// if the metric subscription is to version -1, we need to carry
// that forward in the subscription.
if mt.Version() < 1 {
// make a copy of the loadedPlugin and overwrite the version.
npl := *m.Plugin
npl.Meta.Version = -1
colPlugins[npl.Key()] = &npl
} else {
colPlugins[m.Plugin.Key()] = m.Plugin
}

colPlugins[m.Plugin.Key()] = m.Plugin
}
if len(serrs) > 0 {
return plugins, serrs
Expand Down
77 changes: 55 additions & 22 deletions scheduler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import (
"github.com/intelsdi-x/snap/core"
"github.com/intelsdi-x/snap/core/cdata"
"github.com/intelsdi-x/snap/core/control_event"
"github.com/intelsdi-x/snap/core/ctypes"
"github.com/intelsdi-x/snap/core/scheduler_event"
"github.com/intelsdi-x/snap/pkg/promise"
"github.com/intelsdi-x/snap/pkg/schedule"
"github.com/intelsdi-x/snap/scheduler/wmap"
Expand All @@ -43,9 +45,12 @@ import (

var (
SnapPath = os.Getenv("SNAP_PATH")
snap_collector_mock1_path = path.Join(SnapPath, "plugin", "snap-collector-mock1")
snap_collector_mock2_path = path.Join(SnapPath, "plugin", "snap-collector-mock2")
snap_processor_passthru_path = path.Join(SnapPath, "plugin", "snap-processor-passthru")
snap_publisher_file_path = path.Join(SnapPath, "plugin", "snap-publisher-file")

metricsToCollect = 3
)

type MockMetricType struct {
Expand All @@ -61,22 +66,35 @@ type mockPluginEvent struct {
EventNamespace string
}

type listenToPluginEvent struct {
plugin *mockPluginEvent
done chan struct{}
type eventListener struct {
plugin *mockPluginEvent
metricCollectCount int
MetricCollectFailureCount int
metricsCollectionDone chan bool
done chan struct{}
}

func newListenToPluginEvent() *listenToPluginEvent {
return &listenToPluginEvent{
func newEventListener() *eventListener {
return &eventListener{
done: make(chan struct{}),
}
}

func (l *listenToPluginEvent) HandleGomitEvent(e gomit.Event) {
func (l *eventListener) HandleGomitEvent(e gomit.Event) {
go func() {
switch e.Body.(type) {
case *control_event.LoadPluginEvent:
l.done <- struct{}{}
case *scheduler_event.MetricCollectedEvent:
if l.metricCollectCount > metricsToCollect {
l.done <- struct{}{}
}
l.metricCollectCount++
case *scheduler_event.MetricCollectionFailedEvent:
if l.MetricCollectFailureCount > 1 {
l.done <- struct{}{}
}
l.MetricCollectFailureCount++
default:
}
}()
Expand Down Expand Up @@ -105,8 +123,13 @@ func (m MockMetricType) Data() interface{} {
func TestCollectPublishWorkflow(t *testing.T) {
log.SetLevel(log.FatalLevel)
Convey("Given a started plugin control", t, func() {

c := control.New(control.GetDefaultConfig())
cfg := control.GetDefaultConfig()
cfg.Plugins.Collector.Plugins = control.NewPluginsConfig()
cfg.Plugins.Collector.Plugins["mock"] = control.NewPluginConfigItem()
cfg.Plugins.Collector.Plugins["mock"].Versions = map[int]*cdata.ConfigDataNode{}
cfg.Plugins.Collector.Plugins["mock"].Versions[1] = cdata.NewNode()
cfg.Plugins.Collector.Plugins["mock"].Versions[1].AddItem("test", ctypes.ConfigValueBool{Value: true})
c := control.New(cfg)
c.Start()
s := New(GetDefaultConfig())
s.SetMetricManager(c)
Expand All @@ -123,35 +146,43 @@ func TestCollectPublishWorkflow(t *testing.T) {
So(err, ShouldBeNil)
_, err = c.Load(rp3)
So(err, ShouldBeNil)
time.Sleep(100 * time.Millisecond)
rp4, err := core.NewRequestedPlugin(snap_collector_mock1_path)
So(err, ShouldBeNil)
_, err = c.Load(rp4)
So(err, ShouldBeNil)

metrics, err2 := c.MetricCatalog()
So(err2, ShouldBeNil)
So(metrics, ShouldNotBeEmpty)

// The following two metrics will result in both versions (1 and 2) of
// the mock plugin to be used. '/intel/mock/test' will be coming from
// mock version 1 due to the global config above.
w := wmap.NewWorkflowMap()
w.CollectNode.AddMetric("/intel/mock/foo", 2)
w.CollectNode.AddMetric("/intel/mock/foo", -1)
w.CollectNode.AddMetric("/intel/mock/test", -1)
w.CollectNode.AddConfigItem("/intel/mock/foo", "password", "secret")

pu := wmap.NewPublishNode("file", 3)
pu.AddConfigItem("file", "/tmp/snap-TestCollectPublishWorkflow.out")

pr := wmap.NewProcessNode("passthru", 1)
time.Sleep(100 * time.Millisecond)

pr.Add(pu)
w.CollectNode.Add(pr)

Convey("Start scheduler", func() {
err := s.Start()
So(err, ShouldBeNil)
Convey("Create task", func() {
t, err := s.CreateTask(schedule.NewSimpleSchedule(time.Millisecond*500), w, false)
Convey("Create and start task", func() {
el := newEventListener()
s.RegisterEventHandler("TestCollectPublishWorkflow", el)
t, err := s.CreateTask(schedule.NewSimpleSchedule(time.Millisecond*200), w, true)
So(err.Errors(), ShouldBeEmpty)
So(t, ShouldNotBeNil)
t.(*task).Spin()
time.Sleep(3 * time.Second)

<-el.done
So(t.LastFailureMessage(), ShouldBeEmpty)
So(t.FailedCount(), ShouldEqual, 0)
So(t.HitCount(), ShouldBeGreaterThan, metricsToCollect)
})
})
})
Expand All @@ -167,7 +198,7 @@ func TestProcessChainingWorkflow(t *testing.T) {
s := New(GetDefaultConfig())
s.SetMetricManager(c)
Convey("create a workflow with chained processors", func() {
lpe := newListenToPluginEvent()
lpe := newEventListener()
c.RegisterEventHandler("Control.PluginLoaded", lpe)
rp, err := core.NewRequestedPlugin(snap_collector_mock2_path)
So(err, ShouldBeNil)
Expand Down Expand Up @@ -208,12 +239,14 @@ func TestProcessChainingWorkflow(t *testing.T) {
err := s.Start()
So(err, ShouldBeNil)
Convey("Create task", func() {
t, err := s.CreateTask(schedule.NewSimpleSchedule(time.Millisecond*500), w, false)
t, err := s.CreateTask(schedule.NewSimpleSchedule(time.Millisecond*200), w, true)
s.RegisterEventHandler("TestProcessChainingWorkflow", lpe)
So(err.Errors(), ShouldBeEmpty)
So(t, ShouldNotBeNil)
t.(*task).Spin()
time.Sleep(3 * time.Second)

<-lpe.done
So(t.LastFailureMessage(), ShouldBeEmpty)
So(t.FailedCount(), ShouldEqual, 0)
So(t.HitCount(), ShouldBeGreaterThan, metricsToCollect)
})
})
})
Expand Down

0 comments on commit e9f052e

Please sign in to comment.