Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bail instead of deadlock if reader creation fails. #70

Merged
merged 4 commits into from
Jun 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

[package]
name = "ripunzip"
version = "1.2.1"
version = "1.2.2"
edition = "2021"
authors = ["Adrian Taylor <adetaylor@chromium.org>"]
license = "MIT OR Apache-2.0"
Expand Down
53 changes: 50 additions & 3 deletions src/unzip/seekable_http_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ struct State {
/// this. If it's absent, some other thread is doing a read, and
/// you may not.
reader: Option<Box<ReadingMaterials>>,
/// Some problem was encountered creating a reader.
/// All threads should abandon hope.
read_failed_somewhere: bool,
}

impl State {
Expand Down Expand Up @@ -338,7 +341,6 @@ impl SeekableHttpReaderEngine {

/// Read some data, ideally from the cache of pre-read blocks, but
/// otherwise from the underlying HTTP stream.
#[allow(clippy::comparison_chain)]
fn read(&self, buf: &mut [u8], pos: u64) -> std::io::Result<usize> {
// There is some mutex delicacy here. Goals are:
// a) Allow exactly one thread to be reading on the underlying HTTP stream;
Expand Down Expand Up @@ -388,15 +390,21 @@ impl SeekableHttpReaderEngine {
while reading_stuff.is_none() {
// - If yes, release CACHE mutex, WAIT on condvar atomically
state = self.read_completed.wait(state).unwrap();
if state.read_failed_somewhere {
return Err(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"another thread experienced a problem creating a reader",
));
}
// check cache again
if let Some(bytes_read_from_cache) = state.read_from_cache(pos, buf) {
log::debug!("Deferred cache success");
return Ok(bytes_read_from_cache);
}
reading_stuff = state.reader.take();
}
let mut reading_stuff = reading_stuff.unwrap(); // feels like there should
// be a way to do this with while let
let reading_stuff = reading_stuff.unwrap(); // feels like there should
// be a way to do this with while let
state.stats.cache_misses += 1;
// - If no:
// We'll start to do a read ourselves. Because we take()d the
Expand All @@ -410,6 +418,38 @@ impl SeekableHttpReaderEngine {
// release STATE mutex
drop(state);
// perform read
let read_result = self.perform_read_using_reader(
buf,
pos,
reading_stuff,
expect_skip_ahead,
skip_ahead_threshold,
max_block,
);
if read_result.is_err() {
let mut state = self.state.lock().unwrap();
state.read_failed_somewhere = true;
}
// 'state' has been updated to indicate either an error, or the
// reading_materials has been repopulated, so wake up all other
// threads to check.
self.read_completed.notify_all();
read_result
}

#[allow(clippy::comparison_chain)]
// Read from the underlying HTTP stream
// This is a separate function because if it errors at any point
// we need to take cleanup action in the caller.
fn perform_read_using_reader(
&self,
buf: &mut [u8],
pos: u64,
mut reading_stuff: Box<ReadingMaterials>,
expect_skip_ahead: bool,
skip_ahead_threshold: u64,
max_block: usize,
) -> std::io::Result<usize> {
// First check if we need to rewind, OR if we need to fast forward
// and are expecting to skip over some significant data.
if let Some((_, readerpos)) = reading_stuff.reader.as_ref() {
Expand Down Expand Up @@ -788,4 +828,11 @@ mod tests {
server.verify_and_clear();
}
}

// It would be highly desirable to enhance these tests with:
// * tests of what happens if the server refuses a connection at any
// point.
// * tests of what happens if the server closes a connection part way
// through
// * multi-threaded tests
}
Loading