Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Added workerpool.Group #458

Merged
merged 17 commits into from
Feb 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 0 additions & 13 deletions core/generics/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,6 @@ func (e *Event[T]) Attach(closure *Closure[T], triggerMaxCount ...uint64) {
e.asyncHandlers.Set(closure.ID, newHandler[T](e.callbackFromClosure(closure, triggerMaxCount...), Loop))
}

// AttachWithNewWorkerPool allows to register a Closure that is executed asynchronously in a separate, newly created worker pool when the Event triggers.
// If 'triggerMaxCount' is >0, the Closure is automatically detached after exceeding the trigger limit.
func (e *Event[T]) AttachWithNewWorkerPool(closure *Closure[T], workers int, triggerMaxCount ...uint64) *workerpool.UnboundedWorkerPool {
if closure == nil {
return nil
}

wp := workerpool.NewUnboundedWorkerPool(workers)
e.asyncHandlers.Set(closure.ID, newHandler[T](e.callbackFromClosure(closure, triggerMaxCount...), wp))
wp.Start()
return wp
}

// AttachWithWorkerPool allows to register a Closure that is executed asynchronously in the specified worker pool when the Event triggers.
// If 'triggerMaxCount' is >0, the Closure is automatically detached after exceeding the trigger limit.
func (e *Event[T]) AttachWithWorkerPool(closure *Closure[T], wp *workerpool.UnboundedWorkerPool, triggerMaxCount ...uint64) {
Expand Down
2 changes: 1 addition & 1 deletion core/generics/event/loop.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@ import (
var Loop *workerpool.UnboundedWorkerPool

func init() {
Loop = workerpool.NewUnboundedWorkerPool()
Loop = workerpool.NewUnboundedWorkerPool("event.Loop")
Loop.Start()
}
154 changes: 122 additions & 32 deletions core/syncutils/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,63 +2,153 @@ package syncutils

import (
"sync"

"github.com/iotaledger/hive.go/core/generics/orderedmap"
)

type Counter struct {
value int
mutex sync.RWMutex
valueIncreased *sync.Cond
valueDecreased *sync.Cond
value int
valueMutex sync.RWMutex
valueIncreasedCond *sync.Cond
valueDecreasedCond *sync.Cond
subscribers *orderedmap.OrderedMap[uint64, func(oldValue, newValue int)]
subscribersCounter uint64
subscribersMutex sync.RWMutex
}

func NewCounter() (newCounter *Counter) {
newCounter = new(Counter)
newCounter.valueIncreased = sync.NewCond(&newCounter.mutex)
newCounter.valueDecreased = sync.NewCond(&newCounter.mutex)
newCounter.valueIncreasedCond = sync.NewCond(&newCounter.valueMutex)
newCounter.valueDecreasedCond = sync.NewCond(&newCounter.valueMutex)
newCounter.subscribers = orderedmap.New[uint64, func(oldValue int, newValue int)]()

return
}
func (b *Counter) Value() int {
b.mutex.RLock()
defer b.mutex.RUnlock()

return b.value
func (c *Counter) Get() (value int) {
c.valueMutex.RLock()
defer c.valueMutex.RUnlock()

return c.value
}

func (c *Counter) Set(newValue int) (oldValue int) {
if oldValue = c.set(newValue); oldValue < newValue {
c.valueIncreasedCond.Broadcast()
} else if oldValue > newValue {
c.valueDecreasedCond.Broadcast()
}

return oldValue
}

func (b *Counter) Increase() {
b.mutex.Lock()
b.value++
b.mutex.Unlock()
func (c *Counter) Update(delta int) (newValue int) {
if newValue = c.update(delta); delta > 1 {
c.valueIncreasedCond.Broadcast()
} else if delta < 1 {
c.valueDecreasedCond.Broadcast()
}

b.valueIncreased.Broadcast()
return newValue
}

func (b *Counter) Decrease() {
b.mutex.Lock()
b.value--
b.mutex.Unlock()
func (c *Counter) Increase() (newValue int) {
return c.Update(1)
}

b.valueDecreased.Broadcast()
func (c *Counter) Decrease() (newValue int) {
return c.Update(-1)
}

func (b *Counter) WaitIsZero() {
b.WaitIsBelow(1)
func (c *Counter) WaitIsZero() {
c.WaitIsBelow(1)
}

func (b *Counter) WaitIsBelow(threshold int) {
b.mutex.Lock()
defer b.mutex.Unlock()
func (c *Counter) WaitIsBelow(threshold int) {
c.valueMutex.Lock()
defer c.valueMutex.Unlock()

for b.value >= threshold {
b.valueDecreased.Wait()
for c.value >= threshold {
c.valueDecreasedCond.Wait()
}
}

func (b *Counter) WaitIsAbove(threshold int) {
b.mutex.Lock()
defer b.mutex.Unlock()
func (c *Counter) WaitIsAbove(threshold int) {
c.valueMutex.Lock()
defer c.valueMutex.Unlock()

for c.value <= threshold {
c.valueIncreasedCond.Wait()
}
}

func (c *Counter) Subscribe(subscribers ...func(oldValue, newValue int)) (unsubscribe func()) {
if len(subscribers) == 0 {
return func() {}
}

subscriberID := c.subscribe(func(oldValue, newValue int) {
for _, updateCallback := range subscribers {
updateCallback(oldValue, newValue)
}
})

return func() {
c.unsubscribe(subscriberID)
}
}

func (c *Counter) set(newValue int) (oldValue int) {
c.valueMutex.Lock()
defer c.valueMutex.Unlock()

if oldValue = c.value; newValue != oldValue {
c.value = newValue

for b.value <= threshold {
b.valueIncreased.Wait()
c.notifySubscribers(oldValue, newValue)
}

return oldValue
}

func (c *Counter) update(delta int) (newValue int) {
c.valueMutex.Lock()
defer c.valueMutex.Unlock()

oldValue := c.value
if newValue = oldValue + delta; newValue != oldValue {
c.value = newValue

c.notifySubscribers(oldValue, newValue)
}

return newValue
}

func (c *Counter) subscribe(callback func(oldValue, newValue int)) (subscriptionID uint64) {
c.subscribersMutex.Lock()
defer c.subscribersMutex.Unlock()

c.subscribersCounter++
c.subscribers.Set(c.subscribersCounter, callback)

return c.subscribersCounter
}

func (c *Counter) unsubscribe(subscriptionID uint64) {
c.subscribersMutex.Lock()
defer c.subscribersMutex.Unlock()

c.subscribers.Delete(subscriptionID)
}

func (c *Counter) notifySubscribers(oldValue, newValue int) {
c.subscribersMutex.RLock()
defer c.subscribersMutex.RUnlock()

c.subscribers.ForEach(func(_ uint64, subscription func(oldValue, newValue int)) bool {
subscription(oldValue, newValue)

return true
})
}
1 change: 0 additions & 1 deletion core/timed/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ func TestTimedExecutor(t *testing.T) {
for et, f := range elements {
timedExecutor.ExecuteAt(f, et)
}
assert.Equal(t, len(elements), timedExecutor.Size())

assert.Eventually(t, func() bool { return len(actual) == len(expected) }, 30*time.Second, 100*time.Millisecond)
assert.Equal(t, 0, timedExecutor.Size())
Expand Down
Loading