From 042842d24ea145b34eb241ecd53cc8c32fedb473 Mon Sep 17 00:00:00 2001 From: Jakub Senko Date: Wed, 29 Jan 2025 20:52:01 +0100 Subject: [PATCH] test(operator): stream operator pod logs in remote test mode --- .../apicurio/registry/operator/it/ITBase.java | 27 ++++++- .../registry/operator/it/PodLogManager.java | 79 +++++++++++++++++++ 2 files changed, 103 insertions(+), 3 deletions(-) create mode 100644 operator/controller/src/test/java/io/apicurio/registry/operator/it/PodLogManager.java diff --git a/operator/controller/src/test/java/io/apicurio/registry/operator/it/ITBase.java b/operator/controller/src/test/java/io/apicurio/registry/operator/it/ITBase.java index 0141318fa6..ba385a3972 100644 --- a/operator/controller/src/test/java/io/apicurio/registry/operator/it/ITBase.java +++ b/operator/controller/src/test/java/io/apicurio/registry/operator/it/ITBase.java @@ -4,6 +4,7 @@ import io.apicurio.registry.operator.api.v1.ApicurioRegistry3; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.NamespaceBuilder; +import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.kubernetes.client.Config; import io.fabric8.kubernetes.client.ConfigBuilder; @@ -12,6 +13,7 @@ import io.fabric8.kubernetes.client.utils.Serialization; import io.javaoperatorsdk.operator.Operator; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.quarkiverse.operatorsdk.runtime.QuarkusConfigurationService; import jakarta.enterprise.inject.Instance; import jakarta.enterprise.inject.spi.CDI; @@ -22,12 +24,13 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.FileInputStream; -import java.io.IOException; +import java.io.*; import java.nio.file.Files; import java.nio.file.Path; import java.time.Duration; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; @@ -54,6 +57,7 @@ public enum OperatorDeployment { protected static Instance> reconcilers; protected static QuarkusConfigurationService configuration; protected static KubernetesClient client; + protected static PodLogManager podLogManager; protected static PortForwardManager portForwardManager; protected static IngressManager ingressManager; protected static String deploymentTarget; @@ -79,9 +83,11 @@ public static void before() throws Exception { portForwardManager = new PortForwardManager(client, namespace); ingressManager = new IngressManager(client, namespace); + podLogManager = new PodLogManager(client); if (operatorDeployment == OperatorDeployment.remote) { createTestResources(); + startOperatorLogs(); } else { createOperator(); registerReconcilers(); @@ -176,6 +182,21 @@ private static void createTestResources() throws Exception { }); } + private static void startOperatorLogs() { + List operatorPods = new ArrayList<>(); + await().ignoreExceptions().untilAsserted(() -> { + operatorPods.clear(); + operatorPods.addAll(client.pods() + .withLabels(Map.of( + "app.kubernetes.io/name", "apicurio-registry-operator", + "app.kubernetes.io/component", "operator", + "app.kubernetes.io/part-of", "apicurio-registry")) + .list().getItems()); + assertThat(operatorPods).hasSize(1); + }); + podLogManager.startPodLog(ResourceID.fromResource(operatorPods.get(0))); + } + private static void cleanTestResources() throws Exception { if (cleanup) { log.info("Deleting generated resources from Namespace {}", namespace); @@ -258,7 +279,7 @@ public static void after() throws Exception { } else { cleanTestResources(); } - + podLogManager.stopAndWait(); if (cleanup) { log.info("Deleting namespace : {}", namespace); assertThat(client.namespaces().withName(namespace).delete()).isNotNull(); diff --git a/operator/controller/src/test/java/io/apicurio/registry/operator/it/PodLogManager.java b/operator/controller/src/test/java/io/apicurio/registry/operator/it/PodLogManager.java new file mode 100644 index 0000000000..97c27141f3 --- /dev/null +++ b/operator/controller/src/test/java/io/apicurio/registry/operator/it/PodLogManager.java @@ -0,0 +1,79 @@ +package io.apicurio.registry.operator.it; + +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.dsl.LogWatch; +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.BufferedReader; +import java.io.InputStreamReader; +import java.time.Duration; +import java.time.Instant; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.awaitility.Awaitility.await; + +public class PodLogManager { + + private static final Logger log = LoggerFactory.getLogger(PodLogManager.class); + + private final KubernetesClient k8sClient; + + private final Map activePodLogMap = new ConcurrentHashMap<>(); + + public PodLogManager(KubernetesClient k8sClient) { + this.k8sClient = k8sClient; + } + + private String getNamespace(ResourceID id) { + return id.getNamespace().orElse("default"); + } + + public void startPodLog(ResourceID podID) { + k8sClient.pods().inNamespace(getNamespace(podID)).withName(podID.getName()).waitUntilReady(60, + SECONDS); + new Thread(() -> { + StringBuilder chunk = new StringBuilder(); + try ( + LogWatch logWatch = k8sClient.pods().inNamespace(getNamespace(podID)).withName(podID.getName()).watchLog(); + BufferedReader reader = new BufferedReader(new InputStreamReader(logWatch.getOutput())) + ) { + AtomicBoolean stop = new AtomicBoolean(false); + log.debug("START LOG of pod {}/{}", getNamespace(podID), podID.getName()); + activePodLogMap.put(podID, stop); + var lastWriteAt = Instant.now(); + while (!stop.get()) { + var line = reader.readLine(); + if (line != null) { + chunk.append(getNamespace(podID)).append("/").append(podID.getName()).append(" >>> ") + .append(line).append("\n"); + if (lastWriteAt.plus(Duration.ofSeconds(5)).isBefore(Instant.now())) { + log.debug("LOG of pod {}/{}:\n{}", getNamespace(podID), podID.getName(), chunk); + chunk.setLength(0); + lastWriteAt = Instant.now(); + } + } else { + stop.set(true); + } + } + } catch (Exception ex) { + log.error("Error while reading logs of pod {}/{}", getNamespace(podID), podID.getName(), ex); + } finally { + if (chunk.length() > 0) { + log.debug("LOG of pod {}/{}:\n{}", getNamespace(podID), podID.getName(), chunk); + } + log.debug("END LOG of pod {}/{}", getNamespace(podID), podID.getName()); + activePodLogMap.remove(podID); + } + }).start(); + } + + public void stopAndWait() { + activePodLogMap.values().forEach(stop -> stop.set(true)); + await().until(activePodLogMap::isEmpty); + } +}