From 0c92a7c6b4b755a56caa0267b693462ca30ae4d8 Mon Sep 17 00:00:00 2001
From: Dhanji Prasanna
Date: Tue, 9 Sep 2025 10:02:16 +1000
Subject: [PATCH] inline tool calling

---
 crates/g3-core/src/lib.rs      | 461 +++++++++++++++++++++++++++------
 crates/g3-execution/src/lib.rs |   2 +-
 2 files changed, 378 insertions(+), 85 deletions(-)

diff --git a/crates/g3-core/src/lib.rs b/crates/g3-core/src/lib.rs
index 1a77556..98e0481 100644
--- a/crates/g3-core/src/lib.rs
+++ b/crates/g3-core/src/lib.rs
@@ -3,10 +3,150 @@
 use g3_config::Config;
 use g3_execution::CodeExecutor;
 use g3_providers::{CompletionRequest, Message, MessageRole, ProviderRegistry};
 use serde::{Deserialize, Serialize};
-use std::path::Path;
 use std::time::{Duration, Instant};
 use tokio_util::sync::CancellationToken;
-use tracing::{error, field::debug, info};
+use tracing::{error, info, warn};
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ToolCall {
+    pub tool: String,
+    pub args: serde_json::Value,
+}
+
+#[derive(Debug, Clone)]
+pub enum StreamState {
+    Generating,
+    ToolDetected(ToolCall),
+    Executing,
+    Resuming,
+}
+
+#[derive(Debug)]
+pub struct StreamingToolParser {
+    buffer: String,
+    brace_count: i32,
+    in_tool_call: bool,
+    tool_start_pos: Option<usize>,
+}
+
+impl StreamingToolParser {
+    pub fn new() -> Self {
+        Self {
+            buffer: String::new(),
+            brace_count: 0,
+            in_tool_call: false,
+            tool_start_pos: None,
+        }
+    }
+
+    pub fn add_chunk(&mut self, chunk: &str) -> Option<(ToolCall, usize)> {
+        self.buffer.push_str(chunk);
+        //info!("Parser buffer now: {:?}", self.buffer);
+        self.detect_tool_call()
+    }
+
+    fn detect_tool_call(&mut self) -> Option<(ToolCall, usize)> {
+        //info!("Detecting tool call in buffer: {:?}", self.buffer);
+
+        // Look for the start of a tool call pattern: {"tool":
+        if !self.in_tool_call {
+            // Look for JSON tool call pattern - check both raw JSON and inside code blocks
+            if let Some(pos) = self.buffer.rfind(r#"{"tool":"#) {
+                //info!("Found tool call pattern at position: {}", 
pos);
+
+                // Check if this is inside a code block
+                let before_pos = &self.buffer[..pos];
+                let code_block_count = before_pos.matches("```").count();
+
+                //info!("Code block count before position {}: {}", pos, code_block_count);
+
+                // Accept tool calls both inside and outside code blocks
+                // The LLM might use either format despite our instructions
+                //info!("Starting tool call parsing (code block status: {})", code_block_count % 2 == 1);
+                self.in_tool_call = true;
+                self.tool_start_pos = Some(pos);
+                self.brace_count = 0; // Start counting from 0, we'll count the opening brace in parsing
+
+                // Continue parsing from after the opening brace
+                return self.parse_from_start_pos(pos);
+            }
+        } else {
+            //info!("Already in tool call, continuing parsing");
+            // We're already in a tool call, continue parsing
+            let start_pos = self.tool_start_pos.unwrap();
+            return self.parse_from_start_pos(start_pos);
+        }
+
+        None
+    }
+
+    fn parse_from_start_pos(&mut self, start_pos: usize) -> Option<(ToolCall, usize)> {
+        let remaining = self.buffer[start_pos..].to_string();
+        self.parse_from_position(&remaining, start_pos)
+    }
+
+    fn parse_from_position(&mut self, text: &str, start_pos: usize) -> Option<(ToolCall, usize)> {
+        let mut current_brace_count = 0; // Always start fresh for each parsing attempt
+
+        //info!("Parsing from position {} with text: {:?}", start_pos, text);
+        //info!("Starting brace count: {}", current_brace_count);
+
+        for (i, ch) in text.char_indices() {
+            match ch {
+                '{' => current_brace_count += 1,
+                '}' => {
+                    current_brace_count -= 1;
+                    //info!("Found '}}' at position {}, brace count now: {}", i, current_brace_count);
+                    if current_brace_count == 0 {
+                        // Found complete JSON object
+                        let end_pos = start_pos + i + 1;
+                        let json_str = &self.buffer[start_pos..end_pos];
+
+                        //info!("Complete JSON found: {}", json_str);
+
+                        if let Ok(tool_call) = serde_json::from_str::<ToolCall>(json_str) {
+                            info!("Successfully parsed tool call: {:?}", tool_call);
+                            // Reset parser state
+                            
self.in_tool_call = false;
+                            self.tool_start_pos = None;
+                            self.brace_count = 0;
+
+                            return Some((tool_call, end_pos));
+                        } else {
+                            info!("Failed to parse JSON: {}", json_str);
+                            // Invalid JSON, reset and continue looking
+                            self.in_tool_call = false;
+                            self.tool_start_pos = None;
+                            self.brace_count = 0;
+                        }
+                    }
+                }
+                _ => {}
+            }
+        }
+
+        // Update brace count for next iteration
+        self.brace_count = current_brace_count;
+        //info!("End of parsing, final brace count: {}", current_brace_count);
+        None
+    }
+
+    pub fn get_content_before_tool(&self, tool_end_pos: usize) -> String {
+        if tool_end_pos <= self.buffer.len() {
+            self.buffer[..tool_end_pos].to_string()
+        } else {
+            self.buffer.clone()
+        }
+    }
+
+    pub fn get_remaining_content(&self, from_pos: usize) -> String {
+        if from_pos < self.buffer.len() {
+            self.buffer[from_pos..].to_string()
+        } else {
+            String::new()
+        }
+    }
+}
 
 #[derive(Debug, Clone)]
 pub struct ContextWindow {
@@ -215,7 +355,7 @@ impl Agent {
     pub async fn execute_task_with_timing_cancellable(
         &mut self,
         description: &str,
-        language: Option<&str>,
+        _language: Option<&str>,
         _auto_execute: bool,
         show_prompt: bool,
         show_code: bool,
@@ -224,59 +364,37 @@
     ) -> Result<String> {
         info!("Executing task: {}", description);
 
-        let provider = self.providers.get(None)?;
+        let _provider = self.providers.get(None)?;
 
         let system_prompt = format!(
-            "You are G3, a general-purpose AI agent. Your goal is to analyze and write code to solve given problems.
+            "You are G3, a general-purpose AI agent. Your goal is to analyze and solve problems step by step.
 
-G3 uses LLMs with tool calling capability.
-Tools allow external systems to provide context and data to G3. You solve higher level problems using
-tools, and can interact with multiple at once. When you want to perform an action, use 'I' as the pronoun.
+# Tool Call Format + +When you need to execute a tool, write ONLY the JSON tool call on a new line: + +{{\"tool\": \"tool_name\", \"args\": {{\"param\": \"value\"}}}} + +The tool will execute immediately and you'll receive the result to continue with. # Available Tools -- name: shell - type: function - usage: shell [command] - description: \" - Execute a command in the shell. +- **shell**: Execute shell commands + - Format: {{\"tool\": \"shell\", \"args\": {{\"command\": \"your_command_here\"}}}} + - Example: {{\"tool\": \"shell\", \"args\": {{\"command\": \"ls ~/Downloads\"}}}} - This will return the output and error concatenated into a single string, as - you would see from running on the command line. There will also be an indication - of if the command succeeded or failed. +- **final_output**: Signal task completion + - Format: {{\"tool\": \"final_output\", \"args\": {{\"summary\": \"what_was_accomplished\"}}}} - Avoid commands that produce a large amount of output, and consider piping those outputs to files. +# Instructions - **Important**: Each shell command runs in its own process. Things like directory changes or - sourcing files do not persist between tool calls. So you may need to repeat them each time by - stringing together commands, e.g. `cd example && ls` or `source env/bin/activate && pip install numpy` +1. Break down tasks into small steps +2. Execute ONE tool at a time +3. Wait for the result before proceeding +4. Use the actual file paths on the system (like ~/Downloads for Downloads folder) +5. End with final_output when done - Multiple commands: Use ; or && to chain commands, avoid newlines - Pathnames: Use absolute paths and avoid cd unless explicitly requested - \" - -- name: final_output - type: function - usage: final_output [summary] - description: \" - This tool signals the final output for a user in a conversation and MUST be used for the final message to the user. 
You must - pass in a detailed summary of the work done so far.\" - -# Response Guidelines -- Use Markdown formatting for all responses. -- Follow best practices for Markdown, including: - - Using headers for organization. - - Bullet points for lists. - - Links formatted correctly, either as linked text (e.g., [this is linked text](https://example.com)) or automatic links using angle brackets (e.g., ). -- For code, use fenced code blocks by placing triple backticks (` ``` `) before and after the code. Include the language identifier after the opening backticks (e.g., ` ```python `) to enable syntax highlighting. -- Ensure clarity, conciseness, and proper formatting to enhance readability and usability. - -IMPORTANT INSTRUCTIONS: - -Break down your task into smaller steps and do one step and tool call at a time. -Do not try to use multiple tools at once. -**After you get the tool result back, consider the result and then proceed to do -the next step and tool call if required.** +Let's start with the first step of your task. "); if show_prompt { @@ -337,27 +455,17 @@ the next step and tool call if required.** }; self.context_window.add_message(assistant_message); - // Time the code execution with cancellation support - let exec_start = Instant::now(); - let executor = CodeExecutor::new(); - let result = tokio::select! 
{
-            result = executor.execute_from_response_with_options(&response_content, show_code) => result?,
-            _ = cancellation_token.cancelled() => {
-                return Err(anyhow::anyhow!("Operation cancelled by user"));
-            }
-        };
-        let exec_duration = exec_start.elapsed();
-
+        // With streaming tool execution, we don't need separate code execution
+        // The tools are already executed during streaming
         if show_timing {
             let timing_summary = format!(
-                "\nā±ļø {} | šŸ’­ {} | āš”ļø {}",
+                "\nā±ļø {} | šŸ’­ {}",
                 Self::format_duration(llm_duration),
-                Self::format_duration(think_time),
-                Self::format_duration(exec_duration)
+                Self::format_duration(think_time)
             );
-            Ok(format!("{}\n{}", result, timing_summary))
+            Ok(format!("{}\n{}", response_content, timing_summary))
         } else {
-            Ok(result)
+            Ok(response_content)
         }
     }
 
@@ -366,45 +474,230 @@ the next step and tool call if required.**
     }
 
     async fn stream_completion(&self, request: CompletionRequest) -> Result<(String, Duration)> {
+        self.stream_completion_with_tools(request).await
+    }
+
+    async fn stream_completion_with_tools(
+        &self,
+        mut request: CompletionRequest,
+    ) -> Result<(String, Duration)> {
+        use std::io::{self, Write};
         use tokio_stream::StreamExt;
 
-        let provider = self.providers.get(None)?;
-        let mut stream = provider.stream(request).await?;
-
-        let mut full_content = String::new();
+        let mut full_response = String::new();
         let mut first_token_time: Option<Instant> = None;
         let stream_start = Instant::now();
+        let mut total_execution_time = Duration::new(0, 0);
+        let mut iteration_count = 0;
+        const MAX_ITERATIONS: usize = 10; // Prevent infinite loops
 
         print!("šŸ¤– "); // Start the response indicator
-        use std::io::{self, Write};
         io::stdout().flush()?;
 
-        while let Some(chunk_result) = stream.next().await {
-            match chunk_result {
-                Ok(chunk) => {
-                    // Record time to first token
-                    if first_token_time.is_none() && !chunk.content.is_empty() {
-                        first_token_time = Some(stream_start.elapsed());
-                    }
+        loop {
+            iteration_count += 1;
+            if 
iteration_count > MAX_ITERATIONS { + warn!("Maximum iterations reached, stopping stream"); + break; + } - print!("{}", chunk.content); - io::stdout().flush()?; - full_content.push_str(&chunk.content); + // Add a small delay between iterations to prevent "model busy" errors + if iteration_count > 1 { + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } - if chunk.finished { - break; + let provider = self.providers.get(None)?; + let mut stream = match provider.stream(request.clone()).await { + Ok(s) => s, + Err(e) => { + if iteration_count > 1 && e.to_string().contains("busy") { + warn!("Model busy on iteration {}, retrying in 500ms", iteration_count); + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + match provider.stream(request.clone()).await { + Ok(s) => s, + Err(e2) => { + error!("Failed to start stream after retry: {}", e2); + return Err(e2); + } + } + } else { + return Err(e); } } - Err(e) => { - error!("Streaming error: {}", e); - return Err(e); + }; + let mut parser = StreamingToolParser::new(); + let mut current_response = String::new(); + let mut tool_executed = false; + + while let Some(chunk_result) = stream.next().await { + match chunk_result { + Ok(chunk) => { + // Record time to first token + if first_token_time.is_none() && !chunk.content.is_empty() { + first_token_time = Some(stream_start.elapsed()); + } + + // Check for tool calls in the streaming content + if let Some((tool_call, tool_end_pos)) = parser.add_chunk(&chunk.content) { + info!( + "šŸ”§ Detected tool call: {:?} at position {}", + tool_call, tool_end_pos + ); + // Found a complete tool call! 
Stop streaming and execute it
+                            let content_before_tool = parser.get_content_before_tool(tool_end_pos);
+
+                            // Display content up to the tool call (excluding the JSON)
+                            let display_content = if let Some(json_start) =
+                                content_before_tool.rfind(r#"{"tool":"#)
+                            {
+                                &content_before_tool[..json_start]
+                            } else {
+                                &content_before_tool
+                            };
+
+                            // Safely get the new content to display
+                            let new_content = if current_response.len() <= display_content.len() {
+                                // Use char indices to avoid UTF-8 boundary issues
+                                let chars_already_shown = current_response.chars().count();
+                                display_content.chars().skip(chars_already_shown).collect::<String>()
+                            } else {
+                                String::new()
+                            };
+                            print!("{}", new_content);
+                            io::stdout().flush()?;
+
+                            // Execute the tool
+                            println!(); // New line before tool execution
+                            let exec_start = Instant::now();
+                            let tool_result = self.execute_tool(&tool_call).await?;
+                            let exec_duration = exec_start.elapsed();
+                            total_execution_time += exec_duration;
+
+                            // Display tool execution result
+                            println!("šŸ”§ {}: {}", tool_call.tool, tool_result);
+                            print!("šŸ¤– "); // Continue response indicator
+                            io::stdout().flush()?;
+
+                            // Update the conversation with the tool call and result
+                            let tool_message = Message {
+                                role: MessageRole::Assistant,
+                                content: format!(
+                                    "{}\n\n{{\"tool\": \"{}\", \"args\": {}}}",
+                                    display_content.trim(),
+                                    tool_call.tool,
+                                    tool_call.args
+                                ),
+                            };
+                            let result_message = Message {
+                                role: MessageRole::User, // Tool results come back as user messages
+                                content: format!("Tool result: {}", tool_result),
+                            };
+
+                            request.messages.push(tool_message);
+                            request.messages.push(result_message);
+
+                            full_response.push_str(display_content);
+                            full_response.push_str(&format!(
+                                "\n\nTool executed: {} -> {}\n\n",
+                                tool_call.tool, tool_result
+                            ));
+
+                            tool_executed = true;
+                            // Break out of current stream to start a new one with updated context
+                            break;
+                        } else {
+                            // No tool call detected, continue streaming normally
+                            print!("{}", 
chunk.content);
+                            io::stdout().flush()?;
+                            current_response.push_str(&chunk.content);
+                        }
+
+                        if chunk.finished {
+                            // Stream finished naturally without tool calls
+                            full_response.push_str(&current_response);
+                            println!(); // New line after streaming completes
+                            let ttft = first_token_time.unwrap_or_else(|| stream_start.elapsed());
+                            return Ok((full_response, ttft));
+                        }
+                    }
+                    Err(e) => {
+                        error!("Streaming error: {}", e);
+
+                        // If we executed a tool, try to continue with a new stream
+                        if tool_executed {
+                            warn!("Stream error after tool execution, attempting to continue");
+                            break; // Break to outer loop to start new stream
+                        } else {
+                            return Err(e);
+                        }
+                    }
                 }
             }
+
+            // If we get here and no tool was executed, we're done
+            if !tool_executed {
+                full_response.push_str(&current_response);
+                println!(); // New line after streaming completes
+                let ttft = first_token_time.unwrap_or_else(|| stream_start.elapsed());
+                return Ok((full_response, ttft));
+            }
+
+            // Continue the loop to start a new stream with updated context
+            info!(
+                "Starting new stream iteration {} with {} messages",
+                iteration_count,
+                request.messages.len()
+            );
         }
 
-        println!(); // New line after streaming completes
+        // If we exit the loop due to max iterations
         let ttft = first_token_time.unwrap_or_else(|| stream_start.elapsed());
-        Ok((full_content, ttft))
+        Ok((full_response, ttft))
+    }
+
+    async fn execute_tool(&self, tool_call: &ToolCall) -> Result<String> {
+        match tool_call.tool.as_str() {
+            "shell" => {
+                if let Some(command) = tool_call.args.get("command") {
+                    if let Some(command_str) = command.as_str() {
+                        let executor = CodeExecutor::new();
+                        match executor.execute_code("bash", command_str).await {
+                            Ok(result) => {
+                                if result.success {
+                                    Ok(if result.stdout.is_empty() {
+                                        "āœ… Command executed successfully".to_string()
+                                    } else {
+                                        result.stdout.trim().to_string()
+                                    })
+                                } else {
+                                    Ok(format!("āŒ Command failed: {}", result.stderr.trim()))
+                                }
+                            }
+                            Err(e) => Ok(format!("āŒ Execution error: {}", e)),
+                        }
+                    } else {
+                        Ok("āŒ Invalid command argument".to_string())
+                    }
+                } else {
+                    Ok("āŒ Missing command argument".to_string())
+                }
+            }
+            "final_output" => {
+                if let Some(summary) = tool_call.args.get("summary") {
+                    if let Some(summary_str) = summary.as_str() {
+                        Ok(format!("šŸ“‹ Final Output: {}", summary_str))
+                    } else {
+                        Ok("šŸ“‹ Task completed".to_string())
+                    }
+                } else {
+                    Ok("šŸ“‹ Task completed".to_string())
+                }
+            }
+            _ => {
+                warn!("Unknown tool: {}", tool_call.tool);
+                Ok(format!("ā“ Unknown tool: {}", tool_call.tool))
+            }
+        }
     }
 
     fn format_duration(duration: Duration) -> String {
diff --git a/crates/g3-execution/src/lib.rs b/crates/g3-execution/src/lib.rs
index ccb6f25..e90c923 100644
--- a/crates/g3-execution/src/lib.rs
+++ b/crates/g3-execution/src/lib.rs
@@ -133,7 +133,7 @@ impl CodeExecutor {
     }
 
     /// Execute code in the specified language
-    async fn execute_code(&self, language: &str, code: &str) -> Result<ExecutionResult> {
+    pub async fn execute_code(&self, language: &str, code: &str) -> Result<ExecutionResult> {
         match language.to_lowercase().as_str() {
             "python" | "py" => self.execute_python(code).await,
             "bash" | "shell" | "sh" => self.execute_bash(code).await,