# Source: codeflash-agent/scripts/claude_insights.py
# Commit: Kevin Turcios 20f6c59f05
# Lint and format entire repo, not just packages (#23)
# Remove .codeflash/ from ruff extend-exclude, add per-file ignores
# for .codeflash/, scripts/, evals/, and plugin/ (benchmark/script
# patterns like print, eval, magic values). Remove shebangs. Widen
# pre-commit hooks to check the full repo.
# 2026-04-15 03:16:15 -05:00
#
# 3368 lines
# 116 KiB
# Python

# /// script
# requires-python = ">=3.11"
# ///
"""Portable Python implementation of Claude Code /insights.
This script focuses on the core report pipeline:
1. Scan Claude Code transcript files under ``~/.claude/projects``.
2. Reconstruct leaf conversation chains from append-only JSONL transcripts.
3. Extract deterministic usage metrics from tool calls and user messages.
4. Generate heuristic facets and higher-level insights.
5. Write an HTML report plus a JSON export.
Differences from the TypeScript implementation:
- Uses the Python standard library only.
- Narrative sections are heuristic rather than model-generated.
- Does not implement Anthropic-internal homespace collection or S3 upload.
"""
from __future__ import annotations
import argparse
import difflib
import html
import json
import math
import os
import re
import statistics
import subprocess
from collections import Counter, defaultdict
from collections.abc import Iterable
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from functools import lru_cache
from pathlib import Path
from typing import Any
from uuid import UUID
# File-extension -> display-language lookup used when attributing file
# operations to a programming language.
EXTENSION_TO_LANGUAGE: dict[str, str] = {
    ".ts": "TypeScript",
    ".tsx": "TypeScript",
    ".js": "JavaScript",
    ".jsx": "JavaScript",
    ".py": "Python",
    ".rb": "Ruby",
    ".go": "Go",
    ".rs": "Rust",
    ".java": "Java",
    ".md": "Markdown",
    ".json": "JSON",
    ".yaml": "YAML",
    ".yml": "YAML",
    ".sh": "Shell",
    ".css": "CSS",
    ".html": "HTML",
}
# Facet key -> human-readable label for report rendering.
# Note: "handled_complexity" shares the "Multi-file Changes" label with
# "multi_file_changes".
LABEL_MAP: dict[str, str] = {
    "debug_investigate": "Debug/Investigate",
    "implement_feature": "Implement Feature",
    "fix_bug": "Fix Bug",
    "write_script_tool": "Write Script/Tool",
    "refactor_code": "Refactor Code",
    "configure_system": "Configure System",
    "create_pr_commit": "Create PR/Commit",
    "analyze_data": "Analyze Data",
    "understand_codebase": "Understand Codebase",
    "write_tests": "Write Tests",
    "write_docs": "Write Docs",
    "deploy_infra": "Deploy/Infra",
    "warmup_minimal": "Cache Warmup",
    "fast_accurate_search": "Fast/Accurate Search",
    "correct_code_edits": "Correct Code Edits",
    "good_explanations": "Good Explanations",
    "proactive_help": "Proactive Help",
    "multi_file_changes": "Multi-file Changes",
    "handled_complexity": "Multi-file Changes",
    "good_debugging": "Good Debugging",
    "misunderstood_request": "Misunderstood Request",
    "wrong_approach": "Wrong Approach",
    "buggy_code": "Buggy Code",
    "user_rejected_action": "User Rejected Action",
    "claude_got_blocked": "Claude Got Blocked",
    "user_stopped_early": "User Stopped Early",
    "wrong_file_or_location": "Wrong File/Location",
    "excessive_changes": "Excessive Changes",
    "slow_or_verbose": "Slow/Verbose",
    "tool_failed": "Tool Failed",
    "user_unclear": "User Unclear",
    "external_issue": "External Issue",
    "frustrated": "Frustrated",
    "dissatisfied": "Dissatisfied",
    "likely_satisfied": "Likely Satisfied",
    "satisfied": "Satisfied",
    "happy": "Happy",
    "unsure": "Unsure",
    "neutral": "Neutral",
    "delighted": "Delighted",
    "single_task": "Single Task",
    "multi_task": "Multi Task",
    "iterative_refinement": "Iterative Refinement",
    "exploration": "Exploration",
    "quick_question": "Quick Question",
    "fully_achieved": "Fully Achieved",
    "mostly_achieved": "Mostly Achieved",
    "partially_achieved": "Partially Achieved",
    "not_achieved": "Not Achieved",
    "unclear_from_transcript": "Unclear",
    "unhelpful": "Unhelpful",
    "slightly_helpful": "Slightly Helpful",
    "moderately_helpful": "Moderately Helpful",
    "very_helpful": "Very Helpful",
    "essential": "Essential",
}
# Canonical ordering of satisfaction facet keys (worst -> best, then unsure).
SATISFACTION_ORDER: list[str] = [
    "frustrated",
    "dissatisfied",
    "likely_satisfied",
    "satisfied",
    "happy",
    "unsure",
]
# Canonical ordering of outcome facet keys (worst -> best, then unclear).
OUTCOME_ORDER: list[str] = [
    "not_achieved",
    "partially_achieved",
    "mostly_achieved",
    "fully_achieved",
    "unclear_from_transcript",
]
# Tool names treated as sub-agent invocations.
AGENT_TOOL_NAMES: set[str] = {"Agent", "Task"}
# Keyword heuristics: goal-category key -> patterns matched against user
# prompts to classify what the session was trying to accomplish.
GOAL_PATTERNS: dict[str, list[re.Pattern[str]]] = {
    "debug_investigate": [
        re.compile(r"\bdebug\b", re.IGNORECASE),
        re.compile(r"\binvestigat", re.IGNORECASE),
        re.compile(r"\btrace\b", re.IGNORECASE),
        re.compile(r"\bwhy\b", re.IGNORECASE),
        re.compile(r"\berror\b", re.IGNORECASE),
        re.compile(r"\bissue\b", re.IGNORECASE),
    ],
    "implement_feature": [
        re.compile(r"\bimplement\b", re.IGNORECASE),
        re.compile(r"\bbuild\b", re.IGNORECASE),
        re.compile(r"\bfeature\b", re.IGNORECASE),
        re.compile(r"\badd\b", re.IGNORECASE),
        re.compile(r"\bcreate\b", re.IGNORECASE),
    ],
    "fix_bug": [
        re.compile(r"\bfix\b", re.IGNORECASE),
        re.compile(r"\bbug\b", re.IGNORECASE),
        re.compile(r"\bbroken\b", re.IGNORECASE),
        re.compile(r"\bfailing\b", re.IGNORECASE),
    ],
    "write_script_tool": [
        re.compile(r"\bscript\b", re.IGNORECASE),
        re.compile(r"\bcli\b", re.IGNORECASE),
        re.compile(r"\btool\b", re.IGNORECASE),
        re.compile(r"\bautomation\b", re.IGNORECASE),
    ],
    "refactor_code": [
        re.compile(r"\brefactor\b", re.IGNORECASE),
        re.compile(r"\bcleanup\b", re.IGNORECASE),
        re.compile(r"\breorgan", re.IGNORECASE),
        re.compile(r"\bsimplif", re.IGNORECASE),
    ],
    "configure_system": [
        re.compile(r"\bconfigure\b", re.IGNORECASE),
        re.compile(r"\bsetup\b", re.IGNORECASE),
        re.compile(r"\binstall\b", re.IGNORECASE),
        re.compile(r"\bconfig\b", re.IGNORECASE),
        re.compile(r"\benv\b", re.IGNORECASE),
        re.compile(r"\bdocker\b", re.IGNORECASE),
        re.compile(r"\bci\b", re.IGNORECASE),
    ],
    "create_pr_commit": [
        re.compile(r"\bcommit\b", re.IGNORECASE),
        re.compile(r"\bpull request\b", re.IGNORECASE),
        re.compile(r"\bpr\b", re.IGNORECASE),
        re.compile(r"\bmerge\b", re.IGNORECASE),
    ],
    "analyze_data": [
        re.compile(r"\banaly[sz]e\b", re.IGNORECASE),
        re.compile(r"\bmetrics\b", re.IGNORECASE),
        re.compile(r"\breport\b", re.IGNORECASE),
        re.compile(r"\bdata\b", re.IGNORECASE),
    ],
    "understand_codebase": [
        re.compile(r"\bunderstand\b", re.IGNORECASE),
        re.compile(r"\bexplain\b", re.IGNORECASE),
        re.compile(r"\bwalk ?through\b", re.IGNORECASE),
        re.compile(r"\bhow does\b", re.IGNORECASE),
        re.compile(r"\bwhere is\b", re.IGNORECASE),
    ],
    "write_tests": [
        re.compile(r"\btests?\b", re.IGNORECASE),
        re.compile(r"\bpytest\b", re.IGNORECASE),
        re.compile(r"\bunit test\b", re.IGNORECASE),
        re.compile(r"\bintegration test\b", re.IGNORECASE),
    ],
    "write_docs": [
        re.compile(r"\breadme\b", re.IGNORECASE),
        re.compile(r"\bdocs?\b", re.IGNORECASE),
        re.compile(r"\bdocument", re.IGNORECASE),
    ],
    "deploy_infra": [
        re.compile(r"\bdeploy\b", re.IGNORECASE),
        re.compile(r"\binfra\b", re.IGNORECASE),
        re.compile(r"\bterraform\b", re.IGNORECASE),
        re.compile(r"\bkubernetes\b", re.IGNORECASE),
        re.compile(r"\bk8s\b", re.IGNORECASE),
    ],
}
# Imperative phrasing that suggests the user is (re)issuing instructions.
REPEATED_INSTRUCTION_PATTERNS: list[re.Pattern[str]] = [
    re.compile(r"\balways\b", re.IGNORECASE),
    re.compile(r"\bnever\b", re.IGNORECASE),
    re.compile(r"\bdon't\b", re.IGNORECASE),
    re.compile(r"\bdo not\b", re.IGNORECASE),
    re.compile(r"\bplease\b", re.IGNORECASE),
    re.compile(r"\bmake sure\b", re.IGNORECASE),
    re.compile(r"\buse\b", re.IGNORECASE),
    re.compile(r"\brun\b", re.IGNORECASE),
    re.compile(r"\bavoid\b", re.IGNORECASE),
]
# Strong positive sentiment markers in user messages.
POSITIVE_STRONG_PATTERNS: list[re.Pattern[str]] = [
    re.compile(r"\bperfect\b", re.IGNORECASE),
    re.compile(r"\bgreat\b", re.IGNORECASE),
    re.compile(r"\bawesome\b", re.IGNORECASE),
    re.compile(r"\bexcellent\b", re.IGNORECASE),
    re.compile(r"\blove\b", re.IGNORECASE),
    re.compile(r"\bship it\b", re.IGNORECASE),
]
# Mild positive sentiment markers.
POSITIVE_MILD_PATTERNS: list[re.Pattern[str]] = [
    re.compile(r"\bthanks\b", re.IGNORECASE),
    re.compile(r"\bthat works\b", re.IGNORECASE),
    re.compile(r"\bworks\b", re.IGNORECASE),
    re.compile(r"\blooks good\b", re.IGNORECASE),
    re.compile(r"\bsolid\b", re.IGNORECASE),
]
# Strong negative sentiment markers.
NEGATIVE_STRONG_PATTERNS: list[re.Pattern[str]] = [
    re.compile(r"\bbroken\b", re.IGNORECASE),
    re.compile(r"\bfrustrat", re.IGNORECASE),
    re.compile(r"\bgive up\b", re.IGNORECASE),
    re.compile(r"\buseless\b", re.IGNORECASE),
    re.compile(r"\bterrible\b", re.IGNORECASE),
]
# Mild negative sentiment markers.
NEGATIVE_MILD_PATTERNS: list[re.Pattern[str]] = [
    re.compile(r"\bnot right\b", re.IGNORECASE),
    re.compile(r"\bwrong\b", re.IGNORECASE),
    re.compile(r"\btry again\b", re.IGNORECASE),
    re.compile(r"\bstill failing\b", re.IGNORECASE),
    re.compile(r"\bdoesn't work\b", re.IGNORECASE),
    re.compile(r"\bdoes not work\b", re.IGNORECASE),
    re.compile(r"\bproblem\b", re.IGNORECASE),
]
# Markers of the user moving the session to a follow-on task.
CONTINUATION_PATTERNS: list[re.Pattern[str]] = [
    re.compile(r"\bok\b", re.IGNORECASE),
    re.compile(r"\bokay\b", re.IGNORECASE),
    re.compile(r"\bnow\b", re.IGNORECASE),
    re.compile(r"\bnext\b", re.IGNORECASE),
    re.compile(r"\balso\b", re.IGNORECASE),
    re.compile(r"\bthen\b", re.IGNORECASE),
]
# Matches prompts that begin with an XML/HTML-like tag; such prompts are
# skipped as machine-injected noise rather than user text.
PROMPT_NOISE_RE = re.compile(r"^\s*<[a-z][^>]*>", re.IGNORECASE)
# Narrative blurbs rendered for each dominant goal category.
PROJECT_AREA_DESCRIPTIONS: dict[str, str] = {
    "implement_feature": "You use Claude Code to add or reshape product functionality, usually with code edits followed by a quick validation loop.",
    "fix_bug": "You bring Claude in when something is visibly failing and need a concrete patch rather than a high-level discussion.",
    "debug_investigate": "You lean on Claude to narrow a failure quickly, map the problem space, and turn symptoms into a working diagnosis.",
    "write_script_tool": "You regularly turn repetitive work into scripts and small command-line tools instead of doing it by hand.",
    "refactor_code": "You use Claude to restructure code without changing the goal, especially when the work spans several related files.",
    "configure_system": "You rely on Claude for setup and environment work where config drift and shell details slow you down.",
    "create_pr_commit": "You do not stop at code changes; you also use Claude to package the work into a shippable commit or PR loop.",
    "analyze_data": "You use Claude as an analyst as much as a coder, especially when the task starts with collecting and summarizing signals.",
    "understand_codebase": "You use Claude to orient inside unfamiliar code before deciding what to change.",
    "write_tests": "You turn Claude toward validation work when you need coverage, reproduction, or a guardrail around a fix.",
    "write_docs": "You use Claude to turn implementation details into docs and maintainable explanations.",
    "deploy_infra": "You ask Claude to help with deployment and infrastructure tasks where small mistakes have outsized consequences.",
}
# Feature suggestions surfaced in the report: name -> one-liner + example.
FEATURE_CATALOG: dict[str, dict[str, str]] = {
    "MCP Servers": {
        "one_liner": "Connect Claude to external tools, databases, and APIs.",
        "example_code": "claude mcp add github -- npx -y @modelcontextprotocol/server-github",
    },
    "Custom Skills": {
        "one_liner": "Package a repeated workflow behind a reusable slash command.",
        "example_code": "mkdir -p .claude/skills/review && $EDITOR .claude/skills/review/SKILL.md",
    },
    "Hooks": {
        "one_liner": "Run validations or formatting automatically at key lifecycle events.",
        "example_code": '{\n "hooks": {\n "Stop": ["pytest -q"]\n }\n}',
    },
    "Headless Mode": {
        "one_liner": "Run Claude non-interactively from scripts or CI.",
        "example_code": 'claude -p "fix the failing tests and explain the diff" --allowedTools "Read,Edit,Bash"',
    },
    "Task Agents": {
        "one_liner": "Use focused sub-agents for exploration or parallel work.",
        "example_code": "Use an agent to explore the auth flow and another agent to inspect the failing tests.",
    },
}
# Narrative blurbs rendered for each dominant friction category.
FRICTION_DESCRIPTIONS: dict[str, str] = {
    "tool_failed": "Tool execution is breaking momentum. When a shell run or file operation fails, the session shifts from solving the task to recovering the environment.",
    "wrong_approach": "The target is usually clear, but the first implementation path is not always the cheapest one. That leads to avoidable retries.",
    "buggy_code": "Claude is producing code that still needs correction, so you spend time validating and steering instead of moving straight to done.",
    "user_stopped_early": "You are stepping in to redirect or cut off a run before the initial plan lands. That usually means the execution path is drifting too far too quickly.",
    "claude_got_blocked": "Some sessions fail for environmental reasons rather than reasoning quality. That still costs turns and makes the workflow feel brittle.",
    "user_rejected_action": "Claude is proposing actions you do not want to approve, which adds friction even when the task itself is understood.",
    "slow_or_verbose": "The session is spending too much time on explanation or intermediate output relative to the value delivered.",
    "wrong_file_or_location": "The implementation work is landing in the wrong place, which forces extra review and cleanup.",
    "excessive_changes": "The patch is larger than the task needed, increasing review cost and raising the chance of regressions.",
}
# Outcome key -> verb phrase used when composing narrative sentences.
OUTCOME_VERBS: dict[str, str] = {
    "fully_achieved": "fully achieved",
    "mostly_achieved": "mostly achieved",
    "partially_achieved": "partially achieved",
    "not_achieved": "not achieved",
    "unclear_from_transcript": "unclear",
}
@dataclass
class SessionLog:
    """One reconstructed leaf conversation chain from a JSONL transcript."""
    date: str  # session date string
    messages: list[dict[str, Any]]  # ordered messages of the leaf chain
    full_path: str  # transcript file path
    created: datetime
    modified: datetime
    first_prompt: str  # truncated first meaningful user prompt
    message_count: int
    is_sidechain: bool
    session_id: str
    leaf_uuid: str  # uuid of the leaf message this chain ends at
    # Optional metadata recorded by auxiliary transcript entries.
    summary: str | None = None
    custom_title: str | None = None
    tag: str | None = None
    agent_name: str | None = None
    agent_color: str | None = None
    agent_setting: str | None = None
    mode: str | None = None
    pr_number: int | None = None
    pr_url: str | None = None
    pr_repository: str | None = None
    git_branch: str | None = None
    project_path: str = ""
@dataclass
class SessionMeta:
    """Deterministic per-session usage metrics extracted from a transcript."""
    session_id: str
    project_path: str
    start_time: str
    duration_minutes: int
    user_message_count: int
    assistant_message_count: int
    tool_counts: dict[str, int]  # tool name -> invocation count
    languages: dict[str, int]  # language name -> file-operation count
    git_commits: int
    git_pushes: int
    input_tokens: int
    output_tokens: int
    first_prompt: str
    summary: str | None
    user_interruptions: int
    user_response_times: list[float]
    tool_errors: int
    tool_error_categories: dict[str, int]
    uses_task_agent: bool
    uses_mcp: bool
    uses_web_search: bool
    uses_web_fetch: bool
    lines_added: int
    lines_removed: int
    files_modified: int
    message_hours: list[int]
    user_message_timestamps: list[str]

    @classmethod
    def from_dict(cls, payload: dict[str, Any]) -> SessionMeta:
        """Rehydrate from a mapping whose keys match the field names exactly."""
        return cls(**payload)
@dataclass
class SessionFacets:
    """Heuristic facet labels derived for a single session."""
    session_id: str
    underlying_goal: str
    goal_categories: dict[str, int]
    outcome: str
    user_satisfaction_counts: dict[str, int]
    claude_helpfulness: str
    session_type: str
    friction_counts: dict[str, int]
    friction_detail: str
    primary_success: str
    brief_summary: str
    user_instructions_to_claude: list[str] = field(default_factory=list)

    @classmethod
    def from_dict(cls, payload: dict[str, Any]) -> SessionFacets:
        """Rehydrate from a mapping whose keys match the field names exactly."""
        return cls(**payload)
@dataclass
class AggregatedData:
    """Report-level rollup of metrics and facets across all kept sessions."""
    total_sessions: int
    sessions_with_facets: int
    date_range: dict[str, str]
    total_messages: int = 0
    total_duration_hours: float = 0.0
    total_input_tokens: int = 0
    total_output_tokens: int = 0
    tool_counts: dict[str, int] = field(default_factory=dict)
    languages: dict[str, int] = field(default_factory=dict)
    git_commits: int = 0
    git_pushes: int = 0
    projects: dict[str, int] = field(default_factory=dict)
    goal_categories: dict[str, int] = field(default_factory=dict)
    outcomes: dict[str, int] = field(default_factory=dict)
    satisfaction: dict[str, int] = field(default_factory=dict)
    helpfulness: dict[str, int] = field(default_factory=dict)
    session_types: dict[str, int] = field(default_factory=dict)
    friction: dict[str, int] = field(default_factory=dict)
    success: dict[str, int] = field(default_factory=dict)
    session_summaries: list[dict[str, str]] = field(default_factory=list)
    total_interruptions: int = 0
    total_tool_errors: int = 0
    tool_error_categories: dict[str, int] = field(default_factory=dict)
    user_response_times: list[float] = field(default_factory=list)
    median_response_time: float = 0.0
    avg_response_time: float = 0.0
    sessions_using_task_agent: int = 0
    sessions_using_mcp: int = 0
    sessions_using_web_search: int = 0
    sessions_using_web_fetch: int = 0
    total_lines_added: int = 0
    total_lines_removed: int = 0
    total_files_modified: int = 0
    days_active: int = 0
    messages_per_day: float = 0.0
    message_hours: list[int] = field(default_factory=list)
    # Concurrent-session ("multi-clauding") counters.
    multi_clauding: dict[str, int] = field(
        default_factory=lambda: {
            "overlap_events": 0,
            "sessions_involved": 0,
            "user_messages_during": 0,
        }
    )
    # Total transcripts seen before filtering; None when not tracked.
    total_sessions_scanned: int | None = None
@dataclass(frozen=True)
class RepoIdentity:
    """Immutable identity of a git checkout (hashable for caching)."""
    root: str  # normalized work-tree root
    common_dir: str | None  # normalized git common dir, if resolvable
    remotes: frozenset[str]  # configured remote URLs
    worktrees: tuple[str, ...] = ()  # normalized linked worktree paths
@dataclass(frozen=True)
class ProjectScope:
    """Filter scope derived from --project-path-prefix (None = match all)."""
    target_prefix: str | None  # normalized prefix; None disables filtering
    path_prefixes: frozenset[str]  # prefix plus related repo roots/worktrees
    common_dirs: frozenset[str]  # git common dirs of related repos
    remotes: frozenset[str]  # remote URLs of related repos
def parse_args() -> argparse.Namespace:
    """Define and parse the command-line options for the insights pipeline."""
    claude_home = Path(os.path.expanduser("~")) / ".claude"
    parser = argparse.ArgumentParser(
        description="Python implementation of Claude Code /insights."
    )
    parser.add_argument(
        "--projects-dir",
        type=Path,
        default=claude_home / "projects",
        help="Directory containing Claude Code project transcript directories.",
    )
    parser.add_argument(
        "--cache-dir",
        type=Path,
        default=claude_home / "usage-data-py",
        help="Directory for cached session metadata, facets, and reports.",
    )
    parser.add_argument(
        "--project-path-prefix",
        type=str,
        default=None,
        help=(
            "Only include sessions whose transcript project_path matches this path "
            "or one of its descendants."
        ),
    )
    # Optional output overrides: both default to files inside --cache-dir.
    output_options = (
        (
            "--output-html",
            "Path for the generated HTML report. Defaults to <cache-dir>/report.html.",
        ),
        (
            "--output-json",
            "Path for the JSON export. Defaults to <cache-dir>/report.json.",
        ),
    )
    for flag, help_text in output_options:
        parser.add_argument(flag, type=Path, default=None, help=help_text)
    # Per-run work limits so a huge backlog does not stall a single run.
    limit_options = (
        (
            "--max-sessions-load",
            "Maximum uncached session files to load on a single run.",
        ),
        (
            "--max-facet-extractions",
            "Maximum sessions to facet-extract on a single run.",
        ),
    )
    for flag, help_text in limit_options:
        parser.add_argument(flag, type=int, default=200, help=help_text)
    return parser.parse_args()
def ensure_dir(path: Path) -> None:
    """Create *path* and any missing parents; no error if it already exists."""
    path.mkdir(parents=True, exist_ok=True)
def normalize_path_for_match(path_text: str) -> str:
    """Canonicalize a path for comparison: expand ~, resolve symlinks, normalize."""
    expanded = os.path.expanduser(path_text)
    resolved = os.path.realpath(expanded)
    return os.path.normpath(resolved)
def run_git(path: str, *args: str) -> str | None:
    """Run a git subcommand in *path*; return stripped stdout, or None on failure.

    Failure covers a missing git binary, a 5-second timeout, and any
    non-zero exit status.
    """
    command = ["git", "-C", path, *args]
    try:
        completed = subprocess.run(
            command,
            check=False,
            capture_output=True,
            text=True,
            timeout=5,
        )
    except (OSError, subprocess.TimeoutExpired):
        return None
    return completed.stdout.strip() if completed.returncode == 0 else None
@lru_cache(maxsize=512)
def get_repo_identity(path_text: str) -> RepoIdentity | None:
    """Resolve the git identity (root, common dir, remotes, worktrees) of a path.

    Returns None when *path_text* is not inside a git work tree (or git is
    unavailable). Memoized per input string.
    """
    normalized_path = normalize_path_for_match(path_text)
    root = run_git(normalized_path, "rev-parse", "--show-toplevel")
    if not root:
        return None
    normalized_root = normalize_path_for_match(root)
    # --git-common-dir may be relative (e.g. ".git"); anchor it at the root.
    common_dir = run_git(normalized_root, "rev-parse", "--git-common-dir")
    normalized_common_dir: str | None = None
    if common_dir:
        common_path = Path(common_dir)
        if not common_path.is_absolute():
            common_path = Path(normalized_root) / common_dir
        normalized_common_dir = normalize_path_for_match(str(common_path))
    # Config lines look like "remote.<name>.url <url>"; keep the URL part.
    remotes_output = run_git(
        normalized_root,
        "config",
        "--get-regexp",
        r"^remote\..*\.url$",
    )
    remotes: set[str] = set()
    if remotes_output:
        for line in remotes_output.splitlines():
            parts = line.split(None, 1)
            if len(parts) == 2 and parts[1].strip():
                remotes.add(parts[1].strip())
    worktrees_output = run_git(
        normalized_root, "worktree", "list", "--porcelain"
    )
    worktrees: list[str] = []
    if worktrees_output:
        for line in worktrees_output.splitlines():
            # Porcelain output prefixes each worktree path with "worktree ".
            if line.startswith("worktree "):
                worktree_path = line.removeprefix("worktree ").strip()
                if worktree_path:
                    worktrees.append(normalize_path_for_match(worktree_path))
    return RepoIdentity(
        root=normalized_root,
        common_dir=normalized_common_dir,
        remotes=frozenset(remotes),
        # dict.fromkeys de-duplicates while preserving order.
        worktrees=tuple(dict.fromkeys(worktrees)),
    )
def discover_git_roots(base_path: Path, max_depth: int = 4) -> set[str]:
    """Find directories under *base_path* containing a ".git" entry.

    Walks at most *max_depth* levels below *base_path*; deeper directories are
    pruned. A ".git" file (not just a directory) also counts as a hit.
    """
    if not base_path.exists() or not base_path.is_dir():
        return set()
    normalized_base = normalize_path_for_match(str(base_path))
    discovered: set[str] = set()
    for root, dirs, files in os.walk(normalized_base):
        current_path = Path(root)
        try:
            rel_parts = current_path.relative_to(normalized_base).parts
        except ValueError:
            continue
        depth = len(rel_parts)
        if depth > max_depth:
            # Clearing dirs stops os.walk from descending further here.
            dirs[:] = []
            continue
        if ".git" in dirs or ".git" in files:
            discovered.add(normalize_path_for_match(root))
    return discovered
def build_project_scope(prefix: str | None) -> ProjectScope:
    """Expand a path prefix into a matching scope for session filtering.

    Besides the prefix itself, the scope includes roots and worktrees of git
    repositories discovered under (or containing) the prefix, their common
    git dirs, and their remote URLs, so related checkouts also match.
    """
    if not prefix:
        # No prefix configured: return an everything-matches scope.
        return ProjectScope(
            target_prefix=None,
            path_prefixes=frozenset(),
            common_dirs=frozenset(),
            remotes=frozenset(),
        )
    normalized_prefix = normalize_path_for_match(prefix)
    path_prefixes: set[str] = {normalized_prefix}
    common_dirs: set[str] = set()
    remotes: set[str] = set()
    candidate_roots = discover_git_roots(Path(normalized_prefix))
    # The prefix itself may sit inside a repo whose root is above it.
    direct_identity = get_repo_identity(normalized_prefix)
    if direct_identity:
        candidate_roots.add(direct_identity.root)
    for repo_root in candidate_roots:
        identity = get_repo_identity(repo_root)
        if not identity:
            continue
        path_prefixes.add(identity.root)
        path_prefixes.update(identity.worktrees)
        if identity.common_dir:
            common_dirs.add(identity.common_dir)
        remotes.update(identity.remotes)
    return ProjectScope(
        target_prefix=normalized_prefix,
        path_prefixes=frozenset(path_prefixes),
        common_dirs=frozenset(common_dirs),
        remotes=frozenset(remotes),
    )
def path_matches_prefix(project_path: str, prefix: str | None) -> bool:
    """True when *project_path* equals *prefix* or lives underneath it.

    A falsy prefix matches everything; a falsy project path matches nothing.
    """
    if not prefix:
        return True
    if not project_path:
        return False
    project = normalize_path_for_match(project_path)
    target = normalize_path_for_match(prefix)
    if project == target:
        return True
    return project.startswith(target + os.sep)
def matches_project_scope(project_path: str, scope: ProjectScope) -> bool:
    """True when *project_path* belongs to the filtered project scope.

    Tries path-prefix matching first; failing that, falls back to git
    identity (shared common dir, then shared remote URL) so worktrees and
    separate checkouts of the same repository still match.
    """
    if scope.target_prefix is None:
        return True
    if not project_path:
        return False
    normalized_project = normalize_path_for_match(project_path)
    for prefix in scope.path_prefixes:
        if normalized_project == prefix or normalized_project.startswith(
            prefix + os.sep
        ):
            return True
    identity = get_repo_identity(normalized_project)
    if not identity:
        return False
    if identity.common_dir and identity.common_dir in scope.common_dirs:
        return True
    return bool(scope.remotes and identity.remotes.intersection(scope.remotes))
def truncate(text: str, length: int) -> str:
    """Collapse runs of whitespace in *text* and cap it at *length* characters.

    When the collapsed text exceeds *length*, it is cut to length - 1
    characters (right-stripped) and an ellipsis is appended, so the result
    never exceeds *length*.
    """
    stripped = " ".join(text.split())
    if len(stripped) <= length:
        return stripped
    # Bug fix: the truncation marker was `+ ""` (empty string), so truncated
    # text silently lost a character with no visible indicator. Restore the
    # single-character ellipsis the length - 1 slice was sized for.
    return stripped[: max(0, length - 1)].rstrip() + "…"
def safe_title(key: str) -> str:
    """Human-readable label for a facet key, via LABEL_MAP or title-casing."""
    fallback = key.replace("_", " ").title()
    return LABEL_MAP.get(key, fallback)
def validate_uuid(text: str) -> bool:
    """Return True when *text* parses as a UUID of any version.

    Narrowed from a blanket ``except Exception``: UUID() raises ValueError
    for malformed strings and TypeError/AttributeError for non-string input;
    anything else is an unexpected bug and should propagate.
    """
    try:
        UUID(text)
    except (ValueError, TypeError, AttributeError):
        return False
    return True
def parse_iso_timestamp(value: str | None) -> datetime:
    """Parse an ISO-8601 timestamp, always returning an aware datetime.

    Falls back to the UTC epoch for missing or unparsable input; naive
    timestamps are assumed to be UTC.
    """
    epoch = datetime.fromtimestamp(0, tz=timezone.utc)
    if not value:
        return epoch
    # Rewrite a trailing "Z" into an explicit UTC offset before parsing.
    text = value[:-1] + "+00:00" if value.endswith("Z") else value
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        return epoch
    if parsed.tzinfo is not None:
        return parsed
    return parsed.replace(tzinfo=timezone.utc)
def iso_date(value: str) -> str:
    """Calendar date (YYYY-MM-DD) of the parsed timestamp."""
    return str(parse_iso_timestamp(value).date())
def extract_text_blocks(content: Any) -> list[str]:
    """Collect the plain-text strings from a message content payload.

    A bare string yields itself; a list yields the "text" of every dict
    block typed "text"; anything else yields nothing.
    """
    if isinstance(content, str):
        return [content]
    if not isinstance(content, list):
        return []
    return [
        block["text"]
        for block in content
        if isinstance(block, dict)
        and block.get("type") == "text"
        and isinstance(block.get("text"), str)
    ]
def extract_user_message_text(message: dict[str, Any]) -> str:
    """Join the non-empty text blocks of a user message with newlines."""
    content = (message.get("message") or {}).get("content")
    parts = [part for part in extract_text_blocks(content) if part]
    return "\n".join(parts).strip()
def has_tool_result_block(message: dict[str, Any]) -> bool:
    """True when the message content list contains a tool_result block."""
    content = (message.get("message") or {}).get("content")
    if not isinstance(content, list):
        return False
    for block in content:
        if isinstance(block, dict) and block.get("type") == "tool_result":
            return True
    return False
def has_visible_user_content(message: dict[str, Any]) -> bool:
    """True when a non-meta user message carries content a human would see.

    String content counts when non-blank; list content counts when it holds
    any text, image, or document block.
    """
    if message.get("type") != "user" or message.get("isMeta"):
        return False
    content = (message.get("message") or {}).get("content")
    if isinstance(content, str):
        return bool(content.strip())
    if not isinstance(content, list):
        return False
    visible_types = {"text", "image", "document"}
    return any(
        isinstance(block, dict) and block.get("type") in visible_types
        for block in content
    )
def has_visible_assistant_content(message: dict[str, Any]) -> bool:
    """True when an assistant message contains at least one non-blank text block."""
    if message.get("type") != "assistant":
        return False
    content = (message.get("message") or {}).get("content")
    if not isinstance(content, list):
        return False
    return any(
        isinstance(block, dict)
        and block.get("type") == "text"
        and isinstance(block.get("text"), str)
        and bool(block["text"].strip())
        for block in content
    )
def count_visible_messages(transcript: list[dict[str, Any]]) -> int:
    """Count user and assistant messages with human-visible content."""
    total = 0
    for entry in transcript:
        kind = entry.get("type")
        if kind == "user" and has_visible_user_content(entry):
            total += 1
        elif kind == "assistant" and has_visible_assistant_content(entry):
            total += 1
    return total
def first_meaningful_user_text(transcript: list[dict[str, Any]]) -> str | None:
    """First user-authored text that is not noise, meta, or an interruption.

    Skips meta messages, compact summaries, blank text, tag-prefixed
    machine noise, and interruption notices; None when nothing qualifies.
    """
    for entry in transcript:
        if entry.get("type") != "user" or entry.get("isMeta"):
            continue
        if entry.get("isCompactSummary"):
            continue
        content = (entry.get("message") or {}).get("content")
        for text in extract_text_blocks(content):
            candidate = text.strip()
            if not candidate:
                continue
            if PROMPT_NOISE_RE.match(candidate):
                continue
            if candidate.startswith("[Request interrupted by user"):
                continue
            return candidate
    return None
def extract_first_prompt(transcript: list[dict[str, Any]]) -> str:
    """One-line, 200-char summary of the first meaningful user prompt."""
    text = first_meaningful_user_text(transcript)
    if text is None or not text:
        return "No prompt"
    return truncate(text.replace("\n", " "), 200)
def is_transcript_message(entry: dict[str, Any]) -> bool:
    """True for entry types that belong in the conversation transcript."""
    return entry.get("type") in ("user", "assistant", "attachment", "system")
def is_legacy_progress_entry(entry: dict[str, Any]) -> bool:
    """True for old-format progress entries that sit inside the parent chain."""
    if entry.get("type") != "progress":
        return False
    return isinstance(entry.get("uuid"), str) and "parentUuid" in entry
def is_compact_boundary_message(entry: dict[str, Any]) -> bool:
    """True for the system entry that marks a context-compaction boundary."""
    if entry.get("type") != "system":
        return False
    return entry.get("subtype") == "compact_boundary"
def sort_by_timestamp(
    messages: Iterable[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Stable sort of messages by timestamp; missing timestamps sort first."""

    def timestamp_key(entry: dict[str, Any]) -> str:
        return entry.get("timestamp") or ""

    return sorted(messages, key=timestamp_key)
def apply_preserved_segment_relinks(
    messages: dict[str, dict[str, Any]],
) -> None:
    """Rewire parent links after a compaction that preserved a message segment.

    Mutates *messages* in place. Finds the last compact-boundary entry that
    carries a ``compactMetadata.preservedSegment``; when that segment belongs
    to the newest boundary, it re-parents the segment head onto the anchor,
    moves the anchor's other children under the segment tail, and zeroes
    token usage on the preserved assistant messages (presumably to avoid
    double counting — TODO confirm). Finally, every message appearing before
    the newest boundary that is not part of the preserved segment is dropped.
    """
    last_segment: dict[str, Any] | None = None
    last_segment_boundary_index = -1
    absolute_last_boundary_index = -1
    entry_index: dict[str, int] = {}
    for index, entry in enumerate(messages.values()):
        entry_index[entry["uuid"]] = index
        if is_compact_boundary_message(entry):
            absolute_last_boundary_index = index
            segment = (entry.get("compactMetadata") or {}).get(
                "preservedSegment"
            ) or None
            if isinstance(segment, dict):
                last_segment = segment
                last_segment_boundary_index = index
    if not last_segment:
        return
    # Only relink when the preserved segment comes from the newest boundary.
    seg_is_live = last_segment_boundary_index == absolute_last_boundary_index
    preserved_uuids: set[str] = set()
    if seg_is_live:
        # Walk tail -> head via parent links, collecting the segment's uuids;
        # abandon everything if the chain never reaches the declared head.
        walk_seen: set[str] = set()
        current = messages.get(last_segment.get("tailUuid") or "")
        reached_head = False
        while current and current["uuid"] not in walk_seen:
            walk_seen.add(current["uuid"])
            preserved_uuids.add(current["uuid"])
            if current["uuid"] == last_segment.get("headUuid"):
                reached_head = True
                break
            parent_uuid = current.get("parentUuid")
            current = messages.get(parent_uuid) if parent_uuid else None
        if not reached_head:
            return
        head = messages.get(last_segment.get("headUuid") or "")
        anchor_uuid = last_segment.get("anchorUuid")
        tail_uuid = last_segment.get("tailUuid")
        if head and anchor_uuid:
            head["parentUuid"] = anchor_uuid
        if anchor_uuid and tail_uuid:
            # Every other child of the anchor now descends from the tail.
            for uuid_text, message in list(messages.items()):
                if message.get(
                    "parentUuid"
                ) == anchor_uuid and uuid_text != last_segment.get("headUuid"):
                    message["parentUuid"] = tail_uuid
        for uuid_text in preserved_uuids:
            message = messages.get(uuid_text)
            if not message or message.get("type") != "assistant":
                continue
            # Copy before zeroing so shared usage dicts are not mutated.
            usage = ((message.get("message") or {}).get("usage") or {}).copy()
            usage["input_tokens"] = 0
            usage["output_tokens"] = 0
            usage["cache_creation_input_tokens"] = 0
            usage["cache_read_input_tokens"] = 0
            message.setdefault("message", {})["usage"] = usage
    # Drop pre-boundary messages that were not preserved.
    to_delete: list[str] = []
    for uuid_text in list(messages.keys()):
        idx = entry_index.get(uuid_text, math.inf)
        if (
            idx < absolute_last_boundary_index
            and uuid_text not in preserved_uuids
        ):
            to_delete.append(uuid_text)
    for uuid_text in to_delete:
        messages.pop(uuid_text, None)
def apply_snip_removals(messages: dict[str, dict[str, Any]]) -> None:
    """Remove messages referenced by ``snipMetadata.removedUuids``, in place.

    Surviving children of a removed message are re-parented onto its nearest
    surviving ancestor so conversation chains stay connected.
    """
    to_delete: set[str] = set()
    for entry in messages.values():
        snip_metadata = entry.get("snipMetadata") or {}
        removed_uuids = snip_metadata.get("removedUuids")
        if isinstance(removed_uuids, list):
            for uuid_text in removed_uuids:
                if isinstance(uuid_text, str):
                    to_delete.add(uuid_text)
    if not to_delete:
        return
    # Remember each removed message's parent before dropping it.
    deleted_parent: dict[str, str | None] = {}
    for uuid_text in to_delete:
        entry = messages.get(uuid_text)
        if not entry:
            continue
        deleted_parent[uuid_text] = entry.get("parentUuid")
        messages.pop(uuid_text, None)

    def resolve(start: str) -> str | None:
        # Follow deleted parents up to the first surviving (or missing)
        # ancestor, compressing the path so repeated lookups are cheap.
        path: list[str] = []
        current: str | None = start
        while current and current in to_delete:
            path.append(current)
            current = deleted_parent.get(current)
            if current is None:
                break
        for item in path:
            deleted_parent[item] = current
        return current

    for message in messages.values():
        parent_uuid = message.get("parentUuid")
        if parent_uuid and parent_uuid in to_delete:
            message["parentUuid"] = resolve(parent_uuid)
def recover_orphaned_parallel_tool_results(
    messages: dict[str, dict[str, Any]],
    chain: list[dict[str, Any]],
    seen: set[str],
) -> list[dict[str, Any]]:
    """Splice sibling assistant messages and their tool results back into a chain.

    Assistant entries sharing the same ``message.id`` form one logical group
    (parallel branches of a single API response). For each group anchored in
    *chain*, any group member — or user tool_result child of a member — that
    the parent-link walk missed (not in *seen*) is inserted after the anchor,
    ordered by timestamp. *seen* is updated in place; a new list is returned.
    """
    chain_assistants = [
        message
        for message in chain
        if message.get("type") == "assistant"
        and isinstance((message.get("message") or {}).get("id"), str)
    ]
    if not chain_assistants:
        return chain
    # Anchor for each group: the in-chain assistant carrying that message id.
    anchor_by_message_id: dict[str, dict[str, Any]] = {}
    for assistant in chain_assistants:
        message_id = (assistant.get("message") or {}).get("id")
        if isinstance(message_id, str):
            anchor_by_message_id[message_id] = assistant
    # Index the whole transcript: assistants grouped by API message id, and
    # user tool_result messages grouped by their parent assistant uuid.
    siblings_by_message_id: dict[str, list[dict[str, Any]]] = defaultdict(list)
    tool_results_by_assistant: dict[str, list[dict[str, Any]]] = defaultdict(
        list
    )
    for message in messages.values():
        if message.get("type") == "assistant":
            message_id = (message.get("message") or {}).get("id")
            if isinstance(message_id, str):
                siblings_by_message_id[message_id].append(message)
        elif (
            message.get("type") == "user"
            and isinstance(message.get("parentUuid"), str)
            and has_tool_result_block(message)
        ):
            tool_results_by_assistant[message["parentUuid"]].append(message)
    processed_groups: set[str] = set()
    inserts: dict[str, list[dict[str, Any]]] = {}
    for assistant in chain_assistants:
        message_id = (assistant.get("message") or {}).get("id")
        if not isinstance(message_id, str) or message_id in processed_groups:
            continue
        processed_groups.add(message_id)
        group = siblings_by_message_id.get(message_id) or [assistant]
        orphaned_siblings = [
            member for member in group if member["uuid"] not in seen
        ]
        orphaned_tool_results: list[dict[str, Any]] = []
        for member in group:
            for tool_result in tool_results_by_assistant.get(
                member["uuid"], []
            ):
                if tool_result["uuid"] not in seen:
                    orphaned_tool_results.append(tool_result)
        if not orphaned_siblings and not orphaned_tool_results:
            continue
        # Siblings first, then their tool results, each in timestamp order.
        recovered = sort_by_timestamp(orphaned_siblings) + sort_by_timestamp(
            orphaned_tool_results
        )
        for item in recovered:
            seen.add(item["uuid"])
        anchor = anchor_by_message_id[message_id]
        inserts[anchor["uuid"]] = recovered
    if not inserts:
        return chain
    # Rebuild the chain with recovered messages right after their anchors.
    rebuilt: list[dict[str, Any]] = []
    for message in chain:
        rebuilt.append(message)
        rebuilt.extend(inserts.get(message["uuid"], []))
    return rebuilt
def build_conversation_chain(
    messages: dict[str, dict[str, Any]],
    leaf_message: dict[str, Any],
) -> list[dict[str, Any]]:
    """Walk parent links from a leaf back to the root and return the chain
    oldest-first, with orphaned parallel tool results spliced back in."""
    visited: set[str] = set()
    chain: list[dict[str, Any]] = []
    node: dict[str, Any] | None = leaf_message
    while node is not None:
        uuid_text = node["uuid"]
        if uuid_text in visited:
            # Defensive cycle guard: stop if a parent loop exists.
            break
        visited.add(uuid_text)
        chain.append(node)
        parent_uuid = node.get("parentUuid")
        node = messages.get(parent_uuid) if parent_uuid else None
    chain.reverse()
    return recover_orphaned_parallel_tool_results(messages, chain, visited)
def load_transcript_file(file_path: Path) -> dict[str, Any]:
    """Parse one append-only JSONL transcript file into lookup tables.

    Returns a dict with the message graph (``messages``, keyed by uuid,
    with legacy progress entries spliced out of parent chains), per-leaf
    ``summaries``, per-session metadata maps (titles, tags, agent info,
    modes, PR links), and the set of ``leaf_uuids`` that terminate
    conversation chains. Unreadable files and malformed lines are
    skipped silently.
    """
    messages: dict[str, dict[str, Any]] = {}
    summaries: dict[str, str] = {}
    custom_titles: dict[str, str] = {}
    tags: dict[str, str] = {}
    agent_names: dict[str, str] = {}
    agent_colors: dict[str, str] = {}
    agent_settings: dict[str, str] = {}
    pr_numbers: dict[str, int] = {}
    pr_urls: dict[str, str] = {}
    pr_repositories: dict[str, str] = {}
    modes: dict[str, str] = {}
    # Maps each legacy "progress" entry's uuid to its nearest non-progress
    # ancestor uuid (or None), so real messages can be re-linked past them.
    progress_bridge: dict[str, str | None] = {}
    try:
        raw_lines = file_path.read_text(
            encoding="utf-8", errors="replace"
        ).splitlines()
    except OSError:
        # A missing or unreadable file degrades to an empty transcript.
        raw_lines = []
    entries: list[dict[str, Any]] = []
    for line in raw_lines:
        stripped = line.strip()
        if not stripped:
            continue
        try:
            parsed = json.loads(stripped)
        except json.JSONDecodeError:
            # Tolerate truncated/corrupt lines in the append-only log.
            continue
        if isinstance(parsed, dict):
            entries.append(parsed)
    for entry in entries:
        if is_legacy_progress_entry(entry):
            # Chain consecutive progress entries through to the first
            # non-progress ancestor, so a run of them collapses to one hop.
            parent_uuid = entry.get("parentUuid")
            if parent_uuid and parent_uuid in progress_bridge:
                progress_bridge[entry["uuid"]] = progress_bridge[parent_uuid]
            else:
                progress_bridge[entry["uuid"]] = parent_uuid
            continue
        if is_transcript_message(entry):
            parent_uuid = entry.get("parentUuid")
            if parent_uuid in progress_bridge:
                # Splice progress entries out of this message's chain.
                entry["parentUuid"] = progress_bridge[parent_uuid]
            messages[entry["uuid"]] = entry
        # Sidecar entry types attach metadata keyed by leaf uuid or
        # session id; unknown types fall through and are ignored.
        elif entry.get("type") == "summary" and isinstance(
            entry.get("leafUuid"), str
        ):
            summaries[entry["leafUuid"]] = entry.get("summary") or ""
        elif entry.get("type") == "custom-title" and isinstance(
            entry.get("sessionId"), str
        ):
            custom_titles[entry["sessionId"]] = entry.get("customTitle") or ""
        elif entry.get("type") == "tag" and isinstance(
            entry.get("sessionId"), str
        ):
            tags[entry["sessionId"]] = entry.get("tag") or ""
        elif entry.get("type") == "agent-name" and isinstance(
            entry.get("sessionId"), str
        ):
            agent_names[entry["sessionId"]] = entry.get("agentName") or ""
        elif entry.get("type") == "agent-color" and isinstance(
            entry.get("sessionId"), str
        ):
            agent_colors[entry["sessionId"]] = entry.get("agentColor") or ""
        elif entry.get("type") == "agent-setting" and isinstance(
            entry.get("sessionId"), str
        ):
            agent_settings[entry["sessionId"]] = (
                entry.get("agentSetting") or ""
            )
        elif entry.get("type") == "mode" and isinstance(
            entry.get("sessionId"), str
        ):
            modes[entry["sessionId"]] = entry.get("mode") or ""
        elif entry.get("type") == "pr-link" and isinstance(
            entry.get("sessionId"), str
        ):
            # NOTE(review): assumes prNumber is numeric or absent — a
            # non-numeric string would raise ValueError here; confirm the
            # writer never emits one.
            pr_numbers[entry["sessionId"]] = int(entry.get("prNumber") or 0)
            pr_urls[entry["sessionId"]] = entry.get("prUrl") or ""
            pr_repositories[entry["sessionId"]] = (
                entry.get("prRepository") or ""
            )
    # Post-processing passes (defined elsewhere) mutate the message map
    # in place: re-linking preserved segments and removing snipped spans.
    apply_preserved_segment_relinks(messages)
    apply_snip_removals(messages)
    all_messages = list(messages.values())
    parent_uuids = {
        message.get("parentUuid")
        for message in all_messages
        if message.get("parentUuid")
    }
    # Terminal = never referenced as a parent by any other message.
    terminal_messages = [
        message
        for message in all_messages
        if message["uuid"] not in parent_uuids
    ]
    leaf_uuids: set[str] = set()
    for terminal in terminal_messages:
        # Walk upward (cycle-safe) until the first user/assistant message;
        # that node is the chain's conversational leaf.
        seen: set[str] = set()
        current: dict[str, Any] | None = terminal
        while current:
            uuid_text = current["uuid"]
            if uuid_text in seen:
                break
            seen.add(uuid_text)
            if current.get("type") in {"user", "assistant"}:
                leaf_uuids.add(uuid_text)
                break
            parent_uuid = current.get("parentUuid")
            current = messages.get(parent_uuid) if parent_uuid else None
    return {
        "messages": messages,
        "summaries": summaries,
        "custom_titles": custom_titles,
        "tags": tags,
        "agent_names": agent_names,
        "agent_colors": agent_colors,
        "agent_settings": agent_settings,
        "pr_numbers": pr_numbers,
        "pr_urls": pr_urls,
        "pr_repositories": pr_repositories,
        "modes": modes,
        "leaf_uuids": leaf_uuids,
    }
def load_all_logs_from_session_file(file_path: Path) -> list[SessionLog]:
    """Expand one transcript file into a SessionLog per leaf chain.

    Each leaf uuid from the parsed transcript yields one log: the full
    parent chain up to the root, plus any children hanging off the leaf
    appended in timestamp order. Session-scoped metadata (titles, tags,
    agent info, PR links, modes) is resolved via the leaf's session id.
    """
    data = load_transcript_file(file_path)
    messages: dict[str, dict[str, Any]] = data["messages"]
    if not messages:
        return []
    leaf_messages: list[dict[str, Any]] = []
    children_by_parent: dict[str, list[dict[str, Any]]] = defaultdict(list)
    for message in messages.values():
        if message["uuid"] in data["leaf_uuids"]:
            leaf_messages.append(message)
        elif isinstance(message.get("parentUuid"), str):
            children_by_parent[message["parentUuid"]].append(message)
    logs: list[SessionLog] = []
    for leaf_message in leaf_messages:
        chain = build_conversation_chain(messages, leaf_message)
        if not chain:
            continue
        # Non-leaf children of the leaf (presumably trailing entries that
        # are not themselves conversational leaves) follow the chain.
        trailing_messages = sort_by_timestamp(
            children_by_parent.get(leaf_message["uuid"], [])
        )
        if trailing_messages:
            chain.extend(trailing_messages)
        first_message = chain[0]
        # Fall back to the file stem (the session uuid) when the leaf
        # carries no explicit sessionId.
        session_id = str(leaf_message.get("sessionId") or file_path.stem)
        logs.append(
            SessionLog(
                date=str(leaf_message.get("timestamp") or ""),
                messages=chain,
                full_path=str(file_path),
                created=parse_iso_timestamp(first_message.get("timestamp")),
                modified=parse_iso_timestamp(leaf_message.get("timestamp")),
                first_prompt=extract_first_prompt(chain),
                message_count=count_visible_messages(chain),
                is_sidechain=bool(first_message.get("isSidechain")),
                session_id=session_id,
                leaf_uuid=leaf_message["uuid"],
                summary=data["summaries"].get(leaf_message["uuid"]),
                custom_title=data["custom_titles"].get(session_id),
                tag=data["tags"].get(session_id),
                agent_name=data["agent_names"].get(session_id),
                agent_color=data["agent_colors"].get(session_id),
                agent_setting=data["agent_settings"].get(session_id),
                mode=data["modes"].get(session_id),
                pr_number=data["pr_numbers"].get(session_id),
                pr_url=data["pr_urls"].get(session_id),
                pr_repository=data["pr_repositories"].get(session_id),
                git_branch=leaf_message.get("gitBranch"),
                project_path=str(first_message.get("cwd") or ""),
            )
        )
    return logs
def classify_tool_error(content: str) -> str:
    """Bucket a tool_result error message into a coarse category.

    Rules are ordered by specificity and the first matching substring
    wins, so e.g. an "exit code" failure is never misfiled as an edit
    failure. Anything unrecognized falls through to "Other".
    """
    text = content.lower()
    rules: tuple[tuple[tuple[str, ...], str], ...] = (
        (("exit code",), "Command Failed"),
        (("rejected", "doesn't want"), "User Rejected"),
        (("string to replace not found", "no changes"), "Edit Failed"),
        (("modified since read",), "File Changed"),
        (("exceeds maximum", "too large"), "File Too Large"),
        (("file not found", "does not exist"), "File Not Found"),
    )
    for needles, category in rules:
        if any(needle in text for needle in needles):
            return category
    return "Other"
def language_from_path(file_path: str) -> str | None:
    """Map a file path to a language name via its extension, or ``None``."""
    extension = Path(file_path).suffix.lower()
    return EXTENSION_TO_LANGUAGE.get(extension)
def diff_line_counts(old: str, new: str) -> tuple[int, int]:
    """Return ``(lines_added, lines_removed)`` between two text blobs.

    Diffs at line granularity with difflib: insertions and the new side
    of replacements count as added; deletions and the old side of
    replacements count as removed.
    """
    matcher = difflib.SequenceMatcher(a=old.splitlines(), b=new.splitlines())
    added = 0
    removed = 0
    for opcode, old_lo, old_hi, new_lo, new_hi in matcher.get_opcodes():
        if opcode in {"replace", "insert"}:
            added += new_hi - new_lo
        if opcode in {"replace", "delete"}:
            removed += old_hi - old_lo
    return added, removed
def is_human_user_message(message: dict[str, Any]) -> bool:
    """True when *message* is a user turn carrying visible text.

    String content counts only when non-blank; list content counts when
    at least one block is a ``text`` block (tool_result-only turns are
    synthetic, not human input). Any other content shape is rejected.
    """
    if message.get("type") != "user":
        return False
    content = (message.get("message") or {}).get("content")
    if isinstance(content, list):
        for block in content:
            if isinstance(block, dict) and block.get("type") == "text":
                return True
        return False
    if isinstance(content, str):
        return bool(content.strip())
    return False
def extract_tool_stats(log: SessionLog) -> dict[str, Any]:
    """Scan every message in *log* and accumulate deterministic usage stats.

    Returns a dict of counters covering tool usage, languages touched,
    git activity, token totals, user response times, tool errors (by
    category), edit volume, and the local hours at which the user wrote.
    """
    tool_counts: Counter[str] = Counter()
    languages: Counter[str] = Counter()
    git_commits = 0
    git_pushes = 0
    input_tokens = 0
    output_tokens = 0
    user_interruptions = 0
    user_response_times: list[float] = []
    tool_errors = 0
    tool_error_categories: Counter[str] = Counter()
    uses_task_agent = False
    uses_mcp = False
    uses_web_search = False
    uses_web_fetch = False
    lines_added = 0
    lines_removed = 0
    files_modified: set[str] = set()
    message_hours: list[int] = []
    user_message_timestamps: list[str] = []
    # Timestamp of the most recent assistant message, used to measure how
    # long the user took to respond to it.
    last_assistant_timestamp: str | None = None
    for message in log.messages:
        timestamp = message.get("timestamp")
        if message.get("type") == "assistant":
            if timestamp:
                last_assistant_timestamp = timestamp
            # Token usage is reported per assistant message.
            usage = (message.get("message") or {}).get("usage") or {}
            input_tokens += int(usage.get("input_tokens") or 0)
            output_tokens += int(usage.get("output_tokens") or 0)
            content = (message.get("message") or {}).get("content")
            if isinstance(content, list):
                for block in content:
                    if (
                        not isinstance(block, dict)
                        or block.get("type") != "tool_use"
                    ):
                        continue
                    tool_name = str(block.get("name") or "")
                    tool_counts[tool_name] += 1
                    if tool_name in AGENT_TOOL_NAMES:
                        uses_task_agent = True
                    # MCP tools are namespaced with an "mcp__" prefix.
                    if tool_name.startswith("mcp__"):
                        uses_mcp = True
                    if tool_name == "WebSearch":
                        uses_web_search = True
                    if tool_name == "WebFetch":
                        uses_web_fetch = True
                    tool_input = block.get("input") or {}
                    if isinstance(tool_input, dict):
                        file_path = str(tool_input.get("file_path") or "")
                        if file_path:
                            language = language_from_path(file_path)
                            if language:
                                languages[language] += 1
                            # Only Edit/Write actually change the file.
                            if tool_name in {"Edit", "Write"}:
                                files_modified.add(file_path)
                        if tool_name == "Edit":
                            # Line delta from the old/new string pair.
                            added, removed = diff_line_counts(
                                str(tool_input.get("old_string") or ""),
                                str(tool_input.get("new_string") or ""),
                            )
                            lines_added += added
                            lines_removed += removed
                        if tool_name == "Write":
                            content_text = str(tool_input.get("content") or "")
                            if content_text:
                                # NOTE(review): newline count + 1 counts one
                                # extra line when content ends with "\n".
                                lines_added += content_text.count("\n") + 1
                        # Substring match: counts any command that embeds
                        # "git commit"/"git push", including compound lines.
                        command = str(tool_input.get("command") or "")
                        if "git commit" in command:
                            git_commits += 1
                        if "git push" in command:
                            git_pushes += 1
        if message.get("type") == "user":
            if is_human_user_message(message) and timestamp:
                # Local hour-of-day for the activity histogram.
                parsed = parse_iso_timestamp(timestamp).astimezone()
                message_hours.append(parsed.hour)
                user_message_timestamps.append(timestamp)
                if last_assistant_timestamp:
                    assistant_time = parse_iso_timestamp(
                        last_assistant_timestamp
                    )
                    response_time = (
                        parse_iso_timestamp(timestamp) - assistant_time
                    ).total_seconds()
                    # Keep 2s..1h only — presumably to drop instant tool
                    # echoes and long idle gaps; confirm thresholds.
                    if 2 < response_time < 3600:
                        user_response_times.append(response_time)
            content = (message.get("message") or {}).get("content")
            if isinstance(content, list):
                for block in content:
                    if (
                        not isinstance(block, dict)
                        or block.get("type") != "tool_result"
                    ):
                        continue
                    if block.get("is_error"):
                        tool_errors += 1
                        tool_error_categories[
                            classify_tool_error(
                                str(block.get("content") or "")
                            )
                        ] += 1
            # Interruption marker injected by the client into user text.
            user_text = extract_user_message_text(message)
            if "[Request interrupted by user" in user_text:
                user_interruptions += 1
    return {
        "tool_counts": dict(tool_counts),
        "languages": dict(languages),
        "git_commits": git_commits,
        "git_pushes": git_pushes,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "user_interruptions": user_interruptions,
        "user_response_times": user_response_times,
        "tool_errors": tool_errors,
        "tool_error_categories": dict(tool_error_categories),
        "uses_task_agent": uses_task_agent,
        "uses_mcp": uses_mcp,
        "uses_web_search": uses_web_search,
        "uses_web_fetch": uses_web_fetch,
        "lines_added": lines_added,
        "lines_removed": lines_removed,
        "files_modified": len(files_modified),
        "message_hours": message_hours,
        "user_message_timestamps": user_message_timestamps,
    }
def log_to_session_meta(log: SessionLog) -> SessionMeta:
    """Condense one reconstructed session log into its metadata record."""
    stats = extract_tool_stats(log)
    human_turns = sum(
        1 for message in log.messages if is_human_user_message(message)
    )
    assistant_turns = sum(
        1 for message in log.messages if message.get("type") == "assistant"
    )
    minutes = round((log.modified - log.created).total_seconds() / 60)
    return SessionMeta(
        session_id=log.session_id,
        project_path=log.project_path,
        start_time=log.created.isoformat(),
        duration_minutes=minutes,
        user_message_count=human_turns,
        assistant_message_count=assistant_turns,
        tool_counts=stats["tool_counts"],
        languages=stats["languages"],
        git_commits=stats["git_commits"],
        git_pushes=stats["git_pushes"],
        input_tokens=stats["input_tokens"],
        output_tokens=stats["output_tokens"],
        first_prompt=log.first_prompt,
        summary=log.summary,
        user_interruptions=stats["user_interruptions"],
        user_response_times=stats["user_response_times"],
        tool_errors=stats["tool_errors"],
        tool_error_categories=stats["tool_error_categories"],
        uses_task_agent=stats["uses_task_agent"],
        uses_mcp=stats["uses_mcp"],
        uses_web_search=stats["uses_web_search"],
        uses_web_fetch=stats["uses_web_fetch"],
        lines_added=stats["lines_added"],
        lines_removed=stats["lines_removed"],
        files_modified=stats["files_modified"],
        message_hours=stats["message_hours"],
        user_message_timestamps=stats["user_message_timestamps"],
    )
def load_cached_session_meta(
    cache_dir: Path, session_id: str
) -> SessionMeta | None:
    """Read a previously cached SessionMeta; any failure is a cache miss.

    A missing file, invalid JSON, a non-object payload, and a payload
    whose fields no longer match the dataclass (TypeError from the
    loader) all return ``None``.
    """
    cache_file = cache_dir / "session-meta" / f"{session_id}.json"
    try:
        raw = json.loads(cache_file.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        return None
    if not isinstance(raw, dict):
        return None
    try:
        return SessionMeta.from_dict(raw)
    except TypeError:
        return None
def save_session_meta(cache_dir: Path, meta: SessionMeta) -> None:
    """Persist one SessionMeta as pretty-printed JSON in the cache tree."""
    out_dir = cache_dir / "session-meta"
    ensure_dir(out_dir)
    (out_dir / f"{meta.session_id}.json").write_text(
        json.dumps(asdict(meta), indent=2), encoding="utf-8"
    )
def load_cached_facets(
    cache_dir: Path, session_id: str
) -> SessionFacets | None:
    """Read previously cached SessionFacets; any failure is a cache miss.

    A missing file, invalid JSON, a non-object payload, and a stale
    schema (TypeError from the dataclass loader) all return ``None``.
    """
    facet_file = cache_dir / "facets" / f"{session_id}.json"
    try:
        raw = json.loads(facet_file.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        return None
    if not isinstance(raw, dict):
        return None
    try:
        return SessionFacets.from_dict(raw)
    except TypeError:
        return None
def save_facets(cache_dir: Path, facets: SessionFacets) -> None:
    """Persist one SessionFacets record as pretty-printed JSON."""
    out_dir = cache_dir / "facets"
    ensure_dir(out_dir)
    (out_dir / f"{facets.session_id}.json").write_text(
        json.dumps(asdict(facets), indent=2), encoding="utf-8"
    )
def scan_all_sessions(projects_dir: Path) -> list[dict[str, Any]]:
    """List every transcript file under *projects_dir*, newest first.

    Only ``<uuid>.jsonl`` files directly inside project directories
    qualify. Each entry carries the session id, path, mtime, and size;
    files whose stat fails (e.g. deleted mid-scan) are skipped.
    """
    if not projects_dir.exists():
        return []
    found: list[dict[str, Any]] = []
    for project_dir in projects_dir.iterdir():
        if not project_dir.is_dir():
            continue
        for candidate in project_dir.iterdir():
            if not candidate.is_file() or candidate.suffix != ".jsonl":
                continue
            if not validate_uuid(candidate.stem):
                continue
            try:
                file_stat = candidate.stat()
            except OSError:
                continue
            found.append(
                {
                    "session_id": candidate.stem,
                    "path": candidate,
                    "mtime": file_stat.st_mtime,
                    "size": file_stat.st_size,
                }
            )
    found.sort(key=lambda entry: entry["mtime"], reverse=True)
    return found
def is_meta_session(log: SessionLog) -> bool:
    """Detect internal facet-extraction sessions by their prompt markers.

    Only the first five messages are inspected; meta sessions announce
    themselves immediately in their opening user prompt.
    """
    markers = ("RESPOND WITH ONLY A VALID JSON OBJECT", "record_facets")
    for message in log.messages[:5]:
        if message.get("type") != "user":
            continue
        text = extract_user_message_text(message)
        if any(marker in text for marker in markers):
            return True
    return False
def choose_best_log(
    logs: list[SessionLog],
    scope: ProjectScope,
) -> SessionLog | None:
    """Pick the richest in-scope log: most human turns, then longest run.

    Returns ``None`` when no log matches the project scope.
    """
    candidates = [
        log for log in logs if matches_project_scope(log.project_path, scope)
    ]
    if not candidates:
        return None

    def richness(log: SessionLog) -> tuple[int, int]:
        # Primary: number of real human messages; tiebreak: wall duration.
        human_turns = sum(
            1 for message in log.messages if is_human_user_message(message)
        )
        return human_turns, int((log.modified - log.created).total_seconds())

    return max(candidates, key=richness)
def is_substantive_session(meta: SessionMeta) -> bool:
    """A session is substantive once it has at least two human messages
    and ran for at least one minute."""
    has_dialogue = meta.user_message_count >= 2
    has_duration = meta.duration_minutes >= 1
    return has_dialogue and has_duration
def normalize_instruction(text: str) -> str:
    """Canonicalize an instruction for dedup: lowercase, collapse all
    internal whitespace to single spaces, trim spaces and periods."""
    collapsed = " ".join(text.lower().split())
    return collapsed.strip(" .")
def split_candidate_sentences(text: str) -> list[str]:
    """Split free text into trimmed, sentence-ish fragments.

    Breaks on newlines and on whitespace following sentence-ending
    punctuation; blank fragments are dropped.
    """
    fragments = re.split(r"[\n\r]+|(?<=[.!?])\s+", text)
    sentences: list[str] = []
    for fragment in fragments:
        if fragment and fragment.strip():
            sentences.append(fragment.strip())
    return sentences
def extract_user_texts(log: SessionLog) -> list[str]:
    """Collect the non-empty text of every human user turn, in order."""
    return [
        text
        for message in log.messages
        if is_human_user_message(message)
        and (text := extract_user_message_text(message))
    ]
def extract_user_instructions(log: SessionLog) -> list[str]:
    """Pull deduplicated, instruction-like sentences from user turns.

    A sentence qualifies when it is 6-180 characters long and matches a
    repeated-instruction pattern. Matches are truncated to 140 chars,
    deduped on their normalized form, and the result capped at ten.
    """
    collected: list[str] = []
    seen_keys: set[str] = set()
    for text in extract_user_texts(log):
        for sentence in split_candidate_sentences(text):
            if len(sentence) < 6 or len(sentence) > 180:
                continue
            matched = any(
                pattern.search(sentence)
                for pattern in REPEATED_INSTRUCTION_PATTERNS
            )
            if not matched:
                continue
            shortened = truncate(sentence, 140)
            key = normalize_instruction(shortened)
            if key not in seen_keys:
                seen_keys.add(key)
                collected.append(shortened)
    return collected[:10]
def detect_goal_categories(
    log: SessionLog, meta: SessionMeta
) -> dict[str, int]:
    """Count which goal categories the user's messages match.

    Trivially short sessions are classified as warmup. When no text
    pattern matches, fall back to tool evidence: edits imply feature
    work, errors imply debugging, read-heavy use implies comprehension.
    """
    if meta.user_message_count < 2 or meta.duration_minutes < 1:
        return {"warmup_minimal": 1}
    counts: Counter[str] = Counter()
    for text in extract_user_texts(log):
        for category, patterns in GOAL_PATTERNS.items():
            if any(pattern.search(text) for pattern in patterns):
                counts[category] += 1
    if counts:
        return dict(counts)
    search_tool_uses = sum(
        meta.tool_counts.get(name, 0) for name in ("Read", "Grep", "Glob")
    )
    if meta.files_modified > 0 or meta.lines_added > 0:
        counts["implement_feature"] += 1
    elif meta.tool_errors > 0:
        counts["debug_investigate"] += 1
    elif search_tool_uses > 0:
        counts["understand_codebase"] += 1
    else:
        counts["warmup_minimal"] += 1
    return dict(counts)
def detect_satisfaction(user_texts: list[str]) -> dict[str, int]:
    """Classify each user message (past the opener) by sentiment cues.

    The first message is skipped when follow-ups exist, since the opener
    carries the task rather than a reaction. Each message lands in at
    most one bucket, checked in priority order: strong negative, mild
    negative, strong positive, mild positive, then bare continuation.
    Falls back to a single ``unsure`` when nothing matched.
    """
    counts: Counter[str] = Counter()
    candidates = user_texts if len(user_texts) <= 1 else user_texts[1:]
    buckets = (
        ("frustrated", NEGATIVE_STRONG_PATTERNS),
        ("dissatisfied", NEGATIVE_MILD_PATTERNS),
        ("happy", POSITIVE_STRONG_PATTERNS),
        ("satisfied", POSITIVE_MILD_PATTERNS),
        ("likely_satisfied", CONTINUATION_PATTERNS),
    )
    for text in candidates:
        for label, patterns in buckets:
            if any(pattern.search(text) for pattern in patterns):
                counts[label] += 1
                break
    if not counts:
        counts["unsure"] = 1
    return dict(counts)
def detect_friction(
    meta: SessionMeta, user_texts: list[str]
) -> tuple[dict[str, int], str]:
    """Count friction signals for a session and pick a one-line detail.

    Combines hard evidence (tool errors, rejections, interruptions) with
    soft text cues ("wrong file", "too much", negative sentiment). The
    detail sentence describes the most frequent category, or is empty
    when the session shows no friction signals at all.
    """
    counts: Counter[str] = Counter()
    all_text = "\n".join(user_texts)
    if meta.tool_errors > 0:
        counts["tool_failed"] += meta.tool_errors
    if meta.tool_error_categories.get("User Rejected", 0) > 0:
        counts["user_rejected_action"] += meta.tool_error_categories[
            "User Rejected"
        ]
    if meta.user_interruptions > 0:
        counts["user_stopped_early"] += meta.user_interruptions
    # Environment-level failures, as opposed to reasoning failures.
    if (
        meta.tool_error_categories.get("Command Failed", 0) > 0
        or meta.tool_error_categories.get("File Not Found", 0) > 0
    ):
        counts["claude_got_blocked"] += 1
    if any(
        "wrong file" in text.lower() or "wrong place" in text.lower()
        for text in user_texts
    ):
        counts["wrong_file_or_location"] += 1
    if any(
        "too much" in text.lower() or "overkill" in text.lower()
        for text in user_texts
    ):
        counts["excessive_changes"] += 1
    # Negative sentiment is blamed on the code when files changed,
    # otherwise on the overall approach.
    if any(
        pattern.search(all_text)
        for pattern in NEGATIVE_STRONG_PATTERNS + NEGATIVE_MILD_PATTERNS
    ):
        if meta.files_modified > 0:
            counts["buggy_code"] += 1
        else:
            counts["wrong_approach"] += 1
    # Median response gap above five minutes reads as slow/verbose turns.
    if meta.user_response_times:
        median = statistics.median(meta.user_response_times)
        if median > 300:
            counts["slow_or_verbose"] += 1
    if (
        not counts
        and meta.tool_errors == 0
        and meta.files_modified == 0
        and meta.user_interruptions == 0
    ):
        detail = ""
    else:
        top_category = (
            counts.most_common(1)[0][0] if counts else "wrong_approach"
        )
        detail = {
            "tool_failed": "Tool runs failed and forced retries before the work could move forward.",
            "user_rejected_action": "Claude proposed actions that the user did not want to approve.",
            "user_stopped_early": "The run was interrupted before the original plan fully landed.",
            "claude_got_blocked": "Environment-level failures blocked progress more than reasoning quality did.",
            "buggy_code": "The first patch did not hold up under validation and needed another pass.",
            "wrong_approach": "The initial solution path was not the cheapest route to the goal.",
            "slow_or_verbose": "The session spent too much time in intermediate output rather than forward progress.",
            "wrong_file_or_location": "The implementation drifted toward the wrong file or layer.",
            "excessive_changes": "The patch scope expanded beyond what the task really needed.",
        }.get(top_category, "There was measurable friction during execution.")
    return dict(counts), detail
def infer_outcome(
    meta: SessionMeta,
    satisfaction: dict[str, int],
    friction: dict[str, int],
) -> str:
    """Grade how well the session's goal landed.

    Signals are checked in order: strong negative sentiment, then early
    abandonment, then positive sentiment combined with concrete output,
    then any activity at all; anything else is unclear.
    """
    positive = (
        satisfaction.get("happy", 0)
        + satisfaction.get("satisfied", 0)
        + satisfaction.get("likely_satisfied", 0)
    )
    negative = satisfaction.get("frustrated", 0) + satisfaction.get(
        "dissatisfied", 0
    )
    if negative >= 2 and not positive:
        return "not_achieved"
    abandoned = (
        meta.user_interruptions > 0
        and meta.files_modified == 0
        and not positive
    )
    if abandoned:
        return "not_achieved"
    if positive and meta.files_modified > 0 and not friction:
        return "fully_achieved"
    if positive and (meta.files_modified > 0 or meta.git_commits > 0):
        return "mostly_achieved"
    if meta.files_modified > 0 or meta.tool_counts:
        if negative > positive:
            return "partially_achieved"
        return "mostly_achieved"
    return "unclear_from_transcript"
def infer_helpfulness(outcome: str, satisfaction: dict[str, int]) -> str:
    """Map outcome (plus explicit delight cues) to a helpfulness tier."""
    if outcome == "fully_achieved":
        if satisfaction.get("happy", 0) > 0:
            return "essential"
        return "very_helpful"
    if outcome == "mostly_achieved":
        if satisfaction.get("satisfied", 0) > 0:
            return "very_helpful"
        return "moderately_helpful"
    # Remaining outcomes map directly; unknown values read as moderate.
    tiers = {
        "partially_achieved": "slightly_helpful",
        "not_achieved": "unhelpful",
    }
    return tiers.get(outcome, "moderately_helpful")
def infer_session_type(
    goal_categories: dict[str, int], meta: SessionMeta
) -> str:
    """Bucket the session's overall shape from goals and message volume."""
    real_goals = sum(
        1
        for goal, count in goal_categories.items()
        if count > 0 and goal != "warmup_minimal"
    )
    if meta.user_message_count <= 1:
        return "quick_question"
    if real_goals >= 2 and meta.user_message_count >= 4:
        return "multi_task"
    if meta.user_message_count >= 5 or meta.user_interruptions > 0:
        return "iterative_refinement"
    exploring = (
        goal_categories.get("understand_codebase", 0) > 0
        or goal_categories.get("analyze_data", 0) > 0
    )
    if exploring and meta.files_modified == 0:
        return "exploration"
    return "single_task"
def infer_primary_success(
    goal_categories: dict[str, int], meta: SessionMeta, outcome: str
) -> str:
    """Name the session's standout strength, or ``none``.

    Only sessions that (mostly) achieved their goal earn a label; the
    checks run from most to least specific evidence.
    """
    if outcome not in {"fully_achieved", "mostly_achieved"}:
        return "none"
    search_uses = sum(
        meta.tool_counts.get(name, 0) for name in ("Read", "Grep", "Glob")
    )
    if goal_categories.get("debug_investigate", 0) > 0 and search_uses > 0:
        return "good_debugging"
    touched_lines = meta.lines_added + meta.lines_removed
    if meta.files_modified >= 3 or touched_lines >= 60:
        return "multi_file_changes"
    if meta.files_modified > 0:
        return "correct_code_edits"
    if search_uses >= 3:
        return "fast_accurate_search"
    if meta.assistant_message_count > meta.user_message_count:
        return "good_explanations"
    if meta.uses_task_agent:
        return "proactive_help"
    return "none"
def heuristic_extract_facets(
    log: SessionLog, meta: SessionMeta
) -> SessionFacets:
    """Run the full heuristic facet pipeline for one session.

    Derives goals, sentiment, friction, outcome, helpfulness, session
    type, standout success, and a one-line summary purely from
    deterministic signals — no model calls involved.
    """
    user_texts = extract_user_texts(log)
    goals = detect_goal_categories(log, meta)
    sentiment = detect_satisfaction(user_texts)
    friction, friction_note = detect_friction(meta, user_texts)
    outcome = infer_outcome(meta, sentiment, friction)
    goal_line = truncate(
        meta.first_prompt or "Investigate the transcripted task", 120
    )
    return SessionFacets(
        session_id=meta.session_id,
        underlying_goal=goal_line,
        goal_categories=goals,
        outcome=outcome,
        user_satisfaction_counts=sentiment,
        claude_helpfulness=infer_helpfulness(outcome, sentiment),
        session_type=infer_session_type(goals, meta),
        friction_counts=friction,
        friction_detail=friction_note,
        primary_success=infer_primary_success(goals, meta, outcome),
        brief_summary=f"{goal_line} ({OUTCOME_VERBS.get(outcome, outcome)}).",
        user_instructions_to_claude=extract_user_instructions(log),
    )
def is_minimal_session(facet: SessionFacets | None) -> bool:
    """True when the facet's only active goal is ``warmup_minimal``."""
    if not facet:
        return False
    active_goals = [
        name for name, count in facet.goal_categories.items() if count > 0
    ]
    return active_goals == ["warmup_minimal"]
def detect_multi_clauding(sessions: list[SessionMeta]) -> dict[str, int]:
    """Detect interleaved use of multiple sessions ("multi-clauding").

    Flattens every human-message timestamp across sessions into one
    sorted stream, then flags cases where two consecutive messages of
    one session have a different session's message in between, within a
    30-minute window.

    Returns counts of overlap events (distinct session pairs), sessions
    involved, and user messages observed during overlaps (deduped at
    second resolution via the int() casts below).
    """
    overlap_window_seconds = 30 * 60
    all_messages: list[tuple[float, str]] = []
    for session in sessions:
        for timestamp in session.user_message_timestamps:
            all_messages.append(
                (
                    parse_iso_timestamp(timestamp).timestamp(),
                    session.session_id,
                )
            )
    all_messages.sort()
    # Most recent in-window index of each session's latest message.
    session_last_index: dict[str, int] = {}
    window_start = 0
    overlap_pairs: set[tuple[str, str]] = set()
    messages_during: set[tuple[int, str]] = set()
    for index, (timestamp, session_id) in enumerate(all_messages):
        # Evict stream entries older than the window; a session is
        # forgotten only when its most recent tracked message expires.
        while (
            window_start < index
            and timestamp - all_messages[window_start][0]
            > overlap_window_seconds
        ):
            expiring_session = all_messages[window_start][1]
            if session_last_index.get(expiring_session) == window_start:
                session_last_index.pop(expiring_session, None)
            window_start += 1
        previous_index = session_last_index.get(session_id)
        if previous_index is not None:
            # Scan the gap between this session's last message and now for
            # any other session's message — that is an interleaving.
            for between_index in range(previous_index + 1, index):
                between_session = all_messages[between_index][1]
                if between_session != session_id:
                    pair = tuple(sorted((session_id, between_session)))
                    overlap_pairs.add(pair)
                    messages_during.add(
                        (int(all_messages[previous_index][0]), session_id)
                    )
                    messages_during.add(
                        (int(all_messages[between_index][0]), between_session)
                    )
                    messages_during.add((int(timestamp), session_id))
                    # One interleaving is enough evidence for this gap.
                    break
        session_last_index[session_id] = index
    sessions_involved: set[str] = set()
    for first, second in overlap_pairs:
        sessions_involved.add(first)
        sessions_involved.add(second)
    return {
        "overlap_events": len(overlap_pairs),
        "sessions_involved": len(sessions_involved),
        "user_messages_during": len(messages_during),
    }
def aggregate_data(
    sessions: list[SessionMeta], facets: dict[str, SessionFacets]
) -> AggregatedData:
    """Roll all session metadata (and optional facets) into one payload.

    Sums scalar counters across sessions, merges per-session counter
    dicts, folds facet-level distributions (goals, outcomes,
    helpfulness, satisfaction, friction, success) for sessions that have
    facets, keeps up to 50 summary rows, and derives the date range,
    response-time stats, activity days, and multi-clauding signals.
    """
    result = AggregatedData(
        total_sessions=len(sessions),
        sessions_with_facets=len(facets),
        date_range={"start": "", "end": ""},
    )
    dates: list[str] = []
    all_response_times: list[float] = []
    all_message_hours: list[int] = []
    for session in sessions:
        dates.append(session.start_time)
        # Scalar totals.
        result.total_messages += session.user_message_count
        result.total_duration_hours += session.duration_minutes / 60
        result.total_input_tokens += session.input_tokens
        result.total_output_tokens += session.output_tokens
        result.git_commits += session.git_commits
        result.git_pushes += session.git_pushes
        result.total_interruptions += session.user_interruptions
        result.total_tool_errors += session.tool_errors
        result.total_lines_added += session.lines_added
        result.total_lines_removed += session.lines_removed
        result.total_files_modified += session.files_modified
        # Feature adoption is counted per session, not per use.
        result.sessions_using_task_agent += int(session.uses_task_agent)
        result.sessions_using_mcp += int(session.uses_mcp)
        result.sessions_using_web_search += int(session.uses_web_search)
        result.sessions_using_web_fetch += int(session.uses_web_fetch)
        all_response_times.extend(session.user_response_times)
        all_message_hours.extend(session.message_hours)
        # Merge per-session counter dicts into the aggregate maps.
        for key, count in session.tool_counts.items():
            result.tool_counts[key] = result.tool_counts.get(key, 0) + count
        for key, count in session.languages.items():
            result.languages[key] = result.languages.get(key, 0) + count
        for key, count in session.tool_error_categories.items():
            result.tool_error_categories[key] = (
                result.tool_error_categories.get(key, 0) + count
            )
        if session.project_path:
            result.projects[session.project_path] = (
                result.projects.get(session.project_path, 0) + 1
            )
        # Facet-derived distributions, only for sessions with facets.
        facet = facets.get(session.session_id)
        if facet:
            for key, count in facet.goal_categories.items():
                if count > 0:
                    result.goal_categories[key] = (
                        result.goal_categories.get(key, 0) + count
                    )
            result.outcomes[facet.outcome] = (
                result.outcomes.get(facet.outcome, 0) + 1
            )
            result.helpfulness[facet.claude_helpfulness] = (
                result.helpfulness.get(facet.claude_helpfulness, 0) + 1
            )
            result.session_types[facet.session_type] = (
                result.session_types.get(facet.session_type, 0) + 1
            )
            for key, count in facet.user_satisfaction_counts.items():
                if count > 0:
                    result.satisfaction[key] = (
                        result.satisfaction.get(key, 0) + count
                    )
            for key, count in facet.friction_counts.items():
                if count > 0:
                    result.friction[key] = result.friction.get(key, 0) + count
            if facet.primary_success != "none":
                result.success[facet.primary_success] = (
                    result.success.get(facet.primary_success, 0) + 1
                )
        # Keep a bounded sample of per-session summary rows for the report.
        if len(result.session_summaries) < 50:
            result.session_summaries.append(
                {
                    "id": session.session_id[:8],
                    "date": iso_date(session.start_time),
                    "summary": truncate(
                        session.summary or session.first_prompt, 100
                    ),
                    "goal": facet.underlying_goal if facet else "",
                }
            )
    if dates:
        # start_time strings sort chronologically (ISO-8601 ordering).
        dates.sort()
        result.date_range["start"] = iso_date(dates[0])
        result.date_range["end"] = iso_date(dates[-1])
    if all_response_times:
        result.user_response_times = all_response_times
        result.median_response_time = statistics.median(all_response_times)
        result.avg_response_time = sum(all_response_times) / len(
            all_response_times
        )
    if dates:
        unique_days = {iso_date(date) for date in dates}
        result.days_active = len(unique_days)
        if result.days_active:
            result.messages_per_day = round(
                result.total_messages / result.days_active, 1
            )
    result.message_hours = all_message_hours
    result.multi_clauding = detect_multi_clauding(sessions)
    return result
def top_entries(
data: dict[str, int], limit: int = 3, exclude: set[str] | None = None
) -> list[tuple[str, int]]:
exclude = exclude or set()
return [
(key, count)
for key, count in sorted(
data.items(), key=lambda item: item[1], reverse=True
)
if key not in exclude and count > 0
][:limit]
def project_areas_from_heuristics(
    data: AggregatedData,
) -> list[dict[str, Any]]:
    """Derive report "project areas" from goal counts.

    Falls back to the most-visited project paths only when no goal
    category (other than warmup) registered any sessions.
    """
    areas: list[dict[str, Any]] = [
        {
            "name": safe_title(goal),
            "session_count": count,
            "description": PROJECT_AREA_DESCRIPTIONS.get(
                goal,
                "You use Claude Code for this work often enough that it shows up as a recurring pattern in your sessions.",
            ),
        }
        for goal, count in top_entries(
            data.goal_categories, limit=5, exclude={"warmup_minimal"}
        )
    ]
    if areas or not data.projects:
        return areas
    for project_path, count in top_entries(data.projects, limit=3):
        areas.append(
            {
                "name": Path(project_path).name or project_path,
                "session_count": count,
                "description": "This project shows up repeatedly in your transcripts, so it is a meaningful part of your Claude Code workload.",
            }
        )
    return areas
def interaction_style_from_heuristics(data: AggregatedData) -> dict[str, str]:
    """Build the "interaction style" narrative from aggregate signals.

    Composes three fixed sentences — pacing, oversight, and tool
    surface — chosen by simple thresholds, plus a one-line key pattern
    derived alongside the pacing sentence.
    """
    dominant_session_type = (
        top_entries(data.session_types, limit=1)[0][0]
        if data.session_types
        else ""
    )
    # Sentence 1: pacing — fast iteration vs. chunked delegation.
    if (
        dominant_session_type == "iterative_refinement"
        or data.median_response_time < 90
    ):
        sentence_1 = "You tend to iterate quickly with Claude Code, tightening the ask as soon as you see an intermediate result."
        key_pattern = "Fast feedback loops shape how you use Claude."
    elif data.median_response_time > 240 and data.total_interruptions == 0:
        sentence_1 = "You usually hand Claude a chunk of work, let it run, and review after it has produced something substantial."
        key_pattern = "You prefer chunked execution over constant steering."
    else:
        sentence_1 = "You mix direct requests with short follow-up corrections rather than sticking to one rigid interaction pattern."
        key_pattern = "You balance direct asks with light steering."
    # Sentence 2: oversight — interruptions/rejections indicate steering.
    if (
        data.total_interruptions > 0
        or data.friction.get("user_rejected_action", 0) > 0
    ):
        sentence_2 = "You keep a close hand on execution and intervene quickly when the plan starts drifting."
    else:
        sentence_2 = "Once the task is framed well, you usually let Claude carry the middle of the execution rather than micromanaging each step."
    # Sentence 3: tool-surface breadth (agents/MCP vs. core tools).
    if data.sessions_using_task_agent > 0 or data.sessions_using_mcp > 0:
        sentence_3 = "You are willing to widen the tool surface when it clearly buys leverage, instead of staying confined to basic file edits."
    else:
        sentence_3 = "You mostly stay close to repo-local context and core edit/search tools, which keeps the workflow predictable."
    return {
        "narrative": f"{sentence_1}\n\n{sentence_2} {sentence_3}",
        "key_pattern": key_pattern,
    }
def what_works_from_heuristics(data: AggregatedData) -> dict[str, Any]:
    """Pick up to three workflow strengths evidenced by the aggregated data.

    Falls back to a single generic card when no signal fires.
    """
    # (signal, title, description) — evaluated in priority order.
    candidates: list[tuple[bool, str, str]] = [
        (
            data.total_files_modified > 0,
            "Ship Multi-file Changes",
            "You are using Claude for work that actually changes the codebase, not just for explanations. That is where the tool creates the most leverage.",
        ),
        (
            data.success.get("fast_accurate_search", 0) > 0
            or data.goal_categories.get("understand_codebase", 0) > 0,
            "Map Unknown Code Quickly",
            "You use Claude well when the work starts with orientation. Search-heavy sessions are turning into faster diagnoses and clearer edits.",
        ),
        (
            data.git_commits > 0,
            "Close The Loop",
            "You are not stopping at patches. Sessions often make it all the way into a reviewable or commit-ready state, which is the right bar for this workflow.",
        ),
        (
            data.sessions_using_task_agent > 0,
            "Delegate Focused Exploration",
            "When the repo is broad, you are already comfortable letting a narrower thread explore part of the problem space in parallel.",
        ),
    ]
    workflows = [
        {"title": title, "description": description}
        for matched, title, description in candidates
        if matched
    ]
    if not workflows:
        workflows = [
            {
                "title": "Keep Sessions Concrete",
                "description": "Your best sessions are the ones with a concrete target and a visible success condition. The transcripts suggest that clarity pays off immediately.",
            }
        ]
    return {
        "intro": "These are the patterns where Claude Code is already creating real leverage for you.",
        "impressive_workflows": workflows[:3],
    }
def friction_examples(
    category: str, facets: dict[str, SessionFacets]
) -> list[str]:
    """Collect up to two distinct example snippets for a friction category.

    Duplicates are filtered on the normalized form of the snippet.
    """
    examples: list[str] = []
    used: set[str] = set()
    for facet in facets.values():
        if len(examples) == 2:
            break
        if facet.friction_counts.get(category, 0) <= 0:
            continue
        snippet = facet.friction_detail or facet.brief_summary
        key = normalize_instruction(snippet)
        if not snippet or key in used:
            continue
        used.add(key)
        examples.append(truncate(snippet, 110))
    return examples
def friction_analysis_from_heuristics(
    data: AggregatedData, facets: dict[str, SessionFacets]
) -> dict[str, Any]:
    """Summarize the top friction categories with concrete example snippets.

    Emits a single placeholder category when no friction was measured.
    """
    fallback_description = "This pattern shows up often enough that it is worth changing the workflow around it."
    categories: list[dict[str, Any]] = [
        {
            "category": safe_title(key),
            "description": FRICTION_DESCRIPTIONS.get(key, fallback_description),
            "examples": friction_examples(key, facets),
        }
        for key, _count in top_entries(data.friction, limit=3)
    ]
    if not categories:
        categories = [
            {
                "category": "Low Measured Friction",
                "description": "No single friction pattern dominates the transcripts. Most sessions either land cleanly or fail for different reasons.",
                "examples": [],
            }
        ]
    return {
        "intro": "The biggest slowdowns are not random; they cluster into a few repeatable failure modes.",
        "categories": categories,
    }
def repeated_instructions(
    facets: dict[str, SessionFacets],
) -> list[tuple[str, int, str]]:
    """Rank normalized user instructions by how often they recur.

    Returns ``(normalized, count, first_original_wording)`` tuples,
    most frequent first.
    """
    tallies: Counter[str] = Counter()
    first_seen: dict[str, str] = {}
    for facet in facets.values():
        for raw in facet.user_instructions_to_claude:
            key = normalize_instruction(raw)
            if not key:
                continue
            tallies[key] += 1
            # Keep the first original phrasing so output stays readable.
            first_seen.setdefault(key, raw)
    return [
        (key, count, first_seen[key]) for key, count in tallies.most_common()
    ]
def feature_suggestions(
    data: AggregatedData, repeated: list[tuple[str, int, str]]
) -> list[dict[str, str]]:
    """Recommend up to three Claude Code features the user is not leveraging.

    Selection is driven by simple aggregate signals (no MCP usage, no task
    agents, repeated instructions, tool errors, commit/line volume); the
    one-liner and example code come from FEATURE_CATALOG.
    """
    # Rationale copy keyed by feature name. Built once, outside the output
    # loop — the previous version rebuilt this dict on every iteration.
    reasons = {
        "MCP Servers": "You are still solving most tasks with local repo context only. External context would remove lookup overhead when the answer lives outside the tree.",
        "Task Agents": "Your transcripts show broad tasks that would benefit from parallel exploration instead of one linear thread doing all the discovery.",
        "Custom Skills": "You repeat certain instructions enough that they should become a reusable workflow instead of another line in chat.",
        "Hooks": "Validation failures are costing you turns. Automating the checks would catch them before you need to ask again.",
        "Headless Mode": "Some of your work is procedural enough that it can be turned into a repeatable non-interactive job for CI or local automation.",
    }
    selections: list[str] = []
    if data.sessions_using_mcp == 0:
        selections.append("MCP Servers")
    if data.sessions_using_task_agent == 0:
        selections.append("Task Agents")
    if repeated:
        selections.append("Custom Skills")
    if data.total_tool_errors > 0:
        selections.append("Hooks")
    if data.git_commits > 0 or data.total_lines_added > 150:
        selections.append("Headless Mode")
    # dict.fromkeys preserves first-seen order while removing duplicates.
    ordered = list(dict.fromkeys(selections))
    return [
        {
            "feature": name,
            "one_liner": FEATURE_CATALOG[name]["one_liner"],
            "why_for_you": reasons[name],
            "example_code": FEATURE_CATALOG[name]["example_code"],
        }
        for name in ordered[:3]
    ]
def claude_md_additions(
    repeated: list[tuple[str, int, str]],
) -> list[dict[str, str]]:
    """Turn instructions repeated in two or more sessions into up to three
    suggested CLAUDE.md additions."""
    suggestions: list[dict[str, str]] = []
    for _key, count, original in repeated:
        if len(suggestions) == 3:
            break
        if count < 2:
            continue
        suggestions.append(
            {
                "addition": original,
                "why": f"You repeated this in {count} separate sessions. That is strong evidence it belongs in durable project guidance.",
                "prompt_scaffold": "Add under a workflow or validation section in CLAUDE.md.",
            }
        )
    return suggestions
def usage_patterns(
    data: AggregatedData, repeated: list[tuple[str, int, str]]
) -> list[dict[str, str]]:
    """Suggest up to three prompting habits tailored to observed signals."""
    # (applies, pattern) pairs; order determines priority.
    proposals: list[tuple[bool, dict[str, str]]] = [
        (
            data.total_tool_errors > 0,
            {
                "title": "Ask For A Validation Pass",
                "suggestion": "Separate implementation from verification so the model knows the job is not done at the first patch.",
                "detail": "This is the fastest way to reduce avoidable retries when the first edit is plausible but not yet proven. It is especially useful in sessions that already include shell validation.",
                "copyable_prompt": "Make the minimal patch first, then run the relevant validation and tell me exactly what still fails before doing any more refactoring.",
            },
        ),
        (
            data.total_interruptions > 0,
            {
                "title": "Split Plan From Execute",
                "suggestion": "Use one short turn to force a bounded plan before Claude starts changing files.",
                "detail": "Your interruptions suggest that the problem is often not effort but drift. A compact execution plan makes it easier to catch the wrong approach before the patch grows.",
                "copyable_prompt": "Before editing anything, give me a 3-step plan with the files you expect to touch and the validation you will run. Wait for approval.",
            },
        ),
        (
            bool(repeated),
            {
                "title": "Promote Repeated Constraints",
                "suggestion": "Move recurring instructions into persistent project guidance instead of restating them in chat.",
                "detail": "If the same constraint appears in multiple sessions, it is no longer session-specific. Turning it into durable guidance frees the conversation to focus on the task.",
                "copyable_prompt": "Review the last few sessions and extract the recurring instructions I keep repeating. Draft the exact CLAUDE.md additions you would recommend.",
            },
        ),
        (
            data.sessions_using_task_agent == 0,
            {
                "title": "Use Parallel Exploration",
                "suggestion": "Ask Claude to split discovery work across agents when a question touches multiple subsystems.",
                "detail": "This is most useful when the next step is blocked on understanding several parts of the repo at once. It keeps the main thread focused on synthesis instead of raw search.",
                "copyable_prompt": "Use one agent to trace the failing code path, another to inspect tests, and then summarize the overlap before making changes.",
            },
        ),
    ]
    return [pattern for applies, pattern in proposals if applies][:3]
def on_the_horizon(data: AggregatedData) -> dict[str, Any]:
    """Describe three forward-looking workflow opportunities.

    The third card's pitch is upgraded when the transcripts show no tool
    errors, since that user can tolerate longer unsupervised loops.
    """
    background_pitch = "The longer-term opportunity is a workflow where Claude iterates against failing checks with less supervision and hands you a compact review packet when it converges."
    if data.total_tool_errors == 0:
        background_pitch = "Because your sessions are not dominated by tool failure, you are a good candidate for longer autonomous repair loops with less supervision."
    opportunities = [
        {
            "title": "Patch Then Verify In Parallel",
            "whats_possible": "A stronger workflow is to let one thread patch while another thread prepares validation or regression checks. That shrinks the dead time between edit and confidence.",
            "how_to_try": "Use agents for exploration and keep the main thread for the final patch synthesis.",
            "copyable_prompt": "Use one agent to prepare the patch plan, another to identify the best validation commands, then merge the findings and implement the smallest safe fix.",
        },
        {
            "title": "Repo-Wide Maintenance Bursts",
            "whats_possible": "As models improve, the obvious next step is batching repetitive repo maintenance instead of handling one fix at a time. That includes lint cleanup, test migrations, and repeated mechanical edits.",
            "how_to_try": "Pair headless mode with a narrow validation command so the batch job has a hard stop condition.",
            "copyable_prompt": "Identify one mechanical issue repeated across the repo, fix it in the smallest safe batch, and stop if the validation command starts failing for a new reason.",
        },
        {
            "title": "Background Repair Loops",
            "whats_possible": background_pitch,
            "how_to_try": "Use scripts or CI entrypoints with headless mode so the loop can restart from the same validation target.",
            "copyable_prompt": "Treat the failing CI target as the contract. Iterate until it passes or you can prove the blocker is environmental, then summarize the exact diff and remaining risk.",
        },
    ]
    return {
        "intro": "The next gains are less about better autocomplete and more about moving whole workflows into repeatable loops.",
        "opportunities": opportunities,
    }
def fun_ending_from_heuristics(
    facets: dict[str, SessionFacets],
) -> dict[str, str]:
    """Build the closing highlight card for the report.

    Preference order: an instruction repeated across sessions, then a
    session with positive satisfaction, then any session; an empty dict
    when there are no facets at all.
    """
    ranked = repeated_instructions(facets)
    if ranked and ranked[0][1] >= 2:
        return {
            "headline": f'"{ranked[0][2]}" kept coming back across sessions.',
            "detail": "That is usually a sign that the workflow wants a durable default instead of another reminder in chat.",
        }

    def _pleased(facet: SessionFacets) -> bool:
        # Either satisfaction bucket counts as a positive session.
        counts = facet.user_satisfaction_counts
        return counts.get("happy", 0) > 0 or counts.get("satisfied", 0) > 0

    chosen = next((facet for facet in facets.values() if _pleased(facet)), None)
    if chosen is None and facets:
        chosen = next(iter(facets.values()))
    if chosen is None:
        return {}
    return {
        "headline": truncate(chosen.underlying_goal, 90),
        "detail": chosen.brief_summary,
    }
def at_a_glance(
    interaction: dict[str, str],
    what_works: dict[str, Any],
    friction: dict[str, Any],
    suggestions: dict[str, Any],
    horizon: dict[str, Any],
) -> dict[str, str]:
    """Condense the generated insight sections into the four summary lines
    shown in the report's "At a Glance" box.

    Each value is a short sentence; ``whats_hindering`` is empty when the
    friction section has no content.
    """
    working = interaction.get("key_pattern", "")
    if what_works.get("impressive_workflows"):
        first = what_works["impressive_workflows"][0]["title"]
        working = f"{working} Your strongest sessions usually end with {first.lower()}."
    # `or [{}]` guards both a missing key and an explicit empty list; the
    # previous `.get("categories", [{}])[0]` raised IndexError when the key
    # was present but the list was empty.
    hindering = (friction.get("categories") or [{}])[0]
    hindering_text = ""
    if hindering:
        hindering_text = f"The main drag is {str(hindering.get('category', '')).lower()}. You lose momentum when the first path is not cheap to validate or redirect."
    feature_names = [
        item["feature"] for item in suggestions.get("features_to_try", [])
    ]
    quick_wins = (
        ", ".join(feature_names[:2])
        if feature_names
        else "promoting repeated guidance into CLAUDE.md"
    )
    quick_wins_text = f"The fastest upgrades are {quick_wins}. They directly target the repeated overhead in these sessions."
    opportunity_names = [
        item["title"] for item in horizon.get("opportunities", [])
    ]
    ambitious = (
        ", ".join(opportunity_names[:2])
        if opportunity_names
        else "patch-and-verify loops"
    )
    ambitious_text = f"The next workflow to prepare for is {ambitious.lower()}. Better models will make longer repair and validation loops much more practical."
    return {
        "whats_working": working,
        "whats_hindering": hindering_text,
        "quick_wins": quick_wins_text,
        "ambitious_workflows": ambitious_text,
    }
def generate_heuristic_insights(
    data: AggregatedData, facets: dict[str, SessionFacets]
) -> dict[str, Any]:
    """Assemble every heuristic report section into a single payload dict."""
    repeated = repeated_instructions(facets)
    interaction = interaction_style_from_heuristics(data)
    works = what_works_from_heuristics(data)
    friction = friction_analysis_from_heuristics(data, facets)
    suggestions = {
        "claude_md_additions": claude_md_additions(repeated),
        "features_to_try": feature_suggestions(data, repeated),
        "usage_patterns": usage_patterns(data, repeated),
    }
    horizon = on_the_horizon(data)
    return {
        "project_areas": {"areas": project_areas_from_heuristics(data)},
        "interaction_style": interaction,
        "what_works": works,
        "friction_analysis": friction,
        "suggestions": suggestions,
        "on_the_horizon": horizon,
        "fun_ending": fun_ending_from_heuristics(facets),
        "at_a_glance": at_a_glance(
            interaction, works, friction, suggestions, horizon
        ),
    }
def escape_html_with_bold(text: str) -> str:
    """HTML-escape *text*, then render ``**bold**`` markers as <strong> tags.

    ``None`` and other falsy inputs yield an empty string.
    """
    safe = html.escape(text or "")
    return re.sub(r"\*\*(.+?)\*\*", lambda m: f"<strong>{m.group(1)}</strong>", safe)
def markdown_to_html(text: str) -> str:
    """Render a minimal markdown subset: blank-line paragraphs, single
    newlines as <br>, and ``**bold**`` spans."""
    chunks = (part for part in text.split("\n\n") if part.strip())
    return "\n".join(
        "<p>{}</p>".format(escape_html_with_bold(chunk).replace("\n", "<br>"))
        for chunk in chunks
    )
def generate_bar_chart(
    data: dict[str, int],
    color: str,
    max_items: int = 6,
    fixed_order: list[str] | None = None,
) -> str:
    """Render a horizontal bar chart as HTML rows.

    With *fixed_order*, labels render in that order and zero-count entries
    are dropped; otherwise the top *max_items* entries render sorted by
    count, descending.
    """
    if fixed_order:
        entries = [
            (label, data[label])
            for label in fixed_order
            if data.get(label, 0) > 0
        ]
    else:
        ranked = sorted(data.items(), key=lambda item: item[1], reverse=True)
        entries = ranked[:max_items]
    if not entries:
        return '<p class="empty">No data</p>'
    # Guard against an all-zero chart dividing by zero.
    max_value = max(count for _label, count in entries) or 1

    def _row(label: str, count: int) -> str:
        width = (count / max_value) * 100
        return (
            f'<div class="bar-row"><div class="bar-label">{html.escape(safe_title(label))}</div>'
            f'<div class="bar-track"><div class="bar-fill" style="width:{width:.2f}%;background:{color}"></div></div>'
            f'<div class="bar-value">{count}</div></div>'
        )

    return "\n".join(_row(label, count) for label, count in entries)
def generate_response_time_histogram(times: list[float]) -> str:
    """Bucket response times (seconds) and render them as a bar chart.

    Buckets are passed to the chart via ``fixed_order`` so the histogram
    reads chronologically from fast to slow; without it, generate_bar_chart
    re-sorts the buckets by count, which scrambles a histogram's axis.
    """
    if not times:
        return '<p class="empty">No response time data</p>'
    # (exclusive upper bound in seconds, label); the last bucket is open-ended.
    boundaries = [
        (10, "2-10s"),
        (30, "10-30s"),
        (60, "30s-1m"),
        (120, "1-2m"),
        (300, "2-5m"),
        (900, "5-15m"),
        (math.inf, ">15m"),
    ]
    bucket_counts = {label: 0 for _bound, label in boundaries}
    for value in times:
        for bound, label in boundaries:
            if value < bound:
                bucket_counts[label] += 1
                break
    return generate_bar_chart(
        bucket_counts,
        "#6366f1",
        max_items=len(bucket_counts),
        fixed_order=[label for _bound, label in boundaries],
    )
def generate_time_of_day_chart(hours: list[int]) -> str:
    """Group message hours (0-23) into four day periods and chart them.

    Hours outside every period are silently skipped.
    """
    if not hours:
        return '<p class="empty">No time data</p>'
    periods = {
        "Morning (6-12)": range(6, 12),
        "Afternoon (12-18)": range(12, 18),
        "Evening (18-24)": range(18, 24),
        "Night (0-6)": range(6),
    }
    counts = dict.fromkeys(periods, 0)
    for hour in hours:
        label = next(
            (name for name, span in periods.items() if hour in span), None
        )
        if label is not None:
            counts[label] += 1
    return generate_bar_chart(counts, "#8b5cf6", max_items=len(counts))
def render_cards(
    cards: list[dict[str, Any]],
    title_key: str,
    description_key: str,
    class_name: str,
) -> str:
    """Render a vertical stack of simple title/body cards.

    Returns an empty string when *cards* is empty; missing or falsy values
    render as empty text.
    """
    if not cards:
        return ""
    rendered = [
        f'<div class="{class_name}">'
        f'<div class="card-title">{html.escape(str(card.get(title_key) or ""))}</div>'
        f'<div class="card-body">{html.escape(str(card.get(description_key) or ""))}</div>'
        "</div>"
        for card in cards
    ]
    return "\n".join(['<div class="card-stack">', *rendered, "</div>"])
def generate_html_report(
    data: AggregatedData, insights: dict[str, Any]
) -> str:
    """Render the full self-contained HTML report as a single string.

    Combines the aggregated metrics (header stats, bar charts) with the
    heuristic insight sections. All CSS is inlined so the output opens
    directly in a browser with no external assets. Sections whose insight
    payload is missing or empty are simply omitted.
    """
    # Every section defaults to an empty container so a partially
    # populated insights payload still renders.
    at_a_glance = insights.get("at_a_glance") or {}
    project_areas = (insights.get("project_areas") or {}).get("areas") or []
    interaction_style = insights.get("interaction_style") or {}
    what_works = insights.get("what_works") or {}
    friction = insights.get("friction_analysis") or {}
    suggestions = insights.get("suggestions") or {}
    horizon = insights.get("on_the_horizon") or {}
    fun = insights.get("fun_ending") or {}
    # "At a Glance" summary lines, rendered only for keys with content.
    glance_sections = []
    for label, key in (
        ("What's working", "whats_working"),
        ("What's hindering you", "whats_hindering"),
        ("Quick wins to try", "quick_wins"),
        ("Ambitious workflows", "ambitious_workflows"),
    ):
        if at_a_glance.get(key):
            glance_sections.append(
                f'<div class="glance-section"><strong>{html.escape(label)}:</strong> {escape_html_with_bold(str(at_a_glance[key]))}</div>'
            )
    # Card builders: each list holds pre-rendered HTML snippets for one
    # report section; the final assembly only emits non-empty sections.
    project_cards = []
    for area in project_areas:
        project_cards.append(
            '<div class="project-card">'
            f'<div class="project-header"><span class="project-name">{html.escape(str(area.get("name") or ""))}</span>'
            f'<span class="project-count">~{int(area.get("session_count") or 0)} sessions</span></div>'
            f'<div class="project-desc">{html.escape(str(area.get("description") or ""))}</div>'
            "</div>"
        )
    what_works_cards = []
    for item in what_works.get("impressive_workflows") or []:
        what_works_cards.append(
            '<div class="success-card">'
            f'<div class="card-title">{html.escape(str(item.get("title") or ""))}</div>'
            f'<div class="card-body">{html.escape(str(item.get("description") or ""))}</div>'
            "</div>"
        )
    friction_cards = []
    for item in friction.get("categories") or []:
        examples_html = ""
        examples = item.get("examples") or []
        if examples:
            examples_html = (
                "<ul>"
                + "".join(
                    f"<li>{html.escape(str(example))}</li>"
                    for example in examples
                )
                + "</ul>"
            )
        friction_cards.append(
            '<div class="warning-card">'
            f'<div class="card-title">{html.escape(str(item.get("category") or ""))}</div>'
            f'<div class="card-body">{html.escape(str(item.get("description") or ""))}</div>'
            f"{examples_html}"
            "</div>"
        )
    feature_cards = []
    for item in suggestions.get("features_to_try") or []:
        code = item.get("example_code") or ""
        feature_cards.append(
            '<div class="feature-card">'
            f'<div class="card-title">{html.escape(str(item.get("feature") or ""))}</div>'
            f'<div class="card-body">{html.escape(str(item.get("one_liner") or ""))}</div>'
            f'<div class="subtle"><strong>Why for you:</strong> {html.escape(str(item.get("why_for_you") or ""))}</div>'
            f"<pre>{html.escape(str(code))}</pre>"
            "</div>"
        )
    addition_cards = []
    for item in suggestions.get("claude_md_additions") or []:
        addition_cards.append(
            '<div class="feature-card">'
            f'<div class="card-title">{html.escape(str(item.get("addition") or ""))}</div>'
            f'<div class="subtle">{html.escape(str(item.get("why") or ""))}</div>'
            f'<div class="muted">{html.escape(str(item.get("prompt_scaffold") or ""))}</div>'
            "</div>"
        )
    pattern_cards = []
    for item in suggestions.get("usage_patterns") or []:
        pattern_cards.append(
            '<div class="info-card">'
            f'<div class="card-title">{html.escape(str(item.get("title") or ""))}</div>'
            f'<div class="card-body">{html.escape(str(item.get("suggestion") or ""))}</div>'
            f'<div class="subtle">{html.escape(str(item.get("detail") or ""))}</div>'
            f"<pre>{html.escape(str(item.get('copyable_prompt') or ''))}</pre>"
            "</div>"
        )
    horizon_cards = []
    for item in horizon.get("opportunities") or []:
        horizon_cards.append(
            '<div class="future-card">'
            f'<div class="card-title">{html.escape(str(item.get("title") or ""))}</div>'
            f'<div class="card-body">{html.escape(str(item.get("whats_possible") or ""))}</div>'
            f'<div class="subtle"><strong>Getting started:</strong> {html.escape(str(item.get("how_to_try") or ""))}</div>'
            f"<pre>{html.escape(str(item.get('copyable_prompt') or ''))}</pre>"
            "</div>"
        )
    # Inline stylesheet: emitted verbatim into a <style> tag below.
    css = """
    * { box-sizing: border-box; }
    body {
        margin: 0;
        font-family: "SF Pro Text", "Inter", -apple-system, BlinkMacSystemFont, sans-serif;
        background: #f8fafc;
        color: #334155;
        line-height: 1.6;
    }
    .container {
        max-width: 980px;
        margin: 0 auto;
        padding: 40px 20px 80px;
    }
    h1 {
        margin: 0 0 8px;
        font-size: 34px;
        color: #0f172a;
    }
    h2 {
        margin: 40px 0 14px;
        font-size: 20px;
        color: #0f172a;
    }
    .subtitle {
        margin: 0 0 28px;
        color: #64748b;
    }
    .glance {
        padding: 20px 24px;
        background: linear-gradient(135deg, #fef3c7, #fde68a);
        border: 1px solid #f59e0b;
        border-radius: 14px;
        margin-bottom: 28px;
    }
    .glance-title {
        margin-bottom: 12px;
        font-size: 15px;
        font-weight: 700;
        color: #92400e;
    }
    .glance-section {
        margin-bottom: 10px;
        color: #78350f;
    }
    .stats {
        display: grid;
        grid-template-columns: repeat(auto-fit, minmax(120px, 1fr));
        gap: 12px;
        margin: 24px 0 36px;
    }
    .stat {
        background: white;
        border: 1px solid #e2e8f0;
        border-radius: 12px;
        padding: 14px 16px;
    }
    .stat-value {
        font-size: 24px;
        font-weight: 700;
        color: #0f172a;
    }
    .stat-label {
        font-size: 11px;
        letter-spacing: 0.08em;
        text-transform: uppercase;
        color: #64748b;
    }
    .grid {
        display: grid;
        grid-template-columns: repeat(auto-fit, minmax(300px, 1fr));
        gap: 18px;
        margin: 18px 0;
    }
    .panel {
        background: white;
        border: 1px solid #e2e8f0;
        border-radius: 14px;
        padding: 16px;
    }
    .panel-title {
        margin-bottom: 12px;
        font-size: 12px;
        text-transform: uppercase;
        letter-spacing: 0.08em;
        color: #64748b;
    }
    .bar-row {
        display: flex;
        align-items: center;
        gap: 8px;
        margin-bottom: 8px;
    }
    .bar-label {
        width: 120px;
        font-size: 12px;
        color: #475569;
    }
    .bar-track {
        flex: 1;
        height: 8px;
        background: #f1f5f9;
        border-radius: 999px;
        overflow: hidden;
    }
    .bar-fill {
        height: 100%;
        border-radius: 999px;
    }
    .bar-value {
        width: 36px;
        text-align: right;
        font-size: 12px;
        color: #64748b;
    }
    .project-list,
    .card-stack {
        display: flex;
        flex-direction: column;
        gap: 12px;
    }
    .project-card,
    .success-card,
    .warning-card,
    .feature-card,
    .info-card,
    .future-card {
        background: white;
        border-radius: 12px;
        padding: 16px;
        border: 1px solid #e2e8f0;
    }
    .success-card {
        background: #f0fdf4;
        border-color: #86efac;
    }
    .warning-card {
        background: #fef2f2;
        border-color: #fca5a5;
    }
    .feature-card {
        background: #eff6ff;
        border-color: #bfdbfe;
    }
    .info-card {
        background: #f0f9ff;
        border-color: #7dd3fc;
    }
    .future-card {
        background: #faf5ff;
        border-color: #c4b5fd;
    }
    .project-header {
        display: flex;
        justify-content: space-between;
        gap: 8px;
        margin-bottom: 8px;
    }
    .project-name,
    .card-title {
        font-weight: 700;
        color: #0f172a;
    }
    .project-count {
        font-size: 12px;
        color: #64748b;
    }
    .project-desc,
    .card-body {
        color: #475569;
    }
    .subtle {
        margin-top: 8px;
        color: #334155;
        font-size: 14px;
    }
    .muted {
        margin-top: 6px;
        color: #64748b;
        font-size: 13px;
    }
    pre {
        white-space: pre-wrap;
        margin: 12px 0 0;
        padding: 12px;
        border-radius: 10px;
        background: #f8fafc;
        border: 1px solid #e2e8f0;
        font-size: 12px;
        overflow-x: auto;
    }
    .empty {
        color: #94a3b8;
    }
    .narrative {
        background: white;
        border: 1px solid #e2e8f0;
        border-radius: 14px;
        padding: 18px;
    }
    .narrative p {
        margin: 0 0 12px;
    }
    .fun {
        margin-top: 40px;
        padding: 24px;
        border-radius: 16px;
        background: linear-gradient(135deg, #fef3c7, #fde68a);
        border: 1px solid #f59e0b;
        text-align: center;
    }
    .fun-quote {
        font-size: 19px;
        font-weight: 700;
        color: #78350f;
    }
    .fun-detail {
        margin-top: 8px;
        color: #92400e;
    }
    ul {
        margin: 10px 0 0 18px;
    }
    @media (max-width: 640px) {
        .bar-label {
            width: 92px;
        }
    }
    """
    # Document skeleton: header plus the summary subtitle.
    html_parts = [
        "<!DOCTYPE html>",
        "<html>",
        "<head>",
        '<meta charset="utf-8">',
        "<title>Claude Code Insights (Python)</title>",
        f"<style>{css}</style>",
        "</head>",
        "<body>",
        '<div class="container">',
        "<h1>Claude Code Insights (Python)</h1>",
        f'<p class="subtitle">{data.total_messages:,} messages across {data.total_sessions} sessions | {html.escape(data.date_range.get("start", ""))} to {html.escape(data.date_range.get("end", ""))}</p>',
    ]
    if glance_sections:
        html_parts.append(
            '<div class="glance"><div class="glance-title">At a Glance</div>'
        )
        html_parts.extend(glance_sections)
        html_parts.append("</div>")
    # Headline stat tiles are always shown.
    html_parts.append(
        '<div class="stats">'
        f'<div class="stat"><div class="stat-value">{data.total_messages:,}</div><div class="stat-label">Messages</div></div>'
        f'<div class="stat"><div class="stat-value">{data.total_sessions}</div><div class="stat-label">Sessions</div></div>'
        f'<div class="stat"><div class="stat-value">{data.days_active}</div><div class="stat-label">Days Active</div></div>'
        f'<div class="stat"><div class="stat-value">{data.total_files_modified}</div><div class="stat-label">Files Modified</div></div>'
        f'<div class="stat"><div class="stat-value">{data.git_commits}</div><div class="stat-label">Commits</div></div>'
        "</div>"
    )
    if project_cards:
        html_parts.append("<h2>What You Work On</h2>")
        html_parts.append('<div class="project-list">')
        html_parts.extend(project_cards)
        html_parts.append("</div>")
    if interaction_style.get("narrative"):
        html_parts.append("<h2>How You Use Claude Code</h2>")
        html_parts.append(
            f'<div class="narrative">{markdown_to_html(str(interaction_style["narrative"]))}</div>'
        )
    # Chart grid: always rendered; empty charts degrade to "No data".
    html_parts.append('<div class="grid">')
    html_parts.append(
        f'<div class="panel"><div class="panel-title">What You Wanted</div>{generate_bar_chart(data.goal_categories, "#2563eb")}</div>'
    )
    html_parts.append(
        f'<div class="panel"><div class="panel-title">Top Tools Used</div>{generate_bar_chart(data.tool_counts, "#0891b2")}</div>'
    )
    html_parts.append(
        f'<div class="panel"><div class="panel-title">Languages</div>{generate_bar_chart(data.languages, "#10b981")}</div>'
    )
    html_parts.append(
        f'<div class="panel"><div class="panel-title">Session Types</div>{generate_bar_chart(data.session_types, "#8b5cf6")}</div>'
    )
    html_parts.append(
        f'<div class="panel"><div class="panel-title">Response Time Distribution</div>{generate_response_time_histogram(data.user_response_times)}</div>'
    )
    html_parts.append(
        f'<div class="panel"><div class="panel-title">Messages By Time Of Day</div>{generate_time_of_day_chart(data.message_hours)}</div>'
    )
    html_parts.append(
        f'<div class="panel"><div class="panel-title">Outcomes</div>{generate_bar_chart(data.outcomes, "#7c3aed", fixed_order=OUTCOME_ORDER)}</div>'
    )
    html_parts.append(
        f'<div class="panel"><div class="panel-title">Satisfaction</div>{generate_bar_chart(data.satisfaction, "#eab308", fixed_order=SATISFACTION_ORDER)}</div>'
    )
    html_parts.append("</div>")
    # Narrative insight sections, each emitted only when it has cards.
    if what_works_cards:
        html_parts.append("<h2>Impressive Things You Did</h2>")
        if what_works.get("intro"):
            html_parts.append(
                f'<p class="subtitle">{html.escape(str(what_works["intro"]))}</p>'
            )
        html_parts.append('<div class="card-stack">')
        html_parts.extend(what_works_cards)
        html_parts.append("</div>")
    if friction_cards:
        html_parts.append("<h2>Where Things Go Wrong</h2>")
        if friction.get("intro"):
            html_parts.append(
                f'<p class="subtitle">{html.escape(str(friction["intro"]))}</p>'
            )
        html_parts.append('<div class="card-stack">')
        html_parts.extend(friction_cards)
        html_parts.append("</div>")
    if addition_cards:
        html_parts.append("<h2>Suggested CLAUDE.md Additions</h2>")
        html_parts.append('<div class="card-stack">')
        html_parts.extend(addition_cards)
        html_parts.append("</div>")
    if feature_cards:
        html_parts.append("<h2>Existing Claude Code Features To Try</h2>")
        html_parts.append('<div class="card-stack">')
        html_parts.extend(feature_cards)
        html_parts.append("</div>")
    if pattern_cards:
        html_parts.append("<h2>New Ways To Use Claude Code</h2>")
        html_parts.append('<div class="card-stack">')
        html_parts.extend(pattern_cards)
        html_parts.append("</div>")
    if horizon_cards:
        html_parts.append("<h2>On The Horizon</h2>")
        if horizon.get("intro"):
            html_parts.append(
                f'<p class="subtitle">{html.escape(str(horizon["intro"]))}</p>'
            )
        html_parts.append('<div class="card-stack">')
        html_parts.extend(horizon_cards)
        html_parts.append("</div>")
    # Closing "fun" highlight card, when one was generated.
    if fun.get("headline"):
        html_parts.append('<div class="fun">')
        html_parts.append(
            f'<div class="fun-quote">"{html.escape(str(fun["headline"]))}"</div>'
        )
        if fun.get("detail"):
            html_parts.append(
                f'<div class="fun-detail">{html.escape(str(fun["detail"]))}</div>'
            )
        html_parts.append("</div>")
    html_parts.extend(["</div>", "</body>", "</html>"])
    return "\n".join(html_parts)
def build_export_data(
    data: AggregatedData,
    insights: dict[str, Any],
    facets: dict[str, SessionFacets],
    project_scope_prefix: str | None = None,
) -> dict[str, Any]:
    """Shape the aggregated metrics, insights, and facet tallies for the
    JSON export file."""
    goal_totals: Counter[str] = Counter()
    outcome_totals: Counter[str] = Counter()
    satisfaction_totals: Counter[str] = Counter()
    friction_totals: Counter[str] = Counter()
    for facet in facets.values():
        # Only positive counts contribute, matching what the report shows.
        goal_totals.update(
            {k: c for k, c in facet.goal_categories.items() if c > 0}
        )
        outcome_totals[facet.outcome] += 1
        satisfaction_totals.update(
            {k: c for k, c in facet.user_satisfaction_counts.items() if c > 0}
        )
        friction_totals.update(
            {k: c for k, c in facet.friction_counts.items() if c > 0}
        )
    facets_summary = {
        "total": len(facets),
        "goal_categories": dict(goal_totals),
        "outcomes": dict(outcome_totals),
        "satisfaction": dict(satisfaction_totals),
        "friction": dict(friction_totals),
    }
    return {
        "metadata": {
            "username": os.getenv("USER") or "unknown",
            "generated_at": datetime.now(tz=timezone.utc).isoformat(),
            "claude_code_version": "python-port",
            "date_range": data.date_range,
            "session_count": data.total_sessions,
            "project_scope_prefix": project_scope_prefix,
        },
        "aggregated_data": asdict(data),
        "insights": insights,
        "facets_summary": facets_summary,
    }
def generate_usage_report(args: argparse.Namespace) -> dict[str, Any]:
    """Run the full insights pipeline and write the HTML/JSON reports.

    Steps: scan transcript files, load per-session metadata (cache-first,
    with a cap on fresh parses), keep the richest variant of each session,
    extract heuristic facets, aggregate, generate insights, and write the
    two report files.

    Returns a dict with the output paths plus the aggregated data, the
    insights payload, and the facets that were used.
    """
    cache_dir: Path = args.cache_dir
    ensure_dir(cache_dir)
    scope = build_project_scope(args.project_path_prefix)
    scanned_sessions = scan_all_sessions(args.projects_dir)
    total_sessions_scanned = 0
    metas: list[SessionMeta] = []
    logs_for_facets: dict[str, SessionLog] = {}
    uncached_sessions: list[dict[str, Any]] = []
    # Cache-first pass: reuse cached metadata where available; cap how many
    # transcripts are parsed from scratch this run.
    for item in scanned_sessions:
        cached = load_cached_session_meta(cache_dir, item["session_id"])
        if cached and matches_project_scope(cached.project_path, scope):
            metas.append(cached)
            total_sessions_scanned += 1
        elif len(uncached_sessions) < args.max_sessions_load:
            uncached_sessions.append(item)
    for item in uncached_sessions:
        logs = load_all_logs_from_session_file(item["path"])
        if not logs:
            continue
        best_log = choose_best_log(logs, scope)
        if best_log is None:
            continue
        if is_meta_session(best_log):
            continue
        meta = log_to_session_meta(best_log)
        metas.append(meta)
        total_sessions_scanned += 1
        logs_for_facets[meta.session_id] = best_log
        save_session_meta(cache_dir, meta)
    # Deduplicate by session id, keeping the richest variant (most user
    # messages, then longest duration).
    best_by_session: dict[str, SessionMeta] = {}
    for meta in metas:
        current = best_by_session.get(meta.session_id)
        if (
            current is None
            or meta.user_message_count > current.user_message_count
            or (
                meta.user_message_count == current.user_message_count
                and meta.duration_minutes > current.duration_minutes
            )
        ):
            best_by_session[meta.session_id] = meta
    metas = sorted(
        best_by_session.values(),
        key=lambda meta: meta.start_time,
        reverse=True,
    )
    substantive_metas = [
        meta for meta in metas if is_substantive_session(meta)
    ]
    # Facet extraction, again cache-first and capped.
    facets: dict[str, SessionFacets] = {}
    facet_candidates: list[SessionMeta] = []
    for meta in substantive_metas:
        cached = load_cached_facets(cache_dir, meta.session_id)
        if cached:
            facets[meta.session_id] = cached
        else:
            facet_candidates.append(meta)
    for meta in facet_candidates[: args.max_facet_extractions]:
        log = logs_for_facets.get(meta.session_id)
        if log is None:
            # The log was not parsed this run (cached metadata): locate the
            # session's transcript file and re-parse it. (Previously this
            # branch also built and immediately deleted an unused Path.)
            session_file = None
            for item in scanned_sessions:
                if item["session_id"] == meta.session_id:
                    session_file = item["path"]
                    break
            if session_file:
                logs = load_all_logs_from_session_file(session_file)
                if logs:
                    log = choose_best_log(logs, scope)
                    if log is None:
                        continue
                    logs_for_facets[meta.session_id] = log
        if not log:
            continue
        facet = heuristic_extract_facets(log, meta)
        facets[facet.session_id] = facet
        save_facets(cache_dir, facet)
    # Drop trivially small sessions, unless that would leave nothing.
    substantive_facets = {
        session_id: facet
        for session_id, facet in facets.items()
        if not is_minimal_session(facet)
    }
    substantive_sessions = [
        meta
        for meta in substantive_metas
        if meta.session_id in substantive_facets
    ]
    if not substantive_sessions:
        substantive_sessions = substantive_metas
    aggregated = aggregate_data(substantive_sessions, substantive_facets)
    aggregated.total_sessions_scanned = total_sessions_scanned
    insights = generate_heuristic_insights(aggregated, substantive_facets)
    # Write the two report artifacts, defaulting into the cache directory.
    output_html = args.output_html or (cache_dir / "report.html")
    output_json = args.output_json or (cache_dir / "report.json")
    ensure_dir(output_html.parent)
    ensure_dir(output_json.parent)
    output_html.write_text(
        generate_html_report(aggregated, insights), encoding="utf-8"
    )
    output_json.write_text(
        json.dumps(
            build_export_data(
                aggregated,
                insights,
                substantive_facets,
                args.project_path_prefix,
            ),
            indent=2,
        ),
        encoding="utf-8",
    )
    return {
        "html_path": output_html,
        "json_path": output_json,
        "data": aggregated,
        "insights": insights,
        "facets": substantive_facets,
    }
def print_summary(result: dict[str, Any]) -> None:
    """Print the report paths and headline stats/insights to stdout."""
    data: AggregatedData = result["data"]
    glance = result["insights"].get("at_a_glance") or {}
    print(f"Wrote HTML report: {result['html_path']}")
    print(f"Wrote JSON export: {result['json_path']}")
    print(
        f"Analyzed {data.total_sessions} sessions "
        f"({data.total_messages} user messages, {round(data.total_duration_hours)}h) "
        f"from {data.date_range.get('start', '')} to {data.date_range.get('end', '')}"
    )
    for label, key in (
        ("What's working", "whats_working"),
        ("Quick wins", "quick_wins"),
    ):
        if glance.get(key):
            print(f"{label}: {glance[key]}")
def main() -> int:
    """CLI entry point: generate the report, print a summary, return 0."""
    print_summary(generate_usage_report(parse_args()))
    return 0
# Script entry point: exit with main()'s return code.
if __name__ == "__main__":
    raise SystemExit(main())