diff --git a/control-plane/pkg/reconciler/trigger/controller.go b/control-plane/pkg/reconciler/trigger/controller.go index 2243f1ef8f..3b6545303a 100644 --- a/control-plane/pkg/reconciler/trigger/controller.go +++ b/control-plane/pkg/reconciler/trigger/controller.go @@ -121,6 +121,21 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf impl.FilteredGlobalResync(filterTriggers(reconciler.BrokerLister, kafka.BrokerClass, FinalizerName), triggerInformer.Informer()) } + kafkaConfigStore := apisconfig.NewStore(ctx, func(name string, value *apisconfig.KafkaFeatureFlags) { + reconciler.KafkaFeatureFlags.Reset(value) + if globalResync != nil { + globalResync(nil) + } + }) + kafkaConfigStore.WatchConfigs(watcher) + + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) { + if globalResync != nil { + globalResync(nil) + } + }) + featureStore.WatchConfigs(watcher) + configmapInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: controller.FilterWithNameAndNamespace(configs.DataPlaneConfigMapNamespace, configs.ContractConfigMapName), Handler: cache.ResourceEventHandlerFuncs{ diff --git a/control-plane/pkg/reconciler/trigger/controller_test.go b/control-plane/pkg/reconciler/trigger/controller_test.go index 36edb063a6..31fba44541 100644 --- a/control-plane/pkg/reconciler/trigger/controller_test.go +++ b/control-plane/pkg/reconciler/trigger/controller_test.go @@ -46,6 +46,10 @@ func TestNewController(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "config-features", }, + }, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-kafka-features", + }, }), &config.Env{}) if controller == nil { t.Error("failed to create controller: ") diff --git a/control-plane/pkg/reconciler/trigger/namespaced_controller.go b/control-plane/pkg/reconciler/trigger/namespaced_controller.go index 7694edb804..f0e6a1e4de 100644 --- a/control-plane/pkg/reconciler/trigger/namespaced_controller.go +++ b/control-plane/pkg/reconciler/trigger/namespaced_controller.go @@ -20,6 +20,7 @@ import ( "context" "github.com/IBM/sarama" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/offset" "k8s.io/client-go/tools/cache" @@ -81,6 +82,7 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, con NewKafkaClient: sarama.NewClient, NewKafkaClusterAdminClient: sarama.NewClusterAdmin, InitOffsetsFunc: offset.InitOffsets, + KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), } impl := triggerreconciler.NewImpl(ctx, reconciler, func(impl *controller.Impl) controller.Options { @@ -111,6 +113,21 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, con impl.GlobalResync(brokerInformer.Informer()) } + kafkaConfigStore := apisconfig.NewStore(ctx, func(name string, value *apisconfig.KafkaFeatureFlags) { + reconciler.KafkaFeatureFlags.Reset(value) + if globalResync != nil { + globalResync(nil) + } + }) + kafkaConfigStore.WatchConfigs(watcher) + + featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) { + if globalResync != nil { + globalResync(nil) + } + }) + featureStore.WatchConfigs(watcher) + configmapInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: kafka.FilterWithLabel(kafka.NamespacedBrokerDataplaneLabelKey, kafka.NamespacedBrokerDataplaneLabelValue), Handler: cache.ResourceEventHandlerFuncs{ diff --git a/control-plane/pkg/reconciler/trigger/namespaced_controller_test.go b/control-plane/pkg/reconciler/trigger/namespaced_controller_test.go index 0907bd11bf..4f1a540dd9 100644 --- a/control-plane/pkg/reconciler/trigger/namespaced_controller_test.go +++ b/control-plane/pkg/reconciler/trigger/namespaced_controller_test.go @@ -42,6 +42,10 @@ func TestNewNamespacedController(t *testing.T) { ObjectMeta: metav1.ObjectMeta{ Name: "config-features", }, + }, &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: "config-kafka-features", + }, }), &config.Env{}) if controller == nil { t.Error("failed to create controller: ") diff --git a/control-plane/pkg/reconciler/trigger/namespaced_trigger.go b/control-plane/pkg/reconciler/trigger/namespaced_trigger.go index dc09aa8891..c9768f4542 100644 --- a/control-plane/pkg/reconciler/trigger/namespaced_trigger.go +++ b/control-plane/pkg/reconciler/trigger/namespaced_trigger.go @@ -47,6 +47,7 @@ type NamespacedReconciler struct { NewKafkaClusterAdminClient kafka.NewClusterAdminClientFunc NewKafkaClient kafka.NewClientFunc InitOffsetsFunc kafka.InitOffsetsFunc + KafkaFeatureFlags *apisconfig.KafkaFeatureFlags } func (r *NamespacedReconciler) ReconcileKind(ctx context.Context, trigger *eventing.Trigger) reconciler.Event { @@ -89,7 +90,7 @@ func (r *NamespacedReconciler) createReconcilerForTriggerInstance(trigger *event // override BrokerClass: kafka.NamespacedBrokerClass, DataPlaneConfigMapLabeler: kafka.NamespacedDataplaneLabelConfigmapOption, - KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), + KafkaFeatureFlags: r.KafkaFeatureFlags, NewKafkaClusterAdminClient: r.NewKafkaClusterAdminClient, NewKafkaClient: r.NewKafkaClient, InitOffsetsFunc: r.InitOffsetsFunc, diff --git a/control-plane/pkg/reconciler/trigger/namespaced_trigger_test.go b/control-plane/pkg/reconciler/trigger/namespaced_trigger_test.go index 35323102a8..26822c1d97 100644 --- a/control-plane/pkg/reconciler/trigger/namespaced_trigger_test.go +++ b/control-plane/pkg/reconciler/trigger/namespaced_trigger_test.go @@ -38,6 +38,7 @@ import ( triggerreconciler "knative.dev/eventing/pkg/client/injection/reconciler/eventing/v1/trigger" reconcilertesting "knative.dev/eventing/pkg/reconciler/testing/v1" + apisconfig "knative.dev/eventing-kafka-broker/control-plane/pkg/apis/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/config" "knative.dev/eventing-kafka-broker/control-plane/pkg/contract" kafkatesting "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/testing" @@ -196,6 +197,7 @@ func useNamespacedTable(t *testing.T, table TableTest, env *config.Env) { T: t, }, nil }, + KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(), } reconciler.Resolver = resolver.NewURIResolverFromTracker(ctx, tracker.New(func(name types.NamespacedName) {}, 0)) diff --git a/test/config-kafka-features/new-cg-id.yaml b/test/config-kafka-features/new-cg-id.yaml new file mode 100644 index 0000000000..be8acade61 --- /dev/null +++ b/test/config-kafka-features/new-cg-id.yaml @@ -0,0 +1,26 @@ +# Copyright 2024 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +kind: ConfigMap +metadata: + name: config-kafka-features + namespace: knative-eventing +data: + dispatcher-rate-limiter: "disabled" + dispatcher-ordered-executor-metrics: "disabled" + controller-autoscaler-keda: "disabled" + triggers-consumergroup-template: "test-{{ .Namespace }}-{{ .Name }}-trigger" + brokers-topic-template: "knative-broker-{{ .Namespace }}-{{ .Name }}" + channels-topic-template: "knative-messaging-kafka.{{ .Namespace }}.{{ .Name }}" diff --git a/test/config-kafka-features/restore-cg-id.yaml b/test/config-kafka-features/restore-cg-id.yaml new file mode 100644 index 0000000000..a9d74be0e1 --- /dev/null +++ b/test/config-kafka-features/restore-cg-id.yaml @@ -0,0 +1,26 @@ +# Copyright 2024 The Knative Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +apiVersion: v1 +kind: ConfigMap +metadata: + name: config-kafka-features + namespace: knative-eventing +data: + dispatcher-rate-limiter: "disabled" + dispatcher-ordered-executor-metrics: "disabled" + controller-autoscaler-keda: "disabled" + triggers-consumergroup-template: "knative-trigger-{{ .Namespace }}-{{ .Name }}" + brokers-topic-template: "knative-broker-{{ .Namespace }}-{{ .Name }}" + channels-topic-template: "knative-messaging-kafka.{{ .Namespace }}.{{ .Name }}" diff --git a/test/e2e_new/broker_sasl_ssl_test.go b/test/e2e_new/broker_sasl_ssl_test.go index da8c45f18b..f0a195cd47 100644 --- a/test/e2e_new/broker_sasl_ssl_test.go +++ b/test/e2e_new/broker_sasl_ssl_test.go @@ -113,6 +113,18 @@ func TestRestrictedBrokerAuthSslSaslScram512(t *testing.T) { env.Test(ctx, t, features.SetupBrokerAuthRestrictedSslSaslScram512(ctx)) } +func TestTriggerUsesConsumerGroupIDFromTemplate(t *testing.T) { + ctx, env := global.Environment( + knative.WithKnativeNamespace(system.Namespace()), + knative.WithLoggingConfig, + knative.WithTracingConfig, + k8s.WithEventListener, + environment.Managed(t), + ) + + env.Test(ctx, t, features.TriggerUsesConsumerGroupIDTemplate()) +} + func TestBrokerNotReadyWithoutAuthSecret(t *testing.T) { t.Parallel() diff --git a/test/reconciler-tests.sh b/test/reconciler-tests.sh index 57fbab16ef..667e9151a4 100755 --- a/test/reconciler-tests.sh +++ b/test/reconciler-tests.sh @@ -48,6 +48,14 @@ go_test_e2e -tags=e2e,cloudevents -timeout=1h ./test/e2e_new_channel/... || fail go_test_e2e -tags=deletecm ./test/e2e_new/... || fail_test "E2E (new deletecm) suite failed" +echo "Running E2E Reconciler tests with consumergroup id template changed" + +kubectl apply -f "$(dirname "$0")/config-kafka-features/new-cg-id.yaml" + +go_test_e2e -tags=e2e -timeout=15m ./test/e2e_new -run TestTriggerUsesConsumerGroupIDFromTemplate + +kubectl apply -f "$(dirname "$0")/config-kafka-features/restore-cg-id.yaml" + echo "Running E2E Reconciler Tests with strict transport encryption" kubectl apply -Rf "$(dirname "$0")/config-transport-encryption" diff --git a/test/rekt/features/broker_auth.go b/test/rekt/features/broker_auth.go index 39e9959fa5..e33d5c867d 100644 --- a/test/rekt/features/broker_auth.go +++ b/test/rekt/features/broker_auth.go @@ -17,15 +17,27 @@ package features import ( + "bytes" "context" + "text/template" "time" + kubeclient "knative.dev/pkg/client/injection/kube/client" + "knative.dev/pkg/system" + + "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka" + "knative.dev/reconciler-test/pkg/environment" + "knative.dev/reconciler-test/pkg/resources/service" + "github.com/cloudevents/sdk-go/v2/test" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "github.com/google/uuid" testpkg "knative.dev/eventing-kafka-broker/test/pkg" "knative.dev/eventing-kafka-broker/test/rekt/resources/kafkaauthsecret" + triggersclient "knative.dev/eventing/pkg/client/injection/client" "knative.dev/eventing/test/rekt/resources/broker" "knative.dev/eventing/test/rekt/resources/trigger" @@ -33,7 +45,6 @@ import ( "knative.dev/reconciler-test/pkg/eventshub/assert" "knative.dev/reconciler-test/pkg/feature" "knative.dev/reconciler-test/pkg/manifest" - "knative.dev/reconciler-test/resources/svc" brokerconfigmap "knative.dev/eventing-kafka-broker/test/rekt/resources/configmap/broker" @@ -142,3 +153,72 @@ func BrokerNotReadyWithoutAuthSecret() *feature.Feature { return f } + +func TriggerUsesConsumerGroupIDTemplate() *feature.Feature { + f := feature.NewFeature() + + brokerName := feature.MakeRandomK8sName("broker") + triggerName := feature.MakeRandomK8sName("trigger") + sinkName := feature.MakeRandomK8sName("sink") + + f.Setup("install broker", broker.Install(brokerName)) + f.Setup("install sink", eventshub.Install(sinkName, eventshub.StartReceiver)) + + f.Setup("broker is ready", broker.IsReady(brokerName)) + f.Setup("broker is addressable", broker.IsAddressable(brokerName)) + + f.Requirement("install trigger", trigger.Install(triggerName, brokerName, trigger.WithSubscriber(service.AsKReference(sinkName), ""))) + f.Requirement("trigger is ready", trigger.IsReady(triggerName)) + + // check that the trigger has the correct annotation + f.Assert("trigger has correct consumergroup template", checkTriggerConsumerGroupIDAnnotation(triggerName)) + + return f +} + +func checkTriggerConsumerGroupIDAnnotation(triggerName string) feature.StepFn { + return func(ctx context.Context, t feature.T) { + ns := environment.FromContext(ctx).Namespace() + + cm, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(ctx, "config-kafka-features", metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + + expectedTemplateStr, ok := cm.Data["triggers-consumergroup-template"] + if !ok { + // there are two keys the value could be in + expectedTemplateStr, ok = cm.Data["triggers.consumergroup.template"] + if !ok { + t.Fatal("no consumergroup template in config-kafka-features") + } + } + + expectedTemplate, err := template.New("consumergroup-id").Parse(expectedTemplateStr) + if err != nil { + t.Fatal(err) + } + + trig, err := triggersclient.Get(ctx).EventingV1().Triggers(ns).Get(ctx, triggerName, metav1.GetOptions{}) + if err != nil { + t.Fatal(err) + } + + var expectedBytes bytes.Buffer + err = expectedTemplate.Execute(&expectedBytes, trig.ObjectMeta) + if err != nil { + t.Fatal(err) + } + + expectedAnnotation := expectedBytes.String() + + cgAnnotation, ok := trig.Status.Annotations[kafka.GroupIdAnnotation] + if !ok { + t.Fatal("no consumer group annotation present on the trigger") + } + + if cgAnnotation != expectedAnnotation { + t.Fatalf("consumer group id annotation was not equal to expected value. expected %s, got %s", expectedAnnotation, cgAnnotation) + } + } +}