ARROW-11301: [C++] Fix reading Parquet LZ4-compressed files produced by Hadoop

The compatibility code was incomplete as it assumed a single "frame" per data page.
It turns out that Hadoop tries to compress data in blocks of 128 kiB.
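
For context, Hadoop's Lz4Codec compresses its input in blocks of at most 128 kiB and prefixes every resulting frame with two big-endian uint32 sizes (decompressed size, then compressed size), so a single Parquet data page can carry several such frames back to back. The following is a minimal sketch of that layout against the raw lz4 block API; the helper names (AppendBigEndian32, HadoopStyleLz4Compress) and the std::string buffers are illustrative, not Hadoop or Arrow code, and error handling is omitted. The old compatibility path only ever decoded the first such frame, which is why pages spanning more than one 128 kiB block could not be read.

// Sketch of Hadoop-style LZ4 framing (illustration only, not Hadoop/Arrow code).
// Each input block of at most 128 kiB becomes one frame:
//   bytes 0..3  big-endian uint32: decompressed size of the frame
//   bytes 4..7  big-endian uint32: compressed size of the frame
//   bytes 8...  raw LZ4 block data
#include <lz4.h>

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <string>

static void AppendBigEndian32(std::string* out, uint32_t v) {
  const char bytes[] = {static_cast<char>(v >> 24), static_cast<char>(v >> 16),
                        static_cast<char>(v >> 8), static_cast<char>(v)};
  out->append(bytes, sizeof(bytes));
}

std::string HadoopStyleLz4Compress(const std::string& input) {
  constexpr size_t kBlockSize = 128 * 1024;  // Hadoop compresses 128 kiB at a time
  std::string out;
  for (size_t pos = 0; pos < input.size(); pos += kBlockSize) {
    const int n = static_cast<int>(std::min(kBlockSize, input.size() - pos));
    std::string compressed(LZ4_compressBound(n), '\0');
    const int csize = LZ4_compress_default(input.data() + pos, &compressed[0], n,
                                           static_cast<int>(compressed.size()));
    // Error handling omitted: csize == 0 would mean compression failed.
    AppendBigEndian32(&out, static_cast<uint32_t>(n));      // decompressed size
    AppendBigEndian32(&out, static_cast<uint32_t>(csize));  // compressed size
    out.append(compressed.data(), static_cast<size_t>(csize));
  }
  return out;
}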

Closes #9244 from pitrou/ARROW-9177-lz4-hadoop-frames

Authored-by: Antoine Pitrou <antoine@python.org>
Signed-off-by: Krisztián Szűcs <szucs.krisztian@gmail.com>
pitrou authored and kszucs committed Jan 18, 2021
1 parent 348a40b commit 1b59a7a
Showing 3 changed files with 73 additions and 27 deletions.
cpp/src/arrow/util/compression_lz4.cc (66 changes: 40 additions, 26 deletions)
@@ -417,38 +417,52 @@ class Lz4HadoopCodec : public Lz4Codec {

   int64_t TryDecompressHadoop(int64_t input_len, const uint8_t* input,
                               int64_t output_buffer_len, uint8_t* output_buffer) {
-    // Parquet files written with the Hadoop Lz4Codec contain at the beginning
-    // of the input buffer two uint32_t's representing (in this order) expected
-    // decompressed size in bytes and expected compressed size in bytes.
+    // Parquet files written with the Hadoop Lz4Codec use their own framing.
+    // The input buffer can contain an arbitrary number of "frames", each
+    // with the following structure:
+    // - bytes 0..3: big-endian uint32_t representing the frame decompressed size
+    // - bytes 4..7: big-endian uint32_t representing the frame compressed size
+    // - bytes 8...: frame compressed data
     //
     // The Hadoop Lz4Codec source code can be found here:
     // https://github.com/apache/hadoop/blob/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/Lz4Codec.cc
     if (input_len < kPrefixLength) {
       return kNotHadoop;
     }
 
-    const uint32_t expected_decompressed_size =
-        BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input));
-    const uint32_t expected_compressed_size =
-        BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input + sizeof(uint32_t)));
-    const int64_t lz4_compressed_buffer_size = input_len - kPrefixLength;
-
-    // We use a heuristic to determine if the parquet file being read
-    // was compressed using the Hadoop Lz4Codec.
-    if (lz4_compressed_buffer_size == expected_compressed_size) {
-      // Parquet file was likely compressed with Hadoop Lz4Codec.
-      auto maybe_decompressed_size =
-          Lz4Codec::Decompress(lz4_compressed_buffer_size, input + kPrefixLength,
-                               output_buffer_len, output_buffer);
-
-      if (maybe_decompressed_size.ok() &&
-          *maybe_decompressed_size == expected_decompressed_size) {
-        return *maybe_decompressed_size;
-      }
-    }
-
-    // Parquet file was compressed without Hadoop Lz4Codec (or data is corrupt)
-    return kNotHadoop;
+    int64_t total_decompressed_size = 0;
+
+    while (input_len >= kPrefixLength) {
+      const uint32_t expected_decompressed_size =
+          BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input));
+      const uint32_t expected_compressed_size =
+          BitUtil::FromBigEndian(SafeLoadAs<uint32_t>(input + sizeof(uint32_t)));
+      input += kPrefixLength;
+      input_len -= kPrefixLength;
+
+      if (input_len < expected_compressed_size) {
+        // Not enough bytes for Hadoop "frame"
+        return kNotHadoop;
+      }
+      if (output_buffer_len < expected_decompressed_size) {
+        // Not enough bytes to hold advertised output => probably not Hadoop
+        return kNotHadoop;
+      }
+      // Try decompressing and compare with expected decompressed length
+      auto maybe_decompressed_size = Lz4Codec::Decompress(
+          expected_compressed_size, input, output_buffer_len, output_buffer);
+      if (!maybe_decompressed_size.ok() ||
+          *maybe_decompressed_size != expected_decompressed_size) {
+        return kNotHadoop;
+      }
+      input += expected_compressed_size;
+      input_len -= expected_compressed_size;
+      output_buffer += expected_decompressed_size;
+      output_buffer_len -= expected_decompressed_size;
+      total_decompressed_size += expected_decompressed_size;
+    }
+
+    if (input_len == 0) {
+      return total_decompressed_size;
+    } else {
+      return kNotHadoop;
+    }
   }
 };

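For readers without the Arrow internals used above (SafeLoadAs, BitUtil::FromBigEndian, Lz4Codec::Decompress), the same loop can be written against the raw lz4 block API. This is a standalone sketch, not the Arrow implementation; the names DecodeHadoopLz4 and ReadBigEndian32 are illustrative, and all failures collapse into a -1 sentinel, whereas the Arrow codec returns kNotHadoop and falls back to plain LZ4 block decompression of the whole buffer.

// Standalone sketch of the Hadoop LZ4 frame loop (illustration only).
#include <lz4.h>

#include <cstdint>

// Reads a big-endian uint32_t without assuming host endianness or alignment.
static uint32_t ReadBigEndian32(const uint8_t* p) {
  return (static_cast<uint32_t>(p[0]) << 24) | (static_cast<uint32_t>(p[1]) << 16) |
         (static_cast<uint32_t>(p[2]) << 8) | static_cast<uint32_t>(p[3]);
}

// Returns the total decompressed size, or -1 if the buffer does not look like
// a sequence of Hadoop LZ4 frames.
int64_t DecodeHadoopLz4(const uint8_t* input, int64_t input_len, uint8_t* output,
                        int64_t output_len) {
  constexpr int64_t kPrefixLength = 2 * sizeof(uint32_t);  // per-frame size fields
  int64_t total = 0;
  while (input_len >= kPrefixLength) {
    const uint32_t decompressed_size = ReadBigEndian32(input);
    const uint32_t compressed_size = ReadBigEndian32(input + 4);
    input += kPrefixLength;
    input_len -= kPrefixLength;
    if (compressed_size > input_len || decompressed_size > output_len) return -1;
    // Assumes output_len fits in an int, as the lz4 block API takes int sizes.
    const int n = LZ4_decompress_safe(reinterpret_cast<const char*>(input),
                                      reinterpret_cast<char*>(output),
                                      static_cast<int>(compressed_size),
                                      static_cast<int>(output_len));
    if (n < 0 || static_cast<uint32_t>(n) != decompressed_size) return -1;
    input += compressed_size;
    input_len -= compressed_size;
    output += decompressed_size;
    output_len -= decompressed_size;
    total += decompressed_size;
  }
  return input_len == 0 ? total : -1;
}
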
cpp/src/parquet/reader_test.cc (32 changes: 32 additions, 0 deletions)
@@ -68,6 +68,11 @@ std::string non_hadoop_lz4_compressed() {
return data_file("non_hadoop_lz4_compressed.parquet");
}

// Larger data compressed using custom Hadoop LZ4 format (several frames)
std::string hadoop_lz4_compressed_larger() {
return data_file("hadoop_lz4_compressed_larger.parquet");
}

// TODO: Assert on definition and repetition levels
template <typename DType, typename ValueType>
void AssertColumnValues(std::shared_ptr<TypedColumnReader<DType>> col, int64_t batch_size,
@@ -553,6 +558,33 @@ TEST_P(TestCodec, FileMetadataAndValues) {
 INSTANTIATE_TEST_SUITE_P(Lz4CodecTests, TestCodec,
                          ::testing::Values(hadoop_lz4_compressed(),
                                            non_hadoop_lz4_compressed()));
+
+TEST(TestLz4HadoopCodec, TestSeveralFrames) {
+  // ARROW-9177: Hadoop can compress a data block in several LZ4 "frames"
+  auto file = ParquetFileReader::OpenFile(hadoop_lz4_compressed_larger());
+  auto group = file->RowGroup(0);
+
+  const int64_t kNumRows = 10000;
+
+  ASSERT_EQ(kNumRows, file->metadata()->num_rows());
+  ASSERT_EQ(1, file->metadata()->num_columns());
+  ASSERT_EQ(1, file->metadata()->num_row_groups());
+  ASSERT_EQ(kNumRows, group->metadata()->num_rows());
+
+  // column 0 ("a")
+  auto col = checked_pointer_cast<ByteArrayReader>(group->Column(0));
+
+  std::vector<ByteArray> values(kNumRows);
+  int64_t values_read;
+  auto levels_read =
+      col->ReadBatch(kNumRows, nullptr, nullptr, values.data(), &values_read);
+  ASSERT_EQ(kNumRows, levels_read);
+  ASSERT_EQ(kNumRows, values_read);
+  ASSERT_EQ(values[0], ByteArray("c7ce6bef-d5b0-4863-b199-8ea8c7fb117b"));
+  ASSERT_EQ(values[1], ByteArray("e8fb9197-cb9f-4118-b67f-fbfa65f61843"));
+  ASSERT_EQ(values[kNumRows - 2], ByteArray("ab52a0cc-c6bb-4d61-8a8f-166dc4b8b13c"));
+  ASSERT_EQ(values[kNumRows - 1], ByteArray("85440778-460a-41ac-aa2e-ac3ee41696bf"));
+}
 #endif
 
 }  // namespace parquet
cpp/submodules/parquet-testing (2 changes: 1 addition, 1 deletion)
