Skip to content

Commit

Permalink
Apply advisory locks when building source distributions (#3525)
Browse files Browse the repository at this point in the history
## Summary

I don't love this, but it turns out that setuptools is not robust to
parallel builds: pypa/setuptools#3119. As a
result, if you run uv from multiple processes, and they each attempt to
build the same source distribution, you can hit failures.

This PR applies an advisory lock to the source distribution directory.
We apply it unconditionally, even if we ultimately find something in the
cache and _don't_ do a build, which helps ensure that we only build the
distribution once (and wait for that build to complete) rather than
kicking off builds from each thread.

Closes #3512.

## Test Plan

Ran:

```sh
#!/bin/bash
make_venv(){
    target/debug/uv venv $1
    source $1/bin/activate
    target/debug/uv pip install opentracing --no-deps --verbose
}

for i in {1..8}
do
   make_venv ./$1/$i &
done
```
  • Loading branch information
charliermarsh committed May 13, 2024
1 parent 42c3bfa commit 9a92a3a
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 26 deletions.
2 changes: 1 addition & 1 deletion crates/uv-cache/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use rustc_hash::FxHashSet;
use tempfile::{tempdir, TempDir};
use tracing::debug;

pub use archive::ArchiveId;
use distribution_types::InstalledDist;
use pypi_types::Metadata23;
use uv_fs::{cachedir, directories};
Expand All @@ -23,7 +24,6 @@ use crate::removal::{rm_rf, Removal};
pub use crate::timestamp::Timestamp;
pub use crate::wheel::WheelCache;
use crate::wheel::WheelCacheKind;
pub use archive::ArchiveId;

mod archive;
mod by_timestamp;
Expand Down
66 changes: 45 additions & 21 deletions crates/uv-distribution/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use uv_client::{
};
use uv_configuration::{BuildKind, NoBuild};
use uv_extract::hash::Hasher;
use uv_fs::write_atomic;
use uv_fs::{write_atomic, LockedFile};
use uv_types::{BuildContext, SourceBuildTrait};

use crate::distribution_database::ManagedClient;
Expand Down Expand Up @@ -396,6 +396,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
hashes: HashPolicy<'_>,
client: &ManagedClient<'_>,
) -> Result<BuiltWheelMetadata, Error> {
let _lock = lock_shard(cache_shard).await?;

// Fetch the revision for the source distribution.
let revision = self
.url_revision(source, filename, url, cache_shard, hashes, client)
Expand Down Expand Up @@ -465,6 +467,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
hashes: HashPolicy<'_>,
client: &ManagedClient<'_>,
) -> Result<ArchiveMetadata, Error> {
let _lock = lock_shard(cache_shard).await?;

// Fetch the revision for the source distribution.
let revision = self
.url_revision(source, filename, url, cache_shard, hashes, client)
Expand Down Expand Up @@ -503,11 +507,10 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
.await?
{
// Store the metadata.
let cache_entry = cache_shard.entry(METADATA);
fs::create_dir_all(cache_entry.dir())
fs::create_dir_all(metadata_entry.dir())
.await
.map_err(Error::CacheWrite)?;
write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?)
write_atomic(metadata_entry.path(), rmp_serde::to_vec(&metadata)?)
.await
.map_err(Error::CacheWrite)?;

Expand All @@ -528,8 +531,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
.await?;

// Store the metadata.
let cache_entry = cache_shard.entry(METADATA);
write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?)
write_atomic(metadata_entry.path(), rmp_serde::to_vec(&metadata)?)
.await
.map_err(Error::CacheWrite)?;

Expand Down Expand Up @@ -625,6 +627,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
tags: &Tags,
hashes: HashPolicy<'_>,
) -> Result<BuiltWheelMetadata, Error> {
let _lock = lock_shard(cache_shard).await?;

// Fetch the revision for the source distribution.
let revision = self
.archive_revision(source, resource, cache_shard, hashes)
Expand Down Expand Up @@ -691,6 +695,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
cache_shard: &CacheShard,
hashes: HashPolicy<'_>,
) -> Result<ArchiveMetadata, Error> {
let _lock = lock_shard(cache_shard).await?;

// Fetch the revision for the source distribution.
let revision = self
.archive_revision(source, resource, cache_shard, hashes)
Expand Down Expand Up @@ -728,11 +734,10 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
.await?
{
// Store the metadata.
let cache_entry = cache_shard.entry(METADATA);
fs::create_dir_all(cache_entry.dir())
fs::create_dir_all(metadata_entry.dir())
.await
.map_err(Error::CacheWrite)?;
write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?)
write_atomic(metadata_entry.path(), rmp_serde::to_vec(&metadata)?)
.await
.map_err(Error::CacheWrite)?;

Expand All @@ -759,7 +764,6 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
}

// Store the metadata.
let metadata_entry = cache_shard.entry(METADATA);
write_atomic(metadata_entry.path(), rmp_serde::to_vec(&metadata)?)
.await
.map_err(Error::CacheWrite)?;
Expand Down Expand Up @@ -838,6 +842,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
WheelCache::Path(resource.url).root(),
);

let _lock = lock_shard(&cache_shard).await?;

// Fetch the revision for the source distribution.
let revision = self
.source_tree_revision(source, resource, &cache_shard)
Expand Down Expand Up @@ -902,6 +908,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
WheelCache::Path(resource.url).root(),
);

let _lock = lock_shard(&cache_shard).await?;

// Fetch the revision for the source distribution.
let revision = self
.source_tree_revision(source, resource, &cache_shard)
Expand All @@ -925,11 +933,10 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
.await?
{
// Store the metadata.
let cache_entry = cache_shard.entry(METADATA);
fs::create_dir_all(cache_entry.dir())
fs::create_dir_all(metadata_entry.dir())
.await
.map_err(Error::CacheWrite)?;
write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?)
write_atomic(metadata_entry.path(), rmp_serde::to_vec(&metadata)?)
.await
.map_err(Error::CacheWrite)?;

Expand All @@ -953,7 +960,6 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
}

// Store the metadata.
let metadata_entry = cache_shard.entry(METADATA);
write_atomic(metadata_entry.path(), rmp_serde::to_vec(&metadata)?)
.await
.map_err(Error::CacheWrite)?;
Expand Down Expand Up @@ -1039,6 +1045,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
WheelCache::Git(&url, &git_sha.to_short_string()).root(),
);

let _lock = lock_shard(&cache_shard).await?;

// If the cache contains a compatible wheel, return it.
if let Some(built_wheel) = BuiltWheelMetadata::find_in_cache(tags, &cache_shard) {
return Ok(built_wheel);
Expand All @@ -1060,8 +1068,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
}

// Store the metadata.
let cache_entry = cache_shard.entry(METADATA);
write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?)
let metadata_entry = cache_shard.entry(METADATA);
write_atomic(metadata_entry.path(), rmp_serde::to_vec(&metadata)?)
.await
.map_err(Error::CacheWrite)?;

Expand Down Expand Up @@ -1111,6 +1119,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
WheelCache::Git(&url, &git_sha.to_short_string()).root(),
);

let _lock = lock_shard(&cache_shard).await?;

// If the cache contains compatible metadata, return it.
let metadata_entry = cache_shard.entry(METADATA);
if self
Expand All @@ -1132,11 +1142,10 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
.await?
{
// Store the metadata.
let cache_entry = cache_shard.entry(METADATA);
fs::create_dir_all(cache_entry.dir())
fs::create_dir_all(metadata_entry.dir())
.await
.map_err(Error::CacheWrite)?;
write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?)
write_atomic(metadata_entry.path(), rmp_serde::to_vec(&metadata)?)
.await
.map_err(Error::CacheWrite)?;

Expand All @@ -1160,8 +1169,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
}

// Store the metadata.
let cache_entry = cache_shard.entry(METADATA);
write_atomic(cache_entry.path(), rmp_serde::to_vec(&metadata)?)
write_atomic(metadata_entry.path(), rmp_serde::to_vec(&metadata)?)
.await
.map_err(Error::CacheWrite)?;

Expand Down Expand Up @@ -1625,3 +1633,19 @@ fn read_wheel_metadata(
let dist_info = read_archive_metadata(filename, &mut archive)?;
Ok(Metadata23::parse_metadata(&dist_info)?)
}

/// Apply an advisory lock to a [`CacheShard`] to prevent concurrent builds.
async fn lock_shard(cache_shard: &CacheShard) -> Result<LockedFile, Error> {
let root = cache_shard.as_ref();

fs_err::create_dir_all(root).map_err(Error::CacheWrite)?;

let lock: LockedFile = tokio::task::spawn_blocking({
let root = root.to_path_buf();
move || LockedFile::acquire(root.join(".lock"), root.display())
})
.await?
.map_err(Error::CacheWrite)?;

Ok(lock)
}
8 changes: 4 additions & 4 deletions crates/uv/tests/pip_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1273,7 +1273,7 @@ fn install_url_source_dist_cached() -> Result<()> {
----- stdout -----
----- stderr -----
Removed 126 files for tqdm ([SIZE])
Removed 127 files for tqdm ([SIZE])
"###
);

Expand Down Expand Up @@ -1370,7 +1370,7 @@ fn install_git_source_dist_cached() -> Result<()> {
----- stdout -----
----- stderr -----
Removed 3 files for werkzeug ([SIZE])
Removed 4 files for werkzeug ([SIZE])
"###
);

Expand Down Expand Up @@ -1471,7 +1471,7 @@ fn install_registry_source_dist_cached() -> Result<()> {
----- stdout -----
----- stderr -----
Removed 616 files for future ([SIZE])
Removed 617 files for future ([SIZE])
"###
);

Expand Down Expand Up @@ -1576,7 +1576,7 @@ fn install_path_source_dist_cached() -> Result<()> {
----- stdout -----
----- stderr -----
Removed 102 files for wheel ([SIZE])
Removed 103 files for wheel ([SIZE])
"###
);

Expand Down

0 comments on commit 9a92a3a

Please sign in to comment.