Content-address distributions in the archive

Charlie Marsh 2025-11-21 22:02:16 -05:00
parent 7b8240dca9
commit 6dbfe80ed7
13 changed files with 153 additions and 137 deletions

Cargo.lock (generated)

@@ -5698,6 +5698,7 @@ dependencies = [
  "uv-normalize",
  "uv-pypi-types",
  "uv-redacted",
+ "uv-small-str",
  "uv-static",
  "walkdir",
 ]


@@ -24,6 +24,7 @@ uv-fs = { workspace = true, features = ["tokio"] }
 uv-normalize = { workspace = true }
 uv-pypi-types = { workspace = true }
 uv-redacted = { workspace = true }
+uv-small-str = { workspace = true }
 uv-static = { workspace = true }
 clap = { workspace = true, features = ["derive", "env"], optional = true }


@@ -5,17 +5,17 @@ use std::str::FromStr;
 #[derive(Debug, Clone, Eq, PartialEq, Hash, serde::Serialize, serde::Deserialize)]
 pub struct ArchiveId(String);

-impl Default for ArchiveId {
-    fn default() -> Self {
-        Self::new()
-    }
-}
-
-impl ArchiveId {
-    /// Generate a new unique identifier for an archive.
-    pub fn new() -> Self {
+impl ArchiveId {
+    /// Create a content-addressed identifier for an archive from a SHA256 digest.
+    pub fn from_sha256(digest: &str) -> Self {
+        Self(digest.to_string())
+    }
+
+    /// Create a random content-addressed identifier for an archive.
+    pub fn nanoid() -> Self {
         Self(nanoid::nanoid!())
     }
 }

 impl AsRef<Path> for ArchiveId {
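The split gives the ID type two explicit minting paths: deterministic (from a wheel's SHA256) and random (the old nanoid behavior, kept for callers with no stable content hash; see the `CachedEnvironment` hunk below). A minimal sketch of the resulting semantics, assuming only the `ArchiveId(String)` newtype from this hunk and the `nanoid` crate the implementation already uses:

```rust
// Sketch: assumes the ArchiveId(String) newtype above and the `nanoid` crate.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ArchiveId(String);

impl ArchiveId {
    /// Content-addressed: identical wheel bytes always yield the same ID.
    pub fn from_sha256(digest: &str) -> Self {
        Self(digest.to_string())
    }

    /// Random: retained for artifacts with no stable content hash.
    pub fn nanoid() -> Self {
        Self(nanoid::nanoid!())
    }
}

fn main() {
    let digest = "a".repeat(64);
    // Deterministic: re-deriving from the same digest gives the same ID,
    // so two extractions of one wheel land in one archive directory.
    assert_eq!(ArchiveId::from_sha256(&digest), ArchiveId::from_sha256(&digest));
    // Random IDs differ on every call.
    assert_ne!(ArchiveId::nanoid(), ArchiveId::nanoid());
}
```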


@@ -96,7 +96,7 @@ fn migrate_windows_cache(source: &Path, destination: &Path) -> Result<(), io::Error> {
         "interpreter-v2",
         "simple-v12",
         "wheels-v1",
-        "archive-v0",
+        "archive-v1",
         "builds-v0",
         "environments-v1",
     ] {


@@ -33,7 +33,7 @@ mod wheel;
 /// The version of the archive bucket.
 ///
 /// Must be kept in-sync with the version in [`CacheBucket::to_str`].
-pub const ARCHIVE_VERSION: u8 = 0;
+pub const ARCHIVE_VERSION: u8 = 1;

 /// A [`CacheEntry`] which may or may not exist yet.
 #[derive(Debug, Clone)]
@@ -346,19 +346,32 @@ impl Cache {
     }

     /// Persist a temporary directory to the artifact store, returning its unique ID.
+    ///
+    /// The archive is content-addressed using the provided ID. If an archive with this ID
+    /// already exists, the temporary directory is discarded and the existing archive is used.
     pub async fn persist(
         &self,
         temp_dir: impl AsRef<Path>,
         path: impl AsRef<Path>,
+        id: ArchiveId,
     ) -> io::Result<ArchiveId> {
-        // Create a unique ID for the artifact.
-        // TODO(charlie): Support content-addressed persistence via SHAs.
-        let id = ArchiveId::new();
-
         // Move the temporary directory into the directory store.
         let archive_entry = self.entry(CacheBucket::Archive, "", &id);
         fs_err::create_dir_all(archive_entry.dir())?;
-        uv_fs::rename_with_retry(temp_dir.as_ref(), archive_entry.path()).await?;
+        match uv_fs::rename_with_retry(temp_dir.as_ref(), archive_entry.path()).await {
+            Ok(()) => {}
+            Err(err)
+                if err.kind() == io::ErrorKind::AlreadyExists
+                    || err.kind() == io::ErrorKind::DirectoryNotEmpty =>
+            {
+                debug!(
+                    "Archive already exists at {}; skipping extraction",
+                    archive_entry.path().display()
+                );
+                fs_err::tokio::remove_dir_all(temp_dir.as_ref()).await?;
+            }
+            Err(err) => return Err(err),
+        }

         // Create a symlink to the directory store.
         fs_err::create_dir_all(path.as_ref().parent().expect("Cache entry to have parent"))?;
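The new `match` is what makes content-addressed persistence safe under concurrency: two processes extracting the same wheel race to rename their staging directories onto the same digest-named path, and the loser simply discards its copy, which is byte-identical by construction. The same first-writer-wins pattern in a blocking sketch, assuming `stage` and `dest` live on one filesystem:

```rust
use std::{fs, io, path::Path};

/// First-writer-wins move of a staged directory into its content-addressed
/// destination. A sketch: uv's real version is async and retries the rename.
fn persist(stage: &Path, dest: &Path) -> io::Result<()> {
    match fs::rename(stage, dest) {
        Ok(()) => Ok(()),
        // Another process already populated `dest`. Its name encodes the
        // content, so the existing archive is identical; drop our copy.
        Err(err)
            if err.kind() == io::ErrorKind::AlreadyExists
                || err.kind() == io::ErrorKind::DirectoryNotEmpty =>
        {
            fs::remove_dir_all(stage)
        }
        Err(err) => Err(err),
    }
}
```

The two error kinds cover how different platforms report the same collision (e.g., ENOTEMPTY on Linux, ERROR_ALREADY_EXISTS on Windows).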
@@ -1114,7 +1127,7 @@
             Self::Wheels => "wheels-v5",
             // Note that when bumping this, you'll also need to bump
             // `ARCHIVE_VERSION` in `crates/uv-cache/src/lib.rs`.
-            Self::Archive => "archive-v0",
+            Self::Archive => "archive-v1",
             Self::Builds => "builds-v0",
             Self::Environments => "environments-v2",
             Self::Python => "python-v0",
@@ -1363,7 +1376,7 @@
     #[test]
     fn test_link_round_trip() {
-        let id = ArchiveId::new();
+        let id = ArchiveId::from_sha256("a".repeat(64).as_str());
         let link = Link::new(id);
         let s = link.to_string();
         let parsed = Link::from_str(&s).unwrap();
@@ -1373,9 +1386,10 @@
     #[test]
     fn test_link_deserialize() {
-        assert!(Link::from_str("archive-v0/foo").is_ok());
+        assert!(Link::from_str("archive-v1/foo").is_ok());
         assert!(Link::from_str("archive/foo").is_err());
         assert!(Link::from_str("v1/foo").is_err());
-        assert!(Link::from_str("archive-v0/").is_err());
+        assert!(Link::from_str("archive-v1/").is_err());
+        assert!(Link::from_str("archive-v0/foo").is_ok());
     }
 }


@@ -35,12 +35,15 @@ impl HashPolicy<'_> {
     }

     /// Return the algorithms used in the hash policy.
+    ///
+    /// SHA256 is always included to support content-addressed archive IDs.
     pub fn algorithms(&self) -> Vec<HashAlgorithm> {
         match self {
-            Self::None => vec![],
+            Self::None => vec![HashAlgorithm::Sha256],
             Self::Generate(_) => vec![HashAlgorithm::Sha256],
             Self::Validate(hashes) => {
                 let mut algorithms = hashes.iter().map(HashDigest::algorithm).collect::<Vec<_>>();
+                algorithms.push(HashAlgorithm::Sha256);
                 algorithms.sort();
                 algorithms.dedup();
                 algorithms
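With this, every code path that asks for hash algorithms gets SHA256 whether or not the user requested verification, which is what upholds the `expect("SHA256 hash must be present")` calls elsewhere in the commit. The `Validate` arm's `push`/`sort`/`dedup` keeps the list canonical even when the user already pinned a SHA256 digest. A standalone sketch, with a stand-in for `uv_pypi_types::HashAlgorithm`:

```rust
// Stand-in for uv_pypi_types::HashAlgorithm; Ord provides a stable sort order.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
enum HashAlgorithm {
    Md5,
    Sha256,
    Sha512,
}

fn main() {
    // A validation policy whose requirements pinned SHA512 and SHA256.
    let mut algorithms = vec![HashAlgorithm::Sha512, HashAlgorithm::Sha256];
    // SHA256 is appended unconditionally so the artifact can be
    // content-addressed; sort + dedup collapses the duplicate entry.
    algorithms.push(HashAlgorithm::Sha256);
    algorithms.sort();
    algorithms.dedup();
    assert_eq!(algorithms, vec![HashAlgorithm::Sha256, HashAlgorithm::Sha512]);
}
```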


@@ -1,7 +1,7 @@
 use uv_cache::{ARCHIVE_VERSION, ArchiveId, Cache};
 use uv_distribution_filename::WheelFilename;
 use uv_distribution_types::Hashed;
-use uv_pypi_types::{HashDigest, HashDigests};
+use uv_pypi_types::{HashAlgorithm, HashDigest, HashDigests};

 /// An archive (unzipped wheel) that exists in the local cache.
 #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
@@ -17,8 +17,16 @@ pub struct Archive {
 }

 impl Archive {
-    /// Create a new [`Archive`] with the given ID and hashes.
-    pub(crate) fn new(id: ArchiveId, hashes: HashDigests, filename: WheelFilename) -> Self {
+    /// Create a new [`Archive`] with the given hashes.
+    ///
+    /// The archive ID is derived from the SHA256 hash in the hashes.
+    pub(crate) fn new(hashes: HashDigests, filename: WheelFilename) -> Self {
+        // Extract the SHA256 hash to use as the archive ID
+        let sha256 = hashes
+            .iter()
+            .find(|digest| digest.algorithm == HashAlgorithm::Sha256)
+            .expect("SHA256 hash must be present");
+        let id = ArchiveId::from_sha256(&sha256.digest);
         Self {
             id,
             hashes,


@@ -6,7 +6,6 @@ use std::sync::Arc;
 use std::task::{Context, Poll};

 use futures::{FutureExt, TryStreamExt};
 use tempfile::TempDir;
-use tokio::io::{AsyncRead, AsyncSeekExt, ReadBuf};
 use tokio::sync::Semaphore;
 use tokio_util::compat::FuturesAsyncReadCompatExt;
@@ -26,7 +25,7 @@ use uv_distribution_types::{
 use uv_extract::hash::Hasher;
 use uv_fs::write_atomic;
 use uv_platform_tags::Tags;
-use uv_pypi_types::{HashDigest, HashDigests, PyProjectToml};
+use uv_pypi_types::{HashAlgorithm, HashDigest, HashDigests, PyProjectToml};
 use uv_redacted::DisplaySafeUrl;
 use uv_types::{BuildContext, BuildStack};
@@ -638,11 +637,22 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
             hasher.finish().await.map_err(Error::HashExhaustion)?;
         }

+        let hash_digests: HashDigests = hashers.into_iter().map(HashDigest::from).collect();
+
+        // Extract the SHA256 hash for content-addressing
+        let sha256 = hash_digests
+            .iter()
+            .find(|digest| digest.algorithm == HashAlgorithm::Sha256)
+            .expect("SHA256 hash must be present");
+
         // Persist the temporary directory to the directory store.
-        let id = self
-            .build_context
+        self.build_context
             .cache()
-            .persist(temp_dir.keep(), wheel_entry.path())
+            .persist(
+                temp_dir.keep(),
+                wheel_entry.path(),
+                ArchiveId::from_sha256(&sha256.digest),
+            )
             .await
             .map_err(Error::CacheRead)?;
@@ -650,11 +660,7 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
             reporter.on_download_complete(dist.name(), progress);
         }

-        Ok(Archive::new(
-            id,
-            hashers.into_iter().map(HashDigest::from).collect(),
-            filename.clone(),
-        ))
+        Ok(Archive::new(hash_digests, filename.clone()))
     }
     .instrument(info_span!("wheel", wheel = %dist))
 };
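The shape at each call site is now: hash while downloading, pull out the SHA256, and persist under an ID derived from it. The net effect is that the on-disk archive path becomes a pure function of the wheel's bytes, sketched below with the `sha2` crate standing in for uv's `Hasher` machinery:

```rust
use sha2::{Digest, Sha256};
use std::path::{Path, PathBuf};

/// Where an unzipped wheel lives in the cache: derived from its bytes,
/// so repeated or racing downloads of one wheel agree on the directory.
fn archive_path(cache_root: &Path, wheel_bytes: &[u8]) -> PathBuf {
    let digest: String = Sha256::digest(wheel_bytes)
        .iter()
        .map(|byte| format!("{byte:02x}"))
        .collect();
    cache_root.join("archive-v1").join(digest)
}

fn main() {
    let root = Path::new("/tmp/uv-cache");
    // Same bytes, same archive directory; no duplicate extractions pile up.
    assert_eq!(
        archive_path(root, b"wheel contents"),
        archive_path(root, b"wheel contents"),
    );
}
```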
@@ -800,29 +806,6 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
             .await
             .map_err(Error::CacheWrite)?;

-        // If no hashes are required, parallelize the unzip operation.
-        let hashes = if hashes.is_none() {
-            let file = file.into_std().await;
-            tokio::task::spawn_blocking({
-                let target = temp_dir.path().to_owned();
-                move || -> Result<(), uv_extract::Error> {
-                    // Unzip the wheel into a temporary directory.
-                    match extension {
-                        WheelExtension::Whl => {
-                            uv_extract::unzip(file, &target)?;
-                        }
-                        WheelExtension::WhlZst => {
-                            uv_extract::stream::untar_zst_file(file, &target)?;
-                        }
-                    }
-                    Ok(())
-                }
-            })
-            .await?
-            .map_err(|err| Error::Extract(filename.to_string(), err))?;
-
-            HashDigests::empty()
-        } else {
         // Create a hasher for each hash algorithm.
         let algorithms = hashes.algorithms();
         let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
@@ -841,17 +824,25 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
             }
         }

-        // If necessary, exhaust the reader to compute the hash.
+        // Exhaust the reader to compute the hash.
         hasher.finish().await.map_err(Error::HashExhaustion)?;

-            hashers.into_iter().map(HashDigest::from).collect()
-        };
+        let hash_digests: HashDigests = hashers.into_iter().map(HashDigest::from).collect();
+
+        // Extract the SHA256 hash for content-addressing
+        let sha256 = hash_digests
+            .iter()
+            .find(|digest| digest.algorithm == HashAlgorithm::Sha256)
+            .expect("SHA256 hash must be present");

         // Persist the temporary directory to the directory store.
-        let id = self
-            .build_context
+        self.build_context
             .cache()
-            .persist(temp_dir.keep(), wheel_entry.path())
+            .persist(
+                temp_dir.keep(),
+                wheel_entry.path(),
+                ArchiveId::from_sha256(&sha256.digest),
+            )
             .await
             .map_err(Error::CacheRead)?;
@@ -859,7 +850,7 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
             reporter.on_download_complete(dist.name(), progress);
         }

-        Ok(Archive::new(id, hashes, filename.clone()))
+        Ok(Archive::new(hash_digests, filename.clone()))
     }
     .instrument(info_span!("wheel", wheel = %dist))
 };
@@ -965,33 +956,6 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
         // If the file is already unzipped, and the cache is up-to-date, return it.
         if let Some(archive) = archive {
-            Ok(LocalWheel {
-                dist: Dist::Built(dist.clone()),
-                archive: self
-                    .build_context
-                    .cache()
-                    .archive(&archive.id)
-                    .into_boxed_path(),
-                hashes: archive.hashes,
-                filename: filename.clone(),
-                cache: CacheInfo::from_timestamp(modified),
-                build: None,
-            })
-        } else if hashes.is_none() {
-            // Otherwise, unzip the wheel.
-            let archive = Archive::new(
-                self.unzip_wheel(path, wheel_entry.path()).await?,
-                HashDigests::empty(),
-                filename.clone(),
-            );
-
-            // Write the archive pointer to the cache.
-            let pointer = LocalArchivePointer {
-                timestamp: modified,
-                archive: archive.clone(),
-            };
-            pointer.write_to(&pointer_entry).await?;
-
             Ok(LocalWheel {
                 dist: Dist::Built(dist.clone()),
                 archive: self
@@ -1005,14 +969,14 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
                 build: None,
             })
         } else {
-            // If necessary, compute the hashes of the wheel.
+            // Otherwise, unzip the wheel and compute hashes (always including SHA256).
            let file = fs_err::tokio::File::open(path)
                 .await
                 .map_err(Error::CacheRead)?;

             let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
                 .map_err(Error::CacheWrite)?;

-            // Create a hasher for each hash algorithm.
+            // Create a hasher for each hash algorithm (always includes SHA256).
             let algorithms = hashes.algorithms();
             let mut hashers = algorithms.into_iter().map(Hasher::from).collect::<Vec<_>>();
             let mut hasher = uv_extract::hash::HashReader::new(file, &mut hashers);
@@ -1034,18 +998,27 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
             // Exhaust the reader to compute the hash.
             hasher.finish().await.map_err(Error::HashExhaustion)?;

-            let hashes = hashers.into_iter().map(HashDigest::from).collect();
+            let hash_digests: HashDigests = hashers.into_iter().map(HashDigest::from).collect();
+
+            // Extract the SHA256 hash for content-addressing
+            let sha256 = hash_digests
+                .iter()
+                .find(|digest| digest.algorithm == HashAlgorithm::Sha256)
+                .expect("SHA256 hash must be present");

             // Persist the temporary directory to the directory store.
-            let id = self
-                .build_context
+            self.build_context
                 .cache()
-                .persist(temp_dir.keep(), wheel_entry.path())
+                .persist(
+                    temp_dir.keep(),
+                    wheel_entry.path(),
+                    ArchiveId::from_sha256(&sha256.digest),
+                )
                 .await
                 .map_err(Error::CacheWrite)?;

             // Create an archive.
-            let archive = Archive::new(id, hashes, filename.clone());
+            let archive = Archive::new(hash_digests, filename.clone());

             // Write the archive pointer to the cache.
             let pointer = LocalArchivePointer {
@@ -1071,25 +1044,39 @@ impl<'a, Context: BuildContext> DistributionDatabase<'a, Context> {
     /// Unzip a wheel into the cache, returning the path to the unzipped directory.
     async fn unzip_wheel(&self, path: &Path, target: &Path) -> Result<ArchiveId, Error> {
-        let temp_dir = tokio::task::spawn_blocking({
-            let path = path.to_owned();
-            let root = self.build_context.cache().root().to_path_buf();
-            move || -> Result<TempDir, Error> {
-                // Unzip the wheel into a temporary directory.
-                let temp_dir = tempfile::tempdir_in(root).map_err(Error::CacheWrite)?;
-                let reader = fs_err::File::open(&path).map_err(Error::CacheWrite)?;
-                uv_extract::unzip(reader, temp_dir.path())
+        // Open the wheel file for hashing
+        let file = fs_err::tokio::File::open(path)
+            .await
+            .map_err(Error::CacheWrite)?;
+
+        // Create a temporary directory for unzipping
+        let temp_dir =
+            tempfile::tempdir_in(self.build_context.cache().root()).map_err(Error::CacheWrite)?;
+
+        // Create a hasher to content-address the wheel.
+        let mut hashers = vec![Hasher::from(HashAlgorithm::Sha256)];
+        let mut hasher = uv_extract::hash::HashReader::new(file, &mut hashers);
+
+        // Unzip the wheel while computing the hash
+        uv_extract::stream::unzip(&mut hasher, temp_dir.path())
+            .await
             .map_err(|err| Error::Extract(path.to_string_lossy().into_owned(), err))?;
-                Ok(temp_dir)
-            }
-        })
-        .await??;
+
+        // Exhaust the reader to complete the hash computation
+        hasher.finish().await.map_err(Error::HashExhaustion)?;
+
+        // Extract the digest.
+        let hash_digest = HashDigest::from(hashers.into_iter().next().expect("SHA256 hasher"));

         // Persist the temporary directory to the directory store.
         let id = self
             .build_context
             .cache()
-            .persist(temp_dir.keep(), target)
+            .persist(
+                temp_dir.keep(),
+                target,
+                ArchiveId::from_sha256(&hash_digest.digest),
+            )
             .await
             .map_err(Error::CacheWrite)?;
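Previously `unzip_wheel` unzipped on a blocking thread and computed no hashes, which is why it needed a random ID. Now the zip is streamed through a `HashReader`, so extraction and content-addressing share a single pass over the file. The wrap-a-reader idea in miniature, using blocking `std::io` and `sha2` in place of uv's async `HashReader`:

```rust
use sha2::{Digest, Sha256};
use std::io::{self, Read};

/// A reader that hashes every byte its consumer pulls through it, so an
/// extractor reading the stream computes the digest as a side effect.
struct HashReader<R> {
    inner: R,
    hasher: Sha256,
}

impl<R: Read> Read for HashReader<R> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        let n = self.inner.read(buf)?;
        self.hasher.update(&buf[..n]);
        Ok(n)
    }
}

fn main() -> io::Result<()> {
    let wheel = &b"wheel bytes"[..];
    let mut reader = HashReader { inner: wheel, hasher: Sha256::new() };
    // Stand-in for the unzip: drain the stream to its end.
    io::copy(&mut reader, &mut io::sink())?;
    let digest: String = reader
        .hasher
        .finalize()
        .iter()
        .map(|byte| format!("{byte:02x}"))
        .collect();
    println!("content-addressed id: {digest}");
    Ok(())
}
```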


@@ -10,7 +10,7 @@ use crate::commands::project::{
 use crate::printer::Printer;
 use crate::settings::ResolverInstallerSettings;

-use uv_cache::{Cache, CacheBucket};
+use uv_cache::{ArchiveId, Cache, CacheBucket};
 use uv_cache_key::{cache_digest, hash_digest};
 use uv_client::BaseClientBuilder;
 use uv_configuration::{Concurrency, Constraints, TargetTriple};
@@ -212,7 +212,9 @@ impl CachedEnvironment {
         .await?;

         // Now that the environment is complete, sync it to its content-addressed location.
-        let id = cache.persist(temp_dir.keep(), cache_entry.path()).await?;
+        let id = cache
+            .persist(temp_dir.keep(), cache_entry.path(), ArchiveId::nanoid())
+            .await?;
         let root = cache.archive(&id);

         Ok(Self(PythonEnvironment::from_root(root, cache)?))


@@ -136,7 +136,7 @@ fn clean_package_pypi() -> Result<()> {
     ----- stderr -----
     DEBUG uv [VERSION] ([COMMIT] DATE)
     DEBUG Acquired lock for `[CACHE_DIR]/`
-    DEBUG Removing dangling cache entry: [CACHE_DIR]/archive-v0/[ENTRY]
+    DEBUG Removing dangling cache entry: [CACHE_DIR]/archive-v1/[ENTRY]
     Removed [N] files ([SIZE])
     DEBUG Released lock at `[CACHE_DIR]/.lock`
     ");
@@ -215,7 +215,7 @@ fn clean_package_index() -> Result<()> {
     ----- stderr -----
     DEBUG uv [VERSION] ([COMMIT] DATE)
     DEBUG Acquired lock for `[CACHE_DIR]/`
-    DEBUG Removing dangling cache entry: [CACHE_DIR]/archive-v0/[ENTRY]
+    DEBUG Removing dangling cache entry: [CACHE_DIR]/archive-v1/[ENTRY]
     Removed [N] files ([SIZE])
     DEBUG Released lock at `[CACHE_DIR]/.lock`
     ");


@@ -142,7 +142,7 @@ fn prune_cached_env() {
     DEBUG Acquired lock for `[CACHE_DIR]/`
     Pruning cache at: [CACHE_DIR]/
     DEBUG Removing dangling cache environment: [CACHE_DIR]/environments-v2/[ENTRY]
-    DEBUG Removing dangling cache archive: [CACHE_DIR]/archive-v0/[ENTRY]
+    DEBUG Removing dangling cache archive: [CACHE_DIR]/archive-v1/[ENTRY]
     Removed [N] files ([SIZE])
     DEBUG Released lock at `[CACHE_DIR]/.lock`
     ");
@@ -188,7 +188,7 @@ fn prune_stale_symlink() -> Result<()> {
     DEBUG uv [VERSION] ([COMMIT] DATE)
     DEBUG Acquired lock for `[CACHE_DIR]/`
     Pruning cache at: [CACHE_DIR]/
-    DEBUG Removing dangling cache archive: [CACHE_DIR]/archive-v0/[ENTRY]
+    DEBUG Removing dangling cache archive: [CACHE_DIR]/archive-v1/[ENTRY]
     Removed 44 files ([SIZE])
     DEBUG Released lock at `[CACHE_DIR]/.lock`
     ");
@@ -409,7 +409,7 @@ fn prune_stale_revision() -> Result<()> {
     DEBUG Acquired lock for `[CACHE_DIR]/`
     Pruning cache at: [CACHE_DIR]/
     DEBUG Removing dangling source revision: [CACHE_DIR]/sdists-v9/[ENTRY]
-    DEBUG Removing dangling cache archive: [CACHE_DIR]/archive-v0/[ENTRY]
+    DEBUG Removing dangling cache archive: [CACHE_DIR]/archive-v1/[ENTRY]
     Removed [N] files ([SIZE])
     DEBUG Released lock at `[CACHE_DIR]/.lock`
     ");


@@ -246,7 +246,7 @@ fn find_uv_bin_in_ephemeral_environment() -> anyhow::Result<()> {
     success: true
     exit_code: 0
     ----- stdout -----
-    [CACHE_DIR]/archive-v0/[HASH]/[BIN]/uv
+    [CACHE_DIR]/archive-v1/[HASH]/[BIN]/uv

     ----- stderr -----
     Resolved 1 package in [TIME]

@@ -53,10 +53,10 @@ Collecting numpy==1.19.5
 ERROR: Exception:
 Traceback (most recent call last):
 ...
-  File "/Users/example/.cache/uv/archive-v0/3783IbOdglemN3ieOULx2/lib/python3.13/site-packages/pip/_vendor/pyproject_hooks/_impl.py", line 321, in _call_hook
+  File "/Users/example/.cache/uv/archive-v1/3783IbOdglemN3ieOULx2/lib/python3.13/site-packages/pip/_vendor/pyproject_hooks/_impl.py", line 321, in _call_hook
     raise BackendUnavailable(data.get('traceback', ''))
 pip._vendor.pyproject_hooks._impl.BackendUnavailable: Traceback (most recent call last):
-  File "/Users/example/.cache/uv/archive-v0/3783IbOdglemN3ieOULx2/lib/python3.13/site-packages/pip/_vendor/pyproject_hooks/_in_process/_in_process.py", line 77, in _build_backend
+  File "/Users/example/.cache/uv/archive-v1/3783IbOdglemN3ieOULx2/lib/python3.13/site-packages/pip/_vendor/pyproject_hooks/_in_process/_in_process.py", line 77, in _build_backend
     obj = import_module(mod_path)
   File "/Users/example/.local/share/uv/python/cpython-3.13.0-macos-aarch64-none/lib/python3.13/importlib/__init__.py", line 88, in import_module
     return _bootstrap._gcd_import(name[level:], package, level)