Skip to content

Commit

Permalink
Merge pull request #711 from tacslon/fix/mng
Browse files Browse the repository at this point in the history
Fix argument list too long in manage controller
  • Loading branch information
kmesh-bot authored Oct 8, 2024
2 parents 66882c2 + fdca0a7 commit 973cbc6
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 49 deletions.
100 changes: 54 additions & 46 deletions pkg/controller/manage/manage_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ type KmeshManageController struct {
namespaceLister v1.NamespaceLister
queue workqueue.RateLimitingInterface
client kubernetes.Interface
sm *kmeshsecurity.SecretManager
xdpProgFd int
mode string
}

func isPodReady(pod *corev1.Pod) bool {
Expand All @@ -76,7 +79,7 @@ func isPodReady(pod *corev1.Pod) bool {
return false
}

func NewKmeshManageController(client kubernetes.Interface, security *kmeshsecurity.SecretManager, xdpProgFd int, mode string) (*KmeshManageController, error) {
func NewKmeshManageController(client kubernetes.Interface, sm *kmeshsecurity.SecretManager, xdpProgFd int, mode string) (*KmeshManageController, error) {
informerFactory := kube.NewInformerFactory(client)
podInformer := informerFactory.Core().V1().Pods().Informer()
podLister := informerFactory.Core().V1().Pods().Lister()
Expand All @@ -87,87 +90,92 @@ func NewKmeshManageController(client kubernetes.Interface, security *kmeshsecuri

queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

kmc := &KmeshManageController{
informerFactory: informerFactory,
podInformer: podInformer,
podLister: podLister,
factory: factory,
namespaceInformer: namespaceInformer,
namespaceLister: namespaceLister,
queue: queue,
client: client,
sm: sm,
xdpProgFd: xdpProgFd,
mode: mode,
}

if _, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
handlePodAddFunc(obj, namespaceLister, queue, security, xdpProgFd, mode)
kmc.handlePodAddFunc(obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
handlePodUpdateFunc(newObj, namespaceLister, queue, security, xdpProgFd, mode)
kmc.handlePodUpdateFunc(oldObj, newObj)
},
DeleteFunc: func(obj interface{}) {
handlePodDeleteFunc(obj, security)
kmc.handlePodDeleteFunc(obj)
},
}); err != nil {
return nil, fmt.Errorf("failed to add event handler to podInformer: %v", err)
}

if _, err := namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
handleNamespaceUpdateFunc(oldObj, newObj, podLister, queue, security, xdpProgFd, mode)
kmc.handleNamespaceUpdateFunc(oldObj, newObj)
},
}); err != nil {
return nil, fmt.Errorf("failed to add event handler to namespaceInformer: %v", err)
}

return &KmeshManageController{
informerFactory: informerFactory,
podInformer: podInformer,
podLister: podLister,
factory: factory,
namespaceInformer: namespaceInformer,
namespaceLister: namespaceLister,
queue: queue,
client: client,
}, nil
return kmc, nil
}

func handlePodAddFunc(obj interface{}, namespaceLister v1.NamespaceLister, queue workqueue.RateLimitingInterface, security *kmeshsecurity.SecretManager, xdpProgFd int, mode string) {
func (kmc *KmeshManageController) handlePodAddFunc(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
log.Errorf("expected *corev1.Pod but got %T", obj)
return
}

namespace, err := namespaceLister.Get(pod.Namespace)
namespace, err := kmc.namespaceLister.Get(pod.Namespace)
if err != nil {
log.Errorf("failed to get pod namespace %s: %v", pod.Namespace, err)
return
}

if !utils.ShouldEnroll(pod, namespace) {
if pod.Annotations[constants.KmeshRedirectionAnnotation] == "enabled" {
disableKmeshManage(pod, queue, security, mode)
if utils.AnnotationEnabled(pod.Annotations[constants.KmeshRedirectionAnnotation]) {
kmc.disableKmeshManage(pod)
}
return
}
enableKmeshManage(pod, queue, security, xdpProgFd, mode)
kmc.enableKmeshManage(pod)
}

func handlePodUpdateFunc(newObj interface{}, namespaceLister v1.NamespaceLister, queue workqueue.RateLimitingInterface, security *kmeshsecurity.SecretManager, xdpProgFd int, mode string) {
func (kmc *KmeshManageController) handlePodUpdateFunc(_, newObj interface{}) {
newPod, okNew := newObj.(*corev1.Pod)
if !okNew {
log.Errorf("expected *corev1.Pod but got %T", newObj)
return
}

namespace, err := namespaceLister.Get(newPod.Namespace)
namespace, err := kmc.namespaceLister.Get(newPod.Namespace)
if err != nil {
log.Errorf("failed to get pod namespace %s: %v", newPod.Namespace, err)
return
}

// enable kmesh manage
if newPod.Annotations[constants.KmeshRedirectionAnnotation] != "enabled" && utils.ShouldEnroll(newPod, namespace) {
enableKmeshManage(newPod, queue, security, xdpProgFd, mode)
if !utils.AnnotationEnabled(newPod.Annotations[constants.KmeshRedirectionAnnotation]) && utils.ShouldEnroll(newPod, namespace) {
kmc.enableKmeshManage(newPod)
}

// disable kmesh manage
if newPod.Annotations[constants.KmeshRedirectionAnnotation] == "enabled" && !utils.ShouldEnroll(newPod, namespace) {
disableKmeshManage(newPod, queue, security, mode)
if utils.AnnotationEnabled(newPod.Annotations[constants.KmeshRedirectionAnnotation]) && !utils.ShouldEnroll(newPod, namespace) {
kmc.disableKmeshManage(newPod)
}
}

func handlePodDeleteFunc(obj interface{}, security *kmeshsecurity.SecretManager) {
func (kmc *KmeshManageController) handlePodDeleteFunc(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
Expand All @@ -182,15 +190,15 @@ func handlePodDeleteFunc(obj interface{}, security *kmeshsecurity.SecretManager)
}
}

if pod.Annotations[constants.KmeshRedirectionAnnotation] == "enabled" {
if utils.AnnotationEnabled(pod.Annotations[constants.KmeshRedirectionAnnotation]) {
log.Infof("%s/%s: Pod managed by Kmesh is deleted", pod.GetNamespace(), pod.GetName())
sendCertRequest(security, pod, kmeshsecurity.DELETE)
sendCertRequest(kmc.sm, pod, kmeshsecurity.DELETE)
// We donot need to do handleKmeshManage for delete, because we may have no change to execute a cmd in pod net ns.
// And we have done this in kmesh-cni
}
}

func handleNamespaceUpdateFunc(oldObj, newObj interface{}, podLister v1.PodLister, queue workqueue.RateLimitingInterface, security *kmeshsecurity.SecretManager, xdpProgFd int, mode string) {
func (kmc *KmeshManageController) handleNamespaceUpdateFunc(oldObj, newObj interface{}) {
oldNS, okOld := oldObj.(*corev1.Namespace)
newNS, okNew := newObj.(*corev1.Namespace)
if !okOld || !okNew {
Expand All @@ -201,17 +209,17 @@ func handleNamespaceUpdateFunc(oldObj, newObj interface{}, podLister v1.PodListe
// Compare labels to check if they have actually changed
if !utils.ShouldEnroll(nil, oldNS) && utils.ShouldEnroll(nil, newNS) {
log.Infof("Enabling Kmesh for all pods in namespace: %s", newNS.Name)
enableKmeshForPodsInNamespace(newNS.Name, podLister, queue, security, xdpProgFd, mode)
kmc.enableKmeshForPodsInNamespace(newNS.Name)
}

if utils.ShouldEnroll(nil, oldNS) && !utils.ShouldEnroll(nil, newNS) {
log.Infof("Disabling Kmesh for all pods in namespace: %s", newNS.Name)
disableKmeshForPodsInNamespace(newNS.Name, podLister, queue, security, mode)
kmc.disableKmeshForPodsInNamespace(newNS.Name)
}
}

func enableKmeshManage(pod *corev1.Pod, queue workqueue.RateLimitingInterface, security *kmeshsecurity.SecretManager, xdpProgFd int, mode string) {
sendCertRequest(security, pod, kmeshsecurity.ADD)
func (kmc *KmeshManageController) enableKmeshManage(pod *corev1.Pod) {
sendCertRequest(kmc.sm, pod, kmeshsecurity.ADD)
if !isPodReady(pod) {
log.Debugf("Pod %s/%s is not ready, skipping Kmesh manage enable", pod.GetNamespace(), pod.GetName())
return
Expand All @@ -222,12 +230,12 @@ func enableKmeshManage(pod *corev1.Pod, queue workqueue.RateLimitingInterface, s
log.Errorf("failed to enable Kmesh manage")
return
}
queue.AddRateLimited(QueueItem{podName: pod.Name, podNs: pod.Namespace, action: ActionAddAnnotation})
_ = linkXdp(nspath, xdpProgFd, mode)
kmc.queue.AddRateLimited(QueueItem{podName: pod.Name, podNs: pod.Namespace, action: ActionAddAnnotation})
_ = linkXdp(nspath, kmc.xdpProgFd, kmc.mode)
}

func disableKmeshManage(pod *corev1.Pod, queue workqueue.RateLimitingInterface, security *kmeshsecurity.SecretManager, mode string) {
sendCertRequest(security, pod, kmeshsecurity.DELETE)
func (kmc *KmeshManageController) disableKmeshManage(pod *corev1.Pod) {
sendCertRequest(kmc.sm, pod, kmeshsecurity.DELETE)
if !isPodReady(pod) {
log.Debugf("%s/%s is not ready, skipping Kmesh manage disable", pod.GetNamespace(), pod.GetName())
return
Expand All @@ -238,32 +246,32 @@ func disableKmeshManage(pod *corev1.Pod, queue workqueue.RateLimitingInterface,
log.Error("failed to disable Kmesh manage")
return
}
queue.AddRateLimited(QueueItem{podName: pod.Name, podNs: pod.Namespace, action: ActionDeleteAnnotation})
_ = unlinkXdp(nspath, mode)
kmc.queue.AddRateLimited(QueueItem{podName: pod.Name, podNs: pod.Namespace, action: ActionDeleteAnnotation})
_ = unlinkXdp(nspath, kmc.mode)
}

func enableKmeshForPodsInNamespace(namespace string, podLister v1.PodLister, queue workqueue.RateLimitingInterface, security *kmeshsecurity.SecretManager, xdpProgFd int, mode string) {
pods, err := podLister.Pods(namespace).List(labels.Everything())
func (kmc *KmeshManageController) enableKmeshForPodsInNamespace(namespace string) {
pods, err := kmc.podLister.Pods(namespace).List(labels.Everything())
if err != nil {
log.Errorf("Error listing pods: %v", err)
return
}

for _, pod := range pods {
enableKmeshManage(pod, queue, security, xdpProgFd, mode)
kmc.enableKmeshManage(pod)
}
}

func disableKmeshForPodsInNamespace(namespace string, podLister v1.PodLister, queue workqueue.RateLimitingInterface, security *kmeshsecurity.SecretManager, mode string) {
pods, err := podLister.Pods(namespace).List(labels.Everything())
func (kmc *KmeshManageController) disableKmeshForPodsInNamespace(namespace string) {
pods, err := kmc.podLister.Pods(namespace).List(labels.Everything())
if err != nil {
log.Errorf("Error listing pods in namespace %s: %v", namespace, err)
return
}

for _, pod := range pods {
if !utils.ShouldEnroll(pod, nil) {
disableKmeshManage(pod, queue, security, mode)
kmc.disableKmeshManage(pod)
}
}
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/utils/enroll.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ var (
)

func PatchKmeshRedirectAnnotation(client kubernetes.Interface, pod *corev1.Pod) error {
if pod.Annotations[constants.KmeshRedirectionAnnotation] == "enabled" {
if AnnotationEnabled(pod.Annotations[constants.KmeshRedirectionAnnotation]) {
log.Debugf("Pod %s in namespace %s already has annotation %s", pod.Name, pod.Namespace, constants.KmeshRedirectionAnnotation)
return nil
}
Expand Down Expand Up @@ -134,3 +134,7 @@ func DelKmeshRedirectAnnotation(client kubernetes.Interface, pod *corev1.Pod) er
)
return err
}

func AnnotationEnabled(annotation string) bool {
return annotation == "enabled"
}
4 changes: 2 additions & 2 deletions pkg/utils/enroll_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func TestPatchKmeshRedirectAnnotation(t *testing.T) {
t.Errorf("Failed to get the patched pod: %v", err)
}

if got.Annotations[constants.KmeshRedirectionAnnotation] != "enabled" {
if !AnnotationEnabled(got.Annotations[constants.KmeshRedirectionAnnotation]) {
t.Errorf("Expected annotation %s to be 'enabled', got '%s'", constants.KmeshRedirectionAnnotation, got.Annotations[constants.KmeshRedirectionAnnotation])
}

Expand All @@ -343,7 +343,7 @@ func TestPatchKmeshRedirectAnnotation(t *testing.T) {
t.Errorf("Failed to get the patched pod: %v", err)
}

if got.Annotations[constants.KmeshRedirectionAnnotation] != "enabled" {
if !AnnotationEnabled(got.Annotations[constants.KmeshRedirectionAnnotation]) {
t.Errorf("Expected annotation %s to be 'enabled', got '%s'", constants.KmeshRedirectionAnnotation, got.Annotations[constants.KmeshRedirectionAnnotation])
}
}
Expand Down

0 comments on commit 973cbc6

Please sign in to comment.