From 627fdcd9bf1e1cd23a1ffd2ca8facd4116f32542 Mon Sep 17 00:00:00 2001 From: Dhanji Prasanna Date: Mon, 13 Oct 2025 20:25:12 +1100 Subject: [PATCH] streaming tool call attempt 1 --- Cargo.lock | 1 + crates/g3-cli/src/tui.rs | 2 +- crates/g3-cli/src/ui_writer_impl.rs | 30 +++++++ crates/g3-core/src/lib.rs | 19 ++++- .../src/task_result_comprehensive_tests.rs | 29 ------- crates/g3-core/src/ui_writer.rs | 4 + crates/g3-execution/Cargo.toml | 1 + crates/g3-execution/src/lib.rs | 79 +++++++++++++++++++ 8 files changed, 134 insertions(+), 31 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 467f126..8a47bf4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1119,6 +1119,7 @@ name = "g3-execution" version = "0.1.0" dependencies = [ "anyhow", + "futures", "regex", "tempfile", "thiserror 1.0.69", diff --git a/crates/g3-cli/src/tui.rs b/crates/g3-cli/src/tui.rs index a77e51b..c1202c7 100644 --- a/crates/g3-cli/src/tui.rs +++ b/crates/g3-cli/src/tui.rs @@ -1,5 +1,5 @@ use crossterm::style::Color; -use crossterm::style::{Stylize, SetForegroundColor, ResetColor}; +use crossterm::style::{SetForegroundColor, ResetColor}; use termimad::MadSkin; /// Simple output handler with markdown support diff --git a/crates/g3-cli/src/ui_writer_impl.rs b/crates/g3-cli/src/ui_writer_impl.rs index d836fe5..9827ee3 100644 --- a/crates/g3-cli/src/ui_writer_impl.rs +++ b/crates/g3-cli/src/ui_writer_impl.rs @@ -8,6 +8,8 @@ use std::time::Instant; pub struct ConsoleUiWriter { current_tool_name: Mutex>, current_tool_args: Mutex>, + current_output_line: Mutex>, + output_line_printed: Mutex, } impl ConsoleUiWriter { @@ -15,6 +17,8 @@ impl ConsoleUiWriter { Self { current_tool_name: Mutex::new(None), current_tool_args: Mutex::new(Vec::new()), + current_output_line: Mutex::new(None), + output_line_printed: Mutex::new(false), } } } @@ -103,6 +107,25 @@ impl UiWriter for ConsoleUiWriter { } } + fn update_tool_output_line(&self, line: &str) { + let mut current_line = self.current_output_line.lock().unwrap(); + let mut line_printed = self.output_line_printed.lock().unwrap(); + + // If we've already printed a line, clear it first + if *line_printed { + // Move cursor up one line and clear it + print!("\x1b[1A\x1b[2K"); + } + + // Print the new line + println!("│ \x1b[2m{}\x1b[0m", line); + let _ = io::stdout().flush(); + + // Update state + *current_line = Some(line.to_string()); + *line_printed = true; + } + fn print_tool_output_line(&self, line: &str) { println!("│ \x1b[2m{}\x1b[0m", line); } @@ -121,6 +144,8 @@ impl UiWriter for ConsoleUiWriter { // Clear the stored tool info *self.current_tool_name.lock().unwrap() = None; self.current_tool_args.lock().unwrap().clear(); + *self.current_output_line.lock().unwrap() = None; + *self.output_line_printed.lock().unwrap() = false; } fn print_agent_prompt(&self) { @@ -252,6 +277,11 @@ impl UiWriter for RetroTuiWriter { .push("Output:".to_string()); } + fn update_tool_output_line(&self, line: &str) { + // For retro mode, we'll just add to the output buffer + self.current_tool_output.lock().unwrap().push(line.to_string()); + } + fn print_tool_output_line(&self, line: &str) { self.current_tool_output .lock() diff --git a/crates/g3-core/src/lib.rs b/crates/g3-core/src/lib.rs index b8d79a3..d155880 100644 --- a/crates/g3-core/src/lib.rs +++ b/crates/g3-core/src/lib.rs @@ -1442,6 +1442,7 @@ The tool will execute immediately and you'll receive the result (success or erro }; if !new_content.trim().is_empty() { + #[allow(unused_assignments)] if !response_started { self.ui_writer.print_agent_prompt(); response_started = true; @@ -1968,7 +1969,23 @@ The tool will execute immediately and you'll receive the result (success or erro let escaped_command = shell_escape_command(command_str); let executor = CodeExecutor::new(); - match executor.execute_code("bash", &escaped_command).await { + + // Create a receiver for streaming output + struct ToolOutputReceiver<'a, W: UiWriter> { + ui_writer: &'a W, + } + + impl<'a, W: UiWriter> g3_execution::OutputReceiver for ToolOutputReceiver<'a, W> { + fn on_output_line(&self, line: &str) { + self.ui_writer.update_tool_output_line(line); + } + } + + let receiver = ToolOutputReceiver { + ui_writer: &self.ui_writer, + }; + + match executor.execute_bash_streaming(&escaped_command, &receiver).await { Ok(result) => { if result.success { Ok(if result.stdout.is_empty() { diff --git a/crates/g3-core/src/task_result_comprehensive_tests.rs b/crates/g3-core/src/task_result_comprehensive_tests.rs index f15b1f3..3164d68 100644 --- a/crates/g3-core/src/task_result_comprehensive_tests.rs +++ b/crates/g3-core/src/task_result_comprehensive_tests.rs @@ -245,32 +245,3 @@ fn test_concurrent_access() { } } -fn main() { - println!("Running TaskResult comprehensive tests..."); - - test_task_result_basic_functionality(); - println!("✅ Basic functionality test passed"); - - test_extract_last_block_various_formats(); - println!("✅ Extract last block test passed"); - - test_is_approved_detection(); - println!("✅ Approval detection test passed"); - - test_context_window_preservation(); - println!("✅ Context window preservation test passed"); - - test_coach_feedback_extraction_scenarios(); - println!("✅ Coach feedback extraction test passed"); - - test_edge_cases_and_special_characters(); - println!("✅ Edge cases test passed"); - - test_large_response_handling(); - println!("✅ Large response handling test passed"); - - test_concurrent_access(); - println!("✅ Concurrent access test passed"); - - println!("\n🎉 All TaskResult tests passed successfully!"); -} diff --git a/crates/g3-core/src/ui_writer.rs b/crates/g3-core/src/ui_writer.rs index 6e7dc03..1b532e7 100644 --- a/crates/g3-core/src/ui_writer.rs +++ b/crates/g3-core/src/ui_writer.rs @@ -26,6 +26,9 @@ pub trait UiWriter: Send + Sync { /// Print tool output header fn print_tool_output_header(&self); + /// Update the current tool output line (replaces previous line) + fn update_tool_output_line(&self, line: &str); + /// Print a tool output line fn print_tool_output_line(&self, line: &str); @@ -60,6 +63,7 @@ impl UiWriter for NullUiWriter { fn print_tool_header(&self, _tool_name: &str) {} fn print_tool_arg(&self, _key: &str, _value: &str) {} fn print_tool_output_header(&self) {} + fn update_tool_output_line(&self, _line: &str) {} fn print_tool_output_line(&self, _line: &str) {} fn print_tool_output_summary(&self, _hidden_count: usize) {} fn print_tool_timing(&self, _duration_str: &str) {} diff --git a/crates/g3-execution/Cargo.toml b/crates/g3-execution/Cargo.toml index 4d9512b..a9901c0 100644 --- a/crates/g3-execution/Cargo.toml +++ b/crates/g3-execution/Cargo.toml @@ -7,6 +7,7 @@ description = "Code execution engine for G3 AI agent" [dependencies] tokio = { workspace = true } anyhow = { workspace = true } +futures = "0.3" thiserror = { workspace = true } tracing = { workspace = true } regex = "1.0" diff --git a/crates/g3-execution/src/lib.rs b/crates/g3-execution/src/lib.rs index e90c923..908c2bc 100644 --- a/crates/g3-execution/src/lib.rs +++ b/crates/g3-execution/src/lib.rs @@ -203,3 +203,82 @@ impl Default for CodeExecutor { Self::new() } } + +/// Trait for receiving streaming output from command execution +pub trait OutputReceiver: Send + Sync { + /// Called when a new line of output is available + fn on_output_line(&self, line: &str); +} + +impl CodeExecutor { + /// Execute bash command with streaming output + pub async fn execute_bash_streaming( + &self, + code: &str, + receiver: &R + ) -> Result { + use std::process::Stdio; + use tokio::io::{AsyncBufReadExt, BufReader}; + use tokio::process::Command as TokioCommand; + + let mut child = TokioCommand::new("bash") + .arg("-c") + .arg(code) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; + + let stdout = child.stdout.take().unwrap(); + let stderr = child.stderr.take().unwrap(); + + let stdout_reader = BufReader::new(stdout); + let stderr_reader = BufReader::new(stderr); + + let mut stdout_lines = stdout_reader.lines(); + let mut stderr_lines = stderr_reader.lines(); + + let mut stdout_output = Vec::new(); + let mut stderr_output = Vec::new(); + + // Read output lines as they come + loop { + tokio::select! { + line = stdout_lines.next_line() => { + match line { + Ok(Some(line)) => { + receiver.on_output_line(&line); + stdout_output.push(line); + } + Ok(None) => break, // EOF + Err(e) => { + error!("Error reading stdout: {}", e); + break; + } + } + } + line = stderr_lines.next_line() => { + match line { + Ok(Some(line)) => { + receiver.on_output_line(&format!("stderr: {}", line)); + stderr_output.push(line); + } + Ok(None) => {}, // stderr EOF, continue + Err(e) => { + error!("Error reading stderr: {}", e); + } + } + } + else => break + } + } + + let status = child.wait().await?; + + Ok(ExecutionResult { + stdout: stdout_output.join("\n"), + stderr: stderr_output.join("\n"), + exit_code: status.code().unwrap_or(-1), + success: status.success(), + }) + } +}