Add real-time research completion notifications
When background research completes, g3 now immediately prints a status message instead of waiting for the next user interaction: - Added ResearchCompletionNotification and broadcast channel to PendingResearchManager for push-based notifications - Added spawn_research_notification_handler() in interactive mode that listens for completions in a background task - When idle (at prompt): clears line, prints status, reprints prompt - When busy (processing): prints status inline (interleaving is fine) - Added G3Status::research_complete() for consistent formatting - Added enable_research_notifications() method to Agent Output format: "g3: 1 research report ... [done]"
This commit is contained in:
@@ -284,6 +284,24 @@ impl G3Status {
|
|||||||
);
|
);
|
||||||
Self::done();
|
Self::done();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Print research completion notification: "g3: N research report(s) ... [done/failed]"
|
||||||
|
///
|
||||||
|
/// Used for real-time notification when background research completes.
|
||||||
|
pub fn research_complete(count: usize, all_succeeded: bool) {
|
||||||
|
let report_word = if count == 1 { "report" } else { "reports" };
|
||||||
|
print!(
|
||||||
|
"{} {} research {} ...",
|
||||||
|
Self::format_prefix(),
|
||||||
|
count,
|
||||||
|
report_word
|
||||||
|
);
|
||||||
|
if all_succeeded {
|
||||||
|
Self::done();
|
||||||
|
} else {
|
||||||
|
Self::failed();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
|||||||
@@ -6,7 +6,10 @@ use rustyline::error::ReadlineError;
|
|||||||
use rustyline::{Config, Editor};
|
use rustyline::{Config, Editor};
|
||||||
use crate::completion::G3Helper;
|
use crate::completion::G3Helper;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use tracing::{debug, error};
|
use tracing::{debug, error};
|
||||||
|
use tokio::sync::broadcast;
|
||||||
|
|
||||||
use g3_core::ui_writer::UiWriter;
|
use g3_core::ui_writer::UiWriter;
|
||||||
use g3_core::Agent;
|
use g3_core::Agent;
|
||||||
@@ -67,6 +70,57 @@ async fn execute_user_input<W: UiWriter>(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Spawn a background task to handle research completion notifications.
|
||||||
|
///
|
||||||
|
/// This task listens for research completions and prints status messages in real-time.
|
||||||
|
/// When g3 is idle (waiting for input), it reprints the prompt after the notification.
|
||||||
|
/// When g3 is busy (processing), it just prints the notification (interleaving is fine).
|
||||||
|
///
|
||||||
|
/// Returns a handle to the spawned task and an `is_busy` flag that should be set
|
||||||
|
/// to true while the agent is processing and false when waiting for input.
|
||||||
|
fn spawn_research_notification_handler(
|
||||||
|
mut rx: broadcast::Receiver<g3_core::ResearchCompletionNotification>,
|
||||||
|
is_busy: Arc<AtomicBool>,
|
||||||
|
prompt: Arc<std::sync::RwLock<String>>,
|
||||||
|
) -> tokio::task::JoinHandle<()> {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
loop {
|
||||||
|
match rx.recv().await {
|
||||||
|
Ok(notification) => {
|
||||||
|
use std::io::Write;
|
||||||
|
|
||||||
|
let succeeded = notification.status == g3_core::ResearchStatus::Complete;
|
||||||
|
|
||||||
|
// Print the completion notification
|
||||||
|
// If we're idle (at prompt), we need to print on a new line first
|
||||||
|
let busy = is_busy.load(Ordering::SeqCst);
|
||||||
|
if !busy {
|
||||||
|
// Clear the current line (prompt) and move to start
|
||||||
|
print!("\r\x1b[K");
|
||||||
|
}
|
||||||
|
|
||||||
|
G3Status::research_complete(1, succeeded);
|
||||||
|
|
||||||
|
// If we're idle, reprint the prompt
|
||||||
|
if !busy {
|
||||||
|
let prompt_str = prompt.read().unwrap().clone();
|
||||||
|
print!("{}", prompt_str);
|
||||||
|
let _ = std::io::stdout().flush();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(broadcast::error::RecvError::Closed) => {
|
||||||
|
// Channel closed, exit the task
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(broadcast::error::RecvError::Lagged(_)) => {
|
||||||
|
// Missed some messages, continue
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
/// Run interactive mode with console output.
|
/// Run interactive mode with console output.
|
||||||
/// If `agent_name` is Some, we're in agent+chat mode: skip session resume/verbose welcome,
|
/// If `agent_name` is Some, we're in agent+chat mode: skip session resume/verbose welcome,
|
||||||
/// and use the agent name as the prompt (e.g., "butler>").
|
/// and use the agent name as the prompt (e.g., "butler>").
|
||||||
@@ -186,6 +240,16 @@ pub async fn run_interactive<W: UiWriter>(
|
|||||||
let _ = rl.load_history(history_path);
|
let _ = rl.load_history(history_path);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Enable research completion notifications for real-time updates
|
||||||
|
let research_rx = agent.enable_research_notifications();
|
||||||
|
let is_busy = Arc::new(AtomicBool::new(false));
|
||||||
|
let current_prompt = Arc::new(std::sync::RwLock::new(String::new()));
|
||||||
|
let _notification_handle = spawn_research_notification_handler(
|
||||||
|
research_rx,
|
||||||
|
is_busy.clone(),
|
||||||
|
current_prompt.clone(),
|
||||||
|
);
|
||||||
|
|
||||||
// Track multiline input
|
// Track multiline input
|
||||||
let mut multiline_buffer = String::new();
|
let mut multiline_buffer = String::new();
|
||||||
let mut in_multiline = false;
|
let mut in_multiline = false;
|
||||||
@@ -221,6 +285,10 @@ pub async fn run_interactive<W: UiWriter>(
|
|||||||
|
|
||||||
// Build prompt
|
// Build prompt
|
||||||
let prompt = build_prompt(in_multiline, agent_name, &active_project);
|
let prompt = build_prompt(in_multiline, agent_name, &active_project);
|
||||||
|
|
||||||
|
// Update the shared prompt for the notification handler
|
||||||
|
*current_prompt.write().unwrap() = prompt.clone();
|
||||||
|
is_busy.store(false, Ordering::SeqCst);
|
||||||
|
|
||||||
let readline = rl.readline(&prompt);
|
let readline = rl.readline(&prompt);
|
||||||
match readline {
|
match readline {
|
||||||
@@ -258,9 +326,11 @@ pub async fn run_interactive<W: UiWriter>(
|
|||||||
// Reprint input with formatting
|
// Reprint input with formatting
|
||||||
reprint_formatted_input(&input, &prompt);
|
reprint_formatted_input(&input, &prompt);
|
||||||
|
|
||||||
|
is_busy.store(true, Ordering::SeqCst);
|
||||||
execute_user_input(
|
execute_user_input(
|
||||||
&mut agent, &input, show_prompt, show_code, &output, from_agent_mode
|
&mut agent, &input, show_prompt, show_code, &output, from_agent_mode
|
||||||
).await;
|
).await;
|
||||||
|
is_busy.store(false, Ordering::SeqCst);
|
||||||
} else {
|
} else {
|
||||||
// Single line input
|
// Single line input
|
||||||
let input = line.trim().to_string();
|
let input = line.trim().to_string();
|
||||||
@@ -278,7 +348,11 @@ pub async fn run_interactive<W: UiWriter>(
|
|||||||
|
|
||||||
// Check for control commands
|
// Check for control commands
|
||||||
if input.starts_with('/') {
|
if input.starts_with('/') {
|
||||||
if handle_command(&input, &mut agent, workspace_path, &output, &mut active_project, &mut rl, show_prompt, show_code).await? {
|
is_busy.store(true, Ordering::SeqCst);
|
||||||
|
let handled = handle_command(&input, &mut agent, workspace_path, &output, &mut active_project, &mut rl, show_prompt, show_code).await?;
|
||||||
|
is_busy.store(false, Ordering::SeqCst);
|
||||||
|
|
||||||
|
if handled {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -286,9 +360,11 @@ pub async fn run_interactive<W: UiWriter>(
|
|||||||
// Reprint input with formatting
|
// Reprint input with formatting
|
||||||
reprint_formatted_input(&input, &prompt);
|
reprint_formatted_input(&input, &prompt);
|
||||||
|
|
||||||
|
is_busy.store(true, Ordering::SeqCst);
|
||||||
execute_user_input(
|
execute_user_input(
|
||||||
&mut agent, &input, show_prompt, show_code, &output, from_agent_mode
|
&mut agent, &input, show_prompt, show_code, &output, from_agent_mode
|
||||||
).await;
|
).await;
|
||||||
|
is_busy.store(false, Ordering::SeqCst);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(ReadlineError::Interrupted) => {
|
Err(ReadlineError::Interrupted) => {
|
||||||
|
|||||||
@@ -38,6 +38,9 @@ pub use task_result::TaskResult;
|
|||||||
// Re-export context window types
|
// Re-export context window types
|
||||||
pub use context_window::{ContextWindow, ThinResult, ThinScope};
|
pub use context_window::{ContextWindow, ThinResult, ThinScope};
|
||||||
|
|
||||||
|
// Re-export pending research types for notification handling
|
||||||
|
pub use pending_research::{PendingResearchManager, ResearchCompletionNotification, ResearchStatus};
|
||||||
|
|
||||||
// Export agent prompt generation for CLI use
|
// Export agent prompt generation for CLI use
|
||||||
pub use prompts::get_agent_system_prompt;
|
pub use prompts::get_agent_system_prompt;
|
||||||
|
|
||||||
@@ -1484,6 +1487,26 @@ impl<W: UiWriter> Agent<W> {
|
|||||||
&self.pending_research_manager
|
&self.pending_research_manager
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Subscribe to research completion notifications.
|
||||||
|
///
|
||||||
|
/// Returns a receiver that will receive notifications when research tasks complete.
|
||||||
|
/// Returns None if the agent was not configured with notifications enabled.
|
||||||
|
/// Use this in interactive mode to get real-time updates when research finishes.
|
||||||
|
pub fn subscribe_research_notifications(&self) -> Option<tokio::sync::broadcast::Receiver<pending_research::ResearchCompletionNotification>> {
|
||||||
|
self.pending_research_manager.subscribe()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Enable research completion notifications and return a receiver.
|
||||||
|
///
|
||||||
|
/// This replaces the internal research manager with one that sends notifications.
|
||||||
|
/// Call this once during setup (e.g., in interactive mode) before any research tasks.
|
||||||
|
/// Returns a receiver that will receive notifications when research tasks complete.
|
||||||
|
pub fn enable_research_notifications(&mut self) -> tokio::sync::broadcast::Receiver<pending_research::ResearchCompletionNotification> {
|
||||||
|
let (manager, rx) = pending_research::PendingResearchManager::with_notifications();
|
||||||
|
self.pending_research_manager = manager;
|
||||||
|
rx
|
||||||
|
}
|
||||||
|
|
||||||
pub fn set_requirements_sha(&mut self, sha: String) {
|
pub fn set_requirements_sha(&mut self, sha: String) {
|
||||||
self.requirements_sha = Some(sha);
|
self.requirements_sha = Some(sha);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,7 +2,8 @@
|
|||||||
//!
|
//!
|
||||||
//! This module manages research tasks that run in the background while the agent
|
//! This module manages research tasks that run in the background while the agent
|
||||||
//! continues with other work. Research results are stored until they can be
|
//! continues with other work. Research results are stored until they can be
|
||||||
//! injected into the conversation at a natural break point.
|
//! injected into the conversation at a natural break point. Completion notifications
|
||||||
|
//! are sent via a channel for real-time UI updates.
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
@@ -80,10 +81,23 @@ impl ResearchTask {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Notification sent when a research task completes (success or failure)
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct ResearchCompletionNotification {
|
||||||
|
/// The research ID that completed
|
||||||
|
pub id: ResearchId,
|
||||||
|
/// Whether it succeeded or failed
|
||||||
|
pub status: ResearchStatus,
|
||||||
|
/// The query that was researched
|
||||||
|
pub query: String,
|
||||||
|
}
|
||||||
|
|
||||||
/// Thread-safe manager for pending research tasks
|
/// Thread-safe manager for pending research tasks
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct PendingResearchManager {
|
pub struct PendingResearchManager {
|
||||||
tasks: Arc<Mutex<HashMap<ResearchId, ResearchTask>>>,
|
tasks: Arc<Mutex<HashMap<ResearchId, ResearchTask>>>,
|
||||||
|
/// Channel sender for completion notifications (optional, for UI updates)
|
||||||
|
completion_tx: Option<tokio::sync::broadcast::Sender<ResearchCompletionNotification>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for PendingResearchManager {
|
impl Default for PendingResearchManager {
|
||||||
@@ -97,9 +111,31 @@ impl PendingResearchManager {
|
|||||||
pub fn new() -> Self {
|
pub fn new() -> Self {
|
||||||
Self {
|
Self {
|
||||||
tasks: Arc::new(Mutex::new(HashMap::new())),
|
tasks: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
completion_tx: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Create a new pending research manager with completion notifications enabled.
|
||||||
|
///
|
||||||
|
/// Returns the manager and a receiver for completion notifications.
|
||||||
|
/// The receiver can be used to get real-time updates when research completes.
|
||||||
|
pub fn with_notifications() -> (Self, tokio::sync::broadcast::Receiver<ResearchCompletionNotification>) {
|
||||||
|
// Buffer size of 16 should be plenty for concurrent research tasks
|
||||||
|
let (tx, rx) = tokio::sync::broadcast::channel(16);
|
||||||
|
let manager = Self {
|
||||||
|
tasks: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
completion_tx: Some(tx),
|
||||||
|
};
|
||||||
|
(manager, rx)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Subscribe to completion notifications.
|
||||||
|
///
|
||||||
|
/// Returns None if notifications are not enabled (manager created with `new()`).
|
||||||
|
pub fn subscribe(&self) -> Option<tokio::sync::broadcast::Receiver<ResearchCompletionNotification>> {
|
||||||
|
self.completion_tx.as_ref().map(|tx| tx.subscribe())
|
||||||
|
}
|
||||||
|
|
||||||
/// Generate a unique research ID
|
/// Generate a unique research ID
|
||||||
pub fn generate_id() -> ResearchId {
|
pub fn generate_id() -> ResearchId {
|
||||||
use std::time::{SystemTime, UNIX_EPOCH};
|
use std::time::{SystemTime, UNIX_EPOCH};
|
||||||
@@ -127,21 +163,47 @@ impl PendingResearchManager {
|
|||||||
|
|
||||||
/// Update a research task with its result
|
/// Update a research task with its result
|
||||||
pub fn complete(&self, id: &ResearchId, result: String) {
|
pub fn complete(&self, id: &ResearchId, result: String) {
|
||||||
let mut tasks = self.tasks.lock().unwrap();
|
let notification = {
|
||||||
if let Some(task) = tasks.get_mut(id) {
|
let mut tasks = self.tasks.lock().unwrap();
|
||||||
task.status = ResearchStatus::Complete;
|
if let Some(task) = tasks.get_mut(id) {
|
||||||
task.result = Some(result);
|
task.status = ResearchStatus::Complete;
|
||||||
debug!("Research task {} completed successfully", id);
|
task.result = Some(result);
|
||||||
|
debug!("Research task {} completed successfully", id);
|
||||||
|
Some(ResearchCompletionNotification {
|
||||||
|
id: id.clone(),
|
||||||
|
status: ResearchStatus::Complete,
|
||||||
|
query: task.query.clone(),
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
};
|
||||||
|
// Send notification outside the lock to avoid potential deadlocks
|
||||||
|
if let (Some(notification), Some(tx)) = (notification, &self.completion_tx) {
|
||||||
|
let _ = tx.send(notification); // Ignore error if no receivers
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Mark a research task as failed
|
/// Mark a research task as failed
|
||||||
pub fn fail(&self, id: &ResearchId, error: String) {
|
pub fn fail(&self, id: &ResearchId, error: String) {
|
||||||
let mut tasks = self.tasks.lock().unwrap();
|
let notification = {
|
||||||
if let Some(task) = tasks.get_mut(id) {
|
let mut tasks = self.tasks.lock().unwrap();
|
||||||
task.status = ResearchStatus::Failed;
|
if let Some(task) = tasks.get_mut(id) {
|
||||||
task.result = Some(error);
|
task.status = ResearchStatus::Failed;
|
||||||
debug!("Research task {} failed", id);
|
task.result = Some(error);
|
||||||
|
debug!("Research task {} failed", id);
|
||||||
|
Some(ResearchCompletionNotification {
|
||||||
|
id: id.clone(),
|
||||||
|
status: ResearchStatus::Failed,
|
||||||
|
query: task.query.clone(),
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
};
|
||||||
|
// Send notification outside the lock to avoid potential deadlocks
|
||||||
|
if let (Some(notification), Some(tx)) = (notification, &self.completion_tx) {
|
||||||
|
let _ = tx.send(notification); // Ignore error if no receivers
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -433,4 +495,43 @@ mod tests {
|
|||||||
let unique: std::collections::HashSet<_> = ids.iter().collect();
|
let unique: std::collections::HashSet<_> = ids.iter().collect();
|
||||||
assert_eq!(ids.len(), unique.len(), "Generated IDs should be unique");
|
assert_eq!(ids.len(), unique.len(), "Generated IDs should be unique");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_notifications_on_complete() {
|
||||||
|
let (manager, mut rx) = PendingResearchManager::with_notifications();
|
||||||
|
|
||||||
|
let id = manager.register("Test query");
|
||||||
|
|
||||||
|
// Complete the research
|
||||||
|
manager.complete(&id, "Report content".to_string());
|
||||||
|
|
||||||
|
// Should receive a notification
|
||||||
|
let notification = rx.recv().await.unwrap();
|
||||||
|
assert_eq!(notification.id, id);
|
||||||
|
assert_eq!(notification.status, ResearchStatus::Complete);
|
||||||
|
assert_eq!(notification.query, "Test query");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_notifications_on_fail() {
|
||||||
|
let (manager, mut rx) = PendingResearchManager::with_notifications();
|
||||||
|
|
||||||
|
let id = manager.register("Test query");
|
||||||
|
|
||||||
|
// Fail the research
|
||||||
|
manager.fail(&id, "Connection error".to_string());
|
||||||
|
|
||||||
|
// Should receive a notification
|
||||||
|
let notification = rx.recv().await.unwrap();
|
||||||
|
assert_eq!(notification.id, id);
|
||||||
|
assert_eq!(notification.status, ResearchStatus::Failed);
|
||||||
|
assert_eq!(notification.query, "Test query");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_no_notifications_without_setup() {
|
||||||
|
let manager = PendingResearchManager::new();
|
||||||
|
// subscribe() should return None when notifications not enabled
|
||||||
|
assert!(manager.subscribe().is_none());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user