From 9f47f1d903da44218f02ce93a9b526f06ee70fa3 Mon Sep 17 00:00:00 2001 From: Zhang Kang Date: Tue, 30 Apr 2024 16:11:42 +0800 Subject: [PATCH] add pod resctrl Signed-off-by: Zhang Kang --- .../plugins/resctrl/resctrl_reconcile.go | 5 + pkg/koordlet/resourceexecutor/config.go | 1 + .../resourceexecutor/resctrl_updater.go | 73 ++++ .../resourceexecutor/resctrl_updater_test.go | 44 +++ pkg/koordlet/runtimehooks/config.go | 9 + pkg/koordlet/runtimehooks/hooks/hooks.go | 7 +- .../runtimehooks/hooks/resctrl/resctrl.go | 320 ++++++++++++++++++ .../runtimehooks/hooks/resctrl/rule.go | 72 ++++ pkg/koordlet/runtimehooks/nri/server.go | 22 +- .../protocol/container_context.go | 5 + .../runtimehooks/protocol/pod_context.go | 59 +++- .../runtimehooks/protocol/protocol.go | 24 ++ .../runtimehooks/reconciler/reconciler.go | 117 ++++++- pkg/koordlet/runtimehooks/runtimehooks.go | 5 +- pkg/koordlet/statesinformer/api.go | 6 +- .../statesinformer/impl/states_pods.go | 71 +++- .../statesinformer/impl/states_pods_test.go | 3 + pkg/koordlet/util/resctrl/ctrl_mgr.go | 194 +++++++++++ pkg/koordlet/util/resctrl/resctrl.go | 319 +++++++++++++++++ pkg/koordlet/util/system/resctrl.go | 75 +++- pkg/runtimeproxy/config/config.go | 1 + 21 files changed, 1409 insertions(+), 23 deletions(-) create mode 100644 pkg/koordlet/runtimehooks/hooks/resctrl/resctrl.go create mode 100644 pkg/koordlet/runtimehooks/hooks/resctrl/rule.go create mode 100644 pkg/koordlet/util/resctrl/ctrl_mgr.go create mode 100644 pkg/koordlet/util/resctrl/resctrl.go diff --git a/pkg/koordlet/qosmanager/plugins/resctrl/resctrl_reconcile.go b/pkg/koordlet/qosmanager/plugins/resctrl/resctrl_reconcile.go index 973d12d75..2bd49f2c1 100644 --- a/pkg/koordlet/qosmanager/plugins/resctrl/resctrl_reconcile.go +++ b/pkg/koordlet/qosmanager/plugins/resctrl/resctrl_reconcile.go @@ -459,6 +459,11 @@ func (r *resctrlReconcile) reconcileResctrlGroups(qosStrategy *slov1alpha1.Resou podsMeta := r.statesInformer.GetAllPods() for _, podMeta := range podsMeta { pod := podMeta.Pod + + // only QoS class level pod are considered + if _, ok := pod.Annotations[extension.AnnotationResctrl]; ok { + continue + } // only Running and Pending pods are considered if pod.Status.Phase != corev1.PodRunning && pod.Status.Phase != corev1.PodPending { continue diff --git a/pkg/koordlet/resourceexecutor/config.go b/pkg/koordlet/resourceexecutor/config.go index c66ca6e54..b0ab184b0 100644 --- a/pkg/koordlet/resourceexecutor/config.go +++ b/pkg/koordlet/resourceexecutor/config.go @@ -21,6 +21,7 @@ import "flag" const ( ReasonUpdateCgroups = "UpdateCgroups" ReasonUpdateSystemConfig = "UpdateSystemConfig" + CreateCATGroup = "CreateCATGroup" ReasonUpdateResctrl = "UpdateResctrl" // update resctrl tasks, schemata EvictPodByNodeMemoryUsage = "EvictPodByNodeMemoryUsage" diff --git a/pkg/koordlet/resourceexecutor/resctrl_updater.go b/pkg/koordlet/resourceexecutor/resctrl_updater.go index ba8a19b63..b0b64eb80 100644 --- a/pkg/koordlet/resourceexecutor/resctrl_updater.go +++ b/pkg/koordlet/resourceexecutor/resctrl_updater.go @@ -55,6 +55,64 @@ func (r *ResctrlSchemataResourceUpdater) Clone() ResourceUpdater { } } +func NewResctrlSchemataResource(group, schemata string, e *audit.EventHelper) (ResourceUpdater, error) { + if schemata == "" { + return nil, fmt.Errorf("schemata is nil") + } + schemataFile := sysutil.ResctrlSchemata.Path(group) + schemataKey := sysutil.ResctrlSchemataName + ":" + schemataFile + // The current assumption is that the cache ids obtained through + // resctrl schemata will not go wrong. TODO: Use the ability of node info + // to obtain cache ids to replace the current method. + ids, _ := sysutil.CacheIdsCacheFunc() + schemataRaw := sysutil.NewResctrlSchemataRaw(ids).WithL3Num(len(ids)) + err := schemataRaw.ParseResctrlSchemata(schemata, -1) + if err != nil { + klog.Errorf("failed to parse %v", err) + } + items := []string{} + for _, item := range []struct { + validFunc func() (bool, string) + value func() string + }{ + {validFunc: schemataRaw.ValidateL3, value: schemataRaw.L3String}, + {validFunc: schemataRaw.ValidateMB, value: schemataRaw.MBString}, + } { + if valid, _ := item.validFunc(); valid { + items = append(items, item.value()) + } + } + schemataStr := strings.Join(items, "") + klog.V(6).Infof("generate new resctrl schemata resource, file %s, key %s, value %s, schemata %s", + schemataFile, schemataKey, schemataStr, schemata) + return &ResctrlSchemataResourceUpdater{ + DefaultResourceUpdater: DefaultResourceUpdater{ + key: schemataKey, + file: schemataFile, + value: schemataStr, + updateFunc: UpdateResctrlSchemataFunc, + eventHelper: e, + }, + schemataRaw: schemataRaw, + }, err +} + +func NewCatGroupResource(group string, e *audit.EventHelper) (ResourceUpdater, error) { + if group == "" { + return nil, fmt.Errorf("group is nil") + } + schemataFile := sysutil.ResctrlSchemata.Path(group) + + klog.V(6).Infof("generate new cat group resource, file %s", schemataFile) + return &DefaultResourceUpdater{ + key: group, + file: schemataFile, + value: "", + updateFunc: InitCatGroupFunc, + eventHelper: e, + }, nil +} + func NewResctrlL3SchemataResource(group, schemataDelta string, l3Num int) ResourceUpdater { schemataFile := sysutil.ResctrlSchemata.Path(group) l3SchemataKey := sysutil.L3SchemataPrefix + ":" + schemataFile @@ -116,6 +174,21 @@ func CalculateResctrlL3TasksResource(group string, taskIds []int32) (ResourceUpd return NewCommonDefaultUpdaterWithUpdateFunc(tasksPath, tasksPath, builder.String(), UpdateResctrlTasksFunc, eventHelper) } +func InitCatGroupFunc(u ResourceUpdater) error { + r, ok := u.(*DefaultResourceUpdater) + if !ok { + return fmt.Errorf("not a ResctrlSchemataResourceUpdater") + } + + err := sysutil.InitCatGroupIfNotExist(r.key) + if err != nil { + return err + } + _ = audit.V(3).Reason(CreateCATGroup).Message("Create %v to %v", u.Key(), u.Value()).Do() + + return nil +} + func UpdateResctrlSchemataFunc(u ResourceUpdater) error { r, ok := u.(*ResctrlSchemataResourceUpdater) if !ok { diff --git a/pkg/koordlet/resourceexecutor/resctrl_updater_test.go b/pkg/koordlet/resourceexecutor/resctrl_updater_test.go index 201927cf1..6b579899a 100644 --- a/pkg/koordlet/resourceexecutor/resctrl_updater_test.go +++ b/pkg/koordlet/resourceexecutor/resctrl_updater_test.go @@ -112,3 +112,47 @@ func TestNewResctrlMbSchemataResource(t *testing.T) { assert.NoError(t, err) }) } + +func TestNewResctrlSchemataResource(t *testing.T) { + t.Run("test_all_schemata", func(t *testing.T) { + helper := system.NewFileTestUtil(t) + defer helper.Cleanup() + + sysFSRootDirName := "NewResctrlSchemataResource" + helper.MkDirAll(sysFSRootDirName) + system.Conf.SysFSRootDir = filepath.Join(helper.TempDir, sysFSRootDirName) + testingPrepareResctrlL3CatGroups(t, "7ff", " L3:0=ff;1=ff\n MB:0=100;1=100") + updater, _ := NewResctrlSchemataResource("BE", "L3:0=f;1=f\nMB:0=60;1=60", nil) + assert.Equal(t, "L3:0=f;1=f;\nMB:0=60;1=60;\n", updater.Value()) + err := updater.update() + assert.NoError(t, err) + }) + + t.Run("test_LLC_resource", func(t *testing.T) { + helper := system.NewFileTestUtil(t) + defer helper.Cleanup() + + sysFSRootDirName := "NewResctrlSchemataResourceSingleLLC" + helper.MkDirAll(sysFSRootDirName) + system.Conf.SysFSRootDir = filepath.Join(helper.TempDir, sysFSRootDirName) + testingPrepareResctrlL3CatGroups(t, "7ff", " L3:0=ff;1=ff") + updater, _ := NewResctrlSchemataResource("BE", "L3:0=f;1=f", nil) + assert.Equal(t, "L3:0=f;1=f;\n", updater.Value()) + err := updater.update() + assert.NoError(t, err) + }) + + t.Run("test_MB_resource", func(t *testing.T) { + helper := system.NewFileTestUtil(t) + defer helper.Cleanup() + + sysFSRootDirName := "NewResctrlSchemataResourceSingleMB" + helper.MkDirAll(sysFSRootDirName) + system.Conf.SysFSRootDir = filepath.Join(helper.TempDir, sysFSRootDirName) + testingPrepareResctrlL3CatGroups(t, "", " MB:0=10;1=10") + updater, _ := NewResctrlSchemataResource("BE", "MB:0=20;1=20", nil) + assert.Equal(t, "MB:0=20;1=20;\n", updater.Value()) + err := updater.update() + assert.NoError(t, err) + }) +} diff --git a/pkg/koordlet/runtimehooks/config.go b/pkg/koordlet/runtimehooks/config.go index 294a92b7f..ee04a9275 100644 --- a/pkg/koordlet/runtimehooks/config.go +++ b/pkg/koordlet/runtimehooks/config.go @@ -31,6 +31,7 @@ import ( "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/cpuset" "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/gpu" "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/groupidentity" + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/resctrl" "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks/terwayqos" "github.com/koordinator-sh/koordinator/pkg/koordlet/util/system" ) @@ -80,6 +81,12 @@ const ( // owner: @l1b0k // alpha: v1.5 TerwayQoS featuregate.Feature = "TerwayQoS" + + // Resctrl adjusts LLC/MB value for pod. + // + // owner: @kangclzjc @saintube @zwzhang0107 + // alpha: v1.5 + Resctrl featuregate.Feature = "Resctrl" ) var ( @@ -91,6 +98,7 @@ var ( CPUNormalization: {Default: false, PreRelease: featuregate.Alpha}, CoreSched: {Default: false, PreRelease: featuregate.Alpha}, TerwayQoS: {Default: false, PreRelease: featuregate.Alpha}, + Resctrl: {Default: false, PreRelease: featuregate.Alpha}, } runtimeHookPlugins = map[featuregate.Feature]HookPlugin{ @@ -101,6 +109,7 @@ var ( CPUNormalization: cpunormalization.Object(), CoreSched: coresched.Object(), TerwayQoS: terwayqos.Object(), + Resctrl: resctrl.Object(), } ) diff --git a/pkg/koordlet/runtimehooks/hooks/hooks.go b/pkg/koordlet/runtimehooks/hooks/hooks.go index c1ca3e350..72702e1e3 100644 --- a/pkg/koordlet/runtimehooks/hooks/hooks.go +++ b/pkg/koordlet/runtimehooks/hooks/hooks.go @@ -25,6 +25,7 @@ import ( "github.com/koordinator-sh/koordinator/pkg/koordlet/metrics" "github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor" "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/protocol" + "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer" rmconfig "github.com/koordinator-sh/koordinator/pkg/runtimeproxy/config" ) @@ -36,8 +37,9 @@ type Hook struct { } type Options struct { - Reader resourceexecutor.CgroupReader - Executor resourceexecutor.ResourceUpdateExecutor + Reader resourceexecutor.CgroupReader + Executor resourceexecutor.ResourceUpdateExecutor + StatesInformer statesinformer.StatesInformer } type HookFn func(protocol.HooksProtocol) error @@ -106,6 +108,7 @@ func init() { rmconfig.PostStopContainer: make([]*Hook, 0), rmconfig.PostStopPodSandbox: make([]*Hook, 0), rmconfig.PreUpdateContainerResources: make([]*Hook, 0), + rmconfig.PreRemoveRunPodSandbox: make([]*Hook, 0), } } diff --git a/pkg/koordlet/runtimehooks/hooks/resctrl/resctrl.go b/pkg/koordlet/runtimehooks/hooks/resctrl/resctrl.go new file mode 100644 index 000000000..601608d54 --- /dev/null +++ b/pkg/koordlet/runtimehooks/hooks/resctrl/resctrl.go @@ -0,0 +1,320 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resctrl + +import ( + "fmt" + "os" + "strings" + + "k8s.io/klog/v2" + + apiext "github.com/koordinator-sh/koordinator/apis/extension" + "github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor" + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/hooks" + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/protocol" + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/reconciler" + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/rule" + "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer" + util "github.com/koordinator-sh/koordinator/pkg/koordlet/util/resctrl" + "github.com/koordinator-sh/koordinator/pkg/koordlet/util/system" + sysutil "github.com/koordinator-sh/koordinator/pkg/koordlet/util/system" + rmconfig "github.com/koordinator-sh/koordinator/pkg/runtimeproxy/config" +) + +const ( + name = "Resctrl" + description = "set resctrl for pod" + ruleNameForAllPods = name + " (AllPods)" +) + +type UpdateFunc func(resource util.ResctrlUpdater) error + +type DefaultResctrlProtocolUpdater struct { + hooksProtocol protocol.HooksProtocol + group string + schemata string + updateFunc UpdateFunc +} + +func (u DefaultResctrlProtocolUpdater) Name() string { + return "default" +} + +func (u DefaultResctrlProtocolUpdater) Key() string { + return u.group +} + +func (u DefaultResctrlProtocolUpdater) Value() string { + return u.schemata +} + +func (r *DefaultResctrlProtocolUpdater) SetKey(key string) { + r.group = key +} + +func (r *DefaultResctrlProtocolUpdater) SetValue(val string) { + r.schemata = val +} + +func (u *DefaultResctrlProtocolUpdater) Update() error { + return u.updateFunc(u) +} + +type Updater func(u DefaultResctrlProtocolUpdater) error + +func NewCreateResctrlProtocolUpdater(hooksProtocol protocol.HooksProtocol) util.ResctrlUpdater { + return &DefaultResctrlProtocolUpdater{ + hooksProtocol: hooksProtocol, + updateFunc: CreateResctrlProtocolUpdaterFunc, + } +} + +func NewRemoveResctrlProtocolUpdater(hooksProtocol protocol.HooksProtocol) util.ResctrlUpdater { + return &DefaultResctrlProtocolUpdater{ + hooksProtocol: hooksProtocol, + updateFunc: RemoveResctrlProtocolUpdaterFunc, + } +} + +func NewRemoveResctrlUpdater(group string) util.ResctrlUpdater { + return &DefaultResctrlProtocolUpdater{ + group: group, + updateFunc: RemoveResctrlUpdaterFunc, + } +} + +func CreateResctrlProtocolUpdaterFunc(u util.ResctrlUpdater) error { + r, ok := u.(*DefaultResctrlProtocolUpdater) + if !ok { + return fmt.Errorf("not a ResctrlSchemataResourceUpdater") + } + + podCtx, ok := r.hooksProtocol.(*protocol.PodContext) + if !ok { + return fmt.Errorf("pod protocol is nil for plugin %v", name) + } + if podCtx.Response.Resources.Resctrl != nil { + podCtx.Response.Resources.Resctrl.Schemata = r.Value() + podCtx.Response.Resources.Resctrl.Closid = r.Key() + } else { + resctrlInfo := &protocol.Resctrl{ + NewTaskIds: make([]int32, 0), + } + resctrlInfo.Schemata = r.Value() + resctrlInfo.Closid = r.Key() + podCtx.Response.Resources.Resctrl = resctrlInfo + } + return nil +} + +func RemoveResctrlProtocolUpdaterFunc(u util.ResctrlUpdater) error { + r, ok := u.(*DefaultResctrlProtocolUpdater) + if !ok { + return fmt.Errorf("not a ResctrlSchemataResourceUpdater") + } + resctrlInfo := &protocol.Resctrl{ + NewTaskIds: make([]int32, 0), + } + podCtx, ok := r.hooksProtocol.(*protocol.PodContext) + if !ok { + return fmt.Errorf("pod protocol is nil for plugin %v", name) + } + resctrlInfo.Closid = util.ClosdIdPrefix + podCtx.Request.PodMeta.UID + podCtx.Response.Resources.Resctrl = resctrlInfo + return nil +} + +func RemoveResctrlUpdaterFunc(u util.ResctrlUpdater) error { + r, ok := u.(*DefaultResctrlProtocolUpdater) + if !ok { + return fmt.Errorf("not a ResctrlSchemataResourceUpdater") + } + if err := os.Remove(system.GetResctrlGroupRootDirPath(r.group)); err != nil { + return err + } else { + klog.V(5).Infof("successfully remove ctrl group %s", r.group) + } + return nil +} + +type plugin struct { + rule *Rule + engine util.ResctrlEngine + executor resourceexecutor.ResourceUpdateExecutor + statesInformer statesinformer.StatesInformer +} + +var singleton *plugin + +func Object() *plugin { + if singleton == nil { + singleton = newPlugin() + } + return singleton +} + +func newPlugin() *plugin { + return &plugin{ + rule: newRule(), + } +} + +func (p *plugin) Register(op hooks.Options) { + // skip if host not support resctrl + if support, err := system.IsSupportResctrl(); err != nil { + klog.Warningf("check support resctrl failed, err: %s", err) + return + } else if !support { + klog.V(5).Infof("resctrl runtime hook skipped, cpu not support CAT/MBA") + return + } + + if vendorID, err := sysutil.GetVendorIDByCPUInfo(sysutil.GetCPUInfoPath()); err == nil { + p.engine, err = util.NewRDTEngine(vendorID) + if err != nil { + klog.Errorf("New RDT Engine failed, error is %v", err) + return + } + } + p.executor = op.Executor + p.statesInformer = op.StatesInformer + + p.engine.Rebuild() + rule.Register(ruleNameForAllPods, description, + rule.WithParseFunc(statesinformer.RegisterTypeAllPods, p.parseRuleForAllPods), + rule.WithUpdateCallback(p.ruleUpdateCbForAllPods)) + hooks.Register(rmconfig.PreRunPodSandbox, name, description+" (pod)", p.SetPodResctrlResourcesForHooks) + hooks.Register(rmconfig.PreCreateContainer, name, description+" (pod)", p.SetContainerResctrlResources) + hooks.Register(rmconfig.PreRemoveRunPodSandbox, name, description+" (pod)", p.RemovePodResctrlResources) + + reconciler.RegisterCgroupReconciler(reconciler.PodLevel, system.ResctrlSchemata, description+" (pod resctrl schema)", p.SetPodResctrlResourcesForReconciler, reconciler.NoneFilter()) + reconciler.RegisterCgroupReconciler(reconciler.PodLevel, system.ResctrlTasks, description+" (pod resctrl tasks)", p.UpdatePodTaskIds, reconciler.NoneFilter()) + reconciler.RegisterCgroupReconciler4AllPods(reconciler.AllPodsLevel, system.ResctrlRoot, description+" (pod resctl schema)", p.RemoveUnusedResctrlPath, reconciler.PodAnnotationResctrlFilter(), "resctrl") + +} + +func (p *plugin) SetPodResctrlResourcesForHooks(proto protocol.HooksProtocol) error { + return p.setPodResctrlResources(proto, true) +} + +func (p *plugin) SetPodResctrlResourcesForReconciler(proto protocol.HooksProtocol) error { + return p.setPodResctrlResources(proto, false) +} + +func (p *plugin) setPodResctrlResources(proto protocol.HooksProtocol, fromNRI bool) error { + podCtx, ok := proto.(*protocol.PodContext) + if !ok { + return fmt.Errorf("pod protocol is nil for plugin %v", name) + } + + if v, ok := podCtx.Request.Annotations[apiext.AnnotationResctrl]; ok { + app, ok := p.engine.GetApp(podCtx.Request.PodMeta.UID) + if ok && app.Annotation == v { + return nil + } + updater := NewCreateResctrlProtocolUpdater(proto) + err := p.engine.RegisterApp(podCtx.Request.PodMeta.UID, v, fromNRI, updater) + if err != nil { + return err + } + } + + return nil +} + +func (p *plugin) RemoveUnusedResctrlPath(protos []protocol.HooksProtocol) error { + currentPods := make(map[string]protocol.HooksProtocol) + + for _, proto := range protos { + podCtx, ok := proto.(*protocol.PodContext) + if !ok { + return fmt.Errorf("pod protocol is nil for plugin %v", name) + } + + if _, ok := podCtx.Request.Annotations[apiext.AnnotationResctrl]; ok { + group := podCtx.Request.PodMeta.UID + currentPods[group] = podCtx + } + } + + apps := p.engine.GetApps() + for k, v := range apps { + if _, ok := currentPods[k]; !ok { + updater := NewRemoveResctrlUpdater(v.Closid) + p.engine.UnRegisterApp(strings.TrimPrefix(v.Closid, util.ClosdIdPrefix), false, updater) + } + } + return nil +} + +func (p *plugin) UpdatePodTaskIds(proto protocol.HooksProtocol) error { + podCtx, ok := proto.(*protocol.PodContext) + if !ok { + return fmt.Errorf("pod protocol is nil for plugin %v", name) + } + + if _, ok := podCtx.Request.Annotations[apiext.AnnotationResctrl]; ok { + curTaskMaps := map[string]map[int32]struct{}{} + var err error + group := podCtx.Request.PodMeta.UID + curTaskMaps[group], err = system.ReadResctrlTasksMap(util.ClosdIdPrefix + group) + if err != nil { + klog.Warningf("failed to read Cat L3 tasks for resctrl group %s, err: %s", group, err) + } + + newTaskIds := util.GetPodCgroupNewTaskIdsFromPodCtx(podCtx, curTaskMaps[group]) + resctrlInfo := &protocol.Resctrl{ + Closid: util.ClosdIdPrefix + group, + NewTaskIds: make([]int32, 0), + } + resctrlInfo.NewTaskIds = newTaskIds + podCtx.Response.Resources.Resctrl = resctrlInfo + } + return nil +} + +func (p *plugin) SetContainerResctrlResources(proto protocol.HooksProtocol) error { + containerCtx, ok := proto.(*protocol.ContainerContext) + if !ok { + return fmt.Errorf("container protocol is nil for plugin %v", name) + } + + if _, ok := containerCtx.Request.PodAnnotations[apiext.AnnotationResctrl]; ok { + containerCtx.Response.Resources.Resctrl = &protocol.Resctrl{ + Schemata: "", + Hook: "", + Closid: util.ClosdIdPrefix + containerCtx.Request.PodMeta.UID, + NewTaskIds: make([]int32, 0), + } + } + + return nil +} + +func (p *plugin) RemovePodResctrlResources(proto protocol.HooksProtocol) error { + podCtx, ok := proto.(*protocol.PodContext) + if !ok { + return fmt.Errorf("pod protocol is nil for plugin %v", name) + } + + if _, ok := podCtx.Request.Annotations[apiext.AnnotationResctrl]; ok { + updater := NewRemoveResctrlProtocolUpdater(proto) + p.engine.UnRegisterApp(podCtx.Request.PodMeta.UID, true, updater) + } + return nil +} diff --git a/pkg/koordlet/runtimehooks/hooks/resctrl/rule.go b/pkg/koordlet/runtimehooks/hooks/resctrl/rule.go new file mode 100644 index 000000000..afc1aa08b --- /dev/null +++ b/pkg/koordlet/runtimehooks/hooks/resctrl/rule.go @@ -0,0 +1,72 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package resctrl + +import ( + "strings" + "sync" + + corev1 "k8s.io/api/core/v1" + "k8s.io/klog/v2" + + apiext "github.com/koordinator-sh/koordinator/apis/extension" + "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer" + util "github.com/koordinator-sh/koordinator/pkg/koordlet/util/resctrl" +) + +type Rule struct { + lock sync.RWMutex +} + +func newRule() *Rule { + return &Rule{} +} + +func (p *plugin) parseRuleForAllPods(allPods interface{}) (bool, error) { + return true, nil +} + +func (p *plugin) ruleUpdateCbForAllPods(target *statesinformer.CallbackTarget) error { + if target == nil { + klog.Warningf("callback target is nil") + return nil + } + + if p.rule == nil { + klog.V(5).Infof("hook plugin rule is nil, nothing to do for plugin %v", ruleNameForAllPods) + return nil + } + + apps := p.engine.GetApps() + + currentPods := make(map[string]*corev1.Pod) + for _, podMeta := range target.Pods { + pod := podMeta.Pod + if _, ok := podMeta.Pod.Annotations[apiext.AnnotationResctrl]; ok { + group := string(podMeta.Pod.UID) + currentPods[group] = pod + } + } + + for k, v := range apps { + if _, ok := currentPods[k]; !ok { + updater := NewRemoveResctrlUpdater(v.Closid) + p.engine.UnRegisterApp(strings.TrimPrefix(v.Closid, util.ClosdIdPrefix), false, updater) + } + } + return nil +} diff --git a/pkg/koordlet/runtimehooks/nri/server.go b/pkg/koordlet/runtimehooks/nri/server.go index 6c047deee..37711c104 100644 --- a/pkg/koordlet/runtimehooks/nri/server.go +++ b/pkg/koordlet/runtimehooks/nri/server.go @@ -65,7 +65,7 @@ type NriServer struct { } const ( - events = "RunPodSandbox,CreateContainer,UpdateContainer" + events = "RunPodSandbox,RemovePodSandbox,CreateContainer,UpdateContainer" pluginName = "koordlet_nri" pluginIdx = "00" ) @@ -74,6 +74,7 @@ var ( _ = stub.ConfigureInterface(&NriServer{}) _ = stub.SynchronizeInterface(&NriServer{}) _ = stub.RunPodInterface(&NriServer{}) + _ = stub.RemovePodInterface(&NriServer{}) _ = stub.CreateContainerInterface(&NriServer{}) _ = stub.UpdateContainerInterface(&NriServer{}) ) @@ -186,6 +187,7 @@ func (p *NriServer) CreateContainer(pod *api.PodSandbox, container *api.Containe } func (p *NriServer) UpdateContainer(pod *api.PodSandbox, container *api.Container) ([]*api.ContainerUpdate, error) { + containerCtx := &protocol.ContainerContext{} containerCtx.FromNri(pod, container) // todo: return error or bypass error based on PluginFailurePolicy @@ -202,12 +204,28 @@ func (p *NriServer) UpdateContainer(pod *api.PodSandbox, container *api.Containe klog.Errorf("containerCtx nri done failed: %v", err) return nil, nil } - klog.V(6).Infof("handle NRI UpdateContainer successfully, container %s/%s/%s", pod.GetNamespace(), pod.GetName(), container.GetName()) return []*api.ContainerUpdate{update}, nil } +func (p *NriServer) RemovePodSandbox(pod *api.PodSandbox) error { + podCtx := &protocol.PodContext{} + podCtx.FromNri(pod) + // todo: return error or bypass error based on PluginFailurePolicy + err := hooks.RunHooks(p.options.PluginFailurePolicy, rmconfig.PreRemoveRunPodSandbox, podCtx) + if err != nil { + klog.Errorf("nri hooks run error: %v", err) + if p.options.PluginFailurePolicy == rmconfig.PolicyFail { + return err + } + } + podCtx.NriRemoveDone(p.options.Executor) + + klog.V(6).Infof("handle NRI RemovePodSandbox successfully, pod %s/%s", pod.GetNamespace(), pod.GetName()) + return nil +} + func (p *NriServer) onClose() { p.stub.Stop() klog.V(6).Infof("NRI server closes") diff --git a/pkg/koordlet/runtimehooks/protocol/container_context.go b/pkg/koordlet/runtimehooks/protocol/container_context.go index 1d85ab009..f0a901cee 100644 --- a/pkg/koordlet/runtimehooks/protocol/container_context.go +++ b/pkg/koordlet/runtimehooks/protocol/container_context.go @@ -262,6 +262,11 @@ func (c *ContainerContext) NriDone(executor resourceexecutor.ResourceUpdateExecu update.SetLinuxMemoryLimit(*c.Response.Resources.MemoryLimit) } + if c.Response.Resources.Resctrl != nil { + adjust.SetLinuxRDTClass((*(c.Response.Resources.Resctrl)).Closid) + update.SetLinuxRDTClass((*(c.Response.Resources.Resctrl)).Closid) + } + if c.Response.AddContainerEnvs != nil { for k, v := range c.Response.AddContainerEnvs { adjust.AddEnv(k, v) diff --git a/pkg/koordlet/runtimehooks/protocol/pod_context.go b/pkg/koordlet/runtimehooks/protocol/pod_context.go index f81a160d4..b96da2d71 100644 --- a/pkg/koordlet/runtimehooks/protocol/pod_context.go +++ b/pkg/koordlet/runtimehooks/protocol/pod_context.go @@ -18,9 +18,9 @@ package protocol import ( "fmt" + "os" "github.com/containerd/nri/pkg/api" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" @@ -29,6 +29,7 @@ import ( "github.com/koordinator-sh/koordinator/pkg/koordlet/audit" "github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor" "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer" + "github.com/koordinator-sh/koordinator/pkg/koordlet/util/system" "github.com/koordinator-sh/koordinator/pkg/util" ) @@ -67,6 +68,7 @@ type PodRequest struct { CgroupParent string Resources *Resources // TODO: support proxy & nri mode ExtendedResources *apiext.ExtendedResourceSpec + ContainerTaskIds map[string][]int32 } func (p *PodRequest) FromNri(pod *api.PodSandbox) { @@ -106,6 +108,7 @@ func (p *PodRequest) FromReconciler(podMeta *statesinformer.PodMeta) { p.Labels = podMeta.Pod.Labels p.Annotations = podMeta.Pod.Annotations p.CgroupParent = podMeta.CgroupDir + p.ContainerTaskIds = podMeta.ContainerTaskIds p.Resources = &Resources{} p.Resources.FromPod(podMeta.Pod) // retrieve ExtendedResources from pod spec and pod annotations (prefer pod spec) @@ -177,6 +180,14 @@ func (p *PodContext) NriDone(executor resourceexecutor.ResourceUpdateExecutor) { p.Update() } +func (p *PodContext) NriRemoveDone(executor resourceexecutor.ResourceUpdateExecutor) { + if p.executor == nil { + p.executor = executor + } + p.removeForExt() + p.Update() +} + func (p *PodContext) FromReconciler(podMeta *statesinformer.PodMeta) { p.Request.FromReconciler(podMeta) } @@ -277,4 +288,50 @@ func (p *PodContext) injectForExt() { p.Request.PodMeta.Namespace, p.Request.PodMeta.Name, *p.Response.Resources.MemoryLimit, p.Request.CgroupParent) } } + if p.Response.Resources.Resctrl != nil { + eventHelper := audit.V(3).Pod(p.Request.PodMeta.Namespace, p.Request.PodMeta.Name).Reason("runtime-hooks").Message( + "set pod LLC/MB limit to %v", *p.Response.Resources.Resctrl) + if p.Response.Resources.Resctrl.Closid != "" || p.Response.Resources.Resctrl.Schemata != "" { + updater, err := createCatGroup(p.Response.Resources.Resctrl.Closid, eventHelper, p.executor) + if err != nil { + klog.Infof("create pod %v/%v cat group %v failed, error %v", p.Request.PodMeta.Namespace, + p.Request.PodMeta.Name, p.Response.Resources.Resctrl.Closid, err) + } else { + p.updaters = append(p.updaters, updater) + klog.V(5).Infof("create pod %v/%v cat group %v", + p.Request.PodMeta.Namespace, p.Request.PodMeta.Name, p.Response.Resources.Resctrl.Closid) + } + + updater, err = injectResctrl(p.Response.Resources.Resctrl.Closid, p.Response.Resources.Resctrl.Schemata, eventHelper, p.executor) + if err != nil { + klog.Infof("set pod %v/%v LLC/MB limit %v on cgroup parent %v failed, error %v", p.Request.PodMeta.Namespace, + p.Request.PodMeta.Name, p.Response.Resources.Resctrl.Closid, p.Response.Resources.Resctrl.Schemata, err) + } else { + p.updaters = append(p.updaters, updater) + klog.V(5).Infof("set pod %v/%v LLC/MB limit %v on cgroup parent %v", + p.Request.PodMeta.Namespace, p.Request.PodMeta.Name, *p.Response.Resources.Resctrl, p.Request.CgroupParent) + } + } + + if len(p.Response.Resources.Resctrl.NewTaskIds) > 0 { + updater, err := resourceexecutor.CalculateResctrlL3TasksResource(p.Response.Resources.Resctrl.Closid, p.Response.Resources.Resctrl.NewTaskIds) + if err != nil { + klog.V(5).Infof("failed to get l3 tasks resource for group %s, err: %s", p.Response.Resources.Resctrl.Closid, err) + } else { + p.updaters = append(p.updaters, updater) + } + } + } +} + +func (p *PodContext) removeForExt() { + if p.Response.Resources.Resctrl != nil && p.Response.Resources.Resctrl.Closid != "" { + if err := os.Remove(system.GetResctrlGroupRootDirPath(p.Response.Resources.Resctrl.Closid)); err != nil { + klog.Infof("cannot remove ctrl group, err: %v", err) + } else { + klog.Infof("remove pod %v/%v ctrl group %v on cgroup parent %v", + p.Request.PodMeta.Namespace, p.Request.PodMeta.Name, *p.Response.Resources.Resctrl, p.Request.CgroupParent) + + } + } } diff --git a/pkg/koordlet/runtimehooks/protocol/protocol.go b/pkg/koordlet/runtimehooks/protocol/protocol.go index 0688e2bfe..7458fa381 100644 --- a/pkg/koordlet/runtimehooks/protocol/protocol.go +++ b/pkg/koordlet/runtimehooks/protocol/protocol.go @@ -71,6 +71,13 @@ var HooksProtocolBuilder = hooksProtocolBuilder{ }, } +type Resctrl struct { + Schemata string + Hook string + Closid string + NewTaskIds []int32 +} + type Resources struct { // origin resources CPUShares *int64 @@ -81,6 +88,7 @@ type Resources struct { // extended resources CPUBvt *int64 CPUIdle *int64 + Resctrl *Resctrl } func (r *Resources) IsOriginResSet() bool { @@ -159,6 +167,22 @@ func injectMemoryLimit(cgroupParent string, memoryLimit int64, a *audit.EventHel return updater, nil } +func createCatGroup(closid string, a *audit.EventHelper, e resourceexecutor.ResourceUpdateExecutor) (resourceexecutor.ResourceUpdater, error) { + updater, err := resourceexecutor.NewCatGroupResource(closid, a) + if err != nil { + return nil, err + } + return updater, nil +} + +func injectResctrl(closid string, schemata string, e *audit.EventHelper, executor resourceexecutor.ResourceUpdateExecutor) (resourceexecutor.ResourceUpdater, error) { + updater, err := resourceexecutor.NewResctrlSchemataResource(closid, schemata, e) + if err != nil { + return nil, err + } + return updater, nil +} + func injectCPUBvt(cgroupParent string, bvtValue int64, a *audit.EventHelper, e resourceexecutor.ResourceUpdateExecutor) (resourceexecutor.ResourceUpdater, error) { bvtValueStr := strconv.FormatInt(bvtValue, 10) updater, err := resourceexecutor.DefaultCgroupUpdaterFactory.New(sysutil.CPUBVTWarpNsName, cgroupParent, bvtValueStr, a) diff --git a/pkg/koordlet/runtimehooks/reconciler/reconciler.go b/pkg/koordlet/runtimehooks/reconciler/reconciler.go index 7f38153e8..43ac30bee 100644 --- a/pkg/koordlet/runtimehooks/reconciler/reconciler.go +++ b/pkg/koordlet/runtimehooks/reconciler/reconciler.go @@ -39,22 +39,24 @@ const ( PodLevel ReconcilerLevel = "pod" ContainerLevel ReconcilerLevel = "container" SandboxLevel ReconcilerLevel = "sandbox" + AllPodsLevel ReconcilerLevel = "allpods" ) var globalCgroupReconcilers = struct { all []*cgroupReconciler - kubeQOSLevel map[string]*cgroupReconciler - podLevel map[string]*cgroupReconciler - containerLevel map[string]*cgroupReconciler - + kubeQOSLevel map[string]*cgroupReconciler + podLevel map[string]*cgroupReconciler + containerLevel map[string]*cgroupReconciler sandboxContainerLevel map[string]*cgroupReconciler + allPodsLevel map[string]*cgroupReconciler }{ kubeQOSLevel: map[string]*cgroupReconciler{}, podLevel: map[string]*cgroupReconciler{}, containerLevel: map[string]*cgroupReconciler{}, sandboxContainerLevel: map[string]*cgroupReconciler{}, + allPodsLevel: map[string]*cgroupReconciler{}, } type cgroupReconciler struct { @@ -63,6 +65,7 @@ type cgroupReconciler struct { level ReconcilerLevel filter Filter fn map[string]reconcileFunc + fn4AllPods map[string]reconcileFunc4AllPods } // Filter & Conditions: @@ -133,7 +136,90 @@ func PodQOSFilter() Filter { return singletonPodQOSFilter } +type podAnnotationResctrlFilter struct{} + +const ( + podAnnotationResctrlFilterName = "resctrl" +) + +func (p *podAnnotationResctrlFilter) Name() string { + return podAnnotationResctrlFilterName +} + +func (p *podAnnotationResctrlFilter) Filter(podMeta *statesinformer.PodMeta) string { + if _, ok := podMeta.Pod.Annotations[apiext.AnnotationResctrl]; ok { + return podAnnotationResctrlFilterName + } + + return "" +} + +var singletonPodAnnotationResctrlFilter *podAnnotationResctrlFilter + +// PodQOSFilter returns a Filter which filters pod qos class +func PodAnnotationResctrlFilter() *podAnnotationResctrlFilter { + if singletonPodQOSFilter == nil { + singletonPodQOSFilter = &podQOSFilter{} + } + return singletonPodAnnotationResctrlFilter +} + type reconcileFunc func(protocol.HooksProtocol) error +type reconcileFunc4AllPods func([]protocol.HooksProtocol) error + +func RegisterCgroupReconciler4AllPods(level ReconcilerLevel, cgroupFile system.Resource, description string, + fn reconcileFunc4AllPods, filter Filter, conditions ...string) { + if len(conditions) <= 0 { // default condition + conditions = []string{NoneFilterCondition} + } + + for _, r := range globalCgroupReconcilers.all { + if level != r.level || cgroupFile.ResourceType() != r.cgroupFile.ResourceType() { + continue + } + + // if reconciler exist + if r.filter.Name() != filter.Name() { + klog.Fatalf("%v of level %v is already registered with filter %v by %v, cannot change to %v by %v", + cgroupFile.ResourceType(), level, r.filter.Name(), r.description, filter.Name(), description) + } + + for _, condition := range conditions { + if _, ok := r.fn[condition]; ok { + klog.Fatalf("%v of level %v is already registered with condition %v by %v, cannot change by %v", + cgroupFile.ResourceType(), level, condition, r.description, description) + } + + r.fn4AllPods[condition] = fn + } + klog.V(1).Infof("register reconcile function %v finished, info: level=%v, resourceType=%v, add conditions=%v", + description, level, cgroupFile.ResourceType(), conditions) + return + } + + // if reconciler not exist + r := &cgroupReconciler{ + cgroupFile: cgroupFile, + description: description, + level: level, + fn: map[string]reconcileFunc{}, + fn4AllPods: map[string]reconcileFunc4AllPods{}, + } + + globalCgroupReconcilers.all = append(globalCgroupReconcilers.all, r) + switch level { + case AllPodsLevel: + r.filter = filter + for _, condition := range conditions { + r.fn4AllPods[condition] = fn + } + globalCgroupReconcilers.allPodsLevel[string(r.cgroupFile.ResourceType())] = r + default: + klog.Fatalf("cgroup level %v is not supported", level) + } + klog.V(1).Infof("register reconcile function %v finished, info: level=%v, resourceType=%v, filter=%v, conditions=%v", + description, level, cgroupFile.ResourceType(), filter.Name(), conditions) +} // RegisterCgroupReconciler registers a cgroup reconciler according to the cgroup file, reconcile function and filter // conditions. A cgroup file of one level can have multiple reconcile functions with different filtered conditions. @@ -178,6 +264,7 @@ func RegisterCgroupReconciler(level ReconcilerLevel, cgroupFile system.Resource, description: description, level: level, fn: map[string]reconcileFunc{}, + fn4AllPods: map[string]reconcileFunc4AllPods{}, } globalCgroupReconcilers.all = append(globalCgroupReconcilers.all, r) @@ -385,6 +472,28 @@ func (c *reconciler) reconcilePodCgroup(stopCh <-chan struct{}) { } } } + + for _, r := range globalCgroupReconcilers.allPodsLevel { + currentPods := make([]protocol.HooksProtocol, 0) + for _, podMeta := range podsMeta { + if _, ok := r.fn4AllPods[r.filter.Filter(podMeta)]; ok { + podCtx := protocol.HooksProtocolBuilder.Pod(podMeta) + currentPods = append(currentPods, podCtx) + } + } + + reconcileFn, ok := r.fn4AllPods[r.filter.Name()] + if !ok { + klog.V(5).Infof("calling reconcile function %v aborted, condition %s not registered", + r.description, r.filter.Name()) + continue + } + + if err := reconcileFn(currentPods); err != nil { + klog.Warningf("calling reconcile function %v for pod %v failed, error %v", + r.description, err) + } + } case <-stopCh: klog.V(1).Infof("stop reconcile pod cgroup") return diff --git a/pkg/koordlet/runtimehooks/runtimehooks.go b/pkg/koordlet/runtimehooks/runtimehooks.go index 9a888ba42..270d4ecde 100644 --- a/pkg/koordlet/runtimehooks/runtimehooks.go +++ b/pkg/koordlet/runtimehooks/runtimehooks.go @@ -125,8 +125,9 @@ func NewRuntimeHook(si statesinformer.StatesInformer, cfg *Config) (RuntimeHook, } newPluginOptions := hooks.Options{ - Reader: cr, - Executor: e, + Reader: cr, + Executor: e, + StatesInformer: si, } if err != nil { diff --git a/pkg/koordlet/statesinformer/api.go b/pkg/koordlet/statesinformer/api.go index 656b28179..efed4ecc6 100644 --- a/pkg/koordlet/statesinformer/api.go +++ b/pkg/koordlet/statesinformer/api.go @@ -27,14 +27,16 @@ import ( ) type PodMeta struct { - Pod *corev1.Pod - CgroupDir string + Pod *corev1.Pod + CgroupDir string + ContainerTaskIds map[string][]int32 } func (in *PodMeta) DeepCopy() *PodMeta { out := new(PodMeta) out.Pod = in.Pod.DeepCopy() out.CgroupDir = in.CgroupDir + out.ContainerTaskIds = in.ContainerTaskIds return out } diff --git a/pkg/koordlet/statesinformer/impl/states_pods.go b/pkg/koordlet/statesinformer/impl/states_pods.go index 7da62dbf1..12d2d3867 100644 --- a/pkg/koordlet/statesinformer/impl/states_pods.go +++ b/pkg/koordlet/statesinformer/impl/states_pods.go @@ -31,6 +31,7 @@ import ( apiext "github.com/koordinator-sh/koordinator/apis/extension" "github.com/koordinator-sh/koordinator/pkg/koordlet/metrics" "github.com/koordinator-sh/koordinator/pkg/koordlet/pleg" + "github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor" "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer" koordletutil "github.com/koordinator-sh/koordinator/pkg/koordlet/util" "github.com/koordinator-sh/koordinator/pkg/koordlet/util/system" @@ -57,6 +58,8 @@ type podsInformer struct { nodeInformer *nodeInformer callbackRunner *callbackRunner + + cgroupReader resourceexecutor.CgroupReader } func NewPodsInformer() *podsInformer { @@ -85,6 +88,8 @@ func (s *podsInformer) Setup(ctx *PluginOption, states *PluginState) { s.nodeInformer = nodeInformer s.callbackRunner = states.callbackRunner + + s.cgroupReader = resourceexecutor.NewCgroupReader() } func (s *podsInformer) Start(stopCh <-chan struct{}) { @@ -141,6 +146,65 @@ func (s *podsInformer) GetAllPods() []*statesinformer.PodMeta { return pods } +func (s *podsInformer) getTaskIds(podMeta *statesinformer.PodMeta) { + pod := podMeta.Pod + containerMap := make(map[string]*corev1.Container, len(pod.Spec.Containers)) + for i := range pod.Spec.Containers { + container := &pod.Spec.Containers[i] + containerMap[container.Name] = container + } + + for _, containerStat := range pod.Status.ContainerStatuses { + container, exist := containerMap[containerStat.Name] + if !exist { + klog.Warningf("container %s/%s/%s lost during reconcile resctrl group", pod.Namespace, + pod.Name, containerStat.Name) + continue + } + + containerDir, err := koordletutil.GetContainerCgroupParentDir(podMeta.CgroupDir, &containerStat) + if err != nil { + klog.V(4).Infof("failed to get pod container cgroup path for container %s/%s/%s, err: %s", + pod.Namespace, pod.Name, container.Name, err) + continue + } + ids, err := s.cgroupReader.ReadCPUTasks(containerDir) + if err != nil && resourceexecutor.IsCgroupDirErr(err) { + klog.V(5).Infof("failed to read container task ids whose cgroup path %s does not exists, err: %s", + containerDir, err) + return + } else if err != nil { + klog.Warningf("failed to get pod container cgroup task ids for container %s/%s/%s, err: %s", + pod.Namespace, pod.Name, container.Name, err) + continue + } + podMeta.ContainerTaskIds[containerStat.ContainerID] = ids + } + + sandboxID, err := koordletutil.GetPodSandboxContainerID(pod) + if err != nil { + klog.V(4).Infof("failed to get sandbox container ID for pod %s/%s, err: %s", + pod.Namespace, pod.Name, err) + return + } + sandboxContainerDir, err := koordletutil.GetContainerCgroupParentDirByID(podMeta.CgroupDir, sandboxID) + if err != nil { + klog.V(4).Infof("failed to get pod container cgroup path for sandbox container %s/%s/%s, err: %s", + pod.Namespace, pod.Name, sandboxID, err) + } + ids, err := s.cgroupReader.ReadCPUTasks(sandboxContainerDir) + if err != nil && resourceexecutor.IsCgroupDirErr(err) { + klog.V(5).Infof("failed to read container task ids whose cgroup path %s does not exists, err: %s", + sandboxContainerDir, err) + return + } else if err != nil { + klog.Warningf("failed to get pod container cgroup task ids for sandbox container %s/%s/%s, err: %s", + pod.Namespace, pod.Name, sandboxID, err) + return + } + podMeta.ContainerTaskIds[sandboxID] = ids +} + func (s *podsInformer) syncPods() error { podList, err := s.kubelet.GetAllPods() @@ -155,10 +219,13 @@ func (s *podsInformer) syncPods() error { for i := range podList.Items { pod := &podList.Items[i] podMeta := &statesinformer.PodMeta{ - Pod: pod, // no need to deep-copy from unmarshalled - CgroupDir: genPodCgroupParentDir(pod), + Pod: pod, // no need to deep-copy from unmarshalled + CgroupDir: genPodCgroupParentDir(pod), + ContainerTaskIds: make(map[string][]int32), } newPodMap[string(pod.UID)] = podMeta + // record pod's containers taskids + s.getTaskIds(podMeta) // record pod container metrics recordPodResourceMetrics(podMeta) } diff --git a/pkg/koordlet/statesinformer/impl/states_pods_test.go b/pkg/koordlet/statesinformer/impl/states_pods_test.go index aaa6eebbe..a877ea4a2 100644 --- a/pkg/koordlet/statesinformer/impl/states_pods_test.go +++ b/pkg/koordlet/statesinformer/impl/states_pods_test.go @@ -39,6 +39,7 @@ import ( apiext "github.com/koordinator-sh/koordinator/apis/extension" fakekoordclientset "github.com/koordinator-sh/koordinator/pkg/client/clientset/versioned/fake" "github.com/koordinator-sh/koordinator/pkg/koordlet/metrics" + "github.com/koordinator-sh/koordinator/pkg/koordlet/resourceexecutor" "github.com/koordinator-sh/koordinator/pkg/koordlet/statesinformer" "github.com/koordinator-sh/koordinator/pkg/koordlet/util/system" ) @@ -223,6 +224,7 @@ func Test_statesInformer_syncPods(t *testing.T) { }}, podHasSynced: atomic.NewBool(false), callbackRunner: NewCallbackRunner(), + cgroupReader: resourceexecutor.NewCgroupReader(), } err := m.syncPods() @@ -377,6 +379,7 @@ func Test_statesInformer_syncKubeletLoop(t *testing.T) { }}, callbackRunner: NewCallbackRunner(), podHasSynced: atomic.NewBool(false), + cgroupReader: resourceexecutor.NewCgroupReader(), } go m.syncKubeletLoop(c.KubeletSyncInterval, stopCh) time.Sleep(5 * time.Second) diff --git a/pkg/koordlet/util/resctrl/ctrl_mgr.go b/pkg/koordlet/util/resctrl/ctrl_mgr.go new file mode 100644 index 000000000..d71676914 --- /dev/null +++ b/pkg/koordlet/util/resctrl/ctrl_mgr.go @@ -0,0 +1,194 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "io" + "os" + "path/filepath" + "strings" + "sync" + "time" + + gocache "github.com/patrickmn/go-cache" + "k8s.io/klog/v2" + + "github.com/koordinator-sh/koordinator/pkg/koordlet/metricsadvisor/framework" + sysutil "github.com/koordinator-sh/koordinator/pkg/koordlet/util/system" +) + +const ( + Remove = "Remove" + Add = "Add" + ExpirationTime int64 = 10 +) + +type Updater interface { + Update(string) error +} + +type SchemataUpdater interface { + Update(id, schemata string) error +} + +type ControlGroup struct { + Appid string + Groupid string + Schemata string + Status string + CreatedTime int64 +} + +type ControlGroupManager struct { + rdtcgs *gocache.Cache + reconcileInterval int64 + sync.Mutex +} + +func NewControlGroupManager() ControlGroupManager { + return ControlGroupManager{ + rdtcgs: gocache.New(time.Duration(ExpirationTime), framework.CleanupInterval), + } +} + +func (c *ControlGroupManager) Init() { + c.Lock() + defer c.Unlock() + // get resctrl filesystem root + root := sysutil.GetResctrlSubsystemDirPath() + files, err := os.ReadDir(root) + if err != nil { + klog.Errorf("read %s failed err is %v", root, err) + return + } + for _, file := range files { + // rebuild c.rdtcgs + if file.IsDir() && strings.HasPrefix(file.Name(), ClosdIdPrefix) { + path := filepath.Join(root, file.Name(), "schemata") + if _, err := os.Stat(path); err == nil { + reader, err := os.Open(path) + if err != nil { + klog.Errorf("open resctrl file path fail, %v", err) + } + content, err := io.ReadAll(reader) + if err != nil { + klog.Errorf("read resctrl file path fail, %v", err) + return + } + schemata := string(content) + podid := strings.TrimPrefix(file.Name(), ClosdIdPrefix) + c.rdtcgs.Set(podid, &ControlGroup{ + Appid: podid, + Groupid: file.Name(), + Schemata: schemata, + Status: Add, + CreatedTime: time.Now().UnixNano(), + }, -1) + klog.Infof("podid is %s, ctrl group is %v", podid, file.Name()) + } + } + } +} + +func (c *ControlGroupManager) AddPod(podid string, schemata string, fromNRI bool, createUpdater ResctrlUpdater, schemataUpdater ResctrlUpdater) { + c.Lock() + defer c.Unlock() + p, ok := c.rdtcgs.Get(podid) + + var pod *ControlGroup + if !ok { + pod = &ControlGroup{ + Appid: podid, + Groupid: "", + Schemata: "", + Status: Add, + } + } else { + pod = p.(*ControlGroup) + } + + if pod.Status == Add && pod.Groupid == "" { + if createUpdater != nil { + err := createUpdater.Update() + if err != nil { + klog.Errorf("create ctrl group error %v", err) + } else { + pod.Groupid = ClosdIdPrefix + podid + pod.CreatedTime = time.Now().UnixNano() + } + } + + if schemataUpdater != nil { + err := schemataUpdater.Update() + if err != nil { + klog.Errorf("updater ctrl group schemata error %v", err) + } + pod.Schemata = schemata + } + + c.rdtcgs.Set(podid, pod, -1) + } else { + if pod.Status == Add && pod.Groupid != "" { + if !fromNRI { + // Update Schemata + if schemataUpdater != nil { + err := schemataUpdater.Update() + if err != nil { + klog.Errorf("updater ctrl group schemata error %v", err) + } + pod.Schemata = schemata + } + c.rdtcgs.Set(podid, pod, -1) + } + } + } +} + +func (c *ControlGroupManager) RemovePod(podid string, fromNRI bool, removeUpdater ResctrlUpdater) bool { + c.Lock() + defer c.Unlock() + + p, ok := c.rdtcgs.Get(podid) + if !ok { + pod := &ControlGroup{podid, "", "", Remove, -1} + if removeUpdater != nil { + err := removeUpdater.Update() + if err != nil { + klog.Errorf("remove updater fail %v", err) + return false + } + } + + c.rdtcgs.Set(podid, pod, gocache.DefaultExpiration) + return true + } + pod := p.(*ControlGroup) + if (fromNRI || time.Now().UnixNano()-pod.CreatedTime >= ExpirationTime*time.Second.Nanoseconds()) && pod.Status == Add { + pod.Status = Remove + if removeUpdater != nil { + err := removeUpdater.Update() + if err != nil { + klog.Errorf("remove updater fail %v", err) + return false + } + } + + c.rdtcgs.Set(podid, pod, gocache.DefaultExpiration) + return true + } + return false +} diff --git a/pkg/koordlet/util/resctrl/resctrl.go b/pkg/koordlet/util/resctrl/resctrl.go new file mode 100644 index 000000000..9d3db5f9f --- /dev/null +++ b/pkg/koordlet/util/resctrl/resctrl.go @@ -0,0 +1,319 @@ +/* +Copyright 2022 The Koordinator Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package util + +import ( + "encoding/json" + "fmt" + "strconv" + "strings" + "sync" + + "k8s.io/klog/v2" + + apiext "github.com/koordinator-sh/koordinator/apis/extension" + "github.com/koordinator-sh/koordinator/pkg/koordlet/runtimehooks/protocol" + koordletutil "github.com/koordinator-sh/koordinator/pkg/koordlet/util" + sysutil "github.com/koordinator-sh/koordinator/pkg/koordlet/util/system" +) + +const ( + // Max memory bandwidth for AMD CPU, Gb/s, since the extreme limit is hard to reach, we set a discount by 0.8 + // TODO The max memory bandwidth varies across SKU, so koordlet should be aware of the maximum automatically, + // or support an configuration list. + // Currently, the value is measured on "AMD EPYC(TM) MILAN" + + AMDCCDMaxMBGbps = 25 * 8 * 0.8 + + // the AMD CPU use 2048 to express the unlimited memory bandwidth + AMDCCDUnlimitedMB = 2048 +) + +type ResctrlUpdater interface { + Name() string + Key() string + Value() string + Update() error + SetKey(key string) + SetValue(key string) +} + +const ClosdIdPrefix = "koordlet-" + +type App struct { + Resctrl *sysutil.ResctrlSchemataRaw + // Hooks Hook + Closid string + Annotation string +} + +type ResctrlEngine interface { + Rebuild() + RegisterApp(podid, annotation string, fromNRI bool, updater ResctrlUpdater) error + UnRegisterApp(podid string, fromNRI bool, updater ResctrlUpdater) error + GetApp(podid string) (App, bool) + GetApps() map[string]App +} + +func NewRDTEngine(vendor string) (ResctrlEngine, error) { + var CatL3CbmMask string + var err error + if CatL3CbmMask, err = sysutil.ReadCatL3CbmString(); err != nil { + klog.Errorf("get l3 cache bit mask error: %v", err) + return nil, err + } + + if len(CatL3CbmMask) <= 0 { + return nil, fmt.Errorf("failed to get cat l3 cbm, cbm is empty") + } + cbmValue, err := strconv.ParseUint(CatL3CbmMask, 16, 32) + if err != nil { + return nil, fmt.Errorf("failed to parse cat l3 cbm %s, err: %v", CatL3CbmMask, err) + } + cbm := uint(cbmValue) + + return &RDTEngine{ + Apps: make(map[string]App), + CtrlGroups: make(map[string]apiext.Resctrl), + CBM: cbm, + Cgm: NewControlGroupManager(), + Vendor: vendor, + }, nil +} + +type RDTEngine struct { + Apps map[string]App + Cgm ControlGroupManager + CtrlGroups map[string]apiext.Resctrl + l sync.RWMutex + CBM uint + Vendor string +} + +func (R *RDTEngine) GetApps() map[string]App { + R.l.RLock() + defer R.l.RUnlock() + apps := make(map[string]App) + for podid, app := range R.Apps { + apps[podid] = app + } + return apps +} + +func (R *RDTEngine) Rebuild() { + R.l.RLock() + defer R.l.RUnlock() + R.Cgm.Init() + for podid, item := range R.Cgm.rdtcgs.Items() { + v, ok := item.Object.(*ControlGroup) + if !ok { + continue + } + + ids, _ := sysutil.CacheIdsCacheFunc() + schemataRaw := sysutil.NewResctrlSchemataRaw(ids).WithL3Num(len(ids)) + err := schemataRaw.ParseResctrlSchemata(v.Schemata, -1) + if err != nil { + klog.Errorf("failed to parse %v", err) + } + R.Apps[podid] = App{ + Resctrl: schemataRaw, + Closid: v.Groupid, + } + } +} + +func (R *RDTEngine) RegisterApp(podid, annotation string, fromNRI bool, updater ResctrlUpdater) error { + R.l.Lock() + defer R.l.Unlock() + var res apiext.ResctrlConfig + err := json.Unmarshal([]byte(annotation), &res) + if err != nil { + klog.Errorf("error is %v", err) + return err + } + + schemata := R.ParseSchemata(res, R.CBM) + app := App{ + Resctrl: schemata, + Closid: ClosdIdPrefix + podid, + Annotation: annotation, + } + + items := []string{} + for _, item := range []struct { + validFunc func() (bool, string) + value func() string + }{ + {validFunc: app.Resctrl.ValidateL3, value: app.Resctrl.L3String}, + {validFunc: app.Resctrl.ValidateMB, value: app.Resctrl.MBString}, + } { + if valid, _ := item.validFunc(); valid { + items = append(items, item.value()) + } + } + schemataStr := strings.Join(items, "") + if updater != nil { + updater.SetKey(ClosdIdPrefix + podid) + updater.SetValue(schemataStr) + } + R.Cgm.AddPod(podid, schemataStr, fromNRI, updater, nil) + + R.Apps[podid] = app + return nil +} + +func (R *RDTEngine) UnRegisterApp(podid string, fromNRI bool, updater ResctrlUpdater) error { + R.l.Lock() + defer R.l.Unlock() + + if _, ok := R.Apps[podid]; !ok { + return fmt.Errorf("pod %s not registered", podid) + } + removed := R.Cgm.RemovePod(podid, fromNRI, updater) + if removed { + delete(R.Apps, podid) + } + + return nil +} + +func (R *RDTEngine) GetApp(id string) (App, bool) { + R.l.RLock() + defer R.l.RUnlock() + + if v, ok := R.Apps[id]; ok { + return v, true + } else { + return App{}, false + } +} + +func (R *RDTEngine) calculateMba(mbaPercent int64) int64 { + if R.Vendor == sysutil.INTEL_VENDOR_ID { + return calculateIntelMba(mbaPercent) + } else if R.Vendor == sysutil.AMD_VENDOR_ID { + return calculateAMDMba(mbaPercent) + } + return 0 +} + +func calculateIntelMba(mbaPercent int64) int64 { + if mbaPercent%10 != 0 { + actualPercent := mbaPercent/10*10 + 10 + klog.V(4).Infof("cat MBA must multiple of 10, mbaPercentConfig is %d, actualMBAPercent will be %d", + mbaPercent, actualPercent) + return actualPercent + } + + return mbaPercent +} + +func calculateAMDMba(mbaPercent int64) int64 { + if mbaPercent == 100 { + return AMDCCDUnlimitedMB + } + mbaLimitValue := float64(AMDCCDMaxMBGbps*mbaPercent) / 100 + return int64(mbaLimitValue) +} + +func (R *RDTEngine) ParseSchemata(config apiext.ResctrlConfig, cbm uint) *sysutil.ResctrlSchemataRaw { + ids, _ := sysutil.CacheIdsCacheFunc() + schemataRaw := sysutil.NewResctrlSchemataRaw(ids).WithL3Num(len(ids)) + if config.MB.Schemata.Percent != 0 { + percent := R.calculateMba(int64(config.MB.Schemata.Percent)) + for k := range schemataRaw.MB { + schemataRaw.MB[k] = percent + } + } + + if config.MB.SchemataPerCache != nil { + for _, v := range config.MB.SchemataPerCache { + percent := R.calculateMba(int64(v.Percent)) + schemataRaw.MB[v.CacheID] = percent + } + } + + if config.LLC.Schemata.Range != nil && len(config.LLC.Schemata.Range) == 2 { + start := config.LLC.Schemata.Range[0] + end := config.LLC.Schemata.Range[1] + + l3MaskValue, err := sysutil.CalculateCatL3MaskValue(cbm, int64(start), int64(end)) + if err != nil { + klog.Warningf("failed to calculate l3 cat schemata err: %v", err) + return schemataRaw + } + + schemataRaw.WithL3Num(len(ids)).WithL3Mask(l3MaskValue) + } + + if config.LLC.SchemataPerCache != nil { + for _, v := range config.LLC.SchemataPerCache { + if len(v.Range) == 2 { + start := v.Range[0] + end := v.Range[1] + l3MaskValue, err := sysutil.CalculateCatL3MaskValue(cbm, int64(start), int64(end)) + if err != nil { + klog.Warningf("failed to calculate l3 cat schemata err: %v", err) + return schemataRaw + } + // l3 mask MUST be a valid hex + maskValue, err := strconv.ParseInt(strings.TrimSpace(l3MaskValue), 16, 64) + if err != nil { + klog.V(5).Infof("failed to parse l3 mask %s, err: %v", l3MaskValue, err) + } + schemataRaw.L3[v.CacheID] = maskValue + } + } + } + return schemataRaw +} + +func GetPodCgroupNewTaskIdsFromPodCtx(podMeta *protocol.PodContext, tasksMap map[int32]struct{}) []int32 { + var taskIds []int32 + + for containerId, v := range podMeta.Request.ContainerTaskIds { + containerDir, err := koordletutil.GetContainerCgroupParentDirByID(podMeta.Request.CgroupParent, containerId) + if err != nil { + klog.Errorf("container %s lost during reconcile", containerDir) + continue + } + ids, err := GetNewTaskIds(v, tasksMap) + if err != nil { + klog.Warningf("failed to get pod container cgroup task ids for container %s/%s/%s, err: %s", + podMeta.Request.PodMeta.Name, containerId) + continue + } + taskIds = append(taskIds, ids...) + } + return taskIds +} + +func GetNewTaskIds(ids []int32, tasksMap map[int32]struct{}) ([]int32, error) { + if tasksMap == nil { + return ids, nil + } + + // only append the non-mapped ids + var taskIDs []int32 + for _, id := range ids { + if _, ok := tasksMap[id]; !ok { + taskIDs = append(taskIDs, id) + } + } + return taskIDs, nil +} diff --git a/pkg/koordlet/util/system/resctrl.go b/pkg/koordlet/util/system/resctrl.go index 9580bdd15..7663a8304 100644 --- a/pkg/koordlet/util/system/resctrl.go +++ b/pkg/koordlet/util/system/resctrl.go @@ -29,6 +29,7 @@ import ( "strings" "sync" + "go.uber.org/multierr" "k8s.io/klog/v2" "github.com/koordinator-sh/koordinator/pkg/util" @@ -37,9 +38,10 @@ import ( const ( ResctrlName string = "resctrl" - ResctrlDir string = "resctrl/" - RdtInfoDir string = "info" - L3CatDir string = "L3" + ResctrlDir string = "resctrl/" + RdtInfoDir string = "info" + L3CatDir string = "L3" + LastCMDStatus string = "last_cmd_status" ResctrlSchemataName string = "schemata" ResctrlCbmMaskName string = "cbm_mask" @@ -51,7 +53,8 @@ const ( MbSchemataPrefix = "MB" // other cpu vendor like "GenuineIntel" - AMD_VENDOR_ID = "AuthenticAMD" + AMD_VENDOR_ID = "AuthenticAMD" + INTEL_VENDOR_ID = "GenuineIntel" ) var ( @@ -112,6 +115,7 @@ func IsSupportResctrl() (bool, error) { } var ( + ResctrlRoot = NewCommonResctrlResource("", "") ResctrlSchemata = NewCommonResctrlResource(ResctrlSchemataName, "") ResctrlTasks = NewCommonResctrlResource(ResctrlTasksName, "") ResctrlL3CbmMask = NewCommonResctrlResource(ResctrlCbmMaskName, filepath.Join(RdtInfoDir, L3CatDir)) @@ -256,11 +260,19 @@ func (r *ResctrlSchemataRaw) L3Number() int { } func (r *ResctrlSchemataRaw) CacheIds() []int { - ids := []int{} + ids1 := []int{} for id := range r.L3 { - ids = append(ids, id) + ids1 = append(ids1, id) } - return ids + ids2 := []int{} + for id := range r.MB { + ids2 = append(ids2, id) + } + if len(ids1) >= len(ids2) { + return ids1 + } + + return ids2 } func (r *ResctrlSchemataRaw) L3String() string { @@ -342,6 +354,11 @@ func (r *ResctrlSchemataRaw) ValidateL3() (bool, string) { if r.L3Num != len(r.L3) { return false, "unmatched L3 number and CAT infos" } + for _, value := range r.L3 { + if value <= 0 { + return false, "wrong value of L3 mask" + } + } return true, "" } @@ -352,6 +369,11 @@ func (r *ResctrlSchemataRaw) ValidateMB() (bool, string) { if len(r.MB) <= 0 { return false, "no MBA info" } + for _, value := range r.MB { + if value <= 0 { + return false, "wrong value of MB mask" + } + } return true, "" } @@ -444,7 +466,13 @@ func ReadResctrlSchemataRaw(schemataFile string, l3Num int) (*ResctrlSchemataRaw return nil, fmt.Errorf("failed to parse l3 schemata, content %s, err: %v", string(content), err) } if l3Num == -1 { - schemataRaw.WithL3Num(len(schemataRaw.L3)) + len1 := len(schemataRaw.L3) + len2 := len(schemataRaw.MB) + if len1 >= len2 { + schemataRaw.WithL3Num(len1) + } else { + schemataRaw.WithL3Num(len2) + } } return schemataRaw, nil @@ -560,6 +588,25 @@ func CheckAndTryEnableResctrlCat() error { return nil } +func InitCatGroupIfNotExist(group string) error { + path := GetResctrlGroupRootDirPath(group) + _, err := os.Stat(path) + if err == nil { + return nil + } else if !os.IsNotExist(err) { + return fmt.Errorf("check dir %v for group %s but got unexpected err: %v", path, group, err) + } else if os.IsExist(err) { + return nil + } + err = os.Mkdir(path, 0755) + if err != nil { + resctrlErr := GetCMDStatus() + return fmt.Errorf("create dir %v failed for group %s, err: %v", + path, group, multierr.Combine(err, resctrlErr)) + } + return nil +} + func CheckResctrlSchemataValid() error { schemataPath := GetResctrlSchemataFilePath("") schemataRaw, err := ReadResctrlSchemataRaw(schemataPath, -1) @@ -701,3 +748,15 @@ func isResctrlAvailableByKernelCmd(path string) (bool, bool, error) { } return isCatFlagSet, isMbaFlagSet, nil } + +func GetCMDStatus() error { + lastCMDStatusPath := filepath.Join(Conf.SysFSRootDir, ResctrlDir, RdtInfoDir, LastCMDStatus) + errInfo, err := os.ReadFile(lastCMDStatusPath) + if err != nil { + return fmt.Errorf("failed to read last cmd status, path %s, err: %v", lastCMDStatusPath, err) + } + if len(errInfo) > 0 { + return fmt.Errorf("last cmd status: %s", string(errInfo)) + } + return nil +} diff --git a/pkg/runtimeproxy/config/config.go b/pkg/runtimeproxy/config/config.go index 940d6531c..5bfe19ae0 100644 --- a/pkg/runtimeproxy/config/config.go +++ b/pkg/runtimeproxy/config/config.go @@ -57,6 +57,7 @@ const ( PostStartContainer RuntimeHookType = "PostStartContainer" PreUpdateContainerResources RuntimeHookType = "PreUpdateContainerResources" PostStopContainer RuntimeHookType = "PostStopContainer" + PreRemoveRunPodSandbox RuntimeHookType = "PreRemoveRunPodSandbox" NoneRuntimeHookType RuntimeHookType = "NoneRuntimeHookType" )