Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Adds debug logging to scheduler.workflow.
Browse files Browse the repository at this point in the history
  • Loading branch information
lynxbat committed Mar 6, 2016
1 parent c539df3 commit a61f835
Showing 1 changed file with 76 additions and 0 deletions.
76 changes: 76 additions & 0 deletions scheduler/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"fmt"
"sync"

log "github.com/Sirupsen/logrus"
"github.com/intelsdi-x/gomit"

"github.com/intelsdi-x/snap/control/plugin"
Expand All @@ -44,6 +45,8 @@ const (

// WorkflowStateLookup map and error vars
var (
workflowLogger = schedulerLogger.WithField("_module", "scheduler-workflow")

WorkflowStateLookup = map[WorkflowState]string{
WorkflowStopped: "Stopped",
WorkflowStarted: "Started",
Expand Down Expand Up @@ -299,6 +302,11 @@ func bindPluginContentTypes(pus []*publishNode, prs []*processNode, mm managesPl

// Start starts a workflow
func (s *schedulerWorkflow) Start(t *task) {
workflowLogger.WithFields(log.Fields{
"_block": "workflow-start",
"task-id": t.id,
"task-name": t.name,
}).Info(fmt.Sprintf("Starting workflow for task (%s\\%s)", t.id, t.name))
s.state = WorkflowStarted
j := newCollectorJob(s.metrics, t.deadlineDuration, t.metricsManager, t.workflow.configTree, t.id)

Expand Down Expand Up @@ -336,8 +344,20 @@ func (s *schedulerWorkflow) StateString() string {
// workJobs takes a slice of process and publish nodes and submits jobs for each for a task.
// It then iterates down any process nodes to submit their child node jobs for the task
func workJobs(prs []*processNode, pus []*publishNode, t *task, pj job) {
// optimize for no jobs
if len(prs) == 0 && len(pus) == 0 {
return
}
// Create waitgroup to block until all jobs are submitted
wg := &sync.WaitGroup{}
workflowLogger.WithFields(log.Fields{
"_block": "work-jobs",
"task-id": t.id,
"task-name": t.name,
"count-process-nodes": len(prs),
"count-publish-nodes": len(pus),
"parent-node-type": pj.TypeString(),
}).Debug("Batch submission of process and publish nodes")
// range over the process jobs and call submitProcessJob
for _, pr := range prs {
// increment the wait group (before starting goroutine to prevent a race condition)
Expand All @@ -354,22 +374,54 @@ func workJobs(prs []*processNode, pus []*publishNode, t *task, pj job) {
}
// Wait until all job submission goroutines are done
wg.Wait()
workflowLogger.WithFields(log.Fields{
"_block": "work-jobs",
"task-id": t.id,
"task-name": t.name,
"count-process-nodes": len(prs),
"count-publish-nodes": len(pus),
"parent-node-type": pj.TypeString(),
}).Debug("Batch submission complete")
}

// submitProcessJob creates a process job for pr with pj as its parent, submits
// it through the task's work manager and awaits the resulting promise.
// If the job reports errors they are recorded on the task and this branch of
// the workflow stops; otherwise the child process and publish nodes of pr are
// submitted with the new job as their parent.
// wg is marked done when the function returns.
func submitProcessJob(pj job, t *task, wg *sync.WaitGroup, pr *processNode) {
	// Release the caller's waitgroup on every return path
	defer wg.Done()

	// Build the process job that runs under the parent job
	procJob := newProcessJob(pj, pr.Name(), pr.Version(), pr.InboundContentType, pr.config.Table(), t.metricsManager, t.id)

	// Every log line below carries the same fields, so build the entry once
	entry := workflowLogger.WithFields(log.Fields{
		"_block":           "submit-process-job",
		"task-id":          t.id,
		"task-name":        t.name,
		"process-name":     pr.Name(),
		"process-version":  pr.Version(),
		"parent-node-type": pj.TypeString(),
	})
	entry.Debug("Submitting process job")

	// Submit the job against the task.managesWork and await its outcome
	errs := t.manager.Work(procJob).Promise().Await()
	if len(errs) > 0 {
		// Record the failures in the task
		// note: this function is thread safe against t
		t.RecordFailure(errs)
		entry.Warn("Process job failed")
		return
	}
	entry.Debug("Process job completed")

	// Descend into any child process or publish nodes
	workJobs(pr.ProcessNodes, pr.PublishNodes, t, procJob)
}
Expand All @@ -379,15 +431,39 @@ func submitPublishJob(pj job, t *task, wg *sync.WaitGroup, pu *publishNode) {
defer wg.Done()
// Create a new publish job
j := newPublishJob(pj, pu.Name(), pu.Version(), pu.InboundContentType, pu.config.Table(), t.metricsManager, t.id)
workflowLogger.WithFields(log.Fields{
"_block": "submit-publish-job",
"task-id": t.id,
"task-name": t.name,
"publish-name": pu.Name(),
"publish-version": pu.Version(),
"parent-node-type": pj.TypeString(),
}).Debug("Submitting publish job")
// Submit the job against the task.managesWork
errors := t.manager.Work(j).Promise().Await()
// Check for errors and update the task
if len(errors) != 0 {
// Record the failures in the task
// note: this function is thread safe against t
t.RecordFailure(errors)
workflowLogger.WithFields(log.Fields{
"_block": "submit-publish-job",
"task-id": t.id,
"task-name": t.name,
"publish-name": pu.Name(),
"publish-version": pu.Version(),
"parent-node-type": pj.TypeString(),
}).Warn("Publish job failed")
return
}
workflowLogger.WithFields(log.Fields{
"_block": "submit-publish-job",
"task-id": t.id,
"task-name": t.name,
"publish-name": pu.Name(),
"publish-version": pu.Version(),
"parent-node-type": pj.TypeString(),
}).Debug("Publish job completed")
// Publish nodes cannot contain child nodes (publish is a terminal node)
// so unlike process nodes there is not a call to workJobs here for child nodes.
}

0 comments on commit a61f835

Please sign in to comment.