Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Protect allocator maps behind mutex, create getter funcs for them #1040

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 50 additions & 29 deletions cmd/otel-allocator/allocation/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,40 +57,39 @@ type collector struct {
// Users need to call SetTargets when they have new targets in their
// clusters and call Reshard to process the new targets and reshard.
type Allocator struct {
m sync.Mutex

// m protects targetsWaiting, collectors, and targetItems for concurrent use.
m sync.RWMutex
targetsWaiting map[string]TargetItem // temp buffer to keep targets that are waiting to be processed

collectors map[string]*collector // all current collectors

targetItems map[string]*TargetItem
collectors map[string]*collector // all current collectors
targetItems map[string]*TargetItem

log logr.Logger
}

// TargetItems returns a shallow copy of the targetItems map.
func (allocator *Allocator) TargetItems() map[string]*TargetItem {
return allocator.targetItems
allocator.m.RLock()
defer allocator.m.RUnlock()
targetItemsCopy := make(map[string]*TargetItem)
for k, v := range allocator.targetItems {
targetItemsCopy[k] = v
}
return targetItemsCopy
}

// findNextCollector finds the next collector with less number of targets.
func (allocator *Allocator) findNextCollector() *collector {
var col *collector
for _, v := range allocator.collectors {
// If the initial collector is empty, set the initial collector to the first element of map
if col == nil {
col = v
} else {
if v.NumTargets < col.NumTargets {
col = v
}
}

// Collectors returns a shallow copy of the collectors map.
func (allocator *Allocator) Collectors() map[string]*collector {
allocator.m.RLock()
defer allocator.m.RUnlock()
collectorsCopy := make(map[string]*collector)
for k, v := range allocator.collectors {
collectorsCopy[k] = v
}
return col
return collectorsCopy
}

// SetTargets accepts the a list of targets that will be used to make
// load balancing decisions. This method should be called when where are
// SetWaitingTargets accepts a list of targets that will be used to make
// load balancing decisions. This method should be called when there are
// new targets discovered or existing targets are shutdown.
func (allocator *Allocator) SetWaitingTargets(targets []TargetItem) {
// Dump old data
Expand All @@ -104,7 +103,7 @@ func (allocator *Allocator) SetWaitingTargets(targets []TargetItem) {
}

// SetCollectors sets the set of collectors with key=collectorName, value=Collector object.
// SetCollectors is called when Collectors are added or removed
// This method is called when Collectors are added or removed.
func (allocator *Allocator) SetCollectors(collectors []string) {
log := allocator.log.WithValues("component", "opentelemetry-targetallocator")

Expand All @@ -124,8 +123,9 @@ func (allocator *Allocator) SetCollectors(collectors []string) {
collectorsAllocatable.Set(float64(len(collectors)))
}

// Reallocate needs to be called to process the new target updates.
// Until Reallocate is called, old targets will be served.
// AllocateTargets removes outdated targets and adds new ones from
// waitingTargets. This method needs to be called to process the new target
// updates. Until it is called, old targets will be served.
func (allocator *Allocator) AllocateTargets() {
allocator.m.Lock()
timer := prometheus.NewTimer(timeToAssign.WithLabelValues("AllocateTargets"))
Expand All @@ -135,7 +135,7 @@ func (allocator *Allocator) AllocateTargets() {
allocator.processWaitingTargets()
}

// ReallocateCollectors reallocates the targets among the new collector instances
// ReallocateCollectors reallocates the targets among the new collector instances.
func (allocator *Allocator) ReallocateCollectors() {
allocator.m.Lock()
timer := prometheus.NewTimer(timeToAssign.WithLabelValues("ReallocateCollectors"))
Expand All @@ -145,7 +145,8 @@ func (allocator *Allocator) ReallocateCollectors() {
allocator.processWaitingTargets()
}

// removeOutdatedTargets removes targets that are no longer available.
// removeOutdatedTargets removes targets that are no longer available. This
// method is called after a lock has been acquired in ReallocateCollectors().
func (allocator *Allocator) removeOutdatedTargets() {
for k := range allocator.targetItems {
kristinapathak marked this conversation as resolved.
Show resolved Hide resolved
if _, ok := allocator.targetsWaiting[k]; !ok {
Expand All @@ -155,7 +156,8 @@ func (allocator *Allocator) removeOutdatedTargets() {
}
}

// processWaitingTargets processes the newly set targets.
// processWaitingTargets processes the newly set targets. This method is called
// after a lock has been acquired in AllocateTargets() or ReallocateCollectors().
func (allocator *Allocator) processWaitingTargets() {
for k, v := range allocator.targetsWaiting {
if _, ok := allocator.targetItems[k]; !ok {
kristinapathak marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -175,6 +177,25 @@ func (allocator *Allocator) processWaitingTargets() {
}
}

// findNextCollector finds the next collector with fewer number of targets.
// This method is called from within processWaitingTargets(), whose caller
// acquires the needed lock.
func (allocator *Allocator) findNextCollector() *collector {
var col *collector
for _, v := range allocator.collectors {
// If the initial collector is empty, set the initial collector to the first element of map
if col == nil {
col = v
} else {
if v.NumTargets < col.NumTargets {
col = v
}
}

}
return col
}

func NewAllocator(log logr.Logger) *Allocator {
return &Allocator{
log: log,
Expand Down
41 changes: 25 additions & 16 deletions cmd/otel-allocator/allocation/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ func TestSetCollectors(t *testing.T) {
cols := []string{"col-1", "col-2", "col-3"}
s.SetCollectors(cols)

excpectedColLen := len(cols)
assert.Len(t, s.collectors, excpectedColLen)
expectedColLen := len(cols)
collectors := s.Collectors()
assert.Len(t, collectors, expectedColLen)

for _, i := range cols {
assert.NotNil(t, s.collectors[i])
assert.NotNil(t, collectors[i])
}
}

Expand Down Expand Up @@ -73,12 +74,13 @@ func TestAddingAndRemovingTargets(t *testing.T) {
s.AllocateTargets()

// verify
targetItems := s.TargetItems()
expectedNewTargetLen := len(tar)
assert.Len(t, s.TargetItems(), expectedNewTargetLen)
assert.Len(t, targetItems, expectedNewTargetLen)

// verify results map
for _, i := range tar {
_, ok := s.TargetItems()["sample-name"+i+labels.Fingerprint().String()]
_, ok := targetItems["sample-name"+i+labels.Fingerprint().String()]
assert.True(t, ok)
}
}
Expand Down Expand Up @@ -107,12 +109,13 @@ func TestAllocationCollision(t *testing.T) {
s.AllocateTargets()

// verify
targetItems := s.TargetItems()
expectedTargetLen := len(targetList)
assert.Len(t, s.TargetItems(), expectedTargetLen)
assert.Len(t, targetItems, expectedTargetLen)

// verify results map
for _, i := range targetList {
_, ok := s.TargetItems()[i.hash()]
_, ok := targetItems[i.hash()]
assert.True(t, ok)
}
}
Expand All @@ -139,11 +142,13 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) {
// Divisor needed to get 15%
divisor := 6.7

count := len(s.TargetItems()) / len(s.collectors)
percent := float64(len(s.TargetItems())) / divisor
targetItemLen := len(s.TargetItems())
collectors := s.Collectors()
count := targetItemLen / len(collectors)
percent := float64(targetItemLen) / divisor

// test
for _, i := range s.collectors {
for _, i := range collectors {
assert.InDelta(t, i.NumTargets, count, percent)
}

Expand All @@ -158,11 +163,13 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) {
s.SetWaitingTargets(newTargetList)
s.AllocateTargets()

count = len(s.TargetItems()) / len(s.collectors)
percent = float64(len(s.TargetItems())) / divisor
targetItemLen = len(s.TargetItems())
collectors = s.Collectors()
count = targetItemLen / len(collectors)
percent = float64(targetItemLen) / divisor

// test
for _, i := range s.collectors {
for _, i := range collectors {
assert.InDelta(t, i.NumTargets, count, math.Round(percent))
}
// adding targets at 'random'
Expand All @@ -176,11 +183,13 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) {
s.SetWaitingTargets(newTargetList)
s.AllocateTargets()

count = len(s.TargetItems()) / len(s.collectors)
percent = float64(len(s.TargetItems())) / divisor
targetItemLen = len(s.TargetItems())
collectors = s.Collectors()
count = targetItemLen / len(collectors)
percent = float64(targetItemLen) / divisor

// test
for _, i := range s.collectors {
for _, i := range collectors {
assert.InDelta(t, i.NumTargets, count, math.Round(percent))
}
}
2 changes: 1 addition & 1 deletion cmd/otel-allocator/allocation/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[strin
var tgs []targetGroupJSON
group := make(map[string]string)
labelSet := make(map[string]model.LabelSet)
for _, col := range allocator.collectors {
for _, col := range allocator.Collectors() {
if col.Name == collector {
for _, targetItemArr := range cMap {
for _, targetItem := range targetItemArr {
Expand Down