Use borrowed data in `BuildDispatch` (#679)

This PR uses borrowed data in `BuildDispatch`, which makes creating a
`BuildDispatch` extremely cheap (only one allocation, for the Python
executable). I can be talked out of this; it will have no measurable
impact.
This commit is contained in:
Charlie Marsh 2023-12-18 11:43:03 -05:00 committed by GitHub
parent c400ab7d07
commit dbf055fe6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 94 additions and 101 deletions

View File

@ -139,12 +139,12 @@ pub(crate) async fn pip_compile(
let options = ResolutionOptions::new(resolution_mode, prerelease_mode, exclude_newer); let options = ResolutionOptions::new(resolution_mode, prerelease_mode, exclude_newer);
let build_dispatch = BuildDispatch::new( let build_dispatch = BuildDispatch::new(
client.clone(), &client,
cache.clone(), &cache,
interpreter, &interpreter,
&index_urls,
venv.python_executable(), venv.python_executable(),
no_build, no_build,
index_urls,
) )
.with_options(options); .with_options(options);

View File

@ -133,12 +133,12 @@ pub(crate) async fn pip_install(
let options = ResolutionOptions::new(resolution_mode, prerelease_mode, exclude_newer); let options = ResolutionOptions::new(resolution_mode, prerelease_mode, exclude_newer);
let build_dispatch = BuildDispatch::new( let build_dispatch = BuildDispatch::new(
client.clone(), &client,
cache.clone(), &cache,
interpreter, &interpreter,
&index_urls,
venv.python_executable(), venv.python_executable(),
no_build, no_build,
index_urls.clone(),
) )
.with_options(options); .with_options(options);
@ -200,7 +200,7 @@ pub(crate) async fn pip_install(
site_packages, site_packages,
reinstall, reinstall,
link_mode, link_mode,
index_urls, &index_urls,
&tags, &tags,
&client, &client,
&build_dispatch, &build_dispatch,
@ -264,7 +264,7 @@ async fn build_editables(
cache: &Cache, cache: &Cache,
tags: &Tags, tags: &Tags,
client: &RegistryClient, client: &RegistryClient,
build_dispatch: &BuildDispatch, build_dispatch: &BuildDispatch<'_>,
mut printer: Printer, mut printer: Printer,
) -> Result<Vec<BuiltEditable>, Error> { ) -> Result<Vec<BuiltEditable>, Error> {
let start = std::time::Instant::now(); let start = std::time::Instant::now();
@ -321,7 +321,7 @@ async fn resolve(
tags: &Tags, tags: &Tags,
markers: &MarkerEnvironment, markers: &MarkerEnvironment,
client: &RegistryClient, client: &RegistryClient,
build_dispatch: &BuildDispatch, build_dispatch: &BuildDispatch<'_>,
options: ResolutionOptions, options: ResolutionOptions,
mut printer: Printer, mut printer: Printer,
) -> Result<ResolutionGraph, Error> { ) -> Result<ResolutionGraph, Error> {
@ -386,10 +386,10 @@ async fn install(
site_packages: SitePackages<'_>, site_packages: SitePackages<'_>,
reinstall: &Reinstall, reinstall: &Reinstall,
link_mode: LinkMode, link_mode: LinkMode,
index_urls: IndexUrls, index_urls: &IndexUrls,
tags: &Tags, tags: &Tags,
client: &RegistryClient, client: &RegistryClient,
build_dispatch: &BuildDispatch, build_dispatch: &BuildDispatch<'_>,
cache: &Cache, cache: &Cache,
venv: &Virtualenv, venv: &Virtualenv,
mut printer: Printer, mut printer: Printer,
@ -413,7 +413,7 @@ async fn install(
editables, editables,
site_packages, site_packages,
reinstall, reinstall,
&index_urls, index_urls,
cache, cache,
venv, venv,
tags, tags,

View File

@ -62,12 +62,12 @@ pub(crate) async fn pip_sync(
// Prep the build context. // Prep the build context.
let build_dispatch = BuildDispatch::new( let build_dispatch = BuildDispatch::new(
client.clone(), &client,
cache.clone(), &cache,
venv.interpreter().clone(), venv.interpreter(),
&index_urls,
venv.python_executable(), venv.python_executable(),
no_build, no_build,
index_urls.clone(),
); );
// Determine the set of installed packages. // Determine the set of installed packages.
@ -341,7 +341,7 @@ async fn resolve_editables(
tags: &Tags, tags: &Tags,
cache: &Cache, cache: &Cache,
client: &RegistryClient, client: &RegistryClient,
build_dispatch: &BuildDispatch, build_dispatch: &BuildDispatch<'_>,
mut printer: Printer, mut printer: Printer,
) -> Result<ResolvedEditables> { ) -> Result<ResolvedEditables> {
// Partition the editables into those that are already installed, and those that must be built. // Partition the editables into those that are already installed, and those that must be built.

View File

@ -53,14 +53,16 @@ pub(crate) async fn build(args: BuildArgs) -> Result<PathBuf> {
let platform = Platform::current()?; let platform = Platform::current()?;
let venv = Virtualenv::from_env(platform, &cache)?; let venv = Virtualenv::from_env(platform, &cache)?;
let client = RegistryClientBuilder::new(cache.clone()).build();
let index_urls = IndexUrls::default();
let build_dispatch = BuildDispatch::new( let build_dispatch = BuildDispatch::new(
RegistryClientBuilder::new(cache.clone()).build(), &client,
cache, &cache,
venv.interpreter().clone(), venv.interpreter(),
&index_urls,
venv.python_executable(), venv.python_executable(),
false, false,
IndexUrls::default(),
); );
let builder = SourceBuild::setup( let builder = SourceBuild::setup(

View File

@ -51,13 +51,15 @@ pub(crate) async fn resolve_cli(args: ResolveCliArgs) -> Result<()> {
let platform = Platform::current()?; let platform = Platform::current()?;
let venv = Virtualenv::from_env(platform, &cache)?; let venv = Virtualenv::from_env(platform, &cache)?;
let client = RegistryClientBuilder::new(cache.clone()).build(); let client = RegistryClientBuilder::new(cache.clone()).build();
let index_urls = IndexUrls::default();
let build_dispatch = BuildDispatch::new( let build_dispatch = BuildDispatch::new(
client.clone(), &client,
cache.clone(), &cache,
venv.interpreter().clone(), venv.interpreter(),
&index_urls,
venv.python_executable(), venv.python_executable(),
args.no_build, args.no_build,
IndexUrls::default(),
); );
// Copied from `BuildDispatch` // Copied from `BuildDispatch`

View File

@ -4,11 +4,8 @@ use std::sync::Arc;
use anyhow::Result; use anyhow::Result;
use clap::Parser; use clap::Parser;
use fs_err as fs;
use futures::stream::FuturesUnordered;
use futures::StreamExt; use futures::StreamExt;
use indicatif::ProgressStyle; use indicatif::ProgressStyle;
use tokio::sync::Semaphore;
use tokio::time::Instant; use tokio::time::Instant;
use tracing::{info, info_span, span, Level, Span}; use tracing::{info, info_span, span, Level, Span};
use tracing_indicatif::span_ext::IndicatifSpanExt; use tracing_indicatif::span_ext::IndicatifSpanExt;
@ -19,13 +16,13 @@ use puffin_cache::{Cache, CacheArgs};
use puffin_client::RegistryClientBuilder; use puffin_client::RegistryClientBuilder;
use puffin_dispatch::BuildDispatch; use puffin_dispatch::BuildDispatch;
use puffin_interpreter::Virtualenv; use puffin_interpreter::Virtualenv;
use puffin_normalize::PackageName;
use puffin_traits::BuildContext; use puffin_traits::BuildContext;
use pypi_types::IndexUrls; use pypi_types::IndexUrls;
#[derive(Parser)] #[derive(Parser)]
pub(crate) struct ResolveManyArgs { pub(crate) struct ResolveManyArgs {
list: PathBuf, /// Path to a file containing one requirement per line.
requirements: PathBuf,
#[clap(long)] #[clap(long)]
limit: Option<usize>, limit: Option<usize>,
/// Don't build source distributions. This means resolving will not run arbitrary code. The /// Don't build source distributions. This means resolving will not run arbitrary code. The
@ -42,71 +39,62 @@ pub(crate) struct ResolveManyArgs {
pub(crate) async fn resolve_many(args: ResolveManyArgs) -> Result<()> { pub(crate) async fn resolve_many(args: ResolveManyArgs) -> Result<()> {
let cache = Cache::try_from(args.cache_args)?; let cache = Cache::try_from(args.cache_args)?;
let data = fs::read_to_string(&args.list)?; let data = fs_err::read_to_string(&args.requirements)?;
let lines = data.lines().map(Requirement::from_str); let lines = data.lines().map(Requirement::from_str);
let requirements: Vec<Requirement> = if let Some(limit) = args.limit { let requirements: Vec<Requirement> = if let Some(limit) = args.limit {
lines.take(limit).collect::<Result<_, _>>()? lines.take(limit).collect::<Result<_, _>>()?
} else { } else {
lines.collect::<Result<_, _>>()? lines.collect::<Result<_, _>>()?
}; };
let total = requirements.len();
let platform = Platform::current()?; let platform = Platform::current()?;
let venv = Virtualenv::from_env(platform, &cache)?; let venv = Virtualenv::from_env(platform, &cache)?;
let client = RegistryClientBuilder::new(cache.clone()).build();
let index_urls = IndexUrls::default();
let build_dispatch = BuildDispatch::new( let build_dispatch = BuildDispatch::new(
RegistryClientBuilder::new(cache.clone()).build(), &client,
cache.clone(), &cache,
venv.interpreter().clone(), venv.interpreter(),
&index_urls,
venv.python_executable(), venv.python_executable(),
args.no_build, args.no_build,
IndexUrls::default(),
); );
let build_dispatch = Arc::new(build_dispatch);
let build_dispatch_arc = Arc::new(build_dispatch);
let mut tasks = FuturesUnordered::new();
let semaphore = Arc::new(Semaphore::new(args.num_tasks));
let header_span = info_span!("resolve many"); let header_span = info_span!("resolve many");
header_span.pb_set_style(&ProgressStyle::default_bar()); header_span.pb_set_style(&ProgressStyle::default_bar());
let total = requirements.len();
header_span.pb_set_length(total as u64); header_span.pb_set_length(total as u64);
let _header_span_enter = header_span.enter(); let _header_span_enter = header_span.enter();
let tf_models_nightly = PackageName::from_str("tf-models-nightly").unwrap(); let mut tasks = futures::stream::iter(requirements)
for requirement in requirements { .map(|requirement| {
if requirement.name == tf_models_nightly { let build_dispatch = build_dispatch.clone();
continue; async move {
} let span = span!(Level::TRACE, "fetching");
let build_dispatch_arc = build_dispatch_arc.clone(); let _enter = span.enter();
let semaphore = semaphore.clone(); let start = Instant::now();
tasks.push(tokio::spawn(async move {
let permit = semaphore.clone().acquire_owned().await.unwrap();
let span = span!(Level::TRACE, "resolving");
let _enter = span.enter();
let start = Instant::now();
let result = build_dispatch_arc.resolve(&[requirement.clone()]).await; let result = build_dispatch.resolve(&[requirement.clone()]).await;
drop(permit); (requirement.to_string(), start.elapsed(), result)
(requirement.to_string(), start.elapsed(), result) }
})); })
} .buffer_unordered(args.num_tasks);
let mut success = 0usize; let mut success = 0usize;
let mut errors = Vec::new(); let mut errors = Vec::new();
while let Some(result) = tasks.next().await { while let Some(result) = tasks.next().await {
let (package, duration, result) = result.unwrap(); let (package, duration, result) = result;
match result { match result {
Ok(resolution) => { Ok(_) => {
info!( info!(
"Success ({}/{}, {} ms): {} ({} package(s))", "Success ({}/{}, {} ms): {}",
success + errors.len(), success + errors.len(),
total, total,
duration.as_millis(), duration.as_millis(),
package, package,
resolution.len(),
); );
success += 1; success += 1;
} }
@ -124,6 +112,7 @@ pub(crate) async fn resolve_many(args: ResolveManyArgs) -> Result<()> {
} }
Span::current().pb_inc(1); Span::current().pb_inc(1);
} }
info!("Errors: {}", errors.join(", ")); info!("Errors: {}", errors.join(", "));
info!("Success: {}, Error: {}", success, errors.len()); info!("Success: {}, Error: {}", success, errors.len());
Ok(()) Ok(())

View File

@ -24,36 +24,36 @@ use pypi_types::IndexUrls;
/// The main implementation of [`BuildContext`], used by the CLI, see [`BuildContext`] /// The main implementation of [`BuildContext`], used by the CLI, see [`BuildContext`]
/// documentation. /// documentation.
pub struct BuildDispatch { pub struct BuildDispatch<'a> {
client: RegistryClient, client: &'a RegistryClient,
cache: Cache, cache: &'a Cache,
interpreter: Interpreter, interpreter: &'a Interpreter,
index_urls: &'a IndexUrls,
base_python: PathBuf, base_python: PathBuf,
no_build: bool, no_build: bool,
source_build_context: SourceBuildContext, source_build_context: SourceBuildContext,
options: ResolutionOptions, options: ResolutionOptions,
index_urls: IndexUrls,
in_flight_unzips: OnceMap<PathBuf, Result<CachedDist, String>>, in_flight_unzips: OnceMap<PathBuf, Result<CachedDist, String>>,
} }
impl BuildDispatch { impl<'a> BuildDispatch<'a> {
pub fn new( pub fn new(
client: RegistryClient, client: &'a RegistryClient,
cache: Cache, cache: &'a Cache,
interpreter: Interpreter, interpreter: &'a Interpreter,
index_urls: &'a IndexUrls,
base_python: PathBuf, base_python: PathBuf,
no_build: bool, no_build: bool,
index_urls: IndexUrls,
) -> Self { ) -> Self {
Self { Self {
client, client,
cache, cache,
interpreter, interpreter,
index_urls,
base_python, base_python,
no_build, no_build,
source_build_context: SourceBuildContext::default(), source_build_context: SourceBuildContext::default(),
options: ResolutionOptions::default(), options: ResolutionOptions::default(),
index_urls,
in_flight_unzips: OnceMap::default(), in_flight_unzips: OnceMap::default(),
} }
} }
@ -65,15 +65,15 @@ impl BuildDispatch {
} }
} }
impl BuildContext for BuildDispatch { impl<'a> BuildContext for BuildDispatch<'a> {
type SourceDistBuilder = SourceBuild; type SourceDistBuilder = SourceBuild;
fn cache(&self) -> &Cache { fn cache(&self) -> &Cache {
&self.cache self.cache
} }
fn interpreter(&self) -> &Interpreter { fn interpreter(&self) -> &Interpreter {
&self.interpreter self.interpreter
} }
fn base_python(&self) -> &Path { fn base_python(&self) -> &Path {
@ -85,18 +85,18 @@ impl BuildContext for BuildDispatch {
} }
#[instrument(skip(self, requirements), fields(requirements = requirements.iter().map(ToString::to_string).join(", ")))] #[instrument(skip(self, requirements), fields(requirements = requirements.iter().map(ToString::to_string).join(", ")))]
fn resolve<'a>( fn resolve<'data>(
&'a self, &'data self,
requirements: &'a [Requirement], requirements: &'data [Requirement],
) -> Pin<Box<dyn Future<Output = Result<Resolution>> + Send + 'a>> { ) -> Pin<Box<dyn Future<Output = Result<Resolution>> + Send + 'data>> {
Box::pin(async { Box::pin(async {
let tags = Tags::from_interpreter(&self.interpreter)?; let tags = Tags::from_interpreter(self.interpreter)?;
let resolver = Resolver::new( let resolver = Resolver::new(
Manifest::simple(requirements.to_vec()), Manifest::simple(requirements.to_vec()),
self.options, self.options,
self.interpreter.markers(), self.interpreter.markers(),
&tags, &tags,
&self.client, self.client,
self, self,
); );
let graph = resolver.resolve().await.with_context(|| { let graph = resolver.resolve().await.with_context(|| {
@ -116,11 +116,11 @@ impl BuildContext for BuildDispatch {
venv = ?venv.root() venv = ?venv.root()
) )
)] )]
fn install<'a>( fn install<'data>(
&'a self, &'data self,
resolution: &'a Resolution, resolution: &'data Resolution,
venv: &'a Virtualenv, venv: &'data Virtualenv,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> { ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'data>> {
Box::pin(async move { Box::pin(async move {
debug!( debug!(
"Installing in {} in {}", "Installing in {} in {}",
@ -132,7 +132,7 @@ impl BuildContext for BuildDispatch {
); );
// Determine the current environment markers. // Determine the current environment markers.
let tags = Tags::from_interpreter(&self.interpreter)?; let tags = Tags::from_interpreter(self.interpreter)?;
// Determine the set of installed packages. // Determine the set of installed packages.
let site_packages = let site_packages =
@ -148,7 +148,7 @@ impl BuildContext for BuildDispatch {
Vec::new(), Vec::new(),
site_packages, site_packages,
&Reinstall::None, &Reinstall::None,
&self.index_urls, self.index_urls,
self.cache(), self.cache(),
venv, venv,
&tags, &tags,
@ -170,7 +170,7 @@ impl BuildContext for BuildDispatch {
vec![] vec![]
} else { } else {
// TODO(konstin): Check that there is no endless recursion. // TODO(konstin): Check that there is no endless recursion.
let downloader = Downloader::new(self.cache(), &tags, &self.client, self); let downloader = Downloader::new(self.cache(), &tags, self.client, self);
debug!( debug!(
"Downloading and building requirement{} for build: {}", "Downloading and building requirement{} for build: {}",
if remote.len() == 1 { "" } else { "s" }, if remote.len() == 1 { "" } else { "s" },
@ -218,13 +218,13 @@ impl BuildContext for BuildDispatch {
} }
#[instrument(skip_all, fields(package_id = package_id, subdirectory = ?subdirectory))] #[instrument(skip_all, fields(package_id = package_id, subdirectory = ?subdirectory))]
fn setup_build<'a>( fn setup_build<'data>(
&'a self, &'data self,
source: &'a Path, source: &'data Path,
subdirectory: Option<&'a Path>, subdirectory: Option<&'data Path>,
package_id: &'a str, package_id: &'data str,
build_kind: BuildKind, build_kind: BuildKind,
) -> Pin<Box<dyn Future<Output = Result<SourceBuild>> + Send + 'a>> { ) -> Pin<Box<dyn Future<Output = Result<SourceBuild>> + Send + 'data>> {
Box::pin(async move { Box::pin(async move {
if self.no_build { if self.no_build {
bail!("Building source distributions is disabled"); bail!("Building source distributions is disabled");
@ -232,7 +232,7 @@ impl BuildContext for BuildDispatch {
let builder = SourceBuild::setup( let builder = SourceBuild::setup(
source, source,
subdirectory, subdirectory,
&self.interpreter, self.interpreter,
self, self,
self.source_build_context.clone(), self.source_build_context.clone(),
package_id.to_string(), package_id.to_string(),