diff --git a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java index be1c64d2356..e2c476520c1 100644 --- a/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java +++ b/logstash-core/src/main/java/org/logstash/common/BufferedTokenizerExt.java @@ -23,14 +23,18 @@ import org.jruby.Ruby; import org.jruby.RubyArray; import org.jruby.RubyClass; +import org.jruby.RubyEncoding; import org.jruby.RubyObject; import org.jruby.RubyString; import org.jruby.anno.JRubyClass; import org.jruby.anno.JRubyMethod; import org.jruby.runtime.ThreadContext; import org.jruby.runtime.builtin.IRubyObject; +import org.jruby.util.ByteList; import org.logstash.RubyUtil; +import java.nio.charset.Charset; + @JRubyClass(name = "BufferedTokenizer") public class BufferedTokenizerExt extends RubyObject { @@ -40,10 +44,13 @@ public class BufferedTokenizerExt extends RubyObject { freeze(RubyUtil.RUBY.getCurrentContext()); private @SuppressWarnings("rawtypes") RubyArray input = RubyUtil.RUBY.newArray(); + private StringBuilder headToken = new StringBuilder(); private RubyString delimiter = NEW_LINE; private int sizeLimit; private boolean hasSizeLimit; private int inputSize; + private boolean bufferFullErrorNotified = false; + private String encodingName; public BufferedTokenizerExt(final Ruby runtime, final RubyClass metaClass) { super(runtime, metaClass); @@ -80,23 +87,76 @@ public IRubyObject init(final ThreadContext context, IRubyObject[] args) { @JRubyMethod @SuppressWarnings("rawtypes") public RubyArray extract(final ThreadContext context, IRubyObject data) { + RubyEncoding encoding = (RubyEncoding) data.convertToString().encoding(context); + encodingName = encoding.getEncoding().getCharsetName(); final RubyArray entities = data.convertToString().split(delimiter, -1); + if (!bufferFullErrorNotified) { + input.clear(); + input.concat(entities); + } else { + // after a full buffer 
signal + if (input.isEmpty()) { + // after a buffer full error, the remaining part of the line, till next delimiter, + // has to be consumed, unless the input buffer doesn't still contain fragments of + // subsequent tokens. + entities.shift(context); + input.concat(entities); + } else { + // merge last of the input with first of incoming data segment + if (!entities.isEmpty()) { + RubyString last = ((RubyString) input.pop(context)); + RubyString nextFirst = ((RubyString) entities.shift(context)); + entities.unshift(last.concat(nextFirst)); + input.concat(entities); + } + } + } + if (hasSizeLimit) { - final int entitiesSize = ((RubyString) entities.first()).size(); + if (bufferFullErrorNotified) { + bufferFullErrorNotified = false; + if (input.isEmpty()) { + return RubyUtil.RUBY.newArray(); + } + } + final int entitiesSize = ((RubyString) input.first()).size(); if (inputSize + entitiesSize > sizeLimit) { - throw new IllegalStateException("input buffer full"); + bufferFullErrorNotified = true; + headToken = new StringBuilder(); + String errorMessage = String.format("input buffer full, consumed token which exceeded the sizeLimit %d; inputSize: %d, entitiesSize %d", sizeLimit, inputSize, entitiesSize); + inputSize = 0; + input.shift(context); // consume the token fragment that generates the buffer full + throw new IllegalStateException(errorMessage); } this.inputSize = inputSize + entitiesSize; } - input.append(entities.shift(context)); - if (entities.isEmpty()) { + + if (input.getLength() < 2) { + // this is a specialization case which avoid adding and removing from input accumulator + // when it contains just one element + headToken.append(input.shift(context)); // remove head return RubyUtil.RUBY.newArray(); } - entities.unshift(input.join(context)); - input.clear(); - input.append(entities.pop(context)); - inputSize = ((RubyString) input.first()).size(); - return entities; + + if (headToken.length() > 0) { + // if there is a pending token part, merge it with the 
first token segment present + // in the accumulator, and clean the pending token part. + headToken.append(input.shift(context)); // append buffer to first element and + // create new RubyString with the data specified encoding + RubyString encodedHeadToken = toEncodedRubyString(context, headToken.toString()); + input.unshift(encodedHeadToken); // reinsert it into the array + headToken = new StringBuilder(); + } + headToken.append(input.pop(context)); // put the leftovers in headToken for later + inputSize = headToken.length(); + return input; + } + + private RubyString toEncodedRubyString(ThreadContext context, String input) { + // Depends on the encodingName being set by the extract method, could potentially raise if not set. + RubyString result = RubyUtil.RUBY.newString(new ByteList(input.getBytes(Charset.forName(encodingName)))); + result.force_encoding(context, RubyUtil.RUBY.newString(encodingName)); + return result; } /** @@ -108,15 +168,30 @@ public RubyArray extract(final ThreadContext context, IRubyObject data) { */ @JRubyMethod public IRubyObject flush(final ThreadContext context) { - final IRubyObject buffer = input.join(context); - input.clear(); + final IRubyObject buffer = RubyUtil.toRubyObject(headToken.toString()); + headToken = new StringBuilder(); inputSize = 0; - return buffer; + + // create new RubyString with the last data specified encoding, if exists + RubyString encodedHeadToken; + if (encodingName != null) { + encodedHeadToken = toEncodedRubyString(context, buffer.toString()); + } else { + // When used with TCP input it could be that on socket connection the flush method + // is invoked while no invocation of extract, leaving the encoding name unassigned. 
+ // In such case also the headToken must be empty + if (!buffer.toString().isEmpty()) { + throw new IllegalStateException("invoked flush with unassigned encoding but not empty head token, this shouldn't happen"); + } + encodedHeadToken = (RubyString) buffer; + } + + return encodedHeadToken; } @JRubyMethod(name = "empty?") public IRubyObject isEmpty(final ThreadContext context) { - return RubyUtil.RUBY.newBoolean(input.isEmpty() && (inputSize == 0)); + return RubyUtil.RUBY.newBoolean(headToken.toString().isEmpty() && (inputSize == 0)); } } diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java new file mode 100644 index 00000000000..524abb36ed5 --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtTest.java @@ -0,0 +1,161 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.logstash.common; + +import org.jruby.RubyArray; +import org.jruby.RubyEncoding; +import org.jruby.RubyString; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; +import org.junit.Before; +import org.junit.Test; +import org.logstash.RubyTestBase; +import org.logstash.RubyUtil; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.logstash.RubyUtil.RUBY; + +@SuppressWarnings("unchecked") +public final class BufferedTokenizerExtTest extends RubyTestBase { + + private BufferedTokenizerExt sut; + private ThreadContext context; + + @Before + public void setUp() { + sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER); + context = RUBY.getCurrentContext(); + IRubyObject[] args = {}; + sut.init(context, args); + } + + @Test + public void shouldTokenizeASingleToken() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo\n")); + + assertEquals(List.of("foo"), tokens); + } + + @Test + public void shouldMergeMultipleToken() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo")); + assertTrue(tokens.isEmpty()); + + tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("bar\n")); + assertEquals(List.of("foobar"), tokens); + } + + @Test + public void shouldTokenizeMultipleToken() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar\n")); + + assertEquals(List.of("foo", "bar"), tokens); + } + + @Test + public void shouldIgnoreEmptyPayload() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("")); + assertTrue(tokens.isEmpty()); + + tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar")); + assertEquals(List.of("foo"), tokens); + } + + @Test + public void shouldTokenizeEmptyPayloadWithNewline() { + RubyArray tokens = (RubyArray) sut.extract(context, 
RubyUtil.RUBY.newString("\n")); + assertEquals(List.of(""), tokens); + + tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("\n\n\n")); + assertEquals(List.of("", "", ""), tokens); + } + + @Test + public void shouldNotChangeEncodingOfTokensAfterPartitioning() { + RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x0A, 0x41}); // £ character, newline, A + IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); + RubyArray tokens = (RubyArray)sut.extract(context, rubyInput); + + // read the first token, the £ string + IRubyObject firstToken = tokens.shift(context); + assertEquals("£", firstToken.toString()); + + // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion + RubyEncoding encoding = (RubyEncoding) firstToken.callMethod(context, "encoding"); + assertEquals("ISO-8859-1", encoding.toString()); + } + + @Test + public void shouldNotChangeEncodingOfTokensAfterPartitioningInCaseMultipleExtractionInInvoked() { + RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3}); // £ character + IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); + sut.extract(context, rubyInput); + IRubyObject capitalAInLatin1 = RubyString.newString(RUBY, new byte[]{(byte) 0x41}) + .force_encoding(context, RUBY.newString("ISO8859-1")); + RubyArray tokens = (RubyArray)sut.extract(context, capitalAInLatin1); + assertTrue(tokens.isEmpty()); + + tokens = (RubyArray)sut.extract(context, RubyString.newString(RUBY, new byte[]{(byte) 0x0A})); + + // read the first token, the £ string + IRubyObject firstToken = tokens.shift(context); + assertEquals("£A", firstToken.toString()); + + // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion + RubyEncoding encoding = (RubyEncoding) firstToken.callMethod(context, "encoding"); + assertEquals("ISO-8859-1", encoding.toString()); + } + + @Test + public void 
shouldNotChangeEncodingOfTokensAfterPartitioningWhenRetrieveLastFlushedToken() { + RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x0A, 0x41}); // £ character, newline, A + IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); + RubyArray tokens = (RubyArray)sut.extract(context, rubyInput); + + // read the first token, the £ string + IRubyObject firstToken = tokens.shift(context); + assertEquals("£", firstToken.toString()); + + // flush and check that the remaining A is still encoded in ISO8859-1 + IRubyObject lastToken = sut.flush(context); + assertEquals("A", lastToken.toString()); + + // verify encoding "ISO8859-1" is preserved in the Java to Ruby String conversion + RubyEncoding encoding = (RubyEncoding) lastToken.callMethod(context, "encoding"); + assertEquals("ISO-8859-1", encoding.toString()); + } + + @Test + public void givenDirectFlushInvocationUTF8EncodingIsApplied() { + RubyString rubyString = RubyString.newString(RUBY, new byte[]{(byte) 0xA3, 0x41}); // £ character, A + IRubyObject rubyInput = rubyString.force_encoding(context, RUBY.newString("ISO8859-1")); + + // flush without any prior extract invocation: the buffer is empty and no encoding was recorded + IRubyObject lastToken = sut.flush(context); + assertEquals("", lastToken.toString()); + + // verify the default UTF-8 encoding is applied in the Java to Ruby String conversion + RubyEncoding encoding = (RubyEncoding) lastToken.callMethod(context, "encoding"); + assertEquals("UTF-8", encoding.toString()); + } +} \ No newline at end of file diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithDelimiterTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithDelimiterTest.java new file mode 100644 index 00000000000..19872e66c3c --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithDelimiterTest.java @@ -0,0 +1,66 @@ +/* + * Licensed to Elasticsearch B.V.
under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.logstash.common; + +import org.jruby.RubyArray; +import org.jruby.RubyString; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; +import org.junit.Before; +import org.junit.Test; +import org.logstash.RubyTestBase; +import org.logstash.RubyUtil; + +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.logstash.RubyUtil.RUBY; + +@SuppressWarnings("unchecked") +public final class BufferedTokenizerExtWithDelimiterTest extends RubyTestBase { + + private BufferedTokenizerExt sut; + private ThreadContext context; + + @Before + public void setUp() { + sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER); + context = RUBY.getCurrentContext(); + IRubyObject[] args = {RubyUtil.RUBY.newString("||")}; + sut.init(context, args); + } + + @Test + public void shouldTokenizeMultipleToken() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo||b|r||")); + + assertEquals(List.of("foo", "b|r"), tokens); + } + + @Test + public void shouldIgnoreEmptyPayload() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("")); + 
assertTrue(tokens.isEmpty()); + + tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo||bar")); + assertEquals(List.of("foo"), tokens); + } +} diff --git a/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithSizeLimitTest.java b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithSizeLimitTest.java new file mode 100644 index 00000000000..9a07242369d --- /dev/null +++ b/logstash-core/src/test/java/org/logstash/common/BufferedTokenizerExtWithSizeLimitTest.java @@ -0,0 +1,111 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.logstash.common; + +import org.jruby.RubyArray; +import org.jruby.RubyString; +import org.jruby.runtime.ThreadContext; +import org.jruby.runtime.builtin.IRubyObject; +import org.junit.Before; +import org.junit.Test; +import org.logstash.RubyTestBase; +import org.logstash.RubyUtil; + +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.junit.Assert.*; +import static org.logstash.RubyUtil.RUBY; + +@SuppressWarnings("unchecked") +public final class BufferedTokenizerExtWithSizeLimitTest extends RubyTestBase { + + private BufferedTokenizerExt sut; + private ThreadContext context; + + @Before + public void setUp() { + sut = new BufferedTokenizerExt(RubyUtil.RUBY, RubyUtil.BUFFERED_TOKENIZER); + context = RUBY.getCurrentContext(); + IRubyObject[] args = {RubyUtil.RUBY.newString("\n"), RubyUtil.RUBY.newFixnum(10)}; + sut.init(context, args); + } + + @Test + public void givenTokenWithinSizeLimitWhenExtractedThenReturnTokens() { + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("foo\nbar\n")); + + assertEquals(List.of("foo", "bar"), tokens); + } + + @Test + public void givenTokenExceedingSizeLimitWhenExtractedThenThrowsAnError() { + Exception thrownException = assertThrows(IllegalStateException.class, () -> { + sut.extract(context, RubyUtil.RUBY.newString("this_is_longer_than_10\nkaboom")); + }); + assertThat(thrownException.getMessage(), containsString("input buffer full")); + } + + @Test + public void givenExtractedThrownLimitErrorWhenFeedFreshDataThenReturnTokenStartingFromEndOfOffendingToken() { + Exception thrownException = assertThrows(IllegalStateException.class, () -> { + sut.extract(context, RubyUtil.RUBY.newString("this_is_longer_than_10\nkaboom")); + }); + assertThat(thrownException.getMessage(), containsString("input buffer full")); + + RubyArray tokens = (RubyArray) sut.extract(context, 
RubyUtil.RUBY.newString("\nanother")); + assertEquals("After buffer full error should resume from the end of line", List.of("kaboom"), tokens); + } + + @Test + public void givenExtractInvokedWithDifferentFramingAfterBufferFullErrorTWhenFeedFreshDataThenReturnTokenStartingFromEndOfOffendingToken() { + sut.extract(context, RubyUtil.RUBY.newString("aaaa")); + + Exception thrownException = assertThrows(IllegalStateException.class, () -> { + sut.extract(context, RubyUtil.RUBY.newString("aaaaaaa")); + }); + assertThat(thrownException.getMessage(), containsString("input buffer full")); + + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("aa\nbbbb\nccc")); + assertEquals(List.of("bbbb"), tokens); + } + + @Test + public void giveMultipleSegmentsThatGeneratesMultipleBufferFullErrorsThenIsAbleToRecoverTokenization() { + sut.extract(context, RubyUtil.RUBY.newString("aaaa")); + + //first buffer full on 13 "a" letters + Exception thrownException = assertThrows(IllegalStateException.class, () -> { + sut.extract(context, RubyUtil.RUBY.newString("aaaaaaa")); + }); + assertThat(thrownException.getMessage(), containsString("input buffer full")); + + // second buffer full on 11 "b" letters + Exception secondThrownException = assertThrows(IllegalStateException.class, () -> { + sut.extract(context, RubyUtil.RUBY.newString("aa\nbbbbbbbbbbb\ncc")); + }); + assertThat(secondThrownException.getMessage(), containsString("input buffer full")); + + // now should resume processing on c and d + RubyArray tokens = (RubyArray) sut.extract(context, RubyUtil.RUBY.newString("ccc\nddd\n")); + assertEquals(List.of("ccccc", "ddd"), tokens); + } +} \ No newline at end of file