# /// script # requires-python = ">=3.11" # /// """Portable Codex CLI usage report generator. This script scans Codex session JSONL files under ``~/.codex/sessions`` and builds a deterministic HTML report plus JSON export. It is intentionally Codex-specific: 1. Session discovery uses Codex rollout files, not Claude transcripts. 2. Metrics are derived from Codex event types like ``user_message``, ``function_call``, ``exec_command_end``, and ``web_search_end``. 3. Project scoping matches session ``cwd`` values, with repo-family matching for worktrees and related clones. """ from __future__ import annotations import argparse import html import json import os import re import shlex import subprocess from collections import Counter from dataclasses import asdict, dataclass, field from datetime import datetime, timezone from functools import lru_cache from pathlib import Path from typing import Any LABEL_MAP: dict[str, str] = { "debug_investigate": "Debug/Investigate", "implement_feature": "Implement Feature", "fix_bug": "Fix Bug", "write_script_tool": "Write Script/Tool", "refactor_code": "Refactor Code", "configure_system": "Configure System", "create_pr_commit": "Create PR/Commit", "analyze_data": "Analyze Data", "understand_codebase": "Understand Codebase", "write_tests": "Write Tests", "write_docs": "Write Docs", "warmup_minimal": "Quick Check", "command_failed": "Command Failed", "turn_aborted": "Turn Aborted", "model_error": "Model Error", "approval_requested": "Approval Requested", "completed_cleanly": "Completed Cleanly", "completed_with_retries": "Completed With Retries", "interrupted": "Interrupted", "incomplete": "Incomplete", "web_search": "Web Search", "exec_command": "Exec Command", "shell": "Shell", "shell_command": "Shell Command", "spawn_agent": "Spawn Agent", "update_plan": "Update Plan", "request_user_input": "Request User Input", } GOAL_PATTERNS: dict[str, list[re.Pattern[str]]] = { "debug_investigate": [ re.compile(r"\bdebug\b", re.IGNORECASE), re.compile(r"\binvestigat", re.IGNORECASE), re.compile(r"\btrace\b", re.IGNORECASE), re.compile(r"\berror\b", re.IGNORECASE), re.compile(r"\bwhy\b", re.IGNORECASE), re.compile(r"\bfail", re.IGNORECASE), ], "implement_feature": [ re.compile(r"\bimplement\b", re.IGNORECASE), re.compile(r"\bbuild\b", re.IGNORECASE), re.compile(r"\bfeature\b", re.IGNORECASE), re.compile(r"\badd\b", re.IGNORECASE), re.compile(r"\bcreate\b", re.IGNORECASE), ], "fix_bug": [ re.compile(r"\bfix\b", re.IGNORECASE), re.compile(r"\bbug\b", re.IGNORECASE), re.compile(r"\bbroken\b", re.IGNORECASE), re.compile(r"\bfailing\b", re.IGNORECASE), ], "write_script_tool": [ re.compile(r"\bscript\b", re.IGNORECASE), re.compile(r"\bcli\b", re.IGNORECASE), re.compile(r"\btool\b", re.IGNORECASE), re.compile(r"\bautomation\b", re.IGNORECASE), ], "refactor_code": [ re.compile(r"\brefactor\b", re.IGNORECASE), re.compile(r"\bcleanup\b", re.IGNORECASE), re.compile(r"\breorgan", re.IGNORECASE), re.compile(r"\bsimplif", re.IGNORECASE), ], "configure_system": [ re.compile(r"\bconfigure\b", re.IGNORECASE), re.compile(r"\bsetup\b", re.IGNORECASE), re.compile(r"\binstall\b", re.IGNORECASE), re.compile(r"\bconfig\b", re.IGNORECASE), re.compile(r"\benv\b", re.IGNORECASE), re.compile(r"\bci\b", re.IGNORECASE), ], "create_pr_commit": [ re.compile(r"\bcommit\b", re.IGNORECASE), re.compile(r"\bpull request\b", re.IGNORECASE), re.compile(r"\bpr\b", re.IGNORECASE), re.compile(r"\bmerge\b", re.IGNORECASE), re.compile(r"\bbranch\b", re.IGNORECASE), ], "analyze_data": [ re.compile(r"\banaly[sz]e\b", re.IGNORECASE), re.compile(r"\bmetrics\b", re.IGNORECASE), re.compile(r"\breport\b", re.IGNORECASE), re.compile(r"\binsights?\b", re.IGNORECASE), re.compile(r"\bdata\b", re.IGNORECASE), ], "understand_codebase": [ re.compile(r"\bunderstand\b", re.IGNORECASE), re.compile(r"\bexplain\b", re.IGNORECASE), re.compile(r"\bwalk ?through\b", re.IGNORECASE), re.compile(r"\bhow does\b", re.IGNORECASE), re.compile(r"\bwhere is\b", re.IGNORECASE), re.compile(r"\bfind\b", re.IGNORECASE), ], "write_tests": [ re.compile(r"\btests?\b", re.IGNORECASE), re.compile(r"\bpytest\b", re.IGNORECASE), re.compile(r"\bunit test\b", re.IGNORECASE), re.compile(r"\bintegration test\b", re.IGNORECASE), re.compile(r"\bbenchmark\b", re.IGNORECASE), ], "write_docs": [ re.compile(r"\breadme\b", re.IGNORECASE), re.compile(r"\bdocs?\b", re.IGNORECASE), re.compile(r"\bdocument", re.IGNORECASE), ], } FRICTION_DESCRIPTIONS: dict[str, str] = { "command_failed": "Shell execution is the main source of drag in these sessions.", "turn_aborted": "You are redirecting or interrupting turns before they land cleanly.", "model_error": "The model/runtime occasionally trips over its own protocol or request handling.", "approval_requested": "Some sessions slow down because they need explicit privilege escalation.", } MCP_TOOL_NAMES = { "list_mcp_resources", "list_mcp_resource_templates", "read_mcp_resource", } SHELL_TOOL_NAMES = {"exec_command", "shell", "shell_command"} TEST_COMMAND_RE = re.compile( r"\b(pytest|npm test|pnpm test|yarn test|cargo test|go test|vitest|jest|ruff|mypy|gradle test|mvn test)\b", re.IGNORECASE, ) GIT_COMMIT_RE = re.compile(r"(^|[;&|]\s*|\s)git\s+commit\b", re.IGNORECASE) GIT_PUSH_RE = re.compile(r"(^|[;&|]\s*|\s)git\s+push\b", re.IGNORECASE) GH_RE = re.compile(r"(^|[;&|]\s*|\s)gh\b", re.IGNORECASE) @dataclass(frozen=True) class RepoIdentity: root: str common_dir: str | None remotes: frozenset[str] worktrees: tuple[str, ...] = () @dataclass(frozen=True) class ProjectScope: target_prefix: str | None path_prefixes: frozenset[str] common_dirs: frozenset[str] remotes: frozenset[str] @dataclass class CodexSession: session_id: str file_path: str start_time: str end_time: str cwd: str model: str model_provider: str cli_version: str duration_minutes: float user_message_count: int assistant_message_count: int final_answer_count: int commentary_count: int shell_command_count: int command_failures: int web_search_count: int web_open_count: int approval_requests: int spawn_agent_count: int mcp_call_count: int invalid_request_errors: int interruptions: int total_input_tokens: int total_output_tokens: int total_reasoning_tokens: int git_commits: int git_pushes: int gh_commands: int test_commands: int tool_counts: dict[str, int] = field(default_factory=dict) command_families: dict[str, int] = field(default_factory=dict) goal_categories: dict[str, int] = field(default_factory=dict) friction: dict[str, int] = field(default_factory=dict) user_messages: list[str] = field(default_factory=list) assistant_summaries: list[str] = field(default_factory=list) command_failure_examples: list[str] = field(default_factory=list) first_prompt: str = "" final_answer: str = "" outcome: str = "" @dataclass class AggregatedData: total_sessions: int date_range: dict[str, str] total_user_messages: int = 0 total_assistant_messages: int = 0 total_duration_hours: float = 0.0 total_input_tokens: int = 0 total_output_tokens: int = 0 total_reasoning_tokens: int = 0 total_shell_commands: int = 0 total_command_failures: int = 0 total_web_searches: int = 0 total_web_opens: int = 0 total_approval_requests: int = 0 total_spawn_agents: int = 0 total_mcp_calls: int = 0 total_interruptions: int = 0 total_invalid_request_errors: int = 0 git_commits: int = 0 git_pushes: int = 0 gh_commands: int = 0 test_commands: int = 0 sessions_with_mcp: int = 0 sessions_with_subagents: int = 0 sessions_with_web_search: int = 0 tool_counts: dict[str, int] = field(default_factory=dict) command_families: dict[str, int] = field(default_factory=dict) models: dict[str, int] = field(default_factory=dict) providers: dict[str, int] = field(default_factory=dict) projects: dict[str, int] = field(default_factory=dict) goal_categories: dict[str, int] = field(default_factory=dict) friction: dict[str, int] = field(default_factory=dict) outcomes: dict[str, int] = field(default_factory=dict) session_summaries: list[dict[str, str]] = field(default_factory=list) def parse_args() -> argparse.Namespace: script_dir = Path(__file__).resolve().parent home = Path(os.path.expanduser("~")) parser = argparse.ArgumentParser( description="Generate a Codex CLI usage report from ~/.codex/sessions.", ) parser.add_argument( "--sessions-dir", type=Path, default=home / ".codex" / "sessions", help="Directory containing Codex session rollout JSONL files.", ) parser.add_argument( "--cache-dir", type=Path, default=script_dir / "codex-insights-output", help="Directory for the generated HTML report and JSON export.", ) parser.add_argument( "--project-path-prefix", type=str, default=None, help=( "Only include sessions whose cwd matches this path, a descendant, " "or a related repo/worktree/fork in the same repo family." ), ) parser.add_argument( "--output-html", type=Path, default=None, help="Path for the generated HTML report. Defaults to /report.html.", ) parser.add_argument( "--output-json", type=Path, default=None, help="Path for the JSON export. Defaults to /report.json.", ) return parser.parse_args() def ensure_dir(path: Path) -> None: path.mkdir(parents=True, exist_ok=True) def parse_iso_timestamp(value: str | None) -> datetime: if not value: return datetime.fromtimestamp(0, tz=timezone.utc) normalized = value if normalized.endswith("Z"): normalized = normalized[:-1] + "+00:00" try: return datetime.fromisoformat(normalized) except ValueError: return datetime.fromtimestamp(0, tz=timezone.utc) def iso_date(value: str) -> str: return parse_iso_timestamp(value).date().isoformat() def truncate(text: str, length: int) -> str: collapsed = " ".join(text.split()) if len(collapsed) <= length: return collapsed return collapsed[: max(0, length - 1)].rstrip() + "..." def safe_title(value: str) -> str: return LABEL_MAP.get(value, value.replace("_", " ").title()) def normalize_path_for_match(path_text: str) -> str: return os.path.normpath(os.path.realpath(os.path.expanduser(path_text))) def run_git(args: list[str], cwd: str) -> str | None: try: completed = subprocess.run( ["git", *args], cwd=cwd, check=True, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True, ) except Exception: return None return completed.stdout.strip() @lru_cache(maxsize=512) def get_repo_identity(path_text: str) -> RepoIdentity | None: normalized = normalize_path_for_match(path_text) top_level = run_git(["rev-parse", "--show-toplevel"], normalized) if not top_level: return None common_dir = run_git(["rev-parse", "--git-common-dir"], normalized) if common_dir and not os.path.isabs(common_dir): common_dir = normalize_path_for_match( os.path.join(normalized, common_dir) ) elif common_dir: common_dir = normalize_path_for_match(common_dir) remote_lines = run_git(["remote", "-v"], normalized) or "" remotes: set[str] = set() for line in remote_lines.splitlines(): parts = line.split() if len(parts) >= 2: remotes.add(parts[1].removesuffix(".git")) worktree_text = ( run_git(["worktree", "list", "--porcelain"], normalized) or "" ) worktrees: list[str] = [] for line in worktree_text.splitlines(): if line.startswith("worktree "): worktrees.append( normalize_path_for_match( line.removeprefix("worktree ").strip() ) ) return RepoIdentity( root=normalize_path_for_match(top_level), common_dir=common_dir, remotes=frozenset(remotes), worktrees=tuple(worktrees), ) def discover_git_roots(base_path: Path) -> set[str]: discovered: set[str] = set() if not base_path.exists(): return discovered skip_dirs = { ".git", ".venv", "node_modules", "__pycache__", ".pytest_cache", ".ruff_cache", } for root, dirs, files in os.walk(base_path): dirs[:] = [entry for entry in dirs if entry not in skip_dirs] if ".git" in dirs or ".git" in files: discovered.add(normalize_path_for_match(root)) dirs[:] = [] return discovered def build_project_scope(prefix: str | None) -> ProjectScope: if not prefix: return ProjectScope( target_prefix=None, path_prefixes=frozenset(), common_dirs=frozenset(), remotes=frozenset(), ) normalized_prefix = normalize_path_for_match(prefix) path_prefixes: set[str] = {normalized_prefix} common_dirs: set[str] = set() remotes: set[str] = set() candidate_roots = discover_git_roots(Path(normalized_prefix)) direct_identity = get_repo_identity(normalized_prefix) if direct_identity: candidate_roots.add(direct_identity.root) for repo_root in candidate_roots: identity = get_repo_identity(repo_root) if not identity: continue path_prefixes.add(identity.root) path_prefixes.update(identity.worktrees) if identity.common_dir: common_dirs.add(identity.common_dir) remotes.update(identity.remotes) return ProjectScope( target_prefix=normalized_prefix, path_prefixes=frozenset(path_prefixes), common_dirs=frozenset(common_dirs), remotes=frozenset(remotes), ) def matches_project_scope(cwd: str, scope: ProjectScope) -> bool: if scope.target_prefix is None: return True if not cwd: return False normalized_cwd = normalize_path_for_match(cwd) for prefix in scope.path_prefixes: if normalized_cwd == prefix or normalized_cwd.startswith( prefix + os.sep ): return True identity = get_repo_identity(normalized_cwd) if not identity: return False if identity.common_dir and identity.common_dir in scope.common_dirs: return True return bool(scope.remotes and identity.remotes.intersection(scope.remotes)) def parse_tool_arguments(raw: str | None) -> dict[str, Any]: if not raw: return {} try: value = json.loads(raw) except Exception: return {} return value if isinstance(value, dict) else {} def extract_message_text(content: Any) -> str: if not isinstance(content, list): return "" parts: list[str] = [] for item in content: if not isinstance(item, dict): continue if item.get("type") in {"output_text", "input_text"}: text = item.get("text") if isinstance(text, str) and text.strip(): parts.append(text.strip()) return "\n".join(parts) def is_env_assignment(token: str) -> bool: return bool(re.match(r"^[A-Za-z_][A-Za-z0-9_]*=.*$", token)) def command_tokens(command_text: str) -> list[str]: if not command_text.strip(): return [] try: tokens = shlex.split(command_text) except ValueError: tokens = command_text.split() while tokens and is_env_assignment(tokens[0]): tokens.pop(0) return tokens def command_family(command_text: str) -> str: tokens = command_tokens(command_text) if not tokens: return "" return Path(tokens[0]).name def extract_exec_command_text(payload: dict[str, Any]) -> str: command = payload.get("command") if isinstance(command, list): if len(command) >= 3 and command[1] == "-lc": return str(command[2]) return " ".join(str(part) for part in command) if isinstance(command, str): return command return "" def detect_goal_categories(session: CodexSession) -> dict[str, int]: counts: Counter[str] = Counter() if session.user_message_count <= 1 and session.shell_command_count <= 1: return {"warmup_minimal": 1} for text in session.user_messages: for category, patterns in GOAL_PATTERNS.items(): if any(pattern.search(text) for pattern in patterns): counts[category] += 1 if session.test_commands > 0: counts["write_tests"] += 1 if ( session.git_commits > 0 or session.gh_commands > 0 or session.git_pushes > 0 ): counts["create_pr_commit"] += 1 if session.web_search_count > 0 and not counts: counts["understand_codebase"] += 1 if session.command_failures > 0 and not counts: counts["debug_investigate"] += 1 if session.shell_command_count > 0 and not counts: counts["understand_codebase"] += 1 if not counts: counts["warmup_minimal"] += 1 return dict(counts) def detect_friction(session: CodexSession) -> dict[str, int]: counts: Counter[str] = Counter() if session.command_failures > 0: counts["command_failed"] += session.command_failures if session.interruptions > 0: counts["turn_aborted"] += session.interruptions if session.invalid_request_errors > 0: counts["model_error"] += session.invalid_request_errors if session.approval_requests > 0: counts["approval_requested"] += session.approval_requests return dict(counts) def infer_outcome(session: CodexSession) -> str: if ( session.final_answer_count > 0 and session.command_failures == 0 and session.interruptions == 0 ): return "completed_cleanly" if session.final_answer_count > 0: return "completed_with_retries" if session.interruptions > 0: return "interrupted" return "incomplete" def summarize_assistant(session: CodexSession) -> str: if session.final_answer: return truncate(session.final_answer, 140) if session.assistant_summaries: return truncate(session.assistant_summaries[0], 140) if session.first_prompt: return truncate(session.first_prompt, 140) return "No assistant summary captured." def parse_session_file( session_path: Path, scope: ProjectScope ) -> CodexSession | None: session_id = session_path.stem.split("-")[-1] cwd = "" model = "" provider = "" cli_version = "" first_seen_ts = "" last_seen_ts = "" latest_input_tokens = 0 latest_output_tokens = 0 latest_reasoning_tokens = 0 user_messages: list[str] = [] assistant_summaries: list[str] = [] command_failure_examples: list[str] = [] tool_counts: Counter[str] = Counter() command_families: Counter[str] = Counter() shell_command_count = 0 command_failures = 0 web_search_count = 0 web_open_count = 0 approval_requests = 0 spawn_agent_count = 0 mcp_call_count = 0 invalid_request_errors = 0 interruptions = 0 assistant_message_count = 0 final_answer_count = 0 commentary_count = 0 git_commits = 0 git_pushes = 0 gh_commands = 0 test_commands = 0 final_answer = "" scope_match = scope.target_prefix is None with session_path.open("r", encoding="utf-8") as handle: for raw_line in handle: if "invalid_request_error" in raw_line: invalid_request_errors += 1 line = raw_line.strip() if not line: continue try: event = json.loads(line) except json.JSONDecodeError: continue timestamp = str(event.get("timestamp") or "") if timestamp: if not first_seen_ts: first_seen_ts = timestamp last_seen_ts = timestamp payload = event.get("payload") if not isinstance(payload, dict): continue event_type = event.get("type") if event_type == "session_meta": session_id = str(payload.get("id") or session_id) cwd = str(payload.get("cwd") or cwd) provider = str(payload.get("model_provider") or provider) cli_version = str(payload.get("cli_version") or cli_version) first_seen_ts = str(payload.get("timestamp") or first_seen_ts) if cwd and matches_project_scope(cwd, scope): scope_match = True continue if event_type == "turn_context": turn_cwd = str(payload.get("cwd") or "") if turn_cwd: cwd = turn_cwd if matches_project_scope(cwd, scope): scope_match = True if not model: model = str(payload.get("model") or "") continue if event_type == "event_msg": inner_type = payload.get("type") if inner_type == "user_message": message = str(payload.get("message") or "").strip() if message: user_messages.append(message) elif inner_type == "exec_command_end": command_text = extract_exec_command_text(payload) if command_text: shell_command_count += 1 family = command_family(command_text) if family: command_families[family] += 1 lowered = command_text.lower() if GIT_COMMIT_RE.search(command_text): git_commits += 1 if GIT_PUSH_RE.search(command_text): git_pushes += 1 if GH_RE.search(command_text): gh_commands += 1 if TEST_COMMAND_RE.search(lowered): test_commands += 1 exit_code = payload.get("exit_code") if isinstance(exit_code, int) and exit_code != 0: command_failures += 1 if command_text and len(command_failure_examples) < 5: command_failure_examples.append( truncate(command_text, 120) ) elif inner_type == "web_search_end": web_search_count += 1 action = payload.get("action") if ( isinstance(action, dict) and action.get("type") == "open_page" ): web_open_count += 1 elif inner_type == "turn_aborted": interruptions += 1 elif inner_type == "token_count": info = payload.get("info") if isinstance(info, dict): totals = info.get("total_token_usage") if isinstance(totals, dict): latest_input_tokens = int( totals.get("input_tokens") or 0 ) latest_output_tokens = int( totals.get("output_tokens") or 0 ) latest_reasoning_tokens = int( totals.get("reasoning_output_tokens") or 0 ) continue if event_type != "response_item": continue item_type = payload.get("type") if item_type == "function_call": name = str(payload.get("name") or "") if name: tool_counts[name] += 1 if name.startswith("mcp__") or name in MCP_TOOL_NAMES: mcp_call_count += 1 if name == "spawn_agent": spawn_agent_count += 1 args = parse_tool_arguments(payload.get("arguments")) if args.get("sandbox_permissions") == "require_escalated": approval_requests += 1 elif item_type == "web_search_call": tool_counts["web_search"] += 1 elif item_type == "message" and payload.get("role") == "assistant": text = extract_message_text(payload.get("content")) if text: assistant_message_count += 1 phase = str(payload.get("phase") or "") if phase == "commentary": commentary_count += 1 elif phase == "final_answer": final_answer_count += 1 final_answer = text if phase != "commentary" and len(assistant_summaries) < 5: assistant_summaries.append(text) if not scope_match: return None start_dt = parse_iso_timestamp(first_seen_ts) end_dt = parse_iso_timestamp(last_seen_ts or first_seen_ts) duration_minutes = max(0.0, (end_dt - start_dt).total_seconds() / 60.0) session = CodexSession( session_id=session_id, file_path=str(session_path), start_time=start_dt.isoformat(), end_time=end_dt.isoformat(), cwd=cwd, model=model, model_provider=provider, cli_version=cli_version, duration_minutes=duration_minutes, user_message_count=len(user_messages), assistant_message_count=assistant_message_count, final_answer_count=final_answer_count, commentary_count=commentary_count, shell_command_count=shell_command_count, command_failures=command_failures, web_search_count=web_search_count, web_open_count=web_open_count, approval_requests=approval_requests, spawn_agent_count=spawn_agent_count, mcp_call_count=mcp_call_count, invalid_request_errors=invalid_request_errors, interruptions=interruptions, total_input_tokens=latest_input_tokens, total_output_tokens=latest_output_tokens, total_reasoning_tokens=latest_reasoning_tokens, git_commits=git_commits, git_pushes=git_pushes, gh_commands=gh_commands, test_commands=test_commands, tool_counts=dict(tool_counts), command_families=dict(command_families), user_messages=user_messages, assistant_summaries=[ truncate(text, 160) for text in assistant_summaries ], command_failure_examples=command_failure_examples, first_prompt=truncate(user_messages[0], 160) if user_messages else "", final_answer=truncate(final_answer, 800) if final_answer else "", ) session.goal_categories = detect_goal_categories(session) session.friction = detect_friction(session) session.outcome = infer_outcome(session) return session def scan_all_sessions( sessions_dir: Path, scope: ProjectScope ) -> list[CodexSession]: session_files = sorted(sessions_dir.rglob("*.jsonl")) sessions: list[CodexSession] = [] for session_file in session_files: parsed = parse_session_file(session_file, scope) if parsed is not None: sessions.append(parsed) sessions.sort(key=lambda item: item.start_time, reverse=True) return sessions def top_entries( data: dict[str, int], limit: int = 5, exclude: set[str] | None = None ) -> list[tuple[str, int]]: blocked = exclude or set() return [ (key, value) for key, value in sorted( data.items(), key=lambda item: item[1], reverse=True ) if value > 0 and key not in blocked ][:limit] def project_label(path_text: str) -> str: parts = Path(path_text).parts if len(parts) >= 2: return "/".join(parts[-2:]) if parts: return parts[-1] return path_text or "(unknown)" def aggregate_sessions(sessions: list[CodexSession]) -> AggregatedData: aggregated = AggregatedData( total_sessions=len(sessions), date_range={"start": "", "end": ""}, ) dates: list[str] = [] for session in sessions: dates.append(session.start_time) aggregated.total_user_messages += session.user_message_count aggregated.total_assistant_messages += session.assistant_message_count aggregated.total_duration_hours += session.duration_minutes / 60.0 aggregated.total_input_tokens += session.total_input_tokens aggregated.total_output_tokens += session.total_output_tokens aggregated.total_reasoning_tokens += session.total_reasoning_tokens aggregated.total_shell_commands += session.shell_command_count aggregated.total_command_failures += session.command_failures aggregated.total_web_searches += session.web_search_count aggregated.total_web_opens += session.web_open_count aggregated.total_approval_requests += session.approval_requests aggregated.total_spawn_agents += session.spawn_agent_count aggregated.total_mcp_calls += session.mcp_call_count aggregated.total_interruptions += session.interruptions aggregated.total_invalid_request_errors += ( session.invalid_request_errors ) aggregated.git_commits += session.git_commits aggregated.git_pushes += session.git_pushes aggregated.gh_commands += session.gh_commands aggregated.test_commands += session.test_commands aggregated.sessions_with_mcp += int(session.mcp_call_count > 0) aggregated.sessions_with_subagents += int( session.spawn_agent_count > 0 ) aggregated.sessions_with_web_search += int( session.web_search_count > 0 ) if session.cwd: aggregated.projects[session.cwd] = ( aggregated.projects.get(session.cwd, 0) + 1 ) if session.model: aggregated.models[session.model] = ( aggregated.models.get(session.model, 0) + 1 ) if session.model_provider: aggregated.providers[session.model_provider] = ( aggregated.providers.get(session.model_provider, 0) + 1 ) aggregated.outcomes[session.outcome] = ( aggregated.outcomes.get(session.outcome, 0) + 1 ) for key, count in session.tool_counts.items(): aggregated.tool_counts[key] = ( aggregated.tool_counts.get(key, 0) + count ) for key, count in session.command_families.items(): aggregated.command_families[key] = ( aggregated.command_families.get(key, 0) + count ) for key, count in session.goal_categories.items(): aggregated.goal_categories[key] = ( aggregated.goal_categories.get(key, 0) + count ) for key, count in session.friction.items(): aggregated.friction[key] = aggregated.friction.get(key, 0) + count if len(aggregated.session_summaries) < 50: aggregated.session_summaries.append( { "id": session.session_id[:8], "date": iso_date(session.start_time), "cwd": session.cwd, "project": project_label(session.cwd), "prompt": session.first_prompt, "summary": summarize_assistant(session), "outcome": session.outcome, "failures": str(session.command_failures), } ) if dates: dates.sort() aggregated.date_range["start"] = iso_date(dates[0]) aggregated.date_range["end"] = iso_date(dates[-1]) return aggregated def build_at_a_glance(data: AggregatedData) -> dict[str, str]: top_goal = top_entries( data.goal_categories, limit=1, exclude={"warmup_minimal"} ) top_project = top_entries(data.projects, limit=1) top_tool = top_entries(data.tool_counts, limit=3) top_command = top_entries(data.command_families, limit=3) top_friction = top_entries(data.friction, limit=1) work_text = ( "Most sessions are broad repo work rather than a single repeated task." ) if top_goal: work_text = f"Most Codex work in this slice is about {safe_title(top_goal[0][0]).lower()}." if top_project: work_text += f" The most common working directory is {project_label(top_project[0][0])}." workflow_text = "Your workflow mixes repo search, shell execution, and short conversational steering." if data.total_shell_commands >= max(10, data.total_sessions * 8): workflow_text = "This is a terminal-first Codex workflow: most value comes from command execution, not long-form chat." if data.sessions_with_subagents > 0: workflow_text += f" Subagents show up in {data.sessions_with_subagents} session(s), so delegation is present but selective." external_text = "External context is limited." if data.total_web_searches > 0 and data.total_mcp_calls == 0: external_text = "External context is coming mostly from web search; MCP usage is rare in the captured sessions." elif data.total_mcp_calls > 0: external_text = "These sessions do use MCP-backed context rather than relying purely on shell and web search." friction_text = "Measured friction is low." if top_friction: friction_key = top_friction[0][0] friction_text = FRICTION_DESCRIPTIONS.get( friction_key, f"The main drag is {safe_title(friction_key).lower()}.", ) tools_text = "Top tools are mixed." if top_tool or top_command: tool_names = ", ".join( safe_title(name) for name, _count in top_tool[:2] ) command_names = ", ".join(name for name, _count in top_command[:2]) bits = [part for part in (tool_names, command_names) if part] if bits: tools_text = f"The most-used levers are {bits[0]}" if len(bits) > 1: tools_text += f", with command families like {bits[1]}" tools_text += "." return { "what_you_do": work_text, "how_you_work": workflow_text, "external_context": external_text, "what_slows_you_down": friction_text, "most_used_levers": tools_text, } def build_insights(data: AggregatedData) -> dict[str, Any]: return { "at_a_glance": build_at_a_glance(data), "top_projects": [ { "path": path, "label": project_label(path), "session_count": count, } for path, count in top_entries(data.projects, limit=8) ], "top_goals": [ {"goal": goal, "label": safe_title(goal), "count": count} for goal, count in top_entries( data.goal_categories, limit=8, exclude={"warmup_minimal"} ) ], "top_tools": [ {"tool": tool, "label": safe_title(tool), "count": count} for tool, count in top_entries(data.tool_counts, limit=10) ], "top_commands": [ {"command": name, "count": count} for name, count in top_entries(data.command_families, limit=10) ], "friction": [ { "category": key, "label": safe_title(key), "count": count, "description": FRICTION_DESCRIPTIONS.get(key, ""), } for key, count in top_entries(data.friction, limit=8) ], } def escape_html(text: str) -> str: return html.escape(text or "") def generate_bar_chart( data: dict[str, int], color: str, max_items: int = 6, label_fn: Any | None = None, ) -> str: entries = top_entries(data, limit=max_items) if not entries: return '

No data

' max_value = max(count for _label, count in entries) or 1 rows: list[str] = [] for label, count in entries: display = label_fn(label) if label_fn else safe_title(label) width = (count / max_value) * 100 rows.append( f'
{escape_html(str(display))}
' f'
' f'
{count}
' ) return "\n".join(rows) def generate_html_report( data: AggregatedData, insights: dict[str, Any], project_scope_prefix: str | None, ) -> str: glance = insights["at_a_glance"] scope_label = project_scope_prefix or "All Codex sessions" html_parts = [ "", '', "", '', '', "Codex Insights Report", "", "", "", '
', '
', "

Codex Insights

", f"

{escape_html(data.date_range.get('start', ''))} to {escape_html(data.date_range.get('end', ''))}. Built from local Codex rollout sessions.

", f'
Scope: {escape_html(scope_label)}
', '
', f'

What you do: {escape_html(glance["what_you_do"])}

', f'

How you work: {escape_html(glance["how_you_work"])}

', f'

External context: {escape_html(glance["external_context"])}

', f'

What slows you down: {escape_html(glance["what_slows_you_down"])}

', f'

Most-used levers: {escape_html(glance["most_used_levers"])}

', "
", "
", '
', f'
{data.total_sessions}
Sessions
', f'
{round(data.total_duration_hours, 1)}
Hours
', f'
{data.total_user_messages}
User Messages
', f'
{data.total_shell_commands}
Shell Commands
', f'
{data.total_command_failures}
Command Failures
', f'
{data.total_web_searches}
Web Searches
', f'
{data.total_spawn_agents}
Spawn Agents
', f'
{data.total_mcp_calls}
MCP Calls
', "
", '
', f'

Projects

{generate_bar_chart(data.projects, "#2563eb", label_fn=project_label)}
', f'

Goals

{generate_bar_chart(data.goal_categories, "#0f766e")}
', f'

Tools

{generate_bar_chart(data.tool_counts, "#9333ea")}
', f'

Command Families

{generate_bar_chart(data.command_families, "#ea580c", label_fn=lambda value: value)}
', f'

Models

{generate_bar_chart(data.models, "#16a34a", label_fn=lambda value: value)}
', f'

Friction

{generate_bar_chart(data.friction, "#dc2626")}
', "
", '
', "

Recent Sessions

", ] for item in data.session_summaries[:18]: html_parts.extend( [ '
', '
', f'
{escape_html(item["project"])}
', f'
{escape_html(item["date"])} | {escape_html(safe_title(item["outcome"]))} | failures: {escape_html(item["failures"])}
', "
", f'

Prompt: {escape_html(item["prompt"] or "(none captured)")}

', f'

Summary: {escape_html(item["summary"])}

', "
", ] ) html_parts.extend( [ "
", f'', "
", "", "", ] ) return "\n".join(html_parts) def build_export_data( data: AggregatedData, insights: dict[str, Any], sessions: list[CodexSession], project_scope_prefix: str | None, ) -> dict[str, Any]: session_details = [] for session in sessions: session_details.append( { "session_id": session.session_id, "file_path": session.file_path, "start_time": session.start_time, "end_time": session.end_time, "cwd": session.cwd, "model": session.model, "model_provider": session.model_provider, "cli_version": session.cli_version, "duration_minutes": round(session.duration_minutes, 2), "first_prompt": session.first_prompt, "outcome": session.outcome, "tool_counts": session.tool_counts, "command_families": session.command_families, "goal_categories": session.goal_categories, "friction": session.friction, "command_failures": session.command_failures, "web_search_count": session.web_search_count, "mcp_call_count": session.mcp_call_count, "spawn_agent_count": session.spawn_agent_count, } ) return { "metadata": { "generated_at": datetime.now(tz=timezone.utc).isoformat(), "report_type": "codex-insights", "date_range": data.date_range, "session_count": data.total_sessions, "project_scope_prefix": project_scope_prefix, }, "aggregated_data": asdict(data), "insights": insights, "sessions": session_details, } def generate_report(args: argparse.Namespace) -> dict[str, Any]: ensure_dir(args.cache_dir) scope = build_project_scope(args.project_path_prefix) sessions = scan_all_sessions(args.sessions_dir, scope) aggregated = aggregate_sessions(sessions) insights = build_insights(aggregated) output_html = args.output_html or (args.cache_dir / "report.html") output_json = args.output_json or (args.cache_dir / "report.json") ensure_dir(output_html.parent) ensure_dir(output_json.parent) output_html.write_text( generate_html_report(aggregated, insights, args.project_path_prefix), encoding="utf-8", ) output_json.write_text( json.dumps( build_export_data( aggregated, insights, sessions, args.project_path_prefix ), indent=2, ), encoding="utf-8", ) return { "html_path": output_html, "json_path": output_json, "data": aggregated, "insights": insights, } def print_summary(result: dict[str, Any]) -> None: data: AggregatedData = result["data"] glance = result["insights"]["at_a_glance"] print(f"Wrote HTML report: {result['html_path']}") print(f"Wrote JSON export: {result['json_path']}") print( f"Analyzed {data.total_sessions} sessions " f"({data.total_user_messages} user messages, {round(data.total_duration_hours, 1)}h) " f"from {data.date_range.get('start', '')} to {data.date_range.get('end', '')}" ) print(f"What you do: {glance['what_you_do']}") print(f"What slows you down: {glance['what_slows_you_down']}") def main() -> int: args = parse_args() result = generate_report(args) print_summary(result) return 0 if __name__ == "__main__": raise SystemExit(main())