Remove .codeflash/ from ruff extend-exclude, add per-file ignores for .codeflash/, scripts/, evals/, and plugin/ (benchmark/script patterns like print, eval, magic values). Remove shebangs. Widen pre-commit hooks to check the full repo.
# /// script
# requires-python = ">=3.11"
# ///
"""Portable Gemini CLI usage report generator.

This script scans Gemini CLI chat session files under ``~/.gemini`` and builds
an HTML report plus JSON export.

It is intentionally Gemini-specific:

1. Session discovery uses Gemini ``session-*.json`` chat files.
2. Metrics are derived from Gemini message fields like ``thoughts``,
   ``tokens``, ``toolCalls``, and ``info``/``error`` messages.
3. Project scoping matches Gemini project hashes and known project roots, with
   repo-family matching for worktrees and related clones.
"""

from __future__ import annotations

import argparse
import hashlib
import html
import json
import os
import re
import shlex
import subprocess
from collections import Counter
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from functools import lru_cache
from pathlib import Path
from typing import Any

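# Example invocation (the script filename and paths here are illustrative, not
# fixed by this file):
#
#   python gemini_insights_report.py \
#       --project-path-prefix ~/code/my-repo \
#       --cache-dir ./gemini-insights-output
#
# All flags mirror parse_args() below; --output-html and --output-json default
# to <cache-dir>/report.html and <cache-dir>/report.json.
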
LABEL_MAP: dict[str, str] = {
    "debug_investigate": "Debug/Investigate",
    "implement_feature": "Implement Feature",
    "fix_bug": "Fix Bug",
    "write_script_tool": "Write Script/Tool",
    "refactor_code": "Refactor Code",
    "configure_system": "Configure System",
    "create_pr_commit": "Create PR/Commit",
    "analyze_data": "Analyze Data",
    "understand_codebase": "Understand Codebase",
    "write_tests": "Write Tests",
    "write_docs": "Write Docs",
    "manage_email": "Manage Email",
    "warmup_minimal": "Quick Check",
    "completed_cleanly": "Completed Cleanly",
    "completed_with_retries": "Completed With Retries",
    "interrupted": "Interrupted",
    "incomplete": "Incomplete",
    "tool_error": "Tool Error",
    "tool_cancelled": "Tool Cancelled",
    "command_failed": "Command Failed",
    "workspace_boundary": "Workspace Boundary",
    "loop_detected": "Loop Detected",
    "fallback_model": "Fallback Model",
    "malformed_function_call": "Malformed Function Call",
    "run_shell_command": "Shell",
    "read_file": "Read File",
    "read_many_files": "Read Many Files",
    "write_file": "Write File",
    "replace": "Replace",
    "search_file_content": "Search File Content",
    "grep_search": "Grep Search",
    "list_directory": "List Directory",
    "google_web_search": "Google Web Search",
    "activate_skill": "Activate Skill",
    "codebase_investigator": "Codebase Investigator",
}


GOAL_PATTERNS: dict[str, list[re.Pattern[str]]] = {
    "debug_investigate": [
        re.compile(r"\bdebug\b", re.IGNORECASE),
        re.compile(r"\binvestigat", re.IGNORECASE),
        re.compile(r"\btrace\b", re.IGNORECASE),
        re.compile(r"\berror\b", re.IGNORECASE),
        re.compile(r"\bwhy\b", re.IGNORECASE),
        re.compile(r"\bfail", re.IGNORECASE),
    ],
    "implement_feature": [
        re.compile(r"\bimplement\b", re.IGNORECASE),
        re.compile(r"\bbuild\b", re.IGNORECASE),
        re.compile(r"\bfeature\b", re.IGNORECASE),
        re.compile(r"\badd\b", re.IGNORECASE),
        re.compile(r"\bcreate\b", re.IGNORECASE),
    ],
    "fix_bug": [
        re.compile(r"\bfix\b", re.IGNORECASE),
        re.compile(r"\bbug\b", re.IGNORECASE),
        re.compile(r"\bbroken\b", re.IGNORECASE),
        re.compile(r"\bfailing\b", re.IGNORECASE),
    ],
    "write_script_tool": [
        re.compile(r"\bscript\b", re.IGNORECASE),
        re.compile(r"\bcli\b", re.IGNORECASE),
        re.compile(r"\btool\b", re.IGNORECASE),
        re.compile(r"\bautomation\b", re.IGNORECASE),
    ],
    "refactor_code": [
        re.compile(r"\brefactor\b", re.IGNORECASE),
        re.compile(r"\bcleanup\b", re.IGNORECASE),
        re.compile(r"\breorgan", re.IGNORECASE),
        re.compile(r"\bsimplif", re.IGNORECASE),
    ],
    "configure_system": [
        re.compile(r"\bconfigure\b", re.IGNORECASE),
        re.compile(r"\bsetup\b", re.IGNORECASE),
        re.compile(r"\binstall\b", re.IGNORECASE),
        re.compile(r"\bconfig\b", re.IGNORECASE),
        re.compile(r"\benv\b", re.IGNORECASE),
        re.compile(r"\bci\b", re.IGNORECASE),
        re.compile(r"\bauth\b", re.IGNORECASE),
        re.compile(r"\blogin\b", re.IGNORECASE),
    ],
    "create_pr_commit": [
        re.compile(r"\bcommit\b", re.IGNORECASE),
        re.compile(r"\bpull request\b", re.IGNORECASE),
        re.compile(r"\bpr\b", re.IGNORECASE),
        re.compile(r"\bmerge\b", re.IGNORECASE),
        re.compile(r"\bbranch\b", re.IGNORECASE),
    ],
    "analyze_data": [
        re.compile(r"\banaly[sz]e\b", re.IGNORECASE),
        re.compile(r"\bmetrics\b", re.IGNORECASE),
        re.compile(r"\breport\b", re.IGNORECASE),
        re.compile(r"\binsights?\b", re.IGNORECASE),
        re.compile(r"\bdata\b", re.IGNORECASE),
    ],
    "understand_codebase": [
        re.compile(r"\bunderstand\b", re.IGNORECASE),
        re.compile(r"\bexplain\b", re.IGNORECASE),
        re.compile(r"\bwalk ?through\b", re.IGNORECASE),
        re.compile(r"\bhow does\b", re.IGNORECASE),
        re.compile(r"\bwhere is\b", re.IGNORECASE),
        re.compile(r"\bfind\b", re.IGNORECASE),
        re.compile(r"\breview\b", re.IGNORECASE),
    ],
    "write_tests": [
        re.compile(r"\btests?\b", re.IGNORECASE),
        re.compile(r"\bpytest\b", re.IGNORECASE),
        re.compile(r"\bunit test\b", re.IGNORECASE),
        re.compile(r"\bintegration test\b", re.IGNORECASE),
        re.compile(r"\bbenchmark\b", re.IGNORECASE),
    ],
    "write_docs": [
        re.compile(r"\breadme\b", re.IGNORECASE),
        re.compile(r"\bdocs?\b", re.IGNORECASE),
        re.compile(r"\bdocument", re.IGNORECASE),
    ],
    "manage_email": [
        re.compile(r"\bgmail\b", re.IGNORECASE),
        re.compile(r"\bemail\b", re.IGNORECASE),
        re.compile(r"\binbox\b", re.IGNORECASE),
        re.compile(r"\bunsubscrib", re.IGNORECASE),
        re.compile(r"\bdeclutter\b", re.IGNORECASE),
        re.compile(r"\bdraft\b", re.IGNORECASE),
        re.compile(r"\bdelete\b", re.IGNORECASE),
    ],
}


FRICTION_DESCRIPTIONS: dict[str, str] = {
    "command_failed": "Shell execution is one of the main sources of retries in these sessions.",
    "tool_error": "File and edit tools are failing often enough to shape the flow.",
    "tool_cancelled": "Some requests are being cancelled before the tool chain lands cleanly.",
    "workspace_boundary": "Gemini is running into workspace boundaries, which slows cross-repo work.",
    "loop_detected": "At least one session tripped Gemini's loop protection.",
    "fallback_model": "Some sessions fell back to a different model midstream.",
    "malformed_function_call": "A malformed function call interrupted at least one request.",
}


GEMINI_GMAIL_PREFIX = "mcp_google-workspace_gmail."
WORKSPACE_BOUNDARY_RE = re.compile(
    r"workspace directories|project temp directory", re.IGNORECASE
)
HASH_DIR_RE = re.compile(r"^[0-9a-f]{64}$")
EXIT_CODE_RE = re.compile(r"Exit Code:\s*(-?\d+)")
TEST_COMMAND_RE = re.compile(
    r"\b(pytest|npm test|pnpm test|yarn test|cargo test|go test|vitest|jest|ruff|mypy|gradle test|mvn test)\b",
    re.IGNORECASE,
)
GIT_COMMIT_RE = re.compile(r"(^|[;&|]\s*|\s)git\s+commit\b", re.IGNORECASE)
GIT_PUSH_RE = re.compile(r"(^|[;&|]\s*|\s)git\s+push\b", re.IGNORECASE)
GH_RE = re.compile(r"(^|[;&|]\s*|\s)gh\b", re.IGNORECASE)


@dataclass(frozen=True)
class RepoIdentity:
    root: str
    common_dir: str | None
    remotes: frozenset[str]
    worktrees: tuple[str, ...] = ()


@dataclass(frozen=True)
class ProjectScope:
    target_prefix: str | None
    path_prefixes: frozenset[str]
    common_dirs: frozenset[str]
    remotes: frozenset[str]
    project_hashes: frozenset[str]


@dataclass
class GeminiSession:
    session_id: str
    file_path: str
    start_time: str
    end_time: str
    project_hash: str
    project_root: str
    primary_model: str
    duration_minutes: float
    user_message_count: int
    assistant_message_count: int
    info_message_count: int
    error_message_count: int
    thought_count: int
    tool_call_count: int
    tool_errors: int
    tool_cancellations: int
    shell_command_count: int
    command_failures: int
    web_search_count: int
    mcp_call_count: int
    file_read_count: int
    file_write_count: int
    replace_count: int
    skill_activation_count: int
    workspace_boundary_errors: int
    loop_events: int
    fallback_model_switches: int
    request_cancellations: int
    malformed_function_calls: int
    compressed_context_events: int
    total_input_tokens: int
    total_output_tokens: int
    total_cached_tokens: int
    total_thought_tokens: int
    total_tool_tokens: int
    total_tokens: int
    git_commits: int
    git_pushes: int
    gh_commands: int
    test_commands: int
    models: dict[str, int] = field(default_factory=dict)
    tool_counts: dict[str, int] = field(default_factory=dict)
    command_families: dict[str, int] = field(default_factory=dict)
    goal_categories: dict[str, int] = field(default_factory=dict)
    friction: dict[str, int] = field(default_factory=dict)
    user_messages: list[str] = field(default_factory=list)
    assistant_summaries: list[str] = field(default_factory=list)
    command_failure_examples: list[str] = field(default_factory=list)
    first_prompt: str = ""
    final_answer: str = ""
    outcome: str = ""


@dataclass
class AggregatedData:
    total_sessions: int
    date_range: dict[str, str]
    total_user_messages: int = 0
    total_assistant_messages: int = 0
    total_duration_hours: float = 0.0
    total_thoughts: int = 0
    total_tool_calls: int = 0
    total_tool_errors: int = 0
    total_tool_cancellations: int = 0
    total_shell_commands: int = 0
    total_command_failures: int = 0
    total_web_searches: int = 0
    total_mcp_calls: int = 0
    total_file_reads: int = 0
    total_file_writes: int = 0
    total_replace_calls: int = 0
    total_skill_activations: int = 0
    total_workspace_boundary_errors: int = 0
    total_loops: int = 0
    total_fallback_model_switches: int = 0
    total_request_cancellations: int = 0
    total_malformed_function_calls: int = 0
    total_compressed_context_events: int = 0
    total_input_tokens: int = 0
    total_output_tokens: int = 0
    total_cached_tokens: int = 0
    total_thought_tokens: int = 0
    total_tool_tokens: int = 0
    total_tokens: int = 0
    git_commits: int = 0
    git_pushes: int = 0
    gh_commands: int = 0
    test_commands: int = 0
    sessions_with_mcp: int = 0
    sessions_with_shell: int = 0
    sessions_with_web_search: int = 0
    sessions_with_skills: int = 0
    tool_counts: dict[str, int] = field(default_factory=dict)
    command_families: dict[str, int] = field(default_factory=dict)
    models: dict[str, int] = field(default_factory=dict)
    projects: dict[str, int] = field(default_factory=dict)
    goal_categories: dict[str, int] = field(default_factory=dict)
    friction: dict[str, int] = field(default_factory=dict)
    outcomes: dict[str, int] = field(default_factory=dict)
    session_summaries: list[dict[str, str]] = field(default_factory=list)


def parse_args() -> argparse.Namespace:
    script_dir = Path(__file__).resolve().parent
    home = Path(os.path.expanduser("~"))
    parser = argparse.ArgumentParser(
        description="Generate a Gemini CLI usage report from ~/.gemini.",
    )
    parser.add_argument(
        "--gemini-dir",
        type=Path,
        default=home / ".gemini",
        help="Gemini CLI home directory containing session chats.",
    )
    parser.add_argument(
        "--cache-dir",
        type=Path,
        default=script_dir / "gemini-insights-output",
        help="Directory for the generated HTML report and JSON export.",
    )
    parser.add_argument(
        "--project-path-prefix",
        type=str,
        default=None,
        help=(
            "Only include sessions whose project root matches this path, a descendant, "
            "or a related repo/worktree/fork in the same repo family."
        ),
    )
    parser.add_argument(
        "--output-html",
        type=Path,
        default=None,
        help="Path for the generated HTML report. Defaults to <cache-dir>/report.html.",
    )
    parser.add_argument(
        "--output-json",
        type=Path,
        default=None,
        help="Path for the JSON export. Defaults to <cache-dir>/report.json.",
    )
    return parser.parse_args()


def ensure_dir(path: Path) -> None:
    path.mkdir(parents=True, exist_ok=True)


def parse_iso_timestamp(value: str | None) -> datetime:
    if not value:
        return datetime.fromtimestamp(0, tz=timezone.utc)
    normalized = value
    if normalized.endswith("Z"):
        normalized = normalized[:-1] + "+00:00"
    try:
        return datetime.fromisoformat(normalized)
    except ValueError:
        return datetime.fromtimestamp(0, tz=timezone.utc)


def iso_date(value: str) -> str:
    return parse_iso_timestamp(value).date().isoformat()


def truncate(text: str, length: int) -> str:
    collapsed = " ".join(text.split())
    if len(collapsed) <= length:
        return collapsed
    return collapsed[: max(0, length - 1)].rstrip() + "..."


def safe_title(value: str) -> str:
    return LABEL_MAP.get(value, value.replace("_", " ").title())


def normalize_path_for_match(path_text: str) -> str:
    return os.path.normpath(os.path.realpath(os.path.expanduser(path_text)))


def path_hash_variants(path_text: str) -> set[str]:
    expanded = os.path.normpath(os.path.expanduser(path_text))
    normalized = normalize_path_for_match(path_text)
    variants = {expanded, normalized}
    return {
        hashlib.sha256(value.encode("utf-8")).hexdigest()
        for value in variants
        if value
    }


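# Assumption worth stating: the scope matching below relies on Gemini CLI keying
# per-project state by the SHA-256 hex digest of the project root path, so
# path_hash_variants("~/code/my-repo") is expected to contain the same digest as
# a session's projectHash for that root ("~/code/my-repo" is an illustrative
# path, not a requirement). If that convention ever changes, matching still
# falls back to path-prefix and git-remote checks in matches_project_scope().
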
def run_git(args: list[str], cwd: str) -> str | None:
    try:
        completed = subprocess.run(
            ["git", *args],
            cwd=cwd,
            check=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            text=True,
        )
    except Exception:
        return None
    return completed.stdout.strip()


@lru_cache(maxsize=512)
def get_repo_identity(path_text: str) -> RepoIdentity | None:
    normalized = normalize_path_for_match(path_text)
    top_level = run_git(["rev-parse", "--show-toplevel"], normalized)
    if not top_level:
        return None

    common_dir = run_git(["rev-parse", "--git-common-dir"], normalized)
    if common_dir and not os.path.isabs(common_dir):
        common_dir = normalize_path_for_match(
            os.path.join(normalized, common_dir)
        )
    elif common_dir:
        common_dir = normalize_path_for_match(common_dir)

    remote_lines = run_git(["remote", "-v"], normalized) or ""
    remotes: set[str] = set()
    for line in remote_lines.splitlines():
        parts = line.split()
        if len(parts) >= 2:
            remotes.add(parts[1].removesuffix(".git"))

    worktree_text = (
        run_git(["worktree", "list", "--porcelain"], normalized) or ""
    )
    worktrees: list[str] = []
    for line in worktree_text.splitlines():
        if line.startswith("worktree "):
            worktrees.append(
                normalize_path_for_match(
                    line.removeprefix("worktree ").strip()
                )
            )

    return RepoIdentity(
        root=normalize_path_for_match(top_level),
        common_dir=common_dir,
        remotes=frozenset(remotes),
        worktrees=tuple(worktrees),
    )


def discover_git_roots(base_path: Path) -> set[str]:
    discovered: set[str] = set()
    if not base_path.exists():
        return discovered

    skip_dirs = {
        ".git",
        ".venv",
        "node_modules",
        "__pycache__",
        ".pytest_cache",
        ".ruff_cache",
    }
    for root, dirs, files in os.walk(base_path):
        dirs[:] = [entry for entry in dirs if entry not in skip_dirs]
        if ".git" in dirs or ".git" in files:
            discovered.add(normalize_path_for_match(root))
            dirs[:] = []
    return discovered


def build_project_scope(prefix: str | None) -> ProjectScope:
    if not prefix:
        return ProjectScope(
            target_prefix=None,
            path_prefixes=frozenset(),
            common_dirs=frozenset(),
            remotes=frozenset(),
            project_hashes=frozenset(),
        )

    normalized_prefix = normalize_path_for_match(prefix)
    path_prefixes: set[str] = {normalized_prefix}
    common_dirs: set[str] = set()
    remotes: set[str] = set()

    candidate_roots = discover_git_roots(Path(normalized_prefix))
    direct_identity = get_repo_identity(normalized_prefix)
    if direct_identity:
        candidate_roots.add(direct_identity.root)

    for repo_root in candidate_roots:
        identity = get_repo_identity(repo_root)
        if not identity:
            continue
        path_prefixes.add(identity.root)
        path_prefixes.update(identity.worktrees)
        if identity.common_dir:
            common_dirs.add(identity.common_dir)
        remotes.update(identity.remotes)

    project_hashes: set[str] = set()
    for path_prefix in path_prefixes:
        project_hashes.update(path_hash_variants(path_prefix))

    return ProjectScope(
        target_prefix=normalized_prefix,
        path_prefixes=frozenset(path_prefixes),
        common_dirs=frozenset(common_dirs),
        remotes=frozenset(remotes),
        project_hashes=frozenset(project_hashes),
    )


def matches_project_scope(
    project_root: str, project_hash: str, scope: ProjectScope
) -> bool:
    if scope.target_prefix is None:
        return True

    if project_root:
        normalized_root = normalize_path_for_match(project_root)
        for prefix in scope.path_prefixes:
            if normalized_root == prefix or normalized_root.startswith(
                prefix + os.sep
            ):
                return True

        identity = get_repo_identity(normalized_root)
        if identity:
            if (
                identity.common_dir
                and identity.common_dir in scope.common_dirs
            ):
                return True
            if scope.remotes and identity.remotes.intersection(scope.remotes):
                return True

    return bool(project_hash and project_hash in scope.project_hashes)


def extract_message_text(
    content: Any, display_content: Any | None = None
) -> str:
    def extract(value: Any) -> str:
        if isinstance(value, str):
            return value.strip()
        if isinstance(value, list):
            parts: list[str] = []
            for item in value:
                if isinstance(item, str) and item.strip():
                    parts.append(item.strip())
                    continue
                if not isinstance(item, dict):
                    continue
                text = item.get("text")
                if isinstance(text, str) and text.strip():
                    parts.append(text.strip())
            return "\n".join(parts)
        return ""

    direct = extract(content)
    if direct:
        return direct
    return extract(display_content)


def is_env_assignment(token: str) -> bool:
    return bool(re.match(r"^[A-Za-z_][A-Za-z0-9_]*=.*$", token))


def command_tokens(command_text: str) -> list[str]:
    if not command_text.strip():
        return []
    try:
        tokens = shlex.split(command_text)
    except ValueError:
        tokens = command_text.split()
    while tokens and is_env_assignment(tokens[0]):
        tokens.pop(0)
    return tokens


def command_family(command_text: str) -> str:
    tokens = command_tokens(command_text)
    if not tokens:
        return ""
    return Path(tokens[0]).name


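# Sketch of how command_family() behaves on a leading env assignment and an
# absolute interpreter path (the command string is made up for illustration):
#
#   command_family("FOO=1 /usr/bin/git commit -m 'x'")  ->  "git"
#
# command_tokens() drops "FOO=1", and Path(...).name reduces "/usr/bin/git" to
# its basename, so command families group by executable name.
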
def extract_tool_result_output(tool: dict[str, Any]) -> str:
    result = tool.get("result")
    if not isinstance(result, list):
        return ""

    parts: list[str] = []
    for item in result:
        if not isinstance(item, dict):
            continue
        function_response = item.get("functionResponse")
        if not isinstance(function_response, dict):
            continue
        response = function_response.get("response")
        if not isinstance(response, dict):
            continue
        output = response.get("output")
        if isinstance(output, str) and output:
            parts.append(output)
    return "\n".join(parts)


def extract_tool_text_blob(tool: dict[str, Any]) -> str:
    pieces = [
        str(tool.get("resultDisplay") or "").strip(),
        extract_tool_result_output(tool).strip(),
    ]
    return "\n".join(piece for piece in pieces if piece)


def extract_shell_exit_code(tool: dict[str, Any]) -> int | None:
    raw_output = extract_tool_result_output(tool)
    match = EXIT_CODE_RE.search(raw_output)
    if not match:
        return None
    try:
        return int(match.group(1))
    except ValueError:
        return None


def estimate_active_minutes(
    messages: list[dict[str, Any]],
    start_time: str,
    end_time: str,
    idle_cap_minutes: float = 15.0,
) -> float:
    timestamps: list[datetime] = []
    for message in messages:
        if not isinstance(message, dict):
            continue
        timestamp = parse_iso_timestamp(str(message.get("timestamp") or ""))
        if timestamp.timestamp() > 0:
            timestamps.append(timestamp)

    if not timestamps:
        start_dt = parse_iso_timestamp(start_time)
        end_dt = parse_iso_timestamp(end_time or start_time)
        return max(0.0, (end_dt - start_dt).total_seconds() / 60.0)

    timestamps.sort()
    total_minutes = 0.0
    previous = timestamps[0]
    for current in timestamps[1:]:
        gap_minutes = max(0.0, (current - previous).total_seconds() / 60.0)
        total_minutes += min(gap_minutes, idle_cap_minutes)
        previous = current

    return max(1.0, total_minutes)


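# Worked example for estimate_active_minutes() with made-up timestamps: messages
# at minutes 0, 5, and 45 produce gaps of 5 and 40 minutes; the 40-minute gap is
# capped at idle_cap_minutes (15), so the estimate is 5 + 15 = 20 active minutes
# rather than the 45-minute wall-clock span.
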
def session_candidate_key(
    session_path: Path, payload: dict[str, Any]
) -> tuple[int, float, int, int]:
    message_count = (
        len(payload.get("messages", []))
        if isinstance(payload.get("messages"), list)
        else 0
    )
    last_updated = parse_iso_timestamp(
        str(payload.get("lastUpdated") or payload.get("startTime") or "")
    )
    parent_name = (
        session_path.parent.parent.name
        if session_path.parent.name == "chats"
        else session_path.parent.name
    )
    named_bonus = 1 if not HASH_DIR_RE.fullmatch(parent_name) else 0
    return (
        message_count,
        last_updated.timestamp(),
        named_bonus,
        -len(str(session_path)),
    )


def load_project_root_hashes(gemini_dir: Path) -> dict[str, str]:
    project_roots: dict[str, str] = {}
    for root_file in gemini_dir.rglob(".project_root"):
        try:
            raw_root = root_file.read_text(encoding="utf-8").strip()
        except Exception:
            continue
        if not raw_root:
            continue
        normalized_root = normalize_path_for_match(raw_root)
        for project_hash in path_hash_variants(raw_root):
            project_roots[project_hash] = normalized_root
    return project_roots


def resolve_project_root(
    session_path: Path,
    project_hash: str,
    project_root_hashes: dict[str, str],
) -> str:
    if session_path.parent.name == "chats":
        candidate = session_path.parent.parent / ".project_root"
        if candidate.exists():
            try:
                raw_root = candidate.read_text(encoding="utf-8").strip()
            except Exception:
                raw_root = ""
            if raw_root:
                return normalize_path_for_match(raw_root)
    return project_root_hashes.get(project_hash, "")


def is_gmail_tool(tool_name: str) -> bool:
    return tool_name.startswith(GEMINI_GMAIL_PREFIX)


def int_value(value: Any) -> int:
    try:
        return int(value)
    except Exception:
        return 0


def detect_goal_categories(session: GeminiSession) -> dict[str, int]:
    counts: Counter[str] = Counter()
    if session.user_message_count <= 1 and session.tool_call_count <= 1:
        return {"warmup_minimal": 1}

    for text in session.user_messages:
        for category, patterns in GOAL_PATTERNS.items():
            if any(pattern.search(text) for pattern in patterns):
                counts[category] += 1

    if any(is_gmail_tool(name) for name in session.tool_counts):
        counts["manage_email"] += 1
    if session.test_commands > 0:
        counts["write_tests"] += 1
    if (
        session.git_commits > 0
        or session.gh_commands > 0
        or session.git_pushes > 0
    ):
        counts["create_pr_commit"] += 1
    if session.file_write_count > 0 or session.replace_count > 0:
        counts["implement_feature"] += int(not counts)
    if session.web_search_count > 0 and not counts:
        counts["understand_codebase"] += 1
    if session.command_failures > 0 and not counts:
        counts["debug_investigate"] += 1
    if session.shell_command_count > 0 and not counts:
        counts["understand_codebase"] += 1
    if not counts:
        counts["warmup_minimal"] += 1
    return dict(counts)


def detect_friction(session: GeminiSession) -> dict[str, int]:
    counts: Counter[str] = Counter()
    if session.command_failures > 0:
        counts["command_failed"] += session.command_failures
    if session.tool_errors > 0:
        counts["tool_error"] += session.tool_errors
    if session.tool_cancellations > 0 or session.request_cancellations > 0:
        counts["tool_cancelled"] += (
            session.tool_cancellations + session.request_cancellations
        )
    if session.workspace_boundary_errors > 0:
        counts["workspace_boundary"] += session.workspace_boundary_errors
    if session.loop_events > 0:
        counts["loop_detected"] += session.loop_events
    if session.fallback_model_switches > 0:
        counts["fallback_model"] += session.fallback_model_switches
    if session.malformed_function_calls > 0:
        counts["malformed_function_call"] += session.malformed_function_calls
    return dict(counts)


def infer_outcome(session: GeminiSession) -> str:
    if session.loop_events > 0 or session.malformed_function_calls > 0:
        return "interrupted"
    if (
        session.assistant_message_count > 0
        and session.command_failures == 0
        and session.tool_errors == 0
        and session.error_message_count == 0
        and session.request_cancellations == 0
        and session.tool_cancellations == 0
    ):
        return "completed_cleanly"
    if session.assistant_message_count > 0:
        return "completed_with_retries"
    if session.request_cancellations > 0 or session.tool_cancellations > 0:
        return "interrupted"
    return "incomplete"


def summarize_assistant(session: GeminiSession) -> str:
    if session.final_answer:
        return truncate(session.final_answer, 140)
    if session.assistant_summaries:
        return truncate(session.assistant_summaries[-1], 140)
    if session.first_prompt:
        return truncate(session.first_prompt, 140)
    return "No assistant summary captured."


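# For orientation, the fields parse_session_data() actually consumes from a
# session-*.json payload look roughly like this (a trimmed sketch based only on
# the reads below, not the full Gemini CLI schema; real files may carry more
# keys and the example values are placeholders):
#
#   {
#     "sessionId": "...",
#     "projectHash": "<sha256 of the project root>",
#     "startTime": "...",
#     "lastUpdated": "...",
#     "messages": [
#       {"type": "user", "content": "...", "timestamp": "..."},
#       {"type": "gemini", "content": "...", "model": "...",
#        "thoughts": [...],
#        "tokens": {"input": 0, "output": 0, "cached": 0, "total": 0},
#        "toolCalls": [{"name": "run_shell_command", "status": "...",
#                       "args": {"command": "..."}, "result": [...]}]},
#       {"type": "info", "content": "..."},
#       {"type": "error", "content": "..."}
#     ]
#   }
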
def parse_session_data(
    session_path: Path,
    payload: dict[str, Any],
    scope: ProjectScope,
    project_root_hashes: dict[str, str],
) -> GeminiSession | None:
    session_id = str(payload.get("sessionId") or session_path.stem)
    project_hash = str(payload.get("projectHash") or "")
    project_root = resolve_project_root(
        session_path, project_hash, project_root_hashes
    )
    if not matches_project_scope(project_root, project_hash, scope):
        return None

    user_messages: list[str] = []
    assistant_summaries: list[str] = []
    command_failure_examples: list[str] = []
    tool_counts: Counter[str] = Counter()
    command_families: Counter[str] = Counter()
    model_counts: Counter[str] = Counter()

    assistant_message_count = 0
    info_message_count = 0
    error_message_count = 0
    thought_count = 0
    tool_call_count = 0
    tool_errors = 0
    tool_cancellations = 0
    shell_command_count = 0
    command_failures = 0
    web_search_count = 0
    mcp_call_count = 0
    file_read_count = 0
    file_write_count = 0
    replace_count = 0
    skill_activation_count = 0
    workspace_boundary_errors = 0
    loop_events = 0
    fallback_model_switches = 0
    request_cancellations = 0
    malformed_function_calls = 0
    compressed_context_events = 0
    total_input_tokens = 0
    total_output_tokens = 0
    total_cached_tokens = 0
    total_thought_tokens = 0
    total_tool_tokens = 0
    total_tokens = 0
    git_commits = 0
    git_pushes = 0
    gh_commands = 0
    test_commands = 0
    final_answer = ""

    messages = payload.get("messages")
    if not isinstance(messages, list):
        messages = []

    for message in messages:
        if not isinstance(message, dict):
            continue

        message_type = str(message.get("type") or "")
        message_text = extract_message_text(
            message.get("content"), message.get("displayContent")
        )

        if message_type == "user":
            if message_text:
                user_messages.append(message_text)
            continue

        if message_type == "gemini":
            if message_text:
                assistant_message_count += 1
                final_answer = message_text
                if len(assistant_summaries) < 5:
                    assistant_summaries.append(message_text)

            thoughts = message.get("thoughts")
            if isinstance(thoughts, list):
                thought_count += sum(
                    1 for item in thoughts if isinstance(item, dict)
                )

            tokens = message.get("tokens")
            if isinstance(tokens, dict):
                total_input_tokens += int_value(tokens.get("input"))
                total_output_tokens += int_value(tokens.get("output"))
                total_cached_tokens += int_value(tokens.get("cached"))
                total_thought_tokens += int_value(tokens.get("thoughts"))
                total_tool_tokens += int_value(tokens.get("tool"))
                total_tokens += int_value(tokens.get("total"))

            model = str(message.get("model") or "")
            if model:
                model_counts[model] += 1

            tool_calls = message.get("toolCalls")
            if not isinstance(tool_calls, list):
                continue

            tool_call_count += len(tool_calls)
            for tool in tool_calls:
                if not isinstance(tool, dict):
                    continue

                name = str(tool.get("name") or "")
                if name:
                    tool_counts[name] += 1

                status = str(tool.get("status") or "")
                if status == "error":
                    tool_errors += 1
                elif status == "cancelled":
                    tool_cancellations += 1

                tool_blob = extract_tool_text_blob(tool)
                if WORKSPACE_BOUNDARY_RE.search(tool_blob):
                    workspace_boundary_errors += 1

                if name.startswith("mcp_"):
                    mcp_call_count += 1
                if name == "google_web_search":
                    web_search_count += 1
                if name in {"read_file", "read_many_files"}:
                    file_read_count += 1
                if name == "write_file":
                    file_write_count += 1
                if name == "replace":
                    replace_count += 1
                if name == "activate_skill":
                    skill_activation_count += 1

                if name != "run_shell_command":
                    continue

                shell_command_count += 1
                args = tool.get("args")
                command_text = ""
                if isinstance(args, dict):
                    command_text = str(args.get("command") or "")

                if command_text:
                    family = command_family(command_text)
                    if family:
                        command_families[family] += 1

                    lowered = command_text.lower()
                    if GIT_COMMIT_RE.search(command_text):
                        git_commits += 1
                    if GIT_PUSH_RE.search(command_text):
                        git_pushes += 1
                    if GH_RE.search(command_text):
                        gh_commands += 1
                    if TEST_COMMAND_RE.search(lowered):
                        test_commands += 1

                exit_code = extract_shell_exit_code(tool)
                if status == "error" or (
                    exit_code is not None and exit_code != 0
                ):
                    command_failures += 1
                    if command_text and len(command_failure_examples) < 5:
                        command_failure_examples.append(
                            truncate(command_text, 120)
                        )
            continue

        if message_type == "info":
            info_message_count += 1
            lowered = message_text.lower()
            if "request cancelled" in lowered:
                request_cancellations += 1
            if "loop was detected" in lowered:
                loop_events += 1
            if "fallback model" in lowered:
                fallback_model_switches += 1
            if "malformed function call" in lowered:
                malformed_function_calls += 1
            if "compressed context" in lowered:
                compressed_context_events += 1
            continue

        if message_type == "error":
            error_message_count += 1
            if message_text and len(command_failure_examples) < 5:
                command_failure_examples.append(truncate(message_text, 120))

    start_dt = parse_iso_timestamp(str(payload.get("startTime") or ""))
    end_dt = parse_iso_timestamp(
        str(payload.get("lastUpdated") or payload.get("startTime") or "")
    )
    duration_minutes = estimate_active_minutes(
        messages,
        str(payload.get("startTime") or ""),
        str(payload.get("lastUpdated") or payload.get("startTime") or ""),
    )

    primary_model = ""
    top_models = sorted(
        model_counts.items(), key=lambda item: item[1], reverse=True
    )
    if top_models:
        primary_model = top_models[0][0]

    session = GeminiSession(
        session_id=session_id,
        file_path=str(session_path),
        start_time=start_dt.isoformat(),
        end_time=end_dt.isoformat(),
        project_hash=project_hash,
        project_root=project_root,
        primary_model=primary_model,
        duration_minutes=duration_minutes,
        user_message_count=len(user_messages),
        assistant_message_count=assistant_message_count,
        info_message_count=info_message_count,
        error_message_count=error_message_count,
        thought_count=thought_count,
        tool_call_count=tool_call_count,
        tool_errors=tool_errors,
        tool_cancellations=tool_cancellations,
        shell_command_count=shell_command_count,
        command_failures=command_failures,
        web_search_count=web_search_count,
        mcp_call_count=mcp_call_count,
        file_read_count=file_read_count,
        file_write_count=file_write_count,
        replace_count=replace_count,
        skill_activation_count=skill_activation_count,
        workspace_boundary_errors=workspace_boundary_errors,
        loop_events=loop_events,
        fallback_model_switches=fallback_model_switches,
        request_cancellations=request_cancellations,
        malformed_function_calls=malformed_function_calls,
        compressed_context_events=compressed_context_events,
        total_input_tokens=total_input_tokens,
        total_output_tokens=total_output_tokens,
        total_cached_tokens=total_cached_tokens,
        total_thought_tokens=total_thought_tokens,
        total_tool_tokens=total_tool_tokens,
        total_tokens=total_tokens,
        git_commits=git_commits,
        git_pushes=git_pushes,
        gh_commands=gh_commands,
        test_commands=test_commands,
        models=dict(model_counts),
        tool_counts=dict(tool_counts),
        command_families=dict(command_families),
        user_messages=user_messages,
        assistant_summaries=[
            truncate(text, 160) for text in assistant_summaries
        ],
        command_failure_examples=command_failure_examples,
        first_prompt=truncate(user_messages[0], 160) if user_messages else "",
        final_answer=truncate(final_answer, 800) if final_answer else "",
    )
    session.goal_categories = detect_goal_categories(session)
    session.friction = detect_friction(session)
    session.outcome = infer_outcome(session)
    return session


def scan_all_sessions(
    gemini_dir: Path, scope: ProjectScope
) -> list[GeminiSession]:
    project_root_hashes = load_project_root_hashes(gemini_dir)

    best_payloads: dict[
        str, tuple[Path, dict[str, Any], tuple[int, float, int, int]]
    ] = {}
    for session_file in gemini_dir.rglob("session-*.json"):
        try:
            payload = json.loads(session_file.read_text(encoding="utf-8"))
        except Exception:
            continue
        if not isinstance(payload, dict):
            continue

        session_id = str(payload.get("sessionId") or session_file.stem)
        key = session_candidate_key(session_file, payload)
        existing = best_payloads.get(session_id)
        if existing is None or key > existing[2]:
            best_payloads[session_id] = (session_file, payload, key)

    sessions: list[GeminiSession] = []
    for session_file, payload, _key in best_payloads.values():
        parsed = parse_session_data(
            session_file, payload, scope, project_root_hashes
        )
        if parsed is not None:
            sessions.append(parsed)

    sessions.sort(key=lambda item: item.start_time, reverse=True)
    return sessions


def top_entries(
    data: dict[str, int], limit: int = 5, exclude: set[str] | None = None
) -> list[tuple[str, int]]:
    blocked = exclude or set()
    return [
        (key, value)
        for key, value in sorted(
            data.items(), key=lambda item: item[1], reverse=True
        )
        if value > 0 and key not in blocked
    ][:limit]


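# Quick example of top_entries() on toy data: zero counts and excluded keys are
# dropped before the limit is applied.
#
#   top_entries({"a": 3, "b": 0, "c": 5}, limit=2)           ->  [("c", 5), ("a", 3)]
#   top_entries({"a": 3, "c": 5}, limit=2, exclude={"c"})    ->  [("a", 3)]
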
def project_key(project_root: str, project_hash: str) -> str:
    if project_root:
        return project_root
    if project_hash:
        return f"hash:{project_hash}"
    return "(unknown)"


def project_label(value: str) -> str:
    if value.startswith("hash:"):
        return f"project {value.removeprefix('hash:')[:10]}"
    parts = Path(value).parts
    if len(parts) >= 2:
        return "/".join(parts[-2:])
    if parts:
        return parts[-1]
    return value or "(unknown)"


def aggregate_sessions(sessions: list[GeminiSession]) -> AggregatedData:
    aggregated = AggregatedData(
        total_sessions=len(sessions),
        date_range={"start": "", "end": ""},
    )

    dates: list[str] = []
    for session in sessions:
        dates.append(session.start_time)
        aggregated.total_user_messages += session.user_message_count
        aggregated.total_assistant_messages += session.assistant_message_count
        aggregated.total_duration_hours += session.duration_minutes / 60.0
        aggregated.total_thoughts += session.thought_count
        aggregated.total_tool_calls += session.tool_call_count
        aggregated.total_tool_errors += session.tool_errors
        aggregated.total_tool_cancellations += session.tool_cancellations
        aggregated.total_shell_commands += session.shell_command_count
        aggregated.total_command_failures += session.command_failures
        aggregated.total_web_searches += session.web_search_count
        aggregated.total_mcp_calls += session.mcp_call_count
        aggregated.total_file_reads += session.file_read_count
        aggregated.total_file_writes += session.file_write_count
        aggregated.total_replace_calls += session.replace_count
        aggregated.total_skill_activations += session.skill_activation_count
        aggregated.total_workspace_boundary_errors += (
            session.workspace_boundary_errors
        )
        aggregated.total_loops += session.loop_events
        aggregated.total_fallback_model_switches += (
            session.fallback_model_switches
        )
        aggregated.total_request_cancellations += session.request_cancellations
        aggregated.total_malformed_function_calls += (
            session.malformed_function_calls
        )
        aggregated.total_compressed_context_events += (
            session.compressed_context_events
        )
        aggregated.total_input_tokens += session.total_input_tokens
        aggregated.total_output_tokens += session.total_output_tokens
        aggregated.total_cached_tokens += session.total_cached_tokens
        aggregated.total_thought_tokens += session.total_thought_tokens
        aggregated.total_tool_tokens += session.total_tool_tokens
        aggregated.total_tokens += session.total_tokens
        aggregated.git_commits += session.git_commits
        aggregated.git_pushes += session.git_pushes
        aggregated.gh_commands += session.gh_commands
        aggregated.test_commands += session.test_commands
        aggregated.sessions_with_mcp += int(session.mcp_call_count > 0)
        aggregated.sessions_with_shell += int(session.shell_command_count > 0)
        aggregated.sessions_with_web_search += int(
            session.web_search_count > 0
        )
        aggregated.sessions_with_skills += int(
            session.skill_activation_count > 0
        )

        aggregated.projects[
            project_key(session.project_root, session.project_hash)
        ] = (
            aggregated.projects.get(
                project_key(session.project_root, session.project_hash), 0
            )
            + 1
        )
        aggregated.outcomes[session.outcome] = (
            aggregated.outcomes.get(session.outcome, 0) + 1
        )

        for key, count in session.models.items():
            aggregated.models[key] = aggregated.models.get(key, 0) + count
        for key, count in session.tool_counts.items():
            aggregated.tool_counts[key] = (
                aggregated.tool_counts.get(key, 0) + count
            )
        for key, count in session.command_families.items():
            aggregated.command_families[key] = (
                aggregated.command_families.get(key, 0) + count
            )
        for key, count in session.goal_categories.items():
            aggregated.goal_categories[key] = (
                aggregated.goal_categories.get(key, 0) + count
            )
        for key, count in session.friction.items():
            aggregated.friction[key] = aggregated.friction.get(key, 0) + count

        if len(aggregated.session_summaries) < 50:
            aggregated.session_summaries.append(
                {
                    "id": session.session_id[:8],
                    "date": iso_date(session.start_time),
                    "project": project_label(
                        project_key(session.project_root, session.project_hash)
                    ),
                    "prompt": session.first_prompt,
                    "summary": summarize_assistant(session),
                    "outcome": session.outcome,
                    "failures": str(
                        session.command_failures + session.tool_errors
                    ),
                }
            )

    if dates:
        dates.sort()
        aggregated.date_range["start"] = iso_date(dates[0])
        aggregated.date_range["end"] = iso_date(dates[-1])

    return aggregated


def build_at_a_glance(data: AggregatedData) -> dict[str, str]:
    top_goal = top_entries(
        data.goal_categories, limit=1, exclude={"warmup_minimal"}
    )
    top_project = top_entries(data.projects, limit=1)
    top_tool = top_entries(data.tool_counts, limit=3)
    top_command = top_entries(data.command_families, limit=3)
    top_friction = top_entries(data.friction, limit=1)

    work_text = "This Gemini slice spans several different kinds of work."
    if top_goal:
        work_text = f"Most Gemini work in this slice is about {safe_title(top_goal[0][0]).lower()}."
    if top_project:
        work_text += f" The most common project is {project_label(top_project[0][0])}."

    workflow_text = (
        "Your workflow mixes chat steering, file tools, and shell execution."
    )
    if data.total_shell_commands >= max(10, data.total_sessions * 5):
        workflow_text = "This is a terminal-first Gemini workflow: the shell is doing most of the heavy lifting."
    elif data.total_file_reads + data.total_replace_calls > max(
        12, data.total_shell_commands
    ):
        workflow_text = "This is a file-tool-heavy Gemini workflow: reading and patching files matters more than driving the shell."
    if data.sessions_with_skills > 0:
        workflow_text += f" Skills are activated in {data.sessions_with_skills} session(s), so the flow is not purely generic chat."

    external_text = "External context is limited."
    if data.total_mcp_calls > 0:
        external_text = "These sessions do use MCP-backed context rather than relying only on shell and file tools."
    elif data.total_web_searches > 0:
        external_text = "External context comes mostly from Google web search."

    friction_text = "Measured friction is low."
    if top_friction:
        friction_key = top_friction[0][0]
        friction_text = FRICTION_DESCRIPTIONS.get(
            friction_key,
            f"The main drag is {safe_title(friction_key).lower()}.",
        )

    tools_text = "Top tools are mixed."
    if top_tool or top_command:
        tool_names = ", ".join(
            safe_title(name) for name, _count in top_tool[:2]
        )
        command_names = ", ".join(name for name, _count in top_command[:2])
        bits = [part for part in (tool_names, command_names) if part]
        if bits:
            tools_text = f"The most-used levers are {bits[0]}"
            if len(bits) > 1:
                tools_text += f", with command families like {bits[1]}"
            tools_text += "."

    return {
        "what_you_do": work_text,
        "how_you_work": workflow_text,
        "external_context": external_text,
        "what_slows_you_down": friction_text,
        "most_used_levers": tools_text,
    }


def build_insights(data: AggregatedData) -> dict[str, Any]:
    return {
        "at_a_glance": build_at_a_glance(data),
        "top_projects": [
            {
                "path": path,
                "label": project_label(path),
                "session_count": count,
            }
            for path, count in top_entries(data.projects, limit=8)
        ],
        "top_goals": [
            {"goal": goal, "label": safe_title(goal), "count": count}
            for goal, count in top_entries(
                data.goal_categories, limit=8, exclude={"warmup_minimal"}
            )
        ],
        "top_tools": [
            {"tool": tool, "label": safe_title(tool), "count": count}
            for tool, count in top_entries(data.tool_counts, limit=10)
        ],
        "top_commands": [
            {"command": name, "count": count}
            for name, count in top_entries(data.command_families, limit=10)
        ],
        "friction": [
            {
                "category": key,
                "label": safe_title(key),
                "count": count,
                "description": FRICTION_DESCRIPTIONS.get(key, ""),
            }
            for key, count in top_entries(data.friction, limit=8)
        ],
    }


def escape_html(text: str) -> str:
    return html.escape(text or "")


def generate_bar_chart(
    data: dict[str, int],
    color: str,
    max_items: int = 6,
    label_fn: Any | None = None,
) -> str:
    entries = top_entries(data, limit=max_items)
    if not entries:
        return '<p class="empty">No data</p>'
    max_value = max(count for _label, count in entries) or 1
    rows: list[str] = []
    for label, count in entries:
        display = label_fn(label) if label_fn else safe_title(label)
        width = (count / max_value) * 100
        rows.append(
            f'<div class="bar-row"><div class="bar-label">{escape_html(str(display))}</div>'
            f'<div class="bar-track"><div class="bar-fill" style="width:{width:.2f}%;background:{color}"></div></div>'
            f'<div class="bar-value">{count}</div></div>'
        )
    return "\n".join(rows)


def generate_html_report(
    data: AggregatedData,
    insights: dict[str, Any],
    project_scope_prefix: str | None,
) -> str:
    glance = insights["at_a_glance"]
    scope_label = project_scope_prefix or "All Gemini sessions"
    html_parts = [
        "<!doctype html>",
        '<html lang="en">',
        "<head>",
        '<meta charset="utf-8">',
        '<meta name="viewport" content="width=device-width, initial-scale=1">',
        "<title>Gemini CLI Insights Report</title>",
        "<style>",
        "body{margin:0;font-family:ui-sans-serif,system-ui,-apple-system,BlinkMacSystemFont,'Segoe UI',sans-serif;background:#f8fafc;color:#0f172a;}",
        ".page{max-width:1180px;margin:0 auto;padding:40px 24px 56px;}",
        ".hero{padding:28px 30px;border-radius:22px;background:linear-gradient(135deg,#111827,#0f766e 58%,#f59e0b);color:#ecfeff;box-shadow:0 24px 80px rgba(15,23,42,.18);}",
        ".hero h1{margin:0 0 8px;font-size:34px;line-height:1.05;}",
        ".hero p{margin:0;color:#d1fae5;max-width:880px;}",
        ".scope{margin-top:12px;font-size:13px;color:#a7f3d0;}",
        ".glance{margin-top:20px;padding:18px 20px;border-radius:18px;background:rgba(255,255,255,.10);border:1px solid rgba(255,255,255,.16);}",
        ".glance-line{margin:0 0 10px;}",
        ".stats{display:grid;grid-template-columns:repeat(auto-fit,minmax(150px,1fr));gap:14px;margin:22px 0 30px;}",
        ".stat{background:#fff;border:1px solid #e2e8f0;border-radius:16px;padding:16px 18px;box-shadow:0 8px 30px rgba(15,23,42,.05);}",
        ".stat-value{font-size:28px;font-weight:700;}",
        ".stat-label{margin-top:4px;font-size:12px;text-transform:uppercase;letter-spacing:.08em;color:#64748b;}",
        ".grid{display:grid;grid-template-columns:repeat(auto-fit,minmax(320px,1fr));gap:18px;}",
        ".panel{background:#fff;border:1px solid #e2e8f0;border-radius:18px;padding:18px;box-shadow:0 8px 30px rgba(15,23,42,.05);}",
        ".panel h2{margin:0 0 14px;font-size:15px;text-transform:uppercase;letter-spacing:.08em;color:#64748b;}",
        ".bar-row{display:flex;align-items:center;gap:10px;margin-bottom:10px;}",
        ".bar-label{width:140px;font-size:13px;line-height:1.25;color:#1e293b;}",
        ".bar-track{flex:1;height:10px;border-radius:999px;background:#e2e8f0;overflow:hidden;}",
        ".bar-fill{height:100%;border-radius:999px;}",
        ".bar-value{width:34px;text-align:right;font-size:12px;color:#475569;}",
        ".empty{margin:0;color:#94a3b8;}",
        ".sessions{margin-top:26px;background:#fff;border:1px solid #e2e8f0;border-radius:18px;padding:18px;box-shadow:0 8px 30px rgba(15,23,42,.05);}",
        ".sessions h2{margin:0 0 14px;font-size:15px;text-transform:uppercase;letter-spacing:.08em;color:#64748b;}",
        ".session{padding:14px 0;border-top:1px solid #e2e8f0;}",
        ".session:first-of-type{border-top:none;padding-top:0;}",
        ".session-top{display:flex;justify-content:space-between;gap:12px;flex-wrap:wrap;}",
        ".session-project{font-weight:700;}",
        ".session-meta{font-size:12px;color:#64748b;}",
        ".session-prompt,.session-summary{margin:6px 0 0;font-size:14px;line-height:1.45;}",
        ".footer{margin-top:28px;font-size:12px;color:#64748b;}",
        "@media (max-width:700px){.hero h1{font-size:28px}.bar-label{width:110px}}",
        "</style>",
        "</head>",
        "<body>",
        '<div class="page">',
        '<div class="hero">',
        "<h1>Gemini CLI Insights</h1>",
        f"<p>{escape_html(data.date_range.get('start', ''))} to {escape_html(data.date_range.get('end', ''))}. Built from local Gemini CLI chat sessions.</p>",
        f'<div class="scope">Scope: {escape_html(scope_label)}</div>',
        '<div class="glance">',
        f'<p class="glance-line"><strong>What you do:</strong> {escape_html(glance["what_you_do"])}</p>',
        f'<p class="glance-line"><strong>How you work:</strong> {escape_html(glance["how_you_work"])}</p>',
        f'<p class="glance-line"><strong>External context:</strong> {escape_html(glance["external_context"])}</p>',
        f'<p class="glance-line"><strong>What slows you down:</strong> {escape_html(glance["what_slows_you_down"])}</p>',
        f'<p class="glance-line"><strong>Most-used levers:</strong> {escape_html(glance["most_used_levers"])}</p>',
        "</div>",
        "</div>",
        '<div class="stats">',
        f'<div class="stat"><div class="stat-value">{data.total_sessions}</div><div class="stat-label">Sessions</div></div>',
        f'<div class="stat"><div class="stat-value">{round(data.total_duration_hours, 1)}</div><div class="stat-label">Hours</div></div>',
        f'<div class="stat"><div class="stat-value">{data.total_user_messages}</div><div class="stat-label">User Messages</div></div>',
        f'<div class="stat"><div class="stat-value">{data.total_tool_calls}</div><div class="stat-label">Tool Calls</div></div>',
        f'<div class="stat"><div class="stat-value">{data.total_shell_commands}</div><div class="stat-label">Shell Commands</div></div>',
        f'<div class="stat"><div class="stat-value">{data.total_command_failures}</div><div class="stat-label">Shell Failures</div></div>',
        f'<div class="stat"><div class="stat-value">{data.total_mcp_calls}</div><div class="stat-label">MCP Calls</div></div>',
        f'<div class="stat"><div class="stat-value">{data.total_thoughts}</div><div class="stat-label">Thoughts</div></div>',
        "</div>",
        '<div class="grid">',
        f'<div class="panel"><h2>Projects</h2>{generate_bar_chart(data.projects, "#0f766e", label_fn=project_label)}</div>',
        f'<div class="panel"><h2>Goals</h2>{generate_bar_chart(data.goal_categories, "#2563eb")}</div>',
        f'<div class="panel"><h2>Tools</h2>{generate_bar_chart(data.tool_counts, "#7c3aed")}</div>',
        f'<div class="panel"><h2>Command Families</h2>{generate_bar_chart(data.command_families, "#ea580c", label_fn=lambda value: value)}</div>',
        f'<div class="panel"><h2>Models</h2>{generate_bar_chart(data.models, "#16a34a", label_fn=lambda value: value)}</div>',
        f'<div class="panel"><h2>Friction</h2>{generate_bar_chart(data.friction, "#dc2626")}</div>',
        "</div>",
        '<div class="sessions">',
        "<h2>Recent Sessions</h2>",
    ]

    for item in data.session_summaries[:18]:
        html_parts.extend(
            [
                '<div class="session">',
                '<div class="session-top">',
                f'<div class="session-project">{escape_html(item["project"])}</div>',
                f'<div class="session-meta">{escape_html(item["date"])} | {escape_html(safe_title(item["outcome"]))} | failures: {escape_html(item["failures"])}</div>',
                "</div>",
                f'<p class="session-prompt"><strong>Prompt:</strong> {escape_html(item["prompt"] or "(none captured)")}</p>',
                f'<p class="session-summary"><strong>Summary:</strong> {escape_html(item["summary"])}</p>',
                "</div>",
            ]
        )

    html_parts.extend(
        [
            "</div>",
            (
                f'<div class="footer">Input tokens: {data.total_input_tokens:,} | '
                f"Output tokens: {data.total_output_tokens:,} | "
                f"Cached tokens: {data.total_cached_tokens:,} | "
                f"Thought tokens: {data.total_thought_tokens:,}</div>"
            ),
            "</div>",
            "</body>",
            "</html>",
        ]
    )
    return "\n".join(html_parts)


def build_export_data(
    data: AggregatedData,
    insights: dict[str, Any],
    sessions: list[GeminiSession],
    project_scope_prefix: str | None,
) -> dict[str, Any]:
    return {
        "metadata": {
            "generated_at": datetime.now(tz=timezone.utc).isoformat(),
            "report_type": "gemini-insights",
            "date_range": data.date_range,
            "session_count": data.total_sessions,
            "project_scope_prefix": project_scope_prefix,
        },
        "aggregated_data": asdict(data),
        "insights": insights,
        "sessions": [asdict(session) for session in sessions],
    }


def generate_report(args: argparse.Namespace) -> dict[str, Any]:
    ensure_dir(args.cache_dir)
    scope = build_project_scope(args.project_path_prefix)
    sessions = scan_all_sessions(args.gemini_dir, scope)
    aggregated = aggregate_sessions(sessions)
    insights = build_insights(aggregated)

    output_html = args.output_html or (args.cache_dir / "report.html")
    output_json = args.output_json or (args.cache_dir / "report.json")
    ensure_dir(output_html.parent)
    ensure_dir(output_json.parent)

    output_html.write_text(
        generate_html_report(aggregated, insights, args.project_path_prefix),
        encoding="utf-8",
    )
    output_json.write_text(
        json.dumps(
            build_export_data(
                aggregated, insights, sessions, args.project_path_prefix
            ),
            indent=2,
        ),
        encoding="utf-8",
    )

    return {
        "html_path": output_html,
        "json_path": output_json,
        "data": aggregated,
        "insights": insights,
    }


def print_summary(result: dict[str, Any]) -> None:
    data: AggregatedData = result["data"]
    glance = result["insights"]["at_a_glance"]
    print(f"Wrote HTML report: {result['html_path']}")
    print(f"Wrote JSON export: {result['json_path']}")
    print(
        f"Analyzed {data.total_sessions} sessions "
        f"({data.total_user_messages} user messages, {round(data.total_duration_hours, 1)}h) "
        f"from {data.date_range.get('start', '')} to {data.date_range.get('end', '')}"
    )
    print(f"What you do: {glance['what_you_do']}")
    print(f"What slows you down: {glance['what_slows_you_down']}")


def main() -> int:
    args = parse_args()
    result = generate_report(args)
    print_summary(result)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())