Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Adds RecordFailure() to task for recording run failures safely. Refac…
Browse files Browse the repository at this point in the history
…tors submitProcessJob and submitPublishJob to use this new function.
  • Loading branch information
lynxbat committed Mar 4, 2016
1 parent 90e3dc5 commit 8f1398c
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 13 deletions.
10 changes: 10 additions & 0 deletions scheduler/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,16 @@ func (t *task) waitForSchedule() {
}
}

// RecordFailure updates the failed runs and last failure properties
func (t *task) RecordFailure(e []error) {
// We synchronize this update to ensure it is atomic
t.Lock()
defer t.Unlock()
t.failedRuns++
t.lastFailureTime = t.lastFireTime
t.lastFailureMessage = e[len(e)-1].Error()
}

type taskCollection struct {
*sync.Mutex

Expand Down
20 changes: 7 additions & 13 deletions scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,9 +307,7 @@ func (s *schedulerWorkflow) Start(t *task) {
errors := t.manager.Work(j).Promise().Await()

if len(errors) != 0 {
t.failedRuns++
t.lastFailureTime = t.lastFireTime
t.lastFailureMessage = j.Errors()[len(j.Errors())-1].Error()
t.RecordFailure(j.Errors())
event := new(scheduler_event.MetricCollectionFailedEvent)
event.TaskID = t.id
event.Errors = errors
Expand Down Expand Up @@ -367,11 +365,9 @@ func submitProcessJob(pj job, t *task, wg *sync.WaitGroup, pr *processNode) {
errors := t.manager.Work(j).Promise().Await()
// Check for errors and update the task
if len(errors) != 0 {
// Note: we just update the error tracking, the change of state for a task is handled before this is reached.
t.failedRuns++
t.lastFailureTime = t.lastFireTime
t.lastFailureMessage = errors[len(errors)-1].Error()
// Return and do not proceed further down the workflow
// Record the failures in the task
// note: this function is thread safe against t
t.RecordFailure(errors)
return
}
// Iterate into any child process or publish nodes
Expand All @@ -387,11 +383,9 @@ func submitPublishJob(pj job, t *task, wg *sync.WaitGroup, pu *publishNode) {
errors := t.manager.Work(j).Promise().Await()
// Check for errors and update the task
if len(errors) != 0 {
// Note: we just update the error tracking, the change of state for a task is handled before this is reached.
t.failedRuns++
t.lastFailureTime = t.lastFireTime
t.lastFailureMessage = errors[len(errors)-1].Error()
// Return and do not proceed further down the workflow
// Record the failures in the task
// note: this function is thread safe against t
t.RecordFailure(errors)
return
}
// Publish nodes cannot contain child nodes (publish is a terminal node)
Expand Down

0 comments on commit 8f1398c

Please sign in to comment.