Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Adds event listener to avoid sleep statements
Browse files Browse the repository at this point in the history
  • Loading branch information
jcooklin authored and IzabellaRaulin committed Mar 24, 2017
1 parent 4f681cb commit 4b98fbb
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 29 deletions.
11 changes: 8 additions & 3 deletions scheduler/distributed_task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/intelsdi-x/snap/grpc/controlproxy"
"github.com/intelsdi-x/snap/pkg/schedule"
"github.com/intelsdi-x/snap/plugin/helper"
"github.com/intelsdi-x/snap/scheduler/fixtures"
"github.com/intelsdi-x/snap/scheduler/wmap"
. "github.com/smartystreets/goconvey/convey"
)
Expand Down Expand Up @@ -302,6 +303,8 @@ func TestDistributedSubscriptions(t *testing.T) {
})
})
Convey("Single run task", func() {
lse := fixtures.NewListenToSchedulerEvent()
s.eventManager.RegisterHandler("Scheduler.TaskEnded", lse)
count := uint(1)
interval := time.Millisecond * 100
sch := schedule.NewWindowedSchedule(interval, nil, nil, count)
Expand All @@ -323,9 +326,11 @@ func TestDistributedSubscriptions(t *testing.T) {
So(remoteMockManager.SubscribeCallCount, ShouldBeGreaterThan, 0)
})
Convey("Task should be ended after an interval", func() {
// wait for the end of the task
// we are ok to extend sleeping by 100ms to allow to complete post-schedule activities
time.Sleep(interval + time.Millisecond*100)
// wait for the end of the task (or timeout)
select {
case <-lse.Ended:
case <-time.After(time.Duration(interval.Nanoseconds()*int64(count)+interval.Nanoseconds()) + 1*time.Second):
}
So(t.State(), ShouldEqual, core.TaskEnded)

Convey("So all dependencies should have been usubscribed", func() {
Expand Down
43 changes: 43 additions & 0 deletions scheduler/fixtures/fixtures.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// + build medium legacy

/*
http://www.apache.org/licenses/LICENSE-2.0.txt
Copyright 2017 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package fixtures

import "github.com/intelsdi-x/gomit"
import "github.com/intelsdi-x/snap/core/scheduler_event"

type listenToSchedulerEvent struct {
Ended chan struct{}
}

// NewListenToSchedulerEvent
func NewListenToSchedulerEvent() *listenToSchedulerEvent {
return &listenToSchedulerEvent{
Ended: make(chan struct{}),
}
}

func (l *listenToSchedulerEvent) HandleGomitEvent(e gomit.Event) {
switch e.Body.(type) {
case *scheduler_event.TaskEndedEvent:
l.Ended <- struct{}{}
}
}
62 changes: 36 additions & 26 deletions scheduler/scheduler_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/intelsdi-x/snap/core/ctypes"
"github.com/intelsdi-x/snap/core/serror"
"github.com/intelsdi-x/snap/pkg/schedule"
"github.com/intelsdi-x/snap/scheduler/fixtures"
"github.com/intelsdi-x/snap/scheduler/wmap"
)

Expand Down Expand Up @@ -250,6 +251,8 @@ func TestCreateTask(t *testing.T) {
})
})
Convey("should not error when the schedule is valid", func() {
lse := fixtures.NewListenToSchedulerEvent()
s.eventManager.RegisterHandler("Scheduler.TaskEnded", lse)
start := time.Now().Add(startWait)
stop := time.Now().Add(startWait + windowSize)
sch := schedule.NewWindowedSchedule(interval, &start, &stop, 0)
Expand All @@ -260,11 +263,11 @@ func TestCreateTask(t *testing.T) {
task := s.tasks.Get(tsk.ID())
task.Spin()
Convey("the task should be ended after reaching the end of window", func() {
// wait for the end of determined window
time.Sleep(startWait + windowSize)
// wait an interval to be sure that the task state has been updated
// we are ok to extend sleeping by 100ms to allow to complete post-schedule activities
time.Sleep(interval + time.Millisecond*100)
// wait for task ended event (or timeout)
select {
case <-lse.Ended:
case <-time.After(stop.Add(1 * time.Second).Sub(start)):
}
// check if the task is ended
So(tsk.State(), ShouldEqual, core.TaskEnded)
})
Expand All @@ -290,6 +293,8 @@ func TestCreateTask(t *testing.T) {
})
})
Convey("Single run task firing on defined start time", func() {
lse := fixtures.NewListenToSchedulerEvent()
s.eventManager.RegisterHandler("Scheduler.TaskEnded", lse)
count := uint(1)
start := time.Now().Add(startWait)
sch := schedule.NewWindowedSchedule(interval, &start, nil, count)
Expand All @@ -300,11 +305,11 @@ func TestCreateTask(t *testing.T) {
task := s.tasks.Get(tsk.ID())
task.Spin()
Convey("the task should be ended after reaching the end of window", func() {
// wait for the end of determined window
time.Sleep(startWait)
// wait an interval to be sure that the task state has been updated
// we are ok to extend sleeping by 100ms to allow to complete post-schedule activities
time.Sleep(interval + time.Millisecond*100)
// wait for task ended event (or timeout)
select {
case <-lse.Ended:
case <-time.After(time.Duration(interval.Nanoseconds()*int64(count)+interval.Nanoseconds()) + 1*time.Second):
}
// check if the task is ended
So(tsk.State(), ShouldEqual, core.TaskEnded)
})
Expand Down Expand Up @@ -400,6 +405,8 @@ func TestStopTask(t *testing.T) {
})
})
Convey("Calling StopTask on an ended task", t, func() {
lse := fixtures.NewListenToSchedulerEvent()
s.eventManager.RegisterHandler("Scheduler.TaskEnded", lse)
start := time.Now().Add(startWait)
stop := time.Now().Add(startWait + windowSize)

Expand All @@ -412,11 +419,11 @@ func TestStopTask(t *testing.T) {
task := s.tasks.Get(tsk.ID())
task.Spin()

// wait for the end of determined window
time.Sleep(startWait + windowSize)
// wait an interval to be sure that the task state has been updated
// we are ok to extend sleeping by 100ms to allow to complete post-schedule activities
time.Sleep(interval + time.Millisecond*100)
// wait for task ended event (or timeout)
select {
case <-lse.Ended:
case <-time.After(stop.Add(1 * time.Second).Sub(start)):
}
// check if the task is ended
So(tsk.State(), ShouldEqual, core.TaskEnded)

Expand Down Expand Up @@ -486,6 +493,8 @@ func TestStartTask(t *testing.T) {
})
})
Convey("Calling StartTask on an ended windowed task", t, func() {
lse := fixtures.NewListenToSchedulerEvent()
s.eventManager.RegisterHandler("Scheduler.TaskEnded", lse)
start := time.Now().Add(startWait)
stop := time.Now().Add(startWait + windowSize)

Expand All @@ -498,11 +507,11 @@ func TestStartTask(t *testing.T) {
task := s.tasks.Get(tsk.ID())
task.Spin()

// wait for the end of determined window
time.Sleep(startWait + windowSize)
// wait an interval to be sure that the task state has been updated
// we are ok to extend sleeping by 100ms to allow to complete post-schedule activities
time.Sleep(interval + time.Millisecond*100)
// wait for task ended event (or timeout)
select {
case <-lse.Ended:
case <-time.After(stop.Add(1 * time.Second).Sub(start)):
}

// check if the task is ended
So(tsk.State(), ShouldEqual, core.TaskEnded)
Expand Down Expand Up @@ -583,6 +592,8 @@ func TestEnableTask(t *testing.T) {
})
})
Convey("Calling EnableTask on an ended task", t, func() {
lse := fixtures.NewListenToSchedulerEvent()
s.eventManager.RegisterHandler("Scheduler.TaskEnded", lse)
start := time.Now().Add(startWait)
stop := time.Now().Add(startWait + windowSize)

Expand All @@ -595,12 +606,11 @@ func TestEnableTask(t *testing.T) {
task := s.tasks.Get(tsk.ID())
task.Spin()

// wait for the end of determined window
time.Sleep(startWait + windowSize)
// wait an interval to be sure that the task state has been updated
/// we are ok to extend sleeping by 100ms to allow to complete post-schedule activities
time.Sleep(interval + time.Millisecond*100)

// wait for task ended event (or timeout)
select {
case <-lse.Ended:
case <-time.After(stop.Add(1 * time.Second).Sub(start)):
}
// check if the task is ended
So(tsk.State(), ShouldEqual, core.TaskEnded)

Expand Down

0 comments on commit 4b98fbb

Please sign in to comment.