From 5e45e110e2af0453c36da5ef5414bace9af7f747 Mon Sep 17 00:00:00 2001 From: "Dhanji R. Prasanna" Date: Tue, 13 Jan 2026 16:52:48 +0530 Subject: [PATCH] refactor(g3-core): extract finalize_streaming_turn() to unify return paths MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Extract a single canonical helper function for completing streaming turns, eliminating 3 nearly-identical return paths in stream_completion_with_tools(). Changes: - Add finalize_streaming_turn() helper that handles: - Finishing streaming markdown - Saving context window - Adding timing footer (when requested) - Dehydrating context (when ACD enabled) - Building TaskResult - Replace 3 duplicated return blocks with calls to the helper - Remove unused mut on full_response variable Results: - Function reduced from 1067 to 999 lines (-68 lines) - Eliminated code-path aliasing: 3 paths → 1 canonical path - All 32 characterization tests pass - Full g3-core test suite passes Agent: fowler --- crates/g3-core/src/lib.rs | 147 +++++++++++++++----------------------- 1 file changed, 58 insertions(+), 89 deletions(-) diff --git a/crates/g3-core/src/lib.rs b/crates/g3-core/src/lib.rs index 0f24c51..e2bb90b 100644 --- a/crates/g3-core/src/lib.rs +++ b/crates/g3-core/src/lib.rs @@ -1234,6 +1234,43 @@ impl Agent { debug!("ACD (aggressive context dehydration): {}", if enabled { "enabled" } else { "disabled" }); } + /// Build the final response and prepare for return. + /// + /// This is the single canonical path for completing a streaming turn: + /// 1. Finish streaming markdown + /// 2. Save context window + /// 3. Add timing footer (if requested) + /// 4. Dehydrate context (if ACD enabled) + /// 5. Build TaskResult + fn finalize_streaming_turn( + &mut self, + full_response: String, + show_timing: bool, + stream_start: Instant, + first_token_time: Option, + turn_accumulated_usage: &Option, + ) -> TaskResult { + self.ui_writer.finish_streaming_markdown(); + self.save_context_window("completed"); + + let final_response = if show_timing { + let ttft = first_token_time.unwrap_or_else(|| stream_start.elapsed()); + let turn_tokens = turn_accumulated_usage.as_ref().map(|u| u.total_tokens); + let timing_footer = streaming::format_timing_footer( + stream_start.elapsed(), + ttft, + turn_tokens, + self.context_window.percentage_used(), + ); + format!("{}\n\n{}", full_response, timing_footer) + } else { + full_response + }; + + self.dehydrate_context(); + TaskResult::new(final_response, self.context_window.clone()) + } + /// Perform ACD dehydration - save current conversation state to a fragment. /// Called at the end of each turn when ACD is enabled. /// @@ -1642,7 +1679,7 @@ impl Agent { debug!("Starting stream_completion_with_tools"); // --- State Initialization --- - let mut full_response = String::new(); + let full_response = String::new(); let mut first_token_time: Option = None; let stream_start = Instant::now(); let mut iteration_count = 0; @@ -2368,40 +2405,12 @@ impl Agent { // Set full_response to empty to avoid duplication in return value // (content was already displayed during streaming) - full_response = String::new(); - - // Finish the streaming markdown formatter before returning - self.ui_writer.finish_streaming_markdown(); - - // Save context window BEFORE returning - self.save_context_window("completed"); - let _ttft = - first_token_time.unwrap_or_else(|| stream_start.elapsed()); - - // Add timing if needed - let final_response = if show_timing { - let turn_tokens = turn_accumulated_usage.as_ref().map(|u| u.total_tokens); - let timing_footer = streaming::format_timing_footer( - stream_start.elapsed(), - _ttft, - turn_tokens, - self.context_window.percentage_used(), - ); - format!( - "{}\n\n{}", - full_response, - timing_footer - ) - } else { - full_response - }; - - // Dehydrate context - the function extracts the summary from context itself - self.dehydrate_context(); - - return Ok(TaskResult::new( - final_response, - self.context_window.clone(), + return Ok(self.finalize_streaming_turn( + String::new(), + show_timing, + stream_start, + first_token_time, + &turn_accumulated_usage, )); } break; // Tool was executed, break to continue outer loop @@ -2612,8 +2621,6 @@ impl Agent { ); } - let _ttft = first_token_time.unwrap_or_else(|| stream_start.elapsed()); - // Add the RAW unfiltered response to context window before returning. // This ensures the log contains the true raw content including any JSON. // Note: We check current_response, not full_response, because full_response @@ -2629,64 +2636,26 @@ impl Agent { } } - // Save context window BEFORE returning - self.save_context_window("completed"); - - // Add timing if needed - let final_response = if show_timing { - let turn_tokens = turn_accumulated_usage.as_ref().map(|u| u.total_tokens); - let timing_footer = streaming::format_timing_footer( - stream_start.elapsed(), - _ttft, - turn_tokens, - self.context_window.percentage_used(), - ); - format!( - "{}\n\n{}", - full_response, - timing_footer - ) - } else { - full_response - }; - - // Finish streaming markdown before returning - self.ui_writer.finish_streaming_markdown(); - - // Dehydrate context - the function extracts the summary from context itself - self.dehydrate_context(); - - return Ok(TaskResult::new(final_response, self.context_window.clone())); + return Ok(self.finalize_streaming_turn( + full_response, + show_timing, + stream_start, + first_token_time, + &turn_accumulated_usage, + )); } // Continue the loop to start a new stream with updated context } // --- Phase 4: Post-Loop Finalization --- - let _ttft = first_token_time.unwrap_or_else(|| stream_start.elapsed()); - - // Add timing if needed - let final_response = if show_timing { - let turn_tokens = turn_accumulated_usage.as_ref().map(|u| u.total_tokens); - let timing_footer = streaming::format_timing_footer( - stream_start.elapsed(), - _ttft, - turn_tokens, - self.context_window.percentage_used(), - ); - format!( - "{}\n\n{}", - full_response, - timing_footer - ) - } else { - full_response - }; - - // Dehydrate context - the function extracts the summary from context itself - self.dehydrate_context(); - - Ok(TaskResult::new(final_response, self.context_window.clone())) + Ok(self.finalize_streaming_turn( + full_response, + show_timing, + stream_start, + first_token_time, + &turn_accumulated_usage, + )) } pub async fn execute_tool(&mut self, tool_call: &ToolCall) -> Result {