From e80b652ab8462eab56f379986dbce15b04514820 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 7 May 2024 11:47:54 +0200 Subject: [PATCH 1/8] Update CRDs to include AuthStatus serviceAccountNames --- .../in-memory-channel/resources/in-memory-channel.yaml | 10 ++++++++++ config/core/resources/apiserversource.yaml | 5 +++++ config/core/resources/channel.yaml | 10 ++++++++++ config/core/resources/containersource.yaml | 5 +++++ config/core/resources/parallel.yaml | 5 +++++ config/core/resources/pingsource.yaml | 10 ++++++++++ config/core/resources/sequence.yaml | 5 +++++ config/core/resources/sinkbindings.yaml | 5 +++++ config/core/resources/subscription.yaml | 5 +++++ config/core/resources/trigger.yaml | 5 +++++ 10 files changed, 65 insertions(+) diff --git a/config/channels/in-memory-channel/resources/in-memory-channel.yaml b/config/channels/in-memory-channel/resources/in-memory-channel.yaml index 7014f02f0f3..be64bb4cb3a 100644 --- a/config/channels/in-memory-channel/resources/in-memory-channel.yaml +++ b/config/channels/in-memory-channel/resources/in-memory-channel.yaml @@ -170,6 +170,11 @@ spec: serviceAccountName: description: ServiceAccountName is the name of the generated service account used for this components OIDC authentication. type: string + serviceAccountNames: + description: ServiceAccountNames is the list of names of the generated service accounts used for this components OIDC authentication. + type: array + items: + type: string status: description: Status represents the current state of the Channel. This data may be out of date. type: object @@ -286,6 +291,11 @@ spec: serviceAccountName: description: ServiceAccountName is the name of the generated service account used for this components OIDC authentication. type: string + serviceAccountNames: + description: ServiceAccountNames is the list of names of the generated service accounts used for this components OIDC authentication. + type: array + items: + type: string additionalPrinterColumns: - name: URL type: string diff --git a/config/core/resources/apiserversource.yaml b/config/core/resources/apiserversource.yaml index d387d4e7b0a..dafba1672d8 100644 --- a/config/core/resources/apiserversource.yaml +++ b/config/core/resources/apiserversource.yaml @@ -201,6 +201,11 @@ spec: serviceAccountName: description: ServiceAccountName is the name of the generated service account used for this components OIDC authentication. type: string + serviceAccountNames: + description: ServiceAccountNames is the list of names of the generated service accounts used for this components OIDC authentication. + type: array + items: + type: string ceAttributes: description: CloudEventAttributes are the specific attributes that the Source uses as part of its CloudEvents. type: array diff --git a/config/core/resources/channel.yaml b/config/core/resources/channel.yaml index 9a5f38840c5..02b174ee93f 100644 --- a/config/core/resources/channel.yaml +++ b/config/core/resources/channel.yaml @@ -189,6 +189,11 @@ spec: serviceAccountName: description: ServiceAccountName is the name of the generated service account used for this components OIDC authentication. type: string + serviceAccountNames: + description: ServiceAccountNames is the list of names of the generated service accounts used for this components OIDC authentication. + type: array + items: + type: string status: description: Status represents the current state of the Channel. This data may be out of date. type: object @@ -321,6 +326,11 @@ spec: serviceAccountName: description: ServiceAccountName is the name of the generated service account used for this components OIDC authentication. type: string + serviceAccountNames: + description: ServiceAccountNames is the list of names of the generated service accounts used for this components OIDC authentication. + type: array + items: + type: string names: kind: Channel plural: channels diff --git a/config/core/resources/containersource.yaml b/config/core/resources/containersource.yaml index 1fa07ec642a..f1badb46ec8 100644 --- a/config/core/resources/containersource.yaml +++ b/config/core/resources/containersource.yaml @@ -94,6 +94,11 @@ spec: serviceAccountName: description: ServiceAccountName is the name of the generated service account used for this components OIDC authentication. type: string + serviceAccountNames: + description: ServiceAccountNames is the list of names of the generated service accounts used for this components OIDC authentication. + type: array + items: + type: string ceAttributes: description: CloudEventAttributes are the specific attributes that the Source uses as part of its CloudEvents. type: array diff --git a/config/core/resources/parallel.yaml b/config/core/resources/parallel.yaml index cd69b91d93a..269eb193022 100644 --- a/config/core/resources/parallel.yaml +++ b/config/core/resources/parallel.yaml @@ -345,6 +345,11 @@ spec: serviceAccountName: description: ServiceAccountName is the name of the generated service account used for this components OIDC authentication. type: string + serviceAccountNames: + description: ServiceAccountNames is the list of names of the generated service accounts used for this components OIDC authentication. + type: array + items: + type: string branchStatuses: description: BranchStatuses is an array of corresponding to branch statuses. Matches the Spec.Branches array in the order. diff --git a/config/core/resources/pingsource.yaml b/config/core/resources/pingsource.yaml index b5a07a529e1..57bd817be8e 100644 --- a/config/core/resources/pingsource.yaml +++ b/config/core/resources/pingsource.yaml @@ -135,6 +135,11 @@ spec: serviceAccountName: description: ServiceAccountName is the name of the generated service account used for this components OIDC authentication. type: string + serviceAccountNames: + description: ServiceAccountNames is the list of names of the generated service accounts used for this components OIDC authentication. + type: array + items: + type: string ceAttributes: description: 'CloudEventAttributes are the specific attributes that the Source uses as part of its CloudEvents.' @@ -316,6 +321,11 @@ spec: serviceAccountName: description: ServiceAccountName is the name of the generated service account used for this components OIDC authentication. type: string + serviceAccountNames: + description: ServiceAccountNames is the list of names of the generated service accounts used for this components OIDC authentication. + type: array + items: + type: string ceAttributes: description: 'CloudEventAttributes are the specific attributes that the Source uses as part of its CloudEvents.' diff --git a/config/core/resources/sequence.yaml b/config/core/resources/sequence.yaml index 6ef3641caa9..af6157e4eff 100644 --- a/config/core/resources/sequence.yaml +++ b/config/core/resources/sequence.yaml @@ -196,6 +196,11 @@ spec: serviceAccountName: description: ServiceAccountName is the name of the generated service account used for this components OIDC authentication. type: string + serviceAccountNames: + description: ServiceAccountNames is the list of names of the generated service accounts used for this components OIDC authentication. + type: array + items: + type: string channelStatuses: description: ChannelStatuses is an array of corresponding Channel statuses. Matches the Spec.Steps array in the order. type: array diff --git a/config/core/resources/sinkbindings.yaml b/config/core/resources/sinkbindings.yaml index 7aae2a3f688..705ba7e1b65 100644 --- a/config/core/resources/sinkbindings.yaml +++ b/config/core/resources/sinkbindings.yaml @@ -136,6 +136,11 @@ spec: serviceAccountName: description: ServiceAccountName is the name of the generated service account used for this components OIDC authentication. type: string + serviceAccountNames: + description: ServiceAccountNames is the list of names of the generated service accounts used for this components OIDC authentication. + type: array + items: + type: string ceAttributes: description: CloudEventAttributes are the specific attributes that the Source uses as part of its CloudEvents. type: array diff --git a/config/core/resources/subscription.yaml b/config/core/resources/subscription.yaml index 9446bf97ea5..71edb78ae03 100644 --- a/config/core/resources/subscription.yaml +++ b/config/core/resources/subscription.yaml @@ -169,6 +169,11 @@ spec: serviceAccountName: description: ServiceAccountName is the name of the generated service account used for this components OIDC authentication. type: string + serviceAccountNames: + description: ServiceAccountNames is the list of names of the generated service accounts used for this components OIDC authentication. + type: array + items: + type: string conditions: description: Conditions the latest available observations of a resource's current state. type: array diff --git a/config/core/resources/trigger.yaml b/config/core/resources/trigger.yaml index 6ce1ceb101e..395da4c0b92 100644 --- a/config/core/resources/trigger.yaml +++ b/config/core/resources/trigger.yaml @@ -169,6 +169,11 @@ spec: serviceAccountName: description: ServiceAccountName is the name of the generated service account used for this components OIDC authentication. type: string + serviceAccountNames: + description: ServiceAccountNames is the list of names of the generated service accounts used for this components OIDC authentication. + type: array + items: + type: string conditions: description: Conditions the latest available observations of a resource's current state. type: array From 862a579eb6d96fc135ec26b930e6f86651a04353 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 7 May 2024 12:08:44 +0200 Subject: [PATCH 2/8] Revert "support auto generation of Sequence identity service account [OIDC] (#7361)" This reverts commit e5f28141cd65d2494abe7d4b7bc7e75949b85b02. --- pkg/apis/flows/v1/sequence_lifecycle.go | 27 +-- pkg/apis/flows/v1/sequence_lifecycle_test.go | 91 ++++----- pkg/reconciler/sequence/controller.go | 42 +---- pkg/reconciler/sequence/controller_test.go | 23 +-- pkg/reconciler/sequence/sequence.go | 16 +- pkg/reconciler/sequence/sequence_test.go | 186 ++----------------- pkg/reconciler/testing/v1/sequence.go | 30 --- 7 files changed, 55 insertions(+), 360 deletions(-) diff --git a/pkg/apis/flows/v1/sequence_lifecycle.go b/pkg/apis/flows/v1/sequence_lifecycle.go index c29ef7f099b..6614a1e57c6 100644 --- a/pkg/apis/flows/v1/sequence_lifecycle.go +++ b/pkg/apis/flows/v1/sequence_lifecycle.go @@ -28,8 +28,7 @@ import ( duckv1 "knative.dev/pkg/apis/duck/v1" ) -var sCondSet = apis.NewLivingConditionSet(SequenceConditionReady, SequenceConditionChannelsReady, SequenceConditionSubscriptionsReady, SequenceConditionAddressable, - SequenceConditionOIDCIdentityCreated) +var sCondSet = apis.NewLivingConditionSet(SequenceConditionReady, SequenceConditionChannelsReady, SequenceConditionSubscriptionsReady, SequenceConditionAddressable) const ( // SequenceConditionReady has status True when all subconditions below have been set to True. @@ -46,10 +45,6 @@ const ( // SequenceConditionAddressable has status true when this Sequence meets // the Addressable contract and has a non-empty hostname. SequenceConditionAddressable apis.ConditionType = "Addressable" - - // SequenceConditionOIDCIdentityCreated has status True when the OIDCIdentity has been created. - // This condition is only relevant if the OIDC feature is enabled. - SequenceConditionOIDCIdentityCreated apis.ConditionType = "OIDCIdentityCreated" ) // GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface. @@ -199,23 +194,3 @@ func (ss *SequenceStatus) setAddress(address *duckv1.Addressable) { sCondSet.Manage(ss).MarkTrue(SequenceConditionAddressable) } } - -// MarkOIDCIdentityCreatedSucceeded marks the OIDCIdentityCreated condition as true. -func (ss *SequenceStatus) MarkOIDCIdentityCreatedSucceeded() { - sCondSet.Manage(ss).MarkTrue(SequenceConditionOIDCIdentityCreated) -} - -// MarkOIDCIdentityCreatedSucceededWithReason marks the OIDCIdentityCreated condition as true with the given reason. -func (ss *SequenceStatus) MarkOIDCIdentityCreatedSucceededWithReason(reason, messageFormat string, messageA ...interface{}) { - sCondSet.Manage(ss).MarkTrueWithReason(SequenceConditionOIDCIdentityCreated, reason, messageFormat, messageA...) -} - -// MarkOIDCIdentityCreatedFailed marks the OIDCIdentityCreated condition as false with the given reason. -func (ss *SequenceStatus) MarkOIDCIdentityCreatedFailed(reason, messageFormat string, messageA ...interface{}) { - sCondSet.Manage(ss).MarkFalse(SequenceConditionOIDCIdentityCreated, reason, messageFormat, messageA...) -} - -// MarkOIDCIdentityCreatedUnknown marks the OIDCIdentityCreated condition as unknown with the given reason. -func (ss *SequenceStatus) MarkOIDCIdentityCreatedUnknown(reason, messageFormat string, messageA ...interface{}) { - sCondSet.Manage(ss).MarkUnknown(SequenceConditionOIDCIdentityCreated, reason, messageFormat, messageA...) -} diff --git a/pkg/apis/flows/v1/sequence_lifecycle_test.go b/pkg/apis/flows/v1/sequence_lifecycle_test.go index ee19d74844d..24c822147bb 100644 --- a/pkg/apis/flows/v1/sequence_lifecycle_test.go +++ b/pkg/apis/flows/v1/sequence_lifecycle_test.go @@ -143,9 +143,6 @@ func TestSequenceInitializeConditions(t *testing.T) { }, { Type: SequenceConditionChannelsReady, Status: corev1.ConditionUnknown, - }, { - Type: SequenceConditionOIDCIdentityCreated, - Status: corev1.ConditionUnknown, }, { Type: SequenceConditionReady, Status: corev1.ConditionUnknown, @@ -173,9 +170,6 @@ func TestSequenceInitializeConditions(t *testing.T) { }, { Type: SequenceConditionChannelsReady, Status: corev1.ConditionFalse, - }, { - Type: SequenceConditionOIDCIdentityCreated, - Status: corev1.ConditionUnknown, }, { Type: SequenceConditionReady, Status: corev1.ConditionUnknown, @@ -203,9 +197,6 @@ func TestSequenceInitializeConditions(t *testing.T) { }, { Type: SequenceConditionChannelsReady, Status: corev1.ConditionUnknown, - }, { - Type: SequenceConditionOIDCIdentityCreated, - Status: corev1.ConditionUnknown, }, { Type: SequenceConditionReady, Status: corev1.ConditionUnknown, @@ -324,59 +315,45 @@ func TestSequencePropagateChannelStatuses(t *testing.T) { func TestSequenceReady(t *testing.T) { tests := []struct { - name string - subs []*messagingv1.Subscription - channels []*eventingduckv1.Channelable - oidcSACreated bool - want bool + name string + subs []*messagingv1.Subscription + channels []*eventingduckv1.Channelable + want bool }{{ - name: "empty", - subs: []*messagingv1.Subscription{}, - channels: []*eventingduckv1.Channelable{}, - oidcSACreated: false, - want: false, - }, { - name: "one channelable not ready, one subscription ready", - channels: []*eventingduckv1.Channelable{getChannelable(false)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true)}, - oidcSACreated: false, - want: false, + name: "empty", + subs: []*messagingv1.Subscription{}, + channels: []*eventingduckv1.Channelable{}, + want: false, }, { - name: "one channelable ready, one subscription not ready", - channels: []*eventingduckv1.Channelable{getChannelable(true)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", false)}, - oidcSACreated: false, - want: false, + name: "one channelable not ready, one subscription ready", + channels: []*eventingduckv1.Channelable{getChannelable(false)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true)}, + want: false, }, { - name: "one channelable ready, one subscription ready, oidc SA created", - channels: []*eventingduckv1.Channelable{getChannelable(true)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true)}, - oidcSACreated: true, - want: true, + name: "one channelable ready, one subscription not ready", + channels: []*eventingduckv1.Channelable{getChannelable(true)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", false)}, + want: false, }, { - name: "one channelable ready, one not, two subscriptions ready", - channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(false)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, - oidcSACreated: false, - want: false, + name: "one channelable ready, one subscription ready", + channels: []*eventingduckv1.Channelable{getChannelable(true)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true)}, + want: true, }, { - name: "two channelables ready, one subscription ready, one not", - channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", false)}, - oidcSACreated: false, - want: false, + name: "one channelable ready, one not, two subscriptions ready", + channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(false)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, + want: false, }, { - name: "two channelables ready, two subscriptions ready, oidc SA not created", - channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, - oidcSACreated: false, - want: false, + name: "two channelables ready, one subscription ready, one not", + channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", false)}, + want: false, }, { - name: "two channelables ready, two subscriptions ready, oidc SA created", - channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, - oidcSACreated: true, - want: true, + name: "two channelables ready, two subscriptions ready", + channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, + want: true, }} for _, test := range tests { @@ -384,10 +361,6 @@ func TestSequenceReady(t *testing.T) { ps := SequenceStatus{} ps.PropagateChannelStatuses(test.channels) ps.PropagateSubscriptionStatuses(test.subs) - if test.oidcSACreated { - ps.MarkOIDCIdentityCreatedSucceeded() - } - got := ps.IsReady() want := test.want if want != got { diff --git a/pkg/reconciler/sequence/controller.go b/pkg/reconciler/sequence/controller.go index 2ba64da960c..e0d7abed614 100644 --- a/pkg/reconciler/sequence/controller.go +++ b/pkg/reconciler/sequence/controller.go @@ -19,23 +19,17 @@ package sequence import ( "context" - "knative.dev/eventing/pkg/auth" - "k8s.io/client-go/tools/cache" - "knative.dev/eventing/pkg/apis/feature" v1 "knative.dev/eventing/pkg/apis/flows/v1" "knative.dev/eventing/pkg/duck" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" - "knative.dev/pkg/logging" eventingclient "knative.dev/eventing/pkg/client/injection/client" "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable" "knative.dev/eventing/pkg/client/injection/informers/flows/v1/sequence" "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription" sequencereconciler "knative.dev/eventing/pkg/client/injection/reconciler/flows/v1/sequence" - kubeclient "knative.dev/pkg/client/injection/kube/client" - serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/filtered" "knative.dev/pkg/injection/clients/dynamicclient" ) @@ -48,34 +42,14 @@ func NewController( sequenceInformer := sequence.Get(ctx) subscriptionInformer := subscription.Get(ctx) - oidcServiceaccountInformer := serviceaccountinformer.Get(ctx, auth.OIDCLabelSelector) - - var globalResync func(obj interface{}) - featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) { - if globalResync != nil { - globalResync(nil) - } - }) - featureStore.WatchConfigs(cmw) r := &Reconciler{ - sequenceLister: sequenceInformer.Lister(), - subscriptionLister: subscriptionInformer.Lister(), - dynamicClientSet: dynamicclient.Get(ctx), - eventingClientSet: eventingclient.Get(ctx), - serviceAccountLister: oidcServiceaccountInformer.Lister(), - kubeclient: kubeclient.Get(ctx), - } - - impl := sequencereconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options { - return controller.Options{ - ConfigStore: featureStore, - } - }) - - globalResync = func(_ interface{}) { - impl.GlobalResync(sequenceInformer.Informer()) + sequenceLister: sequenceInformer.Lister(), + subscriptionLister: subscriptionInformer.Lister(), + dynamicClientSet: dynamicclient.Get(ctx), + eventingClientSet: eventingclient.Get(ctx), } + impl := sequencereconciler.NewImpl(ctx, r) r.channelableTracker = duck.NewListableTrackerFromTracker(ctx, channelable.Get, impl.Tracker) sequenceInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) @@ -87,11 +61,5 @@ func NewController( Handler: controller.HandleAll(impl.EnqueueControllerOf), }) - // Reconcile Sequence when the OIDC service account changes - oidcServiceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: controller.FilterController(&v1.Sequence{}), - Handler: controller.HandleAll(impl.EnqueueControllerOf), - }) - return impl } diff --git a/pkg/reconciler/sequence/controller_test.go b/pkg/reconciler/sequence/controller_test.go index ee62360a68d..18faa1dd97b 100644 --- a/pkg/reconciler/sequence/controller_test.go +++ b/pkg/reconciler/sequence/controller_test.go @@ -17,42 +17,23 @@ limitations under the License. package sequence import ( - "context" "testing" - "knative.dev/eventing/pkg/auth" - filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered" - - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/pkg/configmap" . "knative.dev/pkg/reconciler/testing" // Fake injection informers - "knative.dev/eventing/pkg/apis/feature" _ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable/fake" _ "knative.dev/eventing/pkg/client/injection/informers/flows/v1/sequence/fake" _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake" - _ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/filtered/fake" - _ "knative.dev/pkg/client/injection/kube/informers/factory/filtered/fake" ) func TestNew(t *testing.T) { - ctx, _ := SetupFakeContext(t, SetUpInformerSelector) + ctx, _ := SetupFakeContext(t) - c := NewController(ctx, configmap.NewStaticWatcher( - &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: feature.FlagsConfigName, - }, - })) + c := NewController(ctx, configmap.NewStaticWatcher()) if c == nil { t.Fatal("Expected NewController to return a non-nil value") } } - -func SetUpInformerSelector(ctx context.Context) context.Context { - ctx = filteredFactory.WithSelectors(ctx, auth.OIDCLabelSelector) - return ctx -} diff --git a/pkg/reconciler/sequence/sequence.go b/pkg/reconciler/sequence/sequence.go index 7a7d5fdbf7a..d37dd3b793a 100644 --- a/pkg/reconciler/sequence/sequence.go +++ b/pkg/reconciler/sequence/sequence.go @@ -29,7 +29,6 @@ import ( "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/dynamic" - "k8s.io/client-go/kubernetes" "knative.dev/pkg/kmeta" duckapis "knative.dev/pkg/apis/duck" @@ -37,19 +36,15 @@ import ( pkgreconciler "knative.dev/pkg/reconciler" eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" - "knative.dev/eventing/pkg/apis/feature" v1 "knative.dev/eventing/pkg/apis/flows/v1" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" - "knative.dev/eventing/pkg/auth" clientset "knative.dev/eventing/pkg/client/clientset/versioned" sequencereconciler "knative.dev/eventing/pkg/client/injection/reconciler/flows/v1/sequence" listers "knative.dev/eventing/pkg/client/listers/flows/v1" messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1" "knative.dev/eventing/pkg/duck" - corev1listers "k8s.io/client-go/listers/core/v1" "knative.dev/eventing/pkg/reconciler/sequence/resources" - duckv1 "knative.dev/pkg/apis/duck/v1" ) type Reconciler struct { @@ -62,9 +57,7 @@ type Reconciler struct { eventingClientSet clientset.Interface // dynamicClientSet allows us to configure pluggable Build objects - dynamicClientSet dynamic.Interface - serviceAccountLister corev1listers.ServiceAccountLister - kubeclient kubernetes.Interface + dynamicClientSet dynamic.Interface } // Check that our Reconciler implements sequencereconciler.Interface @@ -129,13 +122,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, s *v1.Sequence) pkgrecon return err } - featureFlags := feature.FromContext(ctx) - if err := auth.SetupOIDCServiceAccount(ctx, featureFlags, r.serviceAccountLister, r.kubeclient, v1.SchemeGroupVersion.WithKind("Sequence"), s.ObjectMeta, &s.Status, func(as *duckv1.AuthStatus) { - s.Status.Auth = as - }); err != nil { - return err - } - return r.removeUnwantedSubscriptions(ctx, s, subs) } diff --git a/pkg/reconciler/sequence/sequence_test.go b/pkg/reconciler/sequence/sequence_test.go index 0e4f3aa0ee3..b149a72ea01 100644 --- a/pkg/reconciler/sequence/sequence_test.go +++ b/pkg/reconciler/sequence/sequence_test.go @@ -28,10 +28,8 @@ import ( "k8s.io/apimachinery/pkg/types" clientgotesting "k8s.io/client-go/testing" - "knative.dev/eventing/pkg/apis/feature" v1 "knative.dev/eventing/pkg/apis/flows/v1" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" - "knative.dev/eventing/pkg/auth" fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake" "knative.dev/pkg/apis" @@ -52,8 +50,6 @@ import ( . "knative.dev/pkg/reconciler/testing" . "knative.dev/eventing/pkg/reconciler/testing/v1" - - fakekubeclient "knative.dev/pkg/client/injection/kube/client/fake" ) const ( @@ -190,7 +186,6 @@ func TestAllCases(t *testing.T) { WithInitSequenceConditions, WithSequenceChannelTemplateSpec(imc), WithSequenceSteps([]v1.SequenceStep{{Destination: createDestination(0)}}), - WithSequenceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), WithSequenceChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), WithSequenceAddressableNotReady("emptyAddress", "addressable is nil"), WithSequenceSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), @@ -465,8 +460,7 @@ func TestAllCases(t *testing.T) { Message: "Subscription does not have Ready condition", }, }, - }), - WithSequenceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled()), + })), }}, }, { Name: "threestep", @@ -602,8 +596,7 @@ func TestAllCases(t *testing.T) { Message: "Subscription does not have Ready condition", }, }, - }), - WithSequenceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled()), + })), }}, }, { Name: "threestepwithdeliveryontwo", @@ -739,8 +732,7 @@ func TestAllCases(t *testing.T) { Message: "Subscription does not have Ready condition", }, }, - }), - WithSequenceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled()), + })), }}, }, { Name: "threestepwithreply", @@ -879,8 +871,7 @@ func TestAllCases(t *testing.T) { Message: "Subscription does not have Ready condition", }, }, - }), - WithSequenceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled()), + })), }}, }, { Name: "sequenceupdatesubscription", @@ -947,8 +938,7 @@ func TestAllCases(t *testing.T) { Message: "Subscription does not have Ready condition", }, }, - }), - WithSequenceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled()), + })), }}, }, { Name: "sequenceupdate-remove-step", @@ -1070,8 +1060,7 @@ func TestAllCases(t *testing.T) { Message: "Subscription does not have Ready condition", }, }, - }), - WithSequenceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled()), + })), }}, }, { Name: "sequenceupdate-remove-step-subscription-removal-fails", @@ -1200,8 +1189,7 @@ func TestAllCases(t *testing.T) { Message: "Subscription does not have Ready condition", }, }, - }), - WithSequenceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled()), + })), }}, }, { Name: "sequenceupdate-remove-step-channel-removal-fails", @@ -1326,167 +1314,21 @@ func TestAllCases(t *testing.T) { }, })), }}, - }, { - Name: "OIDC: creates OIDC service account", - Key: pKey, - Ctx: feature.ToContext(context.Background(), feature.Flags{ - feature.OIDCAuthentication: feature.Enabled, - }), - Objects: []runtime.Object{ - NewSequence(sequenceName, testNS, - WithInitSequenceConditions, - WithSequenceChannelTemplateSpec(imc), - WithSequenceSteps([]v1.SequenceStep{{Destination: createDestination(0)}})), - createChannel(sequenceName, 0), - resources.NewSubscription(0, - NewSequence(sequenceName, testNS, - WithSequenceChannelTemplateSpec(imc), - WithSequenceSteps([]v1.SequenceStep{{Destination: createDestination(0)}})))}, - WantErr: false, - WantCreates: []runtime.Object{ - makeSequenceOIDCServiceAccount(), - }, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewSequence(sequenceName, testNS, - WithInitSequenceConditions, - WithSequenceChannelTemplateSpec(imc), - WithSequenceSteps([]v1.SequenceStep{{Destination: createDestination(0)}}), - WithSequenceChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), - WithSequenceAddressableNotReady("emptyAddress", "addressable is nil"), - WithSequenceSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), - WithSequenceChannelStatuses([]v1.SequenceChannelStatus{ - { - Channel: corev1.ObjectReference{ - APIVersion: "messaging.knative.dev/v1", - Kind: "InMemoryChannel", - Name: resources.SequenceChannelName(sequenceName, 0), - Namespace: testNS, - }, - ReadyCondition: apis.Condition{ - Type: apis.ConditionReady, - Status: corev1.ConditionUnknown, - Reason: "NoReady", - Message: "Channel does not have Ready condition", - }, - }, - }), - WithSequenceSubscriptionStatuses([]v1.SequenceSubscriptionStatus{ - { - Subscription: corev1.ObjectReference{ - APIVersion: "messaging.knative.dev/v1", - Kind: "Subscription", - Name: resources.SequenceSubscriptionName(sequenceName, 0), - Namespace: testNS, - }, - ReadyCondition: apis.Condition{ - Type: apis.ConditionReady, - Status: corev1.ConditionUnknown, - Reason: "NoReady", - Message: "Subscription does not have Ready condition", - }, - }, - }), - WithSequenceOIDCIdentityCreatedSucceeded(), - WithSequenceOIDCServiceAccountName(makeSequenceOIDCServiceAccount().Name)), - }}, - }, { - Name: "OIDC: Sequence not ready on invalid OIDC service account", - Key: pKey, - Ctx: feature.ToContext(context.Background(), feature.Flags{ - feature.OIDCAuthentication: feature.Enabled, - }), - Objects: []runtime.Object{ - makeSequenceOIDCServiceAccountWithoutOwnerRef(), - NewSequence(sequenceName, testNS, - WithInitSequenceConditions, - WithSequenceChannelTemplateSpec(imc), - WithSequenceSteps([]v1.SequenceStep{{Destination: createDestination(0)}})), - createChannel(sequenceName, 0), - resources.NewSubscription(0, - NewSequence(sequenceName, testNS, - WithSequenceChannelTemplateSpec(imc), - WithSequenceSteps([]v1.SequenceStep{{Destination: createDestination(0)}})))}, - WantErr: true, - WantCreates: []runtime.Object{}, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewSequence(sequenceName, testNS, - WithInitSequenceConditions, - WithSequenceChannelTemplateSpec(imc), - WithSequenceSteps([]v1.SequenceStep{{Destination: createDestination(0)}}), - WithSequenceChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), - WithSequenceAddressableNotReady("emptyAddress", "addressable is nil"), - WithSequenceSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), - WithSequenceChannelStatuses([]v1.SequenceChannelStatus{ - { - Channel: corev1.ObjectReference{ - APIVersion: "messaging.knative.dev/v1", - Kind: "InMemoryChannel", - Name: resources.SequenceChannelName(sequenceName, 0), - Namespace: testNS, - }, - ReadyCondition: apis.Condition{ - Type: apis.ConditionReady, - Status: corev1.ConditionUnknown, - Reason: "NoReady", - Message: "Channel does not have Ready condition", - }, - }, - }), - WithSequenceSubscriptionStatuses([]v1.SequenceSubscriptionStatus{ - { - Subscription: corev1.ObjectReference{ - APIVersion: "messaging.knative.dev/v1", - Kind: "Subscription", - Name: resources.SequenceSubscriptionName(sequenceName, 0), - Namespace: testNS, - }, - ReadyCondition: apis.Condition{ - Type: apis.ConditionReady, - Status: corev1.ConditionUnknown, - Reason: "NoReady", - Message: "Subscription does not have Ready condition", - }, - }, - }), - WithSequenceOIDCIdentityCreatedFailed("Unable to resolve service account for OIDC authentication", fmt.Sprintf("service account %s not owned by Sequence %s", makeSequenceOIDCServiceAccountWithoutOwnerRef().Name, sequenceName)), - WithSequenceOIDCServiceAccountName(makeSequenceOIDCServiceAccountWithoutOwnerRef().Name)), - }}, - WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "InternalError", fmt.Sprintf("service account %s not owned by Sequence %s", makeSequenceOIDCServiceAccountWithoutOwnerRef().Name, sequenceName)), - }, - }} + }, + } logger := logtesting.TestLogger(t) table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher) controller.Reconciler { ctx = channelable.WithDuck(ctx) r := &Reconciler{ - sequenceLister: listers.GetSequenceLister(), - channelableTracker: duck.NewListableTrackerFromTracker(ctx, channelable.Get, tracker.New(func(types.NamespacedName) {}, 0)), - subscriptionLister: listers.GetSubscriptionLister(), - eventingClientSet: fakeeventingclient.Get(ctx), - dynamicClientSet: fakedynamicclient.Get(ctx), - kubeclient: fakekubeclient.Get(ctx), - serviceAccountLister: listers.GetServiceAccountLister(), + sequenceLister: listers.GetSequenceLister(), + channelableTracker: duck.NewListableTrackerFromTracker(ctx, channelable.Get, tracker.New(func(types.NamespacedName) {}, 0)), + subscriptionLister: listers.GetSubscriptionLister(), + eventingClientSet: fakeeventingclient.Get(ctx), + dynamicClientSet: fakedynamicclient.Get(ctx), } return sequence.NewReconciler(ctx, logging.FromContext(ctx), fakeeventingclient.Get(ctx), listers.GetSequenceLister(), controller.GetEventRecorder(ctx), r) }, false, logger)) } - -func makeSequenceOIDCServiceAccount() *corev1.ServiceAccount { - return auth.GetOIDCServiceAccountForResource(v1.SchemeGroupVersion.WithKind("Sequence"), metav1.ObjectMeta{ - Name: sequenceName, - Namespace: testNS, - }) -} - -func makeSequenceOIDCServiceAccountWithoutOwnerRef() *corev1.ServiceAccount { - sa := auth.GetOIDCServiceAccountForResource(v1.SchemeGroupVersion.WithKind("Sequence"), metav1.ObjectMeta{ - Name: sequenceName, - Namespace: testNS, - }) - sa.OwnerReferences = nil - - return sa -} diff --git a/pkg/reconciler/testing/v1/sequence.go b/pkg/reconciler/testing/v1/sequence.go index 64232984124..d54e2b5f113 100644 --- a/pkg/reconciler/testing/v1/sequence.go +++ b/pkg/reconciler/testing/v1/sequence.go @@ -18,11 +18,9 @@ package testing import ( "context" - "fmt" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "knative.dev/eventing/pkg/apis/feature" flowsv1 "knative.dev/eventing/pkg/apis/flows/v1" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -115,31 +113,3 @@ func WithSequenceAddressableNotReady(reason, message string) SequenceOption { p.Status.MarkAddressableNotReady(reason, message) } } - -func WithSequenceOIDCIdentityCreatedSucceeded() SequenceOption { - return func(s *flowsv1.Sequence) { - s.Status.MarkOIDCIdentityCreatedSucceeded() - } -} - -func WithSequenceOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled() SequenceOption { - return func(s *flowsv1.Sequence) { - s.Status.MarkOIDCIdentityCreatedSucceededWithReason(fmt.Sprintf("%s feature disabled", feature.OIDCAuthentication), "") - } -} - -func WithSequenceOIDCIdentityCreatedFailed(reason, message string) SequenceOption { - return func(s *flowsv1.Sequence) { - s.Status.MarkOIDCIdentityCreatedFailed(reason, message) - } -} - -func WithSequenceOIDCServiceAccountName(name string) SequenceOption { - return func(s *flowsv1.Sequence) { - if s.Status.Auth == nil { - s.Status.Auth = &duckv1.AuthStatus{} - } - - s.Status.Auth.ServiceAccountName = &name - } -} From 7ab72b8567c686775504e02810dce9cf94b1a2d3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 7 May 2024 13:16:34 +0200 Subject: [PATCH 3/8] Update Sequence to expose OIDC identities of underlying Subscriptions --- pkg/apis/flows/v1/sequence_lifecycle.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/apis/flows/v1/sequence_lifecycle.go b/pkg/apis/flows/v1/sequence_lifecycle.go index 6614a1e57c6..3dccf690077 100644 --- a/pkg/apis/flows/v1/sequence_lifecycle.go +++ b/pkg/apis/flows/v1/sequence_lifecycle.go @@ -81,12 +81,13 @@ func (ss *SequenceStatus) InitializeConditions() { // the status of the incoming subscriptions. func (ss *SequenceStatus) PropagateSubscriptionStatuses(subscriptions []*messagingv1.Subscription) { ss.SubscriptionStatuses = make([]SequenceSubscriptionStatus, len(subscriptions)) + ss.Auth = nil allReady := true // If there are no subscriptions, treat that as a False case. Could go either way, but this seems right. if len(subscriptions) == 0 { allReady = false - } + for i, s := range subscriptions { ss.SubscriptionStatuses[i] = SequenceSubscriptionStatus{ Subscription: corev1.ObjectReference{ @@ -113,6 +114,13 @@ func (ss *SequenceStatus) PropagateSubscriptionStatuses(subscriptions []*messagi allReady = false } + if s.Status.Auth != nil && s.Status.Auth.ServiceAccountName != nil { + if ss.Auth == nil { + ss.Auth = &duckv1.AuthStatus{} + } + + ss.Auth.ServiceAccountNames = append(ss.Auth.ServiceAccountNames, *s.Status.Auth.ServiceAccountName) + } } if allReady { sCondSet.Manage(ss).MarkTrue(SequenceConditionSubscriptionsReady) From 0ea5d8c62786300eee73dc58a83f18ceb10678fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 7 May 2024 15:09:31 +0200 Subject: [PATCH 4/8] Revert "Add serviceaccount in parallel (#7373)" This reverts commit dc965225f63560e79d11d6729a2c720c254b0db7. --- pkg/apis/flows/v1/parallel_lifecycle.go | 21 +-- pkg/apis/flows/v1/parallel_lifecycle_test.go | 173 +++++++------------ pkg/reconciler/parallel/controller.go | 40 +---- pkg/reconciler/parallel/controller_test.go | 24 +-- pkg/reconciler/parallel/parallel.go | 17 +- pkg/reconciler/parallel/parallel_test.go | 121 +------------ pkg/reconciler/testing/v1/parallel.go | 30 ---- 7 files changed, 82 insertions(+), 344 deletions(-) diff --git a/pkg/apis/flows/v1/parallel_lifecycle.go b/pkg/apis/flows/v1/parallel_lifecycle.go index 34467c058de..b02363ebfb8 100644 --- a/pkg/apis/flows/v1/parallel_lifecycle.go +++ b/pkg/apis/flows/v1/parallel_lifecycle.go @@ -25,7 +25,7 @@ import ( pkgduckv1 "knative.dev/pkg/apis/duck/v1" ) -var pCondSet = apis.NewLivingConditionSet(ParallelConditionReady, ParallelConditionChannelsReady, ParallelConditionSubscriptionsReady, ParallelConditionAddressable, ParallelConditionOIDCIdentityCreated) +var pCondSet = apis.NewLivingConditionSet(ParallelConditionReady, ParallelConditionChannelsReady, ParallelConditionSubscriptionsReady, ParallelConditionAddressable) const ( // ParallelConditionReady has status True when all subconditions below have been set to True. @@ -41,8 +41,7 @@ const ( // ParallelConditionAddressable has status true when this Parallel meets // the Addressable contract and has a non-empty hostname. - ParallelConditionAddressable apis.ConditionType = "Addressable" - ParallelConditionOIDCIdentityCreated apis.ConditionType = "OIDCIdentityCreated" + ParallelConditionAddressable apis.ConditionType = "Addressable" ) // GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface. @@ -196,22 +195,6 @@ func (ps *ParallelStatus) MarkAddressableNotReady(reason, messageFormat string, pCondSet.Manage(ps).MarkFalse(ParallelConditionAddressable, reason, messageFormat, messageA...) } -func (ps *ParallelStatus) MarkOIDCIdentityCreatedSucceeded() { - pCondSet.Manage(ps).MarkTrue(ParallelConditionOIDCIdentityCreated) -} - -func (ps *ParallelStatus) MarkOIDCIdentityCreatedSucceededWithReason(reason, messageFormat string, messageA ...interface{}) { - pCondSet.Manage(ps).MarkTrueWithReason(ParallelConditionOIDCIdentityCreated, reason, messageFormat, messageA...) -} - -func (ps *ParallelStatus) MarkOIDCIdentityCreatedFailed(reason, messageFormat string, messageA ...interface{}) { - pCondSet.Manage(ps).MarkFalse(ParallelConditionOIDCIdentityCreated, reason, messageFormat, messageA...) -} - -func (ps *ParallelStatus) MarkOIDCIdentityCreatedUnknown(reason, messageFormat string, messageA ...interface{}) { - pCondSet.Manage(ps).MarkUnknown(ParallelConditionOIDCIdentityCreated, reason, messageFormat, messageA...) -} - func (ps *ParallelStatus) setAddress(address *pkgduckv1.Addressable) { ps.Address = address if address == nil { diff --git a/pkg/apis/flows/v1/parallel_lifecycle_test.go b/pkg/apis/flows/v1/parallel_lifecycle_test.go index d193951325f..d87f3026be4 100644 --- a/pkg/apis/flows/v1/parallel_lifecycle_test.go +++ b/pkg/apis/flows/v1/parallel_lifecycle_test.go @@ -88,9 +88,6 @@ func TestParallelInitializeConditions(t *testing.T) { }, { Type: ParallelConditionChannelsReady, Status: corev1.ConditionUnknown, - }, { - Type: ParallelConditionOIDCIdentityCreated, - Status: corev1.ConditionUnknown, }, { Type: ParallelConditionReady, Status: corev1.ConditionUnknown, @@ -118,9 +115,6 @@ func TestParallelInitializeConditions(t *testing.T) { }, { Type: ParallelConditionChannelsReady, Status: corev1.ConditionFalse, - }, { - Type: ParallelConditionOIDCIdentityCreated, - Status: corev1.ConditionUnknown, }, { Type: ParallelConditionReady, Status: corev1.ConditionUnknown, @@ -148,9 +142,6 @@ func TestParallelInitializeConditions(t *testing.T) { }, { Type: ParallelConditionChannelsReady, Status: corev1.ConditionUnknown, - }, { - Type: ParallelConditionOIDCIdentityCreated, - Status: corev1.ConditionUnknown, }, { Type: ParallelConditionReady, Status: corev1.ConditionUnknown, @@ -336,109 +327,82 @@ func TestParallelPropagateSubscriptionStatusUpdated(t *testing.T) { func TestParallelReady(t *testing.T) { tests := []struct { - name string - fsubs []*messagingv1.Subscription - subs []*messagingv1.Subscription - ichannel *eventingduckv1.Channelable - channels []*eventingduckv1.Channelable - markOIDCServiceAccountCreated bool - want bool + name string + fsubs []*messagingv1.Subscription + subs []*messagingv1.Subscription + ichannel *eventingduckv1.Channelable + channels []*eventingduckv1.Channelable + want bool }{{ - name: "ingress false, empty, serviceAccount ready", - fsubs: []*messagingv1.Subscription{}, - subs: []*messagingv1.Subscription{}, - ichannel: getChannelable(false), - channels: []*eventingduckv1.Channelable{}, - markOIDCServiceAccountCreated: true, - want: false, - }, { - name: "ingress true, empty, serviceAccount ready", - fsubs: []*messagingv1.Subscription{}, - subs: []*messagingv1.Subscription{}, - ichannel: getChannelable(true), - channels: []*eventingduckv1.Channelable{}, - markOIDCServiceAccountCreated: true, - want: false, - }, { - name: "ingress true, one channelable not ready, one subscription ready, serviceAccount ready", - ichannel: getChannelable(true), - channels: []*eventingduckv1.Channelable{getChannelable(false)}, - fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true)}, - markOIDCServiceAccountCreated: true, - want: false, + name: "ingress false, empty", + fsubs: []*messagingv1.Subscription{}, + subs: []*messagingv1.Subscription{}, + ichannel: getChannelable(false), + channels: []*eventingduckv1.Channelable{}, + want: false, }, { - name: "ingress true, one channelable ready, one subscription not ready, serviceAccount ready", - ichannel: getChannelable(true), - channels: []*eventingduckv1.Channelable{getChannelable(true)}, - fsubs: []*messagingv1.Subscription{getSubscription("fsub0", false)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", false)}, - markOIDCServiceAccountCreated: true, - want: false, + name: "ingress true, empty", + fsubs: []*messagingv1.Subscription{}, + subs: []*messagingv1.Subscription{}, + ichannel: getChannelable(true), + channels: []*eventingduckv1.Channelable{}, + want: false, }, { - name: "ingress false, one channelable ready, one subscription ready,serviceAccount ready", - ichannel: getChannelable(false), - channels: []*eventingduckv1.Channelable{getChannelable(true)}, - fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true)}, - markOIDCServiceAccountCreated: true, - want: false, + name: "ingress true, one channelable not ready, one subscription ready", + ichannel: getChannelable(true), + channels: []*eventingduckv1.Channelable{getChannelable(false)}, + fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true)}, + want: false, }, { - name: "ingress true, one channelable ready, one subscription ready, serviceAccount ready", - ichannel: getChannelable(true), - channels: []*eventingduckv1.Channelable{getChannelable(true)}, - fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true)}, - markOIDCServiceAccountCreated: true, - want: true, + name: "ingress true, one channelable ready, one subscription not ready", + ichannel: getChannelable(true), + channels: []*eventingduckv1.Channelable{getChannelable(true)}, + fsubs: []*messagingv1.Subscription{getSubscription("fsub0", false)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", false)}, + want: false, }, { - name: "ingress true, one channelable ready, one subscription ready, serviceAccount not ready", - ichannel: getChannelable(true), - channels: []*eventingduckv1.Channelable{getChannelable(true)}, - fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true)}, - markOIDCServiceAccountCreated: false, - want: false, + name: "ingress false, one channelable ready, one subscription ready", + ichannel: getChannelable(false), + channels: []*eventingduckv1.Channelable{getChannelable(true)}, + fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true)}, + want: false, }, { - name: "ingress true, one channelable ready, one not, two subscriptions ready, serviceAccount ready", - ichannel: getChannelable(true), - channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(false)}, - fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, - markOIDCServiceAccountCreated: true, - want: false, + name: "ingress true, one channelable ready, one subscription ready", + ichannel: getChannelable(true), + channels: []*eventingduckv1.Channelable{getChannelable(true)}, + fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true)}, + want: true, }, { - name: "ingress true, two channelables ready, one subscription ready, one not, serviceAccount ready", - ichannel: getChannelable(true), - channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)}, - fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", false)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", false)}, - markOIDCServiceAccountCreated: true, - want: false, + name: "ingress true, one channelable ready, one not, two subscriptions ready", + ichannel: getChannelable(true), + channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(false)}, + fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, + want: false, }, { - name: "ingress false, two channelables ready, two subscriptions ready, serviceAccount ready", - ichannel: getChannelable(false), - channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)}, - fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, - markOIDCServiceAccountCreated: true, - want: false, + name: "ingress true, two channelables ready, one subscription ready, one not", + ichannel: getChannelable(true), + channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)}, + fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", false)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", false)}, + want: false, }, { - name: "ingress true, two channelables ready, two subscriptions ready, serviceAccount not ready", - ichannel: getChannelable(true), - channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)}, - fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, - markOIDCServiceAccountCreated: false, - want: false, + name: "ingress false, two channelables ready, two subscriptions ready", + ichannel: getChannelable(false), + channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)}, + fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, + want: false, }, { - name: "ingress true, two channelables ready, two subscriptions ready, serviceAccount ready", - ichannel: getChannelable(true), - channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)}, - fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)}, - subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, - markOIDCServiceAccountCreated: true, - want: true, + name: "ingress true, two channelables ready, two subscriptions ready", + ichannel: getChannelable(true), + channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)}, + fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)}, + subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)}, + want: true, }} for _, test := range tests { @@ -446,11 +410,6 @@ func TestParallelReady(t *testing.T) { ps := ParallelStatus{} ps.PropagateChannelStatuses(test.ichannel, test.channels) ps.PropagateSubscriptionStatuses(test.fsubs, test.subs) - if test.markOIDCServiceAccountCreated { - ps.MarkOIDCIdentityCreatedSucceeded() - } else { - ps.MarkOIDCIdentityCreatedFailed("Unable to create serviceaccount", "") - } got := ps.IsReady() want := test.want if want != got { diff --git a/pkg/reconciler/parallel/controller.go b/pkg/reconciler/parallel/controller.go index 524b968836d..71121995dee 100644 --- a/pkg/reconciler/parallel/controller.go +++ b/pkg/reconciler/parallel/controller.go @@ -19,18 +19,12 @@ package parallel import ( "context" - "knative.dev/eventing/pkg/auth" - "k8s.io/client-go/tools/cache" - "knative.dev/eventing/pkg/apis/feature" v1 "knative.dev/eventing/pkg/apis/flows/v1" "knative.dev/eventing/pkg/duck" - kubeclient "knative.dev/pkg/client/injection/kube/client" - serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/filtered" "knative.dev/pkg/configmap" "knative.dev/pkg/controller" "knative.dev/pkg/injection/clients/dynamicclient" - "knative.dev/pkg/logging" eventingclient "knative.dev/eventing/pkg/client/injection/client" "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable" @@ -48,33 +42,14 @@ func NewController( parallelInformer := parallel.Get(ctx) subscriptionInformer := subscription.Get(ctx) - oidcServiceaccountInformer := serviceaccountinformer.Get(ctx, auth.OIDCLabelSelector) - - var globalResync func(obj interface{}) - featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) { - if globalResync != nil { - globalResync(nil) - } - }) - featureStore.WatchConfigs(cmw) r := &Reconciler{ - parallelLister: parallelInformer.Lister(), - subscriptionLister: subscriptionInformer.Lister(), - serviceAccountLister: oidcServiceaccountInformer.Lister(), - kubeclient: kubeclient.Get(ctx), - dynamicClientSet: dynamicclient.Get(ctx), - eventingClientSet: eventingclient.Get(ctx), - } - impl := parallelreconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options { - return controller.Options{ - ConfigStore: featureStore, - } - }) - - globalResync = func(_ interface{}) { - impl.GlobalResync(parallelInformer.Informer()) + parallelLister: parallelInformer.Lister(), + subscriptionLister: subscriptionInformer.Lister(), + dynamicClientSet: dynamicclient.Get(ctx), + eventingClientSet: eventingclient.Get(ctx), } + impl := parallelreconciler.NewImpl(ctx, r) r.channelableTracker = duck.NewListableTrackerFromTracker(ctx, channelable.Get, impl.Tracker) parallelInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue)) @@ -85,11 +60,6 @@ func NewController( FilterFunc: controller.FilterController(&v1.Parallel{}), Handler: controller.HandleAll(impl.EnqueueControllerOf), }) - // Reconcile Parallel when the OIDC service account changes - oidcServiceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ - FilterFunc: controller.FilterController(&v1.Parallel{}), - Handler: controller.HandleAll(impl.EnqueueControllerOf), - }) return impl } diff --git a/pkg/reconciler/parallel/controller_test.go b/pkg/reconciler/parallel/controller_test.go index 3af5abc8a1c..acf3b8c5258 100644 --- a/pkg/reconciler/parallel/controller_test.go +++ b/pkg/reconciler/parallel/controller_test.go @@ -17,43 +17,23 @@ limitations under the License. package parallel import ( - "context" "testing" - "knative.dev/eventing/pkg/auth" - filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered" - - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "knative.dev/pkg/configmap" . "knative.dev/pkg/reconciler/testing" // Fake injection informers - "knative.dev/eventing/pkg/apis/feature" _ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable/fake" _ "knative.dev/eventing/pkg/client/injection/informers/flows/v1/parallel/fake" _ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake" - _ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/filtered/fake" - _ "knative.dev/pkg/client/injection/kube/informers/factory/filtered/fake" ) func TestNew(t *testing.T) { - ctx, _ := SetupFakeContext(t, SetUpInformerSelector) + ctx, _ := SetupFakeContext(t) - c := NewController(ctx, configmap.NewStaticWatcher( - &corev1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Name: feature.FlagsConfigName, - }, - }, - )) + c := NewController(ctx, configmap.NewStaticWatcher()) if c == nil { t.Fatal("Expected NewController to return a non-nil value") } } - -func SetUpInformerSelector(ctx context.Context) context.Context { - ctx = filteredFactory.WithSelectors(ctx, auth.OIDCLabelSelector) - return ctx -} diff --git a/pkg/reconciler/parallel/parallel.go b/pkg/reconciler/parallel/parallel.go index 12a593dc51e..26f3521b03c 100644 --- a/pkg/reconciler/parallel/parallel.go +++ b/pkg/reconciler/parallel/parallel.go @@ -30,30 +30,24 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/dynamic" - "k8s.io/client-go/kubernetes" duckapis "knative.dev/pkg/apis/duck" "knative.dev/pkg/kmeta" "knative.dev/pkg/logging" pkgreconciler "knative.dev/pkg/reconciler" - corev1listers "k8s.io/client-go/listers/core/v1" duckv1 "knative.dev/eventing/pkg/apis/duck/v1" - "knative.dev/eventing/pkg/apis/feature" v1 "knative.dev/eventing/pkg/apis/flows/v1" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" - "knative.dev/eventing/pkg/auth" clientset "knative.dev/eventing/pkg/client/clientset/versioned" parallelreconciler "knative.dev/eventing/pkg/client/injection/reconciler/flows/v1/parallel" listers "knative.dev/eventing/pkg/client/listers/flows/v1" messaginglisters "knative.dev/eventing/pkg/client/listers/messaging/v1" ducklib "knative.dev/eventing/pkg/duck" "knative.dev/eventing/pkg/reconciler/parallel/resources" - duckv1knative "knative.dev/pkg/apis/duck/v1" ) type Reconciler struct { - kubeclient kubernetes.Interface // listers index properties about resources parallelLister listers.ParallelLister channelableTracker ducklib.ListableTracker @@ -63,8 +57,7 @@ type Reconciler struct { eventingClientSet clientset.Interface // dynamicClientSet allows us to configure pluggable Build objects - dynamicClientSet dynamic.Interface - serviceAccountLister corev1listers.ServiceAccountLister + dynamicClientSet dynamic.Interface } // Check that our Reconciler implements parallelreconciler.Interface @@ -78,14 +71,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, p *v1.Parallel) pkgrecon // 2.2 create a Subscription to the filter Channel, subscribe the subscriber and send reply to // either the branch Reply. If not present, send reply to the global Reply. If not present, do not send reply. // 3. Rinse and repeat step #2 above for each branch in the list - // OIDC authentication - featureFlags := feature.FromContext(ctx) - if err := auth.SetupOIDCServiceAccount(ctx, featureFlags, r.serviceAccountLister, r.kubeclient, v1.SchemeGroupVersion.WithKind("Parallel"), p.ObjectMeta, &p.Status, func(as *duckv1knative.AuthStatus) { - p.Status.Auth = as - }); err != nil { - return err - } - if p.Status.BranchStatuses == nil { p.Status.BranchStatuses = make([]v1.ParallelBranchStatus, 0) } diff --git a/pkg/reconciler/parallel/parallel_test.go b/pkg/reconciler/parallel/parallel_test.go index 9dabaeb1656..54dbec0d76f 100644 --- a/pkg/reconciler/parallel/parallel_test.go +++ b/pkg/reconciler/parallel/parallel_test.go @@ -21,7 +21,6 @@ import ( "fmt" "testing" - "knative.dev/eventing/pkg/auth" fakeeventingclient "knative.dev/eventing/pkg/client/injection/client/fake" fakedynamicclient "knative.dev/pkg/injection/clients/dynamicclient/fake" "knative.dev/pkg/tracker" @@ -48,11 +47,9 @@ import ( v1 "knative.dev/eventing/pkg/apis/flows/v1" - "knative.dev/eventing/pkg/apis/feature" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" "knative.dev/eventing/pkg/reconciler/parallel/resources" . "knative.dev/eventing/pkg/reconciler/testing/v1" - fakekubeclient "knative.dev/pkg/client/injection/kube/client/fake" ) const ( @@ -140,7 +137,6 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), - WithFlowsParallelOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), @@ -177,7 +173,6 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), - WithFlowsParallelOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), @@ -214,7 +209,6 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), - WithFlowsParallelOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), @@ -255,7 +249,6 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), - WithFlowsParallelOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), @@ -296,7 +289,6 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), - WithFlowsParallelOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), @@ -347,7 +339,6 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), - WithFlowsParallelOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{ { FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), @@ -407,7 +398,6 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), - WithFlowsParallelOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{ { FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), @@ -463,7 +453,6 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), - WithFlowsParallelOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), @@ -485,7 +474,6 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), - WithFlowsParallelOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{ { FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), @@ -562,104 +550,24 @@ func TestAllBranches(t *testing.T) { WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), - WithFlowsParallelOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled(), WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), }})), }}, - }, { - Name: "OIDC: creates OIDC service account", - Key: pKey, - Ctx: feature.ToContext(context.Background(), feature.Flags{ - feature.OIDCAuthentication: feature.Enabled, - }), - Objects: []runtime.Object{ - NewFlowsParallel(parallelName, testNS, - WithInitFlowsParallelConditions, - WithFlowsParallelChannelTemplateSpec(imc), - WithFlowsParallelBranches([]v1.ParallelBranch{ - {Subscriber: createSubscriber(0)}, - })), - createChannel(parallelName), - createBranchChannel(parallelName, 0), - resources.NewFilterSubscription(0, NewFlowsParallel(parallelName, testNS, - WithFlowsParallelChannelTemplateSpec(imc), - WithFlowsParallelBranches([]v1.ParallelBranch{ - {Subscriber: createSubscriber(0)}, - }))), - resources.NewSubscription(0, NewFlowsParallel(parallelName, testNS, - WithFlowsParallelChannelTemplateSpec(imc), - WithFlowsParallelBranches([]v1.ParallelBranch{ - {Subscriber: createSubscriber(0)}, - }))), - }, - WantErr: false, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewFlowsParallel(parallelName, testNS, - WithInitFlowsParallelConditions, - WithFlowsParallelChannelTemplateSpec(imc), - WithFlowsParallelBranches([]v1.ParallelBranch{{Subscriber: createSubscriber(0)}}), - WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), - WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), - WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), - WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), - WithFlowsParallelOIDCServiceAccountName(makeFlowParallelOIDCServiceAccount().Name), - WithFlowsParallelOIDCIdentityCreatedSucceeded(), - WithFlowsParallelBranchStatuses([]v1.ParallelBranchStatus{{ - FilterSubscriptionStatus: createParallelFilterSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), - FilterChannelStatus: createParallelBranchChannelStatus(parallelName, 0, corev1.ConditionFalse), - SubscriptionStatus: createParallelSubscriptionStatus(parallelName, 0, corev1.ConditionFalse), - }})), - }}, - WantCreates: []runtime.Object{ - makeFlowParallelOIDCServiceAccount(), - }, - }, { - Name: "OIDC: Parallel not ready on invalid OIDC service account", - Key: pKey, - Ctx: feature.ToContext(context.Background(), feature.Flags{ - feature.OIDCAuthentication: feature.Enabled, - }), - Objects: []runtime.Object{ - makeFlowParallelOIDCServiceAccountWithoutOwnerRef(), - NewFlowsParallel(parallelName, testNS, - WithInitFlowsParallelConditions, - WithFlowsParallelChannelTemplateSpec(imc), - WithFlowsParallelBranches([]v1.ParallelBranch{ - {Subscriber: createSubscriber(0)}, - }))}, - WantErr: true, - WantStatusUpdates: []clientgotesting.UpdateActionImpl{{ - Object: NewFlowsParallel(parallelName, testNS, - WithInitFlowsParallelConditions, - WithFlowsParallelChannelTemplateSpec(imc), - WithFlowsParallelBranches([]v1.ParallelBranch{{Subscriber: createSubscriber(0)}}), - // WithFlowsParallelChannelsNotReady("ChannelsNotReady", "Channels are not ready yet, or there are none"), - // WithFlowsParallelAddressableNotReady("emptyAddress", "addressable is nil"), - // WithFlowsParallelSubscriptionsNotReady("SubscriptionsNotReady", "Subscriptions are not ready yet, or there are none"), - // WithFlowsParallelIngressChannelStatus(createParallelChannelStatus(parallelName, corev1.ConditionFalse)), - WithFlowsParallelOIDCIdentityCreatedFailed("Unable to resolve service account for OIDC authentication", fmt.Sprintf("service account %s not owned by Parallel %s", makeFlowParallelOIDCServiceAccountWithoutOwnerRef().Name, parallelName)), - WithFlowsParallelOIDCServiceAccountName(makeFlowParallelOIDCServiceAccountWithoutOwnerRef().Name), - ), - }}, - WantEvents: []string{ - Eventf(corev1.EventTypeWarning, "InternalError", fmt.Sprintf("service account %s not owned by Parallel %s", makeFlowParallelOIDCServiceAccountWithoutOwnerRef().Name, parallelName)), - }}, + }, } logger := logtesting.TestLogger(t) table.Test(t, MakeFactory(func(ctx context.Context, listers *Listers, cmw configmap.Watcher) controller.Reconciler { ctx = channelable.WithDuck(ctx) r := &Reconciler{ - parallelLister: listers.GetParallelLister(), - channelableTracker: duck.NewListableTrackerFromTracker(ctx, channelable.Get, tracker.New(func(types.NamespacedName) {}, 0)), - subscriptionLister: listers.GetSubscriptionLister(), - eventingClientSet: fakeeventingclient.Get(ctx), - kubeclient: fakekubeclient.Get(ctx), - dynamicClientSet: fakedynamicclient.Get(ctx), - serviceAccountLister: listers.GetServiceAccountLister(), + parallelLister: listers.GetParallelLister(), + channelableTracker: duck.NewListableTrackerFromTracker(ctx, channelable.Get, tracker.New(func(types.NamespacedName) {}, 0)), + subscriptionLister: listers.GetSubscriptionLister(), + eventingClientSet: fakeeventingclient.Get(ctx), + dynamicClientSet: fakedynamicclient.Get(ctx), } return parallel.NewReconciler(ctx, logging.FromContext(ctx), fakeeventingclient.Get(ctx), listers.GetParallelLister(), @@ -829,20 +737,3 @@ func createDelivery(gvk metav1.GroupVersionKind, name, namespace string) *eventi }, } } - -func makeFlowParallelOIDCServiceAccount() *corev1.ServiceAccount { - return auth.GetOIDCServiceAccountForResource(v1.SchemeGroupVersion.WithKind("Parallel"), metav1.ObjectMeta{ - Name: parallelName, - Namespace: testNS, - }) -} - -func makeFlowParallelOIDCServiceAccountWithoutOwnerRef() *corev1.ServiceAccount { - sa := auth.GetOIDCServiceAccountForResource(v1.SchemeGroupVersion.WithKind("Parallel"), metav1.ObjectMeta{ - Name: parallelName, - Namespace: testNS, - }) - sa.OwnerReferences = nil - - return sa -} diff --git a/pkg/reconciler/testing/v1/parallel.go b/pkg/reconciler/testing/v1/parallel.go index dd8463a14d1..a9d50c0743c 100644 --- a/pkg/reconciler/testing/v1/parallel.go +++ b/pkg/reconciler/testing/v1/parallel.go @@ -18,11 +18,9 @@ package testing import ( "context" - "fmt" "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "knative.dev/eventing/pkg/apis/feature" flowsv1 "knative.dev/eventing/pkg/apis/flows/v1" messagingv1 "knative.dev/eventing/pkg/apis/messaging/v1" duckv1 "knative.dev/pkg/apis/duck/v1" @@ -115,31 +113,3 @@ func WithFlowsParallelAddressableNotReady(reason, message string) FlowsParallelO p.Status.MarkAddressableNotReady(reason, message) } } - -func WithFlowsParallelOIDCIdentityCreatedSucceeded() FlowsParallelOption { - return func(p *flowsv1.Parallel) { - p.Status.MarkOIDCIdentityCreatedSucceeded() - } -} - -func WithFlowsParallelOIDCIdentityCreatedSucceededBecauseOIDCFeatureDisabled() FlowsParallelOption { - return func(p *flowsv1.Parallel) { - p.Status.MarkOIDCIdentityCreatedSucceededWithReason(fmt.Sprintf("%s feature disabled", feature.OIDCAuthentication), "") - } -} - -func WithFlowsParallelOIDCIdentityCreatedFailed(reason, message string) FlowsParallelOption { - return func(p *flowsv1.Parallel) { - p.Status.MarkOIDCIdentityCreatedFailed(reason, message) - } -} - -func WithFlowsParallelOIDCServiceAccountName(name string) FlowsParallelOption { - return func(p *flowsv1.Parallel) { - if p.Status.Auth == nil { - p.Status.Auth = &duckv1.AuthStatus{} - } - - p.Status.Auth.ServiceAccountName = &name - } -} From 0599d4910645e0f7ac201c37a80b774c6e17bac7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 7 May 2024 13:28:15 +0200 Subject: [PATCH 5/8] Update Parallel to expose OIDC identities of underlying Subscriptions --- pkg/apis/flows/v1/parallel_lifecycle.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/pkg/apis/flows/v1/parallel_lifecycle.go b/pkg/apis/flows/v1/parallel_lifecycle.go index b02363ebfb8..89a235311cf 100644 --- a/pkg/apis/flows/v1/parallel_lifecycle.go +++ b/pkg/apis/flows/v1/parallel_lifecycle.go @@ -80,6 +80,7 @@ func (ps *ParallelStatus) PropagateSubscriptionStatuses(filterSubscriptions []*m if ps.BranchStatuses == nil || len(subscriptions) != len(ps.BranchStatuses) { ps.BranchStatuses = make([]ParallelBranchStatus, len(subscriptions)) } + ps.Auth = nil allReady := true // If there are no subscriptions, treat that as a False branch. Could go either way, but this seems right. if len(subscriptions) == 0 { @@ -125,6 +126,19 @@ func (ps *ParallelStatus) PropagateSubscriptionStatuses(filterSubscriptions []*m allReady = false } + if fs.Status.Auth != nil && fs.Status.Auth.ServiceAccountName != nil { + if ps.Auth == nil { + ps.Auth = &pkgduckv1.AuthStatus{} + } + ps.Auth.ServiceAccountNames = append(ps.Auth.ServiceAccountNames, *fs.Status.Auth.ServiceAccountName) + } + + if s.Status.Auth != nil && s.Status.Auth.ServiceAccountName != nil { + if ps.Auth == nil { + ps.Auth = &pkgduckv1.AuthStatus{} + } + ps.Auth.ServiceAccountNames = append(ps.Auth.ServiceAccountNames, *s.Status.Auth.ServiceAccountName) + } } if allReady { pCondSet.Manage(ps).MarkTrue(ParallelConditionSubscriptionsReady) From 32f59d4f0f15f04ef9f3353159d7e12fc4d98f91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 7 May 2024 14:53:43 +0200 Subject: [PATCH 6/8] Add e2e test for Parallel --- test/rekt/features/parallel/oidc_feature.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/test/rekt/features/parallel/oidc_feature.go b/test/rekt/features/parallel/oidc_feature.go index fec05b7e570..c60ab3a8a6a 100644 --- a/test/rekt/features/parallel/oidc_feature.go +++ b/test/rekt/features/parallel/oidc_feature.go @@ -144,7 +144,19 @@ func ParallelWithTwoBranchesOIDC(channelTemplate channel_template.ChannelTemplat Must("deliver event from subscriber 2 to reply", assert.OnStore(sink). MatchEvent(test.HasId(event.ID()), test.HasData([]byte("appended data 2"))). AtLeast(1), - ) + ). + Must("use parallels identity for OIDC to subscriber1", assert.OnStore(subscriber1).MatchWithContext( + assert.MatchKind(eventshub.EventReceived).WithContext(), + assert.MatchOIDCUserFromResource(parallel.GVR(), parallelName)).AtLeast(1)). + Must("use parallels identity for OIDC to subscriber2", assert.OnStore(subscriber2).MatchWithContext( + assert.MatchKind(eventshub.EventReceived).WithContext(), + assert.MatchOIDCUserFromResource(parallel.GVR(), parallelName)).AtLeast(1)). + Must("use parallels identity for OIDC to filter1", assert.OnStore(filter1).MatchWithContext( + assert.MatchKind(eventshub.EventReceived).WithContext(), + assert.MatchOIDCUserFromResource(parallel.GVR(), parallelName)).AtLeast(1)). + Must("use parallels identity for OIDC to sink", assert.OnStore(sink).MatchWithContext( + assert.MatchKind(eventshub.EventReceived).WithContext(), + assert.MatchOIDCUserFromResource(parallel.GVR(), parallelName)).AtLeast(1)) return f } From b8c77563627fa5b4d0abb9d678b777a679112d51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 7 May 2024 14:53:50 +0200 Subject: [PATCH 7/8] Add e2e test for Sequence --- test/rekt/features/sequence/oidc_feature.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/test/rekt/features/sequence/oidc_feature.go b/test/rekt/features/sequence/oidc_feature.go index 9f17bfbf225..da3c5bd3826 100644 --- a/test/rekt/features/sequence/oidc_feature.go +++ b/test/rekt/features/sequence/oidc_feature.go @@ -129,7 +129,10 @@ func SequenceSendsEventWithOIDCTokenToSteps() *feature.Feature { Must("Delivers events correctly to steps", assert.OnStore(step2Name).MatchEvent( test.HasData([]byte(expectedMsg)), - ).AtLeast(1)) + ).AtLeast(1)). + Must("use sequences identity for OIDC", assert.OnStore(step2Name).MatchWithContext( + assert.MatchKind(eventshub.EventReceived).WithContext(), + assert.MatchOIDCUserFromResource(sequence.GVR(), sequenceName)).Exact(1)) return f } @@ -209,7 +212,10 @@ func SequenceSendsEventWithOIDCTokenToReply() *feature.Feature { Must("Delivers events correctly to reply", assert.OnStore(replySinkName).MatchEvent( test.HasData([]byte(expectedMsg)), - ).AtLeast(1)) + ).AtLeast(1)). + Must("use sequences identity for OIDC", assert.OnStore(replySinkName).MatchWithContext( + assert.MatchKind(eventshub.EventReceived).WithContext(), + assert.MatchOIDCUserFromResource(sequence.GVR(), sequenceName)).Exact(1)) return f } From c7953fcf3d0b750c193f2466913e2184cb1d6acb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20St=C3=A4bler?= Date: Tue, 14 May 2024 08:59:52 +0200 Subject: [PATCH 8/8] Add unit tests --- pkg/apis/flows/v1/parallel_lifecycle_test.go | 108 +++++++++++++++++++ pkg/apis/flows/v1/sequence_lifecycle_test.go | 81 ++++++++++++++ 2 files changed, 189 insertions(+) diff --git a/pkg/apis/flows/v1/parallel_lifecycle_test.go b/pkg/apis/flows/v1/parallel_lifecycle_test.go index d87f3026be4..f90248933e0 100644 --- a/pkg/apis/flows/v1/parallel_lifecycle_test.go +++ b/pkg/apis/flows/v1/parallel_lifecycle_test.go @@ -19,6 +19,8 @@ package v1 import ( "testing" + "knative.dev/pkg/ptr" + "knative.dev/pkg/apis" "github.com/google/go-cmp/cmp" @@ -238,6 +240,112 @@ func TestParallelPropagateSubscriptionStatuses(t *testing.T) { } } +func TestParallelPropagateSubscriptionOIDCServiceAccounts(t *testing.T) { + tests := []struct { + name string + filterSubs []*messagingv1.Subscription + subs []*messagingv1.Subscription + wantOIDCSAs []string + }{{ + name: "empty", + filterSubs: []*messagingv1.Subscription{}, + subs: []*messagingv1.Subscription{}, + }, { + name: "both subscriptions with OIDC SAs", + filterSubs: []*messagingv1.Subscription{{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "messaging.knative.dev/v1", + Kind: "Subscription", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "sub", + Namespace: "testns", + }, + Status: messagingv1.SubscriptionStatus{ + Auth: &duckv1.AuthStatus{ + ServiceAccountName: ptr.String("filterSub-oidc-sa"), + }, + }, + }}, subs: []*messagingv1.Subscription{{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "messaging.knative.dev/v1", + Kind: "Subscription", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "sub", + Namespace: "testns", + }, + Status: messagingv1.SubscriptionStatus{ + Auth: &duckv1.AuthStatus{ + ServiceAccountName: ptr.String("sub-oidc-sa"), + }, + }, + }}, + wantOIDCSAs: []string{ + "filterSub-oidc-sa", + "sub-oidc-sa", + }, + }, { + name: "filter subscription without OIDC SA", + filterSubs: []*messagingv1.Subscription{getSubscription("fsub0", false)}, + subs: []*messagingv1.Subscription{{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "messaging.knative.dev/v1", + Kind: "Subscription", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "sub", + Namespace: "testns", + }, + Status: messagingv1.SubscriptionStatus{ + Auth: &duckv1.AuthStatus{ + ServiceAccountName: ptr.String("sub-oidc-sa"), + }, + }, + }}, + wantOIDCSAs: []string{ + "sub-oidc-sa", + }, + }, { + name: "subscriber subscription without OIDC SA", + filterSubs: []*messagingv1.Subscription{{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "messaging.knative.dev/v1", + Kind: "Subscription", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "sub", + Namespace: "testns", + }, + Status: messagingv1.SubscriptionStatus{ + Auth: &duckv1.AuthStatus{ + ServiceAccountName: ptr.String("filterSub-oidc-sa"), + }, + }, + }}, + subs: []*messagingv1.Subscription{getSubscription("sub0", false)}, + wantOIDCSAs: []string{ + "filterSub-oidc-sa", + }, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ps := ParallelStatus{} + ps.PropagateSubscriptionStatuses(test.filterSubs, test.subs) + + var got []string + if ps.Auth != nil { + got = ps.Auth.ServiceAccountNames + } + + if diff := cmp.Diff(test.wantOIDCSAs, got); diff != "" { + t.Errorf("unexpected OIDC service accounts (-want, +got) = %v", diff) + } + }) + } +} + func TestParallelPropagateChannelStatuses(t *testing.T) { tests := []struct { name string diff --git a/pkg/apis/flows/v1/sequence_lifecycle_test.go b/pkg/apis/flows/v1/sequence_lifecycle_test.go index 24c822147bb..7b5bc7a0514 100644 --- a/pkg/apis/flows/v1/sequence_lifecycle_test.go +++ b/pkg/apis/flows/v1/sequence_lifecycle_test.go @@ -20,6 +20,8 @@ import ( "fmt" "testing" + "knative.dev/pkg/ptr" + "github.com/google/go-cmp/cmp" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -273,6 +275,85 @@ func TestSequencePropagateSubscriptionStatuses(t *testing.T) { } } +func TestSequencePropagateSubscriptionOIDCSA(t *testing.T) { + tests := []struct { + name string + subs []*messagingv1.Subscription + wantOIDCSAs []string + }{{ + name: "empty", + subs: []*messagingv1.Subscription{}, + }, { + name: "one subscription", + subs: []*messagingv1.Subscription{{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "messaging.knative.dev/v1", + Kind: "Subscription", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "sub", + Namespace: "testns", + }, + Status: messagingv1.SubscriptionStatus{ + Auth: &duckv1.AuthStatus{ + ServiceAccountName: ptr.String("sub-oidc-sa"), + }, + }, + }}, + wantOIDCSAs: []string{"sub-oidc-sa"}, + }, { + name: "multiple subscriptions", + subs: []*messagingv1.Subscription{{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "messaging.knative.dev/v1", + Kind: "Subscription", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "sub1", + Namespace: "testns", + }, + Status: messagingv1.SubscriptionStatus{ + Auth: &duckv1.AuthStatus{ + ServiceAccountName: ptr.String("sub1-oidc-sa"), + }, + }, + }, { + TypeMeta: metav1.TypeMeta{ + APIVersion: "messaging.knative.dev/v1", + Kind: "Subscription", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: "sub2", + Namespace: "testns", + }, + Status: messagingv1.SubscriptionStatus{ + Auth: &duckv1.AuthStatus{ + ServiceAccountName: ptr.String("sub2-oidc-sa"), + }, + }, + }, + getSubscription("sub3", true), + }, + wantOIDCSAs: []string{"sub1-oidc-sa", "sub2-oidc-sa"}, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + ps := SequenceStatus{} + ps.PropagateSubscriptionStatuses(test.subs) + + var got []string + if ps.Auth != nil { + got = ps.Auth.ServiceAccountNames + } + + if diff := cmp.Diff(test.wantOIDCSAs, got); diff != "" { + t.Errorf("unexpected OIDC service accounts (-want, +got) = %v", diff) + } + }) + } +} + func TestSequencePropagateChannelStatuses(t *testing.T) { tests := []struct { name string