diff --git a/scheduler/distributed_task_test.go b/scheduler/distributed_task_test.go index 7849d42ec..81f55edcf 100644 --- a/scheduler/distributed_task_test.go +++ b/scheduler/distributed_task_test.go @@ -37,6 +37,7 @@ import ( "github.com/intelsdi-x/snap/grpc/controlproxy" "github.com/intelsdi-x/snap/pkg/schedule" "github.com/intelsdi-x/snap/plugin/helper" + "github.com/intelsdi-x/snap/scheduler/fixtures" "github.com/intelsdi-x/snap/scheduler/wmap" . "github.com/smartystreets/goconvey/convey" ) @@ -302,6 +303,8 @@ func TestDistributedSubscriptions(t *testing.T) { }) }) Convey("Single run task", func() { + lse := fixtures.NewListenToSchedulerEvent() + s.eventManager.RegisterHandler("Scheduler.TaskEnded", lse) count := uint(1) interval := time.Millisecond * 100 sch := schedule.NewWindowedSchedule(interval, nil, nil, count) @@ -323,9 +326,11 @@ func TestDistributedSubscriptions(t *testing.T) { So(remoteMockManager.SubscribeCallCount, ShouldBeGreaterThan, 0) }) Convey("Task should be ended after an interval", func() { - // wait for the end of the task - // we are ok to extend sleeping by 100ms to allow to complete post-schedule activities - time.Sleep(interval + time.Millisecond*100) + // wait for the end of the task (or timeout) + select { + case <-lse.Ended: + case <-time.After(time.Duration(interval.Nanoseconds()*int64(count)+interval.Nanoseconds()) + 1*time.Second): + } So(t.State(), ShouldEqual, core.TaskEnded) Convey("So all dependencies should have been usubscribed", func() { diff --git a/scheduler/fixtures/fixtures.go b/scheduler/fixtures/fixtures.go new file mode 100644 index 000000000..4571686b9 --- /dev/null +++ b/scheduler/fixtures/fixtures.go @@ -0,0 +1,43 @@ +// + build medium legacy + +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2017 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fixtures + +import "github.com/intelsdi-x/gomit" +import "github.com/intelsdi-x/snap/core/scheduler_event" + +type listenToSchedulerEvent struct { + Ended chan struct{} +} + +// NewListenToSchedulerEvent +func NewListenToSchedulerEvent() *listenToSchedulerEvent { + return &listenToSchedulerEvent{ + Ended: make(chan struct{}), + } +} + +func (l *listenToSchedulerEvent) HandleGomitEvent(e gomit.Event) { + switch e.Body.(type) { + case *scheduler_event.TaskEndedEvent: + l.Ended <- struct{}{} + } +} diff --git a/scheduler/scheduler_medium_test.go b/scheduler/scheduler_medium_test.go index 43b400223..d8688916f 100644 --- a/scheduler/scheduler_medium_test.go +++ b/scheduler/scheduler_medium_test.go @@ -34,6 +34,7 @@ import ( "github.com/intelsdi-x/snap/core/ctypes" "github.com/intelsdi-x/snap/core/serror" "github.com/intelsdi-x/snap/pkg/schedule" + "github.com/intelsdi-x/snap/scheduler/fixtures" "github.com/intelsdi-x/snap/scheduler/wmap" ) @@ -250,6 +251,8 @@ func TestCreateTask(t *testing.T) { }) }) Convey("should not error when the schedule is valid", func() { + lse := fixtures.NewListenToSchedulerEvent() + s.eventManager.RegisterHandler("Scheduler.TaskEnded", lse) start := time.Now().Add(startWait) stop := time.Now().Add(startWait + windowSize) sch := schedule.NewWindowedSchedule(interval, &start, &stop, 0) @@ -260,11 +263,11 @@ func TestCreateTask(t *testing.T) { task := s.tasks.Get(tsk.ID()) task.Spin() Convey("the task should be ended after reaching the end of window", func() { - // wait for the end of determined window - time.Sleep(startWait + windowSize) - // wait an interval to be sure that the task state has been updated - // we are ok to extend sleeping by 100ms to allow to complete post-schedule activities - time.Sleep(interval + time.Millisecond*100) + // wait for task ended event (or timeout) + select { + case <-lse.Ended: + case <-time.After(stop.Add(1 * time.Second).Sub(start)): + } // check if the task is ended So(tsk.State(), ShouldEqual, core.TaskEnded) }) @@ -290,6 +293,8 @@ func TestCreateTask(t *testing.T) { }) }) Convey("Single run task firing on defined start time", func() { + lse := fixtures.NewListenToSchedulerEvent() + s.eventManager.RegisterHandler("Scheduler.TaskEnded", lse) count := uint(1) start := time.Now().Add(startWait) sch := schedule.NewWindowedSchedule(interval, &start, nil, count) @@ -300,11 +305,11 @@ func TestCreateTask(t *testing.T) { task := s.tasks.Get(tsk.ID()) task.Spin() Convey("the task should be ended after reaching the end of window", func() { - // wait for the end of determined window - time.Sleep(startWait) - // wait an interval to be sure that the task state has been updated - // we are ok to extend sleeping by 100ms to allow to complete post-schedule activities - time.Sleep(interval + time.Millisecond*100) + // wait for task ended event (or timeout) + select { + case <-lse.Ended: + case <-time.After(time.Duration(interval.Nanoseconds()*int64(count)+interval.Nanoseconds()) + 1*time.Second): + } // check if the task is ended So(tsk.State(), ShouldEqual, core.TaskEnded) }) @@ -400,6 +405,8 @@ func TestStopTask(t *testing.T) { }) }) Convey("Calling StopTask on an ended task", t, func() { + lse := fixtures.NewListenToSchedulerEvent() + s.eventManager.RegisterHandler("Scheduler.TaskEnded", lse) start := time.Now().Add(startWait) stop := time.Now().Add(startWait + windowSize) @@ -412,11 +419,11 @@ func TestStopTask(t *testing.T) { task := s.tasks.Get(tsk.ID()) task.Spin() - // wait for the end of determined window - time.Sleep(startWait + windowSize) - // wait an interval to be sure that the task state has been updated - // we are ok to extend sleeping by 100ms to allow to complete post-schedule activities - time.Sleep(interval + time.Millisecond*100) + // wait for task ended event (or timeout) + select { + case <-lse.Ended: + case <-time.After(stop.Add(1 * time.Second).Sub(start)): + } // check if the task is ended So(tsk.State(), ShouldEqual, core.TaskEnded) @@ -486,6 +493,8 @@ func TestStartTask(t *testing.T) { }) }) Convey("Calling StartTask on an ended windowed task", t, func() { + lse := fixtures.NewListenToSchedulerEvent() + s.eventManager.RegisterHandler("Scheduler.TaskEnded", lse) start := time.Now().Add(startWait) stop := time.Now().Add(startWait + windowSize) @@ -498,11 +507,11 @@ func TestStartTask(t *testing.T) { task := s.tasks.Get(tsk.ID()) task.Spin() - // wait for the end of determined window - time.Sleep(startWait + windowSize) - // wait an interval to be sure that the task state has been updated - // we are ok to extend sleeping by 100ms to allow to complete post-schedule activities - time.Sleep(interval + time.Millisecond*100) + // wait for task ended event (or timeout) + select { + case <-lse.Ended: + case <-time.After(stop.Add(1 * time.Second).Sub(start)): + } // check if the task is ended So(tsk.State(), ShouldEqual, core.TaskEnded) @@ -583,6 +592,8 @@ func TestEnableTask(t *testing.T) { }) }) Convey("Calling EnableTask on an ended task", t, func() { + lse := fixtures.NewListenToSchedulerEvent() + s.eventManager.RegisterHandler("Scheduler.TaskEnded", lse) start := time.Now().Add(startWait) stop := time.Now().Add(startWait + windowSize) @@ -595,12 +606,11 @@ func TestEnableTask(t *testing.T) { task := s.tasks.Get(tsk.ID()) task.Spin() - // wait for the end of determined window - time.Sleep(startWait + windowSize) - // wait an interval to be sure that the task state has been updated - /// we are ok to extend sleeping by 100ms to allow to complete post-schedule activities - time.Sleep(interval + time.Millisecond*100) - + // wait for task ended event (or timeout) + select { + case <-lse.Ended: + case <-time.After(stop.Add(1 * time.Second).Sub(start)): + } // check if the task is ended So(tsk.State(), ShouldEqual, core.TaskEnded)