Merge pull request #478 from jcooklin/ib/463

Adds handling to tribe for task start/stop/remove

jcooklin committed Nov 13, 2015
2 parents 65cbae1 + 90c6f94, commit a49dc8a
Showing 13 changed files with 822 additions and 312 deletions.
1 change: 0 additions & 1 deletion control/plugin/client/httpjsonrpc_test.go
@@ -206,7 +206,6 @@ func TestHTTPJSONRPC(t *testing.T) {
 		So(mts, ShouldNotBeNil)
 		So(mts, ShouldHaveSameTypeAs, []core.Metric{})
 		So(len(mts), ShouldBeGreaterThan, 0)
-		log.Errorf("!asdf %v", mts[0].Config())
 		So(len(mts[0].Config().Table()), ShouldBeGreaterThan, 0)
 	})
2 changes: 1 addition & 1 deletion control/runner.go
@@ -168,7 +168,7 @@ func (r *runner) startPlugin(p executablePlugin) (*availablePlugin, error) {
 	}
 
 	// Wait for plugin response
-	resp, err := p.WaitForResponse(time.Second * 3)
+	resp, err := p.WaitForResponse(time.Second * 5)
 	if err != nil {
 		e := errors.New("error while waiting for response: " + err.Error())
 		runnerLog.WithFields(log.Fields{
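The only functional change here is a longer plugin handshake window (3s to 5s), presumably because the tribe tests start many plugins across nodes at once and the shorter timeout flaked. For context, a minimal sketch of the select-with-timeout pattern a WaitForResponse-style method wraps; the response type and channel below are stand-ins, not pulse's actual plugin API:

package runner

import (
	"errors"
	"time"
)

// response stands in for the plugin handshake payload; pulse's real type
// lives in control/plugin. This is a sketch of the pattern, not the API.
type response struct{ meta string }

// waitForResponse returns the first response to arrive on ch, or an error
// if the timeout elapses first: the trade-off the 3s-to-5s change tunes.
func waitForResponse(ch <-chan *response, timeout time.Duration) (*response, error) {
	select {
	case resp := <-ch:
		return resp, nil
	case <-time.After(timeout):
		return nil, errors.New("error while waiting for response: timeout")
	}
}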
4 changes: 4 additions & 0 deletions core/scheduler_event/scheduler_event.go
@@ -35,6 +35,7 @@ const (
 
 type TaskStartedEvent struct {
 	TaskID string
+	Source string
 }
 
 func (e TaskStartedEvent) Namespace() string {
@@ -44,6 +45,7 @@ func (e TaskStartedEvent) Namespace() string {
 type TaskCreatedEvent struct {
 	TaskID        string
 	StartOnCreate bool
+	Source        string
 }
 
 func (e TaskCreatedEvent) Namespace() string {
@@ -52,6 +54,7 @@ func (e TaskCreatedEvent) Namespace() string {
 
 type TaskDeletedEvent struct {
 	TaskID string
+	Source string
 }
 
 func (e TaskDeletedEvent) Namespace() string {
@@ -60,6 +63,7 @@ func (e TaskDeletedEvent) Namespace() string {
 
 type TaskStoppedEvent struct {
 	TaskID string
+	Source string
 }
 
 func (e TaskStoppedEvent) Namespace() string {
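Each task lifecycle event gains a Source field alongside TaskID. In a tribe, a start/stop/remove can originate from a local REST call or from applying another member's broadcast, and an event handler needs to tell these apart or it will re-gossip changes it just applied. A sketch of that use under stated assumptions: the sentinel value, the tribe struct, and the broadcast hook are illustrative stand-ins; only the event types come from this commit.

package tribe

import "github.com/intelsdi-x/pulse/core/scheduler_event"

// eventSourceTribe is an assumed sentinel written into Source when tribe
// itself starts a task while applying gossip from a peer.
const eventSourceTribe = "tribe"

// Minimal stand-in for the tribe; broadcast represents the gossip layer.
type tribe struct {
	broadcast func(taskID, state string)
}

// onTaskStarted drops events that tribe caused by applying a peer's
// broadcast; re-gossiping them would loop the event around the cluster.
func (t *tribe) onTaskStarted(e *scheduler_event.TaskStartedEvent) {
	if e.Source == eventSourceTribe {
		return // this start was gossip being applied, not a user action
	}
	t.broadcast(e.TaskID, "started") // a locally initiated start: share it
}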
6 changes: 4 additions & 2 deletions mgmt/rest/client/client_tribe_func_test.go
@@ -26,6 +26,7 @@ import (
 	"net/http"
 	"strconv"
 	"sync"
+	"sync/atomic"
 	"testing"
 	"time"
 
@@ -40,14 +41,15 @@ import (
 	"github.com/intelsdi-x/pulse/scheduler"
 )
 
-var NextPort = 46000
+var NextPort int32 = 46000
 
 func getPort() int {
 	defer incrPort()
-	return NextPort
+	return int(atomic.LoadInt32(&NextPort))
 }
 
 func incrPort() {
-	NextPort += 10
+	atomic.AddInt32(&NextPort, 10)
 }
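The counter switch is a data-race fix: these client tests exercise several tribe nodes concurrently, and a bare NextPort += 10 from parallel goroutines is exactly what go test -race flags. One subtlety remains in the committed version: the load (in getPort) and the add (in the deferred incrPort) are two separate atomic operations, so two goroutines interleaving between them can still observe the same port. Folding both into a single atomic.AddInt32, which returns the new value, closes that window. A self-contained sketch:

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

var nextPort int32 = 46000

// getPort claims a unique base port per call. AddInt32 increments and
// returns the new value atomically, so subtracting the step back yields
// the value this caller reserved: no separate load, no duplicate ports.
func getPort() int {
	return int(atomic.AddInt32(&nextPort, 10) - 10)
}

func main() {
	var wg sync.WaitGroup
	ports := make([]int, 8)
	for i := range ports {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			ports[i] = getPort() // safe: each goroutine writes its own slot
		}(i)
	}
	wg.Wait()
	fmt.Println(ports) // eight distinct multiples of 10 starting at 46000
}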
177 changes: 136 additions & 41 deletions mgmt/rest/tribe_test.go
@@ -35,13 +35,12 @@ import (
 	. "github.com/smartystreets/goconvey/convey"
 
 	"github.com/intelsdi-x/pulse/control"
+	"github.com/intelsdi-x/pulse/core"
 	"github.com/intelsdi-x/pulse/mgmt/rest/rbody"
 	"github.com/intelsdi-x/pulse/mgmt/tribe"
 	"github.com/intelsdi-x/pulse/scheduler"
 )
 
-var lock sync.Mutex = sync.Mutex{}
-
 func getMembers(port int) *rbody.APIResponse {
 	resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/v1/tribe/members", port))
 	if err != nil {
@@ -149,11 +148,9 @@ func addAgreement(port int, name string) *rbody.APIResponse {
 
 func TestTribeTaskAgreements(t *testing.T) {
 	log.SetLevel(log.WarnLevel)
-	lock.Lock()
 	numOfNodes := 5
 	aName := "agreement1"
 	mgtPorts := startTribes(numOfNodes)
-	lock.Unlock()
 	Convey("A cluster is started", t, func() {
 		Convey("Members are retrieved", func() {
 			for _, i := range mgtPorts {
@@ -204,19 +201,18 @@ func TestTribeTaskAgreements(t *testing.T) {
 			resp := uploadPlugin(MOCK_PLUGIN_PATH1, mgtPorts[0])
 			So(resp.Meta.Code, ShouldEqual, 201)
 			So(resp.Meta.Type, ShouldEqual, rbody.PluginsLoadedType)
-			resp = uploadPlugin(FILE_PLUGIN_PATH, mgtPorts[0])
-			So(resp.Meta.Code, ShouldEqual, 201)
-			So(resp.Meta.Type, ShouldEqual, rbody.PluginsLoadedType)
 			resp = getPluginList(mgtPorts[0])
 			So(resp.Meta.Code, ShouldEqual, 200)
-			So(len(resp.Body.(*rbody.PluginList).LoadedPlugins), ShouldEqual, 2)
+			So(len(resp.Body.(*rbody.PluginList).LoadedPlugins), ShouldEqual, 1)
+			pluginToUnload := resp.Body.(*rbody.PluginList).LoadedPlugins[0]
 			resp = getAgreement(mgtPorts[0], aName)
 			So(resp.Meta.Code, ShouldEqual, 200)
-			So(len(resp.Body.(*rbody.TribeGetAgreement).Agreement.PluginAgreement.Plugins), ShouldEqual, 2)
-			resp = createTask("1.json", "task1", "1s", true, mgtPorts[0])
+			So(len(resp.Body.(*rbody.TribeGetAgreement).Agreement.PluginAgreement.Plugins), ShouldEqual, 1)
+			resp = createTask("3.json", "task1", "1s", true, mgtPorts[0])
 			So(resp.Meta.Code, ShouldEqual, 201)
 			So(resp.Meta.Type, ShouldEqual, rbody.AddScheduledTaskType)
 			So(resp.Body, ShouldHaveSameTypeAs, new(rbody.AddScheduledTask))
+			taskID := resp.Body.(*rbody.AddScheduledTask).ID
 
 			Convey("The cluster agrees on tasks", func(c C) {
 				var wg sync.WaitGroup
@@ -246,38 +242,139 @@ func TestTribeTaskAgreements(t *testing.T) {
 				}
 				wg.Wait()
 				So(timedOut, ShouldEqual, false)
 
-				Convey("The task has been shared and loaded across the cluster", func(c C) {
-					var wg sync.WaitGroup
-					timedOut := false
-					for i := 0; i < numOfNodes; i++ {
-						timer := time.After(15 * time.Second)
-						wg.Add(1)
-						go func(port int) {
-							defer wg.Done()
-							for {
-								select {
-								case <-timer:
-									timedOut = true
-									return
-								default:
-									resp := getTasks(port)
-									if resp.Meta.Code == 200 {
-										if len(resp.Body.(*rbody.ScheduledTaskListReturned).ScheduledTasks) == 1 {
-											log.Debugf("node %v has %d tasks", port, len(resp.Body.(*rbody.ScheduledTaskListReturned).ScheduledTasks))
-											return
-										}
-										log.Debugf("node %v has %d tasks", port, len(resp.Body.(*rbody.ScheduledTaskListReturned).ScheduledTasks))
-									} else {
-										log.Debugf("node %v error getting task", port)
-									}
-									time.Sleep(400 * time.Millisecond)
-								}
-							}
-						}(mgtPorts[i])
-					}
-					wg.Wait()
-					So(timedOut, ShouldEqual, false)
-				})
+				Convey("The task is started", func() {
+					resp := startTask(taskID, mgtPorts[0])
+					So(resp.Meta.Code, ShouldEqual, 200)
+					So(resp.Meta.Type, ShouldEqual, rbody.ScheduledTaskStartedType)
+					Convey("The task is started on all members of the tribe", func(c C) {
+						var wg sync.WaitGroup
+						timedOut := false
+						for i := 0; i < numOfNodes; i++ {
+							timer := time.After(15 * time.Second)
+							wg.Add(1)
+							go func(port int) {
+								defer wg.Done()
+								for {
+									select {
+									case <-timer:
+										timedOut = true
+										return
+									default:
+										resp := getTask(taskID, port)
+										if resp.Meta.Code == 200 {
+											if resp.Body.(*rbody.ScheduledTaskReturned).State == core.TaskSpinning.String() || resp.Body.(*rbody.ScheduledTaskReturned).State == core.TaskFiring.String() {
+												return
+											}
+											log.Debugf("port %v has task in state %v", port, resp.Body.(*rbody.ScheduledTaskReturned).State)
+										} else {
+											log.Debugf("node %v error getting task", port)
+										}
+										time.Sleep(400 * time.Millisecond)
+									}
+								}
+							}(mgtPorts[i])
+						}
+						wg.Wait()
+						So(timedOut, ShouldEqual, false)
+						Convey("The task is stopped", func() {
+							resp := stopTask(taskID, mgtPorts[0])
+							So(resp.Meta.Code, ShouldEqual, 200)
+							So(resp.Meta.Type, ShouldEqual, rbody.ScheduledTaskStoppedType)
+							So(resp.Body, ShouldHaveSameTypeAs, new(rbody.ScheduledTaskStopped))
+							var wg sync.WaitGroup
+							timedOut := false
+							for i := 0; i < numOfNodes; i++ {
+								timer := time.After(15 * time.Second)
+								wg.Add(1)
+								go func(port int) {
+									defer wg.Done()
+									for {
+										select {
+										case <-timer:
+											timedOut = true
+											return
+										default:
+											resp := getTask(taskID, port)
+											if resp.Meta.Code == 200 {
+												if resp.Body.(*rbody.ScheduledTaskReturned).State == core.TaskStopped.String() {
+													return
+												}
+											}
+											time.Sleep(400 * time.Millisecond)
+										}
+									}
+								}(mgtPorts[i])
+							}
+							wg.Wait()
+							So(timedOut, ShouldEqual, false)
+							Convey("The task is removed", func() {
+								for _, port := range mgtPorts {
+									resp := getTask(taskID, port)
+									So(resp.Meta.Code, ShouldEqual, 200)
+									So(resp.Body.(*rbody.ScheduledTaskReturned).State, ShouldResemble, core.TaskStopped.String())
+								}
+								resp := removeTask(taskID, mgtPorts[0])
+								So(resp.Meta.Code, ShouldEqual, 200)
+								So(resp.Meta.Type, ShouldEqual, rbody.ScheduledTaskRemovedType)
+								So(resp.Body, ShouldHaveSameTypeAs, new(rbody.ScheduledTaskRemoved))
+								var wg sync.WaitGroup
+								timedOut := false
+								for i := 0; i < numOfNodes; i++ {
+									timer := time.After(15 * time.Second)
+									wg.Add(1)
+									go func(port int) {
+										defer wg.Done()
+										for {
+											select {
+											case <-timer:
+												timedOut = true
+												return
+											default:
+												resp := getTask(taskID, port)
+												if resp.Meta.Code == 404 {
+													return
+												}
+												time.Sleep(400 * time.Millisecond)
+											}
+										}
+									}(mgtPorts[i])
+								}
+								wg.Wait()
+								So(timedOut, ShouldEqual, false)
+								Convey("The plugins are unloaded", func(c C) {
+									resp := unloadPlugin(mgtPorts[0], pluginToUnload.Type, pluginToUnload.Name, pluginToUnload.Version)
+									So(resp.Meta.Code, ShouldEqual, 200)
+									So(resp.Meta.Type, ShouldEqual, rbody.PluginUnloadedType)
+									So(resp.Body, ShouldHaveSameTypeAs, new(rbody.PluginUnloaded))
+									var wg sync.WaitGroup
+									timedOut := false
+									for i := 0; i < numOfNodes; i++ {
+										timer := time.After(15 * time.Second)
+										wg.Add(1)
+										go func(port int) {
+											defer wg.Done()
+											for {
+												select {
+												case <-timer:
+													timedOut = true
+													return
+												default:
+													resp = getPluginList(port)
+													c.So(resp.Meta.Code, ShouldEqual, 200)
+													if len(resp.Body.(*rbody.PluginList).LoadedPlugins) == 0 {
+														return
+													}
+													time.Sleep(400 * time.Millisecond)
+												}
+											}
+										}(mgtPorts[i])
+									}
+									wg.Wait()
+									So(timedOut, ShouldEqual, false)
+								})
+							})
+						})
+					})
+				})
 			})
 		})
@@ -288,15 +385,13 @@ func TestTribeTaskAgreements(t *testing.T) {
 }
 
 func TestTribePluginAgreements(t *testing.T) {
-	lock.Lock()
 	var (
 		lpName, lpType string
 		lpVersion      int
 	)
 	numOfNodes := 5
 	aName := "agreement1"
 	mgtPorts := startTribes(numOfNodes)
-	lock.Unlock()
 	Convey("A cluster is started", t, func() {
 		Convey("Members are retrieved", func() {
 			for _, i := range mgtPorts {
@@ -540,7 +635,7 @@ func startTribes(count int) []int {
 		mgtPorts = append(mgtPorts, mgtPort)
 		tribePort := getAvailablePort()
 		conf := tribe.DefaultConfig(fmt.Sprintf("member-%v", mgtPort), "127.0.0.1", tribePort, seed, mgtPort)
-		conf.MemberlistConfig.PushPullInterval = 5 * time.Second
+		// conf.MemberlistConfig.PushPullInterval = 5 * time.Second
 		conf.MemberlistConfig.RetransmitMult = conf.MemberlistConfig.RetransmitMult * 2
 		if seed == "" {
 			seed = fmt.Sprintf("%s:%d", "127.0.0.1", tribePort)
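Every new Convey block in this file repeats the same convergence check: one goroutine per node polls the REST API every 400ms until the node reports the expected state, flipping a shared timedOut flag if 15 seconds pass first. Condensed into a helper for clarity; waitForAll is not part of this PR, just a restatement of the loops the tests inline:

package tribetest

import (
	"sync"
	"time"
)

// waitForAll polls cond for every port until it returns true or the
// timeout expires, and reports whether all nodes converged. This is the
// shape of the inlined loops for task start/stop/remove and plugin unload.
func waitForAll(ports []int, timeout time.Duration, cond func(port int) bool) bool {
	var wg sync.WaitGroup
	converged := make([]bool, len(ports))
	for i, port := range ports {
		wg.Add(1)
		go func(i, port int) {
			defer wg.Done()
			deadline := time.After(timeout)
			for {
				select {
				case <-deadline:
					return // converged[i] stays false: this node timed out
				default:
					if cond(port) {
						converged[i] = true
						return
					}
					time.Sleep(400 * time.Millisecond)
				}
			}
		}(i, port)
	}
	wg.Wait()
	for _, ok := range converged {
		if !ok {
			return false
		}
	}
	return true
}

The remove check, for instance, would then reduce to waitForAll(mgtPorts, 15*time.Second, func(p int) bool { return getTask(taskID, p).Meta.Code == 404 }).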
14 changes: 14 additions & 0 deletions mgmt/rest/wmap_sample/3.json
@@ -0,0 +1,14 @@
+{
+    "collect": {
+        "metrics": {
+            "/intel/mock/foo": {"version": 1}
+        },
+        "config": {
+            "/intel/mock/foo": {
+                "password": "testval"
+            }
+        },
+        "process": [],
+        "publish": []
+    }
+}
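3.json is the task manifest the reworked test schedules in place of 1.json: a single mock metric plus a password config entry, presumably required by the mock collector's config policy now that the file plugin is no longer uploaded. A sketch of parsing this shape in Go; the struct below mirrors the JSON above for illustration only and is not pulse's wmap type:

package main

import (
	"encoding/json"
	"fmt"
)

// taskManifest mirrors 3.json for illustration; the real types live in
// scheduler/wmap. "process" and "publish" are omitted since they're empty.
type taskManifest struct {
	Collect struct {
		Metrics map[string]struct {
			Version int `json:"version"`
		} `json:"metrics"`
		Config map[string]map[string]string `json:"config"`
	} `json:"collect"`
}

func main() {
	raw := []byte(`{"collect":{
		"metrics":{"/intel/mock/foo":{"version":1}},
		"config":{"/intel/mock/foo":{"password":"testval"}}}}`)
	var m taskManifest
	if err := json.Unmarshal(raw, &m); err != nil {
		panic(err)
	}
	fmt.Println(m.Collect.Metrics["/intel/mock/foo"].Version)    // 1
	fmt.Println(m.Collect.Config["/intel/mock/foo"]["password"]) // testval
}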