refactor(g3-core): extract finalize_streaming_turn() to unify return paths

Extract a single canonical helper function for completing streaming turns,
eliminating 3 nearly-identical return paths in stream_completion_with_tools().

Changes:
- Add finalize_streaming_turn() helper that handles:
  - Finishing streaming markdown
  - Saving context window
  - Adding timing footer (when requested)
  - Dehydrating context (when ACD enabled)
  - Building TaskResult
- Replace 3 duplicated return blocks with calls to the helper
- Remove unused mut on full_response variable

Results:
- Function reduced from 1067 to 999 lines (-68 lines)
- Eliminated code-path aliasing: 3 paths → 1 canonical path
- All 32 characterization tests pass
- Full g3-core test suite passes

Agent: fowler
This commit is contained in:
Dhanji R. Prasanna
2026-01-13 16:52:48 +05:30
parent 333a85ed1e
commit 5e45e110e2

View File

@@ -1234,6 +1234,43 @@ impl<W: UiWriter> Agent<W> {
debug!("ACD (aggressive context dehydration): {}", if enabled { "enabled" } else { "disabled" });
}
/// Complete a streaming turn and produce its [`TaskResult`].
///
/// Every exit from the streaming loop funnels through here so the same
/// steps always run in the same order:
///
/// 1. Tell the UI writer to finish the streaming markdown output.
/// 2. Save the context window under the `"completed"` label.
/// 3. When `show_timing` is set, append a timing footer to the response,
///    separated from it by a blank line.
/// 4. Call `dehydrate_context()`.
/// 5. Wrap the response and a clone of the context window in a `TaskResult`.
///
/// # Parameters
/// - `full_response`: response text to return (may be empty).
/// - `show_timing`: whether to append the timing footer.
/// - `stream_start`: instant the turn started; used for the total elapsed time.
/// - `first_token_time`: time to first token, if one was recorded. When it is
///   `None`, the elapsed time since `stream_start` is used instead.
/// - `turn_accumulated_usage`: usage accumulated over the turn; only its
///   `total_tokens` is read, and only when the footer is built.
fn finalize_streaming_turn(
    &mut self,
    full_response: String,
    show_timing: bool,
    stream_start: Instant,
    first_token_time: Option<Duration>,
    turn_accumulated_usage: &Option<g3_providers::Usage>,
) -> TaskResult {
    self.ui_writer.finish_streaming_markdown();
    self.save_context_window("completed");

    let mut response = full_response;
    if show_timing {
        // Fall back to "time since start" when no first token was observed.
        let time_to_first_token = match first_token_time {
            Some(recorded) => recorded,
            None => stream_start.elapsed(),
        };
        let tokens_this_turn = turn_accumulated_usage
            .as_ref()
            .map(|usage| usage.total_tokens);
        // The context usage percentage is read here, before dehydration runs.
        let footer = streaming::format_timing_footer(
            stream_start.elapsed(),
            time_to_first_token,
            tokens_this_turn,
            self.context_window.percentage_used(),
        );
        response = format!("{}\n\n{}", response, footer);
    }

    self.dehydrate_context();

    TaskResult::new(response, self.context_window.clone())
}
/// Perform ACD dehydration - save current conversation state to a fragment.
/// Called at the end of each turn when ACD is enabled.
///
@@ -1642,7 +1679,7 @@ impl<W: UiWriter> Agent<W> {
debug!("Starting stream_completion_with_tools");
// --- State Initialization ---
let mut full_response = String::new();
let full_response = String::new();
let mut first_token_time: Option<Duration> = None;
let stream_start = Instant::now();
let mut iteration_count = 0;
@@ -2368,40 +2405,12 @@ impl<W: UiWriter> Agent<W> {
// Set full_response to empty to avoid duplication in return value
// (content was already displayed during streaming)
full_response = String::new();
// Finish the streaming markdown formatter before returning
self.ui_writer.finish_streaming_markdown();
// Save context window BEFORE returning
self.save_context_window("completed");
let _ttft =
first_token_time.unwrap_or_else(|| stream_start.elapsed());
// Add timing if needed
let final_response = if show_timing {
let turn_tokens = turn_accumulated_usage.as_ref().map(|u| u.total_tokens);
let timing_footer = streaming::format_timing_footer(
stream_start.elapsed(),
_ttft,
turn_tokens,
self.context_window.percentage_used(),
);
format!(
"{}\n\n{}",
full_response,
timing_footer
)
} else {
full_response
};
// Dehydrate context - the function extracts the summary from context itself
self.dehydrate_context();
return Ok(TaskResult::new(
final_response,
self.context_window.clone(),
return Ok(self.finalize_streaming_turn(
String::new(),
show_timing,
stream_start,
first_token_time,
&turn_accumulated_usage,
));
}
break; // Tool was executed, break to continue outer loop
@@ -2612,8 +2621,6 @@ impl<W: UiWriter> Agent<W> {
);
}
let _ttft = first_token_time.unwrap_or_else(|| stream_start.elapsed());
// Add the RAW unfiltered response to context window before returning.
// This ensures the log contains the true raw content including any JSON.
// Note: We check current_response, not full_response, because full_response
@@ -2629,64 +2636,26 @@ impl<W: UiWriter> Agent<W> {
}
}
// Save context window BEFORE returning
self.save_context_window("completed");
// Add timing if needed
let final_response = if show_timing {
let turn_tokens = turn_accumulated_usage.as_ref().map(|u| u.total_tokens);
let timing_footer = streaming::format_timing_footer(
stream_start.elapsed(),
_ttft,
turn_tokens,
self.context_window.percentage_used(),
);
format!(
"{}\n\n{}",
return Ok(self.finalize_streaming_turn(
full_response,
timing_footer
)
} else {
full_response
};
// Finish streaming markdown before returning
self.ui_writer.finish_streaming_markdown();
// Dehydrate context - the function extracts the summary from context itself
self.dehydrate_context();
return Ok(TaskResult::new(final_response, self.context_window.clone()));
show_timing,
stream_start,
first_token_time,
&turn_accumulated_usage,
));
}
// Continue the loop to start a new stream with updated context
}
// --- Phase 4: Post-Loop Finalization ---
let _ttft = first_token_time.unwrap_or_else(|| stream_start.elapsed());
// Add timing if needed
let final_response = if show_timing {
let turn_tokens = turn_accumulated_usage.as_ref().map(|u| u.total_tokens);
let timing_footer = streaming::format_timing_footer(
stream_start.elapsed(),
_ttft,
turn_tokens,
self.context_window.percentage_used(),
);
format!(
"{}\n\n{}",
Ok(self.finalize_streaming_turn(
full_response,
timing_footer
)
} else {
full_response
};
// Dehydrate context - the function extracts the summary from context itself
self.dehydrate_context();
Ok(TaskResult::new(final_response, self.context_window.clone()))
show_timing,
stream_start,
first_token_time,
&turn_accumulated_usage,
))
}
pub async fn execute_tool(&mut self, tool_call: &ToolCall) -> Result<String> {