Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Guarantee queueing error in work manager test.
Browse files Browse the repository at this point in the history
- Adds a new RendezVous abstraction in pkg/promise to make it
  easier for coordinating goroutines to synchronize.
  • Loading branch information
ConnorDoyle committed Dec 23, 2015
1 parent 4670e35 commit d94bd63
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 38 deletions.
39 changes: 39 additions & 0 deletions pkg/promise/promise.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,3 +61,42 @@ func (p *promise) AndThen(f func([]error)) {
f(p.Await())
}()
}

// A reciprocal promise that makes it easy for two coordinating routines
// A and B to wait on each other before proceeding.
type RendezVous interface {
IsComplete() bool
A()
B()
}

func NewRendezVous() RendezVous {
return &rendezVous{
a: NewPromise(),
b: NewPromise(),
}
}

type rendezVous struct {
a Promise
b Promise
}

// Returns whether this rendez-vous is complete yet, without blocking.
func (r *rendezVous) IsComplete() bool {
return r.a.IsComplete() && r.b.IsComplete()
}

// Complete process A's half of the rendez-vous, and block until process
// B has done the same.
func (r *rendezVous) A() {
r.a.Complete([]error{})
r.b.Await()
}

// Complete process B's half of the rendez-vous, and block until process
// A has done the same.
func (r *rendezVous) B() {
r.b.Complete([]error{})
r.a.Await()
}
28 changes: 28 additions & 0 deletions pkg/promise/promise_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,31 @@ func TestPromise(t *testing.T) {
})
})
}

func TestRendezVous(t *testing.T) {
Convey("IsComplete()", t, func() {
Convey("it should return the completion status", func() {
r := NewRendezVous()
So(r.IsComplete(), ShouldBeFalse)
go r.A()
r.B()
So(r.IsComplete(), ShouldBeTrue)
})
})
Convey("A() and B()", t, func() {
Convey("it should synchronize concurrent processes", func() {
r1, r2 := NewRendezVous(), NewRendezVous()
evidence := false

go func() {
r1.A()
evidence = true
r2.A()
}()

r1.B()
r2.B()
So(evidence, ShouldBeTrue)
})
})
}
2 changes: 1 addition & 1 deletion scheduler/work_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import "sync"

/*
caller [optional: qj.Await()]
caller [optional: qj.Promise().Await()]
^
|
|
Expand Down
100 changes: 65 additions & 35 deletions scheduler/work_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ limitations under the License.
package scheduler

import (
"sync"
"testing"
"time"

Expand All @@ -28,33 +29,46 @@ import (
. "github.com/smartystreets/goconvey/convey"
)

// A mockJob can be either asynchronous or synchronous (will not return from
// Run() until a caller is ready in Await()).
//
// Test code can rendez-vous with synchronous mockJobs in Run()
// by invoking Await(), which also unblocks the worker executing
// the job.
//
// For asynchronous mockJobs, Await() blocks the caller until the
// job is completed in Run().
type mockJob struct {
sync.Mutex

errors []error
sync bool
worked bool
deadline time.Time
starttime time.Time
completePromise Promise
syncPromise Promise
numSyncs int
rvs []RendezVous
}

// Create an asynchronous mockJob.
//
// The returned job will NOT block waiting for any calls to RendezVous()
// when Run() is invoked.
func newMockJob() *mockJob {
return newMultiSyncMockJob(0)
}

func newMockJob(sync bool) *mockJob {
// Create a synchronous mockJob.
//
// The returned job WILL block waiting for the supplied number of calls
// to RendezVous() when Run() is invoked before completing.
//
// All callers of Await() will also be transitively blocked by these
// rendez-vous steps.
func newMultiSyncMockJob(n int) *mockJob {
rvs := make([]RendezVous, n)
for i := 0; i < n; i++ {
rvs[i] = NewRendezVous()
}

return &mockJob{
sync: sync,
worked: false,
deadline: time.Now().Add(1 * time.Second),
starttime: time.Now(),
completePromise: NewPromise(),
syncPromise: NewPromise(),
numSyncs: 0,
rvs: rvs,
}
}

Expand All @@ -63,16 +77,26 @@ func (mj *mockJob) StartTime() time.Time { return mj.starttime }
func (mj *mockJob) Deadline() time.Time { return mj.deadline }
func (mj *mockJob) Type() jobType { return collectJobType }

// Complete the first incomplete rendez-vous (if there is one)
func (mj *mockJob) RendezVous() {
mj.Lock()
defer mj.Unlock()

if mj.numSyncs < len(mj.rvs) {
mj.rvs[mj.numSyncs].B()
mj.numSyncs++
}
}

func (mj *mockJob) Await() {
mj.syncPromise.Complete([]error{})
mj.completePromise.Await()
}

func (mj *mockJob) Run() {
mj.worked = true
if mj.sync {
mj.syncPromise.Await()
for _, rv := range mj.rvs {
rv.A()
}
mj.worked = true
mj.completePromise.Complete([]error{})
}

Expand All @@ -81,7 +105,7 @@ func TestWorkerManager(t *testing.T) {
Convey(".Work()", t, func() {
Convey("Sends / receives work to / from worker", func() {
manager := newWorkManager()
j := newMockJob(false)
j := newMockJob()
manager.Work(j)
j.Await()
So(j.worked, ShouldEqual, true)
Expand All @@ -92,32 +116,38 @@ func TestWorkerManager(t *testing.T) {
manager := newWorkManager(CollectQSizeOption(1), CollectWkrSizeOption(1))
manager.Start()

j1 := newMockJob(true) // j1 blocks in Run() until Await() is invoked.
j2 := newMockJob(false)
j3 := newMockJob(false)
j1 := newMultiSyncMockJob(2) // j1 will block in Run() twice.
j2 := newMockJob()
j3 := newMockJob()

// Submit three jobs.
qjs := []queuedJob{}
qjs = append(qjs, manager.Work(j1))
qjs = append(qjs, manager.Work(j2))
qjs = append(qjs, manager.Work(j3))
qj1 := manager.Work(j1)
j1.RendezVous() // First RendezVous with j1 in Run().
qj2 := manager.Work(j2)
qj3 := manager.Work(j3)

// Await completion of j1 (also unblocking j1.Run()).
j1.Await()
// Wait for the third queued job to be marked complete,
// "out-of-order" and with errors.
errs3 := qj3.Promise().Await()
So(errs3, ShouldNotBeEmpty)

// Wait for all queued jobs to be marked complete.
for _, qj := range qjs {
qj.Promise().Await()
}
j1.RendezVous() // Second RendezVous with j1 (unblocks j1.Run()).

errs1 := qj1.Promise().Await()
So(errs1, ShouldBeEmpty)

errs2 := qj2.Promise().Await()
So(errs2, ShouldBeEmpty)

// The work queue should be empty at this point.
So(manager.collectq.items, ShouldBeEmpty)

// The first job should have been worked.
// The first and second jobs should have been worked.
So(j1.worked, ShouldBeTrue)
So(j2.worked, ShouldBeTrue)

// At least one of the second and third jobs should have been dropped.
So(j2.worked && j3.worked, ShouldBeFalse)
// The third job should have been dropped.
So(j3.worked, ShouldBeFalse)
})

// The below convey is WIP
Expand Down
4 changes: 2 additions & 2 deletions scheduler/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestWorker(t *testing.T) {
rcv := make(chan queuedJob)
w := newWorker(rcv)
go w.start()
mj := newMockJob(false)
mj := newMockJob()
rcv <- newQueuedJob(mj)
mj.Await()
So(mj.worked, ShouldEqual, true)
Expand All @@ -52,7 +52,7 @@ func TestWorker(t *testing.T) {
rcv := make(chan queuedJob)
w := newWorker(rcv)
go w.start()
mj := newMockJob(false)
mj := newMockJob()
// Time travel 1.5 seconds.
chrono.Chrono.Forward(1500 * time.Millisecond)
qj := newQueuedJob(mj)
Expand Down

0 comments on commit d94bd63

Please sign in to comment.