diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs
index 6546f187fb280..cd24d6357bef7 100644
--- a/rust/datafusion/src/datasource/parquet.rs
+++ b/rust/datafusion/src/datasource/parquet.rs
@@ -22,7 +22,7 @@ use std::string::String;
 use std::sync::{Arc, Mutex};
 
 use arrow::array::Array;
-use arrow::datatypes::{Field, Schema};
+use arrow::datatypes::{DataType, Schema};
 use arrow::record_batch::RecordBatch;
 
 use parquet::column::reader::*;
@@ -31,6 +31,7 @@ use parquet::file::reader::*;
 
 use crate::datasource::{RecordBatchIterator, ScanResult, Table};
 use crate::execution::error::{ExecutionError, Result};
+use arrow::array::BinaryArray;
 use arrow::builder::BooleanBuilder;
 use arrow::builder::Int64Builder;
 use arrow::builder::{BinaryBuilder, Float32Builder, Float64Builder, Int32Builder};
@@ -81,6 +82,94 @@ pub struct ParquetFile {
     column_readers: Vec<ColumnReader>,
 }
 
+fn create_binary_array(b: &Vec<ByteArray>, row_count: usize) -> Result<Arc<BinaryArray>> {
+    let mut builder = BinaryBuilder::new(b.len());
+    for j in 0..row_count {
+        let slice = b[j].slice(0, b[j].len());
+        builder.append_string(&String::from_utf8(slice.data().to_vec()).unwrap())?;
+    }
+    Ok(Arc::new(builder.finish()))
+}
+
+macro_rules! read_column {
+    ($SELF:ident, $R:ident, $INDEX:expr, $BUILDER:ident, $TY:ident, $DEFAULT:expr) => {{
+        //TODO: should be able to get num_rows in row group instead of defaulting to batch size
+        let mut read_buffer: Vec<$TY> = Vec::with_capacity($SELF.batch_size);
+        for _ in 0..$SELF.batch_size {
+            read_buffer.push($DEFAULT);
+        }
+        if $SELF.schema.field($INDEX).is_nullable() {
+
+            let mut def_levels: Vec<i16> = Vec::with_capacity($SELF.batch_size);
+            for _ in 0..$SELF.batch_size {
+                def_levels.push(0);
+            }
+
+            let (values_read, levels_read) = $R.read_batch(
+                $SELF.batch_size,
+                Some(&mut def_levels),
+                None,
+                &mut read_buffer,
+            )?;
+            let mut builder = $BUILDER::new(levels_read);
+            if values_read == levels_read {
+                builder.append_slice(&read_buffer[0..values_read])?;
+            } else {
+                return Err(ExecutionError::NotImplemented("Parquet datasource does not support null values".to_string()))
+            }
+            Arc::new(builder.finish())
+        } else {
+            let (values_read, _) = $R.read_batch(
+                $SELF.batch_size,
+                None,
+                None,
+                &mut read_buffer,
+            )?;
+            let mut builder = $BUILDER::new(values_read);
+            builder.append_slice(&read_buffer[0..values_read])?;
+            Arc::new(builder.finish())
+        }
+    }}
+}
+
+macro_rules! read_binary_column {
+    ($SELF:ident, $R:ident, $INDEX:expr) => {{
+        //TODO: should be able to get num_rows in row group instead of defaulting to batch size
+        let mut read_buffer: Vec<ByteArray> =
+            Vec::with_capacity($SELF.batch_size);
+        for _ in 0..$SELF.batch_size {
+            read_buffer.push(ByteArray::default());
+        }
+        if $SELF.schema.field($INDEX).is_nullable() {
+
+            let mut def_levels: Vec<i16> = Vec::with_capacity($SELF.batch_size);
+            for _ in 0..$SELF.batch_size {
+                def_levels.push(0);
+            }
+
+            let (values_read, levels_read) = $R.read_batch(
+                $SELF.batch_size,
+                Some(&mut def_levels),
+                None,
+                &mut read_buffer,
+            )?;
+            if values_read == levels_read {
+                create_binary_array(&read_buffer, values_read)?
+            } else {
+                return Err(ExecutionError::NotImplemented("Parquet datasource does not support null values".to_string()))
+            }
+        } else {
+            let (values_read, _) = $R.read_batch(
+                $SELF.batch_size,
+                None,
+                None,
+                &mut read_buffer,
+            )?;
+            create_binary_array(&read_buffer, values_read)?
+        }
+    }}
+}
+
 impl ParquetFile {
     pub fn open(file: File, projection: Option<Vec<usize>>) -> Result<Self> {
         let reader = SerializedFileReader::new(file)?;
@@ -100,12 +189,7 @@ impl ParquetFile {
             }
         };
 
-        let projected_fields: Vec<Field> = projection
-            .iter()
-            .map(|i| schema.fields()[*i].clone())
-            .collect();
-
-        let projected_schema = Arc::new(Schema::new(projected_fields));
+        let projected_schema = schema.projection(&projection)?;
 
         Ok(ParquetFile {
             reader: reader,
@@ -124,8 +208,23 @@ impl ParquetFile {
 
         self.column_readers = Vec::with_capacity(self.projection.len());
 
-        for i in &self.projection {
-            self.column_readers.push(reader.get_column_reader(*i)?);
+        for i in 0..self.projection.len() {
+            match self.schema().field(i).data_type() {
+                DataType::List(_) => {
+                    return Err(ExecutionError::NotImplemented(
+                        "Parquet datasource does not support LIST".to_string(),
+                    ));
+                }
+                DataType::Struct(_) => {
+                    return Err(ExecutionError::NotImplemented(
+                        "Parquet datasource does not support STRUCT".to_string(),
+                    ));
+                }
+                _ => {}
+            }
+
+            self.column_readers
+                .push(reader.get_column_reader(self.projection[i])?);
         }
 
         self.current_row_group = Some(reader);
@@ -143,92 +242,16 @@ impl ParquetFile {
         match &self.current_row_group {
             Some(reader) => {
                 let mut batch: Vec<Arc<Array>> = Vec::with_capacity(reader.num_columns());
-                let mut row_count = 0;
                 for i in 0..self.column_readers.len() {
                     let array: Arc<Array> = match self.column_readers[i] {
                         ColumnReader::BoolColumnReader(ref mut r) => {
-                            let mut read_buffer: Vec<bool> =
-                                Vec::with_capacity(self.batch_size);
-
-                            for _ in 0..self.batch_size {
-                                read_buffer.push(false);
-                            }
-
-                            match r.read_batch(
-                                self.batch_size,
-                                None,
-                                None,
-                                &mut read_buffer,
-                            ) {
-                                Ok((count, _)) => {
-                                    let mut builder = BooleanBuilder::new(count);
-                                    builder.append_slice(&read_buffer[0..count])?;
-                                    row_count = count;
-                                    Arc::new(builder.finish())
-                                }
-                                Err(e) => {
-                                    return Err(ExecutionError::NotImplemented(format!(
-                                        "Error reading parquet batch (column {}): {:?}",
-                                        i, e
-                                    )));
-                                }
-                            }
+                            read_column!(self, r, i, BooleanBuilder, bool, false)
                         }
                         ColumnReader::Int32ColumnReader(ref mut r) => {
-                            let mut read_buffer: Vec<i32> =
-                                Vec::with_capacity(self.batch_size);
-
-                            for _ in 0..self.batch_size {
-                                read_buffer.push(0);
-                            }
-
-                            match r.read_batch(
-                                self.batch_size,
-                                None,
-                                None,
-                                &mut read_buffer,
-                            ) {
-                                Ok((count, _)) => {
-                                    let mut builder = Int32Builder::new(count);
-                                    builder.append_slice(&read_buffer[0..count])?;
-                                    row_count = count;
-                                    Arc::new(builder.finish())
-                                }
-                                Err(e) => {
-                                    return Err(ExecutionError::NotImplemented(format!(
-                                        "Error reading parquet batch (column {}): {:?}",
-                                        i, e
-                                    )));
-                                }
-                            }
+                            read_column!(self, r, i, Int32Builder, i32, 0)
                         }
                         ColumnReader::Int64ColumnReader(ref mut r) => {
-                            let mut read_buffer: Vec<i64> =
-                                Vec::with_capacity(self.batch_size);
-
-                            for _ in 0..self.batch_size {
-                                read_buffer.push(0);
-                            }
-
-                            match r.read_batch(
-                                self.batch_size,
-                                None,
-                                None,
-                                &mut read_buffer,
-                            ) {
-                                Ok((count, _)) => {
-                                    let mut builder = Int64Builder::new(count);
-                                    builder.append_slice(&read_buffer[0..count])?;
-                                    row_count = count;
-                                    Arc::new(builder.finish())
-                                }
-                                Err(e) => {
-                                    return Err(ExecutionError::NotImplemented(format!(
-                                        "Error reading parquet batch (column {}): {:?}",
-                                        i, e
-                                    )));
-                                }
-                            }
+                            read_column!(self, r, i, Int64Builder, i64, 0)
                         }
                         ColumnReader::Int96ColumnReader(ref mut r) => {
                             let mut read_buffer: Vec<Int96> =
@@ -238,16 +261,23 @@ impl ParquetFile {
                                 read_buffer.push(Int96::new());
                             }
 
-                            match r.read_batch(
-                                self.batch_size,
-                                None,
-                                None,
-                                &mut read_buffer,
-                            ) {
-                                Ok((count, _)) => {
-                                    let mut builder = Int64Builder::new(count);
+                            if self.schema.field(i).is_nullable() {
+                                let mut def_levels: Vec<i16> =
+                                    Vec::with_capacity(self.batch_size);
+                                for _ in 0..self.batch_size {
+                                    def_levels.push(0);
+                                }
+                                let (values_read, levels_read) = r.read_batch(
+                                    self.batch_size,
+                                    Some(&mut def_levels),
+                                    None,
+                                    &mut read_buffer,
+                                )?;
+
+                                if values_read == levels_read {
+                                    let mut builder = Int64Builder::new(values_read);
-                                    for i in 0..count {
+                                    for i in 0..values_read {
                                         let v = read_buffer[i].data();
                                         let value: u128 = (v[0] as u128) << 64
                                             | (v[1] as u128) << 32
@@ -255,129 +285,52 @@ impl ParquetFile {
                                         let ms: i64 = (value / 1000000) as i64;
                                         builder.append_value(ms)?;
                                     }
-                                    row_count = count;
                                     Arc::new(builder.finish())
+                                } else {
+                                    return Err(ExecutionError::NotImplemented(
+                                        "Parquet datasource does not support null values"
+                                            .to_string(),
+                                    ));
                                 }
-                                Err(e) => {
-                                    return Err(ExecutionError::NotImplemented(format!(
-                                        "Error reading parquet batch (column {}): {:?}",
-                                        i, e
-                                    )));
+                            } else {
+                                let (values_read, _) = r.read_batch(
+                                    self.batch_size,
+                                    None,
+                                    None,
+                                    &mut read_buffer,
+                                )?;
+
+                                let mut builder = Int64Builder::new(values_read);
+
+                                for i in 0..values_read {
+                                    let v = read_buffer[i].data();
+                                    let value: u128 = (v[0] as u128) << 64
+                                        | (v[1] as u128) << 32
+                                        | (v[2] as u128);
+                                    let ms: i64 = (value / 1000000) as i64;
+                                    builder.append_value(ms)?;
                                 }
+                                Arc::new(builder.finish())
                             }
                         }
                         ColumnReader::FloatColumnReader(ref mut r) => {
-                            let mut builder = Float32Builder::new(self.batch_size);
-                            let mut read_buffer: Vec<f32> =
-                                Vec::with_capacity(self.batch_size);
-                            for _ in 0..self.batch_size {
-                                read_buffer.push(0.0);
-                            }
-                            match r.read_batch(
-                                self.batch_size,
-                                None,
-                                None,
-                                &mut read_buffer,
-                            ) {
-                                Ok((count, _)) => {
-                                    builder.append_slice(&read_buffer[0..count])?;
-                                    row_count = count;
-                                    Arc::new(builder.finish())
-                                }
-                                Err(e) => {
-                                    return Err(ExecutionError::NotImplemented(format!(
-                                        "Error reading parquet batch (column {}): {:?}",
-                                        i, e
-                                    )));
-                                }
-                            }
+                            read_column!(self, r, i, Float32Builder, f32, 0_f32)
                         }
                         ColumnReader::DoubleColumnReader(ref mut r) => {
-                            let mut builder = Float64Builder::new(self.batch_size);
-                            let mut read_buffer: Vec<f64> =
-                                Vec::with_capacity(self.batch_size);
-                            for _ in 0..self.batch_size {
-                                read_buffer.push(0.0);
-                            }
-                            match r.read_batch(
-                                self.batch_size,
-                                None,
-                                None,
-                                &mut read_buffer,
-                            ) {
-                                Ok((count, _)) => {
-                                    builder.append_slice(&read_buffer[0..count])?;
-                                    row_count = count;
-                                    Arc::new(builder.finish())
-                                }
-                                Err(e) => {
-                                    return Err(ExecutionError::NotImplemented(format!(
-                                        "Error reading parquet batch (column {}): {:?}",
-                                        i, e
-                                    )));
-                                }
-                            }
+                            read_column!(self, r, i, Float64Builder, f64, 0_f64)
                         }
                         ColumnReader::FixedLenByteArrayColumnReader(ref mut r) => {
-                            let mut b: Vec<ByteArray> =
-                                Vec::with_capacity(self.batch_size);
-                            for _ in 0..self.batch_size {
-                                b.push(ByteArray::default());
-                            }
-                            match r.read_batch(self.batch_size, None, None, &mut b) {
-                                Ok((count, _)) => {
-                                    row_count = count;
-                                    let mut builder = BinaryBuilder::new(row_count);
-                                    for j in 0..row_count {
-                                        let slice = b[j].slice(0, b[j].len());
-                                        builder.append_string(
-                                            &String::from_utf8(slice.data().to_vec())
-                                                .unwrap(),
-                                        )?;
-                                    }
-                                    Arc::new(builder.finish())
-                                }
-                                _ => {
-                                    return Err(ExecutionError::NotImplemented(format!(
-                                        "Error reading parquet batch (column {})",
-                                        i
-                                    )));
-                                }
-                            }
+                            read_binary_column!(self, r, i)
                         }
                         ColumnReader::ByteArrayColumnReader(ref mut r) => {
-                            let mut b: Vec<ByteArray> =
-                                Vec::with_capacity(self.batch_size);
-                            for _ in 0..self.batch_size {
-                                b.push(ByteArray::default());
-                            }
-                            match r.read_batch(self.batch_size, None, None, &mut b) {
-                                Ok((count, _)) => {
-                                    row_count = count;
-                                    let mut builder = BinaryBuilder::new(row_count);
-                                    for j in 0..row_count {
-                                        let slice = b[j].slice(0, b[j].len());
-                                        builder.append_string(
-                                            &String::from_utf8(slice.data().to_vec())
-                                                .unwrap(),
-                                        )?;
-                                    }
-                                    Arc::new(builder.finish())
-                                }
-                                _ => {
-                                    return Err(ExecutionError::NotImplemented(format!(
-                                        "Error reading parquet batch (column {})",
-                                        i
-                                    )));
-                                }
-                            }
+                            read_binary_column!(self, r, i)
                         }
                     };
                     batch.push(array);
                 }
 
-                if row_count == 0 {
+                if batch.len() == 0 || batch[0].data().len() == 0 {
                     Ok(None)
                 } else {
                     Ok(Some(RecordBatch::try_new(self.schema.clone(), batch)?))
@@ -604,8 +557,8 @@ mod tests {
     #[test]
     fn read_int64_nullable_impala_parquet() {
         let table = load_table("nullable.impala.parquet");
-
         let projection = Some(vec![0]);
+
        let scan = table.scan(&projection, 1024).unwrap();
         let mut it = scan[0].lock().unwrap();
         let batch = it.next().unwrap().unwrap();
@@ -626,6 +579,20 @@ mod tests {
         assert_eq!("[1, 2, 3, 4, 5, 6, 7]", format!("{:?}", values));
     }
 
+    #[test]
+    fn read_array_nullable_impala_parquet() {
+        let table = load_table("nullable.impala.parquet");
+        let projection = Some(vec![1]);
+        let scan = table.scan(&projection, 1024).unwrap();
+        let mut it = scan[0].lock().unwrap();
+        let batch = it.next();
+
+        assert_eq!(
+            "NotImplemented(\"Parquet datasource does not support LIST\")",
+            format!("{:?}", batch.err().unwrap())
+        );
+    }
+
     fn load_table(name: &str) -> Box<Table> {
         let testdata = env::var("PARQUET_TEST_DATA").unwrap();
         let filename = format!("{}/{}", testdata, name);
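
Reviewer note (not part of the patch): to make the macro's control flow easier to follow, here is a rough hand-expansion of the `read_column!(self, r, i, Int32Builder, i32, 0)` invocation from the `Int32ColumnReader` arm above; names mirror the macro body and call site.

    // Pre-fill the value buffer so read_batch can write into it.
    let mut read_buffer: Vec<i32> = Vec::with_capacity(self.batch_size);
    for _ in 0..self.batch_size {
        read_buffer.push(0);
    }
    if self.schema.field(i).is_nullable() {
        // Request definition levels alongside values; levels_read > values_read
        // means at least one null slot was encountered in the column chunk.
        let mut def_levels: Vec<i16> = Vec::with_capacity(self.batch_size);
        for _ in 0..self.batch_size {
            def_levels.push(0);
        }
        let (values_read, levels_read) = r.read_batch(
            self.batch_size,
            Some(&mut def_levels),
            None,
            &mut read_buffer,
        )?;
        let mut builder = Int32Builder::new(levels_read);
        if values_read == levels_read {
            // Dense case: every level has a backing value, so bulk-append.
            builder.append_slice(&read_buffer[0..values_read])?;
        } else {
            return Err(ExecutionError::NotImplemented(
                "Parquet datasource does not support null values".to_string(),
            ));
        }
        Arc::new(builder.finish())
    } else {
        // Non-nullable column: no definition levels needed.
        let (values_read, _) = r.read_batch(self.batch_size, None, None, &mut read_buffer)?;
        let mut builder = Int32Builder::new(values_read);
        builder.append_slice(&read_buffer[0..values_read])?;
        Arc::new(builder.finish())
    }

Sizing the builder with `levels_read` rather than `values_read` leaves room for a follow-up that appends actual nulls instead of returning NotImplemented.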