[release-v1.14] Allow enabling sarama logging and disabling client pool and Upgrade knative.dev/eventing to latest 1.14 #1279

Merged
15 changes: 14 additions & 1 deletion control-plane/cmd/kafka-controller/main.go
@@ -19,7 +19,10 @@ package main
import (
"context"
"log"
"os"
"strings"

"github.com/IBM/sarama"
filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
@@ -67,8 +70,18 @@ func main() {
ctx = filteredFactory.WithSelectors(ctx,
eventingtls.TrustBundleLabelSelector,
auth.OIDCLabelSelector,
"app.kubernetes.io/kind=kafka-dispatcher",
)
ctx = clientpool.WithKafkaClientPool(ctx)

if v := os.Getenv("ENABLE_SARAMA_LOGGER"); strings.EqualFold(v, "true") {
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags|log.Llongfile)
}
if v := os.Getenv("ENABLE_SARAMA_DEBUG_LOGGER"); strings.EqualFold(v, "true") {
sarama.DebugLogger = log.New(os.Stdout, "[sarama][debug] ", log.LstdFlags|log.Llongfile)
}
if v := os.Getenv("ENABLE_SARAMA_CLIENT_POOL"); v == "" || strings.EqualFold(v, "true") {
ctx = clientpool.WithKafkaClientPool(ctx)
}

sharedmain.MainNamed(ctx, component,
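
The net effect: sarama's package-level loggers become opt-in and the shared Kafka client pool becomes opt-out, all driven by environment variables; the controller Deployment manifest below sets defaults that preserve current behavior (loggers off, pool on). A minimal, stand-alone sketch of the same gating semantics (not the actual controller wiring):

```go
package main

import (
	"log"
	"os"
	"strings"

	"github.com/IBM/sarama"
)

func main() {
	// Loggers are opt-in: only a case-insensitive "true" turns them on.
	if strings.EqualFold(os.Getenv("ENABLE_SARAMA_LOGGER"), "true") {
		sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
	}
	if strings.EqualFold(os.Getenv("ENABLE_SARAMA_DEBUG_LOGGER"), "true") {
		sarama.DebugLogger = log.New(os.Stdout, "[sarama][debug] ", log.LstdFlags)
	}

	// The client pool is opt-out: unset or "true" keeps it enabled,
	// any other value disables it.
	v := os.Getenv("ENABLE_SARAMA_CLIENT_POOL")
	poolEnabled := v == "" || strings.EqualFold(v, "true")
	log.Printf("sarama client pool enabled: %v", poolEnabled)
}
```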

(controller Deployment manifest; file name not shown in this view)
@@ -173,6 +173,12 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: ENABLE_SARAMA_LOGGER
value: "false"
- name: ENABLE_SARAMA_DEBUG_LOGGER
value: "false"
- name: ENABLE_SARAMA_CLIENT_POOL
value: "true"

ports:
- containerPort: 9090
28 changes: 25 additions & 3 deletions control-plane/pkg/kafka/clientpool/clientpool.go
@@ -27,10 +27,11 @@ import (
"go.uber.org/zap"

corev1 "k8s.io/api/core/v1"
"knative.dev/pkg/logging"

"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
"knative.dev/eventing-kafka-broker/control-plane/pkg/prober"
"knative.dev/eventing-kafka-broker/control-plane/pkg/security"
"knative.dev/pkg/logging"
)

type KafkaClientKey struct{}
@@ -63,8 +64,21 @@ type ClientPool struct {
}

type GetKafkaClientFunc func(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error)

type GetKafkaClusterAdminFunc func(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.ClusterAdmin, error)

func DisabledGetKafkaClusterAdminFunc(_ context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.ClusterAdmin, error) {
c, err := makeSaramaClient(bootstrapServers, secret, sarama.NewClient)
if err != nil {
return nil, err
}
return sarama.NewClusterAdminFromClient(c)
}

func DisabledGetClient(_ context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) {
return makeSaramaClient(bootstrapServers, secret, sarama.NewClient)
}

func (cp *ClientPool) GetClient(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) {
client, err := cp.getClient(ctx, bootstrapServers, secret)
if err != nil {
@@ -141,7 +155,11 @@ func (cp *ClientPool) GetClusterAdmin(ctx context.Context, bootstrapServers []st
}

func Get(ctx context.Context) *ClientPool {
return ctx.Value(ctxKey).(*ClientPool)
v := ctx.Value(ctxKey)
if v == nil {
return nil
}
return v.(*ClientPool)
}

func makeClusterAdminKey(bootstrapServers []string, secret *corev1.Secret) clientKey {
@@ -162,6 +180,10 @@ func makeClusterAdminKey(bootstrapServers []string, secret *corev1.Secret) clien
}

func (cp *ClientPool) makeSaramaClient(bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) {
return makeSaramaClient(bootstrapServers, secret, cp.newSaramaClient)
}

func makeSaramaClient(bootstrapServers []string, secret *corev1.Secret, newSaramaClient kafka.NewClientFunc) (sarama.Client, error) {
secretOpt, err := security.NewSaramaSecurityOptionFromSecret(secret)
if err != nil {
return nil, err
@@ -172,7 +194,7 @@ func (cp *ClientPool) makeSaramaClient(bootstrapServers []string, secret *corev1
return nil, err
}

saramaClient, err := cp.newSaramaClient(bootstrapServers, config)
saramaClient, err := newSaramaClient(bootstrapServers, config)
if err != nil {
return nil, err
}
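
Both Disabled* helpers satisfy the existing GetKafkaClientFunc / GetKafkaClusterAdminFunc types, so reconcilers can consume either the pooled or the per-call implementation through the same signature, and Get now returns nil instead of panicking when the pool was never added to the context. A caller sketch under those assumptions (the helper name and bootstrap address are illustrative, not part of this PR):

```go
package example

import (
	"context"

	corev1 "k8s.io/api/core/v1"

	"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool"
)

// listTopicNames works the same regardless of which implementation backs
// getAdmin: clientPool.GetClusterAdmin when the pool is enabled, or
// clientpool.DisabledGetKafkaClusterAdminFunc when it is not.
func listTopicNames(ctx context.Context, getAdmin clientpool.GetKafkaClusterAdminFunc, secret *corev1.Secret) ([]string, error) {
	// Bootstrap address is a placeholder.
	admin, err := getAdmin(ctx, []string{"my-cluster-kafka-bootstrap.kafka:9092"}, secret)
	if err != nil {
		return nil, err
	}
	defer admin.Close()

	details, err := admin.ListTopics()
	if err != nil {
		return nil, err
	}
	names := make([]string, 0, len(details))
	for name := range details {
		names = append(names, name)
	}
	return names, nil
}
```

The controller diffs below use exactly this pattern: clientpool.Get(ctx) is checked for nil, and the Disabled* functions are wired in when the pool is turned off.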
18 changes: 11 additions & 7 deletions control-plane/pkg/reconciler/broker/controller.go
@@ -65,8 +65,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E
configmapInformer := configmapinformer.Get(ctx)
featureFlags := apisconfig.DefaultFeaturesConfig()

clientPool := clientpool.Get(ctx)

reconciler := &Reconciler{
Reconciler: &base.Reconciler{
KubeClient: kubeclient.Get(ctx),
@@ -79,11 +77,17 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E
DispatcherLabel: base.BrokerDispatcherLabel,
ReceiverLabel: base.BrokerReceiverLabel,
},
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
ConfigMapLister: configmapInformer.Lister(),
Env: env,
Counter: counter.NewExpiringCounter(ctx),
KafkaFeatureFlags: featureFlags,
ConfigMapLister: configmapInformer.Lister(),
Env: env,
Counter: counter.NewExpiringCounter(ctx),
KafkaFeatureFlags: featureFlags,
}

clientPool := clientpool.Get(ctx)
if clientPool == nil {
reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
} else {
reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
}

logger := logging.FromContext(ctx)
10 changes: 7 additions & 3 deletions control-plane/pkg/reconciler/broker/namespaced_controller.go
@@ -88,8 +88,6 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, env
logger.Fatal("unable to create Manifestival client-go client", zap.Error(err))
}

clientPool := clientpool.Get(ctx)

reconciler := &NamespacedReconciler{
Reconciler: &base.Reconciler{
KubeClient: kubeclient.Get(ctx),
@@ -103,7 +101,6 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, env
DispatcherLabel: base.BrokerDispatcherLabel,
ReceiverLabel: base.BrokerReceiverLabel,
},
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
NamespaceLister: namespaceinformer.Get(ctx).Lister(),
ConfigMapLister: configmapInformer.Lister(),
ServiceAccountLister: serviceaccountinformer.Get(ctx).Lister(),
@@ -119,6 +116,13 @@
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
}

clientPool := clientpool.Get(ctx)
if clientPool == nil {
reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
} else {
reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
}

impl := brokerreconciler.NewImpl(ctx, reconciler, kafka.NamespacedBrokerClass, func(impl *controller.Impl) controller.Options {
return controller.Options{PromoteFilterFunc: kafka.NamespacedBrokerClassFilter()}
})
39 changes: 22 additions & 17 deletions control-plane/pkg/reconciler/channel/controller.go
@@ -31,20 +31,20 @@ import (
"knative.dev/eventing/pkg/apis/feature"
subscriptioninformer "knative.dev/eventing/pkg/client/injection/informers/messaging/v1/subscription"

messagingv1beta "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/messaging/v1beta1"
kafkachannelinformer "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/informers/messaging/v1beta1/kafkachannel"
kafkachannelreconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/offset"

kubeclient "knative.dev/pkg/client/injection/kube/client"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap"
podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod"
secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret"
serviceinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/service"
"knative.dev/pkg/configmap"

messagingv1beta "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/messaging/v1beta1"
kafkachannelinformer "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/informers/messaging/v1beta1/kafkachannel"
kafkachannelreconciler "knative.dev/eventing-kafka-broker/control-plane/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/offset"

"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
"knative.dev/pkg/network"
@@ -63,8 +63,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
configmapInformer := configmapinformer.Get(ctx)
serviceInformer := serviceinformer.Get(ctx)

clientPool := clientpool.Get(ctx)

reconciler := &Reconciler{
Reconciler: &base.Reconciler{
KubeClient: kubeclient.Get(ctx),
@@ -77,14 +75,21 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
DispatcherLabel: base.ChannelDispatcherLabel,
ReceiverLabel: base.ChannelReceiverLabel,
},
SubscriptionLister: subscriptioninformer.Get(ctx).Lister(),
GetKafkaClient: clientPool.GetClient,
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
InitOffsetsFunc: offset.InitOffsets,
Env: configs,
ConfigMapLister: configmapInformer.Lister(),
ServiceLister: serviceInformer.Lister(),
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
Env: configs,
InitOffsetsFunc: offset.InitOffsets,
ConfigMapLister: configmapInformer.Lister(),
ServiceLister: serviceinformer.Get(ctx).Lister(),
SubscriptionLister: subscriptioninformer.Get(ctx).Lister(),
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
}

clientPool := clientpool.Get(ctx)
if clientPool == nil {
reconciler.GetKafkaClient = clientpool.DisabledGetClient
reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
} else {
reconciler.GetKafkaClient = clientPool.GetClient
reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
}

logger := logging.FromContext(ctx)
(consumer group reconciler; file name not shown in this view)
@@ -446,7 +446,7 @@ func (r *Reconciler) schedule(ctx context.Context, cg *kafkainternals.ConsumerGr
return cg.MarkScheduleConsumerFailed("Schedule", err)
}

placements, err := statefulSetScheduler.Schedule(cg)
placements, err := statefulSetScheduler.Schedule(ctx, cg)
if err != nil {
return cg.MarkScheduleConsumerFailed("Schedule", err)
}
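
This presumably follows from the knative.dev/eventing 1.14 bump referenced in the PR title: the scheduler's Schedule method now takes a context.Context as its first argument, so the reconciler's context is passed through unchanged.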