Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: add selection predicates #40

Merged
merged 1 commit into from
Jun 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/spf13/cobra v1.1.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
k8s.io/api v0.21.2
k8s.io/apimachinery v0.21.2
k8s.io/apiserver v0.21.2
Expand Down
42 changes: 42 additions & 0 deletions pkg/server/builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,48 @@ func TestValidateOpenAPISpec(t *testing.T) {
assert.NotContains(t, content, `__internal`)
}

func TestLabelSelector(t *testing.T) {
f := newFixture(t)
defer f.tearDown()

client := f.client

_, err := client.CoreV1alpha1().Manifests().Create(f.ctx, &corev1alpha1.Manifest{
ObjectMeta: metav1.ObjectMeta{
Name: "foo-1",
Labels: map[string]string{"group": "foo"},
},
}, metav1.CreateOptions{})

_, err = client.CoreV1alpha1().Manifests().Create(f.ctx, &corev1alpha1.Manifest{
ObjectMeta: metav1.ObjectMeta{
Name: "foo-2",
Labels: map[string]string{"group": "foo"},
},
}, metav1.CreateOptions{})
require.NoError(t, err)

_, err = client.CoreV1alpha1().Manifests().Create(f.ctx, &corev1alpha1.Manifest{
ObjectMeta: metav1.ObjectMeta{
Name: "bar-1",
Labels: map[string]string{"group": "bar"},
},
}, metav1.CreateOptions{})
require.NoError(t, err)

list, err := client.CoreV1alpha1().Manifests().List(f.ctx, metav1.ListOptions{
LabelSelector: "group=foo",
})
require.NoError(t, err)

names := []string{}
for _, item := range list.Items {
names = append(names, item.Name)
}

assert.ElementsMatch(t, []string{"foo-1", "foo-2"}, names)
}

func memConnProvider() apiserver.ConnProvider {
return apiserver.NetworkConnProvider(&memconn.Provider{}, "memu")
}
Expand Down
14 changes: 8 additions & 6 deletions pkg/storage/filepath/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type FS interface {
EnsureDir(dirname string) error
Write(encoder runtime.Encoder, filepath string, obj runtime.Object) error
Read(decoder runtime.Decoder, path string, newFunc func() runtime.Object) (runtime.Object, error)
VisitDir(dirname string, newFunc func() runtime.Object, codec runtime.Decoder, visitFunc func(string, runtime.Object)) error
VisitDir(dirname string, newFunc func() runtime.Object, codec runtime.Decoder, visitFunc func(string, runtime.Object) error) error
}

type RealFS struct {
Expand Down Expand Up @@ -65,7 +65,7 @@ func (fs RealFS) Read(decoder runtime.Decoder, path string, newFunc func() runti
return decodedObj, nil
}

func (fs RealFS) VisitDir(dirname string, newFunc func() runtime.Object, codec runtime.Decoder, visitFunc func(string, runtime.Object)) error {
func (fs RealFS) VisitDir(dirname string, newFunc func() runtime.Object, codec runtime.Decoder, visitFunc func(string, runtime.Object) error) error {
return filepath.Walk(dirname, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
Expand All @@ -80,8 +80,7 @@ func (fs RealFS) VisitDir(dirname string, newFunc func() runtime.Object, codec r
if err != nil {
return err
}
visitFunc(path, newObj)
return nil
return visitFunc(path, newObj)
})
}

Expand Down Expand Up @@ -217,7 +216,7 @@ func (fs *MemoryFS) readInternal(decoder runtime.Decoder, p string, newFunc func
}

// Walk the directory, reading all objects in it.
func (fs *MemoryFS) VisitDir(dirname string, newFunc func() runtime.Object, codec runtime.Decoder, visitFunc func(string, runtime.Object)) error {
func (fs *MemoryFS) VisitDir(dirname string, newFunc func() runtime.Object, codec runtime.Decoder, visitFunc func(string, runtime.Object) error) error {
fs.mu.Lock()
defer fs.mu.Unlock()

Expand Down Expand Up @@ -247,7 +246,10 @@ func (fs *MemoryFS) VisitDir(dirname string, newFunc func() runtime.Object, code
if err != nil {
return err
}
visitFunc(keyPath, newObj)
err = visitFunc(keyPath, newObj)
if err != nil {
return err
}
}
return nil
}
Expand Down
56 changes: 50 additions & 6 deletions pkg/storage/filepath/jsonfile_rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ import (
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/storage"
)

// ErrFileNotExists means the file doesn't actually exist.
Expand Down Expand Up @@ -115,15 +118,23 @@ func (f *filepathREST) List(
ctx context.Context,
options *metainternalversion.ListOptions,
) (runtime.Object, error) {
p := newSelectionPredicate(options)
newListObj := f.NewList()
v, err := getListPrt(newListObj)
if err != nil {
return nil, err
}

dirname := f.objectDirName(ctx)
if err := f.fs.VisitDir(dirname, f.newFunc, f.codec, func(path string, obj runtime.Object) {
appendItem(v, obj)
if err := f.fs.VisitDir(dirname, f.newFunc, f.codec, func(path string, obj runtime.Object) error {
ok, err := p.Matches(obj)
if err != nil {
return err
}
if ok {
appendItem(v, obj)
}
return nil
}); err != nil {
return nil, fmt.Errorf("failed walking filepath %v: %v", dirname, err)
}
Expand Down Expand Up @@ -336,15 +347,23 @@ func (f *filepathREST) DeleteCollection(
options *metav1.DeleteOptions,
listOptions *metainternalversion.ListOptions,
) (runtime.Object, error) {
p := newSelectionPredicate(listOptions)
newListObj := f.NewList()
v, err := getListPrt(newListObj)
if err != nil {
return nil, err
}
dirname := f.objectDirName(ctx)
if err := f.fs.VisitDir(dirname, f.newFunc, f.codec, func(path string, obj runtime.Object) {
_ = f.fs.Remove(path)
appendItem(v, obj)
if err := f.fs.VisitDir(dirname, f.newFunc, f.codec, func(path string, obj runtime.Object) error {
ok, err := p.Matches(obj)
if err != nil {
return err
}
if ok {
_ = f.fs.Remove(path)
appendItem(v, obj)
}
return nil
}); err != nil {
return nil, fmt.Errorf("failed walking filepath %v: %v", dirname, err)
}
Expand Down Expand Up @@ -386,6 +405,7 @@ func getListPrt(listObj runtime.Object) (reflect.Value, error) {
}

func (f *filepathREST) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
p := newSelectionPredicate(options)
jw := f.watchSet.newWatch()

// On initial watch, send all the existing objects
Expand All @@ -400,12 +420,36 @@ func (f *filepathREST) Watch(ctx context.Context, options *metainternalversion.L
initEvents := []watch.Event{}
for i := 0; i < items.Len(); i++ {
obj := items.Index(i).Addr().Interface().(runtime.Object)
ok, err := p.Matches(obj)
if err != nil {
return nil, err
}
if !ok {
continue
}
initEvents = append(initEvents, watch.Event{
Type: watch.Added,
Object: obj,
})
}
jw.Start(initEvents)
jw.Start(p, initEvents)

return jw, nil
}

func newSelectionPredicate(options *metainternalversion.ListOptions) storage.SelectionPredicate {
p := storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
GetAttrs: storage.DefaultClusterScopedAttr,
}
if options != nil {
if options.LabelSelector != nil {
p.Label = options.LabelSelector
}
if options.FieldSelector != nil {
p.Field = options.FieldSelector
}
}
return p
}
85 changes: 85 additions & 0 deletions pkg/storage/filepath/jsonfile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ import (
"github.com/tilt-dev/tilt-apiserver/pkg/apis/core/v1alpha1"
builderrest "github.com/tilt-dev/tilt-apiserver/pkg/server/builder/rest"
"github.com/tilt-dev/tilt-apiserver/pkg/storage/filepath"
"golang.org/x/sync/errgroup"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apiserver/pkg/registry/rest"
Expand Down Expand Up @@ -79,6 +82,26 @@ func TestDelete(t *testing.T) {
}
}

func TestListLabelSelector(t *testing.T) {
for _, fs := range fileSystems() {
t.Run(fmt.Sprintf("%T", fs), func(t *testing.T) {
f := newFixture(t, fs)
defer f.TearDown()
f.TestListLabelSelector()
})
}
}

func TestWatchLabelSelector(t *testing.T) {
for _, fs := range fileSystems() {
t.Run(fmt.Sprintf("%T", fs), func(t *testing.T) {
f := newFixture(t, fs)
defer f.TearDown()
f.TestWatchLabelSelector()
})
}
}

type fixture struct {
t *testing.T
dir string
Expand Down Expand Up @@ -183,6 +206,68 @@ func (f *fixture) TestDelete() {
}
}

func (f *fixture) TestListLabelSelector() {
_, err := f.storage.Create(context.Background(), &Manifest{
ObjectMeta: metav1.ObjectMeta{Name: "foo-1", Labels: map[string]string{"group": "foo"}},
}, nil, &metav1.CreateOptions{})
require.NoError(f.t, err)
_, err = f.storage.Create(context.Background(), &Manifest{
ObjectMeta: metav1.ObjectMeta{Name: "bar-1", Labels: map[string]string{"group": "bar"}},
}, nil, &metav1.CreateOptions{})
require.NoError(f.t, err)

list, err := f.storage.List(context.Background(), &internalversion.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set{"group": "bar"}),
})
require.NoError(f.t, err)

mList := list.(*ManifestList)
require.Equal(f.t, 1, len(mList.Items))
assert.Equal(f.t, "bar-1", mList.Items[0].Name)
}

func (f *fixture) TestWatchLabelSelector() {
ctx := context.Background()
w, err := f.storage.Watch(ctx, &internalversion.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set{"group": "foo"}),
})
require.NoError(f.t, err)
defer w.Stop()

g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
_, err = f.storage.Create(ctx, &Manifest{
ObjectMeta: metav1.ObjectMeta{Name: "foo-1", Labels: map[string]string{"group": "foo"}},
}, nil, &metav1.CreateOptions{})

if err != nil {
return err
}

_, err = f.storage.Create(ctx, &Manifest{
ObjectMeta: metav1.ObjectMeta{Name: "bar-1", Labels: map[string]string{"group": "bar"}},
}, nil, &metav1.CreateOptions{})
if err != nil {
return err
}

_, err = f.storage.Create(ctx, &Manifest{
ObjectMeta: metav1.ObjectMeta{Name: "foo-2", Labels: map[string]string{"group": "foo"}},
}, nil, &metav1.CreateOptions{})
if err != nil {
return err
}
return nil
})

evt := <-w.ResultChan()
assert.Equal(f.t, "foo-1", evt.Object.(*Manifest).Name)
evt = <-w.ResultChan()
assert.Equal(f.t, "foo-2", evt.Object.(*Manifest).Name)

require.NoError(f.t, g.Wait())
}

func (f *fixture) TearDown() {
_ = os.Remove(f.dir)
}
12 changes: 11 additions & 1 deletion pkg/storage/filepath/watchset.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"sync"

"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
)

// Keeps track of which watches need to be notified
Expand Down Expand Up @@ -45,13 +46,14 @@ func (s *WatchSet) notifyWatchers(ev watch.Event) {

type watchNode struct {
s *WatchSet
p storage.SelectionPredicate
id int
updateCh chan watch.Event
outCh chan watch.Event
}

// Start sending events to this watch.
func (w *watchNode) Start(initEvents []watch.Event) {
func (w *watchNode) Start(p storage.SelectionPredicate, initEvents []watch.Event) {
w.s.mu.Lock()
w.s.nodes[w.id] = w
w.s.mu.Unlock()
Expand All @@ -62,6 +64,14 @@ func (w *watchNode) Start(initEvents []watch.Event) {
}

for e := range w.updateCh {
ok, err := p.Matches(e.Object)
if err != nil {
continue
}

if !ok {
continue
}
w.outCh <- e
}
close(w.outCh)
Expand Down
Loading