From c79ce523a062263ad4ec26d4a4c05a3047087cd3 Mon Sep 17 00:00:00 2001
From: konstin
Date: Fri, 12 Dec 2025 18:39:11 +0100
Subject: [PATCH] Use the same retry logic across uv

We were using slightly different retry code in multiple places, which
this PR unifies. It also fixes retry undercounting in publish when the
retry middleware was involved.
---
 Cargo.lock                            |  1 -
 crates/uv-bin-install/Cargo.toml      |  1 -
 crates/uv-bin-install/src/lib.rs      | 35 +++----------
 crates/uv-client/src/base_client.rs   | 68 +++++++++++++++++++++--
 crates/uv-client/src/cached_client.rs | 79 +++++----------------------
 crates/uv-client/src/lib.rs           |  4 +-
 crates/uv-publish/src/lib.rs          | 34 +++++-------
 crates/uv-python/src/downloads.rs     | 64 +++++++---------------
 crates/uv-redacted/src/lib.rs         |  6 ++
 9 files changed, 125 insertions(+), 167 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 9bb2813f1..736d88e3a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5607,7 +5607,6 @@ dependencies = [
  "thiserror 2.0.17",
  "tokio",
  "tokio-util",
- "tracing",
  "url",
  "uv-cache",
  "uv-client",
diff --git a/crates/uv-bin-install/Cargo.toml b/crates/uv-bin-install/Cargo.toml
index 7d9487abd..b51f55ef8 100644
--- a/crates/uv-bin-install/Cargo.toml
+++ b/crates/uv-bin-install/Cargo.toml
@@ -34,5 +34,4 @@ tempfile = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
 tokio-util = { workspace = true }
-tracing = { workspace = true }
 url = { workspace = true }
diff --git a/crates/uv-bin-install/src/lib.rs b/crates/uv-bin-install/src/lib.rs
index 1a26b0a56..d3caff2eb 100644
--- a/crates/uv-bin-install/src/lib.rs
+++ b/crates/uv-bin-install/src/lib.rs
@@ -6,21 +6,18 @@ use std::path::PathBuf;
 use std::pin::Pin;
 use std::task::{Context, Poll};
-use std::time::{Duration, SystemTime};
 
 use futures::TryStreamExt;
-use reqwest_retry::RetryPolicy;
 use reqwest_retry::policies::ExponentialBackoff;
 use std::fmt;
 use thiserror::Error;
 use tokio::io::{AsyncRead, ReadBuf};
 use tokio_util::compat::FuturesAsyncReadCompatExt;
-use tracing::debug;
 use url::Url;
 use uv_distribution_filename::SourceDistExtension;
 
 use uv_cache::{Cache, CacheBucket, CacheEntry};
-use uv_client::{BaseClient, is_transient_network_error};
+use uv_client::{BaseClient, RetryState};
 use uv_extract::{Error as ExtractError, stream};
 use uv_fs::LockedFileError;
 use uv_pep440::Version;
@@ -142,7 +139,7 @@ pub enum Error {
     #[error("Failed to detect platform")]
     Platform(#[from] uv_platform::Error),
 
-    #[error("Attempt failed after {retries} {subject}", subject = if *retries > 1 { "retries" } else { "retry" })]
+    #[error("Request failed after {retries} {subject}", subject = if *retries > 1 { "retries" } else { "retry" })]
     RetriedError {
         #[source]
         err: Box<Error>,
@@ -243,9 +240,7 @@ async fn download_and_unpack_with_retry(
     download_url: &Url,
     cache_entry: &CacheEntry,
 ) -> Result<PathBuf, Error> {
-    let mut total_attempts = 0;
-    let mut retried_here = false;
-    let start_time = SystemTime::now();
+    let mut retry_state = RetryState::new(*retry_policy, download_url.clone());
 
     loop {
         let result = download_and_unpack(
@@ -264,30 +259,14 @@ async fn download_and_unpack_with_retry(
         let result = match result {
             Ok(path) => Ok(path),
             Err(err) => {
-                total_attempts += err.attempts();
-                let past_retries = total_attempts - 1;
-
-                if is_transient_network_error(&err) {
-                    let retry_decision = retry_policy.should_retry(start_time, past_retries);
-                    if let reqwest_retry::RetryDecision::Retry { execute_after } = retry_decision {
-                        debug!(
-                            "Transient failure while installing {} {}; retrying...",
-                            binary.name(),
-                            version
-                        );
-                        let duration = execute_after
-                            .duration_since(SystemTime::now())
-                            .unwrap_or_else(|_| Duration::default());
-                        tokio::time::sleep(duration).await;
-                        retried_here = true;
-                        continue;
-                    }
+                if retry_state.should_retry(&err, err.attempts()).await {
+                    continue;
                 }
 
-                if retried_here {
+                if retry_state.total_retries() > 0 {
                     Err(Error::RetriedError {
                         err: Box::new(err),
-                        retries: past_retries,
+                        retries: retry_state.total_retries(),
                     })
                 } else {
                     Err(err)
diff --git a/crates/uv-client/src/base_client.rs b/crates/uv-client/src/base_client.rs
index e57a53702..790c108a3 100644
--- a/crates/uv-client/src/base_client.rs
+++ b/crates/uv-client/src/base_client.rs
@@ -4,7 +4,7 @@ use std::fmt::Write;
 use std::num::ParseIntError;
 use std::path::Path;
 use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, SystemTime};
 use std::{env, io, iter};
 
 use anyhow::anyhow;
@@ -20,7 +20,7 @@ use reqwest::{Client, ClientBuilder, IntoUrl, Proxy, Request, Response, multipart};
 use reqwest_middleware::{ClientWithMiddleware, Middleware};
 use reqwest_retry::policies::ExponentialBackoff;
 use reqwest_retry::{
-    RetryTransientMiddleware, Retryable, RetryableStrategy, default_on_request_error,
+    RetryPolicy, RetryTransientMiddleware, Retryable, RetryableStrategy, default_on_request_error,
     default_on_request_success,
 };
 use thiserror::Error;
@@ -1139,8 +1139,66 @@ fn retryable_on_request_failure(err: &(dyn Error + 'static)) -> Option<Retryable> {
     }
 }
 
-pub fn is_transient_network_error(err: &(dyn Error + 'static)) -> bool {
-    retryable_on_request_failure(err) == Some(Retryable::Transient)
+/// Decide whether a request should be retried, tracking previous attempts and backoff.
+pub struct RetryState {
+    retry_policy: ExponentialBackoff,
+    start_time: SystemTime,
+    total_retries: u32,
+    url: DisplaySafeUrl,
+}
+
+impl RetryState {
+    pub fn new(retry_policy: ExponentialBackoff, url: impl Into<DisplaySafeUrl>) -> Self {
+        Self {
+            retry_policy,
+            start_time: SystemTime::now(),
+            total_retries: 0,
+            url: url.into(),
+        }
+    }
+
+    /// The number of retries across all requests.
+    ///
+    /// After a failed retryable request, this equals the policy's maximum number of retries.
+    pub fn total_retries(&self) -> u32 {
+        self.total_retries
+    }
+
+    /// Whether the request should be retried. Waits with backoff if required.
+    ///
+    /// Takes the number of retries from nested layers associated with the specific `err` type as
+    /// `error_retries`.
+    pub async fn should_retry(&mut self, err: &(dyn Error + 'static), error_retries: u32) -> bool {
+        // Check if the middleware already performed retries
+        self.total_retries += error_retries;
+        match retryable_on_request_failure(err) {
+            Some(Retryable::Transient) => {
+                // If the middleware already retried, consider that in our retry budget
+                let retry_decision = self
+                    .retry_policy
+                    .should_retry(self.start_time, self.total_retries);
+                if let reqwest_retry::RetryDecision::Retry { execute_after } = retry_decision {
+                    let duration = execute_after
+                        .duration_since(SystemTime::now())
+                        .unwrap_or_else(|_| Duration::default());
+
+                    debug!(
+                        "Transient failure while handling response from {}; retrying after {:.1}s...",
+                        self.url,
+                        duration.as_secs_f32(),
+                    );
+                    // TODO(konsti): Should we show a spinner plus a message in the CLI while
+                    // waiting?
+                    tokio::time::sleep(duration).await;
+                    self.total_retries += 1;
+                    return true;
+                }
+
+                false
+            }
+            Some(Retryable::Fatal) | None => false,
+        }
+    }
 }
 
 /// Whether the error is a status code error that is retryable.
@@ -1410,7 +1468,7 @@ mod tests {
 
         let uv_retry = match response.error_for_status() {
             Ok(_) => false,
-            Err(err) => is_transient_network_error(&err),
+            Err(err) => retryable_on_request_failure(&err) == Some(Retryable::Transient),
         };
 
         // Ensure we're retrying the same status code as the reqwest_retry crate. We may choose
diff --git a/crates/uv-client/src/cached_client.rs b/crates/uv-client/src/cached_client.rs
index ff14437f7..4eb51e801 100644
--- a/crates/uv-client/src/cached_client.rs
+++ b/crates/uv-client/src/cached_client.rs
@@ -1,9 +1,7 @@
-use std::time::{Duration, SystemTime};
 use std::{borrow::Cow, path::Path};
 
 use futures::FutureExt;
 use reqwest::{Request, Response};
-use reqwest_retry::RetryPolicy;
 use rkyv::util::AlignedVec;
 use serde::de::DeserializeOwned;
 use serde::{Deserialize, Serialize};
@@ -14,7 +12,7 @@ use uv_fs::write_atomic;
 use uv_redacted::DisplaySafeUrl;
 
 use crate::BaseClient;
-use crate::base_client::is_transient_network_error;
+use crate::base_client::RetryState;
 use crate::error::ProblemDetails;
 use crate::{
     Error, ErrorKind,
@@ -692,9 +690,7 @@ impl CachedClient {
         cache_control: CacheControl<'_>,
         response_callback: Callback,
     ) -> Result<Payload::Target, CachedClientError<CallBackError>> {
-        let mut total_retries = 0;
-        let start_time = SystemTime::now();
-        let retry_policy = self.uncached().retry_policy();
+        let mut retry_state = RetryState::new(self.uncached().retry_policy(), req.url().clone());
         loop {
             let fresh_req = req.try_clone().expect("HTTP request must be cloneable");
             let result = self
@@ -704,34 +700,10 @@ impl CachedClient {
             match result {
                 Ok(ok) => return Ok(ok),
                 Err(err) => {
-                    // Check if the middleware already performed retries
-                    total_retries += err.retries();
-                    if is_transient_network_error(err.error()) {
-                        // If middleware already retried, consider that in our retry budget
-                        let retry_decision = retry_policy.should_retry(start_time, total_retries);
-                        if let reqwest_retry::RetryDecision::Retry { execute_after } =
-                            retry_decision
-                        {
-                            let duration = execute_after
-                                .duration_since(SystemTime::now())
-                                .unwrap_or_else(|_| Duration::default());
-
-                            debug!(
-                                "Transient failure while handling response from {}; retrying after {:.1}s...",
-                                req.url(),
-                                duration.as_secs_f32(),
-                            );
-                            tokio::time::sleep(duration).await;
-                            total_retries += 1;
-                            continue;
-                        }
+                    if retry_state.should_retry(err.error(), err.retries()).await {
+                        continue;
                     }
-
-                    return if total_retries > 0 {
-                        Err(err.with_retries(total_retries))
-                    } else {
-                        Err(err)
-                    };
+                    return Err(err.with_retries(retry_state.total_retries()));
                 }
             }
         }
@@ -751,47 +723,22 @@ impl CachedClient {
         cache_control: CacheControl<'_>,
         response_callback: Callback,
     ) -> Result<Payload::Target, CachedClientError<CallBackError>> {
-        let mut past_retries = 0;
-        let start_time = SystemTime::now();
-        let retry_policy = self.uncached().retry_policy();
+        let mut retry_state = RetryState::new(self.uncached().retry_policy(), req.url().clone());
         loop {
             let fresh_req = req.try_clone().expect("HTTP request must be cloneable");
             let result = self
                 .skip_cache(fresh_req, cache_entry, cache_control, &response_callback)
                 .await;
 
-            // Check if the middleware already performed retries
-            let middleware_retries = match &result {
-                Err(err) => err.retries(),
-                _ => 0,
-            };
-
-            if result
-                .as_ref()
-                .is_err_and(|err| is_transient_network_error(err.error()))
-            {
-                let total_retries = past_retries + middleware_retries;
-                let retry_decision = retry_policy.should_retry(start_time, total_retries);
-                if let reqwest_retry::RetryDecision::Retry { execute_after } = retry_decision {
-                    let duration = execute_after
-                        .duration_since(SystemTime::now())
-                        .unwrap_or_else(|_| Duration::default());
-                    debug!(
-                        "Transient failure while handling response from {}; retrying after {}s...",
-                        req.url(),
-                        duration.as_secs(),
-                    );
-                    tokio::time::sleep(duration).await;
-                    past_retries += 1;
-                    continue;
+            match result {
+                Ok(ok) => return Ok(ok),
+                Err(err) => {
+                    if retry_state.should_retry(err.error(), err.retries()).await {
+                        continue;
+                    }
+                    return Err(err.with_retries(retry_state.total_retries()));
                 }
             }
-
-            if past_retries > 0 {
-                return result.map_err(|err| err.with_retries(past_retries));
-            }
-
-            return result;
         }
     }
 }
diff --git a/crates/uv-client/src/lib.rs b/crates/uv-client/src/lib.rs
index 2862fc6d1..fcf94c8d6 100644
--- a/crates/uv-client/src/lib.rs
+++ b/crates/uv-client/src/lib.rs
@@ -1,7 +1,7 @@
 pub use base_client::{
     AuthIntegration, BaseClient, BaseClientBuilder, DEFAULT_RETRIES, ExtraMiddleware,
-    RedirectClientWithMiddleware, RequestBuilder, RetryParsingError, UvRetryableStrategy,
-    is_transient_network_error,
+    RedirectClientWithMiddleware, RequestBuilder, RetryParsingError, RetryState,
+    UvRetryableStrategy,
 };
 pub use cached_client::{CacheControl, CachedClient, CachedClientError, DataWithCachePolicy};
 pub use error::{Error, ErrorKind, WrappedReqwestError};
diff --git a/crates/uv-publish/src/lib.rs b/crates/uv-publish/src/lib.rs
index 202ed6036..5980af37b 100644
--- a/crates/uv-publish/src/lib.rs
+++ b/crates/uv-publish/src/lib.rs
@@ -3,7 +3,6 @@ mod trusted_publishing;
 use std::collections::BTreeSet;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
-use std::time::{Duration, SystemTime};
 use std::{fmt, io};
 
 use fs_err::tokio::File;
@@ -13,7 +12,7 @@ use itertools::Itertools;
 use reqwest::header::AUTHORIZATION;
 use reqwest::multipart::Part;
 use reqwest::{Body, Response, StatusCode};
-use reqwest_retry::RetryPolicy;
+use reqwest_retry::RetryError;
 use reqwest_retry::policies::ExponentialBackoff;
 use rustc_hash::FxHashMap;
 use serde::Deserialize;
@@ -28,7 +27,7 @@ use uv_auth::{Credentials, PyxTokenStore};
 use uv_cache::{Cache, Refresh};
 use uv_client::{
     BaseClient, MetadataFormat, OwnedArchive, RegistryClientBuilder, RequestBuilder,
-    RetryParsingError, is_transient_network_error,
+    RetryParsingError, RetryState,
 };
 use uv_configuration::{KeyringProviderType, TrustedPublishing};
 use uv_distribution_filename::{DistFilename, SourceDistExtension, SourceDistFilename};
@@ -469,8 +468,8 @@ pub async fn upload(
     download_concurrency: &Semaphore,
     reporter: Arc<impl Reporter>,
 ) -> Result<bool, PublishError> {
-    let mut n_past_retries = 0;
-    let start_time = SystemTime::now();
+    let mut retry_state = RetryState::new(retry_policy, registry.clone());
+
     loop {
         let (request, idx) = build_upload_request(
             group,
@@ -490,23 +489,16 @@
                 response
             }
             Err(err) => {
-                if is_transient_network_error(&err) {
-                    let retry_decision = retry_policy.should_retry(start_time, n_past_retries);
-                    if let reqwest_retry::RetryDecision::Retry { execute_after } = retry_decision {
-                        let duration = execute_after
-                            .duration_since(SystemTime::now())
-                            .unwrap_or_else(|_| Duration::default());
-                        warn_user!(
-                            "Transient failure while handling response for {}; retrying after {}s...",
-                            registry,
-                            duration.as_secs()
-                        );
-                        tokio::time::sleep(duration).await;
-                        n_past_retries += 1;
-                        continue;
-                    }
+                let middleware_retries = if let Some(RetryError::WithRetries { retries, .. }) =
+                    (&err as &dyn std::error::Error).downcast_ref::<RetryError>()
+                {
+                    *retries
+                } else {
+                    0
+                };
+                if retry_state.should_retry(&err, middleware_retries).await {
+                    continue;
                 }
-
                 return Err(PublishError::PublishSend(
                     group.file.clone(),
                     registry.clone().into(),
diff --git a/crates/uv-python/src/downloads.rs b/crates/uv-python/src/downloads.rs
index 9c687e688..2e9edf964 100644
--- a/crates/uv-python/src/downloads.rs
+++ b/crates/uv-python/src/downloads.rs
@@ -5,14 +5,13 @@ use std::path::{Path, PathBuf};
 use std::pin::Pin;
 use std::str::FromStr;
 use std::task::{Context, Poll};
-use std::time::{Duration, SystemTime};
 use std::{env, io};
 
 use futures::TryStreamExt;
 use itertools::Itertools;
 use owo_colors::OwoColorize;
+use reqwest_retry::RetryError;
 use reqwest_retry::policies::ExponentialBackoff;
-use reqwest_retry::{RetryError, RetryPolicy};
 use serde::Deserialize;
 use thiserror::Error;
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufWriter, ReadBuf};
@@ -21,7 +20,7 @@ use tokio_util::either::Either;
 use tracing::{debug, instrument};
 use url::Url;
 
-use uv_client::{BaseClient, WrappedReqwestError, is_transient_network_error};
+use uv_client::{BaseClient, RetryState, WrappedReqwestError};
 use uv_distribution_filename::{ExtensionError, SourceDistExtension};
 use uv_extract::hash::Hasher;
 use uv_fs::{Simplified, rename_with_retry};
@@ -118,26 +117,24 @@ pub enum Error {
 }
 
 impl Error {
-    // Return the number of attempts that were made to complete this request before this error was
-    // returned. Note that e.g. 3 retries equates to 4 attempts.
+    // Return the number of retries that were made to complete this request before this error was
+    // returned.
     //
-    // It's easier to do arithmetic with "attempts" instead of "retries", because if you have
-    // nested retry loops you can just add up all the attempts directly, while adding up the
-    // retries requires +1/-1 adjustments.
-    fn attempts(&self) -> u32 {
+    // Note that e.g. 3 retries equates to 4 attempts.
+    fn retries(&self) -> u32 {
         // Unfortunately different variants of `Error` track retry counts in different ways. We
         // could consider unifying the variants we handle here in `Error::from_reqwest_middleware`
         // instead, but both approaches will be fragile as new variants get added over time.
         if let Self::NetworkErrorWithRetries { retries, .. } = self {
-            return retries + 1;
+            return *retries;
         }
         if let Self::NetworkMiddlewareError(_, anyhow_error) = self
             && let Some(RetryError::WithRetries { retries, .. }) =
                 anyhow_error.downcast_ref::<RetryError>()
         {
-            return retries + 1;
+            return *retries;
         }
-        1
+        0
     }
 }
@@ -1108,9 +1105,11 @@ impl ManagedPythonDownload {
         pypy_install_mirror: Option<&str>,
         reporter: Option<&dyn Reporter>,
     ) -> Result<DownloadResult, Error> {
-        let mut total_attempts = 0;
-        let mut retried_here = false;
-        let start_time = SystemTime::now();
+        let mut retry_state = RetryState::new(
+            *retry_policy,
+            self.download_url(python_install_mirror, pypy_install_mirror)?,
+        );
+
         loop {
             let result = self
                 .fetch(
@@ -1123,43 +1122,22 @@ impl ManagedPythonDownload {
                     reporter,
                 )
                 .await;
-            let result = match result {
-                Ok(download_result) => Ok(download_result),
+            match result {
+                Ok(download_result) => return Ok(download_result),
                 Err(err) => {
-                    // Inner retry loops (e.g. `reqwest-retry` middleware) might make more than one
-                    // attempt per error we see here.
-                    total_attempts += err.attempts();
-                    // We currently interpret e.g. "3 retries" to mean we should make 4 attempts.
-                    let n_past_retries = total_attempts - 1;
-                    if is_transient_network_error(&err) {
-                        let retry_decision = retry_policy.should_retry(start_time, n_past_retries);
-                        if let reqwest_retry::RetryDecision::Retry { execute_after } =
-                            retry_decision
-                        {
-                            let duration = execute_after
-                                .duration_since(SystemTime::now())
-                                .unwrap_or_else(|_| Duration::default());
-                            debug!(
-                                "Transient failure while handling response for {}; retrying after {}s...",
-                                self.key(),
-                                duration.as_secs()
-                            );
-                            tokio::time::sleep(duration).await;
-                            retried_here = true;
-                            continue; // Retry.
-                        }
+                    if retry_state.should_retry(&err, err.retries()).await {
+                        continue;
                     }
 
-                    if retried_here {
+                    return if retry_state.total_retries() > 0 {
                         Err(Error::NetworkErrorWithRetries {
                             err: Box::new(err),
-                            retries: n_past_retries,
+                            retries: retry_state.total_retries(),
                         })
                     } else {
                         Err(err)
-                    }
+                    };
                 }
             };
-            return result;
         }
     }
diff --git a/crates/uv-redacted/src/lib.rs b/crates/uv-redacted/src/lib.rs
index ddd1a5dfc..9a5adf280 100644
--- a/crates/uv-redacted/src/lib.rs
+++ b/crates/uv-redacted/src/lib.rs
@@ -265,6 +265,12 @@ impl From<DisplaySafeUrl> for Url {
     }
 }
 
+impl From<Url> for DisplaySafeUrl {
+    fn from(url: Url) -> Self {
+        Self(url)
+    }
+}
+
 impl FromStr for DisplaySafeUrl {
     type Err = DisplaySafeUrlError;
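
Usage note (review aid, not part of the patch itself): all five call sites above converge on the same loop shape. The sketch below shows that pattern against the `RetryState` API added in base_client.rs (`new`, `should_retry`, `total_retries`). `Archive`, `FetchError`, and `fetch_archive` are hypothetical stand-ins for a fallible download step; `BaseClient::retry_policy()` is the accessor the cached client calls above.

    use std::error::Error;
    use std::fmt;

    use uv_client::{BaseClient, RetryState};
    use uv_redacted::DisplaySafeUrl;

    // Hypothetical stand-ins for a fallible download step and its result.
    struct Archive;

    #[derive(Debug)]
    struct FetchError;

    impl fmt::Display for FetchError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            f.write_str("fetch failed")
        }
    }

    impl Error for FetchError {}

    impl FetchError {
        // Retries already performed by nested layers for this error, 0 if none.
        fn retries(&self) -> u32 {
            0
        }
    }

    async fn fetch_archive(
        _client: &BaseClient,
        _url: &DisplaySafeUrl,
    ) -> Result<Archive, FetchError> {
        // A real implementation would send the request here.
        Err(FetchError)
    }

    // The retry-loop shape shared by the call sites in this patch.
    async fn fetch_with_retry(
        client: &BaseClient,
        url: DisplaySafeUrl,
    ) -> Result<Archive, FetchError> {
        let mut retry_state = RetryState::new(client.retry_policy(), url.clone());
        loop {
            match fetch_archive(client, &url).await {
                Ok(archive) => return Ok(archive),
                Err(err) => {
                    // Credit retries that nested layers already performed, then
                    // sleep with backoff if the policy allows another attempt.
                    if retry_state.should_retry(&err, err.retries()).await {
                        continue;
                    }
                    return Err(err);
                }
            }
        }
    }

The design choice is to replace the per-call-site "attempts" arithmetic with a single counter owned by `RetryState`: nested layers are credited through `error_retries`, which removes the +1/-1 adjustments the old downloads.rs comment complained about.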
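The publish undercounting fix is then a matter of feeding the middleware's own count into that state machine. `upload` recovers it by downcasting to reqwest-retry's `RetryError`; a sketch of that extraction, assuming (as in uv-publish above) the count surfaces as a top-level `RetryError::WithRetries`:

    use reqwest_retry::RetryError;

    // Retries the reqwest-retry middleware already performed for this error,
    // or 0 if the error did not pass through the retry middleware.
    fn middleware_retries(err: &(dyn std::error::Error + 'static)) -> u32 {
        match err.downcast_ref::<RetryError>() {
            Some(RetryError::WithRetries { retries, .. }) => *retries,
            _ => 0,
        }
    }

Previously `upload` counted only its own sleeps (`n_past_retries`), so retries performed inside the middleware neither counted against the backoff budget nor appeared in the reported retry total.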