This commit is contained in:
konstin 2025-12-16 17:30:10 +01:00
parent e65c8a6002
commit 468b19eea6
5 changed files with 43 additions and 20 deletions

View File

@ -148,13 +148,15 @@ pub enum Error {
} }
impl Error { impl Error {
/// Return the number of attempts that were made to complete this request before this error was /// Return the number of retries that were made to complete this request before this error was
/// returned. Note that e.g. 3 retries equates to 4 attempts. /// returned.
fn attempts(&self) -> u32 { ///
/// Note that e.g. 3 retries equates to 4 attempts.
fn retries(&self) -> u32 {
if let Self::RetriedError { retries, .. } = self { if let Self::RetriedError { retries, .. } = self {
return retries + 1; return *retries;
} }
1 0
} }
} }
@ -240,7 +242,7 @@ async fn download_and_unpack_with_retry(
download_url: &Url, download_url: &Url,
cache_entry: &CacheEntry, cache_entry: &CacheEntry,
) -> Result<PathBuf, Error> { ) -> Result<PathBuf, Error> {
let mut retry_state = RetryState::new(*retry_policy, download_url.clone()); let mut retry_state = RetryState::start(*retry_policy, download_url.clone());
loop { loop {
let result = download_and_unpack( let result = download_and_unpack(
@ -259,7 +261,10 @@ async fn download_and_unpack_with_retry(
let result = match result { let result = match result {
Ok(path) => Ok(path), Ok(path) => Ok(path),
Err(err) => { Err(err) => {
if retry_state.should_retry(&err, err.attempts()).await { if retry_state
.handle_retry_and_backoff(&err, err.retries())
.await
{
continue; continue;
} }

View File

@ -1148,7 +1148,8 @@ pub struct RetryState {
} }
impl RetryState { impl RetryState {
pub fn new(retry_policy: ExponentialBackoff, url: impl Into<DisplaySafeUrl>) -> Self { /// Initialize the [`RetryState`] and start the backoff timer.
pub fn start(retry_policy: ExponentialBackoff, url: impl Into<DisplaySafeUrl>) -> Self {
Self { Self {
retry_policy, retry_policy,
start_time: SystemTime::now(), start_time: SystemTime::now(),
@ -1164,16 +1165,21 @@ impl RetryState {
self.total_retries self.total_retries
} }
/// Whether request should be retried. Waits with backoff if required. /// Determines whether request should be retried and waits with backoff if it's retried.
/// ///
/// Takes the number of retries from nested layers associated with the specific `err` type as /// Takes the number of retries from nested layers associated with the specific `err` type as
/// `error_retries`. /// `error_retries`.
pub async fn should_retry(&mut self, err: &(dyn Error + 'static), error_retries: u32) -> bool { ///
// Check if the middleware already performed retries /// Returns `true` if the request should be retried.
pub async fn handle_retry_and_backoff(
&mut self,
err: &(dyn Error + 'static),
error_retries: u32,
) -> bool {
// If the middleware performed any retries, consider them in our budget.
self.total_retries += error_retries; self.total_retries += error_retries;
match retryable_on_request_failure(err) { match retryable_on_request_failure(err) {
Some(Retryable::Transient) => { Some(Retryable::Transient) => {
// If middleware already retried, consider that in our retry budget
let retry_decision = self let retry_decision = self
.retry_policy .retry_policy
.should_retry(self.start_time, self.total_retries); .should_retry(self.start_time, self.total_retries);

View File

@ -690,7 +690,7 @@ impl CachedClient {
cache_control: CacheControl<'_>, cache_control: CacheControl<'_>,
response_callback: Callback, response_callback: Callback,
) -> Result<Payload::Target, CachedClientError<CallBackError>> { ) -> Result<Payload::Target, CachedClientError<CallBackError>> {
let mut retry_state = RetryState::new(self.uncached().retry_policy(), req.url().clone()); let mut retry_state = RetryState::start(self.uncached().retry_policy(), req.url().clone());
loop { loop {
let fresh_req = req.try_clone().expect("HTTP request must be cloneable"); let fresh_req = req.try_clone().expect("HTTP request must be cloneable");
let result = self let result = self
@ -700,7 +700,10 @@ impl CachedClient {
match result { match result {
Ok(ok) => return Ok(ok), Ok(ok) => return Ok(ok),
Err(err) => { Err(err) => {
if retry_state.should_retry(err.error(), err.retries()).await { if retry_state
.handle_retry_and_backoff(err.error(), err.retries())
.await
{
continue; continue;
} }
return Err(err.with_retries(retry_state.total_retries())); return Err(err.with_retries(retry_state.total_retries()));
@ -723,7 +726,7 @@ impl CachedClient {
cache_control: CacheControl<'_>, cache_control: CacheControl<'_>,
response_callback: Callback, response_callback: Callback,
) -> Result<Payload, CachedClientError<CallBackError>> { ) -> Result<Payload, CachedClientError<CallBackError>> {
let mut retry_state = RetryState::new(self.uncached().retry_policy(), req.url().clone()); let mut retry_state = RetryState::start(self.uncached().retry_policy(), req.url().clone());
loop { loop {
let fresh_req = req.try_clone().expect("HTTP request must be cloneable"); let fresh_req = req.try_clone().expect("HTTP request must be cloneable");
let result = self let result = self
@ -733,7 +736,10 @@ impl CachedClient {
match result { match result {
Ok(ok) => return Ok(ok), Ok(ok) => return Ok(ok),
Err(err) => { Err(err) => {
if retry_state.should_retry(err.error(), err.retries()).await { if retry_state
.handle_retry_and_backoff(err.error(), err.retries())
.await
{
continue; continue;
} }
return Err(err.with_retries(retry_state.total_retries())); return Err(err.with_retries(retry_state.total_retries()));

View File

@ -468,7 +468,7 @@ pub async fn upload(
download_concurrency: &Semaphore, download_concurrency: &Semaphore,
reporter: Arc<impl Reporter>, reporter: Arc<impl Reporter>,
) -> Result<bool, PublishError> { ) -> Result<bool, PublishError> {
let mut retry_state = RetryState::new(retry_policy, registry.clone()); let mut retry_state = RetryState::start(retry_policy, registry.clone());
loop { loop {
let (request, idx) = build_upload_request( let (request, idx) = build_upload_request(
@ -496,7 +496,10 @@ pub async fn upload(
} else { } else {
0 0
}; };
if retry_state.should_retry(&err, middleware_retries).await { if retry_state
.handle_retry_and_backoff(&err, middleware_retries)
.await
{
continue; continue;
} }
return Err(PublishError::PublishSend( return Err(PublishError::PublishSend(

View File

@ -1105,7 +1105,7 @@ impl ManagedPythonDownload {
pypy_install_mirror: Option<&str>, pypy_install_mirror: Option<&str>,
reporter: Option<&dyn Reporter>, reporter: Option<&dyn Reporter>,
) -> Result<DownloadResult, Error> { ) -> Result<DownloadResult, Error> {
let mut retry_state = RetryState::new( let mut retry_state = RetryState::start(
*retry_policy, *retry_policy,
self.download_url(python_install_mirror, pypy_install_mirror)?, self.download_url(python_install_mirror, pypy_install_mirror)?,
); );
@ -1125,7 +1125,10 @@ impl ManagedPythonDownload {
match result { match result {
Ok(download_result) => return Ok(download_result), Ok(download_result) => return Ok(download_result),
Err(err) => { Err(err) => {
if retry_state.should_retry(&err, err.retries()).await { if retry_state
.handle_retry_and_backoff(&err, err.retries())
.await
{
continue; continue;
} }
return if retry_state.total_retries() > 0 { return if retry_state.total_retries() > 0 {