Skip to content
This repository has been archived by the owner on Dec 20, 2022. It is now read-only.

Commit

Permalink
[patch] fix policy race condition on update (#92)
Browse files Browse the repository at this point in the history
* fix

* revert

* increase sleep time

* increase sleep time 2
  • Loading branch information
WindzCUHK authored Feb 10, 2022
1 parent ac5023f commit 8c218d1
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 85 deletions.
2 changes: 1 addition & 1 deletion jwk/daemon_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func Test_jwkd_Start(t *testing.T) {
ctx: ctx,
},
checkFunc: func(j *jwkd, ch <-chan error) error {
time.Sleep(time.Millisecond * 100)
time.Sleep(time.Millisecond * 200)
cancel()
if k, _ := j.keys.Load(j.athenzJwksURL); k == nil {
return errors.New("cannot update keys")
Expand Down
30 changes: 22 additions & 8 deletions policy/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@ import (
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"

"github.com/AthenZ/athenz/utils/zpe-updater/util"
"github.com/kpango/fastime"
Expand Down Expand Up @@ -54,7 +56,7 @@ type policyd struct {
// The []*Assertion contains deny policies first, and following the allow policies
// When CheckPolicy function called, the []*Assertion is check by order, in current implementation the deny policy is prioritize,
// so we need to put the deny policies in lower index.
rolePolicies gache.Gache
rolePolicies *gache.Gache

expiryMargin time.Duration // force update policy before actual expiry by margin duration
refreshPeriod time.Duration
Expand All @@ -72,8 +74,9 @@ type policyd struct {

// New represent the constructor of Policyd
func New(opts ...Option) (Daemon, error) {
g := gache.New()
p := &policyd{
rolePolicies: gache.New(),
rolePolicies: &g,
}

for _, opt := range append(defaultOptions, opts...) {
Expand Down Expand Up @@ -151,6 +154,7 @@ func (p *policyd) Start(ctx context.Context) <-chan error {

// Update updates and cache policy data
func (p *policyd) Update(ctx context.Context) error {
glg.Get().DisableColor()
jobID := fastime.Now().Unix()
glg.Infof("[%d] will update policy", jobID)
eg := errgroup.Group{}
Expand Down Expand Up @@ -184,13 +188,23 @@ func (p *policyd) Update(ctx context.Context) error {
EnableExpiredHook().
SetExpiredHook(func(ctx context.Context, key string) {
// key = <domain>:role.<role>
fetchAndCachePolicy(ctx, p.rolePolicies, p.fetchers[strings.Split(key, ":role.")[0]])
fetchAndCachePolicy(ctx, *(p.rolePolicies), p.fetchers[strings.Split(key, ":role.")[0]])
})

p.rolePolicies, rp = rp, p.rolePolicies
// swap pointer
glg.DebugFunc(func() string {
return fmt.Sprintf("cache before swap, old: %p %v; new: %p %v", *p.rolePolicies, (*p.rolePolicies).Len(), rp, rp.Len())
})
curRpPtrPtr := (*unsafe.Pointer)(unsafe.Pointer(&p.rolePolicies))
oldRpPtr := (*gache.Gache)(atomic.SwapPointer(curRpPtrPtr, unsafe.Pointer(&rp)))
glg.Debugf("tmp cache becomes effective")
rp.Stop()
rp.Clear()
glg.DebugFunc(func() string {
return fmt.Sprintf("cache after swap, old: %p %v; new: %p %v", *p.rolePolicies, (*p.rolePolicies).Len(), *oldRpPtr, (*oldRpPtr).Len())
})
(*oldRpPtr).Stop()

// prevent old cache cleanup, old pointer may be cached in other policy checking goroutine, leave clear up to GC
// (*oldRpPtr).Clear()

glg.Infof("[%d] update policy done", jobID)
return nil
Expand Down Expand Up @@ -218,7 +232,7 @@ func (p *policyd) CheckPolicyRoles(ctx context.Context, domain string, roles []s

wg := new(sync.WaitGroup)
wg.Add(len(roles))
rp := p.rolePolicies
rp := *p.rolePolicies

for _, role := range roles {
dr := fmt.Sprintf("%s:role.%s", domain, role)
Expand Down Expand Up @@ -275,7 +289,7 @@ func (p *policyd) CheckPolicyRoles(ctx context.Context, domain string, roles []s

// GetPolicyCache returns the cached role policy data
func (p *policyd) GetPolicyCache(ctx context.Context) map[string]interface{} {
return p.rolePolicies.ToRawMap(ctx)
return (*p.rolePolicies).ToRawMap(ctx)
}

func fetchAndCachePolicy(ctx context.Context, g gache.Gache, f Fetcher) error {
Expand Down
Loading

0 comments on commit 8c218d1

Please sign in to comment.