Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Guard against concurrent cache writes on Windows #11007

Merged
merged 3 commits into from
Jan 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion crates/uv-cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tracing::debug;
pub use archive::ArchiveId;
use uv_cache_info::Timestamp;
use uv_distribution_types::InstalledDist;
use uv_fs::{cachedir, directories};
use uv_fs::{cachedir, directories, LockedFile};
use uv_normalize::PackageName;
use uv_pypi_types::ResolutionMetadata;

Expand Down Expand Up @@ -74,6 +74,12 @@ impl CacheEntry {
/// Return a new [`CacheEntry`] for the given file, rooted at this entry's directory.
pub fn with_file(&self, file: impl AsRef<Path>) -> Self {
    let path = self.dir().join(file.as_ref());
    Self(path)
}

/// Acquire the [`CacheEntry`] as an exclusive lock.
///
/// Ensures the entry's parent directory exists, then acquires an advisory
/// file lock at the entry's path.
pub async fn lock(&self) -> Result<LockedFile, io::Error> {
    let dir = self.dir();
    fs_err::create_dir_all(dir)?;
    let path = self.path();
    LockedFile::acquire(path, path.display()).await
}
}

impl AsRef<Path> for CacheEntry {
Expand All @@ -97,6 +103,12 @@ impl CacheShard {
/// Return a new [`CacheShard`] nested under the given directory.
pub fn shard(&self, dir: impl AsRef<Path>) -> Self {
    let nested = self.0.join(dir.as_ref());
    Self(nested)
}

/// Acquire the cache entry as an exclusive lock.
pub async fn lock(&self) -> Result<LockedFile, io::Error> {
fs_err::create_dir_all(self.as_ref())?;
LockedFile::acquire(self.join(".lock"), self.display()).await
}
}

impl AsRef<Path> for CacheShard {
Expand Down
21 changes: 21 additions & 0 deletions crates/uv-client/src/registry_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,13 @@ impl RegistryClient {
Connectivity::Offline => CacheControl::AllowStale,
};

// Acquire an advisory lock, to guard against concurrent writes.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you extend the comment explaining the `#[cfg(windows)]`?

#[cfg(windows)]
let _lock = {
let lock_entry = cache_entry.with_file(format!("{package_name}.lock"));
lock_entry.lock().await.map_err(ErrorKind::CacheWrite)?
};

let result = if matches!(index, IndexUrl::Path(_)) {
self.fetch_local_index(package_name, &url).await
} else {
Expand Down Expand Up @@ -614,6 +621,13 @@ impl RegistryClient {
Connectivity::Offline => CacheControl::AllowStale,
};

// Acquire an advisory lock, to guard against concurrent writes.
#[cfg(windows)]
let _lock = {
let lock_entry = cache_entry.with_file(format!("{}.lock", filename.stem()));
lock_entry.lock().await.map_err(ErrorKind::CacheWrite)?
};

let response_callback = |response: Response| async {
let bytes = response
.bytes()
Expand Down Expand Up @@ -677,6 +691,13 @@ impl RegistryClient {
Connectivity::Offline => CacheControl::AllowStale,
};

// Acquire an advisory lock, to guard against concurrent writes.
#[cfg(windows)]
let _lock = {
let lock_entry = cache_entry.with_file(format!("{}.lock", filename.stem()));
lock_entry.lock().await.map_err(ErrorKind::CacheWrite)?
};

// Attempt to fetch via a range request.
if index.map_or(true, |index| capabilities.supports_range_requests(index)) {
let req = self
Expand Down
40 changes: 37 additions & 3 deletions crates/uv-distribution/src/distribution_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,19 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
.boxed_local()
.await?;

// Acquire the advisory lock.
#[cfg(windows)]
let _lock = {
let lock_entry = CacheEntry::new(
built_wheel.target.parent().unwrap(),
format!(
"{}.lock",
built_wheel.target.file_name().unwrap().to_str().unwrap()
),
);
lock_entry.lock().await.map_err(Error::CacheWrite)?
};

// If the wheel was unzipped previously, respect it. Source distributions are
// cached under a unique revision ID, so unzipped directories are never stale.
match built_wheel.target.canonicalize() {
Expand Down Expand Up @@ -515,6 +528,13 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
dist: &BuiltDist,
hashes: HashPolicy<'_>,
) -> Result<Archive, Error> {
// Acquire an advisory lock, to guard against concurrent writes.
#[cfg(windows)]
let _lock = {
let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
lock_entry.lock().await.map_err(Error::CacheWrite)?
};

// Create an entry for the HTTP cache.
let http_entry = wheel_entry.with_file(format!("{}.http", filename.stem()));

Expand Down Expand Up @@ -640,6 +660,13 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
dist: &BuiltDist,
hashes: HashPolicy<'_>,
) -> Result<Archive, Error> {
// Acquire an advisory lock, to guard against concurrent writes.
#[cfg(windows)]
let _lock = {
let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
lock_entry.lock().await.map_err(Error::CacheWrite)?
};

// Create an entry for the HTTP cache.
let http_entry = wheel_entry.with_file(format!("{}.http", filename.stem()));

Expand Down Expand Up @@ -796,6 +823,12 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
dist: &BuiltDist,
hashes: HashPolicy<'_>,
) -> Result<LocalWheel, Error> {
#[cfg(windows)]
let _lock = {
let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
lock_entry.lock().await.map_err(Error::CacheWrite)?
};

// Determine the last-modified time of the wheel.
let modified = Timestamp::from_path(path).map_err(Error::CacheRead)?;

Expand Down Expand Up @@ -890,10 +923,11 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
let temp_dir = tokio::task::spawn_blocking({
let path = path.to_owned();
let root = self.build_context.cache().root().to_path_buf();
move || -> Result<TempDir, uv_extract::Error> {
move || -> Result<TempDir, Error> {
// Unzip the wheel into a temporary directory.
let temp_dir = tempfile::tempdir_in(root)?;
uv_extract::unzip(fs_err::File::open(path)?, temp_dir.path())?;
let temp_dir = tempfile::tempdir_in(root).map_err(Error::CacheWrite)?;
let reader = fs_err::File::open(path).map_err(Error::CacheWrite)?;
uv_extract::unzip(reader, temp_dir.path())?;
Ok(temp_dir)
}
})
Expand Down
35 changes: 13 additions & 22 deletions crates/uv-distribution/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use uv_distribution_types::{
PathSourceUrl, SourceDist, SourceUrl,
};
use uv_extract::hash::Hasher;
use uv_fs::{rename_with_retry, write_atomic, LockedFile};
use uv_fs::{rename_with_retry, write_atomic};
use uv_git::{GitHubRepository, GitOid};
use uv_metadata::read_archive_metadata;
use uv_normalize::PackageName;
Expand Down Expand Up @@ -392,7 +392,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
hashes: HashPolicy<'_>,
client: &ManagedClient<'_>,
) -> Result<BuiltWheelMetadata, Error> {
let _lock = lock_shard(cache_shard).await?;
let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;

// Fetch the revision for the source distribution.
let revision = self
Expand Down Expand Up @@ -505,7 +505,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
hashes: HashPolicy<'_>,
client: &ManagedClient<'_>,
) -> Result<ArchiveMetadata, Error> {
let _lock = lock_shard(cache_shard).await?;
let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;

// Fetch the revision for the source distribution.
let revision = self
Expand Down Expand Up @@ -753,7 +753,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
tags: &Tags,
hashes: HashPolicy<'_>,
) -> Result<BuiltWheelMetadata, Error> {
let _lock = lock_shard(cache_shard).await?;
let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;

// Fetch the revision for the source distribution.
let LocalRevisionPointer {
Expand Down Expand Up @@ -847,7 +847,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
cache_shard: &CacheShard,
hashes: HashPolicy<'_>,
) -> Result<ArchiveMetadata, Error> {
let _lock = lock_shard(cache_shard).await?;
let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;

// Fetch the revision for the source distribution.
let LocalRevisionPointer { revision, .. } = self
Expand Down Expand Up @@ -1058,7 +1058,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
},
);

let _lock = lock_shard(&cache_shard).await?;
// Acquire the advisory lock.
let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;

// Fetch the revision for the source distribution.
let LocalRevisionPointer {
Expand Down Expand Up @@ -1168,7 +1169,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
},
);

let _lock = lock_shard(&cache_shard).await?;
// Acquire the advisory lock.
let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;

// Fetch the revision for the source distribution.
let LocalRevisionPointer { revision, .. } = self
Expand Down Expand Up @@ -1430,7 +1432,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
);
let metadata_entry = cache_shard.entry(METADATA);

let _lock = lock_shard(&cache_shard).await?;
// Acquire the advisory lock.
let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;

// If there are build settings, we need to scope to a cache shard.
let config_settings = self.build_context.config_settings();
Expand Down Expand Up @@ -1581,7 +1584,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
);
let metadata_entry = cache_shard.entry(METADATA);

let _lock = lock_shard(&cache_shard).await?;
// Acquire the advisory lock.
let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;

let path = if let Some(subdirectory) = resource.subdirectory {
Cow::Owned(fetch.path().join(subdirectory))
Expand Down Expand Up @@ -2882,16 +2886,3 @@ fn read_wheel_metadata(
.map_err(|err| Error::WheelMetadata(wheel.to_path_buf(), Box::new(err)))?;
Ok(ResolutionMetadata::parse_metadata(&dist_info)?)
}

/// Apply an advisory lock to a [`CacheShard`] to prevent concurrent builds.
async fn lock_shard(cache_shard: &CacheShard) -> Result<LockedFile, Error> {
let root = cache_shard.as_ref();

fs_err::create_dir_all(root).map_err(Error::CacheWrite)?;

let lock = LockedFile::acquire(root.join(".lock"), root.display())
.await
.map_err(Error::CacheWrite)?;

Ok(lock)
}
11 changes: 9 additions & 2 deletions crates/uv-fs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,11 @@ pub async fn read_to_string_transcode(path: impl AsRef<Path>) -> std::io::Result

/// Create a symlink at `dst` pointing to `src`, replacing any existing symlink.
///
/// On Windows, this uses the `junction` crate to create a junction point.
/// Note because junctions are used, the source must be a directory.
/// On Windows, this uses the `junction` crate to create a junction point. The
/// operation is _not_ atomic, as we first delete the junction, then create a
/// junction at the same path.
///
/// Note that because junctions are used, the source must be a directory.
Comment on lines +48 to +52
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do I read this comment correctly that there is no way for us to make this atomic?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can try, but making it atomic doesn't solve the problem. We could still run into issues whereby we attempt to overwrite the file while it's open, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's good to know — I didn't think of that case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's somewhat tragic. I think we should still try to make this atomic though.

#[cfg(windows)]
pub fn replace_symlink(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io::Result<()> {
// If the source is a file, we can't create a junction
Expand Down Expand Up @@ -79,6 +82,10 @@ pub fn replace_symlink(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io:
}

/// Create a symlink at `dst` pointing to `src`, replacing any existing symlink if necessary.
///
/// On Unix, this method creates a temporary file, then moves it into place.
///
/// TODO(charlie): Consider using the `rust-atomicwrites` crate.
#[cfg(unix)]
pub fn replace_symlink(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io::Result<()> {
// Attempt to create the symlink directly.
Expand Down
12 changes: 8 additions & 4 deletions crates/uv/tests/it/cache_clean.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,13 @@ fn clean_package_pypi() -> Result<()> {
.filters()
.into_iter()
.chain([
// The cache entry does not have a stable key, so we filter it out
// The cache entry does not have a stable key, so we filter it out.
(
r"\[CACHE_DIR\](\\|\/)(.+)(\\|\/).*",
"[CACHE_DIR]/$2/[ENTRY]",
),
// The file count varies by operating system, so we filter it out.
("Removed \\d+ files?", "Removed [N] files"),
])
.collect();

Expand All @@ -79,7 +81,7 @@ fn clean_package_pypi() -> Result<()> {
----- stderr -----
DEBUG uv [VERSION] ([COMMIT] DATE)
DEBUG Removing dangling cache entry: [CACHE_DIR]/archive-v0/[ENTRY]
Removed 12 files ([SIZE])
Removed [N] files ([SIZE])
"###);

// Assert that the `.rkyv` file is removed for `iniconfig`.
Expand Down Expand Up @@ -136,11 +138,13 @@ fn clean_package_index() -> Result<()> {
.filters()
.into_iter()
.chain([
// The cache entry does not have a stable key, so we filter it out
// The cache entry does not have a stable key, so we filter it out.
(
r"\[CACHE_DIR\](\\|\/)(.+)(\\|\/).*",
"[CACHE_DIR]/$2/[ENTRY]",
),
// The file count varies by operating system, so we filter it out.
("Removed \\d+ files?", "Removed [N] files"),
])
.collect();

Expand All @@ -152,7 +156,7 @@ fn clean_package_index() -> Result<()> {
----- stderr -----
DEBUG uv [VERSION] ([COMMIT] DATE)
DEBUG Removing dangling cache entry: [CACHE_DIR]/archive-v0/[ENTRY]
Removed 12 files ([SIZE])
Removed [N] files ([SIZE])
"###);

// Assert that the `.rkyv` file is removed for `iniconfig`.
Expand Down
Loading
Loading