diff --git a/Cargo.lock b/Cargo.lock
index 495c7d1c0..ec149b32d 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2365,6 +2365,7 @@ dependencies = [
  "tempfile",
  "tracing",
  "url",
+ "uuid",
 ]

 [[package]]
diff --git a/crates/puffin-cache/Cargo.toml b/crates/puffin-cache/Cargo.toml
index c89425da8..71e7af5a9 100644
--- a/crates/puffin-cache/Cargo.toml
+++ b/crates/puffin-cache/Cargo.toml
@@ -28,3 +28,4 @@ serde = { workspace = true, features = ["derive"] }
 tempfile = { workspace = true }
 tracing = { workspace = true }
 url = { workspace = true }
+uuid = { workspace = true, default-features = false, features = ["v4"] }
diff --git a/crates/puffin-cache/src/lib.rs b/crates/puffin-cache/src/lib.rs
index 06d24e7d8..ef9fa9469 100644
--- a/crates/puffin-cache/src/lib.rs
+++ b/crates/puffin-cache/src/lib.rs
@@ -149,6 +149,32 @@ impl Cache {
         CacheEntry::new(self.bucket(cache_bucket).join(dir), file)
     }

+    /// Persist a temporary directory to the artifact store.
+    pub fn persist(
+        &self,
+        temp_dir: impl AsRef<Path>,
+        path: impl AsRef<Path>,
+    ) -> Result<(), io::Error> {
+        // Create a unique ID for the artifact.
+        let id = uuid::Uuid::new_v4();
+
+        // Move the temporary directory into the directory store.
+        let archive_entry = self.entry(CacheBucket::Archive, "", id.to_string());
+        fs_err::create_dir_all(archive_entry.dir())?;
+        fs_err::rename(temp_dir.as_ref(), archive_entry.path())?;
+
+        // Create a symlink to the directory store.
+        let temp_dir = tempfile::tempdir_in(self.root())?;
+        let temp_file = temp_dir.path().join("symlink");
+        puffin_fs::symlink_dir(archive_entry.path(), &temp_file)?;
+
+        // Move the symlink into the wheel cache.
+        fs_err::create_dir_all(path.as_ref().parent().expect("Cache entry to have parent"))?;
+        fs_err::rename(&temp_file, path.as_ref())?;
+
+        Ok(())
+    }
+
     /// Initialize a directory for use as a cache.
     fn init(root: impl Into<PathBuf>) -> Result<Self, io::Error> {
         let root = root.into();
@@ -413,6 +439,12 @@ pub enum CacheBucket {
     ///
     /// The response is parsed into `puffin_client::SimpleMetadata` before storage.
     Simple,
+    /// A cache of unzipped wheels, stored as directories. This is used internally within the cache.
+    /// When other buckets need to store directories, they should persist them to
+    /// [`CacheBucket::Archive`], and then symlink them into the appropriate bucket. This ensures
+    /// that cache entries can be atomically replaced and removed, as storing directories in the
+    /// other buckets directly would make atomic operations impossible.
+    Archive,
 }

 impl CacheBucket {
@@ -424,6 +456,7 @@ impl CacheBucket {
             CacheBucket::Interpreter => "interpreter-v0",
             CacheBucket::Simple => "simple-v0",
             CacheBucket::Wheels => "wheels-v0",
+            CacheBucket::Archive => "archive-v0",
         }
     }

@@ -542,6 +575,9 @@ impl CacheBucket {
             CacheBucket::Interpreter => {
                 // Nothing to do.
             }
+            CacheBucket::Archive => {
+                // Nothing to do.
+            }
         }
         Ok(count)
     }
diff --git a/crates/puffin-distribution/src/distribution_database.rs b/crates/puffin-distribution/src/distribution_database.rs
index 8d755c24d..f761404ef 100644
--- a/crates/puffin-distribution/src/distribution_database.rs
+++ b/crates/puffin-distribution/src/distribution_database.rs
@@ -46,7 +46,6 @@ pub enum DistributionDatabaseError {
     SourceBuild(#[from] SourceDistError),
     #[error("Git operation failed")]
     Git(#[source] anyhow::Error),
-    /// Should not occur, i've only seen it when another task panicked
     #[error("The task executor is broken, did some other task panic?")]
     Join(#[from] JoinError),
     #[error("Building source distributions is disabled")]
@@ -166,23 +165,21 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>

                 // Download and unzip the wheel to a temporary directory.
                 let temp_dir = tempfile::tempdir_in(self.cache.root())?;
-                let temp_target = temp_dir.path().join(&wheel.file.filename);
-                unzip_no_seek(reader.compat(), &temp_target).await?;
+                unzip_no_seek(reader.compat(), temp_dir.path()).await?;

-                // Move the temporary file to the cache.
+                // Persist the temporary directory to the directory store.
                 let wheel_filename = WheelFilename::from_str(&wheel.file.filename)?;
                 let cache_entry = self.cache.entry(
                     CacheBucket::Wheels,
                     WheelCache::Index(&wheel.index).remote_wheel_dir(wheel_filename.name.as_ref()),
                     wheel_filename.stem(),
                 );
-                fs_err::tokio::create_dir_all(&cache_entry.dir()).await?;
-                let target = cache_entry.into_path_buf();
-                fs_err::tokio::rename(temp_target, &target).await?;
+                self.cache
+                    .persist(temp_dir.into_path(), cache_entry.path())?;

                 Ok(LocalWheel::Unzipped(UnzippedWheel {
                     dist: dist.clone(),
-                    target,
+                    target: cache_entry.into_path_buf(),
                     filename: wheel_filename,
                 }))
             }
@@ -196,26 +193,22 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context>

                 // Download and unzip the wheel to a temporary directory.
                 let temp_dir = tempfile::tempdir_in(self.cache.root())?;
-                let temp_target = temp_dir.path().join(wheel.filename.to_string());
-                unzip_no_seek(reader.compat(), &temp_target).await?;
+                unzip_no_seek(reader.compat(), temp_dir.path()).await?;

-                // Move the temporary file to the cache.
+                // Persist the temporary directory to the directory store.
                 let cache_entry = self.cache.entry(
                     CacheBucket::Wheels,
                     WheelCache::Url(&wheel.url).remote_wheel_dir(wheel.name().as_ref()),
                     wheel.filename.stem(),
                 );
-                fs_err::tokio::create_dir_all(&cache_entry.dir()).await?;
-                let target = cache_entry.into_path_buf();
-                fs_err::tokio::rename(temp_target, &target).await?;
+                self.cache
+                    .persist(temp_dir.into_path(), cache_entry.path())?;

-                let local_wheel = LocalWheel::Unzipped(UnzippedWheel {
+                Ok(LocalWheel::Unzipped(UnzippedWheel {
                     dist: dist.clone(),
-                    target,
+                    target: cache_entry.into_path_buf(),
                     filename: wheel.filename.clone(),
-                });
-
-                Ok(local_wheel)
+                }))
             }

             Dist::Built(BuiltDist::Path(wheel)) => {
diff --git a/crates/puffin-distribution/src/index/built_wheel_index.rs b/crates/puffin-distribution/src/index/built_wheel_index.rs
index 04c44e1bb..36120c188 100644
--- a/crates/puffin-distribution/src/index/built_wheel_index.rs
+++ b/crates/puffin-distribution/src/index/built_wheel_index.rs
@@ -1,7 +1,7 @@
 use distribution_types::{git_reference, DirectUrlSourceDist, GitSourceDist, Name, PathSourceDist};
 use platform_tags::Tags;
 use puffin_cache::{Cache, CacheBucket, CacheShard, WheelCache};
-use puffin_fs::directories;
+use puffin_fs::symlinks;

 use crate::index::cached_wheel::CachedWheel;
 use crate::source::{read_http_manifest, read_timestamp_manifest, MANIFEST};
@@ -94,7 +94,8 @@ impl BuiltWheelIndex {
     pub fn find(shard: &CacheShard, tags: &Tags) -> Option<CachedWheel> {
         let mut candidate: Option<CachedWheel> = None;

-        for subdir in directories(shard) {
+        // Unzipped wheels are stored as symlinks into the archive directory.
+        for subdir in symlinks(shard) {
             match CachedWheel::from_path(&subdir) {
                 None => {}
                 Some(dist_info) => {
diff --git a/crates/puffin-distribution/src/index/cached_wheel.rs b/crates/puffin-distribution/src/index/cached_wheel.rs
index 33f9902e4..3f8945f05 100644
--- a/crates/puffin-distribution/src/index/cached_wheel.rs
+++ b/crates/puffin-distribution/src/index/cached_wheel.rs
@@ -18,9 +18,6 @@ impl CachedWheel {
     pub fn from_path(path: &Path) -> Option<Self> {
         let filename = path.file_name()?.to_str()?;
         let filename = WheelFilename::from_stem(filename).ok()?;
-        if path.is_file() {
-            return None;
-        }
         let entry = CacheEntry::from_path(path);
         Some(Self { filename, entry })
     }
diff --git a/crates/puffin-distribution/src/index/registry_wheel_index.rs b/crates/puffin-distribution/src/index/registry_wheel_index.rs
index a8245cfdf..61d45b273 100644
--- a/crates/puffin-distribution/src/index/registry_wheel_index.rs
+++ b/crates/puffin-distribution/src/index/registry_wheel_index.rs
@@ -8,7 +8,7 @@ use distribution_types::{CachedRegistryDist, FlatIndexLocation, IndexLocations,
 use pep440_rs::Version;
 use platform_tags::Tags;
 use puffin_cache::{Cache, CacheBucket, WheelCache};
-use puffin_fs::directories;
+use puffin_fs::{directories, symlinks};
 use puffin_normalize::PackageName;

 use crate::index::cached_wheel::CachedWheel;
@@ -125,7 +125,8 @@ impl<'a> RegistryWheelIndex<'a> {
         tags: &Tags,
         versions: &mut BTreeMap<Version, CachedRegistryDist>,
     ) {
-        for wheel_dir in directories(path.as_ref()) {
+        // Unzipped wheels are stored as symlinks into the archive directory.
+        for wheel_dir in symlinks(path.as_ref()) {
             match CachedWheel::from_path(&wheel_dir) {
                 None => {}
                 Some(dist_info) => {
diff --git a/crates/puffin-distribution/src/source/mod.rs b/crates/puffin-distribution/src/source/mod.rs
index 5c3f0b05c..3b1cedc20 100644
--- a/crates/puffin-distribution/src/source/mod.rs
+++ b/crates/puffin-distribution/src/source/mod.rs
@@ -11,7 +11,7 @@ use futures::{FutureExt, TryStreamExt};
 use reqwest::Response;
 use tempfile::TempDir;
 use tokio_util::compat::FuturesAsyncReadCompatExt;
-use tracing::{debug, info_span, instrument, warn, Instrument};
+use tracing::{debug, info_span, instrument, Instrument};
 use url::Url;
 use zip::ZipArchive;

@@ -674,29 +674,22 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
         // Download the source distribution to a temporary file.
         let span = info_span!("download_source_dist", filename = filename, source_dist = %source_dist);
-        let (temp_dir, source_dist_archive) =
-            self.download_source_dist_url(response, filename).await?;
+        let download_dir = self.download_source_dist_url(response, filename).await?;
         drop(span);

         // Unzip the source distribution to a temporary directory.
         let span = info_span!("extract_source_dist", filename = filename, source_dist = %source_dist);
         let source_dist_dir = puffin_extract::extract_source(
-            &source_dist_archive,
-            temp_dir.path().join("extracted"),
+            download_dir.path().join(filename),
+            download_dir.path().join("extracted"),
         )?;
         drop(span);

         // Persist the unzipped distribution to the cache.
-        fs::create_dir_all(&cache_entry.dir()).await?;
-        if let Err(err) = fs_err::rename(&source_dist_dir, cache_path) {
-            // If another thread already cached the distribution, we can ignore the error.
-            if cache_path.is_dir() {
-                warn!("Downloaded already-cached distribution: {source_dist}");
-            } else {
-                return Err(err.into());
-            };
-        }
+        self.build_context
+            .cache()
+            .persist(source_dist_dir, cache_path)?;

         Ok(cache_path)
     }
@@ -706,7 +699,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
         &self,
         response: Response,
         source_dist_filename: &str,
-    ) -> Result<(TempDir, PathBuf), puffin_client::Error> {
+    ) -> Result<TempDir, puffin_client::Error> {
         let reader = response
             .bytes_stream()
             .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))
@@ -714,16 +707,12 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
         let mut reader = tokio::io::BufReader::new(reader.compat());

         // Create a temporary directory.
-        let cache_dir = self.build_context.cache().bucket(CacheBucket::BuiltWheels);
-        fs::create_dir_all(&cache_dir)
-            .await
+        let temp_dir = tempfile::tempdir_in(self.build_context.cache().root())
             .map_err(puffin_client::Error::CacheWrite)?;
-        let temp_dir = tempfile::tempdir_in(cache_dir).map_err(puffin_client::Error::CacheWrite)?;

         // Download the source distribution to a temporary file.
-        let sdist_file = temp_dir.path().join(source_dist_filename);
         let mut writer = tokio::io::BufWriter::new(
-            fs_err::tokio::File::create(&sdist_file)
+            fs_err::tokio::File::create(temp_dir.path().join(source_dist_filename))
                 .await
                 .map_err(puffin_client::Error::CacheWrite)?,
         );
@@ -731,7 +720,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> {
             .await
             .map_err(puffin_client::Error::CacheWrite)?;

-        Ok((temp_dir, sdist_file))
+        Ok(temp_dir)
     }

     /// Download a source distribution from a Git repository.
diff --git a/crates/puffin-fs/src/lib.rs b/crates/puffin-fs/src/lib.rs
index 6e35780f3..ded5172aa 100644
--- a/crates/puffin-fs/src/lib.rs
+++ b/crates/puffin-fs/src/lib.rs
@@ -8,6 +8,18 @@ use tracing::{error, warn};

 use puffin_warnings::warn_user;

+/// Symlink a directory.
+#[cfg(windows)]
+pub fn symlink_dir(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io::Result<()> {
+    std::os::windows::fs::symlink_dir(src, dst)
+}
+
+/// Symlink a directory.
+#[cfg(unix)]
+pub fn symlink_dir(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> std::io::Result<()> {
+    std::os::unix::fs::symlink(src, dst)
+}
+
 /// Write `data` to `path` atomically using a temporary file and atomic rename.
 pub async fn write_atomic(path: impl AsRef<Path>, data: impl AsRef<[u8]>) -> std::io::Result<()> {
     let temp_file = NamedTempFile::new_in(
@@ -95,6 +107,30 @@ pub fn directories(path: impl AsRef<Path>) -> impl Iterator<Item = PathBuf> {
         .map(|entry| entry.path())
 }

+/// Iterate over the symlinks in a directory.
+///
+/// If the directory does not exist, returns an empty iterator.
+pub fn symlinks(path: impl AsRef<Path>) -> impl Iterator<Item = PathBuf> {
+    path.as_ref()
+        .read_dir()
+        .ok()
+        .into_iter()
+        .flatten()
+        .filter_map(|entry| match entry {
+            Ok(entry) => Some(entry),
+            Err(err) => {
+                warn!("Failed to read entry: {}", err);
+                None
+            }
+        })
+        .filter(|entry| {
+            entry
+                .file_type()
+                .map_or(false, |file_type| file_type.is_symlink())
+        })
+        .map(|entry| entry.path())
+}
+
 /// A file lock that is automatically released when dropped.
 #[derive(Debug)]
 pub struct LockedFile(fs_err::File);
diff --git a/crates/puffin-installer/src/downloader.rs b/crates/puffin-installer/src/downloader.rs
index 8895d4652..646e2d121 100644
--- a/crates/puffin-installer/src/downloader.rs
+++ b/crates/puffin-installer/src/downloader.rs
@@ -4,7 +4,7 @@ use std::sync::Arc;

 use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt};
 use tokio::task::JoinError;
-use tracing::{instrument, warn};
+use tracing::instrument;
 use url::Url;

 use distribution_types::{CachedDist, Dist, Identifier, LocalEditable, RemoteSource, SourceDist};
@@ -36,6 +36,7 @@ pub enum Error {
 /// Download, build, and unzip a set of distributions.
 pub struct Downloader<'a, Context: BuildContext + Send + Sync> {
     database: DistributionDatabase<'a, Context>,
+    cache: &'a Cache,
     reporter: Option<Arc<dyn Reporter>>,
 }

@@ -49,6 +50,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> {
         Self {
             database: DistributionDatabase::new(cache, tags, client, build_context),
             reporter: None,
+            cache,
         }
     }

@@ -59,6 +61,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> {
         Self {
             reporter: Some(reporter.clone()),
             database: self.database.with_reporter(Facade::from(reporter.clone())),
+            cache: self.cache,
         }
     }

@@ -124,7 +127,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> {
             .build_wheel_editable(&editable, editable_wheel_dir)
             .await
             .map_err(Error::Editable)?;
-        let cached_dist = Self::unzip_wheel(local_wheel).await?;
+        let cached_dist = self.unzip_wheel(local_wheel).await?;
         if let Some(task_id) = task_id {
             if let Some(reporter) = &self.reporter {
                 reporter.on_editable_build_complete(&editable, task_id);
@@ -153,7 +156,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> {
     }

     /// Download, build, and unzip a single wheel.
-    #[instrument(skip_all, fields(name = % dist, size = ? dist.size(), url = dist.file().map(|file| file.url.to_string()).unwrap_or_default()))]
+    #[instrument(skip_all, fields(name = % dist, size = ? dist.size(), url = dist.file().map(| file | file.url.to_string()).unwrap_or_default()))]
     pub async fn get_wheel(&self, dist: Dist, in_flight: &InFlight) -> Result<CachedDist, Error> {
         let id = dist.distribution_id();
         let wheel = if in_flight.downloads.register(&id) {
@@ -163,7 +166,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> {
                 .boxed()
                 .map_err(|err| Error::Fetch(dist.clone(), err))
                 .await?;
-            let result = Self::unzip_wheel(download).await;
+            let result = self.unzip_wheel(download).await;
             match result {
                 Ok(cached) => {
                     in_flight.downloads.done(id, Ok(cached.clone()));
@@ -188,41 +191,23 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> {
     }

     /// Unzip a locally-available wheel into the cache.
-    async fn unzip_wheel(download: LocalWheel) -> Result<CachedDist, Error> {
+    async fn unzip_wheel(&self, download: LocalWheel) -> Result<CachedDist, Error> {
         // Just an optimization: Avoid spawning a blocking task if there is no work to be done.
         if matches!(download, LocalWheel::Unzipped(_)) {
             return Ok(download.into_cached_dist());
         }

-        // If the wheel is already unpacked, we should avoid attempting to unzip it at all.
-        if download.target().is_dir() {
-            warn!("Wheel is already unpacked: {download}");
-            return Ok(download.into_cached_dist());
-        }
-
         // Unzip the wheel.
         tokio::task::spawn_blocking({
             let download = download.clone();
+            let cache = self.cache.clone();
             move || -> Result<(), puffin_extract::Error> {
                 // Unzip the wheel into a temporary directory.
-                let parent = download
-                    .target()
-                    .parent()
-                    .expect("Cache paths can't be root");
-                fs_err::create_dir_all(parent)?;
-                let staging = tempfile::tempdir_in(parent)?;
-                download.unzip(staging.path())?;
+                let temp_dir = tempfile::tempdir_in(cache.root())?;
+                download.unzip(temp_dir.path())?;

-                // Move the unzipped wheel into the cache.
-                if let Err(err) = fs_err::rename(staging.into_path(), download.target()) {
-                    // If another thread already unpacked the wheel, we can ignore the error.
-                    return if download.target().is_dir() {
-                        warn!("Wheel is already unpacked: {download}");
-                        Ok(())
-                    } else {
-                        Err(err.into())
-                    };
-                }
+                // Persist the temporary directory to the directory store.
+                cache.persist(temp_dir.into_path(), download.target())?;

                 Ok(())
             }
diff --git a/crates/puffin-installer/src/installer.rs b/crates/puffin-installer/src/installer.rs
index 0c8461414..f054a5f50 100644
--- a/crates/puffin-installer/src/installer.rs
+++ b/crates/puffin-installer/src/installer.rs
@@ -48,7 +48,7 @@ impl<'a> Installer<'a> {
             install_wheel_rs::linker::install_wheel(
                 &location,
-                wheel.path(),
+                wheel.path().canonicalize()?,
                 wheel.filename(),
                 wheel
                     .direct_url()?
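
Reviewer note (not part of the patch): the new `Cache::persist` leans on the fact that `rename` is atomic for a single directory entry on one filesystem, while a populated directory can never be swapped into place in one step. Below is a minimal, self-contained sketch of the same archive-then-symlink sequence using only the standard library and the `uuid` crate; the helper name `persist_via_symlink` and the `archive_root`/`link_path` parameters are hypothetical stand-ins for the paths puffin derives from its cache buckets.

// Illustrative sketch only -- not the puffin implementation.
use std::fs;
use std::io;
use std::path::Path;

/// Move `src_dir` into `archive_root` under a fresh UUID, then atomically
/// swap a symlink at `link_path` so readers only ever see a complete directory.
fn persist_via_symlink(src_dir: &Path, archive_root: &Path, link_path: &Path) -> io::Result<()> {
    // 1) Give the directory a unique, permanent home in the archive.
    fs::create_dir_all(archive_root)?;
    let archived = archive_root.join(uuid::Uuid::new_v4().to_string());
    fs::rename(src_dir, &archived)?;

    // 2) Create the symlink off to the side, on the same filesystem as the
    //    destination, so the final `rename` below stays atomic.
    let staging = link_path.with_extension("tmp");
    #[cfg(unix)]
    std::os::unix::fs::symlink(&archived, &staging)?;
    #[cfg(windows)]
    std::os::windows::fs::symlink_dir(&archived, &staging)?;

    // 3) Atomically move the symlink into place. On Unix this replaces an
    //    existing symlink in a single step; it fails if a plain directory
    //    from an older cache layout is still sitting at `link_path`.
    if let Some(parent) = link_path.parent() {
        fs::create_dir_all(parent)?;
    }
    fs::rename(&staging, link_path)
}

The patch itself stages the symlink in a `tempfile::tempdir_in(cache.root())` rather than a sibling `.tmp` path, but the sequence is the same: archive under a UUID, create the link off to the side, rename it into place. The `wheel.path().canonicalize()?` change in `installer.rs` presumably follows from the same design: the cache entry is now a symlink, so it is resolved to the real archive directory before `install_wheel` reads from it.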