Add a 5 min default timeout for deadlocks (#16342)

When a process is running and another calls `uv cache clean` or `uv
cache prune` we currently deadlock - sometimes until the CI timeout
(https://github.com/astral-sh/setup-uv/issues/588). To avoid this, we
add a default 5 min timeout waiting for a lock. 5 min balances allowing
in-progress builds to finish, especially with larger native
dependencies, while also giving timely errors for deadlocks on (remote)
systems.

Commit 1 is a refactoring.

This branch also fixes a problem with the logging where acquired and
released resources currently mismatch:

```
DEBUG Acquired lock for `https://github.com/tqdm/tqdm`
DEBUG Using existing Git source `https://github.com/tqdm/tqdm`
DEBUG Released lock at `C:\Users\Konsti\AppData\Local\uv\cache\git-v0\locks\16bb813afef8edd2`
```
This commit is contained in:
konsti 2025-12-04 14:59:04 +01:00 committed by GitHub
parent 2748dce860
commit 62bf92132b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
47 changed files with 707 additions and 397 deletions

3
Cargo.lock generated
View File

@ -5590,6 +5590,7 @@ dependencies = [
"uv-client", "uv-client",
"uv-distribution-filename", "uv-distribution-filename",
"uv-extract", "uv-extract",
"uv-fs",
"uv-pep440", "uv-pep440",
"uv-platform", "uv-platform",
"uv-redacted", "uv-redacted",
@ -6123,8 +6124,10 @@ dependencies = [
"schemars", "schemars",
"serde", "serde",
"tempfile", "tempfile",
"thiserror 2.0.17",
"tokio", "tokio",
"tracing", "tracing",
"uv-static",
"windows 0.59.0", "windows 0.59.0",
] ]

View File

@ -178,7 +178,7 @@ tempfile = { version = "3.14.0" }
textwrap = { version = "0.16.1" } textwrap = { version = "0.16.1" }
thiserror = { version = "2.0.0" } thiserror = { version = "2.0.0" }
astral-tl = { version = "0.7.9" } astral-tl = { version = "0.7.9" }
tokio = { version = "1.40.0", features = ["fs", "io-util", "macros", "process", "rt", "signal", "sync"] } tokio = { version = "1.40.0", features = ["fs", "io-util", "macros", "process", "rt", "signal", "sync", "time"] }
tokio-stream = { version = "0.1.16" } tokio-stream = { version = "0.1.16" }
tokio-util = { version = "0.7.12", features = ["compat", "io"] } tokio-util = { version = "0.7.12", features = ["compat", "io"] }
toml = { version = "0.9.2", features = ["fast_hash"] } toml = { version = "0.9.2", features = ["fast_hash"] }

View File

@ -23,7 +23,7 @@ use crate::{
index::{AuthPolicy, Indexes}, index::{AuthPolicy, Indexes},
realm::Realm, realm::Realm,
}; };
use crate::{Index, TextCredentialStore, TomlCredentialError}; use crate::{Index, TextCredentialStore};
/// Cached check for whether we're running in Dependabot. /// Cached check for whether we're running in Dependabot.
static IS_DEPENDABOT: LazyLock<bool> = static IS_DEPENDABOT: LazyLock<bool> =
@ -65,28 +65,35 @@ impl NetrcMode {
/// Strategy for loading text-based credential files. /// Strategy for loading text-based credential files.
enum TextStoreMode { enum TextStoreMode {
Automatic(LazyLock<Option<TextCredentialStore>>), Automatic(tokio::sync::OnceCell<Option<TextCredentialStore>>),
Enabled(TextCredentialStore), Enabled(TextCredentialStore),
Disabled, Disabled,
} }
impl Default for TextStoreMode { impl Default for TextStoreMode {
fn default() -> Self { fn default() -> Self {
// TODO(zanieb): Reconsider this pattern. We're just mirroring the [`NetrcMode`] Self::Automatic(tokio::sync::OnceCell::new())
// implementation for now. }
Self::Automatic(LazyLock::new(|| { }
impl TextStoreMode {
async fn load_default_store() -> Option<TextCredentialStore> {
let path = TextCredentialStore::default_file() let path = TextCredentialStore::default_file()
.inspect_err(|err| { .inspect_err(|err| {
warn!("Failed to determine credentials file path: {}", err); warn!("Failed to determine credentials file path: {}", err);
}) })
.ok()?; .ok()?;
match TextCredentialStore::read(&path) { match TextCredentialStore::read(&path).await {
Ok((store, _lock)) => { Ok((store, _lock)) => {
debug!("Loaded credential file {}", path.display()); debug!("Loaded credential file {}", path.display());
Some(store) Some(store)
} }
Err(TomlCredentialError::Io(err)) if err.kind() == std::io::ErrorKind::NotFound => { Err(err)
if err
.as_io_error()
.is_some_and(|err| err.kind() == std::io::ErrorKind::NotFound) =>
{
debug!("No credentials file found at {}", path.display()); debug!("No credentials file found at {}", path.display());
None None
} }
@ -99,15 +106,14 @@ impl Default for TextStoreMode {
None None
} }
} }
}))
} }
}
impl TextStoreMode {
/// Get the parsed credential store, if enabled. /// Get the parsed credential store, if enabled.
fn get(&self) -> Option<&TextCredentialStore> { async fn get(&self) -> Option<&TextCredentialStore> {
match self { match self {
Self::Automatic(lock) => lock.as_ref(), // TODO(zanieb): Reconsider this pattern. We're just mirroring the [`NetrcMode`]
// implementation for now.
Self::Automatic(lock) => lock.get_or_init(Self::load_default_store).await.as_ref(),
Self::Enabled(store) => Some(store), Self::Enabled(store) => Some(store),
Self::Disabled => None, Self::Disabled => None,
} }
@ -736,7 +742,7 @@ impl AuthMiddleware {
Some(credentials) Some(credentials)
// Text credential store support. // Text credential store support.
} else if let Some(credentials) = self.text_store.get().and_then(|text_store| { } else if let Some(credentials) = self.text_store.get().await.and_then(|text_store| {
debug!("Checking text store for credentials for {url}"); debug!("Checking text store for credentials for {url}");
text_store.get_credentials(url, credentials.as_ref().and_then(|credentials| credentials.username())).cloned() text_store.get_credentials(url, credentials.as_ref().and_then(|credentials| credentials.username())).cloned()
}) { }) {

View File

@ -5,7 +5,7 @@ use fs_err as fs;
use rustc_hash::FxHashMap; use rustc_hash::FxHashMap;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use thiserror::Error; use thiserror::Error;
use uv_fs::{LockedFile, with_added_extension}; use uv_fs::{LockedFile, LockedFileError, LockedFileMode, with_added_extension};
use uv_preview::{Preview, PreviewFeatures}; use uv_preview::{Preview, PreviewFeatures};
use uv_redacted::DisplaySafeUrl; use uv_redacted::DisplaySafeUrl;
@ -28,7 +28,7 @@ pub enum AuthBackend {
} }
impl AuthBackend { impl AuthBackend {
pub fn from_settings(preview: Preview) -> Result<Self, TomlCredentialError> { pub async fn from_settings(preview: Preview) -> Result<Self, TomlCredentialError> {
// If preview is enabled, we'll use the system-native store // If preview is enabled, we'll use the system-native store
if preview.is_enabled(PreviewFeatures::NATIVE_AUTH) { if preview.is_enabled(PreviewFeatures::NATIVE_AUTH) {
return Ok(Self::System(KeyringProvider::native())); return Ok(Self::System(KeyringProvider::native()));
@ -36,12 +36,16 @@ impl AuthBackend {
// Otherwise, we'll use the plaintext credential store // Otherwise, we'll use the plaintext credential store
let path = TextCredentialStore::default_file()?; let path = TextCredentialStore::default_file()?;
match TextCredentialStore::read(&path) { match TextCredentialStore::read(&path).await {
Ok((store, lock)) => Ok(Self::TextStore(store, lock)), Ok((store, lock)) => Ok(Self::TextStore(store, lock)),
Err(TomlCredentialError::Io(err)) if err.kind() == std::io::ErrorKind::NotFound => { Err(err)
if err
.as_io_error()
.is_some_and(|err| err.kind() == std::io::ErrorKind::NotFound) =>
{
Ok(Self::TextStore( Ok(Self::TextStore(
TextCredentialStore::default(), TextCredentialStore::default(),
TextCredentialStore::lock(&path)?, TextCredentialStore::lock(&path).await?,
)) ))
} }
Err(err) => Err(err), Err(err) => Err(err),
@ -69,6 +73,8 @@ pub enum AuthScheme {
pub enum TomlCredentialError { pub enum TomlCredentialError {
#[error(transparent)] #[error(transparent)]
Io(#[from] std::io::Error), Io(#[from] std::io::Error),
#[error(transparent)]
LockedFile(#[from] LockedFileError),
#[error("Failed to parse TOML credential file: {0}")] #[error("Failed to parse TOML credential file: {0}")]
ParseError(#[from] toml::de::Error), ParseError(#[from] toml::de::Error),
#[error("Failed to serialize credentials to TOML")] #[error("Failed to serialize credentials to TOML")]
@ -83,6 +89,21 @@ pub enum TomlCredentialError {
TokenNotUnicode(#[from] std::string::FromUtf8Error), TokenNotUnicode(#[from] std::string::FromUtf8Error),
} }
impl TomlCredentialError {
pub fn as_io_error(&self) -> Option<&std::io::Error> {
match self {
Self::Io(err) => Some(err),
Self::LockedFile(err) => err.as_io_error(),
Self::ParseError(_)
| Self::SerializeError(_)
| Self::BasicAuthError(_)
| Self::BearerAuthError(_)
| Self::CredentialsDirError
| Self::TokenNotUnicode(_) => None,
}
}
}
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum BasicAuthError { pub enum BasicAuthError {
#[error("`username` is required with `scheme = basic`")] #[error("`username` is required with `scheme = basic`")]
@ -233,12 +254,12 @@ impl TextCredentialStore {
} }
/// Acquire a lock on the credentials file at the given path. /// Acquire a lock on the credentials file at the given path.
pub fn lock(path: &Path) -> Result<LockedFile, TomlCredentialError> { pub async fn lock(path: &Path) -> Result<LockedFile, TomlCredentialError> {
if let Some(parent) = path.parent() { if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?; fs::create_dir_all(parent)?;
} }
let lock = with_added_extension(path, ".lock"); let lock = with_added_extension(path, ".lock");
Ok(LockedFile::acquire_blocking(lock, "credentials store")?) Ok(LockedFile::acquire(lock, LockedFileMode::Exclusive, "credentials store").await?)
} }
/// Read credentials from a file. /// Read credentials from a file.
@ -269,8 +290,8 @@ impl TextCredentialStore {
/// Returns [`TextCredentialStore`] and a [`LockedFile`] to hold if mutating the store. /// Returns [`TextCredentialStore`] and a [`LockedFile`] to hold if mutating the store.
/// ///
/// If the store will not be written to following the read, the lock can be dropped. /// If the store will not be written to following the read, the lock can be dropped.
pub fn read<P: AsRef<Path>>(path: P) -> Result<(Self, LockedFile), TomlCredentialError> { pub async fn read<P: AsRef<Path>>(path: P) -> Result<(Self, LockedFile), TomlCredentialError> {
let lock = Self::lock(path.as_ref())?; let lock = Self::lock(path.as_ref()).await?;
let store = Self::from_file(path)?; let store = Self::from_file(path)?;
Ok((store, lock)) Ok((store, lock))
} }
@ -450,8 +471,8 @@ mod tests {
assert!(store.get_credentials(&url, None).is_none()); assert!(store.get_credentials(&url, None).is_none());
} }
#[test] #[tokio::test]
fn test_file_operations() { async fn test_file_operations() {
let mut temp_file = NamedTempFile::new().unwrap(); let mut temp_file = NamedTempFile::new().unwrap();
writeln!( writeln!(
temp_file, temp_file,
@ -487,7 +508,7 @@ password = "pass2"
store store
.write( .write(
temp_output.path(), temp_output.path(),
TextCredentialStore::lock(temp_file.path()).unwrap(), TextCredentialStore::lock(temp_file.path()).await.unwrap(),
) )
.unwrap(); .unwrap();

View File

@ -59,7 +59,10 @@ fn setup(manifest: Manifest) -> impl Fn(bool) {
.build() .build()
.unwrap(); .unwrap();
let cache = Cache::from_path("../../.cache").init().unwrap(); let cache = Cache::from_path("../../.cache")
.init_no_wait()
.expect("No cache contention when running benchmarks")
.unwrap();
let interpreter = PythonEnvironment::from_root("../../.venv", &cache) let interpreter = PythonEnvironment::from_root("../../.venv", &cache)
.unwrap() .unwrap()
.into_interpreter(); .into_interpreter();

View File

@ -20,9 +20,11 @@ uv-cache = { workspace = true }
uv-client = { workspace = true } uv-client = { workspace = true }
uv-distribution-filename = { workspace = true } uv-distribution-filename = { workspace = true }
uv-extract = { workspace = true } uv-extract = { workspace = true }
uv-fs = { workspace = true }
uv-pep440 = { workspace = true } uv-pep440 = { workspace = true }
uv-platform = { workspace = true } uv-platform = { workspace = true }
uv-redacted = { workspace = true } uv-redacted = { workspace = true }
fs-err = { workspace = true, features = ["tokio"] } fs-err = { workspace = true, features = ["tokio"] }
futures = { workspace = true } futures = { workspace = true }
reqwest = { workspace = true } reqwest = { workspace = true }

View File

@ -22,6 +22,7 @@ use uv_distribution_filename::SourceDistExtension;
use uv_cache::{Cache, CacheBucket, CacheEntry}; use uv_cache::{Cache, CacheBucket, CacheEntry};
use uv_client::{BaseClient, is_transient_network_error}; use uv_client::{BaseClient, is_transient_network_error};
use uv_extract::{Error as ExtractError, stream}; use uv_extract::{Error as ExtractError, stream};
use uv_fs::LockedFileError;
use uv_pep440::Version; use uv_pep440::Version;
use uv_platform::Platform; use uv_platform::Platform;
use uv_redacted::DisplaySafeUrl; use uv_redacted::DisplaySafeUrl;
@ -135,6 +136,9 @@ pub enum Error {
#[error(transparent)] #[error(transparent)]
Io(#[from] std::io::Error), Io(#[from] std::io::Error),
#[error(transparent)]
LockedFile(#[from] LockedFileError),
#[error("Failed to detect platform")] #[error("Failed to detect platform")]
Platform(#[from] uv_platform::Error), Platform(#[from] uv_platform::Error),

View File

@ -36,7 +36,7 @@ use uv_distribution_types::{
ConfigSettings, ExtraBuildRequirement, ExtraBuildRequires, IndexLocations, Requirement, ConfigSettings, ExtraBuildRequirement, ExtraBuildRequires, IndexLocations, Requirement,
Resolution, Resolution,
}; };
use uv_fs::LockedFile; use uv_fs::{LockedFile, LockedFileMode};
use uv_fs::{PythonExt, Simplified}; use uv_fs::{PythonExt, Simplified};
use uv_normalize::PackageName; use uv_normalize::PackageName;
use uv_pep440::Version; use uv_pep440::Version;
@ -493,7 +493,11 @@ impl SourceBuild {
"uv-setuptools-{}.lock", "uv-setuptools-{}.lock",
cache_digest(&canonical_source_path) cache_digest(&canonical_source_path)
)); ));
source_tree_lock = LockedFile::acquire(lock_path, self.source_tree.to_string_lossy()) source_tree_lock = LockedFile::acquire(
lock_path,
LockedFileMode::Exclusive,
self.source_tree.to_string_lossy(),
)
.await .await
.inspect_err(|err| { .inspect_err(|err| {
warn!("Failed to acquire build lock: {err}"); warn!("Failed to acquire build lock: {err}");

View File

@ -10,7 +10,7 @@ use rustc_hash::FxHashMap;
use tracing::{debug, trace, warn}; use tracing::{debug, trace, warn};
use uv_cache_info::Timestamp; use uv_cache_info::Timestamp;
use uv_fs::{LockedFile, Simplified, cachedir, directories}; use uv_fs::{LockedFile, LockedFileError, LockedFileMode, Simplified, cachedir, directories};
use uv_normalize::PackageName; use uv_normalize::PackageName;
use uv_pypi_types::ResolutionMetadata; use uv_pypi_types::ResolutionMetadata;
@ -80,9 +80,14 @@ impl CacheEntry {
} }
/// Acquire the [`CacheEntry`] as an exclusive lock. /// Acquire the [`CacheEntry`] as an exclusive lock.
pub async fn lock(&self) -> Result<LockedFile, io::Error> { pub async fn lock(&self) -> Result<LockedFile, LockedFileError> {
fs_err::create_dir_all(self.dir())?; fs_err::create_dir_all(self.dir())?;
LockedFile::acquire(self.path(), self.path().display()).await LockedFile::acquire(
self.path(),
LockedFileMode::Exclusive,
self.path().display(),
)
.await
} }
} }
@ -109,9 +114,14 @@ impl CacheShard {
} }
/// Acquire the cache entry as an exclusive lock. /// Acquire the cache entry as an exclusive lock.
pub async fn lock(&self) -> Result<LockedFile, io::Error> { pub async fn lock(&self) -> Result<LockedFile, LockedFileError> {
fs_err::create_dir_all(self.as_ref())?; fs_err::create_dir_all(self.as_ref())?;
LockedFile::acquire(self.join(".lock"), self.display()).await LockedFile::acquire(
self.join(".lock"),
LockedFileMode::Exclusive,
self.display(),
)
.await
} }
/// Return the [`CacheShard`] as a [`PathBuf`]. /// Return the [`CacheShard`] as a [`PathBuf`].
@ -182,7 +192,7 @@ impl Cache {
} }
/// Acquire a lock that allows removing entries from the cache. /// Acquire a lock that allows removing entries from the cache.
pub fn with_exclusive_lock(self) -> Result<Self, io::Error> { pub async fn with_exclusive_lock(self) -> Result<Self, LockedFileError> {
let Self { let Self {
root, root,
refresh, refresh,
@ -198,8 +208,12 @@ impl Cache {
), ),
); );
} }
let lock_file = let lock_file = LockedFile::acquire(
LockedFile::acquire_blocking(root.join(".lock"), root.simplified_display())?; root.join(".lock"),
LockedFileMode::Exclusive,
root.simplified_display(),
)
.await?;
Ok(Self { Ok(Self {
root, root,
@ -220,7 +234,11 @@ impl Cache {
lock_file, lock_file,
} = self; } = self;
match LockedFile::acquire_no_wait(root.join(".lock"), root.simplified_display()) { match LockedFile::acquire_no_wait(
root.join(".lock"),
LockedFileMode::Exclusive,
root.simplified_display(),
) {
Some(lock_file) => Ok(Self { Some(lock_file) => Ok(Self {
root, root,
refresh, refresh,
@ -372,10 +390,8 @@ impl Cache {
self.temp_dir.is_some() self.temp_dir.is_some()
} }
/// Initialize the [`Cache`]. /// Populate the cache scaffold.
pub fn init(self) -> Result<Self, io::Error> { fn create_base_files(root: &PathBuf) -> Result<(), io::Error> {
let root = &self.root;
// Create the cache directory, if it doesn't exist. // Create the cache directory, if it doesn't exist.
fs_err::create_dir_all(root)?; fs_err::create_dir_all(root)?;
@ -421,13 +437,29 @@ impl Cache {
.join(".git"), .join(".git"),
)?; )?;
Ok(())
}
/// Initialize the [`Cache`].
pub async fn init(self) -> Result<Self, LockedFileError> {
let root = &self.root;
Self::create_base_files(root)?;
// Block cache removal operations from interfering. // Block cache removal operations from interfering.
let lock_file = match LockedFile::acquire_shared_blocking( let lock_file = match LockedFile::acquire(
root.join(".lock"), root.join(".lock"),
LockedFileMode::Shared,
root.simplified_display(), root.simplified_display(),
) { )
.await
{
Ok(lock_file) => Some(Arc::new(lock_file)), Ok(lock_file) => Some(Arc::new(lock_file)),
Err(err) if err.kind() == io::ErrorKind::Unsupported => { Err(err)
if err
.as_io_error()
.is_some_and(|err| err.kind() == io::ErrorKind::Unsupported) =>
{
warn!( warn!(
"Shared locking is not supported by the current platform or filesystem, \ "Shared locking is not supported by the current platform or filesystem, \
reduced parallel process safety with `uv cache clean` and `uv cache prune`." reduced parallel process safety with `uv cache clean` and `uv cache prune`."
@ -444,6 +476,27 @@ impl Cache {
}) })
} }
/// Initialize the [`Cache`], assuming that there are no other uv processes running.
pub fn init_no_wait(self) -> Result<Option<Self>, io::Error> {
let root = &self.root;
Self::create_base_files(root)?;
// Block cache removal operations from interfering.
let Some(lock_file) = LockedFile::acquire_no_wait(
root.join(".lock"),
LockedFileMode::Shared,
root.simplified_display(),
) else {
return Ok(None);
};
Ok(Some(Self {
root: std::path::absolute(root)?,
lock_file: Some(Arc::new(lock_file)),
..self
}))
}
/// Clear the cache, removing all entries. /// Clear the cache, removing all entries.
pub fn clear(self, reporter: Box<dyn CleanReporter>) -> Result<Removal, io::Error> { pub fn clear(self, reporter: Box<dyn CleanReporter>) -> Result<Removal, io::Error> {
// Remove everything but `.lock`, Windows does not allow removal of a locked file // Remove everything but `.lock`, Windows does not allow removal of a locked file

View File

@ -6,6 +6,7 @@ use std::ops::Deref;
use std::path::PathBuf; use std::path::PathBuf;
use uv_distribution_filename::{WheelFilename, WheelFilenameError}; use uv_distribution_filename::{WheelFilename, WheelFilenameError};
use uv_fs::LockedFileError;
use uv_normalize::PackageName; use uv_normalize::PackageName;
use uv_redacted::DisplaySafeUrl; use uv_redacted::DisplaySafeUrl;
@ -337,6 +338,9 @@ pub enum ErrorKind {
#[error("Failed to write to the client cache")] #[error("Failed to write to the client cache")]
CacheWrite(#[source] std::io::Error), CacheWrite(#[source] std::io::Error),
#[error("Failed to acquire lock on the client cache")]
CacheLock(#[source] LockedFileError),
#[error(transparent)] #[error(transparent)]
Io(std::io::Error), Io(std::io::Error),

View File

@ -539,7 +539,7 @@ impl RegistryClient {
#[cfg(windows)] #[cfg(windows)]
let _lock = { let _lock = {
let lock_entry = cache_entry.with_file(format!("{package_name}.lock")); let lock_entry = cache_entry.with_file(format!("{package_name}.lock"));
lock_entry.lock().await.map_err(ErrorKind::CacheWrite)? lock_entry.lock().await.map_err(ErrorKind::CacheLock)?
}; };
let result = if matches!(index, IndexUrl::Path(_)) { let result = if matches!(index, IndexUrl::Path(_)) {
@ -1031,7 +1031,7 @@ impl RegistryClient {
#[cfg(windows)] #[cfg(windows)]
let _lock = { let _lock = {
let lock_entry = cache_entry.with_file(format!("{}.lock", filename.stem())); let lock_entry = cache_entry.with_file(format!("{}.lock", filename.stem()));
lock_entry.lock().await.map_err(ErrorKind::CacheWrite)? lock_entry.lock().await.map_err(ErrorKind::CacheLock)?
}; };
let response_callback = async |response: Response| { let response_callback = async |response: Response| {
@ -1115,7 +1115,7 @@ impl RegistryClient {
#[cfg(windows)] #[cfg(windows)]
let _lock = { let _lock = {
let lock_entry = cache_entry.with_file(format!("{}.lock", filename.stem())); let lock_entry = cache_entry.with_file(format!("{}.lock", filename.stem()));
lock_entry.lock().await.map_err(ErrorKind::CacheWrite)? lock_entry.lock().await.map_err(ErrorKind::CacheLock)?
}; };
// Attempt to fetch via a range request. // Attempt to fetch via a range request.

View File

@ -11,7 +11,7 @@ use uv_redacted::DisplaySafeUrl;
#[tokio::test] #[tokio::test]
async fn remote_metadata_with_and_without_cache() -> Result<()> { async fn remote_metadata_with_and_without_cache() -> Result<()> {
let cache = Cache::temp()?.init()?; let cache = Cache::temp()?.init().await?;
let client = RegistryClientBuilder::new(BaseClientBuilder::default(), cache).build(); let client = RegistryClientBuilder::new(BaseClientBuilder::default(), cache).build();
// The first run is without cache (the tempdir is empty), the second has the cache from the // The first run is without cache (the tempdir is empty), the second has the cache from the

View File

@ -84,7 +84,7 @@ async fn ssl_env_vars() -> Result<()> {
} }
let (server_task, addr) = start_https_user_agent_server(&standalone_server_cert).await?; let (server_task, addr) = start_https_user_agent_server(&standalone_server_cert).await?;
let url = DisplaySafeUrl::from_str(&format!("https://{addr}"))?; let url = DisplaySafeUrl::from_str(&format!("https://{addr}"))?;
let cache = Cache::temp()?.init()?; let cache = Cache::temp()?.init().await?;
let client = RegistryClientBuilder::new(BaseClientBuilder::default(), cache).build(); let client = RegistryClientBuilder::new(BaseClientBuilder::default(), cache).build();
let res = client let res = client
.cached_client() .cached_client()
@ -142,7 +142,7 @@ async fn ssl_env_vars() -> Result<()> {
} }
let (server_task, addr) = start_https_user_agent_server(&standalone_server_cert).await?; let (server_task, addr) = start_https_user_agent_server(&standalone_server_cert).await?;
let url = DisplaySafeUrl::from_str(&format!("https://{addr}"))?; let url = DisplaySafeUrl::from_str(&format!("https://{addr}"))?;
let cache = Cache::temp()?.init()?; let cache = Cache::temp()?.init().await?;
let client = RegistryClientBuilder::new(BaseClientBuilder::default(), cache).build(); let client = RegistryClientBuilder::new(BaseClientBuilder::default(), cache).build();
let res = client let res = client
.cached_client() .cached_client()
@ -171,7 +171,7 @@ async fn ssl_env_vars() -> Result<()> {
} }
let (server_task, addr) = start_https_user_agent_server(&standalone_server_cert).await?; let (server_task, addr) = start_https_user_agent_server(&standalone_server_cert).await?;
let url = DisplaySafeUrl::from_str(&format!("https://{addr}"))?; let url = DisplaySafeUrl::from_str(&format!("https://{addr}"))?;
let cache = Cache::temp()?.init()?; let cache = Cache::temp()?.init().await?;
let client = RegistryClientBuilder::new(BaseClientBuilder::default(), cache).build(); let client = RegistryClientBuilder::new(BaseClientBuilder::default(), cache).build();
let res = client let res = client
.cached_client() .cached_client()
@ -194,7 +194,7 @@ async fn ssl_env_vars() -> Result<()> {
} }
let (server_task, addr) = start_https_user_agent_server(&standalone_server_cert).await?; let (server_task, addr) = start_https_user_agent_server(&standalone_server_cert).await?;
let url = DisplaySafeUrl::from_str(&format!("https://{addr}"))?; let url = DisplaySafeUrl::from_str(&format!("https://{addr}"))?;
let cache = Cache::temp()?.init()?; let cache = Cache::temp()?.init().await?;
let client = RegistryClientBuilder::new(BaseClientBuilder::default(), cache).build(); let client = RegistryClientBuilder::new(BaseClientBuilder::default(), cache).build();
let res = client let res = client
.cached_client() .cached_client()
@ -259,7 +259,7 @@ async fn ssl_env_vars() -> Result<()> {
} }
let (server_task, addr) = start_https_mtls_user_agent_server(&ca_cert, &server_cert).await?; let (server_task, addr) = start_https_mtls_user_agent_server(&ca_cert, &server_cert).await?;
let url = DisplaySafeUrl::from_str(&format!("https://{addr}"))?; let url = DisplaySafeUrl::from_str(&format!("https://{addr}"))?;
let cache = Cache::temp()?.init()?; let cache = Cache::temp()?.init().await?;
let client = RegistryClientBuilder::new(BaseClientBuilder::default(), cache).build(); let client = RegistryClientBuilder::new(BaseClientBuilder::default(), cache).build();
let res = client let res = client
.cached_client() .cached_client()
@ -283,7 +283,7 @@ async fn ssl_env_vars() -> Result<()> {
} }
let (server_task, addr) = start_https_mtls_user_agent_server(&ca_cert, &server_cert).await?; let (server_task, addr) = start_https_mtls_user_agent_server(&ca_cert, &server_cert).await?;
let url = DisplaySafeUrl::from_str(&format!("https://{addr}"))?; let url = DisplaySafeUrl::from_str(&format!("https://{addr}"))?;
let cache = Cache::temp()?.init()?; let cache = Cache::temp()?.init().await?;
let client = RegistryClientBuilder::new(BaseClientBuilder::default(), cache).build(); let client = RegistryClientBuilder::new(BaseClientBuilder::default(), cache).build();
let res = client let res = client
.cached_client() .cached_client()

View File

@ -20,7 +20,7 @@ async fn test_user_agent_has_version() -> Result<()> {
let (server_task, addr) = start_http_user_agent_server().await?; let (server_task, addr) = start_http_user_agent_server().await?;
// Initialize uv-client // Initialize uv-client
let cache = Cache::temp()?.init()?; let cache = Cache::temp()?.init().await?;
let client = RegistryClientBuilder::new(BaseClientBuilder::default(), cache).build(); let client = RegistryClientBuilder::new(BaseClientBuilder::default(), cache).build();
// Send request to our dummy server // Send request to our dummy server
@ -75,7 +75,7 @@ async fn test_user_agent_has_subcommand() -> Result<()> {
let (server_task, addr) = start_http_user_agent_server().await?; let (server_task, addr) = start_http_user_agent_server().await?;
// Initialize uv-client // Initialize uv-client
let cache = Cache::temp()?.init()?; let cache = Cache::temp()?.init().await?;
let client = RegistryClientBuilder::new( let client = RegistryClientBuilder::new(
BaseClientBuilder::default().subcommand(vec!["foo".to_owned(), "bar".to_owned()]), BaseClientBuilder::default().subcommand(vec!["foo".to_owned(), "bar".to_owned()]),
cache, cache,
@ -152,7 +152,7 @@ async fn test_user_agent_has_linehaul() -> Result<()> {
})?; })?;
// Initialize uv-client // Initialize uv-client
let cache = Cache::temp()?.init()?; let cache = Cache::temp()?.init().await?;
let mut builder = let mut builder =
RegistryClientBuilder::new(BaseClientBuilder::default(), cache).markers(&markers); RegistryClientBuilder::new(BaseClientBuilder::default(), cache).markers(&markers);

View File

@ -18,7 +18,7 @@ pub(crate) struct CompileArgs {
} }
pub(crate) async fn compile(args: CompileArgs) -> anyhow::Result<()> { pub(crate) async fn compile(args: CompileArgs) -> anyhow::Result<()> {
let cache = Cache::try_from(args.cache_args)?.init()?; let cache = Cache::try_from(args.cache_args)?.init().await?;
let interpreter = if let Some(python) = args.python { let interpreter = if let Some(python) = args.python {
python python

View File

@ -19,7 +19,7 @@ pub(crate) async fn list_packages(
args: ListPackagesArgs, args: ListPackagesArgs,
environment: EnvironmentOptions, environment: EnvironmentOptions,
) -> Result<()> { ) -> Result<()> {
let cache = Cache::try_from(args.cache_args)?.init()?; let cache = Cache::try_from(args.cache_args)?.init().await?;
let client = RegistryClientBuilder::new( let client = RegistryClientBuilder::new(
BaseClientBuilder::default().timeout(environment.http_timeout), BaseClientBuilder::default().timeout(environment.http_timeout),
cache, cache,

View File

@ -22,7 +22,7 @@ pub(crate) async fn validate_zip(
args: ValidateZipArgs, args: ValidateZipArgs,
environment: EnvironmentOptions, environment: EnvironmentOptions,
) -> Result<()> { ) -> Result<()> {
let cache = Cache::try_from(args.cache_args)?.init()?; let cache = Cache::try_from(args.cache_args)?.init().await?;
let client = RegistryClientBuilder::new( let client = RegistryClientBuilder::new(
BaseClientBuilder::default().timeout(environment.http_timeout), BaseClientBuilder::default().timeout(environment.http_timeout),
cache, cache,

View File

@ -23,7 +23,7 @@ pub(crate) async fn wheel_metadata(
args: WheelMetadataArgs, args: WheelMetadataArgs,
environment: EnvironmentOptions, environment: EnvironmentOptions,
) -> Result<()> { ) -> Result<()> {
let cache = Cache::try_from(args.cache_args)?.init()?; let cache = Cache::try_from(args.cache_args)?.init().await?;
let client = RegistryClientBuilder::new( let client = RegistryClientBuilder::new(
BaseClientBuilder::default().timeout(environment.http_timeout), BaseClientBuilder::default().timeout(environment.http_timeout),
cache, cache,

View File

@ -395,7 +395,7 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
built_wheel.target.file_name().unwrap().to_str().unwrap() built_wheel.target.file_name().unwrap().to_str().unwrap()
), ),
); );
lock_entry.lock().await.map_err(Error::CacheWrite)? lock_entry.lock().await.map_err(Error::CacheLock)?
}; };
// If the wheel was unzipped previously, respect it. Source distributions are // If the wheel was unzipped previously, respect it. Source distributions are
@ -578,7 +578,7 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
#[cfg(windows)] #[cfg(windows)]
let _lock = { let _lock = {
let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem())); let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
lock_entry.lock().await.map_err(Error::CacheWrite)? lock_entry.lock().await.map_err(Error::CacheLock)?
}; };
// Create an entry for the HTTP cache. // Create an entry for the HTTP cache.
@ -749,7 +749,7 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
#[cfg(windows)] #[cfg(windows)]
let _lock = { let _lock = {
let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem())); let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
lock_entry.lock().await.map_err(Error::CacheWrite)? lock_entry.lock().await.map_err(Error::CacheLock)?
}; };
// Create an entry for the HTTP cache. // Create an entry for the HTTP cache.
@ -951,7 +951,7 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
#[cfg(windows)] #[cfg(windows)]
let _lock = { let _lock = {
let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem())); let lock_entry = wheel_entry.with_file(format!("{}.lock", filename.stem()));
lock_entry.lock().await.map_err(Error::CacheWrite)? lock_entry.lock().await.map_err(Error::CacheLock)?
}; };
// Determine the last-modified time of the wheel. // Determine the last-modified time of the wheel.

View File

@ -8,7 +8,7 @@ use crate::metadata::MetadataError;
use uv_client::WrappedReqwestError; use uv_client::WrappedReqwestError;
use uv_distribution_filename::WheelFilenameError; use uv_distribution_filename::WheelFilenameError;
use uv_distribution_types::{InstalledDist, InstalledDistError, IsBuildBackendError}; use uv_distribution_types::{InstalledDist, InstalledDistError, IsBuildBackendError};
use uv_fs::Simplified; use uv_fs::{LockedFileError, Simplified};
use uv_git::GitError; use uv_git::GitError;
use uv_normalize::PackageName; use uv_normalize::PackageName;
use uv_pep440::{Version, VersionSpecifiers}; use uv_pep440::{Version, VersionSpecifiers};
@ -40,6 +40,8 @@ pub enum Error {
CacheRead(#[source] std::io::Error), CacheRead(#[source] std::io::Error),
#[error("Failed to write to the distribution cache")] #[error("Failed to write to the distribution cache")]
CacheWrite(#[source] std::io::Error), CacheWrite(#[source] std::io::Error),
#[error("Failed to acquire lock on the distribution cache")]
CacheLock(#[source] LockedFileError),
#[error("Failed to deserialize cache entry")] #[error("Failed to deserialize cache entry")]
CacheDecode(#[from] rmp_serde::decode::Error), CacheDecode(#[from] rmp_serde::decode::Error),
#[error("Failed to serialize cache entry")] #[error("Failed to serialize cache entry")]

View File

@ -457,7 +457,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
hashes: HashPolicy<'_>, hashes: HashPolicy<'_>,
client: &ManagedClient<'_>, client: &ManagedClient<'_>,
) -> Result<BuiltWheelMetadata, Error> { ) -> Result<BuiltWheelMetadata, Error> {
let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?; let _lock = cache_shard.lock().await.map_err(Error::CacheLock)?;
// Fetch the revision for the source distribution. // Fetch the revision for the source distribution.
let revision = self let revision = self
@ -587,7 +587,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
hashes: HashPolicy<'_>, hashes: HashPolicy<'_>,
client: &ManagedClient<'_>, client: &ManagedClient<'_>,
) -> Result<ArchiveMetadata, Error> { ) -> Result<ArchiveMetadata, Error> {
let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?; let _lock = cache_shard.lock().await.map_err(Error::CacheLock)?;
// Fetch the revision for the source distribution. // Fetch the revision for the source distribution.
let revision = self let revision = self
@ -859,7 +859,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
tags: &Tags, tags: &Tags,
hashes: HashPolicy<'_>, hashes: HashPolicy<'_>,
) -> Result<BuiltWheelMetadata, Error> { ) -> Result<BuiltWheelMetadata, Error> {
let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?; let _lock = cache_shard.lock().await.map_err(Error::CacheLock)?;
// Fetch the revision for the source distribution. // Fetch the revision for the source distribution.
let LocalRevisionPointer { let LocalRevisionPointer {
@ -964,7 +964,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
cache_shard: &CacheShard, cache_shard: &CacheShard,
hashes: HashPolicy<'_>, hashes: HashPolicy<'_>,
) -> Result<ArchiveMetadata, Error> { ) -> Result<ArchiveMetadata, Error> {
let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?; let _lock = cache_shard.lock().await.map_err(Error::CacheLock)?;
// Fetch the revision for the source distribution. // Fetch the revision for the source distribution.
let LocalRevisionPointer { revision, .. } = self let LocalRevisionPointer { revision, .. } = self
@ -1185,7 +1185,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
); );
// Acquire the advisory lock. // Acquire the advisory lock.
let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?; let _lock = cache_shard.lock().await.map_err(Error::CacheLock)?;
// Fetch the revision for the source distribution. // Fetch the revision for the source distribution.
let LocalRevisionPointer { let LocalRevisionPointer {
@ -1309,7 +1309,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
); );
// Acquire the advisory lock. // Acquire the advisory lock.
let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?; let _lock = cache_shard.lock().await.map_err(Error::CacheLock)?;
// Fetch the revision for the source distribution. // Fetch the revision for the source distribution.
let LocalRevisionPointer { revision, .. } = self let LocalRevisionPointer { revision, .. } = self
@ -1614,7 +1614,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
let metadata_entry = cache_shard.entry(METADATA); let metadata_entry = cache_shard.entry(METADATA);
// Acquire the advisory lock. // Acquire the advisory lock.
let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?; let _lock = cache_shard.lock().await.map_err(Error::CacheLock)?;
// We don't track any cache information for Git-based source distributions; they're assumed // We don't track any cache information for Git-based source distributions; they're assumed
// to be immutable. // to be immutable.
@ -1829,7 +1829,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
let metadata_entry = cache_shard.entry(METADATA); let metadata_entry = cache_shard.entry(METADATA);
// Acquire the advisory lock. // Acquire the advisory lock.
let _lock = cache_shard.lock().await.map_err(Error::CacheWrite)?; let _lock = cache_shard.lock().await.map_err(Error::CacheLock)?;
let path = if let Some(subdirectory) = resource.subdirectory { let path = if let Some(subdirectory) = resource.subdirectory {
Cow::Owned(fetch.path().join(subdirectory)) Cow::Owned(fetch.path().join(subdirectory))

View File

@ -16,6 +16,8 @@ doctest = false
workspace = true workspace = true
[dependencies] [dependencies]
uv-static = { workspace = true }
dunce = { workspace = true } dunce = { workspace = true }
either = { workspace = true } either = { workspace = true }
encoding_rs_io = { workspace = true } encoding_rs_io = { workspace = true }
@ -26,6 +28,7 @@ same-file = { workspace = true }
schemars = { workspace = true, optional = true } schemars = { workspace = true, optional = true }
serde = { workspace = true, optional = true } serde = { workspace = true, optional = true }
tempfile = { workspace = true } tempfile = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true, optional = true } tokio = { workspace = true, optional = true }
tracing = { workspace = true } tracing = { workspace = true }

View File

@ -1,13 +1,14 @@
use std::borrow::Cow; use std::borrow::Cow;
use std::fmt::Display;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use tempfile::NamedTempFile; use tempfile::NamedTempFile;
use tracing::{debug, error, info, trace, warn}; use tracing::warn;
pub use crate::locked_file::*;
pub use crate::path::*; pub use crate::path::*;
pub mod cachedir; pub mod cachedir;
mod locked_file;
mod path; mod path;
pub mod which; pub mod which;
@ -666,218 +667,6 @@ fn is_known_already_locked_error(err: &std::fs::TryLockError) -> bool {
} }
} }
/// A file lock that is automatically released when dropped.
#[derive(Debug)]
#[must_use]
pub struct LockedFile(fs_err::File);
impl LockedFile {
/// Inner implementation for [`LockedFile::acquire_blocking`] and [`LockedFile::acquire`].
fn lock_file_blocking(file: fs_err::File, resource: &str) -> Result<Self, std::io::Error> {
trace!(
"Checking lock for `{resource}` at `{}`",
file.path().user_display()
);
match file.file().try_lock() {
Ok(()) => {
debug!("Acquired lock for `{resource}`");
Ok(Self(file))
}
Err(err) => {
// Log error code and enum kind to help debugging more exotic failures.
if !is_known_already_locked_error(&err) {
debug!("Try lock error: {err:?}");
}
info!(
"Waiting to acquire lock for `{resource}` at `{}`",
file.path().user_display(),
);
file.lock()?;
debug!("Acquired lock for `{resource}`");
Ok(Self(file))
}
}
}
/// Inner implementation for [`LockedFile::acquire_no_wait`].
fn lock_file_no_wait(file: fs_err::File, resource: &str) -> Option<Self> {
trace!(
"Checking lock for `{resource}` at `{}`",
file.path().user_display()
);
match file.try_lock() {
Ok(()) => {
debug!("Acquired lock for `{resource}`");
Some(Self(file))
}
Err(err) => {
// Log error code and enum kind to help debugging more exotic failures.
if !is_known_already_locked_error(&err) {
debug!("Try lock error: {err:?}");
}
debug!("Lock is busy for `{resource}`");
None
}
}
}
/// Inner implementation for [`LockedFile::acquire_shared_blocking`] and
/// [`LockedFile::acquire_blocking`].
fn lock_file_shared_blocking(
file: fs_err::File,
resource: &str,
) -> Result<Self, std::io::Error> {
trace!(
"Checking shared lock for `{resource}` at `{}`",
file.path().user_display()
);
match file.try_lock_shared() {
Ok(()) => {
debug!("Acquired shared lock for `{resource}`");
Ok(Self(file))
}
Err(err) => {
// Log error code and enum kind to help debugging more exotic failures.
if !is_known_already_locked_error(&err) {
debug!("Try lock error: {err:?}");
}
info!(
"Waiting to acquire shared lock for `{resource}` at `{}`",
file.path().user_display(),
);
file.lock_shared()?;
debug!("Acquired shared lock for `{resource}`");
Ok(Self(file))
}
}
}
/// The same as [`LockedFile::acquire`], but for synchronous contexts.
///
/// Do not use from an async context, as this can block the runtime while waiting for another
/// process to release the lock.
pub fn acquire_blocking(
path: impl AsRef<Path>,
resource: impl Display,
) -> Result<Self, std::io::Error> {
let file = Self::create(path)?;
let resource = resource.to_string();
Self::lock_file_blocking(file, &resource)
}
/// The same as [`LockedFile::acquire_blocking`], but for synchronous contexts.
///
/// Do not use from an async context, as this can block the runtime while waiting for another
/// process to release the lock.
pub fn acquire_shared_blocking(
path: impl AsRef<Path>,
resource: impl Display,
) -> Result<Self, std::io::Error> {
let file = Self::create(path)?;
let resource = resource.to_string();
Self::lock_file_shared_blocking(file, &resource)
}
/// Acquire a cross-process lock for a resource using a file at the provided path.
#[cfg(feature = "tokio")]
pub async fn acquire(
path: impl AsRef<Path>,
resource: impl Display,
) -> Result<Self, std::io::Error> {
let file = Self::create(path)?;
let resource = resource.to_string();
tokio::task::spawn_blocking(move || Self::lock_file_blocking(file, &resource)).await?
}
/// Acquire a cross-process read lock for a shared resource using a file at the provided path.
#[cfg(feature = "tokio")]
pub async fn acquire_shared(
path: impl AsRef<Path>,
resource: impl Display,
) -> Result<Self, std::io::Error> {
let file = Self::create(path)?;
let resource = resource.to_string();
tokio::task::spawn_blocking(move || Self::lock_file_shared_blocking(file, &resource))
.await?
}
/// Acquire a cross-process lock for a resource using a file at the provided path
///
/// Unlike [`LockedFile::acquire`] this function will not wait for the lock to become available.
///
/// If the lock is not immediately available, [`None`] is returned.
pub fn acquire_no_wait(path: impl AsRef<Path>, resource: impl Display) -> Option<Self> {
let file = Self::create(path).ok()?;
let resource = resource.to_string();
Self::lock_file_no_wait(file, &resource)
}
#[cfg(unix)]
fn create(path: impl AsRef<Path>) -> Result<fs_err::File, std::io::Error> {
use std::os::unix::fs::PermissionsExt;
// If path already exists, return it.
if let Ok(file) = fs_err::OpenOptions::new()
.read(true)
.write(true)
.open(path.as_ref())
{
return Ok(file);
}
// Otherwise, create a temporary file with 666 permissions. We must set
// permissions _after_ creating the file, to override the `umask`.
let file = if let Some(parent) = path.as_ref().parent() {
NamedTempFile::new_in(parent)?
} else {
NamedTempFile::new()?
};
if let Err(err) = file
.as_file()
.set_permissions(std::fs::Permissions::from_mode(0o666))
{
warn!("Failed to set permissions on temporary file: {err}");
}
// Try to move the file to path, but if path exists now, just open path
match file.persist_noclobber(path.as_ref()) {
Ok(file) => Ok(fs_err::File::from_parts(file, path.as_ref())),
Err(err) => {
if err.error.kind() == std::io::ErrorKind::AlreadyExists {
fs_err::OpenOptions::new()
.read(true)
.write(true)
.open(path.as_ref())
} else {
Err(err.error)
}
}
}
}
#[cfg(not(unix))]
fn create(path: impl AsRef<Path>) -> std::io::Result<fs_err::File> {
fs_err::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(path.as_ref())
}
}
impl Drop for LockedFile {
fn drop(&mut self) {
if let Err(err) = self.0.unlock() {
error!(
"Failed to unlock resource at `{}`; program may be stuck: {err}",
self.0.path().display()
);
} else {
debug!("Released lock at `{}`", self.0.path().display());
}
}
}
/// An asynchronous reader that reports progress as bytes are read. /// An asynchronous reader that reports progress as bytes are read.
#[cfg(feature = "tokio")] #[cfg(feature = "tokio")]
pub struct ProgressReader<Reader: tokio::io::AsyncRead + Unpin, Callback: Fn(usize) + Unpin> { pub struct ProgressReader<Reader: tokio::io::AsyncRead + Unpin, Callback: Fn(usize) + Unpin> {

View File

@ -0,0 +1,291 @@
use std::fmt::Display;
use std::path::{Path, PathBuf};
use std::sync::LazyLock;
use std::time::Duration;
use std::{env, io};
use thiserror::Error;
use tracing::{debug, error, info, trace, warn};
use uv_static::EnvVars;
use crate::{Simplified, is_known_already_locked_error};
/// Parsed value of `UV_LOCK_TIMEOUT`, with a default of 5 min.
static LOCK_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
let default_timeout = Duration::from_secs(300);
let Some(lock_timeout) = env::var_os(EnvVars::UV_LOCK_TIMEOUT) else {
return default_timeout;
};
if let Some(lock_timeout) = lock_timeout
.to_str()
.and_then(|lock_timeout| lock_timeout.parse::<u64>().ok())
{
Duration::from_secs(lock_timeout)
} else {
warn!(
"Could not parse value of {} as integer: {:?}",
EnvVars::UV_LOCK_TIMEOUT,
lock_timeout
);
default_timeout
}
});
#[derive(Debug, Error)]
pub enum LockedFileError {
#[error(
"Timeout ({}s) when waiting for lock on `{}` at `{}`, is another uv process running? You can set `{}` to increase the timeout.",
timeout.as_secs(),
resource,
path.user_display(),
EnvVars::UV_LOCK_TIMEOUT
)]
Timeout {
timeout: Duration,
resource: String,
path: PathBuf,
},
#[error(
"Could not acquire lock for `{}` at `{}`",
resource,
path.user_display()
)]
Lock {
resource: String,
path: PathBuf,
#[source]
source: io::Error,
},
#[error(transparent)]
Io(#[from] io::Error),
#[error(transparent)]
#[cfg(feature = "tokio")]
JoinError(#[from] tokio::task::JoinError),
}
impl LockedFileError {
pub fn as_io_error(&self) -> Option<&io::Error> {
match self {
Self::Timeout { .. } => None,
#[cfg(feature = "tokio")]
Self::JoinError(_) => None,
Self::Lock { source, .. } => Some(source),
Self::Io(err) => Some(err),
}
}
}
/// Whether to acquire a shared (read) lock or exclusive (write) lock.
#[derive(Debug, Clone, Copy)]
pub enum LockedFileMode {
Shared,
Exclusive,
}
impl LockedFileMode {
/// Try to lock the file and return an error if the lock is already acquired by another process
/// and cannot be acquired immediately.
fn try_lock(self, file: &fs_err::File) -> Result<(), std::fs::TryLockError> {
match self {
Self::Exclusive => file.try_lock()?,
Self::Shared => file.try_lock_shared()?,
}
Ok(())
}
/// Lock the file, blocking until the lock becomes available if necessary.
fn lock(self, file: &fs_err::File) -> Result<(), io::Error> {
match self {
Self::Exclusive => file.lock()?,
Self::Shared => file.lock_shared()?,
}
Ok(())
}
}
impl Display for LockedFileMode {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Shared => write!(f, "shared"),
Self::Exclusive => write!(f, "exclusive"),
}
}
}
/// A file lock that is automatically released when dropped.
#[cfg(feature = "tokio")]
#[derive(Debug)]
#[must_use]
pub struct LockedFile(fs_err::File);
#[cfg(feature = "tokio")]
impl LockedFile {
/// Inner implementation for [`LockedFile::acquire`].
async fn lock_file(
file: fs_err::File,
mode: LockedFileMode,
resource: &str,
) -> Result<Self, LockedFileError> {
trace!(
"Checking lock for `{resource}` at `{}`",
file.path().user_display()
);
// If there's no contention, return directly.
let try_lock_exclusive = tokio::task::spawn_blocking(move || (mode.try_lock(&file), file));
let file = match try_lock_exclusive.await? {
(Ok(()), file) => {
debug!("Acquired {mode} lock for `{resource}`");
return Ok(Self(file));
}
(Err(err), file) => {
// Log error code and enum kind to help debugging more exotic failures.
if !is_known_already_locked_error(&err) {
debug!("Try lock {mode} error: {err:?}");
}
file
}
};
// If there's lock contention, wait and break deadlocks with a timeout if necessary.
info!(
"Waiting to acquire {mode} lock for `{resource}` at `{}`",
file.path().user_display(),
);
let path = file.path().to_path_buf();
let lock_exclusive = tokio::task::spawn_blocking(move || (mode.lock(&file), file));
let (result, file) = tokio::time::timeout(*LOCK_TIMEOUT, lock_exclusive)
.await
.map_err(|_| LockedFileError::Timeout {
timeout: *LOCK_TIMEOUT,
resource: resource.to_string(),
path: path.clone(),
})??;
// Not an fs_err method, we need to build our own path context
result.map_err(|err| LockedFileError::Lock {
resource: resource.to_string(),
path,
source: err,
})?;
debug!("Acquired {mode} lock for `{resource}`");
Ok(Self(file))
}
/// Inner implementation for [`LockedFile::acquire_no_wait`].
fn lock_file_no_wait(file: fs_err::File, mode: LockedFileMode, resource: &str) -> Option<Self> {
trace!(
"Checking lock for `{resource}` at `{}`",
file.path().user_display()
);
match mode.try_lock(&file) {
Ok(()) => {
debug!("Acquired {mode} lock for `{resource}`");
Some(Self(file))
}
Err(err) => {
// Log error code and enum kind to help debugging more exotic failures.
if !is_known_already_locked_error(&err) {
debug!("Try lock error: {err:?}");
}
debug!("Lock is busy for `{resource}`");
None
}
}
}
/// Acquire a cross-process lock for a resource using a file at the provided path.
pub async fn acquire(
path: impl AsRef<Path>,
mode: LockedFileMode,
resource: impl Display,
) -> Result<Self, LockedFileError> {
let file = Self::create(path)?;
let resource = resource.to_string();
Self::lock_file(file, mode, &resource).await
}
/// Acquire a cross-process lock for a resource using a file at the provided path
///
/// Unlike [`LockedFile::acquire`] this function will not wait for the lock to become available.
///
/// If the lock is not immediately available, [`None`] is returned.
pub fn acquire_no_wait(
path: impl AsRef<Path>,
mode: LockedFileMode,
resource: impl Display,
) -> Option<Self> {
let file = Self::create(path).ok()?;
let resource = resource.to_string();
Self::lock_file_no_wait(file, mode, &resource)
}
#[cfg(unix)]
fn create(path: impl AsRef<Path>) -> Result<fs_err::File, std::io::Error> {
use std::os::unix::fs::PermissionsExt;
use tempfile::NamedTempFile;
// If path already exists, return it.
if let Ok(file) = fs_err::OpenOptions::new()
.read(true)
.write(true)
.open(path.as_ref())
{
return Ok(file);
}
// Otherwise, create a temporary file with 666 permissions. We must set
// permissions _after_ creating the file, to override the `umask`.
let file = if let Some(parent) = path.as_ref().parent() {
NamedTempFile::new_in(parent)?
} else {
NamedTempFile::new()?
};
if let Err(err) = file
.as_file()
.set_permissions(std::fs::Permissions::from_mode(0o666))
{
warn!("Failed to set permissions on temporary file: {err}");
}
// Try to move the file to path, but if path exists now, just open path
match file.persist_noclobber(path.as_ref()) {
Ok(file) => Ok(fs_err::File::from_parts(file, path.as_ref())),
Err(err) => {
if err.error.kind() == std::io::ErrorKind::AlreadyExists {
fs_err::OpenOptions::new()
.read(true)
.write(true)
.open(path.as_ref())
} else {
Err(err.error)
}
}
}
}
#[cfg(not(unix))]
fn create(path: impl AsRef<Path>) -> std::io::Result<fs_err::File> {
fs_err::OpenOptions::new()
.read(true)
.write(true)
.create(true)
.open(path.as_ref())
}
}
#[cfg(feature = "tokio")]
impl Drop for LockedFile {
/// Unlock the file.
fn drop(&mut self) {
if let Err(err) = self.0.unlock() {
error!(
"Failed to unlock resource at `{}`; program may be stuck: {err}",
self.0.path().display()
);
} else {
debug!("Released lock at `{}`", self.0.path().display());
}
}
}

View File

@ -10,7 +10,7 @@ use reqwest_middleware::ClientWithMiddleware;
use tracing::debug; use tracing::debug;
use uv_cache_key::{RepositoryUrl, cache_digest}; use uv_cache_key::{RepositoryUrl, cache_digest};
use uv_fs::LockedFile; use uv_fs::{LockedFile, LockedFileError, LockedFileMode};
use uv_git_types::{GitHubRepository, GitOid, GitReference, GitUrl}; use uv_git_types::{GitHubRepository, GitOid, GitReference, GitUrl};
use uv_static::EnvVars; use uv_static::EnvVars;
use uv_version::version; use uv_version::version;
@ -25,6 +25,8 @@ pub enum GitResolverError {
#[error(transparent)] #[error(transparent)]
Io(#[from] std::io::Error), Io(#[from] std::io::Error),
#[error(transparent)] #[error(transparent)]
LockedFile(#[from] LockedFileError),
#[error(transparent)]
Join(#[from] tokio::task::JoinError), Join(#[from] tokio::task::JoinError),
#[error("Git operation failed")] #[error("Git operation failed")]
Git(#[source] anyhow::Error), Git(#[source] anyhow::Error),
@ -169,6 +171,7 @@ impl GitResolver {
let repository_url = RepositoryUrl::new(url.repository()); let repository_url = RepositoryUrl::new(url.repository());
let _lock = LockedFile::acquire( let _lock = LockedFile::acquire(
lock_dir.join(cache_digest(&repository_url)), lock_dir.join(cache_digest(&repository_url)),
LockedFileMode::Exclusive,
&repository_url, &repository_url,
) )
.await?; .await?;

View File

@ -7,7 +7,7 @@ use owo_colors::OwoColorize;
use tracing::debug; use tracing::debug;
use uv_cache::Cache; use uv_cache::Cache;
use uv_fs::{LockedFile, Simplified}; use uv_fs::{LockedFile, LockedFileError, Simplified};
use uv_pep440::Version; use uv_pep440::Version;
use uv_preview::Preview; use uv_preview::Preview;
@ -312,7 +312,7 @@ impl PythonEnvironment {
} }
/// Grab a file lock for the environment to prevent concurrent writes across processes. /// Grab a file lock for the environment to prevent concurrent writes across processes.
pub async fn lock(&self) -> Result<LockedFile, std::io::Error> { pub async fn lock(&self) -> Result<LockedFile, LockedFileError> {
self.0.interpreter.lock().await self.0.interpreter.lock().await
} }

View File

@ -18,7 +18,9 @@ use tracing::{debug, trace, warn};
use uv_cache::{Cache, CacheBucket, CachedByTimestamp, Freshness}; use uv_cache::{Cache, CacheBucket, CachedByTimestamp, Freshness};
use uv_cache_info::Timestamp; use uv_cache_info::Timestamp;
use uv_cache_key::cache_digest; use uv_cache_key::cache_digest;
use uv_fs::{LockedFile, PythonExt, Simplified, write_atomic_sync}; use uv_fs::{
LockedFile, LockedFileError, LockedFileMode, PythonExt, Simplified, write_atomic_sync,
};
use uv_install_wheel::Layout; use uv_install_wheel::Layout;
use uv_pep440::Version; use uv_pep440::Version;
use uv_pep508::{MarkerEnvironment, StringVersion}; use uv_pep508::{MarkerEnvironment, StringVersion};
@ -666,17 +668,28 @@ impl Interpreter {
} }
/// Grab a file lock for the environment to prevent concurrent writes across processes. /// Grab a file lock for the environment to prevent concurrent writes across processes.
pub async fn lock(&self) -> Result<LockedFile, io::Error> { pub async fn lock(&self) -> Result<LockedFile, LockedFileError> {
if let Some(target) = self.target() { if let Some(target) = self.target() {
// If we're installing into a `--target`, use a target-specific lockfile. // If we're installing into a `--target`, use a target-specific lockfile.
LockedFile::acquire(target.root().join(".lock"), target.root().user_display()).await LockedFile::acquire(
target.root().join(".lock"),
LockedFileMode::Exclusive,
target.root().user_display(),
)
.await
} else if let Some(prefix) = self.prefix() { } else if let Some(prefix) = self.prefix() {
// Likewise, if we're installing into a `--prefix`, use a prefix-specific lockfile. // Likewise, if we're installing into a `--prefix`, use a prefix-specific lockfile.
LockedFile::acquire(prefix.root().join(".lock"), prefix.root().user_display()).await LockedFile::acquire(
prefix.root().join(".lock"),
LockedFileMode::Exclusive,
prefix.root().user_display(),
)
.await
} else if self.is_virtualenv() { } else if self.is_virtualenv() {
// If the environment a virtualenv, use a virtualenv-specific lockfile. // If the environment a virtualenv, use a virtualenv-specific lockfile.
LockedFile::acquire( LockedFile::acquire(
self.sys_prefix.join(".lock"), self.sys_prefix.join(".lock"),
LockedFileMode::Exclusive,
self.sys_prefix.user_display(), self.sys_prefix.user_display(),
) )
.await .await
@ -684,6 +697,7 @@ impl Interpreter {
// Otherwise, use a global lockfile. // Otherwise, use a global lockfile.
LockedFile::acquire( LockedFile::acquire(
env::temp_dir().join(format!("uv-{}.lock", cache_digest(&self.sys_executable))), env::temp_dir().join(format!("uv-{}.lock", cache_digest(&self.sys_executable))),
LockedFileMode::Exclusive,
self.sys_prefix.user_display(), self.sys_prefix.user_display(),
) )
.await .await
@ -1272,8 +1286,8 @@ mod tests {
use crate::Interpreter; use crate::Interpreter;
#[test] #[tokio::test]
fn test_cache_invalidation() { async fn test_cache_invalidation() {
let mock_dir = tempdir().unwrap(); let mock_dir = tempdir().unwrap();
let mocked_interpreter = mock_dir.path().join("python"); let mocked_interpreter = mock_dir.path().join("python");
let json = indoc! {r##" let json = indoc! {r##"
@ -1334,7 +1348,7 @@ mod tests {
} }
"##}; "##};
let cache = Cache::temp().unwrap().init().unwrap(); let cache = Cache::temp().unwrap().init().await.unwrap();
fs::write( fs::write(
&mocked_interpreter, &mocked_interpreter,

View File

@ -16,7 +16,9 @@ use uv_preview::{Preview, PreviewFeatures};
#[cfg(windows)] #[cfg(windows)]
use windows::Win32::Storage::FileSystem::FILE_ATTRIBUTE_REPARSE_POINT; use windows::Win32::Storage::FileSystem::FILE_ATTRIBUTE_REPARSE_POINT;
use uv_fs::{LockedFile, Simplified, replace_symlink, symlink_or_copy_file}; use uv_fs::{
LockedFile, LockedFileError, LockedFileMode, Simplified, replace_symlink, symlink_or_copy_file,
};
use uv_platform::{Error as PlatformError, Os}; use uv_platform::{Error as PlatformError, Os};
use uv_platform::{LibcDetectionError, Platform}; use uv_platform::{LibcDetectionError, Platform};
use uv_state::{StateBucket, StateStore}; use uv_state::{StateBucket, StateStore};
@ -38,6 +40,8 @@ pub enum Error {
#[error(transparent)] #[error(transparent)]
Io(#[from] io::Error), Io(#[from] io::Error),
#[error(transparent)] #[error(transparent)]
LockedFile(#[from] LockedFileError),
#[error(transparent)]
Download(#[from] DownloadError), Download(#[from] DownloadError),
#[error(transparent)] #[error(transparent)]
PlatformError(#[from] PlatformError), PlatformError(#[from] PlatformError),
@ -123,7 +127,12 @@ impl ManagedPythonInstallations {
/// Grab a file lock for the managed Python distribution directory to prevent concurrent access /// Grab a file lock for the managed Python distribution directory to prevent concurrent access
/// across processes. /// across processes.
pub async fn lock(&self) -> Result<LockedFile, Error> { pub async fn lock(&self) -> Result<LockedFile, Error> {
Ok(LockedFile::acquire(self.root.join(".lock"), self.root.user_display()).await?) Ok(LockedFile::acquire(
self.root.join(".lock"),
LockedFileMode::Exclusive,
self.root.user_display(),
)
.await?)
} }
/// Prefer, in order: /// Prefer, in order:

View File

@ -1271,4 +1271,10 @@ impl EnvVars {
/// of build failures. /// of build failures.
#[attr_added_in("0.9.15")] #[attr_added_in("0.9.15")]
pub const UV_HIDE_BUILD_OUTPUT: &'static str = "UV_HIDE_BUILD_OUTPUT"; pub const UV_HIDE_BUILD_OUTPUT: &'static str = "UV_HIDE_BUILD_OUTPUT";
/// The time in seconds uv waits for a file lock to become available.
///
/// Defaults to 300s (5 min).
#[attr_added_in("0.9.4")]
pub const UV_LOCK_TIMEOUT: &'static str = "UV_LOCK_TIMEOUT";
} }

View File

@ -9,7 +9,7 @@ use tracing::{debug, warn};
use uv_cache::Cache; use uv_cache::Cache;
use uv_dirs::user_executable_directory; use uv_dirs::user_executable_directory;
use uv_fs::{LockedFile, Simplified}; use uv_fs::{LockedFile, LockedFileError, LockedFileMode, Simplified};
use uv_install_wheel::read_record_file; use uv_install_wheel::read_record_file;
use uv_installer::SitePackages; use uv_installer::SitePackages;
use uv_normalize::{InvalidNameError, PackageName}; use uv_normalize::{InvalidNameError, PackageName};
@ -65,6 +65,8 @@ impl ToolEnvironment {
pub enum Error { pub enum Error {
#[error(transparent)] #[error(transparent)]
Io(#[from] io::Error), Io(#[from] io::Error),
#[error(transparent)]
LockedFile(#[from] LockedFileError),
#[error("Failed to update `uv-receipt.toml` at {0}")] #[error("Failed to update `uv-receipt.toml` at {0}")]
ReceiptWrite(PathBuf, #[source] Box<toml_edit::ser::Error>), ReceiptWrite(PathBuf, #[source] Box<toml_edit::ser::Error>),
#[error("Failed to read `uv-receipt.toml` at {0}")] #[error("Failed to read `uv-receipt.toml` at {0}")]
@ -89,6 +91,27 @@ pub enum Error {
ToolEnvironmentNotFound(PackageName, PathBuf), ToolEnvironmentNotFound(PackageName, PathBuf),
} }
impl Error {
pub fn as_io_error(&self) -> Option<&io::Error> {
match self {
Self::Io(err) => Some(err),
Self::LockedFile(err) => err.as_io_error(),
Self::VirtualEnvError(uv_virtualenv::Error::Io(err)) => Some(err),
Self::ReceiptWrite(_, _)
| Self::ReceiptRead(_, _)
| Self::VirtualEnvError(_)
| Self::EntrypointRead(_)
| Self::NoExecutableDirectory
| Self::ToolName(_)
| Self::EnvironmentError(_)
| Self::MissingToolReceipt(_, _)
| Self::EnvironmentRead(_, _)
| Self::MissingToolPackage(_)
| Self::ToolEnvironmentNotFound(_, _) => None,
}
}
}
/// A collection of uv-managed tools installed on the current system. /// A collection of uv-managed tools installed on the current system.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct InstalledTools { pub struct InstalledTools {
@ -179,7 +202,12 @@ impl InstalledTools {
/// Grab a file lock for the tools directory to prevent concurrent access across processes. /// Grab a file lock for the tools directory to prevent concurrent access across processes.
pub async fn lock(&self) -> Result<LockedFile, Error> { pub async fn lock(&self) -> Result<LockedFile, Error> {
Ok(LockedFile::acquire(self.root.join(".lock"), self.root.user_display()).await?) Ok(LockedFile::acquire(
self.root.join(".lock"),
LockedFileMode::Exclusive,
self.root.user_display(),
)
.await?)
} }
/// Add a receipt for a tool. /// Add a receipt for a tool.

View File

@ -68,7 +68,7 @@ pub(crate) async fn login(
return Ok(ExitStatus::Success); return Ok(ExitStatus::Success);
} }
let backend = AuthBackend::from_settings(preview)?; let backend = AuthBackend::from_settings(preview).await?;
// If the URL includes a known index URL suffix, strip it // If the URL includes a known index URL suffix, strip it
// TODO(zanieb): Use a shared abstraction across `login` and `logout`? // TODO(zanieb): Use a shared abstraction across `login` and `logout`?

View File

@ -27,7 +27,7 @@ pub(crate) async fn logout(
return pyx_logout(&pyx_store, network_settings, printer, preview).await; return pyx_logout(&pyx_store, network_settings, printer, preview).await;
} }
let backend = AuthBackend::from_settings(preview)?; let backend = AuthBackend::from_settings(preview).await?;
// TODO(zanieb): Use a shared abstraction across `login` and `logout`? // TODO(zanieb): Use a shared abstraction across `login` and `logout`?
let url = service.url().clone(); let url = service.url().clone();

View File

@ -41,7 +41,7 @@ pub(crate) async fn token(
return Ok(ExitStatus::Success); return Ok(ExitStatus::Success);
} }
let backend = AuthBackend::from_settings(preview)?; let backend = AuthBackend::from_settings(preview).await?;
let url = service.url(); let url = service.url();
// Extract credentials from URL if present // Extract credentials from URL if present

View File

@ -13,7 +13,7 @@ use crate::commands::{ExitStatus, human_readable_bytes};
use crate::printer::Printer; use crate::printer::Printer;
/// Clear the cache, removing all entries or those linked to specific packages. /// Clear the cache, removing all entries or those linked to specific packages.
pub(crate) fn cache_clean( pub(crate) async fn cache_clean(
packages: &[PackageName], packages: &[PackageName],
force: bool, force: bool,
cache: Cache, cache: Cache,
@ -39,7 +39,7 @@ pub(crate) fn cache_clean(
printer.stderr(), printer.stderr(),
"Cache is currently in-use, waiting for other uv processes to finish (use `--force` to override)" "Cache is currently in-use, waiting for other uv processes to finish (use `--force` to override)"
)?; )?;
cache.with_exclusive_lock()? cache.with_exclusive_lock().await?
} }
}; };

View File

@ -11,7 +11,7 @@ use crate::commands::{ExitStatus, human_readable_bytes};
use crate::printer::Printer; use crate::printer::Printer;
/// Prune all unreachable objects from the cache. /// Prune all unreachable objects from the cache.
pub(crate) fn cache_prune( pub(crate) async fn cache_prune(
ci: bool, ci: bool,
force: bool, force: bool,
cache: Cache, cache: Cache,
@ -37,7 +37,7 @@ pub(crate) fn cache_prune(
printer.stderr(), printer.stderr(),
"Cache is currently in-use, waiting for other uv processes to finish (use `--force` to override)" "Cache is currently in-use, waiting for other uv processes to finish (use `--force` to override)"
)?; )?;
cache.with_exclusive_lock()? cache.with_exclusive_lock().await?
} }
}; };

View File

@ -25,7 +25,7 @@ use uv_distribution_types::{
Index, IndexName, IndexUrl, IndexUrls, NameRequirementSpecification, Requirement, Index, IndexName, IndexUrl, IndexUrls, NameRequirementSpecification, Requirement,
RequirementSource, UnresolvedRequirement, VersionId, RequirementSource, UnresolvedRequirement, VersionId,
}; };
use uv_fs::{LockedFile, Simplified}; use uv_fs::{LockedFile, LockedFileError, Simplified};
use uv_git::GIT_STORE; use uv_git::GIT_STORE;
use uv_normalize::{DEV_DEPENDENCIES, DefaultExtras, DefaultGroups, ExtraName, PackageName}; use uv_normalize::{DEV_DEPENDENCIES, DefaultExtras, DefaultGroups, ExtraName, PackageName};
use uv_pep508::{MarkerTree, VersionOrUrl}; use uv_pep508::{MarkerTree, VersionOrUrl};
@ -1308,7 +1308,7 @@ impl<'lock> From<&'lock AddTarget> for LockTarget<'lock> {
impl AddTarget { impl AddTarget {
/// Acquire a file lock mapped to the underlying interpreter to prevent concurrent /// Acquire a file lock mapped to the underlying interpreter to prevent concurrent
/// modifications. /// modifications.
pub(super) async fn acquire_lock(&self) -> Result<LockedFile, io::Error> { pub(super) async fn acquire_lock(&self) -> Result<LockedFile, LockedFileError> {
match self { match self {
Self::Script(_, interpreter) => interpreter.lock().await, Self::Script(_, interpreter) => interpreter.lock().await,
Self::Project(_, python_target) => python_target.interpreter().lock().await, Self::Project(_, python_target) => python_target.interpreter().lock().await,

View File

@ -21,7 +21,7 @@ use uv_distribution_types::{
ExtraBuildRequirement, ExtraBuildRequires, Index, Requirement, RequiresPython, Resolution, ExtraBuildRequirement, ExtraBuildRequires, Index, Requirement, RequiresPython, Resolution,
UnresolvedRequirement, UnresolvedRequirementSpecification, UnresolvedRequirement, UnresolvedRequirementSpecification,
}; };
use uv_fs::{CWD, LockedFile, Simplified}; use uv_fs::{CWD, LockedFile, LockedFileError, LockedFileMode, Simplified};
use uv_git::ResolvedRepositoryReference; use uv_git::ResolvedRepositoryReference;
use uv_installer::{InstallationStrategy, SatisfiesResult, SitePackages}; use uv_installer::{InstallationStrategy, SatisfiesResult, SitePackages};
use uv_normalize::{DEV_DEPENDENCIES, DefaultGroups, ExtraName, GroupName, PackageName}; use uv_normalize::{DEV_DEPENDENCIES, DefaultGroups, ExtraName, GroupName, PackageName};
@ -753,11 +753,12 @@ impl ScriptInterpreter {
} }
/// Grab a file lock for the script to prevent concurrent writes across processes. /// Grab a file lock for the script to prevent concurrent writes across processes.
pub(crate) async fn lock(script: Pep723ItemRef<'_>) -> Result<LockedFile, std::io::Error> { pub(crate) async fn lock(script: Pep723ItemRef<'_>) -> Result<LockedFile, LockedFileError> {
match script { match script {
Pep723ItemRef::Script(script) => { Pep723ItemRef::Script(script) => {
LockedFile::acquire( LockedFile::acquire(
std::env::temp_dir().join(format!("uv-{}.lock", cache_digest(&script.path))), std::env::temp_dir().join(format!("uv-{}.lock", cache_digest(&script.path))),
LockedFileMode::Exclusive,
script.path.simplified_display(), script.path.simplified_display(),
) )
.await .await
@ -765,6 +766,7 @@ impl ScriptInterpreter {
Pep723ItemRef::Remote(.., url) => { Pep723ItemRef::Remote(.., url) => {
LockedFile::acquire( LockedFile::acquire(
std::env::temp_dir().join(format!("uv-{}.lock", cache_digest(url))), std::env::temp_dir().join(format!("uv-{}.lock", cache_digest(url))),
LockedFileMode::Exclusive,
url.to_string(), url.to_string(),
) )
.await .await
@ -772,6 +774,7 @@ impl ScriptInterpreter {
Pep723ItemRef::Stdin(metadata) => { Pep723ItemRef::Stdin(metadata) => {
LockedFile::acquire( LockedFile::acquire(
std::env::temp_dir().join(format!("uv-{}.lock", cache_digest(&metadata.raw))), std::env::temp_dir().join(format!("uv-{}.lock", cache_digest(&metadata.raw))),
LockedFileMode::Exclusive,
"stdin".to_string(), "stdin".to_string(),
) )
.await .await
@ -1046,12 +1049,13 @@ impl ProjectInterpreter {
} }
/// Grab a file lock for the environment to prevent concurrent writes across processes. /// Grab a file lock for the environment to prevent concurrent writes across processes.
pub(crate) async fn lock(workspace: &Workspace) -> Result<LockedFile, std::io::Error> { pub(crate) async fn lock(workspace: &Workspace) -> Result<LockedFile, LockedFileError> {
LockedFile::acquire( LockedFile::acquire(
std::env::temp_dir().join(format!( std::env::temp_dir().join(format!(
"uv-{}.lock", "uv-{}.lock",
cache_digest(workspace.install_path()) cache_digest(workspace.install_path())
)), )),
LockedFileMode::Exclusive,
workspace.install_path().simplified_display(), workspace.install_path().simplified_display(),
) )
.await .await

View File

@ -341,7 +341,10 @@ pub(crate) async fn install(
package_name.cyan() package_name.cyan()
); );
} }
Err(uv_tool::Error::Io(err)) if err.kind() == std::io::ErrorKind::NotFound => {} Err(err)
if err
.as_io_error()
.is_some_and(|err| err.kind() == std::io::ErrorKind::NotFound) => {}
Err(err) => { Err(err) => {
return Err(err.into()); return Err(err.into());
} }

View File

@ -27,7 +27,11 @@ pub(crate) async fn list(
let installed_tools = InstalledTools::from_settings()?; let installed_tools = InstalledTools::from_settings()?;
let _lock = match installed_tools.lock().await { let _lock = match installed_tools.lock().await {
Ok(lock) => lock, Ok(lock) => lock,
Err(uv_tool::Error::Io(err)) if err.kind() == std::io::ErrorKind::NotFound => { Err(err)
if err
.as_io_error()
.is_some_and(|err| err.kind() == std::io::ErrorKind::NotFound) =>
{
writeln!(printer.stderr(), "No tools installed")?; writeln!(printer.stderr(), "No tools installed")?;
return Ok(ExitStatus::Success); return Ok(ExitStatus::Success);
} }

View File

@ -448,7 +448,11 @@ async fn show_help(
let installed_tools = InstalledTools::from_settings()?; let installed_tools = InstalledTools::from_settings()?;
let _lock = match installed_tools.lock().await { let _lock = match installed_tools.lock().await {
Ok(lock) => lock, Ok(lock) => lock,
Err(uv_tool::Error::Io(err)) if err.kind() == std::io::ErrorKind::NotFound => { Err(err)
if err
.as_io_error()
.is_some_and(|err| err.kind() == std::io::ErrorKind::NotFound) =>
{
writeln!(printer.stdout(), "{help}")?; writeln!(printer.stdout(), "{help}")?;
return Ok(()); return Ok(());
} }

View File

@ -17,7 +17,11 @@ pub(crate) async fn uninstall(name: Vec<PackageName>, printer: Printer) -> Resul
let installed_tools = InstalledTools::from_settings()?.init()?; let installed_tools = InstalledTools::from_settings()?.init()?;
let _lock = match installed_tools.lock().await { let _lock = match installed_tools.lock().await {
Ok(lock) => lock, Ok(lock) => lock,
Err(uv_tool::Error::Io(err)) if err.kind() == std::io::ErrorKind::NotFound => { Err(err)
if err
.as_io_error()
.is_some_and(|err| err.kind() == std::io::ErrorKind::NotFound) =>
{
if !name.is_empty() { if !name.is_empty() {
for name in name { for name in name {
writeln!(printer.stderr(), "`{name}` is not installed")?; writeln!(printer.stderr(), "`{name}` is not installed")?;
@ -110,8 +114,10 @@ async fn do_uninstall(
)?; )?;
continue; continue;
} }
Err(uv_tool::Error::VirtualEnvError(uv_virtualenv::Error::Io(err))) Err(err)
if err.kind() == std::io::ErrorKind::NotFound => if err
.as_io_error()
.is_some_and(|err| err.kind() == std::io::ErrorKind::NotFound) =>
{ {
bail!("`{name}` is not installed"); bail!("`{name}` is not installed");
} }

View File

@ -557,7 +557,7 @@ async fn run(mut cli: Cli) -> Result<ExitStatus> {
show_settings!(args); show_settings!(args);
// Initialize the cache. // Initialize the cache.
let cache = cache.init()?.with_refresh( let cache = cache.init().await?.with_refresh(
args.refresh args.refresh
.combine(Refresh::from(args.settings.reinstall.clone())) .combine(Refresh::from(args.settings.reinstall.clone()))
.combine(Refresh::from(args.settings.upgrade.clone())), .combine(Refresh::from(args.settings.upgrade.clone())),
@ -666,7 +666,7 @@ async fn run(mut cli: Cli) -> Result<ExitStatus> {
show_settings!(args); show_settings!(args);
// Initialize the cache. // Initialize the cache.
let cache = cache.init()?.with_refresh( let cache = cache.init().await?.with_refresh(
args.refresh args.refresh
.combine(Refresh::from(args.settings.reinstall.clone())) .combine(Refresh::from(args.settings.reinstall.clone()))
.combine(Refresh::from(args.settings.upgrade.clone())), .combine(Refresh::from(args.settings.upgrade.clone())),
@ -829,7 +829,7 @@ async fn run(mut cli: Cli) -> Result<ExitStatus> {
} }
// Initialize the cache. // Initialize the cache.
let cache = cache.init()?.with_refresh( let cache = cache.init().await?.with_refresh(
args.refresh args.refresh
.combine(Refresh::from(args.settings.reinstall.clone())) .combine(Refresh::from(args.settings.reinstall.clone()))
.combine(Refresh::from(args.settings.upgrade.clone())), .combine(Refresh::from(args.settings.upgrade.clone())),
@ -898,7 +898,7 @@ async fn run(mut cli: Cli) -> Result<ExitStatus> {
show_settings!(args); show_settings!(args);
// Initialize the cache. // Initialize the cache.
let cache = cache.init()?; let cache = cache.init().await?;
let mut sources = Vec::with_capacity(args.package.len() + args.requirements.len()); let mut sources = Vec::with_capacity(args.package.len() + args.requirements.len());
for package in args.package { for package in args.package {
@ -934,7 +934,7 @@ async fn run(mut cli: Cli) -> Result<ExitStatus> {
show_settings!(args); show_settings!(args);
// Initialize the cache. // Initialize the cache.
let cache = cache.init()?; let cache = cache.init().await?;
commands::pip_freeze( commands::pip_freeze(
args.exclude_editable, args.exclude_editable,
@ -959,7 +959,7 @@ async fn run(mut cli: Cli) -> Result<ExitStatus> {
show_settings!(args); show_settings!(args);
// Initialize the cache. // Initialize the cache.
let cache = cache.init()?; let cache = cache.init().await?;
commands::pip_list( commands::pip_list(
args.editable, args.editable,
@ -992,7 +992,7 @@ async fn run(mut cli: Cli) -> Result<ExitStatus> {
show_settings!(args); show_settings!(args);
// Initialize the cache. // Initialize the cache.
let cache = cache.init()?; let cache = cache.init().await?;
commands::pip_show( commands::pip_show(
args.package, args.package,
@ -1014,7 +1014,7 @@ async fn run(mut cli: Cli) -> Result<ExitStatus> {
let args = PipTreeSettings::resolve(args, filesystem, environment); let args = PipTreeSettings::resolve(args, filesystem, environment);
// Initialize the cache. // Initialize the cache.
let cache = cache.init()?; let cache = cache.init().await?;
commands::pip_tree( commands::pip_tree(
args.show_version_specifiers, args.show_version_specifiers,
@ -1048,7 +1048,7 @@ async fn run(mut cli: Cli) -> Result<ExitStatus> {
show_settings!(args); show_settings!(args);
// Initialize the cache. // Initialize the cache.
let cache = cache.init()?; let cache = cache.init().await?;
commands::pip_check( commands::pip_check(
args.settings.python.as_deref(), args.settings.python.as_deref(),
@ -1070,13 +1070,13 @@ async fn run(mut cli: Cli) -> Result<ExitStatus> {
}) })
| Commands::Clean(args) => { | Commands::Clean(args) => {
show_settings!(args); show_settings!(args);
commands::cache_clean(&args.package, args.force, cache, printer) commands::cache_clean(&args.package, args.force, cache, printer).await
} }
Commands::Cache(CacheNamespace { Commands::Cache(CacheNamespace {
command: CacheCommand::Prune(args), command: CacheCommand::Prune(args),
}) => { }) => {
show_settings!(args); show_settings!(args);
commands::cache_prune(args.ci, args.force, cache, printer) commands::cache_prune(args.ci, args.force, cache, printer).await
} }
Commands::Cache(CacheNamespace { Commands::Cache(CacheNamespace {
command: CacheCommand::Dir, command: CacheCommand::Dir,
@ -1090,7 +1090,7 @@ async fn run(mut cli: Cli) -> Result<ExitStatus> {
show_settings!(args); show_settings!(args);
// Initialize the cache. // Initialize the cache.
let cache = cache.init()?.with_refresh( let cache = cache.init().await?.with_refresh(
args.refresh args.refresh
.combine(Refresh::from(args.settings.upgrade.clone())), .combine(Refresh::from(args.settings.upgrade.clone())),
); );
@ -1151,7 +1151,7 @@ async fn run(mut cli: Cli) -> Result<ExitStatus> {
show_settings!(args); show_settings!(args);
// Initialize the cache. // Initialize the cache.
let cache = cache.init()?.with_refresh( let cache = cache.init().await?.with_refresh(
args.refresh args.refresh
.combine(Refresh::from(args.settings.reinstall.clone())) .combine(Refresh::from(args.settings.reinstall.clone()))
.combine(Refresh::from(args.settings.upgrade.clone())), .combine(Refresh::from(args.settings.upgrade.clone())),
@ -1302,7 +1302,7 @@ async fn run(mut cli: Cli) -> Result<ExitStatus> {
show_settings!(args); show_settings!(args);
// Initialize the cache. // Initialize the cache.
let cache = cache.init()?.with_refresh( let cache = cache.init().await?.with_refresh(
args.refresh args.refresh
.combine(Refresh::from(args.settings.reinstall.clone())) .combine(Refresh::from(args.settings.reinstall.clone()))
.combine(Refresh::from(args.settings.resolver.upgrade.clone())), .combine(Refresh::from(args.settings.resolver.upgrade.clone())),
@ -1387,7 +1387,7 @@ async fn run(mut cli: Cli) -> Result<ExitStatus> {
show_settings!(args); show_settings!(args);
// Initialize the cache. // Initialize the cache.
let cache = cache.init()?.with_refresh( let cache = cache.init().await?.with_refresh(
args.refresh args.refresh
.combine(Refresh::from(args.settings.reinstall.clone())) .combine(Refresh::from(args.settings.reinstall.clone()))
.combine(Refresh::from(args.settings.resolver.upgrade.clone())), .combine(Refresh::from(args.settings.resolver.upgrade.clone())),
@ -1480,7 +1480,7 @@ async fn run(mut cli: Cli) -> Result<ExitStatus> {
show_settings!(args); show_settings!(args);
// Initialize the cache. // Initialize the cache.
let cache = cache.init()?; let cache = cache.init().await?;
commands::tool_list( commands::tool_list(
args.show_paths, args.show_paths,
@ -1501,7 +1501,10 @@ async fn run(mut cli: Cli) -> Result<ExitStatus> {
show_settings!(args); show_settings!(args);
// Initialize the cache. // Initialize the cache.
let cache = cache.init()?.with_refresh(Refresh::All(Timestamp::now())); let cache = cache
.init()
.await?
.with_refresh(Refresh::All(Timestamp::now()));
Box::pin(commands::tool_upgrade( Box::pin(commands::tool_upgrade(
args.names, args.names,
@ -1554,7 +1557,7 @@ async fn run(mut cli: Cli) -> Result<ExitStatus> {
show_settings!(args); show_settings!(args);
// Initialize the cache. // Initialize the cache.
let cache = cache.init()?; let cache = cache.init().await?;
commands::python_list( commands::python_list(
args.request, args.request,
@ -1654,7 +1657,7 @@ async fn run(mut cli: Cli) -> Result<ExitStatus> {
let args = settings::PythonFindSettings::resolve(args, filesystem, environment); let args = settings::PythonFindSettings::resolve(args, filesystem, environment);
// Initialize the cache. // Initialize the cache.
let cache = cache.init()?; let cache = cache.init().await?;
if let Some(Pep723Item::Script(script)) = script { if let Some(Pep723Item::Script(script)) = script {
commands::python_find_script( commands::python_find_script(
@ -1695,7 +1698,7 @@ async fn run(mut cli: Cli) -> Result<ExitStatus> {
let args = settings::PythonPinSettings::resolve(args, filesystem, environment); let args = settings::PythonPinSettings::resolve(args, filesystem, environment);
// Initialize the cache. // Initialize the cache.
let cache = cache.init()?; let cache = cache.init().await?;
commands::python_pin( commands::python_pin(
&project_dir, &project_dir,
@ -1888,7 +1891,7 @@ async fn run_project(
} }
// Initialize the cache. // Initialize the cache.
let cache = cache.init()?; let cache = cache.init().await?;
commands::init( commands::init(
project_dir, project_dir,
@ -1923,7 +1926,7 @@ async fn run_project(
show_settings!(args); show_settings!(args);
// Initialize the cache. // Initialize the cache.
let cache = cache.init()?.with_refresh( let cache = cache.init().await?.with_refresh(
args.refresh args.refresh
.combine(Refresh::from(args.settings.reinstall.clone())) .combine(Refresh::from(args.settings.reinstall.clone()))
.combine(Refresh::from(args.settings.resolver.upgrade.clone())), .combine(Refresh::from(args.settings.resolver.upgrade.clone())),
@ -1987,7 +1990,7 @@ async fn run_project(
show_settings!(args); show_settings!(args);
// Initialize the cache. // Initialize the cache.
let cache = cache.init()?.with_refresh( let cache = cache.init().await?.with_refresh(
args.refresh args.refresh
.combine(Refresh::from(args.settings.reinstall.clone())) .combine(Refresh::from(args.settings.reinstall.clone()))
.combine(Refresh::from(args.settings.resolver.upgrade.clone())), .combine(Refresh::from(args.settings.resolver.upgrade.clone())),
@ -2037,7 +2040,7 @@ async fn run_project(
show_settings!(args); show_settings!(args);
// Initialize the cache. // Initialize the cache.
let cache = cache.init()?.with_refresh( let cache = cache.init().await?.with_refresh(
args.refresh args.refresh
.clone() .clone()
.combine(Refresh::from(args.settings.upgrade.clone())), .combine(Refresh::from(args.settings.upgrade.clone())),
@ -2149,7 +2152,7 @@ async fn run_project(
} }
// Initialize the cache. // Initialize the cache.
let cache = cache.init()?.with_refresh( let cache = cache.init().await?.with_refresh(
args.refresh args.refresh
.combine(Refresh::from(args.settings.reinstall.clone())) .combine(Refresh::from(args.settings.reinstall.clone()))
.combine(Refresh::from(args.settings.resolver.upgrade.clone())), .combine(Refresh::from(args.settings.resolver.upgrade.clone())),
@ -2212,7 +2215,7 @@ async fn run_project(
show_settings!(args); show_settings!(args);
// Initialize the cache. // Initialize the cache.
let cache = cache.init()?.with_refresh( let cache = cache.init().await?.with_refresh(
args.refresh args.refresh
.combine(Refresh::from(args.settings.reinstall.clone())) .combine(Refresh::from(args.settings.reinstall.clone()))
.combine(Refresh::from(args.settings.resolver.upgrade.clone())), .combine(Refresh::from(args.settings.resolver.upgrade.clone())),
@ -2256,7 +2259,7 @@ async fn run_project(
show_settings!(args); show_settings!(args);
// Initialize the cache. // Initialize the cache.
let cache = cache.init()?.with_refresh( let cache = cache.init().await?.with_refresh(
args.refresh args.refresh
.combine(Refresh::from(args.settings.reinstall.clone())) .combine(Refresh::from(args.settings.reinstall.clone()))
.combine(Refresh::from(args.settings.resolver.upgrade.clone())), .combine(Refresh::from(args.settings.resolver.upgrade.clone())),
@ -2296,7 +2299,7 @@ async fn run_project(
show_settings!(args); show_settings!(args);
// Initialize the cache. // Initialize the cache.
let cache = cache.init()?; let cache = cache.init().await?;
// Unwrap the script. // Unwrap the script.
let script = script.map(|script| match script { let script = script.map(|script| match script {
@ -2341,7 +2344,7 @@ async fn run_project(
show_settings!(args); show_settings!(args);
// Initialize the cache. // Initialize the cache.
let cache = cache.init()?; let cache = cache.init().await?;
// Unwrap the script. // Unwrap the script.
let script = script.map(|script| match script { let script = script.map(|script| match script {
@ -2389,7 +2392,7 @@ async fn run_project(
show_settings!(args); show_settings!(args);
// Initialize the cache. // Initialize the cache.
let cache = cache.init()?; let cache = cache.init().await?;
Box::pin(commands::format( Box::pin(commands::format(
project_dir, project_dir,

View File

@ -2,6 +2,9 @@ use anyhow::Result;
use assert_cmd::prelude::*; use assert_cmd::prelude::*;
use assert_fs::prelude::*; use assert_fs::prelude::*;
use uv_cache::Cache;
use uv_static::EnvVars;
use crate::common::{TestContext, uv_snapshot}; use crate::common::{TestContext, uv_snapshot};
/// `cache clean` should remove all packages. /// `cache clean` should remove all packages.
@ -26,7 +29,7 @@ fn clean_all() -> Result<()> {
----- stderr ----- ----- stderr -----
DEBUG uv [VERSION] ([COMMIT] DATE) DEBUG uv [VERSION] ([COMMIT] DATE)
DEBUG Acquired lock for `[CACHE_DIR]/` DEBUG Acquired exclusive lock for `[CACHE_DIR]/`
Clearing cache at: [CACHE_DIR]/ Clearing cache at: [CACHE_DIR]/
DEBUG Released lock at `[CACHE_DIR]/.lock` DEBUG Released lock at `[CACHE_DIR]/.lock`
Removed [N] files ([SIZE]) Removed [N] files ([SIZE])
@ -35,8 +38,8 @@ fn clean_all() -> Result<()> {
Ok(()) Ok(())
} }
#[test] #[tokio::test]
fn clean_force() -> Result<()> { async fn clean_force() -> Result<()> {
let context = TestContext::new("3.12").with_filtered_counts(); let context = TestContext::new("3.12").with_filtered_counts();
let requirements_txt = context.temp_dir.child("requirements.txt"); let requirements_txt = context.temp_dir.child("requirements.txt");
@ -57,7 +60,7 @@ fn clean_force() -> Result<()> {
----- stderr ----- ----- stderr -----
DEBUG uv [VERSION] ([COMMIT] DATE) DEBUG uv [VERSION] ([COMMIT] DATE)
DEBUG Acquired lock for `[CACHE_DIR]/` DEBUG Acquired exclusive lock for `[CACHE_DIR]/`
Clearing cache at: [CACHE_DIR]/ Clearing cache at: [CACHE_DIR]/
DEBUG Released lock at `[CACHE_DIR]/.lock` DEBUG Released lock at `[CACHE_DIR]/.lock`
Removed [N] files ([SIZE]) Removed [N] files ([SIZE])
@ -71,7 +74,9 @@ fn clean_force() -> Result<()> {
.success(); .success();
// When locked, `--force` should proceed without blocking // When locked, `--force` should proceed without blocking
let _cache = uv_cache::Cache::from_path(context.cache_dir.path()).with_exclusive_lock(); let _cache = uv_cache::Cache::from_path(context.cache_dir.path())
.with_exclusive_lock()
.await;
uv_snapshot!(context.filters(), context.clean().arg("--verbose").arg("--force"), @r" uv_snapshot!(context.filters(), context.clean().arg("--verbose").arg("--force"), @r"
success: true success: true
exit_code: 0 exit_code: 0
@ -135,7 +140,7 @@ fn clean_package_pypi() -> Result<()> {
----- stderr ----- ----- stderr -----
DEBUG uv [VERSION] ([COMMIT] DATE) DEBUG uv [VERSION] ([COMMIT] DATE)
DEBUG Acquired lock for `[CACHE_DIR]/` DEBUG Acquired exclusive lock for `[CACHE_DIR]/`
DEBUG Removing dangling cache entry: [CACHE_DIR]/archive-v0/[ENTRY] DEBUG Removing dangling cache entry: [CACHE_DIR]/archive-v0/[ENTRY]
Removed [N] files ([SIZE]) Removed [N] files ([SIZE])
DEBUG Released lock at `[CACHE_DIR]/.lock` DEBUG Released lock at `[CACHE_DIR]/.lock`
@ -155,7 +160,7 @@ fn clean_package_pypi() -> Result<()> {
----- stderr ----- ----- stderr -----
DEBUG uv [VERSION] ([COMMIT] DATE) DEBUG uv [VERSION] ([COMMIT] DATE)
DEBUG Acquired lock for `[CACHE_DIR]/` DEBUG Acquired exclusive lock for `[CACHE_DIR]/`
Pruning cache at: [CACHE_DIR]/ Pruning cache at: [CACHE_DIR]/
No unused entries found No unused entries found
DEBUG Released lock at `[CACHE_DIR]/.lock` DEBUG Released lock at `[CACHE_DIR]/.lock`
@ -214,7 +219,7 @@ fn clean_package_index() -> Result<()> {
----- stderr ----- ----- stderr -----
DEBUG uv [VERSION] ([COMMIT] DATE) DEBUG uv [VERSION] ([COMMIT] DATE)
DEBUG Acquired lock for `[CACHE_DIR]/` DEBUG Acquired exclusive lock for `[CACHE_DIR]/`
DEBUG Removing dangling cache entry: [CACHE_DIR]/archive-v0/[ENTRY] DEBUG Removing dangling cache entry: [CACHE_DIR]/archive-v0/[ENTRY]
Removed [N] files ([SIZE]) Removed [N] files ([SIZE])
DEBUG Released lock at `[CACHE_DIR]/.lock` DEBUG Released lock at `[CACHE_DIR]/.lock`
@ -228,3 +233,23 @@ fn clean_package_index() -> Result<()> {
Ok(()) Ok(())
} }
#[tokio::test]
async fn cache_timeout() {
let context = TestContext::new("3.12");
// Simulate another uv process running and locking the cache, e.g., with a source build.
let _cache = Cache::from_path(context.cache_dir.path())
.with_exclusive_lock()
.await;
uv_snapshot!(context.filters(), context.clean().env(EnvVars::UV_LOCK_TIMEOUT, "1"), @r"
success: false
exit_code: 2
----- stdout -----
----- stderr -----
Cache is currently in-use, waiting for other uv processes to finish (use `--force` to override)
error: Timeout ([TIME]) when waiting for lock on `[CACHE_DIR]/` at `[CACHE_DIR]/.lock`, is another uv process running? You can set `UV_LOCK_TIMEOUT` to increase the timeout.
");
}

View File

@ -36,7 +36,7 @@ fn prune_no_op() -> Result<()> {
----- stderr ----- ----- stderr -----
DEBUG uv [VERSION] ([COMMIT] DATE) DEBUG uv [VERSION] ([COMMIT] DATE)
DEBUG Acquired lock for `[CACHE_DIR]/` DEBUG Acquired exclusive lock for `[CACHE_DIR]/`
Pruning cache at: [CACHE_DIR]/ Pruning cache at: [CACHE_DIR]/
No unused entries found No unused entries found
DEBUG Released lock at `[CACHE_DIR]/.lock` DEBUG Released lock at `[CACHE_DIR]/.lock`
@ -77,7 +77,7 @@ fn prune_stale_directory() -> Result<()> {
----- stderr ----- ----- stderr -----
DEBUG uv [VERSION] ([COMMIT] DATE) DEBUG uv [VERSION] ([COMMIT] DATE)
DEBUG Acquired lock for `[CACHE_DIR]/` DEBUG Acquired exclusive lock for `[CACHE_DIR]/`
Pruning cache at: [CACHE_DIR]/ Pruning cache at: [CACHE_DIR]/
DEBUG Removing dangling cache bucket: [CACHE_DIR]/simple-v4 DEBUG Removing dangling cache bucket: [CACHE_DIR]/simple-v4
Removed 1 directory Removed 1 directory
@ -139,7 +139,7 @@ fn prune_cached_env() {
----- stderr ----- ----- stderr -----
DEBUG uv [VERSION] ([COMMIT] DATE) DEBUG uv [VERSION] ([COMMIT] DATE)
DEBUG Acquired lock for `[CACHE_DIR]/` DEBUG Acquired exclusive lock for `[CACHE_DIR]/`
Pruning cache at: [CACHE_DIR]/ Pruning cache at: [CACHE_DIR]/
DEBUG Removing dangling cache environment: [CACHE_DIR]/environments-v2/[ENTRY] DEBUG Removing dangling cache environment: [CACHE_DIR]/environments-v2/[ENTRY]
DEBUG Removing dangling cache archive: [CACHE_DIR]/archive-v0/[ENTRY] DEBUG Removing dangling cache archive: [CACHE_DIR]/archive-v0/[ENTRY]
@ -186,7 +186,7 @@ fn prune_stale_symlink() -> Result<()> {
----- stderr ----- ----- stderr -----
DEBUG uv [VERSION] ([COMMIT] DATE) DEBUG uv [VERSION] ([COMMIT] DATE)
DEBUG Acquired lock for `[CACHE_DIR]/` DEBUG Acquired exclusive lock for `[CACHE_DIR]/`
Pruning cache at: [CACHE_DIR]/ Pruning cache at: [CACHE_DIR]/
DEBUG Removing dangling cache archive: [CACHE_DIR]/archive-v0/[ENTRY] DEBUG Removing dangling cache archive: [CACHE_DIR]/archive-v0/[ENTRY]
Removed 44 files ([SIZE]) Removed 44 files ([SIZE])
@ -196,8 +196,8 @@ fn prune_stale_symlink() -> Result<()> {
Ok(()) Ok(())
} }
#[test] #[tokio::test]
fn prune_force() -> Result<()> { async fn prune_force() -> Result<()> {
let context = TestContext::new("3.12").with_filtered_counts(); let context = TestContext::new("3.12").with_filtered_counts();
let requirements_txt = context.temp_dir.child("requirements.txt"); let requirements_txt = context.temp_dir.child("requirements.txt");
@ -218,7 +218,7 @@ fn prune_force() -> Result<()> {
----- stderr ----- ----- stderr -----
DEBUG uv [VERSION] ([COMMIT] DATE) DEBUG uv [VERSION] ([COMMIT] DATE)
DEBUG Acquired lock for `[CACHE_DIR]/` DEBUG Acquired exclusive lock for `[CACHE_DIR]/`
Pruning cache at: [CACHE_DIR]/ Pruning cache at: [CACHE_DIR]/
No unused entries found No unused entries found
DEBUG Released lock at `[CACHE_DIR]/.lock` DEBUG Released lock at `[CACHE_DIR]/.lock`
@ -229,7 +229,9 @@ fn prune_force() -> Result<()> {
simple.create_dir_all()?; simple.create_dir_all()?;
// When locked, `--force` should proceed without blocking // When locked, `--force` should proceed without blocking
let _cache = uv_cache::Cache::from_path(context.cache_dir.path()).with_exclusive_lock(); let _cache = uv_cache::Cache::from_path(context.cache_dir.path())
.with_exclusive_lock()
.await;
uv_snapshot!(context.filters(), context.prune().arg("--verbose").arg("--force"), @r" uv_snapshot!(context.filters(), context.prune().arg("--verbose").arg("--force"), @r"
success: true success: true
exit_code: 0 exit_code: 0
@ -406,7 +408,7 @@ fn prune_stale_revision() -> Result<()> {
----- stderr ----- ----- stderr -----
DEBUG uv [VERSION] ([COMMIT] DATE) DEBUG uv [VERSION] ([COMMIT] DATE)
DEBUG Acquired lock for `[CACHE_DIR]/` DEBUG Acquired exclusive lock for `[CACHE_DIR]/`
Pruning cache at: [CACHE_DIR]/ Pruning cache at: [CACHE_DIR]/
DEBUG Removing dangling source revision: [CACHE_DIR]/sdists-v9/[ENTRY] DEBUG Removing dangling source revision: [CACHE_DIR]/sdists-v9/[ENTRY]
DEBUG Removing dangling cache archive: [CACHE_DIR]/archive-v0/[ENTRY] DEBUG Removing dangling cache archive: [CACHE_DIR]/archive-v0/[ENTRY]

View File

@ -1784,7 +1784,9 @@ pub fn python_installations_for_versions(
python_versions: &[&str], python_versions: &[&str],
download_list: &ManagedPythonDownloadList, download_list: &ManagedPythonDownloadList,
) -> anyhow::Result<Vec<PathBuf>> { ) -> anyhow::Result<Vec<PathBuf>> {
let cache = Cache::from_path(temp_dir.child("cache").to_path_buf()).init()?; let cache = Cache::from_path(temp_dir.child("cache").to_path_buf())
.init_no_wait()?
.expect("No cache contention when setting up Python in tests");
let selected_pythons = python_versions let selected_pythons = python_versions
.iter() .iter()
.map(|python_version| { .map(|python_version| {

View File

@ -127,8 +127,7 @@ cache is designed to be thread-safe and append-only, and thus robust to multiple
and writers. uv applies a file-based lock to the target virtual environment when installing, to and writers. uv applies a file-based lock to the target virtual environment when installing, to
avoid concurrent modifications across processes. avoid concurrent modifications across processes.
Note that it's _not_ safe to modify the uv cache (e.g., `uv cache clean`) while other uv commands Note that it's _never_ safe to modify the cache directly (e.g., by removing a file or directory).
are running, and _never_ safe to modify the cache directly (e.g., by removing a file or directory).
## Clearing the cache ## Clearing the cache
@ -141,6 +140,12 @@ uv provides a few different mechanisms for removing entries from the cache:
entries created in previous uv versions that are no longer necessary and can be safely removed. entries created in previous uv versions that are no longer necessary and can be safely removed.
`uv cache prune` is safe to run periodically, to keep the cache directory clean. `uv cache prune` is safe to run periodically, to keep the cache directory clean.
uv blocks cache-modifying operations while other uv commands are running. By default, those
`uv cache` commands have a 5 min timeout waiting for other uv processes to terminate to avoid
deadlocks. This timeout can be changed with
[`UV_LOCK_TIMEOUT`](../reference/environment.md#uv_lock_timeout). In cases where it is known that no
other uv processes are reading or writing from the cache, `--force` can be used to ignore the lock.
## Caching in continuous integration ## Caching in continuous integration
It's common to cache package installation artifacts in continuous integration environments (like It's common to cache package installation artifacts in continuous integration environments (like

View File

@ -273,6 +273,13 @@ a link mode.
Equivalent to the `--locked` command-line argument. If set, uv will assert that the Equivalent to the `--locked` command-line argument. If set, uv will assert that the
`uv.lock` remains unchanged. `uv.lock` remains unchanged.
### `UV_LOCK_TIMEOUT`
<small class="added-in">added in `0.9.4`</small>
The time in seconds uv waits for a file lock to become available.
Defaults to 300s (5 min).
### `UV_LOG_CONTEXT` ### `UV_LOG_CONTEXT`
<small class="added-in">added in `0.6.4`</small> <small class="added-in">added in `0.6.4`</small>