Add IPC StreamDecoder #5531

Merged · 3 commits · Mar 23, 2024
Changes from 1 commit
22 changes: 16 additions & 6 deletions arrow-buffer/src/buffer/immutable.rs
```diff
@@ -171,23 +171,33 @@ impl Buffer {
 
     /// Returns a new [Buffer] that is a slice of this buffer starting at `offset`.
     /// Doing so allows the same memory region to be shared between buffers.
     ///
+    /// # Panics
+    ///
+    /// Panics iff `offset` is larger than `len`.
     pub fn slice(&self, offset: usize) -> Self {
+        let mut s = self.clone();
+        s.advance(offset);
+        s
+    }
 
+    /// Increases the offset of this buffer by `offset`
+    ///
+    /// # Panics
+    ///
+    /// Panics iff `offset` is larger than `len`.
+    #[inline]
+    pub fn advance(&mut self, offset: usize) {
         assert!(
             offset <= self.length,
             "the offset of the new Buffer cannot exceed the existing length"
         );
+        self.length -= offset;
         // Safety:
         // This cannot overflow as
         // `self.offset + self.length < self.data.len()`
         // `offset < self.length`
-        let ptr = unsafe { self.ptr.add(offset) };
-        Self {
-            data: self.data.clone(),
-            length: self.length - offset,
-            ptr,
-        }
+        self.ptr = unsafe { self.ptr.add(offset) };
     }
 
     /// Returns a new [Buffer] that is a slice of this buffer starting at `offset`,
```
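For readers unfamiliar with the `Buffer` API, a small usage sketch (not part of the diff) of how the new in-place `advance` relates to the existing `slice`; the byte values are illustrative:

```rust
use arrow_buffer::Buffer;

fn main() {
    // `slice` clones the buffer handle (the underlying allocation is shared)
    // and advances the clone, leaving the original buffer untouched.
    let original = Buffer::from_vec(vec![1u8, 2, 3, 4]);
    let tail = original.slice(2);
    assert_eq!(tail.as_slice(), &[3, 4]);
    assert_eq!(original.as_slice(), &[1, 2, 3, 4]);

    // `advance` moves the offset of an existing buffer in place, avoiding the
    // extra clone when the original view is no longer needed.
    let mut buffer = Buffer::from_vec(vec![1u8, 2, 3, 4]);
    buffer.advance(1);
    assert_eq!(buffer.as_slice(), &[2, 3, 4]);

    // Advancing past the end panics, per the new `# Panics` documentation.
    // buffer.advance(10); // would panic
}
```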
35 changes: 32 additions & 3 deletions arrow-integration-testing/tests/ipc_reader.rs
```diff
@@ -19,10 +19,12 @@
 //! in `testing/arrow-ipc-stream/integration/...`
 
 use arrow::error::ArrowError;
-use arrow::ipc::reader::{FileReader, StreamReader};
+use arrow::ipc::reader::{FileReader, StreamDecoder, StreamReader};
 use arrow::util::test_util::arrow_test_data;
+use arrow_buffer::Buffer;
 use arrow_integration_testing::read_gzip_json;
 use std::fs::File;
+use std::io::Read;
 
 #[test]
 fn read_0_1_4() {
@@ -182,18 +184,45 @@ fn verify_arrow_stream(testdata: &str, version: &str, path: &str) {
     let filename = format!("{testdata}/arrow-ipc-stream/integration/{version}/{path}.stream");
     println!("Verifying {filename}");
 
+    // read expected JSON output
+    let arrow_json = read_gzip_json(version, path);
+
     // Compare contents to the expected output format in JSON
     {
         println!(" verifying content");
         let file = File::open(&filename).unwrap();
         let mut reader = StreamReader::try_new(file, None).unwrap();
 
-        // read expected JSON output
-        let arrow_json = read_gzip_json(version, path);
         assert!(arrow_json.equals_reader(&mut reader).unwrap());
+        // the next batch must be empty
         assert!(reader.next().is_none());
+        // the stream must indicate that it's finished
        assert!(reader.is_finished());
     }
 
+    // Test stream decoder
+    let expected = arrow_json.get_record_batches().unwrap();
```
Contributor: ✅ this is a very nice test
```diff
+    for chunk_sizes in [1, 2, 8, 123] {
+        let mut decoder = StreamDecoder::new();
+        let stream = chunked_file(&filename, chunk_sizes);
+        let mut actual = Vec::with_capacity(expected.len());
+        for mut x in stream {
+            while !x.is_empty() {
+                if let Some(x) = decoder.decode(&mut x).unwrap() {
+                    actual.push(x);
+                }
+            }
+        }
+        decoder.finish().unwrap();
+        assert_eq!(expected, actual);
```
Contributor: Can we also add tests for error conditions?

At the very least I think we should have a test for a truncated stream, which I think would be the most likely error in practice.

It would be nice to have a test for things like missing dictionaries, but I think the value of those tests is lower.
```diff
+    }
 }
 
+fn chunked_file(filename: &str, chunk_size: u64) -> impl Iterator<Item = Buffer> {
+    let mut file = File::open(filename).unwrap();
+    std::iter::from_fn(move || {
+        let mut buf = vec![];
+        let read = (&mut file).take(chunk_size).read_to_end(&mut buf).unwrap();
+        (read != 0).then(|| Buffer::from_vec(buf))
+    })
+}
```
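Following up on the reviewer's request, here is a hypothetical sketch of what a truncated-stream test could look like. It is not part of this PR; it assumes that feeding a truncated stream leaves the decoder with a partial message, so that either `decode` or the final `finish` call reports the error:

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int32Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::ipc::reader::StreamDecoder;
use arrow::ipc::writer::StreamWriter;
use arrow::record_batch::RecordBatch;
use arrow_buffer::Buffer;

#[test]
fn read_truncated_stream() {
    // Build a small, valid IPC stream entirely in memory.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    )
    .unwrap();

    let mut bytes = Vec::new();
    {
        let mut writer = StreamWriter::try_new(&mut bytes, &schema).unwrap();
        writer.write(&batch).unwrap();
        writer.finish().unwrap();
    }

    // Drop the tail of the stream so the last message is incomplete.
    bytes.truncate(bytes.len() / 2);

    // Feed what is left to the decoder; the truncation should surface either
    // while decoding or when the decoder is finished with bytes still pending.
    let mut decoder = StreamDecoder::new();
    let mut buffer = Buffer::from_vec(bytes);
    let mut result = Ok(());
    while !buffer.is_empty() {
        if let Err(e) = decoder.decode(&mut buffer) {
            result = Err(e);
            break;
        }
    }
    let result = result.and_then(|_| decoder.finish());
    assert!(result.is_err(), "expected an error for a truncated stream");
}
```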
48 changes: 46 additions & 2 deletions arrow-ipc/src/convert.rs
```diff
@@ -17,12 +17,17 @@
 
 //! Utilities for converting between IPC types and native Arrow types
 
+use arrow_buffer::Buffer;
 use arrow_schema::*;
-use flatbuffers::{FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, WIPOffset};
+use flatbuffers::{
+    FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, Verifiable, Verifier,
+    VerifierOptions, WIPOffset,
+};
 use std::collections::HashMap;
+use std::fmt::{Debug, Formatter};
 use std::sync::Arc;
 
-use crate::{size_prefixed_root_as_message, KeyValue, CONTINUATION_MARKER};
+use crate::{size_prefixed_root_as_message, KeyValue, Message, CONTINUATION_MARKER};
 use DataType::*;
 
 /// Serialize a schema in IPC format
@@ -806,6 +811,45 @@ pub(crate) fn get_fb_dictionary<'a>(
     builder.finish()
 }
 
+/// An owned container for a validated [`Message`]
+///
+/// Safely decoding a flatbuffer requires validating the various embedded offsets,
+/// see [`Verifier`]. This is a potentially expensive operation, and it is therefore desirable
+/// to only do this once. [`crate::root_as_message`] performs this validation on construction,
+/// however, it returns a [`Message`] borrowing the provided byte slice. This prevents
+/// storing this [`Message`] in the same data structure that owns the buffer, as this
+/// would require self-referential borrows.
+///
+/// [`MessageBuffer`] solves this problem by providing a safe API for a [`Message`]
+/// without a lifetime bound.
+#[derive(Clone)]
+pub struct MessageBuffer(Buffer);
+
+impl Debug for MessageBuffer {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        self.as_ref().fmt(f)
+    }
+}
+
+impl MessageBuffer {
+    /// Try to create a [`MessageBuffer`] from the provided [`Buffer`]
+    pub fn try_new(buf: Buffer) -> Result<Self, ArrowError> {
+        let opts = VerifierOptions::default();
+        let mut v = Verifier::new(&opts, &buf);
+        <ForwardsUOffset<Message>>::run_verifier(&mut v, 0).map_err(|err| {
+            ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
+        })?;
+        Ok(Self(buf))
+    }
+
+    /// Return the [`Message`]
+    #[inline]
+    pub fn as_ref(&self) -> Message<'_> {
+        // SAFETY: Run verifier on construction
+        unsafe { crate::root_as_message_unchecked(&self.0) }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
```
On `as_ref`:

Contributor: I think it would be more standard to call this function as_message and then provide an impl AsRef<Message> for MessageBuffer and that way the compiler can automatically do the deref.

Contributor (author): Unfortunately this isn't possible because of the lifetime on Message.
On the `// SAFETY: Run verifier on construction` comment in `as_ref`:

Contributor: I was worried that this is not a safe API, since this is a public struct and it could be created directly:

    let buf = MessageBuffer(my_unsafe_buffer);
    // cast to message without checks being called:
    let my_unsafe_message = buf.as_ref();

However, I convinced myself it was actually safe as long as MessageBuffer is used from some other module:

https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=efb81e5fe993e75995b09e8773d31643
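To make the motivation concrete, here is a hypothetical sketch (not from this PR) of the pattern the doc comment describes: validate once, keep the owned bytes, and re-borrow the `Message` on demand. `PendingMessage` and `body_length` are made-up names:

```rust
use arrow_buffer::Buffer;
use arrow_ipc::convert::MessageBuffer;
use arrow_schema::ArrowError;

/// A made-up container that owns an encoded IPC message alongside other state.
struct PendingMessage {
    message: MessageBuffer,
}

impl PendingMessage {
    fn try_new(encoded: Buffer) -> Result<Self, ArrowError> {
        // Flatbuffer verification runs exactly once, here.
        let message = MessageBuffer::try_new(encoded)?;
        Ok(Self { message })
    }

    fn body_length(&self) -> i64 {
        // Re-borrowing the already-validated Message is cheap; there is no
        // second verification pass and no self-referential borrow to manage.
        self.message.as_ref().bodyLength()
    }
}
```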

4 changes: 4 additions & 0 deletions arrow-ipc/src/reader.rs
```diff
@@ -20,6 +20,10 @@
 //! The `FileReader` and `StreamReader` have similar interfaces,
 //! however the `FileReader` expects a reader that supports `Seek`ing
 
+mod stream;
+
+pub use stream::*;
+
 use flatbuffers::{VectorIter, VerifierOptions};
 use std::collections::HashMap;
 use std::fmt;
```