diff --git a/crates/g3-cli/src/g3_status.rs b/crates/g3-cli/src/g3_status.rs index 18cb6a0..d21f30a 100644 --- a/crates/g3-cli/src/g3_status.rs +++ b/crates/g3-cli/src/g3_status.rs @@ -284,6 +284,24 @@ impl G3Status { ); Self::done(); } + + /// Print research completion notification: "g3: N research report(s) ... [done/failed]" + /// + /// Used for real-time notification when background research completes. + pub fn research_complete(count: usize, all_succeeded: bool) { + let report_word = if count == 1 { "report" } else { "reports" }; + print!( + "{} {} research {} ...", + Self::format_prefix(), + count, + report_word + ); + if all_succeeded { + Self::done(); + } else { + Self::failed(); + } + } } #[cfg(test)] diff --git a/crates/g3-cli/src/interactive.rs b/crates/g3-cli/src/interactive.rs index 18ce2fc..da0929c 100644 --- a/crates/g3-cli/src/interactive.rs +++ b/crates/g3-cli/src/interactive.rs @@ -6,7 +6,10 @@ use rustyline::error::ReadlineError; use rustyline::{Config, Editor}; use crate::completion::G3Helper; use std::path::Path; +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; use tracing::{debug, error}; +use tokio::sync::broadcast; use g3_core::ui_writer::UiWriter; use g3_core::Agent; @@ -67,6 +70,57 @@ async fn execute_user_input( } } +/// Spawn a background task to handle research completion notifications. +/// +/// This task listens for research completions and prints status messages in real-time. +/// When g3 is idle (waiting for input), it reprints the prompt after the notification. +/// When g3 is busy (processing), it just prints the notification (interleaving is fine). +/// +/// Returns a handle to the spawned task and an `is_busy` flag that should be set +/// to true while the agent is processing and false when waiting for input. +fn spawn_research_notification_handler( + mut rx: broadcast::Receiver, + is_busy: Arc, + prompt: Arc>, +) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + loop { + match rx.recv().await { + Ok(notification) => { + use std::io::Write; + + let succeeded = notification.status == g3_core::ResearchStatus::Complete; + + // Print the completion notification + // If we're idle (at prompt), we need to print on a new line first + let busy = is_busy.load(Ordering::SeqCst); + if !busy { + // Clear the current line (prompt) and move to start + print!("\r\x1b[K"); + } + + G3Status::research_complete(1, succeeded); + + // If we're idle, reprint the prompt + if !busy { + let prompt_str = prompt.read().unwrap().clone(); + print!("{}", prompt_str); + let _ = std::io::stdout().flush(); + } + } + Err(broadcast::error::RecvError::Closed) => { + // Channel closed, exit the task + break; + } + Err(broadcast::error::RecvError::Lagged(_)) => { + // Missed some messages, continue + continue; + } + } + } + }) +} + /// Run interactive mode with console output. /// If `agent_name` is Some, we're in agent+chat mode: skip session resume/verbose welcome, /// and use the agent name as the prompt (e.g., "butler>"). @@ -186,6 +240,16 @@ pub async fn run_interactive( let _ = rl.load_history(history_path); } + // Enable research completion notifications for real-time updates + let research_rx = agent.enable_research_notifications(); + let is_busy = Arc::new(AtomicBool::new(false)); + let current_prompt = Arc::new(std::sync::RwLock::new(String::new())); + let _notification_handle = spawn_research_notification_handler( + research_rx, + is_busy.clone(), + current_prompt.clone(), + ); + // Track multiline input let mut multiline_buffer = String::new(); let mut in_multiline = false; @@ -221,6 +285,10 @@ pub async fn run_interactive( // Build prompt let prompt = build_prompt(in_multiline, agent_name, &active_project); + + // Update the shared prompt for the notification handler + *current_prompt.write().unwrap() = prompt.clone(); + is_busy.store(false, Ordering::SeqCst); let readline = rl.readline(&prompt); match readline { @@ -258,9 +326,11 @@ pub async fn run_interactive( // Reprint input with formatting reprint_formatted_input(&input, &prompt); + is_busy.store(true, Ordering::SeqCst); execute_user_input( &mut agent, &input, show_prompt, show_code, &output, from_agent_mode ).await; + is_busy.store(false, Ordering::SeqCst); } else { // Single line input let input = line.trim().to_string(); @@ -278,7 +348,11 @@ pub async fn run_interactive( // Check for control commands if input.starts_with('/') { - if handle_command(&input, &mut agent, workspace_path, &output, &mut active_project, &mut rl, show_prompt, show_code).await? { + is_busy.store(true, Ordering::SeqCst); + let handled = handle_command(&input, &mut agent, workspace_path, &output, &mut active_project, &mut rl, show_prompt, show_code).await?; + is_busy.store(false, Ordering::SeqCst); + + if handled { continue; } } @@ -286,9 +360,11 @@ pub async fn run_interactive( // Reprint input with formatting reprint_formatted_input(&input, &prompt); + is_busy.store(true, Ordering::SeqCst); execute_user_input( &mut agent, &input, show_prompt, show_code, &output, from_agent_mode ).await; + is_busy.store(false, Ordering::SeqCst); } } Err(ReadlineError::Interrupted) => { diff --git a/crates/g3-core/src/lib.rs b/crates/g3-core/src/lib.rs index 0fb64dd..bec5498 100644 --- a/crates/g3-core/src/lib.rs +++ b/crates/g3-core/src/lib.rs @@ -38,6 +38,9 @@ pub use task_result::TaskResult; // Re-export context window types pub use context_window::{ContextWindow, ThinResult, ThinScope}; +// Re-export pending research types for notification handling +pub use pending_research::{PendingResearchManager, ResearchCompletionNotification, ResearchStatus}; + // Export agent prompt generation for CLI use pub use prompts::get_agent_system_prompt; @@ -1484,6 +1487,26 @@ impl Agent { &self.pending_research_manager } + /// Subscribe to research completion notifications. + /// + /// Returns a receiver that will receive notifications when research tasks complete. + /// Returns None if the agent was not configured with notifications enabled. + /// Use this in interactive mode to get real-time updates when research finishes. + pub fn subscribe_research_notifications(&self) -> Option> { + self.pending_research_manager.subscribe() + } + + /// Enable research completion notifications and return a receiver. + /// + /// This replaces the internal research manager with one that sends notifications. + /// Call this once during setup (e.g., in interactive mode) before any research tasks. + /// Returns a receiver that will receive notifications when research tasks complete. + pub fn enable_research_notifications(&mut self) -> tokio::sync::broadcast::Receiver { + let (manager, rx) = pending_research::PendingResearchManager::with_notifications(); + self.pending_research_manager = manager; + rx + } + pub fn set_requirements_sha(&mut self, sha: String) { self.requirements_sha = Some(sha); } diff --git a/crates/g3-core/src/pending_research.rs b/crates/g3-core/src/pending_research.rs index 2159fb2..0b3e9ee 100644 --- a/crates/g3-core/src/pending_research.rs +++ b/crates/g3-core/src/pending_research.rs @@ -2,7 +2,8 @@ //! //! This module manages research tasks that run in the background while the agent //! continues with other work. Research results are stored until they can be -//! injected into the conversation at a natural break point. +//! injected into the conversation at a natural break point. Completion notifications +//! are sent via a channel for real-time UI updates. use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -80,10 +81,23 @@ impl ResearchTask { } } +/// Notification sent when a research task completes (success or failure) +#[derive(Debug, Clone)] +pub struct ResearchCompletionNotification { + /// The research ID that completed + pub id: ResearchId, + /// Whether it succeeded or failed + pub status: ResearchStatus, + /// The query that was researched + pub query: String, +} + /// Thread-safe manager for pending research tasks #[derive(Debug, Clone)] pub struct PendingResearchManager { tasks: Arc>>, + /// Channel sender for completion notifications (optional, for UI updates) + completion_tx: Option>, } impl Default for PendingResearchManager { @@ -97,9 +111,31 @@ impl PendingResearchManager { pub fn new() -> Self { Self { tasks: Arc::new(Mutex::new(HashMap::new())), + completion_tx: None, } } + /// Create a new pending research manager with completion notifications enabled. + /// + /// Returns the manager and a receiver for completion notifications. + /// The receiver can be used to get real-time updates when research completes. + pub fn with_notifications() -> (Self, tokio::sync::broadcast::Receiver) { + // Buffer size of 16 should be plenty for concurrent research tasks + let (tx, rx) = tokio::sync::broadcast::channel(16); + let manager = Self { + tasks: Arc::new(Mutex::new(HashMap::new())), + completion_tx: Some(tx), + }; + (manager, rx) + } + + /// Subscribe to completion notifications. + /// + /// Returns None if notifications are not enabled (manager created with `new()`). + pub fn subscribe(&self) -> Option> { + self.completion_tx.as_ref().map(|tx| tx.subscribe()) + } + /// Generate a unique research ID pub fn generate_id() -> ResearchId { use std::time::{SystemTime, UNIX_EPOCH}; @@ -127,21 +163,47 @@ impl PendingResearchManager { /// Update a research task with its result pub fn complete(&self, id: &ResearchId, result: String) { - let mut tasks = self.tasks.lock().unwrap(); - if let Some(task) = tasks.get_mut(id) { - task.status = ResearchStatus::Complete; - task.result = Some(result); - debug!("Research task {} completed successfully", id); + let notification = { + let mut tasks = self.tasks.lock().unwrap(); + if let Some(task) = tasks.get_mut(id) { + task.status = ResearchStatus::Complete; + task.result = Some(result); + debug!("Research task {} completed successfully", id); + Some(ResearchCompletionNotification { + id: id.clone(), + status: ResearchStatus::Complete, + query: task.query.clone(), + }) + } else { + None + } + }; + // Send notification outside the lock to avoid potential deadlocks + if let (Some(notification), Some(tx)) = (notification, &self.completion_tx) { + let _ = tx.send(notification); // Ignore error if no receivers } } /// Mark a research task as failed pub fn fail(&self, id: &ResearchId, error: String) { - let mut tasks = self.tasks.lock().unwrap(); - if let Some(task) = tasks.get_mut(id) { - task.status = ResearchStatus::Failed; - task.result = Some(error); - debug!("Research task {} failed", id); + let notification = { + let mut tasks = self.tasks.lock().unwrap(); + if let Some(task) = tasks.get_mut(id) { + task.status = ResearchStatus::Failed; + task.result = Some(error); + debug!("Research task {} failed", id); + Some(ResearchCompletionNotification { + id: id.clone(), + status: ResearchStatus::Failed, + query: task.query.clone(), + }) + } else { + None + } + }; + // Send notification outside the lock to avoid potential deadlocks + if let (Some(notification), Some(tx)) = (notification, &self.completion_tx) { + let _ = tx.send(notification); // Ignore error if no receivers } } @@ -433,4 +495,43 @@ mod tests { let unique: std::collections::HashSet<_> = ids.iter().collect(); assert_eq!(ids.len(), unique.len(), "Generated IDs should be unique"); } + + #[tokio::test] + async fn test_notifications_on_complete() { + let (manager, mut rx) = PendingResearchManager::with_notifications(); + + let id = manager.register("Test query"); + + // Complete the research + manager.complete(&id, "Report content".to_string()); + + // Should receive a notification + let notification = rx.recv().await.unwrap(); + assert_eq!(notification.id, id); + assert_eq!(notification.status, ResearchStatus::Complete); + assert_eq!(notification.query, "Test query"); + } + + #[tokio::test] + async fn test_notifications_on_fail() { + let (manager, mut rx) = PendingResearchManager::with_notifications(); + + let id = manager.register("Test query"); + + // Fail the research + manager.fail(&id, "Connection error".to_string()); + + // Should receive a notification + let notification = rx.recv().await.unwrap(); + assert_eq!(notification.id, id); + assert_eq!(notification.status, ResearchStatus::Failed); + assert_eq!(notification.query, "Test query"); + } + + #[test] + fn test_no_notifications_without_setup() { + let manager = PendingResearchManager::new(); + // subscribe() should return None when notifications not enabled + assert!(manager.subscribe().is_none()); + } }