Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

use the composite prober with the channel #3252

Merged
merged 14 commits into from
Oct 13, 2023
Merged
16 changes: 9 additions & 7 deletions control-plane/pkg/reconciler/channel/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type Reconciler struct {
ServiceLister corelisters.ServiceLister
SubscriptionLister messaginglisters.SubscriptionLister

Prober prober.Prober
Prober prober.NewProber

IngressHost string

Expand Down Expand Up @@ -346,9 +346,8 @@ func (r *Reconciler) reconcileKind(ctx context.Context, channel *messagingv1beta
addressableStatus.Addresses = []duckv1.Addressable{httpAddress}
}

address := addressableStatus.Address.URL.URL()
proberAddressable := prober.Addressable{
Address: address,
proberAddressable := prober.NewAddressable{
AddressStatus: &addressableStatus,
ResourceKey: types.NamespacedName{
Namespace: channel.GetNamespace(),
Name: channel.GetName(),
Expand Down Expand Up @@ -429,9 +428,12 @@ func (r *Reconciler) finalizeKind(ctx context.Context, channel *messagingv1beta1
// See (under discussions KIPs, unlikely to be accepted as they are):
// - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446
// - https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update
address := receiver.Address(r.IngressHost, channel)
proberAddressable := prober.Addressable{
Address: address,
address := receiver.HTTPAddress(r.IngressHost, channel)
proberAddressable := prober.NewAddressable{
AddressStatus: &duckv1.AddressStatus{
Address: &address,
Addresses: []duckv1.Addressable{address},
},
ResourceKey: types.NamespacedName{
Namespace: channel.GetNamespace(),
Name: channel.GetName(),
Expand Down
14 changes: 7 additions & 7 deletions control-plane/pkg/reconciler/channel/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func TestReconcileKind(t *testing.T) {
NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil),
},
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusNotReady),
testProber: probertesting.MockNewProber(prober.StatusNotReady),
},
},
{
Expand All @@ -157,7 +157,7 @@ func TestReconcileKind(t *testing.T) {
},
WantErr: true,
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusReady),
testProber: probertesting.MockNewProber(prober.StatusReady),
},
},
{
Expand Down Expand Up @@ -377,7 +377,7 @@ func TestReconcileKind(t *testing.T) {
finalizerUpdatedEvent,
},
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusNotReady),
testProber: probertesting.MockNewProber(prober.StatusNotReady),
},
},
{
Expand Down Expand Up @@ -447,7 +447,7 @@ func TestReconcileKind(t *testing.T) {
finalizerUpdatedEvent,
},
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusUnknown),
testProber: probertesting.MockNewProber(prober.StatusUnknown),
},
},
{
Expand Down Expand Up @@ -2048,7 +2048,7 @@ func TestFinalizeKind(t *testing.T) {
},
SkipNamespaceValidation: true, // WantCreates compare the source namespace with configmap namespace, so skip it
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusNotReady),
testProber: probertesting.MockNewProber(prober.StatusNotReady),
},
},
}
Expand All @@ -2058,9 +2058,9 @@ func TestFinalizeKind(t *testing.T) {

func useTable(t *testing.T, table TableTest, env config.Env) {
table.Test(t, NewFactory(&env, func(ctx context.Context, listers *Listers, env *config.Env, row *TableRow) controller.Reconciler {
proberMock := probertesting.MockProber(prober.StatusReady)
proberMock := probertesting.MockNewProber(prober.StatusReady)
if p, ok := row.OtherTestData[testProber]; ok {
proberMock = p.(prober.Prober)
proberMock = p.(prober.NewProber)
}

var featureFlags *apisconfig.KafkaFeatureFlags
Expand Down
26 changes: 23 additions & 3 deletions control-plane/pkg/reconciler/channel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ package channel

import (
"context"
"net/http"

"github.com/IBM/sarama"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"

"knative.dev/eventing/pkg/apis/feature"
subscriptioninformer "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription"

messagingv1beta "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/messaging/v1beta1"
Expand Down Expand Up @@ -89,10 +89,20 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
)
}

features := feature.FromContext(ctx)
caCerts, err := reconciler.getCaCerts()
if err != nil && (features.IsStrictTransportEncryption() || features.IsPermissiveTransportEncryption()) {
// We only need to warn here as the broker won't reconcile properly without the proper certs because the prober won't succeed
logger.Warn("Failed to get CA certs when at least one address uses TLS", zap.Error(err))
}

impl := kafkachannelreconciler.NewImpl(ctx, reconciler)
IPsLister := prober.IdentityIPsLister()
reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, "", IPsLister, impl.EnqueueKey)
reconciler.IngressHost = network.GetServiceHostname(configs.IngressName, configs.SystemNamespace)
reconciler.Prober, err = prober.NewComposite(ctx, "", "", IPsLister, impl.EnqueueKey, &caCerts)
if err != nil {
logger.Fatal("Failed to create prober", zap.Error(err))
}

channelInformer := kafkachannelinformer.Get(ctx)

Expand All @@ -115,6 +125,16 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
Handler: controller.HandleAll(globalResync),
})

rotateCACerts := func(obj interface{}) {
newCerts, err := reconciler.getCaCerts()
if err != nil && (features.IsPermissiveTransportEncryption() || features.IsStrictTransportEncryption()) {
// We only need to warn here as the broker won't reconcile properly without the proper certs because the prober won't succeed
logger.Warn("Failed to get new CA certs while rotating CA certs when at least one address uses TLS", zap.Error(err))
}
reconciler.Prober.RotateRootCaCerts(&newCerts)
globalResync(obj)
}

configmapInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterWithNameAndNamespace(configs.DataPlaneConfigMapNamespace, configs.ContractConfigMapName),
Handler: cache.ResourceEventHandlerFuncs{
Expand All @@ -128,7 +148,7 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
secretinformer.Get(ctx).Informer().AddEventHandler(controller.HandleAll(reconciler.Tracker.OnChanged))
secretinformer.Get(ctx).Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterWithName(kafkaChannelTLSSecretName),
Handler: controller.HandleAll(globalResync),
Handler: controller.HandleAll(rotateCACerts),
})

configmapinformer.Get(ctx).Informer().AddEventHandler(controller.HandleAll(
Expand Down
17 changes: 9 additions & 8 deletions control-plane/pkg/reconciler/channel/v2/channelv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ type Reconciler struct {
ServiceLister corelisters.ServiceLister
SubscriptionLister messaginglisters.SubscriptionLister

Prober prober.Prober
Prober prober.NewProber

IngressHost string

Expand Down Expand Up @@ -313,10 +313,8 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, channel *messagingv1beta
addressableStatus.Addresses = []duckv1.Addressable{httpAddress}
}

address := addressableStatus.Address.URL.URL()

proberAddressable := prober.Addressable{
Address: address,
proberAddressable := prober.NewAddressable{
AddressStatus: &addressableStatus,
ResourceKey: types.NamespacedName{
Namespace: channel.GetNamespace(),
Name: channel.GetName(),
Expand Down Expand Up @@ -420,9 +418,12 @@ func (r *Reconciler) FinalizeKind(ctx context.Context, channel *messagingv1beta1
// See (under discussions KIPs, unlikely to be accepted as they are):
// - https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=181306446
// - https://cwiki.apache.org/confluence/display/KAFKA/KIP-286%3A+producer.send%28%29+should+not+block+on+metadata+update
address := receiver.Address(r.IngressHost, channel)
proberAddressable := prober.Addressable{
Address: address,
address := receiver.HTTPAddress(r.IngressHost, channel)
proberAddressable := prober.NewAddressable{
AddressStatus: &duckv1.AddressStatus{
Address: &address,
Addresses: []duckv1.Addressable{address},
},
ResourceKey: types.NamespacedName{
Namespace: channel.GetNamespace(),
Name: channel.GetName(),
Expand Down
14 changes: 7 additions & 7 deletions control-plane/pkg/reconciler/channel/v2/channelv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestReconcileKind(t *testing.T) {
NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil),
},
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusNotReady),
testProber: probertesting.MockNewProber(prober.StatusNotReady),
},
},
{
Expand All @@ -152,7 +152,7 @@ func TestReconcileKind(t *testing.T) {
},
WantErr: true,
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusReady),
testProber: probertesting.MockNewProber(prober.StatusReady),
},
},
{
Expand Down Expand Up @@ -186,7 +186,7 @@ func TestReconcileKind(t *testing.T) {
}),
},
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusNotReady),
testProber: probertesting.MockNewProber(prober.StatusNotReady),
},
},
{
Expand Down Expand Up @@ -371,7 +371,7 @@ func TestReconcileKind(t *testing.T) {
finalizerUpdatedEvent,
},
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusNotReady),
testProber: probertesting.MockNewProber(prober.StatusNotReady),
},
},
{
Expand Down Expand Up @@ -424,7 +424,7 @@ func TestReconcileKind(t *testing.T) {
finalizerUpdatedEvent,
},
OtherTestData: map[string]interface{}{
testProber: probertesting.MockProber(prober.StatusUnknown),
testProber: probertesting.MockNewProber(prober.StatusUnknown),
},
},
{
Expand Down Expand Up @@ -1907,9 +1907,9 @@ func TestReconcileKind(t *testing.T) {
}

table.Test(t, NewFactory(&env, func(ctx context.Context, listers *Listers, env *config.Env, row *TableRow) controller.Reconciler {
proberMock := probertesting.MockProber(prober.StatusReady)
proberMock := probertesting.MockNewProber(prober.StatusReady)
if p, ok := row.OtherTestData[testProber]; ok {
proberMock = p.(prober.Prober)
proberMock = p.(prober.NewProber)
}

var featureFlags *apisconfig.KafkaFeatureFlags
Expand Down
24 changes: 21 additions & 3 deletions control-plane/pkg/reconciler/channel/v2/controllerv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ package v2

import (
"context"
"net/http"

"github.com/IBM/sarama"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
"knative.dev/eventing/pkg/apis/feature"
serviceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/service"
"knative.dev/pkg/configmap"
"knative.dev/pkg/logging"
Expand Down Expand Up @@ -90,6 +90,12 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
)
}

features := feature.FromContext(ctx)
caCerts, err := reconciler.getCaCerts()
if err != nil && (features.IsStrictTransportEncryption() || features.IsPermissiveTransportEncryption()) {
logger.Warn("Failed to get CA certs when at least one address uses TLS", zap.Error(err))
}

impl := kafkachannelreconciler.NewImpl(ctx, reconciler)

kafkaConfigStore := apisconfig.NewStore(ctx, func(name string, value *apisconfig.KafkaFeatureFlags) {
Expand All @@ -99,15 +105,27 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
kafkaConfigStore.WatchConfigs(watcher)

IPsLister := prober.IdentityIPsLister()
reconciler.Prober = prober.NewAsync(ctx, http.DefaultClient, "", IPsLister, impl.EnqueueKey)
reconciler.IngressHost = network.GetServiceHostname(configs.IngressName, configs.SystemNamespace)
reconciler.Resolver = resolver.NewURIResolverFromTracker(ctx, impl.Tracker)
reconciler.Prober, err = prober.NewComposite(ctx, "", "", IPsLister, impl.EnqueueKey, &caCerts)
if err != nil {
logger.Fatal("Failed to create prober", zap.Error(err))
}

rotateCACerts := func(obj interface{}) {
newCerts, err := reconciler.getCaCerts()
if err != nil && (features.IsPermissiveTransportEncryption() || features.IsStrictTransportEncryption()) {
// We only need to warn here as the broker won't reconcile properly without the proper certs because the prober won't succeed
logger.Warn("Failed to get new CA certs while rotating CA certs when at least one address uses TLS", zap.Error(err))
}
reconciler.Prober.RotateRootCaCerts(&newCerts)
consumergroup.Enqueue("kafkachannel", impl.EnqueueKey)
}
reconciler.Tracker = impl.Tracker
secretinformer.Get(ctx).Informer().AddEventHandler(controller.HandleAll(reconciler.Tracker.OnChanged))
secretinformer.Get(ctx).Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterWithName(kafkaChannelTLSSecretName),
Handler: controller.HandleAll(consumergroup.Enqueue("kafkachannel", impl.EnqueueKey)),
Handler: controller.HandleAll(rotateCACerts),
})

reconciler.Tracker = impl.Tracker
Expand Down