refactor(g3-core): improve stream_completion_with_tools readability

Extract and simplify the streaming completion function:

- Extract ensure_context_capacity() helper for pre-loop context management
  (thinning + compaction logic now in dedicated async method)
- Simplify compact_summary generation block: flatten nested if/match,
  remove redundant comments, reorder branches for clarity
- Remove dead code: unused _last_error variable and modified_tool_call
- Streamline duplicate detection block: reduce verbose logging
- Clean up text content display block: remove redundant comments,
  tighten variable declarations
- Remove redundant is_todo_tool redefinition inside block expression

Net reduction: 79 lines (-187/+108)
Behavior unchanged, all unit tests pass.

Agent: carmack
This commit is contained in:
Dhanji R. Prasanna
2026-01-14 15:11:53 +05:30
parent 996dc357b4
commit 5104bd53b6

View File

@@ -1011,6 +1011,73 @@ impl<W: UiWriter> Agent<W> {
message
}
/// Ensure context window has capacity before streaming.
/// Tries thinning first (cheap, no LLM call), then compaction (requires an LLM call).
///
/// Returns `Ok(true)` if `request.messages` was replaced with the compacted
/// history, `Ok(false)` if no change was needed, or an error if compaction
/// was required but failed (caller should abort and start a new session).
async fn ensure_context_capacity(&mut self, request: &mut CompletionRequest) -> Result<bool> {
    if !self.context_window.should_compact() {
        return Ok(false);
    }

    // Thinning is cheap, so attempt it first when at high capacity.
    if self.context_window.percentage_used() > 90.0 && self.context_window.should_thin() {
        self.ui_writer.print_context_status(&format!(
            "\n🥒 Context window at {}%. Trying thinning first...",
            self.context_window.percentage_used() as u32
        ));
        let thin_summary = self.do_thin_context();
        self.ui_writer.print_context_thinning(&thin_summary);
        if !self.context_window.should_compact() {
            self.ui_writer.print_context_status("✅ Thinning resolved capacity issue. Continuing...\n");
            return Ok(false);
        }
        self.ui_writer.print_context_status("⚠️ Thinning insufficient. Proceeding with compaction...\n");
    }

    // Compaction is still required here: the early returns above guarantee
    // should_compact() is true at this point (either the thinning branch was
    // skipped, or it ran and confirmed capacity is still exceeded), so the
    // former redundant re-check has been removed.
    use crate::compaction::{CompactionConfig, perform_compaction};
    self.ui_writer.print_context_status(&format!(
        "\n🗜️ Context window reaching capacity ({}%). Compacting...",
        self.context_window.percentage_used() as u32
    ));
    let provider_name = self.providers.get(None)?.name().to_string();
    // Anchor the summary on the most recent user message from the outgoing
    // request (not context_window, which may lag behind it).
    let latest_user_msg = request
        .messages
        .iter()
        .rev()
        .find(|m| matches!(m.role, MessageRole::User))
        .map(|m| m.content.clone());
    let compaction_config = CompactionConfig {
        provider_name: &provider_name,
        latest_user_msg,
    };
    let result = perform_compaction(
        &self.providers,
        &mut self.context_window,
        &self.config,
        compaction_config,
        &self.ui_writer,
        &mut self.thinning_events,
    )
    .await?;
    if result.success {
        self.ui_writer.print_context_status("✅ Context compacted successfully. Continuing...\n");
        self.compaction_events.push(result.chars_saved);
        request.messages = self.context_window.conversation_history.clone();
        return Ok(true);
    }
    // Compaction failed while at capacity: continuing would likely hit the
    // token limit, so surface a hard error to the caller.
    self.ui_writer.print_context_status("⚠️ Unable to compact context. Consider starting a new session if you continue to see errors.\n");
    Err(anyhow::anyhow!("Context window at capacity and compaction failed. Please start a new session."))
}
/// Check if a tool call is a duplicate of the last tool call in the previous assistant message.
/// Returns Some("DUP IN MSG") if it's a duplicate, None otherwise.
fn check_duplicate_in_previous_message(&self, tool_call: &ToolCall) -> Option<String> {
@@ -1703,82 +1770,7 @@ Skip if nothing new. Be brief."#;
let mut turn_accumulated_usage: Option<g3_providers::Usage> = None; // Track token usage for timing footer
// --- Phase 1: Pre-loop Context Capacity Check ---
if self.context_window.should_compact() {
// First try thinning if we are at capacity, don't call the LLM for compaction (might fail)
if self.context_window.percentage_used() > 90.0 && self.context_window.should_thin() {
self.ui_writer.print_context_status(&format!(
"\n🥒 Context window at {}%. Trying thinning first...",
self.context_window.percentage_used() as u32
));
let thin_summary = self.do_thin_context();
self.ui_writer.print_context_thinning(&thin_summary);
// Check if thinning was sufficient
if !self.context_window.should_compact() {
self.ui_writer.print_context_status(
"✅ Thinning resolved capacity issue. Continuing...\n",
);
// Continue with the original request without compaction
} else {
self.ui_writer.print_context_status(
"⚠️ Thinning insufficient. Proceeding with compaction...\n",
);
}
}
// Only proceed with compaction if still needed after thinning
if self.context_window.should_compact() {
use crate::compaction::{CompactionConfig, perform_compaction};
// Notify user about compaction
self.ui_writer.print_context_status(&format!(
"\n🗜️ Context window reaching capacity ({}%). Compacting...",
self.context_window.percentage_used() as u32
));
let provider = self.providers.get(None)?;
let provider_name = provider.name().to_string();
let _ = provider; // Release borrow early
// Extract the latest user message from the request (not context_window)
let latest_user_msg = request
.messages
.iter()
.rev()
.find(|m| matches!(m.role, MessageRole::User))
.map(|m| m.content.clone());
let compaction_config = CompactionConfig {
provider_name: &provider_name,
latest_user_msg,
};
let result = perform_compaction(
&self.providers,
&mut self.context_window,
&self.config,
compaction_config,
&self.ui_writer,
&mut self.thinning_events,
).await?;
if result.success {
self.ui_writer.print_context_status(
"✅ Context compacted successfully. Continuing...\n",
);
self.compaction_events.push(result.chars_saved);
// Update the request with new context
request.messages = self.context_window.conversation_history.clone();
} else {
self.ui_writer.print_context_status("⚠️ Unable to compact context. Consider starting a new session if you continue to see errors.\n");
// Don't continue with the original request if compaction failed
// as we're likely at token limit
return Err(anyhow::anyhow!("Context window at capacity and compaction failed. Please start a new session."));
}
}
}
self.ensure_context_capacity(&mut request).await?;
// --- Phase 2: Main Streaming Loop ---
loop {
@@ -1868,7 +1860,7 @@ Skip if nothing new. Be brief."#;
let mut tool_executed = false;
let mut chunks_received = 0;
let mut raw_chunks: Vec<String> = Vec::new(); // Store raw chunks for debugging
let mut _last_error: Option<String> = None;
let mut accumulated_usage: Option<g3_providers::Usage> = None;
let mut stream_stop_reason: Option<String> = None; // Track why the stream stopped
@@ -1935,83 +1927,48 @@ Skip if nothing new. Be brief."#;
for (tool_call, duplicate_type) in deduplicated_tools {
debug!("Processing completed tool call: {:?}", tool_call);
// If it's a duplicate, log it and skip - don't set tool_executed!
// Setting tool_executed for duplicates would trigger auto-continue
// even when no actual tool execution occurred.
// Skip duplicates (don't set tool_executed - would trigger spurious auto-continue)
if let Some(dup_type) = &duplicate_type {
// Log the duplicate with red prefix
let prefixed_tool_name =
format!("🟥 {} {}", tool_call.tool, dup_type);
let warning_msg = format!(
"⚠️ Duplicate tool call detected ({}): Skipping execution of {} with args {}",
debug!(
"Skipping duplicate tool call ({}): {} with args {}",
dup_type,
tool_call.tool,
serde_json::to_string(&tool_call.args).unwrap_or_else(|_| "<unserializable>".to_string())
);
// Log to tool log with red prefix
let mut modified_tool_call = tool_call.clone();
modified_tool_call.tool = prefixed_tool_name;
debug!("{}", warning_msg);
// NOTE: Do NOT call parser.reset() here!
// Resetting the parser clears the entire text buffer, which would
// lose any subsequent (non-duplicate) tool calls that haven't been
// processed yet.
continue; // Skip execution of duplicate
continue;
}
// Check if we should auto-compact at 90% BEFORE executing the tool
// We need to do this before any borrows of self
// Flag for post-turn compaction if at 90% capacity
if self.auto_compact && self.context_window.percentage_used() >= 90.0 {
// Set flag to trigger compaction after this turn completes
// We can't do it now due to borrow checker constraints
self.pending_90_compaction = true;
}
// Check if we should thin the context BEFORE executing the tool
// Thin context if needed before tool execution
if self.context_window.should_thin() {
let thin_summary = self.do_thin_context();
// Print the thinning summary
self.ui_writer.print_context_thinning(&thin_summary);
}
// Track what we've already displayed before getting new text
// This prevents re-displaying old content after tool execution
// Calculate new content to display (skip already-shown text)
let already_displayed_chars = current_response.chars().count();
// Get the text content accumulated so far
let text_content = parser.get_text_content();
// Clean the content
let clean_content = streaming::clean_llm_tokens(&text_content);
// Store the raw content BEFORE filtering for the context window log
let raw_content_for_log = clean_content.clone();
// Filter out JSON tool calls from the display
let filtered_content =
self.ui_writer.filter_json_tool_calls(&clean_content);
let filtered_content = self.ui_writer.filter_json_tool_calls(&clean_content);
let final_display_content = filtered_content.trim();
// Display any new content before tool execution
// We need to skip what was already shown (tracked in current_response)
// but also account for the fact that parser.text_buffer accumulates
// across iterations and is never cleared until reset()
let new_content =
if current_response.len() <= final_display_content.len() {
// Only show content that hasn't been displayed yet
final_display_content
.chars()
.skip(already_displayed_chars)
.collect::<String>()
} else {
// Nothing new to display
String::new()
};
// Extract only the new (undisplayed) portion
let new_content = if current_response.len() <= final_display_content.len() {
final_display_content
.chars()
.skip(already_displayed_chars)
.collect::<String>()
} else {
String::new()
};
// Display any new text content
if !new_content.trim().is_empty() {
// Display new text before tool execution
if !new_content.trim().is_empty() {
#[allow(unused_assignments)]
if !response_started {
self.ui_writer.print_agent_prompt();
@@ -2019,19 +1976,14 @@ Skip if nothing new. Be brief."#;
}
self.ui_writer.print_agent_response(&new_content);
self.ui_writer.flush();
// Update current_response to track what we've displayed
current_response.push_str(&new_content);
}
// Execute the tool with formatted output
// Finish streaming markdown before showing tool output
self.ui_writer.finish_streaming_markdown();
// Check if this is a TODO tool (they handle their own output)
let is_todo_tool = tool_call.tool == "todo_read" || tool_call.tool == "todo_write";
// Tool call header (skip for TODO tools - they print their own compact header)
// Tool call header (TODO tools print their own)
if !is_todo_tool {
self.ui_writer.print_tool_header(&tool_call.tool, Some(&tool_call.args));
if let Some(args_obj) = tool_call.args.as_object() {
@@ -2084,75 +2036,46 @@ Skip if nothing new. Be brief."#;
// Display tool execution result with proper indentation
let compact_summary = {
let output_lines: Vec<&str> = tool_result.lines().collect();
// Check if UI wants full output (machine mode) or truncated (human mode)
let wants_full = self.ui_writer.wants_full_output();
const MAX_LINES: usize = 5;
const MAX_LINE_WIDTH: usize = 80;
let output_len = output_lines.len();
// Skip printing content for todo tools - they already print their content
let is_todo_tool =
tool_call.tool == "todo_read" || tool_call.tool == "todo_write";
if is_compact_tool {
// For failed compact tools, show truncated error message
// Determine output format based on tool type
if is_todo_tool {
// TODO tools handle their own output
None
} else if is_compact_tool {
// Compact tools: show one-line summary
if !tool_success {
let error_msg = streaming::truncate_for_display(&tool_result, 60);
Some(error_msg)
Some(streaming::truncate_for_display(&tool_result, 60))
} else {
// Generate appropriate summary based on tool type
match tool_call.tool.as_str() {
"read_file" => Some(streaming::format_read_file_summary(output_len, tool_result.len())),
"write_file" => {
// The tool result already contains the formatted summary
// Format: "✅ wrote N lines | M chars"
Some(streaming::format_write_file_result(&tool_result))
}
"write_file" => Some(streaming::format_write_file_result(&tool_result)),
"str_replace" => {
// Parse insertions/deletions from result
// Result format: "✅ +N insertions | -M deletions"
let (ins, del) = parse_diff_stats(&tool_result);
Some(streaming::format_str_replace_summary(ins, del))
}
"remember" => {
// Extract size from result like "Memory updated. Size: 1.2k"
Some(streaming::format_remember_summary(&tool_result))
}
"screenshot" => {
// Extract path from result
Some(streaming::format_screenshot_summary(&tool_result))
}
"coverage" => {
// Show coverage summary
Some(streaming::format_coverage_summary(&tool_result))
}
"rehydrate" => {
// Show fragment info
Some(streaming::format_rehydrate_summary(&tool_result))
}
"code_search" => {
// Show search summary (matches and files)
Some(streaming::format_code_search_summary(&tool_result))
}
_ => Some(format!("✅ completed"))
"remember" => Some(streaming::format_remember_summary(&tool_result)),
"screenshot" => Some(streaming::format_screenshot_summary(&tool_result)),
"coverage" => Some(streaming::format_coverage_summary(&tool_result)),
"rehydrate" => Some(streaming::format_rehydrate_summary(&tool_result)),
"code_search" => Some(streaming::format_code_search_summary(&tool_result)),
_ => Some("✅ completed".to_string()),
}
}
} else if is_todo_tool {
// Skip - todo tools print their own content
None
} else {
// Regular tools: show truncated output lines
let max_lines_to_show = if wants_full { output_len } else { MAX_LINES };
for (idx, line) in output_lines.iter().enumerate() {
if !wants_full && idx >= max_lines_to_show {
break;
}
let clipped_line = streaming::truncate_line(line, MAX_LINE_WIDTH, !wants_full);
self.ui_writer.update_tool_output_line(&clipped_line);
self.ui_writer.update_tool_output_line(
&streaming::truncate_line(line, MAX_LINE_WIDTH, !wants_full)
);
}
if !wants_full && output_len > MAX_LINES {
self.ui_writer.print_tool_output_summary(output_len);
}
@@ -2292,7 +2215,7 @@ Skip if nothing new. Be brief."#;
// Clear current_response for next iteration to prevent buffered text
// from being incorrectly displayed after tool execution
current_response.clear();
// Reset response_started flag for next iteration
// Reset for next iteration (value read in next loop pass)
response_started = false;
// Continue processing - don't break mid-stream
@@ -2443,8 +2366,6 @@ Skip if nothing new. Be brief."#;
error!("Parser state at error: text_buffer_len={}, has_incomplete={}, message_stopped={}",
parser.text_buffer_len(), parser.has_incomplete_tool_call(), parser.is_message_stopped());
// Store the error for potential logging later
_last_error = Some(error_details.clone());
// Check if this is a recoverable connection error
let is_connection_error = streaming::is_connection_error(&error_msg);