From 8ef2c356ac8b51904eaa14cea0a1873554cc6578 Mon Sep 17 00:00:00 2001 From: Adrian Taylor Date: Fri, 27 Oct 2023 17:24:01 +0100 Subject: [PATCH] Fix bug where cache cells were being discarded too early. In patterns of access where we read the same bytes more than once, we may have discarded cache cells too early, which might have resulted in the need to issue extra HTTPS requests and thus slowdowns. --- Cargo.lock | 7 ++++ Cargo.toml | 1 + src/unzip/seekable_http_reader.rs | 64 +++++++++++++++++++++++++------ 3 files changed, 60 insertions(+), 12 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0b12cc4..bfa0fe8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1311,6 +1311,12 @@ dependencies = [ "getrandom", ] +[[package]] +name = "ranges" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f781d391cd4838df77e09fd26e33a87e0ac9bf2edf6ff770cfc65f83d50e3948" + [[package]] name = "rayon" version = "1.8.0" @@ -1430,6 +1436,7 @@ dependencies = [ "log", "monitor", "progress-streams", + "ranges", "rayon", "regex", "reqwest", diff --git a/Cargo.toml b/Cargo.toml index f64a88e..9119a9d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ itertools = "0.10.5" log = "0.4.17" monitor = "0.1.0" progress-streams = "1.1.0" +ranges = "0.3.3" rayon = "1.6.0" regex = "1.10.2" reqwest = { version = "0.11.13", features = ["blocking"] } diff --git a/src/unzip/seekable_http_reader.rs b/src/unzip/seekable_http_reader.rs index 2f37191..5ed3274 100644 --- a/src/unzip/seekable_http_reader.rs +++ b/src/unzip/seekable_http_reader.rs @@ -10,9 +10,11 @@ use std::{ cmp::min, collections::BTreeMap, io::{BufReader, ErrorKind, Read, Seek, SeekFrom}, + ops::Range, sync::{Arc, Condvar, Mutex}, }; +use ranges::Ranges; use reqwest::blocking::Response; use thiserror::Error; @@ -66,23 +68,31 @@ pub(crate) enum Error { /// caller. 
struct CacheCell { data: Vec, - bytes_read: usize, + bytes_read: Ranges, } impl CacheCell { fn new(data: Vec) -> Self { Self { data, - bytes_read: 0, + bytes_read: Ranges::new(), } } + fn read(&mut self, range: Range) -> &[u8] { + let new_range = self.bytes_read.clone().union(Ranges::from(range.clone())); + self.bytes_read = new_range; + &self.data[range] + } + fn len(&self) -> usize { self.data.len() } fn entirely_consumed(&self) -> bool { - self.bytes_read >= self.len() + let data_left_to_read = + Ranges::from(0..self.data.len()).difference(self.bytes_read.clone()); + data_left_to_read.is_empty() } } @@ -177,7 +187,7 @@ impl State { let block_len = block.len(); let block_offset = pos as usize - *possible_block_start as usize; let to_read = min(buf.len(), block_len - block_offset); - buf[..to_read].copy_from_slice(&block.data[block_offset..to_read + block_offset]); + buf[..to_read].copy_from_slice(block.read(block_offset..to_read + block_offset)); block.bytes_read += to_read; self.stats.cache_hits += 1; if discard_read_data && block.entirely_consumed() { @@ -549,7 +559,7 @@ mod tests { fn get_range_expectation(expected_start: u64, expected_end: u64) -> Expectation { Expectation::matching(request::method_path("GET", "/foo")) - .times(..) + .times(1..) .respond_with(RangeAwareResponder::expected_range( expected_start, expected_end, @@ -581,26 +591,56 @@ mod tests { seekable_http_reader.read_exact(&mut throwaway).unwrap(); assert_eq!(std::str::from_utf8(&throwaway).unwrap(), "4567"); seekable_http_reader.stream_position().unwrap(); - server.expect(get_range_expectation(8, 12)); seekable_http_reader.read_exact(&mut throwaway).unwrap(); assert_eq!(std::str::from_utf8(&throwaway).unwrap(), "89AB"); - // server.verify_and_clear(); + server.verify_and_clear(); - // Now rewind. We expect a new request for the whole file + // Now rewind. 
seekable_http_reader.rewind().unwrap(); - server.expect(get_range_expectation(0, 12)); + if matches!(access_pattern, AccessPattern::SequentialIsh) { + // If we're in sequential mode, we expect to have discarded + // the data in the cache and should start a new read. + server.expect(get_range_expectation(0, 12)); + } seekable_http_reader.read_exact(&mut throwaway).unwrap(); assert_eq!(std::str::from_utf8(&throwaway).unwrap(), "0123"); seekable_http_reader.read_exact(&mut throwaway).unwrap(); assert_eq!(std::str::from_utf8(&throwaway).unwrap(), "4567"); - server.verify_and_clear(); + seekable_http_reader.read_exact(&mut throwaway).unwrap(); + assert_eq!(std::str::from_utf8(&throwaway).unwrap(), "89AB"); + + if matches!(access_pattern, AccessPattern::SequentialIsh) { + server.verify_and_clear(); + } // Rewind a bit... we should get a range request only from here on. seekable_http_reader.seek(SeekFrom::Start(4)).unwrap(); - server.expect(get_range_expectation(4, 8)); + if matches!(access_pattern, AccessPattern::SequentialIsh) { + server.expect(get_range_expectation(4, 12)); + } seekable_http_reader.read_exact(&mut throwaway).unwrap(); assert_eq!(std::str::from_utf8(&throwaway).unwrap(), "4567"); - server.verify_and_clear(); + if matches!(access_pattern, AccessPattern::SequentialIsh) { + server.verify_and_clear(); + } + + // Test fast forwarding behavior. There's only any point + // in sequential mode; in random access we'll service requests + // from the cache of what we already read. + + if matches!(access_pattern, AccessPattern::SequentialIsh) { + seekable_http_reader.rewind().unwrap(); + server.expect(get_range_expectation(0, 12)); + seekable_http_reader.read_exact(&mut throwaway).unwrap(); + assert_eq!(std::str::from_utf8(&throwaway).unwrap(), "0123"); + server.verify_and_clear(); + // then seek forward + seekable_http_reader.seek(SeekFrom::Start(8)).unwrap(); + // We expect no new requests. We'll just skip over. 
+ seekable_http_reader.read_exact(&mut throwaway).unwrap(); + assert_eq!(std::str::from_utf8(&throwaway).unwrap(), "89AB"); + server.verify_and_clear(); + } } struct RangeAwareResponder {