Skip to content

Commit

Permalink
Merge pull request intelsdi-x#805 from tjmcs/tb/processor-chaining
Browse files Browse the repository at this point in the history
Fixes intelsdi-x#785 (Adds ability to chain process plugins)
  • Loading branch information
pittma committed Mar 29, 2016
2 parents f4dbc8c + 28858f5 commit 2c573a1
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 7 deletions.
57 changes: 50 additions & 7 deletions scheduler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,16 +286,17 @@ func (p *processJob) Run() {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)

switch p.parentJob.Type() {
case collectJobType:
switch pt := p.parentJob.(type) {
case *collectorJob:
switch p.contentType {
case plugin.SnapGOBContentType:
metrics := make([]plugin.PluginMetricType, len(p.parentJob.(*collectorJob).metrics))
for i, m := range p.parentJob.(*collectorJob).metrics {
switch mt := m.(type) {
case plugin.PluginMetricType:
metrics := make([]plugin.PluginMetricType, len(pt.metrics))
for i, m := range pt.metrics {
if mt, ok := m.(plugin.PluginMetricType); ok {
metrics[i] = mt
default:
} else {
// TODO; add a log statement and return an error
// (instead of panicking)
panic("unsupported type")
}
}
Expand All @@ -318,6 +319,44 @@ func (p *processJob) Run() {
}
p.content = content
default:
// TODO; change log from Fatal to Error and return an error
// (instead of panicking)
log.WithFields(log.Fields{
"_module": "scheduler-job",
"block": "run",
"job-type": "processor",
"content-type": p.contentType,
"plugin-name": p.name,
"plugin-version": p.version,
"plugin-config": p.config,
}).Fatal("unsupported content type")
panic(fmt.Sprintf("unsupported content type. {plugin name: %s version: %v content-type: '%v'}", p.name, p.version, p.contentType))
}
case *processJob:
// TODO: Remove switch statement and rely on processor to catch errors in type
// (separation of concerns; remove content-type definition from the framework?)
switch p.contentType {
case plugin.SnapGOBContentType:
_, content, errs := p.processor.ProcessMetrics(p.contentType, pt.content, p.name, p.version, p.config, p.taskID)
if errs != nil {
for _, e := range errs {
log.WithFields(log.Fields{
"_module": "scheduler-job",
"block": "run",
"job-type": "processor",
"content-type": p.contentType,
"plugin-name": p.name,
"plugin-version": p.version,
"plugin-config": p.config,
"error": e.Error(),
}).Error("error with processor job")
}
p.AddErrors(errs...)
}
p.content = content
default:
// TODO; change log from Fatal to Error and return an error
// (instead of panicking)
log.WithFields(log.Fields{
"_module": "scheduler-job",
"block": "run",
Expand All @@ -330,6 +369,8 @@ func (p *processJob) Run() {
panic(fmt.Sprintf("unsupported content type. {plugin name: %s version: %v content-type: '%v'}", p.name, p.version, p.contentType))
}
default:
// TODO; change log from Fatal to Error and return an error
// (instead of panicking)
log.WithFields(log.Fields{
"_module": "scheduler-job",
"block": "run",
Expand Down Expand Up @@ -418,6 +459,8 @@ func (p *publisherJob) Run() {
panic(fmt.Sprintf("unsupported content type. {plugin name: %s version: %v content-type: '%v'}", p.name, p.version, p.contentType))
}
case processJobType:
// TODO: Remove switch statement and rely on publisher to catch errors in type
// (separation of concerns; remove content-type definition from the framework?)
switch p.contentType {
case plugin.SnapGOBContentType:
errs := p.publisher.PublishMetrics(p.contentType, p.parentJob.(*processJob).content, p.name, p.version, p.config, p.taskID)
Expand Down
94 changes: 94 additions & 0 deletions scheduler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ import (
"testing"
"time"

"github.com/intelsdi-x/gomit"
"github.com/intelsdi-x/snap/control"
"github.com/intelsdi-x/snap/core"
"github.com/intelsdi-x/snap/core/cdata"
"github.com/intelsdi-x/snap/core/control_event"
"github.com/intelsdi-x/snap/pkg/promise"
"github.com/intelsdi-x/snap/pkg/schedule"
"github.com/intelsdi-x/snap/scheduler/wmap"
Expand All @@ -50,6 +52,36 @@ type MockMetricType struct {
namespace []string
}

type mockPluginEvent struct {
LoadedPluginName string
LoadedPluginVersion int
UnloadedPluginName string
UnloadedPluginVersion int
PluginType int
EventNamespace string
}

type listenToPluginEvent struct {
plugin *mockPluginEvent
done chan struct{}
}

func newListenToPluginEvent() *listenToPluginEvent {
return &listenToPluginEvent{
done: make(chan struct{}),
}
}

func (l *listenToPluginEvent) HandleGomitEvent(e gomit.Event) {
go func() {
switch e.Body.(type) {
case *control_event.LoadPluginEvent:
l.done <- struct{}{}
default:
}
}()
}

func (m MockMetricType) Namespace() []string {
return m.namespace
}
Expand Down Expand Up @@ -126,6 +158,68 @@ func TestCollectPublishWorkflow(t *testing.T) {
})
}

func TestProcessChainingWorkflow(t *testing.T) {
log.SetLevel(log.FatalLevel)
Convey("Given a started plugin control", t, func() {

c := control.New(control.GetDefaultConfig())
c.Start()
s := New(GetDefaultConfig())
s.SetMetricManager(c)
Convey("create a workflow with chained processors", func() {
lpe := newListenToPluginEvent()
c.RegisterEventHandler("Control.PluginLoaded", lpe)
rp, err := core.NewRequestedPlugin(snap_collector_mock2_path)
So(err, ShouldBeNil)
_, err = c.Load(rp)
So(err, ShouldBeNil)
<-lpe.done
rp2, err := core.NewRequestedPlugin(snap_publisher_file_path)
So(err, ShouldBeNil)
_, err = c.Load(rp2)
So(err, ShouldBeNil)
<-lpe.done
rp3, err := core.NewRequestedPlugin(snap_processor_passthru_path)
So(err, ShouldBeNil)
_, err = c.Load(rp3)
So(err, ShouldBeNil)
<-lpe.done

metrics, err2 := c.MetricCatalog()
So(err2, ShouldBeNil)
So(metrics, ShouldNotBeEmpty)

w := wmap.NewWorkflowMap()
w.CollectNode.AddMetric("/intel/mock/foo", 2)
w.CollectNode.AddConfigItem("/intel/mock/foo", "password", "secret")

pu := wmap.NewPublishNode("file", 3)
pu.AddConfigItem("file", "/tmp/snap-TestCollectPublishWorkflow.out")

pr1 := wmap.NewProcessNode("passthru", 1)

pr2 := wmap.NewProcessNode("passthru", 1)

pr2.Add(pu)
pr1.Add(pr2)
w.CollectNode.Add(pr1)

Convey("Start scheduler", func() {
err := s.Start()
So(err, ShouldBeNil)
Convey("Create task", func() {
t, err := s.CreateTask(schedule.NewSimpleSchedule(time.Millisecond*500), w, false)
So(err.Errors(), ShouldBeEmpty)
So(t, ShouldNotBeNil)
t.(*task).Spin()
time.Sleep(3 * time.Second)

})
})
})
})
}

// The mocks below are here for testing work submission
type Mock1 struct {
sync.Mutex
Expand Down

0 comments on commit 2c573a1

Please sign in to comment.