Skip to content

Commit

Permalink
fix: keep state accross yield points when reading data (dignifiedquir…
Browse files Browse the repository at this point in the history
  • Loading branch information
baszalmstra authored Sep 21, 2022
1 parent 329ac1c commit 1bd30fb
Showing 1 changed file with 69 additions and 23 deletions.
92 changes: 69 additions & 23 deletions src/archive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ impl<R: Read + Unpin> Archive<R> {

Ok(Entries {
archive: self.clone(),
next: 0,
current: (0, None, 0, None),
gnu_longlink: None,
gnu_longname: None,
pax_extensions: None,
Expand All @@ -195,7 +195,7 @@ impl<R: Read + Unpin> Archive<R> {

Ok(RawEntries {
archive: self.clone(),
next: 0,
current: (0, None, 0),
})
}

Expand Down Expand Up @@ -236,7 +236,7 @@ impl<R: Read + Unpin> Archive<R> {
/// Stream of `Entry`s.
pub struct Entries<R: Read + Unpin> {
archive: Archive<R>,
next: u64,
current: (u64, Option<Header>, usize, Option<GnuExtSparseHeader>),
gnu_longname: Option<Vec<u8>>,
gnu_longlink: Option<Vec<u8>>,
pax_extensions: Option<Vec<u8>>,
Expand Down Expand Up @@ -266,7 +266,15 @@ impl<R: Read + Unpin> Stream for Entries<R> {

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
let entry = ready_opt_err!(poll_next_raw(self.archive.clone(), &mut self.next, cx));
let archive = self.archive.clone();
let (next, current_header, current_header_pos, _) = &mut self.current;
let entry = ready_opt_err!(poll_next_raw(
archive,
next,
current_header,
current_header_pos,
cx
));

let is_recognized_header =
entry.header().as_gnu().is_some() || entry.header().as_ustar().is_some();
Expand Down Expand Up @@ -315,9 +323,14 @@ impl<R: Read + Unpin> Stream for Entries<R> {
fields.long_linkname = self.gnu_longlink.take();
fields.pax_extensions = self.pax_extensions.take();

let archive = self.archive.clone();
let (next, _, current_pos, current_ext) = &mut self.current;

ready_err!(poll_parse_sparse_header(
self.archive.clone(),
&mut self.next,
archive,
next,
current_ext,
current_pos,
&mut fields,
cx
));
Expand All @@ -330,36 +343,50 @@ impl<R: Read + Unpin> Stream for Entries<R> {
/// Stream of raw `Entry`s.
pub struct RawEntries<R: Read + Unpin> {
archive: Archive<R>,
next: u64,
current: (u64, Option<Header>, usize),
}

impl<R: Read + Unpin> Stream for RawEntries<R> {
type Item = io::Result<Entry<Archive<R>>>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
poll_next_raw(self.archive.clone(), &mut self.next, cx)
let archive = self.archive.clone();
let (next, current_header, current_header_pos) = &mut self.current;
poll_next_raw(archive, next, current_header, current_header_pos, cx)
}
}

fn poll_next_raw<R: Read + Unpin>(
mut archive: Archive<R>,
next: &mut u64,
current_header: &mut Option<Header>,
current_header_pos: &mut usize,
cx: &mut Context<'_>,
) -> Poll<Option<io::Result<Entry<Archive<R>>>>> {
let mut header = Header::new_old();
let mut header_pos = *next;

loop {
// Seek to the start of the next header in the archive
let delta = *next - archive.inner.pos.load(Ordering::SeqCst);
if current_header.is_none() {
let delta = *next - archive.inner.pos.load(Ordering::SeqCst);
match futures_core::ready!(poll_skip(&mut archive, cx, delta)) {
Ok(_) => {}
Err(err) => return Poll::Ready(Some(Err(err))),
}

match futures_core::ready!(poll_skip(&mut archive, cx, delta)) {
Ok(_) => {}
Err(err) => return Poll::Ready(Some(Err(err))),
*current_header = Some(Header::new_old());
*current_header_pos = 0;
}

let header = current_header.as_mut().unwrap();

// EOF is an indicator that we are at the end of the archive.
match futures_core::ready!(poll_try_read_all(&mut archive, cx, header.as_mut_bytes())) {
match futures_core::ready!(poll_try_read_all(
&mut archive,
cx,
header.as_mut_bytes(),
current_header_pos,
)) {
Ok(true) => {}
Ok(false) => return Poll::Ready(None),
Err(err) => return Poll::Ready(Some(Err(err))),
Expand All @@ -381,6 +408,8 @@ fn poll_next_raw<R: Read + Unpin>(
header_pos = *next;
}

let header = current_header.as_mut().unwrap();

// Make sure the checksum is ok
let sum = header.as_bytes()[..148]
.iter()
Expand All @@ -397,6 +426,9 @@ fn poll_next_raw<R: Read + Unpin>(

let mut data = VecDeque::with_capacity(1);
data.push_back(EntryIo::Data(archive.clone().take(size)));
drop(header);

let header = current_header.take().unwrap();

let ret = EntryFields {
size,
Expand Down Expand Up @@ -424,6 +456,8 @@ fn poll_next_raw<R: Read + Unpin>(
fn poll_parse_sparse_header<R: Read + Unpin>(
mut archive: Archive<R>,
next: &mut u64,
current_ext: &mut Option<GnuExtSparseHeader>,
current_ext_pos: &mut usize,
entry: &mut EntryFields<Archive<R>>,
cx: &mut Context<'_>,
) -> Poll<io::Result<()>> {
Expand Down Expand Up @@ -500,11 +534,22 @@ fn poll_parse_sparse_header<R: Read + Unpin>(
add_block(block)?
}
if gnu.is_extended() {
let mut ext = GnuExtSparseHeader::new();
ext.isextended[0] = 1;
let started_header = current_ext.is_some();
if !started_header {
let mut ext = GnuExtSparseHeader::new();
ext.isextended[0] = 1;
*current_ext = Some(ext);
*current_ext_pos = 0;
}

let ext = current_ext.as_mut().unwrap();
while ext.is_extended() {
match futures_core::ready!(poll_try_read_all(&mut archive, cx, ext.as_mut_bytes()))
{
match futures_core::ready!(poll_try_read_all(
&mut archive,
cx,
ext.as_mut_bytes(),
current_ext_pos,
)) {
Ok(true) => {}
Ok(false) => return Poll::Ready(Err(other("failed to read extension"))),
Err(err) => return Poll::Ready(Err(err)),
Expand Down Expand Up @@ -567,23 +612,24 @@ fn poll_try_read_all<R: Read + Unpin>(
mut source: R,
cx: &mut Context<'_>,
buf: &mut [u8],
pos: &mut usize,
) -> Poll<io::Result<bool>> {
let mut read = 0;
while read < buf.len() {
let mut read_buf = io::ReadBuf::new(&mut buf[read..]);
while *pos < buf.len() {
let mut read_buf = io::ReadBuf::new(&mut buf[*pos..]);
match futures_core::ready!(Pin::new(&mut source).poll_read(cx, &mut read_buf)) {
Ok(()) if read_buf.filled().is_empty() => {
if read == 0 {
if *pos == 0 {
return Poll::Ready(Ok(false));
}

return Poll::Ready(Err(other("failed to read entire block")));
}
Ok(()) => read += read_buf.filled().len(),
Ok(()) => *pos += read_buf.filled().len(),
Err(err) => return Poll::Ready(Err(err)),
}
}

*pos = 0;
Poll::Ready(Ok(true))
}

Expand Down

0 comments on commit 1bd30fb

Please sign in to comment.