diff --git a/rust/datafusion/src/datasource/parquet.rs b/rust/datafusion/src/datasource/parquet.rs
index cb8597febf525..0b1c0818c9aff 100644
--- a/rust/datafusion/src/datasource/parquet.rs
+++ b/rust/datafusion/src/datasource/parquet.rs
@@ -38,6 +38,7 @@ use crate::execution::error::{ExecutionError, Result};
 use arrow::builder::BooleanBuilder;
 use arrow::builder::Int64Builder;
 use arrow::builder::{BinaryBuilder, Float32Builder, Float64Builder, Int32Builder};
+use parquet::data_type::Int96;
 
 pub struct ParquetTable {
     filename: String,
@@ -171,7 +172,6 @@ impl ParquetFile {
                         None,
                         &mut read_buffer,
                     ) {
-                        //TODO this isn't handling null values
                         Ok((count, _)) => {
                             let mut builder = BooleanBuilder::new(count);
                             builder.append_slice(&read_buffer[0..count]).unwrap();
@@ -200,7 +200,6 @@ impl ParquetFile {
                         None,
                         &mut read_buffer,
                     ) {
-                        //TODO this isn't handling null values
                         Ok((count, _)) => {
                             let mut builder = Int32Builder::new(count);
                             builder.append_slice(&read_buffer[0..count]).unwrap();
@@ -229,7 +228,6 @@ impl ParquetFile {
                         None,
                         &mut read_buffer,
                     ) {
-                        //TODO this isn't handling null values
                         Ok((count, _)) => {
                             let mut builder = Int64Builder::new(count);
                             builder.append_slice(&read_buffer[0..count]).unwrap();
@@ -244,10 +242,41 @@ impl ParquetFile {
                         }
                     }
                 }
-                ColumnReader::Int96ColumnReader(ref mut _r) => {
-                    return Err(ExecutionError::NotImplemented(
-                        "unsupported column reader type (INT96)".to_string(),
-                    ));
+                ColumnReader::Int96ColumnReader(ref mut r) => {
+                    let mut read_buffer: Vec<Int96> =
+                        Vec::with_capacity(self.batch_size);
+
+                    for _ in 0..self.batch_size {
+                        read_buffer.push(Int96::new());
+                    }
+
+                    match r.read_batch(
+                        self.batch_size,
+                        None,
+                        None,
+                        &mut read_buffer,
+                    ) {
+                        Ok((count, _)) => {
+                            let mut builder = Int64Builder::new(count);
+
+                            for i in 0..count {
+                                let v = read_buffer[i].data();
+                                let value: u128 = (v[0] as u128) << 64
+                                    | (v[1] as u128) << 32
+                                    | (v[2] as u128);
+                                let ms: i64 = (value / 1000000) as i64;
+                                builder.append_value(ms).unwrap();
+                            }
+                            row_count = count;
+                            Arc::new(builder.finish())
+                        }
+                        Err(e) => {
+                            return Err(ExecutionError::NotImplemented(format!(
+                                "Error reading parquet batch (column {}): {:?}",
+                                i, e
+                            )));
+                        }
+                    }
                 }
                 ColumnReader::FloatColumnReader(ref mut r) => {
                     let mut builder = Float32Builder::new(self.batch_size);
@@ -262,7 +291,6 @@ impl ParquetFile {
                         None,
                         &mut read_buffer,
                     ) {
-                        //TODO this isn't handling null values
                         Ok((count, _)) => {
                             builder.append_slice(&read_buffer[0..count]).unwrap();
                             row_count = count;
@@ -289,7 +317,6 @@ impl ParquetFile {
                         None,
                         &mut read_buffer,
                     ) {
-                        //TODO this isn't handling null values
                         Ok((count, _)) => {
                             builder.append_slice(&read_buffer[0..count]).unwrap();
                             row_count = count;
@@ -303,11 +330,34 @@ impl ParquetFile {
                         }
                     }
                 }
-                ColumnReader::FixedLenByteArrayColumnReader(ref mut _r) => {
-                    return Err(ExecutionError::NotImplemented(
-                        "unsupported column reader type (FixedLenByteArray)"
-                            .to_string(),
-                    ));
+                ColumnReader::FixedLenByteArrayColumnReader(ref mut r) => {
+                    let mut b: Vec<ByteArray> =
+                        Vec::with_capacity(self.batch_size);
+                    for _ in 0..self.batch_size {
+                        b.push(ByteArray::default());
+                    }
+                    match r.read_batch(self.batch_size, None, None, &mut b) {
+                        Ok((count, _)) => {
+                            row_count = count;
+                            let mut builder = BinaryBuilder::new(row_count);
+                            for j in 0..row_count {
+                                let slice = b[j].slice(0, b[j].len());
+                                builder
+                                    .append_string(
+                                        &String::from_utf8(slice.data().to_vec())
+                                            .unwrap(),
+                                    )
+                                    .unwrap();
+                            }
+                            Arc::new(builder.finish())
+                        }
+                        _ => {
+                            return Err(ExecutionError::NotImplemented(format!(
+                                "Error reading parquet batch (column {})",
+                                i
+                            )));
+                        }
+                    }
                 }
                 ColumnReader::ByteArrayColumnReader(ref mut r) => {
                     let mut b: Vec<ByteArray> =
@@ -316,10 +366,8 @@ impl ParquetFile {
                         b.push(ByteArray::default());
                     }
                     match r.read_batch(self.batch_size, None, None, &mut b) {
-                        //TODO this isn't handling null values
                         Ok((count, _)) => {
                             row_count = count;
-                            //TODO this is horribly inefficient
                             let mut builder = BinaryBuilder::new(row_count);
                             for j in 0..row_count {
                                 let slice = b[j].slice(0, b[j].len());
@@ -370,16 +418,11 @@ fn to_arrow(t: &Type) -> Result<Field> {
         basic::Type::BOOLEAN => DataType::Boolean,
         basic::Type::INT32 => DataType::Int32,
         basic::Type::INT64 => DataType::Int64,
-        basic::Type::INT96 => DataType::Int64, //TODO ???
+        basic::Type::INT96 => DataType::Int64,
         basic::Type::FLOAT => DataType::Float32,
         basic::Type::DOUBLE => DataType::Float64,
-        basic::Type::BYTE_ARRAY => DataType::Utf8, /*match basic_info.logical_type() {
-            basic::LogicalType::UTF8 => DataType::Utf8,
-            _ => unimplemented!("No support for Parquet BYTE_ARRAY yet"),
-        }*/
-        basic::Type::FIXED_LEN_BYTE_ARRAY => {
-            unimplemented!("No support for Parquet FIXED_LEN_BYTE_ARRAY yet")
-        }
+        basic::Type::BYTE_ARRAY => DataType::Utf8,
+        basic::Type::FIXED_LEN_BYTE_ARRAY => DataType::Utf8,
     };
 
     Ok(Field::new(basic_info.name(), arrow_type, false))
@@ -434,6 +477,19 @@ mod tests {
     use arrow::array::{BinaryArray, Int32Array};
     use std::env;
 
+    #[test]
+    fn read_alltypes_plain_parquet() {
+        let table = load_table("alltypes_plain.parquet");
+
+        let projection = None;
+        let scan = table.scan(&projection, 1024).unwrap();
+        let mut it = scan.borrow_mut();
+        let batch = it.next().unwrap().unwrap();
+
+        assert_eq!(11, batch.num_columns());
+        assert_eq!(8, batch.num_rows());
+    }
+
     #[test]
     fn read_bool_alltypes_plain_parquet() {
         let table = load_table("alltypes_plain.parquet");
@@ -487,6 +543,31 @@ mod tests {
         assert_eq!("[4, 5, 6, 7, 2, 3, 0, 1]", format!("{:?}", values));
     }
 
+    #[test]
+    fn read_i96_alltypes_plain_parquet() {
+        let table = load_table("alltypes_plain.parquet");
+
+        let projection = Some(vec![10]);
+        let scan = table.scan(&projection, 1024).unwrap();
+        let mut it = scan.borrow_mut();
+        let batch = it.next().unwrap().unwrap();
+
+        assert_eq!(1, batch.num_columns());
+        assert_eq!(8, batch.num_rows());
+
+        let array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int64Array>()
+            .unwrap();
+        let mut values: Vec<i64> = vec![];
+        for i in 0..batch.num_rows() {
+            values.push(array.value(i));
+        }
+
+        assert_eq!("[2, 7842670136425819125, 2, 7842670136425819125, 2, 7842670136425819125, 2, 7842670136425819125]", format!("{:?}", values));
+    }
+
     #[test]
     fn read_f32_alltypes_plain_parquet() {
         let table = load_table("alltypes_plain.parquet");