Move in-flight tracking to the download level (#892)

## Summary

Now that `get_or_build_wheel` will often _also_ handle the unzip step,
we need to move our per-target locking (`OnceMap`) up a level.
Previously, it was only applied to the unzip step, to prevent us from
attempting to unzip into the same target concurrently; now, it's applied
at the `get_wheel` level, which includes both downloading and unzipping.

## Test Plan

None of our existing tests seem to catch this -- perhaps because they're
too "simple". To reproduce it, you need a scenario in which multiple
source distributions are built concurrently (since they'll all try to
download `setuptools`):

```
rm -rf foo && virtualenv --clear .venv && cargo run -p puffin-cli -- pip-compile ./scripts/requirements/pydantic.in  --verbose --cache-dir foo
```
This commit is contained in:
Charlie Marsh 2024-01-12 03:52:22 -05:00 committed by GitHub
parent 60cea0f07d
commit 35c1faa575
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 51 additions and 48 deletions

View File

@ -9,7 +9,7 @@ use anyhow::{bail, Context, Result};
use itertools::Itertools;
use tracing::{debug, instrument};
use distribution_types::{CachedDist, IndexUrls, Name, Resolution};
use distribution_types::{CachedDist, DistributionId, IndexUrls, Name, Resolution};
use pep508_rs::Requirement;
use puffin_build::{SourceBuild, SourceBuildContext};
use puffin_cache::Cache;
@ -31,7 +31,7 @@ pub struct BuildDispatch<'a> {
no_build: bool,
source_build_context: SourceBuildContext,
options: ResolutionOptions,
in_flight_unzips: OnceMap<PathBuf, Result<CachedDist, String>>,
in_flight: OnceMap<DistributionId, Result<CachedDist, String>>,
}
impl<'a> BuildDispatch<'a> {
@ -54,7 +54,7 @@ impl<'a> BuildDispatch<'a> {
no_build,
source_build_context: SourceBuildContext::default(),
options: ResolutionOptions::default(),
in_flight_unzips: OnceMap::default(),
in_flight: OnceMap::default(),
}
}
@ -179,7 +179,7 @@ impl<'a> BuildContext for BuildDispatch<'a> {
);
downloader
.download(remote, &self.in_flight_unzips)
.download(remote, &self.in_flight)
.await
.context("Failed to download and build distributions")?
};

View File

@ -5,11 +5,10 @@ use std::str::FromStr;
use std::sync::Arc;
use fs_err::tokio as fs;
use puffin_extract::unzip_no_seek;
use thiserror::Error;
use tokio::task::JoinError;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::{debug, instrument};
use tracing::instrument;
use url::Url;
use distribution_filename::{WheelFilename, WheelFilenameError};
@ -17,6 +16,7 @@ use distribution_types::{BuiltDist, DirectGitUrl, Dist, LocalEditable, Name, Sou
use platform_tags::Tags;
use puffin_cache::{Cache, CacheBucket, WheelCache};
use puffin_client::RegistryClient;
use puffin_extract::unzip_no_seek;
use puffin_git::GitSource;
use puffin_traits::BuildContext;
use pypi_types::Metadata21;
@ -113,15 +113,7 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
.join_relative(&wheel.file.url)
.map_err(|err| DistributionDatabaseError::Url(wheel.file.url.clone(), err))?;
// Make cache entry
let wheel_filename = WheelFilename::from_str(&wheel.file.filename)?;
let cache_entry = self.cache.entry(
CacheBucket::Wheels,
WheelCache::Index(&wheel.index).remote_wheel_dir(wheel_filename.name.as_ref()),
wheel_filename.stem(),
);
// Download and unzip on the same tokio task
// Download and unzip on the same tokio task.
//
// In all wheels we've seen so far, unzipping while downloading is
// faster than downloading into a file and then unzipping on multiple
@ -137,8 +129,22 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
// to rayon if this buffer grows large by the time the file is fully
// downloaded.
let reader = self.client.stream_external(&url).await?;
// Download and unzip the wheel to a temporary directory.
let temp_dir = tempfile::tempdir_in(self.cache.root())?;
let temp_target = temp_dir.path().join(&wheel.file.filename);
unzip_no_seek(reader.compat(), &temp_target).await?;
// Move the temporary file to the cache.
let wheel_filename = WheelFilename::from_str(&wheel.file.filename)?;
let cache_entry = self.cache.entry(
CacheBucket::Wheels,
WheelCache::Index(&wheel.index).remote_wheel_dir(wheel_filename.name.as_ref()),
wheel_filename.stem(),
);
fs::create_dir_all(&cache_entry.dir()).await?;
let target = cache_entry.into_path_buf();
unzip_no_seek(reader.compat(), &target).await?;
tokio::fs::rename(temp_target, &target).await?;
Ok(LocalWheel::Unzipped(UnzippedWheel {
dist: dist.clone(),
@ -148,11 +154,9 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>
}
Dist::Built(BuiltDist::DirectUrl(wheel)) => {
debug!("Fetching disk-based wheel from URL: {}", wheel.url);
let reader = self.client.stream_external(&wheel.url).await?;
// Download and unzip the wheel to a temporary dir.
// Download and unzip the wheel to a temporary directory.
let temp_dir = tempfile::tempdir_in(self.cache.root())?;
let temp_target = temp_dir.path().join(wheel.filename.to_string());
unzip_no_seek(reader.compat(), &temp_target).await?;

View File

@ -7,7 +7,9 @@ use tokio::task::JoinError;
use tracing::{instrument, warn};
use url::Url;
use distribution_types::{CachedDist, Dist, LocalEditable, RemoteSource, SourceDist};
use distribution_types::{
CachedDist, Dist, DistributionId, Identifier, LocalEditable, RemoteSource, SourceDist,
};
use platform_tags::Tags;
use puffin_cache::Cache;
use puffin_client::RegistryClient;
@ -64,7 +66,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> {
pub fn download_stream<'stream>(
&'stream self,
distributions: Vec<Dist>,
in_flight: &'stream OnceMap<PathBuf, Result<CachedDist, String>>,
in_flight: &'stream OnceMap<DistributionId, Result<CachedDist, String>>,
) -> impl Stream<Item = Result<CachedDist, Error>> + 'stream {
futures::stream::iter(distributions)
.map(|dist| async {
@ -84,7 +86,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> {
pub async fn download(
&self,
mut distributions: Vec<Dist>,
in_flight: &OnceMap<PathBuf, Result<CachedDist, String>>,
in_flight: &OnceMap<DistributionId, Result<CachedDist, String>>,
) -> Result<Vec<CachedDist>, Error> {
// Sort the distributions by size.
distributions
@ -151,35 +153,33 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> {
}
/// Download, build, and unzip a single wheel.
#[instrument(skip_all, fields(name = %dist, size = ?dist.size(), url = dist.file().unwrap().url))]
#[instrument(skip_all, fields(name = % dist, size = ? dist.size(), url = dist.file().unwrap().url))]
pub async fn get_wheel(
&self,
dist: Dist,
in_flight: &OnceMap<PathBuf, Result<CachedDist, String>>,
in_flight: &OnceMap<DistributionId, Result<CachedDist, String>>,
) -> Result<CachedDist, Error> {
// TODO(charlie): Add in-flight tracking around `get_or_build_wheel`.
let download: LocalWheel = self
.database
.get_or_build_wheel(dist.clone())
.map_err(|err| Error::Fetch(dist.clone(), err))
.await?;
let target = download.target().to_path_buf();
let wheel = if in_flight.register(&target) {
let id = dist.distribution_id();
let wheel = if in_flight.register(&id) {
let download: LocalWheel = self
.database
.get_or_build_wheel(dist.clone())
.map_err(|err| Error::Fetch(dist.clone(), err))
.await?;
let result = Self::unzip_wheel(download).await;
match result {
Ok(cached) => {
in_flight.done(target, Ok(cached.clone()));
in_flight.done(id, Ok(cached.clone()));
cached
}
Err(err) => {
in_flight.done(target, Err(err.to_string()));
in_flight.done(id, Err(err.to_string()));
return Err(err);
}
}
} else {
in_flight
.wait(&target)
.wait(&id)
.await
.value()
.clone()
@ -194,22 +194,21 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> {
let remote = download.remote().clone();
let filename = download.filename().clone();
// If the wheel is already unpacked, we should avoid attempting to unzip it at all.
if download.target().is_dir() {
warn!("Wheel is already unpacked: {remote}");
return Ok(CachedDist::from_remote(
remote,
filename,
download.target().to_path_buf(),
));
}
// Unzip the wheel.
let normalized_path = if matches!(download, LocalWheel::Unzipped(_)) {
// Just an optimizaion: Avoid spawning a blocking
// task if there is no work to be done.
// Just an optimization: Avoid spawning a blocking task if there is no work to be done.
download.target().to_path_buf()
} else {
// If the wheel is already unpacked, we should avoid attempting to unzip it at all.
if download.target().is_dir() {
warn!("Wheel is already unpacked: {remote}");
return Ok(CachedDist::from_remote(
remote,
filename,
download.target().to_path_buf(),
));
}
tokio::task::spawn_blocking({
move || -> Result<PathBuf, puffin_extract::Error> {
// Unzip the wheel into a temporary directory.