From ae23e48bb3aa5bd133ecc5e9fb0d7136e3f3227b Mon Sep 17 00:00:00 2001 From: Ashwin Venkatesh Date: Wed, 22 Jun 2022 11:59:16 -0400 Subject: [PATCH] Add support for version annotation on Peering Dialer and Acceptor. --- CHANGELOG.md | 5 + .../templates/crd-peeringacceptors.yaml | 5 + .../consul/templates/crd-peeringdialers.yaml | 5 + .../api/v1alpha1/peeringacceptor_types.go | 6 +- .../api/v1alpha1/peeringdialer_types.go | 6 +- .../api/v1alpha1/zz_generated.deepcopy.go | 10 + ...consul.hashicorp.com_peeringacceptors.yaml | 5 + .../consul.hashicorp.com_peeringdialers.yaml | 5 + control-plane/connect-inject/annotations.go | 4 + .../connect-inject/container_init.go | 5 + .../peering_acceptor_controller.go | 20 ++ .../peering_acceptor_controller_test.go | 229 +++++++++++++++- .../peering_dialer_controller.go | 43 ++- .../peering_dialer_controller_test.go | 252 +++++++++++++++++- 14 files changed, 583 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cddc301f6d..21c40a9c93 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,10 @@ ## UNRELEASED +FEATURES: +* [Experimental] Cluster Peering: + * Add support for secret watchers on the Peering Acceptor and Peering Dialer controllers. [[GH-1284](https://github.com/hashicorp/consul-k8s/pull/1284)] + * Add support for version annotation on the Peering Acceptor and Peering Dialer controllers. [[GH-1302](https://github.com/hashicorp/consul-k8s/pull/1302)] + ## 0.45.0 (June 17, 2022) FEATURES: * [Experimental] Cluster Peering: Support Consul cluster peering, which allows service connectivity between two independent clusters. diff --git a/charts/consul/templates/crd-peeringacceptors.yaml b/charts/consul/templates/crd-peeringacceptors.yaml index a5242ad311..6cd7090ef7 100644 --- a/charts/consul/templates/crd-peeringacceptors.yaml +++ b/charts/consul/templates/crd-peeringacceptors.yaml @@ -71,6 +71,11 @@ spec: description: LastReconcileTime is the last time the resource was reconciled. format: date-time type: string + latestPeeringVersion: + description: LatestPeeringVersion is the latest version of the resource + that was reconciled. + format: int64 + type: integer reconcileError: description: ReconcileError shows any errors during the last reconciliation of this resource. diff --git a/charts/consul/templates/crd-peeringdialers.yaml b/charts/consul/templates/crd-peeringdialers.yaml index f03a1790ad..d5dee45f61 100644 --- a/charts/consul/templates/crd-peeringdialers.yaml +++ b/charts/consul/templates/crd-peeringdialers.yaml @@ -71,6 +71,11 @@ spec: description: LastReconcileTime is the last time the resource was reconciled. format: date-time type: string + latestPeeringVersion: + description: LatestPeeringVersion is the latest version of the resource + that was reconciled. + format: int64 + type: integer reconcileError: description: ReconcileError shows any errors during the last reconciliation of this resource. diff --git a/control-plane/api/v1alpha1/peeringacceptor_types.go b/control-plane/api/v1alpha1/peeringacceptor_types.go index f6dd84c4e8..5dd70b0597 100644 --- a/control-plane/api/v1alpha1/peeringacceptor_types.go +++ b/control-plane/api/v1alpha1/peeringacceptor_types.go @@ -33,8 +33,6 @@ type PeeringAcceptorList struct { // PeeringAcceptorSpec defines the desired state of PeeringAcceptor. type PeeringAcceptorSpec struct { - // Important: Run "make" to regenerate code after modifying this file - // Peer describes the information needed to create a peering. Peer *Peer `json:"peer"` } @@ -55,8 +53,8 @@ type Secret struct { // PeeringAcceptorStatus defines the observed state of PeeringAcceptor. type PeeringAcceptorStatus struct { - // Important: Run "make" to regenerate code after modifying this file - + // LatestPeeringVersion is the latest version of the resource that was reconciled. + LatestPeeringVersion *uint64 `json:"latestPeeringVersion,omitempty"` // LastReconcileTime is the last time the resource was reconciled. // +optional LastReconcileTime *metav1.Time `json:"lastReconcileTime,omitempty" description:"last time the resource was reconciled"` diff --git a/control-plane/api/v1alpha1/peeringdialer_types.go b/control-plane/api/v1alpha1/peeringdialer_types.go index 99385ff0aa..0ffea7bee4 100644 --- a/control-plane/api/v1alpha1/peeringdialer_types.go +++ b/control-plane/api/v1alpha1/peeringdialer_types.go @@ -33,16 +33,14 @@ type PeeringDialerList struct { // PeeringDialerSpec defines the desired state of PeeringDialer. type PeeringDialerSpec struct { - // Important: Run "make" to regenerate code after modifying this file - // Peer describes the information needed to create a peering. Peer *Peer `json:"peer"` } // PeeringDialerStatus defines the observed state of PeeringDialer. type PeeringDialerStatus struct { - // Important: Run "make" to regenerate code after modifying this file - + // LatestPeeringVersion is the latest version of the resource that was reconciled. + LatestPeeringVersion *uint64 `json:"latestPeeringVersion,omitempty"` // LastReconcileTime is the last time the resource was reconciled. // +optional LastReconcileTime *metav1.Time `json:"lastReconcileTime,omitempty" description:"last time the resource was reconciled"` diff --git a/control-plane/api/v1alpha1/zz_generated.deepcopy.go b/control-plane/api/v1alpha1/zz_generated.deepcopy.go index 78b0e33959..4806d72e6b 100644 --- a/control-plane/api/v1alpha1/zz_generated.deepcopy.go +++ b/control-plane/api/v1alpha1/zz_generated.deepcopy.go @@ -916,6 +916,11 @@ func (in *PeeringAcceptorSpec) DeepCopy() *PeeringAcceptorSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PeeringAcceptorStatus) DeepCopyInto(out *PeeringAcceptorStatus) { *out = *in + if in.LatestPeeringVersion != nil { + in, out := &in.LatestPeeringVersion, &out.LatestPeeringVersion + *out = new(uint64) + **out = **in + } if in.LastReconcileTime != nil { in, out := &in.LastReconcileTime, &out.LastReconcileTime *out = (*in).DeepCopy() @@ -1024,6 +1029,11 @@ func (in *PeeringDialerSpec) DeepCopy() *PeeringDialerSpec { // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *PeeringDialerStatus) DeepCopyInto(out *PeeringDialerStatus) { *out = *in + if in.LatestPeeringVersion != nil { + in, out := &in.LatestPeeringVersion, &out.LatestPeeringVersion + *out = new(uint64) + **out = **in + } if in.LastReconcileTime != nil { in, out := &in.LastReconcileTime, &out.LastReconcileTime *out = (*in).DeepCopy() diff --git a/control-plane/config/crd/bases/consul.hashicorp.com_peeringacceptors.yaml b/control-plane/config/crd/bases/consul.hashicorp.com_peeringacceptors.yaml index a4a00a7426..f2df1eb96b 100644 --- a/control-plane/config/crd/bases/consul.hashicorp.com_peeringacceptors.yaml +++ b/control-plane/config/crd/bases/consul.hashicorp.com_peeringacceptors.yaml @@ -64,6 +64,11 @@ spec: description: LastReconcileTime is the last time the resource was reconciled. format: date-time type: string + latestPeeringVersion: + description: LatestPeeringVersion is the latest version of the resource + that was reconciled. + format: int64 + type: integer reconcileError: description: ReconcileError shows any errors during the last reconciliation of this resource. diff --git a/control-plane/config/crd/bases/consul.hashicorp.com_peeringdialers.yaml b/control-plane/config/crd/bases/consul.hashicorp.com_peeringdialers.yaml index c2eef39627..8492acec3a 100644 --- a/control-plane/config/crd/bases/consul.hashicorp.com_peeringdialers.yaml +++ b/control-plane/config/crd/bases/consul.hashicorp.com_peeringdialers.yaml @@ -64,6 +64,11 @@ spec: description: LastReconcileTime is the last time the resource was reconciled. format: date-time type: string + latestPeeringVersion: + description: LatestPeeringVersion is the latest version of the resource + that was reconciled. + format: int64 + type: integer reconcileError: description: ReconcileError shows any errors during the last reconciliation of this resource. diff --git a/control-plane/connect-inject/annotations.go b/control-plane/connect-inject/annotations.go index 1c3c9420e4..777eaa1beb 100644 --- a/control-plane/connect-inject/annotations.go +++ b/control-plane/connect-inject/annotations.go @@ -142,6 +142,10 @@ const ( // webhook/meshWebhook. annotationOriginalPod = "consul.hashicorp.com/original-pod" + // annotationPeeringVersion is the version of the peering resource and can be utilized + // to explicitly perform the peering operation again. + annotationPeeringVersion = "consul.hashicorp.com/peering-version" + // labelServiceIgnore is a label that can be added to a service to prevent it from being // registered with Consul. labelServiceIgnore = "consul.hashicorp.com/service-ignore" diff --git a/control-plane/connect-inject/container_init.go b/control-plane/connect-inject/container_init.go index 66ab836863..bae4592397 100644 --- a/control-plane/connect-inject/container_init.go +++ b/control-plane/connect-inject/container_init.go @@ -326,6 +326,11 @@ func pointerToInt64(i int64) *int64 { return &i } +// pointerToUInt64 takes an int64 and returns a pointer to it. +func pointerToUint64(i uint64) *uint64 { + return &i +} + // pointerToBool takes a bool and returns a pointer to it. func pointerToBool(b bool) *bool { return &b diff --git a/control-plane/connect-inject/peering_acceptor_controller.go b/control-plane/connect-inject/peering_acceptor_controller.go index 8830546aed..680dc5cc5f 100644 --- a/control-plane/connect-inject/peering_acceptor_controller.go +++ b/control-plane/connect-inject/peering_acceptor_controller.go @@ -3,6 +3,7 @@ package connectinject import ( "context" "errors" + "strconv" "time" "github.com/go-logr/logr" @@ -214,6 +215,15 @@ func shouldGenerateToken(acceptor *consulv1alpha1.PeeringAcceptor, existingStatu if acceptor.SecretRef().Backend != acceptor.Secret().Backend { return false, false, errors.New("PeeringAcceptor backend cannot be changed") } + if peeringVersionString, ok := acceptor.Annotations[annotationPeeringVersion]; ok { + peeringVersion, err := strconv.ParseUint(peeringVersionString, 10, 64) + if err != nil { + return false, false, err + } + if acceptor.Status.LatestPeeringVersion == nil || *acceptor.Status.LatestPeeringVersion < peeringVersion { + return true, false, nil + } + } // Compare the existing secret resource version. // Get the secret specified by the status, make sure it matches the status' secret.ResourceVersion. if existingStatusSecret != nil { @@ -238,6 +248,16 @@ func (r *PeeringAcceptorController) updateStatus(ctx context.Context, acceptor * Error: pointerToBool(false), Message: pointerToString(""), } + if peeringVersionString, ok := acceptor.Annotations[annotationPeeringVersion]; ok { + peeringVersion, err := strconv.ParseUint(peeringVersionString, 10, 64) + if err != nil { + r.Log.Error(err, "failed to update PeeringAcceptor status", "name", acceptor.Name, "namespace", acceptor.Namespace) + return err + } + if acceptor.Status.LatestPeeringVersion == nil || *acceptor.Status.LatestPeeringVersion < peeringVersion { + acceptor.Status.LatestPeeringVersion = pointerToUint64(peeringVersion) + } + } err := r.Status().Update(ctx, acceptor) if err != nil { r.Log.Error(err, "failed to update PeeringAcceptor status", "name", acceptor.Name, "namespace", acceptor.Namespace) diff --git a/control-plane/connect-inject/peering_acceptor_controller_test.go b/control-plane/connect-inject/peering_acceptor_controller_test.go index 1f53bcc516..69599f84a5 100644 --- a/control-plane/connect-inject/peering_acceptor_controller_test.go +++ b/control-plane/connect-inject/peering_acceptor_controller_test.go @@ -24,8 +24,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" ) -// TestReconcileCreateUpdatePeeringAcceptor creates a peering acceptor. -func TestReconcileCreateUpdatePeeringAcceptor(t *testing.T) { +// TestReconcile_CreateUpdatePeeringAcceptor creates a peering acceptor. +func TestReconcile_CreateUpdatePeeringAcceptor(t *testing.T) { t.Parallel() nodeName := "test-node" cases := []struct { @@ -202,6 +202,69 @@ func TestReconcileCreateUpdatePeeringAcceptor(t *testing.T) { }, initialConsulPeerName: "acceptor-created", }, + { + name: "PeeringAcceptor version annotation is updated", + k8sObjects: func() []runtime.Object { + acceptor := &v1alpha1.PeeringAcceptor{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acceptor-created", + Namespace: "default", + Annotations: map[string]string{ + annotationPeeringVersion: "2", + }, + }, + Spec: v1alpha1.PeeringAcceptorSpec{ + Peer: &v1alpha1.Peer{ + Secret: &v1alpha1.Secret{ + Name: "acceptor-created-secret", + Key: "data", + Backend: "kubernetes", + }, + }, + }, + Status: v1alpha1.PeeringAcceptorStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "acceptor-created-secret", + Key: "data", + Backend: "kubernetes", + }, + ResourceVersion: "some-old-sha", + }, + }, + } + secret := createSecret("acceptor-created-secret", "default", "data", "some-old-data") + return []runtime.Object{acceptor, secret} + }, + expectedStatus: &v1alpha1.PeeringAcceptorStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "acceptor-created-secret", + Key: "data", + Backend: "kubernetes", + }, + }, + LatestPeeringVersion: pointerToUint64(2), + }, + expectedConsulPeerings: []*api.Peering{ + { + Name: "acceptor-created", + }, + }, + expectedK8sSecrets: func() []*corev1.Secret { + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acceptor-created-secret", + Namespace: "default", + }, + StringData: map[string]string{ + "data": "tokenstub", + }, + } + return []*corev1.Secret{secret} + }, + initialConsulPeerName: "acceptor-created", + }, { name: "PeeringAcceptor status secret exists and there's no peering in Consul", k8sObjects: func() []runtime.Object { @@ -422,6 +485,7 @@ func TestReconcileCreateUpdatePeeringAcceptor(t *testing.T) { require.Equal(t, tt.expectedStatus.SecretRef.Name, acceptor.SecretRef().Name) require.Equal(t, tt.expectedStatus.SecretRef.Key, acceptor.SecretRef().Key) require.Equal(t, tt.expectedStatus.SecretRef.Backend, acceptor.SecretRef().Backend) + require.Equal(t, tt.expectedStatus.LatestPeeringVersion, acceptor.Status.LatestPeeringVersion) } // Check that old secret was deleted. if tt.expectDeletedK8sSecret != nil { @@ -433,14 +497,13 @@ func TestReconcileCreateUpdatePeeringAcceptor(t *testing.T) { t.Error("old secret should have been deleted but was not") } } - }) } } -// TestReconcileDeletePeeringAcceptor reconciles a PeeringAcceptor resource that is no longer in Kubernetes, but still +// TestReconcile_DeletePeeringAcceptor reconciles a PeeringAcceptor resource that is no longer in Kubernetes, but still // exists in Consul. -func TestReconcileDeletePeeringAcceptor(t *testing.T) { +func TestReconcile_DeletePeeringAcceptor(t *testing.T) { // Add the default namespace. ns := corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}} acceptor := &v1alpha1.PeeringAcceptor{ @@ -520,6 +583,162 @@ func TestReconcileDeletePeeringAcceptor(t *testing.T) { require.EqualError(t, err, `secrets "acceptor-deleted" not found`) } +// TestReconcile_AcceptorVersionAnnotation tests the behavior of Reconcile for various +// scenarios involving the user setting the version annotation. +func TestReconcile_VersionAnnotation(t *testing.T) { + t.Parallel() + nodeName := "test-node" + cases := map[string]struct { + annotations map[string]string + expErr string + expectedStatus *v1alpha1.PeeringAcceptorStatus + }{ + "fails if annotation is not a number": { + annotations: map[string]string{ + annotationPeeringVersion: "foo", + }, + expErr: `strconv.ParseUint: parsing "foo": invalid syntax`, + }, + "is no/op if annotation value is less than value in status": { + annotations: map[string]string{ + annotationPeeringVersion: "2", + }, + expectedStatus: &v1alpha1.PeeringAcceptorStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "acceptor-created-secret", + Key: "data", + Backend: "kubernetes", + }, + ResourceVersion: "some-old-sha", + }, + LatestPeeringVersion: pointerToUint64(3), + }, + }, + "is no/op if annotation value is equal to value in status": { + annotations: map[string]string{ + annotationPeeringVersion: "3", + }, + expectedStatus: &v1alpha1.PeeringAcceptorStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "acceptor-created-secret", + Key: "data", + Backend: "kubernetes", + }, + ResourceVersion: "some-old-sha", + }, + LatestPeeringVersion: pointerToUint64(3), + }, + }, + "updates if annotation value is greater than value in status": { + annotations: map[string]string{ + annotationPeeringVersion: "4", + }, + expectedStatus: &v1alpha1.PeeringAcceptorStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "acceptor-created-secret", + Key: "data", + Backend: "kubernetes", + }, + }, + LatestPeeringVersion: pointerToUint64(4), + }, + }, + } + for name, tt := range cases { + t.Run(name, func(t *testing.T) { + ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}} + acceptor := &v1alpha1.PeeringAcceptor{ + ObjectMeta: metav1.ObjectMeta{ + Name: "acceptor-created", + Namespace: "default", + Annotations: tt.annotations, + }, + Spec: v1alpha1.PeeringAcceptorSpec{ + Peer: &v1alpha1.Peer{ + Secret: &v1alpha1.Secret{ + Name: "acceptor-created-secret", + Key: "data", + Backend: "kubernetes", + }, + }, + }, + Status: v1alpha1.PeeringAcceptorStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "acceptor-created-secret", + Key: "data", + Backend: "kubernetes", + }, + ResourceVersion: "some-old-sha", + }, + LatestPeeringVersion: pointerToUint64(3), + }, + } + secret := createSecret("acceptor-created-secret", "default", "data", "some-data") + // Create fake k8s client + k8sObjects := []runtime.Object{acceptor, secret, ns} + + s := scheme.Scheme + s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.PeeringAcceptor{}, &v1alpha1.PeeringAcceptorList{}) + fakeClient := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(k8sObjects...).Build() + + // Create test consul server. + consul, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) { + c.NodeName = nodeName + }) + require.NoError(t, err) + defer consul.Stop() + consul.WaitForServiceIntentions(t) + + cfg := &api.Config{ + Address: consul.HTTPAddr, + } + consulClient, err := api.NewClient(cfg) + require.NoError(t, err) + + _, _, err = consulClient.Peerings().GenerateToken(context.Background(), api.PeeringGenerateTokenRequest{PeerName: "acceptor-created"}, nil) + require.NoError(t, err) + + // Create the peering acceptor controller + controller := &PeeringAcceptorController{ + Client: fakeClient, + Log: logrtest.TestLogger{T: t}, + ConsulClient: consulClient, + Scheme: s, + } + namespacedName := types.NamespacedName{ + Name: "acceptor-created", + Namespace: "default", + } + + resp, err := controller.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: namespacedName, + }) + if tt.expErr != "" { + require.EqualError(t, err, tt.expErr) + } else { + require.NoError(t, err) + } + require.False(t, resp.Requeue) + + // Get the reconciled PeeringAcceptor and make assertions on the status + acceptor = &v1alpha1.PeeringAcceptor{} + err = fakeClient.Get(context.Background(), namespacedName, acceptor) + require.NoError(t, err) + require.Contains(t, acceptor.Finalizers, FinalizerName) + if tt.expectedStatus != nil { + require.Equal(t, tt.expectedStatus.SecretRef.Name, acceptor.SecretRef().Name) + require.Equal(t, tt.expectedStatus.SecretRef.Key, acceptor.SecretRef().Key) + require.Equal(t, tt.expectedStatus.SecretRef.Backend, acceptor.SecretRef().Backend) + require.Equal(t, tt.expectedStatus.LatestPeeringVersion, acceptor.Status.LatestPeeringVersion) + } + }) + } +} + func TestShouldGenerateToken(t *testing.T) { cases := []struct { name string diff --git a/control-plane/connect-inject/peering_dialer_controller.go b/control-plane/connect-inject/peering_dialer_controller.go index 1a62ae1b91..a5f3e19094 100644 --- a/control-plane/connect-inject/peering_dialer_controller.go +++ b/control-plane/connect-inject/peering_dialer_controller.go @@ -3,6 +3,7 @@ package connectinject import ( "context" "errors" + "strconv" "time" "github.com/go-logr/logr" @@ -161,7 +162,7 @@ func (r *PeeringDialerController) Reconcile(ctx context.Context, req ctrl.Reques // Or, if the peering in Consul does exist, compare it to the spec's secret. If there's any // differences, initiate peering. if r.specStatusSecretsDifferent(dialer, specSecret) { - r.Log.Info("the secret in status.secretRef exists and is different from spec.peer.secret; establishing peering with the existing spec.peer.secret", "secret-name", dialer.Secret().Name, "secret-namespace", dialer.Namespace) + r.Log.Info("the version annotation was incremented; re-establishing peering with spec.peer.secret", "secret-name", dialer.Secret().Name, "secret-namespace", dialer.Namespace) peeringToken := specSecret.Data[dialer.Secret().Key] if err := r.establishPeering(ctx, dialer.Name, string(peeringToken)); err != nil { r.updateStatusError(ctx, dialer, err) @@ -171,6 +172,21 @@ func (r *PeeringDialerController) Reconcile(ctx context.Context, req ctrl.Reques return ctrl.Result{}, err } } + + if updated, err := r.versionAnnotationUpdated(dialer); err == nil && updated { + r.Log.Info("status.secret exists, but the peering doesn't exist in Consul; establishing peering with the existing spec.peer.secret", "secret-name", dialer.Secret().Name, "secret-namespace", dialer.Namespace) + peeringToken := specSecret.Data[dialer.Secret().Key] + if err := r.establishPeering(ctx, dialer.Name, string(peeringToken)); err != nil { + r.updateStatusError(ctx, dialer, err) + return ctrl.Result{}, err + } else { + err := r.updateStatus(ctx, dialer, specSecret.ResourceVersion) + return ctrl.Result{}, err + } + } else if err != nil { + r.updateStatusError(ctx, dialer, err) + return ctrl.Result{}, err + } } return ctrl.Result{}, nil @@ -199,6 +215,16 @@ func (r *PeeringDialerController) updateStatus(ctx context.Context, dialer *cons Error: pointerToBool(false), Message: pointerToString(""), } + if peeringVersionString, ok := dialer.Annotations[annotationPeeringVersion]; ok { + peeringVersion, err := strconv.ParseUint(peeringVersionString, 10, 64) + if err != nil { + r.Log.Error(err, "failed to update PeeringDialer status", "name", dialer.Name, "namespace", dialer.Namespace) + return err + } + if dialer.Status.LatestPeeringVersion == nil || *dialer.Status.LatestPeeringVersion < peeringVersion { + dialer.Status.LatestPeeringVersion = pointerToUint64(peeringVersion) + } + } err := r.Status().Update(ctx, dialer) if err != nil { r.Log.Error(err, "failed to update PeeringDialer status", "name", dialer.Name, "namespace", dialer.Namespace) @@ -267,6 +293,19 @@ func (r *PeeringDialerController) deletePeering(ctx context.Context, peerName st return nil } +func (r *PeeringDialerController) versionAnnotationUpdated(dialer *consulv1alpha1.PeeringDialer) (bool, error) { + if peeringVersionString, ok := dialer.Annotations[annotationPeeringVersion]; ok { + peeringVersion, err := strconv.ParseUint(peeringVersionString, 10, 64) + if err != nil { + return false, err + } + if dialer.Status.LatestPeeringVersion == nil || *dialer.Status.LatestPeeringVersion < peeringVersion { + return true, nil + } + } + return false, nil +} + // requestsForPeeringTokens creates a slice of requests for the peering dialer controller. // It enqueues a request for each dialer that needs to be reconciled. It iterates through // the list of dialers and creates a request for the dialer that has the same secret as it's @@ -291,7 +330,7 @@ func (r *PeeringDialerController) requestsForPeeringTokens(object client.Object) return []ctrl.Request{} } -// filterPeeringAcceptors receives meta and object information for Kubernetes resources that are being watched, +// filterPeeringDialers receives meta and object information for Kubernetes resources that are being watched, // which in this case are Secrets. It only returns true if the Secret is a Peering Token Secret. It reads the labels // from the meta of the resource and uses the values of the "consul.hashicorp.com/peering-token" label to validate that // the Secret is a Peering Token Secret. diff --git a/control-plane/connect-inject/peering_dialer_controller_test.go b/control-plane/connect-inject/peering_dialer_controller_test.go index 4715840500..bc530cbdce 100644 --- a/control-plane/connect-inject/peering_dialer_controller_test.go +++ b/control-plane/connect-inject/peering_dialer_controller_test.go @@ -26,8 +26,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/reconcile" ) -// TestReconcileCreateUpdatePeeringDialer creates a peering dialer. -func TestReconcileCreateUpdatePeeringDialer(t *testing.T) { +// TestReconcile_CreateUpdatePeeringDialer creates a peering dialer. +func TestReconcile_CreateUpdatePeeringDialer(t *testing.T) { t.Parallel() nodeName := "test-node" node2Name := "test-node2" @@ -213,6 +213,58 @@ func TestReconcileCreateUpdatePeeringDialer(t *testing.T) { }, peeringExists: true, }, + "Initiates peering when version annotation is set": { + peeringName: "peering", + k8sObjects: func() []runtime.Object { + dialer := &v1alpha1.PeeringDialer{ + ObjectMeta: metav1.ObjectMeta{ + Name: "peering", + Namespace: "default", + Annotations: map[string]string{ + annotationPeeringVersion: "2", + }, + }, + Spec: v1alpha1.PeeringDialerSpec{ + Peer: &v1alpha1.Peer{ + Secret: &v1alpha1.Secret{ + Name: "dialer-token", + Key: "token", + Backend: "kubernetes", + }, + }, + }, + Status: v1alpha1.PeeringDialerStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "dialer-token", + Key: "token", + Backend: "kubernetes", + }, + ResourceVersion: "test-version", + }, + }, + } + return []runtime.Object{dialer} + }, + expectedConsulPeerings: &api.Peering{ + Name: "peering", + State: api.PeeringStateActive, + }, + peeringSecret: func(token string) *corev1.Secret { + return createSecret("dialer-token", "default", "token", token) + }, + expectedStatus: &v1alpha1.PeeringDialerStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "dialer-token", + Key: "token", + Backend: "kubernetes", + }, + }, + LatestPeeringVersion: pointerToUint64(2), + }, + peeringExists: true, + }, } for name, tt := range cases { t.Run(name, func(t *testing.T) { @@ -327,6 +379,7 @@ func TestReconcileCreateUpdatePeeringDialer(t *testing.T) { require.Equal(t, tt.expectedStatus.SecretRef.Key, dialer.SecretRef().Key) require.Equal(t, tt.expectedStatus.SecretRef.Backend, dialer.SecretRef().Backend) require.Equal(t, "latest-version", dialer.SecretRef().ResourceVersion) + require.Equal(t, tt.expectedStatus.LatestPeeringVersion, dialer.Status.LatestPeeringVersion) require.Contains(t, dialer.Finalizers, FinalizerName) require.NotEmpty(t, dialer.SecretRef().ResourceVersion) require.NotEqual(t, "test-version", dialer.SecretRef().ResourceVersion) @@ -336,6 +389,201 @@ func TestReconcileCreateUpdatePeeringDialer(t *testing.T) { } } +func TestReconcile_VersionAnnotationPeeringDialer(t *testing.T) { + t.Parallel() + nodeName := "test-node" + node2Name := "test-node2" + cases := map[string]struct { + annotations map[string]string + expErr string + expectedStatus *v1alpha1.PeeringDialerStatus + }{ + "fails if annotation is not a number": { + annotations: map[string]string{ + annotationPeeringVersion: "foo", + }, + expErr: `strconv.ParseUint: parsing "foo": invalid syntax`, + }, + "is no/op if annotation value is less than value in status": { + annotations: map[string]string{ + annotationPeeringVersion: "2", + }, + expectedStatus: &v1alpha1.PeeringDialerStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "dialer-token", + Key: "token", + Backend: "kubernetes", + }, + }, + LatestPeeringVersion: pointerToUint64(3), + }, + }, + "is no/op if annotation value is equal to value in status": { + annotations: map[string]string{ + annotationPeeringVersion: "3", + }, + expectedStatus: &v1alpha1.PeeringDialerStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "dialer-token", + Key: "token", + Backend: "kubernetes", + }, + }, + LatestPeeringVersion: pointerToUint64(3), + }, + }, + "updates if annotation value is greater than value in status": { + annotations: map[string]string{ + annotationPeeringVersion: "4", + }, + expectedStatus: &v1alpha1.PeeringDialerStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "dialer-token", + Key: "token", + Backend: "kubernetes", + }, + }, + LatestPeeringVersion: pointerToUint64(4), + }, + }, + } + for name, tt := range cases { + t.Run(name, func(t *testing.T) { + + // Create test consul server. + acceptorPeerServer, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) { + c.NodeName = nodeName + }) + require.NoError(t, err) + defer acceptorPeerServer.Stop() + acceptorPeerServer.WaitForServiceIntentions(t) + + cfg := &api.Config{ + Address: acceptorPeerServer.HTTPAddr, + } + acceptorClient, err := api.NewClient(cfg) + require.NoError(t, err) + + ns := &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "default"}} + dialer := &v1alpha1.PeeringDialer{ + ObjectMeta: metav1.ObjectMeta{ + Name: "peering", + Namespace: "default", + Annotations: tt.annotations, + }, + Spec: v1alpha1.PeeringDialerSpec{ + Peer: &v1alpha1.Peer{ + Secret: &v1alpha1.Secret{ + Name: "dialer-token", + Key: "token", + Backend: "kubernetes", + }, + }, + }, + Status: v1alpha1.PeeringDialerStatus{ + SecretRef: &v1alpha1.SecretRefStatus{ + Secret: v1alpha1.Secret{ + Name: "dialer-token", + Key: "token", + Backend: "kubernetes", + }, + ResourceVersion: "latest-version", + }, + LatestPeeringVersion: pointerToUint64(3), + }, + } + // Create fake k8s client + k8sObjects := []runtime.Object{dialer, ns} + + // This is responsible for updating the token generated by the acceptor side with the IP + // of the Consul server as the generated token currently does not have that set on it. + var encodedPeeringToken string + var token struct { + CA string + ServerAddresses []string + ServerName string + PeerID string + } + // Create the initial token. + baseToken, _, err := acceptorClient.Peerings().GenerateToken(context.Background(), api.PeeringGenerateTokenRequest{PeerName: "peering"}, nil) + require.NoError(t, err) + // Decode the token to extract the ServerName and PeerID from the token. CA is always NULL. + decodeBytes, err := base64.StdEncoding.DecodeString(baseToken.PeeringToken) + require.NoError(t, err) + err = json.Unmarshal(decodeBytes, &token) + require.NoError(t, err) + // Get the IP of the Consul server. + addr := strings.Split(acceptorPeerServer.HTTPAddr, ":")[0] + // Generate expected token for Peering Initiate. + tokenString := fmt.Sprintf(`{"CA":null,"ServerAddresses":["%s:8300"],"ServerName":"%s","PeerID":"%s"}`, addr, token.ServerName, token.PeerID) + // Create peering initiate secret in Kubernetes. + encodedPeeringToken = base64.StdEncoding.EncodeToString([]byte(tokenString)) + secret := createSecret("dialer-token", "default", "token", encodedPeeringToken) + secret.SetResourceVersion("latest-version") + k8sObjects = append(k8sObjects, secret) + + // Create test consul server. + dialerPeerServer, err := testutil.NewTestServerConfigT(t, func(c *testutil.TestServerConfig) { + c.NodeName = node2Name + }) + require.NoError(t, err) + defer dialerPeerServer.Stop() + dialerPeerServer.WaitForServiceIntentions(t) + + cfg = &api.Config{ + Address: dialerPeerServer.HTTPAddr, + } + dialerClient, err := api.NewClient(cfg) + require.NoError(t, err) + + _, _, err = dialerClient.Peerings().Establish(context.Background(), api.PeeringEstablishRequest{PeerName: "peering", PeeringToken: encodedPeeringToken}, nil) + require.NoError(t, err) + k8sObjects = append(k8sObjects, createSecret("dialer-token-old", "default", "token", "old-token")) + + s := scheme.Scheme + s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.PeeringDialer{}, &v1alpha1.PeeringDialerList{}) + fakeClient := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(k8sObjects...).Build() + + // Create the peering dialer controller + controller := &PeeringDialerController{ + Client: fakeClient, + Log: logrtest.TestLogger{T: t}, + ConsulClient: dialerClient, + Scheme: s, + } + namespacedName := types.NamespacedName{ + Name: "peering", + Namespace: "default", + } + + resp, err := controller.Reconcile(context.Background(), ctrl.Request{ + NamespacedName: namespacedName, + }) + if tt.expErr != "" { + require.EqualError(t, err, tt.expErr) + } else { + require.NoError(t, err) + require.False(t, resp.Requeue) + + // Get the reconciled PeeringDialer and make assertions on the status + dialer := &v1alpha1.PeeringDialer{} + err = fakeClient.Get(context.Background(), namespacedName, dialer) + require.NoError(t, err) + if tt.expectedStatus != nil { + require.Equal(t, tt.expectedStatus.SecretRef.Name, dialer.SecretRef().Name) + require.Equal(t, tt.expectedStatus.SecretRef.Key, dialer.SecretRef().Key) + require.Equal(t, tt.expectedStatus.SecretRef.Backend, dialer.SecretRef().Backend) + require.Equal(t, "latest-version", dialer.SecretRef().ResourceVersion) + require.Equal(t, tt.expectedStatus.LatestPeeringVersion, dialer.Status.LatestPeeringVersion) + } + } + }) + } +} + // TestSpecStatusSecretsDifferent tests that the correct result is returned // when comparing the secret in the status against the existing secret. func TestSpecStatusSecretsDifferent(t *testing.T) {