Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

API Server Source: support non namespaced resources #2930

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 37 additions & 10 deletions pkg/adapter/apiserver/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/adapter/v2"
Expand All @@ -44,9 +45,10 @@ type apiServerAdapter struct {

config Config

k8s dynamic.Interface
source string // TODO: who dis?
name string // TODO: who dis?
discover discovery.DiscoveryInterface
k8s dynamic.Interface
source string // TODO: who dis?
name string // TODO: who dis?
}

func (a *apiServerAdapter) Start(stopCh <-chan struct{}) error {
Expand Down Expand Up @@ -75,15 +77,40 @@ func (a *apiServerAdapter) Start(stopCh <-chan struct{}) error {

a.logger.Infof("STARTING -- %#v", a.config)

for _, r := range a.config.Resources {
lw := &cache.ListWatch{
// TODO: this will not work with cluster scoped resources.
ListFunc: asUnstructuredLister(a.k8s.Resource(r.GVR).Namespace(a.config.Namespace).List, r.LabelSelector),
WatchFunc: asUnstructuredWatcher(a.k8s.Resource(r.GVR).Namespace(a.config.Namespace).Watch, r.LabelSelector),
for _, configRes := range a.config.Resources {

resources, err := a.discover.ServerResourcesForGroupVersion(configRes.GVR.GroupVersion().String())
if err != nil {
a.logger.Errorf("Could not retrieve information about resource %s: %s", configRes.GVR.String(), err.Error())
continue
}

reflector := cache.NewReflector(lw, &unstructured.Unstructured{}, delegate, resyncPeriod)
go reflector.Run(stop)
exists := false
for _, apires := range resources.APIResources {
if apires.Name == configRes.GVR.Resource {

var res dynamic.ResourceInterface
if apires.Namespaced {
res = a.k8s.Resource(configRes.GVR).Namespace(a.config.Namespace)
} else {
res = a.k8s.Resource(configRes.GVR)
}

lw := &cache.ListWatch{
ListFunc: asUnstructuredLister(res.List, configRes.LabelSelector),
WatchFunc: asUnstructuredWatcher(res.Watch, configRes.LabelSelector),
}

reflector := cache.NewReflector(lw, &unstructured.Unstructured{}, delegate, resyncPeriod)
go reflector.Run(stop)
exists = true
break
}
}

if !exists {
a.logger.Errorf("Could not retrieve information about resource %s: %s", configRes.GVR.String())
}
}

<-stopCh
Expand Down
12 changes: 7 additions & 5 deletions pkg/adapter/apiserver/adapter_injection.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
cloudevents "github.com/cloudevents/sdk-go/v2"
"k8s.io/client-go/rest"
"knative.dev/eventing/pkg/adapter/v2"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/injection"
"knative.dev/pkg/injection/clients/dynamicclient"
"knative.dev/pkg/logging"
Expand Down Expand Up @@ -66,11 +67,12 @@ func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ceClie
}

return &apiServerAdapter{
k8s: dynamicclient.Get(ctx),
ce: ceClient,
source: Get(ctx),
name: env.Name,
config: config,
discover: kubeclient.Get(ctx).Discovery(),
k8s: dynamicclient.Get(ctx),
ce: ceClient,
source: Get(ctx),
name: env.Name,
config: config,

logger: logger,
}
Expand Down
108 changes: 102 additions & 6 deletions pkg/adapter/apiserver/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@ import (
"github.com/pkg/errors"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
discoveryfake "k8s.io/client-go/discovery/fake"
"k8s.io/client-go/dynamic"
dynamicfake "k8s.io/client-go/dynamic/fake"
kubetesting "k8s.io/client-go/testing"
adaptertest "knative.dev/eventing/pkg/adapter/v2/test"
rectesting "knative.dev/eventing/pkg/reconciler/testing"
"knative.dev/pkg/logging"
Expand All @@ -53,9 +57,11 @@ func TestAdapter_StartRef(t *testing.T) {
ce: ce,
logger: logging.FromContext(ctx),
config: config,
k8s: makeDynamicClient(simplePod("foo", "default")),
source: "unit-test",
name: "unittest",

discover: makeDiscoveryClient(),
k8s: makeDynamicClient(simplePod("foo", "default")),
source: "unit-test",
name: "unittest",
}

err := errors.New("test never ran")
Expand Down Expand Up @@ -98,9 +104,58 @@ func TestAdapter_StartResource(t *testing.T) {
ce: ce,
logger: logging.FromContext(ctx),
config: config,
k8s: makeDynamicClient(simplePod("foo", "default")),
source: "unit-test",
name: "unittest",

discover: makeDiscoveryClient(),
k8s: makeDynamicClient(simplePod("foo", "default")),
source: "unit-test",
name: "unittest",
}

err := errors.New("test never ran")
stopCh := make(chan struct{})
done := make(chan struct{})
go func() {
err = a.Start(stopCh)
done <- struct{}{}
}()

// Wait for the reflector to be fully initialized.
// Ideally we want to check LastSyncResourceVersion is not empty but we
// don't have access to it.
time.Sleep(1 * time.Second)

stopCh <- struct{}{}
<-done

if err != nil {
t.Errorf("did not expect an error, but got %v", err)
}
}

func TestAdapter_StartNonNamespacedResource(t *testing.T) {
ce := adaptertest.NewTestClient()

config := Config{
Namespace: "default",
Resources: []ResourceWatch{{
GVR: schema.GroupVersionResource{
Version: "v1",
Resource: "namespaces",
},
}},
EventMode: "Resource",
}
ctx, _ := pkgtesting.SetupFakeContext(t)

a := &apiServerAdapter{
ce: ce,
logger: logging.FromContext(ctx),
config: config,

discover: makeDiscoveryClient(),
k8s: makeDynamicClient(simpleNamespace("foo")),
source: "unit-test",
name: "unittest",
}

err := errors.New("test never ran")
Expand Down Expand Up @@ -135,6 +190,35 @@ func makeDynamicClient(objects ...runtime.Object) dynamic.Interface {
return rectesting.NewMockDynamicInterface(realInterface, dynamicMocks)
}

func makeDiscoveryClient() discovery.DiscoveryInterface {
return &discoveryfake.FakeDiscovery{
Fake: &kubetesting.Fake{
Resources: []*metav1.APIResourceList{
{
GroupVersion: "v1",
APIResources: []metav1.APIResource{
// All resources used at tests need to be listed here
{
Name: "pods",
Namespaced: true,
Group: "",
Version: "v1",
Kind: "Pod",
},
{
odacremolbap marked this conversation as resolved.
Show resolved Hide resolved
Name: "namespaces",
Namespaced: false,
Group: "",
Version: "v1",
Kind: "Namespace",
},
},
},
},
},
}
}

func simplePod(name, namespace string) *unstructured.Unstructured {
return &unstructured.Unstructured{
Object: map[string]interface{}{
Expand All @@ -148,6 +232,18 @@ func simplePod(name, namespace string) *unstructured.Unstructured {
}
}

func simpleNamespace(name string) *unstructured.Unstructured {
return &unstructured.Unstructured{
Object: map[string]interface{}{
"apiVersion": "v1",
"kind": "Namespace",
"metadata": map[string]interface{}{
"name": name,
},
},
}
}

func simpleOwnedPod(name, namespace string) *unstructured.Unstructured {
return &unstructured.Unstructured{
Object: map[string]interface{}{
Expand Down