refactor(g3-core): Extract streaming utilities into dedicated module

Extract reusable utilities from the massive stream_completion_with_tools
function into a new streaming.rs module for improved readability:

- format_duration, format_timing_footer: timing display helpers
- clean_llm_tokens: consolidates 4 duplicate token-cleaning call sites
- log_stream_error: extracts 70+ lines of error logging
- is_empty_response, is_connection_error: predicate helpers
- truncate_for_display, truncate_line: string truncation utilities
- StreamingState, IterationState: state structs for future refactoring

Results:
- lib.rs reduced from 2978 to 2840 lines (138 lines, ~5%)
- New streaming.rs: 309 lines with 5 unit tests
- All 98+ tests pass

Agent: carmack
This commit is contained in:
Dhanji R. Prasanna
2026-01-08 13:20:11 +11:00
parent 267ef00848
commit 381b852869
2 changed files with 331 additions and 160 deletions

View File

@@ -16,6 +16,7 @@ pub mod tool_dispatch;
pub mod tool_definitions; pub mod tool_definitions;
pub mod tools; pub mod tools;
pub mod ui_writer; pub mod ui_writer;
pub mod streaming;
pub mod utils; pub mod utils;
pub mod webdriver_session; pub mod webdriver_session;
@@ -1983,11 +1984,7 @@ impl<W: UiWriter> Agent<W> {
let text_content = parser.get_text_content(); let text_content = parser.get_text_content();
// Clean the content // Clean the content
let clean_content = text_content let clean_content = streaming::clean_llm_tokens(&text_content);
.replace("<|im_end|>", "")
.replace("</s>", "")
.replace("[/INST]", "")
.replace("<</SYS>>", "");
// Store the raw content BEFORE filtering for the context window log // Store the raw content BEFORE filtering for the context window log
let raw_content_for_log = clean_content.clone(); let raw_content_for_log = clean_content.clone();
@@ -2046,14 +2043,8 @@ impl<W: UiWriter> Agent<W> {
} else { } else {
s.clone() s.clone()
} }
} else if s.len() > 100 { } else if s.chars().count() > 100 {
// Use char_indices to respect UTF-8 boundaries streaming::truncate_for_display(s, 100)
let truncated = s
.char_indices()
.take(100)
.map(|(_, c)| c)
.collect::<String>();
format!("{}...", truncated)
} else { } else {
s.clone() s.clone()
} }
@@ -2104,25 +2095,6 @@ impl<W: UiWriter> Agent<W> {
// Check if UI wants full output (machine mode) or truncated (human mode) // Check if UI wants full output (machine mode) or truncated (human mode)
let wants_full = self.ui_writer.wants_full_output(); let wants_full = self.ui_writer.wants_full_output();
// Helper function to safely truncate strings at character boundaries
let truncate_line =
|line: &str, max_width: usize, truncate: bool| -> String {
if !truncate {
// Machine mode - return full line
line.to_string()
} else if line.chars().count() <= max_width {
// Human mode - line fits within limit
line.to_string()
} else {
// Human mode - truncate long line
let truncated: String = line
.chars()
.take(max_width.saturating_sub(3))
.collect();
format!("{}...", truncated)
}
};
const MAX_LINES: usize = 5; const MAX_LINES: usize = 5;
const MAX_LINE_WIDTH: usize = 80; const MAX_LINE_WIDTH: usize = 80;
let output_len = output_lines.len(); let output_len = output_lines.len();
@@ -2138,7 +2110,7 @@ impl<W: UiWriter> Agent<W> {
if !wants_full && idx >= max_lines_to_show { if !wants_full && idx >= max_lines_to_show {
break; break;
} }
let clipped_line = truncate_line(line, MAX_LINE_WIDTH, !wants_full); let clipped_line = streaming::truncate_line(line, MAX_LINE_WIDTH, !wants_full);
self.ui_writer.update_tool_output_line(&clipped_line); self.ui_writer.update_tool_output_line(&clipped_line);
} }
@@ -2330,12 +2302,7 @@ impl<W: UiWriter> Agent<W> {
// If no tool calls were completed, continue streaming normally // If no tool calls were completed, continue streaming normally
if !tool_executed { if !tool_executed {
let clean_content = chunk let clean_content = streaming::clean_llm_tokens(&chunk.content);
.content
.replace("<|im_end|>", "")
.replace("</s>", "")
.replace("[/INST]", "")
.replace("<</SYS>>", "");
if !clean_content.is_empty() { if !clean_content.is_empty() {
let filtered_content = let filtered_content =
@@ -2382,11 +2349,7 @@ impl<W: UiWriter> Agent<W> {
debug!("Warning: Using parser buffer text as fallback - this may duplicate output"); debug!("Warning: Using parser buffer text as fallback - this may duplicate output");
// Extract only the undisplayed portion from parser buffer // Extract only the undisplayed portion from parser buffer
// Parser buffer accumulates across iterations, so we need to be careful // Parser buffer accumulates across iterations, so we need to be careful
let clean_text = text_content let clean_text = streaming::clean_llm_tokens(&text_content);
.replace("<|im_end|>", "")
.replace("</s>", "")
.replace("[/INST]", "")
.replace("<</SYS>>", "");
let filtered_text = let filtered_text =
self.ui_writer.filter_json_tool_calls(&clean_text); self.ui_writer.filter_json_tool_calls(&clean_text);
@@ -2404,91 +2367,17 @@ impl<W: UiWriter> Agent<W> {
} }
if !has_text_response && full_response.is_empty() { if !has_text_response && full_response.is_empty() {
// Log detailed error information before failing streaming::log_stream_error(
error!( iteration_count,
"=== STREAM ERROR: No content or tool calls received ===" &provider_name,
&provider_model,
chunks_received,
&parser,
&request,
&self.context_window,
self.session_id.as_deref(),
&raw_chunks,
); );
error!("Iteration: {}/{}", iteration_count, MAX_ITERATIONS);
error!(
"Provider: {} (model: {})",
provider_name, provider_model
);
error!("Chunks received: {}", chunks_received);
error!("Parser state:");
error!(" - Text buffer length: {}", parser.text_buffer_len());
error!(
" - Text buffer content: {:?}",
parser.get_text_content()
);
error!(" - Has incomplete tool call: {}", parser.has_incomplete_tool_call());
error!(" - Message stopped: {}", parser.is_message_stopped());
error!(" - In JSON tool call: {}", parser.is_in_json_tool_call());
error!(" - JSON tool start: {:?}", parser.json_tool_start_position());
error!("Request details:");
error!(" - Messages count: {}", request.messages.len());
error!(" - Has tools: {}", request.tools.is_some());
error!(" - Max tokens: {:?}", request.max_tokens);
error!(" - Temperature: {:?}", request.temperature);
error!(" - Stream: {}", request.stream);
// Log raw chunks received
error!("Raw chunks received ({} total):", chunks_received);
for (i, chunk_str) in raw_chunks.iter().take(25).enumerate() {
error!(" [{}] {}", i, chunk_str);
}
// Log the full request JSON
match serde_json::to_string_pretty(&request) {
Ok(json) => {
error!(
"(turn on DEBUG logging for the raw JSON request)"
);
debug!("Full request JSON:\n{}", json);
}
Err(e) => {
error!("Failed to serialize request: {}", e);
}
}
// Log last user message for context
if let Some(last_user_msg) = request
.messages
.iter()
.rev()
.find(|m| matches!(m.role, MessageRole::User))
{
error!(
"Last user message: {}",
if last_user_msg.content.len() > 500 {
format!(
"{}... (truncated)",
&last_user_msg.content[..500]
)
} else {
last_user_msg.content.clone()
}
);
}
// Log context window state
error!("Context window state:");
error!(
" - Used tokens: {}/{}",
self.context_window.used_tokens,
self.context_window.total_tokens
);
error!(
" - Percentage used: {:.1}%",
self.context_window.percentage_used()
);
error!(
" - Conversation history length: {}",
self.context_window.conversation_history.len()
);
// Log session info
error!("Session ID: {:?}", self.session_id);
error!("=== END STREAM ERROR ===");
// No response received - this is an error condition // No response received - this is an error condition
warn!("Stream finished without any content or tool calls"); warn!("Stream finished without any content or tool calls");
@@ -2565,10 +2454,7 @@ impl<W: UiWriter> Agent<W> {
_last_error = Some(error_details.clone()); _last_error = Some(error_details.clone());
// Check if this is a recoverable connection error // Check if this is a recoverable connection error
let is_connection_error = error_msg.contains("unexpected EOF") let is_connection_error = streaming::is_connection_error(&error_msg);
|| error_msg.contains("connection")
|| error_msg.contains("chunk size line")
|| error_msg.contains("body error");
if is_connection_error { if is_connection_error {
warn!( warn!(
@@ -2633,8 +2519,7 @@ impl<W: UiWriter> Agent<W> {
} else { } else {
&full_response &full_response
}; };
let is_empty_response = response_text.trim().is_empty() let is_empty_response = streaming::is_empty_response(response_text);
|| response_text.lines().all(|line| line.trim().is_empty() || line.trim().starts_with("⏱️"));
// Check if there's an incomplete tool call in the buffer // Check if there's an incomplete tool call in the buffer
let has_incomplete_tool_call = parser.has_incomplete_tool_call(); let has_incomplete_tool_call = parser.has_incomplete_tool_call();
@@ -2769,11 +2654,7 @@ impl<W: UiWriter> Agent<W> {
if !full_response.trim().is_empty() { if !full_response.trim().is_empty() {
// Get the raw text from the parser (before filtering) // Get the raw text from the parser (before filtering)
let raw_text = parser.get_text_content(); let raw_text = parser.get_text_content();
let raw_clean = raw_text let raw_clean = streaming::clean_llm_tokens(&raw_text);
.replace("<|im_end|>", "")
.replace("</s>", "")
.replace("[/INST]", "")
.replace("<</SYS>>", "");
if !raw_clean.trim().is_empty() { if !raw_clean.trim().is_empty() {
let assistant_message = Message::new(MessageRole::Assistant, raw_clean); let assistant_message = Message::new(MessageRole::Assistant, raw_clean);
@@ -2908,35 +2789,16 @@ impl<W: UiWriter> Agent<W> {
fn format_duration(duration: Duration) -> String { fn format_duration(duration: Duration) -> String {
let total_ms = duration.as_millis(); streaming::format_duration(duration)
if total_ms < 1000 {
format!("{}ms", total_ms)
} else if total_ms < 60_000 {
let seconds = duration.as_secs_f64();
format!("{:.1}s", seconds)
} else {
let minutes = total_ms / 60_000;
let remaining_seconds = (total_ms % 60_000) as f64 / 1000.0;
format!("{}m {:.1}s", minutes, remaining_seconds)
}
} }
/// Format the timing footer with optional token usage info
fn format_timing_footer( fn format_timing_footer(
elapsed: Duration, elapsed: Duration,
ttft: Duration, ttft: Duration,
turn_tokens: Option<u32>, turn_tokens: Option<u32>,
context_percentage: f32, context_percentage: f32,
) -> String { ) -> String {
let timing = format!("⏱️ {} | 💭 {}", Self::format_duration(elapsed), Self::format_duration(ttft)); streaming::format_timing_footer(elapsed, ttft, turn_tokens, context_percentage)
// Add token usage info if available (dimmed)
if let Some(tokens) = turn_tokens {
format!("{} \x1b[2m{} ◉ | {:.0}%\x1b[0m", timing, tokens, context_percentage)
} else {
format!("{} \x1b[2m{:.0}%\x1b[0m", timing, context_percentage)
}
} }
} }

View File

@@ -0,0 +1,309 @@
//! Streaming completion logic for the Agent.
//!
//! This module handles the streaming response from LLM providers,
//! including tool call detection, execution, and auto-continue logic.
use crate::context_window::ContextWindow;
use crate::streaming_parser::StreamingToolParser;
use crate::ToolCall;
use g3_providers::{CompletionRequest, MessageRole};
use std::time::{Duration, Instant};
use tracing::{debug, error};
/// Constants for streaming behavior
pub const MAX_ITERATIONS: usize = 400;
/// State tracked across streaming iterations of a single agent turn.
pub struct StreamingState {
    /// Assistant text accumulated across all iterations so far.
    pub full_response: String,
    /// Elapsed time from stream start to the first token, once observed.
    pub first_token_time: Option<Duration>,
    /// Instant the stream began; basis for TTFT and total elapsed time.
    pub stream_start: Instant,
    /// Number of streaming iterations completed (reported against `MAX_ITERATIONS`).
    pub iteration_count: usize,
    /// Whether any response output has started — NOTE(review): exact semantics
    /// depend on lib.rs call sites not visible in this module; confirm there.
    pub response_started: bool,
    /// True once at least one tool call has executed during this turn.
    pub any_tool_executed: bool,
    /// Count of automatic summary attempts — presumably part of the
    /// auto-continue logic; verify against usage in lib.rs.
    pub auto_summary_attempts: usize,
    /// Whether a final-output tool was invoked — TODO confirm at call sites.
    pub final_output_called: bool,
    /// Token usage accumulated over the turn, when the provider reports it.
    pub turn_accumulated_usage: Option<g3_providers::Usage>,
}
impl StreamingState {
    /// Create a fresh state for a new streaming turn, anchored at "now".
    pub fn new() -> Self {
        Self {
            full_response: String::new(),
            first_token_time: None,
            stream_start: Instant::now(),
            iteration_count: 0,
            response_started: false,
            any_tool_executed: false,
            auto_summary_attempts: 0,
            final_output_called: false,
            turn_accumulated_usage: None,
        }
    }

    /// Capture time-to-first-token. Only the first call has any effect;
    /// subsequent calls are no-ops so the earliest token wins.
    pub fn record_first_token(&mut self) {
        if self.first_token_time.is_none() {
            self.first_token_time = Some(self.stream_start.elapsed());
        }
    }

    /// Time-to-first-token, falling back to total elapsed time when no
    /// token was ever recorded.
    pub fn get_ttft(&self) -> Duration {
        match self.first_token_time {
            Some(ttft) => ttft,
            None => self.stream_start.elapsed(),
        }
    }
}
impl Default for StreamingState {
    // Equivalent to `StreamingState::new()`.
    fn default() -> Self {
        Self::new()
    }
}
/// State tracked within a single streaming iteration.
pub struct IterationState {
    /// Incremental parser that accumulates text and detects tool calls.
    pub parser: StreamingToolParser,
    /// Text received during this iteration only.
    pub current_response: String,
    /// True once a tool call has been executed within this iteration.
    pub tool_executed: bool,
    /// Total chunks seen this iteration, including ones omitted from `raw_chunks`.
    pub chunks_received: usize,
    /// Debug log of raw chunks, capped by `record_chunk` (first 20 plus the
    /// finished chunk) to bound memory on long streams.
    pub raw_chunks: Vec<String>,
    /// Usage reported by the provider for this iteration, if any.
    pub accumulated_usage: Option<g3_providers::Usage>,
}
impl IterationState {
    /// Start a fresh per-iteration state: empty parser, no chunks recorded.
    pub fn new() -> Self {
        Self {
            parser: StreamingToolParser::new(),
            current_response: String::new(),
            tool_executed: false,
            chunks_received: 0,
            raw_chunks: Vec::new(),
            accumulated_usage: None,
        }
    }

    /// Record a raw chunk for post-mortem debugging.
    ///
    /// The first 20 chunks and any `finished` chunk are stored verbatim; a
    /// single marker entry notes when intermediate chunks were dropped, so
    /// `raw_chunks` stays bounded regardless of stream length.
    pub fn record_chunk(&mut self, chunk: &g3_providers::CompletionChunk) {
        let keep_verbatim = self.chunks_received < 20 || chunk.finished;
        if keep_verbatim {
            let entry = format!(
                "Chunk #{}: content={:?}, finished={}, tool_calls={:?}",
                self.chunks_received + 1,
                chunk.content,
                chunk.finished,
                chunk.tool_calls
            );
            self.raw_chunks.push(entry);
        } else if self.raw_chunks.len() == 20 {
            self.raw_chunks
                .push("... (chunks 21+ omitted for brevity) ...".to_string());
        }
        self.chunks_received += 1;
    }
}
impl Default for IterationState {
    // Equivalent to `IterationState::new()`.
    fn default() -> Self {
        Self::new()
    }
}
/// Strip LLM chat-template stop tokens from streamed content.
///
/// Removes every occurrence of each known stop marker; an input containing
/// none of them is returned unchanged (as a fresh `String`).
pub fn clean_llm_tokens(content: &str) -> String {
    // Stop tokens from the chat templates this agent encounters.
    const STOP_TOKENS: [&str; 4] = ["<|im_end|>", "</s>", "[/INST]", "<</SYS>>"];
    STOP_TOKENS
        .iter()
        .fold(content.to_string(), |acc, token| acc.replace(token, ""))
}
/// Format a duration for human display.
///
/// Under one second: milliseconds ("500ms"). Under one minute: fractional
/// seconds ("1.5s"). Otherwise minutes plus fractional seconds ("1m 30.0s").
pub fn format_duration(duration: Duration) -> String {
    let ms = duration.as_millis();
    match ms {
        0..=999 => format!("{}ms", ms),
        1000..=59_999 => format!("{:.1}s", duration.as_secs_f64()),
        _ => {
            let minutes = ms / 60_000;
            let seconds = (ms % 60_000) as f64 / 1000.0;
            format!("{}m {:.1}s", minutes, seconds)
        }
    }
}
/// Format the per-turn timing footer, optionally including token usage.
///
/// Always shows total elapsed time and time-to-first-token; when
/// `turn_tokens` is present, a dimmed (ANSI `\x1b[2m`) token count precedes
/// the context-window percentage, otherwise only the percentage is dimmed.
pub fn format_timing_footer(
    elapsed: Duration,
    ttft: Duration,
    turn_tokens: Option<u32>,
    context_percentage: f32,
) -> String {
    let timing = format!(
        "⏱️ {} | 💭 {}",
        format_duration(elapsed),
        format_duration(ttft)
    );
    match turn_tokens {
        Some(tokens) => format!(
            "{} \x1b[2m{} ◉ | {:.0}%\x1b[0m",
            timing, tokens, context_percentage
        ),
        None => format!("{} \x1b[2m{:.0}%\x1b[0m", timing, context_percentage),
    }
}
/// Log detailed diagnostics when a stream finishes with no content and no
/// tool calls.
///
/// Dumps iteration, provider, parser, request, raw-chunk, context-window,
/// and session state at `error!` level so a failed turn can be diagnosed
/// from logs alone. The full request JSON is only emitted at `debug!` level
/// because it can be very large.
pub fn log_stream_error(
    iteration_count: usize,
    provider_name: &str,
    provider_model: &str,
    chunks_received: usize,
    parser: &StreamingToolParser,
    request: &CompletionRequest,
    context_window: &ContextWindow,
    session_id: Option<&str>,
    raw_chunks: &[String],
) {
    error!("=== STREAM ERROR: No content or tool calls received ===");
    error!("Iteration: {}/{}", iteration_count, MAX_ITERATIONS);
    error!("Provider: {} (model: {})", provider_name, provider_model);
    error!("Chunks received: {}", chunks_received);
    error!("Parser state:");
    error!(" - Text buffer length: {}", parser.text_buffer_len());
    error!(" - Text buffer content: {:?}", parser.get_text_content());
    error!(" - Has incomplete tool call: {}", parser.has_incomplete_tool_call());
    error!(" - Message stopped: {}", parser.is_message_stopped());
    error!(" - In JSON tool call: {}", parser.is_in_json_tool_call());
    error!(" - JSON tool start: {:?}", parser.json_tool_start_position());
    error!("Request details:");
    error!(" - Messages count: {}", request.messages.len());
    error!(" - Has tools: {}", request.tools.is_some());
    error!(" - Max tokens: {:?}", request.max_tokens);
    error!(" - Temperature: {:?}", request.temperature);
    error!(" - Stream: {}", request.stream);
    // raw_chunks is already capped at record time; take(25) bounds the log
    // even if a caller passes an uncapped list.
    error!("Raw chunks received ({} total):", chunks_received);
    for (i, chunk_str) in raw_chunks.iter().take(25).enumerate() {
        error!(" [{}] {}", i, chunk_str);
    }
    match serde_json::to_string_pretty(request) {
        Ok(json) => {
            error!("(turn on DEBUG logging for the raw JSON request)");
            debug!("Full request JSON:\n{}", json);
        }
        Err(e) => error!("Failed to serialize request: {}", e),
    }
    if let Some(last_user_msg) = request
        .messages
        .iter()
        .rev()
        .find(|m| matches!(m.role, MessageRole::User))
    {
        let content = &last_user_msg.content;
        let truncated = if content.len() > 500 {
            // Slicing at a fixed byte offset panics if it lands inside a
            // multi-byte UTF-8 character; walk back to a char boundary first.
            let mut end = 500;
            while !content.is_char_boundary(end) {
                end -= 1;
            }
            format!("{}... (truncated)", &content[..end])
        } else {
            content.clone()
        };
        error!("Last user message: {}", truncated);
    }
    error!("Context window state:");
    error!(
        " - Used tokens: {}/{}",
        context_window.used_tokens, context_window.total_tokens
    );
    error!(" - Percentage used: {:.1}%", context_window.percentage_used());
    error!(
        " - Conversation history length: {}",
        context_window.conversation_history.len()
    );
    error!("Session ID: {:?}", session_id);
    error!("=== END STREAM ERROR ===");
}
/// Truncate a string value for display, respecting UTF-8 boundaries.
///
/// Strings of at most `max_len` *characters* are returned unchanged;
/// longer ones are cut to `max_len` characters with a `...` suffix.
///
/// The previous implementation compared `s.len()` (bytes) against a limit
/// applied in characters, so a multi-byte string within the character limit
/// could still get a spurious `...` suffix. Counting chars matches both the
/// doc contract and the call-site guard (`s.chars().count() > 100`).
pub fn truncate_for_display(s: &str, max_len: usize) -> String {
    if s.chars().count() <= max_len {
        s.to_string()
    } else {
        let truncated: String = s.chars().take(max_len).collect();
        format!("{}...", truncated)
    }
}
/// Truncate a line of tool output for display.
///
/// When `should_truncate` is false (machine mode) the line passes through
/// untouched. Otherwise lines longer than `max_width` characters are cut to
/// `max_width - 3` characters (saturating at zero) with a `...` suffix.
pub fn truncate_line(line: &str, max_width: usize, should_truncate: bool) -> String {
    if !should_truncate || line.chars().count() <= max_width {
        return line.to_string();
    }
    let kept: String = line.chars().take(max_width.saturating_sub(3)).collect();
    format!("{}...", kept)
}
/// Check if two tool calls are duplicates: same tool name and identical args.
///
/// NOTE(review): presumably used to detect the model re-issuing the exact
/// same call — confirm against the call sites in lib.rs (not visible here).
pub fn are_tool_calls_duplicate(tc1: &ToolCall, tc2: &ToolCall) -> bool {
    tc1.tool == tc2.tool && tc1.args == tc2.args
}
/// Determine whether a response is essentially empty.
///
/// True when the text is all whitespace, or when every line is either blank
/// or a timing footer (starts with "⏱️") — i.e. nothing substantive was said.
pub fn is_empty_response(response: &str) -> bool {
    if response.trim().is_empty() {
        return true;
    }
    response.lines().all(|line| {
        let trimmed = line.trim();
        trimmed.is_empty() || trimmed.starts_with("⏱️")
    })
}
/// Check whether an error message indicates a recoverable connection error.
///
/// Matches by substring against known transport-failure markers; anything
/// else (e.g. parse errors) is treated as non-recoverable.
pub fn is_connection_error(error_msg: &str) -> bool {
    const CONNECTION_MARKERS: [&str; 4] = [
        "unexpected EOF",
        "connection",
        "chunk size line",
        "body error",
    ];
    CONNECTION_MARKERS
        .iter()
        .any(|marker| error_msg.contains(marker))
}
#[cfg(test)]
mod tests {
    use super::*;

    // Each known stop token must be stripped wherever it appears.
    #[test]
    fn test_clean_llm_tokens() {
        assert_eq!(clean_llm_tokens("hello<|im_end|>"), "hello");
        assert_eq!(clean_llm_tokens("test</s>more"), "testmore");
        assert_eq!(clean_llm_tokens("[/INST]response"), "response");
    }

    // Covers the three display ranges: ms, fractional seconds, minutes.
    #[test]
    fn test_format_duration() {
        assert_eq!(format_duration(Duration::from_millis(500)), "500ms");
        assert_eq!(format_duration(Duration::from_millis(1500)), "1.5s");
        assert_eq!(format_duration(Duration::from_secs(90)), "1m 30.0s");
    }

    // Short strings pass through; long ones are cut with a "..." suffix.
    #[test]
    fn test_truncate_for_display() {
        assert_eq!(truncate_for_display("short", 10), "short");
        assert_eq!(truncate_for_display("this is long", 5), "this ...");
    }

    // Whitespace-only and timing-footer-only responses count as empty.
    #[test]
    fn test_is_empty_response() {
        assert!(is_empty_response(""));
        assert!(is_empty_response("  \n  "));
        assert!(is_empty_response("⏱️ 1.5s"));
        assert!(!is_empty_response("actual content"));
    }

    // Transport-failure markers are recoverable; parse errors are not.
    #[test]
    fn test_is_connection_error() {
        assert!(is_connection_error("unexpected EOF during read"));
        assert!(is_connection_error("connection reset"));
        assert!(!is_connection_error("invalid JSON"));
    }
}