Skip to content

Commit

Permalink
API Server Source: support non namespaced resources (#2930)
Browse files Browse the repository at this point in the history
* support non namespaced resources - apiserver source

* add apiserver source non-namespaced tests

* Update pkg/adapter/apiserver/adapter_test.go

Co-Authored-By: Matt Moore <mattmoor@vmware.com>

* Update pkg/adapter/apiserver/adapter_test.go

Co-Authored-By: Matt Moore <mattmoor@vmware.com>

* Update pkg/adapter/apiserver/adapter_test.go

Co-Authored-By: Matt Moore <mattmoor@vmware.com>

Co-authored-by: Matt Moore <mattmoor@vmware.com>
  • Loading branch information
odacremolbap and mattmoor authored May 8, 2020
1 parent 1e440d5 commit d76621b
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 21 deletions.
47 changes: 37 additions & 10 deletions pkg/adapter/apiserver/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/adapter/v2"
Expand All @@ -44,9 +45,10 @@ type apiServerAdapter struct {

config Config

k8s dynamic.Interface
source string // TODO: who dis?
name string // TODO: who dis?
discover discovery.DiscoveryInterface
k8s dynamic.Interface
source string // TODO: who dis?
name string // TODO: who dis?
}

func (a *apiServerAdapter) Start(stopCh <-chan struct{}) error {
Expand Down Expand Up @@ -75,15 +77,40 @@ func (a *apiServerAdapter) Start(stopCh <-chan struct{}) error {

a.logger.Infof("STARTING -- %#v", a.config)

for _, r := range a.config.Resources {
lw := &cache.ListWatch{
// TODO: this will not work with cluster scoped resources.
ListFunc: asUnstructuredLister(a.k8s.Resource(r.GVR).Namespace(a.config.Namespace).List, r.LabelSelector),
WatchFunc: asUnstructuredWatcher(a.k8s.Resource(r.GVR).Namespace(a.config.Namespace).Watch, r.LabelSelector),
for _, configRes := range a.config.Resources {

resources, err := a.discover.ServerResourcesForGroupVersion(configRes.GVR.GroupVersion().String())
if err != nil {
a.logger.Errorf("Could not retrieve information about resource %s: %s", configRes.GVR.String(), err.Error())
continue
}

reflector := cache.NewReflector(lw, &unstructured.Unstructured{}, delegate, resyncPeriod)
go reflector.Run(stop)
exists := false
for _, apires := range resources.APIResources {
if apires.Name == configRes.GVR.Resource {

var res dynamic.ResourceInterface
if apires.Namespaced {
res = a.k8s.Resource(configRes.GVR).Namespace(a.config.Namespace)
} else {
res = a.k8s.Resource(configRes.GVR)
}

lw := &cache.ListWatch{
ListFunc: asUnstructuredLister(res.List, configRes.LabelSelector),
WatchFunc: asUnstructuredWatcher(res.Watch, configRes.LabelSelector),
}

reflector := cache.NewReflector(lw, &unstructured.Unstructured{}, delegate, resyncPeriod)
go reflector.Run(stop)
exists = true
break
}
}

if !exists {
a.logger.Errorf("Could not retrieve information about resource %s: %s", configRes.GVR.String())
}
}

<-stopCh
Expand Down
12 changes: 7 additions & 5 deletions pkg/adapter/apiserver/adapter_injection.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
cloudevents "github.com/cloudevents/sdk-go/v2"
"k8s.io/client-go/rest"
"knative.dev/eventing/pkg/adapter/v2"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/injection"
"knative.dev/pkg/injection/clients/dynamicclient"
"knative.dev/pkg/logging"
Expand Down Expand Up @@ -66,11 +67,12 @@ func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ceClie
}

return &apiServerAdapter{
k8s: dynamicclient.Get(ctx),
ce: ceClient,
source: Get(ctx),
name: env.Name,
config: config,
discover: kubeclient.Get(ctx).Discovery(),
k8s: dynamicclient.Get(ctx),
ce: ceClient,
source: Get(ctx),
name: env.Name,
config: config,

logger: logger,
}
Expand Down
108 changes: 102 additions & 6 deletions pkg/adapter/apiserver/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@ import (
"github.com/pkg/errors"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
discoveryfake "k8s.io/client-go/discovery/fake"
"k8s.io/client-go/dynamic"
dynamicfake "k8s.io/client-go/dynamic/fake"
kubetesting "k8s.io/client-go/testing"
adaptertest "knative.dev/eventing/pkg/adapter/v2/test"
rectesting "knative.dev/eventing/pkg/reconciler/testing"
"knative.dev/pkg/logging"
Expand All @@ -53,9 +57,11 @@ func TestAdapter_StartRef(t *testing.T) {
ce: ce,
logger: logging.FromContext(ctx),
config: config,
k8s: makeDynamicClient(simplePod("foo", "default")),
source: "unit-test",
name: "unittest",

discover: makeDiscoveryClient(),
k8s: makeDynamicClient(simplePod("foo", "default")),
source: "unit-test",
name: "unittest",
}

err := errors.New("test never ran")
Expand Down Expand Up @@ -98,9 +104,58 @@ func TestAdapter_StartResource(t *testing.T) {
ce: ce,
logger: logging.FromContext(ctx),
config: config,
k8s: makeDynamicClient(simplePod("foo", "default")),
source: "unit-test",
name: "unittest",

discover: makeDiscoveryClient(),
k8s: makeDynamicClient(simplePod("foo", "default")),
source: "unit-test",
name: "unittest",
}

err := errors.New("test never ran")
stopCh := make(chan struct{})
done := make(chan struct{})
go func() {
err = a.Start(stopCh)
done <- struct{}{}
}()

// Wait for the reflector to be fully initialized.
// Ideally we want to check LastSyncResourceVersion is not empty but we
// don't have access to it.
time.Sleep(1 * time.Second)

stopCh <- struct{}{}
<-done

if err != nil {
t.Errorf("did not expect an error, but got %v", err)
}
}

func TestAdapter_StartNonNamespacedResource(t *testing.T) {
ce := adaptertest.NewTestClient()

config := Config{
Namespace: "default",
Resources: []ResourceWatch{{
GVR: schema.GroupVersionResource{
Version: "v1",
Resource: "namespaces",
},
}},
EventMode: "Resource",
}
ctx, _ := pkgtesting.SetupFakeContext(t)

a := &apiServerAdapter{
ce: ce,
logger: logging.FromContext(ctx),
config: config,

discover: makeDiscoveryClient(),
k8s: makeDynamicClient(simpleNamespace("foo")),
source: "unit-test",
name: "unittest",
}

err := errors.New("test never ran")
Expand Down Expand Up @@ -135,6 +190,35 @@ func makeDynamicClient(objects ...runtime.Object) dynamic.Interface {
return rectesting.NewMockDynamicInterface(realInterface, dynamicMocks)
}

func makeDiscoveryClient() discovery.DiscoveryInterface {
return &discoveryfake.FakeDiscovery{
Fake: &kubetesting.Fake{
Resources: []*metav1.APIResourceList{
{
GroupVersion: "v1",
APIResources: []metav1.APIResource{
// All resources used at tests need to be listed here
{
Name: "pods",
Namespaced: true,
Group: "",
Version: "v1",
Kind: "Pod",
},
{
Name: "namespaces",
Namespaced: false,
Group: "",
Version: "v1",
Kind: "Namespace",
},
},
},
},
},
}
}

func simplePod(name, namespace string) *unstructured.Unstructured {
return &unstructured.Unstructured{
Object: map[string]interface{}{
Expand All @@ -148,6 +232,18 @@ func simplePod(name, namespace string) *unstructured.Unstructured {
}
}

func simpleNamespace(name string) *unstructured.Unstructured {
return &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "v1",
"kind": "Namespace",
"metadata": map[string]interface{}{
"name": name,
},
},
}
}

func simpleOwnedPod(name, namespace string) *unstructured.Unstructured {
return &unstructured.Unstructured{
Object: map[string]interface{}{
Expand Down

0 comments on commit d76621b

Please sign in to comment.