Target Allocator implementation (Part 2 - Target Allocator Image logic) #354

Merged
29 commits, merged Sep 15, 2021

Commits
5ba37d1
Target Allocation server logic
Jul 22, 2021
f9a3875
Added cmd to indicate executable
alexperez52 Jul 22, 2021
0301307
Update cmd/otel-allocator/allocation/allocator.go
alexperez52 Jul 28, 2021
4674635
Update cmd/otel-allocator/allocation/allocator.go
alexperez52 Jul 28, 2021
86d58e9
Updated discovery manager, collector component and added testing file…
Jul 28, 2021
3853006
Removed unnecessary struct in config.go
Jul 28, 2021
51056f2
Added load testing
alexperez52 Jul 29, 2021
760982e
Update cmd/otel-allocator/allocation/allocator.go
alexperez52 Jul 28, 2021
aa1aecb
Update cmd/otel-allocator/allocation/allocator.go
alexperez52 Jul 28, 2021
8b6fbf1
Update cmd/otel-allocator/allocation/allocator.go
alexperez52 Jul 28, 2021
e3bc7c8
Update cmd/otel-allocator/allocation/allocator.go
Aneurysm9 Jul 28, 2021
b9a5232
Update cmd/otel-allocator/allocation/allocator.go
Aneurysm9 Jul 28, 2021
f640e50
Removed nextCollector and modified locks
alexperez52 Aug 10, 2021
f523de9
Updated collector.go to reflect new namespace
Aug 4, 2021
a5db21d
Refactored display map logic & updated locking convention
alexperez52 Aug 11, 2021
a572659
Updated container port
Aug 11, 2021
cf51157
Change initialized empty collector to nil collector
alexperez52 Aug 13, 2021
286f9e9
Updated collector test logic
rsvarma95 Aug 18, 2021
725515d
Updated allocation files
alexperez52 Aug 19, 2021
8f03e47
Updated allocation import in main.go
alexperez52 Aug 19, 2021
f5a7220
Updated collector & discovery files
rsvarma95 Aug 23, 2021
d638147
Updated allocator unit tests
alexperez52 Aug 23, 2021
9db3956
Updated runWatch to prevent panic
rsvarma95 Aug 23, 2021
d51b39b
Separated HTTP logic from allocator logic
alexperez52 Aug 24, 2021
6bf5eda
Integrated logr
alexperez52 Aug 24, 2021
6d73fa2
Updated collector test to use channels
rsvarma95 Aug 24, 2021
902a212
Update use of logger and fix error messages
alexperez52 Aug 27, 2021
f450e17
Merge branch 'main' of https://github.com/open-o11y/opentelemetry-ope…
alexperez52 Aug 27, 2021
284ff54
Update test files
alexperez52 Aug 27, 2021
26 changes: 26 additions & 0 deletions cmd/otel-allocator/Dockerfile
@@ -0,0 +1,26 @@
# Build the target allocator binary
FROM golang:1.17 as builder

WORKDIR /app

# Copy go mod and sum files
COPY go.mod go.sum ./

RUN go mod download

COPY . .

# Build the Go app
RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo -o main .

######## Start a new stage from scratch #######
FROM alpine:latest

RUN apk --no-cache add ca-certificates

WORKDIR /root/

# Copy the pre-built binary file from the previous stage
COPY --from=builder /app/main .

CMD ["./main"]
153 changes: 153 additions & 0 deletions cmd/otel-allocator/allocation/allocator.go
@@ -0,0 +1,153 @@
package allocation

Member
None of these packages are used outside of this program, correct? Can we move them to cmd/otel-allocator/internal/*? That way we can be confident their scope is contained and not worry about breaking any external packages that have depended on them.

This can be done separately and doesn't need to hold up this PR.

Member
Has this been addressed?
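
For illustration, the internal/* move suggested above would look roughly like this (a hypothetical layout; only the allocation package appears in this diff, the rest is elided):

cmd/otel-allocator/
    internal/
        allocation/   (this package)
        ...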


import (
"fmt"
"sync"

"github.com/go-logr/logr"
"github.com/prometheus/common/model"
)

/*
The load balancer serves an HTTP endpoint at /jobs/<job_id>/targets; targets are assigned to collectors using a least-connection strategy.
The load balancer needs information about the collectors in order to set the URLs.
It keeps a map of what each collector currently holds and updates it on new scrape-target updates.
*/

type TargetItem struct {
JobName string
Link LinkJSON
TargetURL string
Label model.LabelSet
Collector *collector
}

// collector holds a collector's name and the number of targets assigned to it.
// It is serialized into the endpoint response that lists each collector and its jobs.

type collector struct {
Name string
NumTargets int
}

// Allocator makes decisions to distribute work among
// a number of OpenTelemetry collectors based on the number of targets.
// Users need to call SetWaitingTargets when they have new targets in their
// clusters and call AllocateTargets to process the new targets and reshard.
type Allocator struct {
m sync.Mutex

targetsWaiting map[string]TargetItem // temp buffer to keep targets that are waiting to be processed
Member
Why is this not holding a pointer to TargetItem, like targetItems is?


collectors map[string]*collector // all current collectors

TargetItems map[string]*TargetItem

log logr.Logger
}

// findNextCollector finds the collector with the fewest assigned targets.
func (allocator *Allocator) findNextCollector() *collector {
var col *collector
	for _, v := range allocator.collectors {
		// Start with the first collector seen, then keep whichever has fewer targets.
		if col == nil || v.NumTargets < col.NumTargets {
			col = v
		}
	}
return col
}

// SetWaitingTargets accepts a list of targets that will be used to make
// load-balancing decisions. This method should be called when new targets are
// discovered or existing targets are shut down.
func (allocator *Allocator) SetWaitingTargets(targets []TargetItem) {
// Dump old data
allocator.m.Lock()
defer allocator.m.Unlock()
allocator.targetsWaiting = make(map[string]TargetItem, len(targets))
// Set new data
for _, i := range targets {
allocator.targetsWaiting[i.JobName+i.TargetURL] = i
}
}

// SetCollectors sets the set of collectors with key=collectorName, value=Collector object.
// SetCollectors is called when Collectors are added or removed
func (allocator *Allocator) SetCollectors(collectors []string) {
	// A lone string is not a valid key/value pair for WithValues; WithName fits better here.
	log := allocator.log.WithName("opentelemetry-targetallocator")

allocator.m.Lock()
defer allocator.m.Unlock()
if len(collectors) == 0 {
log.Info("No collector instances present")
return
}
for k := range allocator.collectors {
delete(allocator.collectors, k)
}

for _, i := range collectors {
allocator.collectors[i] = &collector{Name: i, NumTargets: 0}
}
}

// AllocateTargets needs to be called to process the new target updates.
// Until AllocateTargets is called, old targets will be served.
func (allocator *Allocator) AllocateTargets() {
allocator.m.Lock()
defer allocator.m.Unlock()
allocator.removeOutdatedTargets()
allocator.processWaitingTargets()
}

// ReallocateCollectors reallocates the targets among the new collector instances
func (allocator *Allocator) ReallocateCollectors() {
allocator.m.Lock()
defer allocator.m.Unlock()
allocator.TargetItems = make(map[string]*TargetItem)
allocator.processWaitingTargets()
}

// removeOutdatedTargets removes targets that are no longer available.
func (allocator *Allocator) removeOutdatedTargets() {
for k := range allocator.TargetItems {
if _, ok := allocator.targetsWaiting[k]; !ok {
allocator.collectors[allocator.TargetItems[k].Collector.Name].NumTargets--
delete(allocator.TargetItems, k)
}
}
}

// processWaitingTargets processes the newly set targets.
func (allocator *Allocator) processWaitingTargets() {
for k, v := range allocator.targetsWaiting {
if _, ok := allocator.TargetItems[k]; !ok {
col := allocator.findNextCollector()
			// Build a fresh TargetItem instead of storing &v: the range variable v is
			// reused across iterations, and that entry would be overwritten below anyway
			// (k == v.JobName+v.TargetURL by construction in SetWaitingTargets).
			targetItem := TargetItem{
				JobName:   v.JobName,
				Link:      LinkJSON{fmt.Sprintf("/jobs/%s/targets", v.JobName)},
				TargetURL: v.TargetURL,
				Label:     v.Label,
				Collector: col,
			}
			col.NumTargets++
			allocator.TargetItems[k] = &targetItem
}
}
}

func NewAllocator(log logr.Logger) *Allocator {
return &Allocator{
log: log,
targetsWaiting: make(map[string]TargetItem),
collectors: make(map[string]*collector),
TargetItems: make(map[string]*TargetItem),
}
}
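
Putting the pieces together, a minimal usage sketch of this API (the import path and the collector/job names are illustrative, and logr.Discard() stands in for the real logger that main.go wires up):

package main

import (
	"fmt"

	"github.com/go-logr/logr"
	"github.com/prometheus/common/model"

	"github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" // illustrative import path
)

func main() {
	// logr.Discard() is a no-op logger, used here only to keep the sketch self-contained.
	allocator := allocation.NewAllocator(logr.Discard())

	// Register the current collector instances.
	allocator.SetCollectors([]string{"collector-1", "collector-2"})

	// Buffer newly discovered targets, then apply them with AllocateTargets.
	allocator.SetWaitingTargets([]allocation.TargetItem{
		{JobName: "sample-job", TargetURL: "prometheus:1000", Label: model.LabelSet{}},
		{JobName: "sample-job", TargetURL: "prometheus:1001", Label: model.LabelSet{}},
	})
	allocator.AllocateTargets()

	// Each target is now pinned to the least-loaded collector.
	for key, item := range allocator.TargetItems {
		fmt.Printf("%s -> %s\n", key, item.Collector.Name)
	}
}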
154 changes: 154 additions & 0 deletions cmd/otel-allocator/allocation/allocator_test.go
@@ -0,0 +1,154 @@
package allocation

import (
"math"
"testing"

"github.com/go-logr/logr"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/assert"
)

// Tests least-connection selection: findNextCollector should return the collector with the least workload
func TestFindNextCollector(t *testing.T) {
var log logr.Logger
s := NewAllocator(log)

defaultCol := collector{Name: "default-col", NumTargets: 1}
maxCol := collector{Name: "max-col", NumTargets: 2}
leastCol := collector{Name: "least-col", NumTargets: 0}
s.collectors[maxCol.Name] = &maxCol
s.collectors[leastCol.Name] = &leastCol
s.collectors[defaultCol.Name] = &defaultCol

assert.Equal(t, "least-col", s.findNextCollector().Name)
}

func TestSetCollectors(t *testing.T) {

var log logr.Logger
s := NewAllocator(log)

cols := []string{"col-1", "col-2", "col-3"}
s.SetCollectors(cols)

	expectedColLen := len(cols)
	assert.Len(t, s.collectors, expectedColLen)

for _, i := range cols {
assert.NotNil(t, s.collectors[i])
}
}

func TestAddingAndRemovingTargets(t *testing.T) {
// prepare allocator with initial targets and collectors
var log logr.Logger
s := NewAllocator(log)

cols := []string{"col-1", "col-2", "col-3"}
s.SetCollectors(cols)

initTargets := []string{"prometheus:1000", "prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1005"}
var targetList []TargetItem
for _, i := range initTargets {
targetList = append(targetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}})
}

// test that targets and collectors are added properly
s.SetWaitingTargets(targetList)
s.AllocateTargets()

// verify
expectedTargetLen := len(initTargets)
assert.Len(t, s.TargetItems, expectedTargetLen)

// prepare second round of targets
tar := []string{"prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004"}
var newTargetList []TargetItem
for _, i := range tar {
newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}})
}

	// test that fewer targets are found and the removed ones are dropped
s.SetWaitingTargets(newTargetList)
s.AllocateTargets()

// verify
expectedNewTargetLen := len(tar)
assert.Len(t, s.TargetItems, expectedNewTargetLen)

// verify results map
for _, i := range tar {
_, ok := s.TargetItems["sample-name"+i]
assert.True(t, ok)
}
}

// Tests that the delta in number of targets per collector is less than 15% of an even distribution
func TestCollectorBalanceWhenAddingAndRemovingAtRandom(t *testing.T) {
Member
What advantages is this bringing in comparison to the test TestAddingAndRemovingTargets? Is this exercising a different code path than TestCollectorBalanceWhenAddingTargets when it comes to the balancing itself?

Contributor Author
TestAddingAndRemovingTargets tests that targets are successfully added with the expected key.
TestCollectorBalanceWhenAddingTargets tests the balance on an easily divisible number of targets for the purpose of even allocation.
TestCollectorBalanceWhenAddingAndRemovingAtRandom uses a more random approach where targets are both added and removed, verifying at each step that there isn't more than a 15% difference in workload from collector to collector.

Member
Not sure I get the value-add between the second and the third, but assuming there's enough value justifying the added complexity, isn't the third case covering the second? Meaning, do we need the second? And the first?

Member
From a straight coverage perspective, I think the third test will cover what the first two do as well. But they may still have value in enabling testing and reasoning about each capability somewhat more independently.

I'd like to take another pass at this allocation logic in the future, but I think for now it is a solid starting point.


// prepare allocator with 3 collectors and 'random' amount of targets
var log logr.Logger
s := NewAllocator(log)

cols := []string{"col-1", "col-2", "col-3"}
s.SetCollectors(cols)

targets := []string{"prometheus:1001", "prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1005", "prometheus:1006",
"prometheus:1011", "prometheus:1012", "prometheus:1013", "prometheus:1014", "prometheus:1015", "prometheus:1016",
"prometheus:1021", "prometheus:1022", "prometheus:1023", "prometheus:1024", "prometheus:1025", "prometheus:1026"}
var newTargetList []TargetItem
for _, i := range targets {
newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}})
}
s.SetWaitingTargets(newTargetList)
s.AllocateTargets()

	// Dividing the target count by 6.7 yields ~15% of the total (1/6.7 ≈ 0.149)
divisor := 6.7

count := len(s.TargetItems) / len(s.collectors)
percent := float64(len(s.TargetItems)) / divisor

// test
for _, i := range s.collectors {
assert.InDelta(t, i.NumTargets, count, percent)
}

// removing targets at 'random'
targets = []string{"prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1006",
"prometheus:1011", "prometheus:1012", "prometheus:1013", "prometheus:1014", "prometheus:1016",
"prometheus:1023", "prometheus:1024", "prometheus:1025", "prometheus:1026"}
newTargetList = []TargetItem{}
for _, i := range targets {
newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}})
}
s.SetWaitingTargets(newTargetList)
s.AllocateTargets()

count = len(s.TargetItems) / len(s.collectors)
percent = float64(len(s.TargetItems)) / divisor

// test
for _, i := range s.collectors {
assert.InDelta(t, i.NumTargets, count, math.Round(percent))
}
// adding targets at 'random'
targets = []string{"prometheus:1002", "prometheus:1003", "prometheus:1004", "prometheus:1006",
"prometheus:1011", "prometheus:1012", "prometheus:1001", "prometheus:1014", "prometheus:1016",
"prometheus:1023", "prometheus:1024", "prometheus:1025", "prometheus:1126", "prometheus:1227"}
newTargetList = []TargetItem{}
for _, i := range targets {
newTargetList = append(newTargetList, TargetItem{JobName: "sample-name", TargetURL: i, Label: model.LabelSet{}})
}
s.SetWaitingTargets(newTargetList)
s.AllocateTargets()

count = len(s.TargetItems) / len(s.collectors)
percent = float64(len(s.TargetItems)) / divisor

// test
for _, i := range s.collectors {
assert.InDelta(t, i.NumTargets, count, math.Round(percent))
}
}
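
To make the 15% bound concrete: in the first round above there are 18 targets across 3 collectors, so an even split is 18/3 = 6 targets each, and the allowed delta is 18/6.7 ≈ 2.69 targets, roughly 15% of the total. A minimal sketch of the same arithmetic (values mirror the first round of the test above):

package main

import "fmt"

func main() {
	totalTargets := 18.0 // first round of targets in the test above
	collectors := 3.0
	divisor := 6.7 // 1/6.7 ≈ 0.149, i.e. ~15%

	count := totalTargets / collectors // even share per collector: 6
	delta := totalTargets / divisor    // allowed deviation: ~2.69

	fmt.Printf("each collector should hold %.0f ± %.2f targets (~%.1f%% of the total)\n",
		count, delta, 100/divisor)
}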