Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Factored out promise abstraction from queued job.
Browse files Browse the repository at this point in the history
  • Loading branch information
ConnorDoyle committed Dec 22, 2015
1 parent 090dae0 commit 8ba0f97
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 0 deletions.
63 changes: 63 additions & 0 deletions pkg/promise/promise.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package promise

import (
"sync"
)

// A disposable write-once latch, to act as a synchronization
// barrier to signal completion of some asynchronous operation
// (successful or otherwise).
//
// Functions that operate on this type (IsComplete, Complete,
// Await) are idempotent and thread-safe.
type Promise interface {
IsComplete() bool
Complete(errors []error)
Await() []error
AndThen(f func([]error))
}

func NewPromise() Promise {
return &promise{
complete: false,
completeChan: make(chan struct{}),
}
}

type promise struct {
sync.Mutex

complete bool
errors []error
completeChan chan struct{}
}

// Returns whether this promise is complete yet, without blocking.
func (p *promise) IsComplete() bool {
return p.complete
}

// Unblock all goroutines awaiting promise completion.
func (p *promise) Complete(errors []error) {
p.Lock()
defer p.Unlock()

if !p.complete {
p.complete = true
p.errors = errors
close(p.completeChan)
}
}

// Blocks the caller until the promise is marked complete.
func (p *promise) Await() []error {
<-p.completeChan
return p.errors
}

// Invokes the supplied function after this promise completes.
func (p *promise) AndThen(f func([]error)) {
go func() {
f(p.Await())
}()
}
64 changes: 64 additions & 0 deletions pkg/promise/promise_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package promise

import (
"sync"
"testing"

. "github.com/smartystreets/goconvey/convey"
)

func TestPromise(t *testing.T) {
Convey("IsComplete()", t, func() {
Convey("it should return the completion status", func() {
p := NewPromise()
So(p.IsComplete(), ShouldBeFalse)
p.Complete([]error{})
So(p.IsComplete(), ShouldBeTrue)
})
})
Convey("Complete()", t, func() {
Convey("it should unblock any waiting goroutines", func() {
p := NewPromise()

numWaiters := 3
var wg sync.WaitGroup
wg.Add(numWaiters)

for i := 0; i < numWaiters; i++ {
go func() {
Convey("all waiting goroutines should see empty errors", t, func() {
errors := p.Await()
So(errors, ShouldBeEmpty)
wg.Done()
})
}()
}

p.Complete([]error{})
wg.Wait()
})
})
Convey("AndThen()", t, func() {
Convey("it should defer the supplied closure until after completion", func() {
p := NewPromise()

funcRan := false
c := make(chan struct{})

p.AndThen(func(errors []error) {
funcRan = true
close(c)
})

// The callback should not have been executed yet.
So(funcRan, ShouldBeFalse)

// Trigger callback execution by completing the queued job.
p.Complete([]error{})

// Wait for the deferred function to be executed.
<-c
So(funcRan, ShouldBeTrue)
})
})
}

0 comments on commit 8ba0f97

Please sign in to comment.