Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bug fix: ensure ComponentStatus is initialized in ExpectedPodCount #170

Merged
merged 1 commit into from
Jun 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions internal/controller/appwrapper/appwrapper_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/reconcile"

workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
"github.com/project-codeflare/appwrapper/internal/controller/awstatus"
"github.com/project-codeflare/appwrapper/pkg/config"
"github.com/project-codeflare/appwrapper/pkg/utils"
)
Expand Down Expand Up @@ -155,7 +154,7 @@ func (r *AppWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}
}

if err := awstatus.EnsureComponentStatusInitialized(ctx, aw); err != nil {
if err := utils.EnsureComponentStatusInitialized(aw); err != nil {
return ctrl.Result{}, err
}

Expand Down Expand Up @@ -491,7 +490,11 @@ func (r *AppWrapperReconciler) getPodStatus(ctx context.Context, aw *workloadv1b
client.MatchingLabels{AppWrapperLabel: aw.Name}); err != nil {
return nil, err
}
summary := &podStatusSummary{expected: utils.ExpectedPodCount(aw)}
pc, err := utils.ExpectedPodCount(aw)
if err != nil {
return nil, err
}
summary := &podStatusSummary{expected: pc}

for _, pod := range pods.Items {
switch pod.Status.Phase {
Expand Down
20 changes: 13 additions & 7 deletions internal/controller/appwrapper/appwrapper_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ var _ = Describe("AppWrapper Controller", func() {
Expect((*workload.AppWrapper)(aw).IsSuspended()).Should(BeFalse())
podStatus, err := awReconciler.getPodStatus(ctx, aw)
Expect(err).NotTo(HaveOccurred())
Expect(podStatus.pending).Should(Equal(utils.ExpectedPodCount(aw)))
Expect(utils.ExpectedPodCount(aw)).Should(Equal(podStatus.pending))

By("Simulating first Pod Running")
Expect(setPodStatus(aw, v1.PodRunning, 1)).To(Succeed())
Expand All @@ -123,15 +123,18 @@ var _ = Describe("AppWrapper Controller", func() {
Expect((*workload.AppWrapper)(aw).IsSuspended()).Should(BeFalse())
podStatus, err = awReconciler.getPodStatus(ctx, aw)
Expect(err).NotTo(HaveOccurred())
Expect(podStatus.pending).Should(Equal(utils.ExpectedPodCount(aw) - 1))
Expect(podStatus.running).Should(Equal(int32(1)))
Expect(utils.ExpectedPodCount(aw)).Should(Equal(podStatus.pending + podStatus.running))
}

fullyRunning := func() {
aw := getAppWrapper(awName)
By("Simulating all Pods Running")
Expect(setPodStatus(aw, v1.PodRunning, utils.ExpectedPodCount(aw))).To(Succeed())
pc, err := utils.ExpectedPodCount(aw)
Expect(err).NotTo(HaveOccurred())
Expect(setPodStatus(aw, v1.PodRunning, pc)).To(Succeed())
By("Reconciling: Running -> Running")
_, err := awReconciler.Reconcile(ctx, reconcile.Request{NamespacedName: awName})
_, err = awReconciler.Reconcile(ctx, reconcile.Request{NamespacedName: awName})
Expect(err).NotTo(HaveOccurred())

aw = getAppWrapper(awName)
Expand All @@ -144,7 +147,7 @@ var _ = Describe("AppWrapper Controller", func() {
Expect((*workload.AppWrapper)(aw).PodsReady()).Should(BeTrue())
podStatus, err := awReconciler.getPodStatus(ctx, aw)
Expect(err).NotTo(HaveOccurred())
Expect(podStatus.running).Should(Equal(utils.ExpectedPodCount(aw)))
Expect(podStatus.running).Should(Equal(pc))
_, finished := (*workload.AppWrapper)(aw).Finished()
Expect(finished).Should(BeFalse())
}
Expand Down Expand Up @@ -184,13 +187,16 @@ var _ = Describe("AppWrapper Controller", func() {
Expect(meta.IsStatusConditionTrue(aw.Status.Conditions, string(workloadv1beta2.QuotaReserved))).Should(BeTrue())
Expect((*workload.AppWrapper)(aw).IsActive()).Should(BeTrue())
Expect((*workload.AppWrapper)(aw).IsSuspended()).Should(BeFalse())
pc, err := utils.ExpectedPodCount(aw)
Expect(err).NotTo(HaveOccurred())
Expect(pc).Should(Equal(int32(2)))
podStatus, err := awReconciler.getPodStatus(ctx, aw)
Expect(err).NotTo(HaveOccurred())
Expect(podStatus.running).Should(Equal(utils.ExpectedPodCount(aw) - 1))
Expect(podStatus.running).Should(Equal(int32(1)))
Expect(podStatus.succeeded).Should(Equal(int32(1)))

By("Simulating all Pods Completing")
Expect(setPodStatus(aw, v1.PodSucceeded, utils.ExpectedPodCount(aw))).To(Succeed())
Expect(setPodStatus(aw, v1.PodSucceeded, 2)).To(Succeed())
By("Reconciling: Running -> Succeeded")
_, err = awReconciler.Reconcile(ctx, reconcile.Request{NamespacedName: awName})
Expect(err).NotTo(HaveOccurred())
Expand Down
54 changes: 0 additions & 54 deletions internal/controller/awstatus/status_utils.go

This file was deleted.

4 changes: 1 addition & 3 deletions internal/controller/workload/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ limitations under the License.
package workload

import (
"context"
"fmt"

"k8s.io/apimachinery/pkg/api/meta"
Expand All @@ -33,7 +32,6 @@ import (
"sigs.k8s.io/kueue/pkg/podset"

workloadv1beta2 "github.com/project-codeflare/appwrapper/api/v1beta2"
"github.com/project-codeflare/appwrapper/internal/controller/awstatus"
"github.com/project-codeflare/appwrapper/pkg/utils"
)

Expand Down Expand Up @@ -79,7 +77,7 @@ func (aw *AppWrapper) GVK() schema.GroupVersionKind {

func (aw *AppWrapper) PodSets() []kueue.PodSet {
podSets := []kueue.PodSet{}
if err := awstatus.EnsureComponentStatusInitialized(context.Background(), (*workloadv1beta2.AppWrapper)(aw)); err != nil {
if err := utils.EnsureComponentStatusInitialized((*workloadv1beta2.AppWrapper)(aw)); err != nil {
// Kueue will raise an error on zero length PodSet. Unfortunately, the Kueue API prevents propagating the actual error
return podSets
}
Expand Down
36 changes: 34 additions & 2 deletions pkg/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -285,14 +285,46 @@ func Replicas(ps workloadv1beta2.AppWrapperPodSet) int32 {
}
}

func ExpectedPodCount(aw *workloadv1beta2.AppWrapper) int32 {
func ExpectedPodCount(aw *workloadv1beta2.AppWrapper) (int32, error) {
if err := EnsureComponentStatusInitialized(aw); err != nil {
return 0, err
}
var expected int32
for _, c := range aw.Status.ComponentStatus {
for _, s := range c.PodSets {
expected += Replicas(s)
}
}
return expected
return expected, nil
}

// EnsureComponentStatusInitialized populates aw.Status.ComponentStatus with one
// entry per component in aw.Spec.Components.
//
// It is a no-op when the status slice already has the same length as the spec's
// component list. Otherwise each component's PodSets are taken from its
// DeclaredPodSets when any are present, or inferred from the component template
// via InferPodSets. The result is cached in the Status (rather than the Spec) to
// avoid clashing with user updates to the Spec via apply.
//
// On error the first failure is returned and aw.Status.ComponentStatus is left
// untouched.
func EnsureComponentStatusInitialized(aw *workloadv1beta2.AppWrapper) error {
	if len(aw.Status.ComponentStatus) == len(aw.Spec.Components) {
		return nil
	}

	statuses := make([]workloadv1beta2.AppWrapperComponentStatus, len(aw.Spec.Components))
	for i := range aw.Spec.Components {
		component := &aw.Spec.Components[i]

		// Explicitly declared PodSets take precedence over inference.
		if len(component.DeclaredPodSets) > 0 {
			statuses[i].PodSets = component.DeclaredPodSets
			continue
		}

		template := &unstructured.Unstructured{}
		_, _, decodeErr := unstructured.UnstructuredJSONScheme.Decode(component.Template.Raw, nil, template)
		if decodeErr != nil {
			// Transient error; Template.Raw was validated by our AdmissionController
			return decodeErr
		}

		inferred, inferErr := InferPodSets(template)
		if inferErr != nil {
			// Transient error; InferPodSets was validated by our AdmissionController
			return inferErr
		}
		statuses[i].PodSets = inferred
	}

	// Publish only after every component has been processed successfully.
	aw.Status.ComponentStatus = statuses
	return nil
}

// inferReplicas parses the value at the given path within obj as an int, or returns 1 or an error
Expand Down
12 changes: 9 additions & 3 deletions test/e2e/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,15 +253,21 @@ func waitAWPodsDeleted(ctx context.Context, awNamespace string, awName string) e
}

func waitAWPodsReady(ctx context.Context, aw *workloadv1beta2.AppWrapper) error {
numExpected := utils.ExpectedPodCount(aw)
numExpected, err := utils.ExpectedPodCount(aw)
if err != nil {
return err
}
phases := []v1.PodPhase{v1.PodRunning, v1.PodSucceeded}
return wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 120*time.Second, true, podsInPhase(aw.Namespace, aw.Name, phases, numExpected))
}

func checkAllAWPodsReady(ctx context.Context, aw *workloadv1beta2.AppWrapper) bool {
numExpected := utils.ExpectedPodCount(aw)
numExpected, err := utils.ExpectedPodCount(aw)
if err != nil {
return false
}
phases := []v1.PodPhase{v1.PodRunning, v1.PodSucceeded}
err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 100*time.Millisecond, true, podsInPhase(aw.Namespace, aw.Name, phases, numExpected))
err = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 100*time.Millisecond, true, podsInPhase(aw.Namespace, aw.Name, phases, numExpected))
return err == nil
}

Expand Down