Skip to content

Commit

Permalink
storage: add selection predicates (#40)
Browse files Browse the repository at this point in the history
  • Loading branch information
nicks committed Jun 25, 2021
1 parent d499034 commit f173ded
Show file tree
Hide file tree
Showing 8 changed files with 265 additions and 13 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ require (
github.com/spf13/cobra v1.1.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.7.0
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
k8s.io/api v0.21.2
k8s.io/apimachinery v0.21.2
k8s.io/apiserver v0.21.2
Expand Down
42 changes: 42 additions & 0 deletions pkg/server/builder/builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -331,6 +331,48 @@ func TestValidateOpenAPISpec(t *testing.T) {
assert.NotContains(t, content, `__internal`)
}

func TestLabelSelector(t *testing.T) {
f := newFixture(t)
defer f.tearDown()

client := f.client

_, err := client.CoreV1alpha1().Manifests().Create(f.ctx, &corev1alpha1.Manifest{
ObjectMeta: metav1.ObjectMeta{
Name: "foo-1",
Labels: map[string]string{"group": "foo"},
},
}, metav1.CreateOptions{})

_, err = client.CoreV1alpha1().Manifests().Create(f.ctx, &corev1alpha1.Manifest{
ObjectMeta: metav1.ObjectMeta{
Name: "foo-2",
Labels: map[string]string{"group": "foo"},
},
}, metav1.CreateOptions{})
require.NoError(t, err)

_, err = client.CoreV1alpha1().Manifests().Create(f.ctx, &corev1alpha1.Manifest{
ObjectMeta: metav1.ObjectMeta{
Name: "bar-1",
Labels: map[string]string{"group": "bar"},
},
}, metav1.CreateOptions{})
require.NoError(t, err)

list, err := client.CoreV1alpha1().Manifests().List(f.ctx, metav1.ListOptions{
LabelSelector: "group=foo",
})
require.NoError(t, err)

names := []string{}
for _, item := range list.Items {
names = append(names, item.Name)
}

assert.ElementsMatch(t, []string{"foo-1", "foo-2"}, names)
}

func memConnProvider() apiserver.ConnProvider {
return apiserver.NetworkConnProvider(&memconn.Provider{}, "memu")
}
Expand Down
14 changes: 8 additions & 6 deletions pkg/storage/filepath/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type FS interface {
EnsureDir(dirname string) error
Write(encoder runtime.Encoder, filepath string, obj runtime.Object) error
Read(decoder runtime.Decoder, path string, newFunc func() runtime.Object) (runtime.Object, error)
VisitDir(dirname string, newFunc func() runtime.Object, codec runtime.Decoder, visitFunc func(string, runtime.Object)) error
VisitDir(dirname string, newFunc func() runtime.Object, codec runtime.Decoder, visitFunc func(string, runtime.Object) error) error
}

type RealFS struct {
Expand Down Expand Up @@ -65,7 +65,7 @@ func (fs RealFS) Read(decoder runtime.Decoder, path string, newFunc func() runti
return decodedObj, nil
}

func (fs RealFS) VisitDir(dirname string, newFunc func() runtime.Object, codec runtime.Decoder, visitFunc func(string, runtime.Object)) error {
func (fs RealFS) VisitDir(dirname string, newFunc func() runtime.Object, codec runtime.Decoder, visitFunc func(string, runtime.Object) error) error {
return filepath.Walk(dirname, func(path string, info os.FileInfo, err error) error {
if err != nil {
return err
Expand All @@ -80,8 +80,7 @@ func (fs RealFS) VisitDir(dirname string, newFunc func() runtime.Object, codec r
if err != nil {
return err
}
visitFunc(path, newObj)
return nil
return visitFunc(path, newObj)
})
}

Expand Down Expand Up @@ -217,7 +216,7 @@ func (fs *MemoryFS) readInternal(decoder runtime.Decoder, p string, newFunc func
}

// Walk the directory, reading all objects in it.
func (fs *MemoryFS) VisitDir(dirname string, newFunc func() runtime.Object, codec runtime.Decoder, visitFunc func(string, runtime.Object)) error {
func (fs *MemoryFS) VisitDir(dirname string, newFunc func() runtime.Object, codec runtime.Decoder, visitFunc func(string, runtime.Object) error) error {
fs.mu.Lock()
defer fs.mu.Unlock()

Expand Down Expand Up @@ -247,7 +246,10 @@ func (fs *MemoryFS) VisitDir(dirname string, newFunc func() runtime.Object, code
if err != nil {
return err
}
visitFunc(keyPath, newObj)
err = visitFunc(keyPath, newObj)
if err != nil {
return err
}
}
return nil
}
Expand Down
56 changes: 50 additions & 6 deletions pkg/storage/filepath/jsonfile_rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ import (
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/conversion"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/storage"
)

// ErrFileNotExists means the file doesn't actually exist.
Expand Down Expand Up @@ -115,15 +118,23 @@ func (f *filepathREST) List(
ctx context.Context,
options *metainternalversion.ListOptions,
) (runtime.Object, error) {
p := newSelectionPredicate(options)
newListObj := f.NewList()
v, err := getListPrt(newListObj)
if err != nil {
return nil, err
}

dirname := f.objectDirName(ctx)
if err := f.fs.VisitDir(dirname, f.newFunc, f.codec, func(path string, obj runtime.Object) {
appendItem(v, obj)
if err := f.fs.VisitDir(dirname, f.newFunc, f.codec, func(path string, obj runtime.Object) error {
ok, err := p.Matches(obj)
if err != nil {
return err
}
if ok {
appendItem(v, obj)
}
return nil
}); err != nil {
return nil, fmt.Errorf("failed walking filepath %v: %v", dirname, err)
}
Expand Down Expand Up @@ -336,15 +347,23 @@ func (f *filepathREST) DeleteCollection(
options *metav1.DeleteOptions,
listOptions *metainternalversion.ListOptions,
) (runtime.Object, error) {
p := newSelectionPredicate(listOptions)
newListObj := f.NewList()
v, err := getListPrt(newListObj)
if err != nil {
return nil, err
}
dirname := f.objectDirName(ctx)
if err := f.fs.VisitDir(dirname, f.newFunc, f.codec, func(path string, obj runtime.Object) {
_ = f.fs.Remove(path)
appendItem(v, obj)
if err := f.fs.VisitDir(dirname, f.newFunc, f.codec, func(path string, obj runtime.Object) error {
ok, err := p.Matches(obj)
if err != nil {
return err
}
if ok {
_ = f.fs.Remove(path)
appendItem(v, obj)
}
return nil
}); err != nil {
return nil, fmt.Errorf("failed walking filepath %v: %v", dirname, err)
}
Expand Down Expand Up @@ -386,6 +405,7 @@ func getListPrt(listObj runtime.Object) (reflect.Value, error) {
}

func (f *filepathREST) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
p := newSelectionPredicate(options)
jw := f.watchSet.newWatch()

// On initial watch, send all the existing objects
Expand All @@ -400,12 +420,36 @@ func (f *filepathREST) Watch(ctx context.Context, options *metainternalversion.L
initEvents := []watch.Event{}
for i := 0; i < items.Len(); i++ {
obj := items.Index(i).Addr().Interface().(runtime.Object)
ok, err := p.Matches(obj)
if err != nil {
return nil, err
}
if !ok {
continue
}
initEvents = append(initEvents, watch.Event{
Type: watch.Added,
Object: obj,
})
}
jw.Start(initEvents)
jw.Start(p, initEvents)

return jw, nil
}

func newSelectionPredicate(options *metainternalversion.ListOptions) storage.SelectionPredicate {
p := storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
GetAttrs: storage.DefaultClusterScopedAttr,
}
if options != nil {
if options.LabelSelector != nil {
p.Label = options.LabelSelector
}
if options.FieldSelector != nil {
p.Field = options.FieldSelector
}
}
return p
}
85 changes: 85 additions & 0 deletions pkg/storage/filepath/jsonfile_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,11 @@ import (
"github.com/tilt-dev/tilt-apiserver/pkg/apis/core/v1alpha1"
builderrest "github.com/tilt-dev/tilt-apiserver/pkg/server/builder/rest"
"github.com/tilt-dev/tilt-apiserver/pkg/storage/filepath"
"golang.org/x/sync/errgroup"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apiserver/pkg/registry/rest"
Expand Down Expand Up @@ -79,6 +82,26 @@ func TestDelete(t *testing.T) {
}
}

func TestListLabelSelector(t *testing.T) {
for _, fs := range fileSystems() {
t.Run(fmt.Sprintf("%T", fs), func(t *testing.T) {
f := newFixture(t, fs)
defer f.TearDown()
f.TestListLabelSelector()
})
}
}

func TestWatchLabelSelector(t *testing.T) {
for _, fs := range fileSystems() {
t.Run(fmt.Sprintf("%T", fs), func(t *testing.T) {
f := newFixture(t, fs)
defer f.TearDown()
f.TestWatchLabelSelector()
})
}
}

type fixture struct {
t *testing.T
dir string
Expand Down Expand Up @@ -183,6 +206,68 @@ func (f *fixture) TestDelete() {
}
}

func (f *fixture) TestListLabelSelector() {
_, err := f.storage.Create(context.Background(), &Manifest{
ObjectMeta: metav1.ObjectMeta{Name: "foo-1", Labels: map[string]string{"group": "foo"}},
}, nil, &metav1.CreateOptions{})
require.NoError(f.t, err)
_, err = f.storage.Create(context.Background(), &Manifest{
ObjectMeta: metav1.ObjectMeta{Name: "bar-1", Labels: map[string]string{"group": "bar"}},
}, nil, &metav1.CreateOptions{})
require.NoError(f.t, err)

list, err := f.storage.List(context.Background(), &internalversion.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set{"group": "bar"}),
})
require.NoError(f.t, err)

mList := list.(*ManifestList)
require.Equal(f.t, 1, len(mList.Items))
assert.Equal(f.t, "bar-1", mList.Items[0].Name)
}

func (f *fixture) TestWatchLabelSelector() {
ctx := context.Background()
w, err := f.storage.Watch(ctx, &internalversion.ListOptions{
LabelSelector: labels.SelectorFromSet(labels.Set{"group": "foo"}),
})
require.NoError(f.t, err)
defer w.Stop()

g, ctx := errgroup.WithContext(ctx)
g.Go(func() error {
_, err = f.storage.Create(ctx, &Manifest{
ObjectMeta: metav1.ObjectMeta{Name: "foo-1", Labels: map[string]string{"group": "foo"}},
}, nil, &metav1.CreateOptions{})

if err != nil {
return err
}

_, err = f.storage.Create(ctx, &Manifest{
ObjectMeta: metav1.ObjectMeta{Name: "bar-1", Labels: map[string]string{"group": "bar"}},
}, nil, &metav1.CreateOptions{})
if err != nil {
return err
}

_, err = f.storage.Create(ctx, &Manifest{
ObjectMeta: metav1.ObjectMeta{Name: "foo-2", Labels: map[string]string{"group": "foo"}},
}, nil, &metav1.CreateOptions{})
if err != nil {
return err
}
return nil
})

evt := <-w.ResultChan()
assert.Equal(f.t, "foo-1", evt.Object.(*Manifest).Name)
evt = <-w.ResultChan()
assert.Equal(f.t, "foo-2", evt.Object.(*Manifest).Name)

require.NoError(f.t, g.Wait())
}

func (f *fixture) TearDown() {
_ = os.Remove(f.dir)
}
12 changes: 11 additions & 1 deletion pkg/storage/filepath/watchset.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"sync"

"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
)

// Keeps track of which watches need to be notified
Expand Down Expand Up @@ -45,13 +46,14 @@ func (s *WatchSet) notifyWatchers(ev watch.Event) {

type watchNode struct {
s *WatchSet
p storage.SelectionPredicate
id int
updateCh chan watch.Event
outCh chan watch.Event
}

// Start sending events to this watch.
func (w *watchNode) Start(initEvents []watch.Event) {
func (w *watchNode) Start(p storage.SelectionPredicate, initEvents []watch.Event) {
w.s.mu.Lock()
w.s.nodes[w.id] = w
w.s.mu.Unlock()
Expand All @@ -62,6 +64,14 @@ func (w *watchNode) Start(initEvents []watch.Event) {
}

for e := range w.updateCh {
ok, err := p.Matches(e.Object)
if err != nil {
continue
}

if !ok {
continue
}
w.outCh <- e
}
close(w.outCh)
Expand Down
Loading

0 comments on commit f173ded

Please sign in to comment.