mirror of https://github.com/astral-sh/uv
Retry on python interpreter launch failures (#2278)
Sometimes, the first time we read from the stdout of the bytecode compiler python subprocess, we get an empty string back (no newline). If we try to write to stdin, it will often be a broken pipe (#2245). After we got an empty string the first time, we will get the same empty string if we read a line again. The details of this behavior are mysterious to me, but it seems that it can be identified by the first empty string. We check by inserting starting with a `Ready` message on the Python side. When we encounter the broken state, we discard the interpreter and try again. We have to introduce a third timeout check for the interpreter launch itself. Minimized test script: ```bash #!/usr/bin/env bash set -euo pipefail while true; do date --iso-8601=seconds # Progress indicator rm -rf testenv target/profiling/uv venv testenv -q --python 3.12 VIRTUAL_ENV=$PWD/testenv target/profiling/uv pip install -q --compile wheel==0.42.0 done ``` Run as ``` cargo build --profile profiling && bash compile_bug.sh ``` Fixes #2245
This commit is contained in:
parent
c3cd550a7a
commit
54311c8664
|
|
@ -8,7 +8,7 @@ use async_channel::{Receiver, SendError};
|
||||||
use tempfile::tempdir_in;
|
use tempfile::tempdir_in;
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
|
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
|
||||||
use tokio::process::{ChildStdin, ChildStdout, Command};
|
use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
|
||||||
use tokio::task::JoinError;
|
use tokio::task::JoinError;
|
||||||
use tracing::{debug, instrument};
|
use tracing::{debug, instrument};
|
||||||
use walkdir::WalkDir;
|
use walkdir::WalkDir;
|
||||||
|
|
@ -149,36 +149,27 @@ async fn worker(
|
||||||
fs_err::tokio::write(&pip_compileall_py, COMPILEALL_SCRIPT)
|
fs_err::tokio::write(&pip_compileall_py, COMPILEALL_SCRIPT)
|
||||||
.await
|
.await
|
||||||
.map_err(CompileError::TempFile)?;
|
.map_err(CompileError::TempFile)?;
|
||||||
// We input the paths through stdin and get the successful paths returned through stdout.
|
|
||||||
let mut bytecode_compiler = Command::new(&interpreter)
|
|
||||||
.arg(&pip_compileall_py)
|
|
||||||
.stdin(Stdio::piped())
|
|
||||||
.stdout(Stdio::piped())
|
|
||||||
.stderr(Stdio::piped())
|
|
||||||
.current_dir(dir)
|
|
||||||
// Otherwise stdout is buffered and we'll wait forever for a response
|
|
||||||
.env("PYTHONUNBUFFERED", "1")
|
|
||||||
.spawn()
|
|
||||||
.map_err(CompileError::PythonSubcommand)?;
|
|
||||||
|
|
||||||
// https://stackoverflow.com/questions/49218599/write-to-child-process-stdin-in-rust/49597789#comment120223107_49597789
|
// Sometimes, the first time we read from stdout, we get an empty string back (no newline). If
|
||||||
// Unbuffered, we need to write immediately or the python process will get stuck waiting
|
// we try to write to stdin, it will often be a broken pipe. In this case, we have to restart
|
||||||
let child_stdin = bytecode_compiler
|
// the child process
|
||||||
.stdin
|
// https://github.com/astral-sh/uv/issues/2245
|
||||||
.take()
|
let wait_until_ready = async {
|
||||||
.expect("Child must have stdin");
|
loop {
|
||||||
let mut child_stdout = BufReader::new(
|
// If the interpreter started successful, return it, else retry.
|
||||||
bytecode_compiler
|
if let Some(child) =
|
||||||
.stdout
|
launch_bytecode_compiler(&dir, &interpreter, &pip_compileall_py).await?
|
||||||
.take()
|
{
|
||||||
.expect("Child must have stdout"),
|
break Ok::<_, CompileError>(child);
|
||||||
);
|
}
|
||||||
let mut child_stderr = BufReader::new(
|
}
|
||||||
bytecode_compiler
|
};
|
||||||
.stderr
|
// Handle a broken `python` by using a timeout, one that's higher than any compilation
|
||||||
.take()
|
// should ever take.
|
||||||
.expect("Child must have stderr"),
|
let (mut bytecode_compiler, child_stdin, mut child_stdout, mut child_stderr) =
|
||||||
);
|
tokio::time::timeout(COMPILE_TIMEOUT, wait_until_ready)
|
||||||
|
.await
|
||||||
|
.map_err(|_| CompileError::Timeout(COMPILE_TIMEOUT))??;
|
||||||
|
|
||||||
let stderr_reader = tokio::task::spawn(async move {
|
let stderr_reader = tokio::task::spawn(async move {
|
||||||
let mut child_stderr_collected: Vec<u8> = Vec::new();
|
let mut child_stderr_collected: Vec<u8> = Vec::new();
|
||||||
|
|
@ -221,6 +212,78 @@ async fn worker(
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns the child and stdin/stdout/stderr on a successful launch or `None` for a broken interpreter state.
|
||||||
|
async fn launch_bytecode_compiler(
|
||||||
|
dir: &Path,
|
||||||
|
interpreter: &Path,
|
||||||
|
pip_compileall_py: &Path,
|
||||||
|
) -> Result<
|
||||||
|
Option<(
|
||||||
|
Child,
|
||||||
|
ChildStdin,
|
||||||
|
BufReader<ChildStdout>,
|
||||||
|
BufReader<ChildStderr>,
|
||||||
|
)>,
|
||||||
|
CompileError,
|
||||||
|
> {
|
||||||
|
// We input the paths through stdin and get the successful paths returned through stdout.
|
||||||
|
let mut bytecode_compiler = Command::new(interpreter)
|
||||||
|
.arg(pip_compileall_py)
|
||||||
|
.stdin(Stdio::piped())
|
||||||
|
.stdout(Stdio::piped())
|
||||||
|
.stderr(Stdio::piped())
|
||||||
|
.current_dir(dir)
|
||||||
|
// Otherwise stdout is buffered and we'll wait forever for a response
|
||||||
|
.env("PYTHONUNBUFFERED", "1")
|
||||||
|
.spawn()
|
||||||
|
.map_err(CompileError::PythonSubcommand)?;
|
||||||
|
|
||||||
|
// https://stackoverflow.com/questions/49218599/write-to-child-process-stdin-in-rust/49597789#comment120223107_49597789
|
||||||
|
// Unbuffered, we need to write immediately or the python process will get stuck waiting
|
||||||
|
let child_stdin = bytecode_compiler
|
||||||
|
.stdin
|
||||||
|
.take()
|
||||||
|
.expect("Child must have stdin");
|
||||||
|
let mut child_stdout = BufReader::new(
|
||||||
|
bytecode_compiler
|
||||||
|
.stdout
|
||||||
|
.take()
|
||||||
|
.expect("Child must have stdout"),
|
||||||
|
);
|
||||||
|
let child_stderr = BufReader::new(
|
||||||
|
bytecode_compiler
|
||||||
|
.stderr
|
||||||
|
.take()
|
||||||
|
.expect("Child must have stderr"),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Check if the launch was successful.
|
||||||
|
let mut out_line = String::new();
|
||||||
|
child_stdout
|
||||||
|
.read_line(&mut out_line)
|
||||||
|
.await
|
||||||
|
.map_err(|err| CompileError::ChildStdio {
|
||||||
|
device: "stdout",
|
||||||
|
err,
|
||||||
|
})?;
|
||||||
|
|
||||||
|
if out_line.trim_end() == "Ready" {
|
||||||
|
// Success
|
||||||
|
Ok(Some((
|
||||||
|
bytecode_compiler,
|
||||||
|
child_stdin,
|
||||||
|
child_stdout,
|
||||||
|
child_stderr,
|
||||||
|
)))
|
||||||
|
} else if out_line.is_empty() {
|
||||||
|
// Failed to launch, try again
|
||||||
|
Ok(None)
|
||||||
|
} else {
|
||||||
|
// Not observed yet
|
||||||
|
Err(CompileError::WrongPath("Ready".to_string(), out_line))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// We use stdin/stdout as a sort of bounded channel. We write one path to stdin, then wait until
|
/// We use stdin/stdout as a sort of bounded channel. We write one path to stdin, then wait until
|
||||||
/// we get the same path back from stdout. This way we ensure one worker is only working on one
|
/// we get the same path back from stdout. This way we ensure one worker is only working on one
|
||||||
/// piece of work at the same time.
|
/// piece of work at the same time.
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,10 @@ import warnings
|
||||||
|
|
||||||
with warnings.catch_warnings():
|
with warnings.catch_warnings():
|
||||||
warnings.filterwarnings("ignore")
|
warnings.filterwarnings("ignore")
|
||||||
|
|
||||||
|
# Successful launch check
|
||||||
|
print("Ready")
|
||||||
|
|
||||||
# In rust, we provide one line per file to compile.
|
# In rust, we provide one line per file to compile.
|
||||||
for path in sys.stdin:
|
for path in sys.stdin:
|
||||||
# Remove trailing newlines.
|
# Remove trailing newlines.
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue