From 5621c414cf249d5a78fcaed6673702c98dbd38e0 Mon Sep 17 00:00:00 2001 From: Charlie Marsh Date: Tue, 23 Jan 2024 14:52:37 -0500 Subject: [PATCH] Use symlinks for directories entries in cache (#1037) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Summary One problem we have in the cache today is that we can't overwrite entries atomically, because we store unzipped _directories_ in the cache (which makes installation _much_ faster than storing zipped directories). So, if you ignore the existing contents of the cache when writing, you might run into an error, because you might attempt to write a directory where a directory already exists. This is especially annoying for cache refresh, because in order to refresh the cache, we have to purge it (i.e., delete a bunch of stuff), which is also highly unsafe if Puffin is running across multiple threads or multiple processes. The solution I'm proposing here is that whenever we persist a _directory_ to the cache, we persist it to a special "archive" bucket. Then, within the other buckets, directory entries are actually symlinks into that "archive" bucket. With symlinks, we can atomically replace, which means we can easily overwrite cache entries without having to delete from the cache. The main downside is that we'll now accumulate dangling entries in the "archive" bucket, and so we'll need to implement some form of garbage collection to ensure that we remove entries with no symlinks. Another downside is that cache reads and writes will be a bit slower, since we need to deal with creating and resolving these symlinks. As an example... after this change, the cache entry for this unzipped wheel is actually a symlink: ![Screenshot 2024-01-22 at 11 56 18 AM](https://github.com/astral-sh/puffin/assets/1309177/99ff6940-5096-4246-8d16-2a7bdcdd8d4b) Then, within the archive directory, we actually have two unique entries (since I intentionally ran the command twice to ensure overwrites were safe): ![Screenshot 2024-01-22 at 11 56 22 AM](https://github.com/astral-sh/puffin/assets/1309177/717d04e2-25d9-4225-b190-bad1441868c6) --- Cargo.lock | 1 + crates/puffin-cache/Cargo.toml | 1 + crates/puffin-cache/src/lib.rs | 36 ++++++++++++++++ .../src/distribution_database.rs | 31 ++++++-------- .../src/index/built_wheel_index.rs | 5 ++- .../src/index/cached_wheel.rs | 3 -- .../src/index/registry_wheel_index.rs | 5 ++- crates/puffin-distribution/src/source/mod.rs | 33 +++++---------- crates/puffin-fs/src/lib.rs | 36 ++++++++++++++++ crates/puffin-installer/src/downloader.rs | 41 ++++++------------- crates/puffin-installer/src/installer.rs | 2 +- 11 files changed, 117 insertions(+), 77 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 495c7d1c0..ec149b32d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2365,6 +2365,7 @@ dependencies = [ "tempfile", "tracing", "url", + "uuid", ] [[package]] diff --git a/crates/puffin-cache/Cargo.toml b/crates/puffin-cache/Cargo.toml index c89425da8..71e7af5a9 100644 --- a/crates/puffin-cache/Cargo.toml +++ b/crates/puffin-cache/Cargo.toml @@ -28,3 +28,4 @@ serde = { workspace = true, features = ["derive"] } tempfile = { workspace = true } tracing = { workspace = true } url = { workspace = true } +uuid = { workspace = true , default-features = false, features = ["v4"] } diff --git a/crates/puffin-cache/src/lib.rs b/crates/puffin-cache/src/lib.rs index 06d24e7d8..ef9fa9469 100644 --- a/crates/puffin-cache/src/lib.rs +++ b/crates/puffin-cache/src/lib.rs @@ -149,6 +149,32 @@ impl Cache { 
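// [Editorial illustration, not part of the patch.] A minimal sketch of the
// atomic-replace idea described in the summary above, assuming a Unix-like
// platform and hypothetical `cache_root`, `unzipped_temp_dir`, and `entry_path`
// variables: a directory cannot be renamed over an existing non-empty
// directory, but a symlink can be renamed over an existing symlink in a single
// atomic step, which is what makes overwriting cache entries safe without
// purging them first.
//
//     // Move the unzipped directory into the shared "archive" bucket under a unique ID.
//     let archive = cache_root.join("archive-v0").join(uuid::Uuid::new_v4().to_string());
//     fs_err::rename(&unzipped_temp_dir, &archive)?;
//
//     // Stage a symlink to the archived directory, then rename it into place.
//     // The final `rename` atomically (re)points the cache entry, even if one already exists.
//     let staging = tempfile::tempdir_in(&cache_root)?;
//     let link = staging.path().join("symlink");
//     std::os::unix::fs::symlink(&archive, &link)?;
//     fs_err::rename(&link, &entry_path)?;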
CacheEntry::new(self.bucket(cache_bucket).join(dir), file) } + /// Persist a temporary directory to the artifact store. + pub fn persist( + &self, + temp_dir: impl AsRef, + path: impl AsRef, + ) -> Result<(), io::Error> { + // Create a unique ID for the artifact. + let id = uuid::Uuid::new_v4(); + + // Move the temporary directory into the directory store. + let archive_entry = self.entry(CacheBucket::Archive, "", id.to_string()); + fs_err::create_dir_all(archive_entry.dir())?; + fs_err::rename(temp_dir.as_ref(), archive_entry.path())?; + + // Create a symlink to the directory store. + let temp_dir = tempfile::tempdir_in(self.root())?; + let temp_file = temp_dir.path().join("symlink"); + puffin_fs::symlink_dir(archive_entry.path(), &temp_file)?; + + // Move the symlink into the wheel cache. + fs_err::create_dir_all(path.as_ref().parent().expect("Cache entry to have parent"))?; + fs_err::rename(&temp_file, path.as_ref())?; + + Ok(()) + } + /// Initialize a directory for use as a cache. fn init(root: impl Into) -> Result { let root = root.into(); @@ -413,6 +439,12 @@ pub enum CacheBucket { /// /// The response is parsed into `puffin_client::SimpleMetadata` before storage. Simple, + /// A cache of unzipped wheels, stored as directories. This is used internally within the cache. + /// When other buckets need to store directories, they should persist them to + /// [`CacheBucket::Archive`], and then symlink them into the appropriate bucket. This ensures + /// that cache entries can be atomically replaced and removed, as storing directories in the + /// other buckets directly would make atomic operations impossible. + Archive, } impl CacheBucket { @@ -424,6 +456,7 @@ impl CacheBucket { CacheBucket::Interpreter => "interpreter-v0", CacheBucket::Simple => "simple-v0", CacheBucket::Wheels => "wheels-v0", + CacheBucket::Archive => "archive-v0", } } @@ -542,6 +575,9 @@ impl CacheBucket { CacheBucket::Interpreter => { // Nothing to do. } + CacheBucket::Archive => { + // Nothing to do. + } } Ok(count) } diff --git a/crates/puffin-distribution/src/distribution_database.rs b/crates/puffin-distribution/src/distribution_database.rs index 8d755c24d..f761404ef 100644 --- a/crates/puffin-distribution/src/distribution_database.rs +++ b/crates/puffin-distribution/src/distribution_database.rs @@ -46,7 +46,6 @@ pub enum DistributionDatabaseError { SourceBuild(#[from] SourceDistError), #[error("Git operation failed")] Git(#[source] anyhow::Error), - /// Should not occur, i've only seen it when another task panicked #[error("The task executor is broken, did some other task panic?")] Join(#[from] JoinError), #[error("Building source distributions is disabled")] @@ -166,23 +165,21 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context> // Download and unzip the wheel to a temporary directory. let temp_dir = tempfile::tempdir_in(self.cache.root())?; - let temp_target = temp_dir.path().join(&wheel.file.filename); - unzip_no_seek(reader.compat(), &temp_target).await?; + unzip_no_seek(reader.compat(), temp_dir.path()).await?; - // Move the temporary file to the cache. + // Persist the temporary directory to the directory store. 
let wheel_filename = WheelFilename::from_str(&wheel.file.filename)?; let cache_entry = self.cache.entry( CacheBucket::Wheels, WheelCache::Index(&wheel.index).remote_wheel_dir(wheel_filename.name.as_ref()), wheel_filename.stem(), ); - fs_err::tokio::create_dir_all(&cache_entry.dir()).await?; - let target = cache_entry.into_path_buf(); - fs_err::tokio::rename(temp_target, &target).await?; + self.cache + .persist(temp_dir.into_path(), cache_entry.path())?; Ok(LocalWheel::Unzipped(UnzippedWheel { dist: dist.clone(), - target, + target: cache_entry.into_path_buf(), filename: wheel_filename, })) } @@ -196,26 +193,22 @@ impl<'a, Context: BuildContext + Send + Sync> DistributionDatabase<'a, Context> // Download and unzip the wheel to a temporary directory. let temp_dir = tempfile::tempdir_in(self.cache.root())?; - let temp_target = temp_dir.path().join(wheel.filename.to_string()); - unzip_no_seek(reader.compat(), &temp_target).await?; + unzip_no_seek(reader.compat(), temp_dir.path()).await?; - // Move the temporary file to the cache. + // Persist the temporary directory to the directory store. let cache_entry = self.cache.entry( CacheBucket::Wheels, WheelCache::Url(&wheel.url).remote_wheel_dir(wheel.name().as_ref()), wheel.filename.stem(), ); - fs_err::tokio::create_dir_all(&cache_entry.dir()).await?; - let target = cache_entry.into_path_buf(); - fs_err::tokio::rename(temp_target, &target).await?; + self.cache + .persist(temp_dir.into_path(), cache_entry.path())?; - let local_wheel = LocalWheel::Unzipped(UnzippedWheel { + Ok(LocalWheel::Unzipped(UnzippedWheel { dist: dist.clone(), - target, + target: cache_entry.into_path_buf(), filename: wheel.filename.clone(), - }); - - Ok(local_wheel) + })) } Dist::Built(BuiltDist::Path(wheel)) => { diff --git a/crates/puffin-distribution/src/index/built_wheel_index.rs b/crates/puffin-distribution/src/index/built_wheel_index.rs index 04c44e1bb..36120c188 100644 --- a/crates/puffin-distribution/src/index/built_wheel_index.rs +++ b/crates/puffin-distribution/src/index/built_wheel_index.rs @@ -1,7 +1,7 @@ use distribution_types::{git_reference, DirectUrlSourceDist, GitSourceDist, Name, PathSourceDist}; use platform_tags::Tags; use puffin_cache::{Cache, CacheBucket, CacheShard, WheelCache}; -use puffin_fs::directories; +use puffin_fs::symlinks; use crate::index::cached_wheel::CachedWheel; use crate::source::{read_http_manifest, read_timestamp_manifest, MANIFEST}; @@ -94,7 +94,8 @@ impl BuiltWheelIndex { pub fn find(shard: &CacheShard, tags: &Tags) -> Option { let mut candidate: Option = None; - for subdir in directories(shard) { + // Unzipped wheels are stored as symlinks into the archive directory. 
+ for subdir in symlinks(shard) { match CachedWheel::from_path(&subdir) { None => {} Some(dist_info) => { diff --git a/crates/puffin-distribution/src/index/cached_wheel.rs b/crates/puffin-distribution/src/index/cached_wheel.rs index 33f9902e4..3f8945f05 100644 --- a/crates/puffin-distribution/src/index/cached_wheel.rs +++ b/crates/puffin-distribution/src/index/cached_wheel.rs @@ -18,9 +18,6 @@ impl CachedWheel { pub fn from_path(path: &Path) -> Option { let filename = path.file_name()?.to_str()?; let filename = WheelFilename::from_stem(filename).ok()?; - if path.is_file() { - return None; - } let entry = CacheEntry::from_path(path); Some(Self { filename, entry }) } diff --git a/crates/puffin-distribution/src/index/registry_wheel_index.rs b/crates/puffin-distribution/src/index/registry_wheel_index.rs index a8245cfdf..61d45b273 100644 --- a/crates/puffin-distribution/src/index/registry_wheel_index.rs +++ b/crates/puffin-distribution/src/index/registry_wheel_index.rs @@ -8,7 +8,7 @@ use distribution_types::{CachedRegistryDist, FlatIndexLocation, IndexLocations, use pep440_rs::Version; use platform_tags::Tags; use puffin_cache::{Cache, CacheBucket, WheelCache}; -use puffin_fs::directories; +use puffin_fs::{directories, symlinks}; use puffin_normalize::PackageName; use crate::index::cached_wheel::CachedWheel; @@ -125,7 +125,8 @@ impl<'a> RegistryWheelIndex<'a> { tags: &Tags, versions: &mut BTreeMap, ) { - for wheel_dir in directories(path.as_ref()) { + // Unzipped wheels are stored as symlinks into the archive directory. + for wheel_dir in symlinks(path.as_ref()) { match CachedWheel::from_path(&wheel_dir) { None => {} Some(dist_info) => { diff --git a/crates/puffin-distribution/src/source/mod.rs b/crates/puffin-distribution/src/source/mod.rs index 5c3f0b05c..3b1cedc20 100644 --- a/crates/puffin-distribution/src/source/mod.rs +++ b/crates/puffin-distribution/src/source/mod.rs @@ -11,7 +11,7 @@ use futures::{FutureExt, TryStreamExt}; use reqwest::Response; use tempfile::TempDir; use tokio_util::compat::FuturesAsyncReadCompatExt; -use tracing::{debug, info_span, instrument, warn, Instrument}; +use tracing::{debug, info_span, instrument, Instrument}; use url::Url; use zip::ZipArchive; @@ -674,29 +674,22 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> { // Download the source distribution to a temporary file. let span = info_span!("download_source_dist", filename = filename, source_dist = %source_dist); - let (temp_dir, source_dist_archive) = - self.download_source_dist_url(response, filename).await?; + let download_dir = self.download_source_dist_url(response, filename).await?; drop(span); // Unzip the source distribution to a temporary directory. let span = info_span!("extract_source_dist", filename = filename, source_dist = %source_dist); let source_dist_dir = puffin_extract::extract_source( - &source_dist_archive, - temp_dir.path().join("extracted"), + download_dir.path().join(filename), + download_dir.path().join("extracted"), )?; drop(span); // Persist the unzipped distribution to the cache. - fs::create_dir_all(&cache_entry.dir()).await?; - if let Err(err) = fs_err::rename(&source_dist_dir, cache_path) { - // If another thread already cached the distribution, we can ignore the error. 
- if cache_path.is_dir() { - warn!("Downloaded already-cached distribution: {source_dist}"); - } else { - return Err(err.into()); - }; - } + self.build_context + .cache() + .persist(source_dist_dir, cache_path)?; Ok(cache_path) } @@ -706,7 +699,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> { &self, response: Response, source_dist_filename: &str, - ) -> Result<(TempDir, PathBuf), puffin_client::Error> { + ) -> Result { let reader = response .bytes_stream() .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)) @@ -714,16 +707,12 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> { let mut reader = tokio::io::BufReader::new(reader.compat()); // Create a temporary directory. - let cache_dir = self.build_context.cache().bucket(CacheBucket::BuiltWheels); - fs::create_dir_all(&cache_dir) - .await + let temp_dir = tempfile::tempdir_in(self.build_context.cache().root()) .map_err(puffin_client::Error::CacheWrite)?; - let temp_dir = tempfile::tempdir_in(cache_dir).map_err(puffin_client::Error::CacheWrite)?; // Download the source distribution to a temporary file. - let sdist_file = temp_dir.path().join(source_dist_filename); let mut writer = tokio::io::BufWriter::new( - fs_err::tokio::File::create(&sdist_file) + fs_err::tokio::File::create(temp_dir.path().join(source_dist_filename)) .await .map_err(puffin_client::Error::CacheWrite)?, ); @@ -731,7 +720,7 @@ impl<'a, T: BuildContext> SourceDistCachedBuilder<'a, T> { .await .map_err(puffin_client::Error::CacheWrite)?; - Ok((temp_dir, sdist_file)) + Ok(temp_dir) } /// Download a source distribution from a Git repository. diff --git a/crates/puffin-fs/src/lib.rs b/crates/puffin-fs/src/lib.rs index 6e35780f3..ded5172aa 100644 --- a/crates/puffin-fs/src/lib.rs +++ b/crates/puffin-fs/src/lib.rs @@ -8,6 +8,18 @@ use tracing::{error, warn}; use puffin_warnings::warn_user; +/// Symlink a directory. +#[cfg(windows)] +pub fn symlink_dir(src: impl AsRef, dst: impl AsRef) -> std::io::Result<()> { + std::os::windows::fs::symlink_dir(src, dst) +} + +/// Symlink a directory. +#[cfg(unix)] +pub fn symlink_dir(src: impl AsRef, dst: impl AsRef) -> std::io::Result<()> { + std::os::unix::fs::symlink(src, dst) +} + /// Write `data` to `path` atomically using a temporary file and atomic rename. pub async fn write_atomic(path: impl AsRef, data: impl AsRef<[u8]>) -> std::io::Result<()> { let temp_file = NamedTempFile::new_in( @@ -95,6 +107,30 @@ pub fn directories(path: impl AsRef) -> impl Iterator { .map(|entry| entry.path()) } +/// Iterate over the symlinks in a directory. +/// +/// If the directory does not exist, returns an empty iterator. +pub fn symlinks(path: impl AsRef) -> impl Iterator { + path.as_ref() + .read_dir() + .ok() + .into_iter() + .flatten() + .filter_map(|entry| match entry { + Ok(entry) => Some(entry), + Err(err) => { + warn!("Failed to read entry: {}", err); + None + } + }) + .filter(|entry| { + entry + .file_type() + .map_or(false, |file_type| file_type.is_symlink()) + }) + .map(|entry| entry.path()) +} + /// A file lock that is automatically released when dropped. 
#[derive(Debug)] pub struct LockedFile(fs_err::File); diff --git a/crates/puffin-installer/src/downloader.rs b/crates/puffin-installer/src/downloader.rs index 8895d4652..646e2d121 100644 --- a/crates/puffin-installer/src/downloader.rs +++ b/crates/puffin-installer/src/downloader.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use futures::{FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt}; use tokio::task::JoinError; -use tracing::{instrument, warn}; +use tracing::instrument; use url::Url; use distribution_types::{CachedDist, Dist, Identifier, LocalEditable, RemoteSource, SourceDist}; @@ -36,6 +36,7 @@ pub enum Error { /// Download, build, and unzip a set of distributions. pub struct Downloader<'a, Context: BuildContext + Send + Sync> { database: DistributionDatabase<'a, Context>, + cache: &'a Cache, reporter: Option>, } @@ -49,6 +50,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> { Self { database: DistributionDatabase::new(cache, tags, client, build_context), reporter: None, + cache, } } @@ -59,6 +61,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> { Self { reporter: Some(reporter.clone()), database: self.database.with_reporter(Facade::from(reporter.clone())), + cache: self.cache, } } @@ -124,7 +127,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> { .build_wheel_editable(&editable, editable_wheel_dir) .await .map_err(Error::Editable)?; - let cached_dist = Self::unzip_wheel(local_wheel).await?; + let cached_dist = self.unzip_wheel(local_wheel).await?; if let Some(task_id) = task_id { if let Some(reporter) = &self.reporter { reporter.on_editable_build_complete(&editable, task_id); @@ -153,7 +156,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> { } /// Download, build, and unzip a single wheel. - #[instrument(skip_all, fields(name = % dist, size = ? dist.size(), url = dist.file().map(|file| file.url.to_string()).unwrap_or_default()))] + #[instrument(skip_all, fields(name = % dist, size = ? dist.size(), url = dist.file().map(| file | file.url.to_string()).unwrap_or_default()))] pub async fn get_wheel(&self, dist: Dist, in_flight: &InFlight) -> Result { let id = dist.distribution_id(); let wheel = if in_flight.downloads.register(&id) { @@ -163,7 +166,7 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> { .boxed() .map_err(|err| Error::Fetch(dist.clone(), err)) .await?; - let result = Self::unzip_wheel(download).await; + let result = self.unzip_wheel(download).await; match result { Ok(cached) => { in_flight.downloads.done(id, Ok(cached.clone())); @@ -188,41 +191,23 @@ impl<'a, Context: BuildContext + Send + Sync> Downloader<'a, Context> { } /// Unzip a locally-available wheel into the cache. - async fn unzip_wheel(download: LocalWheel) -> Result { + async fn unzip_wheel(&self, download: LocalWheel) -> Result { // Just an optimization: Avoid spawning a blocking task if there is no work to be done. if matches!(download, LocalWheel::Unzipped(_)) { return Ok(download.into_cached_dist()); } - // If the wheel is already unpacked, we should avoid attempting to unzip it at all. - if download.target().is_dir() { - warn!("Wheel is already unpacked: {download}"); - return Ok(download.into_cached_dist()); - } - // Unzip the wheel. tokio::task::spawn_blocking({ let download = download.clone(); + let cache = self.cache.clone(); move || -> Result<(), puffin_extract::Error> { // Unzip the wheel into a temporary directory. 
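// [Editorial note, not part of the patch.] Before this change, the wheel was
// unzipped into a staging directory next to its final cache path and then
// renamed into place, with a special case to tolerate another thread having
// already unpacked it. With `Cache::persist`, the unzip target is a temporary
// directory at the cache root; `persist` moves it into the archive bucket and
// atomically swaps a symlink into `download.target()`, so the concurrent-unpack
// race no longer needs to be handled here.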
- let parent = download - .target() - .parent() - .expect("Cache paths can't be root"); - fs_err::create_dir_all(parent)?; - let staging = tempfile::tempdir_in(parent)?; - download.unzip(staging.path())?; + let temp_dir = tempfile::tempdir_in(cache.root())?; + download.unzip(temp_dir.path())?; - // Move the unzipped wheel into the cache. - if let Err(err) = fs_err::rename(staging.into_path(), download.target()) { - // If another thread already unpacked the wheel, we can ignore the error. - return if download.target().is_dir() { - warn!("Wheel is already unpacked: {download}"); - Ok(()) - } else { - Err(err.into()) - }; - } + // Persist the temporary directory to the directory store. + cache.persist(temp_dir.into_path(), download.target())?; Ok(()) } diff --git a/crates/puffin-installer/src/installer.rs b/crates/puffin-installer/src/installer.rs index 0c8461414..f054a5f50 100644 --- a/crates/puffin-installer/src/installer.rs +++ b/crates/puffin-installer/src/installer.rs @@ -48,7 +48,7 @@ impl<'a> Installer<'a> { install_wheel_rs::linker::install_wheel( &location, - wheel.path(), + wheel.path().canonicalize()?, wheel.filename(), wheel .direct_url()?
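// [Editorial note, not part of the patch.] Since cache entries are now symlinks
// into `archive-v0`, the installer canonicalizes the wheel path before handing
// it to `install_wheel_rs`, presumably so that installation operates on the
// real archived directory rather than on the symlink itself, e.g.:
//
//     // Hypothetical resolved path; `<CACHE>` stands for the cache root.
//     // <CACHE>/wheels-v0/<index>/<package>/<stem>  ->  <CACHE>/archive-v0/<uuid>
//     let resolved = wheel.path().canonicalize()?;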