Trim `get_cached_with_callback` and `send_cached` down some more. (#1128)

I noticed that `get_cached_with_callback` and `send_cached` are large
both in terms of LLVM lines and in terms of types (and large types can
cause stack overflows on Windows). `get_cached_with_callback`
specifically is large because it's monomorphized for each callback.
I've split both functions into smaller units and boxed the callback.
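
For illustration, here is a minimal sketch of that pattern (the names, the signatures, and the `String` stand-in for a response are invented for this example and are not the actual puffin API; it assumes `futures` and `tokio` as dependencies): the generic entry point only boxes the callback's future and forwards to a helper that is generic over the payload alone, so the code duplicated per callback type stays tiny.

```rust
use std::future::Future;

use futures::future::{BoxFuture, FutureExt};

struct CachedClientSketch;

impl CachedClientSketch {
    /// Thin generic shim: the only code monomorphized per callback type,
    /// and it does little more than box the callback's future.
    async fn get_with_callback<Payload, Callback, Fut>(
        &self,
        key: &str,
        callback: Callback,
    ) -> Payload
    where
        Payload: 'static,
        Callback: FnOnce(String) -> Fut + Send + 'static,
        Fut: Future<Output = Payload> + Send + 'static,
    {
        // Erase the callback's concrete future type behind a `BoxFuture`.
        let boxed: Box<dyn FnOnce(String) -> BoxFuture<'static, Payload> + Send> =
            Box::new(move |response: String| callback(response).boxed());
        self.get_with_boxed_callback(key, boxed).await
    }

    /// Heavier worker, generic only over `Payload`: it is compiled far fewer
    /// times, so its large body is not duplicated once per callback.
    async fn get_with_boxed_callback<Payload>(
        &self,
        key: &str,
        callback: Box<dyn FnOnce(String) -> BoxFuture<'static, Payload> + Send>,
    ) -> Payload {
        // Stand-in for the real cache lookup / revalidation logic.
        let response = format!("cached response for {key}");
        callback(response).await
    }
}

#[tokio::main]
async fn main() {
    let len = CachedClientSketch
        .get_with_callback("some-key", |body| async move { body.len() })
        .await;
    println!("payload length: {len}");
}
```

The same idea shows up in the diff below as the new `read_cache` and `send_cached_handle_stale` helpers plus the added `.boxed()` calls.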

LLVM lines, before:

```
  Lines                 Copies               Function name
  -----                 ------               -------------
  909511                21625                (TOTAL)
   36026 (4.0%,  4.0%)     33 (0.2%,  0.2%)  <&mut rmp_serde::decode::Deserializer<R,C> as serde::de::Deserializer>::deserialize_any
   14688 (1.6%,  5.6%)      8 (0.0%,  0.2%)  puffin_client::cached_client::CachedClient::get_cached_with_callback::{{closure}}::{{closure}}
   13748 (1.5%,  7.1%)      5 (0.0%,  0.2%)  puffin_client::cached_client::CachedClient::send_cached::{{closure}}
   12460 (1.4%,  8.5%)     35 (0.2%,  0.4%)  alloc::raw_vec::RawVec<T,A>::grow_amortized
   10731 (1.2%,  9.6%)    122 (0.6%,  0.9%)  <alloc::boxed::Box<T,A> as core::ops::drop::Drop>::drop
    8952 (1.0%, 10.6%)      9 (0.0%,  1.0%)  core::slice::sort::partition_in_blocks
    8216 (0.9%, 11.5%)    323 (1.5%,  2.5%)  <core::result::Result<T,E> as core::ops::try_trait::Try>::branch
    7745 (0.9%, 12.4%)    205 (0.9%,  3.4%)  core::result::Result<T,E>::map_err
    6862 (0.8%, 13.1%)     54 (0.2%,  3.7%)  <alloc::vec::Vec<T> as alloc::vec::spec_from_iter_nested::SpecFromIterNested<T,I>>::from_iter
    6720 (0.7%, 13.9%)    133 (0.6%,  4.3%)  std::panicking::try
    6600 (0.7%, 14.6%)     45 (0.2%,  4.5%)  <alloc::sync::Weak<T,A> as core::ops::drop::Drop>::drop
    5899 (0.6%, 15.2%)     33 (0.2%,  4.6%)  rmp_serde::decode::Deserializer<R,C>::read_str_data
    5610 (0.6%, 15.9%)     33 (0.2%,  4.8%)  alloc::raw_vec::RawVec<T,A>::allocate_in
    5187 (0.6%, 16.4%)    133 (0.6%,  5.4%)  std::panicking::try::do_catch
    4740 (0.5%, 17.0%)    268 (1.2%,  6.7%)  core::ops::function::FnOnce::call_once
    4670 (0.5%, 17.5%)     40 (0.2%,  6.8%)  puffin_client::cached_client::CachedClient::get_cached_with_callback::{{closure}}::{{closure}}::{{closure}}
    4527 (0.5%, 18.0%)     54 (0.2%,  7.1%)  core::iter::traits::iterator::Iterator::try_fold
```

after:

```
  Lines                 Copies               Function name
  -----                 ------               -------------
  910275                21712                (TOTAL)
   36026 (4.0%,  4.0%)     33 (0.2%,  0.2%)  <&mut rmp_serde::decode::Deserializer<R,C> as serde::de::Deserializer>::deserialize_any
   12460 (1.4%,  5.3%)     35 (0.2%,  0.3%)  alloc::raw_vec::RawVec<T,A>::grow_amortized
   10935 (1.2%,  6.5%)    124 (0.6%,  0.9%)  <alloc::boxed::Box<T,A> as core::ops::drop::Drop>::drop
    8952 (1.0%,  7.5%)      9 (0.0%,  0.9%)  core::slice::sort::partition_in_blocks
    8714 (1.0%,  8.5%)      5 (0.0%,  0.9%)  puffin_client::cached_client::CachedClient::send_cached_handle_stale::{{closure}}
    8216 (0.9%,  9.4%)    323 (1.5%,  2.4%)  <core::result::Result<T,E> as core::ops::try_trait::Try>::branch
    8192 (0.9%, 10.3%)      8 (0.0%,  2.5%)  puffin_client::cached_client::CachedClient::get_cached_with_callback::{{closure}}::{{closure}}
    7745 (0.9%, 11.1%)    205 (0.9%,  3.4%)  core::result::Result<T,E>::map_err
    6862 (0.8%, 11.9%)     54 (0.2%,  3.7%)  <alloc::vec::Vec<T> as alloc::vec::spec_from_iter_nested::SpecFromIterNested<T,I>>::from_iter
    6778 (0.7%, 12.6%)      5 (0.0%,  3.7%)  puffin_client::cached_client::CachedClient::send_cached::{{closure}}
    6720 (0.7%, 13.4%)    133 (0.6%,  4.3%)  std::panicking::try
    6600 (0.7%, 14.1%)     45 (0.2%,  4.5%)  <alloc::sync::Weak<T,A> as core::ops::drop::Drop>::drop
    5899 (0.6%, 14.7%)     33 (0.2%,  4.7%)  rmp_serde::decode::Deserializer<R,C>::read_str_data
    5610 (0.6%, 15.3%)     33 (0.2%,  4.8%)  alloc::raw_vec::RawVec<T,A>::allocate_in
    5187 (0.6%, 15.9%)    133 (0.6%,  5.4%)  std::panicking::try::do_catch
    4740 (0.5%, 16.4%)    268 (1.2%,  6.7%)  core::ops::function::FnOnce::call_once
    4527 (0.5%, 16.9%)     54 (0.2%,  6.9%)  core::iter::traits::iterator::Iterator::try_fold
```

Stack sizes diff:
https://gist.github.com/konstin/a3f38276aacf1170038a756c8c49793c
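
As a side note on why the stack sizes move: an `async fn` stores every future it awaits inline in its own state machine, so one large inner future inflates all of its callers, while awaiting through `.boxed()` only keeps a pointer-sized handle alive. A self-contained toy comparison (not puffin code; it only assumes the `futures` crate):

```rust
use std::mem::size_of_val;

use futures::FutureExt;

// A future whose state machine holds a 16 KiB buffer across an await point.
async fn bulky() -> usize {
    let buf = [0u8; 16 * 1024];
    futures::future::ready(()).await;
    buf.iter().map(|&b| usize::from(b)).sum()
}

fn main() {
    // Awaiting `bulky()` directly embeds its whole state in the outer future.
    let inlined = async { bulky().await };
    // Boxing first leaves only a heap pointer inside the outer future.
    let boxed = async { bulky().boxed().await };

    println!("inlined outer future: {} bytes", size_of_val(&inlined));
    println!("boxed outer future:   {} bytes", size_of_val(&boxed));
}
```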
konsti 2024-01-29 09:31:27 +01:00 committed by GitHub
parent ebd8cd425d
commit 8bfc3c1b37
1 changed file with 110 additions and 82 deletions

@@ -2,12 +2,14 @@ use std::future::Future;
use std::time::SystemTime;
use futures::FutureExt;
use http::request::Parts;
use http_cache_semantics::{AfterResponse, BeforeRequest, CachePolicy};
use reqwest::{Request, Response};
use reqwest::{Body, Request, Response};
use reqwest_middleware::ClientWithMiddleware;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use tracing::{debug, info_span, instrument, trace, warn, Instrument};
use url::Url;
use puffin_cache::{CacheEntry, Freshness};
use puffin_fs::write_atomic;
@@ -115,33 +117,9 @@ impl CachedClient {
) -> Result<Payload, CachedClientError<CallBackError>>
where
Callback: FnOnce(Response) -> CallbackReturn,
CallbackReturn: Future<Output = Result<Payload, CallBackError>>,
CallbackReturn: Future<Output = Result<Payload, CallBackError>> + Send,
{
let read_span = info_span!("read_cache", file = %cache_entry.path().display());
let read_result = fs_err::tokio::read(cache_entry.path())
.instrument(read_span)
.await;
let cached = if let Ok(cached) = read_result {
let parse_span = info_span!(
"parse_cache",
path = %cache_entry.path().display()
);
let parse_result = parse_span
.in_scope(|| rmp_serde::from_slice::<DataWithCachePolicy<Payload>>(&cached));
match parse_result {
Ok(data) => Some(data),
Err(err) => {
warn!(
"Broken cache entry at {}, removing: {err}",
cache_entry.path().display()
);
let _ = fs_err::tokio::remove_file(&cache_entry.path()).await;
None
}
}
} else {
None
};
let cached = Self::read_cache(cache_entry).await;
let cached_response = self.send_cached(req, cache_control, cached).boxed().await?;
@@ -165,6 +143,7 @@ impl CachedClient {
let immutable = headers.is_immutable();
let data = response_callback(res)
.boxed()
.await
.map_err(|err| CachedClientError::Callback(err))?;
if let Some(cache_policy) = cache_policy {
@@ -193,10 +172,41 @@ impl CachedClient {
}
}
async fn read_cache<Payload: Serialize + DeserializeOwned + Send>(
cache_entry: &CacheEntry,
) -> Option<DataWithCachePolicy<Payload>> {
let read_span = info_span!("read_cache", file = %cache_entry.path().display());
let read_result = fs_err::tokio::read(cache_entry.path())
.instrument(read_span)
.await;
if let Ok(cached) = read_result {
let parse_span = info_span!(
"parse_cache",
path = %cache_entry.path().display()
);
let parse_result = parse_span
.in_scope(|| rmp_serde::from_slice::<DataWithCachePolicy<Payload>>(&cached));
match parse_result {
Ok(data) => Some(data),
Err(err) => {
warn!(
"Broken cache entry at {}, removing: {err}",
cache_entry.path().display()
);
let _ = fs_err::tokio::remove_file(&cache_entry.path()).await;
None
}
}
} else {
None
}
}
/// `http-cache-semantics` to `reqwest` wrapper
async fn send_cached<T: Serialize + DeserializeOwned>(
&self,
mut req: Request,
req: Request,
cache_control: CacheControl,
cached: Option<DataWithCachePolicy<T>>,
) -> Result<CachedResponse<T>, Error> {
@@ -236,60 +246,15 @@ impl CachedClient {
CachedResponse::FreshCache(cached.data)
}
BeforeRequest::Stale { request, matches } => {
if !matches {
// This shouldn't happen; if it does, we'll override the cache.
warn!("Cached request doesn't match current request for: {url}");
return self.fresh_request(req, converted_req).await;
}
debug!("Sending revalidation request for: {url}");
for header in &request.headers {
req.headers_mut().insert(header.0.clone(), header.1.clone());
converted_req
.headers_mut()
.insert(header.0.clone(), header.1.clone());
}
let res = self
.0
.execute(req)
.instrument(info_span!("revalidation_request", url = url.as_str()))
.await
.map_err(ErrorKind::RequestMiddlewareError)?
.error_for_status()
.map_err(ErrorKind::RequestError)?;
let mut converted_res = http::Response::new(());
*converted_res.status_mut() = res.status();
for header in res.headers() {
converted_res.headers_mut().insert(
http::HeaderName::from(header.0),
http::HeaderValue::from(header.1),
);
}
let after_response = cached.cache_policy.after_response(
&converted_req,
&converted_res,
SystemTime::now(),
);
match after_response {
AfterResponse::NotModified(new_policy, _parts) => {
debug!("Found not-modified response for: {url}");
let headers =
CacheHeaders::from_response(res.headers().get_all("cache-control"));
let immutable = headers.is_immutable();
CachedResponse::NotModified(DataWithCachePolicy {
data: cached.data,
immutable,
cache_policy: Box::new(new_policy),
})
}
AfterResponse::Modified(new_policy, _parts) => {
debug!("Found modified response for: {url}");
CachedResponse::ModifiedOrNew(
res,
new_policy.is_storable().then(|| Box::new(new_policy)),
)
}
}
self.send_cached_handle_stale(
req,
converted_req,
url,
cached,
&request,
matches,
)
.await?
}
}
} else {
@@ -299,6 +264,69 @@ impl CachedClient {
Ok(cached_response)
}
async fn send_cached_handle_stale<T: Serialize + DeserializeOwned>(
&self,
mut req: Request,
mut converted_req: http::Request<Body>,
url: Url,
cached: DataWithCachePolicy<T>,
request: &Parts,
matches: bool,
) -> Result<CachedResponse<T>, Error> {
if !matches {
// This shouldn't happen; if it does, we'll override the cache.
warn!("Cached request doesn't match current request for: {url}");
return self.fresh_request(req, converted_req).await;
}
debug!("Sending revalidation request for: {url}");
for header in &request.headers {
req.headers_mut().insert(header.0.clone(), header.1.clone());
converted_req
.headers_mut()
.insert(header.0.clone(), header.1.clone());
}
let res = self
.0
.execute(req)
.instrument(info_span!("revalidation_request", url = url.as_str()))
.await
.map_err(ErrorKind::RequestMiddlewareError)?
.error_for_status()
.map_err(ErrorKind::RequestError)?;
let mut converted_res = http::Response::new(());
*converted_res.status_mut() = res.status();
for header in res.headers() {
converted_res.headers_mut().insert(
http::HeaderName::from(header.0),
http::HeaderValue::from(header.1),
);
}
let after_response =
cached
.cache_policy
.after_response(&converted_req, &converted_res, SystemTime::now());
match after_response {
AfterResponse::NotModified(new_policy, _parts) => {
debug!("Found not-modified response for: {url}");
let headers = CacheHeaders::from_response(res.headers().get_all("cache-control"));
let immutable = headers.is_immutable();
Ok(CachedResponse::NotModified(DataWithCachePolicy {
data: cached.data,
immutable,
cache_policy: Box::new(new_policy),
}))
}
AfterResponse::Modified(new_policy, _parts) => {
debug!("Found modified response for: {url}");
Ok(CachedResponse::ModifiedOrNew(
res,
new_policy.is_storable().then(|| Box::new(new_policy)),
))
}
}
}
#[instrument(skip_all, fields(url = req.url().as_str()))]
async fn fresh_request<T: Serialize>(
&self,