Skip to content

Commit

Permalink
Add watchers for Peering Token secrets for the peering controllers
Browse files Browse the repository at this point in the history
- When a Kubernetes secret that has a label indicating it is a peering token secret, the controllers watch those secrets and updated to those secrets re-enqueues the resource that is associated with that peering token secret.

The status is used to determine the Peering Acceptor that is re-enqueued
while the spec is used to determing the Peering Dialer that gets
re-enqueued. This is because the acceptor is responsible for creating
the secret and hence metaphorically owns the secret described in it's
status. OTOH the dialer should respond to changes in the secret
described in it's spec.

This is only supported for secrets with a Kubernetes backend.
  • Loading branch information
thisisnotashwin committed Jun 21, 2022
1 parent 36ce818 commit 8ee2f39
Show file tree
Hide file tree
Showing 5 changed files with 657 additions and 2 deletions.
4 changes: 4 additions & 0 deletions control-plane/connect-inject/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ const (
// registered with Consul.
labelServiceIgnore = "consul.hashicorp.com/service-ignore"

// labelPeeringToken is a label that can be added to a secret to allow it to be watched
// by the peering controllers.
labelPeeringToken = "consul.hashicorp.com/peering-token"

// injected is used as the annotation value for annotationInjected.
injected = "injected"

Expand Down
54 changes: 53 additions & 1 deletion control-plane/connect-inject/peering_acceptor_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,13 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)

// PeeringAcceptorController reconciles a PeeringAcceptor object.
Expand Down Expand Up @@ -318,7 +323,11 @@ func (r *PeeringAcceptorController) deleteK8sSecret(ctx context.Context, accepto
func (r *PeeringAcceptorController) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&consulv1alpha1.PeeringAcceptor{}).
Complete(r)
Watches(
&source.Kind{Type: &corev1.Secret{}},
handler.EnqueueRequestsFromMapFunc(r.requestsForPeeringTokens),
builder.WithPredicates(predicate.NewPredicateFuncs(r.filterPeeringAcceptors)),
).Complete(r)
}

// generateToken is a helper function that calls the Consul api to generate a token for the peer.
Expand All @@ -344,12 +353,55 @@ func (r *PeeringAcceptorController) deletePeering(ctx context.Context, peerName
return nil
}

// requestsForPeeringTokens creates a slice of requests for the peering acceptor controller.
// It enqueues a request for each acceptor that needs to be reconciled. It iterates through
// the list of acceptors and creates a request for the acceptor that has the same secret as it's
// secretRef and that of the updated secret that is being watched.
// We compare it to the secret in the status as the resource has created the secret.
func (r *PeeringAcceptorController) requestsForPeeringTokens(object client.Object) []reconcile.Request {
r.Log.Info("received update for Peering Token Secret", "name", object.GetName())

// Get the list of all acceptors.
var acceptorList consulv1alpha1.PeeringAcceptorList
if err := r.Client.List(r.Context, &acceptorList); err != nil {
r.Log.Error(err, "failed to list Peering Acceptors")
return []ctrl.Request{}
}
for _, acceptor := range acceptorList.Items {
if acceptor.SecretRef().Backend == "kubernetes" {
if acceptor.SecretRef().Name == object.GetName() && acceptor.Namespace == object.GetNamespace() {
return []ctrl.Request{{NamespacedName: types.NamespacedName{Namespace: acceptor.Namespace, Name: acceptor.Name}}}
}
}
}
return []ctrl.Request{}
}

// filterPeeringAcceptors receives meta and object information for Kubernetes resources that are being watched,
// which in this case are Secrets. It only returns true if the Secret is a Peering Token Secret. It reads the labels
// from the meta of the resource and uses the values of the "consul.hashicorp.com/peering-token" label to validate that
// the Secret is a Peering Token Secret.
func (r *PeeringAcceptorController) filterPeeringAcceptors(object client.Object) bool {
secretLabels := object.GetLabels()
isPeeringToken, ok := secretLabels[labelPeeringToken]
if !ok {
return false
}
if isPeeringToken == "true" {
return true
}
return false
}

// createSecret is a helper function that creates a corev1.Secret when provided inputs.
func createSecret(name, namespace, key, value string) *corev1.Secret {
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: map[string]string{
labelPeeringToken: "true",
},
},
Data: map[string][]byte{
key: []byte(value),
Expand Down
276 changes: 276 additions & 0 deletions control-plane/connect-inject/peering_acceptor_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"k8s.io/client-go/kubernetes/scheme"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// TestReconcileCreateUpdatePeeringAcceptor creates a peering acceptor.
Expand Down Expand Up @@ -396,6 +397,8 @@ func TestReconcileCreateUpdatePeeringAcceptor(t *testing.T) {
require.NoError(t, err)
expSecrets := tt.expectedK8sSecrets()
require.Equal(t, expSecrets[0].Name, createdSecret.Name)
require.Contains(t, createdSecret.Labels, labelPeeringToken)
require.Equal(t, createdSecret.Labels[labelPeeringToken], "true")
// This assertion needs to be on StringData rather than Data because in the fake K8s client the contents are
// stored in StringData if that's how the secret was initialized in the fake client. In a real cluster, this
// StringData is an input only field, and shouldn't be read from.
Expand Down Expand Up @@ -943,3 +946,276 @@ func TestAcceptorUpdateStatusError(t *testing.T) {
})
}
}

func TestAcceptor_FilterPeeringAcceptor(t *testing.T) {
t.Parallel()
cases := map[string]struct {
secret *corev1.Secret
result bool
}{
"returns true if label is set to true": {
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "test",
Labels: map[string]string{
labelPeeringToken: "true",
},
},
},
result: true,
},
"returns false if label is set to false": {
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "test",
Labels: map[string]string{
labelPeeringToken: "false",
},
},
},
result: false,
},
"returns false if label is set to a non-true value": {
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "test",
Labels: map[string]string{
labelPeeringToken: "foo",
},
},
},
result: false,
},
"returns false if label is not set": {
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "test",
},
},
result: false,
},
}

for name, tt := range cases {
t.Run(name, func(t *testing.T) {
controller := PeeringAcceptorController{}
result := controller.filterPeeringAcceptors(tt.secret)
require.Equal(t, tt.result, result)
})
}
}

func TestAcceptor_RequestsForPeeringTokens(t *testing.T) {
t.Parallel()
cases := map[string]struct {
secret *corev1.Secret
acceptors v1alpha1.PeeringAcceptorList
result []reconcile.Request
}{
"secret matches existing acceptor": {
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "test",
},
},
acceptors: v1alpha1.PeeringAcceptorList{
Items: []v1alpha1.PeeringAcceptor{
{
ObjectMeta: metav1.ObjectMeta{
Name: "peering",
Namespace: "test",
},
Status: v1alpha1.PeeringAcceptorStatus{
SecretRef: &v1alpha1.SecretRefStatus{
Secret: v1alpha1.Secret{
Name: "test",
Key: "test",
Backend: "kubernetes",
},
},
},
},
},
},
result: []reconcile.Request{
{
NamespacedName: types.NamespacedName{
Namespace: "test",
Name: "peering",
},
},
},
},
"does not match if backend is not kubernetes": {
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "test",
},
},
acceptors: v1alpha1.PeeringAcceptorList{
Items: []v1alpha1.PeeringAcceptor{
{
ObjectMeta: metav1.ObjectMeta{
Name: "peering",
Namespace: "test",
},
Status: v1alpha1.PeeringAcceptorStatus{
SecretRef: &v1alpha1.SecretRefStatus{
Secret: v1alpha1.Secret{
Name: "test",
Key: "test",
Backend: "vault",
},
},
},
},
},
},
result: []reconcile.Request{},
},
"only matches with the correct acceptor": {
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "test",
},
},
acceptors: v1alpha1.PeeringAcceptorList{
Items: []v1alpha1.PeeringAcceptor{
{
ObjectMeta: metav1.ObjectMeta{
Name: "peering-1",
Namespace: "test",
},
Status: v1alpha1.PeeringAcceptorStatus{
SecretRef: &v1alpha1.SecretRefStatus{
Secret: v1alpha1.Secret{
Name: "test",
Key: "test",
Backend: "kubernetes",
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "peering-2",
Namespace: "test-2",
},
Status: v1alpha1.PeeringAcceptorStatus{
SecretRef: &v1alpha1.SecretRefStatus{
Secret: v1alpha1.Secret{
Name: "test",
Key: "test",
Backend: "kubernetes",
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "peering-3",
Namespace: "test",
},
Status: v1alpha1.PeeringAcceptorStatus{
SecretRef: &v1alpha1.SecretRefStatus{
Secret: v1alpha1.Secret{
Name: "test-2",
Key: "test",
Backend: "kubernetes",
},
},
},
},
},
},
result: []reconcile.Request{
{
NamespacedName: types.NamespacedName{
Namespace: "test",
Name: "peering-1",
},
},
},
},
"can match with zero acceptors": {
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Namespace: "test",
},
},
acceptors: v1alpha1.PeeringAcceptorList{
Items: []v1alpha1.PeeringAcceptor{
{
ObjectMeta: metav1.ObjectMeta{
Name: "peering-1",
Namespace: "test",
},
Status: v1alpha1.PeeringAcceptorStatus{
SecretRef: &v1alpha1.SecretRefStatus{
Secret: v1alpha1.Secret{
Name: "fest",
Key: "test",
Backend: "kubernetes",
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "peering-2",
Namespace: "test-2",
},
Status: v1alpha1.PeeringAcceptorStatus{
SecretRef: &v1alpha1.SecretRefStatus{
Secret: v1alpha1.Secret{
Name: "test",
Key: "test",
Backend: "kubernetes",
},
},
},
},
{
ObjectMeta: metav1.ObjectMeta{
Name: "peering-3",
Namespace: "test",
},
Status: v1alpha1.PeeringAcceptorStatus{
SecretRef: &v1alpha1.SecretRefStatus{
Secret: v1alpha1.Secret{
Name: "test-2",
Key: "test",
Backend: "kubernetes",
},
},
},
},
},
},
result: []reconcile.Request{},
},
}

for name, tt := range cases {
t.Run(name, func(t *testing.T) {
s := scheme.Scheme
s.AddKnownTypes(v1alpha1.GroupVersion, &v1alpha1.PeeringAcceptor{}, &v1alpha1.PeeringAcceptorList{})
fakeClient := fake.NewClientBuilder().WithScheme(s).WithRuntimeObjects(tt.secret, &tt.acceptors).Build()
controller := PeeringAcceptorController{
Client: fakeClient,
Log: logrtest.TestLogger{T: t},
}
result := controller.requestsForPeeringTokens(tt.secret)

require.Equal(t, tt.result, result)
})
}
}
Loading

0 comments on commit 8ee2f39

Please sign in to comment.