diff --git a/analysis/memory.md b/analysis/memory.md
index e81cf8b..6546cf2 100644
--- a/analysis/memory.md
+++ b/analysis/memory.md
@@ -1,5 +1,5 @@
 # Project Memory
-> Updated: 2026-01-20T08:53:25Z | Size: 16.3k chars
+> Updated: 2026-01-20T09:01:08Z | Size: 16.7k chars
 ### Remember Tool Wiring
 - `crates/g3-core/src/tools/memory.rs` [0..5000]
   - `execute_remember()`, `get_memory_path()`, `merge_memory()`
@@ -295,4 +295,11 @@ Shared display functions for interactive and agent modes.
   - `print_workspace_path()` [20..29] - prints formatted workspace path
   - `LoadedContent` [32..39] - tracks loaded project files (README, AGENTS.md, Memory, include prompt)
   - `print_loaded_status()` [87..103] - prints "✓ README ✓ AGENTS.md" status line
-  - `print_project_heading()` [106..114] - prints project name from README
\ No newline at end of file
+  - `print_project_heading()` [106..114] - prints project name from README
+
+### Interactive Commands Module
+Handles `/` commands in interactive mode (extracted from interactive.rs).
+
+- `crates/g3-cli/src/commands.rs`
+  - `handle_command()` [17..320] - dispatches `/help`, `/compact`, `/thinnify`, `/skinnify`, `/fragments`, `/rehydrate`, `/run`, `/dump`, `/clear`, `/readme`, `/stats`, `/resume`
+  - Returns `Result<bool>` - true if command handled and loop should continue
\ No newline at end of file
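The memory entry above documents the new dispatcher in prose only. Below is a minimal sketch of the shape it describes; this is hypothetical, not the code in `crates/g3-cli/src/commands.rs`. The real `handle_command()` presumably also takes agent/session state, and `anyhow::Result` is assumed here only because the crate already uses `anyhow` elsewhere in this diff.

```rust
use anyhow::Result;

/// Hypothetical reduction of the dispatcher described above.
/// Ok(true): the input was a `/` command and the interactive loop should continue.
/// Ok(false): the input was not a command and should go to the model as a prompt.
pub fn handle_command(input: &str) -> Result<bool> {
    match input.trim() {
        "/help" => {
            println!("commands: /help /compact /thinnify /skinnify /fragments /rehydrate /run /dump /clear /readme /stats /resume");
            Ok(true)
        }
        cmd if cmd.starts_with('/') => {
            println!("unknown command: {cmd}");
            Ok(true) // still handled; the loop continues
        }
        _ => Ok(false),
    }
}
```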
Be brief."#; Err(e) => { error!("Failed to start stream: {}", e); // Additional retry for "busy" errors on subsequent iterations - if iteration_count > 1 && e.to_string().contains("busy") { + if state.iteration_count > 1 && e.to_string().contains("busy") { warn!( "Model busy on iteration {}, attempting one more retry in 500ms", - iteration_count + state.iteration_count ); tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; @@ -1994,14 +1986,8 @@ Skip if nothing new. Be brief."#; // Write context window summary every time we send messages to LLM self.write_context_window_summary(); - let mut parser = StreamingToolParser::new(); - let mut current_response = String::new(); - let mut tool_executed = false; - let mut chunks_received = 0; - let mut raw_chunks: Vec = Vec::new(); // Store raw chunks for debugging - - let mut accumulated_usage: Option = None; - let mut stream_stop_reason: Option = None; // Track why the stream stopped + // Create fresh iteration state for this streaming iteration + let mut iter = streaming::IterationState::new(); while let Some(chunk_result) = stream.next().await { match chunk_result { @@ -2011,8 +1997,8 @@ Skip if nothing new. Be brief."#; // Capture usage data if available if let Some(ref usage) = chunk.usage { - accumulated_usage = Some(usage.clone()); - turn_accumulated_usage = Some(usage.clone()); + iter.accumulated_usage = Some(usage.clone()); + state.turn_accumulated_usage = Some(usage.clone()); debug!( "Received usage data - prompt: {}, completion: {}, total: {}", usage.prompt_tokens, usage.completion_tokens, usage.total_tokens @@ -2031,29 +2017,29 @@ Skip if nothing new. Be brief."#; } // Store raw chunk for debugging (limit to first 20 and last 5) - if chunks_received < 20 || chunk.finished { - raw_chunks.push(format!( + if iter.chunks_received < 20 || chunk.finished { + iter.raw_chunks.push(format!( "Chunk #{}: content={:?}, finished={}, tool_calls={:?}", - chunks_received + 1, + iter.chunks_received + 1, chunk.content, chunk.finished, chunk.tool_calls )); - } else if raw_chunks.len() == 20 { - raw_chunks.push("... (chunks 21+ omitted for brevity) ...".to_string()); + } else if iter.raw_chunks.len() == 20 { + iter.raw_chunks.push("... (chunks 21+ omitted for brevity) ...".to_string()); } // Record time to first token - if first_token_time.is_none() && !chunk.content.is_empty() { - first_token_time = Some(stream_start.elapsed()); + if state.first_token_time.is_none() && !chunk.content.is_empty() { + state.first_token_time = Some(state.stream_start.elapsed()); // Record in agent metrics - if let Some(ttft) = first_token_time { + if let Some(ttft) = state.first_token_time { self.first_token_times.push(ttft); } } - chunks_received += 1; - if chunks_received == 1 { + iter.chunks_received += 1; + if iter.chunks_received == 1 { debug!( "First chunk received: content_len={}, finished={}", chunk.content.len(), @@ -2062,7 +2048,7 @@ Skip if nothing new. Be brief."#; } // Process chunk with the new parser - let completed_tools = parser.process_chunk(&chunk); + let completed_tools = iter.parser.process_chunk(&chunk); // Handle completed tool calls - process all if multiple calls enabled // Always process all tool calls - they will be executed after stream ends @@ -2101,8 +2087,8 @@ Skip if nothing new. 
Be brief."#; } // Calculate new content to display (skip already-shown text) - let already_displayed_chars = current_response.chars().count(); - let text_content = parser.get_text_content(); + let already_displayed_chars = iter.current_response.chars().count(); + let text_content = iter.parser.get_text_content(); let clean_content = streaming::clean_llm_tokens(&text_content); let raw_content_for_log = clean_content.clone(); let filtered_content = @@ -2111,7 +2097,7 @@ Skip if nothing new. Be brief."#; // Extract only the new (undisplayed) portion let new_content = - if current_response.len() <= final_display_content.len() { + if iter.current_response.len() <= final_display_content.len() { final_display_content .chars() .skip(already_displayed_chars) @@ -2123,13 +2109,13 @@ Skip if nothing new. Be brief."#; // Display new text before tool execution if !new_content.trim().is_empty() { #[allow(unused_assignments)] - if !response_started { + if !state.response_started { self.ui_writer.print_agent_prompt(); - response_started = true; + state.response_started = true; } self.ui_writer.print_agent_response(&new_content); self.ui_writer.flush(); - current_response.push_str(&new_content); + iter.current_response.push_str(&new_content); } self.ui_writer.finish_streaming_markdown(); @@ -2392,8 +2378,8 @@ Skip if nothing new. Be brief."#; // 1. At the end when no tools were executed // 2. At the end when no tools were executed (handled in the "no tool executed" branch) - tool_executed = true; - any_tool_executed = true; // Track across all iterations + iter.tool_executed = true; + state.any_tool_executed = true; // Track across all iterations // Reset the JSON tool call filter state after each tool execution // This ensures the filter doesn't stay in suppression mode for subsequent streaming content @@ -2401,22 +2387,22 @@ Skip if nothing new. Be brief."#; // Only reset parser if there are no more unexecuted tool calls in the buffer // This handles the case where the LLM emits multiple tool calls in one response - if parser.has_unexecuted_tool_call() { + if iter.parser.has_unexecuted_tool_call() { debug!( "Parser still has unexecuted tool calls, not resetting buffer" ); // Mark current tool as consumed so we don't re-detect it - parser.mark_tool_calls_consumed(); + iter.parser.mark_tool_calls_consumed(); } else { // Reset parser for next iteration - this clears the text buffer - parser.reset(); + iter.parser.reset(); } // Clear current_response for next iteration to prevent buffered text // from being incorrectly displayed after tool execution - current_response.clear(); + iter.current_response.clear(); // Reset for next iteration (value read in next loop pass) - response_started = false; + state.response_started = false; // Continue processing - don't break mid-stream } // End of for loop processing each tool call @@ -2425,7 +2411,7 @@ Skip if nothing new. Be brief."#; // All tool calls are collected and executed after the stream ends. // If no tool calls were completed, continue streaming normally - if !tool_executed { + if !iter.tool_executed { let clean_content = streaming::clean_llm_tokens(&chunk.content); if !clean_content.is_empty() { @@ -2433,42 +2419,42 @@ Skip if nothing new. 
Be brief."#; self.ui_writer.filter_json_tool_calls(&clean_content); if !filtered_content.is_empty() { - if !response_started { + if !state.response_started { self.ui_writer.print_agent_prompt(); - response_started = true; + state.response_started = true; } self.ui_writer.print_agent_response(&filtered_content); self.ui_writer.flush(); - current_response.push_str(&filtered_content); + iter.current_response.push_str(&filtered_content); } } } if chunk.finished { debug!("Stream finished: tool_executed={}, current_response_len={}, full_response_len={}, chunks_received={}", - tool_executed, current_response.len(), full_response.len(), chunks_received); + iter.tool_executed, iter.current_response.len(), state.full_response.len(), iter.chunks_received); // Capture the stop reason from the final chunk if let Some(ref reason) = chunk.stop_reason { debug!("Stream stop_reason: {}", reason); - stream_stop_reason = Some(reason.clone()); + iter.stream_stop_reason = Some(reason.clone()); } // Stream finished - check if we should continue or return - if !tool_executed { + if !iter.tool_executed { // No tools were executed in this iteration // Check if we got any meaningful response at all // We need to check the parser's text buffer as well, since the LLM // might have responded with text but no tool calls - let text_content = parser.get_text_content(); + let text_content = iter.parser.get_text_content(); let has_text_response = !text_content.trim().is_empty() - || !current_response.trim().is_empty(); + || !iter.current_response.trim().is_empty(); // Don't re-add text from parser buffer if we already displayed it // The parser buffer contains ALL accumulated text, but current_response // already has what was displayed during streaming - if current_response.is_empty() && !text_content.trim().is_empty() { + if iter.current_response.is_empty() && !text_content.trim().is_empty() { // Only use parser text if we truly have no response // This should be rare - only if streaming failed to display anything debug!("Warning: Using parser buffer text as fallback - this may duplicate output"); @@ -2480,7 +2466,7 @@ Skip if nothing new. Be brief."#; self.ui_writer.filter_json_tool_calls(&clean_text); // Only use this if we truly have nothing else - if !filtered_text.trim().is_empty() && full_response.is_empty() + if !filtered_text.trim().is_empty() && state.full_response.is_empty() { debug!( "Using filtered parser text as last resort: {} chars", @@ -2491,22 +2477,22 @@ Skip if nothing new. Be brief."#; } } - if !has_text_response && full_response.is_empty() { + if !has_text_response && state.full_response.is_empty() { streaming::log_stream_error( - iteration_count, + state.iteration_count, &provider_name, &provider_model, - chunks_received, - &parser, + iter.chunks_received, + &iter.parser, &request, &self.context_window, self.session_id.as_deref(), - &raw_chunks, + &iter.raw_chunks, ); // No response received - this is an error condition warn!("Stream finished without any content or tool calls"); - warn!("Chunks received: {}", chunks_received); + warn!("Chunks received: {}", iter.chunks_received); return Err(anyhow::anyhow!( "No response received from the model. The model may be experiencing issues or the request may have been malformed." )); @@ -2514,18 +2500,18 @@ Skip if nothing new. 
Be brief."#; // If tools were executed in previous iterations, // break to let the outer loop handle finalization - if any_tool_executed { + if state.any_tool_executed { debug!("Tools were executed in previous iterations, breaking to finalize"); // IMPORTANT: Save any text response to context window before breaking // This ensures text displayed after tool execution is not lost - if !current_response.trim().is_empty() && !assistant_message_added { - debug!("Saving current_response ({} chars) to context before finalization", current_response.len()); + if !iter.current_response.trim().is_empty() && !state.assistant_message_added { + debug!("Saving current_response ({} chars) to context before finalization", iter.current_response.len()); let assistant_msg = Message::new( MessageRole::Assistant, - current_response.clone(), + iter.current_response.clone(), ); self.context_window.add_message(assistant_msg); - assistant_message_added = true; + state.assistant_message_added = true; } // NOTE: We intentionally do NOT set full_response here. @@ -2538,14 +2524,14 @@ Skip if nothing new. Be brief."#; // Save assistant message before returning (no tools were executed) // This ensures text-only responses are saved to context - if !current_response.trim().is_empty() && !assistant_message_added { - debug!("Saving current_response ({} chars) to context before early return", current_response.len()); + if !iter.current_response.trim().is_empty() && !state.assistant_message_added { + debug!("Saving current_response ({} chars) to context before early return", iter.current_response.len()); let assistant_msg = Message::new( MessageRole::Assistant, - current_response.clone(), + iter.current_response.clone(), ); self.context_window.add_message(assistant_msg); - // assistant_message_added = true; // Not needed, we're returning + // state.assistant_message_added = true; // Not needed, we're returning } // Set full_response to empty to avoid duplication in return value @@ -2553,9 +2539,9 @@ Skip if nothing new. Be brief."#; return Ok(self.finalize_streaming_turn( String::new(), show_timing, - stream_start, - first_token_time, - &turn_accumulated_usage, + state.stream_start, + state.first_token_time, + &state.turn_accumulated_usage, )); } break; // Tool was executed, break to continue outer loop @@ -2566,13 +2552,13 @@ Skip if nothing new. Be brief."#; let error_msg = e.to_string(); let error_details = format!( "Streaming error at chunk {}: {}", - chunks_received + 1, + iter.chunks_received + 1, error_msg ); error!("Error type: {}", std::any::type_name_of_val(&e)); error!("Parser state at error: text_buffer_len={}, has_incomplete={}, message_stopped={}", - parser.text_buffer_len(), parser.has_incomplete_tool_call(), parser.is_message_stopped()); + iter.parser.text_buffer_len(), iter.parser.has_incomplete_tool_call(), iter.parser.is_message_stopped()); // Check if this is a recoverable connection error let is_connection_error = streaming::is_connection_error(&error_msg); @@ -2580,26 +2566,26 @@ Skip if nothing new. 
Be brief."#; if is_connection_error { warn!( "Connection error at chunk {}, treating as end of stream", - chunks_received + 1 + iter.chunks_received + 1 ); // If we have any content or tool calls, treat this as a graceful end - if chunks_received > 0 - && (!parser.get_text_content().is_empty() - || parser.has_unexecuted_tool_call()) + if iter.chunks_received > 0 + && (!iter.parser.get_text_content().is_empty() + || iter.parser.has_unexecuted_tool_call()) { warn!("Stream terminated unexpectedly but we have content, continuing"); break; // Break to process what we have } } - if tool_executed { + if iter.tool_executed { error!("{}", error_details); warn!("Stream error after tool execution, attempting to continue"); break; // Break to outer loop to start new stream } else { // Log raw chunks before failing error!("Fatal streaming error. Raw chunks received before error:"); - for chunk_str in raw_chunks.iter().take(10) { + for chunk_str in iter.raw_chunks.iter().take(10) { error!(" {}", chunk_str); } return Err(e); @@ -2609,41 +2595,41 @@ Skip if nothing new. Be brief."#; } // Update context window with actual usage if available - if let Some(usage) = accumulated_usage { + if let Some(usage) = iter.accumulated_usage { debug!("Updating context window with actual usage from stream"); self.context_window.update_usage_from_response(&usage); } else { // Fall back to estimation if no usage data was provided debug!("No usage data from stream, using estimation"); - let estimated_tokens = ContextWindow::estimate_tokens(¤t_response); + let estimated_tokens = ContextWindow::estimate_tokens(&iter.current_response); self.context_window.add_streaming_tokens(estimated_tokens); } // If we get here and no tool was executed, we're done - if !tool_executed { + if !iter.tool_executed { // IMPORTANT: Do NOT add parser text_content here! // The text has already been displayed during streaming via current_response. // The parser buffer accumulates ALL text and would cause duplication. debug!("Stream completed without tool execution. Response already displayed during streaming."); debug!( "Current response length: {}, Full response length: {}", - current_response.len(), - full_response.len() + iter.current_response.len(), + state.full_response.len() ); - let has_response = !current_response.is_empty() || !full_response.is_empty(); + let has_response = !iter.current_response.is_empty() || !state.full_response.is_empty(); // Check if the response is essentially empty (just whitespace or timing lines) // Check if there's an incomplete tool call in the buffer (for debugging) - let has_incomplete_tool_call = parser.has_incomplete_tool_call(); + let has_incomplete_tool_call = iter.parser.has_incomplete_tool_call(); // Check if there's a complete but unexecuted tool call in the buffer (for debugging) - let has_unexecuted_tool_call = parser.has_unexecuted_tool_call(); + let has_unexecuted_tool_call = iter.parser.has_unexecuted_tool_call(); // Log when we detect unexecuted or incomplete tool calls for debugging if has_incomplete_tool_call { debug!("Detected incomplete tool call in buffer (buffer_len={}, consumed_up_to={})", - parser.text_buffer_len(), parser.text_buffer_len()); + iter.parser.text_buffer_len(), iter.parser.text_buffer_len()); } if has_unexecuted_tool_call { debug!("Detected unexecuted tool call in buffer - this may indicate a parsing issue"); @@ -2652,7 +2638,7 @@ Skip if nothing new. 
Be brief."#; // Check if the response was truncated due to max_tokens let was_truncated_by_max_tokens = - stream_stop_reason.as_deref() == Some("max_tokens"); + iter.stream_stop_reason.as_deref() == Some("max_tokens"); if was_truncated_by_max_tokens { debug!("Response was truncated due to max_tokens limit"); warn!("LLM response was cut off due to max_tokens limit"); @@ -2662,7 +2648,7 @@ Skip if nothing new. Be brief."#; if has_response { debug!( "Response already streamed, not setting full_response. current_response: {} chars", - current_response.len() + iter.current_response.len() ); } @@ -2670,9 +2656,9 @@ Skip if nothing new. Be brief."#; // This ensures the log contains the true raw content including any JSON. // Note: We check current_response, not full_response, because full_response // may be empty to avoid display duplication (content was already streamed). - if !current_response.trim().is_empty() && !assistant_message_added { + if !iter.current_response.trim().is_empty() && !state.assistant_message_added { // Get the raw text from the parser (before filtering) - let raw_text = parser.get_text_content(); + let raw_text = iter.parser.get_text_content(); let raw_clean = streaming::clean_llm_tokens(&raw_text); // Use raw_clean if available, otherwise fall back to current_response. @@ -2681,18 +2667,18 @@ Skip if nothing new. Be brief."#; let content_to_save = if !raw_clean.trim().is_empty() { raw_clean } else { - current_response.clone() + iter.current_response.clone() }; let assistant_message = Message::new(MessageRole::Assistant, content_to_save); self.context_window.add_message(assistant_message); } return Ok(self.finalize_streaming_turn( - full_response, + state.full_response.clone(), show_timing, - stream_start, - first_token_time, - &turn_accumulated_usage, + state.stream_start, + state.first_token_time, + &state.turn_accumulated_usage, )); } @@ -2701,11 +2687,11 @@ Skip if nothing new. Be brief."#; // --- Phase 4: Post-Loop Finalization --- Ok(self.finalize_streaming_turn( - full_response, + state.full_response.clone(), show_timing, - stream_start, - first_token_time, - &turn_accumulated_usage, + state.stream_start, + state.first_token_time, + &state.turn_accumulated_usage, )) } diff --git a/crates/g3-core/src/streaming.rs b/crates/g3-core/src/streaming.rs index 4ffca01..3139243 100644 --- a/crates/g3-core/src/streaming.rs +++ b/crates/g3-core/src/streaming.rs @@ -23,6 +23,7 @@ pub struct StreamingState { pub response_started: bool, pub any_tool_executed: bool, pub auto_summary_attempts: usize, + pub assistant_message_added: bool, pub turn_accumulated_usage: Option, } @@ -36,6 +37,7 @@ impl StreamingState { response_started: false, any_tool_executed: false, auto_summary_attempts: 0, + assistant_message_added: false, turn_accumulated_usage: None, } } @@ -65,6 +67,7 @@ pub struct IterationState { pub chunks_received: usize, pub raw_chunks: Vec, pub accumulated_usage: Option, + pub stream_stop_reason: Option, } impl IterationState { @@ -76,6 +79,7 @@ impl IterationState { chunks_received: 0, raw_chunks: Vec::new(), accumulated_usage: None, + stream_stop_reason: None, } }