From 3776ed847eaaf1b349cec2084ffe478e663a2af1 Mon Sep 17 00:00:00 2001
From: "Dhanji R. Prasanna"
Date: Wed, 7 Jan 2026 13:01:03 +1100
Subject: [PATCH] refactor: use shared streaming helpers in openai and
 embedded providers

Agent: carmack

openai.rs:
- Use make_text_chunk() for streaming text content
- Use make_final_chunk() for final completion chunk
- Simplify tool_calls conversion logic

embedded.rs:
- Use make_text_chunk() for all 4 streaming text chunks
- Use make_final_chunk() for final completion chunk
- Remove unused CompletionChunk import

Net reduction: 35 lines removed

All tests pass. Behavior unchanged.
---
 crates/g3-providers/src/embedded.rs | 38 ++++++-----------------------
 crates/g3-providers/src/openai.rs   | 27 ++++++--------------
 2 files changed, 15 insertions(+), 50 deletions(-)

diff --git a/crates/g3-providers/src/embedded.rs b/crates/g3-providers/src/embedded.rs
index 999e489..9ac9882 100644
--- a/crates/g3-providers/src/embedded.rs
+++ b/crates/g3-providers/src/embedded.rs
@@ -1,6 +1,7 @@
 use crate::{
-    CompletionChunk, CompletionRequest, CompletionResponse, CompletionStream, LLMProvider, Message,
+    CompletionRequest, CompletionResponse, CompletionStream, LLMProvider, Message,
     MessageRole, Usage,
+    streaming::{make_text_chunk, make_final_chunk},
 };
 use anyhow::Result;
 use llama_cpp::{
@@ -699,12 +700,7 @@ impl LLMProvider for EmbeddedProvider {
             if accumulated_text.len() > already_sent_len {
                 let remaining_to_send = &accumulated_text[already_sent_len..];
                 if !remaining_to_send.is_empty() {
-                    let chunk = CompletionChunk {
-                        content: remaining_to_send.to_string(),
-                        finished: false,
-                        usage: None,
-                        tool_calls: None,
-                    };
+                    let chunk = make_text_chunk(remaining_to_send.to_string());
                     let _ = tx.blocking_send(Ok(chunk));
                 }
             }
@@ -726,12 +722,7 @@ impl LLMProvider for EmbeddedProvider {
             if clean_accumulated.len() > already_sent_len {
                 let remaining_to_send = &clean_accumulated[already_sent_len..];
                 if !remaining_to_send.is_empty() {
-                    let chunk = CompletionChunk {
-                        content: remaining_to_send.to_string(),
-                        finished: false,
-                        usage: None,
-                        tool_calls: None,
-                    };
+                    let chunk = make_text_chunk(remaining_to_send.to_string());
                     let _ = tx.blocking_send(Ok(chunk));
                 }
             }
@@ -761,12 +752,7 @@ impl LLMProvider for EmbeddedProvider {
                         // Send the oldest part and keep only the recent part that might be a stop sequence
                         let to_send = &unsent_tokens[..unsent_tokens.len() - 10];
                         if !to_send.is_empty() {
-                            let chunk = CompletionChunk {
-                                content: to_send.to_string(),
-                                finished: false,
-                                usage: None,
-                                tool_calls: None,
-                            };
+                            let chunk = make_text_chunk(to_send.to_string());
                             if tx.blocking_send(Ok(chunk)).is_err() {
                                 break;
                             }
@@ -777,12 +763,7 @@ impl LLMProvider for EmbeddedProvider {
                     } else {
                         // No potential stop sequence, send all unsent tokens
                         if !unsent_tokens.is_empty() {
-                            let chunk = CompletionChunk {
-                                content: unsent_tokens.clone(),
-                                finished: false,
-                                usage: None,
-                                tool_calls: None,
-                            };
+                            let chunk = make_text_chunk(unsent_tokens.clone());
                             if tx.blocking_send(Ok(chunk)).is_err() {
                                 break;
                             }
@@ -798,12 +779,7 @@ impl LLMProvider for EmbeddedProvider {
             }
 
             // Send final chunk
-            let final_chunk = CompletionChunk {
-                content: String::new(),
-                finished: true,
-                usage: None, // Embedded models calculate usage differently
-                tool_calls: None,
-            };
+            let final_chunk = make_final_chunk(vec![], None);
             let _ = tx.blocking_send(Ok(final_chunk));
         });
 
diff --git a/crates/g3-providers/src/openai.rs b/crates/g3-providers/src/openai.rs
index d322663..aa567ed 100644
--- a/crates/g3-providers/src/openai.rs
+++ b/crates/g3-providers/src/openai.rs
@@ -12,6 +12,7 @@ use tracing::{debug, error};
 use crate::{
     CompletionChunk, CompletionRequest, CompletionResponse, CompletionStream, LLMProvider, Message,
     MessageRole, Tool, ToolCall, Usage,
+    streaming::{make_text_chunk, make_final_chunk},
 };
 
 #[derive(Clone)]
@@ -171,12 +172,7 @@ impl OpenAIProvider {
                 if let Some(content) = &choice.delta.content {
                     accumulated_content.push_str(content);
 
-                    let chunk = CompletionChunk {
-                        content: content.clone(),
-                        finished: false,
-                        tool_calls: None,
-                        usage: None,
-                    };
+                    let chunk = make_text_chunk(content.clone());
                     if tx.send(Ok(chunk)).await.is_err() {
                         debug!("Receiver dropped, stopping stream");
                         return accumulated_usage;
@@ -242,22 +238,15 @@ impl OpenAIProvider {
         // Send final chunk if we haven't already
         let tool_calls = if current_tool_calls.is_empty() {
-            None
+            vec![]
         } else {
-            Some(
-                current_tool_calls
-                    .iter()
-                    .filter_map(|tc| tc.to_tool_call())
-                    .collect(),
-            )
+            current_tool_calls
+                .iter()
+                .filter_map(|tc| tc.to_tool_call())
+                .collect()
         };
 
-        let final_chunk = CompletionChunk {
-            content: String::new(),
-            finished: true,
-            tool_calls,
-            usage: accumulated_usage.clone(),
-        };
+        let final_chunk = make_final_chunk(tool_calls, accumulated_usage.clone());
 
         let _ = tx.send(Ok(final_chunk)).await;
 
         accumulated_usage
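
--
For reviewers: the two helpers used throughout this patch live in
crates/g3-providers/src/streaming.rs, which is not shown here. A minimal
sketch of what they are assumed to look like, reconstructed only from the
call sites above (not the actual source). In particular, the empty-vec-to-None
mapping in make_final_chunk is an assumption needed to keep the openai
provider's final chunk byte-for-byte compatible with the old Option-based
code.

    use crate::{CompletionChunk, ToolCall, Usage};

    /// Build an unfinished chunk that carries streamed text only.
    pub fn make_text_chunk(content: String) -> CompletionChunk {
        CompletionChunk {
            content,
            finished: false,
            tool_calls: None,
            usage: None,
        }
    }

    /// Build the terminal chunk. An empty tool-call list is assumed to map
    /// to None so downstream consumers see the same shape as before.
    pub fn make_final_chunk(tool_calls: Vec<ToolCall>, usage: Option<Usage>) -> CompletionChunk {
        CompletionChunk {
            content: String::new(),
            finished: true,
            tool_calls: if tool_calls.is_empty() {
                None
            } else {
                Some(tool_calls)
            },
            usage,
        }
    }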