Skip to content

Commit

Permalink
check readiness using kstatus
Browse files Browse the repository at this point in the history
This change replaces all the many functions and ways of calculating
readiness of objects into one unified way that uses kstatus.Compute() to
check if the object is in progress or current. Only the objects that are
current are considered to be ready. This takes advantage of the kstatus
compatibility of Flux's APIs and also makes sure that they remain
kstatus compatible.

The new isObjectReady() function is also aware of static/statusless
objects and knows how to check their readiness using kstatus. This
prepares the CLI for the upcoming static API objects.

All the is*Ready() functions for specific objects have been removed.

This change doesn't affect any of the existing tests results.

Introduce suspend and resume subcommands for alert-provider.

Signed-off-by: Sunny <darkowlzz@protonmail.com>
  • Loading branch information
darkowlzz committed Oct 17, 2023
1 parent ae0c3c8 commit 25a8559
Show file tree
Hide file tree
Showing 26 changed files with 478 additions and 446 deletions.
4 changes: 2 additions & 2 deletions cmd/flux/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ func (names apiType) upsertAndWait(object upsertWaitable, mutate func() error) e
}

logger.Waitingf("waiting for %s reconciliation", names.kind)
if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout,
isReady(ctx, kubeClient, namespacedName, object)); err != nil {
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isObjectReadyConditionFunc(kubeClient, namespacedName, object.asClientObject())); err != nil {
return err
}
logger.Successf("%s reconciliation completed", names.kind)
Expand Down
25 changes: 2 additions & 23 deletions cmd/flux/create_alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -132,8 +131,8 @@ func createAlertCmdRun(cmd *cobra.Command, args []string) error {
}

logger.Waitingf("waiting for Alert reconciliation")
if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout,
isAlertReady(ctx, kubeClient, namespacedName, &alert)); err != nil {
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isObjectReadyConditionFunc(kubeClient, namespacedName, &alert)); err != nil {
return err
}
logger.Successf("Alert %s is ready", name)
Expand Down Expand Up @@ -170,23 +169,3 @@ func upsertAlert(ctx context.Context, kubeClient client.Client,
logger.Successf("Alert updated")
return namespacedName, nil
}

func isAlertReady(ctx context.Context, kubeClient client.Client,
namespacedName types.NamespacedName, alert *notificationv1b2.Alert) wait.ConditionFunc {
return func() (bool, error) {
err := kubeClient.Get(ctx, namespacedName, alert)
if err != nil {
return false, err
}

if c := apimeta.FindStatusCondition(alert.Status.Conditions, meta.ReadyCondition); c != nil {
switch c.Status {
case metav1.ConditionTrue:
return true, nil
case metav1.ConditionFalse:
return false, fmt.Errorf(c.Message)
}
}
return false, nil
}
}
25 changes: 2 additions & 23 deletions cmd/flux/create_alertprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -127,8 +126,8 @@ func createAlertProviderCmdRun(cmd *cobra.Command, args []string) error {
}

logger.Waitingf("waiting for Provider reconciliation")
if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout,
isAlertProviderReady(ctx, kubeClient, namespacedName, &provider)); err != nil {
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isObjectReadyConditionFunc(kubeClient, namespacedName, &provider)); err != nil {
return err
}

Expand Down Expand Up @@ -167,23 +166,3 @@ func upsertAlertProvider(ctx context.Context, kubeClient client.Client,
logger.Successf("Provider updated")
return namespacedName, nil
}

func isAlertProviderReady(ctx context.Context, kubeClient client.Client,
namespacedName types.NamespacedName, provider *notificationv1.Provider) wait.ConditionFunc {
return func() (bool, error) {
err := kubeClient.Get(ctx, namespacedName, provider)
if err != nil {
return false, err
}

if c := apimeta.FindStatusCondition(provider.Status.Conditions, meta.ReadyCondition); c != nil {
switch c.Status {
case metav1.ConditionTrue:
return true, nil
case metav1.ConditionFalse:
return false, fmt.Errorf(c.Message)
}
}
return false, nil
}
}
22 changes: 2 additions & 20 deletions cmd/flux/create_helmrelease.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
"github.com/spf13/cobra"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -303,8 +302,8 @@ func createHelmReleaseCmdRun(cmd *cobra.Command, args []string) error {
}

logger.Waitingf("waiting for HelmRelease reconciliation")
if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout,
isHelmReleaseReady(ctx, kubeClient, namespacedName, &helmRelease)); err != nil {
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isObjectReadyConditionFunc(kubeClient, namespacedName, &helmRelease)); err != nil {
return err
}
logger.Successf("HelmRelease %s is ready", name)
Expand Down Expand Up @@ -344,23 +343,6 @@ func upsertHelmRelease(ctx context.Context, kubeClient client.Client,
return namespacedName, nil
}

func isHelmReleaseReady(ctx context.Context, kubeClient client.Client,
namespacedName types.NamespacedName, helmRelease *helmv2.HelmRelease) wait.ConditionFunc {
return func() (bool, error) {
err := kubeClient.Get(ctx, namespacedName, helmRelease)
if err != nil {
return false, err
}

// Confirm the state we are observing is for the current generation
if helmRelease.Generation != helmRelease.Status.ObservedGeneration {
return false, nil
}

return apimeta.IsStatusConditionTrue(helmRelease.Status.Conditions, meta.ReadyCondition), nil
}
}

func validateStrategy(input string) bool {
allowedStrategy := []string{"Revision", "ChartVersion"}

Expand Down
30 changes: 2 additions & 28 deletions cmd/flux/create_kustomization.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -263,8 +262,8 @@ func createKsCmdRun(cmd *cobra.Command, args []string) error {
}

logger.Waitingf("waiting for Kustomization reconciliation")
if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout,
isKustomizationReady(ctx, kubeClient, namespacedName, &kustomization)); err != nil {
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isObjectReadyConditionFunc(kubeClient, namespacedName, &kustomization)); err != nil {
return err
}
logger.Successf("Kustomization %s is ready", name)
Expand Down Expand Up @@ -303,28 +302,3 @@ func upsertKustomization(ctx context.Context, kubeClient client.Client,
logger.Successf("Kustomization updated")
return namespacedName, nil
}

func isKustomizationReady(ctx context.Context, kubeClient client.Client,
namespacedName types.NamespacedName, kustomization *kustomizev1.Kustomization) wait.ConditionFunc {
return func() (bool, error) {
err := kubeClient.Get(ctx, namespacedName, kustomization)
if err != nil {
return false, err
}

// Confirm the state we are observing is for the current generation
if kustomization.Generation != kustomization.Status.ObservedGeneration {
return false, nil
}

if c := apimeta.FindStatusCondition(kustomization.Status.Conditions, meta.ReadyCondition); c != nil {
switch c.Status {
case metav1.ConditionTrue:
return true, nil
case metav1.ConditionFalse:
return false, fmt.Errorf(c.Message)
}
}
return false, nil
}
}
25 changes: 2 additions & 23 deletions cmd/flux/create_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -139,8 +138,8 @@ func createReceiverCmdRun(cmd *cobra.Command, args []string) error {
}

logger.Waitingf("waiting for Receiver reconciliation")
if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout,
isReceiverReady(ctx, kubeClient, namespacedName, &receiver)); err != nil {
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isObjectReadyConditionFunc(kubeClient, namespacedName, &receiver)); err != nil {
return err
}
logger.Successf("Receiver %s is ready", name)
Expand Down Expand Up @@ -179,23 +178,3 @@ func upsertReceiver(ctx context.Context, kubeClient client.Client,
logger.Successf("Receiver updated")
return namespacedName, nil
}

func isReceiverReady(ctx context.Context, kubeClient client.Client,
namespacedName types.NamespacedName, receiver *notificationv1.Receiver) wait.ConditionFunc {
return func() (bool, error) {
err := kubeClient.Get(ctx, namespacedName, receiver)
if err != nil {
return false, err
}

if c := apimeta.FindStatusCondition(receiver.Status.Conditions, meta.ReadyCondition); c != nil {
switch c.Status {
case metav1.ConditionTrue:
return true, nil
case metav1.ConditionFalse:
return false, fmt.Errorf(c.Message)
}
}
return false, nil
}
}
32 changes: 2 additions & 30 deletions cmd/flux/create_source_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"

sourcev1 "github.com/fluxcd/source-controller/api/v1beta2"

Expand Down Expand Up @@ -204,8 +203,8 @@ func createSourceBucketCmdRun(cmd *cobra.Command, args []string) error {
}

logger.Waitingf("waiting for Bucket source reconciliation")
if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout,
isBucketReady(ctx, kubeClient, namespacedName, bucket)); err != nil {
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isObjectReadyConditionFunc(kubeClient, namespacedName, bucket)); err != nil {
return err
}
logger.Successf("Bucket source reconciliation completed")
Expand Down Expand Up @@ -247,30 +246,3 @@ func upsertBucket(ctx context.Context, kubeClient client.Client,
logger.Successf("Bucket source updated")
return namespacedName, nil
}

func isBucketReady(ctx context.Context, kubeClient client.Client,
namespacedName types.NamespacedName, bucket *sourcev1.Bucket) wait.ConditionFunc {
return func() (bool, error) {
err := kubeClient.Get(ctx, namespacedName, bucket)
if err != nil {
return false, err
}

if c := conditions.Get(bucket, meta.ReadyCondition); c != nil {
// Confirm the Ready condition we are observing is for the
// current generation
if c.ObservedGeneration != bucket.GetGeneration() {
return false, nil
}

// Further check the Status
switch c.Status {
case metav1.ConditionTrue:
return true, nil
case metav1.ConditionFalse:
return false, fmt.Errorf(c.Message)
}
}
return false, nil
}
}
32 changes: 2 additions & 30 deletions cmd/flux/create_source_git.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"sigs.k8s.io/yaml"

"github.com/fluxcd/pkg/apis/meta"
"github.com/fluxcd/pkg/runtime/conditions"

sourcev1 "github.com/fluxcd/source-controller/api/v1"

Expand Down Expand Up @@ -325,8 +324,8 @@ func createSourceGitCmdRun(cmd *cobra.Command, args []string) error {
}

logger.Waitingf("waiting for GitRepository source reconciliation")
if err := wait.PollImmediate(rootArgs.pollInterval, rootArgs.timeout,
isGitRepositoryReady(ctx, kubeClient, namespacedName, &gitRepository)); err != nil {
if err := wait.PollUntilContextTimeout(ctx, rootArgs.pollInterval, rootArgs.timeout, true,
isObjectReadyConditionFunc(kubeClient, namespacedName, &gitRepository)); err != nil {
return err
}
logger.Successf("GitRepository source reconciliation completed")
Expand Down Expand Up @@ -368,30 +367,3 @@ func upsertGitRepository(ctx context.Context, kubeClient client.Client,
logger.Successf("GitRepository source updated")
return namespacedName, nil
}

func isGitRepositoryReady(ctx context.Context, kubeClient client.Client,
namespacedName types.NamespacedName, gitRepository *sourcev1.GitRepository) wait.ConditionFunc {
return func() (bool, error) {
err := kubeClient.Get(ctx, namespacedName, gitRepository)
if err != nil {
return false, err
}

if c := conditions.Get(gitRepository, meta.ReadyCondition); c != nil {
// Confirm the Ready condition we are observing is for the
// current generation
if c.ObservedGeneration != gitRepository.GetGeneration() {
return false, nil
}

// Further check the Status
switch c.Status {
case metav1.ConditionTrue:
return true, nil
case metav1.ConditionFalse:
return false, fmt.Errorf(c.Message)
}
}
return false, nil
}
}
11 changes: 11 additions & 0 deletions cmd/flux/create_source_git_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,21 @@ func TestCreateSourceGit(t *testing.T) {
Time: time.Now(),
},
}
repo.Status.ObservedGeneration = repo.GetGeneration()
},
}, {
"Failed",
command,
assertError("failed message"),
func(repo *sourcev1.GitRepository) {
stalledCondition := metav1.Condition{
Type: meta.StalledCondition,
Status: metav1.ConditionTrue,
Reason: sourcev1.URLInvalidReason,
Message: "failed message",
ObservedGeneration: repo.GetGeneration(),
}
apimeta.SetStatusCondition(&repo.Status.Conditions, stalledCondition)
newCondition := metav1.Condition{
Type: meta.ReadyCondition,
Status: metav1.ConditionFalse,
Expand All @@ -195,6 +204,7 @@ func TestCreateSourceGit(t *testing.T) {
ObservedGeneration: repo.GetGeneration(),
}
apimeta.SetStatusCondition(&repo.Status.Conditions, newCondition)
repo.Status.ObservedGeneration = repo.GetGeneration()
},
}, {
"NoArtifact",
Expand All @@ -210,6 +220,7 @@ func TestCreateSourceGit(t *testing.T) {
ObservedGeneration: repo.GetGeneration(),
}
apimeta.SetStatusCondition(&repo.Status.Conditions, newCondition)
repo.Status.ObservedGeneration = repo.GetGeneration()
},
},
}
Expand Down
Loading

0 comments on commit 25a8559

Please sign in to comment.