Readability refactor: extract mega-functions into focused helpers

Agent: carmack

4 files refactored, net -250 lines, all tests passing (417 + 71).

datalog.rs:
- Extract 7 predicate evaluation helpers from evaluate_predicate_datalog()
  (~200-line match → 12-line dispatch table)
- Extract rule_body_for_predicate() from format_datalog_program()
  (~75-line match → 2-line call)

invariants.rs:
- Extract 7 per-rule helpers from evaluate_predicate()
  (~230-line match → 12-line dispatch table)

envelope.rs:
- Simplify summary construction in verify_envelope()
- Eliminate redundant clone in stamp_envelope()

anthropic.rs:
- Introduce StreamState struct with 6 handler methods
- parse_streaming_response: ~290 lines → ~90 lines
- Max nesting depth reduced from 8 to 4 levels
This commit is contained in:
Dhanji R. Prasanna
2026-02-13 16:21:38 +11:00
parent 0410efd41b
commit 1ad74baaa5
4 changed files with 517 additions and 767 deletions

View File

@@ -135,6 +135,134 @@ pub struct AnthropicProvider {
thinking_budget_tokens: Option<u32>,
}
// ── SSE Stream State ────────────────────────────────────────────────────
// Mutable state threaded through Anthropic's SSE stream parser.
// Each `handle_*` method processes one event type and returns chunks to send.
struct StreamState {
    // Tool calls whose arguments arrive incrementally; accumulated from
    // content_block_start events and flushed (then cleared) on content_block_stop.
    tool_calls: Vec<ToolCall>,
    // Buffer for partial_json fragments from content_block_delta events;
    // parsed into the last tool call's args when the block stops.
    partial_tool_json: String,
    // Token usage captured from the message_start event, if the API reported it.
    usage: Option<Usage>,
    // Set on message_stop; any further events in the stream are skipped.
    message_stopped: bool,
    // Stop reason from message_delta, forwarded in the final chunk.
    stop_reason: Option<String>,
}
impl StreamState {
fn new() -> Self {
Self {
tool_calls: Vec::new(),
partial_tool_json: String::new(),
usage: None,
message_stopped: false,
stop_reason: None,
}
}
fn handle_message_start(&mut self, event: &AnthropicStreamEvent) {
if let Some(message) = &event.message {
if let Some(u) = &message.usage {
self.usage = Some(Usage {
prompt_tokens: u.input_tokens,
completion_tokens: u.output_tokens,
total_tokens: u.input_tokens + u.output_tokens,
cache_creation_tokens: u.cache_creation_input_tokens,
cache_read_tokens: u.cache_read_input_tokens,
});
debug!("Captured usage from message_start: {:?}", self.usage);
}
}
}
/// Returns chunks to send for a content_block_start event.
fn handle_block_start(&mut self, event: AnthropicStreamEvent) -> Vec<Result<CompletionChunk>> {
let Some(content_block) = event.content_block else { return vec![] };
match content_block {
AnthropicContent::ToolUse { id, name, input } => {
debug!("Tool use block: id={}, name={}, input={:?}", id, name, input);
let tool_call = ToolCall { id: id.clone(), tool: name.clone(), args: input.clone() };
let has_complete_args = !input.is_null()
&& input != serde_json::Value::Object(serde_json::Map::new());
if has_complete_args {
debug!("Tool call has complete args, sending immediately");
vec![Ok(make_tool_chunk(vec![tool_call]))]
} else {
debug!("Tool call has empty args, will accumulate from partial_json");
let hint = make_tool_streaming_hint(name);
self.tool_calls.push(tool_call);
self.partial_tool_json.clear();
vec![Ok(hint)]
}
}
_ => {
debug!("Non-tool content block: {:?}", content_block);
vec![]
}
}
}
/// Returns chunks to send for a content_block_delta event.
fn handle_block_delta(&mut self, event: AnthropicStreamEvent) -> Vec<Result<CompletionChunk>> {
let Some(delta) = event.delta else { return vec![] };
let mut chunks = Vec::new();
if let Some(text) = delta.text {
debug!("Text chunk (len {})", text.len());
chunks.push(Ok(make_text_chunk(text)));
}
if let Some(json_fragment) = delta.partial_json {
debug!("Partial JSON: {}", json_fragment);
self.partial_tool_json.push_str(&json_fragment);
chunks.push(Ok(make_tool_streaming_active()));
}
chunks
}
/// Returns chunks to send when a content block finishes.
fn handle_block_stop(&mut self) -> Vec<Result<CompletionChunk>> {
// Finalize accumulated partial JSON into the last tool call's args
if !self.tool_calls.is_empty() && !self.partial_tool_json.is_empty() {
debug!("Parsing complete tool JSON: {}", self.partial_tool_json);
if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&self.partial_tool_json) {
if let Some(last) = self.tool_calls.last_mut() {
last.args = parsed;
debug!("Updated tool call with complete args: {:?}", last);
}
} else {
debug!("Failed to parse accumulated JSON: {}", self.partial_tool_json);
}
self.partial_tool_json.clear();
}
if self.tool_calls.is_empty() {
return vec![];
}
let chunk = make_tool_chunk(self.tool_calls.clone());
self.tool_calls.clear();
vec![Ok(chunk)]
}
fn handle_message_delta(&mut self, event: &AnthropicStreamEvent) {
if let Some(delta) = &event.delta {
if let Some(reason) = &delta.stop_reason {
debug!("Received stop_reason: {}", reason);
self.stop_reason = Some(reason.clone());
}
}
}
fn handle_message_stop(&mut self) -> Vec<Result<CompletionChunk>> {
debug!("Received message stop event");
self.message_stopped = true;
let chunk = make_final_chunk_with_reason(
self.tool_calls.clone(),
self.usage.clone(),
self.stop_reason.clone(),
);
vec![Ok(chunk)]
}
}
impl AnthropicProvider {
pub fn new(
api_key: String,
@@ -501,267 +629,76 @@ impl AnthropicProvider {
mut stream: impl futures_util::Stream<Item = reqwest::Result<Bytes>> + Unpin,
tx: mpsc::Sender<Result<CompletionChunk>>,
) -> Option<Usage> {
let mut buffer = String::new();
let mut current_tool_calls: Vec<ToolCall> = Vec::new();
let mut partial_tool_json = String::new(); // Accumulate partial JSON for tool calls
let mut accumulated_usage: Option<Usage> = None;
let mut byte_buffer = Vec::new(); // Buffer for incomplete UTF-8 sequences
let mut message_stopped = false; // Track if we've received message_stop
let mut stop_reason: Option<String> = None; // Track why the message stopped
let mut state = StreamState::new();
let mut line_buffer = String::new();
let mut byte_buffer: Vec<u8> = Vec::new();
while let Some(chunk_result) = stream.next().await {
match chunk_result {
Ok(chunk) => {
byte_buffer.extend_from_slice(&chunk);
let Some(chunk_str) = decode_utf8_streaming(&mut byte_buffer) else {
continue;
};
line_buffer.push_str(&chunk_str);
buffer.push_str(&chunk_str);
while let Some(line_end) = line_buffer.find('\n') {
let line = line_buffer[..line_end].trim().to_string();
line_buffer.drain(..line_end + 1);
// Process complete lines
while let Some(line_end) = buffer.find('\n') {
let line = buffer[..line_end].trim().to_string();
buffer.drain(..line_end + 1);
if line.is_empty() {
continue;
}
// If we've already sent the final chunk, skip processing more events
if message_stopped {
debug!("Skipping event after message_stop: {}", line);
continue;
}
// Parse Server-Sent Events format
if let Some(data) = line.strip_prefix("data: ") {
if data == "[DONE]" {
debug!("Received stream completion marker");
let final_chunk = make_final_chunk(
current_tool_calls.clone(),
accumulated_usage.clone(),
);
if tx.send(Ok(final_chunk)).await.is_err() {
debug!("Receiver dropped, stopping stream");
}
return accumulated_usage;
if line.is_empty() || state.message_stopped {
if state.message_stopped && !line.is_empty() {
debug!("Skipping event after message_stop: {}", line);
}
continue;
}
debug!("Raw Claude API JSON: {}", data);
let Some(data) = line.strip_prefix("data: ") else { continue };
match serde_json::from_str::<AnthropicStreamEvent>(data) {
Ok(event) => {
debug!(
"Parsed event type: {}, event: {:?}",
event.event_type, event
);
match event.event_type.as_str() {
"message_start" => {
// Extract usage data from message_start event
if let Some(message) = event.message {
if let Some(usage) = message.usage {
accumulated_usage = Some(Usage {
prompt_tokens: usage.input_tokens,
completion_tokens: usage.output_tokens,
total_tokens: usage.input_tokens
+ usage.output_tokens,
cache_creation_tokens: usage
.cache_creation_input_tokens,
cache_read_tokens: usage
.cache_read_input_tokens,
});
debug!(
"Captured usage from message_start: {:?}",
accumulated_usage
);
}
}
}
"content_block_start" => {
debug!(
"Received content_block_start event: {:?}",
event
);
if let Some(content_block) = event.content_block {
match content_block {
AnthropicContent::ToolUse {
id,
name,
input,
} => {
debug!("Found tool use in content_block_start: id={}, name={}, input={:?}", id, name, input);
// Stream completion marker
if data == "[DONE]" {
debug!("Received stream completion marker");
let final_chunk = make_final_chunk(state.tool_calls.clone(), state.usage.clone());
let _ = tx.send(Ok(final_chunk)).await;
return state.usage;
}
// For native tool calls, create the tool call immediately if we have complete args
// If args are empty, we'll wait for partial_json to accumulate them
let tool_call = ToolCall {
id: id.clone(),
tool: name.clone(),
args: input.clone(),
};
debug!("Raw Claude API JSON: {}", data);
// Check if we already have complete arguments
if !input.is_null()
&& input
!= serde_json::Value::Object(
serde_json::Map::new(),
)
{
// We have complete arguments, send the tool call immediately
debug!("Tool call has complete args, sending immediately: {:?}", tool_call);
let chunk =
make_tool_chunk(vec![tool_call]);
if tx.send(Ok(chunk)).await.is_err() {
debug!("Receiver dropped, stopping stream");
return accumulated_usage;
}
} else {
// Arguments are empty, we'll accumulate them from partial_json
debug!("Tool call has empty args, will accumulate from partial_json");
// Send a streaming hint so the UI can show the tool name immediately
let hint_chunk = make_tool_streaming_hint(name.clone());
if tx.send(Ok(hint_chunk)).await.is_err() {
debug!("Receiver dropped, stopping stream");
return accumulated_usage;
}
current_tool_calls.push(tool_call);
partial_tool_json.clear();
}
}
_ => {
debug!(
"Non-tool content block: {:?}",
content_block
);
}
}
}
}
"content_block_delta" => {
if let Some(delta) = event.delta {
if let Some(text) = delta.text {
debug!(
"Sending text chunk of length {}: '{}'",
text.len(),
text
);
let chunk = make_text_chunk(text);
if tx.send(Ok(chunk)).await.is_err() {
debug!("Receiver dropped, stopping stream");
return accumulated_usage;
}
}
// Handle partial JSON for tool calls
if let Some(partial_json) = delta.partial_json {
debug!(
"Received partial JSON: {}",
partial_json
);
partial_tool_json.push_str(&partial_json);
debug!(
"Accumulated tool JSON: {}",
partial_tool_json
);
// Send an active hint to trigger UI blink
let active_chunk = make_tool_streaming_active();
if tx.send(Ok(active_chunk)).await.is_err() {
debug!("Receiver dropped, stopping stream");
return accumulated_usage;
}
}
}
}
"content_block_stop" => {
// Tool call block is complete - now parse the accumulated JSON
if !current_tool_calls.is_empty()
&& !partial_tool_json.is_empty()
{
debug!(
"Parsing complete tool JSON: {}",
partial_tool_json
);
let event = match serde_json::from_str::<AnthropicStreamEvent>(data) {
Ok(e) => e,
Err(e) => {
debug!("Failed to parse stream event: {} - Data: {}", e, data);
continue;
}
};
// Parse the accumulated JSON and update the last tool call
if let Ok(parsed_args) =
serde_json::from_str::<serde_json::Value>(
&partial_tool_json,
)
{
if let Some(last_tool) =
current_tool_calls.last_mut()
{
last_tool.args = parsed_args;
debug!("Updated tool call with complete args: {:?}", last_tool);
}
} else {
debug!(
"Failed to parse accumulated JSON: {}",
partial_tool_json
);
}
debug!("Parsed event type: {}", event.event_type);
// Clear the accumulator
partial_tool_json.clear();
}
// Send the complete tool call
if !current_tool_calls.is_empty() {
let chunk =
make_tool_chunk(current_tool_calls.clone());
if tx.send(Ok(chunk)).await.is_err() {
debug!("Receiver dropped, stopping stream");
return accumulated_usage;
}
// Clear tool calls after sending to prevent duplicates at message_stop
current_tool_calls.clear();
}
}
"message_delta" => {
// message_delta contains the stop_reason and final usage
if let Some(delta) = &event.delta {
if let Some(reason) = &delta.stop_reason {
debug!("Received stop_reason: {}", reason);
stop_reason = Some(reason.clone());
}
}
// Usage is also in message_delta but we get it from message_start
}
"message_stop" => {
debug!("Received message stop event");
message_stopped = true;
let final_chunk = make_final_chunk_with_reason(
current_tool_calls.clone(),
accumulated_usage.clone(),
stop_reason.clone(),
);
if tx.send(Ok(final_chunk)).await.is_err() {
debug!("Receiver dropped, stopping stream");
}
// Don't return here - let the stream naturally exhaust
// This prevents dropping the sender prematurely
}
"error" => {
if let Some(error) = event.error {
error!("Anthropic API error: {:?}", error);
let _ = tx
.send(Err(anyhow!(
"Anthropic API error: {:?}",
error
)))
.await;
break; // Break to let stream exhaust naturally
}
}
_ => {
debug!("Ignoring event type: {}", event.event_type);
}
}
}
Err(e) => {
debug!("Failed to parse stream event: {} - Data: {}", e, data);
// Don't error out on parse failures, just continue
// Dispatch to per-event handlers; collect chunks to send
let chunks: Vec<Result<CompletionChunk>> = match event.event_type.as_str() {
"message_start" => { state.handle_message_start(&event); vec![] }
"content_block_start" => state.handle_block_start(event),
"content_block_delta" => state.handle_block_delta(event),
"content_block_stop" => state.handle_block_stop(),
"message_delta" => { state.handle_message_delta(&event); vec![] }
"message_stop" => state.handle_message_stop(),
"error" => {
if let Some(error) = event.error {
error!("Anthropic API error: {:?}", error);
let _ = tx.send(Err(anyhow!("Anthropic API error: {:?}", error))).await;
break;
}
vec![]
}
_ => { debug!("Ignoring event type: {}", event.event_type); vec![] }
};
// Send all chunks produced by the handler
for chunk in chunks {
if tx.send(chunk).await.is_err() {
debug!("Receiver dropped, stopping stream");
return state.usage;
}
}
}
@@ -769,18 +706,14 @@ impl AnthropicProvider {
Err(e) => {
error!("Stream error: {}", e);
let _ = tx.send(Err(anyhow!("Stream error: {}", e))).await;
// Don't return here either - let the stream exhaust naturally
// The error has been sent to the receiver, so it will handle it
// Breaking here ensures we clean up properly
break;
}
}
}
// Send final chunk if we haven't already
let final_chunk = make_final_chunk(current_tool_calls, accumulated_usage.clone());
let final_chunk = make_final_chunk(state.tool_calls, state.usage.clone());
let _ = tx.send(Ok(final_chunk)).await;
accumulated_usage
state.usage
}
}