diff --git a/crates/distribution-types/src/lib.rs b/crates/distribution-types/src/lib.rs index 88946da74..3cab4778c 100644 --- a/crates/distribution-types/src/lib.rs +++ b/crates/distribution-types/src/lib.rs @@ -147,7 +147,7 @@ pub enum SourceDist { #[derive(Debug, Clone)] pub struct RegistryBuiltDist { pub filename: WheelFilename, - pub file: File, + pub file: Box, pub index: IndexUrl, } @@ -172,7 +172,7 @@ pub struct PathBuiltDist { #[derive(Debug, Clone)] pub struct RegistrySourceDist { pub filename: SourceDistFilename, - pub file: File, + pub file: Box, pub index: IndexUrl, } @@ -208,14 +208,14 @@ impl Dist { DistFilename::WheelFilename(filename) => { Self::Built(BuiltDist::Registry(RegistryBuiltDist { filename, - file, + file: Box::new(file), index, })) } DistFilename::SourceDistFilename(filename) => { Self::Source(SourceDist::Registry(RegistrySourceDist { filename, - file, + file: Box::new(file), index, })) } @@ -865,3 +865,16 @@ impl Identifier for Dist { } } } + +#[cfg(test)] +mod test { + use crate::{BuiltDist, Dist, SourceDist}; + + /// Ensure that we don't accidentally grow the `Dist` sizes. + #[test] + fn dist_size() { + assert!(std::mem::size_of::() <= 240); + assert!(std::mem::size_of::() <= 240); + assert!(std::mem::size_of::() <= 168); + } +} diff --git a/crates/puffin-client/src/cached_client.rs b/crates/puffin-client/src/cached_client.rs index 55f243c36..e4b59c80b 100644 --- a/crates/puffin-client/src/cached_client.rs +++ b/crates/puffin-client/src/cached_client.rs @@ -1,3 +1,4 @@ +use futures::FutureExt; use std::future::Future; use std::time::SystemTime; @@ -42,14 +43,15 @@ enum CachedResponse { /// There was no prior cached response or the cache was outdated /// /// The cache policy is `None` if it isn't storable - ModifiedOrNew(Response, Option), + ModifiedOrNew(Response, Option>), } /// Serialize the actual payload together with its caching information #[derive(Debug, Deserialize, Serialize)] pub struct DataWithCachePolicy { pub data: Payload, - cache_policy: CachePolicy, + // The cache policy is large (448 bytes at time of writing), reduce the stack size + cache_policy: Box, } /// Custom caching layer over [`reqwest::Client`] using `http-cache-semantics`. @@ -88,7 +90,7 @@ impl CachedClient { /// client. #[instrument(skip_all)] pub async fn get_cached_with_callback< - Payload: Serialize + DeserializeOwned, + Payload: Serialize + DeserializeOwned + Send, CallBackError, Callback, CallbackReturn, @@ -128,7 +130,7 @@ impl CachedClient { None }; - let cached_response = self.send_cached(req, cached).await?; + let cached_response = self.send_cached(req, cached).boxed().await?; let write_cache = info_span!("write_cache", file = %cache_entry.path().display()); match cached_response { @@ -231,14 +233,14 @@ impl CachedClient { debug!("Found not-modified response for: {url}"); CachedResponse::NotModified(DataWithCachePolicy { data: cached.data, - cache_policy: new_policy, + cache_policy: Box::new(new_policy), }) } AfterResponse::Modified(new_policy, _parts) => { debug!("Found modified response for: {url}"); CachedResponse::ModifiedOrNew( res, - new_policy.is_storable().then_some(new_policy), + new_policy.is_storable().then(|| Box::new(new_policy)), ) } } @@ -271,7 +273,7 @@ impl CachedClient { CachePolicy::new(&converted_req.into_parts().0, &converted_res.into_parts().0); Ok(CachedResponse::ModifiedOrNew( res, - cache_policy.is_storable().then_some(cache_policy), + cache_policy.is_storable().then(|| Box::new(cache_policy)), )) } } diff --git a/crates/puffin-client/src/flat_index.rs b/crates/puffin-client/src/flat_index.rs index b223c2f09..25a6de2a2 100644 --- a/crates/puffin-client/src/flat_index.rs +++ b/crates/puffin-client/src/flat_index.rs @@ -2,7 +2,7 @@ use std::collections::btree_map::Entry; use std::collections::BTreeMap; use std::path::PathBuf; -use futures::StreamExt; +use futures::{FutureExt, StreamExt}; use reqwest::Response; use rustc_hash::FxHashMap; use tracing::{debug, info_span, instrument, warn, Instrument}; @@ -120,6 +120,7 @@ impl<'a> FlatIndexClient<'a> { .collect(); Ok(files) } + .boxed() .instrument(info_span!("parse_flat_index_html", url = % url)) }; let files = cached_client @@ -218,7 +219,7 @@ impl FlatIndex { let dist = Dist::Built(BuiltDist::Registry(RegistryBuiltDist { filename, - file, + file: Box::new(file), index, })); match distributions.0.entry(version) { @@ -235,7 +236,7 @@ impl FlatIndex { DistFilename::SourceDistFilename(filename) => { let dist = Dist::Source(SourceDist::Registry(RegistrySourceDist { filename: filename.clone(), - file, + file: Box::new(file), index, })); match distributions.0.entry(filename.version.clone()) { diff --git a/crates/puffin-client/src/registry_client.rs b/crates/puffin-client/src/registry_client.rs index faf05e562..3b56c24a7 100644 --- a/crates/puffin-client/src/registry_client.rs +++ b/crates/puffin-client/src/registry_client.rs @@ -5,7 +5,7 @@ use std::str::FromStr; use async_http_range_reader::{AsyncHttpRangeReader, AsyncHttpRangeReaderError}; use async_zip::tokio::read::seek::ZipFileReader; -use futures::TryStreamExt; +use futures::{FutureExt, TryStreamExt}; use reqwest::{Client, ClientBuilder, Response, StatusCode}; use reqwest_retry::policies::ExponentialBackoff; use reqwest_retry::RetryTransientMiddleware; @@ -206,6 +206,7 @@ impl RegistryClient { } } } + .boxed() .instrument(info_span!("parse_simple_api", package = %package_name)) }; let result = self @@ -335,6 +336,7 @@ impl RegistryClient { })?; Ok(metadata) } + .boxed() .instrument(info_span!("read_metadata_range_request", wheel = %filename)) }; diff --git a/crates/puffin-distribution/src/distribution_database.rs b/crates/puffin-distribution/src/distribution_database.rs index 75d5c36d4..309f22368 100644 --- a/crates/puffin-distribution/src/distribution_database.rs +++ b/crates/puffin-distribution/src/distribution_database.rs @@ -5,6 +5,7 @@ use std::str::FromStr; use std::sync::Arc; use fs_err::tokio as fs; +use futures::FutureExt; use thiserror::Error; use tokio::task::JoinError; use tokio_util::compat::FuturesAsyncReadCompatExt; @@ -219,7 +220,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context> let lock = self.locks.acquire(&dist).await; let _guard = lock.lock().await; - let built_wheel = self.builder.download_and_build(source_dist).await?; + let built_wheel = self.builder.download_and_build(source_dist).boxed().await?; Ok(LocalWheel::Built(BuiltWheel { dist: dist.clone(), path: built_wheel.path, @@ -242,7 +243,9 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context> dist: &Dist, ) -> Result<(Metadata21, Option), DistributionDatabaseError> { match dist { - Dist::Built(built_dist) => Ok((self.client.wheel_metadata(built_dist).await?, None)), + Dist::Built(built_dist) => { + Ok((self.client.wheel_metadata(built_dist).boxed().await?, None)) + } Dist::Source(source_dist) => { // Optimization: Skip source dist download when we must not build them anyway. if self.build_context.no_build() { @@ -263,6 +266,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context> let metadata = self .builder .download_and_build_metadata(&source_dist) + .boxed() .await?; Ok((metadata, precise)) } diff --git a/crates/puffin-distribution/src/source/mod.rs b/crates/puffin-distribution/src/source/mod.rs index 3d065b04c..e4c1b8576 100644 --- a/crates/puffin-distribution/src/source/mod.rs +++ b/crates/puffin-distribution/src/source/mod.rs @@ -6,7 +6,7 @@ use std::sync::Arc; use anyhow::Result; use fs_err::tokio as fs; -use futures::TryStreamExt; +use futures::{FutureExt, TryStreamExt}; use reqwest::Response; use tempfile::TempDir; use tokio_util::compat::FuturesAsyncReadCompatExt; @@ -96,6 +96,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> { &cache_shard, subdirectory.as_deref(), ) + .boxed() .await? } SourceDist::Registry(registry_source_dist) => { @@ -134,6 +135,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> { &cache_shard, None, ) + .boxed() .await? } SourceDist::Git(git_source_dist) => self.git(source_dist, git_source_dist).await?, @@ -171,6 +173,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> { &cache_shard, subdirectory.as_deref(), ) + .boxed() .await? } SourceDist::Registry(registry_source_dist) => { @@ -189,7 +192,10 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> { path: path.clone(), editable: false, }; - return self.path_metadata(source_dist, &path_source_dist).await; + return self + .path_metadata(source_dist, &path_source_dist) + .boxed() + .await; } }; @@ -209,13 +215,18 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> { &cache_shard, None, ) + .boxed() .await? } SourceDist::Git(git_source_dist) => { - self.git_metadata(source_dist, git_source_dist).await? + self.git_metadata(source_dist, git_source_dist) + .boxed() + .await? } SourceDist::Path(path_source_dist) => { - self.path_metadata(source_dist, path_source_dist).await? + self.path_metadata(source_dist, path_source_dist) + .boxed() + .await? } }; @@ -380,6 +391,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> { // If the backend supports `prepare_metadata_for_build_wheel`, use it. if let Some(metadata) = self .build_source_dist_metadata(source_dist, source_dist_entry.path(), subdirectory) + .boxed() .await? { if let Ok(cached) = fs::read(cache_entry.path()).await { @@ -564,6 +576,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> { // If the backend supports `prepare_metadata_for_build_wheel`, use it. if let Some(metadata) = self .build_source_dist_metadata(source_dist, &path_source_dist.path, None) + .boxed() .await? { // Store the metadata for this build along with all the other builds. @@ -712,6 +725,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> { // If the backend supports `prepare_metadata_for_build_wheel`, use it. if let Some(metadata) = self .build_source_dist_metadata(source_dist, fetch.path(), subdirectory.as_deref()) + .boxed() .await? { // Store the metadata for this build along with all the other builds. diff --git a/crates/puffin-installer/src/downloader.rs b/crates/puffin-installer/src/downloader.rs index ca716adc8..a842cdca6 100644 --- a/crates/puffin-installer/src/downloader.rs +++ b/crates/puffin-installer/src/downloader.rs @@ -2,7 +2,7 @@ use std::cmp::Reverse; use std::path::Path; use std::sync::Arc; -use futures::{Stream, StreamExt, TryFutureExt, TryStreamExt}; +use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; use tokio::task::JoinError; use tracing::{instrument, warn}; use url::Url; @@ -68,7 +68,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> { ) -> impl Stream> + 'stream { futures::stream::iter(distributions) .map(|dist| async { - let wheel = self.get_wheel(dist, in_flight).await?; + let wheel = self.get_wheel(dist, in_flight).boxed().await?; if let Some(reporter) = self.reporter.as_ref() { reporter.on_progress(&wheel); } @@ -158,6 +158,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> { let download: LocalWheel = self .database .get_or_build_wheel(dist.clone()) + .boxed() .map_err(|err| Error::Fetch(dist.clone(), err)) .await?; let result = Self::unzip_wheel(download).await; diff --git a/crates/puffin-resolver/src/resolver/mod.rs b/crates/puffin-resolver/src/resolver/mod.rs index cd5dcd193..b8ec28db0 100644 --- a/crates/puffin-resolver/src/resolver/mod.rs +++ b/crates/puffin-resolver/src/resolver/mod.rs @@ -683,7 +683,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { /// Fetch the metadata for a stream of packages and versions. async fn fetch(&self, request_stream: UnboundedReceiver) -> Result<(), ResolveError> { let mut response_stream = request_stream - .map(|request| self.process_request(request)) + .map(|request| self.process_request(request).boxed()) .buffer_unordered(50); while let Some(response) = response_stream.next().await { @@ -738,6 +738,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { let version_map = self .provider .get_version_map(&package_name) + .boxed() .await .map_err(ResolveError::Client)?; Ok(Some(Response::Package(package_name, version_map))) @@ -748,6 +749,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { let (metadata, precise) = self .provider .get_or_build_wheel_metadata(&dist) + .boxed() .await .map_err(|err| match dist.clone() { Dist::Built(BuiltDist::Path(built_dist)) => { @@ -800,6 +802,7 @@ impl<'a, Provider: ResolverProvider> Resolver<'a, Provider> { let (metadata, precise) = self .provider .get_or_build_wheel_metadata(&dist) + .boxed() .await .map_err(|err| match dist.clone() { Dist::Built(BuiltDist::Path(built_dist)) => { diff --git a/crates/puffin-traits/src/lib.rs b/crates/puffin-traits/src/lib.rs index ebef0c95f..3b0c51f14 100644 --- a/crates/puffin-traits/src/lib.rs +++ b/crates/puffin-traits/src/lib.rs @@ -51,7 +51,7 @@ use puffin_interpreter::{Interpreter, Virtualenv}; /// them. // TODO(konstin): Proper error types -pub trait BuildContext { +pub trait BuildContext: Sync { type SourceDistBuilder: SourceBuildTrait + Send + Sync; fn cache(&self) -> &Cache;