diff --git a/cmd/broker/filter/main.go b/cmd/broker/filter/main.go index 5e0121dca71..94709479205 100644 --- a/cmd/broker/filter/main.go +++ b/cmd/broker/filter/main.go @@ -120,10 +120,10 @@ func main() { reporter := filter.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String())) - oidcTokenHandler := auth.NewOIDCTokenHandler(ctx) + oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) // We are running both the receiver (takes messages in from the Broker) and the dispatcher (send // the messages to the triggers' subscribers) in this binary. - handler, err := filter.NewHandler(logger, oidcTokenHandler, triggerinformer.Get(ctx), reporter, ctxFunc) + handler, err := filter.NewHandler(logger, oidcTokenProvider, triggerinformer.Get(ctx), reporter, ctxFunc) if err != nil { logger.Fatal("Error creating Handler", zap.Error(err)) } diff --git a/cmd/broker/ingress/main.go b/cmd/broker/ingress/main.go index 8cf2b2552b7..0eb838e115a 100644 --- a/cmd/broker/ingress/main.go +++ b/cmd/broker/ingress/main.go @@ -152,9 +152,9 @@ func main() { reporter := ingress.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String())) - oidcTokenHandler := auth.NewOIDCTokenHandler(ctx) + oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) - handler, err = ingress.NewHandler(logger, reporter, broker.TTLDefaulter(logger, int32(env.MaxTTL)), brokerInformer, oidcTokenHandler) + handler, err = ingress.NewHandler(logger, reporter, broker.TTLDefaulter(logger, int32(env.MaxTTL)), brokerInformer, oidcTokenProvider) if err != nil { logger.Fatal("Error creating Handler", zap.Error(err)) } diff --git a/pkg/auth/token_provider.go b/pkg/auth/token_provider.go new file mode 100644 index 00000000000..ece3b4abc51 --- /dev/null +++ b/pkg/auth/token_provider.go @@ -0,0 +1,67 @@ +/* +Copyright 2023 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package auth + +import ( + "context" + "fmt" + + "go.uber.org/zap" + authv1 "k8s.io/api/authentication/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/kubernetes" + kubeclient "knative.dev/pkg/client/injection/kube/client" + "knative.dev/pkg/logging" +) + +type OIDCTokenProvider struct { + logger *zap.SugaredLogger + kubeClient kubernetes.Interface +} + +func NewOIDCTokenProvider(ctx context.Context) *OIDCTokenProvider { + tokenProvider := &OIDCTokenProvider{ + logger: logging.FromContext(ctx).With("component", "oidc-token-provider"), + kubeClient: kubeclient.Get(ctx), + } + + return tokenProvider +} + +// GetJWT returns a JWT from the given service account for the given audience. +func (c *OIDCTokenProvider) GetJWT(serviceAccount types.NamespacedName, audience string) (string, error) { + // TODO: check the cache + + // if not found in cache: request new token + tokenRequest := authv1.TokenRequest{ + Spec: authv1.TokenRequestSpec{ + Audiences: []string{audience}, + }, + } + + tokenRequestResponse, err := c.kubeClient. + CoreV1(). + ServiceAccounts(serviceAccount.Namespace). + CreateToken(context.TODO(), serviceAccount.Name, &tokenRequest, metav1.CreateOptions{}) + + if err != nil { + return "", fmt.Errorf("could not request a token for %s: %w", serviceAccount, err) + } + + return tokenRequestResponse.Status.Token, nil +} diff --git a/pkg/auth/token_handler.go b/pkg/auth/token_verifier.go similarity index 73% rename from pkg/auth/token_handler.go rename to pkg/auth/token_verifier.go index ffdb69a3516..99a456c0b00 100644 --- a/pkg/auth/token_handler.go +++ b/pkg/auth/token_verifier.go @@ -26,13 +26,8 @@ import ( "github.com/coreos/go-oidc/v3/oidc" "go.uber.org/zap" - authv1 "k8s.io/api/authentication/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "knative.dev/eventing/pkg/apis/feature" - kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/injection" "knative.dev/pkg/logging" ) @@ -41,29 +36,61 @@ const ( kubernetesOIDCDiscoveryBaseURL = "https://kubernetes.default.svc" ) -type OIDCTokenHandler struct { +type OIDCTokenVerifier struct { logger *zap.SugaredLogger - kubeClient kubernetes.Interface restConfig *rest.Config provider *oidc.Provider } +type IDToken struct { + Issuer string + Audience []string + Subject string + Expiry time.Time + IssuedAt time.Time + AccessTokenHash string +} + // TODO: rename OIDCTokenProvider and OIDCTokenVerifier -func NewOIDCTokenHandler(ctx context.Context) *OIDCTokenHandler { - tokenHandler := &OIDCTokenHandler{ +func NewOIDCTokenVerifier(ctx context.Context) *OIDCTokenVerifier { + tokenHandler := &OIDCTokenVerifier{ logger: logging.FromContext(ctx).With("component", "oidc-token-handler"), - kubeClient: kubeclient.Get(ctx), restConfig: injection.GetConfig(ctx), } - if err := tokenHandler.initProvider(ctx); err != nil { + if err := tokenHandler.initOIDCProvider(ctx); err != nil { tokenHandler.logger.Error(fmt.Sprintf("could not initialize provider. You can ignore this message, when the %s feature is disabled", feature.OIDCAuthentication), zap.Error(err)) } return tokenHandler } -func (c *OIDCTokenHandler) initProvider(ctx context.Context) error { +// VerifyJWT verifies the given JWT for the expected audience and returns the parsed ID token. +func (c *OIDCTokenVerifier) VerifyJWT(ctx context.Context, jwt, audience string) (*IDToken, error) { + if c.provider == nil { + return nil, fmt.Errorf("provider is nil. Is the OIDC provider config correct?") + } + + verifier := c.provider.Verifier(&oidc.Config{ + ClientID: audience, + }) + + token, err := verifier.Verify(ctx, jwt) + if err != nil { + return nil, fmt.Errorf("could not verify JWT: %w", err) + } + + return &IDToken{ + Issuer: token.Issuer, + Audience: token.Audience, + Subject: token.Subject, + Expiry: token.Expiry, + IssuedAt: token.IssuedAt, + AccessTokenHash: token.AccessTokenHash, + }, nil +} + +func (c *OIDCTokenVerifier) initOIDCProvider(ctx context.Context) error { discovery, err := c.getKubernetesOIDCDiscovery() if err != nil { return fmt.Errorf("could not load Kubernetes OIDC discovery information: %w", err) @@ -91,7 +118,7 @@ func (c *OIDCTokenHandler) initProvider(ctx context.Context) error { return nil } -func (c *OIDCTokenHandler) getHTTPClientForKubeAPIServer() (*http.Client, error) { +func (c *OIDCTokenVerifier) getHTTPClientForKubeAPIServer() (*http.Client, error) { client, err := rest.HTTPClientFor(c.restConfig) if err != nil { return nil, fmt.Errorf("could not create HTTP client from rest config: %w", err) @@ -100,7 +127,7 @@ func (c *OIDCTokenHandler) getHTTPClientForKubeAPIServer() (*http.Client, error) return client, nil } -func (c *OIDCTokenHandler) getKubernetesOIDCDiscovery() (*openIDMetadata, error) { +func (c *OIDCTokenVerifier) getKubernetesOIDCDiscovery() (*openIDMetadata, error) { client, err := c.getHTTPClientForKubeAPIServer() if err != nil { return nil, fmt.Errorf("could not get HTTP client for API server: %w", err) @@ -125,63 +152,6 @@ func (c *OIDCTokenHandler) getKubernetesOIDCDiscovery() (*openIDMetadata, error) return openIdConfig, nil } -// GetJWT returns a JWT from the given service account for the given audience. -func (c *OIDCTokenHandler) GetJWT(serviceAccount types.NamespacedName, audience string) (string, error) { - // TODO: check the cache - - // if not found in cache: request new token - tokenRequest := authv1.TokenRequest{ - Spec: authv1.TokenRequestSpec{ - Audiences: []string{audience}, - }, - } - - tokenRequestResponse, err := c.kubeClient. - CoreV1(). - ServiceAccounts(serviceAccount.Namespace). - CreateToken(context.TODO(), serviceAccount.Name, &tokenRequest, metav1.CreateOptions{}) - - if err != nil { - return "", fmt.Errorf("could not request a token for %s: %w", serviceAccount, err) - } - - return tokenRequestResponse.Status.Token, nil -} - -// VerifyJWT verifies the given JWT for the expected audience and returns the parsed ID token. -func (c *OIDCTokenHandler) VerifyJWT(ctx context.Context, jwt, audience string) (*IDToken, error) { - if c.provider == nil { - return nil, fmt.Errorf("provider is nil. Is the OIDC provider config correct?") - } - - verifier := c.provider.Verifier(&oidc.Config{ - ClientID: audience, - }) - - token, err := verifier.Verify(ctx, jwt) - if err != nil { - return nil, fmt.Errorf("could not verify JWT: %w", err) - } - - return &IDToken{ - Issuer: token.Issuer, - Audience: token.Audience, - Subject: token.Subject, - Expiry: token.Expiry, - IssuedAt: token.IssuedAt, - AccessTokenHash: token.AccessTokenHash, - }, nil -} - -type IDToken struct { - Issuer string - Audience []string - Subject string - Expiry time.Time - IssuedAt time.Time - AccessTokenHash string -} - type openIDMetadata struct { Issuer string `json:"issuer"` JWKSURI string `json:"jwks_uri"` diff --git a/pkg/broker/filter/filter_handler.go b/pkg/broker/filter/filter_handler.go index fca927d757f..175a1eb95ca 100644 --- a/pkg/broker/filter/filter_handler.go +++ b/pkg/broker/filter/filter_handler.go @@ -79,7 +79,7 @@ type Handler struct { } // NewHandler creates a new Handler and its associated EventReceiver. -func NewHandler(logger *zap.Logger, oidcTokenHandler *auth.OIDCTokenHandler, triggerInformer v1.TriggerInformer, reporter StatsReporter, wc func(ctx context.Context) context.Context) (*Handler, error) { +func NewHandler(logger *zap.Logger, oidcTokenProvider *auth.OIDCTokenProvider, triggerInformer v1.TriggerInformer, reporter StatsReporter, wc func(ctx context.Context) context.Context) (*Handler, error) { kncloudevents.ConfigureConnectionArgs(&kncloudevents.ConnectionArgs{ MaxIdleConns: defaultMaxIdleConnections, MaxIdleConnsPerHost: defaultMaxIdleConnectionsPerHost, @@ -128,7 +128,7 @@ func NewHandler(logger *zap.Logger, oidcTokenHandler *auth.OIDCTokenHandler, tri return &Handler{ reporter: reporter, - eventDispatcher: kncloudevents.NewDispatcher(oidcTokenHandler), + eventDispatcher: kncloudevents.NewDispatcher(oidcTokenProvider), triggerLister: triggerInformer.Lister(), logger: logger, withContext: wc, diff --git a/pkg/broker/filter/filter_handler_test.go b/pkg/broker/filter/filter_handler_test.go index 572d1321145..d7a4c58c14b 100644 --- a/pkg/broker/filter/filter_handler_test.go +++ b/pkg/broker/filter/filter_handler_test.go @@ -430,7 +430,7 @@ func TestReceiver(t *testing.T) { defer s.Close() logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) - oidcTokenHandler := auth.NewOIDCTokenHandler(ctx) + oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) // Replace the SubscriberURI to point at our fake server. for _, trig := range tc.triggers { @@ -447,7 +447,7 @@ func TestReceiver(t *testing.T) { reporter := &mockReporter{} r, err := NewHandler( logger, - oidcTokenHandler, + oidcTokenProvider, triggerinformerfake.Get(ctx), reporter, func(ctx context.Context) context.Context { @@ -615,7 +615,7 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) { filtersMap := subscriptionsapi.NewFiltersMap() logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) - oidcTokenHandler := auth.NewOIDCTokenHandler(ctx) + oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) // Replace the SubscriberURI to point at our fake server. for _, trig := range tc.triggers { @@ -633,7 +633,7 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) { reporter := &mockReporter{} r, err := NewHandler( logger, - oidcTokenHandler, + oidcTokenProvider, triggerinformerfake.Get(ctx), reporter, func(ctx context.Context) context.Context { diff --git a/pkg/broker/ingress/ingress_handler.go b/pkg/broker/ingress/ingress_handler.go index 88a1db1a139..44e811485c9 100644 --- a/pkg/broker/ingress/ingress_handler.go +++ b/pkg/broker/ingress/ingress_handler.go @@ -66,12 +66,12 @@ type Handler struct { Logger *zap.Logger - oidcTokenHandler *auth.OIDCTokenHandler + oidcTokenProvider *auth.OIDCTokenProvider eventDispatcher *kncloudevents.Dispatcher } -func NewHandler(logger *zap.Logger, reporter StatsReporter, defaulter client.EventDefaulter, brokerInformer v1.BrokerInformer, oidcTokenHandler *auth.OIDCTokenHandler) (*Handler, error) { +func NewHandler(logger *zap.Logger, reporter StatsReporter, defaulter client.EventDefaulter, brokerInformer v1.BrokerInformer, oidcTokenProvider *auth.OIDCTokenProvider) (*Handler, error) { connectionArgs := kncloudevents.ConnectionArgs{ MaxIdleConns: defaultMaxIdleConnections, MaxIdleConnsPerHost: defaultMaxIdleConnectionsPerHost, @@ -112,12 +112,12 @@ func NewHandler(logger *zap.Logger, reporter StatsReporter, defaulter client.Eve }) return &Handler{ - Defaulter: defaulter, - Reporter: reporter, - Logger: logger, - BrokerLister: brokerInformer.Lister(), - oidcTokenHandler: oidcTokenHandler, - eventDispatcher: kncloudevents.NewDispatcher(oidcTokenHandler), + Defaulter: defaulter, + Reporter: reporter, + Logger: logger, + BrokerLister: brokerInformer.Lister(), + oidcTokenProvider: oidcTokenProvider, + eventDispatcher: kncloudevents.NewDispatcher(oidcTokenProvider), }, nil } diff --git a/pkg/broker/ingress/ingress_handler_test.go b/pkg/broker/ingress/ingress_handler_test.go index a2e6d39d4df..e9b0b614d2f 100644 --- a/pkg/broker/ingress/ingress_handler_test.go +++ b/pkg/broker/ingress/ingress_handler_test.go @@ -285,9 +285,9 @@ func TestHandler_ServeHTTP(t *testing.T) { brokerinformerfake.Get(ctx).Informer().GetStore().Add(b) } - oidcTokenHandler := auth.NewOIDCTokenHandler(ctx) + oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) - h, err := NewHandler(logger, &mockReporter{}, tc.defaulter, brokerinformerfake.Get(ctx), oidcTokenHandler) + h, err := NewHandler(logger, &mockReporter{}, tc.defaulter, brokerinformerfake.Get(ctx), oidcTokenProvider) if err != nil { t.Fatal("Unable to create receiver:", err) } diff --git a/pkg/channel/fanout/fanout_event_handler_test.go b/pkg/channel/fanout/fanout_event_handler_test.go index 1128f46c8c5..44ab95ccf82 100644 --- a/pkg/channel/fanout/fanout_event_handler_test.go +++ b/pkg/channel/fanout/fanout_event_handler_test.go @@ -371,8 +371,8 @@ func testFanoutEventHandler(t *testing.T, async bool, receiverFunc channel.Event t.Fatal(err) } - oidcTokenHandler := auth.NewOIDCTokenHandler(ctx) - dispatcher := kncloudevents.NewDispatcher(oidcTokenHandler) + oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) + dispatcher := kncloudevents.NewDispatcher(oidcTokenProvider) calledChan := make(chan bool, 1) recvOptionFunc := func(*channel.EventReceiver) error { diff --git a/pkg/channel/multichannelfanout/multi_channel_fanout_event_handler_test.go b/pkg/channel/multichannelfanout/multi_channel_fanout_event_handler_test.go index 65f11e4b0e4..455d3fc9746 100644 --- a/pkg/channel/multichannelfanout/multi_channel_fanout_event_handler_test.go +++ b/pkg/channel/multichannelfanout/multi_channel_fanout_event_handler_test.go @@ -79,9 +79,9 @@ func TestNewEventHandlerWithConfig(t *testing.T) { ctx = injection.WithConfig(ctx, &rest.Config{}) logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) - oidcTokenHandler := auth.NewOIDCTokenHandler(ctx) + oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) - dispatcher := kncloudevents.NewDispatcher(oidcTokenHandler) + dispatcher := kncloudevents.NewDispatcher(oidcTokenProvider) _, err := NewEventHandlerWithConfig( context.TODO(), logger, @@ -112,8 +112,8 @@ func TestNewEventHandler(t *testing.T) { reporter := channel.NewStatsReporter("testcontainer", "testpod") logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) - oidcTokenHandler := auth.NewOIDCTokenHandler(ctx) - dispatcher := kncloudevents.NewDispatcher(oidcTokenHandler) + oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) + dispatcher := kncloudevents.NewDispatcher(oidcTokenProvider) handler := NewEventHandler(context.TODO(), logger) h := handler.GetChannelHandler(handlerName) @@ -337,8 +337,8 @@ func TestServeHTTPEventHandler(t *testing.T) { logger := zaptest.NewLogger(t, zaptest.WrapOptions(zap.AddCaller())) reporter := channel.NewStatsReporter("testcontainer", "testpod") - oidcTokenHandler := auth.NewOIDCTokenHandler(ctx) - dispatcher := kncloudevents.NewDispatcher(oidcTokenHandler) + oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) + dispatcher := kncloudevents.NewDispatcher(oidcTokenProvider) h, err := NewEventHandlerWithConfig(context.TODO(), logger, tc.config, reporter, dispatcher, tc.recvOptions...) if err != nil { t.Fatalf("Unexpected NewHandler error: '%v'", err) diff --git a/pkg/inmemorychannel/event_dispatcher_test.go b/pkg/inmemorychannel/event_dispatcher_test.go index 63d95497d12..904933a4f8a 100644 --- a/pkg/inmemorychannel/event_dispatcher_test.go +++ b/pkg/inmemorychannel/event_dispatcher_test.go @@ -113,8 +113,8 @@ func TestDispatcher_dispatch(t *testing.T) { ctx = injection.WithConfig(ctx, &rest.Config{}) logger, err := zap.NewDevelopment(zap.AddStacktrace(zap.WarnLevel)) - oidcTokenHandler := auth.NewOIDCTokenHandler(ctx) - dispatcher := kncloudevents.NewDispatcher(oidcTokenHandler) + oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) + dispatcher := kncloudevents.NewDispatcher(oidcTokenProvider) reporter := channel.NewStatsReporter("testcontainer", "testpod") if err != nil { t.Fatal(err) @@ -269,7 +269,7 @@ func TestDispatcher_dispatch(t *testing.T) { inMemoryDispatcher.WaitReady() // Ok now everything should be ready to send the event - d := kncloudevents.NewDispatcher(oidcTokenHandler) + d := kncloudevents.NewDispatcher(oidcTokenProvider) dispatchInfo, err := d.SendEvent(context.TODO(), test.FullEvent(), *mustParseUrlToAddressable(t, channelAProxy.URL)) if err != nil { t.Fatal(err) diff --git a/pkg/kncloudevents/event_dispatcher.go b/pkg/kncloudevents/event_dispatcher.go index 2e819323173..442b6125215 100644 --- a/pkg/kncloudevents/event_dispatcher.go +++ b/pkg/kncloudevents/event_dispatcher.go @@ -124,12 +124,12 @@ type senderConfig struct { } type Dispatcher struct { - oidcTokenHandler *auth.OIDCTokenHandler + oidcTokenProvider *auth.OIDCTokenProvider } -func NewDispatcher(oidcTokenHandler *auth.OIDCTokenHandler) *Dispatcher { +func NewDispatcher(oidcTokenProvider *auth.OIDCTokenProvider) *Dispatcher { return &Dispatcher{ - oidcTokenHandler: oidcTokenHandler, + oidcTokenProvider: oidcTokenProvider, } } @@ -345,7 +345,7 @@ func (d *Dispatcher) createRequest(ctx context.Context, message binding.Message, if oidcServiceAccount != nil { if target.Audience != nil && *target.Audience != "" { - jwt, err := d.oidcTokenHandler.GetJWT(*oidcServiceAccount, *target.Audience) + jwt, err := d.oidcTokenProvider.GetJWT(*oidcServiceAccount, *target.Audience) if err != nil { return nil, fmt.Errorf("could not get JWT: %w", err) } diff --git a/pkg/kncloudevents/event_dispatcher_test.go b/pkg/kncloudevents/event_dispatcher_test.go index 76a610964d6..a988e7aa358 100644 --- a/pkg/kncloudevents/event_dispatcher_test.go +++ b/pkg/kncloudevents/event_dispatcher_test.go @@ -788,8 +788,8 @@ func TestSendEvent(t *testing.T) { ctx, _ = fakekubeclient.With(ctx) ctx = injection.WithConfig(ctx, &rest.Config{}) - oidcTokenHandler := auth.NewOIDCTokenHandler(ctx) - dispatcher := kncloudevents.NewDispatcher(oidcTokenHandler) + oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) + dispatcher := kncloudevents.NewDispatcher(oidcTokenProvider) destHandler := &fakeHandler{ t: t, response: tc.fakeResponse, @@ -934,8 +934,8 @@ func TestDispatchMessageToTLSEndpoint(t *testing.T) { // give the servers a bit time to fully shutdown to prevent port clashes time.Sleep(500 * time.Millisecond) }() - oidcTokenHandler := auth.NewOIDCTokenHandler(ctx) - dispatcher := kncloudevents.NewDispatcher(oidcTokenHandler) + oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) + dispatcher := kncloudevents.NewDispatcher(oidcTokenProvider) eventToSend := test.FullEvent() // destination @@ -982,8 +982,8 @@ func TestDispatchMessageToTLSEndpointWithReply(t *testing.T) { // give the servers a bit time to fully shutdown to prevent port clashes time.Sleep(500 * time.Millisecond) }() - oidcTokenHandler := auth.NewOIDCTokenHandler(ctx) - dispatcher := kncloudevents.NewDispatcher(oidcTokenHandler) + oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) + dispatcher := kncloudevents.NewDispatcher(oidcTokenProvider) eventToSend := test.FullEvent() eventToReply := test.FullEvent() @@ -1047,8 +1047,8 @@ func TestDispatchMessageToTLSEndpointWithDeadLetterSink(t *testing.T) { // give the servers a bit time to fully shutdown to prevent port clashes time.Sleep(500 * time.Millisecond) }() - oidcTokenHandler := auth.NewOIDCTokenHandler(ctx) - dispatcher := kncloudevents.NewDispatcher(oidcTokenHandler) + oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) + dispatcher := kncloudevents.NewDispatcher(oidcTokenProvider) eventToSend := test.FullEvent() // destination diff --git a/pkg/reconciler/broker/broker.go b/pkg/reconciler/broker/broker.go index 61e0e4705de..4a88ee648bd 100644 --- a/pkg/reconciler/broker/broker.go +++ b/pkg/reconciler/broker/broker.go @@ -76,8 +76,6 @@ type Reconciler struct { // If specified, only reconcile brokers with these labels brokerClass string - - oidcTokenHandler *auth.OIDCTokenHandler } // Check that our Reconciler implements Interface diff --git a/pkg/reconciler/inmemorychannel/dispatcher/controller.go b/pkg/reconciler/inmemorychannel/dispatcher/controller.go index 8954e112d1e..d344455a17a 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/controller.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/controller.go @@ -119,7 +119,7 @@ func NewController( chMsgHandler: sh, } - oidcTokenHandler := auth.NewOIDCTokenHandler(ctx) + oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) r := &Reconciler{ multiChannelEventHandler: sh, @@ -127,7 +127,7 @@ func NewController( messagingClientSet: eventingclient.Get(ctx).MessagingV1(), eventingClient: eventingclient.Get(ctx).EventingV1beta2(), eventTypeLister: eventtypeinformer.Get(ctx).Lister(), - eventDispatcher: kncloudevents.NewDispatcher(oidcTokenHandler), + eventDispatcher: kncloudevents.NewDispatcher(oidcTokenProvider), } impl := inmemorychannelreconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options { diff --git a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go index f4da73af797..5a1ebb34884 100644 --- a/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go +++ b/pkg/reconciler/inmemorychannel/dispatcher/inmemorychannel_test.go @@ -498,8 +498,8 @@ func TestReconciler_ReconcileKind(t *testing.T) { feature.EvenTypeAutoCreate: feature.Disabled, }) - oidcTokenHandler := auth.NewOIDCTokenHandler(ctx) - dispatcher := kncloudevents.NewDispatcher(oidcTokenHandler) + oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) + dispatcher := kncloudevents.NewDispatcher(oidcTokenProvider) // Just run the tests once with no existing handler (creates the handler) and once // with an existing, so we exercise both paths at once. fh, err := fanout.NewFanoutEventHandler(nil, fanout.Config{}, nil, nil, nil, nil, dispatcher) @@ -549,8 +549,8 @@ func TestReconciler_InvalidInputs(t *testing.T) { for n, tc := range testCases { ctx, _ := SetupFakeContext(t) - oidcTokenHandler := auth.NewOIDCTokenHandler(ctx) - dispatcher := kncloudevents.NewDispatcher(oidcTokenHandler) + oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) + dispatcher := kncloudevents.NewDispatcher(oidcTokenProvider) fh, err := fanout.NewFanoutEventHandler(nil, fanout.Config{}, nil, nil, nil, nil, dispatcher) if err != nil { t.Error(err) @@ -583,8 +583,8 @@ func TestReconciler_Deletion(t *testing.T) { for n, tc := range testCases { ctx, _ := SetupFakeContext(t) - oidcTokenHandler := auth.NewOIDCTokenHandler(ctx) - dispatcher := kncloudevents.NewDispatcher(oidcTokenHandler) + oidcTokenProvider := auth.NewOIDCTokenProvider(ctx) + dispatcher := kncloudevents.NewDispatcher(oidcTokenProvider) fh, err := fanout.NewFanoutEventHandler(nil, fanout.Config{}, nil, nil, nil, nil, dispatcher) if err != nil { t.Error(err)