controller.go
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controller
import (
"fmt"
"net/http"
"reflect"
"sync"
"time"
"k8s.io/contrib/ingress/controllers/gce/loadbalancers"
"k8s.io/contrib/ingress/controllers/gce/utils"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
)
var (
keyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc
// DefaultClusterUID is the UID to use for cluster resources created by an
// L7 controller created without specifying the --cluster-uid flag.
DefaultClusterUID = ""
// Frequency to poll on local stores to sync.
storeSyncPollPeriod = 5 * time.Second
)
// LoadBalancerController watches the Kubernetes API and adds/removes services
// from the loadbalancer, via loadBalancerConfig.
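// Informer watches feed two task queues (ingQueue, nodeQueue), whose
// workers (sync and syncNodes below) reconcile cloud state against the
// local listers.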
type LoadBalancerController struct {
client *client.Client
ingController *framework.Controller
nodeController *framework.Controller
svcController *framework.Controller
podController *framework.Controller
ingLister StoreToIngressLister
nodeLister cache.StoreToNodeLister
svcLister cache.StoreToServiceLister
// Health checks are the readiness probes of containers on pods.
podLister cache.StoreToPodLister
// TODO: Watch secrets
CloudClusterManager *ClusterManager
recorder record.EventRecorder
nodeQueue *taskQueue
ingQueue *taskQueue
tr *GCETranslator
stopCh chan struct{}
// stopLock is used to enforce that only a single call to Stop is active.
// Needed because we allow stopping through an HTTP endpoint, and
// allowing concurrent stoppers leads to stack traces.
stopLock sync.Mutex
shutdown bool
// tlsLoader loads secrets from the Kubernetes apiserver for Ingresses.
tlsLoader tlsLoader
// hasSynced returns true if all associated sub-controllers have synced.
// Abstracted into a func for testing.
hasSynced func() bool
}
// NewLoadBalancerController creates a controller for GCE loadbalancers.
// - kubeClient: A kubernetes REST client.
// - clusterManager: A ClusterManager capable of creating all cloud resources
// required for L7 loadbalancing.
// - resyncPeriod: Watchers relist from the Kubernetes API server this often.
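//
// A minimal usage sketch (hypothetical wiring; in the real controller the
// client and the *ClusterManager are built in the main package):
//
//	kubeClient := client.NewOrDie(&restclient.Config{Host: apiServerURL})
//	lbc, err := NewLoadBalancerController(kubeClient, clusterManager, 30*time.Second, api.NamespaceAll)
//	if err != nil {
//		glog.Fatalf("%v", err)
//	}
//	go lbc.Run()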
func NewLoadBalancerController(kubeClient *client.Client, clusterManager *ClusterManager, resyncPeriod time.Duration, namespace string) (*LoadBalancerController, error) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
lbc := LoadBalancerController{
client: kubeClient,
CloudClusterManager: clusterManager,
stopCh: make(chan struct{}),
recorder: eventBroadcaster.NewRecorder(
api.EventSource{Component: "loadbalancer-controller"}),
}
lbc.nodeQueue = NewTaskQueue(lbc.syncNodes)
lbc.ingQueue = NewTaskQueue(lbc.sync)
lbc.hasSynced = lbc.storesSynced
// Ingress watch handlers
pathHandlers := framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
addIng := obj.(*extensions.Ingress)
if !isGCEIngress(addIng) {
glog.Infof("Ignoring add for ingress %v based on annotation %v", addIng.Name, ingressClassKey)
return
}
lbc.recorder.Eventf(addIng, api.EventTypeNormal, "ADD", fmt.Sprintf("%s/%s", addIng.Namespace, addIng.Name))
lbc.ingQueue.enqueue(obj)
},
DeleteFunc: func(obj interface{}) {
delIng := obj.(*extensions.Ingress)
if !isGCEIngress(delIng) {
glog.Infof("Ignoring delete for ingress %v based on annotation %v", delIng.Name, ingressClassKey)
return
}
glog.Infof("Delete notification received for Ingress %v/%v", delIng.Namespace, delIng.Name)
lbc.ingQueue.enqueue(obj)
},
UpdateFunc: func(old, cur interface{}) {
curIng := cur.(*extensions.Ingress)
if !isGCEIngress(curIng) {
return
}
if !reflect.DeepEqual(old, cur) {
glog.V(3).Infof("Ingress %v changed, syncing", curIng.Name)
}
lbc.ingQueue.enqueue(cur)
},
}
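// Note: UpdateFunc above enqueues cur even when old and cur are DeepEqual,
// so periodic relists re-run sync for every GCE Ingress and repair drift;
// the DeepEqual check only gates the log line.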
lbc.ingLister.Store, lbc.ingController = framework.NewInformer(
&cache.ListWatch{
ListFunc: ingressListFunc(lbc.client, namespace),
WatchFunc: ingressWatchFunc(lbc.client, namespace),
},
&extensions.Ingress{}, resyncPeriod, pathHandlers)
// Service watch handlers
svcHandlers := framework.ResourceEventHandlerFuncs{
AddFunc: lbc.enqueueIngressForService,
UpdateFunc: func(old, cur interface{}) {
if !reflect.DeepEqual(old, cur) {
lbc.enqueueIngressForService(cur)
}
},
// Ingress deletes matter, service deletes don't.
}
lbc.svcLister.Store, lbc.svcController = framework.NewInformer(
cache.NewListWatchFromClient(
lbc.client, "services", namespace, fields.Everything()),
&api.Service{}, resyncPeriod, svcHandlers)
lbc.podLister.Indexer, lbc.podController = framework.NewIndexerInformer(
cache.NewListWatchFromClient(lbc.client, "pods", namespace, fields.Everything()),
&api.Pod{},
resyncPeriod,
framework.ResourceEventHandlerFuncs{},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
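// The pod informer installs no event handlers; its indexer only backs
// namespace-scoped pod lookups (e.g. finding readiness probes when
// building health checks).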
nodeHandlers := framework.ResourceEventHandlerFuncs{
AddFunc: lbc.nodeQueue.enqueue,
DeleteFunc: lbc.nodeQueue.enqueue,
// Nodes are updated every 10s and we don't care, so no update handler.
}
// Node watch handlers
lbc.nodeLister.Store, lbc.nodeController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func(opts api.ListOptions) (runtime.Object, error) {
return lbc.client.Get().
Resource("nodes").
FieldsSelectorParam(fields.Everything()).
Do().
Get()
},
WatchFunc: func(options api.ListOptions) (watch.Interface, error) {
return lbc.client.Get().
Prefix("watch").
Resource("nodes").
FieldsSelectorParam(fields.Everything()).
Param("resourceVersion", options.ResourceVersion).Watch()
},
},
&api.Node{}, 0, nodeHandlers)
lbc.tr = &GCETranslator{&lbc}
lbc.tlsLoader = &apiServerTLSLoader{client: lbc.client}
glog.V(3).Infof("Created new loadbalancer controller")
return &lbc, nil
}
func ingressListFunc(c *client.Client, ns string) func(api.ListOptions) (runtime.Object, error) {
return func(opts api.ListOptions) (runtime.Object, error) {
return c.Extensions().Ingress(ns).List(opts)
}
}
func ingressWatchFunc(c *client.Client, ns string) func(options api.ListOptions) (watch.Interface, error) {
return func(options api.ListOptions) (watch.Interface, error) {
return c.Extensions().Ingress(ns).Watch(options)
}
}
// enqueueIngressForService enqueues all the Ingresses for a Service.
func (lbc *LoadBalancerController) enqueueIngressForService(obj interface{}) {
svc := obj.(*api.Service)
ings, err := lbc.ingLister.GetServiceIngress(svc)
if err != nil {
glog.V(5).Infof("ignoring service %v: %v", svc.Name, err)
return
}
for _, ing := range ings {
if !isGCEIngress(&ing) {
continue
}
lbc.ingQueue.enqueue(&ing)
}
}
// Run starts the loadbalancer controller.
func (lbc *LoadBalancerController) Run() {
glog.Infof("Starting loadbalancer controller")
go lbc.ingController.Run(lbc.stopCh)
go lbc.nodeController.Run(lbc.stopCh)
go lbc.svcController.Run(lbc.stopCh)
go lbc.podController.Run(lbc.stopCh)
go lbc.ingQueue.run(time.Second, lbc.stopCh)
go lbc.nodeQueue.run(time.Second, lbc.stopCh)
<-lbc.stopCh
glog.Infof("Shutting down Loadbalancer Controller")
}
// Stop stops the loadbalancer controller. It also deletes cluster resources
// if deleteAll is true.
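//
// A shutdown sketch (assumed pairing with Run, which blocks on stopCh):
//
//	go lbc.Run()
//	// ... later, e.g. from an HTTP exit handler:
//	if err := lbc.Stop(deleteAll); err != nil {
//		glog.Errorf("Error during shutdown: %v", err)
//	}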
func (lbc *LoadBalancerController) Stop(deleteAll bool) error {
// Stop is invoked from the http endpoint.
lbc.stopLock.Lock()
defer lbc.stopLock.Unlock()
// Only try draining the workqueue if we haven't already.
if !lbc.shutdown {
close(lbc.stopCh)
glog.Infof("Shutting down controller queues.")
lbc.ingQueue.shutdown()
lbc.nodeQueue.shutdown()
lbc.shutdown = true
}
// Deleting shared cluster resources is idempotent.
if deleteAll {
glog.Infof("Shutting down cluster manager.")
return lbc.CloudClusterManager.shutdown()
}
return nil
}
// storesSynced returns true if all the sub-controllers have finished their
// first sync with apiserver.
func (lbc *LoadBalancerController) storesSynced() bool {
return (
// wait for pods to sync so we don't allocate a default health check when
// an endpoint has a readiness probe.
lbc.podController.HasSynced() &&
// wait for services so we don't thrash on backend creation.
lbc.svcController.HasSynced() &&
// wait for nodes so we don't disconnect a backend from an instance
// group just because we don't realize there are nodes in that zone.
lbc.nodeController.HasSynced() &&
// Wait for ingresses as a safety measure. We don't really need this.
lbc.ingController.HasSynced())
}
// sync manages Ingress create/updates/deletes.
func (lbc *LoadBalancerController) sync(key string) (err error) {
if !lbc.hasSynced() {
time.Sleep(storeSyncPollPeriod)
return fmt.Errorf("Waiting for stores to sync")
}
glog.V(3).Infof("Syncing %v", key)
paths, err := lbc.ingLister.List()
if err != nil {
return err
}
nodePorts := lbc.tr.toNodePorts(&paths)
lbNames := lbc.ingLister.Store.ListKeys()
lbs, err := lbc.ListRuntimeInfo()
if err != nil {
return err
}
nodeNames, err := lbc.getReadyNodeNames()
if err != nil {
return err
}
obj, ingExists, err := lbc.ingLister.Store.GetByKey(key)
if err != nil {
return err
}
// This performs a 2 phase checkpoint with the cloud:
// * Phase 1 creates/verifies resources are as expected. At the end of a
// successful checkpoint we know that existing L7s are working as intended, and the L7
// for the Ingress associated with "key" is ready for a UrlMap update.
// If this encounters an error, eg for quota reasons, we want to invoke
// Phase 2 right away and retry checkpointing.
// * Phase 2 performs GC by refcounting shared resources. This needs to
// happen periodically whether or not stage 1 fails. At the end of a
// successful GC we know that there are no dangling cloud resources that
// don't have an associated Kubernetes Ingress/Service/Endpoint.
defer func() {
if deferErr := lbc.CloudClusterManager.GC(lbNames, nodePorts); deferErr != nil {
err = fmt.Errorf("Error during sync %v, error during GC %v", err, deferErr)
}
glog.V(3).Infof("Finished syncing %v", key)
}()
// Record any errors during sync and throw a single error at the end. This
// allows us to free up associated cloud resources ASAP.
var syncError error
if err := lbc.CloudClusterManager.Checkpoint(lbs, nodeNames, nodePorts); err != nil {
// TODO: Implement proper backoff for the queue.
eventMsg := "GCE"
if utils.IsHTTPErrorCode(err, http.StatusForbidden) {
eventMsg += " :Quota"
}
if ingExists {
lbc.recorder.Eventf(obj.(*extensions.Ingress), api.EventTypeWarning, eventMsg, err.Error())
} else {
err = fmt.Errorf("%v Error: %v", eventMsg, err)
}
syncError = err
}
if !ingExists {
return syncError
}
// Update the UrlMap of the single loadbalancer that came through the watch.
l7, err := lbc.CloudClusterManager.l7Pool.Get(key)
if err != nil {
return fmt.Errorf("%v, unable to get loadbalancer: %v", syncError, err)
}
ing := *obj.(*extensions.Ingress)
if urlMap, err := lbc.tr.toUrlMap(&ing); err != nil {
syncError = fmt.Errorf("%v, convert to url map error %v", syncError, err)
} else if err := l7.UpdateUrlMap(urlMap); err != nil {
lbc.recorder.Eventf(&ing, api.EventTypeWarning, "UrlMap", err.Error())
syncError = fmt.Errorf("%v, update url map error: %v", syncError, err)
} else if err := lbc.updateIngressStatus(l7, ing); err != nil {
lbc.recorder.Eventf(&ing, api.EventTypeWarning, "Status", err.Error())
syncError = fmt.Errorf("%v, update ingress error: %v", syncError, err)
}
return syncError
}
// updateIngressStatus updates the IP and annotations of a loadbalancer.
// The annotations are parsed by kubectl describe.
func (lbc *LoadBalancerController) updateIngressStatus(l7 *loadbalancers.L7, ing extensions.Ingress) error {
ingClient := lbc.client.Extensions().Ingress(ing.Namespace)
// Update IP through update/status endpoint
ip := l7.GetIP()
currIng, err := ingClient.Get(ing.Name)
if err != nil {
return err
}
currIng.Status = extensions.IngressStatus{
LoadBalancer: api.LoadBalancerStatus{
Ingress: []api.LoadBalancerIngress{
{IP: ip},
},
},
}
if ip != "" {
lbIPs := ing.Status.LoadBalancer.Ingress
if len(lbIPs) == 0 || lbIPs[0].IP != ip {
// TODO: If this update fails it's probably resource version related,
// which means it's advantageous to retry right away vs requeuing.
glog.Infof("Updating loadbalancer %v/%v with IP %v", ing.Namespace, ing.Name, ip)
if _, err := ingClient.UpdateStatus(currIng); err != nil {
return err
}
lbc.recorder.Eventf(currIng, api.EventTypeNormal, "CREATE", "ip: %v", ip)
}
}
// Update annotations through /update endpoint
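// Re-fetch rather than reuse currIng: presumably this picks up a fresh
// resourceVersion so this Update doesn't conflict with the UpdateStatus
// write above.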
currIng, err = ingClient.Get(ing.Name)
if err != nil {
return err
}
currIng.Annotations = loadbalancers.GetLBAnnotations(l7, currIng.Annotations, lbc.CloudClusterManager.backendPool)
if !reflect.DeepEqual(ing.Annotations, currIng.Annotations) {
glog.V(3).Infof("Updating annotations of %v/%v", ing.Namespace, ing.Name)
if _, err := ingClient.Update(currIng); err != nil {
return err
}
}
return nil
}
// ListRuntimeInfo lists L7RuntimeInfo as understood by the loadbalancer module.
func (lbc *LoadBalancerController) ListRuntimeInfo() (lbs []*loadbalancers.L7RuntimeInfo, err error) {
ingList, err := lbc.ingLister.List()
if err != nil {
return lbs, err
}
for _, ing := range ingList.Items {
k, err := keyFunc(&ing)
if err != nil {
glog.Warningf("Cannot get key for Ingress %v/%v: %v", ing.Namespace, ing.Name, err)
continue
}
tls, err := lbc.tlsLoader.load(&ing)
if err != nil {
glog.Warningf("Cannot get certs for Ingress %v/%v: %v", ing.Namespace, ing.Name, err)
}
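// Note: a cert load failure above only warns; the Ingress is still
// appended below with nil TLS, so its loadbalancer is configured
// without the certificate.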
annotations := ingAnnotations(ing.ObjectMeta.Annotations)
lbs = append(lbs, &loadbalancers.L7RuntimeInfo{
Name: k,
TLS: tls,
AllowHTTP: annotations.allowHTTP(),
StaticIPName: annotations.staticIPName(),
})
}
return lbs, nil
}
// syncNodes manages the syncing of Kubernetes nodes to GCE instance groups.
// The instance groups are referenced by loadbalancer backends.
func (lbc *LoadBalancerController) syncNodes(key string) error {
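// The key is ignored: node sync is global, so any node add or delete
// re-syncs the full ready-node list into the instance group.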
nodeNames, err := lbc.getReadyNodeNames()
if err != nil {
return err
}
if err := lbc.CloudClusterManager.instancePool.Sync(nodeNames); err != nil {
return err
}
return nil
}
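// getNodeReadyPredicate returns a predicate matching nodes whose NodeReady
// condition is ConditionTrue; a node with no NodeReady condition at all is
// treated as not ready.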
func getNodeReadyPredicate() cache.NodeConditionPredicate {
return func(node *api.Node) bool {
for ix := range node.Status.Conditions {
condition := &node.Status.Conditions[ix]
if condition.Type == api.NodeReady {
return condition.Status == api.ConditionTrue
}
}
return false
}
}
// getReadyNodeNames returns names of schedulable, ready nodes from the node lister.
func (lbc *LoadBalancerController) getReadyNodeNames() ([]string, error) {
nodeNames := []string{}
nodes, err := lbc.nodeLister.NodeCondition(getNodeReadyPredicate()).List()
if err != nil {
return nodeNames, err
}
for _, n := range nodes {
if n.Spec.Unschedulable {
continue
}
nodeNames = append(nodeNames, n.Name)
}
return nodeNames, nil
}