Skip to content

Commit

Permalink
Fixes intelsdi-x#1520 - Ended task flow
Browse files Browse the repository at this point in the history
  • Loading branch information
IzabellaRaulin committed Feb 16, 2017
1 parent 1cdc8a7 commit 5f2f16d
Show file tree
Hide file tree
Showing 11 changed files with 95 additions and 7 deletions.
11 changes: 11 additions & 0 deletions core/scheduler_event/scheduler_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ const (
TaskDeleted = "Scheduler.TaskDeleted"
TaskStarted = "Scheduler.TaskStarted"
TaskStopped = "Scheduler.TaskStopped"
TaskEnded = "Scheduler.TaskEnded"
TaskDisabled = "Scheduler.TaskDisabled"
MetricCollected = "Scheduler.MetricsCollected"
MetricCollectionFailed = "Scheduler.MetricCollectionFailed"
Expand Down Expand Up @@ -70,6 +71,16 @@ func (e TaskStoppedEvent) Namespace() string {
return TaskStopped
}


type TaskEndedEvent struct {
TaskID string
Source string
}

func (e TaskEndedEvent) Namespace() string {
return TaskEnded
}

type TaskDisabledEvent struct {
TaskID string
Why string
Expand Down
1 change: 1 addition & 0 deletions core/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type TaskWatcherHandler interface {
CatchCollection([]Metric)
CatchTaskStarted()
CatchTaskStopped()
CatchTaskEnded()
CatchTaskDisabled(string)
}

Expand Down
3 changes: 2 additions & 1 deletion mgmt/rest/client/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,12 @@ func (c *Client) WatchTask(id string) *WatchTasksResult {
r.Close()
return
}
//todo iza - ask about if for ended/completed task 'r' should be closed or not
switch ste.EventType {
case rbody.TaskWatchTaskDisabled:
r.EventChan <- ste
r.Close()
case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent:
case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskEnded, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent:
r.EventChan <- ste
}
}
Expand Down
2 changes: 1 addition & 1 deletion mgmt/rest/rest_v1_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func watchTask(id string, port int) *watchTaskResult {
r.eventChan <- ste.EventType
r.close()
return
case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent:
case rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskEnded, rbody.TaskWatchTaskStarted, rbody.TaskWatchMetricEvent:
log.Info(ste.EventType)
r.eventChan <- ste.EventType
}
Expand Down
3 changes: 2 additions & 1 deletion mgmt/rest/v1/rbody/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ const (
AddScheduledTaskType = "scheduled_task_created"
ScheduledTaskType = "scheduled_task"
ScheduledTaskStartedType = "scheduled_task_started"
ScheduledTaskStoppedType = "scheduled_task_stopped"
ScheduledTaskStoppedType = "scheduled_task_ended"
ScheduledTaskRemovedType = "scheduled_task_removed"
ScheduledTaskWatchingEndedType = "schedule_task_watch_ended"
ScheduledTaskEnabledType = "scheduled_task_enabled"
Expand All @@ -46,6 +46,7 @@ const (
TaskWatchTaskDisabled = "task-disabled"
TaskWatchTaskStarted = "task-started"
TaskWatchTaskStopped = "task-stopped"
TaskWatchTaskEnded = "task-ended"
)

type ScheduledTaskListReturned struct {
Expand Down
8 changes: 7 additions & 1 deletion mgmt/rest/v1/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (s *apiV1) watchTask(w http.ResponseWriter, r *http.Request, p httprouter.P
// The client can decide to stop receiving on the stream on Task Stopped.
// We write the event to the buffer
fmt.Fprintf(w, "data: %s\n\n", e.ToJSON())
case rbody.TaskWatchTaskDisabled, rbody.TaskWatchTaskStopped:
case rbody.TaskWatchTaskDisabled, rbody.TaskWatchTaskStopped, rbody.TaskWatchTaskEnded:
// A disabled task should end the streaming and close the connection
fmt.Fprintf(w, "data: %s\n\n", e.ToJSON())
// Flush since we are sending nothing new
Expand Down Expand Up @@ -289,6 +289,12 @@ func (t *TaskWatchHandler) CatchTaskStopped() {
}
}

func (t *TaskWatchHandler) CatchTaskEnded() {
t.mChan <- rbody.StreamedTaskEvent{
EventType: rbody.TaskWatchTaskEnded,
}
}

func (t *TaskWatchHandler) CatchTaskDisabled(why string) {
t.mChan <- rbody.StreamedTaskEvent{
EventType: rbody.TaskWatchTaskDisabled,
Expand Down
9 changes: 8 additions & 1 deletion mgmt/rest/v2/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ const (
TaskWatchTaskDisabled = "task-disabled"
TaskWatchTaskStarted = "task-started"
TaskWatchTaskStopped = "task-stopped"
TaskWatchTaskEnded = "task-ended"
)

// The amount of time to buffer streaming events before flushing in seconds
Expand Down Expand Up @@ -95,7 +96,7 @@ func (s *apiV2) watchTask(w http.ResponseWriter, r *http.Request, p httprouter.P
// The client can decide to stop receiving on the stream on Task Stopped.
// We write the event to the buffer
fmt.Fprintf(w, "data: %s\n\n", e.ToJSON())
case TaskWatchTaskDisabled, TaskWatchTaskStopped:
case TaskWatchTaskDisabled, TaskWatchTaskStopped, TaskWatchTaskEnded:
// A disabled task should end the streaming and close the connection
fmt.Fprintf(w, "data: %s\n\n", e.ToJSON())
// Flush since we are sending nothing new
Expand Down Expand Up @@ -165,6 +166,12 @@ func (t *TaskWatchHandler) CatchTaskStopped() {
}
}

func (t *TaskWatchHandler) CatchTaskEnded() {
t.mChan <- StreamedTaskEvent{
EventType: TaskWatchTaskEnded,
}
}

func (t *TaskWatchHandler) CatchTaskDisabled(why string) {
t.mChan <- StreamedTaskEvent{
EventType: TaskWatchTaskDisabled,
Expand Down
30 changes: 30 additions & 0 deletions scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ var (
ErrTaskDisabledNotRunnable = errors.New("Task is disabled. Cannot be started.")
// ErrTaskDisabledNotStoppable - The error message for when a task is disabled and cannot be stopped
ErrTaskDisabledNotStoppable = errors.New("Task is disabled. Only running tasks can be stopped.")
// ErrTaskEndedNotStoppable - The error message for when a task is ended and cannot be stopped
ErrTaskEndedNotStoppable = errors.New("Task is ended. Only running tasks can be stopped.")
)

type schedulerState int
Expand Down Expand Up @@ -481,6 +483,18 @@ func (s *scheduler) startTask(id, source string) []serror.SnapError {
}
}

//Ensure the schedule is valid at this point and time.
//In particular case of restarting a task with windowed scheduler,
//the scheduler might be invalid because of stop time in in the past
if err := t.schedule.Validate(); err != nil {
logger.WithFields(log.Fields{
"task-id": t.ID(),
"task-state": t.State(),
}).Info("Not valid task schedule")
return []serror.SnapError{
serror.New(err),
}
}
// Group dependencies by the node they live on
// and subscribe to them.
depGroups := getWorkflowPlugins(t.workflow.processNodes, t.workflow.publishNodes, t.workflow.metrics)
Expand Down Expand Up @@ -559,6 +573,14 @@ func (s *scheduler) stopTask(id, source string) []serror.SnapError {
return []serror.SnapError{
serror.New(ErrTaskAlreadyStopped),
}
case core.TaskEnded:
logger.WithFields(log.Fields{
"task-id": t.ID(),
"task-state": t.State(),
}).Error("task is already ended")
return []serror.SnapError{
serror.New(ErrTaskEndedNotStoppable),
}
case core.TaskDisabled:
logger.WithFields(log.Fields{
"task-id": t.ID(),
Expand Down Expand Up @@ -768,6 +790,14 @@ func (s *scheduler) HandleGomitEvent(e gomit.Event) {
"task-id": v.TaskID,
}).Debug("event received")
s.taskWatcherColl.handleTaskStopped(v.TaskID)
case *scheduler_event.TaskEndedEvent:
log.WithFields(log.Fields{
"_module": "scheduler-events",
"_block": "handle-events",
"event-namespace": e.Namespace(),
"task-id": v.TaskID,
}).Debug("event received")
s.taskWatcherColl.handleTaskEnded(v.TaskID)
case *scheduler_event.TaskDisabledEvent:
log.WithFields(log.Fields{
"_module": "scheduler-events",
Expand Down
5 changes: 3 additions & 2 deletions scheduler/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func newTask(s schedule.Schedule, wf *schedulerWorkflow, m *workManager, mm mana
for _, opt := range opts {
opt(task)
}

return task, nil
}

Expand Down Expand Up @@ -228,7 +229,7 @@ func (t *task) Spin() {
// misses for the interval while stopped.
t.lastFireTime = time.Time{}

if t.state == core.TaskStopped {
if t.state == core.TaskStopped || t.state == core.TaskEnded {
t.state = core.TaskSpinning
t.killChan = make(chan struct{})
// spin in a goroutine
Expand Down Expand Up @@ -433,7 +434,7 @@ func (t *taskCollection) remove(task *task) error {
t.Lock()
defer t.Unlock()
if _, ok := t.table[task.id]; ok {
if task.state != core.TaskStopped && task.state != core.TaskDisabled {
if task.state != core.TaskStopped && task.state != core.TaskDisabled && task.state != core.TaskEnded {
taskLogger.WithFields(log.Fields{
"_block": "remove",
"task id": task.id,
Expand Down
25 changes: 25 additions & 0 deletions scheduler/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,31 @@ func (t *taskWatcherCollection) handleTaskStopped(taskID string) {
}
}


func (t *taskWatcherCollection) handleTaskEnded(taskID string) {
t.mutex.Lock()
defer t.mutex.Unlock()
// no taskID means no watches, early exit
if t.coll[taskID] == nil || len(t.coll[taskID]) == 0 {
// Uncomment this debug line if needed. Otherwise this is too verbose for even debug level.
// watcherLog.WithFields(log.Fields{
// "task-id": taskID,
// }).Debug("no watchers")
return
}
// Walk all watchers for a task ID
for _, v := range t.coll[taskID] {
// Check if they have a catcher assigned
watcherLog.WithFields(log.Fields{
"task-id": taskID,
"task-watcher-id": v.id,
}).Debug("calling taskwatcher task ended func")
// Call the catcher
v.handler.CatchTaskEnded()
}
}


func (t *taskWatcherCollection) handleTaskDisabled(taskID string, why string) {
t.mutex.Lock()
defer t.mutex.Unlock()
Expand Down
5 changes: 5 additions & 0 deletions scheduler/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,11 @@ func (d *mockCatcher) CatchTaskStopped() {
sum++
}

func (d *mockCatcher) CatchTaskEnded() {
d.count++
sum++
}

func (d *mockCatcher) CatchTaskStarted() {
d.count++
sum++
Expand Down

0 comments on commit 5f2f16d

Please sign in to comment.