Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Intelligent addition or reduction of workers #193

Merged
merged 4 commits into from
Oct 11, 2023
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 49 additions & 22 deletions runner.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package boomer

import (
"context"
"fmt"
"log"
"math/rand"
Expand Down Expand Up @@ -41,9 +42,9 @@ type runner struct {
numClients int32
spawnRate float64

// all running workers(goroutines) will select on this channel.
// close this channel will stop all running workers.
stopChan chan bool
// Cancellation method for all running workers(goroutines)
cancelFuncs []context.CancelFunc
mu sync.Mutex
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

避免用 mutex,维护的心智负担有点大。有状态机来保证执行顺序了。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里看起来可以直接删除锁机制
理由:增减线程命令来自于监听chan得到,本身是非并发的。整个操作不引入协程的话,就可以直接删除锁机制了
但是这样是否合理,大佬可以一起帮忙看一下,我现在提交

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

我只是不想在这里用锁,具体的实现有没有问题,要看测试用例。


// close this channel will stop all goroutines used in runner, including running workers.
shutdownChan chan bool
Expand Down Expand Up @@ -120,22 +121,19 @@ func (r *runner) outputOnStop() {
wg.Wait()
}

func (r *runner) spawnWorkers(spawnCount int, quit chan bool, spawnCompleteFunc func()) {
log.Println("Spawning", spawnCount, "clients immediately")

for i := 1; i <= spawnCount; i++ {
// addWorkers start the goroutines and add it to cancelFuncs
func (r *runner) addWorkers(gapCount int) {
for i := 0; i < gapCount; i++ {
select {
case <-quit:
// quit spawning goroutine
return
case <-r.shutdownChan:
return
default:
atomic.AddInt32(&r.numClients, 1)
go func() {
ctx, cancel := context.WithCancel(context.TODO())
r.cancelFuncs = append(r.cancelFuncs, cancel)
go func(ctx context.Context) {
for {
select {
case <-quit:
case <-ctx.Done():
return
case <-r.shutdownChan:
return
Expand All @@ -152,9 +150,44 @@ func (r *runner) spawnWorkers(spawnCount int, quit chan bool, spawnCompleteFunc
}
}
}
}()
}(ctx)
}
}
}

// reduceWorkers Stop the goroutines and remove it from the cancelFuncs
func (r *runner) reduceWorkers(gapCount int) {
if gapCount == 0 {
return
}
num := len(r.cancelFuncs) - gapCount
for _, cancelFunc := range r.cancelFuncs[num:] {
cancelFunc()
}

r.cancelFuncs = r.cancelFuncs[:num]

}

func (r *runner) spawnWorkers(spawnCount int, spawnCompleteFunc func()) {
// Avoid changing the number of clients simultaneously
r.mu.Lock()
defer r.mu.Unlock()

log.Println("The total number of clients required is ", spawnCount)

var gapCount int
if spawnCount > int(r.numClients) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should consider the weight of tasks.

Copy link
Contributor Author

@MyNextWeekend MyNextWeekend Sep 18, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

请见谅,怕表达不清晰,采用中文
我是这样理解的
1、spawnWorkers应该专注于活跃线程的增减

func (r *runner) spawnWorkers(spawnCount int, quit chan bool, spawnCompleteFunc func()) {

2、getTask应该专注于给活跃线程提供任务(按权重提供)
func (r *runner) getTask() *Task {

此处没有理解关于任务权重的想法,还请明确一下

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

在setTask的时候根据权重生成一个runTask []*Task
每个活跃的goroutine将一直getTask 从runTask中获取任务并执行任务
关于权重这块提交了代码,请查阅

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

嗯,你是对的,直接用 getTask 来管权重就好。

gapCount = spawnCount - int(r.numClients)
log.Printf("The current number of clients is %v, %v clients will be added\n", r.numClients, gapCount)
r.addWorkers(gapCount)
} else {
gapCount = int(r.numClients) - spawnCount
log.Printf("The current number of clients is %v, %v clients will be removed\n", r.numClients, gapCount)
r.reduceWorkers(gapCount)
}

r.numClients = int32(spawnCount)

if spawnCompleteFunc != nil {
spawnCompleteFunc()
Expand Down Expand Up @@ -204,20 +237,15 @@ func (r *runner) getTask() *Task {
func (r *runner) startSpawning(spawnCount int, spawnRate float64, spawnCompleteFunc func()) {
Events.Publish(EVENT_SPAWN, spawnCount, spawnRate)

r.stopChan = make(chan bool)
r.numClients = 0

go r.spawnWorkers(spawnCount, r.stopChan, spawnCompleteFunc)
go r.spawnWorkers(spawnCount, spawnCompleteFunc)
}

func (r *runner) stop() {
// publish the boomer stop event
// user's code can subscribe to this event and do thins like cleaning up
Events.Publish(EVENT_STOP)

// stop previous goroutines without blocking
// those goroutines will exit when r.safeRun returns
close(r.stopChan)
go r.spawnWorkers(0, nil)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

靠 spawn 0 来实现 stop,有点拗口。

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

如果把 spawnWorkers方法名修改为 setwWorkerNumber会不会好点

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

直接用 stop 就好了吧,那怕是在这里遍历所有 context

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是的,这里可以直接调用reduceWorkers 把当前活跃线程总数丢进去
我修改一下

}

type localRunner struct {
Expand Down Expand Up @@ -460,7 +488,6 @@ func (r *slaveRunner) onMessage(msgInterface message) {
switch msgType {
case "spawn":
r.state = stateSpawning
r.stop()
r.onSpawnMessage(genericMsg)
case "stop":
r.stop()
Expand Down