mirror of https://github.com/astral-sh/uv
Avoid deadlocks when multiple uv processes lock resources (#6790)
This is achieved by updating the `LockedFile::acquire` API to be async — as in some cases we were attempting to acquire the lock synchronously, i.e., without yielding, which blocked the runtime. Closes https://github.com/astral-sh/uv/issues/6691 — I tested with the reproduction there and a local release build and no longer reproduce the deadlock with these changes. Some additional context in the [internal Discord thread](https://discord.com/channels/1039017663004942429/1278430431204741270/1278478941262188595)
This commit is contained in:
parent
4f5356ed55
commit
e3d5d3d26d
|
|
@ -4898,6 +4898,7 @@ dependencies = [
|
||||||
"path-slash",
|
"path-slash",
|
||||||
"serde",
|
"serde",
|
||||||
"tempfile",
|
"tempfile",
|
||||||
|
"tokio",
|
||||||
"tracing",
|
"tracing",
|
||||||
"urlencoding",
|
"urlencoding",
|
||||||
"uv-warnings",
|
"uv-warnings",
|
||||||
|
|
|
||||||
|
|
@ -1897,11 +1897,8 @@ async fn lock_shard(cache_shard: &CacheShard) -> Result<LockedFile, Error> {
|
||||||
|
|
||||||
fs_err::create_dir_all(root).map_err(Error::CacheWrite)?;
|
fs_err::create_dir_all(root).map_err(Error::CacheWrite)?;
|
||||||
|
|
||||||
let lock: LockedFile = tokio::task::spawn_blocking({
|
let lock = LockedFile::acquire(root.join(".lock"), root.display())
|
||||||
let root = root.to_path_buf();
|
.await
|
||||||
move || LockedFile::acquire(root.join(".lock"), root.display())
|
|
||||||
})
|
|
||||||
.await?
|
|
||||||
.map_err(Error::CacheWrite)?;
|
.map_err(Error::CacheWrite)?;
|
||||||
|
|
||||||
Ok(lock)
|
Ok(lock)
|
||||||
|
|
|
||||||
|
|
@ -24,6 +24,7 @@ fs-err = { workspace = true }
|
||||||
fs2 = { workspace = true }
|
fs2 = { workspace = true }
|
||||||
path-slash = { workspace = true }
|
path-slash = { workspace = true }
|
||||||
serde = { workspace = true, optional = true }
|
serde = { workspace = true, optional = true }
|
||||||
|
tokio = { workspace = true, optional = true}
|
||||||
tempfile = { workspace = true }
|
tempfile = { workspace = true }
|
||||||
tracing = { workspace = true }
|
tracing = { workspace = true }
|
||||||
urlencoding = { workspace = true }
|
urlencoding = { workspace = true }
|
||||||
|
|
@ -33,4 +34,4 @@ junction = { workspace = true }
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = []
|
default = []
|
||||||
tokio = ["fs-err/tokio", "backoff/tokio"]
|
tokio = ["dep:tokio", "fs-err/tokio", "backoff/tokio"]
|
||||||
|
|
|
||||||
|
|
@ -314,8 +314,8 @@ pub fn is_temporary(path: impl AsRef<Path>) -> bool {
|
||||||
pub struct LockedFile(fs_err::File);
|
pub struct LockedFile(fs_err::File);
|
||||||
|
|
||||||
impl LockedFile {
|
impl LockedFile {
|
||||||
pub fn acquire(path: impl AsRef<Path>, resource: impl Display) -> Result<Self, std::io::Error> {
|
/// Inner implementation for [`LockedFile::acquire_blocking`] and [`LockedFile::acquire`].
|
||||||
let file = fs_err::File::create(path.as_ref())?;
|
fn lock_file_blocking(file: fs_err::File, resource: &str) -> Result<Self, std::io::Error> {
|
||||||
trace!("Checking lock for `{resource}`");
|
trace!("Checking lock for `{resource}`");
|
||||||
match file.file().try_lock_exclusive() {
|
match file.file().try_lock_exclusive() {
|
||||||
Ok(()) => {
|
Ok(()) => {
|
||||||
|
|
@ -328,19 +328,42 @@ impl LockedFile {
|
||||||
warn_user!(
|
warn_user!(
|
||||||
"Waiting to acquire lock for {} (lockfile: {})",
|
"Waiting to acquire lock for {} (lockfile: {})",
|
||||||
resource,
|
resource,
|
||||||
path.user_display(),
|
file.path().user_display(),
|
||||||
);
|
);
|
||||||
file.file().lock_exclusive().map_err(|err| {
|
file.file().lock_exclusive().map_err(|err| {
|
||||||
// Not an fs_err method, we need to build our own path context
|
// Not an fs_err method, we need to build our own path context
|
||||||
std::io::Error::new(
|
std::io::Error::new(
|
||||||
std::io::ErrorKind::Other,
|
std::io::ErrorKind::Other,
|
||||||
format!("Could not lock {}: {}", path.as_ref().user_display(), err),
|
format!("Could not lock {}: {}", file.path().user_display(), err),
|
||||||
)
|
)
|
||||||
})?;
|
})?;
|
||||||
Ok(Self(file))
|
Ok(Self(file))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The same as [`LockedFile::acquire`], but for synchronous contexts. Do not use from an async
|
||||||
|
/// context, as this can block the runtime while waiting for another process to release the
|
||||||
|
/// lock.
|
||||||
|
pub fn acquire_blocking(
|
||||||
|
path: impl AsRef<Path>,
|
||||||
|
resource: impl Display,
|
||||||
|
) -> Result<Self, std::io::Error> {
|
||||||
|
let file = fs_err::File::create(path.as_ref())?;
|
||||||
|
let resource = resource.to_string();
|
||||||
|
Self::lock_file_blocking(file, &resource)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Acquire a cross-process lock for a resource using a file at the provided path.
|
||||||
|
#[cfg(feature = "tokio")]
|
||||||
|
pub async fn acquire(
|
||||||
|
path: impl AsRef<Path>,
|
||||||
|
resource: impl Display,
|
||||||
|
) -> Result<Self, std::io::Error> {
|
||||||
|
let file = fs_err::File::create(path.as_ref())?;
|
||||||
|
let resource = resource.to_string();
|
||||||
|
tokio::task::spawn_blocking(move || Self::lock_file_blocking(file, &resource)).await?
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Drop for LockedFile {
|
impl Drop for LockedFile {
|
||||||
|
|
|
||||||
|
|
@ -67,7 +67,8 @@ impl GitResolver {
|
||||||
let _lock = LockedFile::acquire(
|
let _lock = LockedFile::acquire(
|
||||||
lock_dir.join(cache_key::cache_digest(&repository_url)),
|
lock_dir.join(cache_key::cache_digest(&repository_url)),
|
||||||
&repository_url,
|
&repository_url,
|
||||||
)?;
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
// Fetch the Git repository.
|
// Fetch the Git repository.
|
||||||
let source = if let Some(reporter) = reporter {
|
let source = if let Some(reporter) = reporter {
|
||||||
|
|
|
||||||
|
|
@ -189,22 +189,23 @@ impl PythonEnvironment {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Grab a file lock for the environment to prevent concurrent writes across processes.
|
/// Grab a file lock for the environment to prevent concurrent writes across processes.
|
||||||
pub fn lock(&self) -> Result<LockedFile, std::io::Error> {
|
pub async fn lock(&self) -> Result<LockedFile, std::io::Error> {
|
||||||
if let Some(target) = self.0.interpreter.target() {
|
if let Some(target) = self.0.interpreter.target() {
|
||||||
// If we're installing into a `--target`, use a target-specific lockfile.
|
// If we're installing into a `--target`, use a target-specific lockfile.
|
||||||
LockedFile::acquire(target.root().join(".lock"), target.root().user_display())
|
LockedFile::acquire(target.root().join(".lock"), target.root().user_display()).await
|
||||||
} else if let Some(prefix) = self.0.interpreter.prefix() {
|
} else if let Some(prefix) = self.0.interpreter.prefix() {
|
||||||
// Likewise, if we're installing into a `--prefix`, use a prefix-specific lockfile.
|
// Likewise, if we're installing into a `--prefix`, use a prefix-specific lockfile.
|
||||||
LockedFile::acquire(prefix.root().join(".lock"), prefix.root().user_display())
|
LockedFile::acquire(prefix.root().join(".lock"), prefix.root().user_display()).await
|
||||||
} else if self.0.interpreter.is_virtualenv() {
|
} else if self.0.interpreter.is_virtualenv() {
|
||||||
// If the environment a virtualenv, use a virtualenv-specific lockfile.
|
// If the environment a virtualenv, use a virtualenv-specific lockfile.
|
||||||
LockedFile::acquire(self.0.root.join(".lock"), self.0.root.user_display())
|
LockedFile::acquire(self.0.root.join(".lock"), self.0.root.user_display()).await
|
||||||
} else {
|
} else {
|
||||||
// Otherwise, use a global lockfile.
|
// Otherwise, use a global lockfile.
|
||||||
LockedFile::acquire(
|
LockedFile::acquire(
|
||||||
env::temp_dir().join(format!("uv-{}.lock", cache_key::cache_digest(&self.0.root))),
|
env::temp_dir().join(format!("uv-{}.lock", cache_key::cache_digest(&self.0.root))),
|
||||||
self.0.root.user_display(),
|
self.0.root.user_display(),
|
||||||
)
|
)
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -124,7 +124,7 @@ impl PythonInstallation {
|
||||||
let installations = ManagedPythonInstallations::from_settings()?.init()?;
|
let installations = ManagedPythonInstallations::from_settings()?.init()?;
|
||||||
let installations_dir = installations.root();
|
let installations_dir = installations.root();
|
||||||
let cache_dir = installations.cache();
|
let cache_dir = installations.cache();
|
||||||
let _lock = installations.acquire_lock()?;
|
let _lock = installations.lock().await?;
|
||||||
|
|
||||||
let download = ManagedPythonDownload::from_request(&request)?;
|
let download = ManagedPythonDownload::from_request(&request)?;
|
||||||
let client = client_builder.build();
|
let client = client_builder.build();
|
||||||
|
|
|
||||||
|
|
@ -69,12 +69,10 @@ impl ManagedPythonInstallations {
|
||||||
Self { root: root.into() }
|
Self { root: root.into() }
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Lock the toolchains directory.
|
/// Grab a file lock for the managed Python distribution directory to prevent concurrent access
|
||||||
pub fn acquire_lock(&self) -> Result<LockedFile, Error> {
|
/// across processes.
|
||||||
Ok(LockedFile::acquire(
|
pub async fn lock(&self) -> Result<LockedFile, Error> {
|
||||||
self.root.join(".lock"),
|
Ok(LockedFile::acquire(self.root.join(".lock"), self.root.user_display()).await?)
|
||||||
self.root.user_display(),
|
|
||||||
)?)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Prefer, in order:
|
/// Prefer, in order:
|
||||||
|
|
|
||||||
|
|
@ -139,12 +139,9 @@ impl InstalledTools {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Lock the tools directory.
|
/// Grab a file lock for the tools directory to prevent concurrent access across processes.
|
||||||
pub fn acquire_lock(&self) -> Result<LockedFile, Error> {
|
pub async fn lock(&self) -> Result<LockedFile, Error> {
|
||||||
Ok(LockedFile::acquire(
|
Ok(LockedFile::acquire(self.root.join(".lock"), self.root.user_display()).await?)
|
||||||
self.root.join(".lock"),
|
|
||||||
self.root.user_display(),
|
|
||||||
)?)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Add a receipt for a tool.
|
/// Add a receipt for a tool.
|
||||||
|
|
|
||||||
|
|
@ -183,7 +183,7 @@ pub(crate) async fn pip_install(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let _lock = environment.lock()?;
|
let _lock = environment.lock().await?;
|
||||||
|
|
||||||
// Determine the markers to use for the resolution.
|
// Determine the markers to use for the resolution.
|
||||||
let interpreter = environment.interpreter();
|
let interpreter = environment.interpreter();
|
||||||
|
|
|
||||||
|
|
@ -174,7 +174,7 @@ pub(crate) async fn pip_sync(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let _lock = environment.lock()?;
|
let _lock = environment.lock().await?;
|
||||||
|
|
||||||
let interpreter = environment.interpreter();
|
let interpreter = environment.interpreter();
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -100,7 +100,7 @@ pub(crate) async fn pip_uninstall(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let _lock = environment.lock()?;
|
let _lock = environment.lock().await?;
|
||||||
|
|
||||||
// Index the current `site-packages` directory.
|
// Index the current `site-packages` directory.
|
||||||
let site_packages = uv_installer::SitePackages::from_environment(&environment)?;
|
let site_packages = uv_installer::SitePackages::from_environment(&environment)?;
|
||||||
|
|
|
||||||
|
|
@ -34,7 +34,7 @@ pub(crate) async fn install(
|
||||||
let installations = ManagedPythonInstallations::from_settings()?.init()?;
|
let installations = ManagedPythonInstallations::from_settings()?.init()?;
|
||||||
let installations_dir = installations.root();
|
let installations_dir = installations.root();
|
||||||
let cache_dir = installations.cache();
|
let cache_dir = installations.cache();
|
||||||
let _lock = installations.acquire_lock()?;
|
let _lock = installations.lock().await?;
|
||||||
|
|
||||||
let targets = targets.into_iter().collect::<BTreeSet<_>>();
|
let targets = targets.into_iter().collect::<BTreeSet<_>>();
|
||||||
let requests: Vec<_> = if targets.is_empty() {
|
let requests: Vec<_> = if targets.is_empty() {
|
||||||
|
|
|
||||||
|
|
@ -23,7 +23,7 @@ pub(crate) async fn uninstall(
|
||||||
printer: Printer,
|
printer: Printer,
|
||||||
) -> Result<ExitStatus> {
|
) -> Result<ExitStatus> {
|
||||||
let installations = ManagedPythonInstallations::from_settings()?.init()?;
|
let installations = ManagedPythonInstallations::from_settings()?.init()?;
|
||||||
let _lock = installations.acquire_lock()?;
|
let _lock = installations.lock().await?;
|
||||||
|
|
||||||
// Perform the uninstallation.
|
// Perform the uninstallation.
|
||||||
do_uninstall(&installations, targets, all, printer).await?;
|
do_uninstall(&installations, targets, all, printer).await?;
|
||||||
|
|
|
||||||
|
|
@ -239,7 +239,7 @@ pub(crate) async fn install(
|
||||||
let options = ToolOptions::from(options);
|
let options = ToolOptions::from(options);
|
||||||
|
|
||||||
let installed_tools = InstalledTools::from_settings()?.init()?;
|
let installed_tools = InstalledTools::from_settings()?.init()?;
|
||||||
let _lock = installed_tools.acquire_lock()?;
|
let _lock = installed_tools.lock().await?;
|
||||||
|
|
||||||
// Find the existing receipt, if it exists. If the receipt is present but malformed, we'll
|
// Find the existing receipt, if it exists. If the receipt is present but malformed, we'll
|
||||||
// remove the environment and continue with the install.
|
// remove the environment and continue with the install.
|
||||||
|
|
|
||||||
|
|
@ -14,7 +14,7 @@ use crate::printer::Printer;
|
||||||
/// List installed tools.
|
/// List installed tools.
|
||||||
pub(crate) async fn list(show_paths: bool, cache: &Cache, printer: Printer) -> Result<ExitStatus> {
|
pub(crate) async fn list(show_paths: bool, cache: &Cache, printer: Printer) -> Result<ExitStatus> {
|
||||||
let installed_tools = InstalledTools::from_settings()?;
|
let installed_tools = InstalledTools::from_settings()?;
|
||||||
let _lock = match installed_tools.acquire_lock() {
|
let _lock = match installed_tools.lock().await {
|
||||||
Ok(lock) => lock,
|
Ok(lock) => lock,
|
||||||
Err(uv_tool::Error::Io(err)) if err.kind() == std::io::ErrorKind::NotFound => {
|
Err(uv_tool::Error::Io(err)) if err.kind() == std::io::ErrorKind::NotFound => {
|
||||||
writeln!(printer.stderr(), "No tools installed")?;
|
writeln!(printer.stderr(), "No tools installed")?;
|
||||||
|
|
|
||||||
|
|
@ -424,7 +424,7 @@ async fn get_or_create_environment(
|
||||||
// Check if the tool is already installed in a compatible environment.
|
// Check if the tool is already installed in a compatible environment.
|
||||||
if !isolated && !target.is_latest() {
|
if !isolated && !target.is_latest() {
|
||||||
let installed_tools = InstalledTools::from_settings()?.init()?;
|
let installed_tools = InstalledTools::from_settings()?.init()?;
|
||||||
let _lock = installed_tools.acquire_lock()?;
|
let _lock = installed_tools.lock().await?;
|
||||||
|
|
||||||
let existing_environment =
|
let existing_environment =
|
||||||
installed_tools
|
installed_tools
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,7 @@ use crate::printer::Printer;
|
||||||
/// Uninstall a tool.
|
/// Uninstall a tool.
|
||||||
pub(crate) async fn uninstall(name: Option<PackageName>, printer: Printer) -> Result<ExitStatus> {
|
pub(crate) async fn uninstall(name: Option<PackageName>, printer: Printer) -> Result<ExitStatus> {
|
||||||
let installed_tools = InstalledTools::from_settings()?.init()?;
|
let installed_tools = InstalledTools::from_settings()?.init()?;
|
||||||
let _lock = match installed_tools.acquire_lock() {
|
let _lock = match installed_tools.lock().await {
|
||||||
Ok(lock) => lock,
|
Ok(lock) => lock,
|
||||||
Err(uv_tool::Error::Io(err)) if err.kind() == std::io::ErrorKind::NotFound => {
|
Err(uv_tool::Error::Io(err)) if err.kind() == std::io::ErrorKind::NotFound => {
|
||||||
if let Some(name) = name {
|
if let Some(name) = name {
|
||||||
|
|
|
||||||
|
|
@ -32,7 +32,7 @@ pub(crate) async fn upgrade(
|
||||||
printer: Printer,
|
printer: Printer,
|
||||||
) -> Result<ExitStatus> {
|
) -> Result<ExitStatus> {
|
||||||
let installed_tools = InstalledTools::from_settings()?.init()?;
|
let installed_tools = InstalledTools::from_settings()?.init()?;
|
||||||
let _lock = installed_tools.acquire_lock()?;
|
let _lock = installed_tools.lock().await?;
|
||||||
|
|
||||||
let names: BTreeSet<PackageName> =
|
let names: BTreeSet<PackageName> =
|
||||||
name.map(|name| BTreeSet::from_iter([name]))
|
name.map(|name| BTreeSet::from_iter([name]))
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue