Skip to content

Commit

Permalink
Merge pull request #353 from aruiz14/v2.0-indexer-by-hash
Browse files Browse the repository at this point in the history
[release-2.0] Use indexer when listing cached types by hash
  • Loading branch information
KevinJoiner authored Jan 11, 2024
2 parents 0ffc258 + f5f2da2 commit 49d04f8
Show file tree
Hide file tree
Showing 4 changed files with 255 additions and 8 deletions.
36 changes: 35 additions & 1 deletion pkg/apply/desiredset.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,24 @@ package apply

import (
"context"
"errors"
"fmt"

"github.com/rancher/wrangler/v2/pkg/apply/injectors"
"github.com/rancher/wrangler/v2/pkg/kv"
"github.com/rancher/wrangler/v2/pkg/merr"
"github.com/rancher/wrangler/v2/pkg/objectset"
"github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/meta"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache"
)

// Indexer name added for cached types
const byHash = "wrangler.byObjectSetHash"

type patchKey struct {
schema.GroupVersionKind
objectset.ObjectKey
Expand Down Expand Up @@ -160,13 +167,40 @@ func (o desiredSet) WithCacheTypes(igs ...InformerGetter) Apply {
}

for _, ig := range igs {
pruneTypes[ig.GroupVersionKind()] = ig.Informer()
informer := ig.Informer()
if err := addIndexerByHash(informer.GetIndexer()); err != nil {
// Ignore repeatedly adding the same indexer for different types
if !errors.Is(err, indexerAlreadyExistsErr) {
logrus.Warnf("Problem adding hash indexer to informer [%s]: %v", ig.GroupVersionKind().Kind, err)
}
}
pruneTypes[ig.GroupVersionKind()] = informer
}

o.pruneTypes = pruneTypes
return o
}

// addIndexerByHash an Informer to index objects by the hash annotation value
func addIndexerByHash(indexer cache.Indexer) error {
if _, alreadyAdded := indexer.GetIndexers()[byHash]; alreadyAdded {
return fmt.Errorf("adding indexer %q: %w", byHash, indexerAlreadyExistsErr)
}
return indexer.AddIndexers(map[string]cache.IndexFunc{
byHash: func(obj interface{}) ([]string, error) {
metadata, err := meta.Accessor(obj)
if err != nil {
return nil, err
}
labels := metadata.GetLabels()
if labels == nil || labels[LabelHash] == "" {
return nil, nil
}
return []string{labels[LabelHash]}, nil
},
})
}

func (o desiredSet) WithPatcher(gvk schema.GroupVersionKind, patcher Patcher) Apply {
patchers := map[schema.GroupVersionKind]Patcher{}
for k, v := range o.patchers {
Expand Down
5 changes: 3 additions & 2 deletions pkg/apply/desiredset_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ var (
LabelName,
LabelNamespace,
}
rls = map[string]flowcontrol.RateLimiter{}
rlsLock sync.Mutex
rls = map[string]flowcontrol.RateLimiter{}
rlsLock sync.Mutex
indexerAlreadyExistsErr = errors.New("an indexer with the same already exists")
)

func (o *desiredSet) getRateLimit(labelHash string) flowcontrol.RateLimiter {
Expand Down
56 changes: 51 additions & 5 deletions pkg/apply/desiredset_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,8 +338,7 @@ func (o *desiredSet) process(debugID string, set labels.Selector, gvk schema.Gro
}
}

func (o *desiredSet) list(namespaced bool, informer cache.SharedIndexInformer, client dynamic.NamespaceableResourceInterface,
selector labels.Selector, desiredObjects objectset.ObjectByKey) (map[objectset.ObjectKey]runtime.Object, error) {
func (o *desiredSet) list(namespaced bool, informer cache.SharedIndexInformer, client dynamic.NamespaceableResourceInterface, selector labels.Selector, desiredObjects objectset.ObjectByKey) (map[objectset.ObjectKey]runtime.Object, error) {
var (
errs []error
objs = objectset.ObjectByKey{}
Expand Down Expand Up @@ -388,12 +387,17 @@ func (o *desiredSet) list(namespaced bool, informer cache.SharedIndexInformer, c
namespace = o.listerNamespace
}

err := cache.ListAllByNamespace(informer.GetIndexer(), namespace, selector, func(obj interface{}) {
// Special case for listing only by hash using indexers
indexer := informer.GetIndexer()
if hash, ok := getIndexableHash(indexer, selector); ok {
return listByHash(indexer, hash, namespace)
}

if err := cache.ListAllByNamespace(indexer, namespace, selector, func(obj interface{}) {
if err := addObjectToMap(objs, obj); err != nil {
errs = append(errs, err)
}
})
if err != nil {
}); err != nil {
errs = append(errs, err)
}

Expand Down Expand Up @@ -494,3 +498,45 @@ func multiNamespaceList(ctx context.Context, namespaces []string, baseClient dyn

return wg.Wait()
}

// getIndexableHash detects if provided selector can be replaced by using the hash index, if configured, in which case returns the hash value
func getIndexableHash(indexer cache.Indexer, selector labels.Selector) (string, bool) {
// Check if indexer was added
if indexer == nil || indexer.GetIndexers()[byHash] == nil {
return "", false
}

// Check specific case of listing with exact hash label selector
if req, selectable := selector.Requirements(); len(req) != 1 || !selectable {
return "", false
}

return selector.RequiresExactMatch(LabelHash)
}

// inNamespace checks whether a given object is a Kubernetes object and is part of the provided namespace
func inNamespace(namespace string, obj interface{}) bool {
metadata, err := meta.Accessor(obj)
return err == nil && metadata.GetNamespace() == namespace
}

// listByHash use a pre-configured indexer to list objects of a certain type by their hash label
func listByHash(indexer cache.Indexer, hash string, namespace string) (map[objectset.ObjectKey]runtime.Object, error) {
var (
errs []error
objs = objectset.ObjectByKey{}
)
res, err := indexer.ByIndex(byHash, hash)
if err != nil {
return nil, err
}
for _, obj := range res {
if namespace != "" && !inNamespace(namespace, obj) {
continue
}
if err := addObjectToMap(objs, obj); err != nil {
errs = append(errs, err)
}
}
return objs, merr.NewErrors(errs...)
}
166 changes: 166 additions & 0 deletions pkg/apply/desiredset_process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,18 @@ import (
"strings"
"testing"

"github.com/rancher/wrangler/v2/pkg/objectset"

"github.com/stretchr/testify/assert"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic/fake"
k8stesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
)

func Test_multiNamespaceList(t *testing.T) {
Expand Down Expand Up @@ -109,3 +114,164 @@ func Test_multiNamespaceList(t *testing.T) {
})
}
}

func Test_getIndexableHash(t *testing.T) {
const hash = "somehash"
hashSelector, err := GetSelector(map[string]string{LabelHash: hash})
if err != nil {
t.Fatal(err)
}
envLabelSelector, err := metav1.LabelSelectorAsSelector(&metav1.LabelSelector{MatchLabels: map[string]string{"env": "dev"}})
if err != nil {
t.Fatal(err)
}

indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{byHash: func(obj interface{}) ([]string, error) {
return nil, nil
}})
type args struct {
indexer cache.Indexer
selector labels.Selector
}
tests := []struct {
name string
args args
wantHash string
want bool
}{
{name: "indexer configured", args: args{
indexer: indexer,
selector: hashSelector,
}, wantHash: hash, want: true},
{name: "indexer not configured", args: args{
indexer: cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}),
selector: hashSelector,
}, wantHash: "", want: false},
{name: "using Everything selector", args: args{
indexer: indexer,
selector: labels.Everything(),
}, wantHash: "", want: false},
{name: "using other label selectors", args: args{
indexer: indexer,
selector: envLabelSelector,
}, wantHash: "", want: false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotHash, got := getIndexableHash(tt.args.indexer, tt.args.selector)
assert.Equalf(t, tt.wantHash, gotHash, "getIndexableHash(%v, %v)", tt.args.indexer, tt.args.selector)
assert.Equalf(t, tt.want, got, "getIndexableHash(%v, %v)", tt.args.indexer, tt.args.selector)
})
}
}

func Test_inNamespace(t *testing.T) {
type args struct {
namespace string
obj interface{}
}
tests := []struct {
name string
args args
want bool
}{
{name: "object in namespace", args: args{
namespace: "ns", obj: &metav1.ObjectMeta{
Namespace: "ns",
},
}, want: true},
{name: "object not in namespace", args: args{
namespace: "ns", obj: &metav1.ObjectMeta{
Namespace: "another-ns",
},
}, want: false},
{name: "object not namespaced", args: args{
namespace: "ns", obj: &corev1.Namespace{},
}, want: false},
{name: "non k8s object", args: args{
namespace: "ns", obj: &struct{}{},
}, want: false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.want, inNamespace(tt.args.namespace, tt.args.obj), "inNamespace(%v, %v)", tt.args.namespace, tt.args.obj)
})
}
}

func Test_listByHash(t *testing.T) {
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
if err := addIndexerByHash(indexer); err != nil {
t.Fatal(err)
}

addObject := func(name, namespace, hash string) *corev1.Pod {
t.Helper()
obj := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Namespace: namespace,
Labels: map[string]string{LabelHash: hash},
},
}
if err := indexer.Add(obj); err != nil {
t.Fatal(err)
}
return obj
}
namespace := "ns"
objects := []*corev1.Pod{
// 3 objects with the same hash
addObject("obj0", namespace, "hash0"),
addObject("obj01", namespace, "hash0"),
addObject("obj02", "another-ns", "hash0"),
// Single object for hash
addObject("obj1", namespace, "hash1"),
}

type args struct {
hash string
namespace string
}
tests := []struct {
name string
args args
want map[objectset.ObjectKey]runtime.Object
}{
{name: "finds object by hash in all namespaces",
args: args{
hash: "hash0",
}, want: map[objectset.ObjectKey]runtime.Object{
objectset.NewObjectKey(objects[0]): objects[0],
objectset.NewObjectKey(objects[1]): objects[1],
objectset.NewObjectKey(objects[2]): objects[2],
}},
{name: "finds object by hash in namespace",
args: args{
hash: "hash0",
namespace: namespace,
}, want: map[objectset.ObjectKey]runtime.Object{
objectset.NewObjectKey(objects[0]): objects[0],
objectset.NewObjectKey(objects[1]): objects[1],
}},
{name: "returns empty if namespace does not match",
args: args{
hash: "hash1",
namespace: "another-ns",
}, want: map[objectset.ObjectKey]runtime.Object{}},
{name: "finds object by hash",
args: args{
hash: "hash1",
namespace: namespace,
}, want: map[objectset.ObjectKey]runtime.Object{
objectset.NewObjectKey(objects[3]): objects[3],
}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := listByHash(indexer, tt.args.hash, tt.args.namespace)
assert.NoError(t, err)
assert.Equalf(t, tt.want, got, "listByHash(%v, %v, %v)", indexer, tt.args.hash, tt.args.namespace)
})
}
}

0 comments on commit 49d04f8

Please sign in to comment.