Skip to content

Commit

Permalink
Fixes intelsdi-x#1598 and intelsdi-x#1601 - ability to stop tasks gra…
Browse files Browse the repository at this point in the history
…cefully
  • Loading branch information
rashmigottipati committed Apr 13, 2017
1 parent c068511 commit 472e549
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 12 deletions.
14 changes: 14 additions & 0 deletions examples/tasks/mock-file.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
---
version: 1
schedule:
type: "simple"
interval: "1s"
workflow:
collect:
metrics:
/intel/mock/foo: {}
config:
/intel/mock:
name: "root"
password: "secret"
time_to_wait: "5s"
9 changes: 9 additions & 0 deletions plugin/collector/snap-plugin-collector-mock2/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,15 @@ func (f *Mock) CollectMetrics(mts []plugin.MetricType) ([]plugin.MetricType, err
panic("Oops!")
}

if c, ok := mts[i].Config().Table()["time_to_wait"]; ok {
dur, err := time.ParseDuration(c.(ctypes.ConfigValueStr).Value)
if err != nil {
log.Println(err.Error())
} else {
time.Sleep(dur)
}
}

if isDynamic, _ := mts[i].Namespace().IsDynamic(); isDynamic {
requestedHosts := []string{}

Expand Down
13 changes: 3 additions & 10 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -569,16 +569,6 @@ func (s *scheduler) stopTask(id, source string) []serror.SnapError {
serror.New(ErrTaskDisabledNotStoppable),
}
default:

if errs := t.UnsubscribePlugins(); len(errs) != 0 {
return errs
}

event := &scheduler_event.TaskStoppedEvent{
TaskID: t.ID(),
Source: source,
}
defer s.eventManager.Emit(event)
t.Stop()
logger.WithFields(log.Fields{
"task-id": t.ID(),
Expand Down Expand Up @@ -753,6 +743,9 @@ func (s *scheduler) HandleGomitEvent(e gomit.Event) {
"event-namespace": e.Namespace(),
"task-id": v.TaskID,
}).Debug("event received")
// We need to unsubscribe from deps when a task has stopped
task, _ := s.getTask(v.TaskID)
task.UnsubscribePlugins()
s.taskWatcherColl.handleTaskStopped(v.TaskID)
case *scheduler_event.TaskEndedEvent:
log.WithFields(log.Fields{
Expand Down
18 changes: 18 additions & 0 deletions scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ type mockMetricManager struct {
failValidatingMetricsAfter int
failuredSoFar int
autodiscoverPaths []string
timeToWait time.Duration
}

func (m *mockMetricManager) StreamMetrics(string, map[string]map[string]string, time.Duration, int64) (chan []core.Metric, chan error, []error) {
return nil, nil, nil
}

func (m *mockMetricManager) CollectMetrics(string, map[string]map[string]string) ([]core.Metric, []error) {
time.Sleep(m.timeToWait)
return nil, nil
}

Expand Down Expand Up @@ -202,6 +204,22 @@ func TestScheduler(t *testing.T) {
_, te := s.CreateTask(schedule.NewWindowedSchedule(time.Second, nil, nil, 0), w, false)
So(te.Errors(), ShouldBeEmpty)

Convey("stop task", func() {
t, _ := s.CreateTask(schedule.NewWindowedSchedule(time.Second, nil, nil, 0), w, false)
c.timeToWait = 500 * time.Millisecond
start := time.Now().UnixNano() / int64(time.Millisecond)
t.(*task).Spin()
So(t.State(), ShouldResemble, core.TaskSpinning)
time.Sleep(50 * time.Millisecond)
t.(*task).Stop()
end := time.Now().UnixNano() / int64(time.Millisecond)
elapsed := time.Duration(end - start)
So(elapsed*1e6, ShouldBeGreaterThanOrEqualTo, 500*time.Millisecond)
time.Sleep(50 * time.Millisecond)
So(t.State(), ShouldResemble, core.TaskStopped)
c.timeToWait = 0
})

Convey("returns errors when metrics do not validate", func() {
c.failValidatingMetrics = true
c.failValidatingMetricsAfter = 1
Expand Down
7 changes: 5 additions & 2 deletions scheduler/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,8 +472,6 @@ func (t *task) spin() {
// If response show this schedule is still active we fire
case schedule.Active:
t.missedIntervals += sr.Missed()
t.lastFireTime = time.Now()
t.hitCount++
t.fire()
if t.lastFailureTime == t.lastFireTime {
consecutiveFailures++
Expand Down Expand Up @@ -528,6 +526,9 @@ func (t *task) spin() {
t.state = core.TaskStopped
t.lastFireTime = time.Time{}
t.Unlock()
event := new(scheduler_event.TaskStoppedEvent)
event.TaskID = t.id
defer t.eventEmitter.Emit(event)
return
}
}
Expand All @@ -538,7 +539,9 @@ func (t *task) fire() {
defer t.Unlock()

t.state = core.TaskFiring
t.lastFireTime = time.Now()
t.workflow.Start(t)
t.hitCount++
t.state = core.TaskSpinning
}

Expand Down

0 comments on commit 472e549

Please sign in to comment.