Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add SubscriptionsAPI filters to APIServerSource #7799

Merged
merged 1 commit into from
May 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config/core/configmaps/features.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,6 @@ data:
# For more details: https://github.com/knative/eventing/issues/7739
cross-namespace-event-links: "disabled"

# ALPHA feature: The new-apiserversource-filters flag allows you to use the new `filters` field
# in APIServerSource objects with its rich filtering capabilities.
new-apiserversource-filters: "disabled"
1 change: 1 addition & 0 deletions config/core/resources/apiserversource.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ spec:
properties:
spec:
type: object
x-kubernetes-preserve-unknown-fields: true
required:
- resources
properties:
Expand Down
40 changes: 39 additions & 1 deletion docs/eventing-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -2101,7 +2101,7 @@ resolved delivery options.</p>
<h3 id="eventing.knative.dev/v1.SubscriptionsAPIFilter">SubscriptionsAPIFilter
</h3>
<p>
(<em>Appears on:</em><a href="#eventing.knative.dev/v1.SubscriptionsAPIFilter">SubscriptionsAPIFilter</a>, <a href="#eventing.knative.dev/v1.TriggerSpec">TriggerSpec</a>)
(<em>Appears on:</em><a href="#eventing.knative.dev/v1.SubscriptionsAPIFilter">SubscriptionsAPIFilter</a>, <a href="#eventing.knative.dev/v1.TriggerSpec">TriggerSpec</a>, <a href="#sources.knative.dev/v1.ApiServerSourceSpec">ApiServerSourceSpec</a>)
</p>
<p>
<p>SubscriptionsAPIFilter allows defining a filter expression using CloudEvents
Expand Down Expand Up @@ -5327,6 +5327,25 @@ Kubernetes meta/v1.LabelSelector
should be watched by the source.</p>
</td>
</tr>
<tr>
<td>
<code>filters</code><br/>
<em>
<a href="#eventing.knative.dev/v1.SubscriptionsAPIFilter">
[]SubscriptionsAPIFilter
</a>
</em>
</td>
<td>
<em>(Optional)</em>
<p>Filters is an experimental field that conforms to the CNCF CloudEvents Subscriptions
API. It&rsquo;s an array of filter expressions that evaluate to true or false.
If any filter expression in the array evaluates to false, the event MUST
NOT be sent to the Sink. If all the filter expressions in the array
evaluate to true, the event MUST be attempted to be delivered. Absence of
a filter or empty array implies a value of true.</p>
</td>
</tr>
</table>
</td>
</tr>
Expand Down Expand Up @@ -5934,6 +5953,25 @@ Kubernetes meta/v1.LabelSelector
should be watched by the source.</p>
</td>
</tr>
<tr>
<td>
<code>filters</code><br/>
<em>
<a href="#eventing.knative.dev/v1.SubscriptionsAPIFilter">
[]SubscriptionsAPIFilter
</a>
</em>
</td>
<td>
<em>(Optional)</em>
<p>Filters is an experimental field that conforms to the CNCF CloudEvents Subscriptions
API. It&rsquo;s an array of filter expressions that evaluate to true or false.
If any filter expression in the array evaluates to false, the event MUST
NOT be sent to the Sink. If all the filter expressions in the array
evaluate to true, the event MUST be attempted to be delivered. Absence of
a filter or empty array implies a value of true.</p>
</td>
</tr>
</tbody>
</table>
<h3 id="sources.knative.dev/v1.ApiServerSourceStatus">ApiServerSourceStatus
Expand Down
3 changes: 3 additions & 0 deletions pkg/adapter/apiserver/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (

"knative.dev/eventing/pkg/adapter/v2"
v1 "knative.dev/eventing/pkg/apis/sources/v1"
brokerfilter "knative.dev/eventing/pkg/broker/filter"
"knative.dev/eventing/pkg/eventfilter/subscriptionsapi"
)

type envConfig struct {
Expand Down Expand Up @@ -71,6 +73,7 @@ func (a *apiServerAdapter) start(ctx context.Context, stopCh <-chan struct{}) er
logger: a.logger,
ref: a.config.EventMode == v1.ReferenceMode,
apiServerSourceName: a.name,
filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(a.logger.Desugar(), a.config.Filters)...),
}
if a.config.ResourceOwner != nil {
a.logger.Infow("will be filtered",
Expand Down
11 changes: 10 additions & 1 deletion pkg/adapter/apiserver/adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ import (
dynamicfake "k8s.io/client-go/dynamic/fake"
kubetesting "k8s.io/client-go/testing"
adaptertest "knative.dev/eventing/pkg/adapter/v2/test"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
brokerfilter "knative.dev/eventing/pkg/broker/filter"
"knative.dev/eventing/pkg/eventfilter/subscriptionsapi"
rectesting "knative.dev/eventing/pkg/reconciler/testing"
"knative.dev/pkg/logging"
pkgtesting "knative.dev/pkg/reconciler/testing"
Expand Down Expand Up @@ -289,21 +292,27 @@ func validateNotSent(t *testing.T, ce *adaptertest.TestCloudEventsClient, want s

func makeResourceAndTestingClient() (*resourceDelegate, *adaptertest.TestCloudEventsClient) {
ce := adaptertest.NewTestClient()
logger := zap.NewExample().Sugar()

return &resourceDelegate{
ce: ce,
source: "unit-test",
apiServerSourceName: apiServerSourceNameTest,
logger: zap.NewExample().Sugar(),
logger: logger,
filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...),
}, ce
}

func makeRefAndTestingClient() (*resourceDelegate, *adaptertest.TestCloudEventsClient) {
ce := adaptertest.NewTestClient()
logger := zap.NewExample().Sugar()

return &resourceDelegate{
ce: ce,
source: "unit-test",
apiServerSourceName: apiServerSourceNameTest,
logger: zap.NewExample().Sugar(),
ref: true,
filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), []eventingv1.SubscriptionsAPIFilter{})...),
}, ce
}
12 changes: 12 additions & 0 deletions pkg/adapter/apiserver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package apiserver

import (
"k8s.io/apimachinery/pkg/runtime/schema"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
v1 "knative.dev/eventing/pkg/apis/sources/v1"
)

Expand Down Expand Up @@ -56,4 +58,14 @@ type Config struct {
// Defaults to `Reference`
// +optional
EventMode string `json:"mode,omitempty"`

// Filters is an experimental field that conforms to the CNCF CloudEvents Subscriptions
// API. It's an array of filter expressions that evaluate to true or false.
// If any filter expression in the array evaluates to false, the event MUST
// NOT be sent to the Sink. If all the filter expressions in the array
// evaluate to true, the event MUST be attempted to be delivered. Absence of
// a filter or empty array implies a value of true.
//
// +optional
Filters []eventingv1.SubscriptionsAPIFilter `json:"filters,omitempty"`
}
39 changes: 23 additions & 16 deletions pkg/adapter/apiserver/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,45 +24,52 @@ import (
"go.uber.org/zap"
"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/adapter/apiserver/events"
"knative.dev/eventing/pkg/eventfilter"
)

type resourceDelegate struct {
ce cloudevents.Client
source string
ref bool
apiServerSourceName string
filter eventfilter.Filter

logger *zap.SugaredLogger
}

var _ cache.Store = (*resourceDelegate)(nil)

func (a *resourceDelegate) Add(obj interface{}) error {
ctx, event, err := events.MakeAddEvent(a.source, a.apiServerSourceName, obj, a.ref)
if err != nil {
a.logger.Infow("event creation failed", zap.Error(err))
return err
}
a.sendCloudEvent(ctx, event)
return nil
return a.handleKubernetesObject(events.MakeAddEvent, obj)
}

func (a *resourceDelegate) Update(obj interface{}) error {
ctx, event, err := events.MakeUpdateEvent(a.source, a.apiServerSourceName, obj, a.ref)
if err != nil {
a.logger.Info("event creation failed", zap.Error(err))
return err
}
a.sendCloudEvent(ctx, event)
return nil
return a.handleKubernetesObject(events.MakeUpdateEvent, obj)
}

func (a *resourceDelegate) Delete(obj interface{}) error {
ctx, event, err := events.MakeDeleteEvent(a.source, a.apiServerSourceName, obj, a.ref)
return a.handleKubernetesObject(events.MakeDeleteEvent, obj)

}

// makeEventFunc represents the signature of the functions `events.Make*Event` so they can
// be passed as a parameter
type makeEventFunc func(string, string, interface{}, bool) (context.Context, cloudevents.Event, error)

func (a *resourceDelegate) handleKubernetesObject(makeEvent makeEventFunc, obj interface{}) error {
ctx, event, err := makeEvent(a.source, a.apiServerSourceName, obj, a.ref)

if err != nil {
a.logger.Info("event creation failed", zap.Error(err))
a.logger.Infow("event creation failed", zap.Error(err))
return err
}

filterResult := a.filter.Filter(ctx, event)
if filterResult == eventfilter.FailFilter {
a.logger.Debugf("event type %s filtered out", event.Type())
return nil
}

a.sendCloudEvent(ctx, event)
return nil
}
Expand Down
42 changes: 42 additions & 0 deletions pkg/adapter/apiserver/delegate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@ package apiserver
import (
"testing"

"go.uber.org/zap"
adaptertest "knative.dev/eventing/pkg/adapter/v2/test"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/apis/sources"
brokerfilter "knative.dev/eventing/pkg/broker/filter"
"knative.dev/eventing/pkg/eventfilter/subscriptionsapi"
)

func TestResourceAddEvent(t *testing.T) {
Expand Down Expand Up @@ -68,3 +73,40 @@ func TestResourceStub(t *testing.T) {
d.Replace(nil, "")
d.Resync()
}

func TestFilterFails(t *testing.T) {
ce := adaptertest.NewTestClient()
filters := []eventingv1.SubscriptionsAPIFilter{{
Exact: map[string]string{
"type": "dev.knative.apiserver.resource.add",
},
}}
logger := zap.NewExample().Sugar()
delegate := &resourceDelegate{
ce: ce,
source: "unit-test",
apiServerSourceName: apiServerSourceNameTest,
logger: logger,
filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), filters)...),
}

delegate.Update(simplePod("unit", "test"))
validateNotSent(t, ce, sources.ApiServerSourceUpdateEventType)
}

func TestEmptyFiltersList(t *testing.T) {
ce := adaptertest.NewTestClient()
filters := []eventingv1.SubscriptionsAPIFilter{}

logger := zap.NewExample().Sugar()
delegate := &resourceDelegate{
ce: ce,
source: "unit-test",
apiServerSourceName: apiServerSourceNameTest,
logger: logger,
filter: subscriptionsapi.NewAllFilter(brokerfilter.MaterializeFiltersList(logger.Desugar(), filters)...),
}

delegate.Update(simplePod("unit", "test"))
validateSent(t, ce, sources.ApiServerSourceUpdateEventType)
}
1 change: 1 addition & 0 deletions pkg/apis/feature/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ func newDefaults() Flags {
TransportEncryption: Disabled,
OIDCAuthentication: Disabled,
EvenTypeAutoCreate: Disabled,
NewAPIServerFilters: Disabled,
}
}

Expand Down
1 change: 1 addition & 0 deletions pkg/apis/feature/flag_names.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ const (
OIDCAuthentication = "authentication-oidc"
NodeSelectorLabel = "apiserversources-nodeselector-"
CrossNamespaceEventLinks = "cross-namespace-event-links"
NewAPIServerFilters = "new-apiserversource-filters"
)
11 changes: 11 additions & 0 deletions pkg/apis/sources/v1/apiserver_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package v1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/pkg/apis"
duckv1 "knative.dev/pkg/apis/duck/v1"
"knative.dev/pkg/kmeta"
Expand Down Expand Up @@ -85,6 +86,16 @@ type ApiServerSourceSpec struct {
// should be watched by the source.
// +optional
NamespaceSelector *metav1.LabelSelector `json:"namespaceSelector,omitempty"`

// Filters is an experimental field that conforms to the CNCF CloudEvents Subscriptions
// API. It's an array of filter expressions that evaluate to true or false.
// If any filter expression in the array evaluates to false, the event MUST
// NOT be sent to the Sink. If all the filter expressions in the array
// evaluate to true, the event MUST be attempted to be delivered. Absence of
// a filter or empty array implies a value of true.
//
// +optional
Filters []eventingv1.SubscriptionsAPIFilter `json:"filters,omitempty"`
}

// ApiServerSourceStatus defines the observed state of ApiServerSource
Expand Down
19 changes: 19 additions & 0 deletions pkg/apis/sources/v1/apiserver_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (

"k8s.io/apimachinery/pkg/runtime/schema"

eventingv1 "knative.dev/eventing/pkg/apis/eventing/v1"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/pkg/apis"
)

Expand Down Expand Up @@ -73,5 +75,22 @@ func (cs *ApiServerSourceSpec) Validate(ctx context.Context) *apis.FieldError {
}
}
errs = errs.Also(cs.SourceSpec.Validate(ctx))
errs = errs.Also(validateSubscriptionAPIFiltersList(ctx, cs.Filters).ViaField("filters"))
return errs
}

func validateSubscriptionAPIFiltersList(ctx context.Context, filters []eventingv1.SubscriptionsAPIFilter) (errs *apis.FieldError) {
if !feature.FromContext(ctx).IsEnabled(feature.NewAPIServerFilters) {
if len(filters) != 0 {
return errs.Also(apis.ErrGeneric("Filters is not empty but the NewAPIServerFilters feature is disabled."))
}
rh-hemartin marked this conversation as resolved.
Show resolved Hide resolved

return nil
}

for i, f := range filters {
f := f
errs = errs.Also(eventingv1.ValidateSubscriptionAPIFilter(ctx, &f)).ViaIndex(i)
}
return errs
}
Loading
Loading