diff --git a/pkg/controller/manage/manage_controller.go b/pkg/controller/manage/manage_controller.go index fd1d9c87f..56cc9a81f 100644 --- a/pkg/controller/manage/manage_controller.go +++ b/pkg/controller/manage/manage_controller.go @@ -65,6 +65,9 @@ type KmeshManageController struct { namespaceLister v1.NamespaceLister queue workqueue.RateLimitingInterface client kubernetes.Interface + sm *kmeshsecurity.SecretManager + xdpProgFd int + mode string } func isPodReady(pod *corev1.Pod) bool { @@ -76,7 +79,7 @@ func isPodReady(pod *corev1.Pod) bool { return false } -func NewKmeshManageController(client kubernetes.Interface, security *kmeshsecurity.SecretManager, xdpProgFd int, mode string) (*KmeshManageController, error) { +func NewKmeshManageController(client kubernetes.Interface, sm *kmeshsecurity.SecretManager, xdpProgFd int, mode string) (*KmeshManageController, error) { informerFactory := kube.NewInformerFactory(client) podInformer := informerFactory.Core().V1().Pods().Informer() podLister := informerFactory.Core().V1().Pods().Lister() @@ -87,15 +90,29 @@ func NewKmeshManageController(client kubernetes.Interface, security *kmeshsecuri queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) + kmc := &KmeshManageController{ + informerFactory: informerFactory, + podInformer: podInformer, + podLister: podLister, + factory: factory, + namespaceInformer: namespaceInformer, + namespaceLister: namespaceLister, + queue: queue, + client: client, + sm: sm, + xdpProgFd: xdpProgFd, + mode: mode, + } + if _, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - handlePodAddFunc(obj, namespaceLister, queue, security, xdpProgFd, mode) + kmc.handlePodAddFunc(obj) }, UpdateFunc: func(oldObj, newObj interface{}) { - handlePodUpdateFunc(newObj, namespaceLister, queue, security, xdpProgFd, mode) + kmc.handlePodUpdateFunc(oldObj, newObj) }, DeleteFunc: func(obj interface{}) { - handlePodDeleteFunc(obj, security) + kmc.handlePodDeleteFunc(obj) }, }); err != nil { return nil, fmt.Errorf("failed to add event handler to podInformer: %v", err) @@ -103,71 +120,62 @@ func NewKmeshManageController(client kubernetes.Interface, security *kmeshsecuri if _, err := namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ UpdateFunc: func(oldObj, newObj interface{}) { - handleNamespaceUpdateFunc(oldObj, newObj, podLister, queue, security, xdpProgFd, mode) + kmc.handleNamespaceUpdateFunc(oldObj, newObj) }, }); err != nil { return nil, fmt.Errorf("failed to add event handler to namespaceInformer: %v", err) } - return &KmeshManageController{ - informerFactory: informerFactory, - podInformer: podInformer, - podLister: podLister, - factory: factory, - namespaceInformer: namespaceInformer, - namespaceLister: namespaceLister, - queue: queue, - client: client, - }, nil + return kmc, nil } -func handlePodAddFunc(obj interface{}, namespaceLister v1.NamespaceLister, queue workqueue.RateLimitingInterface, security *kmeshsecurity.SecretManager, xdpProgFd int, mode string) { +func (kmc *KmeshManageController) handlePodAddFunc(obj interface{}) { pod, ok := obj.(*corev1.Pod) if !ok { log.Errorf("expected *corev1.Pod but got %T", obj) return } - namespace, err := namespaceLister.Get(pod.Namespace) + namespace, err := kmc.namespaceLister.Get(pod.Namespace) if err != nil { log.Errorf("failed to get pod namespace %s: %v", pod.Namespace, err) return } if !utils.ShouldEnroll(pod, namespace) { - if pod.Annotations[constants.KmeshRedirectionAnnotation] == "enabled" { - disableKmeshManage(pod, queue, security, mode) + if utils.AnnotationEnabled(pod.Annotations[constants.KmeshRedirectionAnnotation]) { + kmc.disableKmeshManage(pod) } return } - enableKmeshManage(pod, queue, security, xdpProgFd, mode) + kmc.enableKmeshManage(pod) } -func handlePodUpdateFunc(newObj interface{}, namespaceLister v1.NamespaceLister, queue workqueue.RateLimitingInterface, security *kmeshsecurity.SecretManager, xdpProgFd int, mode string) { +func (kmc *KmeshManageController) handlePodUpdateFunc(_, newObj interface{}) { newPod, okNew := newObj.(*corev1.Pod) if !okNew { log.Errorf("expected *corev1.Pod but got %T", newObj) return } - namespace, err := namespaceLister.Get(newPod.Namespace) + namespace, err := kmc.namespaceLister.Get(newPod.Namespace) if err != nil { log.Errorf("failed to get pod namespace %s: %v", newPod.Namespace, err) return } // enable kmesh manage - if newPod.Annotations[constants.KmeshRedirectionAnnotation] != "enabled" && utils.ShouldEnroll(newPod, namespace) { - enableKmeshManage(newPod, queue, security, xdpProgFd, mode) + if !utils.AnnotationEnabled(newPod.Annotations[constants.KmeshRedirectionAnnotation]) && utils.ShouldEnroll(newPod, namespace) { + kmc.enableKmeshManage(newPod) } // disable kmesh manage - if newPod.Annotations[constants.KmeshRedirectionAnnotation] == "enabled" && !utils.ShouldEnroll(newPod, namespace) { - disableKmeshManage(newPod, queue, security, mode) + if utils.AnnotationEnabled(newPod.Annotations[constants.KmeshRedirectionAnnotation]) && !utils.ShouldEnroll(newPod, namespace) { + kmc.disableKmeshManage(newPod) } } -func handlePodDeleteFunc(obj interface{}, security *kmeshsecurity.SecretManager) { +func (kmc *KmeshManageController) handlePodDeleteFunc(obj interface{}) { pod, ok := obj.(*corev1.Pod) if !ok { tombstone, ok := obj.(cache.DeletedFinalStateUnknown) @@ -182,15 +190,15 @@ func handlePodDeleteFunc(obj interface{}, security *kmeshsecurity.SecretManager) } } - if pod.Annotations[constants.KmeshRedirectionAnnotation] == "enabled" { + if utils.AnnotationEnabled(pod.Annotations[constants.KmeshRedirectionAnnotation]) { log.Infof("%s/%s: Pod managed by Kmesh is deleted", pod.GetNamespace(), pod.GetName()) - sendCertRequest(security, pod, kmeshsecurity.DELETE) + sendCertRequest(kmc.sm, pod, kmeshsecurity.DELETE) // We donot need to do handleKmeshManage for delete, because we may have no change to execute a cmd in pod net ns. // And we have done this in kmesh-cni } } -func handleNamespaceUpdateFunc(oldObj, newObj interface{}, podLister v1.PodLister, queue workqueue.RateLimitingInterface, security *kmeshsecurity.SecretManager, xdpProgFd int, mode string) { +func (kmc *KmeshManageController) handleNamespaceUpdateFunc(oldObj, newObj interface{}) { oldNS, okOld := oldObj.(*corev1.Namespace) newNS, okNew := newObj.(*corev1.Namespace) if !okOld || !okNew { @@ -201,17 +209,17 @@ func handleNamespaceUpdateFunc(oldObj, newObj interface{}, podLister v1.PodListe // Compare labels to check if they have actually changed if !utils.ShouldEnroll(nil, oldNS) && utils.ShouldEnroll(nil, newNS) { log.Infof("Enabling Kmesh for all pods in namespace: %s", newNS.Name) - enableKmeshForPodsInNamespace(newNS.Name, podLister, queue, security, xdpProgFd, mode) + kmc.enableKmeshForPodsInNamespace(newNS.Name) } if utils.ShouldEnroll(nil, oldNS) && !utils.ShouldEnroll(nil, newNS) { log.Infof("Disabling Kmesh for all pods in namespace: %s", newNS.Name) - disableKmeshForPodsInNamespace(newNS.Name, podLister, queue, security, mode) + kmc.disableKmeshForPodsInNamespace(newNS.Name) } } -func enableKmeshManage(pod *corev1.Pod, queue workqueue.RateLimitingInterface, security *kmeshsecurity.SecretManager, xdpProgFd int, mode string) { - sendCertRequest(security, pod, kmeshsecurity.ADD) +func (kmc *KmeshManageController) enableKmeshManage(pod *corev1.Pod) { + sendCertRequest(kmc.sm, pod, kmeshsecurity.ADD) if !isPodReady(pod) { log.Debugf("Pod %s/%s is not ready, skipping Kmesh manage enable", pod.GetNamespace(), pod.GetName()) return @@ -222,12 +230,12 @@ func enableKmeshManage(pod *corev1.Pod, queue workqueue.RateLimitingInterface, s log.Errorf("failed to enable Kmesh manage") return } - queue.AddRateLimited(QueueItem{podName: pod.Name, podNs: pod.Namespace, action: ActionAddAnnotation}) - _ = linkXdp(nspath, xdpProgFd, mode) + kmc.queue.AddRateLimited(QueueItem{podName: pod.Name, podNs: pod.Namespace, action: ActionAddAnnotation}) + _ = linkXdp(nspath, kmc.xdpProgFd, kmc.mode) } -func disableKmeshManage(pod *corev1.Pod, queue workqueue.RateLimitingInterface, security *kmeshsecurity.SecretManager, mode string) { - sendCertRequest(security, pod, kmeshsecurity.DELETE) +func (kmc *KmeshManageController) disableKmeshManage(pod *corev1.Pod) { + sendCertRequest(kmc.sm, pod, kmeshsecurity.DELETE) if !isPodReady(pod) { log.Debugf("%s/%s is not ready, skipping Kmesh manage disable", pod.GetNamespace(), pod.GetName()) return @@ -238,24 +246,24 @@ func disableKmeshManage(pod *corev1.Pod, queue workqueue.RateLimitingInterface, log.Error("failed to disable Kmesh manage") return } - queue.AddRateLimited(QueueItem{podName: pod.Name, podNs: pod.Namespace, action: ActionDeleteAnnotation}) - _ = unlinkXdp(nspath, mode) + kmc.queue.AddRateLimited(QueueItem{podName: pod.Name, podNs: pod.Namespace, action: ActionDeleteAnnotation}) + _ = unlinkXdp(nspath, kmc.mode) } -func enableKmeshForPodsInNamespace(namespace string, podLister v1.PodLister, queue workqueue.RateLimitingInterface, security *kmeshsecurity.SecretManager, xdpProgFd int, mode string) { - pods, err := podLister.Pods(namespace).List(labels.Everything()) +func (kmc *KmeshManageController) enableKmeshForPodsInNamespace(namespace string) { + pods, err := kmc.podLister.Pods(namespace).List(labels.Everything()) if err != nil { log.Errorf("Error listing pods: %v", err) return } for _, pod := range pods { - enableKmeshManage(pod, queue, security, xdpProgFd, mode) + kmc.enableKmeshManage(pod) } } -func disableKmeshForPodsInNamespace(namespace string, podLister v1.PodLister, queue workqueue.RateLimitingInterface, security *kmeshsecurity.SecretManager, mode string) { - pods, err := podLister.Pods(namespace).List(labels.Everything()) +func (kmc *KmeshManageController) disableKmeshForPodsInNamespace(namespace string) { + pods, err := kmc.podLister.Pods(namespace).List(labels.Everything()) if err != nil { log.Errorf("Error listing pods in namespace %s: %v", namespace, err) return @@ -263,7 +271,7 @@ func disableKmeshForPodsInNamespace(namespace string, podLister v1.PodLister, qu for _, pod := range pods { if !utils.ShouldEnroll(pod, nil) { - disableKmeshManage(pod, queue, security, mode) + kmc.disableKmeshManage(pod) } } } diff --git a/pkg/utils/enroll.go b/pkg/utils/enroll.go index 239a90c0f..ecb87cafe 100644 --- a/pkg/utils/enroll.go +++ b/pkg/utils/enroll.go @@ -106,7 +106,7 @@ var ( ) func PatchKmeshRedirectAnnotation(client kubernetes.Interface, pod *corev1.Pod) error { - if pod.Annotations[constants.KmeshRedirectionAnnotation] == "enabled" { + if AnnotationEnabled(pod.Annotations[constants.KmeshRedirectionAnnotation]) { log.Debugf("Pod %s in namespace %s already has annotation %s", pod.Name, pod.Namespace, constants.KmeshRedirectionAnnotation) return nil } @@ -134,3 +134,7 @@ func DelKmeshRedirectAnnotation(client kubernetes.Interface, pod *corev1.Pod) er ) return err } + +func AnnotationEnabled(annotation string) bool { + return annotation == "enabled" +} diff --git a/pkg/utils/enroll_test.go b/pkg/utils/enroll_test.go index 1a233a1e9..22908f032 100644 --- a/pkg/utils/enroll_test.go +++ b/pkg/utils/enroll_test.go @@ -329,7 +329,7 @@ func TestPatchKmeshRedirectAnnotation(t *testing.T) { t.Errorf("Failed to get the patched pod: %v", err) } - if got.Annotations[constants.KmeshRedirectionAnnotation] != "enabled" { + if !AnnotationEnabled(got.Annotations[constants.KmeshRedirectionAnnotation]) { t.Errorf("Expected annotation %s to be 'enabled', got '%s'", constants.KmeshRedirectionAnnotation, got.Annotations[constants.KmeshRedirectionAnnotation]) } @@ -343,7 +343,7 @@ func TestPatchKmeshRedirectAnnotation(t *testing.T) { t.Errorf("Failed to get the patched pod: %v", err) } - if got.Annotations[constants.KmeshRedirectionAnnotation] != "enabled" { + if !AnnotationEnabled(got.Annotations[constants.KmeshRedirectionAnnotation]) { t.Errorf("Expected annotation %s to be 'enabled', got '%s'", constants.KmeshRedirectionAnnotation, got.Annotations[constants.KmeshRedirectionAnnotation]) } }