From 89e27e2d8bb0f38bca1054706707c034853ee241 Mon Sep 17 00:00:00 2001 From: adetaylor Date: Sat, 1 Jun 2024 09:42:39 +0100 Subject: [PATCH 1/4] Bail instead of deadlock if reader creation fails. --- src/unzip/seekable_http_reader.rs | 38 +++++++++++++++++++++++-------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/src/unzip/seekable_http_reader.rs b/src/unzip/seekable_http_reader.rs index 31365cc..7066437 100644 --- a/src/unzip/seekable_http_reader.rs +++ b/src/unzip/seekable_http_reader.rs @@ -136,6 +136,9 @@ struct State { /// this. If it's absent, some other thread is doing a read, and /// you may not. reader: Option>, + /// Some problem was encountered creating a reader. + /// All threads should abandon hope. + reader_creation_err: bool, } impl State { @@ -388,6 +391,12 @@ impl SeekableHttpReaderEngine { while reading_stuff.is_none() { // - If yes, release CACHE mutex, WAIT on condvar atomically state = self.read_completed.wait(state).unwrap(); + if state.reader_creation_err { + return Err(std::io::Error::new( + std::io::ErrorKind::BrokenPipe, + "another thread experienced a problem creating a reader", + )); + } // check cache again if let Some(bytes_read_from_cache) = state.read_from_cache(pos, buf) { log::debug!("Deferred cache success"); @@ -438,16 +447,25 @@ impl SeekableHttpReaderEngine { let mut reader_created = false; if reading_stuff.reader.is_none() { log::debug!("create_reader"); - reading_stuff.reader = Some(( - BufReader::new( - reading_stuff - .range_fetcher - .fetch_range(pos) - .map_err(|e| std::io::Error::new(ErrorKind::Unsupported, e.to_string()))?, - ), - pos, - )); - reader_created = true; + + let new_reader = reading_stuff + .range_fetcher + .fetch_range(pos) + .map_err(|e| std::io::Error::new(ErrorKind::Unsupported, e.to_string())); + + match new_reader { + Ok(new_reader) => { + reading_stuff.reader = Some((BufReader::new(new_reader), pos)); + reader_created = true; + } + Err(err) => { + let mut state = 
self.state.lock().unwrap(); + state.reader_creation_err = true; + // Tell any waiting threads that they might have an issue + self.read_completed.notify_all(); + return Err(err); + } + } }; let (reader, reader_pos) = reading_stuff.reader.as_mut().unwrap(); From fb1edc56ad46a80b66a5f080437128cfc9cdfc0e Mon Sep 17 00:00:00 2001 From: Adrian Taylor Date: Sat, 1 Jun 2024 14:20:30 +0100 Subject: [PATCH 2/4] Separate out HTTP stream reader function. --- src/unzip/seekable_http_reader.rs | 70 ++++++++++++++++++++----------- 1 file changed, 46 insertions(+), 24 deletions(-) diff --git a/src/unzip/seekable_http_reader.rs b/src/unzip/seekable_http_reader.rs index 7066437..9306e10 100644 --- a/src/unzip/seekable_http_reader.rs +++ b/src/unzip/seekable_http_reader.rs @@ -138,7 +138,7 @@ struct State { reader: Option>, /// Some problem was encountered creating a reader. /// All threads should abandon hope. - reader_creation_err: bool, + read_failed_somewhere: bool, } impl State { @@ -341,7 +341,6 @@ impl SeekableHttpReaderEngine { /// Read some data, ideally from the cache of pre-read blocks, but /// otherwise from the underlying HTTP stream. - #[allow(clippy::comparison_chain)] fn read(&self, buf: &mut [u8], pos: u64) -> std::io::Result { // There is some mutex delicacy here. 
Goals are: // a) Allow exactly one thread to be reading on the underlying HTTP stream; @@ -391,7 +390,7 @@ impl SeekableHttpReaderEngine { while reading_stuff.is_none() { // - If yes, release CACHE mutex, WAIT on condvar atomically state = self.read_completed.wait(state).unwrap(); - if state.reader_creation_err { + if state.read_failed_somewhere { return Err(std::io::Error::new( std::io::ErrorKind::BrokenPipe, "another thread experienced a problem creating a reader", @@ -404,8 +403,8 @@ impl SeekableHttpReaderEngine { } reading_stuff = state.reader.take(); } - let mut reading_stuff = reading_stuff.unwrap(); // feels like there should - // be a way to do this with while let + let reading_stuff = reading_stuff.unwrap(); // feels like there should + // be a way to do this with while let state.stats.cache_misses += 1; // - If no: // We'll start to do a read ourselves. Because we take()d the @@ -419,6 +418,38 @@ impl SeekableHttpReaderEngine { // release STATE mutex drop(state); // perform read + let read_result = self.perform_read_using_reader( + buf, + pos, + reading_stuff, + expect_skip_ahead, + skip_ahead_threshold, + max_block, + ); + if read_result.is_err() { + let mut state = self.state.lock().unwrap(); + state.read_failed_somewhere = true; + } + // 'state' has been updated to indicate either an error, or the + // reading_materials has been repopulated, so wake up all other + // threads to check. + self.read_completed.notify_all(); + read_result + } + + #[allow(clippy::comparison_chain)] + // Read from the underlying HTTP stream + // This is a separate function because if it errors at any point + // we need to take cleanup action in the caller. + fn perform_read_using_reader( + &self, + buf: &mut [u8], + pos: u64, + mut reading_stuff: Box, + expect_skip_ahead: bool, + skip_ahead_threshold: u64, + max_block: usize, + ) -> std::io::Result { // First check if we need to rewind, OR if we need to fast forward // and are expecting to skip over some significant data. 
if let Some((_, readerpos)) = reading_stuff.reader.as_ref() { @@ -447,25 +478,16 @@ impl SeekableHttpReaderEngine { let mut reader_created = false; if reading_stuff.reader.is_none() { log::debug!("create_reader"); - - let new_reader = reading_stuff - .range_fetcher - .fetch_range(pos) - .map_err(|e| std::io::Error::new(ErrorKind::Unsupported, e.to_string())); - - match new_reader { - Ok(new_reader) => { - reading_stuff.reader = Some((BufReader::new(new_reader), pos)); - reader_created = true; - } - Err(err) => { - let mut state = self.state.lock().unwrap(); - state.reader_creation_err = true; - // Tell any waiting threads that they might have an issue - self.read_completed.notify_all(); - return Err(err); - } - } + reading_stuff.reader = Some(( + BufReader::new( + reading_stuff + .range_fetcher + .fetch_range(pos) + .map_err(|e| std::io::Error::new(ErrorKind::Unsupported, e.to_string()))?, + ), + pos, + )); + reader_created = true; }; let (reader, reader_pos) = reading_stuff.reader.as_mut().unwrap(); From 1d4dee3e40c5b08677a35dcdb599157c2a98f339 Mon Sep 17 00:00:00 2001 From: Adrian Taylor Date: Sat, 1 Jun 2024 15:18:47 +0100 Subject: [PATCH 3/4] Note desirable tests. --- src/unzip/seekable_http_reader.rs | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/unzip/seekable_http_reader.rs b/src/unzip/seekable_http_reader.rs index 9306e10..6e3f2ef 100644 --- a/src/unzip/seekable_http_reader.rs +++ b/src/unzip/seekable_http_reader.rs @@ -828,4 +828,11 @@ mod tests { server.verify_and_clear(); } } + + // It would be highly desirable to enhance these tests with: + // * tests of what happens if the server refuses a connection at any + // point. 
+ // * tests of what happens if the server closes a connection part way + // through + // * multi-threaded tests } From e5e4482fe1b4a0736da117ca9f62a6dffe4647c4 Mon Sep 17 00:00:00 2001 From: Adrian Taylor Date: Sat, 1 Jun 2024 15:19:07 +0100 Subject: [PATCH 4/4] Revise to 1.2.2 --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b83f33d..ad9b0ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1437,7 +1437,7 @@ dependencies = [ [[package]] name = "ripunzip" -version = "1.2.1" +version = "1.2.2" dependencies = [ "anyhow", "clap 4.4.7", diff --git a/Cargo.toml b/Cargo.toml index b30eb03..f70188f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ [package] name = "ripunzip" -version = "1.2.1" +version = "1.2.2" edition = "2021" authors = ["Adrian Taylor "] license = "MIT OR Apache-2.0"