From f1840c77b6f6a5c1a65e4cdc2a09d2689de2a792 Mon Sep 17 00:00:00 2001
From: Charlie Marsh
Date: Tue, 28 Jan 2025 15:33:49 -0500
Subject: [PATCH] Guard against concurrent cache writes on Windows (#11007)

## Summary

On Windows, we have a lot of issues with atomic replacement of files. There are a bunch of different failure modes, but they generally involve trying to persist a file to a path at which a file already exists, trying to replace or remove a file while someone else is reading it, etc.

This PR adds advisory locks to all of the relevant cache paths. We already use these advisory locks when building source distributions; now we use them when unzipping wheels, storing metadata, etc.

Closes #11002.

## Test Plan

I ran the following script:

```shell
# Define the cache directory path
$cacheDir = "C:\Users\crmar\workspace\uv\cache"

# Clear the cache directory if it exists
if (Test-Path $cacheDir) {
    Remove-Item -Recurse -Force $cacheDir
}

# Create the cache directory again
New-Item -ItemType Directory -Force -Path $cacheDir

# Define the command to run with --cache-dir flag
$command = {
    param ($venvPath)
    # Create a virtual environment in the specified path
    uv venv $venvPath
    # Run the pip install command with --cache-dir flag
    C:\Users\crmar\workspace\uv\target\profiling\uv.exe pip install flask==1.0.4 --no-binary flask --cache-dir C:\Users\crmar\workspace\uv\cache -v --python $venvPath
}

# Define the paths for the different virtual environments
$venv1 = "C:\Users\crmar\workspace\uv\venv1"
$venv2 = "C:\Users\crmar\workspace\uv\venv2"
$venv3 = "C:\Users\crmar\workspace\uv\venv3"
$venv4 = "C:\Users\crmar\workspace\uv\venv4"
$venv5 = "C:\Users\crmar\workspace\uv\venv5"

# Start the command in parallel five times using Start-Job, each with a different venv
$job1 = Start-Job -ScriptBlock $command -ArgumentList $venv1
$job2 = Start-Job -ScriptBlock $command -ArgumentList $venv2
$job3 = Start-Job -ScriptBlock $command -ArgumentList $venv3
$job4 = Start-Job -ScriptBlock $command -ArgumentList $venv4
$job5 = Start-Job -ScriptBlock $command -ArgumentList $venv5

# Wait for all jobs to complete
$jobs = @($job1, $job2, $job3, $job4, $job5)
$jobs | ForEach-Object { Wait-Job $_ }

# Retrieve the results (optional)
$jobs | ForEach-Object { Receive-Job -Job $_ }

# Clean up the jobs
$jobs | ForEach-Object { Remove-Job -Job $_ }
```

And ensured it succeeded in five straight invocations (whereas on `main`, it consistently fails with a variety of different traces).
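As background on the mechanism: `LockedFile` is uv's RAII guard around an OS advisory file lock; holding the guard holds the lock, and dropping it releases the lock. Below is a minimal sketch of that pattern (a simplified illustration, not uv's actual implementation; it assumes the `fs2` crate for the cross-platform `lock_exclusive` call):

```rust
use std::fs::File;
use std::io;
use std::path::Path;

use fs2::FileExt; // assumed dependency: fs2 = "0.4"

/// An exclusive advisory lock, released when the guard is dropped.
struct LockGuard(File);

fn acquire(lock_path: &Path) -> io::Result<LockGuard> {
    // Create (or truncate) the lock file next to the cache entry.
    let file = File::create(lock_path)?;
    // Block until no other process holds the lock. Advisory locks only
    // coordinate cooperating processes; they don't block raw filesystem access.
    file.lock_exclusive()?;
    Ok(LockGuard(file))
}

impl Drop for LockGuard {
    fn drop(&mut self) {
        // Errors on unlock are ignored; the OS also releases the lock when
        // the file handle is closed.
        let _ = self.0.unlock();
    }
}
```

The `lock()` methods added to `CacheEntry` and `CacheShard` below scope this kind of guard to a cache path, creating the directory first so the lock file can always be created.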
---
 crates/uv-cache/src/lib.rs               | 14 +++-
 crates/uv-client/src/registry_client.rs  | 21 ++++++
 .../src/distribution_database.rs         | 40 +++++++++-
 crates/uv-distribution/src/source/mod.rs | 35 ++++-----
 crates/uv-fs/src/lib.rs                  | 11 ++-
 crates/uv/tests/it/cache_clean.rs        | 12 ++-
 crates/uv/tests/it/pip_sync.rs           | 75 +++++++++----------
 7 files changed, 137 insertions(+), 71 deletions(-)

diff --git a/crates/uv-cache/src/lib.rs b/crates/uv-cache/src/lib.rs
index 049d09653..7fb61e772 100644
--- a/crates/uv-cache/src/lib.rs
+++ b/crates/uv-cache/src/lib.rs
@@ -12,7 +12,7 @@ use tracing::debug;
 pub use archive::ArchiveId;
 use uv_cache_info::Timestamp;
 use uv_distribution_types::InstalledDist;
-use uv_fs::{cachedir, directories};
+use uv_fs::{cachedir, directories, LockedFile};
 use uv_normalize::PackageName;
 use uv_pypi_types::ResolutionMetadata;
 
@@ -74,6 +74,12 @@ impl CacheEntry {
     pub fn with_file(&self, file: impl AsRef<Path>) -> Self {
         Self(self.dir().join(file))
     }
+
+    /// Acquire the [`CacheEntry`] as an exclusive lock.
+    pub async fn lock(&self) -> Result<LockedFile, std::io::Error> {
+        fs_err::create_dir_all(self.dir())?;
+        LockedFile::acquire(self.path(), self.path().display()).await
+    }
 }
 
 impl AsRef<Path> for CacheEntry {
@@ -97,6 +103,12 @@ impl CacheShard {
     pub fn shard(&self, dir: impl AsRef<Path>) -> Self {
         Self(self.0.join(dir.as_ref()))
     }
+
+    /// Acquire the cache entry as an exclusive lock.
+    pub async fn lock(&self) -> Result<LockedFile, std::io::Error> {
+        fs_err::create_dir_all(self.as_ref())?;
+        LockedFile::acquire(self.join(".lock"), self.display()).await
+    }
 }
 
 impl AsRef<Path> for CacheShard {
diff --git a/crates/uv-client/src/registry_client.rs b/crates/uv-client/src/registry_client.rs
index 6901123ab..55eabf85f 100644
--- a/crates/uv-client/src/registry_client.rs
+++ b/crates/uv-client/src/registry_client.rs
@@ -336,6 +336,13 @@ impl RegistryClient {
             Connectivity::Offline => CacheControl::AllowStale,
         };
 
+        // Acquire an advisory lock, to guard against concurrent writes.
+        #[cfg(windows)]
+        let _lock = {
+            let lock_entry = cache_entry.with_file(format!("{package_name}.lock"));
+            lock_entry.lock().await.map_err(ErrorKind::CacheWrite)?
+        };
+
         let result = if matches!(index, IndexUrl::Path(_)) {
             self.fetch_local_index(package_name, &url).await
         } else {
@@ -614,6 +621,13 @@ impl RegistryClient {
             Connectivity::Offline => CacheControl::AllowStale,
         };
 
+        // Acquire an advisory lock, to guard against concurrent writes.
+        #[cfg(windows)]
+        let _lock = {
+            let lock_entry = cache_entry.with_file(format!("{}.lock", filename.stem()));
+            lock_entry.lock().await.map_err(ErrorKind::CacheWrite)?
+        };
+
         let response_callback = |response: Response| async {
             let bytes = response
                 .bytes()
@@ -677,6 +691,13 @@ impl RegistryClient {
             Connectivity::Offline => CacheControl::AllowStale,
         };
 
+        // Acquire an advisory lock, to guard against concurrent writes.
+        #[cfg(windows)]
+        let _lock = {
+            let lock_entry = cache_entry.with_file(format!("{}.lock", filename.stem()));
+            lock_entry.lock().await.map_err(ErrorKind::CacheWrite)?
+        };
+
         // Attempt to fetch via a range request.
         if index.map_or(true, |index| capabilities.supports_range_requests(index)) {
             let req = self
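One subtlety in the blocks above: the guard is bound as `let _lock = ...` rather than discarded with a bare `_`. In Rust, a plain `_` pattern drops the value immediately, which would release the lock before the guarded work runs; a named binding like `_lock` keeps the guard (and thus the lock) alive until the end of the enclosing scope. A minimal illustration, with a hypothetical `Guard` type standing in for `LockedFile`:

```rust
/// A stand-in RAII guard with an observable `Drop`, for illustration only.
struct Guard;

impl Drop for Guard {
    fn drop(&mut self) {
        println!("lock released");
    }
}

fn acquire() -> Guard {
    Guard
}

fn main() {
    let _ = acquire(); // `_` is not a binding: the guard is dropped immediately
    let _lock = acquire(); // bound: held until the end of `main`
    println!("critical section");
} // `_lock` dropped (and the lock released) here
```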
diff --git a/crates/uv-distribution/src/distribution_database.rs b/crates/uv-distribution/src/distribution_database.rs
index 3891e1916..9a74f9d2a 100644
--- a/crates/uv-distribution/src/distribution_database.rs
+++ b/crates/uv-distribution/src/distribution_database.rs
@@ -356,6 +356,19 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
             .boxed_local()
             .await?;
 
+        // Acquire the advisory lock.
+        #[cfg(windows)]
+        let _lock = {
+            let lock_entry = CacheEntry::new(
+                built_wheel.target.parent().unwrap(),
+                format!(
+                    "{}.lock",
+                    built_wheel.target.file_name().unwrap().to_str().unwrap()
+                ),
+            );
+            lock_entry.lock().await.map_err(Error::CacheWrite)?
+        };
+
         // If the wheel was unzipped previously, respect it. Source distributions are
         // cached under a unique revision ID, so unzipped directories are never stale.
         match built_wheel.target.canonicalize() {
@@ -515,6 +528,13 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
         dist: &BuiltDist,
         hashes: HashPolicy<'_>,
     ) -> Result<LocalWheel, Error> {
+        // Acquire an advisory lock, to guard against concurrent writes.
+        #[cfg(windows)]
+        let _lock = {
+            let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
+            lock_entry.lock().await.map_err(Error::CacheWrite)?
+        };
+
         // Create an entry for the HTTP cache.
         let http_entry = wheel_entry.with_file(format!("{}.http", filename.stem()));
 
@@ -640,6 +660,13 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
         dist: &BuiltDist,
         hashes: HashPolicy<'_>,
     ) -> Result<LocalWheel, Error> {
+        // Acquire an advisory lock, to guard against concurrent writes.
+        #[cfg(windows)]
+        let _lock = {
+            let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
+            lock_entry.lock().await.map_err(Error::CacheWrite)?
+        };
+
         // Create an entry for the HTTP cache.
         let http_entry = wheel_entry.with_file(format!("{}.http", filename.stem()));
 
@@ -796,6 +823,12 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
         dist: &BuiltDist,
         hashes: HashPolicy<'_>,
     ) -> Result<LocalWheel, Error> {
+        #[cfg(windows)]
+        let _lock = {
+            let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
+            lock_entry.lock().await.map_err(Error::CacheWrite)?
+        };
+
         // Determine the last-modified time of the wheel.
         let modified = Timestamp::from_path(path).map_err(Error::CacheRead)?;
 
@@ -890,10 +923,11 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
         let temp_dir = tokio::task::spawn_blocking({
             let path = path.to_owned();
             let root = self.build_context.cache().root().to_path_buf();
-            move || -> Result<tempfile::TempDir, uv_extract::Error> {
+            move || -> Result<tempfile::TempDir, Error> {
                 // Unzip the wheel into a temporary directory.
-                let temp_dir = tempfile::tempdir_in(root)?;
-                uv_extract::unzip(fs_err::File::open(path)?, temp_dir.path())?;
+                let temp_dir = tempfile::tempdir_in(root).map_err(Error::CacheWrite)?;
+                let reader = fs_err::File::open(path).map_err(Error::CacheWrite)?;
+                uv_extract::unzip(reader, temp_dir.path())?;
                 Ok(temp_dir)
             }
         })
diff --git a/crates/uv-distribution/src/source/mod.rs b/crates/uv-distribution/src/source/mod.rs
index 80c868d40..9921855eb 100644
--- a/crates/uv-distribution/src/source/mod.rs
+++ b/crates/uv-distribution/src/source/mod.rs
@@ -39,7 +39,7 @@ use uv_distribution_types::{
     PathSourceUrl, SourceDist, SourceUrl,
 };
 use uv_extract::hash::Hasher;
-use uv_fs::{rename_with_retry, write_atomic, LockedFile};
+use uv_fs::{rename_with_retry, write_atomic};
 use uv_git::{GitHubRepository, GitOid};
 use uv_metadata::read_archive_metadata;
 use uv_normalize::PackageName;
@@ -392,7 +392,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
         hashes: HashPolicy<'_>,
         client: &ManagedClient<'_>,
     ) -> Result<BuiltWheelMetadata, Error> {
-        let _lock = lock_shard(cache_shard).await?;
+        let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;
 
         // Fetch the revision for the source distribution.
         let revision = self
@@ -505,7 +505,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
         hashes: HashPolicy<'_>,
         client: &ManagedClient<'_>,
     ) -> Result<ArchiveMetadata, Error> {
-        let _lock = lock_shard(cache_shard).await?;
+        let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;
 
         // Fetch the revision for the source distribution.
         let revision = self
@@ -753,7 +753,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
         tags: &Tags,
         hashes: HashPolicy<'_>,
     ) -> Result<BuiltWheelMetadata, Error> {
-        let _lock = lock_shard(cache_shard).await?;
+        let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;
 
         // Fetch the revision for the source distribution.
         let LocalRevisionPointer {
@@ -847,7 +847,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
         cache_shard: &CacheShard,
         hashes: HashPolicy<'_>,
     ) -> Result<ArchiveMetadata, Error> {
-        let _lock = lock_shard(cache_shard).await?;
+        let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;
 
         // Fetch the revision for the source distribution.
        let LocalRevisionPointer { revision, .. } = self
@@ -1058,7 +1058,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
             },
         );
 
-        let _lock = lock_shard(&cache_shard).await?;
+        // Acquire the advisory lock.
+        let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;
 
         // Fetch the revision for the source distribution.
         let LocalRevisionPointer {
@@ -1168,7 +1169,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
             },
         );
 
-        let _lock = lock_shard(&cache_shard).await?;
+        // Acquire the advisory lock.
+        let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;
 
         // Fetch the revision for the source distribution.
         let LocalRevisionPointer { revision, .. } = self
@@ -1430,7 +1432,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
         );
         let metadata_entry = cache_shard.entry(METADATA);
 
-        let _lock = lock_shard(&cache_shard).await?;
+        // Acquire the advisory lock.
+        let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;
 
         // If there are build settings, we need to scope to a cache shard.
         let config_settings = self.build_context.config_settings();
@@ -1581,7 +1584,8 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
         );
         let metadata_entry = cache_shard.entry(METADATA);
 
-        let _lock = lock_shard(&cache_shard).await?;
+        // Acquire the advisory lock.
+        let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?;
 
         let path = if let Some(subdirectory) = resource.subdirectory {
             Cow::Owned(fetch.path().join(subdirectory))
@@ -2882,16 +2886,3 @@ fn read_wheel_metadata(
         .map_err(|err| Error::WheelMetadata(wheel.to_path_buf(), Box::new(err)))?;
     Ok(ResolutionMetadata::parse_metadata(&dist_info)?)
 }
-
-/// Apply an advisory lock to a [`CacheShard`] to prevent concurrent builds.
-async fn lock_shard(cache_shard: &CacheShard) -> Result<LockedFile, Error> {
-    let root = cache_shard.as_ref();
-
-    fs_err::create_dir_all(root).map_err(Error::CacheWrite)?;
-
-    let lock = LockedFile::acquire(root.join(".lock"), root.display())
-        .await
-        .map_err(Error::CacheWrite)?;
-
-    Ok(lock)
-}
diff --git a/crates/uv-fs/src/lib.rs b/crates/uv-fs/src/lib.rs
index 940af094d..989ede8d3 100644
--- a/crates/uv-fs/src/lib.rs
+++ b/crates/uv-fs/src/lib.rs
@@ -45,8 +45,11 @@ pub async fn read_to_string_transcode(path: impl AsRef<Path>) -> std::io::Result<String>
 
 /// Create a symlink at `dst` pointing to `src`, replacing any existing symlink.
 ///
-/// On Windows, this uses the `junction` crate to create a junction point.
-/// Note because junctions are used, the source must be a directory.
+/// On Windows, this uses the `junction` crate to create a junction point. The
+/// operation is _not_ atomic, as we first delete the junction, then create a
+/// junction at the same path.
+///
+/// Note that because junctions are used, the source must be a directory.
 #[cfg(windows)]
 pub fn replace_symlink(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io::Result<()> {
     // If the source is a file, we can't create a junction
@@ -79,6 +82,10 @@ pub fn replace_symlink(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io:
 }
 
 /// Create a symlink at `dst` pointing to `src`, replacing any existing symlink if necessary.
+///
+/// On Unix, this method creates a temporary file, then moves it into place.
+///
+/// TODO(charlie): Consider using the `rust-atomicwrites` crate.
 #[cfg(unix)]
 pub fn replace_symlink(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io::Result<()> {
     // Attempt to create the symlink directly.
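To make the non-atomic window described in the new doc comment concrete, here is a rough sketch of the delete-then-create sequence on Windows (assuming the `junction` crate's `delete` and `create` functions; simplified relative to uv's actual `replace_symlink`):

```rust
use std::io;
use std::path::Path;

#[cfg(windows)]
fn replace_junction(src: &Path, dst: &Path) -> io::Result<()> {
    // Remove any existing junction at `dst`. Between this call and the
    // `create` below, another process can observe the path as missing:
    // the replacement is *not* atomic, which is one of the failure modes
    // the advisory locks in this PR guard against.
    match junction::delete(dst) {
        Ok(()) => {}
        Err(err) if err.kind() == io::ErrorKind::NotFound => {}
        Err(err) => return Err(err),
    }
    // Re-create the junction at `dst`, pointing to `src` (which must be
    // a directory, since junctions cannot target files).
    junction::create(src, dst)
}
```

Any reader that races between the two calls can see the path vanish, which is why the cache writes above are serialized behind a lock rather than relying on the replacement alone.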
diff --git a/crates/uv/tests/it/cache_clean.rs b/crates/uv/tests/it/cache_clean.rs
index 38a08c79a..3e555cfe2 100644
--- a/crates/uv/tests/it/cache_clean.rs
+++ b/crates/uv/tests/it/cache_clean.rs
@@ -63,11 +63,13 @@ fn clean_package_pypi() -> Result<()> {
         .filters()
         .into_iter()
         .chain([
-            // The cache entry does not have a stable key, so we filter it out
+            // The cache entry does not have a stable key, so we filter it out.
             (
                 r"\[CACHE_DIR\](\\|\/)(.+)(\\|\/).*",
                 "[CACHE_DIR]/$2/[ENTRY]",
             ),
+            // The file count varies by operating system, so we filter it out.
+            ("Removed \\d+ files?", "Removed [N] files"),
         ])
         .collect();
 
@@ -79,7 +81,7 @@ fn clean_package_pypi() -> Result<()> {
     ----- stderr -----
     DEBUG uv [VERSION] ([COMMIT] DATE)
     DEBUG Removing dangling cache entry: [CACHE_DIR]/archive-v0/[ENTRY]
-    Removed 12 files ([SIZE])
+    Removed [N] files ([SIZE])
     "###);
 
     // Assert that the `.rkyv` file is removed for `iniconfig`.
@@ -136,11 +138,13 @@ fn clean_package_index() -> Result<()> {
         .filters()
         .into_iter()
         .chain([
-            // The cache entry does not have a stable key, so we filter it out
+            // The cache entry does not have a stable key, so we filter it out.
             (
                 r"\[CACHE_DIR\](\\|\/)(.+)(\\|\/).*",
                 "[CACHE_DIR]/$2/[ENTRY]",
             ),
+            // The file count varies by operating system, so we filter it out.
+            ("Removed \\d+ files?", "Removed [N] files"),
         ])
         .collect();
 
@@ -152,7 +156,7 @@ fn clean_package_index() -> Result<()> {
     ----- stderr -----
     DEBUG uv [VERSION] ([COMMIT] DATE)
     DEBUG Removing dangling cache entry: [CACHE_DIR]/archive-v0/[ENTRY]
-    Removed 12 files ([SIZE])
+    Removed [N] files ([SIZE])
     "###);
 
     // Assert that the `.rkyv` file is removed for `iniconfig`.
diff --git a/crates/uv/tests/it/pip_sync.rs b/crates/uv/tests/it/pip_sync.rs
index cd58221dc..f19339320 100644
--- a/crates/uv/tests/it/pip_sync.rs
+++ b/crates/uv/tests/it/pip_sync.rs
@@ -1418,14 +1418,18 @@ fn install_url_source_dist_cached() -> Result<()> {
     // Clear the cache, then re-run the installation in a new virtual environment.
     context.reset_venv();
 
-    uv_snapshot!(context.clean()
-        .arg("source_distribution"), @r###"
+    let filters = std::iter::once(("Removed \\d+ files?", "Removed [N] files"))
+        .chain(context.filters())
+        .collect::<Vec<_>>();
+    uv_snapshot!(
+        filters,
+        context.clean().arg("source_distribution"), @r###"
     success: true
     exit_code: 0
     ----- stdout -----
 
     ----- stderr -----
-    Removed 19 files ([SIZE])
+    Removed [N] files ([SIZE])
     "###
     );
- [("Removed 12 files", "Removed 14 files")] - .into_iter() - .chain(context.filters()) - .collect() - }; + let filters = std::iter::once(("Removed \\d+ files?", "Removed [N] files")) + .chain(context.filters()) + .collect::>(); uv_snapshot!(filters, context.clean() .arg("source_distribution"), @r###" success: true @@ -1620,7 +1614,7 @@ fn install_registry_source_dist_cached() -> Result<()> { ----- stdout ----- ----- stderr ----- - Removed 20 files ([SIZE]) + Removed [N] files ([SIZE]) "### ); @@ -1710,14 +1704,18 @@ fn install_path_source_dist_cached() -> Result<()> { // Clear the cache, then re-run the installation in a new virtual environment. context.reset_venv(); - uv_snapshot!(context.clean() - .arg("source-distribution"), @r###" + let filters = std::iter::once(("Removed \\d+ files?", "Removed [N] files")) + .chain(context.filters()) + .collect::>(); + uv_snapshot!( + filters, + context.clean().arg("source-distribution"), @r###" success: true exit_code: 0 ----- stdout ----- ----- stderr ----- - Removed 19 files ([SIZE]) + Removed [N] files ([SIZE]) "### ); @@ -1800,23 +1798,18 @@ fn install_path_built_dist_cached() -> Result<()> { // Clear the cache, then re-run the installation in a new virtual environment. context.reset_venv(); - let filters = if cfg!(windows) { - // We do not display sizes on Windows - [("Removed 1 file", "Removed 1 file ([SIZE])")] - .into_iter() - .chain(context.filters()) - .collect() - } else { - context.filters() - }; - uv_snapshot!(filters, context.clean() - .arg("tomli"), @r###" + let filters = std::iter::once(("Removed \\d+ files?", "Removed [N] files")) + .chain(context.filters()) + .collect::>(); + uv_snapshot!( + filters, + context.clean().arg("tomli"), @r###" success: true exit_code: 0 ----- stdout ----- ----- stderr ----- - Removed 11 files ([SIZE]) + Removed [N] files ([SIZE]) "### ); @@ -1849,7 +1842,7 @@ fn install_url_built_dist_cached() -> Result<()> { let requirements_txt = context.temp_dir.child("requirements.txt"); requirements_txt.write_str("tqdm @ https://files.pythonhosted.org/packages/00/e5/f12a80907d0884e6dff9c16d0c0114d81b8cd07dc3ae54c5e962cc83037e/tqdm-4.66.1-py3-none-any.whl")?; - let filters = if cfg!(windows) { + let context_filters = if cfg!(windows) { [("warning: The package `tqdm` requires `colorama ; sys_platform == 'win32'`, but it's not installed\n", "")] .into_iter() .chain(context.filters()) @@ -1857,7 +1850,7 @@ fn install_url_built_dist_cached() -> Result<()> { } else { context.filters() }; - uv_snapshot!(filters, context.pip_sync() + uv_snapshot!(context_filters, context.pip_sync() .arg("requirements.txt") .arg("--strict"), @r###" success: true @@ -1877,7 +1870,7 @@ fn install_url_built_dist_cached() -> Result<()> { // Re-run the installation in a new virtual environment. context.reset_venv(); - uv_snapshot!(filters, context.pip_sync() + uv_snapshot!(context_filters, context.pip_sync() .arg("requirements.txt") .arg("--strict") , @r###" @@ -1897,18 +1890,22 @@ fn install_url_built_dist_cached() -> Result<()> { // Clear the cache, then re-run the installation in a new virtual environment. 
     context.reset_venv();
 
-    uv_snapshot!(context.clean()
-        .arg("tqdm"), @r###"
+    let filters = std::iter::once(("Removed \\d+ files?", "Removed [N] files"))
+        .chain(context_filters.clone())
+        .collect::<Vec<_>>();
+    uv_snapshot!(
+        filters,
+        context.clean().arg("tqdm"), @r###"
     success: true
     exit_code: 0
     ----- stdout -----
 
     ----- stderr -----
-    Removed 43 files ([SIZE])
+    Removed [N] files ([SIZE])
     "###
     );
 
-    uv_snapshot!(filters, context.pip_sync()
+    uv_snapshot!(context_filters, context.pip_sync()
         .arg("requirements.txt")
         .arg("--strict")
     , @r###"