Skip to content

Commit

Permalink
Refactor use of Informers in UO and SPS controller (#10885)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Scholz <www@scholzj.com>
  • Loading branch information
scholzj authored Nov 29, 2024
1 parent 97887a8 commit 36fc647
Show file tree
Hide file tree
Showing 8 changed files with 194 additions and 182 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.LabelSelector;
import io.fabric8.kubernetes.api.model.LabelSelectorBuilder;
import io.fabric8.kubernetes.api.model.LabelSelectorRequirement;
import io.fabric8.kubernetes.api.model.OwnerReference;
import io.fabric8.kubernetes.api.model.Pod;
Expand All @@ -17,8 +16,6 @@
import io.fabric8.kubernetes.client.dsl.base.PatchContext;
import io.fabric8.kubernetes.client.dsl.base.PatchType;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Lister;
import io.fabric8.kubernetes.client.readiness.Readiness;
import io.micrometer.core.instrument.Timer;
import io.strimzi.api.kafka.model.connect.KafkaConnect;
Expand All @@ -45,6 +42,7 @@
import io.strimzi.operator.common.model.Labels;
import io.strimzi.operator.common.model.StatusDiff;
import io.strimzi.operator.common.model.StatusUtils;
import io.strimzi.operator.common.operator.resource.concurrent.Informer;

import java.util.Collection;
import java.util.HashSet;
Expand All @@ -62,9 +60,7 @@ public class StrimziPodSetController implements Runnable {
private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(StrimziPodSetController.class);

private static final long DEFAULT_RESYNC_PERIOD_MS = 5 * 60 * 1_000L; // 5 minutes by default
private static final LabelSelector POD_LABEL_SELECTOR = new LabelSelectorBuilder()
.withMatchExpressions(new LabelSelectorRequirement(Labels.STRIMZI_KIND_LABEL, "Exists", null))
.build();
private static final LabelSelector POD_LABEL_SELECTOR = new LabelSelector(List.of(new LabelSelectorRequirement(Labels.STRIMZI_KIND_LABEL, "Exists", null)), null);

private final Thread controllerThread;

Expand All @@ -77,16 +73,11 @@ public class StrimziPodSetController implements Runnable {
private final String watchedNamespace;

private final BlockingQueue<SimplifiedReconciliation> workQueue;
private final SharedIndexInformer<Pod> podInformer;
private final SharedIndexInformer<StrimziPodSet> strimziPodSetInformer;
private final SharedIndexInformer<Kafka> kafkaInformer;
private final SharedIndexInformer<KafkaConnect> kafkaConnectInformer;
private final SharedIndexInformer<KafkaMirrorMaker2> kafkaMirrorMaker2Informer;
private final Lister<Pod> podLister;
private final Lister<StrimziPodSet> strimziPodSetLister;
private final Lister<Kafka> kafkaLister;
private final Lister<KafkaConnect> kafkaConnectLister;
private final Lister<KafkaMirrorMaker2> kafkaMirrorMaker2Lister;
private final Informer<Pod> podInformer;
private final Informer<StrimziPodSet> strimziPodSetInformer;
private final Informer<Kafka> kafkaInformer;
private final Informer<KafkaConnect> kafkaConnectInformer;
private final Informer<KafkaMirrorMaker2> kafkaMirrorMaker2Informer;

/**
* Creates the StrimziPodSet controller. The controller should normally exist once per operator for cluster-wide mode
Expand Down Expand Up @@ -117,7 +108,7 @@ public StrimziPodSetController(
) {
this.podOperator = podOperator;
this.strimziPodSetOperator = strimziPodSetOperator;
this.crSelector = (crSelectorLabels == null || crSelectorLabels.toMap().isEmpty()) ? null : new LabelSelector(null, crSelectorLabels.toMap());
this.crSelector = new LabelSelector(null, (crSelectorLabels == null || crSelectorLabels.toMap().isEmpty()) ? null : crSelectorLabels.toMap());
this.watchedNamespace = watchedNamespace;
this.workQueue = new ArrayBlockingQueue<>(podSetControllerWorkQueueSize);

Expand All @@ -126,20 +117,15 @@ public StrimziPodSetController(

// Kafka, KafkaConnect and KafkaMirrorMaker2 informers and listers are used to get the CRs quickly.
// This is needed for verification of the CR selector labels.
this.kafkaInformer = kafkaOperator.informer(watchedNamespace, (crSelectorLabels == null) ? Map.of() : crSelectorLabels.toMap(), DEFAULT_RESYNC_PERIOD_MS);
this.kafkaLister = new Lister<>(kafkaInformer.getIndexer());
this.kafkaConnectInformer = kafkaConnectOperator.informer(watchedNamespace, (crSelectorLabels == null) ? Map.of() : crSelectorLabels.toMap(), DEFAULT_RESYNC_PERIOD_MS);
this.kafkaConnectLister = new Lister<>(kafkaConnectInformer.getIndexer());
this.kafkaMirrorMaker2Informer = kafkaMirrorMaker2Operator.informer(watchedNamespace, (crSelectorLabels == null) ? Map.of() : crSelectorLabels.toMap(), DEFAULT_RESYNC_PERIOD_MS);
this.kafkaMirrorMaker2Lister = new Lister<>(kafkaMirrorMaker2Informer.getIndexer());
this.kafkaInformer = kafkaOperator.informer(watchedNamespace, crSelector, DEFAULT_RESYNC_PERIOD_MS);
this.kafkaConnectInformer = kafkaConnectOperator.informer(watchedNamespace, crSelector, DEFAULT_RESYNC_PERIOD_MS);
this.kafkaMirrorMaker2Informer = kafkaMirrorMaker2Operator.informer(watchedNamespace, crSelector, DEFAULT_RESYNC_PERIOD_MS);

// StrimziPodSet informer and lister is used to get events about StrimziPodSet and get StrimziPodSet quickly
this.strimziPodSetInformer = strimziPodSetOperator.informer(watchedNamespace, DEFAULT_RESYNC_PERIOD_MS);
this.strimziPodSetLister = new Lister<>(strimziPodSetInformer.getIndexer());
this.strimziPodSetInformer = strimziPodSetOperator.informer(watchedNamespace, new LabelSelector(), DEFAULT_RESYNC_PERIOD_MS);

// Pod informer and lister is used to get events about pods and get pods quickly
this.podInformer = podOperator.informer(watchedNamespace, POD_LABEL_SELECTOR, DEFAULT_RESYNC_PERIOD_MS);
this.podLister = new Lister<>(podInformer.getIndexer());

this.controllerThread = new Thread(this, "StrimziPodSetController");
}
Expand All @@ -158,26 +144,13 @@ protected boolean isSynced() {

protected void startController() {
strimziPodSetInformer.addEventHandler(new PodSetEventHandler());
strimziPodSetInformer.exceptionHandler((isStarted, throwable) -> InformerUtils.loggingExceptionHandler("StrimziPodSet", isStarted, throwable));

podInformer.addEventHandler(new PodEventHandler());
podInformer.exceptionHandler((isStarted, throwable) -> InformerUtils.loggingExceptionHandler("Pod", isStarted, throwable));

kafkaInformer.exceptionHandler((isStarted, throwable) -> InformerUtils.loggingExceptionHandler("Kafka", isStarted, throwable));
kafkaConnectInformer.exceptionHandler((isStarted, throwable) -> InformerUtils.loggingExceptionHandler("KafkaConnect", isStarted, throwable));
kafkaMirrorMaker2Informer.exceptionHandler((isStarted, throwable) -> InformerUtils.loggingExceptionHandler("KafkaMirrorMaker2", isStarted, throwable));

strimziPodSetInformer.start();
podInformer.start();
kafkaInformer.start();
kafkaConnectInformer.start();
kafkaMirrorMaker2Informer.start();

strimziPodSetInformer.stopped().whenComplete((v, t) -> InformerUtils.stoppedInformerHandler("StrimziPodSet", t, stop));
podInformer.stopped().whenComplete((v, t) -> InformerUtils.stoppedInformerHandler("Pod", t, stop));
kafkaInformer.stopped().whenComplete((v, t) -> InformerUtils.stoppedInformerHandler("Kafka", t, stop));
kafkaConnectInformer.stopped().whenComplete((v, t) -> InformerUtils.stoppedInformerHandler("KafkaConnect", t, stop));
kafkaMirrorMaker2Informer.stopped().whenComplete((v, t) -> InformerUtils.stoppedInformerHandler("KafkaMirrorMaker2", t, stop));
}

protected void stopController() {
Expand Down Expand Up @@ -256,9 +229,8 @@ private StrimziPodSet findParentPodSetForPod(Pod pod) {
* @return The parent StrimziPodSet (or null if not found)
*/
private StrimziPodSet findParentPodSetForPodByLabels(Pod pod) {
return strimziPodSetLister
.namespace(pod.getMetadata().getNamespace())
.list()
return strimziPodSetInformer
.list(pod.getMetadata().getNamespace())
.stream()
.filter(podSet -> podSet.getSpec() != null
&& Util.matchesSelector(podSet.getSpec().getSelector(), pod))
Expand All @@ -283,9 +255,8 @@ private StrimziPodSet findParentPodSetForPodByOwnerReference(Pod pod) {
return null;
} else {
// We have owner reference => we find the StrimziPodSet based on it
return strimziPodSetLister
.namespace(pod.getMetadata().getNamespace())
.list()
return strimziPodSetInformer
.list(pod.getMetadata().getNamespace())
.stream()
.filter(podSet -> podSet.getMetadata().getName().equals(owner.getName()))
.findFirst()
Expand Down Expand Up @@ -327,9 +298,9 @@ private HasMetadata findCustomResource(StrimziPodSet podSet) {
HasMetadata cr = null;

switch (podSet.getMetadata().getLabels().get(Labels.STRIMZI_KIND_LABEL)) {
case Kafka.RESOURCE_KIND -> cr = kafkaLister.namespace(podSet.getMetadata().getNamespace()).get(customResourceName);
case KafkaConnect.RESOURCE_KIND -> cr = kafkaConnectLister.namespace(podSet.getMetadata().getNamespace()).get(customResourceName);
case KafkaMirrorMaker2.RESOURCE_KIND -> cr = kafkaMirrorMaker2Lister.namespace(podSet.getMetadata().getNamespace()).get(customResourceName);
case Kafka.RESOURCE_KIND -> cr = kafkaInformer.get(podSet.getMetadata().getNamespace(), customResourceName);
case KafkaConnect.RESOURCE_KIND -> cr = kafkaConnectInformer.get(podSet.getMetadata().getNamespace(), customResourceName);
case KafkaMirrorMaker2.RESOURCE_KIND -> cr = kafkaMirrorMaker2Informer.get(podSet.getMetadata().getNamespace(), customResourceName);
default -> LOGGER.warnOp("StrimziPodSet {} belongs to unsupported custom resource kind {}", podSet.getMetadata().getName(), podSet.getMetadata().getLabels().get(Labels.STRIMZI_KIND_LABEL));
}

Expand Down Expand Up @@ -359,7 +330,7 @@ private void reconcile(Reconciliation reconciliation) {
try {
String name = reconciliation.name();
String namespace = reconciliation.namespace();
StrimziPodSet podSet = strimziPodSetLister.namespace(namespace).get(name);
StrimziPodSet podSet = strimziPodSetInformer.get(namespace, name);

if (podSet == null) {
LOGGER.debugCr(reconciliation, "StrimziPodSet is null => nothing to do");
Expand Down Expand Up @@ -431,7 +402,7 @@ private void maybeUpdateStatus(Reconciliation reconciliation, StrimziPodSet podS
if (!new StatusDiff(podSet.getStatus(), desiredStatus).isEmpty()) {
try {
LOGGER.debugCr(reconciliation, "Updating status of StrimziPodSet {} in namespace {}", reconciliation.name(), reconciliation.namespace());
StrimziPodSet latestPodSet = strimziPodSetLister.namespace(reconciliation.namespace()).get(reconciliation.name());
StrimziPodSet latestPodSet = strimziPodSetInformer.get(reconciliation.namespace(), reconciliation.name());
if (latestPodSet != null) {
StrimziPodSet updatedPodSet = new StrimziPodSetBuilder(latestPodSet)
.withStatus(desiredStatus)
Expand Down Expand Up @@ -461,7 +432,7 @@ private void maybeUpdateStatus(Reconciliation reconciliation, StrimziPodSet podS
* @param podCounter Pod Counter used to count pods for the status
*/
private void maybeCreateOrPatchPod(Reconciliation reconciliation, Pod pod, OwnerReference owner, PodCounter podCounter) {
Pod currentPod = podLister.namespace(reconciliation.namespace()).get(pod.getMetadata().getName());
Pod currentPod = podInformer.get(reconciliation.namespace(), pod.getMetadata().getName());

if (currentPod == null) {
// Pod does not exist => we create it
Expand Down Expand Up @@ -512,9 +483,8 @@ private void maybeCreateOrPatchPod(Reconciliation reconciliation, Pod pod, Owner
* @param podCounter Pod Counter used to count pods for the status
*/
private void removeDeletedPods(Reconciliation reconciliation, LabelSelector selector, Collection<String> desiredPodNames, PodCounter podCounter) {
Set<String> toBeDeleted = podLister
.namespace(reconciliation.namespace())
.list()
Set<String> toBeDeleted = podInformer
.list(reconciliation.namespace())
.stream()
.filter(pod -> Util.matchesSelector(selector, pod))
.map(pod -> pod.getMetadata().getName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.strimzi.operator.common.config.ConfigParameter;
import io.strimzi.operator.common.model.Labels;
import io.strimzi.operator.common.operator.resource.ReconcileResult;
import io.strimzi.operator.common.operator.resource.concurrent.Informer;
import io.vertx.core.Future;
import io.vertx.core.Vertx;

Expand Down Expand Up @@ -403,34 +404,6 @@ public Future<Void> deleteAsync(Reconciliation reconciliation, String namespace,
return internalDelete(reconciliation, namespace, name, cascading).map((Void) null);
}

/**
* Creates the informer for given resource type to inform on all instances in given namespace (or cluster-wide). The
* informer returned by this method is not running and has to be started by the code using it.
*
* @param namespace Namespace on which to inform
* @param resyncIntervalMs The interval in which the resync of the informer should happen in milliseconds
*
* @return Informer instance
*/
public SharedIndexInformer<T> informer(String namespace, long resyncIntervalMs) {
return runnableInformer(applyNamespace(namespace), resyncIntervalMs);
}

/**
* Creates the informer for given resource type to inform on all instances in given namespace (or cluster-wide)
* matching the selector. The informer returned by this method is not running and has to be started by the code
* using it.
*
* @param namespace Namespace on which to inform
* @param selectorLabels Selector which should be matched by the resources
* @param resyncIntervalMs The interval in which the resync of the informer should happen in milliseconds
*
* @return Informer instance
*/
public SharedIndexInformer<T> informer(String namespace, Map<String, String> selectorLabels, long resyncIntervalMs) {
return runnableInformer(applyNamespace(namespace).withLabels(selectorLabels), resyncIntervalMs);
}

/**
* Creates the informer for given resource type to inform on all instances in given namespace (or cluster-wide)
* matching the selector. The informer returned by this method is not running and has to be started by the code
Expand All @@ -442,8 +415,8 @@ public SharedIndexInformer<T> informer(String namespace, Map<String, String> sel
*
* @return Informer instance
*/
public SharedIndexInformer<T> informer(String namespace, LabelSelector labelSelector, long resyncIntervalMs) {
return runnableInformer(applyNamespace(namespace).withLabelSelector(labelSelector), resyncIntervalMs);
public Informer<T> informer(String namespace, LabelSelector labelSelector, long resyncIntervalMs) {
return new Informer<>(runnableInformer(applyNamespace(namespace).withLabelSelector(labelSelector), resyncIntervalMs));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ private void startStop(VertxTestContext context, String namespaces, boolean podS
return mockCmInformer;
});

when(mockNamespacedCms.withLabels(any())).thenReturn(mockNamespacedCms);
when(mockNamespacedCms.withLabelSelector(any(LabelSelector.class))).thenReturn(mockNamespacedCms);
when(mockCms.inNamespace(namespace)).thenReturn(mockNamespacedCms);

// Mock Pods
Expand Down Expand Up @@ -230,7 +230,7 @@ private void startStopAllNamespaces(VertxTestContext context, String namespaces,
when(mockCmInformer.stopped()).thenReturn(CompletableFuture.completedFuture(null));

AnyNamespaceOperation mockFilteredCms = mock(AnyNamespaceOperation.class);
when(mockFilteredCms.withLabels(any())).thenReturn(mockFilteredCms);
when(mockFilteredCms.withLabelSelector(any(LabelSelector.class))).thenReturn(mockFilteredCms);
when(mockFilteredCms.watch(any())).thenAnswer(invo -> {
numWatchers.incrementAndGet();
Watch mockWatch = mock(Watch.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
*/
package io.strimzi.operator.common;

import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.strimzi.operator.common.operator.resource.concurrent.Informer;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
Expand All @@ -16,57 +16,21 @@
public class InformerUtils {
private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(InformerUtils.class);

/**
* Logs exceptions in the informers to give us a better overview of what is happening.
*
* @param type Type of the informer
* @param isStarted Flag indicating whether the informer is already started
* @param throwable Throwable describing the exception which occurred
*
* @return Boolean indicating whether the informer should retry or not.
*/
public static boolean loggingExceptionHandler(String type, boolean isStarted, Throwable throwable) {
LOGGER.errorOp("Caught exception in the " + type + " informer which is " + (isStarted ? "started" : "not started"), throwable);
// We always want the informer to retry => we just want to log the error
return true;
}

/**
* Watches for informers to not stop unless we are shutting down the controller. If it stops unexpectedly, we will
* terminate the operator.
*
* @param type Type of the informer
* @param reason Reason why the informer stopped
* @param stopping Flag indicating if the controller shutdown is in progress (in which case the informer is expected to stop)
*/
public static void stoppedInformerHandler(String type, Throwable reason, boolean stopping) {
if (!stopping) {
// the informer is not being stopped, so this is unexpected!
if (reason != null) {
LOGGER.errorOp("{} informer stopped unexpectedly", type, reason);
} else {
LOGGER.errorOp("{} informer stopped unexpectedly without a reason", type);
}
} else {
LOGGER.infoOp("{} informer stopped", type);
}
}

/**
* Synchronously stops one or more informers. It will stop them and then wait for up to the specified timeout for
* each of them to actually stop.
*
* @param timeoutMs Timeout in milliseconds for how long we will wait for each informer to stop
* @param informers Informers which should be stopped.
*/
public static void stopAll(long timeoutMs, SharedIndexInformer<?>... informers) {
public static void stopAll(long timeoutMs, Informer<?>... informers) {
LOGGER.infoOp("Stopping informers");
for (SharedIndexInformer<?> informer : informers) {
for (Informer<?> informer : informers) {
informer.stop();
}

try {
for (SharedIndexInformer<?> informer : informers) {
for (Informer<?> informer : informers) {
informer.stopped().toCompletableFuture().get(timeoutMs, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException | TimeoutException | ExecutionException e) {
Expand Down
Loading

0 comments on commit 36fc647

Please sign in to comment.