streaming tool call attempt 1

This commit is contained in:
Dhanji Prasanna
2025-10-13 20:25:12 +11:00
parent b43b693b60
commit 627fdcd9bf
8 changed files with 134 additions and 31 deletions

1
Cargo.lock generated
View File

@@ -1119,6 +1119,7 @@ name = "g3-execution"
version = "0.1.0"
dependencies = [
"anyhow",
"futures",
"regex",
"tempfile",
"thiserror 1.0.69",

View File

@@ -1,5 +1,5 @@
use crossterm::style::Color;
use crossterm::style::{Stylize, SetForegroundColor, ResetColor};
use crossterm::style::{SetForegroundColor, ResetColor};
use termimad::MadSkin;
/// Simple output handler with markdown support

View File

@@ -8,6 +8,8 @@ use std::time::Instant;
pub struct ConsoleUiWriter {
current_tool_name: Mutex<Option<String>>,
current_tool_args: Mutex<Vec<(String, String)>>,
current_output_line: Mutex<Option<String>>,
output_line_printed: Mutex<bool>,
}
impl ConsoleUiWriter {
@@ -15,6 +17,8 @@ impl ConsoleUiWriter {
Self {
current_tool_name: Mutex::new(None),
current_tool_args: Mutex::new(Vec::new()),
current_output_line: Mutex::new(None),
output_line_printed: Mutex::new(false),
}
}
}
@@ -103,6 +107,25 @@ impl UiWriter for ConsoleUiWriter {
}
}
fn update_tool_output_line(&self, line: &str) {
let mut current_line = self.current_output_line.lock().unwrap();
let mut line_printed = self.output_line_printed.lock().unwrap();
// If we've already printed a line, clear it first
if *line_printed {
// Move cursor up one line and clear it
print!("\x1b[1A\x1b[2K");
}
// Print the new line
println!("\x1b[2m{}\x1b[0m", line);
let _ = io::stdout().flush();
// Update state
*current_line = Some(line.to_string());
*line_printed = true;
}
fn print_tool_output_line(&self, line: &str) {
println!("\x1b[2m{}\x1b[0m", line);
}
@@ -121,6 +144,8 @@ impl UiWriter for ConsoleUiWriter {
// Clear the stored tool info
*self.current_tool_name.lock().unwrap() = None;
self.current_tool_args.lock().unwrap().clear();
*self.current_output_line.lock().unwrap() = None;
*self.output_line_printed.lock().unwrap() = false;
}
fn print_agent_prompt(&self) {
@@ -252,6 +277,11 @@ impl UiWriter for RetroTuiWriter {
.push("Output:".to_string());
}
fn update_tool_output_line(&self, line: &str) {
// For retro mode, we'll just add to the output buffer
self.current_tool_output.lock().unwrap().push(line.to_string());
}
fn print_tool_output_line(&self, line: &str) {
self.current_tool_output
.lock()

View File

@@ -1442,6 +1442,7 @@ The tool will execute immediately and you'll receive the result (success or erro
};
if !new_content.trim().is_empty() {
#[allow(unused_assignments)]
if !response_started {
self.ui_writer.print_agent_prompt();
response_started = true;
@@ -1968,7 +1969,23 @@ The tool will execute immediately and you'll receive the result (success or erro
let escaped_command = shell_escape_command(command_str);
let executor = CodeExecutor::new();
match executor.execute_code("bash", &escaped_command).await {
// Create a receiver for streaming output
struct ToolOutputReceiver<'a, W: UiWriter> {
ui_writer: &'a W,
}
impl<'a, W: UiWriter> g3_execution::OutputReceiver for ToolOutputReceiver<'a, W> {
fn on_output_line(&self, line: &str) {
self.ui_writer.update_tool_output_line(line);
}
}
let receiver = ToolOutputReceiver {
ui_writer: &self.ui_writer,
};
match executor.execute_bash_streaming(&escaped_command, &receiver).await {
Ok(result) => {
if result.success {
Ok(if result.stdout.is_empty() {

View File

@@ -245,32 +245,3 @@ fn test_concurrent_access() {
}
}
fn main() {
println!("Running TaskResult comprehensive tests...");
test_task_result_basic_functionality();
println!("✅ Basic functionality test passed");
test_extract_last_block_various_formats();
println!("✅ Extract last block test passed");
test_is_approved_detection();
println!("✅ Approval detection test passed");
test_context_window_preservation();
println!("✅ Context window preservation test passed");
test_coach_feedback_extraction_scenarios();
println!("✅ Coach feedback extraction test passed");
test_edge_cases_and_special_characters();
println!("✅ Edge cases test passed");
test_large_response_handling();
println!("✅ Large response handling test passed");
test_concurrent_access();
println!("✅ Concurrent access test passed");
println!("\n🎉 All TaskResult tests passed successfully!");
}

View File

@@ -26,6 +26,9 @@ pub trait UiWriter: Send + Sync {
/// Print tool output header
fn print_tool_output_header(&self);
/// Update the current tool output line (replaces previous line)
fn update_tool_output_line(&self, line: &str);
/// Print a tool output line
fn print_tool_output_line(&self, line: &str);
@@ -60,6 +63,7 @@ impl UiWriter for NullUiWriter {
fn print_tool_header(&self, _tool_name: &str) {}
fn print_tool_arg(&self, _key: &str, _value: &str) {}
fn print_tool_output_header(&self) {}
fn update_tool_output_line(&self, _line: &str) {}
fn print_tool_output_line(&self, _line: &str) {}
fn print_tool_output_summary(&self, _hidden_count: usize) {}
fn print_tool_timing(&self, _duration_str: &str) {}

View File

@@ -7,6 +7,7 @@ description = "Code execution engine for G3 AI agent"
[dependencies]
tokio = { workspace = true }
anyhow = { workspace = true }
futures = "0.3"
thiserror = { workspace = true }
tracing = { workspace = true }
regex = "1.0"

View File

@@ -203,3 +203,82 @@ impl Default for CodeExecutor {
Self::new()
}
}
/// Trait for receiving streaming output from command execution
pub trait OutputReceiver: Send + Sync {
/// Called when a new line of output is available
fn on_output_line(&self, line: &str);
}
impl CodeExecutor {
/// Execute bash command with streaming output
pub async fn execute_bash_streaming<R: OutputReceiver>(
&self,
code: &str,
receiver: &R
) -> Result<ExecutionResult> {
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command as TokioCommand;
let mut child = TokioCommand::new("bash")
.arg("-c")
.arg(code)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()?;
let stdout = child.stdout.take().unwrap();
let stderr = child.stderr.take().unwrap();
let stdout_reader = BufReader::new(stdout);
let stderr_reader = BufReader::new(stderr);
let mut stdout_lines = stdout_reader.lines();
let mut stderr_lines = stderr_reader.lines();
let mut stdout_output = Vec::new();
let mut stderr_output = Vec::new();
// Read output lines as they come
loop {
tokio::select! {
line = stdout_lines.next_line() => {
match line {
Ok(Some(line)) => {
receiver.on_output_line(&line);
stdout_output.push(line);
}
Ok(None) => break, // EOF
Err(e) => {
error!("Error reading stdout: {}", e);
break;
}
}
}
line = stderr_lines.next_line() => {
match line {
Ok(Some(line)) => {
receiver.on_output_line(&format!("stderr: {}", line));
stderr_output.push(line);
}
Ok(None) => {}, // stderr EOF, continue
Err(e) => {
error!("Error reading stderr: {}", e);
}
}
}
else => break
}
}
let status = child.wait().await?;
Ok(ExecutionResult {
stdout: stdout_output.join("\n"),
stderr: stderr_output.join("\n"),
exit_code: status.code().unwrap_or(-1),
success: status.success(),
})
}
}