From b50e5fcbc509d865e84821b16e2ddd9dff3738d7 Mon Sep 17 00:00:00 2001 From: Charlie Marsh Date: Tue, 16 Jan 2024 05:37:35 -0500 Subject: [PATCH] Fetch `--find-links` indexes in parallel (#934) ## Summary Removes a TODO. ## Test Plan Tested manually with: ```shell cargo run -p puffin-cli -- \ pip compile requirements.in -n \ --find-links 'https://download.pytorch.org/whl/torch_stable.html' \ --find-links 'https://storage.googleapis.com/jax-releases/jax_cuda_releases.html' \ --verbose ``` And inspecting the logs to ensure that the two requests were kicked off concurrently. --- crates/puffin-client/src/flat_index.rs | 52 ++++++++++++++------------ 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/crates/puffin-client/src/flat_index.rs b/crates/puffin-client/src/flat_index.rs index 427abd3f8..732fd2f85 100644 --- a/crates/puffin-client/src/flat_index.rs +++ b/crates/puffin-client/src/flat_index.rs @@ -2,6 +2,7 @@ use std::collections::btree_map::Entry; use std::collections::BTreeMap; use std::path::PathBuf; +use futures::StreamExt; use reqwest::Response; use rustc_hash::FxHashMap; use tracing::{debug, info_span, instrument, warn, Instrument}; @@ -53,30 +54,35 @@ impl<'a> FlatIndexClient<'a> { &self, indexes: impl Iterator, ) -> Result, FlatIndexError> { - let mut dists = Vec::new(); - // TODO(konstin): Parallelize reads over flat indexes. 
- for flat_index in indexes { - let index_dists = match flat_index { - FlatIndexLocation::Path(path) => Self::read_from_directory(path) - .map_err(|err| FlatIndexError::FindLinksDirectory(path.clone(), err))?, - FlatIndexLocation::Url(url) => self - .read_from_url(url) - .await - .map_err(|err| FlatIndexError::FindLinksUrl(url.clone(), err))?, - }; - if index_dists.is_empty() { - warn!("No packages found in `--find-links` entry: {}", flat_index); - } else { - debug!( - "Found {} package{} in `--find-links` entry: {}", - index_dists.len(), - if index_dists.len() == 1 { "" } else { "s" }, - flat_index - ); - } - dists.extend(index_dists); + let mut fetches = futures::stream::iter(indexes) + .map(|index| async move { + let entries = match index { + FlatIndexLocation::Path(path) => Self::read_from_directory(path) + .map_err(|err| FlatIndexError::FindLinksDirectory(path.clone(), err))?, + FlatIndexLocation::Url(url) => self + .read_from_url(url) + .await + .map_err(|err| FlatIndexError::FindLinksUrl(url.clone(), err))?, + }; + if entries.is_empty() { + warn!("No packages found in `--find-links` entry: {}", index); + } else { + debug!( + "Found {} package{} in `--find-links` entry: {}", + entries.len(), + if entries.len() == 1 { "" } else { "s" }, + index + ); + } + Ok::, FlatIndexError>(entries) + }) + .buffered(16); + + let mut results = Vec::new(); + while let Some(entries) = fetches.next().await.transpose()? { + results.extend(entries); } - Ok(dists) + Ok(results) } /// Read a flat remote index from a `--find-links` URL.