uv/scripts/ecosystem-testing/ecosystem_testing.py

358 lines
10 KiB
Python

#!/usr/bin/env -S uv run --script
# NB: LLM code ahead
# /// script
# requires-python = ">=3.13"
# dependencies = [
# "tomli-w>=1.2.0,<2.0.0",
# "tqdm>=4.67.1,<5.0.0",
# ]
# ///
import argparse
import concurrent.futures
import csv
import json
import os
import platform
import shutil
import subprocess
import time
import tomllib
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from pathlib import Path
from threading import Thread
import tomli_w
from tqdm.auto import tqdm
cwd = Path(__file__).parent
@dataclass
class Summary:
package: str
exit_code: int
max_rss: int
time: float
def run_uv(
package: str,
specification: str,
uv: Path,
mode: str,
python: str,
cache: Path,
offline: bool,
output: Path,
) -> Summary:
"""Resolve in a uv subprocess.
The logic captures the max RSS from the process and avoids deadlocks from full
pipes.
"""
package_dir = output.joinpath(package)
package_dir.mkdir()
command = prepare_uv_command(
specification,
uv,
mode,
cache,
offline,
package_dir,
python,
)
start = time.time()
process = subprocess.Popen(
command,
cwd=package_dir,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
)
stdout, stderr = communicate(process, specification if mode == "compile" else None)
# At this point, the process is a zombie, so has called `exit()`, but we haven't reaped it with `wait4` yet.
# rusage is only available on unix
if os.name == "posix":
# Wait for process and get resource usage
_pid, exit_code, rusage = os.wait4(process.pid, 0)
else:
exit_code = process.wait()
rusage = None
max_rss = rusage.ru_maxrss if rusage else 0
package_dir.joinpath("stdout.txt").write_text(stdout)
package_dir.joinpath("stderr.txt").write_text(stderr)
summary = Summary(
package=package, exit_code=exit_code, max_rss=max_rss, time=time.time() - start
)
package_dir.joinpath("summary.json").write_text(json.dumps(summary.__dict__))
return summary
def prepare_uv_command(
specification: str,
uv: Path,
mode: str,
cache: Path,
offline: bool,
package_dir: Path,
python: str,
) -> list[Path | str]:
shared_args = [
"--no-build",
"--cache-dir",
cache,
"--color",
"never",
]
if offline:
shared_args.append("--offline")
if mode == "pyproject-toml":
package_dir.joinpath("pyproject.toml").write_text(specification)
command = [uv, "lock", *shared_args]
elif mode == "lock":
package_dir.joinpath("pyproject.toml").write_text(
f"""
[project]
name = "testing"
version = "0.1.0"
requires-python = ">={python}"
dependencies = ["{specification}"]
"""
)
command = [uv, "lock", *shared_args]
elif mode == "compile":
command = [
uv,
"pip",
"compile",
"-",
"-p",
python,
# The results are more reproducible if they are platform independent
"--universal",
"--no-header",
"--no-annotate",
*shared_args,
]
else:
raise ValueError(f"Unknown mode: {mode}")
return command
def communicate(process: subprocess.Popen, stdin: str | None) -> tuple[str, str]:
"""Like `Popen.communicate`, but without the `os.wait` call.
Start threads to drain the pipes to avoid blocking on full pipes, but don't use
libc's `wait` so we can use `os.wait4` later.
"""
if stdin:
process.stdin.write(stdin)
process.stdin.close()
# Mutable objects to communicate across threads
stdout = []
stderr = []
def read_stdout():
stdout.append(process.stdout.read())
process.stdout.close()
def read_stderr():
stderr.append(process.stderr.read())
process.stderr.close()
stdout_thread = Thread(target=read_stdout, daemon=True)
stderr_thread = Thread(target=read_stderr, daemon=True)
stdout_thread.start()
stderr_thread.start()
stdout_thread.join()
stderr_thread.join()
return stdout[0], stderr[0]
def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"--input", type=Path, default=cwd.joinpath("top-pypi-packages.json")
)
parser.add_argument(
"--mode",
choices=["compile", "lock", "pyproject-toml"],
default="compile",
help="`compile`: `uv pip compile`, "
"`lock`: `uv lock` from a single requirement"
"`pyproject-toml`: `uv lock` from a directory of `pyproject.toml` files",
)
parser.add_argument("--python", "-p", type=str, default="3.13")
parser.add_argument("--output", type=Path, default="output")
parser.add_argument("--uv", type=Path, default=Path("uv"))
parser.add_argument("--limit", type=int, default=None)
parser.add_argument("--cache", type=Path, default=cwd.joinpath("cache"))
parser.add_argument("--offline", action="store_true")
parser.add_argument("--latest", action="store_true")
args = parser.parse_args()
if args.mode == "pyproject-toml":
project_tomls = sorted((file.stem, file) for file in args.input.iterdir())
jobs = {}
no_project = 0
dynamic_dependencies = 0
for package, file in project_tomls:
if len(jobs) >= args.limit:
break
if file.suffix != ".toml":
continue
project_toml = file.read_text()
data = tomllib.loads(project_toml)
project = data.get("project")
if not project:
no_project += 1
continue
if dynamic := project.get("dynamic"):
if "dependencies" in dynamic:
dynamic_dependencies += 1
continue
if "version" in dynamic:
dynamic.remove("version")
# Usually there are no cycles back to the current project, so any version works
project["version"] = "1.0.0"
jobs[package] = tomli_w.dumps(data)
print(f"`pyproject.toml`s without `[project]`: {no_project}")
print(
f"`pyproject.toml`s with `dynamic = ['dependencies']`: {dynamic_dependencies}"
)
if args.latest:
raise ValueError("Latest versions are not supported in pyproject-toml mode")
else:
project_names = json.loads(args.input.read_text())
project_names = sorted(pkg["project"] for pkg in project_names["rows"])
if args.latest:
with cwd.joinpath("package_versions.csv").open() as f:
latest_versions = {
row["package_name"]: row["latest_version"]
for row in csv.DictReader(f)
}
else:
latest_versions = None
jobs = {}
for package in project_names[: args.limit]:
if latest_versions:
if version := latest_versions.get(package):
jobs[package] = f"{package}=={version}"
else:
tqdm.write(f"Missing version: {package}")
continue
else:
jobs[package] = package
excluded_packages = [
# 5000 releases, no solution
"nucliadb",
# These packages have many non-small versions
"tf-models-nightly",
"mtmtrain",
"llm-dialog-manager",
"python-must",
# Slow and have no solution
"edx-enterprise",
"kcli",
"emmet-api",
]
for package in excluded_packages:
jobs.pop(package, None)
if args.output.exists():
shutil.rmtree(args.output)
args.output.mkdir(parents=True)
args.output.joinpath(".gitignore").write_text("*")
parameters = {
"mode": args.mode,
"python": args.python,
"latest": args.latest,
}
args.output.joinpath("parameters.json").write_text(json.dumps(parameters))
success = 0
all_results = [] # Track all results for analysis
max_package_len = max(len(package) for package in jobs)
with ThreadPoolExecutor(max_workers=os.cpu_count() * 2) as executor:
tasks = []
packages_pending = []
for package, specification in jobs.items():
packages_pending.append(package)
tasks.append(
executor.submit(
run_uv,
package,
specification,
args.uv,
args.mode,
args.python,
args.cache,
args.offline,
args.output,
)
)
total = len(packages_pending)
with tqdm(total=total) as progress_bar:
for result in concurrent.futures.as_completed(tasks):
summary = result.result()
all_results.append(summary)
progress_bar.update(1)
packages_pending.remove(summary.package)
if packages_pending:
progress_bar.set_postfix_str(
f"{packages_pending[0]:>{max_package_len}}"
)
if summary.exit_code == 0:
success += 1
print(f"Success: {success}/{total} ({success / total:.0%})")
successes = [summary for summary in all_results if summary.exit_code == 0]
print("\n# top 5 slowest resolutions for successes")
slowest = sorted(successes, key=lambda x: x.time, reverse=True)[:5]
for summary in slowest:
print(
f"{summary.package}: {summary.time:.2f}s (exit code: {summary.exit_code})"
)
if os.name == "posix":
print("\n# top 5 max RSS for successes")
largest_rss = sorted(successes, key=lambda x: x.max_rss, reverse=True)[:5]
for summary in largest_rss:
# On linux, max RSS is in KB, on macOS, it is in bytes
if platform.system() == "Linux":
max_rss = summary.max_rss / 1024
elif platform.system() == "Darwin":
max_rss = summary.max_rss / 1024 / 1024
else:
raise NotImplementedError(f"Unknown platform: {platform.system()}")
print(
f"{summary.package}: {max_rss:.1f} MB (exit code: {summary.exit_code})"
)
if __name__ == "__main__":
main()