codeflash-agent/scripts/gemini_insights.py

1577 lines
57 KiB
Python
Raw Normal View History

2026-04-09 08:36:01 +00:00
# /// script
# requires-python = ">=3.11"
# ///
"""Portable Gemini CLI usage report generator.
This script scans Gemini CLI chat session files under ``~/.gemini`` and builds
an HTML report plus JSON export.
It is intentionally Gemini-specific:
1. Session discovery uses Gemini ``session-*.json`` chat files.
2. Metrics are derived from Gemini message fields like ``thoughts``,
``tokens``, ``toolCalls``, and ``info``/``error`` messages.
3. Project scoping matches Gemini project hashes and known project roots, with
repo-family matching for worktrees and related clones.
"""
from __future__ import annotations
import argparse
import hashlib
import html
import json
import os
import re
import shlex
import subprocess
from collections import Counter
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from functools import lru_cache
from pathlib import Path
from typing import Any
LABEL_MAP: dict[str, str] = {
"debug_investigate": "Debug/Investigate",
"implement_feature": "Implement Feature",
"fix_bug": "Fix Bug",
"write_script_tool": "Write Script/Tool",
"refactor_code": "Refactor Code",
"configure_system": "Configure System",
"create_pr_commit": "Create PR/Commit",
"analyze_data": "Analyze Data",
"understand_codebase": "Understand Codebase",
"write_tests": "Write Tests",
"write_docs": "Write Docs",
"manage_email": "Manage Email",
"warmup_minimal": "Quick Check",
"completed_cleanly": "Completed Cleanly",
"completed_with_retries": "Completed With Retries",
"interrupted": "Interrupted",
"incomplete": "Incomplete",
"tool_error": "Tool Error",
"tool_cancelled": "Tool Cancelled",
"command_failed": "Command Failed",
"workspace_boundary": "Workspace Boundary",
"loop_detected": "Loop Detected",
"fallback_model": "Fallback Model",
"malformed_function_call": "Malformed Function Call",
"run_shell_command": "Shell",
"read_file": "Read File",
"read_many_files": "Read Many Files",
"write_file": "Write File",
"replace": "Replace",
"search_file_content": "Search File Content",
"grep_search": "Grep Search",
"list_directory": "List Directory",
"google_web_search": "Google Web Search",
"activate_skill": "Activate Skill",
"codebase_investigator": "Codebase Investigator",
}
GOAL_PATTERNS: dict[str, list[re.Pattern[str]]] = {
"debug_investigate": [
re.compile(r"\bdebug\b", re.IGNORECASE),
re.compile(r"\binvestigat", re.IGNORECASE),
re.compile(r"\btrace\b", re.IGNORECASE),
re.compile(r"\berror\b", re.IGNORECASE),
re.compile(r"\bwhy\b", re.IGNORECASE),
re.compile(r"\bfail", re.IGNORECASE),
2026-04-09 08:36:01 +00:00
],
"implement_feature": [
re.compile(r"\bimplement\b", re.IGNORECASE),
re.compile(r"\bbuild\b", re.IGNORECASE),
re.compile(r"\bfeature\b", re.IGNORECASE),
re.compile(r"\badd\b", re.IGNORECASE),
re.compile(r"\bcreate\b", re.IGNORECASE),
2026-04-09 08:36:01 +00:00
],
"fix_bug": [
re.compile(r"\bfix\b", re.IGNORECASE),
re.compile(r"\bbug\b", re.IGNORECASE),
re.compile(r"\bbroken\b", re.IGNORECASE),
re.compile(r"\bfailing\b", re.IGNORECASE),
2026-04-09 08:36:01 +00:00
],
"write_script_tool": [
re.compile(r"\bscript\b", re.IGNORECASE),
re.compile(r"\bcli\b", re.IGNORECASE),
re.compile(r"\btool\b", re.IGNORECASE),
re.compile(r"\bautomation\b", re.IGNORECASE),
2026-04-09 08:36:01 +00:00
],
"refactor_code": [
re.compile(r"\brefactor\b", re.IGNORECASE),
re.compile(r"\bcleanup\b", re.IGNORECASE),
re.compile(r"\breorgan", re.IGNORECASE),
re.compile(r"\bsimplif", re.IGNORECASE),
2026-04-09 08:36:01 +00:00
],
"configure_system": [
re.compile(r"\bconfigure\b", re.IGNORECASE),
re.compile(r"\bsetup\b", re.IGNORECASE),
re.compile(r"\binstall\b", re.IGNORECASE),
re.compile(r"\bconfig\b", re.IGNORECASE),
re.compile(r"\benv\b", re.IGNORECASE),
re.compile(r"\bci\b", re.IGNORECASE),
re.compile(r"\bauth\b", re.IGNORECASE),
re.compile(r"\blogin\b", re.IGNORECASE),
2026-04-09 08:36:01 +00:00
],
"create_pr_commit": [
re.compile(r"\bcommit\b", re.IGNORECASE),
re.compile(r"\bpull request\b", re.IGNORECASE),
re.compile(r"\bpr\b", re.IGNORECASE),
re.compile(r"\bmerge\b", re.IGNORECASE),
re.compile(r"\bbranch\b", re.IGNORECASE),
2026-04-09 08:36:01 +00:00
],
"analyze_data": [
re.compile(r"\banaly[sz]e\b", re.IGNORECASE),
re.compile(r"\bmetrics\b", re.IGNORECASE),
re.compile(r"\breport\b", re.IGNORECASE),
re.compile(r"\binsights?\b", re.IGNORECASE),
re.compile(r"\bdata\b", re.IGNORECASE),
2026-04-09 08:36:01 +00:00
],
"understand_codebase": [
re.compile(r"\bunderstand\b", re.IGNORECASE),
re.compile(r"\bexplain\b", re.IGNORECASE),
re.compile(r"\bwalk ?through\b", re.IGNORECASE),
re.compile(r"\bhow does\b", re.IGNORECASE),
re.compile(r"\bwhere is\b", re.IGNORECASE),
re.compile(r"\bfind\b", re.IGNORECASE),
re.compile(r"\breview\b", re.IGNORECASE),
2026-04-09 08:36:01 +00:00
],
"write_tests": [
re.compile(r"\btests?\b", re.IGNORECASE),
re.compile(r"\bpytest\b", re.IGNORECASE),
re.compile(r"\bunit test\b", re.IGNORECASE),
re.compile(r"\bintegration test\b", re.IGNORECASE),
re.compile(r"\bbenchmark\b", re.IGNORECASE),
2026-04-09 08:36:01 +00:00
],
"write_docs": [
re.compile(r"\breadme\b", re.IGNORECASE),
re.compile(r"\bdocs?\b", re.IGNORECASE),
re.compile(r"\bdocument", re.IGNORECASE),
2026-04-09 08:36:01 +00:00
],
"manage_email": [
re.compile(r"\bgmail\b", re.IGNORECASE),
re.compile(r"\bemail\b", re.IGNORECASE),
re.compile(r"\binbox\b", re.IGNORECASE),
re.compile(r"\bunsubscrib", re.IGNORECASE),
re.compile(r"\bdeclutter\b", re.IGNORECASE),
re.compile(r"\bdraft\b", re.IGNORECASE),
re.compile(r"\bdelete\b", re.IGNORECASE),
2026-04-09 08:36:01 +00:00
],
}
FRICTION_DESCRIPTIONS: dict[str, str] = {
"command_failed": "Shell execution is one of the main sources of retries in these sessions.",
"tool_error": "File and edit tools are failing often enough to shape the flow.",
"tool_cancelled": "Some requests are being cancelled before the tool chain lands cleanly.",
"workspace_boundary": "Gemini is running into workspace boundaries, which slows cross-repo work.",
"loop_detected": "At least one session tripped Gemini's loop protection.",
"fallback_model": "Some sessions fell back to a different model midstream.",
"malformed_function_call": "A malformed function call interrupted at least one request.",
}
GEMINI_GMAIL_PREFIX = "mcp_google-workspace_gmail."
WORKSPACE_BOUNDARY_RE = re.compile(
r"workspace directories|project temp directory", re.IGNORECASE
2026-04-09 08:36:01 +00:00
)
HASH_DIR_RE = re.compile(r"^[0-9a-f]{64}$")
EXIT_CODE_RE = re.compile(r"Exit Code:\s*(-?\d+)")
TEST_COMMAND_RE = re.compile(
r"\b(pytest|npm test|pnpm test|yarn test|cargo test|go test|vitest|jest|ruff|mypy|gradle test|mvn test)\b",
re.IGNORECASE,
2026-04-09 08:36:01 +00:00
)
GIT_COMMIT_RE = re.compile(r"(^|[;&|]\s*|\s)git\s+commit\b", re.IGNORECASE)
GIT_PUSH_RE = re.compile(r"(^|[;&|]\s*|\s)git\s+push\b", re.IGNORECASE)
GH_RE = re.compile(r"(^|[;&|]\s*|\s)gh\b", re.IGNORECASE)
2026-04-09 08:36:01 +00:00
@dataclass(frozen=True)
class RepoIdentity:
root: str
common_dir: str | None
remotes: frozenset[str]
worktrees: tuple[str, ...] = ()
@dataclass(frozen=True)
class ProjectScope:
target_prefix: str | None
path_prefixes: frozenset[str]
common_dirs: frozenset[str]
remotes: frozenset[str]
project_hashes: frozenset[str]
@dataclass
class GeminiSession:
session_id: str
file_path: str
start_time: str
end_time: str
project_hash: str
project_root: str
primary_model: str
duration_minutes: float
user_message_count: int
assistant_message_count: int
info_message_count: int
error_message_count: int
thought_count: int
tool_call_count: int
tool_errors: int
tool_cancellations: int
shell_command_count: int
command_failures: int
web_search_count: int
mcp_call_count: int
file_read_count: int
file_write_count: int
replace_count: int
skill_activation_count: int
workspace_boundary_errors: int
loop_events: int
fallback_model_switches: int
request_cancellations: int
malformed_function_calls: int
compressed_context_events: int
total_input_tokens: int
total_output_tokens: int
total_cached_tokens: int
total_thought_tokens: int
total_tool_tokens: int
total_tokens: int
git_commits: int
git_pushes: int
gh_commands: int
test_commands: int
models: dict[str, int] = field(default_factory=dict)
tool_counts: dict[str, int] = field(default_factory=dict)
command_families: dict[str, int] = field(default_factory=dict)
goal_categories: dict[str, int] = field(default_factory=dict)
friction: dict[str, int] = field(default_factory=dict)
user_messages: list[str] = field(default_factory=list)
assistant_summaries: list[str] = field(default_factory=list)
command_failure_examples: list[str] = field(default_factory=list)
first_prompt: str = ""
final_answer: str = ""
outcome: str = ""
@dataclass
class AggregatedData:
total_sessions: int
date_range: dict[str, str]
total_user_messages: int = 0
total_assistant_messages: int = 0
total_duration_hours: float = 0.0
total_thoughts: int = 0
total_tool_calls: int = 0
total_tool_errors: int = 0
total_tool_cancellations: int = 0
total_shell_commands: int = 0
total_command_failures: int = 0
total_web_searches: int = 0
total_mcp_calls: int = 0
total_file_reads: int = 0
total_file_writes: int = 0
total_replace_calls: int = 0
total_skill_activations: int = 0
total_workspace_boundary_errors: int = 0
total_loops: int = 0
total_fallback_model_switches: int = 0
total_request_cancellations: int = 0
total_malformed_function_calls: int = 0
total_compressed_context_events: int = 0
total_input_tokens: int = 0
total_output_tokens: int = 0
total_cached_tokens: int = 0
total_thought_tokens: int = 0
total_tool_tokens: int = 0
total_tokens: int = 0
git_commits: int = 0
git_pushes: int = 0
gh_commands: int = 0
test_commands: int = 0
sessions_with_mcp: int = 0
sessions_with_shell: int = 0
sessions_with_web_search: int = 0
sessions_with_skills: int = 0
tool_counts: dict[str, int] = field(default_factory=dict)
command_families: dict[str, int] = field(default_factory=dict)
models: dict[str, int] = field(default_factory=dict)
projects: dict[str, int] = field(default_factory=dict)
goal_categories: dict[str, int] = field(default_factory=dict)
friction: dict[str, int] = field(default_factory=dict)
outcomes: dict[str, int] = field(default_factory=dict)
session_summaries: list[dict[str, str]] = field(default_factory=list)
def parse_args() -> argparse.Namespace:
script_dir = Path(__file__).resolve().parent
home = Path(os.path.expanduser("~"))
parser = argparse.ArgumentParser(
description="Generate a Gemini CLI usage report from ~/.gemini.",
)
parser.add_argument(
"--gemini-dir",
type=Path,
default=home / ".gemini",
help="Gemini CLI home directory containing session chats.",
)
parser.add_argument(
"--cache-dir",
type=Path,
default=script_dir / "gemini-insights-output",
help="Directory for the generated HTML report and JSON export.",
)
parser.add_argument(
"--project-path-prefix",
type=str,
default=None,
help=(
"Only include sessions whose project root matches this path, a descendant, "
"or a related repo/worktree/fork in the same repo family."
),
)
parser.add_argument(
"--output-html",
type=Path,
default=None,
help="Path for the generated HTML report. Defaults to <cache-dir>/report.html.",
)
parser.add_argument(
"--output-json",
type=Path,
default=None,
help="Path for the JSON export. Defaults to <cache-dir>/report.json.",
)
return parser.parse_args()
def ensure_dir(path: Path) -> None:
path.mkdir(parents=True, exist_ok=True)
def parse_iso_timestamp(value: str | None) -> datetime:
if not value:
return datetime.fromtimestamp(0, tz=timezone.utc)
normalized = value
if normalized.endswith("Z"):
normalized = normalized[:-1] + "+00:00"
try:
return datetime.fromisoformat(normalized)
except ValueError:
return datetime.fromtimestamp(0, tz=timezone.utc)
def iso_date(value: str) -> str:
return parse_iso_timestamp(value).date().isoformat()
def truncate(text: str, length: int) -> str:
collapsed = " ".join(text.split())
if len(collapsed) <= length:
return collapsed
return collapsed[: max(0, length - 1)].rstrip() + "..."
def safe_title(value: str) -> str:
return LABEL_MAP.get(value, value.replace("_", " ").title())
def normalize_path_for_match(path_text: str) -> str:
return os.path.normpath(os.path.realpath(os.path.expanduser(path_text)))
def path_hash_variants(path_text: str) -> set[str]:
expanded = os.path.normpath(os.path.expanduser(path_text))
normalized = normalize_path_for_match(path_text)
variants = {expanded, normalized}
return {
hashlib.sha256(value.encode("utf-8")).hexdigest()
for value in variants
if value
}
def run_git(args: list[str], cwd: str) -> str | None:
try:
completed = subprocess.run(
["git", *args],
cwd=cwd,
check=True,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
text=True,
)
except Exception:
return None
return completed.stdout.strip()
@lru_cache(maxsize=512)
def get_repo_identity(path_text: str) -> RepoIdentity | None:
normalized = normalize_path_for_match(path_text)
top_level = run_git(["rev-parse", "--show-toplevel"], normalized)
if not top_level:
return None
common_dir = run_git(["rev-parse", "--git-common-dir"], normalized)
if common_dir and not os.path.isabs(common_dir):
common_dir = normalize_path_for_match(
os.path.join(normalized, common_dir)
)
elif common_dir:
common_dir = normalize_path_for_match(common_dir)
remote_lines = run_git(["remote", "-v"], normalized) or ""
remotes: set[str] = set()
for line in remote_lines.splitlines():
parts = line.split()
if len(parts) >= 2:
remotes.add(parts[1].removesuffix(".git"))
worktree_text = (
run_git(["worktree", "list", "--porcelain"], normalized) or ""
)
worktrees: list[str] = []
for line in worktree_text.splitlines():
if line.startswith("worktree "):
worktrees.append(
normalize_path_for_match(
line.removeprefix("worktree ").strip()
)
)
return RepoIdentity(
root=normalize_path_for_match(top_level),
common_dir=common_dir,
remotes=frozenset(remotes),
worktrees=tuple(worktrees),
)
def discover_git_roots(base_path: Path) -> set[str]:
discovered: set[str] = set()
if not base_path.exists():
return discovered
skip_dirs = {
".git",
".venv",
"node_modules",
"__pycache__",
".pytest_cache",
".ruff_cache",
}
for root, dirs, files in os.walk(base_path):
dirs[:] = [entry for entry in dirs if entry not in skip_dirs]
if ".git" in dirs or ".git" in files:
discovered.add(normalize_path_for_match(root))
dirs[:] = []
return discovered
def build_project_scope(prefix: str | None) -> ProjectScope:
if not prefix:
return ProjectScope(
target_prefix=None,
path_prefixes=frozenset(),
common_dirs=frozenset(),
remotes=frozenset(),
project_hashes=frozenset(),
)
normalized_prefix = normalize_path_for_match(prefix)
path_prefixes: set[str] = {normalized_prefix}
common_dirs: set[str] = set()
remotes: set[str] = set()
candidate_roots = discover_git_roots(Path(normalized_prefix))
direct_identity = get_repo_identity(normalized_prefix)
if direct_identity:
candidate_roots.add(direct_identity.root)
for repo_root in candidate_roots:
identity = get_repo_identity(repo_root)
if not identity:
continue
path_prefixes.add(identity.root)
path_prefixes.update(identity.worktrees)
if identity.common_dir:
common_dirs.add(identity.common_dir)
remotes.update(identity.remotes)
project_hashes: set[str] = set()
for path_prefix in path_prefixes:
project_hashes.update(path_hash_variants(path_prefix))
return ProjectScope(
target_prefix=normalized_prefix,
path_prefixes=frozenset(path_prefixes),
common_dirs=frozenset(common_dirs),
remotes=frozenset(remotes),
project_hashes=frozenset(project_hashes),
)
def matches_project_scope(
project_root: str, project_hash: str, scope: ProjectScope
) -> bool:
if scope.target_prefix is None:
return True
if project_root:
normalized_root = normalize_path_for_match(project_root)
for prefix in scope.path_prefixes:
if normalized_root == prefix or normalized_root.startswith(
prefix + os.sep
):
return True
identity = get_repo_identity(normalized_root)
if identity:
if (
identity.common_dir
and identity.common_dir in scope.common_dirs
):
return True
if scope.remotes and identity.remotes.intersection(scope.remotes):
return True
return bool(project_hash and project_hash in scope.project_hashes)
def extract_message_text(
content: Any, display_content: Any | None = None
) -> str:
def extract(value: Any) -> str:
if isinstance(value, str):
return value.strip()
if isinstance(value, list):
parts: list[str] = []
for item in value:
if isinstance(item, str) and item.strip():
parts.append(item.strip())
continue
if not isinstance(item, dict):
continue
text = item.get("text")
if isinstance(text, str) and text.strip():
parts.append(text.strip())
return "\n".join(parts)
return ""
direct = extract(content)
if direct:
return direct
return extract(display_content)
def is_env_assignment(token: str) -> bool:
return bool(re.match(r"^[A-Za-z_][A-Za-z0-9_]*=.*$", token))
def command_tokens(command_text: str) -> list[str]:
if not command_text.strip():
return []
try:
tokens = shlex.split(command_text)
except ValueError:
tokens = command_text.split()
while tokens and is_env_assignment(tokens[0]):
tokens.pop(0)
return tokens
def command_family(command_text: str) -> str:
tokens = command_tokens(command_text)
if not tokens:
return ""
return Path(tokens[0]).name
def extract_tool_result_output(tool: dict[str, Any]) -> str:
result = tool.get("result")
if not isinstance(result, list):
return ""
parts: list[str] = []
for item in result:
if not isinstance(item, dict):
continue
function_response = item.get("functionResponse")
if not isinstance(function_response, dict):
continue
response = function_response.get("response")
if not isinstance(response, dict):
continue
output = response.get("output")
if isinstance(output, str) and output:
parts.append(output)
return "\n".join(parts)
def extract_tool_text_blob(tool: dict[str, Any]) -> str:
pieces = [
str(tool.get("resultDisplay") or "").strip(),
extract_tool_result_output(tool).strip(),
]
return "\n".join(piece for piece in pieces if piece)
def extract_shell_exit_code(tool: dict[str, Any]) -> int | None:
raw_output = extract_tool_result_output(tool)
match = EXIT_CODE_RE.search(raw_output)
if not match:
return None
try:
return int(match.group(1))
except ValueError:
return None
def estimate_active_minutes(
messages: list[dict[str, Any]],
start_time: str,
end_time: str,
idle_cap_minutes: float = 15.0,
) -> float:
timestamps: list[datetime] = []
for message in messages:
if not isinstance(message, dict):
continue
timestamp = parse_iso_timestamp(str(message.get("timestamp") or ""))
if timestamp.timestamp() > 0:
timestamps.append(timestamp)
if not timestamps:
start_dt = parse_iso_timestamp(start_time)
end_dt = parse_iso_timestamp(end_time or start_time)
return max(0.0, (end_dt - start_dt).total_seconds() / 60.0)
timestamps.sort()
total_minutes = 0.0
previous = timestamps[0]
for current in timestamps[1:]:
gap_minutes = max(0.0, (current - previous).total_seconds() / 60.0)
total_minutes += min(gap_minutes, idle_cap_minutes)
previous = current
return max(1.0, total_minutes)
def session_candidate_key(
session_path: Path, payload: dict[str, Any]
) -> tuple[int, float, int, int]:
message_count = (
len(payload.get("messages", []))
if isinstance(payload.get("messages"), list)
else 0
)
last_updated = parse_iso_timestamp(
str(payload.get("lastUpdated") or payload.get("startTime") or "")
)
parent_name = (
session_path.parent.parent.name
if session_path.parent.name == "chats"
else session_path.parent.name
)
named_bonus = 1 if not HASH_DIR_RE.fullmatch(parent_name) else 0
return (
message_count,
last_updated.timestamp(),
named_bonus,
-len(str(session_path)),
)
def load_project_root_hashes(gemini_dir: Path) -> dict[str, str]:
project_roots: dict[str, str] = {}
for root_file in gemini_dir.rglob(".project_root"):
try:
raw_root = root_file.read_text(encoding="utf-8").strip()
except Exception:
continue
if not raw_root:
continue
normalized_root = normalize_path_for_match(raw_root)
for project_hash in path_hash_variants(raw_root):
project_roots[project_hash] = normalized_root
return project_roots
def resolve_project_root(
session_path: Path,
project_hash: str,
project_root_hashes: dict[str, str],
) -> str:
if session_path.parent.name == "chats":
candidate = session_path.parent.parent / ".project_root"
if candidate.exists():
try:
raw_root = candidate.read_text(encoding="utf-8").strip()
except Exception:
raw_root = ""
if raw_root:
return normalize_path_for_match(raw_root)
return project_root_hashes.get(project_hash, "")
def is_gmail_tool(tool_name: str) -> bool:
return tool_name.startswith(GEMINI_GMAIL_PREFIX)
def int_value(value: Any) -> int:
try:
return int(value)
except Exception:
return 0
def detect_goal_categories(session: GeminiSession) -> dict[str, int]:
counts: Counter[str] = Counter()
if session.user_message_count <= 1 and session.tool_call_count <= 1:
return {"warmup_minimal": 1}
for text in session.user_messages:
for category, patterns in GOAL_PATTERNS.items():
if any(pattern.search(text) for pattern in patterns):
counts[category] += 1
if any(is_gmail_tool(name) for name in session.tool_counts):
counts["manage_email"] += 1
if session.test_commands > 0:
counts["write_tests"] += 1
if (
session.git_commits > 0
or session.gh_commands > 0
or session.git_pushes > 0
):
counts["create_pr_commit"] += 1
if session.file_write_count > 0 or session.replace_count > 0:
counts["implement_feature"] += int(not counts)
if session.web_search_count > 0 and not counts:
counts["understand_codebase"] += 1
if session.command_failures > 0 and not counts:
counts["debug_investigate"] += 1
if session.shell_command_count > 0 and not counts:
counts["understand_codebase"] += 1
if not counts:
counts["warmup_minimal"] += 1
return dict(counts)
def detect_friction(session: GeminiSession) -> dict[str, int]:
counts: Counter[str] = Counter()
if session.command_failures > 0:
counts["command_failed"] += session.command_failures
if session.tool_errors > 0:
counts["tool_error"] += session.tool_errors
if session.tool_cancellations > 0 or session.request_cancellations > 0:
counts["tool_cancelled"] += (
session.tool_cancellations + session.request_cancellations
)
if session.workspace_boundary_errors > 0:
counts["workspace_boundary"] += session.workspace_boundary_errors
if session.loop_events > 0:
counts["loop_detected"] += session.loop_events
if session.fallback_model_switches > 0:
counts["fallback_model"] += session.fallback_model_switches
if session.malformed_function_calls > 0:
counts["malformed_function_call"] += session.malformed_function_calls
return dict(counts)
def infer_outcome(session: GeminiSession) -> str:
if session.loop_events > 0 or session.malformed_function_calls > 0:
return "interrupted"
if (
session.assistant_message_count > 0
and session.command_failures == 0
and session.tool_errors == 0
and session.error_message_count == 0
and session.request_cancellations == 0
and session.tool_cancellations == 0
):
return "completed_cleanly"
if session.assistant_message_count > 0:
return "completed_with_retries"
if session.request_cancellations > 0 or session.tool_cancellations > 0:
return "interrupted"
return "incomplete"
def summarize_assistant(session: GeminiSession) -> str:
if session.final_answer:
return truncate(session.final_answer, 140)
if session.assistant_summaries:
return truncate(session.assistant_summaries[-1], 140)
if session.first_prompt:
return truncate(session.first_prompt, 140)
return "No assistant summary captured."
def parse_session_data(
session_path: Path,
payload: dict[str, Any],
scope: ProjectScope,
project_root_hashes: dict[str, str],
) -> GeminiSession | None:
session_id = str(payload.get("sessionId") or session_path.stem)
project_hash = str(payload.get("projectHash") or "")
project_root = resolve_project_root(
session_path, project_hash, project_root_hashes
)
if not matches_project_scope(project_root, project_hash, scope):
return None
user_messages: list[str] = []
assistant_summaries: list[str] = []
command_failure_examples: list[str] = []
tool_counts: Counter[str] = Counter()
command_families: Counter[str] = Counter()
model_counts: Counter[str] = Counter()
assistant_message_count = 0
info_message_count = 0
error_message_count = 0
thought_count = 0
tool_call_count = 0
tool_errors = 0
tool_cancellations = 0
shell_command_count = 0
command_failures = 0
web_search_count = 0
mcp_call_count = 0
file_read_count = 0
file_write_count = 0
replace_count = 0
skill_activation_count = 0
workspace_boundary_errors = 0
loop_events = 0
fallback_model_switches = 0
request_cancellations = 0
malformed_function_calls = 0
compressed_context_events = 0
total_input_tokens = 0
total_output_tokens = 0
total_cached_tokens = 0
total_thought_tokens = 0
total_tool_tokens = 0
total_tokens = 0
git_commits = 0
git_pushes = 0
gh_commands = 0
test_commands = 0
final_answer = ""
messages = payload.get("messages")
if not isinstance(messages, list):
messages = []
for message in messages:
if not isinstance(message, dict):
continue
message_type = str(message.get("type") or "")
message_text = extract_message_text(
message.get("content"), message.get("displayContent")
)
if message_type == "user":
if message_text:
user_messages.append(message_text)
continue
if message_type == "gemini":
if message_text:
assistant_message_count += 1
final_answer = message_text
if len(assistant_summaries) < 5:
assistant_summaries.append(message_text)
thoughts = message.get("thoughts")
if isinstance(thoughts, list):
thought_count += sum(
1 for item in thoughts if isinstance(item, dict)
)
tokens = message.get("tokens")
if isinstance(tokens, dict):
total_input_tokens += int_value(tokens.get("input"))
total_output_tokens += int_value(tokens.get("output"))
total_cached_tokens += int_value(tokens.get("cached"))
total_thought_tokens += int_value(tokens.get("thoughts"))
total_tool_tokens += int_value(tokens.get("tool"))
total_tokens += int_value(tokens.get("total"))
model = str(message.get("model") or "")
if model:
model_counts[model] += 1
tool_calls = message.get("toolCalls")
if not isinstance(tool_calls, list):
continue
tool_call_count += len(tool_calls)
for tool in tool_calls:
if not isinstance(tool, dict):
continue
name = str(tool.get("name") or "")
if name:
tool_counts[name] += 1
status = str(tool.get("status") or "")
if status == "error":
tool_errors += 1
elif status == "cancelled":
tool_cancellations += 1
tool_blob = extract_tool_text_blob(tool)
if WORKSPACE_BOUNDARY_RE.search(tool_blob):
workspace_boundary_errors += 1
if name.startswith("mcp_"):
mcp_call_count += 1
if name == "google_web_search":
web_search_count += 1
if name in {"read_file", "read_many_files"}:
file_read_count += 1
if name == "write_file":
file_write_count += 1
if name == "replace":
replace_count += 1
if name == "activate_skill":
skill_activation_count += 1
if name != "run_shell_command":
continue
shell_command_count += 1
args = tool.get("args")
command_text = ""
if isinstance(args, dict):
command_text = str(args.get("command") or "")
if command_text:
family = command_family(command_text)
if family:
command_families[family] += 1
lowered = command_text.lower()
if GIT_COMMIT_RE.search(command_text):
git_commits += 1
if GIT_PUSH_RE.search(command_text):
git_pushes += 1
if GH_RE.search(command_text):
gh_commands += 1
if TEST_COMMAND_RE.search(lowered):
test_commands += 1
exit_code = extract_shell_exit_code(tool)
if status == "error" or (
exit_code is not None and exit_code != 0
):
command_failures += 1
if command_text and len(command_failure_examples) < 5:
command_failure_examples.append(
truncate(command_text, 120)
)
continue
if message_type == "info":
info_message_count += 1
lowered = message_text.lower()
if "request cancelled" in lowered:
request_cancellations += 1
if "loop was detected" in lowered:
loop_events += 1
if "fallback model" in lowered:
fallback_model_switches += 1
if "malformed function call" in lowered:
malformed_function_calls += 1
if "compressed context" in lowered:
compressed_context_events += 1
continue
if message_type == "error":
error_message_count += 1
if message_text and len(command_failure_examples) < 5:
command_failure_examples.append(truncate(message_text, 120))
start_dt = parse_iso_timestamp(str(payload.get("startTime") or ""))
end_dt = parse_iso_timestamp(
str(payload.get("lastUpdated") or payload.get("startTime") or "")
)
duration_minutes = estimate_active_minutes(
messages,
str(payload.get("startTime") or ""),
str(payload.get("lastUpdated") or payload.get("startTime") or ""),
)
primary_model = ""
top_models = sorted(
model_counts.items(), key=lambda item: item[1], reverse=True
)
if top_models:
primary_model = top_models[0][0]
session = GeminiSession(
session_id=session_id,
file_path=str(session_path),
start_time=start_dt.isoformat(),
end_time=end_dt.isoformat(),
project_hash=project_hash,
project_root=project_root,
primary_model=primary_model,
duration_minutes=duration_minutes,
user_message_count=len(user_messages),
assistant_message_count=assistant_message_count,
info_message_count=info_message_count,
error_message_count=error_message_count,
thought_count=thought_count,
tool_call_count=tool_call_count,
tool_errors=tool_errors,
tool_cancellations=tool_cancellations,
shell_command_count=shell_command_count,
command_failures=command_failures,
web_search_count=web_search_count,
mcp_call_count=mcp_call_count,
file_read_count=file_read_count,
file_write_count=file_write_count,
replace_count=replace_count,
skill_activation_count=skill_activation_count,
workspace_boundary_errors=workspace_boundary_errors,
loop_events=loop_events,
fallback_model_switches=fallback_model_switches,
request_cancellations=request_cancellations,
malformed_function_calls=malformed_function_calls,
compressed_context_events=compressed_context_events,
total_input_tokens=total_input_tokens,
total_output_tokens=total_output_tokens,
total_cached_tokens=total_cached_tokens,
total_thought_tokens=total_thought_tokens,
total_tool_tokens=total_tool_tokens,
total_tokens=total_tokens,
git_commits=git_commits,
git_pushes=git_pushes,
gh_commands=gh_commands,
test_commands=test_commands,
models=dict(model_counts),
tool_counts=dict(tool_counts),
command_families=dict(command_families),
user_messages=user_messages,
assistant_summaries=[
truncate(text, 160) for text in assistant_summaries
],
command_failure_examples=command_failure_examples,
first_prompt=truncate(user_messages[0], 160) if user_messages else "",
final_answer=truncate(final_answer, 800) if final_answer else "",
)
session.goal_categories = detect_goal_categories(session)
session.friction = detect_friction(session)
session.outcome = infer_outcome(session)
return session
def scan_all_sessions(
gemini_dir: Path, scope: ProjectScope
) -> list[GeminiSession]:
project_root_hashes = load_project_root_hashes(gemini_dir)
best_payloads: dict[
str, tuple[Path, dict[str, Any], tuple[int, float, int, int]]
] = {}
for session_file in gemini_dir.rglob("session-*.json"):
try:
payload = json.loads(session_file.read_text(encoding="utf-8"))
except Exception:
continue
if not isinstance(payload, dict):
continue
session_id = str(payload.get("sessionId") or session_file.stem)
key = session_candidate_key(session_file, payload)
existing = best_payloads.get(session_id)
if existing is None or key > existing[2]:
best_payloads[session_id] = (session_file, payload, key)
sessions: list[GeminiSession] = []
for session_file, payload, _key in best_payloads.values():
parsed = parse_session_data(
session_file, payload, scope, project_root_hashes
)
if parsed is not None:
sessions.append(parsed)
sessions.sort(key=lambda item: item.start_time, reverse=True)
return sessions
def top_entries(
data: dict[str, int], limit: int = 5, exclude: set[str] | None = None
) -> list[tuple[str, int]]:
blocked = exclude or set()
return [
(key, value)
for key, value in sorted(
data.items(), key=lambda item: item[1], reverse=True
)
if value > 0 and key not in blocked
][:limit]
def project_key(project_root: str, project_hash: str) -> str:
if project_root:
return project_root
if project_hash:
return f"hash:{project_hash}"
return "(unknown)"
def project_label(value: str) -> str:
if value.startswith("hash:"):
return f"project {value.removeprefix('hash:')[:10]}"
parts = Path(value).parts
if len(parts) >= 2:
return "/".join(parts[-2:])
if parts:
return parts[-1]
return value or "(unknown)"
def aggregate_sessions(sessions: list[GeminiSession]) -> AggregatedData:
aggregated = AggregatedData(
total_sessions=len(sessions),
date_range={"start": "", "end": ""},
)
dates: list[str] = []
for session in sessions:
dates.append(session.start_time)
aggregated.total_user_messages += session.user_message_count
aggregated.total_assistant_messages += session.assistant_message_count
aggregated.total_duration_hours += session.duration_minutes / 60.0
aggregated.total_thoughts += session.thought_count
aggregated.total_tool_calls += session.tool_call_count
aggregated.total_tool_errors += session.tool_errors
aggregated.total_tool_cancellations += session.tool_cancellations
aggregated.total_shell_commands += session.shell_command_count
aggregated.total_command_failures += session.command_failures
aggregated.total_web_searches += session.web_search_count
aggregated.total_mcp_calls += session.mcp_call_count
aggregated.total_file_reads += session.file_read_count
aggregated.total_file_writes += session.file_write_count
aggregated.total_replace_calls += session.replace_count
aggregated.total_skill_activations += session.skill_activation_count
aggregated.total_workspace_boundary_errors += (
session.workspace_boundary_errors
)
aggregated.total_loops += session.loop_events
aggregated.total_fallback_model_switches += (
session.fallback_model_switches
)
aggregated.total_request_cancellations += session.request_cancellations
aggregated.total_malformed_function_calls += (
session.malformed_function_calls
)
aggregated.total_compressed_context_events += (
session.compressed_context_events
)
aggregated.total_input_tokens += session.total_input_tokens
aggregated.total_output_tokens += session.total_output_tokens
aggregated.total_cached_tokens += session.total_cached_tokens
aggregated.total_thought_tokens += session.total_thought_tokens
aggregated.total_tool_tokens += session.total_tool_tokens
aggregated.total_tokens += session.total_tokens
aggregated.git_commits += session.git_commits
aggregated.git_pushes += session.git_pushes
aggregated.gh_commands += session.gh_commands
aggregated.test_commands += session.test_commands
aggregated.sessions_with_mcp += int(session.mcp_call_count > 0)
aggregated.sessions_with_shell += int(session.shell_command_count > 0)
aggregated.sessions_with_web_search += int(
session.web_search_count > 0
)
aggregated.sessions_with_skills += int(
session.skill_activation_count > 0
)
aggregated.projects[
project_key(session.project_root, session.project_hash)
] = (
aggregated.projects.get(
project_key(session.project_root, session.project_hash), 0
)
+ 1
)
aggregated.outcomes[session.outcome] = (
aggregated.outcomes.get(session.outcome, 0) + 1
)
for key, count in session.models.items():
aggregated.models[key] = aggregated.models.get(key, 0) + count
for key, count in session.tool_counts.items():
aggregated.tool_counts[key] = (
aggregated.tool_counts.get(key, 0) + count
)
for key, count in session.command_families.items():
aggregated.command_families[key] = (
aggregated.command_families.get(key, 0) + count
)
for key, count in session.goal_categories.items():
aggregated.goal_categories[key] = (
aggregated.goal_categories.get(key, 0) + count
)
for key, count in session.friction.items():
aggregated.friction[key] = aggregated.friction.get(key, 0) + count
if len(aggregated.session_summaries) < 50:
aggregated.session_summaries.append(
{
"id": session.session_id[:8],
"date": iso_date(session.start_time),
"project": project_label(
project_key(session.project_root, session.project_hash)
),
"prompt": session.first_prompt,
"summary": summarize_assistant(session),
"outcome": session.outcome,
"failures": str(
session.command_failures + session.tool_errors
),
}
)
if dates:
dates.sort()
aggregated.date_range["start"] = iso_date(dates[0])
aggregated.date_range["end"] = iso_date(dates[-1])
return aggregated
def build_at_a_glance(data: AggregatedData) -> dict[str, str]:
top_goal = top_entries(
data.goal_categories, limit=1, exclude={"warmup_minimal"}
)
top_project = top_entries(data.projects, limit=1)
top_tool = top_entries(data.tool_counts, limit=3)
top_command = top_entries(data.command_families, limit=3)
top_friction = top_entries(data.friction, limit=1)
work_text = "This Gemini slice spans several different kinds of work."
if top_goal:
work_text = f"Most Gemini work in this slice is about {safe_title(top_goal[0][0]).lower()}."
if top_project:
work_text += f" The most common project is {project_label(top_project[0][0])}."
workflow_text = (
"Your workflow mixes chat steering, file tools, and shell execution."
)
if data.total_shell_commands >= max(10, data.total_sessions * 5):
workflow_text = "This is a terminal-first Gemini workflow: the shell is doing most of the heavy lifting."
elif data.total_file_reads + data.total_replace_calls > max(
12, data.total_shell_commands
):
workflow_text = "This is a file-tool-heavy Gemini workflow: reading and patching files matters more than driving the shell."
if data.sessions_with_skills > 0:
workflow_text += f" Skills are activated in {data.sessions_with_skills} session(s), so the flow is not purely generic chat."
external_text = "External context is limited."
if data.total_mcp_calls > 0:
external_text = "These sessions do use MCP-backed context rather than relying only on shell and file tools."
elif data.total_web_searches > 0:
external_text = "External context comes mostly from Google web search."
friction_text = "Measured friction is low."
if top_friction:
friction_key = top_friction[0][0]
friction_text = FRICTION_DESCRIPTIONS.get(
friction_key,
f"The main drag is {safe_title(friction_key).lower()}.",
)
tools_text = "Top tools are mixed."
if top_tool or top_command:
tool_names = ", ".join(
safe_title(name) for name, _count in top_tool[:2]
)
command_names = ", ".join(name for name, _count in top_command[:2])
bits = [part for part in (tool_names, command_names) if part]
if bits:
tools_text = f"The most-used levers are {bits[0]}"
if len(bits) > 1:
tools_text += f", with command families like {bits[1]}"
tools_text += "."
return {
"what_you_do": work_text,
"how_you_work": workflow_text,
"external_context": external_text,
"what_slows_you_down": friction_text,
"most_used_levers": tools_text,
}
def build_insights(data: AggregatedData) -> dict[str, Any]:
return {
"at_a_glance": build_at_a_glance(data),
"top_projects": [
{
"path": path,
"label": project_label(path),
"session_count": count,
}
for path, count in top_entries(data.projects, limit=8)
],
"top_goals": [
{"goal": goal, "label": safe_title(goal), "count": count}
for goal, count in top_entries(
data.goal_categories, limit=8, exclude={"warmup_minimal"}
)
],
"top_tools": [
{"tool": tool, "label": safe_title(tool), "count": count}
for tool, count in top_entries(data.tool_counts, limit=10)
],
"top_commands": [
{"command": name, "count": count}
for name, count in top_entries(data.command_families, limit=10)
],
"friction": [
{
"category": key,
"label": safe_title(key),
"count": count,
"description": FRICTION_DESCRIPTIONS.get(key, ""),
}
for key, count in top_entries(data.friction, limit=8)
],
}
def escape_html(text: str) -> str:
return html.escape(text or "")
def generate_bar_chart(
data: dict[str, int],
color: str,
max_items: int = 6,
label_fn: Any | None = None,
) -> str:
entries = top_entries(data, limit=max_items)
if not entries:
return '<p class="empty">No data</p>'
max_value = max(count for _label, count in entries) or 1
rows: list[str] = []
for label, count in entries:
display = label_fn(label) if label_fn else safe_title(label)
width = (count / max_value) * 100
rows.append(
f'<div class="bar-row"><div class="bar-label">{escape_html(str(display))}</div>'
f'<div class="bar-track"><div class="bar-fill" style="width:{width:.2f}%;background:{color}"></div></div>'
f'<div class="bar-value">{count}</div></div>'
)
return "\n".join(rows)
def generate_html_report(
data: AggregatedData,
insights: dict[str, Any],
project_scope_prefix: str | None,
) -> str:
glance = insights["at_a_glance"]
scope_label = project_scope_prefix or "All Gemini sessions"
html_parts = [
"<!doctype html>",
'<html lang="en">',
"<head>",
'<meta charset="utf-8">',
'<meta name="viewport" content="width=device-width, initial-scale=1">',
"<title>Gemini CLI Insights Report</title>",
"<style>",
"body{margin:0;font-family:ui-sans-serif,system-ui,-apple-system,BlinkMacSystemFont,'Segoe UI',sans-serif;background:#f8fafc;color:#0f172a;}",
".page{max-width:1180px;margin:0 auto;padding:40px 24px 56px;}",
".hero{padding:28px 30px;border-radius:22px;background:linear-gradient(135deg,#111827,#0f766e 58%,#f59e0b);color:#ecfeff;box-shadow:0 24px 80px rgba(15,23,42,.18);}",
".hero h1{margin:0 0 8px;font-size:34px;line-height:1.05;}",
".hero p{margin:0;color:#d1fae5;max-width:880px;}",
".scope{margin-top:12px;font-size:13px;color:#a7f3d0;}",
".glance{margin-top:20px;padding:18px 20px;border-radius:18px;background:rgba(255,255,255,.10);border:1px solid rgba(255,255,255,.16);}",
".glance-line{margin:0 0 10px;}",
".stats{display:grid;grid-template-columns:repeat(auto-fit,minmax(150px,1fr));gap:14px;margin:22px 0 30px;}",
".stat{background:#fff;border:1px solid #e2e8f0;border-radius:16px;padding:16px 18px;box-shadow:0 8px 30px rgba(15,23,42,.05);}",
".stat-value{font-size:28px;font-weight:700;}",
".stat-label{margin-top:4px;font-size:12px;text-transform:uppercase;letter-spacing:.08em;color:#64748b;}",
".grid{display:grid;grid-template-columns:repeat(auto-fit,minmax(320px,1fr));gap:18px;}",
".panel{background:#fff;border:1px solid #e2e8f0;border-radius:18px;padding:18px;box-shadow:0 8px 30px rgba(15,23,42,.05);}",
".panel h2{margin:0 0 14px;font-size:15px;text-transform:uppercase;letter-spacing:.08em;color:#64748b;}",
".bar-row{display:flex;align-items:center;gap:10px;margin-bottom:10px;}",
".bar-label{width:140px;font-size:13px;line-height:1.25;color:#1e293b;}",
".bar-track{flex:1;height:10px;border-radius:999px;background:#e2e8f0;overflow:hidden;}",
".bar-fill{height:100%;border-radius:999px;}",
".bar-value{width:34px;text-align:right;font-size:12px;color:#475569;}",
".empty{margin:0;color:#94a3b8;}",
".sessions{margin-top:26px;background:#fff;border:1px solid #e2e8f0;border-radius:18px;padding:18px;box-shadow:0 8px 30px rgba(15,23,42,.05);}",
".sessions h2{margin:0 0 14px;font-size:15px;text-transform:uppercase;letter-spacing:.08em;color:#64748b;}",
".session{padding:14px 0;border-top:1px solid #e2e8f0;}",
".session:first-of-type{border-top:none;padding-top:0;}",
".session-top{display:flex;justify-content:space-between;gap:12px;flex-wrap:wrap;}",
".session-project{font-weight:700;}",
".session-meta{font-size:12px;color:#64748b;}",
".session-prompt,.session-summary{margin:6px 0 0;font-size:14px;line-height:1.45;}",
".footer{margin-top:28px;font-size:12px;color:#64748b;}",
"@media (max-width:700px){.hero h1{font-size:28px}.bar-label{width:110px}}",
"</style>",
"</head>",
"<body>",
'<div class="page">',
'<div class="hero">',
"<h1>Gemini CLI Insights</h1>",
f"<p>{escape_html(data.date_range.get('start', ''))} to {escape_html(data.date_range.get('end', ''))}. Built from local Gemini CLI chat sessions.</p>",
f'<div class="scope">Scope: {escape_html(scope_label)}</div>',
'<div class="glance">',
f'<p class="glance-line"><strong>What you do:</strong> {escape_html(glance["what_you_do"])}</p>',
f'<p class="glance-line"><strong>How you work:</strong> {escape_html(glance["how_you_work"])}</p>',
f'<p class="glance-line"><strong>External context:</strong> {escape_html(glance["external_context"])}</p>',
f'<p class="glance-line"><strong>What slows you down:</strong> {escape_html(glance["what_slows_you_down"])}</p>',
f'<p class="glance-line"><strong>Most-used levers:</strong> {escape_html(glance["most_used_levers"])}</p>',
"</div>",
"</div>",
'<div class="stats">',
f'<div class="stat"><div class="stat-value">{data.total_sessions}</div><div class="stat-label">Sessions</div></div>',
f'<div class="stat"><div class="stat-value">{round(data.total_duration_hours, 1)}</div><div class="stat-label">Hours</div></div>',
f'<div class="stat"><div class="stat-value">{data.total_user_messages}</div><div class="stat-label">User Messages</div></div>',
f'<div class="stat"><div class="stat-value">{data.total_tool_calls}</div><div class="stat-label">Tool Calls</div></div>',
f'<div class="stat"><div class="stat-value">{data.total_shell_commands}</div><div class="stat-label">Shell Commands</div></div>',
f'<div class="stat"><div class="stat-value">{data.total_command_failures}</div><div class="stat-label">Shell Failures</div></div>',
f'<div class="stat"><div class="stat-value">{data.total_mcp_calls}</div><div class="stat-label">MCP Calls</div></div>',
f'<div class="stat"><div class="stat-value">{data.total_thoughts}</div><div class="stat-label">Thoughts</div></div>',
"</div>",
'<div class="grid">',
f'<div class="panel"><h2>Projects</h2>{generate_bar_chart(data.projects, "#0f766e", label_fn=project_label)}</div>',
f'<div class="panel"><h2>Goals</h2>{generate_bar_chart(data.goal_categories, "#2563eb")}</div>',
f'<div class="panel"><h2>Tools</h2>{generate_bar_chart(data.tool_counts, "#7c3aed")}</div>',
f'<div class="panel"><h2>Command Families</h2>{generate_bar_chart(data.command_families, "#ea580c", label_fn=lambda value: value)}</div>',
f'<div class="panel"><h2>Models</h2>{generate_bar_chart(data.models, "#16a34a", label_fn=lambda value: value)}</div>',
f'<div class="panel"><h2>Friction</h2>{generate_bar_chart(data.friction, "#dc2626")}</div>',
"</div>",
'<div class="sessions">',
"<h2>Recent Sessions</h2>",
]
for item in data.session_summaries[:18]:
html_parts.extend(
[
'<div class="session">',
'<div class="session-top">',
f'<div class="session-project">{escape_html(item["project"])}</div>',
f'<div class="session-meta">{escape_html(item["date"])} | {escape_html(safe_title(item["outcome"]))} | failures: {escape_html(item["failures"])}</div>',
"</div>",
f'<p class="session-prompt"><strong>Prompt:</strong> {escape_html(item["prompt"] or "(none captured)")}</p>',
f'<p class="session-summary"><strong>Summary:</strong> {escape_html(item["summary"])}</p>',
"</div>",
]
)
html_parts.extend(
[
"</div>",
(
f'<div class="footer">Input tokens: {data.total_input_tokens:,} | '
f"Output tokens: {data.total_output_tokens:,} | "
f"Cached tokens: {data.total_cached_tokens:,} | "
f"Thought tokens: {data.total_thought_tokens:,}</div>"
),
"</div>",
"</body>",
"</html>",
]
)
return "\n".join(html_parts)
def build_export_data(
data: AggregatedData,
insights: dict[str, Any],
sessions: list[GeminiSession],
project_scope_prefix: str | None,
) -> dict[str, Any]:
return {
"metadata": {
"generated_at": datetime.now(tz=timezone.utc).isoformat(),
"report_type": "gemini-insights",
"date_range": data.date_range,
"session_count": data.total_sessions,
"project_scope_prefix": project_scope_prefix,
},
"aggregated_data": asdict(data),
"insights": insights,
"sessions": [asdict(session) for session in sessions],
}
def generate_report(args: argparse.Namespace) -> dict[str, Any]:
ensure_dir(args.cache_dir)
scope = build_project_scope(args.project_path_prefix)
sessions = scan_all_sessions(args.gemini_dir, scope)
aggregated = aggregate_sessions(sessions)
insights = build_insights(aggregated)
output_html = args.output_html or (args.cache_dir / "report.html")
output_json = args.output_json or (args.cache_dir / "report.json")
ensure_dir(output_html.parent)
ensure_dir(output_json.parent)
output_html.write_text(
generate_html_report(aggregated, insights, args.project_path_prefix),
encoding="utf-8",
)
output_json.write_text(
json.dumps(
build_export_data(
aggregated, insights, sessions, args.project_path_prefix
),
indent=2,
),
encoding="utf-8",
)
return {
"html_path": output_html,
"json_path": output_json,
"data": aggregated,
"insights": insights,
}
def print_summary(result: dict[str, Any]) -> None:
data: AggregatedData = result["data"]
glance = result["insights"]["at_a_glance"]
print(f"Wrote HTML report: {result['html_path']}")
print(f"Wrote JSON export: {result['json_path']}")
print(
f"Analyzed {data.total_sessions} sessions "
f"({data.total_user_messages} user messages, {round(data.total_duration_hours, 1)}h) "
f"from {data.date_range.get('start', '')} to {data.date_range.get('end', '')}"
)
print(f"What you do: {glance['what_you_do']}")
print(f"What slows you down: {glance['what_slows_you_down']}")
def main() -> int:
args = parse_args()
result = generate_report(args)
print_summary(result)
return 0
if __name__ == "__main__":
raise SystemExit(main())