Can add SubmitWithContext(ctx context.Context, task func(ctx context.Context)) error #237

Closed
mei-rune opened this issue Jul 10, 2022 · 3 comments
Labels
proposal Proposal for this repo waiting for response waiting for the response from commenter

Comments

@mei-rune

Is your feature request related to a problem? Please describe.
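Currently, when a pool size is specified and Nonblocking is not set, Submit blocks once the pool is full. Sometimes I cannot let a task wait indefinitely, so I would like to add a cancellation mechanism.

I think this can be implemented simply by modifying retrieveWorker.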

func (p *Pool) SubmitWithContext(ctx context.Context, task func(ctx context.Context)) error {
	if p.IsClosed() {
		return ErrPoolClosed
	}
	var w *goWorker
	// retrieveWorker would need to accept ctx and return nil on cancellation.
	if w = p.retrieveWorker(ctx); w == nil {
		return ErrPoolOverload
	}
	// Assuming w.task is a chan func(), wrap task in a closure that captures ctx.
	w.task <- func() {
		task(ctx)
	}
	return nil
}
@mei-rune mei-rune added the proposal Proposal for this repo label Jul 10, 2022
@panjf2000
Owner

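When the pool is full, the call already blocks at this step:

if w = p.retrieveWorker(ctx); w == nil {
	return ErrPoolOverload
}

so it cannot be solved the way you describe. Instead, you can configure the pool as non-blocking, so that a full pool returns an error immediately.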

@panjf2000 panjf2000 added the waiting for response waiting for the response from commenter label Jul 19, 2022
@mei-rune
Author

mei-rune commented Jul 20, 2022

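Right, so modify retrieveWorker().

My example below starts a goroutine, which is not correct, but that part can be improved away. It is only here as an illustration.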

// retrieveWorker returns an available worker to run the tasks.
// With a non-nil ctx, it returns nil if ctx is done while waiting.
func (p *Pool) retrieveWorker(ctx context.Context) (w *goWorker) {
	spawnWorker := func() {
		w = p.workerCache.Get().(*goWorker)
		w.run()
	}

	p.lock.Lock()

	w = p.workers.detach()
	if w != nil { // first try to fetch the worker from the queue
		p.lock.Unlock()
	} else if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
		// if the worker queue is empty and we don't run out of the pool capacity,
		// then just spawn a new worker goroutine.
		p.lock.Unlock()
		spawnWorker()
	} else { // otherwise, we'll have to keep them blocked and wait for at least one worker to be put back into pool.
		if p.options.Nonblocking {
			p.lock.Unlock()
			return
		}
		var c chan struct{}
		if ctx != nil {
			c = make(chan struct{})
		}
	retry:
		if p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks {
			p.lock.Unlock()
			return
		}
		p.addWaiting(1)
		if ctx == nil {
			p.cond.Wait() // block and wait for an available worker
		} else {
			// Illustration only: spawning a goroutine here is not correct;
			// it merely lets the select below observe ctx.Done().
			go func() {
				p.cond.Wait() // block and wait for an available worker
				c <- struct{}{}
			}()
			select {
			case <-ctx.Done():
				p.lock.Unlock()
				return
			case <-c:
			}
		}

		p.addWaiting(-1)

		if p.IsClosed() {
			p.lock.Unlock()
			return
		}

		var nw int
		if nw = p.Running(); nw == 0 { // awakened by the scavenger
			p.lock.Unlock()
			spawnWorker()
			return
		}
		if w = p.workers.detach(); w == nil {
			if nw < p.Cap() {
				p.lock.Unlock()
				spawnWorker()
				return
			}
			goto retry
		}
		p.lock.Unlock()
	}
	return
}

@mei-rune
Author

I thought it over again and found that there is still a problem.

sync.Cond supports neither WaitContext nor WaitTimeout, so implementing one yourself is difficult.

golang/go#16620
golang/go#9578

Introducing a chan would defeat the original intent of ants.

So this issue can be closed.
