From 66801d01f106fbd4e4dd9253b10d958df88f249f Mon Sep 17 00:00:00 2001 From: konstin Date: Sun, 21 Sep 2025 20:29:53 +0200 Subject: [PATCH] . --- .../ecosystem-testing/ecosystem_testing.py | 99 +++++++++---------- 1 file changed, 45 insertions(+), 54 deletions(-) diff --git a/scripts/ecosystem-testing/ecosystem_testing.py b/scripts/ecosystem-testing/ecosystem_testing.py index 608e53d6a..d6afe84fb 100644 --- a/scripts/ecosystem-testing/ecosystem_testing.py +++ b/scripts/ecosystem-testing/ecosystem_testing.py @@ -10,11 +10,14 @@ import argparse import concurrent import csv +import functools import json +import multiprocessing import os import shutil import signal import subprocess +import sys import time from collections import deque from concurrent.futures import ThreadPoolExecutor @@ -26,22 +29,6 @@ from tqdm.auto import tqdm cwd = Path(__file__).parent -# Global flag to track ctrl+c -should_stop = False - - -def signal_handler(signum, frame): - """Handle Ctrl+C gracefully.""" - global should_stop - print("\nReceived interrupt signal. Stopping gracefully...") - should_stop = True - - # Send SIGTERM to all processes in our process group - try: - os.killpg(0, signal.SIGTERM) - except ProcessLookupError: - pass - @dataclass class Summary: @@ -54,6 +41,11 @@ class Summary: def run_uv( cmd: list[str], package: str, output_dir: Path, version: str | None ) -> Summary: + """Run a uv subprocess. + + The logic captures the max RSS from the process and avoids deadlocks from full + pipes.""" + start = time.time() process = subprocess.Popen( @@ -92,7 +84,6 @@ def run_uv( # Wait for process and get resource usage _pid, exit_code, rusage = os.wait4(process.pid, 0) - # Wait for threads to finish reading stdout_thread.join() stderr_thread.join() @@ -108,18 +99,21 @@ def run_uv( summary = Summary( package=package, exit_code=exit_code, max_rss=max_rss, time=time.time() - start ) - # if max_rss > 1000 * 1024: - # print(f"{package} exit code:{exit_code}, {max_rss / 1024:.0f} MB") package_dir.joinpath("summary.json").write_text(json.dumps(summary.__dict__)) return summary -def main(): - # Register signal handler for Ctrl+C - signal.signal(signal.SIGINT, signal_handler) +def signal_handler(executor: ThreadPoolExecutor, signum, frame): + """Handle Ctrl+C gracefully.""" + print(f"Stopping for SIGINT (signal {signum})") + executor.shutdown(wait=False, cancel_futures=True) + print("Stopped.") + sys.exit(1) + +def main(): parser = argparse.ArgumentParser() - parser.add_argument("--python", type=str, default="3.13") + parser.add_argument("--python", "-p", type=str, default="3.13") parser.add_argument("--output-dir", type=Path, default="output") parser.add_argument("--uv", type=Path, default=Path("uv")) parser.add_argument("--limit", type=int, default=None) @@ -164,9 +158,10 @@ def main(): args.uv, "pip", "compile", + "-", "-p", args.python, - "-", + "--universal", "--no-build", "--cache-dir", args.cache, @@ -179,59 +174,55 @@ def main(): cmd.append("--offline") success = 0 all_results = [] # Track all results for analysis - interrupted = False max_package_len = max(len(package) for package in top_15k_pypi[: args.limit]) - try: - with ThreadPoolExecutor(max_workers=os.cpu_count() * 2) as executor: - tasks = [] - packages_pending = [] - for package in top_15k_pypi[: args.limit]: - if latest_versions: - if version := latest_versions.get(package): - pass - else: - tqdm.write(f"Missing version: {package}") - continue + with ThreadPoolExecutor(max_workers=os.cpu_count() * 2) as executor: + # Shutdown executor on ctrl+c + previous_sigint_handler = signal.signal( + signal.SIGINT, functools.partial(signal_handler, executor) + ) + + tasks = [] + packages_pending = [] + for package in top_15k_pypi[: args.limit]: + if latest_versions: + if version := latest_versions.get(package): + pass else: - version = None - tasks.append(executor.submit(run_uv, cmd, package, output_dir, version)) - packages_pending.append(package) - - progress_bar = tqdm(total=len(packages_pending)) + tqdm.write(f"Missing version: {package}") + continue + else: + version = None + packages_pending.append(package) + tasks.append(executor.submit(run_uv, cmd, package, output_dir, version)) + total = len(packages_pending) + with tqdm(total=total) as progress_bar: for result in concurrent.futures.as_completed(tasks): summary = result.result() - all_results.append(summary) # Collect all results + all_results.append(summary) progress_bar.update(1) packages_pending.remove(summary.package) - if len(packages_pending) > 0: + if packages_pending: progress_bar.set_postfix_str( f"{packages_pending[0]:>{max_package_len}}" ) if summary.exit_code == 0: success += 1 - progress_bar.close() + signal.signal(signal.SIGINT, previous_sigint_handler) - except KeyboardInterrupt: - print("\nInterrupted. Cleaning up...") - interrupted = True - - if interrupted or should_stop: - print(f"Interrupted. Success: {success}/{len(all_results)} (completed tasks)") - else: - print(f"Success: {success}/{len(top_15k_pypi[: args.limit])}") + print(f"Success: {success}/{total}") successes = [summary for summary in all_results if summary.exit_code == 0] - print("\ntop 5 max RSS") + print("\n# top 5 max RSS for successes") largest_rss = sorted(successes, key=lambda x: x.max_rss, reverse=True)[:5] for summary in largest_rss: print( f"{summary.package}: {summary.max_rss / 1024:.1f} MB (exit code: {summary.exit_code})" ) - print("\ntop 5 slowest resolutions") + print("\n# top 5 slowest resolutions for successes") slowest = sorted(successes, key=lambda x: x.time, reverse=True)[:5] for summary in slowest: print(