Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Securing communication between Cruise Control and Kafka #37

Merged
merged 3 commits into from
Apr 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ public class CruiseControlSpec implements UnknownPropertyPreserving, Serializabl
public static final String FORBIDDEN_PREFIXES = "bootstrap.servers, client.id, zookeeper., network., security., failed.brokers.zk.path,"
+ "webserver.http., webserver.api.urlprefix, webserver.session.path, webserver.accesslog., two.step., request.reason.required,"
+ "metric.reporter.sampler.bootstrap.servers, metric.reporter.topic, partition.metric.sample.store.topic, broker.metric.sample.store.topic,"
+ "capacity.config.file, self.healing.";
public static final String FORBIDDEN_PREFIX_EXCEPTIONS = "";
+ "capacity.config.file, self.healing., ssl.";
public static final String FORBIDDEN_PREFIX_EXCEPTIONS = "ssl.cipher.suites, ssl.protocol, ssl.enabled.protocols";

private int replicas;
private String image;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ public class CruiseControl extends AbstractModel {
protected static final String APPLICATION_NAME = "cruise-control";

public static final String CRUISE_CONTROL_METRIC_REPORTER = "com.linkedin.kafka.cruisecontrol.metricsreporter.CruiseControlMetricsReporter";
public static final String CRUISE_CONTROL_METRICS_TOPIC = "cruise.control.metrics.topic";
public static final String CRUISE_CONTROL_METRICS_TOPIC_VALUE = "strimzi.cruisecontrol.metrics";

protected static final String CRUISE_CONTROL_CONTAINER_NAME = "cruise-control";
protected static final String TLS_SIDECAR_NAME = "tls-sidecar";
Expand Down Expand Up @@ -92,7 +90,7 @@ public class CruiseControl extends AbstractModel {

public static final String REST_API_PORT_NAME = "rest-api";
public static final int REST_API_PORT = 9090;
protected static final int DEFAULT_BOOTSTRAP_SERVERS_PORT = 9092;
protected static final int DEFAULT_BOOTSTRAP_SERVERS_PORT = 9091;
public static final String MIN_INSYNC_REPLICAS = "min.insync.replicas";

// Cruise Control configuration keys (EnvVariables)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import io.fabric8.kubernetes.api.model.VolumeMount;
import io.strimzi.api.kafka.model.CertAndKeySecretSource;
import io.strimzi.api.kafka.model.CruiseControlSpec;
import io.strimzi.api.kafka.model.KafkaAuthorization;
import io.strimzi.api.kafka.model.KafkaAuthorizationKeycloak;
import io.strimzi.api.kafka.model.KafkaAuthorizationSimple;
Expand Down Expand Up @@ -56,11 +57,38 @@ public KafkaBrokerConfigurationBuilder() {
public KafkaBrokerConfigurationBuilder withBrokerId() {
printSectionHeader("Broker ID");
writer.println("broker.id=${STRIMZI_BROKER_ID}");

writer.println();

return this;
}

/**
* Configures the Cruise Control metric reporter. It is set only if user enabled the Cruise Control.
*
* @param clusterName Name of the cluster
* @param cruiseContol The Cruise Control configuration from the Kafka CR
*
* @return Returns the builder instance
*/
public KafkaBrokerConfigurationBuilder withCruiseControl(String clusterName, CruiseControlSpec cruiseContol) {
if (cruiseContol != null) {
printSectionHeader("Cruise Control configuration");
writer.println("cruise.control.metrics.topic=strimzi.cruisecontrol.metrics");
writer.println("cruise.control.metrics.reporter.ssl.endpoint.identification.algorithm=HTTPS");
writer.println("cruise.control.metrics.reporter.bootstrap.servers=" + KafkaResources.bootstrapServiceName(clusterName) + ":9091");
writer.println("cruise.control.metrics.reporter.security.protocol=SSL");
writer.println("cruise.control.metrics.reporter.ssl.keystore.type=PKCS12");
writer.println("cruise.control.metrics.reporter.ssl.keystore.location=/tmp/kafka/cluster.keystore.p12");
writer.println("cruise.control.metrics.reporter.ssl.keystore.password=${CERTS_STORE_PASSWORD}");
writer.println("cruise.control.metrics.reporter.ssl.truststore.type=PKCS12");
writer.println("cruise.control.metrics.reporter.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12");
writer.println("cruise.control.metrics.reporter.ssl.truststore.password=${CERTS_STORE_PASSWORD}");
writer.println();
}

return this;
}
/**
* Adds the template for the {@code rack.id}. The rack ID will be set in the container based on the value of the
* {@code STRIMZI_RACK_ID} env var. It is set only if user enabled the rack awareness-
Expand Down Expand Up @@ -376,6 +404,8 @@ public KafkaBrokerConfigurationBuilder withAuthorization(String clusterName, Kaf
superUsers.add(String.format("User:CN=%s,O=io.strimzi", KafkaResources.kafkaStatefulSetName(clusterName)));
superUsers.add(String.format("User:CN=%s-%s,O=io.strimzi", clusterName, "entity-operator"));
superUsers.add(String.format("User:CN=%s-%s,O=io.strimzi", clusterName, "kafka-exporter"));
superUsers.add(String.format("User:CN=%s-%s,O=io.strimzi", clusterName, "cruise-control"));

superUsers.add(String.format("User:CN=%s,O=io.strimzi", "cluster-operator"));

printSectionHeader("Authorization");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,6 @@
import java.util.Map;
import java.util.Set;

import static io.strimzi.operator.cluster.model.CruiseControl.CRUISE_CONTROL_METRICS_TOPIC;
import static io.strimzi.operator.cluster.model.CruiseControl.CRUISE_CONTROL_METRICS_TOPIC_VALUE;
import static java.util.Collections.addAll;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
Expand Down Expand Up @@ -222,6 +220,7 @@ public class KafkaCluster extends AbstractModel {
private KafkaListeners listeners;
private KafkaAuthorization authorization;
private KafkaVersion kafkaVersion;
private CruiseControlSpec cruiseControlSpec;
private boolean isJmxEnabled;
private boolean isJmxAuthenticated;
private CertAndKeySecretSource secretSourceExternal = null;
Expand Down Expand Up @@ -464,13 +463,11 @@ public static KafkaCluster fromCrd(Kafka kafkaAssembly, KafkaVersion.Lookup vers
addAll(metricReporterList, configuration.getConfigOption(KAFKA_METRIC_REPORTERS_CONFIG_FIELD).split(","));
}

CruiseControlSpec cruiseControlSpec = kafkaSpec.getCruiseControl();
if (cruiseControlSpec != null) {
result.cruiseControlSpec = kafkaSpec.getCruiseControl();
if (result.cruiseControlSpec != null) {
metricReporterList.add(CRUISE_CONTROL_METRIC_REPORTER);
configuration.setConfigOption(CRUISE_CONTROL_METRICS_TOPIC, CRUISE_CONTROL_METRICS_TOPIC_VALUE);
} else {
metricReporterList.remove(CRUISE_CONTROL_METRIC_REPORTER);
configuration.removeConfigOption(CRUISE_CONTROL_METRICS_TOPIC);
}
if (!metricReporterList.isEmpty()) {
configuration.setConfigOption(KAFKA_METRIC_REPORTERS_CONFIG_FIELD, String.join(",", metricReporterList));
Expand Down Expand Up @@ -2046,11 +2043,18 @@ public NetworkPolicy generateNetworkPolicy(boolean namespaceAndPodSelectorNetwor
.endPodSelector()
.build();

NetworkPolicyPeer cruiseControlPeer = new NetworkPolicyPeerBuilder()
.withNewPodSelector() // cruise control
.addToMatchLabels(Labels.STRIMZI_NAME_LABEL, CruiseControl.cruiseControlName(cluster))
.endPodSelector()
.build();

List<NetworkPolicyPeer> clientsPortPeers = new ArrayList<>(4);
clientsPortPeers.add(clusterOperatorPeer);
clientsPortPeers.add(kafkaClusterPeer);
clientsPortPeers.add(entityOperatorPeer);
clientsPortPeers.add(kafkaExporterPeer);
clientsPortPeers.add(cruiseControlPeer);

replicationRule.setFrom(clientsPortPeers);
}
Expand Down Expand Up @@ -2472,6 +2476,7 @@ private String generateBrokerConfiguration() {
.withLogDirs(VolumeUtils.getDataVolumeMountPaths(storage, mountPath))
.withListeners(cluster, namespace, listeners)
.withAuthorization(cluster, authorization)
.withCruiseControl(cluster, cruiseControlSpec)
.withUserConfiguration(configuration)
.build().trim();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import io.strimzi.api.kafka.model.CertSecretSource;
import io.strimzi.api.kafka.model.CertSecretSourceBuilder;
import io.strimzi.api.kafka.model.CruiseControlSpec;
import io.strimzi.api.kafka.model.CruiseControlSpecBuilder;
import io.strimzi.api.kafka.model.KafkaAuthorization;
import io.strimzi.api.kafka.model.KafkaAuthorizationKeycloakBuilder;
import io.strimzi.api.kafka.model.KafkaAuthorizationSimpleBuilder;
Expand Down Expand Up @@ -50,6 +52,36 @@ public void testBrokerId() {
assertThat(configuration, isEquivalent("broker.id=${STRIMZI_BROKER_ID}"));
}

@Test
public void testNoCruiseControl() {
String configuration = new KafkaBrokerConfigurationBuilder()
.withCruiseControl("my-cluster", null)
.build();

assertThat(configuration, isEquivalent(""));
}

@Test
public void testCruiseControl() {
CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder().build();

String configuration = new KafkaBrokerConfigurationBuilder()
.withCruiseControl("my-cluster", cruiseControlSpec)
.build();

assertThat(configuration, isEquivalent(
"cruise.control.metrics.topic=strimzi.cruisecontrol.metrics\n" +
"cruise.control.metrics.reporter.ssl.endpoint.identification.algorithm=HTTPS\n" +
"cruise.control.metrics.reporter.bootstrap.servers=my-cluster-kafka-bootstrap:9091\n" +
"cruise.control.metrics.reporter.security.protocol=SSL\n" +
"cruise.control.metrics.reporter.ssl.keystore.type=PKCS12\n" +
"cruise.control.metrics.reporter.ssl.keystore.location=/tmp/kafka/cluster.keystore.p12\n" +
"cruise.control.metrics.reporter.ssl.keystore.password=${CERTS_STORE_PASSWORD}\n" +
"cruise.control.metrics.reporter.ssl.truststore.type=PKCS12\n" +
"cruise.control.metrics.reporter.ssl.truststore.location=/tmp/kafka/cluster.truststore.p12\n" +
"cruise.control.metrics.reporter.ssl.truststore.password=${CERTS_STORE_PASSWORD}"));
}

@Test
public void testNoRackAwareness() {
String configuration = new KafkaBrokerConfigurationBuilder()
Expand Down Expand Up @@ -108,7 +140,7 @@ public void testSimpleAuthorizationWithSuperUsers() {
.build();

assertThat(configuration, isEquivalent("authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer\n" +
"super.users=User:CN=my-cluster-kafka,O=io.strimzi;User:CN=my-cluster-entity-operator,O=io.strimzi;User:CN=my-cluster-kafka-exporter,O=io.strimzi;User:CN=cluster-operator,O=io.strimzi;User:jakub;User:CN=kuba"));
"super.users=User:CN=my-cluster-kafka,O=io.strimzi;User:CN=my-cluster-entity-operator,O=io.strimzi;User:CN=my-cluster-kafka-exporter,O=io.strimzi;User:CN=my-cluster-cruise-control,O=io.strimzi;User:CN=cluster-operator,O=io.strimzi;User:jakub;User:CN=kuba"));
}

@Test
Expand All @@ -121,7 +153,7 @@ public void testSimpleAuthorizationWithoutSuperUsers() {
.build();

assertThat(configuration, isEquivalent("authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer\n" +
"super.users=User:CN=my-cluster-kafka,O=io.strimzi;User:CN=my-cluster-entity-operator,O=io.strimzi;User:CN=my-cluster-kafka-exporter,O=io.strimzi;User:CN=cluster-operator,O=io.strimzi"));
"super.users=User:CN=my-cluster-kafka,O=io.strimzi;User:CN=my-cluster-entity-operator,O=io.strimzi;User:CN=my-cluster-kafka-exporter,O=io.strimzi;User:CN=my-cluster-cruise-control,O=io.strimzi;User:CN=cluster-operator,O=io.strimzi"));
}

@Test
Expand Down Expand Up @@ -155,7 +187,7 @@ public void testKeycloakAuthorization() {
"strimzi.authorization.ssl.truststore.type=PKCS12\n" +
"strimzi.authorization.ssl.secure.random.implementation=SHA1PRNG\n" +
"strimzi.authorization.ssl.endpoint.identification.algorithm=\n" +
"super.users=User:CN=my-cluster-kafka,O=io.strimzi;User:CN=my-cluster-entity-operator,O=io.strimzi;User:CN=my-cluster-kafka-exporter,O=io.strimzi;User:CN=cluster-operator,O=io.strimzi;User:giada;User:CN=paccu"));
"super.users=User:CN=my-cluster-kafka,O=io.strimzi;User:CN=my-cluster-entity-operator,O=io.strimzi;User:CN=my-cluster-kafka-exporter,O=io.strimzi;User:CN=my-cluster-cruise-control,O=io.strimzi;User:CN=cluster-operator,O=io.strimzi;User:giada;User:CN=paccu"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1488,6 +1488,12 @@ public void testReplicationPortNetworkPolicy() {
.endPodSelector()
.build();

NetworkPolicyPeer cruiseControlPeer = new NetworkPolicyPeerBuilder()
.withNewPodSelector()
.withMatchLabels(Collections.singletonMap(Labels.STRIMZI_NAME_LABEL, CruiseControl.cruiseControlName(cluster)))
.endPodSelector()
.build();

NetworkPolicyPeer clusterOperatorPeer = new NetworkPolicyPeerBuilder()
.withNewPodSelector()
.withMatchLabels(Collections.singletonMap(Labels.STRIMZI_KIND_LABEL, "cluster-operator"))
Expand All @@ -1506,10 +1512,11 @@ public void testReplicationPortNetworkPolicy() {

List<NetworkPolicyPeer> rules = np.getSpec().getIngress().stream().filter(ing -> ing.getPorts().get(0).getPort().equals(new IntOrString(KafkaCluster.REPLICATION_PORT))).map(NetworkPolicyIngressRule::getFrom).findFirst().orElse(null);

assertThat(rules.size(), is(4));
assertThat(rules.size(), is(5));
assertThat(rules.contains(kafkaBrokersPeer), is(true));
assertThat(rules.contains(eoPeer), is(true));
assertThat(rules.contains(kafkaExporterPeer), is(true));
assertThat(rules.contains(cruiseControlPeer), is(true));
assertThat(rules.contains(clusterOperatorPeer), is(true));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,12 @@ capacity.config.file=$CC_CAPACITY_FILE
cluster.configs.file=$CC_CLUSTER_CONFIG_FILE
webserver.accesslog.path=$CC_ACCESS_LOG
webserver.http.address=0.0.0.0
security.protocol=SSL
ssl.keystore.type=PKCS12
ssl.keystore.location=/tmp/cruise-control/replication.keystore.p12
ssl.keystore.password=$CERTS_STORE_PASSWORD
ssl.truststore.type=PKCS12
ssl.truststore.location=/tmp/cruise-control/replication.truststore.p12
ssl.truststore.password=$CERTS_STORE_PASSWORD
${CRUISE_CONTROL_CONFIGURATION}
EOF
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/bin/bash
#!/usr/bin/env bash
set +x

export CLASSPATH="$CLASSPATH:/opt/cruise-control/libs/*"
Expand Down Expand Up @@ -40,7 +40,7 @@ fi

# Generate and print the config file
echo "Starting Cruise Control with configuration:"
$CRUISE_CONTROL_HOME/cruise_control_config_generator.sh | tee /tmp/cruisecontrol.properties
$CRUISE_CONTROL_HOME/cruise_control_config_generator.sh | tee /tmp/cruisecontrol.properties | sed -e 's/password=.*/password=[hidden]/g'
echo ""

# JVM performance options
Expand Down
2 changes: 1 addition & 1 deletion documentation/modules/appendix_crds.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1354,7 +1354,7 @@ Used in: xref:type-KafkaSpec-{context}[`KafkaSpec`]
|integer
|image 1.2+<.<|The docker image for the pods.
|string
|config 1.2+<.<|The Cruise Control configuration. For a full list of configuration options refer to https://github.com/linkedin/cruise-control/wiki/Configurations. Note that properties with the following prefixes cannot be set: bootstrap.servers, client.id, zookeeper., network., security., failed.brokers.zk.path,webserver.http., webserver.api.urlprefix, webserver.session.path, webserver.accesslog., two.step., request.reason.required,metric.reporter.sampler.bootstrap.servers, metric.reporter.topic, partition.metric.sample.store.topic, broker.metric.sample.store.topic,capacity.config.file, self.healing.
|config 1.2+<.<|The Cruise Control configuration. For a full list of configuration options refer to https://github.com/linkedin/cruise-control/wiki/Configurations. Note that properties with the following prefixes cannot be set: bootstrap.servers, client.id, zookeeper., network., security., failed.brokers.zk.path,webserver.http., webserver.api.urlprefix, webserver.session.path, webserver.accesslog., two.step., request.reason.required,metric.reporter.sampler.bootstrap.servers, metric.reporter.topic, partition.metric.sample.store.topic, broker.metric.sample.store.topic,capacity.config.file, self.healing., ssl.
|map
|livenessProbe 1.2+<.<|Pod liveness checking for the Cruise Control container.
|xref:type-Probe-{context}[`Probe`]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4866,7 +4866,7 @@ spec:
failed.brokers.zk.path,webserver.http., webserver.api.urlprefix,
webserver.session.path, webserver.accesslog., two.step., request.reason.required,metric.reporter.sampler.bootstrap.servers,
metric.reporter.topic, partition.metric.sample.store.topic, broker.metric.sample.store.topic,capacity.config.file,
self.healing.'
self.healing., ssl.'
livenessProbe:
type: object
properties:
Expand Down
2 changes: 1 addition & 1 deletion install/cluster-operator/040-Crd-kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4861,7 +4861,7 @@ spec:
failed.brokers.zk.path,webserver.http., webserver.api.urlprefix,
webserver.session.path, webserver.accesslog., two.step., request.reason.required,metric.reporter.sampler.bootstrap.servers,
metric.reporter.topic, partition.metric.sample.store.topic, broker.metric.sample.store.topic,capacity.config.file,
self.healing.'
self.healing., ssl.'
livenessProbe:
type: object
properties:
Expand Down