Increase test coverage of ArrowWriter #2220

Merged (1 commit) on Jul 29, 2022
187 changes: 128 additions & 59 deletions parquet/src/arrow/arrow_writer/mod.rs
@@ -756,7 +756,9 @@ mod tests {
use arrow::{array::*, buffer::Buffer};

use crate::arrow::{ArrowReader, ParquetFileArrowReader};
use crate::basic::Encoding;
use crate::file::metadata::ParquetMetaData;
use crate::file::properties::WriterVersion;
use crate::file::{
reader::{FileReader, SerializedFileReader},
statistics::Statistics,
@@ -1226,17 +1228,31 @@ mod tests {

const SMALL_SIZE: usize = 7;

fn roundtrip(expected_batch: RecordBatch, max_row_group_size: Option<usize>) -> File {
fn roundtrip(
expected_batch: RecordBatch,
max_row_group_size: Option<usize>,
) -> Vec<File> {
let mut files = vec![];
for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
let mut props = WriterProperties::builder().set_writer_version(version);

if let Some(size) = max_row_group_size {
props = props.set_max_row_group_size(size)
}

let props = props.build();
files.push(roundtrip_opts(&expected_batch, props))
}
files
}

fn roundtrip_opts(expected_batch: &RecordBatch, props: WriterProperties) -> File {
let file = tempfile::tempfile().unwrap();

let mut writer = ArrowWriter::try_new(
file.try_clone().unwrap(),
expected_batch.schema(),
max_row_group_size.map(|size| {
WriterProperties::builder()
.set_max_row_group_size(size)
.build()
}),
Some(props),
)
.expect("Unable to write file");
writer.write(&expected_batch).unwrap();
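For context, here is a minimal sketch of how a test could drive the two helpers after this change. It is not part of the diff: the test name and the sample batch are illustrative, while roundtrip, roundtrip_opts, and SMALL_SIZE are the items defined in the hunk above.

// Illustrative sketch, not part of the PR. Assumes the test module's existing
// imports (arrow array and datatype types, WriterProperties, WriterVersion).
#[test]
fn example_roundtrip_usage() {
    let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
    let schema = Schema::new(vec![Field::new("col", DataType::Int32, false)]);
    let batch = RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();

    // roundtrip now writes the batch once per writer version (1.0 and 2.0)
    // and returns the resulting files.
    let files = roundtrip(batch.clone(), Some(SMALL_SIZE / 2));
    assert_eq!(files.len(), 2);

    // roundtrip_opts writes a single file with explicit WriterProperties.
    let props = WriterProperties::builder()
        .set_writer_version(WriterVersion::PARQUET_2_0)
        .build();
    let _file = roundtrip_opts(&batch, props);
}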
@@ -1264,20 +1280,59 @@ mod tests {
file
}

fn one_column_roundtrip(
fn one_column_roundtrip(values: ArrayRef, nullable: bool) -> Vec<File> {
let data_type = values.data_type().clone();
let schema = Schema::new(vec![Field::new("col", data_type, nullable)]);
one_column_roundtrip_with_schema(values, Arc::new(schema))
}

fn one_column_roundtrip_with_schema(
values: ArrayRef,
nullable: bool,
max_row_group_size: Option<usize>,
) -> File {
let schema = Schema::new(vec![Field::new(
"col",
values.data_type().clone(),
nullable,
)]);
let expected_batch =
RecordBatch::try_new(Arc::new(schema), vec![values]).unwrap();
schema: SchemaRef,
) -> Vec<File> {
let encodings = match values.data_type() {
DataType::Utf8
| DataType::LargeUtf8
| DataType::Binary
| DataType::LargeBinary => vec![
Encoding::PLAIN,
Encoding::DELTA_BYTE_ARRAY,
Encoding::DELTA_LENGTH_BYTE_ARRAY,
],
DataType::Int64
| DataType::Int32
| DataType::Int16
| DataType::Int8
| DataType::UInt64
| DataType::UInt32
| DataType::UInt16
| DataType::UInt8 => vec![Encoding::PLAIN, Encoding::DELTA_BINARY_PACKED],
_ => vec![Encoding::PLAIN],
};

let expected_batch = RecordBatch::try_new(schema, vec![values]).unwrap();

let row_group_sizes = [1024, SMALL_SIZE, SMALL_SIZE / 2, SMALL_SIZE / 2 + 1, 10];

let mut files = vec![];
for dictionary_size in [0, 1, 1024] {
for encoding in &encodings {
for version in [WriterVersion::PARQUET_1_0, WriterVersion::PARQUET_2_0] {
for row_group_size in row_group_sizes {
let props = WriterProperties::builder()
.set_writer_version(version)
.set_max_row_group_size(row_group_size)
.set_dictionary_enabled(dictionary_size != 0)
.set_dictionary_pagesize_limit(dictionary_size.max(1))
.set_encoding(*encoding)
.build();

roundtrip(expected_batch, max_row_group_size)
files.push(roundtrip_opts(&expected_batch, props))
}
}
}
}
files
}
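Taken together, the nested loops above mean a single call to one_column_roundtrip_with_schema now writes and re-reads the batch under every combination of dictionary setting, encoding, writer version, and row-group size. As a rough illustration of the grid size (the helper below is a sketch, not part of the PR):

// Illustrative only: count the writer-property combinations swept by the
// loops in one_column_roundtrip_with_schema above.
fn property_combinations(num_encodings: usize) -> usize {
    let dictionary_settings = 3; // [0, 1, 1024]
    let writer_versions = 2; // PARQUET_1_0 and PARQUET_2_0
    let row_group_sizes = 5; // [1024, 7, 3, 4, 10] given SMALL_SIZE = 7
    dictionary_settings * num_encodings * writer_versions * row_group_sizes
}

fn main() {
    assert_eq!(property_combinations(3), 90); // string and binary columns
    assert_eq!(property_combinations(2), 60); // integer columns
    assert_eq!(property_combinations(1), 30); // all other types
}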

fn values_required<A, I>(iter: I)
@@ -1287,7 +1342,7 @@ mod tests {
{
let raw_values: Vec<_> = iter.into_iter().collect();
let values = Arc::new(A::from(raw_values));
one_column_roundtrip(values, false, Some(SMALL_SIZE / 2));
one_column_roundtrip(values, false);
}

fn values_optional<A, I>(iter: I)
@@ -1301,7 +1356,7 @@ mod tests {
.map(|(i, v)| if i % 2 == 0 { None } else { Some(v) })
.collect();
let optional_values = Arc::new(A::from(optional_raw_values));
one_column_roundtrip(optional_values, true, Some(SMALL_SIZE / 2));
one_column_roundtrip(optional_values, true);
}

fn required_and_optional<A, I>(iter: I)
@@ -1316,12 +1371,12 @@ mod tests {
#[test]
fn all_null_primitive_single_column() {
let values = Arc::new(Int32Array::from(vec![None; SMALL_SIZE]));
one_column_roundtrip(values, true, Some(SMALL_SIZE / 2));
one_column_roundtrip(values, true);
}

#[test]
fn null_single_column() {
let values = Arc::new(NullArray::new(SMALL_SIZE));
one_column_roundtrip(values, true, Some(SMALL_SIZE / 2));
one_column_roundtrip(values, true);
// null arrays are always nullable, a test with non-nullable nulls fails
}

@@ -1417,31 +1472,31 @@ mod tests {
let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
let values = Arc::new(TimestampSecondArray::from_vec(raw_values, None));

one_column_roundtrip(values, false, Some(3));
one_column_roundtrip(values, false);
}

#[test]
fn timestamp_millisecond_single_column() {
let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
let values = Arc::new(TimestampMillisecondArray::from_vec(raw_values, None));

one_column_roundtrip(values, false, Some(SMALL_SIZE / 2 + 1));
one_column_roundtrip(values, false);
}

#[test]
fn timestamp_microsecond_single_column() {
let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
let values = Arc::new(TimestampMicrosecondArray::from_vec(raw_values, None));

one_column_roundtrip(values, false, Some(SMALL_SIZE / 2 + 2));
one_column_roundtrip(values, false);
}

#[test]
fn timestamp_nanosecond_single_column() {
let raw_values: Vec<_> = (0..SMALL_SIZE as i64).collect();
let values = Arc::new(TimestampNanosecondArray::from_vec(raw_values, None));

one_column_roundtrip(values, false, Some(SMALL_SIZE / 2));
one_column_roundtrip(values, false);
}

#[test]
@@ -1548,7 +1603,7 @@ mod tests {
builder.append_value(b"1112").unwrap();
let array = Arc::new(builder.finish());

one_column_roundtrip(array, true, Some(SMALL_SIZE / 2));
one_column_roundtrip(array, true);
}

#[test]
@@ -1626,7 +1681,7 @@ mod tests {
let a = ListArray::from(a_list_data);
let values = Arc::new(a);

one_column_roundtrip(values, true, Some(SMALL_SIZE / 2));
one_column_roundtrip(values, true);
}

#[test]
@@ -1652,7 +1707,7 @@ mod tests {
let a = LargeListArray::from(a_list_data);
let values = Arc::new(a);

one_column_roundtrip(values, true, Some(SMALL_SIZE / 2));
one_column_roundtrip(values, true);
}

#[test]
@@ -1668,10 +1723,10 @@ mod tests {
];

let list = ListArray::from_iter_primitive::<Int32Type, _, _>(data.clone());
one_column_roundtrip(Arc::new(list), true, Some(SMALL_SIZE / 2));
one_column_roundtrip(Arc::new(list), true);

let list = LargeListArray::from_iter_primitive::<Int32Type, _, _>(data);
one_column_roundtrip(Arc::new(list), true, Some(SMALL_SIZE / 2));
one_column_roundtrip(Arc::new(list), true);
}

#[test]
@@ -1681,7 +1736,7 @@ mod tests {
let s = StructArray::from(vec![(struct_field_a, Arc::new(a_values) as ArrayRef)]);

let values = Arc::new(s);
one_column_roundtrip(values, false, Some(SMALL_SIZE / 2));
one_column_roundtrip(values, false);
}

#[test]
@@ -1702,9 +1757,7 @@ mod tests {
.collect();

// build a record batch
let expected_batch = RecordBatch::try_new(schema, vec![Arc::new(d)]).unwrap();

roundtrip(expected_batch, Some(SMALL_SIZE / 2));
one_column_roundtrip_with_schema(Arc::new(d), schema);
}

#[test]
@@ -1728,10 +1781,7 @@ mod tests {
builder.append(12345678).unwrap();
let d = builder.finish();

// build a record batch
let expected_batch = RecordBatch::try_new(schema, vec![Arc::new(d)]).unwrap();

roundtrip(expected_batch, Some(SMALL_SIZE / 2));
one_column_roundtrip_with_schema(Arc::new(d), schema);
}

#[test]
@@ -1751,80 +1801,98 @@ mod tests {
.copied()
.collect();

// build a record batch
let expected_batch = RecordBatch::try_new(schema, vec![Arc::new(d)]).unwrap();

roundtrip(expected_batch, Some(SMALL_SIZE / 2));
one_column_roundtrip_with_schema(Arc::new(d), schema);
}

#[test]
fn u32_min_max() {
// check values roundtrip through parquet
let values = Arc::new(UInt32Array::from_iter_values(vec![
let src = vec![
u32::MIN,
u32::MIN + 1,
(i32::MAX as u32) - 1,
i32::MAX as u32,
(i32::MAX as u32) + 1,
u32::MAX - 1,
u32::MAX,
]));
let file = one_column_roundtrip(values, false, None);
];
let values = Arc::new(UInt32Array::from_iter_values(src.iter().cloned()));
let files = one_column_roundtrip(values, false);

for file in files {
// check statistics are valid
let reader = SerializedFileReader::new(file).unwrap();
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);
let row_group = metadata.row_group(0);

let mut row_offset = 0;
for row_group in metadata.row_groups() {
assert_eq!(row_group.num_columns(), 1);
let column = row_group.column(0);

let num_values = column.num_values() as usize;
let src_slice = &src[row_offset..row_offset + num_values];
row_offset += column.num_values() as usize;

let stats = column.statistics().unwrap();
assert!(stats.has_min_max_set());
if let Statistics::Int32(stats) = stats {
assert_eq!(*stats.min() as u32, u32::MIN);
assert_eq!(*stats.max() as u32, u32::MAX);
assert_eq!(*stats.min() as u32, *src_slice.iter().min().unwrap());
assert_eq!(*stats.max() as u32, *src_slice.iter().max().unwrap());
} else {
panic!("Statistics::Int32 missing")
}
}
}
}
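The "as u32" casts in the assertions above reflect how the values come back from the file: the UInt32 column is stored with Parquet's INT32 physical type, so the column statistics are Statistics::Int32 and values above i32::MAX reappear as negative i32 bit patterns. A small standalone illustration, not taken from the PR:

// Illustrative only: a u32 survives the trip through an i32 bit pattern.
fn main() {
    let stored: i32 = u32::MAX as i32; // the value held by the Int32 statistics
    assert_eq!(stored, -1);
    assert_eq!(stored as u32, u32::MAX); // recovered exactly, as the test asserts
}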

#[test]
fn u64_min_max() {
// check values roundtrip through parquet
let values = Arc::new(UInt64Array::from_iter_values(vec![
let src = vec![
u64::MIN,
u64::MIN + 1,
(i64::MAX as u64) - 1,
i64::MAX as u64,
(i64::MAX as u64) + 1,
u64::MAX - 1,
u64::MAX,
]));
let file = one_column_roundtrip(values, false, None);
];
let values = Arc::new(UInt64Array::from_iter_values(src.iter().cloned()));
let files = one_column_roundtrip(values, false);

for file in files {
// check statistics are valid
let reader = SerializedFileReader::new(file).unwrap();
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 1);
let row_group = metadata.row_group(0);

let mut row_offset = 0;
for row_group in metadata.row_groups() {
assert_eq!(row_group.num_columns(), 1);
let column = row_group.column(0);

let num_values = column.num_values() as usize;
let src_slice = &src[row_offset..row_offset + num_values];
row_offset += column.num_values() as usize;

let stats = column.statistics().unwrap();
assert!(stats.has_min_max_set());
if let Statistics::Int64(stats) = stats {
assert_eq!(*stats.min() as u64, u64::MIN);
assert_eq!(*stats.max() as u64, u64::MAX);
assert_eq!(*stats.min() as u64, *src_slice.iter().min().unwrap());
assert_eq!(*stats.max() as u64, *src_slice.iter().max().unwrap());
} else {
panic!("Statistics::Int64 missing")
}
}
}
}

#[test]
fn statistics_null_counts_only_nulls() {
// check that null-count statistics for "only NULL"-columns are correct
let values = Arc::new(UInt64Array::from(vec![None, None]));
let file = one_column_roundtrip(values, true, None);
let files = one_column_roundtrip(values, true);

for file in files {
// check statistics are valid
let reader = SerializedFileReader::new(file).unwrap();
let metadata = reader.metadata();
@@ -1835,6 +1903,7 @@ mod tests {
let stats = column.statistics().unwrap();
assert_eq!(stats.null_count(), 2);
}
}

#[test]
fn test_list_of_struct_roundtrip() {
Expand Down Expand Up @@ -1923,7 +1992,7 @@ mod tests {

let array = Arc::new(list_builder.finish());

one_column_roundtrip(array, true, Some(10));
one_column_roundtrip(array, true);
}

fn row_group_sizes(metadata: &ParquetMetaData) -> Vec<i64> {