diff --git a/crates/uv-cli/src/lib.rs b/crates/uv-cli/src/lib.rs index e89d8838a..10f3ec846 100644 --- a/crates/uv-cli/src/lib.rs +++ b/crates/uv-cli/src/lib.rs @@ -7586,6 +7586,13 @@ pub struct PublishArgs { /// that is published. #[arg(long, env = EnvVars::UV_PUBLISH_NO_ATTESTATIONS)] pub no_attestations: bool, + + /// Use direct upload to the registry. + /// + /// When enabled, the publish command will use a direct two-phase upload protocol + /// that uploads files directly to storage, bypassing the registry's upload endpoint. + #[arg(long, hide = true)] + pub direct: bool, } #[derive(Args)] diff --git a/crates/uv-preview/src/lib.rs b/crates/uv-preview/src/lib.rs index 146c6e580..f8e26f5d3 100644 --- a/crates/uv-preview/src/lib.rs +++ b/crates/uv-preview/src/lib.rs @@ -27,6 +27,7 @@ bitflags::bitflags! { const WORKSPACE_LIST = 1 << 15; const SBOM_EXPORT = 1 << 16; const AUTH_HELPER = 1 << 17; + const DIRECT_PUBLISH = 1 << 18; } } @@ -54,6 +55,7 @@ impl PreviewFeatures { Self::WORKSPACE_LIST => "workspace-list", Self::SBOM_EXPORT => "sbom-export", Self::AUTH_HELPER => "auth-helper", + Self::DIRECT_PUBLISH => "direct-publish", _ => panic!("`flag_as_str` can only be used for exactly one feature flag"), } } @@ -109,6 +111,7 @@ impl FromStr for PreviewFeatures { "workspace-list" => Self::WORKSPACE_LIST, "sbom-export" => Self::SBOM_EXPORT, "auth-helper" => Self::AUTH_HELPER, + "direct-publish" => Self::DIRECT_PUBLISH, _ => { warn_user_once!("Unknown preview feature: `{part}`"); continue; @@ -285,6 +288,10 @@ mod tests { assert_eq!(PreviewFeatures::FORMAT.flag_as_str(), "format"); assert_eq!(PreviewFeatures::S3_ENDPOINT.flag_as_str(), "s3-endpoint"); assert_eq!(PreviewFeatures::SBOM_EXPORT.flag_as_str(), "sbom-export"); + assert_eq!( + PreviewFeatures::DIRECT_PUBLISH.flag_as_str(), + "direct-publish" + ); } #[test] diff --git a/crates/uv-publish/src/lib.rs b/crates/uv-publish/src/lib.rs index 78879195b..1955238b2 100644 --- a/crates/uv-publish/src/lib.rs +++ b/crates/uv-publish/src/lib.rs @@ -88,6 +88,12 @@ pub enum PublishError { MissingHash(Box), #[error(transparent)] RetryParsing(#[from] RetryParsingError), + #[error("Failed to reserve upload slot for `{}`", _0.user_display())] + Reserve(PathBuf, #[source] Box), + #[error("Failed to upload to S3 for `{}`", _0.user_display())] + S3Upload(PathBuf, #[source] Box), + #[error("Failed to finalize upload for `{}`", _0.user_display())] + Finalize(PathBuf, #[source] Box), } /// Failure to get the metadata for a specific file. @@ -625,7 +631,7 @@ pub async fn validate( .expect("URL must have path segments") .push("validate"); - let request = build_validation_request( + let request = build_metadata_request( raw_filename, &validation_url, client, @@ -653,6 +659,215 @@ pub async fn validate( Ok(()) } +/// Upload a file using the two-phase upload protocol for pyx. +/// +/// This is a more efficient upload method that: +/// 1. Reserves an upload slot and gets a pre-signed S3 URL. +/// 2. Uploads the file directly to S3. +/// 3. Finalizes the upload with the registry. +/// +/// Returns `true` if the file was newly uploaded and `false` if it already existed. +pub async fn upload_two_phase( + group: &UploadDistribution, + form_metadata: &FormMetadata, + registry: &DisplaySafeUrl, + client: &BaseClient, + s3_client: &BaseClient, + retry_policy: ExponentialBackoff, + credentials: &Credentials, + reporter: Arc, +) -> Result { + #[derive(Debug, Deserialize)] + struct ReserveResponse { + upload_url: Option, + } + + // Step 1: Reserve an upload slot. + let mut reserve_url = registry.clone(); + reserve_url + .path_segments_mut() + .expect("URL must have path segments") + .push("reserve"); + + debug!("Reserving upload slot at {reserve_url}"); + + let reserve_request = build_metadata_request( + &group.raw_filename, + &reserve_url, + client, + credentials, + form_metadata, + ); + + let response = reserve_request.send().await.map_err(|err| { + PublishError::Reserve( + group.file.clone(), + PublishSendError::ReqwestMiddleware(err).into(), + ) + })?; + + let status = response.status(); + + let reserve_response: ReserveResponse = match status { + StatusCode::OK => { + debug!("File already uploaded: {}", group.raw_filename); + return Ok(false); + } + StatusCode::CREATED => { + let body = response.text().await.map_err(|err| { + PublishError::Reserve( + group.file.clone(), + PublishSendError::StatusNoBody(status, err).into(), + ) + })?; + serde_json::from_str(&body).map_err(|_| { + PublishError::Reserve( + group.file.clone(), + PublishSendError::Status(status, format!("Invalid JSON response: {body}")) + .into(), + ) + })? + } + _ => { + let body = response.text().await.unwrap_or_default(); + return Err(PublishError::Reserve( + group.file.clone(), + PublishSendError::Status(status, body).into(), + )); + } + }; + + // Step 2: Upload the file directly to S3 (if needed). + // When upload_url is None, the file already exists on S3 with the correct hash. + if let Some(upload_url) = reserve_response.upload_url { + let s3_url = DisplaySafeUrl::parse(&upload_url).map_err(|_| { + PublishError::S3Upload( + group.file.clone(), + PublishSendError::Status( + StatusCode::BAD_REQUEST, + "Invalid S3 URL in reserve response".to_string(), + ) + .into(), + ) + })?; + + debug!("Got pre-signed URL for upload: {s3_url}"); + + // Use a custom retry loop since streaming uploads can't be retried by the middleware. + let file_size = fs_err::tokio::metadata(&group.file) + .await + .map_err(|err| { + PublishError::PublishPrepare( + group.file.clone(), + Box::new(PublishPrepareError::Io(err)), + ) + })? + .len(); + + let mut retry_state = RetryState::start(retry_policy, s3_url.clone()); + loop { + let file = File::open(&group.file).await.map_err(|err| { + PublishError::PublishPrepare( + group.file.clone(), + Box::new(PublishPrepareError::Io(err)), + ) + })?; + + let idx = reporter.on_upload_start(&group.filename.to_string(), Some(file_size)); + let reporter_clone = reporter.clone(); + let reader = ProgressReader::new(file, move |read| { + reporter_clone.on_upload_progress(idx, read as u64); + }); + let file_reader = Body::wrap_stream(ReaderStream::new(reader)); + + let result = s3_client + .for_host(&s3_url) + .raw_client() + .put(Url::from(s3_url.clone())) + .header(reqwest::header::CONTENT_TYPE, "application/octet-stream") + .header(reqwest::header::CONTENT_LENGTH, file_size) + .body(file_reader) + .send() + .await; + + let response = match result { + Ok(response) => { + reporter.on_upload_complete(idx); + response + } + Err(err) => { + let middleware_retries = + if let Some(RetryError::WithRetries { retries, .. }) = + (&err as &dyn std::error::Error).downcast_ref::() + { + *retries + } else { + 0 + }; + if let Some(backoff) = retry_state.should_retry(&err, middleware_retries) { + retry_state.sleep_backoff(backoff).await; + continue; + } + return Err(PublishError::S3Upload( + group.file.clone(), + PublishSendError::ReqwestMiddleware(err).into(), + )); + } + }; + + if response.status().is_success() { + break; + } + + let status = response.status(); + let body = response.text().await.unwrap_or_default(); + return Err(PublishError::S3Upload( + group.file.clone(), + PublishSendError::Status(status, format!("S3 upload failed: {body}")).into(), + )); + } + + debug!("S3 upload complete for {}", group.raw_filename); + } else { + debug!( + "File already exists on S3, skipping upload: {}", + group.raw_filename + ); + } + + // Step 3: Finalize the upload. + let mut finalize_url = registry.clone(); + finalize_url + .path_segments_mut() + .expect("URL must have path segments") + .push("finalize"); + + debug!("Finalizing upload at {finalize_url}"); + + let finalize_request = build_metadata_request( + &group.raw_filename, + &finalize_url, + client, + credentials, + form_metadata, + ); + + let response = finalize_request.send().await.map_err(|err| { + PublishError::Finalize( + group.file.clone(), + PublishSendError::ReqwestMiddleware(err).into(), + ) + })?; + + handle_response(&finalize_url, response) + .await + .map_err(|err| PublishError::Finalize(group.file.clone(), err.into()))?; + + debug!("Upload finalized for {}", group.raw_filename); + + Ok(true) +} + /// Check whether we should skip the upload of a file because it already exists on the index. pub async fn check_url( check_url_client: &CheckUrlClient<'_>, @@ -1067,10 +1282,8 @@ async fn build_upload_request<'a>( Ok((request, idx)) } -/// Build the validation request, to validate the upload without actually uploading the file. -/// -/// Returns the [`RequestBuilder`]. -fn build_validation_request<'a>( +/// Build a request with form metadata but without the file content. +fn build_metadata_request<'a>( raw_filename: &str, registry: &DisplaySafeUrl, client: &'a BaseClient, diff --git a/crates/uv/src/commands/publish.rs b/crates/uv/src/commands/publish.rs index 8781f6824..cf7f4e1dd 100644 --- a/crates/uv/src/commands/publish.rs +++ b/crates/uv/src/commands/publish.rs @@ -13,9 +13,10 @@ use uv_client::{ }; use uv_configuration::{KeyringProviderType, TrustedPublishing}; use uv_distribution_types::{IndexCapabilities, IndexLocations, IndexUrl}; +use uv_preview::{Preview, PreviewFeatures}; use uv_publish::{ CheckUrlClient, FormMetadata, PublishError, TrustedPublishResult, check_trusted_publishing, - group_files_for_publishing, upload, + group_files_for_publishing, upload, upload_two_phase, }; use uv_redacted::DisplaySafeUrl; use uv_settings::EnvironmentOptions; @@ -39,6 +40,8 @@ pub(crate) async fn publish( index_locations: IndexLocations, dry_run: bool, no_attestations: bool, + direct: bool, + preview: Preview, cache: &Cache, printer: Printer, ) -> Result { @@ -46,6 +49,14 @@ pub(crate) async fn publish( bail!("Unable to publish files in offline mode"); } + if direct && !preview.is_enabled(PreviewFeatures::DIRECT_PUBLISH) { + warn_user_once!( + "The `--direct` option is experimental and may change without warning. \ + Pass `--preview-features {}` to disable this warning.", + PreviewFeatures::DIRECT_PUBLISH + ); + } + let token_store = PyxTokenStore::from_settings()?; let (publish_url, check_url) = if let Some(index_name) = index { @@ -130,10 +141,12 @@ pub(crate) async fn publish( .redirect(RedirectPolicy::NoRedirect) .timeout(environment.upload_http_timeout) .build(); - let oidc_client = client_builder + let unauthenticated_client = client_builder .clone() + .retries(0) .auth_integration(AuthIntegration::NoAuthMiddleware) - .wrap_existing(&upload_client); + .timeout(environment.upload_http_timeout) + .build(); let retry_policy = client_builder.retry_policy(); // We're only checking a single URL and one at a time, so 1 permit is sufficient @@ -147,7 +160,7 @@ pub(crate) async fn publish( trusted_publishing, keyring_provider, &token_store, - &oidc_client, + &unauthenticated_client, &upload_client, check_url.as_ref(), Prompt::Enabled, @@ -216,36 +229,68 @@ pub(crate) async fn publish( .await .map_err(|err| PublishError::PublishPrepare(group.file.clone(), Box::new(err)))?; - // Run validation checks on the file, but don't upload it (if possible). - uv_publish::validate( - &group.file, - &form_metadata, - &group.raw_filename, - &publish_url, - &token_store, - &upload_client, - &credentials, - ) - .await?; + let uploaded = if direct { + if dry_run { + // For dry run, call validate since we won't call reserve. + uv_publish::validate( + &group.file, + &form_metadata, + &group.raw_filename, + &publish_url, + &token_store, + &upload_client, + &credentials, + ) + .await?; + continue; + } - if dry_run { - continue; - } + debug!("Using two-phase upload (direct mode)"); + let reporter = PublishReporter::single(printer); + upload_two_phase( + &group, + &form_metadata, + &publish_url, + &upload_client, + &unauthenticated_client, + retry_policy, + &credentials, + // Needs to be an `Arc` because the reqwest `Body` static lifetime requirement + Arc::new(reporter), + ) + .await? + } else { + // Run validation checks on the file, but don't upload it (if possible). + uv_publish::validate( + &group.file, + &form_metadata, + &group.raw_filename, + &publish_url, + &token_store, + &upload_client, + &credentials, + ) + .await?; - let reporter = PublishReporter::single(printer); - let uploaded = upload( - &group, - &form_metadata, - &publish_url, - &upload_client, - retry_policy, - &credentials, - check_url_client.as_ref(), - &download_concurrency, - // Needs to be an `Arc` because the reqwest `Body` static lifetime requirement - Arc::new(reporter), - ) - .await?; // Filename and/or URL are already attached, if applicable. + if dry_run { + continue; + } + + let reporter = PublishReporter::single(printer); + upload( + &group, + &form_metadata, + &publish_url, + &upload_client, + retry_policy, + &credentials, + check_url_client.as_ref(), + &download_concurrency, + // Needs to be an `Arc` because the reqwest `Body` static lifetime requirement + Arc::new(reporter), + ) + .await? // Filename and/or URL are already attached, if applicable. + }; info!("Upload succeeded"); if !uploaded { @@ -306,7 +351,7 @@ async fn gather_credentials( trusted_publishing: TrustedPublishing, keyring_provider: KeyringProviderType, token_store: &PyxTokenStore, - oidc_client: &BaseClient, + unauthenticated_client: &BaseClient, base_client: &BaseClient, check_url: Option<&IndexUrl>, prompt: Prompt, @@ -356,7 +401,7 @@ async fn gather_credentials( keyring_provider, trusted_publishing, &publish_url, - oidc_client, + unauthenticated_client, ) .await?; diff --git a/crates/uv/src/lib.rs b/crates/uv/src/lib.rs index 15591896e..d83ff4055 100644 --- a/crates/uv/src/lib.rs +++ b/crates/uv/src/lib.rs @@ -1814,6 +1814,7 @@ async fn run(mut cli: Cli) -> Result { password, dry_run, no_attestations, + direct, publish_url, trusted_publishing, keyring_provider, @@ -1836,6 +1837,8 @@ async fn run(mut cli: Cli) -> Result { index_locations, dry_run, no_attestations, + direct, + globals.preview, &cache, printer, ) diff --git a/crates/uv/src/settings.rs b/crates/uv/src/settings.rs index da7055138..08e49f2c9 100644 --- a/crates/uv/src/settings.rs +++ b/crates/uv/src/settings.rs @@ -4061,6 +4061,7 @@ pub(crate) struct PublishSettings { pub(crate) index: Option, pub(crate) dry_run: bool, pub(crate) no_attestations: bool, + pub(crate) direct: bool, // Both CLI and configuration. pub(crate) publish_url: DisplaySafeUrl, @@ -4107,6 +4108,7 @@ impl PublishSettings { password, dry_run: args.dry_run, no_attestations: args.no_attestations, + direct: args.direct, publish_url: args .publish_url .combine(publish_url) diff --git a/crates/uv/tests/it/show_settings.rs b/crates/uv/tests/it/show_settings.rs index 9ebf6cebd..58245df70 100644 --- a/crates/uv/tests/it/show_settings.rs +++ b/crates/uv/tests/it/show_settings.rs @@ -7982,7 +7982,7 @@ fn preview_features() { show_settings: true, preview: Preview { flags: PreviewFeatures( - PYTHON_INSTALL_DEFAULT | PYTHON_UPGRADE | JSON_OUTPUT | PYLOCK | ADD_BOUNDS | PACKAGE_CONFLICTS | EXTRA_BUILD_DEPENDENCIES | DETECT_MODULE_CONFLICTS | FORMAT | NATIVE_AUTH | S3_ENDPOINT | CACHE_SIZE | INIT_PROJECT_FLAG | WORKSPACE_METADATA | WORKSPACE_DIR | WORKSPACE_LIST | SBOM_EXPORT | AUTH_HELPER, + PYTHON_INSTALL_DEFAULT | PYTHON_UPGRADE | JSON_OUTPUT | PYLOCK | ADD_BOUNDS | PACKAGE_CONFLICTS | EXTRA_BUILD_DEPENDENCIES | DETECT_MODULE_CONFLICTS | FORMAT | NATIVE_AUTH | S3_ENDPOINT | CACHE_SIZE | INIT_PROJECT_FLAG | WORKSPACE_METADATA | WORKSPACE_DIR | WORKSPACE_LIST | SBOM_EXPORT | AUTH_HELPER | DIRECT_PUBLISH, ), }, python_preference: Managed, @@ -8220,7 +8220,7 @@ fn preview_features() { show_settings: true, preview: Preview { flags: PreviewFeatures( - PYTHON_INSTALL_DEFAULT | PYTHON_UPGRADE | JSON_OUTPUT | PYLOCK | ADD_BOUNDS | PACKAGE_CONFLICTS | EXTRA_BUILD_DEPENDENCIES | DETECT_MODULE_CONFLICTS | FORMAT | NATIVE_AUTH | S3_ENDPOINT | CACHE_SIZE | INIT_PROJECT_FLAG | WORKSPACE_METADATA | WORKSPACE_DIR | WORKSPACE_LIST | SBOM_EXPORT | AUTH_HELPER, + PYTHON_INSTALL_DEFAULT | PYTHON_UPGRADE | JSON_OUTPUT | PYLOCK | ADD_BOUNDS | PACKAGE_CONFLICTS | EXTRA_BUILD_DEPENDENCIES | DETECT_MODULE_CONFLICTS | FORMAT | NATIVE_AUTH | S3_ENDPOINT | CACHE_SIZE | INIT_PROJECT_FLAG | WORKSPACE_METADATA | WORKSPACE_DIR | WORKSPACE_LIST | SBOM_EXPORT | AUTH_HELPER | DIRECT_PUBLISH, ), }, python_preference: Managed,