#!/usr/bin/env python3 """ A daemon process for PEP 517 build hook requests. See the `README` for details. """ from __future__ import annotations import enum import importlib import io import errno import os import sys import time import traceback from contextlib import ExitStack, contextmanager from functools import cache from typing import Any, Literal, Self, TextIO # Arbitrary nesting is allowed, but all keys and terminal values are strings StringDict = dict[str, "str | StringDict"] # Alias for readability — we don't use `pathlib` for a modest speedup Path = str class FatalError(Exception): """An unrecoverable error in the daemon""" def __init__(self, *args) -> None: super().__init__(*args) class UnreadableInput(FatalError): """Standard input is not readable""" def __init__(self, reason: str) -> None: super().__init__("Standard input is not readable " + reason) class HookdError(Exception): """A non-fatal exception related the this program""" def message(self) -> str: pass def __repr__(self) -> str: attributes = ", ".join( f"{key}={value!r}" for key, value in self.__dict__.items() ) return f"{type(self)}({attributes})" def __str__(self) -> str: return self.message() class MissingBackendModule(HookdError): """A backend was not found""" def __init__(self, name: str) -> None: self.name = name super().__init__() def message(self) -> str: return f"Failed to import the backend {self.name!r}" class MissingBackendAttribute(HookdError): """A backend attribute was not found""" def __init__(self, module: str, attr: str) -> None: self.attr = attr self.module = module super().__init__() def message(self) -> str: return f"Failed to find attribute {self.attr!r} in the backend module {self.module!r}" class MalformedBackendName(HookdError): """A backend is not valid""" def __init__(self, name: str) -> None: self.name = name super().__init__() def message(self) -> str: return f"Backend {self.name!r} is malformed" class BackendImportError(HookdError): """A backend raised an exception on import""" def __init__(self, exc: Exception) -> None: self.exc = exc super().__init__() def message(self) -> str: return f"Backend threw an exception during import: {self.exc}" class InvalidHookName(HookdError): """A parsed hook name is not valid""" def __init__(self, name: str) -> None: self.name = name super().__init__() def message(self) -> str: names = ", ".join(repr(name) for name in Hook._member_names_) return f"The name {self.name!r} is not valid hook. Expected one of: {names}" class InvalidAction(HookdError): """The given action is not valid""" def __init__(self, name: str) -> None: self.name = name super().__init__() def message(self) -> str: names = ", ".join(repr(name) for name in Action._member_names_) return f"Received invalid action {self.name!r}. Expected one of: {names}" class UnsupportedHook(HookdError): """A hook is not supported by the backend""" def __init__(self, backend: object, hook: Hook) -> None: self.backend = backend self.hook = hook super().__init__() def message(self) -> str: hook_names = set(Hook._member_names_) names = ", ".join( repr(name) for name in dir(self.backend) if name in hook_names ) hint = ( f"The backend supports: {names}" if names else "The backend does not support any known hooks." ) return f"The hook {self.hook.value!r} is not supported by the backend. {hint}" class MalformedHookArgument(HookdError): """A parsed hook argument was not in the expected format""" def __init__(self, raw: str, argument: HookArgument) -> None: self.raw = raw self.argument = argument super().__init__() def message(self) -> str: # TODO(zanieb): Consider display an expected type return f"Malformed content for argument {self.argument.name!r}: {self.raw!r}" class HookRuntimeError(HookdError): """Execution of a hook failed""" def __init__(self, exc: BaseException) -> None: self.exc = exc super().__init__() def message(self) -> str: return str(self.exc) class Hook(enum.StrEnum): build_wheel = enum.auto() build_sdist = enum.auto() prepare_metadata_for_build_wheel = enum.auto() get_requires_for_build_wheel = enum.auto() get_requires_for_build_sdist = enum.auto() @classmethod def from_str(cls: type[Self], name: str) -> Self: try: return Hook(name) except ValueError: raise InvalidHookName(name) from None def parse_build_backend(buffer: TextIO) -> str: # TODO: Add support for `build-path` name = buffer.readline().rstrip("\n") if not name: # Default to the legacy build name name = "setuptools.build_meta:__legacy__" return name @cache def import_build_backend(backend_name: str) -> object: """ See: https://peps.python.org/pep-0517/#source-trees """ parts = backend_name.split(":") if len(parts) == 1: module_name = parts[0] attribute = None elif len(parts) == 2: module_name = parts[0] attribute = parts[1] # Check for malformed attribute if not attribute: raise MalformedBackendName(backend_name) else: raise MalformedBackendName(backend_name) module = None backend = None try: module = importlib.import_module(module_name) except ImportError: # If they could not have meant `.`, raise if "." not in module_name: raise MissingBackendModule(module_name) if module is None: # Otherwise, we'll try to load it as an attribute of a module parent_name, child_name = module_name.rsplit(".", 1) try: module = importlib.import_module(parent_name) except ImportError: raise MissingBackendModule(module_name) try: backend = getattr(module, child_name) except AttributeError: raise MissingBackendAttribute(module_name, child_name) if attribute is not None: try: backend = getattr(module, attribute) except AttributeError: raise MissingBackendAttribute(module_name, backend_name) if backend is None: backend = module return backend class Action(enum.StrEnum): run = enum.auto() shutdown = enum.auto() @classmethod def from_str(cls: type[Self], action: str) -> Self: try: return Action(action) except ValueError: raise InvalidAction(action) from None def parse_action(buffer: TextIO) -> Action: action = buffer.readline().rstrip("\n") return Action.from_str(action) def parse_hook_name(buffer: TextIO) -> Hook: name = buffer.readline().rstrip("\n") return Hook.from_str(name) def parse_path(buffer: TextIO) -> Path: path = os.path.abspath(buffer.readline().rstrip("\n")) # TODO(zanieb): Consider validating the path here return path def parse_optional_path(buffer: TextIO) -> Path | None: data = buffer.readline().rstrip("\n") if not data: return None # TODO(zanieb): Consider validating the path here return os.path.abspath(data) def parse_config_settings(buffer: TextIO) -> StringDict | None: """ See https://peps.python.org/pep-0517/#config-settings """ data = buffer.readline().rstrip("\n") if not data: return None # We defer the import of `json` until someone actually passes us a `config_settings` # object since it's not necessarily common import json try: # TODO(zanieb): Consider using something faster than JSON here since we _should_ # be restricted to strings return json.loads(data) except json.decoder.JSONDecodeError as exc: raise MalformedHookArgument(data, HookArgument.config_settings) from exc @contextmanager def redirect_sys_stream(name: Literal["stdout", "stderr"]): """ Redirect a system stream to a temporary file. Deletion of the temporary file is deferred to the caller. WARNING: This function is not safe to concurrent usage. """ stream: TextIO = getattr(sys, name) # We use an optimized version of `NamedTemporaryFile` fd, name = tmpfile() setattr(sys, name, io.open(fd, "rt")) yield name # Restore to the previous stream setattr(sys, name, stream) class HookArgument(enum.StrEnum): wheel_directory = enum.auto() config_settings = enum.auto() metadata_directory = enum.auto() sdist_directory = enum.auto() def parse_hook_argument(hook_arg: HookArgument, buffer: TextIO) -> Any: if hook_arg == HookArgument.wheel_directory: return parse_path(buffer) if hook_arg == HookArgument.metadata_directory: return parse_optional_path(buffer) if hook_arg == HookArgument.sdist_directory: return parse_path(buffer) if hook_arg == HookArgument.config_settings: return parse_config_settings(buffer) raise FatalError(f"No parser for hook argument kind {hook_arg.name!r}") HookArguments = { Hook.build_sdist: ( HookArgument.sdist_directory, HookArgument.config_settings, ), Hook.build_wheel: ( HookArgument.wheel_directory, HookArgument.config_settings, HookArgument.metadata_directory, ), Hook.prepare_metadata_for_build_wheel: ( HookArgument.metadata_directory, HookArgument.config_settings, ), Hook.get_requires_for_build_sdist: (HookArgument.config_settings,), Hook.get_requires_for_build_wheel: (HookArgument.config_settings,), } def write_safe(file: TextIO, *args: str): args = [str(arg).replace("\n", "\\n") for arg in args] print(*args, file=file) def send_expect(file: TextIO, name: str): write_safe(file, "EXPECT", name) def send_ready(file: TextIO): write_safe(file, "READY") def send_shutdown(file: TextIO): write_safe(file, "SHUTDOWN") def send_error(file: TextIO, exc: HookdError): write_safe(file, "ERROR", type(exc).__name__, str(exc)) send_traceback(file, exc) def send_traceback(file: TextIO, exc: BaseException): tb = traceback.format_exception(exc) write_safe(file, "TRACEBACK", "\n".join(tb)) def send_ok(file: TextIO, result: str): write_safe(file, "OK", result) def send_fatal(file: TextIO, exc: BaseException): write_safe(file, "FATAL", type(exc).__name__, str(exc)) send_traceback(file, exc) def send_debug(file: TextIO, *args): write_safe(file, "DEBUG", *args) def send_redirect(file: TextIO, name: Literal["stdout", "stderr"], path: str): write_safe(file, name.upper(), path) def run_once(stdin: TextIO, stdout: TextIO): start = time.perf_counter() send_expect(stdout, "build-backend") build_backend_name = parse_build_backend(stdin) send_expect(stdout, "hook-name") hook_name = parse_hook_name(stdin) if hook_name not in HookArguments: raise FatalError(f"No arguments defined for hook {hook_name!r}") # Parse arguments for the given hook def parse(argument: str): send_expect(stdout, argument.name) return parse_hook_argument(argument, stdin) args = tuple(parse(argument) for argument in HookArguments[hook_name]) send_debug( stdout, build_backend_name, hook_name, *(f"{name}={value}" for name, value in zip(HookArguments[hook_name], args)), ) end = time.perf_counter() send_debug(stdout, f"parsed hook inputs in {(end - start)*1000.0:.2f}ms") # All hooks are run with working directory set to the root of the source tree # TODO(zanieb): Where do we get the path of the source tree? with ExitStack() as hook_ctx: hook_stdout = hook_ctx.enter_context(redirect_sys_stream("stdout")) hook_stderr = hook_ctx.enter_context(redirect_sys_stream("stderr")) send_redirect(stdout, "stdout", str(hook_stdout)) send_redirect(stdout, "stderr", str(hook_stderr)) try: build_backend = import_build_backend(build_backend_name) except Exception as exc: if not isinstance(exc, HookdError): # Wrap unhandled errors in a generic one raise BackendImportError(exc) from exc raise try: hook = getattr(build_backend, hook_name) except AttributeError: raise UnsupportedHook(build_backend, hook_name) try: result = hook(*args) except BaseException as exc: # Respect SIGTERM and SIGINT if isinstance(exc, (SystemExit, KeyboardInterrupt)): raise raise HookRuntimeError(exc) from exc else: send_ok(stdout, result) """ Optimized version of temporary file creation. Implemenation based on CPython. This implementation: - Uses UUIDs instead of the CPython random name generator - Finds a valid temporary directory at the same time as creating the temporary file - Avoids having to unlink a file created just to test if the directory is valid - Only finds the default temporary directory _once_ then caches it """ _text_openflags = os.O_RDWR | os.O_CREAT | os.O_EXCL if hasattr(os, "O_NOFOLLOW"): _text_openflags |= os.O_NOFOLLOW def _candidate_tempdirs(): """ Generate a list of candidate temporary directories """ dirlist = [] # First, try the environment. for envname in "TMPDIR", "TEMP", "TMP": dirname = os.getenv(envname) if dirname: dirlist.append(dirname) # Failing that, try OS-specific locations. if os.name == "nt": dirlist.extend( [ os.path.expanduser(r"~\AppData\Local\Temp"), os.path.expandvars(r"%SYSTEMROOT%\Temp"), r"c:\temp", r"c:\tmp", r"\temp", r"\tmp", ] ) else: dirlist.extend(["/tmp", "/var/tmp", "/usr/tmp"]) # As a last resort, the current directory. try: dirlist.append(os.getcwd()) except (AttributeError, OSError): dirlist.append(os.curdir) return dirlist _default_tmpdir = None _max_tmpfile_attempts = 10000 def tmpfile(): global _default_tmpdir # Use the default directory if previously found, otherwise we will # find if not _default_tmpdir: tmpdir = None candidate_tempdirs = iter(_candidate_tempdirs()) else: tmpdir = _default_tmpdir candidate_tempdirs = None for attempt in range(_max_tmpfile_attempts): # Generate a random hex string, similar to a UUID without version and variant information name = "%032x" % int.from_bytes(os.urandom(16)) # Every one hundred attempts, switch to another candidate directory if not _default_tmpdir and attempt % 100 == 0: try: tmpdir = next(candidate_tempdirs) except StopIteration: raise FileNotFoundError( errno.ENOENT, f"No usable temporary directory found in {_candidate_tempdirs()}", ) from None file = os.path.join(tmpdir, name) try: fd = os.open(file, _text_openflags, 0o600) except FileExistsError: continue # try again except PermissionError: # This exception is thrown when a directory with the chosen name # already exists on windows. if ( os.name == "nt" and os.path.isdir(_default_tmpdir) and os.access(dir, os.W_OK) ): continue else: raise _default_tmpdir = tmpdir return fd, file raise FileExistsError(errno.EEXIST, "No usable temporary file name found") def main(): # Create copies of standard streams since the `sys.` will be redirected during # hook execution stdout = sys.stdout stdin = sys.stdin # TODO: Close `sys.stdin` and create a duplicate file for ourselves so hooks don't read from our stream while True: try: start = time.perf_counter() if not stdin.readable(): raise UnreadableInput() send_ready(stdout) send_expect(stdout, "action") action = parse_action(stdin) if action == Action.shutdown: send_shutdown(stdout) break run_once(stdin, stdout) end = time.perf_counter() send_debug(stdout, f"ran hook in {(end - start)*1000.0:.2f}ms") except HookdError as exc: # These errors are "handled" and non-fatal try: send_error(stdout, exc) except Exception as exc: # Failures to report errors are a fatal error send_fatal(stdout, exc) raise exc except BaseException as exc: # All other exceptions result in a crash of the daemon send_fatal(stdout, exc) raise if __name__ == "__main__": if len(sys.argv) > 2: print( "Invalid usage. Expected one argument specifying the path to the source tree.", file=sys.stderr, ) sys.exit(1) try: source_tree = os.path.abspath(sys.argv[1]) os.chdir(source_tree) send_debug(sys.stdout, "changed working directory to", source_tree) except IndexError: pass except ValueError as exc: print( f"Invalid usage. Expected path argument but validation failed: {exc}", file=sys.stderr, ) sys.exit(1) main()