refactor(streaming): Extract deduplication and auto-continue logic into helpers

Improve readability of stream_completion_with_tools (~1000 line function):

- Add deduplicate_tool_calls() helper with closure for previous-message check
- Add should_auto_continue() with AutoContinueReason enum for clearer control flow
- Replace inline deduplication loop with helper call (-19 lines)
- Replace complex auto-continue conditional with match on reason enum (-13 lines)
- Add section comments for major phases (State Init, Pre-loop, Main Loop, Auto-Continue, Post-Loop)
- Add comprehensive tests for new helpers

Net reduction: 82 deletions, behavior unchanged (172+ tests pass)

Agent: carmack
This commit is contained in:
Dhanji R. Prasanna
2026-01-13 11:44:06 +05:30
parent dc45987e8d
commit a09967eb27
2 changed files with 216 additions and 82 deletions

View File

@@ -1769,11 +1769,22 @@ impl<W: UiWriter> Agent<W> {
mut request: CompletionRequest,
show_timing: bool,
) -> Result<TaskResult> {
// =========================================================================
// STREAMING COMPLETION WITH TOOL EXECUTION
// =========================================================================
// This function orchestrates the streaming LLM response loop:
// 1. Pre-loop: Check context capacity, compact/thin if needed
// 2. Main loop: Stream chunks, detect tool calls, execute tools
// 3. Auto-continue: Re-prompt LLM if tools executed or response truncated
// 4. Post-loop: Finalize response, save context, return result
// =========================================================================
use crate::error_handling::ErrorContext;
use tokio_stream::StreamExt;
debug!("Starting stream_completion_with_tools");
// --- State Initialization ---
let mut full_response = String::new();
let mut first_token_time: Option<Duration> = None;
let stream_start = Instant::now();
@@ -1787,7 +1798,7 @@ impl<W: UiWriter> Agent<W> {
// Note: Session-level duplicate tracking was removed - we only prevent sequential duplicates (DUP IN CHUNK, DUP IN MSG)
let mut turn_accumulated_usage: Option<g3_providers::Usage> = None; // Track token usage for timing footer
// Check if we need to compact before starting
// --- Phase 1: Pre-loop Context Capacity Check ---
if self.context_window.should_compact() {
// First try thinning if we are at capacity, don't call the LLM for compaction (might fail)
if self.context_window.percentage_used() > 90.0 && self.context_window.should_thin() {
@@ -1865,6 +1876,7 @@ impl<W: UiWriter> Agent<W> {
}
}
// --- Phase 2: Main Streaming Loop ---
loop {
iteration_count += 1;
debug!("Starting iteration {}", iteration_count);
@@ -2008,31 +2020,12 @@ impl<W: UiWriter> Agent<W> {
// Handle completed tool calls - process all if multiple calls enabled
// Always process all tool calls - they will be executed after stream ends
let tools_to_process: Vec<ToolCall> = completed_tools;
// De-duplicate tool calls and track duplicates
let mut last_tool_in_chunk: Option<ToolCall> = None;
let mut deduplicated_tools: Vec<(ToolCall, Option<String>)> = Vec::new();
for tool_call in tools_to_process {
let mut duplicate_type = None;
// Check for IMMEDIATELY SEQUENTIAL duplicate in current chunk
// Only the immediately previous tool call counts as a duplicate
if let Some(ref last_tool) = last_tool_in_chunk {
if streaming::are_tool_calls_duplicate(last_tool, &tool_call) {
duplicate_type = Some("DUP IN CHUNK".to_string());
}
} else {
// Check for duplicate against previous message
duplicate_type = self.check_duplicate_in_previous_message(&tool_call);
}
// Track the last tool call for sequential duplicate detection
last_tool_in_chunk = Some(tool_call.clone());
deduplicated_tools.push((tool_call, duplicate_type));
}
// De-duplicate tool calls (sequential duplicates in chunk + duplicates from previous message)
let deduplicated_tools = streaming::deduplicate_tool_calls(
completed_tools,
|tc| self.check_duplicate_in_previous_message(tc),
);
// Process each tool call
for (tool_call, duplicate_type) in deduplicated_tools {
@@ -2664,55 +2657,42 @@ impl<W: UiWriter> Agent<W> {
warn!("LLM response was cut off due to max_tokens limit - will auto-continue");
}
// Auto-continue if tools were executed and we are in autonomous mode
// OR if the LLM emitted an incomplete tool call (truncated JSON)
// OR if the LLM emitted a complete tool call that wasn't executed
// OR if the response was truncated due to max_tokens
// This ensures we don't return control when the LLM clearly intended to call a tool
// Note: We removed the redundant condition (any_tool_executed && is_empty_response)
// because it's already covered by (any_tool_executed )
// Auto-continue is only enabled in autonomous mode - in interactive mode,
// the user may be asking questions and we should return control to them
let should_auto_continue = self.is_autonomous && ((any_tool_executed )
|| has_incomplete_tool_call
|| has_unexecuted_tool_call
|| was_truncated_by_max_tokens);
if should_auto_continue {
// --- Phase 3: Auto-Continue Decision ---
let auto_continue_reason = streaming::should_auto_continue(
self.is_autonomous,
any_tool_executed,
has_incomplete_tool_call,
has_unexecuted_tool_call,
was_truncated_by_max_tokens,
);
if let Some(reason) = auto_continue_reason {
if auto_summary_attempts < MAX_AUTO_SUMMARY_ATTEMPTS {
auto_summary_attempts += 1;
if has_incomplete_tool_call {
warn!(
"LLM emitted incomplete tool call ({} iterations, auto-continue attempt {}/{})",
iteration_count, auto_summary_attempts, MAX_AUTO_SUMMARY_ATTEMPTS
);
self.ui_writer.print_context_status(
"\n🔄 Model emitted incomplete tool call. Auto-continuing...\n"
);
} else if has_unexecuted_tool_call {
warn!(
"LLM emitted unexecuted tool call ({} iterations, auto-continue attempt {}/{})",
iteration_count, auto_summary_attempts, MAX_AUTO_SUMMARY_ATTEMPTS
);
self.ui_writer.print_context_status(
"\n🔄 Model emitted tool call that wasn't executed. Auto-continuing...\n"
);
} else if is_empty_response {
warn!(
"LLM emitted empty/trivial response ({} iterations, auto-continue attempt {}/{})",
iteration_count, auto_summary_attempts, MAX_AUTO_SUMMARY_ATTEMPTS
);
self.ui_writer.print_context_status(
"\n🔄 Model emitted empty response. Auto-continuing...\n"
);
} else {
warn!(
"LLM stopped after executing tools ({} iterations, auto-continue attempt {}/{})",
iteration_count, auto_summary_attempts, MAX_AUTO_SUMMARY_ATTEMPTS
);
self.ui_writer.print_context_status(
"\n🔄 Model stopped without providing summary. Auto-continuing...\n"
);
}
// Log and display appropriate message based on reason
use streaming::AutoContinueReason::*;
let (log_msg, ui_msg) = match reason {
IncompleteToolCall => (
"LLM emitted incomplete tool call",
"\n🔄 Model emitted incomplete tool call. Auto-continuing...\n",
),
UnexecutedToolCall => (
"LLM emitted unexecuted tool call",
"\n🔄 Model emitted tool call that wasn't executed. Auto-continuing...\n",
),
MaxTokensTruncation => (
"LLM response truncated by max_tokens",
"\n🔄 Model response was truncated. Auto-continuing...\n",
),
ToolsExecuted => (
"LLM stopped after executing tools",
"\n🔄 Model stopped without providing summary. Auto-continuing...\n",
),
};
warn!("{} ({} iterations, auto-continue attempt {}/{})",
log_msg, iteration_count, auto_summary_attempts, MAX_AUTO_SUMMARY_ATTEMPTS);
self.ui_writer.print_context_status(ui_msg);
// Add any text response to context before prompting for continuation
if has_response {
@@ -2731,16 +2711,15 @@ impl<W: UiWriter> Agent<W> {
}
// Add a follow-up message asking for continuation
let continue_prompt = if has_incomplete_tool_call {
Message::new(
let continue_prompt = match reason {
IncompleteToolCall => Message::new(
MessageRole::User,
"Your previous response was cut off mid-tool-call. Please complete the tool call and continue.".to_string(),
)
} else {
Message::new(
),
_ => Message::new(
MessageRole::User,
"Please continue until you are done. Provide a summary when complete.".to_string(),
)
),
};
self.context_window.add_message(continue_prompt);
request.messages = self.context_window.conversation_history.clone();
@@ -2826,7 +2805,7 @@ impl<W: UiWriter> Agent<W> {
// Continue the loop to start a new stream with updated context
}
// If we exit the loop due to max iterations
// --- Phase 4: Post-Loop Finalization ---
let _ttft = first_token_time.unwrap_or_else(|| stream_start.elapsed());
// Add timing if needed

View File

@@ -1,7 +1,8 @@
//! Streaming completion logic for the Agent.
//!
//! This module handles the streaming response from LLM providers,
//! including tool call detection, execution, and auto-continue logic.
//! This module provides state management and helper functions for streaming
//! LLM responses, including tool call detection, deduplication, and
//! auto-continue decision logic.
use crate::context_window::ContextWindow;
use crate::streaming_parser::StreamingToolParser;
@@ -385,6 +386,92 @@ pub fn format_rehydrate_summary(result: &str) -> String {
}
}
// =============================================================================
// Tool Call Deduplication
// =============================================================================
/// Result of deduplicating a batch of tool calls.
/// Each tool call is paired with an optional duplicate marker.
pub type DeduplicatedTools = Vec<(ToolCall, Option<String>)>;
/// Deduplicate tool calls, detecting sequential duplicates within a chunk
/// and duplicates against the previous message.
///
/// Returns each tool call paired with `Some("DUP IN CHUNK")` or `Some("DUP IN MSG")`
/// if it's a duplicate, or `None` if it should be executed.
pub fn deduplicate_tool_calls<F>(
tool_calls: Vec<ToolCall>,
check_previous_message: F,
) -> DeduplicatedTools
where
F: Fn(&ToolCall) -> Option<String>,
{
let mut last_tool_in_chunk: Option<ToolCall> = None;
let mut result = Vec::with_capacity(tool_calls.len());
for tool_call in tool_calls {
let duplicate_type = if let Some(ref last) = last_tool_in_chunk {
// Check for sequential duplicate within this chunk
if are_tool_calls_duplicate(last, &tool_call) {
Some("DUP IN CHUNK".to_string())
} else {
None
}
} else {
// First tool in chunk - check against previous message
check_previous_message(&tool_call)
};
last_tool_in_chunk = Some(tool_call.clone());
result.push((tool_call, duplicate_type));
}
result
}
// =============================================================================
// Auto-Continue Decision Logic
// =============================================================================
/// Reasons why the streaming loop should auto-continue.
#[derive(Debug, Clone, PartialEq)]
pub enum AutoContinueReason {
/// Tools were executed and we're in autonomous mode
ToolsExecuted,
/// LLM emitted an incomplete (truncated) tool call
IncompleteToolCall,
/// LLM emitted a tool call that wasn't executed
UnexecutedToolCall,
/// Response was truncated due to max_tokens limit
MaxTokensTruncation,
}
/// Determine if the streaming loop should auto-continue.
/// Returns `Some(reason)` if it should continue, `None` otherwise.
pub fn should_auto_continue(
is_autonomous: bool,
any_tool_executed: bool,
has_incomplete_tool_call: bool,
has_unexecuted_tool_call: bool,
was_truncated: bool,
) -> Option<AutoContinueReason> {
if !is_autonomous {
return None;
}
if any_tool_executed {
Some(AutoContinueReason::ToolsExecuted)
} else if has_incomplete_tool_call {
Some(AutoContinueReason::IncompleteToolCall)
} else if has_unexecuted_tool_call {
Some(AutoContinueReason::UnexecutedToolCall)
} else if was_truncated {
Some(AutoContinueReason::MaxTokensTruncation)
} else {
None
}
}
/// Determine if a response is essentially empty (whitespace or timing only)
pub fn is_empty_response(response: &str) -> bool {
response.trim().is_empty()
@@ -474,4 +561,72 @@ mod tests {
assert_eq!(lines.len(), 3);
assert_eq!(lines[0], "line1");
}
#[test]
fn test_deduplicate_tool_calls_no_duplicates() {
let tools = vec![
ToolCall { tool: "shell".to_string(), args: serde_json::json!({"command": "ls"}) },
ToolCall { tool: "read_file".to_string(), args: serde_json::json!({"path": "foo.rs"}) },
];
let result = deduplicate_tool_calls(tools, |_| None);
assert_eq!(result.len(), 2);
assert!(result[0].1.is_none());
assert!(result[1].1.is_none());
}
#[test]
fn test_deduplicate_tool_calls_sequential_duplicate() {
let tools = vec![
ToolCall { tool: "shell".to_string(), args: serde_json::json!({"command": "ls"}) },
ToolCall { tool: "shell".to_string(), args: serde_json::json!({"command": "ls"}) },
];
let result = deduplicate_tool_calls(tools, |_| None);
assert_eq!(result.len(), 2);
assert!(result[0].1.is_none(), "First should not be duplicate");
assert_eq!(result[1].1, Some("DUP IN CHUNK".to_string()));
}
#[test]
fn test_deduplicate_tool_calls_previous_message_duplicate() {
let tools = vec![
ToolCall { tool: "shell".to_string(), args: serde_json::json!({"command": "ls"}) },
];
// Simulate finding a duplicate in previous message
let result = deduplicate_tool_calls(tools, |_| Some("DUP IN MSG".to_string()));
assert_eq!(result.len(), 1);
assert_eq!(result[0].1, Some("DUP IN MSG".to_string()));
}
#[test]
fn test_should_auto_continue_not_autonomous() {
// Never auto-continue in interactive mode
assert_eq!(should_auto_continue(false, true, false, false, false), None);
assert_eq!(should_auto_continue(false, false, true, false, false), None);
}
#[test]
fn test_should_auto_continue_autonomous() {
use AutoContinueReason::*;
// Tools executed
assert_eq!(should_auto_continue(true, true, false, false, false), Some(ToolsExecuted));
// Incomplete tool call
assert_eq!(should_auto_continue(true, false, true, false, false), Some(IncompleteToolCall));
// Unexecuted tool call
assert_eq!(should_auto_continue(true, false, false, true, false), Some(UnexecutedToolCall));
// Max tokens truncation
assert_eq!(should_auto_continue(true, false, false, false, true), Some(MaxTokensTruncation));
// Nothing special - no auto-continue
assert_eq!(should_auto_continue(true, false, false, false, false), None);
}
}