"""LLM-graded eval scorer. Feeds the manifest rubric and full conversation to Claude, which scores each criterion. Fully automated, no human input needed. Usage: python3 score.py # score a single run python3 score.py aggregate # aggregate multiple runs """ import json import math import re import subprocess import sys from pathlib import Path CLAUDE_DIR = Path.home() / ".claude" # --- Session reading --- def _read_single_jsonl(jsonl: Path) -> list[str]: """Read a single JSONL file and return formatted text lines.""" texts = [] with open(jsonl) as f: for line in f: try: msg = json.loads(line) except json.JSONDecodeError: continue message = msg.get("message", {}) role = message.get("role", msg.get("type", "")) content = message.get("content", []) parts = [] if isinstance(content, list): for block in content: if not isinstance(block, dict): continue if block.get("type") == "text": parts.append(block["text"]) elif block.get("type") == "tool_use": name = block.get("name", "") inp = block.get("input", {}) cmd = ( inp.get("command", "") if isinstance(inp, dict) else "" ) if cmd: parts.append(f"[{name}] {cmd}") elif name == "Write" and isinstance(inp, dict): # Include full file content for Write calls so # deterministic checks can see profiling scripts content = inp.get("content", "") path = inp.get("file_path", "") parts.append(f"[{name}] {path}\n{content[:2000]}") else: parts.append(f"[{name}] {json.dumps(inp)[:500]}") elif block.get("type") == "tool_result": inner = block.get("content", "") if isinstance(inner, str): parts.append(f"[result] {inner[:2000]}") elif isinstance(inner, list): for item in inner: if ( isinstance(item, dict) and item.get("type") == "text" ): parts.append( f"[result] {item['text'][:2000]}" ) elif isinstance(content, str) and content: parts.append(content) if parts: texts.append(f"[{role}] " + "\n".join(parts)) return texts def read_session_text(session_id: str) -> str: """Read the full conversation from a session JSONL file, including subagents. Claude Code stores subagent sessions at: /subagents/agent-.jsonl This function reads the parent session and all subagent sessions, concatenating them so deterministic scoring checks can see the full agent chain (skill → router → domain agent). 
""" for jsonl in CLAUDE_DIR.glob(f"projects/*/{session_id}.jsonl"): # Read parent session texts = _read_single_jsonl(jsonl) # Read all subagent sessions (router, domain agents, researchers) subagent_dir = jsonl.parent / session_id / "subagents" if subagent_dir.is_dir(): for sub_jsonl in sorted(subagent_dir.glob("agent-*.jsonl")): sub_texts = _read_single_jsonl(sub_jsonl) if sub_texts: texts.append(f"\n[subagent: {sub_jsonl.stem}]") texts.extend(sub_texts) return "\n\n".join(texts) return "" def get_session_id(result_json_path: Path) -> str | None: """Extract session_id from the claude output JSON.""" if not result_json_path.exists(): return None text = result_json_path.read_text().strip() for line in text.split("\n"): try: data = json.loads(line) if isinstance(data, dict) and "session_id" in data: return data["session_id"] if isinstance(data, list) and data: for item in data: if isinstance(item, dict) and "session_id" in item: return item["session_id"] except json.JSONDecodeError: continue return None def extract_peak_memory(test_output_path: Path) -> float | None: """Extract peak memory from test output [PEAK_MEMORY_MB:X] marker.""" if not test_output_path.exists(): return None text = test_output_path.read_text() match = re.search(r"\[PEAK_MEMORY_MB:([\d.]+)\]", text) return float(match.group(1)) if match else None def check_tests_pass(test_output_path: Path) -> bool: """Check if all tests passed.""" if not test_output_path.exists(): return False text = test_output_path.read_text() return "passed" in text.lower() and "FAILED" not in text # --- Deterministic session-based scoring --- _MEMORY_PROFILER_PATTERNS = re.compile( r"(?:" # Direct bash commands (domain agent style) r"\[Bash\]\s.*(?:memray\s+(?:run|stats|flamegraph|table|tree)|" r"tracemalloc|" r"pytest\s.*--memray|" r"@pytest\.mark\.limit_memory)" r"|" # Profiler usage inside scripts (deep agent writes profiling scripts) r"tracemalloc\.start\(\)" r"|" r"tracemalloc\.take_snapshot\(\)" r"|" r"memray\.Tracker" r")", re.IGNORECASE, ) _CPU_PROFILER_PATTERNS = re.compile( r"(?:" # Direct bash commands (domain agent style) r"\[Bash\]\s.*(?:python[3]?\s+-m\s+cProfile|" r"cProfile\.run|" r"pstats|" r"pyinstrument|" r"py-spy)" r"|" # Profiler usage inside scripts (deep agent writes unified profiling scripts) r"cProfile\.Profile\(\)" r"|" r"profiler\.enable\(\)" r"|" r"pstats\.Stats" r")", re.IGNORECASE, ) def detect_memory_profiler_usage(session_text: str) -> bool: """Check if the agent used a memory profiler during the session.""" return bool(_MEMORY_PROFILER_PATTERNS.search(session_text)) def count_profiling_runs( session_text: str, profiler_type: str = "memory" ) -> int: """Count distinct profiling command invocations in the session. Counts both direct bash commands (domain agent style) and profiling script executions (deep agent writes scripts then runs them). 
""" pattern = ( _MEMORY_PROFILER_PATTERNS if profiler_type == "memory" else _CPU_PROFILER_PATTERNS ) count = len(pattern.findall(session_text)) # Also count script executions that run profiling scripts # Deep agent writes /tmp/deep_profile.py or similar, then runs it script_runs = len( re.findall( r"\[Bash\]\s.*python[3]?\s+/tmp/\w*prof\w*\.py", session_text, re.IGNORECASE, ) ) return max(count, count + script_runs) _ADVERSARIAL_REVIEW_PATTERNS = re.compile( r"codex-companion\.mjs.*adversarial-review|" r"\[adversarial-review\]", re.IGNORECASE, ) def detect_adversarial_review(session_text: str) -> bool: """Check if the agent ran a Codex adversarial review during the session.""" return bool(_ADVERSARIAL_REVIEW_PATTERNS.search(session_text)) def detect_ranked_list(session_text: str) -> bool: """Check if the agent built a ranked list with impact percentages. Looks for: (1) CPU profiler usage AND (2) output with percentage-based ranking. Supports both domain agent format ([ranked targets]) and deep agent format ([unified targets] with CPU %, MiB, domains columns). """ has_profiler = bool(_CPU_PROFILER_PATTERNS.search(session_text)) # Look for ranking output — lines with percentages in a list/table context has_ranking = bool( re.search( r"(?:\d+\.?\d*\s*%.*(?:function|target|time|cumtime|tottime|CPU|Mem))|" r"(?:(?:#\d|rank|\d\.\s).*\d+\.?\d*\s*%)|" # Deep agent unified targets table r"\[unified targets\]|" r"(?:CPU\s*%.*Mem.*MiB)", session_text, re.IGNORECASE, ) ) return has_profiler and has_ranking # --- LLM scoring --- def build_scoring_prompt( manifest: dict, conversation: str, variant: str ) -> str: """Build the prompt for LLM-based scoring.""" rubric = manifest.get("rubric", {}) criteria = rubric.get("criteria") or rubric.get("per_bug", {}) notes = rubric.get("notes", {}) bugs = manifest.get("bugs", []) bugs_desc = "\n".join( f" - {b['id']}: {b['description']} (dominant={b.get('is_dominant', False)}, " f"contribution={b.get('peak_contribution_pct', '?')}%)" for b in bugs ) criteria_desc = "\n".join( f" - {name} (0-{pts}): {notes.get(name, 'no description')}" for name, pts in criteria.items() ) # Truncate conversation if too long (keep first and last parts) max_chars = 80000 if len(conversation) > max_chars: half = max_chars // 2 conversation = ( conversation[:half] + f"\n\n... [{len(conversation) - max_chars} chars truncated] ...\n\n" + conversation[-half:] ) return f"""You are scoring an AI agent's performance on a code optimization task. ## Task Description {manifest.get("description", "No description")} ## Known Bugs {bugs_desc} ## Scoring Rubric Score each criterion independently. Use the full range (0 to max). {criteria_desc} ## Agent Conversation ({variant}) {conversation} ## Instructions Score each criterion based on what the agent actually did in the conversation. Be strict: mentioning a concept is not the same as doing it. Check for evidence of actual tool use (profiling commands, code edits, test runs). 

Return ONLY a JSON object with this exact structure:
{{
  "criteria": {{
    {", ".join(f'"{name}": <0-{pts}>' for name, pts in criteria.items())}
  }},
  "notes": "<brief explanation>"
}}"""


def llm_score(prompt: str) -> dict:
    """Call Claude to score, return parsed JSON."""
    result = subprocess.run(
        [
            "claude",
            "-p",
            prompt,
            "--output-format",
            "json",
            "--model",
            "sonnet",
        ],
        capture_output=True,
        text=True,
        timeout=120,
    )

    # Parse the result
    output = result.stdout.strip()
    for line in output.split("\n"):
        try:
            data = json.loads(line)
            # claude --output-format json wraps in {"result": "..."}
            if isinstance(data, dict) and "result" in data:
                inner = data["result"]
                # The result might be a JSON string
                if isinstance(inner, str):
                    # Extract JSON from markdown code blocks if present
                    json_match = re.search(
                        r"```(?:json)?\s*(\{.*?\})\s*```", inner, re.DOTALL
                    )
                    if json_match:
                        return json.loads(json_match.group(1))
                    # Try parsing directly
                    try:
                        return json.loads(inner)
                    except json.JSONDecodeError:
                        # Find JSON object in the text
                        brace_match = re.search(
                            r"\{[^{}]*\"criteria\"[^{}]*\{[^{}]*\}[^{}]*\}",
                            inner,
                            re.DOTALL,
                        )
                        if brace_match:
                            return json.loads(brace_match.group(0))
                elif isinstance(inner, dict):
                    return inner
        except json.JSONDecodeError:
            continue

    print(
        f"WARNING: Could not parse LLM response:\n{output[:500]}",
        file=sys.stderr,
    )
    return {}


# --- Main scoring ---


def score_variant(variant: str, results_dir: Path, manifest: dict) -> dict:
    """Score a single variant using LLM grading."""
    result_json = results_dir / f"{variant}.json"
    test_output = results_dir / f"{variant}.tests"
    duration_file = results_dir / f"{variant}.duration"
    rubric = manifest.get("rubric", {})
    criteria = rubric.get("criteria") or rubric.get("per_bug", {})

    # Read conversation
    session_id = get_session_id(result_json)
    conversation = ""
    if session_id:
        conversation = read_session_text(session_id)
        print(f"  Session: {session_id} ({len(conversation)} chars)")
    if not conversation:
        # Fallback to result text
        try:
            for line in result_json.read_text().strip().split("\n"):
                data = json.loads(line)
                if isinstance(data, dict) and "result" in data:
                    conversation = data["result"]
                    break
        except (json.JSONDecodeError, TypeError):
            pass
        print(
            f"  No session JSONL, using result text ({len(conversation)} chars)"
        )

    # LLM scoring
    print("  Grading with LLM...")
    prompt = build_scoring_prompt(manifest, conversation, variant)
    llm_result = llm_score(prompt)
    scores = llm_result.get("criteria", {})
    llm_notes = llm_result.get("notes", "")

    # Clamp scores to max points
    for name in list(scores.keys()):
        max_pts = criteria.get(name, 0)
        scores[name] = max(0, min(int(scores.get(name, 0)), max_pts))

    # Auto-score: tests_pass (deterministic, don't need LLM)
    if "tests_pass" in criteria:
        scores["tests_pass"] = (
            criteria["tests_pass"] if check_tests_pass(test_output) else 0
        )

    # Auto-score: optimization_depth from peak memory thresholds
    auto_score = rubric.get("auto_score", {})
    if "optimization_depth" in criteria and "optimization_depth" in auto_score:
        peak = extract_peak_memory(test_output)
        if peak is not None:
            for t in auto_score["optimization_depth"].get("thresholds", []):
                if peak <= t["max_mb"]:
                    scores["optimization_depth"] = t["points"]
                    llm_notes += (
                        f" | optimization_depth: {peak:.1f}MB → {t['label']}"
                    )
                    break

    # Auto-score: used_memory_profiler (deterministic — did agent use memray/tracemalloc?)
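    # The regex requires an actual [Bash] profiler invocation or an in-script
    # tracemalloc/memray call (see _MEMORY_PROFILER_PATTERNS), so an agent
    # that merely mentions a profiler in prose does not earn the points.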
if "used_memory_profiler" in criteria and conversation: if detect_memory_profiler_usage(conversation): scores["used_memory_profiler"] = criteria["used_memory_profiler"] llm_notes += " | used_memory_profiler: detected (deterministic)" else: scores["used_memory_profiler"] = 0 llm_notes += ( " | used_memory_profiler: NOT detected (deterministic)" ) # Auto-score: profiled_iteratively (deterministic — count profiling runs) if "profiled_iteratively" in criteria and conversation: count = count_profiling_runs(conversation, "memory") max_pts = criteria["profiled_iteratively"] if count >= 2: scores["profiled_iteratively"] = max_pts elif count == 1: scores["profiled_iteratively"] = 1 else: scores["profiled_iteratively"] = 0 llm_notes += f" | profiled_iteratively: {count} runs (deterministic)" # Auto-score: ran_adversarial_review (deterministic — codex adversarial review invoked) if "ran_adversarial_review" in criteria and conversation: if detect_adversarial_review(conversation): scores["ran_adversarial_review"] = criteria[ "ran_adversarial_review" ] llm_notes += " | ran_adversarial_review: detected (deterministic)" else: scores["ran_adversarial_review"] = 0 llm_notes += ( " | ran_adversarial_review: NOT detected (deterministic)" ) # Auto-score: profiled_and_identified (deterministic — any profiler used) if "profiled_and_identified" in criteria and conversation: has_cpu = bool(_CPU_PROFILER_PATTERNS.search(conversation)) has_mem = detect_memory_profiler_usage(conversation) if has_cpu or has_mem: # Profiler detected — let LLM score the quality (don't override) llm_notes += ( f" | profiler: detected (cpu={has_cpu}, mem={has_mem})" ) else: scores["profiled_and_identified"] = 0 llm_notes += ( " | profiler: NOT detected (deterministic override to 0)" ) # Fill missing criteria with 0 for name in criteria: if name not in scores: scores[name] = 0 total = sum(scores.values()) max_total = rubric.get("total", sum(criteria.values())) duration = None if duration_file.exists(): try: duration = int(duration_file.read_text().strip()) except ValueError: pass peak = extract_peak_memory(test_output) result = { "variant": variant, "total": total, "max": max_total, "criteria": scores, "duration": duration, "notes": llm_notes, } if peak is not None: result["peak_memory_mb"] = peak return result # --- Aggregation --- def aggregate_runs(parent_dir: Path) -> int: """Aggregate scores from multiple runs into stats per criterion.""" run_dirs = sorted(parent_dir.glob("run-*/")) if not run_dirs: print( f"ERROR: No run-*/ directories found in {parent_dir}", file=sys.stderr, ) return 1 for variant in ("skill", "baseline"): score_files = [d / f"{variant}.score.json" for d in run_dirs] score_files = [f for f in score_files if f.exists()] if not score_files: continue scores = [json.loads(f.read_text()) for f in score_files] n = len(scores) # Aggregate totals totals = [s["total"] for s in scores] max_total = scores[0]["max"] # Aggregate per-criterion all_criteria = list(scores[0]["criteria"].keys()) criteria_stats = {} for crit in all_criteria: vals = [s["criteria"].get(crit, 0) for s in scores] avg = sum(vals) / n criteria_stats[crit] = { "scores": vals, "min": min(vals), "max": max(vals), "avg": round(avg, 1), "stddev": round( math.sqrt(sum((v - avg) ** 2 for v in vals) / n), 2 ), } total_avg = sum(totals) / n agg = { "variant": variant, "runs": n, "total": { "scores": totals, "min": min(totals), "max": max(totals), "avg": round(total_avg, 1), "stddev": round( math.sqrt(sum((v - total_avg) ** 2 for v in totals) / n), 2 ), }, 
"max_possible": max_total, "criteria": criteria_stats, } # Identify flaky criteria (stddev > 0) flaky = [c for c, s in criteria_stats.items() if s["stddev"] > 0] if flaky: agg["flaky_criteria"] = flaky # Collect durations durations = [ s.get("duration") for s in scores if s.get("duration") is not None ] if durations: agg["duration"] = { "min": min(durations), "max": max(durations), "avg": round(sum(durations) / len(durations), 1), } agg_path = parent_dir / f"{variant}.aggregate.json" agg_path.write_text(json.dumps(agg, indent=2)) # Print summary print(f"=== {variant} aggregate ({n} runs) ===") print( f" Total: {agg['total']['avg']}/{max_total} " f"(range {agg['total']['min']}-{agg['total']['max']}, " f"stddev {agg['total']['stddev']})" ) print() for crit, stats in criteria_stats.items(): flaky_mark = " *" if stats["stddev"] > 0 else "" print( f" {crit:40s} avg={stats['avg']:4.1f} " f"range=[{stats['min']}-{stats['max']}] " f"stddev={stats['stddev']}{flaky_mark}" ) if flaky: print( f"\n * flaky criteria (non-zero stddev): {', '.join(flaky)}" ) if agg.get("duration"): d = agg["duration"] print( f"\n Duration: avg={d['avg']}s range=[{d['min']}-{d['max']}s]" ) print() return 0 def score_single(results_dir: Path) -> int: """Score a single results directory.""" manifest_path = results_dir / "manifest.json" if not manifest_path.exists(): print(f"ERROR: No manifest.json in {results_dir}", file=sys.stderr) return 1 manifest = json.loads(manifest_path.read_text()) template = manifest["name"] eval_type = manifest.get("eval_type", "") print(f"=== Scoring: {template} ({eval_type}) ===\n") variant_results = {} for variant in ("skill", "baseline"): result_json = results_dir / f"{variant}.json" if not result_json.exists(): continue print(f"--- {variant} ---") result = score_variant(variant, results_dir, manifest) variant_results[variant] = result # Write score file score_path = results_dir / f"{variant}.score.json" score_path.write_text(json.dumps(result, indent=2)) print(f" Total: {result['total']} / {result['max']}") if result.get("peak_memory_mb"): print(f" Peak memory: {result['peak_memory_mb']:.1f} MB") if result.get("duration"): print(f" Duration: {result['duration']}s") rubric_criteria = manifest.get("rubric", {}).get( "criteria" ) or manifest.get("rubric", {}).get("per_bug", {}) for criterion, score in result["criteria"].items(): max_pts = rubric_criteria.get(criterion, "?") print(f" {criterion}: {score}/{max_pts}") if result.get("notes"): print(f" Notes: {result['notes']}") print() # Comparison if "skill" in variant_results and "baseline" in variant_results: skill = variant_results["skill"] baseline = variant_results["baseline"] gap = skill["total"] - baseline["total"] print("=== Comparison ===") print(f" With-skill: {skill['total']} / {skill['max']}") print(f" Baseline: {baseline['total']} / {baseline['max']}") print(f" Gap: {gap:+d}") comparison = { "template": template, "skill_total": skill["total"], "baseline_total": baseline["total"], "max": skill["max"], "gap": gap, "skill_duration": skill.get("duration"), "baseline_duration": baseline.get("duration"), } if skill.get("peak_memory_mb"): comparison["skill_peak_mb"] = skill["peak_memory_mb"] if baseline.get("peak_memory_mb"): comparison["baseline_peak_mb"] = baseline["peak_memory_mb"] comp_path = results_dir / "comparison.json" comp_path.write_text(json.dumps(comparison, indent=2)) print(f"\n Saved: {comp_path}") return 0 def main(): if len(sys.argv) < 2: print("Usage: python3 score.py ", file=sys.stderr) print( " python3 score.py aggregate ", 
            file=sys.stderr
        )
        return 1

    if sys.argv[1] == "aggregate":
        if len(sys.argv) < 3:
            print(
                "Usage: python3 score.py aggregate <parent_dir>",
                file=sys.stderr,
            )
            return 1
        return aggregate_runs(Path(sys.argv[2]))

    return score_single(Path(sys.argv[1]))


if __name__ == "__main__":
    sys.exit(main())