#!/usr/bin/env python3
"""
Universe Optimize Orchestrator
Manages the full pipeline: provision Azure VMs, fork repos, run Codeflash
optimizations via Claude Code, collect results, and draft outreach emails.
Usage:
python orchestrator.py provision <project-id> Provision VM, fork repo, bootstrap
python orchestrator.py run <project-id> Launch Claude Code on the VM
python orchestrator.py status [project-id] Dashboard or single project status
python orchestrator.py monitor [--interval=600] Poll all running VMs
python orchestrator.py collect <project-id> SCP results, verify push, destroy VM
python orchestrator.py results <project-id> Show detailed optimization results
python orchestrator.py email <project-id> Build context + render emails
python orchestrator.py email --rerender-all Re-render all emails after template edit
python orchestrator.py email <project-id> --show Preview rendered emails
python orchestrator.py email <project-id> --show-context Show raw context.json
python orchestrator.py mark-sent <project-id> Mark emails as sent
python orchestrator.py destroy <project-id> Destroy VM without collecting
"""
from __future__ import annotations
import argparse
import json
import os
import shlex
import sqlite3
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from textwrap import dedent
# ---------------------------------------------------------------------------
# Paths
# ---------------------------------------------------------------------------
BASE_DIR = Path(__file__).resolve().parent
PROJECTS_FILE = BASE_DIR / "projects.json"
OPTIMIZATION_MD = BASE_DIR / "optimization.md"
BOOTSTRAP_SH = BASE_DIR / "bootstrap.sh"
EMAIL_TEMPLATES_DIR = BASE_DIR / "email_templates"
RESULTS_DIR = BASE_DIR / "results"
DB_PATH = BASE_DIR / "universe_optimize.db"
CODEFLASH_AGENT_DIST = Path.home() / "Library/CloudStorage/Dropbox/hacks/codeflash-agent/dist"
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
AZURE_RESOURCE_GROUP = os.environ.get("UO_AZURE_RESOURCE_GROUP", "universe-optimize-rg")
AZURE_LOCATION = os.environ.get("UO_AZURE_LOCATION", "eastus")
AZURE_VM_SIZE = os.environ.get("UO_AZURE_VM_SIZE", "Standard_D4s_v5")
AZURE_VM_IMAGE = "Canonical:ubuntu-24_04-lts:server:latest"
AZURE_ADMIN_USER = "azureuser"
SSH_KEY_PATH = Path.home() / ".ssh" / "id_rsa"
GITHUB_PAT = os.environ.get(
"UO_GITHUB_PAT",
"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",
)
GITHUB_ORG = "codeflash-ai"
# Bedrock auth for Claude Code on VMs
AWS_BEARER_TOKEN_BEDROCK = os.environ.get(
"AWS_BEARER_TOKEN_BEDROCK",
"ABSKc2FydGhha0Bjb2RlZmxhc2guYWkrMS1hdC05OTIzODI0NjM5MDc6bS9tdGx4SW0wQ08yazMwU3QxZFdlbWRWeTM0NnJWZElBZmFNNmVobC9UU2tRVTBPQm4wUXVPS3ZFQWs9",
)
CODEFLASH_API_KEY = os.environ.get(
"CODEFLASH_API_KEY",
"cf-kNZFz7nM3Tl3wY4t5Kh1E58A-UhxKvLhokyTfFIz5YJ_xJJLcBBFrdp8kJF6G8ld",
)
CALENDLY_LINK = "https://calendly.com/codeflash-saurabh/30min"
# Max time (seconds) before we consider a VM stuck
VM_TIMEOUT_SECONDS = 5 * 3600 # 5 hours
# ===========================================================================
# Database
# ===========================================================================
def get_db() -> sqlite3.Connection:
db = sqlite3.connect(str(DB_PATH))
db.row_factory = sqlite3.Row
db.execute("""
CREATE TABLE IF NOT EXISTS projects (
id TEXT PRIMARY KEY,
repo TEXT NOT NULL,
language TEXT NOT NULL,
fork_url TEXT,
company TEXT,
contact_name TEXT,
contact_email TEXT,
contact_title TEXT,
status TEXT DEFAULT 'pending',
vm_name TEXT,
vm_ip TEXT,
created_at TEXT,
started_at TEXT,
completed_at TEXT,
optimization_branch TEXT,
num_optimizations INTEGER DEFAULT 0,
best_speedup TEXT,
summary_json TEXT,
email_draft_path TEXT
)
""")
db.commit()
return db
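# Each CLI invocation opens a fresh connection, so the CREATE TABLE above runs
# lazily and idempotently on first use; no separate migration step is needed.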
def load_projects() -> list[dict]:
with open(PROJECTS_FILE) as f:
return json.load(f)
def load_project(project_id: str) -> dict:
for p in load_projects():
if p["id"] == project_id:
return p
raise ValueError(f"Project {project_id} not found in projects.json")
def upsert_project(db: sqlite3.Connection, project: dict) -> None:
contact = project.get("target_contact", {})
db.execute("""
INSERT INTO projects (id, repo, language, company, contact_name, contact_email, contact_title, status, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, 'pending', ?)
ON CONFLICT(id) DO UPDATE SET
repo=excluded.repo, language=excluded.language, company=excluded.company,
contact_name=excluded.contact_name, contact_email=excluded.contact_email,
contact_title=excluded.contact_title
""", (
project["id"], project["repo"], project["language"],
project.get("company", ""), contact.get("name", ""),
contact.get("email", ""), contact.get("title", ""),
datetime.now(timezone.utc).isoformat(),
))
db.commit()
def update_status(db: sqlite3.Connection, project_id: str, status: str, **kwargs) -> None:
sets = ["status = ?"]
vals: list = [status]
for k, v in kwargs.items():
sets.append(f"{k} = ?")
vals.append(v)
vals.append(project_id)
db.execute(f"UPDATE projects SET {', '.join(sets)} WHERE id = ?", vals)
db.commit()
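# NOTE: update_status interpolates kwarg names directly into the SQL SET
# clause, so it must only ever be called with trusted, hardcoded column names
# (vm_ip=..., started_at=..., etc.), never with user-supplied keys.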
def get_project_row(db: sqlite3.Connection, project_id: str) -> sqlite3.Row | None:
return db.execute("SELECT * FROM projects WHERE id = ?", (project_id,)).fetchone()
# ===========================================================================
# SSH helpers
# ===========================================================================
def ssh_exec(ip: str, command: str, timeout: int = 120) -> str:
"""Execute a command on the VM via SSH. Returns stdout."""
import paramiko
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(ip, username=AZURE_ADMIN_USER, key_filename=str(SSH_KEY_PATH), timeout=30)
try:
_, stdout, stderr = client.exec_command(command, timeout=timeout)
out = stdout.read().decode()
err = stderr.read().decode()
if err:
print(f" [ssh stderr] {err.strip()}", file=sys.stderr)
return out
finally:
client.close()
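# ssh_exec ignores the remote exit status; callers that need a pass/fail
# signal encode it in the command itself, e.g.
#   ssh_exec(ip, "test -f ~/results/summary.json && echo yes || echo no")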
def scp_upload(ip: str, local_path: str | Path, remote_path: str) -> None:
"""Upload a file or directory to the VM."""
import paramiko
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(ip, username=AZURE_ADMIN_USER, key_filename=str(SSH_KEY_PATH), timeout=30)
try:
sftp = client.open_sftp()
local_path = Path(local_path)
if local_path.is_dir():
_sftp_put_dir(sftp, local_path, remote_path)
else:
sftp.put(str(local_path), remote_path)
sftp.close()
finally:
client.close()
def _sftp_mkdir_p(sftp, remote_dir: str) -> None:
"""Recursively create remote directories (like mkdir -p)."""
dirs_to_create = []
current = remote_dir
while current and current != "/":
try:
sftp.stat(current)
break # exists
except FileNotFoundError:
dirs_to_create.append(current)
current = os.path.dirname(current)
for d in reversed(dirs_to_create):
sftp.mkdir(d)
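# paramiko's SFTPClient.stat raises OSError(ENOENT) for missing paths, which
# Python 3 surfaces as FileNotFoundError, so the except clause in
# _sftp_mkdir_p is the "directory does not exist yet" branch.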
def _sftp_put_dir(sftp, local_dir: Path, remote_dir: str) -> None:
"""Recursively upload a directory."""
_sftp_mkdir_p(sftp, remote_dir)
for item in sorted(local_dir.iterdir()):
remote_item = f"{remote_dir}/{item.name}"
if item.is_dir():
_sftp_put_dir(sftp, item, remote_item)
else:
sftp.put(str(item), remote_item)
def scp_download(ip: str, remote_path: str, local_path: str | Path) -> bool:
"""Download a file from the VM. Returns True if successful."""
import paramiko
local_path = Path(local_path)
local_path.parent.mkdir(parents=True, exist_ok=True)
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
client.connect(ip, username=AZURE_ADMIN_USER, key_filename=str(SSH_KEY_PATH), timeout=30)
try:
sftp = client.open_sftp()
sftp.get(remote_path, str(local_path))
sftp.close()
return True
except FileNotFoundError:
print(f" [scp] Remote file not found: {remote_path}")
return False
finally:
client.close()
# ===========================================================================
# Azure VM management (via az CLI)
# ===========================================================================
def az(cmd: str, parse_json: bool = True) -> dict | list | str:
"""Run an az CLI command. Returns parsed JSON or raw stdout."""
full_cmd = f"az {cmd}"
if parse_json and "--output" not in cmd:
full_cmd += " --output json"
result = subprocess.run(
full_cmd, shell=True, capture_output=True, text=True, timeout=600,
)
if result.returncode != 0:
raise RuntimeError(f"az command failed:\n cmd: {full_cmd}\n stderr: {result.stderr.strip()}")
if parse_json:
return json.loads(result.stdout) if result.stdout.strip() else {}
return result.stdout
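# Usage sketch (hypothetical resource names):
#   vms = az("vm list --resource-group universe-optimize-rg")   # parsed JSON
#   az("vm delete --resource-group universe-optimize-rg --name uo-foo --yes",
#      parse_json=False)                                        # raw stdout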
def provision_vm(project_id: str, repo_name: str) -> tuple[str, str]:
"""Provision an Azure VM using az CLI. Returns (vm_name, public_ip)."""
vm_name = f"uo-{project_id}"
# Ensure resource group exists
print(f" Ensuring resource group '{AZURE_RESOURCE_GROUP}'...")
az(f"group create --name {AZURE_RESOURCE_GROUP} --location {AZURE_LOCATION}")
# Create VM (az vm create handles vnet, subnet, nsg, nic, ip, disk automatically)
ssh_pub_key_path = SSH_KEY_PATH.with_suffix(".pub")
print(f" Creating VM '{vm_name}' ({AZURE_VM_SIZE})...")
vm_info = az(
f"vm create"
f" --resource-group {AZURE_RESOURCE_GROUP}"
f" --name {vm_name}"
f" --image {AZURE_VM_IMAGE}"
f" --size {AZURE_VM_SIZE}"
f" --admin-username {AZURE_ADMIN_USER}"
f" --ssh-key-values {shlex.quote(str(ssh_pub_key_path))}"
f" --os-disk-size-gb 128"
f" --storage-sku Premium_LRS"
f" --public-ip-sku Standard"
f" --tags project=universe-optimize project_id={project_id} repo={repo_name}"
)
public_ip = vm_info.get("publicIpAddress", "")
if not public_ip:
# Fallback: query the IP explicitly
ip_info = az(
f"vm list-ip-addresses"
f" --resource-group {AZURE_RESOURCE_GROUP}"
f" --name {vm_name}"
)
public_ip = ip_info[0]["virtualMachine"]["network"]["publicIpAddresses"][0]["ipAddress"]
print(f" VM '{vm_name}' provisioned at {public_ip}")
return vm_name, public_ip
def destroy_vm(project_id: str) -> None:
"""Destroy the VM and all associated resources."""
vm_name = f"uo-{project_id}"
print(f" Deleting VM '{vm_name}' and all associated resources...")
try:
        # az vm delete removes the VM itself; depending on the delete options
        # set at create time, the NIC, public IP, and OS disk can be left
        # behind, so leftover resources are swept explicitly below.
az(
f"vm delete"
f" --resource-group {AZURE_RESOURCE_GROUP}"
f" --name {vm_name}"
f" --yes"
f" --force-deletion true",
parse_json=False,
)
except RuntimeError as e:
print(f" Warning: VM delete may have partially failed: {e}")
# Clean up any remaining resources that az vm delete may leave behind
# (VNet and NSG are sometimes shared/retained)
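    # The {vm_name}NSG / {vm_name}VNET / {vm_name}VMNic / {vm_name}PublicIP
    # names below match the defaults `az vm create` assigns to implicitly
    # created resources.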
for cmd in [
f"network nsg delete --resource-group {AZURE_RESOURCE_GROUP} --name {vm_name}NSG --yes",
f"network vnet delete --resource-group {AZURE_RESOURCE_GROUP} --name {vm_name}VNET --yes",
f"network nic delete --resource-group {AZURE_RESOURCE_GROUP} --name {vm_name}VMNic",
f"network public-ip delete --resource-group {AZURE_RESOURCE_GROUP} --name {vm_name}PublicIP",
]:
try:
az(cmd, parse_json=False)
except RuntimeError:
pass # Resource may not exist or was already cleaned up
print(f" VM '{vm_name}' destroyed.")
# ===========================================================================
# GitHub
# ===========================================================================
def fork_repo(repo: str) -> str:
"""Fork a repo to the codeflash-ai org. Returns the fork URL.
Uses gh CLI which respects the logged-in user's auth and org permissions.
Falls back to PyGithub if gh is not available.
"""
repo_name = repo.split("/")[1]
fork_url = f"https://github.com/{GITHUB_ORG}/{repo_name}"
# Check if fork already exists
result = subprocess.run(
f"gh repo view {GITHUB_ORG}/{repo_name} --json url -q .url",
shell=True, capture_output=True, text=True,
)
if result.returncode == 0 and result.stdout.strip():
print(f" Fork already exists: {result.stdout.strip()}")
return result.stdout.strip()
# Create the fork via gh CLI
result = subprocess.run(
f"gh repo fork {repo} --org {GITHUB_ORG} --clone=false",
shell=True, capture_output=True, text=True,
)
if result.returncode == 0:
print(f" Forked {repo} -> {fork_url}")
time.sleep(5) # Wait for fork to be ready
return fork_url
# Fallback to PyGithub
print(f" gh fork failed ({result.stderr.strip()}), trying PyGithub...")
from github import Auth, Github
g = Github(auth=Auth.Token(GITHUB_PAT))
source = g.get_repo(repo)
org = g.get_organization(GITHUB_ORG)
fork = org.create_fork(source)
print(f" Forked {repo} -> {fork.html_url}")
time.sleep(5)
return fork.html_url
# ===========================================================================
# Bootstrap
# ===========================================================================
def bootstrap_vm(ip: str, project: dict) -> None:
"""Upload files and run bootstrap script on the VM."""
repo_name = project["repo"].split("/")[1]
print(" Uploading optimization.md...")
scp_upload(ip, OPTIMIZATION_MD, "/home/azureuser/optimization.md")
print(" Uploading codeflash-agent plugin...")
scp_upload(ip, CODEFLASH_AGENT_DIST, "/home/azureuser/codeflash-agent/dist")
print(" Uploading bootstrap.sh...")
# Read bootstrap, substitute placeholders
bootstrap_content = BOOTSTRAP_SH.read_text()
bootstrap_content = bootstrap_content.replace("__REPO_NAME__", repo_name)
bootstrap_content = bootstrap_content.replace("__GITHUB_PAT__", GITHUB_PAT)
bootstrap_content = bootstrap_content.replace("__AWS_BEARER_TOKEN_BEDROCK__", AWS_BEARER_TOKEN_BEDROCK)
bootstrap_content = bootstrap_content.replace("__CODEFLASH_API_KEY__", CODEFLASH_API_KEY)
# Write temp file, upload, execute
tmp_bootstrap = BASE_DIR / ".tmp_bootstrap.sh"
tmp_bootstrap.write_text(bootstrap_content)
scp_upload(ip, tmp_bootstrap, "/home/azureuser/bootstrap.sh")
tmp_bootstrap.unlink()
print(" Running bootstrap (this takes a few minutes)...")
output = ssh_exec(ip, "chmod +x ~/bootstrap.sh && ~/bootstrap.sh", timeout=600)
print(output)
# ===========================================================================
# Run Claude Code
# ===========================================================================
def launch_claude(ip: str) -> None:
"""Launch Claude Code on the VM in the background."""
home = f"/home/{AZURE_ADMIN_USER}"
cmd = dedent(f"""\
nohup bash -c '
source {home}/.env_universe 2>/dev/null
cd {home}/project && claude \\
--dangerously-skip-permissions \\
--plugin-dir {home}/codeflash-agent/dist \\
--model opus \\
--max-turns 400 \\
--print \\
-p "Read the CLAUDE.md file and follow its instructions exactly." \\
< /dev/null \\
2>&1 | tee {home}/results/claude_output.log
' > /dev/null 2>&1 &
""")
ssh_exec(ip, cmd, timeout=30)
print(" Claude Code launched in background.")
def check_vm_status(ip: str) -> dict:
"""Check if Claude is still running and if results are ready."""
try:
is_running = ssh_exec(ip, "pgrep -f 'claude' > /dev/null 2>&1 && echo running || echo done", timeout=15)
has_results = ssh_exec(ip, "test -f ~/results/summary.json && echo yes || echo no", timeout=15)
return {
"reachable": True,
"is_running": is_running.strip() == "running",
"has_results": has_results.strip() == "yes",
}
except Exception as e:
return {"reachable": False, "is_running": False, "has_results": False, "error": str(e)}
# ===========================================================================
# Results collection
# ===========================================================================
def collect_results(project_id: str, ip: str) -> dict | None:
"""Download results from the VM. Returns parsed summary or None."""
home = f"/home/{AZURE_ADMIN_USER}"
result_dir = RESULTS_DIR / project_id
result_dir.mkdir(parents=True, exist_ok=True)
# Try both ~/results/ (written by optimization.md) and ~/project/.codeflash/ (written by agent)
files = [
(f"{home}/results/summary.json", result_dir / "summary.json"),
(f"{home}/results/results.tsv", result_dir / "results.tsv"),
(f"{home}/results/HANDOFF.md", result_dir / "HANDOFF.md"),
(f"{home}/results/claude_output.log", result_dir / "claude_output.log"),
(f"{home}/project/.codeflash/results.tsv", result_dir / "results.tsv"),
(f"{home}/project/.codeflash/HANDOFF.md", result_dir / "HANDOFF.md"),
]
for remote, local in files:
scp_download(ip, remote, local)
summary_path = result_dir / "summary.json"
if summary_path.exists() and summary_path.stat().st_size > 0:
with open(summary_path) as f:
return json.load(f)
return None
def verify_and_push(ip: str) -> bool:
"""Verify the optimization branch was pushed; push if not."""
home = f"/home/{AZURE_ADMIN_USER}"
    probe = f"cd {home}/project && git log --oneline origin/codeflash/optimize -1 2>/dev/null || echo NOT_PUSHED"
    check = ssh_exec(ip, probe, timeout=30)
    if "NOT_PUSHED" in check:
        print(" Branch not pushed yet, pushing now...")
        result = ssh_exec(ip, f"cd {home}/project && git push origin codeflash/optimize 2>&1", timeout=60)
        print(f" {result.strip()}")
        # Re-probe the remote branch: the push output itself never contains
        # NOT_PUSHED, so checking it would always have reported success.
        check = ssh_exec(ip, probe, timeout=30)
        return "NOT_PUSHED" not in check
    print(f" Branch already pushed: {check.strip()}")
    return True
# ===========================================================================
# Email system
# ===========================================================================
def build_summary_sentence(optimizations: list[dict]) -> str:
"""Build a human-readable summary of optimizations."""
if not optimizations:
return ""
parts = []
for opt in optimizations[:3]: # top 3
func = opt.get("function", "unknown")
speedup = opt.get("cpu_speedup", "")
technique = opt.get("technique", "")
if speedup:
parts.append(f"{speedup} {func}")
elif technique:
parts.append(f"{func} ({technique})")
else:
parts.append(func)
count = len(optimizations)
if count <= 3:
return f"{count} merge-ready commits including " + ", ".join(parts)
return f"{count} merge-ready commits including " + ", ".join(parts) + f", and {count - 3} more"
def build_context(project_id: str) -> dict | None:
"""Build context.json from summary.json + projects.json."""
project = load_project(project_id)
summary_path = RESULTS_DIR / project_id / "summary.json"
if not summary_path.exists():
print(f" No summary.json found for {project_id}")
return None
with open(summary_path) as f:
summary = json.load(f)
if summary.get("status") == "failed" or summary.get("total_keeps", 0) == 0:
print(f" Skipping {project_id}: status={summary.get('status')}, keeps={summary.get('total_keeps', 0)}")
return None
best = summary.get("headline_stats", {})
opts = summary.get("optimizations", [])
repo_name = project["repo"].split("/")[1]
    contact = project.get("target_contact", {})
    context = {
        "first_name": (contact.get("name", "").split() or [""])[0],
        "full_name": contact.get("name", ""),
        "title": contact.get("title", ""),
"company_name": project.get("company", ""),
"repo": project["repo"],
"repo_name": repo_name,
"fork_url": f"https://github.com/{GITHUB_ORG}/{repo_name}",
"branch_url": f"https://github.com/{GITHUB_ORG}/{repo_name}/tree/codeflash/optimize",
"num_optimizations": summary.get("total_keeps", 0),
"best_function": best.get("best_function", ""),
"best_speedup": best.get("best_single_speedup", ""),
"best_description": opts[0]["description"] if opts else "",
"second_best_function": opts[1]["function"] if len(opts) > 1 else "",
"second_best_technique": opts[1].get("technique", "") if len(opts) > 1 else "",
"total_cpu_improvement_pct": best.get("total_cpu_improvement_pct", 0),
"total_memory_saved_mb": best.get("total_memory_saved_mb", 0),
"optimizations_summary": build_summary_sentence(opts),
"calendly_link": CALENDLY_LINK,
}
context_path = RESULTS_DIR / project_id / "context.json"
with open(context_path, "w") as f:
json.dump(context, f, indent=2)
print(f" Built context.json for {project_id}")
return context
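# Assumed summary.json shape (written on the VM per optimization.md):
#   {
#     "status": ..., "total_keeps": N, "total_experiments": N,
#     "headline_stats": {"best_function": ..., "best_single_speedup": ...,
#                        "total_cpu_improvement_pct": ...,
#                        "total_memory_saved_mb": ...},
#     "optimizations": [{"function": ..., "cpu_speedup": ...,
#                        "technique": ..., "description": ...}, ...]
#   }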
def render_emails(project_id: str) -> None:
"""Render all email templates for a project using its context.json."""
context_path = RESULTS_DIR / project_id / "context.json"
if not context_path.exists():
print(f" No context.json for {project_id}. Run 'email {project_id}' first.")
return
with open(context_path) as f:
context = json.load(f)
emails_dir = RESULTS_DIR / project_id / "emails"
emails_dir.mkdir(parents=True, exist_ok=True)
templates = sorted(EMAIL_TEMPLATES_DIR.glob("email_*.md"))
if not templates:
print(" No email templates found in email_templates/")
return
for template_path in templates:
template = template_path.read_text()
try:
rendered = template.format(**context)
except KeyError as e:
print(f" Warning: missing variable {e} in template {template_path.name}")
continue
out_path = emails_dir / template_path.name
out_path.write_text(rendered)
print(f" Rendered {out_path.name}")
def render_all_emails() -> None:
"""Re-render emails for all projects that have context.json."""
if not RESULTS_DIR.exists():
print("No results directory found.")
return
for project_dir in sorted(RESULTS_DIR.iterdir()):
if not project_dir.is_dir():
continue
context_path = project_dir / "context.json"
if context_path.exists():
print(f"Re-rendering {project_dir.name}...")
render_emails(project_dir.name)
def show_emails(project_id: str) -> None:
"""Print rendered emails to stdout."""
emails_dir = RESULTS_DIR / project_id / "emails"
if not emails_dir.exists():
print(f"No rendered emails for {project_id}. Run 'email {project_id}' first.")
return
for email_path in sorted(emails_dir.glob("email_*.md")):
print(f"\n{'=' * 60}")
print(f" {email_path.name}")
print(f"{'=' * 60}")
print(email_path.read_text())
def show_context(project_id: str) -> None:
"""Print context.json to stdout."""
context_path = RESULTS_DIR / project_id / "context.json"
if not context_path.exists():
print(f"No context.json for {project_id}.")
return
with open(context_path) as f:
print(json.dumps(json.load(f), indent=2))
# ===========================================================================
# Commands
# ===========================================================================
def cmd_provision(args: argparse.Namespace) -> None:
project_id = args.project_id
project = load_project(project_id)
db = get_db()
upsert_project(db, project)
print(f"\n[provision] {project_id}: {project['repo']}")
# Step 1: Fork
print("\n Step 1: Forking repo...")
fork_url = fork_repo(project["repo"])
update_status(db, project_id, "provisioning", fork_url=fork_url)
# Step 2: Provision VM
print("\n Step 2: Provisioning Azure VM...")
vm_name, vm_ip = provision_vm(project_id, project["repo"].split("/")[1])
update_status(db, project_id, "provisioning", vm_name=vm_name, vm_ip=vm_ip)
# Wait for VM to be SSH-ready
print("\n Waiting for SSH to become available...")
for attempt in range(30):
try:
ssh_exec(vm_ip, "echo ready", timeout=10)
break
except Exception:
time.sleep(10)
else:
print(" ERROR: VM not reachable via SSH after 5 minutes")
update_status(db, project_id, "failed")
return
# Step 3: Bootstrap
print("\n Step 3: Bootstrapping VM...")
bootstrap_vm(vm_ip, project)
update_status(db, project_id, "provisioned")
print(f"\n[provision] Done. VM ready at {vm_ip}. Run: python orchestrator.py run {project_id}")
def cmd_run(args: argparse.Namespace) -> None:
project_id = args.project_id
db = get_db()
row = get_project_row(db, project_id)
if not row:
print(f"Project {project_id} not in DB. Run 'provision' first.")
return
if not row["vm_ip"]:
print(f"No VM IP for {project_id}. Run 'provision' first.")
return
print(f"\n[run] Launching Claude Code on {row['vm_ip']}...")
launch_claude(row["vm_ip"])
update_status(db, project_id, "running", started_at=datetime.now(timezone.utc).isoformat())
print(f"[run] Done. Monitor with: python orchestrator.py status {project_id}")
def cmd_status(args: argparse.Namespace) -> None:
db = get_db()
if args.project_id:
# Single project status
row = get_project_row(db, args.project_id)
if not row:
print(f"Project {args.project_id} not found in DB.")
return
print(f"\nProject: {row['id']}")
print(f"Repo: {row['repo']}")
print(f"Status: {row['status']}")
print(f"VM: {row['vm_name'] or '-'} ({row['vm_ip'] or '-'})")
print(f"Fork: {row['fork_url'] or '-'}")
print(f"Started: {row['started_at'] or '-'}")
if row["vm_ip"] and row["status"] == "running":
vm_status = check_vm_status(row["vm_ip"])
print(f"Claude: {'running' if vm_status['is_running'] else 'stopped'}")
print(f"Results: {'ready' if vm_status['has_results'] else 'not yet'}")
if row["num_optimizations"]:
print(f"Optimizations: {row['num_optimizations']}")
print(f"Best speedup: {row['best_speedup'] or '-'}")
else:
# Dashboard
rows = db.execute("SELECT * FROM projects ORDER BY id").fetchall()
if not rows:
print("No projects in DB. Add to projects.json and run 'provision'.")
return
print(f"\n{'ID':<15} {'Repo':<30} {'Status':<14} {'Opts':>5} {'Best':<12} {'Email'}")
print("-" * 90)
for row in rows:
email_status = "-"
email_dir = RESULTS_DIR / row["id"] / "emails"
if email_dir.exists() and list(email_dir.glob("email_*.md")):
email_status = "DRAFT READY"
print(f"{row['id']:<15} {row['repo']:<30} {row['status']:<14} "
f"{row['num_optimizations'] or 0:>5} {row['best_speedup'] or '-':<12} {email_status}")
def cmd_monitor(args: argparse.Namespace) -> None:
interval = args.interval
db = get_db()
print(f"[monitor] Polling running VMs every {interval}s. Ctrl-C to stop.\n")
while True:
rows = db.execute("SELECT * FROM projects WHERE status = 'running'").fetchall()
if not rows:
print(" No running projects.")
break
for row in rows:
ip = row["vm_ip"]
pid = row["id"]
status = check_vm_status(ip)
elapsed = ""
if row["started_at"]:
started = datetime.fromisoformat(row["started_at"])
elapsed_s = (datetime.now(timezone.utc) - started).total_seconds()
elapsed = f" ({int(elapsed_s / 60)}m elapsed)"
if elapsed_s > VM_TIMEOUT_SECONDS and not status["has_results"]:
print(f" {pid}: TIMEOUT after {int(elapsed_s / 3600)}h. Killing claude...")
ssh_exec(ip, "pkill -f claude || true", timeout=15)
time.sleep(60)
status = check_vm_status(ip)
if status["has_results"] and not status["is_running"]:
print(f" {pid}: COMPLETED{elapsed}")
update_status(db, pid, "completed", completed_at=datetime.now(timezone.utc).isoformat())
elif status["is_running"]:
print(f" {pid}: running{elapsed}")
elif not status["reachable"]:
print(f" {pid}: UNREACHABLE - {status.get('error', '')}")
else:
print(f" {pid}: claude stopped, no results{elapsed}")
print()
time.sleep(interval)
def cmd_collect(args: argparse.Namespace) -> None:
project_id = args.project_id
db = get_db()
row = get_project_row(db, project_id)
if not row or not row["vm_ip"]:
print(f"No VM found for {project_id}.")
return
ip = row["vm_ip"]
print(f"\n[collect] {project_id} from {ip}")
# Step 1: Download results
print("\n Step 1: Downloading results...")
summary = collect_results(project_id, ip)
# Step 2: Verify branch pushed
print("\n Step 2: Verifying branch push...")
verify_and_push(ip)
# Step 3: Update DB
if summary:
update_status(
db, project_id, "completed",
completed_at=datetime.now(timezone.utc).isoformat(),
num_optimizations=summary.get("total_keeps", 0),
best_speedup=summary.get("headline_stats", {}).get("best_single_speedup", ""),
optimization_branch="codeflash/optimize",
summary_json=json.dumps(summary),
)
print(f"\n Results: {summary.get('total_keeps', 0)} optimizations kept")
else:
update_status(db, project_id, "completed",
completed_at=datetime.now(timezone.utc).isoformat())
print("\n Warning: No summary.json found. Marked as completed with no results.")
# Step 4: Destroy VM
if not args.keep_vm:
print("\n Step 3: Destroying VM...")
destroy_vm(project_id)
update_status(db, project_id, "destroyed")
else:
print("\n Keeping VM alive (--keep-vm flag).")
print(f"\n[collect] Done. Results in results/{project_id}/")
print(f" Next: python orchestrator.py email {project_id}")
def cmd_results(args: argparse.Namespace) -> None:
project_id = args.project_id
summary_path = RESULTS_DIR / project_id / "summary.json"
if not summary_path.exists():
print(f"No results for {project_id}. Run 'collect' first.")
return
with open(summary_path) as f:
summary = json.load(f)
project = load_project(project_id)
repo_name = project["repo"].split("/")[1]
contact = project.get("target_contact", {})
print(f"\nProject: {project['repo']}")
print(f"Status: {summary.get('status', 'unknown')}")
print(f"Branch: https://github.com/{GITHUB_ORG}/{repo_name}/tree/codeflash/optimize")
print(f"Experiments: {summary.get('total_experiments', 0)} total "
f"({summary.get('total_keeps', 0)} kept, {summary.get('total_discards', 0)} discarded)")
opts = summary.get("optimizations", [])
if opts:
print(f"\nTop optimizations:")
for i, opt in enumerate(opts[:10], 1):
speedup = opt.get("cpu_speedup", "")
desc = opt.get("technique", opt.get("description", ""))
print(f" {i}. {opt.get('function', '?'):<25} {speedup:<15} {desc}")
hs = summary.get("headline_stats", {})
cpu_pct = hs.get("total_cpu_improvement_pct", 0)
mem_mb = hs.get("total_memory_saved_mb", 0)
if cpu_pct or mem_mb:
print(f"\nHeadline: {cpu_pct}% total CPU improvement, {mem_mb} MiB memory saved")
if contact.get("name"):
print(f"\nContact: {contact['name']} ({contact.get('title', '')}) -- {contact.get('email', '')}")
print(f"Emails: results/{project_id}/emails/")
def cmd_email(args: argparse.Namespace) -> None:
if args.rerender_all:
print("[email] Re-rendering all project emails...")
render_all_emails()
return
project_id = args.project_id
if not project_id:
print("Usage: email <project-id> or email --rerender-all")
return
if args.show_context:
show_context(project_id)
return
if args.show:
show_emails(project_id)
return
# Default: build context + render
print(f"[email] Building context and rendering emails for {project_id}...")
ctx = build_context(project_id)
if ctx:
render_emails(project_id)
print(f"\n[email] Done. Preview with: python orchestrator.py email {project_id} --show")
else:
print("[email] Could not build context (no results or all failed).")
def cmd_mark_sent(args: argparse.Namespace) -> None:
db = get_db()
update_status(db, args.project_id, "email_sent")
print(f"Marked {args.project_id} as email_sent.")
def cmd_destroy(args: argparse.Namespace) -> None:
project_id = args.project_id
db = get_db()
print(f"[destroy] Destroying VM for {project_id}...")
destroy_vm(project_id)
update_status(db, project_id, "destroyed")
print("[destroy] Done.")
# ===========================================================================
# CLI
# ===========================================================================
def main() -> None:
parser = argparse.ArgumentParser(description="Universe Optimize Orchestrator")
sub = parser.add_subparsers(dest="command")
p_provision = sub.add_parser("provision", help="Provision VM, fork repo, bootstrap")
p_provision.add_argument("project_id")
p_run = sub.add_parser("run", help="Launch Claude Code on the VM")
p_run.add_argument("project_id")
p_status = sub.add_parser("status", help="Show status dashboard or single project")
p_status.add_argument("project_id", nargs="?")
p_monitor = sub.add_parser("monitor", help="Poll running VMs periodically")
p_monitor.add_argument("--interval", type=int, default=600, help="Poll interval in seconds")
p_collect = sub.add_parser("collect", help="Collect results and destroy VM")
p_collect.add_argument("project_id")
p_collect.add_argument("--keep-vm", action="store_true", help="Don't destroy VM after collecting")
p_results = sub.add_parser("results", help="Show detailed optimization results")
p_results.add_argument("project_id")
p_email = sub.add_parser("email", help="Build context and render email drafts")
p_email.add_argument("project_id", nargs="?")
p_email.add_argument("--rerender-all", action="store_true", help="Re-render all project emails")
p_email.add_argument("--show", action="store_true", help="Preview rendered emails")
p_email.add_argument("--show-context", action="store_true", help="Show raw context.json")
p_sent = sub.add_parser("mark-sent", help="Mark project emails as sent")
p_sent.add_argument("project_id")
p_destroy = sub.add_parser("destroy", help="Destroy VM without collecting results")
p_destroy.add_argument("project_id")
args = parser.parse_args()
if not args.command:
parser.print_help()
return
commands = {
"provision": cmd_provision,
"run": cmd_run,
"status": cmd_status,
"monitor": cmd_monitor,
"collect": cmd_collect,
"results": cmd_results,
"email": cmd_email,
"mark-sent": cmd_mark_sent,
"destroy": cmd_destroy,
}
commands[args.command](args)
if __name__ == "__main__":
main()