diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index a46cd09..b96e581 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -66,18 +66,49 @@ jobs:
- name: Test changed packages
run: |
if [ "${{ github.event_name }}" = "push" ]; then
- uv run pytest packages/ -v
+ uv run pytest packages/ -v --ignore=packages/blackbox/tests/e2e
else
CHANGED='${{ needs.changes.outputs.packages }}'
for pkg in $(echo "$CHANGED" | jq -r '.[]'); do
echo "::group::Testing $pkg"
- uv run pytest "packages/$pkg" -v
+ uv run pytest "packages/$pkg" -v --ignore="packages/$pkg/tests/e2e"
echo "::endgroup::"
done
fi
env:
CI: "true"
+ e2e-blackbox:
+ needs: changes
+ if: >-
+ github.event_name == 'push'
+ || contains(fromJSON(needs.changes.outputs.packages), 'blackbox')
+ runs-on: ubuntu-latest
+ permissions:
+ contents: read
+ strategy:
+ fail-fast: false
+ matrix:
+ browser: [chromium, firefox, webkit]
+ steps:
+ - uses: actions/checkout@v6
+ - uses: astral-sh/setup-uv@v8.0.0
+ with:
+ python-version: "3.12"
+ enable-cache: true
+ - run: uv sync --package blackbox
+ - name: Install Playwright browsers
+ run: uv run playwright install --with-deps ${{ matrix.browser }}
+ - name: Run E2E tests (${{ matrix.browser }})
+ run: uv run pytest packages/blackbox/tests/e2e/ -v --browser ${{ matrix.browser }} --tracing=retain-on-failure
+ - name: Upload Playwright traces
+ if: failure()
+ uses: actions/upload-artifact@v4
+ with:
+ name: playwright-traces-${{ matrix.browser }}
+ path: test-results/
+ retention-days: 7
+
version:
if: github.event_name == 'pull_request'
runs-on: ubuntu-latest
diff --git a/packages/blackbox/README.md b/packages/blackbox/README.md
new file mode 100644
index 0000000..47bec9f
--- /dev/null
+++ b/packages/blackbox/README.md
@@ -0,0 +1,61 @@
+# blackbox
+
+A flight data recorder for AI coding agent sessions.
+
+## Why "blackbox"?
+
+Aircraft carry black boxes (flight data recorders) that silently capture
+everything during a flight, then become invaluable when you need to
+understand what happened. This package does the same for AI coding agent
+sessions: it watches, records, and lets you replay what the agent did,
+how it spent tokens, where it got stuck, and whether the session achieved
+its goal.
+
+Currently supports Claude Code. Codex and Gemini support is planned.
+
+## What it does
+
+**Dashboard** -- a local HTMX web UI for browsing session transcripts
+in real time.
+
+- Sidebar with all sessions from `~/.claude/projects/`, sorted by recency
+- Live session detection via filesystem watching (green dot indicator)
+- Streaming log view with filter presets (all, compact, important, errors)
+- Tool call previews, error highlighting, user message formatting
+
+**Analytics models** -- structured data types for session-level metrics,
+weekly trends, project breakdowns, and recommendations. These feed into
+the analysis pipeline (in progress) that will produce session digests
+and surface patterns across sessions.
+
+## Usage
+
+```bash
+blackbox serve # open dashboard at http://localhost:7100
+blackbox serve --port 8080 # custom port
+blackbox serve --no-open # don't auto-open browser
+```
+
+## Package structure
+
+```
+src/blackbox/
+ cli.py # CLI entry point (serve command)
+ models.py # All domain models (attrs frozen classes)
+ dashboard/
+ app.py # FastAPI instance + lifespan
+ routes.py # API endpoints + SSE log streaming
+ rendering.py # HTML rendering, filtering, formatting
+ transcript.py # JSONL transcript parser + session scanner
+ watcher.py # Watchdog-based live session detection + cache
+ templates/ # Jinja2 templates (Tailwind + HTMX)
+```
+
+## Development
+
+```bash
+uv sync
+uv run fastapi dev src/blackbox/dashboard/app.py # hot reload on :8000
+uv run pytest tests/ -v
+uv run ruff check src/ tests/
+```
diff --git a/packages/blackbox/pyproject.toml b/packages/blackbox/pyproject.toml
new file mode 100644
index 0000000..1575af9
--- /dev/null
+++ b/packages/blackbox/pyproject.toml
@@ -0,0 +1,95 @@
+[project]
+name = "blackbox"
+version = "0.1.0"
+description = "Flight data recorder for AI coding agent sessions"
+requires-python = ">=3.12"
+dependencies = [
+ "attrs>=24.2.0",
+ "danom>=0.13.0",
+ "fastapi[standard]>=0.115.0",
+ "jinja2>=3.1.0",
+ "sse-starlette>=2.0.0",
+ "uvicorn>=0.30.0",
+ "watchdog>=4.0.0",
+]
+
+[project.scripts]
+blackbox = "blackbox.cli:main"
+
+[build-system]
+requires = ["uv_build>=0.7.2,<0.8"]
+build-backend = "uv_build"
+
+[tool.uv.sources]
+danom = { git = "https://github.com/KRRT7/danom.git", branch = "feat/add-py-typed" }
+
+[dependency-groups]
+dev = [
+ "pytest>=9.0.3",
+ "pytest-cov>=6.2.1",
+ "ruff>=0.15.12",
+ "interrogate>=1.7.0",
+ "pytest-asyncio>=1.3.0",
+ "ty>=0.0.33",
+ "pytest-playwright>=0.7.2",
+]
+typing = [
+ "mypy>=1.20.2",
+]
+
+[tool.ty.environment]
+python-version = "3.12"
+
+[tool.mypy]
+strict = true
+warn_return_any = true
+warn_unused_configs = true
+
+[tool.pytest.ini_options]
+asyncio_mode = "auto"
+markers = [
+ "e2e: end-to-end browser tests (requires playwright)",
+]
+
+[tool.coverage.run]
+source = ["blackbox"]
+branch = true
+
+[tool.coverage.report]
+show_missing = true
+skip_empty = true
+
+[tool.interrogate]
+fail-under = 100
+verbose = 2
+
+[tool.ruff]
+line-length = 120
+
+[tool.ruff.lint]
+select = ["ALL"]
+ignore = [
+ "A",
+ "ANN",
+ "ARG",
+ "ASYNC240",
+ "COM812",
+ "D",
+ "E501",
+ "EM",
+ "FBT",
+ "ISC001",
+ "PLR2004",
+ "RET504",
+ "S",
+ "SIM300",
+ "TC003",
+ "TRY003",
+]
+
+[tool.ruff.lint.per-file-ignores]
+"tests/**" = [
+ "PLC0415",
+ "SIM300",
+ "SLF001",
+]
diff --git a/packages/blackbox/src/blackbox/__init__.py b/packages/blackbox/src/blackbox/__init__.py
new file mode 100644
index 0000000..0b68c34
--- /dev/null
+++ b/packages/blackbox/src/blackbox/__init__.py
@@ -0,0 +1,23 @@
+"""Blackbox — flight data recorder for AI coding agent sessions."""
+
+from __future__ import annotations
+
+from blackbox.models import (
+ ProjectStats,
+ Recommendation,
+ SessionAudit,
+ SessionDigest,
+ SessionEvent,
+ SessionMeta,
+ WeekStats,
+)
+
+__all__ = [
+ "ProjectStats",
+ "Recommendation",
+ "SessionAudit",
+ "SessionDigest",
+ "SessionEvent",
+ "SessionMeta",
+ "WeekStats",
+]
diff --git a/packages/blackbox/src/blackbox/analytics.py b/packages/blackbox/src/blackbox/analytics.py
new file mode 100644
index 0000000..db3f230
--- /dev/null
+++ b/packages/blackbox/src/blackbox/analytics.py
@@ -0,0 +1,393 @@
+"""Extract structured analytics from Claude Code session transcripts."""
+
+from __future__ import annotations
+
+import json
+from collections import Counter
+from pathlib import Path
+from typing import Any
+
+from blackbox.dashboard.transcript import ts_to_epoch
+from blackbox.models import (
+ CODEFLASH_AGENT_PREFIXES,
+ CODEFLASH_COMMANDS,
+ CODEFLASH_SKILLS,
+ CodeflashSession,
+ SessionMeta,
+)
+
+EDIT_TOOLS = {"Edit", "Write", "NotebookEdit"}
+FILE_EXTENSIONS: dict[str, str] = {
+ ".py": "python",
+ ".js": "javascript",
+ ".ts": "typescript",
+ ".tsx": "typescript",
+ ".jsx": "javascript",
+ ".go": "go",
+ ".rs": "rust",
+ ".java": "java",
+ ".rb": "ruby",
+ ".sh": "shell",
+ ".bash": "shell",
+ ".zsh": "shell",
+ ".toml": "toml",
+ ".yaml": "yaml",
+ ".yml": "yaml",
+ ".json": "json",
+ ".md": "markdown",
+ ".html": "html",
+ ".css": "css",
+}
+
+
+def extract_meta(path: Path) -> SessionMeta | None: # noqa: C901, PLR0912, PLR0915
+ """Extract a SessionMeta from a raw .jsonl transcript."""
+ session_id = path.stem
+ project_path = path.parent.name
+
+ timestamps: list[float] = []
+ user_messages = 0
+ assistant_messages = 0
+ tool_calls = 0
+ tool_counts: Counter[str] = Counter()
+ tool_errors = 0
+ tool_error_categories: Counter[str] = Counter()
+ tool_error_details: list[tuple[str, str]] = []
+ input_tokens = 0
+ output_tokens = 0
+ cache_read_tokens = 0
+ cache_creation_tokens = 0
+ files_modified: set[str] = set()
+ lines_added = 0
+ lines_removed = 0
+ git_commits = 0
+ git_branch: str | None = None
+ user_interruptions = 0
+ compactions = 0
+ subagents_spawned = 0
+ thinking_blocks = 0
+ web_searches = 0
+ web_fetches = 0
+ permission_mode: str | None = None
+ languages: Counter[str] = Counter()
+ first_prompt = ""
+ pending_tools: dict[str, str] = {}
+ codeflash_agents: set[str] = set()
+ codeflash_skills: set[str] = set()
+ codeflash_commands: set[str] = set()
+ teams_created = 0
+
+ try:
+ text = path.read_text()
+ except OSError:
+ return None
+
+ for line in text.splitlines():
+ if not line.strip():
+ continue
+ try:
+ raw = json.loads(line)
+ except json.JSONDecodeError:
+ continue
+
+ ts = ts_to_epoch(raw.get("timestamp"))
+ if ts:
+ timestamps.append(ts)
+
+ entry_type = raw.get("type", "")
+
+ if entry_type == "permission-mode":
+ permission_mode = raw.get("permissionMode")
+ continue
+
+ if entry_type == "summary":
+ compactions += 1
+ continue
+
+ if git_branch is None and raw.get("gitBranch"):
+ git_branch = raw["gitBranch"]
+
+ if entry_type == "user":
+ msg = raw.get("message", {})
+ if not isinstance(msg, dict):
+ continue
+ content = msg.get("content", "")
+ if isinstance(content, str) and content.strip():
+ user_messages += 1
+ if not first_prompt:
+ first_prompt = content[:120]
+ elif isinstance(content, list):
+ has_tool_result = any(isinstance(b, dict) and b.get("type") == "tool_result" for b in content)
+ if has_tool_result:
+ for block in content:
+ if not isinstance(block, dict) or block.get("type") != "tool_result":
+ continue
+ tool_use_id = block.get("tool_use_id", "")
+ is_error = block.get("is_error", False)
+ if is_error:
+ tool_errors += 1
+ tool_name = pending_tools.get(tool_use_id, "unknown")
+ category = classify_error(tool_name, block, raw)
+ tool_error_categories[category] += 1
+ stderr = ""
+ tur = raw.get("toolUseResult", {})
+ if isinstance(tur, dict):
+ stderr = tur.get("stderr", "")
+ detail_text = stderr or str(block.get("content", ""))[:200]
+ tool_error_details.append((category, detail_text))
+ else:
+ user_messages += 1
+ if not first_prompt:
+ texts = [b.get("text", "") for b in content if isinstance(b, dict) and b.get("type") == "text"]
+ first_prompt = " ".join(texts)[:120]
+
+ interrupted = False
+ tur = raw.get("toolUseResult", {})
+ if isinstance(tur, dict):
+ interrupted = tur.get("interrupted", False)
+ if interrupted:
+ user_interruptions += 1
+
+ elif entry_type == "assistant":
+ msg = raw.get("message", {})
+ if not isinstance(msg, dict):
+ continue
+ usage = msg.get("usage", {})
+ if usage:
+ input_tokens += usage.get("input_tokens", 0)
+ output_tokens += usage.get("output_tokens", 0)
+ cache_read_tokens += usage.get("cache_read_input_tokens", 0)
+ cache_creation_tokens += usage.get("cache_creation_input_tokens", 0)
+ stu = usage.get("server_tool_use", {})
+ if stu:
+ web_searches += stu.get("web_search_requests", 0)
+ web_fetches += stu.get("web_fetch_requests", 0)
+
+ content = msg.get("content", [])
+ if not isinstance(content, list):
+ assistant_messages += 1
+ continue
+
+ has_text = False
+ for block in content:
+ if not isinstance(block, dict):
+ continue
+ btype = block.get("type", "")
+ if btype == "text" and block.get("text", "").strip():
+ has_text = True
+ elif btype == "thinking":
+ thinking_blocks += 1
+ elif btype == "tool_use":
+ tool_name = block.get("name", "unknown")
+ tool_calls += 1
+ tool_counts[tool_name] += 1
+ tool_id = block.get("id", "")
+ if tool_id:
+ pending_tools[tool_id] = tool_name
+ if tool_name == "Agent":
+ subagents_spawned += 1
+ if tool_name == "TeamCreate":
+ teams_created += 1
+ tool_input = block.get("input", {})
+ if isinstance(tool_input, dict):
+ track_file_changes(
+ tool_name,
+ tool_input,
+ files_modified,
+ languages,
+ )
+ lines_a, lines_r = count_diff_lines(tool_name, tool_input)
+ lines_added += lines_a
+ lines_removed += lines_r
+ if tool_name == "Bash":
+ cmd = tool_input.get("command", "")
+ if isinstance(cmd, str) and "git commit" in cmd and "--amend" not in cmd:
+ git_commits += 1
+ if tool_name == "Agent":
+ agent_name = tool_input.get("name", "") or tool_input.get("subagent_type", "")
+ if agent_name in CODEFLASH_AGENT_PREFIXES:
+ codeflash_agents.add(agent_name)
+ if tool_name == "Skill":
+ skill_name = tool_input.get("skill", "")
+ if skill_name in CODEFLASH_SKILLS:
+ codeflash_skills.add(skill_name)
+ if skill_name in CODEFLASH_COMMANDS:
+ codeflash_commands.add(skill_name)
+ if has_text:
+ assistant_messages += 1
+
+ if not timestamps:
+ return None
+
+ start_time = min(timestamps)
+ end_time = max(timestamps)
+
+ return SessionMeta(
+ session_id=session_id,
+ project_path=project_path,
+ transcript_path=str(path),
+ start_time=start_time,
+ end_time=end_time,
+ duration_s=end_time - start_time,
+ user_messages=user_messages,
+ assistant_messages=assistant_messages,
+ tool_calls=tool_calls,
+ tool_counts=dict(tool_counts),
+ tool_errors=tool_errors,
+ tool_error_categories=dict(tool_error_categories),
+ tool_error_details=tuple(tool_error_details),
+ input_tokens=input_tokens,
+ output_tokens=output_tokens,
+ cache_read_tokens=cache_read_tokens,
+ cache_creation_tokens=cache_creation_tokens,
+ languages=dict(languages),
+ files_modified=len(files_modified),
+ lines_added=lines_added,
+ lines_removed=lines_removed,
+ git_commits=git_commits,
+ git_branch=git_branch,
+ user_interruptions=user_interruptions,
+ compactions=compactions,
+ subagents_spawned=subagents_spawned,
+ thinking_blocks=thinking_blocks,
+ web_searches=web_searches,
+ web_fetches=web_fetches,
+ permission_mode=permission_mode,
+ first_prompt=first_prompt,
+ codeflash=detect_codeflash(codeflash_agents, codeflash_skills, codeflash_commands, teams_created),
+ )
+
+
+def classify_error(tool_name: str, block: dict[str, Any], raw: dict[str, Any]) -> str:
+ """Classify a tool error into a category based on tool name and error content."""
+ tur = raw.get("toolUseResult", {})
+ stderr = ""
+ if isinstance(tur, dict):
+ stderr = tur.get("stderr", "")
+ error_text = (stderr or str(block.get("content", ""))).lower()
+
+ if tool_name == "Edit":
+ return "edit_failed"
+ if tool_name == "Bash":
+ return classify_bash_error(error_text)
+ if tool_name in ("Read", "Write"):
+ return classify_file_error(error_text)
+ return "tool_error"
+
+
+def classify_bash_error(error_text: str) -> str:
+ if "permission denied" in error_text:
+ return "permission_denied"
+ if "command not found" in error_text:
+ return "command_not_found"
+ return "command_failed"
+
+
+def classify_file_error(error_text: str) -> str:
+ if "not found" in error_text or "no such file" in error_text:
+ return "file_not_found"
+ return "file_error"
+
+
+def track_file_changes(
+ tool_name: str,
+ tool_input: dict[str, Any],
+ files: set[str],
+ languages: Counter[str],
+) -> None:
+ """Track which files were modified and what languages are involved."""
+ if tool_name not in EDIT_TOOLS:
+ return
+ fp = tool_input.get("file_path", "")
+ if not fp:
+ return
+ files.add(fp)
+ ext = Path(fp).suffix.lower()
+ lang = FILE_EXTENSIONS.get(ext)
+ if lang:
+ languages[lang] += 1
+
+
+def count_diff_lines(tool_name: str, tool_input: dict[str, Any]) -> tuple[int, int]:
+ """Estimate lines added/removed from Edit and Write tool inputs."""
+ if tool_name == "Edit":
+ old = tool_input.get("old_string", "")
+ new = tool_input.get("new_string", "")
+ if isinstance(old, str) and isinstance(new, str):
+ old_lines = old.count("\n") + (1 if old else 0)
+ new_lines = new.count("\n") + (1 if new else 0)
+ added = max(0, new_lines - old_lines)
+ removed = max(0, old_lines - new_lines)
+ return added, removed
+ if tool_name == "Write":
+ content = tool_input.get("content", "")
+ if isinstance(content, str):
+ return content.count("\n") + 1, 0
+ return 0, 0
+
+
+def detect_codeflash(
+ agents: set[str],
+ skills: set[str],
+ commands: set[str],
+ teams_created: int,
+) -> CodeflashSession | None:
+ """Build a CodeflashSession if any codeflash plugin signals were detected."""
+ if not agents and not skills and not commands:
+ return None
+
+ language = infer_language(agents)
+ domain = infer_domain(agents)
+
+ return CodeflashSession(
+ is_codeflash=True,
+ language=language,
+ agents_used=tuple(sorted(agents)),
+ skills_invoked=tuple(sorted(skills)),
+ commands_invoked=tuple(sorted(commands)),
+ teams_created=teams_created,
+ optimization_domain=domain,
+ has_researcher="codeflash-researcher" in agents,
+ has_reviewer="codeflash-review" in agents,
+ has_ci_handler=any(a.endswith("-ci") for a in agents),
+ has_pr_prep=any(a.endswith("-pr-prep") for a in agents),
+ )
+
+
+LANGUAGE_AGENT_MARKERS: dict[str, str] = {
+ "codeflash-python": "python",
+ "codeflash-javascript": "javascript",
+ "codeflash-java": "java",
+}
+
+
+def infer_language(agents: set[str]) -> str | None:
+ """Infer the target language from which language-specific agents were invoked."""
+ for marker, lang in LANGUAGE_AGENT_MARKERS.items():
+ if marker in agents:
+ return lang
+ for agent in agents:
+ if agent.startswith("codeflash-js-"):
+ return "javascript"
+ if agent.startswith("codeflash-java-"):
+ return "java"
+ return None
+
+
+DOMAIN_AGENT_SUFFIXES: dict[str, str] = {
+ "-cpu": "cpu",
+ "-memory": "memory",
+ "-async": "async",
+ "-structure": "structure",
+ "-deep": "deep",
+ "-bundle": "bundle",
+}
+
+
+def infer_domain(agents: set[str]) -> str | None:
+ """Infer the optimization domain from specialist agents used."""
+ for agent in agents:
+ for suffix, domain in DOMAIN_AGENT_SUFFIXES.items():
+ if agent.endswith(suffix):
+ return domain
+ return None
diff --git a/packages/blackbox/src/blackbox/cli.py b/packages/blackbox/src/blackbox/cli.py
new file mode 100644
index 0000000..91a743d
--- /dev/null
+++ b/packages/blackbox/src/blackbox/cli.py
@@ -0,0 +1,58 @@
+from __future__ import annotations
+
+import argparse
+import sys
+import webbrowser
+
+from danom import Err, safe
+
+
+@safe
+def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
+ parser = argparse.ArgumentParser(
+ prog="blackbox",
+ description="Flight data recorder for AI coding agent sessions",
+ )
+ subparsers = parser.add_subparsers(dest="command", required=True)
+
+ serve_parser = subparsers.add_parser("serve", help="Launch the session dashboard")
+ serve_parser.add_argument("--port", type=int, default=7100)
+ serve_parser.add_argument("--no-open", action="store_true", help="Don't open browser automatically")
+
+ return parser.parse_args(argv)
+
+
+@safe
+def run(args: argparse.Namespace) -> None:
+ if args.command == "serve":
+ run_serve(args)
+ else:
+ msg = f"Unknown command: {args.command}"
+ raise ValueError(msg)
+
+
+def run_serve(args: argparse.Namespace) -> None:
+ import uvicorn # noqa: PLC0415
+
+ from blackbox.dashboard.app import app # noqa: PLC0415
+
+ if not args.no_open:
+ import threading # noqa: PLC0415
+
+ def open_browser() -> None:
+ import time # noqa: PLC0415
+
+ time.sleep(1.0)
+ webbrowser.open(f"http://localhost:{args.port}")
+
+ threading.Thread(target=open_browser, daemon=True).start()
+
+ uvicorn.run(app, host="127.0.0.1", port=args.port, log_level="warning")
+
+
+def main() -> None:
+ args = parse_args().unwrap()
+ result = run(args)
+ if isinstance(result, Err):
+ print(f"Error: {result.error}", file=sys.stderr) # noqa: T201
+ sys.exit(1)
diff --git a/packages/blackbox/src/blackbox/dashboard/__init__.py b/packages/blackbox/src/blackbox/dashboard/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/packages/blackbox/src/blackbox/dashboard/app.py b/packages/blackbox/src/blackbox/dashboard/app.py
new file mode 100644
index 0000000..e76c5c0
--- /dev/null
+++ b/packages/blackbox/src/blackbox/dashboard/app.py
@@ -0,0 +1,37 @@
+"""FastAPI + HTMX dashboard for browsing Claude Code session transcripts."""
+
+from __future__ import annotations
+
+from contextlib import asynccontextmanager
+from pathlib import Path
+from typing import TYPE_CHECKING
+
+from fastapi import FastAPI
+
+from blackbox.dashboard.routes import PROJECTS_DIR, router
+from blackbox.dashboard.watcher import SessionWatcher
+
+if TYPE_CHECKING:
+ from collections.abc import AsyncIterator
+
+
+def create_app(projects_dir: Path | None = None) -> FastAPI:
+ """Create the dashboard FastAPI app, optionally overriding the projects directory."""
+ actual_dir = projects_dir or PROJECTS_DIR
+
+ @asynccontextmanager
+ async def lifespan(the_app: FastAPI) -> AsyncIterator[None]:
+ """Start/stop the session watcher around the app lifecycle."""
+ watcher = SessionWatcher(actual_dir)
+ watcher.start()
+ the_app.state.watcher = watcher
+ the_app.state.projects_dir = actual_dir
+ yield
+ watcher.stop()
+
+ application = FastAPI(title="blackbox", lifespan=lifespan)
+ application.include_router(router)
+ return application
+
+
+app = create_app()
diff --git a/packages/blackbox/src/blackbox/dashboard/rendering.py b/packages/blackbox/src/blackbox/dashboard/rendering.py
new file mode 100644
index 0000000..a561251
--- /dev/null
+++ b/packages/blackbox/src/blackbox/dashboard/rendering.py
@@ -0,0 +1,180 @@
+"""HTML rendering helpers for log entries."""
+
+from __future__ import annotations
+
+import re
+import time
+from datetime import UTC, datetime
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+ from blackbox.models import LogEntry
+
+FILTER_PRESETS: dict[str, set[str] | None] = {
+ "all": None,
+ "compact": None,
+ "important": {"status", "assistant", "error", "info"},
+ "errors": {"error"},
+}
+
+SKIP_LEVELS = {
+ "delta",
+ "stream",
+ "block_stop",
+ "block_start",
+ "thinking_delta",
+ "tool_start",
+}
+
+
+def fmt_time(ts: float) -> str:
+ return datetime.fromtimestamp(ts, tz=UTC).strftime("%H:%M:%S")
+
+
+def fmt_duration(started: float, finished: float | None) -> str:
+ end = finished or time.time()
+ secs = int(end - started)
+ if secs < 0:
+ return "0s"
+ if secs < 60:
+ return f"{secs}s"
+ mins, secs = divmod(secs, 60)
+ if mins < 60:
+ return f"{mins}m{secs:02d}s"
+ hrs, mins = divmod(mins, 60)
+ return f"{hrs}h{mins:02d}m"
+
+
+def fmt_relative(ts: float) -> str:
+ delta = time.time() - ts
+ if delta < 60:
+ return "just now"
+ if delta < 3600:
+ return f"{int(delta / 60)}m ago"
+ if delta < 86400:
+ return f"{int(delta / 3600)}h ago"
+ return f"{int(delta / 86400)}d ago"
+
+
+def passes_filter(
+ entry: LogEntry,
+ filter_name: str,
+ allowed: set[str] | None,
+) -> bool:
+ stripped = entry.message.strip()
+ if not stripped:
+ return False
+ if filter_name == "all":
+ return True
+ if entry.level in SKIP_LEVELS:
+ return False
+ if allowed is not None and entry.level not in allowed:
+ return False
+ return not (entry.level == "assistant" and stripped == "(thinking)")
+
+
+def esc(text: str) -> str:
+    return text.replace("&", "&amp;").replace("<", "&lt;").replace(">", "&gt;").replace("\n", " ")
+
+
+BOLD_RE = re.compile(r"\*\*(.+?)\*\*")
+
+
+def esc_md(text: str) -> str:
+ escaped = esc(text)
+    return BOLD_RE.sub(r"<strong>\1</strong>", escaped)
+
+
+def shorten_paths(text: str) -> str:
+ text = re.sub(r"/(?:private/)?tmp/[^\s\"']+/?", "", text)
+ return text
+
+
+SOURCE_CLASSES = {
+ "claude": "bg-blue-500",
+ "user": "bg-green-500",
+ "system": "bg-surface-700",
+}
+
+SOURCE_LABELS = {
+ "claude": "CLU",
+ "user": "USR",
+ "system": "SYS",
+}
+
+
+def tool_call_html(preview: str) -> str:
+ shortened = shorten_paths(preview)
+ lines = shortened.split("\n")
+ if len(lines) <= 3:
+ return f'{esc(shortened)} '
+ summary = esc(lines[0])
+ rest = esc("\n".join(lines[1:]))
+ return (
+ f'{summary}'
+ f'+{len(lines) - 1}'
+ f" lines "
+ f'{rest} '
+ f" "
+ )
+
+
+def render_log_html(entry: LogEntry) -> str: # noqa: C901, PLR0912
+ ts = fmt_time(entry.timestamp)
+ src_cls = SOURCE_CLASSES.get(entry.source, "bg-gray-600")
+ src_label = SOURCE_LABELS.get(entry.source, entry.source[:3].upper())
+
+ if entry.level == "tool_call":
+ tool = entry.data.get("tool", "tool")
+ preview = entry.data.get("input_preview", entry.message)
+ badge_cls = "bg-amber-500"
+ badge_label = esc(tool[:12])
+ elif entry.level == "tool_result":
+ badge_cls = "bg-gray-700"
+ badge_label = "RES"
+ else:
+ badge_cls = src_cls
+ badge_label = src_label
+
+ if entry.source == "user" and entry.level == "info":
+ msg = f'{esc_md(entry.message)} '
+ elif entry.level == "tool_call":
+ preview = entry.data.get("input_preview", "")
+ msg = tool_call_html(preview)
+ elif entry.level == "tool_result":
+ text = entry.message[:500]
+ if len(entry.message) > 500:
+ text += "..."
+ msg = f'{esc(shorten_paths(text))} '
+ elif entry.level == "assistant":
+ if entry.message.strip() == "(thinking)":
+ msg = f'{esc(entry.message)} '
+ else:
+ msg = f'{esc_md(entry.message)} '
+ elif entry.level == "error":
+ msg = f'{esc(entry.message)} '
+ else:
+ msg = f'{esc(entry.message)} '
+
+ extra_div_classes = ""
+ if entry.level == "assistant" and entry.message.strip() == "(thinking)":
+ extra_div_classes = " border-t border-surface-800 mt-2 pt-1"
+
+ is_tool = entry.level in ("tool_call", "tool_result")
+ opacity = " opacity-60" if is_tool else ""
+ indent = " pl-4" if is_tool else ""
+
+ return (
+ f'
'
+ f'{ts} '
+ f''
+ f"{badge_label} "
+ f'{msg} '
+ f"
"
+ )
diff --git a/packages/blackbox/src/blackbox/dashboard/routes.py b/packages/blackbox/src/blackbox/dashboard/routes.py
new file mode 100644
index 0000000..3b4abba
--- /dev/null
+++ b/packages/blackbox/src/blackbox/dashboard/routes.py
@@ -0,0 +1,170 @@
+"""Route handlers for the session dashboard."""
+
+from __future__ import annotations
+
+import asyncio
+from pathlib import Path
+from typing import TYPE_CHECKING
+
+import attrs
+from fastapi import APIRouter, Request
+from fastapi.responses import HTMLResponse
+from fastapi.templating import Jinja2Templates
+from sse_starlette.sse import EventSourceResponse, ServerSentEvent # type: ignore[attr-defined]
+
+from blackbox.analytics import extract_meta
+from blackbox.dashboard.rendering import (
+ FILTER_PRESETS,
+ fmt_duration,
+ fmt_relative,
+ fmt_time,
+ passes_filter,
+ render_log_html,
+)
+from blackbox.dashboard.transcript import parse_transcript, parse_transcript_tail, scan_sessions
+from blackbox.models import SessionInfo
+
+if TYPE_CHECKING:
+ from collections.abc import AsyncIterator
+
+ from blackbox.dashboard.watcher import SessionWatcher
+ from blackbox.models import LogEntry
+
+TEMPLATES_DIR = Path(__file__).parent / "templates"
+PROJECTS_DIR = Path.home() / ".claude" / "projects"
+
+HISTORY_BATCH = 200
+
+router = APIRouter()
+templates = Jinja2Templates(directory=str(TEMPLATES_DIR))
+templates.env.globals["fmt_time"] = fmt_time # type: ignore[assignment] # ty: ignore[invalid-assignment]
+templates.env.globals["fmt_duration"] = fmt_duration # type: ignore[assignment] # ty: ignore[invalid-assignment]
+templates.env.globals["fmt_relative"] = fmt_relative # type: ignore[assignment] # ty: ignore[invalid-assignment]
+
+
+def mark_live(sessions: list[SessionInfo], watcher: SessionWatcher) -> list[SessionInfo]:
+ live_ids = watcher.live_session_ids()
+ if not live_ids:
+ return sessions
+ return [attrs.evolve(s, is_live=True) if s.session_id in live_ids else s for s in sessions]
+
+
+@router.get("/", response_class=HTMLResponse)
+async def index(request: Request, session: str = "") -> HTMLResponse:
+ watcher: SessionWatcher = request.app.state.watcher
+ sessions = mark_live(watcher.get_sessions(scan_sessions), watcher)
+ return templates.TemplateResponse(
+ request,
+ "index.html",
+ context={"sessions": sessions, "selected_id": session},
+ )
+
+
+@router.get("/sessions", response_class=HTMLResponse)
+async def session_list(request: Request, selected: str = "") -> HTMLResponse:
+ watcher: SessionWatcher = request.app.state.watcher
+ sessions = mark_live(watcher.get_sessions(scan_sessions), watcher)
+ return templates.TemplateResponse(
+ request,
+ "partials/session_list.html",
+ context={"sessions": sessions, "selected_id": selected},
+ )
+
+
+@router.get("/sessions/{project_path}/{session_id}", response_class=HTMLResponse)
+async def session_detail(
+ request: Request,
+ project_path: str,
+ session_id: str,
+ filter: str = "compact",
+) -> HTMLResponse:
+ projects_dir: Path = request.app.state.projects_dir
+ transcript_path = projects_dir / project_path / f"{session_id}.jsonl"
+ if not transcript_path.exists():
+ return HTMLResponse(
+ 'Session not found
',
+ )
+ info = build_session_info(transcript_path, session_id, project_path)
+ meta = extract_meta(transcript_path)
+ return templates.TemplateResponse(
+ request,
+ "partials/session_detail.html",
+ context={
+ "session": info,
+ "meta": meta,
+ "filter": filter,
+ "filters": list(FILTER_PRESETS.keys()),
+ },
+ )
+
+
+def build_session_info(path: Path, session_id: str, project_path: str) -> SessionInfo:
+ """Build a SessionInfo from a transcript file for the detail view."""
+ from blackbox.dashboard.transcript import decode_project_name, quick_session_info # noqa: PLC0415
+
+ info = quick_session_info(path, session_id, project_path, decode_project_name(project_path))
+ if info:
+ return info
+ return SessionInfo(
+ session_id=session_id,
+ project_path=project_path,
+ project_name=project_path,
+ transcript_path=str(path),
+ started_at=path.stat().st_mtime,
+ )
+
+
+def filter_and_render(entries: list[LogEntry], filter_name: str, allowed: set[str] | None) -> list[str]:
+ return [
+ html
+ for entry in entries
+ if passes_filter(entry, filter_name, allowed)
+ for html in [render_log_html(entry)]
+ if html
+ ]
+
+
+async def log_stream(
+ transcript_path: Path,
+ filter_name: str,
+) -> AsyncIterator[ServerSentEvent]:
+ allowed = FILTER_PRESETS.get(filter_name)
+ entries = await asyncio.to_thread(parse_transcript, transcript_path)
+
+ batch: list[str] = []
+ for entry in entries:
+ if passes_filter(entry, filter_name, allowed):
+ html = render_log_html(entry)
+ if html:
+ batch.append(html)
+ if len(batch) >= HISTORY_BATCH:
+ yield ServerSentEvent(data="\n".join(batch), event="log")
+ batch = []
+ if batch:
+ yield ServerSentEvent(data="\n".join(batch), event="log")
+
+ offset = transcript_path.stat().st_size
+ while True:
+ await asyncio.sleep(1.0)
+ try:
+ current_size = transcript_path.stat().st_size
+ except OSError:
+ break
+ if current_size <= offset:
+ continue
+ new_entries, offset = await asyncio.to_thread(parse_transcript_tail, transcript_path, offset)
+ rendered = filter_and_render(new_entries, filter_name, allowed)
+ if rendered:
+ yield ServerSentEvent(data="\n".join(rendered), event="log")
+
+
+@router.get("/sessions/{project_path}/{session_id}/logs")
+async def session_logs(
+ request: Request,
+ project_path: str,
+ session_id: str,
+ filter: str = "compact",
+) -> EventSourceResponse:
+ projects_dir: Path = request.app.state.projects_dir
+ transcript_path = projects_dir / project_path / f"{session_id}.jsonl"
+ return EventSourceResponse(log_stream(transcript_path, filter))
diff --git a/packages/blackbox/src/blackbox/dashboard/templates/base.html b/packages/blackbox/src/blackbox/dashboard/templates/base.html
new file mode 100644
index 0000000..8c934d6
--- /dev/null
+++ b/packages/blackbox/src/blackbox/dashboard/templates/base.html
@@ -0,0 +1,108 @@
+
+
+
+
+
+ Blackbox
+
+
+
+
+
+
+
+
+
+ {% block content %}{% endblock %}
+
+
+
diff --git a/packages/blackbox/src/blackbox/dashboard/templates/index.html b/packages/blackbox/src/blackbox/dashboard/templates/index.html
new file mode 100644
index 0000000..4e2c524
--- /dev/null
+++ b/packages/blackbox/src/blackbox/dashboard/templates/index.html
@@ -0,0 +1,98 @@
+{% extends "base.html" %}
+{% block content %}
+
+
+
+
+
+
+
+ {% if selected_id %}
+
+
+ {% else %}
+
+
+
+
+
+
+
+
+
+
Select a session to review
+
sessions appear as Claude Code writes them
+
+
+ {% endif %}
+
+
+
+{% endblock %}
diff --git a/packages/blackbox/src/blackbox/dashboard/templates/partials/session_detail.html b/packages/blackbox/src/blackbox/dashboard/templates/partials/session_detail.html
new file mode 100644
index 0000000..9d20673
--- /dev/null
+++ b/packages/blackbox/src/blackbox/dashboard/templates/partials/session_detail.html
@@ -0,0 +1,189 @@
+
+
+
+
+
+
+ {{ session.project_name }}
+
+ {{ session.session_id[:8] }}
+
+
+ {{ fmt_time(session.started_at) }}
+ {% if session.finished_at %}
+ {{ fmt_duration(session.started_at, session.finished_at) }}
+ {% else %}
+
+
+ active
+
+ {% endif %}
+ {% if session.message_count %}
+ {{ session.message_count }} messages
+ {% endif %}
+
+ {% if session.first_prompt %}
+
+ {{ session.first_prompt }}
+
+ {% endif %}
+
+
+
+
+
+ {% for f in filters %}
+
+ {{ f | capitalize }}
+
+ {% endfor %}
+
+
+
+{% if meta %}
+
+
+
+
+
+
+ analytics
+ {{ "{:,}".format(meta.input_tokens + meta.output_tokens) }} tokens / {{ meta.tool_calls }} tools{% if meta.tool_errors %} ({{ meta.tool_errors }} err) {% endif %}
+
+
+ {% if meta.codeflash %}
+
+
+ codeflash
+
+ {% if meta.codeflash.language %}
+
+ {{ meta.codeflash.language }}
+
+ {% endif %}
+ {% if meta.codeflash.optimization_domain %}
+
+ {{ meta.codeflash.optimization_domain }}
+
+ {% endif %}
+ {% if meta.codeflash.has_researcher %}
+ researcher
+ {% endif %}
+ {% if meta.codeflash.has_reviewer %}
+ reviewer
+ {% endif %}
+ {% if meta.codeflash.has_ci_handler %}
+ CI
+ {% endif %}
+ {% if meta.codeflash.has_pr_prep %}
+ PR prep
+ {% endif %}
+
+ {% endif %}
+
+
+
+
tokens
+
{{ "{:,}".format(meta.input_tokens) }} in / {{ "{:,}".format(meta.output_tokens) }} out
+
+
+
tools
+
{{ meta.tool_calls }} calls
+ {% if meta.tool_errors %}({{ meta.tool_errors }} errors) {% endif %}
+
+
+
+
messages
+
{{ meta.user_messages }} user / {{ meta.assistant_messages }} assistant
+
+ {% if meta.files_modified %}
+
+
files
+
{{ meta.files_modified }} modified +{{ meta.lines_added }} /-{{ meta.lines_removed }}
+
+ {% endif %}
+ {% if meta.git_commits %}
+
+
git
+
{{ meta.git_commits }} commits{% if meta.git_branch %} on {{ meta.git_branch }}{% endif %}
+
+ {% endif %}
+ {% if meta.cache_read_tokens %}
+
+
cache
+
{{ "{:.0%}".format(meta.cache_hit_rate) }} hit rate
+
+ {% endif %}
+ {% if meta.compactions %}
+
+
compactions
+
{{ meta.compactions }}
+
+ {% endif %}
+ {% if meta.thinking_blocks %}
+
+
thinking
+
{{ meta.thinking_blocks }} blocks
+
+ {% endif %}
+ {% if meta.subagents_spawned %}
+
+
subagents
+
{{ meta.subagents_spawned }}
+
+ {% endif %}
+ {% if meta.web_searches or meta.web_fetches %}
+
+
web
+
{{ meta.web_searches }} searches / {{ meta.web_fetches }} fetches
+
+ {% endif %}
+ {% if meta.user_interruptions %}
+
+
interruptions
+
{{ meta.user_interruptions }}
+
+ {% endif %}
+ {% if meta.permission_mode %}
+
+
mode
+
{{ meta.permission_mode }}
+
+ {% endif %}
+
+
+ {% if meta.tool_counts %}
+
+ {% for tool, count in meta.tool_counts|dictsort(by='value', reverse=true) %}
+ {% if loop.index <= 8 %}
+
+ {{ tool }}={{ count }}
+
+ {% endif %}
+ {% endfor %}
+
+ {% endif %}
+
+
+{% endif %}
+
+
+
+
diff --git a/packages/blackbox/src/blackbox/dashboard/templates/partials/session_list.html b/packages/blackbox/src/blackbox/dashboard/templates/partials/session_list.html
new file mode 100644
index 0000000..9b69057
--- /dev/null
+++ b/packages/blackbox/src/blackbox/dashboard/templates/partials/session_list.html
@@ -0,0 +1,49 @@
+{% for s in sessions %}
+
+
+
+ {% if s.is_live %}
+
+ Live
+ {% else %}
+
+ {% endif %}
+
+ {{ fmt_relative(s.started_at) }}
+ {% if s.finished_at %}
+ · {{ fmt_duration(s.started_at, s.finished_at) }}
+ {% endif %}
+
+
+
+
+ {{ s.project_name }}
+
+
+ {% if s.first_prompt %}
+
+ {{ s.first_prompt }}
+
+ {% endif %}
+
+
+ {{ s.session_id[:8] }}
+ {% if s.message_count %}
+ {{ s.message_count }} msgs
+ {% endif %}
+
+
+
+{% endfor %}
+
+{% if not sessions %}
+
+
No sessions found
+
Waiting for Claude Code sessions...
+
+{% endif %}
diff --git a/packages/blackbox/src/blackbox/dashboard/transcript.py b/packages/blackbox/src/blackbox/dashboard/transcript.py
new file mode 100644
index 0000000..8c18411
--- /dev/null
+++ b/packages/blackbox/src/blackbox/dashboard/transcript.py
@@ -0,0 +1,279 @@
+"""Parse Claude Code .jsonl transcripts into LogEntry objects."""
+
+from __future__ import annotations
+
+import json
+from pathlib import Path
+from typing import Any
+
+from blackbox.models import LogEntry, SessionInfo
+
+
+def ts_to_epoch(ts: str | None) -> float:
+ if not ts:
+ return 0.0
+ from datetime import UTC, datetime # noqa: PLC0415
+
+ try:
+ dt = datetime.fromisoformat(ts)
+ return dt.replace(tzinfo=UTC if dt.tzinfo is None else dt.tzinfo).timestamp()
+ except (ValueError, AttributeError):
+ return 0.0
+
+
def extract_text_content(content: Any) -> str:
    """Pull plain text out of a message ``content`` field.

    Accepts either a bare string or the block-list form used by the API;
    anything else yields an empty string.
    """
    if isinstance(content, str):
        return content
    if not isinstance(content, list):
        return ""
    texts: list[str] = []
    for block in content:
        if isinstance(block, dict) and block.get("type") == "text":
            texts.append(block.get("text", ""))
    return "\n".join(texts)
+
+
def extract_tool_uses(content: Any) -> list[dict[str, Any]]:
    """Return every ``tool_use`` block from a content list (else [])."""
    found: list[dict[str, Any]] = []
    if isinstance(content, list):
        for candidate in content:
            if isinstance(candidate, dict) and candidate.get("type") == "tool_use":
                found.append(candidate)
    return found
+
+
def extract_tool_results(content: Any) -> list[dict[str, Any]]:
    """Return every ``tool_result`` block from a content list (else [])."""
    found: list[dict[str, Any]] = []
    if isinstance(content, list):
        for candidate in content:
            if isinstance(candidate, dict) and candidate.get("type") == "tool_result":
                found.append(candidate)
    return found
+
+
def parse_transcript(path: Path) -> list[LogEntry]:
    """Parse a Claude Code .jsonl transcript into a list of LogEntry objects.

    Blank and malformed lines are silently skipped so a transcript that is
    still being written can be parsed best-effort.
    """
    entries: list[LogEntry] = []
    text = path.read_text()
    for line in text.splitlines():
        if not line.strip():
            continue
        try:
            payload = json.loads(line)
        except json.JSONDecodeError:
            continue
        entries.extend(parse_entry(payload))
    return entries
+
+
def parse_transcript_tail(path: Path, offset: int) -> tuple[list[LogEntry], int]:
    """Parse only new bytes appended after *offset*. Returns (entries, new_offset).

    Only complete lines (terminated by a newline) are consumed. Previously the
    offset advanced past the entire read, so a half-written trailing JSON line
    from a live writer was skipped by the JSONDecodeError handler and then
    permanently lost; now it is left unconsumed and re-read on the next poll.
    """
    with path.open("rb") as f:
        f.seek(offset)
        tail = f.read()

    # Trim back to the last newline so a partial final line is not consumed.
    last_newline = tail.rfind(b"\n")
    if last_newline == -1:
        return [], offset
    tail = tail[: last_newline + 1]
    new_offset = offset + len(tail)

    entries: list[LogEntry] = []
    for line in tail.decode("utf-8", errors="replace").splitlines():
        if not line.strip():
            continue
        try:
            raw = json.loads(line)
        except json.JSONDecodeError:
            continue  # malformed line: skip, but the offset already covers it
        entries.extend(parse_entry(raw))
    return entries, new_offset
+
+
def parse_entry(raw: dict[str, Any]) -> list[LogEntry]:
    """Convert one raw transcript record into zero or more LogEntry objects.

    Returns an empty list for system records without text AND for any
    unrecognized record type (e.g. "summary"), so callers can always
    ``extend`` the result without a None check.
    """
    entry_type = raw.get("type", "")
    ts = ts_to_epoch(raw.get("timestamp"))
    message = raw.get("message", {})

    if entry_type == "user":
        return parse_user_entry(ts, message, raw)
    if entry_type == "assistant":
        return parse_assistant_entry(ts, message)
    if entry_type == "system":
        text = extract_text_content(message.get("content", "")) if isinstance(message, dict) else str(message)
        if text:
            return [LogEntry(timestamp=ts, source="system", level="info", message=text)]
    # Explicit fallthrough: unknown types and empty system messages yield [].
    return []
+
+
def parse_user_entry(ts: float, message: Any, raw: dict[str, Any]) -> list[LogEntry]:
    """Render a "user" transcript record.

    A record carrying tool_result blocks becomes one entry per result
    (attributed to "claude", since the results echo tool output); otherwise
    any plain text becomes a single "user" entry.
    """
    if not isinstance(message, dict):
        return []
    content = message.get("content", "")

    tool_results = extract_tool_results(content)
    if not tool_results:
        # Plain human prompt (or nothing renderable at all).
        text = extract_text_content(content)
        if not text:
            return []
        return [LogEntry(timestamp=ts, source="user", level="info", message=text)]

    # toolUseResult (when present) carries the raw stdout/stderr of the call.
    tool_use_result = raw.get("toolUseResult", {})
    if not isinstance(tool_use_result, dict):
        tool_use_result = {}
    stdout = tool_use_result.get("stdout", "")
    stderr = tool_use_result.get("stderr", "")

    entries: list[LogEntry] = []
    for result in tool_results:
        body = result.get("content", "")
        if isinstance(body, list):
            body = " ".join(b.get("text", "") for b in body if isinstance(b, dict))
        failed = result.get("is_error", False)
        display = stdout or body or ""
        if failed and stderr:
            display = stderr
        entries.append(
            LogEntry(
                timestamp=ts,
                source="claude",
                level="error" if failed else "tool_result",
                message=display[:2000],  # cap huge tool output for display
            )
        )
    return entries
+
+
def parse_assistant_entry(ts: float, message: Any) -> list[LogEntry]:
    """Render an "assistant" transcript record into log entries.

    Text blocks become "assistant" entries, tool_use blocks become
    "tool_call" entries with a short input preview, and thinking blocks
    are collapsed to a "(thinking)" marker.
    """
    if not isinstance(message, dict):
        return []
    content = message.get("content", [])
    if not isinstance(content, list):
        # Non-list content is shown verbatim, or dropped when falsy.
        if not content:
            return []
        return [LogEntry(timestamp=ts, source="claude", level="assistant", message=str(content))]

    entries: list[LogEntry] = []
    for block in content:
        if not isinstance(block, dict):
            continue
        kind = block.get("type", "")
        if kind == "text":
            body = block.get("text", "")
            if body:
                entries.append(LogEntry(timestamp=ts, source="claude", level="assistant", message=body))
        elif kind == "thinking":
            entries.append(LogEntry(timestamp=ts, source="claude", level="assistant", message="(thinking)"))
        elif kind == "tool_use":
            name = block.get("name", "tool")
            preview = tool_input_preview(name, block.get("input", {}))
            entries.append(
                LogEntry(
                    timestamp=ts,
                    source="claude",
                    level="tool_call",
                    message=f"{name}: {preview}",
                    data={"tool": name, "input_preview": preview},
                )
            )
    return entries
+
+
def tool_input_preview(tool_name: str, tool_input: dict[str, Any]) -> str:
    """Build a short one-line preview of a tool invocation's input.

    Known tools show their most meaningful field; anything else falls back
    to a truncated JSON dump of the whole input.
    """
    if tool_name in ("Read", "Write"):
        return str(tool_input.get("file_path", ""))
    if tool_name == "Bash":
        return str(tool_input.get("command", ""))
    if tool_name == "Skill":
        return str(tool_input.get("skill", ""))
    if tool_name == "Agent":
        value = tool_input.get("description", tool_input.get("prompt", ""))
        return str(value)[:200]
    if tool_name == "Edit":
        snippet = str(tool_input.get("old_string", ""))[:80]
        return f"{tool_input.get('file_path', '')}\n{snippet}..."
    return json.dumps(tool_input, default=str)[:200]
+
+
def scan_sessions(projects_dir: Path) -> list[SessionInfo]:
    """Scan ~/.claude/projects/ for session transcripts.

    Walks each project directory, builds lightweight SessionInfo records
    via quick_session_info, and returns them newest-first by start time.
    Returns [] when the projects directory does not exist.
    """
    sessions: list[SessionInfo] = []
    if not projects_dir.is_dir():
        return sessions

    for project_dir in sorted(projects_dir.iterdir()):
        if not project_dir.is_dir():
            continue
        project_name = decode_project_name(project_dir.name)
        # Newest transcript first within a project. Because the final sort
        # below is stable, this inner order also tie-breaks sessions with
        # equal started_at — do not remove it as "redundant".
        # NOTE(review): p.stat() can raise if a file disappears between the
        # glob and the stat — confirm whether that race matters in practice.
        for jsonl in sorted(project_dir.glob("*.jsonl"), key=lambda p: p.stat().st_mtime, reverse=True):
            session_id = jsonl.stem
            info = quick_session_info(jsonl, session_id, project_dir.name, project_name)
            if info:
                sessions.append(info)

    sessions.sort(key=lambda s: s.started_at, reverse=True)
    return sessions
+
+
def decode_project_name(encoded: str) -> str:
    """Turn a dash-encoded project directory name into a short display name.

    Claude Code encodes an absolute path by replacing "/" with "-", so
    encoded names begin with a dash. Boilerplate path components are
    dropped and at most the last two remaining components are joined with
    "/". Names that don't look path-encoded come back unchanged.
    """
    parts = encoded.split("-")
    if len(parts) < 2 or parts[0] != "":
        return encoded
    boring = ("Users", "private", "tmp", "")
    meaningful = [piece for piece in parts if piece not in boring]
    if not meaningful:
        return encoded
    if len(meaningful) == 1:
        return meaningful[0]
    return "/".join(meaningful[-2:])
+
+
def quick_session_info(  # noqa: C901, PLR0912
    path: Path,
    session_id: str,
    encoded_project: str,
    project_name: str,
) -> SessionInfo | None:
    """Read just enough of the transcript to build sidebar metadata.

    Scans at most ~500 lines, so message_count and first_prompt reflect the
    head of long transcripts only. Returns None when the file is unreadable
    or contains no timestamped entries.
    """
    first_prompt = ""
    started_at = 0.0
    finished_at = 0.0
    message_count = 0
    cwd = ""

    try:
        with path.open() as f:
            for i, line in enumerate(f):
                if not line.strip():
                    continue
                try:
                    raw = json.loads(line)
                except json.JSONDecodeError:
                    # Tolerate half-written or corrupt lines from live writers.
                    continue
                # Track the earliest and latest timestamps seen so far.
                ts = ts_to_epoch(raw.get("timestamp"))
                if ts and (started_at == 0.0 or ts < started_at):
                    started_at = ts
                if ts and ts > finished_at:
                    finished_at = ts

                if raw.get("type") == "user":
                    message_count += 1
                    msg = raw.get("message", {})
                    if isinstance(msg, dict) and not first_prompt:
                        content = msg.get("content", "")
                        text = extract_text_content(content)
                        # Skip records that carry tool_result blocks: the
                        # first *human* prompt is wanted, not a tool echo.
                        if text and not any(
                            isinstance(b, dict) and b.get("type") == "tool_result"
                            for b in (content if isinstance(content, list) else [])
                        ):
                            first_prompt = text[:120]
                    if not cwd:
                        cwd = raw.get("cwd", "")

                if i > 500:
                    # Cap the scan; sidebar metadata doesn't need the full file.
                    break
    except OSError:
        return None

    if started_at == 0.0:
        # No parseable timestamped entries — not a usable session.
        return None

    # Use file mtime for finished_at — always accurate even for resumed
    # sessions, and avoids reading the entire file for long transcripts.
    try:
        mtime = path.stat().st_mtime
        if mtime > started_at:
            finished_at = mtime
    except OSError:
        pass

    # Prefer the session's actual cwd (last two components) for display;
    # fall back to the name decoded from the directory.
    display_name = project_name
    if cwd:
        parts = Path(cwd).parts
        if len(parts) >= 2:
            display_name = "/".join(parts[-2:])

    return SessionInfo(
        session_id=session_id,
        project_path=encoded_project,
        project_name=display_name,
        transcript_path=str(path),
        started_at=started_at,
        # finished_at only makes sense after the start; otherwise treat as ongoing.
        finished_at=finished_at if finished_at > started_at else None,
        first_prompt=first_prompt,
        message_count=message_count,
    )
diff --git a/packages/blackbox/src/blackbox/dashboard/watcher.py b/packages/blackbox/src/blackbox/dashboard/watcher.py
new file mode 100644
index 0000000..b96497f
--- /dev/null
+++ b/packages/blackbox/src/blackbox/dashboard/watcher.py
@@ -0,0 +1,85 @@
+"""Watchdog-based live session discovery for ~/.claude/projects/."""
+
+from __future__ import annotations
+
+import threading
+import time
+from collections.abc import Callable
+from pathlib import Path
+from typing import TYPE_CHECKING, Any
+
+from watchdog.events import FileSystemEventHandler
+from watchdog.observers import Observer
+
+if TYPE_CHECKING:
+ from watchdog.events import FileSystemEvent
+
+ from blackbox.models import SessionInfo
+
+LIVE_THRESHOLD_S = 30.0
+
+
class SessionWatcher(FileSystemEventHandler):
    """Watches the Claude Code projects directory for transcript changes.

    Tracks which session files have been recently modified so the dashboard
    can mark them as "live" in the sidebar. Also caches the session list
    and invalidates it when any transcript file changes.
    """

    def __init__(self, projects_dir: Path) -> None:
        self._projects_dir = projects_dir
        self._lock = threading.Lock()
        # session_id -> last observed modification time (epoch seconds).
        self._last_modified: dict[str, float] = {}
        self._observer: Any = None
        self._cached_sessions: list[SessionInfo] | None = None
        # Monotonic counter bumped on every invalidation; lets get_sessions()
        # detect invalidations that raced with an in-flight scan.
        self._generation = 0

    def start(self) -> None:
        """Start the observer thread; no-op if the directory doesn't exist."""
        if not self._projects_dir.is_dir():
            return
        self._observer = Observer()
        self._observer.schedule(self, str(self._projects_dir), recursive=True)
        self._observer.daemon = True
        self._observer.start()

    def stop(self) -> None:
        """Stop the observer (bounded join so shutdown can't hang)."""
        if self._observer is not None:
            self._observer.stop()
            self._observer.join(timeout=2.0)
            self._observer = None

    def on_modified(self, event: FileSystemEvent) -> None:
        """Record activity for a .jsonl transcript and invalidate the cache."""
        if event.is_directory:
            return
        path = Path(str(event.src_path))
        if path.suffix != ".jsonl":
            return
        session_id = path.stem
        with self._lock:
            self._last_modified[session_id] = time.time()
            self._cached_sessions = None
            self._generation += 1

    def on_created(self, event: FileSystemEvent) -> None:
        """A new transcript counts as a modification."""
        self.on_modified(event)

    def on_deleted(self, event: FileSystemEvent) -> None:
        """Invalidate the cache when a transcript is removed."""
        if not event.is_directory and Path(str(event.src_path)).suffix == ".jsonl":
            with self._lock:
                self._cached_sessions = None
                self._generation += 1

    def live_session_ids(self) -> set[str]:
        """Return ids modified within LIVE_THRESHOLD_S, pruning stale entries."""
        now = time.time()
        with self._lock:
            expired = [sid for sid, ts in self._last_modified.items() if now - ts > LIVE_THRESHOLD_S]
            for sid in expired:
                del self._last_modified[sid]
            return {sid for sid, ts in self._last_modified.items() if now - ts <= LIVE_THRESHOLD_S}

    def get_sessions(self, scan_fn: Callable[[Path], list[SessionInfo]]) -> list[SessionInfo]:
        """Return the cached session list, rescanning when invalidated.

        The scan runs outside the lock (it does file I/O). The generation
        check ensures a scan result is NOT cached when an invalidation
        arrived mid-scan — previously that race could pin a stale list in
        the cache until the next filesystem event.
        """
        with self._lock:
            if self._cached_sessions is not None:
                return self._cached_sessions
            generation = self._generation
        sessions = scan_fn(self._projects_dir)
        with self._lock:
            if self._generation == generation:
                self._cached_sessions = sessions
        return sessions
diff --git a/packages/blackbox/src/blackbox/formatting.py b/packages/blackbox/src/blackbox/formatting.py
new file mode 100644
index 0000000..203989c
--- /dev/null
+++ b/packages/blackbox/src/blackbox/formatting.py
@@ -0,0 +1,266 @@
+"""Text formatting for analytics models."""
+
+from __future__ import annotations
+
+import json
+
+import attrs
+
+from blackbox.models import (
+ CodeflashSession,
+ ProjectStats,
+ Recommendation,
+ SessionAudit,
+ SessionDigest,
+ SessionMeta,
+ arrow,
+ sparkline,
+)
+
+
+class MetaFormatter:
+ """Formats a SessionMeta for display."""
+
+ def __init__(self, meta: SessionMeta) -> None:
+ self.meta = meta
+
+ def summary(self) -> str:
+ """Format as a human-readable summary."""
+ m = self.meta
+ lines = [
+ f"Session {m.session_id[:8]} ({m.duration_minutes:.0f}min):",
+ f" Messages: {m.user_messages} user / {m.assistant_messages} assistant",
+ f" Tools: {m.tool_calls} calls ({m.tool_errors} errors)",
+ f" Tokens: {m.input_tokens:,} in / {m.output_tokens:,} out (cache hit {m.cache_hit_rate:.0%})",
+ ]
+ if m.git_commits:
+ lines.append(f" Git: {m.git_commits} commits on {m.git_branch or 'unknown'}")
+ if m.files_modified:
+ lines.append(f" Files: {m.files_modified} modified (+{m.lines_added}/-{m.lines_removed})")
+ if m.compactions:
+ lines.append(f" Compactions: {m.compactions}")
+ if m.user_interruptions:
+ lines.append(f" Interruptions: {m.user_interruptions}")
+ if m.thinking_blocks:
+ lines.append(f" Thinking blocks: {m.thinking_blocks}")
+ if m.web_searches or m.web_fetches:
+ lines.append(f" Web: {m.web_searches} searches / {m.web_fetches} fetches")
+ if m.permission_mode:
+ lines.append(f" Permission mode: {m.permission_mode}")
+ top = sorted(m.tool_counts.items(), key=lambda x: x[1], reverse=True)[:5]
+ if top:
+ lines.append(f" Top tools: {', '.join(f'{n}={c}' for n, c in top)}")
+ return "\n".join(lines)
+
+
class AuditFormatter:
    """Formats a SessionAudit for display."""

    def __init__(self, audit: SessionAudit) -> None:
        self.audit = audit

    def summary(self) -> str:
        """Format as a human-readable summary; optional lines appear only
        when the corresponding audit field is non-empty."""
        a = self.audit
        out = [
            f"Audit for {a.session_id[:8]}:",
            f" Outcome: {a.outcome} | Satisfaction: {a.satisfaction}",
            f" Type: {a.session_type}",
        ]
        # Goals and friction share the same "top 3 by count" rendering.
        for label, counts in (("Goals", a.goal_categories), ("Friction", a.friction_counts)):
            if counts:
                ranked = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)[:3]
                out.append(f" {label}: {', '.join(f'{k}({v})' for k, v in ranked)}")
        if a.user_instructions:
            out.append(f" Instructions: {len(a.user_instructions)} extracted")
        if a.summary:
            out.append(f" Summary: {a.summary[:120]}")
        return "\n".join(out)
+
+
class RecommendationFormatter:
    """Formats a Recommendation for display."""

    def __init__(self, rec: Recommendation) -> None:
        self.rec = rec

    def summary(self) -> str:
        """Format as a human-readable summary: suggestion, then evidence."""
        rec = self.rec
        return "\n".join([rec.suggestion, f" Evidence: {rec.evidence}"])
+
+
class ProjectFormatter:
    """Formats a ProjectStats for display."""

    def __init__(self, project: ProjectStats) -> None:
        self.project = project

    def summary(self) -> str:
        """Format as a one-line headline plus optional error/friction lines.

        Outlier projects get a " [!]" marker after the name.
        """
        p = self.project
        outlier_marker = " [!]" if p.is_outlier else ""
        header = (
            f"{p.project_name}{outlier_marker}: {p.session_count} sessions, "
            f"{p.success_rate:.0%} success, "
            f"{p.avg_tool_errors:.1f} errors/session, "
            f"{p.avg_duration_s / 60:.0f}min avg"
        )
        out = [header]
        for label, pairs in (("Errors", p.top_error_categories), ("Friction", p.top_friction)):
            if pairs:
                out.append(f" {label}: {' '.join(f'{n}({c})' for n, c in pairs[:3])}")
        return "\n".join(out)
+
+
class DigestFormatter:
    """Formats a SessionDigest for display.

    summary() concatenates four sections (overview, trends, projects,
    recommendations); each render_* method appends to a shared ``lines``
    list and silently skips itself when its data is absent.
    """

    def __init__(self, digest: SessionDigest) -> None:
        self.digest = digest

    def summary(self) -> str:
        """Format as a human-readable summary."""
        lines: list[str] = []
        self.render_overview(lines)
        self.render_trends(lines)
        self.render_projects(lines)
        self.render_recommendations(lines)
        return "\n".join(lines)

    def to_json(self) -> str:
        """Serialize to JSON (nested attrs models become plain dicts)."""
        return json.dumps(attrs.asdict(self.digest), indent=2, default=str)

    def render_overview(self, lines: list[str]) -> None:
        """Render the overview section: headline averages plus distributions."""
        d = self.digest
        lines.append(f"Session Digest ({d.session_count} sessions)")
        lines.append("")
        lines.append(
            f" {d.success_rate:.0%} success rate | "
            f"{d.avg_duration_s / 60:.1f}min avg | "
            f"{d.avg_tool_errors:.1f} errors/session"
        )
        lines.append(f" Avg tokens: {d.avg_input_tokens:,.0f} in / {d.avg_output_tokens:,.0f} out")
        lines.append(f" Avg tool calls: {d.avg_tool_calls:.1f}")
        if d.outcome_distribution:
            lines.append("")
            lines.append("Outcomes:")
            for outcome, count in sorted(d.outcome_distribution.items(), key=lambda x: x[1], reverse=True):
                # max(..., 1) guards division by zero on an empty digest.
                pct = count / max(d.session_count, 1) * 100
                lines.append(f" {outcome}: {count} ({pct:.0f}%)")
        if d.satisfaction_distribution:
            lines.append("")
            lines.append("Satisfaction:")
            for sat, count in sorted(d.satisfaction_distribution.items(), key=lambda x: x[1], reverse=True):
                pct = count / max(d.session_count, 1) * 100
                lines.append(f" {sat}: {count} ({pct:.0f}%)")
        if d.top_friction:
            lines.append("")
            lines.append("Top friction:")
            for name, count in d.top_friction[:5]:
                lines.append(f" {name}: {count}")

    def render_trends(self, lines: list[str]) -> None:
        """Render the trends section (skipped entirely when no weekly data)."""
        d = self.digest
        if not d.weeks:
            return
        lines.append("")
        lines.append("Trends")
        # invert=True on error/duration arrows: for those metrics a decrease
        # is an improvement, so the glyph direction is flipped.
        lines.append(
            f" Success rate: {d.rolling_success_rate:.0%} avg "
            f"({arrow(d.success_rate_change)} {d.success_rate_change:+.0%})"
        )
        lines.append(
            f" Error rate: {d.rolling_error_rate:.1f}/session avg "
            f"({arrow(d.error_rate_change, invert=True)} {d.error_rate_change:+.1f})"
        )
        lines.append(
            f" Duration: {d.rolling_duration_s / 60:.0f}min avg "
            f"({arrow(d.duration_change, invert=True)} {d.duration_change / 60:+.0f}min)"
        )
        if len(d.weeks) >= 2:
            # Sparklines need at least two points to show a trend.
            lines.append(
                f" Success: [{sparkline([w.success_rate for w in d.weeks])}] "
                f"Errors: [{sparkline([w.avg_errors_per_session for w in d.weeks])}]"
            )
        if d.error_category_deltas:
            lines.append("")
            lines.append(" Error category trends:")
            for cat, pct, rolling, latest_count in d.error_category_deltas:
                lines.append(
                    f" {cat}: {arrow(pct, invert=True)} {pct:+.0%} ({rolling:.0f}/wk -> {latest_count:.0f})"
                )
        lines.append("")
        lines.append(" Weekly breakdown:")
        lines.extend(
            f" {w.week}: {w.session_count} sessions, "
            f"{w.success_rate:.0%} success, "
            f"{w.avg_errors_per_session:.1f} errors, "
            f"{w.avg_duration_s / 60:.0f}min avg"
            for w in d.weeks
        )

    def render_projects(self, lines: list[str]) -> None:
        """Render the projects section (skipped when no project stats)."""
        d = self.digest
        if not d.projects:
            return
        lines.append("")
        lines.append(f"Projects ({len(d.projects)})")
        lines.extend(f" {ProjectFormatter(p).summary()}" for p in d.projects)

    def render_recommendations(self, lines: list[str]) -> None:
        """Render the numbered recommendations section (skipped when empty)."""
        d = self.digest
        if not d.recommendations:
            return
        lines.append("")
        lines.append("Recommendations")
        for i, rec in enumerate(d.recommendations, 1):
            lines.append(f" {i}. {RecommendationFormatter(rec).summary()}")
+
+
class CodeflashFormatter:
    """Formats a CodeflashSession for display."""

    def __init__(self, cf: CodeflashSession) -> None:
        self.cf = cf

    def summary(self) -> str:
        """Format as a human-readable summary.

        Non-codeflash sessions get a single fixed line; otherwise a header
        (optionally tagged with the language) is followed by whichever
        optional detail lines have data.
        """
        c = self.cf
        if not c.is_codeflash:
            return "Not a codeflash session"
        header = "Codeflash plugin session"
        if c.language:
            header += f" ({c.language})"
        out = [header]
        optional = [
            (c.optimization_domain, f" Domain: {c.optimization_domain}"),
            (c.agents_used, f" Agents: {', '.join(c.agents_used)}"),
            (c.skills_invoked, f" Skills: {', '.join(c.skills_invoked)}"),
            (c.commands_invoked, f" Commands: {', '.join(c.commands_invoked)}"),
            (c.teams_created, f" Teams created: {c.teams_created}"),
        ]
        out.extend(text for flag, text in optional if flag)
        caps = self.capabilities()
        if caps:
            out.append(f" Capabilities: {', '.join(caps)}")
        return "\n".join(out)

    def capabilities(self) -> list[str]:
        """List which optional codeflash capabilities the session exercised."""
        c = self.cf
        names: list[str] = []
        if c.has_researcher:
            names.append("researcher")
        if c.has_reviewer:
            names.append("reviewer")
        if c.has_ci_handler:
            names.append("CI")
        if c.has_pr_prep:
            names.append("PR prep")
        return names
diff --git a/packages/blackbox/src/blackbox/models.py b/packages/blackbox/src/blackbox/models.py
new file mode 100644
index 0000000..882d950
--- /dev/null
+++ b/packages/blackbox/src/blackbox/models.py
@@ -0,0 +1,261 @@
+from __future__ import annotations
+
+from typing import Any
+
+import attrs
+
+# ---------------------------------------------------------------------------
+# Dashboard models
+# ---------------------------------------------------------------------------
+
+
@attrs.frozen
class LogEntry:
    """A single renderable log event parsed from a transcript."""

    timestamp: float  # epoch seconds; 0.0 when the transcript entry had no timestamp
    source: str  # "claude", "user", "system"
    level: str  # "assistant", "tool_call", "tool_result", "status", "error", "info"
    message: str  # display text (parsers may truncate long tool output)
    data: dict[str, Any] = attrs.Factory(dict)  # optional structured extras, e.g. tool name / input preview
+
+
@attrs.frozen
class SessionInfo:
    """Lightweight metadata for the sidebar session list."""

    session_id: str  # transcript filename stem
    project_path: str  # dash-encoded project directory name under ~/.claude/projects/
    project_name: str  # human-readable display name
    transcript_path: str  # path to the .jsonl transcript, as a string
    started_at: float  # epoch seconds of the earliest timestamped entry
    finished_at: float | None = None  # epoch seconds of last activity; None while ongoing
    first_prompt: str = ""  # first human prompt, truncated for display
    message_count: int = 0  # user messages counted (the quick scan caps this)
    is_live: bool = False  # set when the watcher saw a recent modification
+
+
+# ---------------------------------------------------------------------------
+# Analytics models
+# ---------------------------------------------------------------------------
+
# Five density levels used to render a tiny inline trend chart.
SPARK_CHARS = " _.~*"


def sparkline(values: list[float]) -> str:
    """Render values as a one-character-per-point ASCII sparkline.

    Returns "" for fewer than two points; a flat series renders as the
    middle character repeated.
    """
    if len(values) < 2:
        return ""
    lo = min(values)
    hi = max(values)
    if hi == lo:
        return SPARK_CHARS[2] * len(values)
    span = hi - lo
    top = len(SPARK_CHARS) - 1
    chars = [SPARK_CHARS[round((v - lo) / span * top)] for v in values]
    return "".join(chars)
+
+
def arrow(delta: float, *, invert: bool = False) -> str:
    """Return a trend glyph for delta: "^" up, "v" down, "=" flat.

    Deltas smaller than 0.05 in magnitude count as flat. With invert=True
    the up/down glyphs swap, for metrics where a decrease is good.
    """
    if abs(delta) < 0.05:
        return "="
    going_up = (delta > 0) != invert
    return "^" if going_up else "v"
+
+
@attrs.frozen
class SessionEvent:
    """One normalized conversational event extracted from a transcript."""

    timestamp: str | None  # raw ISO-8601 string as stored in the transcript
    speaker: str  # "user" | "assistant" | "system"
    text: str  # plain-text body of the event
    tool_name: str | None  # set for tool-related events
    file_path: str | None  # file the tool touched, when applicable
    command: str | None  # shell command, for Bash-style tools
    is_error: bool  # whether a tool result reported failure
    error_category: str | None  # classifier label for the error, if any
    attachment_type: str | None  # kind of attachment on the message, if any
+
+
@attrs.frozen
class SessionMeta:
    """Quantitative per-session metrics computed from a transcript."""

    session_id: str
    project_path: str
    transcript_path: str
    start_time: float  # epoch seconds
    end_time: float  # epoch seconds
    duration_s: float
    user_messages: int
    assistant_messages: int
    tool_calls: int
    tool_counts: dict[str, int] = attrs.Factory(dict)  # tool name -> invocation count
    tool_errors: int = 0
    tool_error_categories: dict[str, int] = attrs.Factory(dict)  # category -> count
    tool_error_details: tuple[tuple[str, str], ...] = ()  # per-error detail pairs
    input_tokens: int = 0
    output_tokens: int = 0
    cache_read_tokens: int = 0
    cache_creation_tokens: int = 0
    languages: dict[str, int] = attrs.Factory(dict)  # language -> occurrence count
    files_modified: int = 0
    lines_added: int = 0
    lines_removed: int = 0
    git_commits: int = 0
    git_branch: str | None = None
    user_interruptions: int = 0
    compactions: int = 0  # context-compaction events observed
    subagents_spawned: int = 0
    thinking_blocks: int = 0
    web_searches: int = 0
    web_fetches: int = 0
    permission_mode: str | None = None
    first_prompt: str = ""
    codeflash: CodeflashSession | None = None  # set when the codeflash plugin was detected

    @property
    def duration_minutes(self) -> float:
        """Session duration in minutes."""
        return self.duration_s / 60

    @property
    def total_tokens(self) -> int:
        """Input plus output tokens (cache tokens excluded)."""
        return self.input_tokens + self.output_tokens

    @property
    def cache_hit_rate(self) -> float:
        """Fraction of prompt-side tokens served from cache (0.0 when no tokens)."""
        total = self.input_tokens + self.cache_read_tokens + self.cache_creation_tokens
        return self.cache_read_tokens / total if total else 0.0
+
+
@attrs.frozen
class SessionAudit:
    """Qualitative assessment of one session: outcome, friction, instructions."""

    session_id: str
    goal_categories: dict[str, int] = attrs.Factory(dict)  # goal label -> occurrence count
    outcome: str = "unclear"
    satisfaction: str = "neutral"
    friction_counts: dict[str, int] = attrs.Factory(dict)  # friction label -> occurrence count
    session_type: str = "single_task"
    user_instructions: tuple[str, ...] = ()  # instructions extracted from the user's prompts
    summary: str = ""
+
+
@attrs.frozen
class Recommendation:
    """An actionable suggestion derived from recurring session patterns."""

    suggestion: str
    evidence: str  # supporting observation shown alongside the suggestion
    frequency: float  # how often the underlying pattern occurred
    source_sessions: int  # number of sessions the evidence was drawn from
+
+
@attrs.frozen
class WeekStats:
    """Aggregated metrics for one week of sessions."""

    week: str  # week label; presumably an ISO year-week string — confirm against the producer
    session_count: int
    success_rate: float
    avg_errors_per_session: float
    avg_duration_s: float
    error_category_counts: dict[str, int] = attrs.Factory(dict)  # category -> count for this week
+
+
@attrs.define
class ProjectStats:
    """Aggregated metrics for one project.

    NOTE(review): this is the only model declared with ``attrs.define``
    (mutable) while its siblings use ``attrs.frozen`` — presumably so a
    field like ``is_outlier`` can be set after construction; confirm, and
    freeze it if nothing mutates instances.
    """

    project_path: str
    project_name: str
    session_count: int
    success_rate: float
    avg_tool_errors: float
    avg_duration_s: float
    top_error_categories: tuple[tuple[str, int], ...]  # (category, count) pairs, most common first
    top_friction: tuple[tuple[str, int], ...]  # (friction label, count) pairs, most common first
    is_outlier: bool = False  # rendered with a " [!]" marker by the formatter
+
+
@attrs.frozen
class SessionDigest:
    """Corpus-wide rollup across many sessions, plus weekly trend data."""

    session_count: int
    date_range: tuple[float, float]  # (earliest, latest) epoch seconds
    success_rate: float
    outcome_distribution: dict[str, int] = attrs.Factory(dict)  # outcome label -> count
    satisfaction_distribution: dict[str, int] = attrs.Factory(dict)  # satisfaction label -> count
    top_friction: tuple[tuple[str, int], ...] = ()  # (friction label, count), most common first
    # Per-session averages across the whole corpus.
    avg_duration_s: float = 0.0
    avg_input_tokens: float = 0.0
    avg_output_tokens: float = 0.0
    avg_tool_calls: float = 0.0
    avg_tool_errors: float = 0.0
    # Trend data: weekly buckets, rolling averages, and deltas vs the rolling window.
    weeks: tuple[WeekStats, ...] = ()
    rolling_success_rate: float = 0.0
    rolling_error_rate: float = 0.0
    rolling_duration_s: float = 0.0
    success_rate_change: float = 0.0
    error_rate_change: float = 0.0
    duration_change: float = 0.0  # seconds; formatter divides by 60 for display
    # (category, pct change, rolling weekly count, latest weekly count) per category.
    error_category_deltas: tuple[tuple[str, float, float, float], ...] = ()
    projects: tuple[ProjectStats, ...] = ()
    recommendations: tuple[Recommendation, ...] = ()
+
+
+# ---------------------------------------------------------------------------
+# Codeflash plugin detection
+# ---------------------------------------------------------------------------
+
# Agent names that mark a session as codeflash-driven. Grouped by language:
# the bare "codeflash*" names cover Python, then the "codeflash-js-*" and
# "codeflash-java-*" families mirror the same specializations per language.
CODEFLASH_AGENT_PREFIXES = (
    "codeflash",
    "codeflash-python",
    "codeflash-deep",
    "codeflash-cpu",
    "codeflash-memory",
    "codeflash-async",
    "codeflash-structure",
    "codeflash-setup",
    "codeflash-scan",
    "codeflash-ci",
    "codeflash-pr-prep",
    "codeflash-researcher",
    "codeflash-review",
    "codeflash-javascript",
    "codeflash-js-deep",
    "codeflash-js-cpu",
    "codeflash-js-memory",
    "codeflash-js-async",
    "codeflash-js-structure",
    "codeflash-js-bundle",
    "codeflash-js-setup",
    "codeflash-js-scan",
    "codeflash-js-ci",
    "codeflash-js-pr-prep",
    "codeflash-java",
    "codeflash-java-deep",
    "codeflash-java-cpu",
    "codeflash-java-memory",
    "codeflash-java-async",
    "codeflash-java-structure",
    "codeflash-java-setup",
    "codeflash-java-scan",
    "codeflash-java-ci",
    "codeflash-java-pr-prep",
)

# Skill names associated with the codeflash plugin.
CODEFLASH_SKILLS = (
    "codeflash-optimize",
    "memray-profiling",
)

# Command names associated with the codeflash plugin.
# NOTE(review): these use a "codex-" prefix, unlike every other constant's
# "codeflash-" prefix — confirm this is intentional and not a typo.
CODEFLASH_COMMANDS = (
    "codex-review",
    "codex-setup",
    "codex-status",
)
+
+
@attrs.frozen
class CodeflashSession:
    """Plugin-specific metadata detected from a codeflash agent session."""

    is_codeflash: bool = False  # True when codeflash plugin activity was detected
    language: str | None = None  # detected target language, when identifiable
    agents_used: tuple[str, ...] = ()  # codeflash agent names seen in the session
    skills_invoked: tuple[str, ...] = ()
    commands_invoked: tuple[str, ...] = ()
    teams_created: int = 0
    optimization_domain: str | None = None  # presumably cpu/memory/async/structure per agent names — confirm
    has_researcher: bool = False
    has_reviewer: bool = False
    has_ci_handler: bool = False
    has_pr_prep: bool = False
diff --git a/packages/blackbox/src/blackbox/py.typed b/packages/blackbox/src/blackbox/py.typed
new file mode 100644
index 0000000..e69de29
diff --git a/packages/blackbox/tests/__init__.py b/packages/blackbox/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/packages/blackbox/tests/conftest.py b/packages/blackbox/tests/conftest.py
new file mode 100644
index 0000000..54a5660
--- /dev/null
+++ b/packages/blackbox/tests/conftest.py
@@ -0,0 +1,38 @@
+from __future__ import annotations
+
+from typing import Any
+
+from blackbox.models import SessionAudit, SessionMeta
+
+
def make_meta(**kw: Any) -> SessionMeta:
    """Build a SessionMeta test fixture; kwargs override the defaults."""
    values: dict[str, Any] = {
        "session_id": "abcd1234-5678-9012-3456-789012345678",
        "project_path": "/tmp/project",
        "transcript_path": "/tmp/project/.claude/sessions/abc.jsonl",
        "start_time": 1700000000.0,
        "end_time": 1700003600.0,
        "duration_s": 3600.0,
        "user_messages": 10,
        "assistant_messages": 12,
        "tool_calls": 25,
        **kw,
    }
    return SessionMeta(**values)
+
+
def make_audit(**kw: Any) -> SessionAudit:
    """Build a SessionAudit test fixture; kwargs override the defaults."""
    values: dict[str, Any] = {
        "session_id": "abcd1234-5678-9012-3456-789012345678",
        "outcome": "mostly_achieved",
        "satisfaction": "satisfied",
        **kw,
    }
    return SessionAudit(**values)
+
+
def pair(
    meta_kw: dict[str, Any] | None = None,
    audit_kw: dict[str, Any] | None = None,
) -> tuple[SessionMeta, SessionAudit]:
    """Build a (meta, audit) fixture pair sharing the default session id."""
    meta = make_meta(**(meta_kw or {}))
    audit = make_audit(**(audit_kw or {}))
    return meta, audit
diff --git a/packages/blackbox/tests/e2e/__init__.py b/packages/blackbox/tests/e2e/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/packages/blackbox/tests/e2e/conftest.py b/packages/blackbox/tests/e2e/conftest.py
new file mode 100644
index 0000000..2828cbb
--- /dev/null
+++ b/packages/blackbox/tests/e2e/conftest.py
@@ -0,0 +1,188 @@
+"""Fixtures for Playwright end-to-end tests."""
+
+from __future__ import annotations
+
+import json
+import socket
+import threading
+import time
+from collections.abc import Iterator
+from typing import TYPE_CHECKING
+
+import pytest
+import uvicorn
+
+if TYPE_CHECKING:
+ from pathlib import Path
+
+ from playwright.sync_api import Page
+
+pytestmark = pytest.mark.e2e
+
+SESSION_A_ID = "sess-aaaa1111-2222-3333-4444-555566667777"
+SESSION_B_ID = "sess-bbbb1111-2222-3333-4444-555566667777"
+PROJECT_A_DIR = "-Users-alice-Desktop-work-myapp"
+PROJECT_B_DIR = "-Users-bob-code-webapp"
+
+
+def _jsonl(*entries: dict) -> str:
+ """Serialize entries as newline-delimited JSON."""
+ return "\n".join(json.dumps(e) for e in entries) + "\n"
+
+
+RICH_SESSION = _jsonl(
+ {
+ "type": "user",
+ "timestamp": "2025-03-15T10:00:00Z",
+ "message": {"content": "Help me optimize this function for better performance"},
+ "cwd": "/Users/alice/Desktop/work/myapp",
+ },
+ {
+ "type": "assistant",
+ "timestamp": "2025-03-15T10:00:05Z",
+ "message": {
+ "content": [{"type": "text", "text": "Let me look at the code and find optimization opportunities."}],
+ "usage": {"input_tokens": 500, "output_tokens": 120, "cache_read_input_tokens": 200},
+ },
+ },
+ {
+ "type": "assistant",
+ "timestamp": "2025-03-15T10:00:08Z",
+ "message": {
+ "content": [
+ {
+ "type": "tool_use",
+ "id": "tu_read_1",
+ "name": "Read",
+ "input": {"file_path": "/Users/alice/Desktop/work/myapp/main.py"},
+ }
+ ],
+ "usage": {"input_tokens": 100, "output_tokens": 30},
+ },
+ },
+ {
+ "type": "user",
+ "timestamp": "2025-03-15T10:00:09Z",
+ "message": {
+ "content": [
+ {"type": "tool_result", "tool_use_id": "tu_read_1", "content": "def sort_items(items):\n pass"}
+ ]
+ },
+ },
+ {
+ "type": "assistant",
+ "timestamp": "2025-03-15T10:00:15Z",
+ "message": {
+ "content": [
+ {
+ "type": "tool_use",
+ "id": "tu_bash_1",
+ "name": "Bash",
+ "input": {"command": "uv run pytest tests/ -v"},
+ }
+ ],
+ "usage": {"input_tokens": 200, "output_tokens": 50},
+ },
+ },
+ {
+ "type": "user",
+ "timestamp": "2025-03-15T10:00:20Z",
+ "message": {
+ "content": [{"type": "tool_result", "tool_use_id": "tu_bash_1", "content": "FAILED", "is_error": True}]
+ },
+ "toolUseResult": {"stderr": "AssertionError: expected 42 got 0"},
+ },
+ {
+ "type": "assistant",
+ "timestamp": "2025-03-15T10:00:25Z",
+ "message": {
+ "content": [
+ {"type": "thinking", "thinking": "I need to fix the test."},
+ {"type": "text", "text": "The test failed. Let me fix the implementation."},
+ ],
+ "usage": {"input_tokens": 300, "output_tokens": 80},
+ },
+ },
+ {
+ "type": "user",
+ "timestamp": "2025-03-15T10:01:00Z",
+ "message": {"content": "That looks great, thanks!"},
+ },
+)
+
+MINIMAL_SESSION = _jsonl(
+ {
+ "type": "user",
+ "timestamp": "2025-03-15T09:00:00Z",
+ "message": {"content": "What is this project about?"},
+ "cwd": "/Users/bob/code/webapp",
+ },
+)
+
+
+def _get_free_port() -> int:
+    """Find a free TCP port on localhost (small race: port could be reused before bind)."""
+ with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
+ s.bind(("127.0.0.1", 0))
+ return s.getsockname()[1]
+
+
+@pytest.fixture(scope="session")
+def projects_dir(tmp_path_factory: pytest.TempPathFactory) -> Path:
+ """Create a temp directory tree with fixture session transcripts."""
+ root = tmp_path_factory.mktemp("projects")
+
+ project_a = root / PROJECT_A_DIR
+ project_a.mkdir()
+    (project_a / f"{SESSION_A_ID}.jsonl").write_text(RICH_SESSION, encoding="utf-8")
+
+ project_b = root / PROJECT_B_DIR
+ project_b.mkdir()
+    (project_b / f"{SESSION_B_ID}.jsonl").write_text(MINIMAL_SESSION, encoding="utf-8")
+
+ return root
+
+
+@pytest.fixture(scope="session")
+def live_server(projects_dir: Path) -> Iterator[str]:
+ """Start the dashboard on a random free port and yield the base URL."""
+ from blackbox.dashboard.app import create_app
+
+ port = _get_free_port()
+ application = create_app(projects_dir=projects_dir)
+
+ config = uvicorn.Config(application, host="127.0.0.1", port=port, log_level="warning")
+ server = uvicorn.Server(config)
+ thread = threading.Thread(target=server.run, daemon=True)
+ thread.start()
+
+ deadline = time.monotonic() + 10
+ while time.monotonic() < deadline:
+ try:
+ with socket.create_connection(("127.0.0.1", port), timeout=0.5):
+ break
+ except OSError:
+ time.sleep(0.1)
+ else:
+ msg = "Live server did not start in time"
+ raise RuntimeError(msg)
+
+ yield f"http://127.0.0.1:{port}"
+
+ server.should_exit = True
+ thread.join(timeout=5.0)
+
+
+@pytest.fixture(scope="session")
+def base_url(live_server: str) -> str:
+ """Provide the base URL for pytest-playwright's page.goto()."""
+ return live_server
+
+
+@pytest.fixture
+def dashboard(page: Page, base_url: str) -> Page:
+ """Navigate to the dashboard index and wait for session list to load."""
+ page.goto(base_url)
+ page.locator("#session-list-container").wait_for(state="attached")
+ page.locator("#session-list-container > div").first.wait_for(state="visible", timeout=10_000)
+ return page
diff --git a/packages/blackbox/tests/e2e/test_dashboard_loads.py b/packages/blackbox/tests/e2e/test_dashboard_loads.py
new file mode 100644
index 0000000..552d6b7
--- /dev/null
+++ b/packages/blackbox/tests/e2e/test_dashboard_loads.py
@@ -0,0 +1,35 @@
+"""Smoke tests: dashboard loads and renders basic structure."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+import pytest
+from playwright.sync_api import expect
+
+if TYPE_CHECKING:
+ from playwright.sync_api import Page
+
+pytestmark = pytest.mark.e2e
+
+
+class TestDashboardLoads:
+ """Dashboard renders its core layout on first load."""
+
+ def test_page_title(self, dashboard: Page) -> None:
+ """Page title is 'Blackbox'."""
+ assert "Blackbox" == dashboard.title()
+
+ def test_brand_visible(self, dashboard: Page) -> None:
+ """Sidebar brand text is visible."""
+ expect(dashboard.locator("#sidebar .sidebar-full span.text-accent-400")).to_have_text("blackbox")
+
+ def test_empty_state_without_selection(self, page: Page, base_url: str) -> None:
+ """Empty state shown when no session is selected."""
+ page.goto(base_url)
+ expect(page.get_by_text("Select a session to review")).to_be_visible()
+
+ def test_session_list_loads_via_htmx(self, dashboard: Page) -> None:
+ """Session list container is populated by the HTMX load trigger."""
+ items = dashboard.locator("#session-list-container > div")
+ assert items.count() >= 2
diff --git a/packages/blackbox/tests/e2e/test_session_detail.py b/packages/blackbox/tests/e2e/test_session_detail.py
new file mode 100644
index 0000000..e3964cf
--- /dev/null
+++ b/packages/blackbox/tests/e2e/test_session_detail.py
@@ -0,0 +1,77 @@
+"""Tests for session detail view, filters, and analytics."""
+
+from __future__ import annotations
+
+import re
+from typing import TYPE_CHECKING
+
+import pytest
+from playwright.sync_api import expect
+
+from tests.e2e.conftest import PROJECT_A_DIR, SESSION_A_ID
+
+if TYPE_CHECKING:
+ from playwright.sync_api import Page
+
+pytestmark = pytest.mark.e2e
+
+
+class TestSessionDetail:
+ """Clicking a session loads the detail view."""
+
+ def test_clicking_session_loads_detail(self, dashboard: Page) -> None:
+ """Session detail header appears after clicking a session."""
+ dashboard.get_by_text("work/myapp").first.click()
+ expect(dashboard.locator("#session-detail h2")).to_be_visible(timeout=10_000)
+
+ def test_detail_shows_project_name(self, dashboard: Page) -> None:
+ """Session detail header shows the project name."""
+ dashboard.get_by_text("work/myapp").first.click()
+ expect(dashboard.locator("#session-detail h2")).to_contain_text("work/myapp")
+
+ def test_detail_shows_session_id_prefix(self, dashboard: Page) -> None:
+ """Session detail header shows the 8-char session ID prefix."""
+ dashboard.get_by_text("work/myapp").first.click()
+ detail = dashboard.locator("#session-detail")
+ expect(detail.get_by_text("sess-aaa")).to_be_visible(timeout=10_000)
+
+ def test_filter_buttons_visible(self, dashboard: Page) -> None:
+ """Filter buttons (Compact, All, etc.) are visible after loading a session."""
+ dashboard.get_by_text("work/myapp").first.click()
+ detail = dashboard.locator("#session-detail")
+ expect(detail.get_by_role("button", name="Compact")).to_be_visible(timeout=10_000)
+ expect(detail.get_by_role("button", name="All")).to_be_visible()
+
+ def test_filter_button_active_state(self, dashboard: Page) -> None:
+ """The default filter (compact) has the active accent styling."""
+ dashboard.get_by_text("work/myapp").first.click()
+ compact_btn = dashboard.locator("#session-detail button", has_text="Compact").first
+ expect(compact_btn).to_be_visible(timeout=10_000)
+ expect(compact_btn).to_have_class(re.compile(r"text-accent-400"))
+
+ def test_switching_filter_changes_active_button(self, dashboard: Page) -> None:
+        """Clicking 'All' makes it the active filter button."""
+ dashboard.get_by_text("work/myapp").first.click()
+ all_btn = dashboard.locator("#session-detail button", has_text="All").first
+ expect(all_btn).to_be_visible(timeout=10_000)
+ all_btn.click()
+ expect(all_btn).to_have_class(re.compile(r"text-accent-400"))
+
+ def test_analytics_panel_exists(self, dashboard: Page) -> None:
+ """Analytics details element is present for sessions with metadata."""
+ dashboard.get_by_text("work/myapp").first.click()
+ analytics = dashboard.locator("#session-detail details")
+ expect(analytics).to_be_visible(timeout=10_000)
+
+ def test_analytics_panel_expands(self, dashboard: Page) -> None:
+ """Clicking the analytics summary expands the panel to show token counts."""
+ dashboard.get_by_text("work/myapp").first.click()
+ summary = dashboard.locator("#session-detail details summary")
+ expect(summary).to_be_visible(timeout=10_000)
+ summary.click()
+ expect(dashboard.get_by_text("tokens", exact=True)).to_be_visible()
+
+ def test_session_not_found(self, page: Page, base_url: str) -> None:
+ """Navigating to a non-existent session shows an error."""
+ page.goto(f"{base_url}/?session={PROJECT_A_DIR}/{SESSION_A_ID.replace('aaaa', 'zzzz')}")
+ expect(page.get_by_text("Session not found")).to_be_visible(timeout=10_000)
diff --git a/packages/blackbox/tests/e2e/test_session_list.py b/packages/blackbox/tests/e2e/test_session_list.py
new file mode 100644
index 0000000..a8df037
--- /dev/null
+++ b/packages/blackbox/tests/e2e/test_session_list.py
@@ -0,0 +1,42 @@
+"""Tests for the session list sidebar."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+import pytest
+from playwright.sync_api import expect
+
+if TYPE_CHECKING:
+ from playwright.sync_api import Page
+
+pytestmark = pytest.mark.e2e
+
+
+class TestSessionList:
+ """Session list sidebar displays session metadata correctly."""
+
+ def test_project_names_visible(self, dashboard: Page) -> None:
+ """Project names from fixture data appear in the session list."""
+ expect(dashboard.get_by_text("work/myapp")).to_be_visible()
+ expect(dashboard.get_by_text("code/webapp")).to_be_visible()
+
+ def test_first_prompt_shown(self, dashboard: Page) -> None:
+ """First user prompt is displayed in the session item."""
+ expect(dashboard.get_by_text("Help me optimize this function")).to_be_visible()
+
+ def test_session_id_prefix_shown(self, dashboard: Page) -> None:
+ """The 8-char session ID prefix is visible."""
+ expect(dashboard.get_by_text("sess-aaa")).to_be_visible()
+ expect(dashboard.get_by_text("sess-bbb")).to_be_visible()
+
+ def test_message_count_shown(self, dashboard: Page) -> None:
+ """Message count badge is visible for sessions with messages."""
+ expect(dashboard.get_by_text("4 msgs")).to_be_visible()
+
+ def test_session_list_refreshes_on_poll(self, dashboard: Page) -> None:
+ """The HTMX poll fires and the list remains populated."""
+ with dashboard.expect_response("**/sessions*"):
+            pass  # exiting the context blocks until the 5s HTMX poll response arrives
+ items = dashboard.locator("#session-list-container > div")
+ assert items.count() >= 2
diff --git a/packages/blackbox/tests/e2e/test_sidebar.py b/packages/blackbox/tests/e2e/test_sidebar.py
new file mode 100644
index 0000000..d6882b3
--- /dev/null
+++ b/packages/blackbox/tests/e2e/test_sidebar.py
@@ -0,0 +1,50 @@
+"""Tests for sidebar collapse/expand and localStorage persistence."""
+
+from __future__ import annotations
+
+import re
+from typing import TYPE_CHECKING
+
+import pytest
+from playwright.sync_api import expect
+
+if TYPE_CHECKING:
+ from playwright.sync_api import Page
+
+pytestmark = pytest.mark.e2e
+
+
+class TestSidebar:
+ """Sidebar collapse/expand behavior and localStorage persistence."""
+
+ def test_collapse_button(self, dashboard: Page) -> None:
+ """Clicking collapse hides the sidebar full content."""
+ dashboard.locator("#collapse-btn").click()
+ expect(dashboard.locator("#sidebar")).to_have_class(re.compile(r"collapsed"))
+
+ def test_expand_button(self, dashboard: Page) -> None:
+ """Clicking expand restores the sidebar."""
+ dashboard.locator("#collapse-btn").click()
+ expect(dashboard.locator("#sidebar")).to_have_class(re.compile(r"collapsed"))
+ dashboard.locator("#expand-btn").click()
+ sidebar_classes = dashboard.locator("#sidebar").get_attribute("class") or ""
+ assert "collapsed" not in sidebar_classes
+
+ def test_collapse_persists_to_localstorage(self, dashboard: Page) -> None:
+ """Collapsing sets localStorage sidebar-collapsed to '1'."""
+ dashboard.locator("#collapse-btn").click()
+ value = dashboard.evaluate("() => localStorage.getItem('sidebar-collapsed')")
+ assert "1" == value
+
+ def test_state_restored_on_reload(self, dashboard: Page, base_url: str) -> None:
+ """Collapsed state persists across page reloads."""
+ dashboard.locator("#collapse-btn").click()
+ dashboard.goto(base_url)
+ expect(dashboard.locator("#sidebar")).to_have_class(re.compile(r"collapsed"))
+
+ def test_expand_clears_localstorage(self, dashboard: Page) -> None:
+ """Expanding sets localStorage sidebar-collapsed to '0'."""
+ dashboard.locator("#collapse-btn").click()
+ dashboard.locator("#expand-btn").click()
+ value = dashboard.evaluate("() => localStorage.getItem('sidebar-collapsed')")
+ assert "0" == value
diff --git a/packages/blackbox/tests/e2e/test_sse_streaming.py b/packages/blackbox/tests/e2e/test_sse_streaming.py
new file mode 100644
index 0000000..f6ab7f6
--- /dev/null
+++ b/packages/blackbox/tests/e2e/test_sse_streaming.py
@@ -0,0 +1,41 @@
+"""Tests for SSE log streaming in the session detail view."""
+
+from __future__ import annotations
+
+from typing import TYPE_CHECKING
+
+import pytest
+from playwright.sync_api import expect
+
+if TYPE_CHECKING:
+ from playwright.sync_api import Page
+
+pytestmark = pytest.mark.e2e
+
+
+class TestSSEStreaming:
+ """SSE log events render log entries in the DOM."""
+
+ def _load_session(self, dashboard: Page) -> None:
+ """Click the rich session to trigger SSE streaming."""
+ dashboard.get_by_text("work/myapp").first.click()
+ dashboard.locator("#log-container").wait_for(state="attached", timeout=10_000)
+
+ def test_log_entries_appear(self, dashboard: Page) -> None:
+ """Log entries are inserted into the log container via SSE."""
+ self._load_session(dashboard)
+ container = dashboard.locator("#log-container")
+ expect(container).not_to_be_empty(timeout=15_000)
+
+ def test_log_container_has_children(self, dashboard: Page) -> None:
+ """Log container receives child elements from SSE events."""
+ self._load_session(dashboard)
+ dashboard.locator("#log-container > *").first.wait_for(state="visible", timeout=15_000)
+ assert dashboard.locator("#log-container > *").count() >= 1
+
+ def test_sse_connection_established(self, dashboard: Page) -> None:
+ """The SSE URL is set on the log container's data attribute."""
+ self._load_session(dashboard)
+ url = dashboard.locator("#log-container").get_attribute("data-sse-url")
+ assert url is not None
+ assert "/logs" in url
diff --git a/packages/blackbox/tests/test_analytics.py b/packages/blackbox/tests/test_analytics.py
new file mode 100644
index 0000000..99b56bb
--- /dev/null
+++ b/packages/blackbox/tests/test_analytics.py
@@ -0,0 +1,693 @@
+"""Tests for analytics extraction and codeflash detection."""
+
+from __future__ import annotations
+
+import json
+from pathlib import Path
+from typing import Any
+
+from blackbox.analytics import (
+ classify_error,
+ count_diff_lines,
+ detect_codeflash,
+ extract_meta,
+ infer_domain,
+ infer_language,
+ track_file_changes,
+)
+
+
+def _ts(offset: int = 0) -> str:
+ return f"2026-04-28T12:00:{offset:02d}Z"
+
+
+def _write_jsonl(path: Path, entries: list[dict[str, Any]]) -> None:
+    path.write_text("\n".join(json.dumps(e) for e in entries) + "\n", encoding="utf-8")
+
+
+# ---------------------------------------------------------------------------
+# extract_meta basics
+# ---------------------------------------------------------------------------
+
+
+class TestExtractMeta:
+ def test_returns_none_for_empty_file(self, tmp_path: Path) -> None:
+ p = tmp_path / "proj" / "empty.jsonl"
+ p.parent.mkdir()
+ p.write_text("")
+ assert extract_meta(p) is None
+
+ def test_returns_none_for_missing_file(self, tmp_path: Path) -> None:
+ assert extract_meta(tmp_path / "missing.jsonl") is None
+
+ def test_returns_none_for_no_timestamps(self, tmp_path: Path) -> None:
+ p = tmp_path / "proj" / "sess.jsonl"
+ p.parent.mkdir()
+ _write_jsonl(p, [{"type": "system", "message": "hello"}])
+ assert extract_meta(p) is None
+
+ def test_basic_session(self, tmp_path: Path) -> None:
+ p = tmp_path / "proj" / "abc123.jsonl"
+ p.parent.mkdir()
+ _write_jsonl(
+ p,
+ [
+ {
+ "type": "user",
+ "timestamp": _ts(0),
+ "message": {"content": "optimize this function"},
+ },
+ {
+ "type": "assistant",
+ "timestamp": _ts(10),
+ "message": {
+ "content": [{"type": "text", "text": "I'll help you."}],
+ "usage": {"input_tokens": 500, "output_tokens": 200},
+ },
+ },
+ ],
+ )
+ meta = extract_meta(p)
+ assert meta is not None
+ assert meta.session_id == "abc123"
+ assert meta.project_path == "proj"
+ assert meta.user_messages == 1
+ assert meta.assistant_messages == 1
+ assert meta.input_tokens == 500
+ assert meta.output_tokens == 200
+ assert "optimize this function" in meta.first_prompt
+
+ def test_counts_tool_calls(self, tmp_path: Path) -> None:
+ p = tmp_path / "proj" / "sess.jsonl"
+ p.parent.mkdir()
+ _write_jsonl(
+ p,
+ [
+ {
+ "type": "assistant",
+ "timestamp": _ts(0),
+ "message": {
+ "content": [
+ {"type": "tool_use", "id": "t1", "name": "Read", "input": {"file_path": "/a.py"}},
+ {
+ "type": "tool_use",
+ "id": "t2",
+ "name": "Edit",
+ "input": {"file_path": "/a.py", "old_string": "x", "new_string": "y"},
+ },
+ ],
+ "usage": {"input_tokens": 100, "output_tokens": 50},
+ },
+ },
+ ],
+ )
+ meta = extract_meta(p)
+ assert meta is not None
+ assert meta.tool_calls == 2
+ assert meta.tool_counts == {"Read": 1, "Edit": 1}
+
+ def test_counts_git_commits(self, tmp_path: Path) -> None:
+ p = tmp_path / "proj" / "sess.jsonl"
+ p.parent.mkdir()
+ _write_jsonl(
+ p,
+ [
+ {
+ "type": "assistant",
+ "timestamp": _ts(0),
+ "message": {
+ "content": [
+ {
+ "type": "tool_use",
+ "id": "t1",
+ "name": "Bash",
+ "input": {"command": "git commit -m 'fix things'"},
+ }
+ ],
+ "usage": {},
+ },
+ },
+ ],
+ )
+ meta = extract_meta(p)
+ assert meta is not None
+ assert meta.git_commits == 1
+
+ def test_amend_not_counted_as_commit(self, tmp_path: Path) -> None:
+ p = tmp_path / "proj" / "sess.jsonl"
+ p.parent.mkdir()
+ _write_jsonl(
+ p,
+ [
+ {
+ "type": "assistant",
+ "timestamp": _ts(0),
+ "message": {
+ "content": [
+ {
+ "type": "tool_use",
+ "id": "t1",
+ "name": "Bash",
+ "input": {"command": "git commit --amend -m 'fix'"},
+ }
+ ],
+ "usage": {},
+ },
+ },
+ ],
+ )
+ meta = extract_meta(p)
+ assert meta is not None
+ assert meta.git_commits == 0
+
+ def test_counts_compactions(self, tmp_path: Path) -> None:
+ p = tmp_path / "proj" / "sess.jsonl"
+ p.parent.mkdir()
+ _write_jsonl(
+ p,
+ [
+ {"type": "user", "timestamp": _ts(0), "message": {"content": "hi"}},
+ {"type": "summary", "timestamp": _ts(5)},
+ {"type": "summary", "timestamp": _ts(10)},
+ ],
+ )
+ meta = extract_meta(p)
+ assert meta is not None
+ assert meta.compactions == 2
+
+ def test_counts_thinking_blocks(self, tmp_path: Path) -> None:
+ p = tmp_path / "proj" / "sess.jsonl"
+ p.parent.mkdir()
+ _write_jsonl(
+ p,
+ [
+ {
+ "type": "assistant",
+ "timestamp": _ts(0),
+ "message": {
+ "content": [
+ {"type": "thinking", "thinking": "let me think..."},
+ {"type": "text", "text": "here's my answer"},
+ ],
+ "usage": {},
+ },
+ },
+ ],
+ )
+ meta = extract_meta(p)
+ assert meta is not None
+ assert meta.thinking_blocks == 1
+
+ def test_tracks_permission_mode(self, tmp_path: Path) -> None:
+ p = tmp_path / "proj" / "sess.jsonl"
+ p.parent.mkdir()
+ _write_jsonl(
+ p,
+ [
+ {"type": "permission-mode", "timestamp": _ts(0), "permissionMode": "bypassPermissions"},
+ {"type": "user", "timestamp": _ts(1), "message": {"content": "go"}},
+ ],
+ )
+ meta = extract_meta(p)
+ assert meta is not None
+ assert meta.permission_mode == "bypassPermissions"
+
+ def test_tracks_web_usage(self, tmp_path: Path) -> None:
+ p = tmp_path / "proj" / "sess.jsonl"
+ p.parent.mkdir()
+ _write_jsonl(
+ p,
+ [
+ {
+ "type": "assistant",
+ "timestamp": _ts(0),
+ "message": {
+ "content": [{"type": "text", "text": "searching"}],
+ "usage": {
+ "input_tokens": 100,
+ "output_tokens": 50,
+ "server_tool_use": {"web_search_requests": 2, "web_fetch_requests": 1},
+ },
+ },
+ },
+ ],
+ )
+ meta = extract_meta(p)
+ assert meta is not None
+ assert meta.web_searches == 2
+ assert meta.web_fetches == 1
+
+ def test_skips_invalid_json_lines(self, tmp_path: Path) -> None:
+ p = tmp_path / "proj" / "sess.jsonl"
+ p.parent.mkdir()
+ p.write_text(
+ json.dumps({"type": "user", "timestamp": _ts(0), "message": {"content": "hi"}})
+ + "\nnot valid json\n"
+ + json.dumps(
+ {
+ "type": "assistant",
+ "timestamp": _ts(1),
+ "message": {"content": [{"type": "text", "text": "ok"}], "usage": {}},
+ }
+ )
+ + "\n"
+ )
+ meta = extract_meta(p)
+ assert meta is not None
+ assert meta.user_messages == 1
+ assert meta.assistant_messages == 1
+
+ def test_tracks_tool_errors(self, tmp_path: Path) -> None:
+ p = tmp_path / "proj" / "sess.jsonl"
+ p.parent.mkdir()
+ _write_jsonl(
+ p,
+ [
+ {
+ "type": "assistant",
+ "timestamp": _ts(0),
+ "message": {
+ "content": [{"type": "tool_use", "id": "t1", "name": "Bash", "input": {"command": "ls /nope"}}],
+ "usage": {},
+ },
+ },
+ {
+ "type": "user",
+ "timestamp": _ts(1),
+ "message": {
+ "content": [
+ {
+ "type": "tool_result",
+ "tool_use_id": "t1",
+ "is_error": True,
+ "content": "command not found",
+ }
+ ],
+ },
+ },
+ ],
+ )
+ meta = extract_meta(p)
+ assert meta is not None
+ assert meta.tool_errors == 1
+ assert meta.tool_error_categories["command_not_found"] == 1
+
+
+# ---------------------------------------------------------------------------
+# classify_error
+# ---------------------------------------------------------------------------
+
+
+class TestClassifyError:
+ def test_edit_always_edit_failed(self) -> None:
+ assert "edit_failed" == classify_error("Edit", {}, {})
+
+ def test_bash_permission_denied(self) -> None:
+ block = {"content": "Permission denied"}
+ assert "permission_denied" == classify_error("Bash", block, {})
+
+ def test_bash_command_not_found(self) -> None:
+ block = {"content": "command not found"}
+ assert "command_not_found" == classify_error("Bash", block, {})
+
+ def test_bash_generic_failure(self) -> None:
+ block = {"content": "exit code 1"}
+ assert "command_failed" == classify_error("Bash", block, {})
+
+ def test_read_file_not_found(self) -> None:
+ block = {"content": "no such file"}
+ assert "file_not_found" == classify_error("Read", block, {})
+
+ def test_write_file_not_found(self) -> None:
+ block = {"content": "not found"}
+ assert "file_not_found" == classify_error("Write", block, {})
+
+ def test_read_generic_error(self) -> None:
+ block = {"content": "some io error"}
+ assert "file_error" == classify_error("Read", block, {})
+
+ def test_unknown_tool(self) -> None:
+ assert "tool_error" == classify_error("CustomTool", {}, {})
+
+ def test_stderr_from_tool_use_result(self) -> None:
+ raw = {"toolUseResult": {"stderr": "Permission denied"}}
+ assert "permission_denied" == classify_error("Bash", {"content": ""}, raw)
+
+
+# ---------------------------------------------------------------------------
+# track_file_changes
+# ---------------------------------------------------------------------------
+
+
+class TestTrackFileChanges:
+ def test_tracks_edit_tool(self) -> None:
+ from collections import Counter
+
+ files: set[str] = set()
+ langs = Counter[str]()
+ track_file_changes("Edit", {"file_path": "/app/main.py"}, files, langs)
+ assert "/app/main.py" in files
+ assert langs["python"] == 1
+
+ def test_ignores_non_edit_tools(self) -> None:
+ from collections import Counter
+
+ files: set[str] = set()
+ langs = Counter[str]()
+ track_file_changes("Read", {"file_path": "/app/main.py"}, files, langs)
+ assert len(files) == 0
+
+ def test_unknown_extension(self) -> None:
+ from collections import Counter
+
+ files: set[str] = set()
+ langs = Counter[str]()
+ track_file_changes("Write", {"file_path": "/app/data.xyz"}, files, langs)
+ assert "/app/data.xyz" in files
+ assert len(langs) == 0
+
+
+# ---------------------------------------------------------------------------
+# count_diff_lines
+# ---------------------------------------------------------------------------
+
+
+class TestCountDiffLines:
+ def test_edit_adds_lines(self) -> None:
+ assert (2, 0) == count_diff_lines("Edit", {"old_string": "a\n", "new_string": "a\nb\nc\n"})
+
+ def test_edit_removes_lines(self) -> None:
+ assert (0, 2) == count_diff_lines("Edit", {"old_string": "a\nb\nc\n", "new_string": "a\n"})
+
+ def test_write_counts_all_lines(self) -> None:
+ assert (3, 0) == count_diff_lines("Write", {"content": "a\nb\nc"})
+
+ def test_other_tools_zero(self) -> None:
+ assert (0, 0) == count_diff_lines("Read", {})
+
+
+# ---------------------------------------------------------------------------
+# detect_codeflash
+# ---------------------------------------------------------------------------
+
+
+class TestDetectCodeflash:
+ def test_returns_none_when_no_signals(self) -> None:
+ assert detect_codeflash(set(), set(), set(), 0) is None
+
+ def test_detects_from_agents(self) -> None:
+ cf = detect_codeflash({"codeflash-python", "codeflash-deep"}, set(), set(), 0)
+ assert cf is not None
+ assert cf.is_codeflash
+ assert cf.language == "python"
+ assert cf.optimization_domain == "deep"
+ assert "codeflash-deep" in cf.agents_used
+ assert "codeflash-python" in cf.agents_used
+
+ def test_detects_from_skills(self) -> None:
+ cf = detect_codeflash(set(), {"codeflash-optimize"}, set(), 0)
+ assert cf is not None
+ assert cf.is_codeflash
+ assert "codeflash-optimize" in cf.skills_invoked
+
+ def test_detects_from_commands(self) -> None:
+ cf = detect_codeflash(set(), set(), {"codex-review"}, 0)
+ assert cf is not None
+ assert "codex-review" in cf.commands_invoked
+
+ def test_tracks_teams(self) -> None:
+ cf = detect_codeflash({"codeflash"}, set(), set(), 3)
+ assert cf is not None
+ assert cf.teams_created == 3
+
+ def test_detects_researcher(self) -> None:
+ cf = detect_codeflash({"codeflash-researcher"}, set(), set(), 0)
+ assert cf is not None
+ assert cf.has_researcher
+
+ def test_detects_reviewer(self) -> None:
+ cf = detect_codeflash({"codeflash-review"}, set(), set(), 0)
+ assert cf is not None
+ assert cf.has_reviewer
+
+ def test_detects_ci_handler(self) -> None:
+ cf = detect_codeflash({"codeflash-ci"}, set(), set(), 0)
+ assert cf is not None
+ assert cf.has_ci_handler
+
+ def test_detects_pr_prep(self) -> None:
+ cf = detect_codeflash({"codeflash-pr-prep"}, set(), set(), 0)
+ assert cf is not None
+ assert cf.has_pr_prep
+
+ def test_infers_javascript_from_prefix(self) -> None:
+ cf = detect_codeflash({"codeflash-js-cpu"}, set(), set(), 0)
+ assert cf is not None
+ assert cf.language == "javascript"
+ assert cf.optimization_domain == "cpu"
+
+ def test_infers_java_from_prefix(self) -> None:
+ cf = detect_codeflash({"codeflash-java-memory"}, set(), set(), 0)
+ assert cf is not None
+ assert cf.language == "java"
+ assert cf.optimization_domain == "memory"
+
+ def test_memory_domain(self) -> None:
+ cf = detect_codeflash({"codeflash-memory"}, set(), set(), 0)
+ assert cf is not None
+ assert cf.optimization_domain == "memory"
+
+ def test_async_domain(self) -> None:
+ cf = detect_codeflash({"codeflash-async"}, set(), set(), 0)
+ assert cf is not None
+ assert cf.optimization_domain == "async"
+
+ def test_structure_domain(self) -> None:
+ cf = detect_codeflash({"codeflash-structure"}, set(), set(), 0)
+ assert cf is not None
+ assert cf.optimization_domain == "structure"
+
+ def test_bundle_domain(self) -> None:
+ cf = detect_codeflash({"codeflash-js-bundle"}, set(), set(), 0)
+ assert cf is not None
+ assert cf.optimization_domain == "bundle"
+
+
+# ---------------------------------------------------------------------------
+# _infer_language / _infer_domain
+# ---------------------------------------------------------------------------
+
+
+class TestInferLanguage:
+ def test_python_from_marker(self) -> None:
+ assert "python" == infer_language({"codeflash-python"})
+
+ def test_javascript_from_marker(self) -> None:
+ assert "javascript" == infer_language({"codeflash-javascript"})
+
+ def test_javascript_from_js_prefix(self) -> None:
+ assert "javascript" == infer_language({"codeflash-js-deep"})
+
+ def test_java_from_marker(self) -> None:
+ assert "java" == infer_language({"codeflash-java"})
+
+ def test_java_from_prefix(self) -> None:
+ assert "java" == infer_language({"codeflash-java-cpu"})
+
+ def test_none_for_generic_agent(self) -> None:
+ assert infer_language({"codeflash"}) is None
+
+ def test_none_for_empty(self) -> None:
+ assert infer_language(set()) is None
+
+
+class TestInferDomain:
+ def test_cpu(self) -> None:
+ assert "cpu" == infer_domain({"codeflash-cpu"})
+
+ def test_memory(self) -> None:
+ assert "memory" == infer_domain({"codeflash-memory"})
+
+ def test_deep(self) -> None:
+ assert "deep" == infer_domain({"codeflash-deep"})
+
+ def test_async(self) -> None:
+ assert "async" == infer_domain({"codeflash-async"})
+
+ def test_structure(self) -> None:
+ assert "structure" == infer_domain({"codeflash-structure"})
+
+ def test_bundle(self) -> None:
+ assert "bundle" == infer_domain({"codeflash-js-bundle"})
+
+ def test_none_for_router_only(self) -> None:
+ assert infer_domain({"codeflash-python"}) is None
+
+ def test_none_for_empty(self) -> None:
+ assert infer_domain(set()) is None
+
+
+# ---------------------------------------------------------------------------
+# extract_meta codeflash integration
+# ---------------------------------------------------------------------------
+
+
+class TestExtractMetaCodeflash:
+ def test_non_codeflash_session_has_none(self, tmp_path: Path) -> None:
+ p = tmp_path / "proj" / "sess.jsonl"
+ p.parent.mkdir()
+ _write_jsonl(
+ p,
+ [
+ {"type": "user", "timestamp": _ts(0), "message": {"content": "hello"}},
+ {
+ "type": "assistant",
+ "timestamp": _ts(1),
+ "message": {
+ "content": [{"type": "text", "text": "hi"}],
+ "usage": {},
+ },
+ },
+ ],
+ )
+ meta = extract_meta(p)
+ assert meta is not None
+ assert meta.codeflash is None
+
+ def test_detects_codeflash_agent_spawn(self, tmp_path: Path) -> None:
+ p = tmp_path / "proj" / "sess.jsonl"
+ p.parent.mkdir()
+ _write_jsonl(
+ p,
+ [
+ {
+ "type": "assistant",
+ "timestamp": _ts(0),
+ "message": {
+ "content": [
+ {
+ "type": "tool_use",
+ "id": "t1",
+ "name": "Agent",
+ "input": {"name": "codeflash-python", "prompt": "optimize"},
+ }
+ ],
+ "usage": {},
+ },
+ },
+ ],
+ )
+ meta = extract_meta(p)
+ assert meta is not None
+ assert meta.codeflash is not None
+ assert meta.codeflash.is_codeflash
+ assert meta.codeflash.language == "python"
+ assert "codeflash-python" in meta.codeflash.agents_used
+
+ def test_detects_codeflash_skill(self, tmp_path: Path) -> None:
+ p = tmp_path / "proj" / "sess.jsonl"
+ p.parent.mkdir()
+ _write_jsonl(
+ p,
+ [
+ {
+ "type": "assistant",
+ "timestamp": _ts(0),
+ "message": {
+ "content": [
+ {
+ "type": "tool_use",
+ "id": "t1",
+ "name": "Skill",
+ "input": {"skill": "codeflash-optimize"},
+ }
+ ],
+ "usage": {},
+ },
+ },
+ ],
+ )
+ meta = extract_meta(p)
+ assert meta is not None
+ assert meta.codeflash is not None
+ assert "codeflash-optimize" in meta.codeflash.skills_invoked
+
+ def test_detects_team_creates(self, tmp_path: Path) -> None:
+ p = tmp_path / "proj" / "sess.jsonl"
+ p.parent.mkdir()
+ _write_jsonl(
+ p,
+ [
+ {
+ "type": "assistant",
+ "timestamp": _ts(0),
+ "message": {
+ "content": [
+ {"type": "tool_use", "id": "t1", "name": "TeamCreate", "input": {}},
+ {
+ "type": "tool_use",
+ "id": "t2",
+ "name": "Agent",
+ "input": {"name": "codeflash-deep", "prompt": "go"},
+ },
+ ],
+ "usage": {},
+ },
+ },
+ ],
+ )
+ meta = extract_meta(p)
+ assert meta is not None
+ assert meta.codeflash is not None
+ assert meta.codeflash.teams_created == 1
+
+ def test_detects_multiple_agents(self, tmp_path: Path) -> None:
+ p = tmp_path / "proj" / "sess.jsonl"
+ p.parent.mkdir()
+ _write_jsonl(
+ p,
+ [
+ {
+ "type": "assistant",
+ "timestamp": _ts(0),
+ "message": {
+ "content": [
+ {
+ "type": "tool_use",
+ "id": "t1",
+ "name": "Agent",
+ "input": {"name": "codeflash-python", "prompt": "start"},
+ },
+ {
+ "type": "tool_use",
+ "id": "t2",
+ "name": "Agent",
+ "input": {"name": "codeflash-deep", "prompt": "optimize"},
+ },
+ {
+ "type": "tool_use",
+ "id": "t3",
+ "name": "Agent",
+ "input": {"name": "codeflash-researcher", "prompt": "research"},
+ },
+ {
+ "type": "tool_use",
+ "id": "t4",
+ "name": "Agent",
+ "input": {"name": "codeflash-review", "prompt": "review"},
+ },
+ ],
+ "usage": {},
+ },
+ },
+ ],
+ )
+ meta = extract_meta(p)
+ assert meta is not None
+ cf = meta.codeflash
+ assert cf is not None
+ assert cf.language == "python"
+ assert cf.optimization_domain == "deep"
+ assert cf.has_researcher
+ assert cf.has_reviewer
+ assert len(cf.agents_used) == 4
diff --git a/packages/blackbox/tests/test_cli.py b/packages/blackbox/tests/test_cli.py
new file mode 100644
index 0000000..e6dd1e5
--- /dev/null
+++ b/packages/blackbox/tests/test_cli.py
@@ -0,0 +1,54 @@
+from __future__ import annotations
+
+from argparse import Namespace
+
+import pytest
+
+from blackbox.cli import main, parse_args, run
+
+
+class TestParseArgs:
+ def test_serve_defaults(self) -> None:
+ args = parse_args(["serve"]).unwrap()
+ assert "serve" == args.command
+ assert 7100 == args.port
+ assert args.no_open is False
+
+ def test_serve_custom_port(self) -> None:
+ args = parse_args(["serve", "--port", "8080"]).unwrap()
+ assert 8080 == args.port
+
+ def test_serve_no_open(self) -> None:
+ args = parse_args(["serve", "--no-open"]).unwrap()
+ assert args.no_open is True
+
+ def test_no_command_errors(self) -> None:
+ with pytest.raises(SystemExit):
+ parse_args([])
+
+
+class TestRun:
+ def test_serve_launches_uvicorn(self, monkeypatch: pytest.MonkeyPatch) -> None:
+ called_with: dict[str, object] = {}
+
+ def fake_uvicorn_run(app: object, **kwargs: object) -> None:
+ called_with["app"] = app
+ called_with.update(kwargs)
+
+ monkeypatch.setattr("uvicorn.run", fake_uvicorn_run)
+ args = parse_args(["serve", "--no-open"]).unwrap()
+ run(args).unwrap()
+ assert "127.0.0.1" == called_with["host"]
+ assert 7100 == called_with["port"]
+
+ def test_unknown_command(self) -> None:
+ args = Namespace(command="bogus")
+ result = run(args)
+ assert not result.is_ok()
+
+
+class TestMain:
+ def test_main_serve(self, monkeypatch: pytest.MonkeyPatch) -> None:
+ monkeypatch.setattr("sys.argv", ["blackbox", "serve", "--no-open"])
+ monkeypatch.setattr("uvicorn.run", lambda *a, **kw: None)
+ main()
diff --git a/packages/blackbox/tests/test_formatting.py b/packages/blackbox/tests/test_formatting.py
new file mode 100644
index 0000000..63e0a69
--- /dev/null
+++ b/packages/blackbox/tests/test_formatting.py
@@ -0,0 +1,333 @@
+from __future__ import annotations
+
+import json
+from typing import Any
+
+from blackbox.formatting import (
+ AuditFormatter,
+ DigestFormatter,
+ MetaFormatter,
+ ProjectFormatter,
+ RecommendationFormatter,
+)
+from blackbox.models import (
+ ProjectStats,
+ Recommendation,
+ SessionAudit,
+ SessionDigest,
+ WeekStats,
+)
+from tests.conftest import make_meta
+
+# ---------------------------------------------------------------------------
+# MetaFormatter
+# ---------------------------------------------------------------------------
+
+
+class TestMetaFormatter:
+ def test_basic(self) -> None:
+ text = MetaFormatter(make_meta(input_tokens=5000, output_tokens=2000, tool_errors=2)).summary()
+ assert "abcd1234" in text
+ assert "60min" in text
+ assert "10 user / 12 assistant" in text
+ assert "25 calls (2 errors)" in text
+ assert "5,000 in / 2,000 out" in text
+
+ def test_with_git(self) -> None:
+ assert "5 commits on main" in MetaFormatter(make_meta(git_commits=5, git_branch="main")).summary()
+
+ def test_git_without_branch(self) -> None:
+ assert "unknown" in MetaFormatter(make_meta(git_commits=1, git_branch=None)).summary()
+
+ def test_with_files(self) -> None:
+ text = MetaFormatter(make_meta(files_modified=3, lines_added=100, lines_removed=20)).summary()
+ assert "3 modified" in text
+ assert "+100/-20" in text
+
+ def test_without_files(self) -> None:
+ assert "modified" not in MetaFormatter(make_meta(files_modified=0)).summary()
+
+ def test_with_compactions(self) -> None:
+ assert "Compactions: 3" in MetaFormatter(make_meta(compactions=3)).summary()
+
+ def test_without_compactions(self) -> None:
+ assert "Compactions" not in MetaFormatter(make_meta(compactions=0)).summary()
+
+ def test_with_interruptions(self) -> None:
+ assert "Interruptions: 2" in MetaFormatter(make_meta(user_interruptions=2)).summary()
+
+ def test_without_interruptions(self) -> None:
+ assert "Interruptions" not in MetaFormatter(make_meta(user_interruptions=0)).summary()
+
+ def test_top_tools_capped_at_5(self) -> None:
+ meta = make_meta(tool_counts={"Read": 20, "Edit": 15, "Bash": 10, "Write": 5, "Grep": 3, "X": 1})
+ text = MetaFormatter(meta).summary()
+ assert "Read=20" in text
+ assert "X=1" not in text
+
+ def test_no_top_tools_when_empty(self) -> None:
+ assert "Top tools" not in MetaFormatter(make_meta(tool_counts={})).summary()
+
+ def test_thinking_blocks_shown_when_nonzero(self) -> None:
+ assert "Thinking blocks: 5" in MetaFormatter(make_meta(thinking_blocks=5)).summary()
+
+ def test_thinking_blocks_hidden_when_zero(self) -> None:
+ assert "Thinking blocks" not in MetaFormatter(make_meta(thinking_blocks=0)).summary()
+
+ def test_web_shown_when_nonzero(self) -> None:
+ text = MetaFormatter(make_meta(web_searches=3, web_fetches=1)).summary()
+ assert "Web: 3 searches / 1 fetches" in text
+
+ def test_web_hidden_when_zero(self) -> None:
+ assert "Web:" not in MetaFormatter(make_meta(web_searches=0, web_fetches=0)).summary()
+
+ def test_permission_mode_shown_when_set(self) -> None:
+ text = MetaFormatter(make_meta(permission_mode="bypassPermissions")).summary()
+ assert "Permission mode: bypassPermissions" in text
+
+ def test_permission_mode_hidden_when_none(self) -> None:
+ assert "Permission mode" not in MetaFormatter(make_meta(permission_mode=None)).summary()
+
+
+# ---------------------------------------------------------------------------
+# AuditFormatter
+# ---------------------------------------------------------------------------
+
+
+class TestAuditFormatter:
+ def test_basic(self) -> None:
+ a = SessionAudit(
+ session_id="abcd1234-5678",
+ outcome="success",
+ satisfaction="positive",
+ session_type="debugging",
+ )
+ text = AuditFormatter(a).summary()
+ assert "abcd1234" in text
+ assert "Outcome: success" in text
+ assert "Satisfaction: positive" in text
+ assert "Type: debugging" in text
+
+ def test_with_goals(self) -> None:
+ a = SessionAudit(session_id="x", goal_categories={"bugfix": 5, "refactor": 3})
+ text = AuditFormatter(a).summary()
+ assert "Goals:" in text
+ assert "bugfix(5)" in text
+
+ def test_without_goals(self) -> None:
+ assert "Goals" not in AuditFormatter(SessionAudit(session_id="x", goal_categories={})).summary()
+
+ def test_with_friction(self) -> None:
+ a = SessionAudit(session_id="x", friction_counts={"permission_denied": 4})
+ assert "permission_denied(4)" in AuditFormatter(a).summary()
+
+ def test_without_friction(self) -> None:
+ assert "Friction" not in AuditFormatter(SessionAudit(session_id="x")).summary()
+
+ def test_with_instructions(self) -> None:
+ a = SessionAudit(session_id="x", user_instructions=("use pytest", "no comments"))
+ assert "Instructions: 2 extracted" in AuditFormatter(a).summary()
+
+ def test_without_instructions(self) -> None:
+ assert "Instructions" not in AuditFormatter(SessionAudit(session_id="x")).summary()
+
+ def test_summary_truncated_at_120(self) -> None:
+ a = SessionAudit(session_id="x", summary="x" * 200)
+ text = AuditFormatter(a).summary()
+ summary_line = next(line for line in text.split("\n") if "Summary" in line)
+ assert len(summary_line.split("Summary: ")[1]) == 120
+
+
+# ---------------------------------------------------------------------------
+# RecommendationFormatter
+# ---------------------------------------------------------------------------
+
+
+class TestRecommendationFormatter:
+ def test_basic(self) -> None:
+ r = Recommendation(suggestion="do X", evidence="50% failure", frequency=0.5, source_sessions=5)
+ text = RecommendationFormatter(r).summary()
+ assert "do X" in text
+ assert "50% failure" in text
+
+
+# ---------------------------------------------------------------------------
+# ProjectFormatter
+# ---------------------------------------------------------------------------
+
+
+class TestProjectFormatter:
+ def make(self, **kw: Any) -> ProjectStats:
+ defaults: dict[str, Any] = {
+ "project_path": "/proj/myapp",
+ "project_name": "myapp",
+ "session_count": 10,
+ "success_rate": 0.9,
+ "avg_tool_errors": 2.5,
+ "avg_duration_s": 600.0,
+ "top_error_categories": (),
+ "top_friction": (),
+ }
+ defaults.update(kw)
+ return ProjectStats(**defaults)
+
+ def test_basic(self) -> None:
+ text = ProjectFormatter(self.make()).summary()
+ assert "myapp: 10 sessions" in text
+ assert "90% success" in text
+
+ def test_outlier_marker(self) -> None:
+ assert "[!]" in ProjectFormatter(self.make(is_outlier=True)).summary()
+
+ def test_error_categories_shown(self) -> None:
+ p = self.make(top_error_categories=(("edit_failed", 8), ("command_failed", 3)))
+ assert "Errors: edit_failed(8)" in ProjectFormatter(p).summary()
+
+ def test_friction_shown(self) -> None:
+ p = self.make(top_friction=(("user_rejected", 4),))
+ assert "Friction: user_rejected(4)" in ProjectFormatter(p).summary()
+
+ def test_no_sub_lines_when_clean(self) -> None:
+ text = ProjectFormatter(self.make()).summary()
+ assert len(text.strip().split("\n")) == 1
+
+
+# ---------------------------------------------------------------------------
+# DigestFormatter
+# ---------------------------------------------------------------------------
+
+
+class TestDigestFormatter:
+ def make(self, **kw: Any) -> SessionDigest:
+ defaults: dict[str, Any] = {"session_count": 10, "date_range": (100.0, 500.0), "success_rate": 0.8}
+ defaults.update(kw)
+ return SessionDigest(**defaults)
+
+ def test_includes_count(self) -> None:
+ assert "42 sessions" in DigestFormatter(self.make(session_count=42)).summary()
+
+ def test_success_rate(self) -> None:
+ assert "80% success rate" in DigestFormatter(self.make(success_rate=0.8)).summary()
+
+ def test_outcome_distribution(self) -> None:
+ digest = self.make(
+ session_count=10,
+ outcome_distribution={"fully_achieved": 7, "unclear": 3},
+ )
+ text = DigestFormatter(digest).summary()
+ assert "fully_achieved: 7 (70%)" in text
+
+ def test_no_trends_without_weeks(self) -> None:
+ assert "Trends" not in DigestFormatter(self.make()).summary()
+
+ def test_trends_with_weeks(self) -> None:
+ w = WeekStats(
+ week="2026-W17", session_count=5, success_rate=0.7, avg_errors_per_session=1.0, avg_duration_s=600.0
+ )
+ digest = self.make(weeks=(w,), rolling_success_rate=0.7)
+ text = DigestFormatter(digest).summary()
+ assert "Trends" in text
+ assert "2026-W17" in text
+
+ def test_no_projects_without_data(self) -> None:
+ assert "Projects" not in DigestFormatter(self.make()).summary()
+
+ def test_no_recommendations_without_data(self) -> None:
+ assert "Recommendations" not in DigestFormatter(self.make()).summary()
+
+ def test_with_recommendations(self) -> None:
+ r = Recommendation(suggestion="Fix the thing", evidence="50% failure", frequency=0.5, source_sessions=10)
+ text = DigestFormatter(self.make(recommendations=(r,))).summary()
+ assert "Recommendations" in text
+ assert "1. Fix the thing" in text
+
+ def test_satisfaction_distribution(self) -> None:
+ digest = self.make(
+ session_count=10,
+ satisfaction_distribution={"happy": 6, "neutral": 4},
+ )
+ text = DigestFormatter(digest).summary()
+ assert "Satisfaction:" in text
+ assert "happy: 6" in text
+
+ def test_top_friction(self) -> None:
+ digest = self.make(top_friction=(("tool_failed", 12), ("blocked", 3)))
+ text = DigestFormatter(digest).summary()
+ assert "Top friction:" in text
+ assert "tool_failed: 12" in text
+
+ def test_sparkline_with_two_weeks(self) -> None:
+ w1 = WeekStats(
+ week="2026-W16", session_count=3, success_rate=0.5, avg_errors_per_session=2.0, avg_duration_s=600.0
+ )
+ w2 = WeekStats(
+ week="2026-W17", session_count=4, success_rate=0.9, avg_errors_per_session=0.5, avg_duration_s=400.0
+ )
+ text = DigestFormatter(self.make(weeks=(w1, w2), rolling_success_rate=0.7)).summary()
+ assert "Success: [" in text
+ assert "Errors: [" in text
+
+ def test_error_category_deltas(self) -> None:
+ w = WeekStats(
+ week="2026-W17", session_count=3, success_rate=0.7, avg_errors_per_session=1.0, avg_duration_s=600.0
+ )
+ digest = self.make(
+ weeks=(w,),
+ error_category_deltas=(("command_failed", 0.5, 4.0, 6.0),),
+ )
+ text = DigestFormatter(digest).summary()
+ assert "Error category trends:" in text
+ assert "command_failed" in text
+
+ def test_with_projects(self) -> None:
+ p = ProjectStats(
+ project_path="/proj/myapp",
+ project_name="myapp",
+ session_count=5,
+ success_rate=0.8,
+ avg_tool_errors=1.0,
+ avg_duration_s=300.0,
+ top_error_categories=(),
+ top_friction=(),
+ )
+ text = DigestFormatter(self.make(projects=(p,))).summary()
+ assert "Projects (1)" in text
+ assert "myapp" in text
+
+
+class TestDigestToJson:
+ def make(self, **kw: Any) -> SessionDigest:
+ defaults: dict[str, Any] = {"session_count": 10, "date_range": (100.0, 500.0), "success_rate": 0.8}
+ defaults.update(kw)
+ return SessionDigest(**defaults)
+
+ def test_valid_json(self) -> None:
+ j = DigestFormatter(self.make(session_count=5)).to_json()
+ parsed = json.loads(j)
+ assert parsed["session_count"] == 5
+
+ def test_with_nested_weeks(self) -> None:
+ w = WeekStats(
+ week="2026-W17", session_count=3, success_rate=0.7, avg_errors_per_session=1.0, avg_duration_s=600.0
+ )
+ j = DigestFormatter(self.make(weeks=(w,))).to_json()
+ parsed = json.loads(j)
+ assert len(parsed["weeks"]) == 1
+ assert parsed["weeks"][0]["week"] == "2026-W17"
+
+ def test_with_nested_projects(self) -> None:
+ p = ProjectStats(
+ project_path="/p",
+ project_name="p",
+ session_count=5,
+ success_rate=0.8,
+ avg_tool_errors=1.0,
+ avg_duration_s=300.0,
+ top_error_categories=(),
+ top_friction=(),
+ is_outlier=False,
+ )
+ j = DigestFormatter(self.make(projects=(p,))).to_json()
+ parsed = json.loads(j)
+ assert len(parsed["projects"]) == 1
+ assert parsed["projects"][0]["project_name"] == "p"
diff --git a/packages/blackbox/tests/test_models.py b/packages/blackbox/tests/test_models.py
new file mode 100644
index 0000000..7d9b793
--- /dev/null
+++ b/packages/blackbox/tests/test_models.py
@@ -0,0 +1,305 @@
+from __future__ import annotations
+
+import json
+from typing import Any
+
+import attrs
+import pytest
+
+from blackbox.models import (
+ ProjectStats,
+ Recommendation,
+ SessionAudit,
+ SessionDigest,
+ SessionEvent,
+ WeekStats,
+ arrow,
+ sparkline,
+)
+from tests.conftest import make_audit, make_meta
+
+# ---------------------------------------------------------------------------
+# sparkline and arrow
+# ---------------------------------------------------------------------------
+
+
+class TestSparkline:
+ def test_empty_or_single_returns_empty(self) -> None:
+ assert "" == sparkline([])
+ assert "" == sparkline([1.0])
+
+ def test_ascending_produces_increasing_chars(self) -> None:
+ result = sparkline([0.0, 0.5, 1.0])
+ assert len(result) == 3
+ assert result[0] <= result[-1]
+
+ def test_descending_produces_decreasing_chars(self) -> None:
+ result = sparkline([1.0, 0.5, 0.0])
+ assert result[0] >= result[-1]
+
+ def test_constant_values_produce_middle_char(self) -> None:
+ result = sparkline([5.0, 5.0, 5.0])
+ assert len(result) == 3
+ assert len(set(result)) == 1
+
+ def test_two_values_uses_full_range(self) -> None:
+ result = sparkline([0.0, 1.0])
+ assert len(result) == 2
+ assert result[0] != result[-1]
+
+
+class TestArrow:
+ def test_near_zero_delta_returns_equals(self) -> None:
+ assert "=" == arrow(0.0)
+ assert "=" == arrow(0.04)
+ assert "=" == arrow(-0.04)
+
+ def test_positive_delta_returns_up(self) -> None:
+ assert "^" == arrow(0.1)
+
+ def test_negative_delta_returns_down(self) -> None:
+ assert "v" == arrow(-0.1)
+
+ def test_invert_flips_positive(self) -> None:
+ assert "v" == arrow(0.1, invert=True)
+
+ def test_invert_flips_negative(self) -> None:
+ assert "^" == arrow(-0.1, invert=True)
+
+ def test_invert_near_zero_still_equals(self) -> None:
+ assert "=" == arrow(0.0, invert=True)
+
+
+# ---------------------------------------------------------------------------
+# SessionEvent
+# ---------------------------------------------------------------------------
+
+
+class TestSessionEvent:
+ def test_construction(self) -> None:
+ e = SessionEvent(
+ timestamp="2024-01-01T00:00:00Z",
+ speaker="user",
+ text="hello",
+ tool_name=None,
+ file_path=None,
+ command=None,
+ is_error=False,
+ error_category=None,
+ attachment_type=None,
+ )
+ assert e.speaker == "user"
+ assert e.text == "hello"
+ assert not e.is_error
+
+ def test_frozen(self) -> None:
+ e = SessionEvent("ts", "user", "hi", None, None, None, False, None, None)
+ with pytest.raises(attrs.exceptions.FrozenInstanceError):
+ e.speaker = "assistant" # type: ignore[misc]
+
+ def test_equality(self) -> None:
+ e1 = SessionEvent("ts", "user", "hi", None, None, None, False, None, None)
+ e2 = SessionEvent("ts", "user", "hi", None, None, None, False, None, None)
+ assert e1 == e2
+
+ def test_attrs_asdict(self) -> None:
+ e = SessionEvent("ts", "user", "hi", None, None, None, False, None, None)
+ d = attrs.asdict(e)
+ assert d["speaker"] == "user"
+ assert d["text"] == "hi"
+ assert json.dumps(d) # JSON-serializable
+
+
+# ---------------------------------------------------------------------------
+# SessionMeta — properties
+# ---------------------------------------------------------------------------
+
+
+class TestSessionMetaProperties:
+ def test_duration_minutes(self) -> None:
+ assert make_meta(duration_s=3600.0).duration_minutes == 60.0
+
+ def test_duration_minutes_zero(self) -> None:
+ assert make_meta(duration_s=0.0).duration_minutes == 0.0
+
+ def test_total_tokens(self) -> None:
+ assert make_meta(input_tokens=1000, output_tokens=500).total_tokens == 1500
+
+ def test_total_tokens_default(self) -> None:
+ assert make_meta().total_tokens == 0
+
+ def test_cache_hit_rate(self) -> None:
+ meta = make_meta(input_tokens=500, cache_read_tokens=300, cache_creation_tokens=200)
+ assert meta.cache_hit_rate == 0.3
+
+ def test_cache_hit_rate_zero_tokens(self) -> None:
+ assert make_meta().cache_hit_rate == 0.0
+
+ def test_cache_hit_rate_full(self) -> None:
+ meta = make_meta(input_tokens=0, cache_read_tokens=1000, cache_creation_tokens=0)
+ assert meta.cache_hit_rate == 1.0
+
+
+class TestSessionMetaFrozen:
+ def test_frozen(self) -> None:
+ meta = make_meta()
+ with pytest.raises(attrs.exceptions.FrozenInstanceError):
+ meta.session_id = "new" # type: ignore[misc]
+
+
+class TestSessionMetaAsDict:
+ def test_returns_dict(self) -> None:
+ d = attrs.asdict(make_meta())
+ assert isinstance(d, dict)
+ assert d["session_id"] == "abcd1234-5678-9012-3456-789012345678"
+ assert d["duration_s"] == 3600.0
+
+ def test_includes_optional_fields(self) -> None:
+ d = attrs.asdict(make_meta(git_branch="feature", git_commits=3))
+ assert d["git_branch"] == "feature"
+ assert d["git_commits"] == 3
+
+
+# ---------------------------------------------------------------------------
+# SessionAudit
+# ---------------------------------------------------------------------------
+
+
+class TestSessionAuditDefaults:
+ def test_defaults(self) -> None:
+ a = SessionAudit(session_id="x")
+ assert a.outcome == "unclear"
+ assert a.satisfaction == "neutral"
+ assert a.session_type == "single_task"
+ assert a.goal_categories == {}
+ assert a.friction_counts == {}
+ assert a.user_instructions == ()
+ assert a.summary == ""
+
+ def test_frozen(self) -> None:
+ a = SessionAudit(session_id="x")
+ with pytest.raises(attrs.exceptions.FrozenInstanceError):
+ a.outcome = "success" # type: ignore[misc]
+
+
+class TestSessionAuditAsDict:
+ def test_returns_dict(self) -> None:
+ a = make_audit()
+ d = attrs.asdict(a)
+ assert isinstance(d, dict)
+ assert d["outcome"] == "mostly_achieved"
+
+ def test_reflects_values(self) -> None:
+ a = make_audit(outcome="success", satisfaction="positive", session_type="multi_task")
+ d = attrs.asdict(a)
+ assert d["outcome"] == "success"
+ assert d["session_type"] == "multi_task"
+
+
+# ---------------------------------------------------------------------------
+# ProjectStats — mutable is_outlier
+# ---------------------------------------------------------------------------
+
+
+class TestProjectStats:
+ def make(self, **kw: Any) -> ProjectStats:
+ defaults: dict[str, Any] = {
+ "project_path": "/proj/myapp",
+ "project_name": "myapp",
+ "session_count": 10,
+ "success_rate": 0.9,
+ "avg_tool_errors": 2.5,
+ "avg_duration_s": 600.0,
+ "top_error_categories": (),
+ "top_friction": (),
+ }
+ defaults.update(kw)
+ return ProjectStats(**defaults)
+
+ def test_is_outlier_default_false(self) -> None:
+ assert not self.make().is_outlier
+
+ def test_is_outlier_mutable(self) -> None:
+ p = self.make()
+ p.is_outlier = True
+ assert p.is_outlier
+
+
+# ---------------------------------------------------------------------------
+# WeekStats + Recommendation
+# ---------------------------------------------------------------------------
+
+
+class TestWeekStats:
+ def test_frozen(self) -> None:
+ w = WeekStats(
+ week="2026-W17", session_count=3, success_rate=0.7, avg_errors_per_session=1.0, avg_duration_s=600.0
+ )
+ with pytest.raises(attrs.exceptions.FrozenInstanceError):
+ w.session_count = 5 # type: ignore[misc]
+
+ def test_default_error_counts(self) -> None:
+ w = WeekStats(
+ week="2026-W17", session_count=3, success_rate=0.7, avg_errors_per_session=1.0, avg_duration_s=600.0
+ )
+ assert w.error_category_counts == {}
+
+
+class TestRecommendation:
+ def test_frozen(self) -> None:
+ r = Recommendation(suggestion="do X", evidence="50%", frequency=0.5, source_sessions=5)
+ with pytest.raises(attrs.exceptions.FrozenInstanceError):
+ r.suggestion = "do Y" # type: ignore[misc]
+
+
+# ---------------------------------------------------------------------------
+# SessionDigest
+# ---------------------------------------------------------------------------
+
+
+class TestSessionDigest:
+ def make(self, **kw: Any) -> SessionDigest:
+ defaults: dict[str, Any] = {"session_count": 10, "date_range": (100.0, 500.0), "success_rate": 0.8}
+ defaults.update(kw)
+ return SessionDigest(**defaults)
+
+ def test_attrs_asdict(self) -> None:
+ d = attrs.asdict(self.make())
+ assert d["session_count"] == 10
+ assert d["success_rate"] == 0.8
+
+ def test_frozen(self) -> None:
+ d = self.make()
+ with pytest.raises(attrs.exceptions.FrozenInstanceError):
+ d.session_count = 99 # type: ignore[misc]
+
+ def test_json_serializable(self) -> None:
+ j = json.dumps(attrs.asdict(self.make(session_count=5)), indent=2, default=str)
+ parsed = json.loads(j)
+ assert parsed["session_count"] == 5
+
+ def test_json_with_nested_weeks(self) -> None:
+ w = WeekStats(
+ week="2026-W17", session_count=3, success_rate=0.7, avg_errors_per_session=1.0, avg_duration_s=600.0
+ )
+ j = json.dumps(attrs.asdict(self.make(weeks=(w,))), indent=2, default=str)
+ parsed = json.loads(j)
+ assert len(parsed["weeks"]) == 1
+ assert parsed["weeks"][0]["week"] == "2026-W17"
+
+ def test_json_with_nested_projects(self) -> None:
+ p = ProjectStats(
+ project_path="/p",
+ project_name="p",
+ session_count=5,
+ success_rate=0.8,
+ avg_tool_errors=1.0,
+ avg_duration_s=300.0,
+ top_error_categories=(),
+ top_friction=(),
+ is_outlier=False,
+ )
+ j = json.dumps(attrs.asdict(self.make(projects=(p,))), indent=2, default=str)
+ parsed = json.loads(j)
+ assert len(parsed["projects"]) == 1
+ assert parsed["projects"][0]["project_name"] == "p"
diff --git a/packages/blackbox/tests/test_rendering.py b/packages/blackbox/tests/test_rendering.py
new file mode 100644
index 0000000..429d589
--- /dev/null
+++ b/packages/blackbox/tests/test_rendering.py
@@ -0,0 +1,268 @@
+from __future__ import annotations
+
+import time
+
+from blackbox.dashboard.rendering import (
+ esc,
+ esc_md,
+ fmt_duration,
+ fmt_relative,
+ fmt_time,
+ passes_filter,
+ render_log_html,
+ shorten_paths,
+ tool_call_html,
+)
+from blackbox.models import LogEntry
+
+# ---------------------------------------------------------------------------
+# fmt_time
+# ---------------------------------------------------------------------------
+
+
+class TestFmtTime:
+ def test_epoch_zero(self) -> None:
+ assert "00:00:00" == fmt_time(0.0)
+
+ def test_known_timestamp(self) -> None:
+ assert "01:46:40" == fmt_time(1_000_000_000.0)
+
+ def test_fractional_seconds_truncated(self) -> None:
+ assert "00:00:00" == fmt_time(0.999)
+
+
+# ---------------------------------------------------------------------------
+# fmt_duration
+# ---------------------------------------------------------------------------
+
+
+class TestFmtDuration:
+ def test_zero_seconds(self) -> None:
+ assert "0s" == fmt_duration(100.0, 100.0)
+
+ def test_seconds_only(self) -> None:
+ assert "45s" == fmt_duration(0.0, 45.0)
+
+ def test_minutes_and_seconds(self) -> None:
+ assert "2m30s" == fmt_duration(0.0, 150.0)
+
+ def test_hours_and_minutes(self) -> None:
+ assert "1h30m" == fmt_duration(0.0, 5400.0)
+
+ def test_negative_clamps_to_zero(self) -> None:
+ assert "0s" == fmt_duration(100.0, 50.0)
+
+ def test_none_finished_uses_current_time(self) -> None:
+ result = fmt_duration(time.time() - 10, None)
+ assert result.endswith("s")
+
+ def test_exactly_60_seconds(self) -> None:
+ assert "1m00s" == fmt_duration(0.0, 60.0)
+
+ def test_exactly_one_hour(self) -> None:
+ assert "1h00m" == fmt_duration(0.0, 3600.0)
+
+
+# ---------------------------------------------------------------------------
+# fmt_relative
+# ---------------------------------------------------------------------------
+
+
+class TestFmtRelative:
+ def test_just_now(self) -> None:
+ assert "just now" == fmt_relative(time.time())
+
+ def test_minutes_ago(self) -> None:
+ assert "5m ago" == fmt_relative(time.time() - 300)
+
+ def test_hours_ago(self) -> None:
+ assert "2h ago" == fmt_relative(time.time() - 7200)
+
+ def test_days_ago(self) -> None:
+ assert "3d ago" == fmt_relative(time.time() - 259200)
+
+
+# ---------------------------------------------------------------------------
+# esc / esc_md
+# ---------------------------------------------------------------------------
+
+
+class TestEsc:
+ def test_ampersand(self) -> None:
+ assert "a &amp; b" == esc("a & b")
+
+ def test_angle_brackets(self) -> None:
+ assert "&lt;div&gt;" == esc("<div>")
+
+ def test_newlines_become_br(self) -> None:
+ assert "a<br>b" == esc("a\nb")
+
+ def test_combined(self) -> None:
+ assert "&lt;b&gt;hi&lt;/b&gt;<br>&amp;" == esc("<b>hi</b>\n&")
+
+
+class TestEscMd:
+ def test_bold_converted(self) -> None:
+ result = esc_md("hello **world**")
+ assert '<strong>world</strong>' in result
+
+ def test_html_still_escaped(self) -> None:
+ result = esc_md("