Skip to content

Commit

Permalink
Revert "Merge handleAddPod with handleUpdatePod. (kubeovn#2563)"
Browse files Browse the repository at this point in the history
This reverts commit 2fb1f95.
  • Loading branch information
zhangzujian committed Apr 8, 2023
1 parent 7f9b4cc commit 3348462
Show file tree
Hide file tree
Showing 3 changed files with 320 additions and 223 deletions.
12 changes: 8 additions & 4 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ type Controller struct {

podsLister v1.PodLister
podsSynced cache.InformerSynced
addOrUpdatePodQueue workqueue.RateLimitingInterface
addPodQueue workqueue.RateLimitingInterface
deletePodQueue workqueue.RateLimitingInterface
updatePodQueue workqueue.RateLimitingInterface
updatePodSecurityQueue workqueue.RateLimitingInterface
podKeyMutex *keymutex.KeyMutex

Expand Down Expand Up @@ -366,8 +367,9 @@ func NewController(config *Configuration) *Controller {

podsLister: podInformer.Lister(),
podsSynced: podInformer.Informer().HasSynced,
addOrUpdatePodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddOrUpdatePod"),
addPodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AddPod"),
deletePodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "DeletePod"),
updatePodQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdatePod"),
updatePodSecurityQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "UpdatePodSecurity"),
podKeyMutex: keymutex.New(97),

Expand Down Expand Up @@ -770,8 +772,9 @@ func (c *Controller) Run(ctx context.Context) {
func (c *Controller) shutdown() {
utilruntime.HandleCrash()

c.addOrUpdatePodQueue.ShutDown()
c.addPodQueue.ShutDown()
c.deletePodQueue.ShutDown()
c.updatePodQueue.ShutDown()
c.updatePodSecurityQueue.ShutDown()

c.addNamespaceQueue.ShutDown()
Expand Down Expand Up @@ -950,8 +953,9 @@ func (c *Controller) startWorkers(ctx context.Context) {
}

for i := 0; i < c.config.WorkerNum; i++ {
go wait.Until(c.runAddPodWorker, time.Second, ctx.Done())
go wait.Until(c.runDeletePodWorker, time.Second, ctx.Done())
go wait.Until(c.runAddOrUpdatePodWorker, time.Second, ctx.Done())
go wait.Until(c.runUpdatePodWorker, time.Second, ctx.Done())
go wait.Until(c.runUpdatePodSecurityWorker, time.Second, ctx.Done())

go wait.Until(c.runDeleteSubnetWorker, time.Second, ctx.Done())
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/inspection.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,13 @@ func (c *Controller) inspectPod() error {
return err
}
klog.V(5).Infof("finish remove annotation for %s", portName)
c.addOrUpdatePodQueue.Add(fmt.Sprintf("%s/%s", pod.Namespace, pod.Name))
c.addPodQueue.Add(fmt.Sprintf("%s/%s", pod.Namespace, pod.Name))
break
} else {
if pod.Annotations[fmt.Sprintf(util.AllocatedAnnotationTemplate, podNet.ProviderName)] == "true" && pod.Spec.NodeName != "" {
if pod.Annotations[fmt.Sprintf(util.RoutedAnnotationTemplate, podNet.ProviderName)] != "true" {
klog.V(5).Infof("enqueue update pod %s/%s", pod.Namespace, pod.Name)
c.addOrUpdatePodQueue.Add(fmt.Sprintf("%s/%s", pod.Namespace, pod.Name))
c.updatePodQueue.Add(fmt.Sprintf("%s/%s", pod.Namespace, pod.Name))
break
}
}
Expand Down
Loading

0 comments on commit 3348462

Please sign in to comment.