Commit

Clean up
charliermarsh committed Jan 27, 2025
1 parent e611a06 commit 3548f59
Showing 5 changed files with 88 additions and 95 deletions.
14 changes: 13 additions & 1 deletion crates/uv-cache/src/lib.rs
@@ -12,7 +12,7 @@ use tracing::debug;
 pub use archive::ArchiveId;
 use uv_cache_info::Timestamp;
 use uv_distribution_types::InstalledDist;
-use uv_fs::{cachedir, directories};
+use uv_fs::{cachedir, directories, LockedFile};
 use uv_normalize::PackageName;
 use uv_pypi_types::ResolutionMetadata;
 
@@ -74,6 +74,12 @@ impl CacheEntry {
     pub fn with_file(&self, file: impl AsRef<Path>) -> Self {
         Self(self.dir().join(file))
     }
+
+    /// Acquire the [`CacheEntry`] as an exclusive lock.
+    pub async fn lock(&self) -> Result<LockedFile, io::Error> {
+        fs_err::create_dir_all(self.dir())?;
+        LockedFile::acquire(self.path(), self.path().display()).await
+    }
 }
 
 impl AsRef<Path> for CacheEntry {
@@ -97,6 +103,12 @@ impl CacheShard {
     pub fn shard(&self, dir: impl AsRef<Path>) -> Self {
         Self(self.0.join(dir.as_ref()))
     }
+
+    /// Acquire the cache entry as an exclusive lock.
+    pub async fn lock(&self) -> Result<LockedFile, io::Error> {
+        fs_err::create_dir_all(self.as_ref())?;
+        LockedFile::acquire(self.join(".lock"), self.display()).await
+    }
 }
 
 impl AsRef<Path> for CacheShard {
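The two new methods give callers a uniform way to take a per-entry or per-shard advisory lock. A minimal usage sketch (hypothetical caller, not part of the diff): the returned `LockedFile` guard holds the lock until it is dropped, so the critical section is simply the guard's scope.

async fn write_under_lock(shard: &uv_cache::CacheShard) -> std::io::Result<()> {
    // Creates the shard directory if needed, then locks its `.lock` file.
    let _lock = shard.lock().await?;
    // ... write into the shard while holding the exclusive lock ...
    Ok(())
} // `_lock` dropped here; the advisory lock is released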
40 changes: 18 additions & 22 deletions crates/uv-client/src/registry_client.rs
@@ -20,7 +20,6 @@ use uv_distribution_filename::{DistFilename, SourceDistFilename, WheelFilename};
 use uv_distribution_types::{
     BuiltDist, File, FileLocation, Index, IndexCapabilities, IndexUrl, IndexUrls, Name,
 };
-use uv_fs::LockedFile;
 use uv_metadata::{read_metadata_async_seek, read_metadata_async_stream};
 use uv_normalize::PackageName;
 use uv_pep440::Version;
@@ -337,9 +336,12 @@ impl RegistryClient {
             Connectivity::Offline => CacheControl::AllowStale,
         };
 
-        // Acquire the advisory lock for the cache entry.
-        let lock_entry = cache_entry.with_file(format!("{package_name}.lock"));
-        let _lock = acquire_lock(&lock_entry).await?;
+        // Acquire an advisory lock, to guard against concurrent writes.
+        #[cfg(windows)]
+        let _lock = {
+            let lock_entry = cache_entry.with_file(format!("{package_name}.lock"));
+            lock_entry.lock().await.map_err(ErrorKind::CacheWrite)?
+        };
 
         let result = if matches!(index, IndexUrl::Path(_)) {
             self.fetch_local_index(package_name, &url).await
@@ -619,9 +621,12 @@ impl RegistryClient {
             Connectivity::Offline => CacheControl::AllowStale,
         };
 
-        // Acquire the advisory lock for the cache entry.
-        let lock_entry = cache_entry.with_file(format!("{}.lock", filename.stem()));
-        let _lock = acquire_lock(&lock_entry).await?;
+        // Acquire an advisory lock, to guard against concurrent writes.
+        #[cfg(windows)]
+        let _lock = {
+            let lock_entry = cache_entry.with_file(format!("{}.lock", filename.stem()));
+            lock_entry.lock().await.map_err(ErrorKind::CacheWrite)?
+        };
 
         let response_callback = |response: Response| async {
             let bytes = response
@@ -686,9 +691,12 @@ impl RegistryClient {
             Connectivity::Offline => CacheControl::AllowStale,
         };
 
-        // Acquire the advisory lock for the cache entry.
-        let lock_entry = cache_entry.with_file(format!("{}.lock", filename.stem()));
-        let _lock = acquire_lock(&lock_entry).await?;
+        // Acquire an advisory lock, to guard against concurrent writes.
+        #[cfg(windows)]
+        let _lock = {
+            let lock_entry = cache_entry.with_file(format!("{}.lock", filename.stem()));
+            lock_entry.lock().await.map_err(ErrorKind::CacheWrite)?
+        };
 
         // Attempt to fetch via a range request.
         if index.map_or(true, |index| capabilities.supports_range_requests(index)) {
@@ -992,18 +1000,6 @@ impl Connectivity {
     }
 }
 
-/// Apply an advisory lock to a [`CacheEntry`] to prevent concurrent cache writes.
-async fn acquire_lock(cache_entry: &CacheEntry) -> Result<LockedFile, Error> {
-    fs_err::create_dir_all(cache_entry.dir()).map_err(ErrorKind::Io)?;
-
-    let lock = LockedFile::acquire(cache_entry.path(), cache_entry.dir().display())
-        .await
-        .map_err(ErrorKind::Io)?;
-
-    Ok(lock)
-}
-
-
 #[cfg(test)]
 mod tests {
     use std::str::FromStr;
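At each call site the acquisition is now wrapped in a block expression and gated on `#[cfg(windows)]`, so non-Windows builds compile the lock out entirely while Windows builds hold the guard for the rest of the function. A minimal sketch of the same shape, with a hypothetical lock-file name:

#[allow(unused_variables)] // `cache_entry` is unused off-Windows
async fn refresh_entry(cache_entry: &uv_cache::CacheEntry) -> std::io::Result<()> {
    // Gated: on non-Windows targets, no lock file is ever created.
    #[cfg(windows)]
    let _lock = {
        let lock_entry = cache_entry.with_file("simple.lock"); // hypothetical name
        lock_entry.lock().await?
    };

    // ... fetch and write the cached response ...
    Ok(())
}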
83 changes: 35 additions & 48 deletions crates/uv-distribution/src/distribution_database.rs
@@ -14,7 +14,7 @@ use tokio_util::compat::FuturesAsyncReadCompatExt;
 use tracing::{info_span, instrument, warn, Instrument};
 use url::Url;
 
-use uv_cache::{ArchiveId, CacheBucket, CacheEntry, CacheShard, WheelCache};
+use uv_cache::{ArchiveId, CacheBucket, CacheEntry, WheelCache};
 use uv_cache_info::{CacheInfo, Timestamp};
 use uv_client::{
     CacheControl, CachedClientError, Connectivity, DataWithCachePolicy, RegistryClient,
@@ -25,15 +25,15 @@ use uv_distribution_types::{
     SourceDist,
 };
 use uv_extract::hash::Hasher;
-use uv_fs::{write_atomic, LockedFile};
+use uv_fs::write_atomic;
 use uv_platform_tags::Tags;
 use uv_pypi_types::HashDigest;
 use uv_types::{BuildContext, BuildStack};
 
 use crate::archive::Archive;
 use crate::locks::Locks;
 use crate::metadata::{ArchiveMetadata, Metadata};
-use crate::source::{acquire_entry_lock, lock_shard, SourceDistributionBuilder};
+use crate::source::SourceDistributionBuilder;
 use crate::{Error, LocalWheel, Reporter, RequiresDist};
 
 /// A cached high-level interface to convert distributions (a requirement resolved to a location)
@@ -356,6 +356,19 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
             .boxed_local()
             .await?;
 
+        // Acquire the advisory lock.
+        #[cfg(windows)]
+        let _lock = {
+            let lock_entry = CacheEntry::new(
+                built_wheel.target.parent().unwrap(),
+                format!(
+                    "{}.lock",
+                    built_wheel.target.file_name().unwrap().to_str().unwrap()
+                ),
+            );
+            lock_entry.lock().await.map_err(Error::CacheWrite)?
+        };
+
         // If the wheel was unzipped previously, respect it. Source distributions are
         // cached under a unique revision ID, so unzipped directories are never stale.
         match built_wheel.target.canonicalize() {
@@ -373,28 +386,9 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
         }
 
         // Otherwise, unzip the wheel.
-        let id = match self
+        let id = self
             .unzip_wheel(&built_wheel.path, &built_wheel.target)
-            .await
-        {
-            Ok(id) => id,
-            Err(Error::CacheWrite(err)) => {
-                // If we failed due to a cache conflict, try again.
-                warn!("Failed to write wheel to cache; retrying ({err})");
-                return if let Ok(archive) = built_wheel.target.canonicalize() {
-                    Ok(LocalWheel {
-                        dist: Dist::Source(dist.clone()),
-                        archive,
-                        filename: built_wheel.filename,
-                        hashes: built_wheel.hashes,
-                        cache: built_wheel.cache_info,
-                    })
-                } else {
-                    Err(Error::CacheWrite(err))
-                };
-            }
-            Err(err) => return Err(err),
-        };
+            .await?;
 
         Ok(LocalWheel {
             dist: Dist::Source(dist.clone()),
@@ -534,14 +528,16 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
         dist: &BuiltDist,
         hashes: HashPolicy<'_>,
     ) -> Result<Archive, Error> {
-        // Create an entry for the advisory lock.
-        let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
-        let _lock = acquire_lock(&lock_entry).await?;
+        // Acquire an advisory lock, to guard against concurrent writes.
+        #[cfg(windows)]
+        let _lock = {
+            let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
+            lock_entry.lock().await.map_err(Error::CacheWrite)?
+        };
 
         // Create an entry for the HTTP cache.
         let http_entry = wheel_entry.with_file(format!("{}.http", filename.stem()));
 
-
         let download = |response: reqwest::Response| {
             async {
                 let size = size.or_else(|| content_length(&response));
@@ -664,14 +660,16 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
         dist: &BuiltDist,
         hashes: HashPolicy<'_>,
     ) -> Result<Archive, Error> {
-        // Create an entry for the advisory lock.
-        let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
-        let _lock = acquire_lock(&lock_entry).await?;
+        // Acquire an advisory lock, to guard against concurrent writes.
+        #[cfg(windows)]
+        let _lock = {
+            let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
+            lock_entry.lock().await.map_err(Error::CacheWrite)?
+        };
 
         // Create an entry for the HTTP cache.
         let http_entry = wheel_entry.with_file(format!("{}.http", filename.stem()));
 
-
         let download = |response: reqwest::Response| {
             async {
                 let size = size.or_else(|| content_length(&response));
@@ -825,9 +823,11 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
         dist: &BuiltDist,
         hashes: HashPolicy<'_>,
     ) -> Result<LocalWheel, Error> {
-        // Create an entry for the advisory lock.
-        let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
-        let _lock = acquire_lock(&lock_entry).await?;
+        #[cfg(windows)]
+        let _lock = {
+            let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
+            lock_entry.lock().await.map_err(Error::CacheWrite)?
+        };
 
         // Determine the last-modified time of the wheel.
         let modified = Timestamp::from_path(path).map_err(Error::CacheRead)?;
@@ -836,7 +836,6 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
         let pointer_entry = wheel_entry.with_file(format!("{}.rev", filename.stem()));
         let pointer = LocalArchivePointer::read_from(&pointer_entry)?;
 
-
        // Extract the archive from the pointer.
         let archive = pointer
             .filter(|pointer| pointer.is_up_to_date(modified))
@@ -1114,15 +1113,3 @@ impl LocalArchivePointer {
         CacheInfo::from_timestamp(self.timestamp)
     }
 }
-
-
-/// Apply an advisory lock to a [`CacheEntry`] to prevent concurrent cache writes.
-async fn acquire_lock(cache_entry: &CacheEntry) -> Result<LockedFile, Error> {
-    fs_err::create_dir_all(cache_entry.dir()).map_err(Error::CacheWrite)?;
-
-    let lock = LockedFile::acquire(cache_entry.path(), cache_entry.dir().display())
-        .await
-        .map_err(Error::CacheWrite)?;
-
-    Ok(lock)
-}
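The retry-on-`CacheWrite` arm around `unzip_wheel` is gone: with the advisory lock held, no other process can be writing the same unzip target, so a plain `?` suffices. Note how the lock entry is derived from the unzip target itself; a minimal sketch of that derivation (hypothetical helper, illustrative path):

use std::path::{Path, PathBuf};

// Mirrors the `CacheEntry::new(parent, "<file_name>.lock")` construction above.
fn lock_path_for(target: &Path) -> Option<PathBuf> {
    let parent = target.parent()?;
    let name = target.file_name()?.to_str()?;
    Some(parent.join(format!("{name}.lock")))
}

// lock_path_for(Path::new("wheels/foo-1.0-py3-none-any.whl"))
//     => Some("wheels/foo-1.0-py3-none-any.whl.lock")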
35 changes: 13 additions & 22 deletions crates/uv-distribution/src/source/mod.rs
@@ -39,7 +39,7 @@ use uv_distribution_types::{
     PathSourceUrl, SourceDist, SourceUrl,
 };
 use uv_extract::hash::Hasher;
-use uv_fs::{rename_with_retry, write_atomic, LockedFile};
+use uv_fs::{rename_with_retry, write_atomic};
 use uv_git::{GitHubRepository, GitOid};
 use uv_metadata::read_archive_metadata;
 use uv_normalize::PackageName;
@@ -392,7 +392,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
         hashes: HashPolicy<'_>,
         client: &ManagedClient<'_>,
     ) -> Result<BuiltWheelMetadata, Error> {
-        let _lock = lock_shard(cache_shard).await?;
+        let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;
 
         // Fetch the revision for the source distribution.
         let revision = self
@@ -505,7 +505,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
         hashes: HashPolicy<'_>,
         client: &ManagedClient<'_>,
     ) -> Result<ArchiveMetadata, Error> {
-        let _lock = lock_shard(cache_shard).await?;
+        let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;
 
         // Fetch the revision for the source distribution.
         let revision = self
@@ -753,7 +753,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
         tags: &Tags,
         hashes: HashPolicy<'_>,
     ) -> Result<BuiltWheelMetadata, Error> {
-        let _lock = lock_shard(cache_shard).await?;
+        let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;
 
         // Fetch the revision for the source distribution.
         let LocalRevisionPointer {
@@ -847,7 +847,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
         cache_shard: &CacheShard,
         hashes: HashPolicy<'_>,
     ) -> Result<ArchiveMetadata, Error> {
-        let _lock = lock_shard(cache_shard).await?;
+        let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;
 
         // Fetch the revision for the source distribution.
         let LocalRevisionPointer { revision, .. } = self
@@ -1058,7 +1058,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
             },
         );
 
-        let _lock = lock_shard(&cache_shard).await?;
+        // Acquire the advisory lock.
+        let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;
 
         // Fetch the revision for the source distribution.
         let LocalRevisionPointer {
@@ -1168,7 +1169,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
             },
         );
 
-        let _lock = lock_shard(&cache_shard).await?;
+        // Acquire the advisory lock.
+        let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;
 
         // Fetch the revision for the source distribution.
         let LocalRevisionPointer { revision, .. } = self
@@ -1430,7 +1432,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
         );
         let metadata_entry = cache_shard.entry(METADATA);
 
-        let _lock = lock_shard(&cache_shard).await?;
+        // Acquire the advisory lock.
+        let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;
 
         // If there are build settings, we need to scope to a cache shard.
         let config_settings = self.build_context.config_settings();
@@ -1581,7 +1584,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
         );
         let metadata_entry = cache_shard.entry(METADATA);
 
-        let _lock = lock_shard(&cache_shard).await?;
+        // Acquire the advisory lock.
+        let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;
 
         let path = if let Some(subdirectory) = resource.subdirectory {
             Cow::Owned(fetch.path().join(subdirectory))
@@ -2882,16 +2886,3 @@ fn read_wheel_metadata(
         .map_err(|err| Error::WheelMetadata(wheel.to_path_buf(), Box::new(err)))?;
     Ok(ResolutionMetadata::parse_metadata(&dist_info)?)
 }
-
-/// Apply an advisory lock to a [`CacheShard`] to prevent concurrent builds.
-pub(crate) async fn lock_shard(cache_shard: &CacheShard) -> Result<LockedFile, Error> {
-    let root = cache_shard.as_ref();
-
-    fs_err::create_dir_all(root).map_err(Error::CacheWrite)?;
-
-    let lock = LockedFile::acquire(root.join(".lock"), root.display())
-        .await
-        .map_err(Error::CacheWrite)?;
-
-    Ok(lock)
-}
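Every former `lock_shard(&cache_shard)` call becomes `cache_shard.lock()`, with the `Error::CacheWrite` wrapping moved to the call site. Concurrent builders of the same source distribution therefore serialize on the shard's `.lock` file; a minimal sketch of that behavior (hypothetical function, sequential for clarity):

async fn build_twice(shard: &uv_cache::CacheShard) -> std::io::Result<()> {
    let first = shard.lock().await?;   // acquired immediately
    drop(first);                       // releases the shard lock
    let _second = shard.lock().await?; // would block while `first` was held
    Ok(())
}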
11 changes: 9 additions & 2 deletions crates/uv-fs/src/lib.rs
@@ -45,8 +45,11 @@ pub async fn read_to_string_transcode(path: impl AsRef<Path>) -> std::io::Result
 
 /// Create a symlink at `dst` pointing to `src`, replacing any existing symlink.
 ///
-/// On Windows, this uses the `junction` crate to create a junction point.
-/// Note because junctions are used, the source must be a directory.
+/// On Windows, this uses the `junction` crate to create a junction point. The
+/// operation is _not_ atomic, as we first delete the junction, then create a
+/// junction at the same path.
+///
+/// Note that because junctions are used, the source must be a directory.
 #[cfg(windows)]
 pub fn replace_symlink(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io::Result<()> {
     // If the source is a file, we can't create a junction
@@ -79,6 +82,10 @@ pub fn replace_symlink(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io:
 }
 
 /// Create a symlink at `dst` pointing to `src`, replacing any existing symlink if necessary.
+///
+/// On Unix, this method creates a temporary file, then moves it into place.
+///
+/// TODO(charlie): Consider using the `rust-atomicwrites` crate.
 #[cfg(unix)]
 pub fn replace_symlink(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io::Result<()> {
     // Attempt to create the symlink directly.
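The expanded doc comments make the atomicity trade-off explicit: the Windows junction path is delete-then-create (a window where `dst` is missing), while the Unix path stages a symlink and renames it over `dst`. A minimal sketch of the Unix strategy as described (not the crate's exact code; the temp-dir placement and `tempfile` dependency are assumptions):

#[cfg(unix)]
fn replace_symlink_sketch(src: &std::path::Path, dst: &std::path::Path) -> std::io::Result<()> {
    // Stage the new symlink on the same filesystem as `dst`...
    let tmp = tempfile::tempdir_in(dst.parent().unwrap_or(std::path::Path::new(".")))?;
    let staged = tmp.path().join("link");
    std::os::unix::fs::symlink(src, &staged)?;
    // ...then rename it into place: on POSIX, rename(2) atomically replaces `dst`.
    std::fs::rename(&staged, dst)
}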
