diff --git a/Cargo.lock b/Cargo.lock
index 9bb2813f1..736d88e3a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5607,7 +5607,6 @@ dependencies = [
  "thiserror 2.0.17",
  "tokio",
  "tokio-util",
- "tracing",
  "url",
  "uv-cache",
  "uv-client",
diff --git a/crates/uv-bin-install/Cargo.toml b/crates/uv-bin-install/Cargo.toml
index 7d9487abd..b51f55ef8 100644
--- a/crates/uv-bin-install/Cargo.toml
+++ b/crates/uv-bin-install/Cargo.toml
@@ -34,5 +34,4 @@ tempfile = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
 tokio-util = { workspace = true }
-tracing = { workspace = true }
 url = { workspace = true }
diff --git a/crates/uv-bin-install/src/lib.rs b/crates/uv-bin-install/src/lib.rs
index 1a26b0a56..d3caff2eb 100644
--- a/crates/uv-bin-install/src/lib.rs
+++ b/crates/uv-bin-install/src/lib.rs
@@ -6,21 +6,18 @@
 use std::path::PathBuf;
 use std::pin::Pin;
 use std::task::{Context, Poll};
-use std::time::{Duration, SystemTime};
 
 use futures::TryStreamExt;
-use reqwest_retry::RetryPolicy;
 use reqwest_retry::policies::ExponentialBackoff;
 use std::fmt;
 use thiserror::Error;
 use tokio::io::{AsyncRead, ReadBuf};
 use tokio_util::compat::FuturesAsyncReadCompatExt;
-use tracing::debug;
 use url::Url;
 use uv_distribution_filename::SourceDistExtension;
 
 use uv_cache::{Cache, CacheBucket, CacheEntry};
-use uv_client::{BaseClient, is_transient_network_error};
+use uv_client::{BaseClient, RetryState};
 use uv_extract::{Error as ExtractError, stream};
 use uv_fs::LockedFileError;
 use uv_pep440::Version;
@@ -142,7 +139,7 @@ pub enum Error {
     #[error("Failed to detect platform")]
     Platform(#[from] uv_platform::Error),
 
-    #[error("Attempt failed after {retries} {subject}", subject = if *retries > 1 { "retries" } else { "retry" })]
+    #[error("Request failed after {retries} {subject}", subject = if *retries > 1 { "retries" } else { "retry" })]
     RetriedError {
         #[source]
         err: Box<Error>,
@@ -243,9 +240,7 @@ async fn download_and_unpack_with_retry(
     download_url: &Url,
     cache_entry: &CacheEntry,
 ) -> Result<PathBuf, Error> {
-    let mut total_attempts = 0;
-    let mut retried_here = false;
-    let start_time = SystemTime::now();
+    let mut retry_state = RetryState::new(*retry_policy, download_url.clone());
 
     loop {
         let result = download_and_unpack(
@@ -264,30 +259,14 @@ async fn download_and_unpack_with_retry(
         let result = match result {
             Ok(path) => Ok(path),
             Err(err) => {
-                total_attempts += err.attempts();
-                let past_retries = total_attempts - 1;
-
-                if is_transient_network_error(&err) {
-                    let retry_decision = retry_policy.should_retry(start_time, past_retries);
-                    if let reqwest_retry::RetryDecision::Retry { execute_after } = retry_decision {
-                        debug!(
-                            "Transient failure while installing {} {}; retrying...",
-                            binary.name(),
-                            version
-                        );
-                        let duration = execute_after
-                            .duration_since(SystemTime::now())
-                            .unwrap_or_else(|_| Duration::default());
-                        tokio::time::sleep(duration).await;
-                        retried_here = true;
-                        continue;
-                    }
+                if retry_state.should_retry(&err, err.attempts()).await {
+                    continue;
                 }
 
-                if retried_here {
+                if retry_state.total_retries() > 0 {
                     Err(Error::RetriedError {
                         err: Box::new(err),
-                        retries: past_retries,
+                        retries: retry_state.total_retries(),
                     })
                 } else {
                     Err(err)
diff --git a/crates/uv-client/src/base_client.rs b/crates/uv-client/src/base_client.rs
index e57a53702..790c108a3 100644
--- a/crates/uv-client/src/base_client.rs
+++ b/crates/uv-client/src/base_client.rs
@@ -4,7 +4,7 @@ use std::fmt::Write;
 use std::num::ParseIntError;
 use std::path::Path;
 use std::sync::Arc;
-use std::time::Duration;
+use std::time::{Duration, SystemTime};
 use std::{env, io, iter};
 
 use anyhow::anyhow;
@@ -20,7 +20,7 @@ use reqwest::{Client, ClientBuilder, IntoUrl, Proxy, Request, Response, multipar
 use reqwest_middleware::{ClientWithMiddleware, Middleware};
 use reqwest_retry::policies::ExponentialBackoff;
 use reqwest_retry::{
-    RetryTransientMiddleware, Retryable, RetryableStrategy, default_on_request_error,
+    RetryPolicy, RetryTransientMiddleware, Retryable, RetryableStrategy, default_on_request_error,
     default_on_request_success,
 };
 use thiserror::Error;
@@ -1139,8 +1139,66 @@ fn retryable_on_request_failure(err: &(dyn Error + 'static)) -> Option<Retryable
 }
 
-pub fn is_transient_network_error(err: &(dyn Error + 'static)) -> bool {
-    retryable_on_request_failure(err) == Some(Retryable::Transient)
+/// Decide whether a request should be retried, tracking previous attempts and backoff.
+pub struct RetryState {
+    retry_policy: ExponentialBackoff,
+    start_time: SystemTime,
+    total_retries: u32,
+    url: DisplaySafeUrl,
+}
+
+impl RetryState {
+    pub fn new(retry_policy: ExponentialBackoff, url: impl Into<DisplaySafeUrl>) -> Self {
+        Self {
+            retry_policy,
+            start_time: SystemTime::now(),
+            total_retries: 0,
+            url: url.into(),
+        }
+    }
+
+    /// The number of retries across all requests.
+    ///
+    /// After a failed retryable request, this equals the maximum number of retries.
+    pub fn total_retries(&self) -> u32 {
+        self.total_retries
+    }
+
+    /// Whether the request should be retried. Waits with backoff if required.
+    ///
+    /// Takes the number of retries from nested layers associated with the specific `err` type as
+    /// `error_retries`.
+    pub async fn should_retry(&mut self, err: &(dyn Error + 'static), error_retries: u32) -> bool {
+        // Check if the middleware already performed retries
+        self.total_retries += error_retries;
+        match retryable_on_request_failure(err) {
+            Some(Retryable::Transient) => {
+                // If middleware already retried, consider that in our retry budget
+                let retry_decision = self
+                    .retry_policy
+                    .should_retry(self.start_time, self.total_retries);
+                if let reqwest_retry::RetryDecision::Retry { execute_after } = retry_decision {
+                    let duration = execute_after
+                        .duration_since(SystemTime::now())
+                        .unwrap_or_else(|_| Duration::default());
+
+                    debug!(
+                        "Transient failure while handling response from {}; retrying after {:.1}s...",
+                        self.url,
+                        duration.as_secs_f32(),
+                    );
+                    // TODO(konsti): Should we show a spinner plus a message in the CLI while
+                    // waiting?
+                    tokio::time::sleep(duration).await;
+                    self.total_retries += 1;
+                    return true;
+                }
+
+                false
+            }
+            Some(Retryable::Fatal) | None => false,
+        }
+    }
 }
 
 /// Whether the error is a status code error that is retryable.
@@ -1410,7 +1468,7 @@
        let uv_retry = match response.error_for_status() {
            Ok(_) => false,
-           Err(err) => is_transient_network_error(&err),
+           Err(err) => retryable_on_request_failure(&err) == Some(Retryable::Transient),
        };
 
        // Ensure we're retrying the same status code as the reqwest_retry crate. We may choose
diff --git a/crates/uv-client/src/cached_client.rs b/crates/uv-client/src/cached_client.rs
index ff14437f7..4eb51e801 100644
--- a/crates/uv-client/src/cached_client.rs
+++ b/crates/uv-client/src/cached_client.rs
@@ -1,9 +1,7 @@
-use std::time::{Duration, SystemTime};
 use std::{borrow::Cow, path::Path};
 
 use futures::FutureExt;
 use reqwest::{Request, Response};
-use reqwest_retry::RetryPolicy;
 use rkyv::util::AlignedVec;
 use serde::de::DeserializeOwned;
 use serde::{Deserialize, Serialize};
@@ -14,7 +12,7 @@ use uv_fs::write_atomic;
 use uv_redacted::DisplaySafeUrl;
 
 use crate::BaseClient;
-use crate::base_client::is_transient_network_error;
+use crate::base_client::RetryState;
 use crate::error::ProblemDetails;
 use crate::{
     Error, ErrorKind,
@@ -692,9 +690,7 @@ impl CachedClient {
         cache_control: CacheControl<'_>,
         response_callback: Callback,
     ) -> Result<Payload, CachedClientError<CallBackError>> {
-        let mut total_retries = 0;
-        let start_time = SystemTime::now();
-        let retry_policy = self.uncached().retry_policy();
+        let mut retry_state = RetryState::new(self.uncached().retry_policy(), req.url().clone());
         loop {
             let fresh_req = req.try_clone().expect("HTTP request must be cloneable");
             let result = self
@@ -704,34 +700,10 @@ impl CachedClient {
             match result {
                 Ok(ok) => return Ok(ok),
                 Err(err) => {
-                    // Check if the middleware already performed retries
-                    total_retries += err.retries();
-                    if is_transient_network_error(err.error()) {
-                        // If middleware already retried, consider that in our retry budget
-                        let retry_decision = retry_policy.should_retry(start_time, total_retries);
-                        if let reqwest_retry::RetryDecision::Retry { execute_after } =
-                            retry_decision
-                        {
-                            let duration = execute_after
-                                .duration_since(SystemTime::now())
-                                .unwrap_or_else(|_| Duration::default());
-
-                            debug!(
-                                "Transient failure while handling response from {}; retrying after {:.1}s...",
-                                req.url(),
-                                duration.as_secs_f32(),
-                            );
-                            tokio::time::sleep(duration).await;
-                            total_retries += 1;
-                            continue;
-                        }
+                    if retry_state.should_retry(err.error(), err.retries()).await {
+                        continue;
                     }
-
-                    return if total_retries > 0 {
-                        Err(err.with_retries(total_retries))
-                    } else {
-                        Err(err)
-                    };
+                    return Err(err.with_retries(retry_state.total_retries()));
                 }
             }
         }
@@ -751,47 +723,22 @@ impl CachedClient {
         cache_control: CacheControl<'_>,
         response_callback: Callback,
     ) -> Result<Payload, CachedClientError<CallBackError>> {
-        let mut past_retries = 0;
-        let start_time = SystemTime::now();
-        let retry_policy = self.uncached().retry_policy();
+        let mut retry_state = RetryState::new(self.uncached().retry_policy(), req.url().clone());
         loop {
             let fresh_req = req.try_clone().expect("HTTP request must be cloneable");
             let result = self
                 .skip_cache(fresh_req, cache_entry, cache_control, &response_callback)
                 .await;
 
-            // Check if the middleware already performed retries
-            let middleware_retries = match &result {
-                Err(err) => err.retries(),
-                _ => 0,
-            };
-
-            if result
-                .as_ref()
-                .is_err_and(|err| is_transient_network_error(err.error()))
-            {
-                let total_retries = past_retries + middleware_retries;
-                let retry_decision = retry_policy.should_retry(start_time, total_retries);
-                if let reqwest_retry::RetryDecision::Retry { execute_after } = retry_decision {
-                    let duration = execute_after
-                        .duration_since(SystemTime::now())
-                        .unwrap_or_else(|_| Duration::default());
-                    debug!(
-                        "Transient failure while handling response from {}; retrying after {}s...",
-                        req.url(),
-                        duration.as_secs(),
-                    );
-                    tokio::time::sleep(duration).await;
-                    past_retries += 1;
-                    continue;
+            match result {
+                Ok(ok) => return Ok(ok),
+                Err(err) => {
+                    if retry_state.should_retry(err.error(), err.retries()).await {
+                        continue;
+                    }
+                    return Err(err.with_retries(retry_state.total_retries()));
                 }
             }
-
-            if past_retries > 0 {
-                return result.map_err(|err| err.with_retries(past_retries));
-            }
-
-            return result;
         }
     }
 }
diff --git a/crates/uv-client/src/lib.rs b/crates/uv-client/src/lib.rs
index 2862fc6d1..fcf94c8d6 100644
--- a/crates/uv-client/src/lib.rs
+++ b/crates/uv-client/src/lib.rs
@@ -1,7 +1,7 @@
 pub use base_client::{
     AuthIntegration, BaseClient, BaseClientBuilder, DEFAULT_RETRIES, ExtraMiddleware,
-    RedirectClientWithMiddleware, RequestBuilder, RetryParsingError, UvRetryableStrategy,
-    is_transient_network_error,
+    RedirectClientWithMiddleware, RequestBuilder, RetryParsingError, RetryState,
+    UvRetryableStrategy,
 };
 pub use cached_client::{CacheControl, CachedClient, CachedClientError, DataWithCachePolicy};
 pub use error::{Error, ErrorKind, WrappedReqwestError};
diff --git a/crates/uv-publish/src/lib.rs b/crates/uv-publish/src/lib.rs
index 202ed6036..5980af37b 100644
--- a/crates/uv-publish/src/lib.rs
+++ b/crates/uv-publish/src/lib.rs
@@ -3,7 +3,6 @@ mod trusted_publishing;
 use std::collections::BTreeSet;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
-use std::time::{Duration, SystemTime};
 use std::{fmt, io};
 
 use fs_err::tokio::File;
@@ -13,7 +12,7 @@ use itertools::Itertools;
 use reqwest::header::AUTHORIZATION;
 use reqwest::multipart::Part;
 use reqwest::{Body, Response, StatusCode};
-use reqwest_retry::RetryPolicy;
+use reqwest_retry::RetryError;
 use reqwest_retry::policies::ExponentialBackoff;
 use rustc_hash::FxHashMap;
 use serde::Deserialize;
@@ -28,7 +27,7 @@ use uv_auth::{Credentials, PyxTokenStore};
 use uv_cache::{Cache, Refresh};
 use uv_client::{
     BaseClient, MetadataFormat, OwnedArchive, RegistryClientBuilder, RequestBuilder,
-    RetryParsingError, is_transient_network_error,
+    RetryParsingError, RetryState,
 };
 use uv_configuration::{KeyringProviderType, TrustedPublishing};
 use uv_distribution_filename::{DistFilename, SourceDistExtension, SourceDistFilename};
@@ -469,8 +468,8 @@ pub async fn upload(
     download_concurrency: &Semaphore,
     reporter: Arc<impl Reporter>,
 ) -> Result<bool, PublishError> {
-    let mut n_past_retries = 0;
-    let start_time = SystemTime::now();
+    let mut retry_state = RetryState::new(retry_policy, registry.clone());
+
    loop {
        let (request, idx) = build_upload_request(
            group,
@@ -490,23 +489,16 @@ pub async fn upload(
                response
            }
            Err(err) => {
-               if is_transient_network_error(&err) {
-                   let retry_decision = retry_policy.should_retry(start_time, n_past_retries);
-                   if let reqwest_retry::RetryDecision::Retry { execute_after } = retry_decision {
-                       let duration = execute_after
-                           .duration_since(SystemTime::now())
-                           .unwrap_or_else(|_| Duration::default());
-                       warn_user!(
-                           "Transient failure while handling response for {}; retrying after {}s...",
-                           registry,
-                           duration.as_secs()
-                       );
-                       tokio::time::sleep(duration).await;
-                       n_past_retries += 1;
-                       continue;
-                   }
+               let middleware_retries = if let Some(RetryError::WithRetries { retries, .. }) =
+                   (&err as &dyn std::error::Error).downcast_ref::<RetryError>()
+               {
+                   *retries
+               } else {
+                   0
+               };
+               if retry_state.should_retry(&err, middleware_retries).await {
+                   continue;
                }
-
                return Err(PublishError::PublishSend(
                    group.file.clone(),
                    registry.clone().into(),
diff --git a/crates/uv-python/src/downloads.rs b/crates/uv-python/src/downloads.rs
index 9c687e688..2e9edf964 100644
--- a/crates/uv-python/src/downloads.rs
+++ b/crates/uv-python/src/downloads.rs
@@ -5,14 +5,13 @@ use std::path::{Path, PathBuf};
 use std::pin::Pin;
 use std::str::FromStr;
 use std::task::{Context, Poll};
-use std::time::{Duration, SystemTime};
 use std::{env, io};
 
 use futures::TryStreamExt;
 use itertools::Itertools;
 use owo_colors::OwoColorize;
+use reqwest_retry::RetryError;
 use reqwest_retry::policies::ExponentialBackoff;
-use reqwest_retry::{RetryError, RetryPolicy};
 use serde::Deserialize;
 use thiserror::Error;
 use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufWriter, ReadBuf};
@@ -21,7 +20,7 @@ use tokio_util::either::Either;
 use tracing::{debug, instrument};
 use url::Url;
 
-use uv_client::{BaseClient, WrappedReqwestError, is_transient_network_error};
+use uv_client::{BaseClient, RetryState, WrappedReqwestError};
 use uv_distribution_filename::{ExtensionError, SourceDistExtension};
 use uv_extract::hash::Hasher;
 use uv_fs::{Simplified, rename_with_retry};
@@ -118,26 +117,24 @@ pub enum Error {
 }
 
 impl Error {
-    // Return the number of attempts that were made to complete this request before this error was
-    // returned. Note that e.g. 3 retries equates to 4 attempts.
+    // Return the number of retries that were made to complete this request before this error was
+    // returned.
     //
-    // It's easier to do arithmetic with "attempts" instead of "retries", because if you have
-    // nested retry loops you can just add up all the attempts directly, while adding up the
-    // retries requires +1/-1 adjustments.
-    fn attempts(&self) -> u32 {
+    // Note that e.g. 3 retries equates to 4 attempts.
+    fn retries(&self) -> u32 {
        // Unfortunately different variants of `Error` track retry counts in different ways. We
        // could consider unifying the variants we handle here in `Error::from_reqwest_middleware`
        // instead, but both approaches will be fragile as new variants get added over time.
        if let Self::NetworkErrorWithRetries { retries, .. } = self {
-           return retries + 1;
+           return *retries;
        }
 
        if let Self::NetworkMiddlewareError(_, anyhow_error) = self
            && let Some(RetryError::WithRetries { retries, .. }) =
                anyhow_error.downcast_ref::<RetryError>()
        {
-           return retries + 1;
+           return *retries;
        }
-       1
+       0
    }
 }
@@ -1108,9 +1105,11 @@ impl ManagedPythonDownload {
         pypy_install_mirror: Option<&str>,
         reporter: Option<&dyn Reporter>,
     ) -> Result<DownloadResult, Error> {
-        let mut total_attempts = 0;
-        let mut retried_here = false;
-        let start_time = SystemTime::now();
+        let mut retry_state = RetryState::new(
+            *retry_policy,
+            self.download_url(python_install_mirror, pypy_install_mirror)?,
+        );
+
         loop {
             let result = self
                 .fetch(
@@ -1123,43 +1122,22 @@ impl ManagedPythonDownload {
                     reporter,
                 )
                 .await;
-            let result = match result {
-                Ok(download_result) => Ok(download_result),
+            match result {
+                Ok(download_result) => return Ok(download_result),
                 Err(err) => {
-                    // Inner retry loops (e.g. `reqwest-retry` middleware) might make more than one
-                    // attempt per error we see here.
-                    total_attempts += err.attempts();
-                    // We currently interpret e.g. "3 retries" to mean we should make 4 attempts.
-                    let n_past_retries = total_attempts - 1;
-                    if is_transient_network_error(&err) {
-                        let retry_decision = retry_policy.should_retry(start_time, n_past_retries);
-                        if let reqwest_retry::RetryDecision::Retry { execute_after } =
-                            retry_decision
-                        {
-                            let duration = execute_after
-                                .duration_since(SystemTime::now())
-                                .unwrap_or_else(|_| Duration::default());
-                            debug!(
-                                "Transient failure while handling response for {}; retrying after {}s...",
-                                self.key(),
-                                duration.as_secs()
-                            );
-                            tokio::time::sleep(duration).await;
-                            retried_here = true;
-                            continue; // Retry.
-                        }
+                    if retry_state.should_retry(&err, err.retries()).await {
+                        continue;
                     }
-                    if retried_here {
+                    return if retry_state.total_retries() > 0 {
                         Err(Error::NetworkErrorWithRetries {
                             err: Box::new(err),
-                            retries: n_past_retries,
+                            retries: retry_state.total_retries(),
                         })
                     } else {
                         Err(err)
-                    }
+                    };
                 }
             };
-            return result;
         }
     }
diff --git a/crates/uv-redacted/src/lib.rs b/crates/uv-redacted/src/lib.rs
index ddd1a5dfc..9a5adf280 100644
--- a/crates/uv-redacted/src/lib.rs
+++ b/crates/uv-redacted/src/lib.rs
@@ -265,6 +265,12 @@ impl From<DisplaySafeUrl> for Url {
     }
 }
 
+impl From<Url> for DisplaySafeUrl {
+    fn from(url: Url) -> Self {
+        Self(url)
+    }
+}
+
 impl FromStr for DisplaySafeUrl {
     type Err = DisplaySafeUrlError;
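
After this change, every retry loop funnels through the shared `RetryState` helper in `uv-client` instead of duplicating the backoff logic per crate. A minimal sketch of the calling convention: `fetch_archive`, its error type, and `err.retries()` are hypothetical placeholders standing in for the real request functions in this diff, while `RetryState::new`, `should_retry`, and `total_retries` are the API introduced above.

    // Hypothetical driver loop; `retry_policy` is an `ExponentialBackoff` and
    // `url` is the request URL shown in the retry log message.
    let mut retry_state = RetryState::new(retry_policy, url.clone());
    let result = loop {
        match fetch_archive(&url).await {
            Ok(archive) => break Ok(archive),
            Err(err) => {
                // Pass retries already performed by nested middleware so they
                // count against the same backoff budget.
                if retry_state.should_retry(&err, err.retries()).await {
                    continue;
                }
                break Err(err);
            }
        }
    };

Because `should_retry` sleeps through the backoff delay itself before returning `true`, callers only decide whether to `continue`; afterwards `total_retries()` feeds error reporting such as `Error::RetriedError` above.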