diff --git a/scheduler/workflow.go b/scheduler/workflow.go index bbb4fd972..330ed7217 100644 --- a/scheduler/workflow.go +++ b/scheduler/workflow.go @@ -22,6 +22,7 @@ package scheduler import ( "errors" "fmt" + "sync" "github.com/intelsdi-x/gomit" @@ -323,7 +324,7 @@ func (s *schedulerWorkflow) Start(t *task) { defer s.eventEmitter.Emit(event) // walk through the tree and dispatch work - s.workJobs(s.processNodes, s.publishNodes, t, j) + workJobs(s.processNodes, s.publishNodes, t, j) } func (s *schedulerWorkflow) State() WorkflowState { @@ -334,27 +335,65 @@ func (s *schedulerWorkflow) StateString() string { return WorkflowStateLookup[s.state] } -func (s *schedulerWorkflow) workJobs(prs []*processNode, pus []*publishNode, t *task, pj job) { +// workJobs takes a slice of proccess and publish nodes and submits jobs for each for a task. +// It then iterates down any process nodes to submit their child node jobs for the task +func workJobs(prs []*processNode, pus []*publishNode, t *task, pj job) { + // Create waitgroup to block until all jobs are submitted + wg := &sync.WaitGroup{} + // range over the process jobs and call submitProcessJob for _, pr := range prs { - j := newProcessJob(pj, pr.Name(), pr.Version(), pr.InboundContentType, pr.config.Table(), t.metricsManager, t.id) - errors := t.manager.Work(j).Promise().Await() - if len(errors) != 0 { - t.failedRuns++ - t.lastFailureTime = t.lastFireTime - t.lastFailureMessage = errors[len(errors)-1].Error() - return - } - - s.workJobs(pr.ProcessNodes, pr.PublishNodes, t, j) + // increment the wait group (before starting goroutine to prevent a race condition) + wg.Add(1) + // Start goroutine to submit the process job + go submitProcessJob(pj, t, wg, pr) } + // range over the publish jobs and call submitPublishJob for _, pu := range pus { - j := newPublishJob(pj, pu.Name(), pu.Version(), pu.InboundContentType, pu.config.Table(), t.metricsManager, t.id) - errors := t.manager.Work(j).Promise().Await() - if len(errors) != 0 { - t.failedRuns++ - t.lastFailureTime = t.lastFireTime - t.lastFailureMessage = errors[len(errors)-1].Error() - return - } + // increment the wait group (before starting goroutine to prevent a race condition) + wg.Add(1) + // Start goroutine to submit the process job + go submitPublishJob(pj, t, wg, pu) + } + // Wait until all job submisson goroutines are done + wg.Wait() +} + +func submitProcessJob(pj job, t *task, wg *sync.WaitGroup, pr *processNode) { + // Decrement the waitgroup + defer wg.Done() + // Create a new process job + j := newProcessJob(pj, pr.Name(), pr.Version(), pr.InboundContentType, pr.config.Table(), t.metricsManager, t.id) + // Submit the job against the task.managesWork + errors := t.manager.Work(j).Promise().Await() + // Check for errors and update the task + if len(errors) != 0 { + // Note: we just update the error tracking, the change of state for a task is handled before this is reached. + t.failedRuns++ + t.lastFailureTime = t.lastFireTime + t.lastFailureMessage = errors[len(errors)-1].Error() + // Return and do not proceed further down the workflow + return + } + // Iterate into any child process or publish nodes + workJobs(pr.ProcessNodes, pr.PublishNodes, t, j) +} + +func submitPublishJob(pj job, t *task, wg *sync.WaitGroup, pu *publishNode) { + // Decrement the waitgroup + defer wg.Done() + // Create a new process job + j := newPublishJob(pj, pu.Name(), pu.Version(), pu.InboundContentType, pu.config.Table(), t.metricsManager, t.id) + // Submit the job against the task.managesWork + errors := t.manager.Work(j).Promise().Await() + // Check for errors and update the task + if len(errors) != 0 { + // Note: we just update the error tracking, the change of state for a task is handled before this is reached. + t.failedRuns++ + t.lastFailureTime = t.lastFireTime + t.lastFailureMessage = errors[len(errors)-1].Error() + // Return and do not proceed further down the workflow + return } + // Publish nodes cannot contain child nodes (publish is a terminal node) + // so unlike process nodes there is not a call to workJobs here for child nodes. }