Wait until the discover controller cache is synced #738
```diff
@@ -68,15 +68,31 @@ type Controller struct {
 	controller *controller
 	kubeClient kubernetes.Interface
 	myNodeName string
-	synced     bool
 }

 // NewController creates a new DiscoveryController
```
Review comment: Please update the docstring comment here to indicate that the function constructs the caches (and connects to the k8s API server to do so) before returning a fully-constructed controller object.
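A possible wording for the updated docstring, offered as a sketch only (not the author's final text):

```go
// NewController creates a new DiscoveryController. It connects to the
// Kubernetes API server, builds the pod informer and its caches, and blocks
// until those caches have synced, so the returned Controller is fully
// constructed and immediately usable.
```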
```diff
-func NewController(clientset kubernetes.Interface) *Controller {
-	return &Controller{kubeClient: clientset,
+func NewController(clientset kubernetes.Interface) (*Controller, error) {
+	stopCh := wait.NeverStop
+
+	d := &Controller{kubeClient: clientset,
 		myNodeName: os.Getenv("MY_NODE_NAME"),
 		cniPods:    make(map[string]string),
 		workerPods: make(map[string]*K8SPodInfo)}
+
+	log.Info("Starting Pod informer")
+
+	go d.controller.informer.Run(stopCh)
```
Review comment:

amazon-vpc-cni-k8s/pkg/k8sapi/discovery.go, line 155 in 7c6dec7

`d.controller` is not constructed until the line referenced above, so it is still nil when this informer is started. What you need to do is basically get rid of that deferred setup. The only reason the current arrangement keeps the controller alive is the `select {}`, but that's really kinda hacky IMHO. A cleaner solution would be to simply move all this setup code into `NewController()`:

```go
// create the pod watcher
podListWatcher := cache.NewListWatchFromClient(d.kubeClient.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector("spec.nodeName", d.myNodeName))

// create the workqueue
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

// Bind the workqueue to a cache with the help of an informer. This way we make sure that
// whenever the cache is updated, the pod key is added to the workqueue.
// Note that when we finally process the item from the workqueue, we might see a newer version
// of the Pod than the version which was responsible for triggering the update.
indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
	AddFunc: func(obj interface{}) {
		key, err := cache.MetaNamespaceKeyFunc(obj)
		if err == nil {
			queue.Add(key)
		}
	},
	UpdateFunc: func(old interface{}, new interface{}) {
		key, err := cache.MetaNamespaceKeyFunc(new)
		if err == nil {
			queue.Add(key)
		}
	},
	DeleteFunc: func(obj interface{}) {
		// IndexerInformer uses a delta queue, therefore for deletes we have to use this
		// key function.
		key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
		if err == nil {
			queue.Add(key)
		}
	},
}, cache.Indexers{})
d.controller = newController(queue, indexer, informer)
```

and move this code:

```go
// Now let's start the controller
stop := make(chan struct{})
defer close(stop)
go d.run(1, stop)

// Wait forever
select {}
```

to a separate function.
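Assembling what the reviewer describes, the moved code might end up as a method like the following. This is a sketch that assumes the Controller type and run method from this file; the name Start is an assumption, not something named in the review:

```go
// Start runs the discovery workers that used to be started from
// DiscoverK8SPods, and blocks forever just as the current select {} does.
func (d *Controller) Start() {
	stop := make(chan struct{})
	defer close(stop)
	go d.run(1, stop)

	// Wait forever
	select {}
}
```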
log.Info("Waiting for controller cache sync") | ||||
// Wait for all involved caches to be synced, before processing items from the queue is started | ||||
if !cache.WaitForCacheSync(stopCh, d.controller.informer.HasSynced) { | ||||
Review comment: On a future PR (not this one), we should use the cache-sync helper that does this logging itself, which would basically remove the need for the log lines here.
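The helper alluded to is likely `cache.WaitForNamedCacheSync` from client-go (an assumption here, since the comment does not name it), which logs the "waiting" and "synced" messages on its own. A minimal sketch, assuming the vendored client-go version provides it and using an illustrative controller name:

```go
// WaitForNamedCacheSync logs "Waiting for caches to sync for <name>" and
// "Caches are synced for <name>" itself, so the explicit log.Info/log.Error
// calls in this hunk would no longer be needed.
if !cache.WaitForNamedCacheSync("pod-discovery", stopCh, d.controller.informer.HasSynced) {
	return nil, fmt.Errorf("timed out waiting for caches to sync")
}
```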
log.Error("Timed out waiting for caches to sync!") | ||||
return nil, fmt.Errorf("timed out waiting for caches to sync") | ||||
} | ||||
|
||||
log.Info("Synced successfully with APIServer") | ||||
|
||||
return d, nil | ||||
} | ||||
|
||||
// CreateKubeClient creates a k8s client | ||||
|
```diff
@@ -167,11 +183,6 @@ func (d *Controller) DiscoverK8SPods() {
 func (d *Controller) K8SGetLocalPodIPs() ([]*K8SPodInfo, error) {
 	var localPods []*K8SPodInfo

-	if !d.synced {
-		log.Info("GetLocalPods: informer not synced yet")
-		return nil, ErrInformerNotSynced
-	}
-
 	log.Debug("GetLocalPods start ...")
 	d.workerPodsLock.Lock()
 	defer d.workerPodsLock.Unlock()
```
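Because NewController now blocks until the cache has synced, callers of K8SGetLocalPodIPs no longer need to handle the informer-not-synced case. A hedged caller-side sketch, assuming the package's Controller type; the function name is illustrative and not part of this PR:

```go
func countLocalPods(d *Controller) (int, error) {
	// With the synced guard removed, an error from K8SGetLocalPodIPs indicates
	// a real failure rather than a transient "informer not synced yet"
	// condition, so there is no need to retry on ErrInformerNotSynced.
	pods, err := d.K8SGetLocalPodIPs()
	if err != nil {
		return 0, err
	}
	return len(pods), nil
}
```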
```diff
@@ -305,20 +316,6 @@ func (c *controller) handleErr(err error, key interface{}) {
 func (d *Controller) run(threadiness int, stopCh chan struct{}) {
 	// Let the workers stop when we are done
 	defer d.controller.queue.ShutDown()
-	log.Info("Starting Pod controller")
-
-	go d.controller.informer.Run(stopCh)
-
-	log.Info("Waiting for controller cache sync")
-	// Wait for all involved caches to be synced, before processing items from the queue is started
-	if !cache.WaitForCacheSync(stopCh, d.controller.informer.HasSynced) {
-		log.Error("Timed out waiting for caches to sync!")
-		runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
-		return
-	}
-
-	log.Info("Synced successfully with APIServer")
-	d.synced = true

 	for i := 0; i < threadiness; i++ {
 		go wait.Until(d.runWorker, time.Second, stopCh)
```
Review comment: This needs to go inside the `NewController()` function and be synchronously executed (see below).
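For context, a sketch of what a call site looks like once construction and cache syncing happen synchronously inside `NewController`; the surrounding function, variable names, and error wrapping are illustrative assumptions, not code from this PR:

```go
import (
	"fmt"

	"github.com/aws/amazon-vpc-cni-k8s/pkg/k8sapi"
	"k8s.io/client-go/kubernetes"
)

func setupPodDiscovery(clientSet kubernetes.Interface) (*k8sapi.Controller, error) {
	// NewController now connects to the API server and blocks until the pod
	// cache has synced, so a non-nil return value is immediately usable and a
	// non-nil error means the caller should fail fast.
	d, err := k8sapi.NewController(clientSet)
	if err != nil {
		return nil, fmt.Errorf("failed to create pod discovery controller: %v", err)
	}
	return d, nil
}
```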