From 1bd30fbd1a219e8982571da48eb68f34317d1e15 Mon Sep 17 00:00:00 2001 From: Bas Zalmstra Date: Wed, 21 Sep 2022 23:29:21 +0200 Subject: [PATCH] fix: keep state accross yield points when reading data (#10) --- src/archive.rs | 92 +++++++++++++++++++++++++++++++++++++------------- 1 file changed, 69 insertions(+), 23 deletions(-) diff --git a/src/archive.rs b/src/archive.rs index ea75ee6..7721e25 100644 --- a/src/archive.rs +++ b/src/archive.rs @@ -172,7 +172,7 @@ impl Archive { Ok(Entries { archive: self.clone(), - next: 0, + current: (0, None, 0, None), gnu_longlink: None, gnu_longname: None, pax_extensions: None, @@ -195,7 +195,7 @@ impl Archive { Ok(RawEntries { archive: self.clone(), - next: 0, + current: (0, None, 0), }) } @@ -236,7 +236,7 @@ impl Archive { /// Stream of `Entry`s. pub struct Entries { archive: Archive, - next: u64, + current: (u64, Option
, usize, Option), gnu_longname: Option>, gnu_longlink: Option>, pax_extensions: Option>, @@ -266,7 +266,15 @@ impl Stream for Entries { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { loop { - let entry = ready_opt_err!(poll_next_raw(self.archive.clone(), &mut self.next, cx)); + let archive = self.archive.clone(); + let (next, current_header, current_header_pos, _) = &mut self.current; + let entry = ready_opt_err!(poll_next_raw( + archive, + next, + current_header, + current_header_pos, + cx + )); let is_recognized_header = entry.header().as_gnu().is_some() || entry.header().as_ustar().is_some(); @@ -315,9 +323,14 @@ impl Stream for Entries { fields.long_linkname = self.gnu_longlink.take(); fields.pax_extensions = self.pax_extensions.take(); + let archive = self.archive.clone(); + let (next, _, current_pos, current_ext) = &mut self.current; + ready_err!(poll_parse_sparse_header( - self.archive.clone(), - &mut self.next, + archive, + next, + current_ext, + current_pos, &mut fields, cx )); @@ -330,36 +343,50 @@ impl Stream for Entries { /// Stream of raw `Entry`s. pub struct RawEntries { archive: Archive, - next: u64, + current: (u64, Option
, usize), } impl Stream for RawEntries { type Item = io::Result>>; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - poll_next_raw(self.archive.clone(), &mut self.next, cx) + let archive = self.archive.clone(); + let (next, current_header, current_header_pos) = &mut self.current; + poll_next_raw(archive, next, current_header, current_header_pos, cx) } } fn poll_next_raw( mut archive: Archive, next: &mut u64, + current_header: &mut Option
, + current_header_pos: &mut usize, cx: &mut Context<'_>, ) -> Poll>>>> { - let mut header = Header::new_old(); let mut header_pos = *next; loop { // Seek to the start of the next header in the archive - let delta = *next - archive.inner.pos.load(Ordering::SeqCst); + if current_header.is_none() { + let delta = *next - archive.inner.pos.load(Ordering::SeqCst); + match futures_core::ready!(poll_skip(&mut archive, cx, delta)) { + Ok(_) => {} + Err(err) => return Poll::Ready(Some(Err(err))), + } - match futures_core::ready!(poll_skip(&mut archive, cx, delta)) { - Ok(_) => {} - Err(err) => return Poll::Ready(Some(Err(err))), + *current_header = Some(Header::new_old()); + *current_header_pos = 0; } + let header = current_header.as_mut().unwrap(); + // EOF is an indicator that we are at the end of the archive. - match futures_core::ready!(poll_try_read_all(&mut archive, cx, header.as_mut_bytes())) { + match futures_core::ready!(poll_try_read_all( + &mut archive, + cx, + header.as_mut_bytes(), + current_header_pos, + )) { Ok(true) => {} Ok(false) => return Poll::Ready(None), Err(err) => return Poll::Ready(Some(Err(err))), @@ -381,6 +408,8 @@ fn poll_next_raw( header_pos = *next; } + let header = current_header.as_mut().unwrap(); + // Make sure the checksum is ok let sum = header.as_bytes()[..148] .iter() @@ -397,6 +426,9 @@ fn poll_next_raw( let mut data = VecDeque::with_capacity(1); data.push_back(EntryIo::Data(archive.clone().take(size))); + drop(header); + + let header = current_header.take().unwrap(); let ret = EntryFields { size, @@ -424,6 +456,8 @@ fn poll_next_raw( fn poll_parse_sparse_header( mut archive: Archive, next: &mut u64, + current_ext: &mut Option, + current_ext_pos: &mut usize, entry: &mut EntryFields>, cx: &mut Context<'_>, ) -> Poll> { @@ -500,11 +534,22 @@ fn poll_parse_sparse_header( add_block(block)? } if gnu.is_extended() { - let mut ext = GnuExtSparseHeader::new(); - ext.isextended[0] = 1; + let started_header = current_ext.is_some(); + if !started_header { + let mut ext = GnuExtSparseHeader::new(); + ext.isextended[0] = 1; + *current_ext = Some(ext); + *current_ext_pos = 0; + } + + let ext = current_ext.as_mut().unwrap(); while ext.is_extended() { - match futures_core::ready!(poll_try_read_all(&mut archive, cx, ext.as_mut_bytes())) - { + match futures_core::ready!(poll_try_read_all( + &mut archive, + cx, + ext.as_mut_bytes(), + current_ext_pos, + )) { Ok(true) => {} Ok(false) => return Poll::Ready(Err(other("failed to read extension"))), Err(err) => return Poll::Ready(Err(err)), @@ -567,23 +612,24 @@ fn poll_try_read_all( mut source: R, cx: &mut Context<'_>, buf: &mut [u8], + pos: &mut usize, ) -> Poll> { - let mut read = 0; - while read < buf.len() { - let mut read_buf = io::ReadBuf::new(&mut buf[read..]); + while *pos < buf.len() { + let mut read_buf = io::ReadBuf::new(&mut buf[*pos..]); match futures_core::ready!(Pin::new(&mut source).poll_read(cx, &mut read_buf)) { Ok(()) if read_buf.filled().is_empty() => { - if read == 0 { + if *pos == 0 { return Poll::Ready(Ok(false)); } return Poll::Ready(Err(other("failed to read entire block"))); } - Ok(()) => read += read_buf.filled().len(), + Ok(()) => *pos += read_buf.filled().len(), Err(err) => return Poll::Ready(Err(err)), } } + *pos = 0; Poll::Ready(Ok(true)) }