Wait until the discover controller cache is synced #738

Closed
wants to merge 1 commit into from
8 changes: 6 additions & 2 deletions main.go
@@ -50,7 +50,12 @@ func _main() int {
return 1
}

discoverController := k8sapi.NewController(kubeClient)
discoverController, err := k8sapi.NewController(kubeClient)
if err != nil {
log.Errorf("Failed to create new Discover Controller: %v", err)
return 1
}

go discoverController.DiscoverK8SPods()
Contributor

This needs to go inside the NewController() function and be synchronously executed (see below)


eniConfigController := eniconfig.NewENIConfigController()
@@ -59,7 +64,6 @@ func _main() int {
}

ipamContext, err := ipamd.New(discoverController, eniConfigController)

if err != nil {
log.Errorf("Initialization failure: %v", err)
return 1
41 changes: 19 additions & 22 deletions pkg/k8sapi/discovery.go
@@ -68,15 +68,31 @@ type Controller struct {
controller *controller
kubeClient kubernetes.Interface
myNodeName string
synced bool
}

// NewController creates a new DiscoveryController
Contributor

Please update the docstring comment here to indicate that the function constructs the caches (and connects to the k8s API server to do so) before returning a fully-constructed controller object.
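
A possible wording for that docstring, reflecting the behavior described above (an illustrative suggestion, not part of this commit):

	// NewController creates a new DiscoveryController. It constructs the pod
	// informer and its caches, connects to the Kubernetes API server, and waits
	// for the cache to sync, so the returned Controller is fully constructed
	// and ready to serve K8SGetLocalPodIPs() calls.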

func NewController(clientset kubernetes.Interface) *Controller {
return &Controller{kubeClient: clientset,
func NewController(clientset kubernetes.Interface) (*Controller, error) {
stopCh := wait.NeverStop

d := &Controller{kubeClient: clientset,
myNodeName: os.Getenv("MY_NODE_NAME"),
cniPods: make(map[string]string),
workerPods: make(map[string]*K8SPodInfo)}

log.Info("Starting Pod informer")

go d.controller.informer.Run(stopCh)
Contributor

d.controller will be nil here, since d.controller is created in the DiscoverK8SPods() method:

d.controller = newController(queue, indexer, informer)

What you need to do is basically get rid of the DiscoverK8SPods() method entirely, and move the code that is in there into the NewController() function.

The only reason the DiscoverK8SPods() method apparently exists as a separate struct method is to "start the controller" by doing:

select {}

but that's really kinda hacky IMHO. A cleaner solution would be to simply move all this setup code into NewController():

	// create the pod watcher
	podListWatcher := cache.NewListWatchFromClient(d.kubeClient.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector("spec.nodeName", d.myNodeName))

	// create the workqueue
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

	// Bind the workqueue to a cache with the help of an informer. This way we make sure that
	// whenever the cache is updated, the pod key is added to the workqueue.
	// Note that when we finally process the item from the workqueue, we might see a newer version
	// of the Pod than the version which was responsible for triggering the update.
	indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
		UpdateFunc: func(old interface{}, new interface{}) {
			key, err := cache.MetaNamespaceKeyFunc(new)
			if err == nil {
				queue.Add(key)
			}
		},
		DeleteFunc: func(obj interface{}) {
			// IndexerInformer uses a delta queue, therefore for deletes we have to use this
			// key function.
			key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
			if err == nil {
				queue.Add(key)
			}
		},
	}, cache.Indexers{})

	d.controller = newController(queue, indexer, informer)

and move this code:

	// Now let's start the controller
	stop := make(chan struct{})
	defer close(stop)
	go d.run(1, stop)

	// Wait forever
	select {}

to a separate Controller.Listen() method that you can call after returning successfully from NewController()
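
A minimal sketch of what that Listen() method could look like, assuming run() keeps its current signature (the method name and the split follow the suggestion above; none of this is in the PR as submitted):

	// Listen starts the controller workers and then blocks forever.
	func (d *Controller) Listen() {
		stop := make(chan struct{})
		defer close(stop)
		go d.run(1, stop)

		// Wait forever
		select {}
	}

main.go would then call go discoverController.Listen() after NewController() returns successfully, in place of go discoverController.DiscoverK8SPods().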


log.Info("Waiting for controller cache sync")
// Wait for all involved caches to be synced, before processing items from the queue is started
if !cache.WaitForCacheSync(stopCh, d.controller.informer.HasSynced) {
Contributor

In a future PR (not this one), we should use the WaitForNamedCacheSync method that @mogren alluded to in an earlier comment:

https://github.com/kubernetes/client-go/blob/bc9b51d240b2f5000adc2fa96231ab219ab0d702/tools/cache/shared_informer.go#L196

That would basically remove the need for the log lines here.
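
For reference, that future change might look roughly like this; WaitForNamedCacheSync logs its own progress messages, so the surrounding log lines could be dropped (the "pod-discovery" name is only an illustration):

	if !cache.WaitForNamedCacheSync("pod-discovery", stopCh, d.controller.informer.HasSynced) {
		return nil, fmt.Errorf("timed out waiting for caches to sync")
	}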

log.Error("Timed out waiting for caches to sync!")
return nil, fmt.Errorf("timed out waiting for caches to sync")
}

log.Info("Synced successfully with APIServer")

return d, nil
}

// CreateKubeClient creates a k8s client
@@ -167,11 +183,6 @@ func (d *Controller) DiscoverK8SPods() {
func (d *Controller) K8SGetLocalPodIPs() ([]*K8SPodInfo, error) {
var localPods []*K8SPodInfo

if !d.synced {
log.Info("GetLocalPods: informer not synced yet")
return nil, ErrInformerNotSynced
}

log.Debug("GetLocalPods start ...")
d.workerPodsLock.Lock()
defer d.workerPodsLock.Unlock()
@@ -305,20 +316,6 @@ func (c *controller) handleErr(err error, key interface{}) {
func (d *Controller) run(threadiness int, stopCh chan struct{}) {
// Let the workers stop when we are done
defer d.controller.queue.ShutDown()
log.Info("Starting Pod controller")

go d.controller.informer.Run(stopCh)

log.Info("Waiting for controller cache sync")
// Wait for all involved caches to be synced, before processing items from the queue is started
if !cache.WaitForCacheSync(stopCh, d.controller.informer.HasSynced) {
log.Error("Timed out waiting for caches to sync!")
runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
return
}

log.Info("Synced successfully with APIServer")
d.synced = true

for i := 0; i < threadiness; i++ {
go wait.Until(d.runWorker, time.Second, stopCh)