From ef06e67894183ff94d2ab43ff93bfb85624ee125 Mon Sep 17 00:00:00 2001 From: Joel Cooklin Date: Wed, 30 Sep 2015 19:33:44 -0700 Subject: [PATCH] Improves reliability of tests Adds more event handling to determine when plugins are loaded. --- control/control_test.go | 86 +++++++++++++++++----------- mgmt/rest/client/client_func_test.go | 28 ++++++--- mgmt/rest/rest_func_test.go | 23 +++++--- 3 files changed, 86 insertions(+), 51 deletions(-) diff --git a/control/control_test.go b/control/control_test.go index cf4ceeaa3..42feecb03 100644 --- a/control/control_test.go +++ b/control/control_test.go @@ -11,6 +11,8 @@ import ( "testing" "time" + . "github.com/smartystreets/goconvey/convey" + "github.com/intelsdi-x/gomit" "github.com/intelsdi-x/pulse/control/plugin" "github.com/intelsdi-x/pulse/control/plugin/cpolicy" @@ -19,8 +21,6 @@ import ( "github.com/intelsdi-x/pulse/core/control_event" "github.com/intelsdi-x/pulse/core/ctypes" "github.com/intelsdi-x/pulse/core/perror" - - . "github.com/smartystreets/goconvey/convey" ) // Mock Executor used to test @@ -76,7 +76,7 @@ func TestSwapPlugin(t *testing.T) { if PulsePath != "" { Convey("SwapPlugin", t, func() { c := New() - lpe := new(listenToPluginEvent) + lpe := newListenToPluginEvent() c.eventManager.RegisterHandler("Control.PluginsSwapped", lpe) c.Start() _, e := c.Load(PluginPath) @@ -143,29 +143,33 @@ type listenToPluginEvent struct { done chan struct{} } +func newListenToPluginEvent() *listenToPluginEvent { + return &listenToPluginEvent{ + done: make(chan struct{}), + plugin: &mockPluginEvent{}, + } +} + func (l *listenToPluginEvent) HandleGomitEvent(e gomit.Event) { switch v := e.Body.(type) { case *control_event.LoadPluginEvent: - l.plugin = &mockPluginEvent{ - LoadedPluginName: v.Name, - LoadedPluginVersion: v.Version, - PluginType: v.Type, - } + l.plugin.LoadedPluginName = v.Name + l.plugin.LoadedPluginVersion = v.Version + l.plugin.PluginType = v.Type + l.done <- struct{}{} case *control_event.UnloadPluginEvent: - l.plugin = &mockPluginEvent{ - UnloadedPluginName: v.Name, - UnloadedPluginVersion: v.Version, - PluginType: v.Type, - } + l.plugin.UnloadedPluginName = v.Name + l.plugin.UnloadedPluginVersion = v.Version + l.plugin.PluginType = v.Type l.done <- struct{}{} case *control_event.SwapPluginsEvent: - l.plugin = &mockPluginEvent{ - LoadedPluginName: v.LoadedPluginName, - LoadedPluginVersion: v.LoadedPluginVersion, - UnloadedPluginName: v.UnloadedPluginName, - UnloadedPluginVersion: v.UnloadedPluginVersion, - PluginType: v.PluginType, - } + l.plugin.LoadedPluginName = v.LoadedPluginName + l.plugin.LoadedPluginVersion = v.LoadedPluginVersion + l.plugin.UnloadedPluginName = v.UnloadedPluginName + l.plugin.UnloadedPluginVersion = v.UnloadedPluginVersion + l.plugin.PluginType = v.PluginType + case *control_event.PluginSubscriptionEvent: + l.done <- struct{}{} default: fmt.Println("Got an event you're not handling") } @@ -192,7 +196,7 @@ func TestLoad(t *testing.T) { Convey("loads successfully", func() { c := New() - lpe := new(listenToPluginEvent) + lpe := newListenToPluginEvent() c.eventManager.RegisterHandler("Control.PluginLoaded", lpe) c.Start() _, err := c.Load(PluginPath) @@ -234,7 +238,7 @@ func TestLoad(t *testing.T) { c := New() c.pluginTrust = 1 c.signingManager = &mocksigningManager{signed: true} - lpe := new(listenToPluginEvent) + lpe := newListenToPluginEvent() c.eventManager.RegisterHandler("Control.PluginLoaded", lpe) c.Start() _, err := c.Load(PluginPath) @@ -246,7 +250,7 @@ func TestLoad(t *testing.T) { c := New() c.pluginTrust = 2 c.signingManager = &mocksigningManager{signed: false} - lpe := new(listenToPluginEvent) + lpe := newListenToPluginEvent() c.eventManager.RegisterHandler("Control.PluginLoaded", lpe) c.Start() _, err := c.Load(PluginPath) @@ -258,7 +262,7 @@ func TestLoad(t *testing.T) { c := New() c.pluginTrust = 1 c.signingManager = &mocksigningManager{signed: false} - lpe := new(listenToPluginEvent) + lpe := newListenToPluginEvent() c.eventManager.RegisterHandler("Control.PluginLoaded", lpe) c.Start() _, err := c.Load(PluginPath) @@ -279,12 +283,12 @@ func TestUnload(t *testing.T) { Convey("pluginControl.Unload", t, func() { Convey("unloads successfully", func() { c := New() - lpe := new(listenToPluginEvent) - done := make(chan struct{}) - lpe.done = done - c.eventManager.RegisterHandler("Control.PluginUnloaded", lpe) + lpe := newListenToPluginEvent() + c.eventManager.RegisterHandler("TestUnload", lpe) c.Start() + time.Sleep(100 * time.Millisecond) _, err := c.Load(PluginPath) + <-lpe.done So(c.pluginManager.all(), ShouldNotBeEmpty) So(err, ShouldBeNil) @@ -292,8 +296,9 @@ func TestUnload(t *testing.T) { pc := c.PluginCatalog() So(len(pc), ShouldEqual, 1) + So(pc[0].Name(), ShouldEqual, "dummy1") _, err2 := c.Unload(pc[0]) - <-done + <-lpe.done So(err2, ShouldBeNil) So(lpe.plugin.UnloadedPluginName, ShouldEqual, "dummy1") So(lpe.plugin.UnloadedPluginVersion, ShouldEqual, 1) @@ -318,15 +323,15 @@ func TestUnload(t *testing.T) { }) Convey("Listen for PluginUnloaded event", func() { c := New() - lpe := new(listenToPluginEvent) - done := make(chan struct{}) - lpe.done = done + lpe := newListenToPluginEvent() c.eventManager.RegisterHandler("Control.PluginUnloaded", lpe) c.Start() + time.Sleep(100 * time.Millisecond) c.Load(PluginPath) + <-lpe.done pc := c.PluginCatalog() c.Unload(pc[0]) - <-done + <-lpe.done So(lpe.plugin.UnloadedPluginName, ShouldEqual, "dummy1") }) @@ -636,6 +641,7 @@ func TestCollectMetrics(t *testing.T) { c := New() c.pluginRunner.(*runner).monitor.duration = time.Millisecond * 100 c.Start() + time.Sleep(100 * time.Millisecond) // Load plugin c.Load(PluginPath) @@ -725,11 +731,15 @@ func TestPublishMetrics(t *testing.T) { // Create controller c := New() + lpe := newListenToPluginEvent() + c.eventManager.RegisterHandler("TestPublishMetrics", lpe) c.pluginRunner.(*runner).monitor.duration = time.Millisecond * 100 c.Start() + time.Sleep(1 * time.Second) // Load plugin _, err := c.Load(path.Join(PulsePath, "plugin", "pulse-publisher-file")) + <-lpe.done So(err, ShouldBeNil) So(len(c.pluginManager.all()), ShouldEqual, 1) lp, err2 := c.pluginManager.get("publisher:file:1") @@ -751,7 +761,8 @@ func TestPublishMetrics(t *testing.T) { pool.subscribe(1, unboundSubscriptionType) errs := c.sendPluginSubscriptionEvent(1, p) So(errs, ShouldBeNil) - time.Sleep(1 * time.Second) + <-lpe.done + time.Sleep(1500 * time.Millisecond) Convey("Publish to file", func() { metrics := []plugin.PluginMetricType{ @@ -778,11 +789,15 @@ func TestProcessMetrics(t *testing.T) { // Create controller c := New() + lpe := newListenToPluginEvent() + c.eventManager.RegisterHandler("TestProcessMetrics", lpe) c.pluginRunner.(*runner).monitor.duration = time.Millisecond * 100 c.Start() + time.Sleep(1 * time.Second) // Load plugin _, err := c.Load(path.Join(PulsePath, "plugin", "pulse-processor-passthru")) + <-lpe.done So(err, ShouldBeNil) So(len(c.pluginManager.all()), ShouldEqual, 1) lp, err2 := c.pluginManager.get("processor:passthru:1") @@ -803,7 +818,8 @@ func TestProcessMetrics(t *testing.T) { pool.subscribe(1, unboundSubscriptionType) errs := c.sendPluginSubscriptionEvent(1, p) So(errs, ShouldBeNil) - time.Sleep(1 * time.Second) + <-lpe.done + time.Sleep(1500 * time.Millisecond) Convey("process metrics", func() { metrics := []plugin.PluginMetricType{ diff --git a/mgmt/rest/client/client_func_test.go b/mgmt/rest/client/client_func_test.go index f1f43fc49..f9417c594 100644 --- a/mgmt/rest/client/client_func_test.go +++ b/mgmt/rest/client/client_func_test.go @@ -6,6 +6,7 @@ import ( "fmt" "io/ioutil" "os" + "sync" "testing" "time" @@ -561,6 +562,7 @@ func TestPulseClient(t *testing.T) { }) Convey("WatchTasks", func() { Convey("event stream", func() { + rest.StreamingBufferWindow = 0.01 port := getPort() uri := startAPI(port) c := New(uri, "v1") @@ -569,21 +571,27 @@ func TestPulseClient(t *testing.T) { c.LoadPlugin(FILE_PLUGIN_PATH) wf := getWMFromSample("1.json") - sch := &Schedule{Type: "simple", Interval: "100ms"} + sch := &Schedule{Type: "simple", Interval: "500ms"} p := c.CreateTask(sch, wf, "baron", false) - a := make([]string, 0) + type ea struct { + events []string + sync.Mutex + } + + a := new(ea) r := c.WatchTask(uint(p.ID)) - time.Sleep(time.Millisecond * 100) wait := make(chan struct{}) go func() { for { select { case e := <-r.EventChan: - a = append(a, e.EventType) - if len(a) == 10 { + a.Lock() + a.events = append(a.events, e.EventType) + if len(a.events) == 10 { r.Close() } + a.Unlock() case <-r.DoneChan: close(wait) return @@ -593,11 +601,13 @@ func TestPulseClient(t *testing.T) { c.StopTask(p.ID) c.StartTask(p.ID) <-wait - So(len(a), ShouldEqual, 10) - So(a[0], ShouldEqual, "task-stopped") - So(a[1], ShouldEqual, "task-started") + a.Lock() + So(len(a.events), ShouldEqual, 10) + a.Unlock() + So(a.events[0], ShouldEqual, "task-stopped") + So(a.events[1], ShouldEqual, "task-started") for x := 2; x <= 9; x++ { - So(a[x], ShouldEqual, "metric-event") + So(a.events[x], ShouldEqual, "metric-event") } }) diff --git a/mgmt/rest/rest_func_test.go b/mgmt/rest/rest_func_test.go index db9176375..e8ab71a3c 100644 --- a/mgmt/rest/rest_func_test.go +++ b/mgmt/rest/rest_func_test.go @@ -14,6 +14,7 @@ import ( "net/http" "os" "path/filepath" + "sync" "testing" "time" @@ -1025,7 +1026,7 @@ func TestPluginRestCalls(t *testing.T) { uploadPlugin(FILE_PLUGIN_PATH, port) uploadPlugin(PSUTIL_PLUGIN_PATH, port) - r1 := createTask("1.json", "xenu", "100ms", true, port) + r1 := createTask("1.json", "xenu", "500ms", true, port) So(r1.Body, ShouldHaveSameTypeAs, new(rbody.AddScheduledTask)) plr1 := r1.Body.(*rbody.AddScheduledTask) @@ -1036,16 +1037,22 @@ func TestPluginRestCalls(t *testing.T) { r := watchTask(id, port) time.Sleep(time.Millisecond * 100) startTask(id, port) - var events []string + type ea struct { + events []string + sync.Mutex + } + a := new(ea) wait := make(chan struct{}) go func() { for { select { case e := <-r.eventChan: - events = append(events, e) - if len(events) == 10 { + a.Lock() + a.events = append(a.events, e) + if len(a.events) == 10 { r.close() } + a.Unlock() case <-r.doneChan: close(wait) return @@ -1054,10 +1061,12 @@ func TestPluginRestCalls(t *testing.T) { }() <-wait stopTask(id, port) - So(len(events), ShouldEqual, 10) - So(events[0], ShouldEqual, "task-started") + a.Lock() + So(len(a.events), ShouldEqual, 10) + a.Unlock() + So(a.events[0], ShouldEqual, "task-started") for x := 1; x <= 9; x++ { - So(events[x], ShouldEqual, "metric-event") + So(a.events[x], ShouldEqual, "metric-event") } }) })