diff --git a/internal/pusher/v2/condition.go b/internal/pusher/v2/condition.go new file mode 100644 index 00000000..ec2742c9 --- /dev/null +++ b/internal/pusher/v2/condition.go @@ -0,0 +1,22 @@ +package v2 + +// condition is a simple, channel-based condition variable. +type condition chan struct{} + +// Signal signals the condition. Waking up a waiting goroutine. +func (c condition) Signal() { + select { + case c <- struct{}{}: + default: + // Already signaled + } +} + +// C returns the channel used for waiting. +func (c condition) C() <-chan struct{} { + return c +} + +func newCondition() condition { + return make(chan struct{}, 1) +} diff --git a/internal/pusher/v2/condition_test.go b/internal/pusher/v2/condition_test.go new file mode 100644 index 00000000..4a9d6d7f --- /dev/null +++ b/internal/pusher/v2/condition_test.go @@ -0,0 +1,68 @@ +package v2 + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestCondition(t *testing.T) { + const timeout = time.Second + t.Run("don't fire until signaled", func(t *testing.T) { + c := newCondition() + wait(t, &c, false, timeout) + }) + t.Run("fire after signaled", func(t *testing.T) { + c := newCondition() + c.Signal() + wait(t, &c, true, timeout) + }) + t.Run("single fire", func(t *testing.T) { + c := newCondition() + c.Signal() + c.Signal() + wait(t, &c, true, timeout) + wait(t, &c, false, timeout) + }) + t.Run("single fire", func(t *testing.T) { + c := newCondition() + c.Signal() + c.Signal() + wait(t, &c, true, timeout) + wait(t, &c, false, timeout) + }) + t.Run("goroutine", func(t *testing.T) { + c := newCondition() + go func() { + c.Signal() + }() + wait(t, &c, true, timeout) + }) + t.Run("loop", func(t *testing.T) { + const iterations = 100 + a, b := newCondition(), newCondition() + go func() { + for i := 0; i < iterations; i++ { + a.Signal() + wait(t, &b, true, timeout) + } + }() + for i := 0; i < iterations; i++ { + wait(t, &a, true, timeout) + b.Signal() + } + }) +} + +func wait(t *testing.T, c *condition, fire bool, timeout time.Duration) { + tm := time.NewTimer(timeout) + defer tm.Stop() + fired := false + select { + case <-tm.C: + case <-c.C(): + fired = true + } + require.Equal(t, fire, fired) +}