Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
0d5d695
progress save
Leo6Leo Aug 15, 2023
216b605
Merge main branch
Leo6Leo Aug 21, 2023
204c66b
add the new cert rotation test
Leo6Leo Aug 21, 2023
9d85eed
Update the rekt test
Leo6Leo Aug 21, 2023
ae3403a
Format the files
Leo6Leo Aug 21, 2023
049d830
workspace save
Leo6Leo Aug 23, 2023
a38f82f
Merge branch 'main' into rekt-test-_for_TLS_key_pair_rotation
Leo6Leo Aug 23, 2023
a43d93b
maven
Leo6Leo Aug 23, 2023
2c526f2
Merge branch 'main' into rekt-test-_for_TLS_key_pair_rotation
Leo6Leo Sep 7, 2023
72f5b72
Clean up
Leo6Leo Sep 13, 2023
041293f
Clean up
Leo6Leo Sep 14, 2023
1225253
Format
Leo6Leo Sep 14, 2023
e1e581c
Merge branch 'main' into rekt-test-_for_TLS_key_pair_rotation
Leo6Leo Sep 29, 2023
f7c1392
Fix the rekt test CA cert issue on Source
Leo6Leo Oct 2, 2023
1a65cc9
Format fix
Leo6Leo Oct 2, 2023
72b0723
Re-order the prerequisite
Leo6Leo Oct 2, 2023
f5aacb0
Enable the strict transportation mode in TLS rekt tests
Leo6Leo Oct 2, 2023
54d1f04
Fix the reviewDog comment
Leo6Leo Oct 2, 2023
52188f6
Update test/e2e_new/broker_eventing_tls_test.go
Leo6Leo Oct 3, 2023
93f3884
Fix the format of the shell script
Leo6Leo Oct 3, 2023
4e87518
Fix the format of the shell script
Leo6Leo Oct 3, 2023
0686a45
Update the Strict feature flag
Leo6Leo Oct 3, 2023
1ce6ca0
Merge branch 'main' into rekt-test-_for_TLS_key_pair_rotation
Leo6Leo Oct 4, 2023
f48b148
Add the code to inject the feature flag to the context in the broker …
Leo6Leo Oct 4, 2023
e975fc0
Run go import
Leo6Leo Oct 4, 2023
9fd2fdd
Fix the controller tests
Leo6Leo Oct 4, 2023
83c4714
Merge branch 'main' into rekt-test-_for_TLS_key_pair_rotation
Leo6Leo Oct 6, 2023
36c265b
Set the default value to all other feature flags when we are updating…
Leo6Leo Oct 6, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion control-plane/pkg/reconciler/broker/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,15 +88,21 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E
)
}

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"))
featureStore.WatchConfigs(watcher)

impl := brokerreconciler.NewImpl(ctx, reconciler, kafka.BrokerClass, func(impl *controller.Impl) controller.Options {
return controller.Options{PromoteFilterFunc: kafka.BrokerClassFilter()}
return controller.Options{
ConfigStore: featureStore,
PromoteFilterFunc: kafka.BrokerClassFilter()}
})

reconciler.Resolver = resolver.NewURIResolverFromTracker(ctx, impl.Tracker)
IPsLister := prober.IPsListerFromService(types.NamespacedName{Namespace: reconciler.DataPlaneNamespace, Name: env.IngressName})

features := feature.FromContext(ctx)
caCerts, err := reconciler.getCaCerts()

if err != nil && (features.IsStrictTransportEncryption() || features.IsPermissiveTransportEncryption()) {
// We only need to warn here as the broker won't reconcile properly without the proper certs because the prober won't succeed
logger.Warn("Failed to get CA certs when at least one address uses TLS", zap.Error(err))
Expand Down
4 changes: 4 additions & 0 deletions control-plane/pkg/reconciler/broker/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,10 @@ func TestNewController(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: apisconfig.FlagsConfigName,
},
}, &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: "config-features",
},
}),
env,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,35 +62,42 @@ public class Metrics {
public static final boolean DISABLE_KAFKA_CLIENTS_METRICS =
Boolean.parseBoolean(System.getenv("DISABLE_KAFKA_CLIENTS_METRICS"));

// There are different thread polls usable, mainly, each with its own drawbacks for our use case:
// - cached thread pools
// - fixed thread pools
// There are different thread polls usable, mainly, each with its own drawbacks
// for our use case:
// - cached thread pools
// - fixed thread pools
//
// A cached thread might grow unbounded and since creating, updating and deleting resources
// trigger the usage of this executor, a bad actor might start continuously creating, updating
// A cached thread might grow unbounded and since creating, updating and
// deleting resources
// trigger the usage of this executor, a bad actor might start continuously
// creating, updating
// and deleting resources which will cause resource exhaustion.
//
// A fixed thread poll doesn't give the best possible latency for every resource, but it's
// A fixed thread poll doesn't give the best possible latency for every
// resource, but it's
// bounded, so we keep the resource usage under control.
// We might want to provide configs to make it bigger than a single thread but a single thread
// We might want to provide configs to make it bigger than a single thread but a
// single thread
// to start with is good enough for now.
public static final ExecutorService meterBinderExecutor = Executors.newSingleThreadExecutor();

static {
Runtime.getRuntime().addShutdownHook(new Thread(meterBinderExecutor::shutdown));
}

// Micrometer employs a naming convention that separates lowercase words with a '.' (dot) character.
// Different monitoring systems have different recommendations regarding naming convention, and some naming
// Micrometer employs a naming convention that separates lowercase words with a
// '.' (dot) character.
// Different monitoring systems have different recommendations regarding naming
// convention, and some naming
// conventions may be incompatible for one system and not another.
// Each Micrometer implementation for a monitoring system comes with a naming convention that transforms lowercase
// Each Micrometer implementation for a monitoring system comes with a naming
// convention that transforms lowercase
// dot notation names to the monitoring system’s recommended naming convention.
// Additionally, this naming convention implementation sanitizes metric names and tags of special characters that
// Additionally, this naming convention implementation sanitizes metric names
// and tags of special characters that
// are disallowed by the monitoring system.

/**
* In prometheus format --> http_events_sent_total
*/
/** In prometheus format --> http_events_sent_total */
public static final String HTTP_EVENTS_SENT_COUNT = "http.events.sent";

/**
Expand Down Expand Up @@ -221,8 +228,8 @@ public static MeterRegistry getRegistry() {
* Register the given consumer to the global meter registry.
*
* @param consumer consumer to bind to the global registry.
* @param <K> Record key type.
* @param <V> Record value type.
* @param <K> Record key type.
* @param <V> Record value type.
* @return A meter binder to close once the consumer is closed.
*/
public static <K, V> AsyncCloseable register(final Consumer<K, V> consumer) {
Expand All @@ -233,8 +240,8 @@ public static <K, V> AsyncCloseable register(final Consumer<K, V> consumer) {
* Register the given producer to the global meter registry.
*
* @param producer Consumer to bind to the global registry.
* @param <K> Record key type.
* @param <V> Record value type.
* @param <K> Record key type.
* @param <V> Record value type.
* @return A meter binder to close once the producer is closed.
*/
public static <K, V> AsyncCloseable register(final Producer<K, V> producer) {
Expand Down Expand Up @@ -270,7 +277,8 @@ private static AsyncCloseable register(final Supplier<KafkaClientMetrics> metric
};

} catch (final RejectedExecutionException ex) {
// if this task cannot be accepted for execution when the executor has been shutdown.
// if this task cannot be accepted for execution when the executor has been
// shutdown.
logger.warn("Failed to bind metrics for Kafka client", ex);
}
}
Expand Down
32 changes: 32 additions & 0 deletions test/config-transport-encryption/features.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright 2021 The Knative Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: v1
kind: ConfigMap
metadata:
name: config-features
namespace: knative-eventing
labels:
knative.dev/config-propagation: original
knative.dev/config-category: eventing
data:
authentication.oidc: "disabled"
delivery-retryafter: "disabled"
delivery-timeout: "enabled"
eventtype-auto-create: "disabled"
kreference-group: "disabled"
kreference-mapping: "disabled"
new-trigger-filters: "enabled"
strict-subscriber: "disabled"
transport-encryption: "Strict"
48 changes: 48 additions & 0 deletions test/e2e_new/broker_eventing_tls_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
//go:build e2e
// +build e2e

/*
* Copyright 2023 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package e2e_new

import (
"testing"
"time"

"knative.dev/eventing-kafka-broker/test/rekt/features"
"knative.dev/pkg/system"
"knative.dev/reconciler-test/pkg/environment"
"knative.dev/reconciler-test/pkg/eventshub"
"knative.dev/reconciler-test/pkg/k8s"
"knative.dev/reconciler-test/pkg/knative"
)

func TestBrokerTLSCARotation(t *testing.T) {
t.Parallel()

ctx, env := global.Environment(
knative.WithKnativeNamespace(system.Namespace()),
knative.WithLoggingConfig,
knative.WithTracingConfig,
k8s.WithEventListener,
environment.Managed(t),
eventshub.WithTLS(t),
environment.WithPollTimings(5*time.Second, 4*time.Minute),
)

env.Test(ctx, t, features.RotateBrokerTLSCertificates())
}
6 changes: 6 additions & 0 deletions test/reconciler-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ go_test_e2e -tags=e2e,cloudevents -timeout=1h ./test/e2e_new_channel/... || fail

go_test_e2e -tags=deletecm ./test/e2e_new/... || fail_test "E2E (new deletecm) suite failed"

echo "Running E2E Reconciler Tests with strict transport encryption"

kubectl apply -Rf "$(dirname "$0")/config-transport-encryption"

go_test_e2e -timeout=1h ./test/e2e_new -run TLS || fail_test

if ! ${LOCAL_DEVELOPMENT}; then
go_test_e2e -tags=sacura -timeout=40m ./test/e2e/... || fail_test "E2E (sacura) suite failed"
fi
Expand Down
4 changes: 4 additions & 0 deletions test/rekt/features/broker_auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,19 @@ import (
"time"

"github.com/cloudevents/sdk-go/v2/test"

"github.com/google/uuid"
testpkg "knative.dev/eventing-kafka-broker/test/pkg"
"knative.dev/eventing-kafka-broker/test/rekt/resources/kafkaauthsecret"

"knative.dev/eventing/test/rekt/resources/broker"
"knative.dev/eventing/test/rekt/resources/trigger"

"knative.dev/reconciler-test/pkg/eventshub"
"knative.dev/reconciler-test/pkg/eventshub/assert"
"knative.dev/reconciler-test/pkg/feature"
"knative.dev/reconciler-test/pkg/manifest"

"knative.dev/reconciler-test/resources/svc"

brokerconfigmap "knative.dev/eventing-kafka-broker/test/rekt/resources/configmap/broker"
Expand Down
111 changes: 111 additions & 0 deletions test/rekt/features/broker_tls.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright 2023 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package features

import (
"context"
"time"

"k8s.io/apimachinery/pkg/types"
"knative.dev/eventing/test/rekt/resources/addressable"
"knative.dev/reconciler-test/resources/certificate"

testpkg "knative.dev/eventing-kafka-broker/test/pkg"

cetest "github.com/cloudevents/sdk-go/v2/test"
"github.com/google/uuid"

brokerconfigmap "knative.dev/eventing-kafka-broker/test/rekt/resources/configmap/broker"
"knative.dev/eventing/test/rekt/features/featureflags"
"knative.dev/eventing/test/rekt/resources/broker"
"knative.dev/eventing/test/rekt/resources/trigger"
"knative.dev/pkg/system"
"knative.dev/reconciler-test/pkg/eventshub"
"knative.dev/reconciler-test/pkg/eventshub/assert"
"knative.dev/reconciler-test/pkg/feature"
"knative.dev/reconciler-test/pkg/resources/service"
)

func RotateBrokerTLSCertificates() *feature.Feature {

ingressCertificateName := "kafka-broker-ingress-server-tls"
ingressSecretName := "kafka-broker-ingress-server-tls"

brokerName := feature.MakeRandomK8sName("broker")
triggerName := feature.MakeRandomK8sName("trigger")
sink := feature.MakeRandomK8sName("sink")
source := feature.MakeRandomK8sName("source")

f := feature.NewFeatureNamed("Rotate Kafka Broker TLS certificate")

brokerConfig := feature.MakeRandomK8sName("brokercfg")

f.Prerequisite("transport encryption is strict", featureflags.TransportEncryptionStrict())
f.Prerequisite("should not run when Istio is enabled", featureflags.IstioDisabled())

f.Setup("Create broker config", brokerconfigmap.Install(brokerConfig,
brokerconfigmap.WithNumPartitions(1),
brokerconfigmap.WithReplicationFactor(1),
brokerconfigmap.WithBootstrapServer(testpkg.BootstrapServersPlaintext)))

f.Setup("Rotate ingress certificate", certificate.Rotate(certificate.RotateCertificate{
Certificate: types.NamespacedName{
Namespace: system.Namespace(),
Name: ingressCertificateName,
},
}))

f.Setup("install sink", eventshub.Install(sink, eventshub.StartReceiverTLS))
f.Setup("Install broker", broker.Install(brokerName, append(
broker.WithEnvConfig(),
broker.WithConfig(brokerConfig))...,
))
f.Setup("Broker is ready", broker.IsReady(brokerName))
f.Setup("install trigger", func(ctx context.Context, t feature.T) {
d := service.AsDestinationRef(sink)
d.CACerts = eventshub.GetCaCerts(ctx)
trigger.Install(triggerName, brokerName, trigger.WithSubscriberFromDestination(d))(ctx, t)
})
f.Setup("trigger is ready", trigger.IsReady(triggerName))
f.Setup("Broker has HTTPS address", broker.ValidateAddress(brokerName, addressable.AssertHTTPSAddress))

event := cetest.FullEvent()
event.SetID(uuid.New().String())

f.Requirement("install source", eventshub.Install(source,
eventshub.StartSenderToResourceTLS(broker.GVR(), brokerName, nil),
eventshub.InputEvent(event),
// Send multiple events so that we take into account that the certificate rotation might
// be detected by the server after some time.
eventshub.SendMultipleEvents(100, 3*time.Second),
))

f.Assert("Event sent", assert.OnStore(source).
MatchSentEvent(cetest.HasId(event.ID())).
AtLeast(1),
)
f.Assert("Event received", assert.OnStore(sink).
MatchReceivedEvent(cetest.HasId(event.ID())).
AtLeast(1),
)
f.Assert("Source match updated peer certificate", assert.OnStore(source).
MatchPeerCertificatesReceived(assert.MatchPeerCertificatesFromSecret(system.Namespace(), ingressSecretName, "tls.crt")).
AtLeast(1),
)

return f
}