Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Add errors to dropped overdue jobs.
Browse files Browse the repository at this point in the history
- Added AddErrors() to job interface.
  • Loading branch information
ConnorDoyle committed Jan 13, 2016
1 parent 0ce4f77 commit 86e3927
Show file tree
Hide file tree
Showing 5 changed files with 44 additions and 9 deletions.
18 changes: 14 additions & 4 deletions scheduler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"bytes"
"encoding/gob"
"fmt"
"sync"
"time"

log "github.com/Sirupsen/logrus"
Expand Down Expand Up @@ -80,6 +81,7 @@ func (j *qj) Promise() Promise {
// the scheduler. Job encompasses all
// all job types -- collect, process, and publish.
type job interface {
AddErrors(errs ...error)
Errors() []error
StartTime() time.Time
Deadline() time.Time
Expand All @@ -90,6 +92,8 @@ type job interface {
type jobType int

type coreJob struct {
sync.Mutex

jtype jobType
deadline time.Time
starttime time.Time
Expand Down Expand Up @@ -117,6 +121,12 @@ func (c *coreJob) Type() jobType {
return c.jtype
}

func (c *coreJob) AddErrors(errs ...error) {
c.Lock()
defer c.Unlock()
c.errors = append(c.errors, errs...)
}

func (c *coreJob) Errors() []error {
return c.errors
}
Expand Down Expand Up @@ -202,7 +212,7 @@ func (c *collectorJob) Run() {
"error": e,
}).Error("collector run error")
}
c.errors = errs
c.AddErrors(errs...)
}
}

Expand Down Expand Up @@ -273,7 +283,7 @@ func (p *processJob) Run() {
"error": e.Error(),
}).Error("error with processor job")
}
p.errors = append(p.errors, errs...)
p.AddErrors(errs...)
}
p.content = content
default:
Expand Down Expand Up @@ -366,7 +376,7 @@ func (p *publisherJob) Run() {
"error": e.Error(),
}).Error("error with publisher job")
}
p.errors = append(p.errors, errs...)
p.AddErrors(errs...)
}
default:
log.WithFields(log.Fields{
Expand Down Expand Up @@ -397,7 +407,7 @@ func (p *publisherJob) Run() {
"error": e.Error(),
}).Error("error with publisher job")
}
p.errors = append(p.errors, errs...)
p.AddErrors(errs...)
}
}
default:
Expand Down
16 changes: 16 additions & 0 deletions scheduler/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ limitations under the License.
package scheduler

import (
"errors"
"testing"
"time"

Expand Down Expand Up @@ -75,6 +76,21 @@ func TestCollectorJob(t *testing.T) {
So(cj.Errors(), ShouldResemble, []error{})
})
})
Convey("AddErrors()", t, func() {
Convey("it should append errors to the job", func() {
cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt)
So(cj.Errors(), ShouldResemble, []error{})

e1 := errors.New("1")
e2 := errors.New("2")
e3 := errors.New("3")

cj.AddErrors(e1)
So(cj.Errors(), ShouldResemble, []error{e1})
cj.AddErrors(e2, e3)
So(cj.Errors(), ShouldResemble, []error{e1, e2, e3})
})
})
Convey("Run()", t, func() {
Convey("it should complete without errors", func() {
cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt)
Expand Down
5 changes: 5 additions & 0 deletions scheduler/work_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ func newMultiSyncMockJob(n int) *mockJob {
}
}

func (mj *mockJob) AddErrors(errs ...error) {
mj.Lock()
defer mj.Unlock()
mj.errors = append(mj.errors, errs...)
}
func (mj *mockJob) Errors() []error { return mj.errors }
func (mj *mockJob) StartTime() time.Time { return mj.starttime }
func (mj *mockJob) Deadline() time.Time { return mj.deadline }
Expand Down
9 changes: 6 additions & 3 deletions scheduler/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ limitations under the License.
package scheduler

import (
"errors"

"github.com/intelsdi-x/snap/pkg/chrono"
"github.com/pborman/uuid"
)
Expand Down Expand Up @@ -48,11 +50,12 @@ func (w *worker) start() {
// assert that deadline is not exceeded
if chrono.Chrono.Now().Before(q.Job().Deadline()) {
q.Job().Run()
} else {
// the deadline was exceeded and this job will not run
q.Job().AddErrors(errors.New("Worker refused to run overdue job."))
}

// mark the job complete for one of two reasons:
// - this job was just run
// - the deadline was exceeded and this job will not run
// mark the job complete
q.Promise().Complete(q.Job().Errors())

// the single kill-channel -- used when resizing worker pools
Expand Down
5 changes: 3 additions & 2 deletions scheduler/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,9 @@ func TestWorker(t *testing.T) {
chrono.Chrono.Forward(1500 * time.Millisecond)
qj := newQueuedJob(mj)
rcv <- qj
qj.Promise().Await()
So(mj.worked, ShouldEqual, false)
errors := qj.Promise().Await()
So(errors, ShouldNotBeEmpty)
So(mj.worked, ShouldBeFalse)
})
Convey("stops the worker if kamikaze chan is closed", t, func() {
workerKillChan = make(chan struct{})
Expand Down

0 comments on commit 86e3927

Please sign in to comment.