From 81abc1a942cd13a92231f4a828077ad60fdabe36 Mon Sep 17 00:00:00 2001
From: Andrew Lamb
Date: Thu, 5 Jan 2023 06:37:25 -0500
Subject: [PATCH] Consolidate arrow ipc tests and increase coverage (#3427)

* Consolidate arrow ipc tests and increase coverage

* fix fmt
---
 arrow-integration-testing/src/lib.rs          |   7 +
 arrow-integration-testing/tests/ipc_reader.rs | 278 +++++--------
 arrow-integration-testing/tests/ipc_writer.rs | 389 ++++++++----------
 arrow/Cargo.toml                              |   4 -
 arrow/tests/ipc.rs                            |  61 ---
 5 files changed, 286 insertions(+), 453 deletions(-)
 delete mode 100644 arrow/tests/ipc.rs

diff --git a/arrow-integration-testing/src/lib.rs b/arrow-integration-testing/src/lib.rs
index 2edd0ed28389..b0c8b85afe2e 100644
--- a/arrow-integration-testing/src/lib.rs
+++ b/arrow-integration-testing/src/lib.rs
@@ -77,6 +77,13 @@ pub fn read_json_file(json_name: &str) -> Result<ArrowFile> {
 }
 
 /// Read gzipped JSON test file
+///
+/// For example, given the input:
+/// version = `0.17.1`
+/// path = `generated_union`
+///
+/// Returns the contents of
+/// `arrow-ipc-stream/integration/0.17.1/generated_union.json.gz`
 pub fn read_gzip_json(version: &str, path: &str) -> ArrowJson {
     use flate2::read::GzDecoder;
     use std::io::Read;
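The helper documented above is what the consolidated reader tests below lean on: open a generated file, then compare it batch-by-batch against the gzipped JSON fixture. A minimal sketch of that idiom, with an illustrative version/path pair (assumes the `arrow` crate with its `ipc` feature and the `testing` data submodule resolvable via `arrow_test_data()`):

use arrow::ipc::reader::FileReader;
use arrow::util::test_util::arrow_test_data;
use arrow_integration_testing::read_gzip_json;
use std::fs::File;

// Illustrative fixture; any pair under
// testing/arrow-ipc-stream/integration/<version>/ works the same way.
fn verify_one_fixture_sketch() {
    let testdata = arrow_test_data();
    let file = File::open(format!(
        "{testdata}/arrow-ipc-stream/integration/0.17.1/generated_union.arrow_file"
    ))
    .unwrap();
    let mut reader = FileReader::try_new(file, None).unwrap();

    // Loads arrow-ipc-stream/integration/0.17.1/generated_union.json.gz
    // and compares its schema and record batches against the reader
    let arrow_json = read_gzip_json("0.17.1", "generated_union");
    assert!(arrow_json.equals_reader(&mut reader).unwrap());
}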
diff --git a/arrow-integration-testing/tests/ipc_reader.rs b/arrow-integration-testing/tests/ipc_reader.rs
index 778d1ee77d3f..e185634f0dd4 100644
--- a/arrow-integration-testing/tests/ipc_reader.rs
+++ b/arrow-integration-testing/tests/ipc_reader.rs
@@ -15,16 +15,18 @@
 // specific language governing permissions and limitations
 // under the License.
 
+//! Tests for reading the content of [`FileReader`] and [`StreamReader`]
+//! in `testing/arrow-ipc-stream/integration/...`
+
 use arrow::ipc::reader::{FileReader, StreamReader};
 use arrow::util::test_util::arrow_test_data;
 use arrow_integration_testing::read_gzip_json;
 use std::fs::File;
 
 #[test]
-fn read_generated_files_014() {
+fn read_0_1_4() {
     let testdata = arrow_test_data();
     let version = "0.14.1";
-    // the test is repetitive, thus we can read all supported files at once
     let paths = vec![
         "generated_interval",
         "generated_datetime",
         "generated_dictionary",
         "generated_map",
         "generated_nested",
         "generated_primitive_no_batches",
         "generated_primitive_zerolength",
         "generated_primitive",
         "generated_decimal",
     ];
     paths.iter().for_each(|path| {
-        let file = File::open(format!(
-            "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
-            testdata, version, path
-        ))
-        .unwrap();
-
-        let mut reader = FileReader::try_new(file, None).unwrap();
+        verify_arrow_file(&testdata, version, path);
+        verify_arrow_stream(&testdata, version, path);
+    });
+}
 
-        // read expected JSON output
-        let arrow_json = read_gzip_json(version, path);
-        assert!(arrow_json.equals_reader(&mut reader).unwrap());
+#[test]
+fn read_0_1_7() {
+    let testdata = arrow_test_data();
+    let version = "0.17.1";
+    let paths = vec!["generated_union"];
+    paths.iter().for_each(|path| {
+        verify_arrow_file(&testdata, version, path);
+        verify_arrow_stream(&testdata, version, path);
     });
 }
 
 #[test]
 #[should_panic(expected = "Big Endian is not supported for Decimal!")]
-fn read_decimal_be_file_should_panic() {
+fn read_1_0_0_bigendian_decimal_should_panic() {
     let testdata = arrow_test_data();
-    let file = File::open(format!(
-        "{}/arrow-ipc-stream/integration/1.0.0-bigendian/generated_decimal.arrow_file",
-        testdata
-    ))
-    .unwrap();
-    FileReader::try_new(file, None).unwrap();
+    verify_arrow_file(&testdata, "1.0.0-bigendian", "generated_decimal");
 }
 
 #[test]
 #[should_panic(
     expected = "Last offset 687865856 of Utf8 is larger than values length 41"
 )]
-fn read_dictionary_be_not_implemented() {
+fn read_1_0_0_bigendian_dictionary_should_panic() {
     // The offsets are not translated for big-endian files
     // https://github.com/apache/arrow-rs/issues/859
     let testdata = arrow_test_data();
-    let file = File::open(format!(
-        "{}/arrow-ipc-stream/integration/1.0.0-bigendian/generated_dictionary.arrow_file",
-        testdata
-    ))
-    .unwrap();
-    FileReader::try_new(file, None).unwrap();
+    verify_arrow_file(&testdata, "1.0.0-bigendian", "generated_dictionary");
 }
 
 #[test]
-fn read_generated_be_files_should_work() {
-    // complementary to the previous test
+fn read_1_0_0_bigendian() {
     let testdata = arrow_test_data();
     let paths = vec![
         "generated_interval",
@@ -102,163 +95,119 @@ fn read_generated_be_files_should_work() {
         .unwrap();
         FileReader::try_new(file, None).unwrap();
+
+        // While the reader doesn't error, the values are not read correctly,
+        // so verifying the contents fails
+        //verify_arrow_file(&testdata, "1.0.0-bigendian", path);
     });
 }
 
 #[test]
-fn projection_should_work() {
-    // complementary to the previous test
+fn read_1_0_0_littleendian() {
     let testdata = arrow_test_data();
+    let version = "1.0.0-littleendian";
     let paths = vec![
-        "generated_interval",
         "generated_datetime",
+        "generated_custom_metadata",
+        "generated_decimal",
+        "generated_decimal256",
+        "generated_dictionary",
+        "generated_dictionary_unsigned",
+        "generated_duplicate_fieldnames",
+        "generated_extension",
+        "generated_interval",
         "generated_map",
+        // fails with
+        // thread 'read_1_0_0_littleendian' panicked at 'assertion failed: `(left == right)`
+        //"generated_map_non_canonical",
         "generated_nested",
-        "generated_null_trivial",
+        "generated_nested_dictionary",
+        "generated_nested_large_offsets",
         "generated_null",
+        "generated_null_trivial",
+        "generated_primitive",
+        "generated_primitive_large_offsets",
         "generated_primitive_no_batches",
         "generated_primitive_zerolength",
-        "generated_primitive",
+        "generated_recursive_nested",
+        "generated_union",
     ];
     paths.iter().for_each(|path| {
-        // We must use littleendian files here.
-        // The offsets are not translated for big-endian files
-        // https://github.com/apache/arrow-rs/issues/859
-        let file = File::open(format!(
-            "{}/arrow-ipc-stream/integration/1.0.0-littleendian/{}.arrow_file",
-            testdata, path
-        ))
-        .unwrap();
-
-        let reader = FileReader::try_new(file, Some(vec![0])).unwrap();
-        let datatype_0 = reader.schema().fields()[0].data_type().clone();
-        reader.for_each(|batch| {
-            let batch = batch.unwrap();
-            assert_eq!(batch.columns().len(), 1);
-            assert_eq!(datatype_0, batch.schema().fields()[0].data_type().clone());
-        });
+        verify_arrow_file(&testdata, version, path);
+        verify_arrow_stream(&testdata, version, path);
     });
 }
 
 #[test]
-fn read_generated_streams_014() {
+fn read_2_0_0_compression() {
     let testdata = arrow_test_data();
-    let version = "0.14.1";
+    let version = "2.0.0-compression";
+
     // the test is repetitive, thus we can read all supported files at once
-    let paths = vec![
-        "generated_interval",
-        "generated_datetime",
-        "generated_dictionary",
-        "generated_map",
-        "generated_nested",
-        "generated_primitive_no_batches",
-        "generated_primitive_zerolength",
-        "generated_primitive",
-        "generated_decimal",
-    ];
+    let paths = vec!["generated_lz4", "generated_zstd"];
     paths.iter().for_each(|path| {
-        let file = File::open(format!(
-            "{}/arrow-ipc-stream/integration/{}/{}.stream",
-            testdata, version, path
-        ))
-        .unwrap();
-
-        let mut reader = StreamReader::try_new(file, None).unwrap();
-
-        // read expected JSON output
-        let arrow_json = read_gzip_json(version, path);
-        assert!(arrow_json.equals_reader(&mut reader).unwrap());
-        // the next batch must be empty
-        assert!(reader.next().is_none());
-        // the stream must indicate that it's finished
-        assert!(reader.is_finished());
+        verify_arrow_file(&testdata, version, path);
+        verify_arrow_stream(&testdata, version, path);
     });
 }
 
-#[test]
-fn read_generated_files_100() {
-    let testdata = arrow_test_data();
-    let version = "1.0.0-littleendian";
-    // the test is repetitive, thus we can read all supported files at once
-    let paths = vec![
-        "generated_interval",
-        "generated_datetime",
-        "generated_dictionary",
-        "generated_map",
-        // "generated_map_non_canonical",
-        "generated_nested",
-        "generated_null_trivial",
-        "generated_null",
-        "generated_primitive_no_batches",
-        "generated_primitive_zerolength",
-        "generated_primitive",
-    ];
-    paths.iter().for_each(|path| {
-        let file = File::open(format!(
-            "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
-            testdata, version, path
-        ))
-        .unwrap();
-
+/// Verifies the arrow file format integration test
+///
+/// Input file:
+/// `arrow-ipc-stream/integration/<version>/<path>.arrow_file`
+///
+/// Verification json file:
+/// `arrow-ipc-stream/integration/<version>/<path>.json.gz`
+fn verify_arrow_file(testdata: &str, version: &str, path: &str) {
+    let filename = format!(
+        "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
+        testdata, version, path
+    );
+    println!("Verifying {filename}");
+
+    // Compare contents to the expected output format in JSON
+    {
+        println!(" verifying content");
+        let file = File::open(&filename).unwrap();
         let mut reader = FileReader::try_new(file, None).unwrap();
 
         // read expected JSON output
         let arrow_json = read_gzip_json(version, path);
         assert!(arrow_json.equals_reader(&mut reader).unwrap());
-    });
-}
+    }
 
-#[test]
-fn read_generated_streams_100() {
-    let testdata = arrow_test_data();
-    let version = "1.0.0-littleendian";
-    // the test is repetitive, thus we can read all supported files at once
-    let paths = vec![
-        "generated_interval",
-        "generated_datetime",
"generated_dictionary", - "generated_map", - // "generated_map_non_canonical", - "generated_nested", - "generated_null_trivial", - "generated_null", - "generated_primitive_no_batches", - "generated_primitive_zerolength", - "generated_primitive", - ]; - paths.iter().for_each(|path| { - let file = File::open(format!( - "{}/arrow-ipc-stream/integration/{}/{}.stream", - testdata, version, path - )) - .unwrap(); - - let mut reader = StreamReader::try_new(file, None).unwrap(); - - // read expected JSON output - let arrow_json = read_gzip_json(version, path); - assert!(arrow_json.equals_reader(&mut reader).unwrap()); - // the next batch must be empty - assert!(reader.next().is_none()); - // the stream must indicate that it's finished - assert!(reader.is_finished()); - }); + // Verify that projection works by selecting the first column + { + println!(" verifying projection"); + let file = File::open(&filename).unwrap(); + let reader = FileReader::try_new(file, Some(vec![0])).unwrap(); + let datatype_0 = reader.schema().fields()[0].data_type().clone(); + reader.for_each(|batch| { + let batch = batch.unwrap(); + assert_eq!(batch.columns().len(), 1); + assert_eq!(datatype_0, batch.schema().fields()[0].data_type().clone()); + }); + } } -#[test] -fn read_generated_streams_200() { - let testdata = arrow_test_data(); - let version = "2.0.0-compression"; - - // the test is repetitive, thus we can read all supported files at once - let paths = vec!["generated_lz4", "generated_zstd"]; - paths.iter().for_each(|path| { - let file = File::open(format!( - "{}/arrow-ipc-stream/integration/{}/{}.stream", - testdata, version, path - )) - .unwrap(); - +/// Verifies the arrow stream integration test +/// +/// Input file: +/// `arrow-ipc-stream/integration//.stream +/// +/// Verification json file +/// `arrow-ipc-stream/integration//.json.gz +fn verify_arrow_stream(testdata: &str, version: &str, path: &str) { + let filename = format!( + "{}/arrow-ipc-stream/integration/{}/{}.stream", + testdata, version, path + ); + println!("Verifying {filename}"); + + // Compare contents to the expected output format in JSON + { + println!(" verifying content"); + let file = File::open(&filename).unwrap(); let mut reader = StreamReader::try_new(file, None).unwrap(); // read expected JSON output @@ -268,26 +217,5 @@ fn read_generated_streams_200() { assert!(reader.next().is_none()); // the stream must indicate that it's finished assert!(reader.is_finished()); - }); -} - -#[test] -fn read_generated_files_200() { - let testdata = arrow_test_data(); - let version = "2.0.0-compression"; - // the test is repetitive, thus we can read all supported files at once - let paths = vec!["generated_lz4", "generated_zstd"]; - paths.iter().for_each(|path| { - let file = File::open(format!( - "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", - testdata, version, path - )) - .unwrap(); - - let mut reader = FileReader::try_new(file, None).unwrap(); - - // read expected JSON output - let arrow_json = read_gzip_json(version, path); - assert!(arrow_json.equals_reader(&mut reader).unwrap()); - }); + } } diff --git a/arrow-integration-testing/tests/ipc_writer.rs b/arrow-integration-testing/tests/ipc_writer.rs index 0aa17cd05c35..e429b5e5cb39 100644 --- a/arrow-integration-testing/tests/ipc_writer.rs +++ b/arrow-integration-testing/tests/ipc_writer.rs @@ -24,10 +24,9 @@ use std::fs::File; use std::io::Seek; #[test] -fn read_and_rewrite_generated_files_014() { +fn write_0_1_4() { let testdata = arrow_test_data(); let version = "0.14.1"; - // the test is 
diff --git a/arrow-integration-testing/tests/ipc_writer.rs b/arrow-integration-testing/tests/ipc_writer.rs
index 0aa17cd05c35..e429b5e5cb39 100644
--- a/arrow-integration-testing/tests/ipc_writer.rs
+++ b/arrow-integration-testing/tests/ipc_writer.rs
@@ -24,10 +24,9 @@
 use std::fs::File;
 use std::io::Seek;
 
 #[test]
-fn read_and_rewrite_generated_files_014() {
+fn write_0_1_4() {
     let testdata = arrow_test_data();
     let version = "0.14.1";
-    // the test is repetitive, thus we can read all supported files at once
     let paths = vec![
         "generated_interval",
         "generated_datetime",
         "generated_dictionary",
         "generated_map",
         "generated_nested",
         "generated_primitive_no_batches",
         "generated_primitive_zerolength",
         "generated_primitive",
         "generated_decimal",
     ];
     paths.iter().for_each(|path| {
-        let file = File::open(format!(
-            "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
-            testdata, version, path
-        ))
-        .unwrap();
-
-        let mut reader = FileReader::try_new(file, None).unwrap();
-
-        let mut file = tempfile::tempfile().unwrap();
-
-        // read and rewrite the file to a temp location
-        {
-            let mut writer = FileWriter::try_new(&mut file, &reader.schema()).unwrap();
-            while let Some(Ok(batch)) = reader.next() {
-                writer.write(&batch).unwrap();
-            }
-            writer.finish().unwrap();
-        }
-        file.rewind().unwrap();
-
-        let mut reader = FileReader::try_new(file, None).unwrap();
-
-        // read expected JSON output
-        let arrow_json = read_gzip_json(version, path);
-        assert!(arrow_json.equals_reader(&mut reader).unwrap());
+        roundtrip_arrow_file(&testdata, version, path);
+        roundtrip_arrow_stream(&testdata, version, path);
     });
 }
 
 #[test]
-fn read_and_rewrite_generated_streams_014() {
+fn write_0_1_7() {
     let testdata = arrow_test_data();
-    let version = "0.14.1";
-    // the test is repetitive, thus we can read all supported files at once
-    let paths = vec![
-        "generated_interval",
-        "generated_datetime",
-        "generated_dictionary",
-        "generated_map",
-        "generated_nested",
-        "generated_primitive_no_batches",
-        "generated_primitive_zerolength",
-        "generated_primitive",
-        "generated_decimal",
-    ];
+    let version = "0.17.1";
+    let paths = vec!["generated_union"];
     paths.iter().for_each(|path| {
-        let file = File::open(format!(
-            "{}/arrow-ipc-stream/integration/{}/{}.stream",
-            testdata, version, path
-        ))
-        .unwrap();
-
-        let reader = StreamReader::try_new(file, None).unwrap();
-
-        let mut file = tempfile::tempfile().unwrap();
-
-        // read and rewrite the stream to a temp location
-        {
-            let mut writer = StreamWriter::try_new(&mut file, &reader.schema()).unwrap();
-            reader.for_each(|batch| {
-                writer.write(&batch.unwrap()).unwrap();
-            });
-            writer.finish().unwrap();
-        }
-
-        file.rewind().unwrap();
-        let mut reader = StreamReader::try_new(file, None).unwrap();
-
-        // read expected JSON output
-        let arrow_json = read_gzip_json(version, path);
-        assert!(arrow_json.equals_reader(&mut reader).unwrap());
+        roundtrip_arrow_file(&testdata, version, path);
+        roundtrip_arrow_stream(&testdata, version, path);
     });
 }
 
 #[test]
-fn read_and_rewrite_generated_files_100() {
+fn write_1_0_0_littleendian() {
     let testdata = arrow_test_data();
     let version = "1.0.0-littleendian";
-    // the test is repetitive, thus we can read all supported files at once
     let paths = vec![
-        "generated_custom_metadata",
         "generated_datetime",
-        "generated_dictionary_unsigned",
+        "generated_custom_metadata",
+        "generated_decimal",
+        "generated_decimal256",
         "generated_dictionary",
-        // "generated_duplicate_fieldnames",
+        "generated_dictionary_unsigned",
+        "generated_duplicate_fieldnames",
+        "generated_extension",
         "generated_interval",
         "generated_map",
+        // thread 'write_1_0_0_littleendian' panicked at 'assertion failed: `(left == right)`
+        // "generated_map_non_canonical",
         "generated_nested",
-        // "generated_nested_large_offsets",
-        "generated_null_trivial",
+        "generated_nested_dictionary",
+        "generated_nested_large_offsets",
         "generated_null",
+        "generated_null_trivial",
+        "generated_primitive",
         "generated_primitive_large_offsets",
         "generated_primitive_no_batches",
         "generated_primitive_zerolength",
"generated_primitive", - // "generated_recursive_nested", + "generated_recursive_nested", + "generated_union", ]; paths.iter().for_each(|path| { - let file = File::open(format!( - "{}/arrow-ipc-stream/integration/{}/{}.arrow_file", - testdata, version, path - )) - .unwrap(); - - let mut reader = FileReader::try_new(file, None).unwrap(); - - let mut file = tempfile::tempfile().unwrap(); - - // read and rewrite the file to a temp location - { - // write IPC version 5 - let options = - IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5).unwrap(); - let mut writer = - FileWriter::try_new_with_options(&mut file, &reader.schema(), options) - .unwrap(); - while let Some(Ok(batch)) = reader.next() { - writer.write(&batch).unwrap(); - } - writer.finish().unwrap(); - } - - file.rewind().unwrap(); - let mut reader = FileReader::try_new(file, None).unwrap(); - - // read expected JSON output - let arrow_json = read_gzip_json(version, path); - assert!(arrow_json.equals_reader(&mut reader).unwrap()); + roundtrip_arrow_file(&testdata, version, path); + roundtrip_arrow_stream(&testdata, version, path); }); } #[test] -fn read_and_rewrite_generated_streams_100() { +fn write_2_0_0_compression() { let testdata = arrow_test_data(); - let version = "1.0.0-littleendian"; - // the test is repetitive, thus we can read all supported files at once - let paths = vec![ - "generated_custom_metadata", - "generated_datetime", - "generated_dictionary_unsigned", - "generated_dictionary", - // "generated_duplicate_fieldnames", - "generated_interval", - "generated_map", - "generated_nested", - // "generated_nested_large_offsets", - "generated_null_trivial", - "generated_null", - "generated_primitive_large_offsets", - "generated_primitive_no_batches", - "generated_primitive_zerolength", - "generated_primitive", - // "generated_recursive_nested", - ]; - paths.iter().for_each(|path| { - let file = File::open(format!( - "{}/arrow-ipc-stream/integration/{}/{}.stream", - testdata, version, path - )) - .unwrap(); - - let reader = StreamReader::try_new(file, None).unwrap(); + let version = "2.0.0-compression"; + let paths = vec!["generated_lz4", "generated_zstd"]; - let mut file = tempfile::tempfile().unwrap(); + // writer options for each compression type + let all_options = vec![ + IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5) + .unwrap() + .try_with_compression(Some(ipc::CompressionType::LZ4_FRAME)) + .unwrap(), + // write IPC version 5 with zstd + IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5) + .unwrap() + .try_with_compression(Some(ipc::CompressionType::ZSTD)) + .unwrap(), + ]; - // read and rewrite the stream to a temp location - { - let options = - IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5).unwrap(); - let mut writer = - StreamWriter::try_new_with_options(&mut file, &reader.schema(), options) - .unwrap(); - reader.for_each(|batch| { - writer.write(&batch.unwrap()).unwrap(); - }); - writer.finish().unwrap(); + paths.iter().for_each(|path| { + for options in &all_options { + println!("Using options {options:?}"); + roundtrip_arrow_file_with_options(&testdata, version, path, options.clone()); + roundtrip_arrow_stream_with_options( + &testdata, + version, + path, + options.clone(), + ); } - - file.rewind().unwrap(); - - let mut reader = StreamReader::try_new(file, None).unwrap(); - - // read expected JSON output - let arrow_json = read_gzip_json(version, path); - assert!(arrow_json.equals_reader(&mut reader).unwrap()); }); } -#[test] -fn 
-fn read_and_rewrite_compression_files_200() {
-    let testdata = arrow_test_data();
-    let version = "2.0.0-compression";
-    // the test is repetitive, thus we can read all supported files at once
-    let paths = vec!["generated_lz4", "generated_zstd"];
-    paths.iter().for_each(|path| {
-        let file = File::open(format!(
-            "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
-            testdata, version, path
-        ))
-        .unwrap();
+/// Verifies the arrow file writer by reading the contents of an
+/// arrow_file, writing it to a file, and then ensuring the contents
+/// match the expected json contents. It also verifies that
+/// RecordBatches read from the new file match the original.
+///
+/// Input file:
+/// `arrow-ipc-stream/integration/<version>/<path>.arrow_file`
+///
+/// Verification json file:
+/// `arrow-ipc-stream/integration/<version>/<path>.json.gz`
+fn roundtrip_arrow_file(testdata: &str, version: &str, path: &str) {
+    roundtrip_arrow_file_with_options(testdata, version, path, IpcWriteOptions::default())
+}
 
+fn roundtrip_arrow_file_with_options(
+    testdata: &str,
+    version: &str,
+    path: &str,
+    options: IpcWriteOptions,
+) {
+    let filename = format!(
+        "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
+        testdata, version, path
+    );
+    println!("Verifying {filename}");
+
+    let mut tempfile = tempfile::tempfile().unwrap();
+
+    {
+        println!(" writing to tempfile {tempfile:?}");
+        let file = File::open(&filename).unwrap();
         let mut reader = FileReader::try_new(file, None).unwrap();
 
-        let mut file = tempfile::tempfile().unwrap();
-
         // read and rewrite the file to a temp location
         {
-            // write IPC version 5
-            let options = IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
-                .unwrap()
-                .try_with_compression(Some(ipc::CompressionType::LZ4_FRAME))
-                .unwrap();
-
-            let mut writer =
-                FileWriter::try_new_with_options(&mut file, &reader.schema(), options)
-                    .unwrap();
+            let mut writer = FileWriter::try_new_with_options(
+                &mut tempfile,
+                &reader.schema(),
+                options,
+            )
+            .unwrap();
             while let Some(Ok(batch)) = reader.next() {
                 writer.write(&batch).unwrap();
             }
             writer.finish().unwrap();
         }
+    }
 
-        file.rewind().unwrap();
-        let mut reader = FileReader::try_new(file, None).unwrap();
+    {
+        println!(" checking rewrite against expected json");
+        tempfile.rewind().unwrap();
+        let mut reader = FileReader::try_new(&tempfile, None).unwrap();
 
-        // read expected JSON output
         let arrow_json = read_gzip_json(version, path);
         assert!(arrow_json.equals_reader(&mut reader).unwrap());
-    });
+    }
+
+    {
+        println!(" comparing rewrite with original");
+        let file = File::open(&filename).unwrap();
+        let reader = FileReader::try_new(file, None).unwrap();
+
+        tempfile.rewind().unwrap();
+        let rewrite_reader = FileReader::try_new(&tempfile, None).unwrap();
+
+        // Compare to original reader
+        reader.into_iter().zip(rewrite_reader.into_iter()).for_each(
+            |(batch1, batch2)| {
+                assert_eq!(batch1.unwrap(), batch2.unwrap());
+            },
+        );
+    }
 }
 
-#[test]
-fn read_and_rewrite_compression_stream_200() {
-    let testdata = arrow_test_data();
-    let version = "2.0.0-compression";
-    // the test is repetitive, thus we can read all supported files at once
-    let paths = vec!["generated_lz4", "generated_zstd"];
-    paths.iter().for_each(|path| {
-        let file = File::open(format!(
-            "{}/arrow-ipc-stream/integration/{}/{}.stream",
-            testdata, version, path
-        ))
-        .unwrap();
-
-        let reader = StreamReader::try_new(file, None).unwrap();
+/// Verifies the arrow stream writer by reading the contents of an
+/// arrow stream, writing it to a file, and then ensuring the contents
+/// match the expected json
+/// contents. It also verifies that
+/// RecordBatches read from the new file match the original.
+///
+/// Input file:
+/// `arrow-ipc-stream/integration/<version>/<path>.stream`
+///
+/// Verification json file:
+/// `arrow-ipc-stream/integration/<version>/<path>.json.gz`
+fn roundtrip_arrow_stream(testdata: &str, version: &str, path: &str) {
+    roundtrip_arrow_stream_with_options(
+        testdata,
+        version,
+        path,
+        IpcWriteOptions::default(),
+    )
+}
 
+fn roundtrip_arrow_stream_with_options(
+    testdata: &str,
+    version: &str,
+    path: &str,
+    options: IpcWriteOptions,
+) {
+    let filename = format!(
+        "{}/arrow-ipc-stream/integration/{}/{}.stream",
+        testdata, version, path
+    );
+    println!("Verifying {filename}");
+
+    let mut tempfile = tempfile::tempfile().unwrap();
+
+    {
+        println!(" writing to tempfile {tempfile:?}");
+        let file = File::open(&filename).unwrap();
+        let mut reader = StreamReader::try_new(file, None).unwrap();
 
-        let mut file = tempfile::tempfile().unwrap();
-
-        // read and rewrite the stream to a temp location
+        // read and rewrite the file to a temp location
         {
-            let options = IpcWriteOptions::try_new(8, false, ipc::MetadataVersion::V5)
-                .unwrap()
-                .try_with_compression(Some(ipc::CompressionType::ZSTD))
-                .unwrap();
-
-            let mut writer =
-                StreamWriter::try_new_with_options(&mut file, &reader.schema(), options)
-                    .unwrap();
-            reader.for_each(|batch| {
-                writer.write(&batch.unwrap()).unwrap();
-            });
+            let mut writer = StreamWriter::try_new_with_options(
+                &mut tempfile,
+                &reader.schema(),
+                options,
+            )
+            .unwrap();
+            while let Some(Ok(batch)) = reader.next() {
+                writer.write(&batch).unwrap();
+            }
             writer.finish().unwrap();
         }
+    }
 
-        file.rewind().unwrap();
+    {
+        println!(" checking rewrite against expected json");
+        tempfile.rewind().unwrap();
+        let mut reader = StreamReader::try_new(&tempfile, None).unwrap();
 
-        let mut reader = StreamReader::try_new(file, None).unwrap();
-
-        // read expected JSON output
         let arrow_json = read_gzip_json(version, path);
         assert!(arrow_json.equals_reader(&mut reader).unwrap());
-    });
+    }
+
+    {
+        println!(" comparing rewrite with original");
+        let file = File::open(&filename).unwrap();
+        let reader = StreamReader::try_new(file, None).unwrap();
+
+        tempfile.rewind().unwrap();
+        let rewrite_reader = StreamReader::try_new(&tempfile, None).unwrap();
+
+        // Compare to original reader
+        reader.into_iter().zip(rewrite_reader.into_iter()).for_each(
+            |(batch1, batch2)| {
+                assert_eq!(batch1.unwrap(), batch2.unwrap());
+            },
+        );
+    }
 }
diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml
index 202b4c4f40f6..d83637cbcea1 100644
--- a/arrow/Cargo.toml
+++ b/arrow/Cargo.toml
@@ -265,10 +265,6 @@ name = "lexsort"
 harness = false
 required-features = ["test_utils"]
 
-[[test]]
-name = "ipc"
-required-features = ["test_utils", "ipc"]
-
 [[test]]
 name = "csv"
 required-features = ["csv", "chrono-tz"]
diff --git a/arrow/tests/ipc.rs b/arrow/tests/ipc.rs
deleted file mode 100644
index abaa238ba5c6..000000000000
--- a/arrow/tests/ipc.rs
+++ /dev/null
@@ -1,61 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use arrow_ipc::reader::StreamReader;
-use arrow_ipc::writer::StreamWriter;
-use std::fs::File;
-use std::io::Seek;
-
-#[test]
-fn read_union_017() {
-    let testdata = arrow::util::test_util::arrow_test_data();
-    let data_file = File::open(format!(
-        "{}/arrow-ipc-stream/integration/0.17.1/generated_union.stream",
-        testdata,
-    ))
-    .unwrap();
-
-    let reader = StreamReader::try_new(data_file, None).unwrap();
-
-    let mut file = tempfile::tempfile().unwrap();
-    // read and rewrite the stream to a temp location
-    {
-        let mut writer = StreamWriter::try_new(&mut file, &reader.schema()).unwrap();
-        reader.for_each(|batch| {
-            writer.write(&batch.unwrap()).unwrap();
-        });
-        writer.finish().unwrap();
-    }
-    file.rewind().unwrap();
-
-    // Compare original file and rewrote file
-    let rewrite_reader = StreamReader::try_new(file, None).unwrap();
-
-    let data_file = File::open(format!(
-        "{}/arrow-ipc-stream/integration/0.17.1/generated_union.stream",
-        testdata,
-    ))
-    .unwrap();
-    let reader = StreamReader::try_new(data_file, None).unwrap();
-
-    reader
-        .into_iter()
-        .zip(rewrite_reader.into_iter())
-        .for_each(|(batch1, batch2)| {
-            assert_eq!(batch1.unwrap(), batch2.unwrap());
-        });
-}
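The `roundtrip_arrow_file` / `roundtrip_arrow_stream` helpers above subsume the write-then-reread check that this deleted `arrow/tests/ipc.rs` performed for a single fixture. A minimal sketch of the same roundtrip invariant with made-up in-memory data (assumes `arrow` with the `ipc` feature and `tempfile`):

use arrow::array::Int32Array;
use arrow::datatypes::{DataType, Field, Schema};
use arrow::ipc::reader::StreamReader;
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;
use std::io::Seek;
use std::sync::Arc;

// Write a batch as an IPC stream to a tempfile, read it back, and
// require the roundtrip to be lossless -- the same invariant the
// roundtrip_arrow_* helpers check against the integration files.
fn stream_roundtrip_sketch() {
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )
    .unwrap();

    let mut file = tempfile::tempfile().unwrap();
    {
        let mut writer = StreamWriter::try_new(&mut file, &schema).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
    }

    file.rewind().unwrap();
    let mut reader = StreamReader::try_new(file, None).unwrap();
    assert_eq!(reader.next().unwrap().unwrap(), batch);
    // a finished stream yields no further batches
    assert!(reader.next().is_none());
    assert!(reader.is_finished());
}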