Skip to content

Commit

Permalink
Fix critical data race conditions + improve performance (#6)
Browse files Browse the repository at this point in the history
* fix datarace conditions

Signed-off-by: denis-tingajkin <denis.tingajkin@xored.com>
  • Loading branch information
denis-tingaikin authored Oct 17, 2020
1 parent 148c9f0 commit cf1114c
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 76 deletions.
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,7 @@ module github.com/edwarnicke/serialize

go 1.13

require go.uber.org/goleak v1.1.10
require (
github.com/stretchr/testify v1.4.0
go.uber.org/goleak v1.1.10
)
98 changes: 23 additions & 75 deletions serialize.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,95 +28,43 @@ const (
channelSize = 256 // 256 is chosen because 256*8 = 2kb, or about the cost of a go routing
)

type job struct {
f func()
done chan struct{}
}

// Executor - a struct that can be used to guarantee exclusive, in order execution of functions.
type Executor struct {
execCh chan *job
init sync.Once
count int32
jobs []*job
runningCh chan struct{}
idle bool
orderCh chan func()
buffer []func()
bufferMutex sync.Mutex
init sync.Once
count int32
}

// AsyncExec - guarantees f() will be executed Exclusively and in the Order submitted.
// It immediately returns a channel that will be closed when f() has completed execution.
func (e *Executor) AsyncExec(f func()) <-chan struct{} {
// Initialize *once*
e.init.Do(func() {
e.execCh = make(chan *job, channelSize)
e.orderCh = make(chan func(), channelSize)
})
exec := &job{
f: f,
done: make(chan struct{}),
result := make(chan struct{})
e.orderCh <- func() {
f()
close(result)
}
e.bufferMutex.Lock()
e.buffer = append(e.buffer, <-e.orderCh)
e.bufferMutex.Unlock()
// Start go routine if we don't have one
if atomic.AddInt32(&e.count, 1) == 1 {
go e.process(exec)
return exec.done
}

e.execCh <- exec
return exec.done
}

func (e *Executor) process(exec *job) {
// Selects are expensive. If we only need to execute one thing,
// do it here and be done with it.
e.jobs = []*job{exec}
if e.executeJobs() {
return
}
for {
select {
case exec := <-e.execCh:
e.jobs = append(e.jobs, exec)
// Selects are expensive. If our e.runningCh is just a placeholder
// so we don't busy wait on incoming executables (ie e.idle == true)
// then execute here
if e.idle {
if e.executeJobs() {
go func() {
for {
e.bufferMutex.Lock()
f := e.buffer[0]
e.buffer = e.buffer[1:]
e.bufferMutex.Unlock()
f()
if atomic.AddInt32(&e.count, -1) == 0 {
return
}
}

case <-e.runningCh:
// If we can run again, but we have nothing *to* run, note we are e.idle, setup
// a placeholder e.runningCh to keep us from hitting this case again until we have work
// to do, and break out of this case
if len(e.jobs) == 0 {
e.idle = true
e.runningCh = make(chan struct{})
break
}
// If we have jobs to execute, execute them
if e.executeJobs() {
return
}
}
}()
}
}

func (e *Executor) executeJobs() bool {
// Execute jobs
go func(backlog []*job) {
for _, exec := range backlog {
exec.f()
close(exec.done)
}
}(e.jobs)

// e.runningCh is busy till the last job is complete
e.runningCh = e.jobs[len(e.jobs)-1].done
jobsCompleted := -int32(len(e.jobs))

// e.jobs is now empty, and can start accumulating more jobs to do
e.jobs = nil
// We are definitely not idle
e.idle = false
return atomic.AddInt32(&e.count, jobsCompleted) == 0
return result
}
22 changes: 22 additions & 0 deletions serialize_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,35 @@
package serialize_test

import (
"sync"
"testing"

"github.com/stretchr/testify/require"

"go.uber.org/goleak"

"github.com/edwarnicke/serialize"
)

func TestDataRace(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
e := new(serialize.Executor)
var arr []int
wg := sync.WaitGroup{}
for i := 0; i < 1000; i++ {
wg.Add(1)
go func(n int) {
e.AsyncExec(func() {
arr = append(arr, n)
wg.Done()
})
}(i)
}

wg.Wait()
require.Len(t, arr, 1000)
}

func TestASyncExec(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreCurrent())
exec := serialize.Executor{}
Expand Down

0 comments on commit cf1114c

Please sign in to comment.