perf(transcript): cache fromisoformat, single-pass parsing (#45)

* perf(transcript): cache fromisoformat, local json.loads, single-pass parsing

Move datetime import to module level with cached fromisoformat,
use bytes split for JSONL, inline tool_result iteration in parse_user_entry,
promote decode_project_name filter set to module-level frozenset.

* fix: use splitlines for JSONL parsing

split("\n") leaves \r on lines from Windows-originated JSONL files,
which can cause json.loads failures. splitlines() handles all line
ending variants.

* fix: add noqa C901 for inlined parse_user_entry

The tool_result iteration was inlined for single-pass performance,
which pushes complexity above the C901 threshold.

* Add blackbox benchmark VM infra

D2s_v5 (non-burstable, 2 vCPU, 8 GB) with cloud-init provisioning,
CPU-pinned benchmarks, and A/B comparison scripts.

---------

Co-authored-by: codeflash[bot] <codeflash[bot]@users.noreply.github.com>
This commit is contained in:
Kevin Turcios 2026-04-29 03:22:44 -05:00 committed by GitHub
parent 1ff2a76152
commit 41edcf06e1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@@ -3,20 +3,23 @@
from __future__ import annotations
import json
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
from blackbox.models import LogEntry, SessionInfo
_fromisoformat = datetime.fromisoformat
def ts_to_epoch(ts: str | None) -> float:
if not ts:
return 0.0
from datetime import UTC, datetime # noqa: PLC0415
try:
dt = datetime.fromisoformat(ts)
return dt.replace(tzinfo=UTC if dt.tzinfo is None else dt.tzinfo).timestamp()
dt = _fromisoformat(ts)
if dt.tzinfo is None:
return dt.replace(tzinfo=UTC).timestamp()
return dt.timestamp()
except (ValueError, AttributeError):
return 0.0
@@ -46,15 +49,16 @@ def extract_tool_results(content: Any) -> list[dict[str, Any]]:
def parse_transcript(path: Path) -> list[LogEntry]:
    """Parse a Claude Code .jsonl transcript into a list of LogEntry objects.

    Blank lines and lines that fail JSON decoding are skipped silently.
    ``splitlines()`` is used (rather than split("\n")) so \r\n endings from
    Windows-originated JSONL files do not break ``json.loads``.
    """
    entries: list[LogEntry] = []
    # Bind hot-loop lookups to locals once; transcripts can be large.
    _extend = entries.extend
    _loads = json.loads
    for line in path.read_bytes().decode("utf-8").splitlines():
        if not line or line.isspace():
            continue
        try:
            raw = _loads(line)
        except json.JSONDecodeError:
            continue
        _extend(parse_entry(raw))
    return entries
@@ -92,32 +96,41 @@ def parse_entry(raw: dict[str, Any]) -> list[LogEntry]:
return []
def parse_user_entry(ts: float, message: Any, raw: dict[str, Any]) -> list[LogEntry]:  # noqa: C901
    """Convert a user-role transcript entry into LogEntry objects.

    Tool-result blocks become "tool_result"/"error" entries sourced from
    "claude"; otherwise any text content becomes a single "user" info entry.
    Complexity noqa: the tool_result iteration is deliberately inlined for a
    single pass over ``content``.
    """
    if not isinstance(message, dict):
        return []
    content = message.get("content", "")
    entries: list[LogEntry] = []
    if isinstance(content, list):
        has_tool_result = False
        for block in content:
            if isinstance(block, dict) and block.get("type") == "tool_result":
                has_tool_result = True
                result_text = block.get("content", "")
                if isinstance(result_text, list):
                    # Content blocks: join text fragments into one string.
                    result_text = " ".join(b.get("text", "") for b in result_text if isinstance(b, dict))
                is_error = block.get("is_error", False)
                tool_use_result = raw.get("toolUseResult", {})
                if not isinstance(tool_use_result, dict):
                    tool_use_result = {}
                stdout = tool_use_result.get("stdout", "")
                stderr = tool_use_result.get("stderr", "")
                # Prefer captured stdout, then the block's own text.
                display = stdout or result_text or ""
                if is_error and stderr:
                    display = stderr
                level = "error" if is_error else "tool_result"
                # Cap message length to keep individual entries bounded.
                entries.append(LogEntry(timestamp=ts, source="claude", level=level, message=display[:2000]))
        if has_tool_result:
            return entries
    text = extract_text_content(content)
    if text:
        entries.append(LogEntry(timestamp=ts, source="user", level="info", message=text))
    return entries
@@ -193,10 +206,13 @@ def scan_sessions(projects_dir: Path) -> list[SessionInfo]:
return sessions
# Path components that carry no project identity when decoding an encoded
# project directory name (e.g. "-Users-kevin-myproj"). Module-level frozenset
# so the filter set is built once, not per call.
_DECODE_SKIP = frozenset({"Users", "private", "tmp", ""})


def decode_project_name(encoded: str) -> str:
    """Decode a dash-encoded absolute project path into a short display name.

    "-Users-kevin-myproj" -> "kevin/myproj". Inputs that do not look like an
    encoded absolute path (no leading dash) are returned unchanged.
    """
    parts = encoded.split("-")
    # Encoded absolute paths begin with "-", so parts[0] is the empty string.
    if len(parts) >= 2 and parts[0] == "":
        meaningful = [p for p in parts if p not in _DECODE_SKIP]
        if meaningful:
            # Keep at most the last two components for a compact name.
            return "/".join(meaningful[-2:]) if len(meaningful) >= 2 else meaningful[-1]
    return encoded
@@ -215,13 +231,14 @@ def quick_session_info(  # noqa: C901, PLR0912
message_count = 0
cwd = ""
_loads = json.loads
try:
with path.open() as f:
for i, line in enumerate(f):
if not line.strip():
if not line or line.isspace():
continue
try:
raw = json.loads(line)
raw = _loads(line)
except json.JSONDecodeError:
continue
ts = ts_to_epoch(raw.get("timestamp"))