diff --git a/Cargo.lock b/Cargo.lock index 8747977ff9..14d2ee88e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1520,6 +1520,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "number_prefix" version = "0.4.0" @@ -2031,6 +2041,7 @@ dependencies = [ "log", "mimalloc", "notify", + "num_cpus", "path-absolutize", "rayon", "regex", diff --git a/Cargo.toml b/Cargo.toml index d91fe8fc8f..31b4337d24 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -65,6 +65,7 @@ memchr = { version = "2.7.1" } mimalloc = { version = "0.1.39" } natord = { version = "1.0.9" } notify = { version = "6.1.1" } +num_cpus = { version = "1.16.0" } once_cell = { version = "1.19.0" } path-absolutize = { version = "3.1.1" } pathdiff = { version = "0.2.1" } diff --git a/crates/ruff/Cargo.toml b/crates/ruff/Cargo.toml index 4e9880808d..3a75eabaf3 100644 --- a/crates/ruff/Cargo.toml +++ b/crates/ruff/Cargo.toml @@ -41,6 +41,7 @@ is-macro = { workspace = true } itertools = { workspace = true } log = { workspace = true } notify = { workspace = true } +num_cpus = { workspace = true } path-absolutize = { workspace = true, features = ["once_cell_cache"] } rayon = { workspace = true } regex = { workspace = true } diff --git a/crates/ruff/src/args.rs b/crates/ruff/src/args.rs index c4f9f7a247..ed98a999af 100644 --- a/crates/ruff/src/args.rs +++ b/crates/ruff/src/args.rs @@ -496,7 +496,7 @@ pub struct FormatCommand { pub range: Option, } -#[derive(Clone, Debug, clap::Parser)] +#[derive(Copy, Clone, Debug, clap::Parser)] pub struct ServerCommand { /// Enable preview mode; required for regular operation #[arg(long)] diff --git a/crates/ruff/src/commands/server.rs b/crates/ruff/src/commands/server.rs index e6b9c5e4db..bb7b3efe90 100644 --- a/crates/ruff/src/commands/server.rs +++ b/crates/ruff/src/commands/server.rs @@ -1,3 +1,5 @@ +use std::num::NonZeroUsize; + use crate::ExitStatus; use anyhow::Result; use ruff_linter::logging::LogLevel; @@ -9,7 +11,11 @@ use tracing_subscriber::{ }; use tracing_tree::time::Uptime; -pub(crate) fn run_server(preview: bool, log_level: LogLevel) -> Result { +pub(crate) fn run_server( + preview: bool, + worker_threads: NonZeroUsize, + log_level: LogLevel, +) -> Result { if !preview { tracing::error!("--preview needs to be provided as a command line argument while the server is still unstable.\nFor example: `ruff server --preview`"); return Ok(ExitStatus::Error); @@ -33,7 +39,7 @@ pub(crate) fn run_server(preview: bool, log_level: LogLevel) -> Result Result Result { let ServerCommand { preview } = args; - commands::server::run_server(preview, log_level) + // by default, we set the number of worker threads to `num_cpus`, with a maximum of 4. + let worker_threads = num_cpus::get().max(4); + commands::server::run_server( + preview, + NonZeroUsize::try_from(worker_threads).expect("a non-zero worker thread count"), + log_level, + ) } pub fn check(args: CheckCommand, global_options: GlobalConfigArgs) -> Result { diff --git a/crates/ruff_server/src/server.rs b/crates/ruff_server/src/server.rs index a934471701..ae1f9a20ed 100644 --- a/crates/ruff_server/src/server.rs +++ b/crates/ruff_server/src/server.rs @@ -1,5 +1,7 @@ //! Scheduling, I/O, and API endpoints. +use std::num::NonZeroUsize; + use lsp::Connection; use lsp_server as lsp; use lsp_types as types; @@ -27,11 +29,12 @@ pub(crate) type Result = std::result::Result; pub struct Server { conn: lsp::Connection, threads: lsp::IoThreads, + worker_threads: NonZeroUsize, session: Session, } impl Server { - pub fn new() -> crate::Result { + pub fn new(worker_threads: NonZeroUsize) -> crate::Result { let (conn, threads) = lsp::Connection::stdio(); let (id, params) = conn.initialize_start()?; @@ -66,19 +69,27 @@ impl Server { Ok(Self { conn, threads, + worker_threads, session: Session::new(&server_capabilities, &workspaces)?, }) } pub fn run(self) -> crate::Result<()> { - let result = event_loop_thread(move || Self::event_loop(&self.conn, self.session))?.join(); + let result = event_loop_thread(move || { + Self::event_loop(&self.conn, self.session, self.worker_threads) + })? + .join(); self.threads.join()?; result } - fn event_loop(connection: &Connection, session: Session) -> crate::Result<()> { + fn event_loop( + connection: &Connection, + session: Session, + worker_threads: NonZeroUsize, + ) -> crate::Result<()> { // TODO(jane): Make thread count configurable - let mut scheduler = schedule::Scheduler::new(session, 4, &connection.sender); + let mut scheduler = schedule::Scheduler::new(session, worker_threads, &connection.sender); for msg in &connection.receiver { let task = match msg { lsp::Message::Request(req) => { diff --git a/crates/ruff_server/src/server/schedule.rs b/crates/ruff_server/src/server/schedule.rs index fd2e59582b..00368a411f 100644 --- a/crates/ruff_server/src/server/schedule.rs +++ b/crates/ruff_server/src/server/schedule.rs @@ -1,3 +1,5 @@ +use std::num::NonZeroUsize; + use crossbeam::channel::Sender; use crate::session::Session; @@ -42,13 +44,14 @@ pub(crate) struct Scheduler { impl Scheduler { pub(super) fn new( session: Session, - thread_count: usize, + worker_threads: NonZeroUsize, sender: &Sender, ) -> Self { + const FMT_THREADS: usize = 1; Self { session, - fmt_pool: thread::Pool::new(1), - background_pool: thread::Pool::new(thread_count), + fmt_pool: thread::Pool::new(NonZeroUsize::try_from(FMT_THREADS).unwrap()), + background_pool: thread::Pool::new(worker_threads), client: Client::new(sender), } } diff --git a/crates/ruff_server/src/server/schedule/thread/pool.rs b/crates/ruff_server/src/server/schedule/thread/pool.rs index 9a69ce367e..7d1f9a418f 100644 --- a/crates/ruff_server/src/server/schedule/thread/pool.rs +++ b/crates/ruff_server/src/server/schedule/thread/pool.rs @@ -13,9 +13,12 @@ //! The thread pool is implemented entirely using //! the threading utilities in [`crate::server::schedule::thread`]. -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, +use std::{ + num::NonZeroUsize, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, }; use crossbeam::channel::{Receiver, Sender}; @@ -41,12 +44,15 @@ struct Job { } impl Pool { - pub(crate) fn new(threads: usize) -> Pool { + pub(crate) fn new(threads: NonZeroUsize) -> Pool { // Override OS defaults to avoid stack overflows on platforms with low stack size defaults. const STACK_SIZE: usize = 2 * 1024 * 1024; const INITIAL_PRIORITY: ThreadPriority = ThreadPriority::Worker; - let (job_sender, job_receiver) = crossbeam::channel::bounded(threads); + let threads = usize::from(threads); + + // Channel buffer capacity is between 2 and 4, depending on the pool size. + let (job_sender, job_receiver) = crossbeam::channel::bounded(std::cmp::min(threads * 2, 4)); let extant_tasks = Arc::new(AtomicUsize::new(0)); let mut handles = Vec::with_capacity(threads);