Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add serviceaccount in parallel #7373

Merged
21 changes: 19 additions & 2 deletions pkg/apis/flows/v1/parallel_lifecycle.go
creydr marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
pkgduckv1 "knative.dev/pkg/apis/duck/v1"
)

var pCondSet = apis.NewLivingConditionSet(ParallelConditionReady, ParallelConditionChannelsReady, ParallelConditionSubscriptionsReady, ParallelConditionAddressable)
var pCondSet = apis.NewLivingConditionSet(ParallelConditionReady, ParallelConditionChannelsReady, ParallelConditionSubscriptionsReady, ParallelConditionAddressable, ParallelConditionOIDCIdentityCreated)

const (
// ParallelConditionReady has status True when all subconditions below have been set to True.
Expand All @@ -41,7 +41,8 @@ const (

// ParallelConditionAddressable has status true when this Parallel meets
// the Addressable contract and has a non-empty hostname.
ParallelConditionAddressable apis.ConditionType = "Addressable"
ParallelConditionAddressable apis.ConditionType = "Addressable"
ParallelConditionOIDCIdentityCreated apis.ConditionType = "OIDCIdentityCreated"
)

// GetConditionSet retrieves the condition set for this resource. Implements the KRShaped interface.
Expand Down Expand Up @@ -195,6 +196,22 @@ func (ps *ParallelStatus) MarkAddressableNotReady(reason, messageFormat string,
pCondSet.Manage(ps).MarkFalse(ParallelConditionAddressable, reason, messageFormat, messageA...)
}

func (ps *ParallelStatus) MarkOIDCIdentityCreatedSucceeded() {
pCondSet.Manage(ps).MarkTrue(ParallelConditionOIDCIdentityCreated)
}

func (ps *ParallelStatus) MarkOIDCIdentityCreatedSucceededWithReason(reason, messageFormat string, messageA ...interface{}) {
pCondSet.Manage(ps).MarkTrueWithReason(ParallelConditionOIDCIdentityCreated, reason, messageFormat, messageA...)
}

func (ps *ParallelStatus) MarkOIDCIdentityCreatedFailed(reason, messageFormat string, messageA ...interface{}) {
pCondSet.Manage(ps).MarkFalse(ParallelConditionOIDCIdentityCreated, reason, messageFormat, messageA...)
}

func (ps *ParallelStatus) MarkOIDCIdentityCreatedUnknown(reason, messageFormat string, messageA ...interface{}) {
pCondSet.Manage(ps).MarkUnknown(ParallelConditionOIDCIdentityCreated, reason, messageFormat, messageA...)
}

func (ps *ParallelStatus) setAddress(address *pkgduckv1.Addressable) {
ps.Address = address
if address == nil {
Expand Down
173 changes: 107 additions & 66 deletions pkg/apis/flows/v1/parallel_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ func TestParallelInitializeConditions(t *testing.T) {
}, {
Type: ParallelConditionChannelsReady,
Status: corev1.ConditionUnknown,
}, {
Type: ParallelConditionOIDCIdentityCreated,
Status: corev1.ConditionUnknown,
}, {
Type: ParallelConditionReady,
Status: corev1.ConditionUnknown,
Expand Down Expand Up @@ -114,6 +117,9 @@ func TestParallelInitializeConditions(t *testing.T) {
}, {
Type: ParallelConditionChannelsReady,
Status: corev1.ConditionFalse,
}, {
Type: ParallelConditionOIDCIdentityCreated,
Status: corev1.ConditionUnknown,
}, {
Type: ParallelConditionReady,
Status: corev1.ConditionUnknown,
Expand Down Expand Up @@ -141,6 +147,9 @@ func TestParallelInitializeConditions(t *testing.T) {
}, {
Type: ParallelConditionChannelsReady,
Status: corev1.ConditionUnknown,
}, {
Type: ParallelConditionOIDCIdentityCreated,
Status: corev1.ConditionUnknown,
}, {
Type: ParallelConditionReady,
Status: corev1.ConditionUnknown,
Expand Down Expand Up @@ -326,89 +335,121 @@ func TestParallelPropagateSubscriptionStatusUpdated(t *testing.T) {

func TestParallelReady(t *testing.T) {
tests := []struct {
name string
fsubs []*messagingv1.Subscription
subs []*messagingv1.Subscription
ichannel *eventingduckv1.Channelable
channels []*eventingduckv1.Channelable
want bool
name string
fsubs []*messagingv1.Subscription
subs []*messagingv1.Subscription
ichannel *eventingduckv1.Channelable
channels []*eventingduckv1.Channelable
markOIDCServiceAccountCreated bool
want bool
}{{
name: "ingress false, empty",
fsubs: []*messagingv1.Subscription{},
subs: []*messagingv1.Subscription{},
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{},
want: false,
name: "ingress false, empty, serviceAccount ready",
fsubs: []*messagingv1.Subscription{},
subs: []*messagingv1.Subscription{},
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{},
markOIDCServiceAccountCreated: true,
want: false,
}, {
name: "ingress true, empty",
fsubs: []*messagingv1.Subscription{},
subs: []*messagingv1.Subscription{},
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{},
want: false,
name: "ingress true, empty, serviceAccount ready",
fsubs: []*messagingv1.Subscription{},
subs: []*messagingv1.Subscription{},
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{},
markOIDCServiceAccountCreated: true,
want: false,
}, {
name: "ingress true, one channelable not ready, one subscription ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(false)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
want: false,
name: "ingress true, one channelable not ready, one subscription ready, serviceAccount ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(false)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
markOIDCServiceAccountCreated: true,
want: false,
}, {
name: "ingress true, one channelable ready, one subscription not ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", false)},
subs: []*messagingv1.Subscription{getSubscription("sub0", false)},
want: false,
name: "ingress true, one channelable ready, one subscription not ready, serviceAccount ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", false)},
subs: []*messagingv1.Subscription{getSubscription("sub0", false)},
markOIDCServiceAccountCreated: true,
want: false,
}, {
name: "ingress false, one channelable ready, one subscription ready",
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
want: false,
name: "ingress false, one channelable ready, one subscription ready,serviceAccount ready",
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
markOIDCServiceAccountCreated: true,
want: false,
}, {
name: "ingress true, one channelable ready, one subscription ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
want: true,
name: "ingress true, one channelable ready, one subscription ready, serviceAccount ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
markOIDCServiceAccountCreated: true,
want: true,
}, {
name: "ingress true, one channelable ready, one not, two subscriptions ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(false)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
want: false,
name: "ingress true, one channelable ready, one subscription ready, serviceAccount not ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true)},
markOIDCServiceAccountCreated: false,
want: false,
}, {
name: "ingress true, two channelables ready, one subscription ready, one not",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", false)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", false)},
want: false,
name: "ingress true, one channelable ready, one not, two subscriptions ready, serviceAccount ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(false)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
markOIDCServiceAccountCreated: true,
want: false,
}, {
name: "ingress false, two channelables ready, two subscriptions ready",
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
want: false,
name: "ingress true, two channelables ready, one subscription ready, one not, serviceAccount ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", false)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", false)},
markOIDCServiceAccountCreated: true,
want: false,
}, {
name: "ingress true, two channelables ready, two subscriptions ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
want: true,
name: "ingress false, two channelables ready, two subscriptions ready, serviceAccount ready",
ichannel: getChannelable(false),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
markOIDCServiceAccountCreated: true,
want: false,
}, {
name: "ingress true, two channelables ready, two subscriptions ready, serviceAccount not ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
markOIDCServiceAccountCreated: false,
want: false,
}, {
name: "ingress true, two channelables ready, two subscriptions ready, serviceAccount ready",
ichannel: getChannelable(true),
channels: []*eventingduckv1.Channelable{getChannelable(true), getChannelable(true)},
fsubs: []*messagingv1.Subscription{getSubscription("fsub0", true), getSubscription("fsub1", true)},
subs: []*messagingv1.Subscription{getSubscription("sub0", true), getSubscription("sub1", true)},
markOIDCServiceAccountCreated: true,
want: true,
}}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
ps := ParallelStatus{}
ps.PropagateChannelStatuses(test.ichannel, test.channels)
ps.PropagateSubscriptionStatuses(test.fsubs, test.subs)
if test.markOIDCServiceAccountCreated {
ps.MarkOIDCIdentityCreatedSucceeded()
} else {
ps.MarkOIDCIdentityCreatedFailed("Unable to create serviceaccount", "")
}
got := ps.IsReady()
want := test.want
if want != got {
Expand Down
38 changes: 33 additions & 5 deletions pkg/reconciler/parallel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,15 @@ import (
"context"

"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/apis/feature"
v1 "knative.dev/eventing/pkg/apis/flows/v1"
"knative.dev/eventing/pkg/duck"
kubeclient "knative.dev/pkg/client/injection/kube/client"
serviceaccountinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection/clients/dynamicclient"
"knative.dev/pkg/logging"

eventingclient "knative.dev/eventing/pkg/client/injection/client"
"knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable"
Expand All @@ -42,14 +46,33 @@ func NewController(

parallelInformer := parallel.Get(ctx)
subscriptionInformer := subscription.Get(ctx)
serviceaccountInformer := serviceaccountinformer.Get(ctx)

var globalResync func(obj interface{})
featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
if globalResync != nil {
creydr marked this conversation as resolved.
Show resolved Hide resolved
globalResync(nil)
}
})
featureStore.WatchConfigs(cmw)

r := &Reconciler{
parallelLister: parallelInformer.Lister(),
subscriptionLister: subscriptionInformer.Lister(),
dynamicClientSet: dynamicclient.Get(ctx),
eventingClientSet: eventingclient.Get(ctx),
parallelLister: parallelInformer.Lister(),
subscriptionLister: subscriptionInformer.Lister(),
serviceAccountLister: serviceaccountInformer.Lister(),
kubeclient: kubeclient.Get(ctx),
dynamicClientSet: dynamicclient.Get(ctx),
eventingClientSet: eventingclient.Get(ctx),
}
impl := parallelreconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options {
return controller.Options{
ConfigStore: featureStore,
}
})

globalResync = func(_ interface{}) {
impl.GlobalResync(parallelInformer.Informer())
}
impl := parallelreconciler.NewImpl(ctx, r)

r.channelableTracker = duck.NewListableTrackerFromTracker(ctx, channelable.Get, impl.Tracker)
parallelInformer.Informer().AddEventHandler(controller.HandleAll(impl.Enqueue))
Expand All @@ -60,6 +83,11 @@ func NewController(
FilterFunc: controller.FilterController(&v1.Parallel{}),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})
// Reconcile Parallel when the OIDC service account changes
serviceaccountInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterController(&v1.Parallel{}),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})

return impl
}
12 changes: 11 additions & 1 deletion pkg/reconciler/parallel/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,29 @@ package parallel
import (
"testing"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/pkg/configmap"
. "knative.dev/pkg/reconciler/testing"

// Fake injection informers
"knative.dev/eventing/pkg/apis/feature"
_ "knative.dev/eventing/pkg/client/injection/ducks/duck/v1/channelable/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/flows/v1/parallel/fake"
_ "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription/fake"
_ "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount/fake"
)

func TestNew(t *testing.T) {
ctx, _ := SetupFakeContext(t)

c := NewController(ctx, configmap.NewStaticWatcher())
c := NewController(ctx, configmap.NewStaticWatcher(
&corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: feature.FlagsConfigName,
},
},
))

if c == nil {
t.Fatal("Expected NewController to return a non-nil value")
Expand Down
Loading
Loading