When multiple goroutines call consume, mb.newShard(md) is called multiple times #9739

Closed
luyuanx2 opened this issue Mar 11, 2024 · 3 comments · Fixed by #9817
Labels
bug Something isn't working

Comments

@luyuanx2

Describe the bug

The problematic code is shown below:
[image: screenshot of the problematic code]

Steps to reproduce

Here is a minimal program that reproduces the bug:

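```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// multiShardBatcher caches shards in a sync.Map and guards creation with a mutex.
type multiShardBatcher struct {
	batchers sync.Map

	lock sync.Mutex
	size int
}

type shard struct {
}

// newShard stands in for the real constructor. The sleep widens the window in
// which other goroutines miss the cache and queue up on the lock.
func (mb *multiShardBatcher) newShard() *shard {
	b := &shard{}
	fmt.Println("new shard")
	time.Sleep(1 * time.Second)
	return b
}

// consume returns the shard for key, creating it on first use.
func (mb *multiShardBatcher) consume(key string) *shard {
	b, ok := mb.batchers.Load(key)
	if !ok {
		mb.lock.Lock()
		// Every goroutine that missed the Load above reaches this line in turn.
		// mb.newShard() is evaluated before LoadOrStore runs, so a new shard is
		// built even when another goroutine has already stored one.
		b, _ = mb.batchers.LoadOrStore(key, mb.newShard())
		mb.lock.Unlock()
	}
	return b.(*shard)
}

func main() {
	mb := &multiShardBatcher{}

	var group sync.WaitGroup
	for i := 0; i < 10; i++ {
		group.Add(1)
		go func() {
			defer group.Done()
			mb.consume("key")
		}()
	}
	group.Wait()
	// "new shard" is typically printed once per goroutine that missed the
	// initial Load, not once in total.
}
```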
[image attached in the original issue]


luyuanx2 added the bug (Something isn't working) label on Mar 11, 2024
@mx-psi
Member

mx-psi commented Mar 11, 2024

cc @dmitryax

@andrzej-stencel
Member

andrzej-stencel commented Mar 21, 2024

This report looks valid to me. The mb.newShard(md) method not only creates a new shard but also starts a new goroutine for it, so we should not call it unless we are going to keep the shard (we discard it when another goroutine has already created and stored the shard for the same key). Otherwise we end up with a goroutine leak.

We should move mb.lock.Lock() to before the initial mb.batchers.Load(aset).

This code was introduced in #7714. @bogdandrutu can you take a look?

cc @jmacd as the original contributor of the feature (#7578).

andrzej-stencel added a commit to andrzej-stencel/opentelemetry-collector that referenced this issue Mar 21, 2024
@andrzej-stencel
Member

Proposed a fix for this in #9814.

3 participants