diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/StrimziPodSetController.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/StrimziPodSetController.java index 2159d49184f..e31e800c094 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/StrimziPodSetController.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/assembly/StrimziPodSetController.java @@ -7,7 +7,6 @@ import io.fabric8.kubernetes.api.model.DeletionPropagation; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.LabelSelector; -import io.fabric8.kubernetes.api.model.LabelSelectorBuilder; import io.fabric8.kubernetes.api.model.LabelSelectorRequirement; import io.fabric8.kubernetes.api.model.OwnerReference; import io.fabric8.kubernetes.api.model.Pod; @@ -17,8 +16,6 @@ import io.fabric8.kubernetes.client.dsl.base.PatchContext; import io.fabric8.kubernetes.client.dsl.base.PatchType; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; -import io.fabric8.kubernetes.client.informers.SharedIndexInformer; -import io.fabric8.kubernetes.client.informers.cache.Lister; import io.fabric8.kubernetes.client.readiness.Readiness; import io.micrometer.core.instrument.Timer; import io.strimzi.api.kafka.model.connect.KafkaConnect; @@ -45,6 +42,7 @@ import io.strimzi.operator.common.model.Labels; import io.strimzi.operator.common.model.StatusDiff; import io.strimzi.operator.common.model.StatusUtils; +import io.strimzi.operator.common.operator.resource.concurrent.Informer; import java.util.Collection; import java.util.HashSet; @@ -62,9 +60,7 @@ public class StrimziPodSetController implements Runnable { private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(StrimziPodSetController.class); private static final long DEFAULT_RESYNC_PERIOD_MS = 5 * 60 * 1_000L; // 5 minutes by default - private static final LabelSelector POD_LABEL_SELECTOR = new LabelSelectorBuilder() - .withMatchExpressions(new LabelSelectorRequirement(Labels.STRIMZI_KIND_LABEL, "Exists", null)) - .build(); + private static final LabelSelector POD_LABEL_SELECTOR = new LabelSelector(List.of(new LabelSelectorRequirement(Labels.STRIMZI_KIND_LABEL, "Exists", null)), null); private final Thread controllerThread; @@ -77,16 +73,11 @@ public class StrimziPodSetController implements Runnable { private final String watchedNamespace; private final BlockingQueue workQueue; - private final SharedIndexInformer podInformer; - private final SharedIndexInformer strimziPodSetInformer; - private final SharedIndexInformer kafkaInformer; - private final SharedIndexInformer kafkaConnectInformer; - private final SharedIndexInformer kafkaMirrorMaker2Informer; - private final Lister podLister; - private final Lister strimziPodSetLister; - private final Lister kafkaLister; - private final Lister kafkaConnectLister; - private final Lister kafkaMirrorMaker2Lister; + private final Informer podInformer; + private final Informer strimziPodSetInformer; + private final Informer kafkaInformer; + private final Informer kafkaConnectInformer; + private final Informer kafkaMirrorMaker2Informer; /** * Creates the StrimziPodSet controller. The controller should normally exist once per operator for cluster-wide mode @@ -117,7 +108,7 @@ public StrimziPodSetController( ) { this.podOperator = podOperator; this.strimziPodSetOperator = strimziPodSetOperator; - this.crSelector = (crSelectorLabels == null || crSelectorLabels.toMap().isEmpty()) ? null : new LabelSelector(null, crSelectorLabels.toMap()); + this.crSelector = new LabelSelector(null, (crSelectorLabels == null || crSelectorLabels.toMap().isEmpty()) ? null : crSelectorLabels.toMap()); this.watchedNamespace = watchedNamespace; this.workQueue = new ArrayBlockingQueue<>(podSetControllerWorkQueueSize); @@ -126,20 +117,15 @@ public StrimziPodSetController( // Kafka, KafkaConnect and KafkaMirrorMaker2 informers and listers are used to get the CRs quickly. // This is needed for verification of the CR selector labels. - this.kafkaInformer = kafkaOperator.informer(watchedNamespace, (crSelectorLabels == null) ? Map.of() : crSelectorLabels.toMap(), DEFAULT_RESYNC_PERIOD_MS); - this.kafkaLister = new Lister<>(kafkaInformer.getIndexer()); - this.kafkaConnectInformer = kafkaConnectOperator.informer(watchedNamespace, (crSelectorLabels == null) ? Map.of() : crSelectorLabels.toMap(), DEFAULT_RESYNC_PERIOD_MS); - this.kafkaConnectLister = new Lister<>(kafkaConnectInformer.getIndexer()); - this.kafkaMirrorMaker2Informer = kafkaMirrorMaker2Operator.informer(watchedNamespace, (crSelectorLabels == null) ? Map.of() : crSelectorLabels.toMap(), DEFAULT_RESYNC_PERIOD_MS); - this.kafkaMirrorMaker2Lister = new Lister<>(kafkaMirrorMaker2Informer.getIndexer()); + this.kafkaInformer = kafkaOperator.informer(watchedNamespace, crSelector, DEFAULT_RESYNC_PERIOD_MS); + this.kafkaConnectInformer = kafkaConnectOperator.informer(watchedNamespace, crSelector, DEFAULT_RESYNC_PERIOD_MS); + this.kafkaMirrorMaker2Informer = kafkaMirrorMaker2Operator.informer(watchedNamespace, crSelector, DEFAULT_RESYNC_PERIOD_MS); // StrimziPodSet informer and lister is used to get events about StrimziPodSet and get StrimziPodSet quickly - this.strimziPodSetInformer = strimziPodSetOperator.informer(watchedNamespace, DEFAULT_RESYNC_PERIOD_MS); - this.strimziPodSetLister = new Lister<>(strimziPodSetInformer.getIndexer()); + this.strimziPodSetInformer = strimziPodSetOperator.informer(watchedNamespace, new LabelSelector(), DEFAULT_RESYNC_PERIOD_MS); // Pod informer and lister is used to get events about pods and get pods quickly this.podInformer = podOperator.informer(watchedNamespace, POD_LABEL_SELECTOR, DEFAULT_RESYNC_PERIOD_MS); - this.podLister = new Lister<>(podInformer.getIndexer()); this.controllerThread = new Thread(this, "StrimziPodSetController"); } @@ -158,26 +144,13 @@ protected boolean isSynced() { protected void startController() { strimziPodSetInformer.addEventHandler(new PodSetEventHandler()); - strimziPodSetInformer.exceptionHandler((isStarted, throwable) -> InformerUtils.loggingExceptionHandler("StrimziPodSet", isStarted, throwable)); - podInformer.addEventHandler(new PodEventHandler()); - podInformer.exceptionHandler((isStarted, throwable) -> InformerUtils.loggingExceptionHandler("Pod", isStarted, throwable)); - - kafkaInformer.exceptionHandler((isStarted, throwable) -> InformerUtils.loggingExceptionHandler("Kafka", isStarted, throwable)); - kafkaConnectInformer.exceptionHandler((isStarted, throwable) -> InformerUtils.loggingExceptionHandler("KafkaConnect", isStarted, throwable)); - kafkaMirrorMaker2Informer.exceptionHandler((isStarted, throwable) -> InformerUtils.loggingExceptionHandler("KafkaMirrorMaker2", isStarted, throwable)); strimziPodSetInformer.start(); podInformer.start(); kafkaInformer.start(); kafkaConnectInformer.start(); kafkaMirrorMaker2Informer.start(); - - strimziPodSetInformer.stopped().whenComplete((v, t) -> InformerUtils.stoppedInformerHandler("StrimziPodSet", t, stop)); - podInformer.stopped().whenComplete((v, t) -> InformerUtils.stoppedInformerHandler("Pod", t, stop)); - kafkaInformer.stopped().whenComplete((v, t) -> InformerUtils.stoppedInformerHandler("Kafka", t, stop)); - kafkaConnectInformer.stopped().whenComplete((v, t) -> InformerUtils.stoppedInformerHandler("KafkaConnect", t, stop)); - kafkaMirrorMaker2Informer.stopped().whenComplete((v, t) -> InformerUtils.stoppedInformerHandler("KafkaMirrorMaker2", t, stop)); } protected void stopController() { @@ -256,9 +229,8 @@ private StrimziPodSet findParentPodSetForPod(Pod pod) { * @return The parent StrimziPodSet (or null if not found) */ private StrimziPodSet findParentPodSetForPodByLabels(Pod pod) { - return strimziPodSetLister - .namespace(pod.getMetadata().getNamespace()) - .list() + return strimziPodSetInformer + .list(pod.getMetadata().getNamespace()) .stream() .filter(podSet -> podSet.getSpec() != null && Util.matchesSelector(podSet.getSpec().getSelector(), pod)) @@ -283,9 +255,8 @@ private StrimziPodSet findParentPodSetForPodByOwnerReference(Pod pod) { return null; } else { // We have owner reference => we find the StrimziPodSet based on it - return strimziPodSetLister - .namespace(pod.getMetadata().getNamespace()) - .list() + return strimziPodSetInformer + .list(pod.getMetadata().getNamespace()) .stream() .filter(podSet -> podSet.getMetadata().getName().equals(owner.getName())) .findFirst() @@ -327,9 +298,9 @@ private HasMetadata findCustomResource(StrimziPodSet podSet) { HasMetadata cr = null; switch (podSet.getMetadata().getLabels().get(Labels.STRIMZI_KIND_LABEL)) { - case Kafka.RESOURCE_KIND -> cr = kafkaLister.namespace(podSet.getMetadata().getNamespace()).get(customResourceName); - case KafkaConnect.RESOURCE_KIND -> cr = kafkaConnectLister.namespace(podSet.getMetadata().getNamespace()).get(customResourceName); - case KafkaMirrorMaker2.RESOURCE_KIND -> cr = kafkaMirrorMaker2Lister.namespace(podSet.getMetadata().getNamespace()).get(customResourceName); + case Kafka.RESOURCE_KIND -> cr = kafkaInformer.get(podSet.getMetadata().getNamespace(), customResourceName); + case KafkaConnect.RESOURCE_KIND -> cr = kafkaConnectInformer.get(podSet.getMetadata().getNamespace(), customResourceName); + case KafkaMirrorMaker2.RESOURCE_KIND -> cr = kafkaMirrorMaker2Informer.get(podSet.getMetadata().getNamespace(), customResourceName); default -> LOGGER.warnOp("StrimziPodSet {} belongs to unsupported custom resource kind {}", podSet.getMetadata().getName(), podSet.getMetadata().getLabels().get(Labels.STRIMZI_KIND_LABEL)); } @@ -359,7 +330,7 @@ private void reconcile(Reconciliation reconciliation) { try { String name = reconciliation.name(); String namespace = reconciliation.namespace(); - StrimziPodSet podSet = strimziPodSetLister.namespace(namespace).get(name); + StrimziPodSet podSet = strimziPodSetInformer.get(namespace, name); if (podSet == null) { LOGGER.debugCr(reconciliation, "StrimziPodSet is null => nothing to do"); @@ -431,7 +402,7 @@ private void maybeUpdateStatus(Reconciliation reconciliation, StrimziPodSet podS if (!new StatusDiff(podSet.getStatus(), desiredStatus).isEmpty()) { try { LOGGER.debugCr(reconciliation, "Updating status of StrimziPodSet {} in namespace {}", reconciliation.name(), reconciliation.namespace()); - StrimziPodSet latestPodSet = strimziPodSetLister.namespace(reconciliation.namespace()).get(reconciliation.name()); + StrimziPodSet latestPodSet = strimziPodSetInformer.get(reconciliation.namespace(), reconciliation.name()); if (latestPodSet != null) { StrimziPodSet updatedPodSet = new StrimziPodSetBuilder(latestPodSet) .withStatus(desiredStatus) @@ -461,7 +432,7 @@ private void maybeUpdateStatus(Reconciliation reconciliation, StrimziPodSet podS * @param podCounter Pod Counter used to count pods for the status */ private void maybeCreateOrPatchPod(Reconciliation reconciliation, Pod pod, OwnerReference owner, PodCounter podCounter) { - Pod currentPod = podLister.namespace(reconciliation.namespace()).get(pod.getMetadata().getName()); + Pod currentPod = podInformer.get(reconciliation.namespace(), pod.getMetadata().getName()); if (currentPod == null) { // Pod does not exist => we create it @@ -512,9 +483,8 @@ private void maybeCreateOrPatchPod(Reconciliation reconciliation, Pod pod, Owner * @param podCounter Pod Counter used to count pods for the status */ private void removeDeletedPods(Reconciliation reconciliation, LabelSelector selector, Collection desiredPodNames, PodCounter podCounter) { - Set toBeDeleted = podLister - .namespace(reconciliation.namespace()) - .list() + Set toBeDeleted = podInformer + .list(reconciliation.namespace()) .stream() .filter(pod -> Util.matchesSelector(selector, pod)) .map(pod -> pod.getMetadata().getName()) diff --git a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/kubernetes/AbstractNamespacedResourceOperator.java b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/kubernetes/AbstractNamespacedResourceOperator.java index 2a53c66b35b..5285266d17b 100644 --- a/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/kubernetes/AbstractNamespacedResourceOperator.java +++ b/cluster-operator/src/main/java/io/strimzi/operator/cluster/operator/resource/kubernetes/AbstractNamespacedResourceOperator.java @@ -23,6 +23,7 @@ import io.strimzi.operator.common.config.ConfigParameter; import io.strimzi.operator.common.model.Labels; import io.strimzi.operator.common.operator.resource.ReconcileResult; +import io.strimzi.operator.common.operator.resource.concurrent.Informer; import io.vertx.core.Future; import io.vertx.core.Vertx; @@ -403,34 +404,6 @@ public Future deleteAsync(Reconciliation reconciliation, String namespace, return internalDelete(reconciliation, namespace, name, cascading).map((Void) null); } - /** - * Creates the informer for given resource type to inform on all instances in given namespace (or cluster-wide). The - * informer returned by this method is not running and has to be started by the code using it. - * - * @param namespace Namespace on which to inform - * @param resyncIntervalMs The interval in which the resync of the informer should happen in milliseconds - * - * @return Informer instance - */ - public SharedIndexInformer informer(String namespace, long resyncIntervalMs) { - return runnableInformer(applyNamespace(namespace), resyncIntervalMs); - } - - /** - * Creates the informer for given resource type to inform on all instances in given namespace (or cluster-wide) - * matching the selector. The informer returned by this method is not running and has to be started by the code - * using it. - * - * @param namespace Namespace on which to inform - * @param selectorLabels Selector which should be matched by the resources - * @param resyncIntervalMs The interval in which the resync of the informer should happen in milliseconds - * - * @return Informer instance - */ - public SharedIndexInformer informer(String namespace, Map selectorLabels, long resyncIntervalMs) { - return runnableInformer(applyNamespace(namespace).withLabels(selectorLabels), resyncIntervalMs); - } - /** * Creates the informer for given resource type to inform on all instances in given namespace (or cluster-wide) * matching the selector. The informer returned by this method is not running and has to be started by the code @@ -442,8 +415,8 @@ public SharedIndexInformer informer(String namespace, Map sel * * @return Informer instance */ - public SharedIndexInformer informer(String namespace, LabelSelector labelSelector, long resyncIntervalMs) { - return runnableInformer(applyNamespace(namespace).withLabelSelector(labelSelector), resyncIntervalMs); + public Informer informer(String namespace, LabelSelector labelSelector, long resyncIntervalMs) { + return new Informer<>(runnableInformer(applyNamespace(namespace).withLabelSelector(labelSelector), resyncIntervalMs)); } /** diff --git a/cluster-operator/src/test/java/io/strimzi/operator/cluster/ClusterOperatorTest.java b/cluster-operator/src/test/java/io/strimzi/operator/cluster/ClusterOperatorTest.java index 33b032c3ea9..999ebbe5d89 100644 --- a/cluster-operator/src/test/java/io/strimzi/operator/cluster/ClusterOperatorTest.java +++ b/cluster-operator/src/test/java/io/strimzi/operator/cluster/ClusterOperatorTest.java @@ -150,7 +150,7 @@ private void startStop(VertxTestContext context, String namespaces, boolean podS return mockCmInformer; }); - when(mockNamespacedCms.withLabels(any())).thenReturn(mockNamespacedCms); + when(mockNamespacedCms.withLabelSelector(any(LabelSelector.class))).thenReturn(mockNamespacedCms); when(mockCms.inNamespace(namespace)).thenReturn(mockNamespacedCms); // Mock Pods @@ -230,7 +230,7 @@ private void startStopAllNamespaces(VertxTestContext context, String namespaces, when(mockCmInformer.stopped()).thenReturn(CompletableFuture.completedFuture(null)); AnyNamespaceOperation mockFilteredCms = mock(AnyNamespaceOperation.class); - when(mockFilteredCms.withLabels(any())).thenReturn(mockFilteredCms); + when(mockFilteredCms.withLabelSelector(any(LabelSelector.class))).thenReturn(mockFilteredCms); when(mockFilteredCms.watch(any())).thenAnswer(invo -> { numWatchers.incrementAndGet(); Watch mockWatch = mock(Watch.class); diff --git a/operator-common/src/main/java/io/strimzi/operator/common/InformerUtils.java b/operator-common/src/main/java/io/strimzi/operator/common/InformerUtils.java index e3ebb5f3e2d..9227be4f608 100644 --- a/operator-common/src/main/java/io/strimzi/operator/common/InformerUtils.java +++ b/operator-common/src/main/java/io/strimzi/operator/common/InformerUtils.java @@ -4,7 +4,7 @@ */ package io.strimzi.operator.common; -import io.fabric8.kubernetes.client.informers.SharedIndexInformer; +import io.strimzi.operator.common.operator.resource.concurrent.Informer; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -16,42 +16,6 @@ public class InformerUtils { private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(InformerUtils.class); - /** - * Logs exceptions in the informers to give us a better overview of what is happening. - * - * @param type Type of the informer - * @param isStarted Flag indicating whether the informer is already started - * @param throwable Throwable describing the exception which occurred - * - * @return Boolean indicating whether the informer should retry or not. - */ - public static boolean loggingExceptionHandler(String type, boolean isStarted, Throwable throwable) { - LOGGER.errorOp("Caught exception in the " + type + " informer which is " + (isStarted ? "started" : "not started"), throwable); - // We always want the informer to retry => we just want to log the error - return true; - } - - /** - * Watches for informers to not stop unless we are shutting down the controller. If it stops unexpectedly, we will - * terminate the operator. - * - * @param type Type of the informer - * @param reason Reason why the informer stopped - * @param stopping Flag indicating if the controller shutdown is in progress (in which case the informer is expected to stop) - */ - public static void stoppedInformerHandler(String type, Throwable reason, boolean stopping) { - if (!stopping) { - // the informer is not being stopped, so this is unexpected! - if (reason != null) { - LOGGER.errorOp("{} informer stopped unexpectedly", type, reason); - } else { - LOGGER.errorOp("{} informer stopped unexpectedly without a reason", type); - } - } else { - LOGGER.infoOp("{} informer stopped", type); - } - } - /** * Synchronously stops one or more informers. It will stop them and then wait for up to the specified timeout for * each of them to actually stop. @@ -59,14 +23,14 @@ public static void stoppedInformerHandler(String type, Throwable reason, boolean * @param timeoutMs Timeout in milliseconds for how long we will wait for each informer to stop * @param informers Informers which should be stopped. */ - public static void stopAll(long timeoutMs, SharedIndexInformer... informers) { + public static void stopAll(long timeoutMs, Informer... informers) { LOGGER.infoOp("Stopping informers"); - for (SharedIndexInformer informer : informers) { + for (Informer informer : informers) { informer.stop(); } try { - for (SharedIndexInformer informer : informers) { + for (Informer informer : informers) { informer.stopped().toCompletableFuture().get(timeoutMs, TimeUnit.MILLISECONDS); } } catch (InterruptedException | TimeoutException | ExecutionException e) { diff --git a/operator-common/src/main/java/io/strimzi/operator/common/operator/resource/concurrent/AbstractNamespacedResourceOperator.java b/operator-common/src/main/java/io/strimzi/operator/common/operator/resource/concurrent/AbstractNamespacedResourceOperator.java index 182c0d25904..cde157d94ae 100644 --- a/operator-common/src/main/java/io/strimzi/operator/common/operator/resource/concurrent/AbstractNamespacedResourceOperator.java +++ b/operator-common/src/main/java/io/strimzi/operator/common/operator/resource/concurrent/AbstractNamespacedResourceOperator.java @@ -26,7 +26,6 @@ import java.util.ArrayList; import java.util.List; -import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.concurrent.Executor; @@ -437,51 +436,23 @@ public CompletionStage deleteAsync(Reconciliation reconciliation, String n return internalDelete(reconciliation, namespace, name, cascading).thenRun(ResourceSupport.NOOP); } - /** - * Creates the informer for given resource type to inform on all instances in given namespace (or cluster-wide). The - * informer returned by this method is not running and has to be started by the code using it. - * - * @param namespace Namespace on which to inform - * @param resyncIntervalMs The interval in which the resync of the informer should happen in milliseconds - * - * @return Informer instance - */ - public SharedIndexInformer informer(String namespace, long resyncIntervalMs) { - return runnableInformer(applyNamespace(namespace), resyncIntervalMs); - } - - /** - * Creates the informer for given resource type to inform on all instances in given namespace (or cluster-wide) - * matching the selector. The informer returned by this method is not running and has to be started by the code - * using it. - * - * @param namespace Namespace on which to inform - * @param selectorLabels Selector which should be matched by the resources - * @param resyncIntervalMs The interval in which the resync of the informer should happen in milliseconds - * - * @return Informer instance - */ - public SharedIndexInformer informer(String namespace, Map selectorLabels, long resyncIntervalMs) { - return runnableInformer(applyNamespace(namespace).withLabels(selectorLabels), resyncIntervalMs); - } - /** * Creates the informer for given resource type to inform on all instances in given namespace (or cluster-wide) * matching the selector. The informer returned by this method is not running and has to be started by the code * using it. * * @param namespace Namespace on which to inform - * @param labelSelector Labels Selector which should be matched by the resources + * @param selector Selector which should be matched by the resources * @param resyncIntervalMs The interval in which the resync of the informer should happen in milliseconds * * @return Informer instance */ - public SharedIndexInformer informer(String namespace, LabelSelector labelSelector, long resyncIntervalMs) { - return runnableInformer(applyNamespace(namespace).withLabelSelector(labelSelector), resyncIntervalMs); + public Informer informer(String namespace, LabelSelector selector, long resyncIntervalMs) { + return new Informer<>(runnableInformer(applyNamespace(namespace).withLabelSelector(selector), resyncIntervalMs)); } /** - * Creates a runnable informer. Runnable informer is not running yet and need to be started by the code using it. + * Creates a runnable informer. Runnable informer is not running yet and needs to be started by the code using it. * * @param informable Instance of the Informable interface for creating informers * @param resyncIntervalMs The interval in which the resync of the informer should happen in milliseconds diff --git a/operator-common/src/main/java/io/strimzi/operator/common/operator/resource/concurrent/Informer.java b/operator-common/src/main/java/io/strimzi/operator/common/operator/resource/concurrent/Informer.java new file mode 100644 index 00000000000..2b77d572a73 --- /dev/null +++ b/operator-common/src/main/java/io/strimzi/operator/common/operator/resource/concurrent/Informer.java @@ -0,0 +1,138 @@ +/* + * Copyright Strimzi authors. + * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html). + */ +package io.strimzi.operator.common.operator.resource.concurrent; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; +import io.fabric8.kubernetes.client.informers.cache.Lister; +import io.strimzi.operator.common.ReconciliationLogger; + +import java.util.List; +import java.util.concurrent.CompletionStage; + +/** + * A class that wraps the Fabric8 Informer and Lister for particular resource + * + * @param Type of the resource handled by this informer + */ +public class Informer { + private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(Informer.class); + + private final SharedIndexInformer informer; + private final Lister lister; + + /** + * Constructor. + * + * @param informer The Fabric8 informer that will be wrapped inside this class + */ + public Informer(SharedIndexInformer informer) { + this.informer = informer; + this.lister = new Lister<>(informer.getIndexer()); + + // Setup the exception handler to log errors + informer.exceptionHandler((isStarted, t) -> { + LOGGER.errorOp("Caught exception in the {} informer which is {}", informer.getApiTypeClass().getSimpleName(), (isStarted ? "started" : "not started"), t); + // We always want the informer to retry => we just want to log the error + return true; + }); + } + + ////////////////////////////// + /// Methods for working with the lister cache and querying the informer + ////////////////////////////// + + /** + * Finds the resource based on its namespace and name + * + * @param namespace Namespace of the resource + * @param name Name of the resource + * + * @return The resource matching the name and namespace or null if it was not found + */ + public T get(String namespace, String name) { + return lister.namespace(namespace).get(name); + } + + /** + * Lists the resource from a namespace + * + * @param namespace Namespace of the resource + * + * @return List of resources that exist in a given namespace + */ + public List list(String namespace) { + return lister.namespace(namespace).list(); + } + + + ////////////////////////////// + /// "Inherited" methods for working with the informers -> just call the corresponding informer method + ////////////////////////////// + + /** + * Configures the event handler for this informer + * + * @param handler Event handler + */ + public void addEventHandler(ResourceEventHandler handler) { + informer.addEventHandler(handler); + } + + /** + * Starts the informer + * + * @return CompletionStage that completes when the informer is started + */ + public CompletionStage start() { + CompletionStage start = informer.start(); + + // Setup logging for when the informer stops -> this is useful to debug various failures with the informers + informer.stopped().whenComplete((v, t) -> { + if (t != null) { + LOGGER.warnOp("{} informer stopped", informer.getApiTypeClass().getSimpleName(), t); + } else { + LOGGER.infoOp("{} informer stopped", informer.getApiTypeClass().getSimpleName()); + } + }); + + return start; + } + + /** + * Stops the informer + */ + public void stop() { + informer.stop(); + } + + /** + * Waits until the informer is stopped + * + * @return CompletionStage that completes when the informer is stopped + */ + public CompletionStage stopped() { + return informer.stopped(); + } + + /** + * Indicates whether the informer is synced + * + * @return True if the informer is synced. False otherwise. + */ + public boolean hasSynced() { + return informer.hasSynced(); + } + + /** + * Indicates whether the informer is running + * + * @return True if the informer is running. False otherwise. + */ + public boolean isRunning() { + return informer.isRunning(); + } +} \ No newline at end of file diff --git a/user-operator/src/main/java/io/strimzi/operator/user/UserController.java b/user-operator/src/main/java/io/strimzi/operator/user/UserController.java index 618f5b3cc34..b46cb1eac6b 100644 --- a/user-operator/src/main/java/io/strimzi/operator/user/UserController.java +++ b/user-operator/src/main/java/io/strimzi/operator/user/UserController.java @@ -4,11 +4,10 @@ */ package io.strimzi.operator.user; +import io.fabric8.kubernetes.api.model.LabelSelector; import io.fabric8.kubernetes.api.model.Secret; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; -import io.fabric8.kubernetes.client.informers.SharedIndexInformer; -import io.fabric8.kubernetes.client.informers.cache.Lister; import io.strimzi.api.kafka.model.user.KafkaUser; import io.strimzi.api.kafka.model.user.KafkaUserList; import io.strimzi.operator.common.Annotations; @@ -25,6 +24,7 @@ import io.strimzi.operator.common.model.Labels; import io.strimzi.operator.common.model.NamespaceAndName; import io.strimzi.operator.common.operator.resource.concurrent.CrdOperator; +import io.strimzi.operator.common.operator.resource.concurrent.Informer; import io.strimzi.operator.common.operator.resource.concurrent.SecretOperator; import io.strimzi.operator.user.operator.KafkaUserOperator; @@ -60,8 +60,8 @@ public class UserController implements Liveness, Readiness { private final long reconcileIntervalMs; private final long operationTimeoutMs; - private final SharedIndexInformer secretInformer; - private final SharedIndexInformer userInformer; + private final Informer secretInformer; + private final Informer userInformer; private final ScheduledExecutorService scheduledExecutor; @@ -106,12 +106,10 @@ public UserController( this.workQueue = new ControllerQueue(config.getWorkQueueSize(), this.metrics); // Secret informer and lister is used to get events about Secrets and get Secrets quickly - this.secretInformer = secretOperator.informer(watchedNamespace, secretSelector, DEFAULT_RESYNC_PERIOD_MS); - Lister secretLister = new Lister<>(secretInformer.getIndexer()); + this.secretInformer = secretOperator.informer(watchedNamespace, new LabelSelector(null, secretSelector), DEFAULT_RESYNC_PERIOD_MS); // KafkaUser informer and lister is used to get events about Users and get Users quickly - this.userInformer = userCrdOperator.informer(watchedNamespace, userSelector, DEFAULT_RESYNC_PERIOD_MS); - Lister userLister = new Lister<>(userInformer.getIndexer()); + this.userInformer = userCrdOperator.informer(watchedNamespace, new LabelSelector(null, userSelector), DEFAULT_RESYNC_PERIOD_MS); // Creates the scheduled executor service used for periodical reconciliations and progress warnings this.scheduledExecutor = Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "UserControllerScheduledExecutor")); @@ -122,7 +120,7 @@ public UserController( // Create a thread pool for the reconciliation loops and add the reconciliation loops this.threadPool = new ArrayList<>(config.getControllerThreadPoolSize()); for (int i = 0; i < config.getControllerThreadPoolSize(); i++) { - threadPool.add(new UserControllerLoop(RESOURCE_KIND + "-ControllerLoop-" + i, workQueue, lockManager, scheduledExecutor, userLister, secretLister, userCrdOperator, userOperator, metrics, config)); + threadPool.add(new UserControllerLoop(RESOURCE_KIND + "-ControllerLoop-" + i, workQueue, lockManager, scheduledExecutor, userInformer, secretInformer, userCrdOperator, userOperator, metrics, config)); } } @@ -194,11 +192,9 @@ protected void stop() { protected void start() { // Configure the event handler for the KafkaUser resources this.userInformer.addEventHandler(new KafkaUserEventHandler()); - this.userInformer.exceptionHandler((isStarted, throwable) -> InformerUtils.loggingExceptionHandler("KafkaUser", isStarted, throwable)); // Configure the event handler for Secrets this.secretInformer.addEventHandler(new SecretEventHandler()); - this.userInformer.exceptionHandler((isStarted, throwable) -> InformerUtils.loggingExceptionHandler("Secret", isStarted, throwable)); LOGGER.infoOp("Starting the KafkaUser informer"); userInformer.start(); diff --git a/user-operator/src/main/java/io/strimzi/operator/user/UserControllerLoop.java b/user-operator/src/main/java/io/strimzi/operator/user/UserControllerLoop.java index 1a4ab3e5b3b..5f594f036a9 100644 --- a/user-operator/src/main/java/io/strimzi/operator/user/UserControllerLoop.java +++ b/user-operator/src/main/java/io/strimzi/operator/user/UserControllerLoop.java @@ -7,7 +7,6 @@ import io.fabric8.kubernetes.api.model.Secret; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientException; -import io.fabric8.kubernetes.client.informers.cache.Lister; import io.strimzi.api.kafka.model.common.Condition; import io.strimzi.api.kafka.model.user.KafkaUser; import io.strimzi.api.kafka.model.user.KafkaUserBuilder; @@ -24,6 +23,7 @@ import io.strimzi.operator.common.model.StatusDiff; import io.strimzi.operator.common.model.StatusUtils; import io.strimzi.operator.common.operator.resource.concurrent.CrdOperator; +import io.strimzi.operator.common.operator.resource.concurrent.Informer; import io.strimzi.operator.user.model.KafkaUserModel; import io.strimzi.operator.user.operator.KafkaUserOperator; @@ -42,8 +42,8 @@ public class UserControllerLoop extends AbstractControllerLoop { private static final ReconciliationLogger LOGGER = ReconciliationLogger.create(UserControllerLoop.class); - private final Lister userLister; - private final Lister secretLister; + private final Informer userInformer; + private final Informer secretInformer; private final CrdOperator userCrdOperator; private final KafkaUserOperator userOperator; private final ControllerMetricsHolder metrics; @@ -61,8 +61,8 @@ public class UserControllerLoop extends AbstractControllerLoop { * @param lockManager LockManager which is used to avoid the same resource being reconciled in multiple loops in parallel * @param scheduledExecutor Scheduled executor service which will be passed to the AbstractControllerLoop and * used to run the progress warnings - * @param userLister The KafkaUser resource lister for getting the resources - * @param secretLister The Secret lister for getting the secrets + * @param userInformer The KafkaUser resource lister for getting the resources + * @param secretInformer The Secret lister for getting the secrets * @param userCrdOperator For operating on KafkaUser resources * @param userOperator The KafkaUserOperator which has the logic for updating the Kubernetes or Kafka resources * @param metrics The metrics holder for providing metrics about the reconciliation @@ -73,8 +73,8 @@ public UserControllerLoop( ControllerQueue workQueue, ReconciliationLockManager lockManager, ScheduledExecutorService scheduledExecutor, - Lister userLister, - Lister secretLister, + Informer userInformer, + Informer secretInformer, CrdOperator userCrdOperator, KafkaUserOperator userOperator, ControllerMetricsHolder metrics, @@ -82,8 +82,8 @@ public UserControllerLoop( ) { super(name, workQueue, lockManager, scheduledExecutor); - this.userLister = userLister; - this.secretLister = secretLister; + this.userInformer = userInformer; + this.secretInformer = secretInformer; this.userCrdOperator = userCrdOperator; this.userOperator = userOperator; this.metrics = metrics; @@ -101,7 +101,7 @@ public UserControllerLoop( protected void reconcile(Reconciliation reconciliation) { LOGGER.infoCr(reconciliation, "{} will be reconciled", reconciliation.kind()); - KafkaUser user = userLister.namespace(reconciliation.namespace()).get(reconciliation.name()); + KafkaUser user = userInformer.get(reconciliation.namespace(), reconciliation.name()); if (user != null && Annotations.isReconciliationPausedWithAnnotation(user)) { // Reconciliation is paused => we make sure the status is up-to-date but don't do anything @@ -112,7 +112,7 @@ protected void reconcile(Reconciliation reconciliation) { } else { // Resource is not paused or is null (and we should trigger deletion) => we should proceed with reconciliation CompletionStage reconciliationResult = userOperator - .reconcile(reconciliation, user, secretLister.namespace(reconciliation.namespace()).get(KafkaUserModel.getSecretName(secretPrefix, reconciliation.name()))); + .reconcile(reconciliation, user, secretInformer.get(reconciliation.namespace(), KafkaUserModel.getSecretName(secretPrefix, reconciliation.name()))); try { KafkaUserStatus status = new KafkaUserStatus(); @@ -155,7 +155,7 @@ private void maybeUpdateStatus(Reconciliation reconciliation, KafkaUser kafkaUse // KafkaUser or desiredStatus being null means deletion => no status to update if (kafkaUser != null && desiredStatus != null && !new StatusDiff(kafkaUser.getStatus(), desiredStatus).isEmpty()) { LOGGER.debugCr(reconciliation, "Updating status of {} {} in namespace {}", reconciliation.kind(), reconciliation.name(), reconciliation.namespace()); - KafkaUser latestKafkaUser = userLister.namespace(reconciliation.namespace()).get(reconciliation.name()); + KafkaUser latestKafkaUser = userInformer.get(reconciliation.namespace(), reconciliation.name()); if (latestKafkaUser != null) { KafkaUser updateKafkaUser = new KafkaUserBuilder(latestKafkaUser)