refactor(g3-core): use StreamingState and IterationState structs in stream_completion_with_tools

Consolidate scattered state variables in the 834-line stream_completion_with_tools()
function to use the existing StreamingState and IterationState structs from
streaming.rs. This removes the aliasing that came from tracking the same state
in multiple places and makes the streaming loop easier to reason about.

Changes:
- Add assistant_message_added field to StreamingState
- Add stream_stop_reason field to IterationState
- Replace 8 inline state variables with StreamingState::new()
- Replace 7 iteration-local variables with IterationState::new()
- All 585 workspace tests pass

This is a pure refactor with no behavior changes. The state structs were already
defined in streaming.rs but not used in the main streaming loop.
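
As a rough sketch, the loop now has this shape (illustrative only: stream setup,
chunk handling, error paths, and tool dispatch are elided, and `show_timing` /
`request` come from the surrounding function):

```rust
// Turn-level state replaces the 8 scattered locals (iteration_count,
// first_token_time, response_started, any_tool_executed, ...).
let mut state = streaming::StreamingState::new();
self.ensure_context_capacity(&mut request).await?;

loop {
    state.iteration_count += 1;
    if state.iteration_count > streaming::MAX_ITERATIONS {
        break;
    }

    // Per-iteration state replaces the 7 loop-locals (parser,
    // current_response, chunks_received, raw_chunks, ...).
    let mut iter = streaming::IterationState::new();

    // ... stream chunks, updating iter.* and state.* as they arrive ...

    if !iter.tool_executed {
        // Text-only turn: finalize with the turn-level timing/usage data.
        return Ok(self.finalize_streaming_turn(
            state.full_response.clone(),
            show_timing,
            state.stream_start,
            state.first_token_time,
            &state.turn_accumulated_usage,
        ));
    }
    // A tool ran: loop again with a fresh IterationState.
}
```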

Agent: fowler
Dhanji R. Prasanna
2026-01-20 15:05:23 +05:30
parent dec22f5e58
commit 9abb3735d2
3 changed files with 108 additions and 111 deletions

View File

@@ -1897,31 +1897,23 @@ Skip if nothing new. Be brief."#;
debug!("Starting stream_completion_with_tools");
// --- State Initialization ---
let full_response = String::new();
let mut first_token_time: Option<Duration> = None;
let stream_start = Instant::now();
let mut iteration_count = 0;
const MAX_ITERATIONS: usize = 400; // Prevent infinite loops
let mut response_started = false;
let mut any_tool_executed = false; // Track if ANY tool was executed across all iterations
let mut assistant_message_added = false; // Track if assistant message was added to context this iteration
// Note: Session-level duplicate tracking was removed - we only prevent sequential duplicates (DUP IN CHUNK, DUP IN MSG)
let mut turn_accumulated_usage: Option<g3_providers::Usage> = None; // Track token usage for timing footer
// Note: Session-level duplicate tracking was removed - we only prevent sequential duplicates (DUP IN CHUNK, DUP IN MSG)
let mut state = streaming::StreamingState::new();
// --- Phase 1: Pre-loop Context Capacity Check ---
self.ensure_context_capacity(&mut request).await?;
// --- Phase 2: Main Streaming Loop ---
loop {
iteration_count += 1;
debug!("Starting iteration {}", iteration_count);
if iteration_count > MAX_ITERATIONS {
state.iteration_count += 1;
debug!("Starting iteration {}", state.iteration_count);
if state.iteration_count > streaming::MAX_ITERATIONS {
warn!("Maximum iterations reached, stopping stream");
break;
}
// Add a small delay between iterations to prevent "model busy" errors
if iteration_count > 1 {
if state.iteration_count > 1 {
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
}
@@ -1970,10 +1962,10 @@ Skip if nothing new. Be brief."#;
Err(e) => {
error!("Failed to start stream: {}", e);
// Additional retry for "busy" errors on subsequent iterations
if iteration_count > 1 && e.to_string().contains("busy") {
if state.iteration_count > 1 && e.to_string().contains("busy") {
warn!(
"Model busy on iteration {}, attempting one more retry in 500ms",
iteration_count
state.iteration_count
);
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
@@ -1994,14 +1986,8 @@ Skip if nothing new. Be brief."#;
// Write context window summary every time we send messages to LLM
self.write_context_window_summary();
let mut parser = StreamingToolParser::new();
let mut current_response = String::new();
let mut tool_executed = false;
let mut chunks_received = 0;
let mut raw_chunks: Vec<String> = Vec::new(); // Store raw chunks for debugging
let mut accumulated_usage: Option<g3_providers::Usage> = None;
let mut stream_stop_reason: Option<String> = None; // Track why the stream stopped
// Create fresh iteration state for this streaming iteration
let mut iter = streaming::IterationState::new();
while let Some(chunk_result) = stream.next().await {
match chunk_result {
@@ -2011,8 +1997,8 @@ Skip if nothing new. Be brief."#;
// Capture usage data if available
if let Some(ref usage) = chunk.usage {
accumulated_usage = Some(usage.clone());
turn_accumulated_usage = Some(usage.clone());
iter.accumulated_usage = Some(usage.clone());
state.turn_accumulated_usage = Some(usage.clone());
debug!(
"Received usage data - prompt: {}, completion: {}, total: {}",
usage.prompt_tokens, usage.completion_tokens, usage.total_tokens
@@ -2031,29 +2017,29 @@ Skip if nothing new. Be brief."#;
}
// Store raw chunk for debugging (limit to first 20 and last 5)
if chunks_received < 20 || chunk.finished {
raw_chunks.push(format!(
if iter.chunks_received < 20 || chunk.finished {
iter.raw_chunks.push(format!(
"Chunk #{}: content={:?}, finished={}, tool_calls={:?}",
chunks_received + 1,
iter.chunks_received + 1,
chunk.content,
chunk.finished,
chunk.tool_calls
));
} else if raw_chunks.len() == 20 {
raw_chunks.push("... (chunks 21+ omitted for brevity) ...".to_string());
} else if iter.raw_chunks.len() == 20 {
iter.raw_chunks.push("... (chunks 21+ omitted for brevity) ...".to_string());
}
// Record time to first token
if first_token_time.is_none() && !chunk.content.is_empty() {
first_token_time = Some(stream_start.elapsed());
if state.first_token_time.is_none() && !chunk.content.is_empty() {
state.first_token_time = Some(state.stream_start.elapsed());
// Record in agent metrics
if let Some(ttft) = first_token_time {
if let Some(ttft) = state.first_token_time {
self.first_token_times.push(ttft);
}
}
chunks_received += 1;
if chunks_received == 1 {
iter.chunks_received += 1;
if iter.chunks_received == 1 {
debug!(
"First chunk received: content_len={}, finished={}",
chunk.content.len(),
@@ -2062,7 +2048,7 @@ Skip if nothing new. Be brief."#;
}
// Process chunk with the new parser
let completed_tools = parser.process_chunk(&chunk);
let completed_tools = iter.parser.process_chunk(&chunk);
// Handle completed tool calls - process all if multiple calls enabled
// Always process all tool calls - they will be executed after stream ends
@@ -2101,8 +2087,8 @@ Skip if nothing new. Be brief."#;
}
// Calculate new content to display (skip already-shown text)
let already_displayed_chars = current_response.chars().count();
let text_content = parser.get_text_content();
let already_displayed_chars = iter.current_response.chars().count();
let text_content = iter.parser.get_text_content();
let clean_content = streaming::clean_llm_tokens(&text_content);
let raw_content_for_log = clean_content.clone();
let filtered_content =
@@ -2111,7 +2097,7 @@ Skip if nothing new. Be brief."#;
// Extract only the new (undisplayed) portion
let new_content =
if current_response.len() <= final_display_content.len() {
if iter.current_response.len() <= final_display_content.len() {
final_display_content
.chars()
.skip(already_displayed_chars)
@@ -2123,13 +2109,13 @@ Skip if nothing new. Be brief."#;
// Display new text before tool execution
if !new_content.trim().is_empty() {
#[allow(unused_assignments)]
if !response_started {
if !state.response_started {
self.ui_writer.print_agent_prompt();
response_started = true;
state.response_started = true;
}
self.ui_writer.print_agent_response(&new_content);
self.ui_writer.flush();
current_response.push_str(&new_content);
iter.current_response.push_str(&new_content);
}
self.ui_writer.finish_streaming_markdown();
@@ -2392,8 +2378,8 @@ Skip if nothing new. Be brief."#;
// 1. At the end when no tools were executed
// 2. At the end when no tools were executed (handled in the "no tool executed" branch)
tool_executed = true;
any_tool_executed = true; // Track across all iterations
iter.tool_executed = true;
state.any_tool_executed = true; // Track across all iterations
// Reset the JSON tool call filter state after each tool execution
// This ensures the filter doesn't stay in suppression mode for subsequent streaming content
@@ -2401,22 +2387,22 @@ Skip if nothing new. Be brief."#;
// Only reset parser if there are no more unexecuted tool calls in the buffer
// This handles the case where the LLM emits multiple tool calls in one response
if parser.has_unexecuted_tool_call() {
if iter.parser.has_unexecuted_tool_call() {
debug!(
"Parser still has unexecuted tool calls, not resetting buffer"
);
// Mark current tool as consumed so we don't re-detect it
parser.mark_tool_calls_consumed();
iter.parser.mark_tool_calls_consumed();
} else {
// Reset parser for next iteration - this clears the text buffer
parser.reset();
iter.parser.reset();
}
// Clear current_response for next iteration to prevent buffered text
// from being incorrectly displayed after tool execution
current_response.clear();
iter.current_response.clear();
// Reset for next iteration (value read in next loop pass)
response_started = false;
state.response_started = false;
// Continue processing - don't break mid-stream
} // End of for loop processing each tool call
@@ -2425,7 +2411,7 @@ Skip if nothing new. Be brief."#;
// All tool calls are collected and executed after the stream ends.
// If no tool calls were completed, continue streaming normally
if !tool_executed {
if !iter.tool_executed {
let clean_content = streaming::clean_llm_tokens(&chunk.content);
if !clean_content.is_empty() {
@@ -2433,42 +2419,42 @@ Skip if nothing new. Be brief."#;
self.ui_writer.filter_json_tool_calls(&clean_content);
if !filtered_content.is_empty() {
if !response_started {
if !state.response_started {
self.ui_writer.print_agent_prompt();
response_started = true;
state.response_started = true;
}
self.ui_writer.print_agent_response(&filtered_content);
self.ui_writer.flush();
current_response.push_str(&filtered_content);
iter.current_response.push_str(&filtered_content);
}
}
}
if chunk.finished {
debug!("Stream finished: tool_executed={}, current_response_len={}, full_response_len={}, chunks_received={}",
tool_executed, current_response.len(), full_response.len(), chunks_received);
iter.tool_executed, iter.current_response.len(), state.full_response.len(), iter.chunks_received);
// Capture the stop reason from the final chunk
if let Some(ref reason) = chunk.stop_reason {
debug!("Stream stop_reason: {}", reason);
stream_stop_reason = Some(reason.clone());
iter.stream_stop_reason = Some(reason.clone());
}
// Stream finished - check if we should continue or return
if !tool_executed {
if !iter.tool_executed {
// No tools were executed in this iteration
// Check if we got any meaningful response at all
// We need to check the parser's text buffer as well, since the LLM
// might have responded with text but no tool calls
let text_content = parser.get_text_content();
let text_content = iter.parser.get_text_content();
let has_text_response = !text_content.trim().is_empty()
|| !current_response.trim().is_empty();
|| !iter.current_response.trim().is_empty();
// Don't re-add text from parser buffer if we already displayed it
// The parser buffer contains ALL accumulated text, but current_response
// already has what was displayed during streaming
if current_response.is_empty() && !text_content.trim().is_empty() {
if iter.current_response.is_empty() && !text_content.trim().is_empty() {
// Only use parser text if we truly have no response
// This should be rare - only if streaming failed to display anything
debug!("Warning: Using parser buffer text as fallback - this may duplicate output");
@@ -2480,7 +2466,7 @@ Skip if nothing new. Be brief."#;
self.ui_writer.filter_json_tool_calls(&clean_text);
// Only use this if we truly have nothing else
if !filtered_text.trim().is_empty() && full_response.is_empty()
if !filtered_text.trim().is_empty() && state.full_response.is_empty()
{
debug!(
"Using filtered parser text as last resort: {} chars",
@@ -2491,22 +2477,22 @@ Skip if nothing new. Be brief."#;
}
}
if !has_text_response && full_response.is_empty() {
if !has_text_response && state.full_response.is_empty() {
streaming::log_stream_error(
iteration_count,
state.iteration_count,
&provider_name,
&provider_model,
chunks_received,
&parser,
iter.chunks_received,
&iter.parser,
&request,
&self.context_window,
self.session_id.as_deref(),
&raw_chunks,
&iter.raw_chunks,
);
// No response received - this is an error condition
warn!("Stream finished without any content or tool calls");
warn!("Chunks received: {}", chunks_received);
warn!("Chunks received: {}", iter.chunks_received);
return Err(anyhow::anyhow!(
"No response received from the model. The model may be experiencing issues or the request may have been malformed."
));
@@ -2514,18 +2500,18 @@ Skip if nothing new. Be brief."#;
// If tools were executed in previous iterations,
// break to let the outer loop handle finalization
if any_tool_executed {
if state.any_tool_executed {
debug!("Tools were executed in previous iterations, breaking to finalize");
// IMPORTANT: Save any text response to context window before breaking
// This ensures text displayed after tool execution is not lost
if !current_response.trim().is_empty() && !assistant_message_added {
debug!("Saving current_response ({} chars) to context before finalization", current_response.len());
if !iter.current_response.trim().is_empty() && !state.assistant_message_added {
debug!("Saving current_response ({} chars) to context before finalization", iter.current_response.len());
let assistant_msg = Message::new(
MessageRole::Assistant,
current_response.clone(),
iter.current_response.clone(),
);
self.context_window.add_message(assistant_msg);
assistant_message_added = true;
state.assistant_message_added = true;
}
// NOTE: We intentionally do NOT set full_response here.
@@ -2538,14 +2524,14 @@ Skip if nothing new. Be brief."#;
// Save assistant message before returning (no tools were executed)
// This ensures text-only responses are saved to context
if !current_response.trim().is_empty() && !assistant_message_added {
debug!("Saving current_response ({} chars) to context before early return", current_response.len());
if !iter.current_response.trim().is_empty() && !state.assistant_message_added {
debug!("Saving current_response ({} chars) to context before early return", iter.current_response.len());
let assistant_msg = Message::new(
MessageRole::Assistant,
current_response.clone(),
iter.current_response.clone(),
);
self.context_window.add_message(assistant_msg);
// assistant_message_added = true; // Not needed, we're returning
// state.assistant_message_added = true; // Not needed, we're returning
}
// Set full_response to empty to avoid duplication in return value
@@ -2553,9 +2539,9 @@ Skip if nothing new. Be brief."#;
return Ok(self.finalize_streaming_turn(
String::new(),
show_timing,
stream_start,
first_token_time,
&turn_accumulated_usage,
state.stream_start,
state.first_token_time,
&state.turn_accumulated_usage,
));
}
break; // Tool was executed, break to continue outer loop
@@ -2566,13 +2552,13 @@ Skip if nothing new. Be brief."#;
let error_msg = e.to_string();
let error_details = format!(
"Streaming error at chunk {}: {}",
chunks_received + 1,
iter.chunks_received + 1,
error_msg
);
error!("Error type: {}", std::any::type_name_of_val(&e));
error!("Parser state at error: text_buffer_len={}, has_incomplete={}, message_stopped={}",
parser.text_buffer_len(), parser.has_incomplete_tool_call(), parser.is_message_stopped());
iter.parser.text_buffer_len(), iter.parser.has_incomplete_tool_call(), iter.parser.is_message_stopped());
// Check if this is a recoverable connection error
let is_connection_error = streaming::is_connection_error(&error_msg);
@@ -2580,26 +2566,26 @@ Skip if nothing new. Be brief."#;
if is_connection_error {
warn!(
"Connection error at chunk {}, treating as end of stream",
chunks_received + 1
iter.chunks_received + 1
);
// If we have any content or tool calls, treat this as a graceful end
if chunks_received > 0
&& (!parser.get_text_content().is_empty()
|| parser.has_unexecuted_tool_call())
if iter.chunks_received > 0
&& (!iter.parser.get_text_content().is_empty()
|| iter.parser.has_unexecuted_tool_call())
{
warn!("Stream terminated unexpectedly but we have content, continuing");
break; // Break to process what we have
}
}
if tool_executed {
if iter.tool_executed {
error!("{}", error_details);
warn!("Stream error after tool execution, attempting to continue");
break; // Break to outer loop to start new stream
} else {
// Log raw chunks before failing
error!("Fatal streaming error. Raw chunks received before error:");
for chunk_str in raw_chunks.iter().take(10) {
for chunk_str in iter.raw_chunks.iter().take(10) {
error!(" {}", chunk_str);
}
return Err(e);
@@ -2609,41 +2595,41 @@ Skip if nothing new. Be brief."#;
}
// Update context window with actual usage if available
if let Some(usage) = accumulated_usage {
if let Some(usage) = iter.accumulated_usage {
debug!("Updating context window with actual usage from stream");
self.context_window.update_usage_from_response(&usage);
} else {
// Fall back to estimation if no usage data was provided
debug!("No usage data from stream, using estimation");
let estimated_tokens = ContextWindow::estimate_tokens(&current_response);
let estimated_tokens = ContextWindow::estimate_tokens(&iter.current_response);
self.context_window.add_streaming_tokens(estimated_tokens);
}
// If we get here and no tool was executed, we're done
if !tool_executed {
if !iter.tool_executed {
// IMPORTANT: Do NOT add parser text_content here!
// The text has already been displayed during streaming via current_response.
// The parser buffer accumulates ALL text and would cause duplication.
debug!("Stream completed without tool execution. Response already displayed during streaming.");
debug!(
"Current response length: {}, Full response length: {}",
current_response.len(),
full_response.len()
iter.current_response.len(),
state.full_response.len()
);
let has_response = !current_response.is_empty() || !full_response.is_empty();
let has_response = !iter.current_response.is_empty() || !state.full_response.is_empty();
// Check if the response is essentially empty (just whitespace or timing lines)
// Check if there's an incomplete tool call in the buffer (for debugging)
let has_incomplete_tool_call = parser.has_incomplete_tool_call();
let has_incomplete_tool_call = iter.parser.has_incomplete_tool_call();
// Check if there's a complete but unexecuted tool call in the buffer (for debugging)
let has_unexecuted_tool_call = parser.has_unexecuted_tool_call();
let has_unexecuted_tool_call = iter.parser.has_unexecuted_tool_call();
// Log when we detect unexecuted or incomplete tool calls for debugging
if has_incomplete_tool_call {
debug!("Detected incomplete tool call in buffer (buffer_len={}, consumed_up_to={})",
parser.text_buffer_len(), parser.text_buffer_len());
iter.parser.text_buffer_len(), iter.parser.text_buffer_len());
}
if has_unexecuted_tool_call {
debug!("Detected unexecuted tool call in buffer - this may indicate a parsing issue");
@@ -2652,7 +2638,7 @@ Skip if nothing new. Be brief."#;
// Check if the response was truncated due to max_tokens
let was_truncated_by_max_tokens =
stream_stop_reason.as_deref() == Some("max_tokens");
iter.stream_stop_reason.as_deref() == Some("max_tokens");
if was_truncated_by_max_tokens {
debug!("Response was truncated due to max_tokens limit");
warn!("LLM response was cut off due to max_tokens limit");
@@ -2662,7 +2648,7 @@ Skip if nothing new. Be brief."#;
if has_response {
debug!(
"Response already streamed, not setting full_response. current_response: {} chars",
current_response.len()
iter.current_response.len()
);
}
@@ -2670,9 +2656,9 @@ Skip if nothing new. Be brief."#;
// This ensures the log contains the true raw content including any JSON.
// Note: We check current_response, not full_response, because full_response
// may be empty to avoid display duplication (content was already streamed).
if !current_response.trim().is_empty() && !assistant_message_added {
if !iter.current_response.trim().is_empty() && !state.assistant_message_added {
// Get the raw text from the parser (before filtering)
let raw_text = parser.get_text_content();
let raw_text = iter.parser.get_text_content();
let raw_clean = streaming::clean_llm_tokens(&raw_text);
// Use raw_clean if available, otherwise fall back to current_response.
@@ -2681,18 +2667,18 @@ Skip if nothing new. Be brief."#;
let content_to_save = if !raw_clean.trim().is_empty() {
raw_clean
} else {
current_response.clone()
iter.current_response.clone()
};
let assistant_message = Message::new(MessageRole::Assistant, content_to_save);
self.context_window.add_message(assistant_message);
}
return Ok(self.finalize_streaming_turn(
full_response,
state.full_response.clone(),
show_timing,
stream_start,
first_token_time,
&turn_accumulated_usage,
state.stream_start,
state.first_token_time,
&state.turn_accumulated_usage,
));
}
@@ -2701,11 +2687,11 @@ Skip if nothing new. Be brief."#;
// --- Phase 4: Post-Loop Finalization ---
Ok(self.finalize_streaming_turn(
full_response,
state.full_response.clone(),
show_timing,
stream_start,
first_token_time,
&turn_accumulated_usage,
state.stream_start,
state.first_token_time,
&state.turn_accumulated_usage,
))
}
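
The two new struct fields carry the behavior the loop above relies on. Condensed
from the diff (not the literal source), their use is roughly:

```rust
// Remember why the stream stopped so truncation can be detected after the
// stream ends.
if let Some(ref reason) = chunk.stop_reason {
    iter.stream_stop_reason = Some(reason.clone());
}
// ... later, once the stream has finished:
let was_truncated_by_max_tokens =
    iter.stream_stop_reason.as_deref() == Some("max_tokens");

// Guard so the assistant message is added to the context window at most once
// per turn, whichever exit path runs.
if !iter.current_response.trim().is_empty() && !state.assistant_message_added {
    let assistant_msg =
        Message::new(MessageRole::Assistant, iter.current_response.clone());
    self.context_window.add_message(assistant_msg);
    state.assistant_message_added = true;
}
```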

View File

@@ -23,6 +23,7 @@ pub struct StreamingState {
pub response_started: bool,
pub any_tool_executed: bool,
pub auto_summary_attempts: usize,
pub assistant_message_added: bool,
pub turn_accumulated_usage: Option<g3_providers::Usage>,
}
@@ -36,6 +37,7 @@ impl StreamingState {
response_started: false,
any_tool_executed: false,
auto_summary_attempts: 0,
assistant_message_added: false,
turn_accumulated_usage: None,
}
}
@@ -65,6 +67,7 @@ pub struct IterationState {
pub chunks_received: usize,
pub raw_chunks: Vec<String>,
pub accumulated_usage: Option<g3_providers::Usage>,
pub stream_stop_reason: Option<String>,
}
impl IterationState {
@@ -76,6 +79,7 @@ impl IterationState {
chunks_received: 0,
raw_chunks: Vec::new(),
accumulated_usage: None,
stream_stop_reason: None,
}
}
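
Putting the two hunks together, the structs after this commit look roughly as
follows. Field names are taken from the diff; exact types, field order, and
anything outside the shown hunks are inferred from usage and may differ from
streaming.rs:

```rust
use std::time::{Duration, Instant};

// Turn-level state: created once per stream_completion_with_tools() call.
pub struct StreamingState {
    pub full_response: String,
    pub stream_start: Instant,
    pub first_token_time: Option<Duration>,
    pub iteration_count: usize,
    pub response_started: bool,
    pub any_tool_executed: bool,
    pub auto_summary_attempts: usize,
    pub assistant_message_added: bool,                        // added by this commit
    pub turn_accumulated_usage: Option<g3_providers::Usage>,
}

// Iteration-level state: rebuilt with IterationState::new() on every pass
// through the streaming loop.
pub struct IterationState {
    pub parser: StreamingToolParser,
    pub current_response: String,
    pub tool_executed: bool,
    pub chunks_received: usize,
    pub raw_chunks: Vec<String>,
    pub accumulated_usage: Option<g3_providers::Usage>,
    pub stream_stop_reason: Option<String>,                   // added by this commit
}
```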