From d2629f40aea302198c8a191ee39554a9cfab36be Mon Sep 17 00:00:00 2001 From: Zhonghu Xu Date: Tue, 28 May 2024 16:54:28 +0800 Subject: [PATCH 1/4] replace priority queue with fifq Signed-off-by: Zhonghu Xu --- pkg/controller/security/manager.go | 92 +++++------------------------- 1 file changed, 14 insertions(+), 78 deletions(-) diff --git a/pkg/controller/security/manager.go b/pkg/controller/security/manager.go index b6128bfca..55549dbe3 100644 --- a/pkg/controller/security/manager.go +++ b/pkg/controller/security/manager.go @@ -17,11 +17,11 @@ package security import ( - "container/heap" "sync" "time" istiosecurity "istio.io/istio/pkg/security" + "k8s.io/client-go/util/workqueue" "kmesh.net/kmesh/pkg/constants" "kmesh.net/kmesh/pkg/logger" @@ -59,16 +59,11 @@ type SecretManager struct { certsCache *certsCache // certs rotation priority queue based on exp - certsRotateQueue *rotateQueue + certsRotateQueue workqueue.Interface certRequestChan chan certRequest } -type rotateQueue struct { - certs []*certExp - mu sync.Mutex -} - func (s *SecretManager) SendCertRequest(identity string, op int) { s.certRequestChan <- certRequest{Identity: identity, Operation: op} } @@ -100,63 +95,6 @@ func newCertCache() *certsCache { } } -func newCertRotateQueue() *rotateQueue { - return &rotateQueue{ - certs: make([]*certExp, 0), - mu: sync.Mutex{}, - } -} - -func (pq *rotateQueue) Push(x interface{}) { - item := x.(*certExp) - pq.certs = append(pq.certs, item) -} - -func (pq *rotateQueue) Pop() interface{} { - old := pq.certs - n := len(old) - x := old[n-1] - old[n-1] = nil // avoid memory leak - pq.certs = old[0 : n-1] - return x -} - -func (pq *rotateQueue) Len() int { - return len(pq.certs) -} - -func (pq *rotateQueue) Less(i, j int) bool { - return pq.certs[i].exp.Before(pq.certs[j].exp) -} - -func (pq *rotateQueue) Swap(i, j int) { - pq.certs[i], pq.certs[j] = pq.certs[j], pq.certs[i] -} - -func (pq *rotateQueue) addItem(certExp *certExp) { - pq.mu.Lock() - defer pq.mu.Unlock() - heap.Push(pq, certExp) -} - -func (pq *rotateQueue) delete(identity string) *certExp { - pq.mu.Lock() - defer pq.mu.Unlock() - for i := 0; i < len(pq.certs); i++ { - if pq.certs[i].identity == identity { - return heap.Remove(pq, i).(*certExp) - } - } - return nil -} - -// pop a certificate that is about to expire -func (pq *rotateQueue) pop() *certExp { - pq.mu.Lock() - defer pq.mu.Unlock() - return heap.Pop(pq).(*certExp) -} - func (s *SecretManager) storeCert(identity string, newCert *istiosecurity.SecretItem) { s.certsCache.mu.Lock() defer s.certsCache.mu.Unlock() @@ -179,8 +117,8 @@ func (s *SecretManager) storeCert(identity string, newCert *istiosecurity.Secret identity: identity, } // push to rotate queue - s.certsRotateQueue.addItem(&certExp) - log.Debugf("cert %v added to rotation queue, exp: %v\n", identity, newCert.ExpireTime) + s.certsRotateQueue.Add(certExp) + log.Debugf("cert %v added to rotation queue, exp: %v", identity, newCert.ExpireTime) } // addOrUpdate checks whether the certificate already exists. @@ -212,14 +150,12 @@ func NewSecretManager() (*SecretManager, error) { if err != nil { return nil, err } - pq := newCertRotateQueue() - heap.Init(pq) secretManager := SecretManager{ caClient: caClient, configOptions: options, certsCache: newCertCache(), - certsRotateQueue: pq, + certsRotateQueue: workqueue.New(), certRequestChan: make(chan certRequest, maxConcurrentCSR), } go secretManager.handleCertRequests() @@ -230,13 +166,15 @@ func NewSecretManager() (*SecretManager, error) { // Automatically check and rotate when the validity period expires func (s *SecretManager) rotateCerts() { for { - if s.certsRotateQueue.Len() != 0 { - top := s.certsRotateQueue.pop() - time.Sleep(time.Until(top.exp.Add(-1 * time.Hour))) - s.SendCertRequest(top.identity, Rotate) - } else { - time.Sleep(5 * time.Second) + element, quit := s.certsRotateQueue.Get() + if quit { + return } + defer s.certsRotateQueue.Done(element) + + certExp := element.(certExp) + time.Sleep(time.Until(certExp.exp.Add(-1 * time.Hour))) + s.SendCertRequest(certExp.identity, Rotate) } } @@ -250,8 +188,7 @@ func (s *SecretManager) addCert(identity string) { return } - // Save the new certificate in the map and add a record to the priority queue - // of the auto-refresh task when it expires + // Save the new certificate in the map and add a record to the rotate queue s.storeCert(identity, newCert) } @@ -268,7 +205,6 @@ func (s *SecretManager) deleteCert(identity string) { log.Debugf("remove identity: %v refCnt : %v", identity, certificate.refCnt) if certificate.refCnt == 0 { delete(s.certsCache.certs, identity) - s.certsRotateQueue.delete(identity) log.Debugf("identity: %v cert deleted", identity) } } From aee13d1e58f63cfd029de76a1da14141a82b8dc8 Mon Sep 17 00:00:00 2001 From: Zhonghu Xu Date: Tue, 28 May 2024 17:18:55 +0800 Subject: [PATCH 2/4] Add Run method for secret manager Signed-off-by: Zhonghu Xu --- pkg/controller/controller.go | 1 + pkg/controller/security/caclient.go | 4 ++++ pkg/controller/security/manager.go | 22 +++++++++++++++++----- 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index edd7db14c..76295a447 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -70,6 +70,7 @@ func (c *Controller) Start() error { if err != nil { return fmt.Errorf("secretManager create failed: %v", err) } + go secertManager.Run(stopCh) c.client = NewXdsClient(c.mode, c.bpfWorkloadObj) if c.client.workloadController != nil { diff --git a/pkg/controller/security/caclient.go b/pkg/controller/security/caclient.go index 13a2c56b6..0023dc27b 100644 --- a/pkg/controller/security/caclient.go +++ b/pkg/controller/security/caclient.go @@ -177,3 +177,7 @@ func (c *caClient) reconnect() error { c.client = pb.NewIstioCertificateServiceClient(conn) return nil } + +func (c *caClient) close() error { + return c.conn.Close() +} diff --git a/pkg/controller/security/manager.go b/pkg/controller/security/manager.go index 55549dbe3..1a5bb9fda 100644 --- a/pkg/controller/security/manager.go +++ b/pkg/controller/security/manager.go @@ -68,14 +68,20 @@ func (s *SecretManager) SendCertRequest(identity string, op int) { s.certRequestChan <- certRequest{Identity: identity, Operation: op} } -func (s *SecretManager) handleCertRequests() { +func (s *SecretManager) handleCertRequests(stop <-chan struct{}) { for data := range s.certRequestChan { + select { + case <-stop: + return + default: + } + identity, op := data.Identity, data.Operation switch op { case ADD: certificate := s.certsCache.addOrUpdate(identity) if certificate != nil { - log.Debugf("add identity: %v refCnt++ : %v\n", identity, certificate.refCnt) + log.Debugf("add identity: %v refCnt: %v\n", identity, certificate.refCnt) continue } // sign cert if only no cert exists for this identity @@ -139,7 +145,7 @@ func (c *certsCache) addOrUpdate(identity string) *certItem { return nil } -// NewSecretManager creates a new secretManager.s +// NewSecretManager creates a new secretManager func NewSecretManager() (*SecretManager, error) { tlsOpts := &tlsOptions{ RootCert: constants.RootCertPath, @@ -158,11 +164,17 @@ func NewSecretManager() (*SecretManager, error) { certsRotateQueue: workqueue.New(), certRequestChan: make(chan certRequest, maxConcurrentCSR), } - go secretManager.handleCertRequests() - go secretManager.rotateCerts() return &secretManager, nil } +func (s *SecretManager) Run(stop <-chan struct{}) { + go s.handleCertRequests(stop) + go s.rotateCerts() + <-stop + s.certsRotateQueue.ShutDown() + s.caClient.close() +} + // Automatically check and rotate when the validity period expires func (s *SecretManager) rotateCerts() { for { From ac09ffef002cd17d579b6d37084b46edca93b542 Mon Sep 17 00:00:00 2001 From: Zhonghu Xu Date: Wed, 29 May 2024 11:51:28 +0800 Subject: [PATCH 3/4] remove defer --- pkg/controller/security/manager.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/security/manager.go b/pkg/controller/security/manager.go index 1a5bb9fda..22b2794b1 100644 --- a/pkg/controller/security/manager.go +++ b/pkg/controller/security/manager.go @@ -182,11 +182,11 @@ func (s *SecretManager) rotateCerts() { if quit { return } - defer s.certsRotateQueue.Done(element) certExp := element.(certExp) time.Sleep(time.Until(certExp.exp.Add(-1 * time.Hour))) s.SendCertRequest(certExp.identity, Rotate) + s.certsRotateQueue.Done(element) } } From cb0a3b97b7e8c122b5ba768542d51ea54dc0aae1 Mon Sep 17 00:00:00 2001 From: Zhonghu Xu Date: Wed, 29 May 2024 12:11:29 +0800 Subject: [PATCH 4/4] use delay queue Signed-off-by: Zhonghu Xu --- pkg/controller/security/manager.go | 28 +++++++++++----------------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/pkg/controller/security/manager.go b/pkg/controller/security/manager.go index 22b2794b1..a756359f8 100644 --- a/pkg/controller/security/manager.go +++ b/pkg/controller/security/manager.go @@ -29,11 +29,6 @@ import ( var log = logger.NewLoggerField("security") -type certExp struct { - identity string - exp time.Time -} - type certItem struct { cert *istiosecurity.SecretItem refCnt int32 @@ -59,7 +54,7 @@ type SecretManager struct { certsCache *certsCache // certs rotation priority queue based on exp - certsRotateQueue workqueue.Interface + certsRotateQueue workqueue.DelayingInterface certRequestChan chan certRequest } @@ -117,13 +112,8 @@ func (s *SecretManager) storeCert(identity string, newCert *istiosecurity.Secret return } - existing.cert = newCert - certExp := certExp{ - exp: newCert.ExpireTime, - identity: identity, - } - // push to rotate queue - s.certsRotateQueue.Add(certExp) + // push to rotate queue one hour before cert expire + s.certsRotateQueue.AddAfter(identity, time.Until(newCert.ExpireTime.Add(-1*time.Hour))) log.Debugf("cert %v added to rotation queue, exp: %v", identity, newCert.ExpireTime) } @@ -161,7 +151,7 @@ func NewSecretManager() (*SecretManager, error) { caClient: caClient, configOptions: options, certsCache: newCertCache(), - certsRotateQueue: workqueue.New(), + certsRotateQueue: workqueue.NewDelayingQueue(), certRequestChan: make(chan certRequest, maxConcurrentCSR), } return &secretManager, nil @@ -183,9 +173,8 @@ func (s *SecretManager) rotateCerts() { return } - certExp := element.(certExp) - time.Sleep(time.Until(certExp.exp.Add(-1 * time.Hour))) - s.SendCertRequest(certExp.identity, Rotate) + identity := element.(string) + s.SendCertRequest(identity, Rotate) s.certsRotateQueue.Done(element) } } @@ -231,5 +220,10 @@ func (s *SecretManager) rotateCert(identity string) { } s.certsCache.mu.Unlock() + if time.Until(certificate.cert.ExpireTime) >= 1*time.Hour { + // This can happen when delete a certificate following adding the same one later. + log.Debugf("cert %s expire at %T, skip rotate now", identity, certificate.cert.ExpireTime) + } + s.addCert(identity) }