From da147c1d7503e307ee4b93e0139a27a80a073cfc Mon Sep 17 00:00:00 2001 From: Andrew Stucki Date: Fri, 9 Jun 2023 10:16:55 -0400 Subject: [PATCH] [API Gateway] fix dangling service registrations (#2321) * Fix when gateways are deleted before we get services populated into cache * a bit of cleanup --- control-plane/api-gateway/cache/gateway.go | 18 +++++++ .../controllers/gateway_controller.go | 50 +++++++++++++++---- 2 files changed, 57 insertions(+), 11 deletions(-) diff --git a/control-plane/api-gateway/cache/gateway.go b/control-plane/api-gateway/cache/gateway.go index 846131d11e..d8dd37ffac 100644 --- a/control-plane/api-gateway/cache/gateway.go +++ b/control-plane/api-gateway/cache/gateway.go @@ -52,6 +52,24 @@ func (r *GatewayCache) ServicesFor(ref api.ResourceReference) []api.CatalogServi return r.data[common.NormalizeMeta(ref)] } +func (r *GatewayCache) FetchServicesFor(ctx context.Context, ref api.ResourceReference) ([]api.CatalogService, error) { + client, err := consul.NewClientFromConnMgr(r.config, r.serverMgr) + if err != nil { + return nil, err + } + + opts := &api.QueryOptions{} + if ref.Namespace != "" { + opts.Namespace = ref.Namespace + } + + services, _, err := client.Catalog().Service(ref.Name, "", opts.WithContext(ctx)) + if err != nil { + return nil, err + } + return common.DerefAll(services), nil +} + func (r *GatewayCache) EnsureSubscribed(ref api.ResourceReference, resource types.NamespacedName) { r.mutex.Lock() defer r.mutex.Unlock() diff --git a/control-plane/api-gateway/controllers/gateway_controller.go b/control-plane/api-gateway/controllers/gateway_controller.go index 664bb98d3c..f83466d67a 100644 --- a/control-plane/api-gateway/controllers/gateway_controller.go +++ b/control-plane/api-gateway/controllers/gateway_controller.go @@ -209,6 +209,12 @@ func (r *GatewayController) Reconcile(ctx context.Context, req ctrl.Request) (ct return ctrl.Result{}, err } r.gatewayCache.RemoveSubscription(nonNormalizedConsulKey) + // make sure we have deregister all services even if they haven't + // hit cache yet + if err := r.deregisterAllServices(ctx, consulKey); err != nil { + log.Error(err, "error deregistering services") + return ctrl.Result{}, err + } } for _, deletion := range updates.Consul.Deletions { @@ -235,19 +241,24 @@ func (r *GatewayController) Reconcile(ctx context.Context, req ctrl.Request) (ct } } - for _, registration := range updates.Consul.Registrations { - log.Info("registering service in Consul", "service", registration.Service.Service, "id", registration.Service.ID) - if err := r.cache.Register(ctx, registration); err != nil { - log.Error(err, "error registering service") - return ctrl.Result{}, err + if updates.UpsertGatewayDeployment { + // We only do some registration/deregistraion if we still have a valid gateway + // otherwise, we've already deregistered everything related to the gateway, so + // no need to do any of the following. + for _, registration := range updates.Consul.Registrations { + log.Info("registering service in Consul", "service", registration.Service.Service, "id", registration.Service.ID) + if err := r.cache.Register(ctx, registration); err != nil { + log.Error(err, "error registering service") + return ctrl.Result{}, err + } } - } - for _, deregistration := range updates.Consul.Deregistrations { - log.Info("deregistering service in Consul", "id", deregistration.ServiceID) - if err := r.cache.Deregister(ctx, deregistration); err != nil { - log.Error(err, "error deregistering service") - return ctrl.Result{}, err + for _, deregistration := range updates.Consul.Deregistrations { + log.Info("deregistering service in Consul", "id", deregistration.ServiceID) + if err := r.cache.Deregister(ctx, deregistration); err != nil { + log.Error(err, "error deregistering service") + return ctrl.Result{}, err + } } } @@ -270,6 +281,23 @@ func (r *GatewayController) Reconcile(ctx context.Context, req ctrl.Request) (ct return ctrl.Result{}, nil } +func (r *GatewayController) deregisterAllServices(ctx context.Context, consulKey api.ResourceReference) error { + services, err := r.gatewayCache.FetchServicesFor(ctx, consulKey) + if err != nil { + return err + } + for _, service := range services { + if err := r.cache.Deregister(ctx, api.CatalogDeregistration{ + Node: service.Node, + ServiceID: service.ServiceID, + Namespace: service.Namespace, + }); err != nil { + return err + } + } + return nil +} + func (r *GatewayController) updateAndResetStatus(ctx context.Context, o client.Object) error { // we create a copy so that we can re-update its status if need be status := reflect.ValueOf(o.DeepCopyObject()).Elem().FieldByName("Status")