Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix unnecessary and incorrect reallocation #1041

Merged
merged 14 commits into from
Aug 22, 2022
Merged
209 changes: 120 additions & 89 deletions cmd/otel-allocator/allocation/allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ var (
)

/*
Load balancer will serve on an HTTP server exposing /jobs/<job_id>/targets <- these are configured using least connection
Load balancer will serve on an HTTP server exposing /jobs/<job_id>/targets
The targets are allocated using the least connection method
Load balancer will need information about the collectors in order to set the URLs
Keep a Map of what each collector currently holds and update it based on new scrape target updates
*/
Expand Down Expand Up @@ -55,13 +56,12 @@ type collector struct {
// Allocator makes decisions to distribute work among
// a number of OpenTelemetry collectors based on the number of targets.
// Users need to call SetTargets when they have new targets in their
// clusters and call Reshard to process the new targets and reshard.
// clusters and call SetCollectors when the collectors have changed.
type Allocator struct {
// m protects targetsWaiting, collectors, and targetItems for concurrent use.
jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved
m sync.RWMutex
targetsWaiting map[string]TargetItem // temp buffer to keep targets that are waiting to be processed
collectors map[string]*collector // all current collectors
targetItems map[string]*TargetItem
m sync.RWMutex
collectors map[string]*collector // all current collectors
targetItems map[string]*TargetItem

log logr.Logger
}
Expand All @@ -88,119 +88,150 @@ func (allocator *Allocator) Collectors() map[string]*collector {
return collectorsCopy
}

// SetWaitingTargets accepts a list of targets that will be used to make
// findNextCollector finds the next collector with fewer number of targets.
// This method is called from within SetTargets and SetCollectors, whose caller
// acquires the needed lock.
func (allocator *Allocator) findNextCollector() *collector {
var col *collector
for _, v := range allocator.collectors {
// If the initial collector is empty, set the initial collector to the first element of map
if col == nil {
col = v
} else {
if v.NumTargets < col.NumTargets {
col = v
}
}

jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved
}
return col
}

// addTargetToTargetItems assigns a target to the next available collector and adds it to the allocator's targetItems
// This method is called from within SetTargets and SetCollectors, whose caller acquires the needed lock.
// This is only called after the collectors are cleared or when a new target has been found in the tempTargetMap
func (allocator *Allocator) addTargetToTargetItems(target *TargetItem) {
chosenCollector := allocator.findNextCollector()
targetItem := TargetItem{
JobName: target.JobName,
Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(target.JobName))},
TargetURL: target.TargetURL,
Label: target.Label,
Collector: chosenCollector,
}
allocator.targetItems[targetItem.hash()] = &targetItem
chosenCollector.NumTargets++
targetsPerCollector.WithLabelValues(chosenCollector.Name).Set(float64(chosenCollector.NumTargets))
jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved
}

// getCollectorChanges returns the new and removed collectors respectively.
// This method is called from within SetCollectors, which acquires the needed lock.
func (allocator *Allocator) getCollectorChanges(collectors []string) ([]string, []string) {
var newCollectors []string
var removedCollectors []string
// Used as a set to check for removed collectors
tempCollectorMap := map[string]bool{}
for _, s := range collectors {
if _, found := allocator.collectors[s]; !found {
newCollectors = append(newCollectors, s)
}
tempCollectorMap[s] = true
}
for k := range allocator.collectors {
if _, found := tempCollectorMap[k]; !found {
removedCollectors = append(removedCollectors, k)
}
}
return newCollectors, removedCollectors
}

// SetTargets accepts a list of targets that will be used to make
// load balancing decisions. This method should be called when there are
// new targets discovered or existing targets are shutdown.
func (allocator *Allocator) SetWaitingTargets(targets []TargetItem) {
// Dump old data
func (allocator *Allocator) SetTargets(targets []TargetItem) {
timer := prometheus.NewTimer(timeToAssign.WithLabelValues("SetTargets"))
defer timer.ObserveDuration()

allocator.m.Lock()
defer allocator.m.Unlock()
allocator.targetsWaiting = make(map[string]TargetItem, len(targets))
// Set new data
for _, i := range targets {
allocator.targetsWaiting[i.hash()] = i

// Make the temp map for access
tempTargetMap := make(map[string]TargetItem, len(targets))
for _, target := range targets {
tempTargetMap[target.hash()] = target
}

// Check for removals
for k, target := range allocator.targetItems {
// if the old target is no longer in the new list, remove it
if _, ok := tempTargetMap[k]; !ok {
allocator.collectors[target.Collector.Name].NumTargets--
jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved
delete(allocator.targetItems, k)
targetsPerCollector.WithLabelValues(target.Collector.Name).Set(float64(allocator.collectors[target.Collector.Name].NumTargets))
}
}

// Check for additions
for k, target := range tempTargetMap {
// Do nothing if the item is already there
if _, ok := allocator.targetItems[k]; ok {
continue
} else {
// Assign a collector to the new target
allocator.addTargetToTargetItems(&target)
}
}
}

// SetCollectors sets the set of collectors with key=collectorName, value=Collector object.
// This method is called when Collectors are added or removed.
func (allocator *Allocator) SetCollectors(collectors []string) {
log := allocator.log.WithValues("component", "opentelemetry-targetallocator")
timer := prometheus.NewTimer(timeToAssign.WithLabelValues("SetCollectors"))
defer timer.ObserveDuration()

allocator.m.Lock()
defer allocator.m.Unlock()
newCollectors, removedCollectors := allocator.getCollectorChanges(collectors)
if len(collectors) == 0 {
log.Info("No collector instances present")
return
} else if len(newCollectors) == 0 && len(removedCollectors) == 0 {
log.Info("No changes to the collectors found")
return
}
for k := range allocator.collectors {

// Clear existing collectors
jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved
for _, k := range removedCollectors {
delete(allocator.collectors, k)
targetsPerCollector.WithLabelValues(k).Set(0)
}

for _, i := range collectors {
// Insert the new collectors
for _, i := range newCollectors {
allocator.collectors[i] = &collector{Name: i, NumTargets: 0}
}
collectorsAllocatable.Set(float64(len(collectors)))
}

// AllocateTargets removes outdated targets and adds new ones from
// waitingTargets. This method needs to be called to process the new target
// updates. Until it is called, old targets will be served.
func (allocator *Allocator) AllocateTargets() {
allocator.m.Lock()
timer := prometheus.NewTimer(timeToAssign.WithLabelValues("AllocateTargets"))
defer timer.ObserveDuration()
defer allocator.m.Unlock()
allocator.removeOutdatedTargets()
allocator.processWaitingTargets()
}

// ReallocateCollectors reallocates the targets among the new collector instances.
func (allocator *Allocator) ReallocateCollectors() {
allocator.m.Lock()
timer := prometheus.NewTimer(timeToAssign.WithLabelValues("ReallocateCollectors"))
defer timer.ObserveDuration()
defer allocator.m.Unlock()
allocator.targetItems = make(map[string]*TargetItem)
allocator.processWaitingTargets()
}

// removeOutdatedTargets removes targets that are no longer available. This
// method is called after a lock has been acquired in ReallocateCollectors().
func (allocator *Allocator) removeOutdatedTargets() {
for k := range allocator.targetItems {
if _, ok := allocator.targetsWaiting[k]; !ok {
allocator.collectors[allocator.targetItems[k].Collector.Name].NumTargets--
delete(allocator.targetItems, k)
}
}
}

// processWaitingTargets processes the newly set targets. This method is called
// after a lock has been acquired in AllocateTargets() or ReallocateCollectors().
func (allocator *Allocator) processWaitingTargets() {
for k, v := range allocator.targetsWaiting {
if _, ok := allocator.targetItems[k]; !ok {
col := allocator.findNextCollector()
allocator.targetItems[k] = &v
targetItem := TargetItem{
JobName: v.JobName,
Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", url.QueryEscape(v.JobName))},
TargetURL: v.TargetURL,
Label: v.Label,
Collector: col,
// find targets which need to be redistributed
var redistribute []*TargetItem
for _, item := range allocator.targetItems {
for _, s := range removedCollectors {
if item.Collector.Name == s {
redistribute = append(redistribute, item)
jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved
}
col.NumTargets++
targetsPerCollector.WithLabelValues(col.Name).Set(float64(col.NumTargets))
allocator.targetItems[v.hash()] = &targetItem
}
}
}

// findNextCollector finds the next collector with fewer number of targets.
// This method is called from within processWaitingTargets(), whose caller
// acquires the needed lock.
func (allocator *Allocator) findNextCollector() *collector {
var col *collector
for _, v := range allocator.collectors {
// If the initial collector is empty, set the initial collector to the first element of map
if col == nil {
col = v
} else {
if v.NumTargets < col.NumTargets {
col = v
}
}

// Re-Allocate the existing targets
for _, item := range redistribute {
allocator.addTargetToTargetItems(item)
}
return col

collectorsAllocatable.Set(float64(len(collectors)))
}

jaronoff97 marked this conversation as resolved.
Show resolved Hide resolved
func NewAllocator(log logr.Logger) *Allocator {
return &Allocator{
log: log,
targetsWaiting: make(map[string]TargetItem),
collectors: make(map[string]*collector),
targetItems: make(map[string]*TargetItem),
log: log,
collectors: make(map[string]*collector),
targetItems: make(map[string]*TargetItem),
}
}
Loading