Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Update rest tests with change from riemann to file plugin and update …
Browse files Browse the repository at this point in the history
…watch test to not depend on task going disabled
  • Loading branch information
geauxvirtual committed Sep 30, 2015
1 parent cef280f commit 7c47024
Showing 1 changed file with 89 additions and 46 deletions.
135 changes: 89 additions & 46 deletions mgmt/rest/rest_func_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ var (
// Switching this turns on logging for all the REST API calls
LOG_LEVEL = log.FatalLevel

PULSE_PATH = os.Getenv("PULSE_PATH")
DUMMY_PLUGIN_PATH1 = PULSE_PATH + "/plugin/pulse-collector-dummy1"
DUMMY_PLUGIN_PATH2 = PULSE_PATH + "/plugin/pulse-collector-dummy2"
PSUTIL_PLUGIN_PATH = PULSE_PATH + "/plugin/pulse-collector-psutil"
RIEMANN_PLUGIN_PATH = PULSE_PATH + "/plugin/pulse-publisher-riemann"
PULSE_PATH = os.Getenv("PULSE_PATH")
DUMMY_PLUGIN_PATH1 = PULSE_PATH + "/plugin/pulse-collector-dummy1"
DUMMY_PLUGIN_PATH2 = PULSE_PATH + "/plugin/pulse-collector-dummy2"
PSUTIL_PLUGIN_PATH = PULSE_PATH + "/plugin/pulse-collector-psutil"
FILE_PLUGIN_PATH = PULSE_PATH + "/plugin/pulse-publisher-file"

NextPort = 40000
CompressedUpload = true
Expand Down Expand Up @@ -92,30 +92,53 @@ func getStreamingAPIResponse(resp *http.Response) *APIResponse {
return r
}

func watchTask(id, port int) []string {
type watchTaskResult struct {
eventChan chan string
doneChan chan struct{}
killChan chan struct{}
}

func (w *watchTaskResult) close() {
close(w.doneChan)
}

func watchTask(id, port int) *watchTaskResult {
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/v1/tasks/%d/watch", port, id))
if err != nil {
log.Fatal(err)
}

reader := bufio.NewReader(resp.Body)
r := make([]string, 0)
for {
line, _ := reader.ReadBytes('\n')
ste := &rbody.StreamedTaskEvent{}
err := json.Unmarshal(line, ste)
if err != nil {
log.Fatal(err)
}
switch ste.EventType {
case rbody.TaskWatchTaskDisabled:
r = append(r, ste.EventType)
return r
case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent:
r = append(r, ste.EventType)
}
r := &watchTaskResult{
eventChan: make(chan string),
doneChan: make(chan struct{}),
killChan: make(chan struct{}),
}

go func() {
reader := bufio.NewReader(resp.Body)
for {
select {
case <-r.doneChan:
resp.Body.Close()
return
default:
line, _ := reader.ReadBytes('\n')
ste := &rbody.StreamedTaskEvent{}
err := json.Unmarshal(line, ste)
if err != nil {
return
}
switch ste.EventType {
case rbody.TaskWatchTaskDisabled:
r.eventChan <- ste.EventType
close(r.doneChan)
return
case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent:
r.eventChan <- ste.EventType
}
}
}
}()
return r
}

func getTasks(port int) *APIResponse {
Expand Down Expand Up @@ -783,7 +806,7 @@ func TestPluginRestCalls(t *testing.T) {
startAPI(port)

uploadPlugin(DUMMY_PLUGIN_PATH2, port)
uploadPlugin(RIEMANN_PLUGIN_PATH, port)
uploadPlugin(FILE_PLUGIN_PATH, port)
r := createTask("1.json", "foo", "1s", true, port)
So(r.Body, ShouldHaveSameTypeAs, new(rbody.AddScheduledTask))
plr := r.Body.(*rbody.AddScheduledTask)
Expand All @@ -804,7 +827,7 @@ func TestPluginRestCalls(t *testing.T) {
startAPI(port)

uploadPlugin(DUMMY_PLUGIN_PATH2, port)
uploadPlugin(RIEMANN_PLUGIN_PATH, port)
uploadPlugin(FILE_PLUGIN_PATH, port)
r := createTask("1.json", "bar", "1s", true, port)
So(r.Body, ShouldHaveSameTypeAs, new(rbody.AddScheduledTask))

Expand All @@ -820,7 +843,7 @@ func TestPluginRestCalls(t *testing.T) {
startAPI(port)

uploadPlugin(DUMMY_PLUGIN_PATH2, port)
uploadPlugin(RIEMANN_PLUGIN_PATH, port)
uploadPlugin(FILE_PLUGIN_PATH, port)

r1 := createTask("1.json", "alpha", "1s", true, port)
So(r1.Body, ShouldHaveSameTypeAs, new(rbody.AddScheduledTask))
Expand All @@ -843,7 +866,7 @@ func TestPluginRestCalls(t *testing.T) {
startAPI(port)

uploadPlugin(DUMMY_PLUGIN_PATH2, port)
uploadPlugin(RIEMANN_PLUGIN_PATH, port)
uploadPlugin(FILE_PLUGIN_PATH, port)
r1 := createTask("1.json", "foo", "3s", true, port)
So(r1.Body, ShouldHaveSameTypeAs, new(rbody.AddScheduledTask))
t1 := r1.Body.(*rbody.AddScheduledTask)
Expand All @@ -859,7 +882,7 @@ func TestPluginRestCalls(t *testing.T) {
startAPI(port)

uploadPlugin(DUMMY_PLUGIN_PATH2, port)
uploadPlugin(RIEMANN_PLUGIN_PATH, port)
uploadPlugin(FILE_PLUGIN_PATH, port)

r1 := createTask("1.json", "xenu", "1s", true, port)
So(r1.Body, ShouldHaveSameTypeAs, new(rbody.AddScheduledTask))
Expand Down Expand Up @@ -887,7 +910,7 @@ func TestPluginRestCalls(t *testing.T) {
startAPI(port)

uploadPlugin(DUMMY_PLUGIN_PATH2, port)
uploadPlugin(RIEMANN_PLUGIN_PATH, port)
uploadPlugin(FILE_PLUGIN_PATH, port)

r1 := createTask("1.json", "xenu", "1s", false, port)
So(r1.Body, ShouldHaveSameTypeAs, new(rbody.AddScheduledTask))
Expand All @@ -913,7 +936,7 @@ func TestPluginRestCalls(t *testing.T) {
startAPI(port)

uploadPlugin(DUMMY_PLUGIN_PATH2, port)
uploadPlugin(RIEMANN_PLUGIN_PATH, port)
uploadPlugin(FILE_PLUGIN_PATH, port)

r1 := createTask("1.json", "yeti", "1s", true, port)
So(r1.Body, ShouldHaveSameTypeAs, new(rbody.AddScheduledTask))
Expand Down Expand Up @@ -950,7 +973,7 @@ func TestPluginRestCalls(t *testing.T) {
})

Convey("Remove Task - DELETE - /v1/tasks/:id", func() {
Convey("error on trying to remove unknown plugin", func() {
Convey("error on trying to remove unknown task", func() {
port := getPort()
startAPI(port)

Expand All @@ -964,7 +987,7 @@ func TestPluginRestCalls(t *testing.T) {
startAPI(port)

uploadPlugin(DUMMY_PLUGIN_PATH2, port)
uploadPlugin(RIEMANN_PLUGIN_PATH, port)
uploadPlugin(FILE_PLUGIN_PATH, port)

r1 := createTask("1.json", "yeti", "1s", true, port)
So(r1.Body, ShouldHaveSameTypeAs, new(rbody.AddScheduledTask))
Expand Down Expand Up @@ -996,7 +1019,7 @@ func TestPluginRestCalls(t *testing.T) {
startAPI(port)

uploadPlugin(DUMMY_PLUGIN_PATH2, port)
uploadPlugin(RIEMANN_PLUGIN_PATH, port)
uploadPlugin(FILE_PLUGIN_PATH, port)
uploadPlugin(PSUTIL_PLUGIN_PATH, port)

r1 := createTask("1.json", "xenu", "10ms", true, port)
Expand All @@ -1008,28 +1031,48 @@ func TestPluginRestCalls(t *testing.T) {
// Change buffer window to 10ms (do not do this IRL)
// sample 1.json should fail after 10 attempts and be disabled
StreamingBufferWindow = 0.01
var r []string
r := watchTask(id, port)
time.Sleep(time.Millisecond * 100)
startTask(id, port)
var events []string
wait := make(chan struct{})
go func() {
r = watchTask(id, port)
close(wait)
for {
select {
case e := <-r.eventChan:
events = append(events, e)
if len(events) == 10 {
r.close()
}
case <-r.doneChan:
close(wait)
return
}
}
}()
<-wait
stopTask(id, port)
//var r []string
//wait := make(chan struct{})
//go func() {
// r = watchTask(id, port)
// close(wait)
//}()

// Just enough time for watcher to start
time.Sleep(time.Millisecond * 100)
stopTask(id, port)
startTask(id, port)
//time.Sleep(time.Millisecond * 100)
//stopTask(id, port)
//startTask(id, port)

// Wait for streaming to end and then test the order and type of events from stream
<-wait
So(len(r), ShouldBeGreaterThanOrEqualTo, 3)
//<-wait
So(len(events), ShouldBeGreaterThanOrEqualTo, 0)
So(events[0], ShouldEqual, "task-started")
// So(r[0], ShouldEqual, "task-stopped") disabled because of Bug
// So(r[1], ShouldEqual, "task-started")
// this is depedent on there being >= 12 events, which is unlikely on a system under stress.
// disabling for now.
//for x := 2; x <= 11; x++ {
// So(r[x], ShouldEqual, "metric-event")
//}
for x := 1; x <= 9; x++ {
So(events[x], ShouldEqual, "metric-event")
}
})
})
})
Expand Down

0 comments on commit 7c47024

Please sign in to comment.