loader: make AddUpdateCallback thread-safe and don't block signaling
This commit makes the Loader.AddUpdateCallback() method thread-safe and
ensures that signaling callbacks never blocks. Previously, if signaling
a callback blocked, it would deadlock the Loader.
Charlie Vieth committed Aug 14, 2020
1 parent 601c1e1 commit 2eca70a
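
For context, the deadlock this commit fixes is easy to reproduce in isolation. The sketch below is not part of the commit; it mimics the old signaling loop in onRuntimeChanged() with a single illustrative callback channel that is never read.

// Standalone sketch (illustrative only, not library code) of the pre-fix
// failure mode: a blocking send to an unread callback channel stalls the
// sender forever.
package main

import (
	"fmt"
	"time"
)

func main() {
	callbacks := []chan<- int{make(chan int)} // unbuffered and never read

	done := make(chan struct{})
	go func() {
		for _, cb := range callbacks {
			cb <- 1 // the old onRuntimeChanged loop blocked here
		}
		close(done)
	}()

	select {
	case <-done:
		fmt.Println("all callbacks signaled")
	case <-time.After(500 * time.Millisecond):
		fmt.Println("signaling stalled: this is where the Loader used to deadlock")
	}
}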
Showing 3 changed files with 197 additions and 7 deletions.
3 changes: 2 additions & 1 deletion .travis.yml
@@ -1,8 +1,9 @@
sudo: required
language: go
go:
- "1.12"
- "1.13"
- "1.14"
- "1.15"

env:
- GO111MODULE=on
61 changes: 55 additions & 6 deletions loader/loader.go
@@ -7,6 +7,7 @@ import (
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"

"github.com/fsnotify/fsnotify"
@@ -31,14 +32,62 @@ func newLoaderStats(scope stats.Scope) loaderStats {
return ret
}

type callbacks struct {
mu sync.Mutex
cbs []chan<- struct{}
}

func notifyCallback(notify <-chan struct{}, callback chan<- int) {
for range notify {
callback <- 1 // potentially blocking send
}
}

func (c *callbacks) Add(callback chan<- int) {
//
// We cannot rely on sends to the user-provided callback not blocking,
// yet we guarantee that the callback will be signaled if there is a
// runtime change.
//
// The issue is that if the user-provided callback blocks, we deadlock.
//
// To handle this, we use our own buffered channel and a separate goroutine
// to signal the callback. If the callback blocks, it may not be signaled
// for every update, but it will be signaled at least once. This is close
// enough to the original API contract to warrant the change and prevent
// deadlocks.
//
notify := make(chan struct{}, 1)
c.mu.Lock()
c.cbs = append(c.cbs, notify)
c.mu.Unlock()
go notifyCallback(notify, callback)
}

// Signal all callback channels without blocking.
func (c *callbacks) Signal() {
c.mu.Lock()
for _, ch := range c.cbs {
select {
case ch <- struct{}{}:
// The callback will be signaled (at some point).
default:
// We're still waiting for a previous signal to be sent, so this
// one is dropped.
}
}
c.mu.Unlock()
}

// Implementation of Loader that watches a symlink and reads from the filesystem.
type Loader struct {
currentSnapshot atomic.Value
watcher *fsnotify.Watcher
watchPath string
subdirectory string
nextSnapshot snapshot.IFace
callbacks []chan<- int
callbacks callbacks
mu sync.Mutex
stats loaderStats
ignoreDotfiles bool
}
@@ -49,7 +98,10 @@ func (l *Loader) Snapshot() snapshot.IFace {
}

func (l *Loader) AddUpdateCallback(callback chan<- int) {
l.callbacks = append(l.callbacks, callback)
if callback == nil {
panic("goruntime/loader: nil callback")
}
l.callbacks.Add(callback)
}

func (l *Loader) onRuntimeChanged() {
@@ -63,10 +115,7 @@ func (l *Loader) onRuntimeChanged() {
l.currentSnapshot.Store(l.nextSnapshot)

l.nextSnapshot = nil
for _, callback := range l.callbacks {
// Arbitrary integer just to wake up channel.
callback <- 1
}
l.callbacks.Signal()
}

type walkError struct {
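
The at-least-once, coalescing delivery described in the Add comment above can be observed with a standalone sketch. It is not part of the commit; it copies the callbacks type from this diff into a runnable program and pairs it with a deliberately slow consumer.

// Standalone sketch (not part of the commit) showing that Signal never
// blocks and that rapid signals coalesce for a slow consumer.
package main

import (
	"fmt"
	"sync"
	"time"
)

type callbacks struct {
	mu  sync.Mutex
	cbs []chan<- struct{}
}

func notifyCallback(notify <-chan struct{}, callback chan<- int) {
	for range notify {
		callback <- 1 // potentially blocking send, isolated in this goroutine
	}
}

func (c *callbacks) Add(callback chan<- int) {
	notify := make(chan struct{}, 1)
	c.mu.Lock()
	c.cbs = append(c.cbs, notify)
	c.mu.Unlock()
	go notifyCallback(notify, callback)
}

func (c *callbacks) Signal() {
	c.mu.Lock()
	for _, ch := range c.cbs {
		select {
		case ch <- struct{}{}:
		default: // a previous signal is still pending; coalesce
		}
	}
	c.mu.Unlock()
}

func main() {
	var cbs callbacks
	cb := make(chan int) // deliberately unbuffered: the consumer is slow
	cbs.Add(cb)

	for i := 0; i < 5; i++ {
		cbs.Signal() // never blocks, even though nothing has read cb yet
	}

	// The five signals coalesce: the consumer sees at least one (and at
	// most two) notifications rather than five.
	n := 0
	for {
		select {
		case <-cb:
			n++
		case <-time.After(100 * time.Millisecond):
			fmt.Printf("received %d notification(s) for 5 Signal calls\n", n)
			return
		}
	}
}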
140 changes: 140 additions & 0 deletions loader/loader_test.go
@@ -4,6 +4,8 @@ import (
"io/ioutil"
"os"
"path/filepath"
"sync"
"sync/atomic"
"testing"

"sort"
@@ -191,6 +193,144 @@ func TestDirectoryRefresher(t *testing.T) {
assert.Equal("hello3", snapshot.Get("file2"))
}

func TestOnRuntimeChanged(t *testing.T) {
tmpdir, err := ioutil.TempDir("", "goruntime-*")
if err != nil {
t.Fatal(err)
}

defer func() {
if err := os.RemoveAll(tmpdir); err != nil {
t.Error(err)
}
}()

dir, base := filepath.Split(tmpdir)
ll := Loader{
watchPath: dir,
subdirectory: base,
stats: newLoaderStats(stats.NewStore(stats.NewNullSink(), false)),
}

const Timeout = time.Second * 3

t.Run("Nil", func(t *testing.T) {
defer func() {
if e := recover(); e == nil {
t.Fatal("expected panic")
}
}()
ll.AddUpdateCallback(nil)
})

t.Run("One", func(t *testing.T) {
cb := make(chan int, 1)
ll.AddUpdateCallback(cb)
go ll.onRuntimeChanged()
select {
case i := <-cb:
if i != 1 {
t.Errorf("Callback: got: %d want: %d", i, 1)
}
case <-time.After(Timeout):
t.Fatalf("Time out after: %s", Timeout)
}
})

t.Run("Blocking", func(t *testing.T) {
done := make(chan struct{})
cb := make(chan int)
ll.AddUpdateCallback(cb)
go func() {
ll.onRuntimeChanged()
close(done)
}()
select {
case <-done:
// Ok
case <-time.After(Timeout):
t.Fatalf("Time out after: %s", Timeout)
}
})

t.Run("Many", func(t *testing.T) {
cbs := make([]chan int, 10)
for i := range cbs {
cbs[i] = make(chan int, 1)
ll.AddUpdateCallback(cbs[i])
}
go ll.onRuntimeChanged()

for _, cb := range cbs {
select {
case i := <-cb:
if i != 1 {
t.Errorf("Callback: got: %d want: %d", i, 1)
}
case <-time.After(Timeout):
t.Fatalf("Time out after: %s", Timeout)
}
}
})

t.Run("ManyDelayed", func(t *testing.T) {
total := new(int64)
wg := new(sync.WaitGroup)
ready := make(chan struct{})

cbs := make([]chan int, 10)

for i := range cbs {
cbs[i] = make(chan int) // blocking
ll.AddUpdateCallback(cbs[i])
wg.Add(1)
go func(cb chan int) {
defer wg.Done()
<-ready
atomic.AddInt64(total, int64(<-cb))
}(cbs[i])
}

done := make(chan struct{})
go func() {
ll.onRuntimeChanged()
close(done)
}()

select {
case <-done:
// Ok
case <-time.After(Timeout):
t.Fatalf("Time out after: %s", Timeout)
}
close(ready)
wg.Wait()

if n := atomic.LoadInt64(total); n != 10 {
t.Errorf("Expected %d channels to be signaled got: %d", 10, n)
}
})

t.Run("ManyBlocking", func(t *testing.T) {
cbs := make([]chan int, 10)
for i := range cbs {
cbs[i] = make(chan int)
ll.AddUpdateCallback(cbs[i])
}
done := make(chan struct{})
go func() {
ll.onRuntimeChanged()
close(done)
}()
select {
case <-done:
// Ok
case <-time.After(Timeout):
t.Fatalf("Time out after: %s", Timeout)
}
})
}

func BenchmarkSnapshot(b *testing.B) {
var ll Loader
for i := 0; i < b.N; i++ {
