diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml
index c0235b4db9e9c..12d4fb7fa2859 100644
--- a/hadoop-common-project/hadoop-common/pom.xml
+++ b/hadoop-common-project/hadoop-common/pom.xml
@@ -375,6 +375,17 @@
       <artifactId>lz4-java</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>io.airlift</groupId>
+      <artifactId>aircompressor</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.hadoop.gplcompression</groupId>
+      <artifactId>hadoop-lzo</artifactId>
+      <version>0.4.20</version>
+      <scope>test</scope>
+    </dependency>
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java
index 121af64b01182..69e74bb1a8826 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java
@@ -92,12 +92,14 @@ public static void copyBytes(InputStream in, OutputStream out, int buffSize)
PrintStream ps = out instanceof PrintStream ? (PrintStream)out : null;
byte buf[] = new byte[buffSize];
int bytesRead = in.read(buf);
+    LOG.debug("bytesRead: {}", bytesRead);
while (bytesRead >= 0) {
out.write(buf, 0, bytesRead);
if ((ps != null) && ps.checkError()) {
throw new IOException("Unable to write to output stream.");
}
bytesRead = in.read(buf);
+      LOG.debug("bytesRead: {}", bytesRead);
}
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecConstants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecConstants.java
index 96410a18ebcb5..dcb8491a36135 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecConstants.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecConstants.java
@@ -65,4 +65,9 @@ private CodecConstants() {
* Default extension for {@link org.apache.hadoop.io.compress.ZStandardCodec}.
*/
public static final String ZSTANDARD_CODEC_EXTENSION = ".zst";
+
+ /**
+   * Default extension for {@link org.apache.hadoop.io.compress.LzoCodec2}.
+ */
+ public static final String LZO_CODEC_EXTENSION = ".lzo_deflate";
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec2.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec2.java
new file mode 100644
index 0000000000000..cca829af4f979
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec2.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.compress.lzo.LzoCompressor;
+import org.apache.hadoop.io.compress.lzo.LzoDecompressor;
+
+/**
+ * This class creates lzo compressors/decompressors.
+ */
+public class LzoCodec2 implements Configurable, CompressionCodec {
+ Configuration conf;
+
+ /**
+ * Set the configuration to be used by this object.
+ *
+ * @param conf the configuration object.
+ */
+ @Override
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ /**
+ * Return the configuration used by this object.
+ *
+ * @return the configuration object used by this object.
+ */
+ @Override
+ public Configuration getConf() {
+ return conf;
+ }
+
+ /**
+ * Create a {@link CompressionOutputStream} that will write to the given
+ * {@link OutputStream}.
+ *
+ * @param out the location for the final output stream
+ * @return a stream the user can write uncompressed data to have it compressed
+ * @throws IOException
+ */
+ @Override
+ public CompressionOutputStream createOutputStream(OutputStream out)
+ throws IOException {
+ return CompressionCodec.Util.
+ createOutputStreamWithCodecPool(this, conf, out);
+ }
+
+ /**
+ * Create a {@link CompressionOutputStream} that will write to the given
+ * {@link OutputStream} with the given {@link Compressor}.
+ *
+ * @param out the location for the final output stream
+ * @param compressor compressor to use
+ * @return a stream the user can write uncompressed data to have it compressed
+ * @throws IOException
+ */
+ @Override
+ public CompressionOutputStream createOutputStream(OutputStream out,
+ Compressor compressor) throws IOException {
+ int bufferSize = conf.getInt(
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT);
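+    // The supplied Compressor is intentionally ignored here: the airlift
+    // LzoCodec writes the LZO stream format itself, so stream creation is
+    // delegated to it with the configured buffer size.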
+ io.airlift.compress.lzo.LzoCodec codec = new io.airlift.compress.lzo.LzoCodec();
+ Configuration lzoConf = new Configuration(this.conf);
+ lzoConf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, bufferSize);
+ codec.setConf(lzoConf);
+ return codec.createOutputStream(out);
+ }
+
+ /**
+ * Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
+ *
+ * @return the type of compressor needed by this codec.
+ */
+ @Override
+  public Class<? extends Compressor> getCompressorType() {
+ return LzoCompressor.class;
+ }
+
+ /**
+ * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
+ *
+ * @return a new compressor for use by this codec
+ */
+ @Override
+ public Compressor createCompressor() {
+ int bufferSize = conf.getInt(
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT);
+ return new LzoCompressor(bufferSize);
+ }
+
+ /**
+ * Create a {@link CompressionInputStream} that will read from the given
+ * input stream.
+ *
+ * @param in the stream to read compressed bytes from
+ * @return a stream to read uncompressed bytes from
+ * @throws IOException
+ */
+ @Override
+ public CompressionInputStream createInputStream(InputStream in)
+ throws IOException {
+ return CompressionCodec.Util.
+ createInputStreamWithCodecPool(this, conf, in);
+ }
+
+ /**
+ * Create a {@link CompressionInputStream} that will read from the given
+ * {@link InputStream} with the given {@link Decompressor}.
+ *
+ * @param in the stream to read compressed bytes from
+ * @param decompressor decompressor to use
+ * @return a stream to read uncompressed bytes from
+ * @throws IOException
+ */
+ @Override
+ public CompressionInputStream createInputStream(InputStream in,
+ Decompressor decompressor) throws IOException {
+ int bufferSize = conf.getInt(
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT);
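+    // As in createOutputStream, the supplied Decompressor is ignored and the
+    // airlift codec builds the decompressing stream.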
+ io.airlift.compress.lzo.LzoCodec codec = new io.airlift.compress.lzo.LzoCodec();
+ Configuration lzoConf = new Configuration(this.conf);
+ lzoConf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, bufferSize);
+ codec.setConf(lzoConf);
+ return codec.createInputStream(in);
+ }
+
+ /**
+ * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
+ *
+ * @return the type of decompressor needed by this codec.
+ */
+ @Override
+  public Class<? extends Decompressor> getDecompressorType() {
+ return LzoDecompressor.class;
+ }
+
+ /**
+ * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
+ *
+ * @return a new decompressor for use by this codec
+ */
+ @Override
+ public Decompressor createDecompressor() {
+ int bufferSize = conf.getInt(
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
+ CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT);
+ return new LzoDecompressor(bufferSize);
+ }
+
+ /**
+ * Get the default filename extension for this kind of compression.
+ *
+   * @return <code>.lzo_deflate</code>.
+   */
+ @Override
+ public String getDefaultExtension() {
+ return CodecConstants.LZO_CODEC_EXTENSION;
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java
new file mode 100644
index 0000000000000..0874dfbbaed78
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.lzo;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.Compressor;
+
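+/**
+ * A {@link Compressor} backed by the pure-Java LZO implementation from
+ * aircompressor ({@code io.airlift.compress.lzo.LzoCompressor}).
+ */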
+public class LzoCompressor implements Compressor {
+ private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024;
+
+ private int bufferSize;
+ private byte[] input;
+ private int inputOffset;
+ private int inputLength;
+
+ private byte[] inputBuffer;
+ private int inputBufferLen;
+ private int inputBufferOffset;
+ private int inputBufferMaxLen;
+
+ private boolean finish;
+  // `finished` is true once all input has been consumed and nothing remains buffered.
+ private boolean finished;
+
+ private byte[] outputBuffer;
+ private int outputBufferLen;
+ private int outputBufferOffset;
+ private int outputBufferMaxLen;
+
+ private long bytesRead;
+ private long bytesWritten;
+
+ private io.airlift.compress.lzo.LzoCompressor compressor;
+
+ /**
+   * Creates a new compressor.
+   *
+   * @param bufferSize size of the buffer to be used.
+   */
+ public LzoCompressor(int bufferSize) {
+ this.bufferSize = bufferSize;
+ compressor = new io.airlift.compress.lzo.LzoCompressor();
+
+ reset();
+ }
+
+ /**
+ * Creates a new compressor with the default buffer size.
+ */
+ public LzoCompressor() {
+ this(DEFAULT_DIRECT_BUFFER_SIZE);
+ }
+
+ /**
+ * Sets input data for compression.
+   * This should be called whenever {@link #needsInput()} returns
+   * <code>true</code> indicating that more input data is required.
+ *
+ * @param b Input data
+ * @param off Start offset
+ * @param len Length
+ */
+ @Override
+ public void setInput(byte[] b, int off, int len) {
+ if (b == null) {
+ throw new NullPointerException();
+ }
+ if (off < 0 || len < 0 || off > b.length - len) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+
+ // Check if we can buffer current input.
+ if (len > inputBufferMaxLen - inputBufferOffset) {
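+      // Too large to buffer: keep a reference to the caller's array and
+      // drain it from compress() via copyIntoInputBuffer().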
+ input = b;
+ inputOffset = off;
+ inputLength = len;
+ } else {
+ // We can buffer current input.
+      System.arraycopy(b, off, inputBuffer, inputBufferOffset, len);
+ inputBufferOffset += len;
+ inputBufferLen += len;
+ }
+
+ bytesRead += len;
+ }
+
+ /**
+ * Does nothing.
+ */
+ @Override
+ public void setDictionary(byte[] b, int off, int len) {
+ // do nothing
+ }
+
+ /**
+   * Returns <code>true</code> if the input data buffer is empty and
+   * {@link #setInput(byte[], int, int)} should be called to provide more input.
+   *
+   * @return <code>true</code> if the input data buffer is empty and
+   * {@link #setInput(byte[], int, int)} should be called in order to provide more input.
+ */
+ @Override
+ public boolean needsInput() {
+ // We do not need input if:
+    // 1. there is still compressed output in the output buffer, or
+    // 2. the input buffer is full, or
+    // 3. there is pending direct input (input that did not fit into the input buffer).
+ return !((outputBufferLen > 0) ||
+ (inputBufferMaxLen - inputBufferOffset == 0) || inputLength > 0);
+ }
+
+ /**
+ * When called, indicates that compression should end
+ * with the current contents of the input buffer.
+ */
+ @Override
+ public void finish() {
+ finish = true;
+ }
+
+ /**
+   * Returns <code>true</code> if the end of the compressed
+   * data output stream has been reached.
+   *
+   * @return <code>true</code> if the end of the compressed
+   * data output stream has been reached.
+ */
+ @Override
+ public boolean finished() {
+ // This compressor is in finished status if:
+ // 1. `finish()` is called, and
+    // 2. all input has been consumed, and
+    // 3. no compressed data remains in the output buffer.
+ return finish && finished && (outputBufferLen == 0);
+ }
+
+ /**
+ * Fills specified buffer with compressed data. Returns actual number
+ * of bytes of compressed data. A return value of 0 indicates that
+ * needsInput() should be called in order to determine if more input
+ * data is required.
+ *
+ * @param b Buffer for the compressed data
+ * @param off Start offset of the data
+ * @param len Size of the buffer
+ * @return The actual number of bytes of compressed data.
+ */
+ @Override
+ public int compress(byte[] b, int off, int len)
+ throws IOException {
+ if (b == null) {
+ throw new NullPointerException();
+ }
+ if (off < 0 || len < 0 || off > b.length - len) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
+
+ // Check compressed data in output buffer.
+ if (outputBufferLen > 0) {
+ int outputSize = Math.min(outputBufferLen, len);
+ System.arraycopy(outputBuffer, outputBufferOffset, b, off, outputSize);
+ outputBufferOffset += outputSize;
+ outputBufferLen -= outputSize;
+ bytesWritten += outputSize;
+ return outputSize;
+ }
+
+ outputBufferOffset = outputBufferLen = 0;
+
+ int inputSize = 0;
+ // If no input data in input buffer.
+ if (inputBufferLen == 0) {
+ // Copy direct input.
+ inputBufferOffset = 0;
+ inputSize = copyIntoInputBuffer();
+ if (inputSize == 0) {
+ finished = true;
+ return 0;
+ }
+ } else {
+ inputSize = inputBufferLen;
+ }
+
+ // Compress input and write to output buffer.
+ int compressedSize = compressor.compress(inputBuffer, 0, inputSize,
+ outputBuffer, 0, outputBufferMaxLen);
+ // lzo consumes all buffer input
+ inputBufferOffset = 0;
+ inputBufferLen = 0;
+
+ outputBufferLen = compressedSize;
+
+ if (inputLength == 0) {
+ finished = true;
+ }
+
+ // Copy from compressed data buffer to user buffer.
+ int copiedSize = Math.min(compressedSize, len);
+ bytesWritten += copiedSize;
+ System.arraycopy(outputBuffer, 0, b, off, copiedSize);
+ outputBufferOffset += copiedSize;
+ outputBufferLen -= copiedSize;
+
+ return copiedSize;
+ }
+
+ /**
+   * Copies some of the pending direct input into the input buffer.
+   *
+   * @return the number of bytes copied.
+   */
+ int copyIntoInputBuffer() {
+ if (inputLength == 0) {
+ return 0;
+ }
+
+ finished = false;
+
+ int inputSize = Math.min(inputLength, inputBufferMaxLen - inputBufferOffset);
+ System.arraycopy(input, inputOffset, inputBuffer, inputBufferOffset, inputSize);
+
+ inputBufferLen += inputSize;
+
+ inputOffset += inputSize;
+ inputLength -= inputSize;
+
+ return inputSize;
+ }
+
+ /**
+ * Resets compressor so that a new set of input data can be processed.
+ */
+ @Override
+ public void reset() {
+ input = null;
+ inputOffset = inputLength = 0;
+ finish = finished = false;
+
+ inputBufferMaxLen = bufferSize;
+ outputBufferMaxLen = compressor.maxCompressedLength(inputBufferMaxLen);
+
+ inputBuffer = new byte[inputBufferMaxLen];
+ outputBuffer = new byte[outputBufferMaxLen];
+
+ inputBufferLen = inputBufferOffset = outputBufferLen = outputBufferOffset = 0;
+ bytesRead = bytesWritten = 0;
+ }
+
+ /**
+ * Prepare the compressor to be used in a new stream with settings defined in
+   * the given Configuration.
+   *
+   * @param conf Configuration from which new settings are fetched.
+ */
+ @Override
+ public void reinit(Configuration conf) {
+ reset();
+ }
+
+ /**
+ * Return number of bytes given to this compressor since last reset.
+ */
+ @Override
+ public long getBytesRead() {
+ return bytesRead;
+ }
+
+ /**
+ * Return number of bytes consumed by callers of compress since last reset.
+ */
+ @Override
+ public long getBytesWritten() {
+ return bytesWritten;
+ }
+
+ /**
+ * Closes the compressor and discards any unprocessed input.
+ */
+ @Override
+ public void end() {
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java
new file mode 100644
index 0000000000000..4d9f686463db6
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.lzo;
+
+import java.io.IOException;
+import java.nio.Buffer;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.io.compress.Decompressor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
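+/**
+ * A placeholder {@link Decompressor} for {@link org.apache.hadoop.io.compress.LzoCodec2}.
+ * Decompression is delegated to the stream created by the airlift codec in
+ * {@code LzoCodec2#createInputStream}, so the block-level {@link #decompress}
+ * entry point is unsupported.
+ */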
+public class LzoDecompressor implements Decompressor {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(LzoDecompressor.class.getName());
+ private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024;
+
+ private int bufferSize;
+ private byte[] input;
+ private int inputOffset;
+ private int inputLength;
+
+ private byte[] inputBuffer;
+ private int inputBufferLen;
+ private int inputBufferOffset;
+ private int inputBufferMaxLen;
+
+ private boolean finish;
+  // `finished` is true once all input has been consumed and nothing remains buffered.
+ private boolean finished;
+
+ private byte[] outputBuffer;
+ private int outputBufferLen;
+ private int outputBufferOffset;
+ private int outputBufferMaxLen;
+
+ private long bytesRead;
+ private long bytesWritten;
+
+ private io.airlift.compress.lzo.LzoDecompressor decompressor;
+
+ /**
+   * Creates a new decompressor.
+   *
+   * @param bufferSize size of the buffer to be used.
+ */
+ public LzoDecompressor(int bufferSize) {
+ this.bufferSize = bufferSize;
+ decompressor = new io.airlift.compress.lzo.LzoDecompressor();
+
+ reset();
+ }
+
+ /**
+ * Creates a new decompressor with the default buffer size.
+ */
+ public LzoDecompressor() {
+ this(DEFAULT_DIRECT_BUFFER_SIZE);
+ }
+
+ /**
+ * Sets input data for decompression.
+ * This should be called if and only if {@link #needsInput()} returns
+   * <code>true</code> indicating that more input data is required.
+   * (Both native and non-native versions of various Decompressors require
+   * that the data passed in via <code>b[]</code> remain unmodified until
+ * the caller is explicitly notified--via {@link #needsInput()}--that the
+ * buffer may be safely modified. With this requirement, an extra
+ * buffer-copy can be avoided.)
+ *
+ * @param b Input data
+ * @param off Start offset
+ * @param len Length
+ */
+ @Override
+ public synchronized void setInput(byte[] b, int off, int len) {
+ if (b == null) {
+ throw new NullPointerException();
+ }
+ if (off < 0 || len < 0 || off > b.length - len) {
+ throw new ArrayIndexOutOfBoundsException();
+ }
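+    // Stub: the input is intentionally not consumed here; LzoCodec2 delegates
+    // stream decompression to the airlift codec.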
+
+ }
+
+  /**
+   * Does nothing; this stub keeps no saved data to load.
+   */
+  synchronized void setInputFromSavedData() {
+  }
+
+ /**
+ * Does nothing.
+ */
+ @Override
+ public synchronized void setDictionary(byte[] b, int off, int len) {
+ // do nothing
+ }
+
+ /**
+   * Returns <code>true</code> if the input data buffer is empty and
+   * {@link #setInput(byte[], int, int)} should be called to
+   * provide more input.
+   *
+   * @return <code>true</code> if the input data buffer is empty and
+ * {@link #setInput(byte[], int, int)} should be called in
+ * order to provide more input.
+ */
+ @Override
+ public synchronized boolean needsInput() {
+ return false;
+ }
+
+ /**
+   * Returns <code>false</code>.
+   *
+   * @return <code>false</code>.
+ */
+ @Override
+ public synchronized boolean needsDictionary() {
+ return false;
+ }
+
+ /**
+   * Returns <code>true</code> if the end of the decompressed
+   * data output stream has been reached.
+   *
+   * @return <code>true</code> if the end of the decompressed
+ * data output stream has been reached.
+ */
+ @Override
+ public synchronized boolean finished() {
+    return finished;
+ }
+
+ /**
+ * Fills specified buffer with uncompressed data. Returns actual number
+ * of bytes of uncompressed data. A return value of 0 indicates that
+ * {@link #needsInput()} should be called in order to determine if more
+ * input data is required.
+ *
+   * @param b Buffer for the uncompressed data
+   * @param off Start offset of the data
+   * @param len Size of the buffer
+   * @return The actual number of bytes of uncompressed data.
+ * @throws IOException
+ */
+ @Override
+ public synchronized int decompress(byte[] b, int off, int len)
+ throws IOException
+ {
+ throw new UnsupportedOperationException("LZO block decompressor is not supported");
+ }
+
+ /**
+   * Returns <code>0</code>.
+   *
+   * @return <code>0</code>.
+ */
+ @Override
+ public synchronized int getRemaining() {
+ // Never use this function in BlockDecompressorStream.
+ return 0;
+ }
+
+  /**
+   * Resets decompressor so that a new set of input data can be processed.
+   */
+  @Override
+  public synchronized void reset() {
+    // do nothing
+  }
+
+  /**
+   * Closes the decompressor and discards any unprocessed input.
+   */
+  @Override
+  public synchronized void end() {
+    // do nothing
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/package-info.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/package-info.java
new file mode 100644
index 0000000000000..19effdd39c11a
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.io.compress.lzo;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
index 0bda66741ba65..aefc8b12d6afb 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
@@ -49,6 +49,7 @@
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;
+import com.hadoop.compression.lzo.LzoCodec;
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -79,6 +80,7 @@
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -168,6 +170,12 @@ public void testDeflateCodec() throws IOException {
codecTest(conf, seed, count, "org.apache.hadoop.io.compress.DeflateCodec");
}
+ @Test
+ public void testLzoCodec() throws IOException {
+    codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.LzoCodec2");
+    codecTest(conf, seed, count, "org.apache.hadoop.io.compress.LzoCodec2");
+ }
+
@Test
public void testGzipCodecWithParam() throws IOException {
Configuration conf = new Configuration(this.conf);
@@ -1260,4 +1268,177 @@ public void testGzipCompressorWithEmptyInput() throws IOException {
assertThat(b).as("check decompressed output").isEqualTo(dflchk);
}
}
+
+ @Test
+ public void testLzoCompatibilityWithCompressor() throws IOException {
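+    // Baseline: round-trip random data through the GPL hadoop-lzo codec alone
+    // to validate the reference implementation and the test harness.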
+ Configuration hadoopConf = new Configuration();
+    CompressionCodec codec = ReflectionUtils.newInstance(LzoCodec.class, hadoopConf);
+ Random r = new Random();
+
+ for (int i = 0; i < 100; i++) {
+ DataOutputBuffer outputBuffer = new DataOutputBuffer();
+ assertTrue(outputBuffer.getLength() == 0);
+ Compressor compressor = codec.createCompressor();
+ LOG.info("compressor: {}", compressor.getClass().toString());
+ assertThat(compressor).as("compressor should not be null").isNotNull();
+
+ CompressionOutputStream compressStream = codec.createOutputStream(outputBuffer, compressor);
+
+      long randomSeed = r.nextLong();
+      r.setSeed(randomSeed);
+
+ int inputSize = r.nextInt(256 * 1024 + 1) + 1;
+ byte[] b = new byte[inputSize];
+ r.nextBytes(b);
+ compressStream.write(b);
+ compressStream.flush();
+ compressStream.finish();
+
+ assertTrue(outputBuffer.getLength() > 0);
+ int compressedSize = outputBuffer.getLength();
+
+      checkCompressedOutput(outputBuffer, compressedSize, b, inputSize, codec);
+ }
+ }
+
+ @Test
+ public void testLzoCompatibilityWithNewCompressor() throws IOException {
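+    // Cross-compatibility: bytes produced by the new aircompressor-based
+    // compressor (via LzoCodec2) must be readable by the GPL hadoop-lzo codec.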
+ Configuration hadoopConf = new Configuration();
+    CompressionCodec codec = ReflectionUtils.newInstance(LzoCodec.class, hadoopConf);
+ CompressionCodec codec2 = ReflectionUtils.newInstance(LzoCodec2.class, hadoopConf);
+ Random r = new Random();
+
+ for (int i = 0; i < 100; i++) {
+ DataOutputBuffer outputBuffer = new DataOutputBuffer();
+ assertTrue(outputBuffer.getLength() == 0);
+ Compressor compressor = codec2.createCompressor();
+ LOG.info("compressor: {}", compressor.getClass().toString());
+ assertThat(compressor).as("compressor should not be null").isNotNull();
+
+ CompressionOutputStream compressStream = codec.createOutputStream(outputBuffer, compressor);
+
+      long randomSeed = r.nextLong();
+      r.setSeed(randomSeed);
+
+ int inputSize = r.nextInt(256) + 1;
+ byte[] b = new byte[inputSize];
+ r.nextBytes(b);
+ compressStream.write(b);
+ compressStream.flush();
+ compressStream.finish();
+
+ assertTrue(outputBuffer.getLength() > 0);
+ int compressedSize = outputBuffer.getLength();
+
+      checkCompressedOutput(outputBuffer, compressedSize, b, inputSize, codec);
+ }
+ }
+
+ private void checkCompressedOutput(DataOutputBuffer outputBuffer, int compressedSize, byte[] b, int inputSize, CompressionCodec codec) throws IOException {
+ DataInputBuffer lzobuf = new DataInputBuffer();
+ assertTrue(lzobuf.getLength() == 0);
+
+ lzobuf.reset(outputBuffer.getData(), 0, outputBuffer.getLength());
+ assertTrue(lzobuf.getLength() == compressedSize);
+ assertTrue(lzobuf.getPosition() == 0);
+
+ Decompressor decom = codec.createDecompressor();
+ LOG.info("decompressor: {}", decom.getClass().toString());
+ assertThat(decom).as("decompressor should not be null").isNotNull();
+
+ InputStream gzin = codec.createInputStream(lzobuf, decom);
+ DataOutputBuffer dflbuf = new DataOutputBuffer();
+ IOUtils.copyBytes(gzin, dflbuf, inputSize);
+ final byte[] dataRead = Arrays.copyOf(dflbuf.getData(), dflbuf.getLength());
+ assertArrayEquals(b, dataRead);
+
+ gzin.close();
+ }
+
+ @Test
+ public void testLzoNewCompressor() throws IOException {
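+    // Round-trip the same random input through both codecs and log the
+    // compressed sizes so the two implementations can be compared.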
+ Configuration hadoopConf = new Configuration();
+    CompressionCodec codec = ReflectionUtils.newInstance(LzoCodec.class, hadoopConf);
+ CompressionCodec codec2 = ReflectionUtils.newInstance(LzoCodec2.class, hadoopConf);
+ Random r = new Random();
+
+ for (int i = 1; i < 100; i++) {
+ DataOutputBuffer outputBuffer = new DataOutputBuffer();
+ DataOutputBuffer outputBuffer2 = new DataOutputBuffer();
+
+ assertTrue(outputBuffer.getLength() == 0);
+ assertTrue(outputBuffer2.getLength() == 0);
+
+ Compressor compressor = codec.createCompressor();
+ Compressor compressor2 = codec2.createCompressor();
+ LOG.info("compressor: {}", compressor.getClass().toString());
+ LOG.info("compressor2: {}", compressor2.getClass().toString());
+
+ assertThat(compressor).as("compressor should not be null").isNotNull();
+ assertThat(compressor2).as("compressor should not be null").isNotNull();
+
+ CompressionOutputStream compressStream = codec.createOutputStream(outputBuffer, compressor);
+ CompressionOutputStream compressStream2 = codec2.createOutputStream(outputBuffer2, compressor2);
+
+      long randomSeed = r.nextLong();
+      r.setSeed(randomSeed);
+
+ int inputSize = r.nextInt(256) + 1;
+ byte[] b = new byte[inputSize];
+ LOG.info("inputSize: " + inputSize);
+ r.nextBytes(b);
+ compressStream.write(b);
+ compressStream.flush();
+ compressStream.finish();
+
+ compressStream2.write(b);
+ compressStream2.flush();
+ compressStream2.finish();
+
+      assertTrue(outputBuffer.getLength() > 0);
+ int compressedSize = outputBuffer.getLength();
+ LOG.info("compressor compressed size: " + compressedSize);
+
+      assertTrue(outputBuffer2.getLength() > 0);
+ int compressedSize2 = outputBuffer2.getLength();
+ LOG.info("compressor2 compressed size: " + compressedSize2);
+
+ checkCompressedOutput(outputBuffer, compressedSize, b, inputSize, codec);
+ checkCompressedOutput(outputBuffer2, compressedSize2, b, inputSize, codec);
+ }
+ }
}
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 9392a9f67fb3e..625b45a1606ae 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -141,6 +141,7 @@
     <netty.version>4.1.68.Final</netty.version>
     <snappy-java.version>1.1.8.2</snappy-java.version>
     <lz4-java.version>1.7.1</lz4-java.version>
+    <aircompressor.version>0.16</aircompressor.version>
     <protobuf-maven-plugin.version>0.5.1</protobuf-maven-plugin.version>
@@ -1762,6 +1763,11 @@
         <artifactId>lz4-java</artifactId>
         <version>${lz4-java.version}</version>
       </dependency>
+      <dependency>
+        <groupId>io.airlift</groupId>
+        <artifactId>aircompressor</artifactId>
+        <version>${aircompressor.version}</version>
+      </dependency>
@@ -2459,5 +2465,9 @@
+    <repository>
+      <id>twitter</id>
+      <url>https://maven.twttr.com/</url>
+    </repository>