Skip to content

Commit

Permalink
remove boolean result from worker start / stop since false = already …
Browse files Browse the repository at this point in the history
…running or already stopped
  • Loading branch information
NyaaaWhatsUpDoc committed Apr 26, 2024
1 parent 273dcc6 commit 92b2f7a
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 91 deletions.
57 changes: 41 additions & 16 deletions internal/transport/delivery/delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,28 +100,53 @@ func (p *WorkerPool) Init(client *httpclient.Client) {
}

// Start will attempt to start 'n' Worker{}s.
func (p *WorkerPool) Start(n int) (ok bool) {
if ok = (len(p.workers) == 0); ok {
p.workers = make([]*Worker, n)
for i := range p.workers {
p.workers[i] = new(Worker)
p.workers[i].Client = p.Client
p.workers[i].Queue = &p.Queue
ok = p.workers[i].Start() && ok
}
func (p *WorkerPool) Start(n int) {
// Check whether workers are
// set (is already running).
ok := (len(p.workers) > 0)
if ok {
return
}

// Allocate new workers slice.
p.workers = make([]*Worker, n)
for i := range p.workers {

// Allocate new Worker{}.
p.workers[i] = new(Worker)
p.workers[i].Client = p.Client
p.workers[i].Queue = &p.Queue

// Attempt to start worker.
// Return bool not useful
// here, as true = started,
// false = already running.
_ = p.workers[i].Start()
}

return
}

// Stop will attempt to stop contained Worker{}s.
func (p *WorkerPool) Stop() (ok bool) {
if ok = (len(p.workers) > 0); ok {
for i := range p.workers {
ok = p.workers[i].Stop() && ok
p.workers[i] = nil
}
p.workers = p.workers[:0]
func (p *WorkerPool) Stop() {
// Check whether workers are
// set (is currently running).
ok := (len(p.workers) == 0)
if ok {
return
}

// Stop all running workers.
for i := range p.workers {

// return bool not useful
// here, as true = stopped,
// false = never running.
_ = p.workers[i].Stop()
}

// Unset workers slice.
p.workers = p.workers[:0]
return
}

Expand Down
4 changes: 1 addition & 3 deletions internal/transport/delivery/delivery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ func testDeliveryWorkerPool(t *testing.T, sz int, input []*testrequest) {
"127.0.0.0/8",
}),
}))
if !wp.Start(sz) {
t.Fatal("failed starting pool")
}
wp.Start(sz)
defer wp.Stop()
test(t, &wp.Queue, input)
}
Expand Down
55 changes: 40 additions & 15 deletions internal/workers/worker_fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,27 +39,52 @@ type FnWorkerPool struct {
}

// Start will attempt to start 'n' FnWorker{}s.
func (p *FnWorkerPool) Start(n int) (ok bool) {
if ok = (len(p.workers) == 0); ok {
p.workers = make([]*FnWorker, n)
for i := range p.workers {
p.workers[i] = new(FnWorker)
p.workers[i].Queue = &p.Queue
ok = p.workers[i].Start() && ok
}
func (p *FnWorkerPool) Start(n int) {
// Check whether workers are
// set (is already running).
ok := (len(p.workers) > 0)
if ok {
return
}

// Allocate new workers slice.
p.workers = make([]*FnWorker, n)
for i := range p.workers {

// Allocate new FnWorker{}.
p.workers[i] = new(FnWorker)
p.workers[i].Queue = &p.Queue

// Attempt to start worker.
// Return bool not useful
// here, as true = started,
// false = already running.
_ = p.workers[i].Start()
}

return
}

// Stop will attempt to stop contained FnWorker{}s.
func (p *FnWorkerPool) Stop() (ok bool) {
if ok = (len(p.workers) > 0); ok {
for i := range p.workers {
ok = p.workers[i].Stop() && ok
p.workers[i] = nil
}
p.workers = p.workers[:0]
func (p *FnWorkerPool) Stop() {
// Check whether workers are
// set (is currently running).
ok := (len(p.workers) == 0)
if ok {
return
}

// Stop all running workers.
for i := range p.workers {

// return bool not useful
// here, as true = stopped,
// false = never running.
_ = p.workers[i].Stop()
}

// Unset workers slice.
p.workers = p.workers[:0]
return
}

Expand Down
57 changes: 41 additions & 16 deletions internal/workers/worker_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,28 +48,53 @@ func (p *MsgWorkerPool[T]) Init(indices []structr.IndexConfig) {
}

// Start will attempt to start 'n' Worker{}s.
func (p *MsgWorkerPool[T]) Start(n int) (ok bool) {
if ok = (len(p.workers) == 0); ok {
p.workers = make([]*MsgWorker[T], n)
for i := range p.workers {
p.workers[i] = new(MsgWorker[T])
p.workers[i].Process = p.Process
p.workers[i].Queue = &p.Queue
ok = p.workers[i].Start() && ok
}
func (p *MsgWorkerPool[T]) Start(n int) {
// Check whether workers are
// set (is already running).
ok := (len(p.workers) > 0)
if ok {
return
}

// Allocate new msg workers slice.
p.workers = make([]*MsgWorker[T], n)
for i := range p.workers {

// Allocate new MsgWorker[T]{}.
p.workers[i] = new(MsgWorker[T])
p.workers[i].Process = p.Process
p.workers[i].Queue = &p.Queue

// Attempt to start worker.
// Return bool not useful
// here, as true = started,
// false = already running.
_ = p.workers[i].Start()
}

return
}

// Stop will attempt to stop contained Worker{}s.
func (p *MsgWorkerPool[T]) Stop() (ok bool) {
if ok = (len(p.workers) > 0); ok {
for i := range p.workers {
ok = p.workers[i].Stop() && ok
p.workers[i] = nil
}
p.workers = p.workers[:0]
func (p *MsgWorkerPool[T]) Stop() {
// Check whether workers are
// set (is currently running).
ok := (len(p.workers) == 0)
if ok {
return
}

// Stop all running workers.
for i := range p.workers {

// return bool not useful
// here, as true = stopped,
// false = never running.
_ = p.workers[i].Stop()
}

// Unset workers slice.
p.workers = p.workers[:0]
return
}

Expand Down
54 changes: 21 additions & 33 deletions internal/workers/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,48 +59,27 @@ type Workers struct {

// StartScheduler starts the job scheduler.
func (w *Workers) StartScheduler() {
tryUntil("starting scheduler", 5, w.Scheduler.Start)
_ = w.Scheduler.Start() // false = already running
}

// Start will start contained worker pools.
func (w *Workers) Start() {
// Get currently set GOMAXPROCS.
maxprocs := runtime.GOMAXPROCS(0)

tryUntil("start delivery workerpool", 5, func() bool {
n := config.GetAdvancedSenderMultiplier()
if n < 1 {
// clamp min senders to 1.
return w.Delivery.Start(1)
}
return w.Delivery.Start(n * maxprocs)
})

tryUntil("starting client workerpool", 5, func() bool {
return w.Client.Start(4 * maxprocs)
})

tryUntil("starting federator workerpool", 5, func() bool {
return w.Federator.Start(4 * maxprocs)
})

tryUntil("starting dereference workerpool", 5, func() bool {
return w.Dereference.Start(4 * maxprocs)
})

tryUntil("starting media workerpool", 5, func() bool {
return w.Media.Start(8 * maxprocs)
})
w.Delivery.Start(deliveryWorkers(maxprocs))
w.Client.Start(4 * maxprocs)
w.Federator.Start(4 * maxprocs)
w.Dereference.Start(4 * maxprocs)
w.Media.Start(8 * maxprocs)
}

// Stop will stop all of the contained worker pools (and global scheduler).
func (w *Workers) Stop() {
tryUntil("stopping scheduler", 5, w.Scheduler.Stop)
tryUntil("stopping delivery workerpool", 5, w.Delivery.Stop)
tryUntil("stopping client API workerpool", 5, w.Client.Stop)
tryUntil("stopping federator workerpool", 5, w.Federator.Stop)
tryUntil("stopping dereference workerpool", 5, w.Dereference.Stop)
tryUntil("stopping media workerpool", 5, w.Media.Stop)
_ = w.Scheduler.Stop() // false = not running
w.Delivery.Stop()
w.Client.Stop()
w.Federator.Stop()
w.Dereference.Stop()
w.Media.Stop()
}

// nocopy when embedded will signal linter to
Expand All @@ -111,6 +90,15 @@ func (*nocopy) Lock() {}

func (*nocopy) Unlock() {}

func deliveryWorkers(maxprocs int) int {
n := config.GetAdvancedSenderMultiplier()
if n < 1 {
// clamp to 1
return 1
}
return n * maxprocs
}

// tryUntil will attempt to call 'do' for 'count' attempts, before panicking with 'msg'.
func tryUntil(msg string, count int, do func() bool) {
for i := 0; i < count; i++ {
Expand Down
16 changes: 8 additions & 8 deletions testrig/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,18 +79,18 @@ func StartWorkers(state *state.State, processor *workers.Processor) {
state.Workers.Delivery.Init(nil)

_ = state.Workers.Scheduler.Start()
_ = state.Workers.Client.Start(1)
_ = state.Workers.Federator.Start(1)
_ = state.Workers.Dereference.Start(1)
_ = state.Workers.Media.Start(1)
state.Workers.Client.Start(1)
state.Workers.Federator.Start(1)
state.Workers.Dereference.Start(1)
state.Workers.Media.Start(1)
}

func StopWorkers(state *state.State) {
_ = state.Workers.Scheduler.Stop()
_ = state.Workers.Client.Stop()
_ = state.Workers.Federator.Stop()
_ = state.Workers.Dereference.Stop()
_ = state.Workers.Media.Stop()
state.Workers.Client.Stop()
state.Workers.Federator.Stop()
state.Workers.Dereference.Stop()
state.Workers.Media.Stop()
}

func StartTimelines(state *state.State, filter *visibility.Filter, converter *typeutils.Converter) {
Expand Down

0 comments on commit 92b2f7a

Please sign in to comment.