From 13e49363a00bde279bad7d006ccac094ed73a269 Mon Sep 17 00:00:00 2001 From: Rahul kumar <68837569+Rahul-Kumar-prog@users.noreply.github.com> Date: Fri, 13 Oct 2023 20:46:20 +0530 Subject: [PATCH] use the composite prober with the channel (#3252) * using the composite prober for the channel * edit channel_test.go file * change channel_test.go * done changes in V2 channel * edit in controllerv2.go * edit addresses * 2nd edit addresses * Update prober pod port Co-authored-by: Calum Murray * Update prober pod port Co-authored-by: Calum Murray * Update prober pod port for TLS Co-authored-by: Calum Murray * Update prober pod port for TLS Co-authored-by: Calum Murray --------- Co-authored-by: Pierangelo Di Pilato Co-authored-by: Calum Murray --- .../pkg/reconciler/channel/channel.go | 16 +++++++----- .../pkg/reconciler/channel/channel_test.go | 14 +++++----- .../pkg/reconciler/channel/controller.go | 26 ++++++++++++++++--- .../pkg/reconciler/channel/v2/channelv2.go | 17 ++++++------ .../reconciler/channel/v2/channelv2_test.go | 14 +++++----- .../pkg/reconciler/channel/v2/controllerv2.go | 24 ++++++++++++++--- 6 files changed, 76 insertions(+), 35 deletions(-) diff --git a/control-plane/pkg/reconciler/channel/channel.go b/control-plane/pkg/reconciler/channel/channel.go index e50b284579..cc705f1987 100644 --- a/control-plane/pkg/reconciler/channel/channel.go +++ b/control-plane/pkg/reconciler/channel/channel.go @@ -96,7 +96,7 @@ type Reconciler struct { ServiceLister corelisters.ServiceLister SubscriptionLister messaginglisters.SubscriptionLister - Prober prober.Prober + Prober prober.NewProber IngressHost string @@ -346,9 +346,8 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta addressableStatus.Addresses = []duckv1.Addressable{httpAddress} } - address := addressableStatus.Address.URL.URL() - proberAddressable := prober.Addressable{ - Address: address, + proberAddressable := prober.NewAddressable{ + AddressStatus: &addressableStatus, ResourceKey: types.NamespacedName{ Namespace: channel.GetNamespace(), Name: channel.GetName(), @@ -429,9 +428,12 @@ func (r *Reconciler) finalizeKind(ctx context.Context, channel *messagingv1beta1 // See (under discussions KIPs, unlikely to be accepted as they are): // - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446 // - https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update - address := receiver.Address(r.IngressHost, channel) - proberAddressable := prober.Addressable{ - Address: address, + address := receiver.HTTPAddress(r.IngressHost, channel) + proberAddressable := prober.NewAddressable{ + AddressStatus: &duckv1.AddressStatus{ + Address: &address, + Addresses: []duckv1.Addressable{address}, + }, ResourceKey: types.NamespacedName{ Namespace: channel.GetNamespace(), Name: channel.GetName(), diff --git a/control-plane/pkg/reconciler/channel/channel_test.go b/control-plane/pkg/reconciler/channel/channel_test.go index e5a92ffa9c..6e72a56b39 100644 --- a/control-plane/pkg/reconciler/channel/channel_test.go +++ b/control-plane/pkg/reconciler/channel/channel_test.go @@ -140,7 +140,7 @@ func TestReconcileKind(t *testing.T) { NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil), }, OtherTestData: map[string]interface{}{ - testProber: probertesting.MockProber(prober.StatusNotReady), + testProber: probertesting.MockNewProber(prober.StatusNotReady), }, }, { @@ -157,7 +157,7 @@ func TestReconcileKind(t *testing.T) { }, WantErr: true, OtherTestData: map[string]interface{}{ - testProber: probertesting.MockProber(prober.StatusReady), + testProber: probertesting.MockNewProber(prober.StatusReady), }, }, { @@ -377,7 +377,7 @@ func TestReconcileKind(t *testing.T) { finalizerUpdatedEvent, }, OtherTestData: map[string]interface{}{ - testProber: probertesting.MockProber(prober.StatusNotReady), + testProber: probertesting.MockNewProber(prober.StatusNotReady), }, }, { @@ -447,7 +447,7 @@ func TestReconcileKind(t *testing.T) { finalizerUpdatedEvent, }, OtherTestData: map[string]interface{}{ - testProber: probertesting.MockProber(prober.StatusUnknown), + testProber: probertesting.MockNewProber(prober.StatusUnknown), }, }, { @@ -2048,7 +2048,7 @@ func TestFinalizeKind(t *testing.T) { }, SkipNamespaceValidation: true, // WantCreates compare the source namespace with configmap namespace, so skip it OtherTestData: map[string]interface{}{ - testProber: probertesting.MockProber(prober.StatusNotReady), + testProber: probertesting.MockNewProber(prober.StatusNotReady), }, }, } @@ -2058,9 +2058,9 @@ func TestFinalizeKind(t *testing.T) { func useTable(t *testing.T, table TableTest, env config.Env) { table.Test(t, NewFactory(&env, func(ctx context.Context, listers *Listers, env *config.Env, row *TableRow) controller.Reconciler { - proberMock := probertesting.MockProber(prober.StatusReady) + proberMock := probertesting.MockNewProber(prober.StatusReady) if p, ok := row.OtherTestData[testProber]; ok { - proberMock = p.(prober.Prober) + proberMock = p.(prober.NewProber) } var featureFlags *apisconfig.KafkaFeatureFlags diff --git a/control-plane/pkg/reconciler/channel/controller.go b/control-plane/pkg/reconciler/channel/controller.go index c2e269e463..1a38d96ae3 100644 --- a/control-plane/pkg/reconciler/channel/controller.go +++ b/control-plane/pkg/reconciler/channel/controller.go @@ -18,13 +18,13 @@ package channel import ( "context" - "net/http" "github.com/IBM/sarama" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" + "knative.dev/eventing/pkg/apis/feature" subscriptioninformer "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription" messagingv1beta "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/messaging/v1beta1" @@ -89,10 +89,20 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf ) } + features := feature.FromContext(ctx) + caCerts, err := reconciler.getCaCerts() + if err != nil && (features.IsStrictTransportEncryption() || features.IsPermissiveTransportEncryption()) { + // We only need to warn here as the broker won't reconcile properly without the proper certs because the prober won't succeed + logger.Warn("Failed to get CA certs when at least one address uses TLS", zap.Error(err)) + } + impl := kafkachannelreconciler.NewImpl(ctx, reconciler) IPsLister := prober.IdentityIPsLister() - reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, "", IPsLister, impl.EnqueueKey) reconciler.IngressHost = network.GetServiceHostname(configs.IngressName, configs.SystemNamespace) + reconciler.Prober, err = prober.NewComposite(ctx, "", "", IPsLister, impl.EnqueueKey, &caCerts) + if err != nil { + logger.Fatal("Failed to create prober", zap.Error(err)) + } channelInformer := kafkachannelinformer.Get(ctx) @@ -115,6 +125,16 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf Handler: controller.HandleAll(globalResync), }) + rotateCACerts := func(obj interface{}) { + newCerts, err := reconciler.getCaCerts() + if err != nil && (features.IsPermissiveTransportEncryption() || features.IsStrictTransportEncryption()) { + // We only need to warn here as the broker won't reconcile properly without the proper certs because the prober won't succeed + logger.Warn("Failed to get new CA certs while rotating CA certs when at least one address uses TLS", zap.Error(err)) + } + reconciler.Prober.RotateRootCaCerts(&newCerts) + globalResync(obj) + } + configmapInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: controller.FilterWithNameAndNamespace(configs.DataPlaneConfigMapNamespace, configs.ContractConfigMapName), Handler: cache.ResourceEventHandlerFuncs{ @@ -128,7 +148,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf secretinformer.Get(ctx).Informer().AddEventHandler(controller.HandleAll(reconciler.Tracker.OnChanged)) secretinformer.Get(ctx).Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: controller.FilterWithName(kafkaChannelTLSSecretName), - Handler: controller.HandleAll(globalResync), + Handler: controller.HandleAll(rotateCACerts), }) configmapinformer.Get(ctx).Informer().AddEventHandler(controller.HandleAll( diff --git a/control-plane/pkg/reconciler/channel/v2/channelv2.go b/control-plane/pkg/reconciler/channel/v2/channelv2.go index e9d8f68243..601cdafaa5 100644 --- a/control-plane/pkg/reconciler/channel/v2/channelv2.go +++ b/control-plane/pkg/reconciler/channel/v2/channelv2.go @@ -103,7 +103,7 @@ type Reconciler struct { ServiceLister corelisters.ServiceLister SubscriptionLister messaginglisters.SubscriptionLister - Prober prober.Prober + Prober prober.NewProber IngressHost string @@ -313,10 +313,8 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta addressableStatus.Addresses = []duckv1.Addressable{httpAddress} } - address := addressableStatus.Address.URL.URL() - - proberAddressable := prober.Addressable{ - Address: address, + proberAddressable := prober.NewAddressable{ + AddressStatus: &addressableStatus, ResourceKey: types.NamespacedName{ Namespace: channel.GetNamespace(), Name: channel.GetName(), @@ -420,9 +418,12 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, channel *messagingv1beta1 // See (under discussions KIPs, unlikely to be accepted as they are): // - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446 // - https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update - address := receiver.Address(r.IngressHost, channel) - proberAddressable := prober.Addressable{ - Address: address, + address := receiver.HTTPAddress(r.IngressHost, channel) + proberAddressable := prober.NewAddressable{ + AddressStatus: &duckv1.AddressStatus{ + Address: &address, + Addresses: []duckv1.Addressable{address}, + }, ResourceKey: types.NamespacedName{ Namespace: channel.GetNamespace(), Name: channel.GetName(), diff --git a/control-plane/pkg/reconciler/channel/v2/channelv2_test.go b/control-plane/pkg/reconciler/channel/v2/channelv2_test.go index aa698ae936..84b7fbfac7 100644 --- a/control-plane/pkg/reconciler/channel/v2/channelv2_test.go +++ b/control-plane/pkg/reconciler/channel/v2/channelv2_test.go @@ -135,7 +135,7 @@ func TestReconcileKind(t *testing.T) { NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil), }, OtherTestData: map[string]interface{}{ - testProber: probertesting.MockProber(prober.StatusNotReady), + testProber: probertesting.MockNewProber(prober.StatusNotReady), }, }, { @@ -152,7 +152,7 @@ func TestReconcileKind(t *testing.T) { }, WantErr: true, OtherTestData: map[string]interface{}{ - testProber: probertesting.MockProber(prober.StatusReady), + testProber: probertesting.MockNewProber(prober.StatusReady), }, }, { @@ -186,7 +186,7 @@ func TestReconcileKind(t *testing.T) { }), }, OtherTestData: map[string]interface{}{ - testProber: probertesting.MockProber(prober.StatusNotReady), + testProber: probertesting.MockNewProber(prober.StatusNotReady), }, }, { @@ -371,7 +371,7 @@ func TestReconcileKind(t *testing.T) { finalizerUpdatedEvent, }, OtherTestData: map[string]interface{}{ - testProber: probertesting.MockProber(prober.StatusNotReady), + testProber: probertesting.MockNewProber(prober.StatusNotReady), }, }, { @@ -424,7 +424,7 @@ func TestReconcileKind(t *testing.T) { finalizerUpdatedEvent, }, OtherTestData: map[string]interface{}{ - testProber: probertesting.MockProber(prober.StatusUnknown), + testProber: probertesting.MockNewProber(prober.StatusUnknown), }, }, { @@ -1907,9 +1907,9 @@ func TestReconcileKind(t *testing.T) { } table.Test(t, NewFactory(&env, func(ctx context.Context, listers *Listers, env *config.Env, row *TableRow) controller.Reconciler { - proberMock := probertesting.MockProber(prober.StatusReady) + proberMock := probertesting.MockNewProber(prober.StatusReady) if p, ok := row.OtherTestData[testProber]; ok { - proberMock = p.(prober.Prober) + proberMock = p.(prober.NewProber) } var featureFlags *apisconfig.KafkaFeatureFlags diff --git a/control-plane/pkg/reconciler/channel/v2/controllerv2.go b/control-plane/pkg/reconciler/channel/v2/controllerv2.go index e9ed060a5f..ff191c526d 100644 --- a/control-plane/pkg/reconciler/channel/v2/controllerv2.go +++ b/control-plane/pkg/reconciler/channel/v2/controllerv2.go @@ -18,12 +18,12 @@ package v2 import ( "context" - "net/http" "github.com/IBM/sarama" "go.uber.org/zap" corev1 "k8s.io/api/core/v1" "k8s.io/client-go/tools/cache" + "knative.dev/eventing/pkg/apis/feature" serviceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/service" "knative.dev/pkg/configmap" "knative.dev/pkg/logging" @@ -90,6 +90,12 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf ) } + features := feature.FromContext(ctx) + caCerts, err := reconciler.getCaCerts() + if err != nil && (features.IsStrictTransportEncryption() || features.IsPermissiveTransportEncryption()) { + logger.Warn("Failed to get CA certs when at least one address uses TLS", zap.Error(err)) + } + impl := kafkachannelreconciler.NewImpl(ctx, reconciler) kafkaConfigStore := apisconfig.NewStore(ctx, func(name string, value *apisconfig.KafkaFeatureFlags) { @@ -99,15 +105,27 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf kafkaConfigStore.WatchConfigs(watcher) IPsLister := prober.IdentityIPsLister() - reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, "", IPsLister, impl.EnqueueKey) reconciler.IngressHost = network.GetServiceHostname(configs.IngressName, configs.SystemNamespace) reconciler.Resolver = resolver.NewURIResolverFromTracker(ctx, impl.Tracker) + reconciler.Prober, err = prober.NewComposite(ctx, "", "", IPsLister, impl.EnqueueKey, &caCerts) + if err != nil { + logger.Fatal("Failed to create prober", zap.Error(err)) + } + rotateCACerts := func(obj interface{}) { + newCerts, err := reconciler.getCaCerts() + if err != nil && (features.IsPermissiveTransportEncryption() || features.IsStrictTransportEncryption()) { + // We only need to warn here as the broker won't reconcile properly without the proper certs because the prober won't succeed + logger.Warn("Failed to get new CA certs while rotating CA certs when at least one address uses TLS", zap.Error(err)) + } + reconciler.Prober.RotateRootCaCerts(&newCerts) + consumergroup.Enqueue("kafkachannel", impl.EnqueueKey) + } reconciler.Tracker = impl.Tracker secretinformer.Get(ctx).Informer().AddEventHandler(controller.HandleAll(reconciler.Tracker.OnChanged)) secretinformer.Get(ctx).Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: controller.FilterWithName(kafkaChannelTLSSecretName), - Handler: controller.HandleAll(consumergroup.Enqueue("kafkachannel", impl.EnqueueKey)), + Handler: controller.HandleAll(rotateCACerts), }) reconciler.Tracker = impl.Tracker