g3 console initial cut + error doesnt kill auto

This commit is contained in:
Dhanji Prasanna
2025-11-04 11:39:26 +11:00
parent 6913c5f72e
commit aaf918828f
53 changed files with 6796 additions and 23 deletions

View File

@@ -0,0 +1,147 @@
use crate::models::*;
use crate::process::{ProcessController, ProcessDetector};
use axum::{extract::State, http::StatusCode, Json};
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{error, info};
pub type ControllerState = Arc<Mutex<ProcessController>>;
pub async fn kill_instance(
State(controller): State<ControllerState>,
axum::extract::Path(id): axum::extract::Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode> {
// Extract PID from ID (format: "pid_timestamp")
let pid = id
.split('_')
.next()
.and_then(|s| s.parse::<u32>().ok())
.ok_or(StatusCode::BAD_REQUEST)?;
let mut controller = controller.lock().await;
match controller.kill_process(pid) {
Ok(_) => {
info!("Successfully killed process {}", pid);
Ok(Json(serde_json::json!({
"status": "terminating"
})))
}
Err(e) => {
error!("Failed to kill process {}: {}", pid, e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
pub async fn restart_instance(
State(controller): State<ControllerState>,
axum::extract::Path(id): axum::extract::Path<String>,
) -> Result<Json<LaunchResponse>, StatusCode> {
info!("Restarting instance: {}", id);
// Extract PID from instance ID (format: pid_timestamp)
let pid: u32 = id
.split('_')
.next()
.and_then(|s| s.parse().ok())
.ok_or(StatusCode::BAD_REQUEST)?;
let mut controller = controller.lock().await;
// Get stored launch params
let params = controller.get_launch_params(pid)
.ok_or(StatusCode::NOT_FOUND)?;
// Launch new instance with same parameters
let new_pid = controller.launch_g3(
params.workspace.to_str().unwrap(),
&params.provider,
&params.model,
&params.prompt,
params.autonomous,
params.g3_binary_path.as_deref(),
).map_err(|e| {
error!("Failed to restart instance: {}", e);
StatusCode::INTERNAL_SERVER_ERROR
})?;
let new_id = format!("{}_{}", new_pid, chrono::Utc::now().timestamp());
Ok(Json(LaunchResponse {
id: new_id,
status: "starting".to_string(),
}))
}
pub async fn launch_instance(
State(controller): State<ControllerState>,
Json(request): Json<LaunchRequest>,
) -> Result<Json<LaunchResponse>, (StatusCode, Json<serde_json::Value>)> {
info!("Launching new g3 instance: {:?}", request);
// Validate binary path if provided
if let Some(ref binary_path) = request.g3_binary_path {
let path = std::path::Path::new(binary_path);
// Check if file exists
if !path.exists() {
error!("G3 binary not found: {}", binary_path);
return Err((StatusCode::BAD_REQUEST, Json(serde_json::json!({
"error": "G3 binary not found",
"message": format!("The specified g3 binary does not exist: {}", binary_path)
}))));
}
// Check if file is executable (Unix only)
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
if let Ok(metadata) = std::fs::metadata(path) {
if metadata.permissions().mode() & 0o111 == 0 {
error!("G3 binary is not executable: {}", binary_path);
return Err((StatusCode::BAD_REQUEST, Json(serde_json::json!({
"error": "G3 binary is not executable",
"message": format!("The specified g3 binary is not executable: {}", binary_path)
}))));
}
}
}
}
let workspace = request.workspace.to_str().ok_or_else(|| {
(StatusCode::BAD_REQUEST, Json(serde_json::json!({
"error": "Invalid workspace path",
"message": "The workspace path contains invalid characters"
})))
})?;
let autonomous = request.mode == LaunchMode::Ensemble;
let g3_binary_path = request.g3_binary_path.as_deref();
let mut controller = controller.lock().await;
match controller.launch_g3(
workspace,
&request.provider,
&request.model,
&request.prompt,
autonomous,
g3_binary_path,
) {
Ok(pid) => {
let id = format!("{}_{}", pid, chrono::Utc::now().timestamp());
info!("Successfully launched g3 instance with PID {}", pid);
Ok(Json(LaunchResponse {
id,
status: "starting".to_string(),
}))
}
Err(e) => {
error!("Failed to launch g3 instance: {}", e);
Err((StatusCode::INTERNAL_SERVER_ERROR, Json(serde_json::json!({
"error": "Failed to launch instance",
"message": format!("Error: {}", e)
}))))
}
}
}

View File

@@ -0,0 +1,189 @@
use crate::logs::{LogParser, StatsAggregator};
use crate::models::*;
use crate::process::ProcessDetector;
use axum::{extract::State, http::StatusCode, Json};
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::{debug, error, warn};
pub type AppState = Arc<Mutex<ProcessDetector>>;
pub async fn list_instances(
State(detector): State<AppState>,
) -> Result<Json<Vec<InstanceDetail>>, StatusCode> {
let mut detector = detector.lock().await;
match detector.detect_instances() {
Ok(instances) => {
let mut details = Vec::new();
for instance in instances {
match get_instance_detail(&instance) {
Ok(detail) => details.push(detail),
Err(e) => {
error!("Failed to get instance detail: {}", e);
// Continue with other instances
}
}
}
Ok(Json(details))
}
Err(e) => {
error!("Failed to detect instances: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
pub async fn get_instance(
State(detector): State<AppState>,
axum::extract::Path(id): axum::extract::Path<String>,
) -> Result<Json<InstanceDetail>, StatusCode> {
let mut detector = detector.lock().await;
match detector.detect_instances() {
Ok(instances) => {
if let Some(instance) = instances.into_iter().find(|i| i.id == id) {
match get_instance_detail(&instance) {
Ok(detail) => Ok(Json(detail)),
Err(e) => {
error!("Failed to get instance detail: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
} else {
Err(StatusCode::NOT_FOUND)
}
}
Err(e) => {
error!("Failed to detect instances: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
fn get_instance_detail(instance: &Instance) -> anyhow::Result<InstanceDetail> {
// Parse logs - don't fail if logs don't exist yet
let log_entries = match LogParser::parse_logs(&instance.workspace) {
Ok(entries) => entries,
Err(e) => {
warn!("Failed to parse logs for instance {}: {}. Instance may be newly started.", instance.id, e);
Vec::new()
}
};
// Aggregate stats
let is_ensemble = instance.instance_type == crate::models::InstanceType::Ensemble;
let stats = StatsAggregator::aggregate_stats(&log_entries, instance.start_time, is_ensemble);
// Get latest message
let latest_message = StatsAggregator::get_latest_message(&log_entries);
// Get git status - don't fail if not a git repo
let git_status = match get_git_status(&instance.workspace) {
Some(status) => Some(status),
None => {
debug!("No git status available for workspace: {:?}", instance.workspace);
None
}
};
// Get project files
let project_files = get_project_files(&instance.workspace);
Ok(InstanceDetail {
instance: instance.clone(),
stats,
latest_message,
git_status,
project_files,
})
}
fn get_git_status(workspace: &std::path::Path) -> Option<GitStatus> {
use std::process::Command;
// Get current branch
let branch = Command::new("git")
.arg("-C")
.arg(workspace)
.arg("branch")
.arg("--show-current")
.output()
.ok()
.and_then(|output| String::from_utf8(output.stdout).ok())
.map(|s| s.trim().to_string())?;
// Get status
let status_output = Command::new("git")
.arg("-C")
.arg(workspace)
.arg("status")
.arg("--porcelain")
.output()
.ok()
.and_then(|output| String::from_utf8(output.stdout).ok())?;
let mut modified_files = Vec::new();
let mut added_files = Vec::new();
let mut deleted_files = Vec::new();
for line in status_output.lines() {
if line.len() < 4 {
continue;
}
let status = &line[0..2];
let file = line[3..].trim();
match status.trim() {
"M" | "MM" => modified_files.push(file.to_string()),
"A" | "AM" => added_files.push(file.to_string()),
"D" => deleted_files.push(file.to_string()),
_ => modified_files.push(file.to_string()),
}
}
let uncommitted_changes = modified_files.len() + added_files.len() + deleted_files.len();
Some(GitStatus {
branch,
uncommitted_changes,
modified_files,
added_files,
deleted_files,
})
}
fn get_project_files(workspace: &std::path::Path) -> ProjectFiles {
let requirements = read_file_snippet(workspace, "requirements.md");
let readme = read_file_snippet(workspace, "README.md");
let agents = read_file_snippet(workspace, "AGENTS.md");
ProjectFiles {
requirements,
readme,
agents,
}
}
fn read_file_snippet(workspace: &std::path::Path, filename: &str) -> Option<String> {
use std::fs;
let path = workspace.join(filename);
if !path.exists() {
return None;
}
fs::read_to_string(&path)
.ok()
.map(|content| {
// Return first 10 lines
content
.lines()
.take(10)
.collect::<Vec<_>>()
.join("\n")
})
}

View File

@@ -0,0 +1,44 @@
use crate::logs::LogParser;
use crate::models::*;
use crate::process::ProcessDetector;
use axum::{extract::State, http::StatusCode, Json};
use std::sync::Arc;
use tokio::sync::Mutex;
use tracing::error;
pub type LogState = Arc<Mutex<ProcessDetector>>;
pub async fn get_instance_logs(
State(detector): State<LogState>,
axum::extract::Path(id): axum::extract::Path<String>,
) -> Result<Json<serde_json::Value>, StatusCode> {
let mut detector = detector.lock().await;
match detector.detect_instances() {
Ok(instances) => {
if let Some(instance) = instances.into_iter().find(|i| i.id == id) {
match LogParser::parse_logs(&instance.workspace) {
Ok(entries) => {
let messages = LogParser::extract_chat_messages(&entries);
let tool_calls = LogParser::extract_tool_calls(&entries);
Ok(Json(serde_json::json!({
"messages": messages,
"tool_calls": tool_calls,
})))
}
Err(e) => {
error!("Failed to parse logs: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
} else {
Err(StatusCode::NOT_FOUND)
}
}
Err(e) => {
error!("Failed to detect instances: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}

View File

@@ -0,0 +1,9 @@
pub mod instances;
pub mod control;
pub mod logs;
pub mod state;
pub use instances::*;
pub use control::*;
pub use logs::*;
pub use state::*;

View File

@@ -0,0 +1,99 @@
use crate::launch::ConsoleState;
use axum::{http::StatusCode, Json};
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use std::os::unix::fs::PermissionsExt;
use tracing::{error, info};
pub async fn get_state() -> Result<Json<ConsoleState>, StatusCode> {
let state = ConsoleState::load();
Ok(Json(state))
}
pub async fn save_state(
Json(state): Json<ConsoleState>,
) -> Result<Json<serde_json::Value>, StatusCode> {
match state.save() {
Ok(_) => {
info!("Console state saved successfully");
Ok(Json(serde_json::json!({
"status": "saved"
})))
}
Err(e) => {
error!("Failed to save console state: {}", e);
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
}
}
#[derive(Debug, Serialize, Deserialize)]
pub struct BrowseRequest {
pub path: Option<String>,
pub browse_type: String, // "directory" or "file"
}
#[derive(Debug, Serialize)]
pub struct BrowseResponse {
pub current_path: String,
pub parent_path: Option<String>,
pub entries: Vec<FileEntry>,
}
#[derive(Debug, Serialize)]
pub struct FileEntry {
pub name: String,
pub path: String,
pub is_directory: bool,
pub is_executable: bool,
}
pub async fn browse_filesystem(
Json(request): Json<BrowseRequest>,
) -> Result<Json<BrowseResponse>, StatusCode> {
use std::fs;
let path = if let Some(p) = request.path {
PathBuf::from(p)
} else {
std::env::current_dir().unwrap_or_else(|_| PathBuf::from("."))
};
let current_path = path.canonicalize()
.map_err(|_| StatusCode::BAD_REQUEST)?
.to_string_lossy()
.to_string();
let parent_path = path.parent()
.and_then(|p| p.to_str())
.map(|s| s.to_string());
let mut entries = Vec::new();
if let Ok(read_dir) = fs::read_dir(&path) {
for entry in read_dir.flatten() {
if let Ok(metadata) = entry.metadata() {
entries.push(FileEntry {
name: entry.file_name().to_string_lossy().to_string(),
path: entry.path().to_string_lossy().to_string(),
is_directory: metadata.is_dir(),
is_executable: metadata.permissions().mode() & 0o111 != 0,
});
}
}
}
entries.sort_by(|a, b| {
match (a.is_directory, b.is_directory) {
(true, false) => std::cmp::Ordering::Less,
(false, true) => std::cmp::Ordering::Greater,
_ => a.name.cmp(&b.name),
}
});
Ok(Json(BrowseResponse {
current_path,
parent_path,
entries,
}))
}

View File

@@ -0,0 +1,66 @@
use serde::{Deserialize, Serialize};
use std::fs;
use std::path::PathBuf;
use tracing::info;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConsoleState {
pub theme: String,
pub last_workspace: Option<String>,
pub g3_binary_path: Option<String>,
pub last_provider: Option<String>,
pub last_model: Option<String>,
}
impl Default for ConsoleState {
fn default() -> Self {
Self {
theme: "dark".to_string(),
last_workspace: None,
g3_binary_path: None,
last_provider: Some("databricks".to_string()),
last_model: Some("databricks-claude-sonnet-4-5".to_string()),
}
}
}
impl ConsoleState {
pub fn load() -> Self {
let config_path = Self::config_path();
if config_path.exists() {
if let Ok(content) = fs::read_to_string(&config_path) {
return serde_json::from_str(&content).unwrap_or_else(|e| {
tracing::warn!("Failed to parse console state: {}", e);
Self::default()
});
}
}
Self::default()
}
pub fn save(&self) -> anyhow::Result<()> {
let config_path = Self::config_path();
info!("Saving console state to: {:?}", config_path);
// Create parent directory if it doesn't exist
if let Some(parent) = config_path.parent() {
fs::create_dir_all(parent)?;
}
let content = serde_json::to_string_pretty(self)?;
fs::write(&config_path, content)?;
info!("Console state saved successfully to: {:?}", config_path);
Ok(())
}
fn config_path() -> PathBuf {
// Use explicit ~/.config/g3/console-state.json path as per requirements
let home = dirs::home_dir().unwrap_or_else(|| PathBuf::from("."));
home.join(".config")
.join("g3")
.join("console-state.json")
}
}

View File

@@ -0,0 +1,104 @@
mod api;
mod logs;
mod models;
mod process;
mod launch;
use api::control::{kill_instance, launch_instance, restart_instance, ControllerState};
use api::instances::{get_instance, list_instances, AppState};
use api::logs::get_instance_logs;
use api::state::{get_state, save_state, browse_filesystem};
use axum::{
routing::{get, post},
Router,
};
use clap::Parser;
use process::{ProcessController, ProcessDetector};
use std::sync::Arc;
use tokio::sync::Mutex;
use tower_http::cors::CorsLayer;
use tower_http::services::ServeDir;
use tracing::{info, Level};
use tracing_subscriber;
#[derive(Parser, Debug)]
#[command(name = "g3-console")]
#[command(about = "Web console for monitoring and managing g3 instances")]
struct Args {
/// Port to bind to
#[arg(long, default_value = "9090")]
port: u16,
/// Host to bind to
#[arg(long, default_value = "127.0.0.1")]
host: String,
/// Auto-open browser
#[arg(long)]
open: bool,
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Initialize tracing
tracing_subscriber::fmt()
.with_max_level(Level::INFO)
.init();
let args = Args::parse();
// Create shared state
let detector = Arc::new(Mutex::new(ProcessDetector::new()));
let controller = Arc::new(Mutex::new(ProcessController::new()));
// Build API routes with different state for different endpoints
let instance_routes = Router::new()
.route("/instances", get(list_instances))
.route("/instances/:id", get(get_instance))
.route("/instances/:id/logs", get(get_instance_logs))
.with_state(detector.clone());
let control_routes = Router::new()
.route("/instances/:id/kill", post(kill_instance))
.route("/instances/:id/restart", post(restart_instance))
.route("/instances/launch", post(launch_instance))
.with_state(controller.clone());
let state_routes = Router::new()
.route("/state", get(get_state))
.route("/state", post(save_state))
.route("/browse", post(browse_filesystem))
.with_state(controller.clone());
// Combine routes
let api_routes = Router::new()
.merge(instance_routes)
.merge(control_routes)
.merge(state_routes);
// Serve static files from web directory
let web_dir = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("web");
let static_service = ServeDir::new(web_dir);
// Build main app
let app = Router::new()
.nest("/api", api_routes)
.fallback_service(static_service)
.layer(CorsLayer::permissive());
let addr = format!("{}:{}", args.host, args.port);
info!("Starting g3-console on http://{}", addr);
// Auto-open browser if requested
if args.open {
let url = format!("http://{}", addr);
info!("Opening browser to {}", url);
let _ = open::that(&url);
}
// Start server
let listener = tokio::net::TcpListener::bind(&addr).await?;
axum::serve(listener, app).await?;
Ok(())
}

View File

@@ -0,0 +1,127 @@
use serde::{Deserialize, Serialize};
use std::path::PathBuf;
use chrono::{DateTime, Utc};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Instance {
pub id: String,
pub pid: u32,
pub workspace: PathBuf,
pub start_time: DateTime<Utc>,
pub status: InstanceStatus,
pub instance_type: InstanceType,
pub provider: Option<String>,
pub model: Option<String>,
pub execution_method: ExecutionMethod,
pub command_line: String,
// Store original launch parameters for restart
pub launch_params: Option<LaunchParams>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LaunchParams {
pub workspace: PathBuf,
pub provider: String,
pub model: String,
pub prompt: String,
pub autonomous: bool,
pub g3_binary_path: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum InstanceStatus {
Running,
Completed,
Failed,
Idle,
Terminated,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum InstanceType {
Single,
Ensemble,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum ExecutionMethod {
Binary,
CargoRun,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InstanceStats {
pub total_tokens: u64,
pub tool_calls: u64,
pub errors: u64,
pub duration_secs: u64,
pub turns: Option<Vec<TurnInfo>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct InstanceDetail {
#[serde(flatten)]
pub instance: Instance,
pub stats: InstanceStats,
pub latest_message: Option<String>,
pub git_status: Option<GitStatus>,
pub project_files: ProjectFiles,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GitStatus {
pub branch: String,
pub uncommitted_changes: usize,
pub modified_files: Vec<String>,
pub added_files: Vec<String>,
pub deleted_files: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct ProjectFiles {
pub requirements: Option<String>,
pub readme: Option<String>,
pub agents: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LaunchRequest {
pub prompt: String,
pub workspace: PathBuf,
pub provider: String,
pub model: String,
pub mode: LaunchMode,
pub g3_binary_path: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum LaunchMode {
Single,
Ensemble,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LaunchResponse {
pub id: String,
pub status: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TurnInfo {
pub agent: String,
pub duration_secs: u64,
pub status: String,
pub color: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ProgressInfo {
pub mode: InstanceType,
pub duration_secs: u64,
pub estimated_finish_secs: Option<u64>,
pub turns: Vec<TurnInfo>,
}

View File

@@ -0,0 +1,47 @@
use serde::{Deserialize, Serialize};
use chrono::{DateTime, Utc};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChatMessage {
pub id: String,
pub timestamp: DateTime<Utc>,
pub agent: AgentType,
pub content: String,
pub message_type: MessageType,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum AgentType {
Coach,
Player,
Single,
User,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "lowercase")]
pub enum MessageType {
Text,
ToolCall,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolCall {
pub id: String,
pub timestamp: DateTime<Utc>,
pub tool_name: String,
pub parameters: serde_json::Value,
pub result: Option<serde_json::Value>,
pub execution_time_ms: Option<u64>,
pub success: bool,
pub error: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LogEntry {
pub timestamp: DateTime<Utc>,
pub level: String,
pub message: String,
pub fields: serde_json::Value,
}

View File

@@ -0,0 +1,5 @@
pub mod instance;
pub mod message;
pub use instance::*;
pub use message::*;

View File

@@ -0,0 +1,270 @@
use anyhow::{anyhow, Context, Result};
use std::process::{Command, Stdio};
use std::os::unix::process::CommandExt;
use std::collections::HashMap;
use std::sync::Mutex;
use std::path::PathBuf;
use sysinfo::{Pid, Signal, System, Process};
use tracing::{debug, error, info};
use crate::models::LaunchParams;
pub struct ProcessController {
system: System,
launch_params: Mutex<HashMap<u32, LaunchParams>>,
}
impl ProcessController {
pub fn new() -> Self {
Self {
system: System::new_all(),
launch_params: Mutex::new(HashMap::new()),
}
}
pub fn kill_process(&mut self, pid: u32) -> Result<()> {
let sysinfo_pid = Pid::from_u32(pid);
self.system.refresh_processes();
if let Some(process) = self.system.process(sysinfo_pid) {
info!("Killing process {} ({})", pid, process.name());
// Try SIGTERM first
if process.kill_with(Signal::Term).is_some() {
debug!("Sent SIGTERM to process {}", pid);
// Wait a bit and check if it's still running
std::thread::sleep(std::time::Duration::from_secs(2));
self.system.refresh_processes();
if self.system.process(sysinfo_pid).is_some() {
// Still running, send SIGKILL
if let Some(proc) = self.system.process(sysinfo_pid) {
proc.kill_with(Signal::Kill);
debug!("Sent SIGKILL to process {}", pid);
}
}
Ok(())
} else {
Err(anyhow!("Failed to send signal to process {}", pid))
}
} else {
Err(anyhow!("Process {} not found", pid))
}
}
#[cfg(unix)]
pub fn launch_g3(
&mut self,
workspace: &str,
provider: &str,
model: &str,
prompt: &str,
autonomous: bool,
g3_binary_path: Option<&str>,
) -> Result<u32> {
let binary = g3_binary_path.unwrap_or("g3");
let mut cmd = Command::new(binary);
cmd.arg("--workspace")
.arg(workspace)
.arg("--provider")
.arg(provider)
.arg("--model")
.arg(model);
if autonomous {
cmd.arg("--autonomous");
}
cmd.arg(prompt);
// Run in background with proper detachment
cmd.stdout(Stdio::null())
.stderr(Stdio::null())
.stdin(Stdio::null());
// Double-fork technique to prevent zombie processes:
// 1. Fork once to create intermediate process
// 2. Intermediate process forks again and exits immediately
// 3. Grandchild is adopted by init (PID 1) which will reap it
unsafe {
cmd.pre_exec(|| {
// Fork again inside the child
match libc::fork() {
-1 => return Err(std::io::Error::last_os_error()),
0 => {
// Grandchild: create new session and continue
libc::setsid();
// Continue execution (this becomes the actual g3 process)
}
_ => {
// Child: exit immediately so parent can reap it
libc::_exit(0);
}
}
Ok(())
});
}
info!("Launching g3: {:?}", cmd);
// Spawn and wait for the intermediate process to exit
let mut child = cmd.spawn().context("Failed to spawn g3 process")?;
let intermediate_pid = child.id();
// Wait for intermediate process (it will exit immediately after forking)
child.wait().context("Failed to wait for intermediate process")?;
// The actual g3 process is now running as orphan
// We need to scan for it by matching workspace and recent start time
info!("Scanning for newly launched g3 process in workspace: {}", workspace);
// Wait a moment for the process to fully start
std::thread::sleep(std::time::Duration::from_millis(500));
// Refresh and scan for the process
self.system.refresh_processes();
let workspace_path = PathBuf::from(workspace);
let mut found_pid = None;
for (pid, process) in self.system.processes() {
let cmd = process.cmd();
let cmd_str = cmd.join(" ");
// Check if this is a g3 process
let is_g3 = process.name().contains("g3") || cmd_str.contains("g3");
if !is_g3 {
continue;
}
// Check if it has our workspace
let has_workspace = cmd.iter().any(|arg| {
if let Ok(path) = PathBuf::from(arg).canonicalize() {
if let Ok(ws) = workspace_path.canonicalize() {
return path == ws;
}
}
false
});
if has_workspace {
// Check if it's recent (started within last 5 seconds)
let now = std::time::SystemTime::now();
let start_time = std::time::UNIX_EPOCH + std::time::Duration::from_secs(process.start_time());
if let Ok(duration) = now.duration_since(start_time) {
if duration.as_secs() < 5 {
found_pid = Some(pid.as_u32());
break;
}
}
}
}
let pid = found_pid.unwrap_or(intermediate_pid);
info!("Launched g3 process with PID {}", pid);
// Store launch params for restart
let params = LaunchParams {
workspace: workspace.into(),
provider: provider.to_string(),
model: model.to_string(),
prompt: prompt.to_string(),
autonomous,
g3_binary_path: g3_binary_path.map(|s| s.to_string()),
};
if let Ok(mut map) = self.launch_params.lock() {
map.insert(pid, params);
}
Ok(pid)
}
pub fn get_launch_params(&mut self, pid: u32) -> Option<LaunchParams> {
// First check if we have stored params (for console-launched instances)
if let Ok(map) = self.launch_params.lock() {
if let Some(params) = map.get(&pid) {
return Some(params.clone());
}
}
// If not found, try to parse from process command line (for detected instances)
self.system.refresh_processes();
let sysinfo_pid = Pid::from_u32(pid);
if let Some(process) = self.system.process(sysinfo_pid) {
let cmd = process.cmd();
return self.parse_launch_params_from_cmd(cmd);
}
None
}
fn parse_launch_params_from_cmd(&self, cmd: &[String]) -> Option<LaunchParams> {
let mut workspace = None;
let mut provider = None;
let mut model = None;
let mut prompt = None;
let mut autonomous = false;
let mut g3_binary_path = None;
let mut i = 0;
while i < cmd.len() {
match cmd[i].as_str() {
"--workspace" | "-w" if i + 1 < cmd.len() => {
workspace = Some(PathBuf::from(&cmd[i + 1]));
i += 2;
}
"--provider" if i + 1 < cmd.len() => {
provider = Some(cmd[i + 1].clone());
i += 2;
}
"--model" if i + 1 < cmd.len() => {
model = Some(cmd[i + 1].clone());
i += 2;
}
"--autonomous" => {
autonomous = true;
i += 1;
}
_ => {
// Last non-flag argument is likely the prompt
if !cmd[i].starts_with('-') && i == cmd.len() - 1 {
prompt = Some(cmd[i].clone());
}
i += 1;
}
}
}
// Try to determine binary path from cmd[0]
if !cmd.is_empty() {
let first = &cmd[0];
if first.contains("g3") && !first.contains("cargo") {
g3_binary_path = Some(first.clone());
}
}
// Only return params if we have the minimum required fields
if let (Some(ws), Some(prov), Some(mdl), Some(prmt)) = (workspace, provider, model, prompt) {
Some(LaunchParams {
workspace: ws,
provider: prov,
model: mdl,
prompt: prmt,
autonomous,
g3_binary_path,
})
} else {
None
}
}
}
impl Default for ProcessController {
fn default() -> Self {
Self::new()
}
}

View File

@@ -0,0 +1,172 @@
use crate::models::{ExecutionMethod, Instance, InstanceStatus, InstanceType};
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use std::path::PathBuf;
use std::process::Command;
use sysinfo::{System, Process, Pid};
use tracing::{debug, warn};
pub struct ProcessDetector {
system: System,
}
impl ProcessDetector {
pub fn new() -> Self {
Self {
system: System::new_all(),
}
}
pub fn detect_instances(&mut self) -> Result<Vec<Instance>> {
self.system.refresh_processes();
let mut instances = Vec::new();
// Find all g3 processes
for (pid, process) in self.system.processes() {
let cmd = process.cmd();
if cmd.is_empty() {
continue;
}
// Check if this is a g3 process (binary or cargo run)
if let Some(instance) = self.parse_g3_process(*pid, process, cmd) {
instances.push(instance);
}
}
debug!("Detected {} g3 instances", instances.len());
Ok(instances)
}
fn parse_g3_process(
&self,
pid: Pid,
process: &Process,
cmd: &[String],
) -> Option<Instance> {
let cmd_str = cmd.join(" ");
// Check if this is a g3 binary
let is_g3_binary = cmd.get(0).map(|s| s.ends_with("g3")).unwrap_or(false);
// Check if this is cargo run with g3
let is_cargo_run = cmd.get(0).map(|s| s.contains("cargo")).unwrap_or(false)
&& cmd.iter().any(|s| s == "run" || s.contains("g3"));
if !is_g3_binary && !is_cargo_run {
return None;
}
// Extract workspace directory
let workspace = self.extract_workspace(pid, process, cmd)?;
// Determine execution method
let execution_method = if is_cargo_run {
ExecutionMethod::CargoRun
} else {
ExecutionMethod::Binary
};
// Determine instance type (ensemble if --autonomous flag present)
let instance_type = if cmd.iter().any(|s| s == "--autonomous") {
InstanceType::Ensemble
} else {
InstanceType::Single
};
// Extract provider and model
let provider = self.extract_flag_value(cmd, "--provider");
let model = self.extract_flag_value(cmd, "--model");
// Get start time
let start_time = DateTime::from_timestamp(process.start_time() as i64, 0)
.unwrap_or_else(Utc::now);
// Generate instance ID from PID and start time
let id = format!("{}_{}", pid, start_time.timestamp());
Some(Instance {
id,
pid: pid.as_u32(),
workspace,
start_time,
status: InstanceStatus::Running,
instance_type,
provider,
model,
execution_method,
command_line: cmd_str,
launch_params: None, // Not available for detected processes
})
}
fn extract_workspace(&self, pid: Pid, process: &Process, cmd: &[String]) -> Option<PathBuf> {
// Look for --workspace flag
for i in 0..cmd.len() {
if cmd[i] == "--workspace" && i + 1 < cmd.len() {
return Some(PathBuf::from(&cmd[i + 1]));
}
if cmd[i] == "-w" && i + 1 < cmd.len() {
return Some(PathBuf::from(&cmd[i + 1]));
}
}
// Fallback: Try to get the working directory of the process
#[cfg(target_os = "linux")]
{
// On Linux, read /proc/<pid>/cwd symlink
let cwd_path = format!("/proc/{}/cwd", pid.as_u32());
if let Ok(cwd) = std::fs::read_link(&cwd_path) {
debug!("Found workspace via /proc for PID {}: {:?}", pid, cwd);
return Some(cwd);
}
}
#[cfg(target_os = "macos")]
{
// On macOS, use lsof to get the current working directory
if let Ok(output) = std::process::Command::new("lsof")
.args(["-p", &pid.as_u32().to_string(), "-a", "-d", "cwd", "-Fn"])
.output()
{
if let Ok(stdout) = String::from_utf8(output.stdout) {
if let Some(line) = stdout.lines().find(|l| l.starts_with('n')) {
let cwd = PathBuf::from(&line[1..]);
debug!("Found workspace via lsof for PID {}: {:?}", pid, cwd);
return Some(cwd);
}
}
}
}
// Final fallback: use current directory of console
warn!("Could not determine workspace for PID {}, using current directory", pid);
std::env::current_dir().ok()
}
fn extract_flag_value(&self, cmd: &[String], flag: &str) -> Option<String> {
for i in 0..cmd.len() {
if cmd[i] == flag && i + 1 < cmd.len() {
return Some(cmd[i + 1].clone());
}
}
None
}
pub fn get_process_status(&mut self, pid: u32) -> Option<InstanceStatus> {
self.system.refresh_processes();
let sysinfo_pid = Pid::from_u32(pid);
if self.system.process(sysinfo_pid).is_some() {
Some(InstanceStatus::Running)
} else {
Some(InstanceStatus::Terminated)
}
}
}
impl Default for ProcessDetector {
fn default() -> Self {
Self::new()
}
}

View File

@@ -0,0 +1,5 @@
pub mod detector;
pub mod controller;
pub use detector::*;
pub use controller::*;