From 0045fa97dd54be05f228c33354c9b018f03e8317 Mon Sep 17 00:00:00 2001 From: Calum Murray Date: Thu, 24 Aug 2023 07:21:56 -0400 Subject: [PATCH] Broker eventtype autocreate fixes (#7161) * Fixed undefined typemeta on broker in eventtype autocreate Signed-off-by: Calum Murray * Fixed autocreate so that only one eventtype is created when events are sent to mt channel broker Signed-off-by: Calum Murray * Clean up Signed-off-by: Calum Murray * Fixed unit tests Signed-off-by: Calum Murray * channel only creates eventtypes if not owned by broker Signed-off-by: Calum Murray --------- Signed-off-by: Calum Murray --- cmd/broker/ingress/main.go | 18 ++++++++++++++++-- pkg/broker/ingress/ingress_handler.go | 9 ++++++++- .../dispatcher/inmemorychannel.go | 15 ++++++++++----- 3 files changed, 34 insertions(+), 8 deletions(-) diff --git a/cmd/broker/ingress/main.go b/cmd/broker/ingress/main.go index 67817f4eafc..bd8376dbe67 100644 --- a/cmd/broker/ingress/main.go +++ b/cmd/broker/ingress/main.go @@ -131,12 +131,26 @@ func main() { logger.Fatal("Error setting up trace publishing", zap.Error(err)) } - featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store")) + var featureStore *feature.Store + var handler *ingress.Handler + + featureStore = feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) { + featureFlags := value.(feature.Flags) + if featureFlags.IsEnabled(feature.EvenTypeAutoCreate) && featureStore != nil && handler != nil { + autoCreate := &eventtype.EventTypeAutoHandler{ + EventTypeLister: eventtypeinformer.Get(ctx).Lister(), + EventingClient: eventingclient.Get(ctx).EventingV1beta2(), + FeatureStore: featureStore, + Logger: logger, + } + handler.EvenTypeHandler = autoCreate + } + }) featureStore.WatchConfigs(configMapWatcher) reporter := ingress.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String())) - handler, err := ingress.NewHandler(logger, reporter, broker.TTLDefaulter(logger, int32(env.MaxTTL)), brokerInformer) + handler, err = ingress.NewHandler(logger, reporter, broker.TTLDefaulter(logger, int32(env.MaxTTL)), brokerInformer) if err != nil { logger.Fatal("Error creating Handler", zap.Error(err)) } diff --git a/pkg/broker/ingress/ingress_handler.go b/pkg/broker/ingress/ingress_handler.go index ba9ee64cc84..7c4fbeb5795 100644 --- a/pkg/broker/ingress/ingress_handler.go +++ b/pkg/broker/ingress/ingress_handler.go @@ -247,13 +247,20 @@ func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) { } func toKReference(broker *eventingv1.Broker) *duckv1.KReference { - return &duckv1.KReference{ + kref := &duckv1.KReference{ Kind: broker.Kind, Namespace: broker.Namespace, Name: broker.Name, APIVersion: broker.APIVersion, Address: broker.Status.Address.Name, } + if kref.Kind == "" { + kref.Kind = "Broker" + } + if kref.APIVersion == "" { + kref.APIVersion = "eventing.knative.dev/v1" + } + return kref } func (h *Handler) receive(ctx context.Context, headers http.Header, event *cloudevents.Event, brokerNamespace, brokerName string) (int, time.Duration) { diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go index 05e22d5a998..14e157db781 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel.go @@ -93,17 +93,22 @@ func (r *Reconciler) reconcile(ctx context.Context, imc *v1.InMemoryChannel) rec return err } var eventTypeAutoHandler *eventtype.EventTypeAutoHandler - var channelReference *duckv1.KReference + var channelRef *duckv1.KReference var UID *types.UID - if r.featureStore.IsEnabled(feature.EvenTypeAutoCreate) { + if ownerReferences := imc.GetOwnerReferences(); r.featureStore.IsEnabled(feature.EvenTypeAutoCreate) && + (len(ownerReferences) == 0 || + ownerReferences[0].Kind != "Broker") { + logging.FromContext(ctx).Info("EventType autocreate is enabled, creating handler") eventTypeAutoHandler = &eventtype.EventTypeAutoHandler{ EventTypeLister: r.eventTypeLister, EventingClient: r.eventingClient, FeatureStore: r.featureStore, Logger: logging.FromContext(ctx).Desugar(), } - channelReference = toKReference(imc) + + channelRef = toKReference(imc) UID = &imc.UID + } // First grab the host based MultiChannelFanoutMessage httpHandler @@ -115,7 +120,7 @@ func (r *Reconciler) reconcile(ctx context.Context, imc *v1.InMemoryChannel) rec config.FanoutConfig, r.reporter, eventTypeAutoHandler, - channelReference, + channelRef, UID, ) if err != nil { @@ -143,7 +148,7 @@ func (r *Reconciler) reconcile(ctx context.Context, imc *v1.InMemoryChannel) rec config.FanoutConfig, r.reporter, eventTypeAutoHandler, - channelReference, + channelRef, UID, channel.ResolveChannelFromPath(channel.ParseChannelFromPath), )