codeflash-agent/evals/score.py

#!/usr/bin/env python3
"""LLM-graded eval scorer.
Feeds the manifest rubric and full conversation to Claude, which scores
each criterion. Fully automated, no human input needed.
Usage:
python3 score.py <results-dir> # score a single run
python3 score.py aggregate <parent-dir> # aggregate multiple runs
"""
import json
import math
import re
import subprocess
import sys
from pathlib import Path

CLAUDE_DIR = Path.home() / ".claude"


# --- Session reading ---
def _read_single_jsonl(jsonl: Path) -> list[str]:
    """Read a single JSONL file and return formatted text lines."""
    texts = []
    with open(jsonl) as f:
        for line in f:
            try:
                msg = json.loads(line)
            except json.JSONDecodeError:
                continue
            message = msg.get("message", {})
            role = message.get("role", msg.get("type", ""))
            content = message.get("content", [])
            parts = []
            if isinstance(content, list):
                for block in content:
                    if not isinstance(block, dict):
                        continue
                    if block.get("type") == "text":
                        parts.append(block["text"])
                    elif block.get("type") == "tool_use":
                        name = block.get("name", "")
                        inp = block.get("input", {})
                        cmd = inp.get("command", "") if isinstance(inp, dict) else ""
                        if cmd:
                            parts.append(f"[{name}] {cmd}")
                        elif name == "Write" and isinstance(inp, dict):
                            # Include full file content for Write calls so
                            # deterministic checks can see profiling scripts.
                            # Use a distinct name to avoid shadowing the
                            # message-level `content` variable above.
                            file_content = inp.get("content", "")
                            path = inp.get("file_path", "")
                            parts.append(f"[{name}] {path}\n{file_content[:2000]}")
                        else:
                            parts.append(f"[{name}] {json.dumps(inp)[:500]}")
                    elif block.get("type") == "tool_result":
                        inner = block.get("content", "")
                        if isinstance(inner, str):
                            parts.append(f"[result] {inner[:2000]}")
                        elif isinstance(inner, list):
                            for item in inner:
                                if isinstance(item, dict) and item.get("type") == "text":
                                    parts.append(f"[result] {item['text'][:2000]}")
            elif isinstance(content, str) and content:
                parts.append(content)
            if parts:
                texts.append(f"[{role}] " + "\n".join(parts))
    return texts
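
# Illustrative example (hypothetical, not from a real session) of the formatted
# lines this produces, roughly one "[role]" block per message:
#
#   [assistant] Let me profile the allocation hot path first.
#   [Bash] python3 -m memray run -o /tmp/out.bin bench.py
#   [result] Results written to /tmp/out.bin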


def read_session_text(session_id: str) -> str:
    """Read the full conversation from a session JSONL file, including subagents.

    Claude Code stores subagent sessions at:
        <session_id>/subagents/agent-<agentId>.jsonl

    This function reads the parent session and all subagent sessions,
    concatenating them so deterministic scoring checks can see the full
    agent chain (skill → router → domain agent).
    """
    for jsonl in CLAUDE_DIR.glob(f"projects/*/{session_id}.jsonl"):
        # Read parent session
        texts = _read_single_jsonl(jsonl)
        # Read all subagent sessions (router, domain agents, researchers)
        subagent_dir = jsonl.parent / session_id / "subagents"
        if subagent_dir.is_dir():
            for sub_jsonl in sorted(subagent_dir.glob("agent-*.jsonl")):
                sub_texts = _read_single_jsonl(sub_jsonl)
                if sub_texts:
                    texts.append(f"\n[subagent: {sub_jsonl.stem}]")
                    texts.extend(sub_texts)
        return "\n\n".join(texts)
    return ""


def get_session_id(result_json_path: Path) -> str | None:
    """Extract session_id from the claude output JSON."""
    if not result_json_path.exists():
        return None
    text = result_json_path.read_text().strip()
    for line in text.split("\n"):
        try:
            data = json.loads(line)
            if isinstance(data, dict) and "session_id" in data:
                return data["session_id"]
            if isinstance(data, list) and data:
                for item in data:
                    if isinstance(item, dict) and "session_id" in item:
                        return item["session_id"]
        except json.JSONDecodeError:
            continue
    return None


def extract_peak_memory(test_output_path: Path) -> float | None:
    """Extract peak memory from test output [PEAK_MEMORY_MB:X] marker."""
    if not test_output_path.exists():
        return None
    text = test_output_path.read_text()
    match = re.search(r"\[PEAK_MEMORY_MB:([\d.]+)\]", text)
    return float(match.group(1)) if match else None


def check_tests_pass(test_output_path: Path) -> bool:
    """Check if all tests passed (heuristic: 'passed' present, no 'FAILED' lines)."""
    if not test_output_path.exists():
        return False
    text = test_output_path.read_text()
    return "passed" in text.lower() and "FAILED" not in text


# --- Deterministic session-based scoring ---

_MEMORY_PROFILER_PATTERNS = re.compile(
    r"(?:"
    # Direct bash commands (domain agent style)
    r"\[Bash\]\s.*(?:memray\s+(?:run|stats|flamegraph|table|tree)|"
    r"tracemalloc|"
    r"pytest\s.*--memray|"
    r"@pytest\.mark\.limit_memory)"
    r"|"
    # Profiler usage inside scripts (deep agent writes profiling scripts)
    r"tracemalloc\.start\(\)"
    r"|"
    r"tracemalloc\.take_snapshot\(\)"
    r"|"
    r"memray\.Tracker"
    r")",
    re.IGNORECASE,
)

_CPU_PROFILER_PATTERNS = re.compile(
    r"(?:"
    # Direct bash commands (domain agent style)
    r"\[Bash\]\s.*(?:python[3]?\s+-m\s+cProfile|"
    r"cProfile\.run|"
    r"pstats|"
    r"pyinstrument|"
    r"py-spy)"
    r"|"
    # Profiler usage inside scripts (deep agent writes unified profiling scripts)
    r"cProfile\.Profile\(\)"
    r"|"
    r"profiler\.enable\(\)"
    r"|"
    r"pstats\.Stats"
    r")",
    re.IGNORECASE,
)
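
# Illustrative session lines these patterns are meant to catch (hypothetical):
#
#   [Bash] python3 -m memray run -o /tmp/profile.bin bench.py    # memory, direct
#   [Bash] python3 -m cProfile -o /tmp/profile.out bench.py      # CPU, direct
#   tracemalloc.start()                                          # memory, inside a script
#   profiler.enable()                                            # CPU, inside a script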


def detect_memory_profiler_usage(session_text: str) -> bool:
    """Check if the agent used a memory profiler during the session."""
    return bool(_MEMORY_PROFILER_PATTERNS.search(session_text))


def count_profiling_runs(session_text: str, profiler_type: str = "memory") -> int:
    """Count distinct profiling command invocations in the session.

    Counts both direct bash commands (domain agent style) and profiling
    script executions (deep agent writes scripts then runs them).
    """
    pattern = _MEMORY_PROFILER_PATTERNS if profiler_type == "memory" else _CPU_PROFILER_PATTERNS
    count = len(pattern.findall(session_text))
    # Also count script executions that run profiling scripts.
    # The deep agent writes /tmp/deep_profile.py or similar, then runs it.
    script_runs = len(re.findall(
        r"\[Bash\]\s.*python[3]?\s+/tmp/\w*prof\w*\.py",
        session_text, re.IGNORECASE,
    ))
    return count + script_runs
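
# Illustrative count (hypothetical session): two "[Bash] ... memray run ..." lines
# plus one "[Bash] python3 /tmp/deep_profile.py" execution would return 3.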


_ADVERSARIAL_REVIEW_PATTERNS = re.compile(
    r"codex-companion\.mjs.*adversarial-review|"
    r"\[adversarial-review\]",
    re.IGNORECASE,
)


def detect_adversarial_review(session_text: str) -> bool:
    """Check if the agent ran a Codex adversarial review during the session."""
    return bool(_ADVERSARIAL_REVIEW_PATTERNS.search(session_text))


def detect_ranked_list(session_text: str) -> bool:
    """Check if the agent built a ranked list with impact percentages.

    Looks for: (1) CPU profiler usage AND (2) output with percentage-based ranking.
    Supports both domain agent format ([ranked targets]) and deep agent format
    ([unified targets] with CPU %, MiB, domains columns).
    """
    has_profiler = bool(_CPU_PROFILER_PATTERNS.search(session_text))
    # Look for ranking output — lines with percentages in a list/table context
    has_ranking = bool(re.search(
        r"(?:\d+\.?\d*\s*%.*(?:function|target|time|cumtime|tottime|CPU|Mem))|"
        r"(?:(?:#\d|rank|\d\.\s).*\d+\.?\d*\s*%)|"
        # Deep agent unified targets table
        r"\[unified targets\]|"
        r"(?:CPU\s*%.*Mem.*MiB)",
        session_text, re.IGNORECASE,
    ))
    return has_profiler and has_ranking
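
# Illustrative ranking lines that would satisfy has_ranking (hypothetical output):
#
#   #1  parse_records   42.3 % cumtime
#   [unified targets]
#   target            CPU %   Mem (MiB)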


# --- LLM scoring ---

def build_scoring_prompt(manifest: dict, conversation: str, variant: str) -> str:
    """Build the prompt for LLM-based scoring."""
    rubric = manifest.get("rubric", {})
    criteria = rubric.get("criteria") or rubric.get("per_bug", {})
    notes = rubric.get("notes", {})
    bugs = manifest.get("bugs", [])
    bugs_desc = "\n".join(
        f" - {b['id']}: {b['description']} (dominant={b.get('is_dominant', False)}, "
        f"contribution={b.get('peak_contribution_pct', '?')}%)"
        for b in bugs
    )
    criteria_desc = "\n".join(
        f" - {name} (0-{pts}): {notes.get(name, 'no description')}"
        for name, pts in criteria.items()
    )
    # Truncate conversation if too long (keep first and last parts)
    max_chars = 80000
    if len(conversation) > max_chars:
        half = max_chars // 2
        conversation = (
            conversation[:half]
            + f"\n\n... [{len(conversation) - max_chars} chars truncated] ...\n\n"
            + conversation[-half:]
        )
    return f"""You are scoring an AI agent's performance on a code optimization task.

## Task Description
{manifest.get('description', 'No description')}

## Known Bugs
{bugs_desc}

## Scoring Rubric
Score each criterion independently. Use the full range (0 to max).
{criteria_desc}

## Agent Conversation ({variant})
{conversation}

## Instructions
Score each criterion based on what the agent actually did in the conversation.
Be strict: mentioning a concept is not the same as doing it. Check for evidence
of actual tool use (profiling commands, code edits, test runs).

Return ONLY a JSON object with this exact structure:
{{
  "criteria": {{
    {', '.join(f'"{name}": <0-{pts}>' for name, pts in criteria.items())}
  }},
  "notes": "<brief explanation of key scoring decisions>"
}}"""


def llm_score(prompt: str) -> dict:
    """Call Claude to score, return parsed JSON."""
    result = subprocess.run(
        [
            "claude", "-p", prompt,
            "--output-format", "json",
            "--model", "sonnet",
        ],
        capture_output=True,
        text=True,
        timeout=120,
    )
    # Parse the result
    output = result.stdout.strip()
    for line in output.split("\n"):
        try:
            data = json.loads(line)
            # claude --output-format json wraps in {"result": "..."}
            if isinstance(data, dict) and "result" in data:
                inner = data["result"]
                # The result might be a JSON string
                if isinstance(inner, str):
                    # Extract JSON from markdown code blocks if present
                    json_match = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", inner, re.DOTALL)
                    if json_match:
                        return json.loads(json_match.group(1))
                    # Try parsing directly
                    try:
                        return json.loads(inner)
                    except json.JSONDecodeError:
                        # Find JSON object in the text
                        brace_match = re.search(r"\{[^{}]*\"criteria\"[^{}]*\{[^{}]*\}[^{}]*\}", inner, re.DOTALL)
                        if brace_match:
                            return json.loads(brace_match.group(0))
                elif isinstance(inner, dict):
                    return inner
        except json.JSONDecodeError:
            continue
    print(f"WARNING: Could not parse LLM response:\n{output[:500]}", file=sys.stderr)
    return {}
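
# The CLI wrapper shape being unwrapped above, simplified with other fields omitted:
#
#   {"result": "<model text, often a ```json fenced block>", "session_id": "..."}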


# --- Main scoring ---

def score_variant(variant: str, results_dir: Path, manifest: dict) -> dict:
    """Score a single variant using LLM grading."""
    result_json = results_dir / f"{variant}.json"
    test_output = results_dir / f"{variant}.tests"
    duration_file = results_dir / f"{variant}.duration"
    rubric = manifest.get("rubric", {})
    criteria = rubric.get("criteria") or rubric.get("per_bug", {})

    # Read conversation
    session_id = get_session_id(result_json)
    conversation = ""
    if session_id:
        conversation = read_session_text(session_id)
        print(f" Session: {session_id} ({len(conversation)} chars)")
    if not conversation:
        # Fall back to the result text
        try:
            for line in result_json.read_text().strip().split("\n"):
                data = json.loads(line)
                if isinstance(data, dict) and "result" in data:
                    conversation = data["result"]
                    break
        except (json.JSONDecodeError, TypeError):
            pass
        print(f" No session JSONL, using result text ({len(conversation)} chars)")

    # LLM scoring
    print(" Grading with LLM...")
    prompt = build_scoring_prompt(manifest, conversation, variant)
    llm_result = llm_score(prompt)
    scores = llm_result.get("criteria", {})
    llm_notes = llm_result.get("notes", "")

    # Clamp scores to max points
    for name in list(scores.keys()):
        max_pts = criteria.get(name, 0)
        scores[name] = max(0, min(int(scores.get(name, 0)), max_pts))

    # Auto-score: tests_pass (deterministic, doesn't need the LLM)
    if "tests_pass" in criteria:
        scores["tests_pass"] = criteria["tests_pass"] if check_tests_pass(test_output) else 0

    # Auto-score: optimization_depth from peak memory thresholds
    auto_score = rubric.get("auto_score", {})
    if "optimization_depth" in criteria and "optimization_depth" in auto_score:
        peak = extract_peak_memory(test_output)
        if peak is not None:
            for t in auto_score["optimization_depth"].get("thresholds", []):
                if peak <= t["max_mb"]:
                    scores["optimization_depth"] = t["points"]
                    llm_notes += f" | optimization_depth: {peak:.1f}MB → {t['label']}"
                    break

    # Auto-score: used_memory_profiler (deterministic — did the agent use memray/tracemalloc?)
    if "used_memory_profiler" in criteria and conversation:
        if detect_memory_profiler_usage(conversation):
            scores["used_memory_profiler"] = criteria["used_memory_profiler"]
            llm_notes += " | used_memory_profiler: detected (deterministic)"
        else:
            scores["used_memory_profiler"] = 0
            llm_notes += " | used_memory_profiler: NOT detected (deterministic)"

    # Auto-score: profiled_iteratively (deterministic — count profiling runs)
    if "profiled_iteratively" in criteria and conversation:
        count = count_profiling_runs(conversation, "memory")
        max_pts = criteria["profiled_iteratively"]
        if count >= 2:
            scores["profiled_iteratively"] = max_pts
        elif count == 1:
            scores["profiled_iteratively"] = 1
        else:
            scores["profiled_iteratively"] = 0
        llm_notes += f" | profiled_iteratively: {count} runs (deterministic)"

    # Auto-score: ran_adversarial_review (deterministic — Codex adversarial review invoked)
    if "ran_adversarial_review" in criteria and conversation:
        if detect_adversarial_review(conversation):
            scores["ran_adversarial_review"] = criteria["ran_adversarial_review"]
            llm_notes += " | ran_adversarial_review: detected (deterministic)"
        else:
            scores["ran_adversarial_review"] = 0
            llm_notes += " | ran_adversarial_review: NOT detected (deterministic)"

    # Auto-score: profiled_and_identified (deterministic — any profiler used)
    if "profiled_and_identified" in criteria and conversation:
        has_cpu = bool(_CPU_PROFILER_PATTERNS.search(conversation))
        has_mem = detect_memory_profiler_usage(conversation)
        if has_cpu or has_mem:
            # Profiler detected — let the LLM score the quality (don't override)
            llm_notes += f" | profiler: detected (cpu={has_cpu}, mem={has_mem})"
        else:
            scores["profiled_and_identified"] = 0
            llm_notes += " | profiler: NOT detected (deterministic override to 0)"

    # Fill missing criteria with 0
    for name in criteria:
        if name not in scores:
            scores[name] = 0

    total = sum(scores.values())
    max_total = rubric.get("total", sum(criteria.values()))

    duration = None
    if duration_file.exists():
        try:
            duration = int(duration_file.read_text().strip())
        except ValueError:
            pass

    peak = extract_peak_memory(test_output)
    result = {
        "variant": variant,
        "total": total,
        "max": max_total,
        "criteria": scores,
        "duration": duration,
        "notes": llm_notes,
    }
    if peak is not None:
        result["peak_memory_mb"] = peak
    return result
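
# Illustrative <variant>.score.json produced from the dict above (values hypothetical):
#
#   {"variant": "skill", "total": 9, "max": 12,
#    "criteria": {"tests_pass": 2, "used_memory_profiler": 2, ...},
#    "duration": 412, "notes": "... | used_memory_profiler: detected (deterministic)",
#    "peak_memory_mb": 187.3}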


# --- Aggregation ---

def aggregate_runs(parent_dir: Path) -> int:
    """Aggregate scores from multiple runs into stats per criterion."""
    run_dirs = sorted(parent_dir.glob("run-*/"))
    if not run_dirs:
        print(f"ERROR: No run-*/ directories found in {parent_dir}", file=sys.stderr)
        return 1

    for variant in ("skill", "baseline"):
        score_files = [d / f"{variant}.score.json" for d in run_dirs]
        score_files = [f for f in score_files if f.exists()]
        if not score_files:
            continue
        scores = [json.loads(f.read_text()) for f in score_files]
        n = len(scores)

        # Aggregate totals
        totals = [s["total"] for s in scores]
        max_total = scores[0]["max"]

        # Aggregate per-criterion
        all_criteria = list(scores[0]["criteria"].keys())
        criteria_stats = {}
        for crit in all_criteria:
            vals = [s["criteria"].get(crit, 0) for s in scores]
            avg = sum(vals) / n
            criteria_stats[crit] = {
                "scores": vals,
                "min": min(vals),
                "max": max(vals),
                "avg": round(avg, 1),
                "stddev": round(math.sqrt(sum((v - avg) ** 2 for v in vals) / n), 2),
            }

        total_avg = sum(totals) / n
        agg = {
            "variant": variant,
            "runs": n,
            "total": {
                "scores": totals,
                "min": min(totals),
                "max": max(totals),
                "avg": round(total_avg, 1),
                "stddev": round(math.sqrt(sum((v - total_avg) ** 2 for v in totals) / n), 2),
            },
            "max_possible": max_total,
            "criteria": criteria_stats,
        }

        # Identify flaky criteria (stddev > 0)
        flaky = [c for c, s in criteria_stats.items() if s["stddev"] > 0]
        if flaky:
            agg["flaky_criteria"] = flaky

        # Collect durations
        durations = [s.get("duration") for s in scores if s.get("duration") is not None]
        if durations:
            agg["duration"] = {
                "min": min(durations),
                "max": max(durations),
                "avg": round(sum(durations) / len(durations), 1),
            }

        agg_path = parent_dir / f"{variant}.aggregate.json"
        agg_path.write_text(json.dumps(agg, indent=2))

        # Print summary
        print(f"=== {variant} aggregate ({n} runs) ===")
        print(f" Total: {agg['total']['avg']}/{max_total} "
              f"(range {agg['total']['min']}-{agg['total']['max']}, "
              f"stddev {agg['total']['stddev']})")
        print()
        for crit, stats in criteria_stats.items():
            flaky_mark = " *" if stats["stddev"] > 0 else ""
            print(f" {crit:40s} avg={stats['avg']:4.1f} "
                  f"range=[{stats['min']}-{stats['max']}] "
                  f"stddev={stats['stddev']}{flaky_mark}")
        if flaky:
            print(f"\n * flaky criteria (non-zero stddev): {', '.join(flaky)}")
        if agg.get("duration"):
            d = agg["duration"]
            print(f"\n Duration: avg={d['avg']}s range=[{d['min']}-{d['max']}s]")
        print()
    return 0
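
# Illustrative <variant>.aggregate.json shape (numbers hypothetical):
#
#   {"variant": "skill", "runs": 3, "max_possible": 12,
#    "total": {"scores": [9, 10, 9], "min": 9, "max": 10, "avg": 9.3, "stddev": 0.47},
#    "criteria": {"tests_pass": {"scores": [2, 2, 2], "min": 2, "max": 2, "avg": 2.0, "stddev": 0.0}, ...},
#    "flaky_criteria": ["profiled_iteratively"],
#    "duration": {"min": 380, "max": 455, "avg": 410.3}}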


def score_single(results_dir: Path) -> int:
    """Score a single results directory."""
    manifest_path = results_dir / "manifest.json"
    if not manifest_path.exists():
        print(f"ERROR: No manifest.json in {results_dir}", file=sys.stderr)
        return 1
    manifest = json.loads(manifest_path.read_text())
    template = manifest["name"]
    eval_type = manifest.get("eval_type", "")
    print(f"=== Scoring: {template} ({eval_type}) ===\n")

    variant_results = {}
    for variant in ("skill", "baseline"):
        result_json = results_dir / f"{variant}.json"
        if not result_json.exists():
            continue
        print(f"--- {variant} ---")
        result = score_variant(variant, results_dir, manifest)
        variant_results[variant] = result

        # Write score file
        score_path = results_dir / f"{variant}.score.json"
        score_path.write_text(json.dumps(result, indent=2))

        print(f" Total: {result['total']} / {result['max']}")
        if result.get("peak_memory_mb"):
            print(f" Peak memory: {result['peak_memory_mb']:.1f} MB")
        if result.get("duration"):
            print(f" Duration: {result['duration']}s")
        rubric_criteria = (manifest.get("rubric", {}).get("criteria") or
                           manifest.get("rubric", {}).get("per_bug", {}))
        for criterion, score in result["criteria"].items():
            max_pts = rubric_criteria.get(criterion, "?")
            print(f" {criterion}: {score}/{max_pts}")
        if result.get("notes"):
            print(f" Notes: {result['notes']}")
        print()

    # Comparison
    if "skill" in variant_results and "baseline" in variant_results:
        skill = variant_results["skill"]
        baseline = variant_results["baseline"]
        gap = skill["total"] - baseline["total"]
        print("=== Comparison ===")
        print(f" With-skill: {skill['total']} / {skill['max']}")
        print(f" Baseline: {baseline['total']} / {baseline['max']}")
        print(f" Gap: {gap:+d}")
        comparison = {
            "template": template,
            "skill_total": skill["total"],
            "baseline_total": baseline["total"],
            "max": skill["max"],
            "gap": gap,
            "skill_duration": skill.get("duration"),
            "baseline_duration": baseline.get("duration"),
        }
        if skill.get("peak_memory_mb"):
            comparison["skill_peak_mb"] = skill["peak_memory_mb"]
        if baseline.get("peak_memory_mb"):
            comparison["baseline_peak_mb"] = baseline["peak_memory_mb"]
        comp_path = results_dir / "comparison.json"
        comp_path.write_text(json.dumps(comparison, indent=2))
        print(f"\n Saved: {comp_path}")
    return 0


def main():
    if len(sys.argv) < 2:
        print("Usage: python3 score.py <results-dir>", file=sys.stderr)
        print(" python3 score.py aggregate <parent-dir>", file=sys.stderr)
        return 1
    if sys.argv[1] == "aggregate":
        if len(sys.argv) < 3:
            print("Usage: python3 score.py aggregate <parent-dir>", file=sys.stderr)
            return 1
        return aggregate_runs(Path(sys.argv[2]))
    return score_single(Path(sys.argv[1]))


if __name__ == "__main__":
    sys.exit(main())