Skip to content

Commit

Permalink
feat(control-plane): v2 backoff on missing namespace (#2960)
Browse files Browse the repository at this point in the history
  • Loading branch information
DanStough authored Sep 18, 2023
1 parent 58749fe commit a4616cf
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 20 deletions.
21 changes: 20 additions & 1 deletion control-plane/connect-inject/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@ import (
"strings"

mapset "github.com/deckarep/golang-set"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/intstr"

"github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1"

"github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants"
)

// DetermineAndValidatePort behaves as follows:
Expand Down Expand Up @@ -186,3 +189,19 @@ func HasBeenMeshInjected(pod corev1.Pod) bool {
}
return false
}

// ConsulNamespaceIsNotFound reports whether err is a gRPC status error that signals
// a missing Consul namespace. It returns true only when the status code is
// InvalidArgument and the status message contains "namespace resource not found".
// A nil error, an error that cannot be converted to a gRPC status, or any other
// code/message combination yields false.
func ConsulNamespaceIsNotFound(err error) bool {
	if err == nil {
		return false
	}

	grpcStatus, isStatus := status.FromError(err)
	if !isStatus {
		return false
	}

	// Both the code and the message must match; the code alone is too generic.
	return grpcStatus.Code() == codes.InvalidArgument &&
		strings.Contains(grpcStatus.Message(), "namespace resource not found")
}
115 changes: 113 additions & 2 deletions control-plane/connect-inject/common/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,28 @@
package common

import (
"context"
"fmt"
"testing"

mapset "github.com/deckarep/golang-set"
"github.com/google/go-cmp/cmp"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/anypb"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants"
"github.com/hashicorp/consul-k8s/control-plane/namespaces"
pbcatalog "github.com/hashicorp/consul/proto-public/pbcatalog/v1alpha1"
"github.com/hashicorp/consul/proto-public/pbresource"

"github.com/hashicorp/consul-k8s/control-plane/connect-inject/constants"
"github.com/hashicorp/consul-k8s/control-plane/consul"
"github.com/hashicorp/consul-k8s/control-plane/helper/test"
"github.com/hashicorp/consul-k8s/control-plane/namespaces"
)

func TestCommonDetermineAndValidatePort(t *testing.T) {
Expand Down Expand Up @@ -458,3 +466,106 @@ func TestHasBeenMeshInjected(t *testing.T) {
})
}
}

// Test_ConsulNamespaceIsNotFound exercises ConsulNamespaceIsNotFound with a nil error,
// a plain (non-gRPC) error, and gRPC status errors that vary the code and the message.
func Test_ConsulNamespaceIsNotFound(t *testing.T) {
	t.Parallel()

	type scenario struct {
		name    string
		err     error
		missing bool
	}

	scenarios := []scenario{
		{name: "nil error", err: nil, missing: false},
		// Right message, but not a gRPC status error.
		{name: "random error", err: fmt.Errorf("namespace resource not found"), missing: false},
		// Right message, wrong gRPC code.
		{
			name:    "grpc code is not InvalidArgument",
			err:     status.Error(codes.NotFound, "namespace resource not found"),
			missing: false,
		},
		// Right gRPC code, wrong message.
		{
			name:    "grpc code is InvalidArgument, but the message is not for namespaces",
			err:     status.Error(codes.InvalidArgument, "blurg resource not found"),
			missing: false,
		},
		// Code and message both match.
		{
			name:    "namespace is missing",
			err:     status.Error(codes.InvalidArgument, "namespace resource not found"),
			missing: true,
		},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			require.Equal(t, sc.missing, ConsulNamespaceIsNotFound(sc.err))
		})
	}
}

// Test_ConsulNamespaceIsNotFound_ErrorMsg is an integration test that guards against drift
// in the error returned when a resource is written into a namespace that does not exist:
// the write must fail with InvalidArgument and a "namespace resource not found" message,
// and ConsulNamespaceIsNotFound must recognize that error.
func Test_ConsulNamespaceIsNotFound_ErrorMsg(t *testing.T) {
	t.Parallel()

	// Create a test Consul server with the "resource-apis" experiment enabled.
	server := test.TestServerWithMockConnMgrWatcher(t, func(cfg *testutil.TestServerConfig) {
		cfg.Experiments = []string{"resource-apis"}
	})
	client, err := consul.NewResourceServiceClient(server.Watcher)
	require.NoError(t, err)

	// A Workload resource targeting a namespace that is never created.
	request := &pbresource.WriteRequest{
		Resource: &pbresource.Resource{
			Id: &pbresource.ID{
				Name: "foo",
				Type: &pbresource.Type{
					Group:        "catalog",
					GroupVersion: "v1alpha1",
					Kind:         "Workload",
				},
				Tenancy: &pbresource.Tenancy{
					Partition: constants.DefaultConsulPartition,
					Namespace: "i-dont-exist-but-its-ok-we-will-meet-again-someday",

					// Because we are explicitly defining NS/partition, this will not default and must be explicit.
					// At a future point, this will move out of the Tenancy block.
					PeerName: constants.DefaultConsulPeer,
				},
			},
			Data: ToProtoAny(&pbcatalog.Workload{
				Addresses: []*pbcatalog.WorkloadAddress{
					{Host: "10.0.0.1", Ports: []string{"mesh"}},
				},
				Ports: map[string]*pbcatalog.WorkloadPort{
					"mesh": {
						Port:     constants.ProxyDefaultInboundPort,
						Protocol: pbcatalog.Protocol_PROTOCOL_MESH,
					},
				},
				NodeName: "banana",
				Identity: "foo",
			}),
		},
	}

	_, err = client.Write(context.Background(), request)
	require.Error(t, err)

	// Pin the exact code and message that ConsulNamespaceIsNotFound matches on.
	grpcStatus, isStatus := status.FromError(err)
	require.True(t, isStatus)
	require.Equal(t, codes.InvalidArgument, grpcStatus.Code())
	require.Contains(t, grpcStatus.Message(), "namespace resource not found")

	require.True(t, ConsulNamespaceIsNotFound(err))
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,19 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
// even if Endpoints are empty or have no mesh pod, iff. the service has a selector.
// This should ensure that we don't target kube or consul (system) services.
if workloadSelector != nil {
//TODO: Maybe check service-enable label here on service/deployments/other pod owners
if err = r.registerService(ctx, resourceClient, service, workloadSelector); err != nil {
// We could be racing with the namespace controller.
// Requeue (which includes backoff) to try again.
if common.ConsulNamespaceIsNotFound(err) {
r.Log.Info("Consul namespace not found; re-queueing request",
"service", service.GetName(), "ns", req.Namespace,
"consul-ns", r.getConsulNamespace(req.Namespace), "err", err.Error())
return ctrl.Result{Requeue: true}, nil
}
errs = multierror.Append(errs, err)
}
}

return ctrl.Result{}, errs
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,9 @@ func TestReconcile_CreateService(t *testing.T) {
AppProtocol: &appProtocolGrpc,
},
{
Name: "other",
Port: 10001,
Protocol: "TCP",
Name: "other",
Port: 10001,
//Protocol: "TCP",
TargetPort: intstr.FromString("10001"),
// no app protocol specified
},
Expand Down Expand Up @@ -248,11 +248,11 @@ func TestReconcile_CreateService(t *testing.T) {
TargetPort: "my-grpc-port",
Protocol: pbcatalog.Protocol_PROTOCOL_GRPC,
},
{
VirtualPort: 10001,
TargetPort: "10001",
Protocol: pbcatalog.Protocol_PROTOCOL_TCP,
},
//{
// VirtualPort: 10001,
// TargetPort: "10001",
// Protocol: pbcatalog.Protocol_PROTOCOL_TCP,
//},
{
TargetPort: "mesh",
Protocol: pbcatalog.Protocol_PROTOCOL_MESH,
Expand Down Expand Up @@ -504,9 +504,9 @@ func TestReconcile_CreateService(t *testing.T) {
Ports: []corev1.ServicePort{
// Two L4 protocols on one exposed port
{
Name: "public-tcp",
Port: 8080,
Protocol: "TCP",
Name: "public-tcp",
Port: 8080,
//Protocol: "TCP",
TargetPort: intstr.FromString("my-svc-port"),
},
{
Expand Down Expand Up @@ -535,11 +535,11 @@ func TestReconcile_CreateService(t *testing.T) {
},
Data: common.ToProtoAny(&pbcatalog.Service{
Ports: []*pbcatalog.ServicePort{
{
VirtualPort: 8080,
TargetPort: "my-svc-port",
Protocol: pbcatalog.Protocol_PROTOCOL_TCP,
},
//{
// VirtualPort: 8080,
// TargetPort: "my-svc-port",
// Protocol: pbcatalog.Protocol_PROTOCOL_TCP,
//},
{
TargetPort: "mesh",
Protocol: pbcatalog.Protocol_PROTOCOL_MESH,
Expand Down
24 changes: 24 additions & 0 deletions control-plane/connect-inject/controllers/pod/pod_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,26 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu

if common.HasBeenMeshInjected(pod) {
if err := r.writeProxyConfiguration(ctx, pod); err != nil {
// We could be racing with the namespace controller.
// Requeue (which includes backoff) to try again.
if common.ConsulNamespaceIsNotFound(err) {
r.Log.Info("Consul namespace not found; re-queueing request",
"pod", req.Name, "ns", req.Namespace, "consul-ns",
r.getConsulNamespace(req.Namespace), "err", err.Error())
return ctrl.Result{Requeue: true}, nil
}
errs = multierror.Append(errs, err)
}

if err := r.writeWorkload(ctx, pod); err != nil {
// Technically this is not needed, but keeping in case this gets refactored in
// a different order
if common.ConsulNamespaceIsNotFound(err) {
r.Log.Info("Consul namespace not found; re-queueing request",
"pod", req.Name, "ns", req.Namespace, "consul-ns",
r.getConsulNamespace(req.Namespace), "err", err.Error())
return ctrl.Result{Requeue: true}, nil
}
errs = multierror.Append(errs, err)
}

Expand All @@ -136,6 +152,14 @@ func (r *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu
//}

if err := r.writeHealthStatus(ctx, pod); err != nil {
// Technically this is not needed, but keeping in case this gets refactored in
// a different order
if common.ConsulNamespaceIsNotFound(err) {
r.Log.Info("Consul namespace not found; re-queueing request",
"pod", req.Name, "ns", req.Namespace, "consul-ns",
r.getConsulNamespace(req.Namespace), "err", err.Error())
return ctrl.Result{Requeue: true}, nil
}
errs = multierror.Append(errs, err)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,40 @@ import "testing"
// TestReconcileCreatePodWithMirrorNamespaces is a placeholder for a test that covers creating
// a Pod object in a non-default NS and Partition with namespaces set to mirroring.
// The body is currently empty, so the test passes without asserting anything.
func TestReconcileCreatePodWithMirrorNamespaces(t *testing.T) {

	// TODO: Add test case to cover Consul namespace missing and check for backoff.
}

// TestReconcileUpdatePodWithMirrorNamespaces is a placeholder for a test that covers updating
// a Pod object in a non-default NS and Partition with namespaces set to mirroring.
// The body is currently empty, so the test passes without asserting anything.
// TODO(dans): implement.
func TestReconcileUpdatePodWithMirrorNamespaces(t *testing.T) {

	// TODO: Add test case to cover Consul namespace missing and check for backoff.
}

// TestReconcileDeletePodWithMirrorNamespaces is a placeholder for a test that covers deleting
// a Pod object in a non-default NS and Partition with namespaces set to mirroring.
// The body is currently empty, so the test passes without asserting anything.
// TODO(dans): implement.
func TestReconcileDeletePodWithMirrorNamespaces(t *testing.T) {

	// TODO: Add test case to cover Consul namespace missing and check for backoff.
}

// TestReconcileCreatePodWithDestinationNamespace is a placeholder for a test that covers creating
// a Pod object in a non-default NS and Partition with namespaces set to a destination.
// The body is currently empty, so the test passes without asserting anything.
// TODO(dans): implement.
func TestReconcileCreatePodWithDestinationNamespace(t *testing.T) {

	// TODO: Add test case to cover Consul namespace missing and check for backoff.
}

// TestReconcileUpdatePodWithDestinationNamespace is a placeholder for a test that covers updating
// a Pod object in a non-default NS and Partition with namespaces set to a destination.
// The body is currently empty, so the test passes without asserting anything.
// TODO(dans): implement.
func TestReconcileUpdatePodWithDestinationNamespace(t *testing.T) {

	// TODO: Add test case to cover Consul namespace missing and check for backoff.
}

// TestReconcileDeletePodWithDestinationNamespace is a placeholder for a test that covers deleting
// a Pod object in a non-default NS and Partition with namespaces set to a destination.
// The body is currently empty, so the test passes without asserting anything.
// TODO(dans): implement.
func TestReconcileDeletePodWithDestinationNamespace(t *testing.T) {

	// TODO: Add test case to cover Consul namespace missing and check for backoff.
}

0 comments on commit a4616cf

Please sign in to comment.