Skip to content

Commit

Permalink
✨ Add is-paused annotation
Browse files Browse the repository at this point in the history
  • Loading branch information
nojnhuh committed May 17, 2023
1 parent ba50476 commit bfa7c87
Show file tree
Hide file tree
Showing 7 changed files with 269 additions and 7 deletions.
7 changes: 7 additions & 0 deletions api/v1beta1/common_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,13 @@ const (
// on the reconciled object.
PausedAnnotation = "cluster.x-k8s.io/paused"

// IsPausedAnnotation is an annotation whose value indicates whether or not
// a Cluster resource is fully paused. It may differ from `spec.paused` when
// the Cluster is in the process of pausing or unpausing. This annotation is
// only expected to be set by the Cluster API controller, not the user. When
// not set, `spec.paused` is assumed to indicate the same status.
IsPausedAnnotation = "cluster.x-k8s.io/is-paused"

// DisableMachineCreateAnnotation is an annotation that can be used to signal a MachineSet to stop creating new machines.
// It is utilized in the OnDelete MachineDeploymentStrategy to allow the MachineDeployment controller to scale down
// older MachineSets when Machines are deleted and add the new replicas to the latest MachineSet.
Expand Down
58 changes: 58 additions & 0 deletions cmd/clusterctl/client/cluster/mover.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"time"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
Expand All @@ -32,6 +34,7 @@ import (
kerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -565,6 +568,26 @@ func setClusterPause(proxy Proxy, clusters []*node, value bool, dryRun bool, mut
return errors.Wrapf(err, "error setting Cluster.Spec.Paused=%t", value)
}
}

// exponential backoff configuration which returns durations for a total time of ~2m.
// Example: 0, 5s, 8s, 11s, 17s, 26s, 38s, 57s, 86s, 128s
waitForPauseBackoff := wait.Backoff{
Duration: 5 * time.Second,
Factor: 1.5,
Steps: 10,
Jitter: 0.1,
}
for i := range clusters {
cluster := clusters[i]
log.V(5).Info("Waiting for cluster paused status", "Cluster", klog.KRef(cluster.identity.Namespace, cluster.identity.Name), "paused", value)

if err := retryWithExponentialBackoff(waitForPauseBackoff, checkClusterPaused(proxy, cluster, value, mutators...)); err != nil {
return errors.Wrapf(err, "error checking cluster paused is %t", value)
}

log.V(5).Info("Cluster paused status has been updated", "Cluster", klog.KRef(cluster.identity.Namespace, cluster.identity.Name), "paused", value)
}

return nil
}

Expand Down Expand Up @@ -691,6 +714,41 @@ func pauseClusterClass(proxy Proxy, n *node, pause bool, mutators ...ResourceMut
return nil
}

func checkClusterPaused(proxy Proxy, n *node, value bool, mutators ...ResourceMutatorFunc) func() error {
return func() error {
c, err := proxy.NewClient()
if err != nil {
return err
}

// Since the patch has been generated already in caller of this function, the ONLY affect that mutators can have
// here is on namespace of the resource.
clusterObj, err := applyMutators(&clusterv1.Cluster{
TypeMeta: metav1.TypeMeta{
Kind: clusterv1.ClusterKind,
APIVersion: clusterv1.GroupVersion.String(),
},
ObjectMeta: metav1.ObjectMeta{
Name: n.identity.Name,
Namespace: n.identity.Namespace,
},
}, mutators...)
if err != nil {
return err
}

if err := c.Get(ctx, client.ObjectKeyFromObject(clusterObj), clusterObj); err != nil {
return errors.Wrapf(err, "error getting Cluster %s/%s",
clusterObj.GetNamespace(), clusterObj.GetName())
}
isPaused, defined := clusterObj.GetAnnotations()[clusterv1.IsPausedAnnotation]
if defined && isPaused != strconv.FormatBool(value) {
return errors.Errorf("expected %q annotation to be %t, but is %s", clusterv1.IsPausedAnnotation, value, isPaused)
}
return nil
}
}

// ensureNamespaces ensures all the expected target namespaces are in place before creating objects.
func (o *objectMover) ensureNamespaces(graph *objectGraph, toProxy Proxy) error {
if o.dryRun {
Expand Down
94 changes: 94 additions & 0 deletions cmd/clusterctl/client/cluster/mover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package cluster

import (
"context"
"fmt"
"os"
"path/filepath"
Expand All @@ -29,6 +30,7 @@ import (
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/pointer"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -2240,3 +2242,95 @@ func Test_deleteSourceObject(t *testing.T) {
})
}
}

func TestCheckClusterPaused(t *testing.T) {
tests := []struct {
name string
pausedAnnotation string
value bool
wantErr bool
}{
{
name: "checking paused cluster is paused",
pausedAnnotation: "true",
value: true,
wantErr: false,
},
{
name: "checking paused cluster is unpaused",
pausedAnnotation: "true",
value: false,
wantErr: true,
},
{
name: "checking unpaused cluster is paused",
pausedAnnotation: "false",
value: true,
wantErr: true,
},
{
name: "checking unpaused cluster is unpaused",
pausedAnnotation: "false",
value: false,
wantErr: false,
},
{
name: "checking unannotated cluster is paused",
pausedAnnotation: "",
value: true,
wantErr: false,
},
{
name: "checking unannotated cluster is unpaused",
pausedAnnotation: "",
value: false,
wantErr: false,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
g := NewWithT(t)

clusterName := "foo"
clusterNamespace := "ns1"
objs := test.NewFakeCluster(clusterNamespace, clusterName).Objs()

// Create an objectGraph bound a source cluster with all the CRDs for the types involved in the test.
graph := getObjectGraphWithObjs(objs)

if tt.pausedAnnotation != "" {
c, err := graph.proxy.NewClient()
g.Expect(err).NotTo(HaveOccurred())

ctx := context.Background()
cluster := &clusterv1.Cluster{}
err = c.Get(ctx, types.NamespacedName{Namespace: clusterNamespace, Name: clusterName}, cluster)
g.Expect(err).NotTo(HaveOccurred())
anns := cluster.GetAnnotations()
if anns == nil {
anns = make(map[string]string)
}
anns[clusterv1.IsPausedAnnotation] = tt.pausedAnnotation
cluster.SetAnnotations(anns)

g.Expect(c.Update(ctx, cluster)).To(Succeed())
}

// Get all the types to be considered for discovery
g.Expect(getFakeDiscoveryTypes(graph)).To(Succeed())

// trigger discovery the content of the source cluster
g.Expect(graph.Discovery("")).To(Succeed())

node := graph.getClusters()[0]

err := checkClusterPaused(graph.proxy, node, tt.value)()
if tt.wantErr {
g.Expect(err).To(HaveOccurred())
} else {
g.Expect(err).NotTo(HaveOccurred())
}
})
}
}
4 changes: 3 additions & 1 deletion docs/book/src/clusterctl/provider-contract.md
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,9 @@ If moving some of excluded object is required, the provider authors should creat
exact move sequence to be executed by the user.

Additionally, provider authors should be aware that `clusterctl move` assumes all the provider's Controllers respect the
`Cluster.Spec.Paused` field introduced in the v1alpha3 Cluster API specification.
`Cluster.Spec.Paused` field introduced in the v1alpha3 Cluster API specification. `clusterctl move` will also wait for
the Cluster to be paused as indicated by the `cluster.x-k8s.io/is-paused=<true|false>` annotation when it exists for
providers that need to do extra work to ensure a Cluster is completely paused.

<aside class="note warning">

Expand Down
4 changes: 4 additions & 0 deletions docs/book/src/developer/providers/cluster-infrastructure.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ A cluster infrastructure provider must define an API type for "infrastructure cl
1. The CRD name must have the format produced by `sigs.k8s.io/cluster-api/util/contract.CalculateCRDName(Group, Kind)`.
3. Must be namespace-scoped
4. Must have the standard Kubernetes "type metadata" and "object metadata"
1. OPTIONAL: Providers may add a `cluster.x-k8s.io/is-paused=<true|false>` annotation to indicate whether
or not reconciliation for infrastructure is paused. This can be used to block tooling like `clusterctl
move` when the provider needs to do extra work in reaction to a Cluster's `spec.paused` changing. When
not defined, the Cluster's `spec.paused` is assumed to indicate the same status.
5. Must have a `spec` field with the following:
1. Required fields:
1. `controlPlaneEndpoint` (`apiEndpoint`): the endpoint for the cluster's control plane. `apiEndpoint` is defined
Expand Down
24 changes: 18 additions & 6 deletions internal/controllers/cluster/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,12 +114,6 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Re
return ctrl.Result{}, err
}

// Return early if the object or Cluster is paused.
if annotations.IsPaused(cluster, cluster) {
log.Info("Reconciliation is paused for this object")
return ctrl.Result{}, nil
}

// Initialize the patch helper.
patchHelper, err := patch.NewHelper(cluster, r.Client)
if err != nil {
Expand All @@ -141,6 +135,24 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (_ ctrl.Re
}
}()

anns := cluster.GetAnnotations()
if anns == nil {
anns = make(map[string]string)
}
delete(anns, clusterv1.IsPausedAnnotation)
infraCluster, err := external.Get(ctx, r.Client, cluster.Spec.InfrastructureRef, cluster.Namespace)
if err == nil {
if isPaused, exists := infraCluster.GetAnnotations()[clusterv1.IsPausedAnnotation]; exists {
anns[clusterv1.IsPausedAnnotation] = isPaused
}
}
cluster.SetAnnotations(anns)
// Return early if the object or Cluster is paused.
if annotations.IsPaused(cluster, cluster) {
log.Info("Reconciliation is paused for this object")
return ctrl.Result{}, nil
}

// Add finalizer first if not exist to avoid the race condition between init and delete
if !controllerutil.ContainsFinalizer(cluster, clusterv1.ClusterFinalizer) {
controllerutil.AddFinalizer(cluster, clusterv1.ClusterFinalizer)
Expand Down
85 changes: 85 additions & 0 deletions internal/controllers/cluster/cluster_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,91 @@ func TestClusterReconciler(t *testing.T) {
return conditions.IsTrue(cluster, clusterv1.ControlPlaneInitializedCondition)
}, timeout).Should(BeTrue())
})

t.Run("Should reflect the infra cluster's paused status", func(t *testing.T) {
g := NewWithT(t)

infraCluster := builder.InfrastructureCluster(ns.Name, "").Build()
infraCluster.SetGenerateName("test7-")
g.Expect(env.Create(ctx, infraCluster)).To(Succeed())

cluster := &clusterv1.Cluster{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "test7-",
Namespace: ns.Name,
},
Spec: clusterv1.ClusterSpec{
InfrastructureRef: &corev1.ObjectReference{
Name: infraCluster.GetName(),
Namespace: infraCluster.GetNamespace(),
APIVersion: infraCluster.GetAPIVersion(),
Kind: infraCluster.GetKind(),
},
},
}

// Create the Cluster object.
g.Expect(env.Create(ctx, cluster)).To(Succeed())
key := client.ObjectKey{Namespace: cluster.Namespace, Name: cluster.Name}
defer func() {
g.Expect(env.Delete(ctx, cluster)).To(Succeed())
g.Expect(env.Delete(ctx, infraCluster)).To(Succeed())
}()

// Make sure the Cluster exists.
g.Eventually(func() []string {
if err := env.Get(ctx, key, cluster); err != nil {
return nil
}
return cluster.Finalizers
}, timeout).ShouldNot(BeEmpty())
g.Expect(cluster.GetAnnotations()).NotTo(HaveKey(clusterv1.IsPausedAnnotation))

// Add the is-paused annotation to the infra cluster.
g.Expect(env.Get(ctx, client.ObjectKeyFromObject(infraCluster), infraCluster)).To(Succeed())
anns := infraCluster.GetAnnotations()
if anns == nil {
anns = make(map[string]string)
}
anns[clusterv1.IsPausedAnnotation] = "true"
infraCluster.SetAnnotations(anns)
g.Expect(env.Update(ctx, infraCluster)).To(Succeed())

g.Eventually(func() map[string]string {
if err := env.Get(ctx, key, cluster); err != nil {
return nil
}
return cluster.GetAnnotations()
}, timeout).Should(HaveKeyWithValue(clusterv1.IsPausedAnnotation, "true"))

// Modify the is-paused annotation on the infra cluster.
g.Expect(env.Get(ctx, client.ObjectKeyFromObject(infraCluster), infraCluster)).To(Succeed())
anns = infraCluster.GetAnnotations()
anns[clusterv1.IsPausedAnnotation] = "false"
infraCluster.SetAnnotations(anns)
g.Expect(env.Update(ctx, infraCluster)).To(Succeed())

g.Eventually(func() map[string]string {
if err := env.Get(ctx, key, cluster); err != nil {
return nil
}
return cluster.GetAnnotations()
}, timeout).Should(HaveKeyWithValue(clusterv1.IsPausedAnnotation, "false"))

// Remove the is-paused annotation from the infra cluster.
g.Expect(env.Get(ctx, client.ObjectKeyFromObject(infraCluster), infraCluster)).To(Succeed())
anns = infraCluster.GetAnnotations()
delete(anns, clusterv1.IsPausedAnnotation)
infraCluster.SetAnnotations(anns)
g.Expect(env.Update(ctx, infraCluster)).To(Succeed())

g.Eventually(func() map[string]string {
if err := env.Get(ctx, key, cluster); err != nil {
return nil
}
return cluster.GetAnnotations()
}, timeout).ShouldNot(HaveKey(clusterv1.IsPausedAnnotation))
})
}

func TestClusterReconciler_reconcileDelete(t *testing.T) {
Expand Down

0 comments on commit bfa7c87

Please sign in to comment.