-
Notifications
You must be signed in to change notification settings - Fork 435
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Target Allocator implementation (Part 2 - Target Allocator Image logic) #354
Changes from all commits
5ba37d1
f9a3875
0301307
4674635
86d58e9
3853006
51056f2
760982e
aa1aecb
8b6fbf1
e3bc7c8
b9a5232
f640e50
f523de9
a5db21d
a572659
cf51157
286f9e9
725515d
8f03e47
f5a7220
d638147
9db3956
d51b39b
6bf5eda
6d73fa2
902a212
f450e17
284ff54
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
# Build the target allocator binary
FROM golang:1.17 as builder

WORKDIR /app

# Copy go mod and sum files first so the dependency download layer is
# cached independently of source-code changes.
COPY go.mod go.sum ./

RUN go mod download

COPY . .

# Build a static Go binary for linux. CGO is disabled so the result runs
# on musl-based alpine below. The obsolete "-installsuffix cgo" flag (a
# pre-Go-1.10 build-cache workaround) has been dropped; it is a no-op in
# module-based builds.
RUN CGO_ENABLED=0 GOOS=linux go build -a -o main .

######## Start a new stage from scratch #######
FROM alpine:latest

# Root CA certificates so the allocator can make TLS connections.
RUN apk --no-cache add ca-certificates

WORKDIR /root/

# Copy the pre-built binary file from the previous stage
COPY --from=builder /app/main .

CMD ["./main"]
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,153 @@ | ||
package allocation | ||
|
||
import ( | ||
"fmt" | ||
"sync" | ||
|
||
"github.com/go-logr/logr" | ||
"github.com/prometheus/common/model" | ||
) | ||
|
||
/* | ||
Load balancer will serve on an HTTP server exposing /jobs/<job_id>/targets <- these are configured using least connection | ||
Load balancer will need information about the collectors in order to set the URLs | ||
Keep a Map of what each collector currently holds and update it based on new scrape target updates | ||
*/ | ||
|
||
// TargetItem represents a single scrape target together with the
// collector it is currently assigned to. Items are keyed elsewhere by
// JobName+TargetURL.
type TargetItem struct {
	JobName   string
	Link      LinkJSON // link to /jobs/<job>/targets for this job
	TargetURL string
	Label     model.LabelSet
	Collector *collector // nil until the target has been allocated
}
|
||
// collector holds a collector's name and the number of targets (jobs)
// currently assigned to it. This struct is parsed into the endpoint
// response carrying collector and job info.
type collector struct {
	Name       string
	NumTargets int
}
|
||
// Allocator makes decisions to distribute work among
// a number of OpenTelemetry collectors based on the number of targets.
// Users need to call SetWaitingTargets when they have new targets in
// their clusters and call AllocateTargets to process the new targets
// and reshard.
type Allocator struct {
	// m guards every map below; all exported methods take it.
	m sync.Mutex

	// targetsWaiting is a temp buffer to keep targets that are waiting
	// to be processed, keyed by JobName+TargetURL.
	// NOTE(review): holds TargetItem by value while TargetItems holds
	// pointers — confirm whether this asymmetry is intentional.
	targetsWaiting map[string]TargetItem

	// collectors tracks all current collectors, keyed by collector name.
	collectors map[string]*collector

	// TargetItems maps JobName+TargetURL to the allocated target.
	TargetItems map[string]*TargetItem

	log logr.Logger
}
|
||
// findNextCollector finds the next collector with less number of targets. | ||
func (allocator *Allocator) findNextCollector() *collector { | ||
var col *collector | ||
for _, v := range allocator.collectors { | ||
// If the initial collector is empty, set the initial collector to the first element of map | ||
if col == nil { | ||
col = v | ||
} else { | ||
if v.NumTargets < col.NumTargets { | ||
col = v | ||
} | ||
} | ||
|
||
} | ||
return col | ||
} | ||
|
||
// SetWaitingTargets accepts a list of targets that will be used to make
// load balancing decisions. This method should be called when there are
// new targets discovered or existing targets are shutdown.
func (allocator *Allocator) SetWaitingTargets(targets []TargetItem) {
	// Dump old data
	allocator.m.Lock()
	defer allocator.m.Unlock()
	allocator.targetsWaiting = make(map[string]TargetItem, len(targets))
	// Set new data, keyed by JobName+TargetURL to match TargetItems keys.
	for _, i := range targets {
		allocator.targetsWaiting[i.JobName+i.TargetURL] = i
	}
}
|
||
// SetCollectors sets the set of collectors with key=collectorName, value=Collector object. | ||
// SetCollectors is called when Collectors are added or removed | ||
func (allocator *Allocator) SetCollectors(collectors []string) { | ||
log := allocator.log.WithValues("opentelemetry-targetallocator") | ||
|
||
allocator.m.Lock() | ||
Aneurysm9 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
defer allocator.m.Unlock() | ||
if len(collectors) == 0 { | ||
log.Info("No collector instances present") | ||
return | ||
} | ||
for k := range allocator.collectors { | ||
delete(allocator.collectors, k) | ||
} | ||
|
||
for _, i := range collectors { | ||
allocator.collectors[i] = &collector{Name: i, NumTargets: 0} | ||
} | ||
} | ||
|
||
// AllocateTargets needs to be called to process the new target updates:
// it drops targets that are no longer waiting and assigns the newly
// waiting ones. Until AllocateTargets is called, old targets will be
// served.
func (allocator *Allocator) AllocateTargets() {
	allocator.m.Lock()
	defer allocator.m.Unlock()
	allocator.removeOutdatedTargets()
	allocator.processWaitingTargets()
}
|
||
// ReallocateCollectors reallocates the targets among the new collector
// instances: every existing assignment is dropped and all waiting
// targets are re-allocated from scratch.
func (allocator *Allocator) ReallocateCollectors() {
	allocator.m.Lock()
	defer allocator.m.Unlock()
	allocator.TargetItems = make(map[string]*TargetItem)
	allocator.processWaitingTargets()
}
|
||
// removeOutdatedTargets removes targets that are no longer available. | ||
func (allocator *Allocator) removeOutdatedTargets() { | ||
for k := range allocator.TargetItems { | ||
if _, ok := allocator.targetsWaiting[k]; !ok { | ||
allocator.collectors[allocator.TargetItems[k].Collector.Name].NumTargets-- | ||
delete(allocator.TargetItems, k) | ||
} | ||
} | ||
} | ||
|
||
// processWaitingTargets processes the newly set targets. | ||
func (allocator *Allocator) processWaitingTargets() { | ||
for k, v := range allocator.targetsWaiting { | ||
if _, ok := allocator.TargetItems[k]; !ok { | ||
col := allocator.findNextCollector() | ||
allocator.TargetItems[k] = &v | ||
targetItem := TargetItem{ | ||
JobName: v.JobName, | ||
Link: LinkJSON{fmt.Sprintf("/jobs/%s/targets", v.JobName)}, | ||
TargetURL: v.TargetURL, | ||
Label: v.Label, | ||
Collector: col, | ||
} | ||
col.NumTargets++ | ||
allocator.TargetItems[v.JobName+v.TargetURL] = &targetItem | ||
} | ||
} | ||
} | ||
|
||
// NewAllocator returns a ready-to-use Allocator with empty target and
// collector maps, logging through the given logger.
func NewAllocator(log logr.Logger) *Allocator {
	return &Allocator{
		log:            log,
		targetsWaiting: make(map[string]TargetItem),
		collectors:     make(map[string]*collector),
		TargetItems:    make(map[string]*TargetItem),
	}
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
package allocation | ||
|
||
import ( | ||
"math" | ||
"testing" | ||
|
||
"github.com/go-logr/logr" | ||
"github.com/prometheus/common/model" | ||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
// Tests least connection - The expected collector after running findNextCollector should be the collector with the least amount of workload | ||
func TestFindNextCollector(t *testing.T) { | ||
var log logr.Logger | ||
s := NewAllocator(log) | ||
|
||
defaultCol := collector{Name: "default-col", NumTargets: 1} | ||
maxCol := collector{Name: "max-col", NumTargets: 2} | ||
leastCol := collector{Name: "least-col", NumTargets: 0} | ||
s.collectors[maxCol.Name] = &maxCol | ||
s.collectors[leastCol.Name] = &leastCol | ||
s.collectors[defaultCol.Name] = &defaultCol | ||
|
||
assert.Equal(t, "least-col", s.findNextCollector().Name) | ||
} | ||
|
||
func TestSetCollectors(t *testing.T) { | ||
|
||
var log logr.Logger | ||
s := NewAllocator(log) | ||
|
||
cols := []string{"col-1", "col-2", "col-3"} | ||
s.SetCollectors(cols) | ||
|
||
excpectedColLen := len(cols) | ||
assert.Len(t, s.collectors, excpectedColLen) | ||
|
||
for _, i := range cols { | ||
assert.NotNil(t, s.collectors[i]) | ||
} | ||
} | ||
|
||
func TestAddingAndRemovingTargets(t *testing.T) { | ||
// prepare allocator with initial targets and collectors | ||
var log logr.Logger | ||
s := NewAllocator(log) | ||
|
||
cols := []string{"col-1", "col-2", "col-3"} | ||
s.SetCollectors(cols) | ||
|
||
initTargets := []string{"prometheus:1000", "prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1005"} | ||
var targetList []TargetItem | ||
for _, i := range initTargets { | ||
targetList = append(targetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) | ||
} | ||
|
||
// test that targets and collectors are added properly | ||
s.SetWaitingTargets(targetList) | ||
s.AllocateTargets() | ||
|
||
// verify | ||
expectedTargetLen := len(initTargets) | ||
assert.Len(t, s.TargetItems, expectedTargetLen) | ||
|
||
// prepare second round of targets | ||
tar := []string{"prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004"} | ||
var newTargetList []TargetItem | ||
for _, i := range tar { | ||
newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) | ||
} | ||
|
||
// test that less targets are found - removed | ||
s.SetWaitingTargets(newTargetList) | ||
s.AllocateTargets() | ||
|
||
// verify | ||
expectedNewTargetLen := len(tar) | ||
assert.Len(t, s.TargetItems, expectedNewTargetLen) | ||
|
||
// verify results map | ||
for _, i := range tar { | ||
_, ok := s.TargetItems["sample-name"+i] | ||
assert.True(t, ok) | ||
} | ||
} | ||
|
||
// Tests that the delta in number of targets per collector is less than 15% of an even distribution | ||
func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What advantages is this bringing in comparison to the test There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure I get the value-add between the second and the third, but assuming there's enough value justifying the added complexity, isn't it covering the third case covering the second? Meaning, do we need the second? And the first? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. From a straight coverage perspective, I think the third test will cover what the first two do as well. But they may still have value in enabling testing and reasoning about each capability somewhat more independently. I'd like to take another pass at this allocation logic in the future, but I think for now it is a solid starting point. |
||
|
||
// prepare allocator with 3 collectors and 'random' amount of targets | ||
var log logr.Logger | ||
s := NewAllocator(log) | ||
|
||
cols := []string{"col-1", "col-2", "col-3"} | ||
s.SetCollectors(cols) | ||
|
||
targets := []string{"prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1005", "prometheus:1006", | ||
"prometheus:1011", "prometheus:1012", "prometheus:1013", "prometheus:1014", "prometheus:1015", "prometheus:1016", | ||
"prometheus:1021", "prometheus:1022", "prometheus:1023", "prometheus:1024", "prometheus:1025", "prometheus:1026"} | ||
var newTargetList []TargetItem | ||
for _, i := range targets { | ||
newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) | ||
} | ||
s.SetWaitingTargets(newTargetList) | ||
s.AllocateTargets() | ||
|
||
// Divisor needed to get 15% | ||
divisor := 6.7 | ||
|
||
count := len(s.TargetItems) / len(s.collectors) | ||
percent := float64(len(s.TargetItems)) / divisor | ||
|
||
// test | ||
for _, i := range s.collectors { | ||
assert.InDelta(t, i.NumTargets, count, percent) | ||
} | ||
|
||
// removing targets at 'random' | ||
targets = []string{"prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1006", | ||
"prometheus:1011", "prometheus:1012", "prometheus:1013", "prometheus:1014", "prometheus:1016", | ||
"prometheus:1023", "prometheus:1024", "prometheus:1025", "prometheus:1026"} | ||
newTargetList = []TargetItem{} | ||
for _, i := range targets { | ||
newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) | ||
} | ||
s.SetWaitingTargets(newTargetList) | ||
s.AllocateTargets() | ||
|
||
count = len(s.TargetItems) / len(s.collectors) | ||
percent = float64(len(s.TargetItems)) / divisor | ||
|
||
// test | ||
for _, i := range s.collectors { | ||
assert.InDelta(t, i.NumTargets, count, math.Round(percent)) | ||
} | ||
// adding targets at 'random' | ||
targets = []string{"prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1006", | ||
"prometheus:1011", "prometheus:1012", "prometheus:1001", "prometheus:1014", "prometheus:1016", | ||
"prometheus:1023", "prometheus:1024", "prometheus:1025", "prometheus:1126", "prometheus:1227"} | ||
newTargetList = []TargetItem{} | ||
for _, i := range targets { | ||
newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}}) | ||
} | ||
s.SetWaitingTargets(newTargetList) | ||
s.AllocateTargets() | ||
|
||
count = len(s.TargetItems) / len(s.collectors) | ||
percent = float64(len(s.TargetItems)) / divisor | ||
|
||
// test | ||
for _, i := range s.collectors { | ||
assert.InDelta(t, i.NumTargets, count, math.Round(percent)) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
None of these packages are used outside of this program, correct? Can we move them to
cmd/otel-allocator/internal/*
? That way we can be confident their scope is contained and not worry about breaking any external packages that have depended on them. This can be done separately and doesn't need to hold up this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Has this been addressed?