Skip to content

Commit

Permalink
NET-2619 - save ClusterIPs to manual vips table (hashicorp#2124)
Browse files Browse the repository at this point in the history
  • Loading branch information
curtbushko authored May 17, 2023
1 parent 02cab6c commit 5bd6c60
Show file tree
Hide file tree
Showing 7 changed files with 385 additions and 10 deletions.
3 changes: 3 additions & 0 deletions .changelog/2124.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
``release-note:improvement
control-plane: Transparent proxy enhancements for failover and virtual Services
```
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package endpoints

import (
Expand Down Expand Up @@ -295,6 +294,14 @@ func (r *Controller) registerServicesAndHealthCheck(apiClient *api.Client, pod c
return err
}

// Add manual ip to the VIP table
r.Log.Info("adding manual ip to virtual ip table in Consul", "name", serviceRegistration.Service.Service,
"id", serviceRegistration.ID)
err = assignServiceVirtualIP(r.Context, apiClient, serviceRegistration.Service)
if err != nil {
r.Log.Error(err, "failed to add ip to virtual ip table", "name", serviceRegistration.Service.Service)
}

// Register the proxy service instance with Consul.
r.Log.Info("registering proxy service with Consul", "name", proxyServiceRegistration.Service.Service)
_, err = apiClient.Catalog().Register(proxyServiceRegistration, nil)
Expand Down Expand Up @@ -1258,6 +1265,26 @@ func (r *Controller) appendNodeMeta(registration *api.CatalogRegistration) {
}
}

// assignServiceVirtualIPs manually assigns the ClusterIP to the virtual IP table so that transparent proxy routing works.
func assignServiceVirtualIP(ctx context.Context, apiClient *api.Client, svc *api.AgentService) error {
ip := svc.TaggedAddresses[clusterIPTaggedAddressName].Address
if ip == "" {
return nil
}

_, _, err := apiClient.Internal().AssignServiceVirtualIP(ctx, svc.Service, []string{ip}, &api.WriteOptions{Namespace: svc.Namespace, Partition: svc.Partition})
if err != nil {
// Maintain backwards compatibility with older versions of Consul that do not support the VIP improvements. Tproxy
// will not work 100% correctly but the mesh will still work
if strings.Contains(err.Error(), "404") {
return fmt.Errorf("failed to add ip for service %s to virtual ip table. Please upgrade Consul to version 1.16 or higher", svc.Service)
} else {
return err
}
}
return nil
}

// hasBeenInjected checks the value of the status annotation and returns true if the Pod has been injected.
func hasBeenInjected(pod corev1.Pod) bool {
if anno, ok := pod.Annotations[constants.KeyInjectStatus]; ok && anno == constants.Injected {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6414,3 +6414,54 @@ func createGatewayPod(name, ip string, annotations map[string]string) *corev1.Po
}
return pod
}

func TestReconcileAssignServiceVirtualIP(t *testing.T) {
t.Parallel()
ctx := context.Background()
cases := []struct {
name string
service *api.AgentService
expectErr bool
}{
{
name: "valid service",
service: &api.AgentService{
ID: "",
Service: "foo",
Port: 80,
Address: "1.2.3.4",
TaggedAddresses: map[string]api.ServiceAddress{
"virtual": {
Address: "1.2.3.4",
Port: 80,
},
},
Meta: map[string]string{constants.MetaKeyKubeNS: "default"},
},
expectErr: false,
},
{
name: "service missing IP should not error",
service: &api.AgentService{
ID: "",
Service: "bar",
Meta: map[string]string{constants.MetaKeyKubeNS: "default"},
},
expectErr: false,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {

// Create test consulServer server.
testClient := test.TestServerWithMockConnMgrWatcher(t, nil)
apiClient := testClient.APIClient
err := assignServiceVirtualIP(ctx, apiClient, c.service)
if err != nil {
require.True(t, c.expectErr)
} else {
require.False(t, c.expectErr)
}
})
}
}
61 changes: 61 additions & 0 deletions control-plane/controllers/configentry_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ func (r *ConfigEntryController) ReconcileEntry(ctx context.Context, crdCtrl Cont
return r.syncFailed(ctx, logger, crdCtrl, configEntry, ConsulAgentError,
fmt.Errorf("writing config entry to consul: %w", err))
}

logger.Info("config entry created", "request-time", writeMeta.RequestTime)
return r.syncSuccessful(ctx, crdCtrl, configEntry)
}
Expand Down Expand Up @@ -267,6 +268,15 @@ func (r *ConfigEntryController) ReconcileEntry(ctx context.Context, crdCtrl Cont
return r.syncSuccessful(ctx, crdCtrl, configEntry)
}

// For resolvers and splitters, we need to set the ClusterIP of the matching service to Consul so that transparent
// proxy works correctly. Do not fail the reconcile if assigning the virtual IP returns an error.
if needsVirtualIPAssignment(configEntry) {
err = assignServiceVirtualIP(ctx, logger, consulClient, crdCtrl, req.NamespacedName, configEntry, r.DatacenterName)
if err != nil {
logger.Error(err, "failed assigning service virtual ip")
}
}

return ctrl.Result{}, nil
}

Expand Down Expand Up @@ -381,6 +391,57 @@ func (r *ConfigEntryController) nonMatchingMigrationError(kubeEntry common.Confi
return fmt.Errorf("migration failed: Kubernetes resource does not match existing Consul config entry: consul=%s, kube=%s", consulJSON, kubeJSON)
}

// needsVirtualIPAssignment checks to see if a configEntry type needs to be assigned a virtual IP.
func needsVirtualIPAssignment(configEntry common.ConfigEntryResource) bool {
kubeKind := configEntry.KubeKind()
if kubeKind == common.ServiceResolver || kubeKind == common.ServiceRouter || kubeKind == common.ServiceSplitter {
return true
}
return false
}

// assignServiceVirtualIPs manually sends the ClusterIP for a matching service for ServiceRouter or ServiceSplitter
// CRDs to Consul so that it can be added to the virtual IP table. The assignment is skipped if the matching service
// does not exist or if an older version of Consul is being used. Endpoints Controller, on service registration, also
// manually sends a ClusterIP when a service is created. This increases the chance of a real IP ending up in the
// discovery chain.
func assignServiceVirtualIP(ctx context.Context, logger logr.Logger, consulClient *capi.Client, crdCtrl Controller, namespacedName types.NamespacedName, configEntry common.ConfigEntryResource, datacenter string) error {
service := corev1.Service{
ObjectMeta: metav1.ObjectMeta{
Name: configEntry.KubernetesName(),
Namespace: namespacedName.Namespace,
},
}
if err := crdCtrl.Get(ctx, namespacedName, &service); err != nil {
// It is non-fatal if the service does not exist. The ClusterIP will get added when the service is registered in
// the endpoints controller
if k8serr.IsNotFound(err) {
return nil
}
// Something is really wrong with the service
return err
}

wo := &capi.WriteOptions{
Namespace: configEntry.ToConsul(datacenter).GetNamespace(),
Partition: configEntry.ToConsul(datacenter).GetPartition(),
}

logger.Info("adding manual ip to virtual ip table in Consul", "name", service.Name)
_, _, err := consulClient.Internal().AssignServiceVirtualIP(ctx, configEntry.KubernetesName(), []string{service.Spec.ClusterIP}, wo)
if err != nil {
// Maintain backwards compatibility with older versions of Consul that do not support the manual VIP improvements. With the older version, the mesh
// will still work.
if isNotFoundErr(err) {
logger.Error(err, "failed to add ip to virtual ip table. Please upgrade Consul to version 1.16 or higher", "name", service.Name)
return nil
} else {
return err
}
}
return nil
}

func isNotFoundErr(err error) bool {
return err != nil && strings.Contains(err.Error(), "404")
}
Expand Down
Loading

0 comments on commit 5bd6c60

Please sign in to comment.