Parallelize network requests in `uv tree --outdated` (#9280)

## Summary

Closes https://github.com/astral-sh/uv/issues/9266.
This commit is contained in:
Charlie Marsh 2024-11-20 11:45:14 -05:00 committed by GitHub
parent 23cc9b0322
commit a0de83001c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 51 additions and 27 deletions

View File

@ -13,6 +13,11 @@ impl<T> Default for PackageMap<T> {
}
impl<T> PackageMap<T> {
/// Insert a value by [`PackageId`].
pub fn insert(&mut self, package: Package, value: T) -> Option<T> {
self.0.insert(package.id, value)
}
/// Get a value by [`PackageId`].
pub(crate) fn get(&self, package_id: &PackageId) -> Option<&T> {
self.0.get(package_id)

View File

@ -1,3 +1,4 @@
use tracing::debug;
use uv_client::{RegistryClient, VersionFiles};
use uv_distribution_filename::DistFilename;
use uv_distribution_types::{IndexCapabilities, IndexUrl};
@ -10,7 +11,7 @@ use uv_warnings::warn_user_once;
///
/// The returned distribution is guaranteed to be compatible with the provided tags and Python
/// requirement.
#[derive(Debug)]
#[derive(Debug, Copy, Clone)]
pub(crate) struct LatestClient<'env> {
pub(crate) client: &'env RegistryClient,
pub(crate) capabilities: &'env IndexCapabilities,
@ -27,6 +28,8 @@ impl<'env> LatestClient<'env> {
package: &PackageName,
index: Option<&IndexUrl>,
) -> anyhow::Result<Option<DistFilename>, uv_client::Error> {
debug!("Fetching latest version of: `{package}`");
let archives = match self.client.simple(package, index, self.capabilities).await {
Ok(archives) => archives,
Err(err) => {

View File

@ -3,8 +3,7 @@ use std::fmt::Write;
use anstream::println;
use anyhow::Result;
use futures::stream::FuturesUnordered;
use futures::TryStreamExt;
use futures::StreamExt;
use itertools::Itertools;
use owo_colors::OwoColorize;
use rustc_hash::FxHashMap;
@ -15,7 +14,7 @@ use uv_cache::{Cache, Refresh};
use uv_cache_info::Timestamp;
use uv_cli::ListFormat;
use uv_client::{Connectivity, RegistryClientBuilder};
use uv_configuration::{IndexStrategy, KeyringProviderType, TrustedHost};
use uv_configuration::{Concurrency, IndexStrategy, KeyringProviderType, TrustedHost};
use uv_distribution_filename::DistFilename;
use uv_distribution_types::{Diagnostic, IndexCapabilities, IndexLocations, InstalledDist, Name};
use uv_fs::Simplified;
@ -44,6 +43,7 @@ pub(crate) async fn pip_list(
keyring_provider: KeyringProviderType,
allow_insecure_host: Vec<TrustedHost>,
connectivity: Connectivity,
concurrency: Concurrency,
strict: bool,
exclude_newer: Option<ExcludeNewer>,
python: Option<&str>,
@ -111,15 +111,18 @@ pub(crate) async fn pip_list(
};
// Fetch the latest version for each package.
results
.iter()
let mut fetches = futures::stream::iter(&results)
.map(|dist| async {
let latest = client.find_latest(dist.name(), None).await?;
Ok::<(&PackageName, Option<DistFilename>), uv_client::Error>((dist.name(), latest))
})
.collect::<FuturesUnordered<_>>()
.try_collect::<FxHashMap<_, _>>()
.await?
.buffer_unordered(concurrency.downloads);
let mut map = FxHashMap::default();
while let Some((package, version)) = fetches.next().await.transpose()? {
map.insert(package, version);
}
map
} else {
FxHashMap::default()
};

View File

@ -1,8 +1,8 @@
use std::path::Path;
use anstream::print;
use anyhow::Result;
use futures::{stream, StreamExt};
use anyhow::{Error, Result};
use futures::StreamExt;
use uv_cache::{Cache, Refresh};
use uv_cache_info::Timestamp;
@ -11,7 +11,6 @@ use uv_configuration::{
Concurrency, DevGroupsSpecification, LowerBound, TargetTriple, TrustedHost,
};
use uv_distribution_types::IndexCapabilities;
use uv_pep440::Version;
use uv_pep508::PackageName;
use uv_python::{PythonDownloads, PythonPreference, PythonRequest, PythonVersion};
use uv_resolver::{PackageMap, TreeDisplay};
@ -182,21 +181,34 @@ pub(crate) async fn tree(
};
// Fetch the latest version for each package.
stream::iter(lock.packages())
.filter_map(|package| async {
let index = package.index(workspace.install_path()).ok()??;
let filename = client
.find_latest(package.name(), Some(&index))
.await
.ok()??;
if filename.version() == package.version() {
None
} else {
Some((package.clone(), filename.into_version()))
}
})
.collect::<PackageMap<Version>>()
.await
let mut fetches = futures::stream::iter(lock.packages().iter().filter_map(|package| {
// Filter to packages that are derived from a registry.
let index = match package.index(workspace.install_path()) {
Ok(Some(index)) => index,
Ok(None) => return None,
Err(err) => return Some(Err(err)),
};
Some(Ok((package, index)))
}))
.map(|result| async move {
let (package, index) = result?;
let Some(filename) = client.find_latest(package.name(), Some(&index)).await? else {
return Ok(None);
};
if filename.version() == package.version() {
return Ok(None);
}
Ok::<Option<_>, Error>(Some((package, filename.into_version())))
})
.buffer_unordered(concurrency.downloads);
let mut map = PackageMap::default();
while let Some(entry) = fetches.next().await.transpose()? {
if let Some((package, version)) = entry {
map.insert(package.clone(), version);
}
}
map
} else {
PackageMap::default()
};

View File

@ -615,6 +615,7 @@ async fn run(mut cli: Cli) -> Result<ExitStatus> {
args.settings.keyring_provider,
globals.allow_insecure_host,
globals.connectivity,
globals.concurrency,
args.settings.strict,
args.settings.exclude_newer,
args.settings.python.as_deref(),