From 15dcf9840683aec5e1e92766290fb79e773f3eec Mon Sep 17 00:00:00 2001 From: Silvio Giebl Date: Sun, 18 Sep 2022 20:27:49 +0200 Subject: [PATCH] Fixes #5843: log consumers are now called with exactly one complete log line Motivation: Docker logs provide a stream, the frames provided are not necessarily split by newline characters. So it might happen that a frame contains partial log lines. Changes: - Line splitting, partial log buffering and line merging independently of the stream type (RAW, STDOUT, STDERR) - OutputFrame does consistently not contain newline characters (independent of TTY) - ToStringConsumer now adds newlines - Slf4jLogConsumer does not need to remove any newlines Also fixes #4110, #455 --- .../output/FrameConsumerResultCallback.java | 167 +++++++----------- .../containers/output/OutputFrame.java | 10 -- .../containers/output/Slf4jLogConsumer.java | 8 +- .../containers/output/ToStringConsumer.java | 4 +- 4 files changed, 71 insertions(+), 118 deletions(-) diff --git a/core/src/main/java/org/testcontainers/containers/output/FrameConsumerResultCallback.java b/core/src/main/java/org/testcontainers/containers/output/FrameConsumerResultCallback.java index dfc146a8089..c38f5d750b6 100644 --- a/core/src/main/java/org/testcontainers/containers/output/FrameConsumerResultCallback.java +++ b/core/src/main/java/org/testcontainers/containers/output/FrameConsumerResultCallback.java @@ -2,16 +2,13 @@ import com.github.dockerjava.api.async.ResultCallbackTemplate; import com.github.dockerjava.api.model.Frame; -import com.github.dockerjava.api.model.StreamType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; +import java.nio.charset.StandardCharsets; import java.util.HashMap; -import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.function.Consumer; @@ -24,27 +21,9 @@ public class FrameConsumerResultCallback extends ResultCallbackTemplate consumers = new HashMap<>(); - private static final Pattern ANSI_COLOR_PATTERN = Pattern.compile("\u001B\\[[0-9;]+m"); - - private static final String LINE_BREAK_REGEX = "((\\r?\\n)|(\\r))"; - - private static final Pattern LINE_BREAK_PATTERN = Pattern.compile(LINE_BREAK_REGEX); - - static final Pattern LINE_BREAK_AT_END_PATTERN = Pattern.compile(LINE_BREAK_REGEX + "$"); - - private Map> consumers; - - private CountDownLatch completionLatch = new CountDownLatch(1); - - private StringBuilder logString = new StringBuilder(); - - private OutputFrame brokenFrame; - - public FrameConsumerResultCallback() { - consumers = new HashMap<>(); - } + private final CountDownLatch completionLatch = new CountDownLatch(1); /** * Set this callback to use the specified consumer for the given output type. @@ -53,23 +32,19 @@ public FrameConsumerResultCallback() { * @param consumer the consumer to use for that output type */ public void addConsumer(OutputFrame.OutputType outputType, Consumer consumer) { - consumers.put(outputType, consumer); + consumers.put(outputType, new LineConsumer(outputType, consumer)); } @Override public void onNext(Frame frame) { if (frame != null) { - OutputFrame outputFrame = OutputFrame.forFrame(frame); - if (outputFrame != null) { - Consumer consumer = consumers.get(outputFrame.getType()); + final OutputFrame.OutputType type = OutputFrame.OutputType.forStreamType(frame.getStreamType()); + if (type != null) { + final LineConsumer consumer = consumers.get(type); if (consumer == null) { LOGGER.error("got frame with type {}, for which no handler is configured", frame.getStreamType()); - } else if (outputFrame.getBytes() != null && outputFrame.getBytes().length > 0) { - if (frame.getStreamType() == StreamType.RAW) { - processRawFrame(outputFrame, consumer); - } else { - processOtherFrame(outputFrame, consumer); - } + } else if (frame.getPayload() != null) { + consumer.processFrame(frame.getPayload()); } } } @@ -85,18 +60,8 @@ public void onError(Throwable throwable) { @Override public void close() throws IOException { - OutputFrame lastLine = null; - - if (logString.length() > 0) { - lastLine = new OutputFrame(OutputFrame.OutputType.STDOUT, logString.toString().getBytes()); - } - - // send an END frame to every consumer... but only once per consumer. - for (Consumer consumer : new HashSet<>(consumers.values())) { - if (lastLine != null) { - consumer.accept(lastLine); - } - consumer.accept(OutputFrame.END); + for (final LineConsumer consumer : consumers.values()) { + consumer.close(); } super.close(); @@ -110,70 +75,70 @@ public CountDownLatch getCompletionLatch() { return completionLatch; } - private synchronized void processRawFrame(OutputFrame outputFrame, Consumer consumer) { - String utf8String = outputFrame.getUtf8String(); - byte[] bytes = outputFrame.getBytes(); + private static class LineConsumer { - // Merging the strings by bytes to solve the problem breaking non-latin unicode symbols. - if (brokenFrame != null) { - bytes = merge(brokenFrame.getBytes(), bytes); - utf8String = new String(bytes); - brokenFrame = null; - } - // Logger chunks can break the string in middle of multibyte unicode character. - // Backup the bytes to reconstruct proper char sequence with bytes from next frame. - int lastCharacterType = Character.getType(utf8String.charAt(utf8String.length() - 1)); - if (lastCharacterType == Character.OTHER_SYMBOL) { - brokenFrame = new OutputFrame(outputFrame.getType(), bytes); - return; - } + private static final Pattern ANSI_COLOR_PATTERN = Pattern.compile("\u001B\\[[0-9;]+m"); - utf8String = processAnsiColorCodes(utf8String, consumer); - normalizeLogLines(utf8String, consumer); - } - - private synchronized void processOtherFrame(OutputFrame outputFrame, Consumer consumer) { - String utf8String = outputFrame.getUtf8String(); + private final OutputFrame.OutputType type; + private final Consumer consumer; - utf8String = processAnsiColorCodes(utf8String, consumer); - consumer.accept(new OutputFrame(outputFrame.getType(), utf8String.getBytes())); - } + private final ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + private boolean lastCR = false; - private void normalizeLogLines(String utf8String, Consumer consumer) { - // Reformat strings to normalize new lines. - List lines = new ArrayList<>(Arrays.asList(LINE_BREAK_PATTERN.split(utf8String))); - if (lines.isEmpty()) { - consumer.accept(new OutputFrame(OutputFrame.OutputType.STDOUT, EMPTY_LINE)); - return; - } - if (utf8String.startsWith("\n") || utf8String.startsWith("\r")) { - lines.add(0, ""); + LineConsumer(final OutputFrame.OutputType type, final Consumer consumer) { + this.type = type; + this.consumer = consumer; } - if (utf8String.endsWith("\n") || utf8String.endsWith("\r")) { - lines.add(""); + + void processFrame(final byte[] b) { + int start = 0; + int i = 0; + while (i < b.length) { + switch (b[i]) { + case '\n': { + if (!lastCR) { + buffer.write(b, start, i - start); + consume(); + } + start = i + 1; + lastCR = false; + break; + } + case '\r': { + buffer.write(b, start, i - start); + consume(); + start = i + 1; + lastCR = true; + break; + } + default: { + lastCR = false; + } + } + i++; + } + buffer.write(b, start, b.length - start); } - for (int i = 0; i < lines.size() - 1; i++) { - String line = lines.get(i); - if (i == 0 && logString.length() > 0) { - line = logString.toString() + line; - logString.setLength(0); + + void close() { + if (buffer.size() > 0) { + consume(); } - consumer.accept(new OutputFrame(OutputFrame.OutputType.STDOUT, line.getBytes())); + consumer.accept(OutputFrame.END); } - logString.append(lines.get(lines.size() - 1)); - } - private String processAnsiColorCodes(String utf8String, Consumer consumer) { - if (!(consumer instanceof BaseConsumer) || ((BaseConsumer) consumer).isRemoveColorCodes()) { - return ANSI_COLOR_PATTERN.matcher(utf8String).replaceAll(""); + private void consume() { + final String string = new String(buffer.toByteArray(), StandardCharsets.UTF_8); + final byte[] bytes = processAnsiColorCodes(string).getBytes(StandardCharsets.UTF_8); + consumer.accept(new OutputFrame(type, bytes)); + buffer.reset(); } - return utf8String; - } - private byte[] merge(byte[] str1, byte[] str2) { - byte[] mergedString = new byte[str1.length + str2.length]; - System.arraycopy(str1, 0, mergedString, 0, str1.length); - System.arraycopy(str2, 0, mergedString, str1.length, str2.length); - return mergedString; + private String processAnsiColorCodes(final String utf8String) { + if (!(consumer instanceof BaseConsumer) || ((BaseConsumer) consumer).isRemoveColorCodes()) { + return ANSI_COLOR_PATTERN.matcher(utf8String).replaceAll(""); + } + return utf8String; + } } } diff --git a/core/src/main/java/org/testcontainers/containers/output/OutputFrame.java b/core/src/main/java/org/testcontainers/containers/output/OutputFrame.java index f6eb5c18cbc..6fca22b1926 100644 --- a/core/src/main/java/org/testcontainers/containers/output/OutputFrame.java +++ b/core/src/main/java/org/testcontainers/containers/output/OutputFrame.java @@ -1,6 +1,5 @@ package org.testcontainers.containers.output; -import com.github.dockerjava.api.model.Frame; import com.github.dockerjava.api.model.StreamType; import com.google.common.base.Charsets; @@ -44,7 +43,6 @@ public enum OutputType { public static OutputType forStreamType(StreamType streamType) { switch (streamType) { case RAW: - return STDOUT; case STDOUT: return STDOUT; case STDERR: @@ -54,12 +52,4 @@ public static OutputType forStreamType(StreamType streamType) { } } } - - public static OutputFrame forFrame(Frame frame) { - OutputType outputType = OutputType.forStreamType(frame.getStreamType()); - if (outputType == null) { - return null; - } - return new OutputFrame(outputType, frame.getPayload()); - } } diff --git a/core/src/main/java/org/testcontainers/containers/output/Slf4jLogConsumer.java b/core/src/main/java/org/testcontainers/containers/output/Slf4jLogConsumer.java index 6eff93ae4c8..c2570cfa62b 100644 --- a/core/src/main/java/org/testcontainers/containers/output/Slf4jLogConsumer.java +++ b/core/src/main/java/org/testcontainers/containers/output/Slf4jLogConsumer.java @@ -50,12 +50,10 @@ public Slf4jLogConsumer withSeparateOutputStreams() { @Override public void accept(OutputFrame outputFrame) { - OutputFrame.OutputType outputType = outputFrame.getType(); + final OutputFrame.OutputType outputType = outputFrame.getType(); + final String utf8String = outputFrame.getUtf8String(); - String utf8String = outputFrame.getUtf8String(); - utf8String = FrameConsumerResultCallback.LINE_BREAK_AT_END_PATTERN.matcher(utf8String).replaceAll(""); - - Map originalMdc = MDC.getCopyOfContextMap(); + final Map originalMdc = MDC.getCopyOfContextMap(); MDC.setContextMap(mdc); try { switch (outputType) { diff --git a/core/src/main/java/org/testcontainers/containers/output/ToStringConsumer.java b/core/src/main/java/org/testcontainers/containers/output/ToStringConsumer.java index 444434fa702..e80eb312b7e 100644 --- a/core/src/main/java/org/testcontainers/containers/output/ToStringConsumer.java +++ b/core/src/main/java/org/testcontainers/containers/output/ToStringConsumer.java @@ -11,14 +11,14 @@ */ public class ToStringConsumer extends BaseConsumer { - private ByteArrayOutputStream stringBuffer = new ByteArrayOutputStream(); + private final ByteArrayOutputStream stringBuffer = new ByteArrayOutputStream(); @Override public void accept(OutputFrame outputFrame) { try { if (outputFrame.getBytes() != null) { stringBuffer.write(outputFrame.getBytes()); - stringBuffer.flush(); + stringBuffer.write('\n'); } } catch (IOException e) { throw new RuntimeException(e);