Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support for projection pushdown on IPC files #264

Merged
merged 5 commits into from
Aug 11, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion arrow-flight/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,8 @@ pub fn flight_data_to_arrow_batch(
.map(|batch| {
read_record_batch(
batch,
schema,
schema.clone(),
None,
is_little_endian,
&dictionaries_by_field,
&mut reader,
Expand Down
2 changes: 1 addition & 1 deletion examples/ipc_file_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ fn read_batches(path: &str) -> Result<Vec<RecordBatch>> {
let metadata = read_file_metadata(&mut file)?;

// Simplest way: use the reader, an iterator over batches.
let reader = FileReader::new(&mut file, metadata);
let reader = FileReader::new(&mut file, metadata, None);

reader.collect()
}
Expand Down
2 changes: 1 addition & 1 deletion integration-testing/src/bin/arrow-file-to-stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ fn main() -> Result<()> {
let filename = &args[1];
let mut f = File::open(filename)?;
let metadata = read::read_file_metadata(&mut f)?;
let mut reader = read::FileReader::new(&mut f, metadata);
let mut reader = read::FileReader::new(&mut f, metadata, None);
let schema = reader.schema();

let mut writer = StreamWriter::try_new(std::io::stdout(), &schema)?;
Expand Down
4 changes: 2 additions & 2 deletions integration-testing/src/bin/arrow-json-integration-test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ fn arrow_to_json(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()>

let mut arrow_file = File::open(arrow_name)?;
let metadata = read::read_file_metadata(&mut arrow_file)?;
let reader = read::FileReader::new(&mut arrow_file, metadata);
let reader = read::FileReader::new(&mut arrow_file, metadata, None);

let mut fields: Vec<ArrowJsonField> = vec![];
for f in reader.schema().fields() {
Expand Down Expand Up @@ -137,7 +137,7 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> {
// open Arrow file
let mut arrow_file = File::open(arrow_name)?;
let metadata = read::read_file_metadata(&mut arrow_file)?;
let reader = read::FileReader::new(&mut arrow_file, metadata);
let reader = read::FileReader::new(&mut arrow_file, metadata, None);
let arrow_schema = reader.schema().as_ref().to_owned();

// compare schemas
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,7 @@ async fn record_batch_from_message(
let arrow_batch_result = ipc::read::read_record_batch(
ipc_batch,
schema_ref,
None,
true,
&dictionaries_by_field,
&mut reader,
Expand Down
67 changes: 67 additions & 0 deletions src/io/ipc/read/array/binary.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
use std::collections::VecDeque;
use std::convert::TryInto;
use std::io::{Read, Seek};

use crate::array::{BinaryArray, Offset};
use crate::buffer::Buffer;
use crate::error::Result;
use crate::io::ipc::gen::Message::BodyCompression;
use crate::types::NativeType;

use super::super::super::gen;
use super::super::deserialize::Node;
use super::super::read_basic::*;

/// Reads a [`BinaryArray`] from the IPC stream.
///
/// A binary array consumes one field node and three buffers, in order:
/// validity bitmap, offsets, and values. All reads are positioned relative to
/// `block_offset` and honor `is_little_endian` and the optional body
/// `compression`.
pub fn read_binary<O: Offset, R: Read + Seek>(
    field_nodes: &mut VecDeque<Node>,
    buffers: &mut VecDeque<&gen::Schema::Buffer>,
    reader: &mut R,
    block_offset: u64,
    is_little_endian: bool,
    compression: Option<BodyCompression>,
) -> Result<BinaryArray<O>>
where
    Vec<u8>: TryInto<O::Bytes> + TryInto<<u8 as NativeType>::Bytes>,
{
    let field_node = field_nodes.pop_front().unwrap().0;

    let validity = read_validity(
        buffers,
        field_node,
        reader,
        block_offset,
        is_little_endian,
        compression,
    )?;

    // An array with N slots carries N + 1 offsets.
    let offsets: Buffer<O> = read_buffer(
        buffers,
        1 + field_node.length() as usize,
        reader,
        block_offset,
        is_little_endian,
        compression,
    )
    // Older versions of the IPC format sometimes do not report an offset;
    // fall back to a single zero offset (i.e. an empty array).
    // `Result::Ok` (via the crate's `Result` alias) keeps the error type
    // inferable for the `?` below.
    .or_else(|_| Result::Ok(Buffer::<O>::from(&[O::default()])))?;

    // The values buffer is as long as the last offset. The fallback above
    // guarantees `offsets` is non-empty, so `last()` cannot fail.
    let last_offset = offsets.as_slice().last().unwrap().to_usize();
    let values = read_buffer(
        buffers,
        last_offset,
        reader,
        block_offset,
        is_little_endian,
        compression,
    )?;

    Ok(BinaryArray::<O>::from_data(offsets, values, validity))
}

/// Advances `field_nodes` and `buffers` past a binary array without
/// deserializing it: one field node and three buffers (validity, offsets,
/// values).
pub fn skip_binary(field_nodes: &mut VecDeque<Node>, buffers: &mut VecDeque<&gen::Schema::Buffer>) {
    let _ = field_nodes.pop_front().unwrap();

    // validity + offsets + values
    for _ in 0..3 {
        let _ = buffers.pop_front().unwrap();
    }
}
49 changes: 49 additions & 0 deletions src/io/ipc/read/array/boolean.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
use std::collections::VecDeque;
use std::io::{Read, Seek};

use crate::array::BooleanArray;
use crate::error::Result;

use super::super::super::gen;
use super::super::deserialize::Node;
use super::super::read_basic::*;

/// Reads a [`BooleanArray`] from the IPC stream.
///
/// A boolean array consumes one field node and two bitmap buffers, in order:
/// validity and values.
///
/// NOTE(review): compression is hard-coded to `None` here, unlike the sibling
/// readers that accept a `compression` parameter — confirm compressed boolean
/// columns are handled (or rejected) upstream.
pub fn read_boolean<R: Read + Seek>(
    field_nodes: &mut VecDeque<Node>,
    buffers: &mut VecDeque<&gen::Schema::Buffer>,
    reader: &mut R,
    block_offset: u64,
    is_little_endian: bool,
) -> Result<BooleanArray> {
    let node = field_nodes.pop_front().unwrap().0;
    let length = node.length() as usize;

    let validity = read_validity(buffers, node, reader, block_offset, is_little_endian, None)?;

    let values = read_bitmap(buffers, length, reader, block_offset, is_little_endian, None)?;

    Ok(BooleanArray::from_data(values, validity))
}

/// Advances `field_nodes` and `buffers` past a boolean array without
/// deserializing it: one field node and two buffers (validity, values).
pub fn skip_boolean(
    field_nodes: &mut VecDeque<Node>,
    buffers: &mut VecDeque<&gen::Schema::Buffer>,
) {
    let _ = field_nodes.pop_front().unwrap();

    // validity + values
    for _ in 0..2 {
        let _ = buffers.pop_front().unwrap();
    }
}
42 changes: 42 additions & 0 deletions src/io/ipc/read/array/dictionary.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
use std::collections::VecDeque;
use std::convert::TryInto;
use std::io::{Read, Seek};

use crate::array::{DictionaryArray, DictionaryKey};
use crate::error::Result;

use super::super::super::gen;
use super::super::deserialize::Node;
use super::{read_primitive, skip_primitive};

/// Reads a [`DictionaryArray`] from the IPC stream.
///
/// The dictionary values were already deserialized from a preceding dictionary
/// batch and travel attached to the field node; only the keys (stored like a
/// primitive array) are read here.
///
/// NOTE(review): compression for the keys is hard-coded to `None` — verify
/// this matches the sibling readers' handling of compressed bodies.
pub fn read_dictionary<T: DictionaryKey, R: Read + Seek>(
    field_nodes: &mut VecDeque<Node>,
    buffers: &mut VecDeque<&gen::Schema::Buffer>,
    reader: &mut R,
    block_offset: u64,
    is_little_endian: bool,
) -> Result<DictionaryArray<T>>
where
    Vec<u8>: TryInto<T::Bytes>,
{
    // Clone (cheaply, it is shared) the pre-read values up front so no borrow
    // of `field_nodes` outlives the mutable use below.
    let values = field_nodes.front().unwrap().1.as_ref().unwrap().clone();

    let keys = read_primitive(
        field_nodes,
        T::DATA_TYPE,
        buffers,
        reader,
        block_offset,
        is_little_endian,
        None,
    )?;

    Ok(DictionaryArray::<T>::from_data(keys, values))
}

/// Advances past a dictionary array without deserializing it. The keys are
/// laid out like a primitive array, so skipping delegates to `skip_primitive`.
pub fn skip_dictionary(
    field_nodes: &mut VecDeque<Node>,
    buffers: &mut VecDeque<&gen::Schema::Buffer>,
) {
    skip_primitive(field_nodes, buffers);
}
55 changes: 55 additions & 0 deletions src/io/ipc/read/array/fixed_size_binary.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use std::collections::VecDeque;
use std::io::{Read, Seek};

use crate::array::FixedSizeBinaryArray;
use crate::datatypes::DataType;
use crate::error::Result;
use crate::io::ipc::gen::Message::BodyCompression;

use super::super::super::gen;
use super::super::deserialize::Node;
use super::super::read_basic::*;

/// Reads a [`FixedSizeBinaryArray`] from the IPC stream.
///
/// A fixed-size binary array consumes one field node and two buffers, in
/// order: validity bitmap and values.
pub fn read_fixed_size_binary<R: Read + Seek>(
    field_nodes: &mut VecDeque<Node>,
    data_type: DataType,
    buffers: &mut VecDeque<&gen::Schema::Buffer>,
    reader: &mut R,
    block_offset: u64,
    is_little_endian: bool,
    compression: Option<BodyCompression>,
) -> Result<FixedSizeBinaryArray> {
    let node = field_nodes.pop_front().unwrap().0;

    let validity = read_validity(
        buffers,
        node,
        reader,
        block_offset,
        is_little_endian,
        compression,
    )?;

    // Every slot holds exactly `size` bytes, so the values buffer spans
    // `length * size` bytes.
    let size = *FixedSizeBinaryArray::get_size(&data_type) as usize;
    let length = node.length() as usize * size;
    let values = read_buffer(
        buffers,
        length,
        reader,
        block_offset,
        is_little_endian,
        compression,
    )?;

    Ok(FixedSizeBinaryArray::from_data(data_type, values, validity))
}

/// Advances `field_nodes` and `buffers` past a fixed-size binary array without
/// deserializing it: one field node and two buffers (validity, values).
pub fn skip_fixed_size_binary(
    field_nodes: &mut VecDeque<Node>,
    buffers: &mut VecDeque<&gen::Schema::Buffer>,
) {
    let _ = field_nodes.pop_front().unwrap();

    // validity + values
    for _ in 0..2 {
        let _ = buffers.pop_front().unwrap();
    }
}
59 changes: 59 additions & 0 deletions src/io/ipc/read/array/fixed_size_list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
use std::collections::VecDeque;
use std::io::{Read, Seek};

use crate::array::FixedSizeListArray;
use crate::datatypes::DataType;
use crate::error::Result;
use crate::io::ipc::gen::Message::BodyCompression;

use super::super::super::gen;
use super::super::deserialize::{read, skip, Node};
use super::super::read_basic::*;

/// Reads a [`FixedSizeListArray`] from the IPC stream.
///
/// A fixed-size list consumes one field node and one validity buffer of its
/// own, then recursively reads its single child array.
pub fn read_fixed_size_list<R: Read + Seek>(
    field_nodes: &mut VecDeque<Node>,
    data_type: DataType,
    buffers: &mut VecDeque<&gen::Schema::Buffer>,
    reader: &mut R,
    block_offset: u64,
    is_little_endian: bool,
    compression: Option<BodyCompression>,
) -> Result<FixedSizeListArray> {
    let node = field_nodes.pop_front().unwrap().0;

    let validity = read_validity(
        buffers,
        node,
        reader,
        block_offset,
        is_little_endian,
        compression,
    )?;

    // The child's type is carried inside the list's data type.
    let (child_type, _) = FixedSizeListArray::get_child_and_size(&data_type);
    let values = read(
        field_nodes,
        child_type.clone(),
        buffers,
        reader,
        block_offset,
        is_little_endian,
        compression,
    )?;

    Ok(FixedSizeListArray::from_data(data_type, values, validity))
}

/// Advances past a fixed-size list array without deserializing it: one field
/// node and one validity buffer for the list itself, then the child is
/// skipped recursively.
pub fn skip_fixed_size_list(
    field_nodes: &mut VecDeque<Node>,
    data_type: &DataType,
    buffers: &mut VecDeque<&gen::Schema::Buffer>,
) {
    let _ = field_nodes.pop_front().unwrap();

    // The list owns only its validity buffer.
    let _ = buffers.pop_front().unwrap();

    // Recurse into the single child.
    let (child_type, _) = FixedSizeListArray::get_child_and_size(data_type);
    skip(field_nodes, child_type, buffers)
}
Loading