refactor: use shared streaming helpers in openai and embedded providers

Agent: carmack

openai.rs:
- Use make_text_chunk() for streaming text content
- Use make_final_chunk() for final completion chunk
- Simplify tool_calls conversion logic

embedded.rs:
- Use make_text_chunk() for all 4 streaming text chunks
- Use make_final_chunk() for final completion chunk
- Remove unused CompletionChunk import

Net reduction: 35 lines.
All tests pass. Behavior unchanged.
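The shared helpers live in streaming.rs, which is not part of this diff. A plausible sketch of them, inferred purely from the call sites below (CompletionChunk, ToolCall, and Usage are crate types; the bodies are assumptions, not the committed implementation):

// Hypothetical reconstruction of the streaming.rs helpers; inferred from
// the call sites in this commit, not copied from the actual file.
pub fn make_text_chunk(content: String) -> CompletionChunk {
    CompletionChunk {
        content,
        finished: false,
        usage: None,
        tool_calls: None,
    }
}

pub fn make_final_chunk(tool_calls: Vec<ToolCall>, usage: Option<Usage>) -> CompletionChunk {
    CompletionChunk {
        content: String::new(),
        finished: true,
        usage,
        // Assumed: an empty Vec maps to None, so callers can pass vec![]
        // instead of threading an Option through every call site.
        tool_calls: if tool_calls.is_empty() { None } else { Some(tool_calls) },
    }
}

Taking Vec<ToolCall> rather than Option<Vec<ToolCall>> is what lets openai.rs below replace None with vec![] and drop the Some(...) wrapper around the tool_calls conversion.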
Dhanji R. Prasanna
2026-01-07 13:01:03 +11:00
parent 2bf475960c
commit 3776ed847e
2 changed files with 15 additions and 50 deletions

embedded.rs

@@ -1,6 +1,7 @@
 use crate::{
-    CompletionChunk, CompletionRequest, CompletionResponse, CompletionStream, LLMProvider, Message,
+    CompletionRequest, CompletionResponse, CompletionStream, LLMProvider, Message,
     MessageRole, Usage,
+    streaming::{make_text_chunk, make_final_chunk},
 };
 use anyhow::Result;
 use llama_cpp::{
@@ -699,12 +700,7 @@ impl LLMProvider for EmbeddedProvider {
             if accumulated_text.len() > already_sent_len {
                 let remaining_to_send = &accumulated_text[already_sent_len..];
                 if !remaining_to_send.is_empty() {
-                    let chunk = CompletionChunk {
-                        content: remaining_to_send.to_string(),
-                        finished: false,
-                        usage: None,
-                        tool_calls: None,
-                    };
+                    let chunk = make_text_chunk(remaining_to_send.to_string());
                     let _ = tx.blocking_send(Ok(chunk));
                 }
             }
@@ -726,12 +722,7 @@ impl LLMProvider for EmbeddedProvider {
             if clean_accumulated.len() > already_sent_len {
                 let remaining_to_send = &clean_accumulated[already_sent_len..];
                 if !remaining_to_send.is_empty() {
-                    let chunk = CompletionChunk {
-                        content: remaining_to_send.to_string(),
-                        finished: false,
-                        usage: None,
-                        tool_calls: None,
-                    };
+                    let chunk = make_text_chunk(remaining_to_send.to_string());
                     let _ = tx.blocking_send(Ok(chunk));
                 }
             }
@@ -761,12 +752,7 @@ impl LLMProvider for EmbeddedProvider {
                 // Send the oldest part and keep only the recent part that might be a stop sequence
                 let to_send = &unsent_tokens[..unsent_tokens.len() - 10];
                 if !to_send.is_empty() {
-                    let chunk = CompletionChunk {
-                        content: to_send.to_string(),
-                        finished: false,
-                        usage: None,
-                        tool_calls: None,
-                    };
+                    let chunk = make_text_chunk(to_send.to_string());
                     if tx.blocking_send(Ok(chunk)).is_err() {
                         break;
                     }
@@ -777,12 +763,7 @@ impl LLMProvider for EmbeddedProvider {
             } else {
                 // No potential stop sequence, send all unsent tokens
                 if !unsent_tokens.is_empty() {
-                    let chunk = CompletionChunk {
-                        content: unsent_tokens.clone(),
-                        finished: false,
-                        usage: None,
-                        tool_calls: None,
-                    };
+                    let chunk = make_text_chunk(unsent_tokens.clone());
                     if tx.blocking_send(Ok(chunk)).is_err() {
                         break;
                     }
@@ -798,12 +779,7 @@ impl LLMProvider for EmbeddedProvider {
            }
            // Send final chunk
-           let final_chunk = CompletionChunk {
-               content: String::new(),
-               finished: true,
-               usage: None, // Embedded models calculate usage differently
-               tool_calls: None,
-           };
+           let final_chunk = make_final_chunk(vec![], None);
            let _ = tx.blocking_send(Ok(final_chunk));
        });

openai.rs

@@ -12,6 +12,7 @@ use tracing::{debug, error};
 use crate::{
     CompletionChunk, CompletionRequest, CompletionResponse, CompletionStream, LLMProvider, Message,
     MessageRole, Tool, ToolCall, Usage,
+    streaming::{make_text_chunk, make_final_chunk},
 };
 
 #[derive(Clone)]
@@ -171,12 +172,7 @@ impl OpenAIProvider {
                 if let Some(content) = &choice.delta.content {
                     accumulated_content.push_str(content);
-                    let chunk = CompletionChunk {
-                        content: content.clone(),
-                        finished: false,
-                        tool_calls: None,
-                        usage: None,
-                    };
+                    let chunk = make_text_chunk(content.clone());
                     if tx.send(Ok(chunk)).await.is_err() {
                         debug!("Receiver dropped, stopping stream");
                         return accumulated_usage;
@@ -242,22 +238,15 @@ impl OpenAIProvider {
         // Send final chunk if we haven't already
         let tool_calls = if current_tool_calls.is_empty() {
-            None
+            vec![]
         } else {
-            Some(
-                current_tool_calls
-                    .iter()
-                    .filter_map(|tc| tc.to_tool_call())
-                    .collect(),
-            )
+            current_tool_calls
+                .iter()
+                .filter_map(|tc| tc.to_tool_call())
+                .collect()
         };
-        let final_chunk = CompletionChunk {
-            content: String::new(),
-            finished: true,
-            tool_calls,
-            usage: accumulated_usage.clone(),
-        };
+        let final_chunk = make_final_chunk(tool_calls, accumulated_usage.clone());
         let _ = tx.send(Ok(final_chunk)).await;
         accumulated_usage