From 52966bc9a3719c24cfb8629add0e35285c69d635 Mon Sep 17 00:00:00 2001
From: Hector Martinez
Date: Thu, 2 May 2024 14:48:06 +0200
Subject: [PATCH] Add SubscriptionsAPI filters to APIServerSource
This MR introduces the `filters` key in the APIServerSource Spec.
This new field allows users to filter which messages are sent from the
APIServerSource to the specified sink. The filter language
is the new SubscriptionsAPI, that allows for powerful filtering.
Signed-off-by: Hector Martinez
---
config/core/configmaps/features.yaml | 3 +
config/core/resources/apiserversource.yaml | 1 +
docs/eventing-api.md | 40 ++++++++-
pkg/adapter/apiserver/adapter.go | 3 +
pkg/adapter/apiserver/adapter_test.go | 11 ++-
pkg/adapter/apiserver/config.go | 12 +++
pkg/adapter/apiserver/delegate.go | 39 +++++----
pkg/adapter/apiserver/delegate_test.go | 42 ++++++++++
pkg/apis/feature/features.go | 1 +
pkg/apis/feature/flag_names.go | 1 +
pkg/apis/sources/v1/apiserver_types.go | 11 +++
pkg/apis/sources/v1/apiserver_validation.go | 19 +++++
.../sources/v1/apiserver_validation_test.go | 81 +++++++++++++++++++
pkg/apis/sources/v1/zz_generated.deepcopy.go | 8 ++
pkg/broker/filter/filter_handler.go | 9 ++-
.../resources/receive_adapter.go | 1 +
test/config/config-features.yaml | 61 ++++++++++++++
test/e2e-common.sh | 10 +++
test/rekt/apiserversource_test.go | 14 ++++
.../features/apiserversource/data_plane.go | 66 +++++++++++++++
.../apiserversource/apiserversource.go | 27 +++++++
.../apiserversource/apiserversource.yaml | 4 +
22 files changed, 442 insertions(+), 22 deletions(-)
create mode 100644 test/config/config-features.yaml
diff --git a/config/core/configmaps/features.yaml b/config/core/configmaps/features.yaml
index 96da3544105..68f6ca282d7 100644
--- a/config/core/configmaps/features.yaml
+++ b/config/core/configmaps/features.yaml
@@ -60,3 +60,6 @@ data:
# For more details: https://github.com/knative/eventing/issues/7739
cross-namespace-event-links: "disabled"
+ # ALPHA feature: The new-apiserversource-filters flag allows you to use the new `filters` field
+ # in APIServerSource objects with its rich filtering capabilities.
+ new-apiserversource-filters: "disabled"
diff --git a/config/core/resources/apiserversource.yaml b/config/core/resources/apiserversource.yaml
index cf0c0ea0033..d387d4e7b0a 100644
--- a/config/core/resources/apiserversource.yaml
+++ b/config/core/resources/apiserversource.yaml
@@ -67,6 +67,7 @@ spec:
properties:
spec:
type: object
+ x-kubernetes-preserve-unknown-fields: true
required:
- resources
properties:
diff --git a/docs/eventing-api.md b/docs/eventing-api.md
index 6d06fccf261..44c4aa32287 100644
--- a/docs/eventing-api.md
+++ b/docs/eventing-api.md
@@ -2101,7 +2101,7 @@ resolved delivery options.
SubscriptionsAPIFilter
-(Appears on:SubscriptionsAPIFilter, TriggerSpec)
+(Appears on:SubscriptionsAPIFilter, TriggerSpec, ApiServerSourceSpec)
SubscriptionsAPIFilter allows defining a filter expression using CloudEvents
@@ -5327,6 +5327,25 @@ Kubernetes meta/v1.LabelSelector
should be watched by the source.
+
+
+filters
+
+
+[]SubscriptionsAPIFilter
+
+
+ |
+
+(Optional)
+ Filters is an experimental field that conforms to the CNCF CloudEvents Subscriptions
+API. It’s an array of filter expressions that evaluate to true or false.
+If any filter expression in the array evaluates to false, the event MUST
+NOT be sent to the Sink. If all the filter expressions in the array
+evaluate to true, the event MUST be attempted to be delivered. Absence of
+a filter or empty array implies a value of true.
+ |
+
@@ -5934,6 +5953,25 @@ Kubernetes meta/v1.LabelSelector
should be watched by the source.
+
+
+filters
+
+
+[]SubscriptionsAPIFilter
+
+
+ |
+
+(Optional)
+ Filters is an experimental field that conforms to the CNCF CloudEvents Subscriptions
+API. It’s an array of filter expressions that evaluate to true or false.
+If any filter expression in the array evaluates to false, the event MUST
+NOT be sent to the Sink. If all the filter expressions in the array
+evaluate to true, the event MUST be attempted to be delivered. Absence of
+a filter or empty array implies a value of true.
+ |
+
ApiServerSourceStatus
diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go
index 1a858735b51..bfab282d5d0 100644
--- a/pkg/adapter/apiserver/adapter.go
+++ b/pkg/adapter/apiserver/adapter.go
@@ -34,6 +34,8 @@ import (
"knative.dev/eventing/pkg/adapter/v2"
v1 "knative.dev/eventing/pkg/apis/sources/v1"
+ brokerfilter "knative.dev/eventing/pkg/broker/filter"
+ "knative.dev/eventing/pkg/eventfilter/subscriptionsapi"
)
type envConfig struct {
@@ -71,6 +73,7 @@ func (a *apiServerAdapter) start(ctx context.Context, stopCh <-chan struct{}) er
logger: a.logger,
ref: a.config.EventMode == v1.ReferenceMode,
apiServerSourceName: a.name,
+ filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(a.logger.Desugar(), a.config.Filters)...),
}
if a.config.ResourceOwner != nil {
a.logger.Infow("will be filtered",
diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go
index 71b9a86f7cc..79e2721884a 100644
--- a/pkg/adapter/apiserver/adapter_test.go
+++ b/pkg/adapter/apiserver/adapter_test.go
@@ -34,6 +34,9 @@ import (
dynamicfake "k8s.io/client-go/dynamic/fake"
kubetesting "k8s.io/client-go/testing"
adaptertest "knative.dev/eventing/pkg/adapter/v2/test"
+ eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
+ brokerfilter "knative.dev/eventing/pkg/broker/filter"
+ "knative.dev/eventing/pkg/eventfilter/subscriptionsapi"
rectesting "knative.dev/eventing/pkg/reconciler/testing"
"knative.dev/pkg/logging"
pkgtesting "knative.dev/pkg/reconciler/testing"
@@ -289,21 +292,27 @@ func validateNotSent(t *testing.T, ce *adaptertest.TestCloudEventsClient, want s
func makeResourceAndTestingClient() (*resourceDelegate, *adaptertest.TestCloudEventsClient) {
ce := adaptertest.NewTestClient()
+ logger := zap.NewExample().Sugar()
+
return &resourceDelegate{
ce: ce,
source: "unit-test",
apiServerSourceName: apiServerSourceNameTest,
- logger: zap.NewExample().Sugar(),
+ logger: logger,
+ filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...),
}, ce
}
func makeRefAndTestingClient() (*resourceDelegate, *adaptertest.TestCloudEventsClient) {
ce := adaptertest.NewTestClient()
+ logger := zap.NewExample().Sugar()
+
return &resourceDelegate{
ce: ce,
source: "unit-test",
apiServerSourceName: apiServerSourceNameTest,
logger: zap.NewExample().Sugar(),
ref: true,
+ filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...),
}, ce
}
diff --git a/pkg/adapter/apiserver/config.go b/pkg/adapter/apiserver/config.go
index 19fca01177a..c85f7f4de85 100644
--- a/pkg/adapter/apiserver/config.go
+++ b/pkg/adapter/apiserver/config.go
@@ -17,6 +17,8 @@ package apiserver
import (
"k8s.io/apimachinery/pkg/runtime/schema"
+
+ eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
v1 "knative.dev/eventing/pkg/apis/sources/v1"
)
@@ -56,4 +58,14 @@ type Config struct {
// Defaults to `Reference`
// +optional
EventMode string `json:"mode,omitempty"`
+
+ // Filters is an experimental field that conforms to the CNCF CloudEvents Subscriptions
+ // API. It's an array of filter expressions that evaluate to true or false.
+ // If any filter expression in the array evaluates to false, the event MUST
+ // NOT be sent to the Sink. If all the filter expressions in the array
+ // evaluate to true, the event MUST be attempted to be delivered. Absence of
+ // a filter or empty array implies a value of true.
+ //
+ // +optional
+ Filters []eventingv1.SubscriptionsAPIFilter `json:"filters,omitempty"`
}
diff --git a/pkg/adapter/apiserver/delegate.go b/pkg/adapter/apiserver/delegate.go
index e45d87e67ae..f78b3abe163 100644
--- a/pkg/adapter/apiserver/delegate.go
+++ b/pkg/adapter/apiserver/delegate.go
@@ -24,6 +24,7 @@ import (
"go.uber.org/zap"
"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/adapter/apiserver/events"
+ "knative.dev/eventing/pkg/eventfilter"
)
type resourceDelegate struct {
@@ -31,6 +32,7 @@ type resourceDelegate struct {
source string
ref bool
apiServerSourceName string
+ filter eventfilter.Filter
logger *zap.SugaredLogger
}
@@ -38,31 +40,36 @@ type resourceDelegate struct {
var _ cache.Store = (*resourceDelegate)(nil)
func (a *resourceDelegate) Add(obj interface{}) error {
- ctx, event, err := events.MakeAddEvent(a.source, a.apiServerSourceName, obj, a.ref)
- if err != nil {
- a.logger.Infow("event creation failed", zap.Error(err))
- return err
- }
- a.sendCloudEvent(ctx, event)
- return nil
+ return a.handleKubernetesObject(events.MakeAddEvent, obj)
}
func (a *resourceDelegate) Update(obj interface{}) error {
- ctx, event, err := events.MakeUpdateEvent(a.source, a.apiServerSourceName, obj, a.ref)
- if err != nil {
- a.logger.Info("event creation failed", zap.Error(err))
- return err
- }
- a.sendCloudEvent(ctx, event)
- return nil
+ return a.handleKubernetesObject(events.MakeUpdateEvent, obj)
}
func (a *resourceDelegate) Delete(obj interface{}) error {
- ctx, event, err := events.MakeDeleteEvent(a.source, a.apiServerSourceName, obj, a.ref)
+ return a.handleKubernetesObject(events.MakeDeleteEvent, obj)
+
+}
+
+// makeEventFunc represents the signature of the functions `events.Make*Event` so they can
+// be passed as a parameter
+type makeEventFunc func(string, string, interface{}, bool) (context.Context, cloudevents.Event, error)
+
+func (a *resourceDelegate) handleKubernetesObject(makeEvent makeEventFunc, obj interface{}) error {
+ ctx, event, err := makeEvent(a.source, a.apiServerSourceName, obj, a.ref)
+
if err != nil {
- a.logger.Info("event creation failed", zap.Error(err))
+ a.logger.Infow("event creation failed", zap.Error(err))
return err
}
+
+ filterResult := a.filter.Filter(ctx, event)
+ if filterResult == eventfilter.FailFilter {
+ a.logger.Debugf("event type %s filtered out", event.Type())
+ return nil
+ }
+
a.sendCloudEvent(ctx, event)
return nil
}
diff --git a/pkg/adapter/apiserver/delegate_test.go b/pkg/adapter/apiserver/delegate_test.go
index a8e9270885f..00fc9dfe691 100644
--- a/pkg/adapter/apiserver/delegate_test.go
+++ b/pkg/adapter/apiserver/delegate_test.go
@@ -18,7 +18,12 @@ package apiserver
import (
"testing"
+ "go.uber.org/zap"
+ adaptertest "knative.dev/eventing/pkg/adapter/v2/test"
+ eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/apis/sources"
+ brokerfilter "knative.dev/eventing/pkg/broker/filter"
+ "knative.dev/eventing/pkg/eventfilter/subscriptionsapi"
)
func TestResourceAddEvent(t *testing.T) {
@@ -68,3 +73,40 @@ func TestResourceStub(t *testing.T) {
d.Replace(nil, "")
d.Resync()
}
+
+func TestFilterFails(t *testing.T) {
+ ce := adaptertest.NewTestClient()
+ filters := []eventingv1.SubscriptionsAPIFilter{{
+ Exact: map[string]string{
+ "type": "dev.knative.apiserver.resource.add",
+ },
+ }}
+ logger := zap.NewExample().Sugar()
+ delegate := &resourceDelegate{
+ ce: ce,
+ source: "unit-test",
+ apiServerSourceName: apiServerSourceNameTest,
+ logger: logger,
+ filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), filters)...),
+ }
+
+ delegate.Update(simplePod("unit", "test"))
+ validateNotSent(t, ce, sources.ApiServerSourceUpdateEventType)
+}
+
+func TestEmptyFiltersList(t *testing.T) {
+ ce := adaptertest.NewTestClient()
+ filters := []eventingv1.SubscriptionsAPIFilter{}
+
+ logger := zap.NewExample().Sugar()
+ delegate := &resourceDelegate{
+ ce: ce,
+ source: "unit-test",
+ apiServerSourceName: apiServerSourceNameTest,
+ logger: logger,
+ filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), filters)...),
+ }
+
+ delegate.Update(simplePod("unit", "test"))
+ validateSent(t, ce, sources.ApiServerSourceUpdateEventType)
+}
diff --git a/pkg/apis/feature/features.go b/pkg/apis/feature/features.go
index 9fc57664e53..e01195bb5f6 100644
--- a/pkg/apis/feature/features.go
+++ b/pkg/apis/feature/features.go
@@ -61,6 +61,7 @@ func newDefaults() Flags {
TransportEncryption: Disabled,
OIDCAuthentication: Disabled,
EvenTypeAutoCreate: Disabled,
+ NewAPIServerFilters: Disabled,
}
}
diff --git a/pkg/apis/feature/flag_names.go b/pkg/apis/feature/flag_names.go
index 40de28fcaa3..cd937554c4b 100644
--- a/pkg/apis/feature/flag_names.go
+++ b/pkg/apis/feature/flag_names.go
@@ -27,4 +27,5 @@ const (
OIDCAuthentication = "authentication-oidc"
NodeSelectorLabel = "apiserversources-nodeselector-"
CrossNamespaceEventLinks = "cross-namespace-event-links"
+ NewAPIServerFilters = "new-apiserversource-filters"
)
diff --git a/pkg/apis/sources/v1/apiserver_types.go b/pkg/apis/sources/v1/apiserver_types.go
index cfe41a956b1..ddd6332f359 100644
--- a/pkg/apis/sources/v1/apiserver_types.go
+++ b/pkg/apis/sources/v1/apiserver_types.go
@@ -19,6 +19,7 @@ package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
+ eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/kmeta"
@@ -85,6 +86,16 @@ type ApiServerSourceSpec struct {
// should be watched by the source.
// +optional
NamespaceSelector *metav1.LabelSelector `json:"namespaceSelector,omitempty"`
+
+ // Filters is an experimental field that conforms to the CNCF CloudEvents Subscriptions
+ // API. It's an array of filter expressions that evaluate to true or false.
+ // If any filter expression in the array evaluates to false, the event MUST
+ // NOT be sent to the Sink. If all the filter expressions in the array
+ // evaluate to true, the event MUST be attempted to be delivered. Absence of
+ // a filter or empty array implies a value of true.
+ //
+ // +optional
+ Filters []eventingv1.SubscriptionsAPIFilter `json:"filters,omitempty"`
}
// ApiServerSourceStatus defines the observed state of ApiServerSource
diff --git a/pkg/apis/sources/v1/apiserver_validation.go b/pkg/apis/sources/v1/apiserver_validation.go
index 1b4774029c0..0eacb2dec94 100644
--- a/pkg/apis/sources/v1/apiserver_validation.go
+++ b/pkg/apis/sources/v1/apiserver_validation.go
@@ -22,6 +22,8 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
+ eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
+ "knative.dev/eventing/pkg/apis/feature"
"knative.dev/pkg/apis"
)
@@ -73,5 +75,22 @@ func (cs *ApiServerSourceSpec) Validate(ctx context.Context) *apis.FieldError {
}
}
errs = errs.Also(cs.SourceSpec.Validate(ctx))
+ errs = errs.Also(validateSubscriptionAPIFiltersList(ctx, cs.Filters).ViaField("filters"))
+ return errs
+}
+
+func validateSubscriptionAPIFiltersList(ctx context.Context, filters []eventingv1.SubscriptionsAPIFilter) (errs *apis.FieldError) {
+ if !feature.FromContext(ctx).IsEnabled(feature.NewAPIServerFilters) {
+ if len(filters) != 0 {
+ return errs.Also(apis.ErrGeneric("Filters is not empty but the NewAPIServerFilters feature is disabled."))
+ }
+
+ return nil
+ }
+
+ for i, f := range filters {
+ f := f
+ errs = errs.Also(eventingv1.ValidateSubscriptionAPIFilter(ctx, &f)).ViaIndex(i)
+ }
return errs
}
diff --git a/pkg/apis/sources/v1/apiserver_validation_test.go b/pkg/apis/sources/v1/apiserver_validation_test.go
index 9c6b509fe88..57646f3e23d 100644
--- a/pkg/apis/sources/v1/apiserver_validation_test.go
+++ b/pkg/apis/sources/v1/apiserver_validation_test.go
@@ -23,9 +23,11 @@ import (
"github.com/stretchr/testify/assert"
+ eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"
"github.com/google/go-cmp/cmp"
+ "knative.dev/eventing/pkg/apis/feature"
"knative.dev/pkg/apis"
)
@@ -266,3 +268,82 @@ func TestAPIServerValidationCallsSpecValidation(t *testing.T) {
err := source.Validate(context.TODO())
assert.EqualError(t, err, "missing field(s): spec.resources", "Spec is not validated!")
}
+
+func TestAPIServerFiltersValidation(t *testing.T) {
+ tests := []struct {
+ name string
+ featureState feature.Flag
+ want error
+ filters []eventingv1.SubscriptionsAPIFilter
+ }{{
+ name: "an error is raised if the feature is disabled but filters are specified",
+ featureState: feature.Disabled,
+ filters: []eventingv1.SubscriptionsAPIFilter{{
+ Prefix: map[string]string{
+ "invALID": "abc",
+ },
+ }},
+ want: apis.ErrGeneric("Filters is not empty but the NewAPIServerFilters feature is disabled."),
+ }, {
+ name: "filters are validated when the feature is enabled",
+ featureState: feature.Enabled,
+ filters: []eventingv1.SubscriptionsAPIFilter{{
+ Prefix: map[string]string{
+ "invALID": "abc",
+ },
+ }},
+ want: apis.ErrInvalidKeyName("invALID", apis.CurrentField,
+ "Attribute name must start with a letter and can only contain "+
+ "lowercase alphanumeric").ViaFieldKey("prefix", "invALID").ViaFieldIndex("filters", 0),
+ }, {
+ name: "validation works for valid filters",
+ featureState: feature.Enabled,
+ filters: []eventingv1.SubscriptionsAPIFilter{{
+ Exact: map[string]string{"myattr": "myval"},
+ }},
+ want: nil,
+ }, {
+ name: "validation works for empty filters",
+ featureState: feature.Enabled,
+ filters: []eventingv1.SubscriptionsAPIFilter{},
+ want: nil,
+ }, {
+ name: "validation does not work for empty filters",
+ featureState: feature.Disabled,
+ filters: []eventingv1.SubscriptionsAPIFilter{},
+ want: nil,
+ }}
+
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ featureContext := feature.ToContext(context.TODO(), feature.Flags{
+ feature.NewAPIServerFilters: test.featureState,
+ })
+ apiserversource := &ApiServerSourceSpec{
+ Filters: test.filters,
+ EventMode: "Resource",
+ Resources: []APIVersionKindSelector{{
+ APIVersion: "v1",
+ Kind: "Foo",
+ }},
+ SourceSpec: duckv1.SourceSpec{
+ Sink: duckv1.Destination{
+ Ref: &duckv1.KReference{
+ APIVersion: "v1",
+ Kind: "broker",
+ Name: "default",
+ },
+ },
+ },
+ }
+ got := apiserversource.Validate(featureContext)
+ if test.want != nil {
+ if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" {
+ t.Errorf("APIServerSourceSpec.Validate (-want, +got) = %v", diff)
+ }
+ } else if got != nil {
+ t.Errorf("APIServerSourceSpec.Validate wanted nil, got = %v", got.Error())
+ }
+ })
+ }
+}
diff --git a/pkg/apis/sources/v1/zz_generated.deepcopy.go b/pkg/apis/sources/v1/zz_generated.deepcopy.go
index 6d175e3c960..8de185540fc 100644
--- a/pkg/apis/sources/v1/zz_generated.deepcopy.go
+++ b/pkg/apis/sources/v1/zz_generated.deepcopy.go
@@ -24,6 +24,7 @@ package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
+ eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
)
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
@@ -145,6 +146,13 @@ func (in *ApiServerSourceSpec) DeepCopyInto(out *ApiServerSourceSpec) {
*out = new(metav1.LabelSelector)
(*in).DeepCopyInto(*out)
}
+ if in.Filters != nil {
+ in, out := &in.Filters, &out.Filters
+ *out = make([]eventingv1.SubscriptionsAPIFilter, len(*in))
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
+ }
return
}
diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go
index 6ee15200be9..6ded61139e5 100644
--- a/pkg/broker/filter/filter_handler.go
+++ b/pkg/broker/filter/filter_handler.go
@@ -545,7 +545,7 @@ func createSubscriptionsAPIFilters(logger *zap.Logger, trigger *eventingv1.Trigg
logger.Debug("Found no filters for trigger", zap.Any("trigger.Spec", trigger.Spec))
return subscriptionsapi.NewNoFilter()
}
- return subscriptionsapi.NewAllFilter(materializeFiltersList(logger, trigger.Spec.Filters)...)
+ return subscriptionsapi.NewAllFilter(MaterializeFiltersList(logger, trigger.Spec.Filters)...)
}
func materializeSubscriptionsAPIFilter(logger *zap.Logger, filter eventingv1.SubscriptionsAPIFilter) eventfilter.Filter {
@@ -574,9 +574,9 @@ func materializeSubscriptionsAPIFilter(logger *zap.Logger, filter eventingv1.Sub
return nil
}
case len(filter.All) > 0:
- materializedFilter = subscriptionsapi.NewAllFilter(materializeFiltersList(logger, filter.All)...)
+ materializedFilter = subscriptionsapi.NewAllFilter(MaterializeFiltersList(logger, filter.All)...)
case len(filter.Any) > 0:
- materializedFilter = subscriptionsapi.NewAnyFilter(materializeFiltersList(logger, filter.Any)...)
+ materializedFilter = subscriptionsapi.NewAnyFilter(MaterializeFiltersList(logger, filter.Any)...)
case filter.Not != nil:
materializedFilter = subscriptionsapi.NewNotFilter(materializeSubscriptionsAPIFilter(logger, *filter.Not))
case filter.CESQL != "":
@@ -589,7 +589,8 @@ func materializeSubscriptionsAPIFilter(logger *zap.Logger, filter eventingv1.Sub
return materializedFilter
}
-func materializeFiltersList(logger *zap.Logger, filters []eventingv1.SubscriptionsAPIFilter) []eventfilter.Filter {
+// MaterialzieFilterList allows any component that supports `SubscriptionsAPIFilter` to process them
+func MaterializeFiltersList(logger *zap.Logger, filters []eventingv1.SubscriptionsAPIFilter) []eventfilter.Filter {
materializedFilters := make([]eventfilter.Filter, 0, len(filters))
for _, f := range filters {
f := materializeSubscriptionsAPIFilter(logger, f)
diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter.go b/pkg/reconciler/apiserversource/resources/receive_adapter.go
index 6b3bfe0827b..0997c8c7632 100644
--- a/pkg/reconciler/apiserversource/resources/receive_adapter.go
+++ b/pkg/reconciler/apiserversource/resources/receive_adapter.go
@@ -127,6 +127,7 @@ func makeEnv(args *ReceiveAdapterArgs) ([]corev1.EnvVar, error) {
ResourceOwner: args.Source.Spec.ResourceOwner,
EventMode: args.Source.Spec.EventMode,
AllNamespaces: args.AllNamespaces,
+ Filters: args.Source.Spec.Filters,
}
for _, r := range args.Source.Spec.Resources {
diff --git a/test/config/config-features.yaml b/test/config/config-features.yaml
new file mode 100644
index 00000000000..30194a008de
--- /dev/null
+++ b/test/config/config-features.yaml
@@ -0,0 +1,61 @@
+# Copyright 2024 The Knative Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: config-features
+ namespace: knative-eventing
+ labels:
+data:
+ # ALPHA feature: The kreference-group allows you to use the Group field in KReferences.
+ # For more details: https://github.com/knative/eventing/issues/5086
+ kreference-group: "disabled"
+
+ # ALPHA feature: The delivery-retryafter allows you to use the RetryAfter field in DeliverySpec.
+ # For more details: https://github.com/knative/eventing/issues/5811
+ delivery-retryafter: "disabled"
+
+ # BETA feature: The delivery-timeout allows you to use the Timeout field in DeliverySpec.
+ # For more details: https://github.com/knative/eventing/issues/5148
+ delivery-timeout: "enabled"
+
+ # ALPHA feature: The kreference-mapping allows you to map kreference onto templated URI
+ # For more details: https://github.com/knative/eventing/issues/5593
+ kreference-mapping: "disabled"
+
+ # BETA feature: The new-trigger-filters flag allows you to use the new `filters` field
+ # in Trigger objects with its rich filtering capabilities.
+ # For more details: https://github.com/knative/eventing/issues/5204
+ new-trigger-filters: "enabled"
+
+ # ALPHA feature: The transport-encryption flag allows you to encrypt events in transit using the transport layer security (TLS) protocol.
+ # For more details: https://github.com/knative/eventing/issues/5957
+ transport-encryption: "disabled"
+
+ # ALPHA feature: The eventtype-auto-create flag allows automatic creation of Even Type instances based on Event's type being processed.
+ # For more details: https://github.com/knative/eventing/issues/6909
+ eventtype-auto-create: "disabled"
+
+ # ALPHA feature: The aauthentication-oidc flag allows you to use OIDC authentication for Eventing.
+ # For more details: https://github.com/knative/eventing/issues/7174
+ authentication-oidc: "disabled"
+
+ # ALPHA feature: The cross-namespace-event-links flag allows you to use cross-namespace referencing for Eventing.
+ # For more details: https://github.com/knative/eventing/issues/7739
+ cross-namespace-event-links: "disabled"
+
+ # ALPHA feature: The new-apiserversource-filters flag allows you to use the new `filters` field
+ # in APIServerSource objects with its rich filtering capabilities.
+ new-apiserversource-filters: "enabled"
diff --git a/test/e2e-common.sh b/test/e2e-common.sh
index 0ef9cde0f27..d629cc6081f 100755
--- a/test/e2e-common.sh
+++ b/test/e2e-common.sh
@@ -87,6 +87,8 @@ function knative_setup() {
enable_sugar || fail_test "Could not enable Sugar Controller Injection"
unleash_duck || fail_test "Could not unleash the chaos duck"
+
+ install_feature_cm || fail_test "Could not install features configmap"
}
function scale_controlplane() {
@@ -271,6 +273,14 @@ function unleash_duck() {
if (( SCALE_CHAOSDUCK_TO_ZERO )); then kubectl -n "${SYSTEM_NAMESPACE}" scale deployment/chaosduck --replicas=0; fi
}
+function install_feature_cm() {
+ KO_FLAGS="${KO_FLAGS:-}"
+ echo "install feature configmap"
+ cat test/config/config-features.yaml | \
+ sed "s/namespace: ${KNATIVE_DEFAULT_NAMESPACE}/namespace: ${SYSTEM_NAMESPACE}/g" | \
+ ko apply "${KO_FLAGS}" -f - || return $?
+}
+
# Teardown the Knative environment after tests finish.
function knative_teardown() {
echo ">> Stopping Knative Eventing"
diff --git a/test/rekt/apiserversource_test.go b/test/rekt/apiserversource_test.go
index e2641fc3ba8..f9ff6f9094e 100644
--- a/test/rekt/apiserversource_test.go
+++ b/test/rekt/apiserversource_test.go
@@ -212,3 +212,17 @@ func TestApiServerSourceDeployment(t *testing.T) {
env.Test(ctx, t, apiserversourcefeatures.DeployAPIServerSourceWithNodeSelector())
}
+
+func TestApiServerSourceNewFiltersFeature(t *testing.T) {
+ t.Parallel()
+
+ ctx, env := global.Environment(
+ knative.WithKnativeNamespace(system.Namespace()),
+ knative.WithLoggingConfig,
+ knative.WithTracingConfig,
+ k8s.WithEventListener,
+ environment.Managed(t),
+ )
+
+ env.TestSet(ctx, t, apiserversourcefeatures.NewFiltersFeature())
+}
diff --git a/test/rekt/features/apiserversource/data_plane.go b/test/rekt/features/apiserversource/data_plane.go
index 01661db1716..40b31c6fa73 100644
--- a/test/rekt/features/apiserversource/data_plane.go
+++ b/test/rekt/features/apiserversource/data_plane.go
@@ -21,6 +21,7 @@ import (
"fmt"
"github.com/cloudevents/sdk-go/v2/test"
+ eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/network"
@@ -947,3 +948,68 @@ func SendsEventsWithBrokerAsSinkTLS() *feature.Feature {
return f
}
+
+func NewFiltersFeature() *feature.FeatureSet {
+ fs := &feature.FeatureSet{
+ Name: "Knative ApiServerSource - Features - New Filter",
+ Features: []*feature.Feature{
+ EventsAreFilteredOut(),
+ },
+ }
+ return fs
+}
+
+func EventsAreFilteredOut() *feature.Feature {
+ source := feature.MakeRandomK8sName("apiserversource")
+ sink := feature.MakeRandomK8sName("sink")
+ f := feature.NewFeatureNamed("Filters properly the messages")
+
+ f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver))
+
+ sacmName := feature.MakeRandomK8sName("apiserversource")
+ f.Setup("Create Service Account for ApiServerSource with RBAC for v1.Pod resources",
+ setupAccountAndRoleForPods(sacmName))
+
+ cfg := []manifest.CfgFn{
+ apiserversource.WithServiceAccountName(sacmName),
+ apiserversource.WithEventMode(v1.ResourceMode),
+ apiserversource.WithSink(service.AsDestinationRef(sink)),
+ apiserversource.WithFilters([]eventingv1.SubscriptionsAPIFilter{{
+ Exact: map[string]string{
+ "type": "dev.knative.apiserver.resource.update",
+ },
+ }}),
+ apiserversource.WithResources(v1.APIVersionKindSelector{
+ APIVersion: "v1",
+ Kind: "Pod",
+ }),
+ }
+
+ f.Setup("install ApiServerSource", apiserversource.Install(source, cfg...))
+ f.Setup("ApiServerSource goes ready", apiserversource.IsReady(source))
+
+ examplePodName := feature.MakeRandomK8sName("example")
+
+ // create a pod so that ApiServerSource delivers an event to its sink
+ // event body is similar to this:
+ // {"kind":"Pod","namespace":"test-wmbcixlv","name":"example-axvlzbvc","apiVersion":"v1"}
+ f.Requirement("install example pod", pod.Install(examplePodName, exampleImage))
+
+ f.Stable("ApiServerSource as event source").
+ Must("delivers events",
+ eventassert.OnStore(sink).MatchEvent(
+ test.HasType("dev.knative.apiserver.resource.add"),
+ test.HasExtensions(map[string]interface{}{"apiversion": "v1"}),
+ test.DataContains(`"kind":"Pod"`),
+ test.DataContains(fmt.Sprintf(`"name":"%s"`, examplePodName)),
+ ).Exact(0)).
+ Must("delivers events",
+ eventassert.OnStore(sink).MatchEvent(
+ test.HasType("dev.knative.apiserver.resource.update"),
+ test.HasExtensions(map[string]interface{}{"apiversion": "v1"}),
+ test.DataContains(`"kind":"Pod"`),
+ test.DataContains(fmt.Sprintf(`"name":"%s"`, examplePodName)),
+ ).AtLeast(1))
+
+ return f
+}
diff --git a/test/rekt/resources/apiserversource/apiserversource.go b/test/rekt/resources/apiserversource/apiserversource.go
index 8b92fbeaaa3..158243b8555 100644
--- a/test/rekt/resources/apiserversource/apiserversource.go
+++ b/test/rekt/resources/apiserversource/apiserversource.go
@@ -19,6 +19,7 @@ package apiserversource
import (
"context"
"embed"
+ "encoding/json"
"fmt"
"strings"
"time"
@@ -27,6 +28,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
+ eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/feature"
"knative.dev/reconciler-test/pkg/k8s"
@@ -37,6 +39,8 @@ import (
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/kmeta"
"knative.dev/reconciler-test/pkg/manifest"
+
+ yamllib "sigs.k8s.io/yaml"
)
//go:embed *.yaml
@@ -264,3 +268,26 @@ func ResetNodeLabels(ctx context.Context, t feature.T) {
t.Fatalf("Could not update node: %v", err)
}
}
+
+func WithFilters(filters []eventingv1.SubscriptionsAPIFilter) manifest.CfgFn {
+ jsonBytes, err := json.Marshal(filters)
+ if err != nil {
+ panic(err)
+ }
+
+ yamlBytes, err := yamllib.JSONToYAML(jsonBytes)
+ if err != nil {
+ panic(err)
+ }
+
+ filtersYaml := string(yamlBytes)
+ lines := strings.Split(filtersYaml, "\n")
+ out := make([]string, 0, len(lines))
+ for i := range lines {
+ out = append(out, " "+lines[i])
+ }
+
+ return func(cfg map[string]interface{}) {
+ cfg["filters"] = strings.Join(out, "\n")
+ }
+}
diff --git a/test/rekt/resources/apiserversource/apiserversource.yaml b/test/rekt/resources/apiserversource/apiserversource.yaml
index 57da274150b..82e3350e2a1 100644
--- a/test/rekt/resources/apiserversource/apiserversource.yaml
+++ b/test/rekt/resources/apiserversource/apiserversource.yaml
@@ -87,3 +87,7 @@ spec:
audience: {{ .sink.audience }}
{{ end }}
{{ end }}
+ {{ if .filters }}
+ filters:
+{{ .filters }}
+ {{ end }}