Fix bug where cache cells were discarded too early.
In access patterns where we read the same bytes more than once, we may have
discarded cache cells too early, forcing extra HTTPS requests and thus
slowdowns.
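
A minimal sketch of the failure mode, assuming only the ranges crate API that
the diff below uses (the 8-byte cell and the specific reads are illustrative):
with a plain byte counter, handing out the same sub-range twice can reach the
cell length even though some bytes were never read, so the cell looks entirely
consumed and is discarded prematurely; tracking the union of read ranges avoids
this.

// Sketch only: an 8-byte cache cell whose first half is read twice.
// Assumes the ranges = "0.3.3" dependency this commit adds.
use ranges::Ranges;

fn main() {
    let cell_len = 8usize;

    // Old approach: a running total of bytes handed out.
    let mut bytes_read_counter = 0usize;
    bytes_read_counter += 4; // read 0..4
    bytes_read_counter += 4; // read 0..4 again
    // The counter claims the cell is fully consumed, yet 4..8 was never read.
    assert!(bytes_read_counter >= cell_len);

    // New approach: remember which ranges were actually read.
    let mut bytes_read: Ranges<usize> = Ranges::new();
    bytes_read = bytes_read.union(Ranges::from(0..4)); // read 0..4
    bytes_read = bytes_read.union(Ranges::from(0..4)); // read 0..4 again
    let unread = Ranges::from(0..cell_len).difference(bytes_read);
    // 4..8 is still outstanding, so the cell must not be discarded yet.
    assert!(!unread.is_empty());
}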
adetaylor committed Oct 27, 2023
1 parent 2100402 commit 8ef2c35
Showing 3 changed files with 60 additions and 12 deletions.
7 changes: 7 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -28,6 +28,7 @@ itertools = "0.10.5"
log = "0.4.17"
monitor = "0.1.0"
progress-streams = "1.1.0"
ranges = "0.3.3"
rayon = "1.6.0"
regex = "1.10.2"
reqwest = { version = "0.11.13", features = ["blocking"] }
64 changes: 52 additions & 12 deletions src/unzip/seekable_http_reader.rs
@@ -10,9 +10,11 @@ use std::{
cmp::min,
collections::BTreeMap,
io::{BufReader, ErrorKind, Read, Seek, SeekFrom},
ops::Range,
sync::{Arc, Condvar, Mutex},
};

use ranges::Ranges;
use reqwest::blocking::Response;
use thiserror::Error;

@@ -66,23 +68,31 @@ pub(crate) enum Error {
/// caller.
struct CacheCell {
data: Vec<u8>,
bytes_read: usize,
bytes_read: Ranges<usize>,
}

impl CacheCell {
fn new(data: Vec<u8>) -> Self {
Self {
data,
bytes_read: 0,
bytes_read: Ranges::new(),
}
}

fn read(&mut self, range: Range<usize>) -> &[u8] {
let new_range = self.bytes_read.clone().union(Ranges::from(range.clone()));
self.bytes_read = new_range;
&self.data[range]
}

fn len(&self) -> usize {
self.data.len()
}

fn entirely_consumed(&self) -> bool {
self.bytes_read >= self.len()
let data_left_to_read =
Ranges::from(0..self.data.len()).difference(self.bytes_read.clone());
data_left_to_read.is_empty()
}
}

@@ -177,7 +187,7 @@ impl State {
let block_len = block.len();
let block_offset = pos as usize - *possible_block_start as usize;
let to_read = min(buf.len(), block_len - block_offset);
buf[..to_read].copy_from_slice(&block.data[block_offset..to_read + block_offset]);
buf[..to_read].copy_from_slice(block.read(block_offset..to_read + block_offset));
block.bytes_read += to_read;
self.stats.cache_hits += 1;
if discard_read_data && block.entirely_consumed() {
@@ -549,7 +559,7 @@ mod tests {

fn get_range_expectation(expected_start: u64, expected_end: u64) -> Expectation {
Expectation::matching(request::method_path("GET", "/foo"))
.times(..)
.times(1..)
.respond_with(RangeAwareResponder::expected_range(
expected_start,
expected_end,
@@ -581,26 +591,56 @@
seekable_http_reader.read_exact(&mut throwaway).unwrap();
assert_eq!(std::str::from_utf8(&throwaway).unwrap(), "4567");
seekable_http_reader.stream_position().unwrap();
server.expect(get_range_expectation(8, 12));
seekable_http_reader.read_exact(&mut throwaway).unwrap();
assert_eq!(std::str::from_utf8(&throwaway).unwrap(), "89AB");
// server.verify_and_clear();
server.verify_and_clear();

// Now rewind. We expect a new request for the whole file
// Now rewind.
seekable_http_reader.rewind().unwrap();
server.expect(get_range_expectation(0, 12));
if matches!(access_pattern, AccessPattern::SequentialIsh) {
// If we're in sequential mode, we expect to have discarded
// the data in the cache and should start a new read.
server.expect(get_range_expectation(0, 12));
}
seekable_http_reader.read_exact(&mut throwaway).unwrap();
assert_eq!(std::str::from_utf8(&throwaway).unwrap(), "0123");
seekable_http_reader.read_exact(&mut throwaway).unwrap();
assert_eq!(std::str::from_utf8(&throwaway).unwrap(), "4567");
server.verify_and_clear();
seekable_http_reader.read_exact(&mut throwaway).unwrap();
assert_eq!(std::str::from_utf8(&throwaway).unwrap(), "89AB");

if matches!(access_pattern, AccessPattern::SequentialIsh) {
server.verify_and_clear();
}

// Rewind a bit... we should get a range request only from here on.
seekable_http_reader.seek(SeekFrom::Start(4)).unwrap();
server.expect(get_range_expectation(4, 8));
if matches!(access_pattern, AccessPattern::SequentialIsh) {
server.expect(get_range_expectation(4, 12));
}
seekable_http_reader.read_exact(&mut throwaway).unwrap();
assert_eq!(std::str::from_utf8(&throwaway).unwrap(), "4567");
server.verify_and_clear();
if matches!(access_pattern, AccessPattern::SequentialIsh) {
server.verify_and_clear();
}

// Test fast forwarding behavior. There's only any point
// in sequential mode; in random access we'll service requests
// from the cache of what we already read.

if matches!(access_pattern, AccessPattern::SequentialIsh) {
seekable_http_reader.rewind().unwrap();
server.expect(get_range_expectation(0, 12));
seekable_http_reader.read_exact(&mut throwaway).unwrap();
assert_eq!(std::str::from_utf8(&throwaway).unwrap(), "0123");
server.verify_and_clear();
// then seek forward
seekable_http_reader.seek(SeekFrom::Start(8)).unwrap();
// We expect no new requests. We'll just skip over.
seekable_http_reader.read_exact(&mut throwaway).unwrap();
assert_eq!(std::str::from_utf8(&throwaway).unwrap(), "89AB");
server.verify_and_clear();
}
}

struct RangeAwareResponder {

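
As a usage note, the new cache-cell bookkeeping can be tried out in isolation.
The following is a self-contained sketch that mirrors the CacheCell logic from
the diff above; the main function and the sample data are illustrative only,
not part of the commit.

// Self-contained sketch of the cache-cell bookkeeping from the diff above.
// Requires ranges = "0.3.3" in Cargo.toml.
use ranges::Ranges;
use std::ops::Range;

struct CacheCell {
    data: Vec<u8>,
    bytes_read: Ranges<usize>,
}

impl CacheCell {
    fn new(data: Vec<u8>) -> Self {
        Self {
            data,
            bytes_read: Ranges::new(),
        }
    }

    // Hand back the requested slice and record that range as consumed.
    fn read(&mut self, range: Range<usize>) -> &[u8] {
        self.bytes_read = self.bytes_read.clone().union(Ranges::from(range.clone()));
        &self.data[range]
    }

    // True only once every byte has been read at least once.
    fn entirely_consumed(&self) -> bool {
        Ranges::from(0..self.data.len())
            .difference(self.bytes_read.clone())
            .is_empty()
    }
}

fn main() {
    let mut cell = CacheCell::new(b"0123456789AB".to_vec());
    cell.read(0..4);
    cell.read(0..4); // re-reading the same bytes no longer inflates progress
    assert!(!cell.entirely_consumed()); // 4..12 still unread: keep the cell
    cell.read(4..12);
    assert!(cell.entirely_consumed()); // now safe to discard
}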