From 0ae1a13cdbf0641d835c1f0662fbc3203052eb5f Mon Sep 17 00:00:00 2001 From: "Dhanji R. Prasanna" Date: Thu, 15 Jan 2026 12:11:44 +0530 Subject: [PATCH] feat: real-time tool call streaming indicator with blinking UI MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add ToolParsingHint enum (Detected/Active/Complete) for UI feedback - New UiWriter methods: print_tool_streaming_hint(), print_tool_streaming_active() - Refactor ConsoleUiWriter state to use atomics in ParsingHintState - Add tool_call_streaming field to CompletionChunk for provider hints - Anthropic provider sends streaming hints when tool name detected - New streaming helpers: make_tool_streaming_hint(), make_tool_streaming_active() Parser improvements: - Add is_json_invalidated() to detect false positive tool patterns - Fix tool result poisoning when file contents contain partial JSON - Unescaped newlines in strings or prose after JSON invalidates detection User sees ' ● tool_name |' immediately when tool call starts streaming, with blinking indicator while args are received. --- crates/g3-cli/src/filter_json.rs | 11 + crates/g3-cli/src/ui_writer_impl.rs | 199 ++++++++++++++---- crates/g3-core/src/lib.rs | 11 + crates/g3-core/src/streaming_parser.rs | 3 +- crates/g3-core/src/ui_writer.rs | 10 + .../g3-core/tests/duplicate_detection_test.rs | 1 + .../tests/incomplete_tool_call_test.rs | 11 + .../g3-core/tests/parser_sanitization_test.rs | 1 + ...stream_completion_characterization_test.rs | 7 + crates/g3-core/tests/streaming_parser_test.rs | 1 + crates/g3-core/tests/todo_staleness_test.rs | 2 + .../tests/tool_result_poisoning_test.rs | 1 + crates/g3-planner/src/llm.rs | 8 + crates/g3-providers/src/anthropic.rs | 14 +- crates/g3-providers/src/databricks.rs | 1 + crates/g3-providers/src/lib.rs | 2 + crates/g3-providers/src/openai.rs | 1 + crates/g3-providers/src/streaming.rs | 29 +++ 18 files changed, 271 insertions(+), 42 deletions(-) diff --git a/crates/g3-cli/src/filter_json.rs b/crates/g3-cli/src/filter_json.rs index 7ddd597..b9dad8f 100644 --- a/crates/g3-cli/src/filter_json.rs +++ b/crates/g3-cli/src/filter_json.rs @@ -21,6 +21,17 @@ use tracing::debug; /// Realistically: `{"tool":"` = 9 chars, with whitespace maybe 15 max const MAX_BUFFER_FOR_DETECTION: usize = 20; +/// Hints emitted during tool call parsing for UI feedback. +#[derive(Debug, Clone)] +pub enum ToolParsingHint { + /// Tool call detected, name is known. UI should show " ● tool_name |" + Detected(String), + /// More characters being parsed. UI should blink the indicator. + Active, + /// Tool call JSON fully parsed. UI should clear the parsing indicator. + Complete, +} + // Thread-local state for tracking JSON tool call suppression thread_local! { static JSON_TOOL_STATE: RefCell = RefCell::new(FilterState::new()); diff --git a/crates/g3-cli/src/ui_writer_impl.rs b/crates/g3-cli/src/ui_writer_impl.rs index a904bf6..4c2c64e 100644 --- a/crates/g3-cli/src/ui_writer_impl.rs +++ b/crates/g3-cli/src/ui_writer_impl.rs @@ -1,27 +1,129 @@ -use crate::filter_json::{filter_json_tool_calls, reset_json_tool_state}; +use crate::filter_json::{filter_json_tool_calls, reset_json_tool_state, ToolParsingHint}; use crate::streaming_markdown::StreamingMarkdownFormatter; use g3_core::ui_writer::UiWriter; use std::io::{self, Write}; -use std::sync::Mutex; +use std::sync::{Arc, Mutex, atomic::{AtomicBool, AtomicU8, Ordering}}; use termimad::MadSkin; +/// Padding width for tool names in compact display (longest tool: "str_replace" = 11 chars) +const TOOL_NAME_PADDING: usize = 11; + +/// ANSI color codes for tool names +const TOOL_COLOR_NORMAL: &str = "\x1b[32m"; +const TOOL_COLOR_NORMAL_BOLD: &str = "\x1b[1;32m"; +const TOOL_COLOR_AGENT: &str = "\x1b[38;5;250m"; +const TOOL_COLOR_AGENT_BOLD: &str = "\x1b[1;38;5;250m"; + +/// Blink state values for the streaming indicator +const BLINK_INACTIVE: u8 = 0; +const BLINK_SHOW_PIPE: u8 = 1; +const BLINK_SHOW_SPACE: u8 = 2; + +/// Shared state for tool parsing hints that can be used in callbacks. +/// This is separate from ConsoleUiWriter so it can be captured by Arc in closures. +#[derive(Clone)] +struct ParsingHintState { + parsing_indicator_printed: Arc, + last_output_was_text: Arc, + last_output_was_tool: Arc, + is_agent_mode: Arc, + /// Blink state: 0 = inactive, 1 = show pipe, 2 = show space + blink_state: Arc, +} + +impl ParsingHintState { + fn new() -> Self { + Self { + parsing_indicator_printed: Arc::new(AtomicBool::new(false)), + last_output_was_text: Arc::new(AtomicBool::new(false)), + last_output_was_tool: Arc::new(AtomicBool::new(false)), + is_agent_mode: Arc::new(AtomicBool::new(false)), + blink_state: Arc::new(AtomicU8::new(BLINK_INACTIVE)), + } + } + + fn clear(&self) { + self.parsing_indicator_printed.store(false, Ordering::Relaxed); + self.blink_state.store(BLINK_INACTIVE, Ordering::Relaxed); + } + + /// Handle a tool parsing hint - this is the core logic extracted for use in callbacks + fn handle_hint(&self, hint: ToolParsingHint) { + match hint { + ToolParsingHint::Detected(tool_name) => { + // Stop any previous blinking + self.blink_state.store(BLINK_INACTIVE, Ordering::Relaxed); + + // Check if we've already printed an indicator (this is an update) + let already_printed = self.parsing_indicator_printed.load(Ordering::Relaxed); + + if already_printed { + // Update in place: clear line and reprint with new name + print!("\r\x1b[2K"); + } else { + // First time: add blank line if last output was text + if self.last_output_was_text.load(Ordering::Relaxed) { + println!(); + } + self.last_output_was_text.store(false, Ordering::Relaxed); + self.last_output_was_tool.store(true, Ordering::Relaxed); + } + + // Get color based on agent mode + let tool_color = if self.is_agent_mode.load(Ordering::Relaxed) { + TOOL_COLOR_AGENT + } else { + TOOL_COLOR_NORMAL + }; + + // Print the indicator: " ● tool_name |" + print!(" \x1b[2m●\x1b[0m {}{: { + // Toggle blink state for visual feedback + let current = self.blink_state.load(Ordering::Relaxed); + if current != BLINK_INACTIVE { + let new_state = if current == BLINK_SHOW_PIPE { BLINK_SHOW_SPACE } else { BLINK_SHOW_PIPE }; + self.blink_state.store(new_state, Ordering::Relaxed); + let indicator = if new_state == BLINK_SHOW_PIPE { "|" } else { " " }; + // Move back one char and reprint + print!("\x1b[1D\x1b[2m{}\x1b[0m", indicator); + let _ = io::stdout().flush(); + } + } + ToolParsingHint::Complete => { + // Stop blinking + self.blink_state.store(BLINK_INACTIVE, Ordering::Relaxed); + // Clear the parsing indicator line - the actual tool output will follow + if self.parsing_indicator_printed.load(Ordering::Relaxed) { + // Clear the current line and move to start + print!("\r\x1b[2K"); + let _ = io::stdout().flush(); + } + self.clear(); + } + } + } +} + /// Console implementation of UiWriter that prints to stdout pub struct ConsoleUiWriter { current_tool_name: std::sync::Mutex>, current_tool_args: std::sync::Mutex>, current_output_line: std::sync::Mutex>, output_line_printed: std::sync::Mutex, - is_agent_mode: std::sync::Mutex, /// Track if we're in shell compact mode (for appending timing to output line) is_shell_compact: std::sync::Mutex, /// Streaming markdown formatter for agent responses markdown_formatter: Mutex>, - /// Track if the last output was text (for spacing between text and tool calls) - last_output_was_text: std::sync::Mutex, - /// Track if the last output was a tool call (for spacing between tool calls and text) - last_output_was_tool: std::sync::Mutex, /// Track the last read_file path for continuation display last_read_file_path: std::sync::Mutex>, + /// Shared state for tool parsing hints (used by real-time callback) + hint_state: ParsingHintState, } /// ANSI color code for duration display based on elapsed time. @@ -61,6 +163,7 @@ impl ConsoleUiWriter { *self.current_output_line.lock().unwrap() = None; *self.output_line_printed.lock().unwrap() = false; } + } impl ConsoleUiWriter { @@ -70,12 +173,10 @@ impl ConsoleUiWriter { current_tool_args: std::sync::Mutex::new(Vec::new()), current_output_line: std::sync::Mutex::new(None), output_line_printed: std::sync::Mutex::new(false), - is_agent_mode: std::sync::Mutex::new(false), is_shell_compact: std::sync::Mutex::new(false), markdown_formatter: Mutex::new(None), - last_output_was_text: std::sync::Mutex::new(false), - last_output_was_tool: std::sync::Mutex::new(false), last_read_file_path: std::sync::Mutex::new(None), + hint_state: ParsingHintState::new(), } } } @@ -163,14 +264,17 @@ impl UiWriter for ConsoleUiWriter { } fn print_tool_output_header(&self) { + // Clear any streaming hint that might be showing + // This ensures we don't duplicate the tool name on the line + self.hint_state.handle_hint(ToolParsingHint::Complete); + // Add blank line if last output was text (for visual separation) - let mut last_was_text = self.last_output_was_text.lock().unwrap(); - if *last_was_text { + let last_was_text = self.hint_state.last_output_was_text.load(Ordering::Relaxed); + if last_was_text { println!(); } - *last_was_text = false; // We're now outputting a tool call - *self.last_output_was_tool.lock().unwrap() = true; - drop(last_was_text); // Release lock early + self.hint_state.last_output_was_text.store(false, Ordering::Relaxed); + self.hint_state.last_output_was_tool.store(true, Ordering::Relaxed); // Reset output_line_printed at the start of a new tool output // This ensures the header isn't cleared by update_tool_output_line @@ -179,9 +283,13 @@ impl UiWriter for ConsoleUiWriter { *self.is_shell_compact.lock().unwrap() = false; // Now print the tool header with the most important arg // Use light gray/silver in agent mode, bold green otherwise - let is_agent_mode = *self.is_agent_mode.lock().unwrap(); + let is_agent_mode = self.hint_state.is_agent_mode.load(Ordering::Relaxed); // Light gray/silver: \x1b[38;5;250m, Bold green: \x1b[1;32m - let tool_color = if is_agent_mode { "\x1b[1;38;5;250m" } else { "\x1b[1;32m" }; + let tool_color = if is_agent_mode { + TOOL_COLOR_AGENT_BOLD + } else { + TOOL_COLOR_NORMAL_BOLD + }; if let Some(tool_name) = self.current_tool_name.lock().unwrap().as_ref() { let args = self.current_tool_args.lock().unwrap(); @@ -323,6 +431,10 @@ impl UiWriter for ConsoleUiWriter { } fn print_tool_compact(&self, tool_name: &str, summary: &str, duration_str: &str, tokens_delta: u32, _context_percentage: f32) -> bool { + // Clear any streaming hint that might be showing + // This ensures we don't duplicate the tool name on the line + self.hint_state.handle_hint(ToolParsingHint::Complete); + // Handle file operation tools and other compact tools let is_compact_tool = matches!(tool_name, "read_file" | "write_file" | "str_replace" | "remember" | "screenshot" | "coverage" | "rehydrate" | "code_search"); if !is_compact_tool { @@ -332,15 +444,14 @@ impl UiWriter for ConsoleUiWriter { } // Add blank line if last output was text (for visual separation) - let mut last_was_text = self.last_output_was_text.lock().unwrap(); - if *last_was_text { + if self.hint_state.last_output_was_text.load(Ordering::Relaxed) { println!(); } - *last_was_text = false; // We're now outputting a tool call - *self.last_output_was_tool.lock().unwrap() = true; + self.hint_state.last_output_was_text.store(false, Ordering::Relaxed); + self.hint_state.last_output_was_tool.store(true, Ordering::Relaxed); let args = self.current_tool_args.lock().unwrap(); - let is_agent_mode = *self.is_agent_mode.lock().unwrap(); + let is_agent_mode = self.hint_state.is_agent_mode.load(Ordering::Relaxed); // Get file path (for file operation tools) let file_path = args @@ -422,7 +533,7 @@ impl UiWriter for ConsoleUiWriter { }; // Color for tool name - let tool_color = if is_agent_mode { "\x1b[38;5;250m" } else { "\x1b[32m" }; + let tool_color = if is_agent_mode { TOOL_COLOR_AGENT } else { TOOL_COLOR_NORMAL }; // Print compact single line if is_continuation { @@ -469,29 +580,26 @@ impl UiWriter for ConsoleUiWriter { fn print_todo_compact(&self, content: Option<&str>, is_write: bool) -> bool { let tool_name = if is_write { "todo_write" } else { "todo_read" }; - let is_agent_mode = *self.is_agent_mode.lock().unwrap(); - let tool_color = if is_agent_mode { "\x1b[38;5;250m" } else { "\x1b[32m" }; + let is_agent_mode = self.hint_state.is_agent_mode.load(Ordering::Relaxed); + let tool_color = if is_agent_mode { TOOL_COLOR_AGENT } else { TOOL_COLOR_NORMAL }; // Add blank line if last output was text (for visual separation) - let mut last_was_text = self.last_output_was_text.lock().unwrap(); - if *last_was_text { + if self.hint_state.last_output_was_text.load(Ordering::Relaxed) { println!(); } - *last_was_text = false; - *self.last_output_was_tool.lock().unwrap() = true; + self.hint_state.last_output_was_text.store(false, Ordering::Relaxed); + self.hint_state.last_output_was_tool.store(true, Ordering::Relaxed); // Reset read_file continuation tracking *self.last_read_file_path.lock().unwrap() = None; match content { None => { // Empty TODO - // Pad to align with longest compact tool (str_replace = 11 chars) - println!(" \x1b[2m●\x1b[0m {}{:<11}\x1b[0m \x1b[2m|\x1b[0m \x1b[35mempty\x1b[0m", tool_color, tool_name); + println!(" \x1b[2m●\x1b[0m {}{: { // Header - // Pad to align with longest compact tool (str_replace = 11 chars) - println!(" \x1b[2m●\x1b[0m {}{:<11}\x1b[0m", tool_color, tool_name); + println!(" \x1b[2m●\x1b[0m {}{: = text.lines().collect(); let last_idx = lines.len().saturating_sub(1); @@ -574,18 +682,17 @@ impl UiWriter for ConsoleUiWriter { if let Some(ref mut formatter) = *formatter_guard { // Add blank line if last output was a tool call (for visual separation) // Only do this once at the start of new text content - let mut last_was_tool = self.last_output_was_tool.lock().unwrap(); - if *last_was_tool && !content.trim().is_empty() { + let last_was_tool = self.hint_state.last_output_was_tool.load(Ordering::Relaxed); + if last_was_tool && !content.trim().is_empty() { println!(); - *last_was_tool = false; + self.hint_state.last_output_was_tool.store(false, Ordering::Relaxed); } - drop(last_was_tool); let formatted = formatter.process(content); print!("{}", formatted); // Track that we just output text (only if non-empty) if !content.trim().is_empty() { - *self.last_output_was_text.lock().unwrap() = true; + self.hint_state.last_output_was_text.store(true, Ordering::Relaxed); // Reset read_file continuation tracking when text is output between tool calls *self.last_read_file_path.lock().unwrap() = None; } @@ -611,6 +718,16 @@ impl UiWriter for ConsoleUiWriter { // No-op for console - we don't track SSEs in console mode } + fn print_tool_streaming_hint(&self, tool_name: &str) { + // Use the hint state to show the streaming indicator + self.hint_state.handle_hint(ToolParsingHint::Detected(tool_name.to_string())); + } + + fn print_tool_streaming_active(&self) { + // Trigger the blink animation + self.hint_state.handle_hint(ToolParsingHint::Active); + } + fn flush(&self) { let _ = io::stdout().flush(); } @@ -652,7 +769,9 @@ impl UiWriter for ConsoleUiWriter { fn filter_json_tool_calls(&self, content: &str) -> String { - // Apply JSON tool call filtering for display + // Filter the content to remove JSON tool calls from display. + // Tool streaming hints are now handled via the provider's tool_call_streaming + // field in CompletionChunk, not via callbacks during JSON filtering. filter_json_tool_calls(content) } @@ -662,6 +781,6 @@ impl UiWriter for ConsoleUiWriter { } fn set_agent_mode(&self, is_agent_mode: bool) { - *self.is_agent_mode.lock().unwrap() = is_agent_mode; + self.hint_state.is_agent_mode.store(is_agent_mode, Ordering::Relaxed); } } diff --git a/crates/g3-core/src/lib.rs b/crates/g3-core/src/lib.rs index ffa7bfd..6d6f2ae 100644 --- a/crates/g3-core/src/lib.rs +++ b/crates/g3-core/src/lib.rs @@ -1959,6 +1959,17 @@ Skip if nothing new. Be brief."#; ); } + // Handle tool call streaming hint (show UI indicator immediately) + if let Some(ref tool_name) = chunk.tool_call_streaming { + if tool_name.is_empty() { + // Empty string = "active" hint for blinking + self.ui_writer.print_tool_streaming_active(); + } else { + // Non-empty = "detected" hint with tool name + self.ui_writer.print_tool_streaming_hint(tool_name); + } + } + // Store raw chunk for debugging (limit to first 20 and last 5) if chunks_received < 20 || chunk.finished { raw_chunks.push(format!( diff --git a/crates/g3-core/src/streaming_parser.rs b/crates/g3-core/src/streaming_parser.rs index bf1a6bb..6b8cba6 100644 --- a/crates/g3-core/src/streaming_parser.rs +++ b/crates/g3-core/src/streaming_parser.rs @@ -587,7 +587,8 @@ Some text after"#; finished: true, tool_calls: None, usage: None, - stop_reason: None, + stop_reason: None, + tool_call_streaming: None, }; let tools = parser.process_chunk(&chunk); diff --git a/crates/g3-core/src/ui_writer.rs b/crates/g3-core/src/ui_writer.rs index ee12238..f6e293c 100644 --- a/crates/g3-core/src/ui_writer.rs +++ b/crates/g3-core/src/ui_writer.rs @@ -67,6 +67,14 @@ pub trait UiWriter: Send + Sync { /// Notify that an SSE event was received (including pings) fn notify_sse_received(&self); + /// Print a hint that a tool call is being streamed (show indicator immediately) + /// This is called when the provider starts receiving a tool call but args are still streaming + fn print_tool_streaming_hint(&self, tool_name: &str); + + /// Signal that a tool call is still actively streaming (for blinking indicator) + /// This is called periodically while tool args are being received + fn print_tool_streaming_active(&self); + /// Flush any buffered output fn flush(&self); @@ -127,6 +135,8 @@ impl UiWriter for NullUiWriter { fn print_agent_prompt(&self) {} fn print_agent_response(&self, _content: &str) {} fn notify_sse_received(&self) {} + fn print_tool_streaming_hint(&self, _tool_name: &str) {} + fn print_tool_streaming_active(&self) {} fn flush(&self) {} fn finish_streaming_markdown(&self) {} fn wants_full_output(&self) -> bool { diff --git a/crates/g3-core/tests/duplicate_detection_test.rs b/crates/g3-core/tests/duplicate_detection_test.rs index 4d55cee..8d1a5e6 100644 --- a/crates/g3-core/tests/duplicate_detection_test.rs +++ b/crates/g3-core/tests/duplicate_detection_test.rs @@ -14,6 +14,7 @@ fn chunk(content: &str, finished: bool) -> CompletionChunk { tool_calls: None, usage: None, stop_reason: None, + tool_call_streaming: None, } } diff --git a/crates/g3-core/tests/incomplete_tool_call_test.rs b/crates/g3-core/tests/incomplete_tool_call_test.rs index a0e4a3d..580e41e 100644 --- a/crates/g3-core/tests/incomplete_tool_call_test.rs +++ b/crates/g3-core/tests/incomplete_tool_call_test.rs @@ -18,6 +18,7 @@ fn test_has_incomplete_tool_call_no_tool_pattern() { tool_calls: None, usage: None, stop_reason: None, + tool_call_streaming: None, }; parser.process_chunk(&chunk); assert!(!parser.has_incomplete_tool_call()); @@ -32,6 +33,7 @@ fn test_has_incomplete_tool_call_complete_tool_call() { tool_calls: None, usage: None, stop_reason: None, + tool_call_streaming: None, }; parser.process_chunk(&chunk); // Complete JSON should NOT be detected as incomplete @@ -48,6 +50,7 @@ fn test_has_incomplete_tool_call_truncated_tool_call() { tool_calls: None, usage: None, stop_reason: None, + tool_call_streaming: None, }; parser.process_chunk(&chunk); // Incomplete JSON should be detected @@ -64,6 +67,7 @@ fn test_has_incomplete_tool_call_truncated_mid_value() { tool_calls: None, usage: None, stop_reason: None, + tool_call_streaming: None, }; parser.process_chunk(&chunk); // Incomplete JSON should be detected @@ -82,6 +86,7 @@ fn test_has_incomplete_tool_call_with_text_before() { tool_calls: None, usage: None, stop_reason: None, + tool_call_streaming: None, }; parser.process_chunk(&chunk); // Incomplete JSON should be detected @@ -99,6 +104,7 @@ fn test_has_incomplete_tool_call_malformed_like_trace() { tool_calls: None, usage: None, stop_reason: None, + tool_call_streaming: None, }; parser.process_chunk(&chunk); // Truncated JSON (missing closing braces) should be detected as incomplete @@ -120,6 +126,7 @@ fn test_has_unexecuted_tool_call_no_tool_pattern() { tool_calls: None, usage: None, stop_reason: None, + tool_call_streaming: None, }; parser.process_chunk(&chunk); assert!(!parser.has_unexecuted_tool_call()); @@ -134,6 +141,7 @@ fn test_has_unexecuted_tool_call_complete_tool_call() { tool_calls: None, usage: None, stop_reason: None, + tool_call_streaming: None, }; parser.process_chunk(&chunk); // Complete JSON tool call that wasn't executed should be detected @@ -149,6 +157,7 @@ fn test_has_unexecuted_tool_call_incomplete_json() { tool_calls: None, usage: None, stop_reason: None, + tool_call_streaming: None, }; parser.process_chunk(&chunk); // Incomplete JSON should NOT be detected as unexecuted (it's incomplete, not unexecuted) @@ -167,6 +176,7 @@ Some trailing text after the JSON"#.to_string(), tool_calls: None, usage: None, stop_reason: None, + tool_call_streaming: None, }; parser.process_chunk(&chunk); // Complete JSON tool call should be detected even with trailing text @@ -186,6 +196,7 @@ I'll execute this command now."#.to_string(), tool_calls: None, usage: None, stop_reason: None, + tool_call_streaming: None, }; parser.process_chunk(&chunk); // Complete JSON tool call should be detected diff --git a/crates/g3-core/tests/parser_sanitization_test.rs b/crates/g3-core/tests/parser_sanitization_test.rs index a94cb11..eaa6351 100644 --- a/crates/g3-core/tests/parser_sanitization_test.rs +++ b/crates/g3-core/tests/parser_sanitization_test.rs @@ -354,6 +354,7 @@ mod streaming_repro { tool_calls: None, usage: None, stop_reason: None, + tool_call_streaming: None, } } diff --git a/crates/g3-core/tests/stream_completion_characterization_test.rs b/crates/g3-core/tests/stream_completion_characterization_test.rs index 13fc7c1..33be9b6 100644 --- a/crates/g3-core/tests/stream_completion_characterization_test.rs +++ b/crates/g3-core/tests/stream_completion_characterization_test.rs @@ -46,6 +46,7 @@ mod streaming_parser_characterization { tool_calls: None, usage: None, stop_reason: None, + tool_call_streaming: None, }; let tools = parser.process_chunk(&chunk); @@ -67,6 +68,7 @@ mod streaming_parser_characterization { tool_calls: None, usage: None, stop_reason: None, + tool_call_streaming: None, }; let tools1 = parser.process_chunk(&chunk1); assert!(tools1.is_empty(), "No tool call yet"); @@ -78,6 +80,7 @@ mod streaming_parser_characterization { tool_calls: None, usage: None, stop_reason: None, + tool_call_streaming: None, }; let tools2 = parser.process_chunk(&chunk2); assert_eq!(tools2.len(), 1, "Should detect tool call"); @@ -101,6 +104,7 @@ mod streaming_parser_characterization { tool_calls: None, usage: None, stop_reason: None, + tool_call_streaming: None, }; parser.process_chunk(&chunk); @@ -122,6 +126,7 @@ mod streaming_parser_characterization { tool_calls: None, usage: None, stop_reason: None, + tool_call_streaming: None, }; // Process but don't execute @@ -144,6 +149,7 @@ mod streaming_parser_characterization { tool_calls: None, usage: None, stop_reason: None, + tool_call_streaming: None, }; let _tools = parser.process_chunk(&chunk); @@ -168,6 +174,7 @@ mod streaming_parser_characterization { tool_calls: None, usage: None, stop_reason: None, + tool_call_streaming: None, }; parser.process_chunk(&chunk); diff --git a/crates/g3-core/tests/streaming_parser_test.rs b/crates/g3-core/tests/streaming_parser_test.rs index d83b62e..1e8aec0 100644 --- a/crates/g3-core/tests/streaming_parser_test.rs +++ b/crates/g3-core/tests/streaming_parser_test.rs @@ -18,6 +18,7 @@ fn chunk(content: &str, finished: bool) -> CompletionChunk { tool_calls: None, usage: None, stop_reason: None, + tool_call_streaming: None, } } diff --git a/crates/g3-core/tests/todo_staleness_test.rs b/crates/g3-core/tests/todo_staleness_test.rs index b2077b1..5f786ba 100644 --- a/crates/g3-core/tests/todo_staleness_test.rs +++ b/crates/g3-core/tests/todo_staleness_test.rs @@ -81,6 +81,8 @@ impl UiWriter for MockUiWriter { .push(format!("CHOICE: {} Options: {:?}", message, options)); self.choice_responses.lock().unwrap().pop().unwrap_or(0) } + fn print_tool_streaming_hint(&self, _tool_name: &str) {} + fn print_tool_streaming_active(&self) {} } #[tokio::test] diff --git a/crates/g3-core/tests/tool_result_poisoning_test.rs b/crates/g3-core/tests/tool_result_poisoning_test.rs index f8a22ea..7599032 100644 --- a/crates/g3-core/tests/tool_result_poisoning_test.rs +++ b/crates/g3-core/tests/tool_result_poisoning_test.rs @@ -18,6 +18,7 @@ fn chunk(content: &str, finished: bool) -> CompletionChunk { tool_calls: None, usage: None, stop_reason: None, + tool_call_streaming: None, } } diff --git a/crates/g3-planner/src/llm.rs b/crates/g3-planner/src/llm.rs index 16691cd..5a3fb04 100644 --- a/crates/g3-planner/src/llm.rs +++ b/crates/g3-planner/src/llm.rs @@ -292,6 +292,14 @@ impl g3_core::ui_writer::UiWriter for PlannerUiWriter { // The "Thinking..." status was causing overwrites } + fn print_tool_streaming_hint(&self, _tool_name: &str) { + // No-op for planner - we don't show streaming hints + } + + fn print_tool_streaming_active(&self) { + // No-op for planner - we don't show streaming hints + } + fn flush(&self) { use std::io::Write; std::io::stdout().flush().ok(); diff --git a/crates/g3-providers/src/anthropic.rs b/crates/g3-providers/src/anthropic.rs index a1d4dc3..9835ed1 100644 --- a/crates/g3-providers/src/anthropic.rs +++ b/crates/g3-providers/src/anthropic.rs @@ -112,7 +112,7 @@ use tracing::{debug, error}; use crate::{ streaming::{ decode_utf8_streaming, make_final_chunk, make_final_chunk_with_reason, make_text_chunk, - make_tool_chunk, + make_tool_chunk, make_tool_streaming_active, make_tool_streaming_hint, }, CompletionChunk, CompletionRequest, CompletionResponse, CompletionStream, LLMProvider, Message, MessageRole, Tool, ToolCall, Usage, @@ -512,6 +512,12 @@ impl AnthropicProvider { } else { // Arguments are empty, we'll accumulate them from partial_json debug!("Tool call has empty args, will accumulate from partial_json"); + // Send a streaming hint so the UI can show the tool name immediately + let hint_chunk = make_tool_streaming_hint(name.clone()); + if tx.send(Ok(hint_chunk)).await.is_err() { + debug!("Receiver dropped, stopping stream"); + return accumulated_usage; + } current_tool_calls.push(tool_call); partial_tool_json.clear(); } @@ -550,6 +556,12 @@ impl AnthropicProvider { "Accumulated tool JSON: {}", partial_tool_json ); + // Send an active hint to trigger UI blink + let active_chunk = make_tool_streaming_active(); + if tx.send(Ok(active_chunk)).await.is_err() { + debug!("Receiver dropped, stopping stream"); + return accumulated_usage; + } } } } diff --git a/crates/g3-providers/src/databricks.rs b/crates/g3-providers/src/databricks.rs index bb25893..db0dbcc 100644 --- a/crates/g3-providers/src/databricks.rs +++ b/crates/g3-providers/src/databricks.rs @@ -494,6 +494,7 @@ impl DatabricksProvider { usage: None, tool_calls: None, stop_reason: None, + tool_call_streaming: None, }; if tx.send(Ok(text_chunk)).await.is_err() { debug!("Receiver dropped"); diff --git a/crates/g3-providers/src/lib.rs b/crates/g3-providers/src/lib.rs index 8dbc774..ccfc6f5 100644 --- a/crates/g3-providers/src/lib.rs +++ b/crates/g3-providers/src/lib.rs @@ -205,6 +205,8 @@ pub struct CompletionChunk { pub usage: Option, // Add usage tracking for streaming /// Stop reason from the API (e.g., "end_turn", "max_tokens", "stop_sequence") pub stop_reason: Option, + /// Tool call currently being streamed (name only, for UI hint) + pub tool_call_streaming: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/crates/g3-providers/src/openai.rs b/crates/g3-providers/src/openai.rs index 8dd05b0..c95471f 100644 --- a/crates/g3-providers/src/openai.rs +++ b/crates/g3-providers/src/openai.rs @@ -158,6 +158,7 @@ impl OpenAIProvider { tool_calls, usage: accumulated_usage.clone(), stop_reason: None, // TODO: Extract from OpenAI response + tool_call_streaming: None, }; let _ = tx.send(Ok(final_chunk)).await; } diff --git a/crates/g3-providers/src/streaming.rs b/crates/g3-providers/src/streaming.rs index 15a3762..1fcecf3 100644 --- a/crates/g3-providers/src/streaming.rs +++ b/crates/g3-providers/src/streaming.rs @@ -62,6 +62,7 @@ pub fn make_final_chunk(tool_calls: Vec, usage: Option) -> Comp Some(tool_calls) }, stop_reason: None, + tool_call_streaming: None, } } @@ -77,6 +78,7 @@ pub fn make_final_chunk_with_reason(tool_calls: Vec, usage: Option CompletionChunk { usage: None, tool_calls: None, stop_reason: None, + tool_call_streaming: None, } } @@ -99,5 +102,31 @@ pub fn make_tool_chunk(tool_calls: Vec) -> CompletionChunk { usage: None, tool_calls: Some(tool_calls), stop_reason: None, + tool_call_streaming: None, + } +} + +/// Create a hint chunk indicating a tool call is being streamed. +pub fn make_tool_streaming_hint(tool_name: String) -> CompletionChunk { + CompletionChunk { + content: String::new(), + finished: false, + usage: None, + tool_calls: None, + stop_reason: None, + tool_call_streaming: Some(tool_name), + } +} + +/// Create a hint chunk indicating a tool call is still actively streaming. +/// This is used to trigger UI updates (like blinking indicators) during long tool calls. +pub fn make_tool_streaming_active() -> CompletionChunk { + CompletionChunk { + content: String::new(), + finished: false, + usage: None, + tool_calls: None, + stop_reason: None, + tool_call_streaming: Some(String::new()), // Empty string signals "active" vs "detected" } }