Skip to content

Commit

Permalink
use the composite prober with the channel (#3252)
Browse files Browse the repository at this point in the history
* using the composite prober for the channel

* edit channel_test.go file

* change channel_test.go

* done changes in V2 channel

* edit in controllerv2.go

* edit addresses

* 2nd edit addresses

* Update prober pod port

Co-authored-by: Calum Murray <cmurray@redhat.com>

* Update prober pod port

Co-authored-by: Calum Murray <cmurray@redhat.com>

* Update prober pod port for TLS

Co-authored-by: Calum Murray <cmurray@redhat.com>

* Update prober pod port for TLS

Co-authored-by: Calum Murray <cmurray@redhat.com>

---------

Co-authored-by: Pierangelo Di Pilato <pierangelodipilato@gmail.com>
Co-authored-by: Calum Murray <cmurray@redhat.com>
  • Loading branch information
3 people authored Oct 13, 2023
1 parent 10fad52 commit 13e4936
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 35 deletions.
16 changes: 9 additions & 7 deletions control-plane/pkg/reconciler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type Reconciler struct {
ServiceLister corelisters.ServiceLister
SubscriptionLister messaginglisters.SubscriptionLister

Prober prober.Prober
Prober prober.NewProber

IngressHost string

Expand Down Expand Up @@ -346,9 +346,8 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta
addressableStatus.Addresses = []duckv1.Addressable{httpAddress}
}

address := addressableStatus.Address.URL.URL()
proberAddressable := prober.Addressable{
Address: address,
proberAddressable := prober.NewAddressable{
AddressStatus: &addressableStatus,
ResourceKey: types.NamespacedName{
Namespace: channel.GetNamespace(),
Name: channel.GetName(),
Expand Down Expand Up @@ -429,9 +428,12 @@ func (r *Reconciler) finalizeKind(ctx context.Context, channel *messagingv1beta1
// See (under discussions KIPs, unlikely to be accepted as they are):
// - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446
// - https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update
address := receiver.Address(r.IngressHost, channel)
proberAddressable := prober.Addressable{
Address: address,
address := receiver.HTTPAddress(r.IngressHost, channel)
proberAddressable := prober.NewAddressable{
AddressStatus: &duckv1.AddressStatus{
Address: &address,
Addresses: []duckv1.Addressable{address},
},
ResourceKey: types.NamespacedName{
Namespace: channel.GetNamespace(),
Name: channel.GetName(),
Expand Down
14 changes: 7 additions & 7 deletions control-plane/pkg/reconciler/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func TestReconcileKind(t *testing.T) {
NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil),
},
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusNotReady),
testProber: probertesting.MockNewProber(prober.StatusNotReady),
},
},
{
Expand All @@ -157,7 +157,7 @@ func TestReconcileKind(t *testing.T) {
},
WantErr: true,
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusReady),
testProber: probertesting.MockNewProber(prober.StatusReady),
},
},
{
Expand Down Expand Up @@ -377,7 +377,7 @@ func TestReconcileKind(t *testing.T) {
finalizerUpdatedEvent,
},
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusNotReady),
testProber: probertesting.MockNewProber(prober.StatusNotReady),
},
},
{
Expand Down Expand Up @@ -447,7 +447,7 @@ func TestReconcileKind(t *testing.T) {
finalizerUpdatedEvent,
},
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusUnknown),
testProber: probertesting.MockNewProber(prober.StatusUnknown),
},
},
{
Expand Down Expand Up @@ -2048,7 +2048,7 @@ func TestFinalizeKind(t *testing.T) {
},
SkipNamespaceValidation: true, // WantCreates compare the source namespace with configmap namespace, so skip it
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusNotReady),
testProber: probertesting.MockNewProber(prober.StatusNotReady),
},
},
}
Expand All @@ -2058,9 +2058,9 @@ func TestFinalizeKind(t *testing.T) {

func useTable(t *testing.T, table TableTest, env config.Env) {
table.Test(t, NewFactory(&env, func(ctx context.Context, listers *Listers, env *config.Env, row *TableRow) controller.Reconciler {
proberMock := probertesting.MockProber(prober.StatusReady)
proberMock := probertesting.MockNewProber(prober.StatusReady)
if p, ok := row.OtherTestData[testProber]; ok {
proberMock = p.(prober.Prober)
proberMock = p.(prober.NewProber)
}

var featureFlags *apisconfig.KafkaFeatureFlags
Expand Down
26 changes: 23 additions & 3 deletions control-plane/pkg/reconciler/channel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ package channel

import (
"context"
"net/http"

"github.com/IBM/sarama"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"

"knative.dev/eventing/pkg/apis/feature"
subscriptioninformer "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription"

messagingv1beta "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/messaging/v1beta1"
Expand Down Expand Up @@ -89,10 +89,20 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
)
}

features := feature.FromContext(ctx)
caCerts, err := reconciler.getCaCerts()
if err != nil && (features.IsStrictTransportEncryption() || features.IsPermissiveTransportEncryption()) {
// We only need to warn here as the broker won't reconcile properly without the proper certs because the prober won't succeed
logger.Warn("Failed to get CA certs when at least one address uses TLS", zap.Error(err))
}

impl := kafkachannelreconciler.NewImpl(ctx, reconciler)
IPsLister := prober.IdentityIPsLister()
reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, "", IPsLister, impl.EnqueueKey)
reconciler.IngressHost = network.GetServiceHostname(configs.IngressName, configs.SystemNamespace)
reconciler.Prober, err = prober.NewComposite(ctx, "", "", IPsLister, impl.EnqueueKey, &caCerts)
if err != nil {
logger.Fatal("Failed to create prober", zap.Error(err))
}

channelInformer := kafkachannelinformer.Get(ctx)

Expand All @@ -115,6 +125,16 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
Handler: controller.HandleAll(globalResync),
})

rotateCACerts := func(obj interface{}) {
newCerts, err := reconciler.getCaCerts()
if err != nil && (features.IsPermissiveTransportEncryption() || features.IsStrictTransportEncryption()) {
// We only need to warn here as the broker won't reconcile properly without the proper certs because the prober won't succeed
logger.Warn("Failed to get new CA certs while rotating CA certs when at least one address uses TLS", zap.Error(err))
}
reconciler.Prober.RotateRootCaCerts(&newCerts)
globalResync(obj)
}

configmapInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterWithNameAndNamespace(configs.DataPlaneConfigMapNamespace, configs.ContractConfigMapName),
Handler: cache.ResourceEventHandlerFuncs{
Expand All @@ -128,7 +148,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
secretinformer.Get(ctx).Informer().AddEventHandler(controller.HandleAll(reconciler.Tracker.OnChanged))
secretinformer.Get(ctx).Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterWithName(kafkaChannelTLSSecretName),
Handler: controller.HandleAll(globalResync),
Handler: controller.HandleAll(rotateCACerts),
})

configmapinformer.Get(ctx).Informer().AddEventHandler(controller.HandleAll(
Expand Down
17 changes: 9 additions & 8 deletions control-plane/pkg/reconciler/channel/v2/channelv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ type Reconciler struct {
ServiceLister corelisters.ServiceLister
SubscriptionLister messaginglisters.SubscriptionLister

Prober prober.Prober
Prober prober.NewProber

IngressHost string

Expand Down Expand Up @@ -313,10 +313,8 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta
addressableStatus.Addresses = []duckv1.Addressable{httpAddress}
}

address := addressableStatus.Address.URL.URL()

proberAddressable := prober.Addressable{
Address: address,
proberAddressable := prober.NewAddressable{
AddressStatus: &addressableStatus,
ResourceKey: types.NamespacedName{
Namespace: channel.GetNamespace(),
Name: channel.GetName(),
Expand Down Expand Up @@ -420,9 +418,12 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, channel *messagingv1beta1
// See (under discussions KIPs, unlikely to be accepted as they are):
// - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446
// - https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update
address := receiver.Address(r.IngressHost, channel)
proberAddressable := prober.Addressable{
Address: address,
address := receiver.HTTPAddress(r.IngressHost, channel)
proberAddressable := prober.NewAddressable{
AddressStatus: &duckv1.AddressStatus{
Address: &address,
Addresses: []duckv1.Addressable{address},
},
ResourceKey: types.NamespacedName{
Namespace: channel.GetNamespace(),
Name: channel.GetName(),
Expand Down
14 changes: 7 additions & 7 deletions control-plane/pkg/reconciler/channel/v2/channelv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestReconcileKind(t *testing.T) {
NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil),
},
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusNotReady),
testProber: probertesting.MockNewProber(prober.StatusNotReady),
},
},
{
Expand All @@ -152,7 +152,7 @@ func TestReconcileKind(t *testing.T) {
},
WantErr: true,
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusReady),
testProber: probertesting.MockNewProber(prober.StatusReady),
},
},
{
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestReconcileKind(t *testing.T) {
}),
},
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusNotReady),
testProber: probertesting.MockNewProber(prober.StatusNotReady),
},
},
{
Expand Down Expand Up @@ -371,7 +371,7 @@ func TestReconcileKind(t *testing.T) {
finalizerUpdatedEvent,
},
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusNotReady),
testProber: probertesting.MockNewProber(prober.StatusNotReady),
},
},
{
Expand Down Expand Up @@ -424,7 +424,7 @@ func TestReconcileKind(t *testing.T) {
finalizerUpdatedEvent,
},
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusUnknown),
testProber: probertesting.MockNewProber(prober.StatusUnknown),
},
},
{
Expand Down Expand Up @@ -1907,9 +1907,9 @@ func TestReconcileKind(t *testing.T) {
}

table.Test(t, NewFactory(&env, func(ctx context.Context, listers *Listers, env *config.Env, row *TableRow) controller.Reconciler {
proberMock := probertesting.MockProber(prober.StatusReady)
proberMock := probertesting.MockNewProber(prober.StatusReady)
if p, ok := row.OtherTestData[testProber]; ok {
proberMock = p.(prober.Prober)
proberMock = p.(prober.NewProber)
}

var featureFlags *apisconfig.KafkaFeatureFlags
Expand Down
24 changes: 21 additions & 3 deletions control-plane/pkg/reconciler/channel/v2/controllerv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ package v2

import (
"context"
"net/http"

"github.com/IBM/sarama"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/apis/feature"
serviceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/service"
"knative.dev/pkg/configmap"
"knative.dev/pkg/logging"
Expand Down Expand Up @@ -90,6 +90,12 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
)
}

features := feature.FromContext(ctx)
caCerts, err := reconciler.getCaCerts()
if err != nil && (features.IsStrictTransportEncryption() || features.IsPermissiveTransportEncryption()) {
logger.Warn("Failed to get CA certs when at least one address uses TLS", zap.Error(err))
}

impl := kafkachannelreconciler.NewImpl(ctx, reconciler)

kafkaConfigStore := apisconfig.NewStore(ctx, func(name string, value *apisconfig.KafkaFeatureFlags) {
Expand All @@ -99,15 +105,27 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
kafkaConfigStore.WatchConfigs(watcher)

IPsLister := prober.IdentityIPsLister()
reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, "", IPsLister, impl.EnqueueKey)
reconciler.IngressHost = network.GetServiceHostname(configs.IngressName, configs.SystemNamespace)
reconciler.Resolver = resolver.NewURIResolverFromTracker(ctx, impl.Tracker)
reconciler.Prober, err = prober.NewComposite(ctx, "", "", IPsLister, impl.EnqueueKey, &caCerts)
if err != nil {
logger.Fatal("Failed to create prober", zap.Error(err))
}

rotateCACerts := func(obj interface{}) {
newCerts, err := reconciler.getCaCerts()
if err != nil && (features.IsPermissiveTransportEncryption() || features.IsStrictTransportEncryption()) {
// We only need to warn here as the broker won't reconcile properly without the proper certs because the prober won't succeed
logger.Warn("Failed to get new CA certs while rotating CA certs when at least one address uses TLS", zap.Error(err))
}
reconciler.Prober.RotateRootCaCerts(&newCerts)
consumergroup.Enqueue("kafkachannel", impl.EnqueueKey)
}
reconciler.Tracker = impl.Tracker
secretinformer.Get(ctx).Informer().AddEventHandler(controller.HandleAll(reconciler.Tracker.OnChanged))
secretinformer.Get(ctx).Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterWithName(kafkaChannelTLSSecretName),
Handler: controller.HandleAll(consumergroup.Enqueue("kafkachannel", impl.EnqueueKey)),
Handler: controller.HandleAll(rotateCACerts),
})

reconciler.Tracker = impl.Tracker
Expand Down

0 comments on commit 13e4936

Please sign in to comment.