Skip to content

Commit

Permalink
feat: add log for scheduling progress for long running scheduling sim…
Browse files Browse the repository at this point in the history
…ulations (#1788) (#1792)

Co-authored-by: Nick Tran <10810510+njtran@users.noreply.github.com>
  • Loading branch information
jmdeal and njtran authored Oct 30, 2024
1 parent 69d4919 commit 008f465
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 9 deletions.
2 changes: 1 addition & 1 deletion pkg/controllers/controllers.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func NewControllers(
) []controller.Controller {

cluster := state.NewCluster(clock, kubeClient)
p := provisioning.NewProvisioner(kubeClient, recorder, cloudProvider, cluster)
p := provisioning.NewProvisioner(kubeClient, recorder, cloudProvider, cluster, clock)
evictionQueue := terminator.NewQueue(kubeClient, recorder)
disruptionQueue := orchestration.NewQueue(kubeClient, recorder, cluster, clock, p)

Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/orchestration/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ var _ = BeforeSuite(func() {
nodeStateController = informer.NewNodeController(env.Client, cluster)
nodeClaimStateController = informer.NewNodeClaimController(env.Client, cluster)
recorder = test.NewEventRecorder()
prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster)
prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster, fakeClock)
queue = orchestration.NewTestingQueue(env.Client, recorder, cluster, fakeClock, prov)
})

Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/disruption/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ var _ = BeforeSuite(func() {
nodeStateController = informer.NewNodeController(env.Client, cluster)
nodeClaimStateController = informer.NewNodeClaimController(env.Client, cluster)
recorder = test.NewEventRecorder()
prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster)
prov = provisioning.NewProvisioner(env.Client, recorder, cloudProvider, cluster, fakeClock)
queue = orchestration.NewTestingQueue(env.Client, recorder, cluster, fakeClock, prov)
disruptionController = disruption.NewController(fakeClock, env.Client, prov, cloudProvider, recorder, cluster, queue)
})
Expand Down
6 changes: 5 additions & 1 deletion pkg/controllers/provisioning/provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand Down Expand Up @@ -83,10 +84,12 @@ type Provisioner struct {
cluster *state.Cluster
recorder events.Recorder
cm *pretty.ChangeMonitor
clock clock.Clock
}

func NewProvisioner(kubeClient client.Client, recorder events.Recorder,
cloudProvider cloudprovider.CloudProvider, cluster *state.Cluster,
clock clock.Clock,
) *Provisioner {
p := &Provisioner{
batcher: NewBatcher(),
Expand All @@ -96,6 +99,7 @@ func NewProvisioner(kubeClient client.Client, recorder events.Recorder,
cluster: cluster,
recorder: recorder,
cm: pretty.NewChangeMonitor(),
clock: clock,
}
return p
}
Expand Down Expand Up @@ -302,7 +306,7 @@ func (p *Provisioner) NewScheduler(ctx context.Context, pods []*corev1.Pod, stat
if err != nil {
return nil, fmt.Errorf("getting daemon pods, %w", err)
}
return scheduler.NewScheduler(p.kubeClient, lo.ToSlicePtr(nodePoolList.Items), p.cluster, stateNodes, topology, instanceTypes, daemonSetPods, p.recorder), nil
return scheduler.NewScheduler(p.kubeClient, lo.ToSlicePtr(nodePoolList.Items), p.cluster, stateNodes, topology, instanceTypes, daemonSetPods, p.recorder, p.clock), nil
}

func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
Expand Down
15 changes: 14 additions & 1 deletion pkg/controllers/provisioning/scheduling/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"fmt"
"sort"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/samber/lo"
Expand All @@ -29,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/klog/v2"
"k8s.io/utils/clock"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"

Expand All @@ -47,7 +49,7 @@ import (
func NewScheduler(kubeClient client.Client, nodePools []*v1.NodePool,
cluster *state.Cluster, stateNodes []*state.StateNode, topology *Topology,
instanceTypes map[string][]*cloudprovider.InstanceType, daemonSetPods []*corev1.Pod,
recorder events.Recorder) *Scheduler {
recorder events.Recorder, clock clock.Clock) *Scheduler {

// if any of the nodePools add a taint with a prefer no schedule effect, we add a toleration for the taint
// during preference relaxation
Expand All @@ -74,6 +76,7 @@ func NewScheduler(kubeClient client.Client, nodePools []*v1.NodePool,
remainingResources: lo.SliceToMap(nodePools, func(np *v1.NodePool) (string, corev1.ResourceList) {
return np.Name, corev1.ResourceList(np.Spec.Limits)
}),
clock: clock,
}
s.calculateExistingNodeClaims(stateNodes, daemonSetPods)
return s
Expand All @@ -92,6 +95,7 @@ type Scheduler struct {
cluster *state.Cluster
recorder events.Recorder
kubeClient client.Client
clock clock.Clock
}

// Results contains the results of the scheduling operation
Expand Down Expand Up @@ -206,10 +210,19 @@ func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results {
errors := map[*corev1.Pod]error{}
QueueDepth.DeletePartialMatch(prometheus.Labels{ControllerLabel: injection.GetControllerName(ctx)}) // Reset the metric for the controller, so we don't keep old ids around
q := NewQueue(pods...)

startTime := s.clock.Now()
lastLogTime := s.clock.Now()
batchSize := len(q.pods)
for {
QueueDepth.With(
prometheus.Labels{ControllerLabel: injection.GetControllerName(ctx), schedulingIDLabel: string(s.id)},
).Set(float64(len(q.pods)))

if s.clock.Since(lastLogTime) > time.Minute {
log.FromContext(ctx).WithValues("pods-scheduled", batchSize-len(q.pods), "pods-remaining", len(q.pods), "duration", s.clock.Since(startTime).Truncate(time.Second), "scheduling-id", string(s.id)).Info("computing pod scheduling...")
lastLogTime = s.clock.Now()
}
// Try the next pod
pod, ok := q.Pop()
if !ok {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,8 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) {

client := fakecr.NewFakeClient()
pods := makeDiversePods(podCount)
cluster = state.NewCluster(&clock.RealClock{}, client)
clock := &clock.RealClock{}
cluster = state.NewCluster(clock, client)
domains := map[string]sets.Set[string]{}
topology, err := scheduling.NewTopology(ctx, client, cluster, domains, pods)
if err != nil {
Expand All @@ -176,7 +177,7 @@ func benchmarkScheduler(b *testing.B, instanceCount, podCount int) {
scheduler := scheduling.NewScheduler(client, []*v1.NodePool{nodePool},
cluster, nil, topology,
map[string][]*cloudprovider.InstanceType{nodePool.Name: instanceTypes}, nil,
events.NewRecorder(&record.FakeRecorder{}))
events.NewRecorder(&record.FakeRecorder{}), clock)

b.ResetTimer()
// Pack benchmark
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/scheduling/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ var _ = BeforeSuite(func() {
nodeStateController = informer.NewNodeController(env.Client, cluster)
nodeClaimStateController = informer.NewNodeClaimController(env.Client, cluster)
podStateController = informer.NewPodController(env.Client, cluster)
prov = provisioning.NewProvisioner(env.Client, events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster)
prov = provisioning.NewProvisioner(env.Client, events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster, fakeClock)
})

var _ = AfterSuite(func() {
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/provisioning/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ var _ = BeforeSuite(func() {
fakeClock = clock.NewFakeClock(time.Now())
cluster = state.NewCluster(fakeClock, env.Client)
nodeController = informer.NewNodeController(env.Client, cluster)
prov = provisioning.NewProvisioner(env.Client, events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster)
prov = provisioning.NewProvisioner(env.Client, events.NewRecorder(&record.FakeRecorder{}), cloudProvider, cluster, fakeClock)
daemonsetController = informer.NewDaemonSetController(env.Client, cluster)
instanceTypes, _ := cloudProvider.GetInstanceTypes(ctx, nil)
instanceTypeMap = map[string]*cloudprovider.InstanceType{}
Expand Down

0 comments on commit 008f465

Please sign in to comment.