Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement an Inner struct for CloneableSeekableReader #47

Merged
merged 1 commit into from
Nov 10, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 86 additions & 42 deletions src/unzip/cloneable_seekable_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,65 @@ pub(crate) trait HasLength {
fn len(&self) -> u64;
}

/// State shared between all clones of a reader: the wrapped stream together
/// with the bookkeeping needed to skip redundant seeks and repeated length
/// queries.
struct Inner<R: Read + Seek + HasLength> {
    /// The underlying Read implementation.
    r: R,
    /// The position of `r`, as tracked by [`Inner::read_at`].
    pos: u64,
    /// The length of `r`, lazily loaded.
    len: Option<u64>,
}

impl<R: Read + Seek + HasLength> Inner<R> {
    /// Wrap `r`, with the tracked position initialised to 0 and the length
    /// not yet queried.
    ///
    /// NOTE(review): this assumes `r` is handed over positioned at its
    /// start; nothing here verifies that.
    fn new(r: R) -> Self {
        Self {
            r,
            pos: 0,
            len: None,
        }
    }

    /// Get the length of the data stream. This is assumed to be constant.
    ///
    /// The underlying stream is only asked for its length on the first call;
    /// the answer is cached and returned on every later call.
    fn len(&mut self) -> u64 {
        match self.len {
            Some(len) => len,
            None => {
                let len = self.r.len();
                self.len = Some(len);
                len
            }
        }
    }

    /// Read into the given buffer, starting at the given offset in the data stream.
    ///
    /// The underlying stream is only seeked when `offset` differs from the
    /// tracked position, so back-to-back sequential reads cost no seek.
    ///
    /// Returns the number of bytes read, exactly as reported by the
    /// underlying `read`.
    ///
    /// # Errors
    ///
    /// Propagates any error from the underlying `seek` or `read`. The tracked
    /// position is not advanced when the read fails.
    fn read_at(&mut self, offset: u64, buf: &mut [u8]) -> std::io::Result<usize> {
        if offset != self.pos {
            self.r.seek(SeekFrom::Start(offset))?;
            // Record where the stream now is. Without this, the increment
            // below would be applied to the stale position, and a later call
            // whose offset happened to equal that stale value would skip the
            // seek and read from the wrong place.
            self.pos = offset;
        }
        let read_result = self.r.read(buf);
        if let Ok(bytes_read) = read_result {
            // TODO, once stabilised, use checked_add_signed
            self.pos += bytes_read as u64;
        }
        read_result
    }
}

/// A [`Read`] which refers to its underlying stream by reference count,
/// and thus can be cloned cheaply. It supports seeking; each cloned instance
/// maintains its own position in the stream, and the underlying instance
/// is repositioned before a read whenever its position differs from the reader's.
pub(crate) struct CloneableSeekableReader<R: Read + Seek + HasLength> {
file: Arc<Mutex<R>>,
/// The wrapper around the Read implementation, shared between threads.
inner: Arc<Mutex<Inner<R>>>,
/// The position of _this_ reader.
pos: u64,
// TODO determine and store this once instead of per cloneable file
file_length: Option<u64>,
}

impl<R: Read + Seek + HasLength> Clone for CloneableSeekableReader<R> {
fn clone(&self) -> Self {
Self {
file: self.file.clone(),
inner: self.inner.clone(),
pos: self.pos,
file_length: self.file_length,
}
}
}
Expand All @@ -47,34 +89,18 @@ impl<R: Read + Seek + HasLength> CloneableSeekableReader<R> {
/// to be fixed and unchanging. Odd behavior may occur if the length
/// of the stream changes; any subsequent seeks will not take account
/// of the changed stream length.
pub(crate) fn new(file: R) -> Self {
pub(crate) fn new(r: R) -> Self {
Self {
file: Arc::new(Mutex::new(file)),
inner: Arc::new(Mutex::new(Inner::new(r))),
pos: 0u64,
file_length: None,
}
}

/// Determine the length of the underlying stream.
fn ascertain_file_length(&mut self) -> u64 {
match self.file_length {
Some(file_length) => file_length,
None => {
let len = self.file.lock().unwrap().len();
self.file_length = Some(len);
len
}
}
}
}

impl<R: Read + Seek + HasLength> Read for CloneableSeekableReader<R> {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
let mut underlying_file = self.file.lock().expect("Unable to get underlying file");
// TODO share an object which knows current position to avoid unnecessary
// seeks
underlying_file.seek(SeekFrom::Start(self.pos))?;
let read_result = underlying_file.read(buf);
let mut inner = self.inner.lock().unwrap();
let read_result = inner.read_at(self.pos, buf);
if let Ok(bytes_read) = read_result {
// TODO, once stabilised, use checked_add_signed
self.pos += bytes_read as u64;
Expand All @@ -88,7 +114,7 @@ impl<R: Read + Seek + HasLength> Seek for CloneableSeekableReader<R> {
let new_pos = match pos {
SeekFrom::Start(pos) => pos,
SeekFrom::End(offset_from_end) => {
let file_len = self.ascertain_file_length();
let file_len = self.inner.lock().unwrap().len();
if -offset_from_end as u64 > file_len {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
Expand Down Expand Up @@ -137,26 +163,44 @@ mod test {
}

#[test]
fn test_cloneable_seekable_reader() {
fn test_cloneable_seekable_reader() -> std::io::Result<()> {
let buf: Vec<u8> = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9];
let buf = Cursor::new(buf);
let mut reader = CloneableSeekableReader::new(buf);
let mut out = vec![0; 2];
assert!(reader.read_exact(&mut out).is_ok());
assert_eq!(out[0], 0);
assert_eq!(out[1], 1);
assert!(reader.rewind().is_ok());
assert!(reader.read_exact(&mut out).is_ok());
assert_eq!(out[0], 0);
assert_eq!(out[1], 1);
assert!(reader.stream_position().is_ok());
assert!(reader.read_exact(&mut out).is_ok());
assert_eq!(out[0], 2);
assert_eq!(out[1], 3);
assert!(reader.seek(SeekFrom::End(-2)).is_ok());
assert!(reader.read_exact(&mut out).is_ok());
assert_eq!(out[0], 8);
assert_eq!(out[1], 9);
reader.read_exact(&mut out)?;
assert_eq!(&out, &[0, 1]);
reader.rewind()?;
reader.read_exact(&mut out)?;
assert_eq!(&out, &[0, 1]);
reader.stream_position()?;
reader.read_exact(&mut out)?;
assert_eq!(&out, &[2, 3]);
reader.seek(SeekFrom::End(-2))?;
reader.read_exact(&mut out)?;
assert_eq!(&out, &[8, 9]);
assert!(reader.read_exact(&mut out).is_err());
Ok(())
}

/// Two clones of one reader share the underlying stream but keep separate
/// positions: interleaved reads and a seek on one clone must not disturb
/// where the other clone reads next.
#[test]
fn test_cloned_independent_positions() -> std::io::Result<()> {
    let data: Vec<u8> = (0..10).collect();
    let mut first = CloneableSeekableReader::new(Cursor::new(data));
    let mut second = first.clone();
    let mut chunk = [0u8; 2];

    // Both clones start at the beginning of the stream.
    first.read_exact(&mut chunk)?;
    assert_eq!(&chunk, &[0, 1]);
    second.read_exact(&mut chunk)?;
    assert_eq!(&chunk, &[0, 1]);

    // The first clone continues from where it left off.
    first.read_exact(&mut chunk)?;
    assert_eq!(&chunk, &[2, 3]);

    // Moving the second clone to the tail of the stream...
    second.seek(SeekFrom::End(-2))?;
    second.read_exact(&mut chunk)?;
    assert_eq!(&chunk, &[8, 9]);

    // ...leaves the first clone's position untouched.
    first.read_exact(&mut chunk)?;
    assert_eq!(&chunk, &[4, 5]);

    Ok(())
}
}
Loading