From 8575e1669512fc084f27c799c3b035e883a5dec3 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Tue, 31 Jan 2023 23:58:08 +0100 Subject: [PATCH 01/16] Feat: Reworked syncutils.Counter --- core/syncutils/counter.go | 137 +++++++++++++++++++++++++++++--------- core/workerpool/group.go | 8 +++ 2 files changed, 113 insertions(+), 32 deletions(-) create mode 100644 core/workerpool/group.go diff --git a/core/syncutils/counter.go b/core/syncutils/counter.go index b18968aec..aa4803b53 100644 --- a/core/syncutils/counter.go +++ b/core/syncutils/counter.go @@ -2,63 +2,136 @@ package syncutils import ( "sync" + + "github.com/iotaledger/hive.go/core/generics/event" ) type Counter struct { - value int - mutex sync.RWMutex - valueIncreased *sync.Cond - valueDecreased *sync.Cond + value int + mutex sync.RWMutex + updatedEvent *event.Linkable[*counterEvent] + increasedCond *sync.Cond + decreasedCond *sync.Cond } func NewCounter() (newCounter *Counter) { newCounter = new(Counter) - newCounter.valueIncreased = sync.NewCond(&newCounter.mutex) - newCounter.valueDecreased = sync.NewCond(&newCounter.mutex) + newCounter.updatedEvent = event.NewLinkable[*counterEvent]() + newCounter.increasedCond = sync.NewCond(&newCounter.mutex) + newCounter.decreasedCond = sync.NewCond(&newCounter.mutex) return } -func (b *Counter) Value() int { - b.mutex.RLock() - defer b.mutex.RUnlock() - return b.value +func (c *Counter) Subscribe(updateCallbacks ...func(oldValue, newValue int)) (unsubscribe func()) { + if len(updateCallbacks) == 0 { + return func() {} + } + + closure := event.NewClosure(func(event *counterEvent) { + for _, updateCallback := range updateCallbacks { + updateCallback(event.oldValue, event.newValue) + } + }) + + c.updatedEvent.Hook(closure) + + return func() { + c.updatedEvent.Detach(closure) + } } -func (b *Counter) Increase() { - b.mutex.Lock() - b.value++ - b.mutex.Unlock() +func (c *Counter) Get() (value int) { + c.mutex.RLock() + defer c.mutex.RUnlock() - b.valueIncreased.Broadcast() + return c.value } -func (b *Counter) Decrease() { - b.mutex.Lock() - b.value-- - b.mutex.Unlock() +func (c *Counter) Set(newValue int) (oldValue int) { + if oldValue = c.set(newValue); oldValue < newValue { + c.increasedCond.Broadcast() + } else if oldValue > newValue { + c.decreasedCond.Broadcast() + } + + return oldValue +} + +func (c *Counter) Update(delta int) (newValue int) { + if newValue = c.update(delta); delta > 1 { + c.increasedCond.Broadcast() + } else if delta < 1 { + c.decreasedCond.Broadcast() + } + + return newValue +} + +func (c *Counter) Increase() (newValue int) { + return c.Update(1) +} - b.valueDecreased.Broadcast() +func (c *Counter) Decrease() (newValue int) { + return c.Update(-1) } -func (b *Counter) WaitIsZero() { - b.WaitIsBelow(1) +func (c *Counter) WaitIsZero() { + c.WaitIsBelow(1) } -func (b *Counter) WaitIsBelow(threshold int) { - b.mutex.Lock() - defer b.mutex.Unlock() +func (c *Counter) WaitIsBelow(threshold int) { + c.mutex.Lock() + defer c.mutex.Unlock() - for b.value >= threshold { - b.valueDecreased.Wait() + for c.value >= threshold { + c.decreasedCond.Wait() } } -func (b *Counter) WaitIsAbove(threshold int) { - b.mutex.Lock() - defer b.mutex.Unlock() +func (c *Counter) WaitIsAbove(threshold int) { + c.mutex.Lock() + defer c.mutex.Unlock() - for b.value <= threshold { - b.valueIncreased.Wait() + for c.value <= threshold { + c.increasedCond.Wait() } } + +func (c *Counter) set(newValue int) (oldValue int) { + c.mutex.Lock() + defer c.mutex.Unlock() + + if oldValue = c.value; newValue != oldValue { + c.updatedEvent.Trigger(&counterEvent{ + oldValue: oldValue, + newValue: newValue, + }) + } + + return oldValue +} + +func (c *Counter) update(delta int) (newValue int) { + c.mutex.Lock() + defer c.mutex.Unlock() + + if delta == 0 { + return c.value + } + + oldValue := c.value + newValue = oldValue + delta + + c.updatedEvent.Trigger(&counterEvent{ + oldValue: oldValue, + newValue: newValue, + }) + + return newValue +} + +type counterEvent struct { + oldValue int + newValue int +} diff --git a/core/workerpool/group.go b/core/workerpool/group.go new file mode 100644 index 000000000..4f9626e20 --- /dev/null +++ b/core/workerpool/group.go @@ -0,0 +1,8 @@ +package workerpool + +type Group struct { +} + +func (g *Group) CreatePool() *UnboundedWorkerPool { + +} From 27236e438fd421b4c5ff94bf310af0434b6dd907 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Wed, 1 Feb 2023 01:18:32 +0100 Subject: [PATCH 02/16] Refactor: fixed circular dependency --- core/syncutils/counter.go | 135 +++++++++++++++++++++----------------- 1 file changed, 76 insertions(+), 59 deletions(-) diff --git a/core/syncutils/counter.go b/core/syncutils/counter.go index aa4803b53..f3062bfdf 100644 --- a/core/syncutils/counter.go +++ b/core/syncutils/counter.go @@ -3,56 +3,40 @@ package syncutils import ( "sync" - "github.com/iotaledger/hive.go/core/generics/event" + "github.com/iotaledger/hive.go/core/generics/orderedmap" ) type Counter struct { - value int - mutex sync.RWMutex - updatedEvent *event.Linkable[*counterEvent] - increasedCond *sync.Cond - decreasedCond *sync.Cond + value int + valueMutex sync.RWMutex + valueIncreasedCond *sync.Cond + valueDecreasedCond *sync.Cond + subscribers *orderedmap.OrderedMap[uint64, func(oldValue, newValue int)] + subscribersCounter uint64 + subscribersMutex sync.RWMutex } func NewCounter() (newCounter *Counter) { newCounter = new(Counter) - newCounter.updatedEvent = event.NewLinkable[*counterEvent]() - newCounter.increasedCond = sync.NewCond(&newCounter.mutex) - newCounter.decreasedCond = sync.NewCond(&newCounter.mutex) + newCounter.valueIncreasedCond = sync.NewCond(&newCounter.valueMutex) + newCounter.valueDecreasedCond = sync.NewCond(&newCounter.valueMutex) + newCounter.subscribers = orderedmap.New[uint64, func(oldValue int, newValue int)]() return } -func (c *Counter) Subscribe(updateCallbacks ...func(oldValue, newValue int)) (unsubscribe func()) { - if len(updateCallbacks) == 0 { - return func() {} - } - - closure := event.NewClosure(func(event *counterEvent) { - for _, updateCallback := range updateCallbacks { - updateCallback(event.oldValue, event.newValue) - } - }) - - c.updatedEvent.Hook(closure) - - return func() { - c.updatedEvent.Detach(closure) - } -} - func (c *Counter) Get() (value int) { - c.mutex.RLock() - defer c.mutex.RUnlock() + c.valueMutex.RLock() + defer c.valueMutex.RUnlock() return c.value } func (c *Counter) Set(newValue int) (oldValue int) { if oldValue = c.set(newValue); oldValue < newValue { - c.increasedCond.Broadcast() + c.valueIncreasedCond.Broadcast() } else if oldValue > newValue { - c.decreasedCond.Broadcast() + c.valueDecreasedCond.Broadcast() } return oldValue @@ -60,9 +44,9 @@ func (c *Counter) Set(newValue int) (oldValue int) { func (c *Counter) Update(delta int) (newValue int) { if newValue = c.update(delta); delta > 1 { - c.increasedCond.Broadcast() + c.valueIncreasedCond.Broadcast() } else if delta < 1 { - c.decreasedCond.Broadcast() + c.valueDecreasedCond.Broadcast() } return newValue @@ -81,57 +65,90 @@ func (c *Counter) WaitIsZero() { } func (c *Counter) WaitIsBelow(threshold int) { - c.mutex.Lock() - defer c.mutex.Unlock() + c.valueMutex.Lock() + defer c.valueMutex.Unlock() for c.value >= threshold { - c.decreasedCond.Wait() + c.valueDecreasedCond.Wait() } } func (c *Counter) WaitIsAbove(threshold int) { - c.mutex.Lock() - defer c.mutex.Unlock() + c.valueMutex.Lock() + defer c.valueMutex.Unlock() for c.value <= threshold { - c.increasedCond.Wait() + c.valueIncreasedCond.Wait() + } +} + +func (c *Counter) Subscribe(subscribers ...func(oldValue, newValue int)) (unsubscribe func()) { + if len(subscribers) == 0 { + return func() {} + } + + subscriberID := c.subscribe(func(oldValue, newValue int) { + for _, updateCallback := range subscribers { + updateCallback(oldValue, newValue) + } + }) + + return func() { + c.unsubscribe(subscriberID) } } func (c *Counter) set(newValue int) (oldValue int) { - c.mutex.Lock() - defer c.mutex.Unlock() + c.valueMutex.Lock() + defer c.valueMutex.Unlock() if oldValue = c.value; newValue != oldValue { - c.updatedEvent.Trigger(&counterEvent{ - oldValue: oldValue, - newValue: newValue, - }) + c.value = newValue + + c.notifySubscribers(oldValue, newValue) } return oldValue } func (c *Counter) update(delta int) (newValue int) { - c.mutex.Lock() - defer c.mutex.Unlock() - - if delta == 0 { - return c.value - } + c.valueMutex.Lock() + defer c.valueMutex.Unlock() oldValue := c.value - newValue = oldValue + delta + if newValue = oldValue + delta; newValue != oldValue { + c.value = newValue - c.updatedEvent.Trigger(&counterEvent{ - oldValue: oldValue, - newValue: newValue, - }) + c.notifySubscribers(oldValue, newValue) + } return newValue } -type counterEvent struct { - oldValue int - newValue int +func (c *Counter) subscribe(callback func(oldValue, newValue int)) (subscriptionID uint64) { + c.subscribersMutex.Lock() + defer c.subscribersMutex.Unlock() + + c.subscribersCounter++ + c.subscribers.Set(c.subscribersCounter, callback) + + return c.subscribersCounter +} + +func (c *Counter) unsubscribe(subscriptionID uint64) { + c.subscribersMutex.Lock() + defer c.subscribersMutex.Unlock() + + c.subscribers.Delete(subscriptionID) +} + +func (c *Counter) notifySubscribers(oldValue, newValue int) { + c.subscribersMutex.RLock() + defer c.subscribersMutex.RUnlock() + + c.subscribers.ForEach(func(_ uint64, subscription func(oldValue, newValue int)) bool { + subscription(oldValue, newValue) + + return true + }) } From 82a9553ab9c88dcd2e290726834b0692a57718f0 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Wed, 1 Feb 2023 01:31:27 +0100 Subject: [PATCH 03/16] Feat: fixed refactor error --- core/workerpool/unboundedworkerpool.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/workerpool/unboundedworkerpool.go b/core/workerpool/unboundedworkerpool.go index c581969d4..231d4a414 100644 --- a/core/workerpool/unboundedworkerpool.go +++ b/core/workerpool/unboundedworkerpool.go @@ -53,11 +53,11 @@ func (u *UnboundedWorkerPool) Submit(task func(), optStackTrace ...string) { panic("worker pool is not running") } - if u.PendingTasksCounter.Increase(); len(optStackTrace) >= 1 { - u.Queue.Push(newWorkerPoolTask(u.PendingTasksCounter.Decrease, task, optStackTrace[0])) - } else { - u.Queue.Push(newWorkerPoolTask(u.PendingTasksCounter.Decrease, task, "")) + if u.PendingTasksCounter.Increase(); len(optStackTrace) == 0 { + optStackTrace = []string{""} } + + u.Queue.Push(newWorkerPoolTask(func() { u.PendingTasksCounter.Decrease() }, task, optStackTrace[0])) } func (u *UnboundedWorkerPool) Shutdown(cancelPendingTasks ...bool) (self *UnboundedWorkerPool) { From 8ee4e1c4cfc94c6927e00f233268e0920bb6b4f0 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Wed, 1 Feb 2023 01:35:05 +0100 Subject: [PATCH 04/16] Fix: fixed compile error --- core/workerpool/group.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/workerpool/group.go b/core/workerpool/group.go index 4f9626e20..777861a1d 100644 --- a/core/workerpool/group.go +++ b/core/workerpool/group.go @@ -4,5 +4,5 @@ type Group struct { } func (g *Group) CreatePool() *UnboundedWorkerPool { - + return nil } From a0842a90d142c412273c8319cc6a987eccbb40ec Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Wed, 1 Feb 2023 01:49:02 +0100 Subject: [PATCH 05/16] Fix: fixed flaky test --- core/timed/executor_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/core/timed/executor_test.go b/core/timed/executor_test.go index 389016d8c..7158e219e 100644 --- a/core/timed/executor_test.go +++ b/core/timed/executor_test.go @@ -55,7 +55,6 @@ func TestTimedExecutor(t *testing.T) { for et, f := range elements { timedExecutor.ExecuteAt(f, et) } - assert.Equal(t, len(elements), timedExecutor.Size()) assert.Eventually(t, func() bool { return len(actual) == len(expected) }, 30*time.Second, 100*time.Millisecond) assert.Equal(t, 0, timedExecutor.Size()) From bf0387b19d9f792d785a9d73d39a33c41a809c4b Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Wed, 1 Feb 2023 01:58:35 +0100 Subject: [PATCH 06/16] Refactor: reverted changes --- core/workerpool/unboundedworkerpool.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/workerpool/unboundedworkerpool.go b/core/workerpool/unboundedworkerpool.go index 231d4a414..cd66d3701 100644 --- a/core/workerpool/unboundedworkerpool.go +++ b/core/workerpool/unboundedworkerpool.go @@ -53,11 +53,11 @@ func (u *UnboundedWorkerPool) Submit(task func(), optStackTrace ...string) { panic("worker pool is not running") } - if u.PendingTasksCounter.Increase(); len(optStackTrace) == 0 { - optStackTrace = []string{""} + if u.PendingTasksCounter.Increase(); len(optStackTrace) >= 1 { + u.Queue.Push(newWorkerPoolTask(func() { u.PendingTasksCounter.Decrease() }, task, optStackTrace[0])) + } else { + u.Queue.Push(newWorkerPoolTask(func() { u.PendingTasksCounter.Decrease() }, task, "")) } - - u.Queue.Push(newWorkerPoolTask(func() { u.PendingTasksCounter.Decrease() }, task, optStackTrace[0])) } func (u *UnboundedWorkerPool) Shutdown(cancelPendingTasks ...bool) (self *UnboundedWorkerPool) { From 6e16815a570c0a34a91238a7c6093a5b758a303a Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Wed, 1 Feb 2023 03:36:53 +0100 Subject: [PATCH 07/16] Feat: added workerpool group --- core/workerpool/group.go | 107 +++++++++++++++++++++++++++++++++- core/workerpool/group_test.go | 30 ++++++++++ 2 files changed, 135 insertions(+), 2 deletions(-) create mode 100644 core/workerpool/group_test.go diff --git a/core/workerpool/group.go b/core/workerpool/group.go index 777861a1d..0d6791bb2 100644 --- a/core/workerpool/group.go +++ b/core/workerpool/group.go @@ -1,8 +1,111 @@ package workerpool +import ( + "sync" + + "github.com/iotaledger/hive.go/core/generics/orderedmap" + "github.com/iotaledger/hive.go/core/syncutils" +) + type Group struct { + PendingChildrenCounter *syncutils.Counter + + name string + pools *orderedmap.OrderedMap[string, *UnboundedWorkerPool] + poolsMutex sync.RWMutex + groups *orderedmap.OrderedMap[string, *Group] + groupsMutex sync.RWMutex +} + +func NewGroup(name string) (group *Group) { + return &Group{ + PendingChildrenCounter: syncutils.NewCounter(), + name: name, + pools: orderedmap.New[string, *UnboundedWorkerPool](), + groups: orderedmap.New[string, *Group](), + } } -func (g *Group) CreatePool() *UnboundedWorkerPool { - return nil +func (g *Group) Name() (name string) { + return g.name +} + +func (g *Group) CreatePool(name string, optsWorkerCount ...int) (pool *UnboundedWorkerPool) { + g.poolsMutex.Lock() + defer g.poolsMutex.Unlock() + + pool = NewUnboundedWorkerPool(optsWorkerCount...) + pool.PendingTasksCounter.Subscribe(func(oldValue, newValue int) { + if oldValue == 0 { + g.PendingChildrenCounter.Increase() + } else if newValue == 0 { + g.PendingChildrenCounter.Decrease() + } + }) + + g.pools.Set(name, pool) + + return pool.Start() +} + +func (g *Group) Pool(name string) (pool *UnboundedWorkerPool, exists bool) { + g.poolsMutex.RLock() + defer g.poolsMutex.RUnlock() + + return g.pools.Get(name) +} + +func (g *Group) CreateGroup(name string) (group *Group) { + group = NewGroup(name) + group.PendingChildrenCounter.Subscribe(func(oldValue, newValue int) { + if oldValue == 0 { + g.PendingChildrenCounter.Increase() + } else if newValue == 0 { + g.PendingChildrenCounter.Decrease() + } + }) + + g.groups.Set(name, group) + + return group +} + +func (g *Group) Group(name string) (pool *Group, exists bool) { + g.groupsMutex.RLock() + defer g.groupsMutex.RUnlock() + + return g.groups.Get(name) +} + +func (g *Group) Shutdown() { + g.PendingChildrenCounter.WaitIsZero() + + g.shutdown() +} + +func (g *Group) shutdown() { + g.shutdownPools() + g.shutdownGroups() +} + +func (g *Group) shutdownPools() { + g.poolsMutex.RLock() + defer g.poolsMutex.RUnlock() + + g.pools.ForEach(func(_ string, pool *UnboundedWorkerPool) bool { + pool.Shutdown(true) + + return true + }) +} + +func (g *Group) shutdownGroups() { + g.groupsMutex.RLock() + defer g.groupsMutex.RUnlock() + + g.groups.ForEach(func(_ string, group *Group) bool { + group.shutdown() + + return true + }) } diff --git a/core/workerpool/group_test.go b/core/workerpool/group_test.go new file mode 100644 index 000000000..04c2da228 --- /dev/null +++ b/core/workerpool/group_test.go @@ -0,0 +1,30 @@ +package workerpool + +import ( + "fmt" + "testing" + "time" +) + +func Test(t *testing.T) { + group := NewGroup("protocol") + + pool1 := group.CreatePool("pool1") + pool2 := group.CreatePool("pool2") + + pool1.Submit(func() { + time.Sleep(1 * time.Second) + + fmt.Println("TASK1 done") + }) + + pool2.Submit(func() { + time.Sleep(3 * time.Second) + + fmt.Println("TASK2 done") + }) + + group.Shutdown() + + fmt.Println("ALL TASKS DONE") +} From 99eb714e95177f84d9e62768a254687aef2bf034 Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Wed, 1 Feb 2023 11:29:25 +0100 Subject: [PATCH 08/16] Added Pools function to Group --- core/workerpool/group.go | 23 +++++++++++++++++++++++ core/workerpool/group_test.go | 12 +++++++++--- 2 files changed, 32 insertions(+), 3 deletions(-) diff --git a/core/workerpool/group.go b/core/workerpool/group.go index 0d6791bb2..57362368c 100644 --- a/core/workerpool/group.go +++ b/core/workerpool/group.go @@ -1,6 +1,7 @@ package workerpool import ( + "fmt" "sync" "github.com/iotaledger/hive.go/core/generics/orderedmap" @@ -55,6 +56,28 @@ func (g *Group) Pool(name string) (pool *UnboundedWorkerPool, exists bool) { return g.pools.Get(name) } +func (g *Group) Pools() map[string]*UnboundedWorkerPool { + g.poolsMutex.RLock() + defer g.poolsMutex.RUnlock() + + g.groupsMutex.RLock() + defer g.groupsMutex.RUnlock() + + wp := make(map[string]*UnboundedWorkerPool) + g.pools.ForEach(func(key string, val *UnboundedWorkerPool) bool { + wp[fmt.Sprintf("%s.%s", g.name, key)] = val + return true + }) + g.groups.ForEach(func(prefix string, group *Group) bool { + for name, pool := range group.Pools() { + wp[fmt.Sprintf("%s.%s", g.name, name)] = pool + } + return true + }) + + return wp +} + func (g *Group) CreateGroup(name string) (group *Group) { group = NewGroup(name) group.PendingChildrenCounter.Subscribe(func(oldValue, newValue int) { diff --git a/core/workerpool/group_test.go b/core/workerpool/group_test.go index 04c2da228..4f25c5b24 100644 --- a/core/workerpool/group_test.go +++ b/core/workerpool/group_test.go @@ -8,9 +8,15 @@ import ( func Test(t *testing.T) { group := NewGroup("protocol") + _ = group.CreatePool("poolA") - pool1 := group.CreatePool("pool1") - pool2 := group.CreatePool("pool2") + subgroup1 := group.CreateGroup("sub1") + pool1 := subgroup1.CreatePool("pool1") + pool2 := subgroup1.CreatePool("pool2") + + subgroup2 := group.CreateGroup("sub2") + subSubGroup := subgroup2.CreateGroup("loop") + _ = subSubGroup.CreatePool("pool3") pool1.Submit(func() { time.Sleep(1 * time.Second) @@ -25,6 +31,6 @@ func Test(t *testing.T) { }) group.Shutdown() - + fmt.Println("ALL TASKS DONE") } From 307585d15232fdb75ae44f40dbfab9ef19336755 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Wed, 1 Feb 2023 15:43:06 +0100 Subject: [PATCH 09/16] Feat: added String method for group --- core/workerpool/group.go | 57 +++++++++++++++++++++++++++++++++++ core/workerpool/group_test.go | 20 +++++++++++- 2 files changed, 76 insertions(+), 1 deletion(-) diff --git a/core/workerpool/group.go b/core/workerpool/group.go index 0d6791bb2..e9dd4fb69 100644 --- a/core/workerpool/group.go +++ b/core/workerpool/group.go @@ -1,6 +1,8 @@ package workerpool import ( + "strconv" + "strings" "sync" "github.com/iotaledger/hive.go/core/generics/orderedmap" @@ -83,6 +85,59 @@ func (g *Group) Shutdown() { g.shutdown() } +func (g *Group) String() (humanReadable string) { + if indentedString := g.indentedString(0); indentedString != "" { + return strings.TrimRight(g.indentedString(0), "\r\n") + } + + return "> " + g.name + " (0 pending children)" +} + +func (g *Group) indentedString(indentation int) (humanReadable string) { + if pendingChildren := g.PendingChildrenCounter.Get(); pendingChildren != 0 { + if children := g.childrenString(indentation + 1); children != "" { + humanReadable = strings.Repeat(indentationString, indentation) + "> " + g.name + " (" + strconv.Itoa(pendingChildren) + " pending children) {\n" + humanReadable += children + humanReadable += strings.Repeat(indentationString, indentation) + "}\n" + } + } + + return humanReadable +} + +func (g *Group) childrenString(indentation int) (humanReadable string) { + humanReadable = g.poolsString(indentation) + + groups := g.groupsString(indentation) + if humanReadable != "" && groups != "" { + humanReadable += strings.Repeat(indentationString, indentation) + "\n" + } + + return humanReadable + groups +} + +func (g *Group) poolsString(indentation int) (humanReadable string) { + g.pools.ForEach(func(key string, value *UnboundedWorkerPool) bool { + if currentValue := value.PendingTasksCounter.Get(); currentValue > 0 { + humanReadable += strings.Repeat(indentationString, indentation) + "- " + key + " (" + strconv.Itoa(currentValue) + " pending tasks)\n" + } + + return true + }) + + return humanReadable +} + +func (g *Group) groupsString(indentation int) (humanReadable string) { + g.groups.ForEach(func(key string, value *Group) bool { + humanReadable += value.indentedString(indentation) + + return true + }) + + return humanReadable +} + func (g *Group) shutdown() { g.shutdownPools() g.shutdownGroups() @@ -109,3 +164,5 @@ func (g *Group) shutdownGroups() { return true }) } + +const indentationString = " " diff --git a/core/workerpool/group_test.go b/core/workerpool/group_test.go index 04c2da228..bebe45efe 100644 --- a/core/workerpool/group_test.go +++ b/core/workerpool/group_test.go @@ -8,10 +8,12 @@ import ( func Test(t *testing.T) { group := NewGroup("protocol") - pool1 := group.CreatePool("pool1") pool2 := group.CreatePool("pool2") + subGroup := group.CreateGroup("engine") + pool3 := subGroup.CreatePool("booker") + pool1.Submit(func() { time.Sleep(1 * time.Second) @@ -24,7 +26,23 @@ func Test(t *testing.T) { fmt.Println("TASK2 done") }) + pool3.Submit(func() { + time.Sleep(2 * time.Second) + + fmt.Println("TASK3 done") + }) + + go func() { + for { + fmt.Println(group) + + time.Sleep(500 * time.Millisecond) + } + }() + group.Shutdown() fmt.Println("ALL TASKS DONE") + + time.Sleep(1 * time.Second) } From 45e5074816250cfba440d20fa072a8a829f6acdd Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Wed, 1 Feb 2023 15:56:05 +0100 Subject: [PATCH 10/16] Feat: refactored code --- core/workerpool/group.go | 24 ++++++++++++------------ core/workerpool/group_test.go | 1 + 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/core/workerpool/group.go b/core/workerpool/group.go index 762e57f54..6a08dce03 100644 --- a/core/workerpool/group.go +++ b/core/workerpool/group.go @@ -58,26 +58,26 @@ func (g *Group) Pool(name string) (pool *UnboundedWorkerPool, exists bool) { return g.pools.Get(name) } -func (g *Group) Pools() map[string]*UnboundedWorkerPool { - g.poolsMutex.RLock() - defer g.poolsMutex.RUnlock() - - g.groupsMutex.RLock() - defer g.groupsMutex.RUnlock() +func (g *Group) Pools() (pools map[string]*UnboundedWorkerPool) { + pools = make(map[string]*UnboundedWorkerPool) - wp := make(map[string]*UnboundedWorkerPool) - g.pools.ForEach(func(key string, val *UnboundedWorkerPool) bool { - wp[fmt.Sprintf("%s.%s", g.name, key)] = val + g.poolsMutex.RLock() + g.pools.ForEach(func(name string, pool *UnboundedWorkerPool) bool { + pools[fmt.Sprintf("%s.%s", g.name, name)] = pool return true }) - g.groups.ForEach(func(prefix string, group *Group) bool { + g.poolsMutex.RUnlock() + + g.groupsMutex.RLock() + g.groups.ForEach(func(_ string, group *Group) bool { for name, pool := range group.Pools() { - wp[fmt.Sprintf("%s.%s", g.name, name)] = pool + pools[fmt.Sprintf("%s.%s", g.name, name)] = pool } return true }) + g.groupsMutex.RUnlock() - return wp + return pools } func (g *Group) CreateGroup(name string) (group *Group) { diff --git a/core/workerpool/group_test.go b/core/workerpool/group_test.go index 08e7f641e..1613c5e9b 100644 --- a/core/workerpool/group_test.go +++ b/core/workerpool/group_test.go @@ -31,6 +31,7 @@ func Test(t *testing.T) { }) fmt.Println(group) + fmt.Println(group.Pools()) group.Shutdown() From 894dab4bcbeb25ed714e9dcd38dfac5c68be47a6 Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Wed, 1 Feb 2023 17:39:01 +0100 Subject: [PATCH 11/16] Added name to worker pools to be able to track which one is running --- core/generics/event/event.go | 13 ------------- core/generics/event/loop.go | 2 +- core/workerpool/group.go | 6 +++++- core/workerpool/unboundedworkerpool.go | 7 +++++-- 4 files changed, 11 insertions(+), 17 deletions(-) diff --git a/core/generics/event/event.go b/core/generics/event/event.go index 0c1464e98..ef037f1cb 100644 --- a/core/generics/event/event.go +++ b/core/generics/event/event.go @@ -37,19 +37,6 @@ func (e *Event[T]) Attach(closure *Closure[T], triggerMaxCount ...uint64) { e.asyncHandlers.Set(closure.ID, newHandler[T](e.callbackFromClosure(closure, triggerMaxCount...), Loop)) } -// AttachWithNewWorkerPool allows to register a Closure that is executed asynchronously in a separate, newly created worker pool when the Event triggers. -// If 'triggerMaxCount' is >0, the Closure is automatically detached after exceeding the trigger limit. -func (e *Event[T]) AttachWithNewWorkerPool(closure *Closure[T], workers int, triggerMaxCount ...uint64) *workerpool.UnboundedWorkerPool { - if closure == nil { - return nil - } - - wp := workerpool.NewUnboundedWorkerPool(workers) - e.asyncHandlers.Set(closure.ID, newHandler[T](e.callbackFromClosure(closure, triggerMaxCount...), wp)) - wp.Start() - return wp -} - // AttachWithWorkerPool allows to register a Closure that is executed asynchronously in the specified worker pool when the Event triggers. // If 'triggerMaxCount' is >0, the Closure is automatically detached after exceeding the trigger limit. func (e *Event[T]) AttachWithWorkerPool(closure *Closure[T], wp *workerpool.UnboundedWorkerPool, triggerMaxCount ...uint64) { diff --git a/core/generics/event/loop.go b/core/generics/event/loop.go index dc68eacbd..91516ef26 100644 --- a/core/generics/event/loop.go +++ b/core/generics/event/loop.go @@ -7,6 +7,6 @@ import ( var Loop *workerpool.UnboundedWorkerPool func init() { - Loop = workerpool.NewUnboundedWorkerPool() + Loop = workerpool.NewUnboundedWorkerPool("event.Loop") Loop.Start() } diff --git a/core/workerpool/group.go b/core/workerpool/group.go index 6a08dce03..4c561c063 100644 --- a/core/workerpool/group.go +++ b/core/workerpool/group.go @@ -37,7 +37,7 @@ func (g *Group) CreatePool(name string, optsWorkerCount ...int) (pool *Unbounded g.poolsMutex.Lock() defer g.poolsMutex.Unlock() - pool = NewUnboundedWorkerPool(optsWorkerCount...) + pool = NewUnboundedWorkerPool(name, optsWorkerCount...) pool.PendingTasksCounter.Subscribe(func(oldValue, newValue int) { if oldValue == 0 { g.PendingChildrenCounter.Increase() @@ -51,6 +51,10 @@ func (g *Group) CreatePool(name string, optsWorkerCount ...int) (pool *Unbounded return pool.Start() } +func (g *Group) Wait() { + g.PendingChildrenCounter.WaitIsZero() +} + func (g *Group) Pool(name string) (pool *UnboundedWorkerPool, exists bool) { g.poolsMutex.RLock() defer g.poolsMutex.RUnlock() diff --git a/core/workerpool/unboundedworkerpool.go b/core/workerpool/unboundedworkerpool.go index cd66d3701..02b0c0e47 100644 --- a/core/workerpool/unboundedworkerpool.go +++ b/core/workerpool/unboundedworkerpool.go @@ -1,6 +1,7 @@ package workerpool import ( + "fmt" "runtime" "sync" @@ -8,6 +9,7 @@ import ( ) type UnboundedWorkerPool struct { + Name string PendingTasksCounter *syncutils.Counter Queue *syncutils.Stack[*WorkerPoolTask] ShutdownComplete sync.WaitGroup @@ -19,12 +21,13 @@ type UnboundedWorkerPool struct { mutex syncutils.RWMutex } -func NewUnboundedWorkerPool(optsWorkerCount ...int) (newUnboundedWorkerPool *UnboundedWorkerPool) { +func NewUnboundedWorkerPool(name string, optsWorkerCount ...int) (newUnboundedWorkerPool *UnboundedWorkerPool) { if len(optsWorkerCount) == 0 { optsWorkerCount = append(optsWorkerCount, 2*runtime.NumCPU()) } return &UnboundedWorkerPool{ + Name: name, PendingTasksCounter: syncutils.NewCounter(), Queue: syncutils.NewStack[*WorkerPoolTask](), workerCount: optsWorkerCount[0], @@ -50,7 +53,7 @@ func (u *UnboundedWorkerPool) Start() (self *UnboundedWorkerPool) { func (u *UnboundedWorkerPool) Submit(task func(), optStackTrace ...string) { if !u.IsRunning() { - panic("worker pool is not running") + panic(fmt.Sprintf("worker pool '%s' is not running", u.Name)) } if u.PendingTasksCounter.Increase(); len(optStackTrace) >= 1 { From 0d262fa30570d2f0668f61076e855171b81a1918 Mon Sep 17 00:00:00 2001 From: Hans Moog <3293976+hmoog@users.noreply.github.com> Date: Wed, 1 Feb 2023 19:58:55 +0100 Subject: [PATCH 12/16] Fix: fixed sorry --- core/workerpool/unboundedworkerpool_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/workerpool/unboundedworkerpool_test.go b/core/workerpool/unboundedworkerpool_test.go index c987ce986..7bc6b184d 100644 --- a/core/workerpool/unboundedworkerpool_test.go +++ b/core/workerpool/unboundedworkerpool_test.go @@ -11,7 +11,7 @@ import ( func Test_NonBlockingNoFlush(t *testing.T) { const workerCount = 2 - wp := NewUnboundedWorkerPool(workerCount) + wp := NewUnboundedWorkerPool(t.Name(), workerCount) wp.Start() @@ -49,7 +49,7 @@ func Test_NonBlockingNoFlush(t *testing.T) { func Test_NonBlockingFlush(t *testing.T) { const workerCount = 2 - wp := NewUnboundedWorkerPool(workerCount) + wp := NewUnboundedWorkerPool(t.Name(), workerCount) wp.Start() @@ -82,7 +82,7 @@ func Test_NonBlockingFlush(t *testing.T) { func Test_QueueWaitSizeIsBelow(t *testing.T) { const workerCount = 2 - wp := NewUnboundedWorkerPool(workerCount) + wp := NewUnboundedWorkerPool(t.Name(), workerCount) wp.Start() @@ -117,7 +117,7 @@ func Test_QueueWaitSizeIsBelow(t *testing.T) { func Test_EmptyPoolStartupAndShutdown(t *testing.T) { const workerCount = 2 - wp := NewUnboundedWorkerPool(workerCount) + wp := NewUnboundedWorkerPool(t.Name(), workerCount) wp.Start() From 5238c174a68b205977f442a2026b0b81bf146bc2 Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Thu, 2 Feb 2023 11:24:55 +0100 Subject: [PATCH 13/16] added a WaitAll() to the groups to be able to wait for the root group in the tree --- core/workerpool/group.go | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/core/workerpool/group.go b/core/workerpool/group.go index 4c561c063..a2a3096b4 100644 --- a/core/workerpool/group.go +++ b/core/workerpool/group.go @@ -18,14 +18,20 @@ type Group struct { poolsMutex sync.RWMutex groups *orderedmap.OrderedMap[string, *Group] groupsMutex sync.RWMutex + root *Group } func NewGroup(name string) (group *Group) { + return newGroupWithRoot(name, nil) +} + +func newGroupWithRoot(name string, root *Group) (group *Group) { return &Group{ PendingChildrenCounter: syncutils.NewCounter(), name: name, pools: orderedmap.New[string, *UnboundedWorkerPool](), groups: orderedmap.New[string, *Group](), + root: root, } } @@ -55,6 +61,15 @@ func (g *Group) Wait() { g.PendingChildrenCounter.WaitIsZero() } +func (g *Group) WaitAll() { + if g.root != nil { + g.root.Wait() + return + } + + g.Wait() +} + func (g *Group) Pool(name string) (pool *UnboundedWorkerPool, exists bool) { g.poolsMutex.RLock() defer g.poolsMutex.RUnlock() @@ -85,7 +100,7 @@ func (g *Group) Pools() (pools map[string]*UnboundedWorkerPool) { } func (g *Group) CreateGroup(name string) (group *Group) { - group = NewGroup(name) + group = newGroupWithRoot(name, g.root) group.PendingChildrenCounter.Subscribe(func(oldValue, newValue int) { if oldValue == 0 { g.PendingChildrenCounter.Increase() From 807f6426e4c00ca0d1db894343a25feb25b8fee4 Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Thu, 2 Feb 2023 11:40:38 +0100 Subject: [PATCH 14/16] Fixed root not passing itself --- core/workerpool/group.go | 3 ++- core/workerpool/group_test.go | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/core/workerpool/group.go b/core/workerpool/group.go index a2a3096b4..3d9de6f7b 100644 --- a/core/workerpool/group.go +++ b/core/workerpool/group.go @@ -6,6 +6,7 @@ import ( "strings" "sync" + "github.com/iotaledger/hive.go/core/generics/lo" "github.com/iotaledger/hive.go/core/generics/orderedmap" "github.com/iotaledger/hive.go/core/syncutils" ) @@ -100,7 +101,7 @@ func (g *Group) Pools() (pools map[string]*UnboundedWorkerPool) { } func (g *Group) CreateGroup(name string) (group *Group) { - group = newGroupWithRoot(name, g.root) + group = newGroupWithRoot(name, lo.Cond(g.root != nil, g.root, g)) group.PendingChildrenCounter.Subscribe(func(oldValue, newValue int) { if oldValue == 0 { g.PendingChildrenCounter.Increase() diff --git a/core/workerpool/group_test.go b/core/workerpool/group_test.go index 1613c5e9b..43f155318 100644 --- a/core/workerpool/group_test.go +++ b/core/workerpool/group_test.go @@ -7,7 +7,7 @@ import ( ) func Test(t *testing.T) { - group := NewGroup("protocol") + group := NewGroup(t.Name()) _ = group.CreatePool("poolA") subgroup1 := group.CreateGroup("sub1") From 10ff221ec303eb223d58f0ddd8e06e76fd6e3348 Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Thu, 2 Feb 2023 11:46:57 +0100 Subject: [PATCH 15/16] expose root group in a getter --- core/workerpool/group.go | 13 ++++++------- core/workerpool/group_test.go | 6 ++++++ 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/core/workerpool/group.go b/core/workerpool/group.go index 3d9de6f7b..fdf8d273a 100644 --- a/core/workerpool/group.go +++ b/core/workerpool/group.go @@ -58,17 +58,16 @@ func (g *Group) CreatePool(name string, optsWorkerCount ...int) (pool *Unbounded return pool.Start() } +func (g *Group) Root() *Group { + return lo.Cond(g.root != nil, g.root, g) +} + func (g *Group) Wait() { g.PendingChildrenCounter.WaitIsZero() } func (g *Group) WaitAll() { - if g.root != nil { - g.root.Wait() - return - } - - g.Wait() + g.Root().Wait() } func (g *Group) Pool(name string) (pool *UnboundedWorkerPool, exists bool) { @@ -101,7 +100,7 @@ func (g *Group) Pools() (pools map[string]*UnboundedWorkerPool) { } func (g *Group) CreateGroup(name string) (group *Group) { - group = newGroupWithRoot(name, lo.Cond(g.root != nil, g.root, g)) + group = newGroupWithRoot(name, g.Root()) group.PendingChildrenCounter.Subscribe(func(oldValue, newValue int) { if oldValue == 0 { g.PendingChildrenCounter.Increase() diff --git a/core/workerpool/group_test.go b/core/workerpool/group_test.go index 43f155318..35225d52d 100644 --- a/core/workerpool/group_test.go +++ b/core/workerpool/group_test.go @@ -4,12 +4,16 @@ import ( "fmt" "testing" "time" + + "github.com/stretchr/testify/require" ) func Test(t *testing.T) { group := NewGroup(t.Name()) _ = group.CreatePool("poolA") + require.Equal(t, group, group.Root()) + subgroup1 := group.CreateGroup("sub1") pool1 := subgroup1.CreatePool("pool1") pool2 := subgroup1.CreatePool("pool2") @@ -18,6 +22,8 @@ func Test(t *testing.T) { subSubGroup := subgroup2.CreateGroup("loop") _ = subSubGroup.CreatePool("pool3") + require.Equal(t, group, subSubGroup.Root()) + pool1.Submit(func() { time.Sleep(1 * time.Second) From 256ed6b77ff3f30c11c9fca0778fa33eef078cf1 Mon Sep 17 00:00:00 2001 From: Alexander Sporn Date: Thu, 2 Feb 2023 18:39:50 +0100 Subject: [PATCH 16/16] panic if names are reused --- core/workerpool/group.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/core/workerpool/group.go b/core/workerpool/group.go index fdf8d273a..498c3b4bf 100644 --- a/core/workerpool/group.go +++ b/core/workerpool/group.go @@ -53,7 +53,9 @@ func (g *Group) CreatePool(name string, optsWorkerCount ...int) (pool *Unbounded } }) - g.pools.Set(name, pool) + if !g.pools.Set(name, pool) { + panic(fmt.Sprintf("pool '%s' already exists", name)) + } return pool.Start() } @@ -109,7 +111,9 @@ func (g *Group) CreateGroup(name string) (group *Group) { } }) - g.groups.Set(name, group) + if !g.groups.Set(name, group) { + panic(fmt.Sprintf("group '%s' already exists", name)) + } return group }