Skip to content

Commit

Permalink
fix querying local empty list error
Browse files Browse the repository at this point in the history
  • Loading branch information
qclc committed Jun 1, 2021
1 parent d23036b commit 80f4a33
Show file tree
Hide file tree
Showing 6 changed files with 580 additions and 43 deletions.
38 changes: 31 additions & 7 deletions pkg/yurthub/cachemanager/cache_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,16 @@ type CacheManager interface {
UpdateCacheAgents(agents []string) error
ListCacheAgents() []string
CanCacheFor(req *http.Request) bool
UpdateKind(gvk schema.GroupVersionKind) error
DeleteKindFor(gvr schema.GroupVersionResource) error
ListCacheCRD() map[schema.GroupVersionResource]schema.GroupVersionKind
}

type cacheManager struct {
sync.RWMutex
storage StorageWrapper
serializerManager *serializer.SerializerManager
restMapperManager *serializer.RESTMapperManager
cacheAgents map[string]bool
listSelectorCollector map[string]string
}
Expand All @@ -74,6 +78,7 @@ func NewCacheManager(
listSelectorCollector: make(map[string]string),
}

cm.initRESTMapperManager()
err := cm.initCacheAgents()
if err != nil {
return nil, err
Expand Down Expand Up @@ -152,13 +157,19 @@ func (cm *cacheManager) queryListObject(req *http.Request) (runtime.Object, erro
if err != nil {
return nil, err
} else if len(objs) == 0 {
gvk, err = serializer.UnsafeDefaultRESTMapper.KindFor(schema.GroupVersionResource{
gvr := schema.GroupVersionResource{
Group: info.APIGroup,
Version: info.APIVersion,
Resource: info.Resource,
})
if err != nil {
return nil, err
}
var isScheme, isCustom bool
isScheme, gvk = cm.restMapperManager.IsSchemeResource(gvr)
if !isScheme {
isCustom, gvk = cm.restMapperManager.IsCustomResource(gvr)
if !isCustom {
// Unrecognized gvr are treated as unregistered resource, and 404 code(not found) will be returned
return nil, fmt.Errorf("no matches for %v", gvr)
}
}
kind = gvk.Kind
} else {
Expand Down Expand Up @@ -346,10 +357,17 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req
apiVersion := schema.GroupVersion{
Group: info.APIGroup,
Version: info.APIVersion,
}.String()
}
accessor := meta.NewAccessor()

comp, _ := util.ClientComponentFrom(ctx)

// Verify if DynamicRESTMapper(which store the CRD information) needs to be updated
gvk := apiVersion.WithKind(kind)
if err = cm.UpdateKind(gvk); err != nil {
klog.Errorf("failed to update the DynamicRESTMapper %v", err)
}

// even if no objects in cloud cluster, we need to
// make up a storage that represents the no resources
// in local disk, so when cloud-edge network disconnected,
Expand All @@ -361,7 +379,7 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req
} else if info.Name != "" && len(items) == 1 {
// list with fieldSelector=metadata.name=xxx
accessor.SetKind(items[0], kind)
accessor.SetAPIVersion(items[0], apiVersion)
accessor.SetAPIVersion(items[0], apiVersion.String())
name, _ := accessor.Name(items[0])
ns, _ := accessor.Namespace(items[0])
if ns == "" {
Expand All @@ -381,7 +399,7 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req
objs := make(map[string]runtime.Object)
for i := range items {
accessor.SetKind(items[i], kind)
accessor.SetAPIVersion(items[i], apiVersion)
accessor.SetAPIVersion(items[i], apiVersion.String())
name, _ := accessor.Name(items[i])
ns, _ := accessor.Namespace(items[i])
if ns == "" {
Expand Down Expand Up @@ -455,6 +473,12 @@ func (cm *cacheManager) saveOneObjectWithValidation(key string, obj runtime.Obje
return fmt.Errorf("pod(%s/%s) is not assigned to a node, skip cache it.", ns, name)
}

// Verify if DynamicRESTMapper(which store the CRD info) needs to be updated
gvk := obj.GetObjectKind().GroupVersionKind()
if err := cm.UpdateKind(gvk); err != nil {
klog.Errorf("failed to update the DynamicRESTMapper %v", err)
}

oldObj, err := cm.storage.Get(key)
if err == nil && oldObj != nil {
oldRv, err := accessor.ResourceVersion(oldObj)
Expand Down
145 changes: 123 additions & 22 deletions pkg/yurthub/cachemanager/cache_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/endpoints/filters"
)
Expand Down Expand Up @@ -424,6 +425,7 @@ func TestCacheGetResponse(t *testing.T) {
resolver := newTestRequestInfoResolver()
for k, tt := range testcases {
t.Run(k, func(t *testing.T) {
yurtCM.initRESTMapperManager()
s := serializerM.CreateSerializer(tt.accept, tt.group, tt.version, tt.resource)
encoder, err := s.Encoder(tt.accept, nil)
if err != nil {
Expand Down Expand Up @@ -506,6 +508,10 @@ func TestCacheGetResponse(t *testing.T) {
if err != nil {
t.Errorf("failed to delete collection: kubelet, %v", err)
}
err = sWrapper.Delete(cacheDynamicRESTMapperKey)
if err != nil {
t.Errorf("failed to delete Cached DynamicRESTMapper, %v", err)
}
})
}
}
Expand Down Expand Up @@ -766,6 +772,7 @@ func TestCacheWatchResponse(t *testing.T) {
resolver := newTestRequestInfoResolver()
for k, tt := range testcases {
t.Run(k, func(t *testing.T) {
yurtCM.initRESTMapperManager()
s := serializerM.CreateSerializer(tt.accept, tt.group, tt.version, tt.resource)
r, w := io.Pipe()
go func(w *io.PipeWriter) {
Expand Down Expand Up @@ -828,6 +835,10 @@ func TestCacheWatchResponse(t *testing.T) {
if err != nil {
t.Errorf("failed to delete collection: kubelet, %v", err)
}
err = sWrapper.Delete(cacheDynamicRESTMapperKey)
if err != nil {
t.Errorf("failed to delete Cached DynamicRESTMapper, %v", err)
}
})
}
}
Expand Down Expand Up @@ -1213,11 +1224,43 @@ func TestCacheListResponse(t *testing.T) {
},
},
},
"list foos with no objects": {
group: "samplecontroller.k8s.io",
version: "v1",
key: "kubelet/foos",
inputObj: runtime.Object(
&unstructured.UnstructuredList{
Object: map[string]interface{}{
"apiVersion": "samplecontroller.k8s.io/v1",
"kind": "FooList",
"metadata": map[string]interface{}{
"continue": "",
"resourceVersion": "2",
"selfLink": "/apis/samplecontroller.k8s.io/v1/foos",
},
},
Items: []unstructured.Unstructured{},
},
),
userAgent: "kubelet",
accept: "application/json",
verb: "GET",
path: "/apis/samplecontroller.k8s.io/v1/foos",
resource: "foos",
namespaced: false,
expectResult: struct {
err bool
data map[string]struct{}
}{
data: map[string]struct{}{},
},
},
}

resolver := newTestRequestInfoResolver()
for k, tt := range testcases {
t.Run(k, func(t *testing.T) {
yurtCM.initRESTMapperManager()
s := serializerM.CreateSerializer(tt.accept, tt.group, tt.version, tt.resource)
encoder, err := s.Encoder(tt.accept, nil)
if err != nil {
Expand Down Expand Up @@ -1252,7 +1295,6 @@ func TestCacheListResponse(t *testing.T) {
handler = proxyutil.WithRequestClientComponent(handler)
handler = filters.WithRequestInfo(handler, resolver)
handler.ServeHTTP(httptest.NewRecorder(), req)

if tt.expectResult.err {
if err == nil {
t.Error("Got no error, but expect err")
Expand All @@ -1261,8 +1303,26 @@ func TestCacheListResponse(t *testing.T) {
if err != nil {
t.Errorf("Got error %v", err)
}

objs, err := sWrapper.List(tt.key)
if len(objs) == 0 {
originGVR := schema.GroupVersionResource{
Group: tt.group,
Version: tt.version,
Resource: tt.resource,
}
if isScheme, _ := yurtCM.restMapperManager.IsSchemeResource(originGVR); isScheme {
if isCustom, _ := yurtCM.restMapperManager.IsCustomResource(originGVR); isCustom {
t.Errorf("%s, is a registered resource but stored in the CRDRESTMapper",
originGVR.String())
}
} else {
if isCustom, _ := yurtCM.restMapperManager.IsCustomResource(originGVR); !isCustom {
t.Errorf("%s, is not stored in the DynamicRESTMapper",
originGVR.String())
}
}
}

if err != nil {
if err != tt.cacheErr {
t.Errorf("expect error %v, but got %v", tt.cacheErr, err)
Expand All @@ -1277,6 +1337,10 @@ func TestCacheListResponse(t *testing.T) {
if err != nil {
t.Errorf("failed to delete collection: kubelet, %v", err)
}
err = sWrapper.Delete(cacheDynamicRESTMapperKey)
if err != nil {
t.Errorf("failed to delete cached DynamicRESTMapper, %v", err)
}
})
}
}
Expand Down Expand Up @@ -1746,6 +1810,7 @@ func TestQueryCacheForGet(t *testing.T) {
resolver := newTestRequestInfoResolver()
for k, tt := range testcases {
t.Run(k, func(t *testing.T) {
yurtCM.initRESTMapperManager()
_ = sWrapper.Create(tt.key, tt.inputObj)
req, _ := http.NewRequest(tt.verb, tt.path, nil)
if len(tt.userAgent) != 0 {
Expand Down Expand Up @@ -1807,6 +1872,10 @@ func TestQueryCacheForGet(t *testing.T) {
if err != nil {
t.Errorf("failed to delete collection: kubelet, %v", err)
}
err = sWrapper.Delete(cacheDynamicRESTMapperKey)
if err != nil {
t.Errorf("failed to delete Cached DynamicRESTMapper, %v", err)
}
})
}
}
Expand All @@ -1827,6 +1896,7 @@ func TestQueryCacheForList(t *testing.T) {
testcases := map[string]struct {
keyPrefix string
noObjs bool
cachedKind string
inputObj []runtime.Object
userAgent string
accept string
Expand Down Expand Up @@ -1971,6 +2041,39 @@ func TestQueryCacheForList(t *testing.T) {
},
},
},
"list runtimeclass": {
keyPrefix: "kubelet/runtimeclasses",
noObjs: true,
userAgent: "kubelet",
accept: "application/json",
verb: "GET",
path: "/apis/node.k8s.io/v1beta1/runtimeclasses",
namespaced: false,
expectResult: struct {
err bool
rv string
data map[string]struct{}
}{
data: map[string]struct{}{},
},
},
"list pods and no pods in cache": {
keyPrefix: "kubelet/pods",
noObjs: true,
userAgent: "kubelet",
accept: "application/json",
verb: "GET",
path: "/api/v1/pods",
namespaced: false,
expectResult: struct {
err bool
rv string
data map[string]struct{}
}{
err: true,
},
queryErr: storage.ErrStorageNotFound,
},

//used to test whether the query local Custom Resource list request can be handled correctly
"list crontabs": {
Expand Down Expand Up @@ -2080,13 +2183,14 @@ func TestQueryCacheForList(t *testing.T) {
},
},
},
"list runtimeclass": {
keyPrefix: "kubelet/runtimeclasses",
"list foos with no objs": {
keyPrefix: "kubelet/foos",
noObjs: true,
cachedKind: "samplecontroller.k8s.io/v1/Foo",
userAgent: "kubelet",
accept: "application/json",
verb: "GET",
path: "/apis/node.k8s.io/v1beta1/runtimeclasses",
path: "/apis/samplecontroller.k8s.io/v1/foos",
namespaced: false,
expectResult: struct {
err bool
Expand All @@ -2096,23 +2200,6 @@ func TestQueryCacheForList(t *testing.T) {
data: map[string]struct{}{},
},
},
"list pods and no pods in cache": {
keyPrefix: "kubelet/pods",
noObjs: true,
userAgent: "kubelet",
accept: "application/json",
verb: "GET",
path: "/api/v1/pods",
namespaced: false,
expectResult: struct {
err bool
rv string
data map[string]struct{}
}{
err: true,
},
queryErr: storage.ErrStorageNotFound,
},
"list resources not exist": {
userAgent: "kubelet",
accept: "application/json",
Expand All @@ -2134,6 +2221,7 @@ func TestQueryCacheForList(t *testing.T) {
resolver := newTestRequestInfoResolver()
for k, tt := range testcases {
t.Run(k, func(t *testing.T) {
yurtCM.initRESTMapperManager()
for i := range tt.inputObj {
v, _ := accessor.Name(tt.inputObj[i])
key := filepath.Join(tt.keyPrefix, v)
Expand All @@ -2142,6 +2230,15 @@ func TestQueryCacheForList(t *testing.T) {

if tt.noObjs {
_ = sWrapper.Create(tt.keyPrefix, nil)
if tt.cachedKind != "" {
info := strings.Split(tt.cachedKind, sepForGVR)
gvk := schema.GroupVersionKind{
Group: info[0],
Version: info[1],
Kind: info[2],
}
_ = yurtCM.UpdateKind(gvk)
}
}

req, _ := http.NewRequest(tt.verb, tt.path, nil)
Expand Down Expand Up @@ -2197,6 +2294,10 @@ func TestQueryCacheForList(t *testing.T) {
if err != nil {
t.Errorf("failed to delete collection: kubelet, %v", err)
}
err = sWrapper.Delete(cacheDynamicRESTMapperKey)
if err != nil {
t.Errorf("failed to delete cached DynamicRESTMapper, %v", err)
}
})
}
}
Expand Down
Loading

0 comments on commit 80f4a33

Please sign in to comment.