Skip to content

Commit

Permalink
ETs reference subscriptions/triggers on reply (#7733)
Browse files Browse the repository at this point in the history
* feat: support et autocreate on replies in kncloudevents dispatcher

Signed-off-by: Calum Murray <cmurray@redhat.com>

* feat: et autocreate on replies references triggers

Signed-off-by: Calum Murray <cmurray@redhat.com>

* feat: et autocreate reference subscriptions on replies

Signed-off-by: Calum Murray <cmurray@redhat.com>

* test: added rekt tests for eventtype autocreate on replies

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix: unit tests pass

Signed-off-by: Calum Murray <cmurray@redhat.com>

* address review comments

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix: imc dispatcher reconciler no longer panics on error

Signed-off-by: Calum Murray <cmurray@redhat.com>

* use GroupVersion variables instead of hardcoded strings

Signed-off-by: Calum Murray <cmurray@redhat.com>

---------

Signed-off-by: Calum Murray <cmurray@redhat.com>
  • Loading branch information
Cali0707 authored Apr 3, 2024
1 parent 8069d71 commit 366afed
Show file tree
Hide file tree
Showing 33 changed files with 986 additions and 34 deletions.
22 changes: 20 additions & 2 deletions cmd/broker/filter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@ import (
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/broker/filter"
eventingclient "knative.dev/eventing/pkg/client/injection/client"
brokerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/broker"
triggerinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1/trigger"
eventtypeinformer "knative.dev/eventing/pkg/client/injection/informers/eventing/v1beta2/eventtype"
"knative.dev/eventing/pkg/eventingtls"
"knative.dev/eventing/pkg/eventtype"
"knative.dev/eventing/pkg/reconciler/names"
)

Expand Down Expand Up @@ -114,7 +117,22 @@ func main() {
// Watch the observability config map and dynamically update request logs.
configMapWatcher.Watch(logging.ConfigMapName(), logging.UpdateLevelFromConfigMap(sl, atomicLevel, component))

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"))
var featureStore *feature.Store
var handler *filter.Handler

featureStore = feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
featureFlags := value.(feature.Flags)
if featureFlags.IsEnabled(feature.EvenTypeAutoCreate) && featureStore != nil && handler != nil {
autoCreate := &eventtype.EventTypeAutoHandler{
EventTypeLister: eventtypeinformer.Get(ctx).Lister(),
EventingClient: eventingclient.Get(ctx).EventingV1beta2(),
FeatureStore: featureStore,
Logger: logger,
}
handler.EventTypeCreator = autoCreate
}

})
featureStore.WatchConfigs(configMapWatcher)

// Decorate contexts with the current state of the feature config.
Expand All @@ -135,7 +153,7 @@ func main() {
// the messages to the triggers' subscribers) in this binary.
oidcTokenVerifier := auth.NewOIDCTokenVerifier(ctx)
trustBundleConfigMapInformer := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace())
handler, err := filter.NewHandler(logger, oidcTokenVerifier, oidcTokenProvider, triggerinformer.Get(ctx), brokerinformer.Get(ctx), reporter, trustBundleConfigMapInformer, ctxFunc)
handler, err = filter.NewHandler(logger, oidcTokenVerifier, oidcTokenProvider, triggerinformer.Get(ctx), brokerinformer.Get(ctx), reporter, trustBundleConfigMapInformer, ctxFunc)
if err != nil {
logger.Fatal("Error creating Handler", zap.Error(err))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,12 @@ rules:
- "serviceaccounts/token"
verbs:
- create
- apiGroups:
- "eventing.knative.dev"
resources:
- "eventtypes"
verbs:
- "get"
- "list"
- "watch"
- "create"
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,12 @@ spec:
description: Generation of the origin of the subscriber with uid:UID.
type: integer
format: int64
name:
description: The name of the subscription
type: string
namespace:
description: The namespace of the subscription
type: string
replyUri:
description: ReplyURI is the endpoint for the reply
type: string
Expand Down
3 changes: 3 additions & 0 deletions config/core/resources/channel.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@ spec:
description: Generation of the origin of the subscriber with uid:UID.
type: integer
format: int64
name:
description: The name of the subscription
type: string
replyUri:
description: ReplyURI is the endpoint for the reply
type: string
Expand Down
12 changes: 12 additions & 0 deletions docs/eventing-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,18 @@ section of the resource.</p>
<tbody>
<tr>
<td>
<code>name</code><br/>
<em>
string
</em>
</td>
<td>
<em>(Optional)</em>
<p>Name is used to identify the original subscription object.</p>
</td>
</tr>
<tr>
<td>
<code>uid</code><br/>
<em>
<a href="https://godoc.org/k8s.io/apimachinery/pkg/types#UID">
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ require (
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spf13/cobra v1.7.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
go.opentelemetry.io/otel/metric v1.24.0 // indirect
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/duck/v1/subscribable_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ var _ duck.Implementable = (*Subscribable)(nil)
//
// At least one of SubscriberURI and ReplyURI must be present
type SubscriberSpec struct {
// Name is used to identify the original subscription object.
// +optional
Name *string `json:"name,omitempty"`
// UID is used to understand the origin of the subscriber.
// +optional
UID types.UID `json:"uid,omitempty"`
Expand Down
5 changes: 5 additions & 0 deletions pkg/apis/duck/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 21 additions & 6 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"knative.dev/eventing/pkg/eventfilter"
"knative.dev/eventing/pkg/eventfilter/attributes"
"knative.dev/eventing/pkg/eventfilter/subscriptionsapi"
"knative.dev/eventing/pkg/eventtype"
"knative.dev/eventing/pkg/kncloudevents"
"knative.dev/eventing/pkg/reconciler/sugar/trigger/path"
"knative.dev/eventing/pkg/tracing"
Expand All @@ -78,12 +79,13 @@ type Handler struct {

eventDispatcher *kncloudevents.Dispatcher

triggerLister eventinglisters.TriggerLister
brokerLister eventinglisters.BrokerLister
logger *zap.Logger
withContext func(ctx context.Context) context.Context
filtersMap *subscriptionsapi.FiltersMap
tokenVerifier *auth.OIDCTokenVerifier
triggerLister eventinglisters.TriggerLister
brokerLister eventinglisters.BrokerLister
logger *zap.Logger
withContext func(ctx context.Context) context.Context
filtersMap *subscriptionsapi.FiltersMap
tokenVerifier *auth.OIDCTokenVerifier
EventTypeCreator *eventtype.EventTypeAutoHandler
}

// NewHandler creates a new Handler and its associated EventReceiver.
Expand Down Expand Up @@ -367,6 +369,19 @@ func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers
kncloudevents.WithHeader(additionalHeaders),
}

if h.EventTypeCreator != nil {
opts = append(opts, kncloudevents.WithEventTypeAutoHandler(
h.EventTypeCreator,
&duckv1.KReference{
Name: t.Name,
Namespace: t.Namespace,
APIVersion: eventingv1.SchemeGroupVersion.String(),
Kind: "Trigger",
},
t.UID,
))
}

if t.Status.Auth != nil && t.Status.Auth.ServiceAccountName != nil {
opts = append(opts, kncloudevents.WithOIDCAuthentication(&types.NamespacedName{
Name: *t.Status.Auth.ServiceAccountName,
Expand Down
25 changes: 24 additions & 1 deletion pkg/channel/fanout/fanout_event_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (

"knative.dev/eventing/pkg/apis"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/channel"
"knative.dev/eventing/pkg/eventtype"
"knative.dev/eventing/pkg/kncloudevents"
Expand All @@ -51,6 +52,9 @@ type Subscription struct {
DeadLetter *duckv1.Addressable
RetryConfig *kncloudevents.RetryConfig
ServiceAccount *types.NamespacedName
Name string
Namespace string
UID types.UID
}

// Config for a fanout.EventHandler.
Expand Down Expand Up @@ -167,7 +171,13 @@ func SubscriberSpecToFanoutConfig(sub eventingduckv1.SubscriberSpec) (*Subscript
}
}

return &Subscription{Subscriber: destination, Reply: reply, DeadLetter: deadLetter, RetryConfig: retryConfig}, nil
s := &Subscription{Subscriber: destination, Reply: reply, DeadLetter: deadLetter, RetryConfig: retryConfig, UID: sub.UID}

if sub.Name != nil {
s.Name = *sub.Name
}

return s, nil
}

func (f *FanoutEventHandler) SetSubscriptions(ctx context.Context, subs []Subscription) {
Expand Down Expand Up @@ -361,6 +371,19 @@ func (f *FanoutEventHandler) makeFanoutRequest(ctx context.Context, event event.
kncloudevents.WithRetryConfig(sub.RetryConfig),
}

if f.eventTypeHandler != nil && sub.Name != "" && sub.Namespace != "" && sub.UID != types.UID("") {
dispatchOptions = append(dispatchOptions, kncloudevents.WithEventTypeAutoHandler(
f.eventTypeHandler,
&duckv1.KReference{
Name: sub.Name,
Namespace: sub.Namespace,
APIVersion: messagingv1.SchemeGroupVersion.String(),
Kind: "Subscription",
},
sub.UID,
))
}

if sub.ServiceAccount != nil {
dispatchOptions = append(dispatchOptions, kncloudevents.WithOIDCAuthentication(sub.ServiceAccount))
}
Expand Down
50 changes: 44 additions & 6 deletions pkg/kncloudevents/event_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/binding/buffering"
"github.com/cloudevents/sdk-go/v2/event"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/hashicorp/go-retryablehttp"
Expand All @@ -42,6 +43,7 @@ import (
eventingapis "knative.dev/eventing/pkg/apis"
"knative.dev/eventing/pkg/auth"
"knative.dev/eventing/pkg/eventingtls"
"knative.dev/eventing/pkg/eventtype"
"knative.dev/eventing/pkg/utils"

"knative.dev/eventing/pkg/broker"
Expand Down Expand Up @@ -115,13 +117,29 @@ func WithOIDCAuthentication(serviceAccount *types.NamespacedName) SendOption {
}
}

func WithEventTypeAutoHandler(handler *eventtype.EventTypeAutoHandler, ref *duckv1.KReference, ownerUID types.UID) SendOption {
return func(sc *senderConfig) error {
if handler != nil && (ref == nil || ownerUID == types.UID("")) {
return fmt.Errorf("addressable and ownerUID must be provided if using the eventtype auto handler")
}
sc.eventTypeAutoHandler = handler
sc.eventTypeRef = ref
sc.eventTypeOnwerUID = ownerUID

return nil
}
}

type senderConfig struct {
reply *duckv1.Addressable
deadLetterSink *duckv1.Addressable
additionalHeaders http.Header
retryConfig *RetryConfig
transformers binding.Transformers
oidcServiceAccount *types.NamespacedName
reply *duckv1.Addressable
deadLetterSink *duckv1.Addressable
additionalHeaders http.Header
retryConfig *RetryConfig
transformers binding.Transformers
oidcServiceAccount *types.NamespacedName
eventTypeAutoHandler *eventtype.EventTypeAutoHandler
eventTypeRef *duckv1.KReference
eventTypeOnwerUID types.UID
}

type Dispatcher struct {
Expand Down Expand Up @@ -229,6 +247,10 @@ func (d *Dispatcher) send(ctx context.Context, message binding.Message, destinat

messagesToFinish = append(messagesToFinish, responseMessage)

if config.eventTypeAutoHandler != nil {
d.handleAutocreate(ctx, responseMessage, config)
}

if config.reply == nil {
return dispatchExecutionInfo, nil
}
Expand Down Expand Up @@ -332,6 +354,22 @@ func (d *Dispatcher) executeRequest(ctx context.Context, target duckv1.Addressab
return ctx, responseMessage, &dispatchInfo, nil
}

func (d *Dispatcher) handleAutocreate(ctx context.Context, responseMessage binding.Message, config *senderConfig) {
// messages can only be read once, so we need to make a copy of it
messageCopy, err := buffering.CopyMessage(ctx, responseMessage)
if err != nil {
return
}
defer responseMessage.Finish(nil)

responseEvent, err := binding.ToEvent(ctx, messageCopy)
if err != nil {
return
}

config.eventTypeAutoHandler.AutoCreateEventType(ctx, responseEvent, config.eventTypeRef, config.eventTypeOnwerUID)
}

func (d *Dispatcher) createRequest(ctx context.Context, message binding.Message, target duckv1.Addressable, additionalHeaders http.Header, oidcServiceAccount *types.NamespacedName, transformers ...binding.Transformer) (*http.Request, error) {
request, err := http.NewRequestWithContext(ctx, "POST", target.URL.String(), nil)
if err != nil {
Expand Down
9 changes: 6 additions & 3 deletions pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,15 +230,18 @@ func newConfigForInMemoryChannel(ctx context.Context, imc *v1.InMemoryChannel) (

for i, sub := range imc.Spec.Subscribers {
conf, err := fanout.SubscriberSpecToFanoutConfig(sub)
if err != nil {
return nil, err
}

conf.Namespace = imc.Namespace
if isOIDCEnabled {
conf.ServiceAccount = &types.NamespacedName{
Name: *sub.Auth.ServiceAccountName,
Namespace: imc.Namespace,
}
}
if err != nil {
return nil, err
}

subs[i] = *conf
}

Expand Down
Loading

0 comments on commit 366afed

Please sign in to comment.