diff --git a/crates/g3-core/src/lib.rs b/crates/g3-core/src/lib.rs
index f8448dd..253b4e1 100644
--- a/crates/g3-core/src/lib.rs
+++ b/crates/g3-core/src/lib.rs
@@ -16,6 +16,7 @@
 pub mod tool_dispatch;
 pub mod tool_definitions;
 pub mod tools;
 pub mod ui_writer;
+pub mod streaming;
 pub mod utils;
 pub mod webdriver_session;
@@ -1983,11 +1984,7 @@ impl Agent {
             let text_content = parser.get_text_content();
 
             // Clean the content
-            let clean_content = text_content
-                .replace("<|im_end|>", "")
-                .replace("", "")
-                .replace("[/INST]", "")
-                .replace("<>", "");
+            let clean_content = streaming::clean_llm_tokens(&text_content);
 
             // Store the raw content BEFORE filtering for the context window log
             let raw_content_for_log = clean_content.clone();
@@ -2046,14 +2043,8 @@ impl Agent {
                             } else {
                                 s.clone()
                             }
-                        } else if s.len() > 100 {
-                            // Use char_indices to respect UTF-8 boundaries
-                            let truncated = s
-                                .char_indices()
-                                .take(100)
-                                .map(|(_, c)| c)
-                                .collect::<String>();
-                            format!("{}...", truncated)
+                        } else if s.chars().count() > 100 {
+                            streaming::truncate_for_display(s, 100)
                         } else {
                             s.clone()
                         }
@@ -2104,25 +2095,6 @@ impl Agent {
 
             // Check if UI wants full output (machine mode) or truncated (human mode)
             let wants_full = self.ui_writer.wants_full_output();
-            // Helper function to safely truncate strings at character boundaries
-            let truncate_line =
-                |line: &str, max_width: usize, truncate: bool| -> String {
-                    if !truncate {
-                        // Machine mode - return full line
-                        line.to_string()
-                    } else if line.chars().count() <= max_width {
-                        // Human mode - line fits within limit
-                        line.to_string()
-                    } else {
-                        // Human mode - truncate long line
-                        let truncated: String = line
-                            .chars()
-                            .take(max_width.saturating_sub(3))
-                            .collect();
-                        format!("{}...", truncated)
-                    }
-                };
-
             const MAX_LINES: usize = 5;
             const MAX_LINE_WIDTH: usize = 80;
             let output_len = output_lines.len();
@@ -2138,7 +2110,7 @@ impl Agent {
                 if !wants_full && idx >= max_lines_to_show {
                     break;
                 }
-                let clipped_line = truncate_line(line, MAX_LINE_WIDTH, !wants_full);
+                let clipped_line = streaming::truncate_line(line, MAX_LINE_WIDTH, !wants_full);
                 self.ui_writer.update_tool_output_line(&clipped_line);
@@ -2330,12 +2302,7 @@ impl Agent {
 
                     // If no tool calls were completed, continue streaming normally
                     if !tool_executed {
-                        let clean_content = chunk
-                            .content
-                            .replace("<|im_end|>", "")
-                            .replace("", "")
-                            .replace("[/INST]", "")
-                            .replace("<>", "");
+                        let clean_content = streaming::clean_llm_tokens(&chunk.content);
 
                         if !clean_content.is_empty() {
                             let filtered_content =
@@ -2382,11 +2349,7 @@ impl Agent {
                 debug!("Warning: Using parser buffer text as fallback - this may duplicate output");
                 // Extract only the undisplayed portion from parser buffer
                 // Parser buffer accumulates across iterations, so we need to be careful
-                let clean_text = text_content
-                    .replace("<|im_end|>", "")
-                    .replace("", "")
-                    .replace("[/INST]", "")
-                    .replace("<>", "");
+                let clean_text = streaming::clean_llm_tokens(&text_content);
 
                 let filtered_text =
                     self.ui_writer.filter_json_tool_calls(&clean_text);
@@ -2404,91 +2367,17 @@ impl Agent {
             }
 
             if !has_text_response && full_response.is_empty() {
-                // Log detailed error information before failing
-                error!(
-                    "=== STREAM ERROR: No content or tool calls received ==="
+                streaming::log_stream_error(
+                    iteration_count,
+                    &provider_name,
+                    &provider_model,
+                    chunks_received,
+                    &parser,
+                    &request,
+                    &self.context_window,
+                    self.session_id.as_deref(),
+                    &raw_chunks,
                 );
-                error!("Iteration: {}/{}", iteration_count, MAX_ITERATIONS);
-                error!(
"Provider: {} (model: {})", - provider_name, provider_model - ); - error!("Chunks received: {}", chunks_received); - error!("Parser state:"); - error!(" - Text buffer length: {}", parser.text_buffer_len()); - error!( - " - Text buffer content: {:?}", - parser.get_text_content() - ); - error!(" - Has incomplete tool call: {}", parser.has_incomplete_tool_call()); - error!(" - Message stopped: {}", parser.is_message_stopped()); - error!(" - In JSON tool call: {}", parser.is_in_json_tool_call()); - error!(" - JSON tool start: {:?}", parser.json_tool_start_position()); - error!("Request details:"); - error!(" - Messages count: {}", request.messages.len()); - error!(" - Has tools: {}", request.tools.is_some()); - error!(" - Max tokens: {:?}", request.max_tokens); - error!(" - Temperature: {:?}", request.temperature); - error!(" - Stream: {}", request.stream); - - // Log raw chunks received - error!("Raw chunks received ({} total):", chunks_received); - for (i, chunk_str) in raw_chunks.iter().take(25).enumerate() { - error!(" [{}] {}", i, chunk_str); - } - - // Log the full request JSON - match serde_json::to_string_pretty(&request) { - Ok(json) => { - error!( - "(turn on DEBUG logging for the raw JSON request)" - ); - debug!("Full request JSON:\n{}", json); - } - Err(e) => { - error!("Failed to serialize request: {}", e); - } - } - - // Log last user message for context - if let Some(last_user_msg) = request - .messages - .iter() - .rev() - .find(|m| matches!(m.role, MessageRole::User)) - { - error!( - "Last user message: {}", - if last_user_msg.content.len() > 500 { - format!( - "{}... (truncated)", - &last_user_msg.content[..500] - ) - } else { - last_user_msg.content.clone() - } - ); - } - - // Log context window state - error!("Context window state:"); - error!( - " - Used tokens: {}/{}", - self.context_window.used_tokens, - self.context_window.total_tokens - ); - error!( - " - Percentage used: {:.1}%", - self.context_window.percentage_used() - ); - error!( - " - Conversation history length: {}", - self.context_window.conversation_history.len() - ); - - // Log session info - error!("Session ID: {:?}", self.session_id); - error!("=== END STREAM ERROR ==="); // No response received - this is an error condition warn!("Stream finished without any content or tool calls"); @@ -2565,10 +2454,7 @@ impl Agent { _last_error = Some(error_details.clone()); // Check if this is a recoverable connection error - let is_connection_error = error_msg.contains("unexpected EOF") - || error_msg.contains("connection") - || error_msg.contains("chunk size line") - || error_msg.contains("body error"); + let is_connection_error = streaming::is_connection_error(&error_msg); if is_connection_error { warn!( @@ -2633,8 +2519,7 @@ impl Agent { } else { &full_response }; - let is_empty_response = response_text.trim().is_empty() - || response_text.lines().all(|line| line.trim().is_empty() || line.trim().starts_with("⏱️")); + let is_empty_response = streaming::is_empty_response(response_text); // Check if there's an incomplete tool call in the buffer let has_incomplete_tool_call = parser.has_incomplete_tool_call(); @@ -2769,11 +2654,7 @@ impl Agent { if !full_response.trim().is_empty() { // Get the raw text from the parser (before filtering) let raw_text = parser.get_text_content(); - let raw_clean = raw_text - .replace("<|im_end|>", "") - .replace("", "") - .replace("[/INST]", "") - .replace("<>", ""); + let raw_clean = streaming::clean_llm_tokens(&raw_text); if !raw_clean.trim().is_empty() { let assistant_message = 
                     Message::new(MessageRole::Assistant, raw_clean);
@@ -2908,35 +2789,16 @@ impl Agent {
     fn format_duration(duration: Duration) -> String {
-        let total_ms = duration.as_millis();
-
-        if total_ms < 1000 {
-            format!("{}ms", total_ms)
-        } else if total_ms < 60_000 {
-            let seconds = duration.as_secs_f64();
-            format!("{:.1}s", seconds)
-        } else {
-            let minutes = total_ms / 60_000;
-            let remaining_seconds = (total_ms % 60_000) as f64 / 1000.0;
-            format!("{}m {:.1}s", minutes, remaining_seconds)
-        }
+        streaming::format_duration(duration)
     }
 
-    /// Format the timing footer with optional token usage info
     fn format_timing_footer(
         elapsed: Duration,
         ttft: Duration,
         turn_tokens: Option,
         context_percentage: f32,
     ) -> String {
-        let timing = format!("⏱️ {} | 💭 {}", Self::format_duration(elapsed), Self::format_duration(ttft));
-
-        // Add token usage info if available (dimmed)
-        if let Some(tokens) = turn_tokens {
-            format!("{} \x1b[2m{} ◉ | {:.0}%\x1b[0m", timing, tokens, context_percentage)
-        } else {
-            format!("{} \x1b[2m{:.0}%\x1b[0m", timing, context_percentage)
-        }
+        streaming::format_timing_footer(elapsed, ttft, turn_tokens, context_percentage)
     }
 }
diff --git a/crates/g3-core/src/streaming.rs b/crates/g3-core/src/streaming.rs
new file mode 100644
index 0000000..e4cb2a9
--- /dev/null
+++ b/crates/g3-core/src/streaming.rs
@@ -0,0 +1,309 @@
+//! Streaming completion logic for the Agent.
+//!
+//! This module handles the streaming response from LLM providers,
+//! including tool call detection, execution, and auto-continue logic.
+
+use crate::context_window::ContextWindow;
+use crate::streaming_parser::StreamingToolParser;
+use crate::ToolCall;
+use g3_providers::{CompletionRequest, MessageRole};
+use std::time::{Duration, Instant};
+use tracing::{debug, error};
+
+/// Constants for streaming behavior
+pub const MAX_ITERATIONS: usize = 400;
+
+/// State tracked across streaming iterations
+pub struct StreamingState {
+    pub full_response: String,
+    pub first_token_time: Option<Duration>,
+    pub stream_start: Instant,
+    pub iteration_count: usize,
+    pub response_started: bool,
+    pub any_tool_executed: bool,
+    pub auto_summary_attempts: usize,
+    pub final_output_called: bool,
+    pub turn_accumulated_usage: Option,
+}
+
+impl StreamingState {
+    pub fn new() -> Self {
+        Self {
+            full_response: String::new(),
+            first_token_time: None,
+            stream_start: Instant::now(),
+            iteration_count: 0,
+            response_started: false,
+            any_tool_executed: false,
+            auto_summary_attempts: 0,
+            final_output_called: false,
+            turn_accumulated_usage: None,
+        }
+    }
+
+    pub fn record_first_token(&mut self) {
+        if self.first_token_time.is_none() {
+            self.first_token_time = Some(self.stream_start.elapsed());
+        }
+    }
+
+    pub fn get_ttft(&self) -> Duration {
+        self.first_token_time.unwrap_or_else(|| self.stream_start.elapsed())
+    }
+}
+
+impl Default for StreamingState {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+/// State tracked within a single streaming iteration
+pub struct IterationState {
+    pub parser: StreamingToolParser,
+    pub current_response: String,
+    pub tool_executed: bool,
+    pub chunks_received: usize,
+    pub raw_chunks: Vec<String>,
+    pub accumulated_usage: Option,
+}
+
+impl IterationState {
+    pub fn new() -> Self {
+        Self {
+            parser: StreamingToolParser::new(),
+            current_response: String::new(),
+            tool_executed: false,
+            chunks_received: 0,
+            raw_chunks: Vec::new(),
+            accumulated_usage: None,
+        }
+    }
+
+    /// Store a raw chunk for debugging (limited to first 20 + last few)
+    pub fn record_chunk(&mut self, chunk: &g3_providers::CompletionChunk) {
+        if self.chunks_received < 20 || chunk.finished {
+            self.raw_chunks.push(format!(
+                "Chunk #{}: content={:?}, finished={}, tool_calls={:?}",
+                self.chunks_received + 1,
+                chunk.content,
+                chunk.finished,
+                chunk.tool_calls
+            ));
+        } else if self.raw_chunks.len() == 20 {
+            self.raw_chunks.push("... (chunks 21+ omitted for brevity) ...".to_string());
+        }
+        self.chunks_received += 1;
+    }
+}
+
+impl Default for IterationState {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+/// Clean LLM-specific tokens from content
+pub fn clean_llm_tokens(content: &str) -> String {
+    content
+        .replace("<|im_end|>", "")
+        .replace("", "")
+        .replace("[/INST]", "")
+        .replace("<>", "")
+}
+
+/// Format a duration for display
+pub fn format_duration(duration: Duration) -> String {
+    let total_ms = duration.as_millis();
+
+    if total_ms < 1000 {
+        format!("{}ms", total_ms)
+    } else if total_ms < 60_000 {
+        format!("{:.1}s", duration.as_secs_f64())
+    } else {
+        let minutes = total_ms / 60_000;
+        let remaining_seconds = (total_ms % 60_000) as f64 / 1000.0;
+        format!("{}m {:.1}s", minutes, remaining_seconds)
+    }
+}
+
+/// Format the timing footer with optional token usage info
+pub fn format_timing_footer(
+    elapsed: Duration,
+    ttft: Duration,
+    turn_tokens: Option,
+    context_percentage: f32,
+) -> String {
+    let timing = format!(
+        "⏱️ {} | 💭 {}",
+        format_duration(elapsed),
+        format_duration(ttft)
+    );
+
+    // Add token usage info if available (dimmed)
+    if let Some(tokens) = turn_tokens {
+        format!(
+            "{} \x1b[2m{} ◉ | {:.0}%\x1b[0m",
+            timing, tokens, context_percentage
+        )
+    } else {
+        format!("{} \x1b[2m{:.0}%\x1b[0m", timing, context_percentage)
+    }
+}
+
+/// Log detailed error information when stream produces no content
+pub fn log_stream_error(
+    iteration_count: usize,
+    provider_name: &str,
+    provider_model: &str,
+    chunks_received: usize,
+    parser: &StreamingToolParser,
+    request: &CompletionRequest,
+    context_window: &ContextWindow,
+    session_id: Option<&str>,
+    raw_chunks: &[String],
+) {
+    error!("=== STREAM ERROR: No content or tool calls received ===");
+    error!("Iteration: {}/{}", iteration_count, MAX_ITERATIONS);
+    error!("Provider: {} (model: {})", provider_name, provider_model);
+    error!("Chunks received: {}", chunks_received);
+
+    error!("Parser state:");
+    error!(" - Text buffer length: {}", parser.text_buffer_len());
+    error!(" - Text buffer content: {:?}", parser.get_text_content());
+    error!(" - Has incomplete tool call: {}", parser.has_incomplete_tool_call());
+    error!(" - Message stopped: {}", parser.is_message_stopped());
+    error!(" - In JSON tool call: {}", parser.is_in_json_tool_call());
+    error!(" - JSON tool start: {:?}", parser.json_tool_start_position());
+
+    error!("Request details:");
+    error!(" - Messages count: {}", request.messages.len());
+    error!(" - Has tools: {}", request.tools.is_some());
+    error!(" - Max tokens: {:?}", request.max_tokens);
+    error!(" - Temperature: {:?}", request.temperature);
+    error!(" - Stream: {}", request.stream);
+
+    error!("Raw chunks received ({} total):", chunks_received);
+    for (i, chunk_str) in raw_chunks.iter().take(25).enumerate() {
+        error!(" [{}] {}", i, chunk_str);
+    }
+
+    match serde_json::to_string_pretty(request) {
+        Ok(json) => {
+            error!("(turn on DEBUG logging for the raw JSON request)");
+            debug!("Full request JSON:\n{}", json);
+        }
+        Err(e) => error!("Failed to serialize request: {}", e),
+    }
+
+    if let Some(last_user_msg) = request
+        .messages
+        .iter()
+        .rev()
+        .find(|m| matches!(m.role, MessageRole::User))
+    {
+        let truncated = if last_user_msg.content.len() > 500 {
+            format!("{}... (truncated)", &last_user_msg.content[..500])
+        } else {
+            last_user_msg.content.clone()
+        };
+        error!("Last user message: {}", truncated);
+    }
+
+    error!("Context window state:");
+    error!(
+        " - Used tokens: {}/{}",
+        context_window.used_tokens, context_window.total_tokens
+    );
+    error!(" - Percentage used: {:.1}%", context_window.percentage_used());
+    error!(
+        " - Conversation history length: {}",
+        context_window.conversation_history.len()
+    );
+
+    error!("Session ID: {:?}", session_id);
+    error!("=== END STREAM ERROR ===");
+}
+
+/// Truncate a string value for display, respecting UTF-8 boundaries
+pub fn truncate_for_display(s: &str, max_len: usize) -> String {
+    if s.len() <= max_len {
+        s.to_string()
+    } else {
+        let truncated: String = s.char_indices().take(max_len).map(|(_, c)| c).collect();
+        format!("{}...", truncated)
+    }
+}
+
+/// Truncate a line for tool output display
+pub fn truncate_line(line: &str, max_width: usize, should_truncate: bool) -> String {
+    if !should_truncate {
+        line.to_string()
+    } else if line.chars().count() <= max_width {
+        line.to_string()
+    } else {
+        let truncated: String = line.chars().take(max_width.saturating_sub(3)).collect();
+        format!("{}...", truncated)
+    }
+}
+
+/// Check if two tool calls are duplicates (same tool and args)
+pub fn are_tool_calls_duplicate(tc1: &ToolCall, tc2: &ToolCall) -> bool {
+    tc1.tool == tc2.tool && tc1.args == tc2.args
+}
+
+/// Determine if a response is essentially empty (whitespace or timing only)
+pub fn is_empty_response(response: &str) -> bool {
+    response.trim().is_empty()
+        || response
+            .lines()
+            .all(|line| line.trim().is_empty() || line.trim().starts_with("⏱️"))
+}
+
+/// Check if an error is a recoverable connection error
+pub fn is_connection_error(error_msg: &str) -> bool {
+    error_msg.contains("unexpected EOF")
+        || error_msg.contains("connection")
+        || error_msg.contains("chunk size line")
+        || error_msg.contains("body error")
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_clean_llm_tokens() {
+        assert_eq!(clean_llm_tokens("hello<|im_end|>"), "hello");
+        assert_eq!(clean_llm_tokens("testmore"), "testmore");
+        assert_eq!(clean_llm_tokens("[/INST]response"), "response");
+    }
+
+    #[test]
+    fn test_format_duration() {
+        assert_eq!(format_duration(Duration::from_millis(500)), "500ms");
+        assert_eq!(format_duration(Duration::from_millis(1500)), "1.5s");
+        assert_eq!(format_duration(Duration::from_secs(90)), "1m 30.0s");
+    }
+
+    #[test]
+    fn test_truncate_for_display() {
+        assert_eq!(truncate_for_display("short", 10), "short");
+        assert_eq!(truncate_for_display("this is long", 5), "this ...");
+    }
+
+    #[test]
+    fn test_is_empty_response() {
+        assert!(is_empty_response(""));
+        assert!(is_empty_response(" \n "));
+        assert!(is_empty_response("⏱️ 1.5s"));
+        assert!(!is_empty_response("actual content"));
+    }
+
+    #[test]
+    fn test_is_connection_error() {
+        assert!(is_connection_error("unexpected EOF during read"));
+        assert!(is_connection_error("connection reset"));
+        assert!(!is_connection_error("invalid JSON"));
+    }
+}