diff --git a/discovery/cached/memcache.go b/discovery/cached/memcache.go index 702392514b..bd6b478c90 100644 --- a/discovery/cached/memcache.go +++ b/discovery/cached/memcache.go @@ -19,10 +19,14 @@ package cached import ( "errors" "fmt" + "net" + "net/url" "sync" + "syscall" "github.com/googleapis/gnostic/OpenAPIv2" + errorsutil "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/version" @@ -30,40 +34,87 @@ import ( restclient "k8s.io/client-go/rest" ) +type cacheEntry struct { + resourceList *metav1.APIResourceList + err error +} + // memCacheClient can Invalidate() to stay up-to-date with discovery // information. // -// TODO: Switch to a watch interface. Right now it will poll anytime -// Invalidate() is called. +// TODO: Switch to a watch interface. Right now it will poll after each +// Invalidate() call. type memCacheClient struct { delegate discovery.DiscoveryInterface lock sync.RWMutex - groupToServerResources map[string]*metav1.APIResourceList + groupToServerResources map[string]*cacheEntry groupList *metav1.APIGroupList cacheValid bool } // Error Constants var ( - ErrCacheEmpty = errors.New("the cache has not been filled yet") ErrCacheNotFound = errors.New("not found") ) var _ discovery.CachedDiscoveryInterface = &memCacheClient{} +// isTransientConnectionError checks whether given error is "Connection refused" or +// "Connection reset" error which usually means that apiserver is temporarily +// unavailable. 
+func isTransientConnectionError(err error) bool { + urlError, ok := err.(*url.Error) + if !ok { + return false + } + opError, ok := urlError.Err.(*net.OpError) + if !ok { + return false + } + errno, ok := opError.Err.(syscall.Errno) + if !ok { + return false + } + return errno == syscall.ECONNREFUSED || errno == syscall.ECONNRESET +} + +func isTransientError(err error) bool { + if isTransientConnectionError(err) { + return true + } + + if t, ok := err.(errorsutil.APIStatus); ok && t.Status().Code >= 500 { + return true + } + + return errorsutil.IsTooManyRequests(err) +} + // ServerResourcesForGroupVersion returns the supported resources for a group and version. func (d *memCacheClient) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) { - d.lock.RLock() - defer d.lock.RUnlock() + d.lock.Lock() + defer d.lock.Unlock() if !d.cacheValid { - return nil, ErrCacheEmpty + if err := d.refreshLocked(); err != nil { + return nil, err + } } cachedVal, ok := d.groupToServerResources[groupVersion] if !ok { return nil, ErrCacheNotFound } - return cachedVal, nil + + if cachedVal.err != nil && isTransientError(cachedVal.err) { + r, err := d.serverResourcesForGroupVersion(groupVersion) + if err != nil { + utilruntime.HandleError(fmt.Errorf("couldn't get resource list for %v: %v", groupVersion, err)) + } + cachedVal = &cacheEntry{r, err} + d.groupToServerResources[groupVersion] = cachedVal + } + + return cachedVal.resourceList, cachedVal.err } // ServerResources returns the supported resources for all groups and versions. 
@@ -72,10 +123,12 @@ func (d *memCacheClient) ServerResources() ([]*metav1.APIResourceList, error) { } func (d *memCacheClient) ServerGroups() (*metav1.APIGroupList, error) { - d.lock.RLock() - defer d.lock.RUnlock() - if d.groupList == nil { - return nil, ErrCacheEmpty + d.lock.Lock() + defer d.lock.Unlock() + if !d.cacheValid { + if err := d.refreshLocked(); err != nil { + return nil, err + } } return d.groupList, nil } @@ -103,49 +156,59 @@ func (d *memCacheClient) OpenAPISchema() (*openapi_v2.Document, error) { func (d *memCacheClient) Fresh() bool { d.lock.RLock() defer d.lock.RUnlock() - // Fresh is supposed to tell the caller whether or not to retry if the cache - // fails to find something. The idea here is that Invalidate will be called - // periodically and therefore we'll always be returning the latest data. (And - // in the future we can watch and stay even more up-to-date.) So we only - // return false if the cache has never been filled. + // Return whether the cache is populated at all. It is still possible that + // a single entry is missing due to transient errors and the attempt to read + // that entry will trigger retry. return d.cacheValid } -// Invalidate refreshes the cache, blocking calls until the cache has been -// refreshed. It would be trivial to make a version that does this in the -// background while continuing to respond to requests if needed. +// Invalidate enforces that no cached data that is older than the current time +// is used. func (d *memCacheClient) Invalidate() { d.lock.Lock() defer d.lock.Unlock() + d.cacheValid = false + d.groupToServerResources = nil + d.groupList = nil +} +// refreshLocked refreshes the state of cache. The caller must hold d.lock for +// writing. +func (d *memCacheClient) refreshLocked() error { // TODO: Could this multiplicative set of calls be replaced by a single call // to ServerResources? 
If it's possible for more than one resulting // APIResourceList to have the same GroupVersion, the lists would need merged. gl, err := d.delegate.ServerGroups() if err != nil || len(gl.Groups) == 0 { - utilruntime.HandleError(fmt.Errorf("couldn't get current server API group list; will keep using cached value. (%v)", err)) - return + utilruntime.HandleError(fmt.Errorf("couldn't get current server API group list: %v", err)) + return err } - rl := map[string]*metav1.APIResourceList{} + rl := map[string]*cacheEntry{} for _, g := range gl.Groups { for _, v := range g.Versions { - r, err := d.delegate.ServerResourcesForGroupVersion(v.GroupVersion) - if err != nil || len(r.APIResources) == 0 { + r, err := d.serverResourcesForGroupVersion(v.GroupVersion) + rl[v.GroupVersion] = &cacheEntry{r, err} + if err != nil { utilruntime.HandleError(fmt.Errorf("couldn't get resource list for %v: %v", v.GroupVersion, err)) - if cur, ok := d.groupToServerResources[v.GroupVersion]; ok { - // retain the existing list, if we had it. 
- r = cur - } else { - continue - } } - rl[v.GroupVersion] = r } } d.groupToServerResources, d.groupList = rl, gl d.cacheValid = true + return nil +} + +func (d *memCacheClient) serverResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) { + r, err := d.delegate.ServerResourcesForGroupVersion(groupVersion) + if err != nil { + return r, err + } + if len(r.APIResources) == 0 { + return r, fmt.Errorf("Got empty response for: %v", groupVersion) + } + return r, nil } // NewMemCacheClient creates a new CachedDiscoveryInterface which caches @@ -156,6 +219,6 @@ func (d *memCacheClient) Invalidate() { func NewMemCacheClient(delegate discovery.DiscoveryInterface) discovery.CachedDiscoveryInterface { return &memCacheClient{ delegate: delegate, - groupToServerResources: map[string]*metav1.APIResourceList{}, + groupToServerResources: map[string]*cacheEntry{}, } } diff --git a/discovery/cached/memcache_test.go b/discovery/cached/memcache_test.go index a9caa6e83d..680c68d896 100644 --- a/discovery/cached/memcache_test.go +++ b/discovery/cached/memcache_test.go @@ -18,27 +18,35 @@ package cached import ( "errors" + "net/http" "reflect" "sync" "testing" + errorsutil "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/discovery/fake" ) +type resourceMapEntry struct { + list *metav1.APIResourceList + err error +} + type fakeDiscovery struct { *fake.FakeDiscovery - lock sync.Mutex - groupList *metav1.APIGroupList - resourceMap map[string]*metav1.APIResourceList + lock sync.Mutex + groupList *metav1.APIGroupList + groupListErr error + resourceMap map[string]*resourceMapEntry } func (c *fakeDiscovery) ServerResourcesForGroupVersion(groupVersion string) (*metav1.APIResourceList, error) { c.lock.Lock() defer c.lock.Unlock() if rl, ok := c.resourceMap[groupVersion]; ok { - return rl, nil + return rl.list, rl.err } return nil, errors.New("doesn't exist") } @@ -49,7 +57,7 @@ func (c *fakeDiscovery) ServerGroups() 
(*metav1.APIGroupList, error) { if c.groupList == nil { return nil, errors.New("doesn't exist") } - return c.groupList, nil + return c.groupList, c.groupListErr } func TestClient(t *testing.T) { @@ -63,33 +71,37 @@ func TestClient(t *testing.T) { }}, }}, }, - resourceMap: map[string]*metav1.APIResourceList{ + resourceMap: map[string]*resourceMapEntry{ "astronomy/v8beta1": { - GroupVersion: "astronomy/v8beta1", - APIResources: []metav1.APIResource{{ - Name: "dwarfplanets", - SingularName: "dwarfplanet", - Namespaced: true, - Kind: "DwarfPlanet", - ShortNames: []string{"dp"}, - }}, + list: &metav1.APIResourceList{ + GroupVersion: "astronomy/v8beta1", + APIResources: []metav1.APIResource{{ + Name: "dwarfplanets", + SingularName: "dwarfplanet", + Namespaced: true, + Kind: "DwarfPlanet", + ShortNames: []string{"dp"}, + }}, + }, }, }, } c := NewMemCacheClient(fake) - g, err := c.ServerGroups() - if err == nil { - t.Errorf("Unexpected non-error.") - } if c.Fresh() { t.Errorf("Expected not fresh.") } - - c.Invalidate() + g, err := c.ServerGroups() + if err != nil { + t.Errorf("Unexpected error: %v", err) + } if !c.Fresh() { t.Errorf("Expected fresh.") } + c.Invalidate() + if c.Fresh() { + t.Errorf("Expected not fresh.") + } g, err = c.ServerGroups() if err != nil { @@ -98,35 +110,268 @@ func TestClient(t *testing.T) { if e, a := fake.groupList, g; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } + if !c.Fresh() { + t.Errorf("Expected fresh.") + } r, err := c.ServerResourcesForGroupVersion("astronomy/v8beta1") if err != nil { t.Errorf("Unexpected error: %v", err) } - if e, a := fake.resourceMap["astronomy/v8beta1"], r; !reflect.DeepEqual(e, a) { + if e, a := fake.resourceMap["astronomy/v8beta1"].list, r; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } fake.lock.Lock() - fake.resourceMap = map[string]*metav1.APIResourceList{ + fake.resourceMap = map[string]*resourceMapEntry{ "astronomy/v8beta1": { + list: &metav1.APIResourceList{ + 
GroupVersion: "astronomy/v8beta1", + APIResources: []metav1.APIResource{{ + Name: "stars", + SingularName: "star", + Namespaced: true, + Kind: "Star", + ShortNames: []string{"s"}, + }}, + }, + }, + } + fake.lock.Unlock() + + c.Invalidate() + r, err = c.ServerResourcesForGroupVersion("astronomy/v8beta1") + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if e, a := fake.resourceMap["astronomy/v8beta1"].list, r; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } +} + +func TestServerGroupsFails(t *testing.T) { + fake := &fakeDiscovery{ + groupList: &metav1.APIGroupList{ + Groups: []metav1.APIGroup{{ + Name: "astronomy", + Versions: []metav1.GroupVersionForDiscovery{{ + GroupVersion: "astronomy/v8beta1", + Version: "v8beta1", + }}, + }}, + }, + groupListErr: errors.New("some error"), + resourceMap: map[string]*resourceMapEntry{ + "astronomy/v8beta1": { + list: &metav1.APIResourceList{ + GroupVersion: "astronomy/v8beta1", + APIResources: []metav1.APIResource{{ + Name: "dwarfplanets", + SingularName: "dwarfplanet", + Namespaced: true, + Kind: "DwarfPlanet", + ShortNames: []string{"dp"}, + }}, + }, + }, + }, + } + + c := NewMemCacheClient(fake) + if c.Fresh() { + t.Errorf("Expected not fresh.") + } + _, err := c.ServerGroups() + if err == nil { + t.Errorf("Expected error") + } + if c.Fresh() { + t.Errorf("Expected not fresh.") + } + fake.lock.Lock() + fake.groupListErr = nil + fake.lock.Unlock() + r, err := c.ServerResourcesForGroupVersion("astronomy/v8beta1") + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if e, a := fake.resourceMap["astronomy/v8beta1"].list, r; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + if !c.Fresh() { + t.Errorf("Expected not fresh.") + } +} + +func TestPartialPermanentFailure(t *testing.T) { + fake := &fakeDiscovery{ + groupList: &metav1.APIGroupList{ + Groups: []metav1.APIGroup{{ + Name: "astronomy", + Versions: []metav1.GroupVersionForDiscovery{{ + GroupVersion: 
"astronomy/v8beta1", + Version: "v8beta1", + }, { + GroupVersion: "astronomy2/v8beta1", + Version: "v8beta1", + }}, + }}, + }, + resourceMap: map[string]*resourceMapEntry{ + "astronomy/v8beta1": { + err: errors.New("some permanent error"), + }, + "astronomy2/v8beta1": { + list: &metav1.APIResourceList{ + GroupVersion: "astronomy2/v8beta1", + APIResources: []metav1.APIResource{{ + Name: "dwarfplanets", + SingularName: "dwarfplanet", + Namespaced: true, + Kind: "DwarfPlanet", + ShortNames: []string{"dp"}, + }}, + }, + }, + }, + } + + c := NewMemCacheClient(fake) + if c.Fresh() { + t.Errorf("Expected not fresh.") + } + r, err := c.ServerResourcesForGroupVersion("astronomy2/v8beta1") + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if e, a := fake.resourceMap["astronomy2/v8beta1"].list, r; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + _, err = c.ServerResourcesForGroupVersion("astronomy/v8beta1") + if err == nil { + t.Errorf("Expected error, got nil") + } + + fake.lock.Lock() + fake.resourceMap["astronomy/v8beta1"] = &resourceMapEntry{ + list: &metav1.APIResourceList{ GroupVersion: "astronomy/v8beta1", APIResources: []metav1.APIResource{{ - Name: "stars", - SingularName: "star", + Name: "dwarfplanets", + SingularName: "dwarfplanet", Namespaced: true, - Kind: "Star", - ShortNames: []string{"s"}, + Kind: "DwarfPlanet", + ShortNames: []string{"dp"}, }}, }, + err: nil, } fake.lock.Unlock() - + // We don't retry permanent errors, so it should fail. + _, err = c.ServerResourcesForGroupVersion("astronomy/v8beta1") + if err == nil { + t.Errorf("Expected error, got nil") + } c.Invalidate() + + // After Invalidate, we should retry. 
r, err = c.ServerResourcesForGroupVersion("astronomy/v8beta1") if err != nil { t.Errorf("Unexpected error: %v", err) } - if e, a := fake.resourceMap["astronomy/v8beta1"], r; !reflect.DeepEqual(e, a) { + if e, a := fake.resourceMap["astronomy/v8beta1"].list, r; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } +} + +func TestPartialRetryableFailure(t *testing.T) { + fake := &fakeDiscovery{ + groupList: &metav1.APIGroupList{ + Groups: []metav1.APIGroup{{ + Name: "astronomy", + Versions: []metav1.GroupVersionForDiscovery{{ + GroupVersion: "astronomy/v8beta1", + Version: "v8beta1", + }, { + GroupVersion: "astronomy2/v8beta1", + Version: "v8beta1", + }}, + }}, + }, + resourceMap: map[string]*resourceMapEntry{ + "astronomy/v8beta1": { + err: &errorsutil.StatusError{ + ErrStatus: metav1.Status{ + Message: "Some retryable error", + Code: int32(http.StatusServiceUnavailable), + Reason: metav1.StatusReasonServiceUnavailable, + }, + }, + }, + "astronomy2/v8beta1": { + list: &metav1.APIResourceList{ + GroupVersion: "astronomy2/v8beta1", + APIResources: []metav1.APIResource{{ + Name: "dwarfplanets", + SingularName: "dwarfplanet", + Namespaced: true, + Kind: "DwarfPlanet", + ShortNames: []string{"dp"}, + }}, + }, + }, + }, + } + + c := NewMemCacheClient(fake) + if c.Fresh() { + t.Errorf("Expected not fresh.") + } + r, err := c.ServerResourcesForGroupVersion("astronomy2/v8beta1") + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + if e, a := fake.resourceMap["astronomy2/v8beta1"].list, r; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + _, err = c.ServerResourcesForGroupVersion("astronomy/v8beta1") + if err == nil { + t.Errorf("Expected error, got nil") + } + + fake.lock.Lock() + fake.resourceMap["astronomy/v8beta1"] = &resourceMapEntry{ + list: &metav1.APIResourceList{ + GroupVersion: "astronomy/v8beta1", + APIResources: []metav1.APIResource{{ + Name: "dwarfplanets", + SingularName: "dwarfplanet", + Namespaced: true, + 
Kind: "DwarfPlanet", + ShortNames: []string{"dp"}, + }}, + }, + err: nil, + } + fake.lock.Unlock() + // We should retry a retryable error even without Invalidate() being called, + // so no error is expected. + r, err = c.ServerResourcesForGroupVersion("astronomy/v8beta1") + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + if e, a := fake.resourceMap["astronomy/v8beta1"].list, r; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %#v", e, a) + } + + // Check that the last result was cached and we don't retry further. + fake.lock.Lock() + fake.resourceMap["astronomy/v8beta1"].err = errors.New("some permanent error") + fake.lock.Unlock() + r, err = c.ServerResourcesForGroupVersion("astronomy/v8beta1") + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + if e, a := fake.resourceMap["astronomy/v8beta1"].list, r; !reflect.DeepEqual(e, a) { t.Errorf("Expected %#v, got %#v", e, a) } } diff --git a/discovery/discovery_client.go b/discovery/discovery_client.go index 17b39de053..52f67af1de 100644 --- a/discovery/discovery_client.go +++ b/discovery/discovery_client.go @@ -26,7 +26,7 @@ import ( "time" "github.com/golang/protobuf/proto" - "github.com/googleapis/gnostic/OpenAPIv2" + openapi_v2 "github.com/googleapis/gnostic/OpenAPIv2" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -60,6 +60,9 @@ type DiscoveryInterface interface { } // CachedDiscoveryInterface is a DiscoveryInterface with cache invalidation and freshness. +// Note that if the ServerResourcesForGroupVersion method returns a cache miss +// error, the user needs to explicitly call Invalidate to clear the cache, +// otherwise the same cache miss error will be returned next time.
type CachedDiscoveryInterface interface { DiscoveryInterface // Fresh is supposed to tell the caller whether or not to retry if the cache @@ -68,7 +71,8 @@ type CachedDiscoveryInterface interface { // TODO: this needs to be revisited, this interface can't be locked properly // and doesn't make a lot of sense. Fresh() bool - // Invalidate enforces that no cached data is used in the future that is older than the current time. + // Invalidate enforces that no cached data that is older than the current time + // is used. Invalidate() }