diff --git a/api/src/main/java/io/strimzi/api/kafka/model/CruiseControlSpec.java b/api/src/main/java/io/strimzi/api/kafka/model/CruiseControlSpec.java index c0ee8b9ccda..281e195a3eb 100644 --- a/api/src/main/java/io/strimzi/api/kafka/model/CruiseControlSpec.java +++ b/api/src/main/java/io/strimzi/api/kafka/model/CruiseControlSpec.java @@ -38,8 +38,8 @@ public class CruiseControlSpec implements UnknownPropertyPreserving, Serializabl public static final String FORBIDDEN_PREFIXES = "bootstrap.servers, client.id, zookeeper., network., security., failed.brokers.zk.path," + "webserver.http., webserver.api.urlprefix, webserver.session.path, webserver.accesslog., two.step., request.reason.required," + "metric.reporter.sampler.bootstrap.servers, metric.reporter.topic, partition.metric.sample.store.topic, broker.metric.sample.store.topic," - + "capacity.config.file, self.healing."; - public static final String FORBIDDEN_PREFIX_EXCEPTIONS = ""; + + "capacity.config.file, self.healing., ssl."; + public static final String FORBIDDEN_PREFIX_EXCEPTIONS = "ssl.cipher.suites, ssl.protocol, ssl.enabled.protocols"; private int replicas; private String image; diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/CruiseControl.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/CruiseControl.java index 060c7deafb5..64bf80a5515 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/CruiseControl.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/CruiseControl.java @@ -54,8 +54,6 @@ public class CruiseControl extends AbstractModel { protected static final String APPLICATION_NAME = "cruise-control"; public static final String CRUISE_CONTROL_METRIC_REPORTER = "com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter"; - public static final String CRUISE_CONTROL_METRICS_TOPIC = "cruise.control.metrics.topic"; - public static final String CRUISE_CONTROL_METRICS_TOPIC_VALUE = "strimzi.cruisecontrol.metrics"; protected static final String CRUISE_CONTROL_CONTAINER_NAME = "cruise-control"; protected static final String TLS_SIDECAR_NAME = "tls-sidecar"; @@ -92,7 +90,7 @@ public class CruiseControl extends AbstractModel { public static final String REST_API_PORT_NAME = "rest-api"; public static final int REST_API_PORT = 9090; - protected static final int DEFAULT_BOOTSTRAP_SERVERS_PORT = 9092; + protected static final int DEFAULT_BOOTSTRAP_SERVERS_PORT = 9091; public static final String MIN_INSYNC_REPLICAS = "min.insync.replicas"; // Cruise Control configuration keys (EnvVariables) diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java index 13492ad7b86..5c21deaa843 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilder.java @@ -6,6 +6,7 @@ import io.fabric8.kubernetes.api.model.VolumeMount; import io.strimzi.api.kafka.model.CertAndKeySecretSource; +import io.strimzi.api.kafka.model.CruiseControlSpec; import io.strimzi.api.kafka.model.KafkaAuthorization; import io.strimzi.api.kafka.model.KafkaAuthorizationKeycloak; import io.strimzi.api.kafka.model.KafkaAuthorizationSimple; @@ -56,11 +57,38 @@ public KafkaBrokerConfigurationBuilder() { public KafkaBrokerConfigurationBuilder withBrokerId() { printSectionHeader("Broker ID"); writer.println("broker.id=${STRIMZI_BROKER_ID}"); + writer.println(); return this; } + /** + * Configures the Cruise Control metric reporter. It is set only if user enabled the Cruise Control. + * + * @param clusterName Name of the cluster + * @param cruiseContol The Cruise Control configuration from the Kafka CR + * + * @return Returns the builder instance + */ + public KafkaBrokerConfigurationBuilder withCruiseControl(String clusterName, CruiseControlSpec cruiseContol) { + if (cruiseContol != null) { + printSectionHeader("Cruise Control configuration"); + writer.println("cruise.control.metrics.topic=strimzi.cruisecontrol.metrics"); + writer.println("cruise.control.metrics.reporter.ssl.endpoint.identification.algorithm=HTTPS"); + writer.println("cruise.control.metrics.reporter.bootstrap.servers=" + KafkaResources.bootstrapServiceName(clusterName) + ":9091"); + writer.println("cruise.control.metrics.reporter.security.protocol=SSL"); + writer.println("cruise.control.metrics.reporter.ssl.keystore.type=PKCS12"); + writer.println("cruise.control.metrics.reporter.ssl.keystore.location=/tmp/kafka/cluster.keystore.p12"); + writer.println("cruise.control.metrics.reporter.ssl.keystore.password=${CERTS_STORE_PASSWORD}"); + writer.println("cruise.control.metrics.reporter.ssl.truststore.type=PKCS12"); + writer.println("cruise.control.metrics.reporter.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12"); + writer.println("cruise.control.metrics.reporter.ssl.truststore.password=${CERTS_STORE_PASSWORD}"); + writer.println(); + } + + return this; + } /** * Adds the template for the {@code rack.id}. The rack ID will be set in the container based on the value of the * {@code STRIMZI_RACK_ID} env var. It is set only if user enabled the rack awareness- @@ -376,6 +404,8 @@ public KafkaBrokerConfigurationBuilder withAuthorization(String clusterName, Kaf superUsers.add(String.format("User:CN=%s,O=io.strimzi", KafkaResources.kafkaStatefulSetName(clusterName))); superUsers.add(String.format("User:CN=%s-%s,O=io.strimzi", clusterName, "entity-operator")); superUsers.add(String.format("User:CN=%s-%s,O=io.strimzi", clusterName, "kafka-exporter")); + superUsers.add(String.format("User:CN=%s-%s,O=io.strimzi", clusterName, "cruise-control")); + superUsers.add(String.format("User:CN=%s,O=io.strimzi", "cluster-operator")); printSectionHeader("Authorization"); diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java index bcc02ac196f..f1e635fdace 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/model/KafkaCluster.java @@ -115,8 +115,6 @@ import java.util.Map; import java.util.Set; -import static io.strimzi.operator.cluster.model.CruiseControl.CRUISE_CONTROL_METRICS_TOPIC; -import static io.strimzi.operator.cluster.model.CruiseControl.CRUISE_CONTROL_METRICS_TOPIC_VALUE; import static java.util.Collections.addAll; import static java.util.Collections.emptyMap; import static java.util.Collections.singletonMap; @@ -222,6 +220,7 @@ public class KafkaCluster extends AbstractModel { private KafkaListeners listeners; private KafkaAuthorization authorization; private KafkaVersion kafkaVersion; + private CruiseControlSpec cruiseControlSpec; private boolean isJmxEnabled; private boolean isJmxAuthenticated; private CertAndKeySecretSource secretSourceExternal = null; @@ -464,13 +463,11 @@ public static KafkaCluster fromCrd(Kafka kafkaAssembly, KafkaVersion.Lookup vers addAll(metricReporterList, configuration.getConfigOption(KAFKA_METRIC_REPORTERS_CONFIG_FIELD).split(",")); } - CruiseControlSpec cruiseControlSpec = kafkaSpec.getCruiseControl(); - if (cruiseControlSpec != null) { + result.cruiseControlSpec = kafkaSpec.getCruiseControl(); + if (result.cruiseControlSpec != null) { metricReporterList.add(CRUISE_CONTROL_METRIC_REPORTER); - configuration.setConfigOption(CRUISE_CONTROL_METRICS_TOPIC, CRUISE_CONTROL_METRICS_TOPIC_VALUE); } else { metricReporterList.remove(CRUISE_CONTROL_METRIC_REPORTER); - configuration.removeConfigOption(CRUISE_CONTROL_METRICS_TOPIC); } if (!metricReporterList.isEmpty()) { configuration.setConfigOption(KAFKA_METRIC_REPORTERS_CONFIG_FIELD, String.join(",", metricReporterList)); @@ -2046,11 +2043,18 @@ public NetworkPolicy generateNetworkPolicy(boolean namespaceAndPodSelectorNetwor .endPodSelector() .build(); + NetworkPolicyPeer cruiseControlPeer = new NetworkPolicyPeerBuilder() + .withNewPodSelector() // cruise control + .addToMatchLabels(Labels.STRIMZI_NAME_LABEL, CruiseControl.cruiseControlName(cluster)) + .endPodSelector() + .build(); + List clientsPortPeers = new ArrayList<>(4); clientsPortPeers.add(clusterOperatorPeer); clientsPortPeers.add(kafkaClusterPeer); clientsPortPeers.add(entityOperatorPeer); clientsPortPeers.add(kafkaExporterPeer); + clientsPortPeers.add(cruiseControlPeer); replicationRule.setFrom(clientsPortPeers); } @@ -2472,6 +2476,7 @@ private String generateBrokerConfiguration() { .withLogDirs(VolumeUtils.getDataVolumeMountPaths(storage, mountPath)) .withListeners(cluster, namespace, listeners) .withAuthorization(cluster, authorization) + .withCruiseControl(cluster, cruiseControlSpec) .withUserConfiguration(configuration) .build().trim(); } diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java index a2f7daf134b..26d692f9fb5 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaBrokerConfigurationBuilderTest.java @@ -6,6 +6,8 @@ import io.strimzi.api.kafka.model.CertSecretSource; import io.strimzi.api.kafka.model.CertSecretSourceBuilder; +import io.strimzi.api.kafka.model.CruiseControlSpec; +import io.strimzi.api.kafka.model.CruiseControlSpecBuilder; import io.strimzi.api.kafka.model.KafkaAuthorization; import io.strimzi.api.kafka.model.KafkaAuthorizationKeycloakBuilder; import io.strimzi.api.kafka.model.KafkaAuthorizationSimpleBuilder; @@ -50,6 +52,36 @@ public void testBrokerId() { assertThat(configuration, isEquivalent("broker.id=${STRIMZI_BROKER_ID}")); } + @Test + public void testNoCruiseControl() { + String configuration = new KafkaBrokerConfigurationBuilder() + .withCruiseControl("my-cluster", null) + .build(); + + assertThat(configuration, isEquivalent("")); + } + + @Test + public void testCruiseControl() { + CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder().build(); + + String configuration = new KafkaBrokerConfigurationBuilder() + .withCruiseControl("my-cluster", cruiseControlSpec) + .build(); + + assertThat(configuration, isEquivalent( + "cruise.control.metrics.topic=strimzi.cruisecontrol.metrics\n" + + "cruise.control.metrics.reporter.ssl.endpoint.identification.algorithm=HTTPS\n" + + "cruise.control.metrics.reporter.bootstrap.servers=my-cluster-kafka-bootstrap:9091\n" + + "cruise.control.metrics.reporter.security.protocol=SSL\n" + + "cruise.control.metrics.reporter.ssl.keystore.type=PKCS12\n" + + "cruise.control.metrics.reporter.ssl.keystore.location=/tmp/kafka/cluster.keystore.p12\n" + + "cruise.control.metrics.reporter.ssl.keystore.password=${CERTS_STORE_PASSWORD}\n" + + "cruise.control.metrics.reporter.ssl.truststore.type=PKCS12\n" + + "cruise.control.metrics.reporter.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12\n" + + "cruise.control.metrics.reporter.ssl.truststore.password=${CERTS_STORE_PASSWORD}")); + } + @Test public void testNoRackAwareness() { String configuration = new KafkaBrokerConfigurationBuilder() @@ -108,7 +140,7 @@ public void testSimpleAuthorizationWithSuperUsers() { .build(); assertThat(configuration, isEquivalent("authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer\n" + - "super.users=User:CN=my-cluster-kafka,O=io.strimzi;User:CN=my-cluster-entity-operator,O=io.strimzi;User:CN=my-cluster-kafka-exporter,O=io.strimzi;User:CN=cluster-operator,O=io.strimzi;User:jakub;User:CN=kuba")); + "super.users=User:CN=my-cluster-kafka,O=io.strimzi;User:CN=my-cluster-entity-operator,O=io.strimzi;User:CN=my-cluster-kafka-exporter,O=io.strimzi;User:CN=my-cluster-cruise-control,O=io.strimzi;User:CN=cluster-operator,O=io.strimzi;User:jakub;User:CN=kuba")); } @Test @@ -121,7 +153,7 @@ public void testSimpleAuthorizationWithoutSuperUsers() { .build(); assertThat(configuration, isEquivalent("authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer\n" + - "super.users=User:CN=my-cluster-kafka,O=io.strimzi;User:CN=my-cluster-entity-operator,O=io.strimzi;User:CN=my-cluster-kafka-exporter,O=io.strimzi;User:CN=cluster-operator,O=io.strimzi")); + "super.users=User:CN=my-cluster-kafka,O=io.strimzi;User:CN=my-cluster-entity-operator,O=io.strimzi;User:CN=my-cluster-kafka-exporter,O=io.strimzi;User:CN=my-cluster-cruise-control,O=io.strimzi;User:CN=cluster-operator,O=io.strimzi")); } @Test @@ -155,7 +187,7 @@ public void testKeycloakAuthorization() { "strimzi.authorization.ssl.truststore.type=PKCS12\n" + "strimzi.authorization.ssl.secure.random.implementation=SHA1PRNG\n" + "strimzi.authorization.ssl.endpoint.identification.algorithm=\n" + - "super.users=User:CN=my-cluster-kafka,O=io.strimzi;User:CN=my-cluster-entity-operator,O=io.strimzi;User:CN=my-cluster-kafka-exporter,O=io.strimzi;User:CN=cluster-operator,O=io.strimzi;User:giada;User:CN=paccu")); + "super.users=User:CN=my-cluster-kafka,O=io.strimzi;User:CN=my-cluster-entity-operator,O=io.strimzi;User:CN=my-cluster-kafka-exporter,O=io.strimzi;User:CN=my-cluster-cruise-control,O=io.strimzi;User:CN=cluster-operator,O=io.strimzi;User:giada;User:CN=paccu")); } @Test diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaClusterTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaClusterTest.java index 473a357438b..483b6138064 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaClusterTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/model/KafkaClusterTest.java @@ -1488,6 +1488,12 @@ public void testReplicationPortNetworkPolicy() { .endPodSelector() .build(); + NetworkPolicyPeer cruiseControlPeer = new NetworkPolicyPeerBuilder() + .withNewPodSelector() + .withMatchLabels(Collections.singletonMap(Labels.STRIMZI_NAME_LABEL, CruiseControl.cruiseControlName(cluster))) + .endPodSelector() + .build(); + NetworkPolicyPeer clusterOperatorPeer = new NetworkPolicyPeerBuilder() .withNewPodSelector() .withMatchLabels(Collections.singletonMap(Labels.STRIMZI_KIND_LABEL, "cluster-operator")) @@ -1506,10 +1512,11 @@ public void testReplicationPortNetworkPolicy() { List rules = np.getSpec().getIngress().stream().filter(ing -> ing.getPorts().get(0).getPort().equals(new IntOrString(KafkaCluster.REPLICATION_PORT))).map(NetworkPolicyIngressRule::getFrom).findFirst().orElse(null); - assertThat(rules.size(), is(4)); + assertThat(rules.size(), is(5)); assertThat(rules.contains(kafkaBrokersPeer), is(true)); assertThat(rules.contains(eoPeer), is(true)); assertThat(rules.contains(kafkaExporterPeer), is(true)); + assertThat(rules.contains(cruiseControlPeer), is(true)); assertThat(rules.contains(clusterOperatorPeer), is(true)); } diff --git a/docker-images/kafka/cruise-control-scripts/cruise_control_config_generator.sh b/docker-images/kafka/cruise-control-scripts/cruise_control_config_generator.sh index 001061010a9..09a05ce8cad 100755 --- a/docker-images/kafka/cruise-control-scripts/cruise_control_config_generator.sh +++ b/docker-images/kafka/cruise-control-scripts/cruise_control_config_generator.sh @@ -43,5 +43,12 @@ capacity.config.file=$CC_CAPACITY_FILE cluster.configs.file=$CC_CLUSTER_CONFIG_FILE webserver.accesslog.path=$CC_ACCESS_LOG webserver.http.address=0.0.0.0 +security.protocol=SSL +ssl.keystore.type=PKCS12 +ssl.keystore.location=/tmp/cruise-control/replication.keystore.p12 +ssl.keystore.password=$CERTS_STORE_PASSWORD +ssl.truststore.type=PKCS12 +ssl.truststore.location=/tmp/cruise-control/replication.truststore.p12 +ssl.truststore.password=$CERTS_STORE_PASSWORD ${CRUISE_CONTROL_CONFIGURATION} EOF diff --git a/docker-images/kafka/cruise-control-scripts/cruise_control_run.sh b/docker-images/kafka/cruise-control-scripts/cruise_control_run.sh index f130236e8fb..23bc8920cd6 100755 --- a/docker-images/kafka/cruise-control-scripts/cruise_control_run.sh +++ b/docker-images/kafka/cruise-control-scripts/cruise_control_run.sh @@ -1,4 +1,4 @@ -#!/bin/bash +#!/usr/bin/env bash set +x export CLASSPATH="$CLASSPATH:/opt/cruise-control/libs/*" @@ -40,7 +40,7 @@ fi # Generate and print the config file echo "Starting Cruise Control with configuration:" -$CRUISE_CONTROL_HOME/cruise_control_config_generator.sh | tee /tmp/cruisecontrol.properties +$CRUISE_CONTROL_HOME/cruise_control_config_generator.sh | tee /tmp/cruisecontrol.properties | sed -e 's/password=.*/password=[hidden]/g' echo "" # JVM performance options diff --git a/documentation/modules/appendix_crds.adoc b/documentation/modules/appendix_crds.adoc index bd201206754..b3b4115a592 100644 --- a/documentation/modules/appendix_crds.adoc +++ b/documentation/modules/appendix_crds.adoc @@ -1354,7 +1354,7 @@ Used in: xref:type-KafkaSpec-{context}[`KafkaSpec`] |integer |image 1.2+<.<|The docker image for the pods. |string -|config 1.2+<.<|The Cruise Control configuration. For a full list of configuration options refer to https://github.com/linkedin/cruise-control/wiki/Configurations. Note that properties with the following prefixes cannot be set: bootstrap.servers, client.id, zookeeper., network., security., failed.brokers.zk.path,webserver.http., webserver.api.urlprefix, webserver.session.path, webserver.accesslog., two.step., request.reason.required,metric.reporter.sampler.bootstrap.servers, metric.reporter.topic, partition.metric.sample.store.topic, broker.metric.sample.store.topic,capacity.config.file, self.healing. +|config 1.2+<.<|The Cruise Control configuration. For a full list of configuration options refer to https://github.com/linkedin/cruise-control/wiki/Configurations. Note that properties with the following prefixes cannot be set: bootstrap.servers, client.id, zookeeper., network., security., failed.brokers.zk.path,webserver.http., webserver.api.urlprefix, webserver.session.path, webserver.accesslog., two.step., request.reason.required,metric.reporter.sampler.bootstrap.servers, metric.reporter.topic, partition.metric.sample.store.topic, broker.metric.sample.store.topic,capacity.config.file, self.healing., ssl. |map |livenessProbe 1.2+<.<|Pod liveness checking for the Cruise Control container. |xref:type-Probe-{context}[`Probe`] diff --git a/helm-charts/strimzi-kafka-operator/templates/040-Crd-kafka.yaml b/helm-charts/strimzi-kafka-operator/templates/040-Crd-kafka.yaml index 34f5b366078..d94c6b6185f 100644 --- a/helm-charts/strimzi-kafka-operator/templates/040-Crd-kafka.yaml +++ b/helm-charts/strimzi-kafka-operator/templates/040-Crd-kafka.yaml @@ -4866,7 +4866,7 @@ spec: failed.brokers.zk.path,webserver.http., webserver.api.urlprefix, webserver.session.path, webserver.accesslog., two.step., request.reason.required,metric.reporter.sampler.bootstrap.servers, metric.reporter.topic, partition.metric.sample.store.topic, broker.metric.sample.store.topic,capacity.config.file, - self.healing.' + self.healing., ssl.' livenessProbe: type: object properties: diff --git a/install/cluster-operator/040-Crd-kafka.yaml b/install/cluster-operator/040-Crd-kafka.yaml index 6e2c10fabec..899b595d836 100644 --- a/install/cluster-operator/040-Crd-kafka.yaml +++ b/install/cluster-operator/040-Crd-kafka.yaml @@ -4861,7 +4861,7 @@ spec: failed.brokers.zk.path,webserver.http., webserver.api.urlprefix, webserver.session.path, webserver.accesslog., two.step., request.reason.required,metric.reporter.sampler.bootstrap.servers, metric.reporter.topic, partition.metric.sample.store.topic, broker.metric.sample.store.topic,capacity.config.file, - self.healing.' + self.healing., ssl.' livenessProbe: type: object properties: