Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Improves reliability of tests
Browse files Browse the repository at this point in the history
Adds more event handling to determine when plugins are loaded.
  • Loading branch information
jcooklin committed Oct 1, 2015
1 parent 002be38 commit ef06e67
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 51 deletions.
86 changes: 51 additions & 35 deletions control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"testing"
"time"

. "github.com/smartystreets/goconvey/convey"

"github.com/intelsdi-x/gomit"
"github.com/intelsdi-x/pulse/control/plugin"
"github.com/intelsdi-x/pulse/control/plugin/cpolicy"
Expand All @@ -19,8 +21,6 @@ import (
"github.com/intelsdi-x/pulse/core/control_event"
"github.com/intelsdi-x/pulse/core/ctypes"
"github.com/intelsdi-x/pulse/core/perror"

. "github.com/smartystreets/goconvey/convey"
)

// Mock Executor used to test
Expand Down Expand Up @@ -76,7 +76,7 @@ func TestSwapPlugin(t *testing.T) {
if PulsePath != "" {
Convey("SwapPlugin", t, func() {
c := New()
lpe := new(listenToPluginEvent)
lpe := newListenToPluginEvent()
c.eventManager.RegisterHandler("Control.PluginsSwapped", lpe)
c.Start()
_, e := c.Load(PluginPath)
Expand Down Expand Up @@ -143,29 +143,33 @@ type listenToPluginEvent struct {
done chan struct{}
}

func newListenToPluginEvent() *listenToPluginEvent {
return &listenToPluginEvent{
done: make(chan struct{}),
plugin: &mockPluginEvent{},
}
}

func (l *listenToPluginEvent) HandleGomitEvent(e gomit.Event) {
switch v := e.Body.(type) {
case *control_event.LoadPluginEvent:
l.plugin = &mockPluginEvent{
LoadedPluginName: v.Name,
LoadedPluginVersion: v.Version,
PluginType: v.Type,
}
l.plugin.LoadedPluginName = v.Name
l.plugin.LoadedPluginVersion = v.Version
l.plugin.PluginType = v.Type
l.done <- struct{}{}
case *control_event.UnloadPluginEvent:
l.plugin = &mockPluginEvent{
UnloadedPluginName: v.Name,
UnloadedPluginVersion: v.Version,
PluginType: v.Type,
}
l.plugin.UnloadedPluginName = v.Name
l.plugin.UnloadedPluginVersion = v.Version
l.plugin.PluginType = v.Type
l.done <- struct{}{}
case *control_event.SwapPluginsEvent:
l.plugin = &mockPluginEvent{
LoadedPluginName: v.LoadedPluginName,
LoadedPluginVersion: v.LoadedPluginVersion,
UnloadedPluginName: v.UnloadedPluginName,
UnloadedPluginVersion: v.UnloadedPluginVersion,
PluginType: v.PluginType,
}
l.plugin.LoadedPluginName = v.LoadedPluginName
l.plugin.LoadedPluginVersion = v.LoadedPluginVersion
l.plugin.UnloadedPluginName = v.UnloadedPluginName
l.plugin.UnloadedPluginVersion = v.UnloadedPluginVersion
l.plugin.PluginType = v.PluginType
case *control_event.PluginSubscriptionEvent:
l.done <- struct{}{}
default:
fmt.Println("Got an event you're not handling")
}
Expand All @@ -192,7 +196,7 @@ func TestLoad(t *testing.T) {

Convey("loads successfully", func() {
c := New()
lpe := new(listenToPluginEvent)
lpe := newListenToPluginEvent()
c.eventManager.RegisterHandler("Control.PluginLoaded", lpe)
c.Start()
_, err := c.Load(PluginPath)
Expand Down Expand Up @@ -234,7 +238,7 @@ func TestLoad(t *testing.T) {
c := New()
c.pluginTrust = 1
c.signingManager = &mocksigningManager{signed: true}
lpe := new(listenToPluginEvent)
lpe := newListenToPluginEvent()
c.eventManager.RegisterHandler("Control.PluginLoaded", lpe)
c.Start()
_, err := c.Load(PluginPath)
Expand All @@ -246,7 +250,7 @@ func TestLoad(t *testing.T) {
c := New()
c.pluginTrust = 2
c.signingManager = &mocksigningManager{signed: false}
lpe := new(listenToPluginEvent)
lpe := newListenToPluginEvent()
c.eventManager.RegisterHandler("Control.PluginLoaded", lpe)
c.Start()
_, err := c.Load(PluginPath)
Expand All @@ -258,7 +262,7 @@ func TestLoad(t *testing.T) {
c := New()
c.pluginTrust = 1
c.signingManager = &mocksigningManager{signed: false}
lpe := new(listenToPluginEvent)
lpe := newListenToPluginEvent()
c.eventManager.RegisterHandler("Control.PluginLoaded", lpe)
c.Start()
_, err := c.Load(PluginPath)
Expand All @@ -279,21 +283,22 @@ func TestUnload(t *testing.T) {
Convey("pluginControl.Unload", t, func() {
Convey("unloads successfully", func() {
c := New()
lpe := new(listenToPluginEvent)
done := make(chan struct{})
lpe.done = done
c.eventManager.RegisterHandler("Control.PluginUnloaded", lpe)
lpe := newListenToPluginEvent()
c.eventManager.RegisterHandler("TestUnload", lpe)
c.Start()
time.Sleep(100 * time.Millisecond)
_, err := c.Load(PluginPath)
<-lpe.done

So(c.pluginManager.all(), ShouldNotBeEmpty)
So(err, ShouldBeNil)

pc := c.PluginCatalog()

So(len(pc), ShouldEqual, 1)
So(pc[0].Name(), ShouldEqual, "dummy1")
_, err2 := c.Unload(pc[0])
<-done
<-lpe.done
So(err2, ShouldBeNil)
So(lpe.plugin.UnloadedPluginName, ShouldEqual, "dummy1")
So(lpe.plugin.UnloadedPluginVersion, ShouldEqual, 1)
Expand All @@ -318,15 +323,15 @@ func TestUnload(t *testing.T) {
})
Convey("Listen for PluginUnloaded event", func() {
c := New()
lpe := new(listenToPluginEvent)
done := make(chan struct{})
lpe.done = done
lpe := newListenToPluginEvent()
c.eventManager.RegisterHandler("Control.PluginUnloaded", lpe)
c.Start()
time.Sleep(100 * time.Millisecond)
c.Load(PluginPath)
<-lpe.done
pc := c.PluginCatalog()
c.Unload(pc[0])
<-done
<-lpe.done
So(lpe.plugin.UnloadedPluginName, ShouldEqual, "dummy1")
})

Expand Down Expand Up @@ -636,6 +641,7 @@ func TestCollectMetrics(t *testing.T) {
c := New()
c.pluginRunner.(*runner).monitor.duration = time.Millisecond * 100
c.Start()
time.Sleep(100 * time.Millisecond)
// Load plugin
c.Load(PluginPath)

Expand Down Expand Up @@ -725,11 +731,15 @@ func TestPublishMetrics(t *testing.T) {

// Create controller
c := New()
lpe := newListenToPluginEvent()
c.eventManager.RegisterHandler("TestPublishMetrics", lpe)
c.pluginRunner.(*runner).monitor.duration = time.Millisecond * 100
c.Start()
time.Sleep(1 * time.Second)

// Load plugin
_, err := c.Load(path.Join(PulsePath, "plugin", "pulse-publisher-file"))
<-lpe.done
So(err, ShouldBeNil)
So(len(c.pluginManager.all()), ShouldEqual, 1)
lp, err2 := c.pluginManager.get("publisher:file:1")
Expand All @@ -751,7 +761,8 @@ func TestPublishMetrics(t *testing.T) {
pool.subscribe(1, unboundSubscriptionType)
errs := c.sendPluginSubscriptionEvent(1, p)
So(errs, ShouldBeNil)
time.Sleep(1 * time.Second)
<-lpe.done
time.Sleep(1500 * time.Millisecond)

Convey("Publish to file", func() {
metrics := []plugin.PluginMetricType{
Expand All @@ -778,11 +789,15 @@ func TestProcessMetrics(t *testing.T) {

// Create controller
c := New()
lpe := newListenToPluginEvent()
c.eventManager.RegisterHandler("TestProcessMetrics", lpe)
c.pluginRunner.(*runner).monitor.duration = time.Millisecond * 100
c.Start()
time.Sleep(1 * time.Second)

// Load plugin
_, err := c.Load(path.Join(PulsePath, "plugin", "pulse-processor-passthru"))
<-lpe.done
So(err, ShouldBeNil)
So(len(c.pluginManager.all()), ShouldEqual, 1)
lp, err2 := c.pluginManager.get("processor:passthru:1")
Expand All @@ -803,7 +818,8 @@ func TestProcessMetrics(t *testing.T) {
pool.subscribe(1, unboundSubscriptionType)
errs := c.sendPluginSubscriptionEvent(1, p)
So(errs, ShouldBeNil)
time.Sleep(1 * time.Second)
<-lpe.done
time.Sleep(1500 * time.Millisecond)

Convey("process metrics", func() {
metrics := []plugin.PluginMetricType{
Expand Down
28 changes: 19 additions & 9 deletions mgmt/rest/client/client_func_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io/ioutil"
"os"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -561,6 +562,7 @@ func TestPulseClient(t *testing.T) {
})
Convey("WatchTasks", func() {
Convey("event stream", func() {
rest.StreamingBufferWindow = 0.01
port := getPort()
uri := startAPI(port)
c := New(uri, "v1")
Expand All @@ -569,21 +571,27 @@ func TestPulseClient(t *testing.T) {
c.LoadPlugin(FILE_PLUGIN_PATH)

wf := getWMFromSample("1.json")
sch := &Schedule{Type: "simple", Interval: "100ms"}
sch := &Schedule{Type: "simple", Interval: "500ms"}
p := c.CreateTask(sch, wf, "baron", false)

a := make([]string, 0)
type ea struct {
events []string
sync.Mutex
}

a := new(ea)
r := c.WatchTask(uint(p.ID))
time.Sleep(time.Millisecond * 100)
wait := make(chan struct{})
go func() {
for {
select {
case e := <-r.EventChan:
a = append(a, e.EventType)
if len(a) == 10 {
a.Lock()
a.events = append(a.events, e.EventType)
if len(a.events) == 10 {
r.Close()
}
a.Unlock()
case <-r.DoneChan:
close(wait)
return
Expand All @@ -593,11 +601,13 @@ func TestPulseClient(t *testing.T) {
c.StopTask(p.ID)
c.StartTask(p.ID)
<-wait
So(len(a), ShouldEqual, 10)
So(a[0], ShouldEqual, "task-stopped")
So(a[1], ShouldEqual, "task-started")
a.Lock()
So(len(a.events), ShouldEqual, 10)
a.Unlock()
So(a.events[0], ShouldEqual, "task-stopped")
So(a.events[1], ShouldEqual, "task-started")
for x := 2; x <= 9; x++ {
So(a[x], ShouldEqual, "metric-event")
So(a.events[x], ShouldEqual, "metric-event")
}
})

Expand Down
23 changes: 16 additions & 7 deletions mgmt/rest/rest_func_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"net/http"
"os"
"path/filepath"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -1025,7 +1026,7 @@ func TestPluginRestCalls(t *testing.T) {
uploadPlugin(FILE_PLUGIN_PATH, port)
uploadPlugin(PSUTIL_PLUGIN_PATH, port)

r1 := createTask("1.json", "xenu", "100ms", true, port)
r1 := createTask("1.json", "xenu", "500ms", true, port)
So(r1.Body, ShouldHaveSameTypeAs, new(rbody.AddScheduledTask))
plr1 := r1.Body.(*rbody.AddScheduledTask)

Expand All @@ -1036,16 +1037,22 @@ func TestPluginRestCalls(t *testing.T) {
r := watchTask(id, port)
time.Sleep(time.Millisecond * 100)
startTask(id, port)
var events []string
type ea struct {
events []string
sync.Mutex
}
a := new(ea)
wait := make(chan struct{})
go func() {
for {
select {
case e := <-r.eventChan:
events = append(events, e)
if len(events) == 10 {
a.Lock()
a.events = append(a.events, e)
if len(a.events) == 10 {
r.close()
}
a.Unlock()
case <-r.doneChan:
close(wait)
return
Expand All @@ -1054,10 +1061,12 @@ func TestPluginRestCalls(t *testing.T) {
}()
<-wait
stopTask(id, port)
So(len(events), ShouldEqual, 10)
So(events[0], ShouldEqual, "task-started")
a.Lock()
So(len(a.events), ShouldEqual, 10)
a.Unlock()
So(a.events[0], ShouldEqual, "task-started")
for x := 1; x <= 9; x++ {
So(events[x], ShouldEqual, "metric-event")
So(a.events[x], ShouldEqual, "metric-event")
}
})
})
Expand Down

0 comments on commit ef06e67

Please sign in to comment.