Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added security context, network policy and tests #39

Merged
merged 2 commits into from
Apr 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
)
@JsonInclude(JsonInclude.Include.NON_NULL)
@JsonPropertyOrder({
"replicas", "image", "config",
"image", "config",
"livenessProbe", "readinessProbe",
"jvmOptions", "resources",
"logging", "tlsSidecar", "template", "brokerCapacity"})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.fabric8.kubernetes.api.model.LifecycleBuilder;
import io.fabric8.kubernetes.api.model.LocalObjectReference;
import io.fabric8.kubernetes.api.model.Secret;
import io.fabric8.kubernetes.api.model.SecurityContext;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.ServicePort;
import io.fabric8.kubernetes.api.model.Toleration;
Expand All @@ -23,6 +24,12 @@
import io.fabric8.kubernetes.api.model.apps.DeploymentStrategy;
import io.fabric8.kubernetes.api.model.apps.DeploymentStrategyBuilder;
import io.fabric8.kubernetes.api.model.apps.RollingUpdateDeploymentBuilder;
import io.fabric8.kubernetes.api.model.networking.NetworkPolicy;
import io.fabric8.kubernetes.api.model.networking.NetworkPolicyBuilder;
import io.fabric8.kubernetes.api.model.networking.NetworkPolicyIngressRule;
import io.fabric8.kubernetes.api.model.networking.NetworkPolicyIngressRuleBuilder;
import io.fabric8.kubernetes.api.model.networking.NetworkPolicyPeer;
import io.fabric8.kubernetes.api.model.networking.NetworkPolicyPeerBuilder;
import io.fabric8.kubernetes.api.model.policy.PodDisruptionBudget;
import io.strimzi.api.kafka.model.ContainerEnvVar;
import io.strimzi.api.kafka.model.CruiseControlResources;
Expand All @@ -39,6 +46,7 @@
import io.strimzi.operator.cluster.ClusterOperatorConfig;
import io.strimzi.operator.cluster.model.cruisecontrol.Capacity;
import io.strimzi.operator.common.Annotations;
import io.strimzi.operator.common.model.Labels;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -63,6 +71,7 @@ public class CruiseControl extends AbstractModel {
protected static final String TLS_SIDECAR_CA_CERTS_VOLUME_MOUNT = "/etc/tls-sidecar/cluster-ca-certs/";
protected static final String LOG_AND_METRICS_CONFIG_VOLUME_NAME = "cruise-control-logging";
protected static final String LOG_AND_METRICS_CONFIG_VOLUME_MOUNT = "/opt/cruise-control/custom-config/";
private static final String NAME_SUFFIX = "-cruise-control";

public static final String ANNO_STRIMZI_IO_LOGGING = Annotations.STRIMZI_DOMAIN + "logging";

Expand Down Expand Up @@ -107,6 +116,9 @@ public class CruiseControl extends AbstractModel {
protected List<ContainerEnvVar> templateCruiseControlContainerEnvVars;
protected List<ContainerEnvVar> templateTlsSidecarContainerEnvVars;

protected SecurityContext templateCruiseControlContainerSecurityContext;
protected SecurityContext templateTlsSidecarContainerSecurityContext;

private boolean isDeployed;

/**
Expand Down Expand Up @@ -250,6 +262,14 @@ public static CruiseControl updateTemplate(CruiseControlSpec spec, CruiseControl
cruiseControl.templateTlsSidecarContainerEnvVars = template.getTlsSidecarContainer().getEnv();
}

if (template.getCruiseControlContainer() != null && template.getCruiseControlContainer().getSecurityContext() != null) {
cruiseControl.templateCruiseControlContainerSecurityContext = template.getCruiseControlContainer().getSecurityContext();
}

if (template.getTlsSidecarContainer() != null && template.getTlsSidecarContainer().getSecurityContext() != null) {
cruiseControl.templateTlsSidecarContainerSecurityContext = template.getTlsSidecarContainer().getSecurityContext();
}

ModelUtils.parsePodDisruptionBudgetTemplate(cruiseControl, template.getPodDisruptionBudget());
}
return cruiseControl;
Expand Down Expand Up @@ -352,6 +372,7 @@ protected List<Container> getContainers(ImagePullPolicy imagePullPolicy) {
.withResources(getResources())
.withVolumeMounts(getVolumeMounts())
.withImagePullPolicy(determineImagePullPolicy(imagePullPolicy, getImage()))
.withSecurityContext(templateCruiseControlContainerSecurityContext)
.build();

String tlsSidecarImage = this.tlsSidecarImage;
Expand All @@ -374,6 +395,7 @@ protected List<Container> getContainers(ImagePullPolicy imagePullPolicy) {
String.valueOf(templateTerminationGracePeriodSeconds))
.endExec().endPreStop().build())
.withImagePullPolicy(determineImagePullPolicy(imagePullPolicy, tlsSidecarImage))
.withSecurityContext(templateTlsSidecarContainerSecurityContext)
.build();

containers.add(container);
Expand Down Expand Up @@ -481,4 +503,57 @@ public Secret generateSecret(ClusterCa clusterCa, boolean isMaintenanceTimeWindo
return ModelUtils.buildSecret(clusterCa, secret, namespace, CruiseControl.secretName(cluster), name, "cruise-control", labels, createOwnerReference(), isMaintenanceTimeWindowsSatisfied);
}

/**
* @param cluster The name of the cluster.
* @return The name of the network policy.
*/
public static String policyName(String cluster) {
return cluster + NETWORK_POLICY_KEY_SUFFIX + NAME_SUFFIX;
}

/**
* @param namespaceAndPodSelectorNetworkPolicySupported whether the kube cluster supports namespace selectors
* @return The network policy.
*/
public NetworkPolicy generateNetworkPolicy(boolean namespaceAndPodSelectorNetworkPolicySupported) {
List<NetworkPolicyIngressRule> rules = new ArrayList<>(1);

// CO can access the REST API
NetworkPolicyIngressRule restApiRule = new NetworkPolicyIngressRuleBuilder()
.addNewPort()
.withNewPort(REST_API_PORT)
.endPort()
.build();

if (namespaceAndPodSelectorNetworkPolicySupported) {
NetworkPolicyPeer clusterOperatorPeer = new NetworkPolicyPeerBuilder()
.withNewPodSelector() // cluster operator
.addToMatchLabels(Labels.STRIMZI_KIND_LABEL, "cluster-operator")
.endPodSelector()
.withNewNamespaceSelector()
.endNamespaceSelector()
.build();
restApiRule.setFrom(Collections.singletonList(clusterOperatorPeer));
}

rules.add(restApiRule);

NetworkPolicy networkPolicy = new NetworkPolicyBuilder()
.withNewMetadata()
.withName(policyName(cluster))
.withNamespace(namespace)
.withLabels(labels.toMap())
.withOwnerReferences(createOwnerReference())
.endMetadata()
.withNewSpec()
.withNewPodSelector()
.addToMatchLabels(Labels.STRIMZI_NAME_LABEL, cruiseControlName(cluster))
.endPodSelector()
.withIngress(rules)
.endSpec()
.build();

log.trace("Created network policy {}", networkPolicy);
return networkPolicy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ Future<Void> reconcile(ReconciliationState reconcileState) {
.compose(state -> state.entityOperatorReady())

.compose(state -> state.getCruiseControlDescription())
.compose(state -> state.cruiseControlNetPolicy())
.compose(state -> state.cruiseControlServiceAccount())
.compose(state -> state.cruiseControlAncillaryCm())
.compose(state -> state.cruiseControlSecret(this::dateSupplier))
Expand Down Expand Up @@ -3165,6 +3166,10 @@ Future<ReconciliationState> cruiseControlReady() {
return withVoid(Future.succeededFuture());
}

Future<ReconciliationState> cruiseControlNetPolicy() {
return withVoid(networkPolicyOperator.reconcile(namespace, CruiseControl.policyName(name), this.cruiseControl.generateNetworkPolicy(pfa.isNamespaceAndPodSelectorNetworkPolicySupported())));
}

private boolean isPodCaCertUpToDate(Pod pod, Ca ca) {
final int caCertGeneration = getCaCertGeneration(ca);
String podAnnotation = getCaCertAnnotation(ca);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,19 @@
import io.fabric8.kubernetes.api.model.EnvVar;
import io.fabric8.kubernetes.api.model.EnvVarBuilder;
import io.fabric8.kubernetes.api.model.IntOrString;
import io.fabric8.kubernetes.api.model.PodSecurityContextBuilder;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
import io.fabric8.kubernetes.api.model.SecurityContext;
import io.fabric8.kubernetes.api.model.SecurityContextBuilder;
import io.fabric8.kubernetes.api.model.Service;
import io.fabric8.kubernetes.api.model.Volume;
import io.fabric8.kubernetes.api.model.VolumeMount;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.networking.NetworkPolicy;
import io.fabric8.kubernetes.api.model.networking.NetworkPolicyIngressRule;
import io.fabric8.kubernetes.api.model.networking.NetworkPolicyPeer;
import io.fabric8.kubernetes.api.model.networking.NetworkPolicyPeerBuilder;
import io.fabric8.kubernetes.api.model.policy.PodDisruptionBudget;
import io.strimzi.api.kafka.model.ContainerEnvVar;
import io.strimzi.api.kafka.model.CruiseControlResources;
Expand All @@ -37,6 +44,7 @@
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -50,10 +58,14 @@
import static io.strimzi.operator.cluster.model.cruisecontrol.Capacity.DEFAULT_BROKER_INBOUND_NETWORK_KIB_PER_SECOND_CAPACITY;
import static io.strimzi.operator.cluster.model.cruisecontrol.Capacity.DEFAULT_BROKER_OUTBOUND_NETWORK_KIB_PER_SECOND_CAPACITY;
import static java.util.Collections.singletonMap;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.hasProperty;
import static org.hamcrest.Matchers.hasItems;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
Expand Down Expand Up @@ -569,6 +581,148 @@ public void testProbeConfiguration() {
assertThat(ccContainer.getReadinessProbe().getTimeoutSeconds(), is(new Integer(healthTimeout)));
}

@Test
public void testSecurityContext() {
CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder()
.withImage(ccImage)
.withConfig((Map) configuration.asOrderedProperties().asMap())
.withNewTemplate()
.withNewPod()
.withSecurityContext(new PodSecurityContextBuilder().withFsGroup(123L).withRunAsGroup(456L).withRunAsUser(789L).build())
.endPod()
.endTemplate()
.build();

Kafka resource =
new KafkaBuilder(ResourceUtils.createKafkaCluster(namespace, cluster, replicas, image, healthDelay, healthTimeout))
.editSpec()
.editKafka()
.withVersion(version)
.endKafka()
.withCruiseControl(cruiseControlSpec)
.endSpec()
.build();

CruiseControl cc = CruiseControl.fromCrd(resource, VERSIONS);

Deployment dep = cc.generateDeployment(true, null, null, null);
assertThat(dep.getSpec().getTemplate().getSpec().getSecurityContext(), is(notNullValue()));
assertThat(dep.getSpec().getTemplate().getSpec().getSecurityContext().getFsGroup(), is(Long.valueOf(123)));
assertThat(dep.getSpec().getTemplate().getSpec().getSecurityContext().getRunAsGroup(), is(Long.valueOf(456)));
assertThat(dep.getSpec().getTemplate().getSpec().getSecurityContext().getRunAsUser(), is(Long.valueOf(789)));
}

@Test
public void testDefaultSecurityContext() {
Deployment dep = cc.generateDeployment(true, null, null, null);
assertThat(dep.getSpec().getTemplate().getSpec().getSecurityContext(), is(nullValue()));
}

@Test
public void testCruiseControlContainerSecurityContext() {
SecurityContext securityContext = new SecurityContextBuilder()
.withPrivileged(false)
.withNewReadOnlyRootFilesystem(false)
.withAllowPrivilegeEscalation(false)
.withRunAsNonRoot(true)
.withNewCapabilities()
.addNewDrop("ALL")
.endCapabilities()
.build();

CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder()
.withImage(ccImage)
.withConfig((Map) configuration.asOrderedProperties().asMap())
.withNewTemplate()
.withNewCruiseControlContainer()
.withSecurityContext(securityContext)
.endCruiseControlContainer()
.endTemplate()
.build();

Kafka resource =
new KafkaBuilder(ResourceUtils.createKafkaCluster(namespace, cluster, replicas, image, healthDelay, healthTimeout))
.editSpec()
.editKafka()
.withVersion(version)
.endKafka()
.withCruiseControl(cruiseControlSpec)
.endSpec()
.build();

CruiseControl cc = CruiseControl.fromCrd(resource, VERSIONS);

Deployment dep = cc.generateDeployment(true, null, null, null);

assertThat(dep.getSpec().getTemplate().getSpec().getContainers(),
hasItem(allOf(
hasProperty("name", equalTo(CruiseControl.CRUISE_CONTROL_CONTAINER_NAME)),
hasProperty("securityContext", equalTo(securityContext))
)));
}

@Test
public void testTlsSidecarContainerSecurityContext() {
SecurityContext securityContext = new SecurityContextBuilder()
.withPrivileged(false)
.withNewReadOnlyRootFilesystem(false)
.withAllowPrivilegeEscalation(false)
.withRunAsNonRoot(true)
.withNewCapabilities()
.addNewDrop("ALL")
.endCapabilities()
.build();

CruiseControlSpec cruiseControlSpec = new CruiseControlSpecBuilder()
.withImage(ccImage)
.withConfig((Map) configuration.asOrderedProperties().asMap())
.withNewTemplate()
.withNewTlsSidecarContainer()
.withSecurityContext(securityContext)
.endTlsSidecarContainer()
.endTemplate()
.build();

Kafka resource =
new KafkaBuilder(ResourceUtils.createKafkaCluster(namespace, cluster, replicas, image, healthDelay, healthTimeout))
.editSpec()
.editKafka()
.withVersion(version)
.endKafka()
.withCruiseControl(cruiseControlSpec)
.endSpec()
.build();

CruiseControl cc = CruiseControl.fromCrd(resource, VERSIONS);

Deployment dep = cc.generateDeployment(true, null, null, null);

assertThat(dep.getSpec().getTemplate().getSpec().getContainers(),
hasItem(allOf(
hasProperty("name", equalTo(CruiseControl.TLS_SIDECAR_NAME)),
hasProperty("securityContext", equalTo(securityContext))
)));
}

@Test
public void testRestApiPortNetworkPolicy() {
NetworkPolicyPeer clusterOperatorPeer = new NetworkPolicyPeerBuilder()
.withNewPodSelector()
.withMatchLabels(Collections.singletonMap(Labels.STRIMZI_KIND_LABEL, "cluster-operator"))
.endPodSelector()
.withNewNamespaceSelector().endNamespaceSelector()
.build();

NetworkPolicy np = cc.generateNetworkPolicy(true);

assertThat(np.getSpec().getIngress().stream().filter(ing -> ing.getPorts().get(0).getPort().equals(new IntOrString(CruiseControl.REST_API_PORT))).findFirst().orElse(null), is(notNullValue()));

List<NetworkPolicyPeer> rules = np.getSpec().getIngress().stream().filter(ing -> ing.getPorts().get(0).getPort().equals(new IntOrString(CruiseControl.REST_API_PORT))).map(NetworkPolicyIngressRule::getFrom).findFirst().orElse(null);

assertThat(rules.size(), is(1));
assertThat(rules.contains(clusterOperatorPeer), is(true));
}

@AfterAll
public static void cleanUp() {
ResourceUtils.cleanUpTemporaryTLSFiles();
Expand Down
2 changes: 1 addition & 1 deletion examples/kafka/kafka-cruise-control.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ metadata:
name: my-cluster
spec:
kafka:
version: 2.4.0
version: 2.4.1
replicas: 3
listeners:
plain: {}
Expand Down