Skip to content

Commit

Permalink
Support auto generation of Subscriptions identity service account and… (
Browse files Browse the repository at this point in the history
#7338)

* Support auto generation of Subscriptions identity service account and expose in AuthStatus

Signed-off-by: pingjiang <xiangpingjiang1998@gmail.com>

* fix unit test

Signed-off-by: pingjiang <xiangpingjiang1998@gmail.com>

* fix unit test

Signed-off-by: pingjiang <xiangpingjiang1998@gmail.com>

* fix unit test

Signed-off-by: pingjiang <xiangpingjiang1998@gmail.com>

* changes after rewiews

Signed-off-by: pingjiang <xiangpingjiang1998@gmail.com>

* add globalResync

Signed-off-by: pingjiang <xiangpingjiang1998@gmail.com>

* Update pkg/reconciler/subscription/controller.go

fix the linting

Co-authored-by: Christoph Stäbler <cstabler@redhat.com>

---------

Signed-off-by: pingjiang <xiangpingjiang1998@gmail.com>
Co-authored-by: Christoph Stäbler <cstabler@redhat.com>
  • Loading branch information
xiangpingjiang and creydr authored Oct 16, 2023
1 parent 0af3ef6 commit 62ec33b
Show file tree
Hide file tree
Showing 9 changed files with 343 additions and 54 deletions.
1 change: 1 addition & 0 deletions pkg/apis/eventing/v1/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func (testHelper) ReadySubscriptionStatus() *messagingv1.SubscriptionStatus {
ss.MarkChannelReady()
ss.MarkReferencesResolved()
ss.MarkAddedToChannel()
ss.MarkOIDCIdentityCreatedSucceeded()
return ss
}

Expand Down
1 change: 1 addition & 0 deletions pkg/apis/flows/v1/sequence_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ func getSubscription(name string, ready bool) *messagingv1.Subscription {
s.Status.MarkChannelReady()
s.Status.MarkReferencesResolved()
s.Status.MarkAddedToChannel()
s.Status.MarkOIDCIdentityCreatedSucceeded()
} else {
s.Status.MarkChannelFailed("testInducedFailure", "Test Induced failure")
s.Status.MarkReferencesNotResolved("testInducedFailure", "Test Induced failure")
Expand Down
20 changes: 19 additions & 1 deletion pkg/apis/messaging/v1/subscription_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

// SubCondSet is a condition set with Ready as the happy condition and
// ReferencesResolved and ChannelReady as the dependent conditions.
var SubCondSet = apis.NewLivingConditionSet(SubscriptionConditionReferencesResolved, SubscriptionConditionAddedToChannel, SubscriptionConditionChannelReady)
var SubCondSet = apis.NewLivingConditionSet(SubscriptionConditionReferencesResolved, SubscriptionConditionAddedToChannel, SubscriptionConditionChannelReady, SubscriptionConditionOIDCIdentityCreated)

const (
// SubscriptionConditionReady has status True when all subconditions below have been set to True.
Expand All @@ -37,6 +37,8 @@ const (

// SubscriptionConditionChannelReady has status True when the channel has marked the subscriber as 'ready'
SubscriptionConditionChannelReady apis.ConditionType = "ChannelReady"

SubscriptionConditionOIDCIdentityCreated apis.ConditionType = "OIDCIdentityCreated"
)

// GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface.
Expand Down Expand Up @@ -113,3 +115,19 @@ func (ss *SubscriptionStatus) MarkChannelUnknown(reason, messageFormat string, m
func (ss *SubscriptionStatus) MarkNotAddedToChannel(reason, messageFormat string, messageA ...interface{}) {
SubCondSet.Manage(ss).MarkFalse(SubscriptionConditionAddedToChannel, reason, messageFormat, messageA...)
}

func (ss *SubscriptionStatus) MarkOIDCIdentityCreatedSucceeded() {
SubCondSet.Manage(ss).MarkTrue(SubscriptionConditionOIDCIdentityCreated)
}

func (ss *SubscriptionStatus) MarkOIDCIdentityCreatedSucceededWithReason(reason, messageFormat string, messageA ...interface{}) {
SubCondSet.Manage(ss).MarkTrueWithReason(SubscriptionConditionOIDCIdentityCreated, reason, messageFormat, messageA...)
}

func (ss *SubscriptionStatus) MarkOIDCIdentityCreatedFailed(reason, messageFormat string, messageA ...interface{}) {
SubCondSet.Manage(ss).MarkFalse(SubscriptionConditionOIDCIdentityCreated, reason, messageFormat, messageA...)
}

func (ss *SubscriptionStatus) MarkOIDCIdentityCreatedUnknown(reason, messageFormat string, messageA ...interface{}) {
SubCondSet.Manage(ss).MarkUnknown(SubscriptionConditionOIDCIdentityCreated, reason, messageFormat, messageA...)
}
96 changes: 62 additions & 34 deletions pkg/apis/messaging/v1/subscription_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ func TestSubscriptionInitializeConditions(t *testing.T) {
}, {
Type: SubscriptionConditionChannelReady,
Status: corev1.ConditionUnknown,
}, {
Type: SubscriptionConditionOIDCIdentityCreated,
Status: corev1.ConditionUnknown,
}, {
Type: SubscriptionConditionReady,
Status: corev1.ConditionUnknown,
Expand Down Expand Up @@ -157,6 +160,9 @@ func TestSubscriptionInitializeConditions(t *testing.T) {
}, {
Type: SubscriptionConditionChannelReady,
Status: corev1.ConditionFalse,
}, {
Type: SubscriptionConditionOIDCIdentityCreated,
Status: corev1.ConditionUnknown,
}, {
Type: SubscriptionConditionReady,
Status: corev1.ConditionUnknown,
Expand Down Expand Up @@ -184,6 +190,9 @@ func TestSubscriptionInitializeConditions(t *testing.T) {
}, {
Type: SubscriptionConditionChannelReady,
Status: corev1.ConditionUnknown,
}, {
Type: SubscriptionConditionOIDCIdentityCreated,
Status: corev1.ConditionUnknown,
}, {
Type: SubscriptionConditionReady,
Status: corev1.ConditionUnknown,
Expand All @@ -207,46 +216,60 @@ func TestSubscriptionInitializeConditions(t *testing.T) {

func TestSubscriptionIsReady(t *testing.T) {
tests := []struct {
name string
markResolved bool
markChannelReady bool
wantReady bool
markAddedToChannel bool
name string
markResolved bool
markChannelReady bool
wantReady bool
markAddedToChannel bool
markOIDCServiceAccountCreated bool
}{{
name: "all happy",
markResolved: true,
markChannelReady: true,
markAddedToChannel: true,
wantReady: true,
name: "all happy",
markResolved: true,
markChannelReady: true,
markAddedToChannel: true,
wantReady: true,
markOIDCServiceAccountCreated: true,
}, {
name: "one sad - markResolved",
markResolved: false,
markChannelReady: true,
markAddedToChannel: true,
wantReady: false,
name: "one sad - markResolved",
markResolved: false,
markChannelReady: true,
markAddedToChannel: true,
wantReady: false,
markOIDCServiceAccountCreated: true,
}, {
name: "one sad - markChannelReady",
markResolved: true,
markChannelReady: false,
markAddedToChannel: true,
wantReady: false,
name: "one sad - markChannelReady",
markResolved: true,
markChannelReady: false,
markAddedToChannel: true,
wantReady: false,
markOIDCServiceAccountCreated: true,
}, {
name: "one sad - markAddedToChannel",
markResolved: true,
markChannelReady: true,
markAddedToChannel: false,
wantReady: false,
name: "one sad - markAddedToChannel",
markResolved: true,
markChannelReady: true,
markAddedToChannel: false,
wantReady: false,
markOIDCServiceAccountCreated: true,
}, {
name: "other sad",
markResolved: true,
markChannelReady: false,
wantReady: false,
name: "other sad",
markResolved: true,
markChannelReady: false,
wantReady: false,
markOIDCServiceAccountCreated: false,
}, {
name: "all sad",
markResolved: false,
markChannelReady: false,
markAddedToChannel: false,
wantReady: false,
name: "all sad",
markResolved: false,
markChannelReady: false,
markAddedToChannel: false,
wantReady: false,
markOIDCServiceAccountCreated: false,
}, {
name: "one sad - markOIDCServiceAccountCreated",
markResolved: true,
markChannelReady: true,
markAddedToChannel: true,
wantReady: false,
markOIDCServiceAccountCreated: false,
}}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
Expand All @@ -266,6 +289,11 @@ func TestSubscriptionIsReady(t *testing.T) {
t.Errorf("Channel added, but not reflected in IsAddedToChannel")
}
}
if test.markOIDCServiceAccountCreated {
ss.MarkOIDCIdentityCreatedSucceeded()
} else {
ss.MarkOIDCIdentityCreatedFailed("Unable to ...", "")
}
got := ss.IsReady()
if test.wantReady != got {
t.Errorf("unexpected readiness: want %v, got %v", test.wantReady, got)
Expand Down
31 changes: 26 additions & 5 deletions pkg/reconciler/subscription/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package subscription
import (
"context"

"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/apis/feature"
"knative.dev/pkg/client/injection/apiextensions/informers/apiextensions/v1/customresourcedefinition"
"knative.dev/pkg/configmap"
Expand All @@ -33,6 +34,8 @@ import (
"knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription"
subscriptionreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/subscription"
"knative.dev/eventing/pkg/duck"
kubeclient "knative.dev/pkg/client/injection/kube/client"
serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount"
"knative.dev/pkg/injection/clients/dynamicclient"
)

Expand All @@ -45,22 +48,35 @@ func NewController(

subscriptionInformer := subscription.Get(ctx)
channelInformer := channel.Get(ctx)
serviceaccountInformer := serviceaccountinformer.Get(ctx)

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"))
var globalResync func(obj interface{})

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
if globalResync != nil {
globalResync(nil)
}
})
featureStore.WatchConfigs(cmw)

r := &Reconciler{
dynamicClientSet: dynamicclient.Get(ctx),
kreferenceResolver: kref.NewKReferenceResolver(customresourcedefinition.Get(ctx).Lister()),
subscriptionLister: subscriptionInformer.Lister(),
channelLister: channelInformer.Lister(),
dynamicClientSet: dynamicclient.Get(ctx),
kubeclient: kubeclient.Get(ctx),
kreferenceResolver: kref.NewKReferenceResolver(customresourcedefinition.Get(ctx).Lister()),
subscriptionLister: subscriptionInformer.Lister(),
channelLister: channelInformer.Lister(),
serviceAccountLister: serviceaccountInformer.Lister(),
}
impl := subscriptionreconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options {
return controller.Options{
ConfigStore: featureStore,
}
})

globalResync = func(_ interface{}) {
impl.GlobalResync(subscriptionInformer.Informer())
}

subscriptionInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))

// Trackers used to notify us when the resources Subscription depends on change, so that the
Expand All @@ -80,5 +96,10 @@ func NewController(
),
))

// Reconciler Subscription when the OIDC service account changes
serviceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterController(&messagingv1.Subscription{}),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})
return impl
}
1 change: 1 addition & 0 deletions pkg/reconciler/subscription/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake"
_ "knative.dev/pkg/client/injection/apiextensions/informers/apiextensions/v1/customresourcedefinition/fake"
_ "knative.dev/pkg/client/injection/ducks/duck/v1/addressable/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/fake"
)

func TestNew(t *testing.T) {
Expand Down
34 changes: 29 additions & 5 deletions pkg/reconciler/subscription/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"

"knative.dev/pkg/apis"

Expand All @@ -39,9 +40,11 @@ import (
"knative.dev/pkg/resolver"
"knative.dev/pkg/tracker"

corev1listers "k8s.io/client-go/listers/core/v1"
eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/apis/feature"
v1 "knative.dev/eventing/pkg/apis/messaging/v1"
"knative.dev/eventing/pkg/auth"
subscriptionreconciler "knative.dev/eventing/pkg/client/injection/reconciler/messaging/v1/subscription"
listers "knative.dev/eventing/pkg/client/listers/messaging/v1"
eventingduck "knative.dev/eventing/pkg/duck"
Expand Down Expand Up @@ -69,12 +72,15 @@ type Reconciler struct {
// crdLister is used to resolve the ref version
kreferenceResolver *kref.KReferenceResolver

kubeclient kubernetes.Interface

// listers index properties about resources
subscriptionLister listers.SubscriptionLister
channelLister listers.ChannelLister
channelableTracker eventingduck.ListableTracker
destinationResolver *resolver.URIResolver
tracker tracker.Interface
subscriptionLister listers.SubscriptionLister
channelLister listers.ChannelLister
channelableTracker eventingduck.ListableTracker
destinationResolver *resolver.URIResolver
tracker tracker.Interface
serviceAccountLister corev1listers.ServiceAccountLister
}

// Check that our Reconciler implements Interface
Expand All @@ -85,6 +91,24 @@ var _ subscriptionreconciler.Finalizer = (*Reconciler)(nil)

// ReconcileKind implements Interface.ReconcileKind.
func (r *Reconciler) ReconcileKind(ctx context.Context, subscription *v1.Subscription) pkgreconciler.Event {
// OIDC authentication
featureFlags := feature.FromContext(ctx)
if featureFlags.IsOIDCAuthentication() {
saName := auth.GetOIDCServiceAccountNameForResource(v1.SchemeGroupVersion.WithKind("Subscription"), subscription.ObjectMeta)
subscription.Status.Auth = &duckv1.AuthStatus{
ServiceAccountName: &saName,
}

if err := auth.EnsureOIDCServiceAccountExistsForResource(ctx, r.serviceAccountLister, r.kubeclient, v1.SchemeGroupVersion.WithKind("Subscription"), subscription.ObjectMeta); err != nil {
subscription.Status.MarkOIDCIdentityCreatedFailed("Unable to resolve service account for OIDC authentication", "%v", err)
return err
}
subscription.Status.MarkOIDCIdentityCreatedSucceeded()
} else {
subscription.Status.Auth = nil
subscription.Status.MarkOIDCIdentityCreatedSucceededWithReason(fmt.Sprintf("%s feature disabled", feature.OIDCAuthentication), "")
}

// Find the channel for this subscription.
channel, err := r.getChannel(ctx, subscription)
if err != nil {
Expand Down
Loading

0 comments on commit 62ec33b

Please sign in to comment.