Skip to content

Commit

Permalink
🐛 DynamicRestMapper: return NoMatchError when resource doesn't exist
Browse files Browse the repository at this point in the history
  • Loading branch information
timebertt committed Sep 28, 2020
1 parent fdc6658 commit 93a5448
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 53 deletions.
43 changes: 18 additions & 25 deletions pkg/client/apiutil/dynamicrestmapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ import (

// ErrRateLimited is returned by a RESTMapper method if the number of API
// calls has exceeded a limit within a certain time period.
//
// Deprecated: as the dynamic RESTMapper now returns NoMatch errors directly
// instead of RateLimited errors, it is meaningless and only kept for the sake
// of backwards-compatibility.
type ErrRateLimited struct {
// Duration to wait until the next API call can be made.
Delay time.Duration
Expand All @@ -43,6 +47,10 @@ func (e ErrRateLimited) Error() string {
// DelayIfRateLimited returns the delay time until the next API call is
// allowed and true if err is of type ErrRateLimited. The zero
// time.Duration value and false are returned if err is not a ErrRateLimited.
//
// Deprecated: as the dynamic RESTMapper now returns NoMatch errors directly
// instead of RateLimited errors, it is meaningless and only kept for the sake
// of backwards-compatibility.
func DelayIfRateLimited(err error) (time.Duration, bool) {
var rlerr ErrRateLimited
if errors.As(err, &rlerr) {
Expand All @@ -56,7 +64,7 @@ func DelayIfRateLimited(err error) (time.Duration, bool) {
type dynamicRESTMapper struct {
mu sync.RWMutex // protects the following fields
staticMapper meta.RESTMapper
limiter *dynamicLimiter
limiter *rate.Limiter
newMapper func() (meta.RESTMapper, error)

lazy bool
Expand All @@ -70,7 +78,7 @@ type DynamicRESTMapperOption func(*dynamicRESTMapper) error
// WithLimiter sets the RESTMapper's underlying limiter to lim.
func WithLimiter(lim *rate.Limiter) DynamicRESTMapperOption {
return func(drm *dynamicRESTMapper) error {
drm.limiter = &dynamicLimiter{lim}
drm.limiter = lim
return nil
}
}
Expand Down Expand Up @@ -103,9 +111,7 @@ func NewDynamicRESTMapper(cfg *rest.Config, opts ...DynamicRESTMapperOption) (me
return nil, err
}
drm := &dynamicRESTMapper{
limiter: &dynamicLimiter{
rate.NewLimiter(rate.Limit(defaultRefillRate), defaultLimitSize),
},
limiter: rate.NewLimiter(rate.Limit(defaultRefillRate), defaultLimitSize),
newMapper: func() (meta.RESTMapper, error) {
groupResources, err := restmapper.GetAPIGroupResources(client)
if err != nil {
Expand Down Expand Up @@ -161,12 +167,13 @@ func (drm *dynamicRESTMapper) init() (err error) {
// checkAndReload attempts to call the given callback, which is assumed to be dependent
// on the data in the restmapper.
//
// If the callback returns a NoKindMatchError, it will attempt to reload
// If the callback returns an error that matches the given error, it will attempt to reload
// the RESTMapper's data and re-call the callback once that's occurred.
// If the callback returns any other error, the function will return immediately regardless.
//
// It will take care
// ensuring that reloads are rate-limitted and that extraneous calls aren't made.
// It will take care of ensuring that reloads are rate-limited and that extraneous calls
// aren't made. If a reload would exceed the limiters rate, it returns the error return by
// the callback.
// It's thread-safe, and worries about thread-safety for the callback (so the callback does
// not need to attempt to lock the restmapper).
func (drm *dynamicRESTMapper) checkAndReload(needsReloadErr error, checkNeedsReload func() error) error {
Expand Down Expand Up @@ -199,7 +206,9 @@ func (drm *dynamicRESTMapper) checkAndReload(needsReloadErr error, checkNeedsRel
}

// we're still stale, so grab a rate-limit token if we can...
if err := drm.limiter.checkRate(); err != nil {
if !drm.limiter.Allow() {
// return error from static mapper here, we have refreshed often enough (exceeding rate of provided limiter)
// so that client's can handle this the same way as a "normal" NoResourceMatchError / NoKindMatchError
return err
}

Expand Down Expand Up @@ -305,19 +314,3 @@ func (drm *dynamicRESTMapper) ResourceSingularizer(resource string) (string, err
})
return singular, err
}

// dynamicLimiter holds a rate limiter used to throttle chatty RESTMapper users.
type dynamicLimiter struct {
*rate.Limiter
}

// checkRate returns an ErrRateLimited if too many API calls have been made
// within the set limit.
func (b *dynamicLimiter) checkRate() error {
res := b.Reserve()
if res.Delay() == 0 {
return nil
}
res.Cancel()
return ErrRateLimited{res.Delay()}
}
75 changes: 47 additions & 28 deletions pkg/client/apiutil/dynamicrestmapper_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
package apiutil_test

import (
"errors"
"time"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/onsi/gomega/format"
"github.com/onsi/gomega/types"
"golang.org/x/time/rate"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -57,53 +58,49 @@ var _ = Describe("Dynamic REST Mapper", func() {
})

It("should reload if not present in the cache", func() {
By("reading successfully once")
By("reading target successfully once")
Expect(callWithTarget()).To(Succeed())
Expect(callWithOther()).NotTo(Succeed())

By("asking for a something that didn't exist previously after adding it to the mapper")
By("reading other not successfully")
count := 0
addToMapper = func(baseMapper *meta.DefaultRESTMapper) {
count++
baseMapper.Add(targetGVK, meta.RESTScopeNamespace)
baseMapper.Add(secondGVK, meta.RESTScopeNamespace)
}
Expect(callWithOther()).To(Succeed())
Expect(callWithTarget()).To(Succeed())
})
Expect(callWithOther()).To(beNoMatchError())
Expect(count).To(Equal(1), "should reload exactly once")

It("should rate-limit reloads so that we don't get more than a certain number per second", func() {
By("setting a small limit")
*lim = *rate.NewLimiter(rate.Limit(1), 1)

By("forcing a reload after changing the mapper")
By("reading both successfully now")
addToMapper = func(baseMapper *meta.DefaultRESTMapper) {
baseMapper.Add(targetGVK, meta.RESTScopeNamespace)
baseMapper.Add(secondGVK, meta.RESTScopeNamespace)
}
Expect(callWithOther()).To(Succeed())

By("calling another time that would need a requery and failing")
Eventually(func() bool {
return errors.As(callWithTarget(), &apiutil.ErrRateLimited{})
}, "10s").Should(BeTrue())
Expect(callWithTarget()).To(Succeed())
})

It("should rate-limit then allow more at 1rps", func() {
It("should rate-limit then allow more at configured rate", func() {
By("setting a small limit")
*lim = *rate.NewLimiter(rate.Limit(1), 1)
*lim = *rate.NewLimiter(rate.Every(100*time.Millisecond), 1)

By("forcing a reload after changing the mapper")
addToMapper = func(baseMapper *meta.DefaultRESTMapper) {
baseMapper.Add(secondGVK, meta.RESTScopeNamespace)
}

By("calling twice to trigger rate limiting")
Expect(callWithOther()).To(Succeed())
Expect(callWithTarget()).NotTo(Succeed())

// by 2nd call loop should succeed because we canceled our 1st rate-limited token, then waited a full second
By("calling until no longer rate-limited, 2nd call should succeed")
Eventually(func() bool {
return errors.As(callWithTarget(), &apiutil.ErrRateLimited{})
}, "2.5s", "1s").Should(BeFalse())
By("calling another time to trigger rate limiting")
addToMapper = func(baseMapper *meta.DefaultRESTMapper) {
baseMapper.Add(targetGVK, meta.RESTScopeNamespace)
}
// if call consistently fails, we are sure, that it was rate-limited,
// otherwise it would have reloaded and succeeded
Consistently(callWithTarget, "90ms", "10ms").Should(beNoMatchError())

By("calling until no longer rate-limited")
// once call succeeds, we are sure, that it was no longer rate-limited,
// as it was allowed to reload and found matching kind/resource
Eventually(callWithTarget, "30ms", "10ms").Should(And(Succeed(), Not(beNoMatchError())))
})

It("should avoid reloading twice if two requests for the same thing come in", func() {
Expand Down Expand Up @@ -251,3 +248,25 @@ var _ = Describe("Dynamic REST Mapper", func() {
})
})
})

func beNoMatchError() types.GomegaMatcher {
return noMatchErrorMatcher{}
}

type noMatchErrorMatcher struct{}

func (k noMatchErrorMatcher) Match(actual interface{}) (success bool, err error) {
actualErr, actualOk := actual.(error)
if !actualOk {
return false, nil
}

return meta.IsNoMatchError(actualErr), nil
}

func (k noMatchErrorMatcher) FailureMessage(actual interface{}) (message string) {
return format.Message(actual, "to be a NoMatchError")
}
func (k noMatchErrorMatcher) NegatedFailureMessage(actual interface{}) (message string) {
return format.Message(actual, "not to be a NoMatchError")
}

0 comments on commit 93a5448

Please sign in to comment.