diff --git a/scheduler/fixtures/fixtures.go b/scheduler/fixtures/fixtures.go index dd79e41ee..06bb267a1 100644 --- a/scheduler/fixtures/fixtures.go +++ b/scheduler/fixtures/fixtures.go @@ -27,6 +27,7 @@ import "github.com/intelsdi-x/snap/core/scheduler_event" type listenToSchedulerEvent struct { Ended chan struct{} UnsubscribedPluginEvents chan *scheduler_event.PluginsUnsubscribedEvent + TaskStoppedEvents chan struct{} } // NewListenToSchedulerEvent @@ -34,6 +35,7 @@ func NewListenToSchedulerEvent() *listenToSchedulerEvent { return &listenToSchedulerEvent{ Ended: make(chan struct{}), UnsubscribedPluginEvents: make(chan *scheduler_event.PluginsUnsubscribedEvent), + TaskStoppedEvents: make(chan struct{}), } } @@ -43,5 +45,7 @@ func (l *listenToSchedulerEvent) HandleGomitEvent(e gomit.Event) { l.Ended <- struct{}{} case *scheduler_event.PluginsUnsubscribedEvent: l.UnsubscribedPluginEvents <- msg + case *scheduler_event.TaskStoppedEvent: + l.TaskStoppedEvents <- struct{}{} } } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index abd7a6e31..62644c851 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -569,16 +569,6 @@ func (s *scheduler) stopTask(id, source string) []serror.SnapError { serror.New(ErrTaskDisabledNotStoppable), } default: - - if errs := t.UnsubscribePlugins(); len(errs) != 0 { - return errs - } - - event := &scheduler_event.TaskStoppedEvent{ - TaskID: t.ID(), - Source: source, - } - defer s.eventManager.Emit(event) t.Stop() logger.WithFields(log.Fields{ "task-id": t.ID(), @@ -753,6 +743,9 @@ func (s *scheduler) HandleGomitEvent(e gomit.Event) { "event-namespace": e.Namespace(), "task-id": v.TaskID, }).Debug("event received") + // We need to unsubscribe from deps when a task has stopped + task, _ := s.getTask(v.TaskID) + task.UnsubscribePlugins() s.taskWatcherColl.handleTaskStopped(v.TaskID) case *scheduler_event.TaskEndedEvent: log.WithFields(log.Fields{ diff --git a/scheduler/scheduler_medium_test.go b/scheduler/scheduler_medium_test.go index a176b4675..9130c3d84 100644 --- a/scheduler/scheduler_medium_test.go +++ b/scheduler/scheduler_medium_test.go @@ -43,6 +43,7 @@ type mockMetricManager struct { failValidatingMetricsAfter int failuredSoFar int autodiscoverPaths []string + timeToWait time.Duration } func (m *mockMetricManager) StreamMetrics(string, map[string]map[string]string, time.Duration, int64) (chan []core.Metric, chan error, []error) { @@ -54,6 +55,7 @@ func (m *mockMetricManager) CollectMetrics(string, map[string]map[string]string) } func (m *mockMetricManager) PublishMetrics([]core.Metric, map[string]ctypes.ConfigValue, string, string, int) []error { + time.Sleep(m.timeToWait) return nil } @@ -349,9 +351,12 @@ func TestCreateTask(t *testing.T) { func TestStopTask(t *testing.T) { logrus.SetLevel(logrus.FatalLevel) - s := newScheduler() - s.Start() + c := new(mockMetricManager) + cfg := GetDefaultConfig() + s := New(cfg) + s.SetMetricManager(c) w := newMockWorkflowMap() + s.Start() Convey("Calling StopTask on a running task", t, func() { sch := schedule.NewWindowedSchedule(interval, nil, nil, 0) @@ -372,6 +377,35 @@ func TestStopTask(t *testing.T) { So(tsk.State(), ShouldEqual, core.TaskStopped) }) }) + + Convey("Calling StopTask on a firing task", t, func() { + Convey("Should allow last scheduled workfow execution to finish", func() { + c.timeToWait = 500 * time.Millisecond + lse := fixtures.NewListenToSchedulerEvent() + s.eventManager.RegisterHandler("Scheduler.TaskStopped", lse) + sc := schedule.NewWindowedSchedule(time.Second, nil, nil, 0) + t, _ := s.CreateTask(sc, w, false) + startTime := time.Now() + t.(*task).Spin() + time.Sleep(100 * time.Millisecond) + //allowing things to settle and waiting for task state to change to firing + So(t.State(), ShouldResemble, core.TaskFiring) + // stop task when task state is firing + t.(*task).Stop() + // the last scheduled workflow execution should be allowed to finish + // so we expect that stopping the task is going to happen not early than 500ms (set by by timeToWait) + So(t.HitCount(), ShouldEqual, 1) + //the task should have fired once + select { + case <-lse.TaskStoppedEvents: + //elapsed time should be greater than 500ms + So(time.Since(startTime), ShouldBeGreaterThan, c.timeToWait) + So(t.State(), ShouldResemble, core.TaskStopped) + case <-time.After(1 * time.Second): + } + }) + }) + Convey("Calling StopTask on a stopped task", t, func() { sch := schedule.NewWindowedSchedule(interval, nil, nil, 0) tskStopped, _ := s.CreateTask(sch, w, false) diff --git a/scheduler/task.go b/scheduler/task.go index e6e2f0279..0de072f64 100644 --- a/scheduler/task.go +++ b/scheduler/task.go @@ -472,8 +472,6 @@ func (t *task) spin() { // If response show this schedule is still active we fire case schedule.Active: t.missedIntervals += sr.Missed() - t.lastFireTime = time.Now() - t.hitCount++ t.fire() if t.lastFailureTime == t.lastFireTime { consecutiveFailures++ @@ -528,6 +526,9 @@ func (t *task) spin() { t.state = core.TaskStopped t.lastFireTime = time.Time{} t.Unlock() + event := new(scheduler_event.TaskStoppedEvent) + event.TaskID = t.id + defer t.eventEmitter.Emit(event) return } } @@ -538,7 +539,9 @@ func (t *task) fire() { defer t.Unlock() t.state = core.TaskFiring + t.lastFireTime = time.Now() t.workflow.Start(t) + t.hitCount++ t.state = core.TaskSpinning }