Skip to content
This repository has been archived by the owner on Apr 17, 2019. It is now read-only.

Commit

Permalink
Only update service annotations if it contains named ports
Browse files Browse the repository at this point in the history
  • Loading branch information
aledbf committed May 2, 2016
1 parent 0c6ac0c commit 5e97d4f
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 80 deletions.
113 changes: 34 additions & 79 deletions ingress/controllers/nginx/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,15 @@ var (

type namedPortMapping map[string]string

// getPort returns the port defined in a named port
func (npm namedPortMapping) getPort(name string) (string, bool) {
val, ok := npm.getMappings()[name]
val, ok := npm.getPortMappings()[name]
return val, ok
}

func (npm namedPortMapping) getMappings() map[string]string {
// getPortMappings returns the map containing the
// mapping of named port names and the port number
func (npm namedPortMapping) getPortMappings() map[string]string {
data := npm[namedPortAnnotation]
var mapping map[string]string
if data == "" {
Expand Down Expand Up @@ -100,10 +103,6 @@ type loadBalancerController struct {
// this avoids a sync execution in the ResourceEventHandlerFuncs
ingQueue *taskQueue

// used to update the annotation that matches a service using one or
// more named ports to an endpoint port
svcEpQueue *taskQueue

// stopLock is used to enforce only a single call to Stop is active.
// Needed because we allow stopping through an http endpoint and
// allowing concurrent stoppers leads to stack traces.
Expand Down Expand Up @@ -136,14 +135,12 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura

lbc.syncQueue = NewTaskQueue(lbc.sync)
lbc.ingQueue = NewTaskQueue(lbc.updateIngressStatus)
lbc.svcEpQueue = NewTaskQueue(lbc.updateEpNamedPorts)

ingEventHandler := framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addIng := obj.(*extensions.Ingress)
lbc.recorder.Eventf(addIng, api.EventTypeNormal, "CREATE", fmt.Sprintf("%s/%s", addIng.Namespace, addIng.Name))
lbc.ingQueue.enqueue(obj)
lbc.svcEpQueue.enqueue(obj)
lbc.syncQueue.enqueue(obj)
},
DeleteFunc: func(obj interface{}) {
Expand All @@ -156,7 +153,6 @@ func newLoadBalancerController(kubeClient *client.Client, resyncPeriod time.Dura
upIng := cur.(*extensions.Ingress)
lbc.recorder.Eventf(upIng, api.EventTypeNormal, "UPDATE", fmt.Sprintf("%s/%s", upIng.Namespace, upIng.Name))
lbc.ingQueue.enqueue(cur)
lbc.svcEpQueue.enqueue(cur)
lbc.syncQueue.enqueue(cur)
}
},
Expand Down Expand Up @@ -252,84 +248,26 @@ func (lbc *loadBalancerController) getUDPConfigMap(ns, name string) (*api.Config
return lbc.client.ConfigMaps(ns).Get(name)
}

func (lbc *loadBalancerController) updateEpNamedPorts(key string) {
if !lbc.controllersInSync() {
time.Sleep(podStoreSyncedPollPeriod)
lbc.svcEpQueue.requeue(key, fmt.Errorf("deferring sync till endpoints controller has synced"))
return
}

glog.V(4).Infof("checking if service %v uses named ports to update annotation %v", key, namedPortAnnotation)

ingObj, ingExists, err := lbc.ingLister.Store.GetByKey(key)
if err != nil {
glog.Warningf("error getting service %v: %v", key, err)
return
}

if !ingExists {
glog.Warningf("service %v not found", key)
return
}

ing := ingObj.(*extensions.Ingress)
for _, rule := range ing.Spec.Rules {
if rule.IngressRuleValue.HTTP == nil {
continue
}

for _, path := range rule.HTTP.Paths {
svcKey := fmt.Sprintf("%v/%v", ing.GetNamespace(), path.Backend.ServiceName)
svcObj, svcExists, err := lbc.svcLister.Store.GetByKey(svcKey)
if err != nil {
glog.Infof("error getting service %v from the cache: %v", svcKey, err)
continue
}

if !svcExists {
glog.Warningf("service %v does no exists", svcKey)
continue
}

svc := svcObj.(*api.Service)
if svc.Spec.Selector == nil {
return
}

// check to avoid a call to checkSvcForUpdate if the port is not a string
_, err = strconv.Atoi(path.Backend.ServicePort.StrVal)
if err == nil {
continue
}

err = lbc.checkSvcForUpdate(svc)
if err != nil {
lbc.svcEpQueue.requeue(key, err)
return
}
}
}
}

// checkSvcForUpdate verifies if one of the running pods for a service contains
// named port. If the annotation in the service does not exists or is not equals
// to the port mapping obtained from the pod the service must be updated to reflect
// the current state
func (lbc *loadBalancerController) checkSvcForUpdate(svc *api.Service) error {
func (lbc *loadBalancerController) checkSvcForUpdate(svc *api.Service) (map[string]string, error) {
// get the pods associated with the service
// TODO: switch this to a watch
pods, err := lbc.client.Pods(svc.Namespace).List(api.ListOptions{
LabelSelector: labels.Set(svc.Spec.Selector).AsSelector(),
})

namedPorts := map[string]string{}
if err != nil {
return fmt.Errorf("error searching service pods %v/%v: %v", svc.Namespace, svc.Name, err)
return namedPorts, fmt.Errorf("error searching service pods %v/%v: %v", svc.Namespace, svc.Name, err)
}

if len(pods.Items) == 0 {
return nil
return namedPorts, nil
}

namedPorts := map[string]string{}
// we need to check only one pod searching for named ports
pod := &pods.Items[0]
glog.V(4).Infof("checking pod %v/%v for named port information", pod.Namespace, pod.Name)
Expand Down Expand Up @@ -362,7 +300,7 @@ func (lbc *loadBalancerController) checkSvcForUpdate(svc *api.Service) error {

newSvc, err := lbc.client.Services(svc.Namespace).Get(svc.Name)
if err != nil {
return fmt.Errorf("error getting service %v/%v: %v", svc.Namespace, svc.Name, err)
return namedPorts, fmt.Errorf("error getting service %v/%v: %v", svc.Namespace, svc.Name, err)
}

if newSvc.ObjectMeta.Annotations == nil {
Expand All @@ -371,13 +309,15 @@ func (lbc *loadBalancerController) checkSvcForUpdate(svc *api.Service) error {

newSvc.ObjectMeta.Annotations[namedPortAnnotation] = string(data)
glog.Infof("updating service %v with new named port mappings", svc.Name)
_, err = lbc.client.Services(svc.Namespace).Update(svc)
_, err = lbc.client.Services(svc.Namespace).Update(newSvc)
if err != nil {
return fmt.Errorf("error syncing service %v/%v: %v", svc.Namespace, svc.Name, err)
return namedPorts, fmt.Errorf("error syncing service %v/%v: %v", svc.Namespace, svc.Name, err)
}

return newSvc.ObjectMeta.Annotations, nil
}

return nil
return namedPorts, nil
}

func (lbc *loadBalancerController) sync(key string) {
Expand Down Expand Up @@ -889,14 +829,30 @@ func (lbc *loadBalancerController) getEndpoints(s *api.Service, servicePort ints
targetPort = epPort.Port
}
case intstr.String:
if val, ok := namedPortMapping(s.ObjectMeta.Annotations).getPort(servicePort.StrVal); ok {
namedPorts := s.ObjectMeta.Annotations
val, ok := namedPortMapping(namedPorts).getPort(servicePort.StrVal)
if ok {
port, err := strconv.Atoi(val)
if err != nil {
glog.Warningf("%v is not valid as a port", val)
continue
}

if epPort.Protocol == proto {
targetPort = port
} else {
newnp, err := lbc.checkSvcForUpdate(s)
if err != nil {
glog.Warningf("error mapping service ports: %v", err)
continue
}
val, ok := namedPortMapping(newnp).getPort(servicePort.StrVal)
if ok {
port, err := strconv.Atoi(val)
if err != nil {
glog.Warningf("%v is not valid as a port", val)
continue
}

targetPort = port
}
}
Expand Down Expand Up @@ -988,7 +944,6 @@ func (lbc *loadBalancerController) Run() {

go lbc.syncQueue.run(time.Second, lbc.stopCh)
go lbc.ingQueue.run(time.Second, lbc.stopCh)
go lbc.svcEpQueue.run(time.Second, lbc.stopCh)

<-lbc.stopCh
glog.Infof("shutting down NGINX loadbalancer controller")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ spec:
- containerPort: 80
hostPort: 80
- containerPort: 443
hostPort: 4430
hostPort: 443
args:
- /nginx-ingress-controller
- --default-backend-service=default/default-http-backend
Expand Down

0 comments on commit 5e97d4f

Please sign in to comment.