Skip to content

Commit

Permalink
Fixed CPU usage caused by polling inboxes (#27)
Browse files Browse the repository at this point in the history
  • Loading branch information
anthdm authored Mar 6, 2023
1 parent 2841b44 commit bae4cfa
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 9 deletions.
1 change: 1 addition & 0 deletions actor/inbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,6 @@ func (in *Inbox) Stop() error {
}

func (in *Inbox) Send(msg Envelope) {
in.ggq.Awake()
in.ggq.Write(msg)
}
25 changes: 23 additions & 2 deletions ggq/ggq.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package ggq

import (
"runtime"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/anthdm/hollywood/log"
Expand Down Expand Up @@ -39,11 +39,13 @@ type GGQ[T any] struct {
_ [cacheLinePadding - unsafe.Sizeof(atomic.Uint32{})]byte
state atomic.Uint32
_ [cacheLinePadding - unsafe.Sizeof(atomic.Uint32{})]byte
isIdling atomic.Bool
buffer []slot[T]
_ [cacheLinePadding]byte
mask uint32
consumer Consumer[T]
itemBuffer []T
cond *sync.Cond
}

func New[T any](size uint32, consumer Consumer[T]) *GGQ[T] {
Expand All @@ -55,6 +57,7 @@ func New[T any](size uint32, consumer Consumer[T]) *GGQ[T] {
mask: size - 1,
consumer: consumer,
itemBuffer: make([]T, size+1),
cond: sync.NewCond(nil),
}
}

Expand Down Expand Up @@ -86,7 +89,13 @@ func (q *GGQ[T]) ReadN() (T, bool) {
} else if upper := q.written.Load(); lower <= upper {
runtime.Gosched()
} else if !q.state.CompareAndSwap(stateClosed, stateRunning) {
time.Sleep(time.Microsecond)
var mu sync.Mutex
q.cond.L = &mu
q.isIdling.Store(true)
mu.Lock()
q.cond.Wait()
mu.Unlock()
q.isIdling.Store(false)
} else {
break
}
Expand All @@ -95,6 +104,17 @@ func (q *GGQ[T]) ReadN() (T, bool) {
return t, true
}

// Awake the queue if its in the idle state.
func (q *GGQ[T]) Awake() {
if q.isIdling.Load() {
q.cond.Signal()
}
}

func (q *GGQ[T]) IsIdle() bool {
return q.isIdling.Load()
}

func (q *GGQ[T]) Consume(lower, upper uint32) {
consumed := 0
for ; lower <= upper; lower++ {
Expand Down Expand Up @@ -139,6 +159,7 @@ func (q *GGQ[T]) Read() (T, bool) {

func (q *GGQ[T]) Close() {
q.state.Store(stateClosed)
q.cond.Signal()
}

func isPOW2(n uint32) bool {
Expand Down
14 changes: 7 additions & 7 deletions ggq/ggq_test.go
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
package ggq

import (
"fmt"
"testing"
)

type consumer[T any] struct{}

func (c *consumer[T]) Consume(t []T) {
// fmt.Println(len(t))
fmt.Println(t)
}

func TestSingleMessageNotConsuming(t *testing.T) {
q := New[int](1024, &consumer[int]{})

go func() {
for i := 0; i < 1; i++ {
q.Write(i)
}
q.Close()
}()
q.cond.Signal()
for i := 0; i < 10; i++ {
q.Write(i)
}
q.Close()

q.ReadN()
}
Expand Down

0 comments on commit bae4cfa

Please sign in to comment.