Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Made workManager.Work nonblocking.
Browse files Browse the repository at this point in the history
  • Loading branch information
ConnorDoyle committed Dec 9, 2015
1 parent 6b71f41 commit 717dffb
Show file tree
Hide file tree
Showing 10 changed files with 332 additions and 169 deletions.
78 changes: 66 additions & 12 deletions scheduler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"bytes"
"encoding/gob"
"fmt"
"sync"
"time"

log "github.com/Sirupsen/logrus"
Expand All @@ -43,6 +44,71 @@ const (
defaultDeadline = time.Duration(5 * time.Second)
)

// Represents a queued job, together with a synchronization
// barrier to signal job completion (successful or otherwise).
//
// Functions that operate on this type (IsComplete, Complete,
// Await) are idempotent and thread-safe.
type queuedJob interface {
Job() job
IsComplete() bool
Complete()
Await() []error
AndThen(f func(queuedJob))
}

type qj struct {
sync.Mutex

job job
complete bool
completeChan chan struct{}
}

func newQueuedJob(job job) queuedJob {
return &qj{
job: job,
complete: false,
completeChan: make(chan struct{}),
}
}

// Returns the underlying job.
func (j *qj) Job() job {
return j.job
}

// Returns whether this job is complete yet, without blocking.
func (j *qj) IsComplete() bool {
return j.complete
}

// This function unblocks everyone waiting for job completion.
func (j *qj) Complete() {
j.Lock()
defer j.Unlock()

if !j.complete {
j.complete = true
close(j.completeChan)
}
}

// This function BLOCKS the caller until the job is
// marked complete.
func (j *qj) Await() []error {
<-j.completeChan
return j.Job().Errors()
}

// Invokes the supplied function after the job completes.
func (j *qj) AndThen(f func(queuedJob)) {
go func() {
j.Await()
f(j)
}()
}

// Primary type for job inside
// the scheduler. Job encompasses all
// all job types -- collect, process, and publish.
Expand All @@ -51,7 +117,6 @@ type job interface {
StartTime() time.Time
Deadline() time.Time
Type() jobType
ReplChan() chan struct{}
Run()
}

Expand All @@ -62,7 +127,6 @@ type coreJob struct {
deadline time.Time
starttime time.Time
errors []error
replchan chan struct{}
}

func newCoreJob(t jobType, deadline time.Time) *coreJob {
Expand All @@ -71,7 +135,6 @@ func newCoreJob(t jobType, deadline time.Time) *coreJob {
deadline: deadline,
errors: make([]error, 0),
starttime: time.Now(),
replchan: make(chan struct{}),
}
}

Expand All @@ -87,10 +150,6 @@ func (c *coreJob) Type() jobType {
return c.jtype
}

func (c *coreJob) ReplChan() chan struct{} {
return c.replchan
}

func (c *coreJob) Errors() []error {
return c.errors
}
Expand Down Expand Up @@ -178,7 +237,6 @@ func (c *collectorJob) Run() {
}
c.errors = errs
}
c.replchan <- struct{}{}
}

type processJob struct {
Expand Down Expand Up @@ -276,8 +334,6 @@ func (p *processJob) Run() {
}).Fatal("unsupported parent job type")
panic("unsupported parent job type")
}

p.replchan <- struct{}{}
}

type publisherJob struct {
Expand Down Expand Up @@ -390,6 +446,4 @@ func (p *publisherJob) Run() {
}).Fatal("unsupported parent job type")
panic("unsupported job type")
}

p.replchan <- struct{}{}
}
77 changes: 68 additions & 9 deletions scheduler/job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ limitations under the License.
package scheduler

import (
"sync"
"testing"
"time"

Expand Down Expand Up @@ -63,12 +64,6 @@ func TestCollectorJob(t *testing.T) {
So(cj.Type(), ShouldEqual, collectJobType)
})
})
Convey("ReplChan()", t, func() {
Convey("it should return the reply channel", func() {
cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt)
So(cj.ReplChan(), ShouldHaveSameTypeAs, make(chan struct{}))
})
})
// Convey("Metrics()", t, func() {
// Convey("it should return the job metrics", func() {
// cj := newCollectorJob([]core.MetricType{}, defaultDeadline, &mockCollector{})
Expand All @@ -82,11 +77,75 @@ func TestCollectorJob(t *testing.T) {
})
})
Convey("Run()", t, func() {
Convey("it should reply on the reply chan", func() {
Convey("it should complete without errors", func() {
cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt)
go cj.(*collectorJob).Run()
<-cj.(*collectorJob).replchan
cj.(*collectorJob).Run()
So(cj.Errors(), ShouldResemble, []error{})
})
})
}

func TestQueuedJob(t *testing.T) {
log.SetLevel(log.FatalLevel)
cdt := cdata.NewTree()
Convey("Job()", t, func() {
Convey("it should return the underlying job", func() {
cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt)
qj := newQueuedJob(cj)
So(qj.Job(), ShouldEqual, cj)
})
})
Convey("IsComplete()", t, func() {
Convey("it should return the completion status", func() {
cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt)
qj := newQueuedJob(cj)
So(qj.IsComplete(), ShouldBeFalse)
qj.Complete()
So(qj.IsComplete(), ShouldBeTrue)
})
})
Convey("Complete()", t, func() {
Convey("it should unblock any waiting goroutines", func() {
cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt)
qj := newQueuedJob(cj)

numWaiters := 3
var wg sync.WaitGroup
wg.Add(numWaiters)

for i := 0; i < numWaiters; i++ {
go func() {
qj.Await()
wg.Done()
}()
}

qj.Complete()
wg.Wait()
})
})
Convey("AndThen()", t, func() {
Convey("it should defer the supplied closure until after completion", func() {
cj := newCollectorJob([]core.RequestedMetric{}, defaultDeadline, &mockCollector{}, cdt)
qj := newQueuedJob(cj)

funcRan := false
c := make(chan struct{})

qj.AndThen(func(queuedJob) {
funcRan = true
close(c)
})

// The callback should not have been executed yet.
So(funcRan, ShouldBeFalse)

// Trigger callback execution by completing the queued job.
qj.Complete()

// Wait for the deferred function to be executed.
<-c
So(funcRan, ShouldBeTrue)
})
})
}
23 changes: 12 additions & 11 deletions scheduler/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@ var (
errLimitExceeded = errors.New("limit exceeded")
)

type jobHandler func(queuedJob)

type queue struct {
Event chan job
Event chan queuedJob
Err chan *queuingError

handler func(job)
handler jobHandler
limit uint
kill chan struct{}
items []job
items []queuedJob
mutex *sync.Mutex
status queueStatus
}
Expand All @@ -49,8 +51,6 @@ const (
queueWorking // queue is currently being worked (a goroutine is currently inside q.handle())
)

type jobHandler func(job)

type queuingError struct {
Job job
Err error
Expand All @@ -62,13 +62,13 @@ func (qe *queuingError) Error() string {

func newQueue(limit uint, handler jobHandler) *queue {
return &queue{
Event: make(chan job),
Event: make(chan queuedJob),
Err: make(chan *queuingError),

handler: handler,
limit: limit,
kill: make(chan struct{}),
items: []job{},
items: []queuedJob{},
mutex: &sync.Mutex{},
status: queueStopped,
}
Expand Down Expand Up @@ -114,9 +114,10 @@ func (q *queue) start() {
if err := q.push(e); err != nil {
qe := &queuingError{
Err: err,
Job: e,
Job: e.Job(),
}
q.Err <- qe
e.Complete() // Signal job termination.
continue
}

Expand Down Expand Up @@ -159,7 +160,7 @@ func (q *queue) length() int {
return len(q.items)
}

func (q *queue) push(j job) error {
func (q *queue) push(j queuedJob) error {

q.mutex.Lock()
defer q.mutex.Unlock()
Expand All @@ -171,12 +172,12 @@ func (q *queue) push(j job) error {
return errLimitExceeded
}

func (q *queue) pop() (job, error) {
func (q *queue) pop() (queuedJob, error) {

q.mutex.Lock()
defer q.mutex.Unlock()

var j job
var j queuedJob

if q.length() == 0 {
return j, errQueueEmpty
Expand Down
Loading

0 comments on commit 717dffb

Please sign in to comment.