From 015222900f16ae3100b3415e1b6be44aed867e4b Mon Sep 17 00:00:00 2001 From: Micha Reiser Date: Thu, 12 Jun 2025 22:08:42 +0200 Subject: [PATCH] Support cancellation requests (#18627) --- crates/ruff/src/commands/server.rs | 13 +- crates/ruff/src/lib.rs | 9 +- crates/ruff_server/src/lib.rs | 42 ++- crates/ruff_server/src/message.rs | 54 ---- crates/ruff_server/src/server.rs | 265 ++++++--------- crates/ruff_server/src/server/api.rs | 304 +++++++++++++----- .../ruff_server/src/server/api/diagnostics.rs | 15 +- .../src/server/api/notifications.rs | 3 +- .../src/server/api/notifications/cancel.rs | 33 +- .../server/api/notifications/did_change.rs | 8 +- .../notifications/did_change_configuration.rs | 6 +- .../api/notifications/did_change_notebook.rs | 8 +- .../notifications/did_change_watched_files.rs | 21 +- .../api/notifications/did_change_workspace.rs | 8 +- .../src/server/api/notifications/did_close.rs | 8 +- .../api/notifications/did_close_notebook.rs | 6 +- .../src/server/api/notifications/did_open.rs | 8 +- .../api/notifications/did_open_notebook.rs | 8 +- crates/ruff_server/src/server/api/requests.rs | 2 + .../src/server/api/requests/code_action.rs | 6 +- .../api/requests/code_action_resolve.rs | 5 +- .../src/server/api/requests/diagnostic.rs | 4 +- .../server/api/requests/execute_command.rs | 24 +- .../src/server/api/requests/format.rs | 6 +- .../src/server/api/requests/format_range.rs | 6 +- .../src/server/api/requests/hover.rs | 6 +- .../src/server/api/requests/shutdown.rs | 17 + crates/ruff_server/src/server/api/traits.rs | 13 +- crates/ruff_server/src/server/client.rs | 169 ---------- crates/ruff_server/src/server/connection.rs | 129 +------- crates/ruff_server/src/server/main_loop.rs | 209 ++++++++++++ crates/ruff_server/src/server/schedule.rs | 56 +--- .../ruff_server/src/server/schedule/task.rs | 39 +-- .../src/server/schedule/thread/pool.rs | 22 +- crates/ruff_server/src/session.rs | 40 ++- crates/ruff_server/src/session/client.rs | 248 ++++++++++++++ crates/ruff_server/src/session/index.rs | 20 +- .../src/session/index/ruff_settings.rs | 10 +- crates/ruff_server/src/session/options.rs | 30 +- .../ruff_server/src/session/request_queue.rs | 198 ++++++++++++ crates/ruff_server/src/session/settings.rs | 9 +- crates/ruff_server/tests/notebook.rs | 13 +- crates/ty_server/src/server/api.rs | 2 +- crates/ty_server/src/server/main_loop.rs | 5 +- .../src/server/schedule/thread/pool.rs | 2 +- crates/ty_server/src/session/client.rs | 72 ++--- 46 files changed, 1324 insertions(+), 857 deletions(-) delete mode 100644 crates/ruff_server/src/message.rs create mode 100644 crates/ruff_server/src/server/api/requests/shutdown.rs delete mode 100644 crates/ruff_server/src/server/client.rs create mode 100644 crates/ruff_server/src/server/main_loop.rs create mode 100644 crates/ruff_server/src/session/client.rs create mode 100644 crates/ruff_server/src/session/request_queue.rs diff --git a/crates/ruff/src/commands/server.rs b/crates/ruff/src/commands/server.rs index 817269bc7e..837662e7d3 100644 --- a/crates/ruff/src/commands/server.rs +++ b/crates/ruff/src/commands/server.rs @@ -1,14 +1,7 @@ -use std::num::NonZeroUsize; - use crate::ExitStatus; use anyhow::Result; -use ruff_server::Server; -pub(crate) fn run_server( - worker_threads: NonZeroUsize, - preview: Option, -) -> Result { - let server = Server::new(worker_threads, preview)?; - - server.run().map(|()| ExitStatus::Success) +pub(crate) fn run_server(preview: Option) -> Result { + ruff_server::run(preview)?; + Ok(ExitStatus::Success) } diff --git a/crates/ruff/src/lib.rs b/crates/ruff/src/lib.rs index ef8f639746..bb4d8e02ab 100644 --- a/crates/ruff/src/lib.rs +++ b/crates/ruff/src/lib.rs @@ -2,7 +2,6 @@ use std::fs::File; use std::io::{self, BufWriter, Write, stdout}; -use std::num::NonZeroUsize; use std::path::{Path, PathBuf}; use std::process::ExitCode; use std::sync::mpsc::channel; @@ -223,13 +222,7 @@ fn analyze_graph( } fn server(args: ServerCommand) -> Result { - let four = NonZeroUsize::new(4).unwrap(); - - // by default, we set the number of worker threads to `num_cpus`, with a maximum of 4. - let worker_threads = std::thread::available_parallelism() - .unwrap_or(four) - .min(four); - commands::server::run_server(worker_threads, args.resolve_preview()) + commands::server::run_server(args.resolve_preview()) } pub fn check(args: CheckCommand, global_options: GlobalConfigArgs) -> Result { diff --git a/crates/ruff_server/src/lib.rs b/crates/ruff_server/src/lib.rs index ca4ee50ab8..784538a23e 100644 --- a/crates/ruff_server/src/lib.rs +++ b/crates/ruff_server/src/lib.rs @@ -1,13 +1,15 @@ //! ## The Ruff Language Server +use std::num::NonZeroUsize; + +use anyhow::Context as _; pub use edit::{DocumentKey, NotebookDocument, PositionEncoding, TextDocument}; use lsp_types::CodeActionKind; -pub use server::Server; -pub use session::{ClientOptions, DocumentQuery, DocumentSnapshot, GlobalOptions, Session}; +pub use server::{ConnectionSender, MainLoopSender, Server}; +pub use session::{Client, ClientOptions, DocumentQuery, DocumentSnapshot, GlobalOptions, Session}; pub use workspace::{Workspace, Workspaces}; -#[macro_use] -mod message; +use crate::server::ConnectionInitializer; mod edit; mod fix; @@ -37,3 +39,35 @@ pub(crate) type Result = anyhow::Result; pub(crate) fn version() -> &'static str { ruff_linter::VERSION } + +pub fn run(preview: Option) -> Result<()> { + let four = NonZeroUsize::new(4).unwrap(); + + // by default, we set the number of worker threads to `num_cpus`, with a maximum of 4. + let worker_threads = std::thread::available_parallelism() + .unwrap_or(four) + .min(four); + + let (connection, io_threads) = ConnectionInitializer::stdio(); + + let server_result = Server::new(worker_threads, connection, preview) + .context("Failed to start server")? + .run(); + + let io_result = io_threads.join(); + + let result = match (server_result, io_result) { + (Ok(()), Ok(())) => Ok(()), + (Err(server), Err(io)) => Err(server).context(format!("IO thread error: {io}")), + (Err(server), _) => Err(server), + (_, Err(io)) => Err(io).context("IO thread error"), + }; + + if let Err(err) = result.as_ref() { + tracing::warn!("Server shut down with an error: {err}"); + } else { + tracing::info!("Server shut down"); + } + + result +} diff --git a/crates/ruff_server/src/message.rs b/crates/ruff_server/src/message.rs deleted file mode 100644 index 1b007ea38c..0000000000 --- a/crates/ruff_server/src/message.rs +++ /dev/null @@ -1,54 +0,0 @@ -use anyhow::Context; -use lsp_types::notification::Notification; -use std::sync::OnceLock; - -use crate::server::ClientSender; - -static MESSENGER: OnceLock = OnceLock::new(); - -pub(crate) fn init_messenger(client_sender: ClientSender) { - MESSENGER - .set(client_sender) - .expect("messenger should only be initialized once"); -} - -pub(crate) fn show_message(message: String, message_type: lsp_types::MessageType) { - try_show_message(message, message_type).unwrap(); -} - -pub(super) fn try_show_message( - message: String, - message_type: lsp_types::MessageType, -) -> crate::Result<()> { - MESSENGER - .get() - .ok_or_else(|| anyhow::anyhow!("messenger not initialized"))? - .send(lsp_server::Message::Notification( - lsp_server::Notification { - method: lsp_types::notification::ShowMessage::METHOD.into(), - params: serde_json::to_value(lsp_types::ShowMessageParams { - typ: message_type, - message, - })?, - }, - )) - .context("Failed to send message")?; - - Ok(()) -} - -/// Sends a request to display an error to the client with a formatted message. The error is sent -/// in a `window/showMessage` notification. -macro_rules! show_err_msg { - ($msg:expr$(, $($arg:tt)*)?) => { - crate::message::show_message(::core::format_args!($msg$(, $($arg)*)?).to_string(), lsp_types::MessageType::ERROR) - }; -} - -/// Sends a request to display a warning to the client with a formatted message. The warning is -/// sent in a `window/showMessage` notification. -macro_rules! show_warn_msg { - ($msg:expr$(, $($arg:tt)*)?) => { - crate::message::show_message(::core::format_args!($msg$(, $($arg)*)?).to_string(), lsp_types::MessageType::WARNING) - }; -} diff --git a/crates/ruff_server/src/server.rs b/crates/ruff_server/src/server.rs index 9e9c462f3f..19e0d75a23 100644 --- a/crates/ruff_server/src/server.rs +++ b/crates/ruff_server/src/server.rs @@ -1,19 +1,17 @@ //! Scheduling, I/O, and API endpoints. -use lsp_server as lsp; +use lsp_server::Connection; use lsp_types as types; use lsp_types::InitializeParams; +use lsp_types::MessageType; use std::num::NonZeroUsize; -// The new PanicInfoHook name requires MSRV >= 1.82 -#[expect(deprecated)] -use std::panic::PanicInfo; +use std::panic::PanicHookInfo; use std::str::FromStr; +use std::sync::Arc; use types::ClientCapabilities; use types::CodeActionKind; use types::CodeActionOptions; use types::DiagnosticOptions; -use types::DidChangeWatchedFilesRegistrationOptions; -use types::FileSystemWatcher; use types::NotebookCellSelector; use types::NotebookDocumentSyncOptions; use types::NotebookSelector; @@ -24,37 +22,38 @@ use types::TextDocumentSyncOptions; use types::WorkDoneProgressOptions; use types::WorkspaceFoldersServerCapabilities; -use self::connection::Connection; -use self::connection::ConnectionInitializer; -use self::schedule::Scheduler; -use self::schedule::Task; -use self::schedule::event_loop_thread; +pub(crate) use self::connection::ConnectionInitializer; +pub use self::connection::ConnectionSender; +use self::schedule::spawn_main_loop; use crate::PositionEncoding; -use crate::session::AllOptions; -use crate::session::Session; +pub use crate::server::main_loop::MainLoopSender; +pub(crate) use crate::server::main_loop::{Event, MainLoopReceiver}; +use crate::session::{AllOptions, Client, Session}; use crate::workspace::Workspaces; +pub(crate) use api::Error; mod api; -mod client; mod connection; +mod main_loop; mod schedule; -use crate::message::try_show_message; -pub(crate) use connection::ClientSender; - pub(crate) type Result = std::result::Result; pub struct Server { connection: Connection, client_capabilities: ClientCapabilities, worker_threads: NonZeroUsize, + main_loop_receiver: MainLoopReceiver, + main_loop_sender: MainLoopSender, session: Session, } impl Server { - pub fn new(worker_threads: NonZeroUsize, preview: Option) -> crate::Result { - let connection = ConnectionInitializer::stdio(); - + pub(crate) fn new( + worker_threads: NonZeroUsize, + connection: ConnectionInitializer, + preview: Option, + ) -> crate::Result { let (id, init_params) = connection.initialize_start()?; let client_capabilities = init_params.capabilities; @@ -69,7 +68,7 @@ impl Server { crate::version(), )?; - crate::message::init_messenger(connection.make_sender()); + let (main_loop_sender, main_loop_receiver) = crossbeam::channel::bounded(32); let InitializeParams { initialization_options, @@ -77,13 +76,17 @@ impl Server { .. } = init_params; + let client = Client::new(main_loop_sender.clone(), connection.sender.clone()); let mut all_options = AllOptions::from_value( initialization_options .unwrap_or_else(|| serde_json::Value::Object(serde_json::Map::default())), + &client, ); + if let Some(preview) = preview { all_options.set_preview(preview); } + let AllOptions { global: global_options, workspace: workspace_options, @@ -101,159 +104,33 @@ impl Server { tracing::debug!("Negotiated position encoding: {position_encoding:?}"); - let global = global_options.into_settings(); + let global = global_options.into_settings(client.clone()); Ok(Self { connection, worker_threads, - session: Session::new(&client_capabilities, position_encoding, global, &workspaces)?, + main_loop_sender, + main_loop_receiver, + session: Session::new( + &client_capabilities, + position_encoding, + global, + &workspaces, + &client, + )?, client_capabilities, }) } - pub fn run(self) -> crate::Result<()> { - // The new PanicInfoHook name requires MSRV >= 1.82 - #[expect(deprecated)] - type PanicHook = Box) + 'static + Sync + Send>; - struct RestorePanicHook { - hook: Option, - } + pub fn run(mut self) -> crate::Result<()> { + let client = Client::new( + self.main_loop_sender.clone(), + self.connection.sender.clone(), + ); - impl Drop for RestorePanicHook { - fn drop(&mut self) { - if let Some(hook) = self.hook.take() { - std::panic::set_hook(hook); - } - } - } + let _panic_hook = ServerPanicHookHandler::new(client); - // unregister any previously registered panic hook - // The hook will be restored when this function exits. - let _ = RestorePanicHook { - hook: Some(std::panic::take_hook()), - }; - - // When we panic, try to notify the client. - std::panic::set_hook(Box::new(move |panic_info| { - use std::io::Write; - - let backtrace = std::backtrace::Backtrace::force_capture(); - tracing::error!("{panic_info}\n{backtrace}"); - - // we also need to print to stderr directly for when using `$logTrace` because - // the message won't be sent to the client. - // But don't use `eprintln` because `eprintln` itself may panic if the pipe is broken. - let mut stderr = std::io::stderr().lock(); - writeln!(stderr, "{panic_info}\n{backtrace}").ok(); - - try_show_message( - "The Ruff language server exited with a panic. See the logs for more details." - .to_string(), - lsp_types::MessageType::ERROR, - ) - .ok(); - })); - - event_loop_thread(move || { - Self::event_loop( - &self.connection, - &self.client_capabilities, - self.session, - self.worker_threads, - )?; - self.connection.close()?; - Ok(()) - })? - .join() - } - - fn event_loop( - connection: &Connection, - client_capabilities: &ClientCapabilities, - mut session: Session, - worker_threads: NonZeroUsize, - ) -> crate::Result<()> { - let mut scheduler = - schedule::Scheduler::new(&mut session, worker_threads, connection.make_sender()); - - Self::try_register_capabilities(client_capabilities, &mut scheduler); - for msg in connection.incoming() { - if connection.handle_shutdown(&msg)? { - break; - } - let task = match msg { - lsp::Message::Request(req) => api::request(req), - lsp::Message::Notification(notification) => api::notification(notification), - lsp::Message::Response(response) => scheduler.response(response), - }; - scheduler.dispatch(task); - } - - Ok(()) - } - - fn try_register_capabilities( - client_capabilities: &ClientCapabilities, - scheduler: &mut Scheduler, - ) { - let dynamic_registration = client_capabilities - .workspace - .as_ref() - .and_then(|workspace| workspace.did_change_watched_files) - .and_then(|watched_files| watched_files.dynamic_registration) - .unwrap_or_default(); - if dynamic_registration { - // Register all dynamic capabilities here - - // `workspace/didChangeWatchedFiles` - // (this registers the configuration file watcher) - let params = lsp_types::RegistrationParams { - registrations: vec![lsp_types::Registration { - id: "ruff-server-watch".into(), - method: "workspace/didChangeWatchedFiles".into(), - register_options: Some( - serde_json::to_value(DidChangeWatchedFilesRegistrationOptions { - watchers: vec![ - FileSystemWatcher { - glob_pattern: types::GlobPattern::String( - "**/.ruff.toml".into(), - ), - kind: None, - }, - FileSystemWatcher { - glob_pattern: types::GlobPattern::String("**/ruff.toml".into()), - kind: None, - }, - FileSystemWatcher { - glob_pattern: types::GlobPattern::String( - "**/pyproject.toml".into(), - ), - kind: None, - }, - ], - }) - .unwrap(), - ), - }], - }; - - let response_handler = |()| { - tracing::info!("Configuration file watcher successfully registered"); - Task::nothing() - }; - - if let Err(err) = scheduler - .request::(params, response_handler) - { - tracing::error!( - "An error occurred when trying to register the configuration file watcher: {err}" - ); - } - } else { - tracing::warn!( - "LSP client does not support dynamic capability registration - automatic configuration reloading will not be available." - ); - } + spawn_main_loop(move || self.main_loop())?.join() } fn find_best_position_encoding(client_capabilities: &ClientCapabilities) -> PositionEncoding { @@ -445,3 +322,63 @@ impl FromStr for SupportedCommand { }) } } + +type PanicHook = Box) + 'static + Sync + Send>; + +struct ServerPanicHookHandler { + hook: Option, + // Hold on to the strong reference for as long as the panic hook is set. + _client: Arc, +} + +impl ServerPanicHookHandler { + fn new(client: Client) -> Self { + let hook = std::panic::take_hook(); + let client = Arc::new(client); + + // Use a weak reference to the client because it must be dropped when exiting or the + // io-threads join hangs forever (because client has a reference to the connection sender). + let hook_client = Arc::downgrade(&client); + + // When we panic, try to notify the client. + std::panic::set_hook(Box::new(move |panic_info| { + use std::io::Write; + + let backtrace = std::backtrace::Backtrace::force_capture(); + tracing::error!("{panic_info}\n{backtrace}"); + + // we also need to print to stderr directly for when using `$logTrace` because + // the message won't be sent to the client. + // But don't use `eprintln` because `eprintln` itself may panic if the pipe is broken. + let mut stderr = std::io::stderr().lock(); + writeln!(stderr, "{panic_info}\n{backtrace}").ok(); + + if let Some(client) = hook_client.upgrade() { + client + .show_message( + "The Ruff language server exited with a panic. See the logs for more details.", + MessageType::ERROR, + ) + .ok(); + } + })); + + Self { + hook: Some(hook), + _client: client, + } + } +} + +impl Drop for ServerPanicHookHandler { + fn drop(&mut self) { + if std::thread::panicking() { + // Calling `std::panic::set_hook` while panicking results in a panic. + return; + } + + if let Some(hook) = self.hook.take() { + std::panic::set_hook(hook); + } + } +} diff --git a/crates/ruff_server/src/server/api.rs b/crates/ruff_server/src/server/api.rs index aa419c6348..42c73528cf 100644 --- a/crates/ruff_server/src/server/api.rs +++ b/crates/ruff_server/src/server/api.rs @@ -1,17 +1,30 @@ -use crate::{server::schedule::Task, session::Session}; -use lsp_server as server; +use std::panic::UnwindSafe; + +use anyhow::anyhow; +use lsp_server::{self as server, RequestId}; +use lsp_types::{notification::Notification, request::Request}; +use notifications as notification; +use requests as request; + +use crate::{ + server::{ + api::traits::{ + BackgroundDocumentNotificationHandler, BackgroundDocumentRequestHandler, + SyncNotificationHandler, + }, + schedule::Task, + }, + session::{Client, Session}, +}; mod diagnostics; mod notifications; mod requests; mod traits; -use notifications as notification; -use requests as request; - use self::traits::{NotificationHandler, RequestHandler}; -use super::{Result, client::Responder, schedule::BackgroundSchedule}; +use super::{Result, schedule::BackgroundSchedule}; /// Defines the `document_url` method for implementers of [`traits::Notification`] and [`traits::Request`], /// given the parameter type used by the implementer. @@ -25,7 +38,13 @@ macro_rules! define_document_url { use define_document_url; -pub(super) fn request<'a>(req: server::Request) -> Task<'a> { +/// Processes a request from the client to the server. +/// +/// The LSP specification requires that each request has exactly one response. Therefore, +/// it's crucial that all paths in this method call [`Client::respond`] exactly once. +/// The only exception to this is requests that were cancelled by the client. In this case, +/// the response was already sent by the [`notification::CancelNotificationHandler`]. +pub(super) fn request(req: server::Request) -> Task { let id = req.id.clone(); match req.method.as_str() { @@ -38,7 +57,7 @@ pub(super) fn request<'a>(req: server::Request) -> Task<'a> { request::DocumentDiagnostic::METHOD => { background_request_task::(req, BackgroundSchedule::Worker) } - request::ExecuteCommand::METHOD => local_request_task::(req), + request::ExecuteCommand::METHOD => sync_request_task::(req), request::Format::METHOD => { background_request_task::(req, BackgroundSchedule::Fmt) } @@ -48,46 +67,67 @@ pub(super) fn request<'a>(req: server::Request) -> Task<'a> { request::Hover::METHOD => { background_request_task::(req, BackgroundSchedule::Worker) } + lsp_types::request::Shutdown::METHOD => sync_request_task::(req), method => { tracing::warn!("Received request {method} which does not have a handler"); - return Task::nothing(); + let result: Result<()> = Err(Error::new( + anyhow!("Unknown request: {method}"), + server::ErrorCode::MethodNotFound, + )); + return Task::immediate(id, result); } } .unwrap_or_else(|err| { tracing::error!("Encountered error when routing request with ID {id}: {err}"); - show_err_msg!( - "Ruff failed to handle a request from the editor. Check the logs for more details." - ); - let result: Result<()> = Err(err); - Task::immediate(id, result) + + Task::sync(move |_session, client| { + client.show_error_message( + "Ruff failed to handle a request from the editor. Check the logs for more details.", + ); + respond_silent_error( + id, + client, + lsp_server::ResponseError { + code: err.code as i32, + message: err.to_string(), + data: None, + }, + ); + }) }) } -pub(super) fn notification<'a>(notif: server::Notification) -> Task<'a> { +pub(super) fn notification(notif: server::Notification) -> Task { match notif.method.as_str() { - notification::Cancel::METHOD => local_notification_task::(notif), notification::DidChange::METHOD => { - local_notification_task::(notif) + sync_notification_task::(notif) } notification::DidChangeConfiguration::METHOD => { - local_notification_task::(notif) + sync_notification_task::(notif) } notification::DidChangeWatchedFiles::METHOD => { - local_notification_task::(notif) + sync_notification_task::(notif) } notification::DidChangeWorkspace::METHOD => { - local_notification_task::(notif) + sync_notification_task::(notif) } - notification::DidClose::METHOD => local_notification_task::(notif), - notification::DidOpen::METHOD => local_notification_task::(notif), + notification::DidClose::METHOD => sync_notification_task::(notif), + notification::DidOpen::METHOD => sync_notification_task::(notif), notification::DidOpenNotebook::METHOD => { - local_notification_task::(notif) + sync_notification_task::(notif) } notification::DidChangeNotebook::METHOD => { - local_notification_task::(notif) + sync_notification_task::(notif) } notification::DidCloseNotebook::METHOD => { - local_notification_task::(notif) + sync_notification_task::(notif) + } + lsp_types::notification::Cancel::METHOD => { + sync_notification_task::(notif) + } + lsp_types::notification::SetTrace::METHOD => { + tracing::trace!("Ignoring `setTrace` notification"); + return Task::nothing(); } method => { tracing::warn!("Received notification {method} which does not have a handler."); @@ -96,71 +136,158 @@ pub(super) fn notification<'a>(notif: server::Notification) -> Task<'a> { } .unwrap_or_else(|err| { tracing::error!("Encountered error when routing notification: {err}"); - show_err_msg!( - "Ruff failed to handle a notification from the editor. Check the logs for more details." - ); - Task::nothing() + Task::sync(|_session, client| { + client.show_error_message( + "Ruff failed to handle a notification from the editor. Check the logs for more details." + ); + }) }) } -fn local_request_task<'a, R: traits::SyncRequestHandler>( - req: server::Request, -) -> super::Result> { +fn sync_request_task(req: server::Request) -> Result +where + <::RequestType as Request>::Params: UnwindSafe, +{ let (id, params) = cast_request::(req)?; - Ok(Task::local(|session, notifier, requester, responder| { - let _span = tracing::trace_span!("request", %id, method = R::METHOD).entered(); - let result = R::run(session, notifier, requester, params); - respond::(id, result, &responder); + Ok(Task::sync(move |session, client: &Client| { + let _span = tracing::debug_span!("request", %id, method = R::METHOD).entered(); + let result = R::run(session, client, params); + respond::(&id, result, client); })) } -fn background_request_task<'a, R: traits::BackgroundDocumentRequestHandler>( +fn background_request_task( req: server::Request, schedule: BackgroundSchedule, -) -> super::Result> { +) -> Result +where + <::RequestType as Request>::Params: UnwindSafe, +{ let (id, params) = cast_request::(req)?; + Ok(Task::background(schedule, move |session: &Session| { - // TODO(jane): we should log an error if we can't take a snapshot. + let cancellation_token = session + .request_queue() + .incoming() + .cancellation_token(&id) + .expect("request should have been tested for cancellation before scheduling"); + + let url = R::document_url(¶ms).into_owned(); + let Some(snapshot) = session.take_snapshot(R::document_url(¶ms).into_owned()) else { - return Box::new(|_, _| {}); + tracing::warn!("Ignoring request because snapshot for path `{url:?}` doesn't exist."); + return Box::new(|_| {}); }; - Box::new(move |notifier, responder| { - let _span = tracing::trace_span!("request", %id, method = R::METHOD).entered(); - let result = R::run_with_snapshot(snapshot, notifier, params); - respond::(id, result, &responder); + + Box::new(move |client| { + let _span = tracing::debug_span!("request", %id, method = R::METHOD).entered(); + + // Test again if the request was cancelled since it was scheduled on the background task + // and, if so, return early + if cancellation_token.is_cancelled() { + tracing::trace!( + "Ignoring request id={id} method={} because it was cancelled", + R::METHOD + ); + + // We don't need to send a response here because the `cancel` notification + // handler already responded with a message. + return; + } + + let result = + std::panic::catch_unwind(|| R::run_with_snapshot(snapshot, client, params)); + + let response = request_result_to_response::(result); + respond::(&id, response, client); }) })) } -fn local_notification_task<'a, N: traits::SyncNotificationHandler>( - notif: server::Notification, -) -> super::Result> { +fn request_result_to_response( + result: std::result::Result< + Result<<::RequestType as Request>::Result>, + Box, + >, +) -> Result<<::RequestType as Request>::Result> +where + R: BackgroundDocumentRequestHandler, +{ + match result { + Ok(response) => response, + + Err(error) => { + let message = if let Some(panic_message) = panic_message(&error) { + format!("Request handler failed with: {panic_message}") + } else { + "Request handler failed".into() + }; + + Err(Error { + code: lsp_server::ErrorCode::InternalError, + error: anyhow!(message), + }) + } + } +} + +fn sync_notification_task(notif: server::Notification) -> Result { let (id, params) = cast_notification::(notif)?; - Ok(Task::local(move |session, notifier, requester, _| { - let _span = tracing::trace_span!("notification", method = N::METHOD).entered(); - if let Err(err) = N::run(session, notifier, requester, params) { + Ok(Task::sync(move |session, client| { + let _span = tracing::debug_span!("notification", method = N::METHOD).entered(); + if let Err(err) = N::run(session, client, params) { tracing::error!("An error occurred while running {id}: {err}"); - show_err_msg!("Ruff encountered a problem. Check the logs for more details."); + client + .show_error_message("Ruff encountered a problem. Check the logs for more details."); } })) } #[expect(dead_code)] -fn background_notification_thread<'a, N: traits::BackgroundDocumentNotificationHandler>( +fn background_notification_thread( req: server::Notification, schedule: BackgroundSchedule, -) -> super::Result> { +) -> Result +where + N: BackgroundDocumentNotificationHandler, + <::NotificationType as Notification>::Params: UnwindSafe, +{ let (id, params) = cast_notification::(req)?; Ok(Task::background(schedule, move |session: &Session| { - // TODO(jane): we should log an error if we can't take a snapshot. - let Some(snapshot) = session.take_snapshot(N::document_url(¶ms).into_owned()) else { - return Box::new(|_, _| {}); + let url = N::document_url(¶ms); + + let Some(snapshot) = session.take_snapshot((*url).clone()) else { + tracing::debug!( + "Ignoring notification because snapshot for url `{url}` doesn't exist." + ); + return Box::new(|_| {}); }; - Box::new(move |notifier, _| { - let _span = tracing::trace_span!("notification", method = N::METHOD).entered(); - if let Err(err) = N::run_with_snapshot(snapshot, notifier, params) { + Box::new(move |client| { + let _span = tracing::debug_span!("notification", method = N::METHOD).entered(); + + let result = + match std::panic::catch_unwind(|| N::run_with_snapshot(snapshot, client, params)) { + Ok(result) => result, + Err(panic) => { + let message = if let Some(panic_message) = panic_message(&panic) { + format!("notification handler for {id} failed with: {panic_message}") + } else { + format!("notification handler for {id} failed") + }; + + tracing::error!(message); + client.show_error_message( + "Ruff encountered a panic. Check the logs for more details.", + ); + return; + } + }; + + if let Err(err) = result { tracing::error!("An error occurred while running {id}: {err}"); - show_err_msg!("Ruff encountered a problem. Check the logs for more details."); + client.show_error_message( + "Ruff encountered a problem. Check the logs for more details.", + ); } }) })) @@ -172,12 +299,13 @@ fn background_notification_thread<'a, N: traits::BackgroundDocumentNotificationH /// implementation. fn cast_request( request: server::Request, -) -> super::Result<( - server::RequestId, - <::RequestType as lsp_types::request::Request>::Params, +) -> Result<( + RequestId, + <::RequestType as Request>::Params, )> where - Req: traits::RequestHandler, + Req: RequestHandler, + <::RequestType as Request>::Params: UnwindSafe, { request .extract(Req::METHOD) @@ -193,21 +321,27 @@ where .with_failure_code(server::ErrorCode::InternalError) } -/// Sends back a response to the server using a [`Responder`]. +/// Sends back a response to the server, but only if the request wasn't cancelled. fn respond( - id: server::RequestId, - result: crate::server::Result< - <::RequestType as lsp_types::request::Request>::Result, - >, - responder: &Responder, + id: &RequestId, + result: Result<<::RequestType as Request>::Result>, + client: &Client, ) where - Req: traits::RequestHandler, + Req: RequestHandler, { if let Err(err) = &result { tracing::error!("An error occurred with request ID {id}: {err}"); - show_err_msg!("Ruff encountered a problem. Check the logs for more details."); + client.show_error_message("Ruff encountered a problem. Check the logs for more details."); } - if let Err(err) = responder.respond(id, result) { + if let Err(err) = client.respond(id, result) { + tracing::error!("Failed to send response: {err}"); + } +} + +/// Sends back an error response to the server using a [`Client`] without showing a warning +/// to the user. +fn respond_silent_error(id: RequestId, client: &Client, error: lsp_server::ResponseError) { + if let Err(err) = client.respond_err(id, error) { tracing::error!("Failed to send response: {err}"); } } @@ -216,11 +350,13 @@ fn respond( /// a parameter type for a specific request handler. fn cast_notification( notification: server::Notification, -) -> super::Result< - ( - &'static str, - <::NotificationType as lsp_types::notification::Notification>::Params, -)> where N: traits::NotificationHandler{ +) -> Result<( + &'static str, + <::NotificationType as Notification>::Params, +)> +where + N: NotificationHandler, +{ Ok(( N::METHOD, notification @@ -273,3 +409,15 @@ impl std::fmt::Display for Error { self.error.fmt(f) } } + +fn panic_message<'a>( + err: &'a Box, +) -> Option> { + if let Some(s) = err.downcast_ref::() { + Some(s.into()) + } else if let Some(&s) = err.downcast_ref::<&str>() { + Some(s.into()) + } else { + None + } +} diff --git a/crates/ruff_server/src/server/api/diagnostics.rs b/crates/ruff_server/src/server/api/diagnostics.rs index 5f0b9f468d..6f8efe47e8 100644 --- a/crates/ruff_server/src/server/api/diagnostics.rs +++ b/crates/ruff_server/src/server/api/diagnostics.rs @@ -1,7 +1,6 @@ use crate::{ lint::DiagnosticsMap, - server::client::Notifier, - session::{DocumentQuery, DocumentSnapshot}, + session::{Client, DocumentQuery, DocumentSnapshot}, }; use super::LSPResult; @@ -21,11 +20,11 @@ pub(super) fn generate_diagnostics(snapshot: &DocumentSnapshot) -> DiagnosticsMa pub(super) fn publish_diagnostics_for_document( snapshot: &DocumentSnapshot, - notifier: &Notifier, + client: &Client, ) -> crate::server::Result<()> { for (uri, diagnostics) in generate_diagnostics(snapshot) { - notifier - .notify::( + client + .send_notification::( lsp_types::PublishDiagnosticsParams { uri, diagnostics, @@ -40,10 +39,10 @@ pub(super) fn publish_diagnostics_for_document( pub(super) fn clear_diagnostics_for_document( query: &DocumentQuery, - notifier: &Notifier, + client: &Client, ) -> crate::server::Result<()> { - notifier - .notify::( + client + .send_notification::( lsp_types::PublishDiagnosticsParams { uri: query.make_key().into_url(), diagnostics: vec![], diff --git a/crates/ruff_server/src/server/api/notifications.rs b/crates/ruff_server/src/server/api/notifications.rs index ade0c2fbd5..d9a473d3fe 100644 --- a/crates/ruff_server/src/server/api/notifications.rs +++ b/crates/ruff_server/src/server/api/notifications.rs @@ -10,7 +10,8 @@ mod did_open; mod did_open_notebook; use super::traits::{NotificationHandler, SyncNotificationHandler}; -pub(super) use cancel::Cancel; + +pub(super) use cancel::CancelNotificationHandler; pub(super) use did_change::DidChange; pub(super) use did_change_configuration::DidChangeConfiguration; pub(super) use did_change_notebook::DidChangeNotebook; diff --git a/crates/ruff_server/src/server/api/notifications/cancel.rs b/crates/ruff_server/src/server/api/notifications/cancel.rs index a88fb30d1a..8555386678 100644 --- a/crates/ruff_server/src/server/api/notifications/cancel.rs +++ b/crates/ruff_server/src/server/api/notifications/cancel.rs @@ -1,23 +1,26 @@ +use lsp_server::RequestId; +use lsp_types::CancelParams; +use lsp_types::notification::Cancel; + use crate::server::Result; -use crate::server::client::{Notifier, Requester}; -use crate::session::Session; -use lsp_types as types; -use lsp_types::notification as notif; +use crate::server::api::traits::{NotificationHandler, SyncNotificationHandler}; +use crate::session::{Client, Session}; -pub(crate) struct Cancel; +pub(crate) struct CancelNotificationHandler; -impl super::NotificationHandler for Cancel { - type NotificationType = notif::Cancel; +impl NotificationHandler for CancelNotificationHandler { + type NotificationType = Cancel; } -impl super::SyncNotificationHandler for Cancel { - fn run( - _session: &mut Session, - _notifier: Notifier, - _requester: &mut Requester, - _params: types::CancelParams, - ) -> Result<()> { - // TODO(jane): Handle this once we have task cancellation in the scheduler. +impl SyncNotificationHandler for CancelNotificationHandler { + fn run(session: &mut Session, client: &Client, params: CancelParams) -> Result<()> { + let id: RequestId = match params.id { + lsp_types::NumberOrString::Number(id) => id.into(), + lsp_types::NumberOrString::String(id) => id.into(), + }; + + let _ = client.cancel(session, id); + Ok(()) } } diff --git a/crates/ruff_server/src/server/api/notifications/did_change.rs b/crates/ruff_server/src/server/api/notifications/did_change.rs index d9b7d7092d..8e77cb593f 100644 --- a/crates/ruff_server/src/server/api/notifications/did_change.rs +++ b/crates/ruff_server/src/server/api/notifications/did_change.rs @@ -1,8 +1,7 @@ use crate::server::Result; use crate::server::api::LSPResult; use crate::server::api::diagnostics::publish_diagnostics_for_document; -use crate::server::client::{Notifier, Requester}; -use crate::session::Session; +use crate::session::{Client, Session}; use lsp_server::ErrorCode; use lsp_types as types; use lsp_types::notification as notif; @@ -16,8 +15,7 @@ impl super::NotificationHandler for DidChange { impl super::SyncNotificationHandler for DidChange { fn run( session: &mut Session, - notifier: Notifier, - _requester: &mut Requester, + client: &Client, types::DidChangeTextDocumentParams { text_document: types::VersionedTextDocumentIdentifier { @@ -36,7 +34,7 @@ impl super::SyncNotificationHandler for DidChange { // Publish diagnostics if the client doesn't support pull diagnostics if !session.resolved_client_capabilities().pull_diagnostics { let snapshot = session.take_snapshot(key.into_url()).unwrap(); - publish_diagnostics_for_document(&snapshot, ¬ifier)?; + publish_diagnostics_for_document(&snapshot, client)?; } Ok(()) diff --git a/crates/ruff_server/src/server/api/notifications/did_change_configuration.rs b/crates/ruff_server/src/server/api/notifications/did_change_configuration.rs index ccfd30ddb0..34982ec656 100644 --- a/crates/ruff_server/src/server/api/notifications/did_change_configuration.rs +++ b/crates/ruff_server/src/server/api/notifications/did_change_configuration.rs @@ -1,6 +1,5 @@ use crate::server::Result; -use crate::server::client::{Notifier, Requester}; -use crate::session::Session; +use crate::session::{Client, Session}; use lsp_types as types; use lsp_types::notification as notif; @@ -13,8 +12,7 @@ impl super::NotificationHandler for DidChangeConfiguration { impl super::SyncNotificationHandler for DidChangeConfiguration { fn run( _session: &mut Session, - _notifier: Notifier, - _requester: &mut Requester, + _client: &Client, _params: types::DidChangeConfigurationParams, ) -> Result<()> { // TODO(jane): get this wired up after the pre-release diff --git a/crates/ruff_server/src/server/api/notifications/did_change_notebook.rs b/crates/ruff_server/src/server/api/notifications/did_change_notebook.rs index d96b4ea97a..d092ccacb8 100644 --- a/crates/ruff_server/src/server/api/notifications/did_change_notebook.rs +++ b/crates/ruff_server/src/server/api/notifications/did_change_notebook.rs @@ -1,8 +1,7 @@ use crate::server::Result; use crate::server::api::LSPResult; use crate::server::api::diagnostics::publish_diagnostics_for_document; -use crate::server::client::{Notifier, Requester}; -use crate::session::Session; +use crate::session::{Client, Session}; use lsp_server::ErrorCode; use lsp_types as types; use lsp_types::notification as notif; @@ -16,8 +15,7 @@ impl super::NotificationHandler for DidChangeNotebook { impl super::SyncNotificationHandler for DidChangeNotebook { fn run( session: &mut Session, - notifier: Notifier, - _requester: &mut Requester, + client: &Client, types::DidChangeNotebookDocumentParams { notebook_document: types::VersionedNotebookDocumentIdentifier { uri, version }, change: types::NotebookDocumentChangeEvent { cells, metadata }, @@ -32,7 +30,7 @@ impl super::SyncNotificationHandler for DidChangeNotebook { let snapshot = session .take_snapshot(key.into_url()) .expect("snapshot should be available"); - publish_diagnostics_for_document(&snapshot, ¬ifier)?; + publish_diagnostics_for_document(&snapshot, client)?; Ok(()) } diff --git a/crates/ruff_server/src/server/api/notifications/did_change_watched_files.rs b/crates/ruff_server/src/server/api/notifications/did_change_watched_files.rs index a05ee1f272..bc97231411 100644 --- a/crates/ruff_server/src/server/api/notifications/did_change_watched_files.rs +++ b/crates/ruff_server/src/server/api/notifications/did_change_watched_files.rs @@ -1,9 +1,7 @@ use crate::server::Result; use crate::server::api::LSPResult; use crate::server::api::diagnostics::publish_diagnostics_for_document; -use crate::server::client::{Notifier, Requester}; -use crate::server::schedule::Task; -use crate::session::Session; +use crate::session::{Client, Session}; use lsp_types as types; use lsp_types::notification as notif; @@ -16,16 +14,19 @@ impl super::NotificationHandler for DidChangeWatchedFiles { impl super::SyncNotificationHandler for DidChangeWatchedFiles { fn run( session: &mut Session, - notifier: Notifier, - requester: &mut Requester, + client: &Client, params: types::DidChangeWatchedFilesParams, ) -> Result<()> { - session.reload_settings(¶ms.changes); + session.reload_settings(¶ms.changes, client); if !params.changes.is_empty() { if session.resolved_client_capabilities().workspace_refresh { - requester - .request::((), |()| Task::nothing()) + client + .send_request::( + session, + (), + |_, ()| (), + ) .with_failure_code(lsp_server::ErrorCode::InternalError)?; } else { // publish diagnostics for text documents @@ -33,7 +34,7 @@ impl super::SyncNotificationHandler for DidChangeWatchedFiles { let snapshot = session .take_snapshot(url.clone()) .expect("snapshot should be available"); - publish_diagnostics_for_document(&snapshot, ¬ifier)?; + publish_diagnostics_for_document(&snapshot, client)?; } } @@ -42,7 +43,7 @@ impl super::SyncNotificationHandler for DidChangeWatchedFiles { let snapshot = session .take_snapshot(url.clone()) .expect("snapshot should be available"); - publish_diagnostics_for_document(&snapshot, ¬ifier)?; + publish_diagnostics_for_document(&snapshot, client)?; } } diff --git a/crates/ruff_server/src/server/api/notifications/did_change_workspace.rs b/crates/ruff_server/src/server/api/notifications/did_change_workspace.rs index 2e9dd7cb1d..a121d1b2b4 100644 --- a/crates/ruff_server/src/server/api/notifications/did_change_workspace.rs +++ b/crates/ruff_server/src/server/api/notifications/did_change_workspace.rs @@ -1,7 +1,6 @@ use crate::server::Result; use crate::server::api::LSPResult; -use crate::server::client::{Notifier, Requester}; -use crate::session::Session; +use crate::session::{Client, Session}; use lsp_types as types; use lsp_types::notification as notif; @@ -14,13 +13,12 @@ impl super::NotificationHandler for DidChangeWorkspace { impl super::SyncNotificationHandler for DidChangeWorkspace { fn run( session: &mut Session, - _notifier: Notifier, - _requester: &mut Requester, + client: &Client, params: types::DidChangeWorkspaceFoldersParams, ) -> Result<()> { for types::WorkspaceFolder { uri, .. } in params.event.added { session - .open_workspace_folder(uri) + .open_workspace_folder(uri, client) .with_failure_code(lsp_server::ErrorCode::InvalidParams)?; } for types::WorkspaceFolder { uri, .. } in params.event.removed { diff --git a/crates/ruff_server/src/server/api/notifications/did_close.rs b/crates/ruff_server/src/server/api/notifications/did_close.rs index 1448243f2b..a3075a4846 100644 --- a/crates/ruff_server/src/server/api/notifications/did_close.rs +++ b/crates/ruff_server/src/server/api/notifications/did_close.rs @@ -1,8 +1,7 @@ use crate::server::Result; use crate::server::api::LSPResult; use crate::server::api::diagnostics::clear_diagnostics_for_document; -use crate::server::client::{Notifier, Requester}; -use crate::session::Session; +use crate::session::{Client, Session}; use lsp_types as types; use lsp_types::notification as notif; @@ -15,8 +14,7 @@ impl super::NotificationHandler for DidClose { impl super::SyncNotificationHandler for DidClose { fn run( session: &mut Session, - notifier: Notifier, - _requester: &mut Requester, + client: &Client, types::DidCloseTextDocumentParams { text_document: types::TextDocumentIdentifier { uri }, }: types::DidCloseTextDocumentParams, @@ -29,7 +27,7 @@ impl super::SyncNotificationHandler for DidClose { ); return Ok(()); }; - clear_diagnostics_for_document(snapshot.query(), ¬ifier)?; + clear_diagnostics_for_document(snapshot.query(), client)?; session .close_document(&key) diff --git a/crates/ruff_server/src/server/api/notifications/did_close_notebook.rs b/crates/ruff_server/src/server/api/notifications/did_close_notebook.rs index e675a4ddbe..b70993b5e1 100644 --- a/crates/ruff_server/src/server/api/notifications/did_close_notebook.rs +++ b/crates/ruff_server/src/server/api/notifications/did_close_notebook.rs @@ -1,7 +1,6 @@ use crate::server::Result; use crate::server::api::LSPResult; -use crate::server::client::{Notifier, Requester}; -use crate::session::Session; +use crate::session::{Client, Session}; use lsp_types::notification as notif; use lsp_types::{self as types, NotebookDocumentIdentifier}; @@ -14,8 +13,7 @@ impl super::NotificationHandler for DidCloseNotebook { impl super::SyncNotificationHandler for DidCloseNotebook { fn run( session: &mut Session, - _notifier: Notifier, - _requester: &mut Requester, + _client: &Client, types::DidCloseNotebookDocumentParams { notebook_document: NotebookDocumentIdentifier { uri }, .. diff --git a/crates/ruff_server/src/server/api/notifications/did_open.rs b/crates/ruff_server/src/server/api/notifications/did_open.rs index 6d8e7b4b11..41a6fb6cf8 100644 --- a/crates/ruff_server/src/server/api/notifications/did_open.rs +++ b/crates/ruff_server/src/server/api/notifications/did_open.rs @@ -2,8 +2,7 @@ use crate::TextDocument; use crate::server::Result; use crate::server::api::LSPResult; use crate::server::api::diagnostics::publish_diagnostics_for_document; -use crate::server::client::{Notifier, Requester}; -use crate::session::Session; +use crate::session::{Client, Session}; use lsp_types as types; use lsp_types::notification as notif; @@ -16,8 +15,7 @@ impl super::NotificationHandler for DidOpen { impl super::SyncNotificationHandler for DidOpen { fn run( session: &mut Session, - notifier: Notifier, - _requester: &mut Requester, + client: &Client, types::DidOpenTextDocumentParams { text_document: types::TextDocumentItem { @@ -40,7 +38,7 @@ impl super::SyncNotificationHandler for DidOpen { anyhow::anyhow!("Unable to take snapshot for document with URL {uri}") }) .with_failure_code(lsp_server::ErrorCode::InternalError)?; - publish_diagnostics_for_document(&snapshot, ¬ifier)?; + publish_diagnostics_for_document(&snapshot, client)?; } Ok(()) diff --git a/crates/ruff_server/src/server/api/notifications/did_open_notebook.rs b/crates/ruff_server/src/server/api/notifications/did_open_notebook.rs index d8cda99c42..a75e88ecc5 100644 --- a/crates/ruff_server/src/server/api/notifications/did_open_notebook.rs +++ b/crates/ruff_server/src/server/api/notifications/did_open_notebook.rs @@ -2,8 +2,7 @@ use crate::edit::NotebookDocument; use crate::server::Result; use crate::server::api::LSPResult; use crate::server::api::diagnostics::publish_diagnostics_for_document; -use crate::server::client::{Notifier, Requester}; -use crate::session::Session; +use crate::session::{Client, Session}; use lsp_server::ErrorCode; use lsp_types as types; use lsp_types::notification as notif; @@ -17,8 +16,7 @@ impl super::NotificationHandler for DidOpenNotebook { impl super::SyncNotificationHandler for DidOpenNotebook { fn run( session: &mut Session, - notifier: Notifier, - _requester: &mut Requester, + client: &Client, types::DidOpenNotebookDocumentParams { notebook_document: types::NotebookDocument { @@ -45,7 +43,7 @@ impl super::SyncNotificationHandler for DidOpenNotebook { let snapshot = session .take_snapshot(uri) .expect("snapshot should be available"); - publish_diagnostics_for_document(&snapshot, ¬ifier)?; + publish_diagnostics_for_document(&snapshot, client)?; Ok(()) } diff --git a/crates/ruff_server/src/server/api/requests.rs b/crates/ruff_server/src/server/api/requests.rs index 049f396f63..198ce4fe61 100644 --- a/crates/ruff_server/src/server/api/requests.rs +++ b/crates/ruff_server/src/server/api/requests.rs @@ -5,6 +5,7 @@ mod execute_command; mod format; mod format_range; mod hover; +mod shutdown; use super::{ define_document_url, @@ -17,5 +18,6 @@ pub(super) use execute_command::ExecuteCommand; pub(super) use format::Format; pub(super) use format_range::FormatRange; pub(super) use hover::Hover; +pub(super) use shutdown::ShutdownHandler; type FormatResponse = Option>; diff --git a/crates/ruff_server/src/server/api/requests/code_action.rs b/crates/ruff_server/src/server/api/requests/code_action.rs index 8edd92ba64..b39543a773 100644 --- a/crates/ruff_server/src/server/api/requests/code_action.rs +++ b/crates/ruff_server/src/server/api/requests/code_action.rs @@ -6,10 +6,10 @@ use types::{CodeActionKind, CodeActionOrCommand}; use crate::DIAGNOSTIC_NAME; use crate::edit::WorkspaceEditTracker; use crate::lint::{DiagnosticFix, fixes_for_diagnostics}; +use crate::server::Result; use crate::server::SupportedCodeAction; use crate::server::api::LSPResult; -use crate::server::{Result, client::Notifier}; -use crate::session::DocumentSnapshot; +use crate::session::{Client, DocumentSnapshot}; use super::code_action_resolve::{resolve_edit_for_fix_all, resolve_edit_for_organize_imports}; @@ -23,7 +23,7 @@ impl super::BackgroundDocumentRequestHandler for CodeActions { super::define_document_url!(params: &types::CodeActionParams); fn run_with_snapshot( snapshot: DocumentSnapshot, - _notifier: Notifier, + _client: &Client, params: types::CodeActionParams, ) -> Result> { let mut response: types::CodeActionResponse = types::CodeActionResponse::default(); diff --git a/crates/ruff_server/src/server/api/requests/code_action_resolve.rs b/crates/ruff_server/src/server/api/requests/code_action_resolve.rs index e77ef8092e..ea5691c9d0 100644 --- a/crates/ruff_server/src/server/api/requests/code_action_resolve.rs +++ b/crates/ruff_server/src/server/api/requests/code_action_resolve.rs @@ -8,9 +8,10 @@ use ruff_linter::codes::Rule; use crate::PositionEncoding; use crate::edit::WorkspaceEditTracker; use crate::fix::Fixes; +use crate::server::Result; use crate::server::SupportedCodeAction; use crate::server::api::LSPResult; -use crate::server::{Result, client::Notifier}; +use crate::session::Client; use crate::session::{DocumentQuery, DocumentSnapshot, ResolvedClientCapabilities}; pub(crate) struct CodeActionResolve; @@ -27,7 +28,7 @@ impl super::BackgroundDocumentRequestHandler for CodeActionResolve { } fn run_with_snapshot( snapshot: DocumentSnapshot, - _notifier: Notifier, + _client: &Client, mut action: types::CodeAction, ) -> Result { let query = snapshot.query(); diff --git a/crates/ruff_server/src/server/api/requests/diagnostic.rs b/crates/ruff_server/src/server/api/requests/diagnostic.rs index 48d5783b1e..5315ea931b 100644 --- a/crates/ruff_server/src/server/api/requests/diagnostic.rs +++ b/crates/ruff_server/src/server/api/requests/diagnostic.rs @@ -1,6 +1,6 @@ use crate::server::api::diagnostics::generate_diagnostics; -use crate::server::{Result, client::Notifier}; use crate::session::DocumentSnapshot; +use crate::{server::Result, session::Client}; use lsp_types::{self as types, request as req}; use types::{ DocumentDiagnosticReportResult, FullDocumentDiagnosticReport, @@ -17,7 +17,7 @@ impl super::BackgroundDocumentRequestHandler for DocumentDiagnostic { super::define_document_url!(params: &types::DocumentDiagnosticParams); fn run_with_snapshot( snapshot: DocumentSnapshot, - _notifier: Notifier, + _client: &Client, _params: types::DocumentDiagnosticParams, ) -> Result { Ok(DocumentDiagnosticReportResult::Report( diff --git a/crates/ruff_server/src/server/api/requests/execute_command.rs b/crates/ruff_server/src/server/api/requests/execute_command.rs index 600a65fb78..1303e0ee14 100644 --- a/crates/ruff_server/src/server/api/requests/execute_command.rs +++ b/crates/ruff_server/src/server/api/requests/execute_command.rs @@ -2,10 +2,9 @@ use std::fmt::Write; use std::str::FromStr; use crate::edit::WorkspaceEditTracker; +use crate::server::SupportedCommand; use crate::server::api::LSPResult; -use crate::server::schedule::Task; -use crate::server::{SupportedCommand, client}; -use crate::session::Session; +use crate::session::{Client, Session}; use crate::{DIAGNOSTIC_NAME, DocumentKey}; use crate::{edit::DocumentVersion, server}; use lsp_server::ErrorCode; @@ -38,8 +37,7 @@ impl super::RequestHandler for ExecuteCommand { impl super::SyncRequestHandler for ExecuteCommand { fn run( session: &mut Session, - _notifier: client::Notifier, - requester: &mut client::Requester, + client: &Client, params: types::ExecuteCommandParams, ) -> server::Result> { let command = SupportedCommand::from_str(¶ms.command) @@ -76,7 +74,7 @@ impl super::SyncRequestHandler for ExecuteCommand { for Argument { uri, version } in arguments { let Some(snapshot) = session.take_snapshot(uri.clone()) else { tracing::error!("Document at {uri} could not be opened"); - show_err_msg!("Ruff does not recognize this file"); + client.show_error_message("Ruff does not recognize this file"); return Ok(None); }; match command { @@ -114,7 +112,8 @@ impl super::SyncRequestHandler for ExecuteCommand { if !edit_tracker.is_empty() { apply_edit( - requester, + session, + client, command.label(), edit_tracker.into_workspace_edit(), ) @@ -126,24 +125,25 @@ impl super::SyncRequestHandler for ExecuteCommand { } fn apply_edit( - requester: &mut client::Requester, + session: &mut Session, + client: &Client, label: &str, edit: types::WorkspaceEdit, ) -> crate::Result<()> { - requester.request::( + client.send_request::( + session, types::ApplyWorkspaceEditParams { label: Some(format!("{DIAGNOSTIC_NAME}: {label}")), edit, }, - |response| { + move |client, response| { if !response.applied { let reason = response .failure_reason .unwrap_or_else(|| String::from("unspecified reason")); tracing::error!("Failed to apply workspace edit: {reason}"); - show_err_msg!("Ruff was unable to apply edits: {reason}"); + client.show_error_message(format_args!("Ruff was unable to apply edits: {reason}")); } - Task::nothing() }, ) } diff --git a/crates/ruff_server/src/server/api/requests/format.rs b/crates/ruff_server/src/server/api/requests/format.rs index e4fbc86ab1..2203d5381d 100644 --- a/crates/ruff_server/src/server/api/requests/format.rs +++ b/crates/ruff_server/src/server/api/requests/format.rs @@ -7,9 +7,9 @@ use ruff_source_file::LineIndex; use crate::edit::{Replacement, ToRangeExt}; use crate::fix::Fixes; use crate::resolve::is_document_excluded_for_formatting; +use crate::server::Result; use crate::server::api::LSPResult; -use crate::server::{Result, client::Notifier}; -use crate::session::{DocumentQuery, DocumentSnapshot}; +use crate::session::{Client, DocumentQuery, DocumentSnapshot}; use crate::{PositionEncoding, TextDocument}; pub(crate) struct Format; @@ -22,7 +22,7 @@ impl super::BackgroundDocumentRequestHandler for Format { super::define_document_url!(params: &types::DocumentFormattingParams); fn run_with_snapshot( snapshot: DocumentSnapshot, - _notifier: Notifier, + _client: &Client, _params: types::DocumentFormattingParams, ) -> Result { format_document(&snapshot) diff --git a/crates/ruff_server/src/server/api/requests/format_range.rs b/crates/ruff_server/src/server/api/requests/format_range.rs index 5f18af9737..e8cb4d56fc 100644 --- a/crates/ruff_server/src/server/api/requests/format_range.rs +++ b/crates/ruff_server/src/server/api/requests/format_range.rs @@ -3,9 +3,9 @@ use lsp_types::{self as types, Range, request as req}; use crate::edit::{RangeExt, ToRangeExt}; use crate::resolve::is_document_excluded_for_formatting; +use crate::server::Result; use crate::server::api::LSPResult; -use crate::server::{Result, client::Notifier}; -use crate::session::{DocumentQuery, DocumentSnapshot}; +use crate::session::{Client, DocumentQuery, DocumentSnapshot}; use crate::{PositionEncoding, TextDocument}; pub(crate) struct FormatRange; @@ -18,7 +18,7 @@ impl super::BackgroundDocumentRequestHandler for FormatRange { super::define_document_url!(params: &types::DocumentRangeFormattingParams); fn run_with_snapshot( snapshot: DocumentSnapshot, - _notifier: Notifier, + _client: &Client, params: types::DocumentRangeFormattingParams, ) -> Result { format_document_range(&snapshot, params.range) diff --git a/crates/ruff_server/src/server/api/requests/hover.rs b/crates/ruff_server/src/server/api/requests/hover.rs index 846f3654c5..266234e868 100644 --- a/crates/ruff_server/src/server/api/requests/hover.rs +++ b/crates/ruff_server/src/server/api/requests/hover.rs @@ -1,5 +1,5 @@ -use crate::server::{Result, client::Notifier}; -use crate::session::DocumentSnapshot; +use crate::server::Result; +use crate::session::{Client, DocumentSnapshot}; use anyhow::Context; use lsp_types::{self as types, request as req}; use regex::Regex; @@ -20,7 +20,7 @@ impl super::BackgroundDocumentRequestHandler for Hover { } fn run_with_snapshot( snapshot: DocumentSnapshot, - _notifier: Notifier, + _client: &Client, params: types::HoverParams, ) -> Result> { Ok(hover(&snapshot, ¶ms.text_document_position_params)) diff --git a/crates/ruff_server/src/server/api/requests/shutdown.rs b/crates/ruff_server/src/server/api/requests/shutdown.rs new file mode 100644 index 0000000000..5ec89c7932 --- /dev/null +++ b/crates/ruff_server/src/server/api/requests/shutdown.rs @@ -0,0 +1,17 @@ +use crate::Session; +use crate::server::api::traits::{RequestHandler, SyncRequestHandler}; +use crate::session::Client; + +pub(crate) struct ShutdownHandler; + +impl RequestHandler for ShutdownHandler { + type RequestType = lsp_types::request::Shutdown; +} + +impl SyncRequestHandler for ShutdownHandler { + fn run(session: &mut Session, _client: &Client, _params: ()) -> crate::server::Result<()> { + tracing::debug!("Received shutdown request, waiting for shutdown notification"); + session.set_shutdown_requested(true); + Ok(()) + } +} diff --git a/crates/ruff_server/src/server/api/traits.rs b/crates/ruff_server/src/server/api/traits.rs index 3c8e56b7a2..8d70c5ff67 100644 --- a/crates/ruff_server/src/server/api/traits.rs +++ b/crates/ruff_server/src/server/api/traits.rs @@ -1,7 +1,6 @@ //! A stateful LSP implementation that calls into the Ruff API. -use crate::server::client::{Notifier, Requester}; -use crate::session::{DocumentSnapshot, Session}; +use crate::session::{Client, DocumentSnapshot, Session}; use lsp_types::notification::Notification as LSPNotification; use lsp_types::request::Request; @@ -19,8 +18,7 @@ pub(super) trait RequestHandler { pub(super) trait SyncRequestHandler: RequestHandler { fn run( session: &mut Session, - notifier: Notifier, - requester: &mut Requester, + client: &Client, params: <::RequestType as Request>::Params, ) -> super::Result<<::RequestType as Request>::Result>; } @@ -36,7 +34,7 @@ pub(super) trait BackgroundDocumentRequestHandler: RequestHandler { fn run_with_snapshot( snapshot: DocumentSnapshot, - notifier: Notifier, + client: &Client, params: <::RequestType as Request>::Params, ) -> super::Result<<::RequestType as Request>::Result>; } @@ -55,8 +53,7 @@ pub(super) trait NotificationHandler { pub(super) trait SyncNotificationHandler: NotificationHandler { fn run( session: &mut Session, - notifier: Notifier, - requester: &mut Requester, + client: &Client, params: <::NotificationType as LSPNotification>::Params, ) -> super::Result<()>; } @@ -72,7 +69,7 @@ pub(super) trait BackgroundDocumentNotificationHandler: NotificationHandler { fn run_with_snapshot( snapshot: DocumentSnapshot, - notifier: Notifier, + client: &Client, params: <::NotificationType as LSPNotification>::Params, ) -> super::Result<()>; } diff --git a/crates/ruff_server/src/server/client.rs b/crates/ruff_server/src/server/client.rs deleted file mode 100644 index e136bc98d4..0000000000 --- a/crates/ruff_server/src/server/client.rs +++ /dev/null @@ -1,169 +0,0 @@ -use std::any::TypeId; - -use lsp_server::{Notification, RequestId}; -use rustc_hash::FxHashMap; -use serde_json::Value; - -use super::{ClientSender, schedule::Task}; - -type ResponseBuilder<'s> = Box Task<'s>>; - -pub(crate) struct Client<'s> { - notifier: Notifier, - responder: Responder, - pub(super) requester: Requester<'s>, -} - -#[derive(Clone)] -pub(crate) struct Notifier(ClientSender); - -#[derive(Clone)] -pub(crate) struct Responder(ClientSender); - -pub(crate) struct Requester<'s> { - sender: ClientSender, - next_request_id: i32, - response_handlers: FxHashMap>, -} - -impl Client<'_> { - pub(super) fn new(sender: ClientSender) -> Self { - Self { - notifier: Notifier(sender.clone()), - responder: Responder(sender.clone()), - requester: Requester { - sender, - next_request_id: 1, - response_handlers: FxHashMap::default(), - }, - } - } - - pub(super) fn notifier(&self) -> Notifier { - self.notifier.clone() - } - - pub(super) fn responder(&self) -> Responder { - self.responder.clone() - } -} - -#[expect(dead_code)] // we'll need to use `Notifier` in the future -impl Notifier { - pub(crate) fn notify(&self, params: N::Params) -> crate::Result<()> - where - N: lsp_types::notification::Notification, - { - let method = N::METHOD.to_string(); - - let message = lsp_server::Message::Notification(Notification::new(method, params)); - - self.0.send(message) - } - - pub(crate) fn notify_method(&self, method: String) -> crate::Result<()> { - self.0 - .send(lsp_server::Message::Notification(Notification::new( - method, - Value::Null, - ))) - } -} - -impl Responder { - pub(crate) fn respond( - &self, - id: RequestId, - result: crate::server::Result, - ) -> crate::Result<()> - where - R: serde::Serialize, - { - self.0.send( - match result { - Ok(res) => lsp_server::Response::new_ok(id, res), - Err(crate::server::api::Error { code, error }) => { - lsp_server::Response::new_err(id, code as i32, format!("{error}")) - } - } - .into(), - ) - } -} - -impl<'s> Requester<'s> { - /// Sends a request of kind `R` to the client, with associated parameters. - /// The task provided by `response_handler` will be dispatched as soon as the response - /// comes back from the client. - pub(crate) fn request( - &mut self, - params: R::Params, - response_handler: impl Fn(R::Result) -> Task<'s> + 'static, - ) -> crate::Result<()> - where - R: lsp_types::request::Request, - { - let serialized_params = serde_json::to_value(params)?; - - self.response_handlers.insert( - self.next_request_id.into(), - Box::new(move |response: lsp_server::Response| { - match (response.error, response.result) { - (Some(err), _) => { - tracing::error!( - "Got an error from the client (code {}): {}", - err.code, - err.message - ); - Task::nothing() - } - (None, Some(response)) => match serde_json::from_value(response) { - Ok(response) => response_handler(response), - Err(error) => { - tracing::error!("Failed to deserialize response from server: {error}"); - Task::nothing() - } - }, - (None, None) => { - if TypeId::of::() == TypeId::of::<()>() { - // We can't call `response_handler(())` directly here, but - // since we _know_ the type expected is `()`, we can use - // `from_value(Value::Null)`. `R::Result` implements `DeserializeOwned`, - // so this branch works in the general case but we'll only - // hit it if the concrete type is `()`, so the `unwrap()` is safe here. - response_handler(serde_json::from_value(Value::Null).unwrap()); - } else { - tracing::error!( - "Server response was invalid: did not contain a result or error" - ); - } - Task::nothing() - } - } - }), - ); - - self.sender - .send(lsp_server::Message::Request(lsp_server::Request { - id: self.next_request_id.into(), - method: R::METHOD.into(), - params: serialized_params, - }))?; - - self.next_request_id += 1; - - Ok(()) - } - - pub(crate) fn pop_response_task(&mut self, response: lsp_server::Response) -> Task<'s> { - if let Some(handler) = self.response_handlers.remove(&response.id) { - handler(response) - } else { - tracing::error!( - "Received a response with ID {}, which was not expected", - response.id - ); - Task::nothing() - } - } -} diff --git a/crates/ruff_server/src/server/connection.rs b/crates/ruff_server/src/server/connection.rs index 029a34c931..4993d2ba6c 100644 --- a/crates/ruff_server/src/server/connection.rs +++ b/crates/ruff_server/src/server/connection.rs @@ -1,31 +1,17 @@ use lsp_server as lsp; -use lsp_types::{notification::Notification, request::Request}; -use std::sync::{Arc, Weak}; -type ConnectionSender = crossbeam::channel::Sender; -type ConnectionReceiver = crossbeam::channel::Receiver; +pub type ConnectionSender = crossbeam::channel::Sender; /// A builder for `Connection` that handles LSP initialization. pub(crate) struct ConnectionInitializer { connection: lsp::Connection, - threads: lsp::IoThreads, -} - -/// Handles inbound and outbound messages with the client. -pub(crate) struct Connection { - sender: Arc, - receiver: ConnectionReceiver, - threads: lsp::IoThreads, } impl ConnectionInitializer { /// Create a new LSP server connection over stdin/stdout. - pub(super) fn stdio() -> Self { + pub(crate) fn stdio() -> (Self, lsp::IoThreads) { let (connection, threads) = lsp::Connection::stdio(); - Self { - connection, - threads, - } + (Self { connection }, threads) } /// Starts the initialization process with the client by listening for an initialization request. @@ -46,7 +32,7 @@ impl ConnectionInitializer { server_capabilities: &lsp_types::ServerCapabilities, name: &str, version: &str, - ) -> crate::Result { + ) -> crate::Result { self.connection.initialize_finish( id, serde_json::json!({ @@ -57,111 +43,6 @@ impl ConnectionInitializer { } }), )?; - let Self { - connection: lsp::Connection { sender, receiver }, - threads, - } = self; - Ok(Connection { - sender: Arc::new(sender), - receiver, - threads, - }) - } -} - -impl Connection { - /// Make a new `ClientSender` for sending messages to the client. - pub(super) fn make_sender(&self) -> ClientSender { - ClientSender { - weak_sender: Arc::downgrade(&self.sender), - } - } - - /// An iterator over incoming messages from the client. - pub(super) fn incoming(&self) -> crossbeam::channel::Iter { - self.receiver.iter() - } - - /// Check and respond to any incoming shutdown requests; returns`true` if the server should be shutdown. - pub(super) fn handle_shutdown(&self, message: &lsp::Message) -> crate::Result { - match message { - lsp::Message::Request(lsp::Request { id, method, .. }) - if method == lsp_types::request::Shutdown::METHOD => - { - self.sender - .send(lsp::Response::new_ok(id.clone(), ()).into())?; - tracing::info!("Shutdown request received. Waiting for an exit notification..."); - - loop { - match &self - .receiver - .recv_timeout(std::time::Duration::from_secs(30))? - { - lsp::Message::Notification(lsp::Notification { method, .. }) - if method == lsp_types::notification::Exit::METHOD => - { - tracing::info!("Exit notification received. Server shutting down..."); - return Ok(true); - } - lsp::Message::Request(lsp::Request { id, method, .. }) => { - tracing::warn!( - "Server received unexpected request {method} ({id}) while waiting for exit notification", - ); - self.sender.send(lsp::Message::Response(lsp::Response::new_err( - id.clone(), - lsp::ErrorCode::InvalidRequest as i32, - "Server received unexpected request while waiting for exit notification".to_string(), - )))?; - } - message => { - tracing::warn!( - "Server received unexpected message while waiting for exit notification: {message:?}" - ); - } - } - } - } - lsp::Message::Notification(lsp::Notification { method, .. }) - if method == lsp_types::notification::Exit::METHOD => - { - anyhow::bail!( - "Server received an exit notification before a shutdown request was sent. Exiting..." - ); - } - _ => Ok(false), - } - } - - /// Join the I/O threads that underpin this connection. - /// This is guaranteed to be nearly immediate since - /// we close the only active channels to these threads prior - /// to joining them. - pub(super) fn close(self) -> crate::Result<()> { - std::mem::drop( - Arc::into_inner(self.sender) - .expect("the client sender shouldn't have more than one strong reference"), - ); - std::mem::drop(self.receiver); - self.threads.join()?; - Ok(()) - } -} - -/// A weak reference to an underlying sender channel, used for communication with the client. -/// If the `Connection` that created this `ClientSender` is dropped, any `send` calls will throw -/// an error. -#[derive(Clone, Debug)] -pub(crate) struct ClientSender { - weak_sender: Weak, -} - -// note: additional wrapper functions for senders may be implemented as needed. -impl ClientSender { - pub(crate) fn send(&self, msg: lsp::Message) -> crate::Result<()> { - let Some(sender) = self.weak_sender.upgrade() else { - anyhow::bail!("The connection with the client has been closed"); - }; - - Ok(sender.send(msg)?) + Ok(self.connection) } } diff --git a/crates/ruff_server/src/server/main_loop.rs b/crates/ruff_server/src/server/main_loop.rs new file mode 100644 index 0000000000..b5943ad3db --- /dev/null +++ b/crates/ruff_server/src/server/main_loop.rs @@ -0,0 +1,209 @@ +use anyhow::anyhow; +use crossbeam::select; +use lsp_server::Message; +use lsp_types::{ + self as types, DidChangeWatchedFilesRegistrationOptions, FileSystemWatcher, + notification::Notification as _, +}; + +use crate::{ + Server, + server::{api, schedule}, + session::Client, +}; + +pub type MainLoopSender = crossbeam::channel::Sender; +pub(crate) type MainLoopReceiver = crossbeam::channel::Receiver; + +impl Server { + pub(super) fn main_loop(&mut self) -> crate::Result<()> { + self.initialize(&Client::new( + self.main_loop_sender.clone(), + self.connection.sender.clone(), + )); + + let mut scheduler = schedule::Scheduler::new(self.worker_threads); + + while let Ok(next_event) = self.next_event() { + let Some(next_event) = next_event else { + anyhow::bail!("client exited without proper shutdown sequence"); + }; + + match next_event { + Event::Message(msg) => { + let client = Client::new( + self.main_loop_sender.clone(), + self.connection.sender.clone(), + ); + + let task = match msg { + Message::Request(req) => { + self.session + .request_queue_mut() + .incoming_mut() + .register(req.id.clone(), req.method.clone()); + + if self.session.is_shutdown_requested() { + tracing::warn!( + "Received request after server shutdown was requested, discarding" + ); + client.respond_err( + req.id, + lsp_server::ResponseError { + code: lsp_server::ErrorCode::InvalidRequest as i32, + message: "Shutdown already requested".to_owned(), + data: None, + }, + )?; + continue; + } + + api::request(req) + } + Message::Notification(notification) => { + if notification.method == lsp_types::notification::Exit::METHOD { + if !self.session.is_shutdown_requested() { + return Err(anyhow!( + "Received exit notification before a shutdown request" + )); + } + + tracing::debug!("Received exit notification, exiting"); + return Ok(()); + } + + api::notification(notification) + } + + // Handle the response from the client to a server request + Message::Response(response) => { + if let Some(handler) = self + .session + .request_queue_mut() + .outgoing_mut() + .complete(&response.id) + { + handler(&client, response); + } else { + tracing::error!( + "Received a response with ID {}, which was not expected", + response.id + ); + } + + continue; + } + }; + + scheduler.dispatch(task, &mut self.session, client); + } + Event::SendResponse(response) => { + // Filter out responses for already canceled requests. + if let Some((start_time, method)) = self + .session + .request_queue_mut() + .incoming_mut() + .complete(&response.id) + { + let duration = start_time.elapsed(); + tracing::trace!(name: "message response", method, %response.id, duration = format_args!("{:0.2?}", duration)); + + self.connection.sender.send(Message::Response(response))?; + } else { + tracing::trace!( + "Ignoring response for canceled request id={}", + response.id + ); + } + } + } + } + + Ok(()) + } + + /// Waits for the next message from the client or action. + /// + /// Returns `Ok(None)` if the client connection is closed. + fn next_event(&self) -> Result, crossbeam::channel::RecvError> { + select!( + recv(self.connection.receiver) -> msg => { + // Ignore disconnect errors, they're handled by the main loop (it will exit). + Ok(msg.ok().map(Event::Message)) + }, + recv(self.main_loop_receiver) -> event => event.map(Some), + ) + } + + fn initialize(&mut self, client: &Client) { + let dynamic_registration = self + .client_capabilities + .workspace + .as_ref() + .and_then(|workspace| workspace.did_change_watched_files) + .and_then(|watched_files| watched_files.dynamic_registration) + .unwrap_or_default(); + if dynamic_registration { + // Register all dynamic capabilities here + + // `workspace/didChangeWatchedFiles` + // (this registers the configuration file watcher) + let params = lsp_types::RegistrationParams { + registrations: vec![lsp_types::Registration { + id: "ruff-server-watch".into(), + method: "workspace/didChangeWatchedFiles".into(), + register_options: Some( + serde_json::to_value(DidChangeWatchedFilesRegistrationOptions { + watchers: vec![ + FileSystemWatcher { + glob_pattern: types::GlobPattern::String( + "**/.ruff.toml".into(), + ), + kind: None, + }, + FileSystemWatcher { + glob_pattern: types::GlobPattern::String("**/ruff.toml".into()), + kind: None, + }, + FileSystemWatcher { + glob_pattern: types::GlobPattern::String( + "**/pyproject.toml".into(), + ), + kind: None, + }, + ], + }) + .unwrap(), + ), + }], + }; + + let response_handler = |_: &Client, ()| { + tracing::info!("Configuration file watcher successfully registered"); + }; + + if let Err(err) = client.send_request::( + &self.session, + params, + response_handler, + ) { + tracing::error!( + "An error occurred when trying to register the configuration file watcher: {err}" + ); + } + } else { + tracing::warn!( + "LSP client does not support dynamic capability registration - automatic configuration reloading will not be available." + ); + } + } +} + +#[derive(Debug)] +pub enum Event { + /// An incoming message from the LSP client. + Message(lsp_server::Message), + + /// Send a response to the client + SendResponse(lsp_server::Response), +} diff --git a/crates/ruff_server/src/server/schedule.rs b/crates/ruff_server/src/server/schedule.rs index ec6204dd62..e6ca9bd438 100644 --- a/crates/ruff_server/src/server/schedule.rs +++ b/crates/ruff_server/src/server/schedule.rs @@ -1,6 +1,6 @@ use std::num::NonZeroUsize; -use crate::session::Session; +use crate::session::{Client, Session}; mod task; mod thread; @@ -12,13 +12,11 @@ use self::{ thread::ThreadPriority, }; -use super::{ClientSender, client::Client}; - /// The event loop thread is actually a secondary thread that we spawn from the /// _actual_ main thread. This secondary thread has a larger stack size /// than some OS defaults (Windows, for example) and is also designated as /// high-priority. -pub(crate) fn event_loop_thread( +pub(crate) fn spawn_main_loop( func: impl FnOnce() -> crate::Result<()> + Send + 'static, ) -> crate::Result>> { // Override OS defaults to avoid stack overflows on platforms with low stack size defaults. @@ -32,69 +30,33 @@ pub(crate) fn event_loop_thread( ) } -pub(crate) struct Scheduler<'s> { - session: &'s mut Session, - client: Client<'s>, +pub(crate) struct Scheduler { fmt_pool: thread::Pool, background_pool: thread::Pool, } -impl<'s> Scheduler<'s> { - pub(super) fn new( - session: &'s mut Session, - worker_threads: NonZeroUsize, - sender: ClientSender, - ) -> Self { +impl Scheduler { + pub(super) fn new(worker_threads: NonZeroUsize) -> Self { const FMT_THREADS: usize = 1; Self { - session, fmt_pool: thread::Pool::new(NonZeroUsize::try_from(FMT_THREADS).unwrap()), background_pool: thread::Pool::new(worker_threads), - client: Client::new(sender), } } - /// Immediately sends a request of kind `R` to the client, with associated parameters. - /// The task provided by `response_handler` will be dispatched as soon as the response - /// comes back from the client. - pub(super) fn request( - &mut self, - params: R::Params, - response_handler: impl Fn(R::Result) -> Task<'s> + 'static, - ) -> crate::Result<()> - where - R: lsp_types::request::Request, - { - self.client.requester.request::(params, response_handler) - } - - /// Creates a task to handle a response from the client. - pub(super) fn response(&mut self, response: lsp_server::Response) -> Task<'s> { - self.client.requester.pop_response_task(response) - } - /// Dispatches a `task` by either running it as a blocking function or /// executing it on a background thread pool. - pub(super) fn dispatch(&mut self, task: task::Task<'s>) { + pub(super) fn dispatch(&mut self, task: Task, session: &mut Session, client: Client) { match task { Task::Sync(SyncTask { func }) => { - let notifier = self.client.notifier(); - let responder = self.client.responder(); - func( - self.session, - notifier, - &mut self.client.requester, - responder, - ); + func(session, &client); } Task::Background(BackgroundTaskBuilder { schedule, builder: func, }) => { - let static_func = func(self.session); - let notifier = self.client.notifier(); - let responder = self.client.responder(); - let task = move || static_func(notifier, responder); + let static_func = func(session); + let task = move || static_func(&client); match schedule { BackgroundSchedule::Worker => { self.background_pool.spawn(ThreadPriority::Worker, task); diff --git a/crates/ruff_server/src/server/schedule/task.rs b/crates/ruff_server/src/server/schedule/task.rs index e34b22ed69..2e768c2b18 100644 --- a/crates/ruff_server/src/server/schedule/task.rs +++ b/crates/ruff_server/src/server/schedule/task.rs @@ -1,16 +1,13 @@ use lsp_server::RequestId; use serde::Serialize; -use crate::{ - server::client::{Notifier, Requester, Responder}, - session::Session, -}; +use crate::session::{Client, Session}; -type LocalFn<'s> = Box; +type LocalFn = Box; -type BackgroundFn = Box; +type BackgroundFn = Box; -type BackgroundFnBuilder<'s> = Box BackgroundFn + 's>; +type BackgroundFnBuilder = Box BackgroundFn>; /// Describes how the task should be run. #[derive(Clone, Copy, Debug, Default)] @@ -36,9 +33,9 @@ pub(in crate::server) enum BackgroundSchedule { /// while local tasks have exclusive access and can modify it as they please. Keep in mind that /// local tasks will **block** the main event loop, so only use local tasks if you **need** /// mutable state access or you need the absolute lowest latency possible. -pub(in crate::server) enum Task<'s> { - Background(BackgroundTaskBuilder<'s>), - Sync(SyncTask<'s>), +pub(in crate::server) enum Task { + Background(BackgroundTaskBuilder), + Sync(SyncTask), } // The reason why this isn't just a 'static background closure @@ -49,20 +46,20 @@ pub(in crate::server) enum Task<'s> { // that the inner closure can capture. This builder closure has a lifetime linked to the scheduler. // When the task is dispatched, the scheduler runs the synchronous builder, which takes the session // as a reference, to create the inner 'static closure. That closure is then moved to a background task pool. -pub(in crate::server) struct BackgroundTaskBuilder<'s> { +pub(in crate::server) struct BackgroundTaskBuilder { pub(super) schedule: BackgroundSchedule, - pub(super) builder: BackgroundFnBuilder<'s>, + pub(super) builder: BackgroundFnBuilder, } -pub(in crate::server) struct SyncTask<'s> { - pub(super) func: LocalFn<'s>, +pub(in crate::server) struct SyncTask { + pub(super) func: LocalFn, } -impl<'s> Task<'s> { +impl Task { /// Creates a new background task. pub(crate) fn background( schedule: BackgroundSchedule, - func: impl FnOnce(&Session) -> Box + 's, + func: impl FnOnce(&Session) -> Box + 'static, ) -> Self { Self::Background(BackgroundTaskBuilder { schedule, @@ -70,9 +67,7 @@ impl<'s> Task<'s> { }) } /// Creates a new local task. - pub(crate) fn local( - func: impl FnOnce(&mut Session, Notifier, &mut Requester, Responder) + 's, - ) -> Self { + pub(crate) fn sync(func: impl FnOnce(&mut Session, &Client) + 'static) -> Self { Self::Sync(SyncTask { func: Box::new(func), }) @@ -83,8 +78,8 @@ impl<'s> Task<'s> { where R: Serialize + Send + 'static, { - Self::local(move |_, _, _, responder| { - if let Err(err) = responder.respond(id, result) { + Self::sync(move |_, client| { + if let Err(err) = client.respond(&id, result) { tracing::error!("Unable to send immediate response: {err}"); } }) @@ -92,6 +87,6 @@ impl<'s> Task<'s> { /// Creates a local task that does nothing. pub(crate) fn nothing() -> Self { - Self::local(move |_, _, _, _| {}) + Self::sync(move |_, _| {}) } } diff --git a/crates/ruff_server/src/server/schedule/thread/pool.rs b/crates/ruff_server/src/server/schedule/thread/pool.rs index 98728bba8f..ac3e072ab8 100644 --- a/crates/ruff_server/src/server/schedule/thread/pool.rs +++ b/crates/ruff_server/src/server/schedule/thread/pool.rs @@ -15,6 +15,7 @@ use std::{ num::NonZeroUsize, + panic::AssertUnwindSafe, sync::{ Arc, atomic::{AtomicUsize, Ordering}, @@ -71,7 +72,26 @@ impl Pool { current_priority = job.requested_priority; } extant_tasks.fetch_add(1, Ordering::SeqCst); - (job.f)(); + + // SAFETY: it's safe to assume that `job.f` is unwind safe because we always + // abort the process if it panics. + // Panicking here ensures that we don't swallow errors and is the same as + // what rayon does. + // Any recovery should be implemented outside the thread pool (e.g. when + // dispatching requests/notifications etc). + if let Err(error) = std::panic::catch_unwind(AssertUnwindSafe(job.f)) { + if let Some(msg) = error.downcast_ref::() { + tracing::error!("Worker thread panicked with: {msg}; aborting"); + } else if let Some(msg) = error.downcast_ref::<&str>() { + tracing::error!("Worker thread panicked with: {msg}; aborting"); + } else { + tracing::error!( + "Worker thread panicked with: {error:?}; aborting" + ); + } + + std::process::abort(); + } extant_tasks.fetch_sub(1, Ordering::SeqCst); } } diff --git a/crates/ruff_server/src/session.rs b/crates/ruff_server/src/session.rs index 79cc059b4c..c87ba6d4ee 100644 --- a/crates/ruff_server/src/session.rs +++ b/crates/ruff_server/src/session.rs @@ -7,6 +7,7 @@ use lsp_types::{ClientCapabilities, FileEvent, NotebookDocumentCellChange, Url}; use settings::ClientSettings; use crate::edit::{DocumentKey, DocumentVersion, NotebookDocument}; +use crate::session::request_queue::RequestQueue; use crate::session::settings::GlobalClientSettings; use crate::workspace::Workspaces; use crate::{PositionEncoding, TextDocument}; @@ -15,10 +16,13 @@ pub(crate) use self::capabilities::ResolvedClientCapabilities; pub use self::index::DocumentQuery; pub(crate) use self::options::{AllOptions, WorkspaceOptionsMap}; pub use self::options::{ClientOptions, GlobalOptions}; +pub use client::Client; mod capabilities; +mod client; mod index; mod options; +mod request_queue; mod settings; /// The global state for the LSP @@ -32,6 +36,12 @@ pub struct Session { /// Tracks what LSP features the client supports and doesn't support. resolved_client_capabilities: Arc, + + /// Tracks the pending requests between client and server. + request_queue: RequestQueue, + + /// Has the client requested the server to shutdown. + shutdown_requested: bool, } /// An immutable snapshot of `Session` that references @@ -49,17 +59,36 @@ impl Session { position_encoding: PositionEncoding, global: GlobalClientSettings, workspaces: &Workspaces, + client: &Client, ) -> crate::Result { Ok(Self { position_encoding, - index: index::Index::new(workspaces, &global)?, + index: index::Index::new(workspaces, &global, client)?, global_settings: global, resolved_client_capabilities: Arc::new(ResolvedClientCapabilities::new( client_capabilities, )), + request_queue: RequestQueue::new(), + shutdown_requested: false, }) } + pub(crate) fn request_queue(&self) -> &RequestQueue { + &self.request_queue + } + + pub(crate) fn request_queue_mut(&mut self) -> &mut RequestQueue { + &mut self.request_queue + } + + pub(crate) fn is_shutdown_requested(&self) -> bool { + self.shutdown_requested + } + + pub(crate) fn set_shutdown_requested(&mut self, requested: bool) { + self.shutdown_requested = requested; + } + pub fn key_from_url(&self, url: Url) -> DocumentKey { self.index.key_from_url(url) } @@ -140,13 +169,14 @@ impl Session { } /// Reloads the settings index based on the provided changes. - pub(crate) fn reload_settings(&mut self, changes: &[FileEvent]) { - self.index.reload_settings(changes); + pub(crate) fn reload_settings(&mut self, changes: &[FileEvent], client: &Client) { + self.index.reload_settings(changes, client); } /// Open a workspace folder at the given `url`. - pub(crate) fn open_workspace_folder(&mut self, url: Url) -> crate::Result<()> { - self.index.open_workspace_folder(url, &self.global_settings) + pub(crate) fn open_workspace_folder(&mut self, url: Url, client: &Client) -> crate::Result<()> { + self.index + .open_workspace_folder(url, &self.global_settings, client) } /// Close a workspace folder at the given `url`. diff --git a/crates/ruff_server/src/session/client.rs b/crates/ruff_server/src/session/client.rs new file mode 100644 index 0000000000..0cbcd449d5 --- /dev/null +++ b/crates/ruff_server/src/session/client.rs @@ -0,0 +1,248 @@ +use crate::Session; +use crate::server::{ConnectionSender, Event, MainLoopSender}; +use anyhow::{Context, anyhow}; +use lsp_server::{ErrorCode, Message, Notification, RequestId, ResponseError}; +use serde_json::Value; +use std::any::TypeId; +use std::fmt::Display; + +pub(crate) type ClientResponseHandler = Box; + +#[derive(Clone, Debug)] +pub struct Client { + /// Channel to send messages back to the main loop. + main_loop_sender: MainLoopSender, + /// Channel to send messages directly to the LSP client without going through the main loop. + /// + /// This is generally preferred because it reduces pressure on the main loop but it may not always be + /// possible if access to data on [`Session`] is required, which background tasks don't have. + client_sender: ConnectionSender, +} + +impl Client { + pub fn new(main_loop_sender: MainLoopSender, client_sender: ConnectionSender) -> Self { + Self { + main_loop_sender, + client_sender, + } + } + + /// Sends a request of kind `R` to the client, with associated parameters. + /// + /// The request is sent immediately. + /// The `response_handler` will be dispatched as soon as the client response + /// is processed on the main-loop. The handler always runs on the main-loop thread. + /// + /// # Note + /// This method takes a `session` so that we can register the pending-request + /// and send the response directly to the client. If this ever becomes too limiting (because we + /// need to send a request from somewhere where we don't have access to session), consider introducing + /// a new `send_deferred_request` method that doesn't take a session and instead sends + /// an `Action` to the main loop to send the request (the main loop has always access to session). + pub(crate) fn send_request( + &self, + session: &Session, + params: R::Params, + response_handler: impl FnOnce(&Client, R::Result) + Send + 'static, + ) -> crate::Result<()> + where + R: lsp_types::request::Request, + { + let response_handler = Box::new(move |client: &Client, response: lsp_server::Response| { + let _span = + tracing::debug_span!("client_response", id=%response.id, method = R::METHOD) + .entered(); + + match (response.error, response.result) { + (Some(err), _) => { + tracing::error!( + "Got an error from the client (code {code}, method {method}): {message}", + code = err.code, + message = err.message, + method = R::METHOD + ); + } + (None, Some(response)) => match serde_json::from_value(response) { + Ok(response) => response_handler(client, response), + Err(error) => { + tracing::error!( + "Failed to deserialize client response (method={method}): {error}", + method = R::METHOD + ); + } + }, + (None, None) => { + if TypeId::of::() == TypeId::of::<()>() { + // We can't call `response_handler(())` directly here, but + // since we _know_ the type expected is `()`, we can use + // `from_value(Value::Null)`. `R::Result` implements `DeserializeOwned`, + // so this branch works in the general case but we'll only + // hit it if the concrete type is `()`, so the `unwrap()` is safe here. + response_handler(client, serde_json::from_value(Value::Null).unwrap()); + } else { + tracing::error!( + "Invalid client response: did not contain a result or error (method={method})", + method = R::METHOD + ); + } + } + } + }); + + let id = session + .request_queue() + .outgoing() + .register(response_handler); + + self.client_sender + .send(Message::Request(lsp_server::Request { + id, + method: R::METHOD.to_string(), + params: serde_json::to_value(params).context("Failed to serialize params")?, + })) + .with_context(|| { + format!("Failed to send request method={method}", method = R::METHOD) + })?; + + Ok(()) + } + + /// Sends a notification to the client. + pub(crate) fn send_notification(&self, params: N::Params) -> crate::Result<()> + where + N: lsp_types::notification::Notification, + { + let method = N::METHOD.to_string(); + + self.client_sender + .send(lsp_server::Message::Notification(Notification::new( + method, params, + ))) + .map_err(|error| { + anyhow!( + "Failed to send notification (method={method}): {error}", + method = N::METHOD + ) + }) + } + + /// Sends a notification without any parameters to the client. + /// + /// This is useful for notifications that don't require any data. + #[expect(dead_code)] + pub(crate) fn send_notification_no_params(&self, method: &str) -> crate::Result<()> { + self.client_sender + .send(lsp_server::Message::Notification(Notification::new( + method.to_string(), + Value::Null, + ))) + .map_err(|error| anyhow!("Failed to send notification (method={method}): {error}",)) + } + + /// Sends a response to the client for a given request ID. + /// + /// The response isn't sent immediately. Instead, it's queued up in the main loop + /// and checked for cancellation (each request must have exactly one response). + pub(crate) fn respond( + &self, + id: &RequestId, + result: crate::server::Result, + ) -> crate::Result<()> + where + R: serde::Serialize, + { + let response = match result { + Ok(res) => lsp_server::Response::new_ok(id.clone(), res), + Err(crate::server::Error { code, error }) => { + lsp_server::Response::new_err(id.clone(), code as i32, error.to_string()) + } + }; + + self.main_loop_sender + .send(Event::SendResponse(response)) + .map_err(|error| anyhow!("Failed to send response for request {id}: {error}")) + } + + /// Sends an error response to the client for a given request ID. + /// + /// The response isn't sent immediately. Instead, it's queued up in the main loop. + pub(crate) fn respond_err( + &self, + id: RequestId, + error: lsp_server::ResponseError, + ) -> crate::Result<()> { + let response = lsp_server::Response { + id, + result: None, + error: Some(error), + }; + + self.main_loop_sender + .send(Event::SendResponse(response)) + .map_err(|error| anyhow!("Failed to send response: {error}")) + } + + /// Shows a message to the user. + /// + /// This opens a pop up in VS Code showing `message`. + pub(crate) fn show_message( + &self, + message: impl Display, + message_type: lsp_types::MessageType, + ) -> crate::Result<()> { + self.send_notification::( + lsp_types::ShowMessageParams { + typ: message_type, + message: message.to_string(), + }, + ) + } + + /// Sends a request to display a warning to the client with a formatted message. The warning is + /// sent in a `window/showMessage` notification. + /// + /// Logs an error if the message could not be sent. + pub(crate) fn show_warning_message(&self, message: impl Display) { + let result = self.show_message(message, lsp_types::MessageType::WARNING); + + if let Err(err) = result { + tracing::error!("Failed to send warning message to the client: {err}"); + } + } + + /// Sends a request to display an error to the client with a formatted message. The error is + /// sent in a `window/showMessage` notification. + /// + /// Logs an error if the message could not be sent. + pub(crate) fn show_error_message(&self, message: impl Display) { + let result = self.show_message(message, lsp_types::MessageType::ERROR); + + if let Err(err) = result { + tracing::error!("Failed to send error message to the client: {err}"); + } + } + + pub(crate) fn cancel(&self, session: &mut Session, id: RequestId) -> crate::Result<()> { + let method_name = session.request_queue_mut().incoming_mut().cancel(&id); + + if let Some(method_name) = method_name { + tracing::debug!("Cancelled request id={id} method={method_name}"); + let error = ResponseError { + code: ErrorCode::RequestCanceled as i32, + message: "request was cancelled by client".to_owned(), + data: None, + }; + + // Use `client_sender` here instead of `respond_err` because + // `respond_err` filters out responses for canceled requests (which we just did!). + self.client_sender + .send(Message::Response(lsp_server::Response { + id, + result: None, + error: Some(error), + }))?; + } + + Ok(()) + } +} diff --git a/crates/ruff_server/src/session/index.rs b/crates/ruff_server/src/session/index.rs index 2e56499ecc..209b5121c8 100644 --- a/crates/ruff_server/src/session/index.rs +++ b/crates/ruff_server/src/session/index.rs @@ -11,6 +11,7 @@ use thiserror::Error; pub(crate) use ruff_settings::RuffSettings; use crate::edit::LanguageId; +use crate::session::Client; use crate::session::options::Combine; use crate::session::settings::GlobalClientSettings; use crate::workspace::{Workspace, Workspaces}; @@ -73,10 +74,11 @@ impl Index { pub(super) fn new( workspaces: &Workspaces, global: &GlobalClientSettings, + client: &Client, ) -> crate::Result { let mut settings = WorkspaceSettingsIndex::default(); for workspace in &**workspaces { - settings.register_workspace(workspace, global)?; + settings.register_workspace(workspace, global, client)?; } Ok(Self { @@ -173,10 +175,11 @@ impl Index { &mut self, url: Url, global: &GlobalClientSettings, + client: &Client, ) -> crate::Result<()> { // TODO(jane): Find a way for workspace client settings to be added or changed dynamically. self.settings - .register_workspace(&Workspace::new(url), global) + .register_workspace(&Workspace::new(url), global, client) } pub(super) fn close_workspace_folder(&mut self, workspace_url: &Url) -> crate::Result<()> { @@ -259,7 +262,7 @@ impl Index { /// registered in [`try_register_capabilities`] method. /// /// [`try_register_capabilities`]: crate::server::Server::try_register_capabilities - pub(super) fn reload_settings(&mut self, changes: &[FileEvent]) { + pub(super) fn reload_settings(&mut self, changes: &[FileEvent], client: &Client) { let mut indexed = FxHashSet::default(); for change in changes { @@ -287,6 +290,7 @@ impl Index { indexed.insert(root.clone()); settings.ruff_settings = ruff_settings::RuffSettingsIndex::new( + client, root, settings.client_settings.editor_settings(), false, @@ -415,11 +419,14 @@ impl WorkspaceSettingsIndex { &mut self, workspace: &Workspace, global: &GlobalClientSettings, + client: &Client, ) -> crate::Result<()> { let workspace_url = workspace.url(); if workspace_url.scheme() != "file" { tracing::info!("Ignoring non-file workspace URL: {workspace_url}"); - show_warn_msg!("Ruff does not support non-file workspaces; Ignoring {workspace_url}"); + client.show_warning_message(format_args!( + "Ruff does not support non-file workspaces; Ignoring {workspace_url}" + )); return Ok(()); } let workspace_path = workspace_url.to_file_path().map_err(|()| { @@ -431,10 +438,10 @@ impl WorkspaceSettingsIndex { let settings = match options.into_settings() { Ok(settings) => settings, Err(settings) => { - show_err_msg!( + client.show_error_message(format_args!( "The settings for the workspace {workspace_path} are invalid. Refer to the logs for more information.", workspace_path = workspace_path.display() - ); + )); settings } }; @@ -444,6 +451,7 @@ impl WorkspaceSettingsIndex { }; let workspace_settings_index = ruff_settings::RuffSettingsIndex::new( + client, &workspace_path, client_settings.editor_settings(), workspace.is_default(), diff --git a/crates/ruff_server/src/session/index/ruff_settings.rs b/crates/ruff_server/src/session/index/ruff_settings.rs index 0f58ac2d94..658a2a08cc 100644 --- a/crates/ruff_server/src/session/index/ruff_settings.rs +++ b/crates/ruff_server/src/session/index/ruff_settings.rs @@ -18,6 +18,7 @@ use ruff_workspace::{ resolver::ConfigurationTransformer, }; +use crate::session::Client; use crate::session::options::ConfigurationPreference; use crate::session::settings::{EditorSettings, ResolvedConfiguration}; @@ -155,6 +156,7 @@ impl RuffSettingsIndex { /// server will be running in a single file mode, then only (1) and (2) will be resolved, /// skipping (3). pub(super) fn new( + client: &Client, root: &Path, editor_settings: &EditorSettings, is_default_workspace: bool, @@ -242,10 +244,10 @@ impl RuffSettingsIndex { // means for different editors. if is_default_workspace { if has_error { - show_err_msg!( + client.show_error_message(format!( "Error while resolving settings from workspace {}. Please refer to the logs for more details.", root.display() - ); + )); } return RuffSettingsIndex { index, fallback }; @@ -358,10 +360,10 @@ impl RuffSettingsIndex { }); if has_error.load(Ordering::Relaxed) { - show_err_msg!( + client.show_error_message(format!( "Error while resolving settings from workspace {}. Please refer to the logs for more details.", root.display() - ); + )); } RuffSettingsIndex { diff --git a/crates/ruff_server/src/session/options.rs b/crates/ruff_server/src/session/options.rs index 47ce2fa83e..0a23e712c7 100644 --- a/crates/ruff_server/src/session/options.rs +++ b/crates/ruff_server/src/session/options.rs @@ -7,8 +7,9 @@ use serde_json::{Map, Value}; use ruff_linter::{RuleSelector, line_width::LineLength, rule_selector::ParseError}; -use crate::session::settings::{ - ClientSettings, EditorSettings, GlobalClientSettings, ResolvedConfiguration, +use crate::session::{ + Client, + settings::{ClientSettings, EditorSettings, GlobalClientSettings, ResolvedConfiguration}, }; pub(crate) type WorkspaceOptionsMap = FxHashMap; @@ -62,10 +63,11 @@ impl GlobalOptions { &self.client } - pub fn into_settings(self) -> GlobalClientSettings { + pub fn into_settings(self, client: Client) -> GlobalClientSettings { GlobalClientSettings { options: self.client, settings: std::cell::OnceCell::default(), + client, } } } @@ -367,12 +369,12 @@ pub(crate) struct AllOptions { impl AllOptions { /// Initializes the controller from the serialized initialization options. /// This fails if `options` are not valid initialization options. - pub(crate) fn from_value(options: serde_json::Value) -> Self { + pub(crate) fn from_value(options: serde_json::Value, client: &Client) -> Self { Self::from_init_options( serde_json::from_value(options) .map_err(|err| { tracing::error!("Failed to deserialize initialization options: {err}. Falling back to default client settings..."); - show_err_msg!("Ruff received invalid client settings - falling back to default client settings."); + client.show_error_message("Ruff received invalid client settings - falling back to default client settings."); }) .unwrap_or_default(), ) @@ -896,10 +898,14 @@ mod tests { #[test] fn test_global_only_resolves_correctly() { + let (main_loop_sender, main_loop_receiver) = crossbeam::channel::unbounded(); + let (client_sender, client_receiver) = crossbeam::channel::unbounded(); + let options = deserialize_fixture(GLOBAL_ONLY_INIT_OPTIONS_FIXTURE); let AllOptions { global, .. } = AllOptions::from_init_options(options); - let global = global.into_settings(); + let client = Client::new(main_loop_sender, client_sender); + let global = global.into_settings(client); assert_eq!( global.to_settings(), &ClientSettings { @@ -922,6 +928,9 @@ mod tests { }, } ); + + assert!(main_loop_receiver.is_empty()); + assert!(client_receiver.is_empty()); } #[test] @@ -959,6 +968,10 @@ mod tests { #[test] fn inline_configuration() { + let (main_loop_sender, main_loop_receiver) = crossbeam::channel::unbounded(); + let (client_sender, client_receiver) = crossbeam::channel::unbounded(); + let client = Client::new(main_loop_sender, client_sender); + let options: InitializationOptions = deserialize_fixture(INLINE_CONFIGURATION_FIXTURE); let AllOptions { @@ -969,7 +982,7 @@ mod tests { panic!("Expected global settings only"); }; - let global = global.into_settings(); + let global = global.into_settings(client); assert_eq!( global.to_settings(), @@ -1001,5 +1014,8 @@ mod tests { } } ); + + assert!(main_loop_receiver.is_empty()); + assert!(client_receiver.is_empty()); } } diff --git a/crates/ruff_server/src/session/request_queue.rs b/crates/ruff_server/src/session/request_queue.rs new file mode 100644 index 0000000000..68696050bf --- /dev/null +++ b/crates/ruff_server/src/session/request_queue.rs @@ -0,0 +1,198 @@ +use crate::session::client::ClientResponseHandler; +use lsp_server::RequestId; +use rustc_hash::FxHashMap; +use std::cell::{Cell, OnceCell, RefCell}; +use std::fmt::Formatter; +use std::sync::Arc; +use std::sync::atomic::AtomicBool; +use std::time::Instant; + +/// Tracks the pending requests between client and server. +pub(crate) struct RequestQueue { + incoming: Incoming, + outgoing: Outgoing, +} + +impl RequestQueue { + pub(super) fn new() -> Self { + Self { + incoming: Incoming::default(), + outgoing: Outgoing::default(), + } + } + + pub(crate) fn outgoing_mut(&mut self) -> &mut Outgoing { + &mut self.outgoing + } + + /// Returns the server to client request queue. + pub(crate) fn outgoing(&self) -> &Outgoing { + &self.outgoing + } + + /// Returns the client to server request queue. + pub(crate) fn incoming(&self) -> &Incoming { + &self.incoming + } + + pub(crate) fn incoming_mut(&mut self) -> &mut Incoming { + &mut self.incoming + } +} + +/// Requests from client -> server. +/// +/// Tracks which requests are pending. Requests that aren't registered are considered completed. +/// +/// A request is pending if: +/// +/// * it has been registered +/// * it hasn't been cancelled +/// * it hasn't been completed +/// +/// Tracking whether a request is pending is required to ensure that the server sends exactly +/// one response for every request as required by the LSP specification. +#[derive(Default, Debug)] +pub(crate) struct Incoming { + pending: FxHashMap, +} + +impl Incoming { + /// Registers a new pending request. + pub(crate) fn register(&mut self, request_id: RequestId, method: String) { + self.pending.insert(request_id, PendingRequest::new(method)); + } + + /// Cancels the pending request with the given id. + /// + /// Returns the method name if the request was still pending, `None` if it was already completed. + pub(super) fn cancel(&mut self, request_id: &RequestId) -> Option { + self.pending.remove(request_id).map(|mut pending| { + if let Some(cancellation_token) = pending.cancellation_token.take() { + cancellation_token.cancel(); + } + pending.method + }) + } + + /// Returns `true` if the request with the given id is still pending. + #[expect(dead_code)] + pub(crate) fn is_pending(&self, request_id: &RequestId) -> bool { + self.pending.contains_key(request_id) + } + + /// Returns the cancellation token for the given request id if the request is still pending. + pub(crate) fn cancellation_token( + &self, + request_id: &RequestId, + ) -> Option { + let pending = self.pending.get(request_id)?; + + Some(RequestCancellationToken::clone( + pending + .cancellation_token + .get_or_init(RequestCancellationToken::default), + )) + } + + /// Marks the request as completed. + /// + /// Returns the time when the request was registered and the request method name, or `None` if the request was not pending. + pub(crate) fn complete(&mut self, request_id: &RequestId) -> Option<(Instant, String)> { + self.pending + .remove(request_id) + .map(|pending| (pending.start_time, pending.method)) + } +} + +/// A request from the client to the server that hasn't been responded yet. +#[derive(Debug)] +struct PendingRequest { + /// The time when the request was registered. + /// + /// This does not include the time the request was queued in the main loop before it was registered. + start_time: Instant, + + /// The method name of the request. + method: String, + + /// A cancellation token to cancel this request. + /// + /// This is only initialized for background requests. Local tasks don't support cancellation (unless retried) + /// as they're processed immediately after receiving the request; Making it impossible for a + /// cancellation message to be processed before the task is completed. + cancellation_token: OnceCell, +} + +impl PendingRequest { + fn new(method: String) -> Self { + Self { + start_time: Instant::now(), + method, + cancellation_token: OnceCell::new(), + } + } +} + +/// Token to cancel a specific request. +/// +/// Can be shared between threads to check for cancellation *after* a request has been scheduled. +#[derive(Debug, Default)] +pub(crate) struct RequestCancellationToken(Arc); + +impl RequestCancellationToken { + /// Returns true if the request was cancelled. + pub(crate) fn is_cancelled(&self) -> bool { + self.0.load(std::sync::atomic::Ordering::Relaxed) + } + + /// Signals that the request should not be processed because it was cancelled. + fn cancel(&self) { + self.0.store(true, std::sync::atomic::Ordering::Relaxed); + } + + fn clone(this: &Self) -> Self { + RequestCancellationToken(this.0.clone()) + } +} + +/// Requests from server -> client. +#[derive(Default)] +pub(crate) struct Outgoing { + /// The id of the next request sent from the server to the client. + next_request_id: Cell, + + /// A map of request ids to the handlers that process the client-response. + response_handlers: RefCell>, +} + +impl Outgoing { + /// Registers a handler, returns the id for the request. + #[must_use] + pub(crate) fn register(&self, handler: ClientResponseHandler) -> RequestId { + let id = self.next_request_id.get(); + self.next_request_id.set(id + 1); + + self.response_handlers + .borrow_mut() + .insert(id.into(), handler); + id.into() + } + + /// Marks the request with the given id as complete and returns the handler to process the response. + /// + /// Returns `None` if the request was not found. + #[must_use] + pub(crate) fn complete(&mut self, request_id: &RequestId) -> Option { + self.response_handlers.get_mut().remove(request_id) + } +} + +impl std::fmt::Debug for Outgoing { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Outgoing") + .field("next_request_id", &self.next_request_id) + .field("response_handlers", &"") + .finish() + } +} diff --git a/crates/ruff_server/src/session/settings.rs b/crates/ruff_server/src/session/settings.rs index 4015109e74..94cf41c827 100644 --- a/crates/ruff_server/src/session/settings.rs +++ b/crates/ruff_server/src/session/settings.rs @@ -8,7 +8,10 @@ use ruff_workspace::options::Options; use crate::{ ClientOptions, - session::options::{ClientConfiguration, ConfigurationPreference}, + session::{ + Client, + options::{ClientConfiguration, ConfigurationPreference}, + }, }; pub struct GlobalClientSettings { @@ -20,6 +23,8 @@ pub struct GlobalClientSettings { /// when the workspace settings e.g. select some rules that aren't available in a specific workspace /// and said workspace overrides the selected rules. pub(super) settings: std::cell::OnceCell>, + + pub(super) client: Client, } impl GlobalClientSettings { @@ -33,7 +38,7 @@ impl GlobalClientSettings { let settings = match settings { Ok(settings) => settings, Err(settings) => { - show_err_msg!( + self.client.show_error_message( "Ruff received invalid settings from the editor. Refer to the logs for more information." ); settings diff --git a/crates/ruff_server/tests/notebook.rs b/crates/ruff_server/tests/notebook.rs index 6d3e6be1db..504d299048 100644 --- a/crates/ruff_server/tests/notebook.rs +++ b/crates/ruff_server/tests/notebook.rs @@ -8,7 +8,7 @@ use lsp_types::{ Position, Range, TextDocumentContentChangeEvent, VersionedTextDocumentIdentifier, }; use ruff_notebook::SourceValue; -use ruff_server::{ClientOptions, GlobalOptions, Workspace, Workspaces}; +use ruff_server::{Client, ClientOptions, GlobalOptions, Workspace, Workspaces}; const SUPER_RESOLUTION_OVERVIEW_PATH: &str = "./resources/test/fixtures/tensorflow_test_notebook.ipynb"; @@ -28,8 +28,13 @@ fn super_resolution_overview() { insta::assert_snapshot!("initial_notebook", notebook_source(¬ebook)); + let (main_loop_sender, main_loop_receiver) = crossbeam::channel::unbounded(); + let (client_sender, client_receiver) = crossbeam::channel::unbounded(); + + let client = Client::new(main_loop_sender, client_sender); + let options = GlobalOptions::default(); - let global = options.into_settings(); + let global = options.into_settings(client.clone()); let mut session = ruff_server::Session::new( &ClientCapabilities::default(), @@ -39,6 +44,7 @@ fn super_resolution_overview() { Workspace::new(lsp_types::Url::from_file_path(file_path.parent().unwrap()).unwrap()) .with_options(ClientOptions::default()), ]), + &client, ) .unwrap(); @@ -307,6 +313,9 @@ fn super_resolution_overview() { "changed_notebook", notebook_source(snapshot.query().as_notebook().unwrap()) ); + + assert!(client_receiver.is_empty()); + assert!(main_loop_receiver.is_empty()); } fn notebook_source(notebook: &ruff_server::NotebookDocument) -> String { diff --git a/crates/ty_server/src/server/api.rs b/crates/ty_server/src/server/api.rs index f937adb2e4..cf1ffcdaad 100644 --- a/crates/ty_server/src/server/api.rs +++ b/crates/ty_server/src/server/api.rs @@ -331,7 +331,7 @@ where .with_failure_code(server::ErrorCode::InternalError) } -/// Sends back a response to the server using a [`Responder`]. +/// Sends back a response to the server, but only if the request wasn't cancelled. fn respond( id: &RequestId, result: Result<<::RequestType as Request>::Result>, diff --git a/crates/ty_server/src/server/main_loop.rs b/crates/ty_server/src/server/main_loop.rs index 456e9d88b2..6b05f39317 100644 --- a/crates/ty_server/src/server/main_loop.rs +++ b/crates/ty_server/src/server/main_loop.rs @@ -1,4 +1,3 @@ -use crate::Session; use crate::server::schedule::Scheduler; use crate::server::{Server, api}; use crate::session::client::Client; @@ -79,7 +78,7 @@ impl Server { .outgoing_mut() .complete(&response.id) { - handler(&self.session, response); + handler(&client, response); } else { tracing::error!( "Received a response with ID {}, which was not expected", @@ -203,7 +202,7 @@ impl Server { .unwrap(), ), }; - let response_handler = move |_session: &Session, ()| { + let response_handler = move |_: &Client, ()| { tracing::info!("File watcher successfully registered"); }; diff --git a/crates/ty_server/src/server/schedule/thread/pool.rs b/crates/ty_server/src/server/schedule/thread/pool.rs index 88abbd8e42..c49d642a77 100644 --- a/crates/ty_server/src/server/schedule/thread/pool.rs +++ b/crates/ty_server/src/server/schedule/thread/pool.rs @@ -51,7 +51,7 @@ impl Pool { let threads = usize::from(threads); - let (job_sender, job_receiver) = crossbeam::channel::bounded(std::cmp::max(threads * 2, 4)); + let (job_sender, job_receiver) = crossbeam::channel::bounded(std::cmp::min(threads * 2, 4)); let extant_tasks = Arc::new(AtomicUsize::new(0)); let mut handles = Vec::with_capacity(threads); diff --git a/crates/ty_server/src/session/client.rs b/crates/ty_server/src/session/client.rs index 4d54c0fd63..f9506e375b 100644 --- a/crates/ty_server/src/session/client.rs +++ b/crates/ty_server/src/session/client.rs @@ -7,7 +7,7 @@ use serde_json::Value; use std::any::TypeId; use std::fmt::Display; -pub(crate) type ClientResponseHandler = Box; +pub(crate) type ClientResponseHandler = Box; #[derive(Debug)] pub(crate) struct Client { @@ -44,53 +44,51 @@ impl Client { &self, session: &Session, params: R::Params, - response_handler: impl FnOnce(&Session, R::Result) + Send + 'static, + response_handler: impl FnOnce(&Client, R::Result) + Send + 'static, ) -> crate::Result<()> where R: lsp_types::request::Request, { - let response_handler = Box::new( - move |session: &Session, response: lsp_server::Response| { - let _span = - tracing::debug_span!("client_response", id=%response.id, method = R::METHOD) - .entered(); + let response_handler = Box::new(move |client: &Client, response: lsp_server::Response| { + let _span = + tracing::debug_span!("client_response", id=%response.id, method = R::METHOD) + .entered(); - match (response.error, response.result) { - (Some(err), _) => { + match (response.error, response.result) { + (Some(err), _) => { + tracing::error!( + "Got an error from the client (code {code}, method {method}): {message}", + code = err.code, + message = err.message, + method = R::METHOD + ); + } + (None, Some(response)) => match serde_json::from_value(response) { + Ok(response) => response_handler(client, response), + Err(error) => { tracing::error!( - "Got an error from the client (code {code}, method {method}): {message}", - code = err.code, - message = err.message, + "Failed to deserialize client response (method={method}): {error}", method = R::METHOD ); } - (None, Some(response)) => match serde_json::from_value(response) { - Ok(response) => response_handler(session, response), - Err(error) => { - tracing::error!( - "Failed to deserialize client response (method={method}): {error}", - method = R::METHOD - ); - } - }, - (None, None) => { - if TypeId::of::() == TypeId::of::<()>() { - // We can't call `response_handler(())` directly here, but - // since we _know_ the type expected is `()`, we can use - // `from_value(Value::Null)`. `R::Result` implements `DeserializeOwned`, - // so this branch works in the general case but we'll only - // hit it if the concrete type is `()`, so the `unwrap()` is safe here. - response_handler(session, serde_json::from_value(Value::Null).unwrap()); - } else { - tracing::error!( - "Invalid client response: did not contain a result or error (method={method})", - method = R::METHOD - ); - } + }, + (None, None) => { + if TypeId::of::() == TypeId::of::<()>() { + // We can't call `response_handler(())` directly here, but + // since we _know_ the type expected is `()`, we can use + // `from_value(Value::Null)`. `R::Result` implements `DeserializeOwned`, + // so this branch works in the general case but we'll only + // hit it if the concrete type is `()`, so the `unwrap()` is safe here. + response_handler(client, serde_json::from_value(Value::Null).unwrap()); + } else { + tracing::error!( + "Invalid client response: did not contain a result or error (method={method})", + method = R::METHOD + ); } } - }, - ); + } + }); let id = session .request_queue()