From d21a4ba5eba3481800f9bcb9fb8311c6aa4e00b9 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Mon, 20 Sep 2021 16:34:03 -0700
Subject: [PATCH 1/5] Add Lzo Compressor and Decompressor.

---
 hadoop-common-project/hadoop-common/pom.xml   |  11 +
 .../java/org/apache/hadoop/io/IOUtils.java    |   2 +
 .../hadoop/io/compress/CodecConstants.java    |  10 +
 .../apache/hadoop/io/compress/LzoCodec2.java  | 186 +++++++++++
 .../apache/hadoop/io/compress/LzopCodec.java  | 185 +++++++++++
 .../hadoop/io/compress/lzo/LzoCompressor.java | 298 ++++++++++++++++++
 .../io/compress/lzo/LzoDecompressor.java      | 197 ++++++++++++
 .../io/compress/lzo/LzopCompressor.java       | 226 +++++++++++++
 .../io/compress/lzo/LzopDecompressor.java     | 220 +++++++++++++
 .../hadoop/io/compress/lzo/package-info.java  |  22 ++
 .../apache/hadoop/io/compress/TestCodec.java  | 194 ++++++++++++
 hadoop-project/pom.xml                        |  10 +
 12 files changed, 1561 insertions(+)
 create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec2.java
 create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java
 create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java
 create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java
 create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopCompressor.java
 create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopDecompressor.java
 create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/package-info.java

diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml
index c0235b4db9e9c..7cf9e741dd641 100644
--- a/hadoop-common-project/hadoop-common/pom.xml
+++ b/hadoop-common-project/hadoop-common/pom.xml
@@ -375,6 +375,17 @@
       <artifactId>lz4-java</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>io.airlift</groupId>
+      <artifactId>aircompressor</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.hadoop.gplcompression</groupId>
+      <artifactId>hadoop-lzo</artifactId>
+      <version>0.4.21-SNAPSHOT</version>
+      <scope>test</scope>
+    </dependency>
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java
index 121af64b01182..69e74bb1a8826 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/IOUtils.java
@@ -92,12 +92,14 @@ public static void copyBytes(InputStream in, OutputStream out, int buffSize)
     PrintStream ps = out instanceof PrintStream ?
(PrintStream)out : null; byte buf[] = new byte[buffSize]; int bytesRead = in.read(buf); + LOG.info("bytesRead: {}", bytesRead); while (bytesRead >= 0) { out.write(buf, 0, bytesRead); if ((ps != null) && ps.checkError()) { throw new IOException("Unable to write to output stream."); } bytesRead = in.read(buf); + LOG.info("bytesRead: {}", bytesRead); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecConstants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecConstants.java index 96410a18ebcb5..b29d79c175fc6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecConstants.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecConstants.java @@ -65,4 +65,14 @@ private CodecConstants() { * Default extension for {@link org.apache.hadoop.io.compress.ZStandardCodec}. */ public static final String ZSTANDARD_CODEC_EXTENSION = ".zst"; + + /** + * Default extension for {@link org.apache.hadoop.io.compress.LzoCodec}. + */ + public static final String LZO_CODEC_EXTENSION = ".lzo_deflate"; + + /** + * Default extension for {@link org.apache.hadoop.io.compress.LzopCodec}. + */ + public static final String LZOP_CODEC_EXTENSION = ".lzo"; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec2.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec2.java new file mode 100644 index 0000000000000..cca829af4f979 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzoCodec2.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.io.compress.lzo.LzoCompressor; +import org.apache.hadoop.io.compress.lzo.LzoDecompressor; + +/** + * This class creates lzo compressors/decompressors. + */ +public class LzoCodec2 implements Configurable, CompressionCodec { + Configuration conf; + + /** + * Set the configuration to be used by this object. + * + * @param conf the configuration object. + */ + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + /** + * Return the configuration used by this object. + * + * @return the configuration object used by this object. 
+ */ + @Override + public Configuration getConf() { + return conf; + } + + /** + * Create a {@link CompressionOutputStream} that will write to the given + * {@link OutputStream}. + * + * @param out the location for the final output stream + * @return a stream the user can write uncompressed data to have it compressed + * @throws IOException + */ + @Override + public CompressionOutputStream createOutputStream(OutputStream out) + throws IOException { + return CompressionCodec.Util. + createOutputStreamWithCodecPool(this, conf, out); + } + + /** + * Create a {@link CompressionOutputStream} that will write to the given + * {@link OutputStream} with the given {@link Compressor}. + * + * @param out the location for the final output stream + * @param compressor compressor to use + * @return a stream the user can write uncompressed data to have it compressed + * @throws IOException + */ + @Override + public CompressionOutputStream createOutputStream(OutputStream out, + Compressor compressor) throws IOException { + int bufferSize = conf.getInt( + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT); + io.airlift.compress.lzo.LzoCodec codec = new io.airlift.compress.lzo.LzoCodec(); + Configuration lzoConf = new Configuration(this.conf); + lzoConf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, bufferSize); + codec.setConf(lzoConf); + return codec.createOutputStream(out); + } + + /** + * Get the type of {@link Compressor} needed by this {@link CompressionCodec}. + * + * @return the type of compressor needed by this codec. + */ + @Override + public Class getCompressorType() { + return LzoCompressor.class; + } + + /** + * Create a new {@link Compressor} for use by this {@link CompressionCodec}. + * + * @return a new compressor for use by this codec + */ + @Override + public Compressor createCompressor() { + int bufferSize = conf.getInt( + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT); + return new LzoCompressor(bufferSize); + } + + /** + * Create a {@link CompressionInputStream} that will read from the given + * input stream. + * + * @param in the stream to read compressed bytes from + * @return a stream to read uncompressed bytes from + * @throws IOException + */ + @Override + public CompressionInputStream createInputStream(InputStream in) + throws IOException { + return CompressionCodec.Util. + createInputStreamWithCodecPool(this, conf, in); + } + + /** + * Create a {@link CompressionInputStream} that will read from the given + * {@link InputStream} with the given {@link Decompressor}. 
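+   * <p>Note that the supplied {@code decompressor} is not wired into the
+   * returned stream: this method delegates to aircompressor's
+   * {@code io.airlift.compress.lzo.LzoCodec}, whose input stream performs
+   * its own decompression.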
+ * + * @param in the stream to read compressed bytes from + * @param decompressor decompressor to use + * @return a stream to read uncompressed bytes from + * @throws IOException + */ + @Override + public CompressionInputStream createInputStream(InputStream in, + Decompressor decompressor) throws IOException { + int bufferSize = conf.getInt( + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT); + io.airlift.compress.lzo.LzoCodec codec = new io.airlift.compress.lzo.LzoCodec(); + Configuration lzoConf = new Configuration(this.conf); + lzoConf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, bufferSize); + codec.setConf(lzoConf); + return codec.createInputStream(in); + } + + /** + * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}. + * + * @return the type of decompressor needed by this codec. + */ + @Override + public Class getDecompressorType() { + return LzoDecompressor.class; + } + + /** + * Create a new {@link Decompressor} for use by this {@link CompressionCodec}. + * + * @return a new decompressor for use by this codec + */ + @Override + public Decompressor createDecompressor() { + int bufferSize = conf.getInt( + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT); + return new LzoDecompressor(bufferSize); + } + + /** + * Get the default filename extension for this kind of compression. + * + * @return .lzo_deflate. + */ + @Override + public String getDefaultExtension() { + return CodecConstants.LZO_CODEC_EXTENSION; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java new file mode 100644 index 0000000000000..94e51434f3a1d --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.io.compress.lzo.*; + +/** + * This class creates lzop compressors/decompressors. + */ +public class LzopCodec implements Configurable, CompressionCodec { + private Configuration conf; + + /** + * Set the configuration to be used by this object. + * + * @param conf the configuration object. 
+ */ + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + /** + * Return the configuration used by this object. + * + * @return the configuration object used by this objec. + */ + @Override + public Configuration getConf() { + return conf; + } + + /** + * Create a {@link CompressionOutputStream} that will write to the given + * {@link OutputStream}. + * + * @param out the location for the final output stream + * @return a stream the user can write uncompressed data to have it compressed + * @throws IOException + */ + @Override + public CompressionOutputStream createOutputStream(OutputStream out) + throws IOException { + return CompressionCodec.Util. + createOutputStreamWithCodecPool(this, conf, out); + } + + /** + * Create a {@link CompressionOutputStream} that will write to the given + * {@link OutputStream} with the given {@link Compressor}. + * + * @param out the location for the final output stream + * @param compressor compressor to use + * @return a stream the user can write uncompressed data to have it compressed + * @throws IOException + */ + @Override + public CompressionOutputStream createOutputStream(OutputStream out, + Compressor compressor) throws IOException { + int bufferSize = conf.getInt( + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT); + io.airlift.compress.lzo.LzopCodec codec = new io.airlift.compress.lzo.LzopCodec(); + Configuration lzoConf = new Configuration(this.conf); + lzoConf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, bufferSize); + codec.setConf(lzoConf); + return codec.createOutputStream(out); + } + + /** + * Get the type of {@link Compressor} needed by this {@link CompressionCodec}. + * + * @return the type of compressor needed by this codec. + */ + @Override + public Class getCompressorType() { + return LzopCompressor.class; + } + + /** + * Create a new {@link Compressor} for use by this {@link CompressionCodec}. + * + * @return a new compressor for use by this codec + */ + @Override + public Compressor createCompressor() { + int bufferSize = conf.getInt( + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT); + return new LzopCompressor(bufferSize); + } + + /** + * Create a {@link CompressionInputStream} that will read from the given + * input stream. + * + * @param in the stream to read compressed bytes from + * @return a stream to read uncompressed bytes from + * @throws IOException + */ + @Override + public CompressionInputStream createInputStream(InputStream in) + throws IOException { + return CompressionCodec.Util. + createInputStreamWithCodecPool(this, conf, in); + } + + /** + * Create a {@link CompressionInputStream} that will read from the given + * {@link InputStream} with the given {@link Decompressor}. 
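+   * <p>As with {@link LzoCodec2}, the supplied {@code decompressor} is not
+   * used; the stream created by aircompressor's
+   * {@code io.airlift.compress.lzo.LzopCodec} manages decompression itself.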
+ * + * @param in the stream to read compressed bytes from + * @param decompressor decompressor to use + * @return a stream to read uncompressed bytes from + * @throws IOException + */ + @Override + public CompressionInputStream createInputStream(InputStream in, + Decompressor decompressor) throws IOException { + int bufferSize = conf.getInt( + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT); + io.airlift.compress.lzo.LzopCodec codec = new io.airlift.compress.lzo.LzopCodec(); + Configuration lzoConf = new Configuration(this.conf); + lzoConf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, bufferSize); + codec.setConf(lzoConf); + return codec.createInputStream(in); + } + + /** + * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}. + * + * @return the type of decompressor needed by this codec. + */ + @Override + public Class getDecompressorType() { + return LzopDecompressor.class; + } + + /** + * Create a new {@link Decompressor} for use by this {@link CompressionCodec}. + * + * @return a new decompressor for use by this codec + */ + @Override + public Decompressor createDecompressor() { + int bufferSize = conf.getInt( + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, + CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT); + return new LzopDecompressor(bufferSize); + } + + /** + * Get the default filename extension for this kind of compression. + * + * @return .lzo. + */ + @Override + public String getDefaultExtension() { + return CodecConstants.LZO_CODEC_EXTENSION; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java new file mode 100644 index 0000000000000..9e16a49b28b5f --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java @@ -0,0 +1,298 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress.lzo; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.Compressor; + +public class LzoCompressor implements Compressor { + private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024; + + private int bufferSize; + private byte[] input; + private int inputOffset; + private int inputLength; + + private byte[] inputBuffer; + private int inputBufferLen; + private int inputBufferOffset; + private int inputBufferMaxLen; + + private boolean finish; + // If `finished` is true, meaning all inputs are consumed and no input in buffer. 
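+  // `finish` records that the caller is done providing input; `finished`
+  // flips once compress() has consumed every buffered and direct byte.
+  // finished() therefore reports end-of-stream only when both flags are set
+  // and the output buffer has fully drained.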
+  private boolean finished;
+
+  private byte[] outputBuffer;
+  private int outputBufferLen;
+  private int outputBufferOffset;
+  private int outputBufferMaxLen;
+
+  private long bytesRead;
+  private long bytesWritten;
+
+  private io.airlift.compress.lzo.LzoCompressor compressor;
+
+  /**
+   * Creates a new compressor.
+   */
+  public LzoCompressor(int bufferSize) {
+    this.bufferSize = bufferSize;
+    compressor = new io.airlift.compress.lzo.LzoCompressor();
+
+    reset();
+  }
+
+  /**
+   * Creates a new compressor with the default buffer size.
+   */
+  public LzoCompressor() {
+    this(DEFAULT_DIRECT_BUFFER_SIZE);
+  }
+
+  /**
+   * Sets input data for compression.
+   * This should be called whenever #needsInput() returns
+   * true indicating that more input data is required.
+   *
+   * @param b Input data
+   * @param off Start offset
+   * @param len Length
+   */
+  @Override
+  public void setInput(byte[] b, int off, int len) {
+    if (b == null) {
+      throw new NullPointerException();
+    }
+    if (off < 0 || len < 0 || off > b.length - len) {
+      throw new ArrayIndexOutOfBoundsException();
+    }
+
+    // The input does not fit in the remaining buffer space: keep a direct
+    // reference to it and consume it lazily in compress().
+    if (len > inputBufferMaxLen - inputBufferOffset) {
+      input = b;
+      inputOffset = off;
+      inputLength = len;
+    } else {
+      // The input fits: append it to the internal input buffer.
+      System.arraycopy(b, off, inputBuffer, inputBufferOffset, len);
+      inputBufferOffset += len;
+      inputBufferLen += len;
+    }
+
+    bytesRead += len;
+  }
+
+  /**
+   * Does nothing.
+   */
+  @Override
+  public void setDictionary(byte[] b, int off, int len) {
+    // do nothing
+  }
+
+  /**
+   * Returns true if the input data buffer is empty and
+   * #setInput() should be called to provide more input.
+   *
+   * @return true if the input data buffer is empty and
+   * #setInput() should be called in order to provide more input.
+   */
+  @Override
+  public boolean needsInput() {
+    // We do not need input if:
+    // 1. there still are compressed output in buffer, or
+    // 2. input buffer is full, or
+    // 3. there is direct input (the input cannot put into input buffer).
+    // But if `finish()` is called, this compressor does not need input after all.
+    return !((outputBufferLen - outputBufferOffset > 0) ||
+        (inputBufferMaxLen - inputBufferOffset == 0) || inputLength > 0) && !finish;
+  }
+
+  /**
+   * When called, indicates that compression should end
+   * with the current contents of the input buffer.
+   */
+  @Override
+  public void finish() {
+    finish = true;
+  }
+
+  /**
+   * Returns true if the end of the compressed
+   * data output stream has been reached.
+   *
+   * @return true if the end of the compressed
+   * data output stream has been reached.
+   */
+  @Override
+  public boolean finished() {
+    // This compressor is in finished status if:
+    // 1. `finish()` is called, and
+    // 2. all input are consumed, and
+    // 3. no compressed data is in buffer.
+    return finish && finished && (outputBufferLen - outputBufferOffset == 0);
+  }
+
+  /**
+   * Fills specified buffer with compressed data. Returns actual number
+   * of bytes of compressed data. A return value of 0 indicates that
+   * needsInput() should be called in order to determine if more input
+   * data is required.
+   *
+   * @param b Buffer for the compressed data
+   * @param off Start offset of the data
+   * @param len Size of the buffer
+   * @return The actual number of bytes of compressed data.
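+   *
+   * <p>A typical driver loop over this contract looks like the following
+   * sketch (the output buffer size here is arbitrary):
+   * <pre>
+   *   compressor.setInput(data, 0, data.length);
+   *   compressor.finish();
+   *   byte[] buf = new byte[64 * 1024];
+   *   while (!compressor.finished()) {
+   *     int n = compressor.compress(buf, 0, buf.length);
+   *     out.write(buf, 0, n);
+   *   }
+   * </pre>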
+ */ + @Override + public int compress(byte[] b, int off, int len) + throws IOException { + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + + // Check compressed data in output buffer. + if (outputBufferLen > 0) { + int outputSize = Math.min(outputBufferLen, len); + System.arraycopy(outputBuffer, outputBufferOffset, b, off, outputSize); + outputBufferOffset += outputSize; + outputBufferLen -= outputSize; + bytesWritten += outputSize; + return outputSize; + } + + outputBufferOffset = outputBufferLen = 0; + + int inputSize = 0; + // If no input data in input buffer. + if (inputBufferLen == 0) { + // Copy direct input. + inputBufferOffset = 0; + inputSize = copyIntoInputBuffer(); + if (inputSize == 0) { + finished = true; + return 0; + } + } else { + inputSize = inputBufferLen; + } + + System.out.println("inputBufferOffset: " + inputBufferOffset + " inputSize: " + inputSize); + // Compress input and write to output buffer. + int compressedSize = compressor.compress(inputBuffer, inputBufferOffset, inputSize, + outputBuffer, outputBufferOffset, outputBufferMaxLen); + inputBufferOffset += inputSize; + inputBufferLen -= inputSize; + + outputBufferLen += compressedSize; + System.out.println("compressedSize: " + compressedSize + " outputBufferLen: " + outputBufferLen); + + if (inputLength == 0) { + finished = true; + } + + // Copy from compressed data buffer to user buffer. + int copiedSize = Math.min(compressedSize, len); + bytesWritten += copiedSize; + System.arraycopy(outputBuffer, outputBufferOffset, b, off, copiedSize); + outputBufferOffset += copiedSize; + outputBufferLen -= copiedSize; + System.out.println("copiedSize: " + copiedSize + " outputBufferOffset: " + outputBufferOffset + " outputBufferLen: " + outputBufferLen); + + return copiedSize; + } + + /** + * Copies the some input data from user input to input buffer. + */ + int copyIntoInputBuffer() { + if (inputLength == 0) { + return 0; + } + + int inputSize = Math.min(inputLength, inputBufferMaxLen); + System.arraycopy(input, inputOffset, inputBuffer, inputBufferOffset, inputSize); + + inputBufferOffset += inputSize; + inputBufferLen += inputSize; + + inputOffset += inputSize; + inputLength -= inputSize; + + return inputSize; + } + + /** + * Resets compressor so that a new set of input data can be processed. + */ + @Override + public void reset() { + input = null; + inputOffset = inputLength = 0; + finish = finished = false; + + inputBufferMaxLen = bufferSize; + outputBufferMaxLen = compressor.maxCompressedLength(inputBufferMaxLen); + + inputBuffer = new byte[inputBufferMaxLen]; + outputBuffer = new byte[outputBufferMaxLen]; + + inputBufferLen = inputBufferOffset = outputBufferLen = outputBufferOffset = 0; + bytesRead = bytesWritten = 0; + } + + /** + * Prepare the compressor to be used in a new stream with settings defined in + * the given Configuration + * + * @param conf Configuration from which new setting are fetched + */ + @Override + public void reinit(Configuration conf) { + reset(); + } + + /** + * Return number of bytes given to this compressor since last reset. + */ + @Override + public long getBytesRead() { + return bytesRead; + } + + /** + * Return number of bytes consumed by callers of compress since last reset. + */ + @Override + public long getBytesWritten() { + return bytesWritten; + } + + /** + * Closes the compressor and discards any unprocessed input. 
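+   * <p>This implementation is backed by the pure-Java aircompressor library
+   * and holds no native resources, so there is nothing to release here.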
+ */ + @Override + public void end() { + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java new file mode 100644 index 0000000000000..4d9f686463db6 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoDecompressor.java @@ -0,0 +1,197 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress.lzo; + +import java.io.IOException; +import java.nio.Buffer; +import java.nio.ByteBuffer; + +import org.apache.hadoop.io.compress.Decompressor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LzoDecompressor implements Decompressor { + private static final Logger LOG = + LoggerFactory.getLogger(LzoDecompressor.class.getName()); + private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024; + + private int bufferSize; + private byte[] input; + private int inputOffset; + private int inputLength; + + private byte[] inputBuffer; + private int inputBufferLen; + private int inputBufferOffset; + private int inputBufferMaxLen; + + private boolean finish; + // If `finished` is true, meaning all inputs are consumed and no input in buffer. + private boolean finished; + + private byte[] outputBuffer; + private int outputBufferLen; + private int outputBufferOffset; + private int outputBufferMaxLen; + + private long bytesRead; + private long bytesWritten; + + private io.airlift.compress.lzo.LzoDecompressor decompressor; + + /** + * Creates a new compressor. + * + * @param bufferSize size of the direct buffer to be used. + */ + public LzoDecompressor(int bufferSize) { + this.bufferSize = bufferSize; + decompressor = new io.airlift.compress.lzo.LzoDecompressor(); + + reset(); + } + + /** + * Creates a new decompressor with the default buffer size. + */ + public LzoDecompressor() { + this(DEFAULT_DIRECT_BUFFER_SIZE); + } + + /** + * Sets input data for decompression. + * This should be called if and only if {@link #needsInput()} returns + * true indicating that more input data is required. + * (Both native and non-native versions of various Decompressors require + * that the data passed in via b[] remain unmodified until + * the caller is explicitly notified--via {@link #needsInput()}--that the + * buffer may be safely modified. With this requirement, an extra + * buffer-copy can be avoided.) 
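+   * <p>In this implementation the arguments are only validated, never
+   * retained: actual decompression happens inside the stream returned by
+   * {@code LzoCodec2#createInputStream}, so this class mainly serves as a
+   * typed placeholder for the codec pool.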
+ * + * @param b Input data + * @param off Start offset + * @param len Length + */ + @Override + public synchronized void setInput(byte[] b, int off, int len) { + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + + } + + /** + * If a write would exceed the capacity of the direct buffers, it is set + * aside to be loaded by this function while the compressed data are + * consumed. + */ + synchronized void setInputFromSavedData() { + } + + /** + * Does nothing. + */ + @Override + public synchronized void setDictionary(byte[] b, int off, int len) { + // do nothing + } + + /** + * Returns true if the input data buffer is empty and + * {@link #setInput(byte[], int, int)} should be called to + * provide more input. + * + * @return true if the input data buffer is empty and + * {@link #setInput(byte[], int, int)} should be called in + * order to provide more input. + */ + @Override + public synchronized boolean needsInput() { + return false; + } + + /** + * Returns false. + * + * @return false. + */ + @Override + public synchronized boolean needsDictionary() { + return false; + } + + /** + * Returns true if the end of the decompressed + * data output stream has been reached. + * + * @return true if the end of the decompressed + * data output stream has been reached. + */ + @Override + public synchronized boolean finished() { + return (finished); + } + + /** + * Fills specified buffer with uncompressed data. Returns actual number + * of bytes of uncompressed data. A return value of 0 indicates that + * {@link #needsInput()} should be called in order to determine if more + * input data is required. + * + * @param b Buffer for the compressed data + * @param off Start offset of the data + * @param len Size of the buffer + * @return The actual number of bytes of compressed data. + * @throws IOException + */ + @Override + public synchronized int decompress(byte[] b, int off, int len) + throws IOException + { + throw new UnsupportedOperationException("LZO block decompressor is not supported"); + } + + /** + * Returns 0. + * + * @return 0. + */ + @Override + public synchronized int getRemaining() { + // Never use this function in BlockDecompressorStream. + return 0; + } + + @Override + public synchronized void reset() { + + } + + /** + * Resets decompressor and input and output buffers so that a new set of + * input data can be processed. + */ + @Override + public synchronized void end() { + // do nothing + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopCompressor.java new file mode 100644 index 0000000000000..dcc512cd772f2 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopCompressor.java @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress.lzo; + +import java.io.IOException; +import java.nio.Buffer; +import java.nio.ByteBuffer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.Compressor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LzopCompressor implements Compressor { + private static final Logger LOG = + LoggerFactory.getLogger(LzopCompressor.class.getName()); + private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024; + + private int directBufferSize; + private Buffer compressedDirectBuf = null; + private int uncompressedDirectBufLen; + private Buffer uncompressedDirectBuf = null; + private byte[] userBuf = null; + private int userBufOff = 0, userBufLen = 0; + private boolean finish, finished; + + private long bytesRead = 0L; + private long bytesWritten = 0L; + + /** + * Creates a new compressor. + * + * @param directBufferSize size of the direct buffer to be used. + */ + public LzopCompressor(int directBufferSize) { + this.directBufferSize = directBufferSize; + + uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + compressedDirectBuf.position(directBufferSize); + } + + /** + * Creates a new compressor with the default buffer size. + */ + public LzopCompressor() { + this(DEFAULT_DIRECT_BUFFER_SIZE); + } + + /** + * Sets input data for compression. + * This should be called whenever #needsInput() returns + * true indicating that more input data is required. + * + * @param b Input data + * @param off Start offset + * @param len Length + */ + @Override + public synchronized void setInput(byte[] b, int off, int len) { + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + finished = false; + + if (len > uncompressedDirectBuf.remaining()) { + // save data; now !needsInput + this.userBuf = b; + this.userBufOff = off; + this.userBufLen = len; + } else { + ((ByteBuffer) uncompressedDirectBuf).put(b, off, len); + uncompressedDirectBufLen = uncompressedDirectBuf.position(); + } + + bytesRead += len; + } + + /** + * If a write would exceed the capacity of the direct buffers, it is set + * aside to be loaded by this function while the compressed data are + * consumed. + */ + synchronized void setInputFromSavedData() { + if (0 >= userBufLen) { + return; + } + finished = false; + + uncompressedDirectBufLen = Math.min(userBufLen, directBufferSize); + ((ByteBuffer) uncompressedDirectBuf).put(userBuf, userBufOff, + uncompressedDirectBufLen); + + // Note how much data is being fed to lz4 + userBufOff += uncompressedDirectBufLen; + userBufLen -= uncompressedDirectBufLen; + } + + /** + * Does nothing. + */ + @Override + public synchronized void setDictionary(byte[] b, int off, int len) { + // do nothing + } + + /** + * Returns true if the input data buffer is empty and + * #setInput() should be called to provide more input. 
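+   * More input is not needed while compressed output is still waiting to be
+   * drained, the uncompressed direct buffer is full, or previously saved
+   * user input has not yet been copied in.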
+ * + * @return true if the input data buffer is empty and + * #setInput() should be called in order to provide more input. + */ + @Override + public synchronized boolean needsInput() { + return !(compressedDirectBuf.remaining() > 0 + || uncompressedDirectBuf.remaining() == 0 || userBufLen > 0); + } + + /** + * When called, indicates that compression should end + * with the current contents of the input buffer. + */ + @Override + public synchronized void finish() { + finish = true; + } + + /** + * Returns true if the end of the compressed + * data output stream has been reached. + * + * @return true if the end of the compressed + * data output stream has been reached. + */ + @Override + public synchronized boolean finished() { + // Check if all uncompressed data has been consumed + return (finish && finished && compressedDirectBuf.remaining() == 0); + } + + /** + * Fills specified buffer with compressed data. Returns actual number + * of bytes of compressed data. A return value of 0 indicates that + * needsInput() should be called in order to determine if more input + * data is required. + * + * @param b Buffer for the compressed data + * @param off Start offset of the data + * @param len Size of the buffer + * @return The actual number of bytes of compressed data. + */ + @Override + public synchronized int compress(byte[] b, int off, int len) + throws IOException { + throw new UnsupportedOperationException("LZO block compressor is not supported"); + } + + /** + * Resets compressor so that a new set of input data can be processed. + */ + @Override + public synchronized void reset() { + finish = false; + finished = false; + uncompressedDirectBuf.clear(); + uncompressedDirectBufLen = 0; + compressedDirectBuf.clear(); + compressedDirectBuf.limit(0); + userBufOff = userBufLen = 0; + bytesRead = bytesWritten = 0L; + } + + /** + * Prepare the compressor to be used in a new stream with settings defined in + * the given Configuration + * + * @param conf Configuration from which new setting are fetched + */ + @Override + public synchronized void reinit(Configuration conf) { + reset(); + } + + /** + * Return number of bytes given to this compressor since last reset. + */ + @Override + public synchronized long getBytesRead() { + return bytesRead; + } + + /** + * Return number of bytes consumed by callers of compress since last reset. + */ + @Override + public synchronized long getBytesWritten() { + return bytesWritten; + } + + /** + * Closes the compressor and discards any unprocessed input. + */ + @Override + public synchronized void end() { + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopDecompressor.java new file mode 100644 index 0000000000000..bd704af006e90 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopDecompressor.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.compress.lzo; + +import java.io.IOException; +import java.nio.Buffer; +import java.nio.ByteBuffer; + +import org.apache.hadoop.io.compress.Decompressor; +import org.apache.hadoop.io.compress.lz4.Lz4Compressor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LzopDecompressor implements Decompressor { + private static final Logger LOG = + LoggerFactory.getLogger(Lz4Compressor.class.getName()); + private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024; + + private int directBufferSize; + private Buffer compressedDirectBuf = null; + private int compressedDirectBufLen; + private Buffer uncompressedDirectBuf = null; + private byte[] userBuf = null; + private int userBufOff = 0, userBufLen = 0; + private boolean finished; + + /** + * Creates a new compressor. + * + * @param directBufferSize size of the direct buffer to be used. + */ + public LzopDecompressor(int directBufferSize) { + this.directBufferSize = directBufferSize; + + compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); + uncompressedDirectBuf.position(directBufferSize); + } + + /** + * Creates a new decompressor with the default buffer size. + */ + public LzopDecompressor() { + this(DEFAULT_DIRECT_BUFFER_SIZE); + } + + /** + * Sets input data for decompression. + * This should be called if and only if {@link #needsInput()} returns + * true indicating that more input data is required. + * (Both native and non-native versions of various Decompressors require + * that the data passed in via b[] remain unmodified until + * the caller is explicitly notified--via {@link #needsInput()}--that the + * buffer may be safely modified. With this requirement, an extra + * buffer-copy can be avoided.) + * + * @param b Input data + * @param off Start offset + * @param len Length + */ + @Override + public synchronized void setInput(byte[] b, int off, int len) { + if (b == null) { + throw new NullPointerException(); + } + if (off < 0 || len < 0 || off > b.length - len) { + throw new ArrayIndexOutOfBoundsException(); + } + + this.userBuf = b; + this.userBufOff = off; + this.userBufLen = len; + + setInputFromSavedData(); + + // Reinitialize lz4's output direct-buffer + uncompressedDirectBuf.limit(directBufferSize); + uncompressedDirectBuf.position(directBufferSize); + } + + /** + * If a write would exceed the capacity of the direct buffers, it is set + * aside to be loaded by this function while the compressed data are + * consumed. + */ + synchronized void setInputFromSavedData() { + compressedDirectBufLen = Math.min(userBufLen, directBufferSize); + + // Reinitialize lz4's input direct buffer + compressedDirectBuf.rewind(); + ((ByteBuffer) compressedDirectBuf).put(userBuf, userBufOff, + compressedDirectBufLen); + + // Note how much data is being fed to lz4 + userBufOff += compressedDirectBufLen; + userBufLen -= compressedDirectBufLen; + } + + /** + * Does nothing. 
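+   * This LZO implementation has no notion of a preset dictionary, so the
+   * arguments are ignored.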
+ */ + @Override + public synchronized void setDictionary(byte[] b, int off, int len) { + // do nothing + } + + /** + * Returns true if the input data buffer is empty and + * {@link #setInput(byte[], int, int)} should be called to + * provide more input. + * + * @return true if the input data buffer is empty and + * {@link #setInput(byte[], int, int)} should be called in + * order to provide more input. + */ + @Override + public synchronized boolean needsInput() { + // Consume remaining compressed data? + if (uncompressedDirectBuf.remaining() > 0) { + return false; + } + + // Check if lz4 has consumed all input + if (compressedDirectBufLen <= 0) { + // Check if we have consumed all user-input + if (userBufLen <= 0) { + return true; + } else { + setInputFromSavedData(); + } + } + + return false; + } + + /** + * Returns false. + * + * @return false. + */ + @Override + public synchronized boolean needsDictionary() { + return false; + } + + /** + * Returns true if the end of the decompressed + * data output stream has been reached. + * + * @return true if the end of the decompressed + * data output stream has been reached. + */ + @Override + public synchronized boolean finished() { + return (finished && uncompressedDirectBuf.remaining() == 0); + } + + /** + * Fills specified buffer with uncompressed data. Returns actual number + * of bytes of uncompressed data. A return value of 0 indicates that + * {@link #needsInput()} should be called in order to determine if more + * input data is required. + * + * @param b Buffer for the compressed data + * @param off Start offset of the data + * @param len Size of the buffer + * @return The actual number of bytes of compressed data. + * @throws IOException + */ + @Override + public synchronized int decompress(byte[] b, int off, int len) + throws IOException { + throw new UnsupportedOperationException("LZO block decompressor is not supported"); + } + + /** + * Returns 0. + * + * @return 0. + */ + @Override + public synchronized int getRemaining() { + // Never use this function in BlockDecompressorStream. + return 0; + } + + @Override + public synchronized void reset() { + finished = false; + compressedDirectBufLen = 0; + uncompressedDirectBuf.limit(directBufferSize); + uncompressedDirectBuf.position(directBufferSize); + userBufOff = userBufLen = 0; + } + + /** + * Resets decompressor and input and output buffers so that a new set of + * input data can be processed. + */ + @Override + public synchronized void end() { + // do nothing + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/package-info.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/package-info.java new file mode 100644 index 0000000000000..19effdd39c11a --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.io.compress.lzo; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java index 0bda66741ba65..51d2f1b2a91e3 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java @@ -49,6 +49,7 @@ import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; +import com.hadoop.compression.lzo.LzoCodec; import org.apache.commons.codec.binary.Base64; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; @@ -79,6 +80,7 @@ import org.apache.hadoop.util.ReflectionUtils; import org.junit.After; import org.junit.Assert; +import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -168,6 +170,18 @@ public void testDeflateCodec() throws IOException { codecTest(conf, seed, count, "org.apache.hadoop.io.compress.DeflateCodec"); } + @Test + public void testLzoCodec() throws IOException { + codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.LzoCodec"); + codecTest(conf, seed, count, "org.apache.hadoop.io.compress.LzoCodec"); + } + + @Test + public void testLzopCodec() throws IOException { + codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.LzopCodec"); + codecTest(conf, seed, count, "org.apache.hadoop.io.compress.LzopCodec"); + } + @Test public void testGzipCodecWithParam() throws IOException { Configuration conf = new Configuration(this.conf); @@ -1260,4 +1274,184 @@ public void testGzipCompressorWithEmptyInput() throws IOException { assertThat(b).as("check decompressed output").isEqualTo(dflchk); } } + + @Test + public void testLzoCompatibilityWithCompressor() throws IOException { + Configuration hadoopConf = new Configuration(); + CompressionCodec codec = ReflectionUtils.newInstance(com.hadoop.compression.lzo.LzoCodec.class, hadoopConf); + Random r = new Random(); + + for (int i = 0; i < 100; i++) { + DataOutputBuffer outputBuffer = new DataOutputBuffer(); + assertTrue(outputBuffer.getLength() == 0); + Compressor compressor = codec.createCompressor(); + LOG.info("compressor: {}", compressor.getClass().toString()); + assertThat(compressor).as("compressor should not be null").isNotNull(); + + CompressionOutputStream compressStream = codec.createOutputStream(outputBuffer, compressor); + + long randonSeed = r.nextLong(); + r.setSeed(randonSeed); + + int inputSize = r.nextInt(256 * 1024 + 1) + 1; + byte[] b = new byte[inputSize]; + r.nextBytes(b); + compressStream.write(b); + compressStream.flush(); + compressStream.finish(); + + assertTrue(outputBuffer.getLength() > 0); + int compressedSize = outputBuffer.getLength(); + + DataInputBuffer lzobuf = new DataInputBuffer(); + 
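+      // Re-read the compressed bytes through the same codec to verify the
+      // round trip; the fresh DataInputBuffer stays empty until reset() hands
+      // it the compressed data.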
assertTrue(lzobuf.getLength() == 0); + + lzobuf.reset(outputBuffer.getData(), 0, outputBuffer.getLength()); + assertTrue(lzobuf.getLength() == compressedSize); + assertTrue(lzobuf.getPosition() == 0); + + Decompressor decom = codec.createDecompressor(); + LOG.info("decompressor: {}", decom.getClass().toString()); + assertThat(decom).as("decompressor should not be null").isNotNull(); + + InputStream gzin = codec.createInputStream(lzobuf, decom); + DataOutputBuffer dflbuf = new DataOutputBuffer(); + IOUtils.copyBytes(gzin, dflbuf, inputSize); + final byte[] dataRead = Arrays.copyOf(dflbuf.getData(), dflbuf.getLength()); + assertArrayEquals(b, dataRead); + + gzin.close(); + } + } + + @Test + public void testLzoCompatibilityWithNewCompressor() throws IOException { + Configuration hadoopConf = new Configuration(); + CompressionCodec codec = ReflectionUtils.newInstance(com.hadoop.compression.lzo.LzoCodec.class, hadoopConf); + CompressionCodec codec2 = ReflectionUtils.newInstance(LzoCodec2.class, hadoopConf); + Random r = new Random(); + + for (int i = 0; i < 100; i++) { + DataOutputBuffer outputBuffer = new DataOutputBuffer(); + assertTrue(outputBuffer.getLength() == 0); + Compressor compressor = codec2.createCompressor(); + LOG.info("compressor: {}", compressor.getClass().toString()); + assertThat(compressor).as("compressor should not be null").isNotNull(); + + CompressionOutputStream compressStream = codec.createOutputStream(outputBuffer, compressor); + + long randonSeed = r.nextLong(); + r.setSeed(randonSeed); + + int inputSize = r.nextInt(256) + 1; + byte[] b = new byte[inputSize]; + r.nextBytes(b); + compressStream.write(b); + compressStream.flush(); + compressStream.finish(); + + assertTrue(outputBuffer.getLength() > 0); + int compressedSize = outputBuffer.getLength(); + + DataInputBuffer lzobuf = new DataInputBuffer(); + assertTrue(lzobuf.getLength() == 0); + + lzobuf.reset(outputBuffer.getData(), 0, outputBuffer.getLength()); + assertTrue(lzobuf.getLength() == compressedSize); + assertTrue(lzobuf.getPosition() == 0); + + Decompressor decom = codec.createDecompressor(); + LOG.info("decompressor: {}", decom.getClass().toString()); + assertThat(decom).as("decompressor should not be null").isNotNull(); + + InputStream gzin = codec.createInputStream(lzobuf, decom); + DataOutputBuffer dflbuf = new DataOutputBuffer(); + IOUtils.copyBytes(gzin, dflbuf, inputSize); + final byte[] dataRead = Arrays.copyOf(dflbuf.getData(), dflbuf.getLength()); + assertArrayEquals(b, dataRead); + + gzin.close(); + } + } + + private void checkCompressedOutput(DataOutputBuffer outputBuffer, int compressedSize, byte[] b, int inputSize, CompressionCodec codec) throws IOException { + DataInputBuffer lzobuf = new DataInputBuffer(); + assertTrue(lzobuf.getLength() == 0); + + lzobuf.reset(outputBuffer.getData(), 0, outputBuffer.getLength()); + assertTrue(lzobuf.getLength() == compressedSize); + assertTrue(lzobuf.getPosition() == 0); + + Decompressor decom = codec.createDecompressor(); + LOG.info("decompressor: {}", decom.getClass().toString()); + assertThat(decom).as("decompressor should not be null").isNotNull(); + + InputStream gzin = codec.createInputStream(lzobuf, decom); + DataOutputBuffer dflbuf = new DataOutputBuffer(); + IOUtils.copyBytes(gzin, dflbuf, inputSize); + final byte[] dataRead = Arrays.copyOf(dflbuf.getData(), dflbuf.getLength()); + assertArrayEquals(b, dataRead); + + gzin.close(); + } + + @Test + public void testLzoNewCompressor() throws IOException { + Configuration hadoopConf = new Configuration(); + 
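+    // Set up both implementations side by side: the GPL hadoop-lzo codec and
+    // the aircompressor-backed LzoCodec2. Each compresses the same input, and
+    // both outputs are decompressed with the GPL codec to prove compatibility.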
CompressionCodec codec = ReflectionUtils.newInstance(com.hadoop.compression.lzo.LzoCodec.class, hadoopConf);
+    CompressionCodec codec2 = ReflectionUtils.newInstance(LzoCodec2.class, hadoopConf);
+    Random r = new Random();
+
+    for (int i = 0; i < 100; i++) {
+      DataOutputBuffer outputBuffer = new DataOutputBuffer();
+      DataOutputBuffer outputBuffer2 = new DataOutputBuffer();
+
+      assertTrue(outputBuffer.getLength() == 0);
+      assertTrue(outputBuffer2.getLength() == 0);
+
+      Compressor compressor = codec.createCompressor();
+      Compressor compressor2 = codec2.createCompressor();
+      LOG.info("compressor: {}", compressor.getClass().toString());
+      LOG.info("compressor2: {}", compressor2.getClass().toString());
+
+      assertThat(compressor).as("compressor should not be null").isNotNull();
+      assertThat(compressor2).as("compressor2 should not be null").isNotNull();
+
+      CompressionOutputStream compressStream = codec.createOutputStream(outputBuffer, compressor);
+      CompressionOutputStream compressStream2 = codec2.createOutputStream(outputBuffer2, compressor2);
+
+      long randomSeed = r.nextLong();
+      r.setSeed(randomSeed);
+
+      int inputSize = r.nextInt(256) + 1;
+      byte[] b = new byte[inputSize];
+      LOG.info("inputSize: " + inputSize);
+      r.nextBytes(b);
+      compressStream.write(b);
+      compressStream.flush();
+      compressStream.finish();
+
+      compressStream2.write(b);
+      compressStream2.flush();
+      compressStream2.finish();
+
+      assertTrue(outputBuffer.getLength() > 0);
+      int compressedSize = outputBuffer.getLength();
+      LOG.info("compressor compressed size: " + compressedSize);
+
+      assertTrue(outputBuffer2.getLength() > 0);
+      int compressedSize2 = outputBuffer2.getLength();
+      LOG.info("compressor2 compressed size: " + compressedSize2);
+
+      assertThat(compressedSize).isEqualTo(compressedSize2);
+
+      checkCompressedOutput(outputBuffer, compressedSize, b, inputSize, codec);
+      checkCompressedOutput(outputBuffer2, compressedSize2, b, inputSize, codec);
+
+      final byte[] dataRead = Arrays.copyOf(outputBuffer.getData(), outputBuffer.getLength());
+      final byte[] dataRead2 = Arrays.copyOf(outputBuffer2.getData(), outputBuffer2.getLength());
+
+      assertArrayEquals(dataRead, dataRead2);
+    }
+  }
 }
diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml
index 9392a9f67fb3e..625b45a1606ae 100644
--- a/hadoop-project/pom.xml
+++ b/hadoop-project/pom.xml
@@ -141,6 +141,7 @@
     4.1.68.Final
     1.1.8.2
     <lz4-java.version>1.7.1</lz4-java.version>
+    <aircompressor.version>0.16</aircompressor.version>
     0.5.1
@@ -1762,6 +1763,11 @@
       <artifactId>lz4-java</artifactId>
       <version>${lz4-java.version}</version>
+      <dependency>
+        <groupId>io.airlift</groupId>
+        <artifactId>aircompressor</artifactId>
+        <version>${aircompressor.version}</version>
+      </dependency>
@@ -2459,5 +2465,9 @@
+    <repository>
+      <id>twitter</id>
+      <url>https://maven.twttr.com/</url>
+    </repository>

From 132b9a84a26f4303db9e0ef551ca9c3b8b01dd30 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 2 Nov 2021 19:07:04 -0700
Subject: [PATCH 2/5] clean up.

---
 .../hadoop/io/compress/lzo/LzoCompressor.java | 28 +++++++++----------
 .../apache/hadoop/io/compress/TestCodec.java  |  5 ++--
 2 files changed, 15 insertions(+), 18 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java
index 9e16a49b28b5f..0874dfbbaed78 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzoCompressor.java
@@ -121,9 +121,8 @@ public boolean needsInput() {
     // 1. there still are compressed output in buffer, or
     // 2. input buffer is full, or
     // 3.
there is direct input (the input cannot put into input buffer). - // But if `finish()` is called, this compressor does not need input after all. - return !((outputBufferLen - outputBufferOffset > 0) || - (inputBufferMaxLen - inputBufferOffset == 0) || inputLength > 0) && !finish; + return !((outputBufferLen > 0) || + (inputBufferMaxLen - inputBufferOffset == 0) || inputLength > 0); } /** @@ -148,7 +147,7 @@ public boolean finished() { // 1. `finish()` is called, and // 2. all input are consumed, and // 3. no compressed data is in buffer. - return finish && finished && (outputBufferLen - outputBufferOffset == 0); + return finish && finished && (outputBufferLen == 0); } /** @@ -198,15 +197,14 @@ public int compress(byte[] b, int off, int len) inputSize = inputBufferLen; } - System.out.println("inputBufferOffset: " + inputBufferOffset + " inputSize: " + inputSize); // Compress input and write to output buffer. - int compressedSize = compressor.compress(inputBuffer, inputBufferOffset, inputSize, - outputBuffer, outputBufferOffset, outputBufferMaxLen); - inputBufferOffset += inputSize; - inputBufferLen -= inputSize; + int compressedSize = compressor.compress(inputBuffer, 0, inputSize, + outputBuffer, 0, outputBufferMaxLen); + // lzo consumes all buffer input + inputBufferOffset = 0; + inputBufferLen = 0; - outputBufferLen += compressedSize; - System.out.println("compressedSize: " + compressedSize + " outputBufferLen: " + outputBufferLen); + outputBufferLen = compressedSize; if (inputLength == 0) { finished = true; @@ -215,10 +213,9 @@ public int compress(byte[] b, int off, int len) // Copy from compressed data buffer to user buffer. int copiedSize = Math.min(compressedSize, len); bytesWritten += copiedSize; - System.arraycopy(outputBuffer, outputBufferOffset, b, off, copiedSize); + System.arraycopy(outputBuffer, 0, b, off, copiedSize); outputBufferOffset += copiedSize; outputBufferLen -= copiedSize; - System.out.println("copiedSize: " + copiedSize + " outputBufferOffset: " + outputBufferOffset + " outputBufferLen: " + outputBufferLen); return copiedSize; } @@ -231,10 +228,11 @@ int copyIntoInputBuffer() { return 0; } - int inputSize = Math.min(inputLength, inputBufferMaxLen); + finished = false; + + int inputSize = Math.min(inputLength, inputBufferMaxLen - inputBufferOffset); System.arraycopy(input, inputOffset, inputBuffer, inputBufferOffset, inputSize); - inputBufferOffset += inputSize; inputBufferLen += inputSize; inputOffset += inputSize; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java index 51d2f1b2a91e3..25446f465402c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java @@ -48,6 +48,7 @@ import java.util.Random; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; +import java.nio.charset.StandardCharsets; import com.hadoop.compression.lzo.LzoCodec; import org.apache.commons.codec.binary.Base64; @@ -1402,7 +1403,7 @@ public void testLzoNewCompressor() throws IOException { CompressionCodec codec2 = ReflectionUtils.newInstance(LzoCodec2.class, hadoopConf); Random r = new Random(); - for (int i = 0; i < 100; i++) { + for (int i = 1; i < 100; i++) { DataOutputBuffer outputBuffer = new DataOutputBuffer(); DataOutputBuffer outputBuffer2 = new DataOutputBuffer(); 
@@ -1443,8 +1444,6 @@ public void testLzoNewCompressor() throws IOException { int compressedSize2 = outputBuffer2.getLength(); LOG.info("compressor2 compressed size: " + compressedSize2); - assertThat(compressedSize).isEqualTo(compressedSize2); - checkCompressedOutput(outputBuffer, compressedSize, b, inputSize, codec); checkCompressedOutput(outputBuffer2, compressedSize2, b, inputSize, codec); From bc3a46705ff4fe80f5b20524aff2ee539a76c05c Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 2 Nov 2021 19:12:28 -0700 Subject: [PATCH 3/5] Remove array check. --- .../test/java/org/apache/hadoop/io/compress/TestCodec.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java index 25446f465402c..ae2d42297834c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java @@ -48,7 +48,6 @@ import java.util.Random; import java.util.zip.GZIPInputStream; import java.util.zip.GZIPOutputStream; -import java.nio.charset.StandardCharsets; import com.hadoop.compression.lzo.LzoCodec; import org.apache.commons.codec.binary.Base64; @@ -1446,11 +1445,6 @@ public void testLzoNewCompressor() throws IOException { checkCompressedOutput(outputBuffer, compressedSize, b, inputSize, codec); checkCompressedOutput(outputBuffer2, compressedSize2, b, inputSize, codec); - - final byte[] dataRead = Arrays.copyOf(outputBuffer.getData(), outputBuffer.getLength()); - final byte[] dataRead2 = Arrays.copyOf(outputBuffer2.getData(), outputBuffer2.getLength()); - - assertArrayEquals(dataRead, dataRead2); } } } From 426529446c52cdda37a9e8a3aab7bb9683dca263 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 2 Nov 2021 19:25:57 -0700 Subject: [PATCH 4/5] Remove LzopCodec. --- .../hadoop/io/compress/CodecConstants.java | 5 - .../apache/hadoop/io/compress/LzopCodec.java | 185 -------------- .../io/compress/lzo/LzopCompressor.java | 226 ------------------ .../io/compress/lzo/LzopDecompressor.java | 220 ----------------- .../apache/hadoop/io/compress/TestCodec.java | 6 - 5 files changed, 642 deletions(-) delete mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java delete mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopCompressor.java delete mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopDecompressor.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecConstants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecConstants.java index b29d79c175fc6..dcb8491a36135 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecConstants.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecConstants.java @@ -70,9 +70,4 @@ private CodecConstants() { * Default extension for {@link org.apache.hadoop.io.compress.LzoCodec}. */ public static final String LZO_CODEC_EXTENSION = ".lzo_deflate"; - - /** - * Default extension for {@link org.apache.hadoop.io.compress.LzopCodec}. 
- */ - public static final String LZOP_CODEC_EXTENSION = ".lzo"; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java deleted file mode 100644 index 94e51434f3a1d..0000000000000 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/LzopCodec.java +++ /dev/null @@ -1,185 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.io.compress; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; - -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.io.compress.lzo.*; - -/** - * This class creates lzop compressors/decompressors. - */ -public class LzopCodec implements Configurable, CompressionCodec { - private Configuration conf; - - /** - * Set the configuration to be used by this object. - * - * @param conf the configuration object. - */ - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } - - /** - * Return the configuration used by this object. - * - * @return the configuration object used by this objec. - */ - @Override - public Configuration getConf() { - return conf; - } - - /** - * Create a {@link CompressionOutputStream} that will write to the given - * {@link OutputStream}. - * - * @param out the location for the final output stream - * @return a stream the user can write uncompressed data to have it compressed - * @throws IOException - */ - @Override - public CompressionOutputStream createOutputStream(OutputStream out) - throws IOException { - return CompressionCodec.Util. - createOutputStreamWithCodecPool(this, conf, out); - } - - /** - * Create a {@link CompressionOutputStream} that will write to the given - * {@link OutputStream} with the given {@link Compressor}. 
- * - * @param out the location for the final output stream - * @param compressor compressor to use - * @return a stream the user can write uncompressed data to have it compressed - * @throws IOException - */ - @Override - public CompressionOutputStream createOutputStream(OutputStream out, - Compressor compressor) throws IOException { - int bufferSize = conf.getInt( - CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, - CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT); - io.airlift.compress.lzo.LzopCodec codec = new io.airlift.compress.lzo.LzopCodec(); - Configuration lzoConf = new Configuration(this.conf); - lzoConf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, bufferSize); - codec.setConf(lzoConf); - return codec.createOutputStream(out); - } - - /** - * Get the type of {@link Compressor} needed by this {@link CompressionCodec}. - * - * @return the type of compressor needed by this codec. - */ - @Override - public Class getCompressorType() { - return LzopCompressor.class; - } - - /** - * Create a new {@link Compressor} for use by this {@link CompressionCodec}. - * - * @return a new compressor for use by this codec - */ - @Override - public Compressor createCompressor() { - int bufferSize = conf.getInt( - CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, - CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT); - return new LzopCompressor(bufferSize); - } - - /** - * Create a {@link CompressionInputStream} that will read from the given - * input stream. - * - * @param in the stream to read compressed bytes from - * @return a stream to read uncompressed bytes from - * @throws IOException - */ - @Override - public CompressionInputStream createInputStream(InputStream in) - throws IOException { - return CompressionCodec.Util. - createInputStreamWithCodecPool(this, conf, in); - } - - /** - * Create a {@link CompressionInputStream} that will read from the given - * {@link InputStream} with the given {@link Decompressor}. - * - * @param in the stream to read compressed bytes from - * @param decompressor decompressor to use - * @return a stream to read uncompressed bytes from - * @throws IOException - */ - @Override - public CompressionInputStream createInputStream(InputStream in, - Decompressor decompressor) throws IOException { - int bufferSize = conf.getInt( - CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, - CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT); - io.airlift.compress.lzo.LzopCodec codec = new io.airlift.compress.lzo.LzopCodec(); - Configuration lzoConf = new Configuration(this.conf); - lzoConf.setInt(CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, bufferSize); - codec.setConf(lzoConf); - return codec.createInputStream(in); - } - - /** - * Get the type of {@link Decompressor} needed by this {@link CompressionCodec}. - * - * @return the type of decompressor needed by this codec. - */ - @Override - public Class getDecompressorType() { - return LzopDecompressor.class; - } - - /** - * Create a new {@link Decompressor} for use by this {@link CompressionCodec}. 
- * - * @return a new decompressor for use by this codec - */ - @Override - public Decompressor createDecompressor() { - int bufferSize = conf.getInt( - CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_KEY, - CommonConfigurationKeys.IO_COMPRESSION_CODEC_LZO_BUFFERSIZE_DEFAULT); - return new LzopDecompressor(bufferSize); - } - - /** - * Get the default filename extension for this kind of compression. - * - * @return .lzo. - */ - @Override - public String getDefaultExtension() { - return CodecConstants.LZO_CODEC_EXTENSION; - } -} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopCompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopCompressor.java deleted file mode 100644 index dcc512cd772f2..0000000000000 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopCompressor.java +++ /dev/null @@ -1,226 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.io.compress.lzo; - -import java.io.IOException; -import java.nio.Buffer; -import java.nio.ByteBuffer; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.compress.Compressor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class LzopCompressor implements Compressor { - private static final Logger LOG = - LoggerFactory.getLogger(LzopCompressor.class.getName()); - private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024; - - private int directBufferSize; - private Buffer compressedDirectBuf = null; - private int uncompressedDirectBufLen; - private Buffer uncompressedDirectBuf = null; - private byte[] userBuf = null; - private int userBufOff = 0, userBufLen = 0; - private boolean finish, finished; - - private long bytesRead = 0L; - private long bytesWritten = 0L; - - /** - * Creates a new compressor. - * - * @param directBufferSize size of the direct buffer to be used. - */ - public LzopCompressor(int directBufferSize) { - this.directBufferSize = directBufferSize; - - uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); - compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); - compressedDirectBuf.position(directBufferSize); - } - - /** - * Creates a new compressor with the default buffer size. - */ - public LzopCompressor() { - this(DEFAULT_DIRECT_BUFFER_SIZE); - } - - /** - * Sets input data for compression. - * This should be called whenever #needsInput() returns - * true indicating that more input data is required. 
- * - * @param b Input data - * @param off Start offset - * @param len Length - */ - @Override - public synchronized void setInput(byte[] b, int off, int len) { - if (b == null) { - throw new NullPointerException(); - } - if (off < 0 || len < 0 || off > b.length - len) { - throw new ArrayIndexOutOfBoundsException(); - } - finished = false; - - if (len > uncompressedDirectBuf.remaining()) { - // save data; now !needsInput - this.userBuf = b; - this.userBufOff = off; - this.userBufLen = len; - } else { - ((ByteBuffer) uncompressedDirectBuf).put(b, off, len); - uncompressedDirectBufLen = uncompressedDirectBuf.position(); - } - - bytesRead += len; - } - - /** - * If a write would exceed the capacity of the direct buffers, it is set - * aside to be loaded by this function while the compressed data are - * consumed. - */ - synchronized void setInputFromSavedData() { - if (0 >= userBufLen) { - return; - } - finished = false; - - uncompressedDirectBufLen = Math.min(userBufLen, directBufferSize); - ((ByteBuffer) uncompressedDirectBuf).put(userBuf, userBufOff, - uncompressedDirectBufLen); - - // Note how much data is being fed to lz4 - userBufOff += uncompressedDirectBufLen; - userBufLen -= uncompressedDirectBufLen; - } - - /** - * Does nothing. - */ - @Override - public synchronized void setDictionary(byte[] b, int off, int len) { - // do nothing - } - - /** - * Returns true if the input data buffer is empty and - * #setInput() should be called to provide more input. - * - * @return true if the input data buffer is empty and - * #setInput() should be called in order to provide more input. - */ - @Override - public synchronized boolean needsInput() { - return !(compressedDirectBuf.remaining() > 0 - || uncompressedDirectBuf.remaining() == 0 || userBufLen > 0); - } - - /** - * When called, indicates that compression should end - * with the current contents of the input buffer. - */ - @Override - public synchronized void finish() { - finish = true; - } - - /** - * Returns true if the end of the compressed - * data output stream has been reached. - * - * @return true if the end of the compressed - * data output stream has been reached. - */ - @Override - public synchronized boolean finished() { - // Check if all uncompressed data has been consumed - return (finish && finished && compressedDirectBuf.remaining() == 0); - } - - /** - * Fills specified buffer with compressed data. Returns actual number - * of bytes of compressed data. A return value of 0 indicates that - * needsInput() should be called in order to determine if more input - * data is required. - * - * @param b Buffer for the compressed data - * @param off Start offset of the data - * @param len Size of the buffer - * @return The actual number of bytes of compressed data. - */ - @Override - public synchronized int compress(byte[] b, int off, int len) - throws IOException { - throw new UnsupportedOperationException("LZO block compressor is not supported"); - } - - /** - * Resets compressor so that a new set of input data can be processed. 
- */ - @Override - public synchronized void reset() { - finish = false; - finished = false; - uncompressedDirectBuf.clear(); - uncompressedDirectBufLen = 0; - compressedDirectBuf.clear(); - compressedDirectBuf.limit(0); - userBufOff = userBufLen = 0; - bytesRead = bytesWritten = 0L; - } - - /** - * Prepare the compressor to be used in a new stream with settings defined in - * the given Configuration - * - * @param conf Configuration from which new setting are fetched - */ - @Override - public synchronized void reinit(Configuration conf) { - reset(); - } - - /** - * Return number of bytes given to this compressor since last reset. - */ - @Override - public synchronized long getBytesRead() { - return bytesRead; - } - - /** - * Return number of bytes consumed by callers of compress since last reset. - */ - @Override - public synchronized long getBytesWritten() { - return bytesWritten; - } - - /** - * Closes the compressor and discards any unprocessed input. - */ - @Override - public synchronized void end() { - } -} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopDecompressor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopDecompressor.java deleted file mode 100644 index bd704af006e90..0000000000000 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/lzo/LzopDecompressor.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.io.compress.lzo; - -import java.io.IOException; -import java.nio.Buffer; -import java.nio.ByteBuffer; - -import org.apache.hadoop.io.compress.Decompressor; -import org.apache.hadoop.io.compress.lz4.Lz4Compressor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class LzopDecompressor implements Decompressor { - private static final Logger LOG = - LoggerFactory.getLogger(Lz4Compressor.class.getName()); - private static final int DEFAULT_DIRECT_BUFFER_SIZE = 64 * 1024; - - private int directBufferSize; - private Buffer compressedDirectBuf = null; - private int compressedDirectBufLen; - private Buffer uncompressedDirectBuf = null; - private byte[] userBuf = null; - private int userBufOff = 0, userBufLen = 0; - private boolean finished; - - /** - * Creates a new compressor. - * - * @param directBufferSize size of the direct buffer to be used. - */ - public LzopDecompressor(int directBufferSize) { - this.directBufferSize = directBufferSize; - - compressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); - uncompressedDirectBuf = ByteBuffer.allocateDirect(directBufferSize); - uncompressedDirectBuf.position(directBufferSize); - } - - /** - * Creates a new decompressor with the default buffer size. 
- */ - public LzopDecompressor() { - this(DEFAULT_DIRECT_BUFFER_SIZE); - } - - /** - * Sets input data for decompression. - * This should be called if and only if {@link #needsInput()} returns - * true indicating that more input data is required. - * (Both native and non-native versions of various Decompressors require - * that the data passed in via b[] remain unmodified until - * the caller is explicitly notified--via {@link #needsInput()}--that the - * buffer may be safely modified. With this requirement, an extra - * buffer-copy can be avoided.) - * - * @param b Input data - * @param off Start offset - * @param len Length - */ - @Override - public synchronized void setInput(byte[] b, int off, int len) { - if (b == null) { - throw new NullPointerException(); - } - if (off < 0 || len < 0 || off > b.length - len) { - throw new ArrayIndexOutOfBoundsException(); - } - - this.userBuf = b; - this.userBufOff = off; - this.userBufLen = len; - - setInputFromSavedData(); - - // Reinitialize lz4's output direct-buffer - uncompressedDirectBuf.limit(directBufferSize); - uncompressedDirectBuf.position(directBufferSize); - } - - /** - * If a write would exceed the capacity of the direct buffers, it is set - * aside to be loaded by this function while the compressed data are - * consumed. - */ - synchronized void setInputFromSavedData() { - compressedDirectBufLen = Math.min(userBufLen, directBufferSize); - - // Reinitialize lz4's input direct buffer - compressedDirectBuf.rewind(); - ((ByteBuffer) compressedDirectBuf).put(userBuf, userBufOff, - compressedDirectBufLen); - - // Note how much data is being fed to lz4 - userBufOff += compressedDirectBufLen; - userBufLen -= compressedDirectBufLen; - } - - /** - * Does nothing. - */ - @Override - public synchronized void setDictionary(byte[] b, int off, int len) { - // do nothing - } - - /** - * Returns true if the input data buffer is empty and - * {@link #setInput(byte[], int, int)} should be called to - * provide more input. - * - * @return true if the input data buffer is empty and - * {@link #setInput(byte[], int, int)} should be called in - * order to provide more input. - */ - @Override - public synchronized boolean needsInput() { - // Consume remaining compressed data? - if (uncompressedDirectBuf.remaining() > 0) { - return false; - } - - // Check if lz4 has consumed all input - if (compressedDirectBufLen <= 0) { - // Check if we have consumed all user-input - if (userBufLen <= 0) { - return true; - } else { - setInputFromSavedData(); - } - } - - return false; - } - - /** - * Returns false. - * - * @return false. - */ - @Override - public synchronized boolean needsDictionary() { - return false; - } - - /** - * Returns true if the end of the decompressed - * data output stream has been reached. - * - * @return true if the end of the decompressed - * data output stream has been reached. - */ - @Override - public synchronized boolean finished() { - return (finished && uncompressedDirectBuf.remaining() == 0); - } - - /** - * Fills specified buffer with uncompressed data. Returns actual number - * of bytes of uncompressed data. A return value of 0 indicates that - * {@link #needsInput()} should be called in order to determine if more - * input data is required. - * - * @param b Buffer for the compressed data - * @param off Start offset of the data - * @param len Size of the buffer - * @return The actual number of bytes of compressed data. 
- * @throws IOException - */ - @Override - public synchronized int decompress(byte[] b, int off, int len) - throws IOException { - throw new UnsupportedOperationException("LZO block decompressor is not supported"); - } - - /** - * Returns 0. - * - * @return 0. - */ - @Override - public synchronized int getRemaining() { - // Never use this function in BlockDecompressorStream. - return 0; - } - - @Override - public synchronized void reset() { - finished = false; - compressedDirectBufLen = 0; - uncompressedDirectBuf.limit(directBufferSize); - uncompressedDirectBuf.position(directBufferSize); - userBufOff = userBufLen = 0; - } - - /** - * Resets decompressor and input and output buffers so that a new set of - * input data can be processed. - */ - @Override - public synchronized void end() { - // do nothing - } -} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java index ae2d42297834c..aefc8b12d6afb 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/compress/TestCodec.java @@ -176,12 +176,6 @@ public void testLzoCodec() throws IOException { codecTest(conf, seed, count, "org.apache.hadoop.io.compress.LzoCodec"); } - @Test - public void testLzopCodec() throws IOException { - codecTest(conf, seed, 0, "org.apache.hadoop.io.compress.LzopCodec"); - codecTest(conf, seed, count, "org.apache.hadoop.io.compress.LzopCodec"); - } - @Test public void testGzipCodecWithParam() throws IOException { Configuration conf = new Configuration(this.conf); From e7d98ab66585cbae11162ae605bf001839705be0 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 2 Nov 2021 22:24:52 -0700 Subject: [PATCH 5/5] Use hadoop-lzo 0.4.20. --- hadoop-common-project/hadoop-common/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml index 7cf9e741dd641..12d4fb7fa2859 100644 --- a/hadoop-common-project/hadoop-common/pom.xml +++ b/hadoop-common-project/hadoop-common/pom.xml @@ -383,7 +383,7 @@ com.hadoop.gplcompression hadoop-lzo - 0.4.21-SNAPSHOT + 0.4.20 test
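
To exercise the LzoCodec2 that this series leaves in place, a minimal round-trip driver might look like the sketch below. Only LzoCodec2 and the stock Hadoop compression APIs come from this series; the driver class itself is hypothetical, and it assumes io.airlift:aircompressor is available at run time, since the pom brings it in with provided scope.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionInputStream;
    import org.apache.hadoop.io.compress.CompressionOutputStream;
    import org.apache.hadoop.io.compress.LzoCodec2;
    import org.apache.hadoop.util.ReflectionUtils;

    public class LzoCodec2RoundTrip {
      public static void main(String[] args) throws IOException {
        // newInstance() also injects the Configuration, because LzoCodec2
        // implements Configurable; that is how the LZO buffer-size setting
        // reaches createCompressor()/createOutputStream().
        Configuration conf = new Configuration();
        CompressionCodec codec = ReflectionUtils.newInstance(LzoCodec2.class, conf);

        byte[] original = "hello, lzo".getBytes(StandardCharsets.UTF_8);

        // Compress into an in-memory buffer.
        ByteArrayOutputStream compressed = new ByteArrayOutputStream();
        CompressionOutputStream cos = codec.createOutputStream(compressed);
        cos.write(original);
        cos.finish();
        cos.close();

        // Decompress and verify the round trip.
        CompressionInputStream cis = codec.createInputStream(
            new ByteArrayInputStream(compressed.toByteArray()));
        ByteArrayOutputStream restored = new ByteArrayOutputStream();
        byte[] buf = new byte[4096];
        int n;
        while ((n = cis.read(buf)) != -1) {
          restored.write(buf, 0, n);
        }
        cis.close();

        if (!Arrays.equals(original, restored.toByteArray())) {
          throw new AssertionError("round trip mismatch");
        }
      }
    }

Data written this way gets the .lzo_deflate extension, per the LZO_CODEC_EXTENSION constant that patch 4 keeps in CodecConstants after removing the lzop variant.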
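
The needsInput()/finished() rework in patch 2 is easiest to read against the drain loop that callers of org.apache.hadoop.io.compress.Compressor run (Hadoop's CompressorStream does essentially this). A sketch of that caller-side contract, reusing codec and original from the sketch above and assuming out is any OutputStream:

    // One-shot sketch: real callers keep calling setInput() while
    // needsInput() is true; here the whole input fits in one call.
    Compressor compressor = codec.createCompressor();
    compressor.setInput(original, 0, original.length);
    compressor.finish();

    byte[] outBuf = new byte[64 * 1024];
    while (!compressor.finished()) {
      // compress() may return 0 while input is still being staged; per
      // the comments in patch 2, finished() only turns true once
      // finish() was called, all input is consumed, and the compressed
      // output buffer has been fully drained.
      int written = compressor.compress(outBuf, 0, outBuf.length);
      if (written > 0) {
        out.write(outBuf, 0, written);
      }
    }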