Skip to content

Commit

Permalink
More tests
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed May 8, 2022
1 parent 1b99c49 commit 05c2311
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 18 deletions.
46 changes: 32 additions & 14 deletions parquet/src/arrow/array_reader/list_array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,8 @@ mod tests {
// [[1, null], null, [4], []],
// [],
// [[7]],
// [[]]
// [[]],
// [[1, 2, 3], [4, null, 6], null]
// ],
// null,
// [],
Expand All @@ -305,29 +306,35 @@ mod tests {
None,
Some(4),
Some(7),
Some(1),
Some(2),
Some(3),
Some(4),
None,
Some(6),
Some(11),
]);

// [[1, null], null, [4], [], [7], [], [11]]
let offsets = to_offsets::<OffsetSize>(vec![0, 2, 2, 3, 3, 4, 4, 5]);
// [[1, null], null, [4], [], [7], [], [1, 2, 3], [4, null, 6], null, [11]]
let offsets = to_offsets::<OffsetSize>(vec![0, 2, 2, 3, 3, 4, 4, 7, 10, 10, 11]);
let l3 = ArrayDataBuilder::new(l3_type.clone())
.len(7)
.len(10)
.add_buffer(offsets)
.add_child_data(leaf.data().clone())
.null_bit_buffer(Buffer::from([0b01111101]))
.null_bit_buffer(Buffer::from([0b11111101, 0b00000010]))
.build()
.unwrap();

// [[[1, null], null, [4], []],[], [[7]], [[]], [[11]]]
let offsets = to_offsets::<OffsetSize>(vec![0, 4, 4, 5, 6, 7]);
// [[[1, null], null, [4], []], [], [[7]], [[]], [[1, 2, 3], [4, null, 6], null], [[11]]]
let offsets = to_offsets::<OffsetSize>(vec![0, 4, 4, 5, 6, 9, 10]);
let l2 = ArrayDataBuilder::new(l2_type.clone())
.len(5)
.len(6)
.add_buffer(offsets)
.add_child_data(l3)
.build()
.unwrap();

let offsets = to_offsets::<OffsetSize>(vec![0, 4, 4, 4, 5]);
let offsets = to_offsets::<OffsetSize>(vec![0, 5, 5, 5, 6]);
let l1 = ArrayDataBuilder::new(l1_type.clone())
.len(4)
.add_buffer(offsets)
Expand All @@ -347,6 +354,13 @@ mod tests {
None,
Some(7),
None,
Some(1),
Some(2),
Some(3),
Some(4),
None,
Some(6),
None,
None,
None,
Some(11),
Expand All @@ -355,8 +369,8 @@ mod tests {
let item_array_reader = InMemoryArrayReader::new(
ArrowType::Int32,
values,
Some(vec![6, 5, 3, 6, 4, 2, 6, 4, 0, 1, 6]),
Some(vec![0, 3, 2, 2, 2, 1, 1, 1, 0, 0, 0]),
Some(vec![6, 5, 3, 6, 4, 2, 6, 4, 6, 6, 6, 6, 5, 6, 3, 0, 1, 6]),
Some(vec![0, 3, 2, 2, 2, 1, 1, 1, 1, 3, 3, 2, 3, 3, 2, 0, 0, 0]),
);

let l3 = ListArrayReader::<OffsetSize>::new(
Expand Down Expand Up @@ -386,10 +400,14 @@ mod tests {
true,
);

let actual = l1.next_batch(1024).unwrap();
let actual = downcast::<OffsetSize>(&actual);
let expected_1 = expected.slice(0, 2);
let expected_2 = expected.slice(2, 2);

assert_eq!(&expected, actual)
let actual = l1.next_batch(2).unwrap();
assert_eq!(expected_1.as_ref(), actual.as_ref());

let actual = l1.next_batch(1024).unwrap();
assert_eq!(expected_2.as_ref(), actual.as_ref());
}

fn test_required_list<OffsetSize: OffsetSizeTrait>() {
Expand Down
47 changes: 43 additions & 4 deletions parquet/src/arrow/array_reader/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ pub struct InMemoryArrayReader {
array: ArrayRef,
def_levels: Option<Vec<i16>>,
rep_levels: Option<Vec<i16>>,
last_idx: usize,
cur_idx: usize,
}

impl InMemoryArrayReader {
Expand All @@ -126,6 +128,8 @@ impl InMemoryArrayReader {
array,
def_levels,
rep_levels,
cur_idx: 0,
last_idx: 0,
}
}
}
Expand All @@ -139,16 +143,51 @@ impl ArrayReader for InMemoryArrayReader {
&self.data_type
}

fn next_batch(&mut self, _batch_size: usize) -> Result<ArrayRef> {
Ok(self.array.clone())
fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
assert_ne!(batch_size, 0);

// This replicates the logical normally performed by
// RecordReader to delimit semantic records
let (levels, records) = match &self.rep_levels {
Some(v) => {
let mut records = 0;
let mut levels = 0;
for v in &v[self.cur_idx..] {
if *v == 0 {
// Start of new record
records += 1;
if records == batch_size + 1 {
break;
}
}
levels += 1;
}
(levels, records - 1)
}
None => {
let records = batch_size.min(self.array.len() - self.cur_idx);
(records, records)
}
};

let array = self.array.slice(self.cur_idx, records);

self.last_idx = self.cur_idx;
self.cur_idx += levels;

Ok(array)
}

fn get_def_levels(&self) -> Option<&[i16]> {
self.def_levels.as_deref()
self.def_levels
.as_ref()
.map(|l| &l[self.last_idx..self.cur_idx])
}

fn get_rep_levels(&self) -> Option<&[i16]> {
self.rep_levels.as_deref()
self.rep_levels
.as_ref()
.map(|l| &l[self.last_idx..self.cur_idx])
}
}

Expand Down

0 comments on commit 05c2311

Please sign in to comment.