Skip to content

Commit

Permalink
Merge pull request #8939 from Fedosin/clusterctl_context
Browse files Browse the repository at this point in the history
⚠️ Improve Context handling in clusterctl
  • Loading branch information
k8s-ci-robot authored Aug 18, 2023
2 parents 10e8e4f + dae1103 commit 82eff49
Show file tree
Hide file tree
Showing 115 changed files with 1,250 additions and 926 deletions.
6 changes: 0 additions & 6 deletions cmd/clusterctl/client/alpha/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,6 @@ limitations under the License.

package alpha

import "context"

var (
ctx = context.TODO()
)

// Client is the alpha client.
type Client interface {
Rollout() Rollout
Expand Down
9 changes: 5 additions & 4 deletions cmd/clusterctl/client/alpha/kubeadmcontrolplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package alpha

import (
"context"
"fmt"
"time"

Expand All @@ -29,7 +30,7 @@ import (
)

// getKubeadmControlPlane retrieves the KubeadmControlPlane object corresponding to the name and namespace specified.
func getKubeadmControlPlane(proxy cluster.Proxy, name, namespace string) (*controlplanev1.KubeadmControlPlane, error) {
func getKubeadmControlPlane(ctx context.Context, proxy cluster.Proxy, name, namespace string) (*controlplanev1.KubeadmControlPlane, error) {
kcpObj := &controlplanev1.KubeadmControlPlane{}
c, err := proxy.NewClient()
if err != nil {
Expand All @@ -47,13 +48,13 @@ func getKubeadmControlPlane(proxy cluster.Proxy, name, namespace string) (*contr
}

// setRolloutAfterOnKCP sets KubeadmControlPlane.spec.rolloutAfter.
func setRolloutAfterOnKCP(proxy cluster.Proxy, name, namespace string) error {
func setRolloutAfterOnKCP(ctx context.Context, proxy cluster.Proxy, name, namespace string) error {
patch := client.RawPatch(types.MergePatchType, []byte(fmt.Sprintf(`{"spec":{"rolloutAfter":"%v"}}`, time.Now().Format(time.RFC3339))))
return patchKubeadmControlPlane(proxy, name, namespace, patch)
return patchKubeadmControlPlane(ctx, proxy, name, namespace, patch)
}

// patchKubeadmControlPlane applies a patch to a KubeadmControlPlane.
func patchKubeadmControlPlane(proxy cluster.Proxy, name, namespace string, patch client.Patch) error {
func patchKubeadmControlPlane(ctx context.Context, proxy cluster.Proxy, name, namespace string, patch client.Patch) error {
cFrom, err := proxy.NewClient()
if err != nil {
return err
Expand Down
11 changes: 6 additions & 5 deletions cmd/clusterctl/client/alpha/machinedeployment.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package alpha

import (
"context"
"fmt"
"strconv"
"time"
Expand All @@ -35,7 +36,7 @@ import (
)

// getMachineDeployment retrieves the MachineDeployment object corresponding to the name and namespace specified.
func getMachineDeployment(proxy cluster.Proxy, name, namespace string) (*clusterv1.MachineDeployment, error) {
func getMachineDeployment(ctx context.Context, proxy cluster.Proxy, name, namespace string) (*clusterv1.MachineDeployment, error) {
mdObj := &clusterv1.MachineDeployment{}
c, err := proxy.NewClient()
if err != nil {
Expand All @@ -53,13 +54,13 @@ func getMachineDeployment(proxy cluster.Proxy, name, namespace string) (*cluster
}

// setRolloutAfterOnMachineDeployment sets MachineDeployment.spec.rolloutAfter.
func setRolloutAfterOnMachineDeployment(proxy cluster.Proxy, name, namespace string) error {
func setRolloutAfterOnMachineDeployment(ctx context.Context, proxy cluster.Proxy, name, namespace string) error {
patch := client.RawPatch(types.MergePatchType, []byte(fmt.Sprintf(`{"spec":{"rolloutAfter":"%v"}}`, time.Now().Format(time.RFC3339))))
return patchMachineDeployment(proxy, name, namespace, patch)
return patchMachineDeployment(ctx, proxy, name, namespace, patch)
}

// patchMachineDeployment applies a patch to a machinedeployment.
func patchMachineDeployment(proxy cluster.Proxy, name, namespace string, patch client.Patch) error {
func patchMachineDeployment(ctx context.Context, proxy cluster.Proxy, name, namespace string, patch client.Patch) error {
cFrom, err := proxy.NewClient()
if err != nil {
return err
Expand Down Expand Up @@ -118,7 +119,7 @@ func findMachineDeploymentRevision(toRevision int64, allMSs []*clusterv1.Machine
}

// getMachineSetsForDeployment returns a list of MachineSets associated with a MachineDeployment.
func getMachineSetsForDeployment(proxy cluster.Proxy, md *clusterv1.MachineDeployment) ([]*clusterv1.MachineSet, error) {
func getMachineSetsForDeployment(ctx context.Context, proxy cluster.Proxy, md *clusterv1.MachineDeployment) ([]*clusterv1.MachineSet, error) {
log := logf.Log
c, err := proxy.NewClient()
if err != nil {
Expand Down
10 changes: 6 additions & 4 deletions cmd/clusterctl/client/alpha/rollout.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package alpha

import (
"context"

corev1 "k8s.io/api/core/v1"

"sigs.k8s.io/cluster-api/cmd/clusterctl/client/cluster"
Expand All @@ -40,10 +42,10 @@ var validRollbackResourceTypes = []string{

// Rollout defines the behavior of a rollout implementation.
type Rollout interface {
ObjectRestarter(cluster.Proxy, corev1.ObjectReference) error
ObjectPauser(cluster.Proxy, corev1.ObjectReference) error
ObjectResumer(cluster.Proxy, corev1.ObjectReference) error
ObjectRollbacker(cluster.Proxy, corev1.ObjectReference, int64) error
ObjectRestarter(context.Context, cluster.Proxy, corev1.ObjectReference) error
ObjectPauser(context.Context, cluster.Proxy, corev1.ObjectReference) error
ObjectResumer(context.Context, cluster.Proxy, corev1.ObjectReference) error
ObjectRollbacker(context.Context, cluster.Proxy, corev1.ObjectReference, int64) error
}

var _ Rollout = &rollout{}
Expand Down
19 changes: 10 additions & 9 deletions cmd/clusterctl/client/alpha/rollout_pauser.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package alpha

import (
"context"
"fmt"

"github.com/pkg/errors"
Expand All @@ -30,28 +31,28 @@ import (
)

// ObjectPauser will issue a pause on the specified cluster-api resource.
func (r *rollout) ObjectPauser(proxy cluster.Proxy, ref corev1.ObjectReference) error {
func (r *rollout) ObjectPauser(ctx context.Context, proxy cluster.Proxy, ref corev1.ObjectReference) error {
switch ref.Kind {
case MachineDeployment:
deployment, err := getMachineDeployment(proxy, ref.Name, ref.Namespace)
deployment, err := getMachineDeployment(ctx, proxy, ref.Name, ref.Namespace)
if err != nil || deployment == nil {
return errors.Wrapf(err, "failed to fetch %v/%v", ref.Kind, ref.Name)
}
if deployment.Spec.Paused {
return errors.Errorf("MachineDeployment is already paused: %v/%v\n", ref.Kind, ref.Name) //nolint:revive // MachineDeployment is intentionally capitalized.
}
if err := pauseMachineDeployment(proxy, ref.Name, ref.Namespace); err != nil {
if err := pauseMachineDeployment(ctx, proxy, ref.Name, ref.Namespace); err != nil {
return err
}
case KubeadmControlPlane:
kcp, err := getKubeadmControlPlane(proxy, ref.Name, ref.Namespace)
kcp, err := getKubeadmControlPlane(ctx, proxy, ref.Name, ref.Namespace)
if err != nil || kcp == nil {
return errors.Wrapf(err, "failed to fetch %v/%v", ref.Kind, ref.Name)
}
if annotations.HasPaused(kcp.GetObjectMeta()) {
return errors.Errorf("KubeadmControlPlane is already paused: %v/%v\n", ref.Kind, ref.Name) //nolint:revive // KubeadmControlPlane is intentionally capitalized.
}
if err := pauseKubeadmControlPlane(proxy, ref.Name, ref.Namespace); err != nil {
if err := pauseKubeadmControlPlane(ctx, proxy, ref.Name, ref.Namespace); err != nil {
return err
}
default:
Expand All @@ -61,13 +62,13 @@ func (r *rollout) ObjectPauser(proxy cluster.Proxy, ref corev1.ObjectReference)
}

// pauseMachineDeployment sets Paused to true in the MachineDeployment's spec.
func pauseMachineDeployment(proxy cluster.Proxy, name, namespace string) error {
func pauseMachineDeployment(ctx context.Context, proxy cluster.Proxy, name, namespace string) error {
patch := client.RawPatch(types.MergePatchType, []byte(fmt.Sprintf("{\"spec\":{\"paused\":%t}}", true)))
return patchMachineDeployment(proxy, name, namespace, patch)
return patchMachineDeployment(ctx, proxy, name, namespace, patch)
}

// pauseKubeadmControlPlane sets paused annotation to true.
func pauseKubeadmControlPlane(proxy cluster.Proxy, name, namespace string) error {
func pauseKubeadmControlPlane(ctx context.Context, proxy cluster.Proxy, name, namespace string) error {
patch := client.RawPatch(types.MergePatchType, []byte(fmt.Sprintf("{\"metadata\":{\"annotations\":{%q: \"%t\"}}}", clusterv1.PausedAnnotation, true)))
return patchKubeadmControlPlane(proxy, name, namespace, patch)
return patchKubeadmControlPlane(ctx, proxy, name, namespace, patch)
}
2 changes: 1 addition & 1 deletion cmd/clusterctl/client/alpha/rollout_pauser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func Test_ObjectPauser(t *testing.T) {
g := NewWithT(t)
r := newRolloutClient()
proxy := test.NewFakeProxy().WithObjs(tt.fields.objs...)
err := r.ObjectPauser(proxy, tt.fields.ref)
err := r.ObjectPauser(context.Background(), proxy, tt.fields.ref)
if tt.wantErr {
g.Expect(err).To(HaveOccurred())
return
Expand Down
11 changes: 6 additions & 5 deletions cmd/clusterctl/client/alpha/rollout_restarter.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package alpha

import (
"context"
"time"

"github.com/pkg/errors"
Expand All @@ -27,10 +28,10 @@ import (
)

// ObjectRestarter will issue a restart on the specified cluster-api resource.
func (r *rollout) ObjectRestarter(proxy cluster.Proxy, ref corev1.ObjectReference) error {
func (r *rollout) ObjectRestarter(ctx context.Context, proxy cluster.Proxy, ref corev1.ObjectReference) error {
switch ref.Kind {
case MachineDeployment:
deployment, err := getMachineDeployment(proxy, ref.Name, ref.Namespace)
deployment, err := getMachineDeployment(ctx, proxy, ref.Name, ref.Namespace)
if err != nil || deployment == nil {
return errors.Wrapf(err, "failed to fetch %v/%v", ref.Kind, ref.Name)
}
Expand All @@ -40,11 +41,11 @@ func (r *rollout) ObjectRestarter(proxy cluster.Proxy, ref corev1.ObjectReferenc
if deployment.Spec.RolloutAfter != nil && deployment.Spec.RolloutAfter.After(time.Now()) {
return errors.Errorf("can't update MachineDeployment (remove 'spec.rolloutAfter' first): %v/%v", ref.Kind, ref.Name)
}
if err := setRolloutAfterOnMachineDeployment(proxy, ref.Name, ref.Namespace); err != nil {
if err := setRolloutAfterOnMachineDeployment(ctx, proxy, ref.Name, ref.Namespace); err != nil {
return err
}
case KubeadmControlPlane:
kcp, err := getKubeadmControlPlane(proxy, ref.Name, ref.Namespace)
kcp, err := getKubeadmControlPlane(ctx, proxy, ref.Name, ref.Namespace)
if err != nil || kcp == nil {
return errors.Wrapf(err, "failed to fetch %v/%v", ref.Kind, ref.Name)
}
Expand All @@ -54,7 +55,7 @@ func (r *rollout) ObjectRestarter(proxy cluster.Proxy, ref corev1.ObjectReferenc
if kcp.Spec.RolloutAfter != nil && kcp.Spec.RolloutAfter.After(time.Now()) {
return errors.Errorf("can't update KubeadmControlPlane (remove 'spec.rolloutAfter' first): %v/%v", ref.Kind, ref.Name)
}
if err := setRolloutAfterOnKCP(proxy, ref.Name, ref.Namespace); err != nil {
if err := setRolloutAfterOnKCP(ctx, proxy, ref.Name, ref.Namespace); err != nil {
return err
}
default:
Expand Down
2 changes: 1 addition & 1 deletion cmd/clusterctl/client/alpha/rollout_restarter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func Test_ObjectRestarter(t *testing.T) {
g := NewWithT(t)
r := newRolloutClient()
proxy := test.NewFakeProxy().WithObjs(tt.fields.objs...)
err := r.ObjectRestarter(proxy, tt.fields.ref)
err := r.ObjectRestarter(context.Background(), proxy, tt.fields.ref)
if tt.wantErr {
g.Expect(err).To(HaveOccurred())
return
Expand Down
19 changes: 10 additions & 9 deletions cmd/clusterctl/client/alpha/rollout_resumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package alpha

import (
"context"
"fmt"
"strings"

Expand All @@ -31,28 +32,28 @@ import (
)

// ObjectResumer will issue a resume on the specified cluster-api resource.
func (r *rollout) ObjectResumer(proxy cluster.Proxy, ref corev1.ObjectReference) error {
func (r *rollout) ObjectResumer(ctx context.Context, proxy cluster.Proxy, ref corev1.ObjectReference) error {
switch ref.Kind {
case MachineDeployment:
deployment, err := getMachineDeployment(proxy, ref.Name, ref.Namespace)
deployment, err := getMachineDeployment(ctx, proxy, ref.Name, ref.Namespace)
if err != nil || deployment == nil {
return errors.Wrapf(err, "failed to fetch %v/%v", ref.Kind, ref.Name)
}
if !deployment.Spec.Paused {
return errors.Errorf("MachineDeployment is not currently paused: %v/%v\n", ref.Kind, ref.Name) //nolint:revive // MachineDeployment is intentionally capitalized.
}
if err := resumeMachineDeployment(proxy, ref.Name, ref.Namespace); err != nil {
if err := resumeMachineDeployment(ctx, proxy, ref.Name, ref.Namespace); err != nil {
return err
}
case KubeadmControlPlane:
kcp, err := getKubeadmControlPlane(proxy, ref.Name, ref.Namespace)
kcp, err := getKubeadmControlPlane(ctx, proxy, ref.Name, ref.Namespace)
if err != nil || kcp == nil {
return errors.Wrapf(err, "failed to fetch %v/%v", ref.Kind, ref.Name)
}
if !annotations.HasPaused(kcp.GetObjectMeta()) {
return errors.Errorf("KubeadmControlPlane is not currently paused: %v/%v\n", ref.Kind, ref.Name) //nolint:revive // KubeadmControlPlane is intentionally capitalized.
}
if err := resumeKubeadmControlPlane(proxy, ref.Name, ref.Namespace); err != nil {
if err := resumeKubeadmControlPlane(ctx, proxy, ref.Name, ref.Namespace); err != nil {
return err
}
default:
Expand All @@ -62,17 +63,17 @@ func (r *rollout) ObjectResumer(proxy cluster.Proxy, ref corev1.ObjectReference)
}

// resumeMachineDeployment sets Paused to true in the MachineDeployment's spec.
func resumeMachineDeployment(proxy cluster.Proxy, name, namespace string) error {
func resumeMachineDeployment(ctx context.Context, proxy cluster.Proxy, name, namespace string) error {
patch := client.RawPatch(types.MergePatchType, []byte(fmt.Sprintf("{\"spec\":{\"paused\":%t}}", false)))

return patchMachineDeployment(proxy, name, namespace, patch)
return patchMachineDeployment(ctx, proxy, name, namespace, patch)
}

// resumeKubeadmControlPlane removes paused annotation.
func resumeKubeadmControlPlane(proxy cluster.Proxy, name, namespace string) error {
func resumeKubeadmControlPlane(ctx context.Context, proxy cluster.Proxy, name, namespace string) error {
// In the paused annotation we must replace slashes to ~1, see https://datatracker.ietf.org/doc/html/rfc6901#section-3.
pausedAnnotation := strings.Replace(clusterv1.PausedAnnotation, "/", "~1", -1)
patch := client.RawPatch(types.JSONPatchType, []byte(fmt.Sprintf("[{\"op\": \"remove\", \"path\": \"/metadata/annotations/%s\"}]", pausedAnnotation)))

return patchKubeadmControlPlane(proxy, name, namespace, patch)
return patchKubeadmControlPlane(ctx, proxy, name, namespace, patch)
}
2 changes: 1 addition & 1 deletion cmd/clusterctl/client/alpha/rollout_resumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func Test_ObjectResumer(t *testing.T) {
g := NewWithT(t)
r := newRolloutClient()
proxy := test.NewFakeProxy().WithObjs(tt.fields.objs...)
err := r.ObjectResumer(proxy, tt.fields.ref)
err := r.ObjectResumer(context.Background(), proxy, tt.fields.ref)
if tt.wantErr {
g.Expect(err).To(HaveOccurred())
return
Expand Down
12 changes: 7 additions & 5 deletions cmd/clusterctl/client/alpha/rollout_rollbacker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package alpha

import (
"context"

"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"

Expand All @@ -27,17 +29,17 @@ import (
)

// ObjectRollbacker will issue a rollback on the specified cluster-api resource.
func (r *rollout) ObjectRollbacker(proxy cluster.Proxy, ref corev1.ObjectReference, toRevision int64) error {
func (r *rollout) ObjectRollbacker(ctx context.Context, proxy cluster.Proxy, ref corev1.ObjectReference, toRevision int64) error {
switch ref.Kind {
case MachineDeployment:
deployment, err := getMachineDeployment(proxy, ref.Name, ref.Namespace)
deployment, err := getMachineDeployment(ctx, proxy, ref.Name, ref.Namespace)
if err != nil || deployment == nil {
return errors.Wrapf(err, "failed to get %v/%v", ref.Kind, ref.Name)
}
if deployment.Spec.Paused {
return errors.Errorf("can't rollback a paused MachineDeployment: please run 'clusterctl rollout resume %v/%v' first", ref.Kind, ref.Name)
}
if err := rollbackMachineDeployment(proxy, deployment, toRevision); err != nil {
if err := rollbackMachineDeployment(ctx, proxy, deployment, toRevision); err != nil {
return err
}
default:
Expand All @@ -47,7 +49,7 @@ func (r *rollout) ObjectRollbacker(proxy cluster.Proxy, ref corev1.ObjectReferen
}

// rollbackMachineDeployment will rollback to a previous MachineSet revision used by this MachineDeployment.
func rollbackMachineDeployment(proxy cluster.Proxy, md *clusterv1.MachineDeployment, toRevision int64) error {
func rollbackMachineDeployment(ctx context.Context, proxy cluster.Proxy, md *clusterv1.MachineDeployment, toRevision int64) error {
log := logf.Log
c, err := proxy.NewClient()
if err != nil {
Expand All @@ -57,7 +59,7 @@ func rollbackMachineDeployment(proxy cluster.Proxy, md *clusterv1.MachineDeploym
if toRevision < 0 {
return errors.Errorf("revision number cannot be negative: %v", toRevision)
}
msList, err := getMachineSetsForDeployment(proxy, md)
msList, err := getMachineSetsForDeployment(ctx, proxy, md)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/clusterctl/client/alpha/rollout_rollbacker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func Test_ObjectRollbacker(t *testing.T) {
g := NewWithT(t)
r := newRolloutClient()
proxy := test.NewFakeProxy().WithObjs(tt.fields.objs...)
err := r.ObjectRollbacker(proxy, tt.fields.ref, tt.fields.toRevision)
err := r.ObjectRollbacker(context.Background(), proxy, tt.fields.ref, tt.fields.toRevision)
if tt.wantErr {
g.Expect(err).To(HaveOccurred())
return
Expand Down
Loading

0 comments on commit 82eff49

Please sign in to comment.