Skip to content

Commit

Permalink
test(operator): stream operator pod logs in remote test mode
Browse files Browse the repository at this point in the history
  • Loading branch information
jsenko committed Feb 3, 2025
1 parent 7211b67 commit 042842d
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.apicurio.registry.operator.api.v1.ApicurioRegistry3;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.NamespaceBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.Config;
import io.fabric8.kubernetes.client.ConfigBuilder;
Expand All @@ -12,6 +13,7 @@
import io.fabric8.kubernetes.client.utils.Serialization;
import io.javaoperatorsdk.operator.Operator;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.quarkiverse.operatorsdk.runtime.QuarkusConfigurationService;
import jakarta.enterprise.inject.Instance;
import jakarta.enterprise.inject.spi.CDI;
Expand All @@ -22,12 +24,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.*;
import java.nio.file.Files;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -54,6 +57,7 @@ public enum OperatorDeployment {
protected static Instance<Reconciler<? extends HasMetadata>> reconcilers;
protected static QuarkusConfigurationService configuration;
protected static KubernetesClient client;
protected static PodLogManager podLogManager;
protected static PortForwardManager portForwardManager;
protected static IngressManager ingressManager;
protected static String deploymentTarget;
Expand All @@ -79,9 +83,11 @@ public static void before() throws Exception {

portForwardManager = new PortForwardManager(client, namespace);
ingressManager = new IngressManager(client, namespace);
podLogManager = new PodLogManager(client);

if (operatorDeployment == OperatorDeployment.remote) {
createTestResources();
startOperatorLogs();
} else {
createOperator();
registerReconcilers();
Expand Down Expand Up @@ -176,6 +182,21 @@ private static void createTestResources() throws Exception {
});
}

private static void startOperatorLogs() {
List<Pod> operatorPods = new ArrayList<>();
await().ignoreExceptions().untilAsserted(() -> {
operatorPods.clear();
operatorPods.addAll(client.pods()
.withLabels(Map.of(
"app.kubernetes.io/name", "apicurio-registry-operator",
"app.kubernetes.io/component", "operator",
"app.kubernetes.io/part-of", "apicurio-registry"))
.list().getItems());
assertThat(operatorPods).hasSize(1);
});
podLogManager.startPodLog(ResourceID.fromResource(operatorPods.get(0)));
}

private static void cleanTestResources() throws Exception {
if (cleanup) {
log.info("Deleting generated resources from Namespace {}", namespace);
Expand Down Expand Up @@ -258,7 +279,7 @@ public static void after() throws Exception {
} else {
cleanTestResources();
}

podLogManager.stopAndWait();
if (cleanup) {
log.info("Deleting namespace : {}", namespace);
assertThat(client.namespaces().withName(namespace).delete()).isNotNull();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package io.apicurio.registry.operator.it;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.LogWatch;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.awaitility.Awaitility.await;

public class PodLogManager {

private static final Logger log = LoggerFactory.getLogger(PodLogManager.class);

private final KubernetesClient k8sClient;

private final Map<ResourceID, AtomicBoolean> activePodLogMap = new ConcurrentHashMap<>();

public PodLogManager(KubernetesClient k8sClient) {
this.k8sClient = k8sClient;
}

private String getNamespace(ResourceID id) {
return id.getNamespace().orElse("default");
}

public void startPodLog(ResourceID podID) {
k8sClient.pods().inNamespace(getNamespace(podID)).withName(podID.getName()).waitUntilReady(60,
SECONDS);
new Thread(() -> {
StringBuilder chunk = new StringBuilder();
try (
LogWatch logWatch = k8sClient.pods().inNamespace(getNamespace(podID)).withName(podID.getName()).watchLog();
BufferedReader reader = new BufferedReader(new InputStreamReader(logWatch.getOutput()))
) {
AtomicBoolean stop = new AtomicBoolean(false);
log.debug("START LOG of pod {}/{}", getNamespace(podID), podID.getName());
activePodLogMap.put(podID, stop);
var lastWriteAt = Instant.now();
while (!stop.get()) {
var line = reader.readLine();
if (line != null) {
chunk.append(getNamespace(podID)).append("/").append(podID.getName()).append(" >>> ")
.append(line).append("\n");
if (lastWriteAt.plus(Duration.ofSeconds(5)).isBefore(Instant.now())) {
log.debug("LOG of pod {}/{}:\n{}", getNamespace(podID), podID.getName(), chunk);
chunk.setLength(0);
lastWriteAt = Instant.now();
}
} else {
stop.set(true);
}
}
} catch (Exception ex) {
log.error("Error while reading logs of pod {}/{}", getNamespace(podID), podID.getName(), ex);
} finally {
if (chunk.length() > 0) {
log.debug("LOG of pod {}/{}:\n{}", getNamespace(podID), podID.getName(), chunk);
}
log.debug("END LOG of pod {}/{}", getNamespace(podID), podID.getName());
activePodLogMap.remove(podID);
}
}).start();
}

public void stopAndWait() {
activePodLogMap.values().forEach(stop -> stop.set(true));
await().until(activePodLogMap::isEmpty);
}
}

0 comments on commit 042842d

Please sign in to comment.