Skip to content

Commit

Permalink
Fixes intelsdi-x#1598 and intelsdi-x#1601 - ability to stop tasks gra…
Browse files Browse the repository at this point in the history
…cefully
  • Loading branch information
rashmigottipati committed Apr 13, 2017
1 parent c068511 commit bfbb8e2
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 14 deletions.
4 changes: 4 additions & 0 deletions scheduler/fixtures/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@ import "github.com/intelsdi-x/snap/core/scheduler_event"
type listenToSchedulerEvent struct {
Ended chan struct{}
UnsubscribedPluginEvents chan *scheduler_event.PluginsUnsubscribedEvent
TaskStoppedEvents chan struct{}
}

// NewListenToSchedulerEvent
func NewListenToSchedulerEvent() *listenToSchedulerEvent {
return &listenToSchedulerEvent{
Ended: make(chan struct{}),
UnsubscribedPluginEvents: make(chan *scheduler_event.PluginsUnsubscribedEvent),
TaskStoppedEvents: make(chan struct{}),
}
}

Expand All @@ -43,5 +45,7 @@ func (l *listenToSchedulerEvent) HandleGomitEvent(e gomit.Event) {
l.Ended <- struct{}{}
case *scheduler_event.PluginsUnsubscribedEvent:
l.UnsubscribedPluginEvents <- msg
case *scheduler_event.TaskStoppedEvent:
l.TaskStoppedEvents <- struct{}{}
}
}
13 changes: 3 additions & 10 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,16 +569,6 @@ func (s *scheduler) stopTask(id, source string) []serror.SnapError {
serror.New(ErrTaskDisabledNotStoppable),
}
default:

if errs := t.UnsubscribePlugins(); len(errs) != 0 {
return errs
}

event := &scheduler_event.TaskStoppedEvent{
TaskID: t.ID(),
Source: source,
}
defer s.eventManager.Emit(event)
t.Stop()
logger.WithFields(log.Fields{
"task-id": t.ID(),
Expand Down Expand Up @@ -753,6 +743,9 @@ func (s *scheduler) HandleGomitEvent(e gomit.Event) {
"event-namespace": e.Namespace(),
"task-id": v.TaskID,
}).Debug("event received")
// We need to unsubscribe from deps when a task has stopped
task, _ := s.getTask(v.TaskID)
task.UnsubscribePlugins()
s.taskWatcherColl.handleTaskStopped(v.TaskID)
case *scheduler_event.TaskEndedEvent:
log.WithFields(log.Fields{
Expand Down
38 changes: 36 additions & 2 deletions scheduler/scheduler_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type mockMetricManager struct {
failValidatingMetricsAfter int
failuredSoFar int
autodiscoverPaths []string
timeToWait time.Duration
}

func (m *mockMetricManager) StreamMetrics(string, map[string]map[string]string, time.Duration, int64) (chan []core.Metric, chan error, []error) {
Expand All @@ -54,6 +55,7 @@ func (m *mockMetricManager) CollectMetrics(string, map[string]map[string]string)
}

func (m *mockMetricManager) PublishMetrics([]core.Metric, map[string]ctypes.ConfigValue, string, string, int) []error {
time.Sleep(m.timeToWait)
return nil
}

Expand Down Expand Up @@ -349,9 +351,12 @@ func TestCreateTask(t *testing.T) {

func TestStopTask(t *testing.T) {
logrus.SetLevel(logrus.FatalLevel)
s := newScheduler()
s.Start()
c := new(mockMetricManager)
cfg := GetDefaultConfig()
s := New(cfg)
s.SetMetricManager(c)
w := newMockWorkflowMap()
s.Start()

Convey("Calling StopTask on a running task", t, func() {
sch := schedule.NewWindowedSchedule(interval, nil, nil, 0)
Expand All @@ -372,6 +377,35 @@ func TestStopTask(t *testing.T) {
So(tsk.State(), ShouldEqual, core.TaskStopped)
})
})

Convey("Calling StopTask on a firing task", t, func() {
Convey("Should allow last scheduled workfow execution to finish", func() {
c.timeToWait = 500 * time.Millisecond
lse := fixtures.NewListenToSchedulerEvent()
s.eventManager.RegisterHandler("Scheduler.TaskStopped", lse)
sc := schedule.NewWindowedSchedule(time.Second, nil, nil, 0)
t, _ := s.CreateTask(sc, w, false)
startTime := time.Now()
t.(*task).Spin()
time.Sleep(100 * time.Millisecond)
//allowing things to settle and waiting for task state to change to firing
So(t.State(), ShouldResemble, core.TaskFiring)
// stop task when task state is firing
t.(*task).Stop()
// the last scheduled workflow execution should be allowed to finish
// so we expect that stopping the task is going to happen not early than 500ms (set by by timeToWait)
So(t.HitCount(), ShouldEqual, 1)
//the task should have fired once
select {
case <-lse.TaskStoppedEvents:
//elapsed time should be greater than 500ms
So(time.Since(startTime), ShouldBeGreaterThan, c.timeToWait)
So(t.State(), ShouldResemble, core.TaskStopped)
case <-time.After(1 * time.Second):
}
})
})

Convey("Calling StopTask on a stopped task", t, func() {
sch := schedule.NewWindowedSchedule(interval, nil, nil, 0)
tskStopped, _ := s.CreateTask(sch, w, false)
Expand Down
7 changes: 5 additions & 2 deletions scheduler/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,8 +472,6 @@ func (t *task) spin() {
// If response show this schedule is still active we fire
case schedule.Active:
t.missedIntervals += sr.Missed()
t.lastFireTime = time.Now()
t.hitCount++
t.fire()
if t.lastFailureTime == t.lastFireTime {
consecutiveFailures++
Expand Down Expand Up @@ -528,6 +526,9 @@ func (t *task) spin() {
t.state = core.TaskStopped
t.lastFireTime = time.Time{}
t.Unlock()
event := new(scheduler_event.TaskStoppedEvent)
event.TaskID = t.id
defer t.eventEmitter.Emit(event)
return
}
}
Expand All @@ -538,7 +539,9 @@ func (t *task) fire() {
defer t.Unlock()

t.state = core.TaskFiring
t.lastFireTime = time.Now()
t.workflow.Start(t)
t.hitCount++
t.state = core.TaskSpinning
}

Expand Down

0 comments on commit bfbb8e2

Please sign in to comment.