add context window monitor

Writes the current context window to logs/current_context_window (uses a symlink to a session ID).

This PR was unfortunately generated by a different LLM and did a ton of superficial reformating, it's actually a fairly small and benign change, but I don't want to roll back everything. Hope that's ok.
This commit is contained in:
Jochen
2025-11-27 21:00:02 +11:00
parent 93dc4acf86
commit 52f78653b4
89 changed files with 4040 additions and 2576 deletions

View File

@@ -139,7 +139,7 @@ impl AnthropicProvider {
.map_err(|e| anyhow!("Failed to create HTTP client: {}", e))?;
let model = model.unwrap_or_else(|| "claude-3-5-sonnet-20241022".to_string());
debug!("Initialized Anthropic provider with model: {}", model);
Ok(Self {
@@ -160,11 +160,11 @@ impl AnthropicProvider {
.header("x-api-key", &self.api_key)
.header("anthropic-version", ANTHROPIC_VERSION)
.header("content-type", "application/json");
if self.enable_1m_context {
builder = builder.header("anthropic-beta", "context-1m-2025-08-07");
}
if streaming {
builder = builder.header("accept", "text/event-stream");
}
@@ -188,12 +188,17 @@ impl AnthropicProvider {
};
// Extract properties and required fields from the input schema
if let Ok(schema_obj) = serde_json::from_value::<serde_json::Map<String, serde_json::Value>>(tool.input_schema.clone()) {
if let Ok(schema_obj) = serde_json::from_value::<
serde_json::Map<String, serde_json::Value>,
>(tool.input_schema.clone())
{
if let Some(properties) = schema_obj.get("properties") {
schema.properties = properties.clone();
}
if let Some(required) = schema_obj.get("required") {
if let Ok(required_vec) = serde_json::from_value::<Vec<String>>(required.clone()) {
if let Ok(required_vec) =
serde_json::from_value::<Vec<String>>(required.clone())
{
schema.required = Some(required_vec);
}
}
@@ -208,7 +213,10 @@ impl AnthropicProvider {
.collect()
}
fn convert_messages(&self, messages: &[Message]) -> Result<(Option<String>, Vec<AnthropicMessage>)> {
fn convert_messages(
&self,
messages: &[Message],
) -> Result<(Option<String>, Vec<AnthropicMessage>)> {
let mut system_message = None;
let mut anthropic_messages = Vec::new();
@@ -225,7 +233,9 @@ impl AnthropicProvider {
role: "user".to_string(),
content: vec![AnthropicContent::Text {
text: message.content.clone(),
cache_control: message.cache_control.as_ref()
cache_control: message
.cache_control
.as_ref()
.map(Self::convert_cache_control),
}],
});
@@ -235,7 +245,9 @@ impl AnthropicProvider {
role: "assistant".to_string(),
content: vec![AnthropicContent::Text {
text: message.content.clone(),
cache_control: message.cache_control.as_ref()
cache_control: message
.cache_control
.as_ref()
.map(Self::convert_cache_control),
}],
});
@@ -257,7 +269,9 @@ impl AnthropicProvider {
let (system, anthropic_messages) = self.convert_messages(messages)?;
if anthropic_messages.is_empty() {
return Err(anyhow!("At least one user or assistant message is required"));
return Err(anyhow!(
"At least one user or assistant message is required"
));
}
// Convert tools if provided
@@ -292,13 +306,13 @@ impl AnthropicProvider {
let mut accumulated_usage: Option<Usage> = None;
let mut byte_buffer = Vec::new(); // Buffer for incomplete UTF-8 sequences
let mut message_stopped = false; // Track if we've received message_stop
while let Some(chunk_result) = stream.next().await {
match chunk_result {
Ok(chunk) => {
// Append new bytes to our buffer
byte_buffer.extend_from_slice(&chunk);
// Try to convert the entire buffer to UTF-8
let chunk_str = match std::str::from_utf8(&byte_buffer) {
Ok(s) => {
@@ -312,7 +326,8 @@ impl AnthropicProvider {
let valid_up_to = e.valid_up_to();
if valid_up_to > 0 {
// We have some valid UTF-8, extract it and keep the rest for next iteration
let valid_bytes = byte_buffer.drain(..valid_up_to).collect::<Vec<_>>();
let valid_bytes =
byte_buffer.drain(..valid_up_to).collect::<Vec<_>>();
std::str::from_utf8(&valid_bytes).unwrap().to_string()
} else {
// No valid UTF-8 at all, skip this chunk and continue
@@ -346,7 +361,11 @@ impl AnthropicProvider {
content: String::new(),
finished: true,
usage: accumulated_usage.clone(),
tool_calls: if current_tool_calls.is_empty() { None } else { Some(current_tool_calls.clone()) },
tool_calls: if current_tool_calls.is_empty() {
None
} else {
Some(current_tool_calls.clone())
},
};
if tx.send(Ok(final_chunk)).await.is_err() {
debug!("Receiver dropped, stopping stream");
@@ -358,7 +377,10 @@ impl AnthropicProvider {
match serde_json::from_str::<AnthropicStreamEvent>(data) {
Ok(event) => {
debug!("Parsed event type: {}, event: {:?}", event.event_type, event);
debug!(
"Parsed event type: {}, event: {:?}",
event.event_type, event
);
match event.event_type.as_str() {
"message_start" => {
// Extract usage data from message_start event
@@ -367,19 +389,30 @@ impl AnthropicProvider {
accumulated_usage = Some(Usage {
prompt_tokens: usage.input_tokens,
completion_tokens: usage.output_tokens,
total_tokens: usage.input_tokens + usage.output_tokens,
total_tokens: usage.input_tokens
+ usage.output_tokens,
});
debug!("Captured usage from message_start: {:?}", accumulated_usage);
debug!(
"Captured usage from message_start: {:?}",
accumulated_usage
);
}
}
}
"content_block_start" => {
debug!("Received content_block_start event: {:?}", event);
debug!(
"Received content_block_start event: {:?}",
event
);
if let Some(content_block) = event.content_block {
match content_block {
AnthropicContent::ToolUse { id, name, input } => {
AnthropicContent::ToolUse {
id,
name,
input,
} => {
debug!("Found tool use in content_block_start: id={}, name={}, input={:?}", id, name, input);
// For native tool calls, create the tool call immediately if we have complete args
// If args are empty, we'll wait for partial_json to accumulate them
let tool_call = ToolCall {
@@ -387,9 +420,14 @@ impl AnthropicProvider {
tool: name.clone(),
args: input.clone(),
};
// Check if we already have complete arguments
if !input.is_null() && input != serde_json::Value::Object(serde_json::Map::new()) {
if !input.is_null()
&& input
!= serde_json::Value::Object(
serde_json::Map::new(),
)
{
// We have complete arguments, send the tool call immediately
debug!("Tool call has complete args, sending immediately: {:?}", tool_call);
let chunk = CompletionChunk {
@@ -410,7 +448,10 @@ impl AnthropicProvider {
}
}
_ => {
debug!("Non-tool content block: {:?}", content_block);
debug!(
"Non-tool content block: {:?}",
content_block
);
}
}
}
@@ -418,7 +459,11 @@ impl AnthropicProvider {
"content_block_delta" => {
if let Some(delta) = event.delta {
if let Some(text) = delta.text {
debug!("Sending text chunk of length {}: '{}'", text.len(), text);
debug!(
"Sending text chunk of length {}: '{}'",
text.len(),
text
);
let chunk = CompletionChunk {
content: text,
finished: false,
@@ -432,31 +477,51 @@ impl AnthropicProvider {
}
// Handle partial JSON for tool calls
if let Some(partial_json) = delta.partial_json {
debug!("Received partial JSON: {}", partial_json);
debug!(
"Received partial JSON: {}",
partial_json
);
partial_tool_json.push_str(&partial_json);
debug!("Accumulated tool JSON: {}", partial_tool_json);
debug!(
"Accumulated tool JSON: {}",
partial_tool_json
);
}
}
}
"content_block_stop" => {
// Tool call block is complete - now parse the accumulated JSON
if !current_tool_calls.is_empty() && !partial_tool_json.is_empty() {
debug!("Parsing complete tool JSON: {}", partial_tool_json);
if !current_tool_calls.is_empty()
&& !partial_tool_json.is_empty()
{
debug!(
"Parsing complete tool JSON: {}",
partial_tool_json
);
// Parse the accumulated JSON and update the last tool call
if let Ok(parsed_args) = serde_json::from_str::<serde_json::Value>(&partial_tool_json) {
if let Some(last_tool) = current_tool_calls.last_mut() {
if let Ok(parsed_args) =
serde_json::from_str::<serde_json::Value>(
&partial_tool_json,
)
{
if let Some(last_tool) =
current_tool_calls.last_mut()
{
last_tool.args = parsed_args;
debug!("Updated tool call with complete args: {:?}", last_tool);
}
} else {
debug!("Failed to parse accumulated JSON: {}", partial_tool_json);
debug!(
"Failed to parse accumulated JSON: {}",
partial_tool_json
);
}
// Clear the accumulator
partial_tool_json.clear();
}
// Send the complete tool call
if !current_tool_calls.is_empty() {
let chunk = CompletionChunk {
@@ -478,7 +543,11 @@ impl AnthropicProvider {
content: String::new(),
finished: true,
usage: accumulated_usage.clone(),
tool_calls: if current_tool_calls.is_empty() { None } else { Some(current_tool_calls.clone()) },
tool_calls: if current_tool_calls.is_empty() {
None
} else {
Some(current_tool_calls.clone())
},
};
if tx.send(Ok(final_chunk)).await.is_err() {
debug!("Receiver dropped, stopping stream");
@@ -490,7 +559,10 @@ impl AnthropicProvider {
if let Some(error) = event.error {
error!("Anthropic API error: {:?}", error);
let _ = tx
.send(Err(anyhow!("Anthropic API error: {:?}", error)))
.send(Err(anyhow!(
"Anthropic API error: {:?}",
error
)))
.await;
break; // Break to let stream exhaust naturally
}
@@ -524,7 +596,11 @@ impl AnthropicProvider {
content: String::new(),
finished: true,
usage: accumulated_usage.clone(),
tool_calls: if current_tool_calls.is_empty() { None } else { Some(current_tool_calls) },
tool_calls: if current_tool_calls.is_empty() {
None
} else {
Some(current_tool_calls)
},
};
let _ = tx.send(Ok(final_chunk)).await;
accumulated_usage
@@ -543,15 +619,17 @@ impl LLMProvider for AnthropicProvider {
let temperature = request.temperature.unwrap_or(self.temperature);
let request_body = self.create_request_body(
&request.messages,
request.tools.as_deref(),
false,
max_tokens,
temperature
&request.messages,
request.tools.as_deref(),
false,
max_tokens,
temperature,
)?;
debug!("Sending request to Anthropic API: model={}, max_tokens={}, temperature={}",
request_body.model, request_body.max_tokens, request_body.temperature);
debug!(
"Sending request to Anthropic API: model={}, max_tokens={}, temperature={}",
request_body.model, request_body.max_tokens, request_body.temperature
);
let response = self
.create_request_builder(false)
@@ -588,7 +666,8 @@ impl LLMProvider for AnthropicProvider {
let usage = Usage {
prompt_tokens: anthropic_response.usage.input_tokens,
completion_tokens: anthropic_response.usage.output_tokens,
total_tokens: anthropic_response.usage.input_tokens + anthropic_response.usage.output_tokens,
total_tokens: anthropic_response.usage.input_tokens
+ anthropic_response.usage.output_tokens,
};
debug!(
@@ -613,18 +692,24 @@ impl LLMProvider for AnthropicProvider {
let temperature = request.temperature.unwrap_or(self.temperature);
let request_body = self.create_request_body(
&request.messages,
request.tools.as_deref(),
true,
max_tokens,
temperature
&request.messages,
request.tools.as_deref(),
true,
max_tokens,
temperature,
)?;
debug!("Sending streaming request to Anthropic API: model={}, max_tokens={}, temperature={}",
request_body.model, request_body.max_tokens, request_body.temperature);
debug!(
"Sending streaming request to Anthropic API: model={}, max_tokens={}, temperature={}",
request_body.model, request_body.max_tokens, request_body.temperature
);
// Debug: Log the full request body
debug!("Full request body: {}", serde_json::to_string_pretty(&request_body).unwrap_or_else(|_| "Failed to serialize".to_string()));
debug!(
"Full request body: {}",
serde_json::to_string_pretty(&request_body)
.unwrap_or_else(|_| "Failed to serialize".to_string())
);
let response = self
.create_request_builder(true)
@@ -673,16 +758,16 @@ impl LLMProvider for AnthropicProvider {
// Claude models support native tool calling
true
}
fn supports_cache_control(&self) -> bool {
// Anthropic supports cache control
true
}
fn max_tokens(&self) -> u32 {
self.max_tokens
}
fn temperature(&self) -> f32 {
self.temperature
}
@@ -729,7 +814,7 @@ struct AnthropicMessage {
#[serde(tag = "type")]
enum AnthropicContent {
#[serde(rename = "text")]
Text {
Text {
text: String,
#[serde(skip_serializing_if = "Option::is_none")]
cache_control: Option<crate::CacheControl>,
@@ -798,17 +883,14 @@ mod tests {
#[test]
fn test_message_conversion() {
let provider = AnthropicProvider::new(
"test-key".to_string(),
None,
None,
None,
None,
None,
).unwrap();
let provider =
AnthropicProvider::new("test-key".to_string(), None, None, None, None, None).unwrap();
let messages = vec![
Message::new(MessageRole::System, "You are a helpful assistant.".to_string()),
Message::new(
MessageRole::System,
"You are a helpful assistant.".to_string(),
),
Message::new(MessageRole::User, "Hello!".to_string()),
Message::new(MessageRole::Assistant, "Hi there!".to_string()),
];
@@ -830,7 +912,8 @@ mod tests {
Some(0.5),
None,
None,
).unwrap();
)
.unwrap();
let messages = vec![Message::new(MessageRole::User, "Test message".to_string())];
@@ -848,31 +931,23 @@ mod tests {
#[test]
fn test_tool_conversion() {
let provider = AnthropicProvider::new(
"test-key".to_string(),
None,
None,
None,
None,
None,
).unwrap();
let provider =
AnthropicProvider::new("test-key".to_string(), None, None, None, None, None).unwrap();
let tools = vec![
Tool {
name: "get_weather".to_string(),
description: "Get the current weather".to_string(),
input_schema: serde_json::json!({
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state"
}
},
"required": ["location"]
}),
},
];
let tools = vec![Tool {
name: "get_weather".to_string(),
description: "Get the current weather".to_string(),
input_schema: serde_json::json!({
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state"
}
},
"required": ["location"]
}),
}];
let anthropic_tools = provider.convert_tools(&tools);
@@ -881,31 +956,30 @@ mod tests {
assert_eq!(anthropic_tools[0].description, "Get the current weather");
assert_eq!(anthropic_tools[0].input_schema.schema_type, "object");
assert!(anthropic_tools[0].input_schema.required.is_some());
assert_eq!(anthropic_tools[0].input_schema.required.as_ref().unwrap()[0], "location");
assert_eq!(
anthropic_tools[0].input_schema.required.as_ref().unwrap()[0],
"location"
);
}
#[test]
fn test_cache_control_serialization() {
let provider = AnthropicProvider::new(
"test-key".to_string(),
None,
None,
None,
None,
None,
).unwrap();
let provider =
AnthropicProvider::new("test-key".to_string(), None, None, None, None, None).unwrap();
// Test message WITHOUT cache_control
let messages_without = vec![Message::new(MessageRole::User, "Hello".to_string())];
let (_, anthropic_messages_without) = provider.convert_messages(&messages_without).unwrap();
let json_without = serde_json::to_string(&anthropic_messages_without).unwrap();
println!("Anthropic JSON without cache_control: {}", json_without);
// Check if cache_control appears in the JSON
if json_without.contains("cache_control") {
println!("WARNING: JSON contains 'cache_control' field when not configured!");
assert!(!json_without.contains("\"cache_control\":null"),
"JSON should not contain 'cache_control: null'");
assert!(
!json_without.contains("\"cache_control\":null"),
"JSON should not contain 'cache_control: null'"
);
}
// Test message WITH cache_control
@@ -916,15 +990,21 @@ mod tests {
)];
let (_, anthropic_messages_with) = provider.convert_messages(&messages_with).unwrap();
let json_with = serde_json::to_string(&anthropic_messages_with).unwrap();
println!("Anthropic JSON with cache_control: {}", json_with);
assert!(json_with.contains("cache_control"),
"JSON should contain 'cache_control' field when configured");
assert!(json_with.contains("ephemeral"),
"JSON should contain 'ephemeral' type");
assert!(
json_with.contains("cache_control"),
"JSON should contain 'cache_control' field when configured"
);
assert!(
json_with.contains("ephemeral"),
"JSON should contain 'ephemeral' type"
);
// The key assertion: when cache_control is None, it should not appear in JSON
assert!(!json_without.contains("cache_control") || !json_without.contains("null"),
"JSON should not contain 'cache_control' field or null values when not configured");
assert!(
!json_without.contains("cache_control") || !json_without.contains("null"),
"JSON should not contain 'cache_control' field or null values when not configured"
);
}
}