Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
nevi-me authored and andygrove committed Mar 14, 2019
1 parent 3c711a5 commit 306d07a
Showing 1 changed file with 56 additions and 28 deletions.
84 changes: 56 additions & 28 deletions rust/datafusion/src/datasource/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,19 +128,29 @@ macro_rules! read_binary_column {
}}
}

trait ArrowReader<T> where T: ArrowPrimitiveType {
fn read(&mut self, batch_size: usize, is_nullable: bool) -> Result<Arc<PrimitiveArray<T>>>;
trait ArrowReader<T>
where
T: ArrowPrimitiveType,
{
fn read(
&mut self,
batch_size: usize,
is_nullable: bool,
) -> Result<Arc<PrimitiveArray<T>>>;
}

impl<A,P> ArrowReader<A> for ColumnReaderImpl<P>
impl<A, P> ArrowReader<A> for ColumnReaderImpl<P>
where
A: ArrowPrimitiveType,
A: ArrowPrimitiveType,
P: parquet::data_type::DataType,
P::T: std::convert::From<A::Native>,
A::Native: std::convert::From<P::T>,
{
fn read(&mut self, batch_size: usize, is_nullable: bool) -> Result<Arc<PrimitiveArray<A>>> {

fn read(
&mut self,
batch_size: usize,
is_nullable: bool,
) -> Result<Arc<PrimitiveArray<A>>> {
// create read buffer
let mut read_buffer: Vec<P::T> = vec![A::default_value().into(); batch_size];

Expand All @@ -151,14 +161,15 @@ where
}

let (values_read, levels_read) = self.read_batch(
batch_size,
Some(&mut def_levels),
None,
&mut read_buffer,
batch_size,
Some(&mut def_levels),
None,
&mut read_buffer,
)?;
let mut builder = PrimitiveBuilder::<A>::new(levels_read);
if values_read == levels_read {
let converted_buffer: Vec<A::Native> = read_buffer.into_iter().map(|v| v.into()).collect();
let converted_buffer: Vec<A::Native> =
read_buffer.into_iter().map(|v| v.into()).collect();
builder.append_slice(&converted_buffer[0..values_read])?;
} else {
for (v, l) in read_buffer.into_iter().zip(def_levels) {
Expand All @@ -171,15 +182,12 @@ where
}
Ok(Arc::new(builder.finish()))
} else {
let (values_read, _) = self.read_batch(
batch_size,
None,
None,
&mut read_buffer,
)?;
let (values_read, _) =
self.read_batch(batch_size, None, None, &mut read_buffer)?;

let mut builder = PrimitiveBuilder::<A>::new(values_read);
let converted_buffer: Vec<A::Native> = read_buffer.into_iter().map(|v| v.into()).collect();
let converted_buffer: Vec<A::Native> =
read_buffer.into_iter().map(|v| v.into()).collect();
builder.append_slice(&converted_buffer[0..values_read])?;
Ok(Arc::new(builder.finish()))
}
Expand Down Expand Up @@ -262,21 +270,33 @@ impl ParquetFile {
let is_nullable = self.schema().field(i).is_nullable();
let array: Arc<Array> = match self.column_readers[i] {
ColumnReader::BoolColumnReader(ref mut r) => {
match ArrowReader::<BooleanType>::read(r, self.batch_size, is_nullable) {
match ArrowReader::<BooleanType>::read(
r,
self.batch_size,
is_nullable,
) {
Ok(array) => array,
Err(e) => return Err(e)
Err(e) => return Err(e),
}
}
ColumnReader::Int32ColumnReader(ref mut r) => {
match ArrowReader::<Int32Type>::read(r, self.batch_size, is_nullable) {
match ArrowReader::<Int32Type>::read(
r,
self.batch_size,
is_nullable,
) {
Ok(array) => array,
Err(e) => return Err(e)
Err(e) => return Err(e),
}
}
ColumnReader::Int64ColumnReader(ref mut r) => {
match ArrowReader::<Int64Type>::read(r, self.batch_size, is_nullable) {
match ArrowReader::<Int64Type>::read(
r,
self.batch_size,
is_nullable,
) {
Ok(array) => array,
Err(e) => return Err(e)
Err(e) => return Err(e),
}
}
ColumnReader::Int96ColumnReader(ref mut r) => {
Expand Down Expand Up @@ -340,15 +360,23 @@ impl ParquetFile {
}
}
ColumnReader::FloatColumnReader(ref mut r) => {
match ArrowReader::<Float32Type>::read(r, self.batch_size, is_nullable) {
match ArrowReader::<Float32Type>::read(
r,
self.batch_size,
is_nullable,
) {
Ok(array) => array,
Err(e) => return Err(e)
Err(e) => return Err(e),
}
}
ColumnReader::DoubleColumnReader(ref mut r) => {
match ArrowReader::<Float64Type>::read(r, self.batch_size, is_nullable) {
match ArrowReader::<Float64Type>::read(
r,
self.batch_size,
is_nullable,
) {
Ok(array) => array,
Err(e) => return Err(e)
Err(e) => return Err(e),
}
}
ColumnReader::FixedLenByteArrayColumnReader(ref mut r) => {
Expand Down

0 comments on commit 306d07a

Please sign in to comment.