Skip to content

Commit

Permalink
[API Gateway] fix dangling service registrations (#2321)
Browse files Browse the repository at this point in the history
* Fix when gateways are deleted before we get services populated into cache

* a bit of cleanup
  • Loading branch information
Andrew Stucki authored Jun 9, 2023
1 parent 203c9d1 commit da147c1
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 11 deletions.
18 changes: 18 additions & 0 deletions control-plane/api-gateway/cache/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,24 @@ func (r *GatewayCache) ServicesFor(ref api.ResourceReference) []api.CatalogServi
return r.data[common.NormalizeMeta(ref)]
}

func (r *GatewayCache) FetchServicesFor(ctx context.Context, ref api.ResourceReference) ([]api.CatalogService, error) {
client, err := consul.NewClientFromConnMgr(r.config, r.serverMgr)
if err != nil {
return nil, err
}

opts := &api.QueryOptions{}
if ref.Namespace != "" {
opts.Namespace = ref.Namespace
}

services, _, err := client.Catalog().Service(ref.Name, "", opts.WithContext(ctx))
if err != nil {
return nil, err
}
return common.DerefAll(services), nil
}

func (r *GatewayCache) EnsureSubscribed(ref api.ResourceReference, resource types.NamespacedName) {
r.mutex.Lock()
defer r.mutex.Unlock()
Expand Down
50 changes: 39 additions & 11 deletions control-plane/api-gateway/controllers/gateway_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,12 @@ func (r *GatewayController) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{}, err
}
r.gatewayCache.RemoveSubscription(nonNormalizedConsulKey)
// make sure we have deregister all services even if they haven't
// hit cache yet
if err := r.deregisterAllServices(ctx, consulKey); err != nil {
log.Error(err, "error deregistering services")
return ctrl.Result{}, err
}
}

for _, deletion := range updates.Consul.Deletions {
Expand All @@ -235,19 +241,24 @@ func (r *GatewayController) Reconcile(ctx context.Context, req ctrl.Request) (ct
}
}

for _, registration := range updates.Consul.Registrations {
log.Info("registering service in Consul", "service", registration.Service.Service, "id", registration.Service.ID)
if err := r.cache.Register(ctx, registration); err != nil {
log.Error(err, "error registering service")
return ctrl.Result{}, err
if updates.UpsertGatewayDeployment {
// We only do some registration/deregistraion if we still have a valid gateway
// otherwise, we've already deregistered everything related to the gateway, so
// no need to do any of the following.
for _, registration := range updates.Consul.Registrations {
log.Info("registering service in Consul", "service", registration.Service.Service, "id", registration.Service.ID)
if err := r.cache.Register(ctx, registration); err != nil {
log.Error(err, "error registering service")
return ctrl.Result{}, err
}
}
}

for _, deregistration := range updates.Consul.Deregistrations {
log.Info("deregistering service in Consul", "id", deregistration.ServiceID)
if err := r.cache.Deregister(ctx, deregistration); err != nil {
log.Error(err, "error deregistering service")
return ctrl.Result{}, err
for _, deregistration := range updates.Consul.Deregistrations {
log.Info("deregistering service in Consul", "id", deregistration.ServiceID)
if err := r.cache.Deregister(ctx, deregistration); err != nil {
log.Error(err, "error deregistering service")
return ctrl.Result{}, err
}
}
}

Expand All @@ -270,6 +281,23 @@ func (r *GatewayController) Reconcile(ctx context.Context, req ctrl.Request) (ct
return ctrl.Result{}, nil
}

func (r *GatewayController) deregisterAllServices(ctx context.Context, consulKey api.ResourceReference) error {
services, err := r.gatewayCache.FetchServicesFor(ctx, consulKey)
if err != nil {
return err
}
for _, service := range services {
if err := r.cache.Deregister(ctx, api.CatalogDeregistration{
Node: service.Node,
ServiceID: service.ServiceID,
Namespace: service.Namespace,
}); err != nil {
return err
}
}
return nil
}

func (r *GatewayController) updateAndResetStatus(ctx context.Context, o client.Object) error {
// we create a copy so that we can re-update its status if need be
status := reflect.ValueOf(o.DeepCopyObject()).Elem().FieldByName("Status")
Expand Down

0 comments on commit da147c1

Please sign in to comment.