
Merge branch 'gtest/tke/v1.20.6' into 'tke/v1.20.6' (merge request !458)
feat(*): support batch schedule on eklet ip resource
MR description / purpose:

Virtual nodes support batch scheduling with awareness of IP resources.

Related issue:
Fixes #
Notes and test cases for merging this feature into the tke branch:
http://tapd.oa.com/eks/prong/stories/view/1020426016865778281
This feature has already been released in the eks project; implementation details and design:
http://tapd.oa.com/eks/prong/stories/view/1020426016863511325

Code review notes:
None
Does this MR affect users?:
No
borgerli committed Jun 16, 2021
2 parents 5e8aacc + 2cc4f77 commit af19637
Showing 8 changed files with 232 additions and 21 deletions.
11 changes: 9 additions & 2 deletions pkg/features/kube_features.go
@@ -732,6 +732,12 @@ const (
//
// Schedule pods according to the remaining resources in the availability zone.
EnableComputeResource featuregate.Feature = "EnableComputeResource"

// owner: @tke.tencent
// alpha: v1.20
//
// Schedule pods according to the remaining IP resources in the subnet of the virtual node.
EnableFitIPResource featuregate.Feature = "EnableFitIPResource"
)

func init() {
@@ -860,6 +866,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
HPAScaleToZero: {Default: false, PreRelease: featuregate.Alpha},
LegacyNodeRoleBehavior: {Default: true, PreRelease: featuregate.Beta},

-// tke specific features
-EnableComputeResource: {Default: true, PreRelease: featuregate.Alpha},
+// eks specific features
+EnableComputeResource: {Default: true, PreRelease: featuregate.Alpha},
+EnableFitIPResource:   {Default: true, PreRelease: featuregate.Alpha},
}
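Both gates default to on, so the new filter is active out of the box; an operator can turn it off with the kube-scheduler flag --feature-gates=EnableFitIPResource=false. A minimal sketch of how fork code consults the gate (this mirrors the feature.DefaultFeatureGate.Enabled check used in legacy_registry.go below; the package name and helper are illustrative):

package scheduler

import (
	utilfeature "k8s.io/apiserver/pkg/util/feature"

	"k8s.io/kubernetes/pkg/features"
)

// fitIPResourceEnabled reports whether the FitIPResource plugin should be wired in.
func fitIPResourceEnabled() bool {
	// DefaultFeatureGate is the process-wide registry that kube-scheduler
	// populates from the --feature-gates command-line flag.
	return utilfeature.DefaultFeatureGate.Enabled(features.EnableFitIPResource)
}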
3 changes: 3 additions & 0 deletions pkg/scheduler/algorithmprovider/registry.go
@@ -19,6 +19,7 @@ package algorithmprovider
import (
"fmt"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/computeresource"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fitipresource"

utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/klog/v2"
@@ -84,6 +85,7 @@ func getDefaultConfig() *schedulerapi.Plugins {
{Name: podtopologyspread.Name},
{Name: interpodaffinity.Name},
{Name: volumebinding.Name},
{Name: computeresource.Name},
},
},
Filter: &schedulerapi.PluginSet{
@@ -105,6 +107,7 @@ func getDefaultConfig() *schedulerapi.Plugins {
{Name: interpodaffinity.Name},
{Name: computeresource.Name},
{Name: localreplicas.Name},
{Name: fitipresource.Name},
},
},
PostFilter: &schedulerapi.PluginSet{
57 changes: 57 additions & 0 deletions pkg/scheduler/framework/plugins/fitipresource/fit_ip_resource.go
@@ -0,0 +1,57 @@
package fitipresource

import (
"context"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/kubernetes/pkg/scheduler/util"
)

const (
// Name is the name of the plugin used in the plugin registry and configurations.
Name = "FitIPResource"
)

// FitIPResource is a FilterPlugin that fails eklet (virtual) nodes whose
// subnet has no spare IPs for the incoming pod.
type FitIPResource struct{}

var _ framework.FilterPlugin = &FitIPResource{}

// Name returns name of the plugin. It is used in logs, etc.
func (f *FitIPResource) Name() string {
return Name
}

// Filter invoked at the filter extension point.
func (f *FitIPResource) Filter(ctx context.Context, cycleState *framework.CycleState, pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
node := nodeInfo.Node()
if node == nil {
return framework.NewStatus(framework.Error, "node not found")
}
if !util.IsEkletNode(node) {
// Not an eklet node; let it pass this filter.
return nil
}
klog.V(4).Infof("Run filter FitIPResource for pod %s on eklet node \"%s\".", pod.Name, node.Name)

allowedIPCount, exists := util.AllowedIPCount(node)
if !exists { // eklet hasn't published the available-ip-count label yet, so skip this policy
klog.V(4).Infof("pod \"%s/%s\" skips filter FitIPResource on node \"%s\": no available-ip-count label on the node yet.", pod.Namespace, pod.Name, node.Name)
return nil
}

usedIPResource := nodeInfo.UsedIPResource()
klog.V(4).Infof("node %s consume ip resource %d, allowedIPCount %d", node.Name, usedIPResource, allowedIPCount)
if allowedIPCount <= usedIPResource {
return framework.NewStatus(framework.UnschedulableAndUnresolvable, fmt.Sprintf("Insufficient ip resource"))
}
return nil
}

// New initializes a new plugin and returns it.
func New(_ runtime.Object, _ framework.Handle) (framework.Plugin, error) {
return &FitIPResource{}, nil
}
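The plugin depends on two helpers from pkg/scheduler/util that are outside this diff: util.IsEkletNode and util.AllowedIPCount. Judging from the log messages above, eklet nodes publish their free IP budget through an available-ip-count node label; a rough sketch of what the helpers might look like under that assumption (both label keys below are guesses, not the fork's real constants):

package util

import (
	"strconv"

	v1 "k8s.io/api/core/v1"
)

// Hypothetical label keys; the real constants live elsewhere in the TKE fork.
const (
	ekletNodeTypeLabel    = "node.kubernetes.io/instance-type"             // assumed marker for eklet nodes
	availableIPCountLabel = "eks.tke.cloud.tencent.com/available-ip-count" // assumed IP budget label
)

// IsEkletNode reports whether the node is an eklet virtual node.
func IsEkletNode(node *v1.Node) bool {
	if node == nil {
		return false
	}
	return node.Labels[ekletNodeTypeLabel] == "eklet"
}

// AllowedIPCount returns the subnet's free IP count published on the node,
// and whether the label is present at all.
func AllowedIPCount(node *v1.Node) (int, bool) {
	v, ok := node.Labels[availableIPCountLabel]
	if !ok {
		return 0, false
	}
	n, err := strconv.Atoi(v)
	if err != nil {
		return 0, false
	}
	return n, true
}

Either way, Filter deliberately treats a missing label as "eklet not ready yet" and lets the pod through rather than blocking scheduling.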
15 changes: 14 additions & 1 deletion pkg/scheduler/framework/plugins/legacy_registry.go
@@ -26,6 +26,7 @@ import (
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/scheduler/apis/config"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/computeresource"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fitipresource"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/imagelocality"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/localreplicas"
@@ -87,6 +88,8 @@ const (
)

const (
// FitIPResourcePred defines the name of predicate FitIPResource.
FitIPResourcePred = "FitIPResourcePred"
// MatchInterPodAffinityPred defines the name of predicate MatchInterPodAffinity.
MatchInterPodAffinityPred = "MatchInterPodAffinity"
// CheckVolumeBindingPred defines the name of predicate CheckVolumeBinding.
@@ -149,7 +152,7 @@ var predicateOrdering = []string{
PodToleratesNodeTaintsPred, CheckNodeLabelPresencePred,
CheckServiceAffinityPred, MaxEBSVolumeCountPred, MaxGCEPDVolumeCountPred, MaxCSIVolumeCountPred,
MaxAzureDiskVolumeCountPred, MaxCinderVolumeCountPred, CheckVolumeBindingPred, NoVolumeZoneConflictPred,
-EvenPodsSpreadPred, MatchInterPodAffinityPred, CheckComputeResourcePred, CheckLocalReplicasPred,MaxQcloudCbsVolumeCount,
+EvenPodsSpreadPred, MatchInterPodAffinityPred, CheckComputeResourcePred, CheckLocalReplicasPred, MaxQcloudCbsVolumeCount, FitIPResourcePred,
}

// LegacyRegistry is used to store current state of registered predicates and priorities.
@@ -482,6 +485,16 @@ func NewLegacyRegistry() *LegacyRegistry {
})
registry.DefaultPriorities[ComputeResourcePriority] = 1
}

// Only register FitIPResource predicate if the feature is enabled
if feature.DefaultFeatureGate.Enabled(features.EnableFitIPResource) {
klog.Infof("Registering FitIPResource predicate function")
registry.registerPredicateConfigProducer(FitIPResourcePred,
func(_ ConfigProducerArgs, plugins *config.Plugins, _ *[]config.PluginConfig) {
plugins.Filter = appendToPluginSet(plugins.Filter, fitipresource.Name, nil)
})
registry.DefaultPredicates.Insert(FitIPResourcePred)
}
return registry
}

2 changes: 2 additions & 0 deletions pkg/scheduler/framework/plugins/registry.go
@@ -20,6 +20,7 @@ import (
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/computeresource"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultbinder"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/defaultpreemption"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/fitipresource"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/imagelocality"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/interpodaffinity"
"k8s.io/kubernetes/pkg/scheduler/framework/plugins/localreplicas"
@@ -78,5 +79,6 @@ func NewInTreeRegistry() runtime.Registry {
computeresource.Name: computeresource.New,
localreplicas.Name: localreplicas.New,
nodevolumelimits.QcloudCBSName: nodevolumelimits.NewQcloudCBS,
fitipresource.Name: fitipresource.New,
}
}
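With the plugin registered in-tree, its filter can be exercised directly in a unit test. A small sketch, assuming util.IsEkletNode treats an unlabeled node as a regular (non-eklet) node:

package fitipresource

import (
	"context"
	"testing"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/scheduler/framework"
)

func TestFilterSkipsNonEkletNodes(t *testing.T) {
	p, err := New(nil, nil)
	if err != nil {
		t.Fatal(err)
	}
	// A plain node with no eklet markers should pass the filter untouched.
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "regular-node"}}
	nodeInfo := framework.NewNodeInfo()
	if err := nodeInfo.SetNode(node); err != nil {
		t.Fatal(err)
	}
	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "p", Namespace: "default"}}

	if status := p.(*FitIPResource).Filter(context.Background(), nil, pod, nodeInfo); status != nil {
		t.Fatalf("expected nil status for non-eklet node, got %v", status)
	}
}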
88 changes: 73 additions & 15 deletions pkg/scheduler/framework/types.go
@@ -202,6 +202,8 @@ type NodeInfo struct {
// Ports allocated on the node.
UsedPorts HostPortInfo

// usedEkletIPResource counts the IPs consumed by assumed pods on this eklet node.
usedEkletIPResource int

// Total requested resources of all pods on this node. This includes assumed
// pods, which scheduler has sent for binding, but may not be scheduled yet.
Requested *Resource
@@ -412,13 +414,14 @@ func (r *Resource) SetMaxResource(rl v1.ResourceList) {
// the returned object.
func NewNodeInfo(pods ...*v1.Pod) *NodeInfo {
ni := &NodeInfo{
-Requested:        &Resource{},
-NonZeroRequested: &Resource{},
-Allocatable:      &Resource{},
-TransientInfo:    NewTransientSchedulerInfo(),
-Generation:       nextGeneration(),
-UsedPorts:        make(HostPortInfo),
-ImageStates:      make(map[string]*ImageStateSummary),
+Requested:           &Resource{},
+NonZeroRequested:    &Resource{},
+Allocatable:         &Resource{},
+TransientInfo:       NewTransientSchedulerInfo(),
+Generation:          nextGeneration(),
+UsedPorts:           make(HostPortInfo),
+usedEkletIPResource: 0,
+ImageStates:         make(map[string]*ImageStateSummary),
}
for _, pod := range pods {
ni.AddPod(pod)
@@ -434,17 +437,26 @@ func (n *NodeInfo) Node() *v1.Node {
return n.node
}

// UsedIPResource returns the number of IPs consumed by assumed pods on this node; it is always 0 for non-eklet nodes.
func (n *NodeInfo) UsedIPResource() int {
if !schedutil.IsEkletNode(n.node) {
return 0
}
return n.usedEkletIPResource
}

// Clone returns a copy of this node.
func (n *NodeInfo) Clone() *NodeInfo {
clone := &NodeInfo{
-node:             n.node,
-Requested:        n.Requested.Clone(),
-NonZeroRequested: n.NonZeroRequested.Clone(),
-Allocatable:      n.Allocatable.Clone(),
-TransientInfo:    n.TransientInfo,
-UsedPorts:        make(HostPortInfo),
-ImageStates:      n.ImageStates,
-Generation:       n.Generation,
+node:                n.node,
+Requested:           n.Requested.Clone(),
+NonZeroRequested:    n.NonZeroRequested.Clone(),
+Allocatable:         n.Allocatable.Clone(),
+TransientInfo:       n.TransientInfo,
+UsedPorts:           make(HostPortInfo),
+ImageStates:         n.ImageStates,
+Generation:          n.Generation,
+usedEkletIPResource: n.usedEkletIPResource,
}
if len(n.Pods) > 0 {
clone.Pods = append([]*PodInfo(nil), n.Pods...)
@@ -480,6 +492,10 @@ func (n *NodeInfo) String() string {

// AddPod adds pod information to this NodeInfo.
func (n *NodeInfo) AddPod(pod *v1.Pod) {
n.addPod(pod, false)
}

func (n *NodeInfo) addPod(pod *v1.Pod, assumedOnEkletNode bool) {
podInfo := NewPodInfo(pod)
res, non0CPU, non0Mem := calculateResource(pod)
n.Requested.MilliCPU += res.MilliCPU
@@ -504,9 +520,21 @@ func (n *NodeInfo) AddPod(pod *v1.Pod) {
// Consume ports when pods added.
n.updateUsedPorts(podInfo.Pod, true)

if assumedOnEkletNode {
klog.V(4).Infof("consume node %s ip resource, used ip resource=%d", n.node.Name, n.UsedIPResource()+1)
n.usedEkletIPResource++
}

n.Generation = nextGeneration()
}

// AddAssumedPod adds assumed pod information to this NodeInfo and, on eklet
// nodes, charges one IP against the node's IP budget.
// Callers must ensure the pod has been assumed before invoking this func.
func (n *NodeInfo) AddAssumedPod(pod *v1.Pod) {
klog.V(4).Infof("AddAssumedPod when add pod %s", pod.Name)
n.addPod(pod, schedutil.IsEkletNode(n.node))
}

func podWithAffinity(p *v1.Pod) bool {
affinity := p.Spec.Affinity
return affinity != nil && (affinity.PodAffinity != nil || affinity.PodAntiAffinity != nil)
@@ -537,6 +565,10 @@ func removeFromSlice(s []*PodInfo, k string) []*PodInfo {

// RemovePod subtracts pod information from this NodeInfo.
func (n *NodeInfo) RemovePod(pod *v1.Pod) error {
return n.removePod(pod, false)
}

func (n *NodeInfo) removePod(pod *v1.Pod, assumedOnEkletNode bool) error {
k, err := GetPodKey(pod)
if err != nil {
return err
@@ -576,6 +608,10 @@ func (n *NodeInfo) RemovePod(pod *v1.Pod) error {
// Release ports when remove Pods.
n.updateUsedPorts(pod, false)

if assumedOnEkletNode {
klog.V(4).Infof("dec node %s ip resource, used ip resource=%d", n.node.Name, n.UsedIPResource()-1)
n.usedEkletIPResource--
}
n.Generation = nextGeneration()
n.resetSlicesIfEmpty()
return nil
@@ -584,6 +620,13 @@ func (n *NodeInfo) RemovePod(pod *v1.Pod) error {
return fmt.Errorf("no corresponding pod %s in pods of node %s", pod.Name, n.node.Name)
}

// RemoveAssumedPod subtracts assumed pod information from this NodeInfo and,
// on eklet nodes, releases one IP back to the node's IP budget.
// Callers must ensure the pod has been assumed before invoking this func.
func (n *NodeInfo) RemoveAssumedPod(pod *v1.Pod) error {
klog.V(4).Infof("RemoveAssumedPod when remove pod %s", pod.Name)
return n.removePod(pod, schedutil.IsEkletNode(n.node))
}

// resets the slices to nil so that we can do DeepEqual in unit tests.
func (n *NodeInfo) resetSlicesIfEmpty() {
if len(n.PodsWithAffinity) == 0 {
@@ -652,13 +695,28 @@ func (n *NodeInfo) updateUsedPorts(pod *v1.Pod, add bool) {

// SetNode sets the overall node information.
func (n *NodeInfo) SetNode(node *v1.Node) error {
return n.setNode(node, false)
}

func (n *NodeInfo) setNode(node *v1.Node, ekletIPResourceChanged bool) error {
n.node = node

if ekletIPResourceChanged {
n.usedEkletIPResource = 0
}

n.Allocatable = NewResource(node.Status.Allocatable)
n.TransientInfo = NewTransientSchedulerInfo()
n.Generation = nextGeneration()
return nil
}

// SetEkletNode sets the overall node information and resets the used IP
// accounting when the node is an eklet node.
// Callers must ensure the node's available IP count has actually changed
// before invoking this func.
func (n *NodeInfo) SetEkletNode(node *v1.Node) error {
return n.setNode(node, schedutil.IsEkletNode(node))
}

// RemoveNode removes the node object, leaving all other tracking information.
func (n *NodeInfo) RemoveNode() {
n.node = nil
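These Add/Remove/Set variants only pay off if the scheduler cache invokes them at the right points of the assume/forget/node-update lifecycle; that wiring lives in the cache code, which is not among the files shown here. A rough sketch of the expected call pattern (the cache type and its fields are illustrative, not the fork's actual implementation):

package cache

import (
	v1 "k8s.io/api/core/v1"

	"k8s.io/kubernetes/pkg/scheduler/framework"
)

// schedulerCache is an illustrative stand-in for the real scheduler cache.
type schedulerCache struct {
	nodes map[string]*framework.NodeInfo
}

// AssumePod charges the pod to its node, including one eklet IP if applicable.
func (c *schedulerCache) AssumePod(pod *v1.Pod) {
	if ni, ok := c.nodes[pod.Spec.NodeName]; ok {
		ni.AddAssumedPod(pod)
	}
}

// ForgetPod releases the pod's resources, including its eklet IP.
func (c *schedulerCache) ForgetPod(pod *v1.Pod) error {
	if ni, ok := c.nodes[pod.Spec.NodeName]; ok {
		return ni.RemoveAssumedPod(pod)
	}
	return nil
}

// UpdateNode refreshes node info; SetEkletNode resets the IP accounting
// when the available-ip-count label changes on an eklet node.
func (c *schedulerCache) UpdateNode(node *v1.Node, ipCountChanged bool) error {
	ni, ok := c.nodes[node.Name]
	if !ok {
		ni = framework.NewNodeInfo()
		c.nodes[node.Name] = ni
	}
	if ipCountChanged {
		return ni.SetEkletNode(node)
	}
	return ni.SetNode(node)
}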