Consolidate arrow ipc tests and increase coverage (#3427)
* Consolidate arrow ipc tests and increase coverage

* fix fmt
alamb authored Jan 5, 2023
1 parent e256e3d commit 81abc1a
Showing 5 changed files with 286 additions and 453 deletions.
7 changes: 7 additions & 0 deletions arrow-integration-testing/src/lib.rs
@@ -77,6 +77,13 @@ pub fn read_json_file(json_name: &str) -> Result<ArrowFile> {
}

/// Read gzipped JSON test file
///
/// For example, given the input:
/// version = `0.17.1`
/// path = `generated_union`
///
/// Returns the contents of
/// `arrow-ipc-stream/integration/0.17.1/generated_union.json.gz`
pub fn read_gzip_json(version: &str, path: &str) -> ArrowJson {
use flate2::read::GzDecoder;
use std::io::Read;
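A minimal usage sketch of how `read_gzip_json` pairs with an IPC reader (assumes the `arrow` and `arrow-integration-testing` crates plus an `ARROW_TEST_DATA` checkout; illustration only, not part of this commit):

use arrow::ipc::reader::FileReader;
use arrow::util::test_util::arrow_test_data;
use arrow_integration_testing::read_gzip_json;
use std::fs::File;

fn main() {
    // Locate the testing data directory (requires ARROW_TEST_DATA to be set)
    let testdata = arrow_test_data();

    // Open the 0.17.1 generated_union file in the Arrow IPC file format
    let file = File::open(format!(
        "{testdata}/arrow-ipc-stream/integration/0.17.1/generated_union.arrow_file"
    ))
    .unwrap();
    let mut reader = FileReader::try_new(file, None).unwrap();

    // Read the matching gzipped JSON "golden" file and compare it,
    // batch by batch, against what the reader produces
    let arrow_json = read_gzip_json("0.17.1", "generated_union");
    assert!(arrow_json.equals_reader(&mut reader).unwrap());
}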
278 changes: 103 additions & 175 deletions arrow-integration-testing/tests/ipc_reader.rs
@@ -15,16 +15,18 @@
// specific language governing permissions and limitations
// under the License.

//! Tests for reading the content of [`FileReader`] and [`StreamReader`]
//! in `testing/arrow-ipc-stream/integration/...`

use arrow::ipc::reader::{FileReader, StreamReader};
use arrow::util::test_util::arrow_test_data;
use arrow_integration_testing::read_gzip_json;
use std::fs::File;

#[test]
fn read_generated_files_014() {
fn read_0_1_4() {
let testdata = arrow_test_data();
let version = "0.14.1";
// the test is repetitive, thus we can read all supported files at once
let paths = vec![
"generated_interval",
"generated_datetime",
@@ -37,51 +39,42 @@ fn read_generated_files_014() {
"generated_decimal",
];
paths.iter().for_each(|path| {
let file = File::open(format!(
"{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
testdata, version, path
))
.unwrap();

let mut reader = FileReader::try_new(file, None).unwrap();
verify_arrow_file(&testdata, version, path);
verify_arrow_stream(&testdata, version, path);
});
}

// read expected JSON output
let arrow_json = read_gzip_json(version, path);
assert!(arrow_json.equals_reader(&mut reader).unwrap());
#[test]
fn read_0_1_7() {
let testdata = arrow_test_data();
let version = "0.17.1";
let paths = vec!["generated_union"];
paths.iter().for_each(|path| {
verify_arrow_file(&testdata, version, path);
verify_arrow_stream(&testdata, version, path);
});
}

#[test]
#[should_panic(expected = "Big Endian is not supported for Decimal!")]
fn read_decimal_be_file_should_panic() {
fn read_1_0_0_bigendian_decimal_should_panic() {
let testdata = arrow_test_data();
let file = File::open(format!(
"{}/arrow-ipc-stream/integration/1.0.0-bigendian/generated_decimal.arrow_file",
testdata
))
.unwrap();
FileReader::try_new(file, None).unwrap();
verify_arrow_file(&testdata, "1.0.0-bigendian", "generated_decimal");
}

#[test]
#[should_panic(
expected = "Last offset 687865856 of Utf8 is larger than values length 41"
)]
fn read_dictionary_be_not_implemented() {
fn read_1_0_0_bigendian_dictionary_should_panic() {
// The offsets are not translated for big-endian files
// https://github.com/apache/arrow-rs/issues/859
let testdata = arrow_test_data();
let file = File::open(format!(
"{}/arrow-ipc-stream/integration/1.0.0-bigendian/generated_dictionary.arrow_file",
testdata
))
.unwrap();
FileReader::try_new(file, None).unwrap();
verify_arrow_file(&testdata, "1.0.0-bigendian", "generated_dictionary");
}

#[test]
fn read_generated_be_files_should_work() {
// complementary to the previous test
fn read_1_0_0_bigendian() {
let testdata = arrow_test_data();
let paths = vec![
"generated_interval",
@@ -102,163 +95,119 @@ fn read_generated_be_files_should_work() {
.unwrap();

FileReader::try_new(file, None).unwrap();

// While the reader doesn't error, the values are not read correctly,
// so verifying the contents fails
//verify_arrow_file(&testdata, "1.0.0-bigendian", path);
});
}

#[test]
fn projection_should_work() {
// complementary to the previous test
fn read_1_0_0_littleendian() {
let testdata = arrow_test_data();
let version = "1.0.0-littleendian";
let paths = vec![
"generated_interval",
"generated_datetime",
"generated_custom_metadata",
"generated_decimal",
"generated_decimal256",
"generated_dictionary",
"generated_dictionary_unsigned",
"generated_duplicate_fieldnames",
"generated_extension",
"generated_interval",
"generated_map",
// fails with
// thread 'read_1_0_0_littleendian' panicked at 'assertion failed: `(left == right)`
//"generated_map_non_canonical",
"generated_nested",
"generated_null_trivial",
"generated_nested_dictionary",
"generated_nested_large_offsets",
"generated_null",
"generated_null_trivial",
"generated_primitive",
"generated_primitive_large_offsets",
"generated_primitive_no_batches",
"generated_primitive_zerolength",
"generated_primitive",
"generated_recursive_nested",
"generated_union",
];
paths.iter().for_each(|path| {
// We must use littleendian files here.
// The offsets are not translated for big-endian files
// https://github.com/apache/arrow-rs/issues/859
let file = File::open(format!(
"{}/arrow-ipc-stream/integration/1.0.0-littleendian/{}.arrow_file",
testdata, path
))
.unwrap();

let reader = FileReader::try_new(file, Some(vec![0])).unwrap();
let datatype_0 = reader.schema().fields()[0].data_type().clone();
reader.for_each(|batch| {
let batch = batch.unwrap();
assert_eq!(batch.columns().len(), 1);
assert_eq!(datatype_0, batch.schema().fields()[0].data_type().clone());
});
verify_arrow_file(&testdata, version, path);
verify_arrow_stream(&testdata, version, path);
});
}

#[test]
fn read_generated_streams_014() {
fn read_2_0_0_compression() {
let testdata = arrow_test_data();
let version = "0.14.1";
let version = "2.0.0-compression";

// the test is repetitive, thus we can read all supported files at once
let paths = vec![
"generated_interval",
"generated_datetime",
"generated_dictionary",
"generated_map",
"generated_nested",
"generated_primitive_no_batches",
"generated_primitive_zerolength",
"generated_primitive",
"generated_decimal",
];
let paths = vec!["generated_lz4", "generated_zstd"];
paths.iter().for_each(|path| {
let file = File::open(format!(
"{}/arrow-ipc-stream/integration/{}/{}.stream",
testdata, version, path
))
.unwrap();

let mut reader = StreamReader::try_new(file, None).unwrap();

// read expected JSON output
let arrow_json = read_gzip_json(version, path);
assert!(arrow_json.equals_reader(&mut reader).unwrap());
// the next batch must be empty
assert!(reader.next().is_none());
// the stream must indicate that it's finished
assert!(reader.is_finished());
verify_arrow_file(&testdata, version, path);
verify_arrow_stream(&testdata, version, path);
});
}

#[test]
fn read_generated_files_100() {
let testdata = arrow_test_data();
let version = "1.0.0-littleendian";
// the test is repetitive, thus we can read all supported files at once
let paths = vec![
"generated_interval",
"generated_datetime",
"generated_dictionary",
"generated_map",
// "generated_map_non_canonical",
"generated_nested",
"generated_null_trivial",
"generated_null",
"generated_primitive_no_batches",
"generated_primitive_zerolength",
"generated_primitive",
];
paths.iter().for_each(|path| {
let file = File::open(format!(
"{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
testdata, version, path
))
.unwrap();

/// Verifies the arrow file format integration test
///
/// Input file:
/// `arrow-ipc-stream/integration/<version>/<path>.arrow_file`
///
/// Verification JSON file:
/// `arrow-ipc-stream/integration/<version>/<path>.json.gz`
fn verify_arrow_file(testdata: &str, version: &str, path: &str) {
let filename = format!(
"{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
testdata, version, path
);
println!("Verifying {filename}");

// Compare contents to the expected output format in JSON
{
println!(" verifying content");
let file = File::open(&filename).unwrap();
let mut reader = FileReader::try_new(file, None).unwrap();

// read expected JSON output
let arrow_json = read_gzip_json(version, path);
assert!(arrow_json.equals_reader(&mut reader).unwrap());
});
}
}

#[test]
fn read_generated_streams_100() {
let testdata = arrow_test_data();
let version = "1.0.0-littleendian";
// the test is repetitive, thus we can read all supported files at once
let paths = vec![
"generated_interval",
"generated_datetime",
"generated_dictionary",
"generated_map",
// "generated_map_non_canonical",
"generated_nested",
"generated_null_trivial",
"generated_null",
"generated_primitive_no_batches",
"generated_primitive_zerolength",
"generated_primitive",
];
paths.iter().for_each(|path| {
let file = File::open(format!(
"{}/arrow-ipc-stream/integration/{}/{}.stream",
testdata, version, path
))
.unwrap();

let mut reader = StreamReader::try_new(file, None).unwrap();

// read expected JSON output
let arrow_json = read_gzip_json(version, path);
assert!(arrow_json.equals_reader(&mut reader).unwrap());
// the next batch must be empty
assert!(reader.next().is_none());
// the stream must indicate that it's finished
assert!(reader.is_finished());
});
// Verify that projection works by selecting the first column
{
println!(" verifying projection");
let file = File::open(&filename).unwrap();
let reader = FileReader::try_new(file, Some(vec![0])).unwrap();
let datatype_0 = reader.schema().fields()[0].data_type().clone();
reader.for_each(|batch| {
let batch = batch.unwrap();
assert_eq!(batch.columns().len(), 1);
assert_eq!(datatype_0, batch.schema().fields()[0].data_type().clone());
});
}
}

#[test]
fn read_generated_streams_200() {
let testdata = arrow_test_data();
let version = "2.0.0-compression";

// the test is repetitive, thus we can read all supported files at once
let paths = vec!["generated_lz4", "generated_zstd"];
paths.iter().for_each(|path| {
let file = File::open(format!(
"{}/arrow-ipc-stream/integration/{}/{}.stream",
testdata, version, path
))
.unwrap();

/// Verifies the arrow stream integration test
///
/// Input file:
/// `arrow-ipc-stream/integration/<version>/<path>.stream`
///
/// Verification JSON file:
/// `arrow-ipc-stream/integration/<version>/<path>.json.gz`
fn verify_arrow_stream(testdata: &str, version: &str, path: &str) {
let filename = format!(
"{}/arrow-ipc-stream/integration/{}/{}.stream",
testdata, version, path
);
println!("Verifying {filename}");

// Compare contents to the expected output format in JSON
{
println!(" verifying content");
let file = File::open(&filename).unwrap();
let mut reader = StreamReader::try_new(file, None).unwrap();

// read expected JSON output
@@ -268,26 +217,5 @@ fn read_generated_streams_200() {
assert!(reader.next().is_none());
// the stream must indicate that it's finished
assert!(reader.is_finished());
});
}

#[test]
fn read_generated_files_200() {
let testdata = arrow_test_data();
let version = "2.0.0-compression";
// the test is repetitive, thus we can read all supported files at once
let paths = vec!["generated_lz4", "generated_zstd"];
paths.iter().for_each(|path| {
let file = File::open(format!(
"{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
testdata, version, path
))
.unwrap();

let mut reader = FileReader::try_new(file, None).unwrap();

// read expected JSON output
let arrow_json = read_gzip_json(version, path);
assert!(arrow_json.equals_reader(&mut reader).unwrap());
});
}
}
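With both helpers in place, covering a new integration-data version reduces to a few lines. A hypothetical sketch (the version string and test name are placeholders, not part of this commit):

#[test]
fn read_x_y_z() {
    let testdata = arrow_test_data();
    // Placeholder for a future data directory under arrow-ipc-stream/integration/
    let version = "X.Y.Z-littleendian";
    let paths = vec!["generated_primitive", "generated_nested"];
    paths.iter().for_each(|path| {
        verify_arrow_file(&testdata, version, path);
        verify_arrow_stream(&testdata, version, path);
    });
}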