From aae537dbe2f89365e9791f38b1c475db164c2ae9 Mon Sep 17 00:00:00 2001 From: Adrian Taylor Date: Fri, 10 Nov 2023 17:17:35 +0000 Subject: [PATCH 1/2] Simplify to remove a mutex. Thanks to @djmitche for making me look at this again! --- src/unzip/seekable_http_reader.rs | 72 +++++++++++++------------------ 1 file changed, 31 insertions(+), 41 deletions(-) diff --git a/src/unzip/seekable_http_reader.rs b/src/unzip/seekable_http_reader.rs index 4f41bfd..9eb9a13 100644 --- a/src/unzip/seekable_http_reader.rs +++ b/src/unzip/seekable_http_reader.rs @@ -122,9 +122,6 @@ struct State { /// we skipped over, in order to service any subsequent requests for those /// positions. cache: BTreeMap, - /// Whether a read from the underlying HTTP stream is afoot. Only one thread - /// can be doing a read at a time. - read_in_progress: bool, /// We expect to skip some range of the read. expect_skip_ahead: bool, /// Threshold for fast forwards when we'd expect to skip some data @@ -134,6 +131,11 @@ struct State { max_block: usize, /// Some statistics about how we're doing. stats: SeekableHttpReaderStatistics, + /// Facilities to read from the underlying HTTP stream(s). + /// If this is present, a thread may start a read - it must `take` + /// this. If it's absent, some other thread is doing a read, and + /// you may not. + reader: Option>, } impl State { @@ -142,6 +144,7 @@ impl State { access_pattern: AccessPattern, skip_ahead_threshold: u64, max_block: usize, + reader: Box, ) -> Self { // Grow the readahead limit if it's less than block size, because we // must always store one block in order to service the most recent read. @@ -155,6 +158,7 @@ impl State { access_pattern, skip_ahead_threshold, max_block, + reader: Some(reader), ..Default::default() } } @@ -248,14 +252,9 @@ struct ReadingMaterials { /// objects that do. This object can only be used to access HTTP resources which /// support the `Range` header - an error will be reported on construction /// of this object if such ranges are not supported by the remote server. -// Mutex invariant below: only ONE of these mutices may be claimed at once. -// Ideally we'd enforce this in the type system -// per https://medium.com/@adetaylor/can-the-rust-type-system-prevent-deadlocks-9ae6e4123037 pub(crate) struct SeekableHttpReaderEngine { /// Total stream length len: u64, - /// Facilities to read from the underlying HTTP stream(s) - reader: Mutex, /// Overall state of this object, mostly related to the readahead cache /// of blocks we already read, but also with the all-important boolean /// stating whether any thread is already reading on the underlying stream. @@ -314,15 +313,15 @@ impl SeekableHttpReaderEngine { let len = range_fetcher.len(); Ok(Arc::new(Self { len, - reader: Mutex::new(ReadingMaterials { - range_fetcher, - reader: None, - }), state: Mutex::new(State::new( readahead_limit, access_pattern, skip_ahead_threshold, max_block, + Box::new(ReadingMaterials { + range_fetcher, + reader: None, + }), )), read_completed: Condvar::new(), })) @@ -345,12 +344,10 @@ impl SeekableHttpReaderEngine { // a) Allow exactly one thread to be reading on the underlying HTTP stream; // b) Allow other threads to query the cache of already-read blocks // without blocking on ongoing reads on the stream. - // We therefore need two mutexes - one for the cache (and, our state in - // general) and another for the actual HTTP stream reader. - // There is a risk of deadlock between these mutexes, since to do - // an actual read we will need to release the state mutex to allow - // others to do the reads. We avoid this by ensuring only a single - // thread ever has permission to do anything with the reader mutex. + // We have a mutex which guards the overall state. Within that state + // is a separate data structure which is used for underlying reads. + // That struct is moved into and out of the state in order to indicate + // whether a thread has a read in progress. // Specifically: // Claim STATE mutex // Is there block in cache? @@ -360,21 +357,14 @@ impl SeekableHttpReaderEngine { // - If yes, release STATE mutex, WAIT on condvar atomically // check cache again // - If no: - // set read in progress - // claim READER mutex + // set read in progress by taking reading materials out of state // release STATE mutex // perform read // claim STATE mutex // insert results - // set read not in progress + // reinsert reading materials back into the state // release STATE mutex - // release READER mutex // NOTIFYALL on condvar - - // Cases where you have STATE but want READER: near the start - // Cases where you have READER but want STATE: after read, - // ... but this deadlock can't happen because only one thread - // will enter this 'read in progress' block. log::debug!("Read: requested position 0x{:x}.", pos); if pos == self.len { @@ -393,9 +383,9 @@ impl SeekableHttpReaderEngine { return Ok(bytes_read_from_cache); } // - If no, check if read in progress - let mut read_in_progress = state.read_in_progress; + let mut reading_stuff = state.reader.take(); // Is there read in progress? - while read_in_progress { + while reading_stuff.is_none() { // - If yes, release CACHE mutex, WAIT on condvar atomically state = self.read_completed.wait(state).unwrap(); // check cache again @@ -403,19 +393,20 @@ impl SeekableHttpReaderEngine { log::debug!("Deferred cache success"); return Ok(bytes_read_from_cache); } - read_in_progress = state.read_in_progress; + reading_stuff = state.reader.take(); } + let mut reading_stuff = reading_stuff.unwrap(); // feels like there should + // be a way to do this with while let state.stats.cache_misses += 1; // - If no: - // set read in progress - state.read_in_progress = true; + // We'll start to do a read ourselves. Because we take()d the + // reading_stuff from the state, we now have exclusive ability to do the + // read ourselves. // If we need to read ahead, let expect_skip_ahead = state.expect_skip_ahead; state.expect_skip_ahead = false; let skip_ahead_threshold = state.skip_ahead_threshold; let max_block = state.max_block; - // claim READER mutex - let mut reading_stuff = self.reader.lock().unwrap(); // release STATE mutex drop(state); // perform read @@ -492,10 +483,10 @@ impl SeekableHttpReaderEngine { if reader_created { state.stats.num_http_streams += 1; } - // set read not in progress - state.read_in_progress = false; + // return the underlying reader to the state so that some other + // thread can use it + state.reader = Some(reading_stuff); // release STATE mutex - // release READER mutex Ok(bytes_read) } @@ -518,14 +509,13 @@ impl SeekableHttpReaderEngine { state.stats ); if matches!(access_pattern, AccessPattern::SequentialIsh) { - if state.read_in_progress { - panic!("Must not call set_expected_access_pattern while a read is in progress"); - } // If we're switching to a sequential pattern, recreate // the reader at position zero. log::debug!("create_reader_at_zero"); { - let mut reading_materials = self.reader.lock().unwrap(); + let reading_materials = state.reader.as_mut().expect( + "Must not call set_expected_access_pattern while a read is in progress", + ); let new_reader = reading_materials.range_fetcher.fetch_range(0); if let Ok(new_reader) = new_reader { reading_materials.reader = Some((BufReader::new(new_reader), 0)); From dc09729ee5e13b924d829769d8a484d459db03e4 Mon Sep 17 00:00:00 2001 From: Adrian Taylor Date: Fri, 10 Nov 2023 17:19:44 +0000 Subject: [PATCH 2/2] Clippy fix. --- src/unzip/seekable_http_reader.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/unzip/seekable_http_reader.rs b/src/unzip/seekable_http_reader.rs index 9eb9a13..57cad86 100644 --- a/src/unzip/seekable_http_reader.rs +++ b/src/unzip/seekable_http_reader.rs @@ -674,7 +674,7 @@ mod tests { ) .unwrap(); - let mut seekable_http_reader = seekable_http_reader_engine.clone().create_reader(); + let mut seekable_http_reader = seekable_http_reader_engine.create_reader(); server.verify_and_clear(); let mut throwaway = [0u8; 4];