Skip to content

Commit

Permalink
Merge pull request #390 from hzxuzhonghu/fifq
Browse files Browse the repository at this point in the history
Make use of FIFQ instead of priority queue
  • Loading branch information
kmesh-bot authored Jun 4, 2024
2 parents c5131d6 + cb0a3b9 commit 8627833
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 94 deletions.
1 change: 1 addition & 0 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func (c *Controller) Start() error {
if err != nil {
return fmt.Errorf("secretManager create failed: %v", err)
}
go secertManager.Run(stopCh)

c.client = NewXdsClient(c.mode, c.bpfWorkloadObj)
if c.client.WorkloadController != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller/security/caclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,7 @@ func (c *caClient) reconnect() error {
c.client = pb.NewIstioCertificateServiceClient(conn)
return nil
}

func (c *caClient) close() error {
return c.conn.Close()
}
130 changes: 36 additions & 94 deletions pkg/controller/security/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,18 @@
package security

import (
"container/heap"
"sync"
"time"

istiosecurity "istio.io/istio/pkg/security"
"k8s.io/client-go/util/workqueue"

"kmesh.net/kmesh/pkg/constants"
"kmesh.net/kmesh/pkg/logger"
)

var log = logger.NewLoggerField("security")

type certExp struct {
identity string
exp time.Time
}

type certItem struct {
cert *istiosecurity.SecretItem
refCnt int32
Expand All @@ -59,28 +54,29 @@ type SecretManager struct {
certsCache *certsCache

// certs rotation priority queue based on exp
certsRotateQueue *rotateQueue
certsRotateQueue workqueue.DelayingInterface

certRequestChan chan certRequest
}

type rotateQueue struct {
certs []*certExp
mu sync.Mutex
}

func (s *SecretManager) SendCertRequest(identity string, op int) {
s.certRequestChan <- certRequest{Identity: identity, Operation: op}
}

func (s *SecretManager) handleCertRequests() {
func (s *SecretManager) handleCertRequests(stop <-chan struct{}) {
for data := range s.certRequestChan {
select {
case <-stop:
return
default:
}

identity, op := data.Identity, data.Operation
switch op {
case ADD:
certificate := s.certsCache.addOrUpdate(identity)
if certificate != nil {
log.Debugf("add identity: %v refCnt++ : %v\n", identity, certificate.refCnt)
log.Debugf("add identity: %v refCnt: %v\n", identity, certificate.refCnt)
continue
}
// sign cert if only no cert exists for this identity
Expand All @@ -100,63 +96,6 @@ func newCertCache() *certsCache {
}
}

func newCertRotateQueue() *rotateQueue {
return &rotateQueue{
certs: make([]*certExp, 0),
mu: sync.Mutex{},
}
}

func (pq *rotateQueue) Push(x interface{}) {
item := x.(*certExp)
pq.certs = append(pq.certs, item)
}

func (pq *rotateQueue) Pop() interface{} {
old := pq.certs
n := len(old)
x := old[n-1]
old[n-1] = nil // avoid memory leak
pq.certs = old[0 : n-1]
return x
}

func (pq *rotateQueue) Len() int {
return len(pq.certs)
}

func (pq *rotateQueue) Less(i, j int) bool {
return pq.certs[i].exp.Before(pq.certs[j].exp)
}

func (pq *rotateQueue) Swap(i, j int) {
pq.certs[i], pq.certs[j] = pq.certs[j], pq.certs[i]
}

func (pq *rotateQueue) addItem(certExp *certExp) {
pq.mu.Lock()
defer pq.mu.Unlock()
heap.Push(pq, certExp)
}

func (pq *rotateQueue) delete(identity string) *certExp {
pq.mu.Lock()
defer pq.mu.Unlock()
for i := 0; i < len(pq.certs); i++ {
if pq.certs[i].identity == identity {
return heap.Remove(pq, i).(*certExp)
}
}
return nil
}

// pop a certificate that is about to expire
func (pq *rotateQueue) pop() *certExp {
pq.mu.Lock()
defer pq.mu.Unlock()
return heap.Pop(pq).(*certExp)
}

func (s *SecretManager) storeCert(identity string, newCert *istiosecurity.SecretItem) {
s.certsCache.mu.Lock()
defer s.certsCache.mu.Unlock()
Expand All @@ -173,14 +112,9 @@ func (s *SecretManager) storeCert(identity string, newCert *istiosecurity.Secret
return
}

existing.cert = newCert
certExp := certExp{
exp: newCert.ExpireTime,
identity: identity,
}
// push to rotate queue
s.certsRotateQueue.addItem(&certExp)
log.Debugf("cert %v added to rotation queue, exp: %v\n", identity, newCert.ExpireTime)
// push to rotate queue one hour before cert expire
s.certsRotateQueue.AddAfter(identity, time.Until(newCert.ExpireTime.Add(-1*time.Hour)))
log.Debugf("cert %v added to rotation queue, exp: %v", identity, newCert.ExpireTime)
}

// addOrUpdate checks whether the certificate already exists.
Expand All @@ -201,7 +135,7 @@ func (c *certsCache) addOrUpdate(identity string) *certItem {
return nil
}

// NewSecretManager creates a new secretManager.s
// NewSecretManager creates a new secretManager
func NewSecretManager() (*SecretManager, error) {
tlsOpts := &tlsOptions{
RootCert: constants.RootCertPath,
Expand All @@ -212,31 +146,36 @@ func NewSecretManager() (*SecretManager, error) {
if err != nil {
return nil, err
}
pq := newCertRotateQueue()
heap.Init(pq)

secretManager := SecretManager{
caClient: caClient,
configOptions: options,
certsCache: newCertCache(),
certsRotateQueue: pq,
certsRotateQueue: workqueue.NewDelayingQueue(),
certRequestChan: make(chan certRequest, maxConcurrentCSR),
}
go secretManager.handleCertRequests()
go secretManager.rotateCerts()
return &secretManager, nil
}

func (s *SecretManager) Run(stop <-chan struct{}) {
go s.handleCertRequests(stop)
go s.rotateCerts()
<-stop
s.certsRotateQueue.ShutDown()
s.caClient.close()
}

// Automatically check and rotate when the validity period expires
func (s *SecretManager) rotateCerts() {
for {
if s.certsRotateQueue.Len() != 0 {
top := s.certsRotateQueue.pop()
time.Sleep(time.Until(top.exp.Add(-1 * time.Hour)))
s.SendCertRequest(top.identity, Rotate)
} else {
time.Sleep(5 * time.Second)
element, quit := s.certsRotateQueue.Get()
if quit {
return
}

identity := element.(string)
s.SendCertRequest(identity, Rotate)
s.certsRotateQueue.Done(element)
}
}

Expand All @@ -250,8 +189,7 @@ func (s *SecretManager) addCert(identity string) {
return
}

// Save the new certificate in the map and add a record to the priority queue
// of the auto-refresh task when it expires
// Save the new certificate in the map and add a record to the rotate queue
s.storeCert(identity, newCert)
}

Expand All @@ -268,7 +206,6 @@ func (s *SecretManager) deleteCert(identity string) {
log.Debugf("remove identity: %v refCnt : %v", identity, certificate.refCnt)
if certificate.refCnt == 0 {
delete(s.certsCache.certs, identity)
s.certsRotateQueue.delete(identity)
log.Debugf("identity: %v cert deleted", identity)
}
}
Expand All @@ -283,5 +220,10 @@ func (s *SecretManager) rotateCert(identity string) {
}
s.certsCache.mu.RUnlock()

if time.Until(certificate.cert.ExpireTime) >= 1*time.Hour {
// This can happen when delete a certificate following adding the same one later.
log.Debugf("cert %s expire at %T, skip rotate now", identity, certificate.cert.ExpireTime)
}

s.addCert(identity)
}

0 comments on commit 8627833

Please sign in to comment.