Skip to content

Commit

Permalink
Auto merge of #10482 - arlosi:refactor_load, r=Eh2406
Browse files Browse the repository at this point in the history
Refactor RegistryData::load to handle management of the index cache

Enables registry implementations to signal if the cache is valid on a per-request basis.

Fixes a bug introduced by #10064 that caused Cargo not to update for several cases in a release build because it believed the index cache to be valid when it was not. The issue only occurred in release builds because debug builds verify that the cache contents is correct (by refreshing it).

Previously `current_version` was called by the index to determine whether the cache was valid. In the new model, `RegistryData::load` is passed the current version of the cache and returns an enum to indicate the status of the cached data.

r? `@eh2406`
cc `@ehuss`
  • Loading branch information
bors committed Mar 17, 2022
2 parents 1023fa5 + a4a882d commit 109bfbd
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 135 deletions.
177 changes: 92 additions & 85 deletions src/cargo/sources/registry/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@

use crate::core::dependency::Dependency;
use crate::core::{PackageId, SourceId, Summary};
use crate::sources::registry::{RegistryData, RegistryPackage, INDEX_V_MAX};
use crate::sources::registry::{LoadResponse, RegistryData, RegistryPackage, INDEX_V_MAX};
use crate::util::interning::InternedString;
use crate::util::{internal, CargoResult, Config, Filesystem, OptVersionReq, ToSemver};
use anyhow::bail;
Expand Down Expand Up @@ -247,6 +247,7 @@ pub struct IndexSummary {
#[derive(Default)]
struct SummariesCache<'a> {
versions: Vec<(Version, &'a [u8])>,
index_version: &'a str,
}

impl<'cfg> RegistryIndex<'cfg> {
Expand Down Expand Up @@ -358,7 +359,6 @@ impl<'cfg> RegistryIndex<'cfg> {

let root = load.assert_index_locked(&self.path);
let cache_root = root.join(".cache");
let index_version = load.current_version();

// See module comment in `registry/mod.rs` for why this is structured
// the way it is.
Expand All @@ -376,7 +376,6 @@ impl<'cfg> RegistryIndex<'cfg> {
// along the way produce helpful "did you mean?" suggestions.
for (i, path) in UncanonicalizedIter::new(&raw_path).take(1024).enumerate() {
let summaries = Summaries::parse(
index_version.as_deref(),
root,
&cache_root,
path.as_ref(),
Expand Down Expand Up @@ -559,7 +558,6 @@ impl Summaries {
/// * `load` - the actual index implementation which may be very slow to
/// call. We avoid this if we can.
pub fn parse(
index_version: Option<&str>,
root: &Path,
cache_root: &Path,
relative: &Path,
Expand All @@ -571,88 +569,101 @@ impl Summaries {
// of reasons, but consider all of them non-fatal and just log their
// occurrence in case anyone is debugging anything.
let cache_path = cache_root.join(relative);
let mut cache_contents = None;
if let Some(index_version) = index_version {
match fs::read(&cache_path) {
Ok(contents) => match Summaries::parse_cache(contents, index_version) {
Ok(s) => {
log::debug!("fast path for registry cache of {:?}", relative);
if cfg!(debug_assertions) {
cache_contents = Some(s.raw_data);
} else {
return Poll::Ready(Ok(Some(s)));
}
}
Err(e) => {
log::debug!("failed to parse {:?} cache: {}", relative, e);
}
},
Err(e) => log::debug!("cache missing for {:?} error: {}", relative, e),
}
let mut cached_summaries = None;
let mut index_version = None;
match fs::read(&cache_path) {
Ok(contents) => match Summaries::parse_cache(contents) {
Ok((s, v)) => {
cached_summaries = Some(s);
index_version = Some(v);
}
Err(e) => {
log::debug!("failed to parse {:?} cache: {}", relative, e);
}
},
Err(e) => log::debug!("cache missing for {:?} error: {}", relative, e),
}

// This is the fallback path where we actually talk to the registry backend to load
// information. Here we parse every single line in the index (as we need
// to find the versions)
log::debug!("slow path for {:?}", relative);
let mut response = load.load(root, relative, index_version.as_deref())?;
// In debug builds, perform a second load without the cache so that
// we can validate that the cache is correct.
if cfg!(debug_assertions) && matches!(response, Poll::Ready(LoadResponse::CacheValid)) {
response = load.load(root, relative, None)?;
}
let response = match response {
Poll::Pending => return Poll::Pending,
Poll::Ready(response) => response,
};

let mut bytes_to_cache = None;
let mut version_to_cache = None;
let mut ret = Summaries::default();
let mut hit_closure = false;
let mut cache_bytes = None;
let result = load.load(root, relative, &mut |contents| {
ret.raw_data = contents.to_vec();
let mut cache = SummariesCache::default();
hit_closure = true;
for line in split(contents, b'\n') {
// Attempt forwards-compatibility on the index by ignoring
// everything that we ourselves don't understand, that should
// allow future cargo implementations to break the
// interpretation of each line here and older cargo will simply
// ignore the new lines.
let summary = match IndexSummary::parse(config, line, source_id) {
Ok(summary) => summary,
Err(e) => {
// This should only happen when there is an index
// entry from a future version of cargo that this
// version doesn't understand. Hopefully, those future
// versions of cargo correctly set INDEX_V_MAX and
// CURRENT_CACHE_VERSION, otherwise this will skip
// entries in the cache preventing those newer
// versions from reading them (that is, until the
// cache is rebuilt).
log::info!("failed to parse {:?} registry package: {}", relative, e);
continue;
}
};
let version = summary.summary.package_id().version().clone();
cache.versions.push((version.clone(), line));
ret.versions.insert(version, summary.into());
match response {
LoadResponse::CacheValid => {
log::debug!("fast path for registry cache of {:?}", relative);
return Poll::Ready(Ok(cached_summaries));
}
if let Some(index_version) = index_version {
cache_bytes = Some(cache.serialize(index_version));
LoadResponse::NotFound => {
debug_assert!(cached_summaries.is_none());
return Poll::Ready(Ok(None));
}
LoadResponse::Data {
raw_data,
index_version,
} => {
// This is the fallback path where we actually talk to the registry backend to load
// information. Here we parse every single line in the index (as we need
// to find the versions)
log::debug!("slow path for {:?}", relative);
let mut cache = SummariesCache::default();
ret.raw_data = raw_data;
for line in split(&ret.raw_data, b'\n') {
// Attempt forwards-compatibility on the index by ignoring
// everything that we ourselves don't understand, that should
// allow future cargo implementations to break the
// interpretation of each line here and older cargo will simply
// ignore the new lines.
let summary = match IndexSummary::parse(config, line, source_id) {
Ok(summary) => summary,
Err(e) => {
// This should only happen when there is an index
// entry from a future version of cargo that this
// version doesn't understand. Hopefully, those future
// versions of cargo correctly set INDEX_V_MAX and
// CURRENT_CACHE_VERSION, otherwise this will skip
// entries in the cache preventing those newer
// versions from reading them (that is, until the
// cache is rebuilt).
log::info!("failed to parse {:?} registry package: {}", relative, e);
continue;
}
};
let version = summary.summary.package_id().version().clone();
cache.versions.push((version.clone(), line));
ret.versions.insert(version, summary.into());
}
if let Some(index_version) = index_version {
bytes_to_cache = Some(cache.serialize(index_version.as_str()));
version_to_cache = Some(index_version);
}
}
Ok(())
});

if result?.is_pending() {
assert!(!hit_closure);
return Poll::Pending;
}

if !hit_closure {
debug_assert!(cache_contents.is_none());
return Poll::Ready(Ok(None));
}

// If we've got debug assertions enabled and the cache was previously
// present and considered fresh this is where the debug assertions
// actually happens to verify that our cache is indeed fresh and
// computes exactly the same value as before.
if cfg!(debug_assertions) && cache_contents.is_some() && cache_bytes != cache_contents {
let cache_contents = cached_summaries.as_ref().map(|s| &s.raw_data);
if cfg!(debug_assertions)
&& index_version.as_deref() == version_to_cache.as_deref()
&& cached_summaries.is_some()
&& bytes_to_cache.as_ref() != cache_contents
{
panic!(
"original cache contents:\n{:?}\n\
does not equal new cache contents:\n{:?}\n",
cache_contents.as_ref().map(|s| String::from_utf8_lossy(s)),
cache_bytes.as_ref().map(|s| String::from_utf8_lossy(s)),
bytes_to_cache.as_ref().map(|s| String::from_utf8_lossy(s)),
);
}

Expand All @@ -662,7 +673,7 @@ impl Summaries {
//
// This is opportunistic so we ignore failure here but are sure to log
// something in case of error.
if let Some(cache_bytes) = cache_bytes {
if let Some(cache_bytes) = bytes_to_cache {
if paths::create_dir_all(cache_path.parent().unwrap()).is_ok() {
let path = Filesystem::new(cache_path.clone());
config.assert_package_cache_locked(&path);
Expand All @@ -677,16 +688,17 @@ impl Summaries {

/// Parses an open `File` which represents information previously cached by
/// Cargo.
pub fn parse_cache(contents: Vec<u8>, last_index_update: &str) -> CargoResult<Summaries> {
let cache = SummariesCache::parse(&contents, last_index_update)?;
pub fn parse_cache(contents: Vec<u8>) -> CargoResult<(Summaries, InternedString)> {
let cache = SummariesCache::parse(&contents)?;
let index_version = InternedString::new(cache.index_version);
let mut ret = Summaries::default();
for (version, summary) in cache.versions {
let (start, end) = subslice_bounds(&contents, summary);
ret.versions
.insert(version, MaybeIndexSummary::Unparsed { start, end });
}
ret.raw_data = contents;
return Ok(ret);
return Ok((ret, index_version));

// Returns the start/end offsets of `inner` with `outer`. Asserts that
// `inner` is a subslice of `outer`.
Expand Down Expand Up @@ -742,7 +754,7 @@ impl Summaries {
const CURRENT_CACHE_VERSION: u8 = 3;

impl<'a> SummariesCache<'a> {
fn parse(data: &'a [u8], last_index_update: &str) -> CargoResult<SummariesCache<'a>> {
fn parse(data: &'a [u8]) -> CargoResult<SummariesCache<'a>> {
// NB: keep this method in sync with `serialize` below
let (first_byte, rest) = data
.split_first()
Expand All @@ -764,18 +776,13 @@ impl<'a> SummariesCache<'a> {
let rest = &rest[4..];

let mut iter = split(rest, 0);
if let Some(update) = iter.next() {
if update != last_index_update.as_bytes() {
bail!(
"cache out of date: current index ({}) != cache ({})",
last_index_update,
str::from_utf8(update)?,
)
}
let last_index_update = if let Some(update) = iter.next() {
str::from_utf8(update)?
} else {
bail!("malformed file");
}
};
let mut ret = SummariesCache::default();
ret.index_version = last_index_update;
while let Some(version) = iter.next() {
let version = str::from_utf8(version)?;
let version = Version::parse(version)?;
Expand Down
16 changes: 7 additions & 9 deletions src/cargo/sources/registry/local.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::core::PackageId;
use crate::sources::registry::{MaybeLock, RegistryConfig, RegistryData};
use crate::sources::registry::{LoadResponse, MaybeLock, RegistryConfig, RegistryData};
use crate::util::errors::CargoResult;
use crate::util::interning::InternedString;
use crate::util::{Config, Filesystem};
use cargo_util::{paths, Sha256};
use std::fs::File;
Expand Down Expand Up @@ -48,18 +47,17 @@ impl<'cfg> RegistryData for LocalRegistry<'cfg> {
path.as_path_unlocked()
}

fn current_version(&self) -> Option<InternedString> {
None
}

fn load(
&self,
root: &Path,
path: &Path,
data: &mut dyn FnMut(&[u8]) -> CargoResult<()>,
) -> Poll<CargoResult<()>> {
_index_version: Option<&str>,
) -> Poll<CargoResult<LoadResponse>> {
if self.updated {
Poll::Ready(Ok(data(&paths::read_bytes(&root.join(path))?)?))
Poll::Ready(Ok(LoadResponse::Data {
raw_data: paths::read_bytes(&root.join(path))?,
index_version: None,
}))
} else {
Poll::Pending
}
Expand Down
31 changes: 17 additions & 14 deletions src/cargo/sources/registry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,20 @@ impl<'a> RegistryDependency<'a> {
}
}

pub enum LoadResponse {
/// The cache is valid. The cached data should be used.
CacheValid,

/// The cache is out of date. Returned data should be used.
Data {
raw_data: Vec<u8>,
index_version: Option<String>,
},

/// The requested crate was found.
NotFound,
}

/// An abstract interface to handle both a local (see `local::LocalRegistry`)
/// and remote (see `remote::RemoteRegistry`) registry.
///
Expand All @@ -432,15 +446,13 @@ pub trait RegistryData {
///
/// * `root` is the root path to the index.
/// * `path` is the relative path to the package to load (like `ca/rg/cargo`).
/// * `data` is a callback that will receive the raw bytes of the index JSON file.
///
/// If `load` returns a `Poll::Pending` then it must not have called data.
/// * `index_version` is the version of the requested crate data currently in cache.
fn load(
&self,
root: &Path,
path: &Path,
data: &mut dyn FnMut(&[u8]) -> CargoResult<()>,
) -> Poll<CargoResult<()>>;
index_version: Option<&str>,
) -> Poll<CargoResult<LoadResponse>>;

/// Loads the `config.json` file and returns it.
///
Expand Down Expand Up @@ -495,15 +507,6 @@ pub trait RegistryData {
/// Returns the [`Path`] to the [`Filesystem`].
fn assert_index_locked<'a>(&self, path: &'a Filesystem) -> &'a Path;

/// Returns the current "version" of the index.
///
/// For local registries, this returns `None` because there is no
/// versioning. For remote registries, this returns the SHA hash of the
/// git index on disk (or None if the index hasn't been downloaded yet).
///
/// This is used by index caching to check if the cache is out of date.
fn current_version(&self) -> Option<InternedString>;

/// Block until all outstanding Poll::Pending requests are Poll::Ready.
fn block_until_ready(&mut self) -> CargoResult<()>;
}
Expand Down
Loading

0 comments on commit 109bfbd

Please sign in to comment.