Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Adds unit testing for workJob submission calls.
Browse files Browse the repository at this point in the history
  • Loading branch information
lynxbat committed Mar 6, 2016
1 parent 500fd73 commit 99c1b19
Showing 1 changed file with 163 additions and 0 deletions.
163 changes: 163 additions & 0 deletions scheduler/workflow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ limitations under the License.
package scheduler

import (
"errors"
"fmt"
"os"
"path"
"testing"
Expand All @@ -28,6 +30,7 @@ import (
"github.com/intelsdi-x/snap/control"
"github.com/intelsdi-x/snap/core"
"github.com/intelsdi-x/snap/core/cdata"
"github.com/intelsdi-x/snap/pkg/promise"
"github.com/intelsdi-x/snap/pkg/schedule"
"github.com/intelsdi-x/snap/scheduler/wmap"

Expand Down Expand Up @@ -121,3 +124,163 @@ func TestCollectPublishWorkflow(t *testing.T) {
})
})
}

// The mocks below are here for testing work submission
type Mock1 struct {
count int
errorIndex int
delay time.Duration
queue map[string]int
}

func (m *Mock1) CollectMetrics([]core.Metric, time.Time, string) ([]core.Metric, []error) {
return nil, nil
}

func (m *Mock1) Work(j job) queuedJob {
m.queue[j.TypeString()]++
return m
}

func (m *Mock1) Promise() promise.Promise {
return m
}

func (m *Mock1) Await() []error {
time.Sleep(m.delay)
m.count++
if m.count == m.errorIndex {
return []error{errors.New("I am an error")}
}
return nil
}

func (m *Mock1) AwaitUntil(time.Duration) []error {
return nil
}

func (m *Mock1) Complete([]error) {

}

func (m *Mock1) IsComplete() bool {
return false
}

func (m *Mock1) Job() job {
return nil
}

func (m *Mock1) AndThen(_ func([]error)) {
}

func (m *Mock1) AndThenUntil(_ time.Duration, _ func([]error)) {
}

func TestWorkJobs(t *testing.T) {
Convey("Test speed and concurrency of TestWorkJobs\n", t, func() {
Convey("submit multiple jobs\n", func() {
m := &Mock1{queue: make(map[string]int)}
m.delay = time.Millisecond * 100
pj := newCollectorJob(nil, time.Second*1, m, nil, "")
prs := make([]*processNode, 0)
pus := make([]*publishNode, 0)
counter := 0
t := &task{manager: m, id: "1", name: "mock"}
for x := 0; x < 3; x++ {
n := cdata.NewNode()
pr := &processNode{config: n, name: fmt.Sprintf("prjob%d", counter)}
pu := &publishNode{config: n, name: fmt.Sprintf("pujob%d", counter)}
counter++
prs = append(prs, pr)
pus = append(pus, pu)
}
workJobs(prs, pus, t, pj)
So(m.queue["processor"], ShouldEqual, 3)
So(m.queue["publisher"], ShouldEqual, 3)
So(t.failedRuns, ShouldEqual, 0)
})
Convey("submit multiple jobs with nesting", func() {
m := &Mock1{queue: make(map[string]int)}
m.delay = time.Millisecond * 100
pj := newCollectorJob(nil, time.Second*1, m, nil, "")
prs := make([]*processNode, 0)
pus := make([]*publishNode, 0)
counter := 0
t := &task{manager: m, id: "1", name: "mock"}
// 3 proc + 3 pub
for x := 0; x < 3; x++ {
n := cdata.NewNode()
pr := &processNode{config: n, name: fmt.Sprintf("prjob%d", counter)}
pu := &publishNode{config: n, name: fmt.Sprintf("pujob%d", counter)}
counter++
prs = append(prs, pr)
pus = append(pus, pu)
}
// 3 proc => 3 proc + 3 pub
for _, pr := range prs {
cprs := make([]*processNode, 0)
cpus := make([]*publishNode, 0)
for x := 0; x < 3; x++ {
n := cdata.NewNode()
cpr := &processNode{config: n, name: fmt.Sprintf("prjobchild%d", counter)}
cpu := &publishNode{config: n, name: fmt.Sprintf("pujobchild%d", counter)}
counter++
cprs = append(cprs, cpr)
cpus = append(cpus, cpu)
}
pr.ProcessNodes = cprs
pr.PublishNodes = cpus
}
workJobs(prs, pus, t, pj)
// (3*3)+3
So(m.queue["processor"], ShouldEqual, 12)
// (3*3)
So(m.queue["publisher"], ShouldEqual, 12)
So(t.failedRuns, ShouldEqual, 0)
})
Convey("submit multiple jobs where one has an error", func() {
m := &Mock1{queue: make(map[string]int)}
// make the 13th job fail
m.errorIndex = 13
m.delay = time.Millisecond * 100
pj := newCollectorJob(nil, time.Second*1, m, nil, "")
prs := make([]*processNode, 0)
pus := make([]*publishNode, 0)
counter := 0
t := &task{manager: m, id: "1", name: "mock"}
// 3 proc + 3 pub
for x := 0; x < 3; x++ {
n := cdata.NewNode()
pr := &processNode{config: n, name: fmt.Sprintf("prjob%d", counter)}
pu := &publishNode{config: n, name: fmt.Sprintf("pujob%d", counter)}
counter++
prs = append(prs, pr)
pus = append(pus, pu)
}
// 3 proc => 3 proc + 3 pub
for _, pr := range prs {
cprs := make([]*processNode, 0)
cpus := make([]*publishNode, 0)
for x := 0; x < 3; x++ {
n := cdata.NewNode()
cpr := &processNode{config: n, name: fmt.Sprintf("prjobchild%d", counter)}
cpu := &publishNode{config: n, name: fmt.Sprintf("pujobchild%d", counter)}
counter++
cprs = append(cprs, cpr)
cpus = append(cpus, cpu)
}
pr.ProcessNodes = cprs
pr.PublishNodes = cpus
}
workJobs(prs, pus, t, pj)
// (3*3)+3
So(m.queue["processor"], ShouldEqual, 12)
// (3*3)
So(m.queue["publisher"], ShouldEqual, 12)
So(t.failedRuns, ShouldEqual, 1)
So(t.lastFailureMessage, ShouldEqual, "I am an error")
})

})
}

0 comments on commit 99c1b19

Please sign in to comment.