From cf1114ca2f6fcc239ad684bac5623f6a9bdbe21f Mon Sep 17 00:00:00 2001 From: Denis Tingaikin <49399980+denis-tingajkin@users.noreply.github.com> Date: Sun, 18 Oct 2020 00:19:44 +0700 Subject: [PATCH] Fix critical data race conditions + improve performance (#6) * fix datarace conditions Signed-off-by: denis-tingajkin --- go.mod | 5 ++- serialize.go | 98 +++++++++++------------------------------------ serialize_test.go | 22 +++++++++++ 3 files changed, 49 insertions(+), 76 deletions(-) diff --git a/go.mod b/go.mod index c58e045..80229cf 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,7 @@ module github.com/edwarnicke/serialize go 1.13 -require go.uber.org/goleak v1.1.10 +require ( + github.com/stretchr/testify v1.4.0 + go.uber.org/goleak v1.1.10 +) diff --git a/serialize.go b/serialize.go index e5c4858..2362e85 100644 --- a/serialize.go +++ b/serialize.go @@ -28,95 +28,43 @@ const ( channelSize = 256 // 256 is chosen because 256*8 = 2kb, or about the cost of a go routing ) -type job struct { - f func() - done chan struct{} -} - // Executor - a struct that can be used to guarantee exclusive, in order execution of functions. type Executor struct { - execCh chan *job - init sync.Once - count int32 - jobs []*job - runningCh chan struct{} - idle bool + orderCh chan func() + buffer []func() + bufferMutex sync.Mutex + init sync.Once + count int32 } // AsyncExec - guarantees f() will be executed Exclusively and in the Order submitted. // It immediately returns a channel that will be closed when f() has completed execution. func (e *Executor) AsyncExec(f func()) <-chan struct{} { - // Initialize *once* e.init.Do(func() { - e.execCh = make(chan *job, channelSize) + e.orderCh = make(chan func(), channelSize) }) - exec := &job{ - f: f, - done: make(chan struct{}), + result := make(chan struct{}) + e.orderCh <- func() { + f() + close(result) } + e.bufferMutex.Lock() + e.buffer = append(e.buffer, <-e.orderCh) + e.bufferMutex.Unlock() // Start go routine if we don't have one if atomic.AddInt32(&e.count, 1) == 1 { - go e.process(exec) - return exec.done - } - - e.execCh <- exec - return exec.done -} - -func (e *Executor) process(exec *job) { - // Selects are expensive. If we only need to execute one thing, - // do it here and be done with it. - e.jobs = []*job{exec} - if e.executeJobs() { - return - } - for { - select { - case exec := <-e.execCh: - e.jobs = append(e.jobs, exec) - // Selects are expensive. If our e.runningCh is just a placeholder - // so we don't busy wait on incoming executables (ie e.idle == true) - // then execute here - if e.idle { - if e.executeJobs() { + go func() { + for { + e.bufferMutex.Lock() + f := e.buffer[0] + e.buffer = e.buffer[1:] + e.bufferMutex.Unlock() + f() + if atomic.AddInt32(&e.count, -1) == 0 { return } } - - case <-e.runningCh: - // If we can run again, but we have nothing *to* run, note we are e.idle, setup - // a placeholder e.runningCh to keep us from hitting this case again until we have work - // to do, and break out of this case - if len(e.jobs) == 0 { - e.idle = true - e.runningCh = make(chan struct{}) - break - } - // If we have jobs to execute, execute them - if e.executeJobs() { - return - } - } + }() } -} - -func (e *Executor) executeJobs() bool { - // Execute jobs - go func(backlog []*job) { - for _, exec := range backlog { - exec.f() - close(exec.done) - } - }(e.jobs) - - // e.runningCh is busy till the last job is complete - e.runningCh = e.jobs[len(e.jobs)-1].done - jobsCompleted := -int32(len(e.jobs)) - - // e.jobs is now empty, and can start accumulating more jobs to do - e.jobs = nil - // We are definitely not idle - e.idle = false - return atomic.AddInt32(&e.count, jobsCompleted) == 0 + return result } diff --git a/serialize_test.go b/serialize_test.go index 3401918..37f2cae 100644 --- a/serialize_test.go +++ b/serialize_test.go @@ -21,13 +21,35 @@ package serialize_test import ( + "sync" "testing" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" "github.com/edwarnicke/serialize" ) +func TestDataRace(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) + e := new(serialize.Executor) + var arr []int + wg := sync.WaitGroup{} + for i := 0; i < 1000; i++ { + wg.Add(1) + go func(n int) { + e.AsyncExec(func() { + arr = append(arr, n) + wg.Done() + }) + }(i) + } + + wg.Wait() + require.Len(t, arr, 1000) +} + func TestASyncExec(t *testing.T) { defer goleak.VerifyNone(t, goleak.IgnoreCurrent()) exec := serialize.Executor{}