-
Notifications
You must be signed in to change notification settings - Fork 244
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Intelligent addition or reduction of workers #193
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
package boomer | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"log" | ||
"math/rand" | ||
|
@@ -41,9 +42,9 @@ type runner struct { | |
numClients int32 | ||
spawnRate float64 | ||
|
||
// all running workers(goroutines) will select on this channel. | ||
// close this channel will stop all running workers. | ||
stopChan chan bool | ||
// Cancellation method for all running workers(goroutines) | ||
cancelFuncs []context.CancelFunc | ||
mu sync.Mutex | ||
|
||
// close this channel will stop all goroutines used in runner, including running workers. | ||
shutdownChan chan bool | ||
|
@@ -120,22 +121,19 @@ func (r *runner) outputOnStop() { | |
wg.Wait() | ||
} | ||
|
||
func (r *runner) spawnWorkers(spawnCount int, quit chan bool, spawnCompleteFunc func()) { | ||
log.Println("Spawning", spawnCount, "clients immediately") | ||
|
||
for i := 1; i <= spawnCount; i++ { | ||
// addWorkers start the goroutines and add it to cancelFuncs | ||
func (r *runner) addWorkers(gapCount int) { | ||
for i := 0; i < gapCount; i++ { | ||
select { | ||
case <-quit: | ||
// quit spawning goroutine | ||
return | ||
case <-r.shutdownChan: | ||
return | ||
default: | ||
atomic.AddInt32(&r.numClients, 1) | ||
go func() { | ||
ctx, cancel := context.WithCancel(context.TODO()) | ||
r.cancelFuncs = append(r.cancelFuncs, cancel) | ||
go func(ctx context.Context) { | ||
for { | ||
select { | ||
case <-quit: | ||
case <-ctx.Done(): | ||
return | ||
case <-r.shutdownChan: | ||
return | ||
|
@@ -152,9 +150,44 @@ func (r *runner) spawnWorkers(spawnCount int, quit chan bool, spawnCompleteFunc | |
} | ||
} | ||
} | ||
}() | ||
}(ctx) | ||
} | ||
} | ||
} | ||
|
||
// reduceWorkers Stop the goroutines and remove it from the cancelFuncs | ||
func (r *runner) reduceWorkers(gapCount int) { | ||
if gapCount == 0 { | ||
return | ||
} | ||
num := len(r.cancelFuncs) - gapCount | ||
for _, cancelFunc := range r.cancelFuncs[num:] { | ||
cancelFunc() | ||
} | ||
|
||
r.cancelFuncs = r.cancelFuncs[:num] | ||
|
||
} | ||
|
||
func (r *runner) spawnWorkers(spawnCount int, spawnCompleteFunc func()) { | ||
// Avoid changing the number of clients simultaneously | ||
r.mu.Lock() | ||
defer r.mu.Unlock() | ||
|
||
log.Println("The total number of clients required is ", spawnCount) | ||
|
||
var gapCount int | ||
if spawnCount > int(r.numClients) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should consider the weight of tasks. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 在setTask的时候根据权重生成一个runTask []*Task There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 嗯,你是对的,直接用 getTask 来管权重就好。 |
||
gapCount = spawnCount - int(r.numClients) | ||
log.Printf("The current number of clients is %v, %v clients will be added\n", r.numClients, gapCount) | ||
r.addWorkers(gapCount) | ||
} else { | ||
gapCount = int(r.numClients) - spawnCount | ||
log.Printf("The current number of clients is %v, %v clients will be removed\n", r.numClients, gapCount) | ||
r.reduceWorkers(gapCount) | ||
} | ||
|
||
r.numClients = int32(spawnCount) | ||
|
||
if spawnCompleteFunc != nil { | ||
spawnCompleteFunc() | ||
|
@@ -204,20 +237,15 @@ func (r *runner) getTask() *Task { | |
func (r *runner) startSpawning(spawnCount int, spawnRate float64, spawnCompleteFunc func()) { | ||
Events.Publish(EVENT_SPAWN, spawnCount, spawnRate) | ||
|
||
r.stopChan = make(chan bool) | ||
r.numClients = 0 | ||
|
||
go r.spawnWorkers(spawnCount, r.stopChan, spawnCompleteFunc) | ||
go r.spawnWorkers(spawnCount, spawnCompleteFunc) | ||
} | ||
|
||
func (r *runner) stop() { | ||
// publish the boomer stop event | ||
// user's code can subscribe to this event and do thins like cleaning up | ||
Events.Publish(EVENT_STOP) | ||
|
||
// stop previous goroutines without blocking | ||
// those goroutines will exit when r.safeRun returns | ||
close(r.stopChan) | ||
go r.spawnWorkers(0, nil) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 靠 spawn 0 来实现 stop,有点拗口。 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 如果把 spawnWorkers方法名修改为 setwWorkerNumber会不会好点 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 直接用 stop 就好了吧,那怕是在这里遍历所有 context There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 是的,这里可以直接调用reduceWorkers 把当前活跃线程总数丢进去 |
||
} | ||
|
||
type localRunner struct { | ||
|
@@ -460,7 +488,6 @@ func (r *slaveRunner) onMessage(msgInterface message) { | |
switch msgType { | ||
case "spawn": | ||
r.state = stateSpawning | ||
r.stop() | ||
r.onSpawnMessage(genericMsg) | ||
case "stop": | ||
r.stop() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
避免用 mutex,维护的心智负担有点大。有状态机来保证执行顺序了。
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这里看起来可以直接删除锁机制
理由:增减线程命令来自于监听chan得到,本身是非并发的。整个操作不引入协程的话,就可以直接删除锁机制了
但是这样是否合理,大佬可以一起帮忙看一下,我现在提交
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
我只是不想在这里用锁,具体的实现有没有问题,要看测试用例。