Skip to content

Commit

Permalink
Fix argument list too long in manage controller
Browse files Browse the repository at this point in the history
Signed-off-by: talon <tianmuyang@huawei.com>
  • Loading branch information
tacslon committed Aug 12, 2024
1 parent 77cadb3 commit 1964312
Showing 1 changed file with 82 additions and 35 deletions.
117 changes: 82 additions & 35 deletions pkg/controller/manage/manage_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,22 @@ type KmeshManageController struct {
client kubernetes.Interface
}

type podEventArgs struct {
namespaceLister v1.NamespaceLister
queue workqueue.RateLimitingInterface
sm *kmeshsecurity.SecretManager
xdpProgFd int
mode string
}

type nsEventArgs struct {
podLister v1.PodLister
queue workqueue.RateLimitingInterface
sm *kmeshsecurity.SecretManager
xdpProgFd int
mode string
}

func isPodReady(pod *corev1.Pod) bool {
for _, condition := range pod.Status.Conditions {
if condition.Type == corev1.PodReady && condition.Status == corev1.ConditionTrue {
Expand All @@ -77,7 +93,7 @@ func isPodReady(pod *corev1.Pod) bool {
return false
}

func NewKmeshManageController(client kubernetes.Interface, security *kmeshsecurity.SecretManager, xdpProgFd int, mode string) (*KmeshManageController, error) {
func NewKmeshManageController(client kubernetes.Interface, sm *kmeshsecurity.SecretManager, xdpProgFd int, mode string) (*KmeshManageController, error) {
nodeName := os.Getenv("NODE_NAME")

informerFactory := informers.NewSharedInformerFactoryWithOptions(client, 0,
Expand All @@ -95,21 +111,41 @@ func NewKmeshManageController(client kubernetes.Interface, security *kmeshsecuri

if _, err := podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
handlePodAddFunc(obj, namespaceLister, queue, security, xdpProgFd, mode)
handlePodAddFunc(obj, &podEventArgs{
namespaceLister: namespaceLister,
queue: queue,
sm: sm,
xdpProgFd: xdpProgFd,
mode: mode,
})
},
UpdateFunc: func(oldObj, newObj interface{}) {
handlePodUpdateFunc(oldObj, newObj, namespaceLister, queue, security, xdpProgFd, mode)
handlePodUpdateFunc(oldObj, newObj, &podEventArgs{
namespaceLister: namespaceLister,
queue: queue,
sm: sm,
xdpProgFd: xdpProgFd,
mode: mode,
})
},
DeleteFunc: func(obj interface{}) {
handlePodDeleteFunc(obj, security)
handlePodDeleteFunc(obj, &podEventArgs{
sm: sm,
})
},
}); err != nil {
return nil, fmt.Errorf("failed to add event handler to podInformer: %v", err)
}

if _, err := namespaceInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) {
handleNamespaceUpdateFunc(oldObj, newObj, podLister, queue, security, xdpProgFd, mode)
handleNamespaceUpdateFunc(oldObj, newObj, &nsEventArgs{
podLister: podLister,
queue: queue,
sm: sm,
xdpProgFd: xdpProgFd,
mode: mode,
})
},
}); err != nil {
return nil, fmt.Errorf("failed to add event handler to namespaceInformer: %v", err)
Expand All @@ -127,53 +163,53 @@ func NewKmeshManageController(client kubernetes.Interface, security *kmeshsecuri
}, nil
}

func handlePodAddFunc(obj interface{}, namespaceLister v1.NamespaceLister, queue workqueue.RateLimitingInterface, security *kmeshsecurity.SecretManager, xdpProgFd int, mode string) {
func handlePodAddFunc(obj interface{}, args *podEventArgs) {
pod, ok := obj.(*corev1.Pod)
if !ok {
log.Errorf("expected *corev1.Pod but got %T", obj)
return
}

namespace, err := namespaceLister.Get(pod.Namespace)
namespace, err := args.namespaceLister.Get(pod.Namespace)
if err != nil {
log.Errorf("failed to get pod namespace %s: %v", pod.Namespace, err)
return
}

if !utils.ShouldEnroll(pod, namespace) {
if pod.Annotations[constants.KmeshRedirectionAnnotation] == "enabled" {
disableKmeshManage(pod, queue, security, mode)
disableKmeshManage(pod, args)
}
return
}
enableKmeshManage(pod, queue, security, xdpProgFd, mode)
enableKmeshManage(pod, args)
}

func handlePodUpdateFunc(oldObj, newObj interface{}, namespaceLister v1.NamespaceLister, queue workqueue.RateLimitingInterface, security *kmeshsecurity.SecretManager, xdpProgFd int, mode string) {
func handlePodUpdateFunc(_, newObj interface{}, args *podEventArgs) {
newPod, okNew := newObj.(*corev1.Pod)
if !okNew {
log.Errorf("expected *corev1.Pod but got %T", newObj)
return
}

namespace, err := namespaceLister.Get(newPod.Namespace)
namespace, err := args.namespaceLister.Get(newPod.Namespace)
if err != nil {
log.Errorf("failed to get pod namespace %s: %v", newPod.Namespace, err)
return
}

// enable kmesh manage
if newPod.Annotations[constants.KmeshRedirectionAnnotation] != "enabled" && utils.ShouldEnroll(newPod, namespace) {
enableKmeshManage(newPod, queue, security, xdpProgFd, mode)
enableKmeshManage(newPod, args)
}

// disable kmesh manage
if newPod.Annotations[constants.KmeshRedirectionAnnotation] == "enabled" && !utils.ShouldEnroll(newPod, namespace) {
disableKmeshManage(newPod, queue, security, mode)
disableKmeshManage(newPod, args)
}
}

func handlePodDeleteFunc(obj interface{}, security *kmeshsecurity.SecretManager) {
func handlePodDeleteFunc(obj interface{}, args *podEventArgs) {
pod, ok := obj.(*corev1.Pod)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
Expand All @@ -190,13 +226,13 @@ func handlePodDeleteFunc(obj interface{}, security *kmeshsecurity.SecretManager)

if pod.Annotations[constants.KmeshRedirectionAnnotation] == "enabled" {
log.Infof("%s/%s: Pod managed by Kmesh is deleted", pod.GetNamespace(), pod.GetName())
sendCertRequest(security, pod, kmeshsecurity.DELETE)
sendCertRequest(args.sm, pod, kmeshsecurity.DELETE)
// We donot need to do handleKmeshManage for delete, because we may have no change to execute a cmd in pod net ns.
// And we have done this in kmesh-cni
}
}

func handleNamespaceUpdateFunc(oldObj, newObj interface{}, podLister v1.PodLister, queue workqueue.RateLimitingInterface, security *kmeshsecurity.SecretManager, xdpProgFd int, mode string) {
func handleNamespaceUpdateFunc(oldObj, newObj interface{}, args *nsEventArgs) {
oldNS, okOld := oldObj.(*corev1.Namespace)
newNS, okNew := newObj.(*corev1.Namespace)
if !okOld || !okNew {
Expand All @@ -207,17 +243,17 @@ func handleNamespaceUpdateFunc(oldObj, newObj interface{}, podLister v1.PodListe
// Compare labels to check if they have actually changed
if !utils.ShouldEnroll(nil, oldNS) && utils.ShouldEnroll(nil, newNS) {
log.Infof("Enabling Kmesh for all pods in namespace: %s", newNS.Name)
enableKmeshForPodsInNamespace(newNS.Name, podLister, queue, security, xdpProgFd, mode)
enableKmeshForPodsInNamespace(newNS.Name, args)
}

if utils.ShouldEnroll(nil, oldNS) && !utils.ShouldEnroll(nil, newNS) {
log.Infof("Disabling Kmesh for all pods in namespace: %s", newNS.Name)
disableKmeshForPodsInNamespace(newNS.Name, podLister, queue, security, mode)
disableKmeshForPodsInNamespace(newNS.Name, args)
}
}

func enableKmeshManage(pod *corev1.Pod, queue workqueue.RateLimitingInterface, security *kmeshsecurity.SecretManager, xdpProgFd int, mode string) {
sendCertRequest(security, pod, kmeshsecurity.ADD)
func enableKmeshManage(pod *corev1.Pod, args *podEventArgs) {
sendCertRequest(args.sm, pod, kmeshsecurity.ADD)
if !isPodReady(pod) {
log.Debugf("Pod %s/%s is not ready, skipping Kmesh manage enable", pod.GetNamespace(), pod.GetName())
return
Expand All @@ -228,12 +264,12 @@ func enableKmeshManage(pod *corev1.Pod, queue workqueue.RateLimitingInterface, s
log.Errorf("failed to enable Kmesh manage")
return
}
queue.AddRateLimited(QueueItem{podName: pod.Name, podNs: pod.Namespace, action: ActionAddAnnotation})
_ = linkXdp(nspath, xdpProgFd, mode)
args.queue.AddRateLimited(QueueItem{podName: pod.Name, podNs: pod.Namespace, action: ActionAddAnnotation})
_ = linkXdp(nspath, args.xdpProgFd, args.mode)
}

func disableKmeshManage(pod *corev1.Pod, queue workqueue.RateLimitingInterface, security *kmeshsecurity.SecretManager, mode string) {
sendCertRequest(security, pod, kmeshsecurity.DELETE)
func disableKmeshManage(pod *corev1.Pod, args *podEventArgs) {
sendCertRequest(args.sm, pod, kmeshsecurity.DELETE)
if !isPodReady(pod) {
log.Debugf("%s/%s is not ready, skipping Kmesh manage disable", pod.GetNamespace(), pod.GetName())
return
Expand All @@ -244,32 +280,43 @@ func disableKmeshManage(pod *corev1.Pod, queue workqueue.RateLimitingInterface,
log.Errorf("failed to disable Kmesh manage")
return
}
queue.AddRateLimited(QueueItem{podName: pod.Name, podNs: pod.Namespace, action: ActionDeleteAnnotation})
_ = unlinkXdp(nspath, mode)
args.queue.AddRateLimited(QueueItem{podName: pod.Name, podNs: pod.Namespace, action: ActionDeleteAnnotation})
_ = unlinkXdp(nspath, args.mode)
}

func enableKmeshForPodsInNamespace(namespace string, podLister v1.PodLister, queue workqueue.RateLimitingInterface, security *kmeshsecurity.SecretManager, xdpProgFd int, mode string) {
pods, err := podLister.Pods(namespace).List(labels.Everything())
func enableKmeshForPodsInNamespace(namespace string, args *nsEventArgs) {
pods, err := args.podLister.Pods(namespace).List(labels.Everything())
if err != nil {
log.Errorf("Error listing pods: %v", err)
return
}

podArgs := &podEventArgs{
queue: args.queue,
sm: args.sm,
xdpProgFd: args.xdpProgFd,
mode: args.mode,
}
for _, pod := range pods {
enableKmeshManage(pod, queue, security, xdpProgFd, mode)
enableKmeshManage(pod, podArgs)
}
}

func disableKmeshForPodsInNamespace(namespace string, podLister v1.PodLister, queue workqueue.RateLimitingInterface, security *kmeshsecurity.SecretManager, mode string) {
pods, err := podLister.Pods(namespace).List(labels.Everything())
func disableKmeshForPodsInNamespace(namespace string, args *nsEventArgs) {
pods, err := args.podLister.Pods(namespace).List(labels.Everything())
if err != nil {
log.Errorf("Error listing pods in namespace %s: %v", namespace, err)
return
}

podArgs := &podEventArgs{
queue: args.queue,
sm: args.sm,
mode: args.mode,
}
for _, pod := range pods {
if !utils.ShouldEnroll(pod, nil) {
disableKmeshManage(pod, queue, security, mode)
disableKmeshManage(pod, podArgs)
}
}
}
Expand Down Expand Up @@ -333,14 +380,14 @@ func (c *KmeshManageController) processItems() bool {
return true
}

func sendCertRequest(security *kmeshsecurity.SecretManager, pod *corev1.Pod, op int) {
if security != nil {
func sendCertRequest(sm *kmeshsecurity.SecretManager, pod *corev1.Pod, op int) {
if sm != nil {
Identity := spiffe.Identity{
TrustDomain: constants.TrustDomain,
Namespace: pod.Namespace,
ServiceAccount: pod.Spec.ServiceAccountName,
}.String()
security.SendCertRequest(Identity, op)
sm.SendCertRequest(Identity, op)

Check warning on line 390 in pkg/controller/manage/manage_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/manage/manage_controller.go#L390

Added line #L390 was not covered by tests
}
}

Expand Down

0 comments on commit 1964312

Please sign in to comment.