Skip to content

Commit

Permalink
use golang 1.19 native atomic types
Browse files Browse the repository at this point in the history
  • Loading branch information
alphadose committed Aug 5, 2022
1 parent 0bf4046 commit d3952e5
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 53 deletions.
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ Benchmarks to support the above claims [here](#benchmarks)

## Installation

You need Golang [1.18.x](https://go.dev/dl/) or above since this package uses generics
You need Golang [1.19.x](https://go.dev/dl/) or above

```bash
$ go get github.com/alphadose/itogami@v0.3.0
Expand Down Expand Up @@ -101,12 +101,12 @@ OS -> darwin
Results were computed from [benchstat](https://pkg.go.dev/golang.org/x/perf/cmd/benchstat) of 30 cases
```
name time/op
UnlimitedGoroutines-8 301ms ± 4%
UnlimitedGoroutines-8 331ms ± 4%
ErrGroup-8 515ms ± 9%
AntsPool-8 582ms ± 9%
GammaZeroPool-8 740ms ±13%
BytedanceGoPool-8 572ms ±18%
ItogamiPool-8 331ms ± 7%
ItogamiPool-8 337ms ± 1%
name alloc/op
UnlimitedGoroutines-8 96.3MB ± 0%
Expand All @@ -120,14 +120,14 @@ name allocs/op
UnlimitedGoroutines-8 2.00M ± 0%
ErrGroup-8 3.00M ± 0%
AntsPool-8 1.10M ± 2%
GammaZeroPool-8 1.05M ± 0%
GammaZeroPool-8 1.08M ± 0%
BytedanceGoPool-8 2.59M ± 1%
ItogamiPool-8 1.05M ± 0%
ItogamiPool-8 1.08M ± 0%
```

The following conclusions can be drawn from the above results:-

1. [Itogami](https://github.com/alphadose/itogami) is the fastest among all goroutine pool implementations and slower only than unlimited goroutines
1. [Itogami](https://github.com/alphadose/itogami) is the fastest among all goroutine pool implementations and slightly slower than unlimited goroutines
2. [Itogami](https://github.com/alphadose/itogami) has the least `allocs/op` and hence the memory usage scales really well with high load
3. The memory used per operation is in the acceptable range of other pools and drastically lower than unlimited goroutines
4. The tolerance (± %) for [Itogami](https://github.com/alphadose/itogami) is quite low for all 3 metrics indicating that the algorithm is quite stable overall
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module github.com/alphadose/itogami

go 1.18
go 1.19
29 changes: 15 additions & 14 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ type Pool struct {
maxSize uint64
_p2 [cacheLinePadSize - unsafe.Sizeof(uint64(0))]byte
// using a stack keeps cpu caches warm based on FILO property
top unsafe.Pointer
_p3 [cacheLinePadSize - unsafe.Sizeof(unsafe.Pointer(nil))]byte
top atomic.Pointer[node]
_p3 [cacheLinePadSize - unsafe.Sizeof(atomic.Pointer[node]{})]byte
}

// NewPool returns a new thread pool
Expand Down Expand Up @@ -78,23 +78,24 @@ var (

// a single node in this stack
type node struct {
next unsafe.Pointer
next atomic.Pointer[node]
value *slot
}

// pop pops value from the top of the stack
func (self *Pool) pop() (value *slot) {
var top, next unsafe.Pointer
var top, next *node
for {
top = atomic.LoadPointer(&self.top)
top = self.top.Load()
if top == nil {
return
}
next = atomic.LoadPointer(&(*node)(top).next)
if atomic.CompareAndSwapPointer(&self.top, top, next) {
value = (*node)(top).value
(*node)(top).next, (*node)(top).value = nil, nil
itemFree((*node)(top))
next = top.next.Load()
if self.top.CompareAndSwap(top, next) {
value = top.value
top.value = nil
top.next.Store(nil)
itemFree(top)
return
}
}
Expand All @@ -103,14 +104,14 @@ func (self *Pool) pop() (value *slot) {
// push pushes a value on top of the stack
func (self *Pool) push(v *slot) {
var (
top unsafe.Pointer
top *node
item = itemAlloc().(*node)
)
item.value = v
for {
top = atomic.LoadPointer(&self.top)
item.next = top
if atomic.CompareAndSwapPointer(&self.top, top, unsafe.Pointer(item)) {
top = self.top.Load()
item.next.Store(top)
if self.top.CompareAndSwap(top, item) {
return
}
}
Expand Down
68 changes: 36 additions & 32 deletions pool_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,30 @@ import (
"unsafe"
)

// a single slot for a worker in PoolWithFunc
type slotFunc[T any] struct {
threadPtr unsafe.Pointer
data T
}
type (
// a single slot for a worker in PoolWithFunc
slotFunc[T any] struct {
threadPtr unsafe.Pointer
data T
}

// PoolWithFunc is used for spawning workers for a single pre-defined function with myriad inputs
// useful for throughput bound cases
// has lower memory usage and allocs per op than the default Pool
// ( type -> func(T) {} ) where T is a generic parameter
type PoolWithFunc[T any] struct {
currSize uint64
_p1 [cacheLinePadSize - unsafe.Sizeof(uint64(0))]byte
maxSize uint64
alloc func() any
free func(any)
task func(T)
_p2 [cacheLinePadSize - unsafe.Sizeof(uint64(0)) - 3*unsafe.Sizeof(func() {})]byte
top unsafe.Pointer
_p3 [cacheLinePadSize - unsafe.Sizeof(unsafe.Pointer(nil))]byte
}
// PoolWithFunc is used for spawning workers for a single pre-defined function with myriad inputs
// useful for throughput bound cases
// has lower memory usage and allocs per op than the default Pool
//
// ( type -> func(T) {} ) where T is a generic parameter
PoolWithFunc[T any] struct {
currSize uint64
_p1 [cacheLinePadSize - unsafe.Sizeof(uint64(0))]byte
maxSize uint64
alloc func() any
free func(any)
task func(T)
_p2 [cacheLinePadSize - unsafe.Sizeof(uint64(0)) - 3*unsafe.Sizeof(func() {})]byte
top atomic.Pointer[dataItem[T]]
_p3 [cacheLinePadSize - unsafe.Sizeof(atomic.Pointer[dataItem[T]]{})]byte
}
)

// NewPoolWithFunc returns a new PoolWithFunc
func NewPoolWithFunc[T any](size uint64, task func(T)) *PoolWithFunc[T] {
Expand Down Expand Up @@ -68,23 +71,24 @@ func (self *PoolWithFunc[T]) loopQ(d *slotFunc[T]) {

// a single node in the stack
type dataItem[T any] struct {
next unsafe.Pointer
next atomic.Pointer[dataItem[T]]
value *slotFunc[T]
}

// pop pops value from the top of the stack
func (self *PoolWithFunc[T]) pop() (value *slotFunc[T]) {
var top, next unsafe.Pointer
var top, next *dataItem[T]
for {
top = atomic.LoadPointer(&self.top)
top = self.top.Load()
if top == nil {
return
}
next = atomic.LoadPointer(&(*dataItem[T])(top).next)
if atomic.CompareAndSwapPointer(&self.top, top, next) {
value = (*dataItem[T])(top).value
(*dataItem[T])(top).next, (*dataItem[T])(top).value = nil, nil
self.free((*dataItem[T])(top))
next = top.next.Load()
if self.top.CompareAndSwap(top, next) {
value = top.value
top.value = nil
top.next.Store(nil)
self.free(top)
return
}
}
Expand All @@ -93,14 +97,14 @@ func (self *PoolWithFunc[T]) pop() (value *slotFunc[T]) {
// push pushes a value on top of the stack
func (self *PoolWithFunc[T]) push(v *slotFunc[T]) {
var (
top unsafe.Pointer
top *dataItem[T]
item = self.alloc().(*dataItem[T])
)
item.value = v
for {
top = atomic.LoadPointer(&self.top)
item.next = top
if atomic.CompareAndSwapPointer(&self.top, top, unsafe.Pointer(item)) {
top = self.top.Load()
item.next.Store(top)
if self.top.CompareAndSwap(top, item) {
return
}
}
Expand Down

0 comments on commit d3952e5

Please sign in to comment.