Added projection to IPC.
jorgecarleitao committed Aug 9, 2021
1 parent 444206c commit 3e0471a
Showing 9 changed files with 128 additions and 31 deletions.
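For orientation: this commit threads an optional projection — a set of column indices — through the IPC read path, so deselected columns are skipped rather than deserialized. A minimal usage sketch of the new FileReader signature, assuming the crate is consumed as arrow2 and a hypothetical input file path:

use std::collections::HashSet;
use std::fs::File;

use arrow2::error::Result;
use arrow2::io::ipc::read::{read_file_metadata, FileReader};

fn main() -> Result<()> {
    let mut file = File::open("batches.arrow")?; // hypothetical path
    let metadata = read_file_metadata(&mut file)?;

    // deserialize only columns 0 and 2; the remaining columns are skipped on read
    let projection: HashSet<usize> = vec![0, 2].into_iter().collect();
    let reader = FileReader::new(&mut file, metadata, Some(projection));

    for batch in reader {
        // schema() is projected too, so each batch has exactly two columns
        assert_eq!(batch?.num_columns(), 2);
    }
    Ok(())
}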
3 changes: 2 additions & 1 deletion arrow-flight/src/utils.rs
@@ -164,7 +164,8 @@ pub fn flight_data_to_arrow_batch(
         .map(|batch| {
             read_record_batch(
                 batch,
-                schema,
+                schema.clone(),
+                None,
                 is_little_endian,
                 &dictionaries_by_field,
                 &mut reader,
2 changes: 1 addition & 1 deletion examples/ipc_file_read.rs
@@ -12,7 +12,7 @@ fn read_batches(path: &str) -> Result<Vec<RecordBatch>> {
     let metadata = read_file_metadata(&mut file)?;
 
     // Simplest way: use the reader, an iterator over batches.
-    let reader = FileReader::new(&mut file, metadata);
+    let reader = FileReader::new(&mut file, metadata, None);
 
     reader.collect()
 }
2 changes: 1 addition & 1 deletion integration-testing/src/bin/arrow-file-to-stream.rs
@@ -27,7 +27,7 @@ fn main() -> Result<()> {
     let filename = &args[1];
     let mut f = File::open(filename)?;
     let metadata = read::read_file_metadata(&mut f)?;
-    let mut reader = read::FileReader::new(&mut f, metadata);
+    let mut reader = read::FileReader::new(&mut f, metadata, None);
     let schema = reader.schema();
 
     let mut writer = StreamWriter::try_new(std::io::stdout(), &schema)?;
4 changes: 2 additions & 2 deletions integration-testing/src/bin/arrow-json-integration-test.rs
@@ -99,7 +99,7 @@ fn arrow_to_json(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()>
 
     let mut arrow_file = File::open(arrow_name)?;
     let metadata = read::read_file_metadata(&mut arrow_file)?;
-    let reader = read::FileReader::new(&mut arrow_file, metadata);
+    let reader = read::FileReader::new(&mut arrow_file, metadata, None);
 
     let mut fields: Vec<ArrowJsonField> = vec![];
     for f in reader.schema().fields() {
@@ -137,7 +137,7 @@ fn validate(arrow_name: &str, json_name: &str, verbose: bool) -> Result<()> {
     // open Arrow file
     let mut arrow_file = File::open(arrow_name)?;
    let metadata = read::read_file_metadata(&mut arrow_file)?;
-    let reader = read::FileReader::new(&mut arrow_file, metadata);
+    let reader = read::FileReader::new(&mut arrow_file, metadata, None);
     let arrow_schema = reader.schema().as_ref().to_owned();
 
     // compare schemas
@@ -292,6 +292,7 @@ async fn record_batch_from_message(
     let arrow_batch_result = ipc::read::read_record_batch(
         ipc_batch,
         schema_ref,
+        None,
         true,
         &dictionaries_by_field,
         &mut reader,
67 changes: 49 additions & 18 deletions src/io/ipc/read/common.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::collections::{HashMap, VecDeque};
+use std::collections::{HashMap, HashSet, VecDeque};
 use std::io::{Read, Seek};
 use std::sync::Arc;
 
@@ -25,14 +25,15 @@ use crate::error::{ArrowError, Result};
 use crate::record_batch::RecordBatch;
 
 use super::super::gen;
-use super::deserialize::read;
+use super::deserialize::{read, skip};
 
 type ArrayRef = Arc<dyn Array>;
 
 /// Creates a record batch from binary data using the `ipc::RecordBatch` indexes and the `Schema`
 pub fn read_record_batch<R: Read + Seek>(
     batch: gen::Message::RecordBatch,
     schema: Arc<Schema>,
+    projection: Option<(&HashSet<usize>, Arc<Schema>)>,
     is_little_endian: bool,
     dictionaries: &[Option<ArrayRef>],
     reader: &mut R,
@@ -54,23 +55,52 @@ pub fn read_record_batch<R: Read + Seek>(
         .zip(dictionaries)
         .collect::<VecDeque<_>>();
 
-    let arrays = schema
-        .fields()
-        .iter()
-        .map(|field| {
-            read(
-                &mut field_nodes,
-                field.data_type().clone(),
-                &mut buffers,
-                reader,
-                block_offset,
-                is_little_endian,
-                batch.compression(),
-            )
-        })
-        .collect::<Result<Vec<_>>>()?;
+    let (schema, columns) = if let Some(projection) = projection {
+        let projected_schema = projection.1.clone();
+        let projection = (0..schema.fields().len()).map(|x| projection.0.contains(&x));
 
-    RecordBatch::try_new(schema.clone(), arrays)
+        let arrays = schema
+            .fields()
+            .iter()
+            .zip(projection)
+            .map(|(field, is_taken)| {
+                if is_taken {
+                    Some(read(
+                        &mut field_nodes,
+                        field.data_type().clone(),
+                        &mut buffers,
+                        reader,
+                        block_offset,
+                        is_little_endian,
+                        batch.compression(),
+                    ))
+                } else {
+                    skip(&mut field_nodes, field.data_type(), &mut buffers);
+                    None
+                }
+            })
+            .flatten()
+            .collect::<Result<Vec<_>>>()?;
+        (projected_schema, arrays)
+    } else {
+        let arrays = schema
+            .fields()
+            .iter()
+            .map(|field| {
+                read(
+                    &mut field_nodes,
+                    field.data_type().clone(),
+                    &mut buffers,
+                    reader,
+                    block_offset,
+                    is_little_endian,
+                    batch.compression(),
+                )
+            })
+            .collect::<Result<Vec<_>>>()?;
+        (schema.clone(), arrays)
+    };
+    RecordBatch::try_new(schema, columns)
 }
 
 /// Read the dictionary from the buffer and provided metadata,
@@ -109,6 +139,7 @@ pub fn read_dictionary<R: Read + Seek>(
     let record_batch = read_record_batch(
         batch.data().unwrap(),
         schema,
+        None,
         is_little_endian,
         dictionaries_by_field,
         reader,
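Two details of the projected branch above are easy to miss. First, field nodes and buffers are laid out sequentially per column in an IPC message, so a deselected column cannot simply be ignored: skip must still consume that column's nodes and buffers to keep the cursors aligned for the columns that follow. Second, the map/flatten/collect chain drops the skipped columns while still surfacing the first read error. A standalone sketch of that idiom (toy values, not library code):

fn main() {
    // each kept column maps to Some(Ok(...)); each skipped column to None
    let per_column: Vec<Option<Result<i32, String>>> =
        vec![Some(Ok(1)), None, Some(Ok(3)), None];

    // flatten removes the Nones; collecting into Result short-circuits on the first Err
    let kept: Result<Vec<i32>, String> = per_column.into_iter().flatten().collect();
    assert_eq!(kept, Ok(vec![1, 3]));
}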
75 changes: 69 additions & 6 deletions src/io/ipc/read/reader.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::collections::HashSet;
 use std::io::{Read, Seek, SeekFrom};
 use std::sync::Arc;
 
@@ -66,6 +67,7 @@ pub struct FileReader<'a, R: Read + Seek> {
     reader: &'a mut R,
     metadata: FileMetadata,
     current_block: usize,
+    projection: Option<(HashSet<usize>, Arc<Schema>)>,
 }
 
 /// Read the IPC file's metadata
@@ -173,6 +175,7 @@ pub fn read_file_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetadata
 pub fn read_batch<R: Read + Seek>(
     reader: &mut R,
     metadata: &FileMetadata,
+    projection: Option<(&HashSet<usize>, Arc<Schema>)>,
     block: usize,
 ) -> Result<Option<RecordBatch>> {
     let block = metadata.blocks[block];
@@ -212,6 +215,7 @@
     read_record_batch(
         batch,
         metadata.schema.clone(),
+        projection,
         metadata.is_little_endian,
         &metadata.dictionaries_by_field,
         reader,
@@ -228,18 +232,41 @@
 }
 
 impl<'a, R: Read + Seek> FileReader<'a, R> {
-    /// Creates a new reader
-    pub fn new(reader: &'a mut R, metadata: FileMetadata) -> Self {
+    /// Creates a new [`FileReader`]. Use `projection` to only take certain columns.
+    pub fn new(
+        reader: &'a mut R,
+        metadata: FileMetadata,
+        projection: Option<HashSet<usize>>,
+    ) -> Self {
+        let projection = projection.map(|projection| {
+            let fields = metadata
+                .schema()
+                .fields()
+                .iter()
+                .enumerate()
+                .filter(|x| projection.contains(&x.0))
+                .map(|x| x.1.clone())
+                .collect();
+            let schema = Arc::new(Schema {
+                fields,
+                metadata: metadata.schema().metadata().clone(),
+            });
+            (projection, schema)
+        });
         Self {
             reader,
             metadata,
+            projection,
             current_block: 0,
         }
     }
 
     /// Return the schema of the file
     pub fn schema(&self) -> &Arc<Schema> {
-        &self.metadata.schema
+        self.projection
+            .as_ref()
+            .map(|x| &x.1)
+            .unwrap_or(&self.metadata.schema)
     }
 }
 
@@ -251,7 +278,13 @@ impl<'a, R: Read + Seek> Iterator for FileReader<'a, R> {
         if self.current_block < self.metadata.total_blocks {
             let block = self.current_block;
             self.current_block += 1;
-            read_batch(&mut self.reader, &self.metadata, block).transpose()
+            read_batch(
+                &mut self.reader,
+                &self.metadata,
+                self.projection.as_ref().map(|x| (&x.0, x.1.clone())),
+                block,
+            )
+            .transpose()
         } else {
             None
         }
@@ -260,7 +293,7 @@
 
 impl<'a, R: Read + Seek> RecordBatchReader for FileReader<'a, R> {
     fn schema(&self) -> &Schema {
-        &self.metadata.schema
+        self.schema().as_ref()
     }
 }
 
@@ -281,7 +314,7 @@ mod tests {
         ))?;
 
         let metadata = read_file_metadata(&mut file)?;
-        let reader = FileReader::new(&mut file, metadata);
+        let reader = FileReader::new(&mut file, metadata, None);
 
         // read expected JSON output
         let (schema, batches) = read_gzip_json(version, file_name);
@@ -388,4 +421,34 @@
     fn read_generated_200_compression_zstd() -> Result<()> {
         test_file("2.0.0-compression", "generated_zstd")
     }
+
+    fn test_projection(version: &str, file_name: &str, column: usize) -> Result<()> {
+        let testdata = crate::util::test_util::arrow_test_data();
+        let mut file = File::open(format!(
+            "{}/arrow-ipc-stream/integration/{}/{}.arrow_file",
+            testdata, version, file_name
+        ))?;
+
+        let metadata = read_file_metadata(&mut file)?;
+        let mut reader = FileReader::new(
+            &mut file,
+            metadata,
+            Some(vec![column].into_iter().collect()),
+        );
+
+        assert_eq!(reader.schema().fields().len(), 1);
+
+        reader.try_for_each(|rhs| {
+            assert_eq!(rhs?.num_columns(), 1);
+            Result::Ok(())
+        })?;
+        Ok(())
+    }
+
+    #[test]
+    fn read_projected() -> Result<()> {
+        test_projection("1.0.0-littleendian", "generated_primitive", 1)?;
+        test_projection("1.0.0-littleendian", "generated_dictionary", 2)?;
+        test_projection("1.0.0-littleendian", "generated_nested", 1)
+    }
 }
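For callers that seek to individual blocks, read_batch exposes the same projection at a lower level; the caller supplies both the index set and the already-projected schema. A hedged sketch — it assumes read_batch and Schema are reachable at the paths below and that Schema can be built as a struct literal, mirroring what FileReader::new does internally:

use std::collections::HashSet;
use std::fs::File;
use std::sync::Arc;

use arrow2::datatypes::Schema;
use arrow2::error::Result;
use arrow2::io::ipc::read::{read_batch, read_file_metadata};

fn read_block_projected(path: &str, columns: &HashSet<usize>, block: usize) -> Result<()> {
    let mut file = File::open(path)?;
    let metadata = read_file_metadata(&mut file)?;

    // filter the file's fields by index, as FileReader::new does
    let fields = metadata
        .schema()
        .fields()
        .iter()
        .enumerate()
        .filter(|(i, _)| columns.contains(i))
        .map(|(_, f)| f.clone())
        .collect();
    let schema = Arc::new(Schema {
        fields,
        metadata: metadata.schema().metadata().clone(),
    });

    // read a single block with the projection applied
    if let Some(batch) = read_batch(&mut file, &metadata, Some((columns, schema)), block)? {
        assert_eq!(batch.num_columns(), columns.len());
    }
    Ok(())
}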
1 change: 1 addition & 0 deletions src/io/ipc/read/stream.rs
@@ -131,6 +131,7 @@ pub fn read_next<R: Read>(
     read_record_batch(
         batch,
         metadata.schema.clone(),
+        None,
         metadata.is_little_endian,
         dictionaries_by_field,
         &mut reader,
4 changes: 2 additions & 2 deletions src/io/ipc/write/writer.rs
@@ -190,7 +190,7 @@ mod tests {
         let metadata = read_file_metadata(&mut reader)?;
         let schema = metadata.schema().clone();
 
-        let reader = FileReader::new(&mut reader, metadata);
+        let reader = FileReader::new(&mut reader, metadata, None);
 
         // read expected JSON output
         let (expected_schema, expected_batches) = (batch.schema().clone(), vec![batch]);
@@ -221,7 +221,7 @@
         let metadata = read_file_metadata(&mut reader)?;
         let schema = metadata.schema().clone();
 
-        let reader = FileReader::new(&mut reader, metadata);
+        let reader = FileReader::new(&mut reader, metadata, None);
 
         // read expected JSON output
         let (expected_schema, expected_batches) = read_gzip_json(version, file_name);
