diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index 3931a2ba9..297893d0e 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -98,8 +98,6 @@ type Daemon struct { disableDrain bool - nodeLister listerv1.NodeLister - workqueue workqueue.RateLimitingInterface mcpName string @@ -243,14 +241,17 @@ func (dn *Daemon) Run(stopCh <-chan struct{}, exitCh <-chan error) error { }) rand.Seed(time.Now().UnixNano()) - nodeInformerFactory := informers.NewSharedInformerFactory(dn.kubeClient, + nodeInformerFactory := informers.NewSharedInformerFactoryWithOptions(dn.kubeClient, time.Second*15, + informers.WithTweakListOptions(func(lo *metav1.ListOptions) { + lo.FieldSelector = metadataKey + "=" + vars.NodeName + }), ) - dn.nodeLister = nodeInformerFactory.Core().V1().Nodes().Lister() nodeInformer := nodeInformerFactory.Core().V1().Nodes().Informer() nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: dn.nodeAddHandler, - UpdateFunc: dn.nodeUpdateHandler, + AddFunc: dn.currentNodeAddHandler, + UpdateFunc: dn.currentNodeUpdateHandler, + DeleteFunc: dn.currentNodeDeleteHandler, }) go cfgInformer.Run(dn.stopCh) go nodeInformer.Run(dn.stopCh) @@ -353,19 +354,49 @@ func (dn *Daemon) processNextWorkItem() bool { return true } -func (dn *Daemon) nodeAddHandler(obj interface{}) { - dn.nodeUpdateHandler(nil, obj) +func (dn *Daemon) currentNodeAddHandler(obj interface{}) { + dn.currentNodeUpdateHandler(nil, obj) } -func (dn *Daemon) nodeUpdateHandler(old, new interface{}) { - node, err := dn.nodeLister.Get(vars.NodeName) - if errors.IsNotFound(err) { - log.Log.V(2).Info("nodeUpdateHandler(): node has been deleted", "name", vars.NodeName) - return +func (dn *Daemon) currentNodeUpdateHandler(old, new interface{}) { + dn.node = new.(*corev1.Node).DeepCopy() +} + +func (dn *Daemon) currentNodeDeleteHandler(obj interface{}) { + log.Log.V(2).Info("nodeUpdateHandler(): node has been deleted", "name", vars.NodeName) +} + +func (dn *Daemon) startAllNodesInformer(stopWatchCh chan struct{}) error { + go func() { + for { + select { + case <-dn.stopCh: + close(stopWatchCh) + return + case <-stopWatchCh: + return + } + } + }() + nodeInformerFactory := informers.NewSharedInformerFactory(dn.kubeClient, + time.Minute*5, + ) + nodeLister := nodeInformerFactory.Core().V1().Nodes().Lister() + nodeInformer := nodeInformerFactory.Core().V1().Nodes().Informer() + nodeInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(_ interface{}) { dn.nodeUpdateHandler(nodeLister) }, + UpdateFunc: func(_, _ interface{}) { dn.nodeUpdateHandler(nodeLister) }, + }) + go nodeInformer.Run(stopWatchCh) + if ok := cache.WaitForCacheSync(stopWatchCh, nodeInformer.HasSynced); !ok { + return fmt.Errorf("failed to wait for node caches to sync") } - dn.node = node.DeepCopy() + dn.nodeUpdateHandler(nodeLister) + return nil +} - nodes, err := dn.nodeLister.List(labels.Everything()) +func (dn *Daemon) nodeUpdateHandler(nodeLister listerv1.NodeLister) { + nodes, err := nodeLister.List(labels.Everything()) if err != nil { log.Log.Error(err, "nodeUpdateHandler(): failed to list nodes") return @@ -903,6 +934,15 @@ func (dn *Daemon) getDrainLock(ctx context.Context, done chan bool) { Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(ctx context.Context) { log.Log.V(2).Info("getDrainLock(): started leading") + stopAllNodesWatchChan := make(chan struct{}) + err = dn.startAllNodesInformer(stopAllNodesWatchChan) + if err != nil { + log.Log.Error(err, "getDrainLock(): Failed to start node informer") + // The context was canceled, stopChannel closed. There is no need to block here + done <- true + return + } + log.Log.V(2).Info("getDrainLock(): started node informer") for { time.Sleep(3 * time.Second) if dn.node.Annotations[annoKey] == annoMcpPaused { @@ -911,6 +951,8 @@ func (dn *Daemon) getDrainLock(ctx context.Context, done chan bool) { return } if dn.drainable { + // Stop node informer + close(stopAllNodesWatchChan) log.Log.V(2).Info("getDrainLock(): no other node is draining") err = dn.annotateNode(vars.NodeName, annoDraining) if err != nil {