Skip to content

Commit

Permalink
Add SubscriptionsAPI filters to APIServerSource
Browse files Browse the repository at this point in the history
This MR introduces the `filters` key in the APIServerSource Spec.
This new field allows users to filter which messages are sent from the
APIServerSource to the specified sink. The filter language
is the new SubscriptionsAPI, that allows for powerful filtering.

Signed-off-by: Hector Martinez <hemartin@redhat.com>
  • Loading branch information
rh-hemartin committed Mar 19, 2024
1 parent acc6385 commit 0353280
Show file tree
Hide file tree
Showing 12 changed files with 143 additions and 20 deletions.
3 changes: 3 additions & 0 deletions config/core/configmaps/features.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,6 @@ data:
# For more details: https://github.com/knative/eventing/issues/7739
cross-namespace-event-links: "disabled"

# ALPHA feature: The new-apiserversource-filters flag allows you to use the new `filters` field
# in APIServerSource objects with its rich filtering capabilities.
new-apiserversource-filters: "enabled"
1 change: 1 addition & 0 deletions config/core/resources/apiserversource.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ spec:
properties:
spec:
type: object
x-kubernetes-preserve-unknown-fields: true
required:
- resources
properties:
Expand Down
1 change: 1 addition & 0 deletions pkg/adapter/apiserver/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ func (a *apiServerAdapter) start(ctx context.Context, stopCh <-chan struct{}) er
logger: a.logger,
ref: a.config.EventMode == v1.ReferenceMode,
apiServerSourceName: a.name,
filters: a.config.Filters,
}
if a.config.ResourceOwner != nil {
a.logger.Infow("will be filtered",
Expand Down
12 changes: 12 additions & 0 deletions pkg/adapter/apiserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package apiserver

import (
"k8s.io/apimachinery/pkg/runtime/schema"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
v1 "knative.dev/eventing/pkg/apis/sources/v1"
)

Expand Down Expand Up @@ -56,4 +58,14 @@ type Config struct {
// Defaults to `Reference`
// +optional
EventMode string `json:"mode,omitempty"`

// Filters is an experimental field that conforms to the CNCF CloudEvents Subscriptions
// API. It's an array of filter expressions that evaluate to true or false.
// If any filter expression in the array evaluates to false, the event MUST
// NOT be sent to the Sink. If all the filter expressions in the array
// evaluate to true, the event MUST be attempted to be delivered. Absence of
// a filter or empty array implies a value of true.
//
// +optional
Filters []eventingv1.SubscriptionsAPIFilter `json:"filters,omitempty"`
}
44 changes: 28 additions & 16 deletions pkg/adapter/apiserver/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,45 +24,57 @@ import (
"go.uber.org/zap"
"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/adapter/apiserver/events"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
brokerfilter "knative.dev/eventing/pkg/broker/filter"
"knative.dev/eventing/pkg/eventfilter"
"knative.dev/eventing/pkg/eventfilter/subscriptionsapi"
)

type resourceDelegate struct {
ce cloudevents.Client
source string
ref bool
apiServerSourceName string
filters []eventingv1.SubscriptionsAPIFilter

logger *zap.SugaredLogger
}

var _ cache.Store = (*resourceDelegate)(nil)

func (a *resourceDelegate) Add(obj interface{}) error {
ctx, event, err := events.MakeAddEvent(a.source, a.apiServerSourceName, obj, a.ref)
if err != nil {
a.logger.Infow("event creation failed", zap.Error(err))
return err
}
a.sendCloudEvent(ctx, event)
return nil
return a.handleKubernetesObject(events.MakeAddEvent, obj)
}

func (a *resourceDelegate) Update(obj interface{}) error {
ctx, event, err := events.MakeUpdateEvent(a.source, a.apiServerSourceName, obj, a.ref)
if err != nil {
a.logger.Info("event creation failed", zap.Error(err))
return err
}
a.sendCloudEvent(ctx, event)
return nil
return a.handleKubernetesObject(events.MakeUpdateEvent, obj)
}

func (a *resourceDelegate) Delete(obj interface{}) error {
ctx, event, err := events.MakeDeleteEvent(a.source, a.apiServerSourceName, obj, a.ref)
return a.handleKubernetesObject(events.MakeDeleteEvent, obj)

}

// makeEventFunc represents the signature of the functions `events.Make*Event` so they can
// be passed as a parameter
type makeEventFunc func(string, string, interface{}, bool) (context.Context, cloudevents.Event, error)

func (a *resourceDelegate) handleKubernetesObject(makeEvent makeEventFunc, obj interface{}) error {
ctx, event, err := makeEvent(a.source, a.apiServerSourceName, obj, a.ref)

if err != nil {
a.logger.Info("event creation failed", zap.Error(err))
a.logger.Infow("event creation failed", zap.Error(err))
return err
}

// TODO: generate the filter once, then reuse it to improve performance, maybe sync.Once?
filter := subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(a.logger.Desugar(), a.filters)...)
filterResult := filter.Filter(ctx, event)
if filterResult == eventfilter.FailFilter {
a.logger.Debugf("event type %s filtered out", event.Type())
return nil
}

a.sendCloudEvent(ctx, event)
return nil
}
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/feature/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func newDefaults() Flags {
TransportEncryption: Disabled,
OIDCAuthentication: Disabled,
EvenTypeAutoCreate: Disabled,
NewAPIServerFilters: Enabled,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/apis/feature/flag_names.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ const (
OIDCAuthentication = "authentication-oidc"
NodeSelectorLabel = "apiserversources-nodeselector-"
CrossNamespaceEventLinks = "cross-namespace-event-links"
NewAPIServerFilters = "new-apiserversource-filters"
)
11 changes: 11 additions & 0 deletions pkg/apis/sources/v1/apiserver_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/kmeta"
Expand Down Expand Up @@ -85,6 +86,16 @@ type ApiServerSourceSpec struct {
// should be watched by the source.
// +optional
NamespaceSelector *metav1.LabelSelector `json:"namespaceSelector,omitempty"`

// Filters is an experimental field that conforms to the CNCF CloudEvents Subscriptions
// API. It's an array of filter expressions that evaluate to true or false.
// If any filter expression in the array evaluates to false, the event MUST
// NOT be sent to the Sink. If all the filter expressions in the array
// evaluate to true, the event MUST be attempted to be delivered. Absence of
// a filter or empty array implies a value of true.
//
// +optional
Filters []eventingv1.SubscriptionsAPIFilter `json:"filters,omitempty"`
}

// ApiServerSourceStatus defines the observed state of ApiServerSource
Expand Down
15 changes: 15 additions & 0 deletions pkg/apis/sources/v1/apiserver_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (

"k8s.io/apimachinery/pkg/runtime/schema"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/pkg/apis"
)

Expand Down Expand Up @@ -73,5 +75,18 @@ func (cs *ApiServerSourceSpec) Validate(ctx context.Context) *apis.FieldError {
}
}
errs = errs.Also(cs.SourceSpec.Validate(ctx))
errs = errs.Also(validateSubscriptionAPIFiltersList(ctx, cs.Filters).ViaField("filters"))
return errs
}

func validateSubscriptionAPIFiltersList(ctx context.Context, filters []eventingv1.SubscriptionsAPIFilter) (errs *apis.FieldError) {
if filters == nil || !feature.FromContext(ctx).IsEnabled(feature.NewAPIServerFilters) {
return nil
}

for i, f := range filters {
f := f
errs = errs.Also(eventingv1.ValidateSubscriptionAPIFilter(ctx, &f)).ViaIndex(i)
}
return errs
}
64 changes: 64 additions & 0 deletions pkg/apis/sources/v1/apiserver_validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@ import (

"github.com/stretchr/testify/assert"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
duckv1 "knative.dev/pkg/apis/duck/v1"

"github.com/google/go-cmp/cmp"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/pkg/apis"
)

Expand Down Expand Up @@ -266,3 +268,65 @@ func TestAPIServerValidationCallsSpecValidation(t *testing.T) {
err := source.Validate(context.TODO())
assert.EqualError(t, err, "missing field(s): spec.resources", "Spec is not validated!")
}

func TestAPIServerFiltersValidation(t *testing.T) {
tests := []struct {
name string
featureState feature.Flag
want error
filters []eventingv1.SubscriptionsAPIFilter
}{{
name: "filters are validated only if feature is enabled",
featureState: feature.Disabled,
filters: []eventingv1.SubscriptionsAPIFilter{{
Prefix: map[string]string{
"invALID": "abc",
},
}},
want: nil,
}, {
name: "filters are validated only if feature is enabled",
featureState: feature.Enabled,
filters: []eventingv1.SubscriptionsAPIFilter{{
Prefix: map[string]string{
"invALID": "abc",
},
}},
want: apis.ErrInvalidKeyName("invALID", apis.CurrentField,
"Attribute name must start with a letter and can only contain "+
"lowercase alphanumeric").ViaFieldKey("prefix", "invALID").ViaFieldIndex("filters", 0),
}}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
featureContext := feature.ToContext(context.TODO(), feature.Flags{
feature.NewAPIServerFilters: test.featureState,
})
apiserversource := &ApiServerSourceSpec{
Filters: test.filters,
EventMode: "Resource",
Resources: []APIVersionKindSelector{{
APIVersion: "v1",
Kind: "Foo",
}},
SourceSpec: duckv1.SourceSpec{
Sink: duckv1.Destination{
Ref: &duckv1.KReference{
APIVersion: "v1",
Kind: "broker",
Name: "default",
},
},
},
}
got := apiserversource.Validate(featureContext)
if test.want != nil {
if diff := cmp.Diff(test.want.Error(), got.Error()); diff != "" {
t.Errorf("APIServerSourceSpec.Validate (-want, +got) = %v", diff)
}
} else if got != nil {
t.Errorf("APIServerSourceSpec.Validate wanted nil, got = %v", got.Error())
}
})
}
}
9 changes: 5 additions & 4 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,7 +530,7 @@ func createSubscriptionsAPIFilters(logger *zap.Logger, trigger *eventingv1.Trigg
logger.Debug("Found no filters for trigger", zap.Any("trigger.Spec", trigger.Spec))
return subscriptionsapi.NewNoFilter()
}
return subscriptionsapi.NewAllFilter(materializeFiltersList(logger, trigger.Spec.Filters)...)
return subscriptionsapi.NewAllFilter(MaterializeFiltersList(logger, trigger.Spec.Filters)...)
}

func materializeSubscriptionsAPIFilter(logger *zap.Logger, filter eventingv1.SubscriptionsAPIFilter) eventfilter.Filter {
Expand Down Expand Up @@ -559,9 +559,9 @@ func materializeSubscriptionsAPIFilter(logger *zap.Logger, filter eventingv1.Sub
return nil
}
case len(filter.All) > 0:
materializedFilter = subscriptionsapi.NewAllFilter(materializeFiltersList(logger, filter.All)...)
materializedFilter = subscriptionsapi.NewAllFilter(MaterializeFiltersList(logger, filter.All)...)
case len(filter.Any) > 0:
materializedFilter = subscriptionsapi.NewAnyFilter(materializeFiltersList(logger, filter.Any)...)
materializedFilter = subscriptionsapi.NewAnyFilter(MaterializeFiltersList(logger, filter.Any)...)
case filter.Not != nil:
materializedFilter = subscriptionsapi.NewNotFilter(materializeSubscriptionsAPIFilter(logger, *filter.Not))
case filter.CESQL != "":
Expand All @@ -574,7 +574,8 @@ func materializeSubscriptionsAPIFilter(logger *zap.Logger, filter eventingv1.Sub
return materializedFilter
}

func materializeFiltersList(logger *zap.Logger, filters []eventingv1.SubscriptionsAPIFilter) []eventfilter.Filter {
// MaterialzieFilterList allows any component that supports `SubscriptionsAPIFilter` to process them
func MaterializeFiltersList(logger *zap.Logger, filters []eventingv1.SubscriptionsAPIFilter) []eventfilter.Filter {
materializedFilters := make([]eventfilter.Filter, 0, len(filters))
for _, f := range filters {
f := materializeSubscriptionsAPIFilter(logger, f)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ func makeEnv(args *ReceiveAdapterArgs) ([]corev1.EnvVar, error) {
ResourceOwner: args.Source.Spec.ResourceOwner,
EventMode: args.Source.Spec.EventMode,
AllNamespaces: args.AllNamespaces,
Filters: args.Source.Spec.Filters,
}

for _, r := range args.Source.Spec.Resources {
Expand Down

0 comments on commit 0353280

Please sign in to comment.