mirror of https://github.com/astral-sh/uv
Use the same retry logic across uv
We were using slightly different retry code in multiple places; this PR unifies it behind a shared `RetryState`. It also fixes retry undercounting in publish when the retry middleware was involved.
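With this change every call site follows the same shape: construct a `RetryState` from the retry policy and the request URL, report any middleware-performed retries to `should_retry`, and read `total_retries()` for the final error. A rough sketch of that shape, assuming the `uv-client` crate from this PR; `fetch`, `FetchError`, and its `middleware_retries` field are hypothetical stand-ins for a real call site:

```rust
use reqwest_retry::policies::ExponentialBackoff;
use url::Url;
use uv_client::RetryState;

/// Hypothetical error type; `middleware_retries` stands in for however the real
/// error type exposes retries already performed by the `reqwest-retry` middleware.
#[derive(Debug, thiserror::Error)]
#[error("request failed")]
struct FetchError {
    #[source]
    source: reqwest_middleware::Error,
    middleware_retries: u32,
}

/// Hypothetical single-attempt operation.
async fn fetch(url: &Url) -> Result<reqwest::Response, FetchError> {
    unimplemented!("stand-in for one request attempt")
}

async fn fetch_with_retry(
    retry_policy: ExponentialBackoff,
    url: Url,
) -> Result<reqwest::Response, FetchError> {
    // One `RetryState` per logical operation: it owns the policy, the start time,
    // the URL (for log messages), and the running retry count.
    let mut retry_state = RetryState::new(retry_policy, url.clone());
    loop {
        match fetch(&url).await {
            Ok(response) => return Ok(response),
            Err(err) => {
                // Fold retries the middleware already performed into the shared budget,
                // then let `should_retry` classify the error and sleep with backoff if
                // another attempt is allowed.
                if retry_state.should_retry(&err, err.middleware_retries).await {
                    continue;
                }
                // `total_retries()` now covers middleware retries plus retries made in
                // this loop, which is what the "failed after N retries" errors report.
                return Err(err);
            }
        }
    }
}
```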
This commit is contained in:
parent f4a3fac705
commit c79ce523a0
@@ -5607,7 +5607,6 @@ dependencies = [
"thiserror 2.0.17",
"tokio",
"tokio-util",
"tracing",
"url",
"uv-cache",
"uv-client",

@@ -34,5 +34,4 @@ tempfile = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }
@@ -6,21 +6,18 @@
use std::path::PathBuf;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, SystemTime};

use futures::TryStreamExt;
use reqwest_retry::RetryPolicy;
use reqwest_retry::policies::ExponentialBackoff;
use std::fmt;
use thiserror::Error;
use tokio::io::{AsyncRead, ReadBuf};
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::debug;
use url::Url;
use uv_distribution_filename::SourceDistExtension;

use uv_cache::{Cache, CacheBucket, CacheEntry};
use uv_client::{BaseClient, is_transient_network_error};
use uv_client::{BaseClient, RetryState};
use uv_extract::{Error as ExtractError, stream};
use uv_fs::LockedFileError;
use uv_pep440::Version;

@@ -142,7 +139,7 @@ pub enum Error {
#[error("Failed to detect platform")]
Platform(#[from] uv_platform::Error),

#[error("Attempt failed after {retries} {subject}", subject = if *retries > 1 { "retries" } else { "retry" })]
#[error("Request failed after {retries} {subject}", subject = if *retries > 1 { "retries" } else { "retry" })]
RetriedError {
#[source]
err: Box<Error>,

@@ -243,9 +240,7 @@ async fn download_and_unpack_with_retry(
download_url: &Url,
cache_entry: &CacheEntry,
) -> Result<PathBuf, Error> {
let mut total_attempts = 0;
let mut retried_here = false;
let start_time = SystemTime::now();
let mut retry_state = RetryState::new(*retry_policy, download_url.clone());

loop {
let result = download_and_unpack(

@@ -264,30 +259,14 @@ async fn download_and_unpack_with_retry(
let result = match result {
Ok(path) => Ok(path),
Err(err) => {
total_attempts += err.attempts();
let past_retries = total_attempts - 1;

if is_transient_network_error(&err) {
let retry_decision = retry_policy.should_retry(start_time, past_retries);
if let reqwest_retry::RetryDecision::Retry { execute_after } = retry_decision {
debug!(
"Transient failure while installing {} {}; retrying...",
binary.name(),
version
);
let duration = execute_after
.duration_since(SystemTime::now())
.unwrap_or_else(|_| Duration::default());
tokio::time::sleep(duration).await;
retried_here = true;
if retry_state.should_retry(&err, err.attempts()).await {
continue;
}
}

if retried_here {
if retry_state.total_retries() > 0 {
Err(Error::RetriedError {
err: Box::new(err),
retries: past_retries,
retries: retry_state.total_retries(),
})
} else {
Err(err)
@@ -4,7 +4,7 @@ use std::fmt::Write;
use std::num::ParseIntError;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use std::time::{Duration, SystemTime};
use std::{env, io, iter};

use anyhow::anyhow;

@@ -20,7 +20,7 @@ use reqwest::{Client, ClientBuilder, IntoUrl, Proxy, Request, Response, multipar
use reqwest_middleware::{ClientWithMiddleware, Middleware};
use reqwest_retry::policies::ExponentialBackoff;
use reqwest_retry::{
RetryTransientMiddleware, Retryable, RetryableStrategy, default_on_request_error,
RetryPolicy, RetryTransientMiddleware, Retryable, RetryableStrategy, default_on_request_error,
default_on_request_success,
};
use thiserror::Error;
@@ -1139,8 +1139,66 @@ fn retryable_on_request_failure(err: &(dyn Error + 'static)) -> Option<Retryable
None
}

pub fn is_transient_network_error(err: &(dyn Error + 'static)) -> bool {
retryable_on_request_failure(err) == Some(Retryable::Transient)
/// Decide whether a request should be retried, tracking previous attempts and backoff.
pub struct RetryState {
retry_policy: ExponentialBackoff,
start_time: SystemTime,
total_retries: u32,
url: DisplaySafeUrl,
}

impl RetryState {
pub fn new(retry_policy: ExponentialBackoff, url: impl Into<DisplaySafeUrl>) -> Self {
Self {
retry_policy,
start_time: SystemTime::now(),
total_retries: 0,
url: url.into(),
}
}

/// The number of retries across all requests.
///
/// After a failed retryable request, this equals the maximum number of retries.
pub fn total_retries(&self) -> u32 {
self.total_retries
}

/// Whether the request should be retried. Waits with backoff if required.
///
/// Takes the number of retries from nested layers associated with the specific `err` type as
/// `error_retries`.
pub async fn should_retry(&mut self, err: &(dyn Error + 'static), error_retries: u32) -> bool {
// Check if the middleware already performed retries
self.total_retries += error_retries;
match retryable_on_request_failure(err) {
Some(Retryable::Transient) => {
// If middleware already retried, consider that in our retry budget
let retry_decision = self
.retry_policy
.should_retry(self.start_time, self.total_retries);
if let reqwest_retry::RetryDecision::Retry { execute_after } = retry_decision {
let duration = execute_after
.duration_since(SystemTime::now())
.unwrap_or_else(|_| Duration::default());

debug!(
"Transient failure while handling response from {}; retrying after {:.1}s...",
self.url,
duration.as_secs_f32(),
);
// TODO(konsti): Should we show a spinner plus a message in the CLI while
// waiting?
tokio::time::sleep(duration).await;
self.total_retries += 1;
return true;
}

false
}
Some(Retryable::Fatal) | None => false,
}
}
}

/// Whether the error is a status code error that is retryable.
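The key property of `should_retry` is that retries performed by the `reqwest-retry` middleware and retries performed by the surrounding loop draw from a single budget: `error_retries` is added to `total_retries` before the policy is consulted. A small, self-contained illustration of the policy arithmetic that `RetryState` relies on (the max-retries value of 3 is only an example):

```rust
use std::time::SystemTime;

use reqwest_retry::{RetryPolicy, policies::ExponentialBackoff};

fn main() {
    // The shared budget: at most three retries per logical request.
    let policy = ExponentialBackoff::builder().build_with_max_retries(3);
    let start = SystemTime::now();

    // `RetryState` always asks the policy with the combined count (middleware
    // retries plus its own), so if the middleware already retried twice...
    let third_retry_allowed = matches!(
        policy.should_retry(start, 2),
        reqwest_retry::RetryDecision::Retry { .. }
    );
    // ...only one more retry is granted before the budget is exhausted.
    let fourth_retry_allowed = matches!(
        policy.should_retry(start, 3),
        reqwest_retry::RetryDecision::Retry { .. }
    );

    println!("third retry allowed: {third_retry_allowed}, fourth retry allowed: {fourth_retry_allowed}");
}
```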
@@ -1410,7 +1468,7 @@ mod tests {

let uv_retry = match response.error_for_status() {
Ok(_) => false,
Err(err) => is_transient_network_error(&err),
Err(err) => retryable_on_request_failure(&err) == Some(Retryable::Transient),
};

// Ensure we're retrying the same status code as the reqwest_retry crate. We may choose
@@ -1,9 +1,7 @@
use std::time::{Duration, SystemTime};
use std::{borrow::Cow, path::Path};

use futures::FutureExt;
use reqwest::{Request, Response};
use reqwest_retry::RetryPolicy;
use rkyv::util::AlignedVec;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};

@@ -14,7 +12,7 @@ use uv_fs::write_atomic;
use uv_redacted::DisplaySafeUrl;

use crate::BaseClient;
use crate::base_client::is_transient_network_error;
use crate::base_client::RetryState;
use crate::error::ProblemDetails;
use crate::{
Error, ErrorKind,

@@ -692,9 +690,7 @@ impl CachedClient {
cache_control: CacheControl<'_>,
response_callback: Callback,
) -> Result<Payload::Target, CachedClientError<CallBackError>> {
let mut total_retries = 0;
let start_time = SystemTime::now();
let retry_policy = self.uncached().retry_policy();
let mut retry_state = RetryState::new(self.uncached().retry_policy(), req.url().clone());
loop {
let fresh_req = req.try_clone().expect("HTTP request must be cloneable");
let result = self

@@ -704,34 +700,10 @@ impl CachedClient {
match result {
Ok(ok) => return Ok(ok),
Err(err) => {
// Check if the middleware already performed retries
total_retries += err.retries();
if is_transient_network_error(err.error()) {
// If middleware already retried, consider that in our retry budget
let retry_decision = retry_policy.should_retry(start_time, total_retries);
if let reqwest_retry::RetryDecision::Retry { execute_after } =
retry_decision
{
let duration = execute_after
.duration_since(SystemTime::now())
.unwrap_or_else(|_| Duration::default());

debug!(
"Transient failure while handling response from {}; retrying after {:.1}s...",
req.url(),
duration.as_secs_f32(),
);
tokio::time::sleep(duration).await;
total_retries += 1;
if retry_state.should_retry(err.error(), err.retries()).await {
continue;
}
}

return if total_retries > 0 {
Err(err.with_retries(total_retries))
} else {
Err(err)
};
return Err(err.with_retries(retry_state.total_retries()));
}
}
}

@@ -751,47 +723,22 @@ impl CachedClient {
cache_control: CacheControl<'_>,
response_callback: Callback,
) -> Result<Payload, CachedClientError<CallBackError>> {
let mut past_retries = 0;
let start_time = SystemTime::now();
let retry_policy = self.uncached().retry_policy();
let mut retry_state = RetryState::new(self.uncached().retry_policy(), req.url().clone());
loop {
let fresh_req = req.try_clone().expect("HTTP request must be cloneable");
let result = self
.skip_cache(fresh_req, cache_entry, cache_control, &response_callback)
.await;

// Check if the middleware already performed retries
let middleware_retries = match &result {
Err(err) => err.retries(),
_ => 0,
};

if result
.as_ref()
.is_err_and(|err| is_transient_network_error(err.error()))
{
let total_retries = past_retries + middleware_retries;
let retry_decision = retry_policy.should_retry(start_time, total_retries);
if let reqwest_retry::RetryDecision::Retry { execute_after } = retry_decision {
let duration = execute_after
.duration_since(SystemTime::now())
.unwrap_or_else(|_| Duration::default());
debug!(
"Transient failure while handling response from {}; retrying after {}s...",
req.url(),
duration.as_secs(),
);
tokio::time::sleep(duration).await;
past_retries += 1;
match result {
Ok(ok) => return Ok(ok),
Err(err) => {
if retry_state.should_retry(err.error(), err.retries()).await {
continue;
}
return Err(err.with_retries(retry_state.total_retries()));
}

if past_retries > 0 {
return result.map_err(|err| err.with_retries(past_retries));
}

return result;
}
}
}
@@ -1,7 +1,7 @@
pub use base_client::{
AuthIntegration, BaseClient, BaseClientBuilder, DEFAULT_RETRIES, ExtraMiddleware,
RedirectClientWithMiddleware, RequestBuilder, RetryParsingError, UvRetryableStrategy,
is_transient_network_error,
RedirectClientWithMiddleware, RequestBuilder, RetryParsingError, RetryState,
UvRetryableStrategy,
};
pub use cached_client::{CacheControl, CachedClient, CachedClientError, DataWithCachePolicy};
pub use error::{Error, ErrorKind, WrappedReqwestError};
@@ -3,7 +3,6 @@ mod trusted_publishing;
use std::collections::BTreeSet;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use std::{fmt, io};

use fs_err::tokio::File;

@@ -13,7 +12,7 @@ use itertools::Itertools;
use reqwest::header::AUTHORIZATION;
use reqwest::multipart::Part;
use reqwest::{Body, Response, StatusCode};
use reqwest_retry::RetryPolicy;
use reqwest_retry::RetryError;
use reqwest_retry::policies::ExponentialBackoff;
use rustc_hash::FxHashMap;
use serde::Deserialize;

@@ -28,7 +27,7 @@ use uv_auth::{Credentials, PyxTokenStore};
use uv_cache::{Cache, Refresh};
use uv_client::{
BaseClient, MetadataFormat, OwnedArchive, RegistryClientBuilder, RequestBuilder,
RetryParsingError, is_transient_network_error,
RetryParsingError, RetryState,
};
use uv_configuration::{KeyringProviderType, TrustedPublishing};
use uv_distribution_filename::{DistFilename, SourceDistExtension, SourceDistFilename};

@@ -469,8 +468,8 @@ pub async fn upload(
download_concurrency: &Semaphore,
reporter: Arc<impl Reporter>,
) -> Result<bool, PublishError> {
let mut n_past_retries = 0;
let start_time = SystemTime::now();
let mut retry_state = RetryState::new(retry_policy, registry.clone());

loop {
let (request, idx) = build_upload_request(
group,
@@ -490,23 +489,16 @@ pub async fn upload(
response
}
Err(err) => {
if is_transient_network_error(&err) {
let retry_decision = retry_policy.should_retry(start_time, n_past_retries);
if let reqwest_retry::RetryDecision::Retry { execute_after } = retry_decision {
let duration = execute_after
.duration_since(SystemTime::now())
.unwrap_or_else(|_| Duration::default());
warn_user!(
"Transient failure while handling response for {}; retrying after {}s...",
registry,
duration.as_secs()
);
tokio::time::sleep(duration).await;
n_past_retries += 1;
let middleware_retries = if let Some(RetryError::WithRetries { retries, .. }) =
(&err as &dyn std::error::Error).downcast_ref::<RetryError>()
{
*retries
} else {
0
};
if retry_state.should_retry(&err, middleware_retries).await {
continue;
}
}

return Err(PublishError::PublishSend(
group.file.clone(),
registry.clone().into(),
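This hunk is where the publish undercounting fix lands: the old loop consulted the policy with only its own `n_past_retries` and never counted retries the `reqwest-retry` middleware had already performed. The new code recovers the middleware's count by downcasting the error and hands it to `RetryState`. A standalone sketch of that extraction, mirroring the pattern above; `reqwest_retry::RetryError` is the real type, the helper name is made up:

```rust
use std::error::Error;

use reqwest_retry::RetryError;

/// Retries already performed by the `reqwest-retry` middleware for this error,
/// or zero if the error is not the middleware's retry wrapper.
fn middleware_retries(err: &(dyn Error + 'static)) -> u32 {
    if let Some(RetryError::WithRetries { retries, .. }) = err.downcast_ref::<RetryError>() {
        *retries
    } else {
        0
    }
}
```

A call site would then pass the result along, e.g. `retry_state.should_retry(&err, middleware_retries(&err)).await`, before deciding whether to loop again.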
@@ -5,14 +5,13 @@ use std::path::{Path, PathBuf};
use std::pin::Pin;
use std::str::FromStr;
use std::task::{Context, Poll};
use std::time::{Duration, SystemTime};
use std::{env, io};

use futures::TryStreamExt;
use itertools::Itertools;
use owo_colors::OwoColorize;
use reqwest_retry::RetryError;
use reqwest_retry::policies::ExponentialBackoff;
use reqwest_retry::{RetryError, RetryPolicy};
use serde::Deserialize;
use thiserror::Error;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufWriter, ReadBuf};

@@ -21,7 +20,7 @@ use tokio_util::either::Either;
use tracing::{debug, instrument};
use url::Url;

use uv_client::{BaseClient, WrappedReqwestError, is_transient_network_error};
use uv_client::{BaseClient, RetryState, WrappedReqwestError};
use uv_distribution_filename::{ExtensionError, SourceDistExtension};
use uv_extract::hash::Hasher;
use uv_fs::{Simplified, rename_with_retry};
@@ -118,26 +117,24 @@ pub enum Error {
}

impl Error {
// Return the number of attempts that were made to complete this request before this error was
// returned. Note that e.g. 3 retries equates to 4 attempts.
// Return the number of retries that were made to complete this request before this error was
// returned.
//
// It's easier to do arithmetic with "attempts" instead of "retries", because if you have
// nested retry loops you can just add up all the attempts directly, while adding up the
// retries requires +1/-1 adjustments.
fn attempts(&self) -> u32 {
// Note that e.g. 3 retries equates to 4 attempts.
fn retries(&self) -> u32 {
// Unfortunately different variants of `Error` track retry counts in different ways. We
// could consider unifying the variants we handle here in `Error::from_reqwest_middleware`
// instead, but both approaches will be fragile as new variants get added over time.
if let Self::NetworkErrorWithRetries { retries, .. } = self {
return retries + 1;
return *retries;
}
if let Self::NetworkMiddlewareError(_, anyhow_error) = self
&& let Some(RetryError::WithRetries { retries, .. }) =
anyhow_error.downcast_ref::<RetryError>()
{
return retries + 1;
return *retries;
}
1
0
}
}
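The removed comment explains why the old helper counted "attempts" (retries + 1): nested loops could sum attempts directly. With `RetryState` summing retries itself, the accessor now returns the raw retry count and the +1/-1 adjustments disappear. A tiny worked example of the two conventions, with made-up counter values, just to show the arithmetic:

```rust
fn main() {
    // Say the reqwest-retry middleware retried twice and the outer loop retried once.
    let middleware_retries = 2u32;
    let outer_retries = 1u32;

    // Old convention: count attempts (retries + 1), sum them, then subtract one
    // again before consulting the retry policy.
    let total_attempts = (middleware_retries + 1) + outer_retries;
    let past_retries_old = total_attempts - 1;

    // New convention: `RetryState` just sums retries directly.
    let past_retries_new = middleware_retries + outer_retries;

    assert_eq!(past_retries_old, 3);
    assert_eq!(past_retries_old, past_retries_new);
    println!("same count ({past_retries_new}) either way; only the bookkeeping differs");
}
```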
@@ -1108,9 +1105,11 @@ impl ManagedPythonDownload {
pypy_install_mirror: Option<&str>,
reporter: Option<&dyn Reporter>,
) -> Result<DownloadResult, Error> {
let mut total_attempts = 0;
let mut retried_here = false;
let start_time = SystemTime::now();
let mut retry_state = RetryState::new(
*retry_policy,
self.download_url(python_install_mirror, pypy_install_mirror)?,
);

loop {
let result = self
.fetch(

@@ -1123,43 +1122,22 @@ impl ManagedPythonDownload {
reporter,
)
.await;
let result = match result {
Ok(download_result) => Ok(download_result),
match result {
Ok(download_result) => return Ok(download_result),
Err(err) => {
// Inner retry loops (e.g. `reqwest-retry` middleware) might make more than one
// attempt per error we see here.
total_attempts += err.attempts();
// We currently interpret e.g. "3 retries" to mean we should make 4 attempts.
let n_past_retries = total_attempts - 1;
if is_transient_network_error(&err) {
let retry_decision = retry_policy.should_retry(start_time, n_past_retries);
if let reqwest_retry::RetryDecision::Retry { execute_after } =
retry_decision
{
let duration = execute_after
.duration_since(SystemTime::now())
.unwrap_or_else(|_| Duration::default());
debug!(
"Transient failure while handling response for {}; retrying after {}s...",
self.key(),
duration.as_secs()
);
tokio::time::sleep(duration).await;
retried_here = true;
continue; // Retry.
if retry_state.should_retry(&err, err.retries()).await {
continue;
}
}
if retried_here {
return if retry_state.total_retries() > 0 {
Err(Error::NetworkErrorWithRetries {
err: Box::new(err),
retries: n_past_retries,
retries: retry_state.total_retries(),
})
} else {
Err(err)
}
};
}
};
return result;
}
}
@@ -265,6 +265,12 @@ impl From<DisplaySafeUrl> for Url {
}
}

impl From<Url> for DisplaySafeUrl {
fn from(url: Url) -> Self {
Self(url)
}
}

impl FromStr for DisplaySafeUrl {
type Err = DisplaySafeUrlError;
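The new `From<Url>` impl exists so that `RetryState::new`, whose `url` parameter is `impl Into<DisplaySafeUrl>`, can accept the plain `url::Url` values the call sites already have (e.g. `download_url.clone()` or `req.url().clone()`). A minimal sketch of the conversion, assuming the `uv-redacted` crate; the URL is an example value:

```rust
use url::Url;
use uv_redacted::DisplaySafeUrl;

fn main() {
    let url = Url::parse("https://user:secret@pypi.example.org/simple/").unwrap();

    // `Url -> DisplaySafeUrl` now works through the standard `From`/`Into` conversion,
    // so APIs declared as `impl Into<DisplaySafeUrl>` accept either type.
    let safe: DisplaySafeUrl = url.clone().into();

    // `DisplaySafeUrl` keeps credentials out of logs; `RetryState` stores it so its
    // "retrying after ...s" debug message can include the URL safely.
    println!("{safe}");
}
```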