Skip to content

Commit

Permalink
Add kafka ouath
Browse files Browse the repository at this point in the history
  • Loading branch information
carlesarnal committed Feb 4, 2025
1 parent 35264bb commit fb87679
Show file tree
Hide file tree
Showing 8 changed files with 305 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ public class EnvironmentVariables {
public static final String KAFKASQL_SSL_TRUSTSTORE_LOCATION = KAFKA_PREFIX + "SSL_TRUSTSTORE_LOCATION";
public static final String KAFKASQL_SSL_TRUSTSTORE_PASSWORD = KAFKA_PREFIX + "SSL_TRUSTSTORE_PASSWORD";

// KafkaSQL oauth
public static final String APICURIO_KAFKASQL_SECURITY_SASL_ENABLED = "APICURIO_KAFKASQL_SECURITY_SASL_ENABLED";
public static final String APICURIO_KAFKASQL_SECURITY_SASL_MECHANISM = "APICURIO_KAFKASQL_SECURITY_SASL_MECHANISM";
public static final String APICURIO_KAFKA_SECURITY_SASL_CLIENT_ID = "APICURIO_KAFKA_SECURITY_SASL_CLIENT_ID";
public static final String APICURIO_KAFKASQL_SECURITY_SASL_CLIENT_SECRET = "APICURIO_KAFKASQL_SECURITY_SASL_CLIENT_SECRET";
public static final String APICURIO_KAFAKSQL_SECURITY_SASL_TOKEN_ENDPOINT = "APICURIO_KAFAKSQL_SECURITY_SASL_TOKEN_ENDPOINT";
public static final String APICURIO_KAFAKSQL_SECURITY_SASL_LOGIN_CALLBACK_HANDLER_CLASS = "APICURIO_KAFAKSQL_SECURITY_SASL_LOGIN_CALLBACK_HANDLER_CLASS";

// Auth related environment variables
public static final String APICURIO_REGISTRY_AUTH_ENABLED = "QUARKUS_OIDC_TENANT_ENABLED";
public static final String APICURIO_REGISTRY_APP_CLIENT_ID = "QUARKUS_OIDC_CLIENT_ID";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ public static void configureKafkaSQL(ApicurioRegistry3 primary, Deployment deplo
env)) {
log.info("KafkaSQL storage with TLS security configured.");
}

if (KafkaSqlAuth.configureKafkaSQLOauth(primary, env)) {
log.info("KafkaSQL storage with Oauth security configured.");
}
}
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package io.apicurio.registry.operator.feat;

import io.apicurio.registry.operator.api.v1.ApicurioRegistry3;
import io.apicurio.registry.operator.api.v1.ApicurioRegistry3Spec;
import io.apicurio.registry.operator.api.v1.spec.AppSpec;
import io.apicurio.registry.operator.api.v1.spec.KafkaSqlAuthSpec;
import io.apicurio.registry.operator.api.v1.spec.KafkaSqlSpec;
import io.apicurio.registry.operator.api.v1.spec.StorageSpec;
import io.apicurio.registry.operator.utils.SecretKeyRefTool;
import io.fabric8.kubernetes.api.model.EnvVar;

import java.util.Map;
import java.util.Optional;

import static io.apicurio.registry.operator.EnvironmentVariables.*;
import static io.apicurio.registry.operator.resource.app.AppDeploymentResource.addEnvVar;
import static java.util.Optional.ofNullable;

public class KafkaSqlAuth {

/**
* KafkaSQL must be already configured.
*/
public static boolean configureKafkaSQLOauth(ApicurioRegistry3 primary, Map<String, EnvVar> env) {

// spotless:off
var clientSecret = new SecretKeyRefTool(getKafkaSqlAuthSpec(primary)
.map(KafkaSqlAuthSpec::getClientSecret)
.orElse(null), "client-secret");

if (clientSecret.isValid()) {

getKafkaSqlAuthSpec(primary)
.filter(KafkaSqlAuthSpec::getEnabled)
.ifPresent(kafkaSqlAuthSpec -> {
addEnvVar(env, APICURIO_KAFKASQL_SECURITY_SASL_ENABLED, kafkaSqlAuthSpec.getEnabled().toString());
addEnvVar(env, APICURIO_KAFKASQL_SECURITY_SASL_MECHANISM, kafkaSqlAuthSpec.getMechanism());
addEnvVar(env, APICURIO_KAFKA_SECURITY_SASL_CLIENT_ID, kafkaSqlAuthSpec.getClientId());
addEnvVar(env, APICURIO_KAFKASQL_SECURITY_SASL_CLIENT_SECRET, new SecretKeyRefTool(kafkaSqlAuthSpec.getClientSecret(), "client-secret").getSecretVolumeKeyPath());
addEnvVar(env, APICURIO_KAFAKSQL_SECURITY_SASL_TOKEN_ENDPOINT, kafkaSqlAuthSpec.getTokenEndpoint());
addEnvVar(env, APICURIO_KAFAKSQL_SECURITY_SASL_LOGIN_CALLBACK_HANDLER_CLASS, kafkaSqlAuthSpec.getLoginHandlerClass());
});

return true;
}
return false;
}

private static Optional<KafkaSqlAuthSpec> getKafkaSqlAuthSpec(ApicurioRegistry3 primary) {
// spotless:off
return ofNullable(primary)
.map(ApicurioRegistry3::getSpec)
.map(ApicurioRegistry3Spec::getApp)
.map(AppSpec::getStorage)
.map(StorageSpec::getKafkasql)
.map(KafkaSqlSpec::getAuth);
// spotless:on
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package io.apicurio.registry.operator.it;

import io.apicurio.registry.operator.api.v1.ApicurioRegistry3;
import io.fabric8.kubernetes.api.model.PodCondition;
import io.quarkus.test.junit.QuarkusTest;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.apicurio.registry.operator.it.KafkaSqlITTest.applyStrimziResources;
import static io.apicurio.registry.operator.resource.ResourceFactory.deserialize;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

@QuarkusTest
public class KafkaSqlOAuthITTest extends ITBase {

private static final Logger log = LoggerFactory.getLogger(KafkaSqlOAuthITTest.class);

@BeforeAll
public static void beforeAll() throws Exception {
applyStrimziResources();
}

@Test
void testKafkaSQLTLS() {
client.load(getClass().getResourceAsStream("/k8s/examples/kafkasql/oauth/oauth-example-cluster.yaml"))
.create();
final var clusterName = "oauth-example-cluster";

await().ignoreExceptions().untilAsserted(() ->
// Strimzi uses StrimziPodSet instead of ReplicaSet, so we have to check pods
assertThat(client.pods().inNamespace(namespace).withName(clusterName + "-kafka-0").get().getStatus()
.getConditions()).filteredOn(c -> "Ready".equals(c.getType())).map(PodCondition::getStatus)
.containsOnly("True"));

// We're guessing the value here to avoid using Strimzi Java model, and relying on retries below.
var bootstrapServers = clusterName + "-kafka-bootstrap." + namespace + ".svc:9093";

var registry = deserialize(
"k8s/examples/kafkasql/oauth/oauth-example-kafkasql-tls.apicurioregistry3.yaml.yaml",
ApicurioRegistry3.class);
registry.getMetadata().setNamespace(namespace);
registry.getSpec().getApp().getStorage().getKafkasql().setBootstrapServers(bootstrapServers);

client.resource(registry).create();

await().ignoreExceptions().until(() -> {
assertThat(client.apps().deployments().inNamespace(namespace)
.withName(registry.getMetadata().getName() + "-app-deployment").get().getStatus()
.getReadyReplicas().intValue()).isEqualTo(1);
var podName = client.pods().inNamespace(namespace).list().getItems().stream()
.map(pod -> pod.getMetadata().getName())
.filter(podN -> podN.startsWith(registry.getMetadata().getName() + "-app-deployment"))
.findFirst().get();
assertThat(client.pods().inNamespace(namespace).withName(podName).getLog())
.contains("Using Kafka-SQL artifactStore");
return true;
});
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# IMPORTANT: This resource should only be used for development or testing purposes.
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: oauth-example-cluster
spec:
kafka:
version: 3.8.0
replicas: 1
listeners:
- name: tls
type: internal
port: 9093
tls: true
authentication:
type: tls
authorization:
type: oauth
clientId: admin-client
clientSecret:
clientSecret:
key: client-secret
secretName: client-credentials
validIssuerUri: https://simple-keycloak.apps.cluster.example/realms/registry
jwksEndpointUri: https://simple-keycloak.apps.cluster.example/realms/registry/protocol/openid-connect/certs
userNameClaim: preferred_username
tlsTrustedCertificates:
- secretName: keycloak-tls
certificate: tls.crt
config:
inter.broker.protocol.version: "3.8"
offsets.topic.replication.factor: 1
storage:
type: ephemeral
zookeeper:
replicas: 1
storage:
type: ephemeral
entityOperator:
userOperator: { }
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
apiVersion: v1
kind: Secret
metadata:
name: client-credentials
data:
client-secret: dGVzdDE=
---
apiVersion: registry.apicur.io/v1
kind: ApicurioRegistry3
metadata:
name: example-kafkasql-tls
spec:
app:
storage:
type: kafkasql
kafkasql:
bootstrapServers: "<service name>.<namespace>.svc:9092"
# Try using Strimzi/Red Hat AMQ Streams Operator!
tls:
keystoreSecretRef:
name: apicurio
keystorePasswordSecretRef:
name: apicurio
truststoreSecretRef:
name: oauth-example-cluster-cluster-ca-cert
truststorePasswordSecretRef:
name: oauth-example-cluster-cluster-ca-cert
auth:
enabled: true
mechanism: "OAUTHBEARER"
clientId: "admin-client"
clientSecretRef:
name: client-credentials
key: client-secret
tokenEndpoint: https://simple-keycloak.apps.cluster.example/realms/registry/protocol/openid-connect/token
loginHandlerClass: io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler
ingress:
host: example-kafkasql-tls-app.apps.cluster.example
ui:
ingress:
host: example-kafkasql-tls-ui.apps.cluster.example
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package io.apicurio.registry.operator.api.v1.spec;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
import static com.fasterxml.jackson.annotation.Nulls.SKIP;
import static lombok.AccessLevel.PRIVATE;

@JsonDeserialize(using = JsonDeserializer.None.class)
@JsonInclude(NON_NULL)
@JsonPropertyOrder({ "enabled", "mechanism", "clientId", "clientSecret", "tokenEndpoint",
"loginHandlerClass" })
@NoArgsConstructor
@AllArgsConstructor(access = PRIVATE)
@SuperBuilder(toBuilder = true)
@Getter
@Setter
@EqualsAndHashCode
@ToString
public class KafkaSqlAuthSpec {

@JsonProperty("enabled")
@JsonPropertyDescription("""
Enables SASL OAuth authentication for Apicurio Registry storage in Kafka. You must set this variable to true for the other variables to have effect.""")
@JsonSetter(nulls = SKIP)
private Boolean enabled;

@JsonProperty("mechanism")
@JsonPropertyDescription("""
The mechanism used to authenticate to Kafka.""")
@JsonSetter(nulls = SKIP)
private String mechanism;

@JsonProperty("clientId")
@JsonPropertyDescription("""
The client ID used to authenticate to Kafka.""")
@JsonSetter(nulls = SKIP)
private String clientId;

@JsonProperty("clientSecret")
@JsonPropertyDescription("""
The client secret used to authenticate to Kafka.""")
@JsonSetter(nulls = SKIP)
private SecretKeyRef clientSecret;

@JsonProperty("tokenEndpoint")
@JsonPropertyDescription("""
The URL of the OAuth identity server.""")
@JsonSetter(nulls = SKIP)
private String tokenEndpoint;

@JsonProperty("loginHandlerClass")
@JsonPropertyDescription("""
The login class to be used for login.""")
@JsonSetter(nulls = SKIP)
private String loginHandlerClass;
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
package io.apicurio.registry.operator.api.v1.spec;

import com.fasterxml.jackson.annotation.*;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import com.fasterxml.jackson.annotation.JsonSetter;
import com.fasterxml.jackson.annotation.Nulls;
import com.fasterxml.jackson.databind.JsonDeserializer.None;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import lombok.*;
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
Expand Down Expand Up @@ -43,4 +53,13 @@ public class KafkaSqlSpec {
Configure KafkaSQL storage when the access to the Kafka cluster is secured using TLS.""")
@JsonSetter(nulls = Nulls.SKIP)
private KafkaSqlTLSSpec tls;

/**
* Configure KafkaSQL storage when the access to the Kafka cluster is secured using TLS.
*/
@JsonProperty("auth")
@JsonPropertyDescription("""
Configure KafkaSQL storage authentication.""")
@JsonSetter(nulls = Nulls.SKIP)
private KafkaSqlAuthSpec auth;
}

0 comments on commit fb87679

Please sign in to comment.