Skip to content

Commit

Permalink
Protect allocator maps behind mutex, create getter funcs for them (op…
Browse files Browse the repository at this point in the history
…en-telemetry#1040)

* protect allocator maps behind mutex, create getter funcs for them

* remove empty line and adjust tests

* update comments, formatting
  • Loading branch information
kristinapathak authored Aug 18, 2022
1 parent 8cd3a1b commit a9ef0a2
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 46 deletions.
79 changes: 50 additions & 29 deletions cmd/otel-allocator/allocation/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,40 +57,39 @@ type collector struct {
// Users need to call SetTargets when they have new targets in their
// clusters and call Reshard to process the new targets and reshard.
type Allocator struct {
m sync.Mutex

// m protects targetsWaiting, collectors, and targetItems for concurrent use.
m sync.RWMutex
targetsWaiting map[string]TargetItem // temp buffer to keep targets that are waiting to be processed

collectors map[string]*collector // all current collectors

targetItems map[string]*TargetItem
collectors map[string]*collector // all current collectors
targetItems map[string]*TargetItem

log logr.Logger
}

// TargetItems returns a shallow copy of the targetItems map.
func (allocator *Allocator) TargetItems() map[string]*TargetItem {
return allocator.targetItems
allocator.m.RLock()
defer allocator.m.RUnlock()
targetItemsCopy := make(map[string]*TargetItem)
for k, v := range allocator.targetItems {
targetItemsCopy[k] = v
}
return targetItemsCopy
}

// findNextCollector finds the next collector with less number of targets.
func (allocator *Allocator) findNextCollector() *collector {
var col *collector
for _, v := range allocator.collectors {
// If the initial collector is empty, set the initial collector to the first element of map
if col == nil {
col = v
} else {
if v.NumTargets < col.NumTargets {
col = v
}
}

// Collectors returns a shallow copy of the collectors map.
func (allocator *Allocator) Collectors() map[string]*collector {
allocator.m.RLock()
defer allocator.m.RUnlock()
collectorsCopy := make(map[string]*collector)
for k, v := range allocator.collectors {
collectorsCopy[k] = v
}
return col
return collectorsCopy
}

// SetTargets accepts the a list of targets that will be used to make
// load balancing decisions. This method should be called when where are
// SetWaitingTargets accepts a list of targets that will be used to make
// load balancing decisions. This method should be called when there are
// new targets discovered or existing targets are shutdown.
func (allocator *Allocator) SetWaitingTargets(targets []TargetItem) {
// Dump old data
Expand All @@ -104,7 +103,7 @@ func (allocator *Allocator) SetWaitingTargets(targets []TargetItem) {
}

// SetCollectors sets the set of collectors with key=collectorName, value=Collector object.
// SetCollectors is called when Collectors are added or removed
// This method is called when Collectors are added or removed.
func (allocator *Allocator) SetCollectors(collectors []string) {
log := allocator.log.WithValues("component", "opentelemetry-targetallocator")

Expand All @@ -124,8 +123,9 @@ func (allocator *Allocator) SetCollectors(collectors []string) {
collectorsAllocatable.Set(float64(len(collectors)))
}

// Reallocate needs to be called to process the new target updates.
// Until Reallocate is called, old targets will be served.
// AllocateTargets removes outdated targets and adds new ones from
// waitingTargets. This method needs to be called to process the new target
// updates. Until it is called, old targets will be served.
func (allocator *Allocator) AllocateTargets() {
allocator.m.Lock()
timer := prometheus.NewTimer(timeToAssign.WithLabelValues("AllocateTargets"))
Expand All @@ -135,7 +135,7 @@ func (allocator *Allocator) AllocateTargets() {
allocator.processWaitingTargets()
}

// ReallocateCollectors reallocates the targets among the new collector instances
// ReallocateCollectors reallocates the targets among the new collector instances.
func (allocator *Allocator) ReallocateCollectors() {
allocator.m.Lock()
timer := prometheus.NewTimer(timeToAssign.WithLabelValues("ReallocateCollectors"))
Expand All @@ -145,7 +145,8 @@ func (allocator *Allocator) ReallocateCollectors() {
allocator.processWaitingTargets()
}

// removeOutdatedTargets removes targets that are no longer available.
// removeOutdatedTargets removes targets that are no longer available. This
// method is called after a lock has been acquired in ReallocateCollectors().
func (allocator *Allocator) removeOutdatedTargets() {
for k := range allocator.targetItems {
if _, ok := allocator.targetsWaiting[k]; !ok {
Expand All @@ -155,7 +156,8 @@ func (allocator *Allocator) removeOutdatedTargets() {
}
}

// processWaitingTargets processes the newly set targets.
// processWaitingTargets processes the newly set targets. This method is called
// after a lock has been acquired in AllocateTargets() or ReallocateCollectors().
func (allocator *Allocator) processWaitingTargets() {
for k, v := range allocator.targetsWaiting {
if _, ok := allocator.targetItems[k]; !ok {
Expand All @@ -175,6 +177,25 @@ func (allocator *Allocator) processWaitingTargets() {
}
}

// findNextCollector finds the next collector with fewer number of targets.
// This method is called from within processWaitingTargets(), whose caller
// acquires the needed lock.
func (allocator *Allocator) findNextCollector() *collector {
var col *collector
for _, v := range allocator.collectors {
// If the initial collector is empty, set the initial collector to the first element of map
if col == nil {
col = v
} else {
if v.NumTargets < col.NumTargets {
col = v
}
}

}
return col
}

func NewAllocator(log logr.Logger) *Allocator {
return &Allocator{
log: log,
Expand Down
41 changes: 25 additions & 16 deletions cmd/otel-allocator/allocation/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,12 @@ func TestSetCollectors(t *testing.T) {
cols := []string{"col-1", "col-2", "col-3"}
s.SetCollectors(cols)

excpectedColLen := len(cols)
assert.Len(t, s.collectors, excpectedColLen)
expectedColLen := len(cols)
collectors := s.Collectors()
assert.Len(t, collectors, expectedColLen)

for _, i := range cols {
assert.NotNil(t, s.collectors[i])
assert.NotNil(t, collectors[i])
}
}

Expand Down Expand Up @@ -73,12 +74,13 @@ func TestAddingAndRemovingTargets(t *testing.T) {
s.AllocateTargets()

// verify
targetItems := s.TargetItems()
expectedNewTargetLen := len(tar)
assert.Len(t, s.TargetItems(), expectedNewTargetLen)
assert.Len(t, targetItems, expectedNewTargetLen)

// verify results map
for _, i := range tar {
_, ok := s.TargetItems()["sample-name"+i+labels.Fingerprint().String()]
_, ok := targetItems["sample-name"+i+labels.Fingerprint().String()]
assert.True(t, ok)
}
}
Expand Down Expand Up @@ -107,12 +109,13 @@ func TestAllocationCollision(t *testing.T) {
s.AllocateTargets()

// verify
targetItems := s.TargetItems()
expectedTargetLen := len(targetList)
assert.Len(t, s.TargetItems(), expectedTargetLen)
assert.Len(t, targetItems, expectedTargetLen)

// verify results map
for _, i := range targetList {
_, ok := s.TargetItems()[i.hash()]
_, ok := targetItems[i.hash()]
assert.True(t, ok)
}
}
Expand All @@ -139,11 +142,13 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) {
// Divisor needed to get 15%
divisor := 6.7

count := len(s.TargetItems()) / len(s.collectors)
percent := float64(len(s.TargetItems())) / divisor
targetItemLen := len(s.TargetItems())
collectors := s.Collectors()
count := targetItemLen / len(collectors)
percent := float64(targetItemLen) / divisor

// test
for _, i := range s.collectors {
for _, i := range collectors {
assert.InDelta(t, i.NumTargets, count, percent)
}

Expand All @@ -158,11 +163,13 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) {
s.SetWaitingTargets(newTargetList)
s.AllocateTargets()

count = len(s.TargetItems()) / len(s.collectors)
percent = float64(len(s.TargetItems())) / divisor
targetItemLen = len(s.TargetItems())
collectors = s.Collectors()
count = targetItemLen / len(collectors)
percent = float64(targetItemLen) / divisor

// test
for _, i := range s.collectors {
for _, i := range collectors {
assert.InDelta(t, i.NumTargets, count, math.Round(percent))
}
// adding targets at 'random'
Expand All @@ -176,11 +183,13 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) {
s.SetWaitingTargets(newTargetList)
s.AllocateTargets()

count = len(s.TargetItems()) / len(s.collectors)
percent = float64(len(s.TargetItems())) / divisor
targetItemLen = len(s.TargetItems())
collectors = s.Collectors()
count = targetItemLen / len(collectors)
percent = float64(targetItemLen) / divisor

// test
for _, i := range s.collectors {
for _, i := range collectors {
assert.InDelta(t, i.NumTargets, count, math.Round(percent))
}
}
2 changes: 1 addition & 1 deletion cmd/otel-allocator/allocation/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[strin
var tgs []targetGroupJSON
group := make(map[string]string)
labelSet := make(map[string]model.LabelSet)
for _, col := range allocator.collectors {
for _, col := range allocator.Collectors() {
if col.Name == collector {
for _, targetItemArr := range cMap {
for _, targetItem := range targetItemArr {
Expand Down

0 comments on commit a9ef0a2

Please sign in to comment.