Improve tool call detection reliability and add detailed stream-error diagnostics

This commit is contained in:
Dhanji Prasanna
2025-10-02 10:27:58 +10:00
parent 9638f40cfb
commit e324ddd99d
4 changed files with 194 additions and 19 deletions

1
Cargo.lock generated
View File

@@ -1015,6 +1015,7 @@ dependencies = [
"g3-execution", "g3-execution",
"g3-providers", "g3-providers",
"rand", "rand",
"regex",
"reqwest", "reqwest",
"serde", "serde",
"serde_json", "serde_json",

View File

@@ -307,7 +307,31 @@ async fn execute_task(agent: &mut Agent, input: &str, show_prompt: bool, show_co
if e.to_string().contains("cancelled") { if e.to_string().contains("cancelled") {
output.print("⚠️ Operation cancelled by user"); output.print("⚠️ Operation cancelled by user");
} else { } else {
// Enhanced error logging with detailed information
error!("=== TASK EXECUTION ERROR ===");
error!("Error: {}", e); error!("Error: {}", e);
// Log error chain
let mut source = e.source();
let mut depth = 1;
while let Some(err) = source {
error!(" Caused by [{}]: {}", depth, err);
source = err.source();
depth += 1;
}
// Log additional context
error!("Task input: {}", input);
error!("Error type: {}", std::any::type_name_of_val(&e));
// Display user-friendly error message
output.print(&format!("❌ Error: {}", e));
// If it's a stream error, provide helpful guidance
if e.to_string().contains("No response received") {
output.print("💡 This may be a temporary issue. Please try again or check the logs for more details.");
output.print(" Log files are saved in the 'logs/' directory.");
}
} }
} }
} }

View File

@@ -22,3 +22,4 @@ tokio-util = "0.7"
futures-util = "0.3" futures-util = "0.3"
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
rand = "0.8" rand = "0.8"
regex = "1.0"

View File

@@ -7,6 +7,7 @@ use anyhow::Result;
use g3_config::Config; use g3_config::Config;
use g3_execution::CodeExecutor; use g3_execution::CodeExecutor;
use g3_providers::{CompletionRequest, Message, MessageRole, ProviderRegistry, Tool}; use g3_providers::{CompletionRequest, Message, MessageRole, ProviderRegistry, Tool};
use regex::Regex;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_json::json; use serde_json::json;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@@ -993,6 +994,13 @@ The tool will execute immediately and you'll receive the result (success or erro
if attempt > 1 { if attempt > 1 {
info!("Stream started successfully after {} attempts", attempt); info!("Stream started successfully after {} attempts", attempt);
} }
debug!("Stream started successfully");
debug!(
"Request had {} messages, tools={}, max_tokens={:?}",
request.messages.len(),
request.tools.is_some(),
request.max_tokens
);
return Ok(stream); return Ok(stream);
} }
Err(e) if attempt < MAX_ATTEMPTS => { Err(e) if attempt < MAX_ATTEMPTS => {
@@ -1194,10 +1202,20 @@ The tool will execute immediately and you'll receive the result (success or erro
.unwrap_or_else(|_| "Failed to serialize request".to_string()), .unwrap_or_else(|_| "Failed to serialize request".to_string()),
); );
// Log initial request details
debug!("Starting stream with provider={}, model={}, messages={}, tools={}, max_tokens={:?}",
provider.name(),
provider.model(),
request.messages.len(),
request.tools.is_some(),
request.max_tokens
);
// Try to get stream with retry logic // Try to get stream with retry logic
let mut stream = match self.stream_with_retry(&request, &error_context).await { let mut stream = match self.stream_with_retry(&request, &error_context).await {
Ok(s) => s, Ok(s) => s,
Err(e) => { Err(e) => {
error!("Failed to start stream: {}", e);
// Additional retry for "busy" errors on subsequent iterations // Additional retry for "busy" errors on subsequent iterations
if iteration_count > 1 && e.to_string().contains("busy") { if iteration_count > 1 && e.to_string().contains("busy") {
warn!( warn!(
@@ -1210,6 +1228,7 @@ The tool will execute immediately and you'll receive the result (success or erro
Ok(s) => s, Ok(s) => s,
Err(e2) => { Err(e2) => {
error!("Failed to start stream after retry: {}", e2); error!("Failed to start stream after retry: {}", e2);
error_context.clone().log_error(&e2);
return Err(e2); return Err(e2);
} }
} }
@@ -1223,18 +1242,45 @@ The tool will execute immediately and you'll receive the result (success or erro
let mut current_response = String::new(); let mut current_response = String::new();
let mut tool_executed = false; let mut tool_executed = false;
let mut chunks_received = 0; let mut chunks_received = 0;
let mut raw_chunks: Vec<String> = Vec::new(); // Store raw chunks for debugging
let mut _last_error: Option<String> = None;
while let Some(chunk_result) = stream.next().await { while let Some(chunk_result) = stream.next().await {
match chunk_result { match chunk_result {
Ok(chunk) => { Ok(chunk) => {
// Store raw chunk for debugging (limit to first 20 and last 5)
if chunks_received < 20 || chunk.finished {
raw_chunks.push(format!(
"Chunk #{}: content={:?}, finished={}, tool_calls={:?}",
chunks_received + 1,
chunk.content,
chunk.finished,
chunk.tool_calls
));
} else if raw_chunks.len() == 20 {
raw_chunks.push("... (chunks 21+ omitted for brevity) ...".to_string());
}
// Record time to first token // Record time to first token
if first_token_time.is_none() && !chunk.content.is_empty() { if first_token_time.is_none() && !chunk.content.is_empty() {
first_token_time = Some(stream_start.elapsed()); first_token_time = Some(stream_start.elapsed());
} }
chunks_received += 1; chunks_received += 1;
if chunks_received == 1 { if chunks_received == 1 {
debug!("First chunk received: content_len={}, finished={}", chunk.content.len(), chunk.finished); debug!(
"First chunk received: content_len={}, finished={}",
chunk.content.len(),
chunk.finished
);
}
// Log raw chunk data for debugging
if chunks_received <= 5 || chunk.finished {
debug!(
"Chunk #{}: content={:?}, finished={}, tool_calls={:?}",
chunks_received, chunk.content, chunk.finished, chunk.tool_calls
);
} }
// Process chunk with the new parser // Process chunk with the new parser
@@ -1257,7 +1303,7 @@ The tool will execute immediately and you'll receive the result (success or erro
// Filter out JSON tool calls from the display // Filter out JSON tool calls from the display
let filtered_content = filter_json_tool_calls(&clean_content); let filtered_content = filter_json_tool_calls(&clean_content);
let final_display_content = filtered_content.trim(); let final_display_content = filtered_content.trim();
// Display any new content before tool execution // Display any new content before tool execution
let new_content = let new_content =
if current_response.len() <= final_display_content.len() { if current_response.len() <= final_display_content.len() {
@@ -1448,14 +1494,98 @@ The tool will execute immediately and you'll receive the result (success or erro
} }
if chunk.finished { if chunk.finished {
debug!("Stream finished: tool_executed={}, current_response_len={}, full_response_len={}, chunks_received={}", debug!("Stream finished: tool_executed={}, current_response_len={}, full_response_len={}, chunks_received={}",
tool_executed, current_response.len(), full_response.len(), chunks_received); tool_executed, current_response.len(), full_response.len(), chunks_received);
// Stream finished - check if we should continue or return // Stream finished - check if we should continue or return
if !tool_executed { if !tool_executed {
// No tools were executed in this iteration // No tools were executed in this iteration
// Check if we got any response at all // Check if we got any response at all
if current_response.is_empty() && full_response.is_empty() { if current_response.is_empty() && full_response.is_empty() {
// Log detailed error information before failing
error!(
"=== STREAM ERROR: No content or tool calls received ==="
);
error!("Iteration: {}/{}", iteration_count, MAX_ITERATIONS);
error!(
"Provider: {} (model: {})",
provider.name(),
provider.model()
);
error!("Chunks received: {}", chunks_received);
error!("Parser state:");
error!(" - Text buffer length: {}", parser.text_buffer_len());
error!(
" - Text buffer content: {:?}",
parser.get_text_content()
);
error!(" - Native tool calls: {:?}", parser.native_tool_calls);
error!(" - Message stopped: {}", parser.is_message_stopped());
error!(" - In JSON tool call: {}", parser.in_json_tool_call);
error!(" - JSON tool start: {:?}", parser.json_tool_start);
error!("Request details:");
error!(" - Messages count: {}", request.messages.len());
error!(" - Has tools: {}", request.tools.is_some());
error!(" - Max tokens: {:?}", request.max_tokens);
error!(" - Temperature: {:?}", request.temperature);
error!(" - Stream: {}", request.stream);
// Log raw chunks received
error!("Raw chunks received ({} total):", chunks_received);
for (i, chunk_str) in raw_chunks.iter().take(25).enumerate() {
error!(" [{}] {}", i, chunk_str);
}
// Log the full request JSON
match serde_json::to_string_pretty(&request) {
Ok(json) => {
error!("Full request JSON:\n{}", json);
}
Err(e) => {
error!("Failed to serialize request: {}", e);
}
}
// Log last user message for context
if let Some(last_user_msg) = request
.messages
.iter()
.rev()
.find(|m| matches!(m.role, MessageRole::User))
{
error!(
"Last user message: {}",
if last_user_msg.content.len() > 500 {
format!(
"{}... (truncated)",
&last_user_msg.content[..500]
)
} else {
last_user_msg.content.clone()
}
);
}
// Log context window state
error!("Context window state:");
error!(
" - Used tokens: {}/{}",
self.context_window.used_tokens,
self.context_window.total_tokens
);
error!(
" - Percentage used: {:.1}%",
self.context_window.percentage_used()
);
error!(
" - Conversation history length: {}",
self.context_window.conversation_history.len()
);
// Log session info
error!("Session ID: {:?}", self.session_id);
error!("=== END STREAM ERROR ===");
// No response received - this is an error condition // No response received - this is an error condition
warn!("Stream finished without any content or tool calls"); warn!("Stream finished without any content or tool calls");
warn!("Chunks received: {}", chunks_received); warn!("Chunks received: {}", chunks_received);
@@ -1463,26 +1593,41 @@ The tool will execute immediately and you'll receive the result (success or erro
"No response received from the model. The model may be experiencing issues or the request may have been malformed." "No response received from the model. The model may be experiencing issues or the request may have been malformed."
)); ));
} }
// Add current response to full response if we have any // Add current response to full response if we have any
if !current_response.is_empty() { if !current_response.is_empty() {
full_response.push_str(&current_response); full_response.push_str(&current_response);
} }
println!(); println!();
let ttft = first_token_time.unwrap_or_else(|| stream_start.elapsed()); let ttft =
first_token_time.unwrap_or_else(|| stream_start.elapsed());
return Ok((full_response, ttft)); return Ok((full_response, ttft));
} }
break; // Tool was executed, break to continue outer loop break; // Tool was executed, break to continue outer loop
} }
} }
Err(e) => { Err(e) => {
error!("Streaming error: {}", e); // Capture detailed streaming error information
let error_details =
format!("Streaming error at chunk {}: {}", chunks_received + 1, e);
error!("{}", error_details);
error!("Error type: {}", std::any::type_name_of_val(&e));
error!("Parser state at error: text_buffer_len={}, native_tool_calls={}, message_stopped={}",
parser.text_buffer_len(), parser.native_tool_calls.len(), parser.is_message_stopped());
// Store the error for potential logging later
_last_error = Some(error_details);
if tool_executed { if tool_executed {
warn!("Stream error after tool execution, attempting to continue"); warn!("Stream error after tool execution, attempting to continue");
break; // Break to outer loop to start new stream break; // Break to outer loop to start new stream
} else { } else {
// Log raw chunks before failing
error!("Fatal streaming error. Raw chunks received before error:");
for chunk_str in raw_chunks.iter().take(10) {
error!(" {}", chunk_str);
}
return Err(e); return Err(e);
} }
} }
@@ -1492,12 +1637,15 @@ The tool will execute immediately and you'll receive the result (success or erro
// If we get here and no tool was executed, we're done // If we get here and no tool was executed, we're done
if !tool_executed { if !tool_executed {
if current_response.is_empty() && full_response.is_empty() { if current_response.is_empty() && full_response.is_empty() {
warn!("Loop exited without any response after {} iterations", iteration_count); warn!(
"Loop exited without any response after {} iterations",
iteration_count
);
} else { } else {
full_response.push_str(&current_response); full_response.push_str(&current_response);
println!(); println!();
} }
let ttft = first_token_time.unwrap_or_else(|| stream_start.elapsed()); let ttft = first_token_time.unwrap_or_else(|| stream_start.elapsed());
return Ok((full_response, ttft)); return Ok((full_response, ttft));
} }
@@ -1991,7 +2139,7 @@ impl JsonToolState {
buffer: String::new(), buffer: String::new(),
} }
} }
fn reset(&mut self) { fn reset(&mut self) {
self.suppression_mode = false; self.suppression_mode = false;
self.brace_depth = 0; self.brace_depth = 0;
@@ -2003,12 +2151,12 @@ impl JsonToolState {
fn filter_json_tool_calls(content: &str) -> String { fn filter_json_tool_calls(content: &str) -> String {
JSON_TOOL_STATE.with(|state| { JSON_TOOL_STATE.with(|state| {
let mut state = state.borrow_mut(); let mut state = state.borrow_mut();
// If we're already in suppression mode, continue tracking // If we're already in suppression mode, continue tracking
if state.suppression_mode { if state.suppression_mode {
// Add content to buffer for tracking // Add content to buffer for tracking
state.buffer.push_str(content); state.buffer.push_str(content);
// Count braces to track JSON nesting depth // Count braces to track JSON nesting depth
for ch in content.chars() { for ch in content.chars() {
match ch { match ch {
@@ -2058,7 +2206,7 @@ fn filter_json_tool_calls(content: &str) -> String {
state.brace_depth = 0; state.brace_depth = 0;
state.buffer.clear(); state.buffer.clear();
state.buffer.push_str(&content[pos..]); state.buffer.push_str(&content[pos..]);
// Count braces in the remaining content after the pattern // Count braces in the remaining content after the pattern
for ch in content[pos..].chars() { for ch in content[pos..].chars() {
match ch { match ch {
@@ -2074,7 +2222,7 @@ fn filter_json_tool_calls(content: &str) -> String {
_ => {} _ => {}
} }
} }
// Return any content before the JSON tool call // Return any content before the JSON tool call
if pos > 0 { if pos > 0 {
return content[..pos].to_string(); return content[..pos].to_string();
@@ -2099,7 +2247,8 @@ fn filter_json_tool_calls(content: &str) -> String {
} }
// Check if this looks like the start of a JSON tool call (larger chunks) // Check if this looks like the start of a JSON tool call (larger chunks)
if trimmed.starts_with('{') && (trimmed.contains("tool") || trimmed.contains('"')) { let pattern = Regex::new(r#"\{\s*"tool"\s*:"#).unwrap();
if pattern.is_match(trimmed) {
// This might be the start of a JSON tool call // This might be the start of a JSON tool call
// Enter suppression mode preemptively // Enter suppression mode preemptively
debug!("Detected potential JSON tool call start - entering suppression mode"); debug!("Detected potential JSON tool call start - entering suppression mode");
@@ -2107,7 +2256,7 @@ fn filter_json_tool_calls(content: &str) -> String {
state.brace_depth = 0; state.brace_depth = 0;
state.buffer.clear(); state.buffer.clear();
state.buffer.push_str(content); state.buffer.push_str(content);
// Count braces // Count braces
for ch in content.chars() { for ch in content.chars() {
match ch { match ch {
@@ -2122,7 +2271,7 @@ fn filter_json_tool_calls(content: &str) -> String {
_ => {} _ => {}
} }
} }
return String::new(); return String::new();
} }