You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
// retrieveWorker returns an available worker to run the tasks.func (p*Pool) retrieveWorker(ctx context.Context) (w*goWorker) {
spawnWorker:=func() {
w=p.workerCache.Get().(*goWorker)
w.run()
}
p.lock.Lock()
w=p.workers.detach()
ifw!=nil { // first try to fetch the worker from the queuep.lock.Unlock()
} elseifcapacity:=p.Cap(); capacity==-1||capacity>p.Running() {
// if the worker queue is empty and we don't run out of the pool capacity,// then just spawn a new worker goroutine.p.lock.Unlock()
spawnWorker()
} else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.ifp.options.Nonblocking {
p.lock.Unlock()
return
}
varcchanstruct{}
ifctx!=nil {
c=make(chanstruct{})
}
retry:
ifp.options.MaxBlockingTasks!=0&&p.Waiting() >=p.options.MaxBlockingTasks {
p.lock.Unlock()
return
}
p.addWaiting(1)
ifctx==nil {
p.cond.Wait() // block and wait for an available worker
} else {
gofunc() {
p.cond.Wait() // block and wait for an available workerselect {
casec<-struct{}{}:
}
}()
select {
case<-ctx.Done():
p.lock.Unlock()
returncase<-c:
}
}
p.addWaiting(-1)
ifp.IsClosed() {
p.lock.Unlock()
return
}
varnwintifnw=p.Running(); nw==0 { // awakened by the scavengerp.lock.Unlock()
spawnWorker()
return
}
ifw=p.workers.detach(); w==nil {
ifnw<p.Cap() {
p.lock.Unlock()
spawnWorker()
return
}
goto retry
}
p.lock.Unlock()
}
return
}
Is your feature request related to a problem? Please describe.
当前 指定 pool size 且没有指定 Nonblocking 时, 当 pool 满了时 submit 是会阻寨的, 但有时我不能让任务一直等着, 想要加一个取消机制。
我想只要对 retrieveWorker 改造一下, 就可以实现了
The text was updated successfully, but these errors were encountered: