# /// script # requires-python = ">=3.11" # /// """Portable Gemini CLI usage report generator. This script scans Gemini CLI chat session files under ``~/.gemini`` and builds an HTML report plus JSON export. It is intentionally Gemini-specific: 1. Session discovery uses Gemini ``session-*.json`` chat files. 2. Metrics are derived from Gemini message fields like ``thoughts``, ``tokens``, ``toolCalls``, and ``info``/``error`` messages. 3. Project scoping matches Gemini project hashes and known project roots, with repo-family matching for worktrees and related clones. """ from __future__ import annotations import argparse import hashlib import html import json import os import re import shlex import subprocess from collections import Counter from dataclasses import asdict, dataclass, field from datetime import datetime, timezone from functools import lru_cache from pathlib import Path from typing import Any LABEL_MAP: dict[str, str] = { "debug_investigate": "Debug/Investigate", "implement_feature": "Implement Feature", "fix_bug": "Fix Bug", "write_script_tool": "Write Script/Tool", "refactor_code": "Refactor Code", "configure_system": "Configure System", "create_pr_commit": "Create PR/Commit", "analyze_data": "Analyze Data", "understand_codebase": "Understand Codebase", "write_tests": "Write Tests", "write_docs": "Write Docs", "manage_email": "Manage Email", "warmup_minimal": "Quick Check", "completed_cleanly": "Completed Cleanly", "completed_with_retries": "Completed With Retries", "interrupted": "Interrupted", "incomplete": "Incomplete", "tool_error": "Tool Error", "tool_cancelled": "Tool Cancelled", "command_failed": "Command Failed", "workspace_boundary": "Workspace Boundary", "loop_detected": "Loop Detected", "fallback_model": "Fallback Model", "malformed_function_call": "Malformed Function Call", "run_shell_command": "Shell", "read_file": "Read File", "read_many_files": "Read Many Files", "write_file": "Write File", "replace": "Replace", "search_file_content": "Search File Content", "grep_search": "Grep Search", "list_directory": "List Directory", "google_web_search": "Google Web Search", "activate_skill": "Activate Skill", "codebase_investigator": "Codebase Investigator", } GOAL_PATTERNS: dict[str, list[re.Pattern[str]]] = { "debug_investigate": [ re.compile(r"\bdebug\b", re.IGNORECASE), re.compile(r"\binvestigat", re.IGNORECASE), re.compile(r"\btrace\b", re.IGNORECASE), re.compile(r"\berror\b", re.IGNORECASE), re.compile(r"\bwhy\b", re.IGNORECASE), re.compile(r"\bfail", re.IGNORECASE), ], "implement_feature": [ re.compile(r"\bimplement\b", re.IGNORECASE), re.compile(r"\bbuild\b", re.IGNORECASE), re.compile(r"\bfeature\b", re.IGNORECASE), re.compile(r"\badd\b", re.IGNORECASE), re.compile(r"\bcreate\b", re.IGNORECASE), ], "fix_bug": [ re.compile(r"\bfix\b", re.IGNORECASE), re.compile(r"\bbug\b", re.IGNORECASE), re.compile(r"\bbroken\b", re.IGNORECASE), re.compile(r"\bfailing\b", re.IGNORECASE), ], "write_script_tool": [ re.compile(r"\bscript\b", re.IGNORECASE), re.compile(r"\bcli\b", re.IGNORECASE), re.compile(r"\btool\b", re.IGNORECASE), re.compile(r"\bautomation\b", re.IGNORECASE), ], "refactor_code": [ re.compile(r"\brefactor\b", re.IGNORECASE), re.compile(r"\bcleanup\b", re.IGNORECASE), re.compile(r"\breorgan", re.IGNORECASE), re.compile(r"\bsimplif", re.IGNORECASE), ], "configure_system": [ re.compile(r"\bconfigure\b", re.IGNORECASE), re.compile(r"\bsetup\b", re.IGNORECASE), re.compile(r"\binstall\b", re.IGNORECASE), re.compile(r"\bconfig\b", re.IGNORECASE), re.compile(r"\benv\b", re.IGNORECASE), re.compile(r"\bci\b", re.IGNORECASE), re.compile(r"\bauth\b", re.IGNORECASE), re.compile(r"\blogin\b", re.IGNORECASE), ], "create_pr_commit": [ re.compile(r"\bcommit\b", re.IGNORECASE), re.compile(r"\bpull request\b", re.IGNORECASE), re.compile(r"\bpr\b", re.IGNORECASE), re.compile(r"\bmerge\b", re.IGNORECASE), re.compile(r"\bbranch\b", re.IGNORECASE), ], "analyze_data": [ re.compile(r"\banaly[sz]e\b", re.IGNORECASE), re.compile(r"\bmetrics\b", re.IGNORECASE), re.compile(r"\breport\b", re.IGNORECASE), re.compile(r"\binsights?\b", re.IGNORECASE), re.compile(r"\bdata\b", re.IGNORECASE), ], "understand_codebase": [ re.compile(r"\bunderstand\b", re.IGNORECASE), re.compile(r"\bexplain\b", re.IGNORECASE), re.compile(r"\bwalk ?through\b", re.IGNORECASE), re.compile(r"\bhow does\b", re.IGNORECASE), re.compile(r"\bwhere is\b", re.IGNORECASE), re.compile(r"\bfind\b", re.IGNORECASE), re.compile(r"\breview\b", re.IGNORECASE), ], "write_tests": [ re.compile(r"\btests?\b", re.IGNORECASE), re.compile(r"\bpytest\b", re.IGNORECASE), re.compile(r"\bunit test\b", re.IGNORECASE), re.compile(r"\bintegration test\b", re.IGNORECASE), re.compile(r"\bbenchmark\b", re.IGNORECASE), ], "write_docs": [ re.compile(r"\breadme\b", re.IGNORECASE), re.compile(r"\bdocs?\b", re.IGNORECASE), re.compile(r"\bdocument", re.IGNORECASE), ], "manage_email": [ re.compile(r"\bgmail\b", re.IGNORECASE), re.compile(r"\bemail\b", re.IGNORECASE), re.compile(r"\binbox\b", re.IGNORECASE), re.compile(r"\bunsubscrib", re.IGNORECASE), re.compile(r"\bdeclutter\b", re.IGNORECASE), re.compile(r"\bdraft\b", re.IGNORECASE), re.compile(r"\bdelete\b", re.IGNORECASE), ], } FRICTION_DESCRIPTIONS: dict[str, str] = { "command_failed": "Shell execution is one of the main sources of retries in these sessions.", "tool_error": "File and edit tools are failing often enough to shape the flow.", "tool_cancelled": "Some requests are being cancelled before the tool chain lands cleanly.", "workspace_boundary": "Gemini is running into workspace boundaries, which slows cross-repo work.", "loop_detected": "At least one session tripped Gemini's loop protection.", "fallback_model": "Some sessions fell back to a different model midstream.", "malformed_function_call": "A malformed function call interrupted at least one request.", } GEMINI_GMAIL_PREFIX = "mcp_google-workspace_gmail." WORKSPACE_BOUNDARY_RE = re.compile( r"workspace directories|project temp directory", re.IGNORECASE ) HASH_DIR_RE = re.compile(r"^[0-9a-f]{64}$") EXIT_CODE_RE = re.compile(r"Exit Code:\s*(-?\d+)") TEST_COMMAND_RE = re.compile( r"\b(pytest|npm test|pnpm test|yarn test|cargo test|go test|vitest|jest|ruff|mypy|gradle test|mvn test)\b", re.IGNORECASE, ) GIT_COMMIT_RE = re.compile(r"(^|[;&|]\s*|\s)git\s+commit\b", re.IGNORECASE) GIT_PUSH_RE = re.compile(r"(^|[;&|]\s*|\s)git\s+push\b", re.IGNORECASE) GH_RE = re.compile(r"(^|[;&|]\s*|\s)gh\b", re.IGNORECASE) @dataclass(frozen=True) class RepoIdentity: root: str common_dir: str | None remotes: frozenset[str] worktrees: tuple[str, ...] = () @dataclass(frozen=True) class ProjectScope: target_prefix: str | None path_prefixes: frozenset[str] common_dirs: frozenset[str] remotes: frozenset[str] project_hashes: frozenset[str] @dataclass class GeminiSession: session_id: str file_path: str start_time: str end_time: str project_hash: str project_root: str primary_model: str duration_minutes: float user_message_count: int assistant_message_count: int info_message_count: int error_message_count: int thought_count: int tool_call_count: int tool_errors: int tool_cancellations: int shell_command_count: int command_failures: int web_search_count: int mcp_call_count: int file_read_count: int file_write_count: int replace_count: int skill_activation_count: int workspace_boundary_errors: int loop_events: int fallback_model_switches: int request_cancellations: int malformed_function_calls: int compressed_context_events: int total_input_tokens: int total_output_tokens: int total_cached_tokens: int total_thought_tokens: int total_tool_tokens: int total_tokens: int git_commits: int git_pushes: int gh_commands: int test_commands: int models: dict[str, int] = field(default_factory=dict) tool_counts: dict[str, int] = field(default_factory=dict) command_families: dict[str, int] = field(default_factory=dict) goal_categories: dict[str, int] = field(default_factory=dict) friction: dict[str, int] = field(default_factory=dict) user_messages: list[str] = field(default_factory=list) assistant_summaries: list[str] = field(default_factory=list) command_failure_examples: list[str] = field(default_factory=list) first_prompt: str = "" final_answer: str = "" outcome: str = "" @dataclass class AggregatedData: total_sessions: int date_range: dict[str, str] total_user_messages: int = 0 total_assistant_messages: int = 0 total_duration_hours: float = 0.0 total_thoughts: int = 0 total_tool_calls: int = 0 total_tool_errors: int = 0 total_tool_cancellations: int = 0 total_shell_commands: int = 0 total_command_failures: int = 0 total_web_searches: int = 0 total_mcp_calls: int = 0 total_file_reads: int = 0 total_file_writes: int = 0 total_replace_calls: int = 0 total_skill_activations: int = 0 total_workspace_boundary_errors: int = 0 total_loops: int = 0 total_fallback_model_switches: int = 0 total_request_cancellations: int = 0 total_malformed_function_calls: int = 0 total_compressed_context_events: int = 0 total_input_tokens: int = 0 total_output_tokens: int = 0 total_cached_tokens: int = 0 total_thought_tokens: int = 0 total_tool_tokens: int = 0 total_tokens: int = 0 git_commits: int = 0 git_pushes: int = 0 gh_commands: int = 0 test_commands: int = 0 sessions_with_mcp: int = 0 sessions_with_shell: int = 0 sessions_with_web_search: int = 0 sessions_with_skills: int = 0 tool_counts: dict[str, int] = field(default_factory=dict) command_families: dict[str, int] = field(default_factory=dict) models: dict[str, int] = field(default_factory=dict) projects: dict[str, int] = field(default_factory=dict) goal_categories: dict[str, int] = field(default_factory=dict) friction: dict[str, int] = field(default_factory=dict) outcomes: dict[str, int] = field(default_factory=dict) session_summaries: list[dict[str, str]] = field(default_factory=list) def parse_args() -> argparse.Namespace: script_dir = Path(__file__).resolve().parent home = Path(os.path.expanduser("~")) parser = argparse.ArgumentParser( description="Generate a Gemini CLI usage report from ~/.gemini.", ) parser.add_argument( "--gemini-dir", type=Path, default=home / ".gemini", help="Gemini CLI home directory containing session chats.", ) parser.add_argument( "--cache-dir", type=Path, default=script_dir / "gemini-insights-output", help="Directory for the generated HTML report and JSON export.", ) parser.add_argument( "--project-path-prefix", type=str, default=None, help=( "Only include sessions whose project root matches this path, a descendant, " "or a related repo/worktree/fork in the same repo family." ), ) parser.add_argument( "--output-html", type=Path, default=None, help="Path for the generated HTML report. Defaults to /report.html.", ) parser.add_argument( "--output-json", type=Path, default=None, help="Path for the JSON export. Defaults to /report.json.", ) return parser.parse_args() def ensure_dir(path: Path) -> None: path.mkdir(parents=True, exist_ok=True) def parse_iso_timestamp(value: str | None) -> datetime: if not value: return datetime.fromtimestamp(0, tz=timezone.utc) normalized = value if normalized.endswith("Z"): normalized = normalized[:-1] + "+00:00" try: return datetime.fromisoformat(normalized) except ValueError: return datetime.fromtimestamp(0, tz=timezone.utc) def iso_date(value: str) -> str: return parse_iso_timestamp(value).date().isoformat() def truncate(text: str, length: int) -> str: collapsed = " ".join(text.split()) if len(collapsed) <= length: return collapsed return collapsed[: max(0, length - 1)].rstrip() + "..." def safe_title(value: str) -> str: return LABEL_MAP.get(value, value.replace("_", " ").title()) def normalize_path_for_match(path_text: str) -> str: return os.path.normpath(os.path.realpath(os.path.expanduser(path_text))) def path_hash_variants(path_text: str) -> set[str]: expanded = os.path.normpath(os.path.expanduser(path_text)) normalized = normalize_path_for_match(path_text) variants = {expanded, normalized} return { hashlib.sha256(value.encode("utf-8")).hexdigest() for value in variants if value } def run_git(args: list[str], cwd: str) -> str | None: try: completed = subprocess.run( ["git", *args], cwd=cwd, check=True, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, text=True, ) except Exception: return None return completed.stdout.strip() @lru_cache(maxsize=512) def get_repo_identity(path_text: str) -> RepoIdentity | None: normalized = normalize_path_for_match(path_text) top_level = run_git(["rev-parse", "--show-toplevel"], normalized) if not top_level: return None common_dir = run_git(["rev-parse", "--git-common-dir"], normalized) if common_dir and not os.path.isabs(common_dir): common_dir = normalize_path_for_match( os.path.join(normalized, common_dir) ) elif common_dir: common_dir = normalize_path_for_match(common_dir) remote_lines = run_git(["remote", "-v"], normalized) or "" remotes: set[str] = set() for line in remote_lines.splitlines(): parts = line.split() if len(parts) >= 2: remotes.add(parts[1].removesuffix(".git")) worktree_text = ( run_git(["worktree", "list", "--porcelain"], normalized) or "" ) worktrees: list[str] = [] for line in worktree_text.splitlines(): if line.startswith("worktree "): worktrees.append( normalize_path_for_match( line.removeprefix("worktree ").strip() ) ) return RepoIdentity( root=normalize_path_for_match(top_level), common_dir=common_dir, remotes=frozenset(remotes), worktrees=tuple(worktrees), ) def discover_git_roots(base_path: Path) -> set[str]: discovered: set[str] = set() if not base_path.exists(): return discovered skip_dirs = { ".git", ".venv", "node_modules", "__pycache__", ".pytest_cache", ".ruff_cache", } for root, dirs, files in os.walk(base_path): dirs[:] = [entry for entry in dirs if entry not in skip_dirs] if ".git" in dirs or ".git" in files: discovered.add(normalize_path_for_match(root)) dirs[:] = [] return discovered def build_project_scope(prefix: str | None) -> ProjectScope: if not prefix: return ProjectScope( target_prefix=None, path_prefixes=frozenset(), common_dirs=frozenset(), remotes=frozenset(), project_hashes=frozenset(), ) normalized_prefix = normalize_path_for_match(prefix) path_prefixes: set[str] = {normalized_prefix} common_dirs: set[str] = set() remotes: set[str] = set() candidate_roots = discover_git_roots(Path(normalized_prefix)) direct_identity = get_repo_identity(normalized_prefix) if direct_identity: candidate_roots.add(direct_identity.root) for repo_root in candidate_roots: identity = get_repo_identity(repo_root) if not identity: continue path_prefixes.add(identity.root) path_prefixes.update(identity.worktrees) if identity.common_dir: common_dirs.add(identity.common_dir) remotes.update(identity.remotes) project_hashes: set[str] = set() for path_prefix in path_prefixes: project_hashes.update(path_hash_variants(path_prefix)) return ProjectScope( target_prefix=normalized_prefix, path_prefixes=frozenset(path_prefixes), common_dirs=frozenset(common_dirs), remotes=frozenset(remotes), project_hashes=frozenset(project_hashes), ) def matches_project_scope( project_root: str, project_hash: str, scope: ProjectScope ) -> bool: if scope.target_prefix is None: return True if project_root: normalized_root = normalize_path_for_match(project_root) for prefix in scope.path_prefixes: if normalized_root == prefix or normalized_root.startswith( prefix + os.sep ): return True identity = get_repo_identity(normalized_root) if identity: if ( identity.common_dir and identity.common_dir in scope.common_dirs ): return True if scope.remotes and identity.remotes.intersection(scope.remotes): return True return bool(project_hash and project_hash in scope.project_hashes) def extract_message_text( content: Any, display_content: Any | None = None ) -> str: def extract(value: Any) -> str: if isinstance(value, str): return value.strip() if isinstance(value, list): parts: list[str] = [] for item in value: if isinstance(item, str) and item.strip(): parts.append(item.strip()) continue if not isinstance(item, dict): continue text = item.get("text") if isinstance(text, str) and text.strip(): parts.append(text.strip()) return "\n".join(parts) return "" direct = extract(content) if direct: return direct return extract(display_content) def is_env_assignment(token: str) -> bool: return bool(re.match(r"^[A-Za-z_][A-Za-z0-9_]*=.*$", token)) def command_tokens(command_text: str) -> list[str]: if not command_text.strip(): return [] try: tokens = shlex.split(command_text) except ValueError: tokens = command_text.split() while tokens and is_env_assignment(tokens[0]): tokens.pop(0) return tokens def command_family(command_text: str) -> str: tokens = command_tokens(command_text) if not tokens: return "" return Path(tokens[0]).name def extract_tool_result_output(tool: dict[str, Any]) -> str: result = tool.get("result") if not isinstance(result, list): return "" parts: list[str] = [] for item in result: if not isinstance(item, dict): continue function_response = item.get("functionResponse") if not isinstance(function_response, dict): continue response = function_response.get("response") if not isinstance(response, dict): continue output = response.get("output") if isinstance(output, str) and output: parts.append(output) return "\n".join(parts) def extract_tool_text_blob(tool: dict[str, Any]) -> str: pieces = [ str(tool.get("resultDisplay") or "").strip(), extract_tool_result_output(tool).strip(), ] return "\n".join(piece for piece in pieces if piece) def extract_shell_exit_code(tool: dict[str, Any]) -> int | None: raw_output = extract_tool_result_output(tool) match = EXIT_CODE_RE.search(raw_output) if not match: return None try: return int(match.group(1)) except ValueError: return None def estimate_active_minutes( messages: list[dict[str, Any]], start_time: str, end_time: str, idle_cap_minutes: float = 15.0, ) -> float: timestamps: list[datetime] = [] for message in messages: if not isinstance(message, dict): continue timestamp = parse_iso_timestamp(str(message.get("timestamp") or "")) if timestamp.timestamp() > 0: timestamps.append(timestamp) if not timestamps: start_dt = parse_iso_timestamp(start_time) end_dt = parse_iso_timestamp(end_time or start_time) return max(0.0, (end_dt - start_dt).total_seconds() / 60.0) timestamps.sort() total_minutes = 0.0 previous = timestamps[0] for current in timestamps[1:]: gap_minutes = max(0.0, (current - previous).total_seconds() / 60.0) total_minutes += min(gap_minutes, idle_cap_minutes) previous = current return max(1.0, total_minutes) def session_candidate_key( session_path: Path, payload: dict[str, Any] ) -> tuple[int, float, int, int]: message_count = ( len(payload.get("messages", [])) if isinstance(payload.get("messages"), list) else 0 ) last_updated = parse_iso_timestamp( str(payload.get("lastUpdated") or payload.get("startTime") or "") ) parent_name = ( session_path.parent.parent.name if session_path.parent.name == "chats" else session_path.parent.name ) named_bonus = 1 if not HASH_DIR_RE.fullmatch(parent_name) else 0 return ( message_count, last_updated.timestamp(), named_bonus, -len(str(session_path)), ) def load_project_root_hashes(gemini_dir: Path) -> dict[str, str]: project_roots: dict[str, str] = {} for root_file in gemini_dir.rglob(".project_root"): try: raw_root = root_file.read_text(encoding="utf-8").strip() except Exception: continue if not raw_root: continue normalized_root = normalize_path_for_match(raw_root) for project_hash in path_hash_variants(raw_root): project_roots[project_hash] = normalized_root return project_roots def resolve_project_root( session_path: Path, project_hash: str, project_root_hashes: dict[str, str], ) -> str: if session_path.parent.name == "chats": candidate = session_path.parent.parent / ".project_root" if candidate.exists(): try: raw_root = candidate.read_text(encoding="utf-8").strip() except Exception: raw_root = "" if raw_root: return normalize_path_for_match(raw_root) return project_root_hashes.get(project_hash, "") def is_gmail_tool(tool_name: str) -> bool: return tool_name.startswith(GEMINI_GMAIL_PREFIX) def int_value(value: Any) -> int: try: return int(value) except Exception: return 0 def detect_goal_categories(session: GeminiSession) -> dict[str, int]: counts: Counter[str] = Counter() if session.user_message_count <= 1 and session.tool_call_count <= 1: return {"warmup_minimal": 1} for text in session.user_messages: for category, patterns in GOAL_PATTERNS.items(): if any(pattern.search(text) for pattern in patterns): counts[category] += 1 if any(is_gmail_tool(name) for name in session.tool_counts): counts["manage_email"] += 1 if session.test_commands > 0: counts["write_tests"] += 1 if ( session.git_commits > 0 or session.gh_commands > 0 or session.git_pushes > 0 ): counts["create_pr_commit"] += 1 if session.file_write_count > 0 or session.replace_count > 0: counts["implement_feature"] += int(not counts) if session.web_search_count > 0 and not counts: counts["understand_codebase"] += 1 if session.command_failures > 0 and not counts: counts["debug_investigate"] += 1 if session.shell_command_count > 0 and not counts: counts["understand_codebase"] += 1 if not counts: counts["warmup_minimal"] += 1 return dict(counts) def detect_friction(session: GeminiSession) -> dict[str, int]: counts: Counter[str] = Counter() if session.command_failures > 0: counts["command_failed"] += session.command_failures if session.tool_errors > 0: counts["tool_error"] += session.tool_errors if session.tool_cancellations > 0 or session.request_cancellations > 0: counts["tool_cancelled"] += ( session.tool_cancellations + session.request_cancellations ) if session.workspace_boundary_errors > 0: counts["workspace_boundary"] += session.workspace_boundary_errors if session.loop_events > 0: counts["loop_detected"] += session.loop_events if session.fallback_model_switches > 0: counts["fallback_model"] += session.fallback_model_switches if session.malformed_function_calls > 0: counts["malformed_function_call"] += session.malformed_function_calls return dict(counts) def infer_outcome(session: GeminiSession) -> str: if session.loop_events > 0 or session.malformed_function_calls > 0: return "interrupted" if ( session.assistant_message_count > 0 and session.command_failures == 0 and session.tool_errors == 0 and session.error_message_count == 0 and session.request_cancellations == 0 and session.tool_cancellations == 0 ): return "completed_cleanly" if session.assistant_message_count > 0: return "completed_with_retries" if session.request_cancellations > 0 or session.tool_cancellations > 0: return "interrupted" return "incomplete" def summarize_assistant(session: GeminiSession) -> str: if session.final_answer: return truncate(session.final_answer, 140) if session.assistant_summaries: return truncate(session.assistant_summaries[-1], 140) if session.first_prompt: return truncate(session.first_prompt, 140) return "No assistant summary captured." def parse_session_data( session_path: Path, payload: dict[str, Any], scope: ProjectScope, project_root_hashes: dict[str, str], ) -> GeminiSession | None: session_id = str(payload.get("sessionId") or session_path.stem) project_hash = str(payload.get("projectHash") or "") project_root = resolve_project_root( session_path, project_hash, project_root_hashes ) if not matches_project_scope(project_root, project_hash, scope): return None user_messages: list[str] = [] assistant_summaries: list[str] = [] command_failure_examples: list[str] = [] tool_counts: Counter[str] = Counter() command_families: Counter[str] = Counter() model_counts: Counter[str] = Counter() assistant_message_count = 0 info_message_count = 0 error_message_count = 0 thought_count = 0 tool_call_count = 0 tool_errors = 0 tool_cancellations = 0 shell_command_count = 0 command_failures = 0 web_search_count = 0 mcp_call_count = 0 file_read_count = 0 file_write_count = 0 replace_count = 0 skill_activation_count = 0 workspace_boundary_errors = 0 loop_events = 0 fallback_model_switches = 0 request_cancellations = 0 malformed_function_calls = 0 compressed_context_events = 0 total_input_tokens = 0 total_output_tokens = 0 total_cached_tokens = 0 total_thought_tokens = 0 total_tool_tokens = 0 total_tokens = 0 git_commits = 0 git_pushes = 0 gh_commands = 0 test_commands = 0 final_answer = "" messages = payload.get("messages") if not isinstance(messages, list): messages = [] for message in messages: if not isinstance(message, dict): continue message_type = str(message.get("type") or "") message_text = extract_message_text( message.get("content"), message.get("displayContent") ) if message_type == "user": if message_text: user_messages.append(message_text) continue if message_type == "gemini": if message_text: assistant_message_count += 1 final_answer = message_text if len(assistant_summaries) < 5: assistant_summaries.append(message_text) thoughts = message.get("thoughts") if isinstance(thoughts, list): thought_count += sum( 1 for item in thoughts if isinstance(item, dict) ) tokens = message.get("tokens") if isinstance(tokens, dict): total_input_tokens += int_value(tokens.get("input")) total_output_tokens += int_value(tokens.get("output")) total_cached_tokens += int_value(tokens.get("cached")) total_thought_tokens += int_value(tokens.get("thoughts")) total_tool_tokens += int_value(tokens.get("tool")) total_tokens += int_value(tokens.get("total")) model = str(message.get("model") or "") if model: model_counts[model] += 1 tool_calls = message.get("toolCalls") if not isinstance(tool_calls, list): continue tool_call_count += len(tool_calls) for tool in tool_calls: if not isinstance(tool, dict): continue name = str(tool.get("name") or "") if name: tool_counts[name] += 1 status = str(tool.get("status") or "") if status == "error": tool_errors += 1 elif status == "cancelled": tool_cancellations += 1 tool_blob = extract_tool_text_blob(tool) if WORKSPACE_BOUNDARY_RE.search(tool_blob): workspace_boundary_errors += 1 if name.startswith("mcp_"): mcp_call_count += 1 if name == "google_web_search": web_search_count += 1 if name in {"read_file", "read_many_files"}: file_read_count += 1 if name == "write_file": file_write_count += 1 if name == "replace": replace_count += 1 if name == "activate_skill": skill_activation_count += 1 if name != "run_shell_command": continue shell_command_count += 1 args = tool.get("args") command_text = "" if isinstance(args, dict): command_text = str(args.get("command") or "") if command_text: family = command_family(command_text) if family: command_families[family] += 1 lowered = command_text.lower() if GIT_COMMIT_RE.search(command_text): git_commits += 1 if GIT_PUSH_RE.search(command_text): git_pushes += 1 if GH_RE.search(command_text): gh_commands += 1 if TEST_COMMAND_RE.search(lowered): test_commands += 1 exit_code = extract_shell_exit_code(tool) if status == "error" or ( exit_code is not None and exit_code != 0 ): command_failures += 1 if command_text and len(command_failure_examples) < 5: command_failure_examples.append( truncate(command_text, 120) ) continue if message_type == "info": info_message_count += 1 lowered = message_text.lower() if "request cancelled" in lowered: request_cancellations += 1 if "loop was detected" in lowered: loop_events += 1 if "fallback model" in lowered: fallback_model_switches += 1 if "malformed function call" in lowered: malformed_function_calls += 1 if "compressed context" in lowered: compressed_context_events += 1 continue if message_type == "error": error_message_count += 1 if message_text and len(command_failure_examples) < 5: command_failure_examples.append(truncate(message_text, 120)) start_dt = parse_iso_timestamp(str(payload.get("startTime") or "")) end_dt = parse_iso_timestamp( str(payload.get("lastUpdated") or payload.get("startTime") or "") ) duration_minutes = estimate_active_minutes( messages, str(payload.get("startTime") or ""), str(payload.get("lastUpdated") or payload.get("startTime") or ""), ) primary_model = "" top_models = sorted( model_counts.items(), key=lambda item: item[1], reverse=True ) if top_models: primary_model = top_models[0][0] session = GeminiSession( session_id=session_id, file_path=str(session_path), start_time=start_dt.isoformat(), end_time=end_dt.isoformat(), project_hash=project_hash, project_root=project_root, primary_model=primary_model, duration_minutes=duration_minutes, user_message_count=len(user_messages), assistant_message_count=assistant_message_count, info_message_count=info_message_count, error_message_count=error_message_count, thought_count=thought_count, tool_call_count=tool_call_count, tool_errors=tool_errors, tool_cancellations=tool_cancellations, shell_command_count=shell_command_count, command_failures=command_failures, web_search_count=web_search_count, mcp_call_count=mcp_call_count, file_read_count=file_read_count, file_write_count=file_write_count, replace_count=replace_count, skill_activation_count=skill_activation_count, workspace_boundary_errors=workspace_boundary_errors, loop_events=loop_events, fallback_model_switches=fallback_model_switches, request_cancellations=request_cancellations, malformed_function_calls=malformed_function_calls, compressed_context_events=compressed_context_events, total_input_tokens=total_input_tokens, total_output_tokens=total_output_tokens, total_cached_tokens=total_cached_tokens, total_thought_tokens=total_thought_tokens, total_tool_tokens=total_tool_tokens, total_tokens=total_tokens, git_commits=git_commits, git_pushes=git_pushes, gh_commands=gh_commands, test_commands=test_commands, models=dict(model_counts), tool_counts=dict(tool_counts), command_families=dict(command_families), user_messages=user_messages, assistant_summaries=[ truncate(text, 160) for text in assistant_summaries ], command_failure_examples=command_failure_examples, first_prompt=truncate(user_messages[0], 160) if user_messages else "", final_answer=truncate(final_answer, 800) if final_answer else "", ) session.goal_categories = detect_goal_categories(session) session.friction = detect_friction(session) session.outcome = infer_outcome(session) return session def scan_all_sessions( gemini_dir: Path, scope: ProjectScope ) -> list[GeminiSession]: project_root_hashes = load_project_root_hashes(gemini_dir) best_payloads: dict[ str, tuple[Path, dict[str, Any], tuple[int, float, int, int]] ] = {} for session_file in gemini_dir.rglob("session-*.json"): try: payload = json.loads(session_file.read_text(encoding="utf-8")) except Exception: continue if not isinstance(payload, dict): continue session_id = str(payload.get("sessionId") or session_file.stem) key = session_candidate_key(session_file, payload) existing = best_payloads.get(session_id) if existing is None or key > existing[2]: best_payloads[session_id] = (session_file, payload, key) sessions: list[GeminiSession] = [] for session_file, payload, _key in best_payloads.values(): parsed = parse_session_data( session_file, payload, scope, project_root_hashes ) if parsed is not None: sessions.append(parsed) sessions.sort(key=lambda item: item.start_time, reverse=True) return sessions def top_entries( data: dict[str, int], limit: int = 5, exclude: set[str] | None = None ) -> list[tuple[str, int]]: blocked = exclude or set() return [ (key, value) for key, value in sorted( data.items(), key=lambda item: item[1], reverse=True ) if value > 0 and key not in blocked ][:limit] def project_key(project_root: str, project_hash: str) -> str: if project_root: return project_root if project_hash: return f"hash:{project_hash}" return "(unknown)" def project_label(value: str) -> str: if value.startswith("hash:"): return f"project {value.removeprefix('hash:')[:10]}" parts = Path(value).parts if len(parts) >= 2: return "/".join(parts[-2:]) if parts: return parts[-1] return value or "(unknown)" def aggregate_sessions(sessions: list[GeminiSession]) -> AggregatedData: aggregated = AggregatedData( total_sessions=len(sessions), date_range={"start": "", "end": ""}, ) dates: list[str] = [] for session in sessions: dates.append(session.start_time) aggregated.total_user_messages += session.user_message_count aggregated.total_assistant_messages += session.assistant_message_count aggregated.total_duration_hours += session.duration_minutes / 60.0 aggregated.total_thoughts += session.thought_count aggregated.total_tool_calls += session.tool_call_count aggregated.total_tool_errors += session.tool_errors aggregated.total_tool_cancellations += session.tool_cancellations aggregated.total_shell_commands += session.shell_command_count aggregated.total_command_failures += session.command_failures aggregated.total_web_searches += session.web_search_count aggregated.total_mcp_calls += session.mcp_call_count aggregated.total_file_reads += session.file_read_count aggregated.total_file_writes += session.file_write_count aggregated.total_replace_calls += session.replace_count aggregated.total_skill_activations += session.skill_activation_count aggregated.total_workspace_boundary_errors += ( session.workspace_boundary_errors ) aggregated.total_loops += session.loop_events aggregated.total_fallback_model_switches += ( session.fallback_model_switches ) aggregated.total_request_cancellations += session.request_cancellations aggregated.total_malformed_function_calls += ( session.malformed_function_calls ) aggregated.total_compressed_context_events += ( session.compressed_context_events ) aggregated.total_input_tokens += session.total_input_tokens aggregated.total_output_tokens += session.total_output_tokens aggregated.total_cached_tokens += session.total_cached_tokens aggregated.total_thought_tokens += session.total_thought_tokens aggregated.total_tool_tokens += session.total_tool_tokens aggregated.total_tokens += session.total_tokens aggregated.git_commits += session.git_commits aggregated.git_pushes += session.git_pushes aggregated.gh_commands += session.gh_commands aggregated.test_commands += session.test_commands aggregated.sessions_with_mcp += int(session.mcp_call_count > 0) aggregated.sessions_with_shell += int(session.shell_command_count > 0) aggregated.sessions_with_web_search += int( session.web_search_count > 0 ) aggregated.sessions_with_skills += int( session.skill_activation_count > 0 ) aggregated.projects[ project_key(session.project_root, session.project_hash) ] = ( aggregated.projects.get( project_key(session.project_root, session.project_hash), 0 ) + 1 ) aggregated.outcomes[session.outcome] = ( aggregated.outcomes.get(session.outcome, 0) + 1 ) for key, count in session.models.items(): aggregated.models[key] = aggregated.models.get(key, 0) + count for key, count in session.tool_counts.items(): aggregated.tool_counts[key] = ( aggregated.tool_counts.get(key, 0) + count ) for key, count in session.command_families.items(): aggregated.command_families[key] = ( aggregated.command_families.get(key, 0) + count ) for key, count in session.goal_categories.items(): aggregated.goal_categories[key] = ( aggregated.goal_categories.get(key, 0) + count ) for key, count in session.friction.items(): aggregated.friction[key] = aggregated.friction.get(key, 0) + count if len(aggregated.session_summaries) < 50: aggregated.session_summaries.append( { "id": session.session_id[:8], "date": iso_date(session.start_time), "project": project_label( project_key(session.project_root, session.project_hash) ), "prompt": session.first_prompt, "summary": summarize_assistant(session), "outcome": session.outcome, "failures": str( session.command_failures + session.tool_errors ), } ) if dates: dates.sort() aggregated.date_range["start"] = iso_date(dates[0]) aggregated.date_range["end"] = iso_date(dates[-1]) return aggregated def build_at_a_glance(data: AggregatedData) -> dict[str, str]: top_goal = top_entries( data.goal_categories, limit=1, exclude={"warmup_minimal"} ) top_project = top_entries(data.projects, limit=1) top_tool = top_entries(data.tool_counts, limit=3) top_command = top_entries(data.command_families, limit=3) top_friction = top_entries(data.friction, limit=1) work_text = "This Gemini slice spans several different kinds of work." if top_goal: work_text = f"Most Gemini work in this slice is about {safe_title(top_goal[0][0]).lower()}." if top_project: work_text += f" The most common project is {project_label(top_project[0][0])}." workflow_text = ( "Your workflow mixes chat steering, file tools, and shell execution." ) if data.total_shell_commands >= max(10, data.total_sessions * 5): workflow_text = "This is a terminal-first Gemini workflow: the shell is doing most of the heavy lifting." elif data.total_file_reads + data.total_replace_calls > max( 12, data.total_shell_commands ): workflow_text = "This is a file-tool-heavy Gemini workflow: reading and patching files matters more than driving the shell." if data.sessions_with_skills > 0: workflow_text += f" Skills are activated in {data.sessions_with_skills} session(s), so the flow is not purely generic chat." external_text = "External context is limited." if data.total_mcp_calls > 0: external_text = "These sessions do use MCP-backed context rather than relying only on shell and file tools." elif data.total_web_searches > 0: external_text = "External context comes mostly from Google web search." friction_text = "Measured friction is low." if top_friction: friction_key = top_friction[0][0] friction_text = FRICTION_DESCRIPTIONS.get( friction_key, f"The main drag is {safe_title(friction_key).lower()}.", ) tools_text = "Top tools are mixed." if top_tool or top_command: tool_names = ", ".join( safe_title(name) for name, _count in top_tool[:2] ) command_names = ", ".join(name for name, _count in top_command[:2]) bits = [part for part in (tool_names, command_names) if part] if bits: tools_text = f"The most-used levers are {bits[0]}" if len(bits) > 1: tools_text += f", with command families like {bits[1]}" tools_text += "." return { "what_you_do": work_text, "how_you_work": workflow_text, "external_context": external_text, "what_slows_you_down": friction_text, "most_used_levers": tools_text, } def build_insights(data: AggregatedData) -> dict[str, Any]: return { "at_a_glance": build_at_a_glance(data), "top_projects": [ { "path": path, "label": project_label(path), "session_count": count, } for path, count in top_entries(data.projects, limit=8) ], "top_goals": [ {"goal": goal, "label": safe_title(goal), "count": count} for goal, count in top_entries( data.goal_categories, limit=8, exclude={"warmup_minimal"} ) ], "top_tools": [ {"tool": tool, "label": safe_title(tool), "count": count} for tool, count in top_entries(data.tool_counts, limit=10) ], "top_commands": [ {"command": name, "count": count} for name, count in top_entries(data.command_families, limit=10) ], "friction": [ { "category": key, "label": safe_title(key), "count": count, "description": FRICTION_DESCRIPTIONS.get(key, ""), } for key, count in top_entries(data.friction, limit=8) ], } def escape_html(text: str) -> str: return html.escape(text or "") def generate_bar_chart( data: dict[str, int], color: str, max_items: int = 6, label_fn: Any | None = None, ) -> str: entries = top_entries(data, limit=max_items) if not entries: return '

No data

' max_value = max(count for _label, count in entries) or 1 rows: list[str] = [] for label, count in entries: display = label_fn(label) if label_fn else safe_title(label) width = (count / max_value) * 100 rows.append( f'
{escape_html(str(display))}
' f'
' f'
{count}
' ) return "\n".join(rows) def generate_html_report( data: AggregatedData, insights: dict[str, Any], project_scope_prefix: str | None, ) -> str: glance = insights["at_a_glance"] scope_label = project_scope_prefix or "All Gemini sessions" html_parts = [ "", '', "", '', '', "Gemini CLI Insights Report", "", "", "", '
', '
', "

Gemini CLI Insights

", f"

{escape_html(data.date_range.get('start', ''))} to {escape_html(data.date_range.get('end', ''))}. Built from local Gemini CLI chat sessions.

", f'
Scope: {escape_html(scope_label)}
', '
', f'

What you do: {escape_html(glance["what_you_do"])}

', f'

How you work: {escape_html(glance["how_you_work"])}

', f'

External context: {escape_html(glance["external_context"])}

', f'

What slows you down: {escape_html(glance["what_slows_you_down"])}

', f'

Most-used levers: {escape_html(glance["most_used_levers"])}

', "
", "
", '
', f'
{data.total_sessions}
Sessions
', f'
{round(data.total_duration_hours, 1)}
Hours
', f'
{data.total_user_messages}
User Messages
', f'
{data.total_tool_calls}
Tool Calls
', f'
{data.total_shell_commands}
Shell Commands
', f'
{data.total_command_failures}
Shell Failures
', f'
{data.total_mcp_calls}
MCP Calls
', f'
{data.total_thoughts}
Thoughts
', "
", '
', f'

Projects

{generate_bar_chart(data.projects, "#0f766e", label_fn=project_label)}
', f'

Goals

{generate_bar_chart(data.goal_categories, "#2563eb")}
', f'

Tools

{generate_bar_chart(data.tool_counts, "#7c3aed")}
', f'

Command Families

{generate_bar_chart(data.command_families, "#ea580c", label_fn=lambda value: value)}
', f'

Models

{generate_bar_chart(data.models, "#16a34a", label_fn=lambda value: value)}
', f'

Friction

{generate_bar_chart(data.friction, "#dc2626")}
', "
", '
', "

Recent Sessions

", ] for item in data.session_summaries[:18]: html_parts.extend( [ '
', '
', f'
{escape_html(item["project"])}
', f'
{escape_html(item["date"])} | {escape_html(safe_title(item["outcome"]))} | failures: {escape_html(item["failures"])}
', "
", f'

Prompt: {escape_html(item["prompt"] or "(none captured)")}

', f'

Summary: {escape_html(item["summary"])}

', "
", ] ) html_parts.extend( [ "
", ( f'" ), "
", "", "", ] ) return "\n".join(html_parts) def build_export_data( data: AggregatedData, insights: dict[str, Any], sessions: list[GeminiSession], project_scope_prefix: str | None, ) -> dict[str, Any]: return { "metadata": { "generated_at": datetime.now(tz=timezone.utc).isoformat(), "report_type": "gemini-insights", "date_range": data.date_range, "session_count": data.total_sessions, "project_scope_prefix": project_scope_prefix, }, "aggregated_data": asdict(data), "insights": insights, "sessions": [asdict(session) for session in sessions], } def generate_report(args: argparse.Namespace) -> dict[str, Any]: ensure_dir(args.cache_dir) scope = build_project_scope(args.project_path_prefix) sessions = scan_all_sessions(args.gemini_dir, scope) aggregated = aggregate_sessions(sessions) insights = build_insights(aggregated) output_html = args.output_html or (args.cache_dir / "report.html") output_json = args.output_json or (args.cache_dir / "report.json") ensure_dir(output_html.parent) ensure_dir(output_json.parent) output_html.write_text( generate_html_report(aggregated, insights, args.project_path_prefix), encoding="utf-8", ) output_json.write_text( json.dumps( build_export_data( aggregated, insights, sessions, args.project_path_prefix ), indent=2, ), encoding="utf-8", ) return { "html_path": output_html, "json_path": output_json, "data": aggregated, "insights": insights, } def print_summary(result: dict[str, Any]) -> None: data: AggregatedData = result["data"] glance = result["insights"]["at_a_glance"] print(f"Wrote HTML report: {result['html_path']}") print(f"Wrote JSON export: {result['json_path']}") print( f"Analyzed {data.total_sessions} sessions " f"({data.total_user_messages} user messages, {round(data.total_duration_hours, 1)}h) " f"from {data.date_range.get('start', '')} to {data.date_range.get('end', '')}" ) print(f"What you do: {glance['what_you_do']}") print(f"What slows you down: {glance['what_slows_you_down']}") def main() -> int: args = parse_args() result = generate_report(args) print_summary(result) return 0 if __name__ == "__main__": raise SystemExit(main())