Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove scheduler waits to speed up recovery time #8200

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,18 @@ type Evictor func(pod *corev1.Pod, vpod VPod, from *duckv1alpha1.Placement) erro
// Scheduler is responsible for placing VPods into real Kubernetes pods
type Scheduler interface {
// Schedule computes the new set of placements for vpod.
Schedule(vpod VPod) ([]duckv1alpha1.Placement, error)
Schedule(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error)
}

// SchedulerFunc type is an adapter to allow the use of
// ordinary functions as Schedulers. If f is a function
// with the appropriate signature, SchedulerFunc(f) is a
// Scheduler that calls f.
type SchedulerFunc func(vpod VPod) ([]duckv1alpha1.Placement, error)
type SchedulerFunc func(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error)

// Schedule implements the Scheduler interface.
func (f SchedulerFunc) Schedule(vpod VPod) ([]duckv1alpha1.Placement, error) {
return f(vpod)
func (f SchedulerFunc) Schedule(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error) {
return f(ctx, vpod)
}

// VPod represents virtual replicas placed into real Kubernetes pods
Expand Down
5 changes: 3 additions & 2 deletions pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package scheduler

import (
"context"
"testing"

"github.com/stretchr/testify/require"
Expand All @@ -28,12 +29,12 @@ func TestSchedulerFuncSchedule(t *testing.T) {

called := 0

var s Scheduler = SchedulerFunc(func(vpod VPod) ([]duckv1alpha1.Placement, error) {
var s Scheduler = SchedulerFunc(func(ctx context.Context, vpod VPod) ([]duckv1alpha1.Placement, error) {
called++
return nil, nil
})

_, err := s.Schedule(nil)
_, err := s.Schedule(context.Background(), nil)
require.Nil(t, err)
require.Equal(t, 1, called)
}
9 changes: 5 additions & 4 deletions pkg/scheduler/state/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"

"knative.dev/eventing/pkg/scheduler"
)

Expand Down Expand Up @@ -55,10 +56,10 @@ func SatisfyZoneAvailability(feasiblePods []int32, states *State) bool {
var zoneName string
var err error
for _, podID := range feasiblePods {
wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
zoneName, _, err = states.GetPodInfo(PodNameFromOrdinal(states.StatefulSetName, podID))
return err == nil, nil
})
zoneName, _, err = states.GetPodInfo(PodNameFromOrdinal(states.StatefulSetName, podID))
if err != nil {
continue
}
zoneMap[zoneName] = struct{}{}
}
return len(zoneMap) == int(states.NumZones)
Expand Down
95 changes: 44 additions & 51 deletions pkg/scheduler/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,17 +22,14 @@ import (
"errors"
"math"
"strconv"
"time"

"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
corev1 "k8s.io/client-go/listers/core/v1"

"knative.dev/pkg/logging"

"knative.dev/eventing/pkg/scheduler"
Expand All @@ -42,7 +39,7 @@ type StateAccessor interface {
// State returns the current state (snapshot) about placed vpods
// Take into account reserved vreplicas and update `reserved` to reflect
// the current state.
State(reserved map[types.NamespacedName]map[string]int32) (*State, error)
State(ctx context.Context, reserved map[types.NamespacedName]map[string]int32) (*State, error)
}

// state provides information about the current scheduling of all vpods
Expand Down Expand Up @@ -152,8 +149,6 @@ func (s *State) IsSchedulablePod(ordinal int32) bool {

// stateBuilder reconstruct the state from scratch, by listing vpods
type stateBuilder struct {
ctx context.Context
logger *zap.SugaredLogger
vpodLister scheduler.VPodLister
capacity int32
schedulerPolicy scheduler.SchedulerPolicyType
Expand All @@ -166,11 +161,9 @@ type stateBuilder struct {
}

// NewStateBuilder returns a StateAccessor recreating the state from scratch each time it is requested
func NewStateBuilder(ctx context.Context, namespace, sfsname string, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy scheduler.SchedulerPolicyType, schedPolicy *scheduler.SchedulerPolicy, deschedPolicy *scheduler.SchedulerPolicy, podlister corev1.PodNamespaceLister, nodeLister corev1.NodeLister, statefulSetCache *scheduler.ScaleCache) StateAccessor {
func NewStateBuilder(sfsname string, lister scheduler.VPodLister, podCapacity int32, schedulerPolicy scheduler.SchedulerPolicyType, schedPolicy, deschedPolicy *scheduler.SchedulerPolicy, podlister corev1.PodNamespaceLister, nodeLister corev1.NodeLister, statefulSetCache *scheduler.ScaleCache) StateAccessor {

return &stateBuilder{
ctx: ctx,
logger: logging.FromContext(ctx),
vpodLister: lister,
capacity: podCapacity,
schedulerPolicy: schedulerPolicy,
Expand All @@ -183,15 +176,18 @@ func NewStateBuilder(ctx context.Context, namespace, sfsname string, lister sche
}
}

func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32) (*State, error) {
func (s *stateBuilder) State(ctx context.Context, reserved map[types.NamespacedName]map[string]int32) (*State, error) {
vpods, err := s.vpodLister()
if err != nil {
return nil, err
}

scale, err := s.statefulSetCache.GetScale(s.ctx, s.statefulSetName, metav1.GetOptions{})
logger := logging.FromContext(ctx).With("subcomponent", "statebuilder")
ctx = logging.WithLogger(ctx, logger)

scale, err := s.statefulSetCache.GetScale(ctx, s.statefulSetName, metav1.GetOptions{})
if err != nil {
s.logger.Infow("failed to get statefulset", zap.Error(err))
logger.Infow("failed to get statefulset", zap.Error(err))
return nil, err
}

Expand Down Expand Up @@ -235,36 +231,35 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
}

for podId := int32(0); podId < scale.Spec.Replicas && s.podLister != nil; podId++ {
var pod *v1.Pod
wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
pod, err = s.podLister.Get(PodNameFromOrdinal(s.statefulSetName, podId))
return err == nil, nil
})

if pod != nil {
if isPodUnschedulable(pod) {
// Pod is marked for eviction - CANNOT SCHEDULE VREPS on this pod.
continue
}

node, err := s.nodeLister.Get(pod.Spec.NodeName)
if err != nil {
return nil, err
}
pod, err := s.podLister.Get(PodNameFromOrdinal(s.statefulSetName, podId))
if err != nil {
logger.Warnw("Failed to get pod", zap.Int32("ordinal", podId), zap.Error(err))
continue
}
if isPodUnschedulable(pod) {
// Pod is marked for eviction - CANNOT SCHEDULE VREPS on this pod.
logger.Debugw("Pod is unschedulable", zap.Any("pod", pod))
continue
}

if isNodeUnschedulable(node) {
// Node is marked as Unschedulable - CANNOT SCHEDULE VREPS on a pod running on this node.
continue
}
node, err := s.nodeLister.Get(pod.Spec.NodeName)
if err != nil {
return nil, err
}

// Pod has no annotation or not annotated as unschedulable and
// not on an unschedulable node, so add to feasible
schedulablePods.Insert(podId)
if isNodeUnschedulable(node) {
// Node is marked as Unschedulable - CANNOT SCHEDULE VREPS on a pod running on this node.
logger.Debugw("Pod is on an unschedulable node", zap.Any("pod", node))
continue
}

// Pod has no annotation or not annotated as unschedulable and
// not on an unschedulable node, so add to feasible
schedulablePods.Insert(podId)
}

for _, p := range schedulablePods.List() {
free, last = s.updateFreeCapacity(free, last, PodNameFromOrdinal(s.statefulSetName, p), 0)
free, last = s.updateFreeCapacity(logger, free, last, PodNameFromOrdinal(s.statefulSetName, p), 0)
}

// Getting current state from existing placements for all vpods
Expand All @@ -286,15 +281,14 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
// Account for reserved vreplicas
vreplicas = withReserved(vpod.GetKey(), podName, vreplicas, reserved)

free, last = s.updateFreeCapacity(free, last, podName, vreplicas)
free, last = s.updateFreeCapacity(logger, free, last, podName, vreplicas)

withPlacement[vpod.GetKey()][podName] = true

var pod *v1.Pod
wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
pod, err = s.podLister.Get(podName)
return err == nil, nil
})
pod, err := s.podLister.Get(podName)
if err != nil {
logger.Warnw("Failed to get pod", zap.String("podName", podName), zap.Error(err))
}

if pod != nil && schedulablePods.Has(OrdinalFromPodName(pod.GetName())) {
nodeName := pod.Spec.NodeName //node name for this pod
Expand All @@ -315,11 +309,10 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
continue
}

var pod *v1.Pod
wait.PollUntilContextTimeout(context.Background(), 50*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
pod, err = s.podLister.Get(podName)
return err == nil, nil
})
pod, err := s.podLister.Get(podName)
if err != nil {
logger.Warnw("Failed to get pod", zap.String("podName", podName), zap.Error(err))
}

if pod != nil && schedulablePods.Has(OrdinalFromPodName(pod.GetName())) {
nodeName := pod.Spec.NodeName //node name for this pod
Expand All @@ -330,15 +323,15 @@ func (s *stateBuilder) State(reserved map[types.NamespacedName]map[string]int32)
}
}

free, last = s.updateFreeCapacity(free, last, podName, rvreplicas)
free, last = s.updateFreeCapacity(logger, free, last, podName, rvreplicas)
}
}

state := &State{FreeCap: free, SchedulablePods: schedulablePods.List(), LastOrdinal: last, Capacity: s.capacity, Replicas: scale.Spec.Replicas, NumZones: int32(len(zoneMap)), NumNodes: int32(len(nodeToZoneMap)),
SchedulerPolicy: s.schedulerPolicy, SchedPolicy: s.schedPolicy, DeschedPolicy: s.deschedPolicy, NodeToZoneMap: nodeToZoneMap, StatefulSetName: s.statefulSetName, PodLister: s.podLister,
PodSpread: podSpread, NodeSpread: nodeSpread, ZoneSpread: zoneSpread, Pending: pending, ExpectedVReplicaByVPod: expectedVReplicasByVPod}

s.logger.Infow("cluster state info", zap.Any("state", state), zap.Any("reserved", toJSONable(reserved)))
logger.Infow("cluster state info", zap.Any("state", state), zap.Any("reserved", toJSONable(reserved)))

return state, nil
}
Expand All @@ -350,7 +343,7 @@ func pendingFromVPod(vpod scheduler.VPod) int32 {
return int32(math.Max(float64(0), float64(expected-scheduled)))
}

func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) {
func (s *stateBuilder) updateFreeCapacity(logger *zap.SugaredLogger, free []int32, last int32, podName string, vreplicas int32) ([]int32, int32) {
ordinal := OrdinalFromPodName(podName)
free = grow(free, ordinal, s.capacity)

Expand All @@ -359,7 +352,7 @@ func (s *stateBuilder) updateFreeCapacity(free []int32, last int32, podName stri
// Assert the pod is not overcommitted
if free[ordinal] < 0 {
// This should not happen anymore. Log as an error but do not interrupt the current scheduling.
s.logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal]))
logger.Warnw("pod is overcommitted", zap.String("podName", podName), zap.Int32("free", free[ordinal]))
}

if ordinal > last {
Expand Down
4 changes: 2 additions & 2 deletions pkg/scheduler/state/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -645,8 +645,8 @@ func TestStateBuilder(t *testing.T) {

scaleCache := scheduler.NewScaleCache(ctx, testNs, kubeclient.Get(ctx).AppsV1().StatefulSets(testNs), scheduler.ScaleCacheConfig{RefreshPeriod: time.Minute * 5})

stateBuilder := NewStateBuilder(ctx, testNs, sfsName, vpodClient.List, int32(10), tc.schedulerPolicyType, &scheduler.SchedulerPolicy{}, &scheduler.SchedulerPolicy{}, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister(), scaleCache)
state, err := stateBuilder.State(tc.reserved)
stateBuilder := NewStateBuilder(sfsName, vpodClient.List, int32(10), tc.schedulerPolicyType, &scheduler.SchedulerPolicy{}, &scheduler.SchedulerPolicy{}, lsp.GetPodLister().Pods(testNs), lsn.GetNodeLister(), scaleCache)
state, err := stateBuilder.State(ctx, tc.reserved)
if err != nil {
t.Fatal("unexpected error", err)
}
Expand Down
Loading
Loading