codeflash-internal/experiments/optimization-factory/server/app.py
"""
Flask web server for Optimizer Factory - EC2-based Codeflash Management
Provides REST API and web UI for managing repositories and EC2-backed optimization jobs
This server manages the complete lifecycle of code optimization jobs:
1. Repository management (add/edit/delete repos from CSV)
2. Analysis submission and result retrieval
3. EC2 instance lifecycle (launch, configure, monitor, terminate)
4. Job execution and monitoring (logs, status, optimization results)
5. Web UI for managing the entire workflow
"""
# Standard library imports for core functionality
import os
import re
import json
import csv
import logging
import time
import subprocess
import tempfile
from pathlib import Path
from typing import List, Dict, Any, Optional
# Flask framework for web API and UI
from flask import Flask, jsonify, request, send_from_directory, Response, stream_with_context
# Environment variable management
from dotenv import load_dotenv
# Analysis module for repository analysis functionality
from .analyzer import submit_analysis, job_status as analysis_status, job_result as analysis_result, load_analysis_for_repo
# AWS SDK for EC2 instance management
import boto3
# SSH client for remote instance operations
import paramiko
# =============================================================================
# LOGGING CONFIGURATION
# =============================================================================
# Configure structured logging with timestamps and service identification
# This helps track request flows, performance, and debugging across the application
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
# Create logger for this module - used throughout the application
logger = logging.getLogger(__name__)
def log_request_start(endpoint: str, **kwargs) -> None:
"""
Log the start of an API request with parameters
Used to track incoming requests and their parameters for debugging
"""
params = ', '.join(f"{k}={v}" for k, v in kwargs.items() if v is not None)
logger.info(f"🔄 [{endpoint}] Request started - {params}")
def log_request_success(endpoint: str, duration_ms: int, **kwargs) -> None:
"""
Log successful API request completion with timing information
Helps monitor performance and successful operations
"""
details = ', '.join(f"{k}={v}" for k, v in kwargs.items() if v is not None)
logger.info(f"✅ [{endpoint}] Request completed successfully in {duration_ms}ms - {details}")
def log_request_error(endpoint: str, error: str, duration_ms: int, **kwargs) -> None:
"""
Log failed API request with error details and timing
Critical for debugging failed operations and understanding error patterns
"""
details = ', '.join(f"{k}={v}" for k, v in kwargs.items() if v is not None)
logger.error(f"❌ [{endpoint}] Request failed in {duration_ms}ms - Error: {error} - {details}")
def log_service_operation(service: str, operation: str, **kwargs) -> None:
"""
Log service operations (AWS, file operations, SSH, etc.)
Tracks detailed operations across different services for comprehensive monitoring
"""
details = ', '.join(f"{k}={v}" for k, v in kwargs.items() if v is not None)
logger.info(f"🔧 [{service}] {operation} - {details}")
# =============================================================================
# CONFIGURATION AND PATHS
# =============================================================================
# Define core application paths for data persistence and configuration
# Base directory is the project root (parent of server/) - where all config files live
BASE_DIR = Path(__file__).resolve().parent.parent
# CSV file containing repository configurations - stores repo URLs, module roots, test roots, resource tiers
CONFIG_CSV = BASE_DIR / "config" / "repos.csv"
# Server data directory for storing job tracking - where runtime data is persisted
DATA_DIR = Path(__file__).resolve().parent
# JSON file mapping repository URLs to their latest job IDs - tracks which EC2 instance handles each repo
JOBS_JSON = DATA_DIR / "jobs.json"
# Local archive directory for saving job logs before shutdown
LOGS_ARCHIVE_DIR = DATA_DIR / "logs"
# Load environment variables from .env file if present
# This makes CODEFLASH_API_KEY, GITHUB_TOKEN, AWS credentials, etc. available
try:
load_dotenv()
except Exception:
pass
# AWS EC2 configuration from environment variables
# These control which AWS region, instance type, AMI, and security settings to use
AWS_REGION = os.getenv("AWS_REGION", "us-east-1") # AWS region for EC2 instances
AWS_KEY_NAME = os.getenv("AWS_KEY_NAME", "").strip() # EC2 key pair name for SSH access
AWS_SECURITY_GROUP = os.getenv("AWS_SECURITY_GROUP", "").strip() # Security group for EC2 instances
AWS_INSTANCE_TYPE = os.getenv("AWS_INSTANCE_TYPE", "c7i.xlarge").strip() # EC2 instance size
AWS_AMI_ID = os.getenv("AWS_AMI_ID", "").strip() # AMI ID for EC2 instances (Ubuntu with pre-installed tools)
SSH_KEY_PATH = os.getenv("SSH_KEY_PATH", "").strip() # Local path to SSH private key for EC2 access
# Log successful configuration loading for debugging
logger.info(
f"🚀 Server configuration loaded - AWS_REGION={AWS_REGION}, INSTANCE_TYPE={AWS_INSTANCE_TYPE}"
)
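# For reference, a minimal .env consumed by the os.getenv() calls above might look
# like the following (all values are illustrative placeholders, not real resources):
#
#   AWS_REGION=us-east-1
#   AWS_KEY_NAME=codeflash-opt-key
#   AWS_SECURITY_GROUP=sg-0123456789abcdef0
#   AWS_INSTANCE_TYPE=c7i.xlarge
#   AWS_AMI_ID=ami-0123456789abcdef0
#   SSH_KEY_PATH=~/.ssh/codeflash-opt-key.pem
#   CODEFLASH_API_KEY=<codeflash api key>
#   GITHUB_TOKEN=<github token>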
# =============================================================================
# UTILITY FUNCTIONS
# =============================================================================
def _ensure_files() -> None:
"""
Ensures required directories and files exist for application startup
Creates the jobs.json file if it doesn't exist to track repository-to-instance mappings
"""
logger.info("📁 Ensuring required directories and files exist")
DATA_DIR.mkdir(parents=True, exist_ok=True)
if not JOBS_JSON.exists():
JOBS_JSON.write_text(json.dumps({}, indent=2))
logger.info(f"📝 Created empty jobs index file: {JOBS_JSON}")
else:
logger.info(f"📝 Jobs index file exists: {JOBS_JSON}")
def _canon_repo_url(repo_url: str) -> str:
"""
Canonicalize repo URL for consistent indexing across the application
Removes trailing slashes and normalizes URLs to prevent duplicate entries
"""
return (repo_url or "").strip().rstrip("/")
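# Example (illustrative): _canon_repo_url("https://github.com/org/repo/") returns
# "https://github.com/org/repo", so URLs with and without a trailing slash share
# one jobs.json key.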
def _rsync_logs_from_instance(public_ip: str, local_dest: str) -> bool:
"""
Uses rsync to efficiently download all log files from a remote instance
to a local directory.
Args:
public_ip: The public IP of the EC2 instance.
local_dest: The local destination directory path.
Returns:
True if the rsync operation was successful, False otherwise.
"""
ssh_key = os.path.expanduser(SSH_KEY_PATH)
if not os.path.exists(ssh_key):
logger.error(f"❌ [RSYNC] SSH key not found at {ssh_key}")
return False
# Convert to absolute path for rsync
ssh_key_abs = os.path.abspath(ssh_key)
# Define rsync command with includes for all relevant log files and patterns
# This is more efficient than running multiple commands or listing files first.
rsync_command = [
"rsync",
"-avh", # Archive, verbose, human-readable
"--progress", # Show progress during transfer
"--compress", # Compress file data during the transfer
"-e", f"ssh -i {ssh_key_abs} -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null",
"--include=optimization-*.log",
"--include=optimization.log",
"--include=launcher.log",
"--include=llm-setup-*.log",
"--include=llm-setup.log",
"--include=claude-setup-*.log",
"--include=claude-setup-round-*.log",
"--include=tests-*.log",
"--include=tests.log",
"--include=stage.jsonl",
"--include=job.exitcode",
"--include=optimization.pid",
"--exclude=*", # Exclude all other files from the directory
f"ubuntu@{public_ip}:/home/ubuntu/app/logs/", # Source directory
str(local_dest), # Destination directory
]
try:
log_service_operation("RSYNC", "Starting log download", ip=public_ip, dest=local_dest, ssh_key=ssh_key_abs)
logger.debug(f"[RSYNC] Command: {' '.join(rsync_command)}")
# We use subprocess.run to execute the command
result = subprocess.run(
rsync_command,
capture_output=True,
text=True,
check=True, # This will raise CalledProcessError if rsync returns a non-zero exit code
timeout=300 # 5-minute timeout for the rsync operation
)
logger.info(f"✅ [RSYNC] Logs successfully synced from {public_ip}.")
# Log the output from rsync for debugging progress
if result.stdout:
logger.debug(f"[RSYNC STDOUT]:\n{result.stdout}")
return True
except subprocess.CalledProcessError as e:
logger.error(f"❌ [RSYNC] Command failed with exit code {e.returncode}")
logger.error(f"❌ [RSYNC] Command: {' '.join(rsync_command)}")
if e.stderr:
logger.error(f"❌ [RSYNC] STDERR: {e.stderr}")
if e.stdout:
logger.error(f"❌ [RSYNC] STDOUT: {e.stdout}")
log_request_error("RSYNC", f"Rsync failed with exit code {e.returncode}", 0, ip=public_ip)
return False
except subprocess.TimeoutExpired:
log_request_error("RSYNC", "Rsync operation timed out", 0, ip=public_ip)
return False
except Exception as e:
log_request_error("RSYNC", f"An unexpected error occurred during rsync: {str(e)}", 0, ip=public_ip)
return False
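# For reference, the command assembled above is roughly equivalent to this shell
# invocation (key path and IP are illustrative placeholders; the full include list
# is shown in rsync_command above):
#
#   rsync -avh --progress --compress \
#     -e "ssh -i /path/to/key.pem -o StrictHostKeyChecking=no -o UserKnownHostsFile=/dev/null" \
#     --include='optimization-*.log' --include='launcher.log' ... --exclude='*' \
#     ubuntu@<public-ip>:/home/ubuntu/app/logs/ <local-dest>/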
def _load_jobs_index() -> Dict[str, str]:
"""
Loads the job tracking index from JSON file
This maintains the mapping between repository URLs and their current EC2 instance IDs
Returns: Dictionary mapping repository URLs to their latest job IDs
"""
try:
data = json.loads(JOBS_JSON.read_text())
# Normalize keys as canonical URLs to prevent duplicate entries
if isinstance(data, dict):
normalized = {}
for k, v in data.items():
normalized[_canon_repo_url(k)] = v
if normalized != data:
JOBS_JSON.write_text(json.dumps(normalized, indent=2))
data = normalized
logger.debug(f"📖 Loaded jobs index with {len(data)} entries")
return data
except Exception as e:
logger.warning(f"⚠️ Failed to load jobs index: {e}, returning empty dict")
return {}
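# The jobs.json file loaded above is a flat object mapping canonical repo URLs to
# the most recent EC2 instance id, e.g. (instance id is illustrative):
#
#   {
#     "https://github.com/org/repo": "i-0abc123def4567890"
#   }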
def _save_jobs_index(data: Dict[str, str]) -> None:
"""
Saves the job tracking index to JSON file
Persists the repository-to-instance mapping for state management across server restarts
"""
try:
# Persist canonical URL keys only to maintain consistency
normalized = { _canon_repo_url(k): v for k, v in data.items() }
JOBS_JSON.write_text(json.dumps(normalized, indent=2))
logger.debug(f"💾 Saved jobs index with {len(data)} entries")
except Exception as e:
logger.error(f"❌ Failed to save jobs index: {e}")
def _set_repo_job(repo_url: str, instance_id: str) -> None:
"""
Update jobs index mapping for a repo to an EC2 instance id
This creates/updates the association between a repository and its current optimization instance
"""
try:
jobs_index = _load_jobs_index()
jobs_index[_canon_repo_url(repo_url)] = instance_id
_save_jobs_index(jobs_index)
log_service_operation("JOBS", "Set repo->instance mapping", repo_url=_canon_repo_url(repo_url), instance_id=instance_id)
except Exception as e:
logger.exception(f"Failed to update jobs index for {repo_url}: {e}")
def _read_csv() -> List[Dict[str, str]]:
"""
Reads repository configurations from CSV file
This loads all configured repositories with their optimization settings
Returns: List of dictionaries with repo_url, module_root, tests_root, resource_tier
"""
try:
rows: List[Dict[str, str]] = []
with CONFIG_CSV.open("r", encoding="utf-8") as f:
reader = csv.DictReader(f)
for row in reader:
rows.append({
"repo_url": row.get("repo_url", "").strip(),
"module_root": row.get("module_root", "auto").strip(),
"tests_root": row.get("tests_root", "auto").strip(),
"resource_tier": row.get("resource_tier", "small").strip().lower() or "small",
})
logger.debug(f"📊 Read {len(rows)} repositories from CSV")
return rows
except Exception as e:
logger.error(f"❌ Failed to read CSV file: {e}")
return []
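# config/repos.csv uses the four columns read above; an illustrative file:
#
#   repo_url,module_root,tests_root,resource_tier
#   https://github.com/org/repo,src/mypkg,tests,small
#   https://github.com/org/other-repo,auto,auto,medium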
def _write_csv(rows: List[Dict[str, str]]) -> None:
"""
Writes repository configurations to CSV file
Persists the current repository list with all configuration changes
"""
try:
fieldnames = ["repo_url", "module_root", "tests_root", "resource_tier"]
with CONFIG_CSV.open("w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
for r in rows:
writer.writerow({
"repo_url": r["repo_url"].strip(),
"module_root": r.get("module_root", "auto").strip(),
"tests_root": r.get("tests_root", "auto").strip(),
"resource_tier": r.get("resource_tier", "small").strip().lower() or "small",
})
logger.info(f"💾 Saved {len(rows)} repositories to CSV")
except Exception as e:
logger.error(f"❌ Failed to write CSV file: {e}")
def _find_row(rows: List[Dict[str, str]], repo_url: str) -> Optional[int]:
"""
Finds the index of a repository in the rows list
Used to locate specific repositories for updates, deletions, or job launches
Args:
rows: List of repository dictionaries
repo_url: Repository URL to find
Returns: Index of the repository or None if not found
"""
for idx, r in enumerate(rows):
if r.get("repo_url", "").rstrip("/") == repo_url.rstrip("/"):
logger.debug(f"🔍 Found repository {repo_url} at index {idx}")
return idx
logger.debug(f"🔍 Repository {repo_url} not found")
return None
# =============================================================================
# INITIALIZATION
# =============================================================================
# Application startup sequence - initialize all required components
logger.info("🔧 Initializing Flask server...")
# Ensure required files exist before starting the server
# This creates the jobs.json file and ensures data directories exist
_ensure_files()
# Initialize AWS EC2 client for instance management
# This client is used throughout the application for EC2 operations
try:
ec2 = boto3.client("ec2", region_name=AWS_REGION)
logger.info(f"☁️ AWS EC2 client initialized successfully - Region: {AWS_REGION}")
except Exception as e:
logger.error(f"❌ Failed to initialize AWS EC2 client: {e}")
raise
# Initialize Flask app with static file serving capability
# The static folder serves the web UI (HTML, CSS, JS files)
app = Flask(__name__, static_folder="static")
logger.info("🌐 Flask app initialized")
# =============================================================================
# EC2 MANAGER
# =============================================================================
class EC2Manager:
"""
Simple EC2 instance lifecycle and SSH utilities for running optimizations.
This class handles the complete lifecycle of EC2 instances used for code optimization:
1. Launch instances with pre-configured user data
2. SSH operations (upload files, execute commands, download logs)
3. Instance monitoring and termination
4. File management on remote instances
"""
def __init__(self, region: str, key_name: str, security_group: str, instance_type: str, ami_id: str, ssh_key_path: str) -> None:
"""
Initialize EC2 manager with AWS configuration
Sets up the EC2 client and stores configuration for instance operations
"""
self.ec2 = boto3.client("ec2", region_name=region)
self.region = region
self.key_name = key_name
self.security_group = security_group
self.instance_type = instance_type
self.ami_id = ami_id
self.ssh_key_path = os.path.expanduser(ssh_key_path)
def launch_instance(self, job_name: str, job_tag_value: str, tags: Optional[Dict[str, str]] = None) -> str:
"""
Launch an EC2 instance and return its instance_id.
This method:
1. Creates a user-data script that installs prerequisites (git, python, codeflash, etc.)
2. Launches the instance with proper tags and security settings
3. Waits for the instance to be running
4. Returns the instance ID for tracking
The instance is initialized via user-data to install prerequisites.
"""
user_data = """#!/bin/bash
set -u
# Log all output
exec > >(tee /var/log/user-data.log)
exec 2>&1
# Prepare application directories early regardless of package install success
mkdir -p /home/ubuntu/app/scripts /home/ubuntu/app/logs || true
chown -R ubuntu:ubuntu /home/ubuntu/app || true
ln -s /home/ubuntu/app /app 2>/dev/null || true
apt-get update || true
DEBIAN_FRONTEND=noninteractive apt-get upgrade -y || true
DEBIAN_FRONTEND=noninteractive apt-get install -y \
git curl wget ca-certificates build-essential \
python3 python3-venv python3-pip \
unzip jq software-properties-common cloud-guest-utils || true
# Ensure the root partition and filesystem use the full EBS volume
# This is safe to run multiple times and works for NVMe and Xen devices
ROOT_PART=$(lsblk -no SOURCE,MOUNTPOINT | awk '$2=="/"{print "/dev/"$1}')
DISK=$(echo "$ROOT_PART" | sed -E 's|p?[0-9]+$||')
PART_NUM=$(echo "$ROOT_PART" | sed -E 's|^.*p?([0-9]+)$|\\1|')
growpart "$DISK" "$PART_NUM" || true
FSTYPE=$(findmnt -n -o FSTYPE /)
if [ "$FSTYPE" = "xfs" ]; then
xfs_growfs /
else
resize2fs "$ROOT_PART" || true
fi
# Install GitHub CLI
type -p curl >/dev/null || (apt-get update && apt-get install -y curl || true)
curl -fsSL https://cli.github.com/packages/githubcli-archive-keyring.gpg | \
dd of=/usr/share/keyrings/githubcli-archive-keyring.gpg
chmod go+r /usr/share/keyrings/githubcli-archive-keyring.gpg
echo "deb [arch=$(dpkg --print-architecture) signed-by=/usr/share/keyrings/githubcli-archive-keyring.gpg] https://cli.github.com/packages stable main" > /etc/apt/sources.list.d/github-cli.list
apt-get update || true
DEBIAN_FRONTEND=noninteractive apt-get install -y gh || true
# Install Codeflash
python3 -m pip install --upgrade pip || true
python3 -m pip install codeflash || true
echo "EC2 bootstrap complete for {JOB_NAME}" > /home/ubuntu/.bootstrap_done
""".replace("{JOB_NAME}", job_name)
log_service_operation("AWS_EC2", "Launching instance", instance_type=self.instance_type, ami_id=self.ami_id, key_name=self.key_name)
# Discover the AMI root device name so we resize the actual root volume instead of attaching an extra disk
try:
image_desc = self.ec2.describe_images(ImageIds=[self.ami_id])
root_device_name = image_desc["Images"][0].get("RootDeviceName", "/dev/sda1")
except Exception:
root_device_name = "/dev/sda1"
params = {
"ImageId": self.ami_id,
"InstanceType": self.instance_type,
"MinCount": 1,
"MaxCount": 1,
"UserData": user_data,
"TagSpecifications": [
{
"ResourceType": "instance",
"Tags": [
{"Key": "Name", "Value": job_name},
{"Key": "Job", "Value": job_tag_value},
{"Key": "Project", "Value": "OptimizerFactory"},
] + ([{"Key": k, "Value": v} for k, v in (tags or {}).items()] ),
}
],
"BlockDeviceMappings": [
{
"DeviceName": root_device_name,
"Ebs": {"VolumeSize": 50, "VolumeType": "gp3", "DeleteOnTermination": True},
}
],
}
if self.key_name:
params["KeyName"] = self.key_name
if self.security_group:
params["SecurityGroupIds"] = [self.security_group]
resp = self.ec2.run_instances(**params)
instance_id = resp["Instances"][0]["InstanceId"]
log_service_operation("AWS_EC2", "Instance launched", instance_id=instance_id)
waiter = self.ec2.get_waiter("instance_running")
waiter.wait(InstanceIds=[instance_id])
log_service_operation("AWS_EC2", "Instance is running", instance_id=instance_id)
return instance_id
def get_public_ip(self, instance_id: str) -> Optional[str]:
"""
Return the public IP address for an instance, if assigned.
Used to determine when an instance is ready for SSH connections.
"""
desc = self.ec2.describe_instances(InstanceIds=[instance_id])
try:
return desc["Reservations"][0]["Instances"][0].get("PublicIpAddress")
except Exception:
return None
def wait_for_ssh(self, public_ip: str, timeout: int = 600) -> bool:
"""
Wait until SSH is reachable on the instance.
This ensures the instance is fully booted and ready for remote operations.
"""
start = time.time()
while time.time() - start < timeout:
try:
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(public_ip, username="ubuntu", key_filename=self.ssh_key_path, timeout=15, banner_timeout=15, auth_timeout=15)
ssh.close()
return True
except Exception:
time.sleep(10)
return False
def open_ssh(self, public_ip: str) -> paramiko.SSHClient:
"""
Open and return an SSH client connection.
Creates a new SSH connection for remote operations on the instance.
"""
log_service_operation("SSH", "Connecting", ip=public_ip)
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(public_ip, username="ubuntu", key_filename=self.ssh_key_path)
return ssh
def upload_text(self, public_ip: str, content: str, remote_path: str) -> None:
"""
Upload text content to a remote file on the instance.
Used for uploading generated scripts (like the job wrapper script).
"""
log_service_operation("SSH", "Uploading text", ip=public_ip, remote_path=remote_path, size=len(content or ""))
ssh = self.open_ssh(public_ip)
try:
sftp = ssh.open_sftp()
# Ensure parent directory exists
parent = os.path.dirname(remote_path)
try:
sftp.stat(parent)
except FileNotFoundError:
# Try to create nested directories
parts = parent.strip('/').split('/')
cur = ''
for p in parts:
cur = f"{cur}/{p}" if cur else f"/{p}"
try:
sftp.stat(cur)
except FileNotFoundError:
try:
sftp.mkdir(cur)
except Exception:
pass
with sftp.file(remote_path, "w") as f:
f.write(content)
sftp.chmod(remote_path, 0o755)
log_service_operation("SSH", "Upload text complete", ip=public_ip, remote_path=remote_path)
finally:
ssh.close()
def upload_file(self, public_ip: str, local_path: str, remote_path: str) -> None:
"""
Upload a local file to the remote instance.
Used for uploading optimization scripts and other required files.
"""
log_service_operation("SSH", "Uploading file", ip=public_ip, local_path=local_path, remote_path=remote_path)
ssh = self.open_ssh(public_ip)
try:
sftp = ssh.open_sftp()
# Ensure parent directory exists
parent = os.path.dirname(remote_path)
try:
sftp.stat(parent)
except FileNotFoundError:
parts = parent.strip('/').split('/')
cur = ''
for p in parts:
cur = f"{cur}/{p}" if cur else f"/{p}"
try:
sftp.stat(cur)
except FileNotFoundError:
try:
sftp.mkdir(cur)
except Exception:
pass
sftp.put(local_path, remote_path)
sftp.chmod(remote_path, 0o755)
log_service_operation("SSH", "Upload file complete", ip=public_ip, remote_path=remote_path)
finally:
ssh.close()
def exec(self, public_ip: str, command: str, get_pty: bool = False) -> int:
"""
Execute a command on the remote instance and return exit status.
Used for running optimization scripts and other remote operations.
"""
log_service_operation("SSH", "Executing remote command", ip=public_ip, command=command[:120])
ssh = self.open_ssh(public_ip)
try:
_, stdout, stderr = ssh.exec_command(command, get_pty=get_pty)
status = stdout.channel.recv_exit_status()
log_service_operation("SSH", "Command completed", ip=public_ip, exit_status=status)
return status
finally:
ssh.close()
def exec_capture(self, public_ip: str, command: str, get_pty: bool = False):
"""
Execute a command and capture stdout/stderr.
Used for commands that need to return output (like grep for optimization links).
"""
log_service_operation("SSH", "Executing remote command (capture)", ip=public_ip, command=command[:120])
ssh = self.open_ssh(public_ip)
try:
_, stdout, stderr = ssh.exec_command(command, get_pty=get_pty)
out = stdout.read().decode("utf-8", errors="ignore")
err = stderr.read().decode("utf-8", errors="ignore")
code = stdout.channel.recv_exit_status()
log_service_operation("SSH", "Command completed (capture)", ip=public_ip, exit_status=code, out_len=len(out), err_len=len(err))
return code, out, err
finally:
ssh.close()
def read_file_tail(self, public_ip: str, remote_path: str, lines: int = 1000) -> List[str]:
"""
Read the last N lines of a remote file.
Used for fetching recent log entries from optimization runs.
"""
log_service_operation("SSH", "Tailing file", ip=public_ip, path=remote_path, lines=lines)
ssh = self.open_ssh(public_ip)
try:
cmd = f"bash -lc 'set -o pipefail; test -f {remote_path} && tail -n {lines} {remote_path} || true'"
# Request a pty so tail output is flushed promptly even for large files
_, stdout, _ = ssh.exec_command(cmd, get_pty=True)
content = stdout.read().decode("utf-8", errors="ignore")
return content.splitlines()
finally:
ssh.close()
def read_file_bytes(self, public_ip: str, remote_path: str) -> bytes:
"""
Download a remote file as bytes.
Used for downloading full log files for analysis.
"""
log_service_operation("SSH", "Downloading file", ip=public_ip, path=remote_path)
ssh = self.open_ssh(public_ip)
try:
sftp = ssh.open_sftp()
try:
with sftp.file(remote_path, "rb") as f:
data = f.read()
log_service_operation("SSH", "Download complete", ip=public_ip, path=remote_path, size=len(data))
return data
finally:
sftp.close()
finally:
ssh.close()
def file_exists(self, public_ip: str, remote_path: str) -> bool:
"""
Check if a file exists on the remote instance.
Used to verify job completion markers and log file availability.
"""
log_service_operation("SSH", "Checking file exists", ip=public_ip, path=remote_path)
ssh = self.open_ssh(public_ip)
try:
sftp = ssh.open_sftp()
try:
sftp.stat(remote_path)
log_service_operation("SSH", "File exists", ip=public_ip, path=remote_path, exists=True)
return True
except FileNotFoundError:
log_service_operation("SSH", "File not found", ip=public_ip, path=remote_path, exists=False)
return False
finally:
sftp.close()
finally:
ssh.close()
def terminate(self, instance_id: str) -> None:
"""
Terminate an EC2 instance.
Used for cleanup when jobs complete or are cancelled.
"""
try:
self.ec2.terminate_instances(InstanceIds=[instance_id])
log_service_operation("AWS_EC2", "Instance termination initiated", instance_id=instance_id)
except Exception as e:
logger.exception(f"Failed to terminate instance {instance_id}: {e}")
# Global EC2 manager instance - shared across the application
# This instance handles all EC2 operations throughout the application lifecycle
ec2_manager = EC2Manager(
region=AWS_REGION,
key_name=AWS_KEY_NAME,
security_group=AWS_SECURITY_GROUP,
instance_type=AWS_INSTANCE_TYPE,
ami_id=AWS_AMI_ID,
ssh_key_path=SSH_KEY_PATH,
)
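# Illustrative use of the manager (not executed here; names and IDs are placeholders):
#
#   iid = ec2_manager.launch_instance("codeflash-opt-org-repo", "codeflash-opt-org-repo")
#   ip = ec2_manager.get_public_ip(iid)
#   if ip and ec2_manager.wait_for_ssh(ip):
#       ec2_manager.exec(ip, "echo hello")
#   ec2_manager.terminate(iid)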
# Simple in-memory watcher registry to avoid duplicate watchers per instance
# Prevents multiple completion watchers from being started for the same instance
_watchers: Dict[str, bool] = {}
def _start_completion_watcher(instance_id: str, public_ip: str, repo_url: str, log_path: str = "/var/log/codeflash-optimization.log") -> None:
"""
Start a background thread that waits for job completion then terminates the instance.
This watcher:
1. Monitors for job completion markers on the remote instance
2. Checks instance state to detect if it's already terminated
3. Automatically terminates the instance when the job completes
4. Prevents resource waste by ensuring instances are cleaned up
"""
if instance_id in _watchers:
return
_watchers[instance_id] = True
def _watch() -> None:
try:
logger.info(f"👀 Starting completion watcher for {instance_id}")
# Wait up to 24 hours for job exitcode marker
deadline = time.time() + 24 * 3600
while time.time() < deadline:
try:
if ec2_manager.file_exists(public_ip, "/home/ubuntu/app/logs/job.exitcode"):
logger.info(f"✅ Remote job finished on {instance_id}; terminating")
# Attempt to archive remote logs locally before termination
try:
_archive_remote_logs_to_local(public_ip, repo_url)
except Exception as _e:
logger.warning(f"⚠️ Failed to archive logs for {instance_id}: {_e}")
break
except Exception:
pass
# If instance is no longer running, exit
try:
state = _describe_instance_state(instance_id)
if state and state.lower() in {"shutting-down", "terminated", "stopped", "stopping"}:
logger.info(f" Instance {instance_id} state={state}; stopping watcher")
# Best-effort archive if still reachable
try:
_archive_remote_logs_to_local(public_ip, repo_url)
except Exception:
pass
return
except Exception:
pass
time.sleep(30)
finally:
try:
ec2_manager.terminate(instance_id)
except Exception:
pass
import threading
t = threading.Thread(target=_watch, daemon=True)
t.start()
def _describe_instance_state(instance_id: str) -> Optional[str]:
"""
Get the current state of an EC2 instance.
Used to check if instances are running, stopped, terminated, etc.
"""
try:
resp = ec2.describe_instances(InstanceIds=[instance_id])
state = resp["Reservations"][0]["Instances"][0]["State"]["Name"]
return state
except Exception:
return None
def _archive_remote_logs_to_local(public_ip: str, repo_url: str) -> None:
"""
Download remote job logs to local disk before shutdown using rsync.
Saves into server/logs/<org-repo>_<YYYY-MM-DD_HH-MM>/
"""
LOGS_ARCHIVE_DIR.mkdir(parents=True, exist_ok=True)
# Determine slug from repo_url
slug = None
try:
parts = (repo_url or "").rstrip("/").split("/")
if len(parts) >= 2:
org = parts[-2]
name = parts[-1].replace(".git", "")
slug = f"{org}-{name}"
except Exception:
pass
if not slug:
slug = "job"
# Determine timestamp from latest optimization log if available
ts = ""
try:
cmd = "bash -lc 'basename $(ls -1 /home/ubuntu/app/logs/optimization-*.log 2>/dev/null | sort -r | head -n1)'"
_, out, _ = ec2_manager.exec_capture(public_ip, cmd)
base = (out or "").strip()
m = re.match(r"optimization-([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}-[0-9]{2})", base)
if m:
ts = m.group(1).replace("T", "_")
except Exception:
ts = ""
prefix = f"{slug}_{ts}" if ts else slug
dest_dir = LOGS_ARCHIVE_DIR / prefix
dest_dir.mkdir(parents=True, exist_ok=True)
# Use the new rsync helper to download all logs in one command
success = _rsync_logs_from_instance(public_ip, str(dest_dir))
if success:
log_service_operation("ARCHIVE", "Saved remote logs to local via rsync", dest=str(dest_dir), ip=public_ip)
else:
logger.error(f"❌ [ARCHIVE] Failed to save remote logs via rsync from {public_ip}")
# =============================================================================
# CUSTOM ERROR HANDLERS
# =============================================================================
@app.errorhandler(500)
def handle_server_error(e):
"""
Handle 500 Internal Server Error with JSON response for API calls
This handler provides user-friendly error messages for common AWS/EC2 issues:
- Authentication failures
- Missing key pairs
- Permission errors
- Other AWS configuration issues
"""
if request.path.startswith('/api/'):
# For API calls, return JSON error instead of HTML debug page
error_message = str(e.original_exception) if hasattr(e, 'original_exception') else "Internal server error"
# Extract more meaningful error from common AWS EC2 exceptions
if 'AuthFailure' in error_message:
error_message = "AWS EC2 Auth Failure: Check AWS credentials and region"
elif 'InvalidKeyPair' in error_message or 'InvalidKeyPair.NotFound' in error_message:
error_message = "AWS EC2 Key Pair Error: Check AWS_KEY_NAME exists in the region"
elif 'InvalidGroup' in error_message or 'UnauthorizedOperation' in error_message:
error_message = "AWS EC2 Permission Error: Check IAM permissions for ec2:RunInstances and security groups"
logger.error(f"❌ [ERROR_HANDLER] 500 Internal Server Error: {error_message}")
return jsonify({"error": error_message}), 500
# For non-API calls, use default HTML error page
return e
@app.errorhandler(404)
def handle_not_found(e):
"""
Handle 404 Not Found with JSON response for API calls
Provides consistent JSON error responses for API endpoints
"""
if request.path.startswith('/api/'):
logger.warning(f"⚠️ [ERROR_HANDLER] 404 Not Found: {request.path}")
return jsonify({"error": "Resource not found"}), 404
return e
@app.errorhandler(403)
def handle_forbidden(e):
"""
Handle 403 Forbidden with JSON response for API calls
Provides consistent JSON error responses for API endpoints
"""
if request.path.startswith('/api/'):
logger.warning(f"⚠️ [ERROR_HANDLER] 403 Forbidden: {request.path}")
return jsonify({"error": "Access forbidden"}), 403
return e
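# All three handlers above return the same JSON shape for /api/* paths, e.g.
# (message illustrative):
#
#   {"error": "AWS EC2 Key Pair Error: Check AWS_KEY_NAME exists in the region"}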
# =============================================================================
# FLASK ROUTES
# =============================================================================
@app.get("/")
def index() -> Any:
"""
Serves the main HTML page - the web UI for managing repositories and jobs
This is the entry point for the web interface where users can:
- View and manage repositories
- Start optimization jobs
- Monitor job progress
- View logs and results
"""
start_time = time.time()
log_request_start("GET /")
try:
result = send_from_directory(app.static_folder, "index.html")
duration_ms = int((time.time() - start_time) * 1000)
log_request_success("GET /", duration_ms)
return result
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("GET /", str(e), duration_ms)
raise
@app.get("/static/<path:filename>")
def static_files(filename: str):
"""
Serves static files (CSS, JS, images) for the web UI
Handles all static assets needed by the frontend interface
"""
start_time = time.time()
log_request_start("GET /static", filename=filename)
try:
result = send_from_directory(app.static_folder, filename)
duration_ms = int((time.time() - start_time) * 1000)
log_request_success("GET /static", duration_ms, filename=filename)
return result
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("GET /static", str(e), duration_ms, filename=filename)
raise
@app.get("/health")
def health() -> Any:
"""
Health check endpoint for monitoring and load balancer health checks
Returns a simple JSON response indicating the server is running
"""
start_time = time.time()
log_request_start("GET /health")
try:
result = jsonify({"ok": True})
duration_ms = int((time.time() - start_time) * 1000)
log_request_success("GET /health", duration_ms)
return result
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("GET /health", str(e), duration_ms)
raise
# =============================================================================
# REPOSITORY MANAGEMENT API
# =============================================================================
@app.get("/api/repos")
def list_repos() -> Any:
"""
GET /api/repos - Lists all repositories with their latest job IDs
This endpoint:
1. Reads all repository configurations from CSV
2. Loads the job index to get current job IDs for each repo
3. Returns a combined view showing repos and their current optimization status
Returns: JSON with items array containing repository configurations
"""
start_time = time.time()
log_request_start("GET /api/repos")
try:
log_service_operation("CSV", "Reading repository configurations")
rows = _read_csv()
log_service_operation("JOBS", "Loading job index")
jobs_index = _load_jobs_index()
# Add current job ID to each repository entry
for r in rows:
r["last_job_id"] = jobs_index.get(_canon_repo_url(r.get("repo_url", ""))) or ""
duration_ms = int((time.time() - start_time) * 1000)
log_request_success("GET /api/repos", duration_ms, repo_count=len(rows))
return jsonify({"items": rows})
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("GET /api/repos", str(e), duration_ms)
raise
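# Illustrative response shape for GET /api/repos (values are placeholders):
#
#   {"items": [{"repo_url": "https://github.com/org/repo", "module_root": "auto",
#               "tests_root": "auto", "resource_tier": "small",
#               "last_job_id": "i-0abc123def4567890"}]}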
@app.post("/api/repos")
def add_repo() -> Any:
"""
POST /api/repos - Adds a new repository to the optimization queue
This endpoint:
1. Validates the repository URL is provided
2. Checks for duplicate repositories
3. Adds the new repository with configuration settings
4. Saves the updated repository list to CSV
Body: {repo_url, module_root, tests_root, resource_tier}
Returns: JSON with success status
"""
start_time = time.time()
payload = request.get_json(force=True)
repo_url = (payload.get("repo_url") or "").strip()
log_request_start("POST /api/repos", repo_url=repo_url)
try:
if not repo_url:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("POST /api/repos", "repo_url is required", duration_ms)
return jsonify({"error": "repo_url is required"}), 400
log_service_operation("CSV", "Reading existing repositories")
rows = _read_csv()
# Check for duplicate repositories
if _find_row(rows, repo_url) is not None:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("POST /api/repos", "repo already exists", duration_ms, repo_url=repo_url)
return jsonify({"error": "repo already exists"}), 400
# Create new repository entry with default values
new_repo = {
"repo_url": repo_url,
"module_root": (payload.get("module_root") or "auto").strip(),
"tests_root": (payload.get("tests_root") or "auto").strip(),
"resource_tier": (payload.get("resource_tier") or "small").strip().lower(),
}
rows.append(new_repo)
log_service_operation("CSV", "Saving updated repository list")
_write_csv(rows)
duration_ms = int((time.time() - start_time) * 1000)
log_request_success("POST /api/repos", duration_ms, repo_url=repo_url,
resource_tier=new_repo["resource_tier"])
return jsonify({"ok": True})
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("POST /api/repos", str(e), duration_ms, repo_url=repo_url)
raise
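# Illustrative request body for the endpoint above (repo URL is a placeholder):
#
#   POST /api/repos
#   {"repo_url": "https://github.com/org/repo", "module_root": "auto",
#    "tests_root": "auto", "resource_tier": "small"}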
# =============================================================================
# ANALYZER API
# =============================================================================
@app.post("/api/analyze_repo")
def analyze_repo() -> Any:
"""
POST /api/analyze_repo - Submits a repository for analysis
This endpoint:
1. Validates the repository URL
2. Submits the repository to the analyzer service
3. Returns an analysis job ID for tracking progress
The analysis process examines the repository structure, dependencies,
and configuration to determine optimal settings for code optimization.
"""
start_time = time.time()
payload = request.get_json(force=True)
repo_url = (payload.get("repo_url") or "").strip()
log_request_start("POST /api/analyze_repo", repo_url=repo_url)
try:
if not repo_url:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("POST /api/analyze_repo", "repo_url is required", duration_ms)
return jsonify({"error": "repo_url is required"}), 400
log_service_operation("ANALYZER", "Submitting analysis job", repo_url=repo_url)
job_id = submit_analysis(repo_url)
duration_ms = int((time.time() - start_time) * 1000)
log_request_success("POST /api/analyze_repo", duration_ms, repo_url=repo_url, analysis_id=job_id)
return jsonify({"ok": True, "analysis_id": job_id})
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("POST /api/analyze_repo", str(e), duration_ms, repo_url=repo_url)
raise
@app.get("/api/analyze_repo/status")
def analyze_status() -> Any:
"""
GET /api/analyze_repo/status - Check the status of a repository analysis
This endpoint:
1. Validates the analysis ID is provided
2. Checks the current status of the analysis job
3. Returns status information (pending, running, completed, failed)
Query params: analysis_id
Returns: JSON with status and message
"""
start_time = time.time()
analysis_id = (request.args.get("analysis_id") or "").strip()
log_request_start("GET /api/analyze_repo/status", analysis_id=analysis_id)
try:
if not analysis_id:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("GET /api/analyze_repo/status", "analysis_id is required", duration_ms)
return jsonify({"error": "analysis_id is required"}), 400
log_service_operation("ANALYZER", "Checking analysis status", analysis_id=analysis_id)
st = analysis_status(analysis_id)
duration_ms = int((time.time() - start_time) * 1000)
log_request_success("GET /api/analyze_repo/status", duration_ms,
analysis_id=analysis_id, status=st.status)
return jsonify({
"ok": True,
"status": st.status,
"message": st.message,
})
except KeyError:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("GET /api/analyze_repo/status", "analysis not found", duration_ms,
analysis_id=analysis_id)
return jsonify({"error": "analysis not found"}), 404
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("GET /api/analyze_repo/status", str(e), duration_ms, analysis_id=analysis_id)
raise
@app.get("/api/analyze_repo/result")
def analyze_result() -> Any:
"""
GET /api/analyze_repo/result - Get the results of a completed analysis
This endpoint:
1. Validates the analysis ID is provided
2. Fetches the analysis results if completed
3. Returns detailed analysis data including recommended settings
Query params: analysis_id
Returns: JSON with analysis results or error if not ready
"""
start_time = time.time()
analysis_id = (request.args.get("analysis_id") or "").strip()
log_request_start("GET /api/analyze_repo/result", analysis_id=analysis_id)
try:
if not analysis_id:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("GET /api/analyze_repo/result", "analysis_id is required", duration_ms)
return jsonify({"error": "analysis_id is required"}), 400
log_service_operation("ANALYZER", "Fetching analysis result", analysis_id=analysis_id)
res = analysis_result(analysis_id)
if not res:
st = analysis_status(analysis_id)
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("GET /api/analyze_repo/result", f"not ready (status={st.status})",
duration_ms, analysis_id=analysis_id)
return jsonify({"error": f"not ready (status={st.status})"}), 400
duration_ms = int((time.time() - start_time) * 1000)
log_request_success("GET /api/analyze_repo/result", duration_ms, analysis_id=analysis_id)
return jsonify({"ok": True, "result": res})
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("GET /api/analyze_repo/result", str(e), duration_ms, analysis_id=analysis_id)
raise
@app.post("/api/apply_analysis")
def apply_analysis() -> Any:
"""
POST /api/apply_analysis - Apply analysis results to repository configuration
This endpoint:
1. Loads the analysis results for a repository
2. Applies selected analysis recommendations to the repository configuration
3. Updates the CSV with the new settings
4. Returns success with applied field information
Body: {repo_url, apply: {module_root?, tests_root?, resource_tier?}}
Returns: JSON with success status and applied fields
"""
start_time = time.time()
payload = request.get_json(force=True)
repo_url = (payload.get("repo_url") or "").strip()
apply = payload.get("apply") or {}
log_request_start("POST /api/apply_analysis", repo_url=repo_url,
fields=list(apply.keys()) if apply else [])
try:
if not repo_url:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("POST /api/apply_analysis", "repo_url is required", duration_ms)
return jsonify({"error": "repo_url is required"}), 400
log_service_operation("ANALYZER", "Loading analysis result", repo_url=repo_url)
result = load_analysis_for_repo(repo_url)
if not result:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("POST /api/apply_analysis", "no saved analysis found for repo",
duration_ms, repo_url=repo_url)
return jsonify({"error": "no saved analysis found for repo"}), 404
log_service_operation("CSV", "Reading repository configurations")
rows = _read_csv()
idx = _find_row(rows, repo_url)
if idx is None:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("POST /api/apply_analysis", "repo not found in CSV",
duration_ms, repo_url=repo_url)
return jsonify({"error": "repo not found in CSV"}), 404
# Apply selected fields from analysis to repository configuration
applied_fields = []
cf = result.get("codeflash", {})
if apply.get("module_root") and cf.get("module_root"):
rows[idx]["module_root"] = cf["module_root"]
applied_fields.append(f"module_root={cf['module_root']}")
if apply.get("tests_root") and cf.get("tests_root"):
rows[idx]["tests_root"] = cf["tests_root"]
applied_fields.append(f"tests_root={cf['tests_root']}")
if apply.get("resource_tier") and result.get("resources", {}).get("tier"):
rows[idx]["resource_tier"] = result["resources"]["tier"]
applied_fields.append(f"resource_tier={result['resources']['tier']}")
log_service_operation("CSV", "Saving updated repository configurations")
_write_csv(rows)
duration_ms = int((time.time() - start_time) * 1000)
log_request_success("POST /api/apply_analysis", duration_ms, repo_url=repo_url,
applied_fields=applied_fields)
return jsonify({"ok": True})
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("POST /api/apply_analysis", str(e), duration_ms, repo_url=repo_url)
raise
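# Illustrative body for POST /api/apply_analysis (repo URL is a placeholder); each
# truthy flag under "apply" copies the corresponding recommendation into the CSV:
#
#   {"repo_url": "https://github.com/org/repo",
#    "apply": {"module_root": true, "tests_root": true, "resource_tier": false}}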
@app.put("/api/repos")
def update_repo() -> Any:
"""
PUT /api/repos - Updates an existing repository
Body: {repo_url, module_root, tests_root, resource_tier}
Returns: JSON with success status
"""
start_time = time.time()
payload = request.get_json(force=True)
repo_url = (payload.get("repo_url") or "").strip()
log_request_start("PUT /api/repos", repo_url=repo_url)
try:
if not repo_url:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("PUT /api/repos", "repo_url is required", duration_ms)
return jsonify({"error": "repo_url is required"}), 400
log_service_operation("CSV", "Reading repository configurations")
rows = _read_csv()
idx = _find_row(rows, repo_url)
if idx is None:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("PUT /api/repos", "repo not found", duration_ms, repo_url=repo_url)
return jsonify({"error": "repo not found"}), 404
rows[idx] = {
"repo_url": repo_url,
"module_root": (payload.get("module_root") or "auto").strip(),
"tests_root": (payload.get("tests_root") or "auto").strip(),
"resource_tier": (payload.get("resource_tier") or "small").strip().lower(),
}
log_service_operation("CSV", "Saving updated repository configurations")
_write_csv(rows)
duration_ms = int((time.time() - start_time) * 1000)
log_request_success("PUT /api/repos", duration_ms, repo_url=repo_url)
return jsonify({"ok": True})
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("PUT /api/repos", str(e), duration_ms, repo_url=repo_url)
raise
@app.delete("/api/repos")
def delete_repo() -> Any:
"""
DELETE /api/repos - Deletes a repository
Body: {repo_url}
Returns: JSON with success status
"""
start_time = time.time()
payload = request.get_json(force=True)
repo_url = (payload.get("repo_url") or "").strip()
log_request_start("DELETE /api/repos", repo_url=repo_url)
try:
log_service_operation("CSV", "Reading repository configurations")
rows = _read_csv()
new_rows = [r for r in rows if _canon_repo_url(r.get("repo_url", "")) != _canon_repo_url(repo_url)]
if len(new_rows) == len(rows):
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("DELETE /api/repos", "repo not found", duration_ms, repo_url=repo_url)
return jsonify({"error": "repo not found"}), 404
log_service_operation("CSV", "Saving updated repository configurations")
_write_csv(new_rows)
# also clear job id if exists
log_service_operation("JOBS", "Cleaning up job index")
jobs_index = _load_jobs_index()
key = _canon_repo_url(repo_url)
if key in jobs_index:
jobs_index.pop(key, None)
_save_jobs_index(jobs_index)
duration_ms = int((time.time() - start_time) * 1000)
log_request_success("DELETE /api/repos", duration_ms, repo_url=repo_url)
return jsonify({"ok": True})
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("DELETE /api/repos", str(e), duration_ms, repo_url=repo_url)
raise
@app.post("/api/repos/bulk")
def bulk_upload_repos() -> Any:
"""
POST /api/repos/bulk - Bulk upload repositories from CSV data
Body: {csv_data: string}
Returns: JSON with validation results and success/error counts
"""
start_time = time.time()
payload = request.get_json(force=True)
csv_data = (payload.get("csv_data") or "").strip()
log_request_start("POST /api/repos/bulk", data_size=len(csv_data) if csv_data else 0)
try:
if not csv_data:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("POST /api/repos/bulk", "csv_data is required", duration_ms)
return jsonify({"error": "csv_data is required"}), 400
# Parse and validate CSV data
log_service_operation("CSV", "Parsing and validating bulk CSV data")
validation_results = []
valid_rows = []
# Split CSV data into lines and parse
lines = csv_data.strip().split('\n')
if not lines:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("POST /api/repos/bulk", "Empty CSV data", duration_ms)
return jsonify({"error": "Empty CSV data"}), 400
expected_headers = ["repo_url", "module_root", "tests_root", "resource_tier"]
# Parse header
import io
csv_reader = csv.DictReader(io.StringIO(csv_data))
# Validate header
if not all(h in csv_reader.fieldnames for h in expected_headers):
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("POST /api/repos/bulk", f"Invalid CSV header", duration_ms,
expected=expected_headers, found=csv_reader.fieldnames)
return jsonify({
"error": f"Invalid CSV header. Expected: {', '.join(expected_headers)}",
"found_headers": csv_reader.fieldnames
}), 400
# Get existing repositories to check for duplicates
log_service_operation("CSV", "Loading existing repositories for duplicate check")
existing_rows = _read_csv()
existing_urls = {r["repo_url"].rstrip("/") for r in existing_rows}
new_urls_in_csv = set()
# Validate each row
for row_idx, row in enumerate(csv_reader, start=2): # Start at 2 since header is line 1
validation_result = {
"line": row_idx,
"repo_url": row.get("repo_url", "").strip(),
"errors": [],
"warnings": []
}
# Validate repo_url
repo_url = row.get("repo_url", "").strip()
if not repo_url:
validation_result["errors"].append("repo_url is required")
elif not repo_url.startswith("https://github.com/"):
validation_result["errors"].append("repo_url must be a valid GitHub URL starting with https://github.com/")
elif repo_url.rstrip("/") in existing_urls:
validation_result["errors"].append("repository already exists in database")
elif repo_url.rstrip("/") in new_urls_in_csv:
validation_result["errors"].append("duplicate repository URL in CSV")
else:
new_urls_in_csv.add(repo_url.rstrip("/"))
# Validate module_root
module_root = row.get("module_root", "").strip()
if not module_root:
validation_result["warnings"].append("module_root is empty, will default to 'auto'")
module_root = "auto"
# Validate tests_root
tests_root = row.get("tests_root", "").strip()
if not tests_root:
validation_result["warnings"].append("tests_root is empty, will default to 'auto'")
tests_root = "auto"
# Validate resource_tier
resource_tier = row.get("resource_tier", "").strip().lower()
valid_tiers = ["small", "medium", "large"]
if not resource_tier:
validation_result["warnings"].append("resource_tier is empty, will default to 'small'")
resource_tier = "small"
elif resource_tier not in valid_tiers:
validation_result["errors"].append(f"resource_tier must be one of: {', '.join(valid_tiers)}")
resource_tier = "small" # fallback
validation_results.append(validation_result)
# If no errors, add to valid rows
if not validation_result["errors"]:
valid_rows.append({
"repo_url": repo_url,
"module_root": module_root,
"tests_root": tests_root,
"resource_tier": resource_tier
})
# Count results
total_rows = len(validation_results)
error_count = sum(1 for r in validation_results if r["errors"])
warning_count = sum(1 for r in validation_results if r["warnings"] and not r["errors"])
valid_count = len(valid_rows)
log_service_operation("CSV", "Bulk validation completed",
total_rows=total_rows, valid_count=valid_count,
warning_count=warning_count, error_count=error_count)
# If there are any errors, don't save anything
if error_count > 0:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("POST /api/repos/bulk", f"Validation failed: {error_count} errors found",
duration_ms, error_count=error_count, valid_count=valid_count)
return jsonify({
"ok": False,
"message": f"Validation failed: {error_count} errors found",
"validation_results": validation_results,
"stats": {
"total_rows": total_rows,
"valid_count": valid_count,
"warning_count": warning_count,
"error_count": error_count
}
}), 400
# Save valid rows to CSV
if valid_rows:
existing_rows.extend(valid_rows)
log_service_operation("CSV", "Saving bulk uploaded repositories")
_write_csv(existing_rows)
duration_ms = int((time.time() - start_time) * 1000)
log_request_success("POST /api/repos/bulk", duration_ms,
added_count=valid_count, warning_count=warning_count)
return jsonify({
"ok": True,
"message": f"Successfully added {valid_count} repositories",
"validation_results": validation_results,
"stats": {
"total_rows": total_rows,
"valid_count": valid_count,
"warning_count": warning_count,
"error_count": error_count
}
})
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("POST /api/repos/bulk", f"CSV parsing error: {str(e)}", duration_ms)
return jsonify({"error": f"CSV parsing error: {str(e)}"}), 400
# =============================================================================
# EC2 JOB MANAGEMENT
# =============================================================================
def _job_name_for_repo(repo_url: str) -> str:
"""
Generate a unique job name for a repository URL.
This creates a consistent naming scheme for EC2 instances based on the repository:
- Extracts organization and repository name from GitHub URL
- Creates a sanitized name suitable for AWS resource naming
- Ensures uniqueness across different repositories
"""
parts = repo_url.rstrip("/").split("/")
org = parts[-2] if len(parts) >= 2 else "repo"
name = parts[-1].replace(".", "-")
job_name = f"codeflash-opt-{org}-{name}".lower()
logger.debug(f"🏷️ Generated job name: {job_name} for repo: {repo_url}")
return job_name
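# Example (illustrative): _job_name_for_repo("https://github.com/org/my.repo")
# returns "codeflash-opt-org-my-repo".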
def _launch_ec2_job(row: Dict[str, str], launch_opts: Optional[Dict[str, str]] = None) -> str:
"""
Launch an EC2 instance, upload scripts, start optimization in background.
This is the core function that orchestrates the complete optimization workflow:
1. INSTANCE REUSE CHECK:
- Checks if there's already a running instance for this repository
- Reuses existing instances to avoid duplicate work and costs
2. INSTANCE LAUNCH:
- Launches a new EC2 instance with pre-configured user data
- User data installs prerequisites (git, python, codeflash, etc.)
- Tags the instance for tracking and cost management
3. INSTANCE PREPARATION:
- Waits for public IP assignment
- Waits for SSH connectivity
- Waits for bootstrap completion (user-data script)
4. SCRIPT UPLOAD:
- Uploads optimization scripts (run_optimization.sh, detect_roots.py)
- Sets proper permissions and ownership
- Ensures scripts are executable
5. ENVIRONMENT SETUP:
- Loads analysis results for the repository
- Builds environment variables from analysis data
- Creates a wrapper script with all necessary configuration
6. JOB EXECUTION:
- Uploads the wrapper script to the instance
- Starts the optimization job in the background
- Starts a completion watcher to monitor progress
7. CLEANUP:
- The completion watcher automatically terminates the instance when done
- Prevents resource waste and cost accumulation
Args:
row: Repository configuration from CSV (repo_url, module_root, tests_root, resource_tier)
launch_opts: Optional launch options (target_file, target_function for single-file optimization)
Returns: instance_id used as job_id for tracking
"""
repo_url = row["repo_url"] # Extract repository URL from configuration row
job_name = _job_name_for_repo(repo_url) # Generate unique job name for EC2 instance
log_service_operation("JOB", "Launch requested", repo_url=repo_url, job_name=job_name)
# If an instance is already associated with this repo and still running, reuse it
jobs_index = _load_jobs_index() # Load current job-to-instance mappings
canon = _canon_repo_url(repo_url) # Normalize URL for consistent lookup
log_service_operation("AWS_EC2", "Checking for reusable instance", repo_url=repo_url, canonical=canon)
existing_id = jobs_index.get(canon) # Check if we already have an instance for this repo
if existing_id: # If an instance exists, check if it's still usable
state = _describe_instance_state(existing_id) # Get current EC2 instance state
if state in {"pending", "running", "stopping", "stopped"}: # Instance is still active
log_service_operation("AWS_EC2", "Reusing existing instance", instance_id=existing_id, state=state)
return existing_id # Return existing instance ID to avoid duplicate work
else: # Instance is terminated or in unusable state
log_service_operation("AWS_EC2", "Existing instance not reusable", instance_id=existing_id, state=state)
# Otherwise launch a new instance
log_service_operation("AWS_EC2", "Launching new instance", job_name=job_name, canonical=canon)
instance_id = ec2_manager.launch_instance( # Launch new EC2 instance with user-data script
job_name=job_name, # Human-readable name for the instance
job_tag_value=job_name, # Tag value for identification
tags={"RepoUrl": canon}, # Additional tags for tracking
)
log_service_operation("AWS_EC2", "Instance launched", instance_id=instance_id, job_name=job_name)
# Save mapping early so UI reflects the new instance id promptly
log_service_operation("STATE", "Saving repo to instance mapping", repo_url=repo_url, instance_id=instance_id)
_set_repo_job(repo_url, instance_id) # Persist the repo-to-instance mapping
log_service_operation("STATE", "Mapping saved", canonical=canon, instance_id=instance_id)
# Resolve public IP and wait for bootstrap completion
public_ip: Optional[str] = None # Will store the public IP once assigned
ip_attempts_used = 0 # Track number of attempts for logging
for i in range(60): # Try for up to 5 minutes (60 * 5s = 300s)
ip_attempts_used = i + 1 # Count attempts starting from 1
public_ip = ec2_manager.get_public_ip(instance_id) # Check if public IP is assigned
if public_ip: # If we got the IP, we can proceed
break
# Log progress every ~30s to avoid log spam (6 * 5s)
if ip_attempts_used % 6 == 0: # Every 6 attempts = 30 seconds
log_service_operation("AWS_EC2", "Waiting for public IP assignment", instance_id=instance_id, attempts=ip_attempts_used)
time.sleep(5) # Wait 5 seconds before next attempt
if not public_ip: # If we never got a public IP
log_service_operation("AWS_EC2", "Public IP not assigned within timeout", instance_id=instance_id, attempts=ip_attempts_used)
return instance_id # Return instance ID anyway, job might still work later
else: # We successfully got the public IP
log_service_operation("AWS_EC2", "Public IP resolved", instance_id=instance_id, public_ip=public_ip, attempts=ip_attempts_used)
# Wait for SSH
log_service_operation("SSH", "Waiting for SSH reachability", instance_id=instance_id, public_ip=public_ip, timeout=600)
if not ec2_manager.wait_for_ssh(public_ip, timeout=600): # Wait up to 10 minutes for SSH
log_service_operation("AWS_EC2", "SSH not reachable within timeout", instance_id=instance_id, public_ip=public_ip, timeout=600)
return instance_id # Return instance ID, job might still work later
log_service_operation("SSH", "SSH reachable", instance_id=instance_id, ip=public_ip) # SSH is working
# Wait for bootstrap marker created by user-data
try:
log_service_operation("SSH", "Checking bootstrap completion", instance_id=instance_id, ip=public_ip)
ssh = ec2_manager.open_ssh(public_ip) # Open SSH connection to instance
try:
bootstrap_ready = False # Track if bootstrap is complete
bootstrap_attempts = 0 # Count bootstrap check attempts
for bootstrap_attempts in range(1, 121): # Try for up to 10 minutes (120 * 5s = 600s)
_, stdout, _ = ssh.exec_command("test -f /home/ubuntu/.bootstrap_done && echo READY || echo WAIT") # Check for bootstrap marker file
if (stdout.read() or b"").decode().strip() == "READY": # If marker file exists
bootstrap_ready = True # Bootstrap is complete
break
if bootstrap_attempts % 12 == 0: # every ~60s (12 * 5s = 60s)
log_service_operation("SSH", "Still waiting for bootstrap marker", instance_id=instance_id, ip=public_ip, attempts=bootstrap_attempts)
time.sleep(5) # Wait 5 seconds before next check
finally:
ssh.close() # Always close SSH connection
if bootstrap_ready: # Bootstrap completed successfully
log_service_operation("SSH", "Bootstrap completion detected", instance_id=instance_id, ip=public_ip, attempts=bootstrap_attempts)
else: # Bootstrap didn't complete in time
log_service_operation("SSH", "Bootstrap marker not detected within wait window", instance_id=instance_id, ip=public_ip, attempts=bootstrap_attempts)
except Exception as e: # Handle any SSH or bootstrap check errors
logger.exception(f"Error while checking bootstrap completion on {instance_id}@{public_ip}: {e}")
# Upload required scripts
try:
local_root = BASE_DIR # Get the project root directory
log_service_operation("SSH", "Uploading optimization scripts", instance_id=instance_id, ip=public_ip)
ec2_manager.upload_file(public_ip, str(local_root / "scripts" / "run_optimization.sh"), # Upload main optimization script
"/home/ubuntu/app/scripts/run_optimization.sh")
log_service_operation("SSH", "Uploaded script", instance_id=instance_id, ip=public_ip, filename="run_optimization.sh")
ec2_manager.upload_file(public_ip, str(local_root / "scripts" / "detect_roots.py"), # Upload root detection script
"/home/ubuntu/app/scripts/detect_roots.py")
log_service_operation("SSH", "Uploaded script", instance_id=instance_id, ip=public_ip, filename="detect_roots.py")
ec2_manager.upload_file(public_ip, str(local_root / "scripts" / "llm_setup_helper.py"), # Upload LLM setup helper
"/home/ubuntu/app/scripts/llm_setup_helper.py")
log_service_operation("SSH", "Uploaded script", instance_id=instance_id, ip=public_ip, filename="llm_setup_helper.py")
# Ensure ownership and execute bits
log_service_operation("SSH", "Setting script permissions and ownership", instance_id=instance_id, ip=public_ip)
ssh = ec2_manager.open_ssh(public_ip) # Open SSH for permission changes
try:
ssh.exec_command( # Set proper ownership and permissions
"sudo chown -R ubuntu:ubuntu /home/ubuntu/app && chmod +x /home/ubuntu/app/scripts/*.sh; sed -i 's/\\r$//' /home/ubuntu/app/scripts/*.sh || true"
) # Fix line endings and set execute permissions
log_service_operation("SSH", "Script permissions set", instance_id=instance_id, ip=public_ip)
finally:
ssh.close() # Always close SSH connection
except Exception as e: # Handle any script upload errors
logger.exception(f"Failed to upload scripts to {instance_id}@{public_ip}: {e}")
# Build environment and wrapper script
log_service_operation("ANALYSIS", "Loading analysis for repo", repo_url=repo_url)
analysis = load_analysis_for_repo(repo_url) or {} # Load repository analysis results
log_service_operation("ANALYSIS", "Analysis loaded", has_data=bool(analysis))
cf = analysis.get("codeflash", {}) if isinstance(analysis, dict) else {} # Extract codeflash configuration
tests_block = analysis.get("tests", {}) if isinstance(analysis, dict) else {} # Extract test configuration
sys_pkgs_list = analysis.get("system_packages") or [] # Get system packages to install
py_pkgs_list = analysis.get("python_packages") or [] # Get Python packages to install
install = analysis.get("install", {}) or {} # Get custom install commands
def _join_cmds(key: str) -> str: # Helper function to join command lists
cmds = install.get(key) or [] # Get commands for specific phase
if not isinstance(cmds, list) or not cmds: # Validate it's a non-empty list
return "" # Return empty string if no commands
return " && ".join(cmds) # Join commands with && for sequential execution
pre_cmds = _join_cmds("pre_install_cmds") # Commands to run before package installation
ins_cmds = _join_cmds("install_cmds") # Commands to run during package installation
post_cmds = _join_cmds("post_install_cmds") # Commands to run after package installation
log_service_operation(
"ANALYSIS",
"Install commands prepared",
has_pre=bool(pre_cmds), # Log whether we have pre-install commands
has_install=bool(ins_cmds), # Log whether we have install commands
has_post=bool(post_cmds), # Log whether we have post-install commands
)
# JSON for formatter cmds
try:
formatter_cmds_json = json.dumps(cf.get("formatter_cmds", [])) # Convert formatter commands to JSON
except Exception: # Handle JSON serialization errors
formatter_cmds_json = "[\"disabled\"]" # Default to disabled if serialization fails
num_formatter_cmds = len(cf.get("formatter_cmds", [])) if isinstance(cf.get("formatter_cmds", []), list) else 0 # Count formatter commands
# JSON for python packages list
pip_specs_count = 0 # Track number of pip packages
try:
def _to_spec(item: Any) -> Optional[str]: # Convert package item to pip spec string
if isinstance(item, str): # If it's already a string
return item.strip() or None # Return trimmed string or None if empty
if isinstance(item, dict): # If it's a dictionary with name/version
name = str(item.get("name") or "").strip() # Extract package name
spec = str(item.get("version_spec") or "").strip() # Extract version specification
if not name: # If no name provided
return None # Skip this package
return name + (spec if spec else "") # Combine name and version spec
return None # Skip unknown item types
pip_specs = [] # List to store pip package specifications
for it in py_pkgs_list: # Process each Python package
spec = _to_spec(it) # Convert to pip spec
if spec: # If conversion succeeded
pip_specs.append(spec) # Add to specifications list
pip_packages_json = json.dumps(pip_specs) # Convert to JSON string
pip_specs_count = len(pip_specs) # Count successful specifications
except Exception: # Handle any errors in package processing
pip_packages_json = "[]" # Default to empty list
pip_specs_count = 0 # Reset count
log_service_operation(
"ANALYSIS",
"Environment summary",
system_packages=len(sys_pkgs_list) if isinstance(sys_pkgs_list, list) else 0, # Count system packages
pip_packages=pip_specs_count, # Count Python packages
formatter_cmds=num_formatter_cmds, # Count formatter commands
has_pytest_cmd=bool(tests_block.get("test_command")), # Check if test command exists
)
# Compose wrapper script
# Export only non-empty secrets; otherwise, wrapper will print a helpful error
cf_key = os.getenv('CODEFLASH_API_KEY') or '' # Get Codeflash API key from environment
gh_token = os.getenv('GITHUB_TOKEN') or '' # Get GitHub token from environment
anthropic_key = os.getenv('ANTHROPIC_API_KEY') or '' # Anthropic API key for LLM setup helper
anthropic_model = os.getenv('ANTHROPIC_MODEL') or 'claude-3-5-haiku-20241022'
env_lines = [ # Start building environment variable exports
(f"export ANTHROPIC_API_KEY='{anthropic_key}'" if anthropic_key else "unset ANTHROPIC_API_KEY"), # Expose Anthropic key if provided
(f"export CODEFLASH_API_KEY='{cf_key}'" if cf_key else "unset CODEFLASH_API_KEY"), # Set or unset Codeflash key
(f"export GITHUB_TOKEN='{gh_token}'" if gh_token else "unset GITHUB_TOKEN"), # Set or unset GitHub token
(f"export GH_TOKEN='{gh_token}'" if gh_token else "unset GH_TOKEN"), # Also set GH_TOKEN alias
f"export ANTHROPIC_MODEL='{anthropic_model}'",
f"export GITHUB_REPO_URL='{repo_url}'", # Set repository URL
f"export MODULE_ROOT='{row.get('module_root', 'auto')}'", # Set module root path
f"export TESTS_ROOT='{row.get('tests_root', 'auto')}'", # Set tests root path
]
# Pass optional single-file optimization targets
if launch_opts: # If launch options are provided
tf = (launch_opts.get('target_file') or '').strip() # Get target file path
fn = (launch_opts.get('target_function') or '').strip() # Get target function name
if tf: # If target file is specified
env_lines.append(f"export CF_TARGET_FILE='{tf}'") # Set target file environment variable
if fn: # If target function is specified
env_lines.append(f"export CF_TARGET_FUNCTION='{fn}'") # Set target function environment variable
if cf.get("module_root"): # If analysis found a module root
env_lines.append(f"export LLM_MODULE_ROOT='{cf['module_root']}'") # Set LLM module root
if cf.get("tests_root"): # If analysis found a tests root
env_lines.append(f"export LLM_TESTS_ROOT='{cf['tests_root']}'") # Set LLM tests root
if tests_block.get("test_command"): # If test command is available
# Properly escape the test command for shell export
test_cmd = tests_block['test_command'].replace("'", "'\"'\"'") # Escape single quotes in test command
env_lines.append(f"export LLM_PYTEST_CMD='{test_cmd}'") # Set test command environment variable
env_lines.append(f"export LLM_FORMATTER_CMDS='{formatter_cmds_json}'") # Set formatter commands JSON
env_lines.append(f"export LLM_PIP_PACKAGES='{pip_packages_json}'") # Set pip packages JSON
system_packages = " ".join(sys_pkgs_list) if sys_pkgs_list else "" # Join system packages into space-separated string
if system_packages: # If we have system packages to install
env_lines.append(f"export SYSTEM_PACKAGES='{system_packages}'") # Set system packages environment variable
if pre_cmds: # If we have pre-install commands
# Properly escape commands for shell export
escaped_pre = pre_cmds.replace("'", "'\"'\"'") # Escape single quotes in pre-install commands
env_lines.append(f"export PRE_INSTALL_CMDS='{escaped_pre}'") # Set pre-install commands environment variable
if ins_cmds: # If we have install commands
escaped_ins = ins_cmds.replace("'", "'\"'\"'") # Escape single quotes in install commands
env_lines.append(f"export INSTALL_CMDS='{escaped_ins}'") # Set install commands environment variable
if post_cmds: # If we have post-install commands
escaped_post = post_cmds.replace("'", "'\"'\"'") # Escape single quotes in post-install commands
env_lines.append(f"export POST_INSTALL_CMDS='{escaped_post}'") # Set post-install commands environment variable
log_service_operation(
"ANALYSIS",
"Environment variables composed",
env_lines=len(env_lines), # Log total number of environment variables
has_system_packages=bool(sys_pkgs_list), # Log whether system packages are configured
has_pre_install=bool(pre_cmds), # Log whether pre-install commands exist
has_install=bool(ins_cmds), # Log whether install commands exist
has_post_install=bool(post_cmds), # Log whether post-install commands exist
)
# Non-secret env vars passthrough
try:
extra_env = (analysis.get("env", {}) or {}).get("non_secret_env_vars", {}) if isinstance(analysis, dict) else {} # Get additional environment variables from analysis
if isinstance(extra_env, dict): # If we have extra environment variables
for k, v in extra_env.items(): # Process each additional environment variable
try:
key = str(k) # Convert key to string
if not re.match(r"^[A-Z_][A-Z0-9_]*$", key): # Validate environment variable name format
continue # Skip invalid environment variable names
val = str(v) # Convert value to string
# Escape single quotes in value
val = val.replace("'", "'\\''") # Escape single quotes in environment variable value
env_lines.append(f"export {key}='{val}'") # Add environment variable export
except Exception: # Handle any errors in environment variable processing
continue # Skip problematic environment variables
except Exception: # Handle any errors in extra environment processing
pass # Continue if extra environment processing fails
wrapper = "\n".join([ # Create wrapper script by joining lines
"#!/bin/bash", # Shebang for bash script
"set -euo pipefail", # Exit on error, undefined vars, pipe failures
"LOG_DIR=/home/ubuntu/app/logs", # Set log directory path
"mkdir -p \"$LOG_DIR\"", # Create log directory if it doesn't exist
"TS=$(date -Is | sed 's/[:+]/-/g')", # Timestamp for log rotation (safe filename)
"LOG_FILE=$LOG_DIR/optimization-$TS.log", # Timestamped log file
"PID_FILE=$LOG_DIR/optimization.pid", # PID file for running job
"EXIT_FILE=$LOG_DIR/job.exitcode", # Exit code marker
"STAGE_FILE=$LOG_DIR/stage.jsonl", # Stage tracking file (JSONL)
"# Clean previous markers to ensure fresh run",
"rm -f \"$PID_FILE\" \"$EXIT_FILE\" || true",
"touch \"$LOG_FILE\" && chmod 666 \"$LOG_FILE\"", # Create log file with write permissions
"touch \"$STAGE_FILE\" && chmod 666 \"$STAGE_FILE\" || true",
"echo '=== Job start' $(date -Is) | tee -a \"$LOG_FILE\"", # Log job start timestamp
"echo USER=$(whoami) | tee -a \"$LOG_FILE\"", # Log current user
"echo HOME=$HOME | tee -a \"$LOG_FILE\"", # Log home directory
"echo PWD=$(pwd) | tee -a \"$LOG_FILE\"", # Log current working directory
"env | sort | sed -n '1,40p' | sed 's/.*/ENV: &/' | tee -a \"$LOG_FILE\"", # Log first 40 environment variables
"export WORK_DIR=/home/ubuntu/work", # Set working directory for optimization
"# Export markers for child processes (runner) to be able to update status",
"export LOG_FILE PID_FILE EXIT_FILE STAGE_FILE WORK_DIR",
"# Stage: wrapper_started",
"echo \"{\\\"ts\\\":\\\"$(date -Is)\\\",\\\"stage\\\":\\\"wrapper_started\\\"}\" >> \"$STAGE_FILE\" || true",
# Ensure a basic Python is available via 'python' fallback
"if ! command -v python >/dev/null 2>&1 && command -v python3 >/dev/null 2>&1; then ln -sf $(command -v python3) /home/ubuntu/python || true; export PATH=/home/ubuntu:$PATH; fi", # Create python symlink if needed
*env_lines, # Insert all environment variable exports
"if [ -n \"${SYSTEM_PACKAGES:-}\" ]; then apt-get update && DEBIAN_FRONTEND=noninteractive apt-get install -y ${SYSTEM_PACKAGES} || true; fi", # Install system packages if specified
# Choose mode: single file or entire codebase
"if [ -n \"${CF_TARGET_FILE:-}\" ]; then export CF_MODE=single; else export CF_MODE=all; fi", # Set optimization mode based on target file
# Normalize line endings just in case the script was uploaded with CRLF
"sed -i 's/\\r$//' /home/ubuntu/app/scripts/run_optimization.sh || true", # Fix line endings in optimization script
"# Start the optimization script and record PID",
"( bash /home/ubuntu/app/scripts/run_optimization.sh >>\"$LOG_FILE\" 2>&1 ) & echo $! > \"$PID_FILE\"",
"# Wait for process and persist exit code",
"PID=$(cat \"$PID_FILE\" 2>/dev/null || echo 0)",
"if [ \"$PID\" -gt 0 ] && kill -0 \"$PID\" 2>/dev/null; then",
" wait \"$PID\"; RC=$?; echo $RC > \"$EXIT_FILE\";",
"else",
" echo 1 > \"$EXIT_FILE\"; RC=1;",
"fi",
"# Stage: wrapper_finished",
"echo \"{\\\"ts\\\":\\\"$(date -Is)\\\",\\\"stage\\\":\\\"wrapper_finished\\\",\\\"rc\\\":$RC}\" >> \"$STAGE_FILE\" || true",
"echo '=== Job end' $(date -Is) | tee -a \"$LOG_FILE\"", # Log job end timestamp
])
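# The wrapper leaves well-known marker files under /home/ubuntu/app/logs/ that the monitoring
# endpoints below read back (timestamp shown is illustrative):
#   optimization-<TS>.log   full run output, e.g. optimization-2025-01-01T12-30-00-00-00.log
#   optimization.pid        PID of the background optimization process
#   job.exitcode            final exit code, written when the run finishes
#   stage.jsonl             JSONL stage events, e.g. {"ts":"2025-01-01T12:30:00+00:00","stage":"wrapper_started"}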
log_service_operation("SSH", "Uploading wrapper script", instance_id=instance_id, ip=public_ip, remote_path="/home/ubuntu/app/run_job.sh")
try:
ec2_manager.upload_text(public_ip, wrapper, "/home/ubuntu/app/run_job.sh") # Upload the wrapper script to instance
# Verify files exist before starting
run_path = "/home/ubuntu/app/run_job.sh" # Path to wrapper script on remote instance
script_path = "/home/ubuntu/app/scripts/run_optimization.sh" # Path to optimization script on remote instance
has_runner = ec2_manager.file_exists(public_ip, run_path) # Check if wrapper script exists
has_script = ec2_manager.file_exists(public_ip, script_path) # Check if optimization script exists
log_service_operation("AWS_EC2", "Remote files present", run_job=has_runner, opt_script=has_script) # Log file existence status
# Run in background; fully detach so it survives SSH session
log_service_operation("SSH", "Starting background job", instance_id=instance_id, ip=public_ip, run_path=run_path)
ec2_manager.exec( # Execute the wrapper script in background
public_ip,
"bash -lc 'chmod +x /home/ubuntu/app/run_job.sh && setsid nohup /home/ubuntu/app/run_job.sh >> /home/ubuntu/app/logs/launcher.log 2>&1 & disown'", # Make executable and run in background
get_pty=False, # Don't allocate pseudo-terminal
)
# Start watcher for cleanup
log_service_operation("WATCHER", "Starting completion watcher", instance_id=instance_id, ip=public_ip) # Log watcher start
_start_completion_watcher(instance_id, public_ip, repo_url) # Start background thread to monitor job completion
except Exception as e: # Handle any errors in job execution setup
logger.exception(f"Failed to start remote job on {instance_id}@{public_ip}: {e}") # Log the error
log_service_operation("JOB", "Launch orchestration finished", instance_id=instance_id, repo_url=repo_url) # Log completion of launch process
return instance_id # Return the instance ID for tracking
# =============================================================================
# JOB EXECUTION API
# =============================================================================
@app.post("/api/run")
def run_single() -> Any:
"""
POST /api/run - Runs optimization for a single repository
This endpoint:
1. Validates the repository exists in the configuration
2. Optionally validates target_file for single-file optimization
3. Launches an EC2 instance and starts the optimization job
4. Updates the job index to track the new job
5. Returns the job ID for monitoring progress
Body: {repo_url, target_file?, target_function?}
Returns: JSON with job_id
"""
start_time = time.time()
payload = request.get_json(force=True)
repo_url = (payload.get("repo_url") or "").strip()
target_file = (payload.get("target_file") or "").strip()
target_function = (payload.get("target_function") or "").strip()
log_request_start("POST /api/run", repo_url=repo_url)
try:
log_service_operation("CSV", "Finding repository configuration")
rows = _read_csv()
idx = _find_row(rows, repo_url)
if idx is None:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("POST /api/run", "repo not found", duration_ms, repo_url=repo_url)
return jsonify({"error": "repo not found"}), 404
# Minimal validation for single-file mode
if target_file:
if target_file.startswith(("/", "..")) or not target_file.endswith(".py"):
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("POST /api/run", "invalid target_file", duration_ms, repo_url=repo_url, target_file=target_file)
return jsonify({"error": "target_file must be a relative .py path under repo"}), 400
job_id = _launch_ec2_job(rows[idx], {
"target_file": target_file,
"target_function": target_function,
})
log_service_operation("JOBS", "Updating job index")
jobs_index = _load_jobs_index()
jobs_index[_canon_repo_url(repo_url)] = job_id
_save_jobs_index(jobs_index)
duration_ms = int((time.time() - start_time) * 1000)
log_request_success("POST /api/run", duration_ms, repo_url=repo_url, job_id=job_id)
return jsonify({"ok": True, "job_id": job_id})
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("POST /api/run", str(e), duration_ms, repo_url=repo_url)
raise
@app.post("/api/run_all")
def run_all() -> Any:
"""
POST /api/run_all - Runs optimization for all repositories
This endpoint:
1. Reads all configured repositories from CSV
2. Launches optimization jobs for each repository
3. Handles errors gracefully for individual repositories
4. Updates the job index with all successful launches
5. Returns a mapping of repositories to job IDs or error messages
This is useful for bulk optimization of multiple repositories at once.
Returns: JSON with results mapping repo URLs to job IDs or errors
"""
start_time = time.time()
log_request_start("POST /api/run_all")
try:
log_service_operation("CSV", "Reading all repository configurations")
rows = _read_csv()
jobs_index = _load_jobs_index()
results: Dict[str, str] = {}
successful_jobs = 0
failed_jobs = 0
for row in rows:
repo_url = row["repo_url"]
try:
job_id = _launch_ec2_job(row)
jobs_index[_canon_repo_url(repo_url)] = job_id
results[repo_url] = job_id
successful_jobs += 1
except Exception as e:
results[repo_url] = f"ERROR: {e}"
failed_jobs += 1
logger.error(f"❌ Failed to submit job for {repo_url}: {e}")
log_service_operation("JOBS", "Updating job index after bulk run")
_save_jobs_index(jobs_index)
duration_ms = int((time.time() - start_time) * 1000)
log_request_success("POST /api/run_all", duration_ms,
total_repos=len(rows), successful_jobs=successful_jobs,
failed_jobs=failed_jobs)
return jsonify({"results": results})
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("POST /api/run_all", str(e), duration_ms)
raise
# =============================================================================
# JOB MONITORING API
# =============================================================================
def _resolve_job_id(args) -> Optional[str]:
"""
Resolves job ID from request arguments
This utility function handles flexible job ID resolution:
- If job_id is provided directly, use it
- If repo_url is provided, look up the job ID from the job index
- Returns None if neither is provided or job not found
Args:
args: Request arguments (job_id or repo_url)
Returns: Job ID or None if not found
"""
job_id = args.get("job_id")
if job_id:
logger.debug(f"🔍 Using direct job_id: {job_id}")
return job_id
repo_url = args.get("repo_url")
if not repo_url:
return None
jobs_index = _load_jobs_index()
resolved_job_id = jobs_index.get(_canon_repo_url(repo_url))
logger.debug(f"🔍 Resolved job_id for {repo_url}: {resolved_job_id}")
return resolved_job_id
@app.get("/api/job_status")
def job_status() -> Any:
"""
GET /api/job_status - Gets job status for EC2-backed job
This endpoint provides comprehensive job status information:
1. Resolves the job ID from request parameters
2. Checks the EC2 instance state (pending, running, stopped, etc.)
3. Determines if the optimization job has completed
4. Reads the exit code if available
5. Returns a unified status (running, succeeded, failed)
Query params: job_id or repo_url
Returns: JSON with job status information
"""
start_time = time.time()
job_id = _resolve_job_id(request.args)
repo_url = request.args.get("repo_url")
log_request_start("GET /api/job_status", job_id=job_id, repo_url=repo_url)
try:
if not job_id:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("GET /api/job_status", "job_id or repo_url is required", duration_ms)
return jsonify({"error": "job_id or repo_url is required"}), 400
# job_id is EC2 instance id
state = _describe_instance_state(job_id) or "unknown"
public_ip = ec2_manager.get_public_ip(job_id) or None
# If instance has SSH and job exit code file, consider finished
finished = False
exit_code: Optional[int] = None
if public_ip:
try:
if ec2_manager.file_exists(public_ip, "/home/ubuntu/app/logs/job.exitcode"):
finished = True
# Try to read exit code
log_service_operation("SSH", "Reading job exit code", job_id=job_id, ip=public_ip)
ssh = ec2_manager.open_ssh(public_ip)
try:
_, stdout, _ = ssh.exec_command("cat /home/ubuntu/app/logs/job.exitcode")
try:
exit_code = int((stdout.read() or b"0").decode().strip() or "0")
except Exception:
exit_code = 0
finally:
ssh.close()
except Exception:
pass
status = "running"
if state in {"pending", "running"}:
status = "running"
elif finished:
status = "succeeded" if (exit_code is not None and exit_code == 0) else "failed"
elif state in {"stopped", "stopping", "shutting-down", "terminated"}:
status = "failed"
simple = {
"jobId": job_id,
"jobName": repo_url or job_id,
"status": status,
"state": state,
"publicIp": public_ip,
"exitCode": exit_code,
}
duration_ms = int((time.time() - start_time) * 1000)
log_request_success("GET /api/job_status", duration_ms, job_id=job_id, status=status)
return jsonify({"job": simple})
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("GET /api/job_status", str(e), duration_ms, job_id=job_id)
raise
@app.get("/api/job_logs")
def job_logs() -> Any:
"""
GET /api/job_logs - Gets job logs from the EC2 instance
This endpoint provides real-time access to optimization logs:
1. Resolves the job ID and gets the instance public IP
2. Fetches the most recent log entries from the optimization run
3. Provides fallback to other log sources if main log is empty
4. Returns log lines for display in the web UI
The logs show the complete optimization process including:
- Repository cloning and setup
- Dependency installation
- Codeflash optimization execution
- Results and any errors encountered
Query params: job_id or repo_url
Returns: JSON with log events (lines) from remote log file
"""
start_time = time.time()
job_id = _resolve_job_id(request.args)
repo_url = request.args.get("repo_url")
log_request_start("GET /api/job_logs", job_id=job_id, repo_url=repo_url)
try:
if not job_id:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("GET /api/job_logs", "job_id or repo_url is required", duration_ms)
return jsonify({"error": "job_id or repo_url is required"}), 400
public_ip = ec2_manager.get_public_ip(job_id)
if not public_ip:
duration_ms = int((time.time() - start_time) * 1000)
log_request_success("GET /api/job_logs", duration_ms, job_id=job_id, note="public ip not available yet")
return jsonify({"events": [], "note": "instance public ip not available yet"})
# Resolve latest optimization log by timestamp if available; fallback to legacy symlink
def _latest_log_path(ip: str) -> str:
try:
ssh = ec2_manager.open_ssh(ip)
try:
# List timestamped logs and pick the newest lexicographically
cmd = "bash -lc 'ls -1 /home/ubuntu/app/logs/optimization-*.log 2>/dev/null | sort -r | head -n1'"
_, stdout, _ = ssh.exec_command(cmd)
candidate = (stdout.read() or b"").decode().strip()
if candidate:
return candidate
finally:
ssh.close()
except Exception:
pass
return "/home/ubuntu/app/logs/optimization.log"
main_log_path = _latest_log_path(public_ip)
log_service_operation("SSH", "Opening SSH to tail optimization log", job_id=job_id, ip=public_ip, path=main_log_path)
events = ec2_manager.read_file_tail(public_ip, main_log_path, lines=1000)
# Fallbacks
fallback = []
if not events:
# Try launcher log (contains early wrapper prints)
log_service_operation("SSH", "optimization log empty, fetching launcher.log", job_id=job_id, ip=public_ip)
fallback = ec2_manager.read_file_tail(public_ip, "/home/ubuntu/app/logs/launcher.log", lines=200)
if not events and not fallback:
# Finally try cloud-init user data
log_service_operation("SSH", "launcher.log empty, falling back to user-data.log", job_id=job_id, ip=public_ip)
fallback = ec2_manager.read_file_tail(public_ip, "/var/log/user-data.log", lines=200)
duration_ms = int((time.time() - start_time) * 1000)
log_request_success("GET /api/job_logs", duration_ms, job_id=job_id, event_count=len(events), fallback_count=len(fallback or []))
# Try to fetch current stage info
stage_lines: List[str] = []
try:
stage_lines = ec2_manager.read_file_tail(public_ip, "/home/ubuntu/app/logs/stage.jsonl", lines=50)
except Exception:
stage_lines = []
current_stage = None
if stage_lines:
try:
import json as _json
for line in reversed(stage_lines):
line = (line or "").strip()
if not line:
continue
try:
obj = _json.loads(line)
if isinstance(obj, dict) and obj.get("stage"):
current_stage = obj
break
except Exception:
continue
except Exception:
current_stage = None
resp = {"events": events}
if current_stage is not None:
resp["stage"] = current_stage
if fallback:
resp["bootstrap"] = fallback
return jsonify(resp)
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("GET /api/job_logs", str(e), duration_ms, job_id=job_id)
return jsonify({"error": str(e)}), 500
@app.get("/api/job_logs/download")
def job_logs_download() -> Any:
"""
GET /api/job_logs/download - Downloads the full optimization log from the EC2 instance
This endpoint provides access to complete log files for analysis:
1. Resolves the job ID and gets the instance public IP
2. Streams the complete log file content to the client
3. Handles large log files efficiently with chunked streaming
4. Provides fallback to other log sources if main log is missing
5. Sets appropriate headers for file download
This is useful for:
- Downloading complete logs for offline analysis
- Debugging optimization issues
- Archiving optimization results
Query params: job_id or repo_url
Returns: text/plain content of the log file
"""
start_time = time.time()
job_id = _resolve_job_id(request.args)
repo_url = request.args.get("repo_url")
log_request_start("GET /api/job_logs/download", job_id=job_id, repo_url=repo_url)
try:
if not job_id:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("GET /api/job_logs/download", "job_id or repo_url is required", duration_ms)
return jsonify({"error": "job_id or repo_url is required"}), 400
public_ip = ec2_manager.get_public_ip(job_id)
if not public_ip:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("GET /api/job_logs/download", "instance public ip not available yet", duration_ms)
return jsonify({"error": "instance public ip not available yet"}), 400
# Determine latest optimization log first; then fallback to launcher and user-data
latest_cmd = "bash -lc 'ls -1 /home/ubuntu/app/logs/optimization-*.log 2>/dev/null | sort -r | head -n1'"
used_path = None
try:
ssh_tmp = ec2_manager.open_ssh(public_ip)
try:
_, stdout, _ = ssh_tmp.exec_command(latest_cmd)
candidate = (stdout.read() or b"").decode().strip()
if candidate:
used_path = candidate
finally:
ssh_tmp.close()
except Exception:
used_path = None
if not used_path:
# Prefer legacy symlink, then launcher, then user-data
paths = [
"/home/ubuntu/app/logs/optimization.log",
"/home/ubuntu/app/logs/launcher.log",
"/var/log/user-data.log",
]
try:
for p in paths:
if ec2_manager.file_exists(public_ip, p):
used_path = p
break
except Exception:
used_path = None
if not used_path:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("GET /api/job_logs/download", "no logs available", duration_ms, job_id=job_id)
return jsonify({"error": "no logs available"}), 404
# Obtain file size for Content-Length so the frontend can show download progress
total_size = 0
try:
ssh_stat = ec2_manager.open_ssh(public_ip)
try:
sftp_stat = ssh_stat.open_sftp()
try:
total_size = sftp_stat.stat(used_path).st_size or 0
finally:
sftp_stat.close()
finally:
ssh_stat.close()
except Exception:
total_size = 0
def generate():
# Stream via SFTP in chunks
log_service_operation("SSH", "Streaming log file download", job_id=job_id, ip=public_ip, path=used_path)
ssh = ec2_manager.open_ssh(public_ip)
try:
sftp = ssh.open_sftp()
try:
with sftp.file(used_path, "rb") as f:
while True:
chunk = f.read(65536)
if not chunk:
break
yield chunk
finally:
sftp.close()
finally:
ssh.close()
# Build friendly filename: <org-repo>_<YYYY-MM-DD_HH-MM>.log
slug = None
try:
parts = (repo_url or "").rstrip("/").split("/")
if len(parts) >= 2:
org = parts[-2]
name = parts[-1].replace(".git", "")
slug = f"{org}-{name}"
except Exception:
pass
# Get timestamp from filename if present
stamp = ""
base = os.path.basename(used_path)
# base may look like optimization-2024-09-20T18-22-33.log or optimization.log
m = re.match(r"optimization-([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}-[0-9]{2})", base)
if m:
ts = m.group(1).replace("T", "_")
stamp = f"_{ts}"
filename = f"{slug or job_id}{stamp}.log"
duration_ms = int((time.time() - start_time) * 1000)
log_request_success("GET /api/job_logs/download", duration_ms, job_id=job_id, path=used_path)
resp = Response(stream_with_context(generate()), mimetype="text/plain")
# Force download with naming convention
resp.headers["Content-Disposition"] = f"attachment; filename={filename}"
if total_size:
resp.headers["Content-Length"] = str(total_size)
return resp
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("GET /api/job_logs/download", str(e), duration_ms, job_id=job_id)
return jsonify({"error": str(e)}), 500
@app.get("/api/job_optimizations")
def job_optimizations() -> Any:
"""
GET /api/job_optimizations - Extract Codeflash optimization review links from logs
This endpoint extracts optimization review links from job logs:
1. Resolves the job ID and gets the instance public IP
2. Searches the optimization log for Codeflash review URLs
3. Extracts and deduplicates the review links
4. Returns the count and list of review URLs
These links provide direct access to:
- Codeflash optimization reviews
- Detailed optimization suggestions
- Before/after code comparisons
- Performance improvement recommendations
Query params: job_id or repo_url
Returns: { count, links }
"""
start_time = time.time()
job_id = _resolve_job_id(request.args)
repo_url = request.args.get("repo_url")
log_request_start("GET /api/job_optimizations", job_id=job_id, repo_url=repo_url)
try:
if not job_id:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("GET /api/job_optimizations", "job_id or repo_url is required", duration_ms)
return jsonify({"error": "job_id or repo_url is required"}), 400
public_ip = ec2_manager.get_public_ip(job_id)
if not public_ip:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("GET /api/job_optimizations", "instance public ip not available yet", duration_ms)
return jsonify({"error": "instance public ip not available yet"}), 400
# Resolve latest log file path
log_path = None
try:
ssh = ec2_manager.open_ssh(public_ip)
try:
cmd = "bash -lc 'ls -1 /home/ubuntu/app/logs/optimization-*.log 2>/dev/null | sort -r | head -n1'"
_, stdout, _ = ssh.exec_command(cmd)
candidate = (stdout.read() or b"").decode().strip()
if candidate:
log_path = candidate
finally:
ssh.close()
except Exception:
log_path = None
if not log_path:
# fallback to legacy symlink
log_path = "/home/ubuntu/app/logs/optimization.log"
if not ec2_manager.file_exists(public_ip, log_path):
duration_ms = int((time.time() - start_time) * 1000)
log_request_success("GET /api/job_optimizations", duration_ms, job_id=job_id, links=0)
return jsonify({"count": 0, "links": []})
grep_cmd = (
"bash -lc \"grep -ao 'https://app.codeflash.ai/review-optimizations/[A-Za-z0-9-]*' "
f"{log_path} | sort -u\""
)
code, out, _ = ec2_manager.exec_capture(public_ip, grep_cmd, get_pty=False)
found = [line.strip() for line in out.splitlines() if line.strip()]
# Deduplicate while preserving order
seen = set()
links: list[str] = []
for url in found:
if url not in seen:
seen.add(url)
links.append(url)
duration_ms = int((time.time() - start_time) * 1000)
log_request_success("GET /api/job_optimizations", duration_ms, job_id=job_id, links=len(links))
return jsonify({"count": len(links), "links": links})
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("GET /api/job_optimizations", str(e), duration_ms, job_id=job_id)
return jsonify({"error": str(e)}), 500
@app.post("/api/terminate")
def terminate_job() -> Any:
"""
POST /api/terminate - Terminates the EC2 instance for a job
This endpoint provides manual control over EC2 instances:
1. Resolves the job ID from request parameters
2. Terminates the EC2 instance to stop the optimization job
3. Cleans up the job index to remove the mapping
4. Prevents further costs from running instances
This is useful for:
- Stopping long-running optimization jobs
- Cleaning up failed or stuck instances
- Managing costs by terminating unused instances
Body: {repo_url?, job_id?}
"""
start_time = time.time()
payload = request.get_json(force=True) or {}
repo_url = (payload.get("repo_url") or "").strip()
job_id = (payload.get("job_id") or "").strip()
log_request_start("POST /api/terminate", repo_url=repo_url, job_id=job_id)
try:
# Resolve job_id from mapping if not provided
if not job_id and repo_url:
jobs_index = _load_jobs_index()
job_id = jobs_index.get(_canon_repo_url(repo_url)) or ""
if not job_id:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("POST /api/terminate", "job_id or repo_url is required", duration_ms)
return jsonify({"error": "job_id or repo_url is required"}), 400
# Terminate the instance
ec2_manager.terminate(job_id)
# Cleanup jobs index mapping
jobs_index = _load_jobs_index()
changed = False
if repo_url:
key = _canon_repo_url(repo_url)
if key in jobs_index and jobs_index[key] == job_id:
jobs_index.pop(key, None)
changed = True
else:
# Remove any mapping pointing to this job_id
to_del = [k for k, v in jobs_index.items() if v == job_id]
for k in to_del:
jobs_index.pop(k, None)
changed = True
if changed:
_save_jobs_index(jobs_index)
duration_ms = int((time.time() - start_time) * 1000)
log_request_success("POST /api/terminate", duration_ms, job_id=job_id)
return jsonify({"ok": True})
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("POST /api/terminate", str(e), duration_ms, repo_url=repo_url, job_id=job_id)
return jsonify({"error": str(e)}), 500
@app.post("/api/restart")
def restart_job() -> Any:
"""
POST /api/restart - Stops the current optimization process on the instance and starts a new one.
Body: {repo_url}
Behavior:
- Resolves instance by repo_url mapping
- SSH in, attempts to kill running optimization via PID file if present
- Clears exitcode marker, re-runs the wrapper in background (nohup)
- Returns ok and job_id
"""
start_time = time.time()
payload = request.get_json(force=True) or {}
repo_url = (payload.get("repo_url") or "").strip()
log_request_start("POST /api/restart", repo_url=repo_url)
try:
if not repo_url:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("POST /api/restart", "repo_url is required", duration_ms)
return jsonify({"error": "repo_url is required"}), 400
jobs_index = _load_jobs_index()
job_id = jobs_index.get(_canon_repo_url(repo_url))
if not job_id:
# If no running instance, just run a new one
rows = _read_csv()
idx = _find_row(rows, repo_url)
if idx is None:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("POST /api/restart", "repo not found", duration_ms, repo_url=repo_url)
return jsonify({"error": "repo not found"}), 404
new_job = _launch_ec2_job(rows[idx])
duration_ms = int((time.time() - start_time) * 1000)
log_request_success("POST /api/restart", duration_ms, repo_url=repo_url, job_id=new_job)
return jsonify({"ok": True, "job_id": new_job, "action": "started_new"})
# We have an instance; attempt to stop current optimization and start a new one
# Resolve IP
public_ip = ec2_manager.get_public_ip(job_id)
if not public_ip:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("POST /api/restart", "public IP not available", duration_ms, job_id=job_id)
return jsonify({"error": "instance public IP not available yet"}), 409
# Kill existing process if PID file exists
kill_cmds = [
"bash -lc 'set -e; PID_FILE=/home/ubuntu/app/logs/optimization.pid; EXIT_FILE=/home/ubuntu/app/logs/job.exitcode; "
"if [ -f \"$PID_FILE\" ]; then PID=$(cat \"$PID_FILE\" 2>/dev/null || echo 0); "
"if [ \"$PID\" -gt 0 ] && kill -0 \"$PID\" 2>/dev/null; then kill -TERM \"$PID\" || true; sleep 2; kill -KILL \"$PID\" || true; fi; fi; "
"rm -f \"$EXIT_FILE\" || true'"
]
for cmd in kill_cmds:
ec2_manager.exec(public_ip, cmd)
# Re-run the wrapper script in background
start_cmd = (
"bash -lc 'chmod +x /home/ubuntu/app/run_job.sh && "
"setsid nohup /home/ubuntu/app/run_job.sh >> /home/ubuntu/app/logs/launcher.log 2>&1 & disown'"
)
ec2_manager.exec(public_ip, start_cmd)
duration_ms = int((time.time() - start_time) * 1000)
log_request_success("POST /api/restart", duration_ms, repo_url=repo_url, job_id=job_id)
return jsonify({"ok": True, "job_id": job_id, "action": "restarted"})
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("POST /api/restart", str(e), duration_ms, repo_url=repo_url)
return jsonify({"error": str(e)}), 500
@app.get("/api/job_logs/download_all")
def job_logs_download_all() -> Any:
"""
GET /api/job_logs/download_all - Download all related logs (optimization, launcher, LLM) as a zip
Query params: job_id or repo_url
Returns: application/zip with files named using <org-repo>_<YYYY-MM-DD_HH-MM> convention
"""
start_time = time.time()
job_id = _resolve_job_id(request.args)
repo_url = request.args.get("repo_url")
log_request_start("GET /api/job_logs/download_all", job_id=job_id, repo_url=repo_url)
try:
if not job_id:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("GET /api/job_logs/download_all", "job_id or repo_url is required", duration_ms)
return jsonify({"error": "job_id or repo_url is required"}), 400
public_ip = ec2_manager.get_public_ip(job_id)
if not public_ip:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("GET /api/job_logs/download_all", "instance public ip not available yet", duration_ms)
return jsonify({"error": "instance public ip not available yet"}), 400
# Determine naming prefix
slug = None
try:
parts = (repo_url or "").rstrip("/").split("/")
if len(parts) >= 2:
org = parts[-2]
name = parts[-1].replace(".git", "")
slug = f"{org}-{name}"
except Exception:
pass
if not slug:
slug = job_id
# Determine timestamp (prefer latest optimization log timestamp)
ts = ""
try:
ssh = ec2_manager.open_ssh(public_ip)
try:
cmd = "bash -lc 'basename $(ls -1 /home/ubuntu/app/logs/optimization-*.log 2>/dev/null | sort -r | head -n1)'"
_, stdout, _ = ssh.exec_command(cmd)
base = (stdout.read() or b"").decode().strip()
m = re.match(r"optimization-([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}-[0-9]{2})", base)
if m:
ts = m.group(1).replace("T", "_")
finally:
ssh.close()
except Exception:
ts = ""
prefix = f"{slug}_{ts}" if ts else slug
# Stream a zip archive containing all available logs
def generate_zip():
import io, zipfile, os
# Use a temporary directory to store logs before zipping
with tempfile.TemporaryDirectory() as temp_dir:
# 1. Download all logs using our rsync helper
success = _rsync_logs_from_instance(public_ip, temp_dir)
if not success:
# If rsync fails, we can still yield an empty zip or an error file
logger.warning(f"⚠️ [ZIP] Rsync failed for {public_ip}, creating zip with error message.")
mem = io.BytesIO()
with zipfile.ZipFile(mem, mode="w") as zf:
zf.writestr("rsync_error.txt", f"Failed to download logs from instance {job_id} at {public_ip}.")
mem.seek(0)
yield from mem
return
# 2. Zip the contents of the temporary directory
mem = io.BytesIO()
with zipfile.ZipFile(mem, mode="w", compression=zipfile.ZIP_DEFLATED) as zf:
for root, _, files in os.walk(temp_dir):
for file in files:
full_path = os.path.join(root, file)
# Only include files that actually exist
if os.path.exists(full_path):
# Create a relative name for the file inside the zip archive
arcname = f"{prefix}/{file}"
zf.write(full_path, arcname=arcname)
# 3. Stream the zip file
mem.seek(0)
yield from mem
duration_ms = int((time.time() - start_time) * 1000)
log_request_success("GET /api/job_logs/download_all", duration_ms, job_id=job_id)
resp = Response(stream_with_context(generate_zip()), mimetype="application/zip")
resp.headers["Content-Disposition"] = f"attachment; filename={prefix}.zip"
return resp
except Exception as e:
duration_ms = int((time.time() - start_time) * 1000)
log_request_error("GET /api/job_logs/download_all", str(e), duration_ms, job_id=job_id)
return jsonify({"error": str(e)}), 500
# =============================================================================
# SERVER STARTUP
# =============================================================================
if __name__ == "__main__":
"""
Start the Flask development server
This is the entry point for running the application in development mode.
The server provides:
- Web UI for managing repositories and jobs
- REST API for all optimization operations
- Real-time job monitoring and log access
- EC2 instance lifecycle management
Environment variables:
- PORT: Server port (default: 5000)
- AWS_REGION: AWS region for EC2 instances
- AWS_KEY_NAME: EC2 key pair name
- AWS_SECURITY_GROUP: Security group for instances
- AWS_INSTANCE_TYPE: EC2 instance type
- AWS_AMI_ID: AMI ID for instances
- SSH_KEY_PATH: Path to SSH private key
- CODEFLASH_API_KEY: Codeflash API key
- GITHUB_TOKEN: GitHub token for repository access
"""
port = int(os.getenv("PORT", "5000"))
logger.info(f"🚀 Starting Flask development server on port {port}")
app.run(host="0.0.0.0", port=port, debug=True)