add support for all primitive parquet types
andygrove committed Mar 14, 2019
1 parent b4981ed commit 6c3b7e2
Showing 1 changed file with 105 additions and 24 deletions.
rust/datafusion/src/datasource/parquet.rs: 129 changes (105 additions, 24 deletions)
@@ -38,6 +38,7 @@ use crate::execution::error::{ExecutionError, Result};
use arrow::builder::BooleanBuilder;
use arrow::builder::Int64Builder;
use arrow::builder::{BinaryBuilder, Float32Builder, Float64Builder, Int32Builder};
use parquet::data_type::Int96;

pub struct ParquetTable {
filename: String,
@@ -171,7 +172,6 @@ impl ParquetFile {
None,
&mut read_buffer,
) {
//TODO this isn't handling null values
Ok((count, _)) => {
let mut builder = BooleanBuilder::new(count);
builder.append_slice(&read_buffer[0..count]).unwrap();
@@ -200,7 +200,6 @@ impl ParquetFile {
None,
&mut read_buffer,
) {
//TODO this isn't handling null values
Ok((count, _)) => {
let mut builder = Int32Builder::new(count);
builder.append_slice(&read_buffer[0..count]).unwrap();
@@ -229,7 +228,6 @@ impl ParquetFile {
None,
&mut read_buffer,
) {
//TODO this isn't handling null values
Ok((count, _)) => {
let mut builder = Int64Builder::new(count);
builder.append_slice(&read_buffer[0..count]).unwrap();
@@ -244,10 +242,41 @@ impl ParquetFile {
}
}
}
ColumnReader::Int96ColumnReader(ref mut _r) => {
return Err(ExecutionError::NotImplemented(
"unsupported column reader type (INT96)".to_string(),
));
ColumnReader::Int96ColumnReader(ref mut r) => {
let mut read_buffer: Vec<Int96> =
Vec::with_capacity(self.batch_size);

for _ in 0..self.batch_size {
read_buffer.push(Int96::new());
}

match r.read_batch(
self.batch_size,
None,
None,
&mut read_buffer,
) {
Ok((count, _)) => {
let mut builder = Int64Builder::new(count);

for i in 0..count {
let v = read_buffer[i].data();
let value: u128 = (v[0] as u128) << 64
| (v[1] as u128) << 32
| (v[2] as u128);
let ms: i64 = (value / 1000000) as i64;
builder.append_value(ms).unwrap();
}
row_count = count;
Arc::new(builder.finish())
}
Err(e) => {
return Err(ExecutionError::NotImplemented(format!(
"Error reading parquet batch (column {}): {:?}",
i, e
)));
}
}
}
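The new INT96 arm packs the three 32-bit words returned by Int96::data() into a single 128-bit integer and divides by 1,000,000 to produce a millisecond-style i64. A minimal standalone sketch of the same arithmetic, with a hypothetical helper name and plain u32 words in place of the Int96 type (not part of this file):

    // Sketch of the conversion used in the INT96 arm above (hypothetical
    // helper, not part of the commit): pack the three 32-bit words into one
    // integer and scale it down to a millisecond-style i64.
    fn int96_to_i64_millis(words: [u32; 3]) -> i64 {
        let value: u128 = ((words[0] as u128) << 64)
            | ((words[1] as u128) << 32)
            | (words[2] as u128);
        (value / 1_000_000) as i64
    }

    // For example, int96_to_i64_millis([0, 0, 2_000_000]) == 2.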
ColumnReader::FloatColumnReader(ref mut r) => {
let mut builder = Float32Builder::new(self.batch_size);
@@ -262,7 +291,6 @@ impl ParquetFile {
None,
&mut read_buffer,
) {
//TODO this isn't handling null values
Ok((count, _)) => {
builder.append_slice(&read_buffer[0..count]).unwrap();
row_count = count;
@@ -289,7 +317,6 @@ impl ParquetFile {
None,
&mut read_buffer,
) {
//TODO this isn't handling null values
Ok((count, _)) => {
builder.append_slice(&read_buffer[0..count]).unwrap();
row_count = count;
@@ -303,11 +330,34 @@ impl ParquetFile {
}
}
}
ColumnReader::FixedLenByteArrayColumnReader(ref mut _r) => {
return Err(ExecutionError::NotImplemented(
"unsupported column reader type (FixedLenByteArray)"
.to_string(),
));
ColumnReader::FixedLenByteArrayColumnReader(ref mut r) => {
let mut b: Vec<ByteArray> =
Vec::with_capacity(self.batch_size);
for _ in 0..self.batch_size {
b.push(ByteArray::default());
}
match r.read_batch(self.batch_size, None, None, &mut b) {
Ok((count, _)) => {
row_count = count;
let mut builder = BinaryBuilder::new(row_count);
for j in 0..row_count {
let slice = b[j].slice(0, b[j].len());
builder
.append_string(
&String::from_utf8(slice.data().to_vec())
.unwrap(),
)
.unwrap();
}
Arc::new(builder.finish())
}
_ => {
return Err(ExecutionError::NotImplemented(format!(
"Error reading parquet batch (column {})",
i
)));
}
}
}
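The new FIXED_LEN_BYTE_ARRAY arm reuses the approach of the existing BYTE_ARRAY arm below: copy each value's bytes out, decode them as UTF-8, and append the string to a BinaryBuilder. A small sketch of that per-batch step in isolation, built only from calls that appear in this file; the helper name is hypothetical and, like the code above, it panics on non-UTF-8 data:

    use arrow::array::BinaryArray;
    use arrow::builder::BinaryBuilder;

    // Hypothetical helper mirroring the loop above: build a Utf8 column from
    // a batch of byte slices, assuming every value is valid UTF-8.
    fn to_utf8_column(values: &[&[u8]]) -> BinaryArray {
        let mut builder = BinaryBuilder::new(values.len());
        for v in values {
            builder
                .append_string(&String::from_utf8(v.to_vec()).unwrap())
                .unwrap();
        }
        builder.finish()
    }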
ColumnReader::ByteArrayColumnReader(ref mut r) => {
let mut b: Vec<ByteArray> =
@@ -316,10 +366,8 @@ impl ParquetFile {
b.push(ByteArray::default());
}
match r.read_batch(self.batch_size, None, None, &mut b) {
//TODO this isn't handling null values
Ok((count, _)) => {
row_count = count;
//TODO this is horribly inefficient
let mut builder = BinaryBuilder::new(row_count);
for j in 0..row_count {
let slice = b[j].slice(0, b[j].len());
@@ -370,16 +418,11 @@ fn to_arrow(t: &Type) -> Result<Field> {
basic::Type::BOOLEAN => DataType::Boolean,
basic::Type::INT32 => DataType::Int32,
basic::Type::INT64 => DataType::Int64,
basic::Type::INT96 => DataType::Int64, //TODO ???
basic::Type::INT96 => DataType::Int64,
basic::Type::FLOAT => DataType::Float32,
basic::Type::DOUBLE => DataType::Float64,
basic::Type::BYTE_ARRAY => DataType::Utf8, /*match basic_info.logical_type() {
basic::LogicalType::UTF8 => DataType::Utf8,
_ => unimplemented!("No support for Parquet BYTE_ARRAY yet"),
}*/
basic::Type::FIXED_LEN_BYTE_ARRAY => {
unimplemented!("No support for Parquet FIXED_LEN_BYTE_ARRAY yet")
}
basic::Type::BYTE_ARRAY => DataType::Utf8,
basic::Type::FIXED_LEN_BYTE_ARRAY => DataType::Utf8,
};

Ok(Field::new(basic_info.name(), arrow_type, false))
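After this change to_arrow maps every primitive Parquet physical type to an Arrow type: INT96 columns are read as Int64 (via the millisecond conversion above) and both BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY are read as Utf8. The resulting mapping, consolidated into one sketch with a hypothetical function name (the real code lives inside to_arrow above):

    use arrow::datatypes::DataType;
    use parquet::basic;

    // Consolidated view of the physical-type mapping after this commit
    // (sketch only, not part of the file).
    fn primitive_type_to_arrow(t: basic::Type) -> DataType {
        match t {
            basic::Type::BOOLEAN => DataType::Boolean,
            basic::Type::INT32 => DataType::Int32,
            basic::Type::INT64 => DataType::Int64,
            basic::Type::INT96 => DataType::Int64,
            basic::Type::FLOAT => DataType::Float32,
            basic::Type::DOUBLE => DataType::Float64,
            basic::Type::BYTE_ARRAY => DataType::Utf8,
            basic::Type::FIXED_LEN_BYTE_ARRAY => DataType::Utf8,
        }
    }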
@@ -434,6 +477,19 @@ mod tests {
use arrow::array::{BinaryArray, Int32Array};
use std::env;

#[test]
fn read_alltypes_plain_parquet() {
let table = load_table("alltypes_plain.parquet");

let projection = None;
let scan = table.scan(&projection, 1024).unwrap();
let mut it = scan.borrow_mut();
let batch = it.next().unwrap().unwrap();

assert_eq!(11, batch.num_columns());
assert_eq!(8, batch.num_rows());
}

#[test]
fn read_bool_alltypes_plain_parquet() {
let table = load_table("alltypes_plain.parquet");
@@ -487,6 +543,31 @@ mod tests {
assert_eq!("[4, 5, 6, 7, 2, 3, 0, 1]", format!("{:?}", values));
}

#[test]
fn read_i96_alltypes_plain_parquet() {
let table = load_table("alltypes_plain.parquet");

let projection = Some(vec![10]);
let scan = table.scan(&projection, 1024).unwrap();
let mut it = scan.borrow_mut();
let batch = it.next().unwrap().unwrap();

assert_eq!(1, batch.num_columns());
assert_eq!(8, batch.num_rows());

let array = batch
.column(0)
.as_any()
.downcast_ref::<Int64Array>()
.unwrap();
let mut values: Vec<i64> = vec![];
for i in 0..batch.num_rows() {
values.push(array.value(i));
}

assert_eq!("[2, 7842670136425819125, 2, 7842670136425819125, 2, 7842670136425819125, 2, 7842670136425819125]", format!("{:?}", values));
}

#[test]
fn read_f32_alltypes_plain_parquet() {
let table = load_table("alltypes_plain.parquet");