Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: eliminate race condition on Monitor.globalTags #23467

Merged
merged 1 commit into from
Jun 16, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 41 additions & 8 deletions monitor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,39 @@ const (
MonitorRetentionPolicyReplicaN = 1
)

// tags provides thread-safe tag handling
type tags struct {
mu sync.RWMutex
tags map[string]string
}

// NewTags creates a new tags struct to use
func newTags() *tags {
return &tags{
tags: make(map[string]string),
}
}

// Add adds a new tag to the tag collection
func (t *tags) Add(key string, value interface{}) {
t.mu.Lock()
defer t.mu.Unlock()

t.tags[key] = fmt.Sprintf("%v", value)
}

// Tags safely returns a copy of the current tag mapping
func (t *tags) Tags() map[string]string {
t.mu.RLock()
defer t.mu.RUnlock()

r := make(map[string]string)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a note of interest, 1.18 gave us a nice map copy function.

https://pkg.go.dev/golang.org/x/exp/maps#Copy

for k, v := range t.tags {
r[k] = v
}
return r
}

// Monitor represents an instance of the monitor system.
type Monitor struct {
// Build information for diagnostics.
Expand All @@ -42,8 +75,9 @@ type Monitor struct {

wg sync.WaitGroup

globalTags *tags

mu sync.RWMutex
globalTags map[string]string
diagRegistrations map[string]diagnostics.Client
reporter Reporter
done chan struct{}
Expand Down Expand Up @@ -73,7 +107,7 @@ type PointsWriter interface {
// New returns a new instance of the monitor system.
func New(r Reporter, c Config) *Monitor {
return &Monitor{
globalTags: make(map[string]string),
globalTags: newTags(),
diagRegistrations: make(map[string]diagnostics.Client),
reporter: r,
storeEnabled: c.StoreEnabled,
Expand Down Expand Up @@ -138,9 +172,10 @@ func (m *Monitor) WritePoints(p models.Points) error {
return nil
}

if len(m.globalTags) > 0 {
gt := m.globalTags.Tags()
if len(gt) > 0 {
for _, pp := range p {
pp.SetTags(pp.Tags().Merge(m.globalTags))
pp.SetTags(pp.Tags().Merge(gt))
}
}

Expand Down Expand Up @@ -185,9 +220,7 @@ func (m *Monitor) Close() error {
// SetGlobalTag can be used to set tags that will appear on all points
// written by the Monitor.
func (m *Monitor) SetGlobalTag(key string, value interface{}) {
m.mu.Lock()
m.globalTags[key] = fmt.Sprintf("%v", value)
m.mu.Unlock()
m.globalTags.Add(key, value)
}

// RemoteWriterConfig represents the configuration of a remote writer.
Expand Down Expand Up @@ -440,7 +473,7 @@ func (m *Monitor) storeStatistics() {
m.createInternalStorage()
}()

stats, err := m.Statistics(m.globalTags)
stats, err := m.Statistics(m.globalTags.Tags())
if err != nil {
m.Logger.Info("Failed to retrieve registered statistics", zap.Error(err))
return
Expand Down