From be5a62f7e509d56b7d60da6bfc4d1a2f60a67833 Mon Sep 17 00:00:00 2001 From: Micha Reiser Date: Fri, 24 Oct 2025 09:06:19 +0200 Subject: [PATCH] [ty] Timeout based workspace diagnostic progress reports (#21019) --- .../api/requests/workspace_diagnostic.rs | 100 ++++++++++-------- 1 file changed, 57 insertions(+), 43 deletions(-) diff --git a/crates/ty_server/src/server/api/requests/workspace_diagnostic.rs b/crates/ty_server/src/server/api/requests/workspace_diagnostic.rs index 8618771bfd..c990d4f4af 100644 --- a/crates/ty_server/src/server/api/requests/workspace_diagnostic.rs +++ b/crates/ty_server/src/server/api/requests/workspace_diagnostic.rs @@ -23,8 +23,8 @@ use ruff_db::files::File; use rustc_hash::FxHashMap; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::time::Instant; +use std::sync::Mutex; +use std::time::{Duration, Instant}; use ty_project::{Db, ProgressReporter}; /// Handler for [Workspace diagnostics](workspace-diagnostics) @@ -199,77 +199,64 @@ impl RetriableRequestHandler for WorkspaceDiagnosticRequestHandler { /// /// Diagnostics are only streamed if the client sends a partial result token. struct WorkspaceDiagnosticsProgressReporter<'a> { - total_files: usize, - checked_files: AtomicUsize, work_done: LazyWorkDoneProgress, - response: std::sync::Mutex>, + state: Mutex>, } impl<'a> WorkspaceDiagnosticsProgressReporter<'a> { fn new(work_done: LazyWorkDoneProgress, response: ResponseWriter<'a>) -> Self { Self { - total_files: 0, - checked_files: AtomicUsize::new(0), + state: Mutex::new(ProgressReporterState { + total_files: 0, + checked_files: 0, + last_response_sent: Instant::now(), + response, + }), work_done, - response: std::sync::Mutex::new(response), } } fn into_final_report(self) -> WorkspaceDiagnosticReportResult { - let writer = self.response.into_inner().unwrap(); - writer.into_final_report() - } - - fn report_progress(&self) { - let checked = self.checked_files.load(Ordering::Relaxed); - let total = self.total_files; - - #[allow(clippy::cast_possible_truncation)] - let percentage = if total > 0 { - Some((checked * 100 / total) as u32) - } else { - None - }; - - self.work_done - .report_progress(format!("{checked}/{total} files"), percentage); - - if checked == total { - self.work_done - .set_finish_message(format!("Checked {total} files")); - } + let state = self.state.into_inner().unwrap(); + state.response.into_final_report() } } impl ProgressReporter for WorkspaceDiagnosticsProgressReporter<'_> { fn set_files(&mut self, files: usize) { - self.total_files += files; - self.report_progress(); + let state = self.state.get_mut().unwrap(); + state.total_files += files; + state.report_progress(&self.work_done); } fn report_checked_file(&self, db: &dyn Db, file: File, diagnostics: &[Diagnostic]) { - let checked = self.checked_files.fetch_add(1, Ordering::Relaxed) + 1; - - if checked.is_multiple_of(100) || checked == self.total_files { - // Report progress every 100 files or when all files are checked - self.report_progress(); - } - // Another thread might have panicked at this point because of a salsa cancellation which // poisoned the result. If the response is poisoned, just don't report and wait for our thread // to unwind with a salsa cancellation next. - let Ok(mut response) = self.response.lock() else { + let Ok(mut state) = self.state.lock() else { return; }; + state.checked_files += 1; + + if state.checked_files == state.total_files { + state.report_progress(&self.work_done); + } else if state.last_response_sent.elapsed() >= Duration::from_millis(50) { + state.last_response_sent = Instant::now(); + + state.report_progress(&self.work_done); + } + // Don't report empty diagnostics. We clear previous diagnostics in `into_response` // which also handles the case where a file no longer has diagnostics because // it's no longer part of the project. if !diagnostics.is_empty() { - response.write_diagnostics_for_file(db, file, diagnostics); + state + .response + .write_diagnostics_for_file(db, file, diagnostics); } - response.maybe_flush(); + state.response.maybe_flush(); } fn report_diagnostics(&mut self, db: &dyn Db, diagnostics: Vec) { @@ -286,7 +273,7 @@ impl ProgressReporter for WorkspaceDiagnosticsProgressReporter<'_> { } } - let response = self.response.get_mut().unwrap(); + let response = &mut self.state.get_mut().unwrap().response; for (file, diagnostics) in by_file { response.write_diagnostics_for_file(db, file, &diagnostics); @@ -295,6 +282,33 @@ impl ProgressReporter for WorkspaceDiagnosticsProgressReporter<'_> { } } +struct ProgressReporterState<'a> { + total_files: usize, + checked_files: usize, + last_response_sent: Instant, + response: ResponseWriter<'a>, +} + +impl ProgressReporterState<'_> { + fn report_progress(&self, work_done: &LazyWorkDoneProgress) { + let checked = self.checked_files; + let total = self.total_files; + + #[allow(clippy::cast_possible_truncation)] + let percentage = if total > 0 { + Some((checked * 100 / total) as u32) + } else { + None + }; + + work_done.report_progress(format!("{checked}/{total} files"), percentage); + + if checked == total { + work_done.set_finish_message(format!("Checked {total} files")); + } + } +} + #[derive(Debug)] struct ResponseWriter<'a> { mode: ReportingMode,