Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove automatic buffering in ipc::reader::FileReader for consistent buffering #6132

Merged
merged 2 commits into from
Jul 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 48 additions & 17 deletions arrow-ipc/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1010,8 +1010,8 @@ impl FileReaderBuilder {
}

/// Arrow File reader
pub struct FileReader<R: Read + Seek> {
/// Buffered file reader that supports reading and seeking
pub struct FileReader<R> {
/// File reader that supports reading and seeking
reader: R,

/// The decoder
Expand All @@ -1032,7 +1032,7 @@ pub struct FileReader<R: Read + Seek> {
custom_metadata: HashMap<String, String>,
}

impl<R: Read + Seek> fmt::Debug for FileReader<R> {
impl<R> fmt::Debug for FileReader<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
f.debug_struct("FileReader<R>")
.field("decoder", &self.decoder)
Expand All @@ -1043,10 +1043,26 @@ impl<R: Read + Seek> fmt::Debug for FileReader<R> {
}
}

impl<R: Read + Seek> FileReader<BufReader<R>> {
/// Try to create a new file reader with the reader wrapped in a BufReader.
///
/// See [`FileReader::try_new`] for an unbuffered version.
V0ldek marked this conversation as resolved.
Show resolved Hide resolved
pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
Self::try_new(BufReader::new(reader), projection)
}
}

impl<R: Read + Seek> FileReader<R> {
/// Try to create a new file reader
/// Try to create a new file reader.
///
/// Returns errors if the file does not meet the Arrow Format footer requirements
/// There is no internal buffering. If buffered reads are needed you likely want to use
/// [`FileReader::try_new_buffered`] instead.
///
/// # Errors
///
/// An [`Err`](Result::Err) may be returned if:
/// - the file does not meet the Arrow Format footer requirements, or
/// - file endianness does not match the target endianness.
pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
let builder = FileReaderBuilder {
projection,
Expand Down Expand Up @@ -1129,7 +1145,7 @@ impl<R: Read + Seek> RecordBatchReader for FileReader<R> {
}

/// Arrow Stream reader
pub struct StreamReader<R: Read> {
pub struct StreamReader<R> {
/// Stream reader
reader: R,

Expand All @@ -1150,10 +1166,10 @@ pub struct StreamReader<R: Read> {
projection: Option<(Vec<usize>, Schema)>,
}

impl<R: Read> fmt::Debug for StreamReader<R> {
impl<R> fmt::Debug for StreamReader<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
f.debug_struct("StreamReader<R>")
.field("reader", &"BufReader<..>")
.field("reader", &"R")
.field("schema", &self.schema)
.field("dictionaries_by_id", &self.dictionaries_by_id)
.field("finished", &self.finished)
Expand All @@ -1163,21 +1179,27 @@ impl<R: Read> fmt::Debug for StreamReader<R> {
}

impl<R: Read> StreamReader<BufReader<R>> {
/// Try to create a new stream reader with the reader wrapped in a BufReader
/// Try to create a new stream reader with the reader wrapped in a BufReader.
///
/// The first message in the stream is the schema, the reader will fail if it does not
/// encounter a schema.
/// To check if the reader is done, use `is_finished(self)`
pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
Self::try_new_unbuffered(BufReader::new(reader), projection)
/// See [`StreamReader::try_new`] for an unbuffered version.
pub fn try_new_buffered(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
Self::try_new(BufReader::new(reader), projection)
}
}

impl<R: Read> StreamReader<R> {
/// Try to create a new stream reader but do not wrap the reader in a BufReader.
/// Try to create a new stream reader.
///
/// Unless you need the StreamReader to be unbuffered you likely want to use `StreamReader::try_new` instead.
pub fn try_new_unbuffered(
V0ldek marked this conversation as resolved.
Show resolved Hide resolved
/// To check if the reader is done, use [`is_finished(self)`](StreamReader::is_finished).
///
/// There is no internal buffering. If buffered reads are needed you likely want to use
/// [`StreamReader::try_new_buffered`] instead.
///
/// # Errors
///
/// An [`Err`](Result::Err) may be returned if the reader does not encounter a schema
/// as the first message in the stream.
pub fn try_new(
mut reader: R,
projection: Option<Vec<usize>>,
) -> Result<StreamReader<R>, ArrowError> {
Expand Down Expand Up @@ -1224,6 +1246,15 @@ impl<R: Read> StreamReader<R> {
})
}

/// Deprecated, use [`StreamReader::try_new`] instead.
V0ldek marked this conversation as resolved.
Show resolved Hide resolved
#[deprecated(since = "53.0.0", note = "use `try_new` instead")]
pub fn try_new_unbuffered(
reader: R,
projection: Option<Vec<usize>>,
) -> Result<Self, ArrowError> {
// Pure forwarding shim kept for backward compatibility: the reader is handed
// to `try_new` as-is, with no wrapping performed here.
Self::try_new(reader, projection)
}

/// Return the schema of the stream
pub fn schema(&self) -> SchemaRef {
self.schema.clone()
Expand Down
92 changes: 67 additions & 25 deletions arrow-ipc/src/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -821,9 +821,9 @@ impl DictionaryTracker {
}

/// Writer for an IPC file
pub struct FileWriter<W: Write> {
pub struct FileWriter<W> {
/// The object to write to
writer: BufWriter<W>,
writer: W,
/// IPC write options
write_options: IpcWriteOptions,
/// A reference to the schema, used in validating record batches
Expand All @@ -844,21 +844,41 @@ pub struct FileWriter<W: Write> {
data_gen: IpcDataGenerator,
}

impl<W: Write> FileWriter<BufWriter<W>> {
    /// Creates a file writer whose output goes through a [`BufWriter`] placed around `writer`.
    ///
    /// Use [`FileWriter::try_new`] when no buffering layer should be added.
    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        // Wrap first, then defer to the plain constructor for the wrapped type.
        let buffered = BufWriter::new(writer);
        Self::try_new(buffered, schema)
    }
}

impl<W: Write> FileWriter<W> {
/// Try to create a new writer, with the schema written as part of the header
V0ldek marked this conversation as resolved.
Show resolved Hide resolved
///
/// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details.
///
/// # Errors
///
/// An [`Err`](Result::Err) may be returned if writing the header to the writer fails.
pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
    // Delegate to the options-taking constructor using the default IPC write options.
    Self::try_new_with_options(writer, schema, IpcWriteOptions::default())
}

/// Try to create a new writer with IpcWriteOptions
V0ldek marked this conversation as resolved.
Show resolved Hide resolved
///
/// Note the created writer is not buffered. See [`FileWriter::try_new_buffered`] for details.
///
/// # Errors
///
/// An [`Err`](Result::Err) may be returned if writing the header to the writer fails.
pub fn try_new_with_options(
writer: W,
mut writer: W,
schema: &Schema,
write_options: IpcWriteOptions,
) -> Result<Self, ArrowError> {
let data_gen = IpcDataGenerator::default();
let mut writer = BufWriter::new(writer);
// write magic to header aligned on alignment boundary
let pad_len = pad_to_alignment(write_options.alignment, super::ARROW_MAGIC.len());
let header_size = super::ARROW_MAGIC.len() + pad_len;
Expand Down Expand Up @@ -972,14 +992,14 @@ impl<W: Write> FileWriter<W> {

/// Gets a reference to the underlying writer.
pub fn get_ref(&self) -> &W {
self.writer.get_ref()
&self.writer
}

/// Gets a mutable reference to the underlying writer.
///
/// It is inadvisable to directly write to the underlying writer.
pub fn get_mut(&mut self) -> &mut W {
self.writer.get_mut()
&mut self.writer
}

/// Flush the underlying writer.
Expand All @@ -990,16 +1010,20 @@ impl<W: Write> FileWriter<W> {
Ok(())
}

/// Unwraps the BufWriter housed in FileWriter.writer, returning the underlying
/// writer
/// Unwraps the underlying writer.
///
/// The writer is flushed and the FileWriter is finished before returning.
///
/// The buffer is flushed and the FileWriter is finished before returning the
/// writer.
/// # Errors
///
/// An [`Err`](Result::Err) may be returned if an error occurs while finishing the FileWriter
V0ldek marked this conversation as resolved.
Show resolved Hide resolved
/// or while flushing the writer.
pub fn into_inner(mut self) -> Result<W, ArrowError> {
if !self.finished {
// `finish` flushes the writer.
self.finish()?;
}
self.writer.into_inner().map_err(ArrowError::from)
Ok(self.writer)
}
}

Expand All @@ -1014,9 +1038,9 @@ impl<W: Write> RecordBatchWriter for FileWriter<W> {
}

/// Writer for an IPC stream
pub struct StreamWriter<W: Write> {
pub struct StreamWriter<W> {
/// The object to write to
writer: BufWriter<W>,
writer: W,
/// IPC write options
write_options: IpcWriteOptions,
/// Whether the writer footer has been written, and the writer is finished
Expand All @@ -1027,20 +1051,39 @@ pub struct StreamWriter<W: Write> {
data_gen: IpcDataGenerator,
}

impl<W: Write> StreamWriter<BufWriter<W>> {
    /// Creates a stream writer whose output goes through a [`BufWriter`] placed around `writer`.
    ///
    /// Use [`StreamWriter::try_new`] when no buffering layer should be added.
    pub fn try_new_buffered(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
        // Wrap first, then defer to the plain constructor for the wrapped type.
        let buffered = BufWriter::new(writer);
        Self::try_new(buffered, schema)
    }
}

impl<W: Write> StreamWriter<W> {
/// Try to create a new writer, with the schema written as part of the header
/// Try to create a new writer, with the schema written as part of the header.
///
/// Note that there is no internal buffering. See also [`StreamWriter::try_new_buffered`].
///
/// # Errors
///
/// An [`Err`](Result::Err) may be returned if writing the header to the writer fails.
pub fn try_new(writer: W, schema: &Schema) -> Result<Self, ArrowError> {
    // Delegate to the options-taking constructor using the default IPC write options.
    Self::try_new_with_options(writer, schema, IpcWriteOptions::default())
}

/// Try to create a new writer with [`IpcWriteOptions`].
///
/// # Errors
///
/// An [`Err`](Result::Err) may be returned if writing the header to the writer fails.
pub fn try_new_with_options(
writer: W,
mut writer: W,
schema: &Schema,
write_options: IpcWriteOptions,
) -> Result<Self, ArrowError> {
let data_gen = IpcDataGenerator::default();
let mut writer = BufWriter::new(writer);
// write the schema, set the written bytes to the schema
let encoded_message = data_gen.schema_to_bytes(schema, &write_options);
write_message(&mut writer, encoded_message, &write_options)?;
Expand Down Expand Up @@ -1095,14 +1138,14 @@ impl<W: Write> StreamWriter<W> {

/// Gets a reference to the underlying writer.
pub fn get_ref(&self) -> &W {
self.writer.get_ref()
&self.writer
}

/// Gets a mutable reference to the underlying writer.
///
/// It is inadvisable to directly write to the underlying writer.
pub fn get_mut(&mut self) -> &mut W {
self.writer.get_mut()
&mut self.writer
}

/// Flush the underlying writer.
Expand All @@ -1113,16 +1156,14 @@ impl<W: Write> StreamWriter<W> {
Ok(())
}

/// Unwraps the BufWriter housed in StreamWriter.writer, returning the underlying
/// writer
/// Unwraps the underlying writer.
///
/// The buffer is flushed and the StreamWriter is finished before returning the
/// writer.
/// The writer is flushed and the StreamWriter is finished before returning.
///
/// # Errors
///
/// An ['Err'] may be returned if an error occurs while finishing the StreamWriter
/// or while flushing the buffer.
/// An [`Err`](Result::Err) may be returned if an error occurs while finishing the StreamWriter
/// or while flushing the writer.
///
/// # Example
///
Expand Down Expand Up @@ -1154,9 +1195,10 @@ impl<W: Write> StreamWriter<W> {
/// ```
pub fn into_inner(mut self) -> Result<W, ArrowError> {
if !self.finished {
// `finish` flushes.
self.finish()?;
}
self.writer.into_inner().map_err(ArrowError::from)
Ok(self.writer)
}
}

Expand Down
Loading