Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix argument list too long in manage controller #711

Merged
merged 1 commit into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 54 additions & 46 deletions pkg/controller/manage/manage_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ type KmeshManageController struct {
namespaceLister v1.NamespaceLister
queue workqueue.RateLimitingInterface
client kubernetes.Interface
sm *kmeshsecurity.SecretManager
xdpProgFd int
mode string
}

func isPodReady(pod *corev1.Pod) bool {
Expand All @@ -76,7 +79,7 @@ func isPodReady(pod *corev1.Pod) bool {
return false
}

func NewKmeshManageController(client kubernetes.Interface, security *kmeshsecurity.SecretManager, xdpProgFd int, mode string) (*KmeshManageController, error) {
func NewKmeshManageController(client kubernetes.Interface, sm *kmeshsecurity.SecretManager, xdpProgFd int, mode string) (*KmeshManageController, error) {
informerFactory := kube.NewInformerFactory(client)
podInformer := informerFactory.Core().V1().Pods().Informer()
podLister := informerFactory.Core().V1().Pods().Lister()
Expand All @@ -87,87 +90,92 @@ func NewKmeshManageController(client kubernetes.Interface, security *kmeshsecuri

queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

kmc := &KmeshManageController{
informerFactory: informerFactory,
podInformer: podInformer,
podLister: podLister,
factory: factory,
namespaceInformer: namespaceInformer,
namespaceLister: namespaceLister,
queue: queue,
client: client,
sm: sm,
xdpProgFd: xdpProgFd,
mode: mode,
}

if _, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
handlePodAddFunc(obj, namespaceLister, queue, security, xdpProgFd, mode)
kmc.handlePodAddFunc(obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
handlePodUpdateFunc(newObj, namespaceLister, queue, security, xdpProgFd, mode)
kmc.handlePodUpdateFunc(oldObj, newObj)
},
DeleteFunc: func(obj interface{}) {
handlePodDeleteFunc(obj, security)
kmc.handlePodDeleteFunc(obj)
},
}); err != nil {
return nil, fmt.Errorf("failed to add event handler to podInformer: %v", err)
}

if _, err := namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
handleNamespaceUpdateFunc(oldObj, newObj, podLister, queue, security, xdpProgFd, mode)
kmc.handleNamespaceUpdateFunc(oldObj, newObj)
},
}); err != nil {
return nil, fmt.Errorf("failed to add event handler to namespaceInformer: %v", err)
}

return &KmeshManageController{
informerFactory: informerFactory,
podInformer: podInformer,
podLister: podLister,
factory: factory,
namespaceInformer: namespaceInformer,
namespaceLister: namespaceLister,
queue: queue,
client: client,
}, nil
return kmc, nil
}

func handlePodAddFunc(obj interface{}, namespaceLister v1.NamespaceLister, queue workqueue.RateLimitingInterface, security *kmeshsecurity.SecretManager, xdpProgFd int, mode string) {
func (kmc *KmeshManageController) handlePodAddFunc(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
log.Errorf("expected *corev1.Pod but got %T", obj)
return
}

namespace, err := namespaceLister.Get(pod.Namespace)
namespace, err := kmc.namespaceLister.Get(pod.Namespace)
if err != nil {
log.Errorf("failed to get pod namespace %s: %v", pod.Namespace, err)
return
}

if !utils.ShouldEnroll(pod, namespace) {
if pod.Annotations[constants.KmeshRedirectionAnnotation] == "enabled" {
disableKmeshManage(pod, queue, security, mode)
if utils.AnnotationEnabled(pod.Annotations[constants.KmeshRedirectionAnnotation]) {
kmc.disableKmeshManage(pod)
}
return
}
enableKmeshManage(pod, queue, security, xdpProgFd, mode)
kmc.enableKmeshManage(pod)
}

func handlePodUpdateFunc(newObj interface{}, namespaceLister v1.NamespaceLister, queue workqueue.RateLimitingInterface, security *kmeshsecurity.SecretManager, xdpProgFd int, mode string) {
func (kmc *KmeshManageController) handlePodUpdateFunc(_, newObj interface{}) {
newPod, okNew := newObj.(*corev1.Pod)
if !okNew {
log.Errorf("expected *corev1.Pod but got %T", newObj)
return
}

namespace, err := namespaceLister.Get(newPod.Namespace)
namespace, err := kmc.namespaceLister.Get(newPod.Namespace)
if err != nil {
log.Errorf("failed to get pod namespace %s: %v", newPod.Namespace, err)
return
}

// enable kmesh manage
if newPod.Annotations[constants.KmeshRedirectionAnnotation] != "enabled" && utils.ShouldEnroll(newPod, namespace) {
enableKmeshManage(newPod, queue, security, xdpProgFd, mode)
if !utils.AnnotationEnabled(newPod.Annotations[constants.KmeshRedirectionAnnotation]) && utils.ShouldEnroll(newPod, namespace) {
kmc.enableKmeshManage(newPod)
}

// disable kmesh manage
if newPod.Annotations[constants.KmeshRedirectionAnnotation] == "enabled" && !utils.ShouldEnroll(newPod, namespace) {
disableKmeshManage(newPod, queue, security, mode)
if utils.AnnotationEnabled(newPod.Annotations[constants.KmeshRedirectionAnnotation]) && !utils.ShouldEnroll(newPod, namespace) {
kmc.disableKmeshManage(newPod)
}
}

func handlePodDeleteFunc(obj interface{}, security *kmeshsecurity.SecretManager) {
func (kmc *KmeshManageController) handlePodDeleteFunc(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
Expand All @@ -182,15 +190,15 @@ func handlePodDeleteFunc(obj interface{}, security *kmeshsecurity.SecretManager)
}
}

if pod.Annotations[constants.KmeshRedirectionAnnotation] == "enabled" {
if utils.AnnotationEnabled(pod.Annotations[constants.KmeshRedirectionAnnotation]) {
log.Infof("%s/%s: Pod managed by Kmesh is deleted", pod.GetNamespace(), pod.GetName())
sendCertRequest(security, pod, kmeshsecurity.DELETE)
sendCertRequest(kmc.sm, pod, kmeshsecurity.DELETE)
// We donot need to do handleKmeshManage for delete, because we may have no change to execute a cmd in pod net ns.
// And we have done this in kmesh-cni
}
}

func handleNamespaceUpdateFunc(oldObj, newObj interface{}, podLister v1.PodLister, queue workqueue.RateLimitingInterface, security *kmeshsecurity.SecretManager, xdpProgFd int, mode string) {
func (kmc *KmeshManageController) handleNamespaceUpdateFunc(oldObj, newObj interface{}) {
oldNS, okOld := oldObj.(*corev1.Namespace)
newNS, okNew := newObj.(*corev1.Namespace)
if !okOld || !okNew {
Expand All @@ -201,17 +209,17 @@ func handleNamespaceUpdateFunc(oldObj, newObj interface{}, podLister v1.PodListe
// Compare labels to check if they have actually changed
if !utils.ShouldEnroll(nil, oldNS) && utils.ShouldEnroll(nil, newNS) {
log.Infof("Enabling Kmesh for all pods in namespace: %s", newNS.Name)
enableKmeshForPodsInNamespace(newNS.Name, podLister, queue, security, xdpProgFd, mode)
kmc.enableKmeshForPodsInNamespace(newNS.Name)
}

if utils.ShouldEnroll(nil, oldNS) && !utils.ShouldEnroll(nil, newNS) {
log.Infof("Disabling Kmesh for all pods in namespace: %s", newNS.Name)
disableKmeshForPodsInNamespace(newNS.Name, podLister, queue, security, mode)
kmc.disableKmeshForPodsInNamespace(newNS.Name)
}
}

func enableKmeshManage(pod *corev1.Pod, queue workqueue.RateLimitingInterface, security *kmeshsecurity.SecretManager, xdpProgFd int, mode string) {
sendCertRequest(security, pod, kmeshsecurity.ADD)
func (kmc *KmeshManageController) enableKmeshManage(pod *corev1.Pod) {
sendCertRequest(kmc.sm, pod, kmeshsecurity.ADD)
if !isPodReady(pod) {
log.Debugf("Pod %s/%s is not ready, skipping Kmesh manage enable", pod.GetNamespace(), pod.GetName())
return
Expand All @@ -222,12 +230,12 @@ func enableKmeshManage(pod *corev1.Pod, queue workqueue.RateLimitingInterface, s
log.Errorf("failed to enable Kmesh manage")
return
}
queue.AddRateLimited(QueueItem{podName: pod.Name, podNs: pod.Namespace, action: ActionAddAnnotation})
_ = linkXdp(nspath, xdpProgFd, mode)
kmc.queue.AddRateLimited(QueueItem{podName: pod.Name, podNs: pod.Namespace, action: ActionAddAnnotation})
_ = linkXdp(nspath, kmc.xdpProgFd, kmc.mode)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why handle this error?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mean why NOT handle this error? Because error is handled in linkXdp

}

func disableKmeshManage(pod *corev1.Pod, queue workqueue.RateLimitingInterface, security *kmeshsecurity.SecretManager, mode string) {
sendCertRequest(security, pod, kmeshsecurity.DELETE)
func (kmc *KmeshManageController) disableKmeshManage(pod *corev1.Pod) {
sendCertRequest(kmc.sm, pod, kmeshsecurity.DELETE)
if !isPodReady(pod) {
log.Debugf("%s/%s is not ready, skipping Kmesh manage disable", pod.GetNamespace(), pod.GetName())
return
Expand All @@ -238,32 +246,32 @@ func disableKmeshManage(pod *corev1.Pod, queue workqueue.RateLimitingInterface,
log.Error("failed to disable Kmesh manage")
return
}
queue.AddRateLimited(QueueItem{podName: pod.Name, podNs: pod.Namespace, action: ActionDeleteAnnotation})
_ = unlinkXdp(nspath, mode)
kmc.queue.AddRateLimited(QueueItem{podName: pod.Name, podNs: pod.Namespace, action: ActionDeleteAnnotation})
_ = unlinkXdp(nspath, kmc.mode)
}

func enableKmeshForPodsInNamespace(namespace string, podLister v1.PodLister, queue workqueue.RateLimitingInterface, security *kmeshsecurity.SecretManager, xdpProgFd int, mode string) {
pods, err := podLister.Pods(namespace).List(labels.Everything())
func (kmc *KmeshManageController) enableKmeshForPodsInNamespace(namespace string) {
pods, err := kmc.podLister.Pods(namespace).List(labels.Everything())
if err != nil {
log.Errorf("Error listing pods: %v", err)
return
}

for _, pod := range pods {
enableKmeshManage(pod, queue, security, xdpProgFd, mode)
kmc.enableKmeshManage(pod)
}
}

func disableKmeshForPodsInNamespace(namespace string, podLister v1.PodLister, queue workqueue.RateLimitingInterface, security *kmeshsecurity.SecretManager, mode string) {
pods, err := podLister.Pods(namespace).List(labels.Everything())
func (kmc *KmeshManageController) disableKmeshForPodsInNamespace(namespace string) {
pods, err := kmc.podLister.Pods(namespace).List(labels.Everything())
if err != nil {
log.Errorf("Error listing pods in namespace %s: %v", namespace, err)
return
}

for _, pod := range pods {
if !utils.ShouldEnroll(pod, nil) {
disableKmeshManage(pod, queue, security, mode)
kmc.disableKmeshManage(pod)
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/utils/enroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ var (
)

func PatchKmeshRedirectAnnotation(client kubernetes.Interface, pod *corev1.Pod) error {
if pod.Annotations[constants.KmeshRedirectionAnnotation] == "enabled" {
if AnnotationEnabled(pod.Annotations[constants.KmeshRedirectionAnnotation]) {
log.Debugf("Pod %s in namespace %s already has annotation %s", pod.Name, pod.Namespace, constants.KmeshRedirectionAnnotation)
return nil
}
Expand Down Expand Up @@ -134,3 +134,7 @@ func DelKmeshRedirectAnnotation(client kubernetes.Interface, pod *corev1.Pod) er
)
return err
}

func AnnotationEnabled(annotation string) bool {
return annotation == "enabled"
}
4 changes: 2 additions & 2 deletions pkg/utils/enroll_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func TestPatchKmeshRedirectAnnotation(t *testing.T) {
t.Errorf("Failed to get the patched pod: %v", err)
}

if got.Annotations[constants.KmeshRedirectionAnnotation] != "enabled" {
if !AnnotationEnabled(got.Annotations[constants.KmeshRedirectionAnnotation]) {
t.Errorf("Expected annotation %s to be 'enabled', got '%s'", constants.KmeshRedirectionAnnotation, got.Annotations[constants.KmeshRedirectionAnnotation])
}

Expand All @@ -343,7 +343,7 @@ func TestPatchKmeshRedirectAnnotation(t *testing.T) {
t.Errorf("Failed to get the patched pod: %v", err)
}

if got.Annotations[constants.KmeshRedirectionAnnotation] != "enabled" {
if !AnnotationEnabled(got.Annotations[constants.KmeshRedirectionAnnotation]) {
t.Errorf("Expected annotation %s to be 'enabled', got '%s'", constants.KmeshRedirectionAnnotation, got.Annotations[constants.KmeshRedirectionAnnotation])
}
}
Expand Down
Loading