From 274eefbfb7a24dad6ee94f02c2994667991d4a43 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Wed, 31 Jul 2024 16:22:16 -0400 Subject: [PATCH 1/9] feat: add filters to eventpolicy type Signed-off-by: Calum Murray --- docs/eventing-api.md | 38 +++++++++++++++++- .../eventing/v1alpha1/eventpolicy_types.go | 11 +++++ .../v1alpha1/eventpolicy_validation.go | 3 ++ .../v1alpha1/eventpolicy_validation_test.go | 40 +++++++++++++++++++ .../v1alpha1/zz_generated.deepcopy.go | 8 ++++ 5 files changed, 99 insertions(+), 1 deletion(-) diff --git a/docs/eventing-api.md b/docs/eventing-api.md index 213521267f1..196ab597488 100644 --- a/docs/eventing-api.md +++ b/docs/eventing-api.md @@ -2254,7 +2254,7 @@ AppliedEventPoliciesStatus

SubscriptionsAPIFilter

-(Appears on:SubscriptionsAPIFilter, TriggerSpec, ApiServerSourceSpec) +(Appears on:SubscriptionsAPIFilter, TriggerSpec, EventPolicySpec, ApiServerSourceSpec)

SubscriptionsAPIFilter allows defining a filter expression using CloudEvents @@ -2735,6 +2735,24 @@ An empty list means it applies to all resources in the EventPolicies namespaceFrom is the list of sources or oidc identities, which are allowed to send events to the targets (.spec.to).

+ + +filters
+ + +[]SubscriptionsAPIFilter + + + + +(Optional) +

Filters is the list of SubscriptoinsApi filters which determine whether or not the event is accepted. +It is an array of filter expressions that evaluate to true or false. +If any filter expression in the array evaluates to false, the event will not +pass the target resource’s ingress. Absence of any filters implies that the filters +always evaluate to true.

+ + @@ -2898,6 +2916,24 @@ An empty list means it applies to all resources in the EventPolicies namespaceFrom is the list of sources or oidc identities, which are allowed to send events to the targets (.spec.to).

+ + +filters
+ + +[]SubscriptionsAPIFilter + + + + +(Optional) +

Filters is the list of SubscriptoinsApi filters which determine whether or not the event is accepted. +It is an array of filter expressions that evaluate to true or false. +If any filter expression in the array evaluates to false, the event will not +pass the target resource’s ingress. Absence of any filters implies that the filters +always evaluate to true.

+ +

EventPolicySpecFrom diff --git a/pkg/apis/eventing/v1alpha1/eventpolicy_types.go b/pkg/apis/eventing/v1alpha1/eventpolicy_types.go index 53d62653444..e3c312aae9f 100644 --- a/pkg/apis/eventing/v1alpha1/eventpolicy_types.go +++ b/pkg/apis/eventing/v1alpha1/eventpolicy_types.go @@ -20,9 +20,12 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/kmeta" + + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" ) // +genclient @@ -71,6 +74,14 @@ type EventPolicySpec struct { // From is the list of sources or oidc identities, which are allowed to send events to the targets (.spec.to). From []EventPolicySpecFrom `json:"from,omitempty"` + + // Filters is the list of SubscriptoinsApi filters which determine whether or not the event is accepted. + // It is an array of filter expressions that evaluate to true or false. + // If any filter expression in the array evaluates to false, the event will not + // pass the target resource's ingress. Absence of any filters implies that the filters + // always evaluate to true. + // +optional + Filters []eventingv1.SubscriptionsAPIFilter `json:"filters,omitempty"` } type EventPolicySpecTo struct { diff --git a/pkg/apis/eventing/v1alpha1/eventpolicy_validation.go b/pkg/apis/eventing/v1alpha1/eventpolicy_validation.go index 5f05c240df9..4ae6ea3f7c2 100644 --- a/pkg/apis/eventing/v1alpha1/eventpolicy_validation.go +++ b/pkg/apis/eventing/v1alpha1/eventpolicy_validation.go @@ -20,6 +20,7 @@ import ( "context" "strings" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/pkg/apis/feature" "knative.dev/pkg/apis" ) @@ -60,6 +61,8 @@ func (ets *EventPolicySpec) Validate(ctx context.Context) *apis.FieldError { } } + err = err.Also(eventingv1.ValidateSubscriptionAPIFiltersList(ctx, ets.Filters).ViaField("filters")) + return err } diff --git a/pkg/apis/eventing/v1alpha1/eventpolicy_validation_test.go b/pkg/apis/eventing/v1alpha1/eventpolicy_validation_test.go index 20034c8f907..14c40ddeb97 100644 --- a/pkg/apis/eventing/v1alpha1/eventpolicy_validation_test.go +++ b/pkg/apis/eventing/v1alpha1/eventpolicy_validation_test.go @@ -21,6 +21,7 @@ import ( "testing" "github.com/google/go-cmp/cmp" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/pkg/apis/feature" "knative.dev/pkg/apis" "knative.dev/pkg/ptr" @@ -294,6 +295,45 @@ func TestEventPolicySpecValidationWithOIDCAuthenticationFeatureFlagEnabled(t *te return nil }(), }, + { + name: "valid, from.sub exactly '*', valid filters", + ep: &EventPolicy{ + Spec: EventPolicySpec{ + From: []EventPolicySpecFrom{{ + Sub: ptr.String("*"), + }}, + Filters: []eventingv1.SubscriptionsAPIFilter{ + { + Prefix: map[string]string{"type": "example"}, + }, + }, + }, + }, + want: func() *apis.FieldError { + return nil + }(), + }, + { + name: "invalid, from.sub exactly '*', invalid cesql filter", + ep: &EventPolicy{ + Spec: EventPolicySpec{ + From: []EventPolicySpecFrom{{ + Sub: ptr.String("*"), + }}, + Filters: []eventingv1.SubscriptionsAPIFilter{ + { + CESQL: "type LIKE id", + }, + }, + }, + }, + want: func() *apis.FieldError { + + return apis.ErrInvalidValue("type LIKE id", "cesql", "parse error: syntax error: |failed to parse LIKE expression: the pattern was not a string literal"). + ViaFieldIndex("filters", 0). + ViaField("spec") + }(), + }, } for _, test := range tests { diff --git a/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go index 998b577cad3..068369c53ba 100644 --- a/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/eventing/v1alpha1/zz_generated.deepcopy.go @@ -24,6 +24,7 @@ package v1alpha1 import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" runtime "k8s.io/apimachinery/pkg/runtime" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" ) // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. @@ -146,6 +147,13 @@ func (in *EventPolicySpec) DeepCopyInto(out *EventPolicySpec) { (*in)[i].DeepCopyInto(&(*out)[i]) } } + if in.Filters != nil { + in, out := &in.Filters, &out.Filters + *out = make([]eventingv1.SubscriptionsAPIFilter, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } return } From db2025431fcd6b24adbfd6cd3be3a17405ddcfc4 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Wed, 31 Jul 2024 16:32:00 -0400 Subject: [PATCH 2/9] feat: add filters to eventpolicy crd Signed-off-by: Calum Murray --- config/core/resources/eventpolicy.yaml | 38 ++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) diff --git a/config/core/resources/eventpolicy.yaml b/config/core/resources/eventpolicy.yaml index 159d379e5bb..b68003ac1af 100644 --- a/config/core/resources/eventpolicy.yaml +++ b/config/core/resources/eventpolicy.yaml @@ -110,6 +110,44 @@ spec: description: matchLabels is a map of {key,value} pairs. A single {key,value} in the matchLabels map is equivalent to an element of matchExpressions, whose key field is "key", the operator is "In", and the values array contains only "value". The requirements are ANDed. type: object x-kubernetes-preserve-unknown-fields: true + filters: + description: 'Filters is an array of SubscriptionsAPIFilters that evaluate to true or false. If any filter expression in the array evaluates to false, the event will not continue pass the ingress of the target resources of the policy' + type: array + items: + type: object + properties: + all: + description: 'All evaluates to true if all the nested expressions evaluate to true. It must contain at least one filter expression' + type: array + items: + type: object + x-kubernetes-preserve-unknown-fields: true + any: + description: 'Any evaluates to true if any of the nested expressions evaluate to true. It must contain at least one filter expression' + type: array + items: + type: object + x-kubernetes-preserve-unknown-fields: true + cesql: + description: 'CESQL is a CloudEvents SQL v1 expression that will evaluate to true or false for each CloudEvent.' + type: string + exact: + description: 'Exact evaluates to true if the values of the matching CloudEvents attributes all exactly match with the associated value string specified (case sensitive)' + type: object + x-kubernetes-preserve-unknown-fields: true + not: + description: 'Not evaluates to true if the nested expression evaluates to false.' + type: object + x-kubernetes-preserve-unknown-fields: true + prefix: + description: 'Prefix evaluates to true if the values of the matching CloudEvents attributes all start with the associated value string specified (case sensitive)' + type: object + x-kubernetes-preserve-unknown-fields: true + suffix: + description: 'Exact evaluates to true if the values of the matching CloudEvents attributes all end with the associated value string specified (case sensitive)' + type: object + x-kubernetes-preserve-unknown-fields: true + status: description: Status represents the current state of the EventPolicy. This data may be out of date. type: object From c2a4b80047d82196e74ad3274e322508b362fa44 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Wed, 31 Jul 2024 17:08:31 -0400 Subject: [PATCH 3/9] feat: updated auth package to handle filters as well Signed-off-by: Calum Murray --- pkg/adapter/apiserver/adapter.go | 3 +- pkg/adapter/apiserver/adapter_test.go | 5 +- pkg/adapter/apiserver/delegate_test.go | 5 +- pkg/auth/event_policy.go | 24 +++--- pkg/auth/event_policy_test.go | 4 +- pkg/auth/token_verifier.go | 27 +++++-- pkg/broker/filter/filter_handler.go | 69 +---------------- pkg/broker/filter/filter_handler_test.go | 2 +- pkg/eventfilter/subscriptionsapi/create.go | 86 ++++++++++++++++++++++ 9 files changed, 134 insertions(+), 91 deletions(-) create mode 100644 pkg/eventfilter/subscriptionsapi/create.go diff --git a/pkg/adapter/apiserver/adapter.go b/pkg/adapter/apiserver/adapter.go index aba3301306c..cda3e617e44 100644 --- a/pkg/adapter/apiserver/adapter.go +++ b/pkg/adapter/apiserver/adapter.go @@ -34,7 +34,6 @@ import ( "knative.dev/eventing/pkg/adapter/v2" v1 "knative.dev/eventing/pkg/apis/sources/v1" - brokerfilter "knative.dev/eventing/pkg/broker/filter" "knative.dev/eventing/pkg/eventfilter/subscriptionsapi" ) @@ -73,7 +72,7 @@ func (a *apiServerAdapter) start(ctx context.Context, stopCh <-chan struct{}) er logger: a.logger, ref: a.config.EventMode == v1.ReferenceMode, apiServerSourceName: a.name, - filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(a.logger.Desugar(), a.config.Filters)...), + filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(a.logger.Desugar(), a.config.Filters)...), } if a.config.ResourceOwner != nil { a.logger.Infow("will be filtered", diff --git a/pkg/adapter/apiserver/adapter_test.go b/pkg/adapter/apiserver/adapter_test.go index 93b33df0bb0..8c931170f60 100644 --- a/pkg/adapter/apiserver/adapter_test.go +++ b/pkg/adapter/apiserver/adapter_test.go @@ -35,7 +35,6 @@ import ( kubetesting "k8s.io/client-go/testing" adaptertest "knative.dev/eventing/pkg/adapter/v2/test" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" - brokerfilter "knative.dev/eventing/pkg/broker/filter" "knative.dev/eventing/pkg/eventfilter/subscriptionsapi" rectesting "knative.dev/eventing/pkg/reconciler/testing" "knative.dev/pkg/logging" @@ -299,7 +298,7 @@ func makeResourceAndTestingClient() (*resourceDelegate, *adaptertest.TestCloudEv source: "unit-test", apiServerSourceName: apiServerSourceNameTest, logger: logger, - filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...), + filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...), }, ce } @@ -313,6 +312,6 @@ func makeRefAndTestingClient() (*resourceDelegate, *adaptertest.TestCloudEventsC apiServerSourceName: apiServerSourceNameTest, logger: zap.NewExample().Sugar(), ref: true, - filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...), + filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...), }, ce } diff --git a/pkg/adapter/apiserver/delegate_test.go b/pkg/adapter/apiserver/delegate_test.go index 00fc9dfe691..d2d978b1f5a 100644 --- a/pkg/adapter/apiserver/delegate_test.go +++ b/pkg/adapter/apiserver/delegate_test.go @@ -22,7 +22,6 @@ import ( adaptertest "knative.dev/eventing/pkg/adapter/v2/test" eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/pkg/apis/sources" - brokerfilter "knative.dev/eventing/pkg/broker/filter" "knative.dev/eventing/pkg/eventfilter/subscriptionsapi" ) @@ -87,7 +86,7 @@ func TestFilterFails(t *testing.T) { source: "unit-test", apiServerSourceName: apiServerSourceNameTest, logger: logger, - filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), filters)...), + filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(logger.Desugar(), filters)...), } delegate.Update(simplePod("unit", "test")) @@ -104,7 +103,7 @@ func TestEmptyFiltersList(t *testing.T) { source: "unit-test", apiServerSourceName: apiServerSourceNameTest, logger: logger, - filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), filters)...), + filter: subscriptionsapi.NewAllFilter(subscriptionsapi.MaterializeFiltersList(logger.Desugar(), filters)...), } delegate.Update(simplePod("unit", "test")) diff --git a/pkg/auth/event_policy.go b/pkg/auth/event_policy.go index 7d4fcb1dbad..f30a726784e 100644 --- a/pkg/auth/event_policy.go +++ b/pkg/auth/event_policy.go @@ -17,7 +17,12 @@ limitations under the License. package auth import ( + "context" "fmt" + cloudevents "github.com/cloudevents/sdk-go/v2" + "go.uber.org/zap" + "knative.dev/eventing/pkg/eventfilter" + "knative.dev/eventing/pkg/eventfilter/subscriptionsapi" "strings" eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" @@ -204,17 +209,18 @@ func resolveSubjectsFromReference(resolver *resolver.AuthenticatableResolver, re return objFullSANames, nil } -// SubjectContained checks if the given sub is contained in the list of allowedSubs +// SubjectAndFiltersPass checks if the given sub is contained in the list of allowedSubs // or if it matches a prefix pattern in subs (e.g. system:serviceaccounts:my-ns:*) -func SubjectContained(sub string, allowedSubs []string) bool { - for _, s := range allowedSubs { - if strings.EqualFold(s, sub) { - return true - } +func SubjectAndFiltersPass(ctx context.Context, sub string, allowedSubsWithFilters []filtersBySubjects, event *cloudevents.Event, logger *zap.SugaredLogger) bool { + if event == nil { + return false + } - if strings.HasSuffix(s, "*") && - strings.HasPrefix(sub, strings.TrimSuffix(s, "*")) { - return true + for _, swf := range allowedSubsWithFilters { + for _, s := range swf.subjects { + if strings.EqualFold(s, sub) || (strings.HasSuffix(s, "*") && strings.HasPrefix(sub, strings.TrimSuffix(s, "*"))) { + return subscriptionsapi.CreateSubscriptionsAPIFilters(logger, swf.filters).Filter(ctx, *event) != eventfilter.FailFilter + } } } diff --git a/pkg/auth/event_policy_test.go b/pkg/auth/event_policy_test.go index 124f1423173..e08041b3d99 100644 --- a/pkg/auth/event_policy_test.go +++ b/pkg/auth/event_policy_test.go @@ -762,8 +762,8 @@ func TestSubjectContained(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := SubjectContained(tt.sub, tt.allowedSubs); got != tt.want { - t.Errorf("SubjectContained(%q, '%v') = %v, want %v", tt.sub, tt.allowedSubs, got, tt.want) + if got := SubjectAndFiltersPass(tt.sub, tt.allowedSubs); got != tt.want { + t.Errorf("SubjectAndFiltersPass(%q, '%v') = %v, want %v", tt.sub, tt.allowedSubs, got, tt.want) } }) } diff --git a/pkg/auth/token_verifier.go b/pkg/auth/token_verifier.go index 0d87cf11f69..d9a8d8a48d0 100644 --- a/pkg/auth/token_verifier.go +++ b/pkg/auth/token_verifier.go @@ -29,9 +29,12 @@ import ( eventpolicyinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1alpha1/eventpolicy" "knative.dev/eventing/pkg/client/listers/eventing/v1alpha1" + "github.com/cloudevents/sdk-go/v2/binding" + cehttp "github.com/cloudevents/sdk-go/v2/protocol/http" "github.com/coreos/go-oidc/v3/oidc" "go.uber.org/zap" "k8s.io/client-go/rest" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/pkg/apis/feature" "knative.dev/pkg/injection" "knative.dev/pkg/logging" @@ -92,7 +95,7 @@ func (v *OIDCTokenVerifier) VerifyRequest(ctx context.Context, features feature. return fmt.Errorf("authentication of request could not be verified: %w", err) } - err = v.verifyAuthZ(features, idToken, resourceNamespace, policyRefs, resp) + err = v.verifyAuthZ(ctx, features, idToken, resourceNamespace, policyRefs, req, resp) if err != nil { return fmt.Errorf("authorization of request could not be verified: %w", err) } @@ -123,9 +126,18 @@ func (v *OIDCTokenVerifier) verifyAuthN(ctx context.Context, audience *string, r } // verifyAuthZ verifies if the given idToken is allowed by the resources eventPolicyStatus -func (v *OIDCTokenVerifier) verifyAuthZ(features feature.Flags, idToken *IDToken, resourceNamespace string, policyRefs []duckv1.AppliedEventPolicyRef, resp http.ResponseWriter) error { +func (v *OIDCTokenVerifier) verifyAuthZ(ctx context.Context, features feature.Flags, idToken *IDToken, resourceNamespace string, policyRefs []duckv1.AppliedEventPolicyRef, req *http.Request, resp http.ResponseWriter) error { if len(policyRefs) > 0 { - subjectsFromApplyingPolicies := []string{} + message := cehttp.NewMessageFromHttpRequest(req) + defer message.Finish(nil) + + event, err := binding.ToEvent(ctx, message) + if err != nil { + resp.WriteHeader(http.StatusInternalServerError) + return fmt.Errorf("failed to decode event from request: %w", err) + } + + subjectsFromApplyingPolicies := []filtersBySubjects{} for _, p := range policyRefs { policy, err := v.eventPolicyLister.EventPolicies(resourceNamespace).Get(p.Name) if err != nil { @@ -133,10 +145,10 @@ func (v *OIDCTokenVerifier) verifyAuthZ(features feature.Flags, idToken *IDToken return fmt.Errorf("failed to get eventPolicy: %w", err) } - subjectsFromApplyingPolicies = append(subjectsFromApplyingPolicies, policy.Status.From...) + subjectsFromApplyingPolicies = append(subjectsFromApplyingPolicies, filtersBySubjects{subjects: policy.Status.From, filters: policy.Spec.Filters}) } - if !SubjectContained(idToken.Subject, subjectsFromApplyingPolicies) { + if !SubjectAndFiltersPass(ctx, idToken.Subject, subjectsFromApplyingPolicies, event, v.logger) { resp.WriteHeader(http.StatusForbidden) return fmt.Errorf("token is from subject %q, but only %q are part of applying event policies", idToken.Subject, subjectsFromApplyingPolicies) } @@ -255,3 +267,8 @@ type openIDMetadata struct { SubjectTypes []string `json:"subject_types_supported"` SigningAlgs []string `json:"id_token_signing_alg_values_supported"` } + +type filtersBySubjects struct { + filters []eventingv1.SubscriptionsAPIFilter + subjects []string +} diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index 8ea2565ebfa..ab61e11f75e 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -108,7 +108,7 @@ func NewHandler(logger *zap.Logger, tokenVerifier *auth.OIDCTokenVerifier, oidcT return } logger.Debug("Adding filter to filtersMap") - fm.Set(trigger, createSubscriptionsAPIFilters(logger, trigger)) + fm.Set(trigger, subscriptionsapi.CreateSubscriptionsAPIFilters(logger, trigger.Spec.Filters)) kncloudevents.AddOrUpdateAddressableHandler(clientConfig, duckv1.Addressable{ URL: trigger.Status.SubscriberURI, CACerts: trigger.Status.SubscriberCACerts, @@ -120,7 +120,7 @@ func NewHandler(logger *zap.Logger, tokenVerifier *auth.OIDCTokenVerifier, oidcT return } logger.Debug("Updating filter in filtersMap") - fm.Set(trigger, createSubscriptionsAPIFilters(logger, trigger)) + fm.Set(trigger, subscriptionsapi.CreateSubscriptionsAPIFilters(logger, trigger.Spec.Filters)) kncloudevents.AddOrUpdateAddressableHandler(clientConfig, duckv1.Addressable{ URL: trigger.Status.SubscriberURI, CACerts: trigger.Status.SubscriberCACerts, @@ -581,70 +581,7 @@ func (h *Handler) filterEvent(ctx context.Context, trigger *eventingv1.Trigger, } func applySubscriptionsAPIFilters(ctx context.Context, trigger *eventingv1.Trigger, event cloudevents.Event) eventfilter.FilterResult { - return createSubscriptionsAPIFilters(logging.FromContext(ctx).Desugar(), trigger).Filter(ctx, event) -} - -func createSubscriptionsAPIFilters(logger *zap.Logger, trigger *eventingv1.Trigger) eventfilter.Filter { - if len(trigger.Spec.Filters) == 0 { - logger.Debug("Found no filters for trigger", zap.Any("trigger.Spec", trigger.Spec)) - return subscriptionsapi.NewNoFilter() - } - return subscriptionsapi.NewAllFilter(MaterializeFiltersList(logger, trigger.Spec.Filters)...) -} - -func materializeSubscriptionsAPIFilter(logger *zap.Logger, filter eventingv1.SubscriptionsAPIFilter) eventfilter.Filter { - var materializedFilter eventfilter.Filter - var err error - switch { - case len(filter.Exact) > 0: - // The webhook validates that this map has only a single key:value pair. - materializedFilter, err = subscriptionsapi.NewExactFilter(filter.Exact) - if err != nil { - logger.Debug("Invalid exact expression", zap.Any("filters", filter.Exact), zap.Error(err)) - return nil - } - case len(filter.Prefix) > 0: - // The webhook validates that this map has only a single key:value pair. - materializedFilter, err = subscriptionsapi.NewPrefixFilter(filter.Prefix) - if err != nil { - logger.Debug("Invalid prefix expression", zap.Any("filters", filter.Exact), zap.Error(err)) - return nil - } - case len(filter.Suffix) > 0: - // The webhook validates that this map has only a single key:value pair. - materializedFilter, err = subscriptionsapi.NewSuffixFilter(filter.Suffix) - if err != nil { - logger.Debug("Invalid suffix expression", zap.Any("filters", filter.Exact), zap.Error(err)) - return nil - } - case len(filter.All) > 0: - materializedFilter = subscriptionsapi.NewAllFilter(MaterializeFiltersList(logger, filter.All)...) - case len(filter.Any) > 0: - materializedFilter = subscriptionsapi.NewAnyFilter(MaterializeFiltersList(logger, filter.Any)...) - case filter.Not != nil: - materializedFilter = subscriptionsapi.NewNotFilter(materializeSubscriptionsAPIFilter(logger, *filter.Not)) - case filter.CESQL != "": - if materializedFilter, err = subscriptionsapi.NewCESQLFilter(filter.CESQL); err != nil { - // This is weird, CESQL expression should be validated when Trigger's are created. - logger.Debug("Found an Invalid CE SQL expression", zap.String("expression", filter.CESQL)) - return nil - } - } - return materializedFilter -} - -// MaterialzieFilterList allows any component that supports `SubscriptionsAPIFilter` to process them -func MaterializeFiltersList(logger *zap.Logger, filters []eventingv1.SubscriptionsAPIFilter) []eventfilter.Filter { - materializedFilters := make([]eventfilter.Filter, 0, len(filters)) - for _, f := range filters { - f := materializeSubscriptionsAPIFilter(logger, f) - if f == nil { - logger.Warn("Failed to parse filter. Skipping filter.", zap.Any("filter", f)) - continue - } - materializedFilters = append(materializedFilters, f) - } - return materializedFilters + return subscriptionsapi.CreateSubscriptionsAPIFilters(logging.FromContext(ctx).Desugar(), trigger.Spec.Filters).Filter(ctx, event) } func applyAttributesFilter(ctx context.Context, filter *eventingv1.TriggerFilter, event cloudevents.Event) eventfilter.FilterResult { diff --git a/pkg/broker/filter/filter_handler_test.go b/pkg/broker/filter/filter_handler_test.go index e220e401774..062355015e1 100644 --- a/pkg/broker/filter/filter_handler_test.go +++ b/pkg/broker/filter/filter_handler_test.go @@ -651,7 +651,7 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) { trig.Status.SubscriberURI = url } triggerinformerfake.Get(ctx).Informer().GetStore().Add(trig) - filtersMap.Set(trig, createSubscriptionsAPIFilters(logging.FromContext(ctx).Desugar(), trig)) + filtersMap.Set(trig, subscriptionsapi.CreateSubscriptionsAPIFilters(logging.FromContext(ctx).Desugar(), trig.Spec.Filters)) // create the needed broker object b := &v1.Broker{ diff --git a/pkg/eventfilter/subscriptionsapi/create.go b/pkg/eventfilter/subscriptionsapi/create.go new file mode 100644 index 00000000000..f3a44c24f5e --- /dev/null +++ b/pkg/eventfilter/subscriptionsapi/create.go @@ -0,0 +1,86 @@ +/* +Copyright 2024 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package subscriptionsapi + +import ( + "go.uber.org/zap" + "knative.dev/eventing/pkg/apis/eventing/v1" + "knative.dev/eventing/pkg/eventfilter" +) + +func MaterializeSubscriptionsAPIFilter(logger *zap.Logger, filter v1.SubscriptionsAPIFilter) eventfilter.Filter { + var materializedFilter eventfilter.Filter + var err error + switch { + case len(filter.Exact) > 0: + // The webhook validates that this map has only a single key:value pair. + materializedFilter, err = NewExactFilter(filter.Exact) + if err != nil { + logger.Debug("Invalid exact expression", zap.Any("filters", filter.Exact), zap.Error(err)) + return nil + } + case len(filter.Prefix) > 0: + // The webhook validates that this map has only a single key:value pair. + materializedFilter, err = NewPrefixFilter(filter.Prefix) + if err != nil { + logger.Debug("Invalid prefix expression", zap.Any("filters", filter.Exact), zap.Error(err)) + return nil + } + case len(filter.Suffix) > 0: + // The webhook validates that this map has only a single key:value pair. + materializedFilter, err = NewSuffixFilter(filter.Suffix) + if err != nil { + logger.Debug("Invalid suffix expression", zap.Any("filters", filter.Exact), zap.Error(err)) + return nil + } + case len(filter.All) > 0: + materializedFilter = NewAllFilter(MaterializeFiltersList(logger, filter.All)...) + case len(filter.Any) > 0: + materializedFilter = NewAnyFilter(MaterializeFiltersList(logger, filter.Any)...) + case filter.Not != nil: + materializedFilter = NewNotFilter(MaterializeSubscriptionsAPIFilter(logger, *filter.Not)) + case filter.CESQL != "": + if materializedFilter, err = NewCESQLFilter(filter.CESQL); err != nil { + // This is weird, CESQL expression should be validated when Trigger's are created. + logger.Debug("Found an Invalid CE SQL expression", zap.String("expression", filter.CESQL)) + return nil + } + } + return materializedFilter +} + +func CreateSubscriptionsAPIFilters(logger *zap.Logger, filters []v1.SubscriptionsAPIFilter) eventfilter.Filter { + if len(filters) == 0 { + logger.Debug("no filters provided") + return NewNoFilter() + } + return NewAllFilter(MaterializeFiltersList(logger, filters)...) +} + +// MaterialzieFilterList allows any component that supports `SubscriptionsAPIFilter` to process them +func MaterializeFiltersList(logger *zap.Logger, filters []v1.SubscriptionsAPIFilter) []eventfilter.Filter { + materializedFilters := make([]eventfilter.Filter, 0, len(filters)) + for _, f := range filters { + f := MaterializeSubscriptionsAPIFilter(logger, f) + if f == nil { + logger.Warn("Failed to parse filter. Skipping filter.", zap.Any("filter", f)) + continue + } + materializedFilters = append(materializedFilters, f) + } + return materializedFilters +} From f6fad57ee9270bde95c23f74008e264031840345 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Mon, 12 Aug 2024 10:07:43 -0400 Subject: [PATCH 4/9] fix build issues, add unit tests Signed-off-by: Calum Murray --- pkg/auth/event_policy.go | 2 +- pkg/auth/event_policy_test.go | 139 ++++++++++++++++----- pkg/auth/token_verifier.go | 2 +- pkg/eventfilter/subscriptionsapi/create.go | 2 +- 4 files changed, 109 insertions(+), 36 deletions(-) diff --git a/pkg/auth/event_policy.go b/pkg/auth/event_policy.go index 0be345a8e94..65e4522d6de 100644 --- a/pkg/auth/event_policy.go +++ b/pkg/auth/event_policy.go @@ -226,7 +226,7 @@ func SubjectAndFiltersPass(ctx context.Context, sub string, allowedSubsWithFilte for _, swf := range allowedSubsWithFilters { for _, s := range swf.subjects { if strings.EqualFold(s, sub) || (strings.HasSuffix(s, "*") && strings.HasPrefix(sub, strings.TrimSuffix(s, "*"))) { - return subscriptionsapi.CreateSubscriptionsAPIFilters(logger, swf.filters).Filter(ctx, *event) != eventfilter.FailFilter + return subscriptionsapi.CreateSubscriptionsAPIFilters(logger.Desugar(), swf.filters).Filter(ctx, *event) != eventfilter.FailFilter } } } diff --git a/pkg/auth/event_policy_test.go b/pkg/auth/event_policy_test.go index e08041b3d99..f666161b269 100644 --- a/pkg/auth/event_policy_test.go +++ b/pkg/auth/event_policy_test.go @@ -22,7 +22,11 @@ import ( "strings" "testing" + "go.uber.org/zap" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/utils/ptr" + + cetest "github.com/cloudevents/sdk-go/v2/test" "github.com/google/go-cmp/cmp" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -38,7 +42,6 @@ import ( duckv1 "knative.dev/pkg/apis/duck/v1" "knative.dev/pkg/client/injection/ducks/duck/v1/authstatus" fakedynamicclient "knative.dev/pkg/injection/clients/dynamicclient/fake" - "knative.dev/pkg/ptr" reconcilertesting "knative.dev/pkg/reconciler/testing" "knative.dev/pkg/resolver" "knative.dev/pkg/tracker" @@ -541,9 +544,9 @@ func TestResolveSubjects(t *testing.T) { Namespace: namespace, }, }, { - Sub: ptr.String("system:serviceaccount:my-ns:my-app"), + Sub: ptr.To("system:serviceaccount:my-ns:my-app"), }, { - Sub: ptr.String("system:serviceaccount:my-ns:my-app-2"), + Sub: ptr.To("system:serviceaccount:my-ns:my-app-2"), }, }, objects: []runtime.Object{ @@ -555,7 +558,7 @@ func TestResolveSubjects(t *testing.T) { Status: sourcesv1.ApiServerSourceStatus{ SourceStatus: duckv1.SourceStatus{ Auth: &duckv1.AuthStatus{ - ServiceAccountName: ptr.String("my-apiserversource-oidc-sa"), + ServiceAccountName: ptr.To("my-apiserversource-oidc-sa"), }, }, }, @@ -591,9 +594,9 @@ func TestResolveSubjects(t *testing.T) { Namespace: namespace, }, }, { - Sub: ptr.String("system:serviceaccount:my-ns:my-app"), + Sub: ptr.To("system:serviceaccount:my-ns:my-app"), }, { - Sub: ptr.String("system:serviceaccount:my-ns:my-app-2"), + Sub: ptr.To("system:serviceaccount:my-ns:my-app-2"), }, }, objects: []runtime.Object{ @@ -605,7 +608,7 @@ func TestResolveSubjects(t *testing.T) { Status: sourcesv1.ApiServerSourceStatus{ SourceStatus: duckv1.SourceStatus{ Auth: &duckv1.AuthStatus{ - ServiceAccountName: ptr.String("my-apiserversource-oidc-sa"), + ServiceAccountName: ptr.To("my-apiserversource-oidc-sa"), }, }, }, @@ -618,7 +621,7 @@ func TestResolveSubjects(t *testing.T) { Status: sourcesv1.PingSourceStatus{ SourceStatus: duckv1.SourceStatus{ Auth: &duckv1.AuthStatus{ - ServiceAccountName: ptr.String("my-pingsource-oidc-sa"), + ServiceAccountName: ptr.To("my-pingsource-oidc-sa"), }, }, }, @@ -692,78 +695,148 @@ func TestResolveSubjects(t *testing.T) { } } -func TestSubjectContained(t *testing.T) { +func TestSubjectAndFiltersContained(t *testing.T) { tests := []struct { - name string - sub string - allowedSubs []string - want bool + name string + sub string + allowedSubsAndFilters []filtersBySubjects + want bool }{ { name: "simple 1:1 match", sub: "system:serviceaccounts:my-ns:my-sa", - allowedSubs: []string{ - "system:serviceaccounts:my-ns:my-sa", + allowedSubsAndFilters: []filtersBySubjects{ + { + subjects: []string{"system:serviceaccounts:my-ns:my-sa"}, + }, }, want: true, }, { name: "simple 1:n match", sub: "system:serviceaccounts:my-ns:my-sa", - allowedSubs: []string{ - "system:serviceaccounts:my-ns:another-sa", - "system:serviceaccounts:my-ns:my-sa", - "system:serviceaccounts:my-ns:yet-another-sa", + allowedSubsAndFilters: []filtersBySubjects{ + { + subjects: []string{ + "system:serviceaccounts:my-ns:another-sa", + "system:serviceaccounts:my-ns:my-sa", + "system:serviceaccounts:my-ns:yet-another-sa"}, + }, }, want: true, }, { name: "pattern match (all)", sub: "system:serviceaccounts:my-ns:my-sa", - allowedSubs: []string{ - "*", + allowedSubsAndFilters: []filtersBySubjects{ + { + subjects: []string{ + "*"}, + }, }, want: true, }, { name: "pattern match (namespace)", sub: "system:serviceaccounts:my-ns:my-sa", - allowedSubs: []string{ - "system:serviceaccounts:my-ns:*", + allowedSubsAndFilters: []filtersBySubjects{ + { + subjects: []string{ + "system:serviceaccounts:my-ns:*", + }, + }, }, want: true, }, { name: "pattern match (different namespace)", sub: "system:serviceaccounts:my-ns-2:my-sa", - allowedSubs: []string{ - "system:serviceaccounts:my-ns:*", + allowedSubsAndFilters: []filtersBySubjects{ + { + subjects: []string{ + "system:serviceaccounts:my-ns:*", + }, + }, }, want: false, }, { name: "pattern match (namespace prefix)", sub: "system:serviceaccounts:my-ns:my-sa", - allowedSubs: []string{ - "system:serviceaccounts:my-ns*", + allowedSubsAndFilters: []filtersBySubjects{ + { + subjects: []string{ + "system:serviceaccounts:my-ns*", + }, + }, }, want: true, }, { name: "pattern match (namespace prefix 2)", sub: "system:serviceaccounts:my-ns-2:my-sa", - allowedSubs: []string{ - "system:serviceaccounts:my-ns*", + allowedSubsAndFilters: []filtersBySubjects{ + { + subjects: []string{ + "system:serviceaccounts:my-ns*", + }, + }, }, want: true, }, { name: "pattern match (middle)", sub: "system:serviceaccounts:my-ns:my-sa", - allowedSubs: []string{ - "system:serviceaccounts:*:my-sa", + allowedSubsAndFilters: []filtersBySubjects{ + { + subjects: []string{ + "system:serviceaccounts:*:my-sa", + }, + }, + }, + want: false, + }, { + name: "pattern match (namespace prefix) and failing event filter", + sub: "system:serviceaccounts:my-ns:my-sa", + allowedSubsAndFilters: []filtersBySubjects{ + { + subjects: []string{ + "system:serviceaccounts:my-ns*", + }, + filters: []eventingv1.SubscriptionsAPIFilter{ + { + CESQL: "false", + }, + }, + }, + }, + want: false, + }, { + name: "only check filter if subject matches", + sub: "system:serviceaccounts:my-ns:my-sa", + allowedSubsAndFilters: []filtersBySubjects{ + { + subjects: []string{ + "system:serviceaccounts:not-my-ns*", + }, + filters: []eventingv1.SubscriptionsAPIFilter{ + { + CESQL: "true", + }, + }, + }, + { + subjects: []string{ + "system:serviceaccounts:my-ns*", + }, + filters: []eventingv1.SubscriptionsAPIFilter{ + { + CESQL: "false", + }, + }, + }, }, want: false, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := SubjectAndFiltersPass(tt.sub, tt.allowedSubs); got != tt.want { - t.Errorf("SubjectAndFiltersPass(%q, '%v') = %v, want %v", tt.sub, tt.allowedSubs, got, tt.want) + if got := SubjectAndFiltersPass(context.Background(), tt.sub, tt.allowedSubsAndFilters, ptr.To(cetest.MinEvent()), zap.NewNop().Sugar()); got != tt.want { + t.Errorf("SubjectAndFiltersPass(%q, '%v') = %v, want %v", tt.sub, tt.allowedSubsAndFilters, got, tt.want) } }) } diff --git a/pkg/auth/token_verifier.go b/pkg/auth/token_verifier.go index d9a8d8a48d0..280d65a175b 100644 --- a/pkg/auth/token_verifier.go +++ b/pkg/auth/token_verifier.go @@ -150,7 +150,7 @@ func (v *OIDCTokenVerifier) verifyAuthZ(ctx context.Context, features feature.Fl if !SubjectAndFiltersPass(ctx, idToken.Subject, subjectsFromApplyingPolicies, event, v.logger) { resp.WriteHeader(http.StatusForbidden) - return fmt.Errorf("token is from subject %q, but only %q are part of applying event policies", idToken.Subject, subjectsFromApplyingPolicies) + return fmt.Errorf("token is from subject %q, but only %#v are part of applying event policies", idToken.Subject, subjectsFromApplyingPolicies) } return nil diff --git a/pkg/eventfilter/subscriptionsapi/create.go b/pkg/eventfilter/subscriptionsapi/create.go index f3a44c24f5e..008bfcd05bd 100644 --- a/pkg/eventfilter/subscriptionsapi/create.go +++ b/pkg/eventfilter/subscriptionsapi/create.go @@ -18,7 +18,7 @@ package subscriptionsapi import ( "go.uber.org/zap" - "knative.dev/eventing/pkg/apis/eventing/v1" + v1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/pkg/eventfilter" ) From adab227d9c2737adf40f8babddf35688cc0f6966 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Wed, 14 Aug 2024 15:06:10 -0400 Subject: [PATCH 5/9] address review comments Signed-off-by: Calum Murray --- pkg/auth/event_policy.go | 5 ++- pkg/auth/event_policy_test.go | 22 +++++------ pkg/auth/token_verifier.go | 46 +++++++++++++++++++--- pkg/eventfilter/subscriptionsapi/create.go | 1 + 4 files changed, 56 insertions(+), 18 deletions(-) diff --git a/pkg/auth/event_policy.go b/pkg/auth/event_policy.go index 65e4522d6de..972eb1147ef 100644 --- a/pkg/auth/event_policy.go +++ b/pkg/auth/event_policy.go @@ -217,8 +217,9 @@ func resolveSubjectsFromReference(resolver *resolver.AuthenticatableResolver, re } // SubjectAndFiltersPass checks if the given sub is contained in the list of allowedSubs -// or if it matches a prefix pattern in subs (e.g. system:serviceaccounts:my-ns:*) -func SubjectAndFiltersPass(ctx context.Context, sub string, allowedSubsWithFilters []filtersBySubjects, event *cloudevents.Event, logger *zap.SugaredLogger) bool { +// or if it matches a prefix pattern in subs (e.g. system:serviceaccounts:my-ns:*), as +// well as if the event passes any filters associated with the subjects for an event policy +func SubjectAndFiltersPass(ctx context.Context, sub string, allowedSubsWithFilters []subjectsWithFilters, event *cloudevents.Event, logger *zap.SugaredLogger) bool { if event == nil { return false } diff --git a/pkg/auth/event_policy_test.go b/pkg/auth/event_policy_test.go index f666161b269..c7396869b63 100644 --- a/pkg/auth/event_policy_test.go +++ b/pkg/auth/event_policy_test.go @@ -700,13 +700,13 @@ func TestSubjectAndFiltersContained(t *testing.T) { tests := []struct { name string sub string - allowedSubsAndFilters []filtersBySubjects + allowedSubsAndFilters []subjectsWithFilters want bool }{ { name: "simple 1:1 match", sub: "system:serviceaccounts:my-ns:my-sa", - allowedSubsAndFilters: []filtersBySubjects{ + allowedSubsAndFilters: []subjectsWithFilters{ { subjects: []string{"system:serviceaccounts:my-ns:my-sa"}, }, @@ -715,7 +715,7 @@ func TestSubjectAndFiltersContained(t *testing.T) { }, { name: "simple 1:n match", sub: "system:serviceaccounts:my-ns:my-sa", - allowedSubsAndFilters: []filtersBySubjects{ + allowedSubsAndFilters: []subjectsWithFilters{ { subjects: []string{ "system:serviceaccounts:my-ns:another-sa", @@ -727,7 +727,7 @@ func TestSubjectAndFiltersContained(t *testing.T) { }, { name: "pattern match (all)", sub: "system:serviceaccounts:my-ns:my-sa", - allowedSubsAndFilters: []filtersBySubjects{ + allowedSubsAndFilters: []subjectsWithFilters{ { subjects: []string{ "*"}, @@ -737,7 +737,7 @@ func TestSubjectAndFiltersContained(t *testing.T) { }, { name: "pattern match (namespace)", sub: "system:serviceaccounts:my-ns:my-sa", - allowedSubsAndFilters: []filtersBySubjects{ + allowedSubsAndFilters: []subjectsWithFilters{ { subjects: []string{ "system:serviceaccounts:my-ns:*", @@ -748,7 +748,7 @@ func TestSubjectAndFiltersContained(t *testing.T) { }, { name: "pattern match (different namespace)", sub: "system:serviceaccounts:my-ns-2:my-sa", - allowedSubsAndFilters: []filtersBySubjects{ + allowedSubsAndFilters: []subjectsWithFilters{ { subjects: []string{ "system:serviceaccounts:my-ns:*", @@ -759,7 +759,7 @@ func TestSubjectAndFiltersContained(t *testing.T) { }, { name: "pattern match (namespace prefix)", sub: "system:serviceaccounts:my-ns:my-sa", - allowedSubsAndFilters: []filtersBySubjects{ + allowedSubsAndFilters: []subjectsWithFilters{ { subjects: []string{ "system:serviceaccounts:my-ns*", @@ -770,7 +770,7 @@ func TestSubjectAndFiltersContained(t *testing.T) { }, { name: "pattern match (namespace prefix 2)", sub: "system:serviceaccounts:my-ns-2:my-sa", - allowedSubsAndFilters: []filtersBySubjects{ + allowedSubsAndFilters: []subjectsWithFilters{ { subjects: []string{ "system:serviceaccounts:my-ns*", @@ -781,7 +781,7 @@ func TestSubjectAndFiltersContained(t *testing.T) { }, { name: "pattern match (middle)", sub: "system:serviceaccounts:my-ns:my-sa", - allowedSubsAndFilters: []filtersBySubjects{ + allowedSubsAndFilters: []subjectsWithFilters{ { subjects: []string{ "system:serviceaccounts:*:my-sa", @@ -792,7 +792,7 @@ func TestSubjectAndFiltersContained(t *testing.T) { }, { name: "pattern match (namespace prefix) and failing event filter", sub: "system:serviceaccounts:my-ns:my-sa", - allowedSubsAndFilters: []filtersBySubjects{ + allowedSubsAndFilters: []subjectsWithFilters{ { subjects: []string{ "system:serviceaccounts:my-ns*", @@ -808,7 +808,7 @@ func TestSubjectAndFiltersContained(t *testing.T) { }, { name: "only check filter if subject matches", sub: "system:serviceaccounts:my-ns:my-sa", - allowedSubsAndFilters: []filtersBySubjects{ + allowedSubsAndFilters: []subjectsWithFilters{ { subjects: []string{ "system:serviceaccounts:not-my-ns*", diff --git a/pkg/auth/token_verifier.go b/pkg/auth/token_verifier.go index 280d65a175b..fe656cc61d3 100644 --- a/pkg/auth/token_verifier.go +++ b/pkg/auth/token_verifier.go @@ -17,6 +17,7 @@ limitations under the License. package auth import ( + "bytes" "context" "encoding/json" "fmt" @@ -128,6 +129,12 @@ func (v *OIDCTokenVerifier) verifyAuthN(ctx context.Context, audience *string, r // verifyAuthZ verifies if the given idToken is allowed by the resources eventPolicyStatus func (v *OIDCTokenVerifier) verifyAuthZ(ctx context.Context, features feature.Flags, idToken *IDToken, resourceNamespace string, policyRefs []duckv1.AppliedEventPolicyRef, req *http.Request, resp http.ResponseWriter) error { if len(policyRefs) > 0 { + req, err := copyRequest(req) + if err != nil { + resp.WriteHeader(http.StatusInternalServerError) + return fmt.Errorf("failed to copy request body: %w", err) + } + message := cehttp.NewMessageFromHttpRequest(req) defer message.Finish(nil) @@ -137,7 +144,7 @@ func (v *OIDCTokenVerifier) verifyAuthZ(ctx context.Context, features feature.Fl return fmt.Errorf("failed to decode event from request: %w", err) } - subjectsFromApplyingPolicies := []filtersBySubjects{} + subjectsWithFiltersFromApplyingPolicies := []subjectsWithFilters{} for _, p := range policyRefs { policy, err := v.eventPolicyLister.EventPolicies(resourceNamespace).Get(p.Name) if err != nil { @@ -145,12 +152,12 @@ func (v *OIDCTokenVerifier) verifyAuthZ(ctx context.Context, features feature.Fl return fmt.Errorf("failed to get eventPolicy: %w", err) } - subjectsFromApplyingPolicies = append(subjectsFromApplyingPolicies, filtersBySubjects{subjects: policy.Status.From, filters: policy.Spec.Filters}) + subjectsWithFiltersFromApplyingPolicies = append(subjectsWithFiltersFromApplyingPolicies, subjectsWithFilters{subjects: policy.Status.From, filters: policy.Spec.Filters}) } - if !SubjectAndFiltersPass(ctx, idToken.Subject, subjectsFromApplyingPolicies, event, v.logger) { + if !SubjectAndFiltersPass(ctx, idToken.Subject, subjectsWithFiltersFromApplyingPolicies, event, v.logger) { resp.WriteHeader(http.StatusForbidden) - return fmt.Errorf("token is from subject %q, but only %#v are part of applying event policies", idToken.Subject, subjectsFromApplyingPolicies) + return fmt.Errorf("token is from subject %q, but only %#v are part of applying event policies", idToken.Subject, subjectsWithFiltersFromApplyingPolicies) } return nil @@ -260,6 +267,35 @@ func (v *OIDCTokenVerifier) getKubernetesOIDCDiscovery() (*openIDMetadata, error return openIdConfig, nil } +// copyRequest makes a copy of the http request which can be consumed as needed, leaving the original request +// able to be consumed as well. +func copyRequest(req *http.Request) (*http.Request, error) { + // check if we actually need to copy the body, otherwise we can return the original request + if req.Body == nil || req.Body == http.NoBody { + return req, nil + } + + var buf bytes.Buffer + if _, err := buf.ReadFrom(req.Body); err != nil { + return nil, fmt.Errorf("failed to read request body while copying it: %w", err) + } + + if err := req.Body.Close(); err != nil { + return nil, fmt.Errorf("failed to close original request body ready while copying request: %w", err) + } + + // set the original request body to be readable again + req.Body = io.NopCloser(&buf) + + // return a new request with a readable body and same headers as the original + // we don't need to set any other fields as cloudevents only uses the headers + // and body to construct the Message/Event. + return &http.Request{ + Header: req.Header, + Body: io.NopCloser(bytes.NewReader(buf.Bytes())), + }, nil +} + type openIDMetadata struct { Issuer string `json:"issuer"` JWKSURI string `json:"jwks_uri"` @@ -268,7 +304,7 @@ type openIDMetadata struct { SigningAlgs []string `json:"id_token_signing_alg_values_supported"` } -type filtersBySubjects struct { +type subjectsWithFilters struct { filters []eventingv1.SubscriptionsAPIFilter subjects []string } diff --git a/pkg/eventfilter/subscriptionsapi/create.go b/pkg/eventfilter/subscriptionsapi/create.go index 008bfcd05bd..3f56f6e7f0c 100644 --- a/pkg/eventfilter/subscriptionsapi/create.go +++ b/pkg/eventfilter/subscriptionsapi/create.go @@ -22,6 +22,7 @@ import ( "knative.dev/eventing/pkg/eventfilter" ) +// MaterializeSubscriptionsAPIFilter materializes a SubscriptionsAPIFilter into a runnable Filter. func MaterializeSubscriptionsAPIFilter(logger *zap.Logger, filter v1.SubscriptionsAPIFilter) eventfilter.Filter { var materializedFilter eventfilter.Filter var err error From 213c77a59f3d61f28be695ac1a735480f61b14ce Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Wed, 14 Aug 2024 15:28:50 -0400 Subject: [PATCH 6/9] test: added rekt test for eventpolicy with filters Signed-off-by: Calum Murray --- .../authz/addressable_authz_conformance.go | 68 +++++++++++++++++++ .../rekt/resources/eventpolicy/eventpolicy.go | 33 ++++++++- .../resources/eventpolicy/eventpolicy.yaml | 5 ++ 3 files changed, 104 insertions(+), 2 deletions(-) diff --git a/test/rekt/features/authz/addressable_authz_conformance.go b/test/rekt/features/authz/addressable_authz_conformance.go index 0d633804a23..96129b4ab0c 100644 --- a/test/rekt/features/authz/addressable_authz_conformance.go +++ b/test/rekt/features/authz/addressable_authz_conformance.go @@ -21,6 +21,7 @@ import ( "fmt" "time" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/test/rekt/resources/eventpolicy" "knative.dev/eventing/test/rekt/resources/pingsource" "knative.dev/reconciler-test/pkg/environment" @@ -125,6 +126,73 @@ func addressableRejectsUnauthorizedRequest(gvr schema.GroupVersionResource, kind return f } +func addressableRespectsEventPolicyFilters(gvr schema.GroupVersionResource, kind, name string) *feature.Feature { + f := feature.NewFeatureNamed(fmt.Sprintf("%s only admits events that pass the event policy filter")) + + f.Prerequisite("OIDC authentication is enabled", featureflags.AuthenticationOIDCEnabled()) + f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict()) + f.Prerequisite("should not run when Istio is enabled", featureflags.IstioDisabled()) + + eventPolicy := feature.MakeRandomK8sName("eventpolicy") + source1 := feature.MakeRandomK8sName("source") + sourceSubject1 := feature.MakeRandomK8sName("source-oidc-identity") + source2 := feature.MakeRandomK8sName("source") + sourceSubject2 := feature.MakeRandomK8sName("source-oidc-identity") + + event1 := test.FullEvent() + event1.SetType("valid.event.type") + event1.SetID("1") + event2 := test.FullEvent() + event2.SetType("invalid.event.type") + event2.SetID("2") + + // Install event policy + f.Setup("Install the EventPolicy", func(ctx context.Context, t feature.T) { + namespace := environment.FromContext(ctx).Namespace() + eventpolicy.Install( + eventPolicy, + eventpolicy.WithToRef( + gvr.GroupVersion().WithKind(kind), + name), + eventpolicy.WithFromSubject(fmt.Sprintf("system:serviceaccount:%s:%s", namespace, sourceSubject1)), + eventpolicy.WithFromSubject(fmt.Sprintf("system:serviceaccount:%s:%s", namespace, sourceSubject2)), + eventpolicy.WithFilters([]eventingv1.SubscriptionsAPIFilter{ + { + Prefix: map[string]string{ + "type": "valid", + }, + }, + }), + )(ctx, t) + }) + f.Setup(fmt.Sprintf("EventPolicy for %s %s is ready", kind, name), k8s.IsReady(gvr, name)) + + // Install source + f.Requirement("install source 1", eventshub.Install( + source1, + eventshub.StartSenderToResourceTLS(gvr, name, nil), + eventshub.InputEvent(event1), + eventshub.OIDCSubject(sourceSubject1), + )) + + f.Requirement("install source 2", eventshub.Install( + source2, + eventshub.StartSenderToResourceTLS(gvr, name, nil), + eventshub.InputEvent(event2), + eventshub.OIDCSubject(sourceSubject2), + )) + + f.Alpha(kind). + Must("valid event sent", eventassert.OnStore(source1).MatchSentEvent(test.HasId(event1.ID())).Exact(1)). + Must("get 202 on response", eventassert.OnStore(source1).Match(eventassert.MatchStatusCode(202)).AtLeast(1)) + + f.Alpha(kind). + Must("invalid event sent", eventassert.OnStore(source2).MatchSentEvent(test.HasId(event2.ID())).Exact(1)). + Must("get 403 on response", eventassert.OnStore(source2).Match(eventassert.MatchStatusCode(403)).AtLeast(1)) + + return f +} + func addressableBecomesUnreadyOnUnreadyEventPolicy(gvr schema.GroupVersionResource, kind, name string) *feature.Feature { f := feature.NewFeatureNamed(fmt.Sprintf("%s becomes NotReady when EventPolicy is NotReady", kind)) diff --git a/test/rekt/resources/eventpolicy/eventpolicy.go b/test/rekt/resources/eventpolicy/eventpolicy.go index 879eabfd7e5..6d44293bd13 100644 --- a/test/rekt/resources/eventpolicy/eventpolicy.go +++ b/test/rekt/resources/eventpolicy/eventpolicy.go @@ -19,6 +19,8 @@ package eventpolicy import ( "context" "embed" + "encoding/json" + "strings" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -26,10 +28,13 @@ import ( "knative.dev/reconciler-test/pkg/feature" "knative.dev/reconciler-test/pkg/k8s" "knative.dev/reconciler-test/pkg/manifest" + "sigs.k8s.io/yaml" + + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" ) //go:embed *.yaml -var yaml embed.FS +var yamlEmbed embed.FS func GVR() schema.GroupVersionResource { return schema.GroupVersionResource{Group: "eventing.knative.dev", Version: "v1alpha1", Resource: "eventpolicies"} @@ -44,7 +49,7 @@ func Install(name string, opts ...manifest.CfgFn) feature.StepFn { fn(cfg) } return func(ctx context.Context, t feature.T) { - if _, err := manifest.InstallYamlFS(ctx, yaml, cfg); err != nil { + if _, err := manifest.InstallYamlFS(ctx, yamlEmbed, cfg); err != nil { t.Fatal(err) } } @@ -133,6 +138,30 @@ func WithFromSubject(subject string) manifest.CfgFn { } } +func WithFilters(filters []eventingv1.SubscriptionsAPIFilter) manifest.CfgFn { + jsonBytes, err := json.Marshal(filters) + if err != nil { + panic(err) + } + + yamlBytes, err := yaml.JSONToYAML(jsonBytes) + if err != nil { + panic(err) + } + + filtersYaml := string(yamlBytes) + + lines := strings.Split(filtersYaml, "\n") + out := make([]string, 0, len(lines)) + for i := range lines { + out = append(out, " "+lines[i]) + } + + return func(m map[string]interface{}) { + m["filters"] = strings.Join(out, "\n") + } +} + // IsReady tests to see if an EventPolicy becomes ready within the time given. func IsReady(name string, timing ...time.Duration) feature.StepFn { return k8s.IsReady(GVR(), name, timing...) diff --git a/test/rekt/resources/eventpolicy/eventpolicy.yaml b/test/rekt/resources/eventpolicy/eventpolicy.yaml index 02a98422e67..3d2ef5ace20 100644 --- a/test/rekt/resources/eventpolicy/eventpolicy.yaml +++ b/test/rekt/resources/eventpolicy/eventpolicy.yaml @@ -67,3 +67,8 @@ spec: - sub: {{ $from.sub }} {{ end }} {{ end }} + + {{ if .filters }} + filters: +{{ .filters }} + {{ .end }} From 027e29d7dc8b9b4f2f955391c79343ec7b89617c Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Wed, 14 Aug 2024 15:43:47 -0400 Subject: [PATCH 7/9] fix(test): event policy resources work Signed-off-by: Calum Murray --- test/rekt/resources/eventpolicy/eventpolicy.yaml | 2 +- test/rekt/resources/eventpolicy/eventpolicy_test.go | 8 ++++++++ 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/test/rekt/resources/eventpolicy/eventpolicy.yaml b/test/rekt/resources/eventpolicy/eventpolicy.yaml index 3d2ef5ace20..84925531e12 100644 --- a/test/rekt/resources/eventpolicy/eventpolicy.yaml +++ b/test/rekt/resources/eventpolicy/eventpolicy.yaml @@ -71,4 +71,4 @@ spec: {{ if .filters }} filters: {{ .filters }} - {{ .end }} + {{ end }} diff --git a/test/rekt/resources/eventpolicy/eventpolicy_test.go b/test/rekt/resources/eventpolicy/eventpolicy_test.go index 0832330d040..258579f5c28 100644 --- a/test/rekt/resources/eventpolicy/eventpolicy_test.go +++ b/test/rekt/resources/eventpolicy/eventpolicy_test.go @@ -20,6 +20,7 @@ import ( "embed" "os" + eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1" "knative.dev/eventing/test/rekt/resources/broker" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -105,6 +106,11 @@ func Example_full() { "my-ns-2", ), eventpolicy.WithFromSubject("my-sub"), + eventpolicy.WithFilters([]eventingv1.SubscriptionsAPIFilter{ + { + CESQL: "type LIKE event.%.type", + }, + }), } for _, fn := range cfgFn { @@ -147,4 +153,6 @@ func Example_full() { // name: my-broker // namespace: my-ns-2 // - sub: my-sub + // filters: + // - cesql: type LIKE event.%.type } From c81aa9fb424ea9b708d56c755c0da63db5c41e6c Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Wed, 14 Aug 2024 16:06:51 -0400 Subject: [PATCH 8/9] small fixes to rekt test Signed-off-by: Calum Murray --- test/rekt/features/authz/addressable_authz_conformance.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/rekt/features/authz/addressable_authz_conformance.go b/test/rekt/features/authz/addressable_authz_conformance.go index 96129b4ab0c..9b07fa840c3 100644 --- a/test/rekt/features/authz/addressable_authz_conformance.go +++ b/test/rekt/features/authz/addressable_authz_conformance.go @@ -43,6 +43,7 @@ func AddressableAuthZConformance(gvr schema.GroupVersionResource, kind, name str addressableAllowsAuthorizedRequest(gvr, kind, name), addressableRejectsUnauthorizedRequest(gvr, kind, name), addressableBecomesUnreadyOnUnreadyEventPolicy(gvr, kind, name), + addressableRespectsEventPolicyFilters(gvr, kind, name), }, } return &fs @@ -127,7 +128,7 @@ func addressableRejectsUnauthorizedRequest(gvr schema.GroupVersionResource, kind } func addressableRespectsEventPolicyFilters(gvr schema.GroupVersionResource, kind, name string) *feature.Feature { - f := feature.NewFeatureNamed(fmt.Sprintf("%s only admits events that pass the event policy filter")) + f := feature.NewFeatureNamed(fmt.Sprintf("%s only admits events that pass the event policy filter", kind)) f.Prerequisite("OIDC authentication is enabled", featureflags.AuthenticationOIDCEnabled()) f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict()) From 01f12ec97ec53417cb03a8ae24985a22e1284d77 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Thu, 15 Aug 2024 10:59:23 -0400 Subject: [PATCH 9/9] fix: tests ran in correct order Signed-off-by: Calum Murray --- test/rekt/features/authz/addressable_authz_conformance.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/rekt/features/authz/addressable_authz_conformance.go b/test/rekt/features/authz/addressable_authz_conformance.go index 9b07fa840c3..9c9833967e7 100644 --- a/test/rekt/features/authz/addressable_authz_conformance.go +++ b/test/rekt/features/authz/addressable_authz_conformance.go @@ -42,8 +42,8 @@ func AddressableAuthZConformance(gvr schema.GroupVersionResource, kind, name str Features: []*feature.Feature{ addressableAllowsAuthorizedRequest(gvr, kind, name), addressableRejectsUnauthorizedRequest(gvr, kind, name), - addressableBecomesUnreadyOnUnreadyEventPolicy(gvr, kind, name), addressableRespectsEventPolicyFilters(gvr, kind, name), + addressableBecomesUnreadyOnUnreadyEventPolicy(gvr, kind, name), }, } return &fs