Skip to content

Commit

Permalink
refactor(workers): start goroutines concurrently
Browse files Browse the repository at this point in the history
  • Loading branch information
EverythingSuckz committed Dec 4, 2023
1 parent 2fabdfd commit fa27487
Showing 1 changed file with 32 additions and 24 deletions.
56 changes: 32 additions & 24 deletions internal/bot/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"

"github.com/celestix/gotgproto"
Expand Down Expand Up @@ -91,44 +92,51 @@ func GetNextWorker() *Worker {

func StartWorkers(log *zap.Logger) {
log.Sugar().Info("Starting workers")
timeOut := 30 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), timeOut)
defer cancel()

Workers.Init(log)

if config.ValueOf.UseSessionFile {
log.Sugar().Info("Using session file for workers")
newpath := filepath.Join(".", "sessions")
err := os.MkdirAll(newpath, os.ModePerm)
if err != nil {
if err := os.MkdirAll(newpath, os.ModePerm); err != nil {
log.Error("Failed to create sessions directory", zap.Error(err))
return
}
}
c := make(chan struct{}, len(config.ValueOf.MultiTokens))
for i := 0; i < len(config.ValueOf.MultiTokens); i++ {

var wg sync.WaitGroup
var successfulStarts int32
totalBots := len(config.ValueOf.MultiTokens)

for i := 0; i < totalBots; i++ {
wg.Add(1)
go func(i int) {
err := Workers.Add(config.ValueOf.MultiTokens[i])
if err != nil {
log.Error("Failed to start worker", zap.Error(err))
return
}
defer wg.Done()

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

done := make(chan error, 1)
go func() {
err := Workers.Add(config.ValueOf.MultiTokens[i])
done <- err
}()

select {
default:
c <- struct{}{}
case err := <-done:
if err != nil {
log.Error("Failed to start worker", zap.Int("Worker Index", i), zap.Error(err))
} else {
atomic.AddInt32(&successfulStarts, 1)
}
case <-ctx.Done():
return
log.Error("Timed out starting worker", zap.Int("Worker Index", i))
}
}(i)
}
// wait for all workers to start
log.Sugar().Info("Waiting for all workers to start")
for i := 0; i < len(config.ValueOf.MultiTokens); i++ {
select {
case <-c:
case <-time.After(timeOut):
log.Sugar().Warnf("Timed out waiting for worker %d to start", i)
}
}

wg.Wait() // Wait for all goroutines to finish
log.Sugar().Infof("Successfully started %d/%d bots", successfulStarts, totalBots)
}

func startWorker(l *zap.Logger, botToken string, index int) (*gotgproto.Client, error) {
Expand Down

0 comments on commit fa27487

Please sign in to comment.