fix unexpected EOF on streams

This commit is contained in:
Dhanji Prasanna
2025-11-04 16:28:41 +11:00
parent 631f3c16ca
commit 22a0090cdc
2 changed files with 51 additions and 6 deletions

View File

@@ -3135,17 +3135,33 @@ Template:
}
Err(e) => {
// Capture detailed streaming error information
let error_details =
format!("Streaming error at chunk {}: {}", chunks_received + 1, e);
error!("{}", error_details);
let error_msg = e.to_string();
let error_details = format!("Streaming error at chunk {}: {}", chunks_received + 1, error_msg);
error!("Error type: {}", std::any::type_name_of_val(&e));
error!("Parser state at error: text_buffer_len={}, native_tool_calls={}, message_stopped={}",
parser.text_buffer_len(), parser.native_tool_calls.len(), parser.is_message_stopped());
// Store the error for potential logging later
_last_error = Some(error_details);
_last_error = Some(error_details.clone());
// Check if this is a recoverable connection error
let is_connection_error = error_msg.contains("unexpected EOF")
|| error_msg.contains("connection")
|| error_msg.contains("chunk size line")
|| error_msg.contains("body error");
if is_connection_error {
warn!("Connection error at chunk {}, treating as end of stream", chunks_received + 1);
// If we have any content or tool calls, treat this as a graceful end
if chunks_received > 0 && (!parser.get_text_content().is_empty() || parser.native_tool_calls.len() > 0) {
warn!("Stream terminated unexpectedly but we have content, continuing");
break; // Break to process what we have
}
}
if tool_executed {
error!("{}", error_details);
warn!("Stream error after tool execution, attempting to continue");
break; // Break to outer loop to start new stream
} else {

View File

@@ -298,6 +298,7 @@ impl DatabricksProvider {
let mut current_tool_calls: std::collections::HashMap<usize, (String, String, String)> =
std::collections::HashMap::new(); // index -> (id, name, args)
let mut incomplete_data_line = String::new(); // Buffer for incomplete data: lines
let mut chunk_count = 0;
let accumulated_usage: Option<Usage> = None;
let mut byte_buffer = Vec::new(); // Buffer for incomplete UTF-8 sequences
@@ -305,6 +306,8 @@ impl DatabricksProvider {
match chunk_result {
Ok(chunk) => {
// Debug: Log raw bytes received
chunk_count += 1;
debug!("Processing chunk #{}", chunk_count);
debug!("Raw SSE bytes received: {} bytes", chunk.len());
// Append new bytes to our buffer
@@ -589,13 +592,39 @@ impl DatabricksProvider {
}
}
Err(e) => {
error!("Stream error: {}", e);
let _ = tx.send(Err(anyhow!("Stream error: {}", e))).await;
error!("Stream error at chunk {}: {}", chunk_count, e);
// Check if this is a connection error that might be recoverable
let error_msg = e.to_string();
if error_msg.contains("unexpected EOF") || error_msg.contains("connection") {
warn!("Connection terminated unexpectedly at chunk {}, treating as end of stream", chunk_count);
// Don't send error, just break and finalize
break;
} else {
let _ = tx.send(Err(anyhow!("Stream error: {}", e))).await;
}
return accumulated_usage;
}
}
}
// Log final state
debug!("Stream ended after {} chunks", chunk_count);
debug!("Final state: buffer_len={}, incomplete_data_line_len={}, byte_buffer_len={}",
buffer.len(), incomplete_data_line.len(), byte_buffer.len());
debug!("Accumulated tool calls: {}", current_tool_calls.len());
// If we have any remaining data in buffers, log it for debugging
if !buffer.is_empty() {
debug!("Remaining buffer content: {:?}", buffer);
}
if !byte_buffer.is_empty() {
debug!("Remaining byte buffer: {} bytes", byte_buffer.len());
}
if !incomplete_data_line.is_empty() {
debug!("Remaining incomplete data line: {:?}", incomplete_data_line);
}
// If we have any incomplete data line at the end, try to process it
if !incomplete_data_line.is_empty() {
debug!(