use std::process::{ExitCode, Termination}; use std::sync::Mutex; use anyhow::{anyhow, Context}; use clap::Parser; use colored::Colorize; use crossbeam::channel as crossbeam_channel; use salsa::plumbing::ZalsaDatabase; use red_knot_python_semantic::{ProgramSettings, SearchPathSettings}; use red_knot_server::run_server; use red_knot_workspace::db::RootDatabase; use red_knot_workspace::site_packages::VirtualEnvironment; use red_knot_workspace::watch; use red_knot_workspace::watch::WorkspaceWatcher; use red_knot_workspace::workspace::WorkspaceMetadata; use ruff_db::system::{OsSystem, System, SystemPath, SystemPathBuf}; use target_version::TargetVersion; use crate::logging::{setup_tracing, Verbosity}; mod logging; mod target_version; mod verbosity; #[derive(Debug, Parser)] #[command( author, name = "red-knot", about = "An extremely fast Python type checker." )] #[command(version)] struct Args { #[command(subcommand)] pub(crate) command: Option, #[arg( long, help = "Changes the current working directory.", long_help = "Changes the current working directory before any specified operations. This affects the workspace and configuration discovery.", value_name = "PATH" )] current_directory: Option, #[arg( long, help = "Path to the virtual environment the project uses", long_help = "\ Path to the virtual environment the project uses. \ If provided, red-knot will use the `site-packages` directory of this virtual environment \ to resolve type information for the project's third-party dependencies.", value_name = "PATH" )] venv_path: Option, #[arg( long, value_name = "DIRECTORY", help = "Custom directory to use for stdlib typeshed stubs" )] custom_typeshed_dir: Option, #[arg( long, value_name = "PATH", help = "Additional path to use as a module-resolution source (can be passed multiple times)" )] extra_search_path: Vec, #[arg( long, help = "Python version to assume when resolving types", default_value_t = TargetVersion::default(), value_name="VERSION") ] target_version: TargetVersion, #[clap(flatten)] verbosity: Verbosity, #[arg( long, help = "Run in watch mode by re-running whenever files change", short = 'W' )] watch: bool, } #[derive(Debug, clap::Subcommand)] pub enum Command { /// Start the language server Server, } #[allow(clippy::print_stdout, clippy::unnecessary_wraps, clippy::print_stderr)] pub fn main() -> ExitStatus { run().unwrap_or_else(|error| { use std::io::Write; // Use `writeln` instead of `eprintln` to avoid panicking when the stderr pipe is broken. let mut stderr = std::io::stderr().lock(); // This communicates that this isn't a linter error but Red Knot itself hard-errored for // some reason (e.g. failed to resolve the configuration) writeln!(stderr, "{}", "Red Knot failed".red().bold()).ok(); // Currently we generally only see one error, but e.g. with io errors when resolving // the configuration it is help to chain errors ("resolving configuration failed" -> // "failed to read file: subdir/pyproject.toml") for cause in error.chain() { writeln!(stderr, " {} {cause}", "Cause:".bold()).ok(); } ExitStatus::Error }) } fn run() -> anyhow::Result { let Args { command, current_directory, custom_typeshed_dir, extra_search_path: extra_paths, venv_path, target_version, verbosity, watch, } = Args::parse_from(std::env::args().collect::>()); if matches!(command, Some(Command::Server)) { return run_server().map(|()| ExitStatus::Success); } let verbosity = verbosity.level(); countme::enable(verbosity.is_trace()); let _guard = setup_tracing(verbosity)?; // The base path to which all CLI arguments are relative to. let cli_base_path = { let cwd = std::env::current_dir().context("Failed to get the current working directory")?; SystemPathBuf::from_path_buf(cwd) .map_err(|path| { anyhow!( "The current working directory '{}' contains non-unicode characters. Red Knot only supports unicode paths.", path.display() ) })? }; let cwd = current_directory .map(|cwd| { if cwd.as_std_path().is_dir() { Ok(SystemPath::absolute(&cwd, &cli_base_path)) } else { Err(anyhow!( "Provided current-directory path '{cwd}' is not a directory." )) } }) .transpose()? .unwrap_or_else(|| cli_base_path.clone()); let system = OsSystem::new(cwd.clone()); let workspace_metadata = WorkspaceMetadata::from_path(system.current_directory(), &system)?; // TODO: Verify the remaining search path settings eagerly. let site_packages = venv_path .map(|path| { VirtualEnvironment::new(path, &OsSystem::new(cli_base_path)) .and_then(|venv| venv.site_packages_directories(&system)) }) .transpose()? .unwrap_or_default(); // TODO: Respect the settings from the workspace metadata. when resolving the program settings. let program_settings = ProgramSettings { target_version: target_version.into(), search_paths: SearchPathSettings { extra_paths, src_root: workspace_metadata.root().to_path_buf(), custom_typeshed: custom_typeshed_dir, site_packages, }, }; // TODO: Use the `program_settings` to compute the key for the database's persistent // cache and load the cache if it exists. let mut db = RootDatabase::new(workspace_metadata, program_settings, system)?; let (main_loop, main_loop_cancellation_token) = MainLoop::new(); // Listen to Ctrl+C and abort the watch mode. let main_loop_cancellation_token = Mutex::new(Some(main_loop_cancellation_token)); ctrlc::set_handler(move || { let mut lock = main_loop_cancellation_token.lock().unwrap(); if let Some(token) = lock.take() { token.stop(); } })?; let exit_status = if watch { main_loop.watch(&mut db)? } else { main_loop.run(&mut db) }; tracing::trace!("Counts for entire CLI run:\n{}", countme::get_all()); std::mem::forget(db); Ok(exit_status) } #[derive(Copy, Clone)] pub enum ExitStatus { /// Checking was successful and there were no errors. Success = 0, /// Checking was successful but there were errors. Failure = 1, /// Checking failed. Error = 2, } impl Termination for ExitStatus { fn report(self) -> ExitCode { ExitCode::from(self as u8) } } struct MainLoop { /// Sender that can be used to send messages to the main loop. sender: crossbeam_channel::Sender, /// Receiver for the messages sent **to** the main loop. receiver: crossbeam_channel::Receiver, /// The file system watcher, if running in watch mode. watcher: Option, } impl MainLoop { fn new() -> (Self, MainLoopCancellationToken) { let (sender, receiver) = crossbeam_channel::bounded(10); ( Self { sender: sender.clone(), receiver, watcher: None, }, MainLoopCancellationToken { sender }, ) } fn watch(mut self, db: &mut RootDatabase) -> anyhow::Result { tracing::debug!("Starting watch mode"); let sender = self.sender.clone(); let watcher = watch::directory_watcher(move |event| { sender.send(MainLoopMessage::ApplyChanges(event)).unwrap(); })?; self.watcher = Some(WorkspaceWatcher::new(watcher, db)); self.run(db); Ok(ExitStatus::Success) } fn run(mut self, db: &mut RootDatabase) -> ExitStatus { self.sender.send(MainLoopMessage::CheckWorkspace).unwrap(); let result = self.main_loop(db); tracing::debug!("Exiting main loop"); result } fn main_loop(&mut self, db: &mut RootDatabase) -> ExitStatus { // Schedule the first check. tracing::debug!("Starting main loop"); let mut revision = 0u64; while let Ok(message) = self.receiver.recv() { match message { MainLoopMessage::CheckWorkspace => { let db = db.snapshot(); let sender = self.sender.clone(); // Spawn a new task that checks the workspace. This needs to be done in a separate thread // to prevent blocking the main loop here. rayon::spawn(move || { if let Ok(result) = db.check() { // Send the result back to the main loop for printing. sender .send(MainLoopMessage::CheckCompleted { result, revision }) .unwrap(); } }); } MainLoopMessage::CheckCompleted { result, revision: check_revision, } => { let has_diagnostics = !result.is_empty(); if check_revision == revision { for diagnostic in result { tracing::error!("{}", diagnostic); } } else { tracing::debug!( "Discarding check result for outdated revision: current: {revision}, result revision: {check_revision}" ); } if self.watcher.is_none() { return if has_diagnostics { ExitStatus::Failure } else { ExitStatus::Success }; } tracing::trace!("Counts after last check:\n{}", countme::get_all()); } MainLoopMessage::ApplyChanges(changes) => { revision += 1; // Automatically cancels any pending queries and waits for them to complete. db.apply_changes(changes); if let Some(watcher) = self.watcher.as_mut() { watcher.update(db); } self.sender.send(MainLoopMessage::CheckWorkspace).unwrap(); } MainLoopMessage::Exit => { // Cancel any pending queries and wait for them to complete. // TODO: Don't use Salsa internal APIs // [Zulip-Thread](https://salsa.zulipchat.com/#narrow/stream/333573-salsa-3.2E0/topic/Expose.20an.20API.20to.20cancel.20other.20queries) let _ = db.zalsa_mut(); return ExitStatus::Success; } } tracing::debug!("Waiting for next main loop message."); } ExitStatus::Success } } #[derive(Debug)] struct MainLoopCancellationToken { sender: crossbeam_channel::Sender, } impl MainLoopCancellationToken { fn stop(self) { self.sender.send(MainLoopMessage::Exit).unwrap(); } } /// Message sent from the orchestrator to the main loop. #[derive(Debug)] enum MainLoopMessage { CheckWorkspace, CheckCompleted { result: Vec, revision: u64 }, ApplyChanges(Vec), Exit, }