From cc8459ecb5c053adc2d9c3da40b54eccc953cdef Mon Sep 17 00:00:00 2001 From: Zanie Date: Wed, 17 Jan 2024 11:24:36 -0600 Subject: [PATCH] Use a channel to read stdout --- crates/puffin-build/src/daemon.rs | 46 +++++++++++++++++++++++++------ crates/puffin-build/src/lib.rs | 4 +-- 2 files changed, 38 insertions(+), 12 deletions(-) diff --git a/crates/puffin-build/src/daemon.rs b/crates/puffin-build/src/daemon.rs index adb23ccd1..8fd8b7ece 100644 --- a/crates/puffin-build/src/daemon.rs +++ b/crates/puffin-build/src/daemon.rs @@ -7,6 +7,7 @@ use tokio::fs::File; use tokio::io::{self, BufReader}; use tokio::io::{AsyncWriteExt, Lines}; use tokio::process::{Child, ChildStdin, ChildStdout, Command}; +use tokio::sync::mpsc::{Receiver, Sender}; use tracing::{debug, error}; use crate::{BuildKind, Pep517Backend}; @@ -126,9 +127,23 @@ impl HookErrorKind { } } } + +async fn read_stdout_into_channel( + mut lines: Lines>, + sender: Sender, +) -> () { + // We do not handle any errors here. If there's an IO error or the receiver is dropped, + // we will just exit. The receiver will report the daemon as closed if it attempts to + // read after this channel exits. + while let Ok(Some(line)) = lines.next_line().await { + if let Err(_) = sender.send(line).await { + return; + } + } +} #[derive(Debug)] struct DaemonIO { - stdout: Lines>, + stdout: Receiver, stdin: ChildStdin, handle: Child, } @@ -143,8 +158,14 @@ impl DaemonIO { // Take standard input let stdin = handle.stdin.take().expect("stdin is available"); + // Create a channel to read standard output continuously + let (sender, receiver) = tokio::sync::mpsc::channel(20); + + // We let this handle drop, as we don't care about its result + tokio::spawn(read_stdout_into_channel(stdout, sender)); + Self { - stdout, + stdout: receiver, stdin, handle, } @@ -159,6 +180,11 @@ impl DaemonIO { self.stdin.flush().await?; Ok(()) } + + async fn recv(&mut self) -> Option { + self.stdout.recv().await + } + async fn close(mut self) -> Result, DaemonError> { if !self.exited()? { // Send a shutdown command if it's not closed yet @@ -195,7 +221,6 @@ impl Pep517Daemon { /// Ensure the daemon is started and ready. /// If the daemon is not started, [`Self::start`] will be called. - /// async fn ensure_started(&mut self) -> Result<&mut DaemonIO, DaemonError> { let started = { if let Some(io) = self.io.as_mut() { @@ -250,11 +275,14 @@ impl Pep517Daemon { /// Reads a single response from the daemon. async fn receive_one(&mut self) -> Result { - let stdout = &mut self.io.as_mut().unwrap().stdout; - if let Some(line) = stdout.next_line().await? { - Ok(DaemonResponse::try_from_str(line.as_str())?) + if let Some(io) = self.io.as_mut() { + if let Some(line) = io.recv().await { + Ok(DaemonResponse::try_from_str(line.as_str())?) + } else { + self.close().await?; + Err(DaemonError::Closed) + } } else { - self.close().await?; Err(DaemonError::Closed) } } @@ -292,7 +320,7 @@ impl Pep517Daemon { hook_name: &str, mut args: Vec<&str>, ) -> Result { - let mut io = self.ensure_started().await?; + let io = self.ensure_started().await?; // Always send run and the backend name let mut commands = vec!["run", backend.backend.as_str()]; @@ -434,7 +462,7 @@ impl Pep517Daemon { // If there's an error on close, we should raise that instead of complaining it was never called self.closed = true; - if let Some(mut io) = self.io.take() { + if let Some(io) = self.io.take() { io.close().await } else { Ok(None) diff --git a/crates/puffin-build/src/lib.rs b/crates/puffin-build/src/lib.rs index 2756cc542..1adf7a955 100644 --- a/crates/puffin-build/src/lib.rs +++ b/crates/puffin-build/src/lib.rs @@ -366,7 +366,6 @@ impl SourceBuild { if let Some(pep517_backend) = &pep517_backend { if pep517_backend != &default_backend { let environment = create_pep517_build_environment( - &source_tree, &venv, pep517_backend, &mut pep517_daemon, @@ -537,12 +536,11 @@ impl SourceBuildTrait for SourceBuild { } /// Not a method because we call it before the builder is completely initialized async fn create_pep517_build_environment( - source_tree: &Path, venv: &Virtualenv, pep517_backend: &Pep517Backend, pep517_daemon: &mut Pep517Daemon, build_context: &impl BuildContext, - package_id: &str, + _package_id: &str, build_kind: BuildKind, ) -> Result<(), Error> { let extra_requires = pep517_daemon