Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support exposing the Audience of a Broker #3600

24 changes: 23 additions & 1 deletion control-plane/pkg/reconciler/broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ import (
"strings"
"time"

"knative.dev/eventing/pkg/auth"
"knative.dev/pkg/logging"

"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -265,7 +268,6 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker)
addressableStatus.Address = &httpAddress
addressableStatus.Addresses = []duckv1.Addressable{httpAddress}
}

proberAddressable := prober.ProberAddressable{
AddressStatus: &addressableStatus,
ResourceKey: types.NamespacedName{
Expand All @@ -282,6 +284,26 @@ func (r *Reconciler) reconcileKind(ctx context.Context, broker *eventing.Broker)

broker.Status.Address = addressableStatus.Address
broker.Status.Addresses = addressableStatus.Addresses

if feature.FromContext(ctx).IsOIDCAuthentication() && broker.Status.Address != nil {
audience := auth.GetAudience(eventing.SchemeGroupVersion.WithKind("Broker"), broker.ObjectMeta)
logging.FromContext(ctx).Debugw("Setting the brokers audience", zap.String("audience", audience))
broker.Status.Address.Audience = &audience
creydr marked this conversation as resolved.
Show resolved Hide resolved

for i := range broker.Status.Addresses {
broker.Status.Addresses[i].Audience = &audience
}
} else {
logging.FromContext(ctx).Debug("Clearing the brokers audience as OIDC is not enabled")
if broker.Status.Address != nil {
broker.Status.Address.Audience = nil
}

for i := range broker.Status.Addresses {
broker.Status.Addresses[i].Audience = nil
}
}

broker.GetConditionSet().Manage(broker.GetStatus()).MarkTrue(base.ConditionAddressable)

return nil
Expand Down
106 changes: 105 additions & 1 deletion control-plane/pkg/reconciler/broker/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
"testing"
"text/template"

"knative.dev/eventing/pkg/auth"

"knative.dev/eventing-kafka-broker/control-plane/pkg/counter"

"k8s.io/apimachinery/pkg/runtime/schema"
Expand Down Expand Up @@ -106,6 +108,10 @@ var (
linear = eventingduck.BackoffPolicyLinear
exponential = eventingduck.BackoffPolicyExponential
customBrokerTopicTemplate = customTemplate()
brokerAudience = auth.GetAudience(eventing.SchemeGroupVersion.WithKind("Broker"), metav1.ObjectMeta{
Name: BrokerName,
Namespace: BrokerNamespace,
})
)

var DefaultEnv = &config.Env{
Expand All @@ -121,7 +127,6 @@ var DefaultEnv = &config.Env{

func TestBrokerReconciler(t *testing.T) {
eventing.RegisterAlternateBrokerConditionSet(base.IngressConditionSet)

t.Parallel()

for _, f := range Formats {
Expand Down Expand Up @@ -2221,6 +2226,105 @@ func brokerReconciliation(t *testing.T, format string, env config.Env) {
}),
},
},
{
Name: "Should provision audience if authentication enabled",
Objects: []runtime.Object{
NewBroker(
WithBrokerConfig(KReference(BrokerConfig(bootstrapServers, 20, 5,
BrokerAuthConfig("secret-1"),
))),
),
NewSSLSecret(ConfigMapNamespace, "secret-1"),
BrokerConfig(bootstrapServers, 20, 5, BrokerAuthConfig("secret-1")),
NewConfigMapWithBinaryData(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, nil),
NewService(),
BrokerReceiverPod(env.SystemNamespace, map[string]string{
"annotation_to_preserve": "value_to_preserve",
}),
BrokerDispatcherPod(env.SystemNamespace, map[string]string{
"annotation_to_preserve": "value_to_preserve",
}),
},
Key: testKey,
WantEvents: []string{
finalizerUpdatedEvent,
},
WantUpdates: []clientgotesting.UpdateActionImpl{
SecretFinalizerUpdate("secret-1", SecretFinalizerName),
ConfigMapUpdate(env.DataPlaneConfigMapNamespace, env.ContractConfigMapName, env.ContractConfigMapFormat, &contract.Contract{
Resources: []*contract.Resource{
{
Uid: BrokerUUID,
Topics: []string{BrokerTopic()},
Ingress: &contract.Ingress{Path: receiver.Path(BrokerNamespace, BrokerName)},
BootstrapServers: bootstrapServers,
Reference: BrokerReference(),
Auth: &contract.Resource_AuthSecret{
AuthSecret: &contract.Reference{
Uuid: SecretUUID,
Namespace: ConfigMapNamespace,
Name: "secret-1",
Version: SecretResourceVersion,
},
},
},
},
Generation: 1,
}),
BrokerReceiverPodUpdate(env.SystemNamespace, map[string]string{
base.VolumeGenerationAnnotationKey: "1",
"annotation_to_preserve": "value_to_preserve",
}),
BrokerDispatcherPodUpdate(env.SystemNamespace, map[string]string{
base.VolumeGenerationAnnotationKey: "1",
"annotation_to_preserve": "value_to_preserve",
}),
},
WantPatches: []clientgotesting.PatchActionImpl{
patchFinalizers(),
},
WantStatusUpdates: []clientgotesting.UpdateActionImpl{
{
Object: NewBroker(
WithBrokerConfig(KReference(BrokerConfig(bootstrapServers, 20, 5,
BrokerAuthConfig("secret-1"),
))),
reconcilertesting.WithInitBrokerConditions,
StatusBrokerConfigMapUpdatedReady(&env),
StatusBrokerDataPlaneAvailable,
StatusBrokerConfigParsed,
StatusBrokerTopicReady,
BrokerConfigMapAnnotations(),
WithTopicStatusAnnotation(BrokerTopic()),
BrokerConfigMapSecretAnnotation("secret-1"),
BrokerAddressable(&env),
StatusBrokerProbeSucceeded,
WithBrokerAddresses([]duckv1.Addressable{
{
Name: pointer.String("http"),
URL: brokerAddress,
Audience: &brokerAudience,
},
}),
WithBrokerAddress(duckv1.Addressable{
Name: pointer.String("http"),
URL: brokerAddress,
Audience: &brokerAudience,
}),
WithBrokerAddessable(),
),
},
},
OtherTestData: map[string]interface{}{
ExpectedTopicDetail: sarama.TopicDetail{
NumPartitions: 20,
ReplicationFactor: 5,
},
},
Ctx: feature.ToContext(context.Background(), feature.Flags{
feature.OIDCAuthentication: feature.Enabled,
}),
},
}

for i := range table {
Expand Down
Loading