
Commit
Merge pull request #203 from askervin/5S8_balloons_close_split
Add preferCloseToDevices balloon type option
kad authored Dec 15, 2023
2 parents b014264 + f1a7dc4 commit ca7c4f8
Showing 8 changed files with 349 additions and 18 deletions.
39 changes: 38 additions & 1 deletion cmd/plugins/balloons/policy/balloons-policy.go
@@ -568,10 +568,12 @@ func (p *balloons) newBalloon(blnDef *BalloonDef, confCpus bool) (*Balloon, erro
// are type specific allocator options, otherwise use policy
// default allocator.
cpuTreeAllocator := p.cpuTreeAllocator
if blnDef.AllocatorTopologyBalancing != nil || blnDef.PreferSpreadOnPhysicalCores != nil {
if blnDef.AllocatorTopologyBalancing != nil || blnDef.PreferSpreadOnPhysicalCores != nil || len(blnDef.PreferCloseToDevices) > 0 || len(blnDef.PreferFarFromDevices) > 0 {
allocatorOptions := cpuTreeAllocatorOptions{
topologyBalancing: p.bpoptions.AllocatorTopologyBalancing,
preferSpreadOnPhysicalCores: p.bpoptions.PreferSpreadOnPhysicalCores,
preferCloseToDevices: blnDef.PreferCloseToDevices,
preferFarFromDevices: blnDef.PreferFarFromDevices,
}
if blnDef.AllocatorTopologyBalancing != nil {
allocatorOptions.topologyBalancing = *blnDef.AllocatorTopologyBalancing
@@ -1091,6 +1093,8 @@ func (p *balloons) setConfig(bpoptions *BalloonsOptions) error {
p.balloons = []*Balloon{}
p.freeCpus = p.allowed.Clone()
p.freeCpus = p.freeCpus.Difference(p.reserved)
p.fillFarFromDevices(bpoptions.BalloonDefs)

p.cpuTreeAllocator = p.cpuTree.NewAllocator(cpuTreeAllocatorOptions{
topologyBalancing: bpoptions.AllocatorTopologyBalancing,
preferSpreadOnPhysicalCores: bpoptions.PreferSpreadOnPhysicalCores,
@@ -1146,6 +1150,39 @@ func (p *balloons) setConfig(bpoptions *BalloonsOptions) error {
return nil
}

// fillFarFromDevices adds implicit device anti-affinities to
// BalloonDefs: each balloon type is set to avoid devices that other
// balloon types prefer to be close to.
func (p *balloons) fillFarFromDevices(blnDefs []*BalloonDef) {
// devDefClose[device][blnDef.Name] is true if and
// only if the blnDef prefers to be close to the device.
devDefClose := map[string]map[string]bool{}
// avoidDevs lists devices that at least one balloon type
// prefers to be close to. The order of devices in the
// avoidDevs list is significant: devices at the beginning
// of the list are avoided more effectively than devices
// later in the list.
avoidDevs := []string{}
for _, blnDef := range blnDefs {
for _, closeDev := range blnDef.PreferCloseToDevices {
if _, ok := devDefClose[closeDev]; !ok {
avoidDevs = append(avoidDevs, closeDev)
devDefClose[closeDev] = map[string]bool{}
}
devDefClose[closeDev][blnDef.Name] = true
}
}
// Add every device in avoidDevs to the PreferFarFromDevices lists
// of those balloon types that do not prefer to be close to
// the device.
for _, avoidDev := range avoidDevs {
for _, blnDef := range blnDefs {
if !devDefClose[avoidDev][blnDef.Name] {
blnDef.PreferFarFromDevices = append(blnDef.PreferFarFromDevices, avoidDev)
}
}
}
}
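
For a concrete picture of the implicit anti-affinity, here is a standalone sketch that mirrors the fill rule above outside the policy package. The trimmed-down BalloonDef and the device path are illustrative only, not part of the diff: one balloon type asks to be close to a GPU, and every type that does not gets that device added to its PreferFarFromDevices list.

package main

import "fmt"

// Trimmed-down mirror of BalloonDef and the fill rule above, for
// illustration only; the real implementation is in balloons-policy.go.
type BalloonDef struct {
	Name                 string
	PreferCloseToDevices []string
	PreferFarFromDevices []string
}

func fillFarFromDevices(blnDefs []*BalloonDef) {
	closeTo := map[string]map[string]bool{} // device -> balloon types close to it
	avoidDevs := []string{}
	for _, def := range blnDefs {
		for _, dev := range def.PreferCloseToDevices {
			if _, ok := closeTo[dev]; !ok {
				avoidDevs = append(avoidDevs, dev)
				closeTo[dev] = map[string]bool{}
			}
			closeTo[dev][def.Name] = true
		}
	}
	for _, dev := range avoidDevs {
		for _, def := range blnDefs {
			if !closeTo[dev][def.Name] {
				def.PreferFarFromDevices = append(def.PreferFarFromDevices, dev)
			}
		}
	}
}

func main() {
	gpu := &BalloonDef{Name: "gpu", PreferCloseToDevices: []string{"/sys/class/drm/card0"}} // hypothetical path
	other := &BalloonDef{Name: "default"}
	fillFarFromDevices([]*BalloonDef{gpu, other})
	fmt.Println(gpu.PreferFarFromDevices)   // []  (gpu keeps only its own close preference)
	fmt.Println(other.PreferFarFromDevices) // [/sys/class/drm/card0]  (implicit anti-affinity)
}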

// closestMems returns memory node IDs good for pinning containers
// that run on given CPUs
func (p *balloons) closestMems(cpus cpuset.CPUSet) idset.IDSet {
189 changes: 180 additions & 9 deletions cmd/plugins/balloons/policy/cputree.go
@@ -21,6 +21,7 @@ import (
"strings"

system "github.com/containers/nri-plugins/pkg/sysfs"
"github.com/containers/nri-plugins/pkg/topology"
"github.com/containers/nri-plugins/pkg/utils/cpuset"
)

@@ -55,8 +56,9 @@ type cpuTreeNodeAttributes struct {
// cpuTreeAllocator allocates CPUs from the branch of a CPU tree
// where the "root" node is the topmost CPU of the branch.
type cpuTreeAllocator struct {
options cpuTreeAllocatorOptions
root *cpuTreeNode
options cpuTreeAllocatorOptions
root *cpuTreeNode
cacheCloseCpuSets map[string][]cpuset.CPUSet
}

// cpuTreeAllocatorOptions contains parameters for the CPU allocator
@@ -67,8 +69,12 @@ type cpuTreeAllocatorOptions struct {
// the opposite (packed allocations).
topologyBalancing bool
preferSpreadOnPhysicalCores bool
preferCloseToDevices []string
preferFarFromDevices []string
}

var emptyCpuSet = cpuset.New()

// String returns string representation of a CPU tree node.
func (t *cpuTreeNode) String() string {
if len(t.children) == 0 {
@@ -395,8 +401,9 @@ func (t *cpuTreeNode) SplitLevel(splitLevel CPUTopologyLevel, cpuClassifier func
// CPU tree branch.
func (t *cpuTreeNode) NewAllocator(options cpuTreeAllocatorOptions) *cpuTreeAllocator {
ta := &cpuTreeAllocator{
root: t,
options: options,
root: t,
options: options,
cacheCloseCpuSets: map[string][]cpuset.CPUSet{},
}
if options.preferSpreadOnPhysicalCores {
newTree := t.SplitLevel(CPUTopologyLevelNuma,
@@ -502,8 +509,172 @@ func (ta *cpuTreeAllocator) sorterRelease(tnas []cpuTreeNodeAttributes) func(int
// - removeFromCpus contains CPUs in currentCpus set from which
// abs(delta) CPUs can be freed.
func (ta *cpuTreeAllocator) ResizeCpus(currentCpus, freeCpus cpuset.CPUSet, delta int) (cpuset.CPUSet, cpuset.CPUSet, error) {
resizers := []cpuResizerFunc{
ta.resizeCpusOnlyIfNecessary,
ta.resizeCpusWithDevices,
ta.resizeCpusOneAtATime,
ta.resizeCpusMaxLocalSet,
ta.resizeCpusNow}
return ta.nextCpuResizer(resizers, currentCpus, freeCpus, delta)
}

type cpuResizerFunc func(resizers []cpuResizerFunc, currentCpus, freeCpus cpuset.CPUSet, delta int) (cpuset.CPUSet, cpuset.CPUSet, error)

func (ta *cpuTreeAllocator) nextCpuResizer(resizers []cpuResizerFunc, currentCpus, freeCpus cpuset.CPUSet, delta int) (cpuset.CPUSet, cpuset.CPUSet, error) {
if len(resizers) == 0 {
return freeCpus, currentCpus, fmt.Errorf("internal error: a CPU resizer consulted next resizer but there was no one left")
}
remainingResizers := resizers[1:]
log.Debugf("- resizer-%d(%q, %q, %d)", len(remainingResizers), currentCpus, freeCpus, delta)
addFrom, removeFrom, err := resizers[0](remainingResizers, currentCpus, freeCpus, delta)
return addFrom, removeFrom, err
}

// resizeCpusNow does not call the next resizer. Instead, it treats
// every CPU in freeCpus as equally good to allocate and every CPU in
// currentCpus as equally good to release. This is the terminal block
// of the resizer chain.
func (ta *cpuTreeAllocator) resizeCpusNow(resizers []cpuResizerFunc, currentCpus, freeCpus cpuset.CPUSet, delta int) (cpuset.CPUSet, cpuset.CPUSet, error) {
return freeCpus, currentCpus, nil
}
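
The resizers above form a small chain of responsibility: each stage may narrow the candidate CPU sets and then delegates to the rest of the chain via nextCpuResizer, and the terminal stage accepts whatever is left. Below is a minimal generic sketch of that control flow; the names and the even-CPU preference are illustrative and are not the ones used in cputree.go.

package main

import "fmt"

// stage is one link in a chain: it may narrow candidates, then
// delegates to the remaining stages.
type stage func(rest []stage, candidates []int, delta int) []int

func next(rest []stage, candidates []int, delta int) []int {
	if len(rest) == 0 {
		return candidates
	}
	return rest[0](rest[1:], candidates, delta)
}

// onlyEven prefers even-numbered candidates, but only if enough of
// them remain to satisfy delta; otherwise it passes the set through.
func onlyEven(rest []stage, candidates []int, delta int) []int {
	narrowed := []int{}
	for _, c := range candidates {
		if c%2 == 0 {
			narrowed = append(narrowed, c)
		}
	}
	if len(narrowed) >= delta {
		return next(rest, narrowed, delta)
	}
	return next(rest, candidates, delta)
}

// takeNow is the terminal stage: all remaining candidates are equally good.
func takeNow(rest []stage, candidates []int, delta int) []int {
	return candidates
}

func main() {
	chain := []stage{onlyEven, takeNow}
	fmt.Println(next(chain, []int{1, 2, 3, 4, 5, 6}, 2)) // [2 4 6]
}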

// resizeCpusOnlyIfNecessary is the fast path for making trivial
// reservations and for failing early if resizing is not possible.
func (ta *cpuTreeAllocator) resizeCpusOnlyIfNecessary(resizers []cpuResizerFunc, currentCpus, freeCpus cpuset.CPUSet, delta int) (cpuset.CPUSet, cpuset.CPUSet, error) {
switch {
case delta == 0:
// Nothing to do.
return emptyCpuSet, emptyCpuSet, nil
case delta > 0:
if freeCpus.Size() < delta {
return freeCpus, emptyCpuSet, fmt.Errorf("not enough free CPUs (%d) to resize current CPU set from %d to %d CPUs", freeCpus.Size(), currentCpus.Size(), currentCpus.Size()+delta)
} else if freeCpus.Size() == delta {
// Allocate all the remaining free CPUs.
return freeCpus, emptyCpuSet, nil
}
case delta < 0:
if currentCpus.Size() < -delta {
return emptyCpuSet, currentCpus, fmt.Errorf("not enough current CPUs (%d) to release %d CPUs", currentCpus.Size(), -delta)
} else if currentCpus.Size() == -delta {
// Free all allocated CPUs.
return emptyCpuSet, currentCpus, nil
}
}
return ta.nextCpuResizer(resizers, currentCpus, freeCpus, delta)
}

// resizeCpusWithDevices prefers allocating CPUs from those freeCpus
// that are topologically close to preferred devices, and releasing
// those currentCpus that are not.
func (ta *cpuTreeAllocator) resizeCpusWithDevices(resizers []cpuResizerFunc, currentCpus, freeCpus cpuset.CPUSet, delta int) (cpuset.CPUSet, cpuset.CPUSet, error) {
// allCloseCpuSets contains cpusets in priority order: cpusets
// earlier in the list are applied with higher priority than
// those after them.
allCloseCpuSets := [][]cpuset.CPUSet{}
for _, devPath := range ta.options.preferCloseToDevices {
if closeCpuSets := ta.topologyHintCpus(devPath); len(closeCpuSets) > 0 {
allCloseCpuSets = append(allCloseCpuSets, closeCpuSets)
}
}
for _, devPath := range ta.options.preferFarFromDevices {
for _, farCpuSet := range ta.topologyHintCpus(devPath) {
allCloseCpuSets = append(allCloseCpuSets, []cpuset.CPUSet{freeCpus.Difference(farCpuSet)})
}
}
if len(allCloseCpuSets) == 0 {
return ta.nextCpuResizer(resizers, currentCpus, freeCpus, delta)
}
if delta > 0 {
// Allocate N=delta CPUs from freeCpus based on topology hints.
// Build a new set of freeCpus with at least N CPUs based on
// intersection with CPU hints.
// In case of conflicting topology hints the first
// hints in the list are the most important.
remainingFreeCpus := freeCpus
appliedHints := 0
totalHints := 0
for _, closeCpuSets := range allCloseCpuSets {
for _, cpus := range closeCpuSets {
totalHints++
newRemainingFreeCpus := remainingFreeCpus.Intersection(cpus)
if newRemainingFreeCpus.Size() >= delta {
appliedHints++
log.Debugf(" - take hinted cpus %q, common free %q", cpus, newRemainingFreeCpus)
remainingFreeCpus = newRemainingFreeCpus
} else {
log.Debugf(" - drop hinted cpus %q, not enough common free in %q", cpus, newRemainingFreeCpus)
}
}
}
log.Debugf(" - original free cpus %q, took %d/%d hints, remaining free: %q",
freeCpus, appliedHints, totalHints, remainingFreeCpus)
return ta.nextCpuResizer(resizers, currentCpus, remainingFreeCpus, delta)
} else if delta < 0 {
// Free N=-delta CPUs from currentCpus based on topology hints.
// 1. Sort currentCpus based on topology hints (leastHintedCpus).
// 2. Pick largest hint value that has to be released (maxHints).
// 3. Free all CPUs that have a hint value smaller than maxHints.
// 4. Let next CPU resizer choose CPUs to be freed among
// CPUs with hint value maxHints.
currentCpuHints := map[int]uint64{}
for hintPriority, closeCpuSets := range allCloseCpuSets {
for _, cpus := range closeCpuSets {
for _, cpu := range cpus.Intersection(currentCpus).UnsortedList() {
currentCpuHints[cpu] += 1 << (len(allCloseCpuSets) - 1 - hintPriority)
}
}
}
leastHintedCpus := currentCpus.UnsortedList()
sort.Slice(leastHintedCpus, func(i, j int) bool {
return currentCpuHints[leastHintedCpus[i]] < currentCpuHints[leastHintedCpus[j]]
})
maxHints := currentCpuHints[leastHintedCpus[-delta]]
currentToFreeForSure := cpuset.New()
currentToFreeMaybe := cpuset.New()
for i := 0; i < len(leastHintedCpus) && currentCpuHints[leastHintedCpus[i]] <= maxHints; i++ {
if currentCpuHints[leastHintedCpus[i]] < maxHints {
currentToFreeForSure = currentToFreeForSure.Union(cpuset.New(leastHintedCpus[i]))
} else {
currentToFreeMaybe = currentToFreeMaybe.Union(cpuset.New(leastHintedCpus[i]))
}
}
remainingDelta := delta + currentToFreeForSure.Size()
log.Debugf(" - device hints: from cpus %q: free for sure: %q and %d more from: %q",
currentCpus, currentToFreeForSure, -remainingDelta, currentToFreeMaybe)
_, freeFromMaybe, err := ta.nextCpuResizer(resizers, currentToFreeMaybe, freeCpus, remainingDelta)
// Do not include possible extra CPUs from
// freeFromMaybe to make sure that all CPUs with least
// hints will be freed.
for _, cpu := range freeFromMaybe.UnsortedList() {
if currentToFreeForSure.Size() >= -delta {
break
}
currentToFreeForSure = currentToFreeForSure.Union(cpuset.New(cpu))
}
return freeCpus, currentToFreeForSure, err
}
return freeCpus, currentCpus, nil
}
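
The release path above weights each current CPU by the hints that cover it, giving earlier (higher-priority) hint sets larger weights via 1 << (len(allCloseCpuSets) - 1 - hintPriority), and then frees the least-weighted CPUs first. Here is a small worked sketch of that weighting with made-up CPU numbers and two hint sets; it is not part of the diff.

package main

import (
	"fmt"
	"sort"
)

// Two hint sets, the first one higher priority: CPUs covered by the
// first set get weight 2 (1<<1), by the second set weight 1 (1<<0).
// CPUs covered by no hint sort first and are released first.
func main() {
	currentCpus := []int{0, 1, 2, 3}
	hintSets := [][]int{
		{0, 1}, // highest priority: CPUs close to the most important device
		{0, 2}, // lower priority
	}
	hints := map[int]uint64{}
	for prio, cpus := range hintSets {
		for _, cpu := range cpus {
			hints[cpu] += 1 << (len(hintSets) - 1 - prio)
		}
	}
	sort.Slice(currentCpus, func(i, j int) bool {
		return hints[currentCpus[i]] < hints[currentCpus[j]]
	})
	// Weights: cpu0=3, cpu1=2, cpu2=1, cpu3=0.
	fmt.Println(currentCpus) // [3 2 1 0]: cpu3 is the first candidate to free
}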

// topologyHintCpus returns the cached topology hint cpusets of a
// device, logging an error only once per unresolvable device.
func (ta *cpuTreeAllocator) topologyHintCpus(dev string) []cpuset.CPUSet {
if closeCpuSets, ok := ta.cacheCloseCpuSets[dev]; ok {
return closeCpuSets
}
topologyHints, err := topology.NewTopologyHints(dev)
if err != nil {
log.Errorf("failed to find topology of device %q: %v", dev, err)
ta.cacheCloseCpuSets[dev] = []cpuset.CPUSet{}
} else {
for _, topologyHint := range topologyHints {
ta.cacheCloseCpuSets[dev] = append(ta.cacheCloseCpuSets[dev], cpuset.MustParse(topologyHint.CPUs))
}
}
return ta.cacheCloseCpuSets[dev]
}
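
topologyHintCpus memoizes hint lookups per device path, so a device whose topology cannot be resolved is reported only once. A minimal sketch of that caching pattern, with a stand-in resolver in place of topology.NewTopologyHints and made-up device paths:

package main

import "fmt"

// hintCache caches resolved CPU hints per device path; failed lookups
// are cached as empty results so the error is logged only once.
type hintCache struct {
	cache   map[string][]string
	resolve func(dev string) ([]string, error)
}

func (c *hintCache) cpusFor(dev string) []string {
	if cpus, ok := c.cache[dev]; ok {
		return cpus
	}
	cpus, err := c.resolve(dev)
	if err != nil {
		fmt.Printf("failed to find topology of device %q: %v\n", dev, err)
		cpus = []string{}
	}
	c.cache[dev] = cpus
	return cpus
}

func main() {
	c := &hintCache{
		cache: map[string][]string{},
		resolve: func(dev string) ([]string, error) {
			if dev == "/dev/bad" {
				return nil, fmt.Errorf("no such device")
			}
			return []string{"0-3"}, nil // made-up CPU range
		},
	}
	fmt.Println(c.cpusFor("/sys/class/net/eth0")) // [0-3]
	c.cpusFor("/dev/bad")                         // logs once
	c.cpusFor("/dev/bad")                         // silent cache hit
}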

func (ta *cpuTreeAllocator) resizeCpusOneAtATime(resizers []cpuResizerFunc, currentCpus, freeCpus cpuset.CPUSet, delta int) (cpuset.CPUSet, cpuset.CPUSet, error) {
if delta > 0 {
addFromSuperset, removeFromSuperset, err := ta.resizeCpus(currentCpus, freeCpus, delta)
addFromSuperset, removeFromSuperset, err := ta.nextCpuResizer(resizers, currentCpus, freeCpus, delta)
if !ta.options.preferSpreadOnPhysicalCores || addFromSuperset.Size() == delta {
return addFromSuperset, removeFromSuperset, err
}
@@ -515,7 +686,7 @@ func (ta *cpuTreeAllocator) ResizeCpus(currentCpus, freeCpus cpuset.CPUSet, delt
// set by adding one CPU at a time.
addFrom := cpuset.New()
for n := 0; n < delta; n++ {
addSingleFrom, _, err := ta.resizeCpus(currentCpus, freeCpus, 1)
addSingleFrom, _, err := ta.nextCpuResizer(resizers, currentCpus, freeCpus, 1)
if err != nil {
return addFromSuperset, removeFromSuperset, err
}
@@ -540,7 +711,7 @@ func (ta *cpuTreeAllocator) ResizeCpus(currentCpus, freeCpus cpuset.CPUSet, delt
removeFrom := cpuset.New()
addFrom := cpuset.New()
for n := 0; n < -delta; n++ {
_, removeSingleFrom, err := ta.resizeCpus(currentCpus, freeCpus, -1)
_, removeSingleFrom, err := ta.nextCpuResizer(resizers, currentCpus, freeCpus, -1)
if err != nil {
return addFrom, removeFrom, err
}
@@ -563,7 +734,7 @@ func (ta *cpuTreeAllocator) ResizeCpus(currentCpus, freeCpus cpuset.CPUSet, delt
return addFrom, removeFrom, nil
}

func (ta *cpuTreeAllocator) resizeCpus(currentCpus, freeCpus cpuset.CPUSet, delta int) (cpuset.CPUSet, cpuset.CPUSet, error) {
func (ta *cpuTreeAllocator) resizeCpusMaxLocalSet(resizers []cpuResizerFunc, currentCpus, freeCpus cpuset.CPUSet, delta int) (cpuset.CPUSet, cpuset.CPUSet, error) {
tnas := ta.root.ToAttributedSlice(currentCpus, freeCpus,
func(tna *cpuTreeNodeAttributes) bool {
// filter out branches with insufficient cpus
@@ -587,5 +758,5 @@ func (ta *cpuTreeAllocator) resizeCpus(currentCpus, freeCpus cpuset.CPUSet, delt
if len(tnas) == 0 {
return freeCpus, currentCpus, fmt.Errorf("not enough free CPUs")
}
return tnas[0].freeCpus, tnas[0].currentCpus, nil
return ta.nextCpuResizer(resizers, tnas[0].currentCpus, tnas[0].freeCpus, delta)
}