codeflash-agent/scripts/codex_insights.py
Kevin Turcios 20f6c59f05
Lint and format entire repo, not just packages (#23)
Remove .codeflash/ from ruff extend-exclude, add per-file ignores
for .codeflash/, scripts/, evals/, and plugin/ (benchmark/script
patterns like print, eval, magic values). Remove shebangs. Widen
pre-commit hooks to check the full repo.
2026-04-15 03:16:15 -05:00

1288 lines
48 KiB
Python

# /// script
# requires-python = ">=3.11"
# ///
"""Portable Codex CLI usage report generator.
This script scans Codex session JSONL files under ``~/.codex/sessions`` and
builds a deterministic HTML report plus JSON export.
It is intentionally Codex-specific:
1. Session discovery uses Codex rollout files, not Claude transcripts.
2. Metrics are derived from Codex event types like ``user_message``,
``function_call``, ``exec_command_end``, and ``web_search_end``.
3. Project scoping matches session ``cwd`` values, with repo-family matching
for worktrees and related clones.
"""
from __future__ import annotations
import argparse
import html
import json
import os
import re
import shlex
import subprocess
from collections import Counter
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from functools import lru_cache
from pathlib import Path
from typing import Any
LABEL_MAP: dict[str, str] = {
"debug_investigate": "Debug/Investigate",
"implement_feature": "Implement Feature",
"fix_bug": "Fix Bug",
"write_script_tool": "Write Script/Tool",
"refactor_code": "Refactor Code",
"configure_system": "Configure System",
"create_pr_commit": "Create PR/Commit",
"analyze_data": "Analyze Data",
"understand_codebase": "Understand Codebase",
"write_tests": "Write Tests",
"write_docs": "Write Docs",
"warmup_minimal": "Quick Check",
"command_failed": "Command Failed",
"turn_aborted": "Turn Aborted",
"model_error": "Model Error",
"approval_requested": "Approval Requested",
"completed_cleanly": "Completed Cleanly",
"completed_with_retries": "Completed With Retries",
"interrupted": "Interrupted",
"incomplete": "Incomplete",
"web_search": "Web Search",
"exec_command": "Exec Command",
"shell": "Shell",
"shell_command": "Shell Command",
"spawn_agent": "Spawn Agent",
"update_plan": "Update Plan",
"request_user_input": "Request User Input",
}
GOAL_PATTERNS: dict[str, list[re.Pattern[str]]] = {
"debug_investigate": [
re.compile(r"\bdebug\b", re.IGNORECASE),
re.compile(r"\binvestigat", re.IGNORECASE),
re.compile(r"\btrace\b", re.IGNORECASE),
re.compile(r"\berror\b", re.IGNORECASE),
re.compile(r"\bwhy\b", re.IGNORECASE),
re.compile(r"\bfail", re.IGNORECASE),
],
"implement_feature": [
re.compile(r"\bimplement\b", re.IGNORECASE),
re.compile(r"\bbuild\b", re.IGNORECASE),
re.compile(r"\bfeature\b", re.IGNORECASE),
re.compile(r"\badd\b", re.IGNORECASE),
re.compile(r"\bcreate\b", re.IGNORECASE),
],
"fix_bug": [
re.compile(r"\bfix\b", re.IGNORECASE),
re.compile(r"\bbug\b", re.IGNORECASE),
re.compile(r"\bbroken\b", re.IGNORECASE),
re.compile(r"\bfailing\b", re.IGNORECASE),
],
"write_script_tool": [
re.compile(r"\bscript\b", re.IGNORECASE),
re.compile(r"\bcli\b", re.IGNORECASE),
re.compile(r"\btool\b", re.IGNORECASE),
re.compile(r"\bautomation\b", re.IGNORECASE),
],
"refactor_code": [
re.compile(r"\brefactor\b", re.IGNORECASE),
re.compile(r"\bcleanup\b", re.IGNORECASE),
re.compile(r"\breorgan", re.IGNORECASE),
re.compile(r"\bsimplif", re.IGNORECASE),
],
"configure_system": [
re.compile(r"\bconfigure\b", re.IGNORECASE),
re.compile(r"\bsetup\b", re.IGNORECASE),
re.compile(r"\binstall\b", re.IGNORECASE),
re.compile(r"\bconfig\b", re.IGNORECASE),
re.compile(r"\benv\b", re.IGNORECASE),
re.compile(r"\bci\b", re.IGNORECASE),
],
"create_pr_commit": [
re.compile(r"\bcommit\b", re.IGNORECASE),
re.compile(r"\bpull request\b", re.IGNORECASE),
re.compile(r"\bpr\b", re.IGNORECASE),
re.compile(r"\bmerge\b", re.IGNORECASE),
re.compile(r"\bbranch\b", re.IGNORECASE),
],
"analyze_data": [
re.compile(r"\banaly[sz]e\b", re.IGNORECASE),
re.compile(r"\bmetrics\b", re.IGNORECASE),
re.compile(r"\breport\b", re.IGNORECASE),
re.compile(r"\binsights?\b", re.IGNORECASE),
re.compile(r"\bdata\b", re.IGNORECASE),
],
"understand_codebase": [
re.compile(r"\bunderstand\b", re.IGNORECASE),
re.compile(r"\bexplain\b", re.IGNORECASE),
re.compile(r"\bwalk ?through\b", re.IGNORECASE),
re.compile(r"\bhow does\b", re.IGNORECASE),
re.compile(r"\bwhere is\b", re.IGNORECASE),
re.compile(r"\bfind\b", re.IGNORECASE),
],
"write_tests": [
re.compile(r"\btests?\b", re.IGNORECASE),
re.compile(r"\bpytest\b", re.IGNORECASE),
re.compile(r"\bunit test\b", re.IGNORECASE),
re.compile(r"\bintegration test\b", re.IGNORECASE),
re.compile(r"\bbenchmark\b", re.IGNORECASE),
],
"write_docs": [
re.compile(r"\breadme\b", re.IGNORECASE),
re.compile(r"\bdocs?\b", re.IGNORECASE),
re.compile(r"\bdocument", re.IGNORECASE),
],
}
FRICTION_DESCRIPTIONS: dict[str, str] = {
"command_failed": "Shell execution is the main source of drag in these sessions.",
"turn_aborted": "You are redirecting or interrupting turns before they land cleanly.",
"model_error": "The model/runtime occasionally trips over its own protocol or request handling.",
"approval_requested": "Some sessions slow down because they need explicit privilege escalation.",
}
MCP_TOOL_NAMES = {
"list_mcp_resources",
"list_mcp_resource_templates",
"read_mcp_resource",
}
SHELL_TOOL_NAMES = {"exec_command", "shell", "shell_command"}
TEST_COMMAND_RE = re.compile(
r"\b(pytest|npm test|pnpm test|yarn test|cargo test|go test|vitest|jest|ruff|mypy|gradle test|mvn test)\b",
re.IGNORECASE,
)
GIT_COMMIT_RE = re.compile(r"(^|[;&|]\s*|\s)git\s+commit\b", re.IGNORECASE)
GIT_PUSH_RE = re.compile(r"(^|[;&|]\s*|\s)git\s+push\b", re.IGNORECASE)
GH_RE = re.compile(r"(^|[;&|]\s*|\s)gh\b", re.IGNORECASE)
@dataclass(frozen=True)
class RepoIdentity:
root: str
common_dir: str | None
remotes: frozenset[str]
worktrees: tuple[str, ...] = ()
@dataclass(frozen=True)
class ProjectScope:
target_prefix: str | None
path_prefixes: frozenset[str]
common_dirs: frozenset[str]
remotes: frozenset[str]
@dataclass
class CodexSession:
session_id: str
file_path: str
start_time: str
end_time: str
cwd: str
model: str
model_provider: str
cli_version: str
duration_minutes: float
user_message_count: int
assistant_message_count: int
final_answer_count: int
commentary_count: int
shell_command_count: int
command_failures: int
web_search_count: int
web_open_count: int
approval_requests: int
spawn_agent_count: int
mcp_call_count: int
invalid_request_errors: int
interruptions: int
total_input_tokens: int
total_output_tokens: int
total_reasoning_tokens: int
git_commits: int
git_pushes: int
gh_commands: int
test_commands: int
tool_counts: dict[str, int] = field(default_factory=dict)
command_families: dict[str, int] = field(default_factory=dict)
goal_categories: dict[str, int] = field(default_factory=dict)
friction: dict[str, int] = field(default_factory=dict)
user_messages: list[str] = field(default_factory=list)
assistant_summaries: list[str] = field(default_factory=list)
command_failure_examples: list[str] = field(default_factory=list)
first_prompt: str = ""
final_answer: str = ""
outcome: str = ""
@dataclass
class AggregatedData:
total_sessions: int
date_range: dict[str, str]
total_user_messages: int = 0
total_assistant_messages: int = 0
total_duration_hours: float = 0.0
total_input_tokens: int = 0
total_output_tokens: int = 0
total_reasoning_tokens: int = 0
total_shell_commands: int = 0
total_command_failures: int = 0
total_web_searches: int = 0
total_web_opens: int = 0
total_approval_requests: int = 0
total_spawn_agents: int = 0
total_mcp_calls: int = 0
total_interruptions: int = 0
total_invalid_request_errors: int = 0
git_commits: int = 0
git_pushes: int = 0
gh_commands: int = 0
test_commands: int = 0
sessions_with_mcp: int = 0
sessions_with_subagents: int = 0
sessions_with_web_search: int = 0
tool_counts: dict[str, int] = field(default_factory=dict)
command_families: dict[str, int] = field(default_factory=dict)
models: dict[str, int] = field(default_factory=dict)
providers: dict[str, int] = field(default_factory=dict)
projects: dict[str, int] = field(default_factory=dict)
goal_categories: dict[str, int] = field(default_factory=dict)
friction: dict[str, int] = field(default_factory=dict)
outcomes: dict[str, int] = field(default_factory=dict)
session_summaries: list[dict[str, str]] = field(default_factory=list)
def parse_args() -> argparse.Namespace:
script_dir = Path(__file__).resolve().parent
home = Path(os.path.expanduser("~"))
parser = argparse.ArgumentParser(
description="Generate a Codex CLI usage report from ~/.codex/sessions.",
)
parser.add_argument(
"--sessions-dir",
type=Path,
default=home / ".codex" / "sessions",
help="Directory containing Codex session rollout JSONL files.",
)
parser.add_argument(
"--cache-dir",
type=Path,
default=script_dir / "codex-insights-output",
help="Directory for the generated HTML report and JSON export.",
)
parser.add_argument(
"--project-path-prefix",
type=str,
default=None,
help=(
"Only include sessions whose cwd matches this path, a descendant, "
"or a related repo/worktree/fork in the same repo family."
),
)
parser.add_argument(
"--output-html",
type=Path,
default=None,
help="Path for the generated HTML report. Defaults to <cache-dir>/report.html.",
)
parser.add_argument(
"--output-json",
type=Path,
default=None,
help="Path for the JSON export. Defaults to <cache-dir>/report.json.",
)
return parser.parse_args()
def ensure_dir(path: Path) -> None:
path.mkdir(parents=True, exist_ok=True)
def parse_iso_timestamp(value: str | None) -> datetime:
if not value:
return datetime.fromtimestamp(0, tz=timezone.utc)
normalized = value
if normalized.endswith("Z"):
normalized = normalized[:-1] + "+00:00"
try:
return datetime.fromisoformat(normalized)
except ValueError:
return datetime.fromtimestamp(0, tz=timezone.utc)
def iso_date(value: str) -> str:
return parse_iso_timestamp(value).date().isoformat()
def truncate(text: str, length: int) -> str:
collapsed = " ".join(text.split())
if len(collapsed) <= length:
return collapsed
return collapsed[: max(0, length - 1)].rstrip() + "..."
def safe_title(value: str) -> str:
return LABEL_MAP.get(value, value.replace("_", " ").title())
def normalize_path_for_match(path_text: str) -> str:
return os.path.normpath(os.path.realpath(os.path.expanduser(path_text)))
def run_git(args: list[str], cwd: str) -> str | None:
try:
completed = subprocess.run(
["git", *args],
cwd=cwd,
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
text=True,
)
except Exception:
return None
return completed.stdout.strip()
@lru_cache(maxsize=512)
def get_repo_identity(path_text: str) -> RepoIdentity | None:
normalized = normalize_path_for_match(path_text)
top_level = run_git(["rev-parse", "--show-toplevel"], normalized)
if not top_level:
return None
common_dir = run_git(["rev-parse", "--git-common-dir"], normalized)
if common_dir and not os.path.isabs(common_dir):
common_dir = normalize_path_for_match(
os.path.join(normalized, common_dir)
)
elif common_dir:
common_dir = normalize_path_for_match(common_dir)
remote_lines = run_git(["remote", "-v"], normalized) or ""
remotes: set[str] = set()
for line in remote_lines.splitlines():
parts = line.split()
if len(parts) >= 2:
remotes.add(parts[1].removesuffix(".git"))
worktree_text = (
run_git(["worktree", "list", "--porcelain"], normalized) or ""
)
worktrees: list[str] = []
for line in worktree_text.splitlines():
if line.startswith("worktree "):
worktrees.append(
normalize_path_for_match(
line.removeprefix("worktree ").strip()
)
)
return RepoIdentity(
root=normalize_path_for_match(top_level),
common_dir=common_dir,
remotes=frozenset(remotes),
worktrees=tuple(worktrees),
)
def discover_git_roots(base_path: Path) -> set[str]:
discovered: set[str] = set()
if not base_path.exists():
return discovered
skip_dirs = {
".git",
".venv",
"node_modules",
"__pycache__",
".pytest_cache",
".ruff_cache",
}
for root, dirs, files in os.walk(base_path):
dirs[:] = [entry for entry in dirs if entry not in skip_dirs]
if ".git" in dirs or ".git" in files:
discovered.add(normalize_path_for_match(root))
dirs[:] = []
return discovered
def build_project_scope(prefix: str | None) -> ProjectScope:
if not prefix:
return ProjectScope(
target_prefix=None,
path_prefixes=frozenset(),
common_dirs=frozenset(),
remotes=frozenset(),
)
normalized_prefix = normalize_path_for_match(prefix)
path_prefixes: set[str] = {normalized_prefix}
common_dirs: set[str] = set()
remotes: set[str] = set()
candidate_roots = discover_git_roots(Path(normalized_prefix))
direct_identity = get_repo_identity(normalized_prefix)
if direct_identity:
candidate_roots.add(direct_identity.root)
for repo_root in candidate_roots:
identity = get_repo_identity(repo_root)
if not identity:
continue
path_prefixes.add(identity.root)
path_prefixes.update(identity.worktrees)
if identity.common_dir:
common_dirs.add(identity.common_dir)
remotes.update(identity.remotes)
return ProjectScope(
target_prefix=normalized_prefix,
path_prefixes=frozenset(path_prefixes),
common_dirs=frozenset(common_dirs),
remotes=frozenset(remotes),
)
def matches_project_scope(cwd: str, scope: ProjectScope) -> bool:
if scope.target_prefix is None:
return True
if not cwd:
return False
normalized_cwd = normalize_path_for_match(cwd)
for prefix in scope.path_prefixes:
if normalized_cwd == prefix or normalized_cwd.startswith(
prefix + os.sep
):
return True
identity = get_repo_identity(normalized_cwd)
if not identity:
return False
if identity.common_dir and identity.common_dir in scope.common_dirs:
return True
return bool(scope.remotes and identity.remotes.intersection(scope.remotes))
def parse_tool_arguments(raw: str | None) -> dict[str, Any]:
if not raw:
return {}
try:
value = json.loads(raw)
except Exception:
return {}
return value if isinstance(value, dict) else {}
def extract_message_text(content: Any) -> str:
if not isinstance(content, list):
return ""
parts: list[str] = []
for item in content:
if not isinstance(item, dict):
continue
if item.get("type") in {"output_text", "input_text"}:
text = item.get("text")
if isinstance(text, str) and text.strip():
parts.append(text.strip())
return "\n".join(parts)
def is_env_assignment(token: str) -> bool:
return bool(re.match(r"^[A-Za-z_][A-Za-z0-9_]*=.*$", token))
def command_tokens(command_text: str) -> list[str]:
if not command_text.strip():
return []
try:
tokens = shlex.split(command_text)
except ValueError:
tokens = command_text.split()
while tokens and is_env_assignment(tokens[0]):
tokens.pop(0)
return tokens
def command_family(command_text: str) -> str:
tokens = command_tokens(command_text)
if not tokens:
return ""
return Path(tokens[0]).name
def extract_exec_command_text(payload: dict[str, Any]) -> str:
command = payload.get("command")
if isinstance(command, list):
if len(command) >= 3 and command[1] == "-lc":
return str(command[2])
return " ".join(str(part) for part in command)
if isinstance(command, str):
return command
return ""
def detect_goal_categories(session: CodexSession) -> dict[str, int]:
counts: Counter[str] = Counter()
if session.user_message_count <= 1 and session.shell_command_count <= 1:
return {"warmup_minimal": 1}
for text in session.user_messages:
for category, patterns in GOAL_PATTERNS.items():
if any(pattern.search(text) for pattern in patterns):
counts[category] += 1
if session.test_commands > 0:
counts["write_tests"] += 1
if (
session.git_commits > 0
or session.gh_commands > 0
or session.git_pushes > 0
):
counts["create_pr_commit"] += 1
if session.web_search_count > 0 and not counts:
counts["understand_codebase"] += 1
if session.command_failures > 0 and not counts:
counts["debug_investigate"] += 1
if session.shell_command_count > 0 and not counts:
counts["understand_codebase"] += 1
if not counts:
counts["warmup_minimal"] += 1
return dict(counts)
def detect_friction(session: CodexSession) -> dict[str, int]:
counts: Counter[str] = Counter()
if session.command_failures > 0:
counts["command_failed"] += session.command_failures
if session.interruptions > 0:
counts["turn_aborted"] += session.interruptions
if session.invalid_request_errors > 0:
counts["model_error"] += session.invalid_request_errors
if session.approval_requests > 0:
counts["approval_requested"] += session.approval_requests
return dict(counts)
def infer_outcome(session: CodexSession) -> str:
if (
session.final_answer_count > 0
and session.command_failures == 0
and session.interruptions == 0
):
return "completed_cleanly"
if session.final_answer_count > 0:
return "completed_with_retries"
if session.interruptions > 0:
return "interrupted"
return "incomplete"
def summarize_assistant(session: CodexSession) -> str:
if session.final_answer:
return truncate(session.final_answer, 140)
if session.assistant_summaries:
return truncate(session.assistant_summaries[0], 140)
if session.first_prompt:
return truncate(session.first_prompt, 140)
return "No assistant summary captured."
def parse_session_file(
session_path: Path, scope: ProjectScope
) -> CodexSession | None:
session_id = session_path.stem.split("-")[-1]
cwd = ""
model = ""
provider = ""
cli_version = ""
first_seen_ts = ""
last_seen_ts = ""
latest_input_tokens = 0
latest_output_tokens = 0
latest_reasoning_tokens = 0
user_messages: list[str] = []
assistant_summaries: list[str] = []
command_failure_examples: list[str] = []
tool_counts: Counter[str] = Counter()
command_families: Counter[str] = Counter()
shell_command_count = 0
command_failures = 0
web_search_count = 0
web_open_count = 0
approval_requests = 0
spawn_agent_count = 0
mcp_call_count = 0
invalid_request_errors = 0
interruptions = 0
assistant_message_count = 0
final_answer_count = 0
commentary_count = 0
git_commits = 0
git_pushes = 0
gh_commands = 0
test_commands = 0
final_answer = ""
scope_match = scope.target_prefix is None
with session_path.open("r", encoding="utf-8") as handle:
for raw_line in handle:
if "invalid_request_error" in raw_line:
invalid_request_errors += 1
line = raw_line.strip()
if not line:
continue
try:
event = json.loads(line)
except json.JSONDecodeError:
continue
timestamp = str(event.get("timestamp") or "")
if timestamp:
if not first_seen_ts:
first_seen_ts = timestamp
last_seen_ts = timestamp
payload = event.get("payload")
if not isinstance(payload, dict):
continue
event_type = event.get("type")
if event_type == "session_meta":
session_id = str(payload.get("id") or session_id)
cwd = str(payload.get("cwd") or cwd)
provider = str(payload.get("model_provider") or provider)
cli_version = str(payload.get("cli_version") or cli_version)
first_seen_ts = str(payload.get("timestamp") or first_seen_ts)
if cwd and matches_project_scope(cwd, scope):
scope_match = True
continue
if event_type == "turn_context":
turn_cwd = str(payload.get("cwd") or "")
if turn_cwd:
cwd = turn_cwd
if matches_project_scope(cwd, scope):
scope_match = True
if not model:
model = str(payload.get("model") or "")
continue
if event_type == "event_msg":
inner_type = payload.get("type")
if inner_type == "user_message":
message = str(payload.get("message") or "").strip()
if message:
user_messages.append(message)
elif inner_type == "exec_command_end":
command_text = extract_exec_command_text(payload)
if command_text:
shell_command_count += 1
family = command_family(command_text)
if family:
command_families[family] += 1
lowered = command_text.lower()
if GIT_COMMIT_RE.search(command_text):
git_commits += 1
if GIT_PUSH_RE.search(command_text):
git_pushes += 1
if GH_RE.search(command_text):
gh_commands += 1
if TEST_COMMAND_RE.search(lowered):
test_commands += 1
exit_code = payload.get("exit_code")
if isinstance(exit_code, int) and exit_code != 0:
command_failures += 1
if command_text and len(command_failure_examples) < 5:
command_failure_examples.append(
truncate(command_text, 120)
)
elif inner_type == "web_search_end":
web_search_count += 1
action = payload.get("action")
if (
isinstance(action, dict)
and action.get("type") == "open_page"
):
web_open_count += 1
elif inner_type == "turn_aborted":
interruptions += 1
elif inner_type == "token_count":
info = payload.get("info")
if isinstance(info, dict):
totals = info.get("total_token_usage")
if isinstance(totals, dict):
latest_input_tokens = int(
totals.get("input_tokens") or 0
)
latest_output_tokens = int(
totals.get("output_tokens") or 0
)
latest_reasoning_tokens = int(
totals.get("reasoning_output_tokens") or 0
)
continue
if event_type != "response_item":
continue
item_type = payload.get("type")
if item_type == "function_call":
name = str(payload.get("name") or "")
if name:
tool_counts[name] += 1
if name.startswith("mcp__") or name in MCP_TOOL_NAMES:
mcp_call_count += 1
if name == "spawn_agent":
spawn_agent_count += 1
args = parse_tool_arguments(payload.get("arguments"))
if args.get("sandbox_permissions") == "require_escalated":
approval_requests += 1
elif item_type == "web_search_call":
tool_counts["web_search"] += 1
elif item_type == "message" and payload.get("role") == "assistant":
text = extract_message_text(payload.get("content"))
if text:
assistant_message_count += 1
phase = str(payload.get("phase") or "")
if phase == "commentary":
commentary_count += 1
elif phase == "final_answer":
final_answer_count += 1
final_answer = text
if phase != "commentary" and len(assistant_summaries) < 5:
assistant_summaries.append(text)
if not scope_match:
return None
start_dt = parse_iso_timestamp(first_seen_ts)
end_dt = parse_iso_timestamp(last_seen_ts or first_seen_ts)
duration_minutes = max(0.0, (end_dt - start_dt).total_seconds() / 60.0)
session = CodexSession(
session_id=session_id,
file_path=str(session_path),
start_time=start_dt.isoformat(),
end_time=end_dt.isoformat(),
cwd=cwd,
model=model,
model_provider=provider,
cli_version=cli_version,
duration_minutes=duration_minutes,
user_message_count=len(user_messages),
assistant_message_count=assistant_message_count,
final_answer_count=final_answer_count,
commentary_count=commentary_count,
shell_command_count=shell_command_count,
command_failures=command_failures,
web_search_count=web_search_count,
web_open_count=web_open_count,
approval_requests=approval_requests,
spawn_agent_count=spawn_agent_count,
mcp_call_count=mcp_call_count,
invalid_request_errors=invalid_request_errors,
interruptions=interruptions,
total_input_tokens=latest_input_tokens,
total_output_tokens=latest_output_tokens,
total_reasoning_tokens=latest_reasoning_tokens,
git_commits=git_commits,
git_pushes=git_pushes,
gh_commands=gh_commands,
test_commands=test_commands,
tool_counts=dict(tool_counts),
command_families=dict(command_families),
user_messages=user_messages,
assistant_summaries=[
truncate(text, 160) for text in assistant_summaries
],
command_failure_examples=command_failure_examples,
first_prompt=truncate(user_messages[0], 160) if user_messages else "",
final_answer=truncate(final_answer, 800) if final_answer else "",
)
session.goal_categories = detect_goal_categories(session)
session.friction = detect_friction(session)
session.outcome = infer_outcome(session)
return session
def scan_all_sessions(
sessions_dir: Path, scope: ProjectScope
) -> list[CodexSession]:
session_files = sorted(sessions_dir.rglob("*.jsonl"))
sessions: list[CodexSession] = []
for session_file in session_files:
parsed = parse_session_file(session_file, scope)
if parsed is not None:
sessions.append(parsed)
sessions.sort(key=lambda item: item.start_time, reverse=True)
return sessions
def top_entries(
data: dict[str, int], limit: int = 5, exclude: set[str] | None = None
) -> list[tuple[str, int]]:
blocked = exclude or set()
return [
(key, value)
for key, value in sorted(
data.items(), key=lambda item: item[1], reverse=True
)
if value > 0 and key not in blocked
][:limit]
def project_label(path_text: str) -> str:
parts = Path(path_text).parts
if len(parts) >= 2:
return "/".join(parts[-2:])
if parts:
return parts[-1]
return path_text or "(unknown)"
def aggregate_sessions(sessions: list[CodexSession]) -> AggregatedData:
aggregated = AggregatedData(
total_sessions=len(sessions),
date_range={"start": "", "end": ""},
)
dates: list[str] = []
for session in sessions:
dates.append(session.start_time)
aggregated.total_user_messages += session.user_message_count
aggregated.total_assistant_messages += session.assistant_message_count
aggregated.total_duration_hours += session.duration_minutes / 60.0
aggregated.total_input_tokens += session.total_input_tokens
aggregated.total_output_tokens += session.total_output_tokens
aggregated.total_reasoning_tokens += session.total_reasoning_tokens
aggregated.total_shell_commands += session.shell_command_count
aggregated.total_command_failures += session.command_failures
aggregated.total_web_searches += session.web_search_count
aggregated.total_web_opens += session.web_open_count
aggregated.total_approval_requests += session.approval_requests
aggregated.total_spawn_agents += session.spawn_agent_count
aggregated.total_mcp_calls += session.mcp_call_count
aggregated.total_interruptions += session.interruptions
aggregated.total_invalid_request_errors += (
session.invalid_request_errors
)
aggregated.git_commits += session.git_commits
aggregated.git_pushes += session.git_pushes
aggregated.gh_commands += session.gh_commands
aggregated.test_commands += session.test_commands
aggregated.sessions_with_mcp += int(session.mcp_call_count > 0)
aggregated.sessions_with_subagents += int(
session.spawn_agent_count > 0
)
aggregated.sessions_with_web_search += int(
session.web_search_count > 0
)
if session.cwd:
aggregated.projects[session.cwd] = (
aggregated.projects.get(session.cwd, 0) + 1
)
if session.model:
aggregated.models[session.model] = (
aggregated.models.get(session.model, 0) + 1
)
if session.model_provider:
aggregated.providers[session.model_provider] = (
aggregated.providers.get(session.model_provider, 0) + 1
)
aggregated.outcomes[session.outcome] = (
aggregated.outcomes.get(session.outcome, 0) + 1
)
for key, count in session.tool_counts.items():
aggregated.tool_counts[key] = (
aggregated.tool_counts.get(key, 0) + count
)
for key, count in session.command_families.items():
aggregated.command_families[key] = (
aggregated.command_families.get(key, 0) + count
)
for key, count in session.goal_categories.items():
aggregated.goal_categories[key] = (
aggregated.goal_categories.get(key, 0) + count
)
for key, count in session.friction.items():
aggregated.friction[key] = aggregated.friction.get(key, 0) + count
if len(aggregated.session_summaries) < 50:
aggregated.session_summaries.append(
{
"id": session.session_id[:8],
"date": iso_date(session.start_time),
"cwd": session.cwd,
"project": project_label(session.cwd),
"prompt": session.first_prompt,
"summary": summarize_assistant(session),
"outcome": session.outcome,
"failures": str(session.command_failures),
}
)
if dates:
dates.sort()
aggregated.date_range["start"] = iso_date(dates[0])
aggregated.date_range["end"] = iso_date(dates[-1])
return aggregated
def build_at_a_glance(data: AggregatedData) -> dict[str, str]:
top_goal = top_entries(
data.goal_categories, limit=1, exclude={"warmup_minimal"}
)
top_project = top_entries(data.projects, limit=1)
top_tool = top_entries(data.tool_counts, limit=3)
top_command = top_entries(data.command_families, limit=3)
top_friction = top_entries(data.friction, limit=1)
work_text = (
"Most sessions are broad repo work rather than a single repeated task."
)
if top_goal:
work_text = f"Most Codex work in this slice is about {safe_title(top_goal[0][0]).lower()}."
if top_project:
work_text += f" The most common working directory is {project_label(top_project[0][0])}."
workflow_text = "Your workflow mixes repo search, shell execution, and short conversational steering."
if data.total_shell_commands >= max(10, data.total_sessions * 8):
workflow_text = "This is a terminal-first Codex workflow: most value comes from command execution, not long-form chat."
if data.sessions_with_subagents > 0:
workflow_text += f" Subagents show up in {data.sessions_with_subagents} session(s), so delegation is present but selective."
external_text = "External context is limited."
if data.total_web_searches > 0 and data.total_mcp_calls == 0:
external_text = "External context is coming mostly from web search; MCP usage is rare in the captured sessions."
elif data.total_mcp_calls > 0:
external_text = "These sessions do use MCP-backed context rather than relying purely on shell and web search."
friction_text = "Measured friction is low."
if top_friction:
friction_key = top_friction[0][0]
friction_text = FRICTION_DESCRIPTIONS.get(
friction_key,
f"The main drag is {safe_title(friction_key).lower()}.",
)
tools_text = "Top tools are mixed."
if top_tool or top_command:
tool_names = ", ".join(
safe_title(name) for name, _count in top_tool[:2]
)
command_names = ", ".join(name for name, _count in top_command[:2])
bits = [part for part in (tool_names, command_names) if part]
if bits:
tools_text = f"The most-used levers are {bits[0]}"
if len(bits) > 1:
tools_text += f", with command families like {bits[1]}"
tools_text += "."
return {
"what_you_do": work_text,
"how_you_work": workflow_text,
"external_context": external_text,
"what_slows_you_down": friction_text,
"most_used_levers": tools_text,
}
def build_insights(data: AggregatedData) -> dict[str, Any]:
return {
"at_a_glance": build_at_a_glance(data),
"top_projects": [
{
"path": path,
"label": project_label(path),
"session_count": count,
}
for path, count in top_entries(data.projects, limit=8)
],
"top_goals": [
{"goal": goal, "label": safe_title(goal), "count": count}
for goal, count in top_entries(
data.goal_categories, limit=8, exclude={"warmup_minimal"}
)
],
"top_tools": [
{"tool": tool, "label": safe_title(tool), "count": count}
for tool, count in top_entries(data.tool_counts, limit=10)
],
"top_commands": [
{"command": name, "count": count}
for name, count in top_entries(data.command_families, limit=10)
],
"friction": [
{
"category": key,
"label": safe_title(key),
"count": count,
"description": FRICTION_DESCRIPTIONS.get(key, ""),
}
for key, count in top_entries(data.friction, limit=8)
],
}
def escape_html(text: str) -> str:
return html.escape(text or "")
def generate_bar_chart(
data: dict[str, int],
color: str,
max_items: int = 6,
label_fn: Any | None = None,
) -> str:
entries = top_entries(data, limit=max_items)
if not entries:
return '<p class="empty">No data</p>'
max_value = max(count for _label, count in entries) or 1
rows: list[str] = []
for label, count in entries:
display = label_fn(label) if label_fn else safe_title(label)
width = (count / max_value) * 100
rows.append(
f'<div class="bar-row"><div class="bar-label">{escape_html(str(display))}</div>'
f'<div class="bar-track"><div class="bar-fill" style="width:{width:.2f}%;background:{color}"></div></div>'
f'<div class="bar-value">{count}</div></div>'
)
return "\n".join(rows)
def generate_html_report(
data: AggregatedData,
insights: dict[str, Any],
project_scope_prefix: str | None,
) -> str:
glance = insights["at_a_glance"]
scope_label = project_scope_prefix or "All Codex sessions"
html_parts = [
"<!doctype html>",
'<html lang="en">',
"<head>",
'<meta charset="utf-8">',
'<meta name="viewport" content="width=device-width, initial-scale=1">',
"<title>Codex Insights Report</title>",
"<style>",
"body{margin:0;font-family:ui-sans-serif,system-ui,-apple-system,BlinkMacSystemFont,'Segoe UI',sans-serif;background:#f8fafc;color:#0f172a;}",
".page{max-width:1180px;margin:0 auto;padding:40px 24px 56px;}",
".hero{padding:28px 30px;border-radius:22px;background:linear-gradient(135deg,#0f172a,#1d4ed8 60%,#22c55e);color:#eff6ff;box-shadow:0 24px 80px rgba(15,23,42,.18);}",
".hero h1{margin:0 0 8px;font-size:34px;line-height:1.05;}",
".hero p{margin:0;color:#dbeafe;max-width:880px;}",
".scope{margin-top:12px;font-size:13px;color:#bfdbfe;}",
".glance{margin-top:20px;padding:18px 20px;border-radius:18px;background:rgba(255,255,255,.10);border:1px solid rgba(255,255,255,.16);}",
".glance-line{margin:0 0 10px;}",
".stats{display:grid;grid-template-columns:repeat(auto-fit,minmax(150px,1fr));gap:14px;margin:22px 0 30px;}",
".stat{background:#fff;border:1px solid #e2e8f0;border-radius:16px;padding:16px 18px;box-shadow:0 8px 30px rgba(15,23,42,.05);}",
".stat-value{font-size:28px;font-weight:700;}",
".stat-label{margin-top:4px;font-size:12px;text-transform:uppercase;letter-spacing:.08em;color:#64748b;}",
".grid{display:grid;grid-template-columns:repeat(auto-fit,minmax(320px,1fr));gap:18px;}",
".panel{background:#fff;border:1px solid #e2e8f0;border-radius:18px;padding:18px;box-shadow:0 8px 30px rgba(15,23,42,.05);}",
".panel h2{margin:0 0 14px;font-size:15px;text-transform:uppercase;letter-spacing:.08em;color:#64748b;}",
".bar-row{display:flex;align-items:center;gap:10px;margin-bottom:10px;}",
".bar-label{width:140px;font-size:13px;line-height:1.25;color:#1e293b;}",
".bar-track{flex:1;height:10px;border-radius:999px;background:#e2e8f0;overflow:hidden;}",
".bar-fill{height:100%;border-radius:999px;}",
".bar-value{width:34px;text-align:right;font-size:12px;color:#475569;}",
".empty{margin:0;color:#94a3b8;}",
".sessions{margin-top:26px;background:#fff;border:1px solid #e2e8f0;border-radius:18px;padding:18px;box-shadow:0 8px 30px rgba(15,23,42,.05);}",
".sessions h2{margin:0 0 14px;font-size:15px;text-transform:uppercase;letter-spacing:.08em;color:#64748b;}",
".session{padding:14px 0;border-top:1px solid #e2e8f0;}",
".session:first-of-type{border-top:none;padding-top:0;}",
".session-top{display:flex;justify-content:space-between;gap:12px;flex-wrap:wrap;}",
".session-project{font-weight:700;}",
".session-meta{font-size:12px;color:#64748b;}",
".session-prompt,.session-summary{margin:6px 0 0;font-size:14px;line-height:1.45;}",
".footer{margin-top:28px;font-size:12px;color:#64748b;}",
"@media (max-width:700px){.hero h1{font-size:28px}.bar-label{width:110px}}",
"</style>",
"</head>",
"<body>",
'<div class="page">',
'<div class="hero">',
"<h1>Codex Insights</h1>",
f"<p>{escape_html(data.date_range.get('start', ''))} to {escape_html(data.date_range.get('end', ''))}. Built from local Codex rollout sessions.</p>",
f'<div class="scope">Scope: {escape_html(scope_label)}</div>',
'<div class="glance">',
f'<p class="glance-line"><strong>What you do:</strong> {escape_html(glance["what_you_do"])}</p>',
f'<p class="glance-line"><strong>How you work:</strong> {escape_html(glance["how_you_work"])}</p>',
f'<p class="glance-line"><strong>External context:</strong> {escape_html(glance["external_context"])}</p>',
f'<p class="glance-line"><strong>What slows you down:</strong> {escape_html(glance["what_slows_you_down"])}</p>',
f'<p class="glance-line"><strong>Most-used levers:</strong> {escape_html(glance["most_used_levers"])}</p>',
"</div>",
"</div>",
'<div class="stats">',
f'<div class="stat"><div class="stat-value">{data.total_sessions}</div><div class="stat-label">Sessions</div></div>',
f'<div class="stat"><div class="stat-value">{round(data.total_duration_hours, 1)}</div><div class="stat-label">Hours</div></div>',
f'<div class="stat"><div class="stat-value">{data.total_user_messages}</div><div class="stat-label">User Messages</div></div>',
f'<div class="stat"><div class="stat-value">{data.total_shell_commands}</div><div class="stat-label">Shell Commands</div></div>',
f'<div class="stat"><div class="stat-value">{data.total_command_failures}</div><div class="stat-label">Command Failures</div></div>',
f'<div class="stat"><div class="stat-value">{data.total_web_searches}</div><div class="stat-label">Web Searches</div></div>',
f'<div class="stat"><div class="stat-value">{data.total_spawn_agents}</div><div class="stat-label">Spawn Agents</div></div>',
f'<div class="stat"><div class="stat-value">{data.total_mcp_calls}</div><div class="stat-label">MCP Calls</div></div>',
"</div>",
'<div class="grid">',
f'<div class="panel"><h2>Projects</h2>{generate_bar_chart(data.projects, "#2563eb", label_fn=project_label)}</div>',
f'<div class="panel"><h2>Goals</h2>{generate_bar_chart(data.goal_categories, "#0f766e")}</div>',
f'<div class="panel"><h2>Tools</h2>{generate_bar_chart(data.tool_counts, "#9333ea")}</div>',
f'<div class="panel"><h2>Command Families</h2>{generate_bar_chart(data.command_families, "#ea580c", label_fn=lambda value: value)}</div>',
f'<div class="panel"><h2>Models</h2>{generate_bar_chart(data.models, "#16a34a", label_fn=lambda value: value)}</div>',
f'<div class="panel"><h2>Friction</h2>{generate_bar_chart(data.friction, "#dc2626")}</div>',
"</div>",
'<div class="sessions">',
"<h2>Recent Sessions</h2>",
]
for item in data.session_summaries[:18]:
html_parts.extend(
[
'<div class="session">',
'<div class="session-top">',
f'<div class="session-project">{escape_html(item["project"])}</div>',
f'<div class="session-meta">{escape_html(item["date"])} | {escape_html(safe_title(item["outcome"]))} | failures: {escape_html(item["failures"])}</div>',
"</div>",
f'<p class="session-prompt"><strong>Prompt:</strong> {escape_html(item["prompt"] or "(none captured)")}</p>',
f'<p class="session-summary"><strong>Summary:</strong> {escape_html(item["summary"])}</p>',
"</div>",
]
)
html_parts.extend(
[
"</div>",
f'<div class="footer">Input tokens: {data.total_input_tokens:,} | Output tokens: {data.total_output_tokens:,} | Reasoning tokens: {data.total_reasoning_tokens:,}</div>',
"</div>",
"</body>",
"</html>",
]
)
return "\n".join(html_parts)
def build_export_data(
data: AggregatedData,
insights: dict[str, Any],
sessions: list[CodexSession],
project_scope_prefix: str | None,
) -> dict[str, Any]:
session_details = []
for session in sessions:
session_details.append(
{
"session_id": session.session_id,
"file_path": session.file_path,
"start_time": session.start_time,
"end_time": session.end_time,
"cwd": session.cwd,
"model": session.model,
"model_provider": session.model_provider,
"cli_version": session.cli_version,
"duration_minutes": round(session.duration_minutes, 2),
"first_prompt": session.first_prompt,
"outcome": session.outcome,
"tool_counts": session.tool_counts,
"command_families": session.command_families,
"goal_categories": session.goal_categories,
"friction": session.friction,
"command_failures": session.command_failures,
"web_search_count": session.web_search_count,
"mcp_call_count": session.mcp_call_count,
"spawn_agent_count": session.spawn_agent_count,
}
)
return {
"metadata": {
"generated_at": datetime.now(tz=timezone.utc).isoformat(),
"report_type": "codex-insights",
"date_range": data.date_range,
"session_count": data.total_sessions,
"project_scope_prefix": project_scope_prefix,
},
"aggregated_data": asdict(data),
"insights": insights,
"sessions": session_details,
}
def generate_report(args: argparse.Namespace) -> dict[str, Any]:
ensure_dir(args.cache_dir)
scope = build_project_scope(args.project_path_prefix)
sessions = scan_all_sessions(args.sessions_dir, scope)
aggregated = aggregate_sessions(sessions)
insights = build_insights(aggregated)
output_html = args.output_html or (args.cache_dir / "report.html")
output_json = args.output_json or (args.cache_dir / "report.json")
ensure_dir(output_html.parent)
ensure_dir(output_json.parent)
output_html.write_text(
generate_html_report(aggregated, insights, args.project_path_prefix),
encoding="utf-8",
)
output_json.write_text(
json.dumps(
build_export_data(
aggregated, insights, sessions, args.project_path_prefix
),
indent=2,
),
encoding="utf-8",
)
return {
"html_path": output_html,
"json_path": output_json,
"data": aggregated,
"insights": insights,
}
def print_summary(result: dict[str, Any]) -> None:
data: AggregatedData = result["data"]
glance = result["insights"]["at_a_glance"]
print(f"Wrote HTML report: {result['html_path']}")
print(f"Wrote JSON export: {result['json_path']}")
print(
f"Analyzed {data.total_sessions} sessions "
f"({data.total_user_messages} user messages, {round(data.total_duration_hours, 1)}h) "
f"from {data.date_range.get('start', '')} to {data.date_range.get('end', '')}"
)
print(f"What you do: {glance['what_you_do']}")
print(f"What slows you down: {glance['what_slows_you_down']}")
def main() -> int:
args = parse_args()
result = generate_report(args)
print_summary(result)
return 0
if __name__ == "__main__":
raise SystemExit(main())