[release-4.17] OCPBUGS-43820: UPSTREAM: <carry>: kubelet/cm: fix bug where kubelet restarts from missing cpuset cgroup #2126

Open
wants to merge 4 commits into base: release-4.17
3 changes: 3 additions & 0 deletions pkg/kubelet/cm/cgroup_manager_linux.go
@@ -384,6 +384,9 @@ func (m *cgroupManagerImpl) toResources(resourceConfig *ResourceConfig) *libcont
if resourceConfig.PidsLimit != nil {
resources.PidsLimit = *resourceConfig.PidsLimit
}
if !resourceConfig.CPUSet.IsEmpty() {
resources.CpusetCpus = resourceConfig.CPUSet.String()
}

m.maybeSetHugetlb(resourceConfig, resources)

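For context, the hunk above only propagates a non-empty set into the libcontainer resources. A minimal sketch of that conversion using k8s.io/utils/cpuset follows; resourcesStub is a hypothetical stand-in for libcontainer's Resources struct, not the real type:

```go
package main

import (
	"fmt"

	"k8s.io/utils/cpuset"
)

// resourcesStub stands in for libcontainer's Resources struct; only the
// CpusetCpus field touched by this hunk is modeled (an assumption made
// purely for illustration).
type resourcesStub struct {
	CpusetCpus string
}

func main() {
	for _, set := range []cpuset.CPUSet{cpuset.New(), cpuset.New(0, 1, 2, 3)} {
		res := &resourcesStub{}
		// Mirror the guard above: an empty CPUSet leaves CpusetCpus unset,
		// so no cpuset value is written for that cgroup.
		if !set.IsEmpty() {
			// String() renders the canonical cpuset list format, e.g. "0-3".
			res.CpusetCpus = set.String()
		}
		fmt.Printf("CPUSet %q -> CpusetCpus %q\n", set.String(), res.CpusetCpus)
	}
}
```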
24 changes: 19 additions & 5 deletions pkg/kubelet/cm/cpumanager/cpu_manager.go
@@ -94,6 +94,10 @@ type Manager interface {
// GetCPUAffinity returns cpuset which includes cpus from shared pools
// as well as exclusively allocated cpus
GetCPUAffinity(podUID, containerName string) cpuset.CPUSet

// GetAllCPUs returns all the CPUs known by cpumanager, as reported by the
// hardware discovery. Maps to the CPU capacity.
GetAllCPUs() cpuset.CPUSet
}

type manager struct {
@@ -137,7 +141,11 @@ type manager struct {
// stateFileDirectory holds the directory where the state file for checkpoints is held.
stateFileDirectory string

// allocatableCPUs is the set of online CPUs as reported by the system
// allCPUs is the set of online CPUs as reported by the system
allCPUs cpuset.CPUSet

// allocatableCPUs is the set of online CPUs as reported by the system,
// and available for allocation, minus the reserved set
allocatableCPUs cpuset.CPUSet

// pendingAdmissionPod contains the pod during the admission phase
@@ -157,6 +165,11 @@ func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconc
var policy Policy
var err error

topo, err = topology.Discover(machineInfo)
if err != nil {
return nil, err
}

switch policyName(cpuPolicyName) {

case PolicyNone:
@@ -166,10 +179,6 @@ func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconc
}

case PolicyStatic:
topo, err = topology.Discover(machineInfo)
if err != nil {
return nil, err
}
klog.InfoS("Detected CPU topology", "topology", topo)

reservedCPUs, ok := nodeAllocatableReservation[v1.ResourceCPU]
@@ -206,6 +215,7 @@ func NewManager(cpuPolicyName string, cpuPolicyOptions map[string]string, reconc
topology: topo,
nodeAllocatableReservation: nodeAllocatableReservation,
stateFileDirectory: stateFileDirectory,
allCPUs: topo.CPUDetails.CPUs(),
}
manager.sourcesReady = &sourcesReadyStub{}
return manager, nil
@@ -340,6 +350,10 @@ func (m *manager) GetAllocatableCPUs() cpuset.CPUSet {
return m.allocatableCPUs.Clone()
}

func (m *manager) GetAllCPUs() cpuset.CPUSet {
return m.allCPUs.Clone()
}

type reconciledContainer struct {
podName string
containerName string
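As a rough illustration of how the two accessors relate (the CPU numbers and reserved set below are made up, not taken from the patch): allocatableCPUs is allCPUs minus the reserved set under the static policy, while the none policy reports an empty allocatable set, which is what the node container manager keys off later.

```go
package main

import (
	"fmt"

	"k8s.io/utils/cpuset"
)

func main() {
	// Hypothetical 8-CPU machine: what GetAllCPUs() would report after
	// topology discovery (topo.CPUDetails.CPUs() in the patch).
	all := cpuset.New(0, 1, 2, 3, 4, 5, 6, 7)

	// Example reserved set (e.g. derived from the node allocatable
	// reservation); only the static policy subtracts it.
	reserved := cpuset.New(0, 1)

	fmt.Println("GetAllCPUs:                 ", all.String())                      // "0-7"
	fmt.Println("GetAllocatableCPUs (static):", all.Difference(reserved).String()) // "2-7"
	fmt.Println("GetAllocatableCPUs (none):  ", cpuset.New().String())             // "" (empty set)
}
```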
11 changes: 2 additions & 9 deletions pkg/kubelet/cm/cpumanager/cpu_manager_test.go
@@ -693,15 +693,8 @@ func TestCPUManagerGenerate(t *testing.T) {
if rawMgr.policy.Name() != testCase.expectedPolicy {
t.Errorf("Unexpected policy name. Have: %q wants %q", rawMgr.policy.Name(), testCase.expectedPolicy)
}
if rawMgr.policy.Name() == string(PolicyNone) {
if rawMgr.topology != nil {
t.Errorf("Expected topology to be nil for 'none' policy. Have: %q", rawMgr.topology)
}
}
if rawMgr.policy.Name() != string(PolicyNone) {
if rawMgr.topology == nil {
t.Errorf("Expected topology to be non-nil for policy '%v'. Have: %q", rawMgr.policy.Name(), rawMgr.topology)
}
if rawMgr.topology == nil {
t.Errorf("Expected topology to be non-nil for policy '%v'. Have: %q", rawMgr.policy.Name(), rawMgr.topology)
}
}
})
5 changes: 5 additions & 0 deletions pkg/kubelet/cm/cpumanager/fake_cpu_manager.go
@@ -85,6 +85,11 @@ func (m *fakeManager) GetCPUAffinity(podUID, containerName string) cpuset.CPUSet
return cpuset.CPUSet{}
}

func (m *fakeManager) GetAllCPUs() cpuset.CPUSet {
klog.InfoS("GetAllCPUs")
return cpuset.CPUSet{}
}

// NewFakeManager creates empty/fake cpu manager
func NewFakeManager() Manager {
return &fakeManager{
31 changes: 22 additions & 9 deletions pkg/kubelet/cm/node_container_manager_linux.go
@@ -52,7 +52,7 @@ func (cm *containerManagerImpl) createNodeAllocatableCgroups() error {
cgroupConfig := &CgroupConfig{
Name: cm.cgroupRoot,
// The default limits for cpu shares can be very low which can lead to CPU starvation for pods.
ResourceParameters: getCgroupConfig(nodeAllocatable),
ResourceParameters: cm.getCgroupConfig(nodeAllocatable),
}
if cm.cgroupManager.Exists(cgroupConfig.Name) {
return nil
@@ -80,7 +80,7 @@ func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {

cgroupConfig := &CgroupConfig{
Name: cm.cgroupRoot,
ResourceParameters: getCgroupConfig(nodeAllocatable),
ResourceParameters: cm.getCgroupConfig(nodeAllocatable),
}

// Using ObjectReference for events as the node maybe not cached; refer to #42701 for detail.
@@ -114,7 +114,7 @@ func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {
// Now apply kube reserved and system reserved limits if required.
if nc.EnforceNodeAllocatable.Has(kubetypes.SystemReservedEnforcementKey) {
klog.V(2).InfoS("Enforcing system reserved on cgroup", "cgroupName", nc.SystemReservedCgroupName, "limits", nc.SystemReserved)
if err := enforceExistingCgroup(cm.cgroupManager, cm.cgroupManager.CgroupName(nc.SystemReservedCgroupName), nc.SystemReserved); err != nil {
if err := cm.enforceExistingCgroup(nc.SystemReservedCgroupName, nc.SystemReserved); err != nil {
message := fmt.Sprintf("Failed to enforce System Reserved Cgroup Limits on %q: %v", nc.SystemReservedCgroupName, err)
cm.recorder.Event(nodeRef, v1.EventTypeWarning, events.FailedNodeAllocatableEnforcement, message)
return fmt.Errorf(message)
@@ -123,7 +123,7 @@ func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {
}
if nc.EnforceNodeAllocatable.Has(kubetypes.KubeReservedEnforcementKey) {
klog.V(2).InfoS("Enforcing kube reserved on cgroup", "cgroupName", nc.KubeReservedCgroupName, "limits", nc.KubeReserved)
if err := enforceExistingCgroup(cm.cgroupManager, cm.cgroupManager.CgroupName(nc.KubeReservedCgroupName), nc.KubeReserved); err != nil {
if err := cm.enforceExistingCgroup(nc.KubeReservedCgroupName, nc.KubeReserved); err != nil {
message := fmt.Sprintf("Failed to enforce Kube Reserved Cgroup Limits on %q: %v", nc.KubeReservedCgroupName, err)
cm.recorder.Event(nodeRef, v1.EventTypeWarning, events.FailedNodeAllocatableEnforcement, message)
return fmt.Errorf(message)
@@ -134,8 +134,9 @@ func (cm *containerManagerImpl) enforceNodeAllocatableCgroups() error {
}

// enforceExistingCgroup updates the limits `rl` on the existing cgroup named `cNameStr`, resolved and applied through the container manager's cgroupManager.
func enforceExistingCgroup(cgroupManager CgroupManager, cName CgroupName, rl v1.ResourceList) error {
rp := getCgroupConfig(rl)
func (cm *containerManagerImpl) enforceExistingCgroup(cNameStr string, rl v1.ResourceList) error {
cName := cm.cgroupManager.CgroupName(cNameStr)
rp := cm.getCgroupConfig(rl)
if rp == nil {
return fmt.Errorf("%q cgroup is not configured properly", cName)
}
@@ -156,17 +157,17 @@ func enforceExistingCgroup(cgroupManager CgroupManager, cName CgroupName, rl v1.
ResourceParameters: rp,
}
klog.V(4).InfoS("Enforcing limits on cgroup", "cgroupName", cName, "cpuShares", cgroupConfig.ResourceParameters.CPUShares, "memory", cgroupConfig.ResourceParameters.Memory, "pidsLimit", cgroupConfig.ResourceParameters.PidsLimit)
if err := cgroupManager.Validate(cgroupConfig.Name); err != nil {
if err := cm.cgroupManager.Validate(cgroupConfig.Name); err != nil {
return err
}
if err := cgroupManager.Update(cgroupConfig); err != nil {
if err := cm.cgroupManager.Update(cgroupConfig); err != nil {
return err
}
return nil
}

// getCgroupConfig returns a ResourceConfig object that can be used to create or update cgroups via CgroupManager interface.
func getCgroupConfig(rl v1.ResourceList) *ResourceConfig {
func (cm *containerManagerImpl) getCgroupConfig(rl v1.ResourceList) *ResourceConfig {
// TODO(vishh): Set CPU Quota if necessary.
if rl == nil {
return nil
@@ -188,6 +189,18 @@ func getCgroupConfig(rl v1.ResourceList) *ResourceConfig {
}
rc.HugePageLimit = HugePageLimits(rl)

// In the case of a None policy, cgroupv2 and systemd cgroup manager, we must make sure systemd is aware of the cpuset cgroup.
// By default, systemd will not create it, as we've not chosen to delegate it, and we haven't included it in the Apply() request.
// However, this causes a bug where kubelet restarts unnecessarily (cpuset cgroup is created in the cgroupfs, but systemd
// doesn't know about it and deletes it, and then kubelet doesn't continue because the cgroup isn't configured as expected).
// An alternative is to delegate the `cpuset` cgroup to the kubelet, but that would require some plumbing in libcontainer,
// and this is sufficient.
// Only do so on None policy, as Static policy will do its own updating of the cpuset.
// Please see the comment on policy none's GetAllocatableCPUs
if cm.cpuManager.GetAllocatableCPUs().IsEmpty() {
rc.CPUSet = cm.cpuManager.GetAllCPUs()
}

return &rc
}

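The branch added at the end of getCgroupConfig boils down to the following hypothetical helper (a sketch only; the real method also fills memory, CPU shares, pids limit and hugepage limits from the ResourceList):

```go
package main

import (
	"fmt"

	"k8s.io/utils/cpuset"
)

// nodeCgroupCPUSet is a hypothetical distillation of the new branch in
// getCgroupConfig: pin the node-level cgroup to every discovered CPU only
// when the CPU manager reports no allocatable CPUs (i.e. the none policy).
func nodeCgroupCPUSet(allocatable, all cpuset.CPUSet) cpuset.CPUSet {
	if allocatable.IsEmpty() {
		// none policy: include the full set in the config handed to the
		// cgroup manager so systemd keeps the cpuset controller around.
		return all
	}
	// static policy: the CPU manager maintains the cpuset itself, so the
	// node-level ResourceConfig leaves CPUSet empty.
	return cpuset.New()
}

func main() {
	all := cpuset.New(0, 1, 2, 3)
	fmt.Println(nodeCgroupCPUSet(cpuset.New(), all).String())     // none policy   -> "0-3"
	fmt.Println(nodeCgroupCPUSet(cpuset.New(2, 3), all).String()) // static policy -> ""
}
```

Turning getCgroupConfig and enforceExistingCgroup into methods on containerManagerImpl, rather than free functions, is what gives them access to cm.cpuManager and cm.cgroupManager for this check.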
3 changes: 3 additions & 0 deletions pkg/kubelet/cm/types.go
@@ -19,12 +19,15 @@ package cm
import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/utils/cpuset"
)

// ResourceConfig holds information about all the supported cgroup resource parameters.
type ResourceConfig struct {
// Memory limit (in bytes).
Memory *int64
// CPU set (number of cpus the cgroup has access to).
CPUSet cpuset.CPUSet
// CPU shares (relative weight vs. other containers).
CPUShares *uint64
// CPU hardcap limit (in usecs). Allowed cpu time in a given period.
57 changes: 57 additions & 0 deletions test/e2e_node/node_container_manager_test.go
@@ -72,6 +72,63 @@ var _ = SIGDescribe("Node Container Manager", framework.WithSerial(), func() {
framework.ExpectNoError(runTest(ctx, f))
})
})
f.Describe("Validate CGroup management", func() {
// Regression test for https://issues.k8s.io/125923
// That issue involves a race with systemd which seems to manifest most likely, or perhaps only
// (the data gathered so far is inconclusive), on the very first boot of the machine, so restarting the
// kubelet does not seem sufficient. OTOH, the exact reproducer seems to require a dedicated lane running
// only this test, or rebooting the machine before running this test. Both are practically unrealistic in CI.
// The closest approximation is this test in its current form, using a kubelet restart. This at least
// acts as non-regression testing, so it still brings value.
ginkgo.It("should correctly start with cpumanager none policy in use with systemd", func(ctx context.Context) {
if !IsCgroup2UnifiedMode() {
ginkgo.Skip("this test requires cgroups v2")
}

var err error
var oldCfg *kubeletconfig.KubeletConfiguration
// Get current kubelet configuration
oldCfg, err = getCurrentKubeletConfig(ctx)
framework.ExpectNoError(err)

ginkgo.DeferCleanup(func(ctx context.Context) {
if oldCfg != nil {
// Update the Kubelet configuration.
framework.ExpectNoError(e2enodekubelet.WriteKubeletConfigFile(oldCfg))

ginkgo.By("Restarting the kubelet")
restartKubelet(true)

// wait until the kubelet health check succeeds
gomega.Eventually(ctx, func(ctx context.Context) bool {
return kubeletHealthCheck(kubeletHealthCheckURL)
}).WithTimeout(2 * time.Minute).WithPolling(5 * time.Second).Should(gomega.BeTrueBecause("expected kubelet to be in healthy state"))
ginkgo.By("Started the kubelet")
}
})

newCfg := oldCfg.DeepCopy()
// Change existing kubelet configuration
newCfg.CPUManagerPolicy = "none"
newCfg.CgroupDriver = "systemd"

// Update the Kubelet configuration.
framework.ExpectNoError(e2enodekubelet.WriteKubeletConfigFile(newCfg))

ginkgo.By("Restarting the kubelet")
restartKubelet(true)

// wait until the kubelet health check succeeds
gomega.Eventually(ctx, func(ctx context.Context) bool {
return kubeletHealthCheck(kubeletHealthCheckURL)
}).WithTimeout(2 * time.Minute).WithPolling(5 * time.Second).Should(gomega.BeTrueBecause("expected kubelet to be in healthy state"))
ginkgo.By("Started the kubelet")

gomega.Consistently(ctx, func(ctx context.Context) bool {
return getNodeReadyStatus(ctx, f) && kubeletHealthCheck(kubeletHealthCheckURL)
}).WithTimeout(2 * time.Minute).WithPolling(2 * time.Second).Should(gomega.BeTrueBecause("node keeps reporting ready status"))
})
})
})

func expectFileValToEqual(filePath string, expectedValue, delta int64) error {
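For manual verification on a cgroup v2 node using the systemd driver, a check along these lines can confirm the cpuset controller for the kubepods cgroup survives a kubelet restart; the cgroup path below is an assumption for illustration and is not part of the patch or the e2e test:

```go
package main

import (
	"fmt"
	"os"
	"strings"

	"k8s.io/utils/cpuset"
)

func main() {
	// Assumed location of the kubepods cgroup on a cgroup v2 host using the
	// systemd cgroup driver; adjust for the node's cgroupRoot if different.
	const path = "/sys/fs/cgroup/kubepods.slice/cpuset.cpus.effective"

	data, err := os.ReadFile(path)
	if err != nil {
		fmt.Fprintln(os.Stderr, "cpuset cgroup missing or unreadable:", err)
		os.Exit(1)
	}
	set, err := cpuset.Parse(strings.TrimSpace(string(data)))
	if err != nil {
		fmt.Fprintln(os.Stderr, "unexpected cpuset format:", err)
		os.Exit(1)
	}
	// With the fix in place and the none policy, this should span every
	// online CPU rather than disappearing after systemd reconciles cgroups.
	fmt.Printf("kubepods cpuset: %s (%d CPUs)\n", set.String(), set.Size())
}
```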
Expand Down