From aea9f8a1162e59ecb70dfcd273dcba5566a11316 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Sat, 9 Mar 2019 14:30:26 -0700 Subject: [PATCH] convert to use row iter --- rust/datafusion/src/datasource/parquet.rs | 78 ++++++++++++----------- 1 file changed, 40 insertions(+), 38 deletions(-) diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs index 4dcd8d969be2c..1f2a66d2bb017 100644 --- a/rust/datafusion/src/datasource/parquet.rs +++ b/rust/datafusion/src/datasource/parquet.rs @@ -28,15 +28,13 @@ use arrow::datatypes::{DataType, Field, Schema}; use arrow::record_batch::RecordBatch; use parquet::basic; -use parquet::column::reader::*; -use parquet::data_type::ByteArray; use parquet::file::reader::*; -use parquet::schema::types::Type; use parquet::record::{Row, RowAccessor}; +use parquet::schema::types::Type; use crate::datasource::{RecordBatchIterator, Table}; use crate::execution::error::{ExecutionError, Result}; -use arrow::builder::{BinaryBuilder, Float64Builder, Int32Builder}; +use arrow::builder::{BinaryBuilder, Float32Builder, Float64Builder, Int32Builder}; pub struct ParquetTable { filename: String, @@ -121,27 +119,7 @@ impl ParquetFile { fn load_next_row_group(&mut self) { if self.row_group_index < self.reader.num_row_groups() { - //println!("Loading row group {} of {}", self.row_group_index, self.reader.num_row_groups()); let reader = self.reader.get_row_group(self.row_group_index).unwrap(); - -// self.column_readers = vec![]; -// -// match &self.projection { -// None => { -// for i in 0..reader.num_columns() { -// self.column_readers -// .push(reader.get_column_reader(i).unwrap()); -// } -// } -// Some(proj) => { -// for i in proj { -// //TODO validate index in bounds -// self.column_readers -// .push(reader.get_column_reader(*i).unwrap()); -// } -// } -// } - self.current_row_group = Some(reader); self.row_group_index += 1; } else { @@ -152,10 +130,9 @@ impl ParquetFile { fn load_batch(&mut self) -> Result> { match &self.current_row_group { Some(reader) => { - // read batch of rows into memory -// let parquet_projection = self.projection.iter().map(|i| reader.metadata().schema_descr().column(*i)).collect(); + // let parquet_projection = self.projection.iter().map(|i| reader.metadata().schema_descr().column(*i)).collect(); let mut row_iter = reader.get_row_iter(None).unwrap(); //TODO projection push down let mut rows: Vec = Vec::with_capacity(self.batch_size); @@ -168,7 +145,8 @@ impl ParquetFile { println!("Loaded {} rows into memory", rows.len()); // convert to columnar batch - let mut batch: Vec> = Vec::with_capacity(self.projection.len()); + let mut batch: Vec> = + Vec::with_capacity(self.projection.len()); for i in &self.projection { let array: Arc = match self.schema.field(*i).data_type() { DataType::Int32 => { @@ -179,17 +157,44 @@ impl ParquetFile { } Arc::new(builder.finish()) } + DataType::Float32 => { + let mut builder = Float32Builder::new(rows.len()); + for row in &rows { + //TODO null handling + builder.append_value(row.get_float(*i).unwrap()).unwrap(); + } + Arc::new(builder.finish()) + } + DataType::Float64 => { + let mut builder = Float64Builder::new(rows.len()); + for row in &rows { + //TODO null handling + builder + .append_value(row.get_double(*i).unwrap()) + .unwrap(); + } + Arc::new(builder.finish()) + } DataType::Utf8 => { let mut builder = BinaryBuilder::new(rows.len()); for row in &rows { //TODO null handling let bytes = row.get_bytes(*i).unwrap(); - builder.append_string(&String::from_utf8(bytes.data().to_vec()).unwrap()).unwrap(); + builder + .append_string( + &String::from_utf8(bytes.data().to_vec()) + .unwrap(), + ) + .unwrap(); } Arc::new(builder.finish()) } - other => return Err(ExecutionError::NotImplemented( - format!("unsupported column reader type ({:?})", other))) + other => { + return Err(ExecutionError::NotImplemented(format!( + "unsupported column reader type ({:?})", + other + ))); + } }; batch.push(array); } @@ -200,9 +205,9 @@ impl ParquetFile { Ok(None) } else { Ok(Some(RecordBatch::try_new( - self.schema.projection(&self.projection)?, - batch, - )?)) + self.schema.projection(&self.projection)?, + batch, + )?)) } } _ => Ok(None), @@ -299,14 +304,11 @@ mod tests { .downcast_ref::() .unwrap(); let mut values: Vec = vec![]; - for i in 0..16 { + for i in 0..batch.num_rows() { values.push(array.value(i)); } - assert_eq!( - "[4, 5, 6, 7, 2, 3, 0, 1, 0, 0, 9, 0, 1, 0, 0, 0]", - format!("{:?}", values) - ); + assert_eq!("[4, 5, 6, 7, 2, 3, 0, 1]", format!("{:?}", values)); } #[test]