Charlie Marsh 2025-11-08 22:14:08 -05:00
parent 5339c83998
commit ba8284379c
10 changed files with 508 additions and 124 deletions

Cargo.lock generated
View File

@@ -5871,6 +5871,7 @@ dependencies = [
"uv-platform-tags",
"uv-pypi-types",
"uv-redacted",
"uv-static",
"uv-types",
"uv-workspace",
"walkdir",

View File

@@ -15,7 +15,7 @@ pub enum Error {
#[error("Could not extract path segments from URL: {0}")]
MissingPathSegments(String),
#[error("Distribution not fxound at: {0}")]
#[error("Distribution not found at: {0}")]
NotFound(DisplaySafeUrl),
#[error("Requested package name `{0}` does not match `{1}` in the distribution filename: {2}")]

View File

@@ -409,11 +409,10 @@ impl Dist {
// Normalize the path.
let install_path = normalize_absolute_path(&install_path)?;
// // Validate that the path exists.
// if !install_path.exists() {
// println!("foo");
// return Err(Error::NotFound(url.to_url()));
// }
// Validate that the path exists.
if !install_path.exists() {
return Err(Error::NotFound(url.to_url()));
}
// Determine whether the path represents a built or source distribution.
match ext {
@@ -470,7 +469,6 @@
// Validate that the path exists.
if !install_path.exists() {
println!("bar");
return Err(Error::NotFound(url.to_url()));
}
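A self-contained sketch of the eager existence check this hunk restores (stand-in error type; the real code returns Error::NotFound with a DisplaySafeUrl):

use std::path::Path;

fn validate_install_path(install_path: &Path) -> Result<(), String> {
    // Fail fast before classifying the distribution, instead of surfacing
    // an I/O error later in the pipeline.
    if !install_path.exists() {
        return Err(format!("Distribution not found at: {}", install_path.display()));
    }
    Ok(())
}

fn main() {
    assert!(validate_install_path(Path::new(".")).is_ok());
    assert!(validate_install_path(Path::new("/nonexistent/pkg.whl")).is_err());
}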

View File

@@ -34,6 +34,7 @@ uv-pep508 = { workspace = true }
uv-platform-tags = { workspace = true }
uv-pypi-types = { workspace = true }
uv-redacted = { workspace = true }
uv-static = { workspace = true }
uv-types = { workspace = true }
uv-workspace = { workspace = true }

View File

@@ -10,18 +10,21 @@ use tempfile::TempDir;
use tokio::io::{AsyncRead, AsyncSeekExt, ReadBuf};
use tokio::sync::Semaphore;
use tokio_util::compat::FuturesAsyncReadCompatExt;
use tracing::{Instrument, info_span, instrument, warn, debug};
use tracing::{Instrument, info_span, instrument, warn};
use url::Url;
use uv_auth::PyxTokenStore;
use uv_cache::{ArchiveId, CacheBucket, CacheEntry, WheelCache};
use uv_cache_info::{CacheInfo, Timestamp};
use uv_client::{CacheControl, CachedClientError, Connectivity, DataWithCachePolicy, MetadataFormat, RegistryClient, VersionFiles};
use uv_distribution_filename::{DistFilename, WheelFilename};
use uv_distribution_types::{BuildInfo, BuildableSource, BuiltDist, Dist, File, HashPolicy, Hashed, IndexFormat, IndexMetadata, IndexUrl, InstalledDist, Name, RegistryBuiltDist, RegistryBuiltWheel, SourceDist, ToUrlError};
use uv_client::{
CacheControl, CachedClientError, Connectivity, DataWithCachePolicy, RegistryClient,
};
use uv_distribution_filename::WheelFilename;
use uv_distribution_types::{
BuildInfo, BuildableSource, BuiltDist, CompatibleDist, Dist, File, HashPolicy, Hashed,
IndexUrl, InstalledDist, Name, RegistryBuiltDist, SourceDist, ToUrlError,
};
use uv_extract::hash::Hasher;
use uv_fs::write_atomic;
use uv_git_types::GitHubRepository;
use uv_pep508::VerbatimUrl;
use uv_platform_tags::Tags;
use uv_pypi_types::{HashDigest, HashDigests, PyProjectToml};
use uv_redacted::DisplaySafeUrl;
@@ -29,6 +32,7 @@ use uv_types::{BuildContext, BuildStack};
use crate::archive::Archive;
use crate::metadata::{ArchiveMetadata, Metadata};
use crate::remote::RemoteCacheResolver;
use crate::source::SourceDistributionBuilder;
use crate::{Error, LocalWheel, Reporter, RequiresDist};
@@ -47,6 +51,7 @@ use crate::{Error, LocalWheel, Reporter, RequiresDist};
pub struct DistributionDatabase<'a, Context: BuildContext> {
build_context: &'a Context,
builder: SourceDistributionBuilder<'a, Context>,
resolver: RemoteCacheResolver<'a, Context>,
client: ManagedClient<'a>,
reporter: Option<Arc<dyn Reporter>>,
}
@@ -60,6 +65,7 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
Self {
build_context,
builder: SourceDistributionBuilder::new(build_context),
resolver: RemoteCacheResolver::new(build_context),
client: ManagedClient::new(client, concurrent_downloads),
reporter: None,
}
@@ -376,59 +382,23 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
tags: &Tags,
hashes: HashPolicy<'_>,
) -> Result<LocalWheel, Error> {
// If this is a Git distribution, look for cached wheels.
if let SourceDist::Git(dist) = dist {
if dist.subdirectory.is_none() {
if let Some(repo) = GitHubRepository::parse(dist.git.repository()) {
if let Ok(store) = PyxTokenStore::from_settings() {
// let url = store.api().join(&format!("v1/git/astral-sh/{}/{}", repo.owner, repo.repo)).unwrap();
let url = VerbatimUrl::parse_url(&format!("http://localhost:8000/v1/git/astral-sh/{}/{}", repo.owner, repo.repo)).unwrap();
let index = IndexMetadata {
// url: IndexUrl::from(VerbatimUrl::from(url)),
url: IndexUrl::from(url.clone()),
format: IndexFormat::Simple,
};
let archives = self.client
.manual(|client, semaphore| {
client.package_metadata(
dist.name(), Some(index.as_ref()), self.build_context.capabilities(), semaphore,
)
})
.await?;
// TODO(charlie): This needs to prefer wheels to sdists (but allow sdists),
// etc., filter by tags, filter by `requires-python`, etc.
for (_, archive) in archives {
let MetadataFormat::Simple(archive) = archive else {
continue;
};
for datum in archive.iter().rev() {
let files = rkyv::deserialize::<VersionFiles, rkyv::rancor::Error>(&datum.files)
.expect("archived version files always deserializes");
for (filename, file) in files.all() {
if let DistFilename::WheelFilename(filename) = filename {
debug!("Found cached wheel {filename} for Git distribution: {dist}");
let dist = BuiltDist::Registry(RegistryBuiltDist {
wheels: vec![
RegistryBuiltWheel {
filename,
file: Box::new(file),
index: IndexUrl::from(VerbatimUrl::from(url)),
}
],
best_wheel_index: 0,
sdist: None,
});
return self.get_wheel(&dist, hashes).await;
}
}
}
}
}
}
}
// If the wheel is available in a remote cache, fetch it.
if let Some(wheel) = self.get_remote_wheel(dist, tags, hashes).await? {
return Ok(wheel);
}
// Otherwise, build the wheel locally.
self.build_wheel_inner(dist, tags, hashes).await
}
/// Convert a source distribution into a wheel, fetching it from the cache or building it if
/// necessary.
async fn build_wheel_inner(
&self,
dist: &SourceDist,
tags: &Tags,
hashes: HashPolicy<'_>,
) -> Result<LocalWheel, Error> {
let built_wheel = self
.builder
.download_and_build(&BuildableSource::Dist(dist), tags, hashes, &self.client)
@@ -588,65 +558,21 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
}
}
// If this is a Git distribution, look for cached wheels.
// TODO(charlie): What if this is unnamed? How can we infer the package name? Maybe we make
// the whole thing content-addressed, and assume that the registry contains at most one
// package?
if let BuildableSource::Dist(SourceDist::Git(dist)) = source {
// TODO(charlie): Make this more efficient.
self.builder.resolve_revision(source, &self.client).await?;
if dist.subdirectory.is_none() {
if let Some(repo) = GitHubRepository::parse(dist.git.repository()) {
if let Ok(store) = PyxTokenStore::from_settings() {
// let url = store.api().join(&format!("v1/git/astral-sh/{}/{}", repo.owner, repo.repo)).unwrap();
let url = VerbatimUrl::parse_url(&format!("http://localhost:8000/v1/git/astral-sh/{}/{}", repo.owner, repo.repo)).unwrap();
let index = IndexMetadata {
// url: IndexUrl::from(VerbatimUrl::from(url)),
url: IndexUrl::from(url.clone()),
format: IndexFormat::Simple,
};
let archives = self.client
.manual(|client, semaphore| {
client.package_metadata(
dist.name(), Some(index.as_ref()), self.build_context.capabilities(),semaphore
)
})
.await?;
// TODO(charlie): This needs to prefer wheels to sdists (but allow sdists),
// etc.
for (_, archive) in archives {
let MetadataFormat::Simple(archive) = archive else {
continue;
};
for datum in archive.iter().rev() {
let files = rkyv::deserialize::<VersionFiles, rkyv::rancor::Error>(&datum.files)
.expect("archived version files always deserializes");
for (filename, file) in files.all() {
if let DistFilename::WheelFilename(filename) = filename {
debug!("Found cached wheel {filename} for Git distribution: {dist}");
let dist = BuiltDist::Registry(RegistryBuiltDist {
wheels: vec![
RegistryBuiltWheel {
filename,
file: Box::new(file),
index: IndexUrl::from(VerbatimUrl::from(url)),
}
],
best_wheel_index: 0,
sdist: None,
});
return self.get_wheel_metadata(&dist, hashes).await;
}
}
}
}
}
}
}
// If the metadata is available in a remote cache, fetch it.
if let Some(metadata) = self.get_remote_metadata(source, hashes).await? {
return Ok(metadata);
}
// Otherwise, retrieve the metadata from the source distribution.
self.build_wheel_metadata_inner(source, hashes).await
}
/// Build the wheel metadata for a source distribution, or fetch it from the cache if possible.
async fn build_wheel_metadata_inner(
&self,
source: &BuildableSource<'_>,
hashes: HashPolicy<'_>,
) -> Result<ArchiveMetadata, Error> {
let metadata = self
.builder
.download_and_build_metadata(source, hashes, &self.client)
@@ -656,6 +582,96 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
Ok(metadata)
}
/// Fetch a wheel from a remote cache, if available.
async fn get_remote_wheel(
&self,
dist: &SourceDist,
tags: &Tags,
hashes: HashPolicy<'_>,
) -> Result<Option<LocalWheel>, Error> {
let Some(index) = self
.resolver
.get_cached_distribution(dist, Some(tags), &self.client)
.await?
else {
return Ok(None);
};
let Some(entries) = index.get(dist.name()) else {
return Ok(None);
};
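// Walk candidate versions in ascending `BTreeMap` order, returning the
// first version that has a compatible distribution.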
for (.., prioritized_dist) in entries.iter() {
let Some(compatible_dist) = prioritized_dist.get() else {
continue;
};
match compatible_dist {
CompatibleDist::InstalledDist(..) => {}
CompatibleDist::SourceDist { sdist, .. } => {
let dist = SourceDist::Registry(sdist.clone());
return self.build_wheel_inner(&dist, tags, hashes).await.map(Some);
}
CompatibleDist::CompatibleWheel { wheel, .. }
| CompatibleDist::IncompatibleWheel { wheel, .. } => {
let dist = BuiltDist::Registry(RegistryBuiltDist {
wheels: vec![wheel.clone()],
best_wheel_index: 0,
sdist: None,
});
return self.get_wheel(&dist, hashes).await.map(Some);
}
}
}
Ok(None)
}
/// Fetch the wheel metadata from a remote cache, if available.
async fn get_remote_metadata(
&self,
source: &BuildableSource<'_>,
hashes: HashPolicy<'_>,
) -> Result<Option<ArchiveMetadata>, Error> {
// TODO(charlie): If the distribution is unnamed, we should be able to infer the name
// from the list of available distributions in the index, since we expect exactly one
// package name per cache entry.
let BuildableSource::Dist(dist) = source else {
return Ok(None);
};
let Some(index) = self
.resolver
.get_cached_distribution(dist, None, &self.client)
.await?
else {
return Ok(None);
};
let Some(entries) = index.get(dist.name()) else {
return Ok(None);
};
for (.., prioritized_dist) in entries.iter() {
let Some(compatible_dist) = prioritized_dist.get() else {
continue;
};
match compatible_dist {
CompatibleDist::InstalledDist(..) => {}
CompatibleDist::SourceDist { sdist, .. } => {
let dist = SourceDist::Registry(sdist.clone());
return self
.build_wheel_metadata_inner(&BuildableSource::Dist(&dist), hashes)
.await
.map(Some);
}
CompatibleDist::CompatibleWheel { wheel, .. }
| CompatibleDist::IncompatibleWheel { wheel, .. } => {
let dist = BuiltDist::Registry(RegistryBuiltDist {
wheels: vec![wheel.clone()],
best_wheel_index: 0,
sdist: None,
});
return self.get_wheel_metadata(&dist, hashes).await.map(Some);
}
}
}
Ok(None)
}
/// Return the [`RequiresDist`] from a `pyproject.toml`, if it can be statically extracted.
pub async fn requires_dist(
&self,

View File

@@ -100,7 +100,7 @@ pub enum Error {
UnsupportedScheme(String),
#[error(transparent)]
MetadataLowering(#[from] MetadataError),
#[error("Distribution not founyd at: {0}")]
#[error("Distribution not found at: {0}")]
NotFound(DisplaySafeUrl),
#[error("Attempted to re-extract the source distribution for `{}`, but the {} hash didn't match. Run `{}` to clear the cache.", _0, _1, "uv cache clean".green())]
CacheHeal(String, HashAlgorithm),

View File

@@ -16,5 +16,6 @@ mod download;
mod error;
mod index;
mod metadata;
mod remote;
mod reporter;
mod source;

View File

@@ -0,0 +1,364 @@
use std::collections::BTreeMap;
use std::collections::btree_map::Entry;
use std::sync::Arc;
use rustc_hash::FxHashMap;
use tokio::sync::Mutex;
use tracing::instrument;
use uv_auth::PyxTokenStore;
use uv_client::{MetadataFormat, VersionFiles};
use uv_configuration::BuildOptions;
use uv_distribution_filename::{DistFilename, SourceDistFilename, WheelFilename};
use uv_distribution_types::{
File, HashComparison, HashPolicy, IncompatibleSource, IncompatibleWheel, IndexFormat,
IndexMetadata, IndexUrl, Name, PrioritizedDist, RegistryBuiltWheel, RegistrySourceDist,
SourceDist, SourceDistCompatibility, WheelCompatibility,
};
use uv_git_types::GitHubRepository;
use uv_normalize::PackageName;
use uv_pep440::Version;
use uv_pep508::VerbatimUrl;
use uv_platform_tags::{TagCompatibility, Tags};
use uv_pypi_types::HashDigest;
use uv_static::EnvVars;
use uv_types::{BuildContext, HashStrategy};
use crate::Error;
use crate::distribution_database::ManagedClient;
/// A resolver for remote Git-based indexes.
pub(crate) struct RemoteCacheResolver<'a, Context: BuildContext> {
build_context: &'a Context,
cache: Arc<Mutex<GitIndexCache>>,
store: Option<PyxTokenStore>,
workspace: Option<String>,
}
impl<'a, T: BuildContext> RemoteCacheResolver<'a, T> {
/// Initialize a [`RemoteCacheResolver`] from a [`BuildContext`].
pub(crate) fn new(build_context: &'a T) -> Self {
Self {
build_context,
cache: Arc::default(),
store: PyxTokenStore::from_settings().ok(),
workspace: std::env::var(EnvVars::PYX_GIT_WORKSPACE).ok(),
}
}
/// Return the cached Git index for the given distribution, if any.
pub(crate) async fn get_cached_distribution(
&self,
dist: &SourceDist,
tags: Option<&Tags>,
client: &ManagedClient<'a>,
) -> Result<Option<GitIndex>, Error> {
// Fetch the entries for the given distribution.
let entries = self.get_or_fetch_index(dist, client).await?;
if entries.is_empty() {
return Ok(None);
}
// Create the index.
let index = GitIndex::from_entries(
entries,
tags,
&HashStrategy::default(),
self.build_context.build_options(),
);
Ok(Some(index))
}
/// Fetch the remote Git index for the given distribution.
async fn get_or_fetch_index(
&self,
dist: &SourceDist,
client: &ManagedClient<'a>,
) -> Result<Vec<GitIndexEntry>, Error> {
let Some(workspace) = &self.workspace else {
return Ok(Vec::default());
};
let SourceDist::Git(dist) = dist else {
return Ok(Vec::default());
};
// TODO(charlie): Handle subdirectories.
if dist.subdirectory.is_some() {
return Ok(Vec::default());
}
let Some(repo) = GitHubRepository::parse(dist.git.repository()) else {
return Ok(Vec::default());
};
// Store the index entries in a cache, to avoid redundant fetches.
let index = IndexUrl::from(
VerbatimUrl::parse_url(format!(
"http://localhost:8000/v1/git/{workspace}/{}/{}",
repo.owner, repo.repo
))
.unwrap(),
);
{
let cache = self.cache.lock().await;
if let Some(entries) = cache.get(&index) {
return Ok(entries.to_vec());
}
}
// Perform a remote fetch via the Simple API.
let metadata = IndexMetadata {
url: index.clone(),
format: IndexFormat::Simple,
};
let archives = client
.manual(|client, semaphore| {
client.package_metadata(
dist.name(),
Some(metadata.as_ref()),
self.build_context.capabilities(),
semaphore,
)
})
.await?;
// Collect the files from the remote index.
let mut entries = Vec::new();
for (_, archive) in archives {
let MetadataFormat::Simple(archive) = archive else {
continue;
};
for datum in archive.iter().rev() {
let files = rkyv::deserialize::<VersionFiles, rkyv::rancor::Error>(&datum.files)
.expect("archived version files always deserializes");
for (filename, file) in files.all() {
entries.push(GitIndexEntry {
filename,
file,
index: index.clone(),
});
}
}
}
// Write to the cache.
{
let mut cache = self.cache.lock().await;
cache.insert(index.clone(), entries.clone());
}
Ok(entries)
}
}
/// An entry in a remote Git index.
#[derive(Debug, Clone)]
struct GitIndexEntry {
filename: DistFilename,
file: File,
index: IndexUrl,
}
/// A set of [`PrioritizedDist`] from a Git index.
#[derive(Debug, Clone, Default)]
pub(crate) struct GitIndex(FxHashMap<PackageName, GitIndexDistributions>);
impl GitIndex {
/// Collect all files from a Git index.
#[instrument(skip_all)]
fn from_entries(
entries: Vec<GitIndexEntry>,
tags: Option<&Tags>,
hasher: &HashStrategy,
build_options: &BuildOptions,
) -> Self {
// Collect compatible distributions.
let mut index = FxHashMap::<PackageName, GitIndexDistributions>::default();
for entry in entries {
let distributions = index.entry(entry.filename.name().clone()).or_default();
distributions.add_file(
entry.file,
entry.filename,
tags,
hasher,
build_options,
entry.index,
);
}
Self(index)
}
/// Get the [`GitIndexDistributions`] for the given package name.
pub(crate) fn get(&self, package_name: &PackageName) -> Option<&GitIndexDistributions> {
self.0.get(package_name)
}
}
/// A set of [`PrioritizedDist`] from a Git index, indexed by [`Version`].
#[derive(Debug, Clone, Default)]
pub(crate) struct GitIndexDistributions(BTreeMap<Version, PrioritizedDist>);
impl GitIndexDistributions {
/// Returns an [`Iterator`] over the distributions.
pub(crate) fn iter(&self) -> impl Iterator<Item = (&Version, &PrioritizedDist)> {
self.0.iter()
}
/// Add the given [`File`] to the [`GitIndexDistributions`] for the given package.
fn add_file(
&mut self,
file: File,
filename: DistFilename,
tags: Option<&Tags>,
hasher: &HashStrategy,
build_options: &BuildOptions,
index: IndexUrl,
) {
// No `requires-python` here: for source distributions, we don't have that information;
// for wheels, we read it lazily only when selected.
match filename {
DistFilename::WheelFilename(filename) => {
let version = filename.version.clone();
let compatibility = Self::wheel_compatibility(
&filename,
file.hashes.as_slice(),
tags,
hasher,
build_options,
);
let dist = RegistryBuiltWheel {
filename,
file: Box::new(file),
index,
};
match self.0.entry(version) {
Entry::Occupied(mut entry) => {
entry.get_mut().insert_built(dist, vec![], compatibility);
}
Entry::Vacant(entry) => {
entry.insert(PrioritizedDist::from_built(dist, vec![], compatibility));
}
}
}
DistFilename::SourceDistFilename(filename) => {
let compatibility = Self::source_dist_compatibility(
&filename,
file.hashes.as_slice(),
hasher,
build_options,
);
let dist = RegistrySourceDist {
name: filename.name.clone(),
version: filename.version.clone(),
ext: filename.extension,
file: Box::new(file),
index,
wheels: vec![],
};
match self.0.entry(filename.version) {
Entry::Occupied(mut entry) => {
entry.get_mut().insert_source(dist, vec![], compatibility);
}
Entry::Vacant(entry) => {
entry.insert(PrioritizedDist::from_source(dist, vec![], compatibility));
}
}
}
}
}
fn source_dist_compatibility(
filename: &SourceDistFilename,
hashes: &[HashDigest],
hasher: &HashStrategy,
build_options: &BuildOptions,
) -> SourceDistCompatibility {
// Check if source distributions are allowed for this package.
if build_options.no_build_package(&filename.name) {
return SourceDistCompatibility::Incompatible(IncompatibleSource::NoBuild);
}
// Check if hashes line up.
let hash = if let HashPolicy::Validate(required) =
hasher.get_package(&filename.name, &filename.version)
{
if hashes.is_empty() {
HashComparison::Missing
} else if required.iter().any(|hash| hashes.contains(hash)) {
HashComparison::Matched
} else {
HashComparison::Mismatched
}
} else {
HashComparison::Matched
};
SourceDistCompatibility::Compatible(hash)
}
fn wheel_compatibility(
filename: &WheelFilename,
hashes: &[HashDigest],
tags: Option<&Tags>,
hasher: &HashStrategy,
build_options: &BuildOptions,
) -> WheelCompatibility {
// Check if binaries are allowed for this package.
if build_options.no_binary_package(&filename.name) {
return WheelCompatibility::Incompatible(IncompatibleWheel::NoBinary);
}
// Determine a compatibility for the wheel based on tags.
let priority = match tags {
Some(tags) => match filename.compatibility(tags) {
TagCompatibility::Incompatible(tag) => {
return WheelCompatibility::Incompatible(IncompatibleWheel::Tag(tag));
}
TagCompatibility::Compatible(priority) => Some(priority),
},
None => None,
};
// Check if hashes line up.
let hash = if let HashPolicy::Validate(required) =
hasher.get_package(&filename.name, &filename.version)
{
if hashes.is_empty() {
HashComparison::Missing
} else if required.iter().any(|hash| hashes.contains(hash)) {
HashComparison::Matched
} else {
HashComparison::Mismatched
}
} else {
HashComparison::Matched
};
// Break ties with the build tag.
let build_tag = filename.build_tag().cloned();
WheelCompatibility::Compatible(hash, priority, build_tag)
}
}
/// A map from [`IndexUrl`] to [`GitIndex`] entries found at the given URL.
#[derive(Default, Debug, Clone)]
struct GitIndexCache(FxHashMap<IndexUrl, Vec<GitIndexEntry>>);
impl GitIndexCache {
/// Get the entries for a given index URL.
fn get(&self, index: &IndexUrl) -> Option<&[GitIndexEntry]> {
self.0.get(index).map(Vec::as_slice)
}
/// Insert the entries for a given index URL.
fn insert(
&mut self,
index: IndexUrl,
entries: Vec<GitIndexEntry>,
) -> Option<Vec<GitIndexEntry>> {
self.0.insert(index, entries)
}
}
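Taken together, these types form a fetch-once, filter-locally pipeline. A simplified, self-contained model of that flow (stand-in types; the real code uses uv's IndexUrl, File, Version, and PrioritizedDist):

use std::collections::{BTreeMap, HashMap};

#[derive(Clone, Debug)]
struct Entry {
    version: (u64, u64, u64),
    is_wheel: bool,
}

// Mirrors `GitIndexCache`: one fetched entry list per index URL.
#[derive(Default)]
struct IndexCache(HashMap<String, Vec<Entry>>);

impl IndexCache {
    fn get(&self, url: &str) -> Option<&[Entry]> {
        self.0.get(url).map(Vec::as_slice)
    }
    fn insert(&mut self, url: String, entries: Vec<Entry>) {
        self.0.insert(url, entries);
    }
}

fn main() {
    let mut cache = IndexCache::default();
    let url = "https://example.invalid/v1/git/acme/repo".to_string();

    // First lookup misses; "fetch" the entries and store them, as in
    // `get_or_fetch_index`.
    if cache.get(&url).is_none() {
        cache.insert(
            url.clone(),
            vec![
                Entry { version: (1, 0, 0), is_wheel: false },
                Entry { version: (1, 0, 0), is_wheel: true },
            ],
        );
    }

    // Bucket entries per version, as `GitIndexDistributions` does with
    // `PrioritizedDist`.
    let mut by_version: BTreeMap<(u64, u64, u64), Vec<Entry>> = BTreeMap::new();
    for entry in cache.get(&url).unwrap() {
        by_version.entry(entry.version).or_default().push(entry.clone());
    }

    // Ascending version walk; prefer a wheel over a source distribution
    // within each version, echoing `get_remote_wheel`.
    for (version, dists) in &by_version {
        let pick = dists.iter().find(|d| d.is_wheel).or_else(|| dists.first());
        println!("{version:?} -> wheel: {}", pick.map_or(false, |d| d.is_wheel));
    }
}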

View File

@@ -213,7 +213,6 @@ impl GitResolver {
/// [`resolve_precise`], and will return `None` for URLs that have not been resolved _or_
/// already have a precise reference.
pub fn precise(&self, url: GitUrl) -> Option<GitUrl> {
println!("Resolving precise Git URL for: {url}");
let reference = RepositoryReference::from(&url);
let precise = self.get(&reference)?;
Some(url.with_precise(*precise))
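A stand-in sketch of the lookup semantics in the doc comment above (hypothetical helper; the real code keys off RepositoryReference and returns a GitUrl):

use std::collections::HashMap;

// Only references that went through an earlier `resolve_precise` step map
// to an exact commit; everything else stays unresolved.
fn precise(resolved: &HashMap<String, String>, reference: &str) -> Option<String> {
    resolved.get(reference).cloned()
}

fn main() {
    let mut resolved = HashMap::new();
    resolved.insert("acme/repo@main".to_string(), "ba8284379c".to_string());
    assert_eq!(precise(&resolved, "acme/repo@main").as_deref(), Some("ba8284379c"));
    // Unresolved references yield `None`, leaving the URL untouched.
    assert_eq!(precise(&resolved, "acme/repo@dev"), None);
}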

View File

@@ -1139,6 +1139,10 @@ impl EnvVars {
#[attr_added_in("0.8.15")]
pub const PYX_API_KEY: &'static str = "PYX_API_KEY";
/// The pyx workspace in which to search for cached Git dependencies.
#[attr_added_in("0.9.8")]
pub const PYX_GIT_WORKSPACE: &'static str = "PYX_GIT_WORKSPACE";
/// The pyx API key, for backwards compatibility.
#[attr_hidden]
#[attr_added_in("0.8.15")]