diff --git a/.gitignore b/.gitignore index f9f70c3..a4020fc 100644 --- a/.gitignore +++ b/.gitignore @@ -3,6 +3,7 @@ debug target .build +appy/ # These are backup files generated by rustfmt **/*.rs.bk diff --git a/crates/g3-providers/src/ollama.rs b/crates/g3-providers/src/ollama.rs index 1807528..7967297 100644 --- a/crates/g3-providers/src/ollama.rs +++ b/crates/g3-providers/src/ollama.rs @@ -466,7 +466,7 @@ impl LLMProvider for OllamaProvider { async fn stream(&self, request: CompletionRequest) -> Result { debug!( - "Processing Ollama streaming request with {} messages", + "Processing Ollama request (non-streaming) with {} messages", request.messages.len() ); @@ -483,13 +483,13 @@ impl LLMProvider for OllamaProvider { let request_body = self.create_request_body( &request.messages, request.tools.as_deref(), - true, + false, // Use non-streaming mode to avoid streaming bugs max_tokens, temperature, )?; debug!( - "Sending streaming request to Ollama API: model={}, temperature={}", + "Sending request to Ollama API (stream=false): model={}, temperature={}", self.model, request_body.options.temperature ); @@ -499,7 +499,7 @@ impl LLMProvider for OllamaProvider { .json(&request_body) .send() .await - .map_err(|e| anyhow!("Failed to send streaming request to Ollama API: {}", e))?; + .map_err(|e| anyhow!("Failed to send request to Ollama API: {}", e))?; let status = response.status(); if !status.is_success() { @@ -510,12 +510,58 @@ impl LLMProvider for OllamaProvider { return Err(anyhow!("Ollama API error {}: {}", status, error_text)); } - let stream = response.bytes_stream(); + // For non-streaming, parse the complete JSON response + let response_text = response.text().await?; + debug!("Raw Ollama API response: {}", response_text); + + let ollama_response: OllamaResponse = + serde_json::from_str(&response_text).map_err(|e| { + anyhow!( + "Failed to parse Ollama response: {} - Response: {}", + e, + response_text + ) + })?; + let (tx, rx) = mpsc::channel(100); - let provider = self.clone(); tokio::spawn(async move { - provider.parse_streaming_response(stream, tx).await; + let content = ollama_response.message.content; + let usage = Usage { + prompt_tokens: ollama_response.prompt_eval_count.unwrap_or(0), + completion_tokens: ollama_response.eval_count.unwrap_or(0), + total_tokens: ollama_response.prompt_eval_count.unwrap_or(0) + + ollama_response.eval_count.unwrap_or(0), + }; + + // Extract tool calls if present + let tool_calls: Option> = ollama_response.message.tool_calls.map(|tcs| { + tcs.iter() + .map(|tc| ToolCall { + id: tc.function.name.clone(), + tool: tc.function.name.clone(), + args: tc.function.arguments.clone(), + }) + .collect() + }); + + // Send content if any + if !content.is_empty() { + let _ = tx.send(Ok(CompletionChunk { + content, + finished: false, + usage: None, + tool_calls: None, + })).await; + } + + // Send final chunk with usage and tool calls + let _ = tx.send(Ok(CompletionChunk { + content: String::new(), + finished: true, + usage: Some(usage), + tool_calls, + })).await; }); Ok(ReceiverStream::new(rx))