Skip to content

Commit

Permalink
Fix test_struct_array_reader
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed May 8, 2022
1 parent f250ba7 commit 15b8144
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 26 deletions.
6 changes: 3 additions & 3 deletions parquet/src/arrow/array_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1532,15 +1532,15 @@ mod tests {
ArrowType::Int32,
array_1.clone(),
Some(vec![0, 1, 2, 3, 1]),
Some(vec![1, 1, 1, 1, 1]),
Some(vec![0, 1, 1, 1, 1]),
);

let array_2 = Arc::new(PrimitiveArray::<ArrowInt32>::from(vec![5, 4, 3, 2, 1]));
let array_reader_2 = InMemoryArrayReader::new(
ArrowType::Int32,
array_2.clone(),
Some(vec![0, 1, 3, 1, 2]),
Some(vec![1, 1, 1, 1, 1]),
Some(vec![0, 1, 1, 1, 1]),
);

let struct_type = ArrowType::Struct(vec![
Expand Down Expand Up @@ -1570,7 +1570,7 @@ mod tests {
struct_array_reader.get_def_levels()
);
assert_eq!(
Some(vec![1, 1, 1, 1, 1].as_slice()),
Some(vec![0, 1, 1, 1, 1].as_slice()),
struct_array_reader.get_rep_levels()
);
}
Expand Down
41 changes: 18 additions & 23 deletions parquet/src/arrow/array_reader/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,37 +145,32 @@ impl ArrayReader for InMemoryArrayReader {

fn next_batch(&mut self, batch_size: usize) -> Result<ArrayRef> {
assert_ne!(batch_size, 0);

// This replicates the logical normally performed by
// RecordReader to delimit semantic records
let (levels, records) = match &self.rep_levels {
Some(v) => {
let mut records = 0;
let mut levels = 0;
for v in &v[self.cur_idx..] {
if *v == 0 {
// Start of new record
records += 1;
if records == batch_size + 1 {
break;
}
let read = match &self.rep_levels {
Some(rep_levels) => {
let rep_levels = &rep_levels[self.cur_idx..];
let mut levels_read = 0;
let mut records_read = 0;
while levels_read < rep_levels.len() && records_read < batch_size {
if rep_levels[levels_read] == 0 {
records_read += 1; // Start of new record
}
levels += 1;
levels_read += 1;
}
(levels, records - 1)
}
None => {
let records = batch_size.min(self.array.len() - self.cur_idx);
(records, records)

// Find end of current record
while levels_read < rep_levels.len() && rep_levels[levels_read] != 0 {
levels_read += 1
}
levels_read
}
None => batch_size.min(self.array.len() - self.cur_idx),
};

let array = self.array.slice(self.cur_idx, records);

self.last_idx = self.cur_idx;
self.cur_idx += levels;

Ok(array)
self.cur_idx += read;
Ok(self.array.slice(self.last_idx, read))
}

fn get_def_levels(&self) -> Option<&[i16]> {
Expand Down

0 comments on commit 15b8144

Please sign in to comment.