diff --git a/config/core/configmaps/features.yaml b/config/core/configmaps/features.yaml
index 96da3544105..68f6ca282d7 100644
--- a/config/core/configmaps/features.yaml
+++ b/config/core/configmaps/features.yaml
@@ -60,3 +60,6 @@ data:
# For more details: https://github.com/knative/eventing/issues/7739
cross-namespace-event-links: "disabled"
+ # ALPHA feature: The new-apiserversource-filters flag allows you to use the new `filters` field
+ # in APIServerSource objects with its rich filtering capabilities.
+ new-apiserversource-filters: "disabled"
diff --git a/config/core/resources/apiserversource.yaml b/config/core/resources/apiserversource.yaml
index cf0c0ea0033..d387d4e7b0a 100644
--- a/config/core/resources/apiserversource.yaml
+++ b/config/core/resources/apiserversource.yaml
@@ -67,6 +67,7 @@ spec:
properties:
spec:
type: object
+ x-kubernetes-preserve-unknown-fields: true
required:
- resources
properties:
diff --git a/docs/eventing-api.md b/docs/eventing-api.md
index 6d06fccf261..44c4aa32287 100644
--- a/docs/eventing-api.md
+++ b/docs/eventing-api.md
@@ -2101,7 +2101,7 @@ resolved delivery options.
SubscriptionsAPIFilter
-(Appears on:SubscriptionsAPIFilter, TriggerSpec)
+(Appears on:SubscriptionsAPIFilter, TriggerSpec, ApiServerSourceSpec)
SubscriptionsAPIFilter allows defining a filter expression using CloudEvents
@@ -5327,6 +5327,25 @@ Kubernetes meta/v1.LabelSelector
should be watched by the source.
+
+
+filters
+
+
+[]SubscriptionsAPIFilter
+
+
+ |
+
+(Optional)
+ Filters is an experimental field that conforms to the CNCF CloudEvents Subscriptions
+API. It’s an array of filter expressions that evaluate to true or false.
+If any filter expression in the array evaluates to false, the event MUST
+NOT be sent to the Sink. If all the filter expressions in the array
+evaluate to true, the event MUST be attempted to be delivered. Absence of
+a filter or empty array implies a value of true.
+ |
+
@@ -5934,6 +5953,25 @@ Kubernetes meta/v1.LabelSelector
should be watched by the source.
+
+
+filters
+
+
+[]SubscriptionsAPIFilter
+
+
+ |
+
+(Optional)
+ Filters is an experimental field that conforms to the CNCF CloudEvents Subscriptions
+API. It’s an array of filter expressions that evaluate to true or false.
+If any filter expression in the array evaluates to false, the event MUST
+NOT be sent to the Sink. If all the filter expressions in the array
+evaluate to true, the event MUST be attempted to be delivered. Absence of
+a filter or empty array implies a value of true.
+ |
+
ApiServerSourceStatus
diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go
index 1a858735b51..bfab282d5d0 100644
--- a/pkg/adapter/apiserver/adapter.go
+++ b/pkg/adapter/apiserver/adapter.go
@@ -34,6 +34,8 @@ import (
"knative.dev/eventing/pkg/adapter/v2"
v1 "knative.dev/eventing/pkg/apis/sources/v1"
+ brokerfilter "knative.dev/eventing/pkg/broker/filter"
+ "knative.dev/eventing/pkg/eventfilter/subscriptionsapi"
)
type envConfig struct {
@@ -71,6 +73,7 @@ func (a *apiServerAdapter) start(ctx context.Context, stopCh <-chan struct{}) er
logger: a.logger,
ref: a.config.EventMode == v1.ReferenceMode,
apiServerSourceName: a.name,
+ filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(a.logger.Desugar(), a.config.Filters)...),
}
if a.config.ResourceOwner != nil {
a.logger.Infow("will be filtered",
diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go
index 71b9a86f7cc..79e2721884a 100644
--- a/pkg/adapter/apiserver/adapter_test.go
+++ b/pkg/adapter/apiserver/adapter_test.go
@@ -34,6 +34,9 @@ import (
dynamicfake "k8s.io/client-go/dynamic/fake"
kubetesting "k8s.io/client-go/testing"
adaptertest "knative.dev/eventing/pkg/adapter/v2/test"
+ eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
+ brokerfilter "knative.dev/eventing/pkg/broker/filter"
+ "knative.dev/eventing/pkg/eventfilter/subscriptionsapi"
rectesting "knative.dev/eventing/pkg/reconciler/testing"
"knative.dev/pkg/logging"
pkgtesting "knative.dev/pkg/reconciler/testing"
@@ -289,21 +292,27 @@ func validateNotSent(t *testing.T, ce *adaptertest.TestCloudEventsClient, want s
func makeResourceAndTestingClient() (*resourceDelegate, *adaptertest.TestCloudEventsClient) {
ce := adaptertest.NewTestClient()
+ logger := zap.NewExample().Sugar()
+
return &resourceDelegate{
ce: ce,
source: "unit-test",
apiServerSourceName: apiServerSourceNameTest,
- logger: zap.NewExample().Sugar(),
+ logger: logger,
+ filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...),
}, ce
}
func makeRefAndTestingClient() (*resourceDelegate, *adaptertest.TestCloudEventsClient) {
ce := adaptertest.NewTestClient()
+ logger := zap.NewExample().Sugar()
+
return &resourceDelegate{
ce: ce,
source: "unit-test",
apiServerSourceName: apiServerSourceNameTest,
logger: zap.NewExample().Sugar(),
ref: true,
+ filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...),
}, ce
}
diff --git a/pkg/adapter/apiserver/config.go b/pkg/adapter/apiserver/config.go
index 19fca01177a..c85f7f4de85 100644
--- a/pkg/adapter/apiserver/config.go
+++ b/pkg/adapter/apiserver/config.go
@@ -17,6 +17,8 @@ package apiserver
import (
"k8s.io/apimachinery/pkg/runtime/schema"
+
+ eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
v1 "knative.dev/eventing/pkg/apis/sources/v1"
)
@@ -56,4 +58,14 @@ type Config struct {
// Defaults to `Reference`
// +optional
EventMode string `json:"mode,omitempty"`
+
+ // Filters is an experimental field that conforms to the CNCF CloudEvents Subscriptions
+ // API. It's an array of filter expressions that evaluate to true or false.
+ // If any filter expression in the array evaluates to false, the event MUST
+ // NOT be sent to the Sink. If all the filter expressions in the array
+ // evaluate to true, the event MUST be attempted to be delivered. Absence of
+ // a filter or empty array implies a value of true.
+ //
+ // +optional
+ Filters []eventingv1.SubscriptionsAPIFilter `json:"filters,omitempty"`
}
diff --git a/pkg/adapter/apiserver/delegate.go b/pkg/adapter/apiserver/delegate.go
index e45d87e67ae..f78b3abe163 100644
--- a/pkg/adapter/apiserver/delegate.go
+++ b/pkg/adapter/apiserver/delegate.go
@@ -24,6 +24,7 @@ import (
"go.uber.org/zap"
"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/adapter/apiserver/events"
+ "knative.dev/eventing/pkg/eventfilter"
)
type resourceDelegate struct {
@@ -31,6 +32,7 @@ type resourceDelegate struct {
source string
ref bool
apiServerSourceName string
+ filter eventfilter.Filter
logger *zap.SugaredLogger
}
@@ -38,31 +40,36 @@ type resourceDelegate struct {
var _ cache.Store = (*resourceDelegate)(nil)
func (a *resourceDelegate) Add(obj interface{}) error {
- ctx, event, err := events.MakeAddEvent(a.source, a.apiServerSourceName, obj, a.ref)
- if err != nil {
- a.logger.Infow("event creation failed", zap.Error(err))
- return err
- }
- a.sendCloudEvent(ctx, event)
- return nil
+ return a.handleKubernetesObject(events.MakeAddEvent, obj)
}
func (a *resourceDelegate) Update(obj interface{}) error {
- ctx, event, err := events.MakeUpdateEvent(a.source, a.apiServerSourceName, obj, a.ref)
- if err != nil {
- a.logger.Info("event creation failed", zap.Error(err))
- return err
- }
- a.sendCloudEvent(ctx, event)
- return nil
+ return a.handleKubernetesObject(events.MakeUpdateEvent, obj)
}
func (a *resourceDelegate) Delete(obj interface{}) error {
- ctx, event, err := events.MakeDeleteEvent(a.source, a.apiServerSourceName, obj, a.ref)
+ return a.handleKubernetesObject(events.MakeDeleteEvent, obj)
+
+}
+
+// makeEventFunc represents the signature of the functions `events.Make*Event` so they can
+// be passed as a parameter
+type makeEventFunc func(string, string, interface{}, bool) (context.Context, cloudevents.Event, error)
+
+func (a *resourceDelegate) handleKubernetesObject(makeEvent makeEventFunc, obj interface{}) error {
+ ctx, event, err := makeEvent(a.source, a.apiServerSourceName, obj, a.ref)
+
if err != nil {
- a.logger.Info("event creation failed", zap.Error(err))
+ a.logger.Infow("event creation failed", zap.Error(err))
return err
}
+
+ filterResult := a.filter.Filter(ctx, event)
+ if filterResult == eventfilter.FailFilter {
+ a.logger.Debugf("event type %s filtered out", event.Type())
+ return nil
+ }
+
a.sendCloudEvent(ctx, event)
return nil
}
diff --git a/pkg/adapter/apiserver/delegate_test.go b/pkg/adapter/apiserver/delegate_test.go
index a8e9270885f..00fc9dfe691 100644
--- a/pkg/adapter/apiserver/delegate_test.go
+++ b/pkg/adapter/apiserver/delegate_test.go
@@ -18,7 +18,12 @@ package apiserver
import (
"testing"
+ "go.uber.org/zap"
+ adaptertest "knative.dev/eventing/pkg/adapter/v2/test"
+ eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/apis/sources"
+ brokerfilter "knative.dev/eventing/pkg/broker/filter"
+ "knative.dev/eventing/pkg/eventfilter/subscriptionsapi"
)
func TestResourceAddEvent(t *testing.T) {
@@ -68,3 +73,40 @@ func TestResourceStub(t *testing.T) {
d.Replace(nil, "")
d.Resync()
}
+
+func TestFilterFails(t *testing.T) {
+ ce := adaptertest.NewTestClient()
+ filters := []eventingv1.SubscriptionsAPIFilter{{
+ Exact: map[string]string{
+ "type": "dev.knative.apiserver.resource.add",
+ },
+ }}
+ logger := zap.NewExample().Sugar()
+ delegate := &resourceDelegate{
+ ce: ce,
+ source: "unit-test",
+ apiServerSourceName: apiServerSourceNameTest,
+ logger: logger,
+ filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), filters)...),
+ }
+
+ delegate.Update(simplePod("unit", "test"))
+ validateNotSent(t, ce, sources.ApiServerSourceUpdateEventType)
+}
+
+func TestEmptyFiltersList(t *testing.T) {
+ ce := adaptertest.NewTestClient()
+ filters := []eventingv1.SubscriptionsAPIFilter{}
+
+ logger := zap.NewExample().Sugar()
+ delegate := &resourceDelegate{
+ ce: ce,
+ source: "unit-test",
+ apiServerSourceName: apiServerSourceNameTest,
+ logger: logger,
+ filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), filters)...),
+ }
+
+ delegate.Update(simplePod("unit", "test"))
+ validateSent(t, ce, sources.ApiServerSourceUpdateEventType)
+}
diff --git a/pkg/apis/feature/features.go b/pkg/apis/feature/features.go
index 9fc57664e53..e01195bb5f6 100644
--- a/pkg/apis/feature/features.go
+++ b/pkg/apis/feature/features.go
@@ -61,6 +61,7 @@ func newDefaults() Flags {
TransportEncryption: Disabled,
OIDCAuthentication: Disabled,
EvenTypeAutoCreate: Disabled,
+ NewAPIServerFilters: Disabled,
}
}
diff --git a/pkg/apis/feature/flag_names.go b/pkg/apis/feature/flag_names.go
index 40de28fcaa3..cd937554c4b 100644
--- a/pkg/apis/feature/flag_names.go
+++ b/pkg/apis/feature/flag_names.go
@@ -27,4 +27,5 @@ const (
OIDCAuthentication = "authentication-oidc"
NodeSelectorLabel = "apiserversources-nodeselector-"
CrossNamespaceEventLinks = "cross-namespace-event-links"
+ NewAPIServerFilters = "new-apiserversource-filters"
)
diff --git a/pkg/apis/sources/v1/apiserver_types.go b/pkg/apis/sources/v1/apiserver_types.go
index cfe41a956b1..ddd6332f359 100644
--- a/pkg/apis/sources/v1/apiserver_types.go
+++ b/pkg/apis/sources/v1/apiserver_types.go
@@ -19,6 +19,7 @@ package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
+ eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/kmeta"
@@ -85,6 +86,16 @@ type ApiServerSourceSpec struct {
// should be watched by the source.
// +optional
NamespaceSelector *metav1.LabelSelector `json:"namespaceSelector,omitempty"`
+
+ // Filters is an experimental field that conforms to the CNCF CloudEvents Subscriptions
+ // API. It's an array of filter expressions that evaluate to true or false.
+ // If any filter expression in the array evaluates to false, the event MUST
+ // NOT be sent to the Sink. If all the filter expressions in the array
+ // evaluate to true, the event MUST be attempted to be delivered. Absence of
+ // a filter or empty array implies a value of true.
+ //
+ // +optional
+ Filters []eventingv1.SubscriptionsAPIFilter `json:"filters,omitempty"`
}
// ApiServerSourceStatus defines the observed state of ApiServerSource
diff --git a/pkg/apis/sources/v1/apiserver_validation.go b/pkg/apis/sources/v1/apiserver_validation.go
index 1b4774029c0..0eacb2dec94 100644
--- a/pkg/apis/sources/v1/apiserver_validation.go
+++ b/pkg/apis/sources/v1/apiserver_validation.go
@@ -22,6 +22,8 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
+ eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
+ "knative.dev/eventing/pkg/apis/feature"
"knative.dev/pkg/apis"
)
@@ -73,5 +75,22 @@ func (cs *ApiServerSourceSpec) Validate(ctx context.Context) *apis.FieldError {
}
}
errs = errs.Also(cs.SourceSpec.Validate(ctx))
+ errs = errs.Also(validateSubscriptionAPIFiltersList(ctx, cs.Filters).ViaField("filters"))
+ return errs
+}
+
+func validateSubscriptionAPIFiltersList(ctx context.Context, filters []eventingv1.SubscriptionsAPIFilter) (errs *apis.FieldError) {
+ if !feature.FromContext(ctx).IsEnabled(feature.NewAPIServerFilters) {
+ if len(filters) != 0 {
+ return errs.Also(apis.ErrGeneric("Filters is not empty but the NewAPIServerFilters feature is disabled."))
+ }
+
+ return nil
+ }
+
+ for i, f := range filters {
+ f := f
+ errs = errs.Also(eventingv1.ValidateSubscriptionAPIFilter(ctx, &f)).ViaIndex(i)
+ }
return errs
}
diff --git a/pkg/apis/sources/v1/apiserver_validation_test.go b/pkg/apis/sources/v1/apiserver_validation_test.go
index 9c6b509fe88..57646f3e23d 100644
--- a/pkg/apis/sources/v1/apiserver_validation_test.go
+++ b/pkg/apis/sources/v1/apiserver_validation_test.go
@@ -23,9 +23,11 @@ import (
"github.com/stretchr/testify/assert"
+ eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"
"github.com/google/go-cmp/cmp"
+ "knative.dev/eventing/pkg/apis/feature"
"knative.dev/pkg/apis"
)
@@ -266,3 +268,82 @@ func TestAPIServerValidationCallsSpecValidation(t *testing.T) {
err := source.Validate(context.TODO())
assert.EqualError(t, err, "missing field(s): spec.resources", "Spec is not validated!")
}
+
+func TestAPIServerFiltersValidation(t *testing.T) {
+ tests := []struct {
+ name string
+ featureState feature.Flag
+ want error
+ filters []eventingv1.SubscriptionsAPIFilter
+ }{{
+ name: "an error is raised if the feature is disabled but filters are specified",
+ featureState: feature.Disabled,
+ filters: []eventingv1.SubscriptionsAPIFilter{{
+ Prefix: map[string]string{
+ "invALID": "abc",
+ },
+ }},
+ want: apis.ErrGeneric("Filters is not empty but the NewAPIServerFilters feature is disabled."),
+ }, {
+ name: "filters are validated when the feature is enabled",
+ featureState: feature.Enabled,
+ filters: []eventingv1.SubscriptionsAPIFilter{{
+ Prefix: map[string]string{
+ "invALID": "abc",
+ },
+ }},
+ want: apis.ErrInvalidKeyName("invALID", apis.CurrentField,
+ "Attribute name must start with a letter and can only contain "+
+ "lowercase alphanumeric").ViaFieldKey("prefix", "invALID").ViaFieldIndex("filters", 0),
+ }, {
+ name: "validation works for valid filters",
+ featureState: feature.Enabled,
+ filters: []eventingv1.SubscriptionsAPIFilter{{
+ Exact: map[string]string{"myattr": "myval"},
+ }},
+ want: nil,
+ }, {
+ name: "validation works for empty filters",
+ featureState: feature.Enabled,
+ filters: []eventingv1.SubscriptionsAPIFilter{},
+ want: nil,
+ }, {
+ name: "validation does not work for empty filters",
+ featureState: feature.Disabled,
+ filters: []eventingv1.SubscriptionsAPIFilter{},
+ want: nil,
+ }}
+
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ featureContext := feature.ToContext(context.TODO(), feature.Flags{
+ feature.NewAPIServerFilters: test.featureState,
+ })
+ apiserversource := &ApiServerSourceSpec{
+ Filters: test.filters,
+ EventMode: "Resource",
+ Resources: []APIVersionKindSelector{{
+ APIVersion: "v1",
+ Kind: "Foo",
+ }},
+ SourceSpec: duckv1.SourceSpec{
+ Sink: duckv1.Destination{
+ Ref: &duckv1.KReference{
+ APIVersion: "v1",
+ Kind: "broker",
+ Name: "default",
+ },
+ },
+ },
+ }
+ got := apiserversource.Validate(featureContext)
+ if test.want != nil {
+ if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" {
+ t.Errorf("APIServerSourceSpec.Validate (-want, +got) = %v", diff)
+ }
+ } else if got != nil {
+ t.Errorf("APIServerSourceSpec.Validate wanted nil, got = %v", got.Error())
+ }
+ })
+ }
+}
diff --git a/pkg/apis/sources/v1/zz_generated.deepcopy.go b/pkg/apis/sources/v1/zz_generated.deepcopy.go
index 6d175e3c960..8de185540fc 100644
--- a/pkg/apis/sources/v1/zz_generated.deepcopy.go
+++ b/pkg/apis/sources/v1/zz_generated.deepcopy.go
@@ -24,6 +24,7 @@ package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
+ eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
)
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
@@ -145,6 +146,13 @@ func (in *ApiServerSourceSpec) DeepCopyInto(out *ApiServerSourceSpec) {
*out = new(metav1.LabelSelector)
(*in).DeepCopyInto(*out)
}
+ if in.Filters != nil {
+ in, out := &in.Filters, &out.Filters
+ *out = make([]eventingv1.SubscriptionsAPIFilter, len(*in))
+ for i := range *in {
+ (*in)[i].DeepCopyInto(&(*out)[i])
+ }
+ }
return
}
diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go
index 6ee15200be9..6ded61139e5 100644
--- a/pkg/broker/filter/filter_handler.go
+++ b/pkg/broker/filter/filter_handler.go
@@ -545,7 +545,7 @@ func createSubscriptionsAPIFilters(logger *zap.Logger, trigger *eventingv1.Trigg
logger.Debug("Found no filters for trigger", zap.Any("trigger.Spec", trigger.Spec))
return subscriptionsapi.NewNoFilter()
}
- return subscriptionsapi.NewAllFilter(materializeFiltersList(logger, trigger.Spec.Filters)...)
+ return subscriptionsapi.NewAllFilter(MaterializeFiltersList(logger, trigger.Spec.Filters)...)
}
func materializeSubscriptionsAPIFilter(logger *zap.Logger, filter eventingv1.SubscriptionsAPIFilter) eventfilter.Filter {
@@ -574,9 +574,9 @@ func materializeSubscriptionsAPIFilter(logger *zap.Logger, filter eventingv1.Sub
return nil
}
case len(filter.All) > 0:
- materializedFilter = subscriptionsapi.NewAllFilter(materializeFiltersList(logger, filter.All)...)
+ materializedFilter = subscriptionsapi.NewAllFilter(MaterializeFiltersList(logger, filter.All)...)
case len(filter.Any) > 0:
- materializedFilter = subscriptionsapi.NewAnyFilter(materializeFiltersList(logger, filter.Any)...)
+ materializedFilter = subscriptionsapi.NewAnyFilter(MaterializeFiltersList(logger, filter.Any)...)
case filter.Not != nil:
materializedFilter = subscriptionsapi.NewNotFilter(materializeSubscriptionsAPIFilter(logger, *filter.Not))
case filter.CESQL != "":
@@ -589,7 +589,8 @@ func materializeSubscriptionsAPIFilter(logger *zap.Logger, filter eventingv1.Sub
return materializedFilter
}
-func materializeFiltersList(logger *zap.Logger, filters []eventingv1.SubscriptionsAPIFilter) []eventfilter.Filter {
+// MaterialzieFilterList allows any component that supports `SubscriptionsAPIFilter` to process them
+func MaterializeFiltersList(logger *zap.Logger, filters []eventingv1.SubscriptionsAPIFilter) []eventfilter.Filter {
materializedFilters := make([]eventfilter.Filter, 0, len(filters))
for _, f := range filters {
f := materializeSubscriptionsAPIFilter(logger, f)
diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter.go b/pkg/reconciler/apiserversource/resources/receive_adapter.go
index 6b3bfe0827b..0997c8c7632 100644
--- a/pkg/reconciler/apiserversource/resources/receive_adapter.go
+++ b/pkg/reconciler/apiserversource/resources/receive_adapter.go
@@ -127,6 +127,7 @@ func makeEnv(args *ReceiveAdapterArgs) ([]corev1.EnvVar, error) {
ResourceOwner: args.Source.Spec.ResourceOwner,
EventMode: args.Source.Spec.EventMode,
AllNamespaces: args.AllNamespaces,
+ Filters: args.Source.Spec.Filters,
}
for _, r := range args.Source.Spec.Resources {
diff --git a/test/config/config-features.yaml b/test/config/config-features.yaml
new file mode 100644
index 00000000000..30194a008de
--- /dev/null
+++ b/test/config/config-features.yaml
@@ -0,0 +1,61 @@
+# Copyright 2024 The Knative Authors
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: config-features
+ namespace: knative-eventing
+ labels:
+data:
+ # ALPHA feature: The kreference-group allows you to use the Group field in KReferences.
+ # For more details: https://github.com/knative/eventing/issues/5086
+ kreference-group: "disabled"
+
+ # ALPHA feature: The delivery-retryafter allows you to use the RetryAfter field in DeliverySpec.
+ # For more details: https://github.com/knative/eventing/issues/5811
+ delivery-retryafter: "disabled"
+
+ # BETA feature: The delivery-timeout allows you to use the Timeout field in DeliverySpec.
+ # For more details: https://github.com/knative/eventing/issues/5148
+ delivery-timeout: "enabled"
+
+ # ALPHA feature: The kreference-mapping allows you to map kreference onto templated URI
+ # For more details: https://github.com/knative/eventing/issues/5593
+ kreference-mapping: "disabled"
+
+ # BETA feature: The new-trigger-filters flag allows you to use the new `filters` field
+ # in Trigger objects with its rich filtering capabilities.
+ # For more details: https://github.com/knative/eventing/issues/5204
+ new-trigger-filters: "enabled"
+
+ # ALPHA feature: The transport-encryption flag allows you to encrypt events in transit using the transport layer security (TLS) protocol.
+ # For more details: https://github.com/knative/eventing/issues/5957
+ transport-encryption: "disabled"
+
+ # ALPHA feature: The eventtype-auto-create flag allows automatic creation of Even Type instances based on Event's type being processed.
+ # For more details: https://github.com/knative/eventing/issues/6909
+ eventtype-auto-create: "disabled"
+
+ # ALPHA feature: The aauthentication-oidc flag allows you to use OIDC authentication for Eventing.
+ # For more details: https://github.com/knative/eventing/issues/7174
+ authentication-oidc: "disabled"
+
+ # ALPHA feature: The cross-namespace-event-links flag allows you to use cross-namespace referencing for Eventing.
+ # For more details: https://github.com/knative/eventing/issues/7739
+ cross-namespace-event-links: "disabled"
+
+ # ALPHA feature: The new-apiserversource-filters flag allows you to use the new `filters` field
+ # in APIServerSource objects with its rich filtering capabilities.
+ new-apiserversource-filters: "enabled"
diff --git a/test/e2e-common.sh b/test/e2e-common.sh
index 0ef9cde0f27..d629cc6081f 100755
--- a/test/e2e-common.sh
+++ b/test/e2e-common.sh
@@ -87,6 +87,8 @@ function knative_setup() {
enable_sugar || fail_test "Could not enable Sugar Controller Injection"
unleash_duck || fail_test "Could not unleash the chaos duck"
+
+ install_feature_cm || fail_test "Could not install features configmap"
}
function scale_controlplane() {
@@ -271,6 +273,14 @@ function unleash_duck() {
if (( SCALE_CHAOSDUCK_TO_ZERO )); then kubectl -n "${SYSTEM_NAMESPACE}" scale deployment/chaosduck --replicas=0; fi
}
+function install_feature_cm() {
+ KO_FLAGS="${KO_FLAGS:-}"
+ echo "install feature configmap"
+ cat test/config/config-features.yaml | \
+ sed "s/namespace: ${KNATIVE_DEFAULT_NAMESPACE}/namespace: ${SYSTEM_NAMESPACE}/g" | \
+ ko apply "${KO_FLAGS}" -f - || return $?
+}
+
# Teardown the Knative environment after tests finish.
function knative_teardown() {
echo ">> Stopping Knative Eventing"
diff --git a/test/rekt/apiserversource_test.go b/test/rekt/apiserversource_test.go
index e2641fc3ba8..f9ff6f9094e 100644
--- a/test/rekt/apiserversource_test.go
+++ b/test/rekt/apiserversource_test.go
@@ -212,3 +212,17 @@ func TestApiServerSourceDeployment(t *testing.T) {
env.Test(ctx, t, apiserversourcefeatures.DeployAPIServerSourceWithNodeSelector())
}
+
+func TestApiServerSourceNewFiltersFeature(t *testing.T) {
+ t.Parallel()
+
+ ctx, env := global.Environment(
+ knative.WithKnativeNamespace(system.Namespace()),
+ knative.WithLoggingConfig,
+ knative.WithTracingConfig,
+ k8s.WithEventListener,
+ environment.Managed(t),
+ )
+
+ env.TestSet(ctx, t, apiserversourcefeatures.NewFiltersFeature())
+}
diff --git a/test/rekt/features/apiserversource/data_plane.go b/test/rekt/features/apiserversource/data_plane.go
index 01661db1716..40b31c6fa73 100644
--- a/test/rekt/features/apiserversource/data_plane.go
+++ b/test/rekt/features/apiserversource/data_plane.go
@@ -21,6 +21,7 @@ import (
"fmt"
"github.com/cloudevents/sdk-go/v2/test"
+ eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/network"
@@ -947,3 +948,68 @@ func SendsEventsWithBrokerAsSinkTLS() *feature.Feature {
return f
}
+
+func NewFiltersFeature() *feature.FeatureSet {
+ fs := &feature.FeatureSet{
+ Name: "Knative ApiServerSource - Features - New Filter",
+ Features: []*feature.Feature{
+ EventsAreFilteredOut(),
+ },
+ }
+ return fs
+}
+
+func EventsAreFilteredOut() *feature.Feature {
+ source := feature.MakeRandomK8sName("apiserversource")
+ sink := feature.MakeRandomK8sName("sink")
+ f := feature.NewFeatureNamed("Filters properly the messages")
+
+ f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver))
+
+ sacmName := feature.MakeRandomK8sName("apiserversource")
+ f.Setup("Create Service Account for ApiServerSource with RBAC for v1.Pod resources",
+ setupAccountAndRoleForPods(sacmName))
+
+ cfg := []manifest.CfgFn{
+ apiserversource.WithServiceAccountName(sacmName),
+ apiserversource.WithEventMode(v1.ResourceMode),
+ apiserversource.WithSink(service.AsDestinationRef(sink)),
+ apiserversource.WithFilters([]eventingv1.SubscriptionsAPIFilter{{
+ Exact: map[string]string{
+ "type": "dev.knative.apiserver.resource.update",
+ },
+ }}),
+ apiserversource.WithResources(v1.APIVersionKindSelector{
+ APIVersion: "v1",
+ Kind: "Pod",
+ }),
+ }
+
+ f.Setup("install ApiServerSource", apiserversource.Install(source, cfg...))
+ f.Setup("ApiServerSource goes ready", apiserversource.IsReady(source))
+
+ examplePodName := feature.MakeRandomK8sName("example")
+
+ // create a pod so that ApiServerSource delivers an event to its sink
+ // event body is similar to this:
+ // {"kind":"Pod","namespace":"test-wmbcixlv","name":"example-axvlzbvc","apiVersion":"v1"}
+ f.Requirement("install example pod", pod.Install(examplePodName, exampleImage))
+
+ f.Stable("ApiServerSource as event source").
+ Must("delivers events",
+ eventassert.OnStore(sink).MatchEvent(
+ test.HasType("dev.knative.apiserver.resource.add"),
+ test.HasExtensions(map[string]interface{}{"apiversion": "v1"}),
+ test.DataContains(`"kind":"Pod"`),
+ test.DataContains(fmt.Sprintf(`"name":"%s"`, examplePodName)),
+ ).Exact(0)).
+ Must("delivers events",
+ eventassert.OnStore(sink).MatchEvent(
+ test.HasType("dev.knative.apiserver.resource.update"),
+ test.HasExtensions(map[string]interface{}{"apiversion": "v1"}),
+ test.DataContains(`"kind":"Pod"`),
+ test.DataContains(fmt.Sprintf(`"name":"%s"`, examplePodName)),
+ ).AtLeast(1))
+
+ return f
+}
diff --git a/test/rekt/resources/apiserversource/apiserversource.go b/test/rekt/resources/apiserversource/apiserversource.go
index 8b92fbeaaa3..158243b8555 100644
--- a/test/rekt/resources/apiserversource/apiserversource.go
+++ b/test/rekt/resources/apiserversource/apiserversource.go
@@ -19,6 +19,7 @@ package apiserversource
import (
"context"
"embed"
+ "encoding/json"
"fmt"
"strings"
"time"
@@ -27,6 +28,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
+ eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/feature"
"knative.dev/reconciler-test/pkg/k8s"
@@ -37,6 +39,8 @@ import (
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/kmeta"
"knative.dev/reconciler-test/pkg/manifest"
+
+ yamllib "sigs.k8s.io/yaml"
)
//go:embed *.yaml
@@ -264,3 +268,26 @@ func ResetNodeLabels(ctx context.Context, t feature.T) {
t.Fatalf("Could not update node: %v", err)
}
}
+
+func WithFilters(filters []eventingv1.SubscriptionsAPIFilter) manifest.CfgFn {
+ jsonBytes, err := json.Marshal(filters)
+ if err != nil {
+ panic(err)
+ }
+
+ yamlBytes, err := yamllib.JSONToYAML(jsonBytes)
+ if err != nil {
+ panic(err)
+ }
+
+ filtersYaml := string(yamlBytes)
+ lines := strings.Split(filtersYaml, "\n")
+ out := make([]string, 0, len(lines))
+ for i := range lines {
+ out = append(out, " "+lines[i])
+ }
+
+ return func(cfg map[string]interface{}) {
+ cfg["filters"] = strings.Join(out, "\n")
+ }
+}
diff --git a/test/rekt/resources/apiserversource/apiserversource.yaml b/test/rekt/resources/apiserversource/apiserversource.yaml
index 57da274150b..82e3350e2a1 100644
--- a/test/rekt/resources/apiserversource/apiserversource.yaml
+++ b/test/rekt/resources/apiserversource/apiserversource.yaml
@@ -87,3 +87,7 @@ spec:
audience: {{ .sink.audience }}
{{ end }}
{{ end }}
+ {{ if .filters }}
+ filters:
+{{ .filters }}
+ {{ end }}