Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Adds Name(), Version(), TypeString() to scheduler.job to allow for be…
Browse files Browse the repository at this point in the history
…tter logging during execution.
  • Loading branch information
lynxbat committed Mar 6, 2016
1 parent 1da6589 commit abe8e0e
Showing 1 changed file with 67 additions and 44 deletions.
111 changes: 67 additions & 44 deletions scheduler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,10 @@ type job interface {
Errors() []error
StartTime() time.Time
Deadline() time.Time
Name() string
Version() int
Type() jobType
TypeString() string
TaskID() string
Run()
}
Expand All @@ -94,16 +97,20 @@ type jobType int

type coreJob struct {
sync.Mutex
name string
version int
taskID string
jtype jobType
deadline time.Time
starttime time.Time
errors []error
}

func newCoreJob(t jobType, deadline time.Time, taskID string) *coreJob {
func newCoreJob(t jobType, deadline time.Time, taskID string, name string, version int) *coreJob {
return &coreJob{
jtype: t,
name: name,
version: version,
deadline: deadline,
taskID: taskID,
errors: make([]error, 0),
Expand All @@ -119,10 +126,32 @@ func (c *coreJob) Deadline() time.Time {
return c.deadline
}

func (c *coreJob) Name() string {
return c.name
}

func (c *coreJob) Version() int {
return c.version
}

func (c *coreJob) Type() jobType {
return c.jtype
}

func (c *coreJob) TypeString() string {
switch c.jtype {
case collectJobType:
return "collector"

case processJobType:
return "processor"

case publishJobType:
return "publisher"
}
return "unknown"
}

func (c *coreJob) AddErrors(errs ...error) {
c.Lock()
defer c.Unlock()
Expand Down Expand Up @@ -150,7 +179,7 @@ func newCollectorJob(metricTypes []core.RequestedMetric, deadlineDuration time.D
collector: collector,
metricTypes: metricTypes,
metrics: []core.Metric{},
coreJob: newCoreJob(collectJobType, time.Now().Add(deadlineDuration), taskID),
coreJob: newCoreJob(collectJobType, time.Now().Add(deadlineDuration), taskID, "", 0),
configDataTree: cdt,
}
}
Expand Down Expand Up @@ -236,14 +265,12 @@ type processJob struct {

func newProcessJob(parentJob job, pluginName string, pluginVersion int, contentType string, config map[string]ctypes.ConfigValue, processor processesMetrics, taskID string) job {
return &processJob{
parentJob: parentJob,
pluginName: pluginName,
pluginVersion: pluginVersion,
metrics: []core.Metric{},
coreJob: newCoreJob(processJobType, parentJob.Deadline(), taskID),
config: config,
processor: processor,
contentType: contentType,
parentJob: parentJob,
metrics: []core.Metric{},
coreJob: newCoreJob(processJobType, parentJob.Deadline(), taskID, pluginName, pluginVersion),
config: config,
processor: processor,
contentType: contentType,
}
}

Expand All @@ -253,8 +280,8 @@ func (p *processJob) Run() {
"block": "run",
"job-type": "processor",
"content-type": p.contentType,
"plugin-name": p.pluginName,
"plugin-version": p.pluginVersion,
"plugin-name": p.name,
"plugin-version": p.version,
"plugin-config": p.config,
}).Debug("starting processor job")

Expand Down Expand Up @@ -283,8 +310,8 @@ func (p *processJob) Run() {
"block": "run",
"job-type": "processor",
"content-type": p.contentType,
"plugin-name": p.pluginName,
"plugin-version": p.pluginVersion,
"plugin-name": p.name,
"plugin-version": p.version,
"plugin-config": p.config,
"error": e.Error(),
}).Error("error with processor job")
Expand All @@ -298,8 +325,8 @@ func (p *processJob) Run() {
"block": "run",
"job-type": "processor",
"content-type": p.contentType,
"plugin-name": p.pluginName,
"plugin-version": p.pluginVersion,
"plugin-name": p.name,
"plugin-version": p.version,
"plugin-config": p.config,
}).Fatal("unsupported content type")
panic(fmt.Sprintf("unsupported content type. {plugin name: %s version: %v content-type: '%v'}", p.pluginName, p.pluginVersion, p.contentType))
Expand All @@ -310,8 +337,8 @@ func (p *processJob) Run() {
"block": "run",
"job-type": "processor",
"content-type": p.contentType,
"plugin-name": p.pluginName,
"plugin-version": p.pluginVersion,
"plugin-name": p.name,
"plugin-version": p.version,
"plugin-config": p.config,
"parent-job-type": p.parentJob.Type(),
}).Fatal("unsupported parent job type")
Expand All @@ -321,23 +348,19 @@ func (p *processJob) Run() {

type publisherJob struct {
*coreJob
parentJob job
publisher publishesMetrics
pluginName string
pluginVersion int
config map[string]ctypes.ConfigValue
contentType string
parentJob job
publisher publishesMetrics
config map[string]ctypes.ConfigValue
contentType string
}

func newPublishJob(parentJob job, pluginName string, pluginVersion int, contentType string, config map[string]ctypes.ConfigValue, publisher publishesMetrics, taskID string) job {
return &publisherJob{
parentJob: parentJob,
publisher: publisher,
pluginName: pluginName,
pluginVersion: pluginVersion,
coreJob: newCoreJob(publishJobType, parentJob.Deadline(), taskID),
config: config,
contentType: contentType,
parentJob: parentJob,
publisher: publisher,
coreJob: newCoreJob(publishJobType, parentJob.Deadline(), taskID, pluginName, pluginVersion),
config: config,
contentType: contentType,
}
}

Expand All @@ -347,8 +370,8 @@ func (p *publisherJob) Run() {
"block": "run",
"job-type": "publisher",
"content-type": p.contentType,
"plugin-name": p.pluginName,
"plugin-version": p.pluginVersion,
"plugin-name": p.name,
"plugin-version": p.version,
"plugin-config": p.config,
}).Debug("starting publisher job")
var buf bytes.Buffer
Expand All @@ -368,16 +391,16 @@ func (p *publisherJob) Run() {
}
}
enc.Encode(metrics)
errs := p.publisher.PublishMetrics(p.contentType, buf.Bytes(), p.pluginName, p.pluginVersion, p.config, p.taskID)
errs := p.publisher.PublishMetrics(p.contentType, buf.Bytes(), p.name, p.version, p.config, p.taskID)
if errs != nil {
for _, e := range errs {
log.WithFields(log.Fields{
"_module": "scheduler-job",
"block": "run",
"job-type": "publisher",
"content-type": p.contentType,
"plugin-name": p.pluginName,
"plugin-version": p.pluginVersion,
"plugin-name": p.name,
"plugin-version": p.version,
"plugin-config": p.config,
"error": e.Error(),
}).Error("error with publisher job")
Expand All @@ -390,25 +413,25 @@ func (p *publisherJob) Run() {
"block": "run",
"job-type": "publisher",
"content-type": p.contentType,
"plugin-name": p.pluginName,
"plugin-version": p.pluginVersion,
"plugin-name": p.name,
"plugin-version": p.version,
"plugin-config": p.config,
}).Fatal("unsupported content type")
panic(fmt.Sprintf("unsupported content type. {plugin name: %s version: %v content-type: '%v'}", p.pluginName, p.pluginVersion, p.contentType))
panic(fmt.Sprintf("unsupported content type. {plugin name: %s version: %v content-type: '%v'}", p.name, p.version, p.contentType))
}
case processJobType:
switch p.contentType {
case plugin.SnapGOBContentType:
errs := p.publisher.PublishMetrics(p.contentType, p.parentJob.(*processJob).content, p.pluginName, p.pluginVersion, p.config, p.taskID)
errs := p.publisher.PublishMetrics(p.contentType, p.parentJob.(*processJob).content, p.name, p.version, p.config, p.taskID)
if errs != nil {
for _, e := range errs {
log.WithFields(log.Fields{
"_module": "scheduler-job",
"block": "run",
"job-type": "publisher",
"content-type": p.contentType,
"plugin-name": p.pluginName,
"plugin-version": p.pluginVersion,
"plugin-name": p.name,
"plugin-version": p.version,
"plugin-config": p.config,
"error": e.Error(),
}).Error("error with publisher job")
Expand All @@ -422,8 +445,8 @@ func (p *publisherJob) Run() {
"block": "run",
"job-type": "publisher",
"content-type": p.contentType,
"plugin-name": p.pluginName,
"plugin-version": p.pluginVersion,
"plugin-name": p.name,
"plugin-version": p.version,
"plugin-config": p.config,
"parent-job-type": p.parentJob.Type(),
}).Fatal("unsupported parent job type")
Expand Down

0 comments on commit abe8e0e

Please sign in to comment.