first cut of horizontal partitioning

# Conflicts:
#	Cargo.lock

# Conflicts:
#	Cargo.lock
#	crates/g3-cli/src/lib.rs
This commit is contained in:
Dhanji Prasanna
2025-11-13 11:21:48 +11:00
committed by Jochen
parent c6c35bf2ca
commit 4cfa0147ca
14 changed files with 2200 additions and 114 deletions

View File

@@ -0,0 +1,647 @@
//! Flock mode implementation - parallel multi-agent development
use anyhow::{Context, Result};
use chrono::Utc;
use g3_config::Config;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command;
use tracing::{debug, error, info, warn};
use uuid::Uuid;
use crate::status::{FlockStatus, SegmentState, SegmentStatus};
/// Configuration for flock mode
#[derive(Debug, Clone)]
pub struct FlockConfig {
/// Project directory (must be a git repo with flock-requirements.md)
pub project_dir: PathBuf,
/// Flock workspace directory where segments will be created
pub flock_workspace: PathBuf,
/// Number of segments to partition work into
pub num_segments: usize,
/// Maximum turns per segment (for autonomous mode)
pub max_turns: usize,
/// G3 configuration to use for agents
pub g3_config: Config,
/// Path to g3 binary (defaults to current executable)
pub g3_binary: Option<PathBuf>,
}
impl FlockConfig {
/// Create a new flock configuration
pub fn new(
project_dir: PathBuf,
flock_workspace: PathBuf,
num_segments: usize,
) -> Result<Self> {
// Validate project directory
if !project_dir.exists() {
anyhow::bail!("Project directory does not exist: {}", project_dir.display());
}
// Check if it's a git repo
if !project_dir.join(".git").exists() {
anyhow::bail!("Project directory must be a git repository: {}", project_dir.display());
}
// Check for flock-requirements.md
let requirements_path = project_dir.join("flock-requirements.md");
if !requirements_path.exists() {
anyhow::bail!(
"Project directory must contain flock-requirements.md: {}",
project_dir.display()
);
}
// Load default config
let g3_config = Config::load(None)?;
Ok(Self {
project_dir,
flock_workspace,
num_segments,
max_turns: 5, // Default
g3_config,
g3_binary: None,
})
}
/// Set maximum turns per segment
pub fn with_max_turns(mut self, max_turns: usize) -> Self {
self.max_turns = max_turns;
self
}
/// Set custom g3 binary path
pub fn with_g3_binary(mut self, binary: PathBuf) -> Self {
self.g3_binary = Some(binary);
self
}
/// Set custom g3 config
pub fn with_config(mut self, config: Config) -> Self {
self.g3_config = config;
self
}
}
/// Flock mode orchestrator
pub struct FlockMode {
config: FlockConfig,
status: FlockStatus,
session_id: String,
}
impl FlockMode {
/// Create a new flock mode instance
pub fn new(config: FlockConfig) -> Result<Self> {
let session_id = Uuid::new_v4().to_string();
let status = FlockStatus::new(
session_id.clone(),
config.project_dir.clone(),
config.flock_workspace.clone(),
config.num_segments,
);
Ok(Self {
config,
status,
session_id,
})
}
/// Run flock mode
pub async fn run(&mut self) -> Result<()> {
info!("Starting flock mode with {} segments", self.config.num_segments);
// Step 1: Partition requirements
println!("\n🧠 Step 1: Partitioning requirements into {} segments...", self.config.num_segments);
let partitions = self.partition_requirements().await?;
// Step 2: Create segment workspaces
println!("\n📁 Step 2: Creating segment workspaces...");
self.create_segment_workspaces(&partitions).await?;
// Step 3: Run segments in parallel
println!("\n🚀 Step 3: Running {} segments in parallel...", self.config.num_segments);
self.run_segments_parallel().await?;
// Step 4: Generate final report
println!("\n📊 Step 4: Generating final report...");
self.status.completed_at = Some(Utc::now());
self.save_status()?;
let report = self.status.generate_report();
println!("{}", report);
Ok(())
}
/// Partition requirements using an AI agent
async fn partition_requirements(&mut self) -> Result<Vec<String>> {
let requirements_path = self.config.project_dir.join("flock-requirements.md");
let requirements_content = std::fs::read_to_string(&requirements_path)
.context("Failed to read flock-requirements.md")?;
// Create a temporary workspace for the partitioning agent
let partition_workspace = self.config.flock_workspace.join("_partition");
std::fs::create_dir_all(&partition_workspace)?;
// Create the partitioning prompt
let partition_prompt = format!(
"You are a software architect tasked with partitioning project requirements into {} logical, \
largely non-overlapping modules that can grow into separate architectural components \
(e.g., crates, services, or packages).\n\n\
REQUIREMENTS:\n{}\n\n\
INSTRUCTIONS:\n\
1. Analyze the requirements carefully\n\
2. Identify {} distinct architectural modules that:\n\
- Have minimal overlap and dependencies\n\
- Can be developed largely independently\n\
- Represent logical architectural boundaries\n\
- Could eventually become separate crates or services\n\
3. For each module, provide:\n\
- A clear module name\n\
- The specific requirements that belong to this module\n\
- Any dependencies on other modules\n\n\
4. Use the final_output tool to provide your partitioning as a JSON array of objects, where each object has:\n\
- \"module_name\": string\n\
- \"requirements\": string (the requirements text for this module)\n\
- \"dependencies\": array of strings (names of other modules this depends on)\n\n\
Example format:\n\
```json\n\
[\n\
{{\n\
\"module_name\": \"core-engine\",\n\
\"requirements\": \"Implement the core processing engine...\",\n\
\"dependencies\": []\n\
}},\n\
{{\n\
\"module_name\": \"api-server\",\n\
\"requirements\": \"Create REST API endpoints...\",\n\
\"dependencies\": [\"core-engine\"]\n\
}}\n\
]\n\
```\n\n\
Be thoughtful and strategic in your partitioning. The goal is to enable parallel development.",
self.config.num_segments,
requirements_content,
self.config.num_segments
);
// Get g3 binary path
let g3_binary = self.get_g3_binary()?;
// Run g3 in single-shot mode to partition requirements
println!(" Analyzing requirements and creating partitions...");
let output = Command::new(&g3_binary)
.arg("--workspace")
.arg(&partition_workspace)
.arg("--quiet") // Disable logging for partitioning agent
.arg(&partition_prompt)
.output()
.await
.context("Failed to run g3 for partitioning")?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("Partitioning agent failed: {}", stderr);
}
let stdout = String::from_utf8_lossy(&output.stdout);
debug!("Partitioning agent output: {}", stdout);
// Extract JSON from the output
let partitions_json = self.extract_json_from_output(&stdout)
.context("Failed to extract partition JSON from agent output")?;
// Parse the partitions
let partitions: Vec<serde_json::Value> = serde_json::from_str(&partitions_json)
.context("Failed to parse partition JSON")?;
if partitions.len() != self.config.num_segments {
warn!(
"Expected {} partitions but got {}. Adjusting...",
self.config.num_segments,
partitions.len()
);
}
// Extract requirements text from each partition
let mut partition_texts = Vec::new();
for (i, partition) in partitions.iter().enumerate() {
let default_name = format!("module-{}", i + 1);
let module_name = partition["module_name"]
.as_str()
.unwrap_or(&default_name);
let requirements = partition["requirements"]
.as_str()
.context("Missing requirements field in partition")?;
let dependencies = partition["dependencies"]
.as_array()
.map(|arr| {
arr.iter()
.filter_map(|v| v.as_str())
.collect::<Vec<_>>()
.join(", ")
})
.unwrap_or_default();
let partition_text = format!(
"# Module: {}\n\n## Dependencies\n{}\n\n## Requirements\n\n{}",
module_name,
if dependencies.is_empty() {
"None".to_string()
} else {
dependencies
},
requirements
);
partition_texts.push(partition_text);
println!(" ✓ Created partition {}: {}", i + 1, module_name);
}
Ok(partition_texts)
}
/// Extract JSON from agent output (looks for JSON array in output)
fn extract_json_from_output(&self, output: &str) -> Result<String> {
// Look for JSON array in the output
// Try to find content between [ and ]
if let Some(start) = output.find('[') {
if let Some(end) = output.rfind(']') {
if end > start {
return Ok(output[start..=end].to_string());
}
}
}
// If we can't find JSON array markers, try to parse the whole output
anyhow::bail!("Could not find JSON array in agent output")
}
/// Create segment workspaces by copying project directory
async fn create_segment_workspaces(&mut self, partitions: &[String]) -> Result<()> {
// Ensure flock workspace exists
std::fs::create_dir_all(&self.config.flock_workspace)?;
for (i, partition) in partitions.iter().enumerate() {
let segment_id = i + 1;
let segment_dir = self.config.flock_workspace.join(format!("segment-{}", segment_id));
println!(" Creating segment {} workspace...", segment_id);
// Copy project directory to segment directory
self.copy_git_repo(&self.config.project_dir, &segment_dir)
.await
.context(format!("Failed to copy project to segment {}", segment_id))?;
// Write segment-requirements.md
let requirements_path = segment_dir.join("segment-requirements.md");
std::fs::write(&requirements_path, partition)
.context(format!("Failed to write requirements for segment {}", segment_id))?;
println!(" ✓ Segment {} workspace ready at {}", segment_id, segment_dir.display());
}
Ok(())
}
/// Copy a git repository to a new location
async fn copy_git_repo(&self, source: &Path, dest: &Path) -> Result<()> {
// Use git clone for efficient copying
let output = Command::new("git")
.arg("clone")
.arg(source)
.arg(dest)
.output()
.await
.context("Failed to run git clone")?;
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
anyhow::bail!("Git clone failed: {}", stderr);
}
Ok(())
}
/// Run all segments in parallel
async fn run_segments_parallel(&mut self) -> Result<()> {
let mut handles = Vec::new();
for segment_id in 1..=self.config.num_segments {
let segment_dir = self.config.flock_workspace.join(format!("segment-{}", segment_id));
let max_turns = self.config.max_turns;
let g3_binary = self.get_g3_binary()?;
let status_file = self.get_status_file_path();
let session_id = self.session_id.clone();
// Initialize segment status
let segment_status = SegmentStatus {
segment_id,
workspace: segment_dir.clone(),
state: SegmentState::Running,
started_at: Utc::now(),
completed_at: None,
tokens_used: 0,
tool_calls: 0,
errors: 0,
current_turn: 0,
max_turns,
last_message: Some("Starting...".to_string()),
error_message: None,
};
self.status.update_segment(segment_id, segment_status);
self.save_status()?;
// Spawn a task for this segment
let handle = tokio::spawn(async move {
run_segment(
segment_id,
segment_dir,
max_turns,
g3_binary,
status_file,
session_id,
)
.await
});
handles.push((segment_id, handle));
}
// Wait for all segments to complete
for (segment_id, handle) in handles {
match handle.await {
Ok(Ok(final_status)) => {
println!("\n✅ Segment {} completed", segment_id);
self.status.update_segment(segment_id, final_status);
self.save_status()?;
}
Ok(Err(e)) => {
error!("Segment {} failed: {}", segment_id, e);
let mut segment_status = self.status.segments.get(&segment_id).cloned()
.unwrap_or_else(|| SegmentStatus {
segment_id,
workspace: self.config.flock_workspace.join(format!("segment-{}", segment_id)),
state: SegmentState::Failed,
started_at: Utc::now(),
completed_at: Some(Utc::now()),
tokens_used: 0,
tool_calls: 0,
errors: 1,
current_turn: 0,
max_turns: self.config.max_turns,
last_message: None,
error_message: Some(e.to_string()),
});
segment_status.state = SegmentState::Failed;
segment_status.completed_at = Some(Utc::now());
segment_status.error_message = Some(e.to_string());
segment_status.errors += 1;
self.status.update_segment(segment_id, segment_status);
self.save_status()?;
}
Err(e) => {
error!("Segment {} task panicked: {}", segment_id, e);
let mut segment_status = self.status.segments.get(&segment_id).cloned()
.unwrap_or_else(|| SegmentStatus {
segment_id,
workspace: self.config.flock_workspace.join(format!("segment-{}", segment_id)),
state: SegmentState::Failed,
started_at: Utc::now(),
completed_at: Some(Utc::now()),
tokens_used: 0,
tool_calls: 0,
errors: 1,
current_turn: 0,
max_turns: self.config.max_turns,
last_message: None,
error_message: Some(format!("Task panicked: {}", e)),
});
segment_status.state = SegmentState::Failed;
segment_status.completed_at = Some(Utc::now());
segment_status.error_message = Some(format!("Task panicked: {}", e));
segment_status.errors += 1;
self.status.update_segment(segment_id, segment_status);
self.save_status()?;
}
}
}
Ok(())
}
/// Get the g3 binary path
fn get_g3_binary(&self) -> Result<PathBuf> {
if let Some(ref binary) = self.config.g3_binary {
Ok(binary.clone())
} else {
// Use current executable
std::env::current_exe().context("Failed to get current executable path")
}
}
/// Get the status file path
fn get_status_file_path(&self) -> PathBuf {
self.config.flock_workspace.join("flock-status.json")
}
/// Save current status to file
fn save_status(&self) -> Result<()> {
let status_file = self.get_status_file_path();
self.status.save_to_file(&status_file)
}
}
/// Run a single segment worker
async fn run_segment(
segment_id: usize,
segment_dir: PathBuf,
max_turns: usize,
g3_binary: PathBuf,
status_file: PathBuf,
session_id: String,
) -> Result<SegmentStatus> {
info!("Starting segment {} in {}", segment_id, segment_dir.display());
let mut segment_status = SegmentStatus {
segment_id,
workspace: segment_dir.clone(),
state: SegmentState::Running,
started_at: Utc::now(),
completed_at: None,
tokens_used: 0,
tool_calls: 0,
errors: 0,
current_turn: 0,
max_turns,
last_message: Some("Starting autonomous mode...".to_string()),
error_message: None,
};
// Run g3 in autonomous mode with segment-requirements.md
let mut child = Command::new(&g3_binary)
.arg("--workspace")
.arg(&segment_dir)
.arg("--autonomous")
.arg("--max-turns")
.arg(max_turns.to_string())
.arg("--requirements")
.arg(std::fs::read_to_string(segment_dir.join("segment-requirements.md"))?)
.arg("--quiet") // Disable session logging for workers
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.context("Failed to spawn g3 process")?;
// Stream output and update status
let stdout = child.stdout.take().context("Failed to get stdout")?;
let stderr = child.stderr.take().context("Failed to get stderr")?;
let stdout_reader = BufReader::new(stdout);
let stderr_reader = BufReader::new(stderr);
let mut stdout_lines = stdout_reader.lines();
let mut stderr_lines = stderr_reader.lines();
// Read output and update status
loop {
tokio::select! {
line = stdout_lines.next_line() => {
match line {
Ok(Some(line)) => {
println!("[Segment {}] {}", segment_id, line);
// Parse output for status updates
if line.contains("TURN") {
// Extract turn number if possible
if let Some(turn_str) = line.split("TURN").nth(1) {
if let Ok(turn) = turn_str.trim().split('/').next().unwrap_or("0").parse::<usize>() {
segment_status.current_turn = turn;
}
}
}
segment_status.last_message = Some(line);
update_status_file(&status_file, &session_id, segment_status.clone())?;
}
Ok(None) => break,
Err(e) => {
error!("Error reading stdout for segment {}: {}", segment_id, e);
break;
}
}
}
line = stderr_lines.next_line() => {
match line {
Ok(Some(line)) => {
eprintln!("[Segment {} ERROR] {}", segment_id, line);
segment_status.errors += 1;
update_status_file(&status_file, &session_id, segment_status.clone())?;
}
Ok(None) => break,
Err(e) => {
error!("Error reading stderr for segment {}: {}", segment_id, e);
break;
}
}
}
}
}
// Wait for process to complete
let status = child.wait().await.context("Failed to wait for g3 process")?;
segment_status.completed_at = Some(Utc::now());
if status.success() {
segment_status.state = SegmentState::Completed;
segment_status.last_message = Some("Completed successfully".to_string());
} else {
segment_status.state = SegmentState::Failed;
segment_status.error_message = Some(format!("Process exited with status: {}", status));
segment_status.errors += 1;
}
// Try to extract metrics from session log if available
let log_dir = segment_dir.join("logs");
if log_dir.exists() {
if let Ok(entries) = std::fs::read_dir(&log_dir) {
for entry in entries.flatten() {
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) == Some("json") {
if let Ok(log_content) = std::fs::read_to_string(&path) {
if let Ok(log_json) = serde_json::from_str::<serde_json::Value>(&log_content) {
// Extract token usage
if let Some(context) = log_json.get("context_window") {
if let Some(cumulative) = context.get("cumulative_tokens") {
if let Some(tokens) = cumulative.as_u64() {
segment_status.tokens_used = tokens;
}
}
}
// Count tool calls from conversation history
if let Some(context) = log_json.get("context_window") {
if let Some(history) = context.get("conversation_history") {
if let Some(messages) = history.as_array() {
let tool_call_count = messages
.iter()
.filter(|msg| {
msg.get("role")
.and_then(|r| r.as_str())
== Some("tool")
})
.count();
segment_status.tool_calls = tool_call_count as u64;
}
}
}
}
}
}
}
}
}
update_status_file(&status_file, &session_id, segment_status.clone())?;
Ok(segment_status)
}
/// Update the status file with new segment status
fn update_status_file(
status_file: &PathBuf,
session_id: &str,
segment_status: SegmentStatus,
) -> Result<()> {
// Load existing status or create new one
let mut flock_status = if status_file.exists() {
FlockStatus::load_from_file(status_file)?
} else {
// This shouldn't happen, but handle it gracefully
FlockStatus::new(
session_id.to_string(),
PathBuf::new(),
PathBuf::new(),
0,
)
};
flock_status.update_segment(segment_status.segment_id, segment_status);
flock_status.save_to_file(status_file)?;
Ok(())
}

View File

@@ -0,0 +1,12 @@
//! G3 Ensembles - Multi-agent ensemble functionality
//!
//! This crate provides functionality for running multiple G3 agents in coordination,
//! enabling parallel development across different architectural modules.
pub mod flock;
pub mod status;
mod tests;
/// Re-export main types for convenience
pub use flock::{FlockConfig, FlockMode};
pub use status::{FlockStatus, SegmentStatus};

View File

@@ -0,0 +1,240 @@
//! Status tracking for flock mode
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::path::PathBuf;
/// Status of an individual segment worker
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SegmentStatus {
/// Segment number
pub segment_id: usize,
/// Segment workspace directory
pub workspace: PathBuf,
/// Current state of the segment
pub state: SegmentState,
/// Start time
pub started_at: DateTime<Utc>,
/// Completion time (if finished)
pub completed_at: Option<DateTime<Utc>>,
/// Total tokens used
pub tokens_used: u64,
/// Number of tool calls made
pub tool_calls: u64,
/// Number of errors encountered
pub errors: u64,
/// Current turn number (for autonomous mode)
pub current_turn: usize,
/// Maximum turns allowed
pub max_turns: usize,
/// Last status message
pub last_message: Option<String>,
/// Error message (if failed)
pub error_message: Option<String>,
}
/// State of a segment worker
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum SegmentState {
/// Waiting to start
Pending,
/// Currently running
Running,
/// Completed successfully
Completed,
/// Failed with error
Failed,
/// Cancelled by user
Cancelled,
}
impl std::fmt::Display for SegmentState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
SegmentState::Pending => write!(f, "⏳ Pending"),
SegmentState::Running => write!(f, "🔄 Running"),
SegmentState::Completed => write!(f, "✅ Completed"),
SegmentState::Failed => write!(f, "❌ Failed"),
SegmentState::Cancelled => write!(f, "⚠️ Cancelled"),
}
}
}
/// Overall flock status
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlockStatus {
/// Flock session ID
pub session_id: String,
/// Project directory
pub project_dir: PathBuf,
/// Flock workspace directory
pub flock_workspace: PathBuf,
/// Number of segments
pub num_segments: usize,
/// Start time
pub started_at: DateTime<Utc>,
/// Completion time (if finished)
pub completed_at: Option<DateTime<Utc>>,
/// Status of each segment
pub segments: HashMap<usize, SegmentStatus>,
/// Total tokens used across all segments
pub total_tokens: u64,
/// Total tool calls across all segments
pub total_tool_calls: u64,
/// Total errors across all segments
pub total_errors: u64,
}
impl FlockStatus {
/// Create a new flock status
pub fn new(
session_id: String,
project_dir: PathBuf,
flock_workspace: PathBuf,
num_segments: usize,
) -> Self {
Self {
session_id,
project_dir,
flock_workspace,
num_segments,
started_at: Utc::now(),
completed_at: None,
segments: HashMap::new(),
total_tokens: 0,
total_tool_calls: 0,
total_errors: 0,
}
}
/// Update segment status
pub fn update_segment(&mut self, segment_id: usize, status: SegmentStatus) {
self.segments.insert(segment_id, status);
self.recalculate_totals();
}
/// Recalculate total metrics
fn recalculate_totals(&mut self) {
self.total_tokens = self.segments.values().map(|s| s.tokens_used).sum();
self.total_tool_calls = self.segments.values().map(|s| s.tool_calls).sum();
self.total_errors = self.segments.values().map(|s| s.errors).sum();
}
/// Check if all segments are complete
pub fn is_complete(&self) -> bool {
self.segments.len() == self.num_segments
&& self.segments.values().all(|s| {
matches!(
s.state,
SegmentState::Completed | SegmentState::Failed | SegmentState::Cancelled
)
})
}
/// Get count of segments by state
pub fn count_by_state(&self, state: SegmentState) -> usize {
self.segments.values().filter(|s| s.state == state).count()
}
/// Save status to file
pub fn save_to_file(&self, path: &PathBuf) -> anyhow::Result<()> {
let json = serde_json::to_string_pretty(self)?;
std::fs::write(path, json)?;
Ok(())
}
/// Load status from file
pub fn load_from_file(path: &PathBuf) -> anyhow::Result<Self> {
let json = std::fs::read_to_string(path)?;
let status = serde_json::from_str(&json)?;
Ok(status)
}
/// Generate a summary report
pub fn generate_report(&self) -> String {
let mut report = String::new();
report.push_str(&format!("\n{}", "=".repeat(80)));
report.push_str(&format!("\n📊 FLOCK MODE SESSION REPORT"));
report.push_str(&format!("\n{}", "=".repeat(80)));
report.push_str(&format!("\n\n🆔 Session ID: {}", self.session_id));
report.push_str(&format!("\n📁 Project: {}", self.project_dir.display()));
report.push_str(&format!("\n🗂️ Workspace: {}", self.flock_workspace.display()));
report.push_str(&format!("\n🔢 Segments: {}", self.num_segments));
let duration = if let Some(completed) = self.completed_at {
completed.signed_duration_since(self.started_at)
} else {
Utc::now().signed_duration_since(self.started_at)
};
report.push_str(&format!("\n⏱️ Duration: {:.2}s", duration.num_milliseconds() as f64 / 1000.0));
// Segment status summary
report.push_str(&format!("\n\n📈 Segment Status:"));
report.push_str(&format!("\n • Completed: {}", self.count_by_state(SegmentState::Completed)));
report.push_str(&format!("\n • Running: {}", self.count_by_state(SegmentState::Running)));
report.push_str(&format!("\n • Failed: {}", self.count_by_state(SegmentState::Failed)));
report.push_str(&format!("\n • Pending: {}", self.count_by_state(SegmentState::Pending)));
report.push_str(&format!("\n • Cancelled: {}", self.count_by_state(SegmentState::Cancelled)));
// Metrics
report.push_str(&format!("\n\n📊 Aggregate Metrics:"));
report.push_str(&format!("\n • Total Tokens: {}", self.total_tokens));
report.push_str(&format!("\n • Total Tool Calls: {}", self.total_tool_calls));
report.push_str(&format!("\n • Total Errors: {}", self.total_errors));
// Per-segment details
report.push_str(&format!("\n\n🔍 Segment Details:"));
let mut segments: Vec<_> = self.segments.iter().collect();
segments.sort_by_key(|(id, _)| *id);
for (id, segment) in segments {
report.push_str(&format!("\n\n Segment {}:", id));
report.push_str(&format!("\n Status: {}", segment.state));
report.push_str(&format!("\n Workspace: {}", segment.workspace.display()));
report.push_str(&format!("\n Tokens: {}", segment.tokens_used));
report.push_str(&format!("\n Tool Calls: {}", segment.tool_calls));
report.push_str(&format!("\n Errors: {}", segment.errors));
report.push_str(&format!("\n Turn: {}/{}", segment.current_turn, segment.max_turns));
if let Some(ref msg) = segment.last_message {
report.push_str(&format!("\n Last Message: {}", msg));
}
if let Some(ref err) = segment.error_message {
report.push_str(&format!("\n Error: {}", err));
}
}
report.push_str(&format!("\n\n{}", "=".repeat(80)));
report
}
}

View File

@@ -0,0 +1,331 @@
//! Unit tests for g3-ensembles
#[cfg(test)]
mod tests {
use crate::status::{FlockStatus, SegmentState, SegmentStatus};
use chrono::Utc;
use std::path::PathBuf;
#[test]
fn test_segment_state_display() {
assert_eq!(format!("{}", SegmentState::Pending), "⏳ Pending");
assert_eq!(format!("{}", SegmentState::Running), "🔄 Running");
assert_eq!(format!("{}", SegmentState::Completed), "✅ Completed");
assert_eq!(format!("{}", SegmentState::Failed), "❌ Failed");
assert_eq!(format!("{}", SegmentState::Cancelled), "⚠️ Cancelled");
}
#[test]
fn test_flock_status_creation() {
let status = FlockStatus::new(
"test-session".to_string(),
PathBuf::from("/test/project"),
PathBuf::from("/test/workspace"),
3,
);
assert_eq!(status.session_id, "test-session");
assert_eq!(status.num_segments, 3);
assert_eq!(status.segments.len(), 0);
assert_eq!(status.total_tokens, 0);
assert_eq!(status.total_tool_calls, 0);
assert_eq!(status.total_errors, 0);
assert!(status.completed_at.is_none());
}
#[test]
fn test_segment_status_update() {
let mut status = FlockStatus::new(
"test-session".to_string(),
PathBuf::from("/test/project"),
PathBuf::from("/test/workspace"),
2,
);
let segment1 = SegmentStatus {
segment_id: 1,
workspace: PathBuf::from("/test/workspace/segment-1"),
state: SegmentState::Completed,
started_at: Utc::now(),
completed_at: Some(Utc::now()),
tokens_used: 1000,
tool_calls: 50,
errors: 2,
current_turn: 5,
max_turns: 10,
last_message: Some("Done".to_string()),
error_message: None,
};
status.update_segment(1, segment1);
assert_eq!(status.segments.len(), 1);
assert_eq!(status.total_tokens, 1000);
assert_eq!(status.total_tool_calls, 50);
assert_eq!(status.total_errors, 2);
}
#[test]
fn test_multiple_segment_updates() {
let mut status = FlockStatus::new(
"test-session".to_string(),
PathBuf::from("/test/project"),
PathBuf::from("/test/workspace"),
2,
);
let segment1 = SegmentStatus {
segment_id: 1,
workspace: PathBuf::from("/test/workspace/segment-1"),
state: SegmentState::Completed,
started_at: Utc::now(),
completed_at: Some(Utc::now()),
tokens_used: 1000,
tool_calls: 50,
errors: 2,
current_turn: 5,
max_turns: 10,
last_message: Some("Done".to_string()),
error_message: None,
};
let segment2 = SegmentStatus {
segment_id: 2,
workspace: PathBuf::from("/test/workspace/segment-2"),
state: SegmentState::Failed,
started_at: Utc::now(),
completed_at: Some(Utc::now()),
tokens_used: 500,
tool_calls: 25,
errors: 5,
current_turn: 3,
max_turns: 10,
last_message: Some("Error".to_string()),
error_message: Some("Test error".to_string()),
};
status.update_segment(1, segment1);
status.update_segment(2, segment2);
assert_eq!(status.segments.len(), 2);
assert_eq!(status.total_tokens, 1500);
assert_eq!(status.total_tool_calls, 75);
assert_eq!(status.total_errors, 7);
}
#[test]
fn test_is_complete() {
let mut status = FlockStatus::new(
"test-session".to_string(),
PathBuf::from("/test/project"),
PathBuf::from("/test/workspace"),
2,
);
// Not complete - no segments
assert!(!status.is_complete());
// Add one completed segment
let segment1 = SegmentStatus {
segment_id: 1,
workspace: PathBuf::from("/test/workspace/segment-1"),
state: SegmentState::Completed,
started_at: Utc::now(),
completed_at: Some(Utc::now()),
tokens_used: 1000,
tool_calls: 50,
errors: 0,
current_turn: 5,
max_turns: 10,
last_message: None,
error_message: None,
};
status.update_segment(1, segment1);
// Still not complete - only 1 of 2 segments
assert!(!status.is_complete());
// Add second segment (running)
let segment2 = SegmentStatus {
segment_id: 2,
workspace: PathBuf::from("/test/workspace/segment-2"),
state: SegmentState::Running,
started_at: Utc::now(),
completed_at: None,
tokens_used: 500,
tool_calls: 25,
errors: 0,
current_turn: 3,
max_turns: 10,
last_message: None,
error_message: None,
};
status.update_segment(2, segment2);
// Still not complete - segment 2 is running
assert!(!status.is_complete());
// Update segment 2 to completed
let segment2_done = SegmentStatus {
segment_id: 2,
workspace: PathBuf::from("/test/workspace/segment-2"),
state: SegmentState::Completed,
started_at: Utc::now(),
completed_at: Some(Utc::now()),
tokens_used: 500,
tool_calls: 25,
errors: 0,
current_turn: 5,
max_turns: 10,
last_message: None,
error_message: None,
};
status.update_segment(2, segment2_done);
// Now complete
assert!(status.is_complete());
}
#[test]
fn test_count_by_state() {
let mut status = FlockStatus::new(
"test-session".to_string(),
PathBuf::from("/test/project"),
PathBuf::from("/test/workspace"),
3,
);
let segment1 = SegmentStatus {
segment_id: 1,
workspace: PathBuf::from("/test/workspace/segment-1"),
state: SegmentState::Completed,
started_at: Utc::now(),
completed_at: Some(Utc::now()),
tokens_used: 1000,
tool_calls: 50,
errors: 0,
current_turn: 5,
max_turns: 10,
last_message: None,
error_message: None,
};
let segment2 = SegmentStatus {
segment_id: 2,
workspace: PathBuf::from("/test/workspace/segment-2"),
state: SegmentState::Failed,
started_at: Utc::now(),
completed_at: Some(Utc::now()),
tokens_used: 500,
tool_calls: 25,
errors: 5,
current_turn: 3,
max_turns: 10,
last_message: None,
error_message: Some("Error".to_string()),
};
let segment3 = SegmentStatus {
segment_id: 3,
workspace: PathBuf::from("/test/workspace/segment-3"),
state: SegmentState::Completed,
started_at: Utc::now(),
completed_at: Some(Utc::now()),
tokens_used: 800,
tool_calls: 40,
errors: 1,
current_turn: 4,
max_turns: 10,
last_message: None,
error_message: None,
};
status.update_segment(1, segment1);
status.update_segment(2, segment2);
status.update_segment(3, segment3);
assert_eq!(status.count_by_state(SegmentState::Completed), 2);
assert_eq!(status.count_by_state(SegmentState::Failed), 1);
assert_eq!(status.count_by_state(SegmentState::Running), 0);
assert_eq!(status.count_by_state(SegmentState::Pending), 0);
}
#[test]
fn test_status_serialization() {
let mut status = FlockStatus::new(
"test-session".to_string(),
PathBuf::from("/test/project"),
PathBuf::from("/test/workspace"),
1,
);
let segment1 = SegmentStatus {
segment_id: 1,
workspace: PathBuf::from("/test/workspace/segment-1"),
state: SegmentState::Completed,
started_at: Utc::now(),
completed_at: Some(Utc::now()),
tokens_used: 1000,
tool_calls: 50,
errors: 2,
current_turn: 5,
max_turns: 10,
last_message: Some("Done".to_string()),
error_message: None,
};
status.update_segment(1, segment1);
// Serialize to JSON
let json = serde_json::to_string(&status).expect("Failed to serialize");
assert!(json.contains("test-session"));
assert!(json.contains("segment_id"));
assert!(json.contains("Completed"));
// Deserialize back
let deserialized: FlockStatus =
serde_json::from_str(&json).expect("Failed to deserialize");
assert_eq!(deserialized.session_id, "test-session");
assert_eq!(deserialized.segments.len(), 1);
assert_eq!(deserialized.total_tokens, 1000);
}
#[test]
fn test_report_generation() {
let mut status = FlockStatus::new(
"test-session".to_string(),
PathBuf::from("/test/project"),
PathBuf::from("/test/workspace"),
2,
);
let segment1 = SegmentStatus {
segment_id: 1,
workspace: PathBuf::from("/test/workspace/segment-1"),
state: SegmentState::Completed,
started_at: Utc::now(),
completed_at: Some(Utc::now()),
tokens_used: 1000,
tool_calls: 50,
errors: 2,
current_turn: 5,
max_turns: 10,
last_message: Some("Done".to_string()),
error_message: None,
};
status.update_segment(1, segment1);
let report = status.generate_report();
// Check that report contains expected sections
assert!(report.contains("FLOCK MODE SESSION REPORT"));
assert!(report.contains("test-session"));
assert!(report.contains("Segment Status:"));
assert!(report.contains("Aggregate Metrics:"));
assert!(report.contains("Segment Details:"));
assert!(report.contains("Total Tokens: 1000"));
assert!(report.contains("Total Tool Calls: 50"));
assert!(report.contains("Total Errors: 2"));
}
}