refactor(g3-core): extract 4 modules from monolithic lib.rs
Reduce lib.rs from 7481 to 6557 lines (-12.4%) by extracting: - paths.rs: Session/workspace path utilities (get_todo_path, get_logs_dir, etc.) - streaming_parser.rs: StreamingToolParser for LLM response parsing - utils.rs: Diff parsing and shell escaping utilities - webdriver_session.rs: Unified Safari/Chrome WebDriver abstraction All public APIs preserved via re-exports for backward compatibility. Added 13 new unit tests across extracted modules. All 225 tests pass.
This commit is contained in:
413
crates/g3-core/src/streaming_parser.rs
Normal file
413
crates/g3-core/src/streaming_parser.rs
Normal file
@@ -0,0 +1,413 @@
|
||||
//! Streaming tool parser for processing LLM response chunks.
|
||||
//!
|
||||
//! This module handles parsing of tool calls from streaming LLM responses,
|
||||
//! supporting both native tool calls and JSON-based fallback parsing.
|
||||
|
||||
use tracing::debug;
|
||||
|
||||
use crate::ToolCall;
|
||||
|
||||
/// Patterns used to detect JSON tool calls in text.
|
||||
/// These cover common whitespace variations in JSON formatting.
|
||||
const TOOL_CALL_PATTERNS: [&str; 4] = [
|
||||
r#"{"tool":"#,
|
||||
r#"{ "tool":"#,
|
||||
r#"{"tool" :"#,
|
||||
r#"{ "tool" :"#,
|
||||
];
|
||||
|
||||
/// Modern streaming tool parser that properly handles native tool calls and SSE chunks.
|
||||
#[derive(Debug)]
|
||||
pub struct StreamingToolParser {
|
||||
/// Buffer for accumulating text content
|
||||
text_buffer: String,
|
||||
/// Position in text_buffer up to which tool calls have been consumed/executed.
|
||||
/// This prevents has_unexecuted_tool_call() from returning true for already-executed tools.
|
||||
last_consumed_position: usize,
|
||||
/// Whether we've received a message_stop event
|
||||
message_stopped: bool,
|
||||
/// Whether we're currently in a JSON tool call (for fallback parsing)
|
||||
in_json_tool_call: bool,
|
||||
/// Start position of JSON tool call (for fallback parsing)
|
||||
json_tool_start: Option<usize>,
|
||||
}
|
||||
|
||||
impl Default for StreamingToolParser {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
||||
|
||||
impl StreamingToolParser {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
text_buffer: String::new(),
|
||||
last_consumed_position: 0,
|
||||
message_stopped: false,
|
||||
in_json_tool_call: false,
|
||||
json_tool_start: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Find the starting position of the last tool call pattern in the given text.
|
||||
/// Returns None if no tool call pattern is found.
|
||||
fn find_last_tool_call_start(text: &str) -> Option<usize> {
|
||||
let mut best_start: Option<usize> = None;
|
||||
for pattern in &TOOL_CALL_PATTERNS {
|
||||
if let Some(pos) = text.rfind(pattern) {
|
||||
if best_start.map_or(true, |best| pos > best) {
|
||||
best_start = Some(pos);
|
||||
}
|
||||
}
|
||||
}
|
||||
best_start
|
||||
}
|
||||
|
||||
/// Find the starting position of the FIRST tool call pattern in the given text.
|
||||
/// Returns None if no tool call pattern is found.
|
||||
fn find_first_tool_call_start(text: &str) -> Option<usize> {
|
||||
let mut best_start: Option<usize> = None;
|
||||
for pattern in &TOOL_CALL_PATTERNS {
|
||||
if let Some(pos) = text.find(pattern) {
|
||||
if best_start.map_or(true, |best| pos < best) {
|
||||
best_start = Some(pos);
|
||||
}
|
||||
}
|
||||
}
|
||||
best_start
|
||||
}
|
||||
|
||||
/// Validate that tool call args don't contain message-like content.
|
||||
/// This detects malformed tool calls where agent messages got mixed into args.
|
||||
fn has_message_like_keys(args: &serde_json::Map<String, serde_json::Value>) -> bool {
|
||||
args.keys().any(|key| {
|
||||
key.len() > 100
|
||||
|| key.contains('\n')
|
||||
|| key.contains("I'll")
|
||||
|| key.contains("Let me")
|
||||
|| key.contains("Here's")
|
||||
|| key.contains("I can")
|
||||
|| key.contains("I need")
|
||||
|| key.contains("First")
|
||||
|| key.contains("Now")
|
||||
|| key.contains("The ")
|
||||
})
|
||||
}
|
||||
|
||||
/// Process a streaming chunk and return completed tool calls if any.
|
||||
pub fn process_chunk(&mut self, chunk: &g3_providers::CompletionChunk) -> Vec<ToolCall> {
|
||||
let mut completed_tools = Vec::new();
|
||||
|
||||
// Add text content to buffer
|
||||
if !chunk.content.is_empty() {
|
||||
self.text_buffer.push_str(&chunk.content);
|
||||
}
|
||||
|
||||
// Handle native tool calls - return them immediately when received.
|
||||
// This allows tools to be executed as soon as they're fully parsed,
|
||||
// preventing duplicate tool calls from being accumulated.
|
||||
if let Some(ref tool_calls) = chunk.tool_calls {
|
||||
debug!("Received native tool calls: {:?}", tool_calls);
|
||||
|
||||
// Convert and return tool calls immediately
|
||||
for tool_call in tool_calls {
|
||||
let converted_tool = ToolCall {
|
||||
tool: tool_call.tool.clone(),
|
||||
args: tool_call.args.clone(),
|
||||
};
|
||||
completed_tools.push(converted_tool);
|
||||
}
|
||||
}
|
||||
|
||||
// Check if message is finished/stopped
|
||||
if chunk.finished {
|
||||
self.message_stopped = true;
|
||||
debug!("Message finished, processing accumulated tool calls");
|
||||
|
||||
// When stream finishes, find ALL JSON tool calls in the accumulated buffer
|
||||
if completed_tools.is_empty() && !self.text_buffer.is_empty() {
|
||||
let all_tools = self.try_parse_all_json_tool_calls_from_buffer();
|
||||
if !all_tools.is_empty() {
|
||||
debug!(
|
||||
"Found {} JSON tool calls in buffer at stream end",
|
||||
all_tools.len()
|
||||
);
|
||||
completed_tools.extend(all_tools);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Fallback: Try to parse JSON tool calls from current chunk content if no native tool calls
|
||||
if completed_tools.is_empty() && !chunk.content.is_empty() && !chunk.finished {
|
||||
if let Some(json_tool) = self.try_parse_json_tool_call(&chunk.content) {
|
||||
completed_tools.push(json_tool);
|
||||
}
|
||||
}
|
||||
|
||||
completed_tools
|
||||
}
|
||||
|
||||
/// Fallback method to parse JSON tool calls from text content.
|
||||
fn try_parse_json_tool_call(&mut self, _content: &str) -> Option<ToolCall> {
|
||||
// If we're not currently in a JSON tool call, look for the start
|
||||
if !self.in_json_tool_call {
|
||||
if let Some(pos) = Self::find_last_tool_call_start(&self.text_buffer) {
|
||||
debug!("Found JSON tool call pattern at position {}", pos);
|
||||
self.in_json_tool_call = true;
|
||||
self.json_tool_start = Some(pos);
|
||||
}
|
||||
}
|
||||
|
||||
// If we're in a JSON tool call, try to find the end and parse it
|
||||
if self.in_json_tool_call {
|
||||
if let Some(start_pos) = self.json_tool_start {
|
||||
let json_text = &self.text_buffer[start_pos..];
|
||||
|
||||
// Try to find a complete JSON object
|
||||
if let Some(end_pos) = Self::find_complete_json_object_end(json_text) {
|
||||
let json_str = &json_text[..=end_pos];
|
||||
debug!("Attempting to parse JSON tool call: {}", json_str);
|
||||
|
||||
// Try to parse as a ToolCall
|
||||
if let Ok(tool_call) = serde_json::from_str::<ToolCall>(json_str) {
|
||||
// Validate that args is an object with reasonable keys
|
||||
if let Some(args_obj) = tool_call.args.as_object() {
|
||||
if Self::has_message_like_keys(args_obj) {
|
||||
debug!(
|
||||
"Detected malformed tool call with message-like keys, skipping"
|
||||
);
|
||||
self.in_json_tool_call = false;
|
||||
self.json_tool_start = None;
|
||||
return None;
|
||||
}
|
||||
|
||||
debug!("Successfully parsed valid JSON tool call: {:?}", tool_call);
|
||||
self.in_json_tool_call = false;
|
||||
self.json_tool_start = None;
|
||||
return Some(tool_call);
|
||||
}
|
||||
debug!("Tool call args is not an object, skipping");
|
||||
} else {
|
||||
debug!("Failed to parse JSON tool call: {}", json_str);
|
||||
}
|
||||
// Reset and continue looking
|
||||
self.in_json_tool_call = false;
|
||||
self.json_tool_start = None;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
None
|
||||
}
|
||||
|
||||
/// Parse ALL JSON tool calls from the accumulated text buffer.
|
||||
/// This finds all complete tool calls, not just the last one.
|
||||
fn try_parse_all_json_tool_calls_from_buffer(&self) -> Vec<ToolCall> {
|
||||
let mut tool_calls = Vec::new();
|
||||
let mut search_start = 0;
|
||||
|
||||
while search_start < self.text_buffer.len() {
|
||||
let search_text = &self.text_buffer[search_start..];
|
||||
|
||||
// Find the next tool call pattern
|
||||
if let Some(relative_pos) = Self::find_first_tool_call_start(search_text) {
|
||||
let abs_start = search_start + relative_pos;
|
||||
let json_text = &self.text_buffer[abs_start..];
|
||||
|
||||
// Try to find a complete JSON object
|
||||
if let Some(end_pos) = Self::find_complete_json_object_end(json_text) {
|
||||
let json_str = &json_text[..=end_pos];
|
||||
|
||||
if let Ok(tool_call) = serde_json::from_str::<ToolCall>(json_str) {
|
||||
if let Some(args_obj) = tool_call.args.as_object() {
|
||||
if !Self::has_message_like_keys(args_obj) {
|
||||
debug!(
|
||||
"Found tool call at position {}: {:?}",
|
||||
abs_start, tool_call.tool
|
||||
);
|
||||
tool_calls.push(tool_call);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Move past this tool call
|
||||
search_start = abs_start + end_pos + 1;
|
||||
} else {
|
||||
// Incomplete JSON, stop searching
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
// No more tool call patterns found
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
tool_calls
|
||||
}
|
||||
|
||||
/// Get the accumulated text content (excluding tool calls).
|
||||
pub fn get_text_content(&self) -> &str {
|
||||
&self.text_buffer
|
||||
}
|
||||
|
||||
/// Get content before a specific position (for display purposes).
|
||||
pub fn get_content_before_position(&self, pos: usize) -> String {
|
||||
if pos <= self.text_buffer.len() {
|
||||
self.text_buffer[..pos].to_string()
|
||||
} else {
|
||||
self.text_buffer.clone()
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if the message has been stopped/finished.
|
||||
pub fn is_message_stopped(&self) -> bool {
|
||||
self.message_stopped
|
||||
}
|
||||
|
||||
/// Check if the text buffer contains an incomplete JSON tool call.
|
||||
/// This detects cases where the LLM started emitting a tool call but the stream ended
|
||||
/// before the JSON was complete (truncated output).
|
||||
pub fn has_incomplete_tool_call(&self) -> bool {
|
||||
// Only check the unconsumed portion of the buffer
|
||||
let unchecked_buffer = &self.text_buffer[self.last_consumed_position..];
|
||||
if let Some(start_pos) = Self::find_last_tool_call_start(unchecked_buffer) {
|
||||
let json_text = &unchecked_buffer[start_pos..];
|
||||
// If NOT complete, it's an incomplete tool call
|
||||
Self::find_complete_json_object_end(json_text).is_none()
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if the text buffer contains an unexecuted tool call.
|
||||
/// This detects cases where the LLM emitted a complete tool call JSON
|
||||
/// but it wasn't parsed/executed (e.g., due to parsing issues).
|
||||
pub fn has_unexecuted_tool_call(&self) -> bool {
|
||||
// Only check the unconsumed portion of the buffer
|
||||
let unchecked_buffer = &self.text_buffer[self.last_consumed_position..];
|
||||
if let Some(start_pos) = Self::find_last_tool_call_start(unchecked_buffer) {
|
||||
let json_text = &unchecked_buffer[start_pos..];
|
||||
// If the JSON IS complete, it means there's an unexecuted tool call
|
||||
if let Some(json_end) = Self::find_complete_json_object_end(json_text) {
|
||||
let json_only = &json_text[..=json_end];
|
||||
return serde_json::from_str::<serde_json::Value>(json_only).is_ok();
|
||||
}
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Mark all tool calls up to the current buffer position as consumed/executed.
|
||||
/// This prevents has_unexecuted_tool_call() from returning true for already-executed tools.
|
||||
pub fn mark_tool_calls_consumed(&mut self) {
|
||||
self.last_consumed_position = self.text_buffer.len();
|
||||
}
|
||||
|
||||
/// Find the end position (byte index) of a complete JSON object in the text.
|
||||
/// Returns None if no complete JSON object is found.
|
||||
pub fn find_complete_json_object_end(text: &str) -> Option<usize> {
|
||||
let mut brace_count = 0;
|
||||
let mut in_string = false;
|
||||
let mut escape_next = false;
|
||||
let mut found_start = false;
|
||||
|
||||
for (i, ch) in text.char_indices() {
|
||||
if escape_next {
|
||||
escape_next = false;
|
||||
continue;
|
||||
}
|
||||
|
||||
match ch {
|
||||
'\\' => escape_next = true,
|
||||
'"' if !escape_next => in_string = !in_string,
|
||||
'{' if !in_string => {
|
||||
brace_count += 1;
|
||||
found_start = true;
|
||||
}
|
||||
'}' if !in_string => {
|
||||
brace_count -= 1;
|
||||
if brace_count == 0 && found_start {
|
||||
return Some(i); // Return the byte index of the closing brace
|
||||
}
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
None // No complete JSON object found
|
||||
}
|
||||
|
||||
/// Reset the parser state for a new message.
|
||||
pub fn reset(&mut self) {
|
||||
self.text_buffer.clear();
|
||||
self.last_consumed_position = 0;
|
||||
self.message_stopped = false;
|
||||
self.in_json_tool_call = false;
|
||||
self.json_tool_start = None;
|
||||
}
|
||||
|
||||
/// Get the current text buffer length (for position tracking).
|
||||
pub fn text_buffer_len(&self) -> usize {
|
||||
self.text_buffer.len()
|
||||
}
|
||||
|
||||
/// Check if currently parsing a JSON tool call (for debugging).
|
||||
pub fn is_in_json_tool_call(&self) -> bool {
|
||||
self.in_json_tool_call
|
||||
}
|
||||
|
||||
/// Get the JSON tool start position (for debugging).
|
||||
pub fn json_tool_start_position(&self) -> Option<usize> {
|
||||
self.json_tool_start
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_find_complete_json_object_end_simple() {
|
||||
let text = r#"{"tool":"shell","args":{"command":"ls"}}"#;
|
||||
assert_eq!(
|
||||
StreamingToolParser::find_complete_json_object_end(text),
|
||||
Some(text.len() - 1)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_find_complete_json_object_end_nested() {
|
||||
let text = r#"{"tool":"write","args":{"content":"{nested}"}}"#;
|
||||
assert_eq!(
|
||||
StreamingToolParser::find_complete_json_object_end(text),
|
||||
Some(text.len() - 1)
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_find_complete_json_object_end_incomplete() {
|
||||
let text = r#"{"tool":"shell","args":{"command":"ls""#;
|
||||
assert_eq!(StreamingToolParser::find_complete_json_object_end(text), None);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_tool_call_patterns() {
|
||||
// Test that all patterns are detected
|
||||
assert!(StreamingToolParser::find_first_tool_call_start(r#"{"tool":"test"}"#).is_some());
|
||||
assert!(StreamingToolParser::find_first_tool_call_start(r#"{ "tool":"test"}"#).is_some());
|
||||
assert!(StreamingToolParser::find_first_tool_call_start(r#"{"tool" :"test"}"#).is_some());
|
||||
assert!(StreamingToolParser::find_first_tool_call_start(r#"{ "tool" :"test"}"#).is_some());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parser_reset() {
|
||||
let mut parser = StreamingToolParser::new();
|
||||
parser.text_buffer = "some content".to_string();
|
||||
parser.message_stopped = true;
|
||||
parser.last_consumed_position = 5;
|
||||
|
||||
parser.reset();
|
||||
|
||||
assert!(parser.text_buffer.is_empty());
|
||||
assert!(!parser.message_stopped);
|
||||
assert_eq!(parser.last_consumed_position, 0);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user