Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: fix querying local empty list error #326

Merged
merged 1 commit into from
Aug 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/openyurtio/openyurt/pkg/yurthub/filter"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/masterservice"
"github.com/openyurtio/openyurt/pkg/yurthub/filter/servicetopology"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/storage/factory"
yurtclientset "github.com/openyurtio/yurt-app-manager-api/pkg/yurtappmanager/client/clientset/versioned"
Expand Down Expand Up @@ -68,6 +69,7 @@ type YurtHubConfiguration struct {
HubAgentDummyIfName string
StorageWrapper cachemanager.StorageWrapper
SerializerManager *serializer.SerializerManager
RESTMapperManager *meta.RESTMapperManager
TLSConfig *tls.Config
MutatedMasterServiceAddr string
Filters *filter.Filters
Expand All @@ -89,6 +91,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
}
storageWrapper := cachemanager.NewStorageWrapper(storageManager)
serializerManager := serializer.NewSerializerManager()
restMapperManager := meta.NewRESTMapperManager(storageManager)

hubServerAddr := net.JoinHostPort(options.YurtHubHost, options.YurtHubPort)
proxyServerAddr := net.JoinHostPort(options.YurtHubHost, options.YurtHubProxyPort)
Expand Down Expand Up @@ -141,6 +144,7 @@ func Complete(options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
HubAgentDummyIfName: options.HubAgentDummyIfName,
StorageWrapper: storageWrapper,
SerializerManager: serializerManager,
RESTMapperManager: restMapperManager,
MutatedMasterServiceAddr: mutatedMasterServiceAddr,
Filters: filters,
SharedFactory: sharedFactory,
Expand Down
2 changes: 1 addition & 1 deletion cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func Run(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) error {
trace++

klog.Infof("%d. new cache manager with storage wrapper and serializer manager", trace)
cacheMgr, err := cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager)
cacheMgr, err := cachemanager.NewCacheManager(cfg.StorageWrapper, cfg.SerializerManager, cfg.RESTMapperManager)
if err != nil {
klog.Errorf("could not new cache manager, %v", err)
return err
Expand Down
6 changes: 3 additions & 3 deletions pkg/yurthub/cachemanager/cache_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestInitCacheAgents(t *testing.T) {
t.Errorf("failed to create disk storage, %v", err)
}
s := NewStorageWrapper(dStorage)
m, _ := NewCacheManager(s, nil)
m, _ := NewCacheManager(s, nil, nil)

// default cache agents in fake store
b, err := s.GetRaw(cacheAgentsKey)
Expand All @@ -52,7 +52,7 @@ func TestInitCacheAgents(t *testing.T) {
// add agents for next init cache
_ = m.UpdateCacheAgents([]string{"agent1"})

_, _ = NewCacheManager(s, nil)
_, _ = NewCacheManager(s, nil, nil)

b2, err := s.GetRaw(cacheAgentsKey)
if err != nil {
Expand Down Expand Up @@ -81,7 +81,7 @@ func TestUpdateCacheAgents(t *testing.T) {
t.Errorf("failed to create disk storage, %v", err)
}
s := NewStorageWrapper(dStorage)
m, _ := NewCacheManager(s, nil)
m, _ := NewCacheManager(s, nil, nil)

testcases := map[string]struct {
desc string
Expand Down
72 changes: 51 additions & 21 deletions pkg/yurthub/cachemanager/cache_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"strings"
"sync"

hubmeta "github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/meta"
"github.com/openyurtio/openyurt/pkg/yurthub/kubernetes/serializer"
"github.com/openyurtio/openyurt/pkg/yurthub/storage"
"github.com/openyurtio/openyurt/pkg/yurthub/util"
Expand All @@ -52,12 +53,14 @@ type CacheManager interface {
UpdateCacheAgents(agents []string) error
ListCacheAgents() []string
CanCacheFor(req *http.Request) bool
DeleteKindFor(gvr schema.GroupVersionResource) error
}

type cacheManager struct {
sync.RWMutex
storage StorageWrapper
serializerManager *serializer.SerializerManager
restMapperManager *hubmeta.RESTMapperManager
cacheAgents map[string]bool
listSelectorCollector map[string]string
}
Expand All @@ -66,10 +69,12 @@ type cacheManager struct {
func NewCacheManager(
storage StorageWrapper,
serializerMgr *serializer.SerializerManager,
restMapperMgr *hubmeta.RESTMapperManager,
) (CacheManager, error) {
cm := &cacheManager{
storage: storage,
serializerManager: serializerMgr,
restMapperManager: restMapperMgr,
cacheAgents: make(map[string]bool),
listSelectorCollector: make(map[string]string),
}
Expand Down Expand Up @@ -148,21 +153,38 @@ func (cm *cacheManager) queryListObject(req *http.Request) (runtime.Object, erro

var gvk schema.GroupVersionKind
var kind string
// If the GVR information is not recognized, return 404 not found directly
gvr := schema.GroupVersionResource{
Group: info.APIGroup,
Version: info.APIVersion,
Resource: info.Resource,
}
if _, gvk = cm.restMapperManager.KindFor(gvr); gvk.Empty() {
return nil, hubmeta.ErrGVRNotRecognized
} else {
kind = gvk.Kind
}

// If the GVR information is recognized, return list or empty list
objs, err := cm.storage.List(key)
if err != nil {
return nil, err
} else if len(objs) == 0 {
gvk, err = serializer.UnsafeDefaultRESTMapper.KindFor(schema.GroupVersionResource{
Group: info.APIGroup,
Version: info.APIVersion,
Resource: info.Resource,
})
if err != nil {
if err != storage.ErrStorageNotFound {
return nil, err
} else if isPodKey(key) {
// because at least there will be yurt-hub pod on the node.
// if no pods in cache, maybe all of pods have been deleted by accident,
// if empty object is returned, pods on node will be deleted by kubelet.
// in order to prevent the influence to business, return error here so pods
// will be kept on node.
return nil, err
}
kind = gvk.Kind
} else {
kind = objs[0].GetObjectKind().GroupVersionKind().Kind
} else if len(objs) != 0 {
// If restMapper's kind and object's kind are inconsistent, use the object's kind
objKind := objs[0].GetObjectKind().GroupVersionKind().Kind
if kind != objKind {
klog.Warningf("The restMapper's kind(%v) and object's kind(%v) are inconsistent ", kind, objKind)
kind = objKind
}
}

var listObj runtime.Object
Expand Down Expand Up @@ -348,16 +370,13 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req
}.String()
accessor := meta.NewAccessor()

// Verify if DynamicRESTMapper(which store the CRD info) needs to be updated
if err := cm.restMapperManager.UpdateKind(schema.GroupVersionKind{Group: info.APIGroup, Version: info.APIVersion, Kind: kind}); err != nil {
klog.Errorf("failed to update the DynamicRESTMapper %v", err)
}

comp, _ := util.ClientComponentFrom(ctx)
// even if no objects in cloud cluster, we need to
// make up a storage that represents the no resources
// in local disk, so when cloud-edge network disconnected,
// yurthub can return empty objects instead of 404 code(not found)
if len(items) == 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's need to keep len(items)==0 for empty item list object.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for reminding. The PR submitted recently recovered this situation.

// list returns no objects
key, _ := util.KeyFunc(comp, info.Resource, info.Namespace, "")
return cm.storage.Create(key, nil)
} else if info.Name != "" && len(items) == 1 {
if info.Name != "" && len(items) == 1 {
// list with fieldSelector=metadata.name=xxx
accessor.SetKind(items[0], kind)
accessor.SetAPIVersion(items[0], apiVersion)
Expand Down Expand Up @@ -390,7 +409,7 @@ func (cm *cacheManager) saveListObject(ctx context.Context, info *apirequest.Req
key, _ := util.KeyFunc(comp, info.Resource, ns, name)
objs[key] = items[i]
}

// if no objects in cloud cluster(objs is empty), it will clean the old files in the path of rootkey
return cm.storage.Replace(rootKey, objs)
}
}
Expand Down Expand Up @@ -436,6 +455,12 @@ func (cm *cacheManager) saveOneObject(ctx context.Context, info *apirequest.Requ
return err
}

// Verify if DynamicRESTMapper(which store the CRD info) needs to be updated
gvk := obj.GetObjectKind().GroupVersionKind()
if err := cm.restMapperManager.UpdateKind(gvk); err != nil {
klog.Errorf("failed to update the DynamicRESTMapper %v", err)
}

if err := cm.saveOneObjectWithValidation(key, obj); err != nil {
if err != storage.ErrStorageAccessConflict {
return err
Expand Down Expand Up @@ -600,3 +625,8 @@ func (cm *cacheManager) CanCacheFor(req *http.Request) bool {
}
return true
}

// DeleteKindFor is used to delete the invalid Kind(which is not registered in the cloud)
func (cm *cacheManager) DeleteKindFor(gvr schema.GroupVersionResource) error {
return cm.restMapperManager.DeleteKindFor(gvr)
}
Loading