From 5e97d4fe4703100ecb5f7d523d6c946d2ac7ae4b Mon Sep 17 00:00:00 2001 From: Manuel de Brito Fontes Date: Sun, 1 May 2016 23:34:00 -0300 Subject: [PATCH] Only update service annotations if it contains named ports --- ingress/controllers/nginx/controller.go | 113 ++++++------------ .../custom-template/custom-template.yaml | 2 +- 2 files changed, 35 insertions(+), 80 deletions(-) diff --git a/ingress/controllers/nginx/controller.go b/ingress/controllers/nginx/controller.go index bbf0161708..f2cb6173f2 100644 --- a/ingress/controllers/nginx/controller.go +++ b/ingress/controllers/nginx/controller.go @@ -57,12 +57,15 @@ var ( type namedPortMapping map[string]string +// getPort returns the port defined in a named port func (npm namedPortMapping) getPort(name string) (string, bool) { - val, ok := npm.getMappings()[name] + val, ok := npm.getPortMappings()[name] return val, ok } -func (npm namedPortMapping) getMappings() map[string]string { +// getPortMappings returns the map containing the +// mapping of named port names and the port number +func (npm namedPortMapping) getPortMappings() map[string]string { data := npm[namedPortAnnotation] var mapping map[string]string if data == "" { @@ -100,10 +103,6 @@ type loadBalancerController struct { // this avoids a sync execution in the ResourceEventHandlerFuncs ingQueue *taskQueue - // used to update the annotation that matches a service using one or - // more named ports to an endpoint port - svcEpQueue *taskQueue - // stopLock is used to enforce only a single call to Stop is active. // Needed because we allow stopping through an http endpoint and // allowing concurrent stoppers leads to stack traces. @@ -136,14 +135,12 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura lbc.syncQueue = NewTaskQueue(lbc.sync) lbc.ingQueue = NewTaskQueue(lbc.updateIngressStatus) - lbc.svcEpQueue = NewTaskQueue(lbc.updateEpNamedPorts) ingEventHandler := framework.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { addIng := obj.(*extensions.Ingress) lbc.recorder.Eventf(addIng, api.EventTypeNormal, "CREATE", fmt.Sprintf("%s/%s", addIng.Namespace, addIng.Name)) lbc.ingQueue.enqueue(obj) - lbc.svcEpQueue.enqueue(obj) lbc.syncQueue.enqueue(obj) }, DeleteFunc: func(obj interface{}) { @@ -156,7 +153,6 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura upIng := cur.(*extensions.Ingress) lbc.recorder.Eventf(upIng, api.EventTypeNormal, "UPDATE", fmt.Sprintf("%s/%s", upIng.Namespace, upIng.Name)) lbc.ingQueue.enqueue(cur) - lbc.svcEpQueue.enqueue(cur) lbc.syncQueue.enqueue(cur) } }, @@ -252,84 +248,26 @@ func (lbc *loadBalancerController) getUDPConfigMap(ns, name string) (*api.Config return lbc.client.ConfigMaps(ns).Get(name) } -func (lbc *loadBalancerController) updateEpNamedPorts(key string) { - if !lbc.controllersInSync() { - time.Sleep(podStoreSyncedPollPeriod) - lbc.svcEpQueue.requeue(key, fmt.Errorf("deferring sync till endpoints controller has synced")) - return - } - - glog.V(4).Infof("checking if service %v uses named ports to update annotation %v", key, namedPortAnnotation) - - ingObj, ingExists, err := lbc.ingLister.Store.GetByKey(key) - if err != nil { - glog.Warningf("error getting service %v: %v", key, err) - return - } - - if !ingExists { - glog.Warningf("service %v not found", key) - return - } - - ing := ingObj.(*extensions.Ingress) - for _, rule := range ing.Spec.Rules { - if rule.IngressRuleValue.HTTP == nil { - continue - } - - for _, path := range rule.HTTP.Paths { - svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), path.Backend.ServiceName) - svcObj, svcExists, err := lbc.svcLister.Store.GetByKey(svcKey) - if err != nil { - glog.Infof("error getting service %v from the cache: %v", svcKey, err) - continue - } - - if !svcExists { - glog.Warningf("service %v does no exists", svcKey) - continue - } - - svc := svcObj.(*api.Service) - if svc.Spec.Selector == nil { - return - } - - // check to avoid a call to checkSvcForUpdate if the port is not a string - _, err = strconv.Atoi(path.Backend.ServicePort.StrVal) - if err == nil { - continue - } - - err = lbc.checkSvcForUpdate(svc) - if err != nil { - lbc.svcEpQueue.requeue(key, err) - return - } - } - } -} - // checkSvcForUpdate verifies if one of the running pods for a service contains // named port. If the annotation in the service does not exists or is not equals // to the port mapping obtained from the pod the service must be updated to reflect // the current state -func (lbc *loadBalancerController) checkSvcForUpdate(svc *api.Service) error { +func (lbc *loadBalancerController) checkSvcForUpdate(svc *api.Service) (map[string]string, error) { // get the pods associated with the service // TODO: switch this to a watch pods, err := lbc.client.Pods(svc.Namespace).List(api.ListOptions{ LabelSelector: labels.Set(svc.Spec.Selector).AsSelector(), }) + + namedPorts := map[string]string{} if err != nil { - return fmt.Errorf("error searching service pods %v/%v: %v", svc.Namespace, svc.Name, err) + return namedPorts, fmt.Errorf("error searching service pods %v/%v: %v", svc.Namespace, svc.Name, err) } if len(pods.Items) == 0 { - return nil + return namedPorts, nil } - namedPorts := map[string]string{} // we need to check only one pod searching for named ports pod := &pods.Items[0] glog.V(4).Infof("checking pod %v/%v for named port information", pod.Namespace, pod.Name) @@ -362,7 +300,7 @@ func (lbc *loadBalancerController) checkSvcForUpdate(svc *api.Service) error { newSvc, err := lbc.client.Services(svc.Namespace).Get(svc.Name) if err != nil { - return fmt.Errorf("error getting service %v/%v: %v", svc.Namespace, svc.Name, err) + return namedPorts, fmt.Errorf("error getting service %v/%v: %v", svc.Namespace, svc.Name, err) } if newSvc.ObjectMeta.Annotations == nil { @@ -371,13 +309,15 @@ func (lbc *loadBalancerController) checkSvcForUpdate(svc *api.Service) error { newSvc.ObjectMeta.Annotations[namedPortAnnotation] = string(data) glog.Infof("updating service %v with new named port mappings", svc.Name) - _, err = lbc.client.Services(svc.Namespace).Update(svc) + _, err = lbc.client.Services(svc.Namespace).Update(newSvc) if err != nil { - return fmt.Errorf("error syncing service %v/%v: %v", svc.Namespace, svc.Name, err) + return namedPorts, fmt.Errorf("error syncing service %v/%v: %v", svc.Namespace, svc.Name, err) } + + return newSvc.ObjectMeta.Annotations, nil } - return nil + return namedPorts, nil } func (lbc *loadBalancerController) sync(key string) { @@ -889,14 +829,30 @@ func (lbc *loadBalancerController) getEndpoints(s *api.Service, servicePort ints targetPort = epPort.Port } case intstr.String: - if val, ok := namedPortMapping(s.ObjectMeta.Annotations).getPort(servicePort.StrVal); ok { + namedPorts := s.ObjectMeta.Annotations + val, ok := namedPortMapping(namedPorts).getPort(servicePort.StrVal) + if ok { port, err := strconv.Atoi(val) if err != nil { glog.Warningf("%v is not valid as a port", val) continue } - if epPort.Protocol == proto { + targetPort = port + } else { + newnp, err := lbc.checkSvcForUpdate(s) + if err != nil { + glog.Warningf("error mapping service ports: %v", err) + continue + } + val, ok := namedPortMapping(newnp).getPort(servicePort.StrVal) + if ok { + port, err := strconv.Atoi(val) + if err != nil { + glog.Warningf("%v is not valid as a port", val) + continue + } + targetPort = port } } @@ -988,7 +944,6 @@ func (lbc *loadBalancerController) Run() { go lbc.syncQueue.run(time.Second, lbc.stopCh) go lbc.ingQueue.run(time.Second, lbc.stopCh) - go lbc.svcEpQueue.run(time.Second, lbc.stopCh) <-lbc.stopCh glog.Infof("shutting down NGINX loadbalancer controller") diff --git a/ingress/controllers/nginx/examples/custom-template/custom-template.yaml b/ingress/controllers/nginx/examples/custom-template/custom-template.yaml index 65ef90e862..1b33910e9a 100644 --- a/ingress/controllers/nginx/examples/custom-template/custom-template.yaml +++ b/ingress/controllers/nginx/examples/custom-template/custom-template.yaml @@ -40,7 +40,7 @@ spec: - containerPort: 80 hostPort: 80 - containerPort: 443 - hostPort: 4430 + hostPort: 443 args: - /nginx-ingress-controller - --default-backend-service=default/default-http-backend