Skip to content

Commit

Permalink
Fixes testcontainers#5843: log consumers are now called with exactly …
Browse files Browse the repository at this point in the history
…one complete log line

Motivation: Docker logs provide a stream, the frames provided are not necessarily split by newline characters. So it might happen that a frame contains partial log lines.
Changes:
- Line splitting, partial log buffering and line merging independently of the stream type (RAW, STDOUT, STDERR)
- OutputFrame does consistently not contain newline characters (independent of TTY)
- ToStringConsumer now adds newlines
- Slf4jLogConsumer does not need to remove any newlines

Also fixes testcontainers#4110, testcontainers#455
  • Loading branch information
SgtSilvio committed Jan 18, 2023
1 parent d243084 commit 15dcf98
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 118 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,13 @@

import com.github.dockerjava.api.async.ResultCallbackTemplate;
import com.github.dockerjava.api.model.Frame;
import com.github.dockerjava.api.model.StreamType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
Expand All @@ -24,27 +21,9 @@ public class FrameConsumerResultCallback extends ResultCallbackTemplate<FrameCon

private static final Logger LOGGER = LoggerFactory.getLogger(FrameConsumerResultCallback.class);

private static final byte[] EMPTY_LINE = new byte[0];
private final Map<OutputFrame.OutputType, LineConsumer> consumers = new HashMap<>();

private static final Pattern ANSI_COLOR_PATTERN = Pattern.compile("\u001B\\[[0-9;]+m");

private static final String LINE_BREAK_REGEX = "((\\r?\\n)|(\\r))";

private static final Pattern LINE_BREAK_PATTERN = Pattern.compile(LINE_BREAK_REGEX);

static final Pattern LINE_BREAK_AT_END_PATTERN = Pattern.compile(LINE_BREAK_REGEX + "$");

private Map<OutputFrame.OutputType, Consumer<OutputFrame>> consumers;

private CountDownLatch completionLatch = new CountDownLatch(1);

private StringBuilder logString = new StringBuilder();

private OutputFrame brokenFrame;

public FrameConsumerResultCallback() {
consumers = new HashMap<>();
}
private final CountDownLatch completionLatch = new CountDownLatch(1);

/**
* Set this callback to use the specified consumer for the given output type.
Expand All @@ -53,23 +32,19 @@ public FrameConsumerResultCallback() {
* @param consumer the consumer to use for that output type
*/
public void addConsumer(OutputFrame.OutputType outputType, Consumer<OutputFrame> consumer) {
consumers.put(outputType, consumer);
consumers.put(outputType, new LineConsumer(outputType, consumer));
}

@Override
public void onNext(Frame frame) {
if (frame != null) {
OutputFrame outputFrame = OutputFrame.forFrame(frame);
if (outputFrame != null) {
Consumer<OutputFrame> consumer = consumers.get(outputFrame.getType());
final OutputFrame.OutputType type = OutputFrame.OutputType.forStreamType(frame.getStreamType());
if (type != null) {
final LineConsumer consumer = consumers.get(type);
if (consumer == null) {
LOGGER.error("got frame with type {}, for which no handler is configured", frame.getStreamType());
} else if (outputFrame.getBytes() != null && outputFrame.getBytes().length > 0) {
if (frame.getStreamType() == StreamType.RAW) {
processRawFrame(outputFrame, consumer);
} else {
processOtherFrame(outputFrame, consumer);
}
} else if (frame.getPayload() != null) {
consumer.processFrame(frame.getPayload());
}
}
}
Expand All @@ -85,18 +60,8 @@ public void onError(Throwable throwable) {

@Override
public void close() throws IOException {
OutputFrame lastLine = null;

if (logString.length() > 0) {
lastLine = new OutputFrame(OutputFrame.OutputType.STDOUT, logString.toString().getBytes());
}

// send an END frame to every consumer... but only once per consumer.
for (Consumer<OutputFrame> consumer : new HashSet<>(consumers.values())) {
if (lastLine != null) {
consumer.accept(lastLine);
}
consumer.accept(OutputFrame.END);
for (final LineConsumer consumer : consumers.values()) {
consumer.close();
}
super.close();

Expand All @@ -110,70 +75,70 @@ public CountDownLatch getCompletionLatch() {
return completionLatch;
}

private synchronized void processRawFrame(OutputFrame outputFrame, Consumer<OutputFrame> consumer) {
String utf8String = outputFrame.getUtf8String();
byte[] bytes = outputFrame.getBytes();
private static class LineConsumer {

// Merging the strings by bytes to solve the problem breaking non-latin unicode symbols.
if (brokenFrame != null) {
bytes = merge(brokenFrame.getBytes(), bytes);
utf8String = new String(bytes);
brokenFrame = null;
}
// Logger chunks can break the string in middle of multibyte unicode character.
// Backup the bytes to reconstruct proper char sequence with bytes from next frame.
int lastCharacterType = Character.getType(utf8String.charAt(utf8String.length() - 1));
if (lastCharacterType == Character.OTHER_SYMBOL) {
brokenFrame = new OutputFrame(outputFrame.getType(), bytes);
return;
}
private static final Pattern ANSI_COLOR_PATTERN = Pattern.compile("\u001B\\[[0-9;]+m");

utf8String = processAnsiColorCodes(utf8String, consumer);
normalizeLogLines(utf8String, consumer);
}

private synchronized void processOtherFrame(OutputFrame outputFrame, Consumer<OutputFrame> consumer) {
String utf8String = outputFrame.getUtf8String();
private final OutputFrame.OutputType type;
private final Consumer<OutputFrame> consumer;

utf8String = processAnsiColorCodes(utf8String, consumer);
consumer.accept(new OutputFrame(outputFrame.getType(), utf8String.getBytes()));
}
private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
private boolean lastCR = false;

private void normalizeLogLines(String utf8String, Consumer<OutputFrame> consumer) {
// Reformat strings to normalize new lines.
List<String> lines = new ArrayList<>(Arrays.asList(LINE_BREAK_PATTERN.split(utf8String)));
if (lines.isEmpty()) {
consumer.accept(new OutputFrame(OutputFrame.OutputType.STDOUT, EMPTY_LINE));
return;
}
if (utf8String.startsWith("\n") || utf8String.startsWith("\r")) {
lines.add(0, "");
LineConsumer(final OutputFrame.OutputType type, final Consumer<OutputFrame> consumer) {
this.type = type;
this.consumer = consumer;
}
if (utf8String.endsWith("\n") || utf8String.endsWith("\r")) {
lines.add("");

void processFrame(final byte[] b) {
int start = 0;
int i = 0;
while (i < b.length) {
switch (b[i]) {
case '\n': {
if (!lastCR) {
buffer.write(b, start, i - start);
consume();
}
start = i + 1;
lastCR = false;
break;
}
case '\r': {
buffer.write(b, start, i - start);
consume();
start = i + 1;
lastCR = true;
break;
}
default: {
lastCR = false;
}
}
i++;
}
buffer.write(b, start, b.length - start);
}
for (int i = 0; i < lines.size() - 1; i++) {
String line = lines.get(i);
if (i == 0 && logString.length() > 0) {
line = logString.toString() + line;
logString.setLength(0);

void close() {
if (buffer.size() > 0) {
consume();
}
consumer.accept(new OutputFrame(OutputFrame.OutputType.STDOUT, line.getBytes()));
consumer.accept(OutputFrame.END);
}
logString.append(lines.get(lines.size() - 1));
}

private String processAnsiColorCodes(String utf8String, Consumer<OutputFrame> consumer) {
if (!(consumer instanceof BaseConsumer) || ((BaseConsumer) consumer).isRemoveColorCodes()) {
return ANSI_COLOR_PATTERN.matcher(utf8String).replaceAll("");
private void consume() {
final String string = new String(buffer.toByteArray(), StandardCharsets.UTF_8);
final byte[] bytes = processAnsiColorCodes(string).getBytes(StandardCharsets.UTF_8);
consumer.accept(new OutputFrame(type, bytes));
buffer.reset();
}
return utf8String;
}

private byte[] merge(byte[] str1, byte[] str2) {
byte[] mergedString = new byte[str1.length + str2.length];
System.arraycopy(str1, 0, mergedString, 0, str1.length);
System.arraycopy(str2, 0, mergedString, str1.length, str2.length);
return mergedString;
private String processAnsiColorCodes(final String utf8String) {
if (!(consumer instanceof BaseConsumer) || ((BaseConsumer<?>) consumer).isRemoveColorCodes()) {
return ANSI_COLOR_PATTERN.matcher(utf8String).replaceAll("");
}
return utf8String;
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package org.testcontainers.containers.output;

import com.github.dockerjava.api.model.Frame;
import com.github.dockerjava.api.model.StreamType;
import com.google.common.base.Charsets;

Expand Down Expand Up @@ -44,7 +43,6 @@ public enum OutputType {
public static OutputType forStreamType(StreamType streamType) {
switch (streamType) {
case RAW:
return STDOUT;
case STDOUT:
return STDOUT;
case STDERR:
Expand All @@ -54,12 +52,4 @@ public static OutputType forStreamType(StreamType streamType) {
}
}
}

public static OutputFrame forFrame(Frame frame) {
OutputType outputType = OutputType.forStreamType(frame.getStreamType());
if (outputType == null) {
return null;
}
return new OutputFrame(outputType, frame.getPayload());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,10 @@ public Slf4jLogConsumer withSeparateOutputStreams() {

@Override
public void accept(OutputFrame outputFrame) {
OutputFrame.OutputType outputType = outputFrame.getType();
final OutputFrame.OutputType outputType = outputFrame.getType();
final String utf8String = outputFrame.getUtf8String();

String utf8String = outputFrame.getUtf8String();
utf8String = FrameConsumerResultCallback.LINE_BREAK_AT_END_PATTERN.matcher(utf8String).replaceAll("");

Map<String, String> originalMdc = MDC.getCopyOfContextMap();
final Map<String, String> originalMdc = MDC.getCopyOfContextMap();
MDC.setContextMap(mdc);
try {
switch (outputType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
*/
public class ToStringConsumer extends BaseConsumer<ToStringConsumer> {

private ByteArrayOutputStream stringBuffer = new ByteArrayOutputStream();
private final ByteArrayOutputStream stringBuffer = new ByteArrayOutputStream();

@Override
public void accept(OutputFrame outputFrame) {
try {
if (outputFrame.getBytes() != null) {
stringBuffer.write(outputFrame.getBytes());
stringBuffer.flush();
stringBuffer.write('\n');
}
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down

0 comments on commit 15dcf98

Please sign in to comment.