diff --git a/crates/g3-core/src/lib.rs b/crates/g3-core/src/lib.rs index e0d5a57..43c4548 100644 --- a/crates/g3-core/src/lib.rs +++ b/crates/g3-core/src/lib.rs @@ -3135,17 +3135,33 @@ Template: } Err(e) => { // Capture detailed streaming error information - let error_details = - format!("Streaming error at chunk {}: {}", chunks_received + 1, e); - error!("{}", error_details); + let error_msg = e.to_string(); + let error_details = format!("Streaming error at chunk {}: {}", chunks_received + 1, error_msg); + error!("Error type: {}", std::any::type_name_of_val(&e)); error!("Parser state at error: text_buffer_len={}, native_tool_calls={}, message_stopped={}", parser.text_buffer_len(), parser.native_tool_calls.len(), parser.is_message_stopped()); // Store the error for potential logging later - _last_error = Some(error_details); + _last_error = Some(error_details.clone()); + + // Check if this is a recoverable connection error + let is_connection_error = error_msg.contains("unexpected EOF") + || error_msg.contains("connection") + || error_msg.contains("chunk size line") + || error_msg.contains("body error"); + + if is_connection_error { + warn!("Connection error at chunk {}, treating as end of stream", chunks_received + 1); + // If we have any content or tool calls, treat this as a graceful end + if chunks_received > 0 && (!parser.get_text_content().is_empty() || parser.native_tool_calls.len() > 0) { + warn!("Stream terminated unexpectedly but we have content, continuing"); + break; // Break to process what we have + } + } if tool_executed { + error!("{}", error_details); warn!("Stream error after tool execution, attempting to continue"); break; // Break to outer loop to start new stream } else { diff --git a/crates/g3-providers/src/databricks.rs b/crates/g3-providers/src/databricks.rs index 50373d6..962822f 100644 --- a/crates/g3-providers/src/databricks.rs +++ b/crates/g3-providers/src/databricks.rs @@ -298,6 +298,7 @@ impl DatabricksProvider { let mut current_tool_calls: std::collections::HashMap = std::collections::HashMap::new(); // index -> (id, name, args) let mut incomplete_data_line = String::new(); // Buffer for incomplete data: lines + let mut chunk_count = 0; let accumulated_usage: Option = None; let mut byte_buffer = Vec::new(); // Buffer for incomplete UTF-8 sequences @@ -305,6 +306,8 @@ impl DatabricksProvider { match chunk_result { Ok(chunk) => { // Debug: Log raw bytes received + chunk_count += 1; + debug!("Processing chunk #{}", chunk_count); debug!("Raw SSE bytes received: {} bytes", chunk.len()); // Append new bytes to our buffer @@ -589,13 +592,39 @@ impl DatabricksProvider { } } Err(e) => { - error!("Stream error: {}", e); - let _ = tx.send(Err(anyhow!("Stream error: {}", e))).await; + error!("Stream error at chunk {}: {}", chunk_count, e); + + // Check if this is a connection error that might be recoverable + let error_msg = e.to_string(); + if error_msg.contains("unexpected EOF") || error_msg.contains("connection") { + warn!("Connection terminated unexpectedly at chunk {}, treating as end of stream", chunk_count); + // Don't send error, just break and finalize + break; + } else { + let _ = tx.send(Err(anyhow!("Stream error: {}", e))).await; + } return accumulated_usage; } } } + // Log final state + debug!("Stream ended after {} chunks", chunk_count); + debug!("Final state: buffer_len={}, incomplete_data_line_len={}, byte_buffer_len={}", + buffer.len(), incomplete_data_line.len(), byte_buffer.len()); + debug!("Accumulated tool calls: {}", current_tool_calls.len()); + + // If we have any remaining data in buffers, log it for debugging + if !buffer.is_empty() { + debug!("Remaining buffer content: {:?}", buffer); + } + if !byte_buffer.is_empty() { + debug!("Remaining byte buffer: {} bytes", byte_buffer.len()); + } + if !incomplete_data_line.is_empty() { + debug!("Remaining incomplete data line: {:?}", incomplete_data_line); + } + // If we have any incomplete data line at the end, try to process it if !incomplete_data_line.is_empty() { debug!(