diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs index a3ea303881eaf..2656b54112612 100644 --- a/rust/datafusion/src/datasource/parquet.rs +++ b/rust/datafusion/src/datasource/parquet.rs @@ -32,7 +32,6 @@ use parquet::file::reader::*; use crate::datasource::{RecordBatchIterator, ScanResult, Table}; use crate::execution::error::{ExecutionError, Result}; -use arrow::array::BinaryArray; use arrow::builder::{BinaryBuilder, Int64Builder}; use parquet::data_type::Int96; use parquet::reader::schema::parquet_to_arrow_schema; @@ -82,60 +81,31 @@ pub struct ParquetFile { column_readers: Vec, } -fn create_binary_array(b: &Vec, row_count: usize) -> Result> { - let mut builder = BinaryBuilder::new(b.len()); - for i in 0..row_count { - builder.append_string(&String::from_utf8(b[i].data().to_vec()).unwrap())?; - } - Ok(Arc::new(builder.finish())) -} - macro_rules! read_binary_column { ($SELF:ident, $R:ident, $INDEX:expr) => {{ - //TODO: should be able to get num_rows in row group instead of defaulting to batch size let mut read_buffer: Vec = - Vec::with_capacity($SELF.batch_size); - for _ in 0..$SELF.batch_size { - read_buffer.push(ByteArray::default()); - } - if $SELF.projection_schema.field($INDEX).is_nullable() { - - let mut def_levels: Vec = Vec::with_capacity($SELF.batch_size); - for _ in 0..$SELF.batch_size { - def_levels.push(0); - } - - let (values_read, levels_read) = $R.read_batch( - $SELF.batch_size, - Some(&mut def_levels), - None, - &mut read_buffer, - )?; - if values_read == levels_read { - create_binary_array(&read_buffer, values_read)? + vec![ByteArray::default(); $SELF.batch_size]; + let mut def_levels: Vec = vec![0; $SELF.batch_size]; + let (_, levels_read) = $R.read_batch( + $SELF.batch_size, + Some(&mut def_levels), + None, + &mut read_buffer, + )?; + let mut builder = BinaryBuilder::new(levels_read); + let mut value_index = 0; + for i in 0..levels_read { + if def_levels[i] > 0 { + builder.append_string( + &String::from_utf8(read_buffer[value_index].data().to_vec()).unwrap(), + )?; + value_index += 1; } else { - let mut builder = BinaryBuilder::new(levels_read); - let mut value_index = 0; - for i in 0..levels_read { - if def_levels[i] > 0 { - builder.append_string(&String::from_utf8(read_buffer[value_index].data().to_vec()).unwrap())?; - value_index += 1; - } else { - builder.append_null()?; - } - } - Arc::new(builder.finish()) + builder.append_null()?; } - } else { - let (values_read, _) = $R.read_batch( - $SELF.batch_size, - None, - None, - &mut read_buffer, - )?; - create_binary_array(&read_buffer, values_read)? } - }} + Arc::new(builder.finish()) + }}; } trait ArrowReader @@ -165,10 +135,7 @@ where let mut read_buffer: Vec = vec![A::default_value().into(); batch_size]; if is_nullable { - let mut def_levels: Vec = Vec::with_capacity(batch_size); - for _ in 0..batch_size { - def_levels.push(0); - } + let mut def_levels: Vec = vec![0; batch_size]; let (values_read, levels_read) = self.read_batch( batch_size, @@ -308,67 +275,29 @@ impl ParquetFile { } ColumnReader::Int96ColumnReader(ref mut r) => { let mut read_buffer: Vec = - Vec::with_capacity(self.batch_size); + vec![Int96::new(); self.batch_size]; - for _ in 0..self.batch_size { - read_buffer.push(Int96::new()); - } - - if self.projection_schema.field(i).is_nullable() { - let mut def_levels: Vec = - Vec::with_capacity(self.batch_size); - for _ in 0..self.batch_size { - def_levels.push(0); - } - let (values_read, levels_read) = r.read_batch( - self.batch_size, - Some(&mut def_levels), - None, - &mut read_buffer, - )?; - - if values_read == levels_read { - let mut builder = Int64Builder::new(values_read); - for i in 0..values_read { - builder.append_value(convert_int96_timestamp( - read_buffer[i].data(), - ))?; - } - Arc::new(builder.finish()) - } else { - let mut builder = Int64Builder::new(levels_read); - let mut value_index = 0; - for i in 0..levels_read { - if def_levels[i] > 0 { - builder.append_value( - convert_int96_timestamp( - read_buffer[value_index].data(), - ), - )?; - value_index += 1; - } else { - builder.append_null()?; - } - } - Arc::new(builder.finish()) - } - } else { - let (values_read, _) = r.read_batch( - self.batch_size, - None, - None, - &mut read_buffer, - )?; - - let mut builder = Int64Builder::new(values_read); - - for i in 0..values_read { + let mut def_levels: Vec = vec![0; self.batch_size]; + let (_, levels_read) = r.read_batch( + self.batch_size, + Some(&mut def_levels), + None, + &mut read_buffer, + )?; + + let mut builder = Int64Builder::new(levels_read); + let mut value_index = 0; + for i in 0..levels_read { + if def_levels[i] > 0 { builder.append_value(convert_int96_timestamp( - read_buffer[i].data(), + read_buffer[value_index].data(), ))?; + value_index += 1; + } else { + builder.append_null()?; } - Arc::new(builder.finish()) } + Arc::new(builder.finish()) } ColumnReader::FloatColumnReader(ref mut r) => { ArrowReader::::read(