Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Ensures scheduler jobs submissions are sent concurrently for a workfl…
Browse files Browse the repository at this point in the history
…ow. Refactors scheduler.workJobs() to a package unexported function workJobs().
  • Loading branch information
lynxbat committed Mar 6, 2016
1 parent 0bbbdad commit cb7ead2
Showing 1 changed file with 59 additions and 20 deletions.
79 changes: 59 additions & 20 deletions scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ package scheduler
import (
"errors"
"fmt"
"sync"

"github.com/intelsdi-x/gomit"

Expand Down Expand Up @@ -323,7 +324,7 @@ func (s *schedulerWorkflow) Start(t *task) {
defer s.eventEmitter.Emit(event)

// walk through the tree and dispatch work
s.workJobs(s.processNodes, s.publishNodes, t, j)
workJobs(s.processNodes, s.publishNodes, t, j)
}

func (s *schedulerWorkflow) State() WorkflowState {
Expand All @@ -334,27 +335,65 @@ func (s *schedulerWorkflow) StateString() string {
return WorkflowStateLookup[s.state]
}

func (s *schedulerWorkflow) workJobs(prs []*processNode, pus []*publishNode, t *task, pj job) {
// workJobs takes a slice of proccess and publish nodes and submits jobs for each for a task.
// It then iterates down any process nodes to submit their child node jobs for the task
func workJobs(prs []*processNode, pus []*publishNode, t *task, pj job) {
// Create waitgroup to block until all jobs are submitted
wg := &sync.WaitGroup{}
// range over the process jobs and call submitProcessJob
for _, pr := range prs {
j := newProcessJob(pj, pr.Name(), pr.Version(), pr.InboundContentType, pr.config.Table(), t.metricsManager, t.id)
errors := t.manager.Work(j).Promise().Await()
if len(errors) != 0 {
t.failedRuns++
t.lastFailureTime = t.lastFireTime
t.lastFailureMessage = errors[len(errors)-1].Error()
return
}

s.workJobs(pr.ProcessNodes, pr.PublishNodes, t, j)
// increment the wait group (before starting goroutine to prevent a race condition)
wg.Add(1)
// Start goroutine to submit the process job
go submitProcessJob(pj, t, wg, pr)
}
// range over the publish jobs and call submitPublishJob
for _, pu := range pus {
j := newPublishJob(pj, pu.Name(), pu.Version(), pu.InboundContentType, pu.config.Table(), t.metricsManager, t.id)
errors := t.manager.Work(j).Promise().Await()
if len(errors) != 0 {
t.failedRuns++
t.lastFailureTime = t.lastFireTime
t.lastFailureMessage = errors[len(errors)-1].Error()
return
}
// increment the wait group (before starting goroutine to prevent a race condition)
wg.Add(1)
// Start goroutine to submit the process job
go submitPublishJob(pj, t, wg, pu)
}
// Wait until all job submisson goroutines are done
wg.Wait()
}

func submitProcessJob(pj job, t *task, wg *sync.WaitGroup, pr *processNode) {
// Decrement the waitgroup
defer wg.Done()
// Create a new process job
j := newProcessJob(pj, pr.Name(), pr.Version(), pr.InboundContentType, pr.config.Table(), t.metricsManager, t.id)
// Submit the job against the task.managesWork
errors := t.manager.Work(j).Promise().Await()
// Check for errors and update the task
if len(errors) != 0 {
// Note: we just update the error tracking, the change of state for a task is handled before this is reached.
t.failedRuns++
t.lastFailureTime = t.lastFireTime
t.lastFailureMessage = errors[len(errors)-1].Error()
// Return and do not proceed further down the workflow
return
}
// Iterate into any child process or publish nodes
workJobs(pr.ProcessNodes, pr.PublishNodes, t, j)
}

func submitPublishJob(pj job, t *task, wg *sync.WaitGroup, pu *publishNode) {
// Decrement the waitgroup
defer wg.Done()
// Create a new process job
j := newPublishJob(pj, pu.Name(), pu.Version(), pu.InboundContentType, pu.config.Table(), t.metricsManager, t.id)
// Submit the job against the task.managesWork
errors := t.manager.Work(j).Promise().Await()
// Check for errors and update the task
if len(errors) != 0 {
// Note: we just update the error tracking, the change of state for a task is handled before this is reached.
t.failedRuns++
t.lastFailureTime = t.lastFireTime
t.lastFailureMessage = errors[len(errors)-1].Error()
// Return and do not proceed further down the workflow
return
}
// Publish nodes cannot contain child nodes (publish is a terminal node)
// so unlike process nodes there is not a call to workJobs here for child nodes.
}

0 comments on commit cb7ead2

Please sign in to comment.