Fix retry counts in cached client

Previously, we dropped the counts from the middleware layer, potentially doing to many retries and/or reporting too few.

Not pretty but fixes the bug.
This commit is contained in:
konstin 2025-12-12 18:00:44 +01:00
parent 7571ffa385
commit f4a3fac705
6 changed files with 158 additions and 101 deletions

View File

@ -119,51 +119,35 @@ where
} }
} }
/// Dispatch type: Either a cached client error or a (user specified) error from the callback /// Dispatch type: Either a cached client error or a (user specified) error from the callback.
#[derive(Debug)]
pub enum CachedClientError<CallbackError: std::error::Error + 'static> { pub enum CachedClientError<CallbackError: std::error::Error + 'static> {
Client { /// The client tracks retries internally.
retries: Option<u32>, Client(Error),
err: Error, /// Track retries before a callback explicitly, as we can't attach them to the callback error
}, /// type.
Callback { Callback { retries: u32, err: CallbackError },
retries: Option<u32>,
err: CallbackError,
},
} }
impl<CallbackError: std::error::Error + 'static> CachedClientError<CallbackError> { impl<CallbackError: std::error::Error + 'static> CachedClientError<CallbackError> {
/// Attach the number of retries to the error context. /// Attach the combined number of retries to the error context, discarding the previous count.
///
/// Adds to existing errors if any, in case different layers retried.
fn with_retries(self, retries: u32) -> Self { fn with_retries(self, retries: u32) -> Self {
match self { match self {
Self::Client { Self::Client(err) => Self::Client(err.with_retries(retries)),
retries: existing_retries, Self::Callback { retries: _, err } => Self::Callback { retries, err },
err,
} => Self::Client {
retries: Some(existing_retries.unwrap_or_default() + retries),
err,
},
Self::Callback {
retries: existing_retries,
err,
} => Self::Callback {
retries: Some(existing_retries.unwrap_or_default() + retries),
err,
},
} }
} }
fn retries(&self) -> Option<u32> { fn retries(&self) -> u32 {
match self { match self {
Self::Client { retries, .. } => *retries, Self::Client(err) => err.retries(),
Self::Callback { retries, .. } => *retries, Self::Callback { retries, .. } => *retries,
} }
} }
fn error(&self) -> &(dyn std::error::Error + 'static) { fn error(&self) -> &(dyn std::error::Error + 'static) {
match self { match self {
Self::Client { err, .. } => err, Self::Client(err) => err,
Self::Callback { err, .. } => err, Self::Callback { err, .. } => err,
} }
} }
@ -171,10 +155,7 @@ impl<CallbackError: std::error::Error + 'static> CachedClientError<CallbackError
impl<CallbackError: std::error::Error + 'static> From<Error> for CachedClientError<CallbackError> { impl<CallbackError: std::error::Error + 'static> From<Error> for CachedClientError<CallbackError> {
fn from(error: Error) -> Self { fn from(error: Error) -> Self {
Self::Client { Self::Client(error)
retries: None,
err: error,
}
} }
} }
@ -182,10 +163,7 @@ impl<CallbackError: std::error::Error + 'static> From<ErrorKind>
for CachedClientError<CallbackError> for CachedClientError<CallbackError>
{ {
fn from(error: ErrorKind) -> Self { fn from(error: ErrorKind) -> Self {
Self::Client { Self::Client(error.into())
retries: None,
err: error.into(),
}
} }
} }
@ -193,16 +171,10 @@ impl<E: Into<Self> + std::error::Error + 'static> From<CachedClientError<E>> for
/// Attach retry error context, if there were retries. /// Attach retry error context, if there were retries.
fn from(error: CachedClientError<E>) -> Self { fn from(error: CachedClientError<E>) -> Self {
match error { match error {
CachedClientError::Client { CachedClientError::Client(error) => error,
retries: Some(retries), CachedClientError::Callback { retries, err } => {
err, Self::new(err.into().into_kind(), retries)
} => Self::new(err.into_kind(), retries), }
CachedClientError::Client { retries: None, err } => err,
CachedClientError::Callback {
retries: Some(retries),
err,
} => Self::new(err.into().into_kind(), retries),
CachedClientError::Callback { retries: None, err } => err.into(),
} }
} }
} }
@ -450,10 +422,16 @@ impl CachedClient {
response_callback: Callback, response_callback: Callback,
) -> Result<Payload::Target, CachedClientError<CallBackError>> { ) -> Result<Payload::Target, CachedClientError<CallBackError>> {
let new_cache = info_span!("new_cache", file = %cache_entry.path().display()); let new_cache = info_span!("new_cache", file = %cache_entry.path().display());
// Capture retries from the retry middleware
let retries = response
.extensions()
.get::<reqwest_retry::RetryCount>()
.map(|retries| retries.value())
.unwrap_or_default();
let data = response_callback(response) let data = response_callback(response)
.boxed_local() .boxed_local()
.await .await
.map_err(|err| CachedClientError::Callback { retries: None, err })?; .map_err(|err| CachedClientError::Callback { retries, err })?;
let Some(cache_policy) = cache_policy else { let Some(cache_policy) = cache_policy else {
return Ok(data.into_target()); return Ok(data.into_target());
}; };
@ -662,16 +640,10 @@ impl CachedClient {
} else { } else {
None None
}; };
return Err(CachedClientError::<Error>::Client { return Err(Error::new(
retries: retry_count, ErrorKind::from_reqwest_with_problem_details(url, status_error, problem_details),
err: ErrorKind::from_reqwest_with_problem_details( retry_count.unwrap_or_default(),
url, ));
status_error,
problem_details,
)
.into(),
}
.into());
} }
let cache_policy = cache_policy_builder.build(&response); let cache_policy = cache_policy_builder.build(&response);
@ -720,7 +692,7 @@ impl CachedClient {
cache_control: CacheControl<'_>, cache_control: CacheControl<'_>,
response_callback: Callback, response_callback: Callback,
) -> Result<Payload::Target, CachedClientError<CallBackError>> { ) -> Result<Payload::Target, CachedClientError<CallBackError>> {
let mut past_retries = 0; let mut total_retries = 0;
let start_time = SystemTime::now(); let start_time = SystemTime::now();
let retry_policy = self.uncached().retry_policy(); let retry_policy = self.uncached().retry_policy();
loop { loop {
@ -729,40 +701,39 @@ impl CachedClient {
.get_cacheable(fresh_req, cache_entry, cache_control, &response_callback) .get_cacheable(fresh_req, cache_entry, cache_control, &response_callback)
.await; .await;
// Check if the middleware already performed retries match result {
let middleware_retries = match &result { Ok(ok) => return Ok(ok),
Err(err) => err.retries().unwrap_or_default(), Err(err) => {
Ok(_) => 0, // Check if the middleware already performed retries
}; total_retries += err.retries();
if is_transient_network_error(err.error()) {
// If middleware already retried, consider that in our retry budget
let retry_decision = retry_policy.should_retry(start_time, total_retries);
if let reqwest_retry::RetryDecision::Retry { execute_after } =
retry_decision
{
let duration = execute_after
.duration_since(SystemTime::now())
.unwrap_or_else(|_| Duration::default());
if result debug!(
.as_ref() "Transient failure while handling response from {}; retrying after {:.1}s...",
.is_err_and(|err| is_transient_network_error(err.error())) req.url(),
{ duration.as_secs_f32(),
// If middleware already retried, consider that in our retry budget );
let total_retries = past_retries + middleware_retries; tokio::time::sleep(duration).await;
let retry_decision = retry_policy.should_retry(start_time, total_retries); total_retries += 1;
if let reqwest_retry::RetryDecision::Retry { execute_after } = retry_decision { continue;
let duration = execute_after }
.duration_since(SystemTime::now()) }
.unwrap_or_else(|_| Duration::default());
debug!( return if total_retries > 0 {
"Transient failure while handling response from {}; retrying after {:.1}s...", Err(err.with_retries(total_retries))
req.url(), } else {
duration.as_secs_f32(), Err(err)
); };
tokio::time::sleep(duration).await;
past_retries += 1;
continue;
} }
} }
if past_retries > 0 {
return result.map_err(|err| err.with_retries(past_retries));
}
return result;
} }
} }
@ -791,14 +762,13 @@ impl CachedClient {
// Check if the middleware already performed retries // Check if the middleware already performed retries
let middleware_retries = match &result { let middleware_retries = match &result {
Err(err) => err.retries().unwrap_or_default(), Err(err) => err.retries(),
_ => 0, _ => 0,
}; };
if result if result
.as_ref() .as_ref()
.err() .is_err_and(|err| is_transient_network_error(err.error()))
.is_some_and(|err| is_transient_network_error(err.error()))
{ {
let total_retries = past_retries + middleware_retries; let total_retries = past_retries + middleware_retries;
let retry_decision = retry_policy.should_retry(start_time, total_retries); let retry_decision = retry_policy.should_retry(start_time, total_retries);

View File

@ -123,6 +123,11 @@ impl Error {
&self.kind &self.kind
} }
pub(crate) fn with_retries(mut self, retries: u32) -> Self {
self.retries = retries;
self
}
/// Create a new error from a JSON parsing error. /// Create a new error from a JSON parsing error.
pub(crate) fn from_json_err(err: serde_json::Error, url: DisplaySafeUrl) -> Self { pub(crate) fn from_json_err(err: serde_json::Error, url: DisplaySafeUrl) -> Self {
ErrorKind::BadJson { source: err, url }.into() ErrorKind::BadJson { source: err, url }.into()

View File

@ -246,7 +246,7 @@ impl<'a> FlatIndexClient<'a> {
.collect(); .collect();
Ok(FlatIndexEntries::from_entries(files)) Ok(FlatIndexEntries::from_entries(files))
} }
Err(CachedClientError::Client { err, .. }) if err.is_offline() => { Err(CachedClientError::Client(err)) if err.is_offline() => {
Ok(FlatIndexEntries::offline()) Ok(FlatIndexEntries::offline())
} }
Err(err) => Err(err.into()), Err(err) => Err(err.into()),

View File

@ -721,7 +721,7 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
.await .await
.map_err(|err| match err { .map_err(|err| match err {
CachedClientError::Callback { err, .. } => err, CachedClientError::Callback { err, .. } => err,
CachedClientError::Client { err, .. } => Error::Client(err), CachedClientError::Client(err) => Error::Client(err),
})?; })?;
// If the archive is missing the required hashes, or has since been removed, force a refresh. // If the archive is missing the required hashes, or has since been removed, force a refresh.
@ -745,7 +745,7 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
.await .await
.map_err(|err| match err { .map_err(|err| match err {
CachedClientError::Callback { err, .. } => err, CachedClientError::Callback { err, .. } => err,
CachedClientError::Client { err, .. } => Error::Client(err), CachedClientError::Client(err) => Error::Client(err),
}) })
}) })
.await? .await?
@ -926,7 +926,7 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
.await .await
.map_err(|err| match err { .map_err(|err| match err {
CachedClientError::Callback { err, .. } => err, CachedClientError::Callback { err, .. } => err,
CachedClientError::Client { err, .. } => Error::Client(err), CachedClientError::Client(err) => Error::Client(err),
})?; })?;
// If the archive is missing the required hashes, or has since been removed, force a refresh. // If the archive is missing the required hashes, or has since been removed, force a refresh.
@ -950,7 +950,7 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
.await .await
.map_err(|err| match err { .map_err(|err| match err {
CachedClientError::Callback { err, .. } => err, CachedClientError::Callback { err, .. } => err,
CachedClientError::Client { err, .. } => Error::Client(err), CachedClientError::Client(err) => Error::Client(err),
}) })
}) })
.await? .await?

View File

@ -823,7 +823,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
.await .await
.map_err(|err| match err { .map_err(|err| match err {
CachedClientError::Callback { err, .. } => err, CachedClientError::Callback { err, .. } => err,
CachedClientError::Client { err, .. } => Error::Client(err), CachedClientError::Client(err) => Error::Client(err),
})?; })?;
// If the archive is missing the required hashes, force a refresh. // If the archive is missing the required hashes, force a refresh.
@ -843,7 +843,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
.await .await
.map_err(|err| match err { .map_err(|err| match err {
CachedClientError::Callback { err, .. } => err, CachedClientError::Callback { err, .. } => err,
CachedClientError::Client { err, .. } => Error::Client(err), CachedClientError::Client(err) => Error::Client(err),
}) })
}) })
.await .await
@ -2280,7 +2280,7 @@ impl<'a, T: BuildContext> SourceDistributionBuilder<'a, T> {
.await .await
.map_err(|err| match err { .map_err(|err| match err {
CachedClientError::Callback { err, .. } => err, CachedClientError::Callback { err, .. } => err,
CachedClientError::Client { err, .. } => Error::Client(err), CachedClientError::Client(err) => Error::Client(err),
}) })
}) })
.await .await

View File

@ -37,6 +37,30 @@ async fn io_error_server() -> (MockServer, String) {
(server, mock_server_uri) (server, mock_server_uri)
} }
/// Answers with a retryable HTTP status 500 for 2 times, then with a retryable connection reset
/// IO error.
///
/// Tests different errors paths inside uv, which retries 3 times by default, for a total for 4
/// requests.
async fn mixed_error_server() -> (MockServer, String) {
let server = MockServer::start().await;
Mock::given(method("GET"))
.respond_with_err(connection_reset)
.up_to_n_times(2)
.mount(&server)
.await;
Mock::given(method("GET"))
.respond_with(ResponseTemplate::new(StatusCode::INTERNAL_SERVER_ERROR))
.up_to_n_times(2)
.mount(&server)
.await;
let mock_server_uri = server.uri();
(server, mock_server_uri)
}
/// Check the simple index error message when the server returns HTTP status 500, a retryable error. /// Check the simple index error message when the server returns HTTP status 500, a retryable error.
#[tokio::test] #[tokio::test]
async fn simple_http_500() { async fn simple_http_500() {
@ -149,6 +173,35 @@ async fn find_links_io_error() {
"); ");
} }
/// Check the error message for a find links index page, a non-streaming request, when the server
/// returns different kinds of retryable errors.
#[tokio::test]
async fn find_links_mixed_error() {
let context = TestContext::new("3.12");
let (_server_drop_guard, mock_server_uri) = mixed_error_server().await;
let filters = vec![(mock_server_uri.as_str(), "[SERVER]")];
uv_snapshot!(filters, context
.pip_install()
.arg("tqdm")
.arg("--no-index")
.arg("--find-links")
.arg(&mock_server_uri)
.env_remove(EnvVars::UV_HTTP_RETRIES)
.env(EnvVars::UV_TEST_NO_HTTP_RETRY_DELAY, "true"), @r"
success: false
exit_code: 2
----- stdout -----
----- stderr -----
error: Failed to read `--find-links` URL: [SERVER]/
Caused by: Request failed after 3 retries
Caused by: Failed to fetch: `[SERVER]/`
Caused by: HTTP status server error (500 Internal Server Error) for url ([SERVER]/)
");
}
/// Check the direct package URL error message when the server returns HTTP status 500, a retryable /// Check the direct package URL error message when the server returns HTTP status 500, a retryable
/// error. /// error.
#[tokio::test] #[tokio::test]
@ -193,7 +246,7 @@ async fn direct_url_io_error() {
.pip_install() .pip_install()
.arg(format!("tqdm @ {tqdm_url}")) .arg(format!("tqdm @ {tqdm_url}"))
.env_remove(EnvVars::UV_HTTP_RETRIES) .env_remove(EnvVars::UV_HTTP_RETRIES)
.env(EnvVars::UV_TEST_NO_HTTP_RETRY_DELAY, "true"), @r" .env(EnvVars::UV_TEST_NO_HTTP_RETRY_DELAY, "true"), @r#"
success: false success: false
exit_code: 1 exit_code: 1
----- stdout ----- ----- stdout -----
@ -205,6 +258,35 @@ async fn direct_url_io_error() {
error sending request for url ([SERVER]/packages/d0/30/dc54f88dd4a2b5dc8a0279bdd7270e735851848b762aeb1c1184ed1f6b14/tqdm-4.67.1-py3-none-any.whl) error sending request for url ([SERVER]/packages/d0/30/dc54f88dd4a2b5dc8a0279bdd7270e735851848b762aeb1c1184ed1f6b14/tqdm-4.67.1-py3-none-any.whl)
client error (SendRequest) client error (SendRequest)
connection closed before message completed connection closed before message completed
"#);
}
/// Check the error message for direct package URL, a streaming request, when the server returns
/// different kinds of retryable errors.
#[tokio::test]
async fn direct_url_mixed_error() {
let context = TestContext::new("3.12");
let (_server_drop_guard, mock_server_uri) = mixed_error_server().await;
let tqdm_url = format!(
"{mock_server_uri}/packages/d0/30/dc54f88dd4a2b5dc8a0279bdd7270e735851848b762aeb1c1184ed1f6b14/tqdm-4.67.1-py3-none-any.whl"
);
let filters = vec![(mock_server_uri.as_str(), "[SERVER]")];
uv_snapshot!(filters, context
.pip_install()
.arg(format!("tqdm @ {tqdm_url}"))
.env_remove(EnvVars::UV_HTTP_RETRIES)
.env(EnvVars::UV_TEST_NO_HTTP_RETRY_DELAY, "true"), @r"
success: false
exit_code: 1
----- stdout -----
----- stderr -----
× Failed to download `tqdm @ [SERVER]/packages/d0/30/dc54f88dd4a2b5dc8a0279bdd7270e735851848b762aeb1c1184ed1f6b14/tqdm-4.67.1-py3-none-any.whl`
Request failed after 3 retries
Failed to fetch: `[SERVER]/packages/d0/30/dc54f88dd4a2b5dc8a0279bdd7270e735851848b762aeb1c1184ed1f6b14/tqdm-4.67.1-py3-none-any.whl`
HTTP status server error (500 Internal Server Error) for url ([SERVER]/packages/d0/30/dc54f88dd4a2b5dc8a0279bdd7270e735851848b762aeb1c1184ed1f6b14/tqdm-4.67.1-py3-none-any.whl)
"); ");
} }