Skip to content

Commit

Permalink
protect allocator maps behind mutex, create getter funcs for them
Browse files Browse the repository at this point in the history
  • Loading branch information
Kristina Pathak committed Aug 17, 2022
1 parent 388a7c9 commit 30f2002
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 23 deletions.
43 changes: 33 additions & 10 deletions cmd/otel-allocator/allocation/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,17 +53,40 @@ type collector struct {
// Users need to call SetTargets when they have new targets in their
// clusters and call Reshard to process the new targets and reshard.
type Allocator struct {
m sync.Mutex
m sync.RWMutex

targetsWaiting map[string]TargetItem // temp buffer to keep targets that are waiting to be processed

collectors map[string]*collector // all current collectors

TargetItems map[string]*TargetItem
targetItems map[string]*TargetItem

log logr.Logger
}

// TargetItems returns a shallow copy of the targetItems map.
func (allocator *Allocator) TargetItems() map[string]*TargetItem {
allocator.m.RLock()
defer allocator.m.RUnlock()
targetItemsCopy := make(map[string]*TargetItem)
for k, v := range allocator.targetItems {
targetItemsCopy[k] = v
}
return targetItemsCopy
}

// Collectors returns a shallow copy of the collectors map.
func (allocator *Allocator) Collectors() map[string]*collector {
allocator.m.RLock()
defer allocator.m.RUnlock()
collectorsCopy := make(map[string]*collector)
for k, v := range allocator.collectors {
collectorsCopy[k] = v
}
return collectorsCopy

}

// findNextCollector finds the next collector with less number of targets.
func (allocator *Allocator) findNextCollector() *collector {
var col *collector
Expand Down Expand Up @@ -133,26 +156,26 @@ func (allocator *Allocator) ReallocateCollectors() {
timer := prometheus.NewTimer(timeToAssign.WithLabelValues("ReallocateCollectors"))
defer timer.ObserveDuration()
defer allocator.m.Unlock()
allocator.TargetItems = make(map[string]*TargetItem)
allocator.targetItems = make(map[string]*TargetItem)
allocator.processWaitingTargets()
}

// removeOutdatedTargets removes targets that are no longer available.
func (allocator *Allocator) removeOutdatedTargets() {
for k := range allocator.TargetItems {
for k := range allocator.targetItems {
if _, ok := allocator.targetsWaiting[k]; !ok {
allocator.collectors[allocator.TargetItems[k].Collector.Name].NumTargets--
delete(allocator.TargetItems, k)
allocator.collectors[allocator.targetItems[k].Collector.Name].NumTargets--
delete(allocator.targetItems, k)
}
}
}

// processWaitingTargets processes the newly set targets.
func (allocator *Allocator) processWaitingTargets() {
for k, v := range allocator.targetsWaiting {
if _, ok := allocator.TargetItems[k]; !ok {
if _, ok := allocator.targetItems[k]; !ok {
col := allocator.findNextCollector()
allocator.TargetItems[k] = &v
allocator.targetItems[k] = &v
targetItem := TargetItem{
JobName: v.JobName,
Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(v.JobName))},
Expand All @@ -162,7 +185,7 @@ func (allocator *Allocator) processWaitingTargets() {
}
col.NumTargets++
targetsPerCollector.WithLabelValues(col.Name).Set(float64(col.NumTargets))
allocator.TargetItems[v.JobName+v.TargetURL] = &targetItem
allocator.targetItems[v.JobName+v.TargetURL] = &targetItem
}
}
}
Expand All @@ -172,6 +195,6 @@ func NewAllocator(log logr.Logger) *Allocator {
log: log,
targetsWaiting: make(map[string]TargetItem),
collectors: make(map[string]*collector),
TargetItems: make(map[string]*TargetItem),
targetItems: make(map[string]*TargetItem),
}
}
18 changes: 9 additions & 9 deletions cmd/otel-allocator/allocation/allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestAddingAndRemovingTargets(t *testing.T) {

// verify
expectedTargetLen := len(initTargets)
assert.Len(t, s.TargetItems, expectedTargetLen)
assert.Len(t, s.TargetItems(), expectedTargetLen)

// prepare second round of targets
tar := []string{"prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004"}
Expand All @@ -73,11 +73,11 @@ func TestAddingAndRemovingTargets(t *testing.T) {

// verify
expectedNewTargetLen := len(tar)
assert.Len(t, s.TargetItems, expectedNewTargetLen)
assert.Len(t, s.TargetItems(), expectedNewTargetLen)

// verify results map
for _, i := range tar {
_, ok := s.TargetItems["sample-name"+i]
_, ok := s.targetItems["sample-name"+i]
assert.True(t, ok)
}
}
Expand All @@ -104,8 +104,8 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) {
// Divisor needed to get 15%
divisor := 6.7

count := len(s.TargetItems) / len(s.collectors)
percent := float64(len(s.TargetItems)) / divisor
count := len(s.targetItems) / len(s.collectors)
percent := float64(len(s.targetItems)) / divisor

// test
for _, i := range s.collectors {
Expand All @@ -123,8 +123,8 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) {
s.SetWaitingTargets(newTargetList)
s.AllocateTargets()

count = len(s.TargetItems) / len(s.collectors)
percent = float64(len(s.TargetItems)) / divisor
count = len(s.targetItems) / len(s.collectors)
percent = float64(len(s.targetItems)) / divisor

// test
for _, i := range s.collectors {
Expand All @@ -141,8 +141,8 @@ func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) {
s.SetWaitingTargets(newTargetList)
s.AllocateTargets()

count = len(s.TargetItems) / len(s.collectors)
percent = float64(len(s.TargetItems)) / divisor
count = len(s.targetItems) / len(s.collectors)
percent = float64(len(s.targetItems)) / divisor

// test
for _, i := range s.collectors {
Expand Down
6 changes: 3 additions & 3 deletions cmd/otel-allocator/allocation/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type targetGroupJSON struct {

func GetAllTargetsByJob(job string, cMap map[string][]TargetItem, allocator *Allocator) map[string]collectorJSON {
displayData := make(map[string]collectorJSON)
for _, j := range allocator.TargetItems {
for _, j := range allocator.TargetItems() {
if j.JobName == job {
var targetList []TargetItem
targetList = append(targetList, cMap[j.Collector.Name+j.JobName]...)
Expand All @@ -48,11 +48,11 @@ func GetAllTargetsByCollectorAndJob(collector string, job string, cMap map[strin
var tgs []targetGroupJSON
group := make(map[string]string)
labelSet := make(map[string]model.LabelSet)
for _, col := range allocator.collectors {
for _, col := range allocator.Collectors() {
if col.Name == collector {
for _, targetItemArr := range cMap {
for _, targetItem := range targetItemArr {
if targetItem.Collector.Name == collector && targetItem.JobName == job {
if targetItem.Collector.Name == collector && targetItem.JobName == job {
group[targetItem.Label.String()] = targetItem.TargetURL
labelSet[targetItem.TargetURL] = targetItem.Label
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/otel-allocator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (s *server) TargetsHandler(w http.ResponseWriter, r *http.Request) {
q := r.URL.Query()["collector_id"]

var compareMap = make(map[string][]allocation.TargetItem) // CollectorName+jobName -> TargetItem
for _, v := range s.allocator.TargetItems {
for _, v := range s.allocator.TargetItems() {
compareMap[v.Collector.Name+v.JobName] = append(compareMap[v.Collector.Name+v.JobName], *v)
}
params := mux.Vars(r)
Expand Down

0 comments on commit 30f2002

Please sign in to comment.