diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml
index c0235b4db9e9c..12d4fb7fa2859 100644
--- a/hadoop-common-project/hadoop-common/pom.xml
+++ b/hadoop-common-project/hadoop-common/pom.xml
@@ -375,6 +375,17 @@
       <artifactId>lz4-java</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>io.airlift</groupId>
+      <artifactId>aircompressor</artifactId>
+      <scope>provided</scope>
+    </dependency>
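+    <!-- NOTE: hadoop-lzo (GPL-licensed, resolved from the Twitter Maven
+         repository declared in hadoop-project/pom.xml) is pulled in at
+         test scope only, so TestCodec can check wire compatibility between
+         the original native LZO codec and the new aircompressor-backed
+         codec. -->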
+    <dependency>
+      <groupId>com.hadoop.gplcompression</groupId>
+      <artifactId>hadoop-lzo</artifactId>
+      <version>0.4.20</version>
+      <scope>test</scope>
+    </dependency>
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecConstants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecConstants.java
index 96410a18ebcb5..dcb8491a36135 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecConstants.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecConstants.java
@@ -65,4 +65,9 @@ private CodecConstants() {
   /**
    * Default extension for {@link org.apache.hadoop.io.compress.ZStandardCodec}.
    */
   public static final String ZSTANDARD_CODEC_EXTENSION = ".zst";
+
+  /**
+   * Default extension for {@link org.apache.hadoop.io.compress.LzoCodec2}.
+   */
+  public static final String LZO_CODEC_EXTENSION = ".lzo_deflate";
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec2.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec2.java
new file mode 100644
index 0000000000000..cca829af4f979
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec2.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.io.compress.lzo.LzoCompressor;
+import org.apache.hadoop.io.compress.lzo.LzoDecompressor;
+
+/**
+ * This class creates LZO compressors/decompressors.
+ */
+public class LzoCodec2 implements Configurable, CompressionCodec {
+  Configuration conf;
+
+  /**
+   * Set the configuration to be used by this object.
+   *
+   * @param conf the configuration object.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    this.conf = conf;
+  }
+
+  /**
+   * Return the configuration used by this object.
+   *
+   * @return the configuration object used by this object.
+   */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
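+
+  // Implementation note: rather than wiring LzoCompressor/LzoDecompressor
+  // into Block(De)CompressorStream ourselves, stream creation is delegated
+  // to aircompressor's io.airlift.compress.lzo.LzoCodec, a pure-Java LZO
+  // implementation that emits the same block framing as the native
+  // com.hadoop.compression.lzo.LzoCodec (see the compatibility tests in
+  // TestCodec).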
+
+  /**
+   * Create a {@link CompressionOutputStream} that will write to the given
+   * {@link OutputStream}.
+   *
+   * @param out the location for the final output stream
+   * @return a stream the user can write uncompressed data to have it compressed
+   * @throws IOException
+   */
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out)
+      throws IOException {
+    return CompressionCodec.Util.
+        createOutputStreamWithCodecPool(this, conf, out);
+  }
+
+  /**
+   * Create a {@link CompressionOutputStream} that will write to the given
+   * {@link OutputStream}. Note that the given {@link Compressor} is not
+   * used; the returned stream compresses through a delegate aircompressor
+   * codec instead.
+   *
+   * @param out the location for the final output stream
+   * @param compressor compressor to use (ignored by this codec)
+   * @return a stream the user can write uncompressed data to have it compressed
+   * @throws IOException
+   */
+  @Override
+  public CompressionOutputStream createOutputStream(OutputStream out,
+      Compressor compressor) throws IOException {
+    int bufferSize = conf.getInt(
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT);
+    io.airlift.compress.lzo.LzoCodec codec =
+        new io.airlift.compress.lzo.LzoCodec();
+    Configuration lzoConf = new Configuration(this.conf);
+    lzoConf.setInt(
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
+        bufferSize);
+    codec.setConf(lzoConf);
+    return codec.createOutputStream(out);
+  }
+
+  /**
+   * Get the type of {@link Compressor} needed by this {@link CompressionCodec}.
+   *
+   * @return the type of compressor needed by this codec.
+   */
+  @Override
+  public Class<? extends Compressor> getCompressorType() {
+    return LzoCompressor.class;
+  }
+
+  /**
+   * Create a new {@link Compressor} for use by this {@link CompressionCodec}.
+   *
+   * @return a new compressor for use by this codec
+   */
+  @Override
+  public Compressor createCompressor() {
+    int bufferSize = conf.getInt(
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT);
+    return new LzoCompressor(bufferSize);
+  }
+
+  /**
+   * Create a {@link CompressionInputStream} that will read from the given
+   * input stream.
+   *
+   * @param in the stream to read compressed bytes from
+   * @return a stream to read uncompressed bytes from
+   * @throws IOException
+   */
+  @Override
+  public CompressionInputStream createInputStream(InputStream in)
+      throws IOException {
+    return CompressionCodec.Util.
+        createInputStreamWithCodecPool(this, conf, in);
+  }
+
+  /**
+   * Create a {@link CompressionInputStream} that will read from the given
+   * {@link InputStream}. Note that the given {@link Decompressor} is not
+   * used; the returned stream decompresses through a delegate aircompressor
+   * codec instead.
+   *
+   * @param in the stream to read compressed bytes from
+   * @param decompressor decompressor to use (ignored by this codec)
+   * @return a stream to read uncompressed bytes from
+   * @throws IOException
+   */
+  @Override
+  public CompressionInputStream createInputStream(InputStream in,
+      Decompressor decompressor) throws IOException {
+    int bufferSize = conf.getInt(
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT);
+    io.airlift.compress.lzo.LzoCodec codec =
+        new io.airlift.compress.lzo.LzoCodec();
+    Configuration lzoConf = new Configuration(this.conf);
+    lzoConf.setInt(
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
+        bufferSize);
+    codec.setConf(lzoConf);
+    return codec.createInputStream(in);
+  }
+
+  /**
+   * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}.
+   *
+   * @return the type of decompressor needed by this codec.
+   */
+  @Override
+  public Class<? extends Decompressor> getDecompressorType() {
+    return LzoDecompressor.class;
+  }
+
+  /**
+   * Create a new {@link Decompressor} for use by this {@link CompressionCodec}.
+   *
+   * @return a new decompressor for use by this codec
+   */
+  @Override
+  public Decompressor createDecompressor() {
+    int bufferSize = conf.getInt(
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY,
+        CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT);
+    return new LzoDecompressor(bufferSize);
+  }
+
+  /**
+   * Get the default filename extension for this kind of compression.
+   *
+   * @return {@code .lzo_deflate}.
+   */
+  @Override
+  public String getDefaultExtension() {
+    return CodecConstants.LZO_CODEC_EXTENSION;
+  }
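+
+  // A minimal usage sketch (assuming the codec is registered via
+  // io.compression.codecs or instantiated directly):
+  //
+  //   CompressionCodec codec =
+  //       ReflectionUtils.newInstance(LzoCodec2.class, conf);
+  //   try (CompressionOutputStream cos = codec.createOutputStream(rawOut)) {
+  //     cos.write(data);
+  //     cos.finish();
+  //   }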
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java
new file mode 100644
index 0000000000000..0874dfbbaed78
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.lzo;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.Compressor;
+
+/**
+ * A {@link Compressor} based on the LZO algorithm, backed by aircompressor's
+ * pure-Java {@code io.airlift.compress.lzo.LzoCompressor}.
+ */
+public class LzoCompressor implements Compressor {
+  private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024;
+
+  private int bufferSize;
+  private byte[] input;
+  private int inputOffset;
+  private int inputLength;
+
+  private byte[] inputBuffer;
+  private int inputBufferLen;
+  private int inputBufferOffset;
+  private int inputBufferMaxLen;
+
+  private boolean finish;
+  // `finished` is true once all input has been consumed and no data remains
+  // in the buffers.
+  private boolean finished;
+
+  private byte[] outputBuffer;
+  private int outputBufferLen;
+  private int outputBufferOffset;
+  private int outputBufferMaxLen;
+
+  private long bytesRead;
+  private long bytesWritten;
+
+  private io.airlift.compress.lzo.LzoCompressor compressor;
+
+  /**
+   * Creates a new compressor.
+   *
+   * @param bufferSize size of the internal input buffer.
+   */
+  public LzoCompressor(int bufferSize) {
+    this.bufferSize = bufferSize;
+    compressor = new io.airlift.compress.lzo.LzoCompressor();
+
+    reset();
+  }
+
+  /**
+   * Creates a new compressor with the default buffer size.
+   */
+  public LzoCompressor() {
+    this(DEFAULT_DIRECT_BUFFER_SIZE);
+  }
+
+  /**
+   * Sets input data for compression.
+   * This should be called whenever #needsInput() returns
+   * <code>true</code> indicating that more input data is required.
+   *
+   * @param b Input data
+   * @param off Start offset
+   * @param len Length
+   */
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    // Check if we can buffer the current input.
+    if (len > inputBufferMaxLen - inputBufferOffset) {
+      // Too large: keep a direct reference to the caller's array instead.
+      input = b;
+      inputOffset = off;
+      inputLength = len;
+    } else {
+      // We can buffer the current input.
+      System.arraycopy(b, off, inputBuffer, inputBufferOffset, len);
+      inputBufferOffset += len;
+      inputBufferLen += len;
+    }
+
+    bytesRead += len;
+  }
+
+  /**
+   * Does nothing.
+   */
+  @Override
+  public void setDictionary(byte[] b, int off, int len) {
+    // do nothing
+  }
+
+  /**
+   * Returns true if the input data buffer is empty and
+   * #setInput() should be called to provide more input.
+   *
+   * @return <code>true</code> if the input data buffer is empty and
+   *         #setInput() should be called in order to provide more input.
+   */
+  @Override
+  public boolean needsInput() {
+    // We do not need input if:
+    // 1. there is still compressed output in the output buffer, or
+    // 2. the input buffer is full, or
+    // 3. there is direct input (input that could not fit into the
+    //    input buffer).
+    return !((outputBufferLen > 0) ||
+        (inputBufferMaxLen - inputBufferOffset == 0) || inputLength > 0);
+  }
+
+  /**
+   * When called, indicates that compression should end
+   * with the current contents of the input buffer.
+   */
+  @Override
+  public void finish() {
+    finish = true;
+  }
+
+  /**
+   * Returns true if the end of the compressed
+   * data output stream has been reached.
+   *
+   * @return <code>true</code> if the end of the compressed
+   *         data output stream has been reached.
+   */
+  @Override
+  public boolean finished() {
+    // This compressor is in the finished state if:
+    // 1. finish() has been called, and
+    // 2. all input has been consumed, and
+    // 3. no compressed data remains in the output buffer.
+    return finish && finished && (outputBufferLen == 0);
+  }
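+
+  // compress() below follows a drain-then-compress cycle: pending compressed
+  // bytes in outputBuffer are handed out first; only once the output buffer
+  // is empty is the next chunk of input (buffered, or copied in from the
+  // direct reference held by setInput()) run through the delegate
+  // aircompressor compressor.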
+
+  /**
+   * Fills the specified buffer with compressed data. Returns the actual
+   * number of bytes of compressed data. A return value of 0 indicates that
+   * needsInput() should be called in order to determine if more input
+   * data is required.
+   *
+   * @param b Buffer for the compressed data
+   * @param off Start offset of the data
+   * @param len Size of the buffer
+   * @return The actual number of bytes of compressed data.
+   */
+  @Override
+  public int compress(byte[] b, int off, int len)
+      throws IOException {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    // Drain compressed data still sitting in the output buffer.
+    if (outputBufferLen > 0) {
+      int outputSize = Math.min(outputBufferLen, len);
+      System.arraycopy(outputBuffer, outputBufferOffset, b, off, outputSize);
+      outputBufferOffset += outputSize;
+      outputBufferLen -= outputSize;
+      bytesWritten += outputSize;
+      return outputSize;
+    }
+
+    outputBufferOffset = outputBufferLen = 0;
+
+    int inputSize = 0;
+    // If there is no data in the input buffer, pull in the direct input.
+    if (inputBufferLen == 0) {
+      inputBufferOffset = 0;
+      inputSize = copyIntoInputBuffer();
+      if (inputSize == 0) {
+        finished = true;
+        return 0;
+      }
+    } else {
+      inputSize = inputBufferLen;
+    }
+
+    // Compress the input and write to the output buffer.
+    int compressedSize = compressor.compress(inputBuffer, 0, inputSize,
+        outputBuffer, 0, outputBufferMaxLen);
+    // LZO consumes all buffered input.
+    inputBufferOffset = 0;
+    inputBufferLen = 0;
+
+    outputBufferLen = compressedSize;
+
+    if (inputLength == 0) {
+      finished = true;
+    }
+
+    // Copy from the compressed data buffer to the user buffer.
+    int copiedSize = Math.min(compressedSize, len);
+    bytesWritten += copiedSize;
+    System.arraycopy(outputBuffer, 0, b, off, copiedSize);
+    outputBufferOffset += copiedSize;
+    outputBufferLen -= copiedSize;
+
+    return copiedSize;
+  }
+
+  /**
+   * Copies some input data from the direct user input into the input buffer.
+   */
+  int copyIntoInputBuffer() {
+    if (inputLength == 0) {
+      return 0;
+    }
+
+    finished = false;
+
+    int inputSize =
+        Math.min(inputLength, inputBufferMaxLen - inputBufferOffset);
+    System.arraycopy(input, inputOffset, inputBuffer, inputBufferOffset,
+        inputSize);
+
+    inputBufferLen += inputSize;
+
+    inputOffset += inputSize;
+    inputLength -= inputSize;
+
+    return inputSize;
+  }
+
+  /**
+   * Resets the compressor so that a new set of input data can be processed.
+   */
+  @Override
+  public void reset() {
+    input = null;
+    inputOffset = inputLength = 0;
+    finish = finished = false;
+
+    inputBufferMaxLen = bufferSize;
+    outputBufferMaxLen = compressor.maxCompressedLength(inputBufferMaxLen);
+
+    inputBuffer = new byte[inputBufferMaxLen];
+    outputBuffer = new byte[outputBufferMaxLen];
+
+    inputBufferLen = inputBufferOffset = outputBufferLen = outputBufferOffset = 0;
+    bytesRead = bytesWritten = 0;
+  }
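+
+  // Note: the buffer size is fixed at construction time; reinit() only
+  // clears state and does not re-read
+  // IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY from the given Configuration.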
+
+  /**
+   * Prepares the compressor to be used in a new stream with settings defined
+   * in the given Configuration.
+   *
+   * @param conf Configuration from which new settings are fetched
+   */
+  @Override
+  public void reinit(Configuration conf) {
+    reset();
+  }
+
+  /**
+   * Returns the number of bytes given to this compressor since the last
+   * reset.
+   */
+  @Override
+  public long getBytesRead() {
+    return bytesRead;
+  }
+
+  /**
+   * Returns the number of bytes consumed by callers of compress since the
+   * last reset.
+   */
+  @Override
+  public long getBytesWritten() {
+    return bytesWritten;
+  }
+
+  /**
+   * Closes the compressor and discards any unprocessed input.
+   */
+  @Override
+  public void end() {
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java
new file mode 100644
index 0000000000000..4d9f686463db6
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.compress.lzo;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.compress.Decompressor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link Decompressor} for the LZO algorithm; see the implementation note
+ * below.
+ */
+public class LzoDecompressor implements Decompressor {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(LzoDecompressor.class.getName());
+  private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024;
+
+  private int bufferSize;
+  private byte[] input;
+  private int inputOffset;
+  private int inputLength;
+
+  private byte[] inputBuffer;
+  private int inputBufferLen;
+  private int inputBufferOffset;
+  private int inputBufferMaxLen;
+
+  private boolean finish;
+  // `finished` is true once all input has been consumed and no data remains
+  // in the buffers.
+  private boolean finished;
+
+  private byte[] outputBuffer;
+  private int outputBufferLen;
+  private int outputBufferOffset;
+  private int outputBufferMaxLen;
+
+  private long bytesRead;
+  private long bytesWritten;
+
+  private io.airlift.compress.lzo.LzoDecompressor decompressor;
+
+  /**
+   * Creates a new decompressor.
+   *
+   * @param bufferSize size of the buffer to be used.
+   */
+  public LzoDecompressor(int bufferSize) {
+    this.bufferSize = bufferSize;
+    decompressor = new io.airlift.compress.lzo.LzoDecompressor();
+
+    reset();
+  }
+
+  /**
+   * Creates a new decompressor with the default buffer size.
+   */
+  public LzoDecompressor() {
+    this(DEFAULT_DIRECT_BUFFER_SIZE);
+  }
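+
+  // NOTE: this decompressor is effectively a placeholder. LzoCodec2 creates
+  // decompressing streams by delegating to aircompressor's LzoCodec, so the
+  // Decompressor handed out by the codec pool never does real work;
+  // decompress() therefore rejects direct use, and most of the state above
+  // is reserved for a future standalone implementation.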
+
+  /**
+   * Sets input data for decompression.
+   * This should be called if and only if {@link #needsInput()} returns
+   * <code>true</code> indicating that more input data is required.
+   * (Both native and non-native versions of various Decompressors require
+   * that the data passed in via <code>b[]</code> remain unmodified until
+   * the caller is explicitly notified--via {@link #needsInput()}--that the
+   * buffer may be safely modified.  With this requirement, an extra
+   * buffer-copy can be avoided.)
+   *
+   * @param b Input data
+   * @param off Start offset
+   * @param len Length
+   */
+  @Override
+  public synchronized void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+    // Placeholder: the input is ignored (see the note above).
+  }
+
+  /**
+   * Does nothing.
+   */
+  @Override
+  public synchronized void setDictionary(byte[] b, int off, int len) {
+    // do nothing
+  }
+
+  /**
+   * Returns <code>false</code>: this placeholder never requests input
+   * directly.
+   *
+   * @return <code>false</code>.
+   */
+  @Override
+  public synchronized boolean needsInput() {
+    return false;
+  }
+
+  /**
+   * Returns <code>false</code>.
+   *
+   * @return <code>false</code>.
+   */
+  @Override
+  public synchronized boolean needsDictionary() {
+    return false;
+  }
+
+  /**
+   * Returns true if the end of the decompressed
+   * data output stream has been reached.
+   *
+   * @return <code>true</code> if the end of the decompressed
+   *         data output stream has been reached.
+   */
+  @Override
+  public synchronized boolean finished() {
+    return finished;
+  }
+
+  /**
+   * Would fill the specified buffer with uncompressed data; direct use is
+   * not supported by this placeholder.
+   *
+   * @param b Buffer for the uncompressed data
+   * @param off Start offset of the data
+   * @param len Size of the buffer
+   * @return The actual number of bytes of uncompressed data.
+   * @throws IOException
+   */
+  @Override
+  public synchronized int decompress(byte[] b, int off, int len)
+      throws IOException {
+    throw new UnsupportedOperationException(
+        "LZO block decompressor is not supported");
+  }
+
+  /**
+   * Returns 0.
+   *
+   * @return 0.
+   */
+  @Override
+  public synchronized int getRemaining() {
+    // Never used by BlockDecompressorStream for this codec.
+    return 0;
+  }
+
+  /**
+   * Resets the decompressor so that a new set of input data can be
+   * processed.
+   */
+  @Override
+  public synchronized void reset() {
+  }
+
+  /**
+   * Closes the decompressor and discards any unprocessed input.
+   */
+  @Override
+  public synchronized void end() {
+    // do nothing
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/package-info.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/package-info.java
new file mode 100644
index 0000000000000..19effdd39c11a
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+package org.apache.hadoop.io.compress.lzo;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
index 0bda66741ba65..aefc8b12d6afb 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java
@@ -168,6 +168,12 @@ public void testDeflateCodec() throws IOException {
     codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.DeflateCodec");
     codecTest(conf, seed, count, "org.apache.hadoop.io.compress.DeflateCodec");
   }
+
+  @Test
+  public void testLzoCodec() throws IOException {
+    codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.LzoCodec2");
+    codecTest(conf, seed, count, "org.apache.hadoop.io.compress.LzoCodec2");
+  }
 
   @Test
   public void testGzipCodecWithParam() throws IOException {
     Configuration conf = new Configuration(this.conf);
@@ -1260,4 +1266,177 @@ public void testGzipCompressorWithEmptyInput() throws IOException {
       assertThat(b).as("check decompressed output").isEqualTo(dflchk);
     }
   }
+
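+  // The three tests below exercise compatibility between the native
+  // com.hadoop.compression.lzo.LzoCodec (from the test-scope hadoop-lzo
+  // dependency) and the new aircompressor-backed LzoCodec2:
+  //   1. a round-trip sanity check of the native codec by itself,
+  //   2. the native codec's output stream fed by the new LzoCompressor,
+  //   3. the same input compressed by both codecs, with each output
+  //      decompressed through the native codec.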
+  @Test
+  public void testLzoCompatibilityWithCompressor() throws IOException {
+    Configuration hadoopConf = new Configuration();
+    CompressionCodec codec = ReflectionUtils.newInstance(
+        com.hadoop.compression.lzo.LzoCodec.class, hadoopConf);
+    Random r = new Random();
+
+    for (int i = 0; i < 100; i++) {
+      DataOutputBuffer outputBuffer = new DataOutputBuffer();
+      assertTrue(outputBuffer.getLength() == 0);
+      Compressor compressor = codec.createCompressor();
+      LOG.info("compressor: {}", compressor.getClass().toString());
+      assertThat(compressor).as("compressor should not be null").isNotNull();
+
+      CompressionOutputStream compressStream =
+          codec.createOutputStream(outputBuffer, compressor);
+
+      long randomSeed = r.nextLong();
+      r.setSeed(randomSeed);
+
+      int inputSize = r.nextInt(256 * 1024 + 1) + 1;
+      byte[] b = new byte[inputSize];
+      r.nextBytes(b);
+      compressStream.write(b);
+      compressStream.flush();
+      compressStream.finish();
+
+      assertTrue(outputBuffer.getLength() > 0);
+      int compressedSize = outputBuffer.getLength();
+
+      checkCompressedOutput(outputBuffer, compressedSize, b, inputSize, codec);
+    }
+  }
+
+  @Test
+  public void testLzoCompatibilityWithNewCompressor() throws IOException {
+    Configuration hadoopConf = new Configuration();
+    CompressionCodec codec = ReflectionUtils.newInstance(
+        com.hadoop.compression.lzo.LzoCodec.class, hadoopConf);
+    CompressionCodec codec2 =
+        ReflectionUtils.newInstance(LzoCodec2.class, hadoopConf);
+    Random r = new Random();
+
+    for (int i = 0; i < 100; i++) {
+      DataOutputBuffer outputBuffer = new DataOutputBuffer();
+      assertTrue(outputBuffer.getLength() == 0);
+      // The new compressor drives the native codec's output stream.
+      Compressor compressor = codec2.createCompressor();
+      LOG.info("compressor: {}", compressor.getClass().toString());
+      assertThat(compressor).as("compressor should not be null").isNotNull();
+
+      CompressionOutputStream compressStream =
+          codec.createOutputStream(outputBuffer, compressor);
+
+      long randomSeed = r.nextLong();
+      r.setSeed(randomSeed);
+
+      int inputSize = r.nextInt(256) + 1;
+      byte[] b = new byte[inputSize];
+      r.nextBytes(b);
+      compressStream.write(b);
+      compressStream.flush();
+      compressStream.finish();
+
+      assertTrue(outputBuffer.getLength() > 0);
+      int compressedSize = outputBuffer.getLength();
+
+      checkCompressedOutput(outputBuffer, compressedSize, b, inputSize, codec);
+    }
+  }
+
+  private void checkCompressedOutput(DataOutputBuffer outputBuffer,
+      int compressedSize, byte[] b, int inputSize, CompressionCodec codec)
+      throws IOException {
+    DataInputBuffer lzobuf = new DataInputBuffer();
+    assertTrue(lzobuf.getLength() == 0);
+
+    lzobuf.reset(outputBuffer.getData(), 0, outputBuffer.getLength());
+    assertTrue(lzobuf.getLength() == compressedSize);
+    assertTrue(lzobuf.getPosition() == 0);
+
+    Decompressor decom = codec.createDecompressor();
+    LOG.info("decompressor: {}", decom.getClass().toString());
+    assertThat(decom).as("decompressor should not be null").isNotNull();
+
+    InputStream lzoIn = codec.createInputStream(lzobuf, decom);
+    DataOutputBuffer dflbuf = new DataOutputBuffer();
+    IOUtils.copyBytes(lzoIn, dflbuf, inputSize);
+    final byte[] dataRead = Arrays.copyOf(dflbuf.getData(), dflbuf.getLength());
+    assertArrayEquals(b, dataRead);
+
+    lzoIn.close();
+  }
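+
+  // Compresses the same random input with both codecs and verifies that the
+  // native codec can decompress each output; compressed sizes are logged for
+  // manual comparison.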
+  @Test
+  public void testLzoNewCompressor() throws IOException {
+    Configuration hadoopConf = new Configuration();
+    CompressionCodec codec = ReflectionUtils.newInstance(
+        com.hadoop.compression.lzo.LzoCodec.class, hadoopConf);
+    CompressionCodec codec2 =
+        ReflectionUtils.newInstance(LzoCodec2.class, hadoopConf);
+    Random r = new Random();
+
+    for (int i = 1; i < 100; i++) {
+      DataOutputBuffer outputBuffer = new DataOutputBuffer();
+      DataOutputBuffer outputBuffer2 = new DataOutputBuffer();
+
+      assertTrue(outputBuffer.getLength() == 0);
+      assertTrue(outputBuffer2.getLength() == 0);
+
+      Compressor compressor = codec.createCompressor();
+      Compressor compressor2 = codec2.createCompressor();
+      LOG.info("compressor: {}", compressor.getClass().toString());
+      LOG.info("compressor2: {}", compressor2.getClass().toString());
+
+      assertThat(compressor).as("compressor should not be null").isNotNull();
+      assertThat(compressor2).as("compressor2 should not be null").isNotNull();
+
+      CompressionOutputStream compressStream =
+          codec.createOutputStream(outputBuffer, compressor);
+      CompressionOutputStream compressStream2 =
+          codec2.createOutputStream(outputBuffer2, compressor2);
+
+      long randomSeed = r.nextLong();
+      r.setSeed(randomSeed);
+
+      int inputSize = r.nextInt(256) + 1;
+      byte[] b = new byte[inputSize];
+      LOG.info("inputSize: " + inputSize);
+      r.nextBytes(b);
+      compressStream.write(b);
+      compressStream.flush();
+      compressStream.finish();
+
+      compressStream2.write(b);
+      compressStream2.flush();
+      compressStream2.finish();
+
+      assertTrue(outputBuffer.getLength() > 0);
+      int compressedSize = outputBuffer.getLength();
+      LOG.info("compressor compressed size: " + compressedSize);
+
+      assertTrue(outputBuffer2.getLength() > 0);
+      int compressedSize2 = outputBuffer2.getLength();
+      LOG.info("compressor2 compressed size: " + compressedSize2);
+
+      checkCompressedOutput(outputBuffer, compressedSize, b, inputSize, codec);
+      checkCompressedOutput(outputBuffer2, compressedSize2, b, inputSize,
+          codec);
+    }
+  }
 }
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 9392a9f67fb3e..625b45a1606ae 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -141,6 +141,7 @@
     <lz4-java.version>1.7.1</lz4-java.version>
+    <aircompressor.version>0.16</aircompressor.version>
@@ -1762,6 +1763,11 @@
       <dependency>
         <groupId>org.lz4</groupId>
         <artifactId>lz4-java</artifactId>
         <version>${lz4-java.version}</version>
       </dependency>
+      <dependency>
+        <groupId>io.airlift</groupId>
+        <artifactId>aircompressor</artifactId>
+        <version>${aircompressor.version}</version>
+      </dependency>
@@ -2459,5 +2465,9 @@
+    <repository>
+      <id>twitter</id>
+      <url>https://maven.twttr.com/</url>
+    </repository>