Option for reader #52

Merged
merged 2 commits into from
Nov 12, 2023
74 changes: 32 additions & 42 deletions src/unzip/seekable_http_reader.rs
@@ -122,9 +122,6 @@ struct State {
  /// we skipped over, in order to service any subsequent requests for those
  /// positions.
  cache: BTreeMap<u64, CacheCell>,
- /// Whether a read from the underlying HTTP stream is afoot. Only one thread
- /// can be doing a read at a time.
- read_in_progress: bool,
  /// We expect to skip some range of the read.
  expect_skip_ahead: bool,
  /// Threshold for fast forwards when we'd expect to skip some data
@@ -134,6 +131,11 @@ struct State {
  max_block: usize,
  /// Some statistics about how we're doing.
  stats: SeekableHttpReaderStatistics,
+ /// Facilities to read from the underlying HTTP stream(s).
+ /// If this is present, a thread may start a read - it must `take`
+ /// this. If it's absent, some other thread is doing a read, and
+ /// you may not.
+ reader: Option<Box<ReadingMaterials>>,
  }

  impl State {
@@ -142,6 +144,7 @@ impl State {
  access_pattern: AccessPattern,
  skip_ahead_threshold: u64,
  max_block: usize,
+ reader: Box<ReadingMaterials>,
  ) -> Self {
  // Grow the readahead limit if it's less than block size, because we
  // must always store one block in order to service the most recent read.
@@ -155,6 +158,7 @@ impl State {
  access_pattern,
  skip_ahead_threshold,
  max_block,
+ reader: Some(reader),
  ..Default::default()
  }
  }
@@ -248,14 +252,9 @@ struct ReadingMaterials {
  /// objects that do. This object can only be used to access HTTP resources which
  /// support the `Range` header - an error will be reported on construction
  /// of this object if such ranges are not supported by the remote server.
- // Mutex invariant below: only ONE of these mutices may be claimed at once.
- // Ideally we'd enforce this in the type system
- // per https://medium.com/@adetaylor/can-the-rust-type-system-prevent-deadlocks-9ae6e4123037
  pub(crate) struct SeekableHttpReaderEngine {
  /// Total stream length
  len: u64,
- /// Facilities to read from the underlying HTTP stream(s)
- reader: Mutex<ReadingMaterials>,
  /// Overall state of this object, mostly related to the readahead cache
  /// of blocks we already read, but also with the all-important boolean
  /// stating whether any thread is already reading on the underlying stream.
@@ -314,15 +313,15 @@ impl SeekableHttpReaderEngine {
  let len = range_fetcher.len();
  Ok(Arc::new(Self {
  len,
- reader: Mutex::new(ReadingMaterials {
- range_fetcher,
- reader: None,
- }),
  state: Mutex::new(State::new(
  readahead_limit,
  access_pattern,
  skip_ahead_threshold,
  max_block,
+ Box::new(ReadingMaterials {
+ range_fetcher,
+ reader: None,
+ }),
  )),
  read_completed: Condvar::new(),
  }))
@@ -345,12 +344,10 @@ impl SeekableHttpReaderEngine {
  // a) Allow exactly one thread to be reading on the underlying HTTP stream;
  // b) Allow other threads to query the cache of already-read blocks
  // without blocking on ongoing reads on the stream.
- // We therefore need two mutexes - one for the cache (and, our state in
- // general) and another for the actual HTTP stream reader.
- // There is a risk of deadlock between these mutexes, since to do
- // an actual read we will need to release the state mutex to allow
- // others to do the reads. We avoid this by ensuring only a single
- // thread ever has permission to do anything with the reader mutex.
+ // We have a mutex which guards the overall state. Within that state
+ // is a separate data structure which is used for underlying reads.
+ // That struct is moved into and out of the state in order to indicate
+ // whether a thread has a read in progress.
  // Specifically:
  // Claim STATE mutex
  // Is there block in cache?
@@ -360,21 +357,14 @@ impl SeekableHttpReaderEngine {
  // - If yes, release STATE mutex, WAIT on condvar atomically
  // check cache again
  // - If no:
- // set read in progress
- // claim READER mutex
+ // set read in progress by taking reading materials out of state
  // release STATE mutex
  // perform read
  // claim STATE mutex
  // insert results
- // set read not in progress
+ // reinsert reading materials back into the state
  // release STATE mutex
- // release READER mutex
  // NOTIFYALL on condvar
-
- // Cases where you have STATE but want READER: near the start
- // Cases where you have READER but want STATE: after read,
- // ... but this deadlock can't happen because only one thread
- // will enter this 'read in progress' block.
  log::debug!("Read: requested position 0x{:x}.", pos);

  if pos == self.len {
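
The locking protocol described in the comment above (one state mutex, a condvar, and a reader that is moved out of the state while a read is in progress) can be sketched in isolation. This is a minimal illustration under stated assumptions, not the crate's code: `Reader`, `Engine`, `fetch` and `demo` are hypothetical names, the cache holds plain `Vec<u8>` blocks, and the "HTTP read" is faked.

```rust
use std::collections::BTreeMap;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical stand-in for `ReadingMaterials`; it only counts fetches.
struct Reader {
    fetches: usize,
}

impl Reader {
    /// Pretend to read a block; real code would do HTTP I/O here.
    fn fetch(&mut self, pos: u64) -> Vec<u8> {
        self.fetches += 1;
        vec![pos as u8; 4]
    }
}

struct State {
    cache: BTreeMap<u64, Vec<u8>>,
    /// `Some`: no read in progress. `None`: some thread has taken the reader.
    reader: Option<Box<Reader>>,
}

struct Engine {
    state: Mutex<State>,
    read_completed: Condvar,
}

impl Engine {
    fn new() -> Self {
        Engine {
            state: Mutex::new(State {
                cache: BTreeMap::new(),
                reader: Some(Box::new(Reader { fetches: 0 })),
            }),
            read_completed: Condvar::new(),
        }
    }

    fn read(&self, pos: u64) -> Vec<u8> {
        // Claim STATE mutex.
        let mut state = self.state.lock().unwrap();
        let mut reader = loop {
            // Is the block in the cache? If so, we are done.
            if let Some(block) = state.cache.get(&pos) {
                return block.clone();
            }
            match state.reader.take() {
                // Nobody is reading: taking the reader marks a read in progress.
                Some(reader) => break reader,
                // Someone else is reading: release STATE and wait atomically,
                // then go round again to re-check the cache.
                None => state = self.read_completed.wait(state).unwrap(),
            }
        };
        // Release STATE mutex so other threads can query the cache.
        drop(state);
        let block = reader.fetch(pos);
        // Claim STATE mutex, insert results, put the reader back.
        let mut state = self.state.lock().unwrap();
        state.cache.insert(pos, block.clone());
        state.reader = Some(reader);
        drop(state);
        self.read_completed.notify_all();
        block
    }
}

/// Runs several threads that all want the same block and returns how many
/// fetches the single reader performed.
fn demo() -> usize {
    let engine = Arc::new(Engine::new());
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let engine = Arc::clone(&engine);
            thread::spawn(move || engine.read(7))
        })
        .collect();
    for handle in handles {
        assert_eq!(handle.join().unwrap(), vec![7u8; 4]);
    }
    let state = engine.state.lock().unwrap();
    let fetches = state.reader.as_ref().map_or(0, |reader| reader.fetches);
    fetches
}
```

In this sketch the `loop` with a `break` value folds the cache check, the `take()` and the condvar wait into one place, so there is no separate boolean that could disagree with the presence of the reader.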
@@ -393,29 +383,30 @@ impl SeekableHttpReaderEngine {
  return Ok(bytes_read_from_cache);
  }
  // - If no, check if read in progress
- let mut read_in_progress = state.read_in_progress;
+ let mut reading_stuff = state.reader.take();
  // Is there read in progress?
- while read_in_progress {
+ while reading_stuff.is_none() {
  // - If yes, release CACHE mutex, WAIT on condvar atomically
  state = self.read_completed.wait(state).unwrap();
  // check cache again
  if let Some(bytes_read_from_cache) = state.read_from_cache(pos, buf) {
  log::debug!("Deferred cache success");
  return Ok(bytes_read_from_cache);
  }
- read_in_progress = state.read_in_progress;
+ reading_stuff = state.reader.take();
  }
Comment on lines +386 to 397
Contributor

It might be worth encapsulating this into a State method or other utility, in such a way that it uses RAII to put the value back. It's almost 100 lines later that this value is returned to the Option!
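
For illustration, one shape the RAII suggestion could take is a guard that owns the reader and hands it back on drop. This is a hypothetical sketch, not code from the PR: `Reader`, `Shared`, `ReaderLease` and `lease_reader` are invented names, and the state is reduced to just the `Option`.

```rust
use std::ops::{Deref, DerefMut};
use std::sync::{Condvar, Mutex};

/// Hypothetical stand-in for `ReadingMaterials`.
struct Reader {
    position: u64,
}

struct State {
    reader: Option<Box<Reader>>,
}

struct Shared {
    state: Mutex<State>,
    read_completed: Condvar,
}

/// Hypothetical guard: owns the reader while a read is in progress and
/// hands it back to `State` when dropped.
struct ReaderLease<'a> {
    shared: &'a Shared,
    reader: Option<Box<Reader>>,
}

impl Shared {
    fn new() -> Self {
        Shared {
            state: Mutex::new(State {
                reader: Some(Box::new(Reader { position: 0 })),
            }),
            read_completed: Condvar::new(),
        }
    }

    /// Waits until no other thread holds the reader, then takes it.
    fn lease_reader(&self) -> ReaderLease<'_> {
        let mut state = self.state.lock().unwrap();
        let reader = loop {
            match state.reader.take() {
                Some(reader) => break reader,
                None => state = self.read_completed.wait(state).unwrap(),
            }
        };
        ReaderLease {
            shared: self,
            reader: Some(reader),
        }
    }
}

impl Deref for ReaderLease<'_> {
    type Target = Reader;
    fn deref(&self) -> &Reader {
        self.reader.as_deref().expect("reader is present until drop")
    }
}

impl DerefMut for ReaderLease<'_> {
    fn deref_mut(&mut self) -> &mut Reader {
        self.reader
            .as_deref_mut()
            .expect("reader is present until drop")
    }
}

impl Drop for ReaderLease<'_> {
    fn drop(&mut self) {
        // Re-lock the state; tolerate poisoning so drop itself never panics.
        let mut state = self
            .shared
            .state
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner());
        state.reader = self.reader.take();
        drop(state);
        self.shared.read_completed.notify_all();
    }
}
```

A trade-off of this shape is that `drop` has to re-acquire the state mutex itself. The lease therefore must not be dropped while the caller still holds the state lock, and the point at which the reader returns to the state is implicit rather than visible in the calling code.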

Collaborator Author

I like that idea! Will do

Collaborator Author

I had a crack at this but it turned out I didn't like the results so much. I prefer the current explicit step since then it's obvious that we put back the reading stuff before releasing the mutex. I fiddled around a bit with RAII guards and with a .with_reading_stuff(FnOnce) type arrangement but neither was sufficiently delightful.
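
For readers wondering what a closure-based arrangement along the lines of `.with_reading_stuff(FnOnce)` might look like, here is a minimal sketch. It is an assumption about the general shape only, not the code that was tried: `Reader`, `Shared` and `with_reader` are hypothetical names, and the state is reduced to just the `Option`.

```rust
use std::sync::{Condvar, Mutex};

/// Hypothetical stand-in for `ReadingMaterials`.
struct Reader {
    position: u64,
}

struct State {
    reader: Option<Box<Reader>>,
}

struct Shared {
    state: Mutex<State>,
    read_completed: Condvar,
}

impl Shared {
    fn new() -> Self {
        Shared {
            state: Mutex::new(State {
                reader: Some(Box::new(Reader { position: 0 })),
            }),
            read_completed: Condvar::new(),
        }
    }

    /// Takes the reader out of the state, runs `read` without holding the
    /// state mutex, then puts the reader back and wakes any waiters.
    fn with_reader<R>(&self, read: impl FnOnce(&mut Reader) -> R) -> R {
        let mut state = self.state.lock().unwrap();
        let mut reader = loop {
            match state.reader.take() {
                Some(reader) => break reader,
                None => state = self.read_completed.wait(state).unwrap(),
            }
        };
        drop(state);
        // Note: if `read` panics, the reader is never returned to the state.
        let result = read(&mut *reader);
        let mut state = self.state.lock().unwrap();
        state.reader = Some(reader);
        drop(state);
        self.read_completed.notify_all();
        result
    }
}
```

In this form the caller cannot insert its results into the cache under the same lock acquisition that returns the reader, which is one way the explicit version in the PR stays easier to reason about.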

+ let mut reading_stuff = reading_stuff.unwrap(); // feels like there should
+ // be a way to do this with while let
  state.stats.cache_misses += 1;
  // - If no:
- // set read in progress
- state.read_in_progress = true;
+ // We'll start to do a read ourselves. Because we take()d the
+ // reading_stuff from the state, we now have exclusive ability to do the
+ // read ourselves.
  // If we need to read ahead,
  let expect_skip_ahead = state.expect_skip_ahead;
  state.expect_skip_ahead = false;
  let skip_ahead_threshold = state.skip_ahead_threshold;
  let max_block = state.max_block;
- // claim READER mutex
- let mut reading_stuff = self.reader.lock().unwrap();
  // release STATE mutex
  drop(state);
  // perform read
@@ -492,10 +483,10 @@ impl SeekableHttpReaderEngine {
  if reader_created {
  state.stats.num_http_streams += 1;
  }
- // set read not in progress
- state.read_in_progress = false;
+ // return the underlying reader to the state so that some other
+ // thread can use it
+ state.reader = Some(reading_stuff);
  // release STATE mutex
- // release READER mutex
  Ok(bytes_read)
  }

@@ -518,14 +509,13 @@ impl SeekableHttpReaderEngine {
  state.stats
  );
  if matches!(access_pattern, AccessPattern::SequentialIsh) {
- if state.read_in_progress {
- panic!("Must not call set_expected_access_pattern while a read is in progress");
- }
  // If we're switching to a sequential pattern, recreate
  // the reader at position zero.
  log::debug!("create_reader_at_zero");
  {
- let mut reading_materials = self.reader.lock().unwrap();
+ let reading_materials = state.reader.as_mut().expect(
+ "Must not call set_expected_access_pattern while a read is in progress",
+ );
  let new_reader = reading_materials.range_fetcher.fetch_range(0);
  if let Ok(new_reader) = new_reader {
  reading_materials.reader = Some((BufReader::new(new_reader), 0));
@@ -684,7 +674,7 @@ mod tests {
  )
  .unwrap();

- let mut seekable_http_reader = seekable_http_reader_engine.clone().create_reader();
+ let mut seekable_http_reader = seekable_http_reader_engine.create_reader();
  server.verify_and_clear();

  let mut throwaway = [0u8; 4];