Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Fixes #785 (adds ability to chain process plugins and the unit test t…
Browse files Browse the repository at this point in the history
…o prove that the code added actually works)
  • Loading branch information
Tom McSweeney committed Mar 25, 2016
1 parent 89e29ab commit 28858f5
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 7 deletions.
57 changes: 50 additions & 7 deletions scheduler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,16 +286,17 @@ func (p *processJob) Run() {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)

switch p.parentJob.Type() {
case collectJobType:
switch pt := p.parentJob.(type) {
case *collectorJob:
switch p.contentType {
case plugin.SnapGOBContentType:
metrics := make([]plugin.PluginMetricType, len(p.parentJob.(*collectorJob).metrics))
for i, m := range p.parentJob.(*collectorJob).metrics {
switch mt := m.(type) {
case plugin.PluginMetricType:
metrics := make([]plugin.PluginMetricType, len(pt.metrics))
for i, m := range pt.metrics {
if mt, ok := m.(plugin.PluginMetricType); ok {
metrics[i] = mt
default:
} else {
// TODO; add a log statement and return an error
// (instead of panicking)
panic("unsupported type")
}
}
Expand All @@ -318,6 +319,44 @@ func (p *processJob) Run() {
}
p.content = content
default:
// TODO; change log from Fatal to Error and return an error
// (instead of panicking)
log.WithFields(log.Fields{
"_module": "scheduler-job",
"block": "run",
"job-type": "processor",
"content-type": p.contentType,
"plugin-name": p.name,
"plugin-version": p.version,
"plugin-config": p.config,
}).Fatal("unsupported content type")
panic(fmt.Sprintf("unsupported content type. {plugin name: %s version: %v content-type: '%v'}", p.name, p.version, p.contentType))
}
case *processJob:
// TODO: Remove switch statement and rely on processor to catch errors in type
// (separation of concerns; remove content-type definition from the framework?)
switch p.contentType {
case plugin.SnapGOBContentType:
_, content, errs := p.processor.ProcessMetrics(p.contentType, pt.content, p.name, p.version, p.config, p.taskID)
if errs != nil {
for _, e := range errs {
log.WithFields(log.Fields{
"_module": "scheduler-job",
"block": "run",
"job-type": "processor",
"content-type": p.contentType,
"plugin-name": p.name,
"plugin-version": p.version,
"plugin-config": p.config,
"error": e.Error(),
}).Error("error with processor job")
}
p.AddErrors(errs...)
}
p.content = content
default:
// TODO; change log from Fatal to Error and return an error
// (instead of panicking)
log.WithFields(log.Fields{
"_module": "scheduler-job",
"block": "run",
Expand All @@ -330,6 +369,8 @@ func (p *processJob) Run() {
panic(fmt.Sprintf("unsupported content type. {plugin name: %s version: %v content-type: '%v'}", p.name, p.version, p.contentType))
}
default:
// TODO; change log from Fatal to Error and return an error
// (instead of panicking)
log.WithFields(log.Fields{
"_module": "scheduler-job",
"block": "run",
Expand Down Expand Up @@ -418,6 +459,8 @@ func (p *publisherJob) Run() {
panic(fmt.Sprintf("unsupported content type. {plugin name: %s version: %v content-type: '%v'}", p.name, p.version, p.contentType))
}
case processJobType:
// TODO: Remove switch statement and rely on publisher to catch errors in type
// (separation of concerns; remove content-type definition from the framework?)
switch p.contentType {
case plugin.SnapGOBContentType:
errs := p.publisher.PublishMetrics(p.contentType, p.parentJob.(*processJob).content, p.name, p.version, p.config, p.taskID)
Expand Down
94 changes: 94 additions & 0 deletions scheduler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ import (
"testing"
"time"

"github.com/intelsdi-x/gomit"
"github.com/intelsdi-x/snap/control"
"github.com/intelsdi-x/snap/core"
"github.com/intelsdi-x/snap/core/cdata"
"github.com/intelsdi-x/snap/core/control_event"
"github.com/intelsdi-x/snap/pkg/promise"
"github.com/intelsdi-x/snap/pkg/schedule"
"github.com/intelsdi-x/snap/scheduler/wmap"
Expand All @@ -50,6 +52,36 @@ type MockMetricType struct {
namespace []string
}

type mockPluginEvent struct {
LoadedPluginName string
LoadedPluginVersion int
UnloadedPluginName string
UnloadedPluginVersion int
PluginType int
EventNamespace string
}

type listenToPluginEvent struct {
plugin *mockPluginEvent
done chan struct{}
}

func newListenToPluginEvent() *listenToPluginEvent {
return &listenToPluginEvent{
done: make(chan struct{}),
}
}

func (l *listenToPluginEvent) HandleGomitEvent(e gomit.Event) {
go func() {
switch e.Body.(type) {
case *control_event.LoadPluginEvent:
l.done <- struct{}{}
default:
}
}()
}

func (m MockMetricType) Namespace() []string {
return m.namespace
}
Expand Down Expand Up @@ -126,6 +158,68 @@ func TestCollectPublishWorkflow(t *testing.T) {
})
}

func TestProcessChainingWorkflow(t *testing.T) {
log.SetLevel(log.FatalLevel)
Convey("Given a started plugin control", t, func() {

c := control.New(control.GetDefaultConfig())
c.Start()
s := New(GetDefaultConfig())
s.SetMetricManager(c)
Convey("create a workflow with chained processors", func() {
lpe := newListenToPluginEvent()
c.RegisterEventHandler("Control.PluginLoaded", lpe)
rp, err := core.NewRequestedPlugin(snap_collector_mock2_path)
So(err, ShouldBeNil)
_, err = c.Load(rp)
So(err, ShouldBeNil)
<-lpe.done
rp2, err := core.NewRequestedPlugin(snap_publisher_file_path)
So(err, ShouldBeNil)
_, err = c.Load(rp2)
So(err, ShouldBeNil)
<-lpe.done
rp3, err := core.NewRequestedPlugin(snap_processor_passthru_path)
So(err, ShouldBeNil)
_, err = c.Load(rp3)
So(err, ShouldBeNil)
<-lpe.done

metrics, err2 := c.MetricCatalog()
So(err2, ShouldBeNil)
So(metrics, ShouldNotBeEmpty)

w := wmap.NewWorkflowMap()
w.CollectNode.AddMetric("/intel/mock/foo", 2)
w.CollectNode.AddConfigItem("/intel/mock/foo", "password", "secret")

pu := wmap.NewPublishNode("file", 3)
pu.AddConfigItem("file", "/tmp/snap-TestCollectPublishWorkflow.out")

pr1 := wmap.NewProcessNode("passthru", 1)

pr2 := wmap.NewProcessNode("passthru", 1)

pr2.Add(pu)
pr1.Add(pr2)
w.CollectNode.Add(pr1)

Convey("Start scheduler", func() {
err := s.Start()
So(err, ShouldBeNil)
Convey("Create task", func() {
t, err := s.CreateTask(schedule.NewSimpleSchedule(time.Millisecond*500), w, false)
So(err.Errors(), ShouldBeEmpty)
So(t, ShouldNotBeNil)
t.(*task).Spin()
time.Sleep(3 * time.Second)

})
})
})
})
}

// The mocks below are here for testing work submission
type Mock1 struct {
sync.Mutex
Expand Down

0 comments on commit 28858f5

Please sign in to comment.