Skip to content

Commit

Permalink
Stop channel for master/worker also making sure no os.Exit is used
Browse files Browse the repository at this point in the history
  • Loading branch information
Siposattila committed May 23, 2024
1 parent 8b97f20 commit 6970124
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 8 deletions.
3 changes: 2 additions & 1 deletion internal/bulk/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package bulk

import (
"bufio"
"errors"
"fmt"
"os"
"strings"
Expand Down Expand Up @@ -103,7 +104,7 @@ func (b *Bulk) Start() {
case <-kill.KillCtx.Done():
console.Warning("Unexpected shutdown while sending emails.")

os.Exit(1)
return errors.New("Shutdown")
default:
time.Sleep(time.Duration(b.config.GetSendDelay()) * time.Millisecond)
b.emailClient.Send(&mail)
Expand Down
23 changes: 19 additions & 4 deletions internal/validate/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,21 @@ package validate
import (
"sync"

"github.com/Siposattila/gobulk/internal/console"

Check failure on line 6 in internal/validate/master.go

View workflow job for this annotation

GitHub Actions / build

"github.com/Siposattila/gobulk/internal/console" imported and not used
"github.com/schollz/progressbar/v3"
)

type MasterInterface interface {
NewWork(fn func())
Start()
Stop()
Wait()
}

type master struct {
maxWorkers int
pending chan *work
stop chan bool
bar *progressbar.ProgressBar
wg sync.WaitGroup
}
Expand All @@ -27,6 +30,7 @@ func NewMaster(totalWork int64, maxWorkers int) MasterInterface {
return &master{
maxWorkers: maxWorkers,
pending: make(chan *work),
stop: make(chan bool),
bar: progressbar.Default(totalWork),
}
}
Expand All @@ -40,10 +44,15 @@ func (m *master) NewWork(fn func()) {

func (m *master) worker() {
for {
w := <-m.pending
m.bar.Add(1)
w.fn()
m.wg.Done()
select {
case <-m.stop:
return
default:
w := <-m.pending
m.bar.Add(1)
w.fn()
m.wg.Done()
}
}
}

Expand All @@ -53,6 +62,12 @@ func (m *master) Start() {
}
}

func (m *master) Stop() {
for i := 0; i < m.maxWorkers; i++ {
m.stop <- true
}
}

func (m *master) Wait() {
m.wg.Wait()
}
6 changes: 3 additions & 3 deletions internal/validate/validate.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package validate

import (
"os"
"errors"

"github.com/Siposattila/gobulk/internal/console"
"github.com/Siposattila/gobulk/internal/email"
Expand Down Expand Up @@ -43,9 +43,10 @@ func (v *Validate) Start() {
for _, result := range results {
select {
case <-kill.KillCtx.Done():
master.Stop()
console.Warning("Unexpected shutdown while validating emails.")

os.Exit(1)
return errors.New("Shutdown")
default:
master.NewWork(func() {
result.ValidateEmail()
Expand All @@ -54,7 +55,6 @@ func (v *Validate) Start() {
}
}

// Returning an error will stop further batch processing
return nil
})
master.Wait()
Expand Down

0 comments on commit 6970124

Please sign in to comment.