diff --git a/config/core/configmaps/features.yaml b/config/core/configmaps/features.yaml index 96da3544105..68f6ca282d7 100644 --- a/config/core/configmaps/features.yaml +++ b/config/core/configmaps/features.yaml @@ -60,3 +60,6 @@ data: # For more details: https://github.com/knative/eventing/issues/7739 cross-namespace-event-links: "disabled" + # ALPHA feature: The new-apiserversource-filters flag allows you to use the new `filters` field + # in APIServerSource objects with its rich filtering capabilities. + new-apiserversource-filters: "disabled" diff --git a/config/core/resources/apiserversource.yaml b/config/core/resources/apiserversource.yaml index cf0c0ea0033..d387d4e7b0a 100644 --- a/config/core/resources/apiserversource.yaml +++ b/config/core/resources/apiserversource.yaml @@ -67,6 +67,7 @@ spec: properties: spec: type: object + x-kubernetes-preserve-unknown-fields: true required: - resources properties: diff --git a/docs/eventing-api.md b/docs/eventing-api.md index 6d06fccf261..44c4aa32287 100644 --- a/docs/eventing-api.md +++ b/docs/eventing-api.md @@ -2101,7 +2101,7 @@ resolved delivery options.

SubscriptionsAPIFilter

-(Appears on:SubscriptionsAPIFilter, TriggerSpec) +(Appears on:SubscriptionsAPIFilter, TriggerSpec, ApiServerSourceSpec)

SubscriptionsAPIFilter allows defining a filter expression using CloudEvents @@ -5327,6 +5327,25 @@ Kubernetes meta/v1.LabelSelector should be watched by the source.

+ + +filters
+ + +[]SubscriptionsAPIFilter + + + + +(Optional) +

Filters is an experimental field that conforms to the CNCF CloudEvents Subscriptions +API. It’s an array of filter expressions that evaluate to true or false. +If any filter expression in the array evaluates to false, the event MUST +NOT be sent to the Sink. If all the filter expressions in the array +evaluate to true, the event MUST be attempted to be delivered. Absence of +a filter or empty array implies a value of true.

+ + @@ -5934,6 +5953,25 @@ Kubernetes meta/v1.LabelSelector should be watched by the source.

+ + +filters
+ + +[]SubscriptionsAPIFilter + + + + +(Optional) +

Filters is an experimental field that conforms to the CNCF CloudEvents Subscriptions +API. It’s an array of filter expressions that evaluate to true or false. +If any filter expression in the array evaluates to false, the event MUST +NOT be sent to the Sink. If all the filter expressions in the array +evaluate to true, the event MUST be attempted to be delivered. Absence of +a filter or empty array implies a value of true.

+ +

ApiServerSourceStatus diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go index 1a858735b51..bfab282d5d0 100644 --- a/pkg/adapter/apiserver/adapter.go +++ b/pkg/adapter/apiserver/adapter.go @@ -34,6 +34,8 @@ import ( "knative.dev/eventing/pkg/adapter/v2" v1 "knative.dev/eventing/pkg/apis/sources/v1" + brokerfilter "knative.dev/eventing/pkg/broker/filter" + "knative.dev/eventing/pkg/eventfilter/subscriptionsapi" ) type envConfig struct { @@ -71,6 +73,7 @@ func (a *apiServerAdapter) start(ctx context.Context, stopCh <-chan struct{}) er logger: a.logger, ref: a.config.EventMode == v1.ReferenceMode, apiServerSourceName: a.name, + filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(a.logger.Desugar(), a.config.Filters)...), } if a.config.ResourceOwner != nil { a.logger.Infow("will be filtered", diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go index 71b9a86f7cc..79e2721884a 100644 --- a/pkg/adapter/apiserver/adapter_test.go +++ b/pkg/adapter/apiserver/adapter_test.go @@ -34,6 +34,9 @@ import ( dynamicfake "k8s.io/client-go/dynamic/fake" kubetesting "k8s.io/client-go/testing" adaptertest "knative.dev/eventing/pkg/adapter/v2/test" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + brokerfilter "knative.dev/eventing/pkg/broker/filter" + "knative.dev/eventing/pkg/eventfilter/subscriptionsapi" rectesting "knative.dev/eventing/pkg/reconciler/testing" "knative.dev/pkg/logging" pkgtesting "knative.dev/pkg/reconciler/testing" @@ -289,21 +292,27 @@ func validateNotSent(t *testing.T, ce *adaptertest.TestCloudEventsClient, want s func makeResourceAndTestingClient() (*resourceDelegate, *adaptertest.TestCloudEventsClient) { ce := adaptertest.NewTestClient() + logger := zap.NewExample().Sugar() + return &resourceDelegate{ ce: ce, source: "unit-test", apiServerSourceName: apiServerSourceNameTest, - logger: zap.NewExample().Sugar(), + logger: logger, + filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...), }, ce } func makeRefAndTestingClient() (*resourceDelegate, *adaptertest.TestCloudEventsClient) { ce := adaptertest.NewTestClient() + logger := zap.NewExample().Sugar() + return &resourceDelegate{ ce: ce, source: "unit-test", apiServerSourceName: apiServerSourceNameTest, logger: zap.NewExample().Sugar(), ref: true, + filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...), }, ce } diff --git a/pkg/adapter/apiserver/config.go b/pkg/adapter/apiserver/config.go index 19fca01177a..c85f7f4de85 100644 --- a/pkg/adapter/apiserver/config.go +++ b/pkg/adapter/apiserver/config.go @@ -17,6 +17,8 @@ package apiserver import ( "k8s.io/apimachinery/pkg/runtime/schema" + + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" v1 "knative.dev/eventing/pkg/apis/sources/v1" ) @@ -56,4 +58,14 @@ type Config struct { // Defaults to `Reference` // +optional EventMode string `json:"mode,omitempty"` + + // Filters is an experimental field that conforms to the CNCF CloudEvents Subscriptions + // API. It's an array of filter expressions that evaluate to true or false. + // If any filter expression in the array evaluates to false, the event MUST + // NOT be sent to the Sink. If all the filter expressions in the array + // evaluate to true, the event MUST be attempted to be delivered. Absence of + // a filter or empty array implies a value of true. + // + // +optional + Filters []eventingv1.SubscriptionsAPIFilter `json:"filters,omitempty"` } diff --git a/pkg/adapter/apiserver/delegate.go b/pkg/adapter/apiserver/delegate.go index e45d87e67ae..f78b3abe163 100644 --- a/pkg/adapter/apiserver/delegate.go +++ b/pkg/adapter/apiserver/delegate.go @@ -24,6 +24,7 @@ import ( "go.uber.org/zap" "k8s.io/client-go/tools/cache" "knative.dev/eventing/pkg/adapter/apiserver/events" + "knative.dev/eventing/pkg/eventfilter" ) type resourceDelegate struct { @@ -31,6 +32,7 @@ type resourceDelegate struct { source string ref bool apiServerSourceName string + filter eventfilter.Filter logger *zap.SugaredLogger } @@ -38,31 +40,36 @@ type resourceDelegate struct { var _ cache.Store = (*resourceDelegate)(nil) func (a *resourceDelegate) Add(obj interface{}) error { - ctx, event, err := events.MakeAddEvent(a.source, a.apiServerSourceName, obj, a.ref) - if err != nil { - a.logger.Infow("event creation failed", zap.Error(err)) - return err - } - a.sendCloudEvent(ctx, event) - return nil + return a.handleKubernetesObject(events.MakeAddEvent, obj) } func (a *resourceDelegate) Update(obj interface{}) error { - ctx, event, err := events.MakeUpdateEvent(a.source, a.apiServerSourceName, obj, a.ref) - if err != nil { - a.logger.Info("event creation failed", zap.Error(err)) - return err - } - a.sendCloudEvent(ctx, event) - return nil + return a.handleKubernetesObject(events.MakeUpdateEvent, obj) } func (a *resourceDelegate) Delete(obj interface{}) error { - ctx, event, err := events.MakeDeleteEvent(a.source, a.apiServerSourceName, obj, a.ref) + return a.handleKubernetesObject(events.MakeDeleteEvent, obj) + +} + +// makeEventFunc represents the signature of the functions `events.Make*Event` so they can +// be passed as a parameter +type makeEventFunc func(string, string, interface{}, bool) (context.Context, cloudevents.Event, error) + +func (a *resourceDelegate) handleKubernetesObject(makeEvent makeEventFunc, obj interface{}) error { + ctx, event, err := makeEvent(a.source, a.apiServerSourceName, obj, a.ref) + if err != nil { - a.logger.Info("event creation failed", zap.Error(err)) + a.logger.Infow("event creation failed", zap.Error(err)) return err } + + filterResult := a.filter.Filter(ctx, event) + if filterResult == eventfilter.FailFilter { + a.logger.Debugf("event type %s filtered out", event.Type()) + return nil + } + a.sendCloudEvent(ctx, event) return nil } diff --git a/pkg/adapter/apiserver/delegate_test.go b/pkg/adapter/apiserver/delegate_test.go index a8e9270885f..00fc9dfe691 100644 --- a/pkg/adapter/apiserver/delegate_test.go +++ b/pkg/adapter/apiserver/delegate_test.go @@ -18,7 +18,12 @@ package apiserver import ( "testing" + "go.uber.org/zap" + adaptertest "knative.dev/eventing/pkg/adapter/v2/test" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/pkg/apis/sources" + brokerfilter "knative.dev/eventing/pkg/broker/filter" + "knative.dev/eventing/pkg/eventfilter/subscriptionsapi" ) func TestResourceAddEvent(t *testing.T) { @@ -68,3 +73,40 @@ func TestResourceStub(t *testing.T) { d.Replace(nil, "") d.Resync() } + +func TestFilterFails(t *testing.T) { + ce := adaptertest.NewTestClient() + filters := []eventingv1.SubscriptionsAPIFilter{{ + Exact: map[string]string{ + "type": "dev.knative.apiserver.resource.add", + }, + }} + logger := zap.NewExample().Sugar() + delegate := &resourceDelegate{ + ce: ce, + source: "unit-test", + apiServerSourceName: apiServerSourceNameTest, + logger: logger, + filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), filters)...), + } + + delegate.Update(simplePod("unit", "test")) + validateNotSent(t, ce, sources.ApiServerSourceUpdateEventType) +} + +func TestEmptyFiltersList(t *testing.T) { + ce := adaptertest.NewTestClient() + filters := []eventingv1.SubscriptionsAPIFilter{} + + logger := zap.NewExample().Sugar() + delegate := &resourceDelegate{ + ce: ce, + source: "unit-test", + apiServerSourceName: apiServerSourceNameTest, + logger: logger, + filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), filters)...), + } + + delegate.Update(simplePod("unit", "test")) + validateSent(t, ce, sources.ApiServerSourceUpdateEventType) +} diff --git a/pkg/apis/feature/features.go b/pkg/apis/feature/features.go index 9fc57664e53..e01195bb5f6 100644 --- a/pkg/apis/feature/features.go +++ b/pkg/apis/feature/features.go @@ -61,6 +61,7 @@ func newDefaults() Flags { TransportEncryption: Disabled, OIDCAuthentication: Disabled, EvenTypeAutoCreate: Disabled, + NewAPIServerFilters: Disabled, } } diff --git a/pkg/apis/feature/flag_names.go b/pkg/apis/feature/flag_names.go index 40de28fcaa3..cd937554c4b 100644 --- a/pkg/apis/feature/flag_names.go +++ b/pkg/apis/feature/flag_names.go @@ -27,4 +27,5 @@ const ( OIDCAuthentication = "authentication-oidc" NodeSelectorLabel = "apiserversources-nodeselector-" CrossNamespaceEventLinks = "cross-namespace-event-links" + NewAPIServerFilters = "new-apiserversource-filters" ) diff --git a/pkg/apis/sources/v1/apiserver_types.go b/pkg/apis/sources/v1/apiserver_types.go index cfe41a956b1..ddd6332f359 100644 --- a/pkg/apis/sources/v1/apiserver_types.go +++ b/pkg/apis/sources/v1/apiserver_types.go @@ -19,6 +19,7 @@ package v1 import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/kmeta" @@ -85,6 +86,16 @@ type ApiServerSourceSpec struct { // should be watched by the source. // +optional NamespaceSelector *metav1.LabelSelector `json:"namespaceSelector,omitempty"` + + // Filters is an experimental field that conforms to the CNCF CloudEvents Subscriptions + // API. It's an array of filter expressions that evaluate to true or false. + // If any filter expression in the array evaluates to false, the event MUST + // NOT be sent to the Sink. If all the filter expressions in the array + // evaluate to true, the event MUST be attempted to be delivered. Absence of + // a filter or empty array implies a value of true. + // + // +optional + Filters []eventingv1.SubscriptionsAPIFilter `json:"filters,omitempty"` } // ApiServerSourceStatus defines the observed state of ApiServerSource diff --git a/pkg/apis/sources/v1/apiserver_validation.go b/pkg/apis/sources/v1/apiserver_validation.go index 1b4774029c0..0eacb2dec94 100644 --- a/pkg/apis/sources/v1/apiserver_validation.go +++ b/pkg/apis/sources/v1/apiserver_validation.go @@ -22,6 +22,8 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" + "knative.dev/eventing/pkg/apis/feature" "knative.dev/pkg/apis" ) @@ -73,5 +75,22 @@ func (cs *ApiServerSourceSpec) Validate(ctx context.Context) *apis.FieldError { } } errs = errs.Also(cs.SourceSpec.Validate(ctx)) + errs = errs.Also(validateSubscriptionAPIFiltersList(ctx, cs.Filters).ViaField("filters")) + return errs +} + +func validateSubscriptionAPIFiltersList(ctx context.Context, filters []eventingv1.SubscriptionsAPIFilter) (errs *apis.FieldError) { + if !feature.FromContext(ctx).IsEnabled(feature.NewAPIServerFilters) { + if len(filters) != 0 { + return errs.Also(apis.ErrGeneric("Filters is not empty but the NewAPIServerFilters feature is disabled.")) + } + + return nil + } + + for i, f := range filters { + f := f + errs = errs.Also(eventingv1.ValidateSubscriptionAPIFilter(ctx, &f)).ViaIndex(i) + } return errs } diff --git a/pkg/apis/sources/v1/apiserver_validation_test.go b/pkg/apis/sources/v1/apiserver_validation_test.go index 9c6b509fe88..57646f3e23d 100644 --- a/pkg/apis/sources/v1/apiserver_validation_test.go +++ b/pkg/apis/sources/v1/apiserver_validation_test.go @@ -23,9 +23,11 @@ import ( "github.com/stretchr/testify/assert" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" duckv1 "knative.dev/pkg/apis/duck/v1" "github.com/google/go-cmp/cmp" + "knative.dev/eventing/pkg/apis/feature" "knative.dev/pkg/apis" ) @@ -266,3 +268,82 @@ func TestAPIServerValidationCallsSpecValidation(t *testing.T) { err := source.Validate(context.TODO()) assert.EqualError(t, err, "missing field(s): spec.resources", "Spec is not validated!") } + +func TestAPIServerFiltersValidation(t *testing.T) { + tests := []struct { + name string + featureState feature.Flag + want error + filters []eventingv1.SubscriptionsAPIFilter + }{{ + name: "an error is raised if the feature is disabled but filters are specified", + featureState: feature.Disabled, + filters: []eventingv1.SubscriptionsAPIFilter{{ + Prefix: map[string]string{ + "invALID": "abc", + }, + }}, + want: apis.ErrGeneric("Filters is not empty but the NewAPIServerFilters feature is disabled."), + }, { + name: "filters are validated when the feature is enabled", + featureState: feature.Enabled, + filters: []eventingv1.SubscriptionsAPIFilter{{ + Prefix: map[string]string{ + "invALID": "abc", + }, + }}, + want: apis.ErrInvalidKeyName("invALID", apis.CurrentField, + "Attribute name must start with a letter and can only contain "+ + "lowercase alphanumeric").ViaFieldKey("prefix", "invALID").ViaFieldIndex("filters", 0), + }, { + name: "validation works for valid filters", + featureState: feature.Enabled, + filters: []eventingv1.SubscriptionsAPIFilter{{ + Exact: map[string]string{"myattr": "myval"}, + }}, + want: nil, + }, { + name: "validation works for empty filters", + featureState: feature.Enabled, + filters: []eventingv1.SubscriptionsAPIFilter{}, + want: nil, + }, { + name: "validation does not work for empty filters", + featureState: feature.Disabled, + filters: []eventingv1.SubscriptionsAPIFilter{}, + want: nil, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + featureContext := feature.ToContext(context.TODO(), feature.Flags{ + feature.NewAPIServerFilters: test.featureState, + }) + apiserversource := &ApiServerSourceSpec{ + Filters: test.filters, + EventMode: "Resource", + Resources: []APIVersionKindSelector{{ + APIVersion: "v1", + Kind: "Foo", + }}, + SourceSpec: duckv1.SourceSpec{ + Sink: duckv1.Destination{ + Ref: &duckv1.KReference{ + APIVersion: "v1", + Kind: "broker", + Name: "default", + }, + }, + }, + } + got := apiserversource.Validate(featureContext) + if test.want != nil { + if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" { + t.Errorf("APIServerSourceSpec.Validate (-want, +got) = %v", diff) + } + } else if got != nil { + t.Errorf("APIServerSourceSpec.Validate wanted nil, got = %v", got.Error()) + } + }) + } +} diff --git a/pkg/apis/sources/v1/zz_generated.deepcopy.go b/pkg/apis/sources/v1/zz_generated.deepcopy.go index 6d175e3c960..8de185540fc 100644 --- a/pkg/apis/sources/v1/zz_generated.deepcopy.go +++ b/pkg/apis/sources/v1/zz_generated.deepcopy.go @@ -24,6 +24,7 @@ package v1 import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" ) // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. @@ -145,6 +146,13 @@ func (in *ApiServerSourceSpec) DeepCopyInto(out *ApiServerSourceSpec) { *out = new(metav1.LabelSelector) (*in).DeepCopyInto(*out) } + if in.Filters != nil { + in, out := &in.Filters, &out.Filters + *out = make([]eventingv1.SubscriptionsAPIFilter, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } return } diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 6ee15200be9..6ded61139e5 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -545,7 +545,7 @@ func createSubscriptionsAPIFilters(logger *zap.Logger, trigger *eventingv1.Trigg logger.Debug("Found no filters for trigger", zap.Any("trigger.Spec", trigger.Spec)) return subscriptionsapi.NewNoFilter() } - return subscriptionsapi.NewAllFilter(materializeFiltersList(logger, trigger.Spec.Filters)...) + return subscriptionsapi.NewAllFilter(MaterializeFiltersList(logger, trigger.Spec.Filters)...) } func materializeSubscriptionsAPIFilter(logger *zap.Logger, filter eventingv1.SubscriptionsAPIFilter) eventfilter.Filter { @@ -574,9 +574,9 @@ func materializeSubscriptionsAPIFilter(logger *zap.Logger, filter eventingv1.Sub return nil } case len(filter.All) > 0: - materializedFilter = subscriptionsapi.NewAllFilter(materializeFiltersList(logger, filter.All)...) + materializedFilter = subscriptionsapi.NewAllFilter(MaterializeFiltersList(logger, filter.All)...) case len(filter.Any) > 0: - materializedFilter = subscriptionsapi.NewAnyFilter(materializeFiltersList(logger, filter.Any)...) + materializedFilter = subscriptionsapi.NewAnyFilter(MaterializeFiltersList(logger, filter.Any)...) case filter.Not != nil: materializedFilter = subscriptionsapi.NewNotFilter(materializeSubscriptionsAPIFilter(logger, *filter.Not)) case filter.CESQL != "": @@ -589,7 +589,8 @@ func materializeSubscriptionsAPIFilter(logger *zap.Logger, filter eventingv1.Sub return materializedFilter } -func materializeFiltersList(logger *zap.Logger, filters []eventingv1.SubscriptionsAPIFilter) []eventfilter.Filter { +// MaterialzieFilterList allows any component that supports `SubscriptionsAPIFilter` to process them +func MaterializeFiltersList(logger *zap.Logger, filters []eventingv1.SubscriptionsAPIFilter) []eventfilter.Filter { materializedFilters := make([]eventfilter.Filter, 0, len(filters)) for _, f := range filters { f := materializeSubscriptionsAPIFilter(logger, f) diff --git a/pkg/reconciler/apiserversource/resources/receive_adapter.go b/pkg/reconciler/apiserversource/resources/receive_adapter.go index 6b3bfe0827b..0997c8c7632 100644 --- a/pkg/reconciler/apiserversource/resources/receive_adapter.go +++ b/pkg/reconciler/apiserversource/resources/receive_adapter.go @@ -127,6 +127,7 @@ func makeEnv(args *ReceiveAdapterArgs) ([]corev1.EnvVar, error) { ResourceOwner: args.Source.Spec.ResourceOwner, EventMode: args.Source.Spec.EventMode, AllNamespaces: args.AllNamespaces, + Filters: args.Source.Spec.Filters, } for _, r := range args.Source.Spec.Resources { diff --git a/test/config/config-features.yaml b/test/config/config-features.yaml new file mode 100644 index 00000000000..30194a008de --- /dev/null +++ b/test/config/config-features.yaml @@ -0,0 +1,61 @@ +# Copyright 2024 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +kind: ConfigMap +metadata: + name: config-features + namespace: knative-eventing + labels: +data: + # ALPHA feature: The kreference-group allows you to use the Group field in KReferences. + # For more details: https://github.com/knative/eventing/issues/5086 + kreference-group: "disabled" + + # ALPHA feature: The delivery-retryafter allows you to use the RetryAfter field in DeliverySpec. + # For more details: https://github.com/knative/eventing/issues/5811 + delivery-retryafter: "disabled" + + # BETA feature: The delivery-timeout allows you to use the Timeout field in DeliverySpec. + # For more details: https://github.com/knative/eventing/issues/5148 + delivery-timeout: "enabled" + + # ALPHA feature: The kreference-mapping allows you to map kreference onto templated URI + # For more details: https://github.com/knative/eventing/issues/5593 + kreference-mapping: "disabled" + + # BETA feature: The new-trigger-filters flag allows you to use the new `filters` field + # in Trigger objects with its rich filtering capabilities. + # For more details: https://github.com/knative/eventing/issues/5204 + new-trigger-filters: "enabled" + + # ALPHA feature: The transport-encryption flag allows you to encrypt events in transit using the transport layer security (TLS) protocol. + # For more details: https://github.com/knative/eventing/issues/5957 + transport-encryption: "disabled" + + # ALPHA feature: The eventtype-auto-create flag allows automatic creation of Even Type instances based on Event's type being processed. + # For more details: https://github.com/knative/eventing/issues/6909 + eventtype-auto-create: "disabled" + + # ALPHA feature: The aauthentication-oidc flag allows you to use OIDC authentication for Eventing. + # For more details: https://github.com/knative/eventing/issues/7174 + authentication-oidc: "disabled" + + # ALPHA feature: The cross-namespace-event-links flag allows you to use cross-namespace referencing for Eventing. + # For more details: https://github.com/knative/eventing/issues/7739 + cross-namespace-event-links: "disabled" + + # ALPHA feature: The new-apiserversource-filters flag allows you to use the new `filters` field + # in APIServerSource objects with its rich filtering capabilities. + new-apiserversource-filters: "enabled" diff --git a/test/e2e-common.sh b/test/e2e-common.sh index 0ef9cde0f27..d629cc6081f 100755 --- a/test/e2e-common.sh +++ b/test/e2e-common.sh @@ -87,6 +87,8 @@ function knative_setup() { enable_sugar || fail_test "Could not enable Sugar Controller Injection" unleash_duck || fail_test "Could not unleash the chaos duck" + + install_feature_cm || fail_test "Could not install features configmap" } function scale_controlplane() { @@ -271,6 +273,14 @@ function unleash_duck() { if (( SCALE_CHAOSDUCK_TO_ZERO )); then kubectl -n "${SYSTEM_NAMESPACE}" scale deployment/chaosduck --replicas=0; fi } +function install_feature_cm() { + KO_FLAGS="${KO_FLAGS:-}" + echo "install feature configmap" + cat test/config/config-features.yaml | \ + sed "s/namespace: ${KNATIVE_DEFAULT_NAMESPACE}/namespace: ${SYSTEM_NAMESPACE}/g" | \ + ko apply "${KO_FLAGS}" -f - || return $? +} + # Teardown the Knative environment after tests finish. function knative_teardown() { echo ">> Stopping Knative Eventing" diff --git a/test/rekt/apiserversource_test.go b/test/rekt/apiserversource_test.go index e2641fc3ba8..f9ff6f9094e 100644 --- a/test/rekt/apiserversource_test.go +++ b/test/rekt/apiserversource_test.go @@ -212,3 +212,17 @@ func TestApiServerSourceDeployment(t *testing.T) { env.Test(ctx, t, apiserversourcefeatures.DeployAPIServerSourceWithNodeSelector()) } + +func TestApiServerSourceNewFiltersFeature(t *testing.T) { + t.Parallel() + + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + ) + + env.TestSet(ctx, t, apiserversourcefeatures.NewFiltersFeature()) +} diff --git a/test/rekt/features/apiserversource/data_plane.go b/test/rekt/features/apiserversource/data_plane.go index 01661db1716..40b31c6fa73 100644 --- a/test/rekt/features/apiserversource/data_plane.go +++ b/test/rekt/features/apiserversource/data_plane.go @@ -21,6 +21,7 @@ import ( "fmt" "github.com/cloudevents/sdk-go/v2/test" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/network" @@ -947,3 +948,68 @@ func SendsEventsWithBrokerAsSinkTLS() *feature.Feature { return f } + +func NewFiltersFeature() *feature.FeatureSet { + fs := &feature.FeatureSet{ + Name: "Knative ApiServerSource - Features - New Filter", + Features: []*feature.Feature{ + EventsAreFilteredOut(), + }, + } + return fs +} + +func EventsAreFilteredOut() *feature.Feature { + source := feature.MakeRandomK8sName("apiserversource") + sink := feature.MakeRandomK8sName("sink") + f := feature.NewFeatureNamed("Filters properly the messages") + + f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiver)) + + sacmName := feature.MakeRandomK8sName("apiserversource") + f.Setup("Create Service Account for ApiServerSource with RBAC for v1.Pod resources", + setupAccountAndRoleForPods(sacmName)) + + cfg := []manifest.CfgFn{ + apiserversource.WithServiceAccountName(sacmName), + apiserversource.WithEventMode(v1.ResourceMode), + apiserversource.WithSink(service.AsDestinationRef(sink)), + apiserversource.WithFilters([]eventingv1.SubscriptionsAPIFilter{{ + Exact: map[string]string{ + "type": "dev.knative.apiserver.resource.update", + }, + }}), + apiserversource.WithResources(v1.APIVersionKindSelector{ + APIVersion: "v1", + Kind: "Pod", + }), + } + + f.Setup("install ApiServerSource", apiserversource.Install(source, cfg...)) + f.Setup("ApiServerSource goes ready", apiserversource.IsReady(source)) + + examplePodName := feature.MakeRandomK8sName("example") + + // create a pod so that ApiServerSource delivers an event to its sink + // event body is similar to this: + // {"kind":"Pod","namespace":"test-wmbcixlv","name":"example-axvlzbvc","apiVersion":"v1"} + f.Requirement("install example pod", pod.Install(examplePodName, exampleImage)) + + f.Stable("ApiServerSource as event source"). + Must("delivers events", + eventassert.OnStore(sink).MatchEvent( + test.HasType("dev.knative.apiserver.resource.add"), + test.HasExtensions(map[string]interface{}{"apiversion": "v1"}), + test.DataContains(`"kind":"Pod"`), + test.DataContains(fmt.Sprintf(`"name":"%s"`, examplePodName)), + ).Exact(0)). + Must("delivers events", + eventassert.OnStore(sink).MatchEvent( + test.HasType("dev.knative.apiserver.resource.update"), + test.HasExtensions(map[string]interface{}{"apiversion": "v1"}), + test.DataContains(`"kind":"Pod"`), + test.DataContains(fmt.Sprintf(`"name":"%s"`, examplePodName)), + ).AtLeast(1)) + + return f +} diff --git a/test/rekt/resources/apiserversource/apiserversource.go b/test/rekt/resources/apiserversource/apiserversource.go index 8b92fbeaaa3..158243b8555 100644 --- a/test/rekt/resources/apiserversource/apiserversource.go +++ b/test/rekt/resources/apiserversource/apiserversource.go @@ -19,6 +19,7 @@ package apiserversource import ( "context" "embed" + "encoding/json" "fmt" "strings" "time" @@ -27,6 +28,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/reconciler-test/pkg/environment" "knative.dev/reconciler-test/pkg/feature" "knative.dev/reconciler-test/pkg/k8s" @@ -37,6 +39,8 @@ import ( duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/kmeta" "knative.dev/reconciler-test/pkg/manifest" + + yamllib "sigs.k8s.io/yaml" ) //go:embed *.yaml @@ -264,3 +268,26 @@ func ResetNodeLabels(ctx context.Context, t feature.T) { t.Fatalf("Could not update node: %v", err) } } + +func WithFilters(filters []eventingv1.SubscriptionsAPIFilter) manifest.CfgFn { + jsonBytes, err := json.Marshal(filters) + if err != nil { + panic(err) + } + + yamlBytes, err := yamllib.JSONToYAML(jsonBytes) + if err != nil { + panic(err) + } + + filtersYaml := string(yamlBytes) + lines := strings.Split(filtersYaml, "\n") + out := make([]string, 0, len(lines)) + for i := range lines { + out = append(out, " "+lines[i]) + } + + return func(cfg map[string]interface{}) { + cfg["filters"] = strings.Join(out, "\n") + } +} diff --git a/test/rekt/resources/apiserversource/apiserversource.yaml b/test/rekt/resources/apiserversource/apiserversource.yaml index 57da274150b..82e3350e2a1 100644 --- a/test/rekt/resources/apiserversource/apiserversource.yaml +++ b/test/rekt/resources/apiserversource/apiserversource.yaml @@ -87,3 +87,7 @@ spec: audience: {{ .sink.audience }} {{ end }} {{ end }} + {{ if .filters }} + filters: +{{ .filters }} + {{ end }}