Enable uploads via pre-signed URLs (#17349)

## Summary

For pyx, allow uploads that bypass the registry's upload endpoint and send the file
directly to S3 via a pre-signed URL. This is an opt-in feature, enabled via the hidden
`--direct` flag and covered by the `direct-publish` preview feature.
Commit 85dedc8051 (parent 25d691eeaf) by Charlie Marsh, committed via GitHub on 2026-01-08 21:26:34 -05:00.
7 changed files with 318 additions and 41 deletions.
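The direct path is a three-step protocol: reserve an upload slot with the registry, stream the file to the pre-signed URL it returns (if any), then finalize. Below is a minimal sketch of that flow, assuming hypothetical `reserve`/`finalize` endpoints appended to the publish URL and omitting the authentication, form metadata, and retry handling that the real implementation carries (assumes `reqwest` with the `json` and `stream` features, `tokio`, `tokio-util`, and `serde`):

```rust
use reqwest::{Body, Client, StatusCode};
use serde::Deserialize;
use tokio_util::io::ReaderStream;

#[derive(Deserialize)]
struct ReserveResponse {
    /// `None` means storage already holds an object with the expected hash.
    upload_url: Option<String>,
}

/// Returns `true` if the file was newly uploaded, `false` if it already existed.
async fn publish_direct(
    client: &Client,
    registry: &str,
    path: &std::path::Path,
) -> Result<bool, Box<dyn std::error::Error>> {
    // Step 1: reserve an upload slot. A `200 OK` means the file is already published.
    let response = client.post(format!("{registry}/reserve")).send().await?;
    if response.status() == StatusCode::OK {
        return Ok(false);
    }
    let reserve: ReserveResponse = response.error_for_status()?.json().await?;

    // Step 2: stream the file to the pre-signed URL, unless storage already has it.
    if let Some(upload_url) = reserve.upload_url {
        let len = tokio::fs::metadata(path).await?.len();
        let file = tokio::fs::File::open(path).await?;
        client
            .put(upload_url)
            .header(reqwest::header::CONTENT_TYPE, "application/octet-stream")
            .header(reqwest::header::CONTENT_LENGTH, len)
            .body(Body::wrap_stream(ReaderStream::new(file)))
            .send()
            .await?
            .error_for_status()?;
    }

    // Step 3: tell the registry the object is in place so it can record the release.
    client
        .post(format!("{registry}/finalize"))
        .send()
        .await?
        .error_for_status()?;
    Ok(true)
}
```

The diffs below add exactly these pieces: the hidden `--direct` flag, a `direct-publish` preview feature, the `upload_two_phase` implementation, and the wiring in the publish command.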

---

@@ -7586,6 +7586,13 @@ pub struct PublishArgs {
/// that is published.
#[arg(long, env = EnvVars::UV_PUBLISH_NO_ATTESTATIONS)]
pub no_attestations: bool,
/// Use direct upload to the registry.
///
/// When enabled, the publish command will use a direct two-phase upload protocol
/// that uploads files directly to storage, bypassing the registry's upload endpoint.
#[arg(long, hide = true)]
pub direct: bool,
}
#[derive(Args)]
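As a standalone illustration of the flag shape added above: a hidden boolean that clap accepts on the command line but does not list in `--help`. The struct and program name here are placeholders, not uv's actual CLI definition.

```rust
use clap::Parser;

#[derive(Parser)]
struct PublishArgs {
    /// Use direct upload to the registry.
    ///
    /// Hidden: accepted when passed, but omitted from generated help output.
    #[arg(long, hide = true)]
    direct: bool,
}

fn main() {
    let args = PublishArgs::parse_from(["uv-publish", "--direct"]);
    assert!(args.direct);
}
```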

---

@@ -27,6 +27,7 @@ bitflags::bitflags! {
const WORKSPACE_LIST = 1 << 15;
const SBOM_EXPORT = 1 << 16;
const AUTH_HELPER = 1 << 17;
const DIRECT_PUBLISH = 1 << 18;
}
}
@@ -54,6 +55,7 @@ impl PreviewFeatures {
Self::WORKSPACE_LIST => "workspace-list",
Self::SBOM_EXPORT => "sbom-export",
Self::AUTH_HELPER => "auth-helper",
Self::DIRECT_PUBLISH => "direct-publish",
_ => panic!("`flag_as_str` can only be used for exactly one feature flag"),
}
}
@@ -109,6 +111,7 @@ impl FromStr for PreviewFeatures {
"workspace-list" => Self::WORKSPACE_LIST,
"sbom-export" => Self::SBOM_EXPORT,
"auth-helper" => Self::AUTH_HELPER,
"direct-publish" => Self::DIRECT_PUBLISH,
_ => {
warn_user_once!("Unknown preview feature: `{part}`");
continue;
@@ -285,6 +288,10 @@ mod tests {
assert_eq!(PreviewFeatures::FORMAT.flag_as_str(), "format");
assert_eq!(PreviewFeatures::S3_ENDPOINT.flag_as_str(), "s3-endpoint");
assert_eq!(PreviewFeatures::SBOM_EXPORT.flag_as_str(), "sbom-export");
assert_eq!(
PreviewFeatures::DIRECT_PUBLISH.flag_as_str(),
"direct-publish"
);
}
#[test]
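A self-contained sketch of the pattern this hunk extends: each preview feature is a single bit, and the kebab-case name round-trips through parsing. Only `DIRECT_PUBLISH` and `"direct-publish"` are taken from the diff; everything else is illustrative and not uv's actual API.

```rust
use bitflags::bitflags;

bitflags! {
    struct PreviewFeatures: u32 {
        const AUTH_HELPER = 1 << 17;
        const DIRECT_PUBLISH = 1 << 18;
    }
}

fn parse_feature(name: &str) -> Option<PreviewFeatures> {
    match name {
        "auth-helper" => Some(PreviewFeatures::AUTH_HELPER),
        "direct-publish" => Some(PreviewFeatures::DIRECT_PUBLISH),
        // The real `FromStr` impl warns once and skips unknown feature names.
        _ => None,
    }
}

fn main() {
    let enabled = parse_feature("direct-publish").expect("known feature");
    // An `is_enabled`-style check is bitwise containment of the requested flag.
    assert!(enabled.contains(PreviewFeatures::DIRECT_PUBLISH));
    assert!(!enabled.contains(PreviewFeatures::AUTH_HELPER));
}
```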

---

@@ -88,6 +88,12 @@ pub enum PublishError {
MissingHash(Box<DistFilename>),
#[error(transparent)]
RetryParsing(#[from] RetryParsingError),
#[error("Failed to reserve upload slot for `{}`", _0.user_display())]
Reserve(PathBuf, #[source] Box<PublishSendError>),
#[error("Failed to upload to S3 for `{}`", _0.user_display())]
S3Upload(PathBuf, #[source] Box<PublishSendError>),
#[error("Failed to finalize upload for `{}`", _0.user_display())]
Finalize(PathBuf, #[source] Box<PublishSendError>),
}
/// Failure to get the metadata for a specific file.
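The three new variants follow the existing pattern: keep the file path for the message and box the underlying send error as the `source`. A hedged sketch of that shape using `thiserror`, where `SendError` stands in for the crate's `PublishSendError` and `.display()` replaces the crate-specific `user_display()` helper:

```rust
use std::path::PathBuf;
use thiserror::Error;

// Stand-in for `PublishSendError`; only here so the sketch is self-contained.
#[derive(Debug, Error)]
#[error("send failed: {0}")]
struct SendError(String);

#[derive(Debug, Error)]
enum PublishError {
    #[error("Failed to reserve upload slot for `{}`", _0.display())]
    Reserve(PathBuf, #[source] Box<SendError>),
    #[error("Failed to upload to S3 for `{}`", _0.display())]
    S3Upload(PathBuf, #[source] Box<SendError>),
    #[error("Failed to finalize upload for `{}`", _0.display())]
    Finalize(PathBuf, #[source] Box<SendError>),
}

fn main() {
    let err = PublishError::S3Upload(
        PathBuf::from("dist/pkg-1.0-py3-none-any.whl"),
        Box::new(SendError("503 Service Unavailable".to_string())),
    );
    println!("{err}");
    // The boxed send error remains reachable via `Error::source` for diagnostics.
    assert!(std::error::Error::source(&err).is_some());
}
```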
@@ -625,7 +631,7 @@ pub async fn validate(
.expect("URL must have path segments")
.push("validate");
-let request = build_validation_request(
+let request = build_metadata_request(
raw_filename,
&validation_url,
client,
@@ -653,6 +659,215 @@ pub async fn validate(
Ok(())
}
/// Upload a file using the two-phase upload protocol for pyx.
///
/// This is a more efficient upload method that:
/// 1. Reserves an upload slot and gets a pre-signed S3 URL.
/// 2. Uploads the file directly to S3.
/// 3. Finalizes the upload with the registry.
///
/// Returns `true` if the file was newly uploaded and `false` if it already existed.
pub async fn upload_two_phase(
group: &UploadDistribution,
form_metadata: &FormMetadata,
registry: &DisplaySafeUrl,
client: &BaseClient,
s3_client: &BaseClient,
retry_policy: ExponentialBackoff,
credentials: &Credentials,
reporter: Arc<impl Reporter>,
) -> Result<bool, PublishError> {
#[derive(Debug, Deserialize)]
struct ReserveResponse {
upload_url: Option<String>,
}
// Step 1: Reserve an upload slot.
let mut reserve_url = registry.clone();
reserve_url
.path_segments_mut()
.expect("URL must have path segments")
.push("reserve");
debug!("Reserving upload slot at {reserve_url}");
let reserve_request = build_metadata_request(
&group.raw_filename,
&reserve_url,
client,
credentials,
form_metadata,
);
let response = reserve_request.send().await.map_err(|err| {
PublishError::Reserve(
group.file.clone(),
PublishSendError::ReqwestMiddleware(err).into(),
)
})?;
let status = response.status();
let reserve_response: ReserveResponse = match status {
StatusCode::OK => {
debug!("File already uploaded: {}", group.raw_filename);
return Ok(false);
}
StatusCode::CREATED => {
let body = response.text().await.map_err(|err| {
PublishError::Reserve(
group.file.clone(),
PublishSendError::StatusNoBody(status, err).into(),
)
})?;
serde_json::from_str(&body).map_err(|_| {
PublishError::Reserve(
group.file.clone(),
PublishSendError::Status(status, format!("Invalid JSON response: {body}"))
.into(),
)
})?
}
_ => {
let body = response.text().await.unwrap_or_default();
return Err(PublishError::Reserve(
group.file.clone(),
PublishSendError::Status(status, body).into(),
));
}
};
// Step 2: Upload the file directly to S3 (if needed).
// When upload_url is None, the file already exists on S3 with the correct hash.
if let Some(upload_url) = reserve_response.upload_url {
let s3_url = DisplaySafeUrl::parse(&upload_url).map_err(|_| {
PublishError::S3Upload(
group.file.clone(),
PublishSendError::Status(
StatusCode::BAD_REQUEST,
"Invalid S3 URL in reserve response".to_string(),
)
.into(),
)
})?;
debug!("Got pre-signed URL for upload: {s3_url}");
// Use a custom retry loop since streaming uploads can't be retried by the middleware.
let file_size = fs_err::tokio::metadata(&group.file)
.await
.map_err(|err| {
PublishError::PublishPrepare(
group.file.clone(),
Box::new(PublishPrepareError::Io(err)),
)
})?
.len();
let mut retry_state = RetryState::start(retry_policy, s3_url.clone());
loop {
let file = File::open(&group.file).await.map_err(|err| {
PublishError::PublishPrepare(
group.file.clone(),
Box::new(PublishPrepareError::Io(err)),
)
})?;
let idx = reporter.on_upload_start(&group.filename.to_string(), Some(file_size));
let reporter_clone = reporter.clone();
let reader = ProgressReader::new(file, move |read| {
reporter_clone.on_upload_progress(idx, read as u64);
});
let file_reader = Body::wrap_stream(ReaderStream::new(reader));
let result = s3_client
.for_host(&s3_url)
.raw_client()
.put(Url::from(s3_url.clone()))
.header(reqwest::header::CONTENT_TYPE, "application/octet-stream")
.header(reqwest::header::CONTENT_LENGTH, file_size)
.body(file_reader)
.send()
.await;
let response = match result {
Ok(response) => {
reporter.on_upload_complete(idx);
response
}
Err(err) => {
let middleware_retries =
if let Some(RetryError::WithRetries { retries, .. }) =
(&err as &dyn std::error::Error).downcast_ref::<RetryError>()
{
*retries
} else {
0
};
if let Some(backoff) = retry_state.should_retry(&err, middleware_retries) {
retry_state.sleep_backoff(backoff).await;
continue;
}
return Err(PublishError::S3Upload(
group.file.clone(),
PublishSendError::ReqwestMiddleware(err).into(),
));
}
};
if response.status().is_success() {
break;
}
let status = response.status();
let body = response.text().await.unwrap_or_default();
return Err(PublishError::S3Upload(
group.file.clone(),
PublishSendError::Status(status, format!("S3 upload failed: {body}")).into(),
));
}
debug!("S3 upload complete for {}", group.raw_filename);
} else {
debug!(
"File already exists on S3, skipping upload: {}",
group.raw_filename
);
}
// Step 3: Finalize the upload.
let mut finalize_url = registry.clone();
finalize_url
.path_segments_mut()
.expect("URL must have path segments")
.push("finalize");
debug!("Finalizing upload at {finalize_url}");
let finalize_request = build_metadata_request(
&group.raw_filename,
&finalize_url,
client,
credentials,
form_metadata,
);
let response = finalize_request.send().await.map_err(|err| {
PublishError::Finalize(
group.file.clone(),
PublishSendError::ReqwestMiddleware(err).into(),
)
})?;
handle_response(&finalize_url, response)
.await
.map_err(|err| PublishError::Finalize(group.file.clone(), err.into()))?;
debug!("Upload finalized for {}", group.raw_filename);
Ok(true)
}
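The reserve response decides whether step 2 runs at all. A small sketch of that contract as deserialized above; the JSON bodies are invented for illustration, and only the `upload_url` field name comes from the code:

```rust
use serde::Deserialize;

#[derive(Deserialize)]
struct ReserveResponse {
    upload_url: Option<String>,
}

fn main() -> Result<(), serde_json::Error> {
    // 201 Created with a pre-signed URL: the client must PUT the file to S3.
    let needs_upload: ReserveResponse = serde_json::from_str(
        r#"{"upload_url": "https://example-bucket.s3.amazonaws.com/dist/pkg.whl?X-Amz-Signature=abc"}"#,
    )?;
    assert!(needs_upload.upload_url.is_some());

    // 201 Created with a null `upload_url`: storage already has the content, so the
    // S3 PUT is skipped, but the finalize call still records the file.
    let already_stored: ReserveResponse = serde_json::from_str(r#"{"upload_url": null}"#)?;
    assert!(already_stored.upload_url.is_none());
    Ok(())
}
```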
/// Check whether we should skip the upload of a file because it already exists on the index.
pub async fn check_url(
check_url_client: &CheckUrlClient<'_>,
@@ -1067,10 +1282,8 @@ async fn build_upload_request<'a>(
Ok((request, idx))
}
-/// Build the validation request, to validate the upload without actually uploading the file.
-///
-/// Returns the [`RequestBuilder`].
-fn build_validation_request<'a>(
+/// Build a request with form metadata but without the file content.
+fn build_metadata_request<'a>(
raw_filename: &str,
registry: &DisplaySafeUrl,
client: &'a BaseClient,
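The custom retry loop in `upload_two_phase` exists because a streaming body is consumed by the attempt that sends it, so retry middleware cannot replay the request; the file must be reopened and the body rebuilt on every iteration. A generic sketch of that shape, with a placeholder backoff policy rather than uv's `RetryState`:

```rust
use std::time::Duration;

/// Retry an operation whose request body cannot be replayed: `attempt` rebuilds the
/// whole request (including reopening the file) each time it is called.
async fn send_with_retries<F, Fut, T, E>(mut attempt: F, max_retries: u32) -> Result<T, E>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    let mut retries = 0;
    loop {
        match attempt().await {
            Ok(value) => return Ok(value),
            // Placeholder policy: retry every error up to the cap with exponential backoff.
            // The real code only retries errors its retry state judges to be transient.
            Err(_) if retries < max_retries => {
                tokio::time::sleep(Duration::from_secs(1u64 << retries.min(6))).await;
                retries += 1;
            }
            Err(err) => return Err(err),
        }
    }
}
```

In the real loop, each iteration reopens `group.file`, rewraps it in a fresh `ProgressReader`, and rebuilds the streaming `Body`, which is exactly the work `attempt` would do here.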

---

@@ -13,9 +13,10 @@ use uv_client::{
};
use uv_configuration::{KeyringProviderType, TrustedPublishing};
use uv_distribution_types::{IndexCapabilities, IndexLocations, IndexUrl};
use uv_preview::{Preview, PreviewFeatures};
use uv_publish::{
CheckUrlClient, FormMetadata, PublishError, TrustedPublishResult, check_trusted_publishing,
-group_files_for_publishing, upload,
+group_files_for_publishing, upload, upload_two_phase,
};
use uv_redacted::DisplaySafeUrl;
use uv_settings::EnvironmentOptions;
@@ -39,6 +40,8 @@ pub(crate) async fn publish(
index_locations: IndexLocations,
dry_run: bool,
no_attestations: bool,
direct: bool,
preview: Preview,
cache: &Cache,
printer: Printer,
) -> Result<ExitStatus> {
@@ -46,6 +49,14 @@ pub(crate) async fn publish(
bail!("Unable to publish files in offline mode");
}
if direct && !preview.is_enabled(PreviewFeatures::DIRECT_PUBLISH) {
warn_user_once!(
"The `--direct` option is experimental and may change without warning. \
Pass `--preview-features {}` to disable this warning.",
PreviewFeatures::DIRECT_PUBLISH
);
}
let token_store = PyxTokenStore::from_settings()?;
let (publish_url, check_url) = if let Some(index_name) = index {
@@ -130,10 +141,12 @@ pub(crate) async fn publish(
.redirect(RedirectPolicy::NoRedirect)
.timeout(environment.upload_http_timeout)
.build();
-let oidc_client = client_builder
+let unauthenticated_client = client_builder
.clone()
.retries(0)
.auth_integration(AuthIntegration::NoAuthMiddleware)
-.wrap_existing(&upload_client);
+.timeout(environment.upload_http_timeout)
+.build();
let retry_policy = client_builder.retry_policy();
// We're only checking a single URL and one at a time, so 1 permit is sufficient
@@ -147,7 +160,7 @@ pub(crate) async fn publish(
trusted_publishing,
keyring_provider,
&token_store,
-&oidc_client,
+&unauthenticated_client,
&upload_client,
check_url.as_ref(),
Prompt::Enabled,
@@ -216,36 +229,68 @@ pub(crate) async fn publish(
.await
.map_err(|err| PublishError::PublishPrepare(group.file.clone(), Box::new(err)))?;
-// Run validation checks on the file, but don't upload it (if possible).
-uv_publish::validate(
-    &group.file,
-    &form_metadata,
-    &group.raw_filename,
-    &publish_url,
-    &token_store,
-    &upload_client,
-    &credentials,
-)
-.await?;
-if dry_run {
-    continue;
-}
-let reporter = PublishReporter::single(printer);
-let uploaded = upload(
-    &group,
-    &form_metadata,
-    &publish_url,
-    &upload_client,
-    retry_policy,
-    &credentials,
-    check_url_client.as_ref(),
-    &download_concurrency,
-    // Needs to be an `Arc` because the reqwest `Body` static lifetime requirement
-    Arc::new(reporter),
-)
-.await?; // Filename and/or URL are already attached, if applicable.
+let uploaded = if direct {
+    if dry_run {
+        // For dry run, call validate since we won't call reserve.
+        uv_publish::validate(
+            &group.file,
+            &form_metadata,
+            &group.raw_filename,
+            &publish_url,
+            &token_store,
+            &upload_client,
+            &credentials,
+        )
+        .await?;
+        continue;
+    }
+    debug!("Using two-phase upload (direct mode)");
+    let reporter = PublishReporter::single(printer);
+    upload_two_phase(
+        &group,
+        &form_metadata,
+        &publish_url,
+        &upload_client,
+        &unauthenticated_client,
+        retry_policy,
+        &credentials,
+        // Needs to be an `Arc` because the reqwest `Body` static lifetime requirement
+        Arc::new(reporter),
+    )
+    .await?
+} else {
+    // Run validation checks on the file, but don't upload it (if possible).
+    uv_publish::validate(
+        &group.file,
+        &form_metadata,
+        &group.raw_filename,
+        &publish_url,
+        &token_store,
+        &upload_client,
+        &credentials,
+    )
+    .await?;
+    if dry_run {
+        continue;
+    }
+    let reporter = PublishReporter::single(printer);
+    upload(
+        &group,
+        &form_metadata,
+        &publish_url,
+        &upload_client,
+        retry_policy,
+        &credentials,
+        check_url_client.as_ref(),
+        &download_concurrency,
+        // Needs to be an `Arc` because the reqwest `Body` static lifetime requirement
+        Arc::new(reporter),
+    )
+    .await? // Filename and/or URL are already attached, if applicable.
+};
info!("Upload succeeded");
if !uploaded {
@@ -306,7 +351,7 @@ async fn gather_credentials(
trusted_publishing: TrustedPublishing,
keyring_provider: KeyringProviderType,
token_store: &PyxTokenStore,
-oidc_client: &BaseClient,
+unauthenticated_client: &BaseClient,
base_client: &BaseClient,
check_url: Option<&IndexUrl>,
prompt: Prompt,
@@ -356,7 +401,7 @@ async fn gather_credentials(
keyring_provider,
trusted_publishing,
&publish_url,
-oidc_client,
+unauthenticated_client,
)
.await?;
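The recurring "Needs to be an `Arc`" comment in the hunks above reflects the same constraint in both upload paths: the progress callback ends up inside a stream handed to `reqwest::Body::wrap_stream`, which requires `Send + 'static`, so the callback must own its reporter rather than borrow it. A stand-in sketch (the `Reporter` trait here is hypothetical, not uv's):

```rust
use std::sync::Arc;

// Hypothetical stand-in for the crate's reporter trait.
trait Reporter: Send + Sync + 'static {
    fn on_upload_progress(&self, bytes: u64);
}

struct PrintReporter;

impl Reporter for PrintReporter {
    fn on_upload_progress(&self, bytes: u64) {
        println!("uploaded {bytes} bytes");
    }
}

/// The returned closure owns an `Arc`, so it satisfies the `'static` bound that a
/// streaming request body ultimately imposes on anything it captures.
fn progress_callback(reporter: Arc<dyn Reporter>) -> impl FnMut(usize) + Send + 'static {
    move |read| reporter.on_upload_progress(read as u64)
}

fn main() {
    let reporter: Arc<dyn Reporter> = Arc::new(PrintReporter);
    let mut on_progress = progress_callback(reporter);
    on_progress(1024);
}
```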

---

@@ -1814,6 +1814,7 @@ async fn run(mut cli: Cli) -> Result<ExitStatus> {
password,
dry_run,
no_attestations,
direct,
publish_url,
trusted_publishing,
keyring_provider,
@@ -1836,6 +1837,8 @@ async fn run(mut cli: Cli) -> Result<ExitStatus> {
index_locations,
dry_run,
no_attestations,
direct,
globals.preview,
&cache,
printer,
)

---

@@ -4061,6 +4061,7 @@ pub(crate) struct PublishSettings {
pub(crate) index: Option<String>,
pub(crate) dry_run: bool,
pub(crate) no_attestations: bool,
pub(crate) direct: bool,
// Both CLI and configuration.
pub(crate) publish_url: DisplaySafeUrl,
@@ -4107,6 +4108,7 @@ impl PublishSettings {
password,
dry_run: args.dry_run,
no_attestations: args.no_attestations,
direct: args.direct,
publish_url: args
.publish_url
.combine(publish_url)

---

@@ -7982,7 +7982,7 @@ fn preview_features() {
show_settings: true,
preview: Preview {
flags: PreviewFeatures(
-PYTHON_INSTALL_DEFAULT | PYTHON_UPGRADE | JSON_OUTPUT | PYLOCK | ADD_BOUNDS | PACKAGE_CONFLICTS | EXTRA_BUILD_DEPENDENCIES | DETECT_MODULE_CONFLICTS | FORMAT | NATIVE_AUTH | S3_ENDPOINT | CACHE_SIZE | INIT_PROJECT_FLAG | WORKSPACE_METADATA | WORKSPACE_DIR | WORKSPACE_LIST | SBOM_EXPORT | AUTH_HELPER,
+PYTHON_INSTALL_DEFAULT | PYTHON_UPGRADE | JSON_OUTPUT | PYLOCK | ADD_BOUNDS | PACKAGE_CONFLICTS | EXTRA_BUILD_DEPENDENCIES | DETECT_MODULE_CONFLICTS | FORMAT | NATIVE_AUTH | S3_ENDPOINT | CACHE_SIZE | INIT_PROJECT_FLAG | WORKSPACE_METADATA | WORKSPACE_DIR | WORKSPACE_LIST | SBOM_EXPORT | AUTH_HELPER | DIRECT_PUBLISH,
),
},
python_preference: Managed,
@@ -8220,7 +8220,7 @@ fn preview_features() {
show_settings: true,
preview: Preview {
flags: PreviewFeatures(
-PYTHON_INSTALL_DEFAULT | PYTHON_UPGRADE | JSON_OUTPUT | PYLOCK | ADD_BOUNDS | PACKAGE_CONFLICTS | EXTRA_BUILD_DEPENDENCIES | DETECT_MODULE_CONFLICTS | FORMAT | NATIVE_AUTH | S3_ENDPOINT | CACHE_SIZE | INIT_PROJECT_FLAG | WORKSPACE_METADATA | WORKSPACE_DIR | WORKSPACE_LIST | SBOM_EXPORT | AUTH_HELPER,
+PYTHON_INSTALL_DEFAULT | PYTHON_UPGRADE | JSON_OUTPUT | PYLOCK | ADD_BOUNDS | PACKAGE_CONFLICTS | EXTRA_BUILD_DEPENDENCIES | DETECT_MODULE_CONFLICTS | FORMAT | NATIVE_AUTH | S3_ENDPOINT | CACHE_SIZE | INIT_PROJECT_FLAG | WORKSPACE_METADATA | WORKSPACE_DIR | WORKSPACE_LIST | SBOM_EXPORT | AUTH_HELPER | DIRECT_PUBLISH,
),
},
python_preference: Managed,