mirror of https://github.com/astral-sh/uv
Use symlinks for directory entries in cache (#1037)
## Summary

One problem we have in the cache today is that we can't overwrite entries atomically, because we store unzipped _directories_ in the cache (which makes installation _much_ faster than storing zipped directories). So, if you ignore the existing contents of the cache when writing, you might run into an error, because you might attempt to write a directory where a directory already exists. This is especially annoying for cache refresh, because in order to refresh the cache, we have to purge it (i.e., delete a bunch of stuff), which is also highly unsafe if Puffin is running across multiple threads or multiple processes.

The solution I'm proposing here is that whenever we persist a _directory_ to the cache, we persist it to a special "archive" bucket. Then, within the other buckets, directory entries are actually symlinks into that "archive" bucket. Since symlinks can be replaced atomically, we can overwrite cache entries without first deleting anything from the cache.

The main downside is that we'll now accumulate dangling entries in the "archive" bucket, so we'll need to implement some form of garbage collection to remove entries that no symlink points to. Another downside is that cache reads and writes will be a bit slower, since we need to create and resolve these symlinks.

As an example: after this change, the cache entry for an unzipped wheel is actually a symlink into the archive bucket. Within the archive directory itself, there are then two unique entries (since I intentionally ran the command twice to ensure overwrites were safe).
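To make the mechanism concrete, here is a minimal standalone sketch of the flow described above (Unix-only; `persist_directory` and its parameters are hypothetical names for this illustration, not Puffin's API; the real implementation is `Cache::persist` in the diff below, and like it, this sketch assumes the `uuid` crate):

```rust
use std::io;
use std::path::Path;

/// Persist `unpacked` into the archive bucket, then atomically expose it at `entry`.
fn persist_directory(unpacked: &Path, archive_dir: &Path, entry: &Path) -> io::Result<()> {
    // Move the unpacked directory into the archive bucket under a unique ID.
    let id = uuid::Uuid::new_v4().to_string();
    let archive_target = archive_dir.join(&id);
    std::fs::create_dir_all(archive_dir)?;
    std::fs::rename(unpacked, &archive_target)?;

    // Create the symlink at a temporary path first...
    let temp_link = archive_dir.join(format!("{id}.link"));
    std::os::unix::fs::symlink(&archive_target, &temp_link)?;

    // ...then rename it over the cache entry. `rename` atomically replaces an
    // existing file or symlink, but renaming a directory over an existing,
    // non-empty directory fails; that asymmetry is what makes symlinked cache
    // entries safe to overwrite while other threads or processes read the cache.
    std::fs::create_dir_all(entry.parent().expect("cache entry has a parent"))?;
    std::fs::rename(&temp_link, entry)?;
    Ok(())
}
```

Readers then resolve the symlink (e.g., via `Path::canonicalize`) to reach the unzipped contents in the archive bucket, which is why the installer change at the end of this diff canonicalizes `wheel.path()` before installing.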
This commit is contained in:
parent 556080225d
commit 5621c414cf
```diff
@@ -2365,6 +2365,7 @@ dependencies = [
  "tempfile",
  "tracing",
  "url",
+ "uuid",
 ]

 [[package]]
```
```diff
@@ -28,3 +28,4 @@ serde = { workspace = true, features = ["derive"] }
 tempfile = { workspace = true }
 tracing = { workspace = true }
 url = { workspace = true }
+uuid = { workspace = true, default-features = false, features = ["v4"] }
```
```diff
@@ -149,6 +149,32 @@ impl Cache {
         CacheEntry::new(self.bucket(cache_bucket).join(dir), file)
     }

+    /// Persist a temporary directory to the artifact store.
+    pub fn persist(
+        &self,
+        temp_dir: impl AsRef<Path>,
+        path: impl AsRef<Path>,
+    ) -> Result<(), io::Error> {
+        // Create a unique ID for the artifact.
+        let id = uuid::Uuid::new_v4();
+
+        // Move the temporary directory into the directory store.
+        let archive_entry = self.entry(CacheBucket::Archive, "", id.to_string());
+        fs_err::create_dir_all(archive_entry.dir())?;
+        fs_err::rename(temp_dir.as_ref(), archive_entry.path())?;
+
+        // Create a symlink to the directory store.
+        let temp_dir = tempfile::tempdir_in(self.root())?;
+        let temp_file = temp_dir.path().join("symlink");
+        puffin_fs::symlink_dir(archive_entry.path(), &temp_file)?;
+
+        // Move the symlink into the wheel cache.
+        fs_err::create_dir_all(path.as_ref().parent().expect("Cache entry to have parent"))?;
+        fs_err::rename(&temp_file, path.as_ref())?;
+
+        Ok(())
+    }
+
     /// Initialize a directory for use as a cache.
     fn init(root: impl Into<PathBuf>) -> Result<PathBuf, io::Error> {
         let root = root.into();
```
```diff
@@ -413,6 +439,12 @@ pub enum CacheBucket {
     ///
     /// The response is parsed into `puffin_client::SimpleMetadata` before storage.
     Simple,
+    /// A cache of unzipped wheels, stored as directories. This is used internally within the cache.
+    /// When other buckets need to store directories, they should persist them to
+    /// [`CacheBucket::Archive`], and then symlink them into the appropriate bucket. This ensures
+    /// that cache entries can be atomically replaced and removed, as storing directories in the
+    /// other buckets directly would make atomic operations impossible.
+    Archive,
 }

 impl CacheBucket {
```
```diff
@@ -424,6 +456,7 @@ impl CacheBucket {
             CacheBucket::Interpreter => "interpreter-v0",
             CacheBucket::Simple => "simple-v0",
             CacheBucket::Wheels => "wheels-v0",
+            CacheBucket::Archive => "archive-v0",
         }
     }

```
```diff
@@ -542,6 +575,9 @@ impl CacheBucket {
             CacheBucket::Interpreter => {
                 // Nothing to do.
             }
+            CacheBucket::Archive => {
+                // Nothing to do.
+            }
         }
         Ok(count)
     }
```
```diff
@@ -46,7 +46,6 @@ pub enum DistributionDatabaseError {
     SourceBuild(#[from] SourceDistError),
     #[error("Git operation failed")]
     Git(#[source] anyhow::Error),
     /// Should not occur, i've only seen it when another task panicked
     #[error("The task executor is broken, did some other task panic?")]
     Join(#[from] JoinError),
     #[error("Building source distributions is disabled")]
```
```diff
@@ -166,23 +165,21 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>

             // Download and unzip the wheel to a temporary directory.
             let temp_dir = tempfile::tempdir_in(self.cache.root())?;
-            let temp_target = temp_dir.path().join(&wheel.file.filename);
-            unzip_no_seek(reader.compat(), &temp_target).await?;
+            unzip_no_seek(reader.compat(), temp_dir.path()).await?;

-            // Move the temporary file to the cache.
+            // Persist the temporary directory to the directory store.
             let wheel_filename = WheelFilename::from_str(&wheel.file.filename)?;
             let cache_entry = self.cache.entry(
                 CacheBucket::Wheels,
                 WheelCache::Index(&wheel.index).remote_wheel_dir(wheel_filename.name.as_ref()),
                 wheel_filename.stem(),
             );
-            fs_err::tokio::create_dir_all(&cache_entry.dir()).await?;
-            let target = cache_entry.into_path_buf();
-            fs_err::tokio::rename(temp_target, &target).await?;
+            self.cache
+                .persist(temp_dir.into_path(), cache_entry.path())?;

             Ok(LocalWheel::Unzipped(UnzippedWheel {
                 dist: dist.clone(),
-                target,
+                target: cache_entry.into_path_buf(),
                 filename: wheel_filename,
             }))
         }
```
```diff
@@ -196,26 +193,22 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>

             // Download and unzip the wheel to a temporary directory.
             let temp_dir = tempfile::tempdir_in(self.cache.root())?;
-            let temp_target = temp_dir.path().join(wheel.filename.to_string());
-            unzip_no_seek(reader.compat(), &temp_target).await?;
+            unzip_no_seek(reader.compat(), temp_dir.path()).await?;

-            // Move the temporary file to the cache.
+            // Persist the temporary directory to the directory store.
             let cache_entry = self.cache.entry(
                 CacheBucket::Wheels,
                 WheelCache::Url(&wheel.url).remote_wheel_dir(wheel.name().as_ref()),
                 wheel.filename.stem(),
             );
-            fs_err::tokio::create_dir_all(&cache_entry.dir()).await?;
-            let target = cache_entry.into_path_buf();
-            fs_err::tokio::rename(temp_target, &target).await?;
+            self.cache
+                .persist(temp_dir.into_path(), cache_entry.path())?;

-            let local_wheel = LocalWheel::Unzipped(UnzippedWheel {
+            Ok(LocalWheel::Unzipped(UnzippedWheel {
                 dist: dist.clone(),
-                target,
+                target: cache_entry.into_path_buf(),
                 filename: wheel.filename.clone(),
-            });
-
-            Ok(local_wheel)
+            }))
         }

         Dist::Built(BuiltDist::Path(wheel)) => {
```
```diff
@@ -1,7 +1,7 @@
 use distribution_types::{git_reference, DirectUrlSourceDist, GitSourceDist, Name, PathSourceDist};
 use platform_tags::Tags;
 use puffin_cache::{Cache, CacheBucket, CacheShard, WheelCache};
-use puffin_fs::directories;
+use puffin_fs::symlinks;

 use crate::index::cached_wheel::CachedWheel;
 use crate::source::{read_http_manifest, read_timestamp_manifest, MANIFEST};
```
```diff
@@ -94,7 +94,8 @@ impl BuiltWheelIndex {
     pub fn find(shard: &CacheShard, tags: &Tags) -> Option<CachedWheel> {
         let mut candidate: Option<CachedWheel> = None;

-        for subdir in directories(shard) {
+        // Unzipped wheels are stored as symlinks into the archive directory.
+        for subdir in symlinks(shard) {
             match CachedWheel::from_path(&subdir) {
                 None => {}
                 Some(dist_info) => {
```
```diff
@@ -18,9 +18,6 @@ impl CachedWheel {
     pub fn from_path(path: &Path) -> Option<Self> {
         let filename = path.file_name()?.to_str()?;
         let filename = WheelFilename::from_stem(filename).ok()?;
-        if path.is_file() {
-            return None;
-        }
         let entry = CacheEntry::from_path(path);
         Some(Self { filename, entry })
     }
```
```diff
@@ -8,7 +8,7 @@ use distribution_types::{CachedRegistryDist, FlatIndexLocation, IndexLocations,
 use pep440_rs::Version;
 use platform_tags::Tags;
 use puffin_cache::{Cache, CacheBucket, WheelCache};
-use puffin_fs::directories;
+use puffin_fs::{directories, symlinks};
 use puffin_normalize::PackageName;

 use crate::index::cached_wheel::CachedWheel;
```
```diff
@@ -125,7 +125,8 @@ impl<'a> RegistryWheelIndex<'a> {
         tags: &Tags,
         versions: &mut BTreeMap<Version, CachedRegistryDist>,
     ) {
-        for wheel_dir in directories(path.as_ref()) {
+        // Unzipped wheels are stored as symlinks into the archive directory.
+        for wheel_dir in symlinks(path.as_ref()) {
             match CachedWheel::from_path(&wheel_dir) {
                 None => {}
                 Some(dist_info) => {
```
```diff
@@ -11,7 +11,7 @@ use futures::{FutureExt, TryStreamExt};
 use reqwest::Response;
 use tempfile::TempDir;
 use tokio_util::compat::FuturesAsyncReadCompatExt;
-use tracing::{debug, info_span, instrument, warn, Instrument};
+use tracing::{debug, info_span, instrument, Instrument};
 use url::Url;
 use zip::ZipArchive;

```
```diff
@@ -674,29 +674,22 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
         // Download the source distribution to a temporary file.
         let span =
             info_span!("download_source_dist", filename = filename, source_dist = %source_dist);
-        let (temp_dir, source_dist_archive) =
-            self.download_source_dist_url(response, filename).await?;
+        let download_dir = self.download_source_dist_url(response, filename).await?;
        drop(span);

         // Unzip the source distribution to a temporary directory.
         let span =
             info_span!("extract_source_dist", filename = filename, source_dist = %source_dist);
         let source_dist_dir = puffin_extract::extract_source(
-            &source_dist_archive,
-            temp_dir.path().join("extracted"),
+            download_dir.path().join(filename),
+            download_dir.path().join("extracted"),
         )?;
         drop(span);

         // Persist the unzipped distribution to the cache.
-        fs::create_dir_all(&cache_entry.dir()).await?;
-        if let Err(err) = fs_err::rename(&source_dist_dir, cache_path) {
-            // If another thread already cached the distribution, we can ignore the error.
-            if cache_path.is_dir() {
-                warn!("Downloaded already-cached distribution: {source_dist}");
-            } else {
-                return Err(err.into());
-            };
-        }
+        self.build_context
+            .cache()
+            .persist(source_dist_dir, cache_path)?;

         Ok(cache_path)
     }
```
```diff
@@ -706,7 +699,7 @@
         &self,
         response: Response,
         source_dist_filename: &str,
-    ) -> Result<(TempDir, PathBuf), puffin_client::Error> {
+    ) -> Result<TempDir, puffin_client::Error> {
         let reader = response
             .bytes_stream()
             .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
```
```diff
@@ -714,16 +707,12 @@
         let mut reader = tokio::io::BufReader::new(reader.compat());

         // Create a temporary directory.
-        let cache_dir = self.build_context.cache().bucket(CacheBucket::BuiltWheels);
-        fs::create_dir_all(&cache_dir)
-            .await
+        let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
             .map_err(puffin_client::Error::CacheWrite)?;
-        let temp_dir = tempfile::tempdir_in(cache_dir).map_err(puffin_client::Error::CacheWrite)?;

         // Download the source distribution to a temporary file.
-        let sdist_file = temp_dir.path().join(source_dist_filename);
         let mut writer = tokio::io::BufWriter::new(
-            fs_err::tokio::File::create(&sdist_file)
+            fs_err::tokio::File::create(temp_dir.path().join(source_dist_filename))
                 .await
                 .map_err(puffin_client::Error::CacheWrite)?,
         );
```
```diff
@@ -731,7 +720,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
             .await
             .map_err(puffin_client::Error::CacheWrite)?;

-        Ok((temp_dir, sdist_file))
+        Ok(temp_dir)
     }

     /// Download a source distribution from a Git repository.
```
```diff
@@ -8,6 +8,18 @@ use tracing::{error, warn};

 use puffin_warnings::warn_user;

+/// Symlink a directory.
+#[cfg(windows)]
+pub fn symlink_dir(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io::Result<()> {
+    std::os::windows::fs::symlink_dir(src, dst)
+}
+
+/// Symlink a directory.
+#[cfg(unix)]
+pub fn symlink_dir(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io::Result<()> {
+    std::os::unix::fs::symlink(src, dst)
+}
+
 /// Write `data` to `path` atomically using a temporary file and atomic rename.
 pub async fn write_atomic(path: impl AsRef<Path>, data: impl AsRef<[u8]>) -> std::io::Result<()> {
     let temp_file = NamedTempFile::new_in(
```
```diff
@@ -95,6 +107,30 @@ pub fn directories(path: impl AsRef<Path>) -> impl Iterator<Item = PathBuf> {
         .map(|entry| entry.path())
 }

+/// Iterate over the symlinks in a directory.
+///
+/// If the directory does not exist, returns an empty iterator.
+pub fn symlinks(path: impl AsRef<Path>) -> impl Iterator<Item = PathBuf> {
+    path.as_ref()
+        .read_dir()
+        .ok()
+        .into_iter()
+        .flatten()
+        .filter_map(|entry| match entry {
+            Ok(entry) => Some(entry),
+            Err(err) => {
+                warn!("Failed to read entry: {}", err);
+                None
+            }
+        })
+        .filter(|entry| {
+            entry
+                .file_type()
+                .map_or(false, |file_type| file_type.is_symlink())
+        })
+        .map(|entry| entry.path())
+}
+
 /// A file lock that is automatically released when dropped.
 #[derive(Debug)]
 pub struct LockedFile(fs_err::File);
```
```diff
@@ -4,7 +4,7 @@ use std::sync::Arc;

 use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
 use tokio::task::JoinError;
-use tracing::{instrument, warn};
+use tracing::instrument;
 use url::Url;

 use distribution_types::{CachedDist, Dist, Identifier, LocalEditable, RemoteSource, SourceDist};
```
```diff
@@ -36,6 +36,7 @@ pub enum Error {
 /// Download, build, and unzip a set of distributions.
 pub struct Downloader<'a, Context: BuildContext + Send + Sync> {
     database: DistributionDatabase<'a, Context>,
+    cache: &'a Cache,
     reporter: Option<Arc<dyn Reporter>>,
 }

```
```diff
@@ -49,6 +50,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> {
         Self {
             database: DistributionDatabase::new(cache, tags, client, build_context),
             reporter: None,
+            cache,
         }
     }

```
```diff
@@ -59,6 +61,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> {
         Self {
             reporter: Some(reporter.clone()),
             database: self.database.with_reporter(Facade::from(reporter.clone())),
+            cache: self.cache,
         }
     }

```
```diff
@@ -124,7 +127,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> {
             .build_wheel_editable(&editable, editable_wheel_dir)
             .await
             .map_err(Error::Editable)?;
-        let cached_dist = Self::unzip_wheel(local_wheel).await?;
+        let cached_dist = self.unzip_wheel(local_wheel).await?;
         if let Some(task_id) = task_id {
             if let Some(reporter) = &self.reporter {
                 reporter.on_editable_build_complete(&editable, task_id);
```
```diff
@@ -153,7 +156,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> {
     }

     /// Download, build, and unzip a single wheel.
-    #[instrument(skip_all, fields(name = % dist, size = ? dist.size(), url = dist.file().map(|file| file.url.to_string()).unwrap_or_default()))]
+    #[instrument(skip_all, fields(name = % dist, size = ? dist.size(), url = dist.file().map(| file | file.url.to_string()).unwrap_or_default()))]
     pub async fn get_wheel(&self, dist: Dist, in_flight: &InFlight) -> Result<CachedDist, Error> {
         let id = dist.distribution_id();
         let wheel = if in_flight.downloads.register(&id) {
```
```diff
@@ -163,7 +166,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> {
             .boxed()
             .map_err(|err| Error::Fetch(dist.clone(), err))
             .await?;
-        let result = Self::unzip_wheel(download).await;
+        let result = self.unzip_wheel(download).await;
         match result {
             Ok(cached) => {
                 in_flight.downloads.done(id, Ok(cached.clone()));
```
```diff
@@ -188,41 +191,23 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> {
     }

     /// Unzip a locally-available wheel into the cache.
-    async fn unzip_wheel(download: LocalWheel) -> Result<CachedDist, Error> {
+    async fn unzip_wheel(&self, download: LocalWheel) -> Result<CachedDist, Error> {
         // Just an optimization: Avoid spawning a blocking task if there is no work to be done.
         if matches!(download, LocalWheel::Unzipped(_)) {
             return Ok(download.into_cached_dist());
         }

-        // If the wheel is already unpacked, we should avoid attempting to unzip it at all.
-        if download.target().is_dir() {
-            warn!("Wheel is already unpacked: {download}");
-            return Ok(download.into_cached_dist());
-        }
-
         // Unzip the wheel.
         tokio::task::spawn_blocking({
             let download = download.clone();
+            let cache = self.cache.clone();
             move || -> Result<(), puffin_extract::Error> {
                 // Unzip the wheel into a temporary directory.
-                let parent = download
-                    .target()
-                    .parent()
-                    .expect("Cache paths can't be root");
-                fs_err::create_dir_all(parent)?;
-                let staging = tempfile::tempdir_in(parent)?;
-                download.unzip(staging.path())?;
+                let temp_dir = tempfile::tempdir_in(cache.root())?;
+                download.unzip(temp_dir.path())?;

-                // Move the unzipped wheel into the cache.
-                if let Err(err) = fs_err::rename(staging.into_path(), download.target()) {
-                    // If another thread already unpacked the wheel, we can ignore the error.
-                    return if download.target().is_dir() {
-                        warn!("Wheel is already unpacked: {download}");
-                        Ok(())
-                    } else {
-                        Err(err.into())
-                    };
-                }
+                // Persist the temporary directory to the directory store.
+                cache.persist(temp_dir.into_path(), download.target())?;

                 Ok(())
             }
```
```diff
@@ -48,7 +48,7 @@ impl<'a> Installer<'a> {

         install_wheel_rs::linker::install_wheel(
             &location,
-            wheel.path(),
+            wheel.path().canonicalize()?,
             wheel.filename(),
             wheel
                 .direct_url()?
```