From 0ad5e60523c787191538d40014336bfe86718a64 Mon Sep 17 00:00:00 2001 From: Kevin Turcios <106575910+KRRT7@users.noreply.github.com> Date: Tue, 28 Apr 2026 19:58:43 -0500 Subject: [PATCH] Add blackbox package: session flight recorder with HTMX dashboard (#39) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(blackbox): add package with models, CLI, and HTMX dashboard * test(blackbox): add comprehensive test coverage for dashboard * feat(blackbox): cache session scanning via watcher invalidation * docs(blackbox): add README and use fastapi[standard] for dev server * refactor(blackbox): extract presentation logic into formatter classes * refactor(blackbox): extract classify_error helpers * feat(blackbox): wire analytics into session detail view Show token usage, tool breakdowns, and session stats in a collapsible panel when viewing a session. * feat(blackbox): add codeflash plugin detection Detect codeflash agent names, skills, and commands in transcripts. Surface language, optimization domain, and capability badges in the analytics panel. * refactor(blackbox): remove underscore prefixes from internal functions * chore: add ty python-version to root pyproject.toml * chore(blackbox): fix lint errors in test files * style(blackbox): apply ruff formatting to analytics * feat(blackbox): add Playwright E2E tests for dashboard Refactor app.py to expose create_app() factory accepting a projects_dir override, enabling tests to run against fixture data instead of the real ~/.claude/projects/ directory. Routes now read projects_dir from app.state instead of the module-level constant. Add 26 Playwright tests across 5 files covering dashboard loading, session list, session detail with filters and analytics, sidebar collapse/localStorage persistence, and SSE log streaming. All tests pass on chromium, firefox, and webkit (78 total). 
CI gets a new e2e-blackbox job with a browser matrix strategy running all three engines in parallel, conditional on blackbox path changes, with trace upload on failure. * fix(ci): sync only blackbox package in e2e job * fix(ci): exclude e2e tests from unit test job The test job doesn't install Playwright browsers, so e2e tests error when pytest collects them. Ignore tests/e2e/ directories in the test job — those are handled by the dedicated e2e-blackbox job. --- .github/workflows/ci.yml | 35 +- packages/blackbox/README.md | 61 ++ packages/blackbox/pyproject.toml | 95 +++ packages/blackbox/src/blackbox/__init__.py | 23 + packages/blackbox/src/blackbox/analytics.py | 393 ++++++++++ packages/blackbox/src/blackbox/cli.py | 58 ++ .../src/blackbox/dashboard/__init__.py | 0 .../blackbox/src/blackbox/dashboard/app.py | 37 + .../src/blackbox/dashboard/rendering.py | 180 +++++ .../blackbox/src/blackbox/dashboard/routes.py | 170 +++++ .../blackbox/dashboard/templates/base.html | 108 +++ .../blackbox/dashboard/templates/index.html | 98 +++ .../templates/partials/session_detail.html | 189 +++++ .../templates/partials/session_list.html | 49 ++ .../src/blackbox/dashboard/transcript.py | 279 +++++++ .../src/blackbox/dashboard/watcher.py | 85 +++ packages/blackbox/src/blackbox/formatting.py | 266 +++++++ packages/blackbox/src/blackbox/models.py | 261 +++++++ packages/blackbox/src/blackbox/py.typed | 0 packages/blackbox/tests/__init__.py | 0 packages/blackbox/tests/conftest.py | 38 + packages/blackbox/tests/e2e/__init__.py | 0 packages/blackbox/tests/e2e/conftest.py | 188 +++++ .../tests/e2e/test_dashboard_loads.py | 35 + .../blackbox/tests/e2e/test_session_detail.py | 77 ++ .../blackbox/tests/e2e/test_session_list.py | 42 ++ packages/blackbox/tests/e2e/test_sidebar.py | 50 ++ .../blackbox/tests/e2e/test_sse_streaming.py | 41 ++ packages/blackbox/tests/test_analytics.py | 693 ++++++++++++++++++ packages/blackbox/tests/test_cli.py | 54 ++ packages/blackbox/tests/test_formatting.py | 
333 +++++++++ packages/blackbox/tests/test_models.py | 305 ++++++++ packages/blackbox/tests/test_rendering.py | 268 +++++++ packages/blackbox/tests/test_routes.py | 74 ++ packages/blackbox/tests/test_transcript.py | 552 ++++++++++++++ packages/blackbox/tests/test_watcher.py | 169 +++++ pyproject.toml | 3 + uv.lock | 569 +++++++++++++- 38 files changed, 5855 insertions(+), 23 deletions(-) create mode 100644 packages/blackbox/README.md create mode 100644 packages/blackbox/pyproject.toml create mode 100644 packages/blackbox/src/blackbox/__init__.py create mode 100644 packages/blackbox/src/blackbox/analytics.py create mode 100644 packages/blackbox/src/blackbox/cli.py create mode 100644 packages/blackbox/src/blackbox/dashboard/__init__.py create mode 100644 packages/blackbox/src/blackbox/dashboard/app.py create mode 100644 packages/blackbox/src/blackbox/dashboard/rendering.py create mode 100644 packages/blackbox/src/blackbox/dashboard/routes.py create mode 100644 packages/blackbox/src/blackbox/dashboard/templates/base.html create mode 100644 packages/blackbox/src/blackbox/dashboard/templates/index.html create mode 100644 packages/blackbox/src/blackbox/dashboard/templates/partials/session_detail.html create mode 100644 packages/blackbox/src/blackbox/dashboard/templates/partials/session_list.html create mode 100644 packages/blackbox/src/blackbox/dashboard/transcript.py create mode 100644 packages/blackbox/src/blackbox/dashboard/watcher.py create mode 100644 packages/blackbox/src/blackbox/formatting.py create mode 100644 packages/blackbox/src/blackbox/models.py create mode 100644 packages/blackbox/src/blackbox/py.typed create mode 100644 packages/blackbox/tests/__init__.py create mode 100644 packages/blackbox/tests/conftest.py create mode 100644 packages/blackbox/tests/e2e/__init__.py create mode 100644 packages/blackbox/tests/e2e/conftest.py create mode 100644 packages/blackbox/tests/e2e/test_dashboard_loads.py create mode 100644 
packages/blackbox/tests/e2e/test_session_detail.py create mode 100644 packages/blackbox/tests/e2e/test_session_list.py create mode 100644 packages/blackbox/tests/e2e/test_sidebar.py create mode 100644 packages/blackbox/tests/e2e/test_sse_streaming.py create mode 100644 packages/blackbox/tests/test_analytics.py create mode 100644 packages/blackbox/tests/test_cli.py create mode 100644 packages/blackbox/tests/test_formatting.py create mode 100644 packages/blackbox/tests/test_models.py create mode 100644 packages/blackbox/tests/test_rendering.py create mode 100644 packages/blackbox/tests/test_routes.py create mode 100644 packages/blackbox/tests/test_transcript.py create mode 100644 packages/blackbox/tests/test_watcher.py diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a46cd09..b96e581 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -66,18 +66,49 @@ jobs: - name: Test changed packages run: | if [ "${{ github.event_name }}" = "push" ]; then - uv run pytest packages/ -v + uv run pytest packages/ -v --ignore=packages/blackbox/tests/e2e else CHANGED='${{ needs.changes.outputs.packages }}' for pkg in $(echo "$CHANGED" | jq -r '.[]'); do echo "::group::Testing $pkg" - uv run pytest "packages/$pkg" -v + uv run pytest "packages/$pkg" -v --ignore="packages/$pkg/tests/e2e" echo "::endgroup::" done fi env: CI: "true" + e2e-blackbox: + needs: changes + if: >- + github.event_name == 'push' + || contains(fromJSON(needs.changes.outputs.packages), 'blackbox') + runs-on: ubuntu-latest + permissions: + contents: read + strategy: + fail-fast: false + matrix: + browser: [chromium, firefox, webkit] + steps: + - uses: actions/checkout@v6 + - uses: astral-sh/setup-uv@v8.0.0 + with: + python-version: "3.12" + enable-cache: true + - run: uv sync --package blackbox + - name: Install Playwright browsers + run: uv run playwright install --with-deps ${{ matrix.browser }} + - name: Run E2E tests (${{ matrix.browser }}) + run: uv run pytest 
packages/blackbox/tests/e2e/ -v --browser ${{ matrix.browser }} --tracing=retain-on-failure + - name: Upload Playwright traces + if: failure() + uses: actions/upload-artifact@v4 + with: + name: playwright-traces-${{ matrix.browser }} + path: test-results/ + retention-days: 7 + version: if: github.event_name == 'pull_request' runs-on: ubuntu-latest diff --git a/packages/blackbox/README.md b/packages/blackbox/README.md new file mode 100644 index 0000000..47bec9f --- /dev/null +++ b/packages/blackbox/README.md @@ -0,0 +1,61 @@ +# blackbox + +A flight data recorder for AI coding agent sessions. + +## Why "blackbox"? + +Aircraft carry black boxes (flight data recorders) that silently capture +everything during a flight, then become invaluable when you need to +understand what happened. This package does the same for AI coding agent +sessions: it watches, records, and lets you replay what the agent did, +how it spent tokens, where it got stuck, and whether the session achieved +its goal. + +Currently supports Claude Code. Codex and Gemini support is planned. + +## What it does + +**Dashboard** -- a local HTMX web UI for browsing session transcripts +in real time. + +- Sidebar with all sessions from `~/.claude/projects/`, sorted by recency +- Live session detection via filesystem watching (green dot indicator) +- Streaming log view with filter presets (all, compact, important, errors) +- Tool call previews, error highlighting, user message formatting + +**Analytics models** -- structured data types for session-level metrics, +weekly trends, project breakdowns, and recommendations. These feed into +the analysis pipeline (in progress) that will produce session digests +and surface patterns across sessions. 
+ +## Usage + +```bash +blackbox serve # open dashboard at http://localhost:7100 +blackbox serve --port 8080 # custom port +blackbox serve --no-open # don't auto-open browser +``` + +## Package structure + +``` +src/blackbox/ + cli.py # CLI entry point (serve command) + models.py # All domain models (attrs frozen classes) + dashboard/ + app.py # FastAPI instance + lifespan + routes.py # API endpoints + SSE log streaming + rendering.py # HTML rendering, filtering, formatting + transcript.py # JSONL transcript parser + session scanner + watcher.py # Watchdog-based live session detection + cache + templates/ # Jinja2 templates (Tailwind + HTMX) +``` + +## Development + +```bash +uv sync +uv run fastapi dev src/blackbox/dashboard/app.py # hot reload on :8000 +uv run pytest tests/ -v +uv run ruff check src/ tests/ +``` diff --git a/packages/blackbox/pyproject.toml b/packages/blackbox/pyproject.toml new file mode 100644 index 0000000..1575af9 --- /dev/null +++ b/packages/blackbox/pyproject.toml @@ -0,0 +1,95 @@ +[project] +name = "blackbox" +version = "0.1.0" +description = "Flight data recorder for AI coding agent sessions" +requires-python = ">=3.12" +dependencies = [ + "attrs>=24.2.0", + "danom>=0.13.0", + "fastapi[standard]>=0.115.0", + "jinja2>=3.1.0", + "sse-starlette>=2.0.0", + "uvicorn>=0.30.0", + "watchdog>=4.0.0", +] + +[project.scripts] +blackbox = "blackbox.cli:main" + +[build-system] +requires = ["uv_build>=0.7.2,<0.8"] +build-backend = "uv_build" + +[tool.uv.sources] +danom = { git = "https://github.com/KRRT7/danom.git", branch = "feat/add-py-typed" } + +[dependency-groups] +dev = [ + "pytest>=9.0.3", + "pytest-cov>=6.2.1", + "ruff>=0.15.12", + "interrogate>=1.7.0", + "pytest-asyncio>=1.3.0", + "ty>=0.0.33", + "pytest-playwright>=0.7.2", +] +typing = [ + "mypy>=1.20.2", +] + +[tool.ty.environment] +python-version = "3.12" + +[tool.mypy] +strict = true +warn_return_any = true +warn_unused_configs = true + +[tool.pytest.ini_options] +asyncio_mode = "auto" 
+markers = [ + "e2e: end-to-end browser tests (requires playwright)", +] + +[tool.coverage.run] +source = ["blackbox"] +branch = true + +[tool.coverage.report] +show_missing = true +skip_empty = true + +[tool.interrogate] +fail-under = 100 +verbose = 2 + +[tool.ruff] +line-length = 120 + +[tool.ruff.lint] +select = ["ALL"] +ignore = [ + "A", + "ANN", + "ARG", + "ASYNC240", + "COM812", + "D", + "E501", + "EM", + "FBT", + "ISC001", + "PLR2004", + "RET504", + "S", + "SIM300", + "TC003", + "TRY003", +] + +[tool.ruff.lint.per-file-ignores] +"tests/**" = [ + "PLC0415", + "SIM300", + "SLF001", +] diff --git a/packages/blackbox/src/blackbox/__init__.py b/packages/blackbox/src/blackbox/__init__.py new file mode 100644 index 0000000..0b68c34 --- /dev/null +++ b/packages/blackbox/src/blackbox/__init__.py @@ -0,0 +1,23 @@ +"""Blackbox — flight data recorder for AI coding agent sessions.""" + +from __future__ import annotations + +from blackbox.models import ( + ProjectStats, + Recommendation, + SessionAudit, + SessionDigest, + SessionEvent, + SessionMeta, + WeekStats, +) + +__all__ = [ + "ProjectStats", + "Recommendation", + "SessionAudit", + "SessionDigest", + "SessionEvent", + "SessionMeta", + "WeekStats", +] diff --git a/packages/blackbox/src/blackbox/analytics.py b/packages/blackbox/src/blackbox/analytics.py new file mode 100644 index 0000000..db3f230 --- /dev/null +++ b/packages/blackbox/src/blackbox/analytics.py @@ -0,0 +1,393 @@ +"""Extract structured analytics from Claude Code session transcripts.""" + +from __future__ import annotations + +import json +from collections import Counter +from pathlib import Path +from typing import Any + +from blackbox.dashboard.transcript import ts_to_epoch +from blackbox.models import ( + CODEFLASH_AGENT_PREFIXES, + CODEFLASH_COMMANDS, + CODEFLASH_SKILLS, + CodeflashSession, + SessionMeta, +) + +EDIT_TOOLS = {"Edit", "Write", "NotebookEdit"} +FILE_EXTENSIONS: dict[str, str] = { + ".py": "python", + ".js": "javascript", + ".ts": 
"typescript", + ".tsx": "typescript", + ".jsx": "javascript", + ".go": "go", + ".rs": "rust", + ".java": "java", + ".rb": "ruby", + ".sh": "shell", + ".bash": "shell", + ".zsh": "shell", + ".toml": "toml", + ".yaml": "yaml", + ".yml": "yaml", + ".json": "json", + ".md": "markdown", + ".html": "html", + ".css": "css", +} + + +def extract_meta(path: Path) -> SessionMeta | None: # noqa: C901, PLR0912, PLR0915 + """Extract a SessionMeta from a raw .jsonl transcript.""" + session_id = path.stem + project_path = path.parent.name + + timestamps: list[float] = [] + user_messages = 0 + assistant_messages = 0 + tool_calls = 0 + tool_counts: Counter[str] = Counter() + tool_errors = 0 + tool_error_categories: Counter[str] = Counter() + tool_error_details: list[tuple[str, str]] = [] + input_tokens = 0 + output_tokens = 0 + cache_read_tokens = 0 + cache_creation_tokens = 0 + files_modified: set[str] = set() + lines_added = 0 + lines_removed = 0 + git_commits = 0 + git_branch: str | None = None + user_interruptions = 0 + compactions = 0 + subagents_spawned = 0 + thinking_blocks = 0 + web_searches = 0 + web_fetches = 0 + permission_mode: str | None = None + languages: Counter[str] = Counter() + first_prompt = "" + pending_tools: dict[str, str] = {} + codeflash_agents: set[str] = set() + codeflash_skills: set[str] = set() + codeflash_commands: set[str] = set() + teams_created = 0 + + try: + text = path.read_text() + except OSError: + return None + + for line in text.splitlines(): + if not line.strip(): + continue + try: + raw = json.loads(line) + except json.JSONDecodeError: + continue + + ts = ts_to_epoch(raw.get("timestamp")) + if ts: + timestamps.append(ts) + + entry_type = raw.get("type", "") + + if entry_type == "permission-mode": + permission_mode = raw.get("permissionMode") + continue + + if entry_type == "summary": + compactions += 1 + continue + + if git_branch is None and raw.get("gitBranch"): + git_branch = raw["gitBranch"] + + if entry_type == "user": + msg = 
raw.get("message", {}) + if not isinstance(msg, dict): + continue + content = msg.get("content", "") + if isinstance(content, str) and content.strip(): + user_messages += 1 + if not first_prompt: + first_prompt = content[:120] + elif isinstance(content, list): + has_tool_result = any(isinstance(b, dict) and b.get("type") == "tool_result" for b in content) + if has_tool_result: + for block in content: + if not isinstance(block, dict) or block.get("type") != "tool_result": + continue + tool_use_id = block.get("tool_use_id", "") + is_error = block.get("is_error", False) + if is_error: + tool_errors += 1 + tool_name = pending_tools.get(tool_use_id, "unknown") + category = classify_error(tool_name, block, raw) + tool_error_categories[category] += 1 + stderr = "" + tur = raw.get("toolUseResult", {}) + if isinstance(tur, dict): + stderr = tur.get("stderr", "") + detail_text = stderr or str(block.get("content", ""))[:200] + tool_error_details.append((category, detail_text)) + else: + user_messages += 1 + if not first_prompt: + texts = [b.get("text", "") for b in content if isinstance(b, dict) and b.get("type") == "text"] + first_prompt = " ".join(texts)[:120] + + interrupted = False + tur = raw.get("toolUseResult", {}) + if isinstance(tur, dict): + interrupted = tur.get("interrupted", False) + if interrupted: + user_interruptions += 1 + + elif entry_type == "assistant": + msg = raw.get("message", {}) + if not isinstance(msg, dict): + continue + usage = msg.get("usage", {}) + if usage: + input_tokens += usage.get("input_tokens", 0) + output_tokens += usage.get("output_tokens", 0) + cache_read_tokens += usage.get("cache_read_input_tokens", 0) + cache_creation_tokens += usage.get("cache_creation_input_tokens", 0) + stu = usage.get("server_tool_use", {}) + if stu: + web_searches += stu.get("web_search_requests", 0) + web_fetches += stu.get("web_fetch_requests", 0) + + content = msg.get("content", []) + if not isinstance(content, list): + assistant_messages += 1 + continue + + 
has_text = False + for block in content: + if not isinstance(block, dict): + continue + btype = block.get("type", "") + if btype == "text" and block.get("text", "").strip(): + has_text = True + elif btype == "thinking": + thinking_blocks += 1 + elif btype == "tool_use": + tool_name = block.get("name", "unknown") + tool_calls += 1 + tool_counts[tool_name] += 1 + tool_id = block.get("id", "") + if tool_id: + pending_tools[tool_id] = tool_name + if tool_name == "Agent": + subagents_spawned += 1 + if tool_name == "TeamCreate": + teams_created += 1 + tool_input = block.get("input", {}) + if isinstance(tool_input, dict): + track_file_changes( + tool_name, + tool_input, + files_modified, + languages, + ) + lines_a, lines_r = count_diff_lines(tool_name, tool_input) + lines_added += lines_a + lines_removed += lines_r + if tool_name == "Bash": + cmd = tool_input.get("command", "") + if isinstance(cmd, str) and "git commit" in cmd and "--amend" not in cmd: + git_commits += 1 + if tool_name == "Agent": + agent_name = tool_input.get("name", "") or tool_input.get("subagent_type", "") + if agent_name in CODEFLASH_AGENT_PREFIXES: + codeflash_agents.add(agent_name) + if tool_name == "Skill": + skill_name = tool_input.get("skill", "") + if skill_name in CODEFLASH_SKILLS: + codeflash_skills.add(skill_name) + if skill_name in CODEFLASH_COMMANDS: + codeflash_commands.add(skill_name) + if has_text: + assistant_messages += 1 + + if not timestamps: + return None + + start_time = min(timestamps) + end_time = max(timestamps) + + return SessionMeta( + session_id=session_id, + project_path=project_path, + transcript_path=str(path), + start_time=start_time, + end_time=end_time, + duration_s=end_time - start_time, + user_messages=user_messages, + assistant_messages=assistant_messages, + tool_calls=tool_calls, + tool_counts=dict(tool_counts), + tool_errors=tool_errors, + tool_error_categories=dict(tool_error_categories), + tool_error_details=tuple(tool_error_details), + 
input_tokens=input_tokens, + output_tokens=output_tokens, + cache_read_tokens=cache_read_tokens, + cache_creation_tokens=cache_creation_tokens, + languages=dict(languages), + files_modified=len(files_modified), + lines_added=lines_added, + lines_removed=lines_removed, + git_commits=git_commits, + git_branch=git_branch, + user_interruptions=user_interruptions, + compactions=compactions, + subagents_spawned=subagents_spawned, + thinking_blocks=thinking_blocks, + web_searches=web_searches, + web_fetches=web_fetches, + permission_mode=permission_mode, + first_prompt=first_prompt, + codeflash=detect_codeflash(codeflash_agents, codeflash_skills, codeflash_commands, teams_created), + ) + + +def classify_error(tool_name: str, block: dict[str, Any], raw: dict[str, Any]) -> str: + """Classify a tool error into a category based on tool name and error content.""" + tur = raw.get("toolUseResult", {}) + stderr = "" + if isinstance(tur, dict): + stderr = tur.get("stderr", "") + error_text = (stderr or str(block.get("content", ""))).lower() + + if tool_name == "Edit": + return "edit_failed" + if tool_name == "Bash": + return classify_bash_error(error_text) + if tool_name in ("Read", "Write"): + return classify_file_error(error_text) + return "tool_error" + + +def classify_bash_error(error_text: str) -> str: + if "permission denied" in error_text: + return "permission_denied" + if "command not found" in error_text: + return "command_not_found" + return "command_failed" + + +def classify_file_error(error_text: str) -> str: + if "not found" in error_text or "no such file" in error_text: + return "file_not_found" + return "file_error" + + +def track_file_changes( + tool_name: str, + tool_input: dict[str, Any], + files: set[str], + languages: Counter[str], +) -> None: + """Track which files were modified and what languages are involved.""" + if tool_name not in EDIT_TOOLS: + return + fp = tool_input.get("file_path", "") + if not fp: + return + files.add(fp) + ext = 
Path(fp).suffix.lower() + lang = FILE_EXTENSIONS.get(ext) + if lang: + languages[lang] += 1 + + +def count_diff_lines(tool_name: str, tool_input: dict[str, Any]) -> tuple[int, int]: + """Estimate lines added/removed from Edit and Write tool inputs.""" + if tool_name == "Edit": + old = tool_input.get("old_string", "") + new = tool_input.get("new_string", "") + if isinstance(old, str) and isinstance(new, str): + old_lines = old.count("\n") + (1 if old else 0) + new_lines = new.count("\n") + (1 if new else 0) + added = max(0, new_lines - old_lines) + removed = max(0, old_lines - new_lines) + return added, removed + if tool_name == "Write": + content = tool_input.get("content", "") + if isinstance(content, str): + return content.count("\n") + 1, 0 + return 0, 0 + + +def detect_codeflash( + agents: set[str], + skills: set[str], + commands: set[str], + teams_created: int, +) -> CodeflashSession | None: + """Build a CodeflashSession if any codeflash plugin signals were detected.""" + if not agents and not skills and not commands: + return None + + language = infer_language(agents) + domain = infer_domain(agents) + + return CodeflashSession( + is_codeflash=True, + language=language, + agents_used=tuple(sorted(agents)), + skills_invoked=tuple(sorted(skills)), + commands_invoked=tuple(sorted(commands)), + teams_created=teams_created, + optimization_domain=domain, + has_researcher="codeflash-researcher" in agents, + has_reviewer="codeflash-review" in agents, + has_ci_handler=any(a.endswith("-ci") for a in agents), + has_pr_prep=any(a.endswith("-pr-prep") for a in agents), + ) + + +LANGUAGE_AGENT_MARKERS: dict[str, str] = { + "codeflash-python": "python", + "codeflash-javascript": "javascript", + "codeflash-java": "java", +} + + +def infer_language(agents: set[str]) -> str | None: + """Infer the target language from which language-specific agents were invoked.""" + for marker, lang in LANGUAGE_AGENT_MARKERS.items(): + if marker in agents: + return lang + for agent in agents: + 
if agent.startswith("codeflash-js-"): + return "javascript" + if agent.startswith("codeflash-java-"): + return "java" + return None + + +DOMAIN_AGENT_SUFFIXES: dict[str, str] = { + "-cpu": "cpu", + "-memory": "memory", + "-async": "async", + "-structure": "structure", + "-deep": "deep", + "-bundle": "bundle", +} + + +def infer_domain(agents: set[str]) -> str | None: + """Infer the optimization domain from specialist agents used.""" + for agent in agents: + for suffix, domain in DOMAIN_AGENT_SUFFIXES.items(): + if agent.endswith(suffix): + return domain + return None diff --git a/packages/blackbox/src/blackbox/cli.py b/packages/blackbox/src/blackbox/cli.py new file mode 100644 index 0000000..91a743d --- /dev/null +++ b/packages/blackbox/src/blackbox/cli.py @@ -0,0 +1,58 @@ +from __future__ import annotations + +import argparse +import sys +import webbrowser + +from danom import Err, safe + + +@safe +def parse_args(argv: list[str] | None = None) -> argparse.Namespace: + parser = argparse.ArgumentParser( + prog="blackbox", + description="Flight data recorder for AI coding agent sessions", + ) + subparsers = parser.add_subparsers(dest="command", required=True) + + serve_parser = subparsers.add_parser("serve", help="Launch the session dashboard") + serve_parser.add_argument("--port", type=int, default=7100) + serve_parser.add_argument("--no-open", action="store_true", help="Don't open browser automatically") + + return parser.parse_args(argv) + + +@safe +def run(args: argparse.Namespace) -> None: + if args.command == "serve": + run_serve(args) + else: + msg = f"Unknown command: {args.command}" + raise ValueError(msg) + + +def run_serve(args: argparse.Namespace) -> None: + import uvicorn # noqa: PLC0415 + + from blackbox.dashboard.app import app # noqa: PLC0415 + + if not args.no_open: + import threading # noqa: PLC0415 + + def open_browser() -> None: + import time # noqa: PLC0415 + + time.sleep(1.0) + webbrowser.open(f"http://localhost:{args.port}") + + 
threading.Thread(target=open_browser, daemon=True).start() + + uvicorn.run(app, host="127.0.0.1", port=args.port, log_level="warning") + + +def main() -> None: + args = parse_args().unwrap() + result = run(args) + if isinstance(result, Err): + print(f"Error: {result.error}", file=sys.stderr) # noqa: T201 + sys.exit(1) diff --git a/packages/blackbox/src/blackbox/dashboard/__init__.py b/packages/blackbox/src/blackbox/dashboard/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/packages/blackbox/src/blackbox/dashboard/app.py b/packages/blackbox/src/blackbox/dashboard/app.py new file mode 100644 index 0000000..e76c5c0 --- /dev/null +++ b/packages/blackbox/src/blackbox/dashboard/app.py @@ -0,0 +1,37 @@ +"""FastAPI + HTMX dashboard for browsing Claude Code session transcripts.""" + +from __future__ import annotations + +from contextlib import asynccontextmanager +from pathlib import Path +from typing import TYPE_CHECKING + +from fastapi import FastAPI + +from blackbox.dashboard.routes import PROJECTS_DIR, router +from blackbox.dashboard.watcher import SessionWatcher + +if TYPE_CHECKING: + from collections.abc import AsyncIterator + + +def create_app(projects_dir: Path | None = None) -> FastAPI: + """Create the dashboard FastAPI app, optionally overriding the projects directory.""" + actual_dir = projects_dir or PROJECTS_DIR + + @asynccontextmanager + async def lifespan(the_app: FastAPI) -> AsyncIterator[None]: + """Start/stop the session watcher around the app lifecycle.""" + watcher = SessionWatcher(actual_dir) + watcher.start() + the_app.state.watcher = watcher + the_app.state.projects_dir = actual_dir + yield + watcher.stop() + + application = FastAPI(title="blackbox", lifespan=lifespan) + application.include_router(router) + return application + + +app = create_app() diff --git a/packages/blackbox/src/blackbox/dashboard/rendering.py b/packages/blackbox/src/blackbox/dashboard/rendering.py new file mode 100644 index 0000000..a561251 --- /dev/null +++ 
b/packages/blackbox/src/blackbox/dashboard/rendering.py @@ -0,0 +1,180 @@ +"""HTML rendering helpers for log entries.""" + +from __future__ import annotations + +import re +import time +from datetime import UTC, datetime +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from blackbox.models import LogEntry + +FILTER_PRESETS: dict[str, set[str] | None] = { + "all": None, + "compact": None, + "important": {"status", "assistant", "error", "info"}, + "errors": {"error"}, +} + +SKIP_LEVELS = { + "delta", + "stream", + "block_stop", + "block_start", + "thinking_delta", + "tool_start", +} + + +def fmt_time(ts: float) -> str: + return datetime.fromtimestamp(ts, tz=UTC).strftime("%H:%M:%S") + + +def fmt_duration(started: float, finished: float | None) -> str: + end = finished or time.time() + secs = int(end - started) + if secs < 0: + return "0s" + if secs < 60: + return f"{secs}s" + mins, secs = divmod(secs, 60) + if mins < 60: + return f"{mins}m{secs:02d}s" + hrs, mins = divmod(mins, 60) + return f"{hrs}h{mins:02d}m" + + +def fmt_relative(ts: float) -> str: + delta = time.time() - ts + if delta < 60: + return "just now" + if delta < 3600: + return f"{int(delta / 60)}m ago" + if delta < 86400: + return f"{int(delta / 3600)}h ago" + return f"{int(delta / 86400)}d ago" + + +def passes_filter( + entry: LogEntry, + filter_name: str, + allowed: set[str] | None, +) -> bool: + stripped = entry.message.strip() + if not stripped: + return False + if filter_name == "all": + return True + if entry.level in SKIP_LEVELS: + return False + if allowed is not None and entry.level not in allowed: + return False + return not (entry.level == "assistant" and stripped == "(thinking)") + + +def esc(text: str) -> str: + return text.replace("&", "&").replace("<", "<").replace(">", ">").replace("\n", "
") + + +BOLD_RE = re.compile(r"\*\*(.+?)\*\*") + + +def esc_md(text: str) -> str: + escaped = esc(text) + return BOLD_RE.sub(r'\1', escaped) + + +def shorten_paths(text: str) -> str: + text = re.sub(r"/(?:private/)?tmp/[^\s\"']+/?", "", text) + return text + + +SOURCE_CLASSES = { + "claude": "bg-blue-500", + "user": "bg-green-500", + "system": "bg-surface-700", +} + +SOURCE_LABELS = { + "claude": "CLU", + "user": "USR", + "system": "SYS", +} + + +def tool_call_html(preview: str) -> str: + shortened = shorten_paths(preview) + lines = shortened.split("\n") + if len(lines) <= 3: + return f'{esc(shortened)}' + summary = esc(lines[0]) + rest = esc("\n".join(lines[1:])) + return ( + f'{summary}' + f'
+{len(lines) - 1}' + f" lines" + f'
{rest}
' + f"
" + ) + + +def render_log_html(entry: LogEntry) -> str: # noqa: C901, PLR0912 + ts = fmt_time(entry.timestamp) + src_cls = SOURCE_CLASSES.get(entry.source, "bg-gray-600") + src_label = SOURCE_LABELS.get(entry.source, entry.source[:3].upper()) + + if entry.level == "tool_call": + tool = entry.data.get("tool", "tool") + preview = entry.data.get("input_preview", entry.message) + badge_cls = "bg-amber-500" + badge_label = esc(tool[:12]) + elif entry.level == "tool_result": + badge_cls = "bg-gray-700" + badge_label = "RES" + else: + badge_cls = src_cls + badge_label = src_label + + if entry.source == "user" and entry.level == "info": + msg = f'{esc_md(entry.message)}' + elif entry.level == "tool_call": + preview = entry.data.get("input_preview", "") + msg = tool_call_html(preview) + elif entry.level == "tool_result": + text = entry.message[:500] + if len(entry.message) > 500: + text += "..." + msg = f'{esc(shorten_paths(text))}' + elif entry.level == "assistant": + if entry.message.strip() == "(thinking)": + msg = f'{esc(entry.message)}' + else: + msg = f'{esc_md(entry.message)}' + elif entry.level == "error": + msg = f'{esc(entry.message)}' + else: + msg = f'{esc(entry.message)}' + + extra_div_classes = "" + if entry.level == "assistant" and entry.message.strip() == "(thinking)": + extra_div_classes = " border-t border-surface-800 mt-2 pt-1" + + is_tool = entry.level in ("tool_call", "tool_result") + opacity = " opacity-60" if is_tool else "" + indent = " pl-4" if is_tool else "" + + return ( + f'
' + f'{ts}' + f'' + f"{badge_label}" + f'{msg}' + f"
" + ) diff --git a/packages/blackbox/src/blackbox/dashboard/routes.py b/packages/blackbox/src/blackbox/dashboard/routes.py new file mode 100644 index 0000000..3b4abba --- /dev/null +++ b/packages/blackbox/src/blackbox/dashboard/routes.py @@ -0,0 +1,170 @@ +"""Route handlers for the session dashboard.""" + +from __future__ import annotations + +import asyncio +from pathlib import Path +from typing import TYPE_CHECKING + +import attrs +from fastapi import APIRouter, Request +from fastapi.responses import HTMLResponse +from fastapi.templating import Jinja2Templates +from sse_starlette.sse import EventSourceResponse, ServerSentEvent # type: ignore[attr-defined] + +from blackbox.analytics import extract_meta +from blackbox.dashboard.rendering import ( + FILTER_PRESETS, + fmt_duration, + fmt_relative, + fmt_time, + passes_filter, + render_log_html, +) +from blackbox.dashboard.transcript import parse_transcript, parse_transcript_tail, scan_sessions +from blackbox.models import SessionInfo + +if TYPE_CHECKING: + from collections.abc import AsyncIterator + + from blackbox.dashboard.watcher import SessionWatcher + from blackbox.models import LogEntry + +TEMPLATES_DIR = Path(__file__).parent / "templates" +PROJECTS_DIR = Path.home() / ".claude" / "projects" + +HISTORY_BATCH = 200 + +router = APIRouter() +templates = Jinja2Templates(directory=str(TEMPLATES_DIR)) +templates.env.globals["fmt_time"] = fmt_time # type: ignore[assignment] # ty: ignore[invalid-assignment] +templates.env.globals["fmt_duration"] = fmt_duration # type: ignore[assignment] # ty: ignore[invalid-assignment] +templates.env.globals["fmt_relative"] = fmt_relative # type: ignore[assignment] # ty: ignore[invalid-assignment] + + +def mark_live(sessions: list[SessionInfo], watcher: SessionWatcher) -> list[SessionInfo]: + live_ids = watcher.live_session_ids() + if not live_ids: + return sessions + return [attrs.evolve(s, is_live=True) if s.session_id in live_ids else s for s in sessions] + + +@router.get("/", 
response_class=HTMLResponse) +async def index(request: Request, session: str = "") -> HTMLResponse: + watcher: SessionWatcher = request.app.state.watcher + sessions = mark_live(watcher.get_sessions(scan_sessions), watcher) + return templates.TemplateResponse( + request, + "index.html", + context={"sessions": sessions, "selected_id": session}, + ) + + +@router.get("/sessions", response_class=HTMLResponse) +async def session_list(request: Request, selected: str = "") -> HTMLResponse: + watcher: SessionWatcher = request.app.state.watcher + sessions = mark_live(watcher.get_sessions(scan_sessions), watcher) + return templates.TemplateResponse( + request, + "partials/session_list.html", + context={"sessions": sessions, "selected_id": selected}, + ) + + +@router.get("/sessions/{project_path}/{session_id}", response_class=HTMLResponse) +async def session_detail( + request: Request, + project_path: str, + session_id: str, + filter: str = "compact", +) -> HTMLResponse: + projects_dir: Path = request.app.state.projects_dir + transcript_path = projects_dir / project_path / f"{session_id}.jsonl" + if not transcript_path.exists(): + return HTMLResponse( + '
Session not found
', + ) + info = build_session_info(transcript_path, session_id, project_path) + meta = extract_meta(transcript_path) + return templates.TemplateResponse( + request, + "partials/session_detail.html", + context={ + "session": info, + "meta": meta, + "filter": filter, + "filters": list(FILTER_PRESETS.keys()), + }, + ) + + +def build_session_info(path: Path, session_id: str, project_path: str) -> SessionInfo: + """Build a SessionInfo from a transcript file for the detail view.""" + from blackbox.dashboard.transcript import decode_project_name, quick_session_info # noqa: PLC0415 + + info = quick_session_info(path, session_id, project_path, decode_project_name(project_path)) + if info: + return info + return SessionInfo( + session_id=session_id, + project_path=project_path, + project_name=project_path, + transcript_path=str(path), + started_at=path.stat().st_mtime, + ) + + +def filter_and_render(entries: list[LogEntry], filter_name: str, allowed: set[str] | None) -> list[str]: + return [ + html + for entry in entries + if passes_filter(entry, filter_name, allowed) + for html in [render_log_html(entry)] + if html + ] + + +async def log_stream( + transcript_path: Path, + filter_name: str, +) -> AsyncIterator[ServerSentEvent]: + allowed = FILTER_PRESETS.get(filter_name) + entries = await asyncio.to_thread(parse_transcript, transcript_path) + + batch: list[str] = [] + for entry in entries: + if passes_filter(entry, filter_name, allowed): + html = render_log_html(entry) + if html: + batch.append(html) + if len(batch) >= HISTORY_BATCH: + yield ServerSentEvent(data="\n".join(batch), event="log") + batch = [] + if batch: + yield ServerSentEvent(data="\n".join(batch), event="log") + + offset = transcript_path.stat().st_size + while True: + await asyncio.sleep(1.0) + try: + current_size = transcript_path.stat().st_size + except OSError: + break + if current_size <= offset: + continue + new_entries, offset = await asyncio.to_thread(parse_transcript_tail, transcript_path, offset) + 
rendered = filter_and_render(new_entries, filter_name, allowed) + if rendered: + yield ServerSentEvent(data="\n".join(rendered), event="log") + + +@router.get("/sessions/{project_path}/{session_id}/logs") +async def session_logs( + request: Request, + project_path: str, + session_id: str, + filter: str = "compact", +) -> EventSourceResponse: + projects_dir: Path = request.app.state.projects_dir + transcript_path = projects_dir / project_path / f"{session_id}.jsonl" + return EventSourceResponse(log_stream(transcript_path, filter)) diff --git a/packages/blackbox/src/blackbox/dashboard/templates/base.html b/packages/blackbox/src/blackbox/dashboard/templates/base.html new file mode 100644 index 0000000..8c934d6 --- /dev/null +++ b/packages/blackbox/src/blackbox/dashboard/templates/base.html @@ -0,0 +1,108 @@ + + + + + + Blackbox + + + + + + + + + + {% block content %}{% endblock %} + + + diff --git a/packages/blackbox/src/blackbox/dashboard/templates/index.html b/packages/blackbox/src/blackbox/dashboard/templates/index.html new file mode 100644 index 0000000..4e2c524 --- /dev/null +++ b/packages/blackbox/src/blackbox/dashboard/templates/index.html @@ -0,0 +1,98 @@ +{% extends "base.html" %} +{% block content %} +
+ + + + + +
+ {% if selected_id %} +
+
+ {% else %} + +
+
+
+ + + + +
+

Select a session to review

+

sessions appear as Claude Code writes them

+
+
+ {% endif %} +
+ +
+{% endblock %} diff --git a/packages/blackbox/src/blackbox/dashboard/templates/partials/session_detail.html b/packages/blackbox/src/blackbox/dashboard/templates/partials/session_detail.html new file mode 100644 index 0000000..9d20673 --- /dev/null +++ b/packages/blackbox/src/blackbox/dashboard/templates/partials/session_detail.html @@ -0,0 +1,189 @@ + +
+
+
+
+

+ {{ session.project_name }} +

+ {{ session.session_id[:8] }} +
+
+ {{ fmt_time(session.started_at) }} + {% if session.finished_at %} + {{ fmt_duration(session.started_at, session.finished_at) }} + {% else %} + + + active + + {% endif %} + {% if session.message_count %} + {{ session.message_count }} messages + {% endif %} +
+ {% if session.first_prompt %} +
+ {{ session.first_prompt }} +
+ {% endif %} +
+
+ + +
+ {% for f in filters %} + + {% endfor %} +
+
+ +{% if meta %} + +
+ + + + + analytics + {{ "{:,}".format(meta.input_tokens + meta.output_tokens) }} tokens / {{ meta.tool_calls }} tools{% if meta.tool_errors %} ({{ meta.tool_errors }} err){% endif %} + +
+ {% if meta.codeflash %} +
+ + codeflash + + {% if meta.codeflash.language %} + + {{ meta.codeflash.language }} + + {% endif %} + {% if meta.codeflash.optimization_domain %} + + {{ meta.codeflash.optimization_domain }} + + {% endif %} + {% if meta.codeflash.has_researcher %} + researcher + {% endif %} + {% if meta.codeflash.has_reviewer %} + reviewer + {% endif %} + {% if meta.codeflash.has_ci_handler %} + CI + {% endif %} + {% if meta.codeflash.has_pr_prep %} + PR prep + {% endif %} +
+ {% endif %} + +
+
+ tokens +
{{ "{:,}".format(meta.input_tokens) }} in / {{ "{:,}".format(meta.output_tokens) }} out
+
+
+ tools +
{{ meta.tool_calls }} calls + {% if meta.tool_errors %}({{ meta.tool_errors }} errors){% endif %} +
+
+
+ messages +
{{ meta.user_messages }} user / {{ meta.assistant_messages }} assistant
+
+ {% if meta.files_modified %} +
+ files +
{{ meta.files_modified }} modified +{{ meta.lines_added }}/-{{ meta.lines_removed }}
+
+ {% endif %} + {% if meta.git_commits %} +
+ git +
{{ meta.git_commits }} commits{% if meta.git_branch %} on {{ meta.git_branch }}{% endif %}
+
+ {% endif %} + {% if meta.cache_read_tokens %} +
+ cache +
{{ "{:.0%}".format(meta.cache_hit_rate) }} hit rate
+
+ {% endif %} + {% if meta.compactions %} +
+ compactions +
{{ meta.compactions }}
+
+ {% endif %} + {% if meta.thinking_blocks %} +
+ thinking +
{{ meta.thinking_blocks }} blocks
+
+ {% endif %} + {% if meta.subagents_spawned %} +
+ subagents +
{{ meta.subagents_spawned }}
+
+ {% endif %} + {% if meta.web_searches or meta.web_fetches %} +
+ web +
{{ meta.web_searches }} searches / {{ meta.web_fetches }} fetches
+
+ {% endif %} + {% if meta.user_interruptions %} +
+ interruptions +
{{ meta.user_interruptions }}
+
+ {% endif %} + {% if meta.permission_mode %} +
+ mode +
{{ meta.permission_mode }}
+
+ {% endif %} +
+ + {% if meta.tool_counts %} +
+ {% for tool, count in meta.tool_counts|dictsort(by='value', reverse=true) %} + {% if loop.index <= 8 %} + + {{ tool }}={{ count }} + + {% endif %} + {% endfor %} +
+ {% endif %} +
+
+{% endif %} + + +
+
diff --git a/packages/blackbox/src/blackbox/dashboard/templates/partials/session_list.html b/packages/blackbox/src/blackbox/dashboard/templates/partials/session_list.html new file mode 100644 index 0000000..9b69057 --- /dev/null +++ b/packages/blackbox/src/blackbox/dashboard/templates/partials/session_list.html @@ -0,0 +1,49 @@ +{% for s in sessions %} +
+ +
+ {% if s.is_live %} + + Live + {% else %} + + {% endif %} + + {{ fmt_relative(s.started_at) }} + {% if s.finished_at %} + · {{ fmt_duration(s.started_at, s.finished_at) }} + {% endif %} + +
+ +
+ {{ s.project_name }} +
+ + {% if s.first_prompt %} +
+ {{ s.first_prompt }} +
+ {% endif %} + +
+ {{ s.session_id[:8] }} + {% if s.message_count %} + {{ s.message_count }} msgs + {% endif %} +
+ +
+{% endfor %} + +{% if not sessions %} +
+

No sessions found

+

Waiting for Claude Code sessions...

+
+{% endif %} diff --git a/packages/blackbox/src/blackbox/dashboard/transcript.py b/packages/blackbox/src/blackbox/dashboard/transcript.py new file mode 100644 index 0000000..8c18411 --- /dev/null +++ b/packages/blackbox/src/blackbox/dashboard/transcript.py @@ -0,0 +1,279 @@ +"""Parse Claude Code .jsonl transcripts into LogEntry objects.""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +from blackbox.models import LogEntry, SessionInfo + + +def ts_to_epoch(ts: str | None) -> float: + if not ts: + return 0.0 + from datetime import UTC, datetime # noqa: PLC0415 + + try: + dt = datetime.fromisoformat(ts) + return dt.replace(tzinfo=UTC if dt.tzinfo is None else dt.tzinfo).timestamp() + except (ValueError, AttributeError): + return 0.0 + + +def extract_text_content(content: Any) -> str: + if isinstance(content, str): + return content + if isinstance(content, list): + return "\n".join( + block.get("text", "") for block in content if isinstance(block, dict) and block.get("type") == "text" + ) + return "" + + +def extract_tool_uses(content: Any) -> list[dict[str, Any]]: + if not isinstance(content, list): + return [] + return [block for block in content if isinstance(block, dict) and block.get("type") == "tool_use"] + + +def extract_tool_results(content: Any) -> list[dict[str, Any]]: + if not isinstance(content, list): + return [] + return [block for block in content if isinstance(block, dict) and block.get("type") == "tool_result"] + + +def parse_transcript(path: Path) -> list[LogEntry]: + """Parse a Claude Code .jsonl transcript into a list of LogEntry objects.""" + entries: list[LogEntry] = [] + for line in path.read_text().splitlines(): + if not line.strip(): + continue + try: + raw = json.loads(line) + except json.JSONDecodeError: + continue + parsed = parse_entry(raw) + entries.extend(parsed) + return entries + + +def parse_transcript_tail(path: Path, offset: int) -> tuple[list[LogEntry], int]: + """Parse only 
new bytes appended after *offset*. Returns (entries, new_offset).""" + with path.open("rb") as f: + f.seek(offset) + tail = f.read() + new_offset = offset + len(tail) + entries: list[LogEntry] = [] + for line in tail.decode("utf-8", errors="replace").splitlines(): + if not line.strip(): + continue + try: + raw = json.loads(line) + except json.JSONDecodeError: + continue + entries.extend(parse_entry(raw)) + return entries, new_offset + + +def parse_entry(raw: dict[str, Any]) -> list[LogEntry]: + entry_type = raw.get("type", "") + ts = ts_to_epoch(raw.get("timestamp")) + message = raw.get("message", {}) + + if entry_type == "user": + return parse_user_entry(ts, message, raw) + if entry_type == "assistant": + return parse_assistant_entry(ts, message) + if entry_type == "system": + text = extract_text_content(message.get("content", "")) if isinstance(message, dict) else str(message) + if text: + return [LogEntry(timestamp=ts, source="system", level="info", message=text)] + return [] + + +def parse_user_entry(ts: float, message: Any, raw: dict[str, Any]) -> list[LogEntry]: + if not isinstance(message, dict): + return [] + content = message.get("content", "") + entries: list[LogEntry] = [] + + tool_results = extract_tool_results(content) + if tool_results: + for tr in tool_results: + result_text = tr.get("content", "") + if isinstance(result_text, list): + result_text = " ".join(b.get("text", "") for b in result_text if isinstance(b, dict)) + is_error = tr.get("is_error", False) + tool_use_result = raw.get("toolUseResult", {}) + if not isinstance(tool_use_result, dict): + tool_use_result = {} + stdout = tool_use_result.get("stdout", "") + stderr = tool_use_result.get("stderr", "") + display = stdout or result_text or "" + if is_error and stderr: + display = stderr + level = "error" if is_error else "tool_result" + entries.append(LogEntry(timestamp=ts, source="claude", level=level, message=display[:2000])) + return entries + + text = extract_text_content(content) + if 
text: + entries.append(LogEntry(timestamp=ts, source="user", level="info", message=text)) + return entries + + +def parse_assistant_entry(ts: float, message: Any) -> list[LogEntry]: + if not isinstance(message, dict): + return [] + content = message.get("content", []) + if not isinstance(content, list): + return [LogEntry(timestamp=ts, source="claude", level="assistant", message=str(content))] if content else [] + + entries: list[LogEntry] = [] + for block in content: + if not isinstance(block, dict): + continue + btype = block.get("type", "") + if btype == "text": + text = block.get("text", "") + if text: + entries.append(LogEntry(timestamp=ts, source="claude", level="assistant", message=text)) + elif btype == "tool_use": + tool_name = block.get("name", "tool") + tool_input = block.get("input", {}) + preview = tool_input_preview(tool_name, tool_input) + entries.append( + LogEntry( + timestamp=ts, + source="claude", + level="tool_call", + message=f"{tool_name}: {preview}", + data={"tool": tool_name, "input_preview": preview}, + ) + ) + elif btype == "thinking": + entries.append(LogEntry(timestamp=ts, source="claude", level="assistant", message="(thinking)")) + return entries + + +def tool_input_preview(tool_name: str, tool_input: dict[str, Any]) -> str: + if tool_name == "Bash": + return str(tool_input.get("command", "")) + if tool_name in ("Read", "Write"): + return str(tool_input.get("file_path", "")) + if tool_name == "Edit": + fp = tool_input.get("file_path", "") + old = str(tool_input.get("old_string", ""))[:80] + return f"{fp}\n{old}..." 
+ if tool_name == "Agent": + return str(tool_input.get("description", tool_input.get("prompt", "")))[:200] + if tool_name == "Skill": + return str(tool_input.get("skill", "")) + return json.dumps(tool_input, default=str)[:200] + + +def scan_sessions(projects_dir: Path) -> list[SessionInfo]: + """Scan ~/.claude/projects/ for session transcripts.""" + sessions: list[SessionInfo] = [] + if not projects_dir.is_dir(): + return sessions + + for project_dir in sorted(projects_dir.iterdir()): + if not project_dir.is_dir(): + continue + project_name = decode_project_name(project_dir.name) + for jsonl in sorted(project_dir.glob("*.jsonl"), key=lambda p: p.stat().st_mtime, reverse=True): + session_id = jsonl.stem + info = quick_session_info(jsonl, session_id, project_dir.name, project_name) + if info: + sessions.append(info) + + sessions.sort(key=lambda s: s.started_at, reverse=True) + return sessions + + +def decode_project_name(encoded: str) -> str: + parts = encoded.split("-") + if len(parts) >= 2 and parts[0] == "": + meaningful = [p for p in parts if p not in ("Users", "private", "tmp", "")] + if meaningful: + return "/".join(meaningful[-2:]) if len(meaningful) >= 2 else meaningful[-1] + return encoded + + +def quick_session_info( # noqa: C901, PLR0912 + path: Path, + session_id: str, + encoded_project: str, + project_name: str, +) -> SessionInfo | None: + """Read just enough of the transcript to build sidebar metadata.""" + first_prompt = "" + started_at = 0.0 + finished_at = 0.0 + message_count = 0 + cwd = "" + + try: + with path.open() as f: + for i, line in enumerate(f): + if not line.strip(): + continue + try: + raw = json.loads(line) + except json.JSONDecodeError: + continue + ts = ts_to_epoch(raw.get("timestamp")) + if ts and (started_at == 0.0 or ts < started_at): + started_at = ts + if ts and ts > finished_at: + finished_at = ts + + if raw.get("type") == "user": + message_count += 1 + msg = raw.get("message", {}) + if isinstance(msg, dict) and not first_prompt: 
+ content = msg.get("content", "") + text = extract_text_content(content) + if text and not any( + isinstance(b, dict) and b.get("type") == "tool_result" + for b in (content if isinstance(content, list) else []) + ): + first_prompt = text[:120] + if not cwd: + cwd = raw.get("cwd", "") + + if i > 500: + break + except OSError: + return None + + if started_at == 0.0: + return None + + # Use file mtime for finished_at — always accurate even for resumed + # sessions, and avoids reading the entire file for long transcripts. + try: + mtime = path.stat().st_mtime + if mtime > started_at: + finished_at = mtime + except OSError: + pass + + display_name = project_name + if cwd: + parts = Path(cwd).parts + if len(parts) >= 2: + display_name = "/".join(parts[-2:]) + + return SessionInfo( + session_id=session_id, + project_path=encoded_project, + project_name=display_name, + transcript_path=str(path), + started_at=started_at, + finished_at=finished_at if finished_at > started_at else None, + first_prompt=first_prompt, + message_count=message_count, + ) diff --git a/packages/blackbox/src/blackbox/dashboard/watcher.py b/packages/blackbox/src/blackbox/dashboard/watcher.py new file mode 100644 index 0000000..b96497f --- /dev/null +++ b/packages/blackbox/src/blackbox/dashboard/watcher.py @@ -0,0 +1,85 @@ +"""Watchdog-based live session discovery for ~/.claude/projects/.""" + +from __future__ import annotations + +import threading +import time +from collections.abc import Callable +from pathlib import Path +from typing import TYPE_CHECKING, Any + +from watchdog.events import FileSystemEventHandler +from watchdog.observers import Observer + +if TYPE_CHECKING: + from watchdog.events import FileSystemEvent + + from blackbox.models import SessionInfo + +LIVE_THRESHOLD_S = 30.0 + + +class SessionWatcher(FileSystemEventHandler): + """Watches the Claude Code projects directory for transcript changes. 
+ + Tracks which session files have been recently modified so the dashboard + can mark them as "live" in the sidebar. Also caches the session list + and invalidates it when any transcript file changes. + """ + + def __init__(self, projects_dir: Path) -> None: + self._projects_dir = projects_dir + self._lock = threading.Lock() + self._last_modified: dict[str, float] = {} + self._observer: Any = None + self._cached_sessions: list[SessionInfo] | None = None + + def start(self) -> None: + if not self._projects_dir.is_dir(): + return + self._observer = Observer() + self._observer.schedule(self, str(self._projects_dir), recursive=True) + self._observer.daemon = True + self._observer.start() + + def stop(self) -> None: + if self._observer is not None: + self._observer.stop() + self._observer.join(timeout=2.0) + self._observer = None + + def on_modified(self, event: FileSystemEvent) -> None: + if event.is_directory: + return + path = Path(str(event.src_path)) + if path.suffix != ".jsonl": + return + session_id = path.stem + with self._lock: + self._last_modified[session_id] = time.time() + self._cached_sessions = None + + def on_created(self, event: FileSystemEvent) -> None: + self.on_modified(event) + + def on_deleted(self, event: FileSystemEvent) -> None: + if not event.is_directory and Path(str(event.src_path)).suffix == ".jsonl": + with self._lock: + self._cached_sessions = None + + def live_session_ids(self) -> set[str]: + now = time.time() + with self._lock: + expired = [sid for sid, ts in self._last_modified.items() if now - ts > LIVE_THRESHOLD_S] + for sid in expired: + del self._last_modified[sid] + return {sid for sid, ts in self._last_modified.items() if now - ts <= LIVE_THRESHOLD_S} + + def get_sessions(self, scan_fn: Callable[[Path], list[SessionInfo]]) -> list[SessionInfo]: + with self._lock: + if self._cached_sessions is not None: + return self._cached_sessions + sessions = scan_fn(self._projects_dir) + with self._lock: + self._cached_sessions = sessions + 
return sessions diff --git a/packages/blackbox/src/blackbox/formatting.py b/packages/blackbox/src/blackbox/formatting.py new file mode 100644 index 0000000..203989c --- /dev/null +++ b/packages/blackbox/src/blackbox/formatting.py @@ -0,0 +1,266 @@ +"""Text formatting for analytics models.""" + +from __future__ import annotations + +import json + +import attrs + +from blackbox.models import ( + CodeflashSession, + ProjectStats, + Recommendation, + SessionAudit, + SessionDigest, + SessionMeta, + arrow, + sparkline, +) + + +class MetaFormatter: + """Formats a SessionMeta for display.""" + + def __init__(self, meta: SessionMeta) -> None: + self.meta = meta + + def summary(self) -> str: + """Format as a human-readable summary.""" + m = self.meta + lines = [ + f"Session {m.session_id[:8]} ({m.duration_minutes:.0f}min):", + f" Messages: {m.user_messages} user / {m.assistant_messages} assistant", + f" Tools: {m.tool_calls} calls ({m.tool_errors} errors)", + f" Tokens: {m.input_tokens:,} in / {m.output_tokens:,} out (cache hit {m.cache_hit_rate:.0%})", + ] + if m.git_commits: + lines.append(f" Git: {m.git_commits} commits on {m.git_branch or 'unknown'}") + if m.files_modified: + lines.append(f" Files: {m.files_modified} modified (+{m.lines_added}/-{m.lines_removed})") + if m.compactions: + lines.append(f" Compactions: {m.compactions}") + if m.user_interruptions: + lines.append(f" Interruptions: {m.user_interruptions}") + if m.thinking_blocks: + lines.append(f" Thinking blocks: {m.thinking_blocks}") + if m.web_searches or m.web_fetches: + lines.append(f" Web: {m.web_searches} searches / {m.web_fetches} fetches") + if m.permission_mode: + lines.append(f" Permission mode: {m.permission_mode}") + top = sorted(m.tool_counts.items(), key=lambda x: x[1], reverse=True)[:5] + if top: + lines.append(f" Top tools: {', '.join(f'{n}={c}' for n, c in top)}") + return "\n".join(lines) + + +class AuditFormatter: + """Formats a SessionAudit for display.""" + + def __init__(self, audit: 
SessionAudit) -> None: + self.audit = audit + + def summary(self) -> str: + """Format as a human-readable summary.""" + a = self.audit + lines = [ + f"Audit for {a.session_id[:8]}:", + f" Outcome: {a.outcome} | Satisfaction: {a.satisfaction}", + f" Type: {a.session_type}", + ] + if a.goal_categories: + goals = ", ".join( + f"{k}({v})" for k, v in sorted(a.goal_categories.items(), key=lambda x: x[1], reverse=True)[:3] + ) + lines.append(f" Goals: {goals}") + if a.friction_counts: + frictions = ", ".join( + f"{k}({v})" for k, v in sorted(a.friction_counts.items(), key=lambda x: x[1], reverse=True)[:3] + ) + lines.append(f" Friction: {frictions}") + if a.user_instructions: + lines.append(f" Instructions: {len(a.user_instructions)} extracted") + if a.summary: + lines.append(f" Summary: {a.summary[:120]}") + return "\n".join(lines) + + +class RecommendationFormatter: + """Formats a Recommendation for display.""" + + def __init__(self, rec: Recommendation) -> None: + self.rec = rec + + def summary(self) -> str: + """Format as a human-readable summary.""" + return f"{self.rec.suggestion}\n Evidence: {self.rec.evidence}" + + +class ProjectFormatter: + """Formats a ProjectStats for display.""" + + def __init__(self, project: ProjectStats) -> None: + self.project = project + + def summary(self) -> str: + """Format as a human-readable summary.""" + p = self.project + marker = " [!]" if p.is_outlier else "" + lines = [ + f"{p.project_name}{marker}: {p.session_count} sessions, " + f"{p.success_rate:.0%} success, " + f"{p.avg_tool_errors:.1f} errors/session, " + f"{p.avg_duration_s / 60:.0f}min avg" + ] + if p.top_error_categories: + lines.append(f" Errors: {' '.join(f'{n}({c})' for n, c in p.top_error_categories[:3])}") + if p.top_friction: + lines.append(f" Friction: {' '.join(f'{n}({c})' for n, c in p.top_friction[:3])}") + return "\n".join(lines) + + +class DigestFormatter: + """Formats a SessionDigest for display.""" + + def __init__(self, digest: SessionDigest) -> None: + 
self.digest = digest + + def summary(self) -> str: + """Format as a human-readable summary.""" + lines: list[str] = [] + self.render_overview(lines) + self.render_trends(lines) + self.render_projects(lines) + self.render_recommendations(lines) + return "\n".join(lines) + + def to_json(self) -> str: + """Serialize to JSON.""" + return json.dumps(attrs.asdict(self.digest), indent=2, default=str) + + def render_overview(self, lines: list[str]) -> None: + """Render the overview section.""" + d = self.digest + lines.append(f"Session Digest ({d.session_count} sessions)") + lines.append("") + lines.append( + f" {d.success_rate:.0%} success rate | " + f"{d.avg_duration_s / 60:.1f}min avg | " + f"{d.avg_tool_errors:.1f} errors/session" + ) + lines.append(f" Avg tokens: {d.avg_input_tokens:,.0f} in / {d.avg_output_tokens:,.0f} out") + lines.append(f" Avg tool calls: {d.avg_tool_calls:.1f}") + if d.outcome_distribution: + lines.append("") + lines.append("Outcomes:") + for outcome, count in sorted(d.outcome_distribution.items(), key=lambda x: x[1], reverse=True): + pct = count / max(d.session_count, 1) * 100 + lines.append(f" {outcome}: {count} ({pct:.0f}%)") + if d.satisfaction_distribution: + lines.append("") + lines.append("Satisfaction:") + for sat, count in sorted(d.satisfaction_distribution.items(), key=lambda x: x[1], reverse=True): + pct = count / max(d.session_count, 1) * 100 + lines.append(f" {sat}: {count} ({pct:.0f}%)") + if d.top_friction: + lines.append("") + lines.append("Top friction:") + for name, count in d.top_friction[:5]: + lines.append(f" {name}: {count}") + + def render_trends(self, lines: list[str]) -> None: + """Render the trends section.""" + d = self.digest + if not d.weeks: + return + lines.append("") + lines.append("Trends") + lines.append( + f" Success rate: {d.rolling_success_rate:.0%} avg " + f"({arrow(d.success_rate_change)} {d.success_rate_change:+.0%})" + ) + lines.append( + f" Error rate: {d.rolling_error_rate:.1f}/session avg " + 
f"({arrow(d.error_rate_change, invert=True)} {d.error_rate_change:+.1f})" + ) + lines.append( + f" Duration: {d.rolling_duration_s / 60:.0f}min avg " + f"({arrow(d.duration_change, invert=True)} {d.duration_change / 60:+.0f}min)" + ) + if len(d.weeks) >= 2: + lines.append( + f" Success: [{sparkline([w.success_rate for w in d.weeks])}] " + f"Errors: [{sparkline([w.avg_errors_per_session for w in d.weeks])}]" + ) + if d.error_category_deltas: + lines.append("") + lines.append(" Error category trends:") + for cat, pct, rolling, latest_count in d.error_category_deltas: + lines.append( + f" {cat}: {arrow(pct, invert=True)} {pct:+.0%} ({rolling:.0f}/wk -> {latest_count:.0f})" + ) + lines.append("") + lines.append(" Weekly breakdown:") + lines.extend( + f" {w.week}: {w.session_count} sessions, " + f"{w.success_rate:.0%} success, " + f"{w.avg_errors_per_session:.1f} errors, " + f"{w.avg_duration_s / 60:.0f}min avg" + for w in d.weeks + ) + + def render_projects(self, lines: list[str]) -> None: + """Render the projects section.""" + d = self.digest + if not d.projects: + return + lines.append("") + lines.append(f"Projects ({len(d.projects)})") + lines.extend(f" {ProjectFormatter(p).summary()}" for p in d.projects) + + def render_recommendations(self, lines: list[str]) -> None: + """Render the recommendations section.""" + d = self.digest + if not d.recommendations: + return + lines.append("") + lines.append("Recommendations") + for i, rec in enumerate(d.recommendations, 1): + lines.append(f" {i}. 
{RecommendationFormatter(rec).summary()}") + + +class CodeflashFormatter: + """Formats a CodeflashSession for display.""" + + def __init__(self, cf: CodeflashSession) -> None: + self.cf = cf + + def summary(self) -> str: + """Format as a human-readable summary.""" + if not self.cf.is_codeflash: + return "Not a codeflash session" + c = self.cf + lines = ["Codeflash plugin session"] + if c.language: + lines[0] += f" ({c.language})" + pairs = [ + (c.optimization_domain, f" Domain: {c.optimization_domain}"), + (c.agents_used, f" Agents: {', '.join(c.agents_used)}"), + (c.skills_invoked, f" Skills: {', '.join(c.skills_invoked)}"), + (c.commands_invoked, f" Commands: {', '.join(c.commands_invoked)}"), + (c.teams_created, f" Teams created: {c.teams_created}"), + ] + lines.extend(text for cond, text in pairs if cond) + capabilities = self.capabilities() + if capabilities: + lines.append(f" Capabilities: {', '.join(capabilities)}") + return "\n".join(lines) + + def capabilities(self) -> list[str]: + c = self.cf + mapping = [ + (c.has_researcher, "researcher"), + (c.has_reviewer, "reviewer"), + (c.has_ci_handler, "CI"), + (c.has_pr_prep, "PR prep"), + ] + return [name for flag, name in mapping if flag] diff --git a/packages/blackbox/src/blackbox/models.py b/packages/blackbox/src/blackbox/models.py new file mode 100644 index 0000000..882d950 --- /dev/null +++ b/packages/blackbox/src/blackbox/models.py @@ -0,0 +1,261 @@ +from __future__ import annotations + +from typing import Any + +import attrs + +# --------------------------------------------------------------------------- +# Dashboard models +# --------------------------------------------------------------------------- + + +@attrs.frozen +class LogEntry: + """A single renderable log event.""" + + timestamp: float + source: str # "claude", "user", "system" + level: str # "assistant", "tool_call", "tool_result", "status", "error", "info" + message: str + data: dict[str, Any] = attrs.Factory(dict) + + +@attrs.frozen +class 
SessionInfo: + """Lightweight metadata for the sidebar session list.""" + + session_id: str + project_path: str + project_name: str + transcript_path: str + started_at: float + finished_at: float | None = None + first_prompt: str = "" + message_count: int = 0 + is_live: bool = False + + +# --------------------------------------------------------------------------- +# Analytics models +# --------------------------------------------------------------------------- + +SPARK_CHARS = " _.~*" + + +def sparkline(values: list[float]) -> str: + if len(values) < 2: + return "" + lo, hi = min(values), max(values) + if hi == lo: + return SPARK_CHARS[2] * len(values) + scale = len(SPARK_CHARS) - 1 + return "".join(SPARK_CHARS[round((v - lo) / (hi - lo) * scale)] for v in values) + + +def arrow(delta: float, *, invert: bool = False) -> str: + if abs(delta) < 0.05: + return "=" + positive = delta > 0 + if invert: + positive = not positive + return "^" if positive else "v" + + +@attrs.frozen +class SessionEvent: + timestamp: str | None + speaker: str # "user" | "assistant" | "system" + text: str + tool_name: str | None + file_path: str | None + command: str | None + is_error: bool + error_category: str | None + attachment_type: str | None + + +@attrs.frozen +class SessionMeta: + session_id: str + project_path: str + transcript_path: str + start_time: float + end_time: float + duration_s: float + user_messages: int + assistant_messages: int + tool_calls: int + tool_counts: dict[str, int] = attrs.Factory(dict) + tool_errors: int = 0 + tool_error_categories: dict[str, int] = attrs.Factory(dict) + tool_error_details: tuple[tuple[str, str], ...] 
= () + input_tokens: int = 0 + output_tokens: int = 0 + cache_read_tokens: int = 0 + cache_creation_tokens: int = 0 + languages: dict[str, int] = attrs.Factory(dict) + files_modified: int = 0 + lines_added: int = 0 + lines_removed: int = 0 + git_commits: int = 0 + git_branch: str | None = None + user_interruptions: int = 0 + compactions: int = 0 + subagents_spawned: int = 0 + thinking_blocks: int = 0 + web_searches: int = 0 + web_fetches: int = 0 + permission_mode: str | None = None + first_prompt: str = "" + codeflash: CodeflashSession | None = None + + @property + def duration_minutes(self) -> float: + return self.duration_s / 60 + + @property + def total_tokens(self) -> int: + return self.input_tokens + self.output_tokens + + @property + def cache_hit_rate(self) -> float: + total = self.input_tokens + self.cache_read_tokens + self.cache_creation_tokens + return self.cache_read_tokens / total if total else 0.0 + + +@attrs.frozen +class SessionAudit: + session_id: str + goal_categories: dict[str, int] = attrs.Factory(dict) + outcome: str = "unclear" + satisfaction: str = "neutral" + friction_counts: dict[str, int] = attrs.Factory(dict) + session_type: str = "single_task" + user_instructions: tuple[str, ...] = () + summary: str = "" + + +@attrs.frozen +class Recommendation: + suggestion: str + evidence: str + frequency: float + source_sessions: int + + +@attrs.frozen +class WeekStats: + week: str + session_count: int + success_rate: float + avg_errors_per_session: float + avg_duration_s: float + error_category_counts: dict[str, int] = attrs.Factory(dict) + + +@attrs.define +class ProjectStats: + project_path: str + project_name: str + session_count: int + success_rate: float + avg_tool_errors: float + avg_duration_s: float + top_error_categories: tuple[tuple[str, int], ...] + top_friction: tuple[tuple[str, int], ...] 
@attrs.frozen
class SessionDigest:
    """Cross-session roll-up: distributions, rolling trends, per-project stats."""

    session_count: int
    # (earliest, latest) session timestamps, epoch seconds.
    date_range: tuple[float, float]
    success_rate: float
    outcome_distribution: dict[str, int] = attrs.Factory(dict)
    satisfaction_distribution: dict[str, int] = attrs.Factory(dict)
    top_friction: tuple[tuple[str, int], ...] = ()
    avg_duration_s: float = 0.0
    avg_input_tokens: float = 0.0
    avg_output_tokens: float = 0.0
    avg_tool_calls: float = 0.0
    avg_tool_errors: float = 0.0
    weeks: tuple[WeekStats, ...] = ()
    # Rolling-window figures plus their change vs. the prior window.
    rolling_success_rate: float = 0.0
    rolling_error_rate: float = 0.0
    rolling_duration_s: float = 0.0
    success_rate_change: float = 0.0
    error_rate_change: float = 0.0
    duration_change: float = 0.0
    # Per-category error deltas; tuple layout (category, float, float, float)
    # — exact field meanings not visible here, TODO confirm against producer.
    error_category_deltas: tuple[tuple[str, float, float, float], ...] = ()
    projects: tuple[ProjectStats, ...] = ()
    recommendations: tuple[Recommendation, ...] = ()


# ---------------------------------------------------------------------------
# Codeflash plugin detection
# ---------------------------------------------------------------------------

# Known codeflash agent names (base, python, js, and java variants).
# NOTE(review): "codeflash" is itself a prefix of every other entry, so a
# purely prefix-based matcher would match on the first element alone; the
# longer entries may be intended for exact-name matching — confirm against
# the detection code that consumes this tuple.
CODEFLASH_AGENT_PREFIXES = (
    "codeflash",
    "codeflash-python",
    "codeflash-deep",
    "codeflash-cpu",
    "codeflash-memory",
    "codeflash-async",
    "codeflash-structure",
    "codeflash-setup",
    "codeflash-scan",
    "codeflash-ci",
    "codeflash-pr-prep",
    "codeflash-researcher",
    "codeflash-review",
    "codeflash-javascript",
    "codeflash-js-deep",
    "codeflash-js-cpu",
    "codeflash-js-memory",
    "codeflash-js-async",
    "codeflash-js-structure",
    "codeflash-js-bundle",
    "codeflash-js-setup",
    "codeflash-js-scan",
    "codeflash-js-ci",
    "codeflash-js-pr-prep",
    "codeflash-java",
    "codeflash-java-deep",
    "codeflash-java-cpu",
    "codeflash-java-memory",
    "codeflash-java-async",
    "codeflash-java-structure",
    "codeflash-java-setup",
    "codeflash-java-scan",
    "codeflash-java-ci",
    "codeflash-java-pr-prep",
)

# Skill identifiers shipped by the codeflash plugin.
CODEFLASH_SKILLS = (
    "codeflash-optimize",
    "memray-profiling",
)

# Slash-command names attributed to the plugin.
# NOTE(review): these are "codex-*", not "codeflash-*" — verify this is
# intentional and not a typo in the command names.
CODEFLASH_COMMANDS = (
    "codex-review",
    "codex-setup",
    "codex-status",
)


@attrs.frozen
class CodeflashSession:
    """Plugin-specific metadata detected from a codeflash agent session."""

    is_codeflash: bool = False
    # Presumably inferred from agent-name variants (python/js/java) — TODO confirm.
    language: str | None = None
    agents_used: tuple[str, ...] = ()
    skills_invoked: tuple[str, ...] = ()
    commands_invoked: tuple[str, ...] = ()
    teams_created: int = 0
    # Optimization focus (e.g. cpu/memory); derivation not visible here.
    optimization_domain: str | None = None
    has_researcher: bool = False
    has_reviewer: bool = False
    has_ci_handler: bool = False
    has_pr_prep: bool = False
make_meta(**(meta_kw or {})), make_audit(**(audit_kw or {})) diff --git a/packages/blackbox/tests/e2e/__init__.py b/packages/blackbox/tests/e2e/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/packages/blackbox/tests/e2e/conftest.py b/packages/blackbox/tests/e2e/conftest.py new file mode 100644 index 0000000..2828cbb --- /dev/null +++ b/packages/blackbox/tests/e2e/conftest.py @@ -0,0 +1,188 @@ +"""Fixtures for Playwright end-to-end tests.""" + +from __future__ import annotations + +import json +import socket +import threading +import time +from collections.abc import Iterator +from typing import TYPE_CHECKING + +import pytest +import uvicorn + +if TYPE_CHECKING: + from pathlib import Path + + from playwright.sync_api import Page + +pytestmark = pytest.mark.e2e + +SESSION_A_ID = "sess-aaaa1111-2222-3333-4444-555566667777" +SESSION_B_ID = "sess-bbbb1111-2222-3333-4444-555566667777" +PROJECT_A_DIR = "-Users-alice-Desktop-work-myapp" +PROJECT_B_DIR = "-Users-bob-code-webapp" + + +def _jsonl(*entries: dict) -> str: + """Serialize entries as newline-delimited JSON.""" + return "\n".join(json.dumps(e) for e in entries) + "\n" + + +RICH_SESSION = _jsonl( + { + "type": "user", + "timestamp": "2025-03-15T10:00:00Z", + "message": {"content": "Help me optimize this function for better performance"}, + "cwd": "/Users/alice/Desktop/work/myapp", + }, + { + "type": "assistant", + "timestamp": "2025-03-15T10:00:05Z", + "message": { + "content": [{"type": "text", "text": "Let me look at the code and find optimization opportunities."}], + "usage": {"input_tokens": 500, "output_tokens": 120, "cache_read_input_tokens": 200}, + }, + }, + { + "type": "assistant", + "timestamp": "2025-03-15T10:00:08Z", + "message": { + "content": [ + { + "type": "tool_use", + "id": "tu_read_1", + "name": "Read", + "input": {"file_path": "/Users/alice/Desktop/work/myapp/main.py"}, + } + ], + "usage": {"input_tokens": 100, "output_tokens": 30}, + }, + }, + { + "type": "user", + 
"timestamp": "2025-03-15T10:00:09Z", + "message": { + "content": [ + {"type": "tool_result", "tool_use_id": "tu_read_1", "content": "def sort_items(items):\n pass"} + ] + }, + }, + { + "type": "assistant", + "timestamp": "2025-03-15T10:00:15Z", + "message": { + "content": [ + { + "type": "tool_use", + "id": "tu_bash_1", + "name": "Bash", + "input": {"command": "uv run pytest tests/ -v"}, + } + ], + "usage": {"input_tokens": 200, "output_tokens": 50}, + }, + }, + { + "type": "user", + "timestamp": "2025-03-15T10:00:20Z", + "message": { + "content": [{"type": "tool_result", "tool_use_id": "tu_bash_1", "content": "FAILED", "is_error": True}] + }, + "toolUseResult": {"stderr": "AssertionError: expected 42 got 0"}, + }, + { + "type": "assistant", + "timestamp": "2025-03-15T10:00:25Z", + "message": { + "content": [ + {"type": "thinking", "thinking": "I need to fix the test."}, + {"type": "text", "text": "The test failed. Let me fix the implementation."}, + ], + "usage": {"input_tokens": 300, "output_tokens": 80}, + }, + }, + { + "type": "user", + "timestamp": "2025-03-15T10:01:00Z", + "message": {"content": "That looks great, thanks!"}, + }, +) + +MINIMAL_SESSION = _jsonl( + { + "type": "user", + "timestamp": "2025-03-15T09:00:00Z", + "message": {"content": "What is this project about?"}, + "cwd": "/Users/bob/code/webapp", + }, +) + + +def _get_free_port() -> int: + """Find a free TCP port on localhost.""" + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.bind(("127.0.0.1", 0)) + return s.getsockname()[1] + + +@pytest.fixture(scope="session") +def projects_dir(tmp_path_factory: pytest.TempPathFactory) -> Path: + """Create a temp directory tree with fixture session transcripts.""" + root = tmp_path_factory.mktemp("projects") + + project_a = root / PROJECT_A_DIR + project_a.mkdir() + (project_a / f"{SESSION_A_ID}.jsonl").write_text(RICH_SESSION) + + project_b = root / PROJECT_B_DIR + project_b.mkdir() + (project_b / 
f"{SESSION_B_ID}.jsonl").write_text(MINIMAL_SESSION) + + return root + + +@pytest.fixture(scope="session") +def live_server(projects_dir: Path) -> Iterator[str]: + """Start the dashboard on a random free port and yield the base URL.""" + from blackbox.dashboard.app import create_app + + port = _get_free_port() + application = create_app(projects_dir=projects_dir) + + config = uvicorn.Config(application, host="127.0.0.1", port=port, log_level="warning") + server = uvicorn.Server(config) + thread = threading.Thread(target=server.run, daemon=True) + thread.start() + + deadline = time.monotonic() + 10 + while time.monotonic() < deadline: + try: + with socket.create_connection(("127.0.0.1", port), timeout=0.5): + break + except OSError: + time.sleep(0.1) + else: + msg = "Live server did not start in time" + raise RuntimeError(msg) + + yield f"http://127.0.0.1:{port}" + + server.should_exit = True + thread.join(timeout=5.0) + + +@pytest.fixture(scope="session") +def base_url(live_server: str) -> str: + """Provide the base URL for pytest-playwright's page.goto().""" + return live_server + + +@pytest.fixture +def dashboard(page: Page, base_url: str) -> Page: + """Navigate to the dashboard index and wait for session list to load.""" + page.goto(base_url) + page.locator("#session-list-container").wait_for(state="attached") + page.locator("#session-list-container > div").first.wait_for(state="visible", timeout=10_000) + return page diff --git a/packages/blackbox/tests/e2e/test_dashboard_loads.py b/packages/blackbox/tests/e2e/test_dashboard_loads.py new file mode 100644 index 0000000..552d6b7 --- /dev/null +++ b/packages/blackbox/tests/e2e/test_dashboard_loads.py @@ -0,0 +1,35 @@ +"""Smoke tests: dashboard loads and renders basic structure.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest +from playwright.sync_api import expect + +if TYPE_CHECKING: + from playwright.sync_api import Page + +pytestmark = pytest.mark.e2e + + +class 
class TestDashboardLoads:
    """Dashboard renders its core layout on first load."""

    def test_page_title(self, dashboard: Page) -> None:
        """Page title is 'Blackbox'."""
        # to_have_title auto-retries until the title is set, unlike a bare
        # assert on dashboard.title(), which can race document load.
        expect(dashboard).to_have_title("Blackbox")

    def test_brand_visible(self, dashboard: Page) -> None:
        """Sidebar brand text is visible."""
        expect(dashboard.locator("#sidebar .sidebar-full span.text-accent-400")).to_have_text("blackbox")

    def test_empty_state_without_selection(self, page: Page, base_url: str) -> None:
        """Empty state shown when no session is selected."""
        # Uses the raw `page` fixture: the `dashboard` fixture would already
        # have waited for the session list, which is irrelevant here.
        page.goto(base_url)
        expect(page.get_by_text("Select a session to review")).to_be_visible()

    def test_session_list_loads_via_htmx(self, dashboard: Page) -> None:
        """Session list container is populated by the HTMX load trigger."""
        # Two fixture sessions (project A and project B) should be listed.
        items = dashboard.locator("#session-list-container > div")
        assert items.count() >= 2
dashboard.get_by_text("work/myapp").first.click() + expect(dashboard.locator("#session-detail h2")).to_contain_text("work/myapp") + + def test_detail_shows_session_id_prefix(self, dashboard: Page) -> None: + """Session detail header shows the 8-char session ID prefix.""" + dashboard.get_by_text("work/myapp").first.click() + detail = dashboard.locator("#session-detail") + expect(detail.get_by_text("sess-aaa")).to_be_visible(timeout=10_000) + + def test_filter_buttons_visible(self, dashboard: Page) -> None: + """Filter buttons (Compact, All, etc.) are visible after loading a session.""" + dashboard.get_by_text("work/myapp").first.click() + detail = dashboard.locator("#session-detail") + expect(detail.get_by_role("button", name="Compact")).to_be_visible(timeout=10_000) + expect(detail.get_by_role("button", name="All")).to_be_visible() + + def test_filter_button_active_state(self, dashboard: Page) -> None: + """The default filter (compact) has the active accent styling.""" + dashboard.get_by_text("work/myapp").first.click() + compact_btn = dashboard.locator("#session-detail button", has_text="Compact").first + expect(compact_btn).to_be_visible(timeout=10_000) + expect(compact_btn).to_have_class(re.compile(r"text-accent-400")) + + def test_switching_filter_changes_active_button(self, dashboard: Page) -> None: + """Clicking 'All' makes it active and removes active from 'Compact'.""" + dashboard.get_by_text("work/myapp").first.click() + all_btn = dashboard.locator("#session-detail button", has_text="All").first + expect(all_btn).to_be_visible(timeout=10_000) + all_btn.click() + expect(all_btn).to_have_class(re.compile(r"text-accent-400")) + + def test_analytics_panel_exists(self, dashboard: Page) -> None: + """Analytics details element is present for sessions with metadata.""" + dashboard.get_by_text("work/myapp").first.click() + analytics = dashboard.locator("#session-detail details") + expect(analytics).to_be_visible(timeout=10_000) + + def 
test_analytics_panel_expands(self, dashboard: Page) -> None: + """Clicking the analytics summary expands the panel to show token counts.""" + dashboard.get_by_text("work/myapp").first.click() + summary = dashboard.locator("#session-detail details summary") + expect(summary).to_be_visible(timeout=10_000) + summary.click() + expect(dashboard.get_by_text("tokens", exact=True)).to_be_visible() + + def test_session_not_found(self, page: Page, base_url: str) -> None: + """Navigating to a non-existent session shows an error.""" + page.goto(f"{base_url}/?session={PROJECT_A_DIR}/{SESSION_A_ID.replace('aaaa', 'zzzz')}") + expect(page.get_by_text("Session not found")).to_be_visible(timeout=10_000) diff --git a/packages/blackbox/tests/e2e/test_session_list.py b/packages/blackbox/tests/e2e/test_session_list.py new file mode 100644 index 0000000..a8df037 --- /dev/null +++ b/packages/blackbox/tests/e2e/test_session_list.py @@ -0,0 +1,42 @@ +"""Tests for the session list sidebar.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest +from playwright.sync_api import expect + +if TYPE_CHECKING: + from playwright.sync_api import Page + +pytestmark = pytest.mark.e2e + + +class TestSessionList: + """Session list sidebar displays session metadata correctly.""" + + def test_project_names_visible(self, dashboard: Page) -> None: + """Project names from fixture data appear in the session list.""" + expect(dashboard.get_by_text("work/myapp")).to_be_visible() + expect(dashboard.get_by_text("code/webapp")).to_be_visible() + + def test_first_prompt_shown(self, dashboard: Page) -> None: + """First user prompt is displayed in the session item.""" + expect(dashboard.get_by_text("Help me optimize this function")).to_be_visible() + + def test_session_id_prefix_shown(self, dashboard: Page) -> None: + """The 8-char session ID prefix is visible.""" + expect(dashboard.get_by_text("sess-aaa")).to_be_visible() + 
    def test_message_count_shown(self, dashboard: Page) -> None:
        """Message count badge is visible for sessions with messages."""
        # The rich fixture session renders a "4 msgs" badge — presumably
        # counting conversational turns rather than raw JSONL entries (the
        # fixture has 8 entries); confirm against the list-rendering logic.
        expect(dashboard.get_by_text("4 msgs")).to_be_visible()

    def test_session_list_refreshes_on_poll(self, dashboard: Page) -> None:
        """The HTMX poll fires and the list remains populated."""
        # Wait slightly past the poll interval so a /sessions request fires
        # while we're listening for it.
        with dashboard.expect_response("**/sessions*"):
            dashboard.wait_for_timeout(5500)
        items = dashboard.locator("#session-list-container > div")
        assert items.count() >= 2
dashboard.locator("#collapse-btn").click() + value = dashboard.evaluate("() => localStorage.getItem('sidebar-collapsed')") + assert "1" == value + + def test_state_restored_on_reload(self, dashboard: Page, base_url: str) -> None: + """Collapsed state persists across page reloads.""" + dashboard.locator("#collapse-btn").click() + dashboard.goto(base_url) + expect(dashboard.locator("#sidebar")).to_have_class(re.compile(r"collapsed")) + + def test_expand_clears_localstorage(self, dashboard: Page) -> None: + """Expanding sets localStorage sidebar-collapsed to '0'.""" + dashboard.locator("#collapse-btn").click() + dashboard.locator("#expand-btn").click() + value = dashboard.evaluate("() => localStorage.getItem('sidebar-collapsed')") + assert "0" == value diff --git a/packages/blackbox/tests/e2e/test_sse_streaming.py b/packages/blackbox/tests/e2e/test_sse_streaming.py new file mode 100644 index 0000000..f6ab7f6 --- /dev/null +++ b/packages/blackbox/tests/e2e/test_sse_streaming.py @@ -0,0 +1,41 @@ +"""Tests for SSE log streaming in the session detail view.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest +from playwright.sync_api import expect + +if TYPE_CHECKING: + from playwright.sync_api import Page + +pytestmark = pytest.mark.e2e + + +class TestSSEStreaming: + """SSE log events render log entries in the DOM.""" + + def _load_session(self, dashboard: Page) -> None: + """Click the rich session to trigger SSE streaming.""" + dashboard.get_by_text("work/myapp").first.click() + dashboard.locator("#log-container").wait_for(state="attached", timeout=10_000) + + def test_log_entries_appear(self, dashboard: Page) -> None: + """Log entries are inserted into the log container via SSE.""" + self._load_session(dashboard) + container = dashboard.locator("#log-container") + expect(container).not_to_be_empty(timeout=15_000) + + def test_log_container_has_children(self, dashboard: Page) -> None: + """Log container receives child elements 
from SSE events.""" + self._load_session(dashboard) + dashboard.locator("#log-container > *").first.wait_for(state="visible", timeout=15_000) + assert dashboard.locator("#log-container > *").count() >= 1 + + def test_sse_connection_established(self, dashboard: Page) -> None: + """The SSE URL is set on the log container's data attribute.""" + self._load_session(dashboard) + url = dashboard.locator("#log-container").get_attribute("data-sse-url") + assert url is not None + assert "/logs" in url diff --git a/packages/blackbox/tests/test_analytics.py b/packages/blackbox/tests/test_analytics.py new file mode 100644 index 0000000..99b56bb --- /dev/null +++ b/packages/blackbox/tests/test_analytics.py @@ -0,0 +1,693 @@ +"""Tests for analytics extraction and codeflash detection.""" + +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + +from blackbox.analytics import ( + classify_error, + count_diff_lines, + detect_codeflash, + extract_meta, + infer_domain, + infer_language, + track_file_changes, +) + + +def _ts(offset: int = 0) -> str: + return f"2026-04-28T12:00:{offset:02d}Z" + + +def _write_jsonl(path: Path, entries: list[dict[str, Any]]) -> None: + path.write_text("\n".join(json.dumps(e) for e in entries) + "\n") + + +# --------------------------------------------------------------------------- +# extract_meta basics +# --------------------------------------------------------------------------- + + +class TestExtractMeta: + def test_returns_none_for_empty_file(self, tmp_path: Path) -> None: + p = tmp_path / "proj" / "empty.jsonl" + p.parent.mkdir() + p.write_text("") + assert extract_meta(p) is None + + def test_returns_none_for_missing_file(self, tmp_path: Path) -> None: + assert extract_meta(tmp_path / "missing.jsonl") is None + + def test_returns_none_for_no_timestamps(self, tmp_path: Path) -> None: + p = tmp_path / "proj" / "sess.jsonl" + p.parent.mkdir() + _write_jsonl(p, [{"type": "system", "message": "hello"}]) + 
assert extract_meta(p) is None + + def test_basic_session(self, tmp_path: Path) -> None: + p = tmp_path / "proj" / "abc123.jsonl" + p.parent.mkdir() + _write_jsonl( + p, + [ + { + "type": "user", + "timestamp": _ts(0), + "message": {"content": "optimize this function"}, + }, + { + "type": "assistant", + "timestamp": _ts(10), + "message": { + "content": [{"type": "text", "text": "I'll help you."}], + "usage": {"input_tokens": 500, "output_tokens": 200}, + }, + }, + ], + ) + meta = extract_meta(p) + assert meta is not None + assert meta.session_id == "abc123" + assert meta.project_path == "proj" + assert meta.user_messages == 1 + assert meta.assistant_messages == 1 + assert meta.input_tokens == 500 + assert meta.output_tokens == 200 + assert "optimize this function" in meta.first_prompt + + def test_counts_tool_calls(self, tmp_path: Path) -> None: + p = tmp_path / "proj" / "sess.jsonl" + p.parent.mkdir() + _write_jsonl( + p, + [ + { + "type": "assistant", + "timestamp": _ts(0), + "message": { + "content": [ + {"type": "tool_use", "id": "t1", "name": "Read", "input": {"file_path": "/a.py"}}, + { + "type": "tool_use", + "id": "t2", + "name": "Edit", + "input": {"file_path": "/a.py", "old_string": "x", "new_string": "y"}, + }, + ], + "usage": {"input_tokens": 100, "output_tokens": 50}, + }, + }, + ], + ) + meta = extract_meta(p) + assert meta is not None + assert meta.tool_calls == 2 + assert meta.tool_counts == {"Read": 1, "Edit": 1} + + def test_counts_git_commits(self, tmp_path: Path) -> None: + p = tmp_path / "proj" / "sess.jsonl" + p.parent.mkdir() + _write_jsonl( + p, + [ + { + "type": "assistant", + "timestamp": _ts(0), + "message": { + "content": [ + { + "type": "tool_use", + "id": "t1", + "name": "Bash", + "input": {"command": "git commit -m 'fix things'"}, + } + ], + "usage": {}, + }, + }, + ], + ) + meta = extract_meta(p) + assert meta is not None + assert meta.git_commits == 1 + + def test_amend_not_counted_as_commit(self, tmp_path: Path) -> None: + p = 
tmp_path / "proj" / "sess.jsonl" + p.parent.mkdir() + _write_jsonl( + p, + [ + { + "type": "assistant", + "timestamp": _ts(0), + "message": { + "content": [ + { + "type": "tool_use", + "id": "t1", + "name": "Bash", + "input": {"command": "git commit --amend -m 'fix'"}, + } + ], + "usage": {}, + }, + }, + ], + ) + meta = extract_meta(p) + assert meta is not None + assert meta.git_commits == 0 + + def test_counts_compactions(self, tmp_path: Path) -> None: + p = tmp_path / "proj" / "sess.jsonl" + p.parent.mkdir() + _write_jsonl( + p, + [ + {"type": "user", "timestamp": _ts(0), "message": {"content": "hi"}}, + {"type": "summary", "timestamp": _ts(5)}, + {"type": "summary", "timestamp": _ts(10)}, + ], + ) + meta = extract_meta(p) + assert meta is not None + assert meta.compactions == 2 + + def test_counts_thinking_blocks(self, tmp_path: Path) -> None: + p = tmp_path / "proj" / "sess.jsonl" + p.parent.mkdir() + _write_jsonl( + p, + [ + { + "type": "assistant", + "timestamp": _ts(0), + "message": { + "content": [ + {"type": "thinking", "thinking": "let me think..."}, + {"type": "text", "text": "here's my answer"}, + ], + "usage": {}, + }, + }, + ], + ) + meta = extract_meta(p) + assert meta is not None + assert meta.thinking_blocks == 1 + + def test_tracks_permission_mode(self, tmp_path: Path) -> None: + p = tmp_path / "proj" / "sess.jsonl" + p.parent.mkdir() + _write_jsonl( + p, + [ + {"type": "permission-mode", "timestamp": _ts(0), "permissionMode": "bypassPermissions"}, + {"type": "user", "timestamp": _ts(1), "message": {"content": "go"}}, + ], + ) + meta = extract_meta(p) + assert meta is not None + assert meta.permission_mode == "bypassPermissions" + + def test_tracks_web_usage(self, tmp_path: Path) -> None: + p = tmp_path / "proj" / "sess.jsonl" + p.parent.mkdir() + _write_jsonl( + p, + [ + { + "type": "assistant", + "timestamp": _ts(0), + "message": { + "content": [{"type": "text", "text": "searching"}], + "usage": { + "input_tokens": 100, + "output_tokens": 50, + 
"server_tool_use": {"web_search_requests": 2, "web_fetch_requests": 1}, + }, + }, + }, + ], + ) + meta = extract_meta(p) + assert meta is not None + assert meta.web_searches == 2 + assert meta.web_fetches == 1 + + def test_skips_invalid_json_lines(self, tmp_path: Path) -> None: + p = tmp_path / "proj" / "sess.jsonl" + p.parent.mkdir() + p.write_text( + json.dumps({"type": "user", "timestamp": _ts(0), "message": {"content": "hi"}}) + + "\nnot valid json\n" + + json.dumps( + { + "type": "assistant", + "timestamp": _ts(1), + "message": {"content": [{"type": "text", "text": "ok"}], "usage": {}}, + } + ) + + "\n" + ) + meta = extract_meta(p) + assert meta is not None + assert meta.user_messages == 1 + assert meta.assistant_messages == 1 + + def test_tracks_tool_errors(self, tmp_path: Path) -> None: + p = tmp_path / "proj" / "sess.jsonl" + p.parent.mkdir() + _write_jsonl( + p, + [ + { + "type": "assistant", + "timestamp": _ts(0), + "message": { + "content": [{"type": "tool_use", "id": "t1", "name": "Bash", "input": {"command": "ls /nope"}}], + "usage": {}, + }, + }, + { + "type": "user", + "timestamp": _ts(1), + "message": { + "content": [ + { + "type": "tool_result", + "tool_use_id": "t1", + "is_error": True, + "content": "command not found", + } + ], + }, + }, + ], + ) + meta = extract_meta(p) + assert meta is not None + assert meta.tool_errors == 1 + assert meta.tool_error_categories["command_not_found"] == 1 + + +# --------------------------------------------------------------------------- +# classify_error +# --------------------------------------------------------------------------- + + +class TestClassifyError: + def test_edit_always_edit_failed(self) -> None: + assert "edit_failed" == classify_error("Edit", {}, {}) + + def test_bash_permission_denied(self) -> None: + block = {"content": "Permission denied"} + assert "permission_denied" == classify_error("Bash", block, {}) + + def test_bash_command_not_found(self) -> None: + block = {"content": "command not 
found"} + assert "command_not_found" == classify_error("Bash", block, {}) + + def test_bash_generic_failure(self) -> None: + block = {"content": "exit code 1"} + assert "command_failed" == classify_error("Bash", block, {}) + + def test_read_file_not_found(self) -> None: + block = {"content": "no such file"} + assert "file_not_found" == classify_error("Read", block, {}) + + def test_write_file_not_found(self) -> None: + block = {"content": "not found"} + assert "file_not_found" == classify_error("Write", block, {}) + + def test_read_generic_error(self) -> None: + block = {"content": "some io error"} + assert "file_error" == classify_error("Read", block, {}) + + def test_unknown_tool(self) -> None: + assert "tool_error" == classify_error("CustomTool", {}, {}) + + def test_stderr_from_tool_use_result(self) -> None: + raw = {"toolUseResult": {"stderr": "Permission denied"}} + assert "permission_denied" == classify_error("Bash", {"content": ""}, raw) + + +# --------------------------------------------------------------------------- +# track_file_changes +# --------------------------------------------------------------------------- + + +class TestTrackFileChanges: + def test_tracks_edit_tool(self) -> None: + from collections import Counter + + files: set[str] = set() + langs = Counter[str]() + track_file_changes("Edit", {"file_path": "/app/main.py"}, files, langs) + assert "/app/main.py" in files + assert langs["python"] == 1 + + def test_ignores_non_edit_tools(self) -> None: + from collections import Counter + + files: set[str] = set() + langs = Counter[str]() + track_file_changes("Read", {"file_path": "/app/main.py"}, files, langs) + assert len(files) == 0 + + def test_unknown_extension(self) -> None: + from collections import Counter + + files: set[str] = set() + langs = Counter[str]() + track_file_changes("Write", {"file_path": "/app/data.xyz"}, files, langs) + assert "/app/data.xyz" in files + assert len(langs) == 0 + + +# 
--------------------------------------------------------------------------- +# count_diff_lines +# --------------------------------------------------------------------------- + + +class TestCountDiffLines: + def test_edit_adds_lines(self) -> None: + assert (2, 0) == count_diff_lines("Edit", {"old_string": "a\n", "new_string": "a\nb\nc\n"}) + + def test_edit_removes_lines(self) -> None: + assert (0, 2) == count_diff_lines("Edit", {"old_string": "a\nb\nc\n", "new_string": "a\n"}) + + def test_write_counts_all_lines(self) -> None: + assert (3, 0) == count_diff_lines("Write", {"content": "a\nb\nc"}) + + def test_other_tools_zero(self) -> None: + assert (0, 0) == count_diff_lines("Read", {}) + + +# --------------------------------------------------------------------------- +# detect_codeflash +# --------------------------------------------------------------------------- + + +class TestDetectCodeflash: + def test_returns_none_when_no_signals(self) -> None: + assert detect_codeflash(set(), set(), set(), 0) is None + + def test_detects_from_agents(self) -> None: + cf = detect_codeflash({"codeflash-python", "codeflash-deep"}, set(), set(), 0) + assert cf is not None + assert cf.is_codeflash + assert cf.language == "python" + assert cf.optimization_domain == "deep" + assert "codeflash-deep" in cf.agents_used + assert "codeflash-python" in cf.agents_used + + def test_detects_from_skills(self) -> None: + cf = detect_codeflash(set(), {"codeflash-optimize"}, set(), 0) + assert cf is not None + assert cf.is_codeflash + assert "codeflash-optimize" in cf.skills_invoked + + def test_detects_from_commands(self) -> None: + cf = detect_codeflash(set(), set(), {"codex-review"}, 0) + assert cf is not None + assert "codex-review" in cf.commands_invoked + + def test_tracks_teams(self) -> None: + cf = detect_codeflash({"codeflash"}, set(), set(), 3) + assert cf is not None + assert cf.teams_created == 3 + + def test_detects_researcher(self) -> None: + cf = 
detect_codeflash({"codeflash-researcher"}, set(), set(), 0) + assert cf is not None + assert cf.has_researcher + + def test_detects_reviewer(self) -> None: + cf = detect_codeflash({"codeflash-review"}, set(), set(), 0) + assert cf is not None + assert cf.has_reviewer + + def test_detects_ci_handler(self) -> None: + cf = detect_codeflash({"codeflash-ci"}, set(), set(), 0) + assert cf is not None + assert cf.has_ci_handler + + def test_detects_pr_prep(self) -> None: + cf = detect_codeflash({"codeflash-pr-prep"}, set(), set(), 0) + assert cf is not None + assert cf.has_pr_prep + + def test_infers_javascript_from_prefix(self) -> None: + cf = detect_codeflash({"codeflash-js-cpu"}, set(), set(), 0) + assert cf is not None + assert cf.language == "javascript" + assert cf.optimization_domain == "cpu" + + def test_infers_java_from_prefix(self) -> None: + cf = detect_codeflash({"codeflash-java-memory"}, set(), set(), 0) + assert cf is not None + assert cf.language == "java" + assert cf.optimization_domain == "memory" + + def test_memory_domain(self) -> None: + cf = detect_codeflash({"codeflash-memory"}, set(), set(), 0) + assert cf is not None + assert cf.optimization_domain == "memory" + + def test_async_domain(self) -> None: + cf = detect_codeflash({"codeflash-async"}, set(), set(), 0) + assert cf is not None + assert cf.optimization_domain == "async" + + def test_structure_domain(self) -> None: + cf = detect_codeflash({"codeflash-structure"}, set(), set(), 0) + assert cf is not None + assert cf.optimization_domain == "structure" + + def test_bundle_domain(self) -> None: + cf = detect_codeflash({"codeflash-js-bundle"}, set(), set(), 0) + assert cf is not None + assert cf.optimization_domain == "bundle" + + +# --------------------------------------------------------------------------- +# _infer_language / _infer_domain +# --------------------------------------------------------------------------- + + +class TestInferLanguage: + def test_python_from_marker(self) -> None: + 
assert "python" == infer_language({"codeflash-python"}) + + def test_javascript_from_marker(self) -> None: + assert "javascript" == infer_language({"codeflash-javascript"}) + + def test_javascript_from_js_prefix(self) -> None: + assert "javascript" == infer_language({"codeflash-js-deep"}) + + def test_java_from_marker(self) -> None: + assert "java" == infer_language({"codeflash-java"}) + + def test_java_from_prefix(self) -> None: + assert "java" == infer_language({"codeflash-java-cpu"}) + + def test_none_for_generic_agent(self) -> None: + assert infer_language({"codeflash"}) is None + + def test_none_for_empty(self) -> None: + assert infer_language(set()) is None + + +class TestInferDomain: + def test_cpu(self) -> None: + assert "cpu" == infer_domain({"codeflash-cpu"}) + + def test_memory(self) -> None: + assert "memory" == infer_domain({"codeflash-memory"}) + + def test_deep(self) -> None: + assert "deep" == infer_domain({"codeflash-deep"}) + + def test_async(self) -> None: + assert "async" == infer_domain({"codeflash-async"}) + + def test_structure(self) -> None: + assert "structure" == infer_domain({"codeflash-structure"}) + + def test_bundle(self) -> None: + assert "bundle" == infer_domain({"codeflash-js-bundle"}) + + def test_none_for_router_only(self) -> None: + assert infer_domain({"codeflash-python"}) is None + + def test_none_for_empty(self) -> None: + assert infer_domain(set()) is None + + +# --------------------------------------------------------------------------- +# extract_meta codeflash integration +# --------------------------------------------------------------------------- + + +class TestExtractMetaCodeflash: + def test_non_codeflash_session_has_none(self, tmp_path: Path) -> None: + p = tmp_path / "proj" / "sess.jsonl" + p.parent.mkdir() + _write_jsonl( + p, + [ + {"type": "user", "timestamp": _ts(0), "message": {"content": "hello"}}, + { + "type": "assistant", + "timestamp": _ts(1), + "message": { + "content": [{"type": "text", "text": "hi"}], + 
"usage": {}, + }, + }, + ], + ) + meta = extract_meta(p) + assert meta is not None + assert meta.codeflash is None + + def test_detects_codeflash_agent_spawn(self, tmp_path: Path) -> None: + p = tmp_path / "proj" / "sess.jsonl" + p.parent.mkdir() + _write_jsonl( + p, + [ + { + "type": "assistant", + "timestamp": _ts(0), + "message": { + "content": [ + { + "type": "tool_use", + "id": "t1", + "name": "Agent", + "input": {"name": "codeflash-python", "prompt": "optimize"}, + } + ], + "usage": {}, + }, + }, + ], + ) + meta = extract_meta(p) + assert meta is not None + assert meta.codeflash is not None + assert meta.codeflash.is_codeflash + assert meta.codeflash.language == "python" + assert "codeflash-python" in meta.codeflash.agents_used + + def test_detects_codeflash_skill(self, tmp_path: Path) -> None: + p = tmp_path / "proj" / "sess.jsonl" + p.parent.mkdir() + _write_jsonl( + p, + [ + { + "type": "assistant", + "timestamp": _ts(0), + "message": { + "content": [ + { + "type": "tool_use", + "id": "t1", + "name": "Skill", + "input": {"skill": "codeflash-optimize"}, + } + ], + "usage": {}, + }, + }, + ], + ) + meta = extract_meta(p) + assert meta is not None + assert meta.codeflash is not None + assert "codeflash-optimize" in meta.codeflash.skills_invoked + + def test_detects_team_creates(self, tmp_path: Path) -> None: + p = tmp_path / "proj" / "sess.jsonl" + p.parent.mkdir() + _write_jsonl( + p, + [ + { + "type": "assistant", + "timestamp": _ts(0), + "message": { + "content": [ + {"type": "tool_use", "id": "t1", "name": "TeamCreate", "input": {}}, + { + "type": "tool_use", + "id": "t2", + "name": "Agent", + "input": {"name": "codeflash-deep", "prompt": "go"}, + }, + ], + "usage": {}, + }, + }, + ], + ) + meta = extract_meta(p) + assert meta is not None + assert meta.codeflash is not None + assert meta.codeflash.teams_created == 1 + + def test_detects_multiple_agents(self, tmp_path: Path) -> None: + p = tmp_path / "proj" / "sess.jsonl" + p.parent.mkdir() + _write_jsonl( 
+ p, + [ + { + "type": "assistant", + "timestamp": _ts(0), + "message": { + "content": [ + { + "type": "tool_use", + "id": "t1", + "name": "Agent", + "input": {"name": "codeflash-python", "prompt": "start"}, + }, + { + "type": "tool_use", + "id": "t2", + "name": "Agent", + "input": {"name": "codeflash-deep", "prompt": "optimize"}, + }, + { + "type": "tool_use", + "id": "t3", + "name": "Agent", + "input": {"name": "codeflash-researcher", "prompt": "research"}, + }, + { + "type": "tool_use", + "id": "t4", + "name": "Agent", + "input": {"name": "codeflash-review", "prompt": "review"}, + }, + ], + "usage": {}, + }, + }, + ], + ) + meta = extract_meta(p) + assert meta is not None + cf = meta.codeflash + assert cf is not None + assert cf.language == "python" + assert cf.optimization_domain == "deep" + assert cf.has_researcher + assert cf.has_reviewer + assert len(cf.agents_used) == 4 diff --git a/packages/blackbox/tests/test_cli.py b/packages/blackbox/tests/test_cli.py new file mode 100644 index 0000000..e6dd1e5 --- /dev/null +++ b/packages/blackbox/tests/test_cli.py @@ -0,0 +1,54 @@ +from __future__ import annotations + +from argparse import Namespace + +import pytest + +from blackbox.cli import main, parse_args, run + + +class TestParseArgs: + def test_serve_defaults(self) -> None: + args = parse_args(["serve"]).unwrap() + assert "serve" == args.command + assert 7100 == args.port + assert args.no_open is False + + def test_serve_custom_port(self) -> None: + args = parse_args(["serve", "--port", "8080"]).unwrap() + assert 8080 == args.port + + def test_serve_no_open(self) -> None: + args = parse_args(["serve", "--no-open"]).unwrap() + assert args.no_open is True + + def test_no_command_errors(self) -> None: + with pytest.raises(SystemExit): + parse_args([]) + + +class TestRun: + def test_serve_launches_uvicorn(self, monkeypatch: pytest.MonkeyPatch) -> None: + called_with: dict[str, object] = {} + + def fake_uvicorn_run(app: object, **kwargs: object) -> None: + 
called_with["app"] = app + called_with.update(kwargs) + + monkeypatch.setattr("uvicorn.run", fake_uvicorn_run) + args = parse_args(["serve", "--no-open"]).unwrap() + run(args).unwrap() + assert "127.0.0.1" == called_with["host"] + assert 7100 == called_with["port"] + + def test_unknown_command(self) -> None: + args = Namespace(command="bogus") + result = run(args) + assert not result.is_ok() + + +class TestMain: + def test_main_serve(self, monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr("sys.argv", ["blackbox", "serve", "--no-open"]) + monkeypatch.setattr("uvicorn.run", lambda *a, **kw: None) + main() diff --git a/packages/blackbox/tests/test_formatting.py b/packages/blackbox/tests/test_formatting.py new file mode 100644 index 0000000..63e0a69 --- /dev/null +++ b/packages/blackbox/tests/test_formatting.py @@ -0,0 +1,333 @@ +from __future__ import annotations + +import json +from typing import Any + +from blackbox.formatting import ( + AuditFormatter, + DigestFormatter, + MetaFormatter, + ProjectFormatter, + RecommendationFormatter, +) +from blackbox.models import ( + ProjectStats, + Recommendation, + SessionAudit, + SessionDigest, + WeekStats, +) +from tests.conftest import make_meta + +# --------------------------------------------------------------------------- +# MetaFormatter +# --------------------------------------------------------------------------- + + +class TestMetaFormatter: + def test_basic(self) -> None: + text = MetaFormatter(make_meta(input_tokens=5000, output_tokens=2000, tool_errors=2)).summary() + assert "abcd1234" in text + assert "60min" in text + assert "10 user / 12 assistant" in text + assert "25 calls (2 errors)" in text + assert "5,000 in / 2,000 out" in text + + def test_with_git(self) -> None: + assert "5 commits on main" in MetaFormatter(make_meta(git_commits=5, git_branch="main")).summary() + + def test_git_without_branch(self) -> None: + assert "unknown" in MetaFormatter(make_meta(git_commits=1, 
git_branch=None)).summary() + + def test_with_files(self) -> None: + text = MetaFormatter(make_meta(files_modified=3, lines_added=100, lines_removed=20)).summary() + assert "3 modified" in text + assert "+100/-20" in text + + def test_without_files(self) -> None: + assert "modified" not in MetaFormatter(make_meta(files_modified=0)).summary() + + def test_with_compactions(self) -> None: + assert "Compactions: 3" in MetaFormatter(make_meta(compactions=3)).summary() + + def test_without_compactions(self) -> None: + assert "Compactions" not in MetaFormatter(make_meta(compactions=0)).summary() + + def test_with_interruptions(self) -> None: + assert "Interruptions: 2" in MetaFormatter(make_meta(user_interruptions=2)).summary() + + def test_without_interruptions(self) -> None: + assert "Interruptions" not in MetaFormatter(make_meta(user_interruptions=0)).summary() + + def test_top_tools_capped_at_5(self) -> None: + meta = make_meta(tool_counts={"Read": 20, "Edit": 15, "Bash": 10, "Write": 5, "Grep": 3, "X": 1}) + text = MetaFormatter(meta).summary() + assert "Read=20" in text + assert "X=1" not in text + + def test_no_top_tools_when_empty(self) -> None: + assert "Top tools" not in MetaFormatter(make_meta(tool_counts={})).summary() + + def test_thinking_blocks_shown_when_nonzero(self) -> None: + assert "Thinking blocks: 5" in MetaFormatter(make_meta(thinking_blocks=5)).summary() + + def test_thinking_blocks_hidden_when_zero(self) -> None: + assert "Thinking blocks" not in MetaFormatter(make_meta(thinking_blocks=0)).summary() + + def test_web_shown_when_nonzero(self) -> None: + text = MetaFormatter(make_meta(web_searches=3, web_fetches=1)).summary() + assert "Web: 3 searches / 1 fetches" in text + + def test_web_hidden_when_zero(self) -> None: + assert "Web:" not in MetaFormatter(make_meta(web_searches=0, web_fetches=0)).summary() + + def test_permission_mode_shown_when_set(self) -> None: + text = MetaFormatter(make_meta(permission_mode="bypassPermissions")).summary() + 
assert "Permission mode: bypassPermissions" in text + + def test_permission_mode_hidden_when_none(self) -> None: + assert "Permission mode" not in MetaFormatter(make_meta(permission_mode=None)).summary() + + +# --------------------------------------------------------------------------- +# AuditFormatter +# --------------------------------------------------------------------------- + + +class TestAuditFormatter: + def test_basic(self) -> None: + a = SessionAudit( + session_id="abcd1234-5678", + outcome="success", + satisfaction="positive", + session_type="debugging", + ) + text = AuditFormatter(a).summary() + assert "abcd1234" in text + assert "Outcome: success" in text + assert "Satisfaction: positive" in text + assert "Type: debugging" in text + + def test_with_goals(self) -> None: + a = SessionAudit(session_id="x", goal_categories={"bugfix": 5, "refactor": 3}) + text = AuditFormatter(a).summary() + assert "Goals:" in text + assert "bugfix(5)" in text + + def test_without_goals(self) -> None: + assert "Goals" not in AuditFormatter(SessionAudit(session_id="x", goal_categories={})).summary() + + def test_with_friction(self) -> None: + a = SessionAudit(session_id="x", friction_counts={"permission_denied": 4}) + assert "permission_denied(4)" in AuditFormatter(a).summary() + + def test_without_friction(self) -> None: + assert "Friction" not in AuditFormatter(SessionAudit(session_id="x")).summary() + + def test_with_instructions(self) -> None: + a = SessionAudit(session_id="x", user_instructions=("use pytest", "no comments")) + assert "Instructions: 2 extracted" in AuditFormatter(a).summary() + + def test_without_instructions(self) -> None: + assert "Instructions" not in AuditFormatter(SessionAudit(session_id="x")).summary() + + def test_summary_truncated_at_120(self) -> None: + a = SessionAudit(session_id="x", summary="x" * 200) + text = AuditFormatter(a).summary() + summary_line = next(line for line in text.split("\n") if "Summary" in line) + assert 
len(summary_line.split("Summary: ")[1]) == 120 + + +# --------------------------------------------------------------------------- +# RecommendationFormatter +# --------------------------------------------------------------------------- + + +class TestRecommendationFormatter: + def test_basic(self) -> None: + r = Recommendation(suggestion="do X", evidence="50% failure", frequency=0.5, source_sessions=5) + text = RecommendationFormatter(r).summary() + assert "do X" in text + assert "50% failure" in text + + +# --------------------------------------------------------------------------- +# ProjectFormatter +# --------------------------------------------------------------------------- + + +class TestProjectFormatter: + def make(self, **kw: Any) -> ProjectStats: + defaults: dict[str, Any] = { + "project_path": "/proj/myapp", + "project_name": "myapp", + "session_count": 10, + "success_rate": 0.9, + "avg_tool_errors": 2.5, + "avg_duration_s": 600.0, + "top_error_categories": (), + "top_friction": (), + } + defaults.update(kw) + return ProjectStats(**defaults) + + def test_basic(self) -> None: + text = ProjectFormatter(self.make()).summary() + assert "myapp: 10 sessions" in text + assert "90% success" in text + + def test_outlier_marker(self) -> None: + assert "[!]" in ProjectFormatter(self.make(is_outlier=True)).summary() + + def test_error_categories_shown(self) -> None: + p = self.make(top_error_categories=(("edit_failed", 8), ("command_failed", 3))) + assert "Errors: edit_failed(8)" in ProjectFormatter(p).summary() + + def test_friction_shown(self) -> None: + p = self.make(top_friction=(("user_rejected", 4),)) + assert "Friction: user_rejected(4)" in ProjectFormatter(p).summary() + + def test_no_sub_lines_when_clean(self) -> None: + text = ProjectFormatter(self.make()).summary() + assert len(text.strip().split("\n")) == 1 + + +# --------------------------------------------------------------------------- +# DigestFormatter +# 
--------------------------------------------------------------------------- + + +class TestDigestFormatter: + def make(self, **kw: Any) -> SessionDigest: + defaults: dict[str, Any] = {"session_count": 10, "date_range": (100.0, 500.0), "success_rate": 0.8} + defaults.update(kw) + return SessionDigest(**defaults) + + def test_includes_count(self) -> None: + assert "42 sessions" in DigestFormatter(self.make(session_count=42)).summary() + + def test_success_rate(self) -> None: + assert "80% success rate" in DigestFormatter(self.make(success_rate=0.8)).summary() + + def test_outcome_distribution(self) -> None: + digest = self.make( + session_count=10, + outcome_distribution={"fully_achieved": 7, "unclear": 3}, + ) + text = DigestFormatter(digest).summary() + assert "fully_achieved: 7 (70%)" in text + + def test_no_trends_without_weeks(self) -> None: + assert "Trends" not in DigestFormatter(self.make()).summary() + + def test_trends_with_weeks(self) -> None: + w = WeekStats( + week="2026-W17", session_count=5, success_rate=0.7, avg_errors_per_session=1.0, avg_duration_s=600.0 + ) + digest = self.make(weeks=(w,), rolling_success_rate=0.7) + text = DigestFormatter(digest).summary() + assert "Trends" in text + assert "2026-W17" in text + + def test_no_projects_without_data(self) -> None: + assert "Projects" not in DigestFormatter(self.make()).summary() + + def test_no_recommendations_without_data(self) -> None: + assert "Recommendations" not in DigestFormatter(self.make()).summary() + + def test_with_recommendations(self) -> None: + r = Recommendation(suggestion="Fix the thing", evidence="50% failure", frequency=0.5, source_sessions=10) + text = DigestFormatter(self.make(recommendations=(r,))).summary() + assert "Recommendations" in text + assert "1. 
Fix the thing" in text + + def test_satisfaction_distribution(self) -> None: + digest = self.make( + session_count=10, + satisfaction_distribution={"happy": 6, "neutral": 4}, + ) + text = DigestFormatter(digest).summary() + assert "Satisfaction:" in text + assert "happy: 6" in text + + def test_top_friction(self) -> None: + digest = self.make(top_friction=(("tool_failed", 12), ("blocked", 3))) + text = DigestFormatter(digest).summary() + assert "Top friction:" in text + assert "tool_failed: 12" in text + + def test_sparkline_with_two_weeks(self) -> None: + w1 = WeekStats( + week="2026-W16", session_count=3, success_rate=0.5, avg_errors_per_session=2.0, avg_duration_s=600.0 + ) + w2 = WeekStats( + week="2026-W17", session_count=4, success_rate=0.9, avg_errors_per_session=0.5, avg_duration_s=400.0 + ) + text = DigestFormatter(self.make(weeks=(w1, w2), rolling_success_rate=0.7)).summary() + assert "Success: [" in text + assert "Errors: [" in text + + def test_error_category_deltas(self) -> None: + w = WeekStats( + week="2026-W17", session_count=3, success_rate=0.7, avg_errors_per_session=1.0, avg_duration_s=600.0 + ) + digest = self.make( + weeks=(w,), + error_category_deltas=(("command_failed", 0.5, 4.0, 6.0),), + ) + text = DigestFormatter(digest).summary() + assert "Error category trends:" in text + assert "command_failed" in text + + def test_with_projects(self) -> None: + p = ProjectStats( + project_path="/proj/myapp", + project_name="myapp", + session_count=5, + success_rate=0.8, + avg_tool_errors=1.0, + avg_duration_s=300.0, + top_error_categories=(), + top_friction=(), + ) + text = DigestFormatter(self.make(projects=(p,))).summary() + assert "Projects (1)" in text + assert "myapp" in text + + +class TestDigestToJson: + def make(self, **kw: Any) -> SessionDigest: + defaults: dict[str, Any] = {"session_count": 10, "date_range": (100.0, 500.0), "success_rate": 0.8} + defaults.update(kw) + return SessionDigest(**defaults) + + def test_valid_json(self) -> None: + j 
= DigestFormatter(self.make(session_count=5)).to_json() + parsed = json.loads(j) + assert parsed["session_count"] == 5 + + def test_with_nested_weeks(self) -> None: + w = WeekStats( + week="2026-W17", session_count=3, success_rate=0.7, avg_errors_per_session=1.0, avg_duration_s=600.0 + ) + j = DigestFormatter(self.make(weeks=(w,))).to_json() + parsed = json.loads(j) + assert len(parsed["weeks"]) == 1 + assert parsed["weeks"][0]["week"] == "2026-W17" + + def test_with_nested_projects(self) -> None: + p = ProjectStats( + project_path="/p", + project_name="p", + session_count=5, + success_rate=0.8, + avg_tool_errors=1.0, + avg_duration_s=300.0, + top_error_categories=(), + top_friction=(), + is_outlier=False, + ) + j = DigestFormatter(self.make(projects=(p,))).to_json() + parsed = json.loads(j) + assert len(parsed["projects"]) == 1 + assert parsed["projects"][0]["project_name"] == "p" diff --git a/packages/blackbox/tests/test_models.py b/packages/blackbox/tests/test_models.py new file mode 100644 index 0000000..7d9b793 --- /dev/null +++ b/packages/blackbox/tests/test_models.py @@ -0,0 +1,305 @@ +from __future__ import annotations + +import json +from typing import Any + +import attrs +import pytest + +from blackbox.models import ( + ProjectStats, + Recommendation, + SessionAudit, + SessionDigest, + SessionEvent, + WeekStats, + arrow, + sparkline, +) +from tests.conftest import make_audit, make_meta + +# --------------------------------------------------------------------------- +# sparkline and arrow +# --------------------------------------------------------------------------- + + +class TestSparkline: + def test_empty_or_single_returns_empty(self) -> None: + assert "" == sparkline([]) + assert "" == sparkline([1.0]) + + def test_ascending_produces_increasing_chars(self) -> None: + result = sparkline([0.0, 0.5, 1.0]) + assert len(result) == 3 + assert result[0] <= result[-1] + + def test_descending_produces_decreasing_chars(self) -> None: + result = sparkline([1.0, 
0.5, 0.0]) + assert result[0] >= result[-1] + + def test_constant_values_produce_middle_char(self) -> None: + result = sparkline([5.0, 5.0, 5.0]) + assert len(result) == 3 + assert len(set(result)) == 1 + + def test_two_values_uses_full_range(self) -> None: + result = sparkline([0.0, 1.0]) + assert len(result) == 2 + assert result[0] != result[-1] + + +class TestArrow: + def test_near_zero_delta_returns_equals(self) -> None: + assert "=" == arrow(0.0) + assert "=" == arrow(0.04) + assert "=" == arrow(-0.04) + + def test_positive_delta_returns_up(self) -> None: + assert "^" == arrow(0.1) + + def test_negative_delta_returns_down(self) -> None: + assert "v" == arrow(-0.1) + + def test_invert_flips_positive(self) -> None: + assert "v" == arrow(0.1, invert=True) + + def test_invert_flips_negative(self) -> None: + assert "^" == arrow(-0.1, invert=True) + + def test_invert_near_zero_still_equals(self) -> None: + assert "=" == arrow(0.0, invert=True) + + +# --------------------------------------------------------------------------- +# SessionEvent +# --------------------------------------------------------------------------- + + +class TestSessionEvent: + def test_construction(self) -> None: + e = SessionEvent( + timestamp="2024-01-01T00:00:00Z", + speaker="user", + text="hello", + tool_name=None, + file_path=None, + command=None, + is_error=False, + error_category=None, + attachment_type=None, + ) + assert e.speaker == "user" + assert e.text == "hello" + assert not e.is_error + + def test_frozen(self) -> None: + e = SessionEvent("ts", "user", "hi", None, None, None, False, None, None) + with pytest.raises(attrs.exceptions.FrozenInstanceError): + e.speaker = "assistant" # type: ignore[misc] + + def test_equality(self) -> None: + e1 = SessionEvent("ts", "user", "hi", None, None, None, False, None, None) + e2 = SessionEvent("ts", "user", "hi", None, None, None, False, None, None) + assert e1 == e2 + + def test_attrs_asdict(self) -> None: + e = SessionEvent("ts", "user", 
"hi", None, None, None, False, None, None) + d = attrs.asdict(e) + assert d["speaker"] == "user" + assert d["text"] == "hi" + assert json.dumps(d) # JSON-serializable + + +# --------------------------------------------------------------------------- +# SessionMeta — properties +# --------------------------------------------------------------------------- + + +class TestSessionMetaProperties: + def test_duration_minutes(self) -> None: + assert make_meta(duration_s=3600.0).duration_minutes == 60.0 + + def test_duration_minutes_zero(self) -> None: + assert make_meta(duration_s=0.0).duration_minutes == 0.0 + + def test_total_tokens(self) -> None: + assert make_meta(input_tokens=1000, output_tokens=500).total_tokens == 1500 + + def test_total_tokens_default(self) -> None: + assert make_meta().total_tokens == 0 + + def test_cache_hit_rate(self) -> None: + meta = make_meta(input_tokens=500, cache_read_tokens=300, cache_creation_tokens=200) + assert meta.cache_hit_rate == 0.3 + + def test_cache_hit_rate_zero_tokens(self) -> None: + assert make_meta().cache_hit_rate == 0.0 + + def test_cache_hit_rate_full(self) -> None: + meta = make_meta(input_tokens=0, cache_read_tokens=1000, cache_creation_tokens=0) + assert meta.cache_hit_rate == 1.0 + + +class TestSessionMetaFrozen: + def test_frozen(self) -> None: + meta = make_meta() + with pytest.raises(attrs.exceptions.FrozenInstanceError): + meta.session_id = "new" # type: ignore[misc] + + +class TestSessionMetaAsDict: + def test_returns_dict(self) -> None: + d = attrs.asdict(make_meta()) + assert isinstance(d, dict) + assert d["session_id"] == "abcd1234-5678-9012-3456-789012345678" + assert d["duration_s"] == 3600.0 + + def test_includes_optional_fields(self) -> None: + d = attrs.asdict(make_meta(git_branch="feature", git_commits=3)) + assert d["git_branch"] == "feature" + assert d["git_commits"] == 3 + + +# --------------------------------------------------------------------------- +# SessionAudit +# 
--------------------------------------------------------------------------- + + +class TestSessionAuditDefaults: + def test_defaults(self) -> None: + a = SessionAudit(session_id="x") + assert a.outcome == "unclear" + assert a.satisfaction == "neutral" + assert a.session_type == "single_task" + assert a.goal_categories == {} + assert a.friction_counts == {} + assert a.user_instructions == () + assert a.summary == "" + + def test_frozen(self) -> None: + a = SessionAudit(session_id="x") + with pytest.raises(attrs.exceptions.FrozenInstanceError): + a.outcome = "success" # type: ignore[misc] + + +class TestSessionAuditAsDict: + def test_returns_dict(self) -> None: + a = make_audit() + d = attrs.asdict(a) + assert isinstance(d, dict) + assert d["outcome"] == "mostly_achieved" + + def test_reflects_values(self) -> None: + a = make_audit(outcome="success", satisfaction="positive", session_type="multi_task") + d = attrs.asdict(a) + assert d["outcome"] == "success" + assert d["session_type"] == "multi_task" + + +# --------------------------------------------------------------------------- +# ProjectStats — mutable is_outlier +# --------------------------------------------------------------------------- + + +class TestProjectStats: + def make(self, **kw: Any) -> ProjectStats: + defaults: dict[str, Any] = { + "project_path": "/proj/myapp", + "project_name": "myapp", + "session_count": 10, + "success_rate": 0.9, + "avg_tool_errors": 2.5, + "avg_duration_s": 600.0, + "top_error_categories": (), + "top_friction": (), + } + defaults.update(kw) + return ProjectStats(**defaults) + + def test_is_outlier_default_false(self) -> None: + assert not self.make().is_outlier + + def test_is_outlier_mutable(self) -> None: + p = self.make() + p.is_outlier = True + assert p.is_outlier + + +# --------------------------------------------------------------------------- +# WeekStats + Recommendation +# --------------------------------------------------------------------------- + + +class 
TestWeekStats: + def test_frozen(self) -> None: + w = WeekStats( + week="2026-W17", session_count=3, success_rate=0.7, avg_errors_per_session=1.0, avg_duration_s=600.0 + ) + with pytest.raises(attrs.exceptions.FrozenInstanceError): + w.session_count = 5 # type: ignore[misc] + + def test_default_error_counts(self) -> None: + w = WeekStats( + week="2026-W17", session_count=3, success_rate=0.7, avg_errors_per_session=1.0, avg_duration_s=600.0 + ) + assert w.error_category_counts == {} + + +class TestRecommendation: + def test_frozen(self) -> None: + r = Recommendation(suggestion="do X", evidence="50%", frequency=0.5, source_sessions=5) + with pytest.raises(attrs.exceptions.FrozenInstanceError): + r.suggestion = "do Y" # type: ignore[misc] + + +# --------------------------------------------------------------------------- +# SessionDigest +# --------------------------------------------------------------------------- + + +class TestSessionDigest: + def make(self, **kw: Any) -> SessionDigest: + defaults: dict[str, Any] = {"session_count": 10, "date_range": (100.0, 500.0), "success_rate": 0.8} + defaults.update(kw) + return SessionDigest(**defaults) + + def test_attrs_asdict(self) -> None: + d = attrs.asdict(self.make()) + assert d["session_count"] == 10 + assert d["success_rate"] == 0.8 + + def test_frozen(self) -> None: + d = self.make() + with pytest.raises(attrs.exceptions.FrozenInstanceError): + d.session_count = 99 # type: ignore[misc] + + def test_json_serializable(self) -> None: + j = json.dumps(attrs.asdict(self.make(session_count=5)), indent=2, default=str) + parsed = json.loads(j) + assert parsed["session_count"] == 5 + + def test_json_with_nested_weeks(self) -> None: + w = WeekStats( + week="2026-W17", session_count=3, success_rate=0.7, avg_errors_per_session=1.0, avg_duration_s=600.0 + ) + j = json.dumps(attrs.asdict(self.make(weeks=(w,))), indent=2, default=str) + parsed = json.loads(j) + assert len(parsed["weeks"]) == 1 + assert parsed["weeks"][0]["week"] == 
"2026-W17" + + def test_json_with_nested_projects(self) -> None: + p = ProjectStats( + project_path="/p", + project_name="p", + session_count=5, + success_rate=0.8, + avg_tool_errors=1.0, + avg_duration_s=300.0, + top_error_categories=(), + top_friction=(), + is_outlier=False, + ) + j = json.dumps(attrs.asdict(self.make(projects=(p,))), indent=2, default=str) + parsed = json.loads(j) + assert len(parsed["projects"]) == 1 + assert parsed["projects"][0]["project_name"] == "p" diff --git a/packages/blackbox/tests/test_rendering.py b/packages/blackbox/tests/test_rendering.py new file mode 100644 index 0000000..429d589 --- /dev/null +++ b/packages/blackbox/tests/test_rendering.py @@ -0,0 +1,268 @@ +from __future__ import annotations + +import time + +from blackbox.dashboard.rendering import ( + esc, + esc_md, + fmt_duration, + fmt_relative, + fmt_time, + passes_filter, + render_log_html, + shorten_paths, + tool_call_html, +) +from blackbox.models import LogEntry + +# --------------------------------------------------------------------------- +# fmt_time +# --------------------------------------------------------------------------- + + +class TestFmtTime: + def test_epoch_zero(self) -> None: + assert "00:00:00" == fmt_time(0.0) + + def test_known_timestamp(self) -> None: + assert "01:46:40" == fmt_time(1_000_000_000.0) + + def test_fractional_seconds_truncated(self) -> None: + assert "00:00:00" == fmt_time(0.999) + + +# --------------------------------------------------------------------------- +# fmt_duration +# --------------------------------------------------------------------------- + + +class TestFmtDuration: + def test_zero_seconds(self) -> None: + assert "0s" == fmt_duration(100.0, 100.0) + + def test_seconds_only(self) -> None: + assert "45s" == fmt_duration(0.0, 45.0) + + def test_minutes_and_seconds(self) -> None: + assert "2m30s" == fmt_duration(0.0, 150.0) + + def test_hours_and_minutes(self) -> None: + assert "1h30m" == fmt_duration(0.0, 5400.0) + + def 
test_negative_clamps_to_zero(self) -> None: + assert "0s" == fmt_duration(100.0, 50.0) + + def test_none_finished_uses_current_time(self) -> None: + result = fmt_duration(time.time() - 10, None) + assert result.endswith("s") + + def test_exactly_60_seconds(self) -> None: + assert "1m00s" == fmt_duration(0.0, 60.0) + + def test_exactly_one_hour(self) -> None: + assert "1h00m" == fmt_duration(0.0, 3600.0) + + +# --------------------------------------------------------------------------- +# fmt_relative +# --------------------------------------------------------------------------- + + +class TestFmtRelative: + def test_just_now(self) -> None: + assert "just now" == fmt_relative(time.time()) + + def test_minutes_ago(self) -> None: + assert "5m ago" == fmt_relative(time.time() - 300) + + def test_hours_ago(self) -> None: + assert "2h ago" == fmt_relative(time.time() - 7200) + + def test_days_ago(self) -> None: + assert "3d ago" == fmt_relative(time.time() - 259200) + + +# --------------------------------------------------------------------------- +# esc / esc_md +# --------------------------------------------------------------------------- + + +class TestEsc: + def test_ampersand(self) -> None: + assert "a & b" == esc("a & b") + + def test_angle_brackets(self) -> None: + assert "<div>" == esc("
") + + def test_newlines_become_br(self) -> None: + assert "a
b" == esc("a\nb") + + def test_combined(self) -> None: + assert "<b>hi</b>
&" == esc("hi\n&") + + +class TestEscMd: + def test_bold_converted(self) -> None: + result = esc_md("hello **world**") + assert 'world' in result + + def test_html_still_escaped(self) -> None: + result = esc_md("