Files
g3/crates/g3-core/src/pending_research.rs
Dhanji R. Prasanna 5ab1598e03 feat: async research tool - runs in background, returns immediately
The research tool now spawns the scout agent in a background tokio task
and returns immediately with a research_id placeholder. This allows the
agent to continue working while research runs (30-120 seconds).

Key changes:
- New PendingResearchManager for tracking async research tasks
- research tool returns immediately with placeholder containing research_id
- research_status tool to check progress of pending research
- Auto-injection of completed research at natural break points:
  - Start of each tool iteration (before LLM call)
  - Before prompting user in interactive mode
- /research CLI command to list all research tasks
- Updated system prompt to explain async behavior

The agent can:
- Continue with other work while research runs
- Check status with research_status tool
- Yield turn to user if results are critical before continuing
2026-01-30 13:00:02 +11:00

437 lines
14 KiB
Rust

//! Pending research manager for async research tasks.
//!
//! This module manages research tasks that run in the background while the agent
//! continues with other work. Research results are stored until they can be
//! injected into the conversation at a natural break point.
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use tracing::debug;
/// Unique identifier for a research task.
///
/// A plain `String` alias. Values handed out by the manager come from
/// `PendingResearchManager::generate_id` and look like `research_<hex>_<hex>`.
pub type ResearchId = String;
/// Status of a research task.
///
/// A task is created as `Pending` (see `ResearchTask::new`) and is switched to
/// `Complete` or `Failed` by `PendingResearchManager::complete` /
/// `PendingResearchManager::fail`. The `Display` impl renders the lowercase
/// names `pending`, `complete` and `failed`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum ResearchStatus {
/// Research is in progress; the task has no result yet
Pending,
/// Research completed successfully; the task's `result` holds the report
Complete,
/// Research failed with an error; the task's `result` holds the error message
Failed,
}
impl std::fmt::Display for ResearchStatus {
    /// Writes the lowercase status name: `pending`, `complete` or `failed`.
    ///
    /// The name is emitted through [`std::fmt::Formatter::pad`] so that width,
    /// fill, alignment and precision flags (e.g. `{:>10}` when printing a
    /// table of tasks) are honored. Without any flags the output is exactly
    /// the bare name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            ResearchStatus::Pending => "pending",
            ResearchStatus::Complete => "complete",
            ResearchStatus::Failed => "failed",
        };
        f.pad(label)
    }
}
/// A research task being tracked by the manager.
///
/// Tasks are created in the `Pending` state by `ResearchTask::new`. The manager
/// later fills in `status` and `result` (via `complete` / `fail`) and sets
/// `injected` when the task is handed out by `take_completed`.
#[derive(Debug, Clone)]
pub struct ResearchTask {
/// Unique identifier for this research task
pub id: ResearchId,
/// The original research query
pub query: String,
/// Current status of the research
pub status: ResearchStatus,
/// The research result: the report when `status` is `Complete`, the error
/// message when it is `Failed`, and `None` while the task is still pending
pub result: Option<String>,
/// When the research was initiated; the basis for `elapsed` / `elapsed_display`
pub started_at: Instant,
/// Whether this result has been injected into the conversation
pub injected: bool,
}
impl ResearchTask {
/// Create a new pending research task
pub fn new(id: ResearchId, query: String) -> Self {
Self {
id,
query,
status: ResearchStatus::Pending,
result: None,
started_at: Instant::now(),
injected: false,
}
}
/// Get the elapsed time since the research started
pub fn elapsed(&self) -> Duration {
self.started_at.elapsed()
}
/// Format elapsed time for display
pub fn elapsed_display(&self) -> String {
let secs = self.elapsed().as_secs();
if secs < 60 {
format!("{}s", secs)
} else {
format!("{}m {}s", secs / 60, secs % 60)
}
}
}
/// Thread-safe manager for pending research tasks.
///
/// The task map sits behind an `Arc<Mutex<..>>`. Because the type derives
/// `Clone`, cloning a manager produces another handle to the same shared map,
/// not an independent copy of the tasks.
#[derive(Debug, Clone)]
pub struct PendingResearchManager {
/// Shared map from research ID to the task record for that ID
tasks: Arc<Mutex<HashMap<ResearchId, ResearchTask>>>,
}
impl Default for PendingResearchManager {
fn default() -> Self {
Self::new()
}
}
impl PendingResearchManager {
/// Create a new pending research manager
pub fn new() -> Self {
Self {
tasks: Arc::new(Mutex::new(HashMap::new())),
}
}
/// Generate a unique research ID
pub fn generate_id() -> ResearchId {
use std::time::{SystemTime, UNIX_EPOCH};
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();
// Use timestamp + random suffix for uniqueness
format!("research_{:x}_{:04x}", timestamp, rand::random::<u16>())
}
/// Register a new research task
///
/// Returns the research ID for tracking
pub fn register(&self, query: &str) -> ResearchId {
let id = Self::generate_id();
let task = ResearchTask::new(id.clone(), query.to_string());
let mut tasks = self.tasks.lock().unwrap();
tasks.insert(id.clone(), task);
debug!("Registered research task: {} for query: {}", id, query);
id
}
/// Update a research task with its result
pub fn complete(&self, id: &ResearchId, result: String) {
let mut tasks = self.tasks.lock().unwrap();
if let Some(task) = tasks.get_mut(id) {
task.status = ResearchStatus::Complete;
task.result = Some(result);
debug!("Research task {} completed successfully", id);
}
}
/// Mark a research task as failed
pub fn fail(&self, id: &ResearchId, error: String) {
let mut tasks = self.tasks.lock().unwrap();
if let Some(task) = tasks.get_mut(id) {
task.status = ResearchStatus::Failed;
task.result = Some(error);
debug!("Research task {} failed", id);
}
}
/// Get the status of a specific research task
pub fn get_status(&self, id: &ResearchId) -> Option<(ResearchStatus, Option<String>)> {
let tasks = self.tasks.lock().unwrap();
tasks.get(id).map(|t| (t.status.clone(), t.result.clone()))
}
/// Get a specific research task
pub fn get(&self, id: &ResearchId) -> Option<ResearchTask> {
let tasks = self.tasks.lock().unwrap();
tasks.get(id).cloned()
}
/// List all pending (not yet injected) research tasks
pub fn list_pending(&self) -> Vec<ResearchTask> {
let tasks = self.tasks.lock().unwrap();
tasks
.values()
.filter(|t| !t.injected)
.cloned()
.collect()
}
/// List all research tasks (including injected ones)
pub fn list_all(&self) -> Vec<ResearchTask> {
let tasks = self.tasks.lock().unwrap();
tasks.values().cloned().collect()
}
/// Get count of pending (in-progress) research tasks
pub fn pending_count(&self) -> usize {
let tasks = self.tasks.lock().unwrap();
tasks
.values()
.filter(|t| t.status == ResearchStatus::Pending)
.count()
}
/// Get count of completed but not yet injected research tasks
pub fn ready_count(&self) -> usize {
let tasks = self.tasks.lock().unwrap();
tasks
.values()
.filter(|t| !t.injected && t.status != ResearchStatus::Pending)
.count()
}
/// Take all completed research tasks that haven't been injected yet
///
/// Marks them as injected so they won't be returned again
pub fn take_completed(&self) -> Vec<ResearchTask> {
let mut tasks = self.tasks.lock().unwrap();
let mut completed = Vec::new();
for task in tasks.values_mut() {
if !task.injected && task.status != ResearchStatus::Pending {
task.injected = true;
completed.push(task.clone());
}
}
debug!("Took {} completed research tasks for injection", completed.len());
completed
}
/// Remove a research task (e.g., after it's been fully processed)
pub fn remove(&self, id: &ResearchId) -> Option<ResearchTask> {
let mut tasks = self.tasks.lock().unwrap();
tasks.remove(id)
}
/// Clear all completed and injected tasks (cleanup)
pub fn cleanup_injected(&self) {
let mut tasks = self.tasks.lock().unwrap();
tasks.retain(|_, t| !t.injected);
}
/// Check if there are any tasks (pending or ready)
pub fn has_tasks(&self) -> bool {
let tasks = self.tasks.lock().unwrap();
!tasks.is_empty()
}
/// Format a summary of pending research for display
pub fn format_status_summary(&self) -> Option<String> {
let tasks = self.tasks.lock().unwrap();
let pending: Vec<_> = tasks.values().filter(|t| t.status == ResearchStatus::Pending).collect();
let ready: Vec<_> = tasks.values().filter(|t| !t.injected && t.status != ResearchStatus::Pending).collect();
if pending.is_empty() && ready.is_empty() {
return None;
}
let mut parts = Vec::new();
if !pending.is_empty() {
parts.push(format!("🔍 {} researching", pending.len()));
}
if !ready.is_empty() {
parts.push(format!("📋 {} ready", ready.len()));
}
Some(parts.join(" | "))
}
}
/// Unit tests for `PendingResearchManager` and `ResearchTask`.
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::{Duration, Instant};

    /// A freshly registered task is pending, has no result and is not injected.
    #[test]
    fn test_register_and_get() {
        let manager = PendingResearchManager::new();
        let id = manager.register("How to use tokio?");
        let task = manager.get(&id).unwrap();
        assert_eq!(task.id, id);
        assert_eq!(task.query, "How to use tokio?");
        assert_eq!(task.status, ResearchStatus::Pending);
        assert!(task.result.is_none());
        assert!(!task.injected);
        assert!(manager.has_tasks());
    }

    /// `complete` stores the report and flips the status.
    #[test]
    fn test_complete_research() {
        let manager = PendingResearchManager::new();
        let id = manager.register("Test query");
        manager.complete(&id, "Research report here".to_string());
        let (status, result) = manager.get_status(&id).unwrap();
        assert_eq!(status, ResearchStatus::Complete);
        assert_eq!(result.unwrap(), "Research report here");
    }

    /// `fail` stores the error message as the result and flips the status.
    #[test]
    fn test_fail_research() {
        let manager = PendingResearchManager::new();
        let id = manager.register("Test query");
        manager.fail(&id, "Connection timeout".to_string());
        let (status, result) = manager.get_status(&id).unwrap();
        assert_eq!(status, ResearchStatus::Failed);
        assert_eq!(result.unwrap(), "Connection timeout");
    }

    /// Results reported for an ID the manager does not track are dropped.
    #[test]
    fn test_complete_unknown_id_is_noop() {
        let manager = PendingResearchManager::new();
        let missing: ResearchId = "research_missing".to_string();
        manager.complete(&missing, "Report".to_string());
        manager.fail(&missing, "Error".to_string());
        assert!(manager.get_status(&missing).is_none());
        assert!(manager.get(&missing).is_none());
        assert!(!manager.has_tasks());
    }

    /// Finished tasks are handed out exactly once; pending ones are left alone.
    #[test]
    fn test_take_completed() {
        let manager = PendingResearchManager::new();
        let id1 = manager.register("Query 1");
        let id2 = manager.register("Query 2");
        let id3 = manager.register("Query 3");
        // Complete two, leave one pending
        manager.complete(&id1, "Report 1".to_string());
        manager.fail(&id2, "Error".to_string());
        assert_eq!(manager.ready_count(), 2);
        // Take completed
        let completed = manager.take_completed();
        assert_eq!(completed.len(), 2);
        assert!(completed.iter().all(|t| t.injected));
        assert!(completed.iter().any(|t| t.id == id1));
        assert!(completed.iter().any(|t| t.id == id2));
        assert!(completed.iter().all(|t| t.id != id3));
        // Taking again should return empty (already injected)
        let completed_again = manager.take_completed();
        assert!(completed_again.is_empty());
        assert_eq!(manager.ready_count(), 0);
        // id3 is still in progress
        assert_eq!(manager.pending_count(), 1);
    }

    /// `list_pending` means "not injected yet", not "still running".
    #[test]
    fn test_list_pending() {
        let manager = PendingResearchManager::new();
        let id1 = manager.register("Query 1");
        let id2 = manager.register("Query 2");
        manager.complete(&id1, "Report".to_string());
        // Both should be in list_pending (not injected yet)
        let pending = manager.list_pending();
        assert_eq!(pending.len(), 2);
        // Take completed
        manager.take_completed();
        // Now only the actually pending one should be listed
        let pending = manager.list_pending();
        assert_eq!(pending.len(), 1);
        assert_eq!(pending[0].id, id2);
    }

    /// The summary mentions in-progress and ready tasks, and is absent when idle.
    #[test]
    fn test_format_status_summary() {
        let manager = PendingResearchManager::new();
        // Empty - no summary
        assert!(manager.format_status_summary().is_none());
        // One pending
        let _id1 = manager.register("Query 1");
        let summary = manager.format_status_summary().unwrap();
        assert!(summary.contains("1 researching"));
        assert!(!summary.contains("ready"));
        // One pending, one ready
        let id2 = manager.register("Query 2");
        manager.complete(&id2, "Report".to_string());
        let summary = manager.format_status_summary().unwrap();
        assert!(summary.contains("1 researching"));
        assert!(summary.contains("1 ready"));
    }

    /// A clone moved to another thread updates the same task map.
    #[test]
    fn test_thread_safety() {
        let manager = PendingResearchManager::new();
        let manager_clone = manager.clone();
        let id = manager.register("Concurrent test");
        let id_clone = id.clone();
        let handle = thread::spawn(move || {
            thread::sleep(Duration::from_millis(10));
            manager_clone.complete(&id_clone, "Result from thread".to_string());
        });
        // Poll from the main thread, but give up after a deadline so that a
        // regression fails the test instead of hanging the whole test run.
        let deadline = Instant::now() + Duration::from_secs(5);
        loop {
            if let Some((ResearchStatus::Complete, _)) = manager.get_status(&id) {
                break;
            }
            assert!(
                Instant::now() < deadline,
                "research task was not completed within 5s"
            );
            thread::sleep(Duration::from_millis(5));
        }
        handle.join().unwrap();
        let (status, result) = manager.get_status(&id).unwrap();
        assert_eq!(status, ResearchStatus::Complete);
        assert_eq!(result.unwrap(), "Result from thread");
    }

    /// Elapsed time is shown as seconds, switching to minutes + seconds at 60s.
    #[test]
    fn test_elapsed_display() {
        let manager = PendingResearchManager::new();
        let id = manager.register("Test");
        let mut task = manager.get(&id).unwrap();
        // Just started: seconds only
        let display = task.elapsed_display();
        assert!(display.ends_with('s'));
        assert!(!display.contains('m'));
        // Backdate the start to exercise the minutes branch. `checked_sub`
        // can fail on platforms whose clock origin is too recent; skip then.
        if let Some(past) = Instant::now().checked_sub(Duration::from_secs(125)) {
            task.started_at = past;
            let display = task.elapsed_display();
            assert!(display.starts_with("2m "));
            assert!(display.ends_with('s'));
        }
    }

    /// Injected tasks stay around until `cleanup_injected` drops them.
    #[test]
    fn test_cleanup_injected() {
        let manager = PendingResearchManager::new();
        let id1 = manager.register("Query 1");
        let id2 = manager.register("Query 2");
        manager.complete(&id1, "Report 1".to_string());
        manager.complete(&id2, "Report 2".to_string());
        // Take and inject
        manager.take_completed();
        // Both should still exist
        assert_eq!(manager.list_all().len(), 2);
        // Cleanup injected
        manager.cleanup_injected();
        // Should be empty now
        assert_eq!(manager.list_all().len(), 0);
        assert!(!manager.has_tasks());
    }

    /// `remove` returns the task once and forgets it afterwards.
    #[test]
    fn test_remove() {
        let manager = PendingResearchManager::new();
        let id = manager.register("Query");
        let removed = manager.remove(&id).unwrap();
        assert_eq!(removed.id, id);
        assert!(manager.remove(&id).is_none());
        assert!(manager.get(&id).is_none());
        assert!(!manager.has_tasks());
    }

    /// IDs generated back to back must not repeat.
    #[test]
    fn test_generate_id_uniqueness() {
        let ids: Vec<_> = (0..100).map(|_| PendingResearchManager::generate_id()).collect();
        let unique: std::collections::HashSet<_> = ids.iter().collect();
        assert_eq!(ids.len(), unique.len(), "Generated IDs should be unique");
    }
}