inline tool calling

This commit is contained in:
Dhanji Prasanna
2025-09-09 10:02:16 +10:00
parent a69054cb2b
commit 0c92a7c6b4
2 changed files with 378 additions and 85 deletions

View File

@@ -3,10 +3,150 @@ use g3_config::Config;
use g3_execution::CodeExecutor; use g3_execution::CodeExecutor;
use g3_providers::{CompletionRequest, Message, MessageRole, ProviderRegistry}; use g3_providers::{CompletionRequest, Message, MessageRole, ProviderRegistry};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::path::Path;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
use tracing::{error, field::debug, info}; use tracing::{error, info, warn};
/// One inline tool invocation emitted by the model as a bare JSON object,
/// e.g. `{"tool": "shell", "args": {"command": "ls"}}`.
/// Derives serde traits so it can be parsed straight out of streamed text.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolCall {
/// Tool name dispatched on in `execute_tool` (e.g. "shell", "final_output").
pub tool: String,
/// Tool-specific arguments as free-form JSON (e.g. `{"command": "..."}`).
pub args: serde_json::Value,
}
/// Coarse phases of the streaming loop while scanning model output for
/// inline tool calls.
/// NOTE(review): no variant is constructed or matched anywhere in this
/// excerpt — confirm external users exist before relying on (or removing) it.
#[derive(Debug, Clone)]
pub enum StreamState {
/// Tokens are streaming; no tool call detected yet.
Generating,
/// A complete tool call was parsed out of the stream.
ToolDetected(ToolCall),
/// The detected tool is currently being executed.
Executing,
/// A fresh stream is being started with the tool result appended.
Resuming,
}
/// Incremental scanner over streamed completion text that detects a complete
/// inline JSON tool call (`{"tool": ..., "args": ...}`) as soon as its braces
/// balance.
#[derive(Debug)]
pub struct StreamingToolParser {
/// All streamed content accumulated so far (never truncated).
buffer: String,
/// Brace-nesting depth carried between chunks.
/// NOTE(review): written but never read back — appears vestigial; confirm.
brace_count: i32,
/// True once a `{"tool":` marker has been seen but not yet closed/parsed.
in_tool_call: bool,
/// Byte offset in `buffer` where the candidate tool call starts.
tool_start_pos: Option<usize>,
}
impl StreamingToolParser {
    /// Create an empty parser with no buffered content.
    pub fn new() -> Self {
        Self {
            buffer: String::new(),
            brace_count: 0,
            in_tool_call: false,
            tool_start_pos: None,
        }
    }

    /// Append a streamed chunk and report a complete tool call, if one is now
    /// present. Returns the parsed call plus the byte offset (into the whole
    /// buffer) just past its closing brace.
    pub fn add_chunk(&mut self, chunk: &str) -> Option<(ToolCall, usize)> {
        self.buffer.push_str(chunk);
        self.detect_tool_call()
    }

    /// Find (or resume) a `{"tool":` candidate and try to parse a complete
    /// JSON object starting at it.
    fn detect_tool_call(&mut self) -> Option<(ToolCall, usize)> {
        if self.in_tool_call {
            // Already locked onto a candidate from a previous chunk; keep going.
            let start_pos = self
                .tool_start_pos
                .expect("in_tool_call is only set together with tool_start_pos");
            return self.parse_from_start_pos(start_pos);
        }
        // rfind: lock onto the most recent candidate; earlier occurrences were
        // already consumed or rejected. Tool calls are accepted both inside and
        // outside ``` code fences, since the model may use either format
        // despite instructions.
        let pos = self.buffer.rfind(r#"{"tool":"#)?;
        self.in_tool_call = true;
        self.tool_start_pos = Some(pos);
        self.brace_count = 0;
        self.parse_from_start_pos(pos)
    }

    /// Scan from `start_pos` for a brace-balanced JSON object and try to parse
    /// it as a `ToolCall`.
    ///
    /// Scans `self.buffer` in place — the previous implementation copied the
    /// whole buffer tail to a fresh `String` on every chunk while inside a
    /// tool call, which made accumulation quadratic in allocations. The pure
    /// scan runs first and parser state is mutated afterwards, so no clone is
    /// needed to satisfy the borrow checker.
    fn parse_from_start_pos(&mut self, start_pos: usize) -> Option<(ToolCall, usize)> {
        let mut depth: i32 = 0;
        let mut parsed: Option<(ToolCall, usize)> = None;
        let mut saw_invalid = false;
        // '{' and '}' are single-byte ASCII, so `start_pos + i + 1` is always
        // a valid char boundary for slicing.
        for (i, ch) in self.buffer[start_pos..].char_indices() {
            match ch {
                '{' => depth += 1,
                '}' => {
                    depth -= 1;
                    if depth == 0 {
                        // Braces balanced: candidate complete JSON object.
                        let end_pos = start_pos + i + 1;
                        let json_str = &self.buffer[start_pos..end_pos];
                        match serde_json::from_str::<ToolCall>(json_str) {
                            Ok(tool_call) => {
                                info!("Successfully parsed tool call: {:?}", tool_call);
                                parsed = Some((tool_call, end_pos));
                                break;
                            }
                            Err(_) => {
                                // Balanced but not a valid ToolCall — keep
                                // scanning in case a valid object closes later
                                // in the same buffer (matches prior behavior).
                                info!("Failed to parse JSON: {}", json_str);
                                saw_invalid = true;
                            }
                        }
                    }
                }
                _ => {}
            }
        }
        if parsed.is_some() || saw_invalid {
            // Either consumed a call or gave up on this candidate entirely.
            self.in_tool_call = false;
            self.tool_start_pos = None;
            self.brace_count = 0;
        } else {
            // Incomplete object: remember nesting depth for the next chunk.
            // (This field is currently never read back; kept for parity.)
            self.brace_count = depth;
        }
        parsed
    }

    /// Buffer content from the start up to and including `tool_end_pos`.
    /// NOTE: despite the name, this INCLUDES the tool-call JSON itself;
    /// callers strip it by searching backwards for the `{"tool":` marker.
    pub fn get_content_before_tool(&self, tool_end_pos: usize) -> String {
        if tool_end_pos <= self.buffer.len() {
            self.buffer[..tool_end_pos].to_string()
        } else {
            self.buffer.clone()
        }
    }

    /// Buffered content strictly after `from_pos`, or empty if out of range.
    pub fn get_remaining_content(&self, from_pos: usize) -> String {
        if from_pos < self.buffer.len() {
            self.buffer[from_pos..].to_string()
        } else {
            String::new()
        }
    }
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct ContextWindow { pub struct ContextWindow {
@@ -215,7 +355,7 @@ impl Agent {
pub async fn execute_task_with_timing_cancellable( pub async fn execute_task_with_timing_cancellable(
&mut self, &mut self,
description: &str, description: &str,
language: Option<&str>, _language: Option<&str>,
_auto_execute: bool, _auto_execute: bool,
show_prompt: bool, show_prompt: bool,
show_code: bool, show_code: bool,
@@ -224,59 +364,37 @@ impl Agent {
) -> Result<String> { ) -> Result<String> {
info!("Executing task: {}", description); info!("Executing task: {}", description);
let provider = self.providers.get(None)?; let _provider = self.providers.get(None)?;
let system_prompt = format!( let system_prompt = format!(
"You are G3, a general-purpose AI agent. Your goal is to analyze and write code to solve given problems. "You are G3, a general-purpose AI agent. Your goal is to analyze and solve problems step by step.
G3 uses LLMs with tool calling capability. # Tool Call Format
Tools allow external systems to provide context and data to G3. You solve higher level problems using
tools, and can interact with multiple at once. When you want to perform an action, use 'I' as the pronoun. When you need to execute a tool, write ONLY the JSON tool call on a new line:
{{\"tool\": \"tool_name\", \"args\": {{\"param\": \"value\"}}}}
The tool will execute immediately and you'll receive the result to continue with.
# Available Tools # Available Tools
- name: shell - **shell**: Execute shell commands
type: function - Format: {{\"tool\": \"shell\", \"args\": {{\"command\": \"your_command_here\"}}}}
usage: shell [command] - Example: {{\"tool\": \"shell\", \"args\": {{\"command\": \"ls ~/Downloads\"}}}}
description: \"
Execute a command in the shell.
This will return the output and error concatenated into a single string, as - **final_output**: Signal task completion
you would see from running on the command line. There will also be an indication - Format: {{\"tool\": \"final_output\", \"args\": {{\"summary\": \"what_was_accomplished\"}}}}
of if the command succeeded or failed.
Avoid commands that produce a large amount of output, and consider piping those outputs to files. # Instructions
**Important**: Each shell command runs in its own process. Things like directory changes or 1. Break down tasks into small steps
sourcing files do not persist between tool calls. So you may need to repeat them each time by 2. Execute ONE tool at a time
stringing together commands, e.g. `cd example && ls` or `source env/bin/activate && pip install numpy` 3. Wait for the result before proceeding
4. Use the actual file paths on the system (like ~/Downloads for Downloads folder)
5. End with final_output when done
Multiple commands: Use ; or && to chain commands, avoid newlines Let's start with the first step of your task.
Pathnames: Use absolute paths and avoid cd unless explicitly requested
\"
- name: final_output
type: function
usage: final_output [summary]
description: \"
This tool signals the final output for a user in a conversation and MUST be used for the final message to the user. You must
pass in a detailed summary of the work done so far.\"
# Response Guidelines
- Use Markdown formatting for all responses.
- Follow best practices for Markdown, including:
- Using headers for organization.
- Bullet points for lists.
- Links formatted correctly, either as linked text (e.g., [this is linked text](https://example.com)) or automatic links using angle brackets (e.g., <http://example.com/>).
- For code, use fenced code blocks by placing triple backticks (` ``` `) before and after the code. Include the language identifier after the opening backticks (e.g., ` ```python `) to enable syntax highlighting.
- Ensure clarity, conciseness, and proper formatting to enhance readability and usability.
IMPORTANT INSTRUCTIONS:
Break down your task into smaller steps and do one step and tool call at a time.
Do not try to use multiple tools at once.
**After you get the tool result back, consider the result and then proceed to do
the next step and tool call if required.**
"); ");
if show_prompt { if show_prompt {
@@ -337,27 +455,17 @@ the next step and tool call if required.**
}; };
self.context_window.add_message(assistant_message); self.context_window.add_message(assistant_message);
// Time the code execution with cancellation support // With streaming tool execution, we don't need separate code execution
let exec_start = Instant::now(); // The tools are already executed during streaming
let executor = CodeExecutor::new();
let result = tokio::select! {
result = executor.execute_from_response_with_options(&response_content, show_code) => result?,
_ = cancellation_token.cancelled() => {
return Err(anyhow::anyhow!("Operation cancelled by user"));
}
};
let exec_duration = exec_start.elapsed();
if show_timing { if show_timing {
let timing_summary = format!( let timing_summary = format!(
"\n⏱️ {} | 💭 {} | ⚡️ {}", "\n⏱️ {} | 💭 {}",
Self::format_duration(llm_duration), Self::format_duration(llm_duration),
Self::format_duration(think_time), Self::format_duration(think_time)
Self::format_duration(exec_duration)
); );
Ok(format!("{}\n{}", result, timing_summary)) Ok(format!("{}\n{}", response_content, timing_summary))
} else { } else {
Ok(result) Ok(response_content)
} }
} }
@@ -366,45 +474,230 @@ the next step and tool call if required.**
} }
async fn stream_completion(&self, request: CompletionRequest) -> Result<(String, Duration)> { async fn stream_completion(&self, request: CompletionRequest) -> Result<(String, Duration)> {
self.stream_completion_with_tools(request).await
}
async fn stream_completion_with_tools(
&self,
mut request: CompletionRequest,
) -> Result<(String, Duration)> {
use std::io::{self, Write};
use tokio_stream::StreamExt; use tokio_stream::StreamExt;
let provider = self.providers.get(None)?; let mut full_response = String::new();
let mut stream = provider.stream(request).await?;
let mut full_content = String::new();
let mut first_token_time: Option<Duration> = None; let mut first_token_time: Option<Duration> = None;
let stream_start = Instant::now(); let stream_start = Instant::now();
let mut total_execution_time = Duration::new(0, 0);
let mut iteration_count = 0;
const MAX_ITERATIONS: usize = 10; // Prevent infinite loops
print!("🤖 "); // Start the response indicator print!("🤖 "); // Start the response indicator
use std::io::{self, Write};
io::stdout().flush()?; io::stdout().flush()?;
while let Some(chunk_result) = stream.next().await { loop {
match chunk_result { iteration_count += 1;
Ok(chunk) => { if iteration_count > MAX_ITERATIONS {
// Record time to first token warn!("Maximum iterations reached, stopping stream");
if first_token_time.is_none() && !chunk.content.is_empty() { break;
first_token_time = Some(stream_start.elapsed()); }
}
print!("{}", chunk.content); // Add a small delay between iterations to prevent "model busy" errors
io::stdout().flush()?; if iteration_count > 1 {
full_content.push_str(&chunk.content); tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
if chunk.finished { let provider = self.providers.get(None)?;
break; let mut stream = match provider.stream(request.clone()).await {
Ok(s) => s,
Err(e) => {
if iteration_count > 1 && e.to_string().contains("busy") {
warn!("Model busy on iteration {}, retrying in 500ms", iteration_count);
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
match provider.stream(request.clone()).await {
Ok(s) => s,
Err(e2) => {
error!("Failed to start stream after retry: {}", e2);
return Err(e2);
}
}
} else {
return Err(e);
} }
} }
Err(e) => { };
error!("Streaming error: {}", e); let mut parser = StreamingToolParser::new();
return Err(e); let mut current_response = String::new();
let mut tool_executed = false;
while let Some(chunk_result) = stream.next().await {
match chunk_result {
Ok(chunk) => {
// Record time to first token
if first_token_time.is_none() && !chunk.content.is_empty() {
first_token_time = Some(stream_start.elapsed());
}
// Check for tool calls in the streaming content
if let Some((tool_call, tool_end_pos)) = parser.add_chunk(&chunk.content) {
info!(
"🔧 Detected tool call: {:?} at position {}",
tool_call, tool_end_pos
);
// Found a complete tool call! Stop streaming and execute it
let content_before_tool = parser.get_content_before_tool(tool_end_pos);
// Display content up to the tool call (excluding the JSON)
let display_content = if let Some(json_start) =
content_before_tool.rfind(r#"{"tool":"#)
{
&content_before_tool[..json_start]
} else {
&content_before_tool
};
// Safely get the new content to display
let new_content = if current_response.len() <= display_content.len() {
// Use char indices to avoid UTF-8 boundary issues
let chars_already_shown = current_response.chars().count();
display_content.chars().skip(chars_already_shown).collect::<String>()
} else {
String::new()
};
print!("{}", new_content);
io::stdout().flush()?;
// Execute the tool
println!(); // New line before tool execution
let exec_start = Instant::now();
let tool_result = self.execute_tool(&tool_call).await?;
let exec_duration = exec_start.elapsed();
total_execution_time += exec_duration;
// Display tool execution result
println!("🔧 {}: {}", tool_call.tool, tool_result);
print!("🤖 "); // Continue response indicator
io::stdout().flush()?;
// Update the conversation with the tool call and result
let tool_message = Message {
role: MessageRole::Assistant,
content: format!(
"{}\n\n{{\"tool\": \"{}\", \"args\": {}}}",
display_content.trim(),
tool_call.tool,
tool_call.args
),
};
let result_message = Message {
role: MessageRole::User, // Tool results come back as user messages
content: format!("Tool result: {}", tool_result),
};
request.messages.push(tool_message);
request.messages.push(result_message);
full_response.push_str(display_content);
full_response.push_str(&format!(
"\n\nTool executed: {} -> {}\n\n",
tool_call.tool, tool_result
));
tool_executed = true;
// Break out of current stream to start a new one with updated context
break;
} else {
// No tool call detected, continue streaming normally
print!("{}", chunk.content);
io::stdout().flush()?;
current_response.push_str(&chunk.content);
}
if chunk.finished {
// Stream finished naturally without tool calls
full_response.push_str(&current_response);
println!(); // New line after streaming completes
let ttft = first_token_time.unwrap_or_else(|| stream_start.elapsed());
return Ok((full_response, ttft));
}
}
Err(e) => {
error!("Streaming error: {}", e);
// If we executed a tool, try to continue with a new stream
if tool_executed {
warn!("Stream error after tool execution, attempting to continue");
break; // Break to outer loop to start new stream
} else {
return Err(e);
}
}
} }
} }
// If we get here and no tool was executed, we're done
if !tool_executed {
full_response.push_str(&current_response);
println!(); // New line after streaming completes
let ttft = first_token_time.unwrap_or_else(|| stream_start.elapsed());
return Ok((full_response, ttft));
}
// Continue the loop to start a new stream with updated context
info!(
"Starting new stream iteration {} with {} messages",
iteration_count,
request.messages.len()
);
} }
println!(); // New line after streaming completes // If we exit the loop due to max iterations
let ttft = first_token_time.unwrap_or_else(|| stream_start.elapsed()); let ttft = first_token_time.unwrap_or_else(|| stream_start.elapsed());
Ok((full_content, ttft)) Ok((full_response, ttft))
}
async fn execute_tool(&self, tool_call: &ToolCall) -> Result<String> {
match tool_call.tool.as_str() {
"shell" => {
if let Some(command) = tool_call.args.get("command") {
if let Some(command_str) = command.as_str() {
let executor = CodeExecutor::new();
match executor.execute_code("bash", command_str).await {
Ok(result) => {
if result.success {
Ok(if result.stdout.is_empty() {
"✅ Command executed successfully".to_string()
} else {
result.stdout.trim().to_string()
})
} else {
Ok(format!("❌ Command failed: {}", result.stderr.trim()))
}
}
Err(e) => Ok(format!("❌ Execution error: {}", e)),
}
} else {
Ok("❌ Invalid command argument".to_string())
}
} else {
Ok("❌ Missing command argument".to_string())
}
}
"final_output" => {
if let Some(summary) = tool_call.args.get("summary") {
if let Some(summary_str) = summary.as_str() {
Ok(format!("📋 Final Output: {}", summary_str))
} else {
Ok("📋 Task completed".to_string())
}
} else {
Ok("📋 Task completed".to_string())
}
}
_ => {
warn!("Unknown tool: {}", tool_call.tool);
Ok(format!("❓ Unknown tool: {}", tool_call.tool))
}
}
} }
fn format_duration(duration: Duration) -> String { fn format_duration(duration: Duration) -> String {

View File

@@ -133,7 +133,7 @@ impl CodeExecutor {
} }
/// Execute code in the specified language /// Execute code in the specified language
async fn execute_code(&self, language: &str, code: &str) -> Result<ExecutionResult> { pub async fn execute_code(&self, language: &str, code: &str) -> Result<ExecutionResult> {
match language.to_lowercase().as_str() { match language.to_lowercase().as_str() {
"python" | "py" => self.execute_python(code).await, "python" | "py" => self.execute_python(code).await,
"bash" | "shell" | "sh" => self.execute_bash(code).await, "bash" | "shell" | "sh" => self.execute_bash(code).await,