diff --git a/src/io/parquet/read/deserialize/binary/basic.rs b/src/io/parquet/read/deserialize/binary/basic.rs index 63aa8ff642a..3732839b7b0 100644 --- a/src/io/parquet/read/deserialize/binary/basic.rs +++ b/src/io/parquet/read/deserialize/binary/basic.rs @@ -426,16 +426,18 @@ pub struct Iter, I: DataPages> { data_type: DataType, items: VecDeque<(Binary, MutableBitmap)>, chunk_size: Option, + remaining: usize, phantom_a: std::marker::PhantomData, } impl, I: DataPages> Iter { - pub fn new(iter: I, data_type: DataType, chunk_size: Option) -> Self { + pub fn new(iter: I, data_type: DataType, chunk_size: Option, num_rows: usize) -> Self { Self { iter, data_type, items: VecDeque::new(), chunk_size, + remaining: num_rows, phantom_a: Default::default(), } } @@ -448,6 +450,7 @@ impl, I: DataPages> Iterator for Iter let maybe_state = next( &mut self.iter, &mut self.items, + &mut self.remaining, self.chunk_size, &BinaryDecoder::::default(), ); diff --git a/src/io/parquet/read/deserialize/binary/dictionary.rs b/src/io/parquet/read/deserialize/binary/dictionary.rs index bf656929e65..16ccc372fec 100644 --- a/src/io/parquet/read/deserialize/binary/dictionary.rs +++ b/src/io/parquet/read/deserialize/binary/dictionary.rs @@ -26,6 +26,7 @@ where values_data_type: DataType, values: Dict, items: VecDeque<(Vec, MutableBitmap)>, + remaining: usize, chunk_size: Option, phantom: std::marker::PhantomData, } @@ -36,7 +37,7 @@ where O: Offset, I: DataPages, { - pub fn new(iter: I, data_type: DataType, chunk_size: Option) -> Self { + pub fn new(iter: I, data_type: DataType, num_rows: usize, chunk_size: Option) -> Self { let values_data_type = match &data_type { DataType::Dictionary(_, values, _) => values.as_ref().clone(), _ => unreachable!(), @@ -47,6 +48,7 @@ where values_data_type, values: Dict::Empty, items: VecDeque::new(), + remaining: num_rows, chunk_size, phantom: std::marker::PhantomData, } @@ -93,6 +95,7 @@ where &mut self.items, &mut self.values, self.data_type.clone(), + &mut self.remaining, self.chunk_size, |dict| read_dict::(self.values_data_type.clone(), dict), ); diff --git a/src/io/parquet/read/deserialize/binary/mod.rs b/src/io/parquet/read/deserialize/binary/mod.rs index 6eee318772d..5ea524eaaee 100644 --- a/src/io/parquet/read/deserialize/binary/mod.rs +++ b/src/io/parquet/read/deserialize/binary/mod.rs @@ -23,6 +23,7 @@ pub fn iter_to_arrays_nested<'a, O, A, I>( iter: I, init: Vec, data_type: DataType, + num_rows: usize, chunk_size: Option, ) -> NestedArrayIter<'a> where @@ -31,7 +32,7 @@ where O: Offset, { Box::new( - ArrayIterator::::new(iter, init, data_type, chunk_size).map(|x| { + ArrayIterator::::new(iter, init, data_type, num_rows, chunk_size).map(|x| { x.map(|(mut nested, array)| { let _ = nested.nested.pop().unwrap(); // the primitive let values = Box::new(array) as Box; diff --git a/src/io/parquet/read/deserialize/binary/nested.rs b/src/io/parquet/read/deserialize/binary/nested.rs index f7b16fe03ef..e4fb36b944a 100644 --- a/src/io/parquet/read/deserialize/binary/nested.rs +++ b/src/io/parquet/read/deserialize/binary/nested.rs @@ -147,6 +147,7 @@ pub struct ArrayIterator, I: DataPages> { init: Vec, items: VecDeque<(NestedState, (Binary, MutableBitmap))>, chunk_size: Option, + remaining: usize, phantom_a: std::marker::PhantomData, } @@ -155,6 +156,7 @@ impl, I: DataPages> ArrayIterator { iter: I, init: Vec, data_type: DataType, + num_rows: usize, chunk_size: Option, ) -> Self { Self { @@ -163,6 +165,7 @@ impl, I: DataPages> ArrayIterator { init, items: VecDeque::new(), chunk_size, + 
remaining: num_rows, phantom_a: Default::default(), } } @@ -175,6 +178,7 @@ impl, I: DataPages> Iterator for ArrayIterator let maybe_state = next( &mut self.iter, &mut self.items, + &mut self.remaining, &self.init, self.chunk_size, &BinaryDecoder::::default(), diff --git a/src/io/parquet/read/deserialize/boolean/basic.rs b/src/io/parquet/read/deserialize/boolean/basic.rs index d3189b23f64..572565a6d5d 100644 --- a/src/io/parquet/read/deserialize/boolean/basic.rs +++ b/src/io/parquet/read/deserialize/boolean/basic.rs @@ -192,15 +192,17 @@ pub struct Iter { data_type: DataType, items: VecDeque<(MutableBitmap, MutableBitmap)>, chunk_size: Option, + remaining: usize, } impl Iter { - pub fn new(iter: I, data_type: DataType, chunk_size: Option) -> Self { + pub fn new(iter: I, data_type: DataType, chunk_size: Option, num_rows: usize) -> Self { Self { iter, data_type, items: VecDeque::new(), chunk_size, + remaining: num_rows, } } } @@ -212,6 +214,7 @@ impl Iterator for Iter { let maybe_state = next( &mut self.iter, &mut self.items, + &mut self.remaining, self.chunk_size, &BooleanDecoder::default(), ); diff --git a/src/io/parquet/read/deserialize/boolean/mod.rs b/src/io/parquet/read/deserialize/boolean/mod.rs index dde0a14852a..5ff68edc1f0 100644 --- a/src/io/parquet/read/deserialize/boolean/mod.rs +++ b/src/io/parquet/read/deserialize/boolean/mod.rs @@ -13,16 +13,19 @@ pub use self::basic::Iter; pub fn iter_to_arrays_nested<'a, I: 'a>( iter: I, init: Vec, + num_rows: usize, chunk_size: Option, ) -> NestedArrayIter<'a> where I: DataPages, { - Box::new(ArrayIterator::new(iter, init, chunk_size).map(|x| { - x.map(|(mut nested, array)| { - let _ = nested.nested.pop().unwrap(); // the primitive - let values = array.boxed(); - (nested, values) - }) - })) + Box::new( + ArrayIterator::new(iter, init, num_rows, chunk_size).map(|x| { + x.map(|(mut nested, array)| { + let _ = nested.nested.pop().unwrap(); // the primitive + let values = array.boxed(); + (nested, values) + }) + }), + ) } diff --git a/src/io/parquet/read/deserialize/boolean/nested.rs b/src/io/parquet/read/deserialize/boolean/nested.rs index 8c20694ed5c..6df69b59443 100644 --- a/src/io/parquet/read/deserialize/boolean/nested.rs +++ b/src/io/parquet/read/deserialize/boolean/nested.rs @@ -105,15 +105,17 @@ pub struct ArrayIterator { iter: I, init: Vec, items: VecDeque<(NestedState, (MutableBitmap, MutableBitmap))>, + remaining: usize, chunk_size: Option, } impl ArrayIterator { - pub fn new(iter: I, init: Vec, chunk_size: Option) -> Self { + pub fn new(iter: I, init: Vec, num_rows: usize, chunk_size: Option) -> Self { Self { iter, init, items: VecDeque::new(), + remaining: num_rows, chunk_size, } } @@ -130,6 +132,7 @@ impl Iterator for ArrayIterator { let maybe_state = next( &mut self.iter, &mut self.items, + &mut self.remaining, &self.init, self.chunk_size, &BooleanDecoder::default(), diff --git a/src/io/parquet/read/deserialize/dictionary.rs b/src/io/parquet/read/deserialize/dictionary.rs index f89a5e06d5a..be7ca1dc4dd 100644 --- a/src/io/parquet/read/deserialize/dictionary.rs +++ b/src/io/parquet/read/deserialize/dictionary.rs @@ -255,6 +255,7 @@ pub(super) fn next_dict< items: &mut VecDeque<(Vec, MutableBitmap)>, dict: &mut Dict, data_type: DataType, + remaining: &mut usize, chunk_size: Option, read_dict: F, ) -> MaybeNext>> { @@ -286,7 +287,13 @@ pub(super) fn next_dict< Err(e) => return MaybeNext::Some(Err(e)), }; - utils::extend_from_new_page(page, chunk_size, items, &PrimitiveDecoder::::default()); + utils::extend_from_new_page( + page, + 
chunk_size, + items, + remaining, + &PrimitiveDecoder::::default(), + ); if items.front().unwrap().len() < chunk_size.unwrap_or(usize::MAX) { MaybeNext::More diff --git a/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs b/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs index 5f260967e71..b68df58613a 100644 --- a/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs +++ b/src/io/parquet/read/deserialize/fixed_size_binary/basic.rs @@ -286,10 +286,11 @@ pub struct Iter { size: usize, items: VecDeque<(FixedSizeBinary, MutableBitmap)>, chunk_size: Option, + remaining: usize, } impl Iter { - pub fn new(iter: I, data_type: DataType, chunk_size: Option) -> Self { + pub fn new(iter: I, data_type: DataType, num_rows: usize, chunk_size: Option) -> Self { let size = FixedSizeBinaryArray::get_size(&data_type); Self { iter, @@ -297,6 +298,7 @@ impl Iter { size, items: VecDeque::new(), chunk_size, + remaining: num_rows, } } } @@ -308,6 +310,7 @@ impl Iterator for Iter { let maybe_state = next( &mut self.iter, &mut self.items, + &mut self.remaining, self.chunk_size, &BinaryDecoder { size: self.size }, ); diff --git a/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs b/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs index 4d44ef4f724..7ce28bd3e79 100644 --- a/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs +++ b/src/io/parquet/read/deserialize/fixed_size_binary/dictionary.rs @@ -25,6 +25,7 @@ where values_data_type: DataType, values: Dict, items: VecDeque<(Vec, MutableBitmap)>, + remaining: usize, chunk_size: Option, } @@ -33,7 +34,7 @@ where K: DictionaryKey, I: DataPages, { - pub fn new(iter: I, data_type: DataType, chunk_size: Option) -> Self { + pub fn new(iter: I, data_type: DataType, num_rows: usize, chunk_size: Option) -> Self { let values_data_type = match &data_type { DataType::Dictionary(_, values, _) => values.as_ref().clone(), _ => unreachable!(), @@ -44,6 +45,7 @@ where values_data_type, values: Dict::Empty, items: VecDeque::new(), + remaining: num_rows, chunk_size, } } @@ -76,6 +78,7 @@ where &mut self.items, &mut self.values, self.data_type.clone(), + &mut self.remaining, self.chunk_size, |dict| read_dict(self.values_data_type.clone(), dict), ); diff --git a/src/io/parquet/read/deserialize/mod.rs b/src/io/parquet/read/deserialize/mod.rs index 3d229555b79..b80b9adbfb7 100644 --- a/src/io/parquet/read/deserialize/mod.rs +++ b/src/io/parquet/read/deserialize/mod.rs @@ -96,6 +96,7 @@ fn columns_to_iter_recursive<'a, I: 'a>( mut types: Vec<&PrimitiveType>, field: Field, mut init: Vec, + num_rows: usize, chunk_size: Option, ) -> Result> where @@ -111,6 +112,7 @@ where types.pop().unwrap(), field.data_type, chunk_size, + num_rows, )? 
.map(|x| Ok((NestedState::new(vec![]), x?))), )); @@ -120,7 +122,7 @@ where Boolean => { init.push(InitNested::Primitive(field.is_nullable)); types.pop(); - boolean::iter_to_arrays_nested(columns.pop().unwrap(), init, chunk_size) + boolean::iter_to_arrays_nested(columns.pop().unwrap(), init, num_rows, chunk_size) } Primitive(Int8) => { init.push(InitNested::Primitive(field.is_nullable)); @@ -129,6 +131,7 @@ where columns.pop().unwrap(), init, field.data_type().clone(), + num_rows, chunk_size, |x: i32| x as i8, ) @@ -140,6 +143,7 @@ where columns.pop().unwrap(), init, field.data_type().clone(), + num_rows, chunk_size, |x: i32| x as i16, ) @@ -151,6 +155,7 @@ where columns.pop().unwrap(), init, field.data_type().clone(), + num_rows, chunk_size, |x: i32| x, ) @@ -162,6 +167,7 @@ where columns.pop().unwrap(), init, field.data_type().clone(), + num_rows, chunk_size, |x: i64| x, ) @@ -173,6 +179,7 @@ where columns.pop().unwrap(), init, field.data_type().clone(), + num_rows, chunk_size, |x: i32| x as u8, ) @@ -184,6 +191,7 @@ where columns.pop().unwrap(), init, field.data_type().clone(), + num_rows, chunk_size, |x: i32| x as u16, ) @@ -196,6 +204,7 @@ where columns.pop().unwrap(), init, field.data_type().clone(), + num_rows, chunk_size, |x: i32| x as u32, ), @@ -204,6 +213,7 @@ where columns.pop().unwrap(), init, field.data_type().clone(), + num_rows, chunk_size, |x: i64| x as u32, ), @@ -221,6 +231,7 @@ where columns.pop().unwrap(), init, field.data_type().clone(), + num_rows, chunk_size, |x: i64| x as u64, ) @@ -232,6 +243,7 @@ where columns.pop().unwrap(), init, field.data_type().clone(), + num_rows, chunk_size, |x: f32| x, ) @@ -243,6 +255,7 @@ where columns.pop().unwrap(), init, field.data_type().clone(), + num_rows, chunk_size, |x: f64| x, ) @@ -254,6 +267,7 @@ where columns.pop().unwrap(), init, field.data_type().clone(), + num_rows, chunk_size, ) } @@ -264,6 +278,7 @@ where columns.pop().unwrap(), init, field.data_type().clone(), + num_rows, chunk_size, ) } @@ -274,6 +289,7 @@ where columns.pop().unwrap(), init, field.data_type().clone(), + num_rows, chunk_size, ) } @@ -284,6 +300,7 @@ where columns.pop().unwrap(), init, field.data_type().clone(), + num_rows, chunk_size, ) } @@ -298,6 +315,7 @@ where types, inner.as_ref().clone(), init, + num_rows, chunk_size, )?; let iter = iter.map(move |x| { @@ -317,7 +335,14 @@ where let n = n_columns(&f.data_type); let columns = columns.drain(columns.len() - n..).collect(); let types = types.drain(types.len() - n..).collect(); - columns_to_iter_recursive(columns, types, f.clone(), init, chunk_size) + columns_to_iter_recursive( + columns, + types, + f.clone(), + init, + num_rows, + chunk_size, + ) }) .collect::>>()?; let columns = columns.into_iter().rev().collect(); @@ -330,6 +355,7 @@ where types, inner.as_ref().clone(), init, + num_rows, chunk_size, )?; Box::new(iter.map(move |x| { @@ -392,12 +418,13 @@ pub fn column_iter_to_arrays<'a, I: 'a>( types: Vec<&PrimitiveType>, field: Field, chunk_size: Option, + num_rows: usize, ) -> Result> where I: DataPages, { Ok(Box::new( - columns_to_iter_recursive(columns, types, field, vec![], chunk_size)? + columns_to_iter_recursive(columns, types, field, vec![], num_rows, chunk_size)? 
.map(|x| x.map(|x| x.1)), )) } diff --git a/src/io/parquet/read/deserialize/nested_utils.rs b/src/io/parquet/read/deserialize/nested_utils.rs index f4188d2816a..a4a52fb3db3 100644 --- a/src/io/parquet/read/deserialize/nested_utils.rs +++ b/src/io/parquet/read/deserialize/nested_utils.rs @@ -340,6 +340,7 @@ fn extend<'a, D: NestedDecoder<'a>>( page: &'a DataPage, init: &[InitNested], items: &mut VecDeque<(NestedState, D::DecodedState)>, + remaining: &mut usize, decoder: &D, chunk_size: Option, ) -> Result<()> { @@ -347,21 +348,20 @@ fn extend<'a, D: NestedDecoder<'a>>( let mut page = NestedPage::try_new(page)?; let capacity = chunk_size.unwrap_or(0); - let chunk_size = chunk_size.unwrap_or(usize::MAX); + let chunk_size = chunk_size.map(|x| x.min(*remaining)).unwrap_or(*remaining); let (mut nested, mut decoded) = if let Some((nested, decoded)) = items.pop_back() { - // there is a already a state => it must be incomplete... - debug_assert!( - nested.len() < chunk_size, - "the temp array is expected to be incomplete" - ); + *remaining += nested.len(); (nested, decoded) } else { // there is no state => initialize it (init_nested(init, capacity), decoder.with_capacity(0)) }; - let remaining = chunk_size - nested.len(); + // e.g. chunk = 10, remaining = 100, decoded = 2 => 8.min(100) = 8 + // e.g. chunk = 100, remaining = 100, decoded = 0 => 100.min(100) = 100 + // e.g. chunk = 10, remaining = 2, decoded = 2 => 8.min(2) = 2 + let additional = (chunk_size - nested.len()).min(*remaining); // extend the current state extend_offsets2( @@ -370,12 +370,15 @@ fn extend<'a, D: NestedDecoder<'a>>( &mut nested.nested, &mut decoded, decoder, - remaining, + additional, ); + *remaining -= nested.len(); items.push_back((nested, decoded)); - while page.len() > 0 { - let mut nested = init_nested(init, capacity); + while page.len() > 0 && *remaining > 0 { + let additional = chunk_size.min(*remaining); + + let mut nested = init_nested(init, additional); let mut decoded = decoder.with_capacity(0); extend_offsets2( &mut page, @@ -383,8 +386,9 @@ fn extend<'a, D: NestedDecoder<'a>>( &mut nested.nested, &mut decoded, decoder, - chunk_size, + additional, ); + *remaining -= nested.len(); items.push_back((nested, decoded)); } Ok(()) @@ -465,6 +469,7 @@ fn extend_offsets2<'a, D: NestedDecoder<'a>>( pub(super) fn next<'a, I, D>( iter: &'a mut I, items: &mut VecDeque<(NestedState, D::DecodedState)>, + remaining: &mut usize, init: &[InitNested], chunk_size: Option, decoder: &D, @@ -489,7 +494,7 @@ where } Ok(Some(page)) => { // there is a new page => consume the page from the start - let error = extend(page, init, items, decoder, chunk_size); + let error = extend(page, init, items, remaining, decoder, chunk_size); match error { Ok(_) => {} Err(e) => return MaybeNext::Some(Err(e)), diff --git a/src/io/parquet/read/deserialize/null.rs b/src/io/parquet/read/deserialize/null.rs index 2ea2bf06e1e..e8c18eb8c65 100644 --- a/src/io/parquet/read/deserialize/null.rs +++ b/src/io/parquet/read/deserialize/null.rs @@ -2,11 +2,12 @@ use crate::{array::NullArray, datatypes::DataType}; use super::super::{ArrayIter, DataPages}; -/// Converts [`DataPages`] to an [`Iterator`] of [`Array`] +/// Converts [`DataPages`] to an [`ArrayIter`] pub fn iter_to_arrays<'a, I>( mut iter: I, data_type: DataType, chunk_size: Option, + num_rows: usize, ) -> ArrayIter<'a> where I: 'a + DataPages, @@ -14,16 +15,22 @@ where let mut len = 0usize; while let Ok(Some(x)) = iter.next() { - len += x.num_values() + let rows = x.num_values(); + len = (len + 
rows).min(num_rows); + if len == num_rows { + break; + } } + if len == 0 { return Box::new(std::iter::empty()); } let chunk_size = chunk_size.unwrap_or(len); - let complete_chunks = chunk_size / len; - let remainder = chunk_size % len; + let complete_chunks = len / chunk_size; + + let remainder = len - (complete_chunks * chunk_size); let i_data_type = data_type.clone(); let complete = (0..complete_chunks) .map(move |_| Ok(NullArray::new(i_data_type.clone(), chunk_size).boxed())); @@ -34,3 +41,56 @@ where Box::new(complete.chain(std::iter::once(Ok(array.boxed())))) } } + +#[cfg(test)] +mod tests { + use parquet2::{ + encoding::Encoding, + error::Error as ParquetError, + metadata::Descriptor, + page::{DataPage, DataPageHeader, DataPageHeaderV1}, + schema::types::{PhysicalType, PrimitiveType}, + }; + + use crate::{array::NullArray, datatypes::DataType, error::Error}; + + use super::iter_to_arrays; + + #[test] + fn limit() { + let new_page = |values: i32| { + DataPage::new( + DataPageHeader::V1(DataPageHeaderV1 { + num_values: values, + encoding: Encoding::Plain.into(), + definition_level_encoding: Encoding::Plain.into(), + repetition_level_encoding: Encoding::Plain.into(), + statistics: None, + }), + vec![], + None, + Descriptor { + primitive_type: PrimitiveType::from_physical( + "a".to_string(), + PhysicalType::Int32, + ), + max_def_level: 0, + max_rep_level: 0, + }, + None, + ) + }; + + let p1 = new_page(100); + let p2 = new_page(100); + let pages = vec![Result::<_, ParquetError>::Ok(&p1), Ok(&p2)]; + let pages = fallible_streaming_iterator::convert(pages.into_iter()); + let arrays = iter_to_arrays(pages, DataType::Null, Some(10), 101); + + let arrays = arrays.collect::, Error>>().unwrap(); + let expected = std::iter::repeat(NullArray::new(DataType::Null, 10).boxed()) + .take(10) + .chain(std::iter::once(NullArray::new(DataType::Null, 1).boxed())); + assert_eq!(arrays, expected.collect::>()) + } +} diff --git a/src/io/parquet/read/deserialize/primitive/basic.rs b/src/io/parquet/read/deserialize/primitive/basic.rs index 1a3d1afef96..0ed1d8b51cb 100644 --- a/src/io/parquet/read/deserialize/primitive/basic.rs +++ b/src/io/parquet/read/deserialize/primitive/basic.rs @@ -284,7 +284,7 @@ pub(super) fn finish( MutablePrimitiveArray::from_data(data_type.clone(), values, validity) } -/// An iterator adapter over [`DataPages`] assumed to be encoded as primitive arrays +/// An [`Iterator`] adapter over [`DataPages`] assumed to be encoded as primitive arrays #[derive(Debug)] pub struct Iter where @@ -296,6 +296,7 @@ where iter: I, data_type: DataType, items: VecDeque<(Vec, MutableBitmap)>, + remaining: usize, chunk_size: Option, op: F, phantom: std::marker::PhantomData
<P>
, @@ -309,11 +310,18 @@ where P: ParquetNativeType, F: Copy + Fn(P) -> T, { - pub fn new(iter: I, data_type: DataType, chunk_size: Option, op: F) -> Self { + pub fn new( + iter: I, + data_type: DataType, + num_rows: usize, + chunk_size: Option, + op: F, + ) -> Self { Self { iter, data_type, items: VecDeque::new(), + remaining: num_rows, chunk_size, op, phantom: Default::default(), @@ -334,6 +342,7 @@ where let maybe_state = utils::next( &mut self.iter, &mut self.items, + &mut self.remaining, self.chunk_size, &PrimitiveDecoder::new(self.op), ); diff --git a/src/io/parquet/read/deserialize/primitive/dictionary.rs b/src/io/parquet/read/deserialize/primitive/dictionary.rs index 2b3f0ca5491..3253b84706b 100644 --- a/src/io/parquet/read/deserialize/primitive/dictionary.rs +++ b/src/io/parquet/read/deserialize/primitive/dictionary.rs @@ -48,6 +48,7 @@ where values_data_type: DataType, values: Dict, items: VecDeque<(Vec, MutableBitmap)>, + remaining: usize, chunk_size: Option, op: F, phantom: std::marker::PhantomData
<P>
, @@ -62,7 +63,13 @@ where P: ParquetNativeType, F: Copy + Fn(P) -> T, { - pub fn new(iter: I, data_type: DataType, chunk_size: Option, op: F) -> Self { + pub fn new( + iter: I, + data_type: DataType, + num_rows: usize, + chunk_size: Option, + op: F, + ) -> Self { let values_data_type = match &data_type { DataType::Dictionary(_, values, _) => *(values.clone()), _ => unreachable!(), @@ -74,6 +81,7 @@ where values: Dict::Empty, items: VecDeque::new(), chunk_size, + remaining: num_rows, op, phantom: Default::default(), } @@ -96,6 +104,7 @@ where &mut self.items, &mut self.values, self.data_type.clone(), + &mut self.remaining, self.chunk_size, |dict| read_dict::(self.values_data_type.clone(), self.op, dict), ); diff --git a/src/io/parquet/read/deserialize/primitive/mod.rs b/src/io/parquet/read/deserialize/primitive/mod.rs index e49cdb80ea5..0bf570ac22f 100644 --- a/src/io/parquet/read/deserialize/primitive/mod.rs +++ b/src/io/parquet/read/deserialize/primitive/mod.rs @@ -16,6 +16,7 @@ pub fn iter_to_arrays_nested<'a, I, T, P, F>( iter: I, init: Vec, data_type: DataType, + num_rows: usize, chunk_size: Option, op: F, ) -> NestedArrayIter<'a> @@ -26,11 +27,13 @@ where F: 'a + Copy + Send + Sync + Fn(P) -> T, { Box::new( - ArrayIterator::::new(iter, init, data_type, chunk_size, op).map(|x| { - x.map(|(mut nested, array)| { - let _ = nested.nested.pop().unwrap(); // the primitive - (nested, array.boxed()) - }) - }), + ArrayIterator::::new(iter, init, data_type, num_rows, chunk_size, op).map( + |x| { + x.map(|(mut nested, array)| { + let _ = nested.nested.pop().unwrap(); // the primitive + (nested, array.boxed()) + }) + }, + ), ) } diff --git a/src/io/parquet/read/deserialize/primitive/nested.rs b/src/io/parquet/read/deserialize/primitive/nested.rs index 587e1967adc..72f339d4f68 100644 --- a/src/io/parquet/read/deserialize/primitive/nested.rs +++ b/src/io/parquet/read/deserialize/primitive/nested.rs @@ -171,6 +171,7 @@ where init: Vec, data_type: DataType, items: VecDeque<(NestedState, (Vec, MutableBitmap))>, + remaining: usize, chunk_size: Option, decoder: PrimitiveDecoder, } @@ -187,6 +188,7 @@ where iter: I, init: Vec, data_type: DataType, + num_rows: usize, chunk_size: Option, op: F, ) -> Self { @@ -196,6 +198,7 @@ where data_type, items: VecDeque::new(), chunk_size, + remaining: num_rows, decoder: PrimitiveDecoder::new(op), } } @@ -215,6 +218,7 @@ where let maybe_state = next( &mut self.iter, &mut self.items, + &mut self.remaining, &self.init, self.chunk_size, &self.decoder, diff --git a/src/io/parquet/read/deserialize/simple.rs b/src/io/parquet/read/deserialize/simple.rs index ea6dacf609b..0ef14a4fd35 100644 --- a/src/io/parquet/read/deserialize/simple.rs +++ b/src/io/parquet/read/deserialize/simple.rs @@ -61,6 +61,7 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>( type_: &PrimitiveType, data_type: DataType, chunk_size: Option, + num_rows: usize, ) -> Result> { use DataType::*; @@ -68,17 +69,19 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>( let logical_type = &type_.logical_type; Ok(match data_type.to_logical_type() { - Null => null::iter_to_arrays(pages, data_type, chunk_size), - Boolean => dyn_iter(boolean::Iter::new(pages, data_type, chunk_size)), + Null => null::iter_to_arrays(pages, data_type, chunk_size, num_rows), + Boolean => dyn_iter(boolean::Iter::new(pages, data_type, chunk_size, num_rows)), UInt8 => dyn_iter(iden(primitive::Iter::new( pages, data_type, + num_rows, chunk_size, |x: i32| x as u8, ))), UInt16 => dyn_iter(iden(primitive::Iter::new( pages, data_type, + 
num_rows, chunk_size, |x: i32| x as u16, ))), @@ -86,6 +89,7 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>( PhysicalType::Int32 => dyn_iter(iden(primitive::Iter::new( pages, data_type, + num_rows, chunk_size, |x: i32| x as u32, ))), @@ -93,6 +97,7 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>( PhysicalType::Int64 => dyn_iter(iden(primitive::Iter::new( pages, data_type, + num_rows, chunk_size, |x: i64| x as u32, ))), @@ -106,18 +111,21 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>( Int8 => dyn_iter(iden(primitive::Iter::new( pages, data_type, + num_rows, chunk_size, |x: i32| x as i8, ))), Int16 => dyn_iter(iden(primitive::Iter::new( pages, data_type, + num_rows, chunk_size, |x: i32| x as i16, ))), Int32 | Date32 | Time32(_) => dyn_iter(iden(primitive::Iter::new( pages, data_type, + num_rows, chunk_size, |x: i32| x as i32, ))), @@ -129,17 +137,24 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>( physical_type, logical_type, data_type, + num_rows, chunk_size, time_unit, ); } - FixedSizeBinary(_) => dyn_iter(fixed_size_binary::Iter::new(pages, data_type, chunk_size)), + FixedSizeBinary(_) => dyn_iter(fixed_size_binary::Iter::new( + pages, data_type, num_rows, chunk_size, + )), Interval(IntervalUnit::YearMonth) => { let n = 12; - let pages = - fixed_size_binary::Iter::new(pages, DataType::FixedSizeBinary(n), chunk_size); + let pages = fixed_size_binary::Iter::new( + pages, + DataType::FixedSizeBinary(n), + num_rows, + chunk_size, + ); let pages = pages.map(move |maybe_array| { let array = maybe_array?; @@ -160,8 +175,12 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>( Interval(IntervalUnit::DayTime) => { let n = 12; - let pages = - fixed_size_binary::Iter::new(pages, DataType::FixedSizeBinary(n), chunk_size); + let pages = fixed_size_binary::Iter::new( + pages, + DataType::FixedSizeBinary(n), + num_rows, + chunk_size, + ); let pages = pages.map(move |maybe_array| { let array = maybe_array?; @@ -184,12 +203,14 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>( PhysicalType::Int32 => dyn_iter(iden(primitive::Iter::new( pages, data_type, + num_rows, chunk_size, |x: i32| x as i128, ))), PhysicalType::Int64 => dyn_iter(iden(primitive::Iter::new( pages, data_type, + num_rows, chunk_size, |x: i64| x as i128, ))), @@ -202,8 +223,12 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>( PhysicalType::FixedLenByteArray(n) => { let n = *n; - let pages = - fixed_size_binary::Iter::new(pages, DataType::FixedSizeBinary(n), chunk_size); + let pages = fixed_size_binary::Iter::new( + pages, + DataType::FixedSizeBinary(n), + num_rows, + chunk_size, + ); let pages = pages.map(move |maybe_array| { let array = maybe_array?; @@ -228,12 +253,14 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>( Int64 | Date64 | Time64(_) | Duration(_) => dyn_iter(iden(primitive::Iter::new( pages, data_type, + num_rows, chunk_size, |x: i64| x as i64, ))), UInt64 => dyn_iter(iden(primitive::Iter::new( pages, data_type, + num_rows, chunk_size, |x: i64| x as u64, ))), @@ -241,32 +268,34 @@ pub fn page_iter_to_arrays<'a, I: 'a + DataPages>( Float32 => dyn_iter(iden(primitive::Iter::new( pages, data_type, + num_rows, chunk_size, |x: f32| x, ))), Float64 => dyn_iter(iden(primitive::Iter::new( pages, data_type, + num_rows, chunk_size, |x: f64| x, ))), Binary => dyn_iter(binary::Iter::, _>::new( - pages, data_type, chunk_size, + pages, data_type, chunk_size, num_rows, )), LargeBinary => dyn_iter(binary::Iter::, _>::new( - pages, data_type, chunk_size, + pages, data_type, chunk_size, num_rows, )), Utf8 => 
dyn_iter(binary::Iter::, _>::new( - pages, data_type, chunk_size, + pages, data_type, chunk_size, num_rows, )), LargeUtf8 => dyn_iter(binary::Iter::, _>::new( - pages, data_type, chunk_size, + pages, data_type, chunk_size, num_rows, )), Dictionary(key_type, _, _) => { return match_integer_type!(key_type, |$K| { - dict_read::<$K, _>(pages, physical_type, logical_type, data_type, chunk_size) + dict_read::<$K, _>(pages, physical_type, logical_type, data_type, num_rows, chunk_size) }) } @@ -315,11 +344,12 @@ fn timestamp<'a, I: 'a + DataPages>( physical_type: &PhysicalType, logical_type: &Option, data_type: DataType, + num_rows: usize, chunk_size: Option, time_unit: TimeUnit, ) -> Result> { if physical_type == &PhysicalType::Int96 { - let iter = primitive::Iter::new(pages, data_type, chunk_size, int96_to_i64_ns); + let iter = primitive::Iter::new(pages, data_type, num_rows, chunk_size, int96_to_i64_ns); let logical_type = PrimitiveLogicalType::Timestamp { unit: ParquetTimeUnit::Nanoseconds, is_adjusted_to_utc: false, @@ -338,7 +368,7 @@ fn timestamp<'a, I: 'a + DataPages>( )); } - let iter = primitive::Iter::new(pages, data_type, chunk_size, |x: i64| x); + let iter = primitive::Iter::new(pages, data_type, num_rows, chunk_size, |x: i64| x); let (factor, is_multiplier) = unifiy_timestmap_unit(logical_type, time_unit); match (factor, is_multiplier) { (1, _) => Ok(dyn_iter(iden(iter))), @@ -352,6 +382,7 @@ fn timestamp_dict<'a, K: DictionaryKey, I: 'a + DataPages>( physical_type: &PhysicalType, logical_type: &Option, data_type: DataType, + num_rows: usize, chunk_size: Option, time_unit: TimeUnit, ) -> Result> { @@ -365,12 +396,14 @@ fn timestamp_dict<'a, K: DictionaryKey, I: 'a + DataPages>( (a, true) => Ok(dyn_iter(primitive::DictIter::::new( pages, DataType::Timestamp(TimeUnit::Nanosecond, None), + num_rows, chunk_size, move |x| int96_to_i64_ns(x) * a, ))), (a, false) => Ok(dyn_iter(primitive::DictIter::::new( pages, DataType::Timestamp(TimeUnit::Nanosecond, None), + num_rows, chunk_size, move |x| int96_to_i64_ns(x) / a, ))), @@ -382,12 +415,14 @@ fn timestamp_dict<'a, K: DictionaryKey, I: 'a + DataPages>( (a, true) => Ok(dyn_iter(primitive::DictIter::::new( pages, data_type, + num_rows, chunk_size, move |x: i64| x * a, ))), (a, false) => Ok(dyn_iter(primitive::DictIter::::new( pages, data_type, + num_rows, chunk_size, move |x: i64| x / a, ))), @@ -399,6 +434,7 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>( physical_type: &PhysicalType, logical_type: &Option, data_type: DataType, + num_rows: usize, chunk_size: Option, ) -> Result> { use DataType::*; @@ -412,44 +448,54 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>( UInt8 => dyn_iter(primitive::DictIter::::new( iter, data_type, + num_rows, chunk_size, |x: i32| x as u8, )), UInt16 => dyn_iter(primitive::DictIter::::new( iter, data_type, + num_rows, chunk_size, |x: i32| x as u16, )), UInt32 => dyn_iter(primitive::DictIter::::new( iter, data_type, + num_rows, chunk_size, |x: i32| x as u32, )), UInt64 => dyn_iter(primitive::DictIter::::new( iter, data_type, + num_rows, chunk_size, |x: i64| x as u64, )), Int8 => dyn_iter(primitive::DictIter::::new( iter, data_type, + num_rows, chunk_size, |x: i32| x as i8, )), Int16 => dyn_iter(primitive::DictIter::::new( iter, data_type, + num_rows, chunk_size, |x: i32| x as i16, )), - Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => dyn_iter( - primitive::DictIter::::new(iter, data_type, chunk_size, |x: i32| { - x as i32 - }), - ), + Int32 | Date32 | Time32(_) | 
Interval(IntervalUnit::YearMonth) => { + dyn_iter(primitive::DictIter::::new( + iter, + data_type, + num_rows, + chunk_size, + |x: i32| x as i32, + )) + } Timestamp(time_unit, _) => { let time_unit = *time_unit; @@ -458,35 +504,44 @@ fn dict_read<'a, K: DictionaryKey, I: 'a + DataPages>( physical_type, logical_type, data_type, + num_rows, chunk_size, time_unit, ); } - Int64 | Date64 | Time64(_) | Duration(_) => dyn_iter( - primitive::DictIter::::new(iter, data_type, chunk_size, |x: i64| x), - ), + Int64 | Date64 | Time64(_) | Duration(_) => { + dyn_iter(primitive::DictIter::::new( + iter, + data_type, + num_rows, + chunk_size, + |x: i64| x, + )) + } Float32 => dyn_iter(primitive::DictIter::::new( iter, data_type, + num_rows, chunk_size, |x: f32| x, )), Float64 => dyn_iter(primitive::DictIter::::new( iter, data_type, + num_rows, chunk_size, |x: f64| x, )), Utf8 | Binary => dyn_iter(binary::DictIter::::new( - iter, data_type, chunk_size, + iter, data_type, num_rows, chunk_size, )), LargeUtf8 | LargeBinary => dyn_iter(binary::DictIter::::new( - iter, data_type, chunk_size, + iter, data_type, num_rows, chunk_size, )), FixedSizeBinary(_) => dyn_iter(fixed_size_binary::DictIter::::new( - iter, data_type, chunk_size, + iter, data_type, num_rows, chunk_size, )), other => { return Err(Error::nyi(format!( diff --git a/src/io/parquet/read/deserialize/utils.rs b/src/io/parquet/read/deserialize/utils.rs index 4c022850130..7584c28723b 100644 --- a/src/io/parquet/read/deserialize/utils.rs +++ b/src/io/parquet/read/deserialize/utils.rs @@ -372,33 +372,37 @@ pub(super) fn extend_from_new_page<'a, T: Decoder<'a>>( mut page: T::State, chunk_size: Option, items: &mut VecDeque, + remaining: &mut usize, decoder: &T, ) { let capacity = chunk_size.unwrap_or(0); - let chunk_size = chunk_size.unwrap_or(usize::MAX); + let chunk_size = chunk_size.map(|x| x.min(*remaining)).unwrap_or(*remaining); let mut decoded = if let Some(decoded) = items.pop_back() { - // there is a already a state => it must be incomplete... - debug_assert!( - decoded.len() <= chunk_size, - "the temp state is expected to be incomplete" - ); + *remaining += decoded.len(); decoded } else { // there is no state => initialize it decoder.with_capacity(capacity) }; - let remaining = chunk_size - decoded.len(); + // e.g. chunk = 10, remaining = 100, decoded = 2 => 8.min(100) = 8 + // e.g. chunk = 100, remaining = 100, decoded = 0 => 100.min(100) = 100 + // e.g. chunk = 10, remaining = 2, decoded = 2 => 8.min(2) = 2 + let additional = (chunk_size - decoded.len()).min(*remaining); // extend the current state - decoder.extend_from_state(&mut page, &mut decoded, remaining); + decoder.extend_from_state(&mut page, &mut decoded, additional); + *remaining -= decoded.len(); items.push_back(decoded); - while page.len() > 0 { - let mut decoded = decoder.with_capacity(capacity); - decoder.extend_from_state(&mut page, &mut decoded, chunk_size); + while page.len() > 0 && *remaining > 0 { + let additional = chunk_size.min(*remaining); + + let mut decoded = decoder.with_capacity(additional); + decoder.extend_from_state(&mut page, &mut decoded, additional); + *remaining -= decoded.len(); items.push_back(decoded) } } @@ -414,6 +418,7 @@ pub enum MaybeNext
<P>
{ pub(super) fn next<'a, I: DataPages, D: Decoder<'a>>( iter: &'a mut I, items: &mut VecDeque, + remaining: &mut usize, chunk_size: Option, decoder: &D, ) -> MaybeNext> { @@ -422,6 +427,9 @@ pub(super) fn next<'a, I: DataPages, D: Decoder<'a>>( let item = items.pop_front().unwrap(); return MaybeNext::Some(Ok(item)); } + if *remaining == 0 { + return MaybeNext::None; + } match iter.next() { Err(e) => MaybeNext::Some(Err(e.into())), Ok(Some(page)) => { @@ -432,7 +440,7 @@ pub(super) fn next<'a, I: DataPages, D: Decoder<'a>>( Err(e) => return MaybeNext::Some(Err(e)), }; - extend_from_new_page(page, chunk_size, items, decoder); + extend_from_new_page(page, chunk_size, items, remaining, decoder); if (items.len() == 1) && items.front().unwrap().len() < chunk_size.unwrap_or(0) { MaybeNext::More diff --git a/src/io/parquet/read/row_group.rs b/src/io/parquet/read/row_group.rs index 8e9a6a97c64..4dde21e467d 100644 --- a/src/io/parquet/read/row_group.rs +++ b/src/io/parquet/read/row_group.rs @@ -175,7 +175,7 @@ pub fn to_deserializer<'a>( num_rows: usize, chunk_size: Option, ) -> Result> { - let chunk_size = chunk_size.unwrap_or(usize::MAX).min(num_rows); + let chunk_size = chunk_size.map(|c| c.min(num_rows)); let (columns, types): (Vec<_>, Vec<_>) = columns .into_iter() @@ -193,7 +193,7 @@ pub fn to_deserializer<'a>( }) .unzip(); - column_iter_to_arrays(columns, types, field, Some(chunk_size)) + column_iter_to_arrays(columns, types, field, chunk_size, num_rows) } /// Returns a vector of iterators of [`Array`] ([`ArrayIter`]) corresponding to the top diff --git a/tests/it/io/parquet/integration.rs b/tests/it/io/parquet/integration.rs index 71ce7facaf8..7f84c433b0d 100644 --- a/tests/it/io/parquet/integration.rs +++ b/tests/it/io/parquet/integration.rs @@ -14,7 +14,7 @@ fn test_file(version: &str, file_name: &str) -> Result<()> { let data = integration_write(&schema, &batches)?; - let (read_schema, read_batches) = integration_read(&data)?; + let (read_schema, read_batches) = integration_read(&data, None)?; assert_eq!(schema, read_schema); assert_eq!(batches, read_batches); diff --git a/tests/it/io/parquet/mod.rs b/tests/it/io/parquet/mod.rs index 346208e8951..12c8d89c872 100644 --- a/tests/it/io/parquet/mod.rs +++ b/tests/it/io/parquet/mod.rs @@ -1147,9 +1147,9 @@ fn integration_write(schema: &Schema, chunks: &[Chunk>]) -> Resul type IntegrationRead = (Schema, Vec>>); -fn integration_read(data: &[u8]) -> Result { +fn integration_read(data: &[u8], limit: Option) -> Result { let reader = Cursor::new(data); - let reader = FileReader::try_new(reader, None, None, None, None)?; + let reader = FileReader::try_new(reader, None, None, limit, None)?; let schema = reader.schema().clone(); for field in &schema.fields { @@ -1161,10 +1161,7 @@ fn integration_read(data: &[u8]) -> Result { Ok((schema, batches)) } -/// Tests that when arrow-specific types (Duration and LargeUtf8) are written to parquet, we can rountrip its -/// logical types. -#[test] -fn arrow_type() -> Result<()> { +fn generic_data() -> Result<(Schema, Chunk>)> { let array1 = PrimitiveArray::::from([Some(1), None, Some(2)]) .to(DataType::Duration(TimeUnit::Second)); let array2 = Utf8Array::::from([Some("a"), None, Some("bb")]); @@ -1252,9 +1249,18 @@ fn arrow_type() -> Result<()> { array14.boxed(), ])?; + Ok((schema, chunk)) +} + +/// Tests that when arrow-specific types (Duration and LargeUtf8) are written to parquet, we can rountrip its +/// logical types. 
+#[test] +fn arrow_type() -> Result<()> { + let (schema, chunk) = generic_data()?; + let r = integration_write(&schema, &[chunk.clone()])?; - let (new_schema, new_chunks) = integration_read(&r)?; + let (new_schema, new_chunks) = integration_read(&r, None)?; assert_eq!(new_schema, schema); assert_eq!(new_chunks, vec![chunk]); @@ -1297,7 +1303,11 @@ fn data>( array.into() } -fn list_array_generic(is_nullable: bool, array: ListArray) -> Result<()> { +fn list_array_generic( + is_nullable: bool, + array: ListArray, + limit: Option, +) -> Result<()> { let schema = Schema::from(vec![Field::new( "a1", array.data_type().clone(), @@ -1307,40 +1317,59 @@ fn list_array_generic(is_nullable: bool, array: ListArray) -> Resu let r = integration_write(&schema, &[chunk.clone()])?; - let (new_schema, new_chunks) = integration_read(&r)?; + let (new_schema, new_chunks) = integration_read(&r, limit)?; + + let expected = if let Some(limit) = limit { + let expected = chunk + .into_arrays() + .into_iter() + .map(|x| x.slice(0, limit)) + .collect::>(); + Chunk::new(expected) + } else { + chunk + }; assert_eq!(new_schema, schema); - assert_eq!(new_chunks, vec![chunk]); + assert_eq!(new_chunks, vec![expected]); Ok(()) } +fn test_list_array_required_required(limit: Option) -> Result<()> { + list_array_generic(false, data(0..12i8, false), limit)?; + list_array_generic(false, data(0..12i16, false), limit)?; + list_array_generic(false, data(0..12i32, false), limit)?; + list_array_generic(false, data(0..12i64, false), limit)?; + list_array_generic(false, data(0..12u8, false), limit)?; + list_array_generic(false, data(0..12u16, false), limit)?; + list_array_generic(false, data(0..12u32, false), limit)?; + list_array_generic(false, data(0..12u64, false), limit)?; + list_array_generic(false, data((0..12).map(|x| (x as f32) * 1.0), false), limit)?; + list_array_generic( + false, + data((0..12).map(|x| (x as f64) * 1.0f64), false), + limit, + ) +} + #[test] fn list_array_required_required() -> Result<()> { - list_array_generic(false, data(0..12i8, false))?; - list_array_generic(false, data(0..12i16, false))?; - list_array_generic(false, data(0..12i32, false))?; - list_array_generic(false, data(0..12i64, false))?; - list_array_generic(false, data(0..12u8, false))?; - list_array_generic(false, data(0..12u16, false))?; - list_array_generic(false, data(0..12u32, false))?; - list_array_generic(false, data(0..12u64, false))?; - list_array_generic(false, data((0..12).map(|x| (x as f32) * 1.0), false))?; - list_array_generic(false, data((0..12).map(|x| (x as f64) * 1.0f64), false)) + test_list_array_required_required(None) } #[test] fn list_array_optional_optional() -> Result<()> { - list_array_generic(true, data(0..12, true)) + list_array_generic(true, data(0..12, true), None) } #[test] fn list_array_required_optional() -> Result<()> { - list_array_generic(true, data(0..12, false)) + list_array_generic(true, data(0..12, false), None) } #[test] fn list_array_optional_required() -> Result<()> { - list_array_generic(false, data(0..12, true)) + list_array_generic(false, data(0..12, true), None) } #[test] @@ -1353,7 +1382,7 @@ fn list_utf8() -> Result<()> { let mut array = MutableListArray::::new_with_field(MutableUtf8Array::::new(), "item", true); array.try_extend(data).unwrap(); - list_array_generic(false, array.into()) + list_array_generic(false, array.into(), None) } #[test] @@ -1366,7 +1395,7 @@ fn list_large_utf8() -> Result<()> { let mut array = MutableListArray::::new_with_field(MutableUtf8Array::::new(), "item", true); 
array.try_extend(data).unwrap(); - list_array_generic(false, array.into()) + list_array_generic(false, array.into(), None) } #[test] @@ -1379,7 +1408,7 @@ fn list_binary() -> Result<()> { let mut array = MutableListArray::::new_with_field(MutableBinaryArray::::new(), "item", true); array.try_extend(data).unwrap(); - list_array_generic(false, array.into()) + list_array_generic(false, array.into(), None) } #[test] @@ -1392,7 +1421,7 @@ fn large_list_large_binary() -> Result<()> { let mut array = MutableListArray::::new_with_field(MutableBinaryArray::::new(), "item", true); array.try_extend(data).unwrap(); - list_array_generic(false, array.into()) + list_array_generic(false, array.into(), None) } #[test] @@ -1408,7 +1437,7 @@ fn list_utf8_nullable() -> Result<()> { let mut array = MutableListArray::::new_with_field(MutableUtf8Array::::new(), "item", true); array.try_extend(data).unwrap(); - list_array_generic(true, array.into()) + list_array_generic(true, array.into(), None) } #[test] @@ -1427,5 +1456,30 @@ fn list_int_nullable() -> Result<()> { true, ); array.try_extend(data).unwrap(); - list_array_generic(true, array.into()) + list_array_generic(true, array.into(), None) +} + +#[test] +fn limit() -> Result<()> { + let (schema, chunk) = generic_data()?; + + let r = integration_write(&schema, &[chunk.clone()])?; + + let (new_schema, new_chunks) = integration_read(&r, Some(2))?; + + let expected = chunk + .into_arrays() + .into_iter() + .map(|x| x.slice(0, 2)) + .collect::>(); + let expected = Chunk::new(expected); + + assert_eq!(new_schema, schema); + assert_eq!(new_chunks, vec![expected]); + Ok(()) +} + +#[test] +fn limit_list() -> Result<()> { + test_list_array_required_required(Some(2)) } diff --git a/tests/it/io/parquet/read_indexes.rs b/tests/it/io/parquet/read_indexes.rs index ee91ba846a5..d0b74606dfb 100644 --- a/tests/it/io/parquet/read_indexes.rs +++ b/tests/it/io/parquet/read_indexes.rs @@ -124,7 +124,8 @@ fn read_with_indexes( vec![pages], vec![&c1.descriptor().descriptor.primitive_type], schema.fields[1].clone(), - Some(row_group.num_rows() as usize), + None, + row_group.num_rows() as usize, )?; let arrays = arrays.collect::>>()?;
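
For reference, a minimal self-contained sketch of the row-budget chunking this diff threads through `utils::extend_from_new_page` (and, in the same shape, through `nested_utils::extend`). The helper `extend_chunks` and its length-only model of decoded state are hypothetical; the real code extends a decoder's `DecodedState`, and complete chunks are consumed by `next` before this runs, so only an incomplete trailing chunk is ever resumed:

```rust
use std::collections::VecDeque;

// Chunks are modeled by their lengths only; a page by its value count.
fn extend_chunks(
    mut page_len: usize,
    chunk_size: Option<usize>,
    remaining: &mut usize,
    items: &mut VecDeque<usize>,
) {
    // No explicit chunk size => a single chunk holding every row still in budget.
    let chunk_size = chunk_size.map(|x| x.min(*remaining)).unwrap_or(*remaining);

    // Resume the trailing chunk; its rows re-enter the budget so that the
    // `*remaining -= current` below does not double-count them.
    let mut current = items.pop_back().unwrap_or(0);
    *remaining += current;

    // Cap by the free space in the chunk, the budget, and the page itself.
    let additional = chunk_size
        .saturating_sub(current)
        .min(*remaining)
        .min(page_len);
    current += additional;
    page_len -= additional;
    *remaining -= current;
    items.push_back(current);

    // Drain the rest of the page into fresh chunks while the budget lasts.
    while page_len > 0 && *remaining > 0 {
        let additional = chunk_size.min(*remaining).min(page_len);
        page_len -= additional;
        *remaining -= additional;
        items.push_back(additional);
    }
}

fn main() {
    // Two pages of 95 and 100 values, chunk size 10, but a budget of 101 rows:
    // decoding stops once 101 rows have been produced.
    let mut remaining = 101;
    let mut items = VecDeque::new();
    extend_chunks(95, Some(10), &mut remaining, &mut items);
    extend_chunks(100, Some(10), &mut remaining, &mut items);
    assert_eq!(items.iter().sum::<usize>(), 101);
    assert_eq!(remaining, 0);
}
```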
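The `null.rs` hunk also corrects inverted chunk arithmetic: `chunk_size / len` and `chunk_size % len` become `len / chunk_size` and a subtraction. A sketch of the fixed computation, using a hypothetical `split_chunks` helper:

```rust
// `len` rows split into complete chunks of `chunk_size` plus one remainder chunk.
fn split_chunks(len: usize, chunk_size: usize) -> (usize, usize) {
    let complete_chunks = len / chunk_size;
    let remainder = len - complete_chunks * chunk_size;
    (complete_chunks, remainder)
}

fn main() {
    // Mirrors the `limit` test added above: 200 available values capped at
    // `num_rows = 101` with chunk size 10 => ten NullArrays of length 10
    // followed by one of length 1.
    assert_eq!(split_chunks(101, 10), (10, 1));
}
```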
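At the user-facing level, the row budget is driven by the `limit` argument of `FileReader::try_new`, as exercised by `integration_read` in the test changes. A usage sketch, assuming the `FileReader::try_new(reader, projection, chunk_size, limit, row_groups)` signature shown in those tests:

```rust
use std::io::Cursor;

use arrow2::error::Result;
use arrow2::io::parquet::read::FileReader;

// Read at most `limit` rows from an in-memory parquet file.
fn read_first_n(data: &[u8], limit: usize) -> Result<usize> {
    let reader = FileReader::try_new(Cursor::new(data), None, None, Some(limit), None)?;
    let mut total = 0;
    for maybe_chunk in reader {
        total += maybe_chunk?.len(); // chunks never exceed the remaining budget
    }
    Ok(total) // <= limit
}
```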