Commit

null handling for int96
andygrove committed Mar 14, 2019
1 parent 1503855 commit 639e13e
Showing 1 changed file with 50 additions and 51 deletions.
rust/datafusion/src/datasource/parquet.rs (50 additions, 51 deletions)
@@ -46,7 +46,7 @@ impl ParquetTable
     pub fn try_new(filename: &str) -> Result<Self> {
         let file = File::open(filename)?;
         let parquet_file = ParquetFile::open(file, None)?;
-        let schema = parquet_file.schema.clone();
+        let schema = parquet_file.projection_schema.clone();
         Ok(Self {
             filename: filename.to_string(),
             schema,
@@ -72,11 +72,12 @@ impl Table for ParquetTable
 
 pub struct ParquetFile {
     reader: SerializedFileReader<File>,
-    row_group_index: usize,
-    /// The schema of the underlying file
-    schema: Arc<Schema>,
+    /// Projection expressed as column indices into underlying parquet reader
     projection: Vec<usize>,
+    /// The schema of the projection
+    projection_schema: Arc<Schema>,
     batch_size: usize,
+    row_group_index: usize,
    current_row_group: Option<Box<RowGroupReader>>,
    column_readers: Vec<ColumnReader>,
 }
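The reordered fields read together: `projection` holds column indices into the underlying file, and `projection_schema` is the schema of just those columns. A minimal, dependency-free sketch of that relationship; the `Field` type and `project` helper here are hypothetical stand-ins, not the Arrow or DataFusion API:

```rust
// Hypothetical sketch: deriving a projected schema from column indices.
// `Field` stands in for arrow::datatypes::Field; not the real API.
#[derive(Clone, Debug, PartialEq)]
struct Field {
    name: String,
}

fn project(file_schema: &[Field], projection: &[usize]) -> Vec<Field> {
    // Keep only the selected columns, in projection order.
    projection.iter().map(|&i| file_schema[i].clone()).collect()
}

fn main() {
    let file_schema = vec![
        Field { name: "a".to_string() },
        Field { name: "b".to_string() },
        Field { name: "c".to_string() },
    ];
    // Project columns 2 and 0: the projected schema is [c, a].
    let projected = project(&file_schema, &[2, 0]);
    assert_eq!(projected[0].name, "c");
    assert_eq!(projected[1].name, "a");
}
```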
@@ -97,7 +98,7 @@ macro_rules! read_binary_column
         for _ in 0..$SELF.batch_size {
             read_buffer.push(ByteArray::default());
         }
-        if $SELF.schema.field($INDEX).is_nullable() {
+        if $SELF.projection_schema.field($INDEX).is_nullable() {
 
             let mut def_levels: Vec<i16> = Vec::with_capacity($SELF.batch_size);
             for _ in 0..$SELF.batch_size {
@@ -247,7 +248,7 @@ impl ParquetFile
         Ok(ParquetFile {
             reader: reader,
             row_group_index: 0,
-            schema: projected_schema,
+            projection_schema: projected_schema,
             projection,
             batch_size: 64 * 1024,
             current_row_group: None,
@@ -285,34 +286,25 @@ impl ParquetFile
         let is_nullable = self.schema().field(i).is_nullable();
         let array: Arc<Array> = match self.column_readers[i] {
             ColumnReader::BoolColumnReader(ref mut r) => {
-                match ArrowReader::<BooleanType>::read(
+                ArrowReader::<BooleanType>::read(
                     r,
                     self.batch_size,
                     is_nullable,
-                ) {
-                    Ok(array) => array,
-                    Err(e) => return Err(e),
-                }
+                )?
             }
             ColumnReader::Int32ColumnReader(ref mut r) => {
-                match ArrowReader::<Int32Type>::read(
+                ArrowReader::<Int32Type>::read(
                     r,
                     self.batch_size,
                     is_nullable,
-                ) {
-                    Ok(array) => array,
-                    Err(e) => return Err(e),
-                }
+                )?
             }
             ColumnReader::Int64ColumnReader(ref mut r) => {
-                match ArrowReader::<Int64Type>::read(
+                ArrowReader::<Int64Type>::read(
                     r,
                     self.batch_size,
                     is_nullable,
-                ) {
-                    Ok(array) => array,
-                    Err(e) => return Err(e),
-                }
+                )?
             }
             ColumnReader::Int96ColumnReader(ref mut r) => {
                 let mut read_buffer: Vec<Int96> =
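The primitive-column arms above replace an explicit `match` on the reader's `Result` with the `?` operator; unwrapping `Ok` and early-returning `Err` is exactly what `?` does. A standalone sketch of the equivalence, with `read_column` as a made-up stand-in for `ArrowReader::read`:

```rust
// Stand-in for ArrowReader::read; only the control flow matters here.
fn read_column(fail: bool) -> Result<Vec<i64>, String> {
    if fail {
        Err("read error".to_string())
    } else {
        Ok(vec![1, 2, 3])
    }
}

// Before: match the Result, returning early on the error arm.
fn verbose() -> Result<usize, String> {
    let array = match read_column(false) {
        Ok(array) => array,
        Err(e) => return Err(e),
    };
    Ok(array.len())
}

// After: `?` unwraps Ok and propagates Err, with identical behavior.
fn concise() -> Result<usize, String> {
    let array = read_column(false)?;
    Ok(array.len())
}

fn main() {
    assert_eq!(verbose(), concise());
}
```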
@@ -322,7 +314,7 @@ impl ParquetFile
                     read_buffer.push(Int96::new());
                 }
 
-                if self.schema.field(i).is_nullable() {
+                if self.projection_schema.field(i).is_nullable() {
                     let mut def_levels: Vec<i16> =
                         Vec::with_capacity(self.batch_size);
                     for _ in 0..self.batch_size {
@@ -337,21 +329,28 @@ impl ParquetFile
 
                     if values_read == levels_read {
                         let mut builder = Int64Builder::new(values_read);
-
                         for i in 0..values_read {
-                            let v = read_buffer[i].data();
-                            let value: u128 = (v[0] as u128) << 64
-                                | (v[1] as u128) << 32
-                                | (v[2] as u128);
-                            let ms: i64 = (value / 1000000) as i64;
-                            builder.append_value(ms)?;
+                            builder.append_value(convert_int96_timestamp(
+                                read_buffer[i].data(),
+                            ))?;
                         }
                         Arc::new(builder.finish())
                     } else {
-                        return Err(ExecutionError::NotImplemented(
-                            "Parquet datasource does not support null values"
-                                .to_string(),
-                        ));
+                        let mut builder = Int64Builder::new(levels_read);
+                        let mut value_index = 0;
+                        for i in 0..levels_read {
+                            if def_levels[i] > 0 {
+                                builder.append_value(
+                                    convert_int96_timestamp(
+                                        read_buffer[value_index].data(),
+                                    ),
+                                )?;
+                                value_index += 1;
+                            } else {
+                                builder.append_null()?;
+                            }
+                        }
+                        Arc::new(builder.finish())
                     }
                 } else {
                     let (values_read, _) = r.read_batch(
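The new else branch is the heart of the commit: for a nullable INT96 column, the Parquet reader returns a dense buffer of values plus one definition level per slot, where a level of 0 marks a null. The dense buffer therefore has to be walked with its own cursor (`value_index`) while nulls are interleaved; the `values_read == levels_read` fast path above skips that walk entirely, because equal counts mean no slot was null. A dependency-free sketch of the same interleaving, using `Option<i64>` in place of Arrow's `Int64Builder`:

```rust
// Sketch: expand a dense value buffer back into a nullable column using
// definition levels. def_levels[i] > 0 means slot i holds a real value.
fn expand_with_nulls(def_levels: &[i16], values: &[i64]) -> Vec<Option<i64>> {
    let mut out = Vec::with_capacity(def_levels.len());
    let mut value_index = 0; // cursor into the dense `values` buffer
    for &level in def_levels {
        if level > 0 {
            out.push(Some(values[value_index]));
            value_index += 1;
        } else {
            out.push(None); // a null slot consumes no dense value
        }
    }
    out
}

fn main() {
    // Three slots, middle one null: the reader returns dense values
    // [10, 30] and definition levels [1, 0, 1].
    let result = expand_with_nulls(&[1, 0, 1], &[10, 30]);
    assert_eq!(result, vec![Some(10), None, Some(30)]);
}
```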
@@ -364,35 +363,26 @@ impl ParquetFile
                     let mut builder = Int64Builder::new(values_read);
 
                     for i in 0..values_read {
-                        let v = read_buffer[i].data();
-                        let value: u128 = (v[0] as u128) << 64
-                            | (v[1] as u128) << 32
-                            | (v[2] as u128);
-                        let ms: i64 = (value / 1000000) as i64;
-                        builder.append_value(ms)?;
+                        builder.append_value(convert_int96_timestamp(
+                            read_buffer[i].data(),
+                        ))?;
                     }
                     Arc::new(builder.finish())
                 }
             }
             ColumnReader::FloatColumnReader(ref mut r) => {
-                match ArrowReader::<Float32Type>::read(
+                ArrowReader::<Float32Type>::read(
                     r,
                     self.batch_size,
                     is_nullable,
-                ) {
-                    Ok(array) => array,
-                    Err(e) => return Err(e),
-                }
+                )?
             }
             ColumnReader::DoubleColumnReader(ref mut r) => {
-                match ArrowReader::<Float64Type>::read(
+                ArrowReader::<Float64Type>::read(
                     r,
                     self.batch_size,
                     is_nullable,
-                ) {
-                    Ok(array) => array,
-                    Err(e) => return Err(e),
-                }
+                )?
             }
             ColumnReader::FixedLenByteArrayColumnReader(ref mut r) => {
                 read_binary_column!(self, r, i)
@@ -408,17 +398,26 @@ impl ParquetFile
                 if batch.len() == 0 || batch[0].data().len() == 0 {
                     Ok(None)
                 } else {
-                    Ok(Some(RecordBatch::try_new(self.schema.clone(), batch)?))
+                    Ok(Some(RecordBatch::try_new(
+                        self.projection_schema.clone(),
+                        batch,
+                    )?))
                 }
             }
             _ => Ok(None),
         }
     }
 }
 
+/// convert a parquet timestamp in nanoseconds to a timestamp with milliseconds
+fn convert_int96_timestamp(v: &[u32]) -> i64 {
+    let value: u128 = (v[0] as u128) << 64 | (v[1] as u128) << 32 | (v[2] as u128);
+    (value / 1000000) as i64
+}
+
 impl RecordBatchIterator for ParquetFile {
     fn schema(&self) -> &Arc<Schema> {
-        &self.schema
+        &self.projection_schema
     }
 
     fn next(&mut self) -> Result<Option<RecordBatch>> {
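The new `convert_int96_timestamp` helper, added at the end of this diff, centralizes the arithmetic that was previously duplicated in both branches: the three 32-bit words are combined into one 96-bit integer, treated as nanoseconds, and truncated to milliseconds. A quick standalone check of the arithmetic as written, with the helper copied verbatim:

```rust
/// Copy of the helper above, for a standalone check.
fn convert_int96_timestamp(v: &[u32]) -> i64 {
    let value: u128 = (v[0] as u128) << 64 | (v[1] as u128) << 32 | (v[2] as u128);
    (value / 1000000) as i64
}

fn main() {
    // 3_000_000 nanoseconds in the low word truncates to 3 milliseconds.
    assert_eq!(convert_int96_timestamp(&[0, 0, 3_000_000]), 3);
    // A 1 in the middle word contributes 1 << 32 = 4_294_967_296 ns,
    // which integer-divides to 4294 ms.
    assert_eq!(convert_int96_timestamp(&[0, 1, 0]), 4294);
}
```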